1 |
|
-module(mod_mam_rdbms_arch_async). |
2 |
|
|
3 |
|
-behaviour(mongoose_batch_worker). |
4 |
|
|
5 |
|
-include("mongoose_logger.hrl"). |
6 |
|
|
7 |
|
-define(PER_MESSAGE_FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, per_message_flush_time]). |
8 |
|
-define(FLUSH_TIME, [mod_mam_rdbms_async_pool_writer, flush_time]). |
9 |
|
|
10 |
|
-behaviour(gen_mod). |
11 |
|
-export([start/2, stop/1, supported_features/0]). |
12 |
|
-export([archive_pm_message/3, mam_archive_sync/2, flush/2]). |
13 |
|
-ignore_xref([archive_pm_message/3, mam_archive_sync/2]). |
14 |
|
|
15 |
|
-export([make_pool_opts/2, prepare_insert_queries/2]). |
16 |
|
|
17 |
|
-spec archive_pm_message(_Result, mongooseim:host_type(), mod_mam:archive_message_params()) -> ok. |
18 |
|
archive_pm_message(_Result, HostType, Params = #{archive_id := ArcID}) -> |
19 |
:-( |
mongoose_async_pools:put_task(HostType, pm_mam, ArcID, Params). |
20 |
|
|
21 |
|
-spec mam_archive_sync(term(), mongooseim:host_type()) -> term(). |
22 |
|
mam_archive_sync(Result, HostType) -> |
23 |
:-( |
mongoose_async_pools:sync(HostType, pm_mam), |
24 |
:-( |
Result. |
25 |
|
|
26 |
|
%%% gen_mod callbacks |
27 |
|
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any(). |
28 |
|
start(HostType, Opts) -> |
29 |
:-( |
{PoolOpts, Extra} = make_pool_opts(pm, Opts), |
30 |
:-( |
prepare_insert_queries(pm, Extra), |
31 |
:-( |
mongoose_metrics:ensure_metric(HostType, ?PER_MESSAGE_FLUSH_TIME, histogram), |
32 |
:-( |
mongoose_metrics:ensure_metric(HostType, ?FLUSH_TIME, histogram), |
33 |
:-( |
ejabberd_hooks:add(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50), |
34 |
:-( |
ejabberd_hooks:add(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50), |
35 |
:-( |
mongoose_async_pools:start_pool(HostType, pm_mam, PoolOpts). |
36 |
|
|
37 |
|
-spec stop(mongooseim:host_type()) -> any(). |
38 |
|
stop(HostType) -> |
39 |
:-( |
ejabberd_hooks:delete(mam_archive_message, HostType, ?MODULE, archive_pm_message, 50), |
40 |
:-( |
ejabberd_hooks:delete(mam_archive_sync, HostType, ?MODULE, mam_archive_sync, 50), |
41 |
:-( |
mongoose_async_pools:stop_pool(HostType, pm_mam). |
42 |
|
|
43 |
|
-spec supported_features() -> [atom()]. |
44 |
|
supported_features() -> |
45 |
:-( |
[dynamic_domains]. |
46 |
|
|
47 |
|
%%% internal callbacks |
48 |
|
-spec make_pool_opts(mod_mam:mam_type(), gen_mod:module_opts()) -> |
49 |
|
{mongoose_async_pools:pool_opts(), mongoose_async_pools:pool_extra()}. |
50 |
|
make_pool_opts(Type, Opts) -> |
51 |
:-( |
Extra = add_batch_name(Type, Opts), |
52 |
:-( |
PoolOpts = Extra#{pool_type => batch, |
53 |
|
flush_callback => flush_callback(Type), |
54 |
|
flush_extra => Extra}, |
55 |
:-( |
{PoolOpts, Extra}. |
56 |
|
|
57 |
|
%% Put batch_size into a statement name, so we could survive the module restarts |
58 |
|
%% with different batch sizes |
59 |
|
add_batch_name(pm, #{batch_size := MaxSize} = Opts) -> |
60 |
:-( |
Opts#{batch_name => multi_name(insert_mam_messages, MaxSize)}; |
61 |
|
add_batch_name(muc, #{batch_size := MaxSize} = Opts) -> |
62 |
:-( |
Opts#{batch_name => multi_name(insert_mam_muc_messages, MaxSize)}. |
63 |
|
|
64 |
:-( |
flush_callback(pm) -> fun ?MODULE:flush/2; |
65 |
:-( |
flush_callback(muc) -> fun mod_mam_muc_rdbms_arch_async:flush/2. |
66 |
|
|
67 |
|
prepare_insert_queries(pm, #{batch_size := MaxSize, batch_name := BatchName}) -> |
68 |
:-( |
mod_mam_rdbms_arch:prepare_insert(insert_mam_message, 1), |
69 |
:-( |
mod_mam_rdbms_arch:prepare_insert(BatchName, MaxSize); |
70 |
|
prepare_insert_queries(muc, #{batch_size := MaxSize, batch_name := BatchName}) -> |
71 |
:-( |
mod_mam_muc_rdbms_arch:prepare_insert(insert_mam_muc_message, 1), |
72 |
:-( |
mod_mam_muc_rdbms_arch:prepare_insert(BatchName, MaxSize). |
73 |
|
|
74 |
|
multi_name(Name, Times) -> |
75 |
:-( |
list_to_atom(atom_to_list(Name) ++ integer_to_list(Times)). |
76 |
|
|
77 |
|
%%% flush callbacks |
78 |
|
flush(Acc, Extra = #{host_type := HostType, queue_length := MessageCount}) -> |
79 |
:-( |
{FlushTime, Result} = timer:tc(fun do_flush_pm/2, [Acc, Extra]), |
80 |
:-( |
mongoose_metrics:update(HostType, ?PER_MESSAGE_FLUSH_TIME, round(FlushTime / MessageCount)), |
81 |
:-( |
mongoose_metrics:update(HostType, ?FLUSH_TIME, FlushTime), |
82 |
:-( |
Result. |
83 |
|
|
84 |
|
%% mam workers callbacks |
85 |
|
do_flush_pm(Acc, #{host_type := HostType, queue_length := MessageCount, |
86 |
|
batch_size := MaxSize, batch_name := BatchName}) -> |
87 |
:-( |
Rows = [mod_mam_rdbms_arch:prepare_message(HostType, Params) || Params <- Acc], |
88 |
:-( |
InsertResult = |
89 |
|
case MessageCount of |
90 |
|
MaxSize -> |
91 |
:-( |
mongoose_rdbms:execute(HostType, BatchName, lists:append(Rows)); |
92 |
|
OtherSize -> |
93 |
:-( |
Results = [mongoose_rdbms:execute(HostType, insert_mam_message, Row) || Row <- Rows], |
94 |
:-( |
case lists:keyfind(error, 1, Results) of |
95 |
:-( |
false -> {updated, OtherSize}; |
96 |
:-( |
Error -> Error |
97 |
|
end |
98 |
|
end, |
99 |
:-( |
case InsertResult of |
100 |
:-( |
{updated, _Count} -> ok; |
101 |
|
{error, Reason} -> |
102 |
:-( |
mongoose_metrics:update(HostType, modMamDropped, MessageCount), |
103 |
:-( |
?LOG_ERROR(#{what => archive_message_failed, |
104 |
|
text => <<"archive_message query failed">>, |
105 |
:-( |
message_count => MessageCount, reason => Reason}), |
106 |
:-( |
ok |
107 |
|
end, |
108 |
:-( |
[mod_mam_rdbms_arch:retract_message(HostType, Params) || Params <- Acc], |
109 |
:-( |
mongoose_hooks:mam_flush_messages(HostType, MessageCount), |
110 |
:-( |
ok. |