./ct_report/coverage/mod_mam.COVER.html

1 %%==============================================================================
2 %% Copyright 2016 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_mam).
18 -behaviour(gen_mod).
19 -behaviour(mongoose_module_metrics).
20 -xep([{xep, 313}, {version, "0.6"}, {legacy_versions, ["0.5"]}]).
21 -xep([{xep, 424}, {version, "0.3.0"}]).
22
23 -include("mod_mam.hrl").
24 -include("mongoose_config_spec.hrl").
25
26 -type module_opts() :: gen_mod:module_opts().
27 -type module_map() :: gen_mod_deps:module_map().
28
29 %% ----------------------------------------------------------------------
30 %% Datetime types
31 %% Microseconds from 01.01.1970
32 -type unix_timestamp() :: non_neg_integer().
33
34 %% ----------------------------------------------------------------------
35 %% Other types
36 -type archive_behaviour() :: roster | always | never.
37 -type message_id() :: non_neg_integer().
38
39 -type archive_id() :: non_neg_integer().
40
41 -type borders() :: #mam_borders{}.
42
43 -type message_row() :: #{id := message_id(), jid := jid:jid(), packet := exml:element()}.
44 -type lookup_result() :: {TotalCount :: non_neg_integer() | undefined,
45 Offset :: non_neg_integer() | undefined,
46 MessageRows :: [message_row()]}.
47
48 -type lookup_result_map() :: #{total_count := TotalCount :: non_neg_integer() | undefined,
49 offset := Offset :: non_neg_integer() | undefined,
50 messages := MessageRows :: [message_row()],
51 is_complete => boolean()}.
52
53 %% Internal types
54 -type iterator_fun() :: fun(() -> {'ok', {_, _}}).
55 -type rewriter_fun() :: fun((JID :: jid:literal_jid())
56 -> jid:literal_jid()).
57 -type restore_option() :: {rewrite_jids, rewriter_fun() | [{binary(), binary()}]}
58 | new_message_ids.
59
60 -type preference() :: {DefaultMode :: archive_behaviour(),
61 AlwaysJIDs :: [jid:literal_jid()],
62 NeverJIDs :: [jid:literal_jid()]}.
63
64 -type archive_message_params() :: #{message_id := message_id(),
65 archive_id := archive_id(),
66 local_jid := jid:jid(),
67 remote_jid := jid:jid(),
68 source_jid := jid:jid(),
69 origin_id := binary() | none,
70 direction := atom(),
71 packet := exml:element(),
72 %% Only in mod_mam_muc_rdbms_arch:retract_message/2
73 sender_id => archive_id()}.
74
75 -export_type([rewriter_fun/0,
76 borders/0,
77 preference/0,
78 archive_behaviour/0,
79 iterator_fun/0,
80 unix_timestamp/0,
81 archive_id/0,
82 lookup_result/0,
83 lookup_result_map/0,
84 message_row/0,
85 message_id/0,
86 restore_option/0,
87 archive_message_params/0,
88 mam_type/0
89 ]).
90
91 -export([start/2, stop/1, config_spec/0, supported_features/0, deps/2, config_metrics/1]).
92
93 %%--------------------------------------------------------------------
94 %% API
95 %%--------------------------------------------------------------------
96
%% Returns the list of features this module declares as supported.
%% The list contains a single entry, `dynamic_domains`.
97 -spec supported_features() -> [atom()].
98 supported_features() ->
99 192 [dynamic_domains].
100
%% Start callback. Both the host type and the options are ignored and `ok`
%% is returned unconditionally.
%% NOTE(review): the actual work is presumably done by the modules returned
%% from deps/2 - confirm against gen_mod.
101 -spec start(mongooseim:host_type(), module_opts()) -> any().
102 start(_HostType, _Opts) ->
103 190 ok.
104
%% Stop callback. The host type is ignored and `ok` is returned
%% unconditionally; there is nothing to tear down in this module itself.
105 -spec stop(mongooseim:host_type()) -> any().
106 stop(_HostType) ->
107 190 ok.
108
%% Builds the top-level configuration section of the module.
%% Items are the union of common_config_items/0 and root_config_items/0,
%% extended with the nested <<"pm">> and <<"muc">> sections.
%% The defaults map below is set at this (root) level only; note that both
%% <<"default_result_limit">> and <<"max_result_limit">> default to 50.
109 -spec config_spec() -> mongoose_config_spec:config_section().
110 config_spec() ->
111 186 Items = maps:merge(common_config_items(), root_config_items()),
112 186 #section{
113 items = Items#{<<"pm">> => pm_config_spec(),
114 <<"muc">> => muc_config_spec()},
115 defaults = #{<<"backend">> => rdbms,
116 <<"no_stanzaid_element">> => false,
117 <<"is_archivable_message">> => mod_mam_utils,
118 <<"send_message">> => mod_mam_utils,
119 <<"archive_chat_markers">> => false,
120 <<"message_retraction">> => true,
121 <<"full_text_search">> => true,
122 <<"cache_users">> => true,
123 <<"default_result_limit">> => 50,
124 <<"max_result_limit">> => 50,
125 <<"enforce_simple_queries">> => false}
126 }.
127
%% Configuration section for the nested <<"pm">> key.
%% Items are the common items merged with pm_config_items/0; the two
%% pm-only boolean options default to false.
128 pm_config_spec() ->
129 186 #section{items = maps:merge(common_config_items(), pm_config_items()),
130 defaults = #{<<"archive_groupchats">> => false,
131 <<"same_mam_id_for_peers">> => false}}.
132
%% Configuration section for the nested <<"muc">> key.
%% Items are the common items merged with muc_config_items/0; <<"host">>
%% defaults to the value returned by mod_muc:default_host/0.
133 muc_config_spec() ->
134 186 #section{items = maps:merge(common_config_items(), muc_config_items()),
135 defaults = #{<<"host">> => mod_muc:default_host()}}.
136
%% Items that are specific to the root section: <<"cache">> and
%% <<"async_writer">>. Both nested sections are taken from their own specs
%% and have their `include` field overridden to `always`.
%% NOTE(review): `include = always` presumably forces the section (with its
%% defaults) to be present even when not configured - confirm in
%% mongoose_config_spec.
137 root_config_items() ->
138 186 Cache = mongoose_user_cache:config_spec(),
139 186 AsyncWriter = async_config_spec(),
140 186 #{<<"cache">> => Cache#section{include = always},
141 <<"async_writer">> => AsyncWriter#section{include = always}}.
142
%% Option specs shared by the root, <<"pm">> and <<"muc">> sections
%% (this map is merged into all three by the *_config_spec functions).
%% No defaults are declared here; defaults are set by the enclosing sections.
143 common_config_items() ->
144 558 #{%% General options
145 <<"backend">> => #option{type = atom,
146 validate = {enum, [rdbms, cassandra, elasticsearch]}},
147 <<"no_stanzaid_element">> => #option{type = boolean},
148 <<"is_archivable_message">> => #option{type = atom,
149 validate = module},
150 <<"send_message">> => #option{type = atom,
151 validate = module},
152 <<"archive_chat_markers">> => #option{type = boolean},
153 <<"message_retraction">> => #option{type = boolean},
154
155 %% Common backend options
156 <<"user_prefs_store">> => #option{type = atom,
157 validate = {enum, [rdbms, cassandra, mnesia]}},
158 <<"full_text_search">> => #option{type = boolean},
159
160 %% RDBMS-specific options
161 <<"cache_users">> => #option{type = boolean},
162
163 %% Low-level options
164 <<"default_result_limit">> => #option{type = integer,
165 validate = non_negative},
166 <<"enforce_simple_queries">> => #option{type = boolean},
167 <<"delete_domain_limit">> => #option{type = int_or_infinity,
168 validate = positive},
169 <<"max_result_limit">> => #option{type = integer,
170 validate = non_negative},
171 <<"db_jid_format">> => #option{type = atom,
172 validate = module},
173 <<"db_message_format">> => #option{type = atom,
174 validate = module},
175 <<"extra_fin_element">> => #option{type = atom,
176 validate = module},
177 <<"extra_lookup_params">> => #option{type = atom,
178 validate = module}
179 }.
180
%% Option specs available only inside the <<"pm">> section: its own
%% <<"async_writer">> subsection plus two pm-specific boolean flags.
%% Unlike root_config_items/0, the async writer section is used as returned
%% by async_config_spec/0, without overriding `include`.
181 pm_config_items() ->
182 186 #{<<"async_writer">> => async_config_spec(),
183 <<"archive_groupchats">> => #option{type = boolean},
184 <<"same_mam_id_for_peers">> => #option{type = boolean}}.
185
%% Option specs available only inside the <<"muc">> section: its own
%% <<"async_writer">> subsection and the <<"host">> option.
%% <<"host">> is a string validated as a subdomain template and then passed
%% through mongoose_subdomain_utils:make_subdomain_pattern/1.
186 muc_config_items() ->
187 186 #{<<"async_writer">> => async_config_spec(),
188 <<"host">> => #option{type = string,
189 validate = subdomain_template,
190 process = fun mongoose_subdomain_utils:make_subdomain_pattern/1}}.
191
%% Configuration section describing the asynchronous writer.
%% All numeric options must be non-negative integers. Defaults: enabled,
%% flush_interval 2000, batch_size 30, and a pool_size of four times the
%% number of online schedulers, evaluated when this function is called.
%% NOTE(review): flush_interval is presumably in milliseconds - confirm in
%% the async writer implementation.
192 async_config_spec() ->
193 558 #section{
194 items = #{<<"enabled">> => #option{type = boolean},
195 <<"flush_interval">> => #option{type = integer, validate = non_negative},
196 <<"batch_size">> => #option{type = integer, validate = non_negative},
197 <<"pool_size">> => #option{type = integer, validate = non_negative}},
198 defaults = #{<<"enabled">> => true,
199 <<"flush_interval">> => 2000,
200 <<"batch_size">> => 30,
201 <<"pool_size">> => 4 * erlang:system_info(schedulers_online)}
202 }.
203
%% Computes the modules this module depends on, based on its options.
%% Dependencies are accumulated in a map (module => options), first for the
%% `pm` subsection and then for `muc`, so options for a module required by
%% both are merged (see add_dep/3). Every resulting dependency is returned
%% as a `hard` one. The host type is not used.
204 -spec deps(mongooseim:host_type(), module_opts()) -> gen_mod_deps:deps().
205 deps(_HostType, Opts) ->
206 1176 DepsWithPm = handle_nested_opts(pm, Opts, #{}),
207 1176 DepsWithPmAndMuc = handle_nested_opts(muc, Opts, DepsWithPm),
208
209 1176 [{DepMod, DepOpts, hard} || {DepMod, DepOpts} <- maps:to_list(DepsWithPmAndMuc)].
210
211 %%--------------------------------------------------------------------
212 %% Helpers
213 %%--------------------------------------------------------------------
214
215 -type mam_type() :: pm | muc.
216 -type mam_backend() :: rdbms | cassandra | elasticsearch.
217
%% Adds the dependencies required by one nested section (`pm` or `muc`).
%% When the section key is absent from the root options, Deps is returned
%% unchanged. Otherwise the root options (minus the `pm` and `muc` keys) are
%% merged with the section's own options; since the section map is the second
%% argument of maps:merge/2, its values take precedence over root values.
218 -spec handle_nested_opts(mam_type(), module_opts(), module_map()) -> module_map().
219 handle_nested_opts(Key, RootOpts, Deps) ->
220 2352 case maps:find(Key, RootOpts) of
221 813 error -> Deps;
222 {ok, Opts} ->
223 1539 FullOpts = maps:merge(maps:without([pm, muc], RootOpts), Opts),
224 1539 parse_opts(Key, FullOpts, Deps)
225 end.
226
%% Translates the effective options of one MAM type into dependencies:
%% the core module (with only the options it accepts), an optional user
%% preferences module, and the backend-specific modules.
%% The `backend` key is mandatory here: maps:take/2 returns `error` when it is
%% missing, which makes the tuple match below fail with badmatch.
%% Note that the prefs store lookup receives the full Opts, while the backend
%% parser receives Opts with the `backend` key removed.
227 -spec parse_opts(mam_type(), module_opts(), module_map()) -> module_map().
228 parse_opts(Type, Opts, Deps) ->
229 %% Opts are merged root options with options inside pm or muc section
230 1539 CoreMod = mam_type_to_core_mod(Type),
231 1539 CoreModOpts = maps:with(valid_core_mod_opts(CoreMod), Opts),
232 1539 WithCoreDeps = add_dep(CoreMod, CoreModOpts, Deps),
233 1539 {Backend, BackendOpts} = maps:take(backend, Opts),
234 1539 WithPrefs = add_prefs_store_module(Backend, Type, Opts, WithCoreDeps),
235 1539 parse_backend_opts(Backend, Type, BackendOpts, WithPrefs).
236
%% Maps a MAM type to its core module: `pm` -> mod_mam_pm,
%% `muc` -> mod_mam_muc. Any other argument fails with function_clause.
237 -spec mam_type_to_core_mod(mam_type()) -> module().
238 905 mam_type_to_core_mod(pm) -> mod_mam_pm;
239 634 mam_type_to_core_mod(muc) -> mod_mam_muc.
240
241 %% Get a list of options to pass into the two modules.
242 %% They don't have to be defined in pm or muc sections, the root section is enough.
%% Each core module accepts its own specific keys followed by common_opts/0;
%% the result is used as the key filter for maps:with/2 in parse_opts/3.
243 -spec valid_core_mod_opts(module()) -> [atom()].
244 valid_core_mod_opts(mod_mam_pm) ->
245 905 [archive_groupchats, same_mam_id_for_peers] ++ common_opts();
246 valid_core_mod_opts(mod_mam_muc) ->
247 634 [host] ++ common_opts().
248
%% Option keys accepted by both core modules (mod_mam_pm and mod_mam_muc).
%% Backend-related keys such as `backend`, `cache_users` or `db_jid_format`
%% are deliberately not listed, so they are not forwarded to the core modules.
249 common_opts() ->
250 1539 [async_writer,
251 is_archivable_message,
252 send_message,
253 archive_chat_markers,
254 extra_fin_element,
255 extra_lookup_params,
256 full_text_search,
257 message_retraction,
258 default_result_limit,
259 max_result_limit,
260 enforce_simple_queries,
261 no_stanzaid_element].
262
%% Adds the user preferences module when `user_prefs_store` is configured.
%% The module is chosen by prefs_module/2 from the backend and the store, and
%% is given the option `Type => true` (i.e. `pm => true` or `muc => true`).
%% Without a `user_prefs_store` key the dependency map is returned unchanged.
263 -spec add_prefs_store_module(mam_backend(), mam_type(), module_opts(), module_map()) -> module_map().
264 add_prefs_store_module(Backend, Type, #{user_prefs_store := Store}, Deps) ->
265 1148 PrefsModule = prefs_module(Backend, Store),
266 1148 add_dep(PrefsModule, #{Type => true}, Deps);
267 add_prefs_store_module(_Backend, _Type, _Opts, Deps) ->
268 391 Deps.
269
%% Adds the archive modules required by the selected backend.
%%  - cassandra: the cassandra archive module for Type, with
%%    `db_message_format` taken from Opts or from arch_defaults/0.
%%  - rdbms: folds add_rdbms_deps/4 over the option groups in the order
%%    basic, user_cache, async_writer; later groups can override options set
%%    by earlier ones because add_dep/3 merges per-module options.
%%  - elasticsearch: the elasticsearch archive module for Type with empty
%%    options; Opts are ignored.
270 -spec parse_backend_opts(mam_backend(), mam_type(), module_opts(), module_map()) -> module_map().
271 parse_backend_opts(cassandra, Type, Opts, Deps) ->
272
:-(
Opts1 = maps:with([db_message_format], Opts),
273
:-(
add_dep(cassandra_arch_module(Type), maps:merge(arch_defaults(), Opts1), Deps);
274 parse_backend_opts(rdbms, Type, Opts, Deps) ->
275 1539 lists:foldl(fun(OptionGroup, DepsIn) -> add_rdbms_deps(OptionGroup, Type, Opts, DepsIn) end,
276 Deps, [basic, user_cache, async_writer]);
277 parse_backend_opts(elasticsearch, Type, _Opts, Deps0) ->
278
:-(
add_dep(elasticsearch_arch_module(Type), Deps0).
279
%% Adds the dependencies for one RDBMS option group.
%%  - basic: the RDBMS archive module for Type (type-specific defaults
%%    overridden by db_message_format, db_jid_format and delete_domain_limit
%%    from Opts) and mod_mam_rdbms_user with the flags from user_db_types/1.
%%  - user_cache: applies only when `cache_users` is true and a `cache` map is
%%    present. If the cache `module` option is mod_cache_users, that module is
%%    added as well; `internal` (also the default) adds nothing extra. Any
%%    other value fails with case_clause. mod_mam_cache_user always gets the
%%    cache options plus `Type => true`.
%%  - async_writer: applies only when the async writer is enabled. It sets
%%    `no_writer => true` on the RDBMS archive module (overriding the `false`
%%    default added by the basic group) and adds the async archive module.
%%  - anything else, including the groups above when their pattern does not
%%    match: Deps is returned unchanged.
280 -spec add_rdbms_deps(basic | user_cache | async_writer,
281 mam_type(), module_opts(), module_map()) -> module_map().
282 add_rdbms_deps(basic, Type, Opts, Deps) ->
283 1539 Opts1 = maps:with([db_message_format, db_jid_format, delete_domain_limit], Opts),
284 1539 Deps1 = add_dep(rdbms_arch_module(Type), maps:merge(rdbms_arch_defaults(Type), Opts1), Deps),
285 1539 add_dep(mod_mam_rdbms_user, user_db_types(Type), Deps1);
286 add_rdbms_deps(user_cache, Type, #{cache_users := true, cache := CacheOpts}, Deps) ->
287 883 Deps1 = case gen_mod:get_opt(module, CacheOpts, internal) of
288 883 internal -> Deps;
289
:-(
mod_cache_users -> add_dep(mod_cache_users, Deps)
290 end,
291 883 add_dep(mod_mam_cache_user, CacheOpts#{Type => true}, Deps1);
292 add_rdbms_deps(async_writer, Type, #{async_writer := AsyncOpts = #{enabled := true}}, Deps) ->
293 563 Deps1 = add_dep(rdbms_arch_module(Type), #{no_writer => true}, Deps),
294 563 add_dep(rdbms_async_arch_module(Type), AsyncOpts, Deps1);
295 add_rdbms_deps(_, _Type, _Opts, Deps) ->
296 1632 Deps.
297
298 % muc backend requires both pm and muc user DB to populate sender_id column
%% Returns the type flags passed to mod_mam_rdbms_user: `pm` enables only
%% `pm`, while `muc` enables both `pm` and `muc`.
299 -spec user_db_types(mam_type()) -> module_opts().
300 905 user_db_types(pm) -> #{pm => true};
301 634 user_db_types(muc) -> #{pm => true, muc => true}.
302
%% Cassandra archive module for a MAM type: `pm` -> mod_mam_cassandra_arch,
%% `muc` -> mod_mam_muc_cassandra_arch.
303
:-(
cassandra_arch_module(pm) -> mod_mam_cassandra_arch;
304
:-(
cassandra_arch_module(muc) -> mod_mam_muc_cassandra_arch.
305
%% Default options for the cassandra archive modules (the only caller is the
%% cassandra clause of parse_backend_opts/4): messages use mam_message_xml.
306
:-(
arch_defaults() -> #{db_message_format => mam_message_xml}.
307
%% Type-specific defaults for the RDBMS archive modules: the shared defaults
%% from rdbms_arch_defaults/0 plus a `db_jid_format` of mam_jid_mini for `pm`
%% and mam_jid_rfc for `muc`.
308 rdbms_arch_defaults(pm) ->
309 905 maps:merge(rdbms_arch_defaults(), #{db_jid_format => mam_jid_mini});
310 rdbms_arch_defaults(muc) ->
311 634 maps:merge(rdbms_arch_defaults(), #{db_jid_format => mam_jid_rfc}).
312
%% Defaults shared by both RDBMS archive modules: compressed eterm message
%% format, the synchronous writer left enabled (`no_writer => false`), and no
%% limit for `delete_domain_limit`.
313 rdbms_arch_defaults() ->
314 1539 #{db_message_format => mam_message_compressed_eterm,
315 no_writer => false, delete_domain_limit => infinity}.
316
%% RDBMS archive module for a MAM type: `pm` -> mod_mam_rdbms_arch,
%% `muc` -> mod_mam_muc_rdbms_arch.
317 1246 rdbms_arch_module(pm) -> mod_mam_rdbms_arch;
318 856 rdbms_arch_module(muc) -> mod_mam_muc_rdbms_arch.
319
%% Asynchronous RDBMS archive module for a MAM type:
%% `pm` -> mod_mam_rdbms_arch_async, `muc` -> mod_mam_muc_rdbms_arch_async.
320 341 rdbms_async_arch_module(pm) -> mod_mam_rdbms_arch_async;
321 222 rdbms_async_arch_module(muc) -> mod_mam_muc_rdbms_arch_async.
322
%% Elasticsearch archive module for a MAM type:
%% `pm` -> mod_mam_elasticsearch_arch, `muc` -> mod_mam_muc_elasticsearch_arch.
323
:-(
elasticsearch_arch_module(pm) -> mod_mam_elasticsearch_arch;
324
:-(
elasticsearch_arch_module(muc) -> mod_mam_muc_elasticsearch_arch.
325
%% Selects the user preferences module for a backend / prefs store pair.
%% The rdbms and cassandra stores are accepted only together with the backend
%% of the same name, whereas the mnesia store is accepted with any backend.
%% Every other combination (e.g. backend elasticsearch with store rdbms)
%% raises an error carrying a map that describes the invalid pair.
326 820 prefs_module(rdbms, rdbms) -> mod_mam_rdbms_prefs;
327
:-(
prefs_module(cassandra, cassandra) -> mod_mam_cassandra_prefs;
328 328 prefs_module(_, mnesia) -> mod_mam_mnesia_prefs;
329 prefs_module(Backend, PrefsStore) ->
330
:-(
error(#{what => invalid_mam_user_prefs_store,
331 backend => Backend,
332 user_prefs_store => PrefsStore}).
333
%% Registers a dependency without contributing any options of its own.
%% Delegates to add_dep/3 with an empty option map, so options already
%% recorded for Dep are kept as they are.
334 -spec add_dep(module(), module_map()) -> module_map().
335 add_dep(Dep, Deps) ->
336
:-(
add_dep(Dep, #{}, Deps).
337
%% Registers a dependency, merging Opts into any options already recorded
%% for Dep. On a key conflict the new value from Opts wins, because Opts is
%% the second argument of maps:merge/2. A module not yet present starts from
%% an empty option map.
338 -spec add_dep(module(), module_opts(), module_map()) -> module_map().
339 add_dep(Dep, Opts, Deps) ->
340 7774 PrevOpts = maps:get(Dep, Deps, #{}),
341 7774 NewOpts = maps:merge(PrevOpts, Opts),
342 7774 maps:put(Dep, NewOpts, Deps).
343
%% Metrics callback: delegates to mongoose_module_metrics:opts_for_module/3,
%% asking it to report only the `backend` option of this module for Host.
%% NOTE(review): the shape of the returned value is defined by
%% mongoose_module_metrics and is not visible here.
344 config_metrics(Host) ->
345 6 mongoose_module_metrics:opts_for_module(Host, ?MODULE, [backend]).
Line Hits Source