1 |
|
-module(mod_mam_rdbms_arch_async). |
2 |
|
|
3 |
|
-include("mongoose_logger.hrl"). |
4 |
|
|
5 |
|
-define(PM_PER_MESSAGE_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, per_message_flush_time]). |
6 |
|
-define(PM_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, flush_time]). |
7 |
|
-define(MUC_PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]). |
8 |
|
-define(MUC_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]). |
9 |
|
|
10 |
|
-behaviour(gen_mod). |
11 |
|
-export([start/2, stop/1, config_spec/0, supported_features/0]). |
12 |
|
|
13 |
|
-export([archive_pm_message/3, archive_muc_message/3]). |
14 |
|
-export([mam_archive_sync/2, mam_muc_archive_sync/2]). |
15 |
|
-export([flush_pm/2, flush_muc/2]). |
16 |
|
|
17 |
|
-type writer_type() :: pm | muc. |
18 |
|
|
19 |
|
-ignore_xref([archive_pm_message/3, archive_muc_message/3]). |
20 |
|
-ignore_xref([mam_archive_sync/2, mam_muc_archive_sync/2]). |
21 |
|
|
22 |
|
%% Hook handler for mam_archive_message (added in register_hooks/2).
%% Hands the message parameters to the pm_mam async pool of HostType as a
%% {task, Params} cast; the worker is picked with the hash_worker strategy
%% keyed on the archive id.
%% The incoming hook accumulator is ignored. The cast is asynchronous, so the
%% only possible return value is ok; it says nothing about whether the
%% message was eventually written.
-spec archive_pm_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) ->
    ok.
archive_pm_message(_Result, HostType, Params = #{archive_id := ArcID}) ->
    PoolName = mongoose_async_pools:pool_name(HostType, pm_mam),
    wpool:cast(PoolName, {task, Params}, {hash_worker, ArcID}).
27 |
|
|
28 |
|
%% Hook handler for mam_muc_archive_message (added in register_hooks/2).
%% The parameters are first passed through
%% mod_mam_muc_rdbms_arch:extend_params_with_sender_id/2 and then cast as a
%% {task, Params} message to the muc_mam async pool, with the worker chosen
%% by the hash_worker strategy keyed on the room's archive id.
%% The incoming hook accumulator is ignored.
-spec archive_muc_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) ->
    ok.
archive_muc_message(_Result, HostType, #{archive_id := RoomID} = RawParams) ->
    Task = {task, mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, RawParams)},
    Pool = mongoose_async_pools:pool_name(HostType, muc_mam),
    wpool:cast(Pool, Task, {hash_worker, RoomID}).
34 |
|
|
35 |
|
%% Hook handler for mam_archive_sync (added in register_hooks/2).
%% Calls mongoose_async_pools:sync/2 for the pm_mam pool of HostType and
%% passes the hook accumulator through untouched.
%% NOTE(review): presumably sync/2 blocks until queued tasks are flushed - confirm.
-spec mam_archive_sync(term(), mongooseim:host_type()) -> term().
mam_archive_sync(HookAcc, HostType) ->
    mongoose_async_pools:sync(HostType, pm_mam),
    HookAcc.
39 |
|
|
40 |
|
%% Hook handler for mam_muc_archive_sync (added in register_hooks/2).
%% Calls mongoose_async_pools:sync/2 for the muc_mam pool of HostType and
%% passes the hook accumulator through untouched.
%% NOTE(review): presumably sync/2 blocks until queued tasks are flushed - confirm.
-spec mam_muc_archive_sync(term(), mongooseim:host_type()) -> term().
mam_muc_archive_sync(HookAcc, HostType) ->
    mongoose_async_pools:sync(HostType, muc_mam),
    HookAcc.
44 |
|
|
45 |
|
%%% gen_mod callbacks |
46 |
|
%% gen_mod start callback.
%% Opts is a list of {WriterType, WriterOpts} pairs; start_pool/2 is run once
%% per pair, in list order, and the list of its results is returned.
-spec start(mongooseim:host_type(), [{writer_type(), gen_mod:module_opts()}]) -> any().
start(HostType, Opts) ->
    [start_pool(HostType, WriterSpec) || WriterSpec <- Opts].
49 |
|
|
50 |
|
%% gen_mod stop callback.
%% Looks up the options this module is configured with for HostType and runs
%% stop_pool/2 for every {WriterType, WriterOpts} entry, in list order.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
    WriterSpecs = gen_mod:get_module_opts(HostType, ?MODULE),
    [stop_pool(HostType, WriterSpec) || WriterSpec <- WriterSpecs].
54 |
|
|
55 |
|
%% gen_mod config_spec callback.
%% This module defines no options of its own: the configuration section is
%% exactly the one returned by mongoose_async_pools:config_spec/0.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    mongoose_async_pools:config_spec().
58 |
|
|
59 |
|
%% gen_mod supported_features callback.
%% The only feature declared by this module is dynamic_domains.
-spec supported_features() -> [atom()].
supported_features() ->
    [dynamic_domains].
62 |
|
|
63 |
|
%%% internal callbacks |
64 |
|
%% Sets up one async writer (pm or muc) for HostType:
%%   1. builds the pool options and the flush "extra" map (extend_opts/2),
%%   2. prepares the single-row and batch insert statements,
%%   3. ensures the flush-time metrics exist,
%%   4. starts the worker pool,
%%   5. registers the archive/sync hook handlers.
%% Returns whatever start_pool/3 returned for the pool.
-spec start_pool(mongooseim:host_type(), {writer_type(), gen_mod:module_opts()}) ->
    supervisor:startchild_ret().
start_pool(HostType, {Type, Opts}) ->
    {Opts1, Extra} = extend_opts(Type, Opts),
    prepare_insert_queries(Type, Extra),
    ensure_metrics(Type, HostType),
    %% The pool must exist before the hooks are registered: the hook handlers
    %% cast straight to the pool, so registering them first leaves a window
    %% in which an archived message is sent to a pool that is not running yet.
    %% This mirrors stop_pool/2, which removes the hooks before stopping the pool.
    StartResult = start_pool(Type, HostType, Opts1),
    register_hooks(Type, HostType),
    StartResult.
72 |
|
|
73 |
|
%% Builds the options for one writer pool.
%% Returns {PoolOpts, Extra} where
%%   Extra    - defaults() overridden by the user supplied Opts, plus the
%%              batch_name key added by add_batch_name/2 (a map);
%%   PoolOpts - Extra converted to a list, with flush_extra set to Extra and
%%              flush_callback set by add_callback/2.
extend_opts(Type, Opts) ->
    Extra = add_batch_name(Type, maps:merge(defaults(), maps:from_list(Opts))),
    WithExtra = gen_mod:set_opt(flush_extra, maps:to_list(Extra), Extra),
    {add_callback(Type, WithExtra), Extra}.
79 |
|
|
80 |
|
%% Adds a batch_name key to the options map. The name of the batch insert
%% statement embeds batch_size, so the module can be restarted with a
%% different batch size without clashing with a statement prepared earlier
%% under the old size.
add_batch_name(pm, Opts = #{batch_size := BatchSize}) ->
    BatchName = multi_name(insert_mam_messages, BatchSize),
    Opts#{batch_name => BatchName};
add_batch_name(muc, Opts = #{batch_size := BatchSize}) ->
    BatchName = multi_name(insert_mam_muc_messages, BatchSize),
    Opts#{batch_name => BatchName}.
86 |
|
|
87 |
|
%% Sets the flush_callback option to the exported flush function matching
%% the writer type: flush_pm/2 for pm, flush_muc/2 for muc.
add_callback(pm, Opts) ->
    FlushFun = fun ?MODULE:flush_pm/2,
    gen_mod:set_opt(flush_callback, Opts, FlushFun);
add_callback(muc, Opts) ->
    FlushFun = fun ?MODULE:flush_muc/2,
    gen_mod:set_opt(flush_callback, Opts, FlushFun).
91 |
|
|
92 |
|
%% Default pool options, used for any key the configuration does not set:
%% flush_interval 2000, batch_size 30 and a pool_size of four workers per
%% online scheduler.
%% NOTE(review): flush_interval is presumably in milliseconds - confirm
%% against mongoose_async_pools.
defaults() ->
    OnlineSchedulers = erlang:system_info(schedulers_online),
    #{pool_size => 4 * OnlineSchedulers,
      batch_size => 30,
      flush_interval => 2000}.
96 |
|
|
97 |
|
%% Prepares the two insert statements a writer uses when flushing:
%% a single-row statement (insert_mam_message / insert_mam_muc_message)
%% and a batch statement named BatchName sized for batch_size rows.
prepare_insert_queries(pm, #{batch_name := BatchName, batch_size := BatchSize}) ->
    mod_mam_rdbms_arch:prepare_insert(insert_mam_message, 1),
    mod_mam_rdbms_arch:prepare_insert(BatchName, BatchSize);
prepare_insert_queries(muc, #{batch_name := BatchName, batch_size := BatchSize}) ->
    mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_message, 1),
    mod_mam_muc_rdbms_arch:prepare_insert(BatchName, BatchSize).
103 |
|
|
104 |
|
%% Builds a statement name by appending the decimal form of Times to the
%% atom Name, e.g. multi_name(insert_mam_messages, 30) -> insert_mam_messages30.
%% Both arguments come from module code and configuration, so creating an
%% atom here is bounded by the number of distinct batch sizes configured.
multi_name(Name, Times) ->
    Prefix = atom_to_list(Name),
    Suffix = integer_to_list(Times),
    list_to_atom(Prefix ++ Suffix).
106 |
|
|
107 |
|
%% Makes sure the two histogram metrics updated by the flush callbacks exist
%% for HostType: the per-message flush time and the total flush time of the
%% given writer type.
ensure_metrics(pm, HostType) ->
    PerMessageMetric = ?PM_PER_MESSAGE_FLUSH_TIME,
    TotalMetric = ?PM_FLUSH_TIME,
    mongoose_metrics:ensure_metric(HostType, PerMessageMetric, histogram),
    mongoose_metrics:ensure_metric(HostType, TotalMetric, histogram);
ensure_metrics(muc, HostType) ->
    PerMessageMetric = ?MUC_PER_MESSAGE_FLUSH_TIME,
    TotalMetric = ?MUC_FLUSH_TIME,
    mongoose_metrics:ensure_metric(HostType, PerMessageMetric, histogram),
    mongoose_metrics:ensure_metric(HostType, TotalMetric, histogram).
113 |
|
|
114 |
|
%% Registers this module's handlers for the sync and archive-message hooks
%% of the given writer type, both with priority 50. stop_pool/2 removes the
%% same handlers.
register_hooks(pm, HostType) ->
    Priority = 50,
    ejabberd_hooks:add(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, Priority),
    ejabberd_hooks:add(mam_archive_message, HostType, ?MODULE, archive_pm_message, Priority);
register_hooks(muc, HostType) ->
    Priority = 50,
    ejabberd_hooks:add(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, Priority),
    ejabberd_hooks:add(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, Priority).
120 |
|
|
121 |
|
%% Starts the async pool for a writer type, translating the writer type into
%% the pool id used with mongoose_async_pools: pm -> pm_mam, muc -> muc_mam.
start_pool(pm, HostType, PoolOpts) ->
    mongoose_async_pools:start_pool(HostType, pm_mam, PoolOpts);
start_pool(muc, HostType, PoolOpts) ->
    mongoose_async_pools:start_pool(HostType, muc_mam, PoolOpts).
125 |
|
|
126 |
|
%% Tears down one writer: first removes the hook handlers added by
%% register_hooks/2, so no new tasks are cast to the pool, then stops the
%% pool itself. The writer's options are not needed and are ignored.
-spec stop_pool(mongooseim:host_type(), {writer_type(), term()}) -> ok.
stop_pool(HostType, {pm, _WriterOpts}) ->
    Priority = 50,
    ejabberd_hooks:delete(mam_archive_message, HostType, ?MODULE, archive_pm_message, Priority),
    ejabberd_hooks:delete(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, Priority),
    mongoose_async_pools:stop_pool(HostType, pm_mam);
stop_pool(HostType, {muc, _WriterOpts}) ->
    Priority = 50,
    ejabberd_hooks:delete(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, Priority),
    ejabberd_hooks:delete(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, Priority),
    mongoose_async_pools:stop_pool(HostType, muc_mam).
135 |
|
|
136 |
|
%%% flush callbacks |
137 |
|
%% Flush callback of the pm_mam pool (installed by add_callback/2).
%% Runs do_flush_pm/2 under timer:tc/2 and records two histogram values:
%% the elapsed time divided by the number of queued messages (rounded) and
%% the total elapsed time, both in microseconds as reported by timer:tc/2.
%% Returns the result of do_flush_pm/2.
%% NOTE(review): a queue_length of 0 would raise badarith in the division;
%% presumably the pool never flushes an empty queue - confirm.
flush_pm(Acc, #{host_type := HostType, queue_length := MessageCount} = Extra) ->
    {ElapsedUs, FlushResult} = timer:tc(fun do_flush_pm/2, [Acc, Extra]),
    PerMessageUs = round(ElapsedUs / MessageCount),
    mongoose_metrics:update(HostType, ?PM_PER_MESSAGE_FLUSH_TIME, PerMessageUs),
    mongoose_metrics:update(HostType, ?PM_FLUSH_TIME, ElapsedUs),
    FlushResult.
142 |
|
|
143 |
|
%% Flush callback of the muc_mam pool (installed by add_callback/2).
%% Runs do_flush_muc/2 under timer:tc/2 and records two histogram values:
%% the elapsed time divided by the number of queued messages (rounded) and
%% the total elapsed time, both in microseconds as reported by timer:tc/2.
%% Returns the result of do_flush_muc/2.
%% NOTE(review): a queue_length of 0 would raise badarith in the division;
%% presumably the pool never flushes an empty queue - confirm.
flush_muc(Acc, #{host_type := HostType, queue_length := MessageCount} = Extra) ->
    {ElapsedUs, FlushResult} = timer:tc(fun do_flush_muc/2, [Acc, Extra]),
    PerMessageUs = round(ElapsedUs / MessageCount),
    mongoose_metrics:update(HostType, ?MUC_PER_MESSAGE_FLUSH_TIME, PerMessageUs),
    mongoose_metrics:update(HostType, ?MUC_FLUSH_TIME, ElapsedUs),
    FlushResult.
148 |
|
|
149 |
|
%% mam workers callbacks |
150 |
|
%% Writes the queued PM messages in Acc to the database.
%%
%% Each queued params map is turned into a row with
%% mod_mam_rdbms_arch:prepare_message/2. When the queue holds exactly
%% batch_size messages, all rows are sent in one execution of the prepared
%% batch statement; otherwise every row is inserted on its own with
%% insert_mam_message, and the first {error, _} result (if any) stands for
%% the whole flush.
%%
%% On failure the modMamDropped metric is bumped by the full message count
%% and the error is logged; the failure is not propagated. Regardless of the
%% insert outcome, mod_mam_rdbms_arch:retract_message/2 is then called for
%% every queued message and the mam_flush_messages hook is run.
%% Always returns ok.
do_flush_pm(Acc, #{host_type := HostType, queue_length := MessageCount,
                   batch_size := MaxSize, batch_name := BatchName}) ->
    Rows = [mod_mam_rdbms_arch:prepare_message(HostType, Msg) || Msg <- Acc],
    InsertResult =
        if
            MessageCount =:= MaxSize ->
                %% Full batch: the prepared batch statement takes all row
                %% values as one flat argument list.
                mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows));
            true ->
                RowResults = [mongoose_rdbms:execute(HostType, insert_mam_message, Row)
                              || Row <- Rows],
                case lists:keyfind(error, 1, RowResults) of
                    false -> {updated, MessageCount};
                    FirstError -> FirstError
                end
        end,
    case InsertResult of
        {updated, _Inserted} ->
            ok;
        {error, Reason} ->
            mongoose_metrics:update(HostType, modMamDropped, MessageCount),
            ?LOG_ERROR(#{what => archive_message_failed,
                         text => <<"archive_message query failed">>,
                         message_count => MessageCount, reason => Reason}),
            ok
    end,
    lists:foreach(fun(Msg) -> mod_mam_rdbms_arch:retract_message(HostType, Msg) end, Acc),
    mongoose_hooks:mam_flush_messages(HostType, MessageCount),
    ok.
176 |
|
|
177 |
|
%% Writes the queued MUC messages in Acc to the database.
%%
%% Each queued params map is turned into a row with
%% mod_mam_muc_rdbms_arch:prepare_message/2. When the queue holds exactly
%% batch_size messages, all rows are sent in one execution of the prepared
%% batch statement; otherwise every row is inserted on its own with
%% insert_mam_muc_message, and the first {error, _} result (if any) stands
%% for the whole flush.
%%
%% On failure the modMucMamDropped metric is bumped by the full message count
%% and the error is logged; the failure is not propagated. Regardless of the
%% insert outcome, mod_mam_muc_rdbms_arch:retract_message/2 is then called
%% for every queued message and the mam_muc_flush_messages hook is run.
%% Always returns ok.
do_flush_muc(Acc, #{host_type := HostType, queue_length := MessageCount,
                    batch_size := MaxSize, batch_name := BatchName}) ->
    Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Msg) || Msg <- Acc],
    InsertResult =
        if
            MessageCount =:= MaxSize ->
                %% Full batch: the prepared batch statement takes all row
                %% values as one flat argument list.
                mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows));
            true ->
                RowResults = [mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row)
                              || Row <- Rows],
                case lists:keyfind(error, 1, RowResults) of
                    false -> {updated, MessageCount};
                    FirstError -> FirstError
                end
        end,
    case InsertResult of
        {updated, _Inserted} ->
            ok;
        {error, Reason} ->
            mongoose_metrics:update(HostType, modMucMamDropped, MessageCount),
            ?LOG_ERROR(#{what => archive_message_query_failed,
                         text => <<"archive_message query failed, modMucMamDropped metric updated">>,
                         message_count => MessageCount, reason => Reason}),
            ok
    end,
    lists:foreach(fun(Msg) -> mod_mam_muc_rdbms_arch:retract_message(HostType, Msg) end, Acc),
    mongoose_hooks:mam_muc_flush_messages(HostType, MessageCount),
    ok.