./ct_report/coverage/mongoose_wpool.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @doc
3 %%% This is here because there are pool options which have to be given when calling
4 %%% the pool (selection strategy, timeout), while we want to set it once for the pool and not
5 %%% worry about them later, hence additional storage.
6 %%% @end
7 %%%-------------------------------------------------------------------
8 -module(mongoose_wpool).
9 -author("bartlomiej.gorny@erlang-solutions.com").
10 -include("mongoose.hrl").
11
12 -record(mongoose_wpool, {
13 name :: pool_name(),
14 atom_name :: wpool:name(),
15 strategy :: wpool:strategy(),
16 call_timeout :: pos_integer()
17 }).
18
19 %% API
20 -export([ensure_started/0,
21 start/2, start/3, start/4, start/5,
22 stop/0, stop/1, stop/2, stop/3,
23 get_worker/2, get_worker/3,
24 call/2, call/3, call/4, call/5,
25 send_request/2, send_request/3, send_request/4, send_request/5,
26 cast/2, cast/3, cast/4, cast/5,
27 get_pool_settings/3, get_pools/0, stats/3]).
28
29 -export([start_sup_pool/3]).
30 -export([start_configured_pools/0, start_configured_pools/1, start_configured_pools/2]).
31 -export([is_configured/1]).
32 -export([make_pool_name/3]).
33 -export([call_start_callback/2]).
34 -export([verify_unique_pools/1]).
35
36 %% Mostly for tests
37 -export([expand_pools/2]).
38
39 -ignore_xref([call/2, cast/2, cast/3, expand_pools/2, get_worker/2,
40 send_request/2, send_request/3, send_request/4, send_request/5,
41 is_configured/2, is_configured/1, is_configured/1, start/2, start/3,
42 start/5, start_configured_pools/1, start_configured_pools/2, stats/3,
43 stop/1, stop/2]).
44
45 -type pool_type() :: redis | http | rdbms | cassandra | elastic | generic | rabbit | ldap.
46
47 %% Config scope
48 -type scope() :: global | host_type | mongooseim:host_type().
49 -type host_type_or_global() :: mongooseim:host_type_or_global().
50 -type tag() :: atom().
51
52 %% Name of a process
53 -type proc_name() :: atom().
54
55 %% ID of a pool. Used as a key for an ETS table
56 -type pool_name() :: {PoolType :: pool_type(),
57 HostType :: host_type_or_global(),
58 Tag :: tag()}.
59
60 -type pool_opts_in() :: map().
61 -type pool_opts() :: [wpool:option()].
62 -type conn_opts() :: map().
63
64 -type pool_map_in() :: #{type := pool_type(),
65 scope := scope(),
66 tag := tag(),
67 opts := pool_opts_in(),
68 conn_opts := conn_opts()}.
69 %% Pool map with expanded HostType argument instead of scope
70 -type pool_map() :: #{type := pool_type(),
71 host_type := host_type_or_global(),
72 tag := tag(),
73 opts := pool_opts(),
74 conn_opts := conn_opts()}.
75 -type pool_error() :: {pool_not_started, term()}.
76 -type worker_result() :: {ok, pid()} | {error, pool_error()}.
77 -type pool_record_result() :: {ok, #mongoose_wpool{}} | {error, pool_error()}.
78 -type start_result() :: {ok, pid()} | {error, term()}.
79 -type stop_result() :: ok | term().
80
81 -export_type([pool_type/0, tag/0, scope/0, proc_name/0, pool_name/0, pool_opts/0, conn_opts/0]).
82
83 -type callback_fun() :: init | start | is_supported_strategy | stop.
84
85 -callback init() -> ok | {error, term()}.
86 -callback start(host_type_or_global(), tag(), WPoolOpts :: pool_opts(), ConnOpts :: conn_opts()) ->
87 {ok, {pid(), proplists:proplist()}} | {ok, pid()} | {error, Reason :: term()}.
88 -callback is_supported_strategy(Strategy :: wpool:strategy()) -> boolean().
89 -callback stop(host_type_or_global(), tag()) -> ok.
90
91 -optional_callbacks([is_supported_strategy/1]).
92
93 %% @doc Verify that all outgoing_pools have unique type, tag and scope
94 -spec verify_unique_pools([pool_map_in()]) -> [pool_map()].
95 verify_unique_pools(Pools) ->
96 98 PoolKeys = lists:map(fun pool_id/1, Pools),
97 98 case {lists:usort(PoolKeys), lists:sort(PoolKeys)} of
98 98 {SortedPool, SortedPool} -> Pools;
99 {Uniques, ContainsDuplicates} ->
100
:-(
Dups = lists:subtract(ContainsDuplicates, Uniques),
101
:-(
error(#{what => duplicate_pools, duplicates => Dups,
102 text => <<"Some pools are duplicated">>})
103 end.
104
105
106 %% @doc Create a unique ID based on the configuration identifiers
107 -spec pool_id(pool_map_in()) -> {scope(), pool_type(), tag()}.
108 pool_id(#{scope := HostType, type := Type, tag := Tag}) when is_binary(HostType) ->
109
:-(
{host_type, Type, Tag};
110 pool_id(#{scope := Scope, type := Type, tag := Tag}) ->
111 98 {Scope, Type, Tag}.
112
113 ensure_started() ->
114 99 wpool:start(),
115 99 case whereis(mongoose_wpool_sup) of
116 undefined ->
117
:-(
mongoose_wpool_sup:start_link();
118 _ ->
119 99 ok
120 end,
121 99 ejabberd_sup:create_ets_table(
122 ?MODULE,
123 [named_table, public,
124 {read_concurrency, true},
125 {keypos, #mongoose_wpool.name},
126 {heir, whereis(mongoose_wpool_sup), undefined}]).
127
128 start_configured_pools() ->
129 98 Pools = mongoose_config:get_opt(outgoing_pools),
130 98 start_configured_pools(Pools).
131
132 start_configured_pools(PoolsIn) ->
133 104 start_configured_pools(PoolsIn, ?ALL_HOST_TYPES).
134
135 start_configured_pools(PoolsIn, HostTypes) ->
136 105 [call_callback(init, PoolType, []) || PoolType <- get_unique_types(PoolsIn)],
137 99 Pools = expand_pools(PoolsIn, HostTypes),
138 99 [start(Pool) || Pool <- Pools].
139
140 -spec start(pool_map()) -> start_result().
141 start(#{type := PoolType, host_type := HostType, tag := Tag,
142 opts := PoolOpts, conn_opts := ConnOpts}) ->
143 99 start(PoolType, HostType, Tag, PoolOpts, ConnOpts).
144
145 -spec start(pool_type(), pool_opts()) -> start_result().
146 start(PoolType, PoolOpts) ->
147
:-(
start(PoolType, global, PoolOpts).
148
149 -spec start(pool_type(), host_type_or_global(), pool_opts()) -> start_result().
150 start(PoolType, HostType, PoolOpts) ->
151
:-(
start(PoolType, HostType, default, PoolOpts).
152
153 -spec start(pool_type(), host_type_or_global(), tag(),
154 pool_opts()) -> start_result().
155 start(PoolType, HostType, Tag, PoolOpts) ->
156 69 start(PoolType, HostType, Tag, PoolOpts, #{}).
157
158 -spec start(pool_type(), host_type_or_global(), tag(),
159 pool_opts(), conn_opts()) -> start_result().
160 start(PoolType, HostType, Tag, PoolOpts, ConnOpts) ->
161 168 {Opts0, WpoolOptsIn} = proplists:split(PoolOpts, [strategy, call_timeout]),
162 168 Opts = lists:append(Opts0),
163 168 Strategy = proplists:get_value(strategy, Opts, best_worker),
164 168 CallTimeout = proplists:get_value(call_timeout, Opts, 5000),
165 %% If a callback doesn't explicitly blacklist a strategy, let's proceed.
166 168 CallbackModule = make_callback_module_name(PoolType),
167 168 case catch CallbackModule:is_supported_strategy(Strategy) of
168 false ->
169
:-(
error({strategy_not_supported, PoolType, HostType, Tag, Strategy});
170 _ ->
171 168 start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout)
172 end.
173
174 -spec start(pool_type(), host_type_or_global(), tag(),
175 pool_opts(), conn_opts(), wpool:strategy(), pos_integer()) ->
176 start_result().
177 start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) ->
178 168 case mongoose_wpool_mgr:start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts) of
179 {ok, Pid} ->
180 168 ets:insert(?MODULE, #mongoose_wpool{name = {PoolType, HostType, Tag},
181 atom_name = make_pool_name(PoolType, HostType, Tag),
182 strategy = Strategy,
183 call_timeout = CallTimeout}),
184 168 {ok, Pid};
185 Error ->
186
:-(
Error
187 end.
188
189 %% @doc this function starts the worker_pool's pool under a specific supervisor
190 %% in MongooseIM application.
191 %% It's needed for 2 reasons:
192 %% 1. We want to have a full control of all the pools and its restarts
193 %% 2. When a pool is started via wpool:start_pool it's supposed be called by a supervisor,
194 %% if not, there is no way to stop the pool.
195 -spec start_sup_pool(pool_type(), proc_name(), [wpool:option()]) ->
196 {ok, pid()} | {error, term()}.
197 start_sup_pool(PoolType, ProcName, WpoolOpts) ->
198 168 SupName = mongoose_wpool_type_sup:name(PoolType),
199 168 ChildSpec = #{id => ProcName,
200 start => {wpool, start_pool, [ProcName, WpoolOpts]},
201 restart => temporary,
202 type => supervisor,
203 modules => [wpool]},
204 168 supervisor:start_child(SupName, ChildSpec).
205
206 -spec stop() -> term().
207 stop() ->
208 98 [stop_pool(PoolName) || PoolName <- get_pools()].
209
210 -spec stop_pool(pool_name()) -> stop_result().
211 stop_pool({PoolType, HostType, Tag}) ->
212 95 stop(PoolType, HostType, Tag).
213
214 -spec stop(pool_type()) -> stop_result().
215 stop(PoolType) ->
216
:-(
stop(PoolType, global).
217
218 -spec stop(pool_type(), host_type_or_global()) -> stop_result().
219 stop(PoolType, HostType) ->
220
:-(
stop(PoolType, HostType, default).
221
222 -spec stop(pool_type(), host_type_or_global(), tag()) -> stop_result().
223 stop(PoolType, HostType, Tag) ->
224 165 try
225 165 ets:delete(?MODULE, {PoolType, HostType, Tag}),
226 165 call_callback(stop, PoolType, [HostType, Tag]),
227 165 mongoose_wpool_mgr:stop(PoolType, HostType, Tag)
228 catch
229 C:R:S ->
230
:-(
?LOG_ERROR(#{what => pool_stop_failed,
231 pool_type => PoolType, server => HostType, pool_tag => Tag,
232 pool_key => {PoolType, HostType, Tag},
233
:-(
class => C, reason => R, stacktrace => S})
234 end.
235
236 -spec is_configured(pool_type()) -> boolean().
237 is_configured(PoolType) ->
238 35 Pools = mongoose_config:get_opt(outgoing_pools),
239 35 lists:any(fun(#{type := Type}) -> Type =:= PoolType end, Pools).
240
241 -spec get_worker(pool_type(), host_type_or_global()) -> worker_result().
242 get_worker(PoolType, HostType) ->
243
:-(
get_worker(PoolType, HostType, default).
244
245 -spec get_worker(pool_type(), host_type_or_global(), tag()) -> worker_result().
246 get_worker(PoolType, HostType, Tag) ->
247 1974 case get_pool(PoolType, HostType, Tag) of
248 {ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
249 1974 Worker = wpool_pool:Strategy(make_pool_name(Pool)),
250 1974 {ok, whereis(Worker)};
251 Err ->
252
:-(
Err
253 end.
254
255 call(PoolType, Request) ->
256
:-(
call(PoolType, global, Request).
257
258 call(PoolType, HostType, Request) ->
259
:-(
call(PoolType, HostType, default, Request).
260
261 call(PoolType, HostType, Tag, Request) ->
262 472 case get_pool(PoolType, HostType, Tag) of
263 {ok, #mongoose_wpool{strategy = Strategy, call_timeout = CallTimeout} = Pool} ->
264 136 wpool:call(make_pool_name(Pool), Request, Strategy, CallTimeout);
265 Err ->
266 336 Err
267 end.
268
269 call(PoolType, HostType, Tag, HashKey, Request) ->
270
:-(
case get_pool(PoolType, HostType, Tag) of
271 {ok, #mongoose_wpool{call_timeout = CallTimeout} = Pool} ->
272
:-(
wpool:call(make_pool_name(Pool), Request, {hash_worker, HashKey}, CallTimeout);
273 Err ->
274
:-(
Err
275 end.
276
277 send_request(PoolType, Request) ->
278
:-(
send_request(PoolType, global, Request).
279
280 send_request(PoolType, HostType, Request) ->
281
:-(
send_request(PoolType, HostType, default, Request).
282
283 send_request(PoolType, HostType, Tag, Request) ->
284
:-(
case get_pool(PoolType, HostType, Tag) of
285 {ok, #mongoose_wpool{strategy = Strategy, call_timeout = CallTimeout} = Pool} ->
286
:-(
wpool_send_request(make_pool_name(Pool), Request, Strategy, CallTimeout);
287 Err ->
288
:-(
Err
289 end.
290
291 send_request(PoolType, HostType, Tag, HashKey, Request) ->
292
:-(
case get_pool(PoolType, HostType, Tag) of
293 {ok, #mongoose_wpool{call_timeout = CallTimeout} = Pool} ->
294
:-(
wpool_send_request(make_pool_name(Pool), Request, {hash_worker, HashKey}, CallTimeout);
295 Err ->
296
:-(
Err
297 end.
298
299 wpool_send_request(PoolName, Request, Strategy, Timeout) ->
300
:-(
wpool:send_request(PoolName, Request, Strategy, Timeout).
301
302 cast(PoolType, Request) ->
303
:-(
cast(PoolType, global, Request).
304
305 cast(PoolType, HostType, Request) ->
306
:-(
cast(PoolType, HostType, default, Request).
307
308 cast(PoolType, HostType, Tag, Request) ->
309 98 case get_pool(PoolType, HostType, Tag) of
310 {ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
311 96 wpool:cast(make_pool_name(Pool), Request, Strategy);
312 Err ->
313 2 Err
314 end.
315
316 cast(PoolType, HostType, Tag, HashKey, Request) ->
317 377 case get_pool(PoolType, HostType, Tag) of
318 {ok, #mongoose_wpool{} = Pool} ->
319 377 wpool:cast(make_pool_name(Pool), Request, {hash_worker, HashKey});
320 Err ->
321
:-(
Err
322 end.
323
324 -spec get_pool_settings(pool_type(), host_type_or_global(), tag()) ->
325 #mongoose_wpool{} | undefined.
326 get_pool_settings(PoolType, HostType, Tag) ->
327 240 case get_pool(PoolType, HostType, Tag) of
328
:-(
{ok, PoolRec} -> PoolRec;
329 240 {error, {pool_not_started, _}} -> undefined
330 end.
331
332 -spec get_pools() -> [pool_name()].
333 get_pools() ->
334 98 lists:map(fun(#mongoose_wpool{name = Name}) -> Name end, ets:tab2list(?MODULE)).
335
336 stats(PoolType, HostType, Tag) ->
337
:-(
wpool:stats(make_pool_name(PoolType, HostType, Tag)).
338
339 -spec make_pool_name(pool_type(), scope(), tag()) -> proc_name().
340 make_pool_name(PoolType, HostType, Tag) when is_atom(HostType) ->
341 300 make_pool_name(PoolType, atom_to_binary(HostType, utf8), Tag);
342 make_pool_name(PoolType, HostType, Tag) when is_binary(HostType) ->
343 507 binary_to_atom(<<"mongoose_wpool$", (atom_to_binary(PoolType, utf8))/binary, $$,
344 HostType/binary, $$, (atom_to_binary(Tag, utf8))/binary>>, utf8).
345
346 make_pool_name(#mongoose_wpool{atom_name = undefined, name = {PoolType, HostType, Tag}}) ->
347
:-(
make_pool_name(PoolType, HostType, Tag);
348 make_pool_name(#mongoose_wpool{atom_name = AtomName}) ->
349 2583 AtomName.
350
351 -spec call_start_callback(pool_type(), list()) -> term().
352 call_start_callback(PoolType, Args) ->
353 168 call_callback(start, PoolType, Args).
354
355 -spec call_callback(callback_fun(), pool_type(), list()) -> term().
356 call_callback(CallbackFun, PoolType, Args) ->
357 432 try
358 432 CallbackModule = make_callback_module_name(PoolType),
359 432 erlang:apply(CallbackModule, CallbackFun, Args)
360 catch E:R:ST ->
361
:-(
?LOG_ERROR(#{what => pool_callback_failed,
362 pool_type => PoolType, callback_function => CallbackFun,
363
:-(
error => E, reason => R, stacktrace => ST}),
364
:-(
{error, {callback_crashed, CallbackFun, E, R, ST}}
365 end.
366
367 -spec make_callback_module_name(pool_type()) -> module().
368 make_callback_module_name(PoolType) ->
369 600 Name = "mongoose_wpool_" ++ atom_to_list(PoolType),
370 600 list_to_atom(Name).
371
372 -spec expand_pools([pool_map_in()], [mongooseim:host_type()]) -> [pool_map()].
373 expand_pools(Pools, HostTypes) ->
374 %% First we select only pools for a specific vhost
375 99 HostSpecific = [{Type, HT, Tag} || #{type := Type, scope := HT, tag := Tag} <- Pools, is_binary(HT)],
376 %% Then we expand all pools with `host_type` as HostType parameter but using host_type specific configs
377 %% if they were provided
378 99 F = fun(M = #{type := PoolType, scope := host_type, tag := Tag}) ->
379
:-(
[M#{scope => HostType} || HostType <- HostTypes,
380
:-(
not lists:member({PoolType, HostType, Tag}, HostSpecific)];
381 99 (Other) -> [Other]
382 end,
383 99 Pools1 = lists:flatmap(F, Pools),
384 99 lists:map(fun prepare_pool_map/1, Pools1).
385
386 -spec prepare_pool_map(pool_map_in()) -> pool_map().
387 prepare_pool_map(Pool = #{scope := HT, opts := Opts}) ->
388 %% Rename "scope" field to "host_type" and change wpool opts to a KV list
389 99 Pool1 = maps:remove(scope, Pool),
390 99 Pool1#{host_type => HT, opts => maps:to_list(Opts)}.
391
392 -spec get_unique_types([pool_map_in()]) -> [pool_type()].
393 get_unique_types(Pools) ->
394 105 lists:usort([maps:get(type, Pool) || Pool <- Pools]).
395
396 -spec get_pool(pool_type(), host_type_or_global(), tag()) -> pool_record_result().
397 get_pool(PoolType, HostType, Tag) ->
398 3567 case ets:lookup(?MODULE, {PoolType, HostType, Tag}) of
399 406 [] when is_binary(HostType) -> get_pool(PoolType, global, Tag);
400 578 [] -> {error, {pool_not_started, {PoolType, HostType, Tag}}};
401 2583 [Pool] -> {ok, Pool}
402 end.
Line Hits Source