1 |
|
-module(mongoose_async_pools). |
2 |
|
|
3 |
|
-include("mongoose_logger.hrl"). |
4 |
|
|
5 |
|
-behaviour(supervisor). |
6 |
|
-export([start_link/3, init/1]). |
7 |
|
-ignore_xref([start_link/3]). |
8 |
|
|
9 |
|
% API |
10 |
|
-export([start_pool/3, stop_pool/2]). |
11 |
|
-export([put_task/3, put_task/4, broadcast/3, broadcast_task/4]). |
12 |
|
-ignore_xref([put_task/3, broadcast/3, broadcast_task/4]). |
13 |
|
-export([sync/2]). |
14 |
|
|
15 |
|
%% A unit of work submitted to a pool; this module treats it as an opaque term.
-type task() :: term().
%% The subsystem, like 'pm_mam', or 'inbox'.
-type pool_id() :: atom().
%% The wpool pool name, like 'inbox_async_pool_localhost' (see gen_pool_name/2).
-type pool_name() :: atom().
%% Decides which worker module serves the pool (see select_worker_module/3).
-type pool_type() :: batch | aggregate.
%% Options passed to start_pool/3. Only pool_type is mandatory in the type, but
%% process_pool_opts/3 also requires a pool_size key, and make_extra/3 reads the
%% optional flush_extra and init_callback keys. The whole map is forwarded to
%% the worker module as part of its options.
-type pool_opts() :: #{pool_type := pool_type(),
                       _ => _}.
%% The map stored under flush_extra in the worker options (see make_extra/3).
%% host_type is always set; queue_length starts at 0 for batch pools.
-type pool_extra() :: #{host_type := mongooseim:host_type(),
                        queue_length => non_neg_integer(),
                        _ => _}.

%% Callback shapes accepted in pool options. None of them is invoked from this
%% module; presumably the worker modules call them -- confirm there.
-type flush_callback() ::
    fun(([task()], pool_extra()) ->
        ok | {error, term()}).
-type prep_callback() ::
    fun((task(), pool_extra()) ->
        {ok, task()} | {error, term()}).
-type aggregate_callback() ::
    fun((task(), task(), pool_extra()) ->
        {ok, task()} | {error, term()}).
-type request_callback() ::
    fun((task() | [task()], pool_extra()) ->
        term()).
-type verify_callback() ::
    fun((term(), task(), pool_extra()) -> term()).
39 |
|
|
40 |
|
-export_type([flush_callback/0, |
41 |
|
prep_callback/0, |
42 |
|
aggregate_callback/0, |
43 |
|
request_callback/0, |
44 |
|
verify_callback/0]). |
45 |
|
|
46 |
|
-export_type([task/0, pool_id/0, pool_opts/0, pool_extra/0]). |
47 |
|
|
48 |
|
%% Casts `{task, Task}' to the pool stored for HostType/PoolId, letting wpool
%% choose the worker with its `best_worker' strategy.
%% Fails if no pool name has been stored for this HostType/PoolId pair.
-spec put_task(mongooseim:host_type(), pool_id(), term()) -> ok.
put_task(HostType, PoolId, Task) ->
    wpool:cast(pool_name(HostType, PoolId), {task, Task}, best_worker).
52 |
|
|
53 |
|
%% Casts `{task, Key, Task}' to the pool stored for HostType/PoolId.
%% The worker is chosen with wpool's `{hash_worker, Key}' strategy, i.e. the
%% choice is derived from Key.
-spec put_task(mongooseim:host_type(), pool_id(), term(), term()) -> ok.
put_task(HostType, PoolId, Key, Task) ->
    Strategy = {hash_worker, Key},
    wpool:cast(pool_name(HostType, PoolId), {task, Key, Task}, Strategy).
57 |
|
|
58 |
|
%% Sends `{broadcast, Task}' through wpool:broadcast/2 to the pool stored for
%% HostType/PoolId.
-spec broadcast(mongooseim:host_type(), pool_id(), term()) -> ok.
broadcast(HostType, PoolId, Task) ->
    wpool:broadcast(pool_name(HostType, PoolId), {broadcast, Task}).
62 |
|
|
63 |
|
%% Sends `{task, Key, Task}' (the same message shape put_task/4 casts) through
%% wpool:broadcast/2 to the pool stored for HostType/PoolId.
-spec broadcast_task(mongooseim:host_type(), pool_id(), term(), term()) -> ok.
broadcast_task(HostType, PoolId, Key, Task) ->
    wpool:broadcast(pool_name(HostType, PoolId), {task, Key, Task}).
67 |
|
|
68 |
|
%%% API functions |
69 |
|
%% Logs the start and asks ejabberd_sup to start a dedicated supervisor for
%% this HostType/PoolId pair. The child is a `transient' supervisor started
%% through start_link/3 of this module and identified by sup_name/2.
-spec start_pool(mongooseim:host_type(), pool_id(), pool_opts()) ->
    supervisor:startchild_ret().
start_pool(HostType, PoolId, PoolOpts) ->
    ?LOG_INFO(#{what => async_pool_starting, host_type => HostType, pool_id => PoolId}),
    ejabberd_sup:start_child(
      #{id => sup_name(HostType, PoolId),
        start => {?MODULE, start_link, [HostType, PoolId, PoolOpts]},
        restart => transient,
        type => supervisor}).
79 |
|
|
80 |
|
%% Logs the stop and asks ejabberd_sup to stop the supervisor that
%% start_pool/3 registered for this HostType/PoolId pair.
-spec stop_pool(mongooseim:host_type(), pool_id()) -> ok.
stop_pool(HostType, PoolId) ->
    ?LOG_INFO(#{what => async_pool_stopping, host_type => HostType, pool_id => PoolId}),
    Supervisor = sup_name(HostType, PoolId),
    ejabberd_sup:stop_child(Supervisor).
84 |
|
|
85 |
|
%% Looks up the pool name that store_pool_name/3 saved in persistent_term.
%% persistent_term:get/1 raises `badarg' when nothing is stored under the key.
-spec pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
pool_name(HostType, PoolId) ->
    Key = {?MODULE, HostType, PoolId},
    persistent_term:get(Key).
88 |
|
|
89 |
|
%% Issues a `sync' gen_server call to every worker of the pool, in parallel via
%% mongoose_lib:pmap/2, and passes each outcome through check_result/1.
%% Each call is wrapped in safely:apply_and_log/4 with a `sync_failed' context.
%% Returns the list of check_result/1 values, one per worker.
-spec sync(mongooseim:host_type(), pool_id()) -> term().
sync(HostType, PoolId) ->
    Workers = wpool:get_workers(pool_name(HostType, PoolId)),
    LogContext = #{what => sync_failed, host_type => HostType, pool_id => PoolId},
    SyncWorker =
        fun(Worker) ->
                safely:apply_and_log(gen_server, call, [Worker, sync], LogContext)
        end,
    [check_result(Outcome) || Outcome <- mongoose_lib:pmap(SyncWorker, Workers)].
97 |
|
|
98 |
250 |
%% Maps one per-worker sync outcome to `ok' when the worker answered `ok' or
%% `skipped'; anything else is logged as `sync_failed' and the logging macro's
%% value is returned.
check_result({ok, Reply}) when Reply =:= ok; Reply =:= skipped -> ok;
check_result(Unexpected) -> ?LOG_ERROR(#{what => sync_failed, reason => Unexpected}).
101 |
|
|
102 |
|
%%% Supervisor callbacks |
103 |
|
%% Starts this module as a supervisor, registered locally under sup_name/2.
%% The {HostType, PoolId, PoolOpts} tuple is handed to init/1.
-spec start_link(mongooseim:host_type(), pool_id(), pool_opts()) ->
    {ok, pid()} | ignore | {error, term()}.
start_link(HostType, PoolId, PoolOpts) ->
    Registration = {local, sup_name(HostType, PoolId)},
    supervisor:start_link(Registration, ?MODULE, {HostType, PoolId, PoolOpts}).
108 |
|
|
109 |
|
%% Supervisor callback: builds the wpool options, stores the generated pool
%% name so pool_name/2 can find it, and returns a one_for_one supervisor
%% (intensity 1 within 5 seconds) with the wpool pool as its only, permanent
%% child.
-spec init({mongooseim:host_type(), pool_id(), pool_opts()}) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
init({HostType, PoolId, PoolOpts}) ->
    %% Options are processed before the name is stored, so a failure here
    %% leaves no stored pool name behind.
    WPoolOpts = process_pool_opts(HostType, PoolId, PoolOpts),
    Name = gen_pool_name(HostType, PoolId),
    store_pool_name(HostType, PoolId, Name),
    Flags = #{strategy => one_for_one, intensity => 1, period => 5},
    PoolChild = #{id => Name,
                  start => {wpool, start_pool, [Name, WPoolOpts]},
                  restart => permanent,
                  type => supervisor},
    {ok, {Flags, [PoolChild]}}.
122 |
|
|
123 |
|
%%% Internal functions |
124 |
|
%% Builds the registered name of the per-pool supervisor:
%% `<PoolId>_sup_async_pool_<HostType>'.
-spec sup_name(mongooseim:host_type(), pool_id()) -> atom().
sup_name(HostType, PoolId) ->
    Prefix = atom_to_list(PoolId),
    Suffix = binary_to_list(HostType),
    list_to_atom(lists:append([Prefix, "_sup_async_pool_", Suffix])).
128 |
|
|
129 |
|
%% Saves PoolName in persistent_term under {?MODULE, HostType, PoolId}, the key
%% pool_name/2 reads.
-spec store_pool_name(mongooseim:host_type(), pool_id(), pool_name()) -> ok.
store_pool_name(HostType, PoolId, PoolName) ->
    Key = {?MODULE, HostType, PoolId},
    persistent_term:put(Key, PoolName).
132 |
|
|
133 |
|
%% Builds the wpool pool name: `<PoolId>_async_pool_<HostType>'.
-spec gen_pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
gen_pool_name(HostType, PoolId) ->
    Prefix = atom_to_list(PoolId),
    Suffix = binary_to_list(HostType),
    list_to_atom(lists:append([Prefix, "_async_pool_", Suffix])).
137 |
|
|
138 |
|
%% Translates pool options into the wpool option list: the worker module with
%% its options, the worker count taken from the mandatory pool_size key,
%% off-heap message queues for the workers, and a 10000 worker_shutdown value.
-spec process_pool_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> [wpool:option()].
process_pool_opts(HostType, PoolId, #{pool_size := NumWorkers} = Opts) ->
    %% The module is selected (and its metrics ensured) before the worker
    %% options are built.
    Module = select_worker_module(HostType, PoolId, Opts),
    Worker = {Module, make_worker_opts(HostType, PoolId, Opts)},
    SpawnOpts = [{message_queue_data, off_heap}],
    [{worker, Worker},
     {workers, NumWorkers},
     {worker_opt, [{spawn_opt, SpawnOpts}]},
     {worker_shutdown, 10000}].
147 |
|
|
148 |
|
%% Picks the worker module for the pool type and makes sure the counters that
%% belong to that type exist: timed_flushes and batch_flushes for batch pools,
%% async_request for aggregate pools.
select_worker_module(HostType, PoolId, #{pool_type := batch}) ->
    EnsureCounter =
        fun(Metric) ->
                mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, Metric], counter)
        end,
    lists:foreach(EnsureCounter, [timed_flushes, batch_flushes]),
    mongoose_batch_worker;
select_worker_module(HostType, PoolId, #{pool_type := aggregate}) ->
    MetricName = [?MODULE, PoolId, async_request],
    mongoose_metrics:ensure_metric(HostType, MetricName, counter),
    mongoose_aggregator_worker.
155 |
|
|
156 |
|
%% Returns the pool options extended for the worker: host_type and pool_id are
%% set, and flush_extra is replaced by the map make_extra/3 computes.
-spec make_worker_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> map().
make_worker_opts(HostType, PoolId, Opts) ->
    FlushExtra = make_extra(HostType, PoolId, Opts),
    Opts#{host_type => HostType,
          pool_id => PoolId,
          flush_extra => FlushExtra}.
160 |
|
|
161 |
|
%% Computes the flush_extra map for the workers. It starts from the
%% user-supplied flush_extra (or an empty map), overrides it with the module's
%% own defaults, then lets maybe_init_handler/4 add init_data if applicable.
-spec make_extra(mongooseim:host_type(), pool_id(), pool_opts()) -> pool_extra().
make_extra(HostType, PoolId, Opts) ->
    Base = #{host_type => HostType},
    Defaults =
        case maps:get(pool_type, Opts) of
            batch -> Base#{queue_length => 0};
            aggregate -> Base
        end,
    UserExtra = maps:get(flush_extra, Opts, #{}),
    %% Defaults are the second argument, so they win over user-supplied keys.
    maybe_init_handler(HostType, PoolId, Opts, maps:merge(UserExtra, Defaults)).
169 |
|
|
170 |
|
%% When the options carry an arity-3 init_callback, calls it with
%% (HostType, PoolId, Opts) and stores its result under init_data in Extra.
%% Otherwise (no callback, or not a fun of arity 3) Extra is returned as is.
maybe_init_handler(HostType, PoolId, #{init_callback := Init} = Opts, Extra)
  when is_function(Init, 3) ->
    InitData = Init(HostType, PoolId, Opts),
    Extra#{init_data => InitData};
maybe_init_handler(_HostType, _PoolId, _Opts, Extra) ->
    Extra.