./ct_report/coverage/mod_mam_muc_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc RDBMS backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_muc_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10 -behaviour(gen_mod).
11
12 %% gen_mod handlers
13 -export([start/2, stop/1, hooks/1]).
14
15 %% MAM hook handlers
16 -export([archive_size/3,
17 archive_message/3,
18 lookup_messages/3,
19 remove_archive/3,
20 get_mam_muc_gdpr_data/3]).
21
22 %% mongoose_cassandra callbacks
23 -export([prepared_queries/0]).
24
25 %% ----------------------------------------------------------------------
26 %% Imports
27
28 %% UID
29 -import(mod_mam_utils,
30 [encode_compact_uuid/2]).
31
32 %% Other
33 -import(mod_mam_utils,
34 [maybe_min/2,
35 maybe_max/2,
36 apply_start_border/2,
37 apply_end_border/2]).
38
39 -include("jlib.hrl").
40 -include("mongoose_rsm.hrl").
41
42 -record(mam_muc_ca_filter, {
43 room_jid,
44 with_nick,
45 nick_name,
46 start_id,
47 end_id
48 }).
49
50 -record(mam_muc_message, {
51 id :: non_neg_integer(),
52 room_jid :: binary(),
53 from_jid :: binary() | undefined,
54 nick_name :: binary(),
55 with_nick :: binary(),
56 message :: binary() | undefined
57 }).
58
59 -callback encode(binary()) -> binary().
60 -callback decode(binary()) -> binary().
61
62 %% ----------------------------------------------------------------------
63 %% Types
64
65 -type filter() :: #mam_muc_ca_filter{}.
66 -type message_id() :: non_neg_integer().
67 -type host_type() :: mongooseim:host_type().
68
69 %% ----------------------------------------------------------------------
70 %% gen_mod callbacks
71 %% Starting and stopping functions for users' archives
72
73 -spec start(host_type(), gen_mod:module_opts()) -> ok.
74 start(_HostType, _Opts) ->
75 10 ok.
76
77 -spec stop(host_type()) -> ok.
78 stop(_HostType) ->
79 10 ok.
80
81 %% ----------------------------------------------------------------------
82 %% Add hooks for mod_mam_muc
83
84 -spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
85 hooks(HostType) ->
86 20 [{mam_muc_archive_message, HostType, fun ?MODULE:archive_message/3, #{}, 50},
87 {mam_muc_archive_size, HostType, fun ?MODULE:archive_size/3, #{}, 50},
88 {mam_muc_lookup_messages, HostType, fun ?MODULE:lookup_messages/3, #{}, 50},
89 {mam_muc_remove_archive, HostType, fun ?MODULE:remove_archive/3, #{}, 50},
90 {get_mam_muc_gdpr_data, HostType, fun ?MODULE:get_mam_muc_gdpr_data/3, #{}, 50}].
91
92 %% ----------------------------------------------------------------------
93 %% mongoose_cassandra_worker callbacks
94
95 prepared_queries() ->
96 [
97 {insert_offset_hint_query, insert_offset_hint_query_cql()},
98 {prev_offset_query, prev_offset_query_cql()},
99 {insert_query, insert_query_cql()},
100 {fetch_user_messages_query, fetch_user_messages_cql()},
101 {select_for_removal_query, select_for_removal_query_cql()},
102 {remove_archive_query, remove_archive_query_cql()},
103 {remove_archive_offsets_query, remove_archive_offsets_query_cql()}]
104 683 ++ extract_messages_queries()
105 ++ extract_messages_r_queries()
106 ++ calc_count_queries()
107 ++ list_message_ids_queries().
108
109 %%====================================================================
110 %% Internal functions
111 %%====================================================================
112
113 %% ----------------------------------------------------------------------
114 %% Internal functions and callbacks
115
116 -spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
117 Acc :: integer(),
118 Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
119 Extra :: gen_hook:extra().
120 archive_size(Size, #{room := RoomJID}, #{host_type := HostType}) when is_integer(Size) ->
121 50 PoolName = pool_name(HostType),
122 50 Borders = Start = End = WithNick = undefined,
123 50 Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick, undefined),
124 50 {ok, calc_count(PoolName, RoomJID, HostType, Filter)}.
125
126
127 %% ----------------------------------------------------------------------
128 %% INSERT MESSAGE
129
130 insert_query_cql() ->
131 683 "INSERT INTO mam_muc_message "
132 "(id, room_jid, from_jid, nick_name, with_nick, message) "
133 "VALUES (?, ?, ?, ?, ?, ?)".
134
135 -spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
136 Acc :: ok | {error, term()},
137 Params :: mod_mam:archive_message_params(),
138 Extra :: gen_hook:extra().
139 archive_message(_Result, Params, #{host_type := HostType}) ->
140 334 try
141 334 {ok, archive_message2(Params, HostType)}
142 catch _Type:Reason ->
143
:-(
{ok, {error, Reason}}
144 end.
145
146 archive_message2(#{message_id := MessID,
147 local_jid := LocJID = #jid{},
148 remote_jid := FromJID = #jid{},
149 source_jid := #jid{lresource = BNick},
150 packet := Packet}, HostType) ->
151 334 BLocJID = mod_mam_utils:bare_jid(LocJID),
152 334 BFromJID = mod_mam_utils:bare_jid(FromJID),
153 334 BPacket = packet_to_stored_binary(HostType, Packet),
154 334 Messages = [#mam_muc_message{
155 id = MessID,
156 room_jid = BLocJID,
157 from_jid = BWithFromJID,
158 nick_name = BNick,
159 message = BPacket,
160 with_nick = BWithNick
161 334 } || {BWithNick, BWithFromJID} <- [{<<>>, BFromJID}, {BNick, <<>>}]],
162 334 write_messages(HostType, Messages).
163
164 write_messages(HostType, Messages) ->
165 334 PoolName = pool_name(HostType),
166 334 RoomJID = undefined,
167 334 MultiParams = [message_to_params(M) || M <- Messages],
168 334 mongoose_cassandra:cql_write_async(PoolName, RoomJID, ?MODULE, insert_query, MultiParams).
169
170 message_to_params(#mam_muc_message{
171 id = MessID,
172 room_jid = BLocJID,
173 from_jid = BFromJID,
174 nick_name = BNick,
175 with_nick = BWithNick,
176 message = BPacket
177 }) ->
178 668 #{id => MessID, room_jid => BLocJID, from_jid => BFromJID,
179 nick_name => BNick, with_nick => BWithNick, message => BPacket}.
180
181
182 %% ----------------------------------------------------------------------
183 %% REMOVE ARCHIVE
184
185 remove_archive_query_cql() ->
186 683 "DELETE FROM mam_muc_message WHERE room_jid = ? AND with_nick = ?".
187
188 remove_archive_offsets_query_cql() ->
189 683 "DELETE FROM mam_muc_message_offset WHERE room_jid = ? AND with_nick = ?".
190
191 select_for_removal_query_cql() ->
192 683 "SELECT DISTINCT room_jid, with_nick FROM mam_muc_message WHERE room_jid = ?".
193
194 -spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
195 Acc :: term(),
196 Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
197 Extra :: gen_hook:extra().
198 remove_archive(Acc, #{room := RoomJID}, #{host_type := HostType}) ->
199 67 BRoomJID = mod_mam_utils:bare_jid(RoomJID),
200 67 PoolName = pool_name(HostType),
201 67 Params = #{room_jid => BRoomJID},
202 %% Wait until deleted
203
204 67 DeleteFun =
205 fun(Rows, _AccIn) ->
206 67 mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
207 remove_archive_query, Rows),
208 67 mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
209 remove_archive_offsets_query, Rows)
210 end,
211
212 67 mongoose_cassandra:cql_foldl(PoolName, RoomJID, ?MODULE,
213 select_for_removal_query, Params, DeleteFun, []),
214 67 {ok, Acc}.
215
216 %% ----------------------------------------------------------------------
217 %% SELECT MESSAGES
218
219 -spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
220 Acc :: {ok, mod_mam:lookup_result()},
221 Params :: mam_iq:lookup_params(),
222 Extra :: gen_hook:extra().
223 lookup_messages({error, _Reason} = Result, _Params, _Extra) ->
224
:-(
{ok, Result};
225 lookup_messages(_Result, #{search_text := <<_/binary>>}, _Extra) ->
226
:-(
{ok, {error, 'not-supported'}};
227 lookup_messages(_Result,
228 #{owner_jid := RoomJID, rsm := RSM, borders := Borders,
229 start_ts := Start, end_ts := End, with_jid := WithJID,
230 search_text := undefined, page_size := PageSize,
231 is_simple := IsSimple, message_id := MsgID},
232 #{host_type := HostType}) ->
233 63 try
234 63 WithNick = maybe_jid_to_nick(WithJID),
235 63 PoolName = pool_name(HostType),
236 63 {ok, lookup_messages2(PoolName, HostType,
237 RoomJID, RSM, Borders,
238 Start, End, WithNick,
239 PageSize, MsgID, IsSimple)}
240 catch _Type:Reason:Stacktrace ->
241
:-(
{ok, {error, {Reason, {stacktrace, Stacktrace}}}}
242 end.
243
244 2 maybe_jid_to_nick(#jid{lresource = BNick}) -> BNick;
245 61 maybe_jid_to_nick(undefined) -> undefined.
246
247
248 lookup_messages2(PoolName, HostType,
249 RoomJID = #jid{}, RSM, Borders,
250 Start, End, WithNick,
251 PageSize, MsgID, _IsSimple = true) ->
252 %% Simple query without calculating offset and total count
253 15 Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick, MsgID),
254 15 lookup_messages_simple(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
255 lookup_messages2(PoolName, HostType,
256 RoomJID = #jid{}, RSM, Borders,
257 Start, End, WithNick,
258 PageSize, MsgID, _IsSimple) ->
259 %% Query with offset calculation
260 %% We cannot just use RDBMS code because "LIMIT X, Y" is not supported by cassandra
261 %% Not all queries are optimal. You would like to disable something for production
262 %% once you know how you will call bd
263 48 Strategy = rsm_to_strategy(RSM),
264 48 Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick, MsgID),
265 48 case Strategy of
266 last_page ->
267 6 lookup_messages_last_page(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
268 by_offset ->
269 2 lookup_messages_by_offset(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
270 first_page ->
271 36 lookup_messages_first_page(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
272 before_id ->
273 2 lookup_messages_before_id(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
274 after_id ->
275 2 lookup_messages_after_id(PoolName, HostType, RoomJID, RSM, PageSize, Filter)
276 end.
277
278 rsm_to_strategy(#rsm_in{direction = before, id = undefined}) ->
279 6 last_page;
280 rsm_to_strategy(#rsm_in{direction = undefined, index = 0}) ->
281
:-(
first_page;
282 rsm_to_strategy(#rsm_in{direction = undefined, index = Offset}) when is_integer(Offset) ->
283 2 by_offset;
284 rsm_to_strategy(#rsm_in{direction = before, id = Id}) when is_integer(Id) ->
285 2 before_id;
286 rsm_to_strategy(#rsm_in{direction = aft, id = Id}) when is_integer(Id) ->
287 2 after_id;
288 rsm_to_strategy(#rsm_in{}) ->
289 5 first_page;
290 rsm_to_strategy(undefined) ->
291 31 first_page.
292
293 lookup_messages_simple(PoolName, HostType, RoomJID,
294 #rsm_in{direction = aft, id = ID},
295 PageSize, Filter) ->
296 %% Get last rows from result set
297 5 MessageRows = extract_messages(PoolName, RoomJID, HostType, after_id(ID, Filter), PageSize, false),
298 5 {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
299 lookup_messages_simple(PoolName, HostType, RoomJID,
300 #rsm_in{direction = before, id = ID},
301 PageSize, Filter) ->
302 7 MessageRows = extract_messages(PoolName, RoomJID, HostType, before_id(ID, Filter), PageSize, true),
303 7 {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
304 lookup_messages_simple(PoolName, HostType, RoomJID,
305 #rsm_in{direction = undefined, index = Offset},
306 PageSize, Filter) ->
307 %% Apply offset
308
:-(
StartId = offset_to_start_id(PoolName, RoomJID, Filter,
309 Offset), %% POTENTIALLY SLOW AND NOT SIMPLE :)
310
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter), PageSize,
311 false),
312
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
313 lookup_messages_simple(PoolName, HostType, RoomJID,
314 _,
315 PageSize, Filter) ->
316 3 MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
317 3 {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}}.
318
319 lookup_messages_last_page(PoolName, HostType, RoomJID,
320 #rsm_in{direction = before, id = undefined},
321 0, Filter) ->
322 %% Last page
323 1 TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
324 1 {ok, {TotalCount, TotalCount, []}};
325 lookup_messages_last_page(PoolName, HostType, RoomJID,
326 #rsm_in{direction = before, id = undefined},
327 PageSize, Filter) ->
328 %% Last page
329 5 MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, true),
330 5 MessageRowsCount = length(MessageRows),
331 5 case MessageRowsCount < PageSize of
332 true ->
333
:-(
{ok, {MessageRowsCount, 0,
334 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
335 false ->
336 5 FirstID = row_to_message_id(hd(MessageRows)),
337 5 Offset = calc_count(PoolName, RoomJID, HostType, before_id(FirstID, Filter)),
338 5 {ok, {Offset + MessageRowsCount, Offset,
339 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
340 end.
341
342 lookup_messages_by_offset(PoolName, HostType, RoomJID,
343 #rsm_in{direction = undefined, index = Offset},
344 0, Filter) when is_integer(Offset) ->
345 %% By offset
346 1 TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
347 1 {ok, {TotalCount, Offset, []}};
348 lookup_messages_by_offset(PoolName, HostType, RoomJID,
349 #rsm_in{direction = undefined, index = Offset},
350 PageSize, Filter) when is_integer(Offset) ->
351 %% By offset
352 1 StartId = offset_to_start_id(PoolName, RoomJID, Filter, Offset), %% POTENTIALLY SLOW
353 1 MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter), PageSize,
354 false),
355 1 MessageRowsCount = length(MessageRows),
356 1 case MessageRowsCount < PageSize of
357 true ->
358
:-(
{ok, {Offset + MessageRowsCount, Offset,
359 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
360 false ->
361 1 LastID = row_to_message_id(lists:last(MessageRows)),
362 1 CountAfterLastID = calc_count(PoolName, RoomJID, HostType, after_id(LastID, Filter)),
363 1 {ok, {Offset + MessageRowsCount + CountAfterLastID, Offset,
364 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
365 end.
366
367 lookup_messages_first_page(PoolName, HostType, RoomJID,
368 _,
369 0, Filter) ->
370 %% First page, just count
371 2 TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
372 2 {ok, {TotalCount, 0, []}};
373 lookup_messages_first_page(PoolName, HostType, RoomJID,
374 _,
375 PageSize, Filter) ->
376 %% First page
377 34 MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
378 34 MessageRowsCount = length(MessageRows),
379 34 case MessageRowsCount < PageSize of
380 true ->
381 31 {ok, {MessageRowsCount, 0,
382 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
383 false ->
384 3 LastID = row_to_message_id(lists:last(MessageRows)),
385 3 CountAfterLastID = calc_count(PoolName, RoomJID, HostType, after_id(LastID, Filter)),
386 3 {ok, {MessageRowsCount + CountAfterLastID, 0,
387 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
388 end.
389
390 lookup_messages_before_id(PoolName, HostType, RoomJID,
391 RSM = #rsm_in{direction = before, id = ID},
392 PageSize, Filter) ->
393 2 TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
394 2 Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM),
395 2 MessageRows = extract_messages(PoolName, RoomJID, HostType, to_id(ID, Filter),
396 PageSize + 1, true),
397 2 Result = {TotalCount, Offset, rows_to_uniform_format(MessageRows, HostType, RoomJID)},
398 2 mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
399
400 lookup_messages_after_id(PoolName, HostType, RoomJID,
401 RSM = #rsm_in{direction = aft, id = ID},
402 PageSize, Filter) ->
403 2 PoolName = pool_name(HostType),
404 2 TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
405 2 Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM),
406 2 MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(ID, Filter),
407 PageSize + 1, false),
408 2 Result = {TotalCount, Offset, rows_to_uniform_format(MessageRows, HostType, RoomJID)},
409 2 mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
410
411
412 after_id(ID, Filter = #mam_muc_ca_filter{start_id = AfterID}) ->
413 9 Filter#mam_muc_ca_filter{start_id = maybe_max(ID + 1, AfterID)}.
414
415 before_id(undefined, Filter) ->
416 2 Filter;
417 before_id(ID, Filter = #mam_muc_ca_filter{end_id = BeforeID}) ->
418 12 Filter#mam_muc_ca_filter{end_id = maybe_min(ID - 1, BeforeID)}.
419
420 to_id(ID, Filter = #mam_muc_ca_filter{end_id = BeforeID}) ->
421 4 Filter#mam_muc_ca_filter{end_id = maybe_min(ID, BeforeID)}.
422
423 from_id(ID, Filter = #mam_muc_ca_filter{start_id = AfterID}) ->
424 3 Filter#mam_muc_ca_filter{start_id = maybe_max(ID, AfterID)}.
425
426 rows_to_uniform_format(MessageRows, HostType, RoomJID) ->
427 59 [row_to_uniform_format(Row, HostType, RoomJID) || Row <- MessageRows].
428
429 row_to_uniform_format(#{nick_name := BNick, message := Data, id := MessID},
430 HostType, RoomJID) ->
431 243 SrcJID = jid:replace_resource(RoomJID, BNick),
432 243 Packet = stored_binary_to_packet(HostType, Data),
433 243 #{id => MessID, jid => SrcJID, packet => Packet}.
434
435 row_to_message_id(#{id := MsgID}) ->
436 9 MsgID.
437
438 -spec get_mam_muc_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when
439 Acc :: ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
440 Params :: #{jid := jid:jid()},
441 Extra :: gen_hook:extra().
442 get_mam_muc_gdpr_data(Acc, #{jid := Jid}, #{host_type := HostType}) ->
443 17 BinJid = jid:to_binary(jid:to_lower(Jid)),
444 17 PoolName = pool_name(HostType),
445 17 FilterMap = #{from_jid => BinJid},
446 17 Rows = fetch_user_messages(PoolName, Jid, FilterMap),
447 17 Messages = [{Id, exml:to_binary(stored_binary_to_packet(HostType, Data))}
448 17 || #{message := Data, id:= Id} <- Rows],
449 17 {ok, Messages ++ Acc}.
450
451 %% Offset is not supported
452 %% Each record is a tuple of form
453 %% `{<<"13663125233">>, <<"bob@localhost">>, <<"res1">>, <<binary>>}'.
454 %% Columns are `["id", "nick_name", "message"]'.
455 -spec extract_messages(PoolName, RoomJID, HostType, Filter, IMax, ReverseLimit) ->
456 [Row] when
457 PoolName :: mongoose_cassandra:pool_name(),
458 RoomJID :: jid:jid(),
459 HostType :: host_type(),
460 Filter :: filter(),
461 IMax :: pos_integer(),
462 ReverseLimit :: boolean(),
463 Row :: mongoose_cassandra:row().
464 extract_messages(_Worker, _RoomJID, _HostType, _Filter, 0, _) ->
465
:-(
[];
466 extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, false) ->
467 45 QueryName = {extract_messages_query, select_filter(Filter)},
468 45 Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
469 45 {ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
470 45 Rows;
471 extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, true) ->
472 14 QueryName = {extract_messages_r_query, select_filter(Filter)},
473 14 Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
474 14 {ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
475 14 lists:reverse(Rows).
476
477 fetch_user_messages(PoolName, UserJID, FilterMap) ->
478 17 QueryName = fetch_user_messages_query,
479 17 {ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, FilterMap),
480 17 lists:sort(Rows).
481
482
483 %% @doc Calculate a zero-based index of the row with UID in the result test.
484 %%
485 %% If the element does not exists, the ID of the next element will
486 %% be returned instead.
487 %% @end
488 -spec calc_index(PoolName, RoomJID, HostType, Filter, MessID) -> Count
489 when
490 PoolName :: mongoose_cassandra:pool_name(),
491 RoomJID :: jid:jid(),
492 HostType :: host_type(),
493 Filter :: filter(),
494 MessID :: message_id(),
495 Count :: non_neg_integer().
496 calc_index(PoolName, RoomJID, HostType, Filter, MessID) ->
497 2 calc_count(PoolName, RoomJID, HostType, to_id(MessID, Filter)).
498
499 %% @doc Count of elements in RSet before the passed element.
500 %%
501 %% The element with the passed UID can be already deleted.
502 %% @end
503 -spec calc_before(PoolName, RoomJID, HostType, Filter, MessID) -> Count
504 when
505 PoolName :: mongoose_cassandra:pool_name(),
506 RoomJID :: jid:jid(),
507 HostType :: host_type(),
508 Filter :: filter(),
509 MessID :: message_id(),
510 Count :: non_neg_integer().
511 calc_before(PoolName, RoomJID, HostType, Filter, MessID) ->
512 2 calc_count(PoolName, RoomJID, HostType, before_id(MessID, Filter)).
513
514
515 %% @doc Get the total result set size.
516 %% "SELECT COUNT(*) as "count" FROM mam_muc_message WHERE "
517 -spec calc_count(PoolName, RoomJID, HostType, Filter) -> Count
518 when
519 PoolName :: mongoose_cassandra:pool_name(),
520 RoomJID :: jid:jid(),
521 HostType :: host_type(),
522 Filter :: filter(),
523 Count :: non_neg_integer().
524 calc_count(PoolName, RoomJID, _HostType, Filter) ->
525 71 QueryName = {calc_count_query, select_filter(Filter)},
526 71 Params = eval_filter_params(Filter),
527 71 {ok, [#{count := Count}]} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName,
528 Params),
529 71 Count.
530
531 %% @doc Convert offset to index of the first entry
532 %% Returns undefined if not there are not enough rows
533 %% Uses previously calculated offsets to speed up queries
534 -spec offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id when
535 PoolName :: mongoose_cassandra:pool_name(),
536 RoomJID :: jid:jid(),
537 Offset :: non_neg_integer(),
538 Filter :: filter(),
539 Id :: non_neg_integer() | undefined.
540 offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0,
541 Offset =< 100 ->
542 1 calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset);
543 offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
544
:-(
Params = maps:put(offset, Offset, eval_filter_params(Filter)),
545 %% Try to find already calculated nearby offset to reduce query size
546
:-(
case mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, prev_offset_query, Params) of
547 {ok, []} -> %% No hints, just calculate offset sloooowly
548
:-(
StartId = calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset),
549
:-(
maybe_save_offset_hint(PoolName, RoomJID, Filter, 0, Offset, StartId);
550 {ok, [#{offset := PrevOffset, id := PrevId}]} ->
551 %% Offset hint found, use it to reduce query size
552
:-(
case Offset of
553
:-(
PrevOffset -> PrevId;
554 _ ->
555
:-(
StartId = calc_offset_to_start_id(PoolName, RoomJID,
556 Filter#mam_muc_ca_filter{start_id = PrevId},
557 Offset - PrevOffset + 1),
558
:-(
maybe_save_offset_hint(PoolName, RoomJID, Filter, PrevOffset, Offset, StartId)
559 end
560 end.
561
562 %% @doc Saves offset hint for future use in order to speed up queries with similar offset
563 %% Hint is save only if previous offset hint was 50+ entires from current query
564 %% This function returns given StartId as passthrough for convenience
565 -spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), RoomJID :: jid:jid(),
566 Filter :: filter(), HintOffset :: non_neg_integer(),
567 NewOffset :: non_neg_integer(),
568 StartId :: non_neg_integer() | undefined) ->
569 StartId :: non_neg_integer() | undefined.
570 maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset,
571 StartId = undefined) ->
572
:-(
StartId;
573 maybe_save_offset_hint(PoolName, RoomJID, Filter, HintOffset, NewOffset, StartId) ->
574
:-(
case abs(NewOffset - HintOffset) > 50 of
575 true ->
576
:-(
#mam_muc_ca_filter{room_jid = FRoomJID, with_nick = FWithNick} = Filter,
577
:-(
Row = #{room_jid => FRoomJID, with_nick => FWithNick,
578 offset => NewOffset, id => StartId},
579
:-(
mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
580 insert_offset_hint_query, [Row]);
581 false ->
582
:-(
skip
583 end,
584
:-(
StartId.
585
586 %% @doc Convert offset to index of the first entry
587 %% Returns undefined if not there are not enough rows
588 -spec calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id
589 when
590 PoolName :: mongoose_cassandra:pool_name(),
591 RoomJID :: jid:jid(),
592 Offset :: non_neg_integer(),
593 Filter :: filter(),
594 Id :: non_neg_integer() | undefined.
595 calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
596 1 QueryName = {list_message_ids_query, select_filter(Filter)},
597 1 Params = maps:put('[limit]', Offset + 1, eval_filter_params(Filter)),
598 1 {ok, RowsIds} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
599 1 case RowsIds of
600
:-(
[] -> undefined;
601 [_ | _] ->
602 1 maps:get(id, lists:last(RowsIds))
603 end.
604
605 %% @doc Get closest offset -> message id 'hint' for specified offset
606 prev_offset_query_cql() ->
607 683 "SELECT id, offset FROM mam_muc_message_offset WHERE room_jid = ? and with_nick = ?"
608 " and offset <= ? LIMIT 1".
609
610 %% @doc Insert offset -> message id 'hint'
611 insert_offset_hint_query_cql() ->
612 683 "INSERT INTO mam_muc_message_offset(room_jid, with_nick, id, offset) VALUES(?, ?, ?, ?)".
613
614 prepare_filter(RoomJID, Borders, Start, End, WithNick, MsgID) ->
615 113 BRoomJID = mod_mam_utils:bare_jid(RoomJID),
616 113 StartID = maybe_encode_compact_uuid(Start, 0),
617 113 EndID = maybe_encode_compact_uuid(End, 255),
618 %% In Cassandra, a column cannot be restricted by both an equality and an inequality relation.
619 %% When MsgID is defined, it is used as both StartID2 and EndID2 to comply with this limitation.
620 %% This means that the `ids` filter effectively overrides any "before" or "after" filters.
621 113 {StartID2, EndID2} = case MsgID of
622 undefined ->
623 106 {apply_start_border(Borders, StartID),
624 apply_end_border(Borders, EndID)};
625 ID ->
626 7 {ID, ID}
627 end,
628 113 BWithNick = maybe_nick(WithNick),
629 113 prepare_filter_params(BRoomJID, BWithNick, StartID2, EndID2).
630
631 prepare_filter_params(BRoomJID, BWithNick, StartID, EndID) ->
632 113 #mam_muc_ca_filter{
633 room_jid = BRoomJID,
634 with_nick = BWithNick,
635 start_id = StartID,
636 end_id = EndID
637 }.
638
639 eval_filter_params(#mam_muc_ca_filter{
640 room_jid = BRoomJID,
641 with_nick = BWithNick,
642 start_id = StartID,
643 end_id = EndID
644 }) ->
645 131 Optional = maps:filter(fun(_K, V) -> V =/= undefined end,
646 #{start_id => StartID, end_id =>EndID}),
647 131 maps:merge(#{room_jid => BRoomJID, with_nick => BWithNick}, Optional).
648
649 select_filter(#mam_muc_ca_filter{
650 start_id = StartID,
651 end_id = EndID
652 }) ->
653 131 select_filter(StartID, EndID).
654
655
656 -spec select_filter(StartID, EndID) ->
657 all | 'end' | start | start_end when
658 StartID :: integer() | undefined,
659 EndID :: integer() | undefined.
660 select_filter(undefined, undefined) ->
661 2825 all;
662 select_filter(undefined, _) ->
663 2744 'end';
664 select_filter(_, undefined) ->
665 2745 start;
666 select_filter(_, _) ->
667 2745 start_end.
668
669 prepare_filter_cql(StartID, EndID) ->
670 case StartID of
671 5464 undefined -> "";
672 5464 _ -> " AND id >= :start_id"
673 10928 end ++
674 case EndID of
675 5464 undefined -> "";
676 5464 _ -> " AND id <= :end_id"
677 end.
678
679 filter_to_cql() ->
680 2732 [{select_filter(StartID, EndID),
681 prepare_filter_cql(StartID, EndID)}
682 2732 || StartID <- [undefined, 0],
683 5464 EndID <- [undefined, 0]].
684
685 -spec calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
686 when
687 PoolName :: mongoose_cassandra:pool_name(),
688 RoomJID :: jid:jid(),
689 HostType :: host_type(),
690 Filter :: filter(),
691 PageSize :: non_neg_integer(),
692 TotalCount :: non_neg_integer(),
693 RSM :: jlib:rsm_in() | undefined,
694 Offset :: non_neg_integer().
695 %% Requesting the Last Page in a Result Set
696 calc_offset(_W, _RoomJID, _LS, _F, PS, TC, #rsm_in{direction = before, id = undefined}) ->
697
:-(
max(0, TC - PS);
698 calc_offset(PoolName, RoomJID, HostType, F, PS, _TC, #rsm_in{direction = before, id = ID})
699 when is_integer(ID) ->
700 2 max(0, calc_before(PoolName, RoomJID, HostType, F, ID) - PS);
701 calc_offset(PoolName, RoomJID, HostType, F, _PS, _TC, #rsm_in{direction = aft, id = ID})
702 when is_integer(ID) ->
703 2 calc_index(PoolName, RoomJID, HostType, F, ID);
704 calc_offset(_W, _RoomJID, _LS, _F, _PS, _TC, _RSM) ->
705
:-(
0.
706
707 maybe_encode_compact_uuid(undefined, _) ->
708 226 undefined;
709 maybe_encode_compact_uuid(Microseconds, NodeID) ->
710
:-(
encode_compact_uuid(Microseconds, NodeID).
711
712 maybe_nick(undefined) ->
713 111 <<>>;
714 maybe_nick(WithNick) when is_binary(WithNick) ->
715 2 WithNick.
716
717 %%====================================================================
718 %% Internal SQL part
719 %%====================================================================
720
721 extract_messages_queries() ->
722 683 [{{extract_messages_query, FilterName}, extract_messages_cql(Filter)}
723 683 || {FilterName, Filter} <- filter_to_cql()].
724
725 extract_messages_r_queries() ->
726 683 [{{extract_messages_r_query, FilterName}, extract_messages_r_cql(Filter)}
727 683 || {FilterName, Filter} <- filter_to_cql()].
728
729 calc_count_queries() ->
730 683 [{{calc_count_query, FilterName}, calc_count_cql(Filter)}
731 683 || {FilterName, Filter} <- filter_to_cql()].
732
733 list_message_ids_queries() ->
734 683 [{{list_message_ids_query, FilterName}, list_message_ids_cql(Filter)}
735 683 || {FilterName, Filter} <- filter_to_cql()].
736
737 extract_messages_cql(Filter) ->
738 "SELECT id, nick_name, message FROM mam_muc_message "
739 2732 "WHERE room_jid = ? AND with_nick = ? " ++
740 Filter ++ " ORDER BY id LIMIT ?".
741
742 extract_messages_r_cql(Filter) ->
743 "SELECT id, nick_name, message FROM mam_muc_message "
744 2732 "WHERE room_jid = ? AND with_nick = ? " ++
745 Filter ++ " ORDER BY id DESC LIMIT ?".
746
747 fetch_user_messages_cql() ->
748 %% attempt to order results in the next error:
749 %% "ORDER BY with 2ndary indexes is not supported."
750 683 "SELECT id, message FROM mam_muc_message "
751 "WHERE from_jid = ?".
752
753 calc_count_cql(Filter) ->
754 "SELECT COUNT(*) FROM mam_muc_message "
755 2732 "WHERE room_jid = ? AND with_nick = ? " ++ Filter.
756
757 list_message_ids_cql(Filter) ->
758 "SELECT id FROM mam_muc_message "
759 2732 "WHERE room_jid = ? AND with_nick = ? " ++ Filter ++
760 " ORDER BY id LIMIT ?".
761
762 %% ----------------------------------------------------------------------
763 %% Optimizations
764
765 packet_to_stored_binary(HostType, Packet) ->
766 %% Module implementing mam_muc_message behaviour
767 334 Module = db_message_format(HostType),
768 334 mam_message:encode(Module, Packet).
769
770 stored_binary_to_packet(HostType, Bin) ->
771 %% Module implementing mam_muc_message behaviour
772 256 Module = db_message_format(HostType),
773 256 mam_message:decode(Module, Bin).
774
775 %% ----------------------------------------------------------------------
776 %% Params getters
777
778 -spec db_message_format(HostType :: host_type()) -> module().
779 db_message_format(HostType) ->
780 590 gen_mod:get_module_opt(HostType, ?MODULE, db_message_format).
781
782 -spec pool_name(HostType :: host_type()) -> default. %% returns mongoose_wpool:pool_name().
783 pool_name(_HostType) ->
784 533 default.
Line Hits Source