1 |
|
%%%------------------------------------------------------------------- |
2 |
|
%%% @doc |
3 |
|
%%% This is here because there are pool options which have to be given when calling |
4 |
|
%%% the pool (selection strategy, timeout), while we want to set it once for the pool and not |
5 |
|
%%% worry about them later, hence additional storage. |
6 |
|
%%% @end |
7 |
|
%%%------------------------------------------------------------------- |
8 |
|
-module(mongoose_wpool). |
9 |
|
-author("bartlomiej.gorny@erlang-solutions.com"). |
10 |
|
-include("mongoose.hrl"). |
11 |
|
|
12 |
|
-record(mongoose_wpool, { |
13 |
|
name :: pool_name(), |
14 |
|
atom_name :: wpool:name(), |
15 |
|
strategy :: wpool:strategy(), |
16 |
|
call_timeout :: pos_integer() |
17 |
|
}). |
18 |
|
|
19 |
|
%% API |
20 |
|
-export([ensure_started/0, |
21 |
|
start/2, start/3, start/4, start/5, |
22 |
|
stop/0, stop/1, stop/2, stop/3, |
23 |
|
get_worker/2, get_worker/3, |
24 |
|
call/2, call/3, call/4, call/5, |
25 |
|
send_request/2, send_request/3, send_request/4, send_request/5, |
26 |
|
cast/2, cast/3, cast/4, cast/5, |
27 |
|
get_pool_settings/3, get_pools/0, stats/3]). |
28 |
|
|
29 |
|
-export([start_sup_pool/3]). |
30 |
|
-export([start_configured_pools/0, start_configured_pools/1, start_configured_pools/2]). |
31 |
|
-export([is_configured/1]). |
32 |
|
-export([make_pool_name/3]). |
33 |
|
-export([call_start_callback/2]). |
34 |
|
-export([verify_unique_pools/1]). |
35 |
|
|
36 |
|
%% Mostly for tests |
37 |
|
-export([expand_pools/2]). |
38 |
|
|
39 |
|
-ignore_xref([call/2, cast/2, cast/3, expand_pools/2, get_worker/2, |
40 |
|
send_request/2, send_request/3, send_request/4, send_request/5, |
41 |
|
is_configured/2, is_configured/1, is_configured/1, start/2, start/3, |
42 |
|
start/5, start_configured_pools/1, start_configured_pools/2, stats/3, |
43 |
|
stop/1, stop/2]). |
44 |
|
|
45 |
|
-type pool_type() :: redis | http | rdbms | cassandra | elastic | generic | rabbit | ldap. |
46 |
|
|
47 |
|
%% Config scope |
48 |
|
-type scope() :: global | host_type | mongooseim:host_type(). |
49 |
|
-type host_type_or_global() :: mongooseim:host_type_or_global(). |
50 |
|
-type tag() :: atom(). |
51 |
|
|
52 |
|
%% Name of a process |
53 |
|
-type proc_name() :: atom(). |
54 |
|
|
55 |
|
%% ID of a pool. Used as a key for an ETS table |
56 |
|
-type pool_name() :: {PoolType :: pool_type(), |
57 |
|
HostType :: host_type_or_global(), |
58 |
|
Tag :: tag()}. |
59 |
|
|
60 |
|
-type pool_opts_in() :: map(). |
61 |
|
-type pool_opts() :: [wpool:option()]. |
62 |
|
-type conn_opts() :: map(). |
63 |
|
|
64 |
|
-type pool_map_in() :: #{type := pool_type(), |
65 |
|
scope := scope(), |
66 |
|
tag := tag(), |
67 |
|
opts := pool_opts_in(), |
68 |
|
conn_opts := conn_opts()}. |
69 |
|
%% Pool map with expanded HostType argument instead of scope |
70 |
|
-type pool_map() :: #{type := pool_type(), |
71 |
|
host_type := host_type_or_global(), |
72 |
|
tag := tag(), |
73 |
|
opts := pool_opts(), |
74 |
|
conn_opts := conn_opts()}. |
75 |
|
-type pool_error() :: {pool_not_started, term()}. |
76 |
|
-type worker_result() :: {ok, pid()} | {error, pool_error()}. |
77 |
|
-type pool_record_result() :: {ok, #mongoose_wpool{}} | {error, pool_error()}. |
78 |
|
-type start_result() :: {ok, pid()} | {error, term()}. |
79 |
|
-type stop_result() :: ok | term(). |
80 |
|
|
81 |
|
-export_type([pool_type/0, tag/0, scope/0, proc_name/0, pool_name/0, pool_opts/0, conn_opts/0]). |
82 |
|
|
83 |
|
-type callback_fun() :: init | start | is_supported_strategy | stop. |
84 |
|
|
85 |
|
-callback init() -> ok | {error, term()}. |
86 |
|
-callback start(host_type_or_global(), tag(), WPoolOpts :: pool_opts(), ConnOpts :: conn_opts()) -> |
87 |
|
{ok, {pid(), proplists:proplist()}} | {ok, pid()} | {error, Reason :: term()}. |
88 |
|
-callback is_supported_strategy(Strategy :: wpool:strategy()) -> boolean(). |
89 |
|
-callback stop(host_type_or_global(), tag()) -> ok. |
90 |
|
|
91 |
|
-optional_callbacks([is_supported_strategy/1]). |
92 |
|
|
93 |
|
%% @doc Verify that all outgoing_pools have unique type, tag and scope.
%% Each pool is reduced to its `pool_id/1' key. When no key repeats, the input
%% list is returned unchanged (hence the result type is the same as the input
%% type). Otherwise an error map is raised whose `duplicates' field holds the
%% surplus occurrences of the repeated keys.
-spec verify_unique_pools([pool_map_in()]) -> [pool_map_in()].
verify_unique_pools(Pools) ->
    PoolKeys = lists:map(fun pool_id/1, Pools),
    %% usort/1 drops repeated keys while sort/1 keeps them, so the two lists
    %% are equal exactly when every key is unique.
    case {lists:usort(PoolKeys), lists:sort(PoolKeys)} of
        {SortedPool, SortedPool} ->
            Pools;
        {Uniques, ContainsDuplicates} ->
            Dups = lists:subtract(ContainsDuplicates, Uniques),
            error(#{what => duplicate_pools, duplicates => Dups,
                    text => <<"Some pools are duplicated">>})
    end.
104 |
|
|
105 |
|
|
106 |
|
%% @doc Create a unique ID based on the configuration identifiers.
%% Any binary scope (a concrete host type) is collapsed to the atom
%% `host_type'; every other scope value is used as it is.
-spec pool_id(pool_map_in()) -> {scope(), pool_type(), tag()}.
pool_id(#{scope := Scope, type := Type, tag := Tag}) ->
    ScopeKey = case is_binary(Scope) of
                   true -> host_type;
                   false -> Scope
               end,
    {ScopeKey, Type, Tag}.
112 |
|
|
113 |
|
%% @doc Prepare the runtime this module relies on: call `wpool:start/0',
%% start `mongoose_wpool_sup' when no process is registered under that name,
%% and ask `ejabberd_sup' to create the ETS table named after this module,
%% keyed on the record's `name' field with the supervisor given as heir.
ensure_started() ->
    wpool:start(),
    %% The result of start_link/0 is deliberately ignored, as before.
    _ = whereis(mongoose_wpool_sup) =:= undefined
        andalso mongoose_wpool_sup:start_link(),
    TableOpts = [named_table, public,
                 {read_concurrency, true},
                 {keypos, #mongoose_wpool.name},
                 {heir, whereis(mongoose_wpool_sup), undefined}],
    ejabberd_sup:create_ets_table(?MODULE, TableOpts).
127 |
|
|
128 |
|
%% @doc Start the pools listed under the `outgoing_pools' config option.
start_configured_pools() ->
    start_configured_pools(mongoose_config:get_opt(outgoing_pools)).
131 |
|
|
132 |
|
%% @doc Start the given pools, expanding them over `?ALL_HOST_TYPES'.
start_configured_pools(PoolsIn) ->
    HostTypes = ?ALL_HOST_TYPES,
    start_configured_pools(PoolsIn, HostTypes).
134 |
|
|
135 |
|
%% @doc Run the `init' callback once per distinct pool type found in
%% `PoolsIn', then expand the pools over `HostTypes' and start each of them.
%% Returns the list of individual start results.
start_configured_pools(PoolsIn, HostTypes) ->
    InitType = fun(PoolType) -> call_callback(init, PoolType, []) end,
    lists:foreach(InitType, get_unique_types(PoolsIn)),
    lists:map(fun start/1, expand_pools(PoolsIn, HostTypes)).
139 |
|
|
140 |
|
%% Start a pool described by an expanded pool map by unpacking its fields
%% into the five-argument start function.
-spec start(pool_map()) -> start_result().
start(#{type := PoolType,
        host_type := HostType,
        tag := Tag,
        opts := PoolOpts,
        conn_opts := ConnOpts}) ->
    start(PoolType, HostType, Tag, PoolOpts, ConnOpts).
144 |
|
|
145 |
|
%% @doc Start a pool of the given type in the `global' scope.
-spec start(pool_type(), pool_opts()) -> start_result().
start(PoolType, PoolOpts) ->
    HostType = global,
    start(PoolType, HostType, PoolOpts).
148 |
|
|
149 |
|
%% @doc Start a pool for the given host type using the `default' tag.
-spec start(pool_type(), host_type_or_global(), pool_opts()) -> start_result().
start(PoolType, HostType, PoolOpts) ->
    Tag = default,
    start(PoolType, HostType, Tag, PoolOpts).
152 |
|
|
153 |
|
%% @doc Start a tagged pool with an empty connection options map.
-spec start(pool_type(), host_type_or_global(), tag(),
            pool_opts()) -> start_result().
start(PoolType, HostType, Tag, PoolOpts) ->
    ConnOpts = #{},
    start(PoolType, HostType, Tag, PoolOpts, ConnOpts).
157 |
|
|
158 |
|
%% @doc Start a pool.
%% The `strategy' (default `best_worker') and `call_timeout' (default 5000)
%% entries are taken out of `PoolOpts' and kept by this module; the remaining
%% options are passed on when the pool itself is started.
%% Raises `{strategy_not_supported, PoolType, HostType, Tag, Strategy}' when
%% the pool type's callback module explicitly rejects the strategy.
-spec start(pool_type(), host_type_or_global(), tag(),
            pool_opts(), conn_opts()) -> start_result().
start(PoolType, HostType, Tag, PoolOpts, ConnOpts) ->
    {Opts0, WpoolOptsIn} = proplists:split(PoolOpts, [strategy, call_timeout]),
    Opts = lists:append(Opts0),
    Strategy = proplists:get_value(strategy, Opts, best_worker),
    CallTimeout = proplists:get_value(call_timeout, Opts, 5000),
    CallbackModule = make_callback_module_name(PoolType),
    case is_strategy_rejected(CallbackModule, Strategy) of
        true ->
            error({strategy_not_supported, PoolType, HostType, Tag, Strategy});
        false ->
            start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout)
    end.

%% Tell whether the callback module explicitly blacklists the strategy.
%% `is_supported_strategy/1' is an optional callback, so a missing or failing
%% callback counts as "not rejected": if a callback doesn't explicitly
%% blacklist a strategy, let's proceed. Only the callback call is protected.
-spec is_strategy_rejected(module(), wpool:strategy()) -> boolean().
is_strategy_rejected(CallbackModule, Strategy) ->
    try CallbackModule:is_supported_strategy(Strategy) of
        false -> true;
        _ -> false
    catch
        _:_ -> false
    end.
173 |
|
|
174 |
|
%% Start the pool through `mongoose_wpool_mgr' and, on success, remember its
%% strategy and call timeout in the ETS table so later calls can look them up.
%% Any other result from the manager is returned unchanged.
-spec start(pool_type(), host_type_or_global(), tag(),
            pool_opts(), conn_opts(), wpool:strategy(), pos_integer()) ->
    start_result().
start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts, Strategy, CallTimeout) ->
    case mongoose_wpool_mgr:start(PoolType, HostType, Tag, WpoolOptsIn, ConnOpts) of
        {ok, Pid} ->
            PoolRec = #mongoose_wpool{name = {PoolType, HostType, Tag},
                                      atom_name = make_pool_name(PoolType, HostType, Tag),
                                      strategy = Strategy,
                                      call_timeout = CallTimeout},
            ets:insert(?MODULE, PoolRec),
            {ok, Pid};
        NotStarted ->
            NotStarted
    end.
188 |
|
|
189 |
|
%% @doc This function starts the worker_pool's pool under a specific supervisor
%% in MongooseIM application.
%% It's needed for 2 reasons:
%% 1. We want to have a full control of all the pools and its restarts
%% 2. When a pool is started via wpool:start_pool it's supposed to be called
%%    by a supervisor, if not, there is no way to stop the pool.
-spec start_sup_pool(pool_type(), proc_name(), [wpool:option()]) ->
    {ok, pid()} | {error, term()}.
start_sup_pool(PoolType, ProcName, WpoolOpts) ->
    SupName = mongoose_wpool_type_sup:name(PoolType),
    StartMFA = {wpool, start_pool, [ProcName, WpoolOpts]},
    %% The child is `temporary', so the supervisor never restarts it.
    supervisor:start_child(SupName, #{id => ProcName,
                                      start => StartMFA,
                                      restart => temporary,
                                      type => supervisor,
                                      modules => [wpool]}).
205 |
|
|
206 |
|
%% @doc Stop every pool currently registered in the ETS table and return the
%% list of individual stop results.
-spec stop() -> term().
stop() ->
    lists:map(fun stop_pool/1, get_pools()).
209 |
|
|
210 |
|
%% Stop a pool identified by its `{PoolType, HostType, Tag}' key.
-spec stop_pool(pool_name()) -> stop_result().
stop_pool({PoolType, HostType, Tag}) ->
    stop(PoolType, HostType, Tag).
213 |
|
|
214 |
|
%% @doc Stop the pool of the given type in the `global' scope.
-spec stop(pool_type()) -> stop_result().
stop(PoolType) ->
    HostType = global,
    stop(PoolType, HostType).
217 |
|
|
218 |
|
%% @doc Stop the pool of the given type and host type with the `default' tag.
-spec stop(pool_type(), host_type_or_global()) -> stop_result().
stop(PoolType, HostType) ->
    Tag = default,
    stop(PoolType, HostType, Tag).
221 |
|
|
222 |
|
%% @doc Stop a pool: drop its ETS entry, run the pool type's `stop' callback
%% and stop the pool through `mongoose_wpool_mgr'. Any exception raised on the
%% way is logged as `pool_stop_failed' instead of being propagated.
-spec stop(pool_type(), host_type_or_global(), tag()) -> stop_result().
stop(PoolType, HostType, Tag) ->
    PoolKey = {PoolType, HostType, Tag},
    try
        ets:delete(?MODULE, PoolKey),
        call_callback(stop, PoolType, [HostType, Tag]),
        mongoose_wpool_mgr:stop(PoolType, HostType, Tag)
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR(#{what => pool_stop_failed,
                         pool_type => PoolType, server => HostType, pool_tag => Tag,
                         pool_key => PoolKey,
                         class => Class, reason => Reason, stacktrace => Stacktrace})
    end.
235 |
|
|
236 |
|
%% @doc Tell whether the `outgoing_pools' config option contains at least one
%% pool of the given type.
-spec is_configured(pool_type()) -> boolean().
is_configured(PoolType) ->
    HasType = fun(#{type := Type}) -> Type =:= PoolType end,
    lists:any(HasType, mongoose_config:get_opt(outgoing_pools)).
240 |
|
|
241 |
|
%% @doc Pick a worker from the pool with the `default' tag.
-spec get_worker(pool_type(), host_type_or_global()) -> worker_result().
get_worker(PoolType, HostType) ->
    Tag = default,
    get_worker(PoolType, HostType, Tag).
244 |
|
|
245 |
|
%% @doc Pick a worker from the pool using the strategy stored for that pool.
%% The strategy name is invoked as a function of `wpool_pool' to obtain the
%% worker's registered name, which is then resolved with `whereis/1'.
%% When the pool cannot be found the lookup result is returned unchanged.
-spec get_worker(pool_type(), host_type_or_global(), tag()) -> worker_result().
get_worker(PoolType, HostType, Tag) ->
    case get_pool(PoolType, HostType, Tag) of
        {ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
            PoolName = make_pool_name(Pool),
            WorkerName = wpool_pool:Strategy(PoolName),
            {ok, whereis(WorkerName)};
        NotFound ->
            NotFound
    end.
254 |
|
|
255 |
|
%% @doc Synchronous call to the `global' pool of the given type.
call(PoolType, Request) ->
    HostType = global,
    call(PoolType, HostType, Request).
257 |
|
|
258 |
|
%% @doc Synchronous call to the pool with the `default' tag.
call(PoolType, HostType, Request) ->
    Tag = default,
    call(PoolType, HostType, Tag, Request).
260 |
|
|
261 |
|
%% @doc Synchronous call to a pool worker through `wpool:call/4', using the
%% strategy and call timeout stored for the pool. When the pool cannot be
%% found the lookup result is returned unchanged.
call(PoolType, HostType, Tag, Request) ->
    case get_pool(PoolType, HostType, Tag) of
        {ok, #mongoose_wpool{strategy = Strategy, call_timeout = CallTimeout} = Pool} ->
            PoolName = make_pool_name(Pool),
            wpool:call(PoolName, Request, Strategy, CallTimeout);
        NotFound ->
            NotFound
    end.
268 |
|
|
269 |
|
%% @doc Synchronous call routed with the `{hash_worker, HashKey}' strategy
%% instead of the pool's stored strategy; the stored call timeout still
%% applies. When the pool cannot be found the lookup result is returned
%% unchanged.
call(PoolType, HostType, Tag, HashKey, Request) ->
    case get_pool(PoolType, HostType, Tag) of
        {ok, #mongoose_wpool{call_timeout = CallTimeout} = Pool} ->
            PoolName = make_pool_name(Pool),
            wpool:call(PoolName, Request, {hash_worker, HashKey}, CallTimeout);
        NotFound ->
            NotFound
    end.
276 |
|
|
277 |
|
%% @doc Send a request to the `global' pool of the given type.
send_request(PoolType, Request) ->
    HostType = global,
    send_request(PoolType, HostType, Request).
279 |
|
|
280 |
|
%% @doc Send a request to the pool with the `default' tag.
send_request(PoolType, HostType, Request) ->
    Tag = default,
    send_request(PoolType, HostType, Tag, Request).
282 |
|
|
283 |
|
%% @doc Send a request to a pool worker through `wpool:send_request/4', using
%% the strategy and call timeout stored for the pool. When the pool cannot be
%% found the lookup result is returned unchanged.
send_request(PoolType, HostType, Tag, Request) ->
    case get_pool(PoolType, HostType, Tag) of
        {ok, #mongoose_wpool{strategy = Strategy, call_timeout = CallTimeout} = Pool} ->
            PoolName = make_pool_name(Pool),
            wpool_send_request(PoolName, Request, Strategy, CallTimeout);
        NotFound ->
            NotFound
    end.
290 |
|
|
291 |
|
%% @doc Send a request routed with the `{hash_worker, HashKey}' strategy
%% instead of the pool's stored strategy; the stored call timeout still
%% applies. When the pool cannot be found the lookup result is returned
%% unchanged.
send_request(PoolType, HostType, Tag, HashKey, Request) ->
    case get_pool(PoolType, HostType, Tag) of
        {ok, #mongoose_wpool{call_timeout = CallTimeout} = Pool} ->
            PoolName = make_pool_name(Pool),
            wpool_send_request(PoolName, Request, {hash_worker, HashKey}, CallTimeout);
        NotFound ->
            NotFound
    end.
298 |
|
|
299 |
|
%% Single place through which this module reaches `wpool:send_request/4'.
wpool_send_request(PoolName, Request, Strategy, Timeout) ->
    wpool:send_request(PoolName, Request, Strategy, Timeout).
301 |
|
|
302 |
|
%% @doc Asynchronous cast to the `global' pool of the given type.
cast(PoolType, Request) ->
    HostType = global,
    cast(PoolType, HostType, Request).
304 |
|
|
305 |
|
%% @doc Asynchronous cast to the pool with the `default' tag.
cast(PoolType, HostType, Request) ->
    Tag = default,
    cast(PoolType, HostType, Tag, Request).
307 |
|
|
308 |
|
%% @doc Asynchronous cast to a pool worker through `wpool:cast/3', using the
%% strategy stored for the pool. When the pool cannot be found the lookup
%% result is returned unchanged.
cast(PoolType, HostType, Tag, Request) ->
    case get_pool(PoolType, HostType, Tag) of
        {ok, #mongoose_wpool{strategy = Strategy} = Pool} ->
            PoolName = make_pool_name(Pool),
            wpool:cast(PoolName, Request, Strategy);
        NotFound ->
            NotFound
    end.
315 |
|
|
316 |
|
%% @doc Asynchronous cast routed with the `{hash_worker, HashKey}' strategy
%% instead of the pool's stored strategy. When the pool cannot be found the
%% lookup result is returned unchanged.
cast(PoolType, HostType, Tag, HashKey, Request) ->
    case get_pool(PoolType, HostType, Tag) of
        {ok, #mongoose_wpool{} = Pool} ->
            PoolName = make_pool_name(Pool),
            wpool:cast(PoolName, Request, {hash_worker, HashKey});
        NotFound ->
            NotFound
    end.
323 |
|
|
324 |
|
%% @doc Return the stored record of a pool, or `undefined' when the lookup
%% reports that the pool has not been started.
-spec get_pool_settings(pool_type(), host_type_or_global(), tag()) ->
    #mongoose_wpool{} | undefined.
get_pool_settings(PoolType, HostType, Tag) ->
    case get_pool(PoolType, HostType, Tag) of
        {error, {pool_not_started, _}} ->
            undefined;
        {ok, Settings} ->
            Settings
    end.
331 |
|
|
332 |
|
%% @doc List the keys of all pools currently stored in the ETS table.
-spec get_pools() -> [pool_name()].
get_pools() ->
    NameOf = fun(#mongoose_wpool{name = Name}) -> Name end,
    lists:map(NameOf, ets:tab2list(?MODULE)).
335 |
|
|
336 |
|
%% @doc Return `wpool:stats/1' for the pool whose process name is derived
%% from the given type, host type and tag. The ETS table is not consulted.
stats(PoolType, HostType, Tag) ->
    PoolName = make_pool_name(PoolType, HostType, Tag),
    wpool:stats(PoolName).
338 |
|
|
339 |
|
%% @doc Build the process name of a pool:
%% `mongoose_wpool$<PoolType>$<HostType>$<Tag>'. An atom host type is first
%% converted to its binary form.
-spec make_pool_name(pool_type(), scope(), tag()) -> proc_name().
make_pool_name(PoolType, HostType, Tag) when is_atom(HostType) ->
    make_pool_name(PoolType, atom_to_binary(HostType, utf8), Tag);
make_pool_name(PoolType, HostType, Tag) when is_binary(HostType) ->
    Parts = ["mongoose_wpool$", atom_to_binary(PoolType, utf8), $$,
             HostType, $$, atom_to_binary(Tag, utf8)],
    binary_to_atom(iolist_to_binary(Parts), utf8).
345 |
|
|
346 |
|
%% Return the process name of a stored pool record. The cached `atom_name'
%% is used when present; a record without one has its name rebuilt from the
%% `{PoolType, HostType, Tag}' key.
make_pool_name(#mongoose_wpool{atom_name = undefined,
                               name = {PoolType, HostType, Tag}}) ->
    make_pool_name(PoolType, HostType, Tag);
make_pool_name(#mongoose_wpool{atom_name = CachedName}) ->
    CachedName.
350 |
|
|
351 |
|
%% @doc Invoke the `start' callback of the pool type's callback module with
%% the given argument list.
-spec call_start_callback(pool_type(), list()) -> term().
call_start_callback(PoolType, Args) ->
    CallbackFun = start,
    call_callback(CallbackFun, PoolType, Args).
354 |
|
|
355 |
|
%% Apply a callback of the pool type's callback module. Any exception is
%% logged as `pool_callback_failed' and converted into
%% `{error, {callback_crashed, CallbackFun, Class, Reason, Stacktrace}}'.
-spec call_callback(callback_fun(), pool_type(), list()) -> term().
call_callback(CallbackFun, PoolType, Args) ->
    try
        erlang:apply(make_callback_module_name(PoolType), CallbackFun, Args)
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR(#{what => pool_callback_failed,
                         pool_type => PoolType, callback_function => CallbackFun,
                         error => Class, reason => Reason, stacktrace => Stacktrace}),
            {error, {callback_crashed, CallbackFun, Class, Reason, Stacktrace}}
    end.
366 |
|
|
367 |
|
%% Derive the callback module name for a pool type by prefixing the type
%% with `mongoose_wpool_'.
-spec make_callback_module_name(pool_type()) -> module().
make_callback_module_name(PoolType) ->
    Suffix = atom_to_binary(PoolType, utf8),
    binary_to_atom(<<"mongoose_wpool_", Suffix/binary>>, utf8).
371 |
|
|
372 |
|
%% Turn configured pools into startable pool maps. A pool whose scope is the
%% atom `host_type' is replicated once per entry of `HostTypes', skipping any
%% host type that has its own pool with the same type and tag. All other
%% pools are kept as they are. Every resulting pool then goes through
%% `prepare_pool_map/1'.
-spec expand_pools([pool_map_in()], [mongooseim:host_type()]) -> [pool_map()].
expand_pools(Pools, HostTypes) ->
    %% Keys of the pools configured for one concrete (binary) host type.
    Overriding = [{Type, HT, Tag}
                  || #{type := Type, scope := HT, tag := Tag} <- Pools, is_binary(HT)],
    IsOverridden = fun(Key) -> lists:member(Key, Overriding) end,
    Expand =
        fun(#{type := PoolType, scope := host_type, tag := Tag} = Template) ->
                [Template#{scope => HostType}
                 || HostType <- HostTypes,
                    not IsOverridden({PoolType, HostType, Tag})];
           (Pool) ->
                [Pool]
        end,
    Expanded = lists:flatmap(Expand, Pools),
    [prepare_pool_map(Pool) || Pool <- Expanded].
385 |
|
|
386 |
|
%% Rename the `scope' field to `host_type' and change the wpool opts map
%% into a key-value list.
-spec prepare_pool_map(pool_map_in()) -> pool_map().
prepare_pool_map(#{scope := HT, opts := Opts} = Pool) ->
    Replacements = #{host_type => HT, opts => maps:to_list(Opts)},
    maps:merge(maps:remove(scope, Pool), Replacements).
391 |
|
|
392 |
|
%% Return the sorted list of distinct pool types found in the given pools.
-spec get_unique_types([pool_map_in()]) -> [pool_type()].
get_unique_types(Pools) ->
    TypeOf = fun(Pool) -> maps:get(type, Pool) end,
    lists:usort(lists:map(TypeOf, Pools)).
395 |
|
|
396 |
|
%% Look up the stored record of a pool. When nothing is stored for a binary
%% host type, the lookup is retried with the `global' scope; otherwise a
%% missing entry yields `{error, {pool_not_started, Key}}'.
-spec get_pool(pool_type(), host_type_or_global(), tag()) -> pool_record_result().
get_pool(PoolType, HostType, Tag) ->
    PoolKey = {PoolType, HostType, Tag},
    case ets:lookup(?MODULE, PoolKey) of
        [PoolRec] ->
            {ok, PoolRec};
        [] when is_binary(HostType) ->
            get_pool(PoolType, global, Tag);
        [] ->
            {error, {pool_not_started, PoolKey}}
    end.