1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2015 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
-module(mod_mam_riak_timed_arch_yz). |
17 |
|
|
18 |
|
-behaviour(ejabberd_gen_mam_archive). |
19 |
|
-behaviour(gen_mod). |
20 |
|
-behaviour(mongoose_module_metrics). |
21 |
|
|
22 |
|
-callback encode(term()) -> binary(). |
23 |
|
-callback decode(binary()) -> term(). |
24 |
|
|
25 |
|
|
26 |
|
-include("mongoose.hrl"). |
27 |
|
-include("jlib.hrl"). |
28 |
|
-include("mongoose_rsm.hrl"). |
29 |
|
|
30 |
|
%% API |
31 |
|
-export([start/2, |
32 |
|
stop/1, |
33 |
|
archive_size/4, |
34 |
|
lookup_messages/2, |
35 |
|
remove_archive/4]). |
36 |
|
|
37 |
|
-export([archive_message/3, |
38 |
|
archive_message_muc/3, |
39 |
|
lookup_messages/3, |
40 |
|
lookup_messages_muc/3]). |
41 |
|
|
42 |
|
-export([key/3]). |
43 |
|
|
44 |
|
%% For tests only |
45 |
|
-export([create_obj/6, read_archive/8, bucket/2, |
46 |
|
list_mam_buckets/1, remove_bucket/1]). |
47 |
|
|
48 |
|
-export([get_mam_muc_gdpr_data/3, get_mam_pm_gdpr_data/3]). |
49 |
|
|
50 |
|
-ignore_xref([archive_message_muc/3, behaviour_info/1, bucket/2, create_obj/6, |
51 |
|
key/3, list_mam_buckets/1, lookup_messages/2, lookup_messages_muc/3, |
52 |
|
read_archive/8, remove_archive/4, remove_bucket/1]). |
53 |
|
|
54 |
|
-type yearweeknum() :: {non_neg_integer(), 1..53}. |
55 |
|
|
56 |
|
%% @doc Start the module by registering its hook handlers.
%%
%% Recognised options:
%% - `pm' set to `true' registers the one-to-one chat archive hooks
%% - `muc' set to `true' registers the multi-user chat archive hooks
%%
%% Set both options to archive private as well as MUC messages.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(HostType, Opts) ->
    Handlers = hooks(HostType, Opts),
    ejabberd_hooks:add(Handlers).
66 |
|
|
67 |
|
%% @doc Stop the module: the hooks derived from the currently loaded module
%% options are unregistered.
-spec stop(mongooseim:host_type()) -> ok.
stop(HostType) ->
    LoadedOpts = gen_mod:get_loaded_module_opts(HostType, ?MODULE),
    Handlers = hooks(HostType, LoadedOpts),
    ejabberd_hooks:delete(Handlers).
71 |
|
|
72 |
|
%% All hook handlers for the given options: PM hooks first, then MUC hooks.
hooks(HostType, Opts) ->
    hooks(HostType, pm, Opts) ++ hooks(HostType, muc, Opts).
74 |
|
|
75 |
|
%% Hook handlers for one archive type (`pm' or `muc').
%% Handlers are returned only when the matching option is exactly `true';
%% any other type/option combination yields an empty list.
hooks(HostType, pm, #{pm := true}) ->
    hook_entries(HostType, [{mam_archive_message, archive_message},
                            {mam_archive_size, archive_size},
                            {mam_lookup_messages, lookup_messages},
                            {mam_remove_archive, remove_archive},
                            {get_mam_pm_gdpr_data, get_mam_pm_gdpr_data}]);
hooks(HostType, muc, #{muc := true}) ->
    hook_entries(HostType, [{mam_muc_archive_message, archive_message_muc},
                            {mam_muc_archive_size, archive_size},
                            {mam_muc_lookup_messages, lookup_messages_muc},
                            {mam_muc_remove_archive, remove_archive},
                            {get_mam_muc_gdpr_data, get_mam_muc_gdpr_data}]);
hooks(_HostType, _Type, _Opts) ->
    [].

%% Expand `{HookName, HandlerFunction}' pairs into full hook tuples handled by
%% this module with priority 50.
hook_entries(HostType, HookHandlers) ->
    [{Hook, HostType, ?MODULE, Handler, 50} || {Hook, Handler} <- HookHandlers].
89 |
|
|
90 |
|
%% Read the `[riak, search_index]' option of this module for Host.
yz_search_index(Host) ->
    OptPath = [riak, search_index],
    gen_mod:get_module_opt(Host, ?MODULE, OptPath).
92 |
|
|
93 |
|
%% Read the `[riak, bucket_type]' option of this module for Host.
mam_bucket_type(Host) ->
    OptPath = [riak, bucket_type],
    gen_mod:get_module_opt(Host, ?MODULE, OptPath).
95 |
|
|
96 |
|
%% Hook handler storing a one-to-one (PM) message.
%%   LocJID - archive owner's JID (also passed on as the message owner)
%%   RemJID - interlocutor's JID
%%   SrcJID - "real" sender JID
%% Any exception raised while writing is logged together with the hook
%% parameters, counted in the `modMamDropped' metric and turned into
%% `{error, Reason}'.
archive_message(_Result, Host, #{message_id := MessId,
                                 local_jid := LocJID,
                                 remote_jid := RemJID,
                                 source_jid := SrcJID,
                                 packet := Packet} = Params) ->
    try archive_message(Host, MessId, LocJID, RemJID, SrcJID, LocJID, Packet, pm) of
        Stored ->
            Stored
    catch
        Class:Reason:StackTrace ->
            ?LOG_WARNING(Params#{what => archive_message_failed,
                                 text => <<"Could not write message to archive">>,
                                 class => Class,
                                 reason => Reason,
                                 stacktrace => StackTrace}),
            mongoose_metrics:update(Host, modMamDropped, 1),
            {error, Reason}
    end.
114 |
|
|
115 |
|
%% Hook handler storing a MUC / MUC Light message.
%%   LocJID  - MUC/MUC Light room's JID
%%   FromJID - "real" sender JID (stored as the message owner)
%%   SrcJID  - full JID of the user within the room (room@domain/user)
%% The remote part of the key is derived from SrcJID via maybe_muc_jid/1.
%% Any exception raised while writing is logged together with the hook
%% parameters, counted in the `modMamDropped' metric and turned into
%% `{error, Reason}'.
archive_message_muc(_Result, Host, #{message_id := MessId,
                                     local_jid := LocJID,
                                     remote_jid := FromJID,
                                     source_jid := SrcJID,
                                     packet := Packet} = Params) ->
    RemJIDMuc = maybe_muc_jid(SrcJID),
    try archive_message(Host, MessId, LocJID, RemJIDMuc, SrcJID, FromJID, Packet, muc) of
        Stored ->
            Stored
    catch
        Class:Reason:StackTrace ->
            ?LOG_WARNING(Params#{what => archive_muc_message_failed,
                                 text => <<"Could not write MUC message to archive">>,
                                 class => Class,
                                 reason => Reason,
                                 stacktrace => StackTrace}),
            mongoose_metrics:update(Host, modMamDropped, 1),
            {error, Reason}
    end.
134 |
|
|
135 |
|
%% For a #jid{} record return the triple `{<<>>, LResource, <<>>}', i.e. the
%% lower-cased resource placed in the middle position with empty binaries
%% around it. Any other term is returned unchanged.
maybe_muc_jid(#jid{} = JID) ->
    {<<>>, JID#jid.lresource, <<>>};
maybe_muc_jid(NotAJid) ->
    NotAJid.
139 |
|
|
140 |
|
|
141 |
|
%% Hook handler for message lookup.
%% An error produced by an earlier handler is passed through untouched.
%% Otherwise the lookup is performed and any exception is converted into
%% `{error, {Reason, {stacktrace, Stacktrace}}}'.
lookup_messages({error, _} = PreviousError, _Host, _Params) ->
    PreviousError;
lookup_messages(_Result, Host, Params) ->
    try lookup_messages(Host, Params) of
        Found ->
            Found
    catch
        _Class:Reason:Stacktrace ->
            {error, {Reason, {stacktrace, Stacktrace}}}
    end.
149 |
|
|
150 |
|
|
151 |
|
%% MUC variant of the lookup hook: the `with_jid' parameter is rewritten with
%% maybe_muc_jid/1 before delegating to lookup_messages/3.
lookup_messages_muc(Result, Host, #{with_jid := WithJID} = Params) ->
    MucParams = Params#{with_jid := maybe_muc_jid(WithJID)},
    lookup_messages(Result, Host, MucParams).
154 |
|
|
155 |
|
|
156 |
|
%% Hook handler returning the number of messages in the archive of ArchiveJID.
%% No remote JID, id borders or search text are applied; a single row is
%% requested because only the total count reported by the search is used.
archive_size(_Size, Host, _ArchiveID, ArchiveJID) ->
    Owner = mod_mam_utils:bare_jid(ArchiveJID),
    {FirstId, LastId} =
        mod_mam_utils:calculate_msg_id_borders(undefined, undefined, undefined),
    {Count, _Rows} = read_archive(Host, Owner, undefined,
                                  FirstId, LastId, undefined,
                                  [{rows, 1}], fun get_msg_id_key/3),
    Count.
166 |
|
|
167 |
|
%% Select the bucket that holds messages of a given point in time.
%% The second argument may be:
%%   * an integer message id - its timestamp part is converted to a UTC date,
%%   * a `{Year, Month, Day}' date - converted to its ISO week number,
%%   * a `{Year, Week}' pair - yields `{BucketType, <<"mam_Year_Week">>}'.
%% Anything else yields `undefined'.
-spec bucket(jid:server(), calendar:date() | yearweeknum() | integer()) ->
    {binary(), binary()} | undefined.
bucket(Host, MsgId) when is_integer(MsgId) ->
    {Timestamp, _} = mod_mam_utils:decode_compact_uuid(MsgId),
    {Date, _Time} = calendar:system_time_to_universal_time(Timestamp, microsecond),
    bucket(Host, Date);
bucket(Host, {_Y, _M, _D} = Date) ->
    YearWeek = calendar:iso_week_number(Date),
    bucket(Host, YearWeek);
bucket(Host, {Year, Week}) ->
    BucketName = iolist_to_binary([<<"mam_">>, integer_to_binary(Year),
                                   <<"_">>, integer_to_binary(Week)]),
    {mam_bucket_type(Host), BucketName};
bucket(_Host, _Unsupported) ->
    undefined.
183 |
|
|
184 |
|
%% List every bucket of the configured MAM bucket type as `{Type, Name}' pairs.
%% Crashes with badmatch when the bucket listing does not return `{ok, _}'.
list_mam_buckets(Host) ->
    BucketType = mam_bucket_type(Host),
    Worker = mongoose_riak:get_worker(),
    {ok, Names} = riakc_pb_socket:list_buckets(Worker, BucketType),
    lists:map(fun(Name) -> {BucketType, Name} end, Names).
188 |
|
|
189 |
|
|
190 |
|
%% Delete every key listed in Bucket and return the list of delete results.
remove_bucket(Bucket) ->
    {ok, Keys} = mongoose_riak:list_keys(Bucket),
    lists:map(fun(Key) -> mongoose_riak:delete(Bucket, Key) end, Keys).
193 |
|
|
194 |
|
|
195 |
|
%% Write one message to Riak.
%%
%% Meaning of the JID arguments for PM:
%%   * LocJID   - archive owner's JID
%%   * RemJID   - interlocutor's JID
%%   * SrcJID   - "real" sender JID
%%   * OwnerJID - same as LocJID
%% and for MUC / MUC Light:
%%   * LocJID   - MUC/MUC Light room's JID
%%   * RemJID   - nickname of JID of destination
%%   * SrcJID   - full JID of user within room (room@domain/user)
%%   * OwnerJID - "real" sender JID (not room specific)
%%
%% The object key is built from the bare local JID, the bare remote JID and
%% the message id; the bucket is selected from the message id.
%% Returns `ok'; any other result of the update is thrown.
archive_message(Host, MessID, LocJID, RemJID, SrcJID, OwnerJID, Packet, Type) ->
    BareLocal = mod_mam_utils:bare_jid(LocJID),
    BareRemote = mod_mam_utils:bare_jid(RemJID),
    FullSource = mod_mam_utils:full_jid(SrcJID),
    BareOwner = mod_mam_utils:bare_jid(OwnerJID),
    MsgIdBin = integer_to_binary(MessID),
    ObjectKey = key(BareLocal, BareRemote, MsgIdBin),
    TargetBucket = bucket(Host, MessID),
    RiakMap = create_obj(Host, MsgIdBin, FullSource, BareOwner, Packet, Type),
    UpdateOp = riakc_map:to_op(RiakMap),
    case mongoose_riak:update_type(TargetBucket, ObjectKey, UpdateOp) of
        ok ->
            ok;
        Failure ->
            throw(Failure)
    end.
220 |
|
|
221 |
|
%% Build the Riak map describing one archived message.
%% Registers written: msg_id, source_jid, msg_owner_jid, mam_type, packet
%% (encoded with the configured codec) and search_text (the searchable body).
%% Type must be `pm' or `muc'; it selects the MAM module used to extract the
%% search body.
create_obj(Host, MsgId, SourceJID, BareOwnerJID, Packet, Type) ->
    ModMAM = case Type of
                 pm -> mod_mam;
                 muc -> mod_mam_muc
             end,
    BodyChars = mod_mam_utils:packet_to_search_body(ModMAM, Host, Packet),
    BodyValue = unicode:characters_to_binary(BodyChars),
    %% Setter for a value that is already at hand.
    SetTo = fun(Value) ->
                    fun(Register) -> riakc_register:set(Value, Register) end
            end,
    Ops = [{{<<"msg_id">>, register}, SetTo(MsgId)},
           {{<<"source_jid">>, register}, SetTo(SourceJID)},
           {{<<"msg_owner_jid">>, register}, SetTo(BareOwnerJID)},
           {{<<"mam_type">>, register},
            fun(Register) ->
                    riakc_register:set(atom_to_binary(Type, latin1), Register)
            end},
           %% The packet is encoded only when the setter is applied.
           {{<<"packet">>, register},
            fun(Register) ->
                    riakc_register:set(packet_to_stored_binary(Host, Packet), Register)
            end},
           {{<<"search_text">>, register}, SetTo(BodyValue)}],
    mongoose_riak:create_new_map(Ops).
245 |
|
|
246 |
|
%% Look up messages. Requests paging `before' or `aft' a concrete message id
%% go through lookup_message_page/3; every other request is served directly
%% by do_lookup_messages/2.
lookup_messages(Host, #{rsm := #rsm_in{direction = Direction, id = ID} = RSM} = Params)
  when ID =/= undefined, (Direction =:= before orelse Direction =:= aft) ->
    lookup_message_page(Host, RSM, Params);
lookup_messages(Host, Params) ->
    do_lookup_messages(Host, Params).
254 |
|
|
255 |
|
%% Fetch a page relative to an RSM id. One item more than the requested page
%% size is fetched; the result is then post-processed together with the
%% original page size by mod_mam_utils:check_for_item_not_found/3.
lookup_message_page(Host, RSM, Params) ->
    Requested = maps:get(page_size, Params),
    Widened = maps:update(page_size, Requested + 1, Params),
    {ok, Page} = do_lookup_messages(Host, Widened),
    mod_mam_utils:check_for_item_not_found(RSM, Requested, Page).
259 |
|
|
260 |
|
%% Run the actual archive query described by Params.
%% Returns `{ok, {TotalCount, Offset, Messages}}'. For "simple" queries
%% (`is_simple' is `true') the count and offset are not computed and are
%% reported as `undefined'.
do_lookup_messages(Host, Params) ->
    OwnerJID = mod_mam_utils:bare_jid(maps:get(owner_jid, Params)),
    RemoteJID = mod_mam_utils:bare_jid(maps:get(with_jid, Params)),
    RSM = maps:get(rsm, Params),

    %% Page size, sort direction and optional start index for the search.
    PageOpts = [{rows, maps:get(page_size, Params)}],
    SearchOpts = add_offset(RSM, add_sorting(RSM, PageOpts)),

    KeyCollector = fun get_msg_id_key/3,

    Borders = maps:get(borders, Params),
    Start = maps:get(start_ts, Params),
    End = maps:get(end_ts, Params),
    SearchText = maps:get(search_text, Params),

    %% Query restricted by RSM: yields the keys of the requested page.
    {PageFirstId, PageLastId} =
        mod_mam_utils:calculate_msg_id_borders(RSM, Borders, Start, End),
    {PageQueryCount, Found} = read_archive(Host, OwnerJID, RemoteJID,
                                           PageFirstId, PageLastId, SearchText,
                                           SearchOpts, KeyCollector),
    PageKeys = sort_messages(Found),

    case maps:get(is_simple, Params) of
        true ->
            {ok, {undefined, undefined, get_messages(Host, PageKeys)}};
        _ ->
            %% Same query without the RSM restriction, used only for its count.
            {FirstIdNoRSM, LastIdNoRSM} =
                mod_mam_utils:calculate_msg_id_borders(Borders, Start, End),
            {TotalCount, _} = read_archive(Host, OwnerJID, RemoteJID,
                                           FirstIdNoRSM, LastIdNoRSM, SearchText,
                                           [{rows, 1}], KeyCollector),
            OffsetArgs = {Host, OwnerJID, RemoteJID, FirstIdNoRSM, SearchText},
            Offset = calculate_offset(RSM, PageQueryCount, length(PageKeys), OffsetArgs),
            {ok, {TotalCount, Offset, get_messages(Host, PageKeys)}}
    end.
295 |
|
|
296 |
|
|
297 |
|
%% Prepend the sort option: descending message ids when paging `before',
%% ascending in every other case.
add_sorting(RSM, Opts) ->
    [{sort, sort_order(RSM)} | Opts].

%% Sort clause matching the RSM direction.
sort_order(#rsm_in{direction = before}) ->
    <<"msg_id_register desc">>;
sort_order(_) ->
    <<"msg_id_register asc">>.
301 |
|
|
302 |
|
%% Prepend a `start' option when the RSM request carries an integer index;
%% otherwise the options are returned as they are.
add_offset(RSM, Opts) ->
    case RSM of
        #rsm_in{index = Index} when is_integer(Index) ->
            [{start, Index} | Opts];
        _ ->
            Opts
    end.
306 |
|
|
307 |
|
%% Offset of the returned page within the whole result set.
%%   * paging `before': the count given as 2nd argument minus the 3rd
%%     argument (the caller passes the number of returned items),
%%   * paging `aft' a known id: the number of entries found by a search
%%     in the id range from MsgIdStart to that id,
%%   * no direction with an integer index: the index itself,
%%   * anything else: 0.
calculate_offset(#rsm_in{direction = before}, QueryCount, ReturnedCount, _Args) ->
    QueryCount - ReturnedCount;
calculate_offset(#rsm_in{direction = aft, id = AfterId}, _QueryCount, _ReturnedCount,
                 {Host, Owner, Remote, MsgIdStart, SearchText})
  when AfterId /= undefined ->
    {Preceding, _Rows} = read_archive(Host, Owner, Remote,
                                      MsgIdStart, AfterId, SearchText,
                                      [{rows, 1}], fun get_msg_id_key/3),
    Preceding;
calculate_offset(#rsm_in{direction = undefined, index = Index}, _QueryCount, _ReturnedCount, _Args)
  when is_integer(Index) ->
    Index;
calculate_offset(_RSM, _QueryCount, _ReturnedCount, _Args) ->
    0.
321 |
|
|
322 |
|
%% Fold callback: prepend `{MsgId, Bucket, Key}' to the accumulator, where
%% MsgId is the integer taken from the third segment of the key.
%% Crashes unless the key splits into exactly three segments.
get_msg_id_key(Bucket, Key, Msgs) ->
    [_Local, _Remote, MsgIdBin] = decode_key(Key),
    [{binary_to_integer(MsgIdBin), Bucket, Key} | Msgs].
326 |
|
|
327 |
|
%% Fetch the messages behind `{MsgId, Bucket, Key}' entries, keeping the input
%% order. Entries for which get_message2/4 yields `[]' are left out.
get_messages(Host, BucketKeys) ->
    [Message || {MsgId, Bucket, Key} <- BucketKeys,
                Message <- [get_message2(Host, MsgId, Bucket, Key)],
                Message =/= []].
329 |
|
|
330 |
|
%% Fetch a single message. Returns a map with the keys `id', `jid' and
%% `packet', or `[]' when the fetch returns anything other than `{ok, Map}'.
get_message2(Host, MsgId, Bucket, Key) ->
    case mongoose_riak:fetch_type(Bucket, Key) of
        {ok, RiakMap} ->
            riak_map_to_message(Host, MsgId, RiakMap);
        _ ->
            []
    end.

%% Decode the stored source JID and packet registers of a fetched Riak map.
riak_map_to_message(Host, MsgId, RiakMap) ->
    SourceJIDBin = riakc_map:fetch({<<"source_jid">>, register}, RiakMap),
    StoredPacket = riakc_map:fetch({<<"packet">>, register}, RiakMap),
    Packet = stored_binary_to_packet(Host, StoredPacket),
    #{id => MsgId, jid => jid:from_binary(SourceJIDBin), packet => Packet}.
340 |
|
%% GDPR hook handler for PM archives: every message owned by OwnerJid is
%% reported as `{Id, SourceJidBinary, PacketBinary}' and put in front of Acc.
-spec get_mam_pm_gdpr_data(ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
                           mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_pm_gdpr_data().
get_mam_pm_gdpr_data(Acc, _HostType, OwnerJid) ->
    Messages = get_mam_gdpr_data(OwnerJid, <<"pm">>),
    Entries = [{Id, jid:to_binary(Jid), exml:to_binary(Packet)}
               || #{id := Id, jid := Jid, packet := Packet} <- Messages],
    Entries ++ Acc.
346 |
|
|
347 |
|
%% GDPR hook handler for MUC archives: every message owned by JID is reported
%% as `{MsgId, PacketBinary}' and put in front of Acc.
-spec get_mam_muc_gdpr_data(ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
                            mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_muc_gdpr_data().
get_mam_muc_gdpr_data(Acc, _HostType, JID) ->
    Messages = get_mam_gdpr_data(JID, <<"muc">>),
    Entries = [{MsgId, exml:to_binary(Packet)}
               || #{id := MsgId, packet := Packet} <- Messages],
    Entries ++ Acc.
353 |
|
|
354 |
|
%% Find the messages whose `msg_owner_jid' register equals the lower-cased JID
%% and whose `mam_type' register equals Type (`<<"pm">>' or `<<"muc">>'), and
%% fetch them. The JID's server is used as the host for configuration lookups.
%% NOTE(review): the search options are empty, so the number of returned rows
%% is left to the search backend's default - confirm this cannot truncate the
%% export.
get_mam_gdpr_data(#jid{lserver = LServer} = Jid, Type) ->
    OwnerBin = jid:to_binary(jid:to_lower(Jid)),
    Query = <<"msg_owner_jid_register:", OwnerBin/binary,
              " AND mam_type_register:", Type/binary>>,
    {ok, _Count, _Rows, Found} =
        fold_archive(LServer, fun get_msg_id_key/3, Query, [], []),
    get_messages(LServer, Found).
360 |
|
|
361 |
|
%% Hook handler: remove the archive of ArchiveJID and hand the hook
%% accumulator back unchanged.
remove_archive(HookAcc, Host, _ArchiveID, ArchiveJID) ->
    remove_archive(Host, ArchiveJID),
    HookAcc.
364 |
|
|
365 |
|
%% Remove the archive of ArchiveJID chunk by chunk.
%% The first chunk is removed here; do_remove_archive/4 is then given a budget
%% of 100 further rounds. When it reports `stopped', a warning with the number
%% of processed items and the total reported by the first search is logged.
%% Always returns `ok'.
remove_archive(Host, ArchiveJID) ->
    {ok, TotalCount, _, _} = FirstChunk = remove_chunk(Host, ArchiveJID, 0),
    case do_remove_archive(100, FirstChunk, Host, ArchiveJID) of
        {stopped, Processed} ->
            ?LOG_WARNING(#{what => remove_archive_failed,
                           text => <<"archive removal stopped for jid after "
                                     "processing {processed_count} items out of {total_count}">>,
                           archive_jid => ArchiveJID,
                           processed_count => Processed, total_count => TotalCount}),
            ok;
        {ok, _Processed} ->
            ok
    end.
379 |
|
|
380 |
|
%% Delete up to 50 of the owner's messages (lowest message ids first).
%% Acc is the number of messages deleted so far; the result is what
%% fold_archive/5 returns, with the updated counter as accumulator.
remove_chunk(Host, ArchiveJID, Acc) ->
    OwnerFilter = key_filters(mod_mam_utils:bare_jid(ArchiveJID)),
    ChunkOpts = [{rows, 50}, {sort, <<"msg_id_register asc">>}],
    fold_archive(Host, fun delete_key_fun/3, OwnerFilter, ChunkOpts, Acc).
386 |
|
|
387 |
|
%% Keep removing chunks until the search reports no remaining results or the
%% budget of N rounds is used up.
%%   N      - number of further chunks that may still be removed
%%   Result - outcome of the previous remove_chunk/3 call
%% Returns `{ok, Deleted}' when the last search reported 0 results and
%% `{stopped, Deleted}' when the budget ran out with results still reported.
%%
%% The completion clause is checked before the exhausted-budget clause, so a
%% removal that finishes exactly on its last permitted round is reported as
%% `ok' rather than as `stopped'.
do_remove_archive(_, {ok, 0, _, Acc}, _, _) ->
    {ok, Acc};
do_remove_archive(0, {ok, _, _, Acc}, _, _) ->
    {stopped, Acc};
do_remove_archive(N, {ok, _TotalResults, _RowsIterated, Acc}, Host, ArchiveJID) ->
    timer:sleep(1000), % give Riak some time to clear after just removed keys
    R = remove_chunk(Host, ArchiveJID, Acc),
    do_remove_archive(N - 1, R, Host, ArchiveJID).
395 |
|
|
396 |
|
%% Fold callback: delete one key (with the `{dw, 2}' option) and increment the
%% deleted-items counter. Crashes unless the delete returns `ok'.
delete_key_fun(Bucket, Key, Deleted) ->
    ok = mongoose_riak:delete(Bucket, Key, [{dw, 2}]),
    Deleted + 1.
399 |
|
|
400 |
|
|
401 |
|
%% Build an object key: the three binaries joined with "/".
key(LocalJID, RemoteJID, MsgId) ->
    <<LocalJID/binary, "/", RemoteJID/binary, "/", MsgId/binary>>.
403 |
|
|
404 |
|
%% Split an object key into its "/"-separated segments.
decode_key(KeyBinary) ->
    binary:split(KeyBinary, [<<"/">>], [global]).
406 |
|
|
407 |
|
%% Search the archive and fold Fun over the matching keys.
%%   OwnerJID / WithJID - bare JIDs as binaries (WithJID may be `undefined')
%%   Start / End        - message id borders (either may be `undefined')
%%   SearchText         - optional full-text filter
%%   SearchOpts         - options passed on to the search (rows, sort, start)
%%   Fun                - `fun(Bucket, Key, Acc) -> NewAcc', folded from `[]'
%% Returns `{TotalCount, Acc}' where TotalCount is the count reported by the
%% search.
%%
%% The function never returns an error tuple: when fold_archive/5 yields
%% `{error, _}' the match below raises `badmatch'. The spec reflects that, so
%% callers are not led to destructure a two-element error tuple as
%% `{Count, Acc}'.
-spec read_archive(jid:server(),
                   binary() | undefined,
                   binary() | undefined,
                   term(),
                   term(),
                   binary() | undefined,
                   [term()],
                   fun()) ->
    {integer(), list()}.
read_archive(Host, OwnerJID, WithJID, Start, End, SearchText, SearchOpts, Fun) ->
    KeyFilters = key_filters(OwnerJID, WithJID, Start, End, SearchText),
    {ok, Cnt, _, NewAcc} = fold_archive(Host, Fun, KeyFilters, SearchOpts, []),
    {Cnt, NewAcc}.
420 |
|
|
421 |
|
|
422 |
|
%% Sort `{MsgId, Bucket, Key}' entries by ascending message id.
sort_messages(Msgs) ->
    lists:keysort(1, Msgs).
427 |
|
|
428 |
|
%% Run Query against the configured search index and fold Fun over the hits.
%% Returns `{ok, TotalCount, NumberOfHits, Acc}' on success. A search error is
%% logged and returned unchanged as `{error, Reason}'.
%% NOTE(review): the log key is named `cql_query' although the query is sent
%% to the Riak search index.
fold_archive(Host, Fun, Query, SearchOpts, InitialAcc) ->
    Index = yz_search_index(Host),
    case mongoose_riak:search(Index, Query, SearchOpts) of
        {ok, {search_results, Hits, _MaxScore, Count}} ->
            %% With no hits this yields `{ok, Count, 0, InitialAcc}'.
            {ok, Count, length(Hits), do_fold_archive(Fun, Hits, InitialAcc)};
        {error, Reason} = Failure ->
            ?LOG_WARNING(#{what => mam_read_error,
                           cql_query => Query, reason => Reason}),
            Failure
    end.
440 |
|
|
441 |
|
%% Apply Fun to every search hit, left to right.
%% Each hit is `{Index, Props}'; the bucket (`_yz_rb'), bucket type (`_yz_rt')
%% and key (`_yz_rk') are read from Props and Fun is called as
%% `Fun({Type, Bucket}, Key, Acc)'.
do_fold_archive(_Fun, [], Acc) ->
    Acc;
do_fold_archive(Fun, [{_Index, Props} | Rest], Acc) ->
    {_, Bucket} = lists:keyfind(<<"_yz_rb">>, 1, Props),
    {_, Type} = lists:keyfind(<<"_yz_rt">>, 1, Props),
    {_, Key} = lists:keyfind(<<"_yz_rk">>, 1, Props),
    do_fold_archive(Fun, Rest, Fun({Type, Bucket}, Key, Acc)).
448 |
|
|
449 |
|
%% Filter API |
450 |
|
%% Query matching every message in the archive of LocalJid: no remote JID,
%% id borders or search text are applied.
key_filters(LocalJid) ->
    NoFilter = undefined,
    key_filters(LocalJid, NoFilter, NoFilter, NoFilter, NoFilter).
452 |
|
|
453 |
|
%% Build the search query for an archive lookup.
%% The JID filter, the message id filter and the search text filter are
%% computed in that order; filters that are not binaries (i.e. `undefined')
%% are dropped and the remaining ones are joined with " AND ".
key_filters(LocalJid, RemoteJid, Start, End, SearchText) ->
    Candidates = [jid_filters(LocalJid, RemoteJid),
                  id_filters(Start, End),
                  search_text_filter(SearchText)],
    Active = [Filter || Filter <- Candidates, is_binary(Filter)],
    iolist_to_binary(lists:join(<<" AND ">>, Active)).
463 |
|
|
464 |
|
%% Filter helpers |
465 |
|
%% Build the full-text part of the query, or `undefined' when no search text
%% was given (the caller drops non-binary filters).
%% The normalized text is produced by mod_mam_utils:normalize_search_text/2
%% using "~1 AND search_text_register:" as the separator; the result is
%% prefixed with "search_text_register:" and suffixed with "~1".
-spec search_text_filter(binary() | undefined) -> binary() | undefined.
search_text_filter(undefined) ->
    undefined;
search_text_filter(SearchText) ->
    Separator = <<"~1 AND search_text_register:">>,
    NormText = mod_mam_utils:normalize_search_text(SearchText, Separator),
    %% Fuzzy search on tokens from search phrase
    <<"search_text_register:", NormText/binary, "~1">>.
473 |
|
|
474 |
|
%% Key filter for the JID part of the object key
%% (`LocalJid/RemoteJid/MsgId'). Without a remote JID a wildcard takes its
%% place; the message id segment is always a wildcard.
jid_filters(LocalJid, undefined) ->
    jid_filters(LocalJid, <<"*">>);
jid_filters(LocalJid, RemoteJid) ->
    <<"_yz_rk:", LocalJid/binary, "/", RemoteJid/binary, "/*">>.
478 |
|
|
479 |
|
%% Message id part of the query.
%%   * no borders at all  -> `undefined' (no filter),
%%   * identical borders  -> exact match on that id,
%%   * otherwise          -> a range in which a missing border becomes "*".
id_filters(undefined, undefined) ->
    undefined;
id_filters(MsgId, MsgId) ->
    <<"msg_id_register:", (integer_to_binary(MsgId))/binary>>;
id_filters(StartInt, EndInt) ->
    solr_id_filters(id_bound(StartInt), id_bound(EndInt)).

%% Textual form of one range border.
id_bound(undefined) ->
    <<"*">>;
id_bound(Int) ->
    integer_to_binary(Int).
490 |
|
|
491 |
|
%% Range filter on the message id register; both borders are given as
%% binaries (a number or "*").
solr_id_filters(From, To) ->
    Range = <<"[", From/binary, " TO ", To/binary, " ]">>,
    <<"msg_id_register:", Range/binary>>.
493 |
|
|
494 |
|
%% ---------------------------------------------------------------------- |
495 |
|
%% Optimizations |
496 |
|
|
497 |
|
%% Encode a packet for storage with the codec module configured for Host.
packet_to_stored_binary(Host, Packet) ->
    mam_message:encode(db_message_codec(Host), Packet).
500 |
|
|
501 |
|
%% Decode a stored binary back into a packet with the codec module configured
%% for Host.
stored_binary_to_packet(Host, Bin) ->
    mam_message:decode(db_message_codec(Host), Bin).
504 |
|
|
505 |
|
%% Codec module taken from the `db_message_format' option of this module.
-spec db_message_codec(Host :: jid:server()) -> module().
db_message_codec(Host) ->
    OptName = db_message_format,
    gen_mod:get_module_opt(Host, ?MODULE, OptName).