./ct_report/coverage/mod_mam_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc Cassandra backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10 -behaviour(gen_mod).
11 -behaviour(mongoose_module_metrics).
12
13 %% ----------------------------------------------------------------------
14 %% Exports
15
16 %% gen_mod handlers
17 -export([start/2, stop/1, hooks/1]).
18
19 %% MAM hook handlers
20 -export([archive_size/3,
21 archive_message/3,
22 lookup_messages/3,
23 remove_archive/3]).
24
25 %% mongoose_cassandra callbacks
26 -export([prepared_queries/0]).
27
28 %gdpr
29 -export([get_mam_pm_gdpr_data/3]).
30
31 %% ----------------------------------------------------------------------
32 %% Imports
33
34 %% Other
35 -import(mod_mam_utils,
36 [maybe_min/2,
37 maybe_max/2,
38 bare_jid/1,
39 full_jid/1
40 ]).
41
42 -include("jlib.hrl").
43 -include("mongoose_rsm.hrl").
44
45 -callback encode(binary()) -> binary().
46 -callback decode(binary()) -> binary().
47
48 -record(mam_ca_filter, {
49 user_jid,
50 with_jid,
51 remote_jid,
52 start_id,
53 end_id
54 }).
55
56 -record(mam_message, {
57 id :: non_neg_integer(),
58 user_jid :: binary(),
59 remote_jid :: binary() | undefined,
60 from_jid :: binary() | undefined,
61 with_jid = <<>> :: binary(),
62 message :: binary() | undefined
63 }).
64
65 %% ----------------------------------------------------------------------
66 %% Types
67
68 -type filter() :: #mam_ca_filter{}.
69 -type message_id() :: non_neg_integer().
70 -type host_type() :: mongooseim:host_type().
71
72 %% ----------------------------------------------------------------------
73 %% gen_mod callbacks
74 %% Starting and stopping functions for users' archives
75
%% @doc gen_mod start callback.
%% This backend sets nothing up on start; it only acknowledges the call.
-spec start(host_type(), gen_mod:module_opts()) -> ok.
start(_, _) ->
    ok.
79
%% @doc gen_mod stop callback.
%% Nothing was set up in start/2, so there is nothing to tear down.
-spec stop(host_type()) -> ok.
stop(_) ->
    ok.
83
84 %% ----------------------------------------------------------------------
85 %% Add hooks for mod_mam_pm
86
%% @doc Lists the hook handlers this module registers for a host type.
%% Every handler is an exported arity-3 function of this module, registered
%% with an empty extra map and priority 50.
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
    Handlers = [{mam_archive_message, fun ?MODULE:archive_message/3},
                {mam_archive_size, fun ?MODULE:archive_size/3},
                {mam_lookup_messages, fun ?MODULE:lookup_messages/3},
                {mam_remove_archive, fun ?MODULE:remove_archive/3},
                {get_mam_pm_gdpr_data, fun ?MODULE:get_mam_pm_gdpr_data/3}],
    [{HookName, HostType, Handler, #{}, 50} || {HookName, Handler} <- Handlers].
94
95 %% ----------------------------------------------------------------------
96 %% mongoose_cassandra callbacks
97
%% @doc mongoose_cassandra callback: every `{QueryName, CQL}' pair used by
%% this module. The fixed queries come first, followed by the four query
%% families that are generated once per start/end id filter combination.
prepared_queries() ->
    FixedQueries =
        [{insert_offset_hint_query, insert_offset_hint_query_cql()},
         {prev_offset_query, prev_offset_query_cql()},
         {insert_query, insert_query_cql()},
         {fetch_user_messages_query, fetch_user_messages_cql()},
         {select_for_removal_query, select_for_removal_query_cql()},
         {remove_archive_query, remove_archive_query_cql()},
         {remove_archive_offsets_query, remove_archive_offsets_query_cql()}],
    lists:append([FixedQueries,
                  extract_messages_queries(),
                  extract_messages_r_queries(),
                  calc_count_queries(),
                  list_message_ids_queries()]).
112
113 %% ----------------------------------------------------------------------
114 %% Internal functions and callbacks
115
%% @doc `mam_archive_size' hook handler.
%% Counts the owner's messages using a filter with no borders, no time
%% range and no peer restriction. The incoming accumulator is only checked
%% to be an integer; it is replaced by the computed count.
-spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: integer(),
      Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
      Extra :: gen_hook:extra().
archive_size(Size, #{owner := UserJID}, #{host_type := HostType}) when is_integer(Size) ->
    Filter = prepare_filter(UserJID, undefined, undefined, undefined, undefined),
    Count = calc_count(pool_name(HostType), UserJID, HostType, Filter),
    {ok, Count}.
124
125
126 %% ----------------------------------------------------------------------
127 %% INSERT MESSAGE
128
%% @doc CQL statement that stores one archived message row.
insert_query_cql() ->
    Columns = "(id, user_jid, from_jid, remote_jid, with_jid, message) ",
    "INSERT INTO mam_message " ++ Columns ++ "VALUES (?, ?, ?, ?, ?, ?)".
133
%% @doc `mam_archive_message' hook handler.
%% Delegates to archive_message2/2. Any exception raised there is turned
%% into `{ok, {error, Reason}}' instead of propagating to the hook runner.
-spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok,
      Params :: mod_mam:archive_message_params(),
      Extra :: gen_hook:extra().
archive_message(_Result, Params, #{host_type := HostType}) ->
    try archive_message2(Params, HostType) of
        WriteResult ->
            {ok, WriteResult}
    catch
        _Class:Reason ->
            {ok, {error, Reason}}
    end.
144
%% Builds the rows for one archived message and hands them to write_messages/3.
%% The same message is written once per distinct `with_jid' value:
%% the empty binary, the remote full JID and the remote bare JID.
%% lists:usort/1 removes duplicates, e.g. when the full and bare JID coincide.
archive_message2(#{message_id := MessID,
                   local_jid := LocJID,
                   remote_jid := RemJID,
                   source_jid := SrcJID,
                   packet := Packet}, HostType) ->
    Owner = bare_jid(LocJID),
    RemoteBare = bare_jid(RemJID),
    RemoteFull = full_jid(RemJID),
    Sender = full_jid(SrcJID),
    Payload = packet_to_stored_binary(HostType, Packet),
    Template = #mam_message{id = MessID,
                            user_jid = Owner,
                            from_jid = Sender,
                            remote_jid = RemoteFull,
                            message = Payload},
    Copies = [Template#mam_message{with_jid = With}
              || With <- lists:usort([<<>>, RemoteFull, RemoteBare])],
    write_messages(HostType, LocJID, Copies).
165
%% Converts the message records to parameter maps and submits them as one
%% asynchronous `insert_query' write.
write_messages(HostType, UserJID, Messages) ->
    Pool = pool_name(HostType),
    Rows = lists:map(fun message_to_params/1, Messages),
    mongoose_cassandra:cql_write_async(Pool, UserJID, ?MODULE, insert_query, Rows).
170
%% Turns a #mam_message{} record into the parameter map bound to
%% `insert_query'; the map keys are the column names of that statement.
message_to_params(#mam_message{} = Msg) ->
    #{id => Msg#mam_message.id,
      user_jid => Msg#mam_message.user_jid,
      from_jid => Msg#mam_message.from_jid,
      remote_jid => Msg#mam_message.remote_jid,
      with_jid => Msg#mam_message.with_jid,
      message => Msg#mam_message.message}.
181
182 %% ----------------------------------------------------------------------
183 %% REMOVE ARCHIVE
184
%% @doc CQL statement deleting all messages stored for one
%% `(user_jid, with_jid)' pair.
remove_archive_query_cql() ->
    "DELETE FROM mam_message "
    "WHERE user_jid = ? AND with_jid = ?".
187
%% @doc CQL statement deleting the offset hints stored for one
%% `(user_jid, with_jid)' pair.
%%
%% Hints are written to and read from `mam_message_offset' (see
%% insert_offset_hint_query_cql/0 and prev_offset_query_cql/0), so that is
%% the table that has to be cleaned up here. Targeting `mam_message' would
%% merely repeat remove_archive_query_cql/0 and leave stale hints behind
%% after an archive is removed.
remove_archive_offsets_query_cql() ->
    "DELETE FROM mam_message_offset WHERE user_jid = ? AND with_jid = ?".
190
%% @doc CQL statement listing the distinct `(user_jid, with_jid)' pairs
%% that exist for a user; its rows are used as parameters of the delete
%% statements.
select_for_removal_query_cql() ->
    "SELECT DISTINCT user_jid, with_jid "
    "FROM mam_message WHERE user_jid = ?".
193
%% @doc `mam_remove_archive' hook handler.
%% Removes the owner's archive and passes the accumulator through
%% untouched; the result of the removal itself is discarded.
-spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: term(),
      Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
      Extra :: gen_hook:extra().
remove_archive(Acc, #{owner := Owner}, #{host_type := HostType}) ->
    _ = remove_archive(HostType, Owner),
    {ok, Acc}.
201
%% Deletes every message and every offset hint stored for the bare JID.
%% `select_for_removal_query' is folded over; each batch of rows it yields
%% is fed to both delete statements as their parameters.
remove_archive(HostType, UserJID) ->
    Pool = pool_name(HostType),
    SelectParams = #{user_jid => bare_jid(UserJID)},
    %% cql_write/5 is used here rather than cql_write_async/5
    %% (presumably so the call returns only once the rows are gone).
    DeleteBatch =
        fun(Rows, _Acc) ->
                mongoose_cassandra:cql_write(Pool, UserJID, ?MODULE,
                                             remove_archive_query, Rows),
                mongoose_cassandra:cql_write(Pool, UserJID, ?MODULE,
                                             remove_archive_offsets_query, Rows)
        end,
    mongoose_cassandra:cql_foldl(Pool, UserJID, ?MODULE,
                                 select_for_removal_query, SelectParams,
                                 DeleteBatch, []).
217 %% ----------------------------------------------------------------------
218 %% SELECT MESSAGES
219
%% @doc `mam_lookup_messages' hook handler.
%% <ul>
%% <li>An error already present in the accumulator is passed through.</li>
%% <li>A binary `search_text' is rejected with `not-supported'.</li>
%% <li>Otherwise the lookup is run; any exception is returned as
%%     `{ok, {error, {Reason, Stacktrace}}}'.</li>
%% </ul>
-spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: {ok, mod_mam:lookup_result()} | {error, term()},
      Params :: mam_iq:lookup_params(),
      Extra :: gen_hook:extra().
lookup_messages({error, _} = Failed, _Params, _Extra) ->
    {ok, Failed};
lookup_messages(_Acc, #{search_text := <<_/binary>>}, _Extra) ->
    {ok, {error, 'not-supported'}};
lookup_messages(_Acc,
                #{owner_jid := UserJID, rsm := RSM, borders := Borders,
                  start_ts := Start, end_ts := End, with_jid := WithJID,
                  search_text := undefined, page_size := PageSize,
                  is_simple := IsSimple},
                #{host_type := HostType}) ->
    try lookup_messages2(pool_name(HostType), HostType, UserJID, RSM, Borders,
                         Start, End, WithJID, PageSize, IsSimple) of
        LookupResult ->
            {ok, LookupResult}
    catch
        _Class:Reason:Stacktrace ->
            {ok, {error, {Reason, Stacktrace}}}
    end.
242
%% Chooses how to run a lookup.
%% With `IsSimple = true' neither offset nor total count is computed.
%% Otherwise the RSM request is mapped to a strategy and the matching
%% lookup function is called.
lookup_messages2(PoolName, HostType, #jid{} = UserJID, RSM, Borders,
                 Start, End, WithJID, PageSize, true) ->
    Filter = prepare_filter(UserJID, Borders, Start, End, WithJID),
    lookup_messages_simple(PoolName, HostType, UserJID, RSM, PageSize, Filter);
lookup_messages2(PoolName, HostType, #jid{} = UserJID, RSM, Borders,
                 Start, End, WithJID, PageSize, _IsSimple) ->
    %% The RDBMS approach cannot be reused here because "LIMIT X, Y" is not
    %% supported by Cassandra. Not every strategy is cheap; consider
    %% disabling some of them in production once the access pattern is known.
    Strategy = rsm_to_strategy(RSM),
    Filter = prepare_filter(UserJID, Borders, Start, End, WithJID),
    Lookup = case Strategy of
                 last_page -> fun lookup_messages_last_page/6;
                 by_offset -> fun lookup_messages_by_offset/6;
                 first_page -> fun lookup_messages_first_page/6;
                 before_id -> fun lookup_messages_before_id/6;
                 after_id -> fun lookup_messages_after_id/6
             end,
    Lookup(PoolName, HostType, UserJID, RSM, PageSize, Filter).
272
%% Maps an RSM request to the lookup strategy used by lookup_messages2/10.
%% A missing RSM, or one that matches none of the specific shapes below,
%% means "first page".
rsm_to_strategy(undefined) ->
    first_page;
rsm_to_strategy(#rsm_in{direction = before, id = undefined}) ->
    last_page;
rsm_to_strategy(#rsm_in{direction = before, id = BeforeId}) when is_integer(BeforeId) ->
    before_id;
rsm_to_strategy(#rsm_in{direction = aft, id = AfterId}) when is_integer(AfterId) ->
    after_id;
rsm_to_strategy(#rsm_in{direction = undefined, index = 0}) ->
    first_page;
rsm_to_strategy(#rsm_in{direction = undefined, index = Index}) when is_integer(Index) ->
    by_offset;
rsm_to_strategy(#rsm_in{}) ->
    first_page.
287
%% Lookup that reports neither total count nor offset (both `undefined').
%% The RSM request only decides which id bound is added to the filter and
%% whether the page is read from the end of the range.
lookup_messages_simple(PoolName, HostType, UserJID,
                       #rsm_in{direction = aft, id = ID}, PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType,
                            after_id(ID, Filter), PageSize, false),
    simple_lookup_result(HostType, Rows);
lookup_messages_simple(PoolName, HostType, UserJID,
                       #rsm_in{direction = before, id = ID}, PageSize, Filter) ->
    %% Read from the end of the range (ReverseLimit = true).
    Rows = extract_messages(PoolName, UserJID, HostType,
                            before_id(ID, Filter), PageSize, true),
    simple_lookup_result(HostType, Rows);
lookup_messages_simple(PoolName, HostType, UserJID,
                       #rsm_in{direction = undefined, index = Offset}, PageSize, Filter) ->
    %% Resolving the offset needs extra queries: potentially slow.
    StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
    Rows = extract_messages(PoolName, UserJID, HostType,
                            from_id(StartId, Filter), PageSize, false),
    simple_lookup_result(HostType, Rows);
lookup_messages_simple(PoolName, HostType, UserJID, _RSM, PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, false),
    simple_lookup_result(HostType, Rows).

%% Wraps rows in the result shape shared by every simple lookup.
simple_lookup_result(HostType, Rows) ->
    {ok, {undefined, undefined, rows_to_uniform_format(HostType, Rows)}}.
313
%% Last page of the result set (RSM `before' without an id).
%% Returns `{ok, {TotalCount, Offset, Messages}}'.
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined},
                          0, Filter) ->
    %% Empty page requested: only the count is needed.
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Total, []}};
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined},
                          PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, true),
    RowCount = length(Rows),
    if
        RowCount < PageSize ->
            %% Fewer rows than a page: these rows are the whole result set.
            {ok, {RowCount, 0, rows_to_uniform_format(HostType, Rows)}};
        true ->
            %% Everything before the first row of the page is the offset.
            FirstID = row_to_message_id(hd(Rows)),
            Offset = calc_count(PoolName, UserJID, HostType,
                                before_id(FirstID, Filter)),
            {ok, {Offset + RowCount, Offset,
                  rows_to_uniform_format(HostType, Rows)}}
    end.
336
%% Page addressed by a numeric RSM index.
%% Returns `{ok, {TotalCount, Offset, Messages}}'.
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset},
                          0, Filter) when is_integer(Offset) ->
    %% Empty page requested: only the count is needed.
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Offset, []}};
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset},
                          PageSize, Filter) when is_integer(Offset) ->
    %% Translating the offset into a message id is potentially slow.
    StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
    Rows = extract_messages(PoolName, UserJID, HostType,
                            from_id(StartId, Filter), PageSize, false),
    RowCount = length(Rows),
    if
        RowCount < PageSize ->
            %% Short page: nothing can follow it.
            {ok, {Offset + RowCount, Offset,
                  rows_to_uniform_format(HostType, Rows)}};
        true ->
            %% Full page: count what is left after its last row.
            LastID = row_to_message_id(lists:last(Rows)),
            Remaining = calc_count(PoolName, UserJID, HostType,
                                   after_id(LastID, Filter)),
            {ok, {Offset + RowCount + Remaining, Offset,
                  rows_to_uniform_format(HostType, Rows)}}
    end.
361
%% First page of the result set; the offset is always 0.
%% Returns `{ok, {TotalCount, 0, Messages}}'.
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, 0, Filter) ->
    %% Empty page requested: only the count is needed.
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, 0, []}};
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, false),
    RowCount = length(Rows),
    if
        RowCount < PageSize ->
            %% The whole result set fits into one page.
            {ok, {RowCount, 0, rows_to_uniform_format(HostType, Rows)}};
        true ->
            %% Full page: count what is left after its last row.
            LastID = row_to_message_id(lists:last(Rows)),
            Remaining = calc_count(PoolName, UserJID, HostType,
                                   after_id(LastID, Filter)),
            {ok, {RowCount + Remaining, 0,
                  rows_to_uniform_format(HostType, Rows)}}
    end.
381
%% Page that ends at a given message id (RSM `before' with an id).
%% One extra row is requested and the upper bound includes ID itself
%% (presumably so that mod_mam_utils:check_for_item_not_found/3 can tell
%% whether the anchor message exists; confirm against that function).
lookup_messages_before_id(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = ID} = RSM,
                          PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    Rows = extract_messages(PoolName, UserJID, HostType,
                            to_id(ID, Filter), PageSize + 1, true),
    Messages = rows_to_uniform_format(HostType, Rows),
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, {Total, Offset, Messages}).
391
%% Page that starts at a given message id (RSM `aft' with an id).
%% One extra row is requested and the lower bound includes ID itself
%% (presumably so that mod_mam_utils:check_for_item_not_found/3 can tell
%% whether the anchor message exists; confirm against that function).
lookup_messages_after_id(PoolName, HostType, UserJID,
                         #rsm_in{direction = aft, id = ID} = RSM,
                         PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    Rows = extract_messages(PoolName, UserJID, HostType,
                            from_id(ID, Filter), PageSize + 1, false),
    Messages = rows_to_uniform_format(HostType, Rows),
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, {Total, Offset, Messages}).
401
402
%% Tightens the filter's lower bound so that ID itself is excluded:
%% the bound becomes ID + 1, combined with the existing start_id
%% through maybe_max/2.
after_id(ID, #mam_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = maybe_max(ID + 1, CurrentStart),
    Filter#mam_ca_filter{start_id = NewStart}.
405
%% Tightens the filter's upper bound so that ID itself is excluded:
%% the bound becomes ID - 1, combined with the existing end_id through
%% maybe_min/2. An undefined ID leaves the filter unchanged.
before_id(undefined, Filter) ->
    Filter;
before_id(ID, #mam_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = maybe_min(ID - 1, CurrentEnd),
    Filter#mam_ca_filter{end_id = NewEnd}.
410
%% Tightens the filter's upper bound to ID, keeping ID itself in range;
%% combined with the existing end_id through maybe_min/2.
to_id(ID, #mam_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = maybe_min(ID, CurrentEnd),
    Filter#mam_ca_filter{end_id = NewEnd}.
413
%% Tightens the filter's lower bound to ID, keeping ID itself in range;
%% combined with the existing start_id through maybe_max/2.
from_id(ID, #mam_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = maybe_max(ID, CurrentStart),
    Filter#mam_ca_filter{start_id = NewStart}.
416
417
%% Converts every fetched row to the lookup result format, keeping order.
rows_to_uniform_format(HostType, MessageRows) ->
    lists:map(fun(Row) -> row_to_uniform_format(HostType, Row) end, MessageRows).
420
%% Converts one fetched row into `#{id, jid, packet}': the sender binary is
%% parsed into a JID and the stored payload is decoded back into a packet.
row_to_uniform_format(HostType, #{from_jid := SenderBin, message := Stored, id := MessID}) ->
    Sender = jid:from_binary(SenderBin),
    Decoded = stored_binary_to_packet(HostType, Stored),
    #{id => MessID, jid => Sender, packet => Decoded}.
425
%% Extracts the message id from a fetched row.
row_to_message_id(#{id := MessID}) -> MessID.
428
%% @doc `get_mam_pm_gdpr_data' hook handler.
%% Fetches the user's rows stored under the empty `with_jid' (a copy that
%% archive_message2/2 writes for every message) and prepends them, as
%% `{Id, FromJid, PacketBinary}' tuples, to the accumulator.
-spec get_mam_pm_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
      Params :: #{jid := jid:jid()},
      Extra :: gen_hook:extra().
get_mam_pm_gdpr_data(Acc, #{jid := JID}, #{host_type := HostType}) ->
    LowerBinJID = jid:to_binary(jid:to_lower(JID)),
    QueryParams = #{user_jid => LowerBinJID, with_jid => <<>>},
    Rows = fetch_user_messages(pool_name(HostType), JID, QueryParams),
    Entries = lists:map(fun(Row) -> rows_to_gdpr_mam_message(HostType, Row) end, Rows),
    {ok, Entries ++ Acc}.
439
%% Converts one fetched row into a GDPR export tuple; the stored payload is
%% decoded into a packet and serialised again with exml:to_binary/1.
rows_to_gdpr_mam_message(HostType, #{message := Stored, id := MessID, from_jid := Sender}) ->
    Packet = stored_binary_to_packet(HostType, Stored),
    {MessID, Sender, exml:to_binary(Packet)}.
442
%% Reads at most IMax rows matching Filter. Offsets are not supported.
%% The selected columns are `id', `from_jid' and `message'; callers in this
%% module match the returned rows as maps with those keys.
%% With `ReverseLimit = true' the rows are read through the descending
%% query (the last IMax rows of the range) and reversed again before
%% being returned.
-spec extract_messages(PoolName, UserJID, HostType, Filter, IMax, ReverseLimit) ->
    [Row] when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      IMax :: pos_integer(),
      ReverseLimit :: boolean(),
      Row :: mongoose_cassandra:row().
extract_messages(_PoolName, _UserJID, _HostType, _Filter, 0, _ReverseLimit) ->
    %% Nothing requested: skip the query altogether.
    [];
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, false) ->
    read_limited_rows(PoolName, UserJID, extract_messages_query, Filter, IMax);
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, true) ->
    lists:reverse(read_limited_rows(PoolName, UserJID, extract_messages_r_query,
                                    Filter, IMax)).

%% Runs the prepared query `{QueryTag, FilterKind}' with the filter
%% parameters plus a row limit and returns the fetched rows.
read_limited_rows(PoolName, UserJID, QueryTag, Filter, Limit) ->
    QueryName = {QueryTag, select_filter(Filter)},
    Params = (eval_filter_params(Filter))#{'[limit]' => Limit},
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
    Rows.
468
%% Runs `fetch_user_messages_query' with the given parameters and returns
%% its rows; crashes with badmatch if the read does not return `{ok, _}'.
fetch_user_messages(PoolName, UserJID, FilterMap) ->
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                             fetch_user_messages_query, FilterMap),
    Rows.
473
474
%% @doc Counts the rows of the filtered result set whose id is not greater
%% than MessID, i.e. the rows up to and including the given message.
%% @end
-spec calc_index(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_index(PoolName, UserJID, HostType, Filter, MessID) ->
    UpToMessage = to_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, UpToMessage).
490
%% @doc Counts the rows of the filtered result set that come strictly
%% before the given message id.
%%
%% The message with that id does not have to exist any more.
%% @end
-spec calc_before(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_before(PoolName, UserJID, HostType, Filter, MessID) ->
    BeforeMessage = before_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, BeforeMessage).
505
506
%% @doc Returns the number of rows matching the filter, using the
%% `SELECT COUNT(*)' query prepared for the filter's start/end combination.
-spec calc_count(PoolName, UserJID, HostType, Filter) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      Count :: non_neg_integer().
calc_count(PoolName, UserJID, _HostType, Filter) ->
    FilterKind = select_filter(Filter),
    Params = eval_filter_params(Filter),
    %% The count query is expected to yield exactly one row.
    {ok, [#{count := Count}]} =
        mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                    {calc_count_query, FilterKind}, Params),
    Count.
521
%% @doc Converts a result-set offset into the id of the message to start from.
%% Offsets up to 100 are resolved directly. Larger offsets first look for a
%% previously stored offset hint so that fewer ids have to be read, and may
%% store a new hint afterwards.
%% May return `undefined' (see calc_offset_to_start_id/4).
-spec offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0, Offset =< 100 ->
    calc_offset_to_start_id(PoolName, UserJID, Filter, Offset);
offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0 ->
    HintParams = (eval_filter_params(Filter))#{offset => Offset},
    case mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                     prev_offset_query, HintParams) of
        {ok, []} ->
            %% No hint available: walk the ids from the start of the range.
            FoundId = calc_offset_to_start_id(PoolName, UserJID, Filter, Offset),
            maybe_save_offset_hint(PoolName, UserJID, Filter, 0, Offset, FoundId);
        {ok, [#{offset := Offset, id := HintId}]} ->
            %% The hint is for exactly this offset.
            HintId;
        {ok, [#{offset := HintOffset, id := HintId}]} ->
            %% Resume from the hinted id and only walk the remaining distance.
            %% NOTE(review): start_id is an inclusive bound and
            %% calc_offset_to_start_id/4 already reads Offset + 1 ids, so the
            %% extra "+ 1" below looks like it lands one row past the
            %% requested offset - confirm before relying on it.
            FoundId = calc_offset_to_start_id(PoolName, UserJID,
                                              Filter#mam_ca_filter{start_id = HintId},
                                              Offset - HintOffset + 1),
            maybe_save_offset_hint(PoolName, UserJID, Filter, HintOffset, Offset, FoundId)
    end.
553
%% @doc Stores an `offset -> message id' hint to speed up later queries
%% with a similar offset.
%% A hint is written only when the new offset is more than 50 entries away
%% from the hint that was used; nothing is written for an undefined id.
%% StartId is always returned unchanged, for the caller's convenience.
-spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), UserJID :: jid:jid(),
                             Filter :: filter(), HintOffset :: non_neg_integer(),
                             NewOffset :: non_neg_integer(),
                             StartId :: non_neg_integer() | undefined) ->
    StartId :: non_neg_integer() | undefined.
maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset, undefined) ->
    undefined;
maybe_save_offset_hint(PoolName, UserJID, Filter, HintOffset, NewOffset, StartId) ->
    Distance = abs(NewOffset - HintOffset),
    if
        Distance > 50 ->
            #mam_ca_filter{user_jid = HintUser, with_jid = HintWith} = Filter,
            HintRow = #{user_jid => HintUser, with_jid => HintWith,
                        offset => NewOffset, id => StartId},
            mongoose_cassandra:cql_write(PoolName, UserJID, ?MODULE,
                                         insert_offset_hint_query, [HintRow]);
        true ->
            skip
    end,
    StartId.
576
%% @doc Resolves an offset to a message id by reading the first Offset + 1
%% ids of the filtered range, in ascending order, and taking the last one.
%% Returns `undefined' when the query yields no rows at all; when it yields
%% fewer rows than requested, the id of the last row that was read is
%% returned.
-spec calc_offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
calc_offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0 ->
    QueryName = {list_message_ids_query, select_filter(Filter)},
    Params = (eval_filter_params(Filter))#{'[limit]' => Offset + 1},
    {ok, IdRows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                               QueryName, Params),
    case IdRows of
        [] ->
            undefined;
        [_ | _] ->
            LastRow = lists:last(IdRows),
            maps:get(id, LastRow)
    end.
595
%% @doc CQL statement fetching one stored `offset -> message id' hint whose
%% offset does not exceed the requested one.
prev_offset_query_cql() ->
    lists:append(["SELECT id, offset FROM mam_message_offset ",
                  "WHERE user_jid = ? and with_jid = ? ",
                  "and offset <= ? LIMIT 1"]).
600
%% @doc CQL statement storing one `offset -> message id' hint.
insert_offset_hint_query_cql() ->
    "INSERT INTO mam_message_offset(user_jid, with_jid, id, offset) "
    "VALUES(?, ?, ?, ?)".
604
%% Builds the query filter from lookup parameters: the owner's bare JID,
%% the peer JID and the message id borders derived from Borders/Start/End.
prepare_filter(UserJID, Borders, Start, End, WithJID) ->
    Owner = bare_jid(UserJID),
    {StartID, EndID} = mod_mam_utils:calculate_msg_id_borders(Borders, Start, End),
    %% with_jid is NOT optional in the queries: a missing peer becomes <<>>.
    Peer = maybe_full_jid(WithJID),
    prepare_filter_params(Owner, Peer, StartID, EndID).
610
%% Packs already-encoded filter values into a #mam_ca_filter{} record.
%% The record's remote_jid field is left at its default.
prepare_filter_params(Owner, Peer, StartID, EndID) ->
    #mam_ca_filter{user_jid = Owner,
                   with_jid = Peer,
                   start_id = StartID,
                   end_id = EndID}.
618
%% Turns a filter into the parameter map of a prepared query.
%% `user_jid' and `with_jid' are always present; `start_id' and `end_id'
%% are included only when they are not `undefined'.
eval_filter_params(#mam_ca_filter{user_jid = Owner,
                                  with_jid = Peer,
                                  start_id = StartID,
                                  end_id = EndID}) ->
    Required = #{user_jid => Owner, with_jid => Peer},
    AddIfDefined = fun({_Key, undefined}, Params) -> Params;
                      ({Key, Value}, Params) -> Params#{Key => Value}
                   end,
    lists:foldl(AddIfDefined, Required, [{start_id, StartID}, {end_id, EndID}]).
628
%% Names the filter combination (see select_filter/2) used by a filter record.
select_filter(#mam_ca_filter{} = Filter) ->
    select_filter(Filter#mam_ca_filter.start_id, Filter#mam_ca_filter.end_id).
634
635
%% Names the combination of id bounds that are set; the name is the second
%% element of the prepared query names generated from filter_to_cql/0.
-spec select_filter(StartID, EndID) ->
    all | 'end' | start | start_end when
      StartID :: integer() | undefined,
      EndID :: integer() | undefined.
select_filter(StartID, EndID) ->
    case {StartID =:= undefined, EndID =:= undefined} of
        {true, true} -> all;
        {true, false} -> 'end';
        {false, true} -> start;
        {false, false} -> start_end
    end.
648
%% Builds the optional id-range part of a WHERE clause. Only whether a
%% bound is `undefined' matters; the values are bound later through the
%% named markers `:start_id' and `:end_id'.
prepare_filter_cql(StartID, EndID) ->
    LowerBound = if
                     StartID =:= undefined -> "";
                     true -> " AND id >= :start_id"
                 end,
    UpperBound = if
                     EndID =:= undefined -> "";
                     true -> " AND id <= :end_id"
                 end,
    LowerBound ++ UpperBound.
658
%% Enumerates the four start/end bound combinations as
%% `{FilterName, CqlFragment}' pairs. The 0 merely stands for "bound is set".
filter_to_cql() ->
    Combinations = [{undefined, undefined}, {undefined, 0},
                    {0, undefined}, {0, 0}],
    lists:map(fun({StartID, EndID}) ->
                      {select_filter(StartID, EndID), prepare_filter_cql(StartID, EndID)}
              end, Combinations).
662
%% Computes the offset of the requested page within the result set.
%% <ul>
%% <li>`aft' with an id: number of rows up to and including that id.</li>
%% <li>`before' without an id (last page): total count minus page size,
%%     never below 0.</li>
%% <li>`before' with an id: rows before that id minus page size,
%%     never below 0.</li>
%% <li>anything else: 0.</li>
%% </ul>
-spec calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      PageSize :: non_neg_integer(),
      TotalCount :: non_neg_integer(),
      RSM :: jlib:rsm_in() | undefined,
      Offset :: non_neg_integer().
calc_offset(PoolName, UserJID, HostType, Filter, _PageSize, _TotalCount,
            #rsm_in{direction = aft, id = ID}) when is_integer(ID) ->
    calc_index(PoolName, UserJID, HostType, Filter, ID);
calc_offset(_PoolName, _UserJID, _HostType, _Filter, PageSize, TotalCount,
            #rsm_in{direction = before, id = undefined}) ->
    max(0, TotalCount - PageSize);
calc_offset(PoolName, UserJID, HostType, Filter, PageSize, _TotalCount,
            #rsm_in{direction = before, id = ID}) when is_integer(ID) ->
    RowsBefore = calc_before(PoolName, UserJID, HostType, Filter, ID),
    max(0, RowsBefore - PageSize);
calc_offset(_PoolName, _UserJID, _HostType, _Filter, _PageSize, _TotalCount, _RSM) ->
    0.
684
%% Encodes the optional peer JID for use as `with_jid': a missing peer maps
%% to the empty binary, anything else to its lower-cased binary form.
-spec maybe_full_jid(undefined | jid:jid()) -> undefined | binary().
maybe_full_jid(undefined) ->
    <<>>;
maybe_full_jid(JID) ->
    LowerJID = jid:to_lower(JID),
    jid:to_binary(LowerJID).
689
690 %%====================================================================
691 %% Internal CQL part
692 %%====================================================================
693
%% One ascending "extract messages" query per filter combination.
extract_messages_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{extract_messages_query, FilterName}, extract_messages_cql(FilterCql)}
              end, filter_to_cql()).
697
%% One descending "extract messages" query per filter combination.
extract_messages_r_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{extract_messages_r_query, FilterName}, extract_messages_r_cql(FilterCql)}
              end, filter_to_cql()).
701
%% One counting query per filter combination.
calc_count_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{calc_count_query, FilterName}, calc_count_cql(FilterCql)}
              end, filter_to_cql()).
705
%% One "list message ids" query per filter combination.
list_message_ids_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{list_message_ids_query, FilterName}, list_message_ids_cql(FilterCql)}
              end, filter_to_cql()).
709
%% CQL selecting messages in ascending id order with a row limit;
%% Filter is the optional id-range fragment from prepare_filter_cql/2.
extract_messages_cql(Filter) ->
    Select = "SELECT id, from_jid, message FROM mam_message ",
    Where = "WHERE user_jid = ? AND with_jid = ? ",
    Select ++ Where ++ Filter ++ " ORDER BY id LIMIT ?".
714
%% CQL selecting messages in descending id order with a row limit;
%% Filter is the optional id-range fragment from prepare_filter_cql/2.
extract_messages_r_cql(Filter) ->
    Select = "SELECT id, from_jid, message FROM mam_message ",
    Where = "WHERE user_jid = ? AND with_jid = ? ",
    Select ++ Where ++ Filter ++ " ORDER BY id DESC LIMIT ?".
719
%% CQL counting the messages of one `(user_jid, with_jid)' pair;
%% Filter is the optional id-range fragment from prepare_filter_cql/2.
calc_count_cql(Filter) ->
    Select = "SELECT COUNT(*) FROM mam_message ",
    Where = "WHERE user_jid = ? AND with_jid = ? ",
    Select ++ Where ++ Filter.
723
%% CQL listing message ids in ascending order with a row limit;
%% Filter is the optional id-range fragment from prepare_filter_cql/2.
list_message_ids_cql(Filter) ->
    Select = "SELECT id FROM mam_message ",
    Where = "WHERE user_jid = ? AND with_jid = ? ",
    Select ++ Where ++ Filter ++ " ORDER BY id LIMIT ?".
728
%% CQL fetching every message of one `(user_jid, with_jid)' pair in
%% ascending id order, without a row limit.
fetch_user_messages_cql() ->
    lists:append(["SELECT id, from_jid, message FROM mam_message ",
                  "WHERE user_jid = ? AND with_jid = ? ",
                  "ORDER BY id"]).
733
734
735
736 %% ----------------------------------------------------------------------
737 %% Optimizations
738
%% Encodes a packet for storage with the message format module configured
%% for the host type (a module implementing the mam_message behaviour).
packet_to_stored_binary(HostType, Packet) ->
    mam_message:encode(db_message_format(HostType), Packet).
743
%% Decodes a stored payload with the message format module configured
%% for the host type (a module implementing the mam_message behaviour).
stored_binary_to_packet(HostType, Bin) ->
    mam_message:decode(db_message_format(HostType), Bin).
748
749 %% ----------------------------------------------------------------------
750 %% Params getters
751
%% Reads this module's `db_message_format' option for the host type.
-spec db_message_format(HostType :: host_type()) -> module().
db_message_format(HostType) ->
    OptionKey = db_message_format,
    gen_mod:get_module_opt(HostType, ?MODULE, OptionKey).
755
%% Name of the Cassandra pool to use; every host type currently shares the
%% `default' pool.
-spec pool_name(HostType :: host_type()) -> term().
pool_name(_) ->
    default.
Line Hits Source