./ct_report/coverage/mod_mam_riak_timed_arch_yz.COVER.html

1 %%==============================================================================
2 %% Copyright 2015 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 -module(mod_mam_riak_timed_arch_yz).
17
18 -behaviour(ejabberd_gen_mam_archive).
19 -behaviour(gen_mod).
20 -behaviour(mongoose_module_metrics).
21
22 -callback encode(term()) -> binary().
23 -callback decode(binary()) -> term().
24
25
26 -include("mongoose.hrl").
27 -include("jlib.hrl").
28 -include("mongoose_rsm.hrl").
29
30 %% API
31 -export([start/2,
32 stop/1,
33 archive_size/4,
34 lookup_messages/2,
35 remove_archive/4]).
36
37 -export([archive_message/3,
38 archive_message_muc/3,
39 lookup_messages/3,
40 lookup_messages_muc/3]).
41
42 -export([key/3]).
43
44 %% For tests only
45 -export([create_obj/6, read_archive/8, bucket/2,
46 list_mam_buckets/1, remove_bucket/1]).
47
48 -export([get_mam_muc_gdpr_data/3, get_mam_pm_gdpr_data/3]).
49
50 -ignore_xref([archive_message_muc/3, behaviour_info/1, bucket/2, create_obj/6,
51 key/3, list_mam_buckets/1, lookup_messages/2, lookup_messages_muc/3,
52 read_archive/8, remove_archive/4, remove_bucket/1]).
53
54 -type yearweeknum() :: {non_neg_integer(), 1..53}.
55
%% @doc Start the module by registering its hook handlers.
%%
%% Options:
%% - `pm' enables the handlers for one-to-one chat archives
%% - `muc' enables the handlers for multichat archives
%%
%% Set both `pm' and `muc' to archive private as well as MUC messages.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(HostType, Opts) ->
    Handlers = hooks(HostType, Opts),
    ejabberd_hooks:add(Handlers).
66
%% @doc Stop the module: remove the hook handlers derived from the options
%% the module is currently loaded with for `HostType'.
-spec stop(mongooseim:host_type()) -> ok.
stop(HostType) ->
    LoadedOpts = gen_mod:get_loaded_module_opts(HostType, ?MODULE),
    Handlers = hooks(HostType, LoadedOpts),
    ejabberd_hooks:delete(Handlers).
71
%% All hook handler tuples for this host type: the `pm' handlers followed
%% by the `muc' handlers (each list is empty when its option is not enabled).
hooks(HostType, Opts) ->
    hooks(HostType, pm, Opts) ++ hooks(HostType, muc, Opts).
74
%% Hook handler tuples `{HookName, HostType, Module, Function, Priority}'
%% for one archive kind. A kind whose option is not `true' yields `[]'.
hooks(HostType, pm, #{pm := true}) ->
    [{Hook, HostType, ?MODULE, Handler, 50}
     || {Hook, Handler} <- [{mam_archive_message, archive_message},
                            {mam_archive_size, archive_size},
                            {mam_lookup_messages, lookup_messages},
                            {mam_remove_archive, remove_archive},
                            {get_mam_pm_gdpr_data, get_mam_pm_gdpr_data}]];
hooks(HostType, muc, #{muc := true}) ->
    [{Hook, HostType, ?MODULE, Handler, 50}
     || {Hook, Handler} <- [{mam_muc_archive_message, archive_message_muc},
                            {mam_muc_archive_size, archive_size},
                            {mam_muc_lookup_messages, lookup_messages_muc},
                            {mam_muc_remove_archive, remove_archive},
                            {get_mam_muc_gdpr_data, get_mam_muc_gdpr_data}]];
hooks(_HostType, _Kind, _Opts) ->
    [].
89
%% Value of this module's `riak' -> `search_index' option for Host.
yz_search_index(Host) ->
    OptPath = [riak, search_index],
    gen_mod:get_module_opt(Host, ?MODULE, OptPath).
92
%% Value of this module's `riak' -> `bucket_type' option for Host.
mam_bucket_type(Host) ->
    OptPath = [riak, bucket_type],
    gen_mod:get_module_opt(Host, ?MODULE, OptPath).
95
%% Hook handler storing a one-to-one (PM) message.
%%
%% LocJID - archive owner's JID
%% RemJID - interlocutor's JID
%% SrcJID - "Real" sender JID
%%
%% Returns the result of archive_message/8. Any exception is logged as a
%% warning, counted in the `modMamDropped' metric and converted into
%% `{error, Reason}'.
archive_message(_Result, Host, #{message_id := MessId,
                                 local_jid := LocJID,
                                 remote_jid := RemJID,
                                 source_jid := SrcJID,
                                 packet := Packet} = Params) ->
    try
        %% For PM archives the local JID is also passed as the owner JID.
        archive_message(Host, MessId, LocJID, RemJID, SrcJID, LocJID, Packet, pm)
    catch
        ErrClass:ErrReason:Stack ->
            ?LOG_WARNING(Params#{what => archive_message_failed,
                                 text => <<"Could not write message to archive">>,
                                 class => ErrClass,
                                 reason => ErrReason,
                                 stacktrace => Stack}),
            mongoose_metrics:update(Host, modMamDropped, 1),
            {error, ErrReason}
    end.
114
%% Hook handler storing a MUC / MUC Light message.
%%
%% LocJID - MUC/MUC Light room's JID
%% FromJID - "Real" sender JID
%% SrcJID - Full JID of user within room (room@domain/user)
%%
%% Returns the result of archive_message/8. Any exception raised while
%% archiving is logged as a warning, counted in the `modMamDropped' metric
%% and converted into `{error, Reason}'.
archive_message_muc(_Result, Host, #{message_id := MessId,
                                     local_jid := LocJID,
                                     remote_jid := FromJID,
                                     source_jid := SrcJID,
                                     packet := Packet} = Params) ->
    %% Computed outside the try: a failure here is not caught below.
    RoomRemote = maybe_muc_jid(SrcJID),
    try
        archive_message(Host, MessId, LocJID, RoomRemote, SrcJID, FromJID, Packet, muc)
    catch
        ErrClass:ErrReason:Stack ->
            ?LOG_WARNING(Params#{what => archive_muc_message_failed,
                                 text => <<"Could not write MUC message to archive">>,
                                 class => ErrClass,
                                 reason => ErrReason,
                                 stacktrace => Stack}),
            mongoose_metrics:update(Host, modMamDropped, 1),
            {error, ErrReason}
    end.
134
%% For a #jid{} record, build a 3-tuple holding only its lowercase resource
%% in the middle position (first and last elements are empty binaries).
%% Any other term is returned untouched.
maybe_muc_jid(#jid{} = JID) ->
    {<<>>, JID#jid.lresource, <<>>};
maybe_muc_jid(NotAJidRecord) ->
    NotAJidRecord.
139
140
%% Hook handler for message lookup.
%% An error accumulated by an earlier handler is passed through unchanged;
%% otherwise lookup_messages/2 is run and any exception it raises is
%% converted into `{error, {Reason, {stacktrace, Stacktrace}}}'.
lookup_messages({error, _} = PreviousError, _Host, _Params) ->
    PreviousError;
lookup_messages(_Result, Host, Params) ->
    try lookup_messages(Host, Params) of
        LookupResult ->
            LookupResult
    catch
        _Class:Reason:Stacktrace ->
            {error, {Reason, {stacktrace, Stacktrace}}}
    end.
149
150
%% MUC variant of the lookup hook handler: rewrites `with_jid' through
%% maybe_muc_jid/1 and then delegates to lookup_messages/3.
lookup_messages_muc(Result, Host, #{with_jid := WithJID} = Params) ->
    MucParams = Params#{with_jid := maybe_muc_jid(WithJID)},
    lookup_messages(Result, Host, MucParams).
154
155
%% Hook handler returning the count that read_archive/8 reports for the
%% whole archive of ArchiveJID (no remote JID, id range or text filter).
archive_size(_Size, Host, _ArchiveID, ArchiveJID) ->
    Owner = mod_mam_utils:bare_jid(ArchiveJID),
    {FirstId, LastId} =
        mod_mam_utils:calculate_msg_id_borders(undefined, undefined, undefined),
    %% One row is requested because only the count is used, not the rows.
    {Count, _Rows} = read_archive(Host, Owner, undefined,
                                  FirstId, LastId, undefined,
                                  [{rows, 1}], fun get_msg_id_key/3),
    Count.
166
167 %% use correct bucket for given date
168
%% Resolve the `{BucketType, BucketName}' pair for a message.
%%
%% The second argument may be:
%%  - an integer message id: decoded to a timestamp, then to a date;
%%  - a `{Y, M, D}' date: converted to its ISO week number;
%%  - a `{Year, Week}' pair: rendered as `mam_<Year>_<Week>'.
%% Anything else yields `undefined'.
-spec bucket(jid:server(), calendar:date() | yearweeknum() | integer()) ->
    {binary(), binary()} | undefined.
bucket(Host, MsgId) when is_integer(MsgId) ->
    {MicroSec, _} = mod_mam_utils:decode_compact_uuid(MsgId),
    {Date, _Time} = calendar:system_time_to_universal_time(MicroSec, microsecond),
    bucket(Host, Date);
bucket(Host, {_, _, _} = Date) ->
    YearWeek = calendar:iso_week_number(Date),
    bucket(Host, YearWeek);
bucket(Host, {Year, Week}) ->
    Name = iolist_to_binary([<<"mam_">>, integer_to_binary(Year),
                             <<"_">>, integer_to_binary(Week)]),
    {mam_bucket_type(Host), Name};
bucket(_Host, _Unsupported) ->
    undefined.
183
%% List every bucket of the configured MAM bucket type as
%% `{BucketType, BucketName}' pairs. Crashes unless Riak answers `{ok, _}'.
list_mam_buckets(Host) ->
    BucketType = mam_bucket_type(Host),
    Worker = mongoose_riak:get_worker(),
    {ok, Names} = riakc_pb_socket:list_buckets(Worker, BucketType),
    [{BucketType, Name} || Name <- Names].
188
189
%% Delete every key listed for Bucket. Returns the list of individual
%% delete results; they are not inspected here.
remove_bucket(Bucket) ->
    {ok, AllKeys} = mongoose_riak:list_keys(Bucket),
    [mongoose_riak:delete(Bucket, K) || K <- AllKeys].
193
194
%% Write one message into the Riak archive.
%%
%% PM:
%%  * LocJID - archive owner's JID
%%  * RemJID - interlocutor's JID
%%  * SrcJID - "Real" sender JID
%%  * OwnerJID - Same as LocJID
%% MUC / MUC Light:
%%  * LocJID - MUC/MUC Light room's JID
%%  * RemJID - Nickname of JID of destination
%%  * SrcJID - Full JID of user within room (room@domain/user)
%%  * OwnerJID - "Real" sender JID (not room specific)
%%
%% Returns `ok'; throws the Riak result when the update does not return `ok'.
archive_message(Host, MessID, LocJID, RemJID, SrcJID, OwnerJID, Packet, Type) ->
    Local = mod_mam_utils:bare_jid(LocJID),
    Remote = mod_mam_utils:bare_jid(RemJID),
    Source = mod_mam_utils:full_jid(SrcJID),
    Owner = mod_mam_utils:bare_jid(OwnerJID),
    MsgIdBin = integer_to_binary(MessID),
    Key = key(Local, Remote, MsgIdBin),
    %% The bucket is chosen from the integer id, not from its binary form.
    Bucket = bucket(Host, MessID),
    Obj = create_obj(Host, MsgIdBin, Source, Owner, Packet, Type),
    UpdateOp = riakc_map:to_op(Obj),
    WriteResult = mongoose_riak:update_type(Bucket, Key, UpdateOp),
    WriteResult =:= ok orelse throw(WriteResult),
    ok.
220
%% Build the Riak map for one archived message. The map gets six registers:
%% msg_id, source_jid, msg_owner_jid, mam_type, packet (encoded with the
%% configured codec) and search_text (the packet's searchable body).
%% Type must be `pm' or `muc'.
create_obj(Host, MsgId, SourceJID, BareOwnerJID, Packet, Type) ->
    MamModule = case Type of
                    pm -> mod_mam;
                    muc -> mod_mam_muc
                end,
    SearchChars = mod_mam_utils:packet_to_search_body(MamModule, Host, Packet),
    SearchBody = unicode:characters_to_binary(SearchChars),
    RegisterOps =
        [{{<<"msg_id">>, register},
          fun(Reg) -> riakc_register:set(MsgId, Reg) end},
         {{<<"source_jid">>, register},
          fun(Reg) -> riakc_register:set(SourceJID, Reg) end},
         {{<<"msg_owner_jid">>, register},
          fun(Reg) -> riakc_register:set(BareOwnerJID, Reg) end},
         {{<<"mam_type">>, register},
          fun(Reg) -> riakc_register:set(atom_to_binary(Type, latin1), Reg) end},
         %% The packet is encoded only when this operation is applied.
         {{<<"packet">>, register},
          fun(Reg) -> riakc_register:set(packet_to_stored_binary(Host, Packet), Reg) end},
         {{<<"search_text">>, register},
          fun(Reg) -> riakc_register:set(SearchBody, Reg) end}],
    mongoose_riak:create_new_map(RegisterOps).
245
%% Dispatch a lookup: RSM requests paging `before' or `aft' a concrete id go
%% through lookup_message_page/3, everything else goes straight to
%% do_lookup_messages/2.
lookup_messages(Host, #{rsm := #rsm_in{direction = Direction, id = ID} = RSM} = Params)
  when ID =/= undefined, (Direction =:= before orelse Direction =:= aft) ->
    lookup_message_page(Host, RSM, Params);
lookup_messages(Host, Params) ->
    do_lookup_messages(Host, Params).
254
%% Look up a page relative to an RSM anchor id. The query asks for one row
%% more than the requested page size and hands the result, together with
%% the original size, to mod_mam_utils:check_for_item_not_found/3.
%% NOTE(review): the extra row presumably lets that helper detect a missing
%% anchor item - confirm against mod_mam_utils.
lookup_message_page(Host, RSM, Params) ->
    Requested = maps:get(page_size, Params),
    Widened = Params#{page_size := Requested + 1},
    {ok, Page} = do_lookup_messages(Host, Widened),
    mod_mam_utils:check_for_item_not_found(RSM, Requested, Page).
259
%% Execute a lookup against the search index.
%%
%% Reads owner_jid, with_jid, rsm, page_size, borders, start_ts, end_ts,
%% search_text and is_simple from Params.
%%
%% Returns `{ok, {TotalCount, Offset, Messages}}'. For simple queries
%% (`is_simple' is `true') TotalCount and Offset are `undefined' and the
%% second, count-only query is skipped.
do_lookup_messages(Host, Params) ->
    Owner = mod_mam_utils:bare_jid(maps:get(owner_jid, Params)),
    With = mod_mam_utils:bare_jid(maps:get(with_jid, Params)),
    RSM = maps:get(rsm, Params),
    PageOpts = add_offset(RSM, add_sorting(RSM, [{rows, maps:get(page_size, Params)}])),
    Collect = fun get_msg_id_key/3,
    Borders = maps:get(borders, Params),
    Start = maps:get(start_ts, Params),
    End = maps:get(end_ts, Params),
    SearchText = maps:get(search_text, Params),
    %% Id range that also takes the RSM anchor into account.
    {FromId, ToId} = mod_mam_utils:calculate_msg_id_borders(RSM, Borders, Start, End),
    {PageQueryCount, Hits} = read_archive(Host, Owner, With,
                                          FromId, ToId, SearchText,
                                          PageOpts, Collect),
    Sorted = sort_messages(Hits),
    case maps:get(is_simple, Params) of
        true ->
            {ok, {undefined, undefined, get_messages(Host, Sorted)}};
        _ ->
            %% Same filters without RSM: gives the count for the full result set.
            {FromIdNoRSM, ToIdNoRSM} =
                mod_mam_utils:calculate_msg_id_borders(Borders, Start, End),
            {TotalCount, _} = read_archive(Host, Owner, With,
                                           FromIdNoRSM, ToIdNoRSM, SearchText,
                                           [{rows, 1}], Collect),
            PageLen = length(Sorted),
            OffsetArgs = {Host, Owner, With, FromIdNoRSM, SearchText},
            Offset = calculate_offset(RSM, PageQueryCount, PageLen, OffsetArgs),
            {ok, {TotalCount, Offset, get_messages(Host, Sorted)}}
    end.
295
296
%% Prepend the sort option: descending message id when paging `before',
%% ascending in every other case.
add_sorting(RSM, Opts) ->
    Order = case RSM of
                #rsm_in{direction = before} -> <<"msg_id_register desc">>;
                _ -> <<"msg_id_register asc">>
            end,
    [{sort, Order} | Opts].
301
%% Prepend a `{start, Index}' option when the RSM carries an integer index;
%% otherwise leave the options untouched.
add_offset(RSM, Opts) ->
    case RSM of
        #rsm_in{index = Index} when is_integer(Index) ->
            [{start, Index} | Opts];
        _ ->
            Opts
    end.
306
%% Offset of the returned page:
%%  - paging `before': count of the page query minus the page length;
%%  - paging `aft' a known id: count reported by read_archive/8 for the
%%    range from the start border to that id;
%%  - index-based paging without a direction: the index itself;
%%  - otherwise 0.
calculate_offset(#rsm_in{direction = before}, QueryCount, PageLen, _Args) ->
    QueryCount - PageLen;
calculate_offset(#rsm_in{direction = aft, id = AnchorId}, _QueryCount, _PageLen,
                 {Host, Owner, Remote, FirstId, SearchText})
  when AnchorId =/= undefined ->
    {Preceding, _Rows} = read_archive(Host, Owner, Remote,
                                      FirstId, AnchorId, SearchText,
                                      [{rows, 1}], fun get_msg_id_key/3),
    Preceding;
calculate_offset(#rsm_in{direction = undefined, index = Index}, _QueryCount, _PageLen, _Args)
  when is_integer(Index) ->
    Index;
calculate_offset(_RSM, _QueryCount, _PageLen, _Args) ->
    0.
321
%% Fold callback: prepend `{MsgId, Bucket, Key}' to the accumulator, where
%% MsgId is the integer parsed from the third `/'-separated part of Key.
%% Crashes if Key does not split into exactly three parts.
get_msg_id_key(Bucket, Key, Msgs) ->
    [_Local, _Remote, MsgIdBin] = decode_key(Key),
    [{binary_to_integer(MsgIdBin), Bucket, Key} | Msgs].
326
%% Fetch the messages behind a list of `{MsgId, Bucket, Key}' entries.
get_messages(Host, BucketKeys) ->
    Fetched = [get_message2(Host, MsgId, Bucket, Key)
               || {MsgId, Bucket, Key} <- BucketKeys],
    %% get_message2/4 yields [] for an object it could not fetch; flattening
    %% removes those placeholders from the result.
    lists:flatten(Fetched).
329
%% Fetch one archived message and return it as
%% `#{id, jid, packet}', or `[]' when the fetch does not return `{ok, _}'.
get_message2(Host, MsgId, Bucket, Key) ->
    case mongoose_riak:fetch_type(Bucket, Key) of
        {ok, RiakMap} ->
            Register = fun(Name) -> riakc_map:fetch({Name, register}, RiakMap) end,
            SourceBin = Register(<<"source_jid">>),
            StoredPacket = Register(<<"packet">>),
            Packet = stored_binary_to_packet(Host, StoredPacket),
            #{id => MsgId, jid => jid:from_binary(SourceBin), packet => Packet};
        _ ->
            []
    end.
%% GDPR hook handler: prepend the owner's PM messages, as
%% `{Id, SenderJidBinary, PacketBinary}' tuples, to the accumulator.
-spec get_mam_pm_gdpr_data(ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
                           mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_pm_gdpr_data().
get_mam_pm_gdpr_data(Acc, _HostType, OwnerJid) ->
    Found = get_mam_gdpr_data(OwnerJid, <<"pm">>),
    Rows = [{Id, jid:to_binary(Sender), exml:to_binary(Stanza)}
            || #{id := Id, jid := Sender, packet := Stanza} <- Found],
    Rows ++ Acc.
346
%% GDPR hook handler: prepend the user's MUC messages, as
%% `{MsgId, PacketBinary}' tuples, to the accumulator.
-spec get_mam_muc_gdpr_data(ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
                            mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_muc_gdpr_data().
get_mam_muc_gdpr_data(Acc, _HostType, JID) ->
    Found = get_mam_gdpr_data(JID, <<"muc">>),
    Rows = [{MsgId, exml:to_binary(Stanza)}
            || #{id := MsgId, packet := Stanza} <- Found],
    Rows ++ Acc.
353
%% Fetch all messages whose `msg_owner_jid' register equals the lowercased
%% JID and whose `mam_type' register equals Type (`<<"pm">>' / `<<"muc">>').
%% The JID's lserver is used as the host for the search and the fetch.
%% NOTE(review): no `rows' option is passed to the search, so the number of
%% hits returned depends on the search backend's default - verify.
get_mam_gdpr_data(#jid{ lserver = LServer } = BareJid, Type) ->
    OwnerBin = jid:to_binary(jid:to_lower(BareJid)),
    SolrQuery = <<"msg_owner_jid_register:", OwnerBin/binary,
                  " AND mam_type_register:", Type/binary>>,
    {ok, _Count, _Iterated, Found} =
        fold_archive(LServer, fun get_msg_id_key/3, SolrQuery, [], []),
    get_messages(LServer, Found).
360
%% Hook handler: remove the archive of ArchiveJID and hand the accumulator
%% back unchanged.
remove_archive(Acc, Host, _ArchiveID, ArchiveJID) ->
    _ = remove_archive(Host, ArchiveJID),
    Acc.
364
%% Remove an archive chunk by chunk. After the first chunk, up to 100
%% further rounds are attempted; if entries still remain after that, a
%% warning with the processed and total counts is logged. Always returns
%% `ok' unless a chunk query fails (which crashes).
remove_archive(Host, ArchiveJID) ->
    FirstChunk = remove_chunk(Host, ArchiveJID, 0),
    {ok, TotalCount, _, _} = FirstChunk,
    case do_remove_archive(100, FirstChunk, Host, ArchiveJID) of
        {ok, _} ->
            ok;
        {stopped, Processed} ->
            ?LOG_WARNING(#{what => remove_archive_failed,
                           text => <<"archive removal stopped for jid after "
                                     "processing {processed_count} items out of {total_count}">>,
                           archive_jid => ArchiveJID,
                           processed_count => Processed,
                           total_count => TotalCount}),
            ok
    end.
379
%% Delete one chunk (at most 50 search hits, lowest message ids first) of
%% the archive owned by ArchiveJID. Acc is the running count of deleted
%% keys; the result is fold_archive/5's return value.
remove_chunk(Host, ArchiveJID, Acc) ->
    Owner = mod_mam_utils:bare_jid(ArchiveJID),
    Query = key_filters(Owner),
    ChunkOpts = [{rows, 50}, {sort, <<"msg_id_register asc">>}],
    fold_archive(Host, fun delete_key_fun/3, Query, ChunkOpts, Acc).
386
%% Keep deleting chunks until the search reports no more hits
%% (`{ok, Deleted}') or the round budget is used up (`{stopped, Deleted}').
%% The budget check comes first, so an exhausted budget wins even when the
%% last chunk reported zero hits.
do_remove_archive(0, {ok, _, _, Deleted}, _Host, _ArchiveJID) ->
    {stopped, Deleted};
do_remove_archive(_RoundsLeft, {ok, 0, _, Deleted}, _Host, _ArchiveJID) ->
    {ok, Deleted};
do_remove_archive(RoundsLeft, {ok, _TotalResults, _RowsIterated, Deleted}, Host, ArchiveJID) ->
    %% give Riak some time to clear after just removed keys
    timer:sleep(1000),
    NextChunk = remove_chunk(Host, ArchiveJID, Deleted),
    do_remove_archive(RoundsLeft - 1, NextChunk, Host, ArchiveJID).
395
%% Fold callback: delete Key from Bucket (with the `{dw, 2}' option) and
%% bump the counter. Crashes if the delete does not return `ok'.
delete_key_fun(Bucket, Key, Deleted) ->
    DeleteOpts = [{dw, 2}],
    ok = mongoose_riak:delete(Bucket, Key, DeleteOpts),
    Deleted + 1.
399
400
%% Build the Riak key `LocalJID/RemoteJID/MsgId' from three binaries.
key(LocalJID, RemoteJID, MsgId) ->
    <<LocalJID/binary, "/", RemoteJID/binary, "/", MsgId/binary>>.
403
%% Split a key on every `/' into its parts (the inverse of key/3 as long as
%% the parts themselves contain no `/').
decode_key(KeyBinary) ->
    Slash = <<$/>>,
    binary:split(KeyBinary, Slash, [global]).
406
%% Query the archive and fold Fun over the hits.
%%
%% OwnerJID / WithJID are bare JID binaries (WithJID may be `undefined' for
%% "any interlocutor"); Start / End bound the message id range; SearchText
%% is an optional full-text filter; SearchOpts are passed to the search.
%% Fun is called as `Fun(Bucket, Key, Acc)' for each hit, starting from `[]'.
%%
%% Returns `{Count, Acc}' where Count is the count reported by the search.
%% A failed search is NOT returned as `{error, _}': fold_archive/5 logs it
%% and this function then crashes with `badmatch', which callers rely on
%% (they match on the 2-tuple directly).
-spec read_archive(jid:server(),
                   binary() | undefined,
                   binary() | undefined,
                   term(),
                   term(),
                   binary() | undefined,
                   [term()],
                   fun((term(), term(), list()) -> list())) ->
    {integer(), list()}.
read_archive(Host, OwnerJID, WithJID, Start, End, SearchText, SearchOpts, Fun) ->
    KeyFilters = key_filters(OwnerJID, WithJID, Start, End, SearchText),
    {ok, Cnt, _, NewAcc} = fold_archive(Host, Fun, KeyFilters, SearchOpts, []),
    {Cnt, NewAcc}.
420
421
%% Sort `{MsgId, Bucket, Key}' entries by ascending message id. The sort is
%% stable: entries with equal ids keep their relative order.
sort_messages(Msgs) ->
    lists:keysort(1, Msgs).
427
%% Run Query against the configured search index and fold Fun over the
%% returned rows.
%%
%% Returns `{ok, Count, RowsIterated, Acc}' where Count is the count
%% reported by the search and RowsIterated the number of rows in this
%% response, or `{error, Reason}' (after logging a warning) when the
%% search fails.
fold_archive(Host, Fun, Query, SearchOpts, InitialAcc) ->
    Index = yz_search_index(Host),
    case mongoose_riak:search(Index, Query, SearchOpts) of
        {ok, {search_results, Rows, _Score, Count}} ->
            %% An empty row list naturally gives 0 rows and the untouched acc.
            {ok, Count, length(Rows), do_fold_archive(Fun, Rows, InitialAcc)};
        {error, Reason} = Failure ->
            ?LOG_WARNING(#{what => mam_read_error,
                           cql_query => Query, reason => Reason}),
            Failure
    end.
440
%% Fold Fun over search rows. Each row is `{Index, Props}'; the bucket
%% (`_yz_rb'), bucket type (`_yz_rt') and key (`_yz_rk') are read from Props
%% and Fun is called as `Fun({Type, Bucket}, Key, Acc)'.
%% Crashes with `badmatch' if a row lacks one of those properties.
do_fold_archive(Fun, BucketKeys, InitialAcc) ->
    Prop = fun(Name, Props) ->
                   {_, Value} = lists:keyfind(Name, 1, Props),
                   Value
           end,
    Step = fun({_Index, Props}, Acc) ->
                   Bucket = Prop(<<"_yz_rb">>, Props),
                   Type = Prop(<<"_yz_rt">>, Props),
                   Key = Prop(<<"_yz_rk">>, Props),
                   Fun({Type, Bucket}, Key, Acc)
           end,
    lists:foldl(Step, InitialAcc, BucketKeys).
448
449 %% Filter API
%% Query matching every message in LocalJid's archive: no remote JID,
%% id range or search text restriction.
key_filters(LocalJid) ->
    NoFilter = undefined,
    key_filters(LocalJid, NoFilter, NoFilter, NoFilter, NoFilter).
452
%% Build the search query: the JID filter, the message id filter and the
%% search text filter joined with " AND ". Filters that are not binaries
%% (i.e. `undefined') are left out; the JID filter is always present.
key_filters(LocalJid, RemoteJid, Start, End, SearchText) ->
    Candidates = [jid_filters(LocalJid, RemoteJid),
                  id_filters(Start, End),
                  search_text_filter(SearchText)],
    Active = [Filter || Filter <- Candidates, is_binary(Filter)],
    iolist_to_binary(lists:join(<<" AND ">>, Active)).
463
464 %% Filter helpers
%% Build the full-text part of the query, or `undefined' when no search
%% text was given (callers drop non-binary filters).
%%
%% The text is normalized by mod_mam_utils:normalize_search_text/2 with a
%% separator that closes one fuzzy term (`~1') and opens the next
%% `search_text_register:' term, so the resulting terms are AND-ed together.
-spec search_text_filter(binary() | undefined) -> binary() | undefined.
search_text_filter(undefined) ->
    undefined;
search_text_filter(SearchText) ->
    Separator = <<"~1 AND search_text_register:">>,
    NormText = mod_mam_utils:normalize_search_text(SearchText, Separator),
    %% Fuzzy search on tokens from search phrase
    <<"search_text_register:", NormText/binary, "~1">>.
473
%% Key filter `_yz_rk:<Local>/<Remote>/*'; an `undefined' remote JID becomes
%% the `*' wildcard.
jid_filters(LocalJid, RemoteJid) ->
    RemotePart = case RemoteJid of
                     undefined -> <<"*">>;
                     _ -> RemoteJid
                 end,
    <<"_yz_rk:", LocalJid/binary, "/", RemotePart/binary, "/*">>.
478
%% Message id part of the query:
%%  - no bounds at all: `undefined' (no filter);
%%  - identical bounds: an exact match on that id;
%%  - otherwise a range, with `*' standing in for a missing bound.
id_filters(undefined, undefined) ->
    undefined;
id_filters(MsgId, MsgId) ->
    <<"msg_id_register:", (integer_to_binary(MsgId))/binary>>;
id_filters(StartId, EndId) ->
    Bound = fun(undefined) -> <<"*">>;
               (Id) -> integer_to_binary(Id)
            end,
    solr_id_filters(Bound(StartId), Bound(EndId)).
490
%% Range filter on the message id register; both bounds are binaries
%% (a number rendered as text, or `*').
solr_id_filters(Start, End) ->
    Range = <<"[", Start/binary, " TO ", End/binary, " ]">>,
    <<"msg_id_register:", Range/binary>>.
493
494 %% ----------------------------------------------------------------------
495 %% Optimizations
496
%% Encode Packet with the codec module configured for Host.
packet_to_stored_binary(Host, Packet) ->
    mam_message:encode(db_message_codec(Host), Packet).
500
%% Decode a stored binary with the codec module configured for Host.
stored_binary_to_packet(Host, Bin) ->
    mam_message:decode(db_message_codec(Host), Bin).
504
%% Codec module taken from this module's `db_message_format' option.
-spec db_message_codec(Host :: jid:server()) -> module().
db_message_codec(Host) ->
    OptName = db_message_format,
    gen_mod:get_module_opt(Host, ?MODULE, OptName).
Line Hits Source