1 |
|
%%------------------------------------------------------------------ |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%% |
16 |
|
%% @doc ElasticSearch backend for Message Archive Management for |
17 |
|
%% one-to-one messages. |
18 |
|
%%%------------------------------------------------------------------ |
19 |
|
-module(mod_mam_elasticsearch_arch). |
20 |
|
|
21 |
|
-behaviour(gen_mod). |
22 |
|
-behaviour(ejabberd_gen_mam_archive). |
23 |
|
-behaviour(mongoose_module_metrics). |
24 |
|
|
25 |
|
%% gen_mod callbacks |
26 |
|
-export([start/2]). |
27 |
|
-export([stop/1]). |
28 |
|
|
29 |
|
%% ejabberd_gen_mam_archive callbacks |
30 |
|
-export([archive_message/3]). |
31 |
|
-export([lookup_messages/3]). |
32 |
|
-export([remove_archive/4]). |
33 |
|
-export([archive_size/4]). |
34 |
|
|
35 |
|
-export([get_mam_pm_gdpr_data/3]). |
36 |
|
|
37 |
|
-ignore_xref([remove_archive/4]). |
38 |
|
|
39 |
|
-include("mongoose.hrl"). |
40 |
|
-include("mongoose_rsm.hrl"). |
41 |
|
-include("mod_mam.hrl"). |
42 |
|
-include("jlib.hrl"). |
43 |
|
|
44 |
|
-define(INDEX_NAME, <<"messages">>). |
45 |
|
-define(TYPE_NAME, <<"pm">>). |
46 |
|
|
47 |
|
%%------------------------------------------------------------------- |
48 |
|
%% gen_mod callbacks |
49 |
|
%%------------------------------------------------------------------- |
50 |
|
|
51 |
|
%% @doc gen_mod callback: registers all hooks listed by hooks/1 for `Host'.
%% The module options are not used.
-spec start(jid:server(), list()) -> ok.
start(Host, _Opts) ->
    HostHooks = hooks(Host),
    ejabberd_hooks:add(HostHooks),
    ok.
55 |
|
|
56 |
|
%% @doc gen_mod callback: unregisters the hooks that start/2 added for `Host'.
-spec stop(jid:server()) -> ok.
stop(Host) ->
    HostHooks = hooks(Host),
    ejabberd_hooks:delete(HostHooks),
    ok.
60 |
|
|
61 |
|
%% @doc Hook handler collecting the archived one-to-one messages of `Owner'.
%% Searches for documents whose `owner' field equals the bare JID of `Owner',
%% ordered by ascending `mam_id', converts every hit with
%% hit_to_gdpr_mam_message/1 and prepends the converted hits to `Acc'.
%% Crashes with badmatch when the search does not return `{ok, _}'.
%% NOTE(review): the query carries no `size' field, so the number of returned
%% hits is whatever the backend applies by default -- confirm that the whole
%% archive is actually retrieved.
-spec get_mam_pm_gdpr_data(ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
                           mongooseim:host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_pm_gdpr_data().
get_mam_pm_gdpr_data(Acc, _HostType, Owner) ->
    OwnerFilter = #{term => #{owner => mod_mam_utils:bare_jid(Owner)}},
    Query = #{query => #{bool => #{filter => OwnerFilter}},
              sort => #{mam_id => #{order => asc}}},
    {ok, SearchResult} = mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, Query),
    #{<<"hits">> := #{<<"hits">> := Hits}} = SearchResult,
    lists:map(fun hit_to_gdpr_mam_message/1, Hits) ++ Acc.
73 |
|
|
74 |
|
%%------------------------------------------------------------------- |
75 |
|
%% ejabberd_gen_mam_archive callbacks |
76 |
|
%%------------------------------------------------------------------- |
77 |
|
|
78 |
|
%% Hook handler storing a single message in ElasticSearch.
%% The document id is derived from the bare owner JID and the message id.
%% Returns `ok' when the insert succeeds. On failure the error is logged,
%% the `modMamDropped' metric of `Host' is updated by 1 and the
%% `{error, Reason}' tuple is returned.
archive_message(_Result, Host, #{message_id := MessageId,
                                 local_jid := LocalJid,
                                 remote_jid := RemoteJid,
                                 source_jid := SourceJid,
                                 packet := Packet}) ->
    Owner = mod_mam_utils:bare_jid(LocalJid),
    Remote = mod_mam_utils:bare_jid(RemoteJid),
    SourceBinJid = mod_mam_utils:full_jid(SourceJid),
    DocId = make_document_id(Owner, MessageId),
    Doc = make_document(MessageId, Owner, Remote, SourceBinJid, Packet),
    InsertResult = mongoose_elasticsearch:insert_document(?INDEX_NAME, ?TYPE_NAME, DocId, Doc),
    case InsertResult of
        {error, Reason} ->
            ?LOG_ERROR(#{what => archive_message_failed,
                         user => Owner, server => Host, remote => Remote,
                         message_id => MessageId, reason => Reason}),
            mongoose_metrics:update(Host, modMamDropped, 1),
            InsertResult;
        {ok, _} ->
            ok
    end.
98 |
|
|
99 |
|
%% Hook handler for MAM lookups.
%% When the RSM request pages `before' or `aft' a concrete id, the lookup goes
%% through lookup_message_page/4, which fetches one extra message and lets
%% mod_mam_utils:check_for_item_not_found/3 inspect the result.
%% Every other request is served directly by do_lookup_messages/3.
lookup_messages(Result, Host,
                #{rsm := #rsm_in{direction = Direction, id = ID} = RSM} = Params)
  when ID =/= undefined andalso (Direction =:= before orelse Direction =:= aft) ->
    lookup_message_page(Result, Host, RSM, Params);
lookup_messages(Result, Host, Params) ->
    do_lookup_messages(Result, Host, Params).
107 |
|
|
108 |
|
%% Looks up a page relative to an RSM id.
%% The query is issued with `page_size' increased by one; a successful result
%% is handed, together with the original page size, to
%% mod_mam_utils:check_for_item_not_found/3. Errors are returned unchanged.
lookup_message_page(AccResult, Host, #rsm_in{} = RSM, Params) ->
    PageSize = maps:get(page_size, Params),
    ExtendedParams = Params#{page_size := PageSize + 1},
    case do_lookup_messages(AccResult, Host, ExtendedParams) of
        {ok, LookupResult} ->
            mod_mam_utils:check_for_item_not_found(RSM, PageSize, LookupResult);
        {error, _} = Err ->
            Err
    end.
115 |
|
|
116 |
|
%% Builds and executes the ElasticSearch query for a MAM lookup.
%% The base query comes from build_search_query/1 and is extended with the
%% sort order on `mam_id', the result limit taken from `page_size' and, when
%% applicable, the `from' offset (see maybe_add_from_constraint/2).
%% Returns `{ok, LookupResult}' or the logged `{error, Reason}' tuple.
do_lookup_messages(_Result, Host, Params) ->
    BaseQuery = build_search_query(Params),
    SortSpec = [#{mam_id => #{order => determine_sorting(Params)}}],
    Limit = maps:get(page_size, Params),
    LimitedQuery = BaseQuery#{sort => SortSpec, size => Limit},
    FinalQuery = maybe_add_from_constraint(LimitedQuery, Params),
    case mongoose_elasticsearch:search(?INDEX_NAME, ?TYPE_NAME, FinalQuery) of
        {error, Reason} = Err ->
            ?LOG_ERROR(maps:merge(Params,
                                  #{what => lookup_messages_failed,
                                    server => Host, reason => Reason})),
            Err;
        {ok, Result} ->
            {ok, search_result_to_mam_lookup_result(Result, Params)}
    end.
132 |
|
|
133 |
|
%% Hook handler returning the number of archived messages owned by
%% `OwnerJid'. The incoming size, the host and the archive id are ignored;
%% the count is obtained by archive_size/1 from a query filtered by owner only.
-spec archive_size(Size :: integer(),
                   Host :: jid:server(),
                   _ArchiveId,
                   OwnerJid :: jid:jid()) -> non_neg_integer().
archive_size(_Size, _Host, _ArchiveId, OwnerJid) ->
    archive_size(build_search_query(#{owner_jid => OwnerJid})).
140 |
|
|
141 |
|
%% Hook handler removing the whole archive of `OwnerJid'.
%% Delegates to remove_archive/2 and returns the accumulator unchanged.
-spec remove_archive(Acc :: mongoose_acc:t(),
                     Host :: jid:server(),
                     ArchiveId :: mod_mam:archive_id(),
                     OwnerJid :: jid:jid()) -> Acc when Acc :: map().
remove_archive(Acc, Host, _ArchiveId, OwnerJid) ->
    _ = remove_archive(Host, OwnerJid),
    Acc.
148 |
|
|
149 |
|
%% Deletes every document owned by `OwnerJid' using a delete-by-query request.
%% Always returns `ok'; a failed request is only logged.
remove_archive(Host, OwnerJid) ->
    OwnerQuery = build_search_query(#{owner_jid => OwnerJid}),
    DeleteResult = mongoose_elasticsearch:delete_by_query(?INDEX_NAME, ?TYPE_NAME, OwnerQuery),
    case DeleteResult of
        {error, Reason} ->
            ?LOG_ERROR(#{what => remove_archive_failed,
                         server => Host, user_jid => OwnerJid, reason => Reason}),
            ok;
        ok ->
            ok
    end.
159 |
|
|
160 |
|
%%------------------------------------------------------------------- |
161 |
|
%% Helpers |
162 |
|
%%------------------------------------------------------------------- |
163 |
|
|
164 |
|
%% Lists the hook registrations of this module for `Host'.
%% Every handler lives in this module and is registered with priority 50.
-spec hooks(jid:lserver()) -> [ejabberd_hooks:hook()].
hooks(Host) ->
    Handlers = [{mam_archive_message, archive_message},
                {mam_lookup_messages, lookup_messages},
                {mam_archive_size, archive_size},
                {mam_remove_archive, remove_archive},
                {get_mam_pm_gdpr_data, get_mam_pm_gdpr_data}],
    [{HookName, Host, ?MODULE, Function, 50} || {HookName, Function} <- Handlers].
171 |
|
|
172 |
|
%% Builds the document id: the owner, a `$' separator and the decimal
%% representation of the message id.
-spec make_document_id(binary(), mod_mam:message_id()) -> binary().
make_document_id(Owner, MessageId) ->
    BinMessageId = integer_to_binary(MessageId),
    <<Owner/binary, "$", BinMessageId/binary>>.
175 |
|
|
176 |
|
%% Builds the document stored for a single archived message.
%% `message' holds the serialised stanza, `body' the result of querying the
%% cdata of the stanza's `body' child element.
-spec make_document(mod_mam:message_id(), binary(), binary(), binary(), exml:element()) ->
    map().
make_document(MessageId, Owner, Remote, SourceBinJid, Packet) ->
    SerialisedPacket = exml:to_binary(Packet),
    Body = exml_query:path(Packet, [{element, <<"body">>}, cdata]),
    #{mam_id => MessageId,
      owner => Owner,
      remote => Remote,
      source_jid => SourceBinJid,
      message => SerialisedPacket,
      body => Body}.
186 |
|
|
187 |
|
%% Builds a bool query: the text search part (see build_text_search_query/1)
%% goes into `must', the filters (see build_filters/1) go into `filter'.
-spec build_search_query(map()) -> mongoose_elasticsearch:query().
build_search_query(Params) ->
    Filters = build_filters(Params),
    TextSearchQuery = build_text_search_query(Params),
    BoolQuery = #{must => TextSearchQuery, filter => Filters},
    #{query => #{bool => BoolQuery}}.
195 |
|
|
196 |
|
%% Concatenates the owner, remote JID and message id range filters,
%% in that order.
-spec build_filters(map()) -> [map()].
build_filters(Params) ->
    owner_filter(Params) ++ with_jid_filter(Params) ++ range_filter(Params).
202 |
|
|
203 |
|
%% Term filter on the `owner' field, using the bare JID of the archive owner.
%% The `owner_jid' key is mandatory; there is no clause for params without it.
-spec owner_filter(map()) -> [map()].
owner_filter(#{owner_jid := OwnerJid}) ->
    [#{term => #{owner => mod_mam_utils:bare_jid(OwnerJid)}}].
207 |
|
|
208 |
|
%% Term filter on the `remote' field. It is produced only when `with_jid'
%% holds a #jid{} record; otherwise no filter is added.
-spec with_jid_filter(map()) -> [map()].
with_jid_filter(#{with_jid := #jid{} = WithJid}) ->
    BareRemote = mod_mam_utils:bare_jid(WithJid),
    [#{term => #{remote => BareRemote}}];
with_jid_filter(_Params) ->
    [].
213 |
|
|
214 |
|
%% Range filter on `mam_id'.
%% The id borders are computed by mod_mam_utils:calculate_msg_id_borders/4
%% from the RSM data, the borders and the start/end timestamps. No filter is
%% produced when neither border is set, or when any of the four required keys
%% is missing from the params.
-spec range_filter(map()) -> [map()].
range_filter(#{end_ts := End, start_ts := Start, borders := Borders, rsm := RSM}) ->
    {StartId, EndId} = mod_mam_utils:calculate_msg_id_borders(RSM, Borders, Start, End),
    Range = maybe_add_start_filter(StartId, maybe_add_end_filter(EndId, #{})),
    if
        map_size(Range) =:= 0 ->
            [];
        true ->
            [#{range => #{mam_id => Range}}]
    end;
range_filter(_Params) ->
    [].
227 |
|
|
228 |
|
%% Adds the inclusive upper bound to a range filter map.
%% `undefined' means "no upper bound" and leaves the map untouched.
%% The bound is emitted under `lte', the documented ElasticSearch range
%% operator for "less than or equal to" (`le' is not among the documented
%% operators `gt', `gte', `lt' and `lte').
-spec maybe_add_end_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_end_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_end_filter(Value, RangeMap) ->
    RangeMap#{lte => Value}.
233 |
|
|
234 |
|
%% Adds the inclusive lower bound to a range filter map.
%% `undefined' means "no lower bound" and leaves the map untouched.
%% The bound is emitted under `gte', the documented ElasticSearch range
%% operator for "greater than or equal to" (`ge' is not among the documented
%% operators `gt', `gte', `lt' and `lte').
-spec maybe_add_start_filter(undefined | mod_mam:message_id(), map()) -> map().
maybe_add_start_filter(undefined, RangeMap) ->
    RangeMap;
maybe_add_start_filter(Value, RangeMap) ->
    RangeMap#{gte => Value}.
239 |
|
|
240 |
|
%% Builds the full text search part of the query.
%% A binary `search_text' yields a `simple_query_string' query on the `body'
%% field with the `and' default operator; anything else matches all documents.
-spec build_text_search_query(map()) -> map().
build_text_search_query(#{search_text := SearchText}) when is_binary(SearchText) ->
    QueryString = #{query => SearchText,
                    fields => [<<"body">>],
                    default_operator => <<"and">>},
    #{simple_query_string => QueryString};
build_text_search_query(_Params) ->
    #{match_all => #{}}.
247 |
|
|
248 |
|
%% Sort order for the `mam_id' field: descending when the RSM direction is
%% `before', ascending in every other case.
-spec determine_sorting(map()) -> asc | desc.
determine_sorting(#{rsm := #rsm_in{direction = before}}) -> desc;
determine_sorting(_Params) -> asc.
253 |
|
|
254 |
|
%% Sets the `from' field of the query to the RSM index when that index is an
%% integer; otherwise the query is returned unchanged.
-spec maybe_add_from_constraint(mongoose_elasticsearch:query(), map()) ->
    mongoose_elasticsearch:query().
maybe_add_from_constraint(Query, #{rsm := #rsm_in{index = Index}}) when is_integer(Index) ->
    maps:put(from, Index, Query);
maybe_add_from_constraint(Query, _Params) ->
    Query.
260 |
|
|
261 |
|
%% Converts a search response into a MAM lookup result.
%% The hits are converted with hit_to_mam_message/1 and sorted with
%% lists:sort/1. For simple lookups (`is_simple' is `true') the total count
%% and the offset are both `undefined'; otherwise they are computed by
%% corrected_total_count/2 and calculate_offset/3.
-spec search_result_to_mam_lookup_result(map(), map()) -> mod_mam:lookup_result().
search_result_to_mam_lookup_result(Result, Params) ->
    #{<<"hits">> := HitsSection} = Result,
    #{<<"hits">> := Hits, <<"total">> := TotalCount} = HitsSection,
    Messages = lists:sort(lists:map(fun hit_to_mam_message/1, Hits)),
    case maps:get(is_simple, Params) of
        true ->
            {undefined, undefined, Messages};
        _ ->
            CorrectedTotalCount = corrected_total_count(TotalCount, Params),
            Offset = calculate_offset(TotalCount, length(Messages), Params),
            {CorrectedTotalCount, Offset, Messages}
    end.
279 |
|
|
280 |
|
%% Converts a single search hit into a MAM message row.
%% The stored stanza is parsed back into an XML element (a parse failure
%% crashes with badmatch) and the stored source JID is converted with
%% jid:from_binary/1.
-spec hit_to_mam_message(map()) -> mod_mam:message_row().
hit_to_mam_message(#{<<"_source">> := Source}) ->
    MessageId = maps:get(<<"mam_id">>, Source),
    RawPacket = maps:get(<<"message">>, Source),
    SourceBinJid = maps:get(<<"source_jid">>, Source),
    {ok, Stanza} = exml:parse(RawPacket),
    SourceJid = jid:from_binary(SourceBinJid),
    #{id => MessageId, jid => SourceJid, packet => Stanza}.
287 |
|
|
288 |
|
%% Converts a single search hit into a GDPR export entry:
%% `{MessageId as a decimal binary, stored source JID, stored stanza}'.
%% The source JID and the stanza are passed through exactly as stored.
hit_to_gdpr_mam_message(#{<<"_source">> := Source}) ->
    MessageId = maps:get(<<"mam_id">>, Source),
    RawPacket = maps:get(<<"message">>, Source),
    SourceBinJid = maps:get(<<"source_jid">>, Source),
    BinMessageId = integer_to_binary(MessageId),
    {BinMessageId, SourceBinJid, RawPacket}.
293 |
|
|
294 |
|
%% Usage of RSM affects the `"total"' value returned by ElasticSearch. Per RSM spec, the count
%% returned by the query should represent the size of the whole result set, which in case of MAM
%% is bound only by the MAM filters.
%% The solution is to compute the archive size as if the RSM wasn't used. There is an obvious race
%% condition here, because a user may send a message between initial request to ElasticSearch and
%% the count request issued here.
%%
%% The second argument is the MAM lookup params map (not an ElasticSearch query): when its RSM
%% data carries an integer id, a new query is built from the params with `rsm' reset to
%% `undefined' and counted with archive_size/1. Otherwise the count reported by the search is
%% returned as is.
-spec corrected_total_count(non_neg_integer(), map()) -> non_neg_integer().
corrected_total_count(_, #{rsm := #rsm_in{id = Id}} = Params) when is_integer(Id) ->
    Query = build_search_query(Params#{rsm := undefined}),
    archive_size(Query);
corrected_total_count(Count, _) ->
    Count.
307 |
|
|
308 |
|
%% Computes the offset of the returned page within the whole result set.
%%  * RSM index without a direction: the offset is the requested index.
%%  * Direction `before': the total reported by the search minus the number
%%    of returned messages.
%%  * Direction `aft' with an integer id: the number of documents matched by
%%    a query built from the params with `rsm' reset to `undefined' and the
%%    `to_id' border set to `Id + 1'.
%%    NOTE(review): how that border is applied depends on
%%    mod_mam_utils:calculate_msg_id_borders/4 -- confirm the `+ 1'.
%%  * Anything else: 0.
-spec calculate_offset(non_neg_integer(), non_neg_integer(), map()) -> non_neg_integer().
calculate_offset(_, _, #{rsm := #rsm_in{direction = undefined, index = Index}})
  when is_integer(Index) ->
    Index;
calculate_offset(TotalCount, Count, #{rsm := #rsm_in{direction = before}}) ->
    TotalCount - Count;
calculate_offset(_, _, #{rsm := #rsm_in{direction = aft, id = Id}} = Params)
  when is_integer(Id) ->
    BoundedParams = update_borders(Params#{rsm := undefined}, Id + 1),
    archive_size(build_search_query(BoundedParams));
calculate_offset(_, _, _) ->
    0.
320 |
|
|
321 |
|
%% Replaces the `borders' entry of the params with a copy whose `to_id' is
%% `EndId'. The `borders' key must be present.
-spec update_borders(map(), non_neg_integer()) -> map().
update_borders(#{borders := Borders} = Params, EndId) ->
    NewBorders = update_borders_to_id(Borders, EndId),
    Params#{borders := NewBorders}.
324 |
|
|
325 |
|
%% Sets `to_id' in the given borders record. `undefined' borders are treated
%% as a record with default field values.
-spec update_borders_to_id(#mam_borders{} | undefined, non_neg_integer()) -> #mam_borders{}.
update_borders_to_id(undefined, EndId) ->
    update_borders_to_id(#mam_borders{}, EndId);
update_borders_to_id(Borders, EndId) ->
    Borders#mam_borders{to_id = EndId}.
330 |
|
|
331 |
|
%% Counts the documents matching `Query'.
%% A failed count request is logged and reported as 0, so callers always
%% receive a number.
%% The spec uses `mongoose_elasticsearch:query()', the query type referenced
%% by every other spec in this module.
-spec archive_size(mongoose_elasticsearch:query()) -> non_neg_integer().
archive_size(Query) ->
    case mongoose_elasticsearch:count(?INDEX_NAME, ?TYPE_NAME, Query) of
        {ok, Count} ->
            Count;
        {error, Reason} ->
            ?LOG_ERROR(#{what => archive_size_failed, reason => Reason, es_query => Query,
                         text => <<"Failed to retrieve count of messages from ElasticSearch">>}),
            0
    end.