./ct_report/coverage/mod_mam_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc Cassandra backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10 -behaviour(gen_mod).
11 -behaviour(mongoose_module_metrics).
12
13 %% ----------------------------------------------------------------------
14 %% Exports
15
16 %% gen_mod handlers
17 -export([start/2, stop/1]).
18
19 %% MAM hook handlers
20 -export([archive_size/4,
21 archive_message/3,
22 lookup_messages/3,
23 remove_archive/4]).
24
25 %% mongoose_cassandra callbacks
26 -export([prepared_queries/0]).
27
28 %gdpr
29 -export([get_mam_pm_gdpr_data/3]).
30
31 -ignore_xref([
32 behaviour_info/1, remove_archive/4
33 ]).
34
35 %% ----------------------------------------------------------------------
36 %% Imports
37
38 %% Other
39 -import(mod_mam_utils,
40 [maybe_min/2,
41 maybe_max/2,
42 bare_jid/1,
43 full_jid/1
44 ]).
45
46 -include("jlib.hrl").
47 -include("mongoose_rsm.hrl").
48
49 -callback encode(binary()) -> binary().
50 -callback decode(binary()) -> binary().
51
%% Selection criteria used when building the parameters of archive queries.
-record(mam_ca_filter, {
          user_jid,   %% archive owner (set by prepare_filter_params/4)
          with_jid,   %% conversation partner key (set by prepare_filter_params/4)
          remote_jid, %% not set by prepare_filter_params/4
          start_id,   %% lower message id bound (rendered as "id >= :start_id") or undefined
          end_id      %% upper message id bound (rendered as "id <= :end_id") or undefined
         }).
59
%% One row to be written into the mam_message table (see message_to_params/1).
-record(mam_message, {
          id                :: non_neg_integer(),
          user_jid          :: binary(),
          remote_jid        :: binary() | undefined,
          from_jid          :: binary() | undefined,
          %% Defaults to the empty binary; archive_message2/2 overrides it per row.
          with_jid = <<>>   :: binary(),
          message           :: binary() | undefined
         }).
68
69 %% ----------------------------------------------------------------------
70 %% Types
71
%% Local type aliases used by the specs in this module.
-type filter()     :: #mam_ca_filter{}.
-type message_id() :: non_neg_integer().
-type host_type()  :: mongooseim:host_type().
75
76 %% ----------------------------------------------------------------------
77 %% gen_mod callbacks
78 %% Starting and stopping functions for users' archives
79
%% @doc gen_mod start callback; delegates to start_pm/2.
start(HostType, Opts) ->
    start_pm(HostType, Opts).
82
%% @doc gen_mod stop callback; delegates to stop_pm/1.
stop(HostType) ->
    stop_pm(HostType).
85
86 %% ----------------------------------------------------------------------
87 %% Add hooks for mod_mam
88
%% @doc Registers this module's hook handlers for HostType (priority 50).
%% The `mam_archive_message' handler is skipped when the `no_writer'
%% module option is `true'.
start_pm(HostType, _Opts) ->
    WriterHooks =
        case gen_mod:get_module_opt(HostType, ?MODULE, no_writer, false) of
            true -> [];
            false -> [{mam_archive_message, archive_message}]
        end,
    ReaderHooks = [{mam_archive_size, archive_size},
                   {mam_lookup_messages, lookup_messages},
                   {mam_remove_archive, remove_archive},
                   {get_mam_pm_gdpr_data, get_mam_pm_gdpr_data}],
    lists:foreach(fun({Hook, Handler}) ->
                          ejabberd_hooks:add(Hook, HostType, ?MODULE, Handler, 50)
                  end,
                  WriterHooks ++ ReaderHooks),
    ok.
101
%% @doc Unregisters the hook handlers for HostType (priority 50).
%% The `mam_archive_message' handler is only removed when the `no_writer'
%% module option is `false'.
stop_pm(HostType) ->
    WriterHooks =
        case gen_mod:get_module_opt(HostType, ?MODULE, no_writer, false) of
            true -> [];
            false -> [{mam_archive_message, archive_message}]
        end,
    ReaderHooks = [{mam_archive_size, archive_size},
                   {mam_lookup_messages, lookup_messages},
                   {mam_remove_archive, remove_archive},
                   {get_mam_pm_gdpr_data, get_mam_pm_gdpr_data}],
    lists:foreach(fun({Hook, Handler}) ->
                          ejabberd_hooks:delete(Hook, HostType, ?MODULE, Handler, 50)
                  end,
                  WriterHooks ++ ReaderHooks),
    ok.
114
115 %% ----------------------------------------------------------------------
116 %% mongoose_cassandra_worker callbacks
117
%% @doc Returns every `{QueryName, CQL}' pair of this module: the fixed
%% queries first, followed by the per-filter query variants.
prepared_queries() ->
    FixedQueries =
        [{insert_offset_hint_query, insert_offset_hint_query_cql()},
         {prev_offset_query, prev_offset_query_cql()},
         {insert_query, insert_query_cql()},
         {fetch_user_messages_query, fetch_user_messages_cql()},
         {select_for_removal_query, select_for_removal_query_cql()},
         {remove_archive_query, remove_archive_query_cql()},
         {remove_archive_offsets_query, remove_archive_offsets_query_cql()}],
    lists:append([FixedQueries,
                  extract_messages_queries(),
                  extract_messages_r_queries(),
                  calc_count_queries(),
                  list_message_ids_queries()]).
132
133 %% ----------------------------------------------------------------------
134 %% Internal functions and callbacks
135
%% @doc `mam_archive_size' hook handler: counts the messages of UserJID's
%% archive using a filter with no borders, time range or partner JID.
archive_size(Size, HostType, _UserID, UserJID) when is_integer(Size) ->
    Unfiltered = prepare_filter(UserJID, undefined, undefined, undefined, undefined),
    calc_count(pool_name(HostType), UserJID, HostType, Unfiltered).
140
141
142 %% ----------------------------------------------------------------------
143 %% INSERT MESSAGE
144
%% @doc CQL statement inserting one row into mam_message.
insert_query_cql() ->
    Columns = "(id, user_jid, from_jid, remote_jid, with_jid, message) ",
    "INSERT INTO mam_message " ++ Columns ++ "VALUES (?, ?, ?, ?, ?, ?)".
149
%% @doc `mam_archive_message' hook handler.
%% Any exception raised while archiving is converted into `{error, Reason}'.
archive_message(_Result, HostType, Params) ->
    try archive_message2(Params, HostType) of
        Outcome -> Outcome
    catch
        _Class:Reason -> {error, Reason}
    end.
156
%% Builds the rows for one archived packet and hands them to write_messages/3.
%% The same message is written once per distinct `with_jid' key: the empty
%% binary, the remote full JID and the remote bare JID.
archive_message2(#{message_id := MessID,
                   local_jid := LocJID,
                   remote_jid := RemJID,
                   source_jid := SrcJID,
                   packet := Packet}, HostType) ->
    OwnerBin = mod_mam_utils:bare_jid(LocJID),
    RemoteBareBin = mod_mam_utils:bare_jid(RemJID),
    RemoteFullBin = mod_mam_utils:full_jid(RemJID),
    SourceBin = mod_mam_utils:full_jid(SrcJID),
    Stored = packet_to_stored_binary(HostType, Packet),
    Template = #mam_message{id = MessID,
                            user_jid = OwnerBin,
                            from_jid = SourceBin,
                            remote_jid = RemoteFullBin,
                            message = Stored},
    %% usort removes the duplicate key when the remote JID has no resource part
    Keys = lists:usort([<<>>, RemoteFullBin, RemoteBareBin]),
    Rows = lists:map(fun(Key) -> Template#mam_message{with_jid = Key} end, Keys),
    write_messages(HostType, LocJID, Rows).
177
%% Converts the message records into parameter maps and submits them through
%% mongoose_cassandra:cql_write_async/5 using the `insert_query' statement.
write_messages(HostType, UserJID, Messages) ->
    ParamRows = lists:map(fun message_to_params/1, Messages),
    Pool = pool_name(HostType),
    mongoose_cassandra:cql_write_async(Pool, UserJID, ?MODULE, insert_query, ParamRows).
182
%% Turns a #mam_message{} record into the parameter map of `insert_query'.
message_to_params(#mam_message{} = Msg) ->
    #{id         => Msg#mam_message.id,
      user_jid   => Msg#mam_message.user_jid,
      from_jid   => Msg#mam_message.from_jid,
      remote_jid => Msg#mam_message.remote_jid,
      with_jid   => Msg#mam_message.with_jid,
      message    => Msg#mam_message.message}.
193
194 %% ----------------------------------------------------------------------
195 %% REMOVE ARCHIVE
196
%% @doc CQL statement deleting the messages stored under one
%% (user_jid, with_jid) pair.
remove_archive_query_cql() ->
    "DELETE FROM mam_message "
    "WHERE user_jid = ? AND with_jid = ?".
199
%% @doc CQL statement deleting the offset hints stored under one
%% (user_jid, with_jid) pair.
%%
%% Hints are written to and read from `mam_message_offset' (see
%% insert_offset_hint_query_cql/0 and prev_offset_query_cql/0), so that is the
%% table that has to be cleaned here. Deleting from `mam_message' would only
%% repeat `remove_archive_query' and leave stale hints behind.
remove_archive_offsets_query_cql() ->
    "DELETE FROM mam_message_offset WHERE user_jid = ? AND with_jid = ?".
202
%% @doc CQL statement listing the distinct (user_jid, with_jid) pairs stored
%% for one user.
select_for_removal_query_cql() ->
    "SELECT DISTINCT user_jid, with_jid "
    "FROM mam_message WHERE user_jid = ?".
205
%% @doc `mam_remove_archive' hook handler: removes the archive of UserJID and
%% returns the hook accumulator unchanged.
remove_archive(Acc, HostType, _UserID, UserJID) ->
    _ = remove_archive(HostType, UserJID),
    Acc.
209
%% Folds over the rows returned by `select_for_removal_query' and, for each
%% batch of rows, runs `remove_archive_query' and then
%% `remove_archive_offsets_query' with those rows as parameters.
remove_archive(HostType, UserJID) ->
    Pool = pool_name(HostType),
    SelectParams = #{user_jid => mod_mam_utils:bare_jid(UserJID)},
    %% cql_write/5 (not the async variant) is used so deletion is waited for
    RemoveBatch =
        fun(Rows, _Acc) ->
                mongoose_cassandra:cql_write(Pool, UserJID, ?MODULE,
                                             remove_archive_query, Rows),
                mongoose_cassandra:cql_write(Pool, UserJID, ?MODULE,
                                             remove_archive_offsets_query, Rows)
        end,
    mongoose_cassandra:cql_foldl(Pool, UserJID, ?MODULE,
                                 select_for_removal_query, SelectParams,
                                 RemoveBatch, []).
225 %% ----------------------------------------------------------------------
226 %% SELECT MESSAGES
227
%% @doc `mam_lookup_messages' hook handler.
%%
%% - An `{error, _}' accumulator is passed through untouched.
%% - A binary `search_text' is rejected with `{error, 'not-supported'}'.
%% - Otherwise the lookup is performed; any exception is returned as
%%   `{error, {Reason, Stacktrace}}' instead of being propagated.
%%
%% The spec lists the error results explicitly because three of the code paths
%% below return `{error, _}'.
-spec lookup_messages(Result :: any(), HostType :: host_type(), Params :: map()) ->
    {ok, mod_mam:lookup_result()} | {error, term()}.
lookup_messages({error, _Reason} = Result, _HostType, _Params) ->
    Result;
lookup_messages(_Result, _HostType, #{search_text := <<_/binary>>}) ->
    {error, 'not-supported'};
lookup_messages(_Result, HostType,
                #{owner_jid := UserJID, rsm := RSM, borders := Borders,
                  start_ts := Start, end_ts := End, with_jid := WithJID,
                  search_text := undefined, page_size := PageSize,
                  is_simple := IsSimple}) ->
    try
        lookup_messages2(pool_name(HostType), HostType,
                         UserJID, RSM, Borders,
                         Start, End, WithJID,
                         PageSize, IsSimple)
    catch _Type:Reason:S ->
        {error, {Reason, S}}
    end.
247
%% Dispatches a lookup either to the "simple" path (no offset / total count
%% calculation) or to the strategy selected by rsm_to_strategy/1.
lookup_messages2(PoolName, HostType,
                 UserJID = #jid{}, RSM, Borders,
                 Start, End, WithJID,
                 PageSize, true) ->
    %% Simple query without calculating offset and total count
    SimpleFilter = prepare_filter(UserJID, Borders, Start, End, WithJID),
    lookup_messages_simple(PoolName, HostType, UserJID, RSM, PageSize, SimpleFilter);
lookup_messages2(PoolName, HostType,
                 UserJID = #jid{}, RSM, Borders,
                 Start, End, WithJID,
                 PageSize, _IsSimple) ->
    %% Query with offset calculation.
    %% The RDBMS approach cannot be reused because "LIMIT X, Y" is not
    %% supported by Cassandra. Not all of these queries are optimal; some may
    %% be worth disabling in production once the DB access pattern is known.
    Strategy = rsm_to_strategy(RSM),
    Filter = prepare_filter(UserJID, Borders, Start, End, WithJID),
    Lookup =
        case Strategy of
            last_page  -> fun lookup_messages_last_page/6;
            by_offset  -> fun lookup_messages_by_offset/6;
            first_page -> fun lookup_messages_first_page/6;
            before_id  -> fun lookup_messages_before_id/6;
            after_id   -> fun lookup_messages_after_id/6
        end,
    Lookup(PoolName, HostType, UserJID, RSM, PageSize, Filter).
277
%% Maps an RSM request (or `undefined') to the lookup strategy atom used by
%% lookup_messages2/10. The order of the case clauses is significant.
rsm_to_strategy(undefined) ->
    first_page;
rsm_to_strategy(#rsm_in{direction = Direction, id = Id, index = Index}) ->
    case {Direction, Id, Index} of
        {before, undefined, _} -> last_page;
        {undefined, _, 0} -> first_page;
        {undefined, _, _} when is_integer(Index) -> by_offset;
        {before, _, _} when is_integer(Id) -> before_id;
        {aft, _, _} when is_integer(Id) -> after_id;
        _ -> first_page
    end.
292
%% Simple lookup: returns one page of messages with `undefined' in place of
%% the total count and the offset.
lookup_messages_simple(PoolName, HostType, UserJID, RSM, PageSize, Filter) ->
    {PageFilter, Reverse} =
        case RSM of
            #rsm_in{direction = aft, id = AfterID} ->
                {after_id(AfterID, Filter), false};
            #rsm_in{direction = before, id = BeforeID} ->
                %% Rows are read in descending id order (reverse query)
                {before_id(BeforeID, Filter), true};
            #rsm_in{direction = undefined, index = Offset} ->
                %% Apply offset - POTENTIALLY SLOW AND NOT SIMPLE :)
                StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
                {from_id(StartId, Filter), false};
            _ ->
                {Filter, false}
        end,
    Rows = extract_messages(PoolName, UserJID, HostType, PageFilter, PageSize, Reverse),
    {ok, {undefined, undefined, rows_to_uniform_format(HostType, Rows)}}.
318
%% Last page of the result set (RSM `before' without an id).
%% With a page size of 0 only the total count is computed.
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined},
                          0, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Total, []}};
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined},
                          PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, true),
    RowCount = length(Rows),
    if
        RowCount < PageSize ->
            %% Fewer rows than a page: this page is the whole result set
            {ok, {RowCount, 0, rows_to_uniform_format(HostType, Rows)}};
        true ->
            %% Offset of the page = number of rows before its first row
            FirstID = row_to_message_id(hd(Rows)),
            Offset = calc_count(PoolName, UserJID, HostType, before_id(FirstID, Filter)),
            {ok, {Offset + RowCount, Offset, rows_to_uniform_format(HostType, Rows)}}
    end.
341
%% Page selected by an RSM index (offset).
%% With a page size of 0 only the total count is computed.
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset},
                          0, Filter) when is_integer(Offset) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Offset, []}};
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset},
                          PageSize, Filter) when is_integer(Offset) ->
    %% POTENTIALLY SLOW
    StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
    Rows = extract_messages(PoolName, UserJID, HostType,
                            from_id(StartId, Filter), PageSize, false),
    RowCount = length(Rows),
    if
        RowCount < PageSize ->
            %% Short page: nothing can follow it
            {ok, {Offset + RowCount, Offset, rows_to_uniform_format(HostType, Rows)}};
        true ->
            LastID = row_to_message_id(lists:last(Rows)),
            Remaining = calc_count(PoolName, UserJID, HostType, after_id(LastID, Filter)),
            {ok, {Offset + RowCount + Remaining, Offset,
                  rows_to_uniform_format(HostType, Rows)}}
    end.
366
%% First page of the result set; the returned offset is always 0.
%% With a page size of 0 only the total count is computed.
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, 0, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, 0, []}};
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, false),
    RowCount = length(Rows),
    if
        RowCount < PageSize ->
            %% Total number of messages is less than one page
            {ok, {RowCount, 0, rows_to_uniform_format(HostType, Rows)}};
        true ->
            LastID = row_to_message_id(lists:last(Rows)),
            Remaining = calc_count(PoolName, UserJID, HostType, after_id(LastID, Filter)),
            {ok, {RowCount + Remaining, 0, rows_to_uniform_format(HostType, Rows)}}
    end.
386
%% Page ending at a given message id (RSM `before' with an id).
%% One extra row is fetched (PageSize + 1) and the result is handed to
%% mod_mam_utils:check_for_item_not_found/3.
lookup_messages_before_id(PoolName, HostType, UserJID,
                          RSM = #rsm_in{direction = before, id = ID},
                          PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    UpToID = to_id(ID, Filter),
    Rows = extract_messages(PoolName, UserJID, HostType, UpToID, PageSize + 1, true),
    mod_mam_utils:check_for_item_not_found(
      RSM, PageSize, {Total, Offset, rows_to_uniform_format(HostType, Rows)}).
396
%% Page starting at a given message id (RSM `aft' with an id).
%% One extra row is fetched (PageSize + 1) and the result is handed to
%% mod_mam_utils:check_for_item_not_found/3.
lookup_messages_after_id(PoolName, HostType, UserJID,
                         RSM = #rsm_in{direction = aft, id = ID},
                         PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    FromID = from_id(ID, Filter),
    Rows = extract_messages(PoolName, UserJID, HostType, FromID, PageSize + 1, false),
    mod_mam_utils:check_for_item_not_found(
      RSM, PageSize, {Total, Offset, rows_to_uniform_format(HostType, Rows)}).
406
407
%% Narrows the filter to ids strictly greater than ID by combining ID + 1
%% with the current start_id through mod_mam_utils:maybe_max/2.
after_id(ID, #mam_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = mod_mam_utils:maybe_max(ID + 1, CurrentStart),
    Filter#mam_ca_filter{start_id = NewStart}.
410
%% Narrows the filter to ids strictly lower than ID by combining ID - 1 with
%% the current end_id through mod_mam_utils:maybe_min/2.
%% An `undefined' ID leaves the filter unchanged.
before_id(undefined, Filter) ->
    Filter;
before_id(ID, #mam_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = mod_mam_utils:maybe_min(ID - 1, CurrentEnd),
    Filter#mam_ca_filter{end_id = NewEnd}.
415
%% Sets the filter's end_id to the combination of ID and the current end_id
%% computed by mod_mam_utils:maybe_min/2 (ID itself stays included).
to_id(ID, #mam_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = mod_mam_utils:maybe_min(ID, CurrentEnd),
    Filter#mam_ca_filter{end_id = NewEnd}.
418
%% Sets the filter's start_id to the combination of ID and the current
%% start_id computed by mod_mam_utils:maybe_max/2 (ID itself stays included).
from_id(ID, #mam_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = mod_mam_utils:maybe_max(ID, CurrentStart),
    Filter#mam_ca_filter{start_id = NewStart}.
421
422
%% Converts every fetched row with row_to_uniform_format/2, keeping the order.
rows_to_uniform_format(HostType, MessageRows) ->
    lists:map(fun(Row) -> row_to_uniform_format(HostType, Row) end, MessageRows).
425
%% Converts one fetched row into a map with keys id, jid and packet: the
%% sender is parsed with jid:from_binary/1 and the stored payload is decoded
%% with stored_binary_to_packet/2.
row_to_uniform_format(HostType, #{from_jid := SenderBin, message := Stored, id := MessID}) ->
    Sender = jid:from_binary(SenderBin),
    Decoded = stored_binary_to_packet(HostType, Stored),
    #{id => MessID, jid => Sender, packet => Decoded}.
430
%% Extracts the message id from a fetched row.
row_to_message_id(#{id := MessID}) -> MessID.
433
%% @doc `get_mam_pm_gdpr_data' hook handler: reads the rows stored for JID
%% under the empty `with_jid' key and prepends them, converted to
%% `{Id, FromJid, PacketBinary}' tuples, to the accumulator.
-spec get_mam_pm_gdpr_data(ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
                           host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_pm_gdpr_data().
get_mam_pm_gdpr_data(Acc, HostType, JID) ->
    OwnerBin = jid:to_binary(jid:to_lower(JID)),
    QueryParams = #{user_jid => OwnerBin, with_jid => <<"">>},
    Rows = fetch_user_messages(pool_name(HostType), JID, QueryParams),
    Entries = lists:map(fun(Row) -> rows_to_gdpr_mam_message(HostType, Row) end, Rows),
    Entries ++ Acc.
443
%% Converts one fetched row into a `{Id, FromJid, PacketBinary}' tuple; the
%% stored payload is decoded and then serialised with exml:to_binary/1.
rows_to_gdpr_mam_message(HostType, #{message := Stored, id := MessID, from_jid := Sender}) ->
    Packet = stored_binary_to_packet(HostType, Stored),
    {MessID, Sender, exml:to_binary(Packet)}.
446
%% @doc Fetches at most IMax message rows matching Filter.
%%
%% Offset is not supported. The selected columns are `id', `from_jid' and
%% `message'; the rest of this module consumes each row as a map with those
%% keys.
%%
%% With `ReverseLimit = false' rows are read in ascending id order. With
%% `ReverseLimit = true' the descending-order query is used, so the LIMIT
%% keeps the highest ids, and the rows are reversed before being returned.
%%
%% An IMax of 0 returns `[]' without querying; callers pass a page size of 0
%% on the simple lookup path, hence `non_neg_integer()' in the spec.
-spec extract_messages(PoolName, UserJID, HostType, Filter, IMax, ReverseLimit) ->
    [Row] when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      IMax :: non_neg_integer(),
      ReverseLimit :: boolean(),
      Row :: mongoose_cassandra:row().
extract_messages(_PoolName, _UserJID, _HostType, _Filter, 0, _ReverseLimit) ->
    [];
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, false) ->
    QueryName = {extract_messages_query, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
    Rows;
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, true) ->
    QueryName = {extract_messages_r_query, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
    lists:reverse(Rows).
472
%% Runs `fetch_user_messages_query' with the given parameter map and returns
%% the rows; crashes with badmatch on anything other than `{ok, Rows}'.
fetch_user_messages(PoolName, UserJID, FilterMap) ->
    {ok, Fetched} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                                fetch_user_messages_query, FilterMap),
    Fetched.
477
478
%% @doc Counts the rows of the result set whose id is not greater than
%% MessID (the filter's end bound is tightened with to_id/2).
%% @end
-spec calc_index(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_index(PoolName, UserJID, HostType, Filter, MessID) ->
    UpToID = to_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, UpToID).
494
%% @doc Counts the elements of the result set that precede the passed element.
%%
%% The element with the passed id may already have been deleted.
%% @end
-spec calc_before(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_before(PoolName, UserJID, HostType, Filter, MessID) ->
    BeforeID = before_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, BeforeID).
509
510
%% @doc Returns the size of the result set selected by Filter, using the
%% `calc_count_query' variant that matches the filter's id bounds.
%% Crashes with badmatch unless the read returns exactly one row with a
%% `count' key.
-spec calc_count(PoolName, UserJID, HostType, Filter) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      Count :: non_neg_integer().
calc_count(PoolName, UserJID, _HostType, Filter) ->
    Variant = select_filter(Filter),
    QueryParams = eval_filter_params(Filter),
    {ok, [#{count := Total}]} =
        mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                    {calc_count_query, Variant}, QueryParams),
    Total.
525
%% @doc Converts a zero-based offset into the id of the message at that
%% position.
%%
%% Offsets up to 100 are resolved directly with calc_offset_to_start_id/4.
%% For larger offsets a previously saved hint (an `offset -> id' pair with
%% the closest offset not greater than the requested one) is looked up first
%% to shorten the scan. The result may be `undefined' when
%% calc_offset_to_start_id/4 finds no rows.
%%
%% A hint `{PrevOffset, PrevId}' means that PrevId is the row at index
%% PrevOffset. The scan is restarted with an inclusive lower bound of PrevId,
%% so the wanted row is `Offset - PrevOffset' positions after the first
%% scanned row. (Adding one more, as the code used to do, returned the row
%% after the requested one.)
%%
%% NOTE(review): hints are looked up by user_jid/with_jid/offset only, and
%% the filter's own start_id is replaced by the hint id - confirm this is
%% intended for filters that carry id bounds.
-spec offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0, Offset =< 100 ->
    calc_offset_to_start_id(PoolName, UserJID, Filter, Offset);
offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0 ->
    Params = maps:put(offset, Offset, eval_filter_params(Filter)),
    %% Try to find an already calculated nearby offset to reduce query size
    case mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, prev_offset_query, Params) of
        {ok, []} ->
            %% No hints, calculate the offset the slow way
            StartId = calc_offset_to_start_id(PoolName, UserJID, Filter, Offset),
            maybe_save_offset_hint(PoolName, UserJID, Filter, 0, Offset, StartId);
        {ok, [#{offset := Offset, id := PrevId}]} ->
            %% Exact hint for the requested offset
            PrevId;
        {ok, [#{offset := PrevOffset, id := PrevId}]} ->
            %% Nearby hint found: scan only the rows from the hint onwards
            StartId = calc_offset_to_start_id(PoolName, UserJID,
                                              Filter#mam_ca_filter{start_id = PrevId},
                                              Offset - PrevOffset),
            maybe_save_offset_hint(PoolName, UserJID, Filter, PrevOffset, Offset, StartId)
    end.
557
%% @doc Saves an offset hint for future use in order to speed up queries with
%% a similar offset.
%% The hint is saved only if the new offset is more than 50 entries away from
%% the hint offset that was used, and never when StartId is `undefined'.
%% The given StartId is returned unchanged for the caller's convenience.
-spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), UserJID :: jid:jid(),
                             Filter :: filter(), HintOffset :: non_neg_integer(),
                             NewOffset :: non_neg_integer(),
                             StartId :: non_neg_integer() | undefined) ->
    StartId :: non_neg_integer() | undefined.
maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset, undefined) ->
    undefined;
maybe_save_offset_hint(PoolName, UserJID, Filter, HintOffset, NewOffset, StartId) ->
    Distance = abs(NewOffset - HintOffset),
    _ = Distance > 50 andalso
        begin
            #mam_ca_filter{user_jid = OwnerBin, with_jid = WithBin} = Filter,
            Hint = #{user_jid => OwnerBin, with_jid => WithBin,
                     offset => NewOffset, id => StartId},
            mongoose_cassandra:cql_write(PoolName, UserJID, ?MODULE,
                                         insert_offset_hint_query, [Hint])
        end,
    StartId.
580
%% @doc Converts an offset into a message id by reading up to Offset + 1 ids
%% with the `list_message_ids_query' variant matching Filter and taking the
%% id of the last row read.
%% Returns `undefined' when the query yields no rows.
%% NOTE(review): when fewer than Offset + 1 rows exist, the id of the last
%% existing row is returned rather than `undefined' - confirm this is intended.
-spec calc_offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
calc_offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0 ->
    Variant = select_filter(Filter),
    Limit = Offset + 1,
    QueryParams = maps:put('[limit]', Limit, eval_filter_params(Filter)),
    {ok, IdRows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                               {list_message_ids_query, Variant},
                                               QueryParams),
    case IdRows of
        [] ->
            undefined;
        [_ | _] ->
            LastRow = lists:last(IdRows),
            maps:get(id, LastRow)
    end.
599
%% @doc CQL statement reading one `offset -> message id' hint whose offset is
%% not greater than the requested offset.
prev_offset_query_cql() ->
    "SELECT id, offset FROM mam_message_offset "
    "WHERE user_jid = ? and with_jid = ? and offset <= ? LIMIT 1".
604
%% @doc CQL statement storing one `offset -> message id' hint.
insert_offset_hint_query_cql() ->
    "INSERT INTO mam_message_offset(user_jid, with_jid, id, offset) "
    "VALUES(?, ?, ?, ?)".
608
%% Builds a #mam_ca_filter{} from lookup parameters: the owner is reduced to
%% a bare JID binary, the id bounds come from
%% mod_mam_utils:calculate_msg_id_borders/3 and the partner JID is converted
%% with maybe_full_jid/1.
prepare_filter(UserJID, Borders, Start, End, WithJID) ->
    OwnerBin = mod_mam_utils:bare_jid(UserJID),
    {LowerID, UpperID} = mod_mam_utils:calculate_msg_id_borders(Borders, Start, End),
    %% with_jid is NOT an optional field: it is always passed to the queries
    WithBin = maybe_full_jid(WithJID),
    prepare_filter_params(OwnerBin, WithBin, LowerID, UpperID).
614
%% Packs already converted values into a #mam_ca_filter{} record.
prepare_filter_params(BUserJID, BWithJID, StartID, EndID) ->
    #mam_ca_filter{user_jid = BUserJID, with_jid = BWithJID,
                   start_id = StartID, end_id = EndID}.
622
%% Converts a filter into a query parameter map. `user_jid' and `with_jid'
%% are always present; `start_id' and `end_id' are included only when they
%% are not `undefined'.
eval_filter_params(#mam_ca_filter{user_jid = OwnerBin,
                                  with_jid = WithBin,
                                  start_id = LowerID,
                                  end_id = UpperID}) ->
    Mandatory = #{user_jid => OwnerBin, with_jid => WithBin},
    AddIfDefined = fun({_Key, undefined}, Acc) -> Acc;
                      ({Key, Value}, Acc) -> Acc#{Key => Value}
                   end,
    lists:foldl(AddIfDefined, Mandatory, [{start_id, LowerID}, {end_id, UpperID}]).
632
%% Picks the query variant name for a filter from its id bounds.
select_filter(#mam_ca_filter{} = Filter) ->
    select_filter(Filter#mam_ca_filter.start_id, Filter#mam_ca_filter.end_id).
638
639
%% Names the query variant according to which id bounds are defined:
%% `all' (none), `end' (only EndID), `start' (only StartID), `start_end' (both).
-spec select_filter(StartID, EndID) ->
    all | 'end' | start | start_end when
      StartID :: integer() | undefined,
      EndID :: integer() | undefined.
select_filter(StartID, EndID) ->
    case {StartID =:= undefined, EndID =:= undefined} of
        {true, true} -> all;
        {true, false} -> 'end';
        {false, true} -> start;
        {false, false} -> start_end
    end.
652
%% Builds the optional id-bound part of a WHERE clause: a lower-bound
%% condition when StartID is defined followed by an upper-bound condition
%% when EndID is defined.
prepare_filter_cql(StartID, EndID) ->
    LowerClause = case StartID of
                      undefined -> "";
                      _ -> " AND id >= :start_id"
                  end,
    UpperClause = case EndID of
                      undefined -> "";
                      _ -> " AND id <= :end_id"
                  end,
    LowerClause ++ UpperClause.
662
%% Lists `{VariantName, WhereFragment}' for the four combinations of
%% defined / undefined id bounds. The value 0 only stands for "defined".
filter_to_cql() ->
    BoundCombinations = [{undefined, undefined},
                         {undefined, 0},
                         {0, undefined},
                         {0, 0}],
    lists:map(fun({StartID, EndID}) ->
                      {select_filter(StartID, EndID), prepare_filter_cql(StartID, EndID)}
              end,
              BoundCombinations).
666
%% Computes the offset of the requested page inside the whole result set.
-spec calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      PageSize :: non_neg_integer(),
      TotalCount :: non_neg_integer(),
      RSM :: jlib:rsm_in() | undefined,
      Offset :: non_neg_integer().
%% Requesting the last page in a result set
calc_offset(_PoolName, _UserJID, _HostType, _Filter, PageSize, TotalCount,
            #rsm_in{direction = before, id = undefined}) ->
    erlang:max(0, TotalCount - PageSize);
%% Page ending before a known id
calc_offset(PoolName, UserJID, HostType, Filter, PageSize, _TotalCount,
            #rsm_in{direction = before, id = ID}) when is_integer(ID) ->
    CountBefore = calc_before(PoolName, UserJID, HostType, Filter, ID),
    erlang:max(0, CountBefore - PageSize);
%% Page starting after a known id
calc_offset(PoolName, UserJID, HostType, Filter, _PageSize, _TotalCount,
            #rsm_in{direction = aft, id = ID}) when is_integer(ID) ->
    calc_index(PoolName, UserJID, HostType, Filter, ID);
%% Anything else starts at the beginning
calc_offset(_PoolName, _UserJID, _HostType, _Filter, _PageSize, _TotalCount, _RSM) ->
    0.
688
%% Converts an optional JID into the binary stored in the `with_jid' column.
%% `undefined' maps to the empty binary, not to `undefined'; any other value
%% is lower-cased and serialised with jid:to_binary/1. The spec's return
%% type is narrowed accordingly.
-spec maybe_full_jid(undefined | jid:jid()) -> binary().
maybe_full_jid(undefined) ->
    <<>>;
maybe_full_jid(JID) ->
    jid:to_binary(jid:to_lower(JID)).
693
694 %%====================================================================
695 %% Internal CQL part
696 %%====================================================================
697
%% One `{extract_messages_query, Variant}' statement per filter variant.
extract_messages_queries() ->
    lists:map(fun({Variant, Where}) ->
                      {{extract_messages_query, Variant}, extract_messages_cql(Where)}
              end,
              filter_to_cql()).
701
%% One `{extract_messages_r_query, Variant}' statement per filter variant.
extract_messages_r_queries() ->
    lists:map(fun({Variant, Where}) ->
                      {{extract_messages_r_query, Variant}, extract_messages_r_cql(Where)}
              end,
              filter_to_cql()).
705
%% One `{calc_count_query, Variant}' statement per filter variant.
calc_count_queries() ->
    lists:map(fun({Variant, Where}) ->
                      {{calc_count_query, Variant}, calc_count_cql(Where)}
              end,
              filter_to_cql()).
709
%% One `{list_message_ids_query, Variant}' statement per filter variant.
list_message_ids_queries() ->
    lists:map(fun({Variant, Where}) ->
                      {{list_message_ids_query, Variant}, list_message_ids_cql(Where)}
              end,
              filter_to_cql()).
713
%% CQL selecting messages in ascending id order; Filter is the optional
%% id-bound fragment inserted into the WHERE clause.
extract_messages_cql(Filter) ->
    Select = "SELECT id, from_jid, message FROM mam_message ",
    Where = "WHERE user_jid = ? AND with_jid = ? ",
    Select ++ Where ++ Filter ++ " ORDER BY id LIMIT ?".
718
%% CQL selecting messages in descending id order; Filter is the optional
%% id-bound fragment inserted into the WHERE clause.
extract_messages_r_cql(Filter) ->
    Select = "SELECT id, from_jid, message FROM mam_message ",
    Where = "WHERE user_jid = ? AND with_jid = ? ",
    Select ++ Where ++ Filter ++ " ORDER BY id DESC LIMIT ?".
723
%% CQL counting the messages of one (user_jid, with_jid) pair; Filter is the
%% optional id-bound fragment appended to the WHERE clause.
calc_count_cql(Filter) ->
    Base = "SELECT COUNT(*) FROM mam_message "
           "WHERE user_jid = ? AND with_jid = ? ",
    Base ++ Filter.
727
%% CQL listing message ids in ascending order; Filter is the optional
%% id-bound fragment inserted into the WHERE clause.
list_message_ids_cql(Filter) ->
    Base = "SELECT id FROM mam_message "
           "WHERE user_jid = ? AND with_jid = ? ",
    Base ++ Filter ++ " ORDER BY id LIMIT ?".
732
%% CQL selecting all messages of one (user_jid, with_jid) pair ordered by id.
fetch_user_messages_cql() ->
    Select = "SELECT id, from_jid, message FROM mam_message ",
    Select ++ "WHERE user_jid = ? AND with_jid = ? " ++ "ORDER BY id".
737
738
739
740 %% ----------------------------------------------------------------------
741 %% Optimizations
742
%% Encodes a packet for storage using the `encode/1' function of the module
%% configured in the `db_message_format' option.
packet_to_stored_binary(HostType, Packet) ->
    Format = db_message_format(HostType),
    Format:encode(Packet).
747
%% Decodes a stored payload using the `decode/1' function of the module
%% configured in the `db_message_format' option.
stored_binary_to_packet(HostType, Bin) ->
    Format = db_message_format(HostType),
    Format:decode(Bin).
752
753 %% ----------------------------------------------------------------------
754 %% Params getters
755
%% Module option `db_message_format'; defaults to `mam_message_xml'.
-spec db_message_format(HostType :: host_type()) -> module().
db_message_format(HostType) ->
    Default = mam_message_xml,
    gen_mod:get_module_opt(HostType, ?MODULE, db_message_format, Default).
759
%% Module option `pool_name'; defaults to `default'.
-spec pool_name(HostType :: host_type()) -> term().
pool_name(HostType) ->
    Default = default,
    gen_mod:get_module_opt(HostType, ?MODULE, pool_name, Default).
Line Hits Source