./ct_report/coverage/mongoose_async_pools.COVER.html

1 -module(mongoose_async_pools).
2
3 -include("mongoose_logger.hrl").
4
5 -behaviour(supervisor).
6 -export([start_link/3, init/1]).
7 -ignore_xref([start_link/3]).
8
9 % API
10 -export([start_pool/3, stop_pool/2]).
11 -export([put_task/3, put_task/4, broadcast/3, broadcast_task/4]).
12 -ignore_xref([put_task/3, broadcast/3, broadcast_task/4]).
13 -export([sync/2]).
14
15 -type task() :: term().
16 -type pool_id() :: atom(). % The subsystem, like 'pm_mam', or 'inbox'
17 -type pool_name() :: atom(). % The pool name, like 'inbox_sup_async_pool_localhost'
18 -type pool_type() :: batch | aggregate.
19 -type pool_opts() :: #{pool_type := pool_type(),
20 _ => _}.
21 -type pool_extra() :: #{host_type := mongooseim:host_type(),
22 queue_length => non_neg_integer(),
23 _ => _}.
24
25 -type flush_callback() ::
26 fun(([task()], pool_extra()) ->
27 ok | {error, term()}).
28 -type prep_callback() ::
29 fun((task(), pool_extra()) ->
30 {ok, task()} | {error, term()}).
31 -type aggregate_callback() ::
32 fun((task(), task(), pool_extra()) ->
33 {ok, task()} | {error, term()}).
34 -type request_callback() ::
35 fun((task() | [task()], pool_extra()) ->
36 term()).
37 -type verify_callback() ::
38 fun((term(), task(), pool_extra()) -> term()).
39
40 -export_type([flush_callback/0,
41 prep_callback/0,
42 aggregate_callback/0,
43 request_callback/0,
44 verify_callback/0]).
45
46 -export_type([task/0, pool_id/0, pool_opts/0, pool_extra/0]).
47
48 -spec put_task(mongooseim:host_type(), pool_id(), term()) -> ok.
49 put_task(HostType, PoolId, Task) ->
50
:-(
PoolName = pool_name(HostType, PoolId),
51
:-(
wpool:cast(PoolName, {task, Task}, best_worker).
52
53 -spec put_task(mongooseim:host_type(), pool_id(), term(), term()) -> ok.
54 put_task(HostType, PoolId, Key, Task) ->
55 3587 PoolName = pool_name(HostType, PoolId),
56 3587 wpool:cast(PoolName, {task, Key, Task}, {hash_worker, Key}).
57
58 -spec broadcast(mongooseim:host_type(), pool_id(), term()) -> ok.
59 broadcast(HostType, PoolId, Task) ->
60
:-(
PoolName = pool_name(HostType, PoolId),
61
:-(
wpool:broadcast(PoolName, {broadcast, Task}).
62
63 -spec broadcast_task(mongooseim:host_type(), pool_id(), term(), term()) -> ok.
64 broadcast_task(HostType, PoolId, Key, Task) ->
65
:-(
PoolName = pool_name(HostType, PoolId),
66
:-(
wpool:broadcast(PoolName, {task, Key, Task}).
67
68 %%% API functions
69 -spec start_pool(mongooseim:host_type(), pool_id(), pool_opts()) ->
70 supervisor:startchild_ret().
71 start_pool(HostType, PoolId, PoolOpts) ->
72 85 ?LOG_INFO(#{what => async_pool_starting, host_type => HostType, pool_id => PoolId}),
73 85 Supervisor = sup_name(HostType, PoolId),
74 85 ChildSpec = #{id => Supervisor,
75 start => {?MODULE, start_link, [HostType, PoolId, PoolOpts]},
76 restart => transient,
77 type => supervisor},
78 85 ejabberd_sup:start_child(ChildSpec).
79
80 -spec stop_pool(mongooseim:host_type(), pool_id()) -> ok.
81 stop_pool(HostType, PoolId) ->
82 85 ?LOG_INFO(#{what => async_pool_stopping, host_type => HostType, pool_id => PoolId}),
83 85 ejabberd_sup:stop_child(sup_name(HostType, PoolId)).
84
85 -spec pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
86 pool_name(HostType, PoolId) ->
87 3813 persistent_term:get({?MODULE, HostType, PoolId}).
88
89 -spec sync(mongooseim:host_type(), pool_id()) -> term().
90 sync(HostType, PoolId) ->
91 226 Pool = pool_name(HostType, PoolId),
92 226 WorkerNames = wpool:get_workers(Pool),
93 226 Context = #{what => sync_failed, host_type => HostType, pool_id => PoolId},
94 226 F = fun(Pid) -> safely:apply_and_log(gen_server, call, [Pid, sync], Context) end,
95 226 Results = mongoose_lib:pmap(F, WorkerNames),
96 226 [check_result(Result) || Result <- Results].
97
98 273 check_result({ok, ok}) -> ok;
99 3343 check_result({ok, skipped}) -> ok;
100
:-(
check_result(Other) -> ?LOG_ERROR(#{what => sync_failed, reason => Other}).
101
102 %%% Supervisor callbacks
103 -spec start_link(mongooseim:host_type(), pool_id(), pool_opts()) ->
104 {ok, pid()} | ignore | {error, term()}.
105 start_link(HostType, PoolId, PoolOpts) ->
106 85 Supervisor = sup_name(HostType, PoolId),
107 85 supervisor:start_link({local, Supervisor}, ?MODULE, {HostType, PoolId, PoolOpts}).
108
109 -spec init({mongooseim:host_type(), pool_id(), pool_opts()}) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
110 init({HostType, PoolId, PoolOpts}) ->
111 85 WPoolOpts = process_pool_opts(HostType, PoolId, PoolOpts),
112 85 PoolName = gen_pool_name(HostType, PoolId),
113 85 store_pool_name(HostType, PoolId, PoolName),
114 85 WorkerSpec = #{id => PoolName,
115 start => {wpool, start_pool, [PoolName, WPoolOpts]},
116 restart => permanent,
117 type => supervisor},
118 85 SupFlags = #{strategy => one_for_one,
119 intensity => 1,
120 period => 5},
121 85 {ok, {SupFlags, [WorkerSpec]}}.
122
123 %%% internal callbacks
124 -spec sup_name(mongooseim:host_type(), pool_id()) -> atom().
125 sup_name(HostType, PoolId) ->
126 255 list_to_atom(
127 atom_to_list(PoolId) ++ "_sup_async_pool_" ++ binary_to_list(HostType)).
128
129 -spec store_pool_name(mongooseim:host_type(), pool_id(), pool_name()) -> ok.
130 store_pool_name(HostType, PoolId, PoolName) ->
131 85 persistent_term:put({?MODULE, HostType, PoolId}, PoolName).
132
133 -spec gen_pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
134 gen_pool_name(HostType, PoolId) ->
135 85 list_to_atom(
136 atom_to_list(PoolId) ++ "_async_pool_" ++ binary_to_list(HostType)).
137
138 -spec process_pool_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> [wpool:option()].
139 process_pool_opts(HostType, PoolId, #{pool_size := NumWorkers} = Opts) ->
140 85 WorkerModule = select_worker_module(HostType, PoolId, Opts),
141 85 WorkerOpts = make_worker_opts(HostType, PoolId, Opts),
142 85 Worker = {WorkerModule, WorkerOpts},
143 85 [{worker, Worker},
144 {workers, NumWorkers},
145 {worker_opt, [{spawn_opt, [{message_queue_data, off_heap}]}]},
146 {worker_shutdown, 10000}].
147
148 select_worker_module(HostType, PoolId, #{pool_type := batch}) ->
149 58 mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, timed_flushes], counter),
150 58 mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, batch_flushes], counter),
151 58 mongoose_batch_worker;
152 select_worker_module(HostType, PoolId, #{pool_type := aggregate}) ->
153 27 mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, async_request], counter),
154 27 mongoose_aggregator_worker.
155
156 -spec make_worker_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> map().
157 make_worker_opts(HostType, PoolId, Opts) ->
158 85 Opts#{host_type => HostType, pool_id => PoolId,
159 flush_extra => make_extra(HostType, PoolId, Opts)}.
160
161 -spec make_extra(mongooseim:host_type(), pool_id(), pool_opts()) -> pool_extra().
162 make_extra(HostType, PoolId, Opts) ->
163 85 DefExtra = case maps:get(pool_type, Opts) of
164 58 batch -> #{host_type => HostType, queue_length => 0};
165 27 aggregate -> #{host_type => HostType}
166 end,
167 85 Extra = maps:merge(maps:get(flush_extra, Opts, #{}), DefExtra),
168 85 maybe_init_handler(HostType, PoolId, Opts, Extra).
169
170 maybe_init_handler(HostType, PoolId, Opts = #{init_callback := InitFun}, Extra)
171 when is_function(InitFun, 3) ->
172
:-(
Extra#{init_data => InitFun(HostType, PoolId, Opts)};
173 maybe_init_handler(_, _, _, Extra) ->
174 85 Extra.
Line Hits Source