./ct_report/coverage/mod_mam_rdbms_arch_async.COVER.html

1 -module(mod_mam_rdbms_arch_async).
2
3 -include("mongoose_logger.hrl").
4
5 -define(PM_PER_MESSAGE_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, per_message_flush_time]).
6 -define(PM_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, flush_time]).
7 -define(MUC_PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]).
8 -define(MUC_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]).
9
10 -behaviour(gen_mod).
11 -export([start/2, stop/1, config_spec/0, supported_features/0]).
12
13 -export([archive_pm_message/3, archive_muc_message/3]).
14 -export([mam_archive_sync/2, mam_muc_archive_sync/2]).
15 -export([flush_pm/2, flush_muc/2]).
16
17 -type writer_type() :: pm | muc.
18
19 -ignore_xref([archive_pm_message/3, archive_muc_message/3]).
20 -ignore_xref([mam_archive_sync/2, mam_muc_archive_sync/2]).
21
22 -spec archive_pm_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) ->
23 ok | {error, timeout}.
24 archive_pm_message(_Result, HostType, Params = #{archive_id := ArcID}) ->
25
:-(
PoolName = mongoose_async_pools:pool_name(HostType, pm_mam),
26
:-(
wpool:cast(PoolName, {task, Params}, {hash_worker, ArcID}).
27
28 -spec archive_muc_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) ->
29 ok.
30 archive_muc_message(_Result, HostType, Params0 = #{archive_id := RoomID}) ->
31
:-(
Params = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params0),
32
:-(
PoolName = mongoose_async_pools:pool_name(HostType, muc_mam),
33
:-(
wpool:cast(PoolName, {task, Params}, {hash_worker, RoomID}).
34
35 -spec mam_archive_sync(term(), mongooseim:host_type()) -> term().
36 mam_archive_sync(Result, HostType) ->
37
:-(
mongoose_async_pools:sync(HostType, pm_mam),
38
:-(
Result.
39
40 -spec mam_muc_archive_sync(term(), mongooseim:host_type()) -> term().
41 mam_muc_archive_sync(Result, HostType) ->
42
:-(
mongoose_async_pools:sync(HostType, muc_mam),
43
:-(
Result.
44
45 %%% gen_mod callbacks
46 -spec start(mongooseim:host_type(), [{writer_type(), gen_mod:module_opts()}]) -> any().
47 start(HostType, Opts) ->
48
:-(
[ start_pool(HostType, Mod) || Mod <- Opts ].
49
50 -spec stop(mongooseim:host_type()) -> any().
51 stop(HostType) ->
52
:-(
Opts = gen_mod:get_module_opts(HostType, ?MODULE),
53
:-(
[ stop_pool(HostType, Mod) || Mod <- Opts ].
54
55 -spec config_spec() -> mongoose_config_spec:config_section().
56 config_spec() ->
57 160 mongoose_async_pools:config_spec().
58
59 -spec supported_features() -> [atom()].
60 supported_features() ->
61
:-(
[dynamic_domains].
62
63 %%% internal callbacks
64 -spec start_pool(mongooseim:host_type(), {writer_type(), gen_mod:module_opts()}) ->
65 supervisor:startchild_ret().
66 start_pool(HostType, {Type, Opts}) ->
67
:-(
{Opts1, Extra} = extend_opts(Type, Opts),
68
:-(
prepare_insert_queries(Type, Extra),
69
:-(
ensure_metrics(Type, HostType),
70
:-(
register_hooks(Type, HostType),
71
:-(
start_pool(Type, HostType, Opts1).
72
73 extend_opts(Type, Opts) ->
74
:-(
Merge = maps:merge(defaults(), maps:from_list(Opts)),
75
:-(
Extra = add_batch_name(Type, Merge),
76
:-(
Opts1 = maps:to_list(Extra),
77
:-(
Opts2 = gen_mod:set_opt(flush_extra, Opts1, Extra),
78
:-(
{add_callback(Type, Opts2), Extra}.
79
80 %% Put batch_size into a statement name, so we could survive the module restarts
81 %% with different batch sizes
82 add_batch_name(pm, #{batch_size := MaxSize} = Opts) ->
83
:-(
Opts#{batch_name => multi_name(insert_mam_messages, MaxSize)};
84 add_batch_name(muc, #{batch_size := MaxSize} = Opts) ->
85
:-(
Opts#{batch_name => multi_name(insert_mam_muc_messages, MaxSize)}.
86
87 add_callback(pm, Opts) ->
88
:-(
gen_mod:set_opt(flush_callback, Opts, fun ?MODULE:flush_pm/2);
89 add_callback(muc, Opts) ->
90
:-(
gen_mod:set_opt(flush_callback, Opts, fun ?MODULE:flush_muc/2).
91
92 defaults() ->
93
:-(
#{flush_interval => 2000,
94 batch_size => 30,
95 pool_size => 4 * erlang:system_info(schedulers_online)}.
96
97 prepare_insert_queries(pm, #{batch_size := MaxSize, batch_name := BatchName}) ->
98
:-(
mod_mam_rdbms_arch:prepare_insert(insert_mam_message, 1),
99
:-(
mod_mam_rdbms_arch:prepare_insert(BatchName, MaxSize);
100 prepare_insert_queries(muc, #{batch_size := MaxSize, batch_name := BatchName}) ->
101
:-(
mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_message, 1),
102
:-(
mod_mam_muc_rdbms_arch:prepare_insert(BatchName, MaxSize).
103
104 multi_name(Name, Times) ->
105
:-(
list_to_atom(atom_to_list(Name) ++ integer_to_list(Times)).
106
107 ensure_metrics(pm, HostType) ->
108
:-(
mongoose_metrics:ensure_metric(HostType, ?PM_PER_MESSAGE_FLUSH_TIME, histogram),
109
:-(
mongoose_metrics:ensure_metric(HostType, ?PM_FLUSH_TIME, histogram);
110 ensure_metrics(muc, HostType) ->
111
:-(
mongoose_metrics:ensure_metric(HostType, ?MUC_PER_MESSAGE_FLUSH_TIME, histogram),
112
:-(
mongoose_metrics:ensure_metric(HostType, ?MUC_FLUSH_TIME, histogram).
113
114 register_hooks(pm, HostType) ->
115
:-(
ejabberd_hooks:add(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50),
116
:-(
ejabberd_hooks:add(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50);
117 register_hooks(muc, HostType) ->
118
:-(
ejabberd_hooks:add(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, 50),
119
:-(
ejabberd_hooks:add(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50).
120
121 start_pool(pm, HostType, Opts) ->
122
:-(
mongoose_async_pools:start_pool(HostType, pm_mam, Opts);
123 start_pool(muc, HostType, Opts) ->
124
:-(
mongoose_async_pools:start_pool(HostType, muc_mam, Opts).
125
126 -spec stop_pool(mongooseim:host_type(), {writer_type(), term()}) -> ok.
127 stop_pool(HostType, {pm, _}) ->
128
:-(
ejabberd_hooks:delete(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50),
129
:-(
ejabberd_hooks:delete(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50),
130
:-(
mongoose_async_pools:stop_pool(HostType, pm_mam);
131 stop_pool(HostType, {muc, _}) ->
132
:-(
ejabberd_hooks:delete(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, 50),
133
:-(
ejabberd_hooks:delete(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50),
134
:-(
mongoose_async_pools:stop_pool(HostType, muc_mam).
135
136 %%% flush callbacks
137 flush_pm(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
138
:-(
{FlushTime, Result} = timer:tc(fun do_flush_pm/2, [Acc, Extra]),
139
:-(
mongoose_metrics:update(HostType, ?PM_PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
140
:-(
mongoose_metrics:update(HostType, ?PM_FLUSH_TIME, FlushTime),
141
:-(
Result.
142
143 flush_muc(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
144
:-(
{FlushTime, Result} = timer:tc(fun do_flush_muc/2, [Acc, Extra]),
145
:-(
mongoose_metrics:update(HostType, ?MUC_PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
146
:-(
mongoose_metrics:update(HostType, ?MUC_FLUSH_TIME, FlushTime),
147
:-(
Result.
148
149 %% mam workers callbacks
150 do_flush_pm(Acc, #{host_type := HostType, queue_length := MessageCount,
151 batch_size := MaxSize, batch_name := BatchName}) ->
152
:-(
Rows = [mod_mam_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc],
153
:-(
InsertResult =
154 case MessageCount of
155 MaxSize ->
156
:-(
mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows));
157 OtherSize ->
158
:-(
Results = [mongoose_rdbms:execute(HostType, insert_mam_message, Row) || Row <- Rows],
159
:-(
case lists:keyfind(error, 1, Results) of
160
:-(
false -> {updated, OtherSize};
161
:-(
Error -> Error
162 end
163 end,
164
:-(
case InsertResult of
165
:-(
{updated, _Count} -> ok;
166 {error, Reason} ->
167
:-(
mongoose_metrics:update(HostType, modMamDropped, MessageCount),
168
:-(
?LOG_ERROR(#{what => archive_message_failed,
169 text => <<"archive_message query failed">>,
170
:-(
message_count => MessageCount, reason => Reason}),
171
:-(
ok
172 end,
173
:-(
[mod_mam_rdbms_arch:retract_message(HostType, Params) || Params <- Acc],
174
:-(
mongoose_hooks:mam_flush_messages(HostType, MessageCount),
175
:-(
ok.
176
177 do_flush_muc(Acc, #{host_type := HostType, queue_length := MessageCount,
178 batch_size := MaxSize, batch_name := BatchName}) ->
179
:-(
Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc],
180
:-(
InsertResult =
181 case MessageCount of
182 MaxSize ->
183
:-(
mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows));
184 OtherSize ->
185
:-(
Results = [mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row) || Row <- Rows],
186
:-(
case lists:keyfind(error, 1, Results) of
187
:-(
false -> {updated, OtherSize};
188
:-(
Error -> Error
189 end
190 end,
191
:-(
case InsertResult of
192
:-(
{updated, _Count} -> ok;
193 {error, Reason} ->
194
:-(
mongoose_metrics:update(HostType, modMucMamDropped, MessageCount),
195
:-(
?LOG_ERROR(#{what => archive_message_query_failed,
196 text => <<"archive_message query failed, modMucMamDropped metric updated">>,
197
:-(
message_count => MessageCount, reason => Reason}),
198
:-(
ok
199 end,
200
:-(
[mod_mam_muc_rdbms_arch:retract_message(HostType, Params) || Params <- Acc],
201
:-(
mongoose_hooks:mam_muc_flush_messages(HostType, MessageCount),
202
:-(
ok.
Line Hits Source