./ct_report/coverage/mongoose_wpool.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @doc
3 %%% This is here because there are pool options which have to be given when calling
4 %%% the pool (selection strategy, timeout), while we want to set it once for the pool and not
5 %%% worry about them later, hence additional storage.
6 %%% @end
7 %%%-------------------------------------------------------------------
8 -module(mongoose_wpool).
9 -author("bartlomiej.gorny@erlang-solutions.com").
10 -include("mongoose.hrl").
11 -include("mongoose_config_spec.hrl").
12
13 -record(mongoose_wpool, {
14 name :: pool_name(),
15 atom_name :: wpool:name(),
16 strategy :: wpool:strategy(),
17 call_timeout :: pos_integer()
18 }).
19
20 %% API
21 -export([ensure_started/0,
22 start/2, start/3, start/4, start/5,
23 stop/0, stop/1, stop/2, stop/3,
24 get_worker/2, get_worker/3,
25 call/2, call/3, call/4, call/5,
26 send_request/2, send_request/3, send_request/4, send_request/5,
27 cast/2, cast/3, cast/4, cast/5,
28 get_pool_settings/3, get_pools/0, stats/3]).
29
30 -export([start_sup_pool/3]).
31 -export([start_configured_pools/0]).
32 -export([start_configured_pools/1]).
33 -export([start_configured_pools/2]).
34 -export([is_configured/1]).
35 -export([make_pool_name/3]).
36 -export([call_start_callback/2]).
37
38 %% Mostly for tests
39 -export([expand_pools/2]).
40
%% Exports excluded from the xref unused-export check.
%% Every entry must name a function that this module actually exports.
-ignore_xref([call/2, cast/2, cast/3, expand_pools/2, get_worker/2,
              send_request/2, send_request/3, send_request/4, send_request/5,
              is_configured/1, start/2, start/3,
              start/5, start_configured_pools/1, start_configured_pools/2, stats/3,
              stop/1, stop/2]).
46
47 -type pool_type() :: redis | http | rdbms | cassandra | elastic | generic
48 | rabbit | ldap.
49
50 %% Config scope
51 -type scope() :: global | host_type | mongooseim:host_type().
52 -type host_type_or_global() :: mongooseim:host_type_or_global().
53
54 -type tag() :: atom().
55 %% Name of a process
56 -type proc_name() :: atom().
57
58 %% ID of a pool. Used as a key for an ETS table
59 -type pool_name() :: {PoolType :: pool_type(),
60 HostType :: host_type_or_global(),
61 Tag :: tag()}.
62
63 -type pool_opts_in() :: map().
64 -type pool_opts() :: [wpool:option()].
65 -type conn_opts() :: map().
66
67 -type pool_map_in() :: #{type := pool_type(),
68 scope := scope(),
69 tag := tag(),
70 opts := pool_opts_in(),
71 conn_opts := conn_opts()}.
72 %% Pool map with expanded HostType argument instead of scope
73 -type pool_map() :: #{type := pool_type(),
74 host_type := host_type_or_global(),
75 tag := tag(),
76 opts := pool_opts(),
77 conn_opts := conn_opts()}.
78 -type pool_error() :: {pool_not_started, term()}.
79 -type worker_result() :: {ok, pid()} | {error, pool_error()}.
80 -type pool_record_result() :: {ok, #mongoose_wpool{}} | {error, pool_error()}.
81 -type start_result() :: {ok, pid()} | {error, term()}.
82 -type stop_result() :: ok | term().
83
84 -export_type([pool_type/0]).
85 -export_type([tag/0]).
86 -export_type([scope/0]).
87 -export_type([proc_name/0]).
88 -export_type([pool_name/0]).
89 -export_type([pool_opts/0]).
90 -export_type([conn_opts/0]).
91
92 -type callback_fun() :: init | start | is_supported_strategy | stop.
93
94 -callback init() -> ok | {error, term()}.
95 -callback start(host_type_or_global(), tag(), WPoolOpts :: pool_opts(), ConnOpts :: conn_opts()) ->
96 {ok, {pid(), proplists:proplist()}} | {ok, pid()} | {error, Reason :: term()}.
97 -callback is_supported_strategy(Strategy :: wpool:strategy()) -> boolean().
98 -callback stop(host_type_or_global(), tag()) -> ok.
99
100 -optional_callbacks([is_supported_strategy/1]).
101
%% @doc Calls `wpool:start/0', makes sure a process is registered as
%% `mongoose_wpool_sup' and creates the ETS table (named after this module)
%% that stores one `#mongoose_wpool{}' record per pool, keyed by its `name'
%% field. The supervisor process is set as the heir of the table.
ensure_started() ->
    wpool:start(),
    ensure_sup_started(whereis(mongoose_wpool_sup)),
    TableOpts = [named_table, public,
                 {read_concurrency, true},
                 {keypos, #mongoose_wpool.name},
                 {heir, whereis(mongoose_wpool_sup), undefined}],
    ejabberd_sup:create_ets_table(?MODULE, TableOpts).

%% Starts the pool supervisor only when no process is registered under its name.
ensure_sup_started(undefined) ->
    mongoose_wpool_sup:start_link();
ensure_sup_started(_SupPid) ->
    ok.
116
%% @doc Starts all pools listed under the `outgoing_pools' config option.
start_configured_pools() ->
    start_configured_pools(mongoose_config:get_opt(outgoing_pools)).

%% @doc Starts the given pools, expanding them over `?ALL_HOST_TYPES'.
start_configured_pools(PoolsIn) ->
    start_configured_pools(PoolsIn, ?ALL_HOST_TYPES).

%% @doc Runs the `init' callback once for every distinct pool type, expands
%% the pool definitions over `HostTypes' and starts each resulting pool.
%% Returns the list of individual start results.
start_configured_pools(PoolsIn, HostTypes) ->
    %% The results of `init' are not inspected here; call_callback/3 logs failures.
    lists:foreach(fun(PoolType) -> call_callback(init, PoolType, []) end,
                  get_unique_types(PoolsIn)),
    [start(PoolMap) || PoolMap <- expand_pools(PoolsIn, HostTypes)].
128
%% @doc Starts a pool described by an expanded pool map
%% (one that already carries `host_type' instead of `scope').
-spec start(pool_map()) -> start_result().
start(#{type := Type, host_type := HostType, tag := Tag,
        opts := WpoolOpts, conn_opts := ConnOpts}) ->
    start(Type, HostType, Tag, WpoolOpts, ConnOpts).

%% @doc Starts a pool of the given type for the `global' host type.
-spec start(pool_type(), pool_opts()) -> start_result().
start(Type, WpoolOpts) ->
    start(Type, global, WpoolOpts).

%% @doc Starts a pool of the given type and host type under the `default' tag.
-spec start(pool_type(), host_type_or_global(), pool_opts()) -> start_result().
start(Type, HostType, WpoolOpts) ->
    start(Type, HostType, default, WpoolOpts).

%% @doc Starts a pool with empty connection options.
-spec start(pool_type(), host_type_or_global(), tag(),
            pool_opts()) -> start_result().
start(Type, HostType, Tag, WpoolOpts) ->
    start(Type, HostType, Tag, WpoolOpts, #{}).
146
%% @doc Starts a pool.
%% The `strategy' and `call_timeout' entries are taken out of `PoolOpts' and
%% kept in this module's own storage (defaults: `best_worker' and `5000');
%% the remaining options are passed on to the pool manager.
%% Raises `error({strategy_not_supported, PoolType, HostType, Tag, Strategy})'
%% when the callback module of `PoolType' explicitly rejects the strategy.
-spec start(pool_type(), host_type_or_global(), tag(),
            pool_opts(), conn_opts()) -> start_result().
start(PoolType, HostType, Tag, PoolOpts, ConnOpts) ->
    {Opts0, WpoolOptsIn} = proplists:split(PoolOpts, [strategy, call_timeout]),
    Opts = lists:append(Opts0),
    Strategy = proplists:get_value(strategy, Opts, best_worker),
    CallTimeout = proplists:get_value(call_timeout, Opts, 5000),
    case is_strategy_supported(make_callback_module_name(PoolType), Strategy) of
        false ->
            error({strategy_not_supported, PoolType, HostType, Tag, Strategy});
        true ->
            start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout)
    end.

%% Asks the callback module whether it supports the strategy.
%% `is_supported_strategy/1' is an optional callback, so the call may raise
%% `undef'. If a callback doesn't explicitly blacklist a strategy (returns
%% anything but `false', is missing, or fails), we proceed.
%% `try ... of' keeps the protected region limited to the callback call itself.
-spec is_strategy_supported(module(), wpool:strategy()) -> boolean().
is_strategy_supported(CallbackModule, Strategy) ->
    try CallbackModule:is_supported_strategy(Strategy) of
        false -> false;
        _NotBlacklisted -> true
    catch
        _Class:_Reason -> true
    end.
162
%% Starts the pool through `mongoose_wpool_mgr' and, on success, stores its
%% settings (key, precomputed process name, strategy and call timeout) in the
%% ETS table. Any other result of the manager is returned unchanged.
-spec start(pool_type(), host_type_or_global(), tag(),
            pool_opts(), conn_opts(), wpool:strategy(), pos_integer()) ->
    start_result().
start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) ->
    case mongoose_wpool_mgr:start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts) of
        {ok, _Pid} = Started ->
            PoolRec = #mongoose_wpool{name = {PoolType, HostType, Tag},
                                      atom_name = make_pool_name(PoolType, HostType, Tag),
                                      strategy = Strategy,
                                      call_timeout = CallTimeout},
            ets:insert(?MODULE, PoolRec),
            Started;
        NotStarted ->
            NotStarted
    end.
177
%% @doc Starts a worker_pool pool as a child of the supervisor dedicated to
%% the given pool type inside the MongooseIM application.
%% There are two reasons for doing it this way:
%% 1. We want to have full control over all the pools and their restarts.
%% 2. A pool started via wpool:start_pool is supposed to be called by a
%%    supervisor; otherwise there is no way to stop the pool.
-spec start_sup_pool(pool_type(), proc_name(), [wpool:option()]) ->
    {ok, pid()} | {error, term()}.
start_sup_pool(PoolType, ProcName, WpoolOpts) ->
    StartMFA = {wpool, start_pool, [ProcName, WpoolOpts]},
    ChildSpec = #{id => ProcName,
                  start => StartMFA,
                  restart => temporary,
                  type => supervisor,
                  modules => [wpool]},
    supervisor:start_child(mongoose_wpool_type_sup:name(PoolType), ChildSpec).
194
%% @doc Stops every pool currently stored in the ETS table.
%% Returns the list of individual stop results.
-spec stop() -> term().
stop() ->
    lists:map(fun stop_pool/1, get_pools()).

%% Stops a single pool identified by its ETS key.
-spec stop_pool(pool_name()) -> stop_result().
stop_pool({Type, HostType, Tag}) ->
    stop(Type, HostType, Tag).
202
%% @doc Stops the `default'-tagged pool of the given type for the `global' host type.
-spec stop(pool_type()) -> stop_result().
stop(Type) ->
    stop(Type, global).

%% @doc Stops the `default'-tagged pool of the given type and host type.
-spec stop(pool_type(), host_type_or_global()) -> stop_result().
stop(Type, HostType) ->
    stop(Type, HostType, default).
210
%% @doc Stops a pool: removes its settings from the ETS table, runs the
%% `stop' callback of the pool type and asks `mongoose_wpool_mgr' to stop it.
%% An exception raised by any of these steps is logged and not propagated;
%% in that case the value of the logging call is returned.
-spec stop(pool_type(), host_type_or_global(), tag()) -> stop_result().
stop(PoolType, HostType, Tag) ->
    PoolKey = {PoolType, HostType, Tag},
    try
        ets:delete(?MODULE, PoolKey),
        call_callback(stop, PoolType, [HostType, Tag]),
        mongoose_wpool_mgr:stop(PoolType, HostType, Tag)
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR(#{what => pool_stop_failed,
                         pool_type => PoolType, server => HostType, pool_tag => Tag,
                         pool_key => PoolKey,
                         class => Class, reason => Reason, stacktrace => Stacktrace})
    end.
224
%% @doc Tells whether the `outgoing_pools' config option contains at least
%% one pool of the given type.
-spec is_configured(pool_type()) -> boolean().
is_configured(PoolType) ->
    IsOfType = fun(#{type := Type}) -> Type =:= PoolType end,
    lists:any(IsOfType, mongoose_config:get_opt(outgoing_pools)).
229
%% @doc Picks a worker from the `default'-tagged pool.
-spec get_worker(pool_type(), host_type_or_global()) -> worker_result().
get_worker(Type, HostType) ->
    get_worker(Type, HostType, default).

%% @doc Picks a worker from the pool using the strategy stored for that pool.
%% The strategy name is used as the name of the `wpool_pool' function that
%% selects the worker; the selected registered name is resolved with whereis/1.
%% Returns the lookup error unchanged when the pool is not started.
-spec get_worker(pool_type(), host_type_or_global(), tag()) -> worker_result().
get_worker(Type, HostType, Tag) ->
    case get_pool(Type, HostType, Tag) of
        {ok, PoolRec = #mongoose_wpool{strategy = StrategyFun}} ->
            WorkerName = wpool_pool:StrategyFun(make_pool_name(PoolRec)),
            {ok, whereis(WorkerName)};
        NotStarted ->
            NotStarted
    end.
243
%% @doc Synchronous call to the `default'-tagged pool of the `global' host type.
call(Type, Request) ->
    call(Type, global, Request).

%% @doc Synchronous call to the `default'-tagged pool of the given host type.
call(Type, HostType, Request) ->
    call(Type, HostType, default, Request).

%% @doc Synchronous call using the strategy and call timeout stored for the pool.
%% Returns the lookup error unchanged when the pool is not started.
call(Type, HostType, Tag, Request) ->
    case get_pool(Type, HostType, Tag) of
        {ok, PoolRec = #mongoose_wpool{strategy = PoolStrategy, call_timeout = Timeout}} ->
            wpool:call(make_pool_name(PoolRec), Request, PoolStrategy, Timeout);
        NotStarted ->
            NotStarted
    end.

%% @doc Synchronous call that ignores the stored strategy and selects the
%% worker with `{hash_worker, HashKey}'; the stored call timeout still applies.
call(Type, HostType, Tag, HashKey, Request) ->
    case get_pool(Type, HostType, Tag) of
        {ok, PoolRec = #mongoose_wpool{call_timeout = Timeout}} ->
            wpool:call(make_pool_name(PoolRec), Request, {hash_worker, HashKey}, Timeout);
        NotStarted ->
            NotStarted
    end.
265
%% @doc Sends a request to the `default'-tagged pool of the `global' host type.
send_request(Type, Request) ->
    send_request(Type, global, Request).

%% @doc Sends a request to the `default'-tagged pool of the given host type.
send_request(Type, HostType, Request) ->
    send_request(Type, HostType, default, Request).

%% @doc Sends a request using the strategy and timeout stored for the pool.
%% Returns the lookup error unchanged when the pool is not started.
send_request(Type, HostType, Tag, Request) ->
    case get_pool(Type, HostType, Tag) of
        {ok, PoolRec = #mongoose_wpool{strategy = PoolStrategy, call_timeout = Timeout}} ->
            wpool_send_request(make_pool_name(PoolRec), Request, PoolStrategy, Timeout);
        NotStarted ->
            NotStarted
    end.

%% @doc Sends a request to the worker selected with `{hash_worker, HashKey}'
%% instead of the stored strategy; the stored timeout still applies.
send_request(Type, HostType, Tag, HashKey, Request) ->
    case get_pool(Type, HostType, Tag) of
        {ok, PoolRec = #mongoose_wpool{call_timeout = Timeout}} ->
            wpool_send_request(make_pool_name(PoolRec), Request, {hash_worker, HashKey}, Timeout);
        NotStarted ->
            NotStarted
    end.

%% Single place where `wpool:send_request/4' is invoked.
wpool_send_request(ProcName, Request, WorkerStrategy, CallTimeout) ->
    wpool:send_request(ProcName, Request, WorkerStrategy, CallTimeout).
290
%% @doc Cast to the `default'-tagged pool of the `global' host type.
cast(Type, Request) ->
    cast(Type, global, Request).

%% @doc Cast to the `default'-tagged pool of the given host type.
cast(Type, HostType, Request) ->
    cast(Type, HostType, default, Request).

%% @doc Cast using the strategy stored for the pool.
%% Returns the lookup error unchanged when the pool is not started.
cast(Type, HostType, Tag, Request) ->
    case get_pool(Type, HostType, Tag) of
        {ok, PoolRec = #mongoose_wpool{strategy = PoolStrategy}} ->
            wpool:cast(make_pool_name(PoolRec), Request, PoolStrategy);
        NotStarted ->
            NotStarted
    end.

%% @doc Cast to the worker selected with `{hash_worker, HashKey}' instead of
%% the stored strategy.
cast(Type, HostType, Tag, HashKey, Request) ->
    case get_pool(Type, HostType, Tag) of
        {ok, PoolRec = #mongoose_wpool{}} ->
            wpool:cast(make_pool_name(PoolRec), Request, {hash_worker, HashKey});
        NotStarted ->
            NotStarted
    end.
312
%% @doc Returns the stored settings record of a pool,
%% or `undefined' when the pool is not started.
-spec get_pool_settings(pool_type(), host_type_or_global(), tag()) ->
    #mongoose_wpool{} | undefined.
get_pool_settings(Type, HostType, Tag) ->
    case get_pool(Type, HostType, Tag) of
        {error, {pool_not_started, _PoolKey}} -> undefined;
        {ok, Settings} -> Settings
    end.
320
%% @doc Lists the keys (`{PoolType, HostType, Tag}') of all pools stored in
%% the ETS table.
-spec get_pools() -> [pool_name()].
get_pools() ->
    [PoolRec#mongoose_wpool.name || PoolRec <- ets:tab2list(?MODULE)].
324
%% @doc Returns `wpool:stats/1' for the pool process name derived from the
%% given pool type, host type and tag. The ETS table is not consulted.
stats(Type, HostType, Tag) ->
    ProcName = make_pool_name(Type, HostType, Tag),
    wpool:stats(ProcName).
327
%% @doc Builds the registered process name of a pool:
%% `mongoose_wpool$<PoolType>$<HostType>$<Tag>'.
%% An atom host type (e.g. `global') is converted to a binary first.
-spec make_pool_name(pool_type(), scope(), tag()) -> proc_name().
make_pool_name(PoolType, HostType, Tag) when is_atom(HostType) ->
    make_pool_name(PoolType, atom_to_binary(HostType, utf8), Tag);
make_pool_name(PoolType, HostType, Tag) when is_binary(HostType) ->
    Segments = [<<"mongoose_wpool">>,
                atom_to_binary(PoolType, utf8),
                HostType,
                atom_to_binary(Tag, utf8)],
    %% Segments are joined with the `$' character.
    binary_to_atom(iolist_to_binary(lists:join($$, Segments)), utf8).
334
%% Returns the process name of a pool record. The name stored in `atom_name'
%% is used when present; it is computed from the key only when `atom_name'
%% is `undefined' and the key is a `{PoolType, HostType, Tag}' tuple.
make_pool_name(#mongoose_wpool{atom_name = StoredName, name = PoolKey}) ->
    case {StoredName, PoolKey} of
        {undefined, {PoolType, HostType, Tag}} ->
            make_pool_name(PoolType, HostType, Tag);
        _ ->
            StoredName
    end.
339
%% @doc Invokes the `start' callback of the pool type's callback module.
-spec call_start_callback(pool_type(), list()) -> term().
call_start_callback(PoolType, Args) ->
    call_callback(start, PoolType, Args).

%% Applies `CallbackFun' of the pool type's callback module to `Args'.
%% Any exception is logged and turned into
%% `{error, {callback_crashed, CallbackFun, Class, Reason, Stacktrace}}'.
-spec call_callback(callback_fun(), pool_type(), list()) -> term().
call_callback(CallbackFun, PoolType, Args) ->
    try
        erlang:apply(make_callback_module_name(PoolType), CallbackFun, Args)
    catch Class:Reason:Stacktrace ->
        ?LOG_ERROR(#{what => pool_callback_failed,
                     pool_type => PoolType, callback_function => CallbackFun,
                     error => Class, reason => Reason, stacktrace => Stacktrace}),
        {error, {callback_crashed, CallbackFun, Class, Reason, Stacktrace}}
    end.
355
%% Maps a pool type to its callback module name: `mongoose_wpool_<PoolType>'.
-spec make_callback_module_name(pool_type()) -> module().
make_callback_module_name(PoolType) ->
    list_to_atom("mongoose_wpool_" ++ atom_to_list(PoolType)).
360
%% @doc Turns configured pool maps into startable pool maps.
%% A pool with scope `host_type' is replicated for every host type in
%% `HostTypes', except for host types that have their own pool (binary scope)
%% with the same type and tag. All other pools are kept as they are.
%% Every resulting map is then converted with prepare_pool_map/1.
-spec expand_pools([pool_map_in()], [mongooseim:host_type()]) -> [pool_map()].
expand_pools(Pools, HostTypes) ->
    %% Keys of the pools configured explicitly for a single host type
    Overrides = [{Type, Scope, Tag}
                 || #{type := Type, scope := Scope, tag := Tag} <- Pools,
                    is_binary(Scope)],
    Expand = fun(#{type := Type, scope := host_type, tag := Tag} = PoolMap) ->
                     [PoolMap#{scope => HostType}
                      || HostType <- HostTypes,
                         not lists:member({Type, HostType, Tag}, Overrides)];
                (PoolMap) ->
                     [PoolMap]
             end,
    [prepare_pool_map(Expanded) || PoolMap <- Pools, Expanded <- Expand(PoolMap)].
374
%% Renames the `scope' key to `host_type' and converts the wpool options
%% from a map to a key-value list.
-spec prepare_pool_map(pool_map_in()) -> pool_map().
prepare_pool_map(#{scope := Scope, opts := OptsMap} = PoolIn) ->
    Renamed = maps:remove(scope, PoolIn#{host_type => Scope}),
    Renamed#{opts => maps:to_list(OptsMap)}.
380
%% Returns the sorted list of distinct pool types present in `Pools'.
-spec get_unique_types([pool_map_in()]) -> [pool_type()].
get_unique_types(Pools) ->
    Types = lists:map(fun(Pool) -> maps:get(type, Pool) end, Pools),
    lists:usort(Types).
384
%% Looks up the settings record of a pool in the ETS table.
%% When nothing is stored for a binary host type, the lookup is retried with
%% `global'. Returns `{error, {pool_not_started, Key}}' when no record is found.
-spec get_pool(pool_type(), host_type_or_global(), tag()) -> pool_record_result().
get_pool(PoolType, HostType, Tag) ->
    PoolKey = {PoolType, HostType, Tag},
    case ets:lookup(?MODULE, PoolKey) of
        [PoolRec] -> {ok, PoolRec};
        [] when is_binary(HostType) -> get_pool(PoolType, global, Tag);
        [] -> {error, {pool_not_started, PoolKey}}
    end.
Line Hits Source