./ct_report/coverage/mod_mam_muc_rdbms_arch_async.COVER.html

1 -module(mod_mam_muc_rdbms_arch_async).
2
3 -behaviour(mongoose_batch_worker).
4
5 -include("mongoose_logger.hrl").
6
7 -define(PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]).
8 -define(FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]).
9
10 -behaviour(gen_mod).
11 -export([start/2, stop/1, supported_features/0]).
12 -export([archive_muc_message/3, mam_muc_archive_sync/2, flush/2]).
13 -ignore_xref([archive_muc_message/3, mam_muc_archive_sync/2, flush/2]).
14
%% @doc Handler registered by start/2 on the `mam_muc_archive_message' hook.
%% Extends the incoming params via
%% mod_mam_muc_rdbms_arch:extend_params_with_sender_id/2 and hands them to the
%% `muc_mam' async pool together with the room's archive id.
%% The incoming hook accumulator is ignored.
-spec archive_muc_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> ok.
archive_muc_message(_Result, HostType, #{archive_id := RoomID} = Params0) ->
    ExtendedParams = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params0),
    mongoose_async_pools:put_task(HostType, muc_mam, RoomID, ExtendedParams).
19
%% @doc Handler registered by start/2 on the `mam_muc_archive_sync' hook.
%% Calls mongoose_async_pools:sync/2 for the `muc_mam' pool of the host type
%% and then passes the hook accumulator through unchanged.
-spec mam_muc_archive_sync(term(), mongooseim:host_type()) -> term().
mam_muc_archive_sync(Result, HostType) ->
    %% The return value of sync/2 is intentionally not inspected here.
    mongoose_async_pools:sync(HostType, muc_mam),
    Result.
24
25 %%% gen_mod callbacks
%% @doc gen_mod start callback.
%% Builds the pool options, prepares the insert queries, makes sure both
%% flush-time histograms exist, registers the two MUC MAM hook handlers
%% (priority 50) and finally starts the `muc_mam' async pool.
%% Returns whatever mongoose_async_pools:start_pool/3 returns.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, Opts) ->
    {PoolOpts, Extra} = mod_mam_rdbms_arch_async:make_pool_opts(muc, Opts),
    mod_mam_rdbms_arch_async:prepare_insert_queries(muc, Extra),
    %% Metrics are ensured in the same order as before: per-message first.
    lists:foreach(
      fun(Metric) ->
              mongoose_metrics:ensure_metric(HostType, Metric, histogram)
      end,
      [?PER_MESSAGE_FLUSH_TIME, ?FLUSH_TIME]),
    %% {HookName, HandlerFunction} pairs, all handled by this module.
    lists:foreach(
      fun({Hook, Handler}) ->
              ejabberd_hooks:add(Hook, HostType, ?MODULE, Handler, 50)
      end,
      [{mam_muc_archive_sync, mam_muc_archive_sync},
       {mam_muc_archive_message, archive_muc_message}]),
    mongoose_async_pools:start_pool(HostType, muc_mam, PoolOpts).
35
%% @doc gen_mod stop callback.
%% Removes the two hook handlers that start/2 registered (same priority, 50)
%% and then stops the `muc_mam' async pool for the host type.
%% Returns whatever mongoose_async_pools:stop_pool/2 returns.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
    %% {HookName, HandlerFunction} pairs, removed in the order they were added.
    lists:foreach(
      fun({Hook, Handler}) ->
              ejabberd_hooks:delete(Hook, HostType, ?MODULE, Handler, 50)
      end,
      [{mam_muc_archive_sync, mam_muc_archive_sync},
       {mam_muc_archive_message, archive_muc_message}]),
    mongoose_async_pools:stop_pool(HostType, muc_mam).
41
%% @doc gen_mod callback listing the features this module declares support for.
%% The only declared feature is `dynamic_domains'.
-spec supported_features() -> [atom()].
supported_features() ->
    [dynamic_domains].
45
46 %%% flush callbacks
%% Flush callback: runs do_flush_muc/2 on the accumulated messages, timing it
%% with timer:tc, and records two histograms for the host type: the total
%% flush time and the flush time divided by the queue length (rounded).
%% Returns the result of do_flush_muc/2.
flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
    {ElapsedMicros, FlushResult} = timer:tc(fun() -> do_flush_muc(Acc, Extra) end),
    %% NOTE(review): this division assumes queue_length is never 0 - confirm
    %% against the batch worker that invokes this callback.
    PerMessageMicros = round(ElapsedMicros / MessageCount),
    mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, PerMessageMicros),
    mongoose_metrics:update(HostType, ?FLUSH_TIME, ElapsedMicros),
    FlushResult.
52
53 %% mam workers callbacks
%% Writes the accumulated messages to the database and always returns `ok'.
%%
%% Each queued params map is turned into a row with
%% mod_mam_muc_rdbms_arch:prepare_message/2; the rows are inserted by
%% insert_rows/5. On `{error, Reason}' the `modMucMamDropped' metric is bumped
%% by the whole message count and the failure is logged; the flush still
%% carries on. Afterwards retract_message/2 is called for every queued params
%% map and the `mam_muc_flush_messages' hook is run with the message count.
do_flush_muc(Acc, #{host_type := HostType, queue_length := MessageCount,
                    batch_size := MaxSize, batch_name := BatchName}) ->
    Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc],
    InsertResult = insert_rows(HostType, BatchName, Rows, MessageCount, MaxSize),
    case InsertResult of
        {updated, _Count} ->
            ok;
        {error, Reason} ->
            mongoose_metrics:update(HostType, modMucMamDropped, MessageCount),
            ?LOG_ERROR(#{what => archive_message_query_failed,
                         text => <<"archive_message query failed, modMucMamDropped metric updated">>,
                         message_count => MessageCount, reason => Reason}),
            ok
    end,
    lists:foreach(
      fun(Params) ->
              mod_mam_muc_rdbms_arch:retract_message(HostType, Params)
      end,
      Acc),
    mongoose_hooks:mam_muc_flush_messages(HostType, MessageCount),
    ok.

%% Chooses the insert strategy by comparing the message count with the batch
%% size (the repeated variable in the first clause head is an equality match).
%% A full batch goes through the batch query in a single call with all rows
%% appended together; any other count is inserted row by row with
%% `insert_mam_muc_message', yielding the first `error'-tagged result if there
%% is one and `{updated, MessageCount}' otherwise.
insert_rows(HostType, BatchName, Rows, MaxSize, MaxSize) ->
    mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows));
insert_rows(HostType, _BatchName, Rows, MessageCount, _MaxSize) ->
    Results = [mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row) || Row <- Rows],
    case lists:keyfind(error, 1, Results) of
        false -> {updated, MessageCount};
        Error -> Error
    end.
Line Hits Source