./ct_report/coverage/mongoose_async_pools.COVER.html

1 -module(mongoose_async_pools).
2
3 -include("mongoose_config_spec.hrl").
4 -include("mongoose_logger.hrl").
5
6 -behaviour(supervisor).
7 -export([start_link/3, init/1]).
8 -ignore_xref([start_link/3]).
9
10 % API
11 -export([start_pool/3, stop_pool/2, pool_name/2, config_spec/0]).
12 -export([sync/2]).
13
14 -type pool_id() :: atom().
15 -type pool_name() :: atom().
16 -type pool_opts() :: gen_mod:module_opts().
17 -type pool_extra() :: #{host_type := mongooseim:host_type(),
18 queue_length := non_neg_integer(),
19 _ => _}.
20
21 -export_type([pool_id/0, pool_extra/0]).
22
23 %%% API functions
%% @doc Start an async pool for `HostType'/`PoolId'.
%% Logs the start, then hands `ejabberd_sup:start_child/1' a transient
%% supervisor child spec whose start function is this module's `start_link/3'.
-spec start_pool(mongooseim:host_type(), pool_id(), pool_opts()) ->
    supervisor:startchild_ret().
start_pool(HostType, PoolId, Opts) ->
    ?LOG_INFO(#{what => async_pool_starting, host_type => HostType, pool_id => PoolId}),
    StartMFA = {?MODULE, start_link, [HostType, PoolId, Opts]},
    ejabberd_sup:start_child(#{id => sup_name(HostType, PoolId),
                               start => StartMFA,
                               restart => transient,
                               type => supervisor}).
34
%% @doc Log the shutdown and ask `ejabberd_sup' to stop the child registered
%% under this pool's supervisor name.
-spec stop_pool(mongooseim:host_type(), pool_id()) -> ok.
stop_pool(HostType, PoolId) ->
    ?LOG_INFO(#{what => async_pool_stopping, host_type => HostType, pool_id => PoolId}),
    Supervisor = sup_name(HostType, PoolId),
    ejabberd_sup:stop_child(Supervisor).
39
%% @doc Configuration section describing the async pool options:
%% a boolean `enabled' flag plus three non-negative integer settings.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    %% flush_interval, batch_size and pool_size share the same option shape.
    NonNegInt = #option{type = integer, validate = non_negative},
    #section{
       items = #{<<"enabled">> => #option{type = boolean},
                 <<"flush_interval">> => NonNegInt,
                 <<"batch_size">> => NonNegInt,
                 <<"pool_size">> => NonNegInt}
      }.
48
%% @doc Look up the worker pool name stored for `HostType'/`PoolId'.
%% The name is read from `persistent_term'; `persistent_term:get/1' raises
%% `badarg' when nothing has been stored under the key.
-spec pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
pool_name(HostType, PoolId) ->
    Key = {?MODULE, HostType, PoolId},
    persistent_term:get(Key).
52
%% @doc Send a `sync' call to every worker of the pool and check each reply.
%% The calls are dispatched through `mongoose_lib:pmap/2'; each individual
%% call is wrapped in `safely:apply_and_log/4' with a `sync_failed' context.
-spec sync(mongooseim:host_type(), pool_id()) -> term().
sync(HostType, PoolId) ->
    Workers = get_workers(HostType, PoolId),
    LogContext = #{what => sync_failed, host_type => HostType, pool_id => PoolId},
    SyncWorker = fun(Worker) ->
                         safely:apply_and_log(gen_server, call, [Worker, sync], LogContext)
                 end,
    check_results(mongoose_lib:pmap(SyncWorker, Workers)).
62
%% Apply check_result/1 to every element, returning the list of outcomes.
check_results(Results) ->
    lists:map(fun check_result/1, Results).
65
%% `{ok, ok}' and `{ok, skipped}' are the accepted outcomes of a worker sync;
%% anything else is reported as an error log entry.
check_result({ok, Reply}) when Reply =:= ok; Reply =:= skipped ->
    ok;
check_result(Unexpected) ->
    ?LOG_ERROR(#{what => sync_failed, reason => Unexpected}).
69
%% Resolve the stored pool name and ask wpool for that pool's workers.
-spec get_workers(mongooseim:host_type(), pool_id()) -> [atom()].
get_workers(HostType, PoolId) ->
    wpool:get_workers(pool_name(HostType, PoolId)).
74
75 %%% Supervisor callbacks
%% @doc Start this module as a supervisor, locally registered under the
%% name produced by `sup_name/2'.
-spec start_link(mongooseim:host_type(), pool_id(), pool_opts()) ->
    {ok, pid()} | ignore | {error, term()}.
start_link(HostType, PoolId, Opts) ->
    RegName = {local, sup_name(HostType, PoolId)},
    supervisor:start_link(RegName, ?MODULE, [HostType, PoolId, Opts]).
81
%% @doc Supervisor callback.
%% Ensures the two flush counters exist, stores the generated pool name so
%% `pool_name/2' can find it, and returns a one_for_one supervisor with a
%% single permanent child: the wpool pool itself.
-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([HostType, PoolId, Opts]) ->
    PoolName = gen_pool_name(HostType, PoolId),
    lists:foreach(
      fun(Metric) ->
              mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, Metric], counter)
      end,
      [timed_flushes, batch_flushes]),
    %% The name is stored before the pool options are built, as in the original order.
    store_pool_name(HostType, PoolId, PoolName),
    WPoolOpts = make_wpool_opts(HostType, PoolId, Opts),
    PoolSpec = #{id => PoolName,
                 start => {wpool, start_pool, [PoolName, WPoolOpts]},
                 restart => permanent,
                 type => supervisor},
    Flags = #{strategy => one_for_one,
              intensity => 1,
              period => 5},
    {ok, {Flags, [PoolSpec]}}.
97
98 %%% Internal functions
%% Build the registered name of a pool's supervisor:
%% `<PoolId>_sup_async_pool_<HostType>'.
-spec sup_name(mongooseim:host_type(), pool_id()) -> atom().
sup_name(HostType, PoolId) ->
    Parts = [atom_to_list(PoolId), "_sup_async_pool_", binary_to_list(HostType)],
    list_to_atom(lists:append(Parts)).
103
%% Store the pool name in persistent_term under the same
%% {Module, HostType, PoolId} key that pool_name/2 reads.
-spec store_pool_name(mongooseim:host_type(), pool_id(), pool_name()) -> ok.
store_pool_name(HostType, PoolId, PoolName) ->
    Key = {?MODULE, HostType, PoolId},
    persistent_term:put(Key, PoolName).
107
%% Build the wpool pool name: `<PoolId>_async_pool_<HostType>'.
-spec gen_pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
gen_pool_name(HostType, PoolId) ->
    Parts = [atom_to_list(PoolId), "_async_pool_", binary_to_list(HostType)],
    list_to_atom(lists:append(Parts)).
112
%% Assemble the option list passed to wpool:start_pool/2.
%% Defaults: flush_interval 1000, batch_size 100, pool_size four times the
%% number of online schedulers. `flush_callback' is read without a default.
-spec make_wpool_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> any().
make_wpool_opts(HostType, PoolId, Opts) ->
    DefaultPoolSize = 4 * erlang:system_info(schedulers_online),
    FlushInterval = gen_mod:get_opt(flush_interval, Opts, 1000),
    BatchSize = gen_mod:get_opt(batch_size, Opts, 100),
    PoolSize = gen_mod:get_opt(pool_size, Opts, DefaultPoolSize),
    Callback = gen_mod:get_opt(flush_callback, Opts),
    Extra = make_extra(HostType, PoolId, Opts),
    WorkerArgs = {HostType, PoolId, FlushInterval, BatchSize, Callback, Extra},
    [{worker, {mongoose_batch_worker, WorkerArgs}},
     {workers, PoolSize},
     %% Worker processes are spawned with their message queues off-heap.
     {worker_opt, [{message_queue_data, off_heap}]},
     {worker_shutdown, 10000}].
127
%% Build the extra map given to the flush callback.
%% Starts from `#{host_type, queue_length => 0}'; a configured `flush_extra'
%% is merged with these defaults, and the defaults win on key collisions
%% because they are the second argument of maps:merge/2.
%% When an arity-3 `init_callback' is configured, its result is stored
%% under `init_data'.
-spec make_extra(mongooseim:host_type(), pool_id(), pool_opts()) -> pool_extra().
make_extra(HostType, PoolId, Opts) ->
    Defaults = #{host_type => HostType, queue_length => 0},
    InitCallback = gen_mod:get_opt(init_callback, Opts, undefined),
    %% NOTE(review): presumably gen_mod:get_opt/4 applies the fun to a
    %% configured value and otherwise returns the default - confirm.
    WithDefaults = fun(Custom) -> maps:merge(Custom, Defaults) end,
    Extra = gen_mod:get_opt(flush_extra, Opts, WithDefaults, Defaults),
    case {InitCallback, Extra} of
        {undefined, _} ->
            Extra;
        {_, _} when is_function(InitCallback, 3) ->
            Extra#{init_data => InitCallback(HostType, PoolId, Opts)}
    end.
Line Hits Source