./ct_report/coverage/mod_mam_rdbms_arch_async.COVER.html

1 -module(mod_mam_rdbms_arch_async).
2
3 -behaviour(mongoose_batch_worker).
4
5 -include("mongoose_logger.hrl").
6
7 -define(PER_MESSAGE_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, per_message_flush_time]).
8 -define(FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, flush_time]).
9
10 -behaviour(gen_mod).
11 -export([start/2, stop/1, supported_features/0]).
12 -export([archive_pm_message/3, mam_archive_sync/2, flush/2]).
13 -ignore_xref([archive_pm_message/3, mam_archive_sync/2]).
14
15 -export([make_pool_opts/2, prepare_insert_queries/2]).
16
17 -spec archive_pm_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> ok.
18 archive_pm_message(_Result, HostType, Params = #{archive_id := ArcID}) ->
19
:-(
mongoose_async_pools:put_task(HostType, pm_mam, ArcID, Params).
20
21 -spec mam_archive_sync(term(), mongooseim:host_type()) -> term().
22 mam_archive_sync(Result, HostType) ->
23
:-(
mongoose_async_pools:sync(HostType, pm_mam),
24
:-(
Result.
25
26 %%% gen_mod callbacks
27 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
28 start(HostType, Opts) ->
29
:-(
{PoolOpts, Extra} = make_pool_opts(pm, Opts),
30
:-(
prepare_insert_queries(pm, Extra),
31
:-(
mongoose_metrics:ensure_metric(HostType, ?PER_MESSAGE_FLUSH_TIME, histogram),
32
:-(
mongoose_metrics:ensure_metric(HostType, ?FLUSH_TIME, histogram),
33
:-(
ejabberd_hooks:add(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50),
34
:-(
ejabberd_hooks:add(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50),
35
:-(
mongoose_async_pools:start_pool(HostType, pm_mam, PoolOpts).
36
37 -spec stop(mongooseim:host_type()) -> any().
38 stop(HostType) ->
39
:-(
ejabberd_hooks:delete(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50),
40
:-(
ejabberd_hooks:delete(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50),
41
:-(
mongoose_async_pools:stop_pool(HostType, pm_mam).
42
43 -spec supported_features() -> [atom()].
44 supported_features() ->
45
:-(
[dynamic_domains].
46
47 %%% internal callbacks
48 -spec make_pool_opts(mod_mam_meta:mam_type(), gen_mod:module_opts()) ->
49 {mongoose_async_pools:pool_opts(), mongoose_async_pools:pool_extra()}.
50 make_pool_opts(Type, Opts) ->
51
:-(
Extra = add_batch_name(Type, Opts),
52
:-(
PoolOpts = Extra#{pool_type => batch,
53 flush_callback => flush_callback(Type),
54 flush_extra => Extra},
55
:-(
{PoolOpts, Extra}.
56
57 %% Put batch_size into a statement name, so we could survive the module restarts
58 %% with different batch sizes
59 add_batch_name(pm, #{batch_size := MaxSize} = Opts) ->
60
:-(
Opts#{batch_name => multi_name(insert_mam_messages, MaxSize)};
61 add_batch_name(muc, #{batch_size := MaxSize} = Opts) ->
62
:-(
Opts#{batch_name => multi_name(insert_mam_muc_messages, MaxSize)}.
63
64
:-(
flush_callback(pm) -> fun ?MODULE:flush/2;
65
:-(
flush_callback(muc) -> fun mod_mam_muc_rdbms_arch_async:flush/2.
66
67 prepare_insert_queries(pm, #{batch_size := MaxSize, batch_name := BatchName}) ->
68
:-(
mod_mam_rdbms_arch:prepare_insert(insert_mam_message, 1),
69
:-(
mod_mam_rdbms_arch:prepare_insert(BatchName, MaxSize);
70 prepare_insert_queries(muc, #{batch_size := MaxSize, batch_name := BatchName}) ->
71
:-(
mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_message, 1),
72
:-(
mod_mam_muc_rdbms_arch:prepare_insert(BatchName, MaxSize).
73
74 multi_name(Name, Times) ->
75
:-(
list_to_atom(atom_to_list(Name) ++ integer_to_list(Times)).
76
77 %%% flush callbacks
78 flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
79
:-(
{FlushTime, Result} = timer:tc(fun do_flush_pm/2, [Acc, Extra]),
80
:-(
mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
81
:-(
mongoose_metrics:update(HostType, ?FLUSH_TIME, FlushTime),
82
:-(
Result.
83
84 %% mam workers callbacks
85 do_flush_pm(Acc, #{host_type := HostType, queue_length := MessageCount,
86 batch_size := MaxSize, batch_name := BatchName}) ->
87
:-(
Rows = [mod_mam_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc],
88
:-(
InsertResult =
89 case MessageCount of
90 MaxSize ->
91
:-(
mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows));
92 OtherSize ->
93
:-(
Results = [mongoose_rdbms:execute(HostType, insert_mam_message, Row) || Row <- Rows],
94
:-(
case lists:keyfind(error, 1, Results) of
95
:-(
false -> {updated, OtherSize};
96
:-(
Error -> Error
97 end
98 end,
99
:-(
case InsertResult of
100
:-(
{updated, _Count} -> ok;
101 {error, Reason} ->
102
:-(
mongoose_metrics:update(HostType, modMamDropped, MessageCount),
103
:-(
?LOG_ERROR(#{what => archive_message_failed,
104 text => <<"archive_message query failed">>,
105
:-(
message_count => MessageCount, reason => Reason}),
106
:-(
ok
107 end,
108
:-(
[mod_mam_rdbms_arch:retract_message(HostType, Params) || Params <- Acc],
109
:-(
mongoose_hooks:mam_flush_messages(HostType, MessageCount),
110
:-(
ok.
Line Hits Source