./ct_report/coverage/mod_mam_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc Cassandra backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10 -behaviour(gen_mod).
11 -behaviour(mongoose_module_metrics).
12
13 %% ----------------------------------------------------------------------
14 %% Exports
15
16 %% gen_mod handlers
17 -export([start/2, stop/1]).
18
19 %% MAM hook handlers
20 -export([archive_size/4,
21 archive_message/3,
22 lookup_messages/3,
23 remove_archive/4]).
24
25 %% mongoose_cassandra callbacks
26 -export([prepared_queries/0]).
27
28 %gdpr
29 -export([get_mam_pm_gdpr_data/3]).
30
31 -ignore_xref([
32 behaviour_info/1, remove_archive/4
33 ]).
34
35 %% ----------------------------------------------------------------------
36 %% Imports
37
38 %% Other
39 -import(mod_mam_utils,
40 [maybe_min/2,
41 maybe_max/2,
42 bare_jid/1,
43 full_jid/1
44 ]).
45
46 -include("jlib.hrl").
47 -include("mongoose_rsm.hrl").
48
49 -callback encode(binary()) -> binary().
50 -callback decode(binary()) -> binary().
51
52 -record(mam_ca_filter, {
53 user_jid,
54 with_jid,
55 remote_jid,
56 start_id,
57 end_id
58 }).
59
60 -record(mam_message, {
61 id :: non_neg_integer(),
62 user_jid :: binary(),
63 remote_jid :: binary() | undefined,
64 from_jid :: binary() | undefined,
65 with_jid = <<>> :: binary(),
66 message :: binary() | undefined
67 }).
68
69 %% ----------------------------------------------------------------------
70 %% Types
71
72 -type filter() :: #mam_ca_filter{}.
73 -type message_id() :: non_neg_integer().
74 -type host_type() :: mongooseim:host_type().
75
76 %% ----------------------------------------------------------------------
77 %% gen_mod callbacks
78 %% Starting and stopping functions for users' archives
79
%% @doc gen_mod callback: registers the hook handlers listed by hooks/1
%% for the given host type. Module options are not used.
-spec start(host_type(), gen_mod:module_opts()) -> ok.
start(HostType, _Opts) ->
    Handlers = hooks(HostType),
    ejabberd_hooks:add(Handlers).
83
%% @doc gen_mod callback: unregisters the hook handlers listed by hooks/1
%% for the given host type.
-spec stop(host_type()) -> ok.
stop(HostType) ->
    Handlers = hooks(HostType),
    ejabberd_hooks:delete(Handlers).
87
88 %% ----------------------------------------------------------------------
89 %% Add hooks for mod_mam
90
%% @doc Builds the hook descriptors of this module for HostType.
%% Every handler lives in this module and is registered with priority 50.
-spec hooks(host_type()) -> [ejabberd_hooks:hook()].
hooks(HostType) ->
    HookHandlers = [{mam_archive_message, archive_message},
                    {mam_archive_size, archive_size},
                    {mam_lookup_messages, lookup_messages},
                    {mam_remove_archive, remove_archive},
                    {get_mam_pm_gdpr_data, get_mam_pm_gdpr_data}],
    [{HookName, HostType, ?MODULE, Handler, 50} || {HookName, Handler} <- HookHandlers].
98
99 %% ----------------------------------------------------------------------
100 %% mongoose_cassandra callbacks
101
%% @doc mongoose_cassandra callback: returns every `{QueryName, CQL}' pair
%% used by this module. The fixed queries come first, followed by the
%% per-filter variants of the extract, reverse-extract, count and id-list queries.
prepared_queries() ->
    FixedQueries =
        [{insert_offset_hint_query, insert_offset_hint_query_cql()},
         {prev_offset_query, prev_offset_query_cql()},
         {insert_query, insert_query_cql()},
         {fetch_user_messages_query, fetch_user_messages_cql()},
         {select_for_removal_query, select_for_removal_query_cql()},
         {remove_archive_query, remove_archive_query_cql()},
         {remove_archive_offsets_query, remove_archive_offsets_query_cql()}],
    lists:append([FixedQueries,
                  extract_messages_queries(),
                  extract_messages_r_queries(),
                  calc_count_queries(),
                  list_message_ids_queries()]).
116
117 %% ----------------------------------------------------------------------
118 %% Internal functions and callbacks
119
%% @doc `mam_archive_size' hook handler: counts all messages archived for
%% UserJID. The incoming Size accumulator must be an integer and is replaced
%% by the freshly calculated count.
archive_size(Size, HostType, _UserID, UserJID) when is_integer(Size) ->
    %% No borders, no time range and no interlocutor: the filter spans the whole archive
    WholeArchive = prepare_filter(UserJID, undefined, undefined, undefined, undefined),
    calc_count(pool_name(HostType), UserJID, HostType, WholeArchive).
124
125
126 %% ----------------------------------------------------------------------
127 %% INSERT MESSAGE
128
%% @doc CQL statement that stores a single row in `mam_message'.
insert_query_cql() ->
    "INSERT INTO mam_message (id, user_jid, from_jid, remote_jid, with_jid, message) "
    "VALUES (?, ?, ?, ?, ?, ?)".
133
%% @doc `mam_archive_message' hook handler.
%% Returns whatever archive_message2/2 returns; any exception raised while
%% archiving is turned into `{error, Reason}'.
archive_message(_Result, HostType, Params) ->
    try archive_message2(Params, HostType) of
        WriteResult -> WriteResult
    catch
        _Class:Reason -> {error, Reason}
    end.
140
%% @doc Encodes the packet and writes one copy of the message per distinct
%% `with_jid' value: the empty binary, the full remote JID and the bare
%% remote JID (duplicates are removed by lists:usort/1).
archive_message2(#{message_id := MessID,
                   local_jid := LocJID,
                   remote_jid := RemJID,
                   source_jid := SrcJID,
                   packet := Packet}, HostType) ->
    OwnerBin = mod_mam_utils:bare_jid(LocJID),
    RemoteBareBin = mod_mam_utils:bare_jid(RemJID),
    RemoteFullBin = mod_mam_utils:full_jid(RemJID),
    SourceBin = mod_mam_utils:full_jid(SrcJID),
    StoredPacket = packet_to_stored_binary(HostType, Packet),
    Template = #mam_message{id = MessID,
                            user_jid = OwnerBin,
                            from_jid = SourceBin,
                            remote_jid = RemoteFullBin,
                            message = StoredPacket},
    WithJIDs = lists:usort([<<>>, RemoteFullBin, RemoteBareBin]),
    Copies = lists:map(fun(WithBin) -> Template#mam_message{with_jid = WithBin} end,
                       WithJIDs),
    write_messages(HostType, LocJID, Copies).
161
%% @doc Converts the message records to parameter maps and submits them
%% through mongoose_cassandra:cql_write_async/5 using `insert_query'.
write_messages(HostType, UserJID, Messages) ->
    ParamRows = lists:map(fun message_to_params/1, Messages),
    mongoose_cassandra:cql_write_async(pool_name(HostType), UserJID,
                                       ?MODULE, insert_query, ParamRows).
166
%% @doc Turns a `#mam_message{}' record into the parameter map expected by
%% `insert_query' (one key per record field).
message_to_params(#mam_message{} = Msg) ->
    #{id => Msg#mam_message.id,
      user_jid => Msg#mam_message.user_jid,
      from_jid => Msg#mam_message.from_jid,
      remote_jid => Msg#mam_message.remote_jid,
      with_jid => Msg#mam_message.with_jid,
      message => Msg#mam_message.message}.
177
178 %% ----------------------------------------------------------------------
179 %% REMOVE ARCHIVE
180
%% @doc CQL statement that deletes the messages stored for one
%% `(user_jid, with_jid)' pair.
remove_archive_query_cql() ->
    "DELETE FROM mam_message "
    "WHERE user_jid = ? AND with_jid = ?".
183
%% @doc CQL statement that deletes the offset hints stored for one
%% `(user_jid, with_jid)' pair.
%%
%% Offset hints are written to `mam_message_offset' (see
%% insert_offset_hint_query_cql/0), so that is the table to clean up here.
%% Targeting `mam_message' would merely repeat `remove_archive_query' and
%% leave the hints behind after the archive is removed.
remove_archive_offsets_query_cql() ->
    "DELETE FROM mam_message_offset WHERE user_jid = ? AND with_jid = ?".
186
%% @doc CQL statement listing the distinct `(user_jid, with_jid)' pairs
%% stored in `mam_message' for one user.
select_for_removal_query_cql() ->
    "SELECT DISTINCT user_jid, with_jid FROM mam_message "
    "WHERE user_jid = ?".
189
%% @doc `mam_remove_archive' hook handler: removes the archive of UserJID and
%% hands the accumulator back unchanged.
remove_archive(Acc, HostType, _UserID, UserJID) ->
    %% Only the side effect matters; the fold result is deliberately dropped
    _ = remove_archive(HostType, UserJID),
    Acc.
193
%% @doc Removes every archived message and offset hint of UserJID.
%% The `(user_jid, with_jid)' pairs are folded over with
%% `select_for_removal_query' and each batch of rows is passed to both
%% delete queries.
remove_archive(HostType, UserJID) ->
    Pool = pool_name(HostType),
    Selector = #{user_jid => mod_mam_utils:bare_jid(UserJID)},
    %% cql_write/5 (not the async variant) is used so that we wait until deleted
    RemoveRows = fun(Rows, _AccIn) -> delete_archive_rows(Pool, UserJID, Rows) end,
    mongoose_cassandra:cql_foldl(Pool, UserJID, ?MODULE,
                                 select_for_removal_query, Selector, RemoveRows, []).

%% Runs both delete queries for one batch of `(user_jid, with_jid)' rows.
delete_archive_rows(Pool, UserJID, Rows) ->
    mongoose_cassandra:cql_write(Pool, UserJID, ?MODULE,
                                 remove_archive_query, Rows),
    mongoose_cassandra:cql_write(Pool, UserJID, ?MODULE,
                                 remove_archive_offsets_query, Rows).
209 %% ----------------------------------------------------------------------
210 %% SELECT MESSAGES
211
%% @doc `mam_lookup_messages' hook handler.
%%
%% An `{error, _}' accumulator is returned untouched. A request carrying a
%% binary `search_text' is rejected with `{error, 'not-supported'}'.
%% Otherwise the lookup is delegated to lookup_messages2/10, and any exception
%% raised there is converted into `{error, {Reason, Stacktrace}}'.
%%
%% The spec lists the error results as well, because three of the code paths
%% below return `{error, _}'.
-spec lookup_messages(Result :: any(), HostType :: host_type(), Params :: map()) ->
    {ok, mod_mam:lookup_result()} | {error, term()}.
lookup_messages({error, _Reason} = Result, _HostType, _Params) ->
    Result;
lookup_messages(_Result, _HostType, #{search_text := <<_/binary>>}) ->
    {error, 'not-supported'};
lookup_messages(_Result, HostType,
                #{owner_jid := UserJID, rsm := RSM, borders := Borders,
                  start_ts := Start, end_ts := End, with_jid := WithJID,
                  search_text := undefined, page_size := PageSize,
                  is_simple := IsSimple}) ->
    try
        lookup_messages2(pool_name(HostType), HostType,
                         UserJID, RSM, Borders,
                         Start, End, WithJID,
                         PageSize, IsSimple)
    catch _Type:Reason:Stacktrace ->
        {error, {Reason, Stacktrace}}
    end.
231
%% @doc Builds the filter for a lookup and dispatches to the matching paging
%% implementation. UserJID must be a `#jid{}' record.
lookup_messages2(PoolName, HostType, UserJID = #jid{}, RSM, Borders,
                 Start, End, WithJID, PageSize, true) ->
    %% Simple mode: neither the offset nor the total count is calculated
    Filter = prepare_filter(UserJID, Borders, Start, End, WithJID),
    lookup_messages_simple(PoolName, HostType, UserJID, RSM, PageSize, Filter);
lookup_messages2(PoolName, HostType, UserJID = #jid{}, RSM, Borders,
                 Start, End, WithJID, PageSize, _IsSimple) ->
    %% Full mode: the offset and the total count are calculated as well.
    %% The RDBMS code cannot be reused because "LIMIT X, Y" is not supported by
    %% Cassandra. Not all of these queries are optimal; you may want to disable
    %% some of them in production once you know how the DB is going to be queried.
    Strategy = rsm_to_strategy(RSM),
    Filter = prepare_filter(UserJID, Borders, Start, End, WithJID),
    Lookup = case Strategy of
                 last_page -> fun lookup_messages_last_page/6;
                 by_offset -> fun lookup_messages_by_offset/6;
                 first_page -> fun lookup_messages_first_page/6;
                 before_id -> fun lookup_messages_before_id/6;
                 after_id -> fun lookup_messages_after_id/6
             end,
    Lookup(PoolName, HostType, UserJID, RSM, PageSize, Filter).
261
%% @doc Chooses the paging strategy for an RSM request.
%% `undefined' (no RSM at all) and any `#rsm_in{}' not covered by a more
%% specific rule both mean `first_page'.
rsm_to_strategy(undefined) ->
    first_page;
rsm_to_strategy(#rsm_in{direction = Direction, id = Id, index = Index}) ->
    rsm_fields_to_strategy(Direction, Id, Index).

%% Rules are tried top to bottom, so their order matters.
rsm_fields_to_strategy(before, undefined, _Index) ->
    last_page;
rsm_fields_to_strategy(undefined, _Id, 0) ->
    first_page;
rsm_fields_to_strategy(undefined, _Id, Index) when is_integer(Index) ->
    by_offset;
rsm_fields_to_strategy(before, Id, _Index) when is_integer(Id) ->
    before_id;
rsm_fields_to_strategy(aft, Id, _Index) when is_integer(Id) ->
    after_id;
rsm_fields_to_strategy(_Direction, _Id, _Index) ->
    first_page.
276
%% @doc Simple lookup: fetches one page of rows without calculating the total
%% count or the offset (both are reported as `undefined').
lookup_messages_simple(PoolName, HostType, UserJID, RSM, PageSize, Filter) ->
    {PageFilter, ReverseLimit} = simple_page_filter(PoolName, UserJID, RSM, Filter),
    MessageRows = extract_messages(PoolName, UserJID, HostType, PageFilter,
                                   PageSize, ReverseLimit),
    {ok, {undefined, undefined, rows_to_uniform_format(HostType, MessageRows)}}.

%% Narrows Filter according to the RSM request and tells whether the page has
%% to be read with the reverse (descending) query.
simple_page_filter(_PoolName, _UserJID, #rsm_in{direction = aft, id = ID}, Filter) ->
    %% Rows strictly after ID
    {after_id(ID, Filter), false};
simple_page_filter(_PoolName, _UserJID, #rsm_in{direction = before, id = ID}, Filter) ->
    %% Rows strictly before ID, taken from the end of the result set
    {before_id(ID, Filter), true};
simple_page_filter(PoolName, UserJID, #rsm_in{direction = undefined, index = Offset},
                   Filter) ->
    %% Apply offset. POTENTIALLY SLOW AND NOT SIMPLE :)
    StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
    {from_id(StartId, Filter), false};
simple_page_filter(_PoolName, _UserJID, _RSM, Filter) ->
    {Filter, false}.
302
%% @doc Looks up the last page of the result set (RSM `before' without an id).
%% Returns `{ok, {TotalCount, Offset, Messages}}'.
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined},
                          0, Filter) ->
    %% Empty page requested: only the count is needed
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Total, []}};
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined},
                          PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, true),
    Fetched = length(Rows),
    Offset =
        if
            Fetched < PageSize ->
                %% The whole result set fits into this page
                0;
            true ->
                %% Count the rows preceding the first row of the page
                FirstID = row_to_message_id(hd(Rows)),
                calc_count(PoolName, UserJID, HostType, before_id(FirstID, Filter))
        end,
    {ok, {Offset + Fetched, Offset, rows_to_uniform_format(HostType, Rows)}}.
325
%% @doc Looks up a page that starts at the given zero-based offset (RSM index).
%% Returns `{ok, {TotalCount, Offset, Messages}}'.
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset},
                          0, Filter) when is_integer(Offset) ->
    %% Empty page requested: only the count is needed
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Offset, []}};
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset},
                          PageSize, Filter) when is_integer(Offset) ->
    %% POTENTIALLY SLOW: the offset has to be translated into a message id first
    StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
    Rows = extract_messages(PoolName, UserJID, HostType, from_id(StartId, Filter),
                            PageSize, false),
    Fetched = length(Rows),
    Remaining =
        if
            Fetched < PageSize ->
                %% Short page: nothing can follow it
                0;
            true ->
                LastID = row_to_message_id(lists:last(Rows)),
                calc_count(PoolName, UserJID, HostType, after_id(LastID, Filter))
        end,
    {ok, {Offset + Fetched + Remaining, Offset,
          rows_to_uniform_format(HostType, Rows)}}.
350
%% @doc Looks up the first page of the result set; the offset is always 0.
%% Returns `{ok, {TotalCount, 0, Messages}}'.
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, 0, Filter) ->
    %% Empty page requested: just count
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, 0, []}};
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, false),
    Fetched = length(Rows),
    Remaining =
        if
            Fetched < PageSize ->
                %% Total number of messages is less than one page
                0;
            true ->
                LastID = row_to_message_id(lists:last(Rows)),
                calc_count(PoolName, UserJID, HostType, after_id(LastID, Filter))
        end,
    {ok, {Fetched + Remaining, 0, rows_to_uniform_format(HostType, Rows)}}.
370
%% @doc Looks up the page that precedes message ID.
%% One row more than the page size is requested and the upper bound includes
%% ID itself (to_id/2); the result is then handed to
%% mod_mam_utils:check_for_item_not_found/3.
lookup_messages_before_id(PoolName, HostType, UserJID,
                          RSM = #rsm_in{direction = before, id = ID},
                          PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    Rows = extract_messages(PoolName, UserJID, HostType, to_id(ID, Filter),
                            PageSize + 1, true),
    Messages = rows_to_uniform_format(HostType, Rows),
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, {Total, Offset, Messages}).
380
%% @doc Looks up the page that follows message ID.
%% One row more than the page size is requested and the lower bound includes
%% ID itself (from_id/2); the result is then handed to
%% mod_mam_utils:check_for_item_not_found/3.
lookup_messages_after_id(PoolName, HostType, UserJID,
                         RSM = #rsm_in{direction = aft, id = ID},
                         PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    Rows = extract_messages(PoolName, UserJID, HostType, from_id(ID, Filter),
                            PageSize + 1, false),
    Messages = rows_to_uniform_format(HostType, Rows),
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, {Total, Offset, Messages}).
390
391
%% @doc Restricts the filter to ids strictly greater than ID by combining
%% `ID + 1' with the current `start_id' through mod_mam_utils:maybe_max/2.
after_id(ID, #mam_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = mod_mam_utils:maybe_max(ID + 1, CurrentStart),
    Filter#mam_ca_filter{start_id = NewStart}.
394
%% @doc Restricts the filter to ids strictly less than ID by combining
%% `ID - 1' with the current `end_id' through mod_mam_utils:maybe_min/2.
%% An `undefined' ID leaves the filter as it is.
before_id(undefined, Filter) ->
    Filter;
before_id(ID, #mam_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = mod_mam_utils:maybe_min(ID - 1, CurrentEnd),
    Filter#mam_ca_filter{end_id = NewEnd}.
399
%% @doc Restricts the filter to ids up to and including ID by combining ID
%% with the current `end_id' through mod_mam_utils:maybe_min/2.
to_id(ID, #mam_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = mod_mam_utils:maybe_min(ID, CurrentEnd),
    Filter#mam_ca_filter{end_id = NewEnd}.
402
%% @doc Restricts the filter to ids starting at and including ID by combining
%% ID with the current `start_id' through mod_mam_utils:maybe_max/2.
from_id(ID, #mam_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = mod_mam_utils:maybe_max(ID, CurrentStart),
    Filter#mam_ca_filter{start_id = NewStart}.
405
406
%% @doc Converts every fetched row with row_to_uniform_format/2, keeping
%% the order of the rows.
rows_to_uniform_format(HostType, MessageRows) ->
    lists:map(fun(Row) -> row_to_uniform_format(HostType, Row) end, MessageRows).
409
%% @doc Converts one row map into a map with the keys `id', `jid' (the
%% sender JID parsed from its binary form) and `packet' (the decoded message).
row_to_uniform_format(HostType, #{from_jid := SenderBin, message := Stored, id := RowID}) ->
    Sender = jid:from_binary(SenderBin),
    Decoded = stored_binary_to_packet(HostType, Stored),
    #{id => RowID, jid => Sender, packet => Decoded}.
414
%% @doc Extracts the message id from a row map.
row_to_message_id(#{id := RowID} = _Row) ->
    RowID.
417
%% @doc `get_mam_pm_gdpr_data' hook handler.
%% Fetches the rows stored for the user under the empty `with_jid' and
%% prepends them, converted by rows_to_gdpr_mam_message/2, to the accumulator.
-spec get_mam_pm_gdpr_data(ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
                           host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_pm_gdpr_data().
get_mam_pm_gdpr_data(Acc, HostType, JID) ->
    UserBin = jid:to_binary(jid:to_lower(JID)),
    Selector = #{user_jid => UserBin, with_jid => <<>>},
    Rows = fetch_user_messages(pool_name(HostType), JID, Selector),
    lists:map(fun(Row) -> rows_to_gdpr_mam_message(HostType, Row) end, Rows) ++ Acc.
427
%% @doc Converts one row map into `{Id, FromJid, PacketBinary}', where the
%% stored message is decoded and then serialised with exml:to_binary/1.
rows_to_gdpr_mam_message(HostType, #{message := Stored, id := RowID, from_jid := Sender}) ->
    Packet = stored_binary_to_packet(HostType, Stored),
    {RowID, Sender, exml:to_binary(Packet)}.
430
%% @doc Fetches at most IMax rows matching Filter, ordered by ascending id.
%%
%% Offset is not supported here; use the filter's id bounds instead.
%% The queries select the columns `id', `from_jid' and `message'; the rest of
%% this module consumes each row as a map with those keys.
%%
%% With `ReverseLimit = true' the rows are read with the descending query, so
%% the limit keeps the LAST IMax rows, and the result is reversed back into
%% ascending order. A limit of 0 returns `[]' without querying, which is why
%% IMax is specified as `non_neg_integer()'.
-spec extract_messages(PoolName, UserJID, HostType, Filter, IMax, ReverseLimit) ->
    [Row] when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      IMax :: non_neg_integer(),
      ReverseLimit :: boolean(),
      Row :: mongoose_cassandra:row().
extract_messages(_PoolName, _UserJID, _HostType, _Filter, 0, _ReverseLimit) ->
    [];
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, false) ->
    QueryName = {extract_messages_query, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
    Rows;
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, true) ->
    QueryName = {extract_messages_r_query, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
    lists:reverse(Rows).
456
%% @doc Reads the rows selected by FilterMap with `fetch_user_messages_query'.
%% Crashes with a badmatch unless the read returns `{ok, Rows}'.
fetch_user_messages(PoolName, UserJID, FilterMap) ->
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                             fetch_user_messages_query, FilterMap),
    Rows.
461
462
%% @doc Counts the rows of the result set whose id is less than or equal
%% to MessID (the filter's upper bound is narrowed with to_id/2).
%% @end
-spec calc_index(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_index(PoolName, UserJID, HostType, Filter, MessID) ->
    UpToMessID = to_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, UpToMessID).
478
%% @doc Counts the rows of the result set that precede the passed element,
%% i.e. whose id is strictly less than MessID.
%%
%% The element with the passed id does not have to exist (it may already
%% have been deleted).
%% @end
-spec calc_before(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_before(PoolName, UserJID, HostType, Filter, MessID) ->
    BeforeMessID = before_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, BeforeMessID).
493
494
%% @doc Returns the size of the result set selected by Filter.
%% Runs the `calc_count_query' variant matching the filter's bounds and
%% expects exactly one row carrying a `count' key.
-spec calc_count(PoolName, UserJID, HostType, Filter) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      Count :: non_neg_integer().
calc_count(PoolName, UserJID, _HostType, Filter) ->
    QueryName = {calc_count_query, select_filter(Filter)},
    {ok, [#{count := Count}]} =
        mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName,
                                    eval_filter_params(Filter)),
    Count.
509
%% @doc Converts a zero-based offset into the id of the row at that offset.
%% Returns `undefined' if there are not enough rows.
%% Offsets up to 100 are always calculated directly; for bigger offsets a
%% previously saved offset hint is used, when available, to shorten the scan.
-spec offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0, Offset =< 100 ->
    calc_offset_to_start_id(PoolName, UserJID, Filter, Offset);
offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0 ->
    HintParams = (eval_filter_params(Filter))#{offset => Offset},
    %% Try to find an already calculated nearby offset to reduce the query size
    case mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                     prev_offset_query, HintParams) of
        {ok, []} ->
            %% No hints, just calculate the offset sloooowly
            StartId = calc_offset_to_start_id(PoolName, UserJID, Filter, Offset),
            maybe_save_offset_hint(PoolName, UserJID, Filter, 0, Offset, StartId);
        {ok, [#{offset := Offset, id := HintId}]} ->
            %% The hint was saved for exactly this offset (Offset is already bound)
            HintId;
        {ok, [#{offset := HintOffset, id := HintId}]} ->
            %% Offset hint found, scan only from the hinted id onwards.
            %% NOTE(review): the lower bound is inclusive (`id >= :start_id') and
            %% calc_offset_to_start_id/4 returns the row at the given relative
            %% index, so the row at HintId is itself at index 0. Confirm that the
            %% extra `+ 1' below is intended and not an off-by-one.
            StartId = calc_offset_to_start_id(PoolName, UserJID,
                                              Filter#mam_ca_filter{start_id = HintId},
                                              Offset - HintOffset + 1),
            maybe_save_offset_hint(PoolName, UserJID, Filter, HintOffset, Offset, StartId)
    end.
541
%% @doc Saves an offset hint for future use in order to speed up queries with
%% a similar offset. The hint is saved only if the new offset is more than 50
%% entries away from the hint it was derived from, and never for an
%% `undefined' StartId. The given StartId is always returned as a passthrough.
-spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), UserJID :: jid:jid(),
                             Filter :: filter(), HintOffset :: non_neg_integer(),
                             NewOffset :: non_neg_integer(),
                             StartId :: non_neg_integer() | undefined) ->
    StartId :: non_neg_integer() | undefined.
maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset, undefined) ->
    undefined;
maybe_save_offset_hint(PoolName, UserJID, Filter, HintOffset, NewOffset, StartId) ->
    abs(NewOffset - HintOffset) > 50
        andalso save_offset_hint(PoolName, UserJID, Filter, NewOffset, StartId),
    StartId.

%% Writes the `NewOffset -> StartId' hint for the filter's user and interlocutor.
save_offset_hint(PoolName, UserJID, Filter, NewOffset, StartId) ->
    #mam_ca_filter{user_jid = FUserJID, with_jid = FWithJID} = Filter,
    HintRow = #{user_jid => FUserJID, with_jid => FWithJID,
                offset => NewOffset, id => StartId},
    mongoose_cassandra:cql_write(PoolName, UserJID, ?MODULE,
                                 insert_offset_hint_query, [HintRow]).
564
%% @doc Converts a zero-based offset into the id of the row at that offset by
%% listing the first `Offset + 1' ids and taking the last one returned.
%% Returns `undefined' if the query returns no rows.
-spec calc_offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
calc_offset_to_start_id(PoolName, UserJID, Filter, Offset)
  when is_integer(Offset), Offset >= 0 ->
    Limited = (eval_filter_params(Filter))#{'[limit]' => Offset + 1},
    {ok, IdRows} =
        mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                    {list_message_ids_query, select_filter(Filter)},
                                    Limited),
    case IdRows of
        [] ->
            undefined;
        [_ | _] ->
            maps:get(id, lists:last(IdRows))
    end.
583
%% @doc CQL statement that looks up one saved `offset -> message id' hint
%% whose offset does not exceed the requested one.
prev_offset_query_cql() ->
    "SELECT id, offset FROM mam_message_offset "
    "WHERE user_jid = ? and with_jid = ? and offset <= ? LIMIT 1".
588
%% @doc CQL statement that stores one `offset -> message id' hint.
insert_offset_hint_query_cql() ->
    "INSERT INTO mam_message_offset(user_jid, with_jid, id, offset) "
    "VALUES(?, ?, ?, ?)".
592
%% @doc Builds the `#mam_ca_filter{}' for a lookup: the bare owner JID, the
%% interlocutor JID and the id range derived from the borders and time range.
prepare_filter(UserJID, Borders, Start, End, WithJID) ->
    OwnerBin = mod_mam_utils:bare_jid(UserJID),
    {StartID, EndID} = mod_mam_utils:calculate_msg_id_borders(Borders, Start, End),
    %% with_jid is NOT an optional field: a missing JID becomes the empty binary
    WithBin = maybe_full_jid(WithJID),
    prepare_filter_params(OwnerBin, WithBin, StartID, EndID).
598
%% @doc Packs the already encoded filter values into a `#mam_ca_filter{}'.
%% `remote_jid' keeps its record default.
prepare_filter_params(BUserJID, BWithJID, StartID, EndID) ->
    #mam_ca_filter{user_jid = BUserJID, with_jid = BWithJID,
                   start_id = StartID, end_id = EndID}.
606
%% @doc Turns a filter into the query parameter map. `user_jid' and
%% `with_jid' are always present; `start_id' and `end_id' are added only
%% when they are not `undefined'.
eval_filter_params(#mam_ca_filter{user_jid = BUserJID, with_jid = BWithJID,
                                  start_id = StartID, end_id = EndID}) ->
    Mandatory = #{user_jid => BUserJID, with_jid => BWithJID},
    AddIfDefined = fun({_Key, undefined}, Params) -> Params;
                      ({Key, Value}, Params) -> Params#{Key => Value}
                   end,
    lists:foldl(AddIfDefined, Mandatory, [{start_id, StartID}, {end_id, EndID}]).
616
%% @doc Names the query variant needed for the id bounds set in the filter.
select_filter(#mam_ca_filter{} = Filter) ->
    select_filter(Filter#mam_ca_filter.start_id, Filter#mam_ca_filter.end_id).
622
623
%% @doc Names the query variant for a pair of id bounds, depending on which
%% of them are set (i.e. are not `undefined').
-spec select_filter(StartID, EndID) ->
    all | 'end' | start | start_end when
      StartID :: integer() | undefined,
      EndID :: integer() | undefined.
select_filter(StartID, EndID) ->
    case {StartID =:= undefined, EndID =:= undefined} of
        {true, true} -> all;
        {true, false} -> 'end';
        {false, true} -> start;
        {false, false} -> start_end
    end.
636
%% @doc Builds the CQL fragment for the id bounds. A bound that is
%% `undefined' contributes nothing; the lower bound always comes first.
prepare_filter_cql(StartID, EndID) ->
    lower_bound_cql(StartID) ++ upper_bound_cql(EndID).

%% Fragment for the inclusive lower id bound.
lower_bound_cql(undefined) -> "";
lower_bound_cql(_StartID) -> " AND id >= :start_id".

%% Fragment for the inclusive upper id bound.
upper_bound_cql(undefined) -> "";
upper_bound_cql(_EndID) -> " AND id <= :end_id".
646
%% @doc Lists every `{FilterName, CQLFragment}' combination of the id bounds.
%% The integer 0 merely stands for "this bound is set".
filter_to_cql() ->
    BoundCombinations = [{undefined, undefined},
                         {undefined, 0},
                         {0, undefined},
                         {0, 0}],
    lists:map(fun({StartID, EndID}) ->
                      {select_filter(StartID, EndID), prepare_filter_cql(StartID, EndID)}
              end, BoundCombinations).
650
%% @doc Calculates the offset of the requested page within the result set.
%% Any request not covered by a more specific rule has offset 0.
-spec calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
    when
      PoolName :: mongoose_cassandra:pool_name(),
      UserJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      PageSize :: non_neg_integer(),
      TotalCount :: non_neg_integer(),
      RSM :: jlib:rsm_in() | undefined,
      Offset :: non_neg_integer().
calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM) ->
    case RSM of
        #rsm_in{direction = before, id = undefined} ->
            %% Requesting the last page in a result set
            max(0, TotalCount - PageSize);
        #rsm_in{direction = before, id = ID} when is_integer(ID) ->
            %% One page back from the number of rows preceding ID
            max(0, calc_before(PoolName, UserJID, HostType, Filter, ID) - PageSize);
        #rsm_in{direction = aft, id = ID} when is_integer(ID) ->
            calc_index(PoolName, UserJID, HostType, Filter, ID);
        _ ->
            0
    end.
672
%% @doc Encodes the optional interlocutor JID for use as the `with_jid' key.
%% A missing (`undefined') JID is encoded as the empty binary, otherwise the
%% lower-cased JID is converted to its binary form. The result is therefore
%% always a binary, which the spec now reflects (it used to claim that
%% `undefined' could be returned).
-spec maybe_full_jid(undefined | jid:jid()) -> binary().
maybe_full_jid(undefined) ->
    <<>>;
maybe_full_jid(JID) ->
    jid:to_binary(jid:to_lower(JID)).
677
678 %%====================================================================
679 %% Internal CQL part
680 %%====================================================================
681
%% @doc Ascending extract query for every filter variant, named
%% `{extract_messages_query, FilterName}'.
extract_messages_queries() ->
    lists:map(fun({FilterName, FilterCQL}) ->
                      {{extract_messages_query, FilterName}, extract_messages_cql(FilterCQL)}
              end, filter_to_cql()).
685
%% @doc Descending extract query for every filter variant, named
%% `{extract_messages_r_query, FilterName}'.
extract_messages_r_queries() ->
    lists:map(fun({FilterName, FilterCQL}) ->
                      {{extract_messages_r_query, FilterName},
                       extract_messages_r_cql(FilterCQL)}
              end, filter_to_cql()).
689
%% @doc Count query for every filter variant, named
%% `{calc_count_query, FilterName}'.
calc_count_queries() ->
    lists:map(fun({FilterName, FilterCQL}) ->
                      {{calc_count_query, FilterName}, calc_count_cql(FilterCQL)}
              end, filter_to_cql()).
693
%% @doc Id-listing query for every filter variant, named
%% `{list_message_ids_query, FilterName}'.
list_message_ids_queries() ->
    lists:map(fun({FilterName, FilterCQL}) ->
                      {{list_message_ids_query, FilterName}, list_message_ids_cql(FilterCQL)}
              end, filter_to_cql()).
697
%% @doc CQL for reading messages in ascending id order; Filter is the
%% id-bounds fragment inserted before the ORDER BY clause.
extract_messages_cql(Filter) ->
    lists:append(["SELECT id, from_jid, message FROM mam_message "
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter,
                  " ORDER BY id LIMIT ?"]).
702
%% @doc CQL for reading messages in descending id order; Filter is the
%% id-bounds fragment inserted before the ORDER BY clause.
extract_messages_r_cql(Filter) ->
    lists:append(["SELECT id, from_jid, message FROM mam_message "
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter,
                  " ORDER BY id DESC LIMIT ?"]).
707
%% @doc CQL for counting messages; Filter is the id-bounds fragment
%% appended to the WHERE clause.
calc_count_cql(Filter) ->
    lists:append(["SELECT COUNT(*) FROM mam_message "
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter]).
711
%% @doc CQL for listing message ids in ascending order; Filter is the
%% id-bounds fragment inserted before the ORDER BY clause.
list_message_ids_cql(Filter) ->
    lists:append(["SELECT id FROM mam_message "
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter,
                  " ORDER BY id LIMIT ?"]).
716
%% @doc CQL for reading all messages of one `(user_jid, with_jid)' pair in
%% ascending id order, without a limit.
fetch_user_messages_cql() ->
    "SELECT id, from_jid, message FROM mam_message WHERE user_jid = ? "
    "AND with_jid = ? ORDER BY id".
721
722
723
724 %% ----------------------------------------------------------------------
725 %% Optimizations
726
%% @doc Encodes a packet for storage by calling `encode/1' of the module
%% configured as `db_message_format' for the host type.
packet_to_stored_binary(HostType, Packet) ->
    (db_message_format(HostType)):encode(Packet).
731
%% @doc Decodes a stored message by calling `decode/1' of the module
%% configured as `db_message_format' for the host type.
stored_binary_to_packet(HostType, Bin) ->
    (db_message_format(HostType)):decode(Bin).
736
737 %% ----------------------------------------------------------------------
738 %% Params getters
739
%% @doc Reads the `db_message_format' option of this module for the host type;
%% the value is used as the module that encodes and decodes stored packets.
-spec db_message_format(HostType :: host_type()) -> module().
db_message_format(HostType) ->
    OptionName = db_message_format,
    gen_mod:get_module_opt(HostType, ?MODULE, OptionName).
743
%% @doc Name of the Cassandra pool to use. The host type is currently
%% ignored: every host type uses the `default' pool.
-spec pool_name(HostType :: host_type()) -> term().
pool_name(_HostType) -> default.
Line Hits Source