./ct_report/coverage/mod_mam_muc_elasticsearch_arch.COVER.html

1 %%------------------------------------------------------------------
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%
16 %% @doc ElasticSearch backend for Message Archive Management for
17 %% group messages.
18 %%------------------------------------------------------------------
19 -module(mod_mam_muc_elasticsearch_arch).
20
21 -behaviour(gen_mod).
22 -behaviour(ejabberd_gen_mam_archive).
23
24 %% gen_mod callbacks
25 -export([start/2]).
26 -export([stop/1]).
27
28 %% ejabberd_gen_mam_archive callbacks
29 -export([archive_message/3]).
30 -export([lookup_messages/3]).
31 -export([remove_archive/4]).
32 -export([archive_size/4]).
33
34 %gdpr
35 -export([get_mam_muc_gdpr_data/3]).
36
37 -ignore_xref([remove_archive/4]).
38
39 -include("mongoose.hrl").
40 -include("mongoose_rsm.hrl").
41 -include("mod_mam.hrl").
42 -include("jlib.hrl").
43
44 -define(INDEX_NAME, <<"muc_messages">>).
45 -define(TYPE_NAME, <<"muc">>).
46
47 %%-------------------------------------------------------------------
48 %% gen_mod callbacks
49 %%-------------------------------------------------------------------
50
%% @doc gen_mod callback. Registers the hook handlers listed by `hooks/1'
%% for the given host type. Module options are not used.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(HostType, _Opts) ->
    Handlers = hooks(HostType),
    ejabberd_hooks:add(Handlers),
    ok.
55
%% @doc gen_mod callback. Unregisters the hook handlers listed by `hooks/1'
%% for the given host type.
-spec stop(mongooseim:host_type()) -> ok.
stop(HostType) ->
    Handlers = hooks(HostType),
    ejabberd_hooks:delete(Handlers),
    ok.
60
61 %%-------------------------------------------------------------------
62 %% ejabberd_gen_mam_archive callbacks
63 %%-------------------------------------------------------------------
%% @doc `get_mam_muc_gdpr_data' hook handler.
%% Searches the MUC index for documents whose `from_jid' equals the bare JID
%% of `Source', ordered by ascending `mam_id', and prepends them to `Acc' as
%% `{MessageIdBinary, Packet}' pairs.
%% When the search fails, the failure is logged and `Acc' is returned
%% unchanged, so the hook chain is not interrupted.
%% NOTE(review): the query sets no `size', so the number of returned hits is
%% presumably limited by ElasticSearch's default page size - confirm whether
%% the complete archive is expected here.
-spec get_mam_muc_gdpr_data(ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
                            mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_muc_gdpr_data().
get_mam_muc_gdpr_data(Acc, HostType, Source) ->
    BinSource = mod_mam_utils:bare_jid(Source),
    Filter = #{term => #{from_jid => BinSource}},
    Sorting = #{mam_id => #{order => asc}},
    SearchQuery = #{query => #{bool => #{filter => Filter}},
                    sort => Sorting},
    case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, SearchQuery) of
        {ok, #{<<"hits">> := #{<<"hits">> := Hits}}} ->
            Messages = lists:map(fun hit_to_gdpr_mam_message/1, Hits),
            Messages ++ Acc;
        {error, Reason} ->
            %% Previously this failure was dropped silently; log it like the
            %% other ElasticSearch calls in this module do.
            ?LOG_ERROR(#{what => get_mam_muc_gdpr_data_failed,
                         server => HostType, source_jid => BinSource,
                         reason => Reason}),
            Acc
    end.
80
%% @doc `mam_muc_archive_message' hook handler.
%% Builds an ElasticSearch document from the message parameters and inserts it
%% into the MUC index under an id derived from the bare room JID and the
%% message id.
%% Returns `ok' on success. On failure the error is logged together with the
%% original parameters, the `modMamDropped' metric is bumped by one and the
%% `{error, Reason}' tuple is returned.
archive_message(_Result, Host, #{message_id := MessageId,
                                 local_jid := RoomJid,
                                 remote_jid := FromJID,
                                 source_jid := SourceJid,
                                 packet := Packet} = Params) ->
    BinRoom = mod_mam_utils:bare_jid(RoomJid),
    BinSource = mod_mam_utils:full_jid(SourceJid),
    BinFrom = mod_mam_utils:bare_jid(FromJID),
    DocumentId = make_document_id(BinRoom, MessageId),
    Document = make_document(MessageId, BinRoom, BinSource, Packet, BinFrom),
    InsertResult = mongoose_elasticsearch:insert_document(?INDEX_NAME, ?TYPE_NAME,
                                                          DocumentId, Document),
    case InsertResult of
        {error, Reason} ->
            ?LOG_ERROR(maps:merge(Params,
                                  #{what => archive_muc_message_failed, reason => Reason,
                                    server => Host, room => BinRoom, source => BinSource,
                                    message_id => MessageId})),
            mongoose_metrics:update(Host, modMamDropped, 1),
            InsertResult;
        {ok, _} ->
            ok
    end.
102
%% @doc `mam_muc_lookup_messages' hook handler.
%% Requests that page relative to a message id (RSM direction `before' or
%% `aft' with a defined id) go through `lookup_message_page/4'; every other
%% request is served directly by `do_lookup_messages/3'.
lookup_messages(Result, Host,
                #{rsm := #rsm_in{direction = Direction, id = ID} = RSM} = Params)
  when ID =/= undefined, (Direction =:= before orelse Direction =:= aft) ->
    lookup_message_page(Result, Host, RSM, Params);
lookup_messages(Result, Host, Params) ->
    do_lookup_messages(Result, Host, Params).
111
%% Performs the lookup with the page size increased by one and hands the
%% successful result, together with the RSM record and the original page size,
%% to `mod_mam_utils:check_for_item_not_found/3'. Lookup errors are returned
%% unchanged.
lookup_message_page(AccResult, Host, RSM, Params) ->
    PageSize = maps:get(page_size, Params),
    ExtendedParams = Params#{page_size := PageSize + 1},
    case do_lookup_messages(AccResult, Host, ExtendedParams) of
        {ok, LookupResult} ->
            mod_mam_utils:check_for_item_not_found(RSM, PageSize, LookupResult);
        {error, _} = Err ->
            Err
    end.
119
%% Builds the search query for `Params', adds sorting by `mam_id', the result
%% size limit (`page_size') and, for index-based RSM, the `from' offset, then
%% runs the search against the MUC index.
%% Returns `{ok, LookupResult}', or logs and returns `{error, Reason}'.
do_lookup_messages(_Result, Host, Params) ->
    BaseQuery = build_search_query(Params),
    Order = determine_sorting(Params),
    Limit = maps:get(page_size, Params),
    SizedQuery = BaseQuery#{sort => [#{mam_id => #{order => Order}}],
                            size => Limit},
    FinalQuery = maybe_add_from_constraint(SizedQuery, Params),
    case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, FinalQuery) of
        {error, Reason} = Err ->
            ?LOG_ERROR(maps:merge(Params,
                                  #{what => lookup_muc_messages_failed,
                                    server => Host, reason => Reason})),
            Err;
        {ok, Result} ->
            {ok, search_result_to_mam_lookup_result(Result, Params)}
    end.
136
%% @doc `mam_muc_archive_size' hook handler.
%% Counts the documents stored for the given room; the incoming size, host and
%% archive id are ignored.
-spec archive_size(Size :: integer(),
                   Host :: jid:server(),
                   _ArchiveId,
                   RoomJid :: jid:jid()) -> non_neg_integer().
archive_size(_Size, _Host, _ArchiveId, RoomJid) ->
    RoomParams = #{owner_jid => RoomJid},
    archive_size(build_search_query(RoomParams)).
144
%% @doc `mam_muc_remove_archive' hook handler.
%% Deletes every document matching the room's search query. A failed deletion
%% is only logged; the accumulator is returned unchanged in both cases.
-spec remove_archive(Acc :: mongoose_acc:t(),
                     Host :: jid:server(),
                     ArchiveId :: mod_mam:archive_id(),
                     RoomJid :: jid:jid()) -> Acc when Acc :: map().
remove_archive(Acc, Host, _ArchiveId, RoomJid) ->
    RoomQuery = build_search_query(#{owner_jid => RoomJid}),
    case mongoose_elasticsearch:delete_by_query(?INDEX_NAME, ?TYPE_NAME, RoomQuery) of
        {error, Reason} ->
            ?LOG_ERROR(#{what => remove_muc_archive_failed,
                         server => Host, room_jid => RoomJid, reason => Reason}),
            ok;
        ok ->
            ok
    end,
    Acc.
160
161 %%-------------------------------------------------------------------
162 %% Helpers
163 %%-------------------------------------------------------------------
164
%% Lists the hook handlers this module installs. `start/2' and `stop/1' call
%% this with a host type, so the spec names that type; every handler lives in
%% this module and is registered with priority 50.
-spec hooks(mongooseim:host_type()) -> [ejabberd_hooks:hook()].
hooks(HostType) ->
    [{mam_muc_archive_message, HostType, ?MODULE, archive_message, 50},
     {mam_muc_lookup_messages, HostType, ?MODULE, lookup_messages, 50},
     {mam_muc_archive_size, HostType, ?MODULE, archive_size, 50},
     {mam_muc_remove_archive, HostType, ?MODULE, remove_archive, 50},
     {get_mam_muc_gdpr_data, HostType, ?MODULE, get_mam_muc_gdpr_data, 50}].
172
%% Builds the document id: the room binary, a `$' separator and the decimal
%% representation of the message id.
-spec make_document_id(binary(), mod_mam:message_id()) -> binary().
make_document_id(Room, MessageId) ->
    IdBin = integer_to_binary(MessageId),
    <<Room/binary, "$", IdBin/binary>>.
176
%% Assembles the document stored for one archived message: the ids and JIDs as
%% given, the serialized packet under `message' and the cdata of the packet's
%% `body' child element under `body'.
-spec make_document(mod_mam:message_id(), binary(), binary(), exml:element(),
                    binary()) -> map().
make_document(MessageId, Room, SourceBinJid, Packet, FromJID) ->
    Serialized = exml:to_binary(Packet),
    BodyText = exml_query:path(Packet, [{element, <<"body">>}, cdata]),
    #{mam_id => MessageId,
      from_jid => FromJID,
      room => Room,
      source_jid => SourceBinJid,
      message => Serialized,
      body => BodyText}.
187
%% Wraps the filters and the text-search clause derived from `Params' in a
%% `bool' query: the text clause goes under `must', the filters under `filter'.
-spec build_search_query(map()) -> mongoose_elasticsearch:query().
build_search_query(Params) ->
    Filters = build_filters(Params),
    TextClause = build_text_search_query(Params),
    BoolQuery = #{must => TextClause, filter => Filters},
    #{query => #{bool => BoolQuery}}.
196
%% Runs every filter builder against `Params' and concatenates the filter
%% lists they return, in the order room, with-jid, range.
-spec build_filters(map()) -> [map()].
build_filters(Params) ->
    Builders = [fun room_filter/1,
                fun with_jid_filter/1,
                fun range_filter/1],
    lists:append([Build(Params) || Build <- Builders]).
203
%% Term filter on the `room' field, using the bare JID of `owner_jid'.
%% The `owner_jid' key is mandatory.
-spec room_filter(map()) -> [map()].
room_filter(#{owner_jid := Room}) ->
    RoomTerm = #{room => mod_mam_utils:bare_jid(Room)},
    [#{term => RoomTerm}].
208
%% Term filter on `source_jid' when `with_jid' holds a #jid{} record;
%% no filter otherwise.
-spec with_jid_filter(map()) -> [map()].
with_jid_filter(Params) ->
    case Params of
        #{with_jid := #jid{} = WithJid} ->
            SourceTerm = #{source_jid => mod_mam_utils:full_jid(WithJid)},
            [#{term => SourceTerm}];
        _ ->
            []
    end.
214
%% Range filter on `mam_id'. The id borders come from
%% `mod_mam_utils:calculate_msg_id_borders/4'; when neither border is defined,
%% or when the params lack any of `end_ts', `start_ts', `borders', `rsm',
%% no filter is produced.
-spec range_filter(map()) -> [map()].
range_filter(#{end_ts := End, start_ts := Start, borders := Borders, rsm := RSM}) ->
    {StartId, EndId} = mod_mam_utils:calculate_msg_id_borders(RSM, Borders, Start, End),
    case maybe_add_start_filter(StartId, maybe_add_end_filter(EndId, #{})) of
        Range when map_size(Range) =:= 0 ->
            [];
        Range ->
            [#{range => #{mam_id => Range}}]
    end;
range_filter(_) ->
    [].
229
%% Adds the inclusive upper bound to a range-filter map, or returns the map
%% untouched when no end id is defined.
%% The bound is stored under `lte', the parameter name documented for the
%% ElasticSearch range query (the previously used `le' is not a documented
%% parameter name).
-spec maybe_add_end_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_end_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_end_filter(Value, RangeMap) ->
    RangeMap#{lte => Value}.
235
%% Adds the inclusive lower bound to a range-filter map, or returns the map
%% untouched when no start id is defined.
%% The bound is stored under `gte', the parameter name documented for the
%% ElasticSearch range query (the previously used `ge' is not a documented
%% parameter name).
-spec maybe_add_start_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_start_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_start_filter(Value, RangeMap) ->
    RangeMap#{gte => Value}.
241
%% Text-search clause of the query: a `simple_query_string' over the `body'
%% field (default operator `and') when `search_text' is a binary, otherwise
%% `match_all'.
-spec build_text_search_query(map()) -> map().
build_text_search_query(Params) ->
    case Params of
        #{search_text := SearchText} when is_binary(SearchText) ->
            QueryString = #{query => SearchText,
                            fields => [<<"body">>],
                            default_operator => <<"and">>},
            #{simple_query_string => QueryString};
        _ ->
            #{match_all => #{}}
    end.
249
%% Sort order for the lookup: descending when paging `before' via RSM,
%% ascending in every other case.
-spec determine_sorting(map()) -> asc | desc.
determine_sorting(Params) ->
    case Params of
        #{rsm := #rsm_in{direction = before}} -> desc;
        _ -> asc
    end.
255
%% Sets the `from' offset of the query when the RSM record carries an integer
%% index; otherwise the query is returned as is.
-spec maybe_add_from_constraint(mongoose_elasticsearch:query(), map()) ->
    mongoose_elasticsearch:query().
maybe_add_from_constraint(Query, Params) ->
    case Params of
        #{rsm := #rsm_in{index = Offset}} when is_integer(Offset) ->
            Query#{from => Offset};
        _ ->
            Query
    end.
262
%% Converts an ElasticSearch search response into a MAM lookup result.
%% The hits are turned into message rows and sorted. For simple lookups
%% (`is_simple' is `true') no count or offset is computed; otherwise the total
%% count and the offset are derived from the `total' reported by the response
%% and from `Params'.
-spec search_result_to_mam_lookup_result(map(), map()) -> mod_mam:lookup_result().
search_result_to_mam_lookup_result(Result, Params) ->
    #{<<"hits">> := #{<<"hits">> := Hits, <<"total">> := TotalCount}} = Result,
    Messages = lists:sort([hit_to_mam_message(Hit) || Hit <- Hits]),
    case maps:get(is_simple, Params) of
        true ->
            {undefined, undefined, Messages};
        _ ->
            Total = corrected_total_count(TotalCount, Params),
            Offset = calculate_offset(TotalCount, length(Messages), Params),
            {Total, Offset, Messages}
    end.
281
%% Turns one search hit into a MAM message row: the stored `mam_id', the
%% parsed `source_jid' and the stanza parsed from the stored `message' field.
%% A stored message that does not parse crashes the caller.
-spec hit_to_mam_message(map()) -> mod_mam:message_row().
hit_to_mam_message(#{<<"_source">> := Source}) ->
    Id = maps:get(<<"mam_id">>, Source),
    RawPacket = maps:get(<<"message">>, Source),
    BinJid = maps:get(<<"source_jid">>, Source),
    {ok, Stanza} = exml:parse(RawPacket),
    Jid = jid:from_binary(BinJid),
    #{id => Id, jid => Jid, packet => Stanza}.
289
%% Turns one search hit into a GDPR entry: the message id rendered as a binary
%% paired with the stored (still serialized) message.
hit_to_gdpr_mam_message(#{<<"_source">> := Source}) ->
    Id = maps:get(<<"mam_id">>, Source),
    RawPacket = maps:get(<<"message">>, Source),
    BinId = integer_to_binary(Id),
    {BinId, RawPacket}.
294
%% Using RSM affects the `"total"' value returned by ElasticSearch. Per the RSM spec, the count
%% returned by the query should represent the size of the whole result set, which in the case of
%% MAM is bound only by the MAM filters.
%% The solution is to compute the archive size as if RSM was not used, by issuing a separate
%% count request with the `rsm' parameter reset to `undefined'. There is an obvious race
%% condition here, because a user may send a message between the initial request to ElasticSearch
%% and the count request issued here.
%% The second argument is the lookup params map (not a ready-made query); when its RSM record
%% carries no integer id, the count reported by ElasticSearch is returned unchanged.
-spec corrected_total_count(non_neg_integer(), map()) -> non_neg_integer().
corrected_total_count(_, #{rsm := #rsm_in{id = Id}} = Params) when is_integer(Id) ->
    Query = build_search_query(Params#{rsm := undefined}),
    archive_size(Query);
corrected_total_count(Count, _) ->
    Count.
308
%% Computes the offset of the returned page within the whole result set:
%%  - index-based RSM without a direction: the requested index;
%%  - paging `before': the reported total minus the number of returned messages;
%%  - paging `aft' an integer id: a separate count request issued with the RSM
%%    constraint dropped and the `to_id' border set to `Id + 1'.
%%    NOTE(review): presumably this counts the messages that precede the
%%    returned page - confirm how `mod_mam_utils:calculate_msg_id_borders/4'
%%    treats `to_id';
%%  - anything else: 0.
-spec calculate_offset(non_neg_integer(), non_neg_integer(), map()) -> non_neg_integer().
calculate_offset(TotalCount, Count, Params) ->
    case Params of
        #{rsm := #rsm_in{direction = undefined, index = Index}} when is_integer(Index) ->
            Index;
        #{rsm := #rsm_in{direction = before}} ->
            TotalCount - Count;
        #{rsm := #rsm_in{direction = aft, id = Id}} when is_integer(Id) ->
            BorderedParams = update_borders(Params#{rsm := undefined}, Id + 1),
            archive_size(build_search_query(BorderedParams));
        _ ->
            0
    end.
321
%% Replaces the `borders' entry of the params with one whose `to_id' is
%% `EndId'. The `borders' key must already be present.
-spec update_borders(map(), non_neg_integer()) -> map().
update_borders(#{borders := Borders} = Params, EndId) ->
    NewBorders = update_borders_to_id(Borders, EndId),
    Params#{borders := NewBorders}.
325
%% Sets `to_id' on the given borders record, starting from a default
%% #mam_borders{} when no borders were defined.
-spec update_borders_to_id(#mam_borders{} | undefined, non_neg_integer()) -> #mam_borders{}.
update_borders_to_id(Borders, EndId) ->
    case Borders of
        undefined ->
            #mam_borders{to_id = EndId};
        _ ->
            Borders#mam_borders{to_id = EndId}
    end.
331
%% Counts the documents in the MUC index that match `Query'.
%% A failed count request is logged and reported as 0, so callers always get
%% a number.
%% The spec uses `mongoose_elasticsearch:query()', the query type used by the
%% other specs in this module.
-spec archive_size(mongoose_elasticsearch:query()) -> non_neg_integer().
archive_size(Query) ->
    case mongoose_elasticsearch:count(?INDEX_NAME, ?TYPE_NAME, Query) of
        {ok, Count} ->
            Count;
        {error, Reason} ->
            ?LOG_ERROR(#{what => archive_size_failed, es_query => Query, reason => Reason}),
            0
    end.
Line Hits Source