./ct_report/coverage/mod_mam_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc Cassandra backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10 -behaviour(gen_mod).
11 -behaviour(mongoose_module_metrics).
12
13 %% ----------------------------------------------------------------------
14 %% Exports
15
16 %% gen_mod handlers
17 -export([start/2, stop/1, hooks/1]).
18
19 %% MAM hook handlers
20 -export([archive_size/3,
21 archive_message/3,
22 lookup_messages/3,
23 remove_archive/3]).
24
25 %% mongoose_cassandra callbacks
26 -export([prepared_queries/0]).
27
28 %gdpr
29 -export([get_mam_pm_gdpr_data/3]).
30
31 %% ----------------------------------------------------------------------
32 %% Imports
33
34 %% Other
35 -import(mod_mam_utils,
36 [maybe_min/2,
37 maybe_max/2,
38 bare_jid/1,
39 full_jid/1
40 ]).
41
42 -include("jlib.hrl").
43 -include("mongoose_rsm.hrl").
44
45 -callback encode(binary()) -> binary().
46 -callback decode(binary()) -> binary().
47
48 -record(mam_ca_filter, {
49 user_jid,
50 with_jid,
51 remote_jid,
52 start_id,
53 end_id
54 }).
55
56 -record(mam_message, {
57 id :: non_neg_integer(),
58 user_jid :: binary(),
59 remote_jid :: binary() | undefined,
60 from_jid :: binary() | undefined,
61 with_jid = <<>> :: binary(),
62 message :: binary() | undefined
63 }).
64
65 %% ----------------------------------------------------------------------
66 %% Types
67
68 -type filter() :: #mam_ca_filter{}.
69 -type message_id() :: non_neg_integer().
70 -type host_type() :: mongooseim:host_type().
71
72 %% ----------------------------------------------------------------------
73 %% gen_mod callbacks
74 %% Starting and stopping functions for users' archives
75
%% @doc gen_mod start callback. This backend keeps no per-host state,
%% so there is nothing to set up.
-spec start(host_type(), gen_mod:module_opts()) -> ok.
start(_HostType, _Opts) -> ok.

%% @doc gen_mod stop callback. Nothing to tear down.
-spec stop(host_type()) -> ok.
stop(_HostType) -> ok.
83
84 %% ----------------------------------------------------------------------
85 %% Add hooks for mod_mam_pm
86
%% @doc Hook handlers registered for the given host type.
%% Every handler is installed with empty extra data and priority 50.
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
    Handlers = [{mam_archive_message, fun ?MODULE:archive_message/3},
                {mam_archive_size, fun ?MODULE:archive_size/3},
                {mam_lookup_messages, fun ?MODULE:lookup_messages/3},
                {mam_remove_archive, fun ?MODULE:remove_archive/3},
                {get_mam_pm_gdpr_data, fun ?MODULE:get_mam_pm_gdpr_data/3}],
    [{HookName, HostType, Handler, #{}, 50} || {HookName, Handler} <- Handlers].
94
95 %% ----------------------------------------------------------------------
96 %% mongoose_cassandra callbacks
97
%% @doc All CQL statements this module asks mongoose_cassandra to prepare,
%% as `{QueryName, CQL}' pairs: the fixed statements first, followed by the
%% per-filter variants of the extract / reverse-extract / count / id-list queries.
prepared_queries() ->
    Fixed = [{insert_offset_hint_query, insert_offset_hint_query_cql()},
             {prev_offset_query, prev_offset_query_cql()},
             {insert_query, insert_query_cql()},
             {fetch_user_messages_query, fetch_user_messages_cql()},
             {select_for_removal_query, select_for_removal_query_cql()},
             {remove_archive_query, remove_archive_query_cql()},
             {remove_archive_offsets_query, remove_archive_offsets_query_cql()}],
    lists:append([Fixed,
                  extract_messages_queries(),
                  extract_messages_r_queries(),
                  calc_count_queries(),
                  list_message_ids_queries()]).
112
113 %% ----------------------------------------------------------------------
114 %% Internal functions and callbacks
115
%% @doc `mam_archive_size' hook handler: replaces the accumulator with the
%% number of rows in the owner's archive (no id bounds, no peer filter).
-spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: integer(),
      Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
      Extra :: gen_hook:extra().
archive_size(Size, #{owner := UserJID}, #{host_type := HostType}) when is_integer(Size) ->
    %% Borders, Start, End, WithJID and MsgID are all left undefined.
    WholeArchive = prepare_filter(UserJID, undefined, undefined, undefined,
                                  undefined, undefined),
    Count = calc_count(pool_name(HostType), UserJID, HostType, WholeArchive),
    {ok, Count}.
124
125
126 %% ----------------------------------------------------------------------
127 %% INSERT MESSAGE
128
%% @doc CQL inserting one row into mam_message; all six columns are bound.
insert_query_cql() ->
    "INSERT INTO mam_message " ++
        "(id, user_jid, from_jid, remote_jid, with_jid, message) " ++
        "VALUES (?, ?, ?, ?, ?, ?)".
133
%% @doc `mam_archive_message' hook handler.
%% Any exception raised while archiving is converted into `{error, Reason}'
%% and returned as the new accumulator instead of propagating.
-spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok,
      Params :: mod_mam:archive_message_params(),
      Extra :: gen_hook:extra().
archive_message(_Result, Params, #{host_type := HostType}) ->
    try archive_message2(Params, HostType) of
        Written -> {ok, Written}
    catch
        _Class:Reason -> {ok, {error, Reason}}
    end.
144
%% Builds the rows for one archived stanza and hands them to write_messages/3.
%% The same message is stored once per distinct with_jid value out of
%% <<>>, the remote full JID and the remote bare JID, so it can later be
%% selected with any of those as the with_jid key.
archive_message2(#{message_id := MessID,
                   local_jid := LocJID,
                   remote_jid := RemJID,
                   source_jid := SrcJID,
                   packet := Packet}, HostType) ->
    Owner = bare_jid(LocJID),
    RemoteBare = bare_jid(RemJID),
    RemoteFull = full_jid(RemJID),
    Source = full_jid(SrcJID),
    Stored = packet_to_stored_binary(HostType, Packet),
    Template = #mam_message{id = MessID,
                            user_jid = Owner,
                            from_jid = Source,
                            remote_jid = RemoteFull,
                            message = Stored},
    %% usort/1 drops duplicates, e.g. when the remote JID has no resource.
    Keys = lists:usort([<<>>, RemoteFull, RemoteBare]),
    Rows = lists:map(fun(Key) -> Template#mam_message{with_jid = Key} end, Keys),
    write_messages(HostType, LocJID, Rows).
165
%% Converts the message records to parameter maps and submits them
%% through mongoose_cassandra:cql_write_async/5 using the insert_query statement.
write_messages(HostType, UserJID, Messages) ->
    Rows = lists:map(fun message_to_params/1, Messages),
    mongoose_cassandra:cql_write_async(pool_name(HostType), UserJID,
                                       ?MODULE, insert_query, Rows).
170
%% Turns a #mam_message{} record into the parameter map for insert_query;
%% map keys mirror the record field names.
message_to_params(#mam_message{} = Msg) ->
    #{id => Msg#mam_message.id,
      user_jid => Msg#mam_message.user_jid,
      from_jid => Msg#mam_message.from_jid,
      remote_jid => Msg#mam_message.remote_jid,
      with_jid => Msg#mam_message.with_jid,
      message => Msg#mam_message.message}.
181
182 %% ----------------------------------------------------------------------
183 %% REMOVE ARCHIVE
184
%% @doc CQL deleting all mam_message rows of one (user_jid, with_jid) pair.
remove_archive_query_cql() ->
    "DELETE FROM mam_message WHERE user_jid = ? AND with_jid = ?".
187
%% @doc CQL deleting the offset hints of one (user_jid, with_jid) pair.
%% Hints live in mam_message_offset (see insert_offset_hint_query_cql/0 and
%% prev_offset_query_cql/0), so that is the table that must be purged here;
%% mam_message itself is already handled by remove_archive_query_cql/0.
remove_archive_offsets_query_cql() ->
    "DELETE FROM mam_message_offset WHERE user_jid = ? AND with_jid = ?".
190
%% @doc CQL listing the distinct (user_jid, with_jid) pairs stored for a user;
%% the result rows are fed to the archive removal statements.
select_for_removal_query_cql() ->
    "SELECT DISTINCT user_jid, with_jid FROM mam_message WHERE user_jid = ?".
193
%% @doc `mam_remove_archive' hook handler: wipes the owner's archive and
%% passes the accumulator through unchanged.
-spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: term(),
      Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
      Extra :: gen_hook:extra().
remove_archive(Acc, #{owner := Owner}, #{host_type := HostType}) ->
    %% The result of the removal itself is deliberately not inspected here.
    remove_archive(HostType, Owner),
    {ok, Acc}.
201
%% Folds over the user's distinct (user_jid, with_jid) pairs and, for every
%% batch of rows, deletes the messages and then the offset hints.
remove_archive(HostType, UserJID) ->
    Pool = pool_name(HostType),
    Selector = #{user_jid => bare_jid(UserJID)},
    %% cql_write/5 (not the async variant) is used: wait until deleted.
    Purge = fun(Rows, _AccIn) ->
                    mongoose_cassandra:cql_write(Pool, UserJID, ?MODULE,
                                                 remove_archive_query, Rows),
                    mongoose_cassandra:cql_write(Pool, UserJID, ?MODULE,
                                                 remove_archive_offsets_query, Rows)
            end,
    mongoose_cassandra:cql_foldl(Pool, UserJID, ?MODULE,
                                 select_for_removal_query, Selector, Purge, []).
217 %% ----------------------------------------------------------------------
218 %% SELECT MESSAGES
219
%% @doc `mam_lookup_messages' hook handler.
%% An error accumulator is passed through untouched. Full-text search
%% (`search_text' being a binary) is rejected with 'not-supported'.
%% Otherwise the lookup runs, and any exception is turned into
%% `{error, {Reason, Stacktrace}}'.
-spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: {ok, mod_mam:lookup_result()} | {error, term()},
      Params :: mam_iq:lookup_params(),
      Extra :: gen_hook:extra().
lookup_messages({error, _Reason} = Failed, _Params, _Extra) ->
    {ok, Failed};
lookup_messages(_Result, #{search_text := <<_/binary>>}, _Extra) ->
    {ok, {error, 'not-supported'}};
lookup_messages(_Result,
                #{owner_jid := UserJID, rsm := RSM, borders := Borders,
                  start_ts := Start, end_ts := End, with_jid := WithJID,
                  search_text := undefined, page_size := PageSize,
                  is_simple := IsSimple, message_id := MsgID},
                #{host_type := HostType}) ->
    try lookup_messages2(pool_name(HostType), HostType, UserJID, RSM, Borders,
                         Start, End, WithJID, PageSize, MsgID, IsSimple) of
        Found -> {ok, Found}
    catch
        _Class:Reason:Stacktrace -> {ok, {error, {Reason, Stacktrace}}}
    end.
242
%% Dispatches a lookup either to the "simple" path (no offset / total count)
%% or, based on the RSM request, to one of the paging strategies.
lookup_messages2(PoolName, HostType, #jid{} = UserJID, RSM, Borders,
                 Start, End, WithJID, PageSize, MsgID, true) ->
    %% Simple query without calculating offset and total count
    Filter = prepare_filter(UserJID, Borders, Start, End, WithJID, MsgID),
    lookup_messages_simple(PoolName, HostType, UserJID, RSM, PageSize, Filter);
lookup_messages2(PoolName, HostType, #jid{} = UserJID, RSM, Borders,
                 Start, End, WithJID, PageSize, MsgID, _IsSimple) ->
    %% Query with offset calculation.
    %% We cannot just reuse the RDBMS code because "LIMIT X, Y" is not
    %% supported by Cassandra. Not all of these queries are optimal; you may
    %% want to disable some of them in production once you know how the
    %% database is going to be queried.
    Strategy = rsm_to_strategy(RSM),
    Filter = prepare_filter(UserJID, Borders, Start, End, WithJID, MsgID),
    case Strategy of
        first_page ->
            lookup_messages_first_page(PoolName, HostType, UserJID, RSM, PageSize, Filter);
        last_page ->
            lookup_messages_last_page(PoolName, HostType, UserJID, RSM, PageSize, Filter);
        by_offset ->
            lookup_messages_by_offset(PoolName, HostType, UserJID, RSM, PageSize, Filter);
        before_id ->
            lookup_messages_before_id(PoolName, HostType, UserJID, RSM, PageSize, Filter);
        after_id ->
            lookup_messages_after_id(PoolName, HostType, UserJID, RSM, PageSize, Filter)
    end.
272
%% Maps an RSM request (or `undefined') to the paging strategy atom used by
%% lookup_messages2/11. Branch order matters: earlier branches win.
rsm_to_strategy(undefined) ->
    first_page;
rsm_to_strategy(#rsm_in{direction = Direction, id = Id, index = Index}) ->
    case {Direction, Id, Index} of
        {before, undefined, _} -> last_page;
        {undefined, _, 0} -> first_page;
        {undefined, _, _} when is_integer(Index) -> by_offset;
        {before, _, _} when is_integer(Id) -> before_id;
        {aft, _, _} when is_integer(Id) -> after_id;
        _ -> first_page
    end.
287
%% Lookup without total count and offset: both are reported as `undefined'.
%%
%% The offset clause only applies when the RSM index is an integer. Without
%% that guard an #rsm_in{} carrying neither direction nor index would be
%% routed to offset_to_start_id/4, which accepts non-negative integers only,
%% and the whole lookup would fail instead of returning the plain first page
%% (the behaviour rsm_to_strategy/1 picks for the same request).
lookup_messages_simple(PoolName, HostType, UserJID,
                       #rsm_in{direction = aft, id = ID},
                       PageSize, Filter) ->
    %% Rows strictly after ID, oldest first
    Rows = extract_messages(PoolName, UserJID, HostType,
                            after_id(ID, Filter), PageSize, false),
    {ok, {undefined, undefined, rows_to_uniform_format(HostType, Rows)}};
lookup_messages_simple(PoolName, HostType, UserJID,
                       #rsm_in{direction = before, id = ID},
                       PageSize, Filter) ->
    %% Last rows strictly before ID (or of the whole set when ID is undefined)
    Rows = extract_messages(PoolName, UserJID, HostType,
                            before_id(ID, Filter), PageSize, true),
    {ok, {undefined, undefined, rows_to_uniform_format(HostType, Rows)}};
lookup_messages_simple(PoolName, HostType, UserJID,
                       #rsm_in{direction = undefined, index = Offset},
                       PageSize, Filter) when is_integer(Offset) ->
    %% Apply offset -- POTENTIALLY SLOW AND NOT SIMPLE :)
    StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
    Rows = extract_messages(PoolName, UserJID, HostType,
                            from_id(StartId, Filter), PageSize, false),
    {ok, {undefined, undefined, rows_to_uniform_format(HostType, Rows)}};
lookup_messages_simple(PoolName, HostType, UserJID, _RSM, PageSize, Filter) ->
    %% No usable paging information: first page of the filtered set
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, false),
    {ok, {undefined, undefined, rows_to_uniform_format(HostType, Rows)}}.
313
%% Last page of the result set ("before" with no id).
%% Returns {ok, {TotalCount, Offset, Messages}}.
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined},
                          0, Filter) ->
    %% Zero-sized page: only the count is needed
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Total, []}};
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined},
                          PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, true),
    case length(Rows) of
        Fetched when Fetched < PageSize ->
            %% Fewer rows than a page: this is the whole result set
            {ok, {Fetched, 0, rows_to_uniform_format(HostType, Rows)}};
        Fetched ->
            %% Everything before the first returned row makes up the offset
            OldestID = row_to_message_id(hd(Rows)),
            Offset = calc_count(PoolName, UserJID, HostType,
                                before_id(OldestID, Filter)),
            {ok, {Offset + Fetched, Offset, rows_to_uniform_format(HostType, Rows)}}
    end.
336
%% Page addressed by a zero-based RSM index.
%% Returns {ok, {TotalCount, Offset, Messages}}.
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset},
                          0, Filter) when is_integer(Offset) ->
    %% Zero-sized page: only the count is needed
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Offset, []}};
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset},
                          PageSize, Filter) when is_integer(Offset) ->
    %% POTENTIALLY SLOW: the offset has to be translated into a message id
    StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
    Rows = extract_messages(PoolName, UserJID, HostType,
                            from_id(StartId, Filter), PageSize, false),
    case length(Rows) of
        Fetched when Fetched < PageSize ->
            %% Short page: nothing can follow it
            {ok, {Offset + Fetched, Offset, rows_to_uniform_format(HostType, Rows)}};
        Fetched ->
            NewestID = row_to_message_id(lists:last(Rows)),
            Remaining = calc_count(PoolName, UserJID, HostType,
                                   after_id(NewestID, Filter)),
            {ok, {Offset + Fetched + Remaining, Offset,
                  rows_to_uniform_format(HostType, Rows)}}
    end.
361
%% First page of the result set; the offset is always 0.
%% Returns {ok, {TotalCount, 0, Messages}}.
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, 0, Filter) ->
    %% Zero-sized page: just count
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, 0, []}};
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, false),
    case length(Rows) of
        Fetched when Fetched < PageSize ->
            %% Total number of messages is less than one page
            {ok, {Fetched, 0, rows_to_uniform_format(HostType, Rows)}};
        Fetched ->
            NewestID = row_to_message_id(lists:last(Rows)),
            Remaining = calc_count(PoolName, UserJID, HostType,
                                   after_id(NewestID, Filter)),
            {ok, {Fetched + Remaining, 0, rows_to_uniform_format(HostType, Rows)}}
    end.
381
%% Page preceding message ID. One extra row (PageSize + 1, upper bound ID
%% inclusive) is fetched and the result is handed to
%% mod_mam_utils:check_for_item_not_found/3 together with the RSM request.
lookup_messages_before_id(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = ID} = RSM,
                          PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    Rows = extract_messages(PoolName, UserJID, HostType,
                            to_id(ID, Filter), PageSize + 1, true),
    Found = {Total, Offset, rows_to_uniform_format(HostType, Rows)},
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, Found).
391
%% Page following message ID. One extra row (PageSize + 1, lower bound ID
%% inclusive) is fetched and the result is handed to
%% mod_mam_utils:check_for_item_not_found/3 together with the RSM request.
lookup_messages_after_id(PoolName, HostType, UserJID,
                         #rsm_in{direction = aft, id = ID} = RSM,
                         PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    Rows = extract_messages(PoolName, UserJID, HostType,
                            from_id(ID, Filter), PageSize + 1, false),
    Found = {Total, Offset, rows_to_uniform_format(HostType, Rows)},
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, Found).
401
402
%% Helpers narrowing the id range of a filter. The new bound is combined
%% with the existing one through mod_mam_utils maybe_max/2 or maybe_min/2.

%% Lower bound strictly after ID.
after_id(ID, #mam_ca_filter{start_id = Lower} = Filter) ->
    Filter#mam_ca_filter{start_id = maybe_max(ID + 1, Lower)}.

%% Upper bound strictly before ID; an undefined ID leaves the filter as is.
before_id(undefined, Filter) ->
    Filter;
before_id(ID, #mam_ca_filter{end_id = Upper} = Filter) ->
    Filter#mam_ca_filter{end_id = maybe_min(ID - 1, Upper)}.

%% Upper bound including ID.
to_id(ID, #mam_ca_filter{end_id = Upper} = Filter) ->
    Filter#mam_ca_filter{end_id = maybe_min(ID, Upper)}.

%% Lower bound including ID.
from_id(ID, #mam_ca_filter{start_id = Lower} = Filter) ->
    Filter#mam_ca_filter{start_id = maybe_max(ID, Lower)}.
416
417
%% Converts result rows (maps) into the `#{id, jid, packet}' maps returned
%% by the lookup functions.
rows_to_uniform_format(HostType, MessageRows) ->
    lists:map(fun(Row) -> row_to_uniform_format(HostType, Row) end, MessageRows).

%% Parses the sender JID and decodes the stored packet of a single row.
row_to_uniform_format(HostType, #{from_jid := FromJID, message := Stored, id := MsgID}) ->
    Sender = jid:from_binary(FromJID),
    Decoded = stored_binary_to_packet(HostType, Stored),
    #{id => MsgID, jid => Sender, packet => Decoded}.

%% Message id of a result row.
row_to_message_id(#{id := MsgID}) -> MsgID.
428
%% @doc `get_mam_pm_gdpr_data' hook handler: prepends the user's archived
%% messages (rows stored under the empty with_jid) to the accumulator.
-spec get_mam_pm_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
      Params :: #{jid := jid:jid()},
      Extra :: gen_hook:extra().
get_mam_pm_gdpr_data(Acc, #{jid := JID}, #{host_type := HostType}) ->
    Owner = jid:to_binary(jid:to_lower(JID)),
    Selector = #{user_jid => Owner, with_jid => <<"">>},
    Rows = fetch_user_messages(pool_name(HostType), JID, Selector),
    Exported = lists:map(fun(Row) -> rows_to_gdpr_mam_message(HostType, Row) end, Rows),
    {ok, Exported ++ Acc}.
439
%% Converts one result row into an `{Id, FromJid, SerializedPacket}' triple;
%% the stored packet is decoded and then serialized with exml:to_binary/1.
rows_to_gdpr_mam_message(HostType, #{message := Stored, id := Id, from_jid := FromJid}) ->
    Packet = stored_binary_to_packet(HostType, Stored),
    {Id, FromJid, exml:to_binary(Packet)}.
442
%% @doc Fetch at most `IMax' rows matching `Filter', oldest first.
%%
%% Offset is not supported; callers narrow the id range of the filter instead.
%% Each returned row is a map with the selected columns `id', `from_jid'
%% and `message'.
%%
%% With `ReverseLimit = true' the limit is applied from the newest end of
%% the range (the query orders by id DESC) and the rows are reversed
%% afterwards, so the result is in ascending id order in both modes.
%% `IMax = 0' returns `[]' without querying the database.
-spec extract_messages(PoolName, UserJID, HostType, Filter, IMax, ReverseLimit) ->
    [Row] when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      IMax :: non_neg_integer(),
      ReverseLimit :: boolean(),
      Row :: mongoose_cassandra:row().
extract_messages(_PoolName, _UserJID, _HostType, _Filter, 0, _ReverseLimit) ->
    [];
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, false) ->
    QueryName = {extract_messages_query, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
    Rows;
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, true) ->
    QueryName = {extract_messages_r_query, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
    lists:reverse(Rows).
468
%% Runs fetch_user_messages_query with the given parameter map and returns
%% the rows; anything but `{ok, Rows}' from the read crashes with badmatch.
fetch_user_messages(PoolName, UserJID, FilterMap) ->
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                             fetch_user_messages_query, FilterMap),
    Rows.
473
474
%% @doc Number of rows in the filtered result set whose id is less than or
%% equal to `MessID'.
%%
%% The count is taken over the id range, so the row with `MessID' itself
%% does not have to exist.
%% @end
-spec calc_index(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_index(PoolName, UserJID, HostType, Filter, MessID) ->
    UpTo = to_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, UpTo).
490
%% @doc Number of rows in the filtered result set that come strictly before
%% the row with `MessID'.
%%
%% The row with the passed id may already have been deleted.
%% @end
-spec calc_before(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_before(PoolName, UserJID, HostType, Filter, MessID) ->
    Earlier = before_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, Earlier).
505
506
%% @doc Size of the result set selected by `Filter'.
%% Runs the `SELECT COUNT(*)' variant matching the bounds present in the
%% filter and expects exactly one row with a `count' column.
-spec calc_count(PoolName, UserJID, HostType, Filter) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      Count :: non_neg_integer().
calc_count(PoolName, UserJID, _HostType, Filter) ->
    Variant = select_filter(Filter),
    Params = eval_filter_params(Filter),
    {ok, [#{count := Count}]} =
        mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                    {calc_count_query, Variant}, Params),
    Count.
521
%% @doc Convert a zero-based offset into the id of the row at that offset.
%% Returns `undefined' when calc_offset_to_start_id/4 finds no rows.
%%
%% Offsets up to 100 are resolved directly. For larger offsets a previously
%% stored hint (offset -> message id, see maybe_save_offset_hint/6) is looked
%% up first, so that only the rows between the hint and the requested offset
%% have to be listed.
-spec offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0, Offset =< 100 ->
    calc_offset_to_start_id(PoolName, UserJID, Filter, Offset);
offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0 ->
    Params = maps:put(offset, Offset, eval_filter_params(Filter)),
    %% Try to find an already calculated nearby offset to reduce query size
    case mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, prev_offset_query, Params) of
        {ok, []} ->
            %% No hints, just calculate the offset the slow way
            StartId = calc_offset_to_start_id(PoolName, UserJID, Filter, Offset),
            maybe_save_offset_hint(PoolName, UserJID, Filter, 0, Offset, StartId);
        {ok, [#{offset := Offset, id := PrevId}]} ->
            %% The hint is for exactly this offset
            PrevId;
        {ok, [#{offset := PrevOffset, id := PrevId}]} ->
            %% The hint says the row at PrevOffset has id PrevId. The scan
            %% below starts at PrevId inclusive, so PrevId is row 0 of the
            %% scan and the wanted row is Offset - PrevOffset rows further.
            StartId = calc_offset_to_start_id(PoolName, UserJID,
                                              Filter#mam_ca_filter{start_id = PrevId},
                                              Offset - PrevOffset),
            maybe_save_offset_hint(PoolName, UserJID, Filter, PrevOffset, Offset, StartId)
    end.
553
%% @doc Saves an offset hint for future use, to speed up later queries with
%% a similar offset.
%% The hint is written only when the new offset is more than 50 entries away
%% from the hint that was used. Nothing is written for an undefined StartId.
%% The given StartId is always returned, as a passthrough for convenience.
-spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), UserJID :: jid:jid(),
                             Filter :: filter(), HintOffset :: non_neg_integer(),
                             NewOffset :: non_neg_integer(),
                             StartId :: non_neg_integer() | undefined) ->
    StartId :: non_neg_integer() | undefined.
maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset, undefined) ->
    undefined;
maybe_save_offset_hint(PoolName, UserJID, Filter, HintOffset, NewOffset, StartId) ->
    Distance = abs(NewOffset - HintOffset),
    if
        Distance > 50 ->
            Hint = #{user_jid => Filter#mam_ca_filter.user_jid,
                     with_jid => Filter#mam_ca_filter.with_jid,
                     offset => NewOffset,
                     id => StartId},
            mongoose_cassandra:cql_write(PoolName, UserJID, ?MODULE,
                                         insert_offset_hint_query, [Hint]);
        true ->
            skip
    end,
    StartId.
576
%% @doc Convert a zero-based offset into a message id by listing the first
%% `Offset + 1' ids of the filtered set and taking the last one returned.
%% Returns `undefined' when no rows match at all. When fewer than
%% `Offset + 1' rows exist, the id of the last existing row is returned.
-spec calc_offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
calc_offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0 ->
    Variant = select_filter(Filter),
    Params = maps:put('[limit]', Offset + 1, eval_filter_params(Filter)),
    {ok, IdRows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                               {list_message_ids_query, Variant}, Params),
    case IdRows of
        [] ->
            undefined;
        [_ | _] ->
            maps:get(id, lists:last(IdRows))
    end.
595
%% @doc CQL reading one stored offset -> message id hint whose offset does
%% not exceed the requested one.
prev_offset_query_cql() ->
    "SELECT id, offset FROM mam_message_offset WHERE user_jid = ? and with_jid = ? " ++
        "and offset <= ? LIMIT 1".
600
%% @doc CQL storing an offset -> message id hint in mam_message_offset.
insert_offset_hint_query_cql() ->
    "INSERT INTO mam_message_offset(user_jid, with_jid, id, offset) VALUES(?, ?, ?, ?)".
604
%% Builds the #mam_ca_filter{} for a lookup.
%%
%% In Cassandra, a column cannot be restricted by both an equality and an
%% inequality relation. When MsgID is defined it is therefore used as both
%% the start and the end id, which means the `ids' filter effectively
%% overrides any "before" or "after" bounds.
prepare_filter(UserJID, Borders, Start, End, WithJID, MsgID) ->
    Owner = bare_jid(UserJID),
    {StartID, EndID} =
        case MsgID =:= undefined of
            true -> mod_mam_utils:calculate_msg_id_borders(Borders, Start, End);
            false -> {MsgID, MsgID}
        end,
    %% with_jid is NOT an optional field: undefined becomes <<>>
    Peer = maybe_full_jid(WithJID),
    prepare_filter_params(Owner, Peer, StartID, EndID).
618
%% Packs the already encoded filter values into a #mam_ca_filter{} record;
%% remote_jid keeps its default (undefined).
prepare_filter_params(BUserJID, BWithJID, StartID, EndID) ->
    #mam_ca_filter{user_jid = BUserJID, with_jid = BWithJID,
                   start_id = StartID, end_id = EndID}.
626
%% Query parameter map for a filter: user_jid and with_jid are always
%% present, start_id / end_id only when they are not undefined.
eval_filter_params(#mam_ca_filter{user_jid = BUserJID, with_jid = BWithJID,
                                  start_id = StartID, end_id = EndID}) ->
    Mandatory = #{user_jid => BUserJID, with_jid => BWithJID},
    AddBound = fun({_Key, undefined}, Acc) -> Acc;
                  ({Key, Value}, Acc) -> Acc#{Key => Value}
               end,
    lists:foldl(AddBound, Mandatory, [{start_id, StartID}, {end_id, EndID}]).
636
%% Name of the query variant matching the id bounds set in the filter.
select_filter(#mam_ca_filter{} = Filter) ->
    select_filter(Filter#mam_ca_filter.start_id, Filter#mam_ca_filter.end_id).
642
643
%% Names the query variant by which of the two id bounds are defined.
-spec select_filter(StartID, EndID) ->
    all | 'end' | start | start_end when
      StartID :: integer() | undefined,
      EndID :: integer() | undefined.
select_filter(StartID, EndID) ->
    case {StartID =/= undefined, EndID =/= undefined} of
        {false, false} -> all;
        {false, true} -> 'end';
        {true, false} -> start;
        {true, true} -> start_end
    end.
656
%% CQL fragment restricting the id column: one " AND id ..." condition per
%% defined bound, start first; the empty string when neither is defined.
prepare_filter_cql(StartID, EndID) ->
    Conditions = [{StartID, " AND id >= :start_id"},
                  {EndID, " AND id <= :end_id"}],
    lists:append([Cql || {Bound, Cql} <- Conditions, Bound =/= undefined]).
666
%% All four `{VariantName, CqlFragment}' combinations of present / absent
%% start and end bounds. 0 only stands in for "some defined id".
filter_to_cql() ->
    Bounds = [undefined, 0],
    [{select_filter(From, To), prepare_filter_cql(From, To)}
     || From <- Bounds, To <- Bounds].
670
%% Zero-based offset of the requested page within the filtered result set.
-spec calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      PageSize :: non_neg_integer(),
      TotalCount :: non_neg_integer(),
      RSM :: jlib:rsm_in() | undefined,
      Offset :: non_neg_integer().
calc_offset(_PoolName, _UserJID, _HostType, _Filter, PageSize, TotalCount,
            #rsm_in{direction = before, id = undefined}) ->
    %% Requesting the last page in a result set
    max(0, TotalCount - PageSize);
calc_offset(PoolName, UserJID, HostType, Filter, PageSize, _TotalCount,
            #rsm_in{direction = before, id = ID}) when is_integer(ID) ->
    %% Page ending right before ID
    Preceding = calc_before(PoolName, UserJID, HostType, Filter, ID),
    max(0, Preceding - PageSize);
calc_offset(PoolName, UserJID, HostType, Filter, _PageSize, _TotalCount,
            #rsm_in{direction = aft, id = ID}) when is_integer(ID) ->
    %% Page starting right after ID
    calc_index(PoolName, UserJID, HostType, Filter, ID);
calc_offset(_PoolName, _UserJID, _HostType, _Filter, _PageSize, _TotalCount, _RSM) ->
    0.
692
%% Binary form of the peer JID used as the with_jid key.
%% An undefined JID maps to the empty binary, never to `undefined';
%% any other JID is lower-cased and serialized in full.
-spec maybe_full_jid(undefined | jid:jid()) -> binary().
maybe_full_jid(undefined) ->
    <<>>;
maybe_full_jid(JID) ->
    jid:to_binary(jid:to_lower(JID)).
697
698 %%====================================================================
699 %% Internal SQL part
700 %%====================================================================
701
%% Per-filter variants of the parameterised statements. Each function
%% returns one `{{QueryTag, FilterName}, CQL}' pair for every combination
%% produced by filter_to_cql/0.

extract_messages_queries() ->
    lists:map(fun({FilterName, Filter}) ->
                      {{extract_messages_query, FilterName}, extract_messages_cql(Filter)}
              end, filter_to_cql()).

extract_messages_r_queries() ->
    lists:map(fun({FilterName, Filter}) ->
                      {{extract_messages_r_query, FilterName}, extract_messages_r_cql(Filter)}
              end, filter_to_cql()).

calc_count_queries() ->
    lists:map(fun({FilterName, Filter}) ->
                      {{calc_count_query, FilterName}, calc_count_cql(Filter)}
              end, filter_to_cql()).

list_message_ids_queries() ->
    lists:map(fun({FilterName, Filter}) ->
                      {{list_message_ids_query, FilterName}, list_message_ids_cql(Filter)}
              end, filter_to_cql()).
717
%% CQL builders. `Filter' is the id-range fragment produced by
%% prepare_filter_cql/2 (possibly empty); it is spliced in right after the
%% mandatory user_jid / with_jid conditions.

%% Messages in ascending id order, limited.
extract_messages_cql(Filter) ->
    lists:append(["SELECT id, from_jid, message FROM mam_message ",
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter,
                  " ORDER BY id LIMIT ?"]).

%% Messages in descending id order, limited.
extract_messages_r_cql(Filter) ->
    lists:append(["SELECT id, from_jid, message FROM mam_message ",
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter,
                  " ORDER BY id DESC LIMIT ?"]).

%% Number of matching rows.
calc_count_cql(Filter) ->
    lists:append(["SELECT COUNT(*) FROM mam_message ",
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter]).

%% Only the ids, in ascending order, limited.
list_message_ids_cql(Filter) ->
    lists:append(["SELECT id FROM mam_message ",
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter,
                  " ORDER BY id LIMIT ?"]).
736
%% CQL returning every message stored under one (user_jid, with_jid) pair
%% in ascending id order, without a limit.
fetch_user_messages_cql() ->
    "SELECT id, from_jid, message FROM mam_message " ++
        "WHERE user_jid = ? AND with_jid = ? " ++
        "ORDER BY id".
741
742
743
744 %% ----------------------------------------------------------------------
745 %% Optimizations
746
%% Encodes a packet for storage with the module configured as
%% db_message_format (a module implementing the mam_message behaviour).
packet_to_stored_binary(HostType, Packet) ->
    Codec = db_message_format(HostType),
    mam_message:encode(Codec, Packet).

%% Decodes a stored binary back into a packet using the same configured module.
stored_binary_to_packet(HostType, Bin) ->
    Codec = db_message_format(HostType),
    mam_message:decode(Codec, Bin).
756
757 %% ----------------------------------------------------------------------
758 %% Params getters
759
%% Module configured under the db_message_format option of this module.
-spec db_message_format(HostType :: host_type()) -> module().
db_message_format(HostType) ->
    gen_mod:get_module_opt(HostType, ?MODULE, db_message_format).

%% Cassandra pool used by this backend: always `default', whatever the host type.
-spec pool_name(HostType :: host_type()) -> term().
pool_name(_HostType) -> default.
Line Hits Source