./ct_report/coverage/mod_mam_elasticsearch_arch.COVER.html

1 %%------------------------------------------------------------------
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%
16 %% @doc ElasticSearch backend for Message Archive Management for
17 %% one-to-one messages.
18 %%%------------------------------------------------------------------
19 -module(mod_mam_elasticsearch_arch).
20
21 -behaviour(gen_mod).
22 -behaviour(ejabberd_gen_mam_archive).
23 -behaviour(mongoose_module_metrics).
24
25 %% gen_mod callbacks
26 -export([start/2]).
27 -export([stop/1]).
28 -export([hooks/1]).
29
30 %% ejabberd_gen_mam_archive callbacks
31 -export([archive_message/3]).
32 -export([lookup_messages/3]).
33 -export([remove_archive/3]).
34 -export([archive_size/3]).
35 -export([get_mam_pm_gdpr_data/3]).
36
37 -include("mongoose.hrl").
38 -include("mongoose_rsm.hrl").
39 -include("mod_mam.hrl").
40 -include("jlib.hrl").
41
42 -define(INDEX_NAME, <<"messages">>).
43 -define(TYPE_NAME, <<"pm">>).
44
45 %%-------------------------------------------------------------------
46 %% gen_mod callbacks
47 %%-------------------------------------------------------------------
48
%% @doc gen_mod callback. Performs no work of its own and always returns `ok'.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(_, _) ->
    ok.
52
%% @doc gen_mod callback. Performs no work of its own and always returns `ok'.
-spec stop(mongooseim:host_type()) -> ok.
stop(_) ->
    ok.
56
%% @doc `get_mam_pm_gdpr_data' hook handler.
%% Searches the index for documents whose `owner' equals the bare JID of the
%% given user, sorted by ascending `mam_id', converts every hit with
%% `hit_to_gdpr_mam_message/1' and prepends the converted rows to `Acc'.
%% A search result other than `{ok, _}' fails the assertive match below and
%% crashes the handler.
-spec get_mam_pm_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
    Params :: #{jid := jid:jid()},
    Extra :: gen_hook:extra().
get_mam_pm_gdpr_data(Acc, #{jid := Owner}, _Extra) ->
    OwnerFilter = #{term => #{owner => mod_mam_utils:bare_jid(Owner)}},
    Query = #{query => #{bool => #{filter => OwnerFilter}},
              sort => #{mam_id => #{order => asc}}},
    %% NOTE(review): the query carries no explicit `size'; presumably the
    %% backend's default page size then limits how many hits come back.
    %% Confirm this is acceptable for a full GDPR export.
    {ok, #{<<"hits">> := #{<<"hits">> := Hits}}} =
        mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, Query),
    {ok, [hit_to_gdpr_mam_message(Hit) || Hit <- Hits] ++ Acc}.
70
71 %%-------------------------------------------------------------------
72 %% ejabberd_gen_mam_archive callbacks
73 %%-------------------------------------------------------------------
74
%% @doc `mam_archive_message' hook handler.
%% Builds one document from the message parameters and inserts it under an id
%% derived from the archive owner and the message id. The incoming accumulator
%% is ignored. On success the new accumulator is `ok'; on failure the error is
%% logged, the `modMamDropped' metric is bumped for the host type and the
%% `{error, Reason}' tuple becomes the new accumulator.
-spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: ok | {error, term()},
    Params :: mod_mam:archive_message_params(),
    Extra :: gen_hook:extra().
archive_message(_Result,
                #{message_id := MessageId,
                  local_jid := LocalJid,
                  remote_jid := RemoteJid,
                  source_jid := SourceJid,
                  packet := Packet,
                  is_groupchat := IsGroupChat},
                #{host_type := Host}) ->
    Owner = mod_mam_utils:bare_jid(LocalJid),
    Remote = mod_mam_utils:bare_jid(RemoteJid),
    Source = mod_mam_utils:full_jid(SourceJid),
    DocId = make_document_id(Owner, MessageId),
    Doc = make_document(MessageId, Owner, Remote, Source, Packet,
                        atom_to_binary(IsGroupChat)),
    InsertResult = mongoose_elasticsearch:insert_document(?INDEX_NAME, ?TYPE_NAME, DocId, Doc),
    case InsertResult of
        {ok, _} ->
            {ok, ok};
        {error, Reason} = Err ->
            ?LOG_ERROR(#{what => archive_message_failed,
                         user => Owner, server => Host, remote => Remote,
                         message_id => MessageId, reason => Reason}),
            mongoose_metrics:update(Host, modMamDropped, 1),
            {ok, Err}
    end.
103
%% @doc `mam_lookup_messages' hook handler.
%% A request paging `before' or `aft' (after) a concrete RSM id goes through
%% `lookup_message_page/4'; every other request is served directly by
%% `do_lookup_messages/3'.
-spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: {ok, mod_mam:lookup_result()} | {error, term()},
    Params :: mam_iq:lookup_params(),
    Extra :: gen_hook:extra().
lookup_messages(Result,
                #{rsm := #rsm_in{direction = Direction, id = ID} = RSM} = Params,
                #{host_type := HostType})
  when ID =/= undefined, (Direction =:= before orelse Direction =:= aft) ->
    {ok, lookup_message_page(Result, HostType, RSM, Params)};
lookup_messages(Result, Params, #{host_type := HostType}) ->
    {ok, do_lookup_messages(Result, HostType, Params)}.
120
%% @doc Looks up one page relative to an RSM id.
%% The lookup asks for one row more than the requested page size. When no
%% specific `message_id' was requested, the result is handed to
%% `mod_mam_utils:check_for_item_not_found/3' together with the original page
%% size; otherwise it is returned unchanged. Lookup errors are passed through.
lookup_message_page(AccResult, Host, #rsm_in{} = RSM, #{message_id := MsgID} = Params) ->
    PageSize = maps:get(page_size, Params),
    ExtendedParams = Params#{page_size := 1 + PageSize},
    case do_lookup_messages(AccResult, Host, ExtendedParams) of
        {error, _} = Err ->
            Err;
        {ok, LookupResult} ->
            finish_message_page(MsgID, RSM, PageSize, LookupResult)
    end.

%% Post-processes a successful page lookup depending on whether a specific
%% message id was requested.
finish_message_page(undefined, RSM, PageSize, LookupResult) ->
    mod_mam_utils:check_for_item_not_found(RSM, PageSize, LookupResult);
finish_message_page(_MsgID, _RSM, _PageSize, LookupResult) ->
    {ok, LookupResult}.
133
%% @doc Runs a single search for the given lookup parameters.
%% The base query from `build_search_query/1' is extended with a sort on
%% `mam_id', a `size' equal to the requested page size and, when applicable, a
%% `from' offset. Returns `{ok, LookupResult}', or logs and returns the
%% `{error, Reason}' tuple reported by the search.
do_lookup_messages(_Result, Host, Params) ->
    BaseQuery = build_search_query(Params),
    SortSpec = [#{mam_id => #{order => determine_sorting(Params)}}],
    PageSize = maps:get(page_size, Params),
    Query = maybe_add_from_constraint(BaseQuery#{sort => SortSpec, size => PageSize},
                                      Params),
    case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, Query) of
        {ok, SearchResult} ->
            {ok, search_result_to_mam_lookup_result(SearchResult, Params)};
        {error, Reason} = Err ->
            ?LOG_ERROR(maps:merge(Params,
                                  #{what => lookup_messages_failed,
                                    server => Host, reason => Reason})),
            Err
    end.
150
%% @doc `mam_archive_size' hook handler.
%% Ignores the incoming accumulator and replaces it with the number of
%% documents that match the owner-only query, as reported by `archive_size/1'.
-spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: integer(),
    Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
    Extra :: gen_hook:extra().
archive_size(_Size, #{owner := OwnerJid}, _Extra) ->
    OwnerQuery = build_search_query(#{owner_jid => OwnerJid}),
    Size = archive_size(OwnerQuery),
    {ok, Size}.
158
%% @doc `mam_remove_archive' hook handler.
%% Deletes the owner's documents through `remove_archive/2' and returns the
%% accumulator untouched.
-spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: term(),
    Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
    Extra :: gen_hook:extra().
remove_archive(Acc, #{owner := OwnerJid}, #{host_type := HostType}) ->
    _ = remove_archive(HostType, OwnerJid),
    {ok, Acc}.
166
%% @doc Deletes every document matching the owner-only query.
%% A failed delete is logged and otherwise ignored, so the function returns
%% `ok' in both cases.
remove_archive(Host, OwnerJid) ->
    OwnerQuery = build_search_query(#{owner_jid => OwnerJid}),
    DeleteResult = mongoose_elasticsearch:delete_by_query(?INDEX_NAME, ?TYPE_NAME, OwnerQuery),
    case DeleteResult of
        ok ->
            ok;
        {error, Reason} ->
            ?LOG_ERROR(#{what => remove_archive_failed,
                         server => Host, user_jid => OwnerJid, reason => Reason}),
            ok
    end.
177
178 %%-------------------------------------------------------------------
179 %% Helpers
180 %%-------------------------------------------------------------------
181
%% @doc Lists the hook handlers of this module for the given host type.
%% Every handler is registered with an empty extra map and priority 50.
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(Host) ->
    Handlers = [{mam_archive_message, fun ?MODULE:archive_message/3},
                {mam_lookup_messages, fun ?MODULE:lookup_messages/3},
                {mam_archive_size, fun ?MODULE:archive_size/3},
                {mam_remove_archive, fun ?MODULE:remove_archive/3},
                {get_mam_pm_gdpr_data, fun ?MODULE:get_mam_pm_gdpr_data/3}],
    [{HookName, Host, Handler, #{}, 50} || {HookName, Handler} <- Handlers].
189
%% @doc Builds the document id: the owner, a `$' separator and the decimal
%% representation of the message id.
-spec make_document_id(binary(), mod_mam:message_id()) -> binary().
make_document_id(Owner, MessageId) ->
    IdBin = integer_to_binary(MessageId),
    <<Owner/binary, "$", IdBin/binary>>.
193
%% @doc Assembles the document stored for one archived message.
%% `message' holds the serialised packet, while `body' holds the character
%% data of the packet's `body' child element as returned by `exml_query:path/2'.
-spec make_document(mod_mam:message_id(), binary(), binary(),
                    binary(), exml:element(), binary()) ->
    map().
make_document(MessageId, Owner, Remote, SourceBinJid, Packet, IsGroupChat) ->
    Serialized = exml:to_binary(Packet),
    Body = exml_query:path(Packet, [{element, <<"body">>}, cdata]),
    #{mam_id => MessageId,
      owner => Owner,
      remote => Remote,
      source_jid => SourceBinJid,
      message => Serialized,
      body => Body,
      is_groupchat => IsGroupChat}.
206
%% @doc Builds the `bool' query shared by search, count and delete requests:
%% the text-search part goes under `must', the filter list under `filter'.
-spec build_search_query(map()) -> mongoose_elasticsearch:query().
build_search_query(Params) ->
    Filters = build_filters(Params),
    Must = build_text_search_query(Params),
    BoolQuery = #{must => Must, filter => Filters},
    #{query => #{bool => BoolQuery}}.
215
%% @doc Collects the filter clauses contributed by every filter builder.
%% The order is: owner, `with_jid', groupchat, specific message, id range.
-spec build_filters(map()) -> [map()].
build_filters(Params) ->
    owner_filter(Params)
        ++ with_jid_filter(Params)
        ++ is_groupchat_filter(Params)
        ++ specific_message_filter(Params)
        ++ range_filter(Params).
224
%% @doc Restricts results to documents whose `owner' equals the bare JID of
%% `owner_jid'. The key is mandatory: a map without it does not match.
-spec owner_filter(map()) -> [map()].
owner_filter(#{owner_jid := Owner}) ->
    [#{term => #{owner => mod_mam_utils:bare_jid(Owner)}}].
229
%% @doc Restricts results to one `remote' bare JID when `with_jid' holds a
%% `#jid{}' record; contributes no filter otherwise.
-spec with_jid_filter(map()) -> [map()].
with_jid_filter(#{with_jid := #jid{} = WithJid}) ->
    Remote = mod_mam_utils:bare_jid(WithJid),
    [#{term => #{remote => Remote}}];
with_jid_filter(_Params) ->
    [].
235
%% @doc Excludes groupchat documents only when `include_groupchat' is
%% explicitly `false'; contributes no filter for any other input.
-spec is_groupchat_filter(map()) -> [map()].
is_groupchat_filter(Params) ->
    case Params of
        #{include_groupchat := false} ->
            [#{term => #{is_groupchat => <<"false">>}}];
        _ ->
            []
    end.
241
%% @doc Restricts results to a single `mam_id' when `message_id' is an
%% integer; contributes no filter otherwise.
-spec specific_message_filter(map()) -> [map()].
specific_message_filter(Params) ->
    case Params of
        #{message_id := ID} when is_integer(ID) ->
            [#{term => #{mam_id => ID}}];
        _ ->
            []
    end.
247
%% @doc Builds a `range' filter on `mam_id'.
%% The id borders come from `mod_mam_utils:calculate_msg_id_borders/4'. When
%% neither border is defined, or when any of the four required keys is absent
%% from the params, no filter is contributed.
-spec range_filter(map()) -> [map()].
range_filter(#{end_ts := End, start_ts := Start, borders := Borders, rsm := RSM}) ->
    {StartId, EndId} = mod_mam_utils:calculate_msg_id_borders(RSM, Borders, Start, End),
    Bounds = maybe_add_start_filter(StartId, maybe_add_end_filter(EndId, #{})),
    case Bounds =:= #{} of
        true ->
            [];
        false ->
            [#{range => #{mam_id => Bounds}}]
    end;
range_filter(_Params) ->
    [].
261
%% @doc Adds an inclusive upper bound to a range map.
%% An `undefined' bound leaves the map unchanged. The bound is emitted under
%% `lte', the documented Elasticsearch range parameter for "less than or
%% equal to", instead of the undocumented short form `le'.
-spec maybe_add_end_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_end_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_end_filter(Value, RangeMap) ->
    RangeMap#{lte => Value}.
267
%% @doc Adds an inclusive lower bound to a range map.
%% An `undefined' bound leaves the map unchanged. The bound is emitted under
%% `gte', the documented Elasticsearch range parameter for "greater than or
%% equal to", instead of the undocumented short form `ge'.
-spec maybe_add_start_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_start_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_start_filter(Value, RangeMap) ->
    RangeMap#{gte => Value}.
273
%% @doc Builds the `must' part of the query.
%% A binary `search_text' yields a `simple_query_string' query on the `body'
%% field with the `and' default operator; anything else yields `match_all'.
-spec build_text_search_query(map()) -> map().
build_text_search_query(Params) ->
    case Params of
        #{search_text := SearchText} when is_binary(SearchText) ->
            QueryString = #{query => SearchText,
                            fields => [<<"body">>],
                            default_operator => <<"and">>},
            #{simple_query_string => QueryString};
        _ ->
            #{match_all => #{}}
    end.
281
%% @doc Picks the sort order on `mam_id': descending when paging `before',
%% ascending for every other request.
-spec determine_sorting(map()) -> asc | desc.
determine_sorting(Params) ->
    case Params of
        #{rsm := #rsm_in{direction = before}} -> desc;
        _ -> asc
    end.
287
%% @doc Adds a `from' offset to the query when the RSM request carries an
%% integer `index'; returns the query unchanged otherwise.
-spec maybe_add_from_constraint(mongoose_elasticsearch:query(), map()) ->
    mongoose_elasticsearch:query().
maybe_add_from_constraint(Query, Params) ->
    case Params of
        #{rsm := #rsm_in{index = Offset}} when is_integer(Offset) ->
            Query#{from => Offset};
        _ ->
            Query
    end.
294
%% @doc Converts a raw search result into a MAM lookup result.
%% Hits are converted to message rows and sorted with `lists:sort/1'. For a
%% simple lookup (`is_simple' is `true') the total count and the offset are
%% both `undefined'; otherwise they are computed by `corrected_total_count/2'
%% and `calculate_offset/3'.
-spec search_result_to_mam_lookup_result(map(), map()) -> mod_mam:lookup_result().
search_result_to_mam_lookup_result(Result, Params) ->
    #{<<"hits">> := #{<<"hits">> := Hits, <<"total">> := ReportedTotal}} = Result,
    Messages = lists:sort([hit_to_mam_message(Hit) || Hit <- Hits]),
    case maps:get(is_simple, Params) of
        true ->
            {undefined, undefined, Messages};
        _ ->
            Total = corrected_total_count(ReportedTotal, Params),
            Offset = calculate_offset(ReportedTotal, length(Messages), Params),
            {Total, Offset, Messages}
    end.
313
%% @doc Turns one search hit into a MAM message row.
%% The stored packet is parsed back into an XML element; a packet that does
%% not parse fails the `{ok, _}' match and crashes.
-spec hit_to_mam_message(map()) -> mod_mam:message_row().
hit_to_mam_message(#{<<"_source">> := Source}) ->
    Id = maps:get(<<"mam_id">>, Source),
    RawPacket = maps:get(<<"message">>, Source),
    SourceJid = maps:get(<<"source_jid">>, Source),
    {ok, Stanza} = exml:parse(RawPacket),
    #{id => Id,
      jid => jid:from_binary(SourceJid),
      packet => Stanza}.
321
%% @doc Turns one search hit into a GDPR export row: the message id as a
%% decimal binary, the stored source JID and the stored packet, both unparsed.
hit_to_gdpr_mam_message(#{<<"_source">> := Source}) ->
    Id = maps:get(<<"mam_id">>, Source),
    RawPacket = maps:get(<<"message">>, Source),
    SourceJid = maps:get(<<"source_jid">>, Source),
    IdBin = integer_to_binary(Id),
    {IdBin, SourceJid, RawPacket}.
327
%% @doc Returns the total count to report for a lookup.
%%
%% Using RSM affects the `"total"' value returned by ElasticSearch. Per the RSM
%% spec, the reported count should represent the size of the whole result set,
%% which in the case of MAM is bound only by the MAM filters.
%% The solution is to count the matching documents again as if RSM was not
%% used. There is an obvious race condition here, because a user may send a
%% message between the initial request to ElasticSearch and the count request
%% issued here.
%%
%% The second argument is the map of lookup parameters (it is matched on the
%% `rsm' key and passed to `build_search_query/1'), not an ElasticSearch
%% query. The recount only happens when the RSM id is an integer; otherwise
%% the count reported by the search is returned as is.
-spec corrected_total_count(non_neg_integer(), map()) -> non_neg_integer().
corrected_total_count(_, #{rsm := #rsm_in{id = Id}} = Params) when is_integer(Id) ->
    Query = build_search_query(Params#{rsm := undefined}),
    archive_size(Query);
corrected_total_count(Count, _) ->
    Count.
341
%% @doc Computes the RSM offset of the first returned message.
%% <ul>
%% <li>Index-based paging (no direction, integer `index'): the index itself.</li>
%% <li>Paging `before': the total reported by the search minus the number of
%%     returned messages.</li>
%% <li>Paging `aft' an integer id: the number of documents counted by
%%     `archive_size/1' for the same parameters with RSM cleared and the
%%     `to_id' border set to `Id + 1'.</li>
%% <li>Anything else: 0.</li>
%% </ul>
-spec calculate_offset(non_neg_integer(), non_neg_integer(), map()) -> non_neg_integer().
calculate_offset(_Total, _Count, #{rsm := #rsm_in{direction = undefined, index = Index}})
  when is_integer(Index) ->
    Index;
calculate_offset(Total, Count, #{rsm := #rsm_in{direction = before}}) ->
    Total - Count;
calculate_offset(_Total, _Count, #{rsm := #rsm_in{direction = aft, id = Id}} = Params)
  when is_integer(Id) ->
    %% NOTE(review): presumably this counts the messages that precede the
    %% requested page; how `to_id' bounds the range is decided by
    %% mod_mam_utils:calculate_msg_id_borders/4 -- confirm there.
    WithoutRsm = Params#{rsm := undefined},
    Bounded = update_borders(WithoutRsm, Id + 1),
    archive_size(build_search_query(Bounded));
calculate_offset(_Total, _Count, _Params) ->
    0.
354
%% @doc Replaces the `borders' entry of the params with a copy whose `to_id'
%% is `EndId'. The `borders' key must already be present.
-spec update_borders(map(), non_neg_integer()) -> map().
update_borders(#{borders := Borders} = Params, EndId) ->
    NewBorders = update_borders_to_id(Borders, EndId),
    Params#{borders := NewBorders}.
358
%% @doc Sets `to_id' on the given borders record; when no borders exist yet
%% (`undefined'), starts from a record with default field values.
-spec update_borders_to_id(#mam_borders{} | undefined, non_neg_integer()) -> #mam_borders{}.
update_borders_to_id(Borders, EndId) ->
    case Borders of
        undefined ->
            #mam_borders{to_id = EndId};
        _ ->
            Borders#mam_borders{to_id = EndId}
    end.
364
%% @doc Counts the documents matching the query.
%% A failed count request is logged and reported as 0 rather than propagated.
-spec archive_size(mongoose_elasticsearch:query()) -> non_neg_integer().
archive_size(Query) ->
    CountResult = mongoose_elasticsearch:count(?INDEX_NAME, ?TYPE_NAME, Query),
    case CountResult of
        {ok, Count} ->
            Count;
        {error, Reason} ->
            ?LOG_ERROR(#{what => archive_size_failed, reason => Reason, es_query => Query,
                         text => <<"Failed to retrieve count of messages from ElasticSearch">>}),
            0
    end.
Line Hits Source