./ct_report/coverage/mod_mam_muc_cassandra_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc Cassandra backend for MUC Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_muc_cassandra_arch).
8 -behaviour(mongoose_cassandra).
9 -behaviour(ejabberd_gen_mam_archive).
10 -behaviour(gen_mod).
11
12 %% gen_mod handlers
13 -export([start/2, stop/1, hooks/1]).
14
15 %% MAM hook handlers
16 -export([archive_size/3,
17 archive_message/3,
18 lookup_messages/3,
19 remove_archive/3,
20 get_mam_muc_gdpr_data/3]).
21
22 %% mongoose_cassandra callbacks
23 -export([prepared_queries/0]).
24
25 %% ----------------------------------------------------------------------
26 %% Imports
27
28 %% UID
29 -import(mod_mam_utils,
30 [encode_compact_uuid/2]).
31
32 %% Other
33 -import(mod_mam_utils,
34 [maybe_min/2,
35 maybe_max/2,
36 apply_start_border/2,
37 apply_end_border/2]).
38
39 -include("jlib.hrl").
40 -include("mongoose_rsm.hrl").
41
%% Filter describing which archived rows a query should touch.
%% room_jid and with_nick are always sent as query parameters; start_id and
%% end_id are optional inclusive id bounds, where undefined means "no bound"
%% (see eval_filter_params/1 and prepare_filter_cql/2).
%% NOTE(review): nick_name does not appear to be set or read anywhere in this
%% listing -- confirm whether it is still needed.
42 -record(mam_muc_ca_filter, {
43 room_jid,
44 with_nick,
45 nick_name,
46 start_id,
47 end_id
48 }).
49
%% One row to be written to the mam_muc_message table. The fields correspond
%% one-to-one to the columns named in insert_query_cql/0 and are turned into a
%% parameter map by message_to_params/1.
50 -record(mam_muc_message, {
51 id :: non_neg_integer(),
52 room_jid :: binary(),
53 from_jid :: binary() | undefined,
54 nick_name :: binary(),
55 with_nick :: binary(),
56 message :: binary() | undefined
57 }).
58
59 -callback encode(binary()) -> binary().
60 -callback decode(binary()) -> binary().
61
62 %% ----------------------------------------------------------------------
63 %% Types
64
65 -type filter() :: #mam_muc_ca_filter{}.
66 -type message_id() :: non_neg_integer().
67 -type host_type() :: mongooseim:host_type().
68
69 %% ----------------------------------------------------------------------
70 %% gen_mod callbacks
71 %% Starting and stopping functions for users' archives
72
%% gen_mod start callback. Performs no work: both arguments are ignored and
%% ok is returned.
73 -spec start(host_type(), gen_mod:module_opts()) -> ok.
74 start(_HostType, _Opts) ->
75
:-(
ok.
76
%% gen_mod stop callback. Performs no work: the argument is ignored and ok is
%% returned.
77 -spec stop(host_type()) -> ok.
78 stop(_HostType) ->
79
:-(
ok.
80
81 %% ----------------------------------------------------------------------
82 %% Add hooks for mod_mam_muc
83
%% Builds the hook list for a host type: one entry for each exported handler
%% (archive_message/3, archive_size/3, lookup_messages/3, remove_archive/3 and
%% get_mam_muc_gdpr_data/3).
%% Every entry ends with an empty map and the value 50 -- presumably the hook
%% extra data and the hook priority; confirm against gen_hook.
84 -spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
85 hooks(HostType) ->
86
:-(
[{mam_muc_archive_message, HostType, fun ?MODULE:archive_message/3, #{}, 50},
87 {mam_muc_archive_size, HostType, fun ?MODULE:archive_size/3, #{}, 50},
88 {mam_muc_lookup_messages, HostType, fun ?MODULE:lookup_messages/3, #{}, 50},
89 {mam_muc_remove_archive, HostType, fun ?MODULE:remove_archive/3, #{}, 50},
90 {get_mam_muc_gdpr_data, HostType, fun ?MODULE:get_mam_muc_gdpr_data/3, #{}, 50}].
91
92 %% ----------------------------------------------------------------------
93 %% mongoose_cassandra callbacks
94
%% Returns the list of {QueryName, CQL} pairs used by this module.
%% The first seven entries have fixed atom names. The appended lists are keyed
%% by {BaseName, FilterName} tuples and contain one variant per filter shape
%% produced by filter_to_cql/0 (all, 'end', start, start_end).
95 prepared_queries() ->
96 [
97 {insert_offset_hint_query, insert_offset_hint_query_cql()},
98 {prev_offset_query, prev_offset_query_cql()},
99 {insert_query, insert_query_cql()},
100 {fetch_user_messages_query, fetch_user_messages_cql()},
101 {select_for_removal_query, select_for_removal_query_cql()},
102 {remove_archive_query, remove_archive_query_cql()},
103 {remove_archive_offsets_query, remove_archive_offsets_query_cql()}]
104
:-(
++ extract_messages_queries()
105 ++ extract_messages_r_queries()
106 ++ calc_count_queries()
107 ++ list_message_ids_queries().
108
109 %%====================================================================
110 %% Internal functions
111 %%====================================================================
112
113 %% ----------------------------------------------------------------------
114 %% Internal functions and callbacks
115
%% Hook handler returning the number of archived messages for a room.
%% The incoming Size accumulator is only checked to be an integer; its value is
%% discarded and replaced by the result of calc_count/4.
%% No borders, time range or nick are applied, so the filter has no id bounds
%% and with_nick = <<>> (see maybe_nick/1).
116 -spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
117 Acc :: integer(),
118 Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
119 Extra :: gen_hook:extra().
120 archive_size(Size, #{room := RoomJID}, #{host_type := HostType}) when is_integer(Size) ->
121
:-(
PoolName = pool_name(HostType),
122
:-(
Borders = Start = End = WithNick = undefined,
123
:-(
Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick),
124
:-(
{ok, calc_count(PoolName, RoomJID, HostType, Filter)}.
125
126
127 %% ----------------------------------------------------------------------
128 %% INSERT MESSAGE
129
%% CQL text of the prepared insert_query: inserts one row with six columns
%% into mam_muc_message. The column names match the keys produced by
%% message_to_params/1.
130 insert_query_cql() ->
131
:-(
"INSERT INTO mam_muc_message "
132 "(id, room_jid, from_jid, nick_name, with_nick, message) "
133 "VALUES (?, ?, ?, ?, ?, ?)".
134
%% Hook handler that stores one MUC message.
%% The previous accumulator is ignored. On success the result of
%% archive_message2/2 is wrapped in {ok, _}. Any exception raised while
%% archiving is converted into {ok, {error, Reason}}; the exception class and
%% the stacktrace are dropped.
135 -spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
136 Acc :: ok | {error, term()},
137 Params :: mod_mam:archive_message_params(),
138 Extra :: gen_hook:extra().
139 archive_message(_Result, Params, #{host_type := HostType}) ->
140
:-(
try
141
:-(
{ok, archive_message2(Params, HostType)}
142 catch _Type:Reason ->
143
:-(
{ok, {error, Reason}}
144 end.
145
%% Builds and writes the rows for one message.
%% Two rows are produced per message, both sharing id, room_jid, nick_name and
%% the encoded packet:
%%   * with_nick = <<>>  and from_jid = mod_mam_utils:bare_jid(FromJID)
%%   * with_nick = BNick and from_jid = <<>>
%% BNick is the lresource of source_jid. The call fails with function_clause
%% if Params lacks any of the matched keys or the JIDs are not #jid{} records.
146 archive_message2(#{message_id := MessID,
147 local_jid := LocJID = #jid{},
148 remote_jid := FromJID = #jid{},
149 source_jid := #jid{lresource = BNick},
150 packet := Packet}, HostType) ->
151
:-(
BLocJID = mod_mam_utils:bare_jid(LocJID),
152
:-(
BFromJID = mod_mam_utils:bare_jid(FromJID),
153
:-(
BPacket = packet_to_stored_binary(HostType, Packet),
154
:-(
Messages = [#mam_muc_message{
155 id = MessID,
156 room_jid = BLocJID,
157 from_jid = BWithFromJID,
158 nick_name = BNick,
159 message = BPacket,
160 with_nick = BWithNick
161
:-(
} || {BWithNick, BWithFromJID} <- [{<<>>, BFromJID}, {BNick, <<>>}]],
162
:-(
write_messages(HostType, Messages).
163
%% Converts each #mam_muc_message{} to a parameter map and submits all of them
%% in a single mongoose_cassandra:cql_write_async/5 call using insert_query.
%% NOTE(review): undefined is passed in the argument position where every other
%% call in this module passes the room (or user) JID -- confirm this is intended.
164 write_messages(HostType, Messages) ->
165
:-(
PoolName = pool_name(HostType),
166
:-(
RoomJID = undefined,
167
:-(
MultiParams = [message_to_params(M) || M <- Messages],
168
:-(
mongoose_cassandra:cql_write_async(PoolName, RoomJID, ?MODULE, insert_query, MultiParams).
169
%% Converts a #mam_muc_message{} record into a parameter map whose keys are the
%% column names used by insert_query_cql/0.
170 message_to_params(#mam_muc_message{
171 id = MessID,
172 room_jid = BLocJID,
173 from_jid = BFromJID,
174 nick_name = BNick,
175 with_nick = BWithNick,
176 message = BPacket
177 }) ->
178
:-(
#{id => MessID, room_jid => BLocJID, from_jid => BFromJID,
179 nick_name => BNick, with_nick => BWithNick, message => BPacket}.
180
181
182 %% ----------------------------------------------------------------------
183 %% REMOVE ARCHIVE
184
%% CQL text of remove_archive_query: deletes all mam_muc_message rows for one
%% (room_jid, with_nick) pair.
185 remove_archive_query_cql() ->
186
:-(
"DELETE FROM mam_muc_message WHERE room_jid = ? AND with_nick = ?".
187
%% CQL text of remove_archive_offsets_query: deletes the offset hints stored in
%% mam_muc_message_offset for one (room_jid, with_nick) pair.
188 remove_archive_offsets_query_cql() ->
189
:-(
"DELETE FROM mam_muc_message_offset WHERE room_jid = ? AND with_nick = ?".
190
%% CQL text of select_for_removal_query: lists the distinct
%% (room_jid, with_nick) pairs stored for a room. The rows it returns are fed
%% to the two DELETE queries by remove_archive/3.
191 select_for_removal_query_cql() ->
192
:-(
"SELECT DISTINCT room_jid, with_nick FROM mam_muc_message WHERE room_jid = ?".
193
%% Hook handler deleting the whole archive of a room.
%% Folds over the result of select_for_removal_query for the room's bare JID;
%% for each Rows value handed to DeleteFun it issues remove_archive_query and
%% then remove_archive_offsets_query with those rows as parameters.
%% The results of the writes and of the fold are not inspected, and Acc is
%% returned unchanged. The writes use cql_write/5 rather than the
%% cql_write_async/5 used for inserts.
194 -spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
195 Acc :: term(),
196 Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
197 Extra :: gen_hook:extra().
198 remove_archive(Acc, #{room := RoomJID}, #{host_type := HostType}) ->
199
:-(
BRoomJID = mod_mam_utils:bare_jid(RoomJID),
200
:-(
PoolName = pool_name(HostType),
201
:-(
Params = #{room_jid => BRoomJID},
202 %% Wait until deleted
203
204
:-(
DeleteFun =
205 fun(Rows, _AccIn) ->
206
:-(
mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
207 remove_archive_query, Rows),
208
:-(
mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
209 remove_archive_offsets_query, Rows)
210 end,
211
212
:-(
mongoose_cassandra:cql_foldl(PoolName, RoomJID, ?MODULE,
213 select_for_removal_query, Params, DeleteFun, []),
214
:-(
{ok, Acc}.
215
216 %% ----------------------------------------------------------------------
217 %% SELECT MESSAGES
218
%% Hook handler for archive lookups.
%%   Clause 1: an earlier {error, _} accumulator is passed through untouched.
%%   Clause 2: any binary search_text yields {error, 'not-supported'}.
%%   Clause 3: with search_text = undefined the lookup is delegated to
%%             lookup_messages2/10; with_jid is reduced to its lresource (the
%%             nick). Any exception is converted into
%%             {ok, {error, {Reason, {stacktrace, Stacktrace}}}}.
219 -spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
220 Acc :: {ok, mod_mam:lookup_result()},
221 Params :: mam_iq:lookup_params(),
222 Extra :: gen_hook:extra().
223 lookup_messages({error, _Reason} = Result, _Params, _Extra) ->
224
:-(
{ok, Result};
225 lookup_messages(_Result, #{search_text := <<_/binary>>}, _Extra) ->
226
:-(
{ok, {error, 'not-supported'}};
227 lookup_messages(_Result,
228 #{owner_jid := RoomJID, rsm := RSM, borders := Borders,
229 start_ts := Start, end_ts := End, with_jid := WithJID,
230 search_text := undefined, page_size := PageSize,
231 is_simple := IsSimple},
232 #{host_type := HostType}) ->
233
:-(
try
234
:-(
WithNick = maybe_jid_to_nick(WithJID),
235
:-(
PoolName = pool_name(HostType),
236
:-(
{ok, lookup_messages2(PoolName, HostType,
237 RoomJID, RSM, Borders,
238 Start, End, WithNick,
239 PageSize, IsSimple)}
240 catch _Type:Reason:Stacktrace ->
241
:-(
{ok, {error, {Reason, {stacktrace, Stacktrace}}}}
242 end.
243
%% Returns the lresource of a #jid{} (used as the room nick), or undefined
%% when no JID was given.
244
:-(
maybe_jid_to_nick(#jid{lresource = BNick}) -> BNick;
245
:-(
maybe_jid_to_nick(undefined) -> undefined.
246
247
%% Dispatches a lookup to the right paging implementation.
%% With IsSimple = true, lookup_messages_simple/6 is used and no total count or
%% offset is calculated. Otherwise the RSM is classified by rsm_to_strategy/1
%% and the matching lookup_messages_* function is called.
%% Both clauses require RoomJID to be a #jid{} record.
248 lookup_messages2(PoolName, HostType,
249 RoomJID = #jid{}, RSM, Borders,
250 Start, End, WithNick,
251 PageSize, _IsSimple = true) ->
252 %% Simple query without calculating offset and total count
253
:-(
Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick),
254
:-(
lookup_messages_simple(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
255 lookup_messages2(PoolName, HostType,
256 RoomJID = #jid{}, RSM, Borders,
257 Start, End, WithNick,
258 PageSize, _IsSimple) ->
259 %% Query with offset calculation
260 %% We cannot just use RDBMS code because "LIMIT X, Y" is not supported by Cassandra.
261 %% Not all of these queries are efficient. You may want to disable some of them
262 %% in production once you know how you will query the database.
263
:-(
Strategy = rsm_to_strategy(RSM),
264
:-(
Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick),
265
:-(
case Strategy of
266 last_page ->
267
:-(
lookup_messages_last_page(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
268 by_offset ->
269
:-(
lookup_messages_by_offset(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
270 first_page ->
271
:-(
lookup_messages_first_page(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
272 before_id ->
273
:-(
lookup_messages_before_id(PoolName, HostType, RoomJID, RSM, PageSize, Filter);
274 after_id ->
275
:-(
lookup_messages_after_id(PoolName, HostType, RoomJID, RSM, PageSize, Filter)
276 end.
277
%% Classifies an RSM request into a paging strategy. Clause order matters:
%%   direction = before, id = undefined        -> last_page
%%   direction = undefined, index = 0          -> first_page
%%   direction = undefined, index is integer   -> by_offset
%%   direction = before, id is integer         -> before_id
%%   direction = aft, id is integer            -> after_id
%%   any other #rsm_in{}, or undefined         -> first_page
278 rsm_to_strategy(#rsm_in{direction = before, id = undefined}) ->
279
:-(
last_page;
280 rsm_to_strategy(#rsm_in{direction = undefined, index = 0}) ->
281
:-(
first_page;
282 rsm_to_strategy(#rsm_in{direction = undefined, index = Offset}) when is_integer(Offset) ->
283
:-(
by_offset;
284 rsm_to_strategy(#rsm_in{direction = before, id = Id}) when is_integer(Id) ->
285
:-(
before_id;
286 rsm_to_strategy(#rsm_in{direction = aft, id = Id}) when is_integer(Id) ->
287
:-(
after_id;
288 rsm_to_strategy(#rsm_in{}) ->
289
:-(
first_page;
290 rsm_to_strategy(undefined) ->
291
:-(
first_page.
292
%% "Simple" lookup: fetches at most PageSize rows and returns
%% {ok, {undefined, undefined, Messages}}, i.e. neither the total count nor the
%% offset is calculated. The clause is selected by the RSM:
%%   aft + ID          -> rows after ID (after_id/2), forward query
%%   before + ID       -> rows before ID (before_id/2), reverse query
%%   index = Offset    -> rows starting at the id found for that offset
%%   anything else     -> the first rows matching Filter
%% NOTE(review): the third clause also matches #rsm_in{} with index = undefined
%% (and direction = undefined); offset_to_start_id/4 only accepts non-negative
%% integers, so such an RSM would raise function_clause -- confirm callers
%% never pass it in simple mode.
293 lookup_messages_simple(PoolName, HostType, RoomJID,
294 #rsm_in{direction = aft, id = ID},
295 PageSize, Filter) ->
296 %% Get the rows that follow ID in the result set
297
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, after_id(ID, Filter), PageSize, false),
298
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
299 lookup_messages_simple(PoolName, HostType, RoomJID,
300 #rsm_in{direction = before, id = ID},
301 PageSize, Filter) ->
302
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, before_id(ID, Filter), PageSize, true),
303
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
304 lookup_messages_simple(PoolName, HostType, RoomJID,
305 #rsm_in{direction = undefined, index = Offset},
306 PageSize, Filter) ->
307 %% Apply offset
308
:-(
StartId = offset_to_start_id(PoolName, RoomJID, Filter,
309 Offset), %% POTENTIALLY SLOW AND NOT SIMPLE :)
310
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter), PageSize,
311 false),
312
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
313 lookup_messages_simple(PoolName, HostType, RoomJID,
314 _,
315 PageSize, Filter) ->
316
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
317
:-(
{ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}}.
318
%% Returns the last page of the result set as {ok, {TotalCount, Offset, Messages}}.
%% With PageSize = 0 only the total is counted and reported as both TotalCount
%% and Offset. Otherwise the last PageSize rows are fetched with the reverse
%% query:
%%   * fewer rows than PageSize: the page is the whole result set, so
%%     TotalCount is the row count and Offset is 0;
%%   * a full page: Offset is the number of rows before the first returned id
%%     (a second count query) and TotalCount = Offset + rows returned.
319 lookup_messages_last_page(PoolName, HostType, RoomJID,
320 #rsm_in{direction = before, id = undefined},
321 0, Filter) ->
322 %% Last page
323
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
324
:-(
{ok, {TotalCount, TotalCount, []}};
325 lookup_messages_last_page(PoolName, HostType, RoomJID,
326 #rsm_in{direction = before, id = undefined},
327 PageSize, Filter) ->
328 %% Last page
329
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, true),
330
:-(
MessageRowsCount = length(MessageRows),
331
:-(
case MessageRowsCount < PageSize of
332 true ->
333
:-(
{ok, {MessageRowsCount, 0,
334 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
335 false ->
336
:-(
FirstID = row_to_message_id(hd(MessageRows)),
337
:-(
Offset = calc_count(PoolName, RoomJID, HostType, before_id(FirstID, Filter)),
338
:-(
{ok, {Offset + MessageRowsCount, Offset,
339 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
340 end.
341
%% Returns the page starting at a zero-based Offset as
%% {ok, {TotalCount, Offset, Messages}}.
%% With PageSize = 0 only the total is counted. Otherwise the id at Offset is
%% resolved with offset_to_start_id/4 and up to PageSize rows are read from it:
%%   * fewer rows than PageSize: the end of the result set was reached, so
%%     TotalCount = Offset + rows returned;
%%   * a full page: the rows after the last returned id are counted and added.
342 lookup_messages_by_offset(PoolName, HostType, RoomJID,
343 #rsm_in{direction = undefined, index = Offset},
344 0, Filter) when is_integer(Offset) ->
345 %% By offset
346
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
347
:-(
{ok, {TotalCount, Offset, []}};
348 lookup_messages_by_offset(PoolName, HostType, RoomJID,
349 #rsm_in{direction = undefined, index = Offset},
350 PageSize, Filter) when is_integer(Offset) ->
351 %% By offset
352
:-(
StartId = offset_to_start_id(PoolName, RoomJID, Filter, Offset), %% POTENTIALLY SLOW
353
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter), PageSize,
354 false),
355
:-(
MessageRowsCount = length(MessageRows),
356
:-(
case MessageRowsCount < PageSize of
357 true ->
358
:-(
{ok, {Offset + MessageRowsCount, Offset,
359 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
360 false ->
361
:-(
LastID = row_to_message_id(lists:last(MessageRows)),
362
:-(
CountAfterLastID = calc_count(PoolName, RoomJID, HostType, after_id(LastID, Filter)),
363
:-(
{ok, {Offset + MessageRowsCount + CountAfterLastID, Offset,
364 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
365 end.
366
%% Returns the first page of the result set as {ok, {TotalCount, 0, Messages}}.
%% The RSM argument is ignored. With PageSize = 0 only the total is counted.
%% Otherwise up to PageSize rows are read:
%%   * fewer rows than PageSize: TotalCount is the row count;
%%   * a full page: the rows after the last returned id are counted and added.
367 lookup_messages_first_page(PoolName, HostType, RoomJID,
368 _,
369 0, Filter) ->
370 %% First page, just count
371
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
372
:-(
{ok, {TotalCount, 0, []}};
373 lookup_messages_first_page(PoolName, HostType, RoomJID,
374 _,
375 PageSize, Filter) ->
376 %% First page
377
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
378
:-(
MessageRowsCount = length(MessageRows),
379
:-(
case MessageRowsCount < PageSize of
380 true ->
381
:-(
{ok, {MessageRowsCount, 0,
382 rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
383 false ->
384
:-(
LastID = row_to_message_id(lists:last(MessageRows)),
385
:-(
CountAfterLastID = calc_count(PoolName, RoomJID, HostType, after_id(LastID, Filter)),
386
:-(
{ok, {MessageRowsCount + CountAfterLastID, 0,
387 rows_to_uniform_format(MessageRows, HostType, RoomJID)}}
388 end.
389
%% Returns the page that precedes message ID.
%% Counts the whole result set, derives the offset with calc_offset/7, then
%% reads PageSize + 1 rows with the reverse query, bounded above by ID via
%% to_id/2 (so the row for ID itself may be included). The
%% {TotalCount, Offset, Messages} tuple is handed to
%% mod_mam_utils:check_for_item_not_found/3, whose result is returned as is.
390 lookup_messages_before_id(PoolName, HostType, RoomJID,
391 RSM = #rsm_in{direction = before, id = ID},
392 PageSize, Filter) ->
393
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
394
:-(
Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM),
395
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, to_id(ID, Filter),
396 PageSize + 1, true),
397
:-(
Result = {TotalCount, Offset, rows_to_uniform_format(MessageRows, HostType, RoomJID)},
398
:-(
mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
399
%% Returns the page that follows message ID.
%% Counts the whole result set, derives the offset with calc_offset/7, then
%% reads PageSize + 1 rows with the forward query, bounded below by ID via
%% from_id/2 (so the row for ID itself may be included). The
%% {TotalCount, Offset, Messages} tuple is handed to
%% mod_mam_utils:check_for_item_not_found/3, whose result is returned as is.
%% NOTE(review): PoolName is already bound by the function head, so the first
%% expression is a match (assertion) against pool_name(HostType), not an
%% assignment. The sibling lookup_messages_before_id/6 has no such line.
400 lookup_messages_after_id(PoolName, HostType, RoomJID,
401 RSM = #rsm_in{direction = aft, id = ID},
402 PageSize, Filter) ->
403
:-(
PoolName = pool_name(HostType),
404
:-(
TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
405
:-(
Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM),
406
:-(
MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(ID, Filter),
407 PageSize + 1, false),
408
:-(
Result = {TotalCount, Offset, rows_to_uniform_format(MessageRows, HostType, RoomJID)},
409
:-(
mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
410
411
%% Narrows the filter to ids strictly greater than ID: start_id becomes
%% maybe_max(ID + 1, current start_id).
%% NOTE(review): unlike before_id/2 there is no clause for ID = undefined;
%% ID + 1 would raise badarith for such input.
412 after_id(ID, Filter = #mam_muc_ca_filter{start_id = AfterID}) ->
413
:-(
Filter#mam_muc_ca_filter{start_id = maybe_max(ID + 1, AfterID)}.
414
%% Narrows the filter to ids strictly less than ID: end_id becomes
%% maybe_min(ID - 1, current end_id). With ID = undefined the filter is
%% returned unchanged.
415 before_id(undefined, Filter) ->
416
:-(
Filter;
417 before_id(ID, Filter = #mam_muc_ca_filter{end_id = BeforeID}) ->
418
:-(
Filter#mam_muc_ca_filter{end_id = maybe_min(ID - 1, BeforeID)}.
419
%% Narrows the filter to ids up to and including ID: end_id becomes
%% maybe_min(ID, current end_id).
420 to_id(ID, Filter = #mam_muc_ca_filter{end_id = BeforeID}) ->
421
:-(
Filter#mam_muc_ca_filter{end_id = maybe_min(ID, BeforeID)}.
422
%% Narrows the filter to ids starting at and including ID: start_id becomes
%% maybe_max(ID, current start_id).
423 from_id(ID, Filter = #mam_muc_ca_filter{start_id = AfterID}) ->
424
:-(
Filter#mam_muc_ca_filter{start_id = maybe_max(ID, AfterID)}.
425
%% Converts every database row to the result map built by
%% row_to_uniform_format/3, preserving order.
426 rows_to_uniform_format(MessageRows, HostType, RoomJID) ->
427
:-(
[row_to_uniform_format(Row, HostType, RoomJID) || Row <- MessageRows].
428
%% Converts one database row (a map with nick_name, message and id keys) into
%% #{id, jid, packet}: jid is RoomJID with its resource replaced by the stored
%% nick, and packet is the stored binary decoded by stored_binary_to_packet/2.
429 row_to_uniform_format(#{nick_name := BNick, message := Data, id := MessID},
430 HostType, RoomJID) ->
431
:-(
SrcJID = jid:replace_resource(RoomJID, BNick),
432
:-(
Packet = stored_binary_to_packet(HostType, Data),
433
:-(
#{id => MessID, jid => SrcJID, packet => Packet}.
434
%% Extracts the message id from a database row map.
435 row_to_message_id(#{id := MsgID}) ->
436
:-(
MsgID.
437
%% Hook handler collecting the archived MUC messages sent by a user.
%% Rows are selected by from_jid, which is the binary form of the lower-cased
%% Jid. Each row becomes {Id, BinaryStanza}, where the stored packet is decoded
%% and re-serialised with exml:to_binary/1. The new entries are placed in front
%% of the incoming accumulator.
438 -spec get_mam_muc_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when
439 Acc :: ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
440 Params :: #{jid := jid:jid()},
441 Extra :: gen_hook:extra().
442 get_mam_muc_gdpr_data(Acc, #{jid := Jid}, #{host_type := HostType}) ->
443
:-(
BinJid = jid:to_binary(jid:to_lower(Jid)),
444
:-(
PoolName = pool_name(HostType),
445
:-(
FilterMap = #{from_jid => BinJid},
446
:-(
Rows = fetch_user_messages(PoolName, Jid, FilterMap),
447
:-(
Messages = [{Id, exml:to_binary(stored_binary_to_packet(HostType, Data))}
448
:-(
|| #{message := Data, id:= Id} <- Rows],
449
:-(
{ok, Messages ++ Acc}.
450
%% Reads at most IMax rows matching Filter and returns them in ascending id
%% order. With ReverseLimit = true the DESC query is used, so the rows with the
%% largest ids are selected, and the list is reversed before returning.
%% IMax = 0 returns [] without querying (NOTE(review): the spec declares IMax
%% as pos_integer(), which excludes the 0 handled by the first clause).
451 %% Offset is not supported
452 %% Rows are consumed by this module as maps with the keys id, nick_name
453 %% and message (see row_to_uniform_format/3).
454 %% Columns are `["id", "nick_name", "message"]'.
455 -spec extract_messages(PoolName, RoomJID, HostType, Filter, IMax, ReverseLimit) ->
456 [Row] when
457 PoolName :: mongoose_cassandra:pool_name(),
458 RoomJID :: jid:jid(),
459 HostType :: host_type(),
460 Filter :: filter(),
461 IMax :: pos_integer(),
462 ReverseLimit :: boolean(),
463 Row :: mongoose_cassandra:row().
464 extract_messages(_Worker, _RoomJID, _HostType, _Filter, 0, _) ->
465
:-(
[];
466 extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, false) ->
467
:-(
QueryName = {extract_messages_query, select_filter(Filter)},
468
:-(
Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
469
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
470
:-(
Rows;
471 extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, true) ->
472
:-(
QueryName = {extract_messages_r_query, select_filter(Filter)},
473
:-(
Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
474
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
475
:-(
lists:reverse(Rows).
476
%% Runs fetch_user_messages_query with the given parameter map and returns the
%% rows sorted with lists:sort/1 (Erlang term order). The sort happens here
%% because the query itself has no ORDER BY (see fetch_user_messages_cql/0).
%% Crashes with badmatch if the read does not return {ok, Rows}.
477 fetch_user_messages(PoolName, UserJID, FilterMap) ->
478
:-(
QueryName = fetch_user_messages_query,
479
:-(
{ok, Rows} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, QueryName, FilterMap),
480
:-(
lists:sort(Rows).
481
482
483 %% @doc Calculate a zero-based index of the row with UID in the result set.
484 %%
485 %% If the element does not exist, the index of the next element will
486 %% be returned instead.
487 %%
488 %% Implemented as a count of the rows matching Filter with the upper id bound
489 %% lowered to MessID by to_id/2 (the bound is inclusive).
490 %% @end
488 -spec calc_index(PoolName, RoomJID, HostType, Filter, MessID) -> Count
489 when
490 PoolName :: mongoose_cassandra:pool_name(),
491 RoomJID :: jid:jid(),
492 HostType :: host_type(),
493 Filter :: filter(),
494 MessID :: message_id(),
495 Count :: non_neg_integer().
496 calc_index(PoolName, RoomJID, HostType, Filter, MessID) ->
497
:-(
calc_count(PoolName, RoomJID, HostType, to_id(MessID, Filter)).
498
499 %% @doc Count of elements in RSet before the passed element.
500 %%
501 %% The element with the passed UID can be already deleted.
502 %%
503 %% Implemented as a count of the rows matching Filter with the upper id bound
504 %% lowered to MessID - 1 by before_id/2.
505 %% @end
503 -spec calc_before(PoolName, RoomJID, HostType, Filter, MessID) -> Count
504 when
505 PoolName :: mongoose_cassandra:pool_name(),
506 RoomJID :: jid:jid(),
507 HostType :: host_type(),
508 Filter :: filter(),
509 MessID :: message_id(),
510 Count :: non_neg_integer().
511 calc_before(PoolName, RoomJID, HostType, Filter, MessID) ->
512
:-(
calc_count(PoolName, RoomJID, HostType, before_id(MessID, Filter)).
513
514
515 %% @doc Get the total result set size.
516 %% "SELECT COUNT(*) as "count" FROM mam_muc_message WHERE "
%% Picks the calc_count_query variant that matches the bounds present in
%% Filter and expects exactly one row carrying a count key; any other reply
%% raises badmatch. HostType is not used.
517 -spec calc_count(PoolName, RoomJID, HostType, Filter) -> Count
518 when
519 PoolName :: mongoose_cassandra:pool_name(),
520 RoomJID :: jid:jid(),
521 HostType :: host_type(),
522 Filter :: filter(),
523 Count :: non_neg_integer().
524 calc_count(PoolName, RoomJID, _HostType, Filter) ->
525
:-(
QueryName = {calc_count_query, select_filter(Filter)},
526
:-(
Params = eval_filter_params(Filter),
527
:-(
{ok, [#{count := Count}]} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName,
528 Params),
529
:-(
Count.
530
531 %% @doc Convert offset to the id of the first entry
532 %% Returns undefined if there are no matching rows
533 %% Uses previously calculated offsets to speed up queries
%%
%% Offsets 0..100 are resolved directly with calc_offset_to_start_id/4.
%% Larger offsets first look up a stored (offset, id) hint with an offset not
%% greater than the requested one and, if one is found, scan only from that id.
%% NOTE(review): in the hint branch the relative offset passed on is
%% Offset - PrevOffset + 1, while calc_offset_to_start_id/4 itself adds 1 to
%% build the LIMIT. If a hint means "the row at index PrevOffset has id PrevId"
%% (which is what maybe_save_offset_hint/6 stores), this looks one row too far
%% -- verify with a test before relying on it.
%% NOTE(review): the hint branch overwrites any start_id already present in
%% Filter with PrevId.
534 -spec offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id when
535 PoolName :: mongoose_cassandra:pool_name(),
536 RoomJID :: jid:jid(),
537 Offset :: non_neg_integer(),
538 Filter :: filter(),
539 Id :: non_neg_integer() | undefined.
540 offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0,
541 Offset =< 100 ->
542
:-(
calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset);
543 offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
544
:-(
Params = maps:put(offset, Offset, eval_filter_params(Filter)),
545 %% Try to find already calculated nearby offset to reduce query size
546
:-(
case mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, prev_offset_query, Params) of
547 {ok, []} -> %% No hints, just calculate offset sloooowly
548
:-(
StartId = calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset),
549
:-(
maybe_save_offset_hint(PoolName, RoomJID, Filter, 0, Offset, StartId);
550 {ok, [#{offset := PrevOffset, id := PrevId}]} ->
551 %% Offset hint found, use it to reduce query size
552
:-(
case Offset of
553
:-(
PrevOffset -> PrevId;
554 _ ->
555
:-(
StartId = calc_offset_to_start_id(PoolName, RoomJID,
556 Filter#mam_muc_ca_filter{start_id = PrevId},
557 Offset - PrevOffset + 1),
558
:-(
maybe_save_offset_hint(PoolName, RoomJID, Filter, PrevOffset, Offset, StartId)
559 end
560 end.
561
562 %% @doc Saves offset hint for future use in order to speed up queries with similar offset
563 %% Hint is saved only if the previous offset hint is more than 50 entries away from the current offset
564 %% This function returns given StartId as passthrough for convenience
%% Nothing is saved when StartId is undefined. The result of the write is not
%% inspected. The hint row is keyed by the filter's room_jid and with_nick only.
565 -spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), RoomJID :: jid:jid(),
566 Filter :: filter(), HintOffset :: non_neg_integer(),
567 NewOffset :: non_neg_integer(),
568 StartId :: non_neg_integer() | undefined) ->
569 StartId :: non_neg_integer() | undefined.
570 maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset,
571 StartId = undefined) ->
572
:-(
StartId;
573 maybe_save_offset_hint(PoolName, RoomJID, Filter, HintOffset, NewOffset, StartId) ->
574
:-(
case abs(NewOffset - HintOffset) > 50 of
575 true ->
576
:-(
#mam_muc_ca_filter{room_jid = FRoomJID, with_nick = FWithNick} = Filter,
577
:-(
Row = #{room_jid => FRoomJID, with_nick => FWithNick,
578 offset => NewOffset, id => StartId},
579
:-(
mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
580 insert_offset_hint_query, [Row]);
581 false ->
582
:-(
skip
583 end,
584
:-(
StartId.
585
586 %% @doc Convert offset to the id of the first entry
587 %% Returns undefined if the query returns no rows
%% Reads up to Offset + 1 ids with the list_message_ids_query variant matching
%% Filter and returns the id of the last row read.
%% NOTE(review): when fewer than Offset + 1 rows exist (but at least one), the
%% id of the last existing row is returned rather than undefined -- confirm
%% that callers expect this.
588 -spec calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id
589 when
590 PoolName :: mongoose_cassandra:pool_name(),
591 RoomJID :: jid:jid(),
592 Offset :: non_neg_integer(),
593 Filter :: filter(),
594 Id :: non_neg_integer() | undefined.
595 calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
596
:-(
QueryName = {list_message_ids_query, select_filter(Filter)},
597
:-(
Params = maps:put('[limit]', Offset + 1, eval_filter_params(Filter)),
598
:-(
{ok, RowsIds} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
599
:-(
case RowsIds of
600
:-(
[] -> undefined;
601 [_ | _] ->
602
:-(
maps:get(id, lists:last(RowsIds))
603 end.
604
605 %% @doc Get closest offset -> message id 'hint' for specified offset
%% Selects a single hint row whose offset is not greater than the requested
%% one. NOTE(review): which row LIMIT 1 returns depends on the table's
%% clustering order, which is not visible here -- presumably offsets are
%% stored in descending order so that the closest hint comes first; confirm
%% against the schema.
606 prev_offset_query_cql() ->
607
:-(
"SELECT id, offset FROM mam_muc_message_offset WHERE room_jid = ? and with_nick = ?"
608 " and offset <= ? LIMIT 1".
609
610 %% @doc Insert offset -> message id 'hint'
%% The column names match the keys of the row built in maybe_save_offset_hint/6.
611 insert_offset_hint_query_cql() ->
612
:-(
"INSERT INTO mam_muc_message_offset(room_jid, with_nick, id, offset) VALUES(?, ?, ?, ?)".
613
%% Builds a #mam_muc_ca_filter{} from lookup parameters.
%% Start and End are turned into ids with encode_compact_uuid/2, using node id
%% 0 for the start and 255 for the end (presumably the lowest and highest node
%% ids, making both bounds inclusive of every id in that microsecond --
%% confirm against mod_mam_utils). The borders are then applied on top, and an
%% undefined nick becomes <<>>.
614 prepare_filter(RoomJID, Borders, Start, End, WithNick) ->
615
:-(
BRoomJID = mod_mam_utils:bare_jid(RoomJID),
616
:-(
StartID = maybe_encode_compact_uuid(Start, 0),
617
:-(
EndID = maybe_encode_compact_uuid(End, 255),
618
:-(
StartID2 = apply_start_border(Borders, StartID),
619
:-(
EndID2 = apply_end_border(Borders, EndID),
620
:-(
BWithNick = maybe_nick(WithNick),
621
:-(
prepare_filter_params(BRoomJID, BWithNick, StartID2, EndID2).
622
%% Packs already-prepared values into a #mam_muc_ca_filter{} record. The
%% nick_name field is left at its default (undefined).
623 prepare_filter_params(BRoomJID, BWithNick, StartID, EndID) ->
624
:-(
#mam_muc_ca_filter{
625 room_jid = BRoomJID,
626 with_nick = BWithNick,
627 start_id = StartID,
628 end_id = EndID
629 }.
630
%% Converts a filter record into the parameter map for a query.
%% room_jid and with_nick are always present; start_id and end_id are included
%% only when they are not undefined, matching the placeholders emitted by
%% prepare_filter_cql/2 for the same filter shape.
631 eval_filter_params(#mam_muc_ca_filter{
632 room_jid = BRoomJID,
633 with_nick = BWithNick,
634 start_id = StartID,
635 end_id = EndID
636 }) ->
637
:-(
Optional = maps:filter(fun(_K, V) -> V =/= undefined end,
638 #{start_id => StartID, end_id =>EndID}),
639
:-(
maps:merge(#{room_jid => BRoomJID, with_nick => BWithNick}, Optional).
640
%% Returns the filter-shape name (all | 'end' | start | start_end) for a filter
%% record, based on which of its id bounds are set. The name is the second
%% element of the prepared query keys.
641 select_filter(#mam_muc_ca_filter{
642 start_id = StartID,
643 end_id = EndID
644 }) ->
645
:-(
select_filter(StartID, EndID).
646
647
%% Names the filter shape from the two id bounds:
%%   neither set -> all, only EndID -> 'end', only StartID -> start,
%%   both set -> start_end.
648 -spec select_filter(StartID, EndID) ->
649 all | 'end' | start | start_end when
650 StartID :: integer() | undefined,
651 EndID :: integer() | undefined.
652 select_filter(undefined, undefined) ->
653
:-(
all;
654 select_filter(undefined, _) ->
655
:-(
'end';
656 select_filter(_, undefined) ->
657
:-(
start;
658 select_filter(_, _) ->
659
:-(
start_end.
660
%% Builds the optional part of a WHERE clause: an inclusive lower bound on id
%% when StartID is set, followed by an inclusive upper bound when EndID is set.
%% Only whether each argument is undefined matters; the values themselves are
%% bound later through the named :start_id / :end_id placeholders.
661 prepare_filter_cql(StartID, EndID) ->
662 case StartID of
663
:-(
undefined -> "";
664
:-(
_ -> " AND id >= :start_id"
665
:-(
end ++
666 case EndID of
667
:-(
undefined -> "";
668
:-(
_ -> " AND id <= :end_id"
669 end.
670
%% Enumerates the four filter shapes as {FilterName, CqlFragment} pairs.
%% The value 0 only stands in for "bound is set"; it never reaches the database.
671 filter_to_cql() ->
672
:-(
[{select_filter(StartID, EndID),
673 prepare_filter_cql(StartID, EndID)}
674
:-(
|| StartID <- [undefined, 0],
675
:-(
EndID <- [undefined, 0]].
676
%% Calculates the zero-based offset of the first row of the requested page.
%%   before, id = undefined -> max(0, TotalCount - PageSize)
%%   before, integer id     -> max(0, rows before id - PageSize)
%%   aft, integer id        -> calc_index/5 for that id
%%   anything else          -> 0
%% In the clauses that ignore them, _W and _LS are the PoolName and HostType
%% argument positions.
677 -spec calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
678 when
679 PoolName :: mongoose_cassandra:pool_name(),
680 RoomJID :: jid:jid(),
681 HostType :: host_type(),
682 Filter :: filter(),
683 PageSize :: non_neg_integer(),
684 TotalCount :: non_neg_integer(),
685 RSM :: jlib:rsm_in() | undefined,
686 Offset :: non_neg_integer().
687 %% Requesting the Last Page in a Result Set
688 calc_offset(_W, _RoomJID, _LS, _F, PS, TC, #rsm_in{direction = before, id = undefined}) ->
689
:-(
max(0, TC - PS);
690 calc_offset(PoolName, RoomJID, HostType, F, PS, _TC, #rsm_in{direction = before, id = ID})
691 when is_integer(ID) ->
692
:-(
max(0, calc_before(PoolName, RoomJID, HostType, F, ID) - PS);
693 calc_offset(PoolName, RoomJID, HostType, F, _PS, _TC, #rsm_in{direction = aft, id = ID})
694 when is_integer(ID) ->
695
:-(
calc_index(PoolName, RoomJID, HostType, F, ID);
696 calc_offset(_W, _RoomJID, _LS, _F, _PS, _TC, _RSM) ->
697
:-(
0.
698
%% Encodes a timestamp and node id into a message id with
%% mod_mam_utils:encode_compact_uuid/2 (imported), passing undefined through.
699 maybe_encode_compact_uuid(undefined, _) ->
700
:-(
undefined;
701 maybe_encode_compact_uuid(Microseconds, NodeID) ->
702
:-(
encode_compact_uuid(Microseconds, NodeID).
703
%% Normalises the nick used as the with_nick key: undefined becomes the empty
%% binary, a binary nick is returned as is. Any other value raises
%% function_clause.
704 maybe_nick(undefined) ->
705
:-(
<<>>;
706 maybe_nick(WithNick) when is_binary(WithNick) ->
707
:-(
WithNick.
708
709 %%====================================================================
710 %% Internal CQL part
711 %%====================================================================
712
%% One {{extract_messages_query, FilterName}, CQL} entry per filter shape from
%% filter_to_cql/0 (ascending-order message reads).
713 extract_messages_queries() ->
714
:-(
[{{extract_messages_query, FilterName}, extract_messages_cql(Filter)}
715
:-(
|| {FilterName, Filter} <- filter_to_cql()].
716
%% One {{extract_messages_r_query, FilterName}, CQL} entry per filter shape
%% from filter_to_cql/0 (descending-order message reads).
717 extract_messages_r_queries() ->
718
:-(
[{{extract_messages_r_query, FilterName}, extract_messages_r_cql(Filter)}
719
:-(
|| {FilterName, Filter} <- filter_to_cql()].
720
%% One {{calc_count_query, FilterName}, CQL} entry per filter shape from
%% filter_to_cql/0.
721 calc_count_queries() ->
722
:-(
[{{calc_count_query, FilterName}, calc_count_cql(Filter)}
723
:-(
|| {FilterName, Filter} <- filter_to_cql()].
724
%% One {{list_message_ids_query, FilterName}, CQL} entry per filter shape from
%% filter_to_cql/0.
725 list_message_ids_queries() ->
726
:-(
[{{list_message_ids_query, FilterName}, list_message_ids_cql(Filter)}
727
:-(
|| {FilterName, Filter} <- filter_to_cql()].
728
%% CQL for reading messages of one (room_jid, with_nick) pair in ascending id
%% order with a LIMIT. Filter is the optional id-bound fragment from
%% prepare_filter_cql/2 and may be the empty string.
729 extract_messages_cql(Filter) ->
730 "SELECT id, nick_name, message FROM mam_muc_message "
731
:-(
"WHERE room_jid = ? AND with_nick = ? " ++
732 Filter ++ " ORDER BY id LIMIT ?".
733
%% CQL for reading messages of one (room_jid, with_nick) pair in descending id
%% order with a LIMIT. Filter is the optional id-bound fragment from
%% prepare_filter_cql/2 and may be the empty string.
734 extract_messages_r_cql(Filter) ->
735 "SELECT id, nick_name, message FROM mam_muc_message "
736
:-(
"WHERE room_jid = ? AND with_nick = ? " ++
737 Filter ++ " ORDER BY id DESC LIMIT ?".
738
%% CQL for reading the id and stored message of every row sent by one from_jid.
739 fetch_user_messages_cql() ->
740 %% An attempt to order the results ends in the following error:
741 %% "ORDER BY with 2ndary indexes is not supported."
742
:-(
"SELECT id, message FROM mam_muc_message "
743 "WHERE from_jid = ?".
744
%% CQL for counting the rows of one (room_jid, with_nick) pair, optionally
%% restricted by the id-bound fragment in Filter.
745 calc_count_cql(Filter) ->
746 "SELECT COUNT(*) FROM mam_muc_message "
747
:-(
"WHERE room_jid = ? AND with_nick = ? " ++ Filter.
748
%% CQL for listing only the ids of one (room_jid, with_nick) pair in ascending
%% order with a LIMIT; used to translate an offset into an id.
749 list_message_ids_cql(Filter) ->
750 "SELECT id FROM mam_muc_message "
751
:-(
"WHERE room_jid = ? AND with_nick = ? " ++ Filter ++
752 " ORDER BY id LIMIT ?".
753
754 %% ----------------------------------------------------------------------
755 %% Optimizations
756
%% Encodes a packet for storage with mam_message:encode/2, using the module
%% configured through the db_message_format option for this host type.
757 packet_to_stored_binary(HostType, Packet) ->
758 %% Module implementing mam_muc_message behaviour
759
:-(
Module = db_message_format(HostType),
760
:-(
mam_message:encode(Module, Packet).
761
%% Decodes a stored binary back into a packet with mam_message:decode/2, using
%% the module configured through the db_message_format option for this host type.
762 stored_binary_to_packet(HostType, Bin) ->
763 %% Module implementing mam_muc_message behaviour
764
:-(
Module = db_message_format(HostType),
765
:-(
mam_message:decode(Module, Bin).
766
767 %% ----------------------------------------------------------------------
768 %% Params getters
769
%% Reads this module's db_message_format option for the host type; the value is
%% used as the encoder/decoder module for stored packets.
770 -spec db_message_format(HostType :: host_type()) -> module().
771 db_message_format(HostType) ->
772
:-(
gen_mod:get_module_opt(HostType, ?MODULE, db_message_format).
773
%% Returns the Cassandra pool to use. The host type is ignored: every host
%% type uses the pool named default.
774 -spec pool_name(HostType :: host_type()) -> default. %% returns mongoose_wpool:pool_name().
775 pool_name(_HostType) ->
776
:-(
default.
Line Hits Source