1 |
|
-module(mongoose_async_pools). |
2 |
|
|
3 |
|
-include("mongoose_config_spec.hrl"). |
4 |
|
-include("mongoose_logger.hrl"). |
5 |
|
|
6 |
|
-behaviour(supervisor). |
7 |
|
-export([start_link/3, init/1]). |
8 |
|
-ignore_xref([start_link/3]). |
9 |
|
|
10 |
|
% API |
11 |
|
-export([start_pool/3, stop_pool/2, pool_name/2, config_spec/0]). |
12 |
|
-export([sync/2]). |
13 |
|
|
14 |
|
%% Identifier of an async pool within a host type.
-type pool_id() :: atom().

%% Atom naming a worker pool, as produced by gen_pool_name/2.
-type pool_name() :: atom().

%% Options used to configure a pool; same shape as module options.
-type pool_opts() :: gen_mod:module_opts().

%% Extra data built by make_extra/3. `host_type' and `queue_length' are
%% always present; any other keys are allowed.
-type pool_extra() :: #{
    host_type := mongooseim:host_type(),
    queue_length := non_neg_integer(),
    _ => _
}.
20 |
|
|
21 |
|
-export_type([pool_id/0, pool_extra/0]). |
22 |
|
|
23 |
|
%%% API functions |
24 |
|
%% Logs the start request, then hands a child spec for this pool's
%% supervisor to ejabberd_sup:start_child/1.
%% The child is a transient supervisor started through start_link/3 and
%% identified by sup_name(HostType, PoolId).
-spec start_pool(mongooseim:host_type(), pool_id(), pool_opts()) ->
    supervisor:startchild_ret().
start_pool(HostType, PoolId, Opts) ->
    ?LOG_INFO(#{what => async_pool_starting, host_type => HostType, pool_id => PoolId}),
    SupName = sup_name(HostType, PoolId),
    StartMFA = {?MODULE, start_link, [HostType, PoolId, Opts]},
    ejabberd_sup:start_child(#{id => SupName,
                               start => StartMFA,
                               restart => transient,
                               type => supervisor}).
34 |
|
|
35 |
|
%% Logs the stop request, then asks ejabberd_sup to stop the child
%% registered under sup_name(HostType, PoolId).
-spec stop_pool(mongooseim:host_type(), pool_id()) -> ok.
stop_pool(HostType, PoolId) ->
    ?LOG_INFO(#{what => async_pool_stopping, host_type => HostType, pool_id => PoolId}),
    SupName = sup_name(HostType, PoolId),
    ejabberd_sup:stop_child(SupName).
39 |
|
|
40 |
|
%% Configuration section describing the pool options:
%% `enabled' is a boolean; `flush_interval', `batch_size' and `pool_size'
%% are integers validated as non-negative.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    %% The three numeric options share exactly the same option record.
    NonNegInteger = #option{type = integer, validate = non_negative},
    Items = #{<<"enabled">> => #option{type = boolean},
              <<"flush_interval">> => NonNegInteger,
              <<"batch_size">> => NonNegInteger,
              <<"pool_size">> => NonNegInteger},
    #section{items = Items}.
48 |
|
|
49 |
|
%% Looks up the pool name stored in persistent_term under the key
%% {?MODULE, HostType, PoolId}. persistent_term:get/1 raises `badarg'
%% when nothing has been stored under that key.
-spec pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
pool_name(HostType, PoolId) ->
    Key = {?MODULE, HostType, PoolId},
    persistent_term:get(Key).
52 |
|
|
53 |
|
%% Sends a `sync' gen_server call to every worker of the pool, in parallel
%% via mongoose_lib:pmap/2, and passes the collected results to
%% check_results/1. Each call goes through safely:apply_and_log/4 with a
%% `sync_failed' log context.
-spec sync(mongooseim:host_type(), pool_id()) -> term().
sync(HostType, PoolId) ->
    Workers = get_workers(HostType, PoolId),
    LogContext = #{what => sync_failed, host_type => HostType, pool_id => PoolId},
    SyncWorker =
        fun(Worker) ->
                safely:apply_and_log(gen_server, call, [Worker, sync], LogContext)
        end,
    check_results(mongoose_lib:pmap(SyncWorker, Workers)).
62 |
|
|
63 |
|
%% Runs check_result/1 over every element and returns the list of outcomes.
check_results(Results) ->
    lists:map(fun check_result/1, Results).
65 |
|
|
66 |
106 |
%% Accepts `{ok, ok}' and `{ok, skipped}' as successful sync outcomes;
%% anything else is logged as an error with `what => sync_failed'.
check_result({ok, Reply}) when Reply =:= ok; Reply =:= skipped ->
    ok;
check_result(Unexpected) ->
    ?LOG_ERROR(#{what => sync_failed, reason => Unexpected}).
69 |
|
|
70 |
|
%% Resolves the pool name for the host type / pool id pair and returns
%% whatever wpool:get_workers/1 reports for it.
-spec get_workers(mongooseim:host_type(), pool_id()) -> [atom()].
get_workers(HostType, PoolId) ->
    wpool:get_workers(pool_name(HostType, PoolId)).
74 |
|
|
75 |
|
%%% Supervisor callbacks |
76 |
|
%% Starts this module as a supervisor, locally registered under
%% sup_name(HostType, PoolId); init/1 receives [HostType, PoolId, Opts].
-spec start_link(mongooseim:host_type(), pool_id(), pool_opts()) ->
    {ok, pid()} | ignore | {error, term()}.
start_link(HostType, PoolId, Opts) ->
    RegName = {local, sup_name(HostType, PoolId)},
    InitArgs = [HostType, PoolId, Opts],
    supervisor:start_link(RegName, ?MODULE, InitArgs).
81 |
|
|
82 |
|
%% Supervisor callback.
%% Ensures the `timed_flushes' and `batch_flushes' counters exist for this
%% pool, stores the generated pool name so pool_name/2 can find it, and
%% returns a one_for_one supervisor (intensity 1, period 5) with a single
%% permanent child: the wpool pool started via wpool:start_pool/2.
-spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init([HostType, PoolId, Opts]) ->
    PoolName = gen_pool_name(HostType, PoolId),
    %% Same order as listed: timed_flushes first, then batch_flushes.
    lists:foreach(
      fun(Metric) ->
              mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, Metric], counter)
      end,
      [timed_flushes, batch_flushes]),
    store_pool_name(HostType, PoolId, PoolName),
    WPoolOpts = make_wpool_opts(HostType, PoolId, Opts),
    PoolSpec = #{id => PoolName,
                 start => {wpool, start_pool, [PoolName, WPoolOpts]},
                 restart => permanent,
                 type => supervisor},
    Flags = #{strategy => one_for_one, intensity => 1, period => 5},
    {ok, {Flags, [PoolSpec]}}.
97 |
|
|
98 |
|
%%% Internal functions
99 |
|
%% Builds the supervisor name atom: "<PoolId>_sup_async_pool_<HostType>".
%% The host type binary is converted byte-by-byte with binary_to_list/1.
-spec sup_name(mongooseim:host_type(), pool_id()) -> atom().
sup_name(HostType, PoolId) ->
    Parts = [atom_to_list(PoolId), "_sup_async_pool_", binary_to_list(HostType)],
    list_to_atom(lists:append(Parts)).
103 |
|
|
104 |
|
%% Stores PoolName in persistent_term under {?MODULE, HostType, PoolId},
%% the same key pool_name/2 reads.
-spec store_pool_name(mongooseim:host_type(), pool_id(), pool_name()) -> ok.
store_pool_name(HostType, PoolId, PoolName) ->
    Key = {?MODULE, HostType, PoolId},
    persistent_term:put(Key, PoolName).
107 |
|
|
108 |
|
%% Builds the worker pool name atom: "<PoolId>_async_pool_<HostType>".
%% The host type binary is converted byte-by-byte with binary_to_list/1.
-spec gen_pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
gen_pool_name(HostType, PoolId) ->
    Prefix = atom_to_list(PoolId) ++ "_async_pool_",
    Suffix = binary_to_list(HostType),
    list_to_atom(Prefix ++ Suffix).
112 |
|
|
113 |
|
%% Assembles the option list passed to wpool:start_pool/2.
%% Defaults when an option is absent: flush_interval 1000, batch_size 100,
%% pool_size 4 * schedulers online. `flush_callback' is read without a
%% default. Workers are mongoose_batch_worker processes with off-heap
%% message queues and a worker_shutdown value of 10000.
-spec make_wpool_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> any().
make_wpool_opts(HostType, PoolId, Opts) ->
    FlushInterval = gen_mod:get_opt(flush_interval, Opts, 1000),
    BatchSize = gen_mod:get_opt(batch_size, Opts, 100),
    PoolSize = gen_mod:get_opt(pool_size, Opts, 4 * erlang:system_info(schedulers_online)),
    Callback = gen_mod:get_opt(flush_callback, Opts),
    Extra = make_extra(HostType, PoolId, Opts),
    WorkerArgs = {HostType, PoolId, FlushInterval, BatchSize, Callback, Extra},
    [{worker, {mongoose_batch_worker, WorkerArgs}},
     {workers, PoolSize},
     {worker_opt, [{message_queue_data, off_heap}]},
     {worker_shutdown, 10000}].
127 |
|
|
128 |
|
%% Builds the extra map handed to the batch workers.
%% The defaults are `host_type' and `queue_length => 0'. A configured
%% `flush_extra' is merged with them via maps:merge/2 with the defaults as
%% the second argument, so the defaults win on key clashes (presumably
%% gen_mod:get_opt/4 applies the fun to the configured value - confirm).
%% When `init_callback' is a fun of arity 3, its result is added under
%% `init_data'. Any other non-`undefined' callback value matches no clause.
-spec make_extra(mongooseim:host_type(), pool_id(), pool_opts()) -> pool_extra().
make_extra(HostType, PoolId, Opts) ->
    Defaults = #{host_type => HostType, queue_length => 0},
    InitCallback = gen_mod:get_opt(init_callback, Opts, undefined),
    WithDefaults = fun(UserExtra) -> maps:merge(UserExtra, Defaults) end,
    FlushExtra = gen_mod:get_opt(flush_extra, Opts, WithDefaults, Defaults),
    case {InitCallback, FlushExtra} of
        {undefined, Extra} ->
            Extra;
        {InitFun, Extra} when is_function(InitFun, 3) ->
            Extra#{init_data => InitFun(HostType, PoolId, Opts)}
    end.