1 |
|
%%------------------------------------------------------------------ |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%% |
16 |
|
%% @doc ElasticSearch backend for Message Archive Management for |
17 |
|
%% group messages. |
18 |
|
%%------------------------------------------------------------------ |
19 |
|
-module(mod_mam_muc_elasticsearch_arch). |
20 |
|
|
21 |
|
-behaviour(gen_mod). |
22 |
|
-behaviour(ejabberd_gen_mam_archive). |
23 |
|
|
24 |
|
%% gen_mod callbacks |
25 |
|
-export([start/2]). |
26 |
|
-export([stop/1]). |
27 |
|
-export([hooks/1]). |
28 |
|
|
29 |
|
%% ejabberd_gen_mam_archive callbacks |
30 |
|
-export([archive_message/3]). |
31 |
|
-export([lookup_messages/3]). |
32 |
|
-export([remove_archive/3]). |
33 |
|
-export([archive_size/3]). |
34 |
|
-export([get_mam_muc_gdpr_data/3]). |
35 |
|
|
36 |
|
-include("mongoose.hrl"). |
37 |
|
-include("mongoose_rsm.hrl"). |
38 |
|
-include("mod_mam.hrl"). |
39 |
|
-include("jlib.hrl"). |
40 |
|
|
41 |
|
-define(INDEX_NAME, <<"muc_messages">>). |
42 |
|
-define(TYPE_NAME, <<"muc">>). |
43 |
|
|
44 |
|
%%------------------------------------------------------------------- |
45 |
|
%% gen_mod callbacks |
46 |
|
%%------------------------------------------------------------------- |
47 |
|
|
48 |
|
%% @doc gen_mod start callback. This backend keeps no per-host state, so
%% there is nothing to initialise.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(_HostType, _ModuleOpts) ->
    ok.
51 |
|
|
52 |
|
%% @doc gen_mod stop callback. Nothing was set up in start/2, so there is
%% nothing to tear down.
-spec stop(mongooseim:host_type()) -> ok.
stop(_Host) ->
    ok.
55 |
|
|
56 |
|
%%------------------------------------------------------------------- |
57 |
|
%% ejabberd_gen_mam_archive callbacks |
58 |
|
%%------------------------------------------------------------------- |
59 |
|
%% @doc Hook handler for GDPR data retrieval. It collects the MUC messages
%% sent by the given user, matching the bare JID stored in each document's
%% `from_jid' field, ordered by ascending MAM id. They are prepended to the
%% accumulator as `{MamIdBinary, StoredMessage}' tuples.
%%
%% ElasticSearch failures are logged, like every other ElasticSearch call in
%% this module, and the accumulator is returned unchanged.
%%
%% NOTE(review): the query sets no `size'. ElasticSearch returns at most 10
%% hits when `size' is absent. Confirm whether mongoose_elasticsearch:search/3
%% compensates; otherwise users with more messages get a partial export.
-spec get_mam_muc_gdpr_data(Acc, Params, Extra) -> {ok | stop, Acc} when
      Acc :: ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
      Params :: #{jid := jid:jid()},
      Extra :: gen_hook:extra().
get_mam_muc_gdpr_data(Acc, #{jid := Source}, _Extra) ->
    BinSource = mod_mam_utils:bare_jid(Source),
    Filter = #{term => #{from_jid => BinSource}},
    Sorting = #{mam_id => #{order => asc}},
    SearchQuery = #{query => #{bool => #{filter => Filter}},
                    sort => Sorting},
    case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, SearchQuery) of
        {ok, #{<<"hits">> := #{<<"hits">> := Hits}}} ->
            Messages = lists:map(fun hit_to_gdpr_mam_message/1, Hits),
            {ok, Messages ++ Acc};
        {error, Reason} ->
            %% Best effort: keep whatever the accumulator already holds, but
            %% do not let the failure go unnoticed (it used to be dropped silently).
            ?LOG_ERROR(#{what => get_mam_muc_gdpr_data_failed,
                         jid => Source, reason => Reason}),
            {ok, Acc}
    end.
76 |
|
|
77 |
|
%% @doc Hook handler that stores one MUC message as an ElasticSearch
%% document with id `<room bare JID>$<MAM id>'.
%%
%% On success the accumulator becomes `ok'. On failure the error is logged,
%% a `mod_mam_muc_dropped' instrumentation event is executed, and the
%% accumulator becomes the `{error, Reason}' tuple.
-spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok | {error, term()},
      Params :: mod_mam:archive_message_params(),
      Extra :: gen_hook:extra().
archive_message(_Result, Params, #{host_type := HostType}) ->
    #{message_id := MessageId, local_jid := RoomJid, remote_jid := FromJID,
      source_jid := SourceJid, packet := Packet} = Params,
    RoomBin = mod_mam_utils:bare_jid(RoomJid),
    SourceBin = mod_mam_utils:full_jid(SourceJid),
    FromBin = mod_mam_utils:bare_jid(FromJID),
    DocumentId = make_document_id(RoomBin, MessageId),
    Document = make_document(MessageId, RoomBin, SourceBin, Packet, FromBin),
    case mongoose_elasticsearch:insert_document(?INDEX_NAME, ?TYPE_NAME, DocumentId, Document) of
        {error, Reason} = Error ->
            LogMeta = #{what => archive_muc_message_failed, reason => Reason,
                        server => HostType, room => RoomBin, source => SourceBin,
                        message_id => MessageId},
            ?LOG_ERROR(maps:merge(Params, LogMeta)),
            mongoose_instrument:execute(mod_mam_muc_dropped, #{host_type => HostType}, #{count => 1}),
            {ok, Error};
        {ok, _} ->
            {ok, ok}
    end.
103 |
|
|
104 |
|
%% @doc Hook handler for MAM MUC lookups.
%%
%% Queries paging relative to a concrete RSM id (`before' or `aft' with a
%% defined id) go through lookup_message_page/4. Every other query is run
%% directly by do_lookup_messages/3. The result, or error, becomes the new
%% accumulator.
-spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: {ok, mod_mam:lookup_result()} | {error, term()},
      Params :: mam_iq:lookup_params(),
      Extra :: gen_hook:extra().
lookup_messages(Result,
                #{rsm := #rsm_in{direction = Direction, id = ID} = RSM} = Params,
                #{host_type := HostType})
  when ID =/= undefined, (Direction =:= before orelse Direction =:= aft) ->
    {ok, lookup_message_page(Result, HostType, RSM, Params)};
lookup_messages(Result, Params, #{host_type := HostType}) ->
    {ok, do_lookup_messages(Result, HostType, Params)}.
120 |
|
|
121 |
|
%% @doc Runs a lookup anchored on an RSM id.
%%
%% One row more than `page_size' is requested. Unless the query targets a
%% specific `message_id', the result is handed to
%% mod_mam_utils:check_for_item_not_found/3 together with the original page
%% size. Lookup errors are returned unchanged.
lookup_message_page(AccResult, Host, RSM, #{message_id := MsgID} = Params) ->
    PageSize = maps:get(page_size, Params),
    ExtendedParams = Params#{page_size := PageSize + 1},
    case do_lookup_messages(AccResult, Host, ExtendedParams) of
        {ok, LookupResult} when MsgID =:= undefined ->
            mod_mam_utils:check_for_item_not_found(RSM, PageSize, LookupResult);
        {ok, LookupResult} ->
            {ok, LookupResult};
        {error, _} = Error ->
            Error
    end.
134 |
|
|
135 |
|
%% @doc Builds and runs the ElasticSearch search for a MAM lookup and turns
%% the hits into a MAM lookup result.
%%
%% The accumulated hook result is ignored. The number of hits is capped by
%% `page_size', and an RSM `index' becomes the ElasticSearch `from' offset.
%% Errors are logged and returned as `{error, Reason}'.
do_lookup_messages(_Result, Host, Params) ->
    BaseQuery = build_search_query(Params),
    Sort = [#{mam_id => #{order => determine_sorting(Params)}}],
    Limit = maps:get(page_size, Params),
    Query = maybe_add_from_constraint(BaseQuery#{sort => Sort, size => Limit}, Params),
    case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, Query) of
        {error, Reason} = Error ->
            ?LOG_ERROR(maps:merge(Params, #{what => lookup_muc_messages_failed,
                                            server => Host, reason => Reason})),
            Error;
        {ok, SearchResult} ->
            {ok, search_result_to_mam_lookup_result(SearchResult, Params)}
    end.
151 |
|
|
152 |
|
%% @doc Hook handler that returns the number of archived messages of a room.
%% The incoming accumulator is replaced. ElasticSearch errors yield 0 (see
%% archive_size/1).
-spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: integer(),
      Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
      Extra :: gen_hook:extra().
archive_size(_Size, #{room := RoomJid}, _Extra) ->
    Count = archive_size(build_search_query(#{owner_jid => RoomJid})),
    {ok, Count}.
159 |
|
|
160 |
|
%% @doc Hook handler that deletes every archived document of a room.
%% A failed delete-by-query is logged and otherwise ignored, and the
%% accumulator is always passed through unchanged.
-spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok,
      Params :: #{archive_id := mod_mam:archive_id() | undefined, room := jid:jid()},
      Extra :: gen_hook:extra().
remove_archive(Acc, #{room := RoomJid}, #{host_type := HostType}) ->
    RoomQuery = build_search_query(#{owner_jid => RoomJid}),
    case mongoose_elasticsearch:delete_by_query(?INDEX_NAME, ?TYPE_NAME, RoomQuery) of
        {error, Reason} ->
            ?LOG_ERROR(#{what => remove_muc_archive_failed, server => HostType,
                         room_jid => RoomJid, reason => Reason}),
            ok;
        ok ->
            ok
    end,
    {ok, Acc}.
175 |
|
|
176 |
|
%%------------------------------------------------------------------- |
177 |
|
%% Helpers |
178 |
|
%%------------------------------------------------------------------- |
179 |
|
|
180 |
|
%% @doc Hook registrations for the given host type. Every handler uses an
%% empty extra map and priority 50.
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
    Handlers = [{mam_muc_archive_message, fun ?MODULE:archive_message/3},
                {mam_muc_lookup_messages, fun ?MODULE:lookup_messages/3},
                {mam_muc_archive_size, fun ?MODULE:archive_size/3},
                {mam_muc_remove_archive, fun ?MODULE:remove_archive/3},
                {get_mam_muc_gdpr_data, fun ?MODULE:get_mam_muc_gdpr_data/3}],
    [{HookName, HostType, Handler, #{}, 50} || {HookName, Handler} <- Handlers].
187 |
|
|
188 |
|
%% @doc ElasticSearch document id of an archived message: the room's bare
%% JID and the decimal MAM id, separated by `$'.
-spec make_document_id(binary(), mod_mam:message_id()) -> binary().
make_document_id(Room, MessageId) ->
    IdBin = integer_to_binary(MessageId),
    <<Room/binary, "$", IdBin/binary>>.
191 |
|
|
192 |
|
%% @doc Builds the ElasticSearch document for an archived MUC message.
%%
%% `message' holds the serialized stanza, which is parsed back when the
%% message is read. `body' holds the CDATA of its <body/> child, which is
%% the field used by full-text search.
%%
%% NOTE(review): exml_query:path/2 presumably returns `undefined' when there
%% is no <body/>. Confirm how the JSON encoder serializes that atom, because
%% a literal "undefined" string would be matched by text searches.
-spec make_document(mod_mam:message_id(), binary(), binary(), exml:element(),
                    binary()) -> map().
make_document(MessageId, Room, SourceBinJid, Packet, FromJID) ->
    Body = exml_query:path(Packet, [{element, <<"body">>}, cdata]),
    Serialized = exml:to_binary(Packet),
    #{mam_id => MessageId,
      room => Room,
      from_jid => FromJID,
      source_jid => SourceBinJid,
      message => Serialized,
      body => Body}.
202 |
|
|
203 |
|
%% @doc Builds a boolean ElasticSearch query from MAM lookup params. The
%% full-text part goes into `must', and the room/JID/range/id constraints
%% go into `filter'.
-spec build_search_query(map()) -> mongoose_elasticsearch:query().
build_search_query(Params) ->
    BoolQuery = #{must => build_text_search_query(Params),
                  filter => build_filters(Params)},
    #{query => #{bool => BoolQuery}}.
211 |
|
|
212 |
|
%% @doc Concatenates the filter clauses contributed by each filter builder,
%% in a fixed order: room, `with' JID, MAM id range, specific message id.
-spec build_filters(map()) -> [map()].
build_filters(Params) ->
    lists:append([room_filter(Params),
                  with_jid_filter(Params),
                  range_filter(Params),
                  specific_message_filter(Params)]).
219 |
|
|
220 |
|
%% @doc Restricts the query to documents of the room given as `owner_jid',
%% compared by bare JID. `owner_jid' is required.
-spec room_filter(map()) -> [map()].
room_filter(#{owner_jid := RoomJid}) ->
    [#{term => #{room => mod_mam_utils:bare_jid(RoomJid)}}].
224 |
|
|
225 |
|
%% @doc When a `with_jid' JID record is present, matches it (as a full JID)
%% against the stored `source_jid'. Otherwise no filter is added.
-spec with_jid_filter(map()) -> [map()].
with_jid_filter(#{with_jid := #jid{} = With}) ->
    FullWith = mod_mam_utils:full_jid(With),
    [#{term => #{source_jid => FullWith}}];
with_jid_filter(_NoWithJid) ->
    [].
230 |
|
|
231 |
|
%% @doc Translates time/border/RSM constraints into a `range' filter on
%% `mam_id'. The id borders are computed by
%% mod_mam_utils:calculate_msg_id_borders/4. No filter is emitted when both
%% borders are undefined, or when any of the four input keys is missing.
-spec range_filter(map()) -> [map()].
range_filter(#{end_ts := End, start_ts := Start, borders := Borders, rsm := RSM}) ->
    {StartId, EndId} = mod_mam_utils:calculate_msg_id_borders(RSM, Borders, Start, End),
    case maybe_add_start_filter(StartId, maybe_add_end_filter(EndId, #{})) of
        Range when map_size(Range) =:= 0 ->
            [];
        Range ->
            [#{range => #{mam_id => Range}}]
    end;
range_filter(_Incomplete) ->
    [].
245 |
|
|
246 |
|
%% @doc Pins the query to a single MAM id when `message_id' is an integer.
%% Otherwise it contributes no filter.
-spec specific_message_filter(map()) -> [map()].
specific_message_filter(#{message_id := MamId}) when is_integer(MamId) ->
    [#{term => #{mam_id => MamId}}];
specific_message_filter(_NoSpecificId) ->
    [].
251 |
|
|
252 |
|
%% @doc Adds an inclusive upper bound to a `range' query body, unless the
%% bound is `undefined'.
%%
%% ElasticSearch's documented range parameters are gt/gte/lt/lte. The
%% previously used `le' is not one of them (older releases accepted it only
%% as a deprecated alias of `lte'), so the canonical `lte' is emitted.
-spec maybe_add_end_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_end_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_end_filter(Value, RangeMap) ->
    RangeMap#{lte => Value}.
257 |
|
|
258 |
|
%% @doc Adds an inclusive lower bound to a `range' query body, unless the
%% bound is `undefined'.
%%
%% ElasticSearch's documented range parameters are gt/gte/lt/lte. The
%% previously used `ge' is not one of them (older releases accepted it only
%% as a deprecated alias of `gte'), so the canonical `gte' is emitted.
-spec maybe_add_start_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_start_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_start_filter(Value, RangeMap) ->
    RangeMap#{gte => Value}.
263 |
|
|
264 |
|
%% @doc Full-text part of the lookup query. A binary `search_text' becomes a
%% `simple_query_string' over the `body' field in which all terms must
%% match. Without it, every document matches.
-spec build_text_search_query(map()) -> map().
build_text_search_query(#{search_text := Text}) when is_binary(Text) ->
    Options = #{query => Text,
                fields => [<<"body">>],
                default_operator => <<"and">>},
    #{simple_query_string => Options};
build_text_search_query(_NoTextSearch) ->
    #{match_all => #{}}.
271 |
|
|
272 |
|
%% @doc Sort order on `mam_id' for the search.
%%
%% Backward RSM paging (`before') sorts descending, so the `size' limit keeps
%% the messages closest to the RSM anchor. Every other query sorts ascending.
%% search_result_to_mam_lookup_result/2 re-sorts the hits anyway.
-spec determine_sorting(map()) -> asc | desc.
determine_sorting(#{rsm := #rsm_in{direction = before}}) -> desc;
determine_sorting(_Params) -> asc.
277 |
|
|
278 |
|
%% @doc Maps an integer RSM `index' onto the ElasticSearch `from' offset.
%% Queries without one are returned untouched.
-spec maybe_add_from_constraint(mongoose_elasticsearch:query(), map()) ->
    mongoose_elasticsearch:query().
maybe_add_from_constraint(Query, #{rsm := #rsm_in{index = Index}}) when is_integer(Index) ->
    Query#{from => Index};
maybe_add_from_constraint(Query, _NoIndex) ->
    Query.
284 |
|
|
285 |
|
%% @doc Converts an ElasticSearch search response into a MAM lookup result
%% `{TotalCount, Offset, Messages}'.
%%
%% Messages are returned in ascending MAM id order. lists:sort/1 on the
%% message maps achieves this because maps of equal size compare by keys and
%% then by values in key order, and `id' is the smallest key. Simple queries
%% (`is_simple' = true) skip counting and report `undefined' for total and
%% offset.
-spec search_result_to_mam_lookup_result(map(), map()) -> mod_mam:lookup_result().
search_result_to_mam_lookup_result(Result, Params) ->
    #{<<"hits">> := #{<<"hits">> := Hits, <<"total">> := RawTotal}} = Result,
    Messages = lists:sort([hit_to_mam_message(Hit) || Hit <- Hits]),
    case maps:get(is_simple, Params) of
        true ->
            {undefined, undefined, Messages};
        _ ->
            Total = corrected_total_count(RawTotal, Params),
            PageLength = length(Messages),
            Offset = calculate_offset(RawTotal, PageLength, Params),
            {Total, Offset, Messages}
    end.
303 |
|
|
304 |
|
%% @doc Turns one search hit into a MAM message row. The stored stanza is
%% parsed back into XML, and the stored `source_jid' becomes a JID.
-spec hit_to_mam_message(map()) -> mod_mam:message_row().
hit_to_mam_message(#{<<"_source">> := Doc}) ->
    Id = maps:get(<<"mam_id">>, Doc),
    Xml = maps:get(<<"message">>, Doc),
    SenderBin = maps:get(<<"source_jid">>, Doc),
    {ok, Stanza} = exml:parse(Xml),
    #{id => Id, jid => jid:from_binary(SenderBin), packet => Stanza}.
311 |
|
|
312 |
|
%% @doc Turns one search hit into a GDPR export entry
%% `{MamIdAsDecimalBinary, StoredMessage}'. The stored message is returned
%% as-is, without parsing.
-spec hit_to_gdpr_mam_message(map()) -> {binary(), term()}.
hit_to_gdpr_mam_message(#{<<"_source">> := Doc}) ->
    Id = maps:get(<<"mam_id">>, Doc),
    Stored = maps:get(<<"message">>, Doc),
    {integer_to_binary(Id), Stored}.
316 |
|
|
317 |
|
%% Usage of RSM affects the `"total"' value returned by ElasticSearch, because
%% the RSM id narrows the MAM id range of the search. Per the RSM spec
%% (XEP-0059), the count returned by a query should be the size of the whole
%% result set, which in the case of MAM is bounded only by the MAM filters.
%%
%% When the lookup is anchored on an RSM id, the archive size is therefore
%% recomputed with the `rsm' key reset to `undefined'. Otherwise the
%% ElasticSearch total is returned as-is.
%%
%% There is an obvious race condition: a message may be archived between the
%% initial search and the count request issued here.
%%
%% The second argument is the MAM lookup params map, not an ElasticSearch
%% query (the previous spec said the latter).
-spec corrected_total_count(non_neg_integer(), map()) -> non_neg_integer().
corrected_total_count(_, #{rsm := #rsm_in{id = Id}} = Params) when is_integer(Id) ->
    Query = build_search_query(Params#{rsm := undefined}),
    archive_size(Query);
corrected_total_count(Count, _) ->
    Count.
330 |
|
|
331 |
|
%% @doc RSM offset (index of the first returned message within the whole
%% result set). Clause order matters:
%%  - an integer RSM `index' with no direction is used directly;
%%  - for `before' paging, `TotalCount' is the raw ElasticSearch total for
%%    the RSM-narrowed query (presumably everything before the anchor), so
%%    the page starts `Count' messages before that end;
%%  - for `aft' paging with an id, the RSM constraint is dropped, the `to_id'
%%    border is set to Id + 1, and the matching messages are counted, i.e.
%%    the messages up to the anchor.
%%    NOTE(review): assumes `to_id' is an inclusive upper border in
%%    mod_mam_utils:calculate_msg_id_borders/4. Confirm why Id + 1 rather
%%    than Id is used;
%%  - otherwise the offset is 0.
-spec calculate_offset(non_neg_integer(), non_neg_integer(), map()) -> non_neg_integer().
calculate_offset(_TotalCount, _Count, #{rsm := #rsm_in{direction = undefined, index = Index}})
  when is_integer(Index) ->
    Index;
calculate_offset(TotalCount, Count, #{rsm := #rsm_in{direction = before}}) ->
    TotalCount - Count;
calculate_offset(_TotalCount, _Count, #{rsm := #rsm_in{direction = aft, id = Id}} = Params)
  when is_integer(Id) ->
    ParamsUpToAnchor = update_borders(Params#{rsm := undefined}, Id + 1),
    archive_size(build_search_query(ParamsUpToAnchor));
calculate_offset(_TotalCount, _Count, _Params) ->
    0.
343 |
|
|
344 |
|
%% @doc Returns the lookup params with the `to_id' of their MAM borders set
%% to `EndId'. The `borders' key must already be present.
-spec update_borders(map(), non_neg_integer()) -> map().
update_borders(#{borders := OldBorders} = Params, EndId) ->
    NewBorders = update_borders_to_id(OldBorders, EndId),
    Params#{borders := NewBorders}.
347 |
|
|
348 |
|
%% @doc Sets `to_id' on a `#mam_borders{}' record. When there are no borders
%% yet, it creates a record with only `to_id' set.
-spec update_borders_to_id(#mam_borders{} | undefined, non_neg_integer()) -> #mam_borders{}.
update_borders_to_id(undefined, ToId) ->
    #mam_borders{to_id = ToId};
update_borders_to_id(Existing, ToId) ->
    Existing#mam_borders{to_id = ToId}.
353 |
|
|
354 |
|
%% @doc Counts the documents matching an ElasticSearch query in the MUC
%% index. Errors are logged together with the query, and 0 is returned.
-spec archive_size(mongoose_elasticsearch:query()) -> non_neg_integer().
archive_size(Query) ->
    case mongoose_elasticsearch:count(?INDEX_NAME, ?TYPE_NAME, Query) of
        {error, Reason} ->
            ?LOG_ERROR(#{what => archive_size_failed, es_query => Query, reason => Reason}),
            0;
        {ok, Count} ->
            Count
    end.