1 |
|
%%%------------------------------------------------------------------- |
2 |
|
%%% @author Uvarov Michael <arcusfelis@gmail.com> |
3 |
|
%%% @copyright (C) 2013, Uvarov Michael |
4 |
|
%%% @doc Cassandra backend for MUC Message Archive Management.
5 |
|
%%% @end |
6 |
|
%%%------------------------------------------------------------------- |
7 |
|
-module(mod_mam_muc_cassandra_arch). |
8 |
|
-behaviour(mongoose_cassandra). |
9 |
|
-behaviour(ejabberd_gen_mam_archive). |
10 |
|
-behaviour(gen_mod). |
11 |
|
|
12 |
|
%% gen_mod handlers |
13 |
|
-export([start/2, stop/1, hooks/1]). |
14 |
|
|
15 |
|
%% MAM hook handlers |
16 |
|
-export([archive_size/3, |
17 |
|
archive_message/3, |
18 |
|
lookup_messages/3, |
19 |
|
remove_archive/3, |
20 |
|
get_mam_muc_gdpr_data/3]). |
21 |
|
|
22 |
|
%% mongoose_cassandra callbacks |
23 |
|
-export([prepared_queries/0]). |
24 |
|
|
25 |
|
%% ---------------------------------------------------------------------- |
26 |
|
%% Imports |
27 |
|
|
28 |
|
%% UID |
29 |
|
-import(mod_mam_utils, |
30 |
|
[encode_compact_uuid/2]). |
31 |
|
|
32 |
|
%% Other |
33 |
|
-import(mod_mam_utils, |
34 |
|
[maybe_min/2, |
35 |
|
maybe_max/2, |
36 |
|
apply_start_border/2, |
37 |
|
apply_end_border/2]). |
38 |
|
|
39 |
|
-include("mongoose.hrl"). |
40 |
|
-include("jlib.hrl"). |
41 |
|
-include("mongoose_rsm.hrl"). |
42 |
|
|
43 |
|
%% Filter describing which rows of a room archive a query should touch.
-record(mam_muc_ca_filter, {
          %% Bare room JID as returned by mod_mam_utils:bare_jid/1.
          room_jid,
          %% Nick restriction as returned by maybe_nick/1.
          with_nick,
          %% Not populated by prepare_filter_params/4.
          nick_name,
          %% Inclusive lower bound on the message id, or undefined.
          start_id,
          %% Inclusive upper bound on the message id, or undefined.
          end_id
         }).
50 |
|
|
51 |
|
%% One row of the mam_muc_message table, as written by archive_message2/2.
-record(mam_muc_message, {
          id        :: non_neg_integer(),
          %% Bare JID of the room that owns the archive.
          room_jid  :: binary(),
          %% Bare JID of the sender; empty binary in the per-nick copy of a message.
          from_jid  :: binary() | undefined,
          %% Nick of the sender inside the room.
          nick_name :: binary(),
          %% Nick this copy is indexed by; empty binary in the unfiltered copy.
          with_nick :: binary(),
          %% Encoded stanza.
          message   :: binary() | undefined
         }).
59 |
|
|
60 |
|
-callback encode(binary()) -> binary(). |
61 |
|
-callback decode(binary()) -> binary(). |
62 |
|
|
63 |
|
%% ---------------------------------------------------------------------- |
64 |
|
%% Types |
65 |
|
|
66 |
|
-type filter() :: #mam_muc_ca_filter{}. |
67 |
|
-type message_id() :: non_neg_integer(). |
68 |
|
-type host_type() :: mongooseim:host_type(). |
69 |
|
|
70 |
|
%% ---------------------------------------------------------------------- |
71 |
|
%% gen_mod callbacks |
72 |
|
%% Starting and stopping functions for users' archives |
73 |
|
|
74 |
|
%% @doc gen_mod start callback. Performs no work and always returns `ok'.
-spec start(host_type(), gen_mod:module_opts()) -> ok.
start(_, _) ->
    ok.
77 |
|
|
78 |
|
%% @doc gen_mod stop callback. Performs no work and always returns `ok'.
-spec stop(host_type()) -> ok.
stop(_) ->
    ok.
81 |
|
|
82 |
|
%% ---------------------------------------------------------------------- |
83 |
|
%% Add hooks for mod_mam_muc |
84 |
|
|
85 |
|
%% @doc Hook handlers registered for `HostType'.
%% Every handler is an exported function of this module, registered with
%% empty options and priority 50.
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
    Handlers = [{mam_muc_archive_message, fun ?MODULE:archive_message/3},
                {mam_muc_archive_size, fun ?MODULE:archive_size/3},
                {mam_muc_lookup_messages, fun ?MODULE:lookup_messages/3},
                {mam_muc_remove_archive, fun ?MODULE:remove_archive/3},
                {get_mam_muc_gdpr_data, fun ?MODULE:get_mam_muc_gdpr_data/3}],
    [{HookName, HostType, Handler, #{}, 50} || {HookName, Handler} <- Handlers].
92 |
|
|
93 |
|
%% ---------------------------------------------------------------------- |
94 |
|
%% mongoose_cassandra_worker callbacks |
95 |
|
|
96 |
|
%% @doc mongoose_cassandra callback: every `{QueryName, CQL}' pair used by this module.
%% The fixed queries come first, followed by the per-filter query families.
prepared_queries() ->
    Fixed = [{insert_offset_hint_query, insert_offset_hint_query_cql()},
             {prev_offset_query, prev_offset_query_cql()},
             {insert_query, insert_query_cql()},
             {fetch_user_messages_query, fetch_user_messages_cql()},
             {select_for_removal_query, select_for_removal_query_cql()},
             {remove_archive_query, remove_archive_query_cql()},
             {remove_archive_offsets_query, remove_archive_offsets_query_cql()}],
    lists:append([Fixed,
                  extract_messages_queries(),
                  extract_messages_r_queries(),
                  calc_count_queries(),
                  list_message_ids_queries()]).
109 |
|
|
110 |
|
%%==================================================================== |
111 |
|
%% Internal functions |
112 |
|
%%==================================================================== |
113 |
|
|
114 |
|
%% ---------------------------------------------------------------------- |
115 |
|
%% Internal functions and callbacks |
116 |
|
|
117 |
|
%% @doc `mam_muc_archive_size' hook handler.
%% Ignores the incoming accumulator value (it only has to be an integer) and
%% returns the number of rows counted for the room with an unrestricted filter.
-spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: integer(),
      Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
      Extra :: gen_hook:extra().
archive_size(Size, #{room := RoomJID}, #{host_type := HostType}) when is_integer(Size) ->
    Pool = pool_name(HostType),
    %% No borders, no time range, no nick and no message id restriction.
    Unrestricted = prepare_filter(RoomJID, undefined, undefined, undefined,
                                  undefined, undefined),
    Count = calc_count(Pool, RoomJID, HostType, Unrestricted),
    {ok, Count}.
126 |
|
|
127 |
|
|
128 |
|
%% ---------------------------------------------------------------------- |
129 |
|
%% INSERT MESSAGE |
130 |
|
|
131 |
|
%% CQL statement that stores one archived message row.
insert_query_cql() ->
    "INSERT INTO mam_muc_message (id, room_jid, from_jid, nick_name, with_nick, message) "
    "VALUES (?, ?, ?, ?, ?, ?)".
135 |
|
|
136 |
|
%% @doc `mam_muc_archive_message' hook handler.
%% Any exception raised while archiving is converted into `{ok, {error, Reason}}';
%% in that case the `mod_mam_muc_dropped' instrumentation event is emitted and
%% the failure is logged together with its stacktrace.
-spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok | {error, term()},
      Params :: mod_mam:archive_message_params(),
      Extra :: gen_hook:extra().
archive_message(_Result, Params, #{host_type := HostType}) ->
    try archive_message2(Params, HostType) of
        Written ->
            {ok, Written}
    catch
        _Class:Reason:StackTrace ->
            mongoose_instrument:execute(mod_mam_muc_dropped, #{host_type => HostType},
                                        #{count => 1}),
            ?LOG_ERROR(#{what => archive_muc_message_failed,
                         host_type => HostType, mam_params => Params,
                         reason => Reason, stacktrace => StackTrace}),
            {ok, {error, Reason}}
    end.
150 |
|
|
151 |
|
%% Stores the message twice: once with an empty `with_nick' (carrying the
%% sender's bare JID) and once indexed by the sender's nick (with an empty
%% `from_jid').
archive_message2(#{message_id := MessID,
                   local_jid := LocJID = #jid{},
                   remote_jid := FromJID = #jid{},
                   source_jid := #jid{lresource = BNick},
                   packet := Packet}, HostType) ->
    BLocJID = mod_mam_utils:bare_jid(LocJID),
    BFromJID = mod_mam_utils:bare_jid(FromJID),
    BPacket = packet_to_stored_binary(HostType, Packet),
    MakeRow = fun(WithNick, From) ->
                      #mam_muc_message{id = MessID,
                                       room_jid = BLocJID,
                                       from_jid = From,
                                       nick_name = BNick,
                                       message = BPacket,
                                       with_nick = WithNick}
              end,
    ok = write_messages(HostType, [MakeRow(<<>>, BFromJID), MakeRow(BNick, <<>>)]).
168 |
|
|
169 |
|
%% Submits the given message records through the asynchronous write API.
%% `undefined' is passed in the JID argument position.
write_messages(HostType, Messages) ->
    Pool = pool_name(HostType),
    Rows = lists:map(fun message_to_params/1, Messages),
    mongoose_cassandra:cql_write_async(Pool, undefined, ?MODULE, insert_query, Rows).
174 |
|
|
175 |
|
%% Converts a #mam_muc_message{} record into the parameter map of `insert_query'.
message_to_params(#mam_muc_message{} = Msg) ->
    #{id => Msg#mam_muc_message.id,
      room_jid => Msg#mam_muc_message.room_jid,
      from_jid => Msg#mam_muc_message.from_jid,
      nick_name => Msg#mam_muc_message.nick_name,
      with_nick => Msg#mam_muc_message.with_nick,
      message => Msg#mam_muc_message.message}.
185 |
|
|
186 |
|
|
187 |
|
%% ---------------------------------------------------------------------- |
188 |
|
%% REMOVE ARCHIVE |
189 |
|
|
190 |
|
%% CQL statement deleting all messages stored under one (room_jid, with_nick) pair.
remove_archive_query_cql() ->
    "DELETE FROM mam_muc_message "
    "WHERE room_jid = ? AND with_nick = ?".
192 |
|
|
193 |
|
%% CQL statement deleting all offset hints stored under one (room_jid, with_nick) pair.
remove_archive_offsets_query_cql() ->
    "DELETE FROM mam_muc_message_offset "
    "WHERE room_jid = ? AND with_nick = ?".
195 |
|
|
196 |
|
%% CQL statement listing the distinct (room_jid, with_nick) pairs of a room.
select_for_removal_query_cql() ->
    "SELECT DISTINCT room_jid, with_nick "
    "FROM mam_muc_message WHERE room_jid = ?".
198 |
|
|
199 |
|
%% @doc `mam_muc_remove_archive' hook handler.
%% Folds over the rows returned by `select_for_removal_query' for the room and,
%% for each batch of rows, runs the message delete followed by the offset-hint
%% delete using the synchronous write API. The results of the writes and of the
%% fold are not inspected; the accumulator is returned unchanged.
-spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: term(),
      Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
      Extra :: gen_hook:extra().
remove_archive(Acc, #{room := RoomJID}, #{host_type := HostType}) ->
    BRoomJID = mod_mam_utils:bare_jid(RoomJID),
    PoolName = pool_name(HostType),
    Delete = fun(QueryName, Rows) ->
                     mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE, QueryName, Rows)
             end,
    %% Synchronous writes: wait until deleted
    DeleteBatch = fun(Rows, _AccIn) ->
                          Delete(remove_archive_query, Rows),
                          Delete(remove_archive_offsets_query, Rows)
                  end,
    mongoose_cassandra:cql_foldl(PoolName, RoomJID, ?MODULE, select_for_removal_query,
                                 #{room_jid => BRoomJID}, DeleteBatch, []),
    {ok, Acc}.
220 |
|
|
221 |
|
%% ---------------------------------------------------------------------- |
222 |
|
%% SELECT MESSAGES |
223 |
|
|
224 |
|
%% @doc `mam_muc_lookup_messages' hook handler.
%%
%% - An incoming `{error, _}' accumulator is passed through untouched.
%% - Full-text search (a binary `search_text') is answered with
%%   `{error, 'not-supported'}'.
%% - Otherwise the lookup is performed; any exception is converted into
%%   `{error, {Reason, {stacktrace, Stacktrace}}}'.
%%
%% The accumulator type includes `{error, term()}' because both the first
%% clause and the failure paths handle or produce it.
-spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: {ok, mod_mam:lookup_result()} | {error, term()},
      Params :: mam_iq:lookup_params(),
      Extra :: gen_hook:extra().
lookup_messages({error, _Reason} = Result, _Params, _Extra) ->
    {ok, Result};
lookup_messages(_Result, #{search_text := <<_/binary>>}, _Extra) ->
    {ok, {error, 'not-supported'}};
lookup_messages(_Result,
                #{owner_jid := RoomJID, rsm := RSM, borders := Borders,
                  start_ts := Start, end_ts := End, with_jid := WithJID,
                  search_text := undefined, page_size := PageSize,
                  is_simple := IsSimple, message_id := MsgID},
                #{host_type := HostType}) ->
    try
        %% For MUC archives the "with" JID is reduced to its resource (the nick).
        WithNick = maybe_jid_to_nick(WithJID),
        PoolName = pool_name(HostType),
        {ok, lookup_messages2(PoolName, HostType,
                              RoomJID, RSM, Borders,
                              Start, End, WithNick,
                              PageSize, MsgID, IsSimple)}
    catch _Type:Reason:Stacktrace ->
        {ok, {error, {Reason, {stacktrace, Stacktrace}}}}
    end.
248 |
|
|
249 |
:-( |
%% Returns the lower-cased resource of a JID (the nick), or `undefined' when no JID is given.
maybe_jid_to_nick(undefined) ->
    undefined;
maybe_jid_to_nick(#jid{lresource = Nick}) ->
    Nick.
251 |
|
|
252 |
|
|
253 |
|
%% Dispatches a lookup either to the "simple" implementation (no offset or
%% total count calculation) or to the strategy selected from the RSM data.
lookup_messages2(PoolName, HostType,
                 RoomJID = #jid{}, RSM, Borders,
                 Start, End, WithNick,
                 PageSize, MsgID, _IsSimple = true) ->
    %% Simple query without calculating offset and total count
    SimpleFilter = prepare_filter(RoomJID, Borders, Start, End, WithNick, MsgID),
    lookup_messages_simple(PoolName, HostType, RoomJID, RSM, PageSize, SimpleFilter);
lookup_messages2(PoolName, HostType,
                 RoomJID = #jid{}, RSM, Borders,
                 Start, End, WithNick,
                 PageSize, MsgID, _IsSimple) ->
    %% Query with offset calculation.
    %% We cannot just use RDBMS code because "LIMIT X, Y" is not supported by Cassandra.
    %% Not all queries are optimal. You would like to disable something for production
    %% once you know how you will call the db.
    Strategy = rsm_to_strategy(RSM),
    Filter = prepare_filter(RoomJID, Borders, Start, End, WithNick, MsgID),
    Lookup = case Strategy of
                 last_page -> fun lookup_messages_last_page/6;
                 by_offset -> fun lookup_messages_by_offset/6;
                 first_page -> fun lookup_messages_first_page/6;
                 before_id -> fun lookup_messages_before_id/6;
                 after_id -> fun lookup_messages_after_id/6
             end,
    Lookup(PoolName, HostType, RoomJID, RSM, PageSize, Filter).
282 |
|
|
283 |
|
%% Maps RSM input onto the lookup strategy used by lookup_messages2/11.
%% Anything that is not recognised as a more specific request is served as the
%% first page.
rsm_to_strategy(undefined) ->
    first_page;
rsm_to_strategy(#rsm_in{direction = Direction, id = Id, index = Index}) ->
    case {Direction, Id, Index} of
        {before, undefined, _} -> last_page;
        {undefined, _, 0} -> first_page;
        {undefined, _, _} when is_integer(Index) -> by_offset;
        {before, _, _} when is_integer(Id) -> before_id;
        {aft, _, _} when is_integer(Id) -> after_id;
        _ -> first_page
    end.
297 |
|
|
298 |
|
%% "Simple" lookup: returns one page of messages without computing the total
%% count or the offset (both are reported as `undefined').
%%
%% The `aft' and index clauses are guarded with is_integer/1 so that RSM input
%% without a usable id or index falls through to the last clause and is served
%% as an unpaged first page. This mirrors rsm_to_strategy/1, which maps the
%% same inputs to `first_page'. Without the guards `after_id(undefined, _)'
%% fails with badarith and `offset_to_start_id(_, _, _, undefined)' with
%% function_clause.
lookup_messages_simple(PoolName, HostType, RoomJID,
                       #rsm_in{direction = aft, id = ID},
                       PageSize, Filter) when is_integer(ID) ->
    %% Page that follows the message with the given id
    MessageRows = extract_messages(PoolName, RoomJID, HostType, after_id(ID, Filter),
                                   PageSize, false),
    {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
lookup_messages_simple(PoolName, HostType, RoomJID,
                       #rsm_in{direction = before, id = ID},
                       PageSize, Filter) ->
    %% Get last rows from result set; before_id/2 accepts an undefined id
    MessageRows = extract_messages(PoolName, RoomJID, HostType, before_id(ID, Filter),
                                   PageSize, true),
    {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
lookup_messages_simple(PoolName, HostType, RoomJID,
                       #rsm_in{direction = undefined, index = Offset},
                       PageSize, Filter) when is_integer(Offset) ->
    %% Apply offset
    StartId = offset_to_start_id(PoolName, RoomJID, Filter,
                                 Offset), %% POTENTIALLY SLOW AND NOT SIMPLE :)
    MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter),
                                   PageSize, false),
    {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}};
lookup_messages_simple(PoolName, HostType, RoomJID,
                       _,
                       PageSize, Filter) ->
    MessageRows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
    {ok, {undefined, undefined, rows_to_uniform_format(MessageRows, HostType, RoomJID)}}.
323 |
|
|
324 |
|
%% Last page of the result set (RSM `before' without an id).
%% Returns `{ok, {TotalCount, Offset, Messages}}'.
lookup_messages_last_page(PoolName, HostType, RoomJID,
                          #rsm_in{direction = before, id = undefined},
                          0, Filter) ->
    %% Empty page requested: only the total is needed
    Total = calc_count(PoolName, RoomJID, HostType, Filter),
    {ok, {Total, Total, []}};
lookup_messages_last_page(PoolName, HostType, RoomJID,
                          #rsm_in{direction = before, id = undefined},
                          PageSize, Filter) ->
    Rows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, true),
    RowCount = length(Rows),
    %% A short page means the whole result set was fetched, so nothing precedes it.
    Offset = case RowCount < PageSize of
                 true ->
                     0;
                 false ->
                     FirstID = row_to_message_id(hd(Rows)),
                     calc_count(PoolName, RoomJID, HostType, before_id(FirstID, Filter))
             end,
    {ok, {Offset + RowCount, Offset, rows_to_uniform_format(Rows, HostType, RoomJID)}}.
346 |
|
|
347 |
|
%% Page addressed by an RSM index (offset).
%% Returns `{ok, {TotalCount, Offset, Messages}}'.
lookup_messages_by_offset(PoolName, HostType, RoomJID,
                          #rsm_in{direction = undefined, index = Offset},
                          0, Filter) when is_integer(Offset) ->
    %% Empty page requested: only the total is needed
    Total = calc_count(PoolName, RoomJID, HostType, Filter),
    {ok, {Total, Offset, []}};
lookup_messages_by_offset(PoolName, HostType, RoomJID,
                          #rsm_in{direction = undefined, index = Offset},
                          PageSize, Filter) when is_integer(Offset) ->
    StartId = offset_to_start_id(PoolName, RoomJID, Filter, Offset), %% POTENTIALLY SLOW
    Rows = extract_messages(PoolName, RoomJID, HostType, from_id(StartId, Filter),
                            PageSize, false),
    RowCount = length(Rows),
    %% A short page means nothing follows it, so no extra count query is needed.
    Remaining = case RowCount < PageSize of
                    true ->
                        0;
                    false ->
                        LastID = row_to_message_id(lists:last(Rows)),
                        calc_count(PoolName, RoomJID, HostType, after_id(LastID, Filter))
                end,
    {ok, {Offset + RowCount + Remaining, Offset,
          rows_to_uniform_format(Rows, HostType, RoomJID)}}.
371 |
|
|
372 |
|
%% First page of the result set; the RSM argument is ignored.
%% Returns `{ok, {TotalCount, 0, Messages}}'.
lookup_messages_first_page(PoolName, HostType, RoomJID,
                           _,
                           0, Filter) ->
    %% Empty page requested: just count
    Total = calc_count(PoolName, RoomJID, HostType, Filter),
    {ok, {Total, 0, []}};
lookup_messages_first_page(PoolName, HostType, RoomJID,
                           _,
                           PageSize, Filter) ->
    Rows = extract_messages(PoolName, RoomJID, HostType, Filter, PageSize, false),
    RowCount = length(Rows),
    %% A short page means nothing follows it, so no extra count query is needed.
    Remaining = case RowCount < PageSize of
                    true ->
                        0;
                    false ->
                        LastID = row_to_message_id(lists:last(Rows)),
                        calc_count(PoolName, RoomJID, HostType, after_id(LastID, Filter))
                end,
    {ok, {RowCount + Remaining, 0, rows_to_uniform_format(Rows, HostType, RoomJID)}}.
394 |
|
|
395 |
|
%% Page that precedes the message with the given id.
%% One extra row is requested and the id bound is inclusive (to_id/2); the
%% result is handed to mod_mam_utils:check_for_item_not_found/3 together with
%% the RSM data and the page size.
lookup_messages_before_id(PoolName, HostType, RoomJID,
                          RSM = #rsm_in{direction = before, id = ID},
                          PageSize, Filter) ->
    Total = calc_count(PoolName, RoomJID, HostType, Filter),
    Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, Total, RSM),
    UpToID = to_id(ID, Filter),
    Rows = extract_messages(PoolName, RoomJID, HostType, UpToID, PageSize + 1, true),
    Messages = rows_to_uniform_format(Rows, HostType, RoomJID),
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, {Total, Offset, Messages}).
404 |
|
|
405 |
|
%% Page that follows the message with the given id.
%% One extra row is requested and the id bound is inclusive (from_id/2); the
%% result is handed to mod_mam_utils:check_for_item_not_found/3 together with
%% the RSM data and the page size.
%%
%% The pool name supplied by the caller is used as is, exactly like in
%% lookup_messages_before_id/6; it is not looked up and re-matched again.
lookup_messages_after_id(PoolName, HostType, RoomJID,
                         RSM = #rsm_in{direction = aft, id = ID},
                         PageSize, Filter) ->
    TotalCount = calc_count(PoolName, RoomJID, HostType, Filter),
    Offset = calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM),
    MessageRows = extract_messages(PoolName, RoomJID, HostType, from_id(ID, Filter),
                                   PageSize + 1, false),
    Result = {TotalCount, Offset, rows_to_uniform_format(MessageRows, HostType, RoomJID)},
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, Result).
415 |
|
|
416 |
|
|
417 |
|
%% Restricts the filter to ids strictly greater than `ID', keeping an already
%% present lower bound if mod_mam_utils:maybe_max/2 prefers it.
after_id(ID, #mam_muc_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = maybe_max(ID + 1, CurrentStart),
    Filter#mam_muc_ca_filter{start_id = NewStart}.
419 |
|
|
420 |
|
%% Restricts the filter to ids strictly lower than `ID'.
%% An undefined id leaves the filter untouched.
before_id(undefined, Filter) ->
    Filter;
before_id(ID, #mam_muc_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = maybe_min(ID - 1, CurrentEnd),
    Filter#mam_muc_ca_filter{end_id = NewEnd}.
424 |
|
|
425 |
|
%% Restricts the filter to ids up to and including `ID'.
to_id(ID, #mam_muc_ca_filter{end_id = CurrentEnd} = Filter) ->
    NewEnd = maybe_min(ID, CurrentEnd),
    Filter#mam_muc_ca_filter{end_id = NewEnd}.
427 |
|
|
428 |
|
%% Restricts the filter to ids starting at and including `ID'.
from_id(ID, #mam_muc_ca_filter{start_id = CurrentStart} = Filter) ->
    NewStart = maybe_max(ID, CurrentStart),
    Filter#mam_muc_ca_filter{start_id = NewStart}.
430 |
|
|
431 |
|
%% Converts every fetched row into the message map returned to the caller.
rows_to_uniform_format(MessageRows, HostType, RoomJID) ->
    lists:map(fun(Row) -> row_to_uniform_format(Row, HostType, RoomJID) end,
              MessageRows).
433 |
|
|
434 |
|
%% Builds `#{id, jid, packet}' from one row: the source JID is the room JID
%% with the stored nick as resource, the packet is the decoded stored binary.
row_to_uniform_format(#{nick_name := Nick, message := Stored, id := Id},
                      HostType, RoomJID) ->
    #{id => Id,
      jid => jid:replace_resource(RoomJID, Nick),
      packet => stored_binary_to_packet(HostType, Stored)}.
439 |
|
|
440 |
|
%% Extracts the message id from a row map.
row_to_message_id(#{id := Id}) -> Id.
442 |
|
|
443 |
|
%% @doc `get_mam_muc_gdpr_data' hook handler.
%% Fetches the rows whose `from_jid' equals the lower-cased binary form of the
%% given JID and prepends `{Id, XmlBinary}' pairs for them to the accumulator.
-spec get_mam_muc_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
      Params :: #{jid := jid:jid()},
      Extra :: gen_hook:extra().
get_mam_muc_gdpr_data(Acc, #{jid := Jid}, #{host_type := HostType}) ->
    SenderBin = jid:to_binary(jid:to_lower(Jid)),
    Pool = pool_name(HostType),
    Rows = fetch_user_messages(Pool, Jid, #{from_jid => SenderBin}),
    Export = fun(Id, Stored) ->
                     {Id, exml:to_binary(stored_binary_to_packet(HostType, Stored))}
             end,
    Exported = [Export(Id, Data) || #{message := Data, id := Id} <- Rows],
    {ok, Exported ++ Acc}.
455 |
|
|
456 |
|
%% @doc Fetch at most `IMax' rows matching `Filter'.
%%
%% Offset is not supported here. A limit of 0 returns `[]' without querying.
%% With `ReverseLimit = false' the `extract_messages_query' family is used and
%% rows are returned as read. With `ReverseLimit = true' the
%% `extract_messages_r_query' family is used and the rows are reversed before
%% being returned.
%%
%% Rows are maps; callers in this module read the `id', `nick_name' and
%% `message' keys from them.
%% @end
-spec extract_messages(PoolName, RoomJID, HostType, Filter, IMax, ReverseLimit) ->
    [Row] when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      IMax :: non_neg_integer(),
      ReverseLimit :: boolean(),
      Row :: mongoose_cassandra:row().
extract_messages(_Worker, _RoomJID, _HostType, _Filter, 0, _) ->
    [];
extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, false) ->
    QueryName = {extract_messages_query, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
    Rows;
extract_messages(PoolName, RoomJID, _HostType, Filter, IMax, true) ->
    QueryName = {extract_messages_r_query, select_filter(Filter)},
    Params = maps:put('[limit]', IMax, eval_filter_params(Filter)),
    {ok, Rows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, QueryName, Params),
    lists:reverse(Rows).
481 |
|
|
482 |
|
%% Runs `fetch_user_messages_query' with the given parameters and returns the
%% rows sorted in Erlang term order.
fetch_user_messages(PoolName, UserJID, FilterMap) ->
    {ok, Unsorted} = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE,
                                                 fetch_user_messages_query, FilterMap),
    lists:sort(Unsorted).
486 |
|
|
487 |
|
|
488 |
|
%% @doc Count the rows matching `Filter' whose id is lower than or equal to `MessID'.
%%
%% The upper bound is inclusive (see to_id/2), so the row with `MessID' itself
%% is counted when it exists.
%% @end
-spec calc_index(PoolName, RoomJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_index(PoolName, RoomJID, HostType, Filter, MessID) ->
    UpToFilter = to_id(MessID, Filter),
    calc_count(PoolName, RoomJID, HostType, UpToFilter).
503 |
|
|
504 |
|
%% @doc Count the rows matching `Filter' whose id is strictly lower than `MessID'.
%%
%% The row with `MessID' itself does not have to exist (it may already be deleted).
%% @end
-spec calc_before(PoolName, RoomJID, HostType, Filter, MessID) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      MessID :: message_id(),
      Count :: non_neg_integer().
calc_before(PoolName, RoomJID, HostType, Filter, MessID) ->
    BeforeFilter = before_id(MessID, Filter),
    calc_count(PoolName, RoomJID, HostType, BeforeFilter).
518 |
|
|
519 |
|
|
520 |
|
%% @doc Get the size of the result set selected by `Filter'.
%% Runs the `calc_count_query' variant matching the filter's id bounds and
%% expects exactly one row with a `count' key.
-spec calc_count(PoolName, RoomJID, HostType, Filter) -> Count
    when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      Count :: non_neg_integer().
calc_count(PoolName, RoomJID, _HostType, Filter) ->
    Variant = select_filter(Filter),
    QueryParams = eval_filter_params(Filter),
    {ok, [#{count := Total}]} =
        mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE,
                                    {calc_count_query, Variant}, QueryParams),
    Total.
535 |
|
|
536 |
|
%% @doc Convert a zero-based offset into the id of the row found at that offset.
%%
%% Offsets up to 100 are always resolved directly with calc_offset_to_start_id/4.
%% For larger offsets a previously saved hint (offset -> message id) is looked
%% up first, so that only the rows between the hint and the requested offset
%% have to be read. Returns `undefined' when calc_offset_to_start_id/4 does.
%%
%% A hint `{PrevOffset, PrevId}' means that the row at offset `PrevOffset' has
%% id `PrevId'. Because the hinted filter starts at `PrevId' inclusively, the
%% requested row sits at relative offset `Offset - PrevOffset' (and
%% calc_offset_to_start_id/4 itself reads one row more than the offset it is
%% given).
%%
%% NOTE(review): hints are looked up by room and nick only, and the hinted
%% filter replaces the original `start_id' - confirm this is intended for
%% filters that carry id bounds.
%% @end
-spec offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0,
                                                           Offset =< 100 ->
    calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset);
offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
    Params = maps:put(offset, Offset, eval_filter_params(Filter)),
    %% Try to find already calculated nearby offset to reduce query size
    case mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE, prev_offset_query, Params) of
        {ok, []} -> %% No hints, just calculate offset sloooowly
            StartId = calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset),
            maybe_save_offset_hint(PoolName, RoomJID, Filter, 0, Offset, StartId);
        {ok, [#{offset := PrevOffset, id := PrevId}]} ->
            %% Offset hint found, use it to reduce query size
            case Offset of
                PrevOffset -> PrevId;
                _ ->
                    StartId = calc_offset_to_start_id(PoolName, RoomJID,
                                                      Filter#mam_muc_ca_filter{start_id = PrevId},
                                                      Offset - PrevOffset),
                    maybe_save_offset_hint(PoolName, RoomJID, Filter, PrevOffset, Offset, StartId)
            end
    end.
566 |
|
|
567 |
|
%% @doc Saves an offset hint for future use in order to speed up queries with a similar offset.
%% The hint is saved only if the new offset is more than 50 entries away from
%% the hint offset that was used. Nothing is saved for an undefined start id.
%% This function returns the given StartId as a passthrough for convenience.
-spec maybe_save_offset_hint(PoolName :: mongoose_cassandra:pool_name(), RoomJID :: jid:jid(),
                             Filter :: filter(), HintOffset :: non_neg_integer(),
                             NewOffset :: non_neg_integer(),
                             StartId :: non_neg_integer() | undefined) ->
    StartId :: non_neg_integer() | undefined.
maybe_save_offset_hint(_PoolName, _UserJID, _Filter, _HintOffset, _NewOffset, undefined) ->
    undefined;
maybe_save_offset_hint(PoolName, RoomJID, Filter, HintOffset, NewOffset, StartId) ->
    Distance = abs(NewOffset - HintOffset),
    case Distance > 50 of
        false ->
            skip;
        true ->
            #mam_muc_ca_filter{room_jid = FRoomJID, with_nick = FWithNick} = Filter,
            Hint = #{room_jid => FRoomJID, with_nick => FWithNick,
                     offset => NewOffset, id => StartId},
            %% The result of the write is not inspected
            mongoose_cassandra:cql_write(PoolName, RoomJID, ?MODULE,
                                         insert_offset_hint_query, [Hint])
    end,
    StartId.
590 |
|
|
591 |
|
%% @doc Convert an offset into the id of the row found at that offset.
%% Reads at most `Offset + 1' ids with the `list_message_ids_query' variant
%% matching the filter and returns the id of the last row read.
%% Returns `undefined' only when no rows are returned at all; when fewer than
%% `Offset + 1' rows exist, the id of the last existing row is returned.
-spec calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) -> Id
    when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      Offset :: non_neg_integer(),
      Filter :: filter(),
      Id :: non_neg_integer() | undefined.
calc_offset_to_start_id(PoolName, RoomJID, Filter, Offset) when is_integer(Offset), Offset >= 0 ->
    Variant = select_filter(Filter),
    Limit = Offset + 1,
    QueryParams = maps:put('[limit]', Limit, eval_filter_params(Filter)),
    {ok, IdRows} = mongoose_cassandra:cql_read(PoolName, RoomJID, ?MODULE,
                                               {list_message_ids_query, Variant}, QueryParams),
    case IdRows of
        [] ->
            undefined;
        [_ | _] ->
            LastRow = lists:last(IdRows),
            maps:get(id, LastRow)
    end.
609 |
|
|
610 |
|
%% @doc CQL statement returning one offset -> message id 'hint' whose offset
%% does not exceed the requested one.
prev_offset_query_cql() ->
    "SELECT id, offset FROM mam_muc_message_offset "
    "WHERE room_jid = ? and with_nick = ? and offset <= ? LIMIT 1".
614 |
|
|
615 |
|
%% @doc CQL statement storing one offset -> message id 'hint'.
insert_offset_hint_query_cql() ->
    "INSERT INTO mam_muc_message_offset(room_jid, with_nick, id, offset) "
    "VALUES(?, ?, ?, ?)".
618 |
|
|
619 |
|
%% Builds the query filter for a room from the lookup parameters.
prepare_filter(RoomJID, Borders, Start, End, WithNick, MsgID) ->
    BRoomJID = mod_mam_utils:bare_jid(RoomJID),
    LowerFromTs = maybe_encode_compact_uuid(Start, 0),
    UpperFromTs = maybe_encode_compact_uuid(End, 255),
    %% In Cassandra, a column cannot be restricted by both an equality and an
    %% inequality relation. When MsgID is defined, it is therefore used as both
    %% the lower and the upper bound, which means that the `ids` filter
    %% effectively overrides any "before" or "after" filters.
    IdRange = case MsgID of
                  undefined ->
                      {apply_start_border(Borders, LowerFromTs),
                       apply_end_border(Borders, UpperFromTs)};
                  _ ->
                      {MsgID, MsgID}
              end,
    {LowerID, UpperID} = IdRange,
    BWithNick = maybe_nick(WithNick),
    prepare_filter_params(BRoomJID, BWithNick, LowerID, UpperID).
635 |
|
|
636 |
|
%% Packs the already prepared values into a filter record; `nick_name' is left unset.
prepare_filter_params(BRoomJID, BWithNick, StartID, EndID) ->
    #mam_muc_ca_filter{room_jid = BRoomJID, with_nick = BWithNick,
                       start_id = StartID, end_id = EndID}.
643 |
|
|
644 |
|
%% Turns a filter record into the parameter map of a prepared query.
%% `room_jid' and `with_nick' are always present; `start_id' and `end_id'
%% are included only when they are not `undefined'.
eval_filter_params(#mam_muc_ca_filter{room_jid = BRoomJID,
                                      with_nick = BWithNick,
                                      start_id = StartID,
                                      end_id = EndID}) ->
    Required = [{room_jid, BRoomJID}, {with_nick, BWithNick}],
    Bounds = [{Key, Value} || {Key, Value} <- [{start_id, StartID}, {end_id, EndID}],
                              Value =/= undefined],
    maps:from_list(Required ++ Bounds).
653 |
|
|
654 |
|
%% Names the query variant matching the id bounds carried by a filter record.
select_filter(#mam_muc_ca_filter{} = Filter) ->
    select_filter(Filter#mam_muc_ca_filter.start_id,
                  Filter#mam_muc_ca_filter.end_id).
659 |
|
|
660 |
|
|
661 |
|
%% Maps the presence of each id bound to the name of the query variant:
%% no bound -> all, upper only -> 'end', lower only -> start, both -> start_end.
-spec select_filter(StartID, EndID) -> all | 'end' | start | start_end when
      StartID :: integer() | undefined,
      EndID :: integer() | undefined.
select_filter(StartID, EndID) ->
    case {StartID =/= undefined, EndID =/= undefined} of
        {false, false} -> all;
        {false, true} -> 'end';
        {true, false} -> start;
        {true, true} -> start_end
    end.
673 |
|
|
674 |
|
%% Returns the CQL fragment restricting `id' for the bounds that are defined.
%% Only whether each bound is `undefined' matters; the values themselves are
%% bound later through the :start_id / :end_id named parameters.
prepare_filter_cql(StartID, EndID) ->
    LowerBound = [" AND id >= :start_id" || StartID =/= undefined],
    UpperBound = [" AND id <= :end_id" || EndID =/= undefined],
    lists:append(LowerBound ++ UpperBound).
683 |
|
|
684 |
|
%% Enumerates every combination of present/absent id bounds and pairs the
%% name of each combination with its CQL fragment. The value 0 only stands
%% for "some defined id"; the helpers look at nothing but `undefined'.
filter_to_cql() ->
    Combinations = [{undefined, undefined},
                    {undefined, 0},
                    {0, undefined},
                    {0, 0}],
    lists:map(fun({Lower, Upper}) ->
                      {select_filter(Lower, Upper), prepare_filter_cql(Lower, Upper)}
              end, Combinations).
689 |
|
|
690 |
|
%% Computes the zero-or-greater row offset of the page described by RSM.
-spec calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM) -> Offset
    when
      PoolName :: mongoose_cassandra:pool_name(),
      RoomJID :: jid:jid(),
      HostType :: host_type(),
      Filter :: filter(),
      PageSize :: non_neg_integer(),
      TotalCount :: non_neg_integer(),
      RSM :: jlib:rsm_in() | undefined,
      Offset :: non_neg_integer().
calc_offset(PoolName, RoomJID, HostType, Filter, PageSize, TotalCount, RSM) ->
    case RSM of
        %% Requesting the Last Page in a Result Set
        #rsm_in{direction = before, id = undefined} ->
            max(0, TotalCount - PageSize);
        %% Page ending just before a known message id.
        #rsm_in{direction = before, id = ID} when is_integer(ID) ->
            max(0, calc_before(PoolName, RoomJID, HostType, Filter, ID) - PageSize);
        %% Page starting after a known message id.
        #rsm_in{direction = aft, id = ID} when is_integer(ID) ->
            calc_index(PoolName, RoomJID, HostType, Filter, ID);
        %% Anything else starts from the first row.
        _ ->
            0
    end.
711 |
|
|
712 |
|
%% Encodes a timestamp/node-id pair into a compact message id, passing an
%% `undefined' timestamp through untouched.
maybe_encode_compact_uuid(Microseconds, NodeID) ->
    case Microseconds of
        undefined -> undefined;
        _ -> encode_compact_uuid(Microseconds, NodeID)
    end.
716 |
|
|
717 |
|
%% Normalises an optional nick: a binary is returned unchanged and
%% `undefined' becomes the empty binary.
-spec maybe_nick(binary() | undefined) -> binary().
maybe_nick(WithNick) when is_binary(WithNick) ->
    WithNick;
maybe_nick(undefined) ->
    <<>>.
721 |
|
|
722 |
|
%%====================================================================
%% Internal CQL part
%%====================================================================
725 |
|
|
726 |
|
%% One named ascending message-extraction query per id-bound combination.
extract_messages_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{extract_messages_query, FilterName}, extract_messages_cql(FilterCql)}
              end, filter_to_cql()).
729 |
|
|
730 |
|
%% One named descending message-extraction query per id-bound combination.
extract_messages_r_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{extract_messages_r_query, FilterName}, extract_messages_r_cql(FilterCql)}
              end, filter_to_cql()).
733 |
|
|
734 |
|
%% One named row-count query per id-bound combination.
calc_count_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{calc_count_query, FilterName}, calc_count_cql(FilterCql)}
              end, filter_to_cql()).
737 |
|
|
738 |
|
%% One named id-listing query per id-bound combination.
list_message_ids_queries() ->
    lists:map(fun({FilterName, FilterCql}) ->
                      {{list_message_ids_query, FilterName}, list_message_ids_cql(FilterCql)}
              end, filter_to_cql()).
741 |
|
|
742 |
|
%% CQL reading messages of one room/nick partition in ascending id order.
%% Filter is an extra WHERE fragment (possibly empty) inserted before ORDER BY.
extract_messages_cql(Filter) ->
    Select = "SELECT id, nick_name, message FROM mam_muc_message ",
    Where = "WHERE room_jid = ? AND with_nick = ? ",
    lists:append([Select, Where, Filter, " ORDER BY id LIMIT ?"]).
746 |
|
|
747 |
|
%% CQL reading messages of one room/nick partition in descending id order.
%% Filter is an extra WHERE fragment (possibly empty) inserted before ORDER BY.
extract_messages_r_cql(Filter) ->
    Select = "SELECT id, nick_name, message FROM mam_muc_message ",
    Where = "WHERE room_jid = ? AND with_nick = ? ",
    lists:append([Select, Where, Filter, " ORDER BY id DESC LIMIT ?"]).
751 |
|
|
752 |
|
%% CQL selecting the id and payload of every message stored for a sender JID.
fetch_user_messages_cql() ->
    %% No ORDER BY here: attempting to order the results fails with the error
    %% "ORDER BY with 2ndary indexes is not supported."
    "SELECT id, message FROM mam_muc_message WHERE from_jid = ?".
757 |
|
|
758 |
|
%% CQL counting the rows of one room/nick partition, narrowed by the extra
%% WHERE fragment in Filter (possibly empty).
calc_count_cql(Filter) ->
    Select = "SELECT COUNT(*) FROM mam_muc_message ",
    Where = "WHERE room_jid = ? AND with_nick = ? ",
    lists:append([Select, Where, Filter]).
761 |
|
|
762 |
|
%% CQL listing message ids of one room/nick partition in ascending order.
%% Filter is an extra WHERE fragment (possibly empty) inserted before ORDER BY.
list_message_ids_cql(Filter) ->
    Select = "SELECT id FROM mam_muc_message ",
    Where = "WHERE room_jid = ? AND with_nick = ? ",
    lists:append([Select, Where, Filter, " ORDER BY id LIMIT ?"]).
766 |
|
|
767 |
|
%% ---------------------------------------------------------------------- |
768 |
|
%% Optimizations |
769 |
|
|
770 |
|
%% Serialises Packet for storage using the message-format module configured
%% for HostType (the `db_message_format' module option).
packet_to_stored_binary(HostType, Packet) ->
    mam_message:encode(db_message_format(HostType), Packet).
774 |
|
|
775 |
|
%% Deserialises a stored binary using the message-format module configured
%% for HostType (the `db_message_format' module option).
stored_binary_to_packet(HostType, Bin) ->
    mam_message:decode(db_message_format(HostType), Bin).
779 |
|
|
780 |
|
%% ---------------------------------------------------------------------- |
781 |
|
%% Params getters |
782 |
|
|
783 |
|
%% Reads this module's `db_message_format' option for the given host type.
-spec db_message_format(host_type()) -> module().
db_message_format(HostType) ->
    OptionName = db_message_format,
    gen_mod:get_module_opt(HostType, ?MODULE, OptionName).
786 |
|
|
787 |
|
%% The host type is ignored: the `default' pool is always used.
%% The returned atom is a mongoose_wpool:pool_name().
-spec pool_name(host_type()) -> default.
pool_name(_) ->
    default.