./ct_report/coverage/mod_mam_rdbms_arch_async.COVER.html

1 -module(mod_mam_rdbms_arch_async).
2
3 -include("mongoose_logger.hrl").
4
5 -define(PM_PER_MESSAGE_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, per_message_flush_time]).
6 -define(PM_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, flush_time]).
7 -define(MUC_PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]).
8 -define(MUC_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]).
9
10 -behaviour(gen_mod).
11 -export([start/2, stop/1, config_spec/0, supported_features/0]).
12
13 -export([archive_pm_message/3, archive_muc_message/3]).
14 -export([mam_archive_sync/2, mam_muc_archive_sync/2]).
15 -export([flush_pm/2, flush_muc/2]).
16
17 -type writer_type() :: pm | muc.
18
19 -ignore_xref([archive_pm_message/3, archive_muc_message/3]).
20 -ignore_xref([mam_archive_sync/2, mam_muc_archive_sync/2]).
21
22 -spec archive_pm_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) ->
23 ok | {error, timeout}.
24 archive_pm_message(_Result, HostType, Params = #{archive_id := ArcID}) ->
25 1465 PoolName = mongoose_async_pools:pool_name(HostType, pm_mam),
26 1465 wpool:cast(PoolName, {task, Params}, {hash_worker, ArcID}).
27
28 -spec archive_muc_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) ->
29 ok.
30 archive_muc_message(_Result, HostType, Params0 = #{archive_id := RoomID}) ->
31 465 Params = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params0),
32 465 PoolName = mongoose_async_pools:pool_name(HostType, muc_mam),
33 465 wpool:cast(PoolName, {task, Params}, {hash_worker, RoomID}).
34
35 -spec mam_archive_sync(term(), mongooseim:host_type()) -> term().
36 mam_archive_sync(Result, HostType) ->
37 47 mongoose_async_pools:sync(HostType, pm_mam),
38 47 Result.
39
40 -spec mam_muc_archive_sync(term(), mongooseim:host_type()) -> term().
41 mam_muc_archive_sync(Result, HostType) ->
42 58 mongoose_async_pools:sync(HostType, muc_mam),
43 58 Result.
44
45 %%% gen_mod callbacks
46 -spec start(mongooseim:host_type(), [{writer_type(), gen_mod:module_opts()}]) -> any().
47 start(HostType, Opts) ->
48 37 [ start_pool(HostType, Mod) || Mod <- Opts ].
49
50 -spec stop(mongooseim:host_type()) -> any().
51 stop(HostType) ->
52 37 Opts = gen_mod:get_module_opts(HostType, ?MODULE),
53 37 [ stop_pool(HostType, Mod) || Mod <- Opts ].
54
55 -spec config_spec() -> mongoose_config_spec:config_section().
56 config_spec() ->
57 146 mongoose_async_pools:config_spec().
58
59 -spec supported_features() -> [atom()].
60 supported_features() ->
61 37 [dynamic_domains].
62
63 %%% internal callbacks
64 -spec start_pool(mongooseim:host_type(), {writer_type(), gen_mod:module_opts()}) ->
65 supervisor:startchild_ret().
66 start_pool(HostType, {Type, Opts}) ->
67 51 {Opts1, Extra} = extend_opts(Type, Opts),
68 51 prepare_insert_queries(Type, Extra),
69 51 ensure_metrics(Type, HostType),
70 51 register_hooks(Type, HostType),
71 51 start_pool(Type, HostType, Opts1).
72
73 extend_opts(Type, Opts) ->
74 51 Merge = maps:merge(defaults(), maps:from_list(Opts)),
75 51 Extra = add_batch_name(Type, Merge),
76 51 Opts1 = maps:to_list(Extra),
77 51 Opts2 = gen_mod:set_opt(flush_extra, Opts1, Extra),
78 51 {add_callback(Type, Opts2), Extra}.
79
80 %% Put batch_size into a statement name, so we could survive the module restarts
81 %% with different batch sizes
82 add_batch_name(pm, #{batch_size := MaxSize} = Opts) ->
83 33 Opts#{batch_name => multi_name(insert_mam_messages, MaxSize)};
84 add_batch_name(muc, #{batch_size := MaxSize} = Opts) ->
85 18 Opts#{batch_name => multi_name(insert_mam_muc_messages, MaxSize)}.
86
87 add_callback(pm, Opts) ->
88 33 gen_mod:set_opt(flush_callback, Opts, fun ?MODULE:flush_pm/2);
89 add_callback(muc, Opts) ->
90 18 gen_mod:set_opt(flush_callback, Opts, fun ?MODULE:flush_muc/2).
91
92 defaults() ->
93 51 #{flush_interval => 2000,
94 batch_size => 30,
95 pool_size => 4 * erlang:system_info(schedulers_online)}.
96
97 prepare_insert_queries(pm, #{batch_size := MaxSize, batch_name := BatchName}) ->
98 33 mod_mam_rdbms_arch:prepare_insert(insert_mam_message, 1),
99 33 mod_mam_rdbms_arch:prepare_insert(BatchName, MaxSize);
100 prepare_insert_queries(muc, #{batch_size := MaxSize, batch_name := BatchName}) ->
101 18 mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_message, 1),
102 18 mod_mam_muc_rdbms_arch:prepare_insert(BatchName, MaxSize).
103
104 multi_name(Name, Times) ->
105 51 list_to_atom(atom_to_list(Name) ++ integer_to_list(Times)).
106
107 ensure_metrics(pm, HostType) ->
108 33 mongoose_metrics:ensure_metric(HostType, ?PM_PER_MESSAGE_FLUSH_TIME, histogram),
109 33 mongoose_metrics:ensure_metric(HostType, ?PM_FLUSH_TIME, histogram);
110 ensure_metrics(muc, HostType) ->
111 18 mongoose_metrics:ensure_metric(HostType, ?MUC_PER_MESSAGE_FLUSH_TIME, histogram),
112 18 mongoose_metrics:ensure_metric(HostType, ?MUC_FLUSH_TIME, histogram).
113
114 register_hooks(pm, HostType) ->
115 33 ejabberd_hooks:add(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50),
116 33 ejabberd_hooks:add(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50);
117 register_hooks(muc, HostType) ->
118 18 ejabberd_hooks:add(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, 50),
119 18 ejabberd_hooks:add(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50).
120
121 start_pool(pm, HostType, Opts) ->
122 33 mongoose_async_pools:start_pool(HostType, pm_mam, Opts);
123 start_pool(muc, HostType, Opts) ->
124 18 mongoose_async_pools:start_pool(HostType, muc_mam, Opts).
125
126 -spec stop_pool(mongooseim:host_type(), {writer_type(), term()}) -> ok.
127 stop_pool(HostType, {pm, _}) ->
128 33 ejabberd_hooks:delete(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50),
129 33 ejabberd_hooks:delete(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50),
130 33 mongoose_async_pools:stop_pool(HostType, pm_mam);
131 stop_pool(HostType, {muc, _}) ->
132 18 ejabberd_hooks:delete(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, 50),
133 18 ejabberd_hooks:delete(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50),
134 18 mongoose_async_pools:stop_pool(HostType, muc_mam).
135
136 %%% flush callbacks
137 flush_pm(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
138 590 {FlushTime, Result} = timer:tc(fun do_flush_pm/2, [Acc, Extra]),
139 590 mongoose_metrics:update(HostType, ?PM_PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
140 590 mongoose_metrics:update(HostType, ?PM_FLUSH_TIME, FlushTime),
141 590 Result.
142
143 flush_muc(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
144 249 {FlushTime, Result} = timer:tc(fun do_flush_muc/2, [Acc, Extra]),
145 249 mongoose_metrics:update(HostType, ?MUC_PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
146 249 mongoose_metrics:update(HostType, ?MUC_FLUSH_TIME, FlushTime),
147 249 Result.
148
149 %% mam workers callbacks
150 do_flush_pm(Acc, #{host_type := HostType, queue_length := MessageCount,
151 batch_size := MaxSize, batch_name := BatchName}) ->
152 590 Rows = [mod_mam_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc],
153 590 InsertResult =
154 case MessageCount of
155 MaxSize ->
156 7 mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows));
157 OtherSize ->
158 583 Results = [mongoose_rdbms:execute(HostType, insert_mam_message, Row) || Row <- Rows],
159 583 case lists:keyfind(error, 1, Results) of
160 559 false -> {updated, OtherSize};
161 24 Error -> Error
162 end
163 end,
164 590 case InsertResult of
165 566 {updated, _Count} -> ok;
166 {error, Reason} ->
167 24 mongoose_metrics:update(HostType, modMamDropped, MessageCount),
168 24 ?LOG_ERROR(#{what => archive_message_failed,
169 text => <<"archive_message query failed">>,
170
:-(
message_count => MessageCount, reason => Reason}),
171 24 ok
172 end,
173 590 [mod_mam_rdbms_arch:retract_message(HostType, Params) || Params <- Acc],
174 590 mongoose_hooks:mam_flush_messages(HostType, MessageCount),
175 590 ok.
176
177 do_flush_muc(Acc, #{host_type := HostType, queue_length := MessageCount,
178 batch_size := MaxSize, batch_name := BatchName}) ->
179 249 Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc],
180 249 InsertResult =
181 case MessageCount of
182 MaxSize ->
183
:-(
mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows));
184 OtherSize ->
185 249 Results = [mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row) || Row <- Rows],
186 249 case lists:keyfind(error, 1, Results) of
187 200 false -> {updated, OtherSize};
188 49 Error -> Error
189 end
190 end,
191 249 case InsertResult of
192 200 {updated, _Count} -> ok;
193 {error, Reason} ->
194 49 mongoose_metrics:update(HostType, modMucMamDropped, MessageCount),
195 49 ?LOG_ERROR(#{what => archive_message_query_failed,
196 text => <<"archive_message query failed, modMucMamDropped metric updated">>,
197
:-(
message_count => MessageCount, reason => Reason}),
198 49 ok
199 end,
200 249 [mod_mam_muc_rdbms_arch:retract_message(HostType, Params) || Params <- Acc],
201 249 mongoose_hooks:mam_muc_flush_messages(HostType, MessageCount),
202 249 ok.
Line Hits Source