./ct_report/coverage/mod_mam_muc_rdbms_arch_async.COVER.html

1 -module(mod_mam_muc_rdbms_arch_async).
2
3 -behaviour(mongoose_batch_worker).
4
5 -include("mongoose_logger.hrl").
6
7 -define(PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]).
8 -define(FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]).
9
10 -behaviour(gen_mod).
11 -export([start/2, stop/1, supported_features/0]).
12 -export([archive_muc_message/3, mam_muc_archive_sync/2, flush/2]).
13 -ignore_xref([archive_muc_message/3, mam_muc_archive_sync/2, flush/2]).
14
15 -spec archive_muc_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> ok.
16 archive_muc_message(_Result, HostType, Params0 = #{archive_id := RoomID}) ->
17 416 Params = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params0),
18 416 mongoose_async_pools:put_task(HostType, muc_mam, RoomID, Params).
19
20 -spec mam_muc_archive_sync(term(), mongooseim:host_type()) -> term().
21 mam_muc_archive_sync(Result, HostType) ->
22 58 mongoose_async_pools:sync(HostType, muc_mam),
23 58 Result.
24
25 %%% gen_mod callbacks
26 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
27 start(HostType, Opts) ->
28 18 {PoolOpts, Extra} = mod_mam_rdbms_arch_async:make_pool_opts(muc, Opts),
29 18 mod_mam_rdbms_arch_async:prepare_insert_queries(muc, Extra),
30 18 mongoose_metrics:ensure_metric(HostType, ?PER_MESSAGE_FLUSH_TIME, histogram),
31 18 mongoose_metrics:ensure_metric(HostType, ?FLUSH_TIME, histogram),
32 18 ejabberd_hooks:add(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, 50),
33 18 ejabberd_hooks:add(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50),
34 18 mongoose_async_pools:start_pool(HostType, muc_mam, PoolOpts).
35
36 -spec stop(mongooseim:host_type()) -> any().
37 stop(HostType) ->
38 18 ejabberd_hooks:delete(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, 50),
39 18 ejabberd_hooks:delete(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50),
40 18 mongoose_async_pools:stop_pool(HostType, muc_mam).
41
42 -spec supported_features() -> [atom()].
43 supported_features() ->
44 18 [dynamic_domains].
45
46 %%% flush callbacks
47 flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) ->
48 172 {FlushTime, Result} = timer:tc(fun do_flush_muc/2, [Acc, Extra]),
49 172 mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)),
50 172 mongoose_metrics:update(HostType, ?FLUSH_TIME, FlushTime),
51 172 Result.
52
53 %% mam workers callbacks
54 do_flush_muc(Acc, #{host_type := HostType, queue_length := MessageCount,
55 batch_size := MaxSize, batch_name := BatchName}) ->
56 172 Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc],
57 172 InsertResult =
58 case MessageCount of
59 MaxSize ->
60
:-(
mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows));
61 OtherSize ->
62 172 Results = [mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row) || Row <- Rows],
63 172 case lists:keyfind(error, 1, Results) of
64 172 false -> {updated, OtherSize};
65
:-(
Error -> Error
66 end
67 end,
68 172 case InsertResult of
69 172 {updated, _Count} -> ok;
70 {error, Reason} ->
71
:-(
mongoose_metrics:update(HostType, modMucMamDropped, MessageCount),
72
:-(
?LOG_ERROR(#{what => archive_message_query_failed,
73 text => <<"archive_message query failed, modMucMamDropped metric updated">>,
74
:-(
message_count => MessageCount, reason => Reason}),
75
:-(
ok
76 end,
77 172 [mod_mam_muc_rdbms_arch:retract_message(HostType, Params) || Params <- Acc],
78 172 mongoose_hooks:mam_muc_flush_messages(HostType, MessageCount),
79 172 ok.
Line Hits Source