./ct_report/coverage/mongoose_wpool.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @doc
3 %%% This is here because there are pool options which have to be given when calling
4 %%% the pool (selection strategy, timeout), while we want to set it once for the pool and not
5 %%% worry about them later, hence additional storage.
6 %%% @end
7 %%%-------------------------------------------------------------------
8 -module(mongoose_wpool).
9 -author("bartlomiej.gorny@erlang-solutions.com").
10 -include("mongoose.hrl").
11
12 -type call_timeout() :: pos_integer() | undefined.
13 -record(mongoose_wpool, {
14 name :: pool_name(),
15 atom_name :: wpool:name(),
16 strategy :: wpool:strategy() | undefined,
17 call_timeout :: call_timeout()
18 }).
19 -dialyzer({no_match, start/4}).
20
21 %% API
22 -export([ensure_started/0,
23 start/2, start/3, start/4, start/5,
24 stop/0, stop/1, stop/2, stop/3,
25 get_worker/1, get_worker/2, get_worker/3,
26 call/2, call/3, call/4, call/5,
27 cast/2, cast/3, cast/4, cast/5,
28 get_pool_settings/3, get_pools/0, stats/3]).
29
30 -export([start_sup_pool/3]).
31 -export([start_configured_pools/0]).
32 -export([start_configured_pools/1]).
33 -export([start_configured_pools/2]).
34 -export([is_configured/1]).
35 -export([make_pool_name/3]).
36 -export([call_start_callback/2]).
37
38 %% Mostly for tests
39 -export([expand_pools/2]).
40
41 -ignore_xref([behaviour_info/1, call/2, cast/2, cast/3, expand_pools/2, get_worker/2,
42 is_configured/2, is_configured/1, is_configured/1, start/2, start/3,
43 start/5, start_configured_pools/1, start_configured_pools/2, stats/3,
44 stop/1, stop/2]).
45
46 -type pool_type() :: redis | riak | http | rdbms | cassandra | elastic | generic
47 | rabbit | ldap.
48
49 %% Config scope
50 -type scope() :: global | host | mongooseim:host_type().
51 -type host_type_or_global() :: mongooseim:host_type_or_global().
52
53 -type tag() :: atom().
54 %% Name of a process
55 -type proc_name() :: atom().
56
57 %% ID of a pool. Used as a key for an ETS table
58 -type pool_name() :: {PoolType :: pool_type(),
59 HostType :: host_type_or_global(),
60 Tag :: tag()}.
61
62 -type pool_opts() :: [wpool:option()].
63 -type conn_opts() :: [{atom(), any()}].
64
65 -type pool_tuple_in() :: {PoolType :: pool_type(),
66 HostType :: scope(),
67 Tag :: tag(),
68 WpoolOpts :: pool_opts(),
69 ConnOpts :: conn_opts()}.
70 %% Pool tuple with expanded HostType argument
71 -type pool_tuple() :: {PoolType :: pool_type(),
72 %% does not contain `host' atom
73 HostType :: host_type_or_global(),
74 Tag :: tag(),
75 WpoolOpts :: pool_opts(),
76 ConnOpts :: conn_opts()}.
77 -type pool_error() :: {pool_not_started, term()}.
78 -type worker_result() :: {ok, pid()} | {error, pool_error()}.
79 -type pool_record_result() :: {ok, #mongoose_wpool{}} | {error, pool_error()}.
80 -type start_result() :: {ok, pid()} | {error, term()}.
81 -type stop_result() :: ok | term().
82
83 -export_type([pool_type/0]).
84 -export_type([tag/0]).
85 -export_type([scope/0]).
86 -export_type([proc_name/0]).
87 -export_type([pool_opts/0]).
88 -export_type([conn_opts/0]).
89
90 -type callback_fun() :: init | start | default_opts | is_supported_strategy | stop.
91
92 -callback init() -> ok | {error, term()}.
93 -callback start(scope(), tag(), WPoolOpts :: pool_opts(), ConnOpts :: conn_opts()) ->
94 {ok, {pid(), proplists:proplist()}} | {ok, pid()} |
95 {external, pid()} | {error, Reason :: term()}.
96 -callback default_opts() -> conn_opts().
97 -callback is_supported_strategy(Strategy :: wpool:strategy()) -> boolean().
98 -callback stop(scope(), tag()) -> ok.
99
100 -optional_callbacks([default_opts/0, is_supported_strategy/1]).
101
102 ensure_started() ->
103 82 wpool:start(),
104 82 case whereis(mongoose_wpool_sup) of
105 undefined ->
106
:-(
mongoose_wpool_sup:start_link();
107 _ ->
108 82 ok
109 end,
110
111 82 case ets:info(?MODULE) of
112 undefined ->
113 % we set heir here because the whole thing may be started by an ephemeral process
114 80 ets:new(?MODULE, [named_table, public,
115 {read_concurrency, true},
116 {keypos, #mongoose_wpool.name},
117 {heir, whereis(mongoose_wpool_sup), undefined}]);
118 _ ->
119 2 ok
120 end.
121
122 start_configured_pools() ->
123 80 Pools = mongoose_config:get_opt(outgoing_pools, []),
124 80 start_configured_pools(Pools).
125
126 start_configured_pools(PoolsIn) ->
127 100 start_configured_pools(PoolsIn, ?ALL_HOST_TYPES).
128
129 start_configured_pools(PoolsIn, HostTypes) ->
130 103 [call_callback(init, PoolType, []) || PoolType <- get_unique_types(PoolsIn)],
131 103 Pools = expand_pools(PoolsIn, HostTypes),
132 103 [start(Pool) || Pool <- Pools].
133
134 -spec start(pool_tuple()) -> start_result().
135 start({PoolType, HostType, Tag, PoolOpts, ConnOpts}) ->
136 103 start(PoolType, HostType, Tag, PoolOpts, ConnOpts).
137
138 -spec start(pool_type(), pool_opts()) -> start_result().
139 start(PoolType, PoolOpts) ->
140
:-(
start(PoolType, global, PoolOpts).
141
142 -spec start(pool_type(), host_type_or_global(), pool_opts()) -> start_result().
143 start(PoolType, HostType, PoolOpts) ->
144
:-(
start(PoolType, HostType, default, PoolOpts).
145
146 -spec start(pool_type(), host_type_or_global(), tag(),
147 pool_opts()) -> start_result().
148 start(PoolType, HostType, Tag, PoolOpts) ->
149 32 start(PoolType, HostType, Tag, PoolOpts, []).
150
151 -spec start(pool_type(), host_type_or_global(), tag(),
152 pool_opts(), conn_opts()) -> start_result().
153 start(PoolType, HostType, Tag, PoolOpts, ConnOpts) ->
154 135 {Opts0, WpoolOptsIn} = proplists:split(PoolOpts, [strategy, call_timeout]),
155 135 Opts = lists:append(Opts0) ++ default_opts(PoolType),
156 135 Strategy = proplists:get_value(strategy, Opts, best_worker),
157 135 CallTimeout = proplists:get_value(call_timeout, Opts, 5000),
158 %% If a callback doesn't explicitly blacklist a strategy, let's proceed.
159 135 CallbackModule = make_callback_module_name(PoolType),
160 135 case catch CallbackModule:is_supported_strategy(Strategy) of
161 false ->
162
:-(
error({strategy_not_supported, PoolType, HostType, Tag, Strategy});
163 _ ->
164 135 start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout)
165 end.
166
167 -spec start(pool_type(), host_type_or_global(), tag(),
168 pool_opts(), conn_opts(), wpool:strategy(), call_timeout()) ->
169 start_result().
170 start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) ->
171 135 case mongoose_wpool_mgr:start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts) of
172 {ok, Pid} ->
173 135 ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag},
174 atom_name = make_pool_name(PoolType, HostType, Tag),
175 strategy = Strategy,
176 call_timeout = CallTimeout}),
177 135 {ok, Pid};
178 {external, Pid} ->
179
:-(
ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag},
180 atom_name = make_pool_name(PoolType, HostType, Tag)
181 }),
182
:-(
{ok, Pid};
183 Error ->
184
:-(
Error
185 end.
186
187 %% @doc this function starts the worker_pool's pool under a specific supervisor
188 %% in MongooseIM application.
189 %% It's needed for 2 reasons:
190 %% 1. We want to have a full control of all the pools and its restarts
191 %% 2. When a pool is started via wpool:start_pool it's supposed be called by a supervisor,
192 %% if not, there is no way to stop the pool.
193 -spec start_sup_pool(pool_type(), proc_name(), [wpool:option()]) ->
194 {ok, pid()} | {error, term()}.
195 start_sup_pool(PoolType, ProcName, WpoolOpts) ->
196 135 SupName = mongoose_wpool_type_sup:name(PoolType),
197 135 ChildSpec = #{id => ProcName,
198 start => {wpool, start_pool, [ProcName, WpoolOpts]},
199 restart => temporary,
200 type => supervisor,
201 modules => [wpool]},
202 135 supervisor:start_child(SupName, ChildSpec).
203
204 -spec stop() -> term().
205 stop() ->
206 80 [stop_pool(PoolName) || PoolName <- get_pools()].
207
208 -spec stop_pool(pool_name()) -> stop_result().
209 stop_pool({PoolType, HostType, Tag}) ->
210 76 stop(PoolType, HostType, Tag).
211
212 -spec stop(pool_type()) -> stop_result().
213 stop(PoolType) ->
214
:-(
stop(PoolType, global).
215
216 -spec stop(pool_type(), host_type_or_global()) -> stop_result().
217 stop(PoolType, HostType) ->
218
:-(
stop(PoolType, HostType, default).
219
220 -spec stop(pool_type(), host_type_or_global(), tag()) -> stop_result().
221 stop(PoolType, HostType, Tag) ->
222 131 try
223 131 ets:delete(?MODULE, {PoolType, HostType, Tag}),
224 131 call_callback(stop, PoolType, [HostType, Tag]),
225 131 mongoose_wpool_mgr:stop(PoolType, HostType, Tag)
226 catch
227 C:R:S ->
228
:-(
?LOG_ERROR(#{what => pool_stop_failed,
229 pool_type => PoolType, server => HostType, pool_tag => Tag,
230 pool_key => {PoolType, HostType, Tag},
231
:-(
class => C, reason => R, stacktrace => S})
232 end.
233
234 -spec is_configured(pool_type()) -> boolean().
235 is_configured(PoolType) ->
236 16 Pools = mongoose_config:get_opt(outgoing_pools, []),
237 16 lists:keymember(PoolType, 1, Pools).
238
239 -spec get_worker(pool_type()) -> worker_result().
240 get_worker(PoolType) ->
241 32 get_worker(PoolType, global).
242
243 -spec get_worker(pool_type(), host_type_or_global()) -> worker_result().
244 get_worker(PoolType, HostType) ->
245 32 get_worker(PoolType, HostType, default).
246
247 -spec get_worker(pool_type(), host_type_or_global(), tag()) -> worker_result().
248 get_worker(PoolType, HostType, Tag) ->
249 2455 case get_pool(PoolType, HostType, Tag) of
250 {ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
251 2423 Worker = wpool_pool:Strategy(make_pool_name(Pool)),
252 2423 {ok, whereis(Worker)};
253 Err ->
254 32 Err
255 end.
256
257 call(PoolType, Request) ->
258
:-(
call(PoolType, global, Request).
259
260 call(PoolType, HostType, Request) ->
261 224 call(PoolType, HostType, default, Request).
262
263 call(PoolType, HostType, Tag, Request) ->
264 406 case get_pool(PoolType, HostType, Tag) of
265 {ok, #mongoose_wpool{strategy = Strategy, call_timeout = CallTimeout} = Pool} ->
266 182 wpool:call(make_pool_name(Pool), Request, Strategy, CallTimeout);
267 Err ->
268 224 Err
269 end.
270
271 call(PoolType, HostType, Tag, HashKey, Request) ->
272
:-(
case get_pool(PoolType, HostType, Tag) of
273 {ok, #mongoose_wpool{call_timeout = CallTimeout} = Pool} ->
274
:-(
wpool:call(make_pool_name(Pool), Request, {hash_worker, HashKey}, CallTimeout);
275 Err ->
276
:-(
Err
277 end.
278
279 cast(PoolType, Request) ->
280
:-(
cast(PoolType, global, Request).
281
282 cast(PoolType, HostType, Request) ->
283
:-(
cast(PoolType, HostType, default, Request).
284
285 cast(PoolType, HostType, Tag, Request) ->
286 168 case get_pool(PoolType, HostType, Tag) of
287 {ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
288 166 wpool:cast(make_pool_name(Pool), Request, Strategy);
289 Err ->
290 2 Err
291 end.
292
293 cast(PoolType, HostType, Tag, HashKey, Request) ->
294
:-(
case get_pool(PoolType, HostType, Tag) of
295 {ok, #mongoose_wpool{} = Pool} ->
296
:-(
wpool:cast(make_pool_name(Pool), Request, {hash_worker, HashKey});
297 Err ->
298
:-(
Err
299 end.
300
301 -spec get_pool_settings(pool_type(), host_type_or_global(), tag()) ->
302 #mongoose_wpool{} | undefined.
303 get_pool_settings(PoolType, HostType, Tag) ->
304 213 case get_pool(PoolType, HostType, Tag) of
305
:-(
{ok, PoolRec} -> PoolRec;
306 213 {error, {pool_not_started, _}} -> undefined
307 end.
308
309 -spec get_pools() -> [pool_name()].
310 get_pools() ->
311 81 lists:map(fun(#mongoose_wpool{name = Name}) -> Name end, ets:tab2list(?MODULE)).
312
313 stats(PoolType, HostType, Tag) ->
314
:-(
wpool:stats(make_pool_name(PoolType, HostType, Tag)).
315
316 -spec make_pool_name(pool_type(), scope(), tag()) -> proc_name().
317 make_pool_name(PoolType, HostType, Tag) when is_atom(HostType) ->
318 302 make_pool_name(PoolType, atom_to_binary(HostType, utf8), Tag);
319 make_pool_name(PoolType, HostType, Tag) when is_binary(HostType) ->
320 407 binary_to_atom(<<"mongoose_wpool$", (atom_to_binary(PoolType, utf8))/binary, $$,
321 HostType/binary, $$, (atom_to_binary(Tag, utf8))/binary>>, utf8).
322
323 make_pool_name(#mongoose_wpool{atom_name = undefined, name = {PoolType, HostType, Tag}}) ->
324
:-(
make_pool_name(PoolType, HostType, Tag);
325 make_pool_name(#mongoose_wpool{atom_name = AtomName}) ->
326 2771 AtomName.
327
328 -spec call_start_callback(pool_type(), list()) -> term().
329 call_start_callback(PoolType, Args) ->
330 135 call_callback(start, PoolType, Args).
331
332 -spec call_callback(callback_fun(), pool_type(), list()) -> term().
333 call_callback(CallbackFun, PoolType, Args) ->
334 369 try
335 369 CallbackModule = make_callback_module_name(PoolType),
336 369 erlang:apply(CallbackModule, CallbackFun, Args)
337 catch E:R:ST ->
338
:-(
?LOG_ERROR(#{what => pool_callback_failed,
339 pool_type => PoolType, callback_function => CallbackFun,
340
:-(
error => E, reason => R, stacktrace => ST}),
341
:-(
{error, {callback_crashed, CallbackFun, E, R, ST}}
342 end.
343
344 -spec make_callback_module_name(pool_type()) -> module().
345 make_callback_module_name(PoolType) ->
346 639 Name = "mongoose_wpool_" ++ atom_to_list(PoolType),
347 639 list_to_atom(Name).
348
349 -spec default_opts(pool_type()) -> conn_opts().
350 default_opts(PoolType) ->
351 135 Mod = make_callback_module_name(PoolType),
352 135 case erlang:function_exported(Mod, default_opts, 0) of
353
:-(
true -> Mod:default_opts();
354 135 false -> []
355 end.
356
357 -spec expand_pools([pool_tuple_in()], [mongooseim:host_type()]) -> [pool_tuple()].
358 expand_pools(Pools, HostTypes) ->
359 %% First we select only pools for a specific vhost
360 103 HostSpecific = [{PoolType, HostType, Tag} ||
361 103 {PoolType, HostType, Tag, _, _} <- Pools,
362 103 is_binary(HostType)],
363 %% Then we expand all pools with `host` as HostType parameter but using host specific configs
364 %% if they were provided
365 103 F = fun({PoolType, host, Tag, WpoolOpts, ConnOpts}) ->
366 3 [{PoolType, HostType, Tag, WpoolOpts, ConnOpts} ||
367 3 HostType <- HostTypes,
368 3 not lists:member({PoolType, HostType, Tag}, HostSpecific)];
369 100 (Other) -> [Other]
370 end,
371 103 lists:flatmap(F, Pools).
372
373 -spec get_unique_types([pool_tuple_in()]) -> [pool_type()].
374 get_unique_types(Pools) ->
375 103 lists:usort([PoolType || {PoolType, _, _, _, _} <- Pools]).
376
377 -spec get_pool(pool_type(), host_type_or_global(), tag()) -> pool_record_result().
378 get_pool(PoolType, HostType, Tag) ->
379 3548 case ets:lookup(?MODULE, {PoolType, HostType, Tag}) of
380 306 [] when is_binary(HostType) -> get_pool(PoolType, global, Tag);
381 471 [] -> {error, {pool_not_started, {PoolType, HostType, Tag}}};
382 2771 [Pool] -> {ok, Pool}
383 end.
Line Hits Source