./ct_report/coverage/mongoose_wpool_rabbit.COVER.html

1 -module(mongoose_wpool_rabbit).
2 -behaviour(mongoose_wpool).
3
4 -export([init/0]).
5 -export([start/4]).
6 -export([stop/2]).
7
8 -define(MAX_WORKER_MSG_QUEUE_LEN, 1000).
9
10 init() ->
11 1 application:ensure_all_started(amqp_client).
12
13 start(Host, Tag, WpoolOptsIn, AMQPOpts) ->
14 1 IsConfirmEnabled = proplists:get_value(confirms_enabled, AMQPOpts, false),
15 1 PoolName = mongoose_wpool:make_pool_name(rabbit, Host, Tag),
16 1 Worker = {mongoose_rabbit_worker,
17 [{amqp_client_opts, amqp_client_opts(AMQPOpts)},
18 {host, Host},
19 {pool_tag, Tag},
20 {confirms, IsConfirmEnabled},
21 {max_queue_len, proplists:get_value(max_worker_queue_len,
22 AMQPOpts,
23 ?MAX_WORKER_MSG_QUEUE_LEN)}]},
24 1 WpoolOpts = [{worker, Worker} | WpoolOptsIn],
25 1 mongoose_wpool:start_sup_pool(rabbit, PoolName, WpoolOpts).
26
27 stop(_, _) ->
28
:-(
ok.
29
30 amqp_client_opts(AMQPOpts) ->
31 1 Opts = [{host, proplists:get_value(amqp_host, AMQPOpts)},
32 {port, proplists:get_value(amqp_port, AMQPOpts)},
33 {username, list_to_bin_or_undef(proplists:get_value(amqp_username,
34 AMQPOpts))},
35 {password, list_to_bin_or_undef(proplists:get_value(amqp_password,
36 AMQPOpts))}],
37 1 VerifiedOpts = verify_opts(Opts),
38 1 mongoose_amqp:network_params(VerifiedOpts).
39
40 list_to_bin_or_undef(Val) when is_list(Val) ->
41 2 binary:list_to_bin(Val);
42 list_to_bin_or_undef(_) ->
43
:-(
undefined.
44
45 verify_opts(Opts) ->
46 1 lists:filter(fun({_Opt, undefined}) -> false;
47 4 (_Other) -> true
48 end, Opts).
49
Line Hits Source