./ct_report/coverage/mongoose_wpool.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @doc
3 %%% This is here because there are pool options which have to be given when calling
4 %%% the pool (selection strategy, timeout), while we want to set it once for the pool and not
5 %%% worry about them later, hence additional storage.
6 %%% @end
7 %%%-------------------------------------------------------------------
8 -module(mongoose_wpool).
9 -author("bartlomiej.gorny@erlang-solutions.com").
10 -include("mongoose.hrl").
11
12 -record(mongoose_wpool, {
13 name :: pool_name(),
14 atom_name :: wpool:name(),
15 strategy :: wpool:strategy(),
16 call_timeout :: pos_integer()
17 }).
18
19 %% API
20 -export([ensure_started/0,
21 start/2, start/3, start/4, start/5,
22 stop/0, stop/1, stop/2, stop/3,
23 get_worker/2, get_worker/3,
24 call/2, call/3, call/4, call/5,
25 send_request/2, send_request/3, send_request/4, send_request/5,
26 cast/2, cast/3, cast/4, cast/5,
27 get_pool_settings/3, get_pools/0, stats/3]).
28
29 -export([start_sup_pool/3]).
30 -export([start_configured_pools/0, start_configured_pools/1, start_configured_pools/2]).
31 -export([is_configured/1]).
32 -export([make_pool_name/3]).
33 -export([call_start_callback/2]).
34
35 %% Mostly for tests
36 -export([expand_pools/2]).
37
38 -ignore_xref([call/2, cast/2, cast/3, expand_pools/2, get_worker/2,
39 send_request/2, send_request/3, send_request/4, send_request/5,
40 is_configured/2, is_configured/1, is_configured/1, start/2, start/3,
41 start/5, start_configured_pools/1, start_configured_pools/2, stats/3,
42 stop/1, stop/2]).
43
44 -type pool_type() :: redis | http | rdbms | cassandra | elastic | generic | rabbit | ldap.
45
46 %% Config scope
47 -type scope() :: global | host_type | mongooseim:host_type().
48 -type host_type_or_global() :: mongooseim:host_type_or_global().
49 -type tag() :: atom().
50
51 %% Name of a process
52 -type proc_name() :: atom().
53
54 %% ID of a pool. Used as a key for an ETS table
55 -type pool_name() :: {PoolType :: pool_type(),
56 HostType :: host_type_or_global(),
57 Tag :: tag()}.
58
59 -type pool_opts_in() :: map().
60 -type pool_opts() :: [wpool:option()].
61 -type conn_opts() :: map().
62
63 -type pool_map_in() :: #{type := pool_type(),
64 scope := scope(),
65 tag := tag(),
66 opts := pool_opts_in(),
67 conn_opts := conn_opts()}.
68 %% Pool map with expanded HostType argument instead of scope
69 -type pool_map() :: #{type := pool_type(),
70 host_type := host_type_or_global(),
71 tag := tag(),
72 opts := pool_opts(),
73 conn_opts := conn_opts()}.
74 -type pool_error() :: {pool_not_started, term()}.
75 -type worker_result() :: {ok, pid()} | {error, pool_error()}.
76 -type pool_record_result() :: {ok, #mongoose_wpool{}} | {error, pool_error()}.
77 -type start_result() :: {ok, pid()} | {error, term()}.
78 -type stop_result() :: ok | term().
79
80 -export_type([pool_type/0, tag/0, scope/0, proc_name/0, pool_name/0, pool_opts/0, conn_opts/0]).
81
82 -type callback_fun() :: init | start | is_supported_strategy | stop.
83
84 -callback init() -> ok | {error, term()}.
85 -callback start(host_type_or_global(), tag(), WPoolOpts :: pool_opts(), ConnOpts :: conn_opts()) ->
86 {ok, {pid(), proplists:proplist()}} | {ok, pid()} | {error, Reason :: term()}.
87 -callback is_supported_strategy(Strategy :: wpool:strategy()) -> boolean().
88 -callback stop(host_type_or_global(), tag()) -> ok.
89
90 -optional_callbacks([is_supported_strategy/1]).
91
92 ensure_started() ->
93 42 wpool:start(),
94 42 case whereis(mongoose_wpool_sup) of
95 undefined ->
96
:-(
mongoose_wpool_sup:start_link();
97 _ ->
98 42 ok
99 end,
100 42 ejabberd_sup:create_ets_table(
101 ?MODULE,
102 [named_table, public,
103 {read_concurrency, true},
104 {keypos, #mongoose_wpool.name},
105 {heir, whereis(mongoose_wpool_sup), undefined}]).
106
107 start_configured_pools() ->
108 42 Pools = mongoose_config:get_opt(outgoing_pools),
109 42 start_configured_pools(Pools).
110
111 start_configured_pools(PoolsIn) ->
112 45 start_configured_pools(PoolsIn, ?ALL_HOST_TYPES).
113
114 start_configured_pools(PoolsIn, HostTypes) ->
115 46 [call_callback(init, PoolType, []) || PoolType <- get_unique_types(PoolsIn)],
116 46 Pools = expand_pools(PoolsIn, HostTypes),
117 46 [start(Pool) || Pool <- Pools].
118
119 -spec start(pool_map()) -> start_result().
120 start(#{type := PoolType, host_type := HostType, tag := Tag,
121 opts := PoolOpts, conn_opts := ConnOpts}) ->
122 130 start(PoolType, HostType, Tag, PoolOpts, ConnOpts).
123
124 -spec start(pool_type(), pool_opts()) -> start_result().
125 start(PoolType, PoolOpts) ->
126
:-(
start(PoolType, global, PoolOpts).
127
128 -spec start(pool_type(), host_type_or_global(), pool_opts()) -> start_result().
129 start(PoolType, HostType, PoolOpts) ->
130
:-(
start(PoolType, HostType, default, PoolOpts).
131
132 -spec start(pool_type(), host_type_or_global(), tag(),
133 pool_opts()) -> start_result().
134 start(PoolType, HostType, Tag, PoolOpts) ->
135
:-(
start(PoolType, HostType, Tag, PoolOpts, #{}).
136
137 -spec start(pool_type(), host_type_or_global(), tag(),
138 pool_opts(), conn_opts()) -> start_result().
139 start(PoolType, HostType, Tag, PoolOpts, ConnOpts) ->
140 130 {Opts0, WpoolOptsIn} = proplists:split(PoolOpts, [strategy, call_timeout]),
141 130 Opts = lists:append(Opts0),
142 130 Strategy = proplists:get_value(strategy, Opts, best_worker),
143 130 CallTimeout = proplists:get_value(call_timeout, Opts, 5000),
144 %% If a callback doesn't explicitly blacklist a strategy, let's proceed.
145 130 CallbackModule = make_callback_module_name(PoolType),
146 130 case catch CallbackModule:is_supported_strategy(Strategy) of
147 false ->
148
:-(
error({strategy_not_supported, PoolType, HostType, Tag, Strategy});
149 _ ->
150 130 start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout)
151 end.
152
153 -spec start(pool_type(), host_type_or_global(), tag(),
154 pool_opts(), conn_opts(), wpool:strategy(), pos_integer()) ->
155 start_result().
156 start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) ->
157 130 case mongoose_wpool_mgr:start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts) of
158 {ok, Pid} ->
159 130 ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag},
160 atom_name = make_pool_name(PoolType, HostType, Tag),
161 strategy = Strategy,
162 call_timeout = CallTimeout}),
163 130 {ok, Pid};
164 Error ->
165
:-(
Error
166 end.
167
168 %% @doc this function starts the worker_pool's pool under a specific supervisor
169 %% in MongooseIM application.
170 %% It's needed for 2 reasons:
171 %% 1. We want to have a full control of all the pools and its restarts
172 %% 2. When a pool is started via wpool:start_pool it's supposed be called by a supervisor,
173 %% if not, there is no way to stop the pool.
174 -spec start_sup_pool(pool_type(), proc_name(), [wpool:option()]) ->
175 {ok, pid()} | {error, term()}.
176 start_sup_pool(PoolType, ProcName, WpoolOpts) ->
177 130 SupName = mongoose_wpool_type_sup:name(PoolType),
178 130 ChildSpec = #{id => ProcName,
179 start => {wpool, start_pool, [ProcName, WpoolOpts]},
180 restart => temporary,
181 type => supervisor,
182 modules => [wpool]},
183 130 supervisor:start_child(SupName, ChildSpec).
184
185 -spec stop() -> term().
186 stop() ->
187 42 [stop_pool(PoolName) || PoolName <- get_pools()].
188
189 -spec stop_pool(pool_name()) -> stop_result().
190 stop_pool({PoolType, HostType, Tag}) ->
191 118 stop(PoolType, HostType, Tag).
192
193 -spec stop(pool_type()) -> stop_result().
194 stop(PoolType) ->
195
:-(
stop(PoolType, global).
196
197 -spec stop(pool_type(), host_type_or_global()) -> stop_result().
198 stop(PoolType, HostType) ->
199
:-(
stop(PoolType, HostType, default).
200
201 -spec stop(pool_type(), host_type_or_global(), tag()) -> stop_result().
202 stop(PoolType, HostType, Tag) ->
203 121 try
204 121 ets:delete(?MODULE, {PoolType, HostType, Tag}),
205 121 call_callback(stop, PoolType, [HostType, Tag]),
206 121 mongoose_wpool_mgr:stop(PoolType, HostType, Tag)
207 catch
208 C:R:S ->
209
:-(
?LOG_ERROR(#{what => pool_stop_failed,
210 pool_type => PoolType, server => HostType, pool_tag => Tag,
211 pool_key => {PoolType, HostType, Tag},
212
:-(
class => C, reason => R, stacktrace => S})
213 end.
214
215 -spec is_configured(pool_type()) -> boolean().
216 is_configured(PoolType) ->
217 258 Pools = mongoose_config:get_opt(outgoing_pools),
218 258 lists:any(fun(#{type := Type}) -> Type =:= PoolType end, Pools).
219
220 -spec get_worker(pool_type(), host_type_or_global()) -> worker_result().
221 get_worker(PoolType, HostType) ->
222 24 get_worker(PoolType, HostType, default).
223
224 -spec get_worker(pool_type(), host_type_or_global(), tag()) -> worker_result().
225 get_worker(PoolType, HostType, Tag) ->
226 96573 case get_pool(PoolType, HostType, Tag) of
227 {ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
228 96573 Worker = wpool_pool:Strategy(make_pool_name(Pool)),
229 96573 {ok, whereis(Worker)};
230 Err ->
231
:-(
Err
232 end.
233
234 call(PoolType, Request) ->
235
:-(
call(PoolType, global, Request).
236
237 call(PoolType, HostType, Request) ->
238
:-(
call(PoolType, HostType, default, Request).
239
240 call(PoolType, HostType, Tag, Request) ->
241 115459 case get_pool(PoolType, HostType, Tag) of
242 {ok, #mongoose_wpool{strategy = Strategy, call_timeout = CallTimeout} = Pool} ->
243 115459 wpool:call(make_pool_name(Pool), Request, Strategy, CallTimeout);
244 Err ->
245
:-(
Err
246 end.
247
248 call(PoolType, HostType, Tag, HashKey, Request) ->
249
:-(
case get_pool(PoolType, HostType, Tag) of
250 {ok, #mongoose_wpool{call_timeout = CallTimeout} = Pool} ->
251
:-(
wpool:call(make_pool_name(Pool), Request, {hash_worker, HashKey}, CallTimeout);
252 Err ->
253
:-(
Err
254 end.
255
256 send_request(PoolType, Request) ->
257
:-(
send_request(PoolType, global, Request).
258
259 send_request(PoolType, HostType, Request) ->
260
:-(
send_request(PoolType, HostType, default, Request).
261
262 send_request(PoolType, HostType, Tag, Request) ->
263 649 case get_pool(PoolType, HostType, Tag) of
264 {ok, #mongoose_wpool{strategy = Strategy, call_timeout = CallTimeout} = Pool} ->
265 649 wpool_send_request(make_pool_name(Pool), Request, Strategy, CallTimeout);
266 Err ->
267
:-(
Err
268 end.
269
270 send_request(PoolType, HostType, Tag, HashKey, Request) ->
271
:-(
case get_pool(PoolType, HostType, Tag) of
272 {ok, #mongoose_wpool{call_timeout = CallTimeout} = Pool} ->
273
:-(
wpool_send_request(make_pool_name(Pool), Request, {hash_worker, HashKey}, CallTimeout);
274 Err ->
275
:-(
Err
276 end.
277
278 wpool_send_request(PoolName, Request, Strategy, Timeout) ->
279 649 wpool:send_request(PoolName, Request, Strategy, Timeout).
280
281 cast(PoolType, Request) ->
282
:-(
cast(PoolType, global, Request).
283
284 cast(PoolType, HostType, Request) ->
285
:-(
cast(PoolType, HostType, default, Request).
286
287 cast(PoolType, HostType, Tag, Request) ->
288 4 case get_pool(PoolType, HostType, Tag) of
289 {ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
290 4 wpool:cast(make_pool_name(Pool), Request, Strategy);
291 Err ->
292
:-(
Err
293 end.
294
295 cast(PoolType, HostType, Tag, HashKey, Request) ->
296
:-(
case get_pool(PoolType, HostType, Tag) of
297 {ok, #mongoose_wpool{} = Pool} ->
298
:-(
wpool:cast(make_pool_name(Pool), Request, {hash_worker, HashKey});
299 Err ->
300
:-(
Err
301 end.
302
303 -spec get_pool_settings(pool_type(), host_type_or_global(), tag()) ->
304 #mongoose_wpool{} | undefined.
305 get_pool_settings(PoolType, HostType, Tag) ->
306 97 case get_pool(PoolType, HostType, Tag) of
307 97 {ok, PoolRec} -> PoolRec;
308
:-(
{error, {pool_not_started, _}} -> undefined
309 end.
310
311 -spec get_pools() -> [pool_name()].
312 get_pools() ->
313 42 lists:map(fun(#mongoose_wpool{name = Name}) -> Name end, ets:tab2list(?MODULE)).
314
315 stats(PoolType, HostType, Tag) ->
316
:-(
wpool:stats(make_pool_name(PoolType, HostType, Tag)).
317
318 -spec make_pool_name(pool_type(), scope(), tag()) -> proc_name().
319 make_pool_name(PoolType, HostType, Tag) when is_atom(HostType) ->
320 766 make_pool_name(PoolType, atom_to_binary(HostType, utf8), Tag);
321 make_pool_name(PoolType, HostType, Tag) when is_binary(HostType) ->
322 769 binary_to_atom(<<"mongoose_wpool$", (atom_to_binary(PoolType, utf8))/binary, $$,
323 HostType/binary, $$, (atom_to_binary(Tag, utf8))/binary>>, utf8).
324
325 make_pool_name(#mongoose_wpool{atom_name = undefined, name = {PoolType, HostType, Tag}}) ->
326
:-(
make_pool_name(PoolType, HostType, Tag);
327 make_pool_name(#mongoose_wpool{atom_name = AtomName}) ->
328 212685 AtomName.
329
330 -spec call_start_callback(pool_type(), list()) -> term().
331 call_start_callback(PoolType, Args) ->
332 130 call_callback(start, PoolType, Args).
333
334 -spec call_callback(callback_fun(), pool_type(), list()) -> term().
335 call_callback(CallbackFun, PoolType, Args) ->
336 339 try
337 339 CallbackModule = make_callback_module_name(PoolType),
338 339 erlang:apply(CallbackModule, CallbackFun, Args)
339 catch E:R:ST ->
340
:-(
?LOG_ERROR(#{what => pool_callback_failed,
341 pool_type => PoolType, callback_function => CallbackFun,
342
:-(
error => E, reason => R, stacktrace => ST}),
343
:-(
{error, {callback_crashed, CallbackFun, E, R, ST}}
344 end.
345
346 -spec make_callback_module_name(pool_type()) -> module().
347 make_callback_module_name(PoolType) ->
348 469 Name = "mongoose_wpool_" ++ atom_to_list(PoolType),
349 469 list_to_atom(Name).
350
351 -spec expand_pools([pool_map_in()], [mongooseim:host_type()]) -> [pool_map()].
352 expand_pools(Pools, HostTypes) ->
353 %% First we select only pools for a specific vhost
354 46 HostSpecific = [{Type, HT, Tag} || #{type := Type, scope := HT, tag := Tag} <- Pools, is_binary(HT)],
355 %% Then we expand all pools with `host_type` as HostType parameter but using host_type specific configs
356 %% if they were provided
357 46 F = fun(M = #{type := PoolType, scope := host_type, tag := Tag}) ->
358
:-(
[M#{scope => HostType} || HostType <- HostTypes,
359
:-(
not lists:member({PoolType, HostType, Tag}, HostSpecific)];
360 130 (Other) -> [Other]
361 end,
362 46 Pools1 = lists:flatmap(F, Pools),
363 46 lists:map(fun prepare_pool_map/1, Pools1).
364
365 -spec prepare_pool_map(pool_map_in()) -> pool_map().
366 prepare_pool_map(Pool = #{scope := HT, opts := Opts}) ->
367 %% Rename "scope" field to "host_type" and change wpool opts to a KV list
368 130 Pool1 = maps:remove(scope, Pool),
369 130 Pool1#{host_type => HT, opts => maps:to_list(Opts)}.
370
371 -spec get_unique_types([pool_map_in()]) -> [pool_type()].
372 get_unique_types(Pools) ->
373 46 lists:usort([maps:get(type, Pool) || Pool <- Pools]).
374
375 -spec get_pool(pool_type(), host_type_or_global(), tag()) -> pool_record_result().
376 get_pool(PoolType, HostType, Tag) ->
377 325509 case ets:lookup(?MODULE, {PoolType, HostType, Tag}) of
378 112727 [] when is_binary(HostType) -> get_pool(PoolType, global, Tag);
379
:-(
[] -> {error, {pool_not_started, {PoolType, HostType, Tag}}};
380 212782 [Pool] -> {ok, Pool}
381 end.
Line Hits Source