
1 %%------------------------------------------------------------------
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%
16 %% @doc ElasticSearch backend for Message Archive Management for
17 %% group messages.
18 %%------------------------------------------------------------------
-module(mod_mam_muc_elasticsearch_arch).

-behaviour(gen_mod).
-behaviour(ejabberd_gen_mam_archive).

%% gen_mod callbacks
-export([start/2,
         stop/1,
         hooks/1]).

%% ejabberd_gen_mam_archive callbacks
-export([archive_message/3,
         lookup_messages/3,
         remove_archive/3,
         archive_size/3,
         get_mam_muc_gdpr_data/3]).

-include("mongoose.hrl").
-include("mongoose_rsm.hrl").
-include("mod_mam.hrl").
-include("jlib.hrl").

%% Index and type names passed to every mongoose_elasticsearch call made by
%% this module.
-define(INDEX_NAME, <<"muc_messages">>).
-define(TYPE_NAME, <<"muc">>).
44 %%-------------------------------------------------------------------
45 %% gen_mod callbacks
46 %%-------------------------------------------------------------------
%% @doc gen_mod start callback. Both arguments are ignored and `ok' is
%% returned unconditionally.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(_, _) ->
    ok.
%% @doc gen_mod stop callback. The argument is ignored and `ok' is returned
%% unconditionally.
-spec stop(mongooseim:host_type()) -> ok.
stop(_) ->
    ok.
56 %%-------------------------------------------------------------------
57 %% ejabberd_gen_mam_archive callbacks
58 %%-------------------------------------------------------------------
%% @doc Hook handler collecting GDPR data for one user.
%%
%% Searches the MUC index for documents whose `from_jid' equals the bare JID
%% of `Source', requesting ascending `mam_id' order, and puts the resulting
%% `{MessageId, Packet}' pairs in front of the accumulator.
%% A failed search leaves the accumulator unchanged; the failure is logged
%% instead of being dropped silently.
-spec get_mam_muc_gdpr_data(Acc, Params, Extra) -> {ok | stop, Acc} when
      Acc :: ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
      Params :: #{jid := jid:jid()},
      Extra :: gen_hook:extra().
get_mam_muc_gdpr_data(Acc, #{jid := Source}, _Extra) ->
    BinSource = mod_mam_utils:bare_jid(Source),
    Filter = #{term => #{from_jid => BinSource}},
    Sorting = #{mam_id => #{order => asc}},
    SearchQuery = #{query => #{bool => #{filter => Filter}},
                    sort => Sorting},
    case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, SearchQuery) of
        {ok, #{<<"hits">> := #{<<"hits">> := Hits}}} ->
            Messages = lists:map(fun hit_to_gdpr_mam_message/1, Hits),
            {ok, Messages ++ Acc};
        {error, Reason} ->
            %% Best effort: the hook chain continues with what was collected
            %% so far, but the failure must be visible in the logs.
            ?LOG_ERROR(#{what => get_mam_muc_gdpr_data_failed,
                         jid => BinSource, reason => Reason}),
            {ok, Acc}
    end.
%% @doc Hook handler storing one MUC message as an ElasticSearch document.
%%
%% The document id is built from the bare room JID and the MAM message id.
%% The incoming accumulator is ignored. The new accumulator is `ok' when the
%% insert succeeds, otherwise the `{error, Reason}' tuple returned by the
%% insert; in that case the failure is logged and the `modMamDropped' metric
%% is incremented.
-spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok | {error, term()},
      Params :: mod_mam:archive_message_params(),
      Extra :: gen_hook:extra().
archive_message(_Result, Params, #{host_type := HostType}) ->
    #{message_id := MessageId,
      local_jid := RoomJid,
      remote_jid := FromJID,
      source_jid := SourceJid,
      packet := Packet} = Params,
    Room = mod_mam_utils:bare_jid(RoomJid),
    SourceBinJid = mod_mam_utils:full_jid(SourceJid),
    From = mod_mam_utils:bare_jid(FromJID),
    DocId = make_document_id(Room, MessageId),
    Doc = make_document(MessageId, Room, SourceBinJid, Packet, From),
    case mongoose_elasticsearch:insert_document(?INDEX_NAME, ?TYPE_NAME, DocId, Doc) of
        {ok, _} ->
            {ok, ok};
        {error, Reason} = Err ->
            %% NOTE(review): the opening of this log call was missing from the
            %% reviewed text. Merging the report into Params is the form
            %% implied by the doubled closing parenthesis - confirm.
            ?LOG_ERROR(maps:merge(Params,
                                  #{what => archive_muc_message_failed, reason => Reason,
                                    server => HostType, room => Room, source => SourceBinJid,
                                    message_id => MessageId})),
            mongoose_metrics:update(HostType, modMamDropped, 1),
            {ok, Err}
    end.
%% @doc Hook handler performing a MAM lookup in a room archive.
%%
%% Requests paging `before' or `aft' a concrete RSM id go through
%% lookup_message_page/4; every other request is served directly by
%% do_lookup_messages/3. The result of either is wrapped in `{ok, _}'.
-spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: {ok, mod_mam:lookup_result()} | {error, term()},
      Params :: mam_iq:lookup_params(),
      Extra :: gen_hook:extra().
lookup_messages(Result,
                #{rsm := #rsm_in{direction = Direction, id = ID} = RSM} = Params,
                #{host_type := HostType})
  when ID =/= undefined, (Direction =:= before orelse Direction =:= aft) ->
    Page = lookup_message_page(Result, HostType, RSM, Params),
    {ok, Page};
lookup_messages(Result, Params, #{host_type := HostType}) ->
    Found = do_lookup_messages(Result, HostType, Params),
    {ok, Found}.
%% Looks up one RSM page relative to a message id.
%%
%% The search is issued with a page size of `PageSize + 1'. When no specific
%% `message_id' was requested, the result is passed together with the
%% original `PageSize' to mod_mam_utils:check_for_item_not_found/3; otherwise
%% it is returned as is. A lookup error is propagated unchanged.
lookup_message_page(AccResult, Host, RSM, #{message_id := MsgID} = Params) ->
    PageSize = maps:get(page_size, Params),
    case do_lookup_messages(AccResult, Host, Params#{page_size := 1 + PageSize}) of
        {error, _} = Err ->
            %% This clause had no body in the reviewed text; the error tuple
            %% bound in the pattern is what gets returned.
            Err;
        {ok, LookupResult} ->
            case MsgID of
                undefined ->
                    mod_mam_utils:check_for_item_not_found(RSM, PageSize, LookupResult);
                _ ->
                    {ok, LookupResult}
            end
    end.
%% Builds the ElasticSearch query for a lookup and runs it.
%%
%% On top of the filters from build_search_query/1 the query gets a sort
%% order on `mam_id' (see determine_sorting/1), a `size' equal to the
%% requested page size and, for index-based RSM, a `from' offset.
%% Returns `{ok, LookupResult}' or the `{error, Reason}' tuple from the
%% search, which is logged before being returned.
do_lookup_messages(_Result, Host, Params) ->
    SearchQuery0 = build_search_query(Params),
    Sorting = [#{mam_id => #{order => determine_sorting(Params)}}],
    ResultLimit = maps:get(page_size, Params),
    SearchQuery1 = SearchQuery0#{sort => Sorting,
                                 size => ResultLimit},
    SearchQuery2 = maybe_add_from_constraint(SearchQuery1, Params),
    case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, SearchQuery2) of
        {ok, Result} ->
            {ok, search_result_to_mam_lookup_result(Result, Params)};
        {error, Reason} = Err ->
            %% NOTE(review): the opening of this log call and the branch's
            %% return value were missing from the reviewed text. Merging into
            %% Params is implied by the doubled closing parenthesis, and
            %% returning Err by the otherwise unused binding - confirm.
            ?LOG_ERROR(maps:merge(Params,
                                  #{what => lookup_muc_messages_failed,
                                    server => Host, reason => Reason})),
            Err
    end.
%% @doc Hook handler returning the number of messages archived for a room.
%% The incoming size accumulator is ignored and replaced by the count
%% obtained from archive_size/1 for a query filtered by the room only.
-spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: integer(),
      Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
      Extra :: gen_hook:extra().
archive_size(_Size, #{room := RoomJid}, _Extra) ->
    RoomQuery = build_search_query(#{owner_jid => RoomJid}),
    Count = archive_size(RoomQuery),
    {ok, Count}.
%% @doc Hook handler deleting every archived message of a room.
%%
%% Issues a delete-by-query filtered by the room. A failure is logged but
%% does not change the outcome: the accumulator is always returned untouched.
-spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok,
      Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
      Extra :: gen_hook:extra().
remove_archive(Acc, #{room := RoomJid}, #{host_type := HostType}) ->
    SearchQuery = build_search_query(#{owner_jid => RoomJid}),
    case mongoose_elasticsearch:delete_by_query(?INDEX_NAME, ?TYPE_NAME, SearchQuery) of
        ok ->
            ok;
        {error, Reason} ->
            ?LOG_ERROR(#{what => remove_muc_archive_failed,
                         server => HostType, room_jid => RoomJid, reason => Reason}),
            %% The value of the case expression is discarded; `ok' mirrors the
            %% success branch (this line was missing from the reviewed text).
            ok
    end,
    {ok, Acc}.
176 %%-------------------------------------------------------------------
177 %% Helpers
178 %%-------------------------------------------------------------------
%% @doc Lists the hook handlers of this backend for the given host type.
%% Every entry uses an empty extra map and the value 50 as its last element
%% (presumably the handler priority - confirm against gen_hook).
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(Host) ->
    Handlers = [{mam_muc_archive_message, fun ?MODULE:archive_message/3},
                {mam_muc_lookup_messages, fun ?MODULE:lookup_messages/3},
                {mam_muc_archive_size, fun ?MODULE:archive_size/3},
                {mam_muc_remove_archive, fun ?MODULE:remove_archive/3},
                {get_mam_muc_gdpr_data, fun ?MODULE:get_mam_muc_gdpr_data/3}],
    [{HookName, Host, Handler, #{}, 50} || {HookName, Handler} <- Handlers].
%% @doc Builds the document id: the room, a `$' separator and the decimal
%% representation of the message id.
-spec make_document_id(binary(), mod_mam:message_id()) -> binary().
make_document_id(Room, MessageId) ->
    IdSuffix = integer_to_binary(MessageId),
    <<Room/binary, "$", IdSuffix/binary>>.
%% @doc Builds the document stored for one archived message: ids and JIDs as
%% given, the serialized packet under `message' and the character data of the
%% packet's `body' child under `body'.
-spec make_document(mod_mam:message_id(), binary(), binary(), exml:element(),
                    binary()) -> map().
make_document(MessageId, Room, SourceBinJid, Packet, FromJID) ->
    Serialized = exml:to_binary(Packet),
    BodyText = exml_query:path(Packet, [{element, <<"body">>}, cdata]),
    #{mam_id => MessageId,
      from_jid => FromJID,
      room => Room,
      source_jid => SourceBinJid,
      message => Serialized,
      body => BodyText}.
%% @doc Builds a `bool' query whose `filter' part comes from build_filters/1
%% and whose `must' part comes from build_text_search_query/1.
-spec build_search_query(map()) -> mongoose_elasticsearch:query().
build_search_query(Params) ->
    FilterClauses = build_filters(Params),
    MustClause = build_text_search_query(Params),
    BoolQuery = #{must => MustClause,
                  filter => FilterClauses},
    #{query => #{bool => BoolQuery}}.
%% @doc Runs every filter builder on the params and concatenates the filter
%% lists they return, in builder order.
-spec build_filters(map()) -> [map()].
build_filters(Params) ->
    Builders = [fun room_filter/1,
                fun with_jid_filter/1,
                fun range_filter/1,
                fun specific_message_filter/1],
    lists:append([Build(Params) || Build <- Builders]).
%% @doc Term filter restricting results to the bare JID of the room found
%% under `owner_jid'. The key is mandatory.
-spec room_filter(map()) -> [map()].
room_filter(#{owner_jid := Room}) ->
    [#{term => #{room => mod_mam_utils:bare_jid(Room)}}].
%% @doc Term filter on `source_jid' when `with_jid' holds a #jid{} record;
%% no filter otherwise.
-spec with_jid_filter(map()) -> [map()].
with_jid_filter(#{with_jid := #jid{} = Jid}) ->
    FullJid = mod_mam_utils:full_jid(Jid),
    [#{term => #{source_jid => FullJid}}];
with_jid_filter(_Params) ->
    [].
%% @doc Range filter on `mam_id' derived from the time stamps, borders and
%% RSM data of the request. No filter is produced when neither bound is
%% defined, or when any of the four keys is absent from the params.
-spec range_filter(map()) -> [map()].
range_filter(#{end_ts := End, start_ts := Start, borders := Borders, rsm := RSM}) ->
    {StartId, EndId} = mod_mam_utils:calculate_msg_id_borders(RSM, Borders, Start, End),
    Bounds = maybe_add_start_filter(StartId, maybe_add_end_filter(EndId, #{})),
    if
        map_size(Bounds) =:= 0 ->
            [];
        true ->
            [#{range => #{mam_id => Bounds}}]
    end;
range_filter(_Params) ->
    [].
%% @doc Term filter selecting one message when `message_id' is an integer;
%% no filter otherwise.
-spec specific_message_filter(map()) -> [map()].
specific_message_filter(#{message_id := MamId}) when is_integer(MamId) ->
    Term = #{mam_id => MamId},
    [#{term => Term}];
specific_message_filter(_Params) ->
    [].
%% @doc Adds an inclusive upper bound to a range map unless the bound is
%% `undefined'.
%%
%% The bound is stored under `lte', the key documented for the ElasticSearch
%% range query, instead of the short form `le'.
-spec maybe_add_end_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_end_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_end_filter(Value, RangeMap) ->
    RangeMap#{lte => Value}.
%% @doc Adds an inclusive lower bound to a range map unless the bound is
%% `undefined'.
%%
%% The bound is stored under `gte', the key documented for the ElasticSearch
%% range query, instead of the short form `ge'.
-spec maybe_add_start_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_start_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_start_filter(Value, RangeMap) ->
    RangeMap#{gte => Value}.
%% @doc Full-text part of the query: a `simple_query_string' over the `body'
%% field with the `and' operator when `search_text' is a binary, `match_all'
%% otherwise.
-spec build_text_search_query(map()) -> map().
build_text_search_query(#{search_text := Text}) when is_binary(Text) ->
    Options = #{query => Text,
                fields => [<<"body">>],
                default_operator => <<"and">>},
    #{simple_query_string => Options};
build_text_search_query(_Params) ->
    #{match_all => #{}}.
%% @doc Sort order for a lookup: descending when the RSM direction is
%% `before', ascending in every other case.
-spec determine_sorting(map()) -> asc | desc.
determine_sorting(#{rsm := #rsm_in{direction = before}}) -> desc;
determine_sorting(_Params) -> asc.
%% @doc Sets the `from' offset of the query to the RSM index when that index
%% is an integer; returns the query unchanged otherwise.
-spec maybe_add_from_constraint(mongoose_elasticsearch:query(), map()) ->
    mongoose_elasticsearch:query().
maybe_add_from_constraint(Query, #{rsm := #rsm_in{index = Index}}) when is_integer(Index) ->
    maps:put(from, Index, Query);
maybe_add_from_constraint(Query, _Params) ->
    Query.
%% @doc Converts a search response into a MAM lookup result
%% `{TotalCount, Offset, Messages}'.
%%
%% The hits are converted with hit_to_mam_message/1 and sorted. When
%% `is_simple' is `true' the count and offset are not computed and are
%% reported as `undefined'.
-spec search_result_to_mam_lookup_result(map(), map()) -> mod_mam:lookup_result().
search_result_to_mam_lookup_result(Result, Params) ->
    #{<<"hits">> := #{<<"hits">> := Hits, <<"total">> := TotalCount}} = Result,
    Messages = lists:sort([hit_to_mam_message(Hit) || Hit <- Hits]),
    case maps:get(is_simple, Params) of
        true ->
            {undefined, undefined, Messages};
        _ ->
            Total = corrected_total_count(TotalCount, Params),
            Offset = calculate_offset(TotalCount, length(Messages), Params),
            {Total, Offset, Messages}
    end.
%% @doc Converts one search hit into a message row: the stored `mam_id', the
%% parsed `source_jid' and the stanza parsed back from the stored `message'.
%% Crashes when the stored packet cannot be parsed.
-spec hit_to_mam_message(map()) -> mod_mam:message_row().
hit_to_mam_message(#{<<"_source">> := Source}) ->
    MamId = maps:get(<<"mam_id">>, Source),
    RawStanza = maps:get(<<"message">>, Source),
    Sender = maps:get(<<"source_jid">>, Source),
    {ok, Stanza} = exml:parse(RawStanza),
    #{id => MamId,
      jid => jid:from_binary(Sender),
      packet => Stanza}.
%% Converts one search hit into a GDPR entry: the `mam_id' rendered as a
%% decimal binary, paired with the stored `message' value as is.
hit_to_gdpr_mam_message(#{<<"_source">> := Source}) ->
    MamId = maps:get(<<"mam_id">>, Source),
    StoredPacket = maps:get(<<"message">>, Source),
    {integer_to_binary(MamId), StoredPacket}.
%% Usage of RSM affects the `"total"' value returned by ElasticSearch. Per the RSM spec, the
%% count returned by the query should represent the size of the whole result set, which in
%% the case of MAM is bound only by the MAM filters.
%% The solution is to compute the archive size as if RSM wasn't used. There is an obvious race
%% condition here, because a user may send a message between the initial request to
%% ElasticSearch and the count request issued here.
%%
%% The second argument is the map of lookup params (the clauses match on its `rsm' key), not
%% an ElasticSearch query, so the spec declares it as a plain map.
-spec corrected_total_count(non_neg_integer(), map()) -> non_neg_integer().
corrected_total_count(_, #{rsm := #rsm_in{id = Id}} = Params) when is_integer(Id) ->
    Query = build_search_query(Params#{rsm := undefined}),
    archive_size(Query);
corrected_total_count(Count, _) ->
    Count.
%% @doc Computes the offset of the returned page within the result set.
%%
%% - index-based RSM without direction: the requested index;
%% - direction `before': total count minus the number of returned messages;
%% - direction `aft' with an integer id: the number of documents matching the
%%   same params with RSM removed and the `to_id' border set to `Id + 1'
%%   (presumably the messages preceding the page - confirm against
%%   mod_mam_utils:calculate_msg_id_borders/4);
%% - anything else: 0.
-spec calculate_offset(non_neg_integer(), non_neg_integer(), map()) -> non_neg_integer().
calculate_offset(_Total, _PageLen, #{rsm := #rsm_in{direction = undefined, index = Index}})
  when is_integer(Index) ->
    Index;
calculate_offset(Total, PageLen, #{rsm := #rsm_in{direction = before}}) ->
    Total - PageLen;
calculate_offset(_Total, _PageLen, #{rsm := #rsm_in{direction = aft, id = Id}} = Params)
  when is_integer(Id) ->
    WithoutRsm = Params#{rsm := undefined},
    Bounded = update_borders(WithoutRsm, Id + 1),
    archive_size(build_search_query(Bounded));
calculate_offset(_Total, _PageLen, _Params) ->
    0.
%% @doc Replaces the `borders' entry of the params with one whose `to_id' is
%% `EndId'. The `borders' key must already be present.
-spec update_borders(map(), non_neg_integer()) -> map().
update_borders(#{borders := Current} = Params, EndId) ->
    NewBorders = update_borders_to_id(Current, EndId),
    Params#{borders := NewBorders}.
%% @doc Sets `to_id' on the given borders record; `undefined' is treated as a
%% record with every other field at its default.
-spec update_borders_to_id(#mam_borders{} | undefined, non_neg_integer()) -> #mam_borders{}.
update_borders_to_id(undefined, EndId) ->
    update_borders_to_id(#mam_borders{}, EndId);
update_borders_to_id(Borders, EndId) ->
    Borders#mam_borders{to_id = EndId}.
%% @doc Counts the documents matching the query. A failed count request is
%% logged and reported as 0, so the function always returns a number.
-spec archive_size(mongoose_elasticsearch:query()) -> non_neg_integer().
archive_size(Query) ->
    case mongoose_elasticsearch:count(?INDEX_NAME, ?TYPE_NAME, Query) of
        {ok, Count} ->
            Count;
        {error, Reason} ->
            ?LOG_ERROR(#{what => archive_size_failed, es_query => Query, reason => Reason}),
            %% NOTE(review): the branch's return value was missing from the
            %% reviewed text; 0 is the value that satisfies the spec - confirm.
            0
    end.
