./ct_report/coverage/mod_mam_muc_rdbms_arch_async.COVER.html

1 -module(mod_mam_muc_rdbms_arch_async).
2
3 -behaviour(mongoose_batch_worker).
4
5 -include("mongoose_logger.hrl").
6
7 -define(PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]).
8 -define(FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]).
9
10 -behaviour(gen_mod).
11 -export([start/2, stop/1, hooks/1, supported_features/0]).
12 -export([archive_muc_message/3, mam_muc_archive_sync/3]).
13 -export([flush/2]).
14 -ignore_xref([flush/2]).
15
16 -spec archive_muc_message(Acc, Params, Extra) -> {ok, Acc} when
17 Acc :: {ok, mod_mam:lookup_result()},
18 Params :: map(),
19 Extra :: gen_hook:extra().
20 archive_muc_message(Result,
21 #{archive_id := RoomID} = Params0,
22 #{host_type := HostType}) ->
23 424 Params = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params0),
24 424 mongoose_async_pools:put_task(HostType, muc_mam, RoomID, Params),
25 424 {ok, Result}.
26
27 -spec mam_muc_archive_sync(Acc, Params, Extra) -> {ok, Acc} when
28 Acc :: ok,
29 Params :: map(),
30 Extra :: gen_hook:extra().
31 mam_muc_archive_sync(Result, _Params, #{host_type := HostType}) ->
32 116 mongoose_async_pools:sync(HostType, muc_mam),
33 116 {ok, Result}.
34
35 %%% gen_mod callbacks
36 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
37 start(HostType, Opts) ->
38 23 {PoolOpts, Extra} = mod_mam_rdbms_arch_async:make_pool_opts(muc, Opts),
39 23 mod_mam_rdbms_arch_async:prepare_insert_queries(muc, Extra),
40 23 mongoose_metrics:ensure_metric(HostType, ?PER_MESSAGE_FLUSH_TIME, histogram),
41 23 mongoose_metrics:ensure_metric(HostType, ?FLUSH_TIME, histogram),
42 23 mongoose_async_pools:start_pool(HostType, muc_mam, PoolOpts).
43
44 -spec stop(mongooseim:host_type()) -> any().
45 stop(HostType) ->
46 23 mongoose_async_pools:stop_pool(HostType, muc_mam).
47
48 -spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
49 hooks(HostType) ->
50 46 [
51 {mam_muc_archive_sync, HostType, fun ?MODULE:mam_muc_archive_sync/3, #{}, 50},
52 {mam_muc_archive_message, HostType, fun ?MODULE:archive_muc_message/3, #{}, 50}
53 ].
54
55 -spec supported_features() -> [atom()].
56 supported_features() ->
57
:-(
[dynamic_domains].
58
59 %%% flush callbacks
60 flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
61 221 {FlushTime, Result} = timer:tc(fun do_flush_muc/2, [Acc, Extra]),
62 221 mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
63 221 mongoose_metrics:update(HostType, ?FLUSH_TIME, FlushTime),
64 221 Result.
65
66 %% mam workers callbacks
67 do_flush_muc(Acc, #{host_type := HostType, queue_length := MessageCount,
68 batch_size := MaxSize, batch_name := BatchName}) ->
69 221 Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc],
70 221 IsFullBuffer = MessageCount =:= MaxSize,
71 221 case IsFullBuffer of
72 true ->
73
:-(
Result = mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows)),
74
:-(
process_batch_result(Result, Acc, HostType, MessageCount);
75 _ ->
76 221 Results = [mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row) || Row <- Rows],
77 221 Process = lists:zip(Results, Acc),
78 221 process_list_results(Process, HostType)
79 end,
80 221 [mod_mam_muc_rdbms_arch:retract_message(HostType, Params) || Params <- Acc],
81 221 mongoose_hooks:mam_muc_flush_messages(HostType, MessageCount),
82 221 ok.
83
84 process_batch_result({updated, _Count}, _, _, _) ->
85
:-(
ok;
86 process_batch_result({error, Reason}, Rows, HostType, MessageCount) ->
87
:-(
mongoose_metrics:update(HostType, modMucMamDropped, MessageCount),
88
:-(
Keys = [ maps:with([message_id, archive_id], Row) || Row <- Rows ],
89
:-(
?LOG_ERROR(#{what => archive_muc_batch_messages_failed,
90 text => <<"A batch of muc messages failed to archive, modMucMamDropped metric updated">>,
91
:-(
keys => Keys, message_count => MessageCount, reason => Reason}),
92
:-(
ok.
93
94 process_list_results(Results, HostType) ->
95 221 lists:foreach(fun(R) -> process_single_result(R, HostType) end, Results).
96
97 process_single_result({{updated, _Count}, _}, _HostType) ->
98 424 ok;
99 process_single_result({{error, Reason}, #{message_id := MsgId, archive_id := ArcId}}, HostType) ->
100
:-(
mongoose_metrics:update(HostType, modMucMamDropped, 1),
101
:-(
?LOG_ERROR(#{what => archive_muc_single_message_failed,
102 text => <<"Single muc message failed to archive, modMucMamDropped metric updated">>,
103
:-(
message_id => MsgId, archive_id => ArcId, reason => Reason}).
Line Hits Source