./ct_report/coverage/mod_mam_muc_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc Cassandra backend for MUC Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_muc_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10
11 %% gen_mod handlers
12 -export([start/2, stop/1]).
13
14 %% MAM hook handlers
15 -export([archive_size/4,
16 archive_message/3,
17 lookup_messages/3,
18 remove_archive/4]).
19
20 %% mongoose_cassandra callbacks
21 -export([prepared_queries/0, get_mam_muc_gdpr_data/3]).
22
23 -ignore_xref([
24 behaviour_info/1, remove_archive/4, start/2, stop/1
25 ]).
26
27 %% ----------------------------------------------------------------------
28 %% Imports
29
30 %% UID
31 -import(mod_mam_utils,
32 [encode_compact_uuid/2]).
33
34 %% Other
35 -import(mod_mam_utils,
36 [maybe_min/2,
37 maybe_max/2,
38 apply_start_border/2,
39 apply_end_border/2]).
40
41 -include("jlib.hrl").
42 -include("mongoose_rsm.hrl").
43
%% Query filter built by prepare_filter/5 and read by the query helpers.
%% start_id / end_id are inclusive bounds on the message id column
%% (see prepare_filter_cql/2); undefined means "no bound". They are
%% integer() rather than non_neg_integer() because before_id/2 may
%% produce ID - 1.
-record(mam_muc_ca_filter, {
          room_jid :: binary() | undefined,
          with_nick :: binary() | undefined,
          %% NOTE(review): never set or read in this module.
          nick_name :: binary() | undefined,
          start_id :: integer() | undefined,
          end_id :: integer() | undefined
         }).

%% One row of the mam_muc_message table, as written by insert_query_cql/0.
-record(mam_muc_message, {
          id :: non_neg_integer(),
          room_jid :: binary(),
          from_jid :: binary() | undefined,
          nick_name :: binary(),
          with_nick :: binary(),
          message :: binary() | undefined
         }).

%% Callbacks of the message-format module returned by db_message_format/1;
%% called by packet_to_stored_binary/2 (encode) and
%% stored_binary_to_packet/2 (decode).
-callback encode(binary()) -> binary().
-callback decode(binary()) -> binary().

%% ----------------------------------------------------------------------
%% Types

-type filter() :: #mam_muc_ca_filter{}.
-type message_id() :: non_neg_integer().
-type host_type() :: mongooseim:host_type().
70
71 %% ----------------------------------------------------------------------
72 %% gen_mod callbacks
73 %% Starting and stopping functions for MUC room archives
74
%% @doc gen_mod start callback: registers every handler listed by hooks/1.
-spec start(host_type(), _) -> ok.
start(HostType, _Opts) ->
    Handlers = hooks(HostType),
    ejabberd_hooks:add(Handlers).
78
%% @doc gen_mod stop callback: unregisters every handler listed by hooks/1.
-spec stop(host_type()) -> ok.
stop(HostType) ->
    Handlers = hooks(HostType),
    ejabberd_hooks:delete(Handlers).
82
83 %% ----------------------------------------------------------------------
84 %% Add hooks for mod_mam_muc
85
%% Hook handlers of this backend, all registered with priority 50.
%% The archive_message handler comes first and is only included when the
%% module option no_writer is false (its default).
hooks(HostType) ->
    WriterHooks =
        case gen_mod:get_module_opt(HostType, ?MODULE, no_writer, false) of
            false -> [{mam_muc_archive_message, HostType, ?MODULE, archive_message, 50}];
            true -> []
        end,
    WriterHooks ++
        [{mam_muc_archive_size, HostType, ?MODULE, archive_size, 50},
         {mam_muc_lookup_messages, HostType, ?MODULE, lookup_messages, 50},
         {mam_muc_remove_archive, HostType, ?MODULE, remove_archive, 50},
         {get_mam_muc_gdpr_data, HostType, ?MODULE, get_mam_muc_gdpr_data, 50}].
98
99 %% ----------------------------------------------------------------------
100 %% mongoose_cassandra callbacks
101
%% @doc mongoose_cassandra callback: every named CQL statement used here.
%% The four filter-dependent query families contribute one entry per filter
%% shape listed by filter_to_cql/0.
prepared_queries() ->
    StaticQueries =
        [{insert_offset_hint_query, insert_offset_hint_query_cql()},
         {prev_offset_query, prev_offset_query_cql()},
         {insert_query, insert_query_cql()},
         {fetch_user_messages_query, fetch_user_messages_cql()},
         {select_for_removal_query, select_for_removal_query_cql()},
         {remove_archive_query, remove_archive_query_cql()},
         {remove_archive_offsets_query, remove_archive_offsets_query_cql()}],
    lists:append([StaticQueries,
                  extract_messages_queries(),
                  extract_messages_r_queries(),
                  calc_count_queries(),
                  list_message_ids_queries()]).
115
116 %%====================================================================
117 %% Internal functions
118 %%====================================================================
119
120 %% ----------------------------------------------------------------------
121 %% Internal functions and callbacks
122
%% @doc mam_muc_archive_size handler: number of messages archived for RoomJID,
%% with no borders, time range or nick restriction. The incoming Size must
%% be an integer; its value is replaced by the count.
archive_size(Size, HostType, _RoomID, RoomJID) when is_integer(Size) ->
    PoolName = pool_name(HostType),
    NoLimit = undefined,
    Filter = prepare_filter(RoomJID, NoLimit, NoLimit, NoLimit, NoLimit),
    calc_count(PoolName, RoomJID, HostType, Filter).
128
129
130 %% ----------------------------------------------------------------------
131 %% INSERT MESSAGE
132
%% CQL statement storing one #mam_muc_message{} row; the bound values come
%% from message_to_params/1.
insert_query_cql() ->
    "INSERT INTO mam_muc_message (id, room_jid, from_jid, nick_name, with_nick, message) "
    "VALUES (?, ?, ?, ?, ?, ?)".
137
%% @doc mam_muc_archive_message handler: stores the message described by
%% Params. The previous hook result is ignored; any exception raised while
%% storing is turned into {error, Reason}.
archive_message(_Result, HostType, Params) ->
    try
        archive_message2(Params, HostType)
    catch
        _Class:Error ->
            {error, Error}
    end.
144
%% Build and write the two rows stored for every MUC message:
%%  - with_nick = <<>> and from_jid = sender bare JID: the room-wide view
%%    (maybe_nick(undefined) selects it) that GDPR export looks up by from_jid;
%%  - with_nick = sender nick and from_jid = <<>>: the per-nick view.
%% The write is routed by the room JID, as in the lookup and removal paths.
%% (The pool name used to be passed here, which is not a JID.)
archive_message2(#{message_id := MessID,
                   local_jid := LocJID = #jid{},
                   remote_jid := FromJID = #jid{},
                   source_jid := #jid{lresource = BNick},
                   packet := Packet}, HostType) ->
    BLocJID = mod_mam_utils:bare_jid(LocJID),
    BFromJID = mod_mam_utils:bare_jid(FromJID),
    BPacket = packet_to_stored_binary(HostType, Packet),
    Messages = [#mam_muc_message{
                   id = MessID,
                   room_jid = BLocJID,
                   from_jid = BWithFromJID,
                   nick_name = BNick,
                   message = BPacket,
                   with_nick = BWithNick
                  } || {BWithNick, BWithFromJID} <- [{<<>>, BFromJID}, {BNick, <<>>}]],
    write_messages(HostType, LocJID, Messages).
163
%% Write Messages (#mam_muc_message{} records) with insert_query through
%% mongoose_cassandra:cql_write_async/5 on the pool of HostType; RoomJID is
%% passed through as that call's JID argument.
write_messages(HostType, RoomJID, Messages) ->
    PoolName = pool_name(HostType),
    Rows = lists:map(fun message_to_params/1, Messages),
    mongoose_cassandra:cql_write_async(PoolName, RoomJID, ?MODULE, insert_query, Rows).
168
%% Column map for insert_query built from a #mam_muc_message{} record.
message_to_params(#mam_muc_message{} = Msg) ->
    #{id => Msg#mam_muc_message.id,
      room_jid => Msg#mam_muc_message.room_jid,
      from_jid => Msg#mam_muc_message.from_jid,
      nick_name => Msg#mam_muc_message.nick_name,
      with_nick => Msg#mam_muc_message.with_nick,
      message => Msg#mam_muc_message.message}.
179
180
181 %% ----------------------------------------------------------------------
182 %% REMOVE ARCHIVE
183
%% Delete the mam_muc_message rows of one room_jid / with_nick pair.
remove_archive_query_cql() ->
    "DELETE FROM mam_muc_message "
    "WHERE room_jid = ? AND with_nick = ?".
186
%% Delete the offset hints of one room_jid / with_nick pair.
remove_archive_offsets_query_cql() ->
    "DELETE FROM mam_muc_message_offset "
    "WHERE room_jid = ? AND with_nick = ?".
189
%% Every distinct (room_jid, with_nick) pair stored for a room.
select_for_removal_query_cql() ->
    "SELECT DISTINCT room_jid, with_nick "
    "FROM mam_muc_message WHERE room_jid = ?".
192
%% @doc mam_muc_remove_archive handler: removes the messages and offset hints
%% of RoomJID. The (room_jid, with_nick) rows returned by
%% select_for_removal_query are handed by cql_foldl/7 to DeleteRows, which
%% deletes them from both tables with the synchronous cql_write/5.
%% NOTE(review): write results are not checked; Acc is returned unchanged.
remove_archive(Acc, HostType, _RoomID, RoomJID) ->
    BRoomJID = mod_mam_utils:bare_jid(RoomJID),
    PoolName = pool_name(HostType),
    SelectParams = #{room_jid => BRoomJID},
    DeleteRows =
        fun(Rows, _Unused) ->
                mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
                                             remove_archive_query, Rows),
                mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
                                             remove_archive_offsets_query, Rows)
        end,
    mongoose_cassandra:cql_foldl(PoolName, RoomJID, ?MODULE, select_for_removal_query,
                                 SelectParams, DeleteRows, []),
    Acc.
210
211 %% ----------------------------------------------------------------------
212 %% SELECT MESSAGES
213
%% @doc mam_muc_lookup_messages handler.
%% A previous {error, _} result is passed through unchanged; full-text search
%% is not supported and yields {error, 'not-supported'}. Otherwise the nick
%% taken from the with_jid resource (if any) filters the lookup, which is
%% done by lookup_messages2/10. Exceptions are returned as
%% {error, {Reason, {stacktrace, Stacktrace}}}. The spec now lists the error
%% returns, which the old spec omitted.
-spec lookup_messages(Result :: any(), HostType :: host_type(), Params :: map()) ->
    {ok, mod_mam:lookup_result()} | {error, term()}.
lookup_messages({error, _Reason} = Result, _HostType, _Params) ->
    Result;
lookup_messages(_Result, _HostType, #{search_text := <<_/binary>>}) ->
    {error, 'not-supported'};
lookup_messages(_Result, HostType,
                #{owner_jid := RoomJID, rsm := RSM, borders := Borders,
                  start_ts := Start, end_ts := End, with_jid := WithJID,
                  search_text := undefined, page_size := PageSize,
                  is_simple := IsSimple}) ->
    try
        WithNick = maybe_jid_to_nick(WithJID),
        PoolName = pool_name(HostType),
        lookup_messages2(PoolName, HostType,
                         RoomJID, RSM, Borders,
                         Start, End, WithNick,
                         PageSize, IsSimple)
    catch _Type:Reason:S ->
        {error, {Reason, {stacktrace, S}}}
    end.
235
%% Nick used as the with_nick filter: the resource of WithJID, or undefined
%% when no with_jid was given.
maybe_jid_to_nick(undefined) -> undefined;
maybe_jid_to_nick(#jid{lresource = Nick}) -> Nick.
238
239
%% Dispatch a lookup.
%% Simple mode (IsSimple = true) computes neither offset nor total count.
%% Otherwise rsm_to_strategy/1 picks one of the lookup_messages_* functions.
%% Cassandra has no "LIMIT X, Y", so offsets need extra queries and some
%% strategies are costly; disable what production does not need.
lookup_messages2(PoolName, HostType, RoomJID = #jid{}, RSM, Borders,
                 Start, End, WithNick, PageSize, true) ->
    Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick),
    lookup_messages_simple(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
lookup_messages2(PoolName, HostType, RoomJID = #jid{}, RSM, Borders,
                 Start, End, WithNick, PageSize, _IsSimple) ->
    Strategy = rsm_to_strategy(RSM),
    Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick),
    LookupFun =
        case Strategy of
            last_page -> fun lookup_messages_last_page/6;
            by_offset -> fun lookup_messages_by_offset/6;
            first_page -> fun lookup_messages_first_page/6;
            before_id -> fun lookup_messages_before_id/6;
            after_id -> fun lookup_messages_after_id/6
        end,
    LookupFun(PoolName, HostType, RoomJID, RSM, PageSize, Filter).
269
%% Map an RSM request to the lookup strategy used by lookup_messages2/10.
%% Checks are applied in order; anything unrecognised is the first page.
rsm_to_strategy(#rsm_in{direction = Dir, id = Id, index = Index}) ->
    case {Dir, Id, Index} of
        {before, undefined, _} -> last_page;
        {undefined, _, 0} -> first_page;
        {undefined, _, N} when is_integer(N) -> by_offset;
        {before, MsgId, _} when is_integer(MsgId) -> before_id;
        {aft, MsgId, _} when is_integer(MsgId) -> after_id;
        _ -> first_page
    end;
rsm_to_strategy(undefined) ->
    first_page.
284
%% Simple-mode lookup: one page of messages in ascending id order, with the
%% total count and offset reported as undefined.
%%  - <after>ID</after>: messages with ids above ID;
%%  - <before>ID</before> (or last page when ID is undefined): the highest
%%    ids below ID, read in reverse and returned ascending;
%%  - integer index: the page starting at that position, or no messages
%%    when the set has no element at that position;
%%  - anything else (including an RSM with no index): the first page.
%% The index clause used to match a non-integer index, which crashed in
%% offset_to_start_id/4. It also used to return the first page when the
%% offset lay past the end.
lookup_messages_simple(PoolName, HostType, RoomJID,
                       #rsm_in{direction = aft, id = ID},
                       PageSize, Filter) ->
    MessageRows = extract_messages(PoolName, RoomJID, HostType, after_id(ID, Filter),
                                   PageSize, false),
    {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
lookup_messages_simple(PoolName, HostType, RoomJID,
                       #rsm_in{direction = before, id = ID},
                       PageSize, Filter) ->
    %% Reverse order + LIMIT fetches the last rows of the result set.
    MessageRows = extract_messages(PoolName, RoomJID, HostType, before_id(ID, Filter),
                                   PageSize, true),
    {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
lookup_messages_simple(PoolName, HostType, RoomJID,
                       #rsm_in{direction = undefined, index = Offset},
                       PageSize, Filter) when is_integer(Offset) ->
    %% Potentially slow and not that simple: the offset is resolved by
    %% listing message ids.
    MessageRows =
        case offset_to_start_id(PoolName, RoomJID, Filter, Offset) of
            undefined ->
                [];
            StartId ->
                extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter),
                                 PageSize, false)
        end,
    {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
lookup_messages_simple(PoolName, HostType, RoomJID,
                       _,
                       PageSize, Filter) ->
    MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
    {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}}.
310
%% Last page of the result set (<before/> without an id).
%% Returns {ok, {TotalCount, Offset, Messages}}, Offset being the position of
%% the first returned message. With PageSize = 0 only the count is read.
lookup_messages_last_page(PoolName, HostType, RoomJID,
                          #rsm_in{direction = before, id = undefined},
                          0, Filter) ->
    Total = calc_count(PoolName, RoomJID, HostType, Filter),
    {ok, {Total, Total, []}};
lookup_messages_last_page(PoolName, HostType, RoomJID,
                          #rsm_in{direction = before, id = undefined},
                          PageSize, Filter) ->
    Rows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, true),
    Count = length(Rows),
    Offset =
        if
            Count < PageSize ->
                %% Partial page: it already starts at the beginning.
                0;
            true ->
                FirstId = row_to_message_id(hd(Rows)),
                calc_count(PoolName, RoomJID, HostType, before_id(FirstId, Filter))
        end,
    {ok, {Offset + Count, Offset, rows_to_uniform_format(Rows, HostType, RoomJID)}}.
333
%% Page starting at position Offset (RSM <index/>).
%% Returns {ok, {TotalCount, Offset, Messages}}. With PageSize = 0, or when
%% there is no message at position Offset, only the total is read and the
%% page is empty. Previously an offset past the end fell back to the first
%% messages of the set with a made-up total.
lookup_messages_by_offset(PoolName, HostType, RoomJID,
                          #rsm_in{direction = undefined, index = Offset},
                          0, Filter) when is_integer(Offset) ->
    TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
    {ok, {TotalCount, Offset, []}};
lookup_messages_by_offset(PoolName, HostType, RoomJID,
                          #rsm_in{direction = undefined, index = Offset} = RSM,
                          PageSize, Filter) when is_integer(Offset) ->
    %% POTENTIALLY SLOW: the offset is resolved by listing message ids.
    case offset_to_start_id(PoolName, RoomJID, Filter, Offset) of
        undefined ->
            lookup_messages_by_offset(PoolName, HostType, RoomJID, RSM, 0, Filter);
        StartId ->
            MessageRows = extract_messages(PoolName, RoomJID, HostType,
                                           from_id(StartId, Filter), PageSize, false),
            MessageRowsCount = length(MessageRows),
            case MessageRowsCount < PageSize of
                true ->
                    {ok, {Offset + MessageRowsCount, Offset,
                          rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
                false ->
                    LastID = row_to_message_id(lists:last(MessageRows)),
                    CountAfterLastID = calc_count(PoolName, RoomJID, HostType,
                                                  after_id(LastID, Filter)),
                    {ok, {Offset + MessageRowsCount + CountAfterLastID, Offset,
                          rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
            end
    end.
358
%% First page of the result set.
%% Returns {ok, {TotalCount, 0, Messages}}; with PageSize = 0 only the count
%% is read. For a full page the total is the page size plus the number of
%% messages after its last row.
lookup_messages_first_page(PoolName, HostType, RoomJID, _RSM, 0, Filter) ->
    Total = calc_count(PoolName, RoomJID, HostType, Filter),
    {ok, {Total, 0, []}};
lookup_messages_first_page(PoolName, HostType, RoomJID, _RSM, PageSize, Filter) ->
    Rows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
    Count = length(Rows),
    Remaining =
        if
            Count < PageSize ->
                0;
            true ->
                LastId = row_to_message_id(lists:last(Rows)),
                calc_count(PoolName, RoomJID, HostType, after_id(LastId, Filter))
        end,
    {ok, {Count + Remaining, 0, rows_to_uniform_format(Rows, HostType, RoomJID)}}.
381
%% Page of messages preceding ID (<before>ID</before>).
%% Up to PageSize + 1 rows with ids up to and including ID are read and
%% mod_mam_utils:check_for_item_not_found/3 builds the final result.
lookup_messages_before_id(PoolName, HostType, RoomJID,
                          RSM = #rsm_in{direction = before, id = ID},
                          PageSize, Filter) ->
    Total = calc_count(PoolName, RoomJID, HostType, Filter),
    Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, Total, RSM),
    UpToId = to_id(ID, Filter),
    Rows = extract_messages(PoolName, RoomJID, HostType, UpToId, PageSize + 1, true),
    Messages = rows_to_uniform_format(Rows, HostType, RoomJID),
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, {Total, Offset, Messages}).
391
%% Page of messages following ID (<after>ID</after>).
%% Up to PageSize + 1 rows starting at ID (inclusive) are read and
%% mod_mam_utils:check_for_item_not_found/3 builds the final result.
%% PoolName is already resolved by the caller. The redundant
%% `PoolName = pool_name(HostType)` re-match is gone, matching
%% lookup_messages_before_id/6.
lookup_messages_after_id(PoolName, HostType, RoomJID,
                         RSM = #rsm_in{direction = aft, id = ID},
                         PageSize, Filter) ->
    TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
    Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM),
    MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(ID, Filter),
                                   PageSize + 1, false),
    Result = {TotalCount, Offset, rows_to_uniform_format(MessageRows, HostType, RoomJID)},
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
402
403
%% Narrow Filter to ids strictly above ID: the lower bound ID + 1 is combined
%% with the existing one via maybe_max/2. An undefined ID leaves Filter
%% unchanged, mirroring before_id/2. Previously `undefined + 1` raised badarith.
after_id(undefined, Filter) ->
    Filter;
after_id(ID, Filter = #mam_muc_ca_filter{start_id = AfterID}) ->
    Filter#mam_muc_ca_filter{start_id = maybe_max(ID + 1, AfterID)}.
406
%% Narrow Filter to ids strictly below ID: the upper bound ID - 1 is combined
%% with the existing one via maybe_min/2. An undefined ID leaves Filter
%% unchanged.
before_id(ID, Filter) when ID =:= undefined ->
    Filter;
before_id(ID, #mam_muc_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = maybe_min(ID - 1, CurrentEnd),
    Filter#mam_muc_ca_filter{end_id = NewEnd}.
411
%% Narrow Filter to ids up to and including ID (combined with the existing
%% upper bound via maybe_min/2).
to_id(ID, #mam_muc_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = maybe_min(ID, CurrentEnd),
    Filter#mam_muc_ca_filter{end_id = NewEnd}.
414
%% Narrow Filter to ids from ID upwards, inclusive (combined with the
%% existing lower bound via maybe_max/2).
from_id(ID, #mam_muc_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = maybe_max(ID, CurrentStart),
    Filter#mam_muc_ca_filter{start_id = NewStart}.
417
%% Convert every stored row with row_to_uniform_format/3, keeping the order.
rows_to_uniform_format(MessageRows, HostType, RoomJID) ->
    lists:map(fun(Row) -> row_to_uniform_format(Row, HostType, RoomJID) end,
              MessageRows).
420
%% Lookup result item for one stored row: the sender JID is RoomJID with the
%% row's nick as resource, and the packet is decoded with the configured
%% message format.
row_to_uniform_format(#{nick_name := Nick, message := Stored, id := Id},
                      HostType, RoomJID) ->
    SenderJID = jid:replace_resource(RoomJID, Nick),
    DecodedPacket = stored_binary_to_packet(HostType, Stored),
    #{id => Id, jid => SenderJID, packet => DecodedPacket}.
426
%% The id column of a message row.
row_to_message_id(#{id := Id}) -> Id.
429
%% @doc get_mam_muc_gdpr_data handler: prepends to Acc an {Id, XmlBinary}
%% pair for every stored row whose from_jid equals Jid, in the order
%% returned by fetch_user_messages/3.
-spec get_mam_muc_gdpr_data(ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
                            host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_muc_gdpr_data().
get_mam_muc_gdpr_data(Acc, HostType, Jid) ->
    BinJid = jid:to_binary(jid:to_lower(Jid)),
    PoolName = pool_name(HostType),
    Rows = fetch_user_messages(PoolName, Jid, #{from_jid => BinJid}),
    ToXml = fun(Stored) -> exml:to_binary(stored_binary_to_packet(HostType, Stored)) end,
    Exported = [{Id, ToXml(Stored)} || #{message := Stored, id := Id} <- Rows],
    Exported ++ Acc.
441
%% Read at most IMax messages matching Filter. There is no offset argument,
%% because CQL has no "LIMIT X, Y".
%% Rows are maps with the id, nick_name and message columns, always returned
%% in ascending id order:
%%  - ReverseLimit = false: the IMax lowest ids (ascending query);
%%  - ReverseLimit = true: the IMax highest ids (descending query, reversed).
%% IMax = 0 short-circuits to [] without querying.
-spec extract_messages(PoolName, RoomJID, HostType, Filter, IMax, ReverseLimit) ->
    [Row] when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      IMax :: non_neg_integer(),
      ReverseLimit :: boolean(),
      Row :: mongoose_cassandra:row().
extract_messages(_PoolName, _RoomJID, _HostType, _Filter, 0, _ReverseLimit) ->
    [];
extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, false) ->
    read_message_rows(PoolName, RoomJID, extract_messages_query, Filter, IMax);
extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, true) ->
    lists:reverse(read_message_rows(PoolName, RoomJID, extract_messages_r_query,
                                    Filter, IMax)).

%% Run the QueryKind variant matching Filter's shape with IMax as row limit.
read_message_rows(PoolName, RoomJID, QueryKind, Filter, IMax) ->
    QueryName = {QueryKind, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
    Rows.
467
%% Rows of fetch_user_messages_query (a from_jid lookup) bound with
%% FilterMap. The query cannot order them, so they are sorted here with
%% lists:sort/1.
fetch_user_messages(PoolName, UserJID, FilterMap) ->
    {ok, Unsorted} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                                 fetch_user_messages_query, FilterMap),
    lists:sort(Unsorted).
472
473
%% @doc Count the messages of the result set with ids up to and including
%% MessID.
%%
%% When MessID is in the set this is its zero-based index plus one, i.e. the
%% index of the element right after it. MessID does not have to exist.
%% calc_offset/7 uses it as the offset of an <after>MessID</after> page.
%% @end
-spec calc_index(PoolName, RoomJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_index(PoolName, RoomJID, HostType, Filter, MessID) ->
    calc_count(PoolName, RoomJID, HostType, to_id(MessID, Filter)).
489
%% @doc Number of messages in the result set with ids strictly below MessID.
%%
%% MessID itself may already have been deleted.
%% @end
-spec calc_before(PoolName, RoomJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_before(PoolName, RoomJID, HostType, Filter, MessID) ->
    BeforeFilter = before_id(MessID, Filter),
    calc_count(PoolName, RoomJID, HostType, BeforeFilter).
504
505
%% @doc Number of messages matching Filter, read with the calc_count_query
%% variant (SELECT COUNT(*) ...) chosen by select_filter/1.
-spec calc_count(PoolName, RoomJID, HostType, Filter) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      Count :: non_neg_integer().
calc_count(PoolName, RoomJID, _HostType, Filter) ->
    Query = {calc_count_query, select_filter(Filter)},
    Bindings = eval_filter_params(Filter),
    {ok, [#{count := Total}]} =
        mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, Query, Bindings),
    Total.
521
%% @doc Convert Offset into the id of the message at that zero-based position
%% of the result set, or undefined when there is no such message.
%%
%% Offsets up to 100 are resolved directly by listing ids. Larger offsets use
%% offset hints (offset -> id pairs in mam_muc_message_offset) to list fewer
%% ids. Hints are keyed by room_jid and with_nick only. They are therefore
%% neither used nor saved when Filter has a start_id, because positions in
%% such a set are relative to that lower bound.
%% The hint path previously asked for position Offset - PrevOffset + 1 and
%% so returned the message one position too far.
-spec offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0,
                                                          Offset =< 100 ->
    calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset);
offset_to_start_id(PoolName, RoomJID, Filter = #mam_muc_ca_filter{start_id = LowerBound},
                   Offset)
  when is_integer(Offset), Offset >= 0, LowerBound =/= undefined ->
    calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset);
offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
    Params = maps:put(offset, Offset, eval_filter_params(Filter)),
    %% Look for an already calculated hint at or below Offset.
    case mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, prev_offset_query, Params) of
        {ok, []} ->
            %% No hint: list ids from the beginning of the set (slow).
            StartId = calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset),
            maybe_save_offset_hint(PoolName, RoomJID, Filter, 0, Offset, StartId);
        {ok, [#{offset := Offset, id := PrevId}]} ->
            PrevId;
        {ok, [#{offset := PrevOffset, id := PrevId}]} ->
            %% PrevId is at position PrevOffset of the full set and at
            %% position 0 once it is the lower bound, so the wanted message
            %% is Offset - PrevOffset positions further.
            StartId = calc_offset_to_start_id(PoolName, RoomJID,
                                              Filter#mam_muc_ca_filter{start_id = PrevId},
                                              Offset - PrevOffset),
            maybe_save_offset_hint(PoolName, RoomJID, Filter, PrevOffset, Offset, StartId)
    end.
552
%% @doc Store the hint NewOffset -> StartId for later offset_to_start_id/4
%% calls. A hint is stored only when StartId is defined and NewOffset is more
%% than 50 positions away from HintOffset (the hint it was computed from,
%% 0 when there was none). StartId is always returned so callers can pass it
%% through.
-spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), RoomJID :: jid:jid(),
                             Filter :: filter(), HintOffset :: non_neg_integer(),
                             NewOffset :: non_neg_integer(),
                             StartId :: non_neg_integer() | undefined) ->
    StartId :: non_neg_integer() | undefined.
maybe_save_offset_hint(_PoolName, _RoomJID, _Filter, _HintOffset, _NewOffset, undefined) ->
    undefined;
maybe_save_offset_hint(PoolName, RoomJID, Filter, HintOffset, NewOffset, StartId) ->
    Distance = abs(NewOffset - HintOffset),
    if
        Distance > 50 ->
            #mam_muc_ca_filter{room_jid = HintRoom, with_nick = HintNick} = Filter,
            Hint = #{room_jid => HintRoom, with_nick => HintNick,
                     offset => NewOffset, id => StartId},
            mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
                                         insert_offset_hint_query, [Hint]);
        true ->
            skip
    end,
    StartId.
576
%% @doc Return the id of the message at zero-based position Offset of the
%% result set described by Filter, or undefined when the set has Offset or
%% fewer messages.
%% Previously, a set with between 1 and Offset messages yielded the id of its
%% last message, which silently shifted the requested page.
-spec calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
    QueryName = {list_message_ids_query, select_filter(Filter)},
    Params = maps:put('[limit]', Offset + 1, eval_filter_params(Filter)),
    {ok, RowsIds} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
    %% The LIMIT caps the list at Offset + 1 ids; the last one is at position
    %% Offset only when the list is that long.
    case length(RowsIds) of
        Count when Count > Offset -> maps:get(id, lists:last(RowsIds));
        _ -> undefined
    end.
595
%% @doc Offset hint lookup: one (id, offset) hint of a room_jid / with_nick
%% pair whose offset is at most the requested one.
%% NOTE(review): whether it is the closest hint depends on the clustering
%% order of mam_muc_message_offset, which is not visible here.
prev_offset_query_cql() ->
    "SELECT id, offset FROM mam_muc_message_offset "
    "WHERE room_jid = ? and with_nick = ? and offset <= ? LIMIT 1".
600
%% @doc Store an offset -> message id hint for a room_jid / with_nick pair.
insert_offset_hint_query_cql() ->
    "INSERT INTO mam_muc_message_offset(room_jid, with_nick, id, offset) "
    "VALUES(?, ?, ?, ?)".
604
%% Build the query filter for RoomJID.
%% Start/End (microsecond timestamps or undefined) are encoded into message
%% ids with node ids 0 and 255, presumably the lowest/highest id of those
%% instants. The ids are then narrowed by Borders. An undefined WithNick
%% selects the room-wide rows (with_nick = <<>>).
prepare_filter(RoomJID, Borders, Start, End, WithNick) ->
    RoomBin = mod_mam_utils:bare_jid(RoomJID),
    LowerId = apply_start_border(Borders, maybe_encode_compact_uuid(Start, 0)),
    UpperId = apply_end_border(Borders, maybe_encode_compact_uuid(End, 255)),
    NickBin = maybe_nick(WithNick),
    prepare_filter_params(RoomBin, NickBin, LowerId, UpperId).
613
%% Assemble the filter record from already encoded values.
prepare_filter_params(BRoomJID, BWithNick, StartID, EndID) ->
    #mam_muc_ca_filter{start_id = StartID,
                       end_id = EndID,
                       room_jid = BRoomJID,
                       with_nick = BWithNick}.
621
%% Query bindings for a filter: room_jid and with_nick always, start_id and
%% end_id only when they are defined.
eval_filter_params(#mam_muc_ca_filter{room_jid = BRoomJID, with_nick = BWithNick,
                                      start_id = StartID, end_id = EndID}) ->
    Bounds = [{start_id, StartID}, {end_id, EndID}],
    maps:from_list([{room_jid, BRoomJID}, {with_nick, BWithNick}
                    | [Pair || {_Key, Value} = Pair <- Bounds, Value =/= undefined]]).
631
%% Filter shape (see select_filter/2) of a #mam_muc_ca_filter{}.
select_filter(#mam_muc_ca_filter{start_id = Lower, end_id = Upper}) ->
    select_filter(Lower, Upper).
637
638
%% Name of the query variant needed for the given bounds.
-spec select_filter(StartID, EndID) ->
    all | 'end' | start | start_end when
      StartID :: integer() | undefined,
      EndID :: integer() | undefined.
select_filter(StartID, EndID) ->
    case {StartID, EndID} of
        {undefined, undefined} -> all;
        {undefined, _} -> 'end';
        {_, undefined} -> start;
        {_, _} -> start_end
    end.
651
%% CQL fragment bounding the id column with the named :start_id / :end_id
%% markers; a bound whose argument is undefined is left out.
prepare_filter_cql(StartID, EndID) ->
    LowerPart = if StartID =:= undefined -> ""; true -> " AND id >= :start_id" end,
    UpperPart = if EndID =:= undefined -> ""; true -> " AND id <= :end_id" end,
    LowerPart ++ UpperPart.
661
%% All four filter shapes as {select_filter/2 name, CQL fragment}. The value
%% 0 only stands for "bound present".
filter_to_cql() ->
    Bounds = [undefined, 0],
    lists:append([[{select_filter(Lower, Upper), prepare_filter_cql(Lower, Upper)}
                   || Upper <- Bounds]
                  || Lower <- Bounds]).
667
%% Offset (position of the first returned message) for an RSM request:
%%  - <before/> without id (last page): TotalCount - PageSize, at least 0;
%%  - <before>ID</before>: messages below ID minus PageSize, at least 0;
%%  - <after>ID</after>: messages up to and including ID;
%%  - anything else: 0.
-spec calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
    when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      PageSize :: non_neg_integer(),
      TotalCount :: non_neg_integer(),
      RSM :: jlib:rsm_in() | undefined,
      Offset :: non_neg_integer().
calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM) ->
    case RSM of
        #rsm_in{direction = before, id = undefined} ->
            max(0, TotalCount - PageSize);
        #rsm_in{direction = before, id = ID} when is_integer(ID) ->
            max(0, calc_before(PoolName, RoomJID, HostType, Filter, ID) - PageSize);
        #rsm_in{direction = aft, id = ID} when is_integer(ID) ->
            calc_index(PoolName, RoomJID, HostType, Filter, ID);
        _ ->
            0
    end.
689
%% Message id for a microsecond timestamp and node id (encode_compact_uuid/2
%% from mod_mam_utils); an undefined timestamp stays undefined.
maybe_encode_compact_uuid(Microseconds, NodeID) when Microseconds =/= undefined ->
    encode_compact_uuid(Microseconds, NodeID);
maybe_encode_compact_uuid(undefined, _NodeID) ->
    undefined.
694
%% with_nick column value: the nick itself, or <<>> (the room-wide rows)
%% when no nick is given.
maybe_nick(WithNick) when is_binary(WithNick) -> WithNick;
maybe_nick(undefined) -> <<>>.
699
700 %%====================================================================
701 %% Internal SQL part
702 %%====================================================================
703
%% {{extract_messages_query, Shape}, CQL} for every filter shape.
extract_messages_queries() ->
    lists:map(fun({Shape, FilterCql}) ->
                      {{extract_messages_query, Shape}, extract_messages_cql(FilterCql)}
              end, filter_to_cql()).
707
%% {{extract_messages_r_query, Shape}, CQL} for every filter shape.
extract_messages_r_queries() ->
    lists:map(fun({Shape, FilterCql}) ->
                      {{extract_messages_r_query, Shape}, extract_messages_r_cql(FilterCql)}
              end, filter_to_cql()).
711
%% {{calc_count_query, Shape}, CQL} for every filter shape.
calc_count_queries() ->
    lists:map(fun({Shape, FilterCql}) ->
                      {{calc_count_query, Shape}, calc_count_cql(FilterCql)}
              end, filter_to_cql()).
715
%% {{list_message_ids_query, Shape}, CQL} for every filter shape.
list_message_ids_queries() ->
    lists:map(fun({Shape, FilterCql}) ->
                      {{list_message_ids_query, Shape}, list_message_ids_cql(FilterCql)}
              end, filter_to_cql()).
719
%% Ascending page query; Filter is a fragment from prepare_filter_cql/2.
extract_messages_cql(Filter) ->
    lists:append(["SELECT id, nick_name, message FROM mam_muc_message "
                  "WHERE room_jid = ? AND with_nick = ? ",
                  Filter,
                  " ORDER BY id LIMIT ?"]).
724
%% Descending page query; Filter is a fragment from prepare_filter_cql/2.
extract_messages_r_cql(Filter) ->
    lists:append(["SELECT id, nick_name, message FROM mam_muc_message "
                  "WHERE room_jid = ? AND with_nick = ? ",
                  Filter,
                  " ORDER BY id DESC LIMIT ?"]).
729
%% Messages sent by one bare JID, used for GDPR export. No ORDER BY here:
%% Cassandra rejects it with "ORDER BY with 2ndary indexes is not supported.",
%% so fetch_user_messages/3 sorts the rows instead.
fetch_user_messages_cql() ->
    "SELECT id, message FROM mam_muc_message WHERE from_jid = ?".
735
%% Count query; Filter is a fragment from prepare_filter_cql/2.
calc_count_cql(Filter) ->
    lists:append(["SELECT COUNT(*) FROM mam_muc_message "
                  "WHERE room_jid = ? AND with_nick = ? ",
                  Filter]).
739
%% Ascending id listing; Filter is a fragment from prepare_filter_cql/2.
list_message_ids_cql(Filter) ->
    lists:append(["SELECT id FROM mam_muc_message "
                  "WHERE room_jid = ? AND with_nick = ? ",
                  Filter,
                  " ORDER BY id LIMIT ?"]).
744
745 %% ----------------------------------------------------------------------
746 %% Optimizations
747
%% Encode Packet for storage with the message-format module configured for
%% HostType (see db_message_format/1).
packet_to_stored_binary(HostType, Packet) ->
    FormatModule = db_message_format(HostType),
    FormatModule:encode(Packet).
752
%% Decode a stored message with the message-format module configured for
%% HostType (see db_message_format/1).
stored_binary_to_packet(HostType, Bin) ->
    FormatModule = db_message_format(HostType),
    FormatModule:decode(Bin).
757
758 %% ----------------------------------------------------------------------
759 %% Params getters
760
%% Message-format module from the db_message_format option
%% (default mam_message_xml).
-spec db_message_format(HostType :: host_type()) -> module().
db_message_format(HostType) ->
    DefaultFormat = mam_message_xml,
    gen_mod:get_module_opt(HostType, ?MODULE, db_message_format, DefaultFormat).
764
%% Cassandra pool from the pool_name option (default `default').
-spec pool_name(HostType :: host_type()) -> mongoose_wpool:pool_name().
pool_name(HostType) ->
    DefaultPool = default,
    gen_mod:get_module_opt(HostType, ?MODULE, pool_name, DefaultPool).
Line Hits Source