./ct_report/coverage/mod_mam_muc_rdbms_arch_async.COVER.html

1 -module(mod_mam_muc_rdbms_arch_async).
2
3 -behaviour(mongoose_batch_worker).
4
5 -include("mongoose_logger.hrl").
6
7 -behaviour(gen_mod).
8 -export([start/2, stop/1, hooks/1, instrumentation/1, supported_features/0]).
9 -export([archive_muc_message/3, mam_muc_archive_sync/3]).
10 -export([flush/2]).
11 -ignore_xref([flush/2]).
12
13 -spec archive_muc_message(Acc, Params, Extra) -> {ok, Acc} when
14 Acc :: {ok, mod_mam:lookup_result()},
15 Params :: map(),
16 Extra :: gen_hook:extra().
17 archive_muc_message(Result,
18 #{archive_id := RoomID} = Params0,
19 #{host_type := HostType}) ->
20 722 Params = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params0),
21 722 mongoose_async_pools:put_task(HostType, muc_mam, RoomID, Params),
22 722 {ok, Result}.
23
24 -spec mam_muc_archive_sync(Acc, Params, Extra) -> {ok, Acc} when
25 Acc :: ok,
26 Params :: map(),
27 Extra :: gen_hook:extra().
28 mam_muc_archive_sync(Result, _Params, #{host_type := HostType}) ->
29 164 mongoose_async_pools:sync(HostType, muc_mam),
30 164 {ok, Result}.
31
32 %%% gen_mod callbacks
33 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
34 start(HostType, Opts) ->
35 23 {PoolOpts, Extra} = mod_mam_rdbms_arch_async:make_pool_opts(muc, Opts),
36 23 mod_mam_rdbms_arch_async:prepare_insert_queries(muc, Extra),
37 23 mongoose_async_pools:start_pool(HostType, muc_mam, PoolOpts).
38
39 -spec stop(mongooseim:host_type()) -> any().
40 stop(HostType) ->
41 23 mongoose_async_pools:stop_pool(HostType, muc_mam).
42
43 -spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
44 hooks(HostType) ->
45 46 [
46 {mam_muc_archive_sync, HostType, fun ?MODULE:mam_muc_archive_sync/3, #{}, 50},
47 {mam_muc_archive_message, HostType, fun ?MODULE:archive_muc_message/3, #{}, 50}
48 ].
49
50 -spec instrumentation(mongooseim:host_type()) -> [mongoose_instrument:spec()].
51 instrumentation(HostType) ->
52 47 [{mod_mam_muc_flushed, #{host_type => HostType},
53 #{metrics => #{time_per_message => histogram, time => histogram, count => spiral}}}].
54
55 -spec supported_features() -> [atom()].
56 supported_features() ->
57
:-(
[dynamic_domains].
58
59 %%% flush callbacks
60 flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
61 410 mongoose_instrument:span(mod_mam_muc_flushed, #{host_type => HostType},
62 fun do_flush_muc/2, [Acc, Extra],
63 fun(Time, _Result) ->
64 410 #{time => Time,
65 time_per_message => round(Time / MessageCount),
66 count => MessageCount}
67 end).
68
69 %% mam workers callbacks
70 do_flush_muc(Acc, #{host_type := HostType, queue_length := MessageCount,
71 batch_size := MaxSize, batch_name := BatchName}) ->
72 410 Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc],
73 410 IsFullBuffer = MessageCount =:= MaxSize,
74 410 case IsFullBuffer of
75 true ->
76
:-(
Result = mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows)),
77
:-(
process_batch_result(Result, Acc, HostType, MessageCount);
78 _ ->
79 410 Results = [mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row) || Row <- Rows],
80 410 Process = lists:zip(Results, Acc),
81 410 process_list_results(Process, HostType)
82 end,
83 410 [mod_mam_muc_rdbms_arch:retract_message(HostType, Params) || Params <- Acc],
84 410 mongoose_hooks:mam_muc_flush_messages(HostType, MessageCount),
85 410 ok.
86
87 process_batch_result({updated, _Count}, _, _, _) ->
88
:-(
ok;
89 process_batch_result({error, Reason}, Rows, HostType, MessageCount) ->
90
:-(
mongoose_instrument:execute(mod_mam_muc_dropped, #{host_type => HostType}, #{count => MessageCount}),
91
:-(
Keys = [ maps:with([message_id, archive_id], Row) || Row <- Rows ],
92
:-(
?LOG_ERROR(#{what => archive_muc_batch_messages_failed,
93 text => <<"A batch of muc messages failed to archive, modMucMamDropped metric updated">>,
94
:-(
keys => Keys, message_count => MessageCount, reason => Reason}),
95
:-(
ok.
96
97 process_list_results(Results, HostType) ->
98 410 lists:foreach(fun(R) -> process_single_result(R, HostType) end, Results).
99
100 process_single_result({{updated, _Count}, _}, _HostType) ->
101 720 ok;
102 process_single_result({{error, Reason}, #{message_id := MsgId, archive_id := ArcId}}, HostType) ->
103 2 mongoose_instrument:execute(mod_mam_muc_dropped, #{host_type => HostType}, #{count => 1}),
104 2 ?LOG_ERROR(#{what => archive_muc_single_message_failed,
105 text => <<"Single muc message failed to archive, modMucMamDropped metric updated">>,
106
:-(
message_id => MsgId, archive_id => ArcId, reason => Reason}).
Line Hits Source