./ct_report/coverage/mongoose_wpool.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @doc
3 %%% This is here because there are pool options which have to be given when calling
4 %%% the pool (selection strategy, timeout), while we want to set it once for the pool and not
5 %%% worry about them later, hence additional storage.
6 %%% @end
7 %%%-------------------------------------------------------------------
8 -module(mongoose_wpool).
9 -author("bartlomiej.gorny@erlang-solutions.com").
10 -include("mongoose.hrl").
11
12 -record(mongoose_wpool, {
13 name :: pool_name(),
14 atom_name :: wpool:name(),
15 strategy :: wpool:strategy(),
16 call_timeout :: pos_integer()
17 }).
18
19 %% API
20 -export([ensure_started/0,
21 start/2, start/3, start/4, start/5,
22 stop/0, stop/1, stop/2, stop/3,
23 get_worker/1, get_worker/2, get_worker/3,
24 call/2, call/3, call/4, call/5,
25 send_request/2, send_request/3, send_request/4, send_request/5,
26 cast/2, cast/3, cast/4, cast/5,
27 get_pool_settings/3, get_pools/0, stats/3]).
28
29 -export([start_sup_pool/3]).
30 -export([start_configured_pools/0]).
31 -export([start_configured_pools/1]).
32 -export([start_configured_pools/2]).
33 -export([is_configured/1]).
34 -export([make_pool_name/3]).
35 -export([call_start_callback/2]).
36
37 %% Mostly for tests
38 -export([expand_pools/2]).
39
40 -ignore_xref([call/2, cast/2, cast/3, expand_pools/2, get_worker/2,
41 send_request/2, send_request/4, send_request/5,
42 is_configured/2, is_configured/1, is_configured/1, start/2, start/3,
43 start/5, start_configured_pools/1, start_configured_pools/2, stats/3,
44 stop/1, stop/2]).
45
46 -type pool_type() :: redis | riak | http | rdbms | cassandra | elastic | generic
47 | rabbit | ldap.
48
49 %% Config scope
50 -type scope() :: global | host | mongooseim:host_type().
51 -type host_type_or_global() :: mongooseim:host_type_or_global().
52
53 -type tag() :: atom().
54 %% Name of a process
55 -type proc_name() :: atom().
56
57 %% ID of a pool. Used as a key for an ETS table
58 -type pool_name() :: {PoolType :: pool_type(),
59 HostType :: host_type_or_global(),
60 Tag :: tag()}.
61
62 -type pool_opts_in() :: map().
63 -type pool_opts() :: [wpool:option()].
64 -type conn_opts() :: map().
65
66 -type pool_map_in() :: #{type := pool_type(),
67 scope := scope(),
68 tag := tag(),
69 opts := pool_opts_in(),
70 conn_opts := conn_opts()}.
71 %% Pool map with expanded HostType argument instead of scope
72 -type pool_map() :: #{type := pool_type(),
73 host_type := host_type_or_global(),
74 tag := tag(),
75 opts := pool_opts(),
76 conn_opts := conn_opts()}.
77 -type pool_error() :: {pool_not_started, term()}.
78 -type worker_result() :: {ok, pid()} | {error, pool_error()}.
79 -type pool_record_result() :: {ok, #mongoose_wpool{}} | {error, pool_error()}.
80 -type start_result() :: {ok, pid()} | {error, term()}.
81 -type stop_result() :: ok | term().
82
83 -export_type([pool_type/0]).
84 -export_type([tag/0]).
85 -export_type([scope/0]).
86 -export_type([proc_name/0]).
87 -export_type([pool_opts/0]).
88 -export_type([conn_opts/0]).
89
90 -type callback_fun() :: init | start | is_supported_strategy | stop.
91
92 -callback init() -> ok | {error, term()}.
93 -callback start(host_type_or_global(), tag(), WPoolOpts :: pool_opts(), ConnOpts :: conn_opts()) ->
94 {ok, {pid(), proplists:proplist()}} | {ok, pid()} | {error, Reason :: term()}.
95 -callback is_supported_strategy(Strategy :: wpool:strategy()) -> boolean().
96 -callback stop(host_type_or_global(), tag()) -> ok.
97
98 -optional_callbacks([is_supported_strategy/1]).
99
100 ensure_started() ->
101 83 wpool:start(),
102 83 case whereis(mongoose_wpool_sup) of
103 undefined ->
104
:-(
mongoose_wpool_sup:start_link();
105 _ ->
106 83 ok
107 end,
108
109 83 case ets:info(?MODULE) of
110 undefined ->
111 % we set heir here because the whole thing may be started by an ephemeral process
112 83 ets:new(?MODULE, [named_table, public,
113 {read_concurrency, true},
114 {keypos, #mongoose_wpool.name},
115 {heir, whereis(mongoose_wpool_sup), undefined}]);
116 _ ->
117
:-(
ok
118 end.
119
120 start_configured_pools() ->
121 83 Pools = mongoose_config:get_opt(outgoing_pools),
122 83 start_configured_pools(Pools).
123
124 start_configured_pools(PoolsIn) ->
125 106 start_configured_pools(PoolsIn, ?ALL_HOST_TYPES).
126
127 start_configured_pools(PoolsIn, HostTypes) ->
128 107 [call_callback(init, PoolType, []) || PoolType <- get_unique_types(PoolsIn)],
129 107 Pools = expand_pools(PoolsIn, HostTypes),
130 107 [start(Pool) || Pool <- Pools].
131
132 -spec start(pool_map()) -> start_result().
133 start(#{type := PoolType, host_type := HostType, tag := Tag,
134 opts := PoolOpts, conn_opts := ConnOpts}) ->
135 273 start(PoolType, HostType, Tag, PoolOpts, ConnOpts).
136
137 -spec start(pool_type(), pool_opts()) -> start_result().
138 start(PoolType, PoolOpts) ->
139
:-(
start(PoolType, global, PoolOpts).
140
141 -spec start(pool_type(), host_type_or_global(), pool_opts()) -> start_result().
142 start(PoolType, HostType, PoolOpts) ->
143
:-(
start(PoolType, HostType, default, PoolOpts).
144
145 -spec start(pool_type(), host_type_or_global(), tag(),
146 pool_opts()) -> start_result().
147 start(PoolType, HostType, Tag, PoolOpts) ->
148 34 start(PoolType, HostType, Tag, PoolOpts, #{}).
149
150 -spec start(pool_type(), host_type_or_global(), tag(),
151 pool_opts(), conn_opts()) -> start_result().
152 start(PoolType, HostType, Tag, PoolOpts, ConnOpts) ->
153 307 {Opts0, WpoolOptsIn} = proplists:split(PoolOpts, [strategy, call_timeout]),
154 307 Opts = lists:append(Opts0),
155 307 Strategy = proplists:get_value(strategy, Opts, best_worker),
156 307 CallTimeout = proplists:get_value(call_timeout, Opts, 5000),
157 %% If a callback doesn't explicitly blacklist a strategy, let's proceed.
158 307 CallbackModule = make_callback_module_name(PoolType),
159 307 case catch CallbackModule:is_supported_strategy(Strategy) of
160 false ->
161
:-(
error({strategy_not_supported, PoolType, HostType, Tag, Strategy});
162 _ ->
163 307 start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout)
164 end.
165
166 -spec start(pool_type(), host_type_or_global(), tag(),
167 pool_opts(), conn_opts(), wpool:strategy(), pos_integer()) ->
168 start_result().
169 start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) ->
170 307 case mongoose_wpool_mgr:start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts) of
171 {ok, Pid} ->
172 306 ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag},
173 atom_name = make_pool_name(PoolType, HostType, Tag),
174 strategy = Strategy,
175 call_timeout = CallTimeout}),
176 306 {ok, Pid};
177 Error ->
178 1 Error
179 end.
180
181 %% @doc this function starts the worker_pool's pool under a specific supervisor
182 %% in MongooseIM application.
183 %% It's needed for 2 reasons:
184 %% 1. We want to have a full control of all the pools and its restarts
185 %% 2. When a pool is started via wpool:start_pool it's supposed be called by a supervisor,
186 %% if not, there is no way to stop the pool.
187 -spec start_sup_pool(pool_type(), proc_name(), [wpool:option()]) ->
188 {ok, pid()} | {error, term()}.
189 start_sup_pool(PoolType, ProcName, WpoolOpts) ->
190 307 SupName = mongoose_wpool_type_sup:name(PoolType),
191 307 ChildSpec = #{id => ProcName,
192 start => {wpool, start_pool, [ProcName, WpoolOpts]},
193 restart => temporary,
194 type => supervisor,
195 modules => [wpool]},
196 307 supervisor:start_child(SupName, ChildSpec).
197
198 -spec stop() -> term().
199 stop() ->
200 83 [stop_pool(PoolName) || PoolName <- get_pools()].
201
202 -spec stop_pool(pool_name()) -> stop_result().
203 stop_pool({PoolType, HostType, Tag}) ->
204 237 stop(PoolType, HostType, Tag).
205
206 -spec stop(pool_type()) -> stop_result().
207 stop(PoolType) ->
208
:-(
stop(PoolType, global).
209
210 -spec stop(pool_type(), host_type_or_global()) -> stop_result().
211 stop(PoolType, HostType) ->
212
:-(
stop(PoolType, HostType, default).
213
214 -spec stop(pool_type(), host_type_or_global(), tag()) -> stop_result().
215 stop(PoolType, HostType, Tag) ->
216 294 try
217 294 ets:delete(?MODULE, {PoolType, HostType, Tag}),
218 294 call_callback(stop, PoolType, [HostType, Tag]),
219 294 mongoose_wpool_mgr:stop(PoolType, HostType, Tag)
220 catch
221 C:R:S ->
222
:-(
?LOG_ERROR(#{what => pool_stop_failed,
223 pool_type => PoolType, server => HostType, pool_tag => Tag,
224 pool_key => {PoolType, HostType, Tag},
225
:-(
class => C, reason => R, stacktrace => S})
226 end.
227
228 -spec is_configured(pool_type()) -> boolean().
229 is_configured(PoolType) ->
230 89 Pools = mongoose_config:get_opt(outgoing_pools),
231 89 lists:any(fun(#{type := Type}) -> Type =:= PoolType end, Pools).
232
233 -spec get_worker(pool_type()) -> worker_result().
234 get_worker(PoolType) ->
235 110 get_worker(PoolType, global).
236
237 -spec get_worker(pool_type(), host_type_or_global()) -> worker_result().
238 get_worker(PoolType, HostType) ->
239 110 get_worker(PoolType, HostType, default).
240
241 -spec get_worker(pool_type(), host_type_or_global(), tag()) -> worker_result().
242 get_worker(PoolType, HostType, Tag) ->
243 2436 case get_pool(PoolType, HostType, Tag) of
244 {ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
245 2326 Worker = wpool_pool:Strategy(make_pool_name(Pool)),
246 2326 {ok, whereis(Worker)};
247 Err ->
248 110 Err
249 end.
250
251 call(PoolType, Request) ->
252
:-(
call(PoolType, global, Request).
253
254 call(PoolType, HostType, Request) ->
255 310 call(PoolType, HostType, default, Request).
256
257 call(PoolType, HostType, Tag, Request) ->
258 493 case get_pool(PoolType, HostType, Tag) of
259 {ok, #mongoose_wpool{strategy = Strategy, call_timeout = CallTimeout} = Pool} ->
260 183 wpool:call(make_pool_name(Pool), Request, Strategy, CallTimeout);
261 Err ->
262 310 Err
263 end.
264
265 call(PoolType, HostType, Tag, HashKey, Request) ->
266 1246 case get_pool(PoolType, HostType, Tag) of
267 {ok, #mongoose_wpool{call_timeout = CallTimeout} = Pool} ->
268 1246 wpool:call(make_pool_name(Pool), Request, {hash_worker, HashKey}, CallTimeout);
269 Err ->
270
:-(
Err
271 end.
272
273 send_request(PoolType, Request) ->
274
:-(
send_request(PoolType, global, Request).
275
276 send_request(PoolType, HostType, Request) ->
277
:-(
send_request(PoolType, HostType, default, Request).
278
279 send_request(PoolType, HostType, Tag, Request) ->
280
:-(
case get_pool(PoolType, HostType, Tag) of
281 {ok, #mongoose_wpool{strategy = Strategy, call_timeout = CallTimeout} = Pool} ->
282
:-(
wpool_send_request(make_pool_name(Pool), Request, Strategy, CallTimeout);
283 Err ->
284
:-(
Err
285 end.
286
287 send_request(PoolType, HostType, Tag, HashKey, Request) ->
288
:-(
case get_pool(PoolType, HostType, Tag) of
289 {ok, #mongoose_wpool{call_timeout = CallTimeout} = Pool} ->
290
:-(
wpool_send_request(make_pool_name(Pool), Request, {hash_worker, HashKey}, CallTimeout);
291 Err ->
292
:-(
Err
293 end.
294
295 wpool_send_request(PoolName, Request, Strategy, Timeout) ->
296
:-(
wpool:send_request(PoolName, Request, Strategy, Timeout).
297
298 cast(PoolType, Request) ->
299
:-(
cast(PoolType, global, Request).
300
301 cast(PoolType, HostType, Request) ->
302
:-(
cast(PoolType, HostType, default, Request).
303
304 cast(PoolType, HostType, Tag, Request) ->
305 98 case get_pool(PoolType, HostType, Tag) of
306 {ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
307 95 wpool:cast(make_pool_name(Pool), Request, Strategy);
308 Err ->
309 3 Err
310 end.
311
312 cast(PoolType, HostType, Tag, HashKey, Request) ->
313 889 case get_pool(PoolType, HostType, Tag) of
314 {ok, #mongoose_wpool{} = Pool} ->
315 889 wpool:cast(make_pool_name(Pool), Request, {hash_worker, HashKey});
316 Err ->
317
:-(
Err
318 end.
319
320 -spec get_pool_settings(pool_type(), host_type_or_global(), tag()) ->
321 #mongoose_wpool{} | undefined.
322 get_pool_settings(PoolType, HostType, Tag) ->
323 221 case get_pool(PoolType, HostType, Tag) of
324
:-(
{ok, PoolRec} -> PoolRec;
325 221 {error, {pool_not_started, _}} -> undefined
326 end.
327
328 -spec get_pools() -> [pool_name()].
329 get_pools() ->
330 83 lists:map(fun(#mongoose_wpool{name = Name}) -> Name end, ets:tab2list(?MODULE)).
331
332 stats(PoolType, HostType, Tag) ->
333
:-(
wpool:stats(make_pool_name(PoolType, HostType, Tag)).
334
335 -spec make_pool_name(pool_type(), scope(), tag()) -> proc_name().
336 make_pool_name(PoolType, HostType, Tag) when is_atom(HostType) ->
337 2484 make_pool_name(PoolType, atom_to_binary(HostType, utf8), Tag);
338 make_pool_name(PoolType, HostType, Tag) when is_binary(HostType) ->
339 2589 binary_to_atom(<<"mongoose_wpool$", (atom_to_binary(PoolType, utf8))/binary, $$,
340 HostType/binary, $$, (atom_to_binary(Tag, utf8))/binary>>, utf8).
341
342 make_pool_name(#mongoose_wpool{atom_name = undefined, name = {PoolType, HostType, Tag}}) ->
343
:-(
make_pool_name(PoolType, HostType, Tag);
344 make_pool_name(#mongoose_wpool{atom_name = AtomName}) ->
345 4739 AtomName.
346
347 -spec call_start_callback(pool_type(), list()) -> term().
348 call_start_callback(PoolType, Args) ->
349 307 call_callback(start, PoolType, Args).
350
351 -spec call_callback(callback_fun(), pool_type(), list()) -> term().
352 call_callback(CallbackFun, PoolType, Args) ->
353 874 try
354 874 CallbackModule = make_callback_module_name(PoolType),
355 874 erlang:apply(CallbackModule, CallbackFun, Args)
356 catch E:R:ST ->
357
:-(
?LOG_ERROR(#{what => pool_callback_failed,
358 pool_type => PoolType, callback_function => CallbackFun,
359
:-(
error => E, reason => R, stacktrace => ST}),
360
:-(
{error, {callback_crashed, CallbackFun, E, R, ST}}
361 end.
362
363 -spec make_callback_module_name(pool_type()) -> module().
364 make_callback_module_name(PoolType) ->
365 1181 Name = "mongoose_wpool_" ++ atom_to_list(PoolType),
366 1181 list_to_atom(Name).
367
368 -spec expand_pools([pool_map_in()], [mongooseim:host_type()]) -> [pool_map()].
369 expand_pools(Pools, HostTypes) ->
370 %% First we select only pools for a specific vhost
371 107 HostSpecific = [{Type, HT, Tag} || #{type := Type, scope := HT, tag := Tag} <- Pools, is_binary(HT)],
372 %% Then we expand all pools with `host` as HostType parameter but using host specific configs
373 %% if they were provided
374 107 F = fun(M = #{type := PoolType, scope := host, tag := Tag}) ->
375 1 [M#{scope => HostType} || HostType <- HostTypes,
376 1 not lists:member({PoolType, HostType, Tag}, HostSpecific)];
377 272 (Other) -> [Other]
378 end,
379 107 Pools1 = lists:flatmap(F, Pools),
380 107 lists:map(fun prepare_pool_map/1, Pools1).
381
382 -spec prepare_pool_map(pool_map_in()) -> pool_map().
383 prepare_pool_map(Pool = #{scope := HT, opts := Opts}) ->
384 %% Rename "scope" field to "host_type" and change wpool opts to a KV list
385 273 Pool1 = maps:remove(scope, Pool),
386 273 Pool1#{host_type => HT, opts => maps:to_list(Opts)}.
387
388 -spec get_unique_types([pool_map_in()]) -> [pool_type()].
389 get_unique_types(Pools) ->
390 107 lists:usort([maps:get(type, Pool) || Pool <- Pools]).
391
392 -spec get_pool(pool_type(), host_type_or_global(), tag()) -> pool_record_result().
393 get_pool(PoolType, HostType, Tag) ->
394 5776 case ets:lookup(?MODULE, {PoolType, HostType, Tag}) of
395 393 [] when is_binary(HostType) -> get_pool(PoolType, global, Tag);
396 644 [] -> {error, {pool_not_started, {PoolType, HostType, Tag}}};
397 4739 [Pool] -> {ok, Pool}
398 end.
Line Hits Source