1 |
|
-module(mod_mam_muc_rdbms_arch_async). |
2 |
|
|
3 |
|
-behaviour(mongoose_batch_worker). |
4 |
|
|
5 |
|
-include("mongoose_logger.hrl"). |
6 |
|
|
7 |
|
-define(PER_MESSAGE_FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, per_message_flush_time]). |
8 |
|
-define(FLUSH_TIME, [mod_mam_muc_rdbms_async_pool_writer, flush_time]). |
9 |
|
|
10 |
|
-behaviour(gen_mod). |
11 |
|
-export([start/2, stop/1, supported_features/0]). |
12 |
|
-export([archive_muc_message/3, mam_muc_archive_sync/2, flush/2]). |
13 |
|
-ignore_xref([archive_muc_message/3, mam_muc_archive_sync/2, flush/2]). |
14 |
|
|
15 |
|
-spec archive_muc_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> ok. |
16 |
|
archive_muc_message(_Result, HostType, Params0 = #{archive_id := RoomID}) -> |
17 |
416 |
Params = mod_mam_muc_rdbms_arch:extend_params_with_sender_id(HostType, Params0), |
18 |
416 |
mongoose_async_pools:put_task(HostType, muc_mam, RoomID, Params). |
19 |
|
|
20 |
|
-spec mam_muc_archive_sync(term(), mongooseim:host_type()) -> term(). |
21 |
|
mam_muc_archive_sync(Result, HostType) -> |
22 |
58 |
mongoose_async_pools:sync(HostType, muc_mam), |
23 |
58 |
Result. |
24 |
|
|
25 |
|
%%% gen_mod callbacks |
26 |
|
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any(). |
27 |
|
start(HostType, Opts) -> |
28 |
18 |
{PoolOpts, Extra} = mod_mam_rdbms_arch_async:make_pool_opts(muc, Opts), |
29 |
18 |
mod_mam_rdbms_arch_async:prepare_insert_queries(muc, Extra), |
30 |
18 |
mongoose_metrics:ensure_metric(HostType, ?PER_MESSAGE_FLUSH_TIME, histogram), |
31 |
18 |
mongoose_metrics:ensure_metric(HostType, ?FLUSH_TIME, histogram), |
32 |
18 |
ejabberd_hooks:add(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, 50), |
33 |
18 |
ejabberd_hooks:add(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50), |
34 |
18 |
mongoose_async_pools:start_pool(HostType, muc_mam, PoolOpts). |
35 |
|
|
36 |
|
-spec stop(mongooseim:host_type()) -> any(). |
37 |
|
stop(HostType) -> |
38 |
18 |
ejabberd_hooks:delete(mam_muc_archive_sync, HostType, ?MODULE, mam_muc_archive_sync, 50), |
39 |
18 |
ejabberd_hooks:delete(mam_muc_archive_message, HostType, ?MODULE, archive_muc_message, 50), |
40 |
18 |
mongoose_async_pools:stop_pool(HostType, muc_mam). |
41 |
|
|
42 |
|
-spec supported_features() -> [atom()]. |
43 |
|
supported_features() -> |
44 |
18 |
[dynamic_domains]. |
45 |
|
|
46 |
|
%%% flush callbacks |
47 |
|
flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) -> |
48 |
172 |
{FlushTime, Result} = timer:tc(fun do_flush_muc/2, [Acc, Extra]), |
49 |
172 |
mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)), |
50 |
172 |
mongoose_metrics:update(HostType, ?FLUSH_TIME, FlushTime), |
51 |
172 |
Result. |
52 |
|
|
53 |
|
%% mam workers callbacks |
54 |
|
do_flush_muc(Acc, #{host_type := HostType, queue_length := MessageCount, |
55 |
|
batch_size := MaxSize, batch_name := BatchName}) -> |
56 |
172 |
Rows = [mod_mam_muc_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc], |
57 |
172 |
InsertResult = |
58 |
|
case MessageCount of |
59 |
|
MaxSize -> |
60 |
:-( |
mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows)); |
61 |
|
OtherSize -> |
62 |
172 |
Results = [mongoose_rdbms:execute(HostType, insert_mam_muc_message, Row) || Row <- Rows], |
63 |
172 |
case lists:keyfind(error, 1, Results) of |
64 |
172 |
false -> {updated, OtherSize}; |
65 |
:-( |
Error -> Error |
66 |
|
end |
67 |
|
end, |
68 |
172 |
case InsertResult of |
69 |
172 |
{updated, _Count} -> ok; |
70 |
|
{error, Reason} -> |
71 |
:-( |
mongoose_metrics:update(HostType, modMucMamDropped, MessageCount), |
72 |
:-( |
?LOG_ERROR(#{what => archive_message_query_failed, |
73 |
|
text => <<"archive_message query failed, modMucMamDropped metric updated">>, |
74 |
:-( |
message_count => MessageCount, reason => Reason}), |
75 |
:-( |
ok |
76 |
|
end, |
77 |
172 |
[mod_mam_muc_rdbms_arch:retract_message(HostType, Params) || Params <- Acc], |
78 |
172 |
mongoose_hooks:mam_muc_flush_messages(HostType, MessageCount), |
79 |
172 |
ok. |