./ct_report/coverage/mod_mam.COVER.html

1 %%==============================================================================
2 %% Copyright 2016 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_mam).
18 -behaviour(gen_mod).
19 -behaviour(mongoose_module_metrics).
20 -xep([{xep, 313}, {version, "1.1.0"}, {legacy_versions, ["0.5"]}]).
21 -xep([{xep, 441}, {version, "0.2.0"}]).
22 -xep([{xep, 424}, {version, "0.3.0"}]).
23
24 -include("mod_mam.hrl").
25 -include("mongoose_config_spec.hrl").
26
-type module_opts() :: gen_mod:module_opts().
-type module_map() :: gen_mod_deps:module_map().

%% ----------------------------------------------------------------------
%% Datetime types

%% Microseconds elapsed since 1970-01-01.
-type unix_timestamp() :: non_neg_integer().

%% ----------------------------------------------------------------------
%% Other types

-type archive_behaviour() :: roster | always | never.

-type message_id() :: non_neg_integer().
-type archive_id() :: non_neg_integer().

-type borders() :: #mam_borders{}.

%% One archived message, as carried in lookup results.
-type message_row() :: #{id := message_id(),
                         jid := jid:jid(),
                         packet := exml:element()}.

%% Tuple form of a lookup result.
-type lookup_result() :: {TotalCount :: non_neg_integer() | undefined,
                          Offset :: non_neg_integer() | undefined,
                          MessageRows :: [message_row()]}.

%% Map form of a lookup result; is_complete is the only optional key.
-type lookup_result_map() :: #{total_count := TotalCount :: non_neg_integer() | undefined,
                               offset := Offset :: non_neg_integer() | undefined,
                               messages := MessageRows :: [message_row()],
                               is_complete => boolean()}.

%% ----------------------------------------------------------------------
%% Internal types

-type iterator_fun() :: fun(() -> {'ok', {_, _}}).

-type rewriter_fun() :: fun((JID :: jid:literal_jid()) -> jid:literal_jid()).

-type restore_option() :: {rewrite_jids, rewriter_fun() | [{binary(), binary()}]}
                        | new_message_ids.

-type preference() :: {DefaultMode :: archive_behaviour(),
                       AlwaysJIDs :: [jid:literal_jid()],
                       NeverJIDs :: [jid:literal_jid()]}.

%% Parameters describing a message to be archived.
%% Keys written with `:=' are mandatory, keys written with `=>' are optional.
-type archive_message_params() :: #{message_id := message_id(),
                                    archive_id := archive_id(),
                                    local_jid := jid:jid(),
                                    remote_jid := jid:jid(),
                                    source_jid := jid:jid(),
                                    origin_id := binary() | none,
                                    direction := atom(),
                                    packet := exml:element(),
                                    %% Only in mod_mam_pm
                                    is_groupchat => boolean(),
                                    %% Only in mod_mam_muc_rdbms_arch:retract_message/2
                                    sender_id => archive_id()}.
77
78 -export_type([rewriter_fun/0,
79 borders/0,
80 preference/0,
81 archive_behaviour/0,
82 iterator_fun/0,
83 unix_timestamp/0,
84 archive_id/0,
85 lookup_result/0,
86 lookup_result_map/0,
87 message_row/0,
88 message_id/0,
89 restore_option/0,
90 archive_message_params/0,
91 mam_type/0
92 ]).
93
94 -export([start/2, stop/1, config_spec/0, supported_features/0, deps/2, config_metrics/1]).
95
96 %%--------------------------------------------------------------------
97 %% API
98 %%--------------------------------------------------------------------
99
%% Returns the list of features this module declares support for.
%% Currently that is only `dynamic_domains'.
-spec supported_features() -> [atom()].
supported_features() ->
    [dynamic_domains].
103
%% No-op start callback: both arguments are ignored and `ok' is returned.
-spec start(mongooseim:host_type(), module_opts()) -> any().
start(_, _) ->
    ok.
107
%% No-op stop callback: the host type is ignored and `ok' is returned.
-spec stop(mongooseim:host_type()) -> any().
stop(_) ->
    ok.
111
%% Builds the configuration section of the module.
%% The root section accepts the common items, the root-only items and the
%% two nested sections `pm' and `muc'.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    RootItems = maps:merge(common_config_items(), root_config_items()),
    NestedSections = #{<<"pm">> => pm_config_spec(),
                       <<"muc">> => muc_config_spec()},
    RootDefaults = #{<<"backend">> => rdbms,
                     <<"no_stanzaid_element">> => false,
                     <<"is_archivable_message">> => mod_mam_utils,
                     <<"send_message">> => mod_mam_utils,
                     <<"archive_chat_markers">> => false,
                     <<"message_retraction">> => true,
                     <<"full_text_search">> => true,
                     <<"cache_users">> => true,
                     <<"default_result_limit">> => 50,
                     <<"max_result_limit">> => 50,
                     <<"enforce_simple_queries">> => false},
    %% Nested sections take precedence over same-named root items.
    #section{items = maps:merge(RootItems, NestedSections),
             defaults = RootDefaults}.
130
%% Config section for the nested `pm' table: common items plus PM-only items.
pm_config_spec() ->
    PmItems = maps:merge(common_config_items(), pm_config_items()),
    PmDefaults = #{<<"archive_groupchats">> => false,
                   <<"same_mam_id_for_peers">> => false},
    #section{items = PmItems, defaults = PmDefaults}.
135
%% Config section for the nested `muc' table: common items plus MUC-only items.
%% The default host is taken from mod_muc:default_host/0.
muc_config_spec() ->
    MucItems = maps:merge(common_config_items(), muc_config_items()),
    MucDefaults = #{<<"host">> => mod_muc:default_host()},
    #section{items = MucItems, defaults = MucDefaults}.
139
%% Items accepted only in the root section. Both nested sections are marked
%% with `include = always'.
root_config_items() ->
    AlwaysIncluded = fun(Section) -> Section#section{include = always} end,
    #{<<"cache">> => AlwaysIncluded(mongoose_user_cache:config_spec()),
      <<"async_writer">> => AlwaysIncluded(async_config_spec())}.
145
%% Items accepted in the root section as well as in the `pm' and `muc' ones.
common_config_items() ->
    %% Option shapes shared by several keys below.
    Flag = #option{type = boolean},
    ModuleName = #option{type = atom, validate = module},
    ResultLimit = #option{type = integer, validate = non_negative},
    #{%% General options
      <<"backend">> =>
          #option{type = atom,
                  validate = {enum, [rdbms, cassandra, elasticsearch]}},
      <<"no_stanzaid_element">> => Flag,
      <<"is_archivable_message">> => ModuleName,
      <<"send_message">> => ModuleName,
      <<"archive_chat_markers">> => Flag,
      <<"message_retraction">> => Flag,

      %% Common backend options
      <<"user_prefs_store">> =>
          #option{type = atom,
                  validate = {enum, [rdbms, cassandra, mnesia]}},
      <<"full_text_search">> => Flag,

      %% RDBMS-specific options
      <<"cache_users">> => Flag,

      %% Low-level options
      <<"default_result_limit">> => ResultLimit,
      <<"max_result_limit">> => ResultLimit,
      <<"enforce_simple_queries">> => Flag,
      <<"delete_domain_limit">> =>
          #option{type = int_or_infinity,
                  validate = positive},
      <<"db_jid_format">> => ModuleName,
      <<"db_message_format">> => ModuleName,
      <<"extra_fin_element">> => ModuleName,
      <<"extra_lookup_params">> => ModuleName
     }.
183
%% Items accepted only in the `pm' section.
pm_config_items() ->
    Flag = #option{type = boolean},
    #{<<"async_writer">> => async_config_spec(),
      <<"archive_groupchats">> => Flag,
      <<"same_mam_id_for_peers">> => Flag}.
188
%% Items accepted only in the `muc' section.
%% The `host' string is validated as a subdomain template and then processed
%% by mongoose_subdomain_utils:make_subdomain_pattern/1.
muc_config_items() ->
    HostOption = #option{type = string,
                         validate = subdomain_template,
                         process = fun mongoose_subdomain_utils:make_subdomain_pattern/1},
    #{<<"async_writer">> => async_config_spec(),
      <<"host">> => HostOption}.
194
%% Config section of the asynchronous writer.
%% The default pool size is four times the number of online schedulers.
async_config_spec() ->
    NonNegInt = #option{type = integer, validate = non_negative},
    DefaultPoolSize = 4 * erlang:system_info(schedulers_online),
    #section{
       items = #{<<"enabled">> => #option{type = boolean},
                 <<"flush_interval">> => NonNegInt,
                 <<"batch_size">> => NonNegInt,
                 <<"pool_size">> => NonNegInt},
       defaults = #{<<"enabled">> => true,
                    <<"flush_interval">> => 2000,
                    <<"batch_size">> => 30,
                    <<"pool_size">> => DefaultPoolSize}
      }.
206
%% Computes the modules this module depends on, together with their options.
%% The `pm' section is processed first, then `muc'; every resulting dependency
%% is declared as `hard'.
-spec deps(mongooseim:host_type(), module_opts()) -> gen_mod_deps:deps().
deps(_HostType, Opts) ->
    CollectSection = fun(Type, Acc) -> handle_nested_opts(Type, Opts, Acc) end,
    DepMap = lists:foldl(CollectSection, #{}, [pm, muc]),
    lists:map(fun({Mod, ModOpts}) -> {Mod, ModOpts, hard} end,
              maps:to_list(DepMap)).
213
214 %%--------------------------------------------------------------------
215 %% Helpers
216 %%--------------------------------------------------------------------
217
%% Kind of archive: private messages or multi-user chat.
-type mam_type() :: pm | muc.
%% Supported archive backends.
-type mam_backend() :: rdbms | cassandra | elasticsearch.
220
%% Adds the dependencies for one nested section (`pm' or `muc').
%% When the section is absent from the root options, Deps is returned as is.
%% Otherwise the section options are laid over the root options (with the
%% `pm' and `muc' keys removed) and the result is passed to parse_opts/3.
-spec handle_nested_opts(mam_type(), module_opts(), module_map()) -> module_map().
handle_nested_opts(Key, RootOpts, Deps) ->
    case maps:is_key(Key, RootOpts) of
        false ->
            Deps;
        true ->
            SectionOpts = maps:get(Key, RootOpts),
            Inherited = maps:without([pm, muc], RootOpts),
            parse_opts(Key, maps:merge(Inherited, SectionOpts), Deps)
    end.
229
%% Translates the options of one section into module dependencies.
%% Opts are the root options merged with the options of the pm or muc section.
%% Adds, in order: the core module, the preferences store module (if any)
%% and the backend-specific modules.
-spec parse_opts(mam_type(), module_opts(), module_map()) -> module_map().
parse_opts(Type, Opts, Deps0) ->
    CoreMod = mam_type_to_core_mod(Type),
    Deps1 = add_dep(CoreMod, maps:with(valid_core_mod_opts(CoreMod), Opts), Deps0),
    %% The backend name is consumed here; the remaining options go to the backend.
    {Backend, BackendOpts} = maps:take(backend, Opts),
    Deps2 = add_prefs_store_module(Backend, Type, Opts, Deps1),
    parse_backend_opts(Backend, Type, BackendOpts, Deps2).
239
%% Maps the archive type to its core module.
-spec mam_type_to_core_mod(mam_type()) -> module().
mam_type_to_core_mod(pm) ->
    mod_mam_pm;
mam_type_to_core_mod(muc) ->
    mod_mam_muc.
243
%% Lists the option keys that are passed on to each of the two core modules.
%% The options may come from the root section; they need not be repeated
%% in the pm or muc section.
-spec valid_core_mod_opts(module()) -> [atom()].
valid_core_mod_opts(mod_mam_pm) ->
    [archive_groupchats, same_mam_id_for_peers | common_opts()];
valid_core_mod_opts(mod_mam_muc) ->
    [host | common_opts()].
251
%% Option keys passed to both core modules.
common_opts() ->
    [
     async_writer,
     is_archivable_message,
     send_message,
     archive_chat_markers,
     extra_fin_element,
     extra_lookup_params,
     full_text_search,
     message_retraction,
     default_result_limit,
     max_result_limit,
     enforce_simple_queries,
     no_stanzaid_element
    ].
265
%% Adds the user preferences module when `user_prefs_store' is configured.
%% The module is enabled for the given archive type (`#{Type => true}').
%% Without that option Deps is returned unchanged.
-spec add_prefs_store_module(mam_backend(), mam_type(), module_opts(), module_map()) -> module_map().
add_prefs_store_module(Backend, Type, Opts, Deps) ->
    case Opts of
        #{user_prefs_store := Store} ->
            add_dep(prefs_module(Backend, Store), #{Type => true}, Deps);
        _ ->
            Deps
    end.
272
%% Adds the archive modules required by the selected backend.
%%  - cassandra: the Cassandra archive module, with `db_message_format'
%%    taken from Opts or from arch_defaults/0;
%%  - rdbms: the basic, user cache and async writer groups, in that order;
%%  - elasticsearch: the Elasticsearch archive module with no options.
-spec parse_backend_opts(mam_backend(), mam_type(), module_opts(), module_map()) -> module_map().
parse_backend_opts(cassandra, Type, Opts, Deps) ->
    FormatOpts = maps:with([db_message_format], Opts),
    ArchOpts = maps:merge(arch_defaults(), FormatOpts),
    add_dep(cassandra_arch_module(Type), ArchOpts, Deps);
parse_backend_opts(rdbms, Type, Opts, Deps) ->
    WithBasic = add_rdbms_deps(basic, Type, Opts, Deps),
    WithUserCache = add_rdbms_deps(user_cache, Type, Opts, WithBasic),
    add_rdbms_deps(async_writer, Type, Opts, WithUserCache);
parse_backend_opts(elasticsearch, Type, _Opts, Deps) ->
    add_dep(elasticsearch_arch_module(Type), Deps).
282
%% Adds the RDBMS dependencies for one option group.
%%  - basic: the archive module (defaults overridden by the format and
%%    domain-deletion options) and mod_mam_rdbms_user;
%%  - user_cache: only when `cache_users' is true; adds mod_mam_cache_user
%%    and, for the `mod_cache_users' cache module, mod_cache_users as well;
%%  - async_writer: only when the async writer is enabled; marks the archive
%%    module with `no_writer' and adds the asynchronous archive module.
%% Any other combination leaves Deps unchanged.
-spec add_rdbms_deps(basic | user_cache | async_writer,
                     mam_type(), module_opts(), module_map()) -> module_map().
add_rdbms_deps(basic, Type, Opts, Deps) ->
    Overrides = maps:with([db_message_format, db_jid_format, delete_domain_limit], Opts),
    ArchOpts = maps:merge(rdbms_arch_defaults(Type), Overrides),
    WithArch = add_dep(rdbms_arch_module(Type), ArchOpts, Deps),
    add_dep(mod_mam_rdbms_user, user_db_types(Type), WithArch);
add_rdbms_deps(user_cache, Type, #{cache_users := true, cache := CacheOpts}, Deps) ->
    CacheModule = gen_mod:get_opt(module, CacheOpts, internal),
    WithCacheBackend =
        case CacheModule of
            internal -> Deps;
            mod_cache_users -> add_dep(mod_cache_users, Deps)
        end,
    add_dep(mod_mam_cache_user, CacheOpts#{Type => true}, WithCacheBackend);
add_rdbms_deps(async_writer, Type, #{async_writer := #{enabled := true} = AsyncOpts}, Deps) ->
    WithoutSyncWriter = add_dep(rdbms_arch_module(Type), #{no_writer => true}, Deps),
    add_dep(rdbms_async_arch_module(Type), AsyncOpts, WithoutSyncWriter);
add_rdbms_deps(_Group, _Type, _Opts, Deps) ->
    Deps.
300
%% Options for mod_mam_rdbms_user, keyed by archive type.
%% The muc backend requires both the pm and the muc user DB
%% to populate the sender_id column.
-spec user_db_types(mam_type()) -> module_opts().
user_db_types(pm) ->
    #{pm => true};
user_db_types(muc) ->
    #{pm => true, muc => true}.
305
%% Cassandra archive module for the given archive type.
cassandra_arch_module(pm) ->
    mod_mam_cassandra_arch;
cassandra_arch_module(muc) ->
    mod_mam_muc_cassandra_arch.
308
%% Default options of the Cassandra archive modules.
arch_defaults() ->
    #{db_message_format => mam_message_xml}.
310
%% Default options of the RDBMS archive module for the given archive type:
%% the shared defaults plus a type-specific JID format.
rdbms_arch_defaults(pm) ->
    (rdbms_arch_defaults())#{db_jid_format => mam_jid_mini};
rdbms_arch_defaults(muc) ->
    (rdbms_arch_defaults())#{db_jid_format => mam_jid_rfc}.
315
%% Defaults shared by the pm and muc RDBMS archive modules.
rdbms_arch_defaults() ->
    #{db_message_format => mam_message_compressed_eterm,
      no_writer => false,
      delete_domain_limit => infinity}.
319
%% RDBMS archive module for the given archive type.
rdbms_arch_module(pm) ->
    mod_mam_rdbms_arch;
rdbms_arch_module(muc) ->
    mod_mam_muc_rdbms_arch.
322
%% Asynchronous RDBMS archive module for the given archive type.
rdbms_async_arch_module(pm) ->
    mod_mam_rdbms_arch_async;
rdbms_async_arch_module(muc) ->
    mod_mam_muc_rdbms_arch_async.
325
%% Elasticsearch archive module for the given archive type.
elasticsearch_arch_module(pm) ->
    mod_mam_elasticsearch_arch;
elasticsearch_arch_module(muc) ->
    mod_mam_muc_elasticsearch_arch.
328
%% Selects the user preferences module for a backend / store pair.
%% The rdbms and cassandra stores are accepted only with the matching backend;
%% the mnesia store is accepted with any backend. Every other pair raises
%% an error describing the invalid combination.
prefs_module(rdbms, rdbms) ->
    mod_mam_rdbms_prefs;
prefs_module(cassandra, cassandra) ->
    mod_mam_cassandra_prefs;
prefs_module(_AnyBackend, mnesia) ->
    mod_mam_mnesia_prefs;
prefs_module(Backend, PrefsStore) ->
    Reason = #{what => invalid_mam_user_prefs_store,
               backend => Backend,
               user_prefs_store => PrefsStore},
    error(Reason).
336
%% Registers a dependency that contributes no options of its own.
%% Options already recorded for Dep are kept.
-spec add_dep(module(), module_map()) -> module_map().
add_dep(Dep, Deps) ->
    NoOpts = #{},
    add_dep(Dep, NoOpts, Deps).
340
%% Registers a dependency with options.
%% Opts are merged over any options already recorded for Dep, so on a key
%% clash the new value wins.
-spec add_dep(module(), module_opts(), module_map()) -> module_map().
add_dep(Dep, Opts, Deps) ->
    Deps#{Dep => maps:merge(maps:get(Dep, Deps, #{}), Opts)}.
346
%% Reports the configured `backend' option of this module for the given
%% host type, as returned by mongoose_module_metrics:opts_for_module/3.
%% NOTE(review): the return type below is assumed from the gen_mod option
%% types; confirm against mongoose_module_metrics.
-spec config_metrics(mongooseim:host_type()) -> [{gen_mod:opt_key(), gen_mod:opt_value()}].
config_metrics(HostType) ->
    mongoose_module_metrics:opts_for_module(HostType, ?MODULE, [backend]).
Line Hits Source