./ct_report/coverage/mod_mam_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc Cassandra backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10 -behaviour(gen_mod).
11 -behaviour(mongoose_module_metrics).
12
13 %% ----------------------------------------------------------------------
14 %% Exports
15
16 %% gen_mod handlers
17 -export([start/2, stop/1, hooks/1]).
18
19 %% MAM hook handlers
20 -export([archive_size/3,
21 archive_message/3,
22 lookup_messages/3,
23 remove_archive/3]).
24
25 %% mongoose_cassandra callbacks
26 -export([prepared_queries/0]).
27
28 %gdpr
29 -export([get_mam_pm_gdpr_data/3]).
30
31 %% ----------------------------------------------------------------------
32 %% Imports
33
34 %% Other
35 -import(mod_mam_utils,
36 [maybe_min/2,
37 maybe_max/2,
38 bare_jid/1,
39 full_jid/1
40 ]).
41
42 -include("jlib.hrl").
43 -include("mongoose_rsm.hrl").
44
45 -callback encode(binary()) -> binary().
46 -callback decode(binary()) -> binary().
47
48 -record(mam_ca_filter, {
49 user_jid,
50 with_jid,
51 remote_jid,
52 start_id,
53 end_id
54 }).
55
56 -record(mam_message, {
57 id :: non_neg_integer(),
58 user_jid :: binary(),
59 remote_jid :: binary() | undefined,
60 from_jid :: binary() | undefined,
61 with_jid = <<>> :: binary(),
62 message :: binary() | undefined
63 }).
64
65 %% ----------------------------------------------------------------------
66 %% Types
67
68 -type filter() :: #mam_ca_filter{}.
69 -type message_id() :: non_neg_integer().
70 -type host_type() :: mongooseim:host_type().
71
72 %% ----------------------------------------------------------------------
73 %% gen_mod callbacks
74 %% Starting and stopping functions for users' archives
75
%% @doc gen_mod start callback. Performs no work and always returns `ok'.
-spec start(host_type(), gen_mod:module_opts()) -> ok.
start(_, _) -> ok.
79
%% @doc gen_mod stop callback. Performs no work and always returns `ok'.
-spec stop(host_type()) -> ok.
stop(_) -> ok.
83
84 %% ----------------------------------------------------------------------
85 %% Add hooks for mod_mam_pm
86
%% @doc Lists the hook handlers of this module for `HostType'.
%% Every entry carries an empty extra map and priority 50.
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
    Handlers = [{mam_archive_message, fun ?MODULE:archive_message/3},
                {mam_archive_size, fun ?MODULE:archive_size/3},
                {mam_lookup_messages, fun ?MODULE:lookup_messages/3},
                {mam_remove_archive, fun ?MODULE:remove_archive/3},
                {get_mam_pm_gdpr_data, fun ?MODULE:get_mam_pm_gdpr_data/3}],
    [{HookName, HostType, Handler, #{}, 50} || {HookName, Handler} <- Handlers].
94
95 %% ----------------------------------------------------------------------
96 %% mongoose_cassandra_worker callbacks
97
%% @doc Returns every named CQL statement used by this module as
%% `{QueryName, CQL}' pairs: the fixed statements first, then the
%% per-filter variants of the extract/count/list queries.
prepared_queries() ->
    Fixed = [{insert_offset_hint_query, insert_offset_hint_query_cql()},
             {prev_offset_query, prev_offset_query_cql()},
             {insert_query, insert_query_cql()},
             {fetch_user_messages_query, fetch_user_messages_cql()},
             {select_for_removal_query, select_for_removal_query_cql()},
             {remove_archive_query, remove_archive_query_cql()},
             {remove_archive_offsets_query, remove_archive_offsets_query_cql()}],
    lists:append([Fixed,
                  extract_messages_queries(),
                  extract_messages_r_queries(),
                  calc_count_queries(),
                  list_message_ids_queries()]).
112
113 %% ----------------------------------------------------------------------
114 %% Internal functions and callbacks
115
%% @doc `mam_archive_size' hook handler: counts all rows of the owner's archive.
%% The incoming accumulator must be an integer and is replaced by the count.
-spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: integer(),
    Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
    Extra :: gen_hook:extra().
archive_size(Size, #{owner := UserJID}, #{host_type := HostType}) when is_integer(Size) ->
    %% No borders, no time range, no correspondent and no message id.
    Filter = prepare_filter(UserJID, undefined, undefined, undefined, undefined, undefined),
    Count = calc_count(pool_name(HostType), UserJID, HostType, Filter),
    {ok, Count}.
124
125
126 %% ----------------------------------------------------------------------
127 %% INSERT MESSAGE
128
%% @doc CQL statement inserting one row into `mam_message'.
insert_query_cql() ->
    "INSERT INTO mam_message"
    " (id, user_jid, from_jid, remote_jid, with_jid, message)"
    " VALUES (?, ?, ?, ?, ?, ?)".
133
%% @doc `mam_archive_message' hook handler.
%% Any exception raised while archiving is converted into `{error, Reason}'
%% and returned as the new accumulator instead of crashing the hook chain.
-spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: ok,
    Params :: mod_mam:archive_message_params(),
    Extra :: gen_hook:extra().
archive_message(_Result, Params, #{host_type := HostType}) ->
    try archive_message2(Params, HostType) of
        Written -> {ok, Written}
    catch
        _Class:Reason -> {ok, {error, Reason}}
    end.
144
%% @doc Builds the rows for one archived packet and writes them.
%% The same message is stored once per distinct `with_jid' value among
%% the empty binary, the remote full JID and the remote bare JID.
archive_message2(#{message_id := MessID, local_jid := LocJID, remote_jid := RemJID,
                   source_jid := SrcJID, packet := Packet}, HostType) ->
    Owner = bare_jid(LocJID),
    RemoteBare = bare_jid(RemJID),
    RemoteFull = full_jid(RemJID),
    Sender = full_jid(SrcJID),
    Stored = packet_to_stored_binary(HostType, Packet),
    Template = #mam_message{id = MessID,
                            user_jid = Owner,
                            from_jid = Sender,
                            remote_jid = RemoteFull,
                            message = Stored},
    %% usort drops duplicates, e.g. when the remote JID has no resource.
    Rows = lists:map(fun(With) -> Template#mam_message{with_jid = With} end,
                     lists:usort([<<>>, RemoteFull, RemoteBare])),
    write_messages(HostType, LocJID, Rows).
165
%% @doc Converts the message records to parameter maps and hands them to
%% mongoose_cassandra:cql_write_async/5 with the `insert_query' statement.
write_messages(HostType, UserJID, Messages) ->
    Pool = pool_name(HostType),
    Rows = lists:map(fun message_to_params/1, Messages),
    mongoose_cassandra:cql_write_async(Pool, UserJID, ?MODULE, insert_query, Rows).
170
%% @doc Turns a `#mam_message{}' record into the parameter map keyed by column name.
message_to_params(#mam_message{} = Msg) ->
    #{id => Msg#mam_message.id,
      user_jid => Msg#mam_message.user_jid,
      from_jid => Msg#mam_message.from_jid,
      remote_jid => Msg#mam_message.remote_jid,
      with_jid => Msg#mam_message.with_jid,
      message => Msg#mam_message.message}.
181
182 %% ----------------------------------------------------------------------
183 %% REMOVE ARCHIVE
184
%% @doc CQL statement deleting all messages of one `(user_jid, with_jid)' pair.
remove_archive_query_cql() ->
    "DELETE FROM mam_message"
    " WHERE user_jid = ? AND with_jid = ?".
187
%% @doc CQL statement deleting the offset hints of one `(user_jid, with_jid)' pair.
%% Hints are stored in `mam_message_offset' (see the insert/select hint
%% statements), so that is the table that has to be cleared here.
remove_archive_offsets_query_cql() ->
    "DELETE FROM mam_message_offset WHERE user_jid = ? AND with_jid = ?".
190
%% @doc CQL statement listing the distinct `(user_jid, with_jid)' pairs of a user.
select_for_removal_query_cql() ->
    "SELECT DISTINCT user_jid, with_jid"
    " FROM mam_message WHERE user_jid = ?".
193
%% @doc `mam_remove_archive' hook handler.
%% Removes the owner's archive and passes the accumulator through unchanged;
%% the result of the removal itself is discarded.
-spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: term(),
    Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
    Extra :: gen_hook:extra().
remove_archive(Acc, #{owner := Owner}, #{host_type := HostType}) ->
    remove_archive(HostType, Owner),
    {ok, Acc}.
201
%% @doc Deletes the archive of `UserJID'.
%% Folds over the result pages of `select_for_removal_query' and, for each
%% page of rows, runs the message delete followed by the offset-hint delete.
remove_archive(HostType, UserJID) ->
    Pool = pool_name(HostType),
    Selector = #{user_jid => bare_jid(UserJID)},
    %% Synchronous writes: wait until deleted.
    DeletePage =
        fun(Rows, _Acc) ->
            mongoose_cassandra:cql_write(Pool, UserJID, ?MODULE,
                                         remove_archive_query, Rows),
            mongoose_cassandra:cql_write(Pool, UserJID, ?MODULE,
                                         remove_archive_offsets_query, Rows)
        end,
    mongoose_cassandra:cql_foldl(Pool, UserJID, ?MODULE,
                                 select_for_removal_query, Selector, DeletePage, []).
217 %% ----------------------------------------------------------------------
218 %% SELECT MESSAGES
219
%% @doc `mam_lookup_messages' hook handler.
%% An incoming error accumulator is passed through; full-text search
%% (a binary `search_text') is answered with `not-supported'. Otherwise the
%% lookup runs and any exception is returned as `{error, {Reason, Stacktrace}}'.
-spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: {ok, mod_mam:lookup_result()} | {error, term()},
    Params :: mam_iq:lookup_params(),
    Extra :: gen_hook:extra().
lookup_messages({error, _} = Failure, _Params, _Extra) ->
    {ok, Failure};
lookup_messages(_Acc, #{search_text := <<_/binary>>}, _Extra) ->
    {ok, {error, 'not-supported'}};
lookup_messages(_Acc,
                #{owner_jid := UserJID, rsm := RSM, borders := Borders,
                  start_ts := Start, end_ts := End, with_jid := WithJID,
                  search_text := undefined, page_size := PageSize,
                  is_simple := IsSimple, message_id := MsgID},
                #{host_type := HostType}) ->
    try lookup_messages2(pool_name(HostType), HostType, UserJID, RSM, Borders,
                         Start, End, WithJID, PageSize, MsgID, IsSimple) of
        Found -> {ok, Found}
    catch
        _Class:Reason:Stacktrace -> {ok, {error, {Reason, Stacktrace}}}
    end.
242
%% @doc Runs a lookup either in "simple" mode (no offset or total count is
%% calculated) or in full mode, where the RSM request selects the strategy.
lookup_messages2(PoolName, HostType, #jid{} = UserJID, RSM, Borders,
                 Start, End, WithJID, PageSize, MsgID, true) ->
    %% Simple query without calculating offset and total count
    SimpleFilter = prepare_filter(UserJID, Borders, Start, End, WithJID, MsgID),
    lookup_messages_simple(PoolName, HostType, UserJID, RSM, PageSize, SimpleFilter);
lookup_messages2(PoolName, HostType, #jid{} = UserJID, RSM, Borders,
                 Start, End, WithJID, PageSize, MsgID, _IsSimple) ->
    %% Query with offset calculation.
    %% We cannot just use RDBMS code because "LIMIT X, Y" is not supported by Cassandra.
    %% Not all queries are optimal. You would like to disable something for production
    %% once you know how you will call the db.
    Lookup = case rsm_to_strategy(RSM) of
                 last_page -> fun lookup_messages_last_page/6;
                 by_offset -> fun lookup_messages_by_offset/6;
                 first_page -> fun lookup_messages_first_page/6;
                 before_id -> fun lookup_messages_before_id/6;
                 after_id -> fun lookup_messages_after_id/6
             end,
    Filter = prepare_filter(UserJID, Borders, Start, End, WithJID, MsgID),
    Lookup(PoolName, HostType, UserJID, RSM, PageSize, Filter).
272
%% @doc Maps an RSM request (or `undefined') to the lookup strategy atom.
%% The case clauses are tried top to bottom, so their order matters.
rsm_to_strategy(undefined) ->
    first_page;
rsm_to_strategy(#rsm_in{direction = Direction, id = Id, index = Index}) ->
    case {Direction, Id, Index} of
        {before, undefined, _} -> last_page;
        {undefined, _, 0} -> first_page;
        {undefined, _, _} when is_integer(Index) -> by_offset;
        {before, _, _} when is_integer(Id) -> before_id;
        {aft, _, _} when is_integer(Id) -> after_id;
        _ -> first_page
    end.
287
%% @doc Simple lookup: returns one page of messages with total count and
%% offset both reported as `undefined'.
%% The RSM request only decides how the filter is narrowed and whether the
%% page is read from the end of the result set.
lookup_messages_simple(PoolName, HostType, UserJID, RSM, PageSize, Filter) ->
    {PageFilter, FromEnd} =
        case RSM of
            #rsm_in{direction = aft, id = ID} ->
                {after_id(ID, Filter), false};
            #rsm_in{direction = before, id = ID} ->
                %% Get last rows from result set
                {before_id(ID, Filter), true};
            #rsm_in{direction = undefined, index = Offset} ->
                %% Apply offset. POTENTIALLY SLOW AND NOT SIMPLE :)
                StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
                {from_id(StartId, Filter), false};
            _ ->
                {Filter, false}
        end,
    Rows = extract_messages(PoolName, UserJID, HostType, PageFilter, PageSize, FromEnd),
    {ok, {undefined, undefined, rows_to_uniform_format(HostType, Rows)}}.
313
%% @doc Last-page lookup (`before' with no id).
%% With a page size of 0 only the total is counted. Otherwise the last
%% `PageSize' rows are read; a short page means it is the whole result set,
%% so the offset is 0, else the rows before the first returned one are counted.
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined}, 0, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Total, []}};
lookup_messages_last_page(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = undefined}, PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, true),
    RowCount = length(Rows),
    Offset =
        case RowCount < PageSize of
            true ->
                0;
            false ->
                FirstID = row_to_message_id(hd(Rows)),
                calc_count(PoolName, UserJID, HostType, before_id(FirstID, Filter))
        end,
    {ok, {Offset + RowCount, Offset, rows_to_uniform_format(HostType, Rows)}}.
336
%% @doc Lookup by RSM index.
%% With a page size of 0 only the total is counted. Otherwise the page starts
%% at the id found for `Offset'; when the page is full, the rows after its
%% last id are counted to complete the total.
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset}, 0, Filter)
  when is_integer(Offset) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, Offset, []}};
lookup_messages_by_offset(PoolName, HostType, UserJID,
                          #rsm_in{direction = undefined, index = Offset}, PageSize, Filter)
  when is_integer(Offset) ->
    %% POTENTIALLY SLOW
    StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset),
    Rows = extract_messages(PoolName, UserJID, HostType, from_id(StartId, Filter),
                            PageSize, false),
    RowCount = length(Rows),
    Remaining =
        case RowCount < PageSize of
            true ->
                0;
            false ->
                LastID = row_to_message_id(lists:last(Rows)),
                calc_count(PoolName, UserJID, HostType, after_id(LastID, Filter))
        end,
    {ok, {Offset + RowCount + Remaining, Offset, rows_to_uniform_format(HostType, Rows)}}.
361
%% @doc First-page lookup; the reported offset is always 0.
%% With a page size of 0 only the total is counted. A short page is the
%% whole result set; for a full page the rows after its last id are counted.
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, 0, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    {ok, {Total, 0, []}};
lookup_messages_first_page(PoolName, HostType, UserJID, _RSM, PageSize, Filter) ->
    Rows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, false),
    RowCount = length(Rows),
    Remaining =
        case RowCount < PageSize of
            true ->
                %% Total number of messages is less than one page
                0;
            false ->
                LastID = row_to_message_id(lists:last(Rows)),
                calc_count(PoolName, UserJID, HostType, after_id(LastID, Filter))
        end,
    {ok, {RowCount + Remaining, 0, rows_to_uniform_format(HostType, Rows)}}.
381
%% @doc Lookup of the page before a given message id.
%% One row more than `PageSize' is read, with ids up to and including `ID';
%% mod_mam_utils:check_for_item_not_found/3 post-processes the result.
lookup_messages_before_id(PoolName, HostType, UserJID,
                          #rsm_in{direction = before, id = ID} = RSM,
                          PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    Rows = extract_messages(PoolName, UserJID, HostType, to_id(ID, Filter),
                            PageSize + 1, true),
    Page = rows_to_uniform_format(HostType, Rows),
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, {Total, Offset, Page}).
391
%% @doc Lookup of the page after a given message id.
%% One row more than `PageSize' is read, starting at `ID' itself;
%% mod_mam_utils:check_for_item_not_found/3 post-processes the result.
lookup_messages_after_id(PoolName, HostType, UserJID,
                         #rsm_in{direction = aft, id = ID} = RSM,
                         PageSize, Filter) ->
    Total = calc_count(PoolName, UserJID, HostType, Filter),
    Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, Total, RSM),
    Rows = extract_messages(PoolName, UserJID, HostType, from_id(ID, Filter),
                            PageSize + 1, false),
    Page = rows_to_uniform_format(HostType, Rows),
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, {Total, Offset, Page}).
401
402
%% @doc Narrows the filter's start id to `ID + 1', combined with the
%% existing start id through maybe_max/2.
after_id(ID, #mam_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = maybe_max(ID + 1, CurrentStart),
    Filter#mam_ca_filter{start_id = NewStart}.
405
%% @doc Narrows the filter's end id to `ID - 1', combined with the existing
%% end id through maybe_min/2. An `undefined' id leaves the filter untouched.
before_id(undefined, Filter) ->
    Filter;
before_id(ID, #mam_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = maybe_min(ID - 1, CurrentEnd),
    Filter#mam_ca_filter{end_id = NewEnd}.
410
%% @doc Narrows the filter's end id to `ID' itself, combined with the
%% existing end id through maybe_min/2.
to_id(ID, #mam_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = maybe_min(ID, CurrentEnd),
    Filter#mam_ca_filter{end_id = NewEnd}.
413
%% @doc Narrows the filter's start id to `ID' itself, combined with the
%% existing start id through maybe_max/2.
from_id(ID, #mam_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = maybe_max(ID, CurrentStart),
    Filter#mam_ca_filter{start_id = NewStart}.
416
417
%% @doc Converts every fetched row with row_to_uniform_format/2, keeping the order.
rows_to_uniform_format(HostType, MessageRows) ->
    lists:map(fun(Row) -> row_to_uniform_format(HostType, Row) end, MessageRows).
420
%% @doc Converts one fetched row into a map with keys `id', `jid' and `packet':
%% the sender JID is parsed from its binary form and the stored message is decoded.
row_to_uniform_format(HostType, #{from_jid := BinSender, message := Stored, id := Id}) ->
    Sender = jid:from_binary(BinSender),
    Decoded = stored_binary_to_packet(HostType, Stored),
    #{id => Id, jid => Sender, packet => Decoded}.
425
%% @doc Returns the value stored under `id' in a fetched row.
row_to_message_id(#{id := Id}) -> Id.
428
%% @doc `get_mam_pm_gdpr_data' hook handler.
%% Fetches the rows stored for the lower-cased JID with an empty `with_jid'
%% and prepends the resulting entries to the accumulator.
-spec get_mam_pm_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
    Params :: #{jid := jid:jid()},
    Extra :: gen_hook:extra().
get_mam_pm_gdpr_data(Acc, #{jid := JID}, #{host_type := HostType}) ->
    Owner = jid:to_binary(jid:to_lower(JID)),
    Query = #{user_jid => Owner, with_jid => <<>>},
    Rows = fetch_user_messages(pool_name(HostType), JID, Query),
    Entries = lists:map(fun(Row) -> rows_to_gdpr_mam_message(HostType, Row) end, Rows),
    {ok, Entries ++ Acc}.
439
%% @doc Converts one fetched row into `{Id, FromJid, PacketBinary}', where the
%% stored message is decoded and then serialised with exml:to_binary/1.
rows_to_gdpr_mam_message(HostType, #{message := Stored, id := Id, from_jid := Sender}) ->
    Packet = stored_binary_to_packet(HostType, Stored),
    {Id, Sender, exml:to_binary(Packet)}.
442
%% @doc Reads at most `IMax' rows matching `Filter'.
%%
%% Offset is not supported.
%% Each returned row is a map; the selected columns are
%% `id', `from_jid' and `message'.
%% A limit of 0 returns `[]' without querying. With `ReverseLimit = true'
%% the query orders by id descending (so the limit applies from the end of
%% the result set) and the rows are reversed before they are returned.
-spec extract_messages(PoolName, UserJID, HostType, Filter, IMax, ReverseLimit) ->
    [Row] when
    PoolName :: mongoose_cassandra:pool_name(),
    UserJID :: jid:jid(),
    HostType :: host_type(),
    Filter :: filter(),
    IMax :: non_neg_integer(),
    ReverseLimit :: boolean(),
    Row :: mongoose_cassandra:row().
extract_messages(_PoolName, _UserJID, _HostType, _Filter, 0, _ReverseLimit) ->
    [];
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, false) ->
    QueryName = {extract_messages_query, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
    Rows;
extract_messages(PoolName, UserJID, _HostType, Filter, IMax, true) ->
    QueryName = {extract_messages_r_query, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
    lists:reverse(Rows).
468
%% @doc Runs `fetch_user_messages_query' with the given parameter map and
%% returns the rows; crashes with badmatch unless the read returns `{ok, Rows}'.
fetch_user_messages(PoolName, UserJID, FilterMap) ->
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                             fetch_user_messages_query, FilterMap),
    Rows.
473
474
%% @doc Counts the rows matching `Filter' after its end id has been capped
%% at `MessID' (inclusive) with to_id/2.
%% @end
-spec calc_index(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
    PoolName :: mongoose_cassandra:pool_name(),
    UserJID :: jid:jid(),
    HostType :: host_type(),
    Filter :: filter(),
    MessID :: message_id(),
    Count :: non_neg_integer().
calc_index(PoolName, UserJID, HostType, Filter, MessID) ->
    UpToMessage = to_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, UpToMessage).
490
%% @doc Counts the rows of the result set that precede the given message id.
%%
%% The row with that id does not have to exist (it may have been deleted):
%% the filter's end id is simply capped at `MessID - 1' with before_id/2.
%% @end
-spec calc_before(PoolName, UserJID, HostType, Filter, MessID) -> Count
    when
    PoolName :: mongoose_cassandra:pool_name(),
    UserJID :: jid:jid(),
    HostType :: host_type(),
    Filter :: filter(),
    MessID :: message_id(),
    Count :: non_neg_integer().
calc_before(PoolName, UserJID, HostType, Filter, MessID) ->
    BeforeMessage = before_id(MessID, Filter),
    calc_count(PoolName, UserJID, HostType, BeforeMessage).
505
506
%% @doc Returns the size of the result set selected by `Filter'.
%% Runs the `calc_count_query' variant matching the filter's id borders and
%% expects exactly one row carrying a `count' key.
-spec calc_count(PoolName, UserJID, HostType, Filter) -> Count
    when
    PoolName :: mongoose_cassandra:pool_name(),
    UserJID :: jid:jid(),
    HostType :: host_type(),
    Filter :: filter(),
    Count :: non_neg_integer().
calc_count(PoolName, UserJID, _HostType, Filter) ->
    Query = {calc_count_query, select_filter(Filter)},
    Args = eval_filter_params(Filter),
    {ok, [Row]} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, Query, Args),
    #{count := Count} = Row,
    Count.
521
%% @doc Convert an offset to the id of the row at that (zero-based) position.
%% Returns `undefined' when the result set is empty.
%% Offsets up to 100 are always calculated directly. For larger offsets a
%% previously saved offset hint is used, when available, to shorten the scan.
-spec offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
    PoolName :: mongoose_cassandra:pool_name(),
    UserJID :: jid:jid(),
    Offset :: non_neg_integer(),
    Filter :: filter(),
    Id :: non_neg_integer() | undefined.
offset_to_start_id(PoolName, UserJID, Filter, Offset) when is_integer(Offset), Offset >= 0,
                                                           Offset =< 100 ->
    calc_offset_to_start_id(PoolName, UserJID, Filter, Offset);
offset_to_start_id(PoolName, UserJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
    Params = maps:put(offset, Offset, eval_filter_params(Filter)),
    %% Try to find an already calculated nearby offset to reduce query size
    case mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, prev_offset_query, Params) of
        {ok, []} ->
            %% No hints, just calculate the offset slowly
            StartId = calc_offset_to_start_id(PoolName, UserJID, Filter, Offset),
            maybe_save_offset_hint(PoolName, UserJID, Filter, 0, Offset, StartId);
        {ok, [#{offset := Offset, id := PrevId}]} ->
            %% The hint is for exactly this offset
            PrevId;
        {ok, [#{offset := PrevOffset, id := PrevId}]} ->
            %% Offset hint found, use it to reduce query size.
            %% The scan starts at the hinted row itself (`id >= :start_id'),
            %% which sits at relative position 0, so the wanted row is at
            %% relative position `Offset - PrevOffset'. The callee already
            %% turns a position into a limit by adding 1.
            StartId = calc_offset_to_start_id(PoolName, UserJID,
                                              Filter#mam_ca_filter{start_id = PrevId},
                                              Offset - PrevOffset),
            maybe_save_offset_hint(PoolName, UserJID, Filter, PrevOffset, Offset, StartId)
    end.
553
%% @doc Saves an offset hint for future use, to speed up queries with a similar offset.
%% The hint is saved only if the new offset is more than 50 entries away from
%% the offset of the hint that was used. An `undefined' start id is never saved.
%% The given `StartId' is always returned as a passthrough for convenience.
-spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), UserJID :: jid:jid(),
                             Filter :: filter(), HintOffset :: non_neg_integer(),
                             NewOffset :: non_neg_integer(),
                             StartId :: non_neg_integer() | undefined) ->
    StartId :: non_neg_integer() | undefined.
maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset, undefined) ->
    undefined;
maybe_save_offset_hint(PoolName, UserJID, Filter, HintOffset, NewOffset, StartId) ->
    case abs(NewOffset - HintOffset) of
        Distance when Distance > 50 ->
            Hint = #{user_jid => Filter#mam_ca_filter.user_jid,
                     with_jid => Filter#mam_ca_filter.with_jid,
                     offset => NewOffset,
                     id => StartId},
            mongoose_cassandra:cql_write(PoolName, UserJID, ?MODULE,
                                         insert_offset_hint_query, [Hint]);
        _ ->
            skip
    end,
    StartId.
576
%% @doc Lists message ids with a limit of `Offset + 1' and returns the id of
%% the last listed row, or `undefined' when no row was listed.
-spec calc_offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
    when
    PoolName :: mongoose_cassandra:pool_name(),
    UserJID :: jid:jid(),
    Offset :: non_neg_integer(),
    Filter :: filter(),
    Id :: non_neg_integer() | undefined.
calc_offset_to_start_id(PoolName, UserJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
    Query = {list_message_ids_query, select_filter(Filter)},
    Args = maps:put('[limit]', Offset + 1, eval_filter_params(Filter)),
    {ok, IdRows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, Query, Args),
    case IdRows of
        [] ->
            undefined;
        [_ | _] ->
            LastRow = lists:last(IdRows),
            maps:get(id, LastRow)
    end.
595
%% @doc CQL statement reading one offset -> message id 'hint' whose offset
%% does not exceed the requested one.
prev_offset_query_cql() ->
    "SELECT id, offset FROM mam_message_offset"
    " WHERE user_jid = ? and with_jid = ? and offset <= ? LIMIT 1".
600
%% @doc CQL statement storing one offset -> message id 'hint'.
insert_offset_hint_query_cql() ->
    "INSERT INTO mam_message_offset(user_jid, with_jid, id, offset)"
    " VALUES(?, ?, ?, ?)".
604
%% @doc Builds the `#mam_ca_filter{}' for a lookup.
%%
%% In Cassandra, a column cannot be restricted by both an equality and an
%% inequality relation. When `MsgID' is defined it is therefore used as both
%% the start and the end id, which means the `ids' filter effectively
%% overrides any "before" or "after" borders.
prepare_filter(UserJID, Borders, Start, End, WithJID, MsgID) ->
    Owner = bare_jid(UserJID),
    IdRange =
        case MsgID of
            undefined -> mod_mam_utils:calculate_msg_id_borders(Borders, Start, End);
            _ -> {MsgID, MsgID}
        end,
    {StartID, EndID} = IdRange,
    %% with_jid is NOT an optional field: undefined becomes the empty binary
    With = maybe_full_jid(WithJID),
    prepare_filter_params(Owner, With, StartID, EndID).
618
%% @doc Packs the four filter values into a `#mam_ca_filter{}' record.
prepare_filter_params(BUserJID, BWithJID, StartID, EndID) ->
    #mam_ca_filter{user_jid = BUserJID, with_jid = BWithJID,
                   start_id = StartID, end_id = EndID}.
626
%% @doc Turns a filter into a query parameter map.
%% `user_jid' and `with_jid' are always present; `start_id' and `end_id'
%% are included only when they are not `undefined'.
eval_filter_params(#mam_ca_filter{} = Filter) ->
    Required = #{user_jid => Filter#mam_ca_filter.user_jid,
                 with_jid => Filter#mam_ca_filter.with_jid},
    Borders = [{start_id, Filter#mam_ca_filter.start_id},
               {end_id, Filter#mam_ca_filter.end_id}],
    Defined = [{Key, Value} || {Key, Value} <- Borders, Value =/= undefined],
    maps:merge(Required, maps:from_list(Defined)).
636
%% @doc Names the query variant for a filter, based on its id borders.
select_filter(#mam_ca_filter{} = Filter) ->
    select_filter(Filter#mam_ca_filter.start_id, Filter#mam_ca_filter.end_id).
642
643
%% @doc Names the query variant by which of the two id borders are defined.
-spec select_filter(StartID, EndID) ->
    all | 'end' | start | start_end when
    StartID :: integer() | undefined,
    EndID :: integer() | undefined.
select_filter(StartID, EndID) ->
    case {StartID, EndID} of
        {undefined, undefined} -> all;
        {undefined, _} -> 'end';
        {_, undefined} -> start;
        {_, _} -> start_end
    end.
656
%% @doc Builds the extra `AND ...' conditions on `id' for the defined borders;
%% an `undefined' border contributes nothing.
prepare_filter_cql(StartID, EndID) ->
    LowerBound = case StartID of
                     undefined -> "";
                     _ -> " AND id >= :start_id"
                 end,
    UpperBound = case EndID of
                     undefined -> "";
                     _ -> " AND id <= :end_id"
                 end,
    LowerBound ++ UpperBound.
666
%% @doc Enumerates the four border combinations as `{FilterName, CQLFragment}'.
%% 0 only stands in for "some defined id"; its value is never used.
filter_to_cql() ->
    Combinations = [{undefined, undefined}, {undefined, 0}, {0, undefined}, {0, 0}],
    lists:map(fun({StartID, EndID}) ->
                      {select_filter(StartID, EndID), prepare_filter_cql(StartID, EndID)}
              end, Combinations).
670
%% @doc Calculates the offset of the requested page inside the result set.
%% Any request other than the three handled RSM shapes yields 0.
-spec calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
    when
    PoolName :: mongoose_cassandra:pool_name(),
    UserJID :: jid:jid(),
    HostType :: host_type(),
    Filter :: filter(),
    PageSize :: non_neg_integer(),
    TotalCount :: non_neg_integer(),
    RSM :: jlib:rsm_in() | undefined,
    Offset :: non_neg_integer().
calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM) ->
    case RSM of
        #rsm_in{direction = before, id = undefined} ->
            %% Requesting the Last Page in a Result Set
            max(0, TotalCount - PageSize);
        #rsm_in{direction = before, id = ID} when is_integer(ID) ->
            max(0, calc_before(PoolName, UserJID, HostType, Filter, ID) - PageSize);
        #rsm_in{direction = aft, id = ID} when is_integer(ID) ->
            calc_index(PoolName, UserJID, HostType, Filter, ID);
        _ ->
            0
    end.
692
%% @doc Converts an optional JID to the binary used in the `with_jid' column.
%% `undefined' maps to the empty binary (never to `undefined'); any other
%% JID is lower-cased and serialised.
-spec maybe_full_jid(undefined | jid:jid()) -> binary().
maybe_full_jid(undefined) -> <<>>;
maybe_full_jid(JID) ->
    jid:to_binary(jid:to_lower(JID)).
697
698 %%====================================================================
699 %% Internal SQL part
700 %%====================================================================
701
%% @doc One ascending extract query per filter variant.
extract_messages_queries() ->
    lists:map(fun({FilterName, Conditions}) ->
                      {{extract_messages_query, FilterName}, extract_messages_cql(Conditions)}
              end, filter_to_cql()).
705
%% @doc One descending (reverse) extract query per filter variant.
extract_messages_r_queries() ->
    lists:map(fun({FilterName, Conditions}) ->
                      {{extract_messages_r_query, FilterName}, extract_messages_r_cql(Conditions)}
              end, filter_to_cql()).
709
%% @doc One count query per filter variant.
calc_count_queries() ->
    lists:map(fun({FilterName, Conditions}) ->
                      {{calc_count_query, FilterName}, calc_count_cql(Conditions)}
              end, filter_to_cql()).
713
%% @doc One id-listing query per filter variant.
list_message_ids_queries() ->
    lists:map(fun({FilterName, Conditions}) ->
                      {{list_message_ids_query, FilterName}, list_message_ids_cql(Conditions)}
              end, filter_to_cql()).
717
%% @doc CQL selecting messages in ascending id order, with the given extra
%% conditions spliced in before the ordering and limit.
extract_messages_cql(Filter) ->
    lists:append(["SELECT id, from_jid, message FROM mam_message ",
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter,
                  " ORDER BY id LIMIT ?"]).
722
%% @doc CQL selecting messages in descending id order, with the given extra
%% conditions spliced in before the ordering and limit.
extract_messages_r_cql(Filter) ->
    lists:append(["SELECT id, from_jid, message FROM mam_message ",
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter,
                  " ORDER BY id DESC LIMIT ?"]).
727
%% @doc CQL counting messages, with the given extra conditions appended.
calc_count_cql(Filter) ->
    lists:append(["SELECT COUNT(*) FROM mam_message ",
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter]).
731
%% @doc CQL listing message ids in ascending order, with the given extra
%% conditions spliced in before the ordering and limit.
list_message_ids_cql(Filter) ->
    lists:append(["SELECT id FROM mam_message ",
                  "WHERE user_jid = ? AND with_jid = ? ",
                  Filter,
                  " ORDER BY id LIMIT ?"]).
736
%% @doc CQL selecting all messages of one `(user_jid, with_jid)' pair by ascending id.
fetch_user_messages_cql() ->
    "SELECT id, from_jid, message FROM mam_message"
    " WHERE user_jid = ? AND with_jid = ?"
    " ORDER BY id".
741
742
743
744 %% ----------------------------------------------------------------------
745 %% Optimizations
746
%% @doc Encodes a packet for storage with the message-format module
%% configured for the host type (a module implementing the mam_message behaviour).
packet_to_stored_binary(HostType, Packet) ->
    mam_message:encode(db_message_format(HostType), Packet).
751
%% @doc Decodes a stored binary with the message-format module configured
%% for the host type (a module implementing the mam_message behaviour).
stored_binary_to_packet(HostType, Bin) ->
    mam_message:decode(db_message_format(HostType), Bin).
756
757 %% ----------------------------------------------------------------------
758 %% Params getters
759
%% @doc Reads this module's `db_message_format' option for the host type.
-spec db_message_format(HostType :: host_type()) -> module().
db_message_format(HostType) ->
    Format = gen_mod:get_module_opt(HostType, ?MODULE, db_message_format),
    Format.
763
%% @doc Name of the pool to use; always `default', whatever the host type.
-spec pool_name(HostType :: host_type()) -> term().
pool_name(_) -> default.
Line Hits Source