1 |
|
%%------------------------------------------------------------------ |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%% |
16 |
|
%% @doc ElasticSearch backend for Message Archive Management for |
17 |
|
%% one-to-one messages. |
18 |
|
%%%------------------------------------------------------------------ |
19 |
|
-module(mod_mam_elasticsearch_arch). |
20 |
|
|
21 |
|
-behaviour(gen_mod). |
22 |
|
-behaviour(ejabberd_gen_mam_archive). |
23 |
|
-behaviour(mongoose_module_metrics). |
24 |
|
|
25 |
|
%% gen_mod callbacks |
26 |
|
-export([start/2]). |
27 |
|
-export([stop/1]). |
28 |
|
-export([hooks/1]). |
29 |
|
|
30 |
|
%% ejabberd_gen_mam_archive callbacks |
31 |
|
-export([archive_message/3]). |
32 |
|
-export([lookup_messages/3]). |
33 |
|
-export([remove_archive/3]). |
34 |
|
-export([archive_size/3]). |
35 |
|
-export([get_mam_pm_gdpr_data/3]). |
36 |
|
|
37 |
|
-include("mongoose.hrl"). |
38 |
|
-include("mongoose_rsm.hrl"). |
39 |
|
-include("mod_mam.hrl"). |
40 |
|
-include("jlib.hrl"). |
41 |
|
|
42 |
|
-define(INDEX_NAME, <<"messages">>). |
43 |
|
-define(TYPE_NAME, <<"pm">>). |
44 |
|
|
45 |
|
%%------------------------------------------------------------------- |
46 |
|
%% gen_mod callbacks |
47 |
|
%%------------------------------------------------------------------- |
48 |
|
|
49 |
|
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok. |
50 |
|
start(_HostType, _Opts) -> |
51 |
16 |
ok. |
52 |
|
|
53 |
|
-spec stop(mongooseim:host_type()) -> ok. |
54 |
|
stop(_HostType) -> |
55 |
16 |
ok. |
56 |
|
|
57 |
|
-spec get_mam_pm_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when |
58 |
|
Acc :: ejabberd_gen_mam_archive:mam_pm_gdpr_data(), |
59 |
|
Params :: #{jid := jid:jid()}, |
60 |
|
Extra :: gen_hook:extra(). |
61 |
|
get_mam_pm_gdpr_data(Acc, #{jid := Owner}, _Extra) -> |
62 |
46 |
BinOwner = mod_mam_utils:bare_jid(Owner), |
63 |
46 |
Filter = #{term => #{owner => BinOwner}}, |
64 |
46 |
Sorting = #{mam_id => #{order => asc}}, |
65 |
46 |
SearchQuery = #{query => #{bool => #{filter => Filter}}, sort => Sorting}, |
66 |
46 |
{ok, #{<<"hits">> := #{<<"hits">> := Hits}}} |
67 |
|
= mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, SearchQuery), |
68 |
46 |
Messages = lists:map(fun hit_to_gdpr_mam_message/1, Hits), |
69 |
46 |
{ok, Messages ++ Acc}. |
70 |
|
|
71 |
|
%%------------------------------------------------------------------- |
72 |
|
%% ejabberd_gen_mam_archive callbacks |
73 |
|
%%------------------------------------------------------------------- |
74 |
|
|
75 |
|
-spec archive_message(Acc, Params, Extra) -> {ok, Acc} when |
76 |
|
Acc :: ok | {error, term()}, |
77 |
|
Params :: mod_mam:archive_message_params(), |
78 |
|
Extra :: gen_hook:extra(). |
79 |
|
archive_message(_Result, |
80 |
|
#{message_id := MessageId, |
81 |
|
local_jid := LocalJid, |
82 |
|
remote_jid := RemoteJid, |
83 |
|
source_jid := SourceJid, |
84 |
|
packet := Packet, |
85 |
|
is_groupchat := IsGroupChat}, |
86 |
|
#{host_type := Host}) -> |
87 |
1085 |
Owner = mod_mam_utils:bare_jid(LocalJid), |
88 |
1085 |
Remote = mod_mam_utils:bare_jid(RemoteJid), |
89 |
1085 |
SourceBinJid = mod_mam_utils:full_jid(SourceJid), |
90 |
1085 |
DocId = make_document_id(Owner, MessageId), |
91 |
1085 |
IsGroupChatBin = atom_to_binary(IsGroupChat), |
92 |
1085 |
Doc = make_document(MessageId, Owner, Remote, SourceBinJid, Packet, IsGroupChatBin), |
93 |
1085 |
case mongoose_elasticsearch:insert_document(?INDEX_NAME, ?TYPE_NAME, DocId, Doc) of |
94 |
|
{ok, _} -> |
95 |
1085 |
{ok, ok}; |
96 |
|
{error, Reason} = Err -> |
97 |
:-( |
?LOG_ERROR(#{what => archive_message_failed, |
98 |
|
user => Owner, server => Host, remote => Remote, |
99 |
:-( |
message_id => MessageId, reason => Reason}), |
100 |
:-( |
mongoose_metrics:update(Host, modMamDropped, 1), |
101 |
:-( |
{ok, Err} |
102 |
|
end. |
103 |
|
|
104 |
|
-spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when |
105 |
|
Acc :: {ok, mod_mam:lookup_result()} | {error, term()}, |
106 |
|
Params :: mam_iq:lookup_params(), |
107 |
|
Extra :: gen_hook:extra(). |
108 |
|
lookup_messages(Result, |
109 |
|
#{rsm := #rsm_in{direction = before, id = ID} = RSM} = Params, |
110 |
|
#{host_type := HostType}) |
111 |
|
when ID =/= undefined -> |
112 |
18 |
{ok, lookup_message_page(Result, HostType, RSM, Params)}; |
113 |
|
lookup_messages(Result, |
114 |
|
#{rsm := #rsm_in{direction = aft, id = ID} = RSM} = Params, |
115 |
|
#{host_type := HostType}) |
116 |
|
when ID =/= undefined -> |
117 |
21 |
{ok, lookup_message_page(Result, HostType, RSM, Params)}; |
118 |
|
lookup_messages(Result, Params, #{host_type := HostType}) -> |
119 |
89 |
{ok, do_lookup_messages(Result, HostType, Params)}. |
120 |
|
|
121 |
|
lookup_message_page(AccResult, Host, #rsm_in{id = _ID} = RSM, #{message_id := MsgID} = Params) -> |
122 |
39 |
PageSize = maps:get(page_size, Params), |
123 |
39 |
case do_lookup_messages(AccResult, Host, Params#{page_size := 1 + PageSize}) of |
124 |
:-( |
{error, _} = Err -> Err; |
125 |
|
{ok, LookupResult} -> |
126 |
39 |
case MsgID of |
127 |
|
undefined -> |
128 |
37 |
mod_mam_utils:check_for_item_not_found(RSM, PageSize, LookupResult); |
129 |
|
_ -> |
130 |
2 |
{ok, LookupResult} |
131 |
|
end |
132 |
|
end. |
133 |
|
|
134 |
|
do_lookup_messages(_Result, Host, Params) -> |
135 |
128 |
SearchQuery0 = build_search_query(Params), |
136 |
128 |
Sorting = [#{mam_id => #{order => determine_sorting(Params)}}], |
137 |
128 |
ResultLimit = maps:get(page_size, Params), |
138 |
128 |
SearchQuery1 = SearchQuery0#{sort => Sorting, |
139 |
|
size => ResultLimit}, |
140 |
128 |
SearchQuery2 = maybe_add_from_constraint(SearchQuery1, Params), |
141 |
128 |
case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, SearchQuery2) of |
142 |
|
{ok, Result} -> |
143 |
128 |
{ok, search_result_to_mam_lookup_result(Result, Params)}; |
144 |
|
{error, Reason} = Err -> |
145 |
:-( |
?LOG_ERROR(maps:merge(Params, |
146 |
|
#{what => lookup_messages_failed, |
147 |
:-( |
server => Host, reason => Reason})), |
148 |
:-( |
Err |
149 |
|
end. |
150 |
|
|
151 |
|
-spec archive_size(Acc, Params, Extra) -> {ok, Acc} when |
152 |
|
Acc :: integer(), |
153 |
|
Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()}, |
154 |
|
Extra :: gen_hook:extra(). |
155 |
|
archive_size(_Size, #{owner := OwnerJid}, _Extra)-> |
156 |
388 |
SearchQuery = build_search_query(#{owner_jid => OwnerJid}), |
157 |
388 |
{ok, archive_size(SearchQuery)}. |
158 |
|
|
159 |
|
-spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when |
160 |
|
Acc :: term(), |
161 |
|
Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()}, |
162 |
|
Extra :: gen_hook:extra(). |
163 |
|
remove_archive(Acc, #{owner := OwnerJid}, #{host_type := HostType}) -> |
164 |
87 |
remove_archive(HostType, OwnerJid), |
165 |
87 |
{ok, Acc}. |
166 |
|
|
167 |
|
remove_archive(Host, OwnerJid) -> |
168 |
87 |
SearchQuery = build_search_query(#{owner_jid => OwnerJid}), |
169 |
87 |
case mongoose_elasticsearch:delete_by_query(?INDEX_NAME, ?TYPE_NAME, SearchQuery) of |
170 |
|
ok -> |
171 |
87 |
ok; |
172 |
|
{error, Reason} -> |
173 |
:-( |
?LOG_ERROR(#{what => remove_archive_failed, |
174 |
:-( |
server => Host, user_jid => OwnerJid, reason => Reason}), |
175 |
:-( |
ok |
176 |
|
end. |
177 |
|
|
178 |
|
%%------------------------------------------------------------------- |
179 |
|
%% Helpers |
180 |
|
%%------------------------------------------------------------------- |
181 |
|
|
182 |
|
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list(). |
183 |
|
hooks(Host) -> |
184 |
32 |
[{mam_archive_message, Host, fun ?MODULE:archive_message/3, #{}, 50}, |
185 |
|
{mam_lookup_messages, Host, fun ?MODULE:lookup_messages/3, #{}, 50}, |
186 |
|
{mam_archive_size, Host, fun ?MODULE:archive_size/3, #{}, 50}, |
187 |
|
{mam_remove_archive, Host, fun ?MODULE:remove_archive/3, #{}, 50}, |
188 |
|
{get_mam_pm_gdpr_data, Host, fun ?MODULE:get_mam_pm_gdpr_data/3, #{}, 50}]. |
189 |
|
|
190 |
|
-spec make_document_id(binary(), mod_mam:message_id()) -> binary(). |
191 |
|
make_document_id(Owner, MessageId) -> |
192 |
1085 |
<<Owner/binary, $$, (integer_to_binary(MessageId))/binary>>. |
193 |
|
|
194 |
|
-spec make_document(mod_mam:message_id(), binary(), binary(), |
195 |
|
binary(), exml:element(), binary()) -> |
196 |
|
map(). |
197 |
|
make_document(MessageId, Owner, Remote, SourceBinJid, Packet, IsGroupChat) -> |
198 |
1085 |
#{mam_id => MessageId, |
199 |
|
owner => Owner, |
200 |
|
remote => Remote, |
201 |
|
source_jid => SourceBinJid, |
202 |
|
message => exml:to_binary(Packet), |
203 |
|
body => exml_query:path(Packet, [{element, <<"body">>}, cdata]), |
204 |
|
is_groupchat => IsGroupChat |
205 |
|
}. |
206 |
|
|
207 |
|
-spec build_search_query(map()) -> mongoose_elasticsearch:query(). |
208 |
|
build_search_query(Params) -> |
209 |
637 |
Filters = build_filters(Params), |
210 |
637 |
TextSearchQuery = build_text_search_query(Params), |
211 |
637 |
#{query => |
212 |
|
#{bool => |
213 |
|
#{must => TextSearchQuery, |
214 |
|
filter => Filters}}}. |
215 |
|
|
216 |
|
-spec build_filters(map()) -> [map()]. |
217 |
|
build_filters(Params) -> |
218 |
637 |
Builders = [fun owner_filter/1, |
219 |
|
fun with_jid_filter/1, |
220 |
|
fun is_groupchat_filter/1, |
221 |
|
fun specific_message_filter/1, |
222 |
|
fun range_filter/1], |
223 |
637 |
lists:flatmap(fun(F) -> F(Params) end, Builders). |
224 |
|
|
225 |
|
-spec owner_filter(map()) -> [map()]. |
226 |
|
owner_filter(#{owner_jid := Owner}) -> |
227 |
637 |
BinOwner = mod_mam_utils:bare_jid(Owner), |
228 |
637 |
[#{term => #{owner => BinOwner}}]. |
229 |
|
|
230 |
|
-spec with_jid_filter(map()) -> [map()]. |
231 |
|
with_jid_filter(#{with_jid := #jid{} = WithJid}) -> |
232 |
8 |
[#{term => #{remote => mod_mam_utils:bare_jid(WithJid)}}]; |
233 |
|
with_jid_filter(_) -> |
234 |
629 |
[]. |
235 |
|
|
236 |
|
-spec is_groupchat_filter(map()) -> [map()]. |
237 |
|
is_groupchat_filter(#{include_groupchat := false}) -> |
238 |
2 |
[#{term => #{is_groupchat => <<"false">>}}]; |
239 |
|
is_groupchat_filter(_) -> |
240 |
635 |
[]. |
241 |
|
|
242 |
|
-spec specific_message_filter(map()) -> [map()]. |
243 |
|
specific_message_filter(#{message_id := ID}) when is_integer(ID) -> |
244 |
7 |
[#{term => #{mam_id => ID}}]; |
245 |
|
specific_message_filter(_) -> |
246 |
630 |
[]. |
247 |
|
|
248 |
|
-spec range_filter(map()) -> [map()]. |
249 |
|
range_filter(#{end_ts := End, start_ts := Start, borders := Borders, rsm := RSM}) -> |
250 |
162 |
{StartId, EndId} = mod_mam_utils:calculate_msg_id_borders(RSM, Borders, Start, End), |
251 |
162 |
Range1 = maybe_add_end_filter(EndId, #{}), |
252 |
162 |
Range2 = maybe_add_start_filter(StartId, Range1), |
253 |
162 |
case maps:size(Range2) of |
254 |
|
0 -> |
255 |
93 |
[]; |
256 |
|
_ -> |
257 |
69 |
[#{range => #{mam_id => Range2}}] |
258 |
|
end; |
259 |
|
range_filter(_) -> |
260 |
475 |
[]. |
261 |
|
|
262 |
|
-spec maybe_add_end_filter(undefined | mod_mam:message_id(), map()) -> map(). |
263 |
|
maybe_add_end_filter(undefined, RangeMap) -> |
264 |
122 |
RangeMap; |
265 |
|
maybe_add_end_filter(Value, RangeMap) -> |
266 |
40 |
RangeMap#{le => Value}. |
267 |
|
|
268 |
|
-spec maybe_add_start_filter(undefined | mod_mam:message_id(), map()) -> map(). |
269 |
|
maybe_add_start_filter(undefined, RangeMap) -> |
270 |
124 |
RangeMap; |
271 |
|
maybe_add_start_filter(Value, RangeMap) -> |
272 |
38 |
RangeMap#{ge => Value}. |
273 |
|
|
274 |
|
-spec build_text_search_query(map()) -> map(). |
275 |
|
build_text_search_query(#{search_text := SearchText}) when is_binary(SearchText) -> |
276 |
5 |
#{simple_query_string => #{query => SearchText, |
277 |
|
fields => [<<"body">>], |
278 |
|
default_operator => <<"and">>}}; |
279 |
|
build_text_search_query(_) -> |
280 |
632 |
#{match_all => #{}}. |
281 |
|
|
282 |
|
-spec determine_sorting(map()) -> asc | desc. |
283 |
|
determine_sorting(#{rsm := #rsm_in{direction = before}}) -> |
284 |
33 |
desc; |
285 |
|
determine_sorting(_) -> |
286 |
95 |
asc. |
287 |
|
|
288 |
|
-spec maybe_add_from_constraint(mongoose_elasticsearch:query(), map()) -> |
289 |
|
mongoose_elasticsearch:query(). |
290 |
|
maybe_add_from_constraint(Query, #{rsm := #rsm_in{index = Offset}}) when is_integer(Offset) -> |
291 |
4 |
Query#{from => Offset}; |
292 |
|
maybe_add_from_constraint(Query, _) -> |
293 |
124 |
Query. |
294 |
|
|
295 |
|
-spec search_result_to_mam_lookup_result(map(), map()) -> mod_mam:lookup_result(). |
296 |
|
search_result_to_mam_lookup_result(Result, Params) -> |
297 |
128 |
#{<<"hits">> := |
298 |
|
#{<<"hits">> := Hits, |
299 |
|
<<"total">> := TotalCount}} = Result, |
300 |
|
|
301 |
128 |
Messages = lists:sort( |
302 |
|
lists:map(fun hit_to_mam_message/1, Hits)), |
303 |
|
|
304 |
128 |
case maps:get(is_simple, Params) of |
305 |
|
true -> |
306 |
24 |
{undefined, undefined, Messages}; |
307 |
|
_ -> |
308 |
104 |
CorrectedTotalCount = corrected_total_count(TotalCount, Params), |
309 |
104 |
Count = length(Messages), |
310 |
104 |
Offset = calculate_offset(TotalCount, Count, Params), |
311 |
104 |
{CorrectedTotalCount, Offset, Messages} |
312 |
|
end. |
313 |
|
|
314 |
|
-spec hit_to_mam_message(map()) -> mod_mam:message_row(). |
315 |
|
hit_to_mam_message(#{<<"_source">> := JSON}) -> |
316 |
445 |
MessageId = maps:get(<<"mam_id">>, JSON), |
317 |
445 |
Packet = maps:get(<<"message">>, JSON), |
318 |
445 |
SourceBinJid = maps:get(<<"source_jid">>, JSON), |
319 |
445 |
{ok, Stanza} = exml:parse(Packet), |
320 |
445 |
#{id => MessageId, jid => jid:from_binary(SourceBinJid), packet => Stanza}. |
321 |
|
|
322 |
|
hit_to_gdpr_mam_message(#{<<"_source">> := JSON}) -> |
323 |
64 |
MessageId = maps:get(<<"mam_id">>, JSON), |
324 |
64 |
Packet = maps:get(<<"message">>, JSON), |
325 |
64 |
SourceBinJid = maps:get(<<"source_jid">>, JSON), |
326 |
64 |
{integer_to_binary(MessageId), SourceBinJid, Packet}. |
327 |
|
|
328 |
|
%% Usage of RSM affects the `"total"' value returned by ElasticSearch. Per RSM spec, the count |
329 |
|
%% returned by the query should represent the size of the whole result set, which in case of MAM |
330 |
|
%% is bound only by the MAM filters. |
331 |
|
%% The solution is to compute the archive size as if the RSM wasn't used. There is an obvious race |
332 |
|
%% condition here, because a user may send a message between initial request to ElasticSearch and |
333 |
|
%% the count request issued here. |
334 |
|
-spec corrected_total_count(non_neg_integer(), mongoose_elasticsearch:query()) -> |
335 |
|
non_neg_integer(). |
336 |
|
corrected_total_count(_, #{rsm := #rsm_in{id = Id}} = Params) when is_integer(Id) -> |
337 |
21 |
Query = build_search_query(Params#{rsm := undefined}), |
338 |
21 |
archive_size(Query); |
339 |
|
corrected_total_count(Count, _) -> |
340 |
83 |
Count. |
341 |
|
|
342 |
|
-spec calculate_offset(non_neg_integer(), non_neg_integer(), map()) -> non_neg_integer(). |
343 |
|
calculate_offset(_, _, #{rsm := #rsm_in{direction = undefined, index = Index}}) when is_integer(Index) -> |
344 |
4 |
Index; |
345 |
|
calculate_offset(TotalCount, Count, #{rsm := #rsm_in{direction = before}}) -> |
346 |
21 |
TotalCount - Count; |
347 |
|
calculate_offset(_, _, #{rsm := #rsm_in{direction = aft, id = Id}} = Params0) when is_integer(Id) -> |
348 |
|
%% Not sure how this works.. |
349 |
13 |
Params1 = update_borders(Params0#{rsm := undefined}, Id + 1), |
350 |
13 |
Query = build_search_query(Params1), |
351 |
13 |
archive_size(Query); |
352 |
|
calculate_offset(_, _, _) -> |
353 |
66 |
0. |
354 |
|
|
355 |
|
-spec update_borders(map(), non_neg_integer()) -> map(). |
356 |
|
update_borders(#{borders := Borders} = Params, EndId) -> |
357 |
13 |
Params#{borders := update_borders_to_id(Borders, EndId)}. |
358 |
|
|
359 |
|
-spec update_borders_to_id(#mam_borders{} | undefined, non_neg_integer()) -> #mam_borders{}. |
360 |
|
update_borders_to_id(undefined, EndId) -> |
361 |
13 |
#mam_borders{to_id = EndId}; |
362 |
|
update_borders_to_id(Borders, EndId) -> |
363 |
:-( |
Borders#mam_borders{to_id = EndId}. |
364 |
|
|
365 |
|
-spec archive_size(mongoose_elasticsearch:query()) -> non_neg_integer(). |
366 |
|
archive_size(Query) -> |
367 |
422 |
case mongoose_elasticsearch:count(?INDEX_NAME, ?TYPE_NAME, Query) of |
368 |
|
{ok, Count} -> |
369 |
422 |
Count; |
370 |
|
{error, Reason} -> |
371 |
:-( |
?LOG_ERROR(#{what => archive_size_failed, reason => Reason, es_query => Query, |
372 |
:-( |
text => <<"Failed to retrieve count of messages from ElasticSearch">>}), |
373 |
:-( |
0 |
374 |
|
end. |