1 |
|
%%------------------------------------------------------------------ |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%% |
16 |
|
%% @doc ElasticSearch backend for Message Archive Management for |
17 |
|
%% group messages. |
18 |
|
%%------------------------------------------------------------------ |
19 |
|
-module(mod_mam_muc_elasticsearch_arch). |
20 |
|
|
21 |
|
-behaviour(gen_mod). |
22 |
|
-behaviour(ejabberd_gen_mam_archive). |
23 |
|
|
24 |
|
%% gen_mod callbacks |
25 |
|
-export([start/2]). |
26 |
|
-export([stop/1]). |
27 |
|
|
28 |
|
%% ejabberd_gen_mam_archive callbacks |
29 |
|
-export([archive_message/3]). |
30 |
|
-export([lookup_messages/3]). |
31 |
|
-export([remove_archive/4]). |
32 |
|
-export([archive_size/4]). |
33 |
|
|
34 |
|
%gdpr |
35 |
|
-export([get_mam_muc_gdpr_data/3]). |
36 |
|
|
37 |
|
-ignore_xref([remove_archive/4]). |
38 |
|
|
39 |
|
-include("mongoose.hrl"). |
40 |
|
-include("mongoose_rsm.hrl"). |
41 |
|
-include("mod_mam.hrl"). |
42 |
|
-include("jlib.hrl"). |
43 |
|
|
44 |
|
-define(INDEX_NAME, <<"muc_messages">>). |
45 |
|
-define(TYPE_NAME, <<"muc">>). |
46 |
|
|
47 |
|
%%------------------------------------------------------------------- |
48 |
|
%% gen_mod callbacks |
49 |
|
%%------------------------------------------------------------------- |
50 |
|
|
51 |
|
%% gen_mod callback.
%% Registers the hook handlers listed by hooks/1 for HostType through
%% ejabberd_hooks:add/1 and returns ok. The module options are ignored.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(HostType, _Opts) ->
    Handlers = hooks(HostType),
    ejabberd_hooks:add(Handlers),
    ok.
55 |
|
|
56 |
|
%% gen_mod callback.
%% Unregisters the hook handlers listed by hooks/1 for HostType through
%% ejabberd_hooks:delete/1 and returns ok.
-spec stop(mongooseim:host_type()) -> ok.
stop(HostType) ->
    Handlers = hooks(HostType),
    ejabberd_hooks:delete(Handlers),
    ok.
60 |
|
|
61 |
|
%%------------------------------------------------------------------- |
62 |
|
%% ejabberd_gen_mam_archive callbacks |
63 |
|
%%------------------------------------------------------------------- |
64 |
|
%% Hook handler collecting the archived MUC messages sent by Source.
%%
%% Searches the MUC index for documents whose from_jid equals the bare JID of
%% Source, sorted by ascending mam_id, converts every hit into a
%% {MessageIdBinary, RawPacket} pair and prepends those pairs to Acc.
%% If the search fails, the failure is logged and Acc is returned unchanged,
%% so the data export stays best-effort instead of crashing.
%%
%% NOTE(review): the query sets no size, so the number of hits returned is
%% whatever ElasticSearch uses as its default page size - confirm that this is
%% intended for a full data export.
-spec get_mam_muc_gdpr_data(ejabberd_gen_mam_archive:mam_muc_gdpr_data(),
                            mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_muc_gdpr_data().
get_mam_muc_gdpr_data(Acc, _HostType, Source) ->
    BinSource = mod_mam_utils:bare_jid(Source),
    Filter = #{term => #{from_jid => BinSource}},
    Sorting = #{mam_id => #{order => asc}},
    SearchQuery = #{query => #{bool => #{filter => Filter}},
                    sort => Sorting},
    case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, SearchQuery) of
        {ok, #{<<"hits">> := #{<<"hits">> := Hits}}} ->
            Messages = lists:map(fun hit_to_gdpr_mam_message/1, Hits),
            Messages ++ Acc;
        {error, Reason} ->
            %% Log like the other ElasticSearch calls in this module do, rather
            %% than dropping the error without a trace.
            ?LOG_ERROR(#{what => get_mam_muc_gdpr_data_failed,
                         source => BinSource, reason => Reason}),
            Acc
    end.
80 |
|
|
81 |
|
%% Hook handler storing one MUC message as an ElasticSearch document.
%%
%% The document id is derived from the bare room JID and the message id (see
%% make_document_id/2). Returns ok when the insert succeeds. On
%% {error, Reason} the failure is logged together with the hook params, the
%% modMamDropped metric of Host is increased by 1 and the error tuple is
%% returned as is.
archive_message(_Result, Host, #{message_id := MessageId,
                                 local_jid := RoomJid,
                                 remote_jid := FromJID,
                                 source_jid := SourceJid,
                                 packet := Packet} = Params) ->
    BinRoom = mod_mam_utils:bare_jid(RoomJid),
    BinSource = mod_mam_utils:full_jid(SourceJid),
    BinFrom = mod_mam_utils:bare_jid(FromJID),
    DocumentId = make_document_id(BinRoom, MessageId),
    Document = make_document(MessageId, BinRoom, BinSource, Packet, BinFrom),
    InsertResult = mongoose_elasticsearch:insert_document(?INDEX_NAME, ?TYPE_NAME,
                                                          DocumentId, Document),
    case InsertResult of
        {error, Reason} = Err ->
            ?LOG_ERROR(maps:merge(Params,
                                  #{what => archive_muc_message_failed, reason => Reason,
                                    server => Host, room => BinRoom, source => BinSource,
                                    message_id => MessageId})),
            mongoose_metrics:update(Host, modMamDropped, 1),
            Err;
        {ok, _} ->
            ok
    end.
102 |
|
|
103 |
|
%% Hook handler looking up archived MUC messages.
%%
%% Requests that page before or after a specific message (RSM direction
%% before/aft with an id other than undefined) go through
%% lookup_message_page/4; every other request is a plain lookup done by
%% do_lookup_messages/3.
lookup_messages(Result, Host,
                #{rsm := #rsm_in{direction = Direction, id = ID} = RSM} = Params)
  when ID =/= undefined, (Direction =:= before orelse Direction =:= aft) ->
    lookup_message_page(Result, Host, RSM, Params);
lookup_messages(Result, Host, Params) ->
    do_lookup_messages(Result, Host, Params).
111 |
|
|
112 |
|
%% Looks up a page relative to an RSM message id.
%%
%% The lookup is run with page_size increased by one and the outcome is handed
%% to mod_mam_utils:check_for_item_not_found/3 together with the originally
%% requested page size. Presumably the extra row lets that helper tell whether
%% the referenced message exists - TODO confirm against mod_mam_utils.
%% Lookup errors are returned unchanged.
lookup_message_page(AccResult, Host, RSM, Params) ->
    RequestedSize = maps:get(page_size, Params),
    WidenedParams = Params#{page_size := RequestedSize + 1},
    case do_lookup_messages(AccResult, Host, WidenedParams) of
        {ok, LookupResult} ->
            mod_mam_utils:check_for_item_not_found(RSM, RequestedSize, LookupResult);
        {error, _} = Err ->
            Err
    end.
119 |
|
|
120 |
|
%% Runs the actual message search in ElasticSearch.
%%
%% Builds the query from Params, sorts by mam_id in the order picked by
%% determine_sorting/1, limits the result to page_size hits and, when the RSM
%% carries an integer index, adds it as the from offset. Returns
%% {ok, LookupResult} on success; on failure the error is logged together with
%% Params and returned unchanged.
do_lookup_messages(_Result, Host, Params) ->
    BaseQuery = build_search_query(Params),
    SortOrder = determine_sorting(Params),
    Limit = maps:get(page_size, Params),
    SortedQuery = BaseQuery#{sort => [#{mam_id => #{order => SortOrder}}],
                             size => Limit},
    FinalQuery = maybe_add_from_constraint(SortedQuery, Params),
    case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, FinalQuery) of
        {error, Reason} = Err ->
            ?LOG_ERROR(maps:merge(Params,
                                  #{what => lookup_muc_messages_failed,
                                    server => Host, reason => Reason})),
            Err;
        {ok, Result} ->
            {ok, search_result_to_mam_lookup_result(Result, Params)}
    end.
136 |
|
|
137 |
|
%% Hook handler returning the number of archived messages of room RoomJid.
%% The incoming size, the host and the archive id are ignored; the count comes
%% from archive_size/1 run on a query filtered by the room only.
-spec archive_size(Size :: integer(),
                   Host :: jid:server(),
                   _ArchiveId,
                   RoomJid :: jid:jid()) -> non_neg_integer().
archive_size(_Size, _Host, _ArchiveId, RoomJid) ->
    archive_size(build_search_query(#{owner_jid => RoomJid})).
144 |
|
|
145 |
|
%% Hook handler deleting every archived message of room RoomJid.
%% Deletion is best-effort: a failed delete-by-query is logged and the
%% accumulator is returned unchanged either way.
-spec remove_archive(Acc :: mongoose_acc:t(),
                     Host :: jid:server(),
                     ArchiveId :: mod_mam:archive_id(),
                     RoomJid :: jid:jid()) -> Acc when Acc :: map().
remove_archive(Acc, Host, _ArchiveId, RoomJid) ->
    RoomQuery = build_search_query(#{owner_jid => RoomJid}),
    Outcome = mongoose_elasticsearch:delete_by_query(?INDEX_NAME, ?TYPE_NAME, RoomQuery),
    case Outcome of
        {error, Reason} ->
            ?LOG_ERROR(#{what => remove_muc_archive_failed,
                         server => Host, room_jid => RoomJid, reason => Reason}),
            ok;
        ok ->
            ok
    end,
    Acc.
160 |
|
|
161 |
|
%%------------------------------------------------------------------- |
162 |
|
%% Helpers |
163 |
|
%%------------------------------------------------------------------- |
164 |
|
|
165 |
|
%% Lists the hook registrations of this module for Host.
%% Every entry is {HookName, Host, ?MODULE, HandlerFunction, 50}.
-spec hooks(jid:lserver()) -> [ejabberd_hooks:hook()].
hooks(Host) ->
    HookHandlers = [{mam_muc_archive_message, archive_message},
                    {mam_muc_lookup_messages, lookup_messages},
                    {mam_muc_archive_size, archive_size},
                    {mam_muc_remove_archive, remove_archive},
                    {get_mam_muc_gdpr_data, get_mam_muc_gdpr_data}],
    [{HookName, Host, ?MODULE, Handler, 50} || {HookName, Handler} <- HookHandlers].
172 |
|
|
173 |
|
%% Builds the ElasticSearch document id: the room binary, a "$" separator and
%% the decimal representation of the message id.
-spec make_document_id(binary(), mod_mam:message_id()) -> binary().
make_document_id(Room, MessageId) ->
    IdSuffix = integer_to_binary(MessageId),
    <<Room/binary, "$", IdSuffix/binary>>.
176 |
|
|
177 |
|
%% Builds the document stored for one archived message.
%% message holds the serialized stanza; body holds whatever
%% exml_query:path/2 returns for the cdata of the stanza's body child element.
-spec make_document(mod_mam:message_id(), binary(), binary(), exml:element(),
                    binary()) -> map().
make_document(MessageId, Room, SourceBinJid, Packet, FromJID) ->
    RawStanza = exml:to_binary(Packet),
    BodyText = exml_query:path(Packet, [{element, <<"body">>}, cdata]),
    #{mam_id => MessageId,
      from_jid => FromJID,
      room => Room,
      source_jid => SourceBinJid,
      message => RawStanza,
      body => BodyText}.
187 |
|
|
188 |
|
%% Builds the bool query used for searching, counting and deleting: the text
%% search (or match_all) goes into must, the filters derived from Params go
%% into filter.
-spec build_search_query(map()) -> mongoose_elasticsearch:query().
build_search_query(Params) ->
    FilterClauses = build_filters(Params),
    MustClause = build_text_search_query(Params),
    BoolQuery = #{must => MustClause,
                  filter => FilterClauses},
    #{query => #{bool => BoolQuery}}.
196 |
|
|
197 |
|
%% Concatenates the filter clauses produced by the room, with-jid and range
%% filter builders, in that order.
-spec build_filters(map()) -> [map()].
build_filters(Params) ->
    lists:append([room_filter(Params),
                  with_jid_filter(Params),
                  range_filter(Params)]).
203 |
|
|
204 |
|
%% Term filter restricting results to the room given under owner_jid.
%% Params without an owner_jid key are not accepted (no fallback clause).
-spec room_filter(map()) -> [map()].
room_filter(#{owner_jid := Room}) ->
    [#{term => #{room => mod_mam_utils:bare_jid(Room)}}].
208 |
|
|
209 |
|
%% Term filter on source_jid when Params carries a #jid{} under with_jid;
%% otherwise no filter at all.
-spec with_jid_filter(map()) -> [map()].
with_jid_filter(#{with_jid := #jid{} = WithJid}) ->
    BinWithJid = mod_mam_utils:full_jid(WithJid),
    [#{term => #{source_jid => BinWithJid}}];
with_jid_filter(_) ->
    [].
214 |
|
|
215 |
|
%% Range filter on mam_id.
%% The message id borders come from mod_mam_utils:calculate_msg_id_borders/4.
%% A range clause is emitted only when at least one border is defined.
%% Params lacking any of end_ts, start_ts, borders or rsm yield no filter.
-spec range_filter(map()) -> [map()].
range_filter(#{end_ts := End, start_ts := Start, borders := Borders, rsm := RSM}) ->
    {StartId, EndId} = mod_mam_utils:calculate_msg_id_borders(RSM, Borders, Start, End),
    Bounds = maybe_add_start_filter(StartId, maybe_add_end_filter(EndId, #{})),
    case Bounds of
        Empty when map_size(Empty) =:= 0 ->
            [];
        _ ->
            [#{range => #{mam_id => Bounds}}]
    end;
range_filter(_) ->
    [].
229 |
|
|
230 |
|
%% Adds the inclusive upper bound to a range clause.
%% undefined means "no upper bound" and leaves RangeMap untouched; any other
%% value is stored under lte, the documented ElasticSearch range-query
%% parameter for "less than or equal to".
-spec maybe_add_end_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_end_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_end_filter(Value, RangeMap) ->
    RangeMap#{lte => Value}.
235 |
|
|
236 |
|
%% Adds the inclusive lower bound to a range clause.
%% undefined means "no lower bound" and leaves RangeMap untouched; any other
%% value is stored under gte, the documented ElasticSearch range-query
%% parameter for "greater than or equal to".
-spec maybe_add_start_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_start_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_start_filter(Value, RangeMap) ->
    RangeMap#{gte => Value}.
241 |
|
|
242 |
|
%% Builds the must part of the query.
%% A binary search_text becomes a simple_query_string over the body field with
%% "and" as default operator; anything else matches all documents.
-spec build_text_search_query(map()) -> map().
build_text_search_query(Params) ->
    case Params of
        #{search_text := Text} when is_binary(Text) ->
            TextQuery = #{query => Text,
                          fields => [<<"body">>],
                          default_operator => <<"and">>},
            #{simple_query_string => TextQuery};
        _ ->
            #{match_all => #{}}
    end.
249 |
|
|
250 |
|
%% Picks the sort order on mam_id: descending when paging backwards (RSM
%% direction before), ascending in every other case.
-spec determine_sorting(map()) -> asc | desc.
determine_sorting(Params) ->
    case Params of
        #{rsm := #rsm_in{direction = before}} -> desc;
        _ -> asc
    end.
255 |
|
|
256 |
|
%% Adds the from offset to the query when the RSM carries an integer index;
%% otherwise the query is returned unchanged.
-spec maybe_add_from_constraint(mongoose_elasticsearch:query(), map()) ->
    mongoose_elasticsearch:query().
maybe_add_from_constraint(Query, Params) ->
    case Params of
        #{rsm := #rsm_in{index = Offset}} when is_integer(Offset) ->
            Query#{from => Offset};
        _ ->
            Query
    end.
262 |
|
|
263 |
|
%% Converts a raw ElasticSearch search result into a MAM lookup result
%% {TotalCount, Offset, Messages}.
%%
%% The hits are converted with hit_to_mam_message/1 and sorted. For simple
%% lookups (is_simple set to true) the total count and the offset are both
%% undefined; otherwise they are computed by corrected_total_count/2 and
%% calculate_offset/3.
%%
%% NOTE(review): hits.total is used as a number further down - confirm that
%% the ElasticSearch version in use returns it as a bare integer.
-spec search_result_to_mam_lookup_result(map(), map()) -> mod_mam:lookup_result().
search_result_to_mam_lookup_result(Result, Params) ->
    #{<<"hits">> := #{<<"hits">> := Hits, <<"total">> := RawTotal}} = Result,
    Messages = lists:sort([hit_to_mam_message(Hit) || Hit <- Hits]),
    case maps:get(is_simple, Params) of
        true ->
            {undefined, undefined, Messages};
        _ ->
            Total = corrected_total_count(RawTotal, Params),
            PageLength = length(Messages),
            Offset = calculate_offset(RawTotal, PageLength, Params),
            {Total, Offset, Messages}
    end.
281 |
|
|
282 |
|
%% Converts one search hit into a MAM message row: the stored mam_id, the
%% source JID parsed back from its binary form and the stanza parsed from the
%% stored message. A stanza that fails to parse crashes with badmatch.
-spec hit_to_mam_message(map()) -> mod_mam:message_row().
hit_to_mam_message(#{<<"_source">> := Source}) ->
    Id = maps:get(<<"mam_id">>, Source),
    RawStanza = maps:get(<<"message">>, Source),
    BinSourceJid = maps:get(<<"source_jid">>, Source),
    {ok, Stanza} = exml:parse(RawStanza),
    #{id => Id,
      jid => jid:from_binary(BinSourceJid),
      packet => Stanza}.
289 |
|
|
290 |
|
%% Converts one search hit into the pair used for the GDPR export: the
%% message id as a decimal binary and the stored message exactly as it was
%% archived (not parsed).
hit_to_gdpr_mam_message(#{<<"_source">> := Source}) ->
    Id = maps:get(<<"mam_id">>, Source),
    RawStanza = maps:get(<<"message">>, Source),
    {integer_to_binary(Id), RawStanza}.
294 |
|
|
295 |
|
%% Using RSM affects the "total" value returned by ElasticSearch. Per the RSM
%% spec, the count returned by the query should represent the size of the whole
%% result set, which in the case of MAM is bound only by the MAM filters.
%% The solution is to compute the archive size as if RSM was not used: when the
%% RSM carries an integer id, the query is rebuilt from the same params with
%% rsm set to undefined and counted separately. In every other case the count
%% reported by the search is used as is.
%% There is an obvious race condition here, because a user may send a message
%% between the initial request to ElasticSearch and the count request issued
%% here.
%%
%% The second argument is the lookup params map, not an ElasticSearch query.
-spec corrected_total_count(non_neg_integer(), map()) -> non_neg_integer().
corrected_total_count(_, #{rsm := #rsm_in{id = Id}} = Params) when is_integer(Id) ->
    Query = build_search_query(Params#{rsm := undefined}),
    archive_size(Query);
corrected_total_count(Count, _) ->
    Count.
308 |
|
|
309 |
|
%% Computes the index of the first returned message within the whole result
%% set.
%%  - index-based paging (no direction, integer index): the index itself;
%%  - paging backwards: the search total minus the number of returned rows;
%%  - paging forwards from an integer id: a separate count query, see below;
%%  - anything else: 0.
-spec calculate_offset(non_neg_integer(), non_neg_integer(), map()) -> non_neg_integer().
calculate_offset(_, _, #{rsm := #rsm_in{direction = undefined, index = Index}})
  when is_integer(Index) ->
    Index;
calculate_offset(TotalCount, Count, #{rsm := #rsm_in{direction = before}}) ->
    TotalCount - Count;
calculate_offset(_, _, #{rsm := #rsm_in{direction = aft, id = Id}} = Params)
  when is_integer(Id) ->
    %% Counts the messages matching the same filters with RSM removed and the
    %% to_id border set to Id + 1. Presumably that covers every message up to
    %% and including Id, i.e. the position of the first message after it -
    %% TODO confirm against mod_mam_utils:calculate_msg_id_borders/4.
    BoundedParams = update_borders(Params#{rsm := undefined}, Id + 1),
    archive_size(build_search_query(BoundedParams));
calculate_offset(_, _, _) ->
    0.
321 |
|
|
322 |
|
%% Replaces the borders entry of Params with one whose to_id is EndId.
%% Params must already contain a borders key.
-spec update_borders(map(), non_neg_integer()) -> map().
update_borders(#{borders := Borders} = Params, EndId) ->
    NewBorders = update_borders_to_id(Borders, EndId),
    Params#{borders := NewBorders}.
325 |
|
|
326 |
|
%% Sets to_id on the given borders record; undefined borders are treated as a
%% record with default fields.
-spec update_borders_to_id(#mam_borders{} | undefined, non_neg_integer()) -> #mam_borders{}.
update_borders_to_id(Borders, EndId) ->
    Base = case Borders of
               undefined -> #mam_borders{};
               _ -> Borders
           end,
    Base#mam_borders{to_id = EndId}.
331 |
|
|
332 |
|
%% Counts the documents matching Query in the MUC index.
%% A failed count request is logged and reported as 0, so callers always get a
%% number back.
-spec archive_size(mongoose_elasticsearch:query()) -> non_neg_integer().
archive_size(Query) ->
    case mongoose_elasticsearch:count(?INDEX_NAME, ?TYPE_NAME, Query) of
        {ok, Count} ->
            Count;
        {error, Reason} ->
            ?LOG_ERROR(#{what => archive_size_failed, es_query => Query, reason => Reason}),
            0
    end.