./ct_report/coverage/mod_mam_muc_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc RDBMS backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_muc_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10 -behaviour(gen_mod).
11
12 %% gen_mod handlers
13 -export([start/2, stop/1, hooks/1]).
14
15 %% MAM hook handlers
16 -export([archive_size/3,
17 archive_message/3,
18 lookup_messages/3,
19 remove_archive/3,
20 get_mam_muc_gdpr_data/3]).
21
22 %% mongoose_cassandra callbacks
23 -export([prepared_queries/0]).
24
25 %% ----------------------------------------------------------------------
26 %% Imports
27
28 %% UID
29 -import(mod_mam_utils,
30 [encode_compact_uuid/2]).
31
32 %% Other
33 -import(mod_mam_utils,
34 [maybe_min/2,
35 maybe_max/2,
36 apply_start_border/2,
37 apply_end_border/2]).
38
39 -include("mongoose.hrl").
40 -include("jlib.hrl").
41 -include("mongoose_rsm.hrl").
42
43 -record(mam_muc_ca_filter, {
44 room_jid,
45 with_nick,
46 nick_name,
47 start_id,
48 end_id
49 }).
50
51 -record(mam_muc_message, {
52 id :: non_neg_integer(),
53 room_jid :: binary(),
54 from_jid :: binary() | undefined,
55 nick_name :: binary(),
56 with_nick :: binary(),
57 message :: binary() | undefined
58 }).
59
60 -callback encode(binary()) -> binary().
61 -callback decode(binary()) -> binary().
62
63 %% ----------------------------------------------------------------------
64 %% Types
65
66 -type filter() :: #mam_muc_ca_filter{}.
67 -type message_id() :: non_neg_integer().
68 -type host_type() :: mongooseim:host_type().
69
70 %% ----------------------------------------------------------------------
71 %% gen_mod callbacks
72 %% Starting and stopping functions for users' archives
73
74 -spec start(host_type(), gen_mod:module_opts()) -> ok.
75 start(_HostType, _Opts) ->
76
:-(
ok.
77
78 -spec stop(host_type()) -> ok.
79 stop(_HostType) ->
80
:-(
ok.
81
82 %% ----------------------------------------------------------------------
83 %% Add hooks for mod_mam_muc
84
85 -spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
86 hooks(HostType) ->
87
:-(
[{mam_muc_archive_message, HostType, fun ?MODULE:archive_message/3, #{}, 50},
88 {mam_muc_archive_size, HostType, fun ?MODULE:archive_size/3, #{}, 50},
89 {mam_muc_lookup_messages, HostType, fun ?MODULE:lookup_messages/3, #{}, 50},
90 {mam_muc_remove_archive, HostType, fun ?MODULE:remove_archive/3, #{}, 50},
91 {get_mam_muc_gdpr_data, HostType, fun ?MODULE:get_mam_muc_gdpr_data/3, #{}, 50}].
92
93 %% ----------------------------------------------------------------------
94 %% mongoose_cassandra_worker callbacks
95
96 prepared_queries() ->
97 [
98 {insert_offset_hint_query, insert_offset_hint_query_cql()},
99 {prev_offset_query, prev_offset_query_cql()},
100 {insert_query, insert_query_cql()},
101 {fetch_user_messages_query, fetch_user_messages_cql()},
102 {select_for_removal_query, select_for_removal_query_cql()},
103 {remove_archive_query, remove_archive_query_cql()},
104 {remove_archive_offsets_query, remove_archive_offsets_query_cql()}]
105
:-(
++ extract_messages_queries()
106 ++ extract_messages_r_queries()
107 ++ calc_count_queries()
108 ++ list_message_ids_queries().
109
110 %%====================================================================
111 %% Internal functions
112 %%====================================================================
113
114 %% ----------------------------------------------------------------------
115 %% Internal functions and callbacks
116
117 -spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
118 Acc :: integer(),
119 Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
120 Extra :: gen_hook:extra().
121 archive_size(Size, #{room := RoomJID}, #{host_type := HostType}) when is_integer(Size) ->
122
:-(
PoolName = pool_name(HostType),
123
:-(
Borders = Start = End = WithNick = undefined,
124
:-(
Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick, undefined),
125
:-(
{ok, calc_count(PoolName, RoomJID, HostType, Filter)}.
126
127
128 %% ----------------------------------------------------------------------
129 %% INSERT MESSAGE
130
131 insert_query_cql() ->
132
:-(
"INSERT INTO mam_muc_message "
133 "(id, room_jid, from_jid, nick_name, with_nick, message) "
134 "VALUES (?, ?, ?, ?, ?, ?)".
135
136 -spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
137 Acc :: ok | {error, term()},
138 Params :: mod_mam:archive_message_params(),
139 Extra :: gen_hook:extra().
140 archive_message(_Result, Params, #{host_type := HostType}) ->
141
:-(
try
142
:-(
{ok, archive_message2(Params, HostType)}
143 catch _Type:Reason:StackTrace ->
144
:-(
mongoose_instrument:execute(mod_mam_muc_dropped, #{host_type => HostType}, #{count => 1}),
145
:-(
?LOG_ERROR(#{what => archive_muc_message_failed,
146 host_type => HostType, mam_params => Params,
147
:-(
reason => Reason, stacktrace => StackTrace}),
148
:-(
{ok, {error, Reason}}
149 end.
150
151 archive_message2(#{message_id := MessID,
152 local_jid := LocJID = #jid{},
153 remote_jid := FromJID = #jid{},
154 source_jid := #jid{lresource = BNick},
155 packet := Packet}, HostType) ->
156
:-(
BLocJID = mod_mam_utils:bare_jid(LocJID),
157
:-(
BFromJID = mod_mam_utils:bare_jid(FromJID),
158
:-(
BPacket = packet_to_stored_binary(HostType, Packet),
159
:-(
Messages = [#mam_muc_message{
160 id = MessID,
161 room_jid = BLocJID,
162 from_jid = BWithFromJID,
163 nick_name = BNick,
164 message = BPacket,
165 with_nick = BWithNick
166
:-(
} || {BWithNick, BWithFromJID} <- [{<<>>, BFromJID}, {BNick, <<>>}]],
167
:-(
ok = write_messages(HostType, Messages).
168
169 write_messages(HostType, Messages) ->
170
:-(
PoolName = pool_name(HostType),
171
:-(
RoomJID = undefined,
172
:-(
MultiParams = [message_to_params(M) || M <- Messages],
173
:-(
mongoose_cassandra:cql_write_async(PoolName, RoomJID, ?MODULE, insert_query, MultiParams).
174
175 message_to_params(#mam_muc_message{
176 id = MessID,
177 room_jid = BLocJID,
178 from_jid = BFromJID,
179 nick_name = BNick,
180 with_nick = BWithNick,
181 message = BPacket
182 }) ->
183
:-(
#{id => MessID, room_jid => BLocJID, from_jid => BFromJID,
184 nick_name => BNick, with_nick => BWithNick, message => BPacket}.
185
186
187 %% ----------------------------------------------------------------------
188 %% REMOVE ARCHIVE
189
190 remove_archive_query_cql() ->
191
:-(
"DELETE FROM mam_muc_message WHERE room_jid = ? AND with_nick = ?".
192
193 remove_archive_offsets_query_cql() ->
194
:-(
"DELETE FROM mam_muc_message_offset WHERE room_jid = ? AND with_nick = ?".
195
196 select_for_removal_query_cql() ->
197
:-(
"SELECT DISTINCT room_jid, with_nick FROM mam_muc_message WHERE room_jid = ?".
198
199 -spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
200 Acc :: term(),
201 Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
202 Extra :: gen_hook:extra().
203 remove_archive(Acc, #{room := RoomJID}, #{host_type := HostType}) ->
204
:-(
BRoomJID = mod_mam_utils:bare_jid(RoomJID),
205
:-(
PoolName = pool_name(HostType),
206
:-(
Params = #{room_jid => BRoomJID},
207 %% Wait until deleted
208
209
:-(
DeleteFun =
210 fun(Rows, _AccIn) ->
211
:-(
mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
212 remove_archive_query, Rows),
213
:-(
mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
214 remove_archive_offsets_query, Rows)
215 end,
216
217
:-(
mongoose_cassandra:cql_foldl(PoolName, RoomJID, ?MODULE,
218 select_for_removal_query, Params, DeleteFun, []),
219
:-(
{ok, Acc}.
220
221 %% ----------------------------------------------------------------------
222 %% SELECT MESSAGES
223
224 -spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
225 Acc :: {ok, mod_mam:lookup_result()},
226 Params :: mam_iq:lookup_params(),
227 Extra :: gen_hook:extra().
228 lookup_messages({error, _Reason} = Result, _Params, _Extra) ->
229
:-(
{ok, Result};
230 lookup_messages(_Result, #{search_text := <<_/binary>>}, _Extra) ->
231
:-(
{ok, {error, 'not-supported'}};
232 lookup_messages(_Result,
233 #{owner_jid := RoomJID, rsm := RSM, borders := Borders,
234 start_ts := Start, end_ts := End, with_jid := WithJID,
235 search_text := undefined, page_size := PageSize,
236 is_simple := IsSimple, message_id := MsgID},
237 #{host_type := HostType}) ->
238
:-(
try
239
:-(
WithNick = maybe_jid_to_nick(WithJID),
240
:-(
PoolName = pool_name(HostType),
241
:-(
{ok, lookup_messages2(PoolName, HostType,
242 RoomJID, RSM, Borders,
243 Start, End, WithNick,
244 PageSize, MsgID, IsSimple)}
245 catch _Type:Reason:Stacktrace ->
246
:-(
{ok, {error, {Reason, {stacktrace, Stacktrace}}}}
247 end.
248
249
:-(
maybe_jid_to_nick(#jid{lresource = BNick}) -> BNick;
250
:-(
maybe_jid_to_nick(undefined) -> undefined.
251
252
253 lookup_messages2(PoolName, HostType,
254 RoomJID = #jid{}, RSM, Borders,
255 Start, End, WithNick,
256 PageSize, MsgID, _IsSimple = true) ->
257 %% Simple query without calculating offset and total count
258
:-(
Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick, MsgID),
259
:-(
lookup_messages_simple(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
260 lookup_messages2(PoolName, HostType,
261 RoomJID = #jid{}, RSM, Borders,
262 Start, End, WithNick,
263 PageSize, MsgID, _IsSimple) ->
264 %% Query with offset calculation
265 %% We cannot just use RDBMS code because "LIMIT X, Y" is not supported by cassandra
266 %% Not all queries are optimal. You would like to disable something for production
267 %% once you know how you will call bd
268
:-(
Strategy = rsm_to_strategy(RSM),
269
:-(
Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick, MsgID),
270
:-(
case Strategy of
271 last_page ->
272
:-(
lookup_messages_last_page(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
273 by_offset ->
274
:-(
lookup_messages_by_offset(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
275 first_page ->
276
:-(
lookup_messages_first_page(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
277 before_id ->
278
:-(
lookup_messages_before_id(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
279 after_id ->
280
:-(
lookup_messages_after_id(PoolName, HostType, RoomJID, RSM, PageSize, Filter)
281 end.
282
283 rsm_to_strategy(#rsm_in{direction = before, id = undefined}) ->
284
:-(
last_page;
285 rsm_to_strategy(#rsm_in{direction = undefined, index = 0}) ->
286
:-(
first_page;
287 rsm_to_strategy(#rsm_in{direction = undefined, index = Offset}) when is_integer(Offset) ->
288
:-(
by_offset;
289 rsm_to_strategy(#rsm_in{direction = before, id = Id}) when is_integer(Id) ->
290
:-(
before_id;
291 rsm_to_strategy(#rsm_in{direction = aft, id = Id}) when is_integer(Id) ->
292
:-(
after_id;
293 rsm_to_strategy(#rsm_in{}) ->
294
:-(
first_page;
295 rsm_to_strategy(undefined) ->
296
:-(
first_page.
297
298 lookup_messages_simple(PoolName, HostType, RoomJID,
299 #rsm_in{direction = aft, id = ID},
300 PageSize, Filter) ->
301 %% Get last rows from result set
302
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, after_id(ID, Filter), PageSize, false),
303
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
304 lookup_messages_simple(PoolName, HostType, RoomJID,
305 #rsm_in{direction = before, id = ID},
306 PageSize, Filter) ->
307
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, before_id(ID, Filter), PageSize, true),
308
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
309 lookup_messages_simple(PoolName, HostType, RoomJID,
310 #rsm_in{direction = undefined, index = Offset},
311 PageSize, Filter) ->
312 %% Apply offset
313
:-(
StartId = offset_to_start_id(PoolName, RoomJID, Filter,
314 Offset), %% POTENTIALLY SLOW AND NOT SIMPLE :)
315
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter), PageSize,
316 false),
317
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
318 lookup_messages_simple(PoolName, HostType, RoomJID,
319 _,
320 PageSize, Filter) ->
321
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
322
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}}.
323
324 lookup_messages_last_page(PoolName, HostType, RoomJID,
325 #rsm_in{direction = before, id = undefined},
326 0, Filter) ->
327 %% Last page
328
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
329
:-(
{ok, {TotalCount, TotalCount, []}};
330 lookup_messages_last_page(PoolName, HostType, RoomJID,
331 #rsm_in{direction = before, id = undefined},
332 PageSize, Filter) ->
333 %% Last page
334
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, true),
335
:-(
MessageRowsCount = length(MessageRows),
336
:-(
case MessageRowsCount < PageSize of
337 true ->
338
:-(
{ok, {MessageRowsCount, 0,
339 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
340 false ->
341
:-(
FirstID = row_to_message_id(hd(MessageRows)),
342
:-(
Offset = calc_count(PoolName, RoomJID, HostType, before_id(FirstID, Filter)),
343
:-(
{ok, {Offset + MessageRowsCount, Offset,
344 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
345 end.
346
347 lookup_messages_by_offset(PoolName, HostType, RoomJID,
348 #rsm_in{direction = undefined, index = Offset},
349 0, Filter) when is_integer(Offset) ->
350 %% By offset
351
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
352
:-(
{ok, {TotalCount, Offset, []}};
353 lookup_messages_by_offset(PoolName, HostType, RoomJID,
354 #rsm_in{direction = undefined, index = Offset},
355 PageSize, Filter) when is_integer(Offset) ->
356 %% By offset
357
:-(
StartId = offset_to_start_id(PoolName, RoomJID, Filter, Offset), %% POTENTIALLY SLOW
358
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter), PageSize,
359 false),
360
:-(
MessageRowsCount = length(MessageRows),
361
:-(
case MessageRowsCount < PageSize of
362 true ->
363
:-(
{ok, {Offset + MessageRowsCount, Offset,
364 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
365 false ->
366
:-(
LastID = row_to_message_id(lists:last(MessageRows)),
367
:-(
CountAfterLastID = calc_count(PoolName, RoomJID, HostType, after_id(LastID, Filter)),
368
:-(
{ok, {Offset + MessageRowsCount + CountAfterLastID, Offset,
369 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
370 end.
371
372 lookup_messages_first_page(PoolName, HostType, RoomJID,
373 _,
374 0, Filter) ->
375 %% First page, just count
376
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
377
:-(
{ok, {TotalCount, 0, []}};
378 lookup_messages_first_page(PoolName, HostType, RoomJID,
379 _,
380 PageSize, Filter) ->
381 %% First page
382
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
383
:-(
MessageRowsCount = length(MessageRows),
384
:-(
case MessageRowsCount < PageSize of
385 true ->
386
:-(
{ok, {MessageRowsCount, 0,
387 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
388 false ->
389
:-(
LastID = row_to_message_id(lists:last(MessageRows)),
390
:-(
CountAfterLastID = calc_count(PoolName, RoomJID, HostType, after_id(LastID, Filter)),
391
:-(
{ok, {MessageRowsCount + CountAfterLastID, 0,
392 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
393 end.
394
395 lookup_messages_before_id(PoolName, HostType, RoomJID,
396 RSM = #rsm_in{direction = before, id = ID},
397 PageSize, Filter) ->
398
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
399
:-(
Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM),
400
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, to_id(ID, Filter),
401 PageSize + 1, true),
402
:-(
Result = {TotalCount, Offset, rows_to_uniform_format(MessageRows, HostType, RoomJID)},
403
:-(
mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
404
405 lookup_messages_after_id(PoolName, HostType, RoomJID,
406 RSM = #rsm_in{direction = aft, id = ID},
407 PageSize, Filter) ->
408
:-(
PoolName = pool_name(HostType),
409
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
410
:-(
Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM),
411
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(ID, Filter),
412 PageSize + 1, false),
413
:-(
Result = {TotalCount, Offset, rows_to_uniform_format(MessageRows, HostType, RoomJID)},
414
:-(
mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
415
416
417 after_id(ID, Filter = #mam_muc_ca_filter{start_id = AfterID}) ->
418
:-(
Filter#mam_muc_ca_filter{start_id = maybe_max(ID + 1, AfterID)}.
419
420 before_id(undefined, Filter) ->
421
:-(
Filter;
422 before_id(ID, Filter = #mam_muc_ca_filter{end_id = BeforeID}) ->
423
:-(
Filter#mam_muc_ca_filter{end_id = maybe_min(ID - 1, BeforeID)}.
424
425 to_id(ID, Filter = #mam_muc_ca_filter{end_id = BeforeID}) ->
426
:-(
Filter#mam_muc_ca_filter{end_id = maybe_min(ID, BeforeID)}.
427
428 from_id(ID, Filter = #mam_muc_ca_filter{start_id = AfterID}) ->
429
:-(
Filter#mam_muc_ca_filter{start_id = maybe_max(ID, AfterID)}.
430
431 rows_to_uniform_format(MessageRows, HostType, RoomJID) ->
432
:-(
[row_to_uniform_format(Row, HostType, RoomJID) || Row <- MessageRows].
433
434 row_to_uniform_format(#{nick_name := BNick, message := Data, id := MessID},
435 HostType, RoomJID) ->
436
:-(
SrcJID = jid:replace_resource(RoomJID, BNick),
437
:-(
Packet = stored_binary_to_packet(HostType, Data),
438
:-(
#{id => MessID, jid => SrcJID, packet => Packet}.
439
440 row_to_message_id(#{id := MsgID}) ->
441
:-(
MsgID.
442
443 -spec get_mam_muc_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when
444 Acc :: ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
445 Params :: #{jid := jid:jid()},
446 Extra :: gen_hook:extra().
447 get_mam_muc_gdpr_data(Acc, #{jid := Jid}, #{host_type := HostType}) ->
448
:-(
BinJid = jid:to_binary(jid:to_lower(Jid)),
449
:-(
PoolName = pool_name(HostType),
450
:-(
FilterMap = #{from_jid => BinJid},
451
:-(
Rows = fetch_user_messages(PoolName, Jid, FilterMap),
452
:-(
Messages = [{Id, exml:to_binary(stored_binary_to_packet(HostType, Data))}
453
:-(
|| #{message := Data, id:= Id} <- Rows],
454
:-(
{ok, Messages ++ Acc}.
455
456 %% Offset is not supported
457 %% Each record is a tuple of form
458 %% `{<<"13663125233">>, <<"bob@localhost">>, <<"res1">>, <<binary>>}'.
459 %% Columns are `["id", "nick_name", "message"]'.
460 -spec extract_messages(PoolName, RoomJID, HostType, Filter, IMax, ReverseLimit) ->
461 [Row] when
462 PoolName :: mongoose_cassandra:pool_name(),
463 RoomJID :: jid:jid(),
464 HostType :: host_type(),
465 Filter :: filter(),
466 IMax :: pos_integer(),
467 ReverseLimit :: boolean(),
468 Row :: mongoose_cassandra:row().
469 extract_messages(_Worker, _RoomJID, _HostType, _Filter, 0, _) ->
470
:-(
[];
471 extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, false) ->
472
:-(
QueryName = {extract_messages_query, select_filter(Filter)},
473
:-(
Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
474
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
475
:-(
Rows;
476 extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, true) ->
477
:-(
QueryName = {extract_messages_r_query, select_filter(Filter)},
478
:-(
Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
479
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
480
:-(
lists:reverse(Rows).
481
482 fetch_user_messages(PoolName, UserJID, FilterMap) ->
483
:-(
QueryName = fetch_user_messages_query,
484
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, FilterMap),
485
:-(
lists:sort(Rows).
486
487
488 %% @doc Calculate a zero-based index of the row with UID in the result test.
489 %%
490 %% If the element does not exists, the ID of the next element will
491 %% be returned instead.
492 %% @end
493 -spec calc_index(PoolName, RoomJID, HostType, Filter, MessID) -> Count
494 when
495 PoolName :: mongoose_cassandra:pool_name(),
496 RoomJID :: jid:jid(),
497 HostType :: host_type(),
498 Filter :: filter(),
499 MessID :: message_id(),
500 Count :: non_neg_integer().
501 calc_index(PoolName, RoomJID, HostType, Filter, MessID) ->
502
:-(
calc_count(PoolName, RoomJID, HostType, to_id(MessID, Filter)).
503
504 %% @doc Count of elements in RSet before the passed element.
505 %%
506 %% The element with the passed UID can be already deleted.
507 %% @end
508 -spec calc_before(PoolName, RoomJID, HostType, Filter, MessID) -> Count
509 when
510 PoolName :: mongoose_cassandra:pool_name(),
511 RoomJID :: jid:jid(),
512 HostType :: host_type(),
513 Filter :: filter(),
514 MessID :: message_id(),
515 Count :: non_neg_integer().
516 calc_before(PoolName, RoomJID, HostType, Filter, MessID) ->
517
:-(
calc_count(PoolName, RoomJID, HostType, before_id(MessID, Filter)).
518
519
520 %% @doc Get the total result set size.
521 %% "SELECT COUNT(*) as "count" FROM mam_muc_message WHERE "
522 -spec calc_count(PoolName, RoomJID, HostType, Filter) -> Count
523 when
524 PoolName :: mongoose_cassandra:pool_name(),
525 RoomJID :: jid:jid(),
526 HostType :: host_type(),
527 Filter :: filter(),
528 Count :: non_neg_integer().
529 calc_count(PoolName, RoomJID, _HostType, Filter) ->
530
:-(
QueryName = {calc_count_query, select_filter(Filter)},
531
:-(
Params = eval_filter_params(Filter),
532
:-(
{ok, [#{count := Count}]} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName,
533 Params),
534
:-(
Count.
535
536 %% @doc Convert offset to index of the first entry
537 %% Returns undefined if not there are not enough rows
538 %% Uses previously calculated offsets to speed up queries
539 -spec offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id when
540 PoolName :: mongoose_cassandra:pool_name(),
541 RoomJID :: jid:jid(),
542 Offset :: non_neg_integer(),
543 Filter :: filter(),
544 Id :: non_neg_integer() | undefined.
545 offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0,
546 Offset =< 100 ->
547
:-(
calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset);
548 offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
549
:-(
Params = maps:put(offset, Offset, eval_filter_params(Filter)),
550 %% Try to find already calculated nearby offset to reduce query size
551
:-(
case mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, prev_offset_query, Params) of
552 {ok, []} -> %% No hints, just calculate offset sloooowly
553
:-(
StartId = calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset),
554
:-(
maybe_save_offset_hint(PoolName, RoomJID, Filter, 0, Offset, StartId);
555 {ok, [#{offset := PrevOffset, id := PrevId}]} ->
556 %% Offset hint found, use it to reduce query size
557
:-(
case Offset of
558
:-(
PrevOffset -> PrevId;
559 _ ->
560
:-(
StartId = calc_offset_to_start_id(PoolName, RoomJID,
561 Filter#mam_muc_ca_filter{start_id = PrevId},
562 Offset - PrevOffset + 1),
563
:-(
maybe_save_offset_hint(PoolName, RoomJID, Filter, PrevOffset, Offset, StartId)
564 end
565 end.
566
567 %% @doc Saves offset hint for future use in order to speed up queries with similar offset
568 %% Hint is save only if previous offset hint was 50+ entires from current query
569 %% This function returns given StartId as passthrough for convenience
570 -spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), RoomJID :: jid:jid(),
571 Filter :: filter(), HintOffset :: non_neg_integer(),
572 NewOffset :: non_neg_integer(),
573 StartId :: non_neg_integer() | undefined) ->
574 StartId :: non_neg_integer() | undefined.
575 maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset,
576 StartId = undefined) ->
577
:-(
StartId;
578 maybe_save_offset_hint(PoolName, RoomJID, Filter, HintOffset, NewOffset, StartId) ->
579
:-(
case abs(NewOffset - HintOffset) > 50 of
580 true ->
581
:-(
#mam_muc_ca_filter{room_jid = FRoomJID, with_nick = FWithNick} = Filter,
582
:-(
Row = #{room_jid => FRoomJID, with_nick => FWithNick,
583 offset => NewOffset, id => StartId},
584
:-(
mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
585 insert_offset_hint_query, [Row]);
586 false ->
587
:-(
skip
588 end,
589
:-(
StartId.
590
591 %% @doc Convert offset to index of the first entry
592 %% Returns undefined if not there are not enough rows
593 -spec calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id
594 when
595 PoolName :: mongoose_cassandra:pool_name(),
596 RoomJID :: jid:jid(),
597 Offset :: non_neg_integer(),
598 Filter :: filter(),
599 Id :: non_neg_integer() | undefined.
600 calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
601
:-(
QueryName = {list_message_ids_query, select_filter(Filter)},
602
:-(
Params = maps:put('[limit]', Offset + 1, eval_filter_params(Filter)),
603
:-(
{ok, RowsIds} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
604
:-(
case RowsIds of
605
:-(
[] -> undefined;
606 [_ | _] ->
607
:-(
maps:get(id, lists:last(RowsIds))
608 end.
609
610 %% @doc Get closest offset -> message id 'hint' for specified offset
611 prev_offset_query_cql() ->
612
:-(
"SELECT id, offset FROM mam_muc_message_offset WHERE room_jid = ? and with_nick = ?"
613 " and offset <= ? LIMIT 1".
614
615 %% @doc Insert offset -> message id 'hint'
616 insert_offset_hint_query_cql() ->
617
:-(
"INSERT INTO mam_muc_message_offset(room_jid, with_nick, id, offset) VALUES(?, ?, ?, ?)".
618
619 prepare_filter(RoomJID, Borders, Start, End, WithNick, MsgID) ->
620
:-(
BRoomJID = mod_mam_utils:bare_jid(RoomJID),
621
:-(
StartID = maybe_encode_compact_uuid(Start, 0),
622
:-(
EndID = maybe_encode_compact_uuid(End, 255),
623 %% In Cassandra, a column cannot be restricted by both an equality and an inequality relation.
624 %% When MsgID is defined, it is used as both StartID2 and EndID2 to comply with this limitation.
625 %% This means that the `ids` filter effectively overrides any "before" or "after" filters.
626
:-(
{StartID2, EndID2} = case MsgID of
627 undefined ->
628
:-(
{apply_start_border(Borders, StartID),
629 apply_end_border(Borders, EndID)};
630 ID ->
631
:-(
{ID, ID}
632 end,
633
:-(
BWithNick = maybe_nick(WithNick),
634
:-(
prepare_filter_params(BRoomJID, BWithNick, StartID2, EndID2).
635
636 prepare_filter_params(BRoomJID, BWithNick, StartID, EndID) ->
637
:-(
#mam_muc_ca_filter{
638 room_jid = BRoomJID,
639 with_nick = BWithNick,
640 start_id = StartID,
641 end_id = EndID
642 }.
643
644 eval_filter_params(#mam_muc_ca_filter{
645 room_jid = BRoomJID,
646 with_nick = BWithNick,
647 start_id = StartID,
648 end_id = EndID
649 }) ->
650
:-(
Optional = maps:filter(fun(_K, V) -> V =/= undefined end,
651 #{start_id => StartID, end_id =>EndID}),
652
:-(
maps:merge(#{room_jid => BRoomJID, with_nick => BWithNick}, Optional).
653
654 select_filter(#mam_muc_ca_filter{
655 start_id = StartID,
656 end_id = EndID
657 }) ->
658
:-(
select_filter(StartID, EndID).
659
660
661 -spec select_filter(StartID, EndID) ->
662 all | 'end' | start | start_end when
663 StartID :: integer() | undefined,
664 EndID :: integer() | undefined.
665 select_filter(undefined, undefined) ->
666
:-(
all;
667 select_filter(undefined, _) ->
668
:-(
'end';
669 select_filter(_, undefined) ->
670
:-(
start;
671 select_filter(_, _) ->
672
:-(
start_end.
673
674 prepare_filter_cql(StartID, EndID) ->
675 case StartID of
676
:-(
undefined -> "";
677
:-(
_ -> " AND id >= :start_id"
678
:-(
end ++
679 case EndID of
680
:-(
undefined -> "";
681
:-(
_ -> " AND id <= :end_id"
682 end.
683
684 filter_to_cql() ->
685
:-(
[{select_filter(StartID, EndID),
686 prepare_filter_cql(StartID, EndID)}
687
:-(
|| StartID <- [undefined, 0],
688
:-(
EndID <- [undefined, 0]].
689
690 -spec calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
691 when
692 PoolName :: mongoose_cassandra:pool_name(),
693 RoomJID :: jid:jid(),
694 HostType :: host_type(),
695 Filter :: filter(),
696 PageSize :: non_neg_integer(),
697 TotalCount :: non_neg_integer(),
698 RSM :: jlib:rsm_in() | undefined,
699 Offset :: non_neg_integer().
700 %% Requesting the Last Page in a Result Set
701 calc_offset(_W, _RoomJID, _LS, _F, PS, TC, #rsm_in{direction = before, id = undefined}) ->
702
:-(
max(0, TC - PS);
703 calc_offset(PoolName, RoomJID, HostType, F, PS, _TC, #rsm_in{direction = before, id = ID})
704 when is_integer(ID) ->
705
:-(
max(0, calc_before(PoolName, RoomJID, HostType, F, ID) - PS);
706 calc_offset(PoolName, RoomJID, HostType, F, _PS, _TC, #rsm_in{direction = aft, id = ID})
707 when is_integer(ID) ->
708
:-(
calc_index(PoolName, RoomJID, HostType, F, ID);
709 calc_offset(_W, _RoomJID, _LS, _F, _PS, _TC, _RSM) ->
710
:-(
0.
711
712 maybe_encode_compact_uuid(undefined, _) ->
713
:-(
undefined;
714 maybe_encode_compact_uuid(Microseconds, NodeID) ->
715
:-(
encode_compact_uuid(Microseconds, NodeID).
716
717 maybe_nick(undefined) ->
718
:-(
<<>>;
719 maybe_nick(WithNick) when is_binary(WithNick) ->
720
:-(
WithNick.
721
722 %%====================================================================
723 %% Internal SQL part
724 %%====================================================================
725
726 extract_messages_queries() ->
727
:-(
[{{extract_messages_query, FilterName}, extract_messages_cql(Filter)}
728
:-(
|| {FilterName, Filter} <- filter_to_cql()].
729
730 extract_messages_r_queries() ->
731
:-(
[{{extract_messages_r_query, FilterName}, extract_messages_r_cql(Filter)}
732
:-(
|| {FilterName, Filter} <- filter_to_cql()].
733
734 calc_count_queries() ->
735
:-(
[{{calc_count_query, FilterName}, calc_count_cql(Filter)}
736
:-(
|| {FilterName, Filter} <- filter_to_cql()].
737
738 list_message_ids_queries() ->
739
:-(
[{{list_message_ids_query, FilterName}, list_message_ids_cql(Filter)}
740
:-(
|| {FilterName, Filter} <- filter_to_cql()].
741
742 extract_messages_cql(Filter) ->
743 "SELECT id, nick_name, message FROM mam_muc_message "
744
:-(
"WHERE room_jid = ? AND with_nick = ? " ++
745 Filter ++ " ORDER BY id LIMIT ?".
746
747 extract_messages_r_cql(Filter) ->
748 "SELECT id, nick_name, message FROM mam_muc_message "
749
:-(
"WHERE room_jid = ? AND with_nick = ? " ++
750 Filter ++ " ORDER BY id DESC LIMIT ?".
751
752 fetch_user_messages_cql() ->
753 %% attempt to order results in the next error:
754 %% "ORDER BY with 2ndary indexes is not supported."
755
:-(
"SELECT id, message FROM mam_muc_message "
756 "WHERE from_jid = ?".
757
758 calc_count_cql(Filter) ->
759 "SELECT COUNT(*) FROM mam_muc_message "
760
:-(
"WHERE room_jid = ? AND with_nick = ? " ++ Filter.
761
762 list_message_ids_cql(Filter) ->
763 "SELECT id FROM mam_muc_message "
764
:-(
"WHERE room_jid = ? AND with_nick = ? " ++ Filter ++
765 " ORDER BY id LIMIT ?".
766
767 %% ----------------------------------------------------------------------
768 %% Optimizations
769
770 packet_to_stored_binary(HostType, Packet) ->
771 %% Module implementing mam_muc_message behaviour
772
:-(
Module = db_message_format(HostType),
773
:-(
mam_message:encode(Module, Packet).
774
775 stored_binary_to_packet(HostType, Bin) ->
776 %% Module implementing mam_muc_message behaviour
777
:-(
Module = db_message_format(HostType),
778
:-(
mam_message:decode(Module, Bin).
779
780 %% ----------------------------------------------------------------------
781 %% Params getters
782
783 -spec db_message_format(HostType :: host_type()) -> module().
784 db_message_format(HostType) ->
785
:-(
gen_mod:get_module_opt(HostType, ?MODULE, db_message_format).
786
787 -spec pool_name(HostType :: host_type()) -> default. %% returns mongoose_wpool:pool_name().
788 pool_name(_HostType) ->
789
:-(
default.
Line Hits Source