1 |
|
%%%------------------------------------------------------------------- |
2 |
|
%%% @author Uvarov Michael <arcusfelis@gmail.com> |
3 |
|
%%% @copyright (C) 2013, Uvarov Michael |
4 |
|
%%% @doc RDBMS backend for Message Archive Management. |
5 |
|
%%% @end |
6 |
|
%%%------------------------------------------------------------------- |
7 |
|
-module(mod_mam_rdbms_arch). |
8 |
|
|
9 |
|
%% ---------------------------------------------------------------------- |
10 |
|
%% Exports |
11 |
|
|
12 |
|
%% gen_mod handlers |
13 |
|
-export([start/2, stop/1, supported_features/0]). |
14 |
|
|
15 |
|
%% MAM hook handlers |
16 |
|
-behaviour(ejabberd_gen_mam_archive). |
17 |
|
-behaviour(gen_mod). |
18 |
|
-behaviour(mongoose_module_metrics). |
19 |
|
|
20 |
|
-callback encode(term()) -> binary(). |
21 |
|
-callback decode(binary()) -> term(). |
22 |
|
|
23 |
|
-export([archive_size/4, |
24 |
|
archive_message/3, |
25 |
|
lookup_messages/3, |
26 |
|
remove_archive/4, |
27 |
|
remove_domain/3]). |
28 |
|
|
29 |
|
-export([get_mam_pm_gdpr_data/3]). |
30 |
|
|
31 |
|
%% Called from mod_mam_rdbms_async_pool_writer |
32 |
|
-export([prepare_message/2, retract_message/2, prepare_insert/2]). |
33 |
|
|
34 |
|
-ignore_xref([behaviour_info/1, remove_archive/4, remove_domain/3]). |
35 |
|
|
36 |
|
-type host_type() :: mongooseim:host_type(). |
37 |
|
|
38 |
|
%% ---------------------------------------------------------------------- |
39 |
|
%% Imports |
40 |
|
|
41 |
|
-include("mongoose.hrl"). |
42 |
|
-include("jlib.hrl"). |
43 |
|
-include_lib("exml/include/exml.hrl"). |
44 |
|
-include("mongoose_rsm.hrl"). |
45 |
|
-include("mongoose_mam.hrl"). |
46 |
|
|
47 |
|
%% ---------------------------------------------------------------------- |
48 |
|
%% Types |
49 |
|
|
50 |
|
-type env_vars() :: #{ |
51 |
|
host_type := host_type(), |
52 |
|
archive_jid := jid:jid(), |
53 |
|
table := atom(), |
54 |
|
index_hint_fn := fun((env_vars()) -> mam_lookup_sql:sql_part()), |
55 |
|
columns_sql_fn := fun((mam_lookup_sql:query_type()) -> mam_lookup_sql:sql_part()), |
56 |
|
column_to_id_fn := fun((mam_lookup_sql:column()) -> string()), |
57 |
|
lookup_fn := mam_lookup_sql:lookup_query_fn(), |
58 |
|
decode_row_fn := fun((Row :: tuple(), env_vars()) -> Decoded :: tuple()), |
59 |
|
has_message_retraction := boolean(), |
60 |
|
has_full_text_search := boolean(), |
61 |
|
db_jid_codec := module(), |
62 |
|
db_message_codec := module() |
63 |
|
}. |
64 |
|
|
65 |
|
-export_type([env_vars/0]). |
66 |
|
|
67 |
|
%% ---------------------------------------------------------------------- |
68 |
|
%% gen_mod callbacks |
69 |
|
%% Starting and stopping functions for users' archives |
70 |
|
|
71 |
|
%% gen_mod callback.
%% Installs this module's hook handlers for the host type and registers the
%% prepared SQL statements. The module options argument is not used.
-spec start(host_type(), gen_mod:module_opts()) -> ok.
start(HostType, _ModuleOpts) ->
    start_hooks(HostType),
    register_prepared_queries(),
    ok.
76 |
|
|
77 |
|
%% gen_mod callback.
%% Removes the hook handlers that start/2 installed for the host type.
-spec stop(host_type()) -> ok.
stop(HostType) ->
    stop_hooks(HostType).
80 |
|
|
81 |
|
%% gen_mod callback.
%% Lists the optional features this module declares support for;
%% currently only `dynamic_domains'.
-spec supported_features() -> [atom()].
supported_features() ->
    [dynamic_domains].
84 |
|
|
85 |
|
%% Hook handler collecting the archived messages of a user for GDPR export.
%% When no archive id is known for the user, the accumulator is returned
%% untouched; otherwise every archived row is decoded, converted with
%% uniform_to_gdpr/1 and prepended (as a group) to the accumulator.
-spec get_mam_pm_gdpr_data(ejabberd_gen_mam_archive:mam_pm_gdpr_data(),
                           host_type(), jid:jid()) ->
    ejabberd_gen_mam_archive:mam_pm_gdpr_data().
get_mam_pm_gdpr_data(Acc, HostType,
                     #jid{luser = LUser, lserver = LServer} = ArcJID) ->
    case mod_mam:archive_id(LServer, LUser) of
        undefined ->
            Acc;
        ArcID ->
            Env = env_vars(HostType, ArcJID),
            {selected, Rows} = extract_gdpr_messages(Env, ArcID),
            Entries = [uniform_to_gdpr(row_to_uniform_format(Row, Env))
                       || Row <- Rows],
            Entries ++ Acc
    end.
98 |
|
|
99 |
|
%% Converts a decoded message row into the GDPR export triple
%% {MessageIdBinary, RemoteJidBinary, PacketBinary}.
-spec uniform_to_gdpr(mod_mam:message_row()) -> tuple().
uniform_to_gdpr(#{id := MessID, jid := RemoteJID, packet := Packet}) ->
    BinMessID = integer_to_binary(MessID),
    BinRemoteJID = jid:to_binary(RemoteJID),
    BinPacket = exml:to_binary(Packet),
    {BinMessID, BinRemoteJID, BinPacket}.
102 |
|
|
103 |
|
%% ---------------------------------------------------------------------- |
104 |
|
%% Add hooks for mod_mam |
105 |
|
|
106 |
|
%% Registers every hook handler listed by hooks/1 for the host type.
-spec start_hooks(host_type()) -> ok.
start_hooks(HostType) ->
    Handlers = hooks(HostType),
    ejabberd_hooks:add(Handlers).
109 |
|
|
110 |
|
%% Unregisters every hook handler listed by hooks/1 for the host type.
-spec stop_hooks(host_type()) -> ok.
stop_hooks(HostType) ->
    Handlers = hooks(HostType),
    ejabberd_hooks:delete(Handlers).
113 |
|
|
114 |
|
%% Builds the list of hook handler tuples for the host type.
%% The `mam_archive_message' handler is included only when the `no_writer'
%% module option is `false'; the remaining handlers are always present.
%% NOTE(review): the trailing 50 in each tuple is presumably the hook
%% priority - confirm against ejabberd_hooks.
hooks(HostType) ->
    WriterHooks =
        case gen_mod:get_module_opt(HostType, ?MODULE, no_writer) of
            true ->
                [];
            false ->
                [{mam_archive_message, HostType, ?MODULE, archive_message, 50}]
        end,
    CommonHooks =
        [{mam_archive_size, HostType, ?MODULE, archive_size, 50},
         {mam_lookup_messages, HostType, ?MODULE, lookup_messages, 50},
         {mam_remove_archive, HostType, ?MODULE, remove_archive, 50},
         {remove_domain, HostType, ?MODULE, remove_domain, 50},
         {get_mam_pm_gdpr_data, HostType, ?MODULE, get_mam_pm_gdpr_data, 50}],
    WriterHooks ++ CommonHooks.
126 |
|
|
127 |
|
%% ---------------------------------------------------------------------- |
128 |
|
%% SQL queries |
129 |
|
|
130 |
|
%% Registers all prepared statements used by this module:
%% the single-row insert, archive/domain removal, tombstone update and the
%% two "find message to retract" selects (by origin id and by stanza id).
%% The value of the last mongoose_rdbms:prepare/4 call is returned.
register_prepared_queries() ->
    prepare_insert(insert_mam_message, 1),

    %% Removal of a single archive
    RemoveArchiveSQL = <<"DELETE FROM mam_message "
                         "WHERE user_id = ?">>,
    mongoose_rdbms:prepare(mam_archive_remove, mam_message, [user_id],
                           RemoveArchiveSQL),

    %% Removal of everything that belongs to a domain
    RemoveDomainSQL = <<"DELETE FROM mam_message "
                        "WHERE user_id IN "
                        "(SELECT id from mam_server_user WHERE server = ?)">>,
    mongoose_rdbms:prepare(mam_remove_domain, mam_message,
                           ['mam_server_user.server'], RemoveDomainSQL),
    RemoveDomainPrefsSQL = <<"DELETE FROM mam_config "
                             "WHERE user_id IN "
                             "(SELECT id from mam_server_user WHERE server = ?)">>,
    mongoose_rdbms:prepare(mam_remove_domain_prefs, mam_config,
                           ['mam_server_user.server'], RemoveDomainPrefsSQL),
    RemoveDomainUsersSQL = <<"DELETE FROM mam_server_user WHERE server = ?">>,
    mongoose_rdbms:prepare(mam_remove_domain_users, mam_server_user, [server],
                           RemoveDomainUsersSQL),

    %% Replacing a retracted message with its tombstone
    TombstoneSQL = <<"UPDATE mam_message SET message = ?, search_body = '' "
                     "WHERE user_id = ? AND id = ?">>,
    mongoose_rdbms:prepare(mam_make_tombstone, mam_message,
                           [message, user_id, id], TombstoneSQL),

    %% Selecting at most one message to retract; the row limit is spelled
    %% either after SELECT or at the end of the statement, using the two
    %% fragments returned by rdbms_queries.
    {LimitSQL, LimitMSSQL} = rdbms_queries:get_db_specific_limits_binaries(1),
    SelectByOriginIdSQL = <<"SELECT ", LimitMSSQL/binary,
                            " id, message FROM mam_message"
                            " WHERE user_id = ? AND remote_bare_jid = ? "
                            " AND origin_id = ? AND direction = ?"
                            " ORDER BY id DESC ", LimitSQL/binary>>,
    mongoose_rdbms:prepare(mam_select_messages_to_retract_on_origin_id, mam_message,
                           [user_id, remote_bare_jid, origin_id, direction],
                           SelectByOriginIdSQL),
    SelectByStanzaIdSQL = <<"SELECT ", LimitMSSQL/binary,
                            " origin_id, message FROM mam_message"
                            " WHERE user_id = ? AND remote_bare_jid = ? "
                            " AND id = ? AND direction = ?"
                            " ORDER BY id DESC ", LimitSQL/binary>>,
    mongoose_rdbms:prepare(mam_select_messages_to_retract_on_stanza_id, mam_message,
                           [user_id, remote_bare_jid, id, direction],
                           SelectByStanzaIdSQL).
163 |
|
|
164 |
|
%% ---------------------------------------------------------------------- |
165 |
|
%% Declarative logic |
166 |
|
|
167 |
|
%% Declarative description of the mam_message columns written on insert.
%% Each record names a column, the message parameter it is filled from and
%% the format tag handed to the encoder.
db_mappings() ->
    %% One entry per database column, in insert order
    [#db_mapping{column = id,              param = message_id, format = int},
     #db_mapping{column = user_id,         param = archive_id, format = int},
     #db_mapping{column = remote_bare_jid, param = remote_jid, format = bare_jid},
     #db_mapping{column = remote_resource, param = remote_jid, format = jid_resource},
     #db_mapping{column = direction,       param = direction,  format = direction},
     #db_mapping{column = from_jid,        param = source_jid, format = jid},
     #db_mapping{column = origin_id,       param = origin_id,  format = maybe_string},
     #db_mapping{column = message,         param = packet,     format = xml},
     #db_mapping{column = search_body,     param = packet,     format = search}].
178 |
|
|
179 |
|
%% Declarative description of the filters a lookup may apply.
%% Each record ties a lookup parameter to a column and a comparison operator;
%% only the archive id filter is marked as required.
lookup_fields() ->
    %% One entry per possible filtering option
    [#lookup_field{op = equal, column = user_id,
                   param = archive_id, required = true},
     #lookup_field{op = ge, column = id,
                   param = start_id},
     #lookup_field{op = le, column = id,
                   param = end_id},
     #lookup_field{op = equal, column = remote_bare_jid,
                   param = remote_bare_jid},
     #lookup_field{op = equal, column = remote_resource,
                   param = remote_resource},
     #lookup_field{op = like, column = search_body,
                   param = norm_search_text, value_maker = search_words}].
187 |
|
|
188 |
|
%% Builds the environment map passed to the generic MAM encoder, decoder and
%% lookup code: table name, SQL helper funs, feature flags and codec modules.
-spec env_vars(host_type(), jid:jid()) -> env_vars().
env_vars(HostType, ArcJID) ->
    HasRetraction = mod_mam_utils:has_message_retraction(mod_mam, HostType),
    HasFullTextSearch = mod_mam_utils:has_full_text_search(mod_mam, HostType),
    JidCodec = db_jid_codec(HostType, ?MODULE),
    MessageCodec = db_message_codec(HostType, ?MODULE),
    %% Please, minimize the usage of the host_type field.
    %% It's only for passing into RDBMS.
    #{host_type => HostType,
      archive_jid => ArcJID,
      table => mam_message,
      index_hint_fn => fun index_hint_sql/1,
      columns_sql_fn => fun columns_sql/1,
      column_to_id_fn => fun column_to_id/1,
      lookup_fn => fun lookup_query/5,
      decode_row_fn => fun row_to_uniform_format/2,
      has_message_retraction => HasRetraction,
      has_full_text_search => HasFullTextSearch,
      db_jid_codec => JidCodec,
      db_message_codec => MessageCodec}.
204 |
|
|
205 |
|
%% Decodes one database row by delegating to mam_decoder:decode_row/2.
%% Used as the `decode_row_fn' entry of the environment map.
row_to_uniform_format(DbRow, Env) ->
    mam_decoder:decode_row(DbRow, Env).
207 |
|
|
208 |
|
%% Returns the SQL index hint for lookups: a USE INDEX clause when the
%% database engine of the host type is MySQL, an empty string otherwise.
-spec index_hint_sql(env_vars()) -> string().
index_hint_sql(#{host_type := HostType}) ->
    engine_index_hint(mongoose_rdbms:db_engine(HostType)).

%% Index hint per database engine; only MySQL gets one.
engine_index_hint(mysql) -> "USE INDEX(PRIMARY, i_mam_message_rem) ";
engine_index_hint(_OtherEngine) -> "".
214 |
|
|
215 |
:-( |
%% Column list placed after SELECT for each query type:
%% the message columns for `lookup', a row count for `count'.
-spec columns_sql(lookup | count) -> string().
columns_sql(lookup) ->
    "id, from_jid, message";
columns_sql(count) ->
    "COUNT(*)".
217 |
|
|
218 |
|
%% Maps each unique column used in lookup_fields() to a one-letter id.
-spec column_to_id(id | user_id | remote_bare_jid | remote_resource | search_body) ->
    string().
column_to_id(id) ->
    "i";
column_to_id(user_id) ->
    "u";
column_to_id(remote_bare_jid) ->
    "b";
column_to_id(remote_resource) ->
    "r";
column_to_id(search_body) ->
    "s".
224 |
|
|
225 |
|
%% Extracts the column name of every #db_mapping{} record in the list,
%% preserving order. Elements that are not #db_mapping{} records are skipped.
column_names(Mappings) ->
    [Name || #db_mapping{column = Name} <- Mappings].
227 |
|
|
228 |
|
%% ---------------------------------------------------------------------- |
229 |
|
%% Options |
230 |
|
|
231 |
|
%% Reads the `db_jid_format' option of the given module for the host type.
-spec db_jid_codec(host_type(), module()) -> module().
db_jid_codec(HostType, Mod) ->
    gen_mod:get_module_opt(HostType, Mod, db_jid_format).
234 |
|
|
235 |
|
%% Reads the `db_message_format' option of the given module for the host type.
-spec db_message_codec(host_type(), module()) -> module().
db_message_codec(HostType, Mod) ->
    gen_mod:get_module_opt(HostType, Mod, db_message_format).
238 |
|
|
239 |
|
%% Extracts the retraction id from a packet by delegating to
%% mod_mam_utils:get_retract_id/2, passing along the `has_message_retraction'
%% flag from the environment.
-spec get_retract_id(exml:element(), env_vars()) -> none | mod_mam_utils:retraction_id().
get_retract_id(Packet, #{has_message_retraction := RetractionEnabled}) ->
    mod_mam_utils:get_retract_id(RetractionEnabled, Packet).
242 |
|
|
243 |
|
%% ---------------------------------------------------------------------- |
244 |
|
%% Internal functions and callbacks |
245 |
|
|
246 |
|
%% Hook handler returning the number of messages stored for an archive.
%% The incoming accumulator must be an integer; its value is not used.
-spec archive_size(Size :: integer(), HostType :: host_type(),
                   ArcId :: mod_mam:archive_id(), ArcJID :: jid:jid()) -> integer().
archive_size(Size, HostType, ArcID, ArcJID) when is_integer(Size) ->
    CountResult = lookup_query(count, env_vars(HostType, ArcJID),
                               [{equal, user_id, ArcID}], unordered, all),
    mongoose_rdbms:selected_to_integer(CountResult).
253 |
|
|
254 |
|
%% Hook handler storing one message and then applying retraction, if the
%% message is a retraction request. Any `error'-class exception is logged
%% together with the parameters and re-raised with its original stacktrace.
-spec archive_message(_Result, host_type(), mod_mam:archive_message_params()) -> ok.
archive_message(_Result, HostType, Params = #{local_jid := ArcJID}) ->
    try
        assert_archive_id_provided(Params),
        Env = env_vars(HostType, ArcJID),
        do_archive_message(HostType, Params, Env),
        retract_message(HostType, Params, Env),
        ok
    catch
        error:Error:Stack ->
            ?LOG_ERROR(#{what => archive_message_failed,
                         host_type => HostType, mam_params => Params,
                         reason => Error, stacktrace => Stack}),
            erlang:raise(error, Error, Stack)
    end.
268 |
|
|
269 |
|
%% Encodes the message into a row and inserts it, asserting that exactly
%% one row was written.
do_archive_message(HostType, Params, Env) ->
    EncodedRow = mam_encoder:encode_message(Params, Env, db_mappings()),
    {updated, 1} =
        mongoose_rdbms:execute_successfully(HostType, insert_mam_message, EncodedRow).
272 |
|
|
273 |
|
%% Retraction logic |
274 |
|
%% Called after inserting a new message |
275 |
|
%% Exported entry point for retraction: builds the environment from the
%% `local_jid' of the parameters and delegates to retract_message/3.
-spec retract_message(host_type(), mod_mam:archive_message_params()) -> ok.
retract_message(HostType, #{local_jid := ArcJID} = Params) ->
    retract_message(HostType, Params, env_vars(HostType, ArcJID)).
279 |
|
|
280 |
|
%% Applies message retraction for an already archived packet.
%% When the packet carries no retraction id nothing happens. Otherwise the
%% message to retract is looked up and, if found, replaced by a tombstone.
%% Always returns `ok', as declared by the spec.
-spec retract_message(host_type(), mod_mam:archive_message_params(), env_vars()) -> ok.
retract_message(HostType, #{archive_id := ArcID, remote_jid := RemJID,
                            direction := Dir, packet := Packet} = Params, Env) ->
    case get_retract_id(Packet, Env) of
        none ->
            ok;
        RetractionId ->
            Info = get_retraction_info(HostType, ArcID, RemJID, RetractionId, Dir, Env),
            %% make_tombstone/6 yields the raw result of the UPDATE statement
            %% when a message was found; it is discarded on purpose so that
            %% the declared return value `ok' holds on every path.
            _ = make_tombstone(HostType, ArcID, RetractionId, Info, Params, Env),
            ok
    end.
289 |
|
|
290 |
|
%% Looks up the message targeted by a retraction and decodes the result
%% with mam_decoder:decode_retraction_info/3.
get_retraction_info(HostType, ArcID, RemJID, RetractionId, Dir, Env) ->
    %% Code style notice:
    %% - Add Ext prefix for all externally encoded data
    %% (in cases, when we usually add Bin, B, S Esc prefixes)
    BareRemJID = jid:to_bare(RemJID),
    ExtBareRemJID = mam_encoder:encode_jid(BareRemJID, Env),
    ExtDir = mam_encoder:encode_direction(Dir),
    {selected, Rows} =
        execute_select_messages_to_retract(HostType, ArcID, ExtBareRemJID,
                                           RetractionId, ExtDir),
    mam_decoder:decode_retraction_info(Env, Rows, RetractionId).
299 |
|
|
300 |
|
%% Replaces the retracted message with a tombstone.
%% - Retraction info `skip': the target was not found; only an info log
%%   entry is produced.
%% - Retraction info map: the mam_retraction hook is run, the tombstone is
%%   built and encoded, and the stored message is overwritten with it.
make_tombstone(_HostType, ArcID, RetractionId, skip, _Params, _Env) ->
    ?LOG_INFO(#{what => make_tombstone_failed,
                text => <<"Message to retract was not found">>,
                user_id => ArcID, retraction_context => RetractionId});
make_tombstone(HostType, ArcID, _RetractionId,
               RetractionInfo = #{message_id := MessID}, Params,
               #{archive_jid := ArcJID} = Env) ->
    UpdatedInfo = mongoose_hooks:mam_retraction(HostType, RetractionInfo, Params),
    TombstonePacket = mod_mam_utils:tombstone(UpdatedInfo, ArcJID),
    ExtTombstone = mam_encoder:encode_packet(TombstonePacket, Env),
    execute_make_tombstone(HostType, ExtTombstone, ArcID, MessID).
311 |
|
|
312 |
|
%% Runs the prepared select that finds the message to retract.
%% The statement is chosen by the kind of retraction id: origin id is passed
%% as is, a stanza id is first converted to an internal message id.
execute_select_messages_to_retract(HostType, ArcID, BareRemJID, {origin_id, OriginID}, Dir) ->
    Args = [ArcID, BareRemJID, OriginID, Dir],
    mongoose_rdbms:execute_successfully(
      HostType, mam_select_messages_to_retract_on_origin_id, Args);
execute_select_messages_to_retract(HostType, ArcID, BareRemJID, {stanza_id, BinStanzaId}, Dir) ->
    MessID = mod_mam_utils:external_binary_to_mess_id(BinStanzaId),
    Args = [ArcID, BareRemJID, MessID, Dir],
    mongoose_rdbms:execute_successfully(
      HostType, mam_select_messages_to_retract_on_stanza_id, Args).
319 |
|
|
320 |
|
%% Runs the prepared `mam_make_tombstone' update, which overwrites the
%% message body of the given archive/message id pair.
execute_make_tombstone(HostType, ExtTombstone, ArcID, MessID) ->
    Args = [ExtTombstone, ArcID, MessID],
    mongoose_rdbms:execute_successfully(HostType, mam_make_tombstone, Args).
323 |
|
|
324 |
|
%% Insert logic |
325 |
|
%% Encodes a message into the list of values for one insert row, using the
%% column mappings of this module. Exported for the async pool writer.
-spec prepare_message(host_type(), mod_mam:archive_message_params()) -> list().
prepare_message(HostType, Params = #{local_jid := ArcJID}) ->
    mam_encoder:encode_message(Params, env_vars(HostType, ArcJID), db_mappings()).
329 |
|
|
330 |
|
%% Registers a prepared bulk-insert statement into mam_message under the
%% given name, sized for `NumRows' rows per execution.
-spec prepare_insert(Name :: atom(), NumRows :: pos_integer()) -> ok.
prepare_insert(Name, NumRows) ->
    Columns = column_names(db_mappings()),
    {InsertSQL, BulkColumns} =
        rdbms_queries:create_bulk_insert_query(mam_message, Columns, NumRows),
    mongoose_rdbms:prepare(Name, mam_message, BulkColumns, InsertSQL),
    ok.
337 |
|
|
338 |
|
%% Removal logic |
339 |
|
%% Hook handler deleting every message of the archive with the given id.
%% The accumulator is passed through unchanged; the archive JID is unused.
-spec remove_archive(Acc :: mongoose_acc:t(), HostType :: host_type(),
                     ArcID :: mod_mam:archive_id(),
                     RoomJID :: jid:jid()) -> mongoose_acc:t().
remove_archive(Acc, HostType, ArcID, _ArcJID) ->
    DeleteArgs = [ArcID],
    mongoose_rdbms:execute_successfully(HostType, mam_archive_remove, DeleteArgs),
    Acc.
345 |
|
|
346 |
|
%% Hook handler removing all MAM data of a domain inside one SQL transaction:
%% messages first, then preferences, then the server/user mapping rows.
%% Crashes unless the transaction is reported as `atomic'.
-spec remove_domain(mongoose_hooks:simple_acc(), host_type(), jid:lserver()) ->
    mongoose_hooks:simple_acc().
remove_domain(Acc, HostType, Domain) ->
    RemoveAll =
        fun() ->
            mongoose_rdbms:execute_successfully(
              HostType, mam_remove_domain, [Domain]),
            mongoose_rdbms:execute_successfully(
              HostType, mam_remove_domain_prefs, [Domain]),
            mongoose_rdbms:execute_successfully(
              HostType, mam_remove_domain_users, [Domain])
        end,
    {atomic, _} = mongoose_rdbms:sql_transaction(HostType, RemoveAll),
    Acc.
355 |
|
|
356 |
|
%% GDPR logic |
357 |
|
%% Selects every message of the archive in ascending order, without a limit.
extract_gdpr_messages(Env, ArcID) ->
    lookup_query(lookup, Env, [{equal, user_id, ArcID}], asc, all).
360 |
|
|
361 |
|
%% Lookup logic |
362 |
|
%% Hook handler performing a message lookup.
%% An `{error, Reason}' accumulator from an earlier handler is passed through
%% unchanged. Otherwise the lookup parameters are extended, turned into a
%% filter based on lookup_fields/0 and handed to mam_lookup:lookup/3.
%% The spec includes the error tuple because the first clause returns it.
-spec lookup_messages(Result :: any(), HostType :: host_type(), Params :: map()) ->
    {ok, mod_mam:lookup_result()} | {error, term()}.
lookup_messages({error, _Reason} = Result, _HostType, _Params) ->
    Result;
lookup_messages(_Result, HostType, Params = #{owner_jid := ArcJID}) ->
    Env = env_vars(HostType, ArcJID),
    ExdParams = mam_encoder:extend_lookup_params(Params, Env),
    Filter = mam_filter:produce_filter(ExdParams, lookup_fields()),
    mam_lookup:lookup(Env, Filter, ExdParams).
371 |
|
|
372 |
|
%% Thin wrapper around mam_lookup_sql:lookup_query/5; referenced as the
%% `lookup_fn' entry of the environment map.
lookup_query(QueryType, Env, Filters, Order, OffsetLimit) ->
    mam_lookup_sql:lookup_query(QueryType, Env, Filters, Order, OffsetLimit).
374 |
|
|
375 |
|
%% Assertion: the parameters must contain an integer `archive_id'.
%% Any other input crashes with function_clause.
assert_archive_id_provided(#{archive_id := ArchiveID}) when is_integer(ArchiveID) ->
    ok.