./ct_report/coverage/mod_mam.COVER.html

1 %%==============================================================================
2 %% Copyright 2016 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_mam).
18 -behaviour(gen_mod).
19 -behaviour(mongoose_module_metrics).
20
21 -include("mod_mam.hrl").
22 -include("mongoose_config_spec.hrl").
23
24 -type module_opts() :: gen_mod:module_opts().
25 -type module_map() :: gen_mod_deps:module_map().
26
27 %% ----------------------------------------------------------------------
28 %% Datetime types
29 %% Microseconds from 01.01.1970
30 -type unix_timestamp() :: non_neg_integer().
31
32 %% ----------------------------------------------------------------------
33 %% Other types
34 -type archive_behaviour() :: roster | always | never.
35 -type message_id() :: non_neg_integer().
36
37 -type archive_id() :: non_neg_integer().
38
39 -type borders() :: #mam_borders{}.
40
41 -type message_row() :: #{id := message_id(), jid := jid:jid(), packet := exml:element()}.
42 -type lookup_result() :: {TotalCount :: non_neg_integer() | undefined,
43 Offset :: non_neg_integer() | undefined,
44 MessageRows :: [message_row()]}.
45
46 %% Internal types
47 -type iterator_fun() :: fun(() -> {'ok', {_, _}}).
48 -type rewriter_fun() :: fun((JID :: jid:literal_jid())
49 -> jid:literal_jid()).
50 -type restore_option() :: {rewrite_jids, rewriter_fun() | [{binary(), binary()}]}
51 | new_message_ids.
52
53 -type preference() :: {DefaultMode :: archive_behaviour(),
54 AlwaysJIDs :: [jid:literal_jid()],
55 NeverJIDs :: [jid:literal_jid()]}.
56
57 -type archive_message_params() :: #{message_id := message_id(),
58 archive_id := archive_id(),
59 local_jid := jid:jid(),
60 remote_jid := jid:jid(),
61 source_jid := jid:jid(),
62 origin_id := binary() | none,
63 direction := atom(),
64 packet := exml:element(),
65 %% Only in mod_mam_muc_rdbms_arch:retract_message/2
66 sender_id => archive_id()}.
67
68 -export_type([rewriter_fun/0,
69 borders/0,
70 preference/0,
71 archive_behaviour/0,
72 iterator_fun/0,
73 unix_timestamp/0,
74 archive_id/0,
75 lookup_result/0,
76 message_row/0,
77 message_id/0,
78 restore_option/0,
79 archive_message_params/0
80 ]).
81
82 -export([start/2, stop/1, config_spec/0, supported_features/0, deps/2, config_metrics/1]).
83
84 -export([remove_unused_backend_opts/1]).
85
86 %%--------------------------------------------------------------------
87 %% API
88 %%--------------------------------------------------------------------
89
90 -spec supported_features() -> [atom()].
91 supported_features() ->
92
:-(
[dynamic_domains].
93
94 -spec start(mongooseim:host_type(), module_opts()) -> any().
95 start(_HostType, _Opts) ->
96 57 ok.
97
98 -spec stop(mongooseim:host_type()) -> any().
99 stop(_HostType) ->
100 57 ok.
101
102 -spec config_spec() -> mongoose_config_spec:config_section().
103 config_spec() ->
104 166 Items = maps:merge(common_config_items(), root_config_items()),
105 166 #section{
106 items = Items#{<<"pm">> => pm_config_spec(),
107 <<"muc">> => muc_config_spec(),
108 <<"riak">> => riak_config_spec()},
109 defaults = #{<<"backend">> => rdbms,
110 <<"no_stanzaid_element">> => false,
111 <<"is_archivable_message">> => mod_mam_utils,
112 <<"send_message">> => mod_mam_utils,
113 <<"archive_chat_markers">> => false,
114 <<"message_retraction">> => true,
115 <<"full_text_search">> => true,
116 <<"cache_users">> => true,
117 <<"default_result_limit">> => 50,
118 <<"max_result_limit">> => 50},
119 process = fun ?MODULE:remove_unused_backend_opts/1
120 }.
121
122
:-(
remove_unused_backend_opts(Opts = #{backend := riak}) -> Opts;
123
:-(
remove_unused_backend_opts(Opts) -> maps:remove(riak, Opts).
124
125 pm_config_spec() ->
126 166 #section{items = maps:merge(common_config_items(), pm_config_items()),
127 defaults = #{<<"archive_groupchats">> => false,
128 <<"same_mam_id_for_peers">> => false}}.
129
130 muc_config_spec() ->
131 166 #section{items = maps:merge(common_config_items(), muc_config_items()),
132 defaults = #{<<"host">> => mod_muc:default_host()}}.
133
134 root_config_items() ->
135 166 Cache = mongoose_user_cache:config_spec(),
136 166 AsyncWriter = async_config_spec(),
137 166 #{<<"cache">> => Cache#section{include = always},
138 <<"async_writer">> => AsyncWriter#section{include = always}}.
139
140 common_config_items() ->
141 498 #{%% General options
142 <<"backend">> => #option{type = atom,
143 validate = {enum, [rdbms, riak, cassandra, elasticsearch]}},
144 <<"no_stanzaid_element">> => #option{type = boolean},
145 <<"is_archivable_message">> => #option{type = atom,
146 validate = module},
147 <<"send_message">> => #option{type = atom,
148 validate = module},
149 <<"archive_chat_markers">> => #option{type = boolean},
150 <<"message_retraction">> => #option{type = boolean},
151
152 %% Common backend options
153 <<"user_prefs_store">> => #option{type = atom,
154 validate = {enum, [rdbms, cassandra, mnesia]}},
155 <<"full_text_search">> => #option{type = boolean},
156
157 %% RDBMS-specific options
158 <<"cache_users">> => #option{type = boolean},
159
160 %% Low-level options
161 <<"default_result_limit">> => #option{type = integer,
162 validate = non_negative},
163 <<"max_result_limit">> => #option{type = integer,
164 validate = non_negative},
165 <<"db_jid_format">> => #option{type = atom,
166 validate = module},
167 <<"db_message_format">> => #option{type = atom,
168 validate = module},
169 <<"extra_fin_element">> => #option{type = atom,
170 validate = module},
171 <<"extra_lookup_params">> => #option{type = atom,
172 validate = module}
173 }.
174
175 pm_config_items() ->
176 166 #{<<"async_writer">> => async_config_spec(),
177 <<"archive_groupchats">> => #option{type = boolean},
178 <<"same_mam_id_for_peers">> => #option{type = boolean}}.
179
180 muc_config_items() ->
181 166 #{<<"async_writer">> => async_config_spec(),
182 <<"host">> => #option{type = string,
183 validate = subdomain_template,
184 process = fun mongoose_subdomain_utils:make_subdomain_pattern/1}}.
185
186 async_config_spec() ->
187 498 #section{
188 items = #{<<"enabled">> => #option{type = boolean},
189 <<"flush_interval">> => #option{type = integer, validate = non_negative},
190 <<"batch_size">> => #option{type = integer, validate = non_negative},
191 <<"pool_size">> => #option{type = integer, validate = non_negative}},
192 defaults = #{<<"enabled">> => true,
193 <<"flush_interval">> => 2000,
194 <<"batch_size">> => 30,
195 <<"pool_size">> => 4 * erlang:system_info(schedulers_online)}
196 }.
197
198 riak_config_spec() ->
199 166 #section{
200 items = #{<<"search_index">> => #option{type = binary,
201 validate = non_empty},
202 <<"bucket_type">> => #option{type = binary,
203 validate = non_empty}},
204 defaults = #{<<"search_index">> => <<"mam">>,
205 <<"bucket_type">> => <<"mam_yz">>},
206 include = always
207 }.
208
209 -spec deps(mongooseim:host_type(), module_opts()) -> gen_mod_deps:deps().
210 deps(_HostType, Opts) ->
211 337 DepsWithPm = handle_nested_opts(pm, Opts, #{}),
212 337 DepsWithPmAndMuc = handle_nested_opts(muc, Opts, DepsWithPm),
213
214 337 [{DepMod, DepOpts, hard} || {DepMod, DepOpts} <- maps:to_list(DepsWithPmAndMuc)].
215
216 %%--------------------------------------------------------------------
217 %% Helpers
218 %%--------------------------------------------------------------------
219
220 -type mam_type() :: pm | muc.
221 -type mam_backend() :: rdbms | riak | cassandra | elasticsearch.
222
223 -spec handle_nested_opts(mam_type(), module_opts(), module_map()) -> module_map().
224 handle_nested_opts(Key, RootOpts, Deps) ->
225 674 case maps:find(Key, RootOpts) of
226 197 error -> Deps;
227 {ok, Opts} ->
228 477 FullOpts = maps:merge(maps:without([pm, muc], RootOpts), Opts),
229 477 parse_opts(Key, FullOpts, Deps)
230 end.
231
232 -spec parse_opts(mam_type(), module_opts(), module_map()) -> module_map().
233 parse_opts(Type, Opts, Deps) ->
234 %% Opts are merged root options with options inside pm or muc section
235 477 CoreMod = mam_type_to_core_mod(Type),
236 477 CoreModOpts = maps:with(valid_core_mod_opts(CoreMod), Opts),
237 477 WithCoreDeps = add_dep(CoreMod, CoreModOpts, Deps),
238 477 {Backend, BackendOpts} = maps:take(backend, Opts),
239 477 WithPrefs = add_prefs_store_module(Backend, Type, Opts, WithCoreDeps),
240 477 parse_backend_opts(Backend, Type, BackendOpts, WithPrefs).
241
242 -spec mam_type_to_core_mod(mam_type()) -> module().
243 269 mam_type_to_core_mod(pm) -> mod_mam_pm;
244 208 mam_type_to_core_mod(muc) -> mod_mam_muc.
245
246 %% Get a list of options to pass into the two modules.
247 %% They don't have to be defined in pm or muc sections, the root section is enough.
248 -spec valid_core_mod_opts(module()) -> [atom()].
249 valid_core_mod_opts(mod_mam_pm) ->
250 269 [archive_groupchats, same_mam_id_for_peers] ++ common_opts();
251 valid_core_mod_opts(mod_mam_muc) ->
252 208 [host] ++ common_opts().
253
254 common_opts() ->
255 477 [async_writer,
256 is_archivable_message,
257 send_message,
258 archive_chat_markers,
259 extra_fin_element,
260 extra_lookup_params,
261 full_text_search,
262 message_retraction,
263 default_result_limit,
264 max_result_limit,
265 no_stanzaid_element].
266
267 -spec add_prefs_store_module(mam_backend(), mam_type(), module_opts(), module_map()) -> module_map().
268 add_prefs_store_module(Backend, Type, #{user_prefs_store := Store}, Deps) ->
269 250 PrefsModule = prefs_module(Backend, Store),
270 250 add_dep(PrefsModule, #{Type => true}, Deps);
271 add_prefs_store_module(_Backend, _Type, _Opts, Deps) ->
272 227 Deps.
273
274 -spec parse_backend_opts(mam_backend(), mam_type(), module_opts(), module_map()) -> module_map().
275 parse_backend_opts(cassandra, Type, Opts, Deps) ->
276 234 Opts1 = maps:with([db_message_format], Opts),
277 234 add_dep(cassandra_arch_module(Type), maps:merge(arch_defaults(), Opts1), Deps);
278 parse_backend_opts(riak, Type, Opts, Deps) ->
279
:-(
Opts1 = maps:with([db_message_format, riak], Opts),
280
:-(
add_dep(mod_mam_riak_timed_arch_yz, maps:merge(arch_defaults(), Opts1#{Type => true}), Deps);
281 parse_backend_opts(rdbms, Type, Opts, Deps) ->
282
:-(
lists:foldl(fun(OptionGroup, DepsIn) -> add_rdbms_deps(OptionGroup, Type, Opts, DepsIn) end,
283 Deps, [basic, user_cache, async_writer]);
284 parse_backend_opts(elasticsearch, Type, _Opts, Deps0) ->
285 243 add_dep(elasticsearch_arch_module(Type), Deps0).
286
287 -spec add_rdbms_deps(basic | user_cache | async_writer,
288 mam_type(), module_opts(), module_map()) -> module_map().
289 add_rdbms_deps(basic, Type, Opts, Deps) ->
290
:-(
Opts1 = maps:with([db_message_format, db_jid_format], Opts),
291
:-(
Deps1 = add_dep(rdbms_arch_module(Type), maps:merge(rdbms_arch_defaults(Type), Opts1), Deps),
292
:-(
add_dep(mod_mam_rdbms_user, user_db_types(Type), Deps1);
293 add_rdbms_deps(user_cache, Type, #{cache_users := true, cache := CacheOpts}, Deps) ->
294
:-(
Deps1 = case gen_mod:get_opt(module, CacheOpts, internal) of
295
:-(
internal -> Deps;
296
:-(
mod_cache_users -> add_dep(mod_cache_users, Deps)
297 end,
298
:-(
add_dep(mod_mam_cache_user, CacheOpts#{Type => true}, Deps1);
299 add_rdbms_deps(async_writer, Type, #{async_writer := AsyncOpts = #{enabled := true}}, Deps) ->
300
:-(
Deps1 = add_dep(rdbms_arch_module(Type), #{no_writer => true}, Deps),
301
:-(
add_dep(rdbms_async_arch_module(Type), AsyncOpts, Deps1);
302 add_rdbms_deps(_, _Type, _Opts, Deps) ->
303
:-(
Deps.
304
305 % muc backend requires both pm and muc user DB to populate sender_id column
306 -spec user_db_types(mam_type()) -> module_opts().
307
:-(
user_db_types(pm) -> #{pm => true};
308
:-(
user_db_types(muc) -> #{pm => true, muc => true}.
309
310 127 cassandra_arch_module(pm) -> mod_mam_cassandra_arch;
311 107 cassandra_arch_module(muc) -> mod_mam_muc_cassandra_arch.
312
313 234 arch_defaults() -> #{db_message_format => mam_message_xml}.
314
315 rdbms_arch_defaults(pm) ->
316
:-(
maps:merge(rdbms_arch_defaults(), #{db_jid_format => mam_jid_mini});
317 rdbms_arch_defaults(muc) ->
318
:-(
maps:merge(rdbms_arch_defaults(), #{db_jid_format => mam_jid_rfc}).
319
320 rdbms_arch_defaults() ->
321
:-(
#{db_message_format => mam_message_compressed_eterm,
322 no_writer => false}.
323
324
:-(
rdbms_arch_module(pm) -> mod_mam_rdbms_arch;
325
:-(
rdbms_arch_module(muc) -> mod_mam_muc_rdbms_arch.
326
327
:-(
rdbms_async_arch_module(pm) -> mod_mam_rdbms_arch_async;
328
:-(
rdbms_async_arch_module(muc) -> mod_mam_muc_rdbms_arch_async.
329
330 142 elasticsearch_arch_module(pm) -> mod_mam_elasticsearch_arch;
331 101 elasticsearch_arch_module(muc) -> mod_mam_muc_elasticsearch_arch.
332
333
:-(
prefs_module(rdbms, rdbms) -> mod_mam_rdbms_prefs;
334 125 prefs_module(cassandra, cassandra) -> mod_mam_cassandra_prefs;
335 125 prefs_module(_, mnesia) -> mod_mam_mnesia_prefs;
336 prefs_module(Backend, PrefsStore) ->
337
:-(
error(#{what => invalid_mam_user_prefs_store,
338 backend => Backend,
339 user_prefs_store => PrefsStore}).
340
341 -spec add_dep(module(), module_map()) -> module_map().
342 add_dep(Dep, Deps) ->
343 243 add_dep(Dep, #{}, Deps).
344
345 -spec add_dep(module(), module_opts(), module_map()) -> module_map().
346 add_dep(Dep, Opts, Deps) ->
347 1204 PrevOpts = maps:get(Dep, Deps, #{}),
348 1204 NewOpts = maps:merge(PrevOpts, Opts),
349 1204 maps:put(Dep, NewOpts, Deps).
350
351 config_metrics(Host) ->
352
:-(
mongoose_module_metrics:opts_for_module(Host, ?MODULE, [backend]).
Line Hits Source