./ct_report/coverage/mod_mam_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc Cassandra backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10 -behaviour(gen_mod).
11 -behaviour(mongoose_module_metrics).
12
13 %% ----------------------------------------------------------------------
14 %% Exports
15
16 %% gen_mod handlers
17 -export([start/2, stop/1]).
18
19 %% MAM hook handlers
20 -export([archive_size/4,
21 archive_message/3,
22 lookup_messages/3,
23 remove_archive/4]).
24
25 %% mongoose_cassandra callbacks
26 -export([prepared_queries/0]).
27
28 %gdpr
29 -export([get_mam_pm_gdpr_data/3]).
30
31 -ignore_xref([
32 behaviour_info/1, remove_archive/4
33 ]).
34
35 %% ----------------------------------------------------------------------
36 %% Imports
37
38 %% Other
39 -import(mod_mam_utils,
40 [maybe_min/2,
41 maybe_max/2,
42 bare_jid/1,
43 full_jid/1
44 ]).
45
46 -include("jlib.hrl").
47 -include("mongoose_rsm.hrl").
48
49 -callback encode(binary()) -> binary().
50 -callback decode(binary()) -> binary().
51
52 -record(mam_ca_filter, {
53 user_jid,
54 with_jid,
55 remote_jid,
56 start_id,
57 end_id
58 }).
59
60 -record(mam_message, {
61 id :: non_neg_integer(),
62 user_jid :: binary(),
63 remote_jid :: binary() | undefined,
64 from_jid :: binary() | undefined,
65 with_jid = <<>> :: binary(),
66 message :: binary() | undefined
67 }).
68
69 %% ----------------------------------------------------------------------
70 %% Types
71
72 -type filter() :: #mam_ca_filter{}.
73 -type message_id() :: non_neg_integer().
74 -type host_type() :: mongooseim:host_type().
75
76 %% ----------------------------------------------------------------------
77 %% gen_mod callbacks
78 %% Starting and stopping functions for users' archives
79
%% @doc gen_mod callback: registers the hook handlers listed by hooks/1
%% for the given host type. The module options are not used here.
-spec start(host_type(), gen_mod:module_opts()) -> ok.
start(HostType, _Opts) ->
    HookList = hooks(HostType),
    ejabberd_hooks:add(HookList).
83
%% @doc gen_mod callback: unregisters the hook handlers listed by hooks/1
%% for the given host type.
-spec stop(host_type()) -> ok.
stop(HostType) ->
    HookList = hooks(HostType),
    ejabberd_hooks:delete(HookList).
87
88 %% ----------------------------------------------------------------------
89 %% Add hooks for mod_mam_pm
90
%% @doc Builds the hook registrations for HostType. Every entry points at a
%% function of this module and is registered with priority 50.
-spec hooks(host_type()) -> [ejabberd_hooks:hook()].
hooks(HostType) ->
    Handlers = [{mam_archive_message, archive_message},
                {mam_archive_size, archive_size},
                {mam_lookup_messages, lookup_messages},
                {mam_remove_archive, remove_archive},
                {get_mam_pm_gdpr_data, get_mam_pm_gdpr_data}],
    [{HookName, HostType, ?MODULE, Function, 50} || {HookName, Function} <- Handlers].
98
99 %% ----------------------------------------------------------------------
100 %% mongoose_cassandra_worker callbacks
101
%% @doc mongoose_cassandra callback: returns every `{QueryName, CqlString}'
%% pair used by this module. The fixed queries come first, followed by the
%% per-filter families generated from filter_to_cql/0.
prepared_queries() ->
    FixedQueries =
        [{insert_offset_hint_query, insert_offset_hint_query_cql()},
         {prev_offset_query, prev_offset_query_cql()},
         {insert_query, insert_query_cql()},
         {fetch_user_messages_query, fetch_user_messages_cql()},
         {select_for_removal_query, select_for_removal_query_cql()},
         {remove_archive_query, remove_archive_query_cql()},
         {remove_archive_offsets_query, remove_archive_offsets_query_cql()}],
    GeneratedQueries =
        [extract_messages_queries(),
         extract_messages_r_queries(),
         calc_count_queries(),
         list_message_ids_queries()],
    FixedQueries ++ lists:append(GeneratedQueries).
116
117 %% ----------------------------------------------------------------------
118 %% Internal functions and callbacks
119
%% @doc `mam_archive_size' hook handler: counts all messages stored for
%% UserJID. The incoming Size accumulator is only checked to be an integer;
%% the returned value is the freshly computed count.
archive_size(Size, HostType, _UserID, UserJID) when is_integer(Size) ->
    %% No borders, no time range and no peer restriction.
    Filter = prepare_filter(UserJID, undefined, undefined, undefined, undefined),
    PoolName = pool_name(HostType),
    calc_count(PoolName, UserJID, HostType, Filter).
124
125
126 %% ----------------------------------------------------------------------
127 %% INSERT MESSAGE
128
%% @doc CQL text of the prepared statement that stores one archive row.
insert_query_cql() ->
    "INSERT INTO mam_message"
    " (id, user_jid, from_jid, remote_jid, with_jid, message)"
    " VALUES (?, ?, ?, ?, ?, ?)".
133
%% @doc `mam_archive_message' hook handler. Delegates to archive_message2/2
%% and converts any exception into `{error, Reason}' so the hook chain is
%% not interrupted by a crash. The previous hook result is ignored.
archive_message(_Result, HostType, Params) ->
    try archive_message2(Params, HostType) of
        WriteResult -> WriteResult
    catch
        _Class:Reason -> {error, Reason}
    end.
140
%% Builds the rows for a single archived packet and hands them to
%% write_messages/3. One row is produced per distinct `with_jid' value:
%% the empty binary, the full remote JID and the bare remote JID
%% (duplicates are removed by lists:usort/1).
archive_message2(#{message_id := MessID,
                   local_jid := LocJID,
                   remote_jid := RemJID,
                   source_jid := SrcJID,
                   packet := Packet}, HostType) ->
    OwnerBin = bare_jid(LocJID),
    PeerBareBin = bare_jid(RemJID),
    PeerFullBin = full_jid(RemJID),
    SenderBin = full_jid(SrcJID),
    StoredPacket = packet_to_stored_binary(HostType, Packet),
    Template = #mam_message{id = MessID,
                            user_jid = OwnerBin,
                            from_jid = SenderBin,
                            remote_jid = PeerFullBin,
                            message = StoredPacket},
    WithJIDs = lists:usort([<<>>, PeerFullBin, PeerBareBin]),
    Rows = [Template#mam_message{with_jid = WithJID} || WithJID <- WithJIDs],
    write_messages(HostType, LocJID, Rows).
161
%% Converts the message records to parameter maps and submits them through
%% mongoose_cassandra:cql_write_async/5 using the `insert_query' statement.
write_messages(HostType, UserJID, Messages) ->
    ParamRows = lists:map(fun message_to_params/1, Messages),
    PoolName = pool_name(HostType),
    mongoose_cassandra:cql_write_async(PoolName, UserJID, ?MODULE, insert_query, ParamRows).
166
%% Turns a #mam_message{} record into the parameter map expected by the
%% `insert_query' statement (one key per column).
message_to_params(#mam_message{} = Msg) ->
    #{id => Msg#mam_message.id,
      user_jid => Msg#mam_message.user_jid,
      from_jid => Msg#mam_message.from_jid,
      remote_jid => Msg#mam_message.remote_jid,
      with_jid => Msg#mam_message.with_jid,
      message => Msg#mam_message.message}.
177
178 %% ----------------------------------------------------------------------
179 %% REMOVE ARCHIVE
180
%% @doc CQL text that deletes all archived messages of one
%% (user_jid, with_jid) pair.
remove_archive_query_cql() ->
    "DELETE FROM mam_message"
    " WHERE user_jid = ? AND with_jid = ?".
183
%% @doc CQL text that deletes the stored offset hints of one
%% (user_jid, with_jid) pair.
%%
%% The hints are written to `mam_message_offset' (see
%% insert_offset_hint_query_cql/0), so that is the table that has to be
%% cleaned here. Deleting from `mam_message' again would merely repeat
%% `remove_archive_query' and leave stale hints behind.
remove_archive_offsets_query_cql() ->
    "DELETE FROM mam_message_offset WHERE user_jid = ? AND with_jid = ?".
186
%% @doc CQL text that lists the distinct (user_jid, with_jid) pairs stored
%% for one user; the result drives the archive removal.
select_for_removal_query_cql() ->
    "SELECT DISTINCT user_jid, with_jid"
    " FROM mam_message WHERE user_jid = ?".
189
%% @doc `mam_remove_archive' hook handler: removes the archive of UserJID
%% and returns the hook accumulator untouched.
remove_archive(Acc, HostType, _UserID, UserJID) ->
    %% The result of the removal is intentionally not propagated to the hook.
    _ = remove_archive(HostType, UserJID),
    Acc.
193
%% Removes every row belonging to UserJID. The (user_jid, with_jid) pairs are
%% read with `select_for_removal_query' and, for each batch of rows, both the
%% `remove_archive_query' and `remove_archive_offsets_query' statements are
%% executed with the blocking mongoose_cassandra:cql_write/5.
remove_archive(HostType, UserJID) ->
    PoolName = pool_name(HostType),
    Selector = #{user_jid => bare_jid(UserJID)},
    RunDelete =
        fun(QueryName, Rows) ->
            mongoose_cassandra:cql_write(PoolName, UserJID, ?MODULE, QueryName, Rows)
        end,
    %% Wait until deleted: both writes are synchronous.
    PurgeBatch =
        fun(Rows, _AccIn) ->
            RunDelete(remove_archive_query, Rows),
            RunDelete(remove_archive_offsets_query, Rows)
        end,
    mongoose_cassandra:cql_foldl(PoolName, UserJID, ?MODULE,
                                 select_for_removal_query, Selector, PurgeBatch, []).
209 %% ----------------------------------------------------------------------
210 %% SELECT MESSAGES
211
%% @doc `mam_lookup_messages' hook handler.
%%
%% - An `{error, _}' result from a previous handler is passed through.
%% - Full text search (a binary `search_text') is answered with
%%   `{error, 'not-supported'}'.
%% - Otherwise the lookup is delegated to lookup_messages2/10. Any exception
%%   raised there is turned into `{error, {Reason, Stacktrace}}'.
-spec lookup_messages(Result :: any(), HostType :: host_type(), Params :: map()) ->
    {ok, mod_mam:lookup_result()} | {error, term()}.
lookup_messages({error, _Reason} = Result, _HostType, _Params) ->
    Result;
lookup_messages(_Result, _HostType, #{search_text := <<_/binary>>}) ->
    {error, 'not-supported'};
lookup_messages(_Result, HostType,
                #{owner_jid := UserJID, rsm := RSM, borders := Borders,
                  start_ts := Start, end_ts := End, with_jid := WithJID,
                  search_text := undefined, page_size := PageSize,
                  is_simple := IsSimple}) ->
    try
        lookup_messages2(pool_name(HostType), HostType,
                         UserJID, RSM, Borders,
                         Start, End, WithJID,
                         PageSize, IsSimple)
    catch _Type:Reason:S ->
        {error, {Reason, S}}
    end.
231
%% Dispatches a lookup either to the "simple" path (no offset and no total
%% count) or to one of the paging strategies chosen by rsm_to_strategy/1.
%%
%% We cannot just reuse the RDBMS approach because "LIMIT X, Y" is not
%% supported by Cassandra. Not all of these queries are optimal; you may
%% want to disable some of them in production once the access pattern of
%% the database is known.
lookup_messages2(PoolName, HostType,
                 UserJID = #jid{}, RSM, Borders,
                 Start, End, WithJID,
                 PageSize, true) ->
    %% Simple query without calculating offset and total count
    Filter = prepare_filter(UserJID, Borders, Start, End, WithJID),
    lookup_messages_simple(PoolName, HostType, UserJID, RSM, PageSize, Filter);
lookup_messages2(PoolName, HostType,
                 UserJID = #jid{}, RSM, Borders,
                 Start, End, WithJID,
                 PageSize, _IsSimple) ->
    %% Query with offset calculation
    Lookup =
        case rsm_to_strategy(RSM) of
            last_page -> fun lookup_messages_last_page/6;
            by_offset -> fun lookup_messages_by_offset/6;
            first_page -> fun lookup_messages_first_page/6;
            before_id -> fun lookup_messages_before_id/6;
            after_id -> fun lookup_messages_after_id/6
        end,
    Filter = prepare_filter(UserJID, Borders, Start, End, WithJID),
    Lookup(PoolName, HostType, UserJID, RSM, PageSize, Filter).
261
%% Maps an RSM request (or `undefined') to the paging strategy atom used by
%% lookup_messages2/10. Anything that is not an explicit "last page",
%% "by offset", "before id" or "after id" request is a first page request.
rsm_to_strategy(undefined) ->
    first_page;
rsm_to_strategy(#rsm_in{direction = Direction, id = Id, index = Index}) ->
    rsm_to_strategy(Direction, Id, Index).

%% Clause order matters: the first matching combination wins.
rsm_to_strategy(before, undefined, _Index) ->
    last_page;
rsm_to_strategy(undefined, _Id, 0) ->
    first_page;
rsm_to_strategy(undefined, _Id, Index) when is_integer(Index) ->
    by_offset;
rsm_to_strategy(before, Id, _Index) when is_integer(Id) ->
    before_id;
rsm_to_strategy(aft, Id, _Index) when is_integer(Id) ->
    after_id;
rsm_to_strategy(_Direction, _Id, _Index) ->
    first_page.
276
%% "Simple" lookup: returns one page of messages without computing the total
%% count or the offset (both are reported as `undefined').
%%
%% The page is selected from the RSM request:
%%  - `aft' with an integer id: rows strictly after that id;
%%  - `before': rows strictly before the id, fetched in reverse order
%%    (before_id/2 accepts an `undefined' id, meaning "last page");
%%  - an integer index: rows starting at that offset;
%%  - anything else: the first page.
%%
%% The integer guards keep requests without an id or index (for example an
%% RSM element that only limits the page size) from reaching `undefined + 1'
%% or offset_to_start_id/4 with `undefined'. Such requests fall through to
%% the first page clause, matching what rsm_to_strategy/1 does.
lookup_messages_simple(PoolName, HostType, UserJID,
                       #rsm_in{direction = aft, id = ID},
                       PageSize, Filter) when is_integer(ID) ->
    simple_page(PoolName, HostType, UserJID, after_id(ID, Filter), PageSize, false);
lookup_messages_simple(PoolName, HostType, UserJID,
                       #rsm_in{direction = before, id = ID},
                       PageSize, Filter) ->
    simple_page(PoolName, HostType, UserJID, before_id(ID, Filter), PageSize, true);
lookup_messages_simple(PoolName, HostType, UserJID,
                       #rsm_in{direction = undefined, index = Offset},
                       PageSize, Filter) when is_integer(Offset) ->
    %% Apply offset: POTENTIALLY SLOW AND NOT SIMPLE :)
    StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
    simple_page(PoolName, HostType, UserJID, from_id(StartId, Filter), PageSize, false);
lookup_messages_simple(PoolName, HostType, UserJID, _RSM, PageSize, Filter) ->
    simple_page(PoolName, HostType, UserJID, Filter, PageSize, false).

%% Fetches one page with the given filter and wraps it in the result shape
%% of the simple lookup.
simple_page(PoolName, HostType, UserJID, PageFilter, PageSize, Reverse) ->
    MessageRows = extract_messages(PoolName, UserJID, HostType, PageFilter, PageSize, Reverse),
    {ok, {undefined, undefined, rows_to_uniform_format(HostType, MessageRows)}}.
302
%% Last page lookup (`before' without an id). Returns
%% `{ok, {TotalCount, Offset, Messages}}'.
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined},
                          0, Filter) ->
    %% Empty page requested: only the count is needed.
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Total, []}};
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined},
                          PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, true),
    Fetched = length(Rows),
    Offset =
        if
            Fetched < PageSize ->
                %% Fewer rows than a full page: nothing can precede them.
                0;
            true ->
                %% Count the rows that precede the first row of the page.
                FirstID = row_to_message_id(hd(Rows)),
                calc_count(PoolName, UserJID, HostType, before_id(FirstID, Filter))
        end,
    {ok, {Offset + Fetched, Offset, rows_to_uniform_format(HostType, Rows)}}.
325
%% Lookup by zero-based index. Returns `{ok, {TotalCount, Offset, Messages}}'.
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset},
                          0, Filter) when is_integer(Offset) ->
    %% Empty page requested: only the count is needed.
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Offset, []}};
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset},
                          PageSize, Filter) when is_integer(Offset) ->
    %% POTENTIALLY SLOW: the offset has to be translated into a message id.
    StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
    PageFilter = from_id(StartId, Filter),
    Rows = extract_messages(PoolName, UserJID, HostType, PageFilter, PageSize, false),
    Fetched = length(Rows),
    Remaining =
        if
            Fetched < PageSize ->
                %% Short page: there is nothing after it.
                0;
            true ->
                LastID = row_to_message_id(lists:last(Rows)),
                calc_count(PoolName, UserJID, HostType, after_id(LastID, Filter))
        end,
    {ok, {Offset + Fetched + Remaining, Offset, rows_to_uniform_format(HostType, Rows)}}.
350
%% First page lookup. Returns `{ok, {TotalCount, 0, Messages}}'.
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, 0, Filter) ->
    %% Empty page requested: just count.
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, 0, []}};
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, false),
    Fetched = length(Rows),
    Remaining =
        if
            Fetched < PageSize ->
                %% Total number of messages is less than one page.
                0;
            true ->
                LastID = row_to_message_id(lists:last(Rows)),
                calc_count(PoolName, UserJID, HostType, after_id(LastID, Filter))
        end,
    {ok, {Fetched + Remaining, 0, rows_to_uniform_format(HostType, Rows)}}.
370
%% Lookup of the page preceding message ID. One extra row is fetched
%% (PageSize + 1, with ID itself included in the range) and the result is
%% passed to mod_mam_utils:check_for_item_not_found/3.
lookup_messages_before_id(PoolName, HostType, UserJID,
                          RSM = #rsm_in{direction = before, id = ID},
                          PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    PageFilter = to_id(ID, Filter),
    Rows = extract_messages(PoolName, UserJID, HostType, PageFilter, PageSize + 1, true),
    Page = {Total, Offset, rows_to_uniform_format(HostType, Rows)},
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, Page).
380
%% Lookup of the page following message ID. One extra row is fetched
%% (PageSize + 1, with ID itself included in the range) and the result is
%% passed to mod_mam_utils:check_for_item_not_found/3.
lookup_messages_after_id(PoolName, HostType, UserJID,
                         RSM = #rsm_in{direction = aft, id = ID},
                         PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    PageFilter = from_id(ID, Filter),
    Rows = extract_messages(PoolName, UserJID, HostType, PageFilter, PageSize + 1, false),
    Page = {Total, Offset, rows_to_uniform_format(HostType, Rows)},
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, Page).
390
391
%% Restricts the filter to ids strictly greater than ID, combined with the
%% filter's existing lower bound through maybe_max/2.
after_id(ID, #mam_ca_filter{} = Filter) ->
    LowerBound = maybe_max(ID + 1, Filter#mam_ca_filter.start_id),
    Filter#mam_ca_filter{start_id = LowerBound}.
394
%% Restricts the filter to ids strictly smaller than ID, combined with the
%% filter's existing upper bound through maybe_min/2. An `undefined' ID
%% leaves the filter unchanged.
before_id(undefined, Filter) ->
    Filter;
before_id(ID, #mam_ca_filter{} = Filter) ->
    UpperBound = maybe_min(ID - 1, Filter#mam_ca_filter.end_id),
    Filter#mam_ca_filter{end_id = UpperBound}.
399
%% Restricts the filter to ids up to and including ID, combined with the
%% filter's existing upper bound through maybe_min/2.
to_id(ID, #mam_ca_filter{} = Filter) ->
    UpperBound = maybe_min(ID, Filter#mam_ca_filter.end_id),
    Filter#mam_ca_filter{end_id = UpperBound}.
402
%% Restricts the filter to ids starting at and including ID, combined with
%% the filter's existing lower bound through maybe_max/2.
from_id(ID, #mam_ca_filter{} = Filter) ->
    LowerBound = maybe_max(ID, Filter#mam_ca_filter.start_id),
    Filter#mam_ca_filter{start_id = LowerBound}.
405
406
%% Converts every fetched row with row_to_uniform_format/2, keeping order.
rows_to_uniform_format(HostType, MessageRows) ->
    lists:map(fun(Row) -> row_to_uniform_format(HostType, Row) end, MessageRows).
409
%% Converts one fetched row into the `#{id, jid, packet}' map returned to
%% the caller: the sender JID is parsed from its binary form and the stored
%% message is decoded back into a packet.
row_to_uniform_format(HostType, #{from_jid := SenderBin, message := Stored, id := MsgID}) ->
    Sender = jid:from_binary(SenderBin),
    Decoded = stored_binary_to_packet(HostType, Stored),
    #{id => MsgID, jid => Sender, packet => Decoded}.
414
%% Extracts the message id from a fetched row (a map with an `id' key).
row_to_message_id(#{id := Id}) ->
    Id.
417
%% @doc `get_mam_pm_gdpr_data' hook handler: fetches every message stored
%% for JID under the empty `with_jid' and prepends the resulting
%% `{Id, FromJid, PacketBinary}' tuples to the accumulator.
-spec get_mam_pm_gdpr_data(ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
                           host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_pm_gdpr_data().
get_mam_pm_gdpr_data(Acc, HostType, JID) ->
    OwnerBin = jid:to_binary(jid:to_lower(JID)),
    Selector = #{user_jid => OwnerBin, with_jid => <<"">>},
    Rows = fetch_user_messages(pool_name(HostType), JID, Selector),
    Entries = lists:map(fun(Row) -> rows_to_gdpr_mam_message(HostType, Row) end, Rows),
    Entries ++ Acc.
427
%% Converts one fetched row into a GDPR export tuple. The stored message is
%% decoded and then serialised with exml:to_binary/1.
rows_to_gdpr_mam_message(HostType, #{message := Stored, id := Id, from_jid := FromJid}) ->
    Packet = stored_binary_to_packet(HostType, Stored),
    {Id, FromJid, exml:to_binary(Packet)}.
430
%% @doc Fetch at most IMax rows matching Filter, ordered by ascending id.
%%
%% Offset is not supported. Each returned row is a map containing at least
%% the `id', `from_jid' and `message' columns.
%%
%% With `ReverseLimit = false' the first IMax rows of the range are
%% returned. With `ReverseLimit = true' the last IMax rows are returned:
%% they are read in descending order and reversed afterwards.
%% A limit of 0 returns `[]' without querying the database.
-spec extract_messages(PoolName, UserJID, HostType, Filter, IMax, ReverseLimit) ->
    [Row] when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      IMax :: non_neg_integer(),
      ReverseLimit :: boolean(),
      Row :: mongoose_cassandra:row().
extract_messages(_PoolName, _UserJID, _HostType, _Filter, 0, _ReverseLimit) ->
    [];
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, false) ->
    read_messages(PoolName, UserJID, extract_messages_query, Filter, IMax);
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, true) ->
    lists:reverse(read_messages(PoolName, UserJID, extract_messages_r_query, Filter, IMax)).

%% Runs one of the per-filter "extract" query families with the given limit.
read_messages(PoolName, UserJID, QueryFamily, Filter, IMax) ->
    QueryName = {QueryFamily, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
    Rows.
456
%% Reads all rows selected by FilterMap with the `fetch_user_messages_query'
%% statement. Crashes with badmatch unless the read returns `{ok, Rows}'.
fetch_user_messages(PoolName, UserJID, FilterMap) ->
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                             fetch_user_messages_query, FilterMap),
    Rows.
461
462
%% @doc Count the rows of the result set whose id is not greater than MessID.
%%
%% The count is obtained by capping the filter's upper bound at MessID
%% (inclusive) and running the count query.
%% @end
-spec calc_index(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_index(PoolName, UserJID, HostType, Filter, MessID) ->
    UpToMessage = to_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, UpToMessage).
478
%% @doc Count the rows of the result set that precede the passed element.
%%
%% The element with the passed id may already have been deleted; only rows
%% with a strictly smaller id are counted.
%% @end
-spec calc_before(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_before(PoolName, UserJID, HostType, Filter, MessID) ->
    BeforeMessage = before_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, BeforeMessage).
493
494
%% @doc Get the size of the result set selected by Filter.
%%
%% Runs the `calc_count_query' variant matching the borders present in the
%% filter. Crashes with badmatch unless exactly one row with a `count'
%% column is returned.
-spec calc_count(PoolName, UserJID, HostType, Filter) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      Count :: non_neg_integer().
calc_count(PoolName, UserJID, _HostType, Filter) ->
    FilterName = select_filter(Filter),
    Bindings = eval_filter_params(Filter),
    {ok, [#{count := Total}]} =
        mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                    {calc_count_query, FilterName}, Bindings),
    Total.
509
%% @doc Convert a zero-based offset into the id of the message at that offset.
%%
%% Offsets up to 100 are resolved directly by listing ids. For larger
%% offsets a previously stored offset hint is looked up first, so that only
%% the rows between the hint and the requested offset have to be listed.
%% A newly computed id may be stored as a hint for later queries (see
%% maybe_save_offset_hint/6).
%% Returns `undefined' when no rows match.
-spec offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
offset_to_start_id(PoolName, UserJID, Filter, Offset) when is_integer(Offset), Offset >= 0,
                                                           Offset =< 100 ->
    calc_offset_to_start_id(PoolName, UserJID, Filter, Offset);
offset_to_start_id(PoolName, UserJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
    Params = maps:put(offset, Offset, eval_filter_params(Filter)),
    %% Try to find an already calculated nearby offset to reduce the query size
    case mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, prev_offset_query, Params) of
        {ok, []} ->
            %% No hints, just calculate the offset the slow way
            StartId = calc_offset_to_start_id(PoolName, UserJID, Filter, Offset),
            maybe_save_offset_hint(PoolName, UserJID, Filter, 0, Offset, StartId);
        {ok, [#{offset := Offset, id := PrevId}]} ->
            %% The hint is for exactly the requested offset
            PrevId;
        {ok, [#{offset := PrevOffset, id := PrevId}]} ->
            %% Offset hint found, use it to reduce the query size.
            %% The hinted row is the first row (relative offset 0) of the
            %% range starting at PrevId, because the lower bound is
            %% inclusive. The wanted row therefore sits at relative offset
            %% `Offset - PrevOffset'.
            HintedFilter = Filter#mam_ca_filter{start_id = PrevId},
            StartId = calc_offset_to_start_id(PoolName, UserJID, HintedFilter,
                                              Offset - PrevOffset),
            maybe_save_offset_hint(PoolName, UserJID, Filter, PrevOffset, Offset, StartId)
    end.
541
%% @doc Saves an offset hint for future use, to speed up queries with a
%% similar offset.
%% The hint is saved only if the new offset is more than 50 entries away
%% from the previous hint. Nothing is saved for an `undefined' StartId.
%% This function returns the given StartId as a passthrough for convenience.
-spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), UserJID :: jid:jid(),
                             Filter :: filter(), HintOffset :: non_neg_integer(),
                             NewOffset :: non_neg_integer(),
                             StartId :: non_neg_integer() | undefined) ->
    StartId :: non_neg_integer() | undefined.
maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset,
                       undefined = StartId) ->
    StartId;
maybe_save_offset_hint(PoolName, UserJID, Filter, HintOffset, NewOffset, StartId) ->
    Distance = abs(NewOffset - HintOffset),
    if
        Distance > 50 ->
            #mam_ca_filter{user_jid = OwnerBin, with_jid = WithBin} = Filter,
            Hint = #{user_jid => OwnerBin, with_jid => WithBin,
                     offset => NewOffset, id => StartId},
            mongoose_cassandra:cql_write(PoolName, UserJID, ?MODULE,
                                         insert_offset_hint_query, [Hint]);
        true ->
            skip
    end,
    StartId.
564
%% @doc Convert an offset into a message id by listing ids.
%%
%% Lists at most `Offset + 1' ids matching Filter in ascending order and
%% returns the id of the last listed row. Returns `undefined' when no rows
%% match at all.
-spec calc_offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
calc_offset_to_start_id(PoolName, UserJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
    FilterName = select_filter(Filter),
    Bindings = maps:put('[limit]', Offset + 1, eval_filter_params(Filter)),
    {ok, IdRows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                               {list_message_ids_query, FilterName}, Bindings),
    case IdRows of
        [] ->
            undefined;
        [_ | _] ->
            LastRow = lists:last(IdRows),
            maps:get(id, LastRow)
    end.
583
%% @doc CQL text that fetches one stored offset hint whose offset does not
%% exceed the requested offset.
prev_offset_query_cql() ->
    "SELECT id, offset FROM mam_message_offset"
    " WHERE user_jid = ? and with_jid = ?"
    " and offset <= ? LIMIT 1".
588
%% @doc CQL text that stores an "offset -> message id" hint.
insert_offset_hint_query_cql() ->
    "INSERT INTO mam_message_offset"
    "(user_jid, with_jid, id, offset)"
    " VALUES(?, ?, ?, ?)".
592
%% Builds the #mam_ca_filter{} used by the read queries: the bare owner JID,
%% the peer JID as a binary (never `undefined', as `with_jid' is NOT an
%% optional field) and the id borders computed by
%% mod_mam_utils:calculate_msg_id_borders/3.
prepare_filter(UserJID, Borders, Start, End, WithJID) ->
    OwnerBin = bare_jid(UserJID),
    {LowerId, UpperId} = mod_mam_utils:calculate_msg_id_borders(Borders, Start, End),
    WithBin = maybe_full_jid(WithJID),
    prepare_filter_params(OwnerBin, WithBin, LowerId, UpperId).
598
%% Packs the already converted values into a #mam_ca_filter{} record;
%% `remote_jid' keeps its default.
prepare_filter_params(BUserJID, BWithJID, StartID, EndID) ->
    #mam_ca_filter{user_jid = BUserJID, with_jid = BWithJID,
                   start_id = StartID, end_id = EndID}.
606
%% Converts a filter into the parameter map of a query: `user_jid' and
%% `with_jid' are always present, `start_id' and `end_id' only when they
%% are not `undefined'.
eval_filter_params(#mam_ca_filter{user_jid = OwnerBin,
                                  with_jid = WithBin,
                                  start_id = LowerId,
                                  end_id = UpperId}) ->
    Mandatory = #{user_jid => OwnerBin, with_jid => WithBin},
    AddIfDefined =
        fun({_Key, undefined}, Acc) -> Acc;
           ({Key, Value}, Acc) -> Acc#{Key => Value}
        end,
    lists:foldl(AddIfDefined, Mandatory, [{start_id, LowerId}, {end_id, UpperId}]).
616
%% Names the query variant that matches the borders set in the filter.
select_filter(#mam_ca_filter{} = Filter) ->
    select_filter(Filter#mam_ca_filter.start_id, Filter#mam_ca_filter.end_id).
622
623
%% Names the query variant for a pair of borders: `all' when neither is set,
%% `end' / `start' when only one is set, `start_end' when both are set.
-spec select_filter(StartID, EndID) ->
    all | 'end' | start | start_end when
      StartID :: integer() | undefined,
      EndID :: integer() | undefined.
select_filter(StartID, EndID) ->
    case {StartID, EndID} of
        {undefined, undefined} -> all;
        {undefined, _} -> 'end';
        {_, undefined} -> start;
        {_, _} -> start_end
    end.
636
%% Builds the optional id range part of a WHERE clause. Each border adds
%% its own inclusive condition unless it is `undefined'.
prepare_filter_cql(StartID, EndID) ->
    LowerPart =
        case StartID of
            undefined -> "";
            _ -> " AND id >= :start_id"
        end,
    UpperPart =
        case EndID of
            undefined -> "";
            _ -> " AND id <= :end_id"
        end,
    LowerPart ++ UpperPart.
646
%% Enumerates the four border combinations as `{FilterName, CqlFragment}'
%% pairs. `0' merely stands for "some border is set".
filter_to_cql() ->
    BorderStates = [undefined, 0],
    [{select_filter(Lower, Upper), prepare_filter_cql(Lower, Upper)}
     || Lower <- BorderStates, Upper <- BorderStates].
650
%% @doc Compute the offset of the requested page within the result set.
%%
%%  - last page (`before' without id): total count minus the page size;
%%  - `before' an integer id: rows preceding the id minus the page size;
%%  - `aft' an integer id: number of rows up to and including the id;
%%  - anything else: 0.
%% Negative results are clamped to 0.
-spec calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      PageSize :: non_neg_integer(),
      TotalCount :: non_neg_integer(),
      RSM :: jlib:rsm_in() | undefined,
      Offset :: non_neg_integer().
calc_offset(_PoolName, _UserJID, _HostType, _Filter, PageSize, TotalCount,
            #rsm_in{direction = before, id = undefined}) ->
    %% Requesting the last page in a result set
    erlang:max(0, TotalCount - PageSize);
calc_offset(PoolName, UserJID, HostType, Filter, PageSize, _TotalCount,
            #rsm_in{direction = before, id = ID})
  when is_integer(ID) ->
    Preceding = calc_before(PoolName, UserJID, HostType, Filter, ID),
    erlang:max(0, Preceding - PageSize);
calc_offset(PoolName, UserJID, HostType, Filter, _PageSize, _TotalCount,
            #rsm_in{direction = aft, id = ID})
  when is_integer(ID) ->
    calc_index(PoolName, UserJID, HostType, Filter, ID);
calc_offset(_PoolName, _UserJID, _HostType, _Filter, _PageSize, _TotalCount, _RSM) ->
    0.
672
%% Converts an optional JID into the binary stored in the `with_jid' column.
%% `undefined' becomes the empty binary, so the function never returns
%% `undefined'; any other JID is lowercased and serialised.
-spec maybe_full_jid(undefined | jid:jid()) -> binary().
maybe_full_jid(undefined) -> <<>>;
maybe_full_jid(JID) ->
    jid:to_binary(jid:to_lower(JID)).
677
678 %%====================================================================
679 %% Internal CQL part
680 %%====================================================================
681
%% One ascending "extract messages" prepared query per border combination.
extract_messages_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{extract_messages_query, FilterName}, extract_messages_cql(FilterCql)}
              end, filter_to_cql()).
685
%% One descending "extract messages" prepared query per border combination.
extract_messages_r_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{extract_messages_r_query, FilterName}, extract_messages_r_cql(FilterCql)}
              end, filter_to_cql()).
689
%% One counting prepared query per border combination.
calc_count_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{calc_count_query, FilterName}, calc_count_cql(FilterCql)}
              end, filter_to_cql()).
693
%% One "list message ids" prepared query per border combination.
list_message_ids_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{list_message_ids_query, FilterName}, list_message_ids_cql(FilterCql)}
              end, filter_to_cql()).
697
%% CQL text selecting messages in ascending id order; Filter is the optional
%% id range fragment inserted into the WHERE clause.
extract_messages_cql(Filter) ->
    Select = "SELECT id, from_jid, message FROM mam_message ",
    Where = "WHERE user_jid = ? AND with_jid = ? ",
    Select ++ Where ++ Filter ++ " ORDER BY id LIMIT ?".
702
%% CQL text selecting messages in descending id order; Filter is the
%% optional id range fragment inserted into the WHERE clause.
extract_messages_r_cql(Filter) ->
    Select = "SELECT id, from_jid, message FROM mam_message ",
    Where = "WHERE user_jid = ? AND with_jid = ? ",
    Select ++ Where ++ Filter ++ " ORDER BY id DESC LIMIT ?".
707
%% CQL text counting the messages of one (user_jid, with_jid) pair; Filter
%% is the optional id range fragment appended to the WHERE clause.
calc_count_cql(Filter) ->
    Select = "SELECT COUNT(*) FROM mam_message ",
    Where = "WHERE user_jid = ? AND with_jid = ? ",
    Select ++ Where ++ Filter.
711
%% CQL text listing message ids in ascending order; Filter is the optional
%% id range fragment inserted into the WHERE clause.
list_message_ids_cql(Filter) ->
    Select = "SELECT id FROM mam_message ",
    Where = "WHERE user_jid = ? AND with_jid = ? ",
    Select ++ Where ++ Filter ++ " ORDER BY id LIMIT ?".
716
%% CQL text selecting every message of one (user_jid, with_jid) pair in
%% ascending id order, without a limit.
fetch_user_messages_cql() ->
    "SELECT id, from_jid, message"
    " FROM mam_message"
    " WHERE user_jid = ? AND with_jid = ?"
    " ORDER BY id".
721
722
723
724 %% ----------------------------------------------------------------------
725 %% Optimizations
726
%% Encodes a packet for storage using the module configured as
%% `db_message_format' for this host type (it provides the encode/1
%% callback declared by this module).
packet_to_stored_binary(HostType, Packet) ->
    Codec = db_message_format(HostType),
    Codec:encode(Packet).
731
%% Decodes a stored binary back into a packet using the module configured
%% as `db_message_format' for this host type (it provides the decode/1
%% callback declared by this module).
stored_binary_to_packet(HostType, Bin) ->
    Codec = db_message_format(HostType),
    Codec:decode(Bin).
736
737 %% ----------------------------------------------------------------------
738 %% Params getters
739
%% Reads the `db_message_format' option of this module for the host type.
-spec db_message_format(HostType :: host_type()) -> module().
db_message_format(HostType) ->
    Codec = gen_mod:get_module_opt(HostType, ?MODULE, db_message_format),
    Codec.
743
%% Name of the Cassandra pool to use. Every host type currently shares the
%% `default' pool.
-spec pool_name(HostType :: host_type()) -> term().
pool_name(_HostType) -> default.
Line Hits Source