./ct_report/coverage/mod_mam_rdbms_arch_async.COVER.html

1 -module(mod_mam_rdbms_arch_async).
2
3 -behaviour(mongoose_batch_worker).
4
5 -include("mongoose_logger.hrl").
6
7 -behaviour(gen_mod).
8
9 -export([start/2, stop/1, hooks/1, instrumentation/1, supported_features/0]).
10 -export([archive_pm_message/3, mam_archive_sync/3]).
11 -export([flush/2]).
12
13 -export([make_pool_opts/2, prepare_insert_queries/2]).
14
%% Handler registered in hooks/1 for the `mam_archive_message' hook.
%% Passes the archive id and the full params map to
%% mongoose_async_pools:put_task/4 for the `pm_mam' pool of HostType.
%% The incoming accumulator is ignored; whatever put_task/4 returns becomes
%% the new accumulator.
-spec archive_pm_message(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok,
      Params :: mod_mam:archive_message_params(),
      Extra :: gen_hook:extra().
archive_pm_message(_Acc, Params = #{archive_id := ArchiveId}, #{host_type := HostType}) ->
    TaskResult = mongoose_async_pools:put_task(HostType, pm_mam, ArchiveId, Params),
    {ok, TaskResult}.
21
%% Handler registered in hooks/1 for the `mam_archive_sync' hook.
%% Calls mongoose_async_pools:sync/2 on the `pm_mam' pool of HostType, ignores
%% its return value and hands the accumulator back unchanged.
-spec mam_archive_sync(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok,
      Params :: map(),
      Extra :: gen_hook:extra().
mam_archive_sync(Acc, _Params, #{host_type := HostType}) ->
    mongoose_async_pools:sync(HostType, pm_mam),
    {ok, Acc}.
29
30 %%% gen_mod callbacks
%% gen_mod start callback: builds the pool options for the `pm' archive type,
%% prepares the insert statements and then starts the `pm_mam' async pool for
%% HostType. Returns whatever mongoose_async_pools:start_pool/3 returns.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, Opts) ->
    {PoolOpts, QueryOpts} = make_pool_opts(pm, Opts),
    prepare_insert_queries(pm, QueryOpts),
    mongoose_async_pools:start_pool(HostType, pm_mam, PoolOpts).
36
%% gen_mod stop callback: stops the `pm_mam' async pool of HostType and
%% returns the result of mongoose_async_pools:stop_pool/2.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
    PoolId = pm_mam,
    mongoose_async_pools:stop_pool(HostType, PoolId).
40
%% Hook handlers provided by this module for HostType. Each entry has the
%% shape {HookName, HostType, Handler, #{}, 50}; the sync handler is listed
%% before the archive handler.
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
    Handlers = [{mam_archive_sync, fun ?MODULE:mam_archive_sync/3},
                {mam_archive_message, fun ?MODULE:archive_pm_message/3}],
    [{HookName, HostType, Handler, #{}, 50} || {HookName, Handler} <- Handlers].
47
%% Instrumentation events declared by this module: a single
%% `mod_mam_pm_flushed' event labelled with HostType, carrying the `time' and
%% `time_per_message' histograms and the `count' spiral.
-spec instrumentation(mongooseim:host_type()) -> [mongoose_instrument:spec()].
instrumentation(HostType) ->
    Labels = #{host_type => HostType},
    Metrics = #{count => spiral,
                time => histogram,
                time_per_message => histogram},
    [{mod_mam_pm_flushed, Labels, #{metrics => Metrics}}].
52
%% Features this module declares support for: only `dynamic_domains'.
-spec supported_features() -> [atom()].
supported_features() ->
    Features = [dynamic_domains],
    Features.
56
57 %%% internal callbacks
%% Builds the options for an async batch pool of the given archive type.
%% Extra is Opts with `batch_name' added (see add_batch_name/2). PoolOpts is
%% Extra plus `pool_type', `flush_callback' and `flush_extra'; these three
%% keys override any same-named keys already present in Opts.
%% Returns {PoolOpts, Extra}.
-spec make_pool_opts(mod_mam:mam_type(), gen_mod:module_opts()) ->
          {mongoose_async_pools:pool_opts(), mongoose_async_pools:pool_extra()}.
make_pool_opts(Type, Opts) ->
    Extra = add_batch_name(Type, Opts),
    BatchSettings = #{pool_type => batch,
                      flush_callback => flush_callback(Type),
                      flush_extra => Extra},
    %% Keys of the second map win, exactly like the Extra#{...} update syntax.
    {maps:merge(Extra, BatchSettings), Extra}.
66
%% Put batch_size into a statement name, so we could survive the module restarts
%% with different batch sizes.
%% Returns Opts with `batch_name' set to the multi-row statement base name for
%% the archive type followed by the batch size. Only the `pm' and `muc' types
%% are accepted, and Opts must contain `batch_size'.
add_batch_name(Type, Opts = #{batch_size := BatchSize}) when Type =:= pm; Type =:= muc ->
    Opts#{batch_name => multi_name(batch_statement_base(Type), BatchSize)}.

%% Base name of the multi-row insert statement for an archive type.
batch_statement_base(pm) -> insert_mam_messages;
batch_statement_base(muc) -> insert_mam_muc_messages.
73
%% Flush callback for an archive type: this module's flush/2 for `pm', and
%% mod_mam_muc_rdbms_arch_async:flush/2 for `muc'.
flush_callback(muc) ->
    fun mod_mam_muc_rdbms_arch_async:flush/2;
flush_callback(pm) ->
    fun ?MODULE:flush/2.
76
%% Prepares the two insert statements used when flushing an archive type:
%% the single-row statement (prepared with 1) and the batch statement named
%% BatchName (prepared with the configured batch size). `pm' uses
%% mod_mam_rdbms_arch, `muc' uses mod_mam_muc_rdbms_arch. The result of the
%% second prepare_insert/2 call is returned.
prepare_insert_queries(pm, #{batch_name := BatchName, batch_size := BatchSize}) ->
    mod_mam_rdbms_arch:prepare_insert(insert_mam_message, 1),
    mod_mam_rdbms_arch:prepare_insert(BatchName, BatchSize);
prepare_insert_queries(muc, #{batch_name := BatchName, batch_size := BatchSize}) ->
    mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_message, 1),
    mod_mam_muc_rdbms_arch:prepare_insert(BatchName, BatchSize).
83
%% Builds an atom from the atom Name followed by the decimal digits of the
%% integer Times, e.g. multi_name(insert_mam_messages, 30) gives
%% insert_mam_messages30.
multi_name(Name, Times) ->
    Prefix = atom_to_list(Name),
    Suffix = integer_to_list(Times),
    list_to_atom(lists:append(Prefix, Suffix)).
86
87 %%% flush callbacks
%% Flush callback for the `pm' pool. Runs do_flush_pm/2 on [Acc, Extra] inside
%% mongoose_instrument:span/5 for the `mod_mam_pm_flushed' event labelled with
%% HostType. The measurement reports the span time, the time divided by
%% queue_length (rounded) and queue_length itself as `count'.
flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
    Labels = #{host_type => HostType},
    ToMeasurements =
        fun(Time, _FlushResult) ->
                #{time => Time,
                  time_per_message => round(Time / MessageCount),
                  count => MessageCount}
        end,
    mongoose_instrument:span(mod_mam_pm_flushed, Labels,
                             fun do_flush_pm/2, [Acc, Extra],
                             ToMeasurements).
96
97 %% mam workers callbacks
%% Writes the queued messages in Acc to the database and always returns ok.
%%
%% Every params map is first turned into a row with
%% mod_mam_rdbms_arch:prepare_message/2. When queue_length equals batch_size,
%% all rows are concatenated and executed as one BatchName statement;
%% otherwise every row is executed with the single-row `insert_mam_message'
%% statement, and only after all of them have run are the results checked one
%% by one. Failures are handled by process_batch_result/4 and
%% process_list_results/2 and do not stop the flush.
%%
%% After the inserts, retract_message/2 is called for every params map and
%% the mam_flush_messages hook is run with the message count.
do_flush_pm(Acc, #{host_type := HostType, queue_length := MessageCount,
                   batch_size := MaxSize, batch_name := BatchName}) ->
    Rows = lists:map(fun(Params) ->
                             mod_mam_rdbms_arch:prepare_message(HostType, Params)
                     end, Acc),
    if
        MessageCount =:= MaxSize ->
            %% Full buffer: one multi-row insert.
            BatchResult = mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows)),
            process_batch_result(BatchResult, Acc, HostType, MessageCount);
        true ->
            %% Partial buffer: one insert per row, results paired with params.
            RowResults = lists:map(fun(Row) ->
                                           mongoose_rdbms:execute(HostType, insert_mam_message, Row)
                                   end, Rows),
            process_list_results(lists:zip(RowResults, Acc), HostType)
    end,
    lists:foreach(fun(Params) ->
                          mod_mam_rdbms_arch:retract_message(HostType, Params)
                  end, Acc),
    mongoose_hooks:mam_flush_messages(HostType, MessageCount),
    ok.
114
%% Handles the result of the multi-row insert and always returns ok.
%% {updated, _} needs no action. On {error, Reason} the
%% `mod_mam_pm_dropped' event is executed with the whole message count, and
%% an error is logged with the message_id/archive_id pairs taken from the
%% params maps given as the second argument.
process_batch_result({updated, _Count}, _ParamsList, _HostType, _MessageCount) ->
    ok;
process_batch_result({error, Reason}, ParamsList, HostType, MessageCount) ->
    mongoose_instrument:execute(mod_mam_pm_dropped,
                                #{host_type => HostType},
                                #{count => MessageCount}),
    Keys = lists:map(fun(Params) ->
                             maps:with([message_id, archive_id], Params)
                     end, ParamsList),
    ?LOG_ERROR(#{what => archive_message_failed,
                 text => <<"archive_message batch query failed">>,
                 keys => Keys,
                 message_count => MessageCount,
                 reason => Reason}),
    ok.
124
%% Applies process_single_result/2 to every {Result, Params} pair, in list
%% order, and returns ok.
process_list_results(Results, HostType) ->
    HandlePair = fun(ResultAndParams) ->
                         process_single_result(ResultAndParams, HostType)
                 end,
    lists:foreach(HandlePair, Results).
127
%% Handles one {InsertResult, Params} pair from the row-by-row insert path.
%% {updated, _} needs no action. On {error, Reason} the
%% `mod_mam_pm_dropped' event is executed with a count of 1, and an error is
%% logged with the message id and archive id of the affected message.
process_single_result({{updated, _Count}, _Params}, _HostType) ->
    ok;
process_single_result({{error, Reason}, #{archive_id := ArchiveId, message_id := MessageId}},
                      HostType) ->
    Labels = #{host_type => HostType},
    mongoose_instrument:execute(mod_mam_pm_dropped, Labels, #{count => 1}),
    ?LOG_ERROR(#{what => archive_message_failed,
                 text => <<"archive_message batch query failed">>,
                 message_id => MessageId,
                 archive_id => ArchiveId,
                 reason => Reason}).
Line Hits Source