./ct_report/coverage/mod_mam_rdbms_arch_async.COVER.html

1 -module(mod_mam_rdbms_arch_async).
2
3 -behaviour(mongoose_batch_worker).
4
5 -include("mongoose_logger.hrl").
6
7 -define(PER_MESSAGE_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, per_message_flush_time]).
8 -define(FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, flush_time]).
9
10 -behaviour(gen_mod).
11
12 -export([start/2, stop/1, hooks/1, supported_features/0]).
13 -export([archive_pm_message/3, mam_archive_sync/3]).
14 -export([flush/2]).
15
16 -export([make_pool_opts/2, prepare_insert_queries/2]).
17
%% Handler registered for the `mam_archive_message' hook (see hooks/1).
%% Hands the message parameters to mongoose_async_pools:put_task/4 for the
%% `pm_mam' pool of the host type, passing the archive id as the task key,
%% and returns whatever put_task/4 returned as the new accumulator.
-spec archive_pm_message(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok,
      Params :: mod_mam:archive_message_params(),
      Extra :: gen_hook:extra().
archive_pm_message(_Acc, Params = #{archive_id := ArchiveKey}, #{host_type := HostType}) ->
    Queued = mongoose_async_pools:put_task(HostType, pm_mam, ArchiveKey, Params),
    {ok, Queued}.
24
%% Handler registered for the `mam_archive_sync' hook (see hooks/1).
%% Calls mongoose_async_pools:sync/2 on the `pm_mam' pool of the host type
%% and then passes the incoming accumulator through untouched.
-spec mam_archive_sync(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: ok,
      Params :: map(),
      Extra :: gen_hook:extra().
mam_archive_sync(Acc, _Params, #{host_type := HostType}) ->
    mongoose_async_pools:sync(HostType, pm_mam),
    {ok, Acc}.
32
33 %%% gen_mod callbacks
%% gen_mod start callback.
%% Builds the pool options for the `pm' archive, prepares the insert
%% statements, makes sure both flush-time histograms exist for the host type
%% and finally starts the `pm_mam' async pool.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, Opts) ->
    {PoolOpts, Extra} = make_pool_opts(pm, Opts),
    prepare_insert_queries(pm, Extra),
    %% Same registration order as before: per-message histogram first.
    lists:foreach(
      fun(Metric) ->
              mongoose_metrics:ensure_metric(HostType, Metric, histogram)
      end,
      [?PER_MESSAGE_FLUSH_TIME, ?FLUSH_TIME]),
    mongoose_async_pools:start_pool(HostType, pm_mam, PoolOpts).
41
%% gen_mod stop callback: stops the `pm_mam' async pool of the host type.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) -> mongoose_async_pools:stop_pool(HostType, pm_mam).
45
%% Hook handlers installed by this module for the given host type.
%% Every entry uses an empty extra map and priority 50.
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
    Handlers = [{mam_archive_sync, fun ?MODULE:mam_archive_sync/3},
                {mam_archive_message, fun ?MODULE:archive_pm_message/3}],
    [{HookName, HostType, Handler, #{}, 50} || {HookName, Handler} <- Handlers].
52
%% gen_mod callback: the only feature this module declares is `dynamic_domains'.
-spec supported_features() -> [atom()].
supported_features() -> [dynamic_domains].
56
57 %%% internal callbacks
%% Builds the async pool options for the given MAM type.
%% Returns `{PoolOpts, Extra}' where `Extra' is the module options extended
%% with `batch_name', and `PoolOpts' is `Extra' plus the `pool_type',
%% `flush_callback' and `flush_extra' keys (these win over any same-named
%% keys already present in the options).
-spec make_pool_opts(mod_mam:mam_type(), gen_mod:module_opts()) ->
          {mongoose_async_pools:pool_opts(), mongoose_async_pools:pool_extra()}.
make_pool_opts(Type, Opts) ->
    Extra = add_batch_name(Type, Opts),
    PoolSettings = #{pool_type => batch,
                     flush_callback => flush_callback(Type),
                     flush_extra => Extra},
    {maps:merge(Extra, PoolSettings), Extra}.
66
%% Stores under `batch_name' a statement name that embeds the configured
%% batch size, so that restarting the module with a different batch size
%% results in a different statement name.
add_batch_name(pm, Opts = #{batch_size := BatchSize}) ->
    maps:put(batch_name, multi_name(insert_mam_messages, BatchSize), Opts);
add_batch_name(muc, Opts = #{batch_size := BatchSize}) ->
    maps:put(batch_name, multi_name(insert_mam_muc_messages, BatchSize), Opts).
73
%% Selects the flush callback for a MAM type: this module's flush/2 for `pm',
%% the MUC async module's flush/2 for `muc'.
flush_callback(pm) ->
    fun ?MODULE:flush/2;
flush_callback(muc) ->
    fun mod_mam_muc_rdbms_arch_async:flush/2.
76
%% Prepares the insert statements for a MAM type: first the single-row
%% statement, then the batch statement named `batch_name' for `batch_size'
%% rows. The result of the second prepare_insert/2 call is returned.
prepare_insert_queries(pm, #{batch_size := BatchSize, batch_name := BatchStatement}) ->
    mod_mam_rdbms_arch:prepare_insert(insert_mam_message, 1),
    mod_mam_rdbms_arch:prepare_insert(BatchStatement, BatchSize);
prepare_insert_queries(muc, #{batch_size := BatchSize, batch_name := BatchStatement}) ->
    mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_message, 1),
    mod_mam_muc_rdbms_arch:prepare_insert(BatchStatement, BatchSize).
83
%% Builds an atom by appending the decimal form of Times to Name,
%% e.g. multi_name(insert_mam_messages, 30) -> insert_mam_messages30.
multi_name(Name, Times) ->
    Prefix = atom_to_list(Name),
    Suffix = integer_to_list(Times),
    list_to_atom(Prefix ++ Suffix).
86
87 %%% flush callbacks
%% Flush callback for the `pm' pool.
%% Times do_flush_pm/2 with timer:tc (which reports microseconds), records
%% both the total time and the time divided by `queue_length' in the
%% corresponding histograms, and returns the result of do_flush_pm/2.
flush(Acc, #{host_type := HostType, queue_length := MessageCount} = Extra) ->
    {ElapsedUs, FlushResult} = timer:tc(fun() -> do_flush_pm(Acc, Extra) end),
    PerMessageUs = round(ElapsedUs / MessageCount),
    mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, PerMessageUs),
    mongoose_metrics:update(HostType, ?FLUSH_TIME, ElapsedUs),
    FlushResult.
93
94 %% mam workers callbacks
%% Writes the accumulated messages to the database.
%% When the queue holds exactly `batch_size' messages, all rows go through
%% the prepared batch statement in a single execute call; otherwise every row
%% is inserted on its own with `insert_mam_message'. Failures are reported by
%% process_batch_result/4 or process_list_results/2 and do not abort the
%% flush. Afterwards retract_message/2 is called for every message and the
%% `mam_flush_messages' hook is run with the message count. Always returns ok.
do_flush_pm(Acc, #{host_type := HostType, queue_length := MessageCount,
                   batch_size := MaxSize, batch_name := BatchName}) ->
    Rows = lists:map(
             fun(Params) -> mod_mam_rdbms_arch:prepare_message(HostType, Params) end,
             Acc),
    if
        MessageCount =:= MaxSize ->
            BatchResult = mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows)),
            process_batch_result(BatchResult, Acc, HostType, MessageCount);
        true ->
            %% All single-row inserts run before any result is inspected.
            RowResults = lists:map(
                           fun(Row) ->
                                   mongoose_rdbms:execute(HostType, insert_mam_message, Row)
                           end,
                           Rows),
            process_list_results(lists:zip(RowResults, Acc), HostType)
    end,
    lists:foreach(
      fun(Params) -> mod_mam_rdbms_arch:retract_message(HostType, Params) end,
      Acc),
    mongoose_hooks:mam_flush_messages(HostType, MessageCount),
    ok.
111
%% Handles the outcome of a batch insert.
%% `{updated, _}' needs no action. On `{error, Reason}' the `modMamDropped'
%% metric is increased by the number of messages in the batch and an error
%% is logged with the message and archive ids of every message. Returns ok
%% in both cases.
process_batch_result({updated, _RowCount}, _Messages, _HostType, _MessageCount) ->
    ok;
process_batch_result({error, Reason}, Messages, HostType, MessageCount) ->
    mongoose_metrics:update(HostType, modMamDropped, MessageCount),
    Keys = lists:map(
             fun(Message) -> maps:with([message_id, archive_id], Message) end,
             Messages),
    ?LOG_ERROR(#{what => archive_message_failed,
                 text => <<"archive_message batch query failed">>,
                 keys => Keys, message_count => MessageCount, reason => Reason}),
    ok.
121
%% Runs process_single_result/2 over every `{Result, Params}' pair, in order.
process_list_results(Results, HostType) ->
    HandleOne = fun(Outcome) -> process_single_result(Outcome, HostType) end,
    lists:foreach(HandleOne, Results).
124
%% Handles the outcome of one single-row insert, given as a
%% `{Result, Params}' pair.
%% `{updated, _}' needs no action. On `{error, Reason}' the `modMamDropped'
%% metric is increased by one and an error is logged with the message id and
%% archive id taken from Params.
%% The log text says "query failed" rather than "batch query failed": this
%% path is only reached for rows inserted one by one, and the wording keeps
%% it distinguishable from the batch failure logged by process_batch_result/4.
process_single_result({{updated, _Count}, _Params}, _HostType) ->
    ok;
process_single_result({{error, Reason}, #{message_id := MsgId, archive_id := ArcId}}, HostType) ->
    mongoose_metrics:update(HostType, modMamDropped, 1),
    ?LOG_ERROR(#{what => archive_message_failed,
                 text => <<"archive_message query failed">>,
                 message_id => MsgId, archive_id => ArcId, reason => Reason}).
Line Hits Source