1 |
|
-module(mongoose_wpool_rabbit). |
2 |
|
-behaviour(mongoose_wpool). |
3 |
|
|
4 |
|
-export([init/0]). |
5 |
|
-export([start/4]). |
6 |
|
-export([stop/2]). |
7 |
|
|
8 |
|
-define(MAX_WORKER_MSG_QUEUE_LEN, 1000). |
9 |
|
|
10 |
|
init() -> |
11 |
:-( |
application:ensure_all_started(amqp_client). |
12 |
|
|
13 |
|
start(Host, Tag, WpoolOptsIn, AMQPOpts) -> |
14 |
:-( |
IsConfirmEnabled = proplists:get_value(confirms_enabled, AMQPOpts, false), |
15 |
:-( |
PoolName = mongoose_wpool:make_pool_name(rabbit, Host, Tag), |
16 |
:-( |
Worker = {mongoose_rabbit_worker, |
17 |
|
[{amqp_client_opts, amqp_client_opts(AMQPOpts)}, |
18 |
|
{host, Host}, |
19 |
|
{pool_tag, Tag}, |
20 |
|
{confirms, IsConfirmEnabled}, |
21 |
|
{max_queue_len, proplists:get_value(max_worker_queue_len, |
22 |
|
AMQPOpts, |
23 |
|
?MAX_WORKER_MSG_QUEUE_LEN)}]}, |
24 |
:-( |
WpoolOpts = [{worker, Worker} | WpoolOptsIn], |
25 |
:-( |
mongoose_wpool:start_sup_pool(rabbit, PoolName, WpoolOpts). |
26 |
|
|
27 |
|
stop(_, _) -> |
28 |
:-( |
ok. |
29 |
|
|
30 |
|
amqp_client_opts(AMQPOpts) -> |
31 |
:-( |
Opts = [{host, proplists:get_value(amqp_host, AMQPOpts)}, |
32 |
|
{port, proplists:get_value(amqp_port, AMQPOpts)}, |
33 |
|
{username, list_to_bin_or_undef(proplists:get_value(amqp_username, |
34 |
|
AMQPOpts))}, |
35 |
|
{password, list_to_bin_or_undef(proplists:get_value(amqp_password, |
36 |
|
AMQPOpts))}], |
37 |
:-( |
VerifiedOpts = verify_opts(Opts), |
38 |
:-( |
mongoose_amqp:network_params(VerifiedOpts). |
39 |
|
|
40 |
|
list_to_bin_or_undef(Val) when is_list(Val) -> |
41 |
:-( |
binary:list_to_bin(Val); |
42 |
|
list_to_bin_or_undef(_) -> |
43 |
:-( |
undefined. |
44 |
|
|
45 |
|
verify_opts(Opts) -> |
46 |
:-( |
lists:filter(fun({_Opt, undefined}) -> false; |
47 |
:-( |
(_Other) -> true |
48 |
|
end, Opts). |
49 |
|
|