./ct_report/coverage/mongoose_async_pools.COVER.html

1 -module(mongoose_async_pools).
2
3 -include("mongoose_logger.hrl").
4
5 -behaviour(supervisor).
6 -export([start_link/3, init/1]).
7 -ignore_xref([start_link/3]).
8
9 % API
10 -export([start_pool/3, stop_pool/2]).
11 -export([put_task/3, put_task/4]).
12 -ignore_xref([put_task/3]).
13 -export([sync/2]).
14
15 -type task() :: term().
16 -type pool_id() :: atom(). % The subsystem, like 'pm_mam', or 'inbox'
17 -type pool_name() :: atom(). % The pool name, like 'inbox_sup_async_pool_localhost'
18 -type pool_type() :: batch | aggregate.
19 -type pool_opts() :: #{pool_type := pool_type(),
20 _ => _}.
21 -type pool_extra() :: #{host_type := mongooseim:host_type(),
22 queue_length => non_neg_integer(),
23 _ => _}.
24
25 -type flush_callback() ::
26 fun(([task()], pool_extra()) ->
27 ok | {error, term()}).
28 -type prep_callback() ::
29 fun((task(), pool_extra()) ->
30 {ok, task()} | {error, term()}).
31 -type aggregate_callback() ::
32 fun((task(), task(), pool_extra()) ->
33 {ok, task()} | {error, term()}).
34 -type request_callback() ::
35 fun((task() | [task()], pool_extra()) ->
36 term()).
37 -type verify_callback() ::
38 fun((term(), task(), pool_extra()) -> term()).
39
40 -export_type([flush_callback/0,
41 prep_callback/0,
42 aggregate_callback/0,
43 request_callback/0,
44 verify_callback/0]).
45
46 -export_type([pool_id/0, pool_opts/0, pool_extra/0]).
47
48 -spec put_task(mongooseim:host_type(), pool_id(), term()) -> ok.
49 put_task(HostType, PoolId, Task) ->
50
:-(
PoolName = pool_name(HostType, PoolId),
51
:-(
wpool:cast(PoolName, {task, Task}, best_worker).
52
53 -spec put_task(mongooseim:host_type(), pool_id(), term(), term()) -> ok.
54 put_task(HostType, PoolId, Key, Task) ->
55
:-(
PoolName = pool_name(HostType, PoolId),
56
:-(
wpool:cast(PoolName, {task, Key, Task}, {hash_worker, Key}).
57
58 %%% API functions
59 -spec start_pool(mongooseim:host_type(), pool_id(), pool_opts()) ->
60 supervisor:startchild_ret().
61 start_pool(HostType, PoolId, PoolOpts) ->
62
:-(
?LOG_INFO(#{what => async_pool_starting, host_type => HostType, pool_id => PoolId}),
63
:-(
Supervisor = sup_name(HostType, PoolId),
64
:-(
ChildSpec = #{id => Supervisor,
65 start => {?MODULE, start_link, [HostType, PoolId, PoolOpts]},
66 restart => transient,
67 type => supervisor},
68
:-(
ejabberd_sup:start_child(ChildSpec).
69
70 -spec stop_pool(mongooseim:host_type(), pool_id()) -> ok.
71 stop_pool(HostType, PoolId) ->
72
:-(
?LOG_INFO(#{what => async_pool_stopping, host_type => HostType, pool_id => PoolId}),
73
:-(
ejabberd_sup:stop_child(sup_name(HostType, PoolId)).
74
75 -spec pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
76 pool_name(HostType, PoolId) ->
77
:-(
persistent_term:get({?MODULE, HostType, PoolId}).
78
79 -spec sync(mongooseim:host_type(), pool_id()) -> term().
80 sync(HostType, PoolId) ->
81
:-(
Pool = pool_name(HostType, PoolId),
82
:-(
WorkerNames = wpool:get_workers(Pool),
83
:-(
Context = #{what => sync_failed, host_type => HostType, pool_id => PoolId},
84
:-(
F = fun(Pid) -> safely:apply_and_log(gen_server, call, [Pid, sync], Context) end,
85
:-(
Results = mongoose_lib:pmap(F, WorkerNames),
86
:-(
[check_result(Result) || Result <- Results].
87
88
:-(
check_result({ok, ok}) -> ok;
89
:-(
check_result({ok, skipped}) -> ok;
90
:-(
check_result(Other) -> ?LOG_ERROR(#{what => sync_failed, reason => Other}).
91
92 %%% Supervisor callbacks
93 -spec start_link(mongooseim:host_type(), pool_id(), pool_opts()) ->
94 {ok, pid()} | ignore | {error, term()}.
95 start_link(HostType, PoolId, PoolOpts) ->
96
:-(
Supervisor = sup_name(HostType, PoolId),
97
:-(
supervisor:start_link({local, Supervisor}, ?MODULE, {HostType, PoolId, PoolOpts}).
98
99 -spec init({mongooseim:host_type(), pool_id(), pool_opts()}) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
100 init({HostType, PoolId, PoolOpts}) ->
101
:-(
WPoolOpts = process_pool_opts(HostType, PoolId, PoolOpts),
102
:-(
PoolName = gen_pool_name(HostType, PoolId),
103
:-(
store_pool_name(HostType, PoolId, PoolName),
104
:-(
WorkerSpec = #{id => PoolName,
105 start => {wpool, start_pool, [PoolName, WPoolOpts]},
106 restart => permanent,
107 type => supervisor},
108
:-(
SupFlags = #{strategy => one_for_one,
109 intensity => 1,
110 period => 5},
111
:-(
{ok, {SupFlags, [WorkerSpec]}}.
112
113 %%% internal callbacks
114 -spec sup_name(mongooseim:host_type(), pool_id()) -> atom().
115 sup_name(HostType, PoolId) ->
116
:-(
list_to_atom(
117 atom_to_list(PoolId) ++ "_sup_async_pool_" ++ binary_to_list(HostType)).
118
119 -spec store_pool_name(mongooseim:host_type(), pool_id(), pool_name()) -> ok.
120 store_pool_name(HostType, PoolId, PoolName) ->
121
:-(
persistent_term:put({?MODULE, HostType, PoolId}, PoolName).
122
123 -spec gen_pool_name(mongooseim:host_type(), pool_id()) -> pool_name().
124 gen_pool_name(HostType, PoolId) ->
125
:-(
list_to_atom(
126 atom_to_list(PoolId) ++ "_async_pool_" ++ binary_to_list(HostType)).
127
128 -spec process_pool_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> [wpool:option()].
129 process_pool_opts(HostType, PoolId, #{pool_size := NumWorkers} = Opts) ->
130
:-(
WorkerModule = select_worker_module(HostType, PoolId, Opts),
131
:-(
WorkerOpts = make_worker_opts(HostType, PoolId, Opts),
132
:-(
Worker = {WorkerModule, WorkerOpts},
133
:-(
[{worker, Worker},
134 {workers, NumWorkers},
135 {worker_opt, [{message_queue_data, off_heap}]},
136 {worker_shutdown, 10000}].
137
138 select_worker_module(HostType, PoolId, #{pool_type := batch}) ->
139
:-(
mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, timed_flushes], counter),
140
:-(
mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, batch_flushes], counter),
141
:-(
mongoose_batch_worker;
142 select_worker_module(HostType, PoolId, #{pool_type := aggregate}) ->
143
:-(
mongoose_metrics:ensure_metric(HostType, [?MODULE, PoolId, async_request], counter),
144
:-(
mongoose_aggregator_worker.
145
146 -spec make_worker_opts(mongooseim:host_type(), pool_id(), pool_opts()) -> map().
147 make_worker_opts(HostType, PoolId, Opts) ->
148
:-(
Opts#{host_type => HostType, pool_id => PoolId,
149 flush_extra => make_extra(HostType, PoolId, Opts)}.
150
151 -spec make_extra(mongooseim:host_type(), pool_id(), pool_opts()) -> pool_extra().
152 make_extra(HostType, PoolId, Opts) ->
153
:-(
DefExtra = case maps:get(pool_type, Opts) of
154
:-(
batch -> #{host_type => HostType, queue_length => 0};
155
:-(
aggregate -> #{host_type => HostType}
156 end,
157
:-(
Extra = maps:merge(maps:get(flush_extra, Opts, #{}), DefExtra),
158
:-(
maybe_init_handler(HostType, PoolId, Opts, Extra).
159
160 maybe_init_handler(HostType, PoolId, Opts = #{init_callback := InitFun}, Extra)
161 when is_function(InitFun, 3) ->
162
:-(
Extra#{init_data => InitFun(HostType, PoolId, Opts)};
163 maybe_init_handler(_, _, _, Extra) ->
164
:-(
Extra.
Line Hits Source