1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2015 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
-module(mod_mam_riak_timed_arch_yz). |
17 |
|
|
18 |
|
-behaviour(ejabberd_gen_mam_archive). |
19 |
|
-behaviour(gen_mod). |
20 |
|
-behaviour(mongoose_module_metrics). |
21 |
|
|
22 |
|
-callback encode(term()) -> binary(). |
23 |
|
-callback decode(binary()) -> term(). |
24 |
|
|
25 |
|
|
26 |
|
-include("mongoose.hrl"). |
27 |
|
-include("jlib.hrl"). |
28 |
|
-include("mongoose_rsm.hrl"). |
29 |
|
|
30 |
|
%% API |
31 |
|
-export([start/2, |
32 |
|
stop/1, |
33 |
|
archive_size/4, |
34 |
|
lookup_messages/2, |
35 |
|
remove_archive/4]). |
36 |
|
|
37 |
|
-export([archive_message/3, |
38 |
|
archive_message_muc/3, |
39 |
|
lookup_messages/3, |
40 |
|
lookup_messages_muc/3]). |
41 |
|
|
42 |
|
-export([key/3]). |
43 |
|
|
44 |
|
%% For tests only |
45 |
|
-export([create_obj/6, read_archive/8, bucket/2, |
46 |
|
list_mam_buckets/1, remove_bucket/1]). |
47 |
|
|
48 |
|
-export([get_mam_muc_gdpr_data/3, get_mam_pm_gdpr_data/3]). |
49 |
|
|
50 |
|
-ignore_xref([archive_message_muc/3, behaviour_info/1, bucket/2, create_obj/6, |
51 |
|
key/3, list_mam_buckets/1, lookup_messages/2, lookup_messages_muc/3, |
52 |
|
read_archive/8, remove_archive/4, remove_bucket/1]). |
53 |
|
|
54 |
|
-type yearweeknum() :: {non_neg_integer(), 1..53}. |
55 |
|
|
56 |
|
%% @doc Start the module for `Host'.
%%
%% The archives to enable are read from the module configuration:
%% - `pm' enables one-to-one chat archives
%% - `muc' enables multichat archives
%%
%% Set both `pm' and `muc' to archive private as well as MUC messages.
start(Host, Opts) ->
    PmEnabled = gen_mod:get_module_opt(Host, ?MODULE, pm, false),
    case PmEnabled of
        false -> ok;
        true -> start_chat_archive(Host, Opts)
    end,
    MucEnabled = gen_mod:get_module_opt(Host, ?MODULE, muc, false),
    case MucEnabled of
        false -> ok;
        true -> start_muc_archive(Host, Opts)
    end.
76 |
|
|
77 |
|
%% Registers the one-to-one (PM) archive handlers of this module on `Host'.
%% Every handler is added with priority 50.
start_chat_archive(Host, _Opts) ->
    Add = fun(Hook, Handler) ->
                  ejabberd_hooks:add(Hook, Host, ?MODULE, Handler, 50)
          end,
    Add(mam_archive_message, archive_message),
    Add(mam_archive_size, archive_size),
    Add(mam_lookup_messages, lookup_messages),
    Add(mam_remove_archive, remove_archive),
    Add(get_mam_pm_gdpr_data, get_mam_pm_gdpr_data).
83 |
|
|
84 |
|
%% Registers the MUC archive handlers of this module on `Host'.
%% Every handler is added with priority 50.
start_muc_archive(Host, _Opts) ->
    Add = fun(Hook, Handler) ->
                  ejabberd_hooks:add(Hook, Host, ?MODULE, Handler, 50)
          end,
    Add(mam_muc_archive_message, archive_message_muc),
    Add(mam_muc_archive_size, archive_size),
    Add(mam_muc_lookup_messages, lookup_messages_muc),
    Add(mam_muc_remove_archive, remove_archive),
    Add(get_mam_muc_gdpr_data, get_mam_muc_gdpr_data).
90 |
|
|
91 |
|
%% Stops the module for `Host': each archive whose option (`pm' / `muc')
%% is enabled in the module configuration gets its hooks removed.
stop(Host) ->
    PmEnabled = gen_mod:get_module_opt(Host, ?MODULE, pm, false),
    case PmEnabled of
        false -> ok;
        true -> stop_chat_archive(Host)
    end,
    MucEnabled = gen_mod:get_module_opt(Host, ?MODULE, muc, false),
    case MucEnabled of
        false -> ok;
        true -> stop_muc_archive(Host)
    end.
104 |
|
|
105 |
|
%% Removes the one-to-one (PM) archive handlers from `Host'.
%%
%% The {Hook, Function} pairs must be exactly those registered by
%% start_chat_archive/2. A delete with a different function name does not
%% refer to the handler that was added, so the PM hooks are paired with
%% archive_message / lookup_messages here (not with the *_muc variants).
stop_chat_archive(Host) ->
    ejabberd_hooks:delete(mam_archive_message, Host, ?MODULE, archive_message, 50),
    ejabberd_hooks:delete(mam_archive_size, Host, ?MODULE, archive_size, 50),
    ejabberd_hooks:delete(mam_lookup_messages, Host, ?MODULE, lookup_messages, 50),
    ejabberd_hooks:delete(mam_remove_archive, Host, ?MODULE, remove_archive, 50),
    ejabberd_hooks:delete(get_mam_pm_gdpr_data, Host, ?MODULE, get_mam_pm_gdpr_data, 50),
    ok.
112 |
|
|
113 |
|
%% Removes the MUC archive handlers from `Host'.
%%
%% The {Hook, Function} pairs must be exactly those registered by
%% start_muc_archive/2. A delete with a different function name does not
%% refer to the handler that was added, so the MUC hooks are paired with
%% archive_message_muc / lookup_messages_muc here.
stop_muc_archive(Host) ->
    ejabberd_hooks:delete(mam_muc_archive_message, Host, ?MODULE, archive_message_muc, 50),
    ejabberd_hooks:delete(mam_muc_archive_size, Host, ?MODULE, archive_size, 50),
    ejabberd_hooks:delete(mam_muc_lookup_messages, Host, ?MODULE, lookup_messages_muc, 50),
    ejabberd_hooks:delete(mam_muc_remove_archive, Host, ?MODULE, remove_archive, 50),
    ejabberd_hooks:delete(get_mam_muc_gdpr_data, Host, ?MODULE, get_mam_muc_gdpr_data, 50),
    ok.
120 |
|
|
121 |
|
%% Name of the search index used for archive queries; taken from the
%% `search_index' module option, falling back to <<"mam">>.
yz_search_index(Host) ->
    DefaultIndex = <<"mam">>,
    gen_mod:get_module_opt(Host, ?MODULE, search_index, DefaultIndex).
123 |
|
|
124 |
|
%% Bucket type under which archive buckets live; taken from the
%% `bucket_type' module option, falling back to <<"mam_yz">>.
mam_bucket_type(Host) ->
    DefaultType = <<"mam_yz">>,
    gen_mod:get_module_opt(Host, ?MODULE, bucket_type, DefaultType).
126 |
|
|
127 |
|
%% Hook handler storing a one-to-one message.
%%
%% LocJID - archive owner's JID (also passed on as the owner JID)
%% RemJID - interlocutor's JID
%% SrcJID - "Real" sender JID
%%
%% Any exception raised while writing is logged as a warning, counted in
%% the modMamDropped metric and turned into {error, Reason}.
archive_message(_Result, Host, Params = #{packet := Packet,
                                          message_id := MessId,
                                          source_jid := SrcJID,
                                          remote_jid := RemJID,
                                          local_jid := LocJID}) ->
    try
        archive_message(Host, MessId, LocJID, RemJID, SrcJID, LocJID, Packet, pm)
    catch
        ErrClass:ErrReason:Trace ->
            Details = #{what => archive_message_failed,
                        text => <<"Could not write message to archive">>,
                        class => ErrClass,
                        reason => ErrReason,
                        stacktrace => Trace},
            ?LOG_WARNING(maps:merge(Params, Details)),
            mongoose_metrics:update(Host, modMamDropped, 1),
            {error, ErrReason}
    end.
145 |
|
|
146 |
|
%% Hook handler storing a MUC / MUC Light message.
%%
%% LocJID  - MUC/MUC Light room's JID
%% FromJID - "Real" sender JID (passed on as the owner JID)
%% SrcJID  - Full JID of user within room (room@domain/user)
%%
%% Any exception raised while writing is logged as a warning, counted in
%% the modMamDropped metric and turned into {error, Reason}.
archive_message_muc(_Result, Host, Params = #{packet := Packet,
                                              message_id := MessId,
                                              source_jid := SrcJID,
                                              remote_jid := FromJID,
                                              local_jid := LocJID}) ->
    %% Converted outside of the try, so a failure here is not caught below.
    NickJID = maybe_muc_jid(SrcJID),
    try
        archive_message(Host, MessId, LocJID, NickJID, SrcJID, FromJID, Packet, muc)
    catch
        ErrClass:ErrReason:Trace ->
            Details = #{what => archive_muc_message_failed,
                        text => <<"Could not write MUC message to archive">>,
                        class => ErrClass,
                        reason => ErrReason,
                        stacktrace => Trace},
            ?LOG_WARNING(maps:merge(Params, Details)),
            mongoose_metrics:update(Host, modMamDropped, 1),
            {error, ErrReason}
    end.
165 |
|
|
166 |
|
%% For a #jid{} record, returns a 3-tuple holding only the lowercase
%% resource in the middle position, with empty binaries around it.
%% Any other term is returned untouched.
maybe_muc_jid(JID) ->
    case JID of
        #jid{lresource = Nick} -> {<<>>, Nick, <<>>};
        _ -> JID
    end.
170 |
|
|
171 |
|
|
172 |
|
%% Hook handler for message lookup. An {error, _} accumulator is passed
%% through unchanged; otherwise lookup_messages/2 runs and any exception
%% becomes {error, {Reason, {stacktrace, Stacktrace}}}.
lookup_messages(Result = {error, _Reason}, _Host, _Params) ->
    Result;
lookup_messages(_Result, Host, Params) ->
    try lookup_messages(Host, Params) of
        Found -> Found
    catch
        _Class:Reason:Stack ->
            {error, {Reason, {stacktrace, Stack}}}
    end.
180 |
|
|
181 |
|
|
182 |
|
%% MUC flavour of the lookup hook handler: `with_jid' is converted with
%% maybe_muc_jid/1 before delegating to lookup_messages/3.
lookup_messages_muc(Result, Host, Params = #{with_jid := WithJID}) ->
    MucParams = Params#{with_jid => maybe_muc_jid(WithJID)},
    lookup_messages(Result, Host, MucParams).
185 |
|
|
186 |
|
|
187 |
|
%% Hook handler returning the number of archived messages of ArchiveJID.
%% The count is the total reported by a search over the owner's keys with
%% no remote JID and no search text; only one row is requested because
%% the rows themselves are discarded.
archive_size(_Size, Host, _ArchiveID, ArchiveJID) ->
    Owner = mod_mam_utils:bare_jid(ArchiveJID),
    {FromId, ToId} =
        mod_mam_utils:calculate_msg_id_borders(undefined, undefined, undefined),
    Collect = fun get_msg_id_key/3,
    {Count, _Rows} = read_archive(Host, Owner, undefined,
                                  FromId, ToId, undefined,
                                  [{rows, 1}], Collect),
    Count.
197 |
|
|
198 |
|
%% Pick the bucket matching a given date.
%%
%% Accepts a message id (integer), a calendar date or an ISO
%% {Year, Week} pair and returns {BucketType, <<"mam_Year_Week">>}.
%% Anything else yields `undefined'.
-spec bucket(jid:server(), calendar:date() | yearweeknum() | integer()) ->
    {binary(), binary()} | undefined.
bucket(Host, MsgId) when is_integer(MsgId) ->
    %% the timestamp is encoded in the message id
    {Micros, _} = mod_mam_utils:decode_compact_uuid(MsgId),
    {Date, _Time} = calendar:system_time_to_universal_time(Micros, microsecond),
    bucket(Host, Date);
bucket(Host, Date = {_, _, _}) ->
    YearWeek = calendar:iso_week_number(Date),
    bucket(Host, YearWeek);
bucket(Host, {Year, Week}) ->
    Name = iolist_to_binary([<<"mam_">>, integer_to_binary(Year),
                             <<"_">>, integer_to_binary(Week)]),
    {mam_bucket_type(Host), Name};
bucket(_Host, _Other) ->
    undefined.
214 |
|
|
215 |
|
%% Lists all buckets of the configured MAM bucket type as
%% {BucketType, BucketName} pairs. Crashes with badmatch if the listing
%% does not return {ok, _}.
list_mam_buckets(Host) ->
    BucketType = mam_bucket_type(Host),
    Worker = mongoose_riak:get_worker(),
    {ok, Names} = riakc_pb_socket:list_buckets(Worker, BucketType),
    [{BucketType, Name} || Name <- Names].
219 |
|
|
220 |
|
|
221 |
|
%% Deletes every key listed for Bucket and returns the list of the
%% individual delete results.
remove_bucket(Bucket) ->
    {ok, AllKeys} = mongoose_riak:list_keys(Bucket),
    DeleteKey = fun(Key) -> mongoose_riak:delete(Bucket, Key) end,
    lists:map(DeleteKey, AllKeys).
224 |
|
|
225 |
|
|
226 |
|
%% Writes one message to the archive.
%%
%% PM:
%%  * LocJID - archive owner's JID
%%  * RemJID - interlocutor's JID
%%  * SrcJID - "Real" sender JID
%%  * OwnerJID - Same as LocJID
%% MUC / MUC Light:
%%  * LocJID - MUC/MUC Light room's JID
%%  * RemJID - Nickname of JID of destination
%%  * SrcJID - Full JID of user within room (room@domain/user)
%%  * OwnerJID - "Real" sender JID (not room specific)
%%
%% Returns `ok'; any other result of the update is thrown.
archive_message(Host, MessID, LocJID, RemJID, SrcJID, OwnerJID, Packet, Type) ->
    Local = mod_mam_utils:bare_jid(LocJID),
    Remote = mod_mam_utils:bare_jid(RemJID),
    Source = mod_mam_utils:full_jid(SrcJID),
    Owner = mod_mam_utils:bare_jid(OwnerJID),
    MsgIdBin = integer_to_binary(MessID),
    ObjKey = key(Local, Remote, MsgIdBin),
    %% the bucket is derived from the (integer) message id
    ObjBucket = bucket(Host, MessID),
    Obj = create_obj(Host, MsgIdBin, Source, Owner, Packet, Type),
    UpdateOp = riakc_map:to_op(Obj),
    case mongoose_riak:update_type(ObjBucket, ObjKey, UpdateOp) of
        ok -> ok;
        Failure -> throw(Failure)
    end.
251 |
|
|
252 |
|
%% Builds the map object stored for one message. The registers set are:
%% msg_id, source_jid, msg_owner_jid, mam_type, packet and search_text.
%% Type must be `pm' or `muc'.
create_obj(Host, MsgId, SourceJID, BareOwnerJID, Packet, Type) ->
    MamModule = case Type of
                    muc -> mod_mam_muc;
                    pm -> mod_mam
                end,
    SearchChars = mod_mam_utils:packet_to_search_body(MamModule, Host, Packet),
    SearchBin = unicode:characters_to_binary(SearchChars),
    SetMsgId = fun(Reg) -> riakc_register:set(MsgId, Reg) end,
    SetSource = fun(Reg) -> riakc_register:set(SourceJID, Reg) end,
    SetOwner = fun(Reg) -> riakc_register:set(BareOwnerJID, Reg) end,
    SetType = fun(Reg) -> riakc_register:set(atom_to_binary(Type, latin1), Reg) end,
    %% the packet is encoded only when the operation is applied
    SetPacket = fun(Reg) ->
                        riakc_register:set(packet_to_stored_binary(Host, Packet), Reg)
                end,
    SetSearch = fun(Reg) -> riakc_register:set(SearchBin, Reg) end,
    mongoose_riak:create_new_map([{{<<"msg_id">>, register}, SetMsgId},
                                  {{<<"source_jid">>, register}, SetSource},
                                  {{<<"msg_owner_jid">>, register}, SetOwner},
                                  {{<<"mam_type">>, register}, SetType},
                                  {{<<"packet">>, register}, SetPacket},
                                  {{<<"search_text">>, register}, SetSearch}]).
276 |
|
|
277 |
|
%% Dispatches a lookup: when the RSM carries a direction (`before' or
%% `aft') together with an id, the page lookup is used; every other
%% request goes straight to do_lookup_messages/2.
lookup_messages(Host, Params = #{rsm := RSM = #rsm_in{direction = Dir, id = ID}})
  when ID =/= undefined, (Dir =:= before orelse Dir =:= aft) ->
    lookup_message_page(Host, RSM, Params);
lookup_messages(Host, Params) ->
    do_lookup_messages(Host, Params).
285 |
|
|
286 |
|
%% Runs the lookup with the page size increased by one and hands the
%% result, together with the original page size, to
%% mod_mam_utils:check_for_item_not_found/3.
lookup_message_page(Host, RSM, Params) ->
    Limit = maps:get(page_size, Params),
    Widened = Params#{page_size := Limit + 1},
    {ok, Page} = do_lookup_messages(Host, Widened),
    mod_mam_utils:check_for_item_not_found(RSM, Limit, Page).
290 |
|
|
291 |
|
%% Performs the actual archive query described by Params.
%%
%% Returns {ok, {TotalCount, Offset, Messages}}. When `is_simple' is
%% `true', TotalCount and Offset are both `undefined' and the extra count
%% query is skipped.
do_lookup_messages(Host, Params) ->
    Owner = mod_mam_utils:bare_jid(maps:get(owner_jid, Params)),
    Remote = mod_mam_utils:bare_jid(maps:get(with_jid, Params)),
    RSM = maps:get(rsm, Params),
    PageOpts = [{rows, maps:get(page_size, Params)}],
    SearchOpts = add_offset(RSM, add_sorting(RSM, PageOpts)),
    Collect = fun get_msg_id_key/3,
    Borders = maps:get(borders, Params),
    StartTs = maps:get(start_ts, Params),
    EndTs = maps:get(end_ts, Params),
    Text = maps:get(search_text, Params),
    {FromId, ToId} =
        mod_mam_utils:calculate_msg_id_borders(RSM, Borders, StartTs, EndTs),
    {PageQueryCount, Found} = read_archive(Host, Owner, Remote,
                                           FromId, ToId, Text,
                                           SearchOpts, Collect),
    Sorted = sort_messages(Found),
    IsSimple = maps:get(is_simple, Params),
    case IsSimple of
        true ->
            {ok, {undefined, undefined, get_messages(Host, Sorted)}};
        _ ->
            %% second query with the RSM restriction left out, used only
            %% for its total count
            {FromIdNoRSM, ToIdNoRSM} =
                mod_mam_utils:calculate_msg_id_borders(Borders, StartTs, EndTs),
            {Total, _} = read_archive(Host, Owner, Remote,
                                      FromIdNoRSM, ToIdNoRSM, Text,
                                      [{rows, 1}], Collect),
            PageLen = length(Sorted),
            OffsetArgs = {Host, Owner, Remote, FromIdNoRSM, Text},
            Offset = calculate_offset(RSM, PageQueryCount, PageLen, OffsetArgs),
            {ok, {Total, Offset, get_messages(Host, Sorted)}}
    end.
326 |
|
|
327 |
|
|
328 |
|
%% Prepends the sort option: descending by message id for a `before'
%% RSM request, ascending for everything else.
add_sorting(RSM, Opts) ->
    Order = case RSM of
                #rsm_in{direction = before} -> <<"msg_id_register desc">>;
                _ -> <<"msg_id_register asc">>
            end,
    [{sort, Order} | Opts].
332 |
|
|
333 |
|
%% Prepends a {start, Index} option when the RSM carries an integer
%% index; otherwise the options are returned as they are.
add_offset(RSM, Opts) ->
    case RSM of
        #rsm_in{index = Start} when is_integer(Start) ->
            [{start, Start} | Opts];
        _ ->
            Opts
    end.
337 |
|
|
338 |
|
%% Computes the offset of the returned page within the full result set.
%%  * `before' requests: total count minus page size
%%  * `aft' requests with an id: count reported by a query bounded by the
%%    start id and that id
%%  * index based requests (no direction): the index itself
%%  * anything else: 0
calculate_offset(#rsm_in{direction = before}, Total, PageLen, _Args) ->
    Total - PageLen;
calculate_offset(#rsm_in{direction = aft, id = AfterId}, _Total, _PageLen,
                 {Host, Owner, Remote, FromId, Text})
  when AfterId =/= undefined ->
    {Preceding, _Rows} = read_archive(Host, Owner, Remote,
                                      FromId, AfterId, Text,
                                      [{rows, 1}], fun get_msg_id_key/3),
    Preceding;
calculate_offset(#rsm_in{direction = undefined, index = Position}, _Total, _PageLen, _Args)
  when is_integer(Position) ->
    Position;
calculate_offset(_RSM, _Total, _PageLen, _Args) ->
    0.
352 |
|
|
353 |
|
%% Fold callback: prepends {MsgId, Bucket, Key} to the accumulator, where
%% MsgId is the integer parsed from the third "/"-separated part of Key.
get_msg_id_key(Bucket, Key, Msgs) ->
    [_Local, _Remote, MsgIdBin] = decode_key(Key),
    [{binary_to_integer(MsgIdBin), Bucket, Key} | Msgs].
357 |
|
|
358 |
|
%% Fetches the message for every {MsgId, Bucket, Key} entry. Entries whose
%% fetch yields [] disappear from the result because it is flattened.
get_messages(Host, BucketKeys) ->
    Fetched = [get_message2(Host, Id, Bucket, Key) || {Id, Bucket, Key} <- BucketKeys],
    lists:flatten(Fetched).
360 |
|
|
361 |
|
%% Fetches one stored object and converts it into
%% #{id, jid, packet}. Returns [] when the fetch does not return {ok, _}.
get_message2(Host, MsgId, Bucket, Key) ->
    case mongoose_riak:fetch_type(Bucket, Key) of
        {ok, Obj} ->
            SenderBin = riakc_map:fetch({<<"source_jid">>, register}, Obj),
            StoredPacket = riakc_map:fetch({<<"packet">>, register}, Obj),
            Decoded = stored_binary_to_packet(Host, StoredPacket),
            #{id => MsgId,
              jid => jid:from_binary(SenderBin),
              packet => Decoded};
        _NotFetched ->
            []
    end.
371 |
|
%% Hook handler collecting the one-to-one messages owned by OwnerJid as
%% {Id, JidBinary, PacketBinary} tuples, put in front of Acc.
-spec get_mam_pm_gdpr_data(ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
                           mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_pm_gdpr_data().
get_mam_pm_gdpr_data(Acc, _HostType, OwnerJid) ->
    Stored = get_mam_gdpr_data(OwnerJid, <<"pm">>),
    Rows = [{Id, jid:to_binary(Sender), exml:to_binary(Stanza)}
            || #{id := Id, jid := Sender, packet := Stanza} <- Stored],
    Rows ++ Acc.
377 |
|
|
378 |
|
%% Hook handler collecting the MUC messages owned by JID as
%% {MsgId, PacketBinary} tuples, put in front of Acc.
-spec get_mam_muc_gdpr_data(ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
                            mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_muc_gdpr_data().
get_mam_muc_gdpr_data(Acc, _HostType, JID) ->
    Stored = get_mam_gdpr_data(JID, <<"muc">>),
    Rows = [{Id, exml:to_binary(Stanza)}
            || #{id := Id, packet := Stanza} <- Stored],
    Rows ++ Acc.
384 |
|
|
385 |
|
%% Searches for all messages whose msg_owner_jid register equals the
%% lowercased JID and whose mam_type register equals Type, then fetches
%% them. The JID's lserver is used as the host.
get_mam_gdpr_data(#jid{lserver = LServer} = BareJid, Type) ->
    OwnerBin = jid:to_binary(jid:to_lower(BareJid)),
    Query = <<"msg_owner_jid_register:", OwnerBin/binary,
              " AND mam_type_register:", Type/binary>>,
    {ok, _Count, _Rows, Keys} =
        fold_archive(LServer, fun get_msg_id_key/3, Query, [], []),
    get_messages(LServer, Keys).
391 |
|
|
392 |
|
%% Hook handler: removes the archive of ArchiveJID and returns the hook
%% accumulator unchanged.
remove_archive(Acc, Host, _ArchiveID, ArchiveJID) ->
    _ = remove_archive(Host, ArchiveJID),
    Acc.
395 |
|
|
396 |
|
%% Removes the archive of ArchiveJID chunk by chunk. After the first
%% chunk at most 100 further rounds are run; if that budget is used up a
%% warning with the processed and total counts is logged. Always
%% returns `ok'.
remove_archive(Host, ArchiveJID) ->
    FirstChunk = remove_chunk(Host, ArchiveJID, 0),
    {ok, TotalCount, _, _} = FirstChunk,
    case do_remove_archive(100, FirstChunk, Host, ArchiveJID) of
        {ok, _Processed} ->
            ok;
        {stopped, Processed} ->
            ?LOG_WARNING(#{what => remove_archive_failed,
                           text => <<"archive removal stopped for jid after "
                                     "processing {processed_count} items out of {total_count}">>,
                           archive_jid => ArchiveJID,
                           processed_count => Processed,
                           total_count => TotalCount}),
            ok
    end.
410 |
|
|
411 |
|
%% Deletes up to 50 of the owner's messages (lowest message ids first)
%% and returns the fold_archive/5 result; Acc counts deleted keys.
remove_chunk(Host, ArchiveJID, Acc) ->
    Owner = mod_mam_utils:bare_jid(ArchiveJID),
    Query = key_filters(Owner),
    ChunkOpts = [{rows, 50}, {sort, <<"msg_id_register asc">>}],
    fold_archive(Host, fun delete_key_fun/3, Query, ChunkOpts, Acc).
417 |
|
|
418 |
|
%% Removal loop. Stops with {stopped, Deleted} once the round budget is
%% exhausted, finishes with {ok, Deleted} when the last search reported
%% zero results, and otherwise removes another chunk.
do_remove_archive(0, {ok, _Total, _Rows, Deleted}, _Host, _ArchiveJID) ->
    {stopped, Deleted};
do_remove_archive(_RoundsLeft, {ok, 0, _Rows, Deleted}, _Host, _ArchiveJID) ->
    {ok, Deleted};
do_remove_archive(RoundsLeft, {ok, _Total, _Rows, Deleted}, Host, ArchiveJID) ->
    %% give Riak some time to clear after just removed keys
    timer:sleep(1000),
    NextChunk = remove_chunk(Host, ArchiveJID, Deleted),
    do_remove_archive(RoundsLeft - 1, NextChunk, Host, ArchiveJID).
426 |
|
|
427 |
|
%% Fold callback: deletes Key from Bucket (with the {dw, 2} option) and
%% increments the counter. Crashes unless the delete returns `ok'.
delete_key_fun(Bucket, Key, Count) ->
    DeleteOpts = [{dw, 2}],
    ok = mongoose_riak:delete(Bucket, Key, DeleteOpts),
    Count + 1.
430 |
|
|
431 |
|
|
432 |
|
%% Builds the object key <<"LocalJID/RemoteJID/MsgId">> from three
%% binaries.
key(LocalJID, RemoteJID, MsgId) ->
    <<LocalJID/binary, "/", RemoteJID/binary, "/", MsgId/binary>>.
434 |
|
|
435 |
|
%% Splits a key at every "/" and returns the list of its parts.
decode_key(KeyBinary) ->
    Separator = <<"/">>,
    binary:split(KeyBinary, Separator, [global]).
437 |
|
|
438 |
|
%% Builds the query from the given filters, folds Fun over the matching
%% entries (starting from an empty list) and returns
%% {TotalCount, Accumulator}. A failed search results in a badmatch.
-spec read_archive(jid:server(),
                   binary() | undefined,
                   binary() | undefined,
                   term(),
                   term(),
                   binary() | undefined,
                   [term()],
                   fun()) ->
    {integer(), list()} | {error, term()}.
read_archive(Host, OwnerJID, WithJID, Start, End, SearchText, SearchOpts, Fun) ->
    Query = key_filters(OwnerJID, WithJID, Start, End, SearchText),
    {ok, Total, _RowsSeen, Collected} = fold_archive(Host, Fun, Query, SearchOpts, []),
    {Total, Collected}.
451 |
|
|
452 |
|
|
453 |
|
%% Sorts {MsgId, Bucket, Key} entries by ascending message id; entries
%% with equal ids keep their relative order.
sort_messages(Msgs) ->
    lists:keysort(1, Msgs).
458 |
|
|
459 |
|
%% Runs Query against the host's search index and folds Fun over the
%% returned rows.
%%
%% Returns {ok, TotalCount, RowsInThisResult, Acc}, or the {error, _}
%% tuple of the search after logging a warning.
fold_archive(Host, Fun, Query, SearchOpts, InitialAcc) ->
    Index = yz_search_index(Host),
    case mongoose_riak:search(Index, Query, SearchOpts) of
        {ok, {search_results, Rows, _Score, Total}} ->
            %% an empty row list gives 0 rows and the untouched accumulator
            {ok, Total, length(Rows), do_fold_archive(Fun, Rows, InitialAcc)};
        {error, Reason} = Failure ->
            ?LOG_WARNING(#{what => mam_read_error,
                           cql_query => Query,
                           reason => Reason}),
            Failure
    end.
471 |
|
|
472 |
|
%% Folds Fun({BucketType, Bucket}, Key, Acc) over search rows. Each row is
%% {Index, Props}; bucket, type and key are read from the <<"_yz_rb">>,
%% <<"_yz_rt">> and <<"_yz_rk">> properties, all of which must be present.
do_fold_archive(Fun, BucketKeys, InitialAcc) ->
    Step = fun({_Index, Props}, Acc) ->
                   {_, Bucket} = lists:keyfind(<<"_yz_rb">>, 1, Props),
                   {_, BucketType} = lists:keyfind(<<"_yz_rt">>, 1, Props),
                   {_, Key} = lists:keyfind(<<"_yz_rk">>, 1, Props),
                   Fun({BucketType, Bucket}, Key, Acc)
           end,
    lists:foldl(Step, InitialAcc, BucketKeys).
479 |
|
|
480 |
|
%% Filter API

%% Query matching every message of LocalJid: no remote JID, no id range
%% and no search text.
key_filters(LocalJid) ->
    NoFilter = undefined,
    key_filters(LocalJid, NoFilter, NoFilter, NoFilter, NoFilter).
483 |
|
|
484 |
|
%% Builds the search query for the given restrictions.
%%
%% The JID, message id and search text filters are computed separately;
%% those that are binaries (a filter helper returns `undefined' when its
%% restriction is absent) are combined with " AND ".
%%
%% Joining the filters directly avoids the previous append-then-trim
%% approach, which relied on binary:part/3 and would raise badarg if no
%% filter were present.
key_filters(LocalJid, RemoteJid, Start, End, SearchText) ->
    JidFilter = jid_filters(LocalJid, RemoteJid),
    IdFilter = id_filters(Start, End),
    TextFilter = search_text_filter(SearchText),
    Active = [Filter || Filter <- [JidFilter, IdFilter, TextFilter],
                        is_binary(Filter)],
    iolist_to_binary(lists:join(<<" AND ">>, Active)).
494 |
|
|
495 |
|
%% Filter helpers |
496 |
|
%% Builds the full text part of the query, or `undefined' when no search
%% text was given (the spec now reflects that second outcome).
%%
%% The text is normalized by mod_mam_utils:normalize_search_text/2 with a
%% separator that closes one fuzzy term ("~1") and opens the next
%% "search_text_register:" term; the last term is closed here.
-spec search_text_filter(binary() | undefined) -> binary() | undefined.
search_text_filter(undefined) ->
    undefined;
search_text_filter(SearchText) ->
    Separator = <<"~1 AND search_text_register:">>,
    NormText = mod_mam_utils:normalize_search_text(SearchText, Separator),
    %% Fuzzy search on tokens from search phrase
    <<"search_text_register:", NormText/binary, "~1">>.
504 |
|
|
505 |
|
%% Key filter of the form <<"_yz_rk:Local/Remote/*">>; an undefined
%% remote JID becomes the wildcard "*".
jid_filters(LocalJid, RemoteJid) ->
    RemotePart = case RemoteJid of
                     undefined -> <<"*">>;
                     _ -> RemoteJid
                 end,
    <<"_yz_rk:", LocalJid/binary, "/", RemotePart/binary, "/*">>.
509 |
|
|
510 |
|
%% Message id filter:
%%  * no bounds      -> undefined
%%  * equal bounds   -> exact match on msg_id_register
%%  * anything else  -> range, with "*" standing in for a missing bound
id_filters(undefined, undefined) ->
    undefined;
id_filters(MsgId, MsgId) ->
    Exact = integer_to_binary(MsgId),
    <<"msg_id_register:", Exact/binary>>;
id_filters(StartInt, EndInt) ->
    Bound = fun(undefined) -> <<"*">>;
               (Int) -> integer_to_binary(Int)
            end,
    From = Bound(StartInt),
    To = Bound(EndInt),
    solr_id_filters(From, To).
521 |
|
|
522 |
|
%% Range filter on msg_id_register for two already formatted bounds.
solr_id_filters(Start, End) ->
    Range = <<Start/binary, " TO ", End/binary>>,
    <<"msg_id_register:[", Range/binary, " ]">>.
524 |
|
|
525 |
|
%% ---------------------------------------------------------------------- |
526 |
|
%% Optimizations |
527 |
|
|
528 |
|
%% Encodes Packet for storage using the codec module configured for Host.
packet_to_stored_binary(Host, Packet) ->
    mam_message:encode(db_message_codec(Host), Packet).
531 |
|
|
532 |
|
%% Decodes a stored binary using the codec module configured for Host.
stored_binary_to_packet(Host, Bin) ->
    mam_message:decode(db_message_codec(Host), Bin).
535 |
|
|
536 |
|
%% Codec module for stored packets; taken from the `db_message_format'
%% module option, falling back to mam_message_xml.
-spec db_message_codec(Host :: jid:server()) -> module().
db_message_codec(Host) ->
    DefaultCodec = mam_message_xml,
    gen_mod:get_module_opt(Host, ?MODULE, db_message_format, DefaultCodec).