./ct_report/coverage/mod_mam.COVER.html

1 %%==============================================================================
2 %% Copyright 2016 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_mam).
18 -behaviour(gen_mod).
19 -behaviour(mongoose_module_metrics).
20 -xep([{xep, 313}, {version, "1.1.0"}, {legacy_versions, ["0.5"]}]).
21 -xep([{xep, 424}, {version, "0.3.0"}]).
22
23 -include("mod_mam.hrl").
24 -include("mongoose_config_spec.hrl").
25
26 -type module_opts() :: gen_mod:module_opts().
27 -type module_map() :: gen_mod_deps:module_map().
28
29 %% ----------------------------------------------------------------------
30 %% Datetime types
31 %% Microseconds from 01.01.1970
32 -type unix_timestamp() :: non_neg_integer().
33
34 %% ----------------------------------------------------------------------
35 %% Other types
36 -type archive_behaviour() :: roster | always | never.
37 -type message_id() :: non_neg_integer().
38
39 -type archive_id() :: non_neg_integer().
40
41 -type borders() :: #mam_borders{}.
42
43 -type message_row() :: #{id := message_id(), jid := jid:jid(), packet := exml:element()}.
44 -type lookup_result() :: {TotalCount :: non_neg_integer() | undefined,
45 Offset :: non_neg_integer() | undefined,
46 MessageRows :: [message_row()]}.
47
48 -type lookup_result_map() :: #{total_count := TotalCount :: non_neg_integer() | undefined,
49 offset := Offset :: non_neg_integer() | undefined,
50 messages := MessageRows :: [message_row()],
51 is_complete => boolean()}.
52
53 %% Internal types
54 -type iterator_fun() :: fun(() -> {'ok', {_, _}}).
55 -type rewriter_fun() :: fun((JID :: jid:literal_jid())
56 -> jid:literal_jid()).
57 -type restore_option() :: {rewrite_jids, rewriter_fun() | [{binary(), binary()}]}
58 | new_message_ids.
59
60 -type preference() :: {DefaultMode :: archive_behaviour(),
61 AlwaysJIDs :: [jid:literal_jid()],
62 NeverJIDs :: [jid:literal_jid()]}.
63
64 -type archive_message_params() :: #{message_id := message_id(),
65 archive_id := archive_id(),
66 local_jid := jid:jid(),
67 remote_jid := jid:jid(),
68 source_jid := jid:jid(),
69 origin_id := binary() | none,
70 direction := atom(),
71 packet := exml:element(),
72 %% Only in mod_mam_pm
73 is_groupchat => boolean(),
74 %% Only in mod_mam_muc_rdbms_arch:retract_message/2
75 sender_id => archive_id()}.
76
77 -export_type([rewriter_fun/0,
78 borders/0,
79 preference/0,
80 archive_behaviour/0,
81 iterator_fun/0,
82 unix_timestamp/0,
83 archive_id/0,
84 lookup_result/0,
85 lookup_result_map/0,
86 message_row/0,
87 message_id/0,
88 restore_option/0,
89 archive_message_params/0,
90 mam_type/0
91 ]).
92
93 -export([start/2, stop/1, config_spec/0, supported_features/0, deps/2, config_metrics/1]).
94
95 %%--------------------------------------------------------------------
96 %% API
97 %%--------------------------------------------------------------------
98
99 -spec supported_features() -> [atom()].
100 supported_features() ->
101 205 [dynamic_domains].
102
103 -spec start(mongooseim:host_type(), module_opts()) -> any().
104 start(_HostType, _Opts) ->
105 203 ok.
106
107 -spec stop(mongooseim:host_type()) -> any().
108 stop(_HostType) ->
109 203 ok.
110
111 -spec config_spec() -> mongoose_config_spec:config_section().
112 config_spec() ->
113 186 Items = maps:merge(common_config_items(), root_config_items()),
114 186 #section{
115 items = Items#{<<"pm">> => pm_config_spec(),
116 <<"muc">> => muc_config_spec()},
117 defaults = #{<<"backend">> => rdbms,
118 <<"no_stanzaid_element">> => false,
119 <<"is_archivable_message">> => mod_mam_utils,
120 <<"send_message">> => mod_mam_utils,
121 <<"archive_chat_markers">> => false,
122 <<"message_retraction">> => true,
123 <<"full_text_search">> => true,
124 <<"cache_users">> => true,
125 <<"default_result_limit">> => 50,
126 <<"max_result_limit">> => 50,
127 <<"enforce_simple_queries">> => false}
128 }.
129
130 pm_config_spec() ->
131 186 #section{items = maps:merge(common_config_items(), pm_config_items()),
132 defaults = #{<<"archive_groupchats">> => false,
133 <<"same_mam_id_for_peers">> => false}}.
134
135 muc_config_spec() ->
136 186 #section{items = maps:merge(common_config_items(), muc_config_items()),
137 defaults = #{<<"host">> => mod_muc:default_host()}}.
138
139 root_config_items() ->
140 186 Cache = mongoose_user_cache:config_spec(),
141 186 AsyncWriter = async_config_spec(),
142 186 #{<<"cache">> => Cache#section{include = always},
143 <<"async_writer">> => AsyncWriter#section{include = always}}.
144
145 common_config_items() ->
146 558 #{%% General options
147 <<"backend">> => #option{type = atom,
148 validate = {enum, [rdbms, cassandra, elasticsearch]}},
149 <<"no_stanzaid_element">> => #option{type = boolean},
150 <<"is_archivable_message">> => #option{type = atom,
151 validate = module},
152 <<"send_message">> => #option{type = atom,
153 validate = module},
154 <<"archive_chat_markers">> => #option{type = boolean},
155 <<"message_retraction">> => #option{type = boolean},
156
157 %% Common backend options
158 <<"user_prefs_store">> => #option{type = atom,
159 validate = {enum, [rdbms, cassandra, mnesia]}},
160 <<"full_text_search">> => #option{type = boolean},
161
162 %% RDBMS-specific options
163 <<"cache_users">> => #option{type = boolean},
164
165 %% Low-level options
166 <<"default_result_limit">> => #option{type = integer,
167 validate = non_negative},
168 <<"enforce_simple_queries">> => #option{type = boolean},
169 <<"delete_domain_limit">> => #option{type = int_or_infinity,
170 validate = positive},
171 <<"max_result_limit">> => #option{type = integer,
172 validate = non_negative},
173 <<"db_jid_format">> => #option{type = atom,
174 validate = module},
175 <<"db_message_format">> => #option{type = atom,
176 validate = module},
177 <<"extra_fin_element">> => #option{type = atom,
178 validate = module},
179 <<"extra_lookup_params">> => #option{type = atom,
180 validate = module}
181 }.
182
183 pm_config_items() ->
184 186 #{<<"async_writer">> => async_config_spec(),
185 <<"archive_groupchats">> => #option{type = boolean},
186 <<"same_mam_id_for_peers">> => #option{type = boolean}}.
187
188 muc_config_items() ->
189 186 #{<<"async_writer">> => async_config_spec(),
190 <<"host">> => #option{type = string,
191 validate = subdomain_template,
192 process = fun mongoose_subdomain_utils:make_subdomain_pattern/1}}.
193
194 async_config_spec() ->
195 558 #section{
196 items = #{<<"enabled">> => #option{type = boolean},
197 <<"flush_interval">> => #option{type = integer, validate = non_negative},
198 <<"batch_size">> => #option{type = integer, validate = non_negative},
199 <<"pool_size">> => #option{type = integer, validate = non_negative}},
200 defaults = #{<<"enabled">> => true,
201 <<"flush_interval">> => 2000,
202 <<"batch_size">> => 30,
203 <<"pool_size">> => 4 * erlang:system_info(schedulers_online)}
204 }.
205
206 -spec deps(mongooseim:host_type(), module_opts()) -> gen_mod_deps:deps().
207 deps(_HostType, Opts) ->
208 6593 DepsWithPm = handle_nested_opts(pm, Opts, #{}),
209 6593 DepsWithPmAndMuc = handle_nested_opts(muc, Opts, DepsWithPm),
210
211 6593 [{DepMod, DepOpts, hard} || {DepMod, DepOpts} <- maps:to_list(DepsWithPmAndMuc)].
212
213 %%--------------------------------------------------------------------
214 %% Helpers
215 %%--------------------------------------------------------------------
216
217 -type mam_type() :: pm | muc.
218 -type mam_backend() :: rdbms | cassandra | elasticsearch.
219
220 -spec handle_nested_opts(mam_type(), module_opts(), module_map()) -> module_map().
221 handle_nested_opts(Key, RootOpts, Deps) ->
222 13186 case maps:find(Key, RootOpts) of
223 6109 error -> Deps;
224 {ok, Opts} ->
225 7077 FullOpts = maps:merge(maps:without([pm, muc], RootOpts), Opts),
226 7077 parse_opts(Key, FullOpts, Deps)
227 end.
228
229 -spec parse_opts(mam_type(), module_opts(), module_map()) -> module_map().
230 parse_opts(Type, Opts, Deps) ->
231 %% Opts are merged root options with options inside pm or muc section
232 7077 CoreMod = mam_type_to_core_mod(Type),
233 7077 CoreModOpts = maps:with(valid_core_mod_opts(CoreMod), Opts),
234 7077 WithCoreDeps = add_dep(CoreMod, CoreModOpts, Deps),
235 7077 {Backend, BackendOpts} = maps:take(backend, Opts),
236 7077 WithPrefs = add_prefs_store_module(Backend, Type, Opts, WithCoreDeps),
237 7077 parse_backend_opts(Backend, Type, BackendOpts, WithPrefs).
238
239 -spec mam_type_to_core_mod(mam_type()) -> module().
240 4614 mam_type_to_core_mod(pm) -> mod_mam_pm;
241 2463 mam_type_to_core_mod(muc) -> mod_mam_muc.
242
243 %% Get a list of options to pass into the two modules.
244 %% They don't have to be defined in pm or muc sections, the root section is enough.
245 -spec valid_core_mod_opts(module()) -> [atom()].
246 valid_core_mod_opts(mod_mam_pm) ->
247 4614 [archive_groupchats, same_mam_id_for_peers] ++ common_opts();
248 valid_core_mod_opts(mod_mam_muc) ->
249 2463 [host] ++ common_opts().
250
251 common_opts() ->
252 7077 [async_writer,
253 is_archivable_message,
254 send_message,
255 archive_chat_markers,
256 extra_fin_element,
257 extra_lookup_params,
258 full_text_search,
259 message_retraction,
260 default_result_limit,
261 max_result_limit,
262 enforce_simple_queries,
263 no_stanzaid_element].
264
265 -spec add_prefs_store_module(mam_backend(), mam_type(), module_opts(), module_map()) -> module_map().
266 add_prefs_store_module(Backend, Type, #{user_prefs_store := Store}, Deps) ->
267 6696 PrefsModule = prefs_module(Backend, Store),
268 6696 add_dep(PrefsModule, #{Type => true}, Deps);
269 add_prefs_store_module(_Backend, _Type, _Opts, Deps) ->
270 381 Deps.
271
272 -spec parse_backend_opts(mam_backend(), mam_type(), module_opts(), module_map()) -> module_map().
273 parse_backend_opts(cassandra, Type, Opts, Deps) ->
274
:-(
Opts1 = maps:with([db_message_format], Opts),
275
:-(
add_dep(cassandra_arch_module(Type), maps:merge(arch_defaults(), Opts1), Deps);
276 parse_backend_opts(rdbms, Type, Opts, Deps) ->
277 7077 lists:foldl(fun(OptionGroup, DepsIn) -> add_rdbms_deps(OptionGroup, Type, Opts, DepsIn) end,
278 Deps, [basic, user_cache, async_writer]);
279 parse_backend_opts(elasticsearch, Type, _Opts, Deps0) ->
280
:-(
add_dep(elasticsearch_arch_module(Type), Deps0).
281
282 -spec add_rdbms_deps(basic | user_cache | async_writer,
283 mam_type(), module_opts(), module_map()) -> module_map().
284 add_rdbms_deps(basic, Type, Opts, Deps) ->
285 7077 Opts1 = maps:with([db_message_format, db_jid_format, delete_domain_limit], Opts),
286 7077 Deps1 = add_dep(rdbms_arch_module(Type), maps:merge(rdbms_arch_defaults(Type), Opts1), Deps),
287 7077 add_dep(mod_mam_rdbms_user, user_db_types(Type), Deps1);
288 add_rdbms_deps(user_cache, Type, #{cache_users := true, cache := CacheOpts}, Deps) ->
289 3249 Deps1 = case gen_mod:get_opt(module, CacheOpts, internal) of
290 3249 internal -> Deps;
291
:-(
mod_cache_users -> add_dep(mod_cache_users, Deps)
292 end,
293 3249 add_dep(mod_mam_cache_user, CacheOpts#{Type => true}, Deps1);
294 add_rdbms_deps(async_writer, Type, #{async_writer := AsyncOpts = #{enabled := true}}, Deps) ->
295 2141 Deps1 = add_dep(rdbms_arch_module(Type), #{no_writer => true}, Deps),
296 2141 add_dep(rdbms_async_arch_module(Type), AsyncOpts, Deps1);
297 add_rdbms_deps(_, _Type, _Opts, Deps) ->
298 8764 Deps.
299
300 % muc backend requires both pm and muc user DB to populate sender_id column
301 -spec user_db_types(mam_type()) -> module_opts().
302 4614 user_db_types(pm) -> #{pm => true};
303 2463 user_db_types(muc) -> #{pm => true, muc => true}.
304
305
:-(
cassandra_arch_module(pm) -> mod_mam_cassandra_arch;
306
:-(
cassandra_arch_module(muc) -> mod_mam_muc_cassandra_arch.
307
308
:-(
arch_defaults() -> #{db_message_format => mam_message_xml}.
309
310 rdbms_arch_defaults(pm) ->
311 4614 maps:merge(rdbms_arch_defaults(), #{db_jid_format => mam_jid_mini});
312 rdbms_arch_defaults(muc) ->
313 2463 maps:merge(rdbms_arch_defaults(), #{db_jid_format => mam_jid_rfc}).
314
315 rdbms_arch_defaults() ->
316 7077 #{db_message_format => mam_message_compressed_eterm,
317 no_writer => false, delete_domain_limit => infinity}.
318
319 6014 rdbms_arch_module(pm) -> mod_mam_rdbms_arch;
320 3204 rdbms_arch_module(muc) -> mod_mam_muc_rdbms_arch.
321
322 1400 rdbms_async_arch_module(pm) -> mod_mam_rdbms_arch_async;
323 741 rdbms_async_arch_module(muc) -> mod_mam_muc_rdbms_arch_async.
324
325
:-(
elasticsearch_arch_module(pm) -> mod_mam_elasticsearch_arch;
326
:-(
elasticsearch_arch_module(muc) -> mod_mam_muc_elasticsearch_arch.
327
328 4784 prefs_module(rdbms, rdbms) -> mod_mam_rdbms_prefs;
329
:-(
prefs_module(cassandra, cassandra) -> mod_mam_cassandra_prefs;
330 1912 prefs_module(_, mnesia) -> mod_mam_mnesia_prefs;
331 prefs_module(Backend, PrefsStore) ->
332
:-(
error(#{what => invalid_mam_user_prefs_store,
333 backend => Backend,
334 user_prefs_store => PrefsStore}).
335
336 -spec add_dep(module(), module_map()) -> module_map().
337 add_dep(Dep, Deps) ->
338
:-(
add_dep(Dep, #{}, Deps).
339
340 -spec add_dep(module(), module_opts(), module_map()) -> module_map().
341 add_dep(Dep, Opts, Deps) ->
342 35458 PrevOpts = maps:get(Dep, Deps, #{}),
343 35458 NewOpts = maps:merge(PrevOpts, Opts),
344 35458 maps:put(Dep, NewOpts, Deps).
345
346 config_metrics(Host) ->
347 12 mongoose_module_metrics:opts_for_module(Host, ?MODULE, [backend]).
Line Hits Source