./ct_report/coverage/mongoose_async_pools.COVER.html

1 -module(mongoose_async_pools).
2
3 -include("mongoose_config_spec.hrl").
4 -include("mongoose_logger.hrl").
5
6 -behaviour(supervisor).
7 -export([start_link/3, init/1]).
8 -ignore_xref([start_link/3]).
9
10 % API
11 -export([start_pool/3, stop_pool/2, pool_name/2, config_spec/0]).
12 -export([sync/2]).
13
14 -type pool_id() :: atom().
15 -type pool_name() :: atom().
16 -type pool_opts() :: gen_mod:module_opts().
17 -type pool_extra() :: #{host_type := mongooseim:host_type(),
18 queue_length := non_neg_integer(),
19 _ => _}.
20
21 -export_type([pool_id/0, pool_extra/0]).
22
23 %%% API functions
24 -spec start_pool(mongooseim:host_type(), pool_id(), pool_opts()) ->
25 supervisor:startchild_ret().
26 start_pool(HostType, PoolId, Opts) ->
27
:-(
?LOG_INFO(#{what => async_pool_starting, host_type => HostType, pool_id => PoolId}),
28
:-(
Supervisor = sup_name(HostType, PoolId),
29
:-(
ChildSpec = #{id => Supervisor,
30 start => {?MODULE, start_link, [HostType, PoolId, Opts]},
31 restart => transient,
32 type => supervisor},
33
:-(
ejabberd_sup:start_child(ChildSpec).
34
35 -spec stop_pool(mongooseim:host_type(), pool_id()) -> ok.
36 stop_pool(HostType, PoolId) ->
37
:-(
?LOG_INFO(#{what => async_pool_stopping, host_type => HostType, pool_id => PoolId}),
38
:-(
ejabberd_sup:stop_child(sup_name(HostType, PoolId)).
39
40 -spec config_spec() -> mongoose_config_spec:config_section().
41 config_spec() ->
42 160 #section{
43 items = #{<<"enabled">> => #option{type = boolean},
44 <<"flush_interval">> => #option{type = integer, validate = non_negative},
45 <<"batch_size">> => #option{type = integer, validate = non_negative},
46 <<"pool_size">> => #option{type = integer, validate = non_negative}}
47 }.
48
49 -spec pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
50 pool_name(HostType, PoolId) ->
51
:-(
persistent_term:get({?MODULE, HostType, PoolId}).
52
53 -spec sync(mongooseim:host_type(), pool_id()) -> term().
54 sync(HostType, PoolId) ->
55
:-(
Pids = get_workers(HostType, PoolId),
56
:-(
Context = #{what => sync_failed, host_type => HostType, pool_id => PoolId},
57
:-(
F = fun(Pid) ->
58
:-(
safely:apply_and_log(gen_server, call, [Pid, sync], Context)
59 end,
60
:-(
Results = mongoose_lib:pmap(F, Pids),
61
:-(
check_results(Results).
62
63 check_results(Results) ->
64
:-(
[check_result(Result) || Result <- Results].
65
66
:-(
check_result({ok, ok}) -> ok;
67
:-(
check_result({ok, skipped}) -> ok;
68
:-(
check_result(Other) -> ?LOG_ERROR(#{what => sync_failed, reason => Other}).
69
70 -spec get_workers(mongooseim:host_type(), pool_id()) -> [atom()].
71 get_workers(HostType, PoolId) ->
72
:-(
Pool = pool_name(HostType, PoolId),
73
:-(
wpool:get_workers(Pool).
74
75 %%% Supervisor callbacks
76 -spec start_link(mongooseim:host_type(), pool_id(), pool_opts()) ->
77 {ok, pid()} | ignore | {error, term()}.
78 start_link(HostType, PoolId, Opts) ->
79
:-(
Supervisor = sup_name(HostType, PoolId),
80
:-(
supervisor:start_link({local, Supervisor}, ?MODULE, [HostType, PoolId, Opts]).
81
82 -spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
83 init([HostType, PoolId, Opts]) ->
84
:-(
PoolName = gen_pool_name(HostType, PoolId),
85
:-(
mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, timed_flushes], counter),
86
:-(
mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, batch_flushes], counter),
87
:-(
store_pool_name(HostType, PoolId, PoolName),
88
:-(
WPoolOpts = make_wpool_opts(HostType, PoolId, Opts),
89
:-(
WorkerSpec = #{id => PoolName,
90 start => {wpool, start_pool, [PoolName, WPoolOpts]},
91 restart => permanent,
92 type => supervisor},
93
:-(
SupFlags = #{strategy => one_for_one,
94 intensity => 1,
95 period => 5},
96
:-(
{ok, {SupFlags, [WorkerSpec]}}.
97
98 %%% internal callbacks
99 -spec sup_name(mongooseim:host_type(), pool_id()) -> atom().
100 sup_name(HostType, PoolId) ->
101
:-(
list_to_atom(
102 atom_to_list(PoolId) ++ "_sup_async_pool_" ++ binary_to_list(HostType)).
103
104 -spec store_pool_name(mongooseim:host_type(), pool_id(), pool_name()) -> ok.
105 store_pool_name(HostType, PoolId, PoolName) ->
106
:-(
persistent_term:put({?MODULE, HostType, PoolId}, PoolName).
107
108 -spec gen_pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
109 gen_pool_name(HostType, PoolId) ->
110
:-(
list_to_atom(
111 atom_to_list(PoolId) ++ "_async_pool_" ++ binary_to_list(HostType)).
112
113 -spec make_wpool_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> any().
114 make_wpool_opts(HostType, PoolId, Opts) ->
115
:-(
Interval = gen_mod:get_opt(flush_interval, Opts, 1000),
116
:-(
MaxSize = gen_mod:get_opt(batch_size, Opts, 100),
117
:-(
NumWorkers = gen_mod:get_opt(pool_size, Opts, 4 * erlang:system_info(schedulers_online)),
118
:-(
FlushCallback = gen_mod:get_opt(flush_callback, Opts),
119
:-(
FlushExtra = make_extra(HostType, PoolId, Opts),
120
:-(
ProcessOpts = [{message_queue_data, off_heap}],
121
:-(
WorkerOpts = {HostType, PoolId, Interval, MaxSize, FlushCallback, FlushExtra},
122
:-(
Worker = {mongoose_batch_worker, WorkerOpts},
123
:-(
[{worker, Worker},
124 {workers, NumWorkers},
125 {worker_opt, ProcessOpts},
126 {worker_shutdown, 10000}].
127
128 -spec make_extra(mongooseim:host_type(), pool_id(), pool_opts()) -> pool_extra().
129 make_extra(HostType, PoolId, Opts) ->
130
:-(
DefExtra = #{host_type => HostType, queue_length => 0},
131
:-(
case {gen_mod:get_opt(init_callback, Opts, undefined),
132 gen_mod:get_opt(flush_extra, Opts,
133
:-(
fun(Val) -> maps:merge(Val, DefExtra) end,
134 DefExtra)} of
135 {undefined, Extra} ->
136
:-(
Extra;
137 {InitFun, Extra} when is_function(InitFun, 3) ->
138
:-(
Extra#{init_data => InitFun(HostType, PoolId, Opts)}
139 end.
Line Hits Source