./ct_report/coverage/mod_mam_elasticsearch_arch.COVER.html

1 %%------------------------------------------------------------------
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%
16 %% @doc ElasticSearch backend for Message Archive Management for
17 %% one-to-one messages.
18 %%%------------------------------------------------------------------
19 -module(mod_mam_elasticsearch_arch).
20
21 -behaviour(gen_mod).
22 -behaviour(ejabberd_gen_mam_archive).
23 -behaviour(mongoose_module_metrics).
24
25 %% gen_mod callbacks
26 -export([start/2]).
27 -export([stop/1]).
28
29 %% ejabberd_gen_mam_archive callbacks
30 -export([archive_message/3]).
31 -export([lookup_messages/3]).
32 -export([remove_archive/4]).
33 -export([archive_size/4]).
34
35 -export([get_mam_pm_gdpr_data/3]).
36
37 -ignore_xref([remove_archive/4]).
38
39 -include("mongoose.hrl").
40 -include("mongoose_rsm.hrl").
41 -include("mod_mam.hrl").
42 -include("jlib.hrl").
43
44 -define(INDEX_NAME, <<"messages">>).
45 -define(TYPE_NAME, <<"pm">>).
46
47 %%-------------------------------------------------------------------
48 %% gen_mod callbacks
49 %%-------------------------------------------------------------------
50
51 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
52 start(HostType, _Opts) ->
53 15 ejabberd_hooks:add(hooks(HostType)),
54 15 ok.
55
56 -spec stop(mongooseim:host_type()) -> ok.
57 stop(HostType) ->
58 15 ejabberd_hooks:delete(hooks(HostType)),
59 15 ok.
60
61 -spec get_mam_pm_gdpr_data(ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
62 mongooseim:host_type(), jid:jid()) ->
63 ejabberd_gen_mam_archive:mam_pm_gdpr_data().
64 get_mam_pm_gdpr_data(Acc, _HostType, Owner) ->
65 45 BinOwner = mod_mam_utils:bare_jid(Owner),
66 45 Filter = #{term => #{owner => BinOwner}},
67 45 Sorting = #{mam_id => #{order => asc}},
68 45 SearchQuery = #{query => #{bool => #{filter => Filter}}, sort => Sorting},
69 45 {ok, #{<<"hits">> := #{<<"hits">> := Hits}}}
70 = mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, SearchQuery),
71 45 Messages = lists:map(fun hit_to_gdpr_mam_message/1, Hits),
72 45 Messages ++ Acc.
73
74 %%-------------------------------------------------------------------
75 %% ejabberd_gen_mam_archive callbacks
76 %%-------------------------------------------------------------------
77
78 archive_message(_Result, Host, #{message_id := MessageId,
79 local_jid := LocalJid,
80 remote_jid := RemoteJid,
81 source_jid := SourceJid,
82 packet := Packet}) ->
83 703 Owner = mod_mam_utils:bare_jid(LocalJid),
84 703 Remote = mod_mam_utils:bare_jid(RemoteJid),
85 703 SourceBinJid = mod_mam_utils:full_jid(SourceJid),
86 703 DocId = make_document_id(Owner, MessageId),
87 703 Doc = make_document(MessageId, Owner, Remote, SourceBinJid, Packet),
88 703 case mongoose_elasticsearch:insert_document(?INDEX_NAME, ?TYPE_NAME, DocId, Doc) of
89 {ok, _} ->
90 703 ok;
91 {error, Reason} = Err ->
92
:-(
?LOG_ERROR(#{what => archive_message_failed,
93 user => Owner, server => Host, remote => Remote,
94
:-(
message_id => MessageId, reason => Reason}),
95
:-(
mongoose_metrics:update(Host, modMamDropped, 1),
96
:-(
Err
97 end.
98
99 lookup_messages(Result, Host, #{rsm := #rsm_in{direction = before, id = ID} = RSM} = Params)
100 when ID =/= undefined ->
101 10 lookup_message_page(Result, Host, RSM, Params);
102 lookup_messages(Result, Host, #{rsm := #rsm_in{direction = aft, id = ID} = RSM} = Params)
103 when ID =/= undefined ->
104 11 lookup_message_page(Result, Host, RSM, Params);
105 lookup_messages(Result, Host, Params) ->
106 65 do_lookup_messages(Result, Host, Params).
107
108 lookup_message_page(AccResult, Host, #rsm_in{id = _ID} = RSM, Params) ->
109 21 PageSize = maps:get(page_size, Params),
110 21 case do_lookup_messages(AccResult, Host, Params#{page_size := 1 + PageSize}) of
111
:-(
{error, _} = Err -> Err;
112 {ok, LookupResult} ->
113 21 mod_mam_utils:check_for_item_not_found(RSM, PageSize, LookupResult)
114 end.
115
116 do_lookup_messages(_Result, Host, Params) ->
117 86 SearchQuery0 = build_search_query(Params),
118 86 Sorting = [#{mam_id => #{order => determine_sorting(Params)}}],
119 86 ResultLimit = maps:get(page_size, Params),
120 86 SearchQuery1 = SearchQuery0#{sort => Sorting,
121 size => ResultLimit},
122 86 SearchQuery2 = maybe_add_from_constraint(SearchQuery1, Params),
123 86 case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, SearchQuery2) of
124 {ok, Result} ->
125 86 {ok, search_result_to_mam_lookup_result(Result, Params)};
126 {error, Reason} = Err ->
127
:-(
?LOG_ERROR(maps:merge(Params,
128 #{what => lookup_messages_failed,
129
:-(
server => Host, reason => Reason})),
130
:-(
Err
131 end.
132
133 -spec archive_size(Size :: integer(),
134 Host :: jid:server(),
135 _ArchiveId,
136 OwnerJid :: jid:jid()) -> non_neg_integer().
137 archive_size(_Size, _Host, _ArchiveId, OwnerJid) ->
138 221 SearchQuery = build_search_query(#{owner_jid => OwnerJid}),
139 221 archive_size(SearchQuery).
140
141 -spec remove_archive(Acc :: mongoose_acc:t(),
142 Host :: jid:server(),
143 ArchiveId :: mod_mam:archive_id(),
144 OwnerJid :: jid:jid()) -> Acc when Acc :: map().
145 remove_archive(Acc, Host, _ArchiveId, OwnerJid) ->
146 100 remove_archive(Host, OwnerJid),
147 100 Acc.
148
149 remove_archive(Host, OwnerJid) ->
150 100 SearchQuery = build_search_query(#{owner_jid => OwnerJid}),
151 100 case mongoose_elasticsearch:delete_by_query(?INDEX_NAME, ?TYPE_NAME, SearchQuery) of
152 ok ->
153 100 ok;
154 {error, Reason} ->
155
:-(
?LOG_ERROR(#{what => remove_archive_failed,
156
:-(
server => Host, user_jid => OwnerJid, reason => Reason}),
157
:-(
ok
158 end.
159
160 %%-------------------------------------------------------------------
161 %% Helpers
162 %%-------------------------------------------------------------------
163
164 -spec hooks(jid:lserver()) -> [ejabberd_hooks:hook()].
165 hooks(Host) ->
166 30 [{mam_archive_message, Host, ?MODULE, archive_message, 50},
167 {mam_lookup_messages, Host, ?MODULE, lookup_messages, 50},
168 {mam_archive_size, Host, ?MODULE, archive_size, 50},
169 {mam_remove_archive, Host, ?MODULE, remove_archive, 50},
170 {get_mam_pm_gdpr_data, Host, ?MODULE, get_mam_pm_gdpr_data, 50}].
171
172 -spec make_document_id(binary(), mod_mam:message_id()) -> binary().
173 make_document_id(Owner, MessageId) ->
174 703 <<Owner/binary, $$, (integer_to_binary(MessageId))/binary>>.
175
176 -spec make_document(mod_mam:message_id(), binary(), binary(), binary(), exml:element()) ->
177 map().
178 make_document(MessageId, Owner, Remote, SourceBinJid, Packet) ->
179 703 #{mam_id => MessageId,
180 owner => Owner,
181 remote => Remote,
182 source_jid => SourceBinJid,
183 message => exml:to_binary(Packet),
184 body => exml_query:path(Packet, [{element, <<"body">>}, cdata])
185 }.
186
187 -spec build_search_query(map()) -> mongoose_elasticsearch:query().
188 build_search_query(Params) ->
189 437 Filters = build_filters(Params),
190 437 TextSearchQuery = build_text_search_query(Params),
191 437 #{query =>
192 #{bool =>
193 #{must => TextSearchQuery,
194 filter => Filters}}}.
195
196 -spec build_filters(map()) -> [map()].
197 build_filters(Params) ->
198 437 Builders = [fun owner_filter/1,
199 fun with_jid_filter/1,
200 fun range_filter/1],
201 437 lists:flatmap(fun(F) -> F(Params) end, Builders).
202
203 -spec owner_filter(map()) -> [map()].
204 owner_filter(#{owner_jid := Owner}) ->
205 437 BinOwner = mod_mam_utils:bare_jid(Owner),
206 437 [#{term => #{owner => BinOwner}}].
207
208 -spec with_jid_filter(map()) -> [map()].
209 with_jid_filter(#{with_jid := #jid{} = WithJid}) ->
210 2 [#{term => #{remote => mod_mam_utils:bare_jid(WithJid)}}];
211 with_jid_filter(_) ->
212 435 [].
213
214 -spec range_filter(map()) -> [map()].
215 range_filter(#{end_ts := End, start_ts := Start, borders := Borders, rsm := RSM}) ->
216 116 {StartId, EndId} = mod_mam_utils:calculate_msg_id_borders(RSM, Borders, Start, End),
217 116 Range1 = maybe_add_end_filter(EndId, #{}),
218 116 Range2 = maybe_add_start_filter(StartId, Range1),
219 116 case maps:size(Range2) of
220 0 ->
221 70 [];
222 _ ->
223 46 [#{range => #{mam_id => Range2}}]
224 end;
225 range_filter(_) ->
226 321 [].
227
228 -spec maybe_add_end_filter(undefined | mod_mam:message_id(), map()) -> map().
229 maybe_add_end_filter(undefined, RangeMap) ->
230 89 RangeMap;
231 maybe_add_end_filter(Value, RangeMap) ->
232 27 RangeMap#{le => Value}.
233
234 -spec maybe_add_start_filter(undefined | mod_mam:message_id(), map()) -> map().
235 maybe_add_start_filter(undefined, RangeMap) ->
236 91 RangeMap;
237 maybe_add_start_filter(Value, RangeMap) ->
238 25 RangeMap#{ge => Value}.
239
240 -spec build_text_search_query(map()) -> map().
241 build_text_search_query(#{search_text := SearchText}) when is_binary(SearchText) ->
242 5 #{simple_query_string => #{query => SearchText,
243 fields => [<<"body">>],
244 default_operator => <<"and">>}};
245 build_text_search_query(_) ->
246 432 #{match_all => #{}}.
247
248 -spec determine_sorting(map()) -> asc | desc.
249 determine_sorting(#{rsm := #rsm_in{direction = before}}) ->
250 21 desc;
251 determine_sorting(_) ->
252 65 asc.
253
254 -spec maybe_add_from_constraint(mongoose_elasticsearch:query(), map()) ->
255 mongoose_elasticsearch:query().
256 maybe_add_from_constraint(Query, #{rsm := #rsm_in{index = Offset}}) when is_integer(Offset) ->
257 4 Query#{from => Offset};
258 maybe_add_from_constraint(Query, _) ->
259 82 Query.
260
261 -spec search_result_to_mam_lookup_result(map(), map()) -> mod_mam:lookup_result().
262 search_result_to_mam_lookup_result(Result, Params) ->
263 86 #{<<"hits">> :=
264 #{<<"hits">> := Hits,
265 <<"total">> := TotalCount}} = Result,
266
267 86 Messages = lists:sort(
268 lists:map(fun hit_to_mam_message/1, Hits)),
269
270 86 case maps:get(is_simple, Params) of
271 true ->
272 2 {undefined, undefined, Messages};
273 _ ->
274 84 CorrectedTotalCount = corrected_total_count(TotalCount, Params),
275 84 Count = length(Messages),
276 84 Offset = calculate_offset(TotalCount, Count, Params),
277 84 {CorrectedTotalCount, Offset, Messages}
278 end.
279
280 -spec hit_to_mam_message(map()) -> mod_mam:message_row().
281 hit_to_mam_message(#{<<"_source">> := JSON}) ->
282 327 MessageId = maps:get(<<"mam_id">>, JSON),
283 327 Packet = maps:get(<<"message">>, JSON),
284 327 SourceBinJid = maps:get(<<"source_jid">>, JSON),
285 327 {ok, Stanza} = exml:parse(Packet),
286 327 #{id => MessageId, jid => jid:from_binary(SourceBinJid), packet => Stanza}.
287
288 hit_to_gdpr_mam_message(#{<<"_source">> := JSON}) ->
289 63 MessageId = maps:get(<<"mam_id">>, JSON),
290 63 Packet = maps:get(<<"message">>, JSON),
291 63 SourceBinJid = maps:get(<<"source_jid">>, JSON),
292 63 {integer_to_binary(MessageId), SourceBinJid, Packet}.
293
294 %% Usage of RSM affects the `"total"' value returned by ElasticSearch. Per RSM spec, the count
295 %% returned by the query should represent the size of the whole result set, which in case of MAM
296 %% is bound only by the MAM filters.
297 %% The solution is to compute the archive size as if the RSM wasn't used. There is an obvious race
298 %% condition here, because a user may send a message between initial request to ElasticSearch and
299 %% the count request issued here.
300 -spec corrected_total_count(non_neg_integer(), mongoose_elasticsearch:query()) ->
301 non_neg_integer().
302 corrected_total_count(_, #{rsm := #rsm_in{id = Id}} = Params) when is_integer(Id) ->
303 19 Query = build_search_query(Params#{rsm := undefined}),
304 19 archive_size(Query);
305 corrected_total_count(Count, _) ->
306 65 Count.
307
308 -spec calculate_offset(non_neg_integer(), non_neg_integer(), map()) -> non_neg_integer().
309 calculate_offset(_, _, #{rsm := #rsm_in{direction = undefined, index = Index}}) when is_integer(Index) ->
310 4 Index;
311 calculate_offset(TotalCount, Count, #{rsm := #rsm_in{direction = before}}) ->
312 19 TotalCount - Count;
313 calculate_offset(_, _, #{rsm := #rsm_in{direction = aft, id = Id}} = Params0) when is_integer(Id) ->
314 %% Not sure how this works..
315 11 Params1 = update_borders(Params0#{rsm := undefined}, Id + 1),
316 11 Query = build_search_query(Params1),
317 11 archive_size(Query);
318 calculate_offset(_, _, _) ->
319 50 0.
320
321 -spec update_borders(map(), non_neg_integer()) -> map().
322 update_borders(#{borders := Borders} = Params, EndId) ->
323 11 Params#{borders := update_borders_to_id(Borders, EndId)}.
324
325 -spec update_borders_to_id(#mam_borders{} | undefined, non_neg_integer()) -> #mam_borders{}.
326 update_borders_to_id(undefined, EndId) ->
327 11 #mam_borders{to_id = EndId};
328 update_borders_to_id(Borders, EndId) ->
329
:-(
Borders#mam_borders{to_id = EndId}.
330
331 -spec archive_size(mod_mam_elasticsearch:query()) -> non_neg_integer().
332 archive_size(Query) ->
333 251 case mongoose_elasticsearch:count(?INDEX_NAME, ?TYPE_NAME, Query) of
334 {ok, Count} ->
335 251 Count;
336 {error, Reason} ->
337
:-(
?LOG_ERROR(#{what => archive_size_failed, reason => Reason, es_query => Query,
338
:-(
text => <<"Failed to retrieve count of messages from ElasticSearch">>}),
339
:-(
0
340 end.
Line Hits Source