./ct_report/coverage/mod_mam_rdbms_arch.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc RDBMS backend for Message Archive Management.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(mod_mam_rdbms_arch).
8
9 %% ----------------------------------------------------------------------
10 %% Exports
11
12 %% gen_mod handlers
13 -export([start/2, stop/1, hooks/1, supported_features/0]).
14
15 %% MAM hook handlers
16 -behaviour(ejabberd_gen_mam_archive).
17 -behaviour(gen_mod).
18 -behaviour(mongoose_module_metrics).
19
20 -callback encode(term()) -> binary().
21 -callback decode(binary()) -> term().
22
23 -export([archive_size/3,
24 archive_message/3,
25 lookup_messages/3,
26 remove_archive/3,
27 remove_domain/3,
28 get_mam_pm_gdpr_data/3]).
29
30 %% Called from mod_mam_rdbms_async_pool_writer
31 -export([prepare_message/2, retract_message/2, prepare_insert/2]).
32
33 -type host_type() :: mongooseim:host_type().
34
35 %% ----------------------------------------------------------------------
36 %% Imports
37
38 -include("mongoose.hrl").
39 -include("jlib.hrl").
40 -include("mongoose_mam.hrl").
41
42 %% ----------------------------------------------------------------------
43 %% Types
44
45 -type env_vars() :: #{
46 host_type := host_type(),
47 archive_jid := jid:jid(),
48 table := atom(),
49 index_hint_fn := fun((env_vars()) -> mam_lookup_sql:sql_part()),
50 columns_sql_fn := fun((mam_lookup_sql:query_type()) -> mam_lookup_sql:sql_part()),
51 column_to_id_fn := fun((mam_lookup_sql:column()) -> string()),
52 lookup_fn := mam_lookup_sql:lookup_query_fn(),
53 decode_row_fn := fun((Row :: tuple(), env_vars()) -> Decoded :: tuple()),
54 has_message_retraction := boolean(),
55 has_full_text_search := boolean(),
56 db_jid_codec := module(),
57 db_message_codec := module()
58 }.
59
60 -export_type([env_vars/0]).
61
62 %% ----------------------------------------------------------------------
63 %% gen_mod callbacks
64 %% Starting and stopping functions for users' archives
65
66 -spec start(host_type(), gen_mod:module_opts()) -> ok.
67 start(_HostType, Opts) ->
68 1 register_prepared_queries(Opts),
69 1 ok.
70
71 -spec stop(host_type()) -> ok.
72 stop(_HostType) ->
73
:-(
ok.
74
75 -spec supported_features() -> [atom()].
76 supported_features() ->
77
:-(
[dynamic_domains].
78
79 -spec get_mam_pm_gdpr_data(Acc, Params, Extra) -> {ok, Acc} when
80 Acc :: ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
81 Params :: #{jid := jid:jid()},
82 Extra :: gen_hook:extra().
83 get_mam_pm_gdpr_data(Acc,
84 #{jid := #jid{luser = LUser, lserver = LServer} = ArcJID},
85 #{host_type := HostType}) ->
86
:-(
case mod_mam_pm:archive_id(LServer, LUser) of
87 undefined ->
88
:-(
{ok, Acc};
89 ArcID ->
90
:-(
Env = env_vars(HostType, ArcJID),
91
:-(
{selected, Rows} = extract_gdpr_messages(Env, ArcID),
92
:-(
{ok, [uniform_to_gdpr(row_to_uniform_format(Row, Env)) || Row <- Rows] ++ Acc}
93 end.
94
95 -spec uniform_to_gdpr(mod_mam:message_row()) -> tuple().
96 uniform_to_gdpr(#{id := MessID, jid := RemoteJID, packet := Packet}) ->
97
:-(
{integer_to_binary(MessID), jid:to_binary(RemoteJID), exml:to_binary(Packet)}.
98
99 %% ----------------------------------------------------------------------
100 %% Add hooks for mod_mam_pm
101
102 -spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
103 hooks(HostType) ->
104 case gen_mod:get_module_opt(HostType, ?MODULE, no_writer) of
105 true ->
106
:-(
[];
107 false ->
108 1 [{mam_archive_message, HostType, fun ?MODULE:archive_message/3, #{}, 50}]
109 1 end ++
110 [{mam_archive_size, HostType, fun ?MODULE:archive_size/3, #{}, 50},
111 {mam_lookup_messages, HostType, fun ?MODULE:lookup_messages/3, #{}, 50},
112 {mam_remove_archive, HostType, fun ?MODULE:remove_archive/3, #{}, 50},
113 {get_mam_pm_gdpr_data, HostType, fun ?MODULE:get_mam_pm_gdpr_data/3, #{}, 50},
114 {remove_domain, HostType, fun ?MODULE:remove_domain/3, #{}, 50}].
115
116 %% ----------------------------------------------------------------------
117 %% SQL queries
118
119 register_prepared_queries(Opts) ->
120 1 prepare_insert(insert_mam_message, 1),
121 1 mongoose_rdbms:prepare(mam_archive_remove, mam_message, [user_id],
122 <<"DELETE FROM mam_message "
123 "WHERE user_id = ?">>),
124
125 %% Domain Removal
126 1 prepare_remove_domain(Opts),
127
128 1 mongoose_rdbms:prepare(mam_make_tombstone, mam_message, [message, user_id, id],
129 <<"UPDATE mam_message SET message = ?, search_body = '' "
130 "WHERE user_id = ? AND id = ?">>),
131 1 {LimitSQL, LimitMSSQL} = rdbms_queries:get_db_specific_limits_binaries(1),
132 1 mongoose_rdbms:prepare(mam_select_messages_to_retract_on_origin_id, mam_message,
133 [user_id, remote_bare_jid, origin_id, direction],
134 <<"SELECT ", LimitMSSQL/binary,
135 " id, message FROM mam_message"
136 " WHERE user_id = ? AND remote_bare_jid = ? "
137 " AND origin_id = ? AND direction = ?"
138 " ORDER BY id DESC ", LimitSQL/binary>>),
139 1 mongoose_rdbms:prepare(mam_select_messages_to_retract_on_stanza_id, mam_message,
140 [user_id, remote_bare_jid, id, direction],
141 <<"SELECT ", LimitMSSQL/binary,
142 " origin_id, message FROM mam_message"
143 " WHERE user_id = ? AND remote_bare_jid = ? "
144 " AND id = ? AND direction = ?"
145 " ORDER BY id DESC ", LimitSQL/binary>>).
146
147 prepare_remove_domain(#{delete_domain_limit := infinity}) ->
148 1 mongoose_rdbms:prepare(mam_remove_domain, mam_message, ['mam_server_user.server'],
149 <<"DELETE FROM mam_message "
150 "WHERE user_id IN "
151 "(SELECT id from mam_server_user WHERE server = ?)">>),
152 1 mongoose_rdbms:prepare(mam_remove_domain_prefs, mam_config, ['mam_server_user.server'],
153 <<"DELETE FROM mam_config "
154 "WHERE user_id IN "
155 "(SELECT id from mam_server_user WHERE server = ?)">>),
156 1 mongoose_rdbms:prepare(mam_remove_domain_users, mam_server_user, [server],
157 <<"DELETE FROM mam_server_user WHERE server = ?">>);
158 prepare_remove_domain(#{delete_domain_limit := Limit}) ->
159
:-(
LimitSQL = case mongoose_rdbms:db_type() of
160
:-(
mssql -> throw(delete_domain_limit_not_supported_for_mssql);
161
:-(
_ -> {MaybeLimitSQL, _} = rdbms_queries:get_db_specific_limits_binaries(Limit),
162
:-(
MaybeLimitSQL
163 end,
164
:-(
IdTable = <<"(SELECT * FROM ",
165 "(SELECT msg.user_id, msg.id FROM mam_message msg",
166 " INNER JOIN mam_server_user msu ON msu.id=msg.user_id",
167 " WHERE msu.server = ? ", LimitSQL/binary, ") AS T)">>,
168
:-(
mongoose_rdbms:prepare(mam_incr_remove_domain, mam_message, ['mam_server_user.server'],
169 <<"DELETE FROM mam_message WHERE (user_id, id) IN ", IdTable/binary>>),
170
:-(
CfgTable = <<"(SELECT * FROM ",
171 "(SELECT cfg.user_id, cfg.remote_jid FROM mam_config cfg",
172 " INNER JOIN mam_server_user msu ON msu.id=cfg.user_id",
173 " WHERE msu.server = ? ", LimitSQL/binary, ") AS T)">>,
174
:-(
mongoose_rdbms:prepare(mam_incr_remove_domain_prefs, mam_config, ['mam_server_user.server'],
175 <<"DELETE FROM mam_config "
176 "WHERE (user_id, remote_jid) IN ", CfgTable/binary>>),
177
:-(
ServerTable = <<"(SELECT * FROM ",
178 "(SELECT id FROM mam_server_user WHERE server = ? ", LimitSQL/binary, ") as t)">>,
179
:-(
mongoose_rdbms:prepare(mam_incr_remove_domain_users, mam_server_user, [server],
180 <<"DELETE FROM mam_server_user WHERE id IN ", ServerTable/binary>>).
181
182
183 %% ----------------------------------------------------------------------
184 %% Declarative logic
185
186 db_mappings() ->
187 %% One entry per the database field
188 61 [#db_mapping{column = id, param = message_id, format = int},
189 #db_mapping{column = user_id, param = archive_id, format = int},
190 #db_mapping{column = remote_bare_jid, param = remote_jid, format = bare_jid},
191 #db_mapping{column = remote_resource, param = remote_jid, format = jid_resource},
192 #db_mapping{column = direction, param = direction, format = direction},
193 #db_mapping{column = from_jid, param = source_jid, format = jid},
194 #db_mapping{column = origin_id, param = origin_id, format = maybe_string},
195 #db_mapping{column = message, param = packet, format = xml},
196 #db_mapping{column = search_body, param = packet, format = search},
197 #db_mapping{column = is_groupchat, param = is_groupchat, format = bool}].
198
199 lookup_fields() ->
200 %% Describe each possible filtering option
201 12 [#lookup_field{op = equal, column = user_id, param = archive_id, required = true},
202 #lookup_field{op = ge, column = id, param = start_id},
203 #lookup_field{op = le, column = id, param = end_id},
204 #lookup_field{op = equal, column = remote_bare_jid, param = remote_bare_jid},
205 #lookup_field{op = equal, column = remote_resource, param = remote_resource},
206 #lookup_field{op = like, column = search_body, param = norm_search_text, value_maker = search_words},
207 #lookup_field{op = equal, column = is_groupchat, param = include_groupchat},
208 #lookup_field{op = equal, column = id, param = message_id}].
209
210 -spec env_vars(host_type(), jid:jid()) -> env_vars().
211 env_vars(HostType, ArcJID) ->
212 %% Please, minimize the usage of the host_type field.
213 %% It's only for passing into RDBMS.
214 72 #{host_type => HostType,
215 archive_jid => ArcJID,
216 table => mam_message,
217 index_hint_fn => fun index_hint_sql/1,
218 columns_sql_fn => fun columns_sql/1,
219 column_to_id_fn => fun column_to_id/1,
220 lookup_fn => fun lookup_query/5,
221 decode_row_fn => fun row_to_uniform_format/2,
222 has_message_retraction => mod_mam_utils:has_message_retraction(mod_mam_pm, HostType),
223 has_full_text_search => mod_mam_utils:has_full_text_search(mod_mam_pm, HostType),
224 db_jid_codec => mod_mam_utils:db_jid_codec(HostType, ?MODULE),
225 db_message_codec => mod_mam_utils:db_message_codec(HostType, ?MODULE)}.
226
227 row_to_uniform_format(Row, Env) ->
228 8 mam_decoder:decode_row(Row, Env).
229
230 -spec index_hint_sql(env_vars()) -> string().
231 index_hint_sql(#{host_type := HostType}) ->
232 1 case mongoose_rdbms:db_engine(HostType) of
233
:-(
mysql -> "USE INDEX(PRIMARY, i_mam_message_rem) ";
234 1 _ -> ""
235 end.
236
237 1 columns_sql(lookup) -> "id, from_jid, message";
238
:-(
columns_sql(count) -> "COUNT(*)".
239
240 %% For each unique column in lookup_fields()
241
:-(
column_to_id(id) -> "i";
242 12 column_to_id(user_id) -> "u";
243
:-(
column_to_id(remote_bare_jid) -> "b";
244
:-(
column_to_id(remote_resource) -> "r";
245
:-(
column_to_id(search_body) -> "s";
246
:-(
column_to_id(is_groupchat) -> "g".
247
248 column_names(Mappings) ->
249 1 [Column || #db_mapping{column = Column} <- Mappings].
250
251 %% ----------------------------------------------------------------------
252 %% Options
253
254 -spec get_retract_id(exml:element(), env_vars()) -> none | mod_mam_utils:retraction_id().
255 get_retract_id(Packet, #{has_message_retraction := Enabled}) ->
256 60 mod_mam_utils:get_retract_id(Enabled, Packet).
257
258 %% ----------------------------------------------------------------------
259 %% Internal functions and callbacks
260
261 -spec archive_size(Acc, Params, Extra) -> {ok, Acc} when
262 Acc :: integer(),
263 Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
264 Extra :: gen_hook:extra().
265 archive_size(Size, #{archive_id := ArcID, owner := ArcJID}, #{host_type := HostType}) when is_integer(Size) ->
266
:-(
Filter = [{equal, user_id, ArcID}],
267
:-(
Env = env_vars(HostType, ArcJID),
268
:-(
Result = lookup_query(count, Env, Filter, unordered, all),
269
:-(
{ok, mongoose_rdbms:selected_to_integer(Result)}.
270
271 -spec archive_message(Acc, Params, Extra) -> {ok, Acc} when
272 Acc :: ok,
273 Params :: mod_mam:archive_message_params(),
274 Extra :: gen_hook:extra().
275 archive_message(_Result, #{local_jid := ArcJID} = Params, #{host_type := HostType}) ->
276 60 try
277 60 assert_archive_id_provided(Params),
278 60 Env = env_vars(HostType, ArcJID),
279 60 do_archive_message(HostType, Params, Env),
280 60 retract_message(HostType, Params, Env),
281 60 {ok, ok}
282 catch error:Reason:StackTrace ->
283
:-(
mongoose_instrument:execute(mod_mam_pm_dropped, #{host_type => HostType}, #{count => 1}),
284
:-(
?LOG_ERROR(#{what => archive_message_failed,
285 host_type => HostType, mam_params => Params,
286
:-(
reason => Reason, stacktrace => StackTrace}),
287
:-(
erlang:raise(error, Reason, StackTrace)
288 end.
289
290 do_archive_message(HostType, Params, Env) ->
291 60 Row = mam_encoder:encode_message(Params, Env, db_mappings()),
292 60 {updated, 1} = mongoose_rdbms:execute_successfully(HostType, insert_mam_message, Row).
293
294 %% Retraction logic
295 %% Called after inserting a new message
296 -spec retract_message(host_type(), mod_mam:archive_message_params()) -> ok.
297 retract_message(HostType, #{local_jid := ArcJID} = Params) ->
298
:-(
Env = env_vars(HostType, ArcJID),
299
:-(
retract_message(HostType, Params, Env).
300
301 -spec retract_message(host_type(), mod_mam:archive_message_params(), env_vars()) -> ok.
302 retract_message(HostType, #{archive_id := ArcID, remote_jid := RemJID,
303 direction := Dir, packet := Packet} = Params, Env) ->
304 60 case get_retract_id(Packet, Env) of
305 60 none -> ok;
306 RetractionId ->
307
:-(
Info = get_retraction_info(HostType, ArcID, RemJID, RetractionId, Dir, Env),
308
:-(
make_tombstone(HostType, ArcID, RetractionId, Info, Params, Env)
309 end.
310
311 get_retraction_info(HostType, ArcID, RemJID, RetractionId, Dir, Env) ->
312 %% Code style notice:
313 %% - Add Ext prefix for all externally encoded data
314 %% (in cases, when we usually add Bin, B, S Esc prefixes)
315
:-(
ExtBareRemJID = mam_encoder:encode_jid(jid:to_bare(RemJID), Env),
316
:-(
ExtDir = mam_encoder:encode_direction(Dir),
317
:-(
{selected, Rows} = execute_select_messages_to_retract(
318 HostType, ArcID, ExtBareRemJID, RetractionId, ExtDir),
319
:-(
mam_decoder:decode_retraction_info(Env, Rows, RetractionId).
320
321 make_tombstone(_HostType, ArcID, RetractionId, skip, _Params, _Env) ->
322
:-(
?LOG_INFO(#{what => make_tombstone_failed,
323 text => <<"Message to retract was not found">>,
324
:-(
user_id => ArcID, retraction_context => RetractionId});
325 make_tombstone(HostType, ArcID, _RetractionId,
326 RetractionInfo = #{message_id := MessID}, Params,
327 #{archive_jid := ArcJID} = Env) ->
328
:-(
RetractionInfo1 = mongoose_hooks:mam_retraction(HostType, RetractionInfo, Params),
329
:-(
Tombstone = mod_mam_utils:tombstone(RetractionInfo1, ArcJID),
330
:-(
TombstoneData = mam_encoder:encode_packet(Tombstone, Env),
331
:-(
execute_make_tombstone(HostType, TombstoneData, ArcID, MessID).
332
333 execute_select_messages_to_retract(HostType, ArcID, BareRemJID, {origin_id, OriginID}, Dir) ->
334
:-(
mongoose_rdbms:execute_successfully(HostType, mam_select_messages_to_retract_on_origin_id,
335 [ArcID, BareRemJID, OriginID, Dir]);
336 execute_select_messages_to_retract(HostType, ArcID, BareRemJID, {stanza_id, BinStanzaId}, Dir) ->
337
:-(
StanzaId = mod_mam_utils:external_binary_to_mess_id(BinStanzaId),
338
:-(
mongoose_rdbms:execute_successfully(HostType, mam_select_messages_to_retract_on_stanza_id,
339 [ArcID, BareRemJID, StanzaId, Dir]).
340
341 execute_make_tombstone(HostType, TombstoneData, ArcID, MessID) ->
342
:-(
mongoose_rdbms:execute_successfully(HostType, mam_make_tombstone,
343 [TombstoneData, ArcID, MessID]).
344
345 %% Insert logic
346 -spec prepare_message(host_type(), mod_mam:archive_message_params()) -> list().
347 prepare_message(HostType, Params = #{local_jid := ArcJID}) ->
348
:-(
Env = env_vars(HostType, ArcJID),
349
:-(
mam_encoder:encode_message(Params, Env, db_mappings()).
350
351 -spec prepare_insert(Name :: atom(), NumRows :: pos_integer()) -> ok.
352 prepare_insert(Name, NumRows) ->
353 1 Table = mam_message,
354 1 Fields = column_names(db_mappings()),
355 1 {Query, Fields2} = rdbms_queries:create_bulk_insert_query(Table, Fields, NumRows),
356 1 mongoose_rdbms:prepare(Name, Table, Fields2, Query),
357 1 ok.
358
359 %% Removal logic
360 -spec remove_archive(Acc, Params, Extra) -> {ok, Acc} when
361 Acc :: term(),
362 Params :: #{archive_id := mod_mam:archive_id() | undefined, owner := jid:jid()},
363 Extra :: gen_hook:extra().
364 remove_archive(Acc, #{archive_id := ArcID}, #{host_type := HostType}) ->
365
:-(
mongoose_rdbms:execute_successfully(HostType, mam_archive_remove, [ArcID]),
366
:-(
{ok, Acc}.
367
368 -spec remove_domain(Acc, Params, Extra) -> {ok | stop, Acc} when
369 Acc :: mongoose_domain_api:remove_domain_acc(),
370 Params :: map(),
371 Extra :: gen_hook:extra().
372 remove_domain(Acc, #{domain := Domain}, #{host_type := HostType}) ->
373
:-(
F = fun() ->
374
:-(
case gen_mod:get_module_opt(HostType, ?MODULE, delete_domain_limit) of
375
:-(
infinity -> remove_domain_all(HostType, Domain);
376
:-(
Limit -> remove_domain_batch(HostType, Domain, Limit)
377 end,
378
:-(
Acc
379 end,
380
:-(
mongoose_domain_api:remove_domain_wrapper(Acc, F, ?MODULE).
381
382 -spec remove_domain_all(host_type(), jid:lserver()) -> any().
383 remove_domain_all(HostType, Domain) ->
384
:-(
{atomic, _} = mongoose_rdbms:sql_transaction(HostType, fun() ->
385
:-(
mongoose_rdbms:execute_successfully(HostType, mam_remove_domain, [Domain]),
386
:-(
mongoose_rdbms:execute_successfully(HostType, mam_remove_domain_prefs, [Domain]),
387
:-(
mongoose_rdbms:execute_successfully(HostType, mam_remove_domain_users, [Domain])
388 end).
389
390 -spec remove_domain_batch(host_type(), jid:lserver(), non_neg_integer()) -> any().
391 remove_domain_batch(HostType, Domain, Limit) ->
392
:-(
DeleteQueries = [mam_incr_remove_domain, mam_incr_remove_domain_prefs, mam_incr_remove_domain_users],
393
:-(
TotalDeleted = mod_mam_utils:incremental_delete_domain(HostType, Domain, Limit, DeleteQueries, 0),
394
:-(
?LOG_INFO(#{what => mam_domain_removal_completed, total_records_deleted => TotalDeleted,
395
:-(
domain => Domain, host_type => HostType}).
396
397 %% GDPR logic
398 extract_gdpr_messages(Env, ArcID) ->
399
:-(
Filters = [{equal, user_id, ArcID}],
400
:-(
lookup_query(lookup, Env, Filters, asc, all).
401
402 %% Lookup logic
403 -spec lookup_messages(Acc, Params, Extra) -> {ok, Acc} when
404 Acc :: {ok, mod_mam:lookup_result()},
405 Params :: mam_iq:lookup_params(),
406 Extra :: gen_hook:extra().
407 lookup_messages(_Result, #{owner_jid := ArcJID} = Params, #{host_type := HostType}) ->
408 12 Env = env_vars(HostType, ArcJID),
409 12 ExdParams = mam_encoder:extend_lookup_params(Params, Env),
410 12 Filter = mam_filter:produce_filter(ExdParams, lookup_fields()),
411 12 {ok, mam_lookup:lookup(Env, Filter, ExdParams)}.
412
413 lookup_query(QueryType, Env, Filters, Order, OffsetLimit) ->
414 12 mam_lookup_sql:lookup_query(QueryType, Env, Filters, Order, OffsetLimit).
415
416 assert_archive_id_provided(#{archive_id := ArcID}) when is_integer(ArcID) ->
417 60 ok.
Line Hits Source