./ct_report/coverage/mod_mam_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc RDBMS backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10 -behaviour(gen_mod).
11 -behaviour(mongoose_module_metrics).
12
13 %% ----------------------------------------------------------------------
14 %% Exports
15
16 %% gen_mod handlers
17 -export([start/2, stop/1, hooks/1]).
18
19 %% MAM hook handlers
20 -export([archive_size/3,
21 archive_message/3,
22 lookup_messages/3,
23 remove_archive/3]).
24
25 %% mongoose_cassandra callbacks
26 -export([prepared_queries/0]).
27
28 %gdpr
29 -export([get_mam_pm_gdpr_data/3]).
30
31 %% ----------------------------------------------------------------------
32 %% Imports
33
34 %% Other
35 -import(mod_mam_utils,
36 [maybe_min/2,
37 maybe_max/2,
38 bare_jid/1,
39 full_jid/1
40 ]).
41
42 -include("mongoose.hrl").
43 -include("jlib.hrl").
44 -include("mongoose_rsm.hrl").
45
46 -callback encode(binary()) -> binary().
47 -callback decode(binary()) -> binary().
48
49 -record(mam_ca_filter, {
50 user_jid,
51 with_jid,
52 remote_jid,
53 start_id,
54 end_id
55 }).
56
57 -record(mam_message, {
58 id :: non_neg_integer(),
59 user_jid :: binary(),
60 remote_jid :: binary() | undefined,
61 from_jid :: binary() | undefined,
62 with_jid = <<>> :: binary(),
63 message :: binary() | undefined
64 }).
65
66 %% ----------------------------------------------------------------------
67 %% Types
68
69 -type filter() :: #mam_ca_filter{}.
70 -type message_id() :: non_neg_integer().
71 -type host_type() :: mongooseim:host_type().
72
73 %% ----------------------------------------------------------------------
74 %% gen_mod callbacks
75 %% Starting and stopping functions for users' archives
76
77 -spec start(host_type(), gen_mod:module_opts()) -> ok.
78 start(_HostType, _Opts) ->
79
:-(
ok.
80
81 -spec stop(host_type()) -> ok.
82 stop(_HostType) ->
83
:-(
ok.
84
85 %% ----------------------------------------------------------------------
86 %% Add hooks for mod_mam_pm
87
88 -spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
89 hooks(HostType) ->
90
:-(
[{mam_archive_message, HostType, fun ?MODULE:archive_message/3, #{}, 50},
91 {mam_archive_size, HostType, fun ?MODULE:archive_size/3, #{}, 50},
92 {mam_lookup_messages, HostType, fun ?MODULE:lookup_messages/3, #{}, 50},
93 {mam_remove_archive, HostType, fun ?MODULE:remove_archive/3, #{}, 50},
94 {get_mam_pm_gdpr_data, HostType, fun ?MODULE:get_mam_pm_gdpr_data/3, #{}, 50}].
95
96 %% ----------------------------------------------------------------------
97 %% mongoose_cassandra_worker callbacks
98
99 prepared_queries() ->
100 [
101 {insert_offset_hint_query, insert_offset_hint_query_cql()},
102 {prev_offset_query, prev_offset_query_cql()},
103 {insert_query, insert_query_cql()},
104 {fetch_user_messages_query, fetch_user_messages_cql()},
105 {select_for_removal_query, select_for_removal_query_cql()},
106 {remove_archive_query, remove_archive_query_cql()},
107 {remove_archive_offsets_query, remove_archive_offsets_query_cql()}
108 ]
109
:-(
++ extract_messages_queries()
110 ++ extract_messages_r_queries()
111 ++ calc_count_queries()
112 ++ list_message_ids_queries().
113
114 %% ----------------------------------------------------------------------
115 %% Internal functions and callbacks
116
117 -spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
118 Acc :: integer(),
119 Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
120 Extra :: gen_hook:extra().
121 archive_size(Size, #{owner := UserJID}, #{host_type := HostType}) when is_integer(Size) ->
122
:-(
Borders = Start = End = WithJID = undefined,
123
:-(
Filter = prepare_filter(UserJID, Borders, Start, End, WithJID, undefined),
124
:-(
{ok, calc_count(pool_name(HostType), UserJID, HostType, Filter)}.
125
126
127 %% ----------------------------------------------------------------------
128 %% INSERT MESSAGE
129
130 insert_query_cql() ->
131
:-(
"INSERT INTO mam_message "
132 "(id, user_jid, from_jid, remote_jid, with_jid, message) "
133 "VALUES (?, ?, ?, ?, ?, ?)".
134
135 -spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
136 Acc :: ok,
137 Params :: mod_mam:archive_message_params(),
138 Extra :: gen_hook:extra().
139 archive_message(_Result, Params, #{host_type := HostType}) ->
140
:-(
try
141
:-(
{ok, archive_message2(Params, HostType)}
142 catch _Type:Reason:StackTrace ->
143
:-(
mongoose_instrument:execute(mod_mam_pm_dropped, #{host_type => HostType}, #{count => 1}),
144
:-(
?LOG_ERROR(#{what => archive_message_failed,
145 host_type => HostType, mam_params => Params,
146
:-(
reason => Reason, stacktrace => StackTrace}),
147
:-(
{ok, {error, Reason}}
148 end.
149
150 archive_message2(#{message_id := MessID,
151 local_jid := LocJID,
152 remote_jid := RemJID,
153 source_jid := SrcJID,
154 packet := Packet}, HostType) ->
155
:-(
BLocJID = bare_jid(LocJID),
156
:-(
BRemBareJID = bare_jid(RemJID),
157
:-(
BRemFullJID = full_jid(RemJID),
158
:-(
BSrcJID = full_jid(SrcJID),
159
:-(
BPacket = packet_to_stored_binary(HostType, Packet),
160
:-(
Message = #mam_message{
161 id = MessID,
162 user_jid = BLocJID,
163 from_jid = BSrcJID,
164 remote_jid = BRemFullJID,
165 message = BPacket
166 },
167
:-(
WithJIDs = lists:usort([<<>>, BRemFullJID, BRemBareJID]),
168
:-(
Messages = [Message#mam_message{with_jid = BWithJID} || BWithJID <- WithJIDs],
169
:-(
ok = write_messages(HostType, LocJID, Messages).
170
171 write_messages(HostType, UserJID, Messages) ->
172
:-(
MultiParams = [message_to_params(M) || M <- Messages],
173
:-(
mongoose_cassandra:cql_write_async(pool_name(HostType), UserJID,
174 ?MODULE, insert_query, MultiParams).
175
176 message_to_params(#mam_message{
177 id = MessID,
178 user_jid = BLocJID,
179 from_jid = BSrcJID,
180 remote_jid = BRemJID,
181 with_jid = BWithJID,
182 message = BPacket
183 }) ->
184
:-(
#{id => MessID, user_jid => BLocJID, from_jid => BSrcJID,
185 remote_jid => BRemJID, with_jid => BWithJID, message => BPacket}.
186
187 %% ----------------------------------------------------------------------
188 %% REMOVE ARCHIVE
189
190 remove_archive_query_cql() ->
191
:-(
"DELETE FROM mam_message WHERE user_jid = ? AND with_jid = ?".
192
193 remove_archive_offsets_query_cql() ->
194
:-(
"DELETE FROM mam_message WHERE user_jid = ? AND with_jid = ?".
195
196 select_for_removal_query_cql() ->
197
:-(
"SELECT DISTINCT user_jid, with_jid FROM mam_message WHERE user_jid = ?".
198
199 -spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
200 Acc :: term(),
201 Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
202 Extra :: gen_hook:extra().
203 remove_archive(Acc, #{owner := UserJID}, #{host_type := HostType}) ->
204
:-(
remove_archive(HostType, UserJID),
205
:-(
{ok, Acc}.
206
207 remove_archive(HostType, UserJID) ->
208
:-(
PoolName = pool_name(HostType),
209
:-(
BUserJID = bare_jid(UserJID),
210
:-(
Params = #{user_jid => BUserJID},
211 %% Wait until deleted
212
:-(
DeleteFun =
213 fun(Rows, _AccIn) ->
214
:-(
mongoose_cassandra:cql_write(PoolName, UserJID, ?MODULE,
215 remove_archive_query, Rows),
216
:-(
mongoose_cassandra:cql_write(PoolName, UserJID, ?MODULE,
217 remove_archive_offsets_query, Rows)
218 end,
219
220
:-(
mongoose_cassandra:cql_foldl(PoolName, UserJID, ?MODULE,
221 select_for_removal_query, Params, DeleteFun, []).
222 %% ----------------------------------------------------------------------
223 %% SELECT MESSAGES
224
225 -spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
226 Acc :: {ok, mod_mam:lookup_result()} | {error, term()},
227 Params :: mam_iq:lookup_params(),
228 Extra :: gen_hook:extra().
229 lookup_messages({error, _Reason} = Result, _Params, _Extra) ->
230
:-(
{ok, Result};
231 lookup_messages(_Result, #{search_text := <<_/binary>>}, _Extra) ->
232
:-(
{ok, {error, 'not-supported'}};
233 lookup_messages(_Result,
234 #{owner_jid := UserJID, rsm := RSM, borders := Borders,
235 start_ts := Start, end_ts := End, with_jid := WithJID,
236 search_text := undefined, page_size := PageSize,
237 is_simple := IsSimple, message_id := MsgID},
238 #{host_type := HostType}) ->
239
:-(
try
240
:-(
{ok, lookup_messages2(pool_name(HostType), HostType,
241 UserJID, RSM, Borders,
242 Start, End, WithJID,
243 PageSize, MsgID, IsSimple)}
244 catch _Type:Reason:S ->
245
:-(
{ok, {error, {Reason, S}}}
246 end.
247
248 lookup_messages2(PoolName, HostType,
249 UserJID = #jid{}, RSM, Borders,
250 Start, End, WithJID,
251 PageSize, MsgID, _IsSimple = true) ->
252 %% Simple query without calculating offset and total count
253
:-(
Filter = prepare_filter(UserJID, Borders, Start, End, WithJID, MsgID),
254
:-(
lookup_messages_simple(PoolName, HostType, UserJID, RSM, PageSize, Filter);
255 lookup_messages2(PoolName, HostType,
256 UserJID = #jid{}, RSM, Borders,
257 Start, End, WithJID,
258 PageSize, MsgID, _IsSimple) ->
259 %% Query with offset calculation
260 %% We cannot just use RDBMS code because "LIMIT X, Y" is not supported by cassandra
261 %% Not all queries are optimal. You would like to disable something for production
262 %% once you know how you will call bd
263
:-(
Strategy = rsm_to_strategy(RSM),
264
:-(
Filter = prepare_filter(UserJID, Borders, Start, End, WithJID, MsgID),
265
:-(
case Strategy of
266 last_page ->
267
:-(
lookup_messages_last_page(PoolName, HostType, UserJID, RSM, PageSize, Filter);
268 by_offset ->
269
:-(
lookup_messages_by_offset(PoolName, HostType, UserJID, RSM, PageSize, Filter);
270 first_page ->
271
:-(
lookup_messages_first_page(PoolName, HostType, UserJID, RSM, PageSize, Filter);
272 before_id ->
273
:-(
lookup_messages_before_id(PoolName, HostType, UserJID, RSM, PageSize, Filter);
274 after_id ->
275
:-(
lookup_messages_after_id(PoolName, HostType, UserJID, RSM, PageSize, Filter)
276 end.
277
278 rsm_to_strategy(#rsm_in{direction = before, id = undefined}) ->
279
:-(
last_page;
280 rsm_to_strategy(#rsm_in{direction = undefined, index = 0}) ->
281
:-(
first_page;
282 rsm_to_strategy(#rsm_in{direction = undefined, index = Offset}) when is_integer(Offset) ->
283
:-(
by_offset;
284 rsm_to_strategy(#rsm_in{direction = before, id = Id}) when is_integer(Id) ->
285
:-(
before_id;
286 rsm_to_strategy(#rsm_in{direction = aft, id = Id}) when is_integer(Id) ->
287
:-(
after_id;
288 rsm_to_strategy(#rsm_in{}) ->
289
:-(
first_page;
290 rsm_to_strategy(undefined) ->
291
:-(
first_page.
292
293 lookup_messages_simple(PoolName, HostType, UserJID,
294 #rsm_in{direction = aft, id = ID},
295 PageSize, Filter) ->
296 %% Get last rows from result set
297
:-(
MessageRows = extract_messages(PoolName, UserJID, HostType, after_id(ID, Filter), PageSize, false),
298
:-(
{ok, {undefined, undefined, rows_to_uniform_format(HostType, MessageRows)}};
299 lookup_messages_simple(PoolName, HostType, UserJID,
300 #rsm_in{direction = before, id = ID},
301 PageSize, Filter) ->
302
:-(
MessageRows = extract_messages(PoolName, UserJID, HostType, before_id(ID, Filter), PageSize, true),
303
:-(
{ok, {undefined, undefined, rows_to_uniform_format(HostType, MessageRows)}};
304 lookup_messages_simple(PoolName, HostType, UserJID,
305 #rsm_in{direction = undefined, index = Offset},
306 PageSize, Filter) ->
307 %% Apply offset
308
:-(
StartId = offset_to_start_id(PoolName, UserJID, Filter,
309 Offset), %% POTENTIALLY SLOW AND NOT SIMPLE :)
310
:-(
MessageRows = extract_messages(PoolName, UserJID, HostType, from_id(StartId, Filter), PageSize,
311 false),
312
:-(
{ok, {undefined, undefined, rows_to_uniform_format(HostType, MessageRows)}};
313 lookup_messages_simple(PoolName, HostType, UserJID,
314 _,
315 PageSize, Filter) ->
316
:-(
MessageRows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, false),
317
:-(
{ok, {undefined, undefined, rows_to_uniform_format(HostType, MessageRows)}}.
318
319 lookup_messages_last_page(PoolName, HostType, UserJID,
320 #rsm_in{direction = before, id = undefined},
321 0, Filter) ->
322 %% Last page
323
:-(
TotalCount = calc_count(PoolName, UserJID, HostType, Filter),
324
:-(
{ok, {TotalCount, TotalCount, []}};
325 lookup_messages_last_page(PoolName, HostType, UserJID,
326 #rsm_in{direction = before, id = undefined},
327 PageSize, Filter) ->
328 %% Last page
329
:-(
MessageRows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, true),
330
:-(
MessageRowsCount = length(MessageRows),
331
:-(
case MessageRowsCount < PageSize of
332 true ->
333
:-(
{ok, {MessageRowsCount, 0,
334 rows_to_uniform_format(HostType, MessageRows)}};
335 false ->
336
:-(
FirstID = row_to_message_id(hd(MessageRows)),
337
:-(
Offset = calc_count(PoolName, UserJID, HostType, before_id(FirstID, Filter)),
338
:-(
{ok, {Offset + MessageRowsCount, Offset,
339 rows_to_uniform_format(HostType, MessageRows)}}
340 end.
341
342 lookup_messages_by_offset(PoolName, HostType, UserJID,
343 #rsm_in{direction = undefined, index = Offset},
344 0, Filter) when is_integer(Offset) ->
345 %% By offset
346
:-(
TotalCount = calc_count(PoolName, UserJID, HostType, Filter),
347
:-(
{ok, {TotalCount, Offset, []}};
348 lookup_messages_by_offset(PoolName, HostType, UserJID,
349 #rsm_in{direction = undefined, index = Offset},
350 PageSize, Filter) when is_integer(Offset) ->
351 %% By offset
352
:-(
StartId = offset_to_start_id(PoolName, UserJID, Filter, Offset), %% POTENTIALLY SLOW
353
:-(
MessageRows = extract_messages(PoolName, UserJID, HostType, from_id(StartId, Filter), PageSize,
354 false),
355
:-(
MessageRowsCount = length(MessageRows),
356
:-(
case MessageRowsCount < PageSize of
357 true ->
358
:-(
{ok, {Offset + MessageRowsCount, Offset,
359 rows_to_uniform_format(HostType, MessageRows)}};
360 false ->
361
:-(
LastID = row_to_message_id(lists:last(MessageRows)),
362
:-(
CountAfterLastID = calc_count(PoolName, UserJID, HostType, after_id(LastID, Filter)),
363
:-(
{ok, {Offset + MessageRowsCount + CountAfterLastID, Offset,
364 rows_to_uniform_format(HostType, MessageRows)}}
365 end.
366
367 lookup_messages_first_page(PoolName, HostType, UserJID, _, 0, Filter) ->
368 %% First page, just count
369
:-(
TotalCount = calc_count(PoolName, UserJID, HostType, Filter),
370
:-(
{ok, {TotalCount, 0, []}};
371 lookup_messages_first_page(PoolName, HostType, UserJID, _, PageSize, Filter) ->
372 %% First page
373
:-(
MessageRows = extract_messages(PoolName, UserJID, HostType, Filter, PageSize, false),
374
:-(
MessageRowsCount = length(MessageRows),
375
:-(
case MessageRowsCount < PageSize of
376 true ->
377 %% Total number of messages is less than one page
378
:-(
{ok, {MessageRowsCount, 0,
379 rows_to_uniform_format(HostType, MessageRows)}};
380 false ->
381
:-(
LastID = row_to_message_id(lists:last(MessageRows)),
382
:-(
CountAfterLastID = calc_count(PoolName, UserJID, HostType, after_id(LastID, Filter)),
383
:-(
{ok, {MessageRowsCount + CountAfterLastID, 0,
384 rows_to_uniform_format(HostType, MessageRows)}}
385 end.
386
387 lookup_messages_before_id(PoolName, HostType, UserJID,
388 RSM = #rsm_in{direction = before, id = ID},
389 PageSize, Filter) ->
390
:-(
TotalCount = calc_count(PoolName, UserJID, HostType, Filter),
391
:-(
Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM),
392
:-(
MessageRows = extract_messages(PoolName, UserJID, HostType, to_id(ID, Filter),
393 PageSize + 1, true),
394
:-(
Result = {TotalCount, Offset, rows_to_uniform_format(HostType, MessageRows)},
395
:-(
mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
396
397 lookup_messages_after_id(PoolName, HostType, UserJID,
398 RSM = #rsm_in{direction = aft, id = ID},
399 PageSize, Filter) ->
400
:-(
TotalCount = calc_count(PoolName, UserJID, HostType, Filter),
401
:-(
Offset = calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM),
402
:-(
MessageRows = extract_messages(PoolName, UserJID, HostType, from_id(ID, Filter),
403 PageSize + 1, false),
404
:-(
Result = {TotalCount, Offset, rows_to_uniform_format(HostType, MessageRows)},
405
:-(
mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
406
407
408 after_id(ID, Filter = #mam_ca_filter{start_id = AfterID}) ->
409
:-(
Filter#mam_ca_filter{start_id = maybe_max(ID + 1, AfterID)}.
410
411 before_id(undefined, Filter) ->
412
:-(
Filter;
413 before_id(ID, Filter = #mam_ca_filter{end_id = BeforeID}) ->
414
:-(
Filter#mam_ca_filter{end_id = maybe_min(ID - 1, BeforeID)}.
415
416 to_id(ID, Filter = #mam_ca_filter{end_id = BeforeID}) ->
417
:-(
Filter#mam_ca_filter{end_id = maybe_min(ID, BeforeID)}.
418
419 from_id(ID, Filter = #mam_ca_filter{start_id = AfterID}) ->
420
:-(
Filter#mam_ca_filter{start_id = maybe_max(ID, AfterID)}.
421
422
423 rows_to_uniform_format(HostType, MessageRows) ->
424
:-(
[row_to_uniform_format(HostType, Row) || Row <- MessageRows].
425
426 row_to_uniform_format(HostType, #{from_jid := FromJID, message := Msg, id := MsgID}) ->
427
:-(
SrcJID = jid:from_binary(FromJID),
428
:-(
Packet = stored_binary_to_packet(HostType, Msg),
429
:-(
#{id => MsgID, jid => SrcJID, packet => Packet}.
430
431 row_to_message_id(#{id := MsgID}) ->
432
:-(
MsgID.
433
434 -spec get_mam_pm_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when
435 Acc :: ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
436 Params :: #{jid := jid:jid()},
437 Extra :: gen_hook:extra().
438 get_mam_pm_gdpr_data(Acc, #{jid := JID}, #{host_type := HostType}) ->
439
:-(
BinJID = jid:to_binary(jid:to_lower(JID)),
440
:-(
FilterMap = #{user_jid => BinJID, with_jid => <<"">>},
441
:-(
Rows = fetch_user_messages(pool_name(HostType), JID, FilterMap),
442
:-(
Messages = [rows_to_gdpr_mam_message(HostType, Row) || Row <- Rows],
443
:-(
{ok, Messages ++ Acc}.
444
445 rows_to_gdpr_mam_message(HostType, #{message := Data, id:= Id, from_jid:=FromJid}) ->
446
:-(
{Id, FromJid, exml:to_binary(stored_binary_to_packet(HostType, Data))}.
447
448 %% Offset is not supported
449 %% Each record is a tuple of form
450 %% `{<<"13663125233">>, <<"bob@localhost">>, <<"res1">>, <<binary>>}'.
451 %% Columns are `["id", "from_jid", "message"]'.
452 -spec extract_messages(PoolName, UserJID, HostType, Filter, IMax, ReverseLimit) ->
453 [Row] when
454 PoolName :: mongoose_cassandra:pool_name(),
455 UserJID :: jid:jid(),
456 HostType :: host_type(),
457 Filter :: filter(),
458 IMax :: pos_integer(),
459 ReverseLimit :: boolean(),
460 Row :: mongoose_cassandra:row().
461 extract_messages(_Worker, _UserJID, _HostType, _Filter, 0, _) ->
462
:-(
[];
463 extract_messages(PoolName, UserJID, _HostType, Filter, IMax, false) ->
464
:-(
QueryName = {extract_messages_query, select_filter(Filter)},
465
:-(
Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
466
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
467
:-(
Rows;
468 extract_messages(PoolName, UserJID, _HostType, Filter, IMax, true) ->
469
:-(
QueryName = {extract_messages_r_query, select_filter(Filter)},
470
:-(
Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
471
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
472
:-(
lists:reverse(Rows).
473
474 fetch_user_messages(PoolName, UserJID, FilterMap) ->
475
:-(
QueryName = fetch_user_messages_query,
476
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, FilterMap),
477
:-(
Rows.
478
479
480 %% @doc Calculate a zero-based index of the row with UID in the result test.
481 %%
482 %% If the element does not exists, the ID of the next element will
483 %% be returned instead.
484 %% @end
485 -spec calc_index(PoolName, UserJID, HostType, Filter, MessID) -> Count
486 when
487 PoolName :: mongoose_cassandra:pool_name(),
488 UserJID :: jid:jid(),
489 HostType :: host_type(),
490 Filter :: filter(),
491 MessID :: message_id(),
492 Count :: non_neg_integer().
493 calc_index(PoolName, UserJID, HostType, Filter, MessID) ->
494
:-(
calc_count(PoolName, UserJID, HostType, to_id(MessID, Filter)).
495
496 %% @doc Count of elements in RSet before the passed element.
497 %%
498 %% The element with the passed UID can be already deleted.
499 %% @end
500 -spec calc_before(PoolName, UserJID, HostType, Filter, MessID) -> Count
501 when
502 PoolName :: mongoose_cassandra:pool_name(),
503 UserJID :: jid:jid(),
504 HostType :: host_type(),
505 Filter :: filter(),
506 MessID :: message_id(),
507 Count :: non_neg_integer().
508 calc_before(PoolName, UserJID, HostType, Filter, MessID) ->
509
:-(
calc_count(PoolName, UserJID, HostType, before_id(MessID, Filter)).
510
511
512 %% @doc Get the total result set size.
513 %% "SELECT COUNT(*) as "count" FROM mam_message WHERE "
514 -spec calc_count(PoolName, UserJID, HostType, Filter) -> Count
515 when
516 PoolName :: mongoose_cassandra:pool_name(),
517 UserJID :: jid:jid(),
518 HostType :: host_type(),
519 Filter :: filter(),
520 Count :: non_neg_integer().
521 calc_count(PoolName, UserJID, _HostType, Filter) ->
522
:-(
QueryName = {calc_count_query, select_filter(Filter)},
523
:-(
Params = eval_filter_params(Filter),
524
:-(
{ok, [#{count := Count}]} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
525
:-(
Count.
526
527 %% @doc Convert offset to index of the first entry
528 %% Returns undefined if not there are not enough rows
529 %% Uses previously calculated offsets to speed up queries
530 -spec offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
531 when
532 PoolName :: mongoose_cassandra:pool_name(),
533 UserJID :: jid:jid(),
534 Offset :: non_neg_integer(),
535 Filter :: filter(),
536 Id :: non_neg_integer() | undefined.
537 offset_to_start_id(PoolName, UserJID, Filter, Offset) when is_integer(Offset), Offset >= 0,
538 Offset =< 100 ->
539
:-(
calc_offset_to_start_id(PoolName, UserJID, Filter, Offset);
540 offset_to_start_id(PoolName, UserJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
541
:-(
Params = maps:put(offset, Offset, eval_filter_params(Filter)),
542 %% Try to find already calculated nearby offset to reduce query size
543
:-(
case mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, prev_offset_query, Params) of
544 {ok, []} -> %% No hints, just calculate offset sloooowly
545
:-(
StartId = calc_offset_to_start_id(PoolName, UserJID, Filter, Offset),
546
:-(
maybe_save_offset_hint(PoolName, UserJID, Filter, 0, Offset, StartId);
547 {ok, [#{offset := PrevOffset, id := PrevId}]} ->
548 %% Offset hint found, use it to reduce query size
549
:-(
case Offset of
550
:-(
PrevOffset -> PrevId;
551 _ ->
552
:-(
StartId = calc_offset_to_start_id(PoolName, UserJID,
553 Filter#mam_ca_filter{start_id = PrevId},
554 Offset - PrevOffset + 1),
555
:-(
maybe_save_offset_hint(PoolName, UserJID, Filter, PrevOffset, Offset, StartId)
556 end
557 end.
558
559 %% @doc Saves offset hint for future use in order to speed up queries with similar offset
560 %% Hint is save only if previous offset hint was 50+ entires from current query
561 %% This function returns given StartId as passthrough for convenience
562 -spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), UserJID :: jid:jid(),
563 Filter :: filter(), HintOffset :: non_neg_integer(),
564 NewOffset :: non_neg_integer(),
565 StartId :: non_neg_integer() | undefined) ->
566 StartId :: non_neg_integer() | undefined.
567 maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset,
568 StartId = undefined) ->
569
:-(
StartId;
570 maybe_save_offset_hint(PoolName, UserJID, Filter, HintOffset, NewOffset, StartId) ->
571
:-(
case abs(NewOffset - HintOffset) > 50 of
572 true ->
573
:-(
#mam_ca_filter{user_jid = FUserJID, with_jid = FWithJID} = Filter,
574
:-(
Row = #{user_jid => FUserJID, with_jid => FWithJID, offset => NewOffset, id => StartId},
575
:-(
mongoose_cassandra:cql_write(PoolName, UserJID, ?MODULE,
576 insert_offset_hint_query, [Row]);
577 false ->
578
:-(
skip
579 end,
580
:-(
StartId.
581
582 %% @doc Convert offset to index of the first entry
583 %% Returns undefined if not there are not enough rows
584 -spec calc_offset_to_start_id(PoolName, UserJID, Filter, Offset) -> Id
585 when
586 PoolName :: mongoose_cassandra:pool_name(),
587 UserJID :: jid:jid(),
588 Offset :: non_neg_integer(),
589 Filter :: filter(),
590 Id :: non_neg_integer() | undefined.
591 calc_offset_to_start_id(PoolName, UserJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
592
:-(
QueryName = {list_message_ids_query, select_filter(Filter)},
593
:-(
Params = maps:put('[limit]', Offset + 1, eval_filter_params(Filter)),
594
:-(
{ok, RowsIds} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, Params),
595
:-(
case RowsIds of
596
:-(
[] -> undefined;
597 [_ | _] ->
598
:-(
maps:get(id, lists:last(RowsIds))
599 end.
600
601 %% @doc Get closest offset -> message id 'hint' for specified offset
602 prev_offset_query_cql() ->
603
:-(
"SELECT id, offset FROM mam_message_offset WHERE user_jid = ? and with_jid = ? "
604 "and offset <= ? LIMIT 1".
605
606 %% @doc Insert offset -> message id 'hint'
607 insert_offset_hint_query_cql() ->
608
:-(
"INSERT INTO mam_message_offset(user_jid, with_jid, id, offset) VALUES(?, ?, ?, ?)".
609
610 prepare_filter(UserJID, Borders, Start, End, WithJID, MsgID) ->
611
:-(
BUserJID = bare_jid(UserJID),
612 %% In Cassandra, a column cannot be restricted by both an equality and an inequality relation.
613 %% When MsgID is defined, it is used as both StartID and EndID to comply with this limitation.
614 %% This means that the `ids` filter effectively overrides any "before" or "after" filters.
615
:-(
{StartID, EndID} = case MsgID of
616 undefined ->
617
:-(
mod_mam_utils:calculate_msg_id_borders(Borders, Start, End);
618 ID ->
619
:-(
{ID, ID}
620 end,
621
:-(
BWithJID = maybe_full_jid(WithJID), %% it's NOT optional field
622
:-(
prepare_filter_params(BUserJID, BWithJID, StartID, EndID).
623
624 prepare_filter_params(BUserJID, BWithJID, StartID, EndID) ->
625
:-(
#mam_ca_filter{
626 user_jid = BUserJID,
627 with_jid = BWithJID,
628 start_id = StartID,
629 end_id = EndID
630 }.
631
632 eval_filter_params(#mam_ca_filter{
633 user_jid = BUserJID,
634 with_jid = BWithJID,
635 start_id = StartID,
636 end_id = EndID
637 }) ->
638
:-(
Optional = maps:filter(fun(_K, V) -> V =/= undefined end,
639 #{start_id => StartID, end_id =>EndID}),
640
:-(
maps:merge(#{user_jid => BUserJID, with_jid => BWithJID}, Optional).
641
642 select_filter(#mam_ca_filter{
643 start_id = StartID,
644 end_id = EndID
645 }) ->
646
:-(
select_filter(StartID, EndID).
647
648
649 -spec select_filter(StartID, EndID) ->
650 all | 'end' | start | start_end when
651 StartID :: integer() | undefined,
652 EndID :: integer() | undefined.
653 select_filter(undefined, undefined) ->
654
:-(
all;
655 select_filter(undefined, _) ->
656
:-(
'end';
657 select_filter(_, undefined) ->
658
:-(
start;
659 select_filter(_, _) ->
660
:-(
start_end.
661
662 prepare_filter_cql(StartID, EndID) ->
663 case StartID of
664
:-(
undefined -> "";
665
:-(
_ -> " AND id >= :start_id"
666
:-(
end ++
667 case EndID of
668
:-(
undefined -> "";
669
:-(
_ -> " AND id <= :end_id"
670 end.
671
672 filter_to_cql() ->
673
:-(
[{select_filter(StartID, EndID), prepare_filter_cql(StartID, EndID)}
674
:-(
|| StartID <- [undefined, 0], EndID <- [undefined, 0]].
675
676 -spec calc_offset(PoolName, UserJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
677 when
678 PoolName :: mongoose_cassandra:pool_name(),
679 UserJID :: jid:jid(),
680 HostType :: host_type(),
681 Filter :: filter(),
682 PageSize :: non_neg_integer(),
683 TotalCount :: non_neg_integer(),
684 RSM :: jlib:rsm_in() | undefined,
685 Offset :: non_neg_integer().
686 %% Requesting the Last Page in a Result Set
687 calc_offset(_W, _UserJID, _LS, _F, PS, TC, #rsm_in{direction = before, id = undefined}) ->
688
:-(
max(0, TC - PS);
689 calc_offset(PoolName, UserJID, HostType, F, PS, _TC, #rsm_in{direction = before, id = ID})
690 when is_integer(ID) ->
691
:-(
max(0, calc_before(PoolName, UserJID, HostType, F, ID) - PS);
692 calc_offset(PoolName, UserJID, HostType, F, _PS, _TC, #rsm_in{direction = aft, id = ID})
693 when is_integer(ID) ->
694
:-(
calc_index(PoolName, UserJID, HostType, F, ID);
695 calc_offset(_W, _UserJID, _LS, _F, _PS, _TC, _RSM) ->
696
:-(
0.
697
698 -spec maybe_full_jid(undefined | jid:jid()) -> undefined | binary().
699
:-(
maybe_full_jid(undefined) -> <<>>;
700 maybe_full_jid(JID) ->
701
:-(
jid:to_binary(jid:to_lower(JID)).
702
703 %%====================================================================
704 %% Internal SQL part
705 %%====================================================================
706
707 extract_messages_queries() ->
708
:-(
[{{extract_messages_query, FilterName}, extract_messages_cql(Filter)}
709
:-(
|| {FilterName, Filter} <- filter_to_cql()].
710
711 extract_messages_r_queries() ->
712
:-(
[{{extract_messages_r_query, FilterName}, extract_messages_r_cql(Filter)}
713
:-(
|| {FilterName, Filter} <- filter_to_cql()].
714
715 calc_count_queries() ->
716
:-(
[{{calc_count_query, FilterName}, calc_count_cql(Filter)}
717
:-(
|| {FilterName, Filter} <- filter_to_cql()].
718
719 list_message_ids_queries() ->
720
:-(
[{{list_message_ids_query, FilterName}, list_message_ids_cql(Filter)}
721
:-(
|| {FilterName, Filter} <- filter_to_cql()].
722
723 extract_messages_cql(Filter) ->
724 "SELECT id, from_jid, message FROM mam_message "
725
:-(
"WHERE user_jid = ? AND with_jid = ? " ++
726 Filter ++ " ORDER BY id LIMIT ?".
727
728 extract_messages_r_cql(Filter) ->
729 "SELECT id, from_jid, message FROM mam_message "
730
:-(
"WHERE user_jid = ? AND with_jid = ? " ++
731 Filter ++ " ORDER BY id DESC LIMIT ?".
732
733 calc_count_cql(Filter) ->
734 "SELECT COUNT(*) FROM mam_message "
735
:-(
"WHERE user_jid = ? AND with_jid = ? " ++ Filter.
736
737 list_message_ids_cql(Filter) ->
738 "SELECT id FROM mam_message "
739
:-(
"WHERE user_jid = ? AND with_jid = ? " ++ Filter ++
740 " ORDER BY id LIMIT ?".
741
742 fetch_user_messages_cql() ->
743
:-(
"SELECT id, from_jid, message FROM mam_message "
744 "WHERE user_jid = ? AND with_jid = ? "
745 "ORDER BY id".
746
747
748
749 %% ----------------------------------------------------------------------
750 %% Optimizations
751
752 packet_to_stored_binary(HostType, Packet) ->
753 %% Module implementing mam_message behaviour
754
:-(
Module = db_message_format(HostType),
755
:-(
mam_message:encode(Module, Packet).
756
757 stored_binary_to_packet(HostType, Bin) ->
758 %% Module implementing mam_message behaviour
759
:-(
Module = db_message_format(HostType),
760
:-(
mam_message:decode(Module, Bin).
761
762 %% ----------------------------------------------------------------------
763 %% Params getters
764
765 -spec db_message_format(HostType :: host_type()) -> module().
766 db_message_format(HostType) ->
767
:-(
gen_mod:get_module_opt(HostType, ?MODULE, db_message_format).
768
769 -spec pool_name(HostType :: host_type()) -> term().
770 pool_name(_HostType) ->
771
:-(
default.
Line Hits Source