1 |
|
-module(mod_mam_muc_rdbms_arch_async). |
2 |
|
|
3 |
|
-behaviour(mongoose_batch_worker). |
4 |
|
|
5 |
|
-include("mongoose_logger.hrl"). |
6 |
|
|
7 |
|
-define(PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]). |
8 |
|
-define(FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]). |
9 |
|
|
10 |
|
-behaviour(gen_mod). |
11 |
|
-export([start/2, stop/1, hooks/1, supported_features/0]). |
12 |
|
-export([archive_muc_message/3, mam_muc_archive_sync/3]). |
13 |
|
-export([flush/2]). |
14 |
|
-ignore_xref([flush/2]). |
15 |
|
|
16 |
|
%% @doc Hook handler for `mam_muc_archive_message'.
%%
%% Extends the incoming params via
%% `mod_mam_muc_rdbms_arch:extend_params_with_sender_id/2' and hands them to
%% the `muc_mam' pool of `mongoose_async_pools', keyed by the archive id.
%% The return value of `put_task/4' is ignored and the hook accumulator is
%% passed through unchanged.
-spec archive_muc_message(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: {ok, mod_mam:lookup_result()},
    Params :: map(),
    Extra :: gen_hook:extra().
archive_muc_message(Acc, Params = #{archive_id := ArcID}, #{host_type := HostType}) ->
    Task = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params),
    mongoose_async_pools:put_task(HostType, muc_mam, ArcID, Task),
    {ok, Acc}.
26 |
|
|
27 |
|
%% @doc Hook handler for `mam_muc_archive_sync'.
%%
%% Calls `mongoose_async_pools:sync/2' on the `muc_mam' pool of the given
%% host type, ignores its result and passes the accumulator through.
-spec mam_muc_archive_sync(Acc, Params, Extra) -> {ok, Acc} when
    Acc :: ok,
    Params :: map(),
    Extra :: gen_hook:extra().
mam_muc_archive_sync(Acc, _Params, #{host_type := HostType}) ->
    _ = mongoose_async_pools:sync(HostType, muc_mam),
    {ok, Acc}.
34 |
|
|
35 |
|
%%% gen_mod callbacks |
36 |
|
%% @doc gen_mod callback: prepares the MUC insert queries, makes sure both
%% flush-time histograms exist and starts the `muc_mam' async pool.
%% Returns whatever `mongoose_async_pools:start_pool/3' returns.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, Opts) ->
    {PoolOpts, QueryOpts} = mod_mam_rdbms_arch_async:make_pool_opts(muc, Opts),
    mod_mam_rdbms_arch_async:prepare_insert_queries(muc, QueryOpts),
    %% Same order as the metrics are updated in flush/2.
    lists:foreach(
      fun(Metric) ->
              mongoose_metrics:ensure_metric(HostType, Metric, histogram)
      end,
      [?PER_MESSAGE_FLUSH_TIME, ?FLUSH_TIME]),
    mongoose_async_pools:start_pool(HostType, muc_mam, PoolOpts).
43 |
|
|
44 |
|
%% @doc gen_mod callback: stops the `muc_mam' async pool of the host type.
%% Returns whatever `mongoose_async_pools:stop_pool/2' returns.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
    mongoose_async_pools:stop_pool(HostType, muc_mam).
47 |
|
|
48 |
|
%% @doc gen_mod callback: lists the hook handlers of this module.
%% Every handler is registered for `HostType' with an empty extra map and
%% priority 50.
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
    Handlers = [{mam_muc_archive_sync, fun ?MODULE:mam_muc_archive_sync/3},
                {mam_muc_archive_message, fun ?MODULE:archive_muc_message/3}],
    [{HookName, HostType, Handler, #{}, 50} || {HookName, Handler} <- Handlers].
54 |
|
|
55 |
|
%% @doc gen_mod callback: the only feature declared by this module is
%% `dynamic_domains'.
-spec supported_features() -> [atom()].
supported_features() ->
    [dynamic_domains].
58 |
|
|
59 |
|
%%% flush callbacks |
60 |
|
%% @doc Batch worker flush callback.
%%
%% Runs do_flush_muc/2 under `timer:tc/2' and reports the elapsed time
%% (microseconds) to two metrics: the total flush time and the flush time
%% divided by `queue_length', rounded to an integer. Returns the result of
%% do_flush_muc/2.
flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
    {ElapsedUs, FlushResult} = timer:tc(fun do_flush_muc/2, [Acc, Extra]),
    PerMessageUs = round(ElapsedUs / MessageCount),
    mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, PerMessageUs),
    mongoose_metrics:update(HostType, ?FLUSH_TIME, ElapsedUs),
    FlushResult.
65 |
|
|
66 |
|
%% mam workers callbacks |
67 |
|
%% Writes the queued messages to the database and always returns `ok'.
%%
%% Each queued params map is turned into a row with
%% `mod_mam_muc_rdbms_arch:prepare_message/2'. When the queue length equals
%% the configured batch size, all rows are concatenated and executed as a
%% single `BatchName' statement; otherwise every row is executed separately
%% with `insert_mam_muc_message'. Insert results are only inspected by the
%% process_* helpers. Afterwards `retract_message/2' is called for every
%% queued message and the `mam_muc_flush_messages' hook is run.
do_flush_muc(Acc, #{host_type := HostType, queue_length := MessageCount,
                    batch_size := MaxSize, batch_name := BatchName}) ->
    Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc],
    case MessageCount of
        MaxSize ->
            %% Full buffer: one multi-row insert.
            BatchResult = mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows)),
            process_batch_result(BatchResult, Acc, HostType, MessageCount);
        _ ->
            %% Partial buffer: run all single inserts first, then check them.
            InsertResults = lists:map(
                              fun(Row) ->
                                      mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row)
                              end,
                              Rows),
            process_list_results(lists:zip(InsertResults, Acc), HostType)
    end,
    lists:foreach(
      fun(Params) ->
              mod_mam_muc_rdbms_arch:retract_message(HostType, Params)
      end,
      Acc),
    mongoose_hooks:mam_muc_flush_messages(HostType, MessageCount),
    ok.
83 |
|
|
84 |
|
%% Handles the result of a batch insert; always returns `ok'.
%%
%% `{updated, _}' needs no action. On `{error, Reason}' the
%% `modMucMamDropped' metric is increased by the number of messages in the
%% batch and an error is logged with the `message_id'/`archive_id' of every
%% message.
process_batch_result({updated, _Count}, _Messages, _HostType, _MessageCount) ->
    ok;
process_batch_result({error, Reason}, Messages, HostType, MessageCount) ->
    mongoose_metrics:update(HostType, modMucMamDropped, MessageCount),
    Keys = lists:map(
             fun(Message) -> maps:with([message_id, archive_id], Message) end,
             Messages),
    ?LOG_ERROR(#{what => archive_muc_batch_messages_failed,
                 text => <<"A batch of muc messages failed to archive, modMucMamDropped metric updated">>,
                 keys => Keys, message_count => MessageCount, reason => Reason}),
    ok.
93 |
|
|
94 |
|
%% Checks every `{InsertResult, Params}' pair in order with
%% process_single_result/2 and returns `ok'.
process_list_results(Results, HostType) ->
    _ = [process_single_result(Pair, HostType) || Pair <- Results],
    ok.
96 |
|
|
97 |
|
%% Handles the result of one single-row insert paired with its params.
%%
%% `{updated, _}' needs no action. On `{error, Reason}' the
%% `modMucMamDropped' metric is increased by one and an error is logged
%% with the message id and archive id taken from the params.
process_single_result({{updated, _Count}, _Params}, _HostType) ->
    ok;
process_single_result({{error, Reason}, Params}, HostType)
  when is_map_key(message_id, Params), is_map_key(archive_id, Params) ->
    #{message_id := MessageId, archive_id := ArchiveId} = Params,
    mongoose_metrics:update(HostType, modMucMamDropped, 1),
    ?LOG_ERROR(#{what => archive_muc_single_message_failed,
                 text => <<"Single muc message failed to archive, modMucMamDropped metric updated">>,
                 message_id => MessageId, archive_id => ArchiveId, reason => Reason}).