./ct_report/coverage/mod_mam_riak_timed_arch_yz.COVER.html

1 %%==============================================================================
2 %% Copyright 2015 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 -module(mod_mam_riak_timed_arch_yz).
17
18 -behaviour(ejabberd_gen_mam_archive).
19 -behaviour(gen_mod).
20 -behaviour(mongoose_module_metrics).
21
22 -callback encode(term()) -> binary().
23 -callback decode(binary()) -> term().
24
25
26 -include("mongoose.hrl").
27 -include("jlib.hrl").
28 -include("mongoose_rsm.hrl").
29
30 %% API
31 -export([start/2,
32 stop/1,
33 archive_size/4,
34 lookup_messages/2,
35 remove_archive/4]).
36
37 -export([archive_message/3,
38 archive_message_muc/3,
39 lookup_messages/3,
40 lookup_messages_muc/3]).
41
42 -export([key/3]).
43
44 %% For tests only
45 -export([create_obj/6, read_archive/8, bucket/2,
46 list_mam_buckets/1, remove_bucket/1]).
47
48 -export([get_mam_muc_gdpr_data/3, get_mam_pm_gdpr_data/3]).
49
50 -ignore_xref([archive_message_muc/3, behaviour_info/1, bucket/2, create_obj/6,
51 key/3, list_mam_buckets/1, lookup_messages/2, lookup_messages_muc/3,
52 read_archive/8, remove_archive/4, remove_bucket/1]).
53
54 -type yearweeknum() :: {non_neg_integer(), 1..53}.
55
%% @doc Start the module on `Host'.
%%
%% Options (both read with a default of `false'):
%% - `pm'  enables the one-to-one chat archive hooks
%% - `muc' enables the multi-user chat archive hooks
%%
%% Set both `pm' and `muc' to archive private and MUC messages alike.
%% The value returned is the outcome of the `muc' branch.
start(Host, Opts) ->
    PmEnabled = gen_mod:get_module_opt(Host, ?MODULE, pm, false),
    case PmEnabled of
        false -> ok;
        true -> start_chat_archive(Host, Opts)
    end,
    MucEnabled = gen_mod:get_module_opt(Host, ?MODULE, muc, false),
    case MucEnabled of
        false -> ok;
        true -> start_muc_archive(Host, Opts)
    end.
76
%% Registers this module's one-to-one (PM) archive handlers on Host.
%% Handlers are added in list order; the result of the last
%% ejabberd_hooks:add/5 call is returned.
start_chat_archive(Host, _Opts) ->
    Handlers = [{mam_archive_message, archive_message},
                {mam_archive_size, archive_size},
                {mam_lookup_messages, lookup_messages},
                {mam_remove_archive, remove_archive},
                {get_mam_pm_gdpr_data, get_mam_pm_gdpr_data}],
    lists:foldl(fun({Hook, Function}, _Previous) ->
                        ejabberd_hooks:add(Hook, Host, ?MODULE, Function, 50)
                end, ok, Handlers).
83
%% Registers this module's MUC archive handlers on Host.
%% Handlers are added in list order; the result of the last
%% ejabberd_hooks:add/5 call is returned.
start_muc_archive(Host, _Opts) ->
    Handlers = [{mam_muc_archive_message, archive_message_muc},
                {mam_muc_archive_size, archive_size},
                {mam_muc_lookup_messages, lookup_messages_muc},
                {mam_muc_remove_archive, remove_archive},
                {get_mam_muc_gdpr_data, get_mam_muc_gdpr_data}],
    lists:foldl(fun({Hook, Function}, _Previous) ->
                        ejabberd_hooks:add(Hook, Host, ?MODULE, Function, 50)
                end, ok, Handlers).
90
%% Stops the module on Host: unregisters the PM hooks when the `pm' option
%% is true and the MUC hooks when the `muc' option is true (both options
%% default to false). Returns the outcome of the `muc' branch.
stop(Host) ->
    PmEnabled = gen_mod:get_module_opt(Host, ?MODULE, pm, false),
    case PmEnabled of
        false -> ok;
        true -> stop_chat_archive(Host)
    end,
    MucEnabled = gen_mod:get_module_opt(Host, ?MODULE, muc, false),
    case MucEnabled of
        false -> ok;
        true -> stop_muc_archive(Host)
    end.
104
%% Unregisters the one-to-one (PM) archive handlers from Host.
%%
%% The {Hook, Function} pairs must mirror exactly what start_chat_archive/2
%% registered; a mismatching function name would leave the original handler
%% installed after the module is stopped. Always returns ok.
stop_chat_archive(Host) ->
    Handlers = [{mam_archive_message, archive_message},
                {mam_archive_size, archive_size},
                {mam_lookup_messages, lookup_messages},
                {mam_remove_archive, remove_archive},
                {get_mam_pm_gdpr_data, get_mam_pm_gdpr_data}],
    lists:foreach(fun({Hook, Function}) ->
                          ejabberd_hooks:delete(Hook, Host, ?MODULE, Function, 50)
                  end, Handlers),
    ok.
112
%% Unregisters the MUC archive handlers from Host.
%%
%% The {Hook, Function} pairs must mirror exactly what start_muc_archive/2
%% registered; a mismatching function name would leave the original handler
%% installed after the module is stopped. Always returns ok.
stop_muc_archive(Host) ->
    Handlers = [{mam_muc_archive_message, archive_message_muc},
                {mam_muc_archive_size, archive_size},
                {mam_muc_lookup_messages, lookup_messages_muc},
                {mam_muc_remove_archive, remove_archive},
                {get_mam_muc_gdpr_data, get_mam_muc_gdpr_data}],
    lists:foreach(fun({Hook, Function}) ->
                          ejabberd_hooks:delete(Hook, Host, ?MODULE, Function, 50)
                  end, Handlers),
    ok.
120
%% Name of the search index used for archive queries on Host; taken from
%% the `search_index' module option, defaulting to <<"mam">>.
yz_search_index(Host) ->
    DefaultIndex = <<"mam">>,
    gen_mod:get_module_opt(Host, ?MODULE, search_index, DefaultIndex).
123
%% Riak bucket type used for archive objects on Host; taken from the
%% `bucket_type' module option, defaulting to <<"mam_yz">>.
mam_bucket_type(Host) ->
    DefaultType = <<"mam_yz">>,
    gen_mod:get_module_opt(Host, ?MODULE, bucket_type, DefaultType).
126
%% Hook handler storing a one-to-one message.
%%
%% local_jid  - archive owner's JID (also used as the message owner)
%% remote_jid - interlocutor's JID
%% source_jid - "real" sender JID
%%
%% Any exception raised while writing is logged together with the hook
%% parameters, counted in the modMamDropped metric and turned into
%% {error, Reason}.
archive_message(_Result, Host, #{packet := Packet,
                                 source_jid := SrcJID,
                                 remote_jid := RemJID,
                                 local_jid := LocJID,
                                 message_id := MessId} = Params) ->
    try
        archive_message(Host, MessId, LocJID, RemJID, SrcJID, LocJID, Packet, pm)
    catch
        Class:Reason:StackTrace ->
            ?LOG_WARNING(Params#{what => archive_message_failed,
                                 text => <<"Could not write message to archive">>,
                                 class => Class,
                                 reason => Reason,
                                 stacktrace => StackTrace}),
            mongoose_metrics:update(Host, modMamDropped, 1),
            {error, Reason}
    end.
145
%% Hook handler storing a MUC / MUC Light message.
%%
%% local_jid  - the room's JID
%% remote_jid - "real" sender JID (stored as the message owner)
%% source_jid - full JID of the user within the room (room@domain/user)
%%
%% The remote part of the key is derived from source_jid through
%% maybe_muc_jid/1. Any exception raised while writing is logged together
%% with the hook parameters, counted in the modMamDropped metric and turned
%% into {error, Reason}.
archive_message_muc(_Result, Host, #{packet := Packet,
                                     source_jid := SrcJID,
                                     remote_jid := FromJID,
                                     local_jid := LocJID,
                                     message_id := MessId} = Params) ->
    NickJID = maybe_muc_jid(SrcJID),
    try
        archive_message(Host, MessId, LocJID, NickJID, SrcJID, FromJID, Packet, muc)
    catch
        Class:Reason:StackTrace ->
            ?LOG_WARNING(Params#{what => archive_muc_message_failed,
                                 text => <<"Could not write MUC message to archive">>,
                                 class => Class,
                                 reason => Reason,
                                 stacktrace => StackTrace}),
            mongoose_metrics:update(Host, modMamDropped, 1),
            {error, Reason}
    end.
165
%% For a #jid{} record, returns a 3-tuple carrying only its lresource in the
%% middle position, with empty binaries around it; any other term is
%% returned unchanged.
maybe_muc_jid(JID) ->
    case JID of
        #jid{lresource = Nick} -> {<<>>, Nick, <<>>};
        _ -> JID
    end.
170
171
%% Hook handler for message lookup. An {error, _} accumulator coming from an
%% earlier handler is passed through untouched; otherwise the lookup is run
%% and any exception is converted to {error, {Reason, {stacktrace, S}}}.
lookup_messages({error, _} = Failure, _Host, _Params) ->
    Failure;
lookup_messages(_Result, Host, Params) ->
    try lookup_messages(Host, Params) of
        Found -> Found
    catch
        _Class:Reason:Stack ->
            {error, {Reason, {stacktrace, Stack}}}
    end.
180
181
%% MUC variant of the lookup hook handler: rewrites with_jid through
%% maybe_muc_jid/1 and then delegates to lookup_messages/3.
lookup_messages_muc(Result, Host, #{with_jid := WithJID} = Params) ->
    MucParams = Params#{with_jid := maybe_muc_jid(WithJID)},
    lookup_messages(Result, Host, MucParams).
185
186
%% Hook handler returning the number of messages stored for ArchiveJID.
%% Runs a one-row search with no remote JID, no id borders and no search
%% text, and returns only the total count reported for that search.
archive_size(_Size, Host, _ArchiveID, ArchiveJID) ->
    Owner = mod_mam_utils:bare_jid(ArchiveJID),
    {FirstId, LastId} =
        mod_mam_utils:calculate_msg_id_borders(undefined, undefined, undefined),
    {Total, _Rows} = read_archive(Host, Owner, undefined,
                                  FirstId, LastId, undefined,
                                  [{rows, 1}], fun get_msg_id_key/3),
    Total.
197
%% Selects the bucket for a given message id, calendar date or
%% {Year, WeekNumber} pair. The bucket name is "mam_<Year>_<Week>" and the
%% bucket type comes from mam_bucket_type/1. Any other second argument
%% yields `undefined'.

-spec bucket(jid:server(), calendar:date() | yearweeknum() | integer()) ->
    {binary(), binary()} | undefined.
bucket(Host, MsgId) when is_integer(MsgId) ->
    %% the message id embeds a microsecond timestamp
    {Micros, _} = mod_mam_utils:decode_compact_uuid(MsgId),
    {Date, _Time} = calendar:system_time_to_universal_time(Micros, microsecond),
    bucket(Host, Date);
bucket(Host, {_, _, _} = Date) ->
    YearWeek = calendar:iso_week_number(Date),
    bucket(Host, YearWeek);
bucket(Host, {Year, Week}) ->
    Name = iolist_to_binary([<<"mam_">>, integer_to_binary(Year),
                             <<"_">>, integer_to_binary(Week)]),
    {mam_bucket_type(Host), Name};
bucket(_Host, _Other) ->
    undefined.
214
%% Lists every bucket of the configured MAM bucket type as {Type, Bucket}
%% pairs. Crashes with badmatch if the bucket listing does not return
%% {ok, Buckets}.
list_mam_buckets(Host) ->
    BucketType = mam_bucket_type(Host),
    Worker = mongoose_riak:get_worker(),
    {ok, Names} = riakc_pb_socket:list_buckets(Worker, BucketType),
    lists:map(fun(Name) -> {BucketType, Name} end, Names).
219
220
%% Deletes every key listed for Bucket and returns the list of individual
%% delete results. Crashes with badmatch if listing the keys fails.
remove_bucket(Bucket) ->
    {ok, AllKeys} = mongoose_riak:list_keys(Bucket),
    lists:map(fun(K) -> mongoose_riak:delete(Bucket, K) end, AllKeys).
224
225
%% Writes a single message to the archive.
%%
%% PM:
%%  * LocJID   - archive owner's JID
%%  * RemJID   - interlocutor's JID
%%  * SrcJID   - "real" sender JID
%%  * OwnerJID - same as LocJID
%% MUC / MUC Light:
%%  * LocJID   - the room's JID
%%  * RemJID   - nickname part of the destination JID
%%  * SrcJID   - full JID of the user within the room (room@domain/user)
%%  * OwnerJID - "real" sender JID (not room specific)
%%
%% Returns ok on success; any other result of the update is thrown.
archive_message(Host, MessID, LocJID, RemJID, SrcJID, OwnerJID, Packet, Type) ->
    Local = mod_mam_utils:bare_jid(LocJID),
    Remote = mod_mam_utils:bare_jid(RemJID),
    Source = mod_mam_utils:full_jid(SrcJID),
    Owner = mod_mam_utils:bare_jid(OwnerJID),
    MsgIdBin = integer_to_binary(MessID),
    ObjectKey = key(Local, Remote, MsgIdBin),
    %% the bucket is chosen from the timestamp embedded in the numeric id
    TargetBucket = bucket(Host, MessID),
    Map = create_obj(Host, MsgIdBin, Source, Owner, Packet, Type),
    UpdateOp = riakc_map:to_op(Map),
    case mongoose_riak:update_type(TargetBucket, ObjectKey, UpdateOp) of
        ok ->
            ok;
        Failure ->
            throw(Failure)
    end.
251
%% Builds the map stored for one archived message. It carries six
%% registers: msg_id, source_jid, msg_owner_jid, mam_type, packet (encoded
%% with the configured codec) and search_text (the searchable body).
%% Type must be `pm' or `muc'.
create_obj(Host, MsgId, SourceJID, BareOwnerJID, Packet, Type) ->
    ModMAM = case Type of
                 pm -> mod_mam;
                 muc -> mod_mam_muc
             end,
    BodyChars = mod_mam_utils:packet_to_search_body(ModMAM, Host, Packet),
    BodyValue = unicode:characters_to_binary(BodyChars),
    %% Each value is produced lazily, only when the register op is applied.
    Register = fun(Name, ValueFun) ->
                       {{Name, register},
                        fun(R) -> riakc_register:set(ValueFun(), R) end}
               end,
    Ops = [Register(<<"msg_id">>, fun() -> MsgId end),
           Register(<<"source_jid">>, fun() -> SourceJID end),
           Register(<<"msg_owner_jid">>, fun() -> BareOwnerJID end),
           Register(<<"mam_type">>, fun() -> atom_to_binary(Type, latin1) end),
           Register(<<"packet">>, fun() -> packet_to_stored_binary(Host, Packet) end),
           Register(<<"search_text">>, fun() -> BodyValue end)],
    mongoose_riak:create_new_map(Ops).
276
%% Lookup entry point. Queries paging `before' or `aft' a concrete message
%% id go through lookup_message_page/3; everything else is a plain lookup.
lookup_messages(Host, #{rsm := #rsm_in{direction = Direction, id = ID} = RSM} = Params)
  when ID =/= undefined, (Direction =:= before orelse Direction =:= aft) ->
    lookup_message_page(Host, RSM, Params);
lookup_messages(Host, Params) ->
    do_lookup_messages(Host, Params).
285
%% Fetches one row more than the requested page size and hands the result,
%% together with the original page size, to
%% mod_mam_utils:check_for_item_not_found/3.
lookup_message_page(Host, RSM, Params) ->
    Requested = maps:get(page_size, Params),
    Widened = maps:update(page_size, Requested + 1, Params),
    {ok, Found} = do_lookup_messages(Host, Widened),
    mod_mam_utils:check_for_item_not_found(RSM, Requested, Found).
290
%% Runs the archive search described by Params and returns
%% {ok, {TotalCount, Offset, Messages}}.
%%
%% When is_simple is true, the count and offset are not computed and are
%% returned as `undefined'. Otherwise a second one-row search, ignoring the
%% RSM borders, provides the total count, and calculate_offset/4 provides
%% the offset.
do_lookup_messages(Host, Params) ->
    Owner = mod_mam_utils:bare_jid(maps:get(owner_jid, Params)),
    Remote = mod_mam_utils:bare_jid(maps:get(with_jid, Params)),
    RSM = maps:get(rsm, Params),
    PageOpts = [{rows, maps:get(page_size, Params)}],
    SearchOpts = add_offset(RSM, add_sorting(RSM, PageOpts)),
    Collect = fun get_msg_id_key/3,
    Borders = maps:get(borders, Params),
    StartTs = maps:get(start_ts, Params),
    EndTs = maps:get(end_ts, Params),
    Text = maps:get(search_text, Params),
    {FromId, ToId} =
        mod_mam_utils:calculate_msg_id_borders(RSM, Borders, StartTs, EndTs),
    {PageQueryCount, Found} =
        read_archive(Host, Owner, Remote, FromId, ToId, Text, SearchOpts, Collect),
    Sorted = sort_messages(Found),
    case maps:get(is_simple, Params) of
        true ->
            {ok, {undefined, undefined, get_messages(Host, Sorted)}};
        _ ->
            {FromIdNoRSM, ToIdNoRSM} =
                mod_mam_utils:calculate_msg_id_borders(Borders, StartTs, EndTs),
            {Total, _} =
                read_archive(Host, Owner, Remote, FromIdNoRSM, ToIdNoRSM, Text,
                             [{rows, 1}], Collect),
            OffsetArgs = {Host, Owner, Remote, FromIdNoRSM, Text},
            Offset = calculate_offset(RSM, PageQueryCount, length(Sorted), OffsetArgs),
            Messages = get_messages(Host, Sorted),
            {ok, {Total, Offset, Messages}}
    end.
326
327
%% Prepends the sort option: descending by message id when paging `before',
%% ascending in every other case.
add_sorting(RSM, Opts) ->
    Order = case RSM of
                #rsm_in{direction = before} -> <<"msg_id_register desc">>;
                _ -> <<"msg_id_register asc">>
            end,
    [{sort, Order} | Opts].
332
%% Prepends a {start, Index} option when the RSM carries an integer index;
%% otherwise the options are returned untouched.
add_offset(RSM, Opts) ->
    case RSM of
        #rsm_in{index = Index} when is_integer(Index) ->
            [{start, Index} | Opts];
        _ ->
            Opts
    end.
337
%% Computes the offset of the returned page within the full result set.
%%  - paging `before': matches counted by the page query minus the page
%%    length;
%%  - paging `aft' a given id: number of matches between the start border
%%    and that id, obtained with an extra one-row search;
%%  - index-based paging (no direction): the requested index itself;
%%  - anything else: 0.
calculate_offset(#rsm_in{direction = before}, Matched, PageLen, _Args) ->
    Matched - PageLen;
calculate_offset(#rsm_in{direction = aft, id = AfterId}, _Matched, _PageLen,
                 {Host, Owner, Remote, FirstId, SearchText})
  when AfterId /= undefined ->
    {Preceding, _Rows} = read_archive(Host, Owner, Remote,
                                      FirstId, AfterId, SearchText,
                                      [{rows, 1}], fun get_msg_id_key/3),
    Preceding;
calculate_offset(#rsm_in{direction = undefined, index = Position}, _Matched, _PageLen, _Args)
  when is_integer(Position) ->
    Position;
calculate_offset(_RSM, _Matched, _PageLen, _Args) ->
    0.
352
%% Fold callback: prepends {MsgId, Bucket, Key} to the accumulator, where
%% MsgId is the integer parsed from the third "/"-separated part of Key.
%% Crashes unless the key splits into exactly three parts.
get_msg_id_key(Bucket, Key, Msgs) ->
    [_Local, _Remote, MsgIdBin] = decode_key(Key),
    [{binary_to_integer(MsgIdBin), Bucket, Key} | Msgs].
357
%% Fetches the message for every {MsgId, Bucket, Key} entry, in order.
%% Entries that could not be fetched come back as [] from get_message2/4
%% and disappear when the result is flattened.
get_messages(Host, BucketKeys) ->
    Fetched = lists:map(fun({MsgId, Bucket, Key}) ->
                                get_message2(Host, MsgId, Bucket, Key)
                        end, BucketKeys),
    lists:flatten(Fetched).
360
%% Fetches one stored message and returns #{id, jid, packet}, with the
%% packet decoded by the configured codec and the jid parsed from the
%% source_jid register. Any fetch result other than {ok, Map} yields [].
get_message2(Host, MsgId, Bucket, Key) ->
    case mongoose_riak:fetch_type(Bucket, Key) of
        {ok, Stored} ->
            SenderBin = riakc_map:fetch({<<"source_jid">>, register}, Stored),
            Encoded = riakc_map:fetch({<<"packet">>, register}, Stored),
            Decoded = stored_binary_to_packet(Host, Encoded),
            #{id => MsgId,
              jid => jid:from_binary(SenderBin),
              packet => Decoded};
        _ ->
            []
    end.
%% GDPR hook handler: prepends {Id, SenderJidBinary, PacketBinary} for every
%% one-to-one message owned by OwnerJid to the accumulator.
-spec get_mam_pm_gdpr_data(ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
                           mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_pm_gdpr_data().
get_mam_pm_gdpr_data(Acc, _HostType, OwnerJid) ->
    Rows = [{Id, jid:to_binary(Jid), exml:to_binary(Packet)}
            || #{id := Id, jid := Jid, packet := Packet}
                   <- get_mam_gdpr_data(OwnerJid, <<"pm">>)],
    Rows ++ Acc.
377
%% GDPR hook handler: prepends {MsgId, PacketBinary} for every MUC message
%% owned by JID to the accumulator.
-spec get_mam_muc_gdpr_data(ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
                            mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_muc_gdpr_data().
get_mam_muc_gdpr_data(Acc, _HostType, JID) ->
    Rows = [{MsgId, exml:to_binary(Packet)}
            || #{id := MsgId, packet := Packet}
                   <- get_mam_gdpr_data(JID, <<"muc">>)],
    Rows ++ Acc.
384
%% Returns all stored messages whose msg_owner_jid register equals the
%% lower-cased JID and whose mam_type register equals Type (<<"pm">> or
%% <<"muc">>). The JID's lserver is used as the host for the search and for
%% decoding. No search options are passed.
get_mam_gdpr_data(#jid{lserver = LServer} = BareJid, Type) ->
    OwnerBin = jid:to_binary(jid:to_lower(BareJid)),
    OwnerClause = <<"msg_owner_jid_register:", OwnerBin/binary>>,
    TypeClause = <<"mam_type_register:", Type/binary>>,
    Query = <<OwnerClause/binary, " AND ", TypeClause/binary>>,
    {ok, _Count, _Iterated, Found} =
        fold_archive(LServer, fun get_msg_id_key/3, Query, [], []),
    get_messages(LServer, Found).
391
%% Hook handler: removes the archive of ArchiveJID and passes the hook
%% accumulator through unchanged.
remove_archive(Acc, Host, _ArchiveID, ArchiveJID) ->
    _ = remove_archive(Host, ArchiveJID),
    Acc.
395
%% Removes the archive of ArchiveJID chunk by chunk. After the first chunk,
%% do_remove_archive/4 is started with a budget of 100. If it reports
%% `stopped', a warning with the processed and total counts is logged.
%% Returns ok in both cases.
remove_archive(Host, ArchiveJID) ->
    {ok, TotalCount, _, _} = FirstChunk = remove_chunk(Host, ArchiveJID, 0),
    case do_remove_archive(100, FirstChunk, Host, ArchiveJID) of
        {ok, _} ->
            ok;
        {stopped, N} ->
            ?LOG_WARNING(#{what => remove_archive_failed,
                           text => <<"archive removal stopped for jid after "
                                     "processing {processed_count} items out of {total_count}">>,
                           archive_jid => ArchiveJID,
                           processed_count => N, total_count => TotalCount}),
            ok
    end.
410
%% Deletes up to 50 of the oldest messages (ascending message id) stored for
%% ArchiveJID. Acc counts deleted keys and is incremented by
%% delete_key_fun/3. Returns whatever fold_archive/5 returns.
remove_chunk(Host, ArchiveJID, Acc) ->
    Owner = mod_mam_utils:bare_jid(ArchiveJID),
    Query = key_filters(Owner),
    ChunkOpts = [{rows, 50}, {sort, <<"msg_id_register asc">>}],
    fold_archive(Host, fun delete_key_fun/3, Query, ChunkOpts, Acc).
417
%% Keeps removing chunks until the search reports no remaining results or
%% the iteration budget N is used up.
%%
%% Returns {ok, Deleted} when the last search found nothing left, or
%% {stopped, Deleted} when the budget ran out with results still present.
%% The "nothing left" clause is checked first, so that a removal finishing
%% exactly on the last allowed iteration is reported as a success rather
%% than as stopped.
do_remove_archive(_, {ok, 0, _, Acc}, _, _) ->
    {ok, Acc};
do_remove_archive(0, {ok, _, _, Acc}, _, _) ->
    {stopped, Acc};
do_remove_archive(N, {ok, _TotalResults, _RowsIterated, Acc}, Host, ArchiveJID) ->
    timer:sleep(1000), % give Riak some time to clear after just removed keys
    R = remove_chunk(Host, ArchiveJID, Acc),
    do_remove_archive(N - 1, R, Host, ArchiveJID).
426
%% Fold callback: deletes Key from Bucket with the {dw, 2} option, asserting
%% the delete returned ok, and returns the incremented counter.
delete_key_fun(Bucket, Key, N) ->
    DeleteOpts = [{dw, 2}],
    ok = mongoose_riak:delete(Bucket, Key, DeleteOpts),
    N + 1.
430
431
%% Builds the object key "<LocalJID>/<RemoteJID>/<MsgId>" from three
%% binaries.
key(LocalJID, RemoteJID, MsgId) ->
    <<LocalJID/binary, "/", RemoteJID/binary, "/", MsgId/binary>>.
434
%% Splits an object key on every "/" and returns the list of parts.
decode_key(KeyBinary) ->
    Separator = <<"/">>,
    binary:split(KeyBinary, Separator, [global]).
437
%% Builds the search query from the owner/remote JIDs, id borders and search
%% text, folds Fun over the matching entries starting from an empty list,
%% and returns {TotalCount, Accumulated}.
%% NOTE(review): an {error, _} result from fold_archive/5 fails the match
%% below instead of being returned, although the spec lists it.
-spec read_archive(jid:server(),
                   binary() | undefined,
                   binary() | undefined,
                   term(),
                   term(),
                   binary() | undefined,
                   [term()],
                   fun()) ->
    {integer(), list()} | {error, term()}.
read_archive(Host, OwnerJID, WithJID, Start, End, SearchText, SearchOpts, Fun) ->
    Query = key_filters(OwnerJID, WithJID, Start, End, SearchText),
    {ok, Total, _Iterated, Collected} =
        fold_archive(Host, Fun, Query, SearchOpts, []),
    {Total, Collected}.
451
452
%% Sorts {MsgId, Bucket, Key} triples by ascending MsgId.
sort_messages(Msgs) ->
    lists:sort(fun({IdA, _, _}, {IdB, _, _}) -> not (IdA > IdB) end, Msgs).
458
%% Runs Query against the host's search index and folds Fun over the
%% returned entries. Returns {ok, TotalCount, RowsIterated, Acc}, or logs a
%% warning and returns the {error, Reason} tuple when the search fails.
fold_archive(Host, Fun, Query, SearchOpts, InitialAcc) ->
    Index = yz_search_index(Host),
    case mongoose_riak:search(Index, Query, SearchOpts) of
        {ok, {search_results, Rows, _Score, Count}} ->
            %% an empty Rows list gives 0 iterated rows and leaves the
            %% accumulator untouched
            {ok, Count, length(Rows), do_fold_archive(Fun, Rows, InitialAcc)};
        {error, Reason} = Failure ->
            ?LOG_WARNING(#{what => mam_read_error,
                           cql_query => Query, reason => Reason}),
            Failure
    end.
471
%% Folds Fun over search result entries. For each {_Index, Props} entry the
%% _yz_rb (bucket), _yz_rt (bucket type) and _yz_rk (key) properties are
%% looked up and Fun({Type, Bucket}, Key, Acc) is called. A missing property
%% crashes with badmatch.
do_fold_archive(Fun, BucketKeys, InitialAcc) ->
    Prop = fun(Name, Props) ->
                   {_, Value} = lists:keyfind(Name, 1, Props),
                   Value
           end,
    Step = fun({_Index, Props}, Acc) ->
                   Bucket = Prop(<<"_yz_rb">>, Props),
                   Type = Prop(<<"_yz_rt">>, Props),
                   Key = Prop(<<"_yz_rk">>, Props),
                   Fun({Type, Bucket}, Key, Acc)
           end,
    lists:foldl(Step, InitialAcc, BucketKeys).
479
%% Filter API

%% Query matching every archived message of LocalJid: no remote JID, no id
%% range and no search text.
key_filters(LocalJid) ->
    NoFilter = undefined,
    key_filters(LocalJid, NoFilter, NoFilter, NoFilter, NoFilter).
483
%% Builds the search query by joining the JID, id-range and search-text
%% clauses with " AND ". Clauses that are not binaries (i.e. `undefined')
%% are left out.
key_filters(LocalJid, RemoteJid, Start, End, SearchText) ->
    Candidates = [jid_filters(LocalJid, RemoteJid),
                  id_filters(Start, End),
                  search_text_filter(SearchText)],
    Present = lists:filter(fun is_binary/1, Candidates),
    iolist_to_binary(lists:join(<<" AND ">>, Present)).
494
%% Filter helpers

%% Builds the full-text clause of the search query.
%% Returns `undefined' when no search text was given, so that the caller
%% can leave the clause out. Otherwise the text is normalised by
%% mod_mam_utils:normalize_search_text/2, using a separator that turns each
%% token into its own fuzzy ("~1") search_text_register term joined by AND.
-spec search_text_filter(binary() | undefined) -> binary() | undefined.
search_text_filter(undefined) ->
    undefined;
search_text_filter(SearchText) ->
    Separator = <<"~1 AND search_text_register:">>,
    NormText = mod_mam_utils:normalize_search_text(SearchText, Separator),
    %% Fuzzy search on tokens from search phrase
    <<"search_text_register:", NormText/binary, "~1">>.
504
%% Key clause "_yz_rk:<Local>/<Remote>/*". With no remote JID given, the
%% remote part becomes the wildcard "*".
jid_filters(LocalJid, RemoteJid) ->
    RemotePart = case RemoteJid of
                     undefined -> <<"*">>;
                     _ -> RemoteJid
                 end,
    <<"_yz_rk:", LocalJid/binary, "/", RemotePart/binary, "/*">>.
509
%% Message-id clause of the search query:
%%  - no borders at all: `undefined' (no clause);
%%  - identical borders: exact match on that id;
%%  - otherwise a range, with "*" standing in for a missing border.
id_filters(undefined, undefined) ->
    undefined;
id_filters(Same, Same) ->
    <<"msg_id_register:", (integer_to_binary(Same))/binary>>;
id_filters(From, To) ->
    Bound = fun(undefined) -> <<"*">>;
               (Id) -> integer_to_binary(Id)
            end,
    solr_id_filters(Bound(From), Bound(To)).
521
%% Range clause "msg_id_register:[<Start> TO <End> ]" built from two
%% binaries (note the space before the closing bracket).
solr_id_filters(Start, End) ->
    Range = <<Start/binary, " TO ", End/binary>>,
    <<"msg_id_register:[", Range/binary, " ]">>.
524
525 %% ----------------------------------------------------------------------
526 %% Optimizations
527
%% Encodes Packet with the codec module configured for Host.
packet_to_stored_binary(Host, Packet) ->
    mam_message:encode(db_message_codec(Host), Packet).
531
%% Decodes a stored binary with the codec module configured for Host.
stored_binary_to_packet(Host, Bin) ->
    mam_message:decode(db_message_codec(Host), Bin).
535
%% Codec module for stored packets; taken from the `db_message_format'
%% module option, defaulting to mam_message_xml.
-spec db_message_codec(Host :: jid:server()) -> module().
db_message_codec(Host) ->
    DefaultCodec = mam_message_xml,
    gen_mod:get_module_opt(Host, ?MODULE, db_message_format, DefaultCodec).
Line Hits Source