./ct_report/coverage/mod_mam_muc_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc RDBMS backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_muc_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10
11 %% gen_mod handlers
12 -export([start/2, stop/1]).
13
14 %% MAM hook handlers
15 -export([archive_size/4,
16 archive_message/3,
17 lookup_messages/3,
18 remove_archive/4]).
19
20 %% mongoose_cassandra callbacks
21 -export([prepared_queries/0, get_mam_muc_gdpr_data/3]).
22
23 -ignore_xref([
24 behaviour_info/1, remove_archive/4, start/2, stop/1
25 ]).
26
27 %% ----------------------------------------------------------------------
28 %% Imports
29
30 %% UID
31 -import(mod_mam_utils,
32 [encode_compact_uuid/2]).
33
34 %% Other
35 -import(mod_mam_utils,
36 [maybe_min/2,
37 maybe_max/2,
38 apply_start_border/2,
39 apply_end_border/2]).
40
41 -include("jlib.hrl").
42 -include("mongoose_rsm.hrl").
43
44 -record(mam_muc_ca_filter, {
45 room_jid,
46 with_nick,
47 nick_name,
48 start_id,
49 end_id
50 }).
51
52 -record(mam_muc_message, {
53 id :: non_neg_integer(),
54 room_jid :: binary(),
55 from_jid :: binary() | undefined,
56 nick_name :: binary(),
57 with_nick :: binary(),
58 message :: binary() | undefined
59 }).
60
61 -callback encode(binary()) -> binary().
62 -callback decode(binary()) -> binary().
63
64 %% ----------------------------------------------------------------------
65 %% Types
66
67 -type filter() :: #mam_muc_ca_filter{}.
68 -type message_id() :: non_neg_integer().
69 -type host_type() :: mongooseim:host_type().
70
71 %% ----------------------------------------------------------------------
72 %% gen_mod callbacks
73 %% Starting and stopping functions for users' archives
74
75 -spec start(host_type(), gen_mod:module_opts()) -> ok.
76 start(HostType, _Opts) ->
77
:-(
ejabberd_hooks:add(hooks(HostType)).
78
79 -spec stop(host_type()) -> ok.
80 stop(HostType) ->
81
:-(
ejabberd_hooks:delete(hooks(HostType)).
82
83 %% ----------------------------------------------------------------------
84 %% Add hooks for mod_mam_muc
85
86 hooks(HostType) ->
87
:-(
[{mam_muc_archive_message, HostType, ?MODULE, archive_message, 50},
88 {mam_muc_archive_size, HostType, ?MODULE, archive_size, 50},
89 {mam_muc_lookup_messages, HostType, ?MODULE, lookup_messages, 50},
90 {mam_muc_remove_archive, HostType, ?MODULE, remove_archive, 50},
91 {get_mam_muc_gdpr_data, HostType, ?MODULE, get_mam_muc_gdpr_data, 50}].
92
93 %% ----------------------------------------------------------------------
94 %% mongoose_cassandra_worker callbacks
95
96 prepared_queries() ->
97 [
98 {insert_offset_hint_query, insert_offset_hint_query_cql()},
99 {prev_offset_query, prev_offset_query_cql()},
100 {insert_query, insert_query_cql()},
101 {fetch_user_messages_query, fetch_user_messages_cql()},
102 {select_for_removal_query, select_for_removal_query_cql()},
103 {remove_archive_query, remove_archive_query_cql()},
104 {remove_archive_offsets_query, remove_archive_offsets_query_cql()}]
105
:-(
++ extract_messages_queries()
106 ++ extract_messages_r_queries()
107 ++ calc_count_queries()
108 ++ list_message_ids_queries().
109
110 %%====================================================================
111 %% Internal functions
112 %%====================================================================
113
114 %% ----------------------------------------------------------------------
115 %% Internal functions and callbacks
116
117 archive_size(Size, HostType, _RoomID, RoomJID) when is_integer(Size) ->
118
:-(
PoolName = pool_name(HostType),
119
:-(
Borders = Start = End = WithNick = undefined,
120
:-(
Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick),
121
:-(
calc_count(PoolName, RoomJID, HostType, Filter).
122
123
124 %% ----------------------------------------------------------------------
125 %% INSERT MESSAGE
126
127 insert_query_cql() ->
128
:-(
"INSERT INTO mam_muc_message "
129 "(id, room_jid, from_jid, nick_name, with_nick, message) "
130 "VALUES (?, ?, ?, ?, ?, ?)".
131
132 archive_message(_Result, HostType, Params) ->
133
:-(
try
134
:-(
archive_message2(Params, HostType)
135 catch _Type:Reason ->
136
:-(
{error, Reason}
137 end.
138
139 archive_message2(#{message_id := MessID,
140 local_jid := LocJID = #jid{},
141 remote_jid := FromJID = #jid{},
142 source_jid := #jid{lresource = BNick},
143 packet := Packet}, HostType) ->
144
:-(
BLocJID = mod_mam_utils:bare_jid(LocJID),
145
:-(
BFromJID = mod_mam_utils:bare_jid(FromJID),
146
:-(
BPacket = packet_to_stored_binary(HostType, Packet),
147
:-(
Messages = [#mam_muc_message{
148 id = MessID,
149 room_jid = BLocJID,
150 from_jid = BWithFromJID,
151 nick_name = BNick,
152 message = BPacket,
153 with_nick = BWithNick
154
:-(
} || {BWithNick, BWithFromJID} <- [{<<>>, BFromJID}, {BNick, <<>>}]],
155
:-(
PoolName = pool_name(HostType),
156
:-(
write_messages(HostType, PoolName, Messages).
157
158 write_messages(HostType, RoomJID, Messages) ->
159
:-(
PoolName = pool_name(HostType),
160
:-(
MultiParams = [message_to_params(M) || M <- Messages],
161
:-(
mongoose_cassandra:cql_write_async(PoolName, RoomJID, ?MODULE, insert_query, MultiParams).
162
163 message_to_params(#mam_muc_message{
164 id = MessID,
165 room_jid = BLocJID,
166 from_jid = BFromJID,
167 nick_name = BNick,
168 with_nick = BWithNick,
169 message = BPacket
170 }) ->
171
:-(
#{id => MessID, room_jid => BLocJID, from_jid => BFromJID,
172 nick_name => BNick, with_nick => BWithNick, message => BPacket}.
173
174
175 %% ----------------------------------------------------------------------
176 %% REMOVE ARCHIVE
177
178 remove_archive_query_cql() ->
179
:-(
"DELETE FROM mam_muc_message WHERE room_jid = ? AND with_nick = ?".
180
181 remove_archive_offsets_query_cql() ->
182
:-(
"DELETE FROM mam_muc_message_offset WHERE room_jid = ? AND with_nick = ?".
183
184 select_for_removal_query_cql() ->
185
:-(
"SELECT DISTINCT room_jid, with_nick FROM mam_muc_message WHERE room_jid = ?".
186
187 remove_archive(Acc, HostType, _RoomID, RoomJID) ->
188
:-(
BRoomJID = mod_mam_utils:bare_jid(RoomJID),
189
:-(
PoolName = pool_name(HostType),
190
:-(
Params = #{room_jid => BRoomJID},
191 %% Wait until deleted
192
193
:-(
DeleteFun =
194 fun(Rows, _AccIn) ->
195
:-(
mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
196 remove_archive_query, Rows),
197
:-(
mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
198 remove_archive_offsets_query, Rows)
199 end,
200
201
:-(
mongoose_cassandra:cql_foldl(PoolName, RoomJID, ?MODULE,
202 select_for_removal_query, Params, DeleteFun, []),
203
:-(
Acc.
204
205 %% ----------------------------------------------------------------------
206 %% SELECT MESSAGES
207
208 -spec lookup_messages(Result :: any(), HostType :: host_type(), Params :: map()) ->
209 {ok, mod_mam:lookup_result()}.
210 lookup_messages({error, _Reason} = Result, _HostType, _Params) ->
211
:-(
Result;
212 lookup_messages(_Result, _HostType, #{search_text := <<_/binary>>}) ->
213
:-(
{error, 'not-supported'};
214 lookup_messages(_Result, HostType,
215 #{owner_jid := RoomJID, rsm := RSM, borders := Borders,
216 start_ts := Start, end_ts := End, with_jid := WithJID,
217 search_text := undefined, page_size := PageSize,
218 is_simple := IsSimple}) ->
219
:-(
try
220
:-(
WithNick = maybe_jid_to_nick(WithJID),
221
:-(
PoolName = pool_name(HostType),
222
:-(
lookup_messages2(PoolName, HostType,
223 RoomJID, RSM, Borders,
224 Start, End, WithNick,
225 PageSize, IsSimple)
226 catch _Type:Reason:S ->
227
:-(
{error, {Reason, {stacktrace, S}}}
228 end.
229
230
:-(
maybe_jid_to_nick(#jid{lresource = BNick}) -> BNick;
231
:-(
maybe_jid_to_nick(undefined) -> undefined.
232
233
234 lookup_messages2(PoolName, HostType,
235 RoomJID = #jid{}, RSM, Borders,
236 Start, End, WithNick,
237 PageSize, _IsSimple = true) ->
238 %% Simple query without calculating offset and total count
239
:-(
Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick),
240
:-(
lookup_messages_simple(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
241 lookup_messages2(PoolName, HostType,
242 RoomJID = #jid{}, RSM, Borders,
243 Start, End, WithNick,
244 PageSize, _IsSimple) ->
245 %% Query with offset calculation
246 %% We cannot just use RDBMS code because "LIMIT X, Y" is not supported by cassandra
247 %% Not all queries are optimal. You would like to disable something for production
248 %% once you know how you will call bd
249
:-(
Strategy = rsm_to_strategy(RSM),
250
:-(
Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick),
251
:-(
case Strategy of
252 last_page ->
253
:-(
lookup_messages_last_page(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
254 by_offset ->
255
:-(
lookup_messages_by_offset(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
256 first_page ->
257
:-(
lookup_messages_first_page(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
258 before_id ->
259
:-(
lookup_messages_before_id(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
260 after_id ->
261
:-(
lookup_messages_after_id(PoolName, HostType, RoomJID, RSM, PageSize, Filter)
262 end.
263
264 rsm_to_strategy(#rsm_in{direction = before, id = undefined}) ->
265
:-(
last_page;
266 rsm_to_strategy(#rsm_in{direction = undefined, index = 0}) ->
267
:-(
first_page;
268 rsm_to_strategy(#rsm_in{direction = undefined, index = Offset}) when is_integer(Offset) ->
269
:-(
by_offset;
270 rsm_to_strategy(#rsm_in{direction = before, id = Id}) when is_integer(Id) ->
271
:-(
before_id;
272 rsm_to_strategy(#rsm_in{direction = aft, id = Id}) when is_integer(Id) ->
273
:-(
after_id;
274 rsm_to_strategy(#rsm_in{}) ->
275
:-(
first_page;
276 rsm_to_strategy(undefined) ->
277
:-(
first_page.
278
279 lookup_messages_simple(PoolName, HostType, RoomJID,
280 #rsm_in{direction = aft, id = ID},
281 PageSize, Filter) ->
282 %% Get last rows from result set
283
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, after_id(ID, Filter), PageSize, false),
284
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
285 lookup_messages_simple(PoolName, HostType, RoomJID,
286 #rsm_in{direction = before, id = ID},
287 PageSize, Filter) ->
288
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, before_id(ID, Filter), PageSize, true),
289
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
290 lookup_messages_simple(PoolName, HostType, RoomJID,
291 #rsm_in{direction = undefined, index = Offset},
292 PageSize, Filter) ->
293 %% Apply offset
294
:-(
StartId = offset_to_start_id(PoolName, RoomJID, Filter,
295 Offset), %% POTENTIALLY SLOW AND NOT SIMPLE :)
296
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter), PageSize,
297 false),
298
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
299 lookup_messages_simple(PoolName, HostType, RoomJID,
300 _,
301 PageSize, Filter) ->
302
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
303
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}}.
304
305 lookup_messages_last_page(PoolName, HostType, RoomJID,
306 #rsm_in{direction = before, id = undefined},
307 0, Filter) ->
308 %% Last page
309
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
310
:-(
{ok, {TotalCount, TotalCount, []}};
311 lookup_messages_last_page(PoolName, HostType, RoomJID,
312 #rsm_in{direction = before, id = undefined},
313 PageSize, Filter) ->
314 %% Last page
315
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, true),
316
:-(
MessageRowsCount = length(MessageRows),
317
:-(
case MessageRowsCount < PageSize of
318 true ->
319
:-(
{ok, {MessageRowsCount, 0,
320 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
321 false ->
322
:-(
FirstID = row_to_message_id(hd(MessageRows)),
323
:-(
Offset = calc_count(PoolName, RoomJID, HostType, before_id(FirstID, Filter)),
324
:-(
{ok, {Offset + MessageRowsCount, Offset,
325 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
326 end.
327
328 lookup_messages_by_offset(PoolName, HostType, RoomJID,
329 #rsm_in{direction = undefined, index = Offset},
330 0, Filter) when is_integer(Offset) ->
331 %% By offset
332
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
333
:-(
{ok, {TotalCount, Offset, []}};
334 lookup_messages_by_offset(PoolName, HostType, RoomJID,
335 #rsm_in{direction = undefined, index = Offset},
336 PageSize, Filter) when is_integer(Offset) ->
337 %% By offset
338
:-(
StartId = offset_to_start_id(PoolName, RoomJID, Filter, Offset), %% POTENTIALLY SLOW
339
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter), PageSize,
340 false),
341
:-(
MessageRowsCount = length(MessageRows),
342
:-(
case MessageRowsCount < PageSize of
343 true ->
344
:-(
{ok, {Offset + MessageRowsCount, Offset,
345 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
346 false ->
347
:-(
LastID = row_to_message_id(lists:last(MessageRows)),
348
:-(
CountAfterLastID = calc_count(PoolName, RoomJID, HostType, after_id(LastID, Filter)),
349
:-(
{ok, {Offset + MessageRowsCount + CountAfterLastID, Offset,
350 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
351 end.
352
353 lookup_messages_first_page(PoolName, HostType, RoomJID,
354 _,
355 0, Filter) ->
356 %% First page, just count
357
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
358
:-(
{ok, {TotalCount, 0, []}};
359 lookup_messages_first_page(PoolName, HostType, RoomJID,
360 _,
361 PageSize, Filter) ->
362 %% First page
363
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
364
:-(
MessageRowsCount = length(MessageRows),
365
:-(
case MessageRowsCount < PageSize of
366 true ->
367
:-(
{ok, {MessageRowsCount, 0,
368 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
369 false ->
370
:-(
LastID = row_to_message_id(lists:last(MessageRows)),
371
:-(
CountAfterLastID = calc_count(PoolName, RoomJID, HostType, after_id(LastID, Filter)),
372
:-(
{ok, {MessageRowsCount + CountAfterLastID, 0,
373 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
374 end.
375
376 lookup_messages_before_id(PoolName, HostType, RoomJID,
377 RSM = #rsm_in{direction = before, id = ID},
378 PageSize, Filter) ->
379
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
380
:-(
Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM),
381
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, to_id(ID, Filter),
382 PageSize + 1, true),
383
:-(
Result = {TotalCount, Offset, rows_to_uniform_format(MessageRows, HostType, RoomJID)},
384
:-(
mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
385
386 lookup_messages_after_id(PoolName, HostType, RoomJID,
387 RSM = #rsm_in{direction = aft, id = ID},
388 PageSize, Filter) ->
389
:-(
PoolName = pool_name(HostType),
390
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
391
:-(
Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM),
392
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(ID, Filter),
393 PageSize + 1, false),
394
:-(
Result = {TotalCount, Offset, rows_to_uniform_format(MessageRows, HostType, RoomJID)},
395
:-(
mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
396
397
398 after_id(ID, Filter = #mam_muc_ca_filter{start_id = AfterID}) ->
399
:-(
Filter#mam_muc_ca_filter{start_id = maybe_max(ID + 1, AfterID)}.
400
401 before_id(undefined, Filter) ->
402
:-(
Filter;
403 before_id(ID, Filter = #mam_muc_ca_filter{end_id = BeforeID}) ->
404
:-(
Filter#mam_muc_ca_filter{end_id = maybe_min(ID - 1, BeforeID)}.
405
406 to_id(ID, Filter = #mam_muc_ca_filter{end_id = BeforeID}) ->
407
:-(
Filter#mam_muc_ca_filter{end_id = maybe_min(ID, BeforeID)}.
408
409 from_id(ID, Filter = #mam_muc_ca_filter{start_id = AfterID}) ->
410
:-(
Filter#mam_muc_ca_filter{start_id = maybe_max(ID, AfterID)}.
411
412 rows_to_uniform_format(MessageRows, HostType, RoomJID) ->
413
:-(
[row_to_uniform_format(Row, HostType, RoomJID) || Row <- MessageRows].
414
415 row_to_uniform_format(#{nick_name := BNick, message := Data, id := MessID},
416 HostType, RoomJID) ->
417
:-(
SrcJID = jid:replace_resource(RoomJID, BNick),
418
:-(
Packet = stored_binary_to_packet(HostType, Data),
419
:-(
#{id => MessID, jid => SrcJID, packet => Packet}.
420
421 row_to_message_id(#{id := MsgID}) ->
422
:-(
MsgID.
423
424 -spec get_mam_muc_gdpr_data(ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
425 host_type(), jid:jid()) ->
426 ejabberd_gen_mam_archive:mam_muc_gdpr_data().
427 get_mam_muc_gdpr_data(Acc, HostType, Jid) ->
428
:-(
BinJid = jid:to_binary(jid:to_lower(Jid)),
429
:-(
PoolName = pool_name(HostType),
430
:-(
FilterMap = #{from_jid => BinJid},
431
:-(
Rows = fetch_user_messages(PoolName, Jid, FilterMap),
432
:-(
Messages = [{Id, exml:to_binary(stored_binary_to_packet(HostType, Data))}
433
:-(
|| #{message := Data, id:= Id} <- Rows],
434
:-(
Messages ++ Acc.
435
436 %% Offset is not supported
437 %% Each record is a tuple of form
438 %% `{<<"13663125233">>, <<"bob@localhost">>, <<"res1">>, <<binary>>}'.
439 %% Columns are `["id", "nick_name", "message"]'.
440 -spec extract_messages(PoolName, RoomJID, HostType, Filter, IMax, ReverseLimit) ->
441 [Row] when
442 PoolName :: mongoose_cassandra:pool_name(),
443 RoomJID :: jid:jid(),
444 HostType :: host_type(),
445 Filter :: filter(),
446 IMax :: pos_integer(),
447 ReverseLimit :: boolean(),
448 Row :: mongoose_cassandra:row().
449 extract_messages(_Worker, _RoomJID, _HostType, _Filter, 0, _) ->
450
:-(
[];
451 extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, false) ->
452
:-(
QueryName = {extract_messages_query, select_filter(Filter)},
453
:-(
Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
454
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
455
:-(
Rows;
456 extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, true) ->
457
:-(
QueryName = {extract_messages_r_query, select_filter(Filter)},
458
:-(
Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
459
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
460
:-(
lists:reverse(Rows).
461
462 fetch_user_messages(PoolName, UserJID, FilterMap) ->
463
:-(
QueryName = fetch_user_messages_query,
464
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, FilterMap),
465
:-(
lists:sort(Rows).
466
467
468 %% @doc Calculate a zero-based index of the row with UID in the result test.
469 %%
470 %% If the element does not exists, the ID of the next element will
471 %% be returned instead.
472 %% @end
473 -spec calc_index(PoolName, RoomJID, HostType, Filter, MessID) -> Count
474 when
475 PoolName :: mongoose_cassandra:pool_name(),
476 RoomJID :: jid:jid(),
477 HostType :: host_type(),
478 Filter :: filter(),
479 MessID :: message_id(),
480 Count :: non_neg_integer().
481 calc_index(PoolName, RoomJID, HostType, Filter, MessID) ->
482
:-(
calc_count(PoolName, RoomJID, HostType, to_id(MessID, Filter)).
483
484 %% @doc Count of elements in RSet before the passed element.
485 %%
486 %% The element with the passed UID can be already deleted.
487 %% @end
488 -spec calc_before(PoolName, RoomJID, HostType, Filter, MessID) -> Count
489 when
490 PoolName :: mongoose_cassandra:pool_name(),
491 RoomJID :: jid:jid(),
492 HostType :: host_type(),
493 Filter :: filter(),
494 MessID :: message_id(),
495 Count :: non_neg_integer().
496 calc_before(PoolName, RoomJID, HostType, Filter, MessID) ->
497
:-(
calc_count(PoolName, RoomJID, HostType, before_id(MessID, Filter)).
498
499
500 %% @doc Get the total result set size.
501 %% "SELECT COUNT(*) as "count" FROM mam_muc_message WHERE "
502 -spec calc_count(PoolName, RoomJID, HostType, Filter) -> Count
503 when
504 PoolName :: mongoose_cassandra:pool_name(),
505 RoomJID :: jid:jid(),
506 HostType :: host_type(),
507 Filter :: filter(),
508 Count :: non_neg_integer().
509 calc_count(PoolName, RoomJID, _HostType, Filter) ->
510
:-(
QueryName = {calc_count_query, select_filter(Filter)},
511
:-(
Params = eval_filter_params(Filter),
512
:-(
{ok, [#{count := Count}]} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName,
513 Params),
514
:-(
Count.
515
516 %% @doc Convert offset to index of the first entry
517 %% Returns undefined if not there are not enough rows
518 %% Uses previously calculated offsets to speed up queries
519 -spec offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id when
520 PoolName :: mongoose_cassandra:pool_name(),
521 RoomJID :: jid:jid(),
522 Offset :: non_neg_integer(),
523 Filter :: filter(),
524 Id :: non_neg_integer() | undefined.
525 offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0,
526 Offset =< 100 ->
527
:-(
calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset);
528 offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
529
:-(
Params = maps:put(offset, Offset, eval_filter_params(Filter)),
530 %% Try to find already calculated nearby offset to reduce query size
531
:-(
case mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, prev_offset_query, Params) of
532 {ok, []} -> %% No hints, just calculate offset sloooowly
533
:-(
StartId = calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset),
534
:-(
maybe_save_offset_hint(PoolName, RoomJID, Filter, 0, Offset, StartId);
535 {ok, [#{offset := PrevOffset, id := PrevId}]} ->
536 %% Offset hint found, use it to reduce query size
537
:-(
case Offset of
538
:-(
PrevOffset -> PrevId;
539 _ ->
540
:-(
StartId = calc_offset_to_start_id(PoolName, RoomJID,
541 Filter#mam_muc_ca_filter{start_id = PrevId},
542 Offset - PrevOffset + 1),
543
:-(
maybe_save_offset_hint(PoolName, RoomJID, Filter, PrevOffset, Offset, StartId)
544 end
545 end.
546
547 %% @doc Saves offset hint for future use in order to speed up queries with similar offset
548 %% Hint is save only if previous offset hint was 50+ entires from current query
549 %% This function returns given StartId as passthrough for convenience
550 -spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), RoomJID :: jid:jid(),
551 Filter :: filter(), HintOffset :: non_neg_integer(),
552 NewOffset :: non_neg_integer(),
553 StartId :: non_neg_integer() | undefined) ->
554 StartId :: non_neg_integer() | undefined.
555 maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset,
556 StartId = undefined) ->
557
:-(
StartId;
558 maybe_save_offset_hint(PoolName, RoomJID, Filter, HintOffset, NewOffset, StartId) ->
559
:-(
case abs(NewOffset - HintOffset) > 50 of
560 true ->
561
:-(
#mam_muc_ca_filter{room_jid = FRoomJID, with_nick = FWithNick} = Filter,
562
:-(
Row = #{room_jid => FRoomJID, with_nick => FWithNick,
563 offset => NewOffset, id => StartId},
564
:-(
mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
565 insert_offset_hint_query, [Row]);
566 false ->
567
:-(
skip
568 end,
569
:-(
StartId.
570
571 %% @doc Convert offset to index of the first entry
572 %% Returns undefined if not there are not enough rows
573 -spec calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id
574 when
575 PoolName :: mongoose_cassandra:pool_name(),
576 RoomJID :: jid:jid(),
577 Offset :: non_neg_integer(),
578 Filter :: filter(),
579 Id :: non_neg_integer() | undefined.
580 calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
581
:-(
QueryName = {list_message_ids_query, select_filter(Filter)},
582
:-(
Params = maps:put('[limit]', Offset + 1, eval_filter_params(Filter)),
583
:-(
{ok, RowsIds} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
584
:-(
case RowsIds of
585
:-(
[] -> undefined;
586 [_ | _] ->
587
:-(
maps:get(id, lists:last(RowsIds))
588 end.
589
590 %% @doc Get closest offset -> message id 'hint' for specified offset
591 prev_offset_query_cql() ->
592
:-(
"SELECT id, offset FROM mam_muc_message_offset WHERE room_jid = ? and with_nick = ?"
593 " and offset <= ? LIMIT 1".
594
595 %% @doc Insert offset -> message id 'hint'
596 insert_offset_hint_query_cql() ->
597
:-(
"INSERT INTO mam_muc_message_offset(room_jid, with_nick, id, offset) VALUES(?, ?, ?, ?)".
598
599 prepare_filter(RoomJID, Borders, Start, End, WithNick) ->
600
:-(
BRoomJID = mod_mam_utils:bare_jid(RoomJID),
601
:-(
StartID = maybe_encode_compact_uuid(Start, 0),
602
:-(
EndID = maybe_encode_compact_uuid(End, 255),
603
:-(
StartID2 = apply_start_border(Borders, StartID),
604
:-(
EndID2 = apply_end_border(Borders, EndID),
605
:-(
BWithNick = maybe_nick(WithNick),
606
:-(
prepare_filter_params(BRoomJID, BWithNick, StartID2, EndID2).
607
608 prepare_filter_params(BRoomJID, BWithNick, StartID, EndID) ->
609
:-(
#mam_muc_ca_filter{
610 room_jid = BRoomJID,
611 with_nick = BWithNick,
612 start_id = StartID,
613 end_id = EndID
614 }.
615
616 eval_filter_params(#mam_muc_ca_filter{
617 room_jid = BRoomJID,
618 with_nick = BWithNick,
619 start_id = StartID,
620 end_id = EndID
621 }) ->
622
:-(
Optional = maps:filter(fun(_K, V) -> V =/= undefined end,
623 #{start_id => StartID, end_id =>EndID}),
624
:-(
maps:merge(#{room_jid => BRoomJID, with_nick => BWithNick}, Optional).
625
626 select_filter(#mam_muc_ca_filter{
627 start_id = StartID,
628 end_id = EndID
629 }) ->
630
:-(
select_filter(StartID, EndID).
631
632
633 -spec select_filter(StartID, EndID) ->
634 all | 'end' | start | start_end when
635 StartID :: integer() | undefined,
636 EndID :: integer() | undefined.
637 select_filter(undefined, undefined) ->
638
:-(
all;
639 select_filter(undefined, _) ->
640
:-(
'end';
641 select_filter(_, undefined) ->
642
:-(
start;
643 select_filter(_, _) ->
644
:-(
start_end.
645
646 prepare_filter_cql(StartID, EndID) ->
647 case StartID of
648
:-(
undefined -> "";
649
:-(
_ -> " AND id >= :start_id"
650
:-(
end ++
651 case EndID of
652
:-(
undefined -> "";
653
:-(
_ -> " AND id <= :end_id"
654 end.
655
656 filter_to_cql() ->
657
:-(
[{select_filter(StartID, EndID),
658 prepare_filter_cql(StartID, EndID)}
659
:-(
|| StartID <- [undefined, 0],
660
:-(
EndID <- [undefined, 0]].
661
662 -spec calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
663 when
664 PoolName :: mongoose_cassandra:pool_name(),
665 RoomJID :: jid:jid(),
666 HostType :: host_type(),
667 Filter :: filter(),
668 PageSize :: non_neg_integer(),
669 TotalCount :: non_neg_integer(),
670 RSM :: jlib:rsm_in() | undefined,
671 Offset :: non_neg_integer().
672 %% Requesting the Last Page in a Result Set
673 calc_offset(_W, _RoomJID, _LS, _F, PS, TC, #rsm_in{direction = before, id = undefined}) ->
674
:-(
max(0, TC - PS);
675 calc_offset(PoolName, RoomJID, HostType, F, PS, _TC, #rsm_in{direction = before, id = ID})
676 when is_integer(ID) ->
677
:-(
max(0, calc_before(PoolName, RoomJID, HostType, F, ID) - PS);
678 calc_offset(PoolName, RoomJID, HostType, F, _PS, _TC, #rsm_in{direction = aft, id = ID})
679 when is_integer(ID) ->
680
:-(
calc_index(PoolName, RoomJID, HostType, F, ID);
681 calc_offset(_W, _RoomJID, _LS, _F, _PS, _TC, _RSM) ->
682
:-(
0.
683
684 maybe_encode_compact_uuid(undefined, _) ->
685
:-(
undefined;
686 maybe_encode_compact_uuid(Microseconds, NodeID) ->
687
:-(
encode_compact_uuid(Microseconds, NodeID).
688
689 maybe_nick(undefined) ->
690
:-(
<<>>;
691 maybe_nick(WithNick) when is_binary(WithNick) ->
692
:-(
WithNick.
693
694 %%====================================================================
695 %% Internal SQL part
696 %%====================================================================
697
698 extract_messages_queries() ->
699
:-(
[{{extract_messages_query, FilterName}, extract_messages_cql(Filter)}
700
:-(
|| {FilterName, Filter} <- filter_to_cql()].
701
702 extract_messages_r_queries() ->
703
:-(
[{{extract_messages_r_query, FilterName}, extract_messages_r_cql(Filter)}
704
:-(
|| {FilterName, Filter} <- filter_to_cql()].
705
706 calc_count_queries() ->
707
:-(
[{{calc_count_query, FilterName}, calc_count_cql(Filter)}
708
:-(
|| {FilterName, Filter} <- filter_to_cql()].
709
710 list_message_ids_queries() ->
711
:-(
[{{list_message_ids_query, FilterName}, list_message_ids_cql(Filter)}
712
:-(
|| {FilterName, Filter} <- filter_to_cql()].
713
714 extract_messages_cql(Filter) ->
715 "SELECT id, nick_name, message FROM mam_muc_message "
716
:-(
"WHERE room_jid = ? AND with_nick = ? " ++
717 Filter ++ " ORDER BY id LIMIT ?".
718
719 extract_messages_r_cql(Filter) ->
720 "SELECT id, nick_name, message FROM mam_muc_message "
721
:-(
"WHERE room_jid = ? AND with_nick = ? " ++
722 Filter ++ " ORDER BY id DESC LIMIT ?".
723
724 fetch_user_messages_cql() ->
725 %% attempt to order results in the next error:
726 %% "ORDER BY with 2ndary indexes is not supported."
727
:-(
"SELECT id, message FROM mam_muc_message "
728 "WHERE from_jid = ?".
729
730 calc_count_cql(Filter) ->
731 "SELECT COUNT(*) FROM mam_muc_message "
732
:-(
"WHERE room_jid = ? AND with_nick = ? " ++ Filter.
733
734 list_message_ids_cql(Filter) ->
735 "SELECT id FROM mam_muc_message "
736
:-(
"WHERE room_jid = ? AND with_nick = ? " ++ Filter ++
737 " ORDER BY id LIMIT ?".
738
739 %% ----------------------------------------------------------------------
740 %% Optimizations
741
742 packet_to_stored_binary(HostType, Packet) ->
743 %% Module implementing mam_muc_message behaviour
744
:-(
Module = db_message_format(HostType),
745
:-(
Module:encode(Packet).
746
747 stored_binary_to_packet(HostType, Bin) ->
748 %% Module implementing mam_muc_message behaviour
749
:-(
Module = db_message_format(HostType),
750
:-(
Module:decode(Bin).
751
752 %% ----------------------------------------------------------------------
753 %% Params getters
754
755 -spec db_message_format(HostType :: host_type()) -> module().
756 db_message_format(HostType) ->
757
:-(
gen_mod:get_module_opt(HostType, ?MODULE, db_message_format).
758
759 -spec pool_name(HostType :: host_type()) -> mongoose_wpool:pool_name().
760 pool_name(HostType) ->
761
:-(
default.
Line Hits Source