./ct_report/coverage/mongoose_wpool_cassandra.COVER.html

1 -module(mongoose_wpool_cassandra).
2 -behaviour(mongoose_wpool).
3
4 -export([init/0]).
5 -export([start/4]).
6 -export([stop/2]).
7
8 -ifdef(TEST).
9 -export([prepare_cqerl_opts/1]).
10 -endif.
11
12 %% --------------------------------------------------------------
13 %% mongoose_wpool callbacks
14 -spec init() -> ok.
15 init() ->
16
:-(
{ok, []} = application:ensure_all_started(cqerl),
17
:-(
application:set_env(cqerl, maps, true).
18
19 -spec start(mongooseim:host_type_or_global(), mongoose_wpool:tag(),
20 mongoose_wpool:pool_opts(), mongoose_wpool:conn_opts()) -> {ok, pid()} | {error, any()}.
21 start(HostType, Tag, WpoolOptsIn, ConnOpts) ->
22
:-(
PoolSize = proplists:get_value(workers, WpoolOptsIn),
23
:-(
application:set_env(cqerl, num_clients, PoolSize),
24
:-(
Servers = prepare_cqerl_servers(ConnOpts),
25
:-(
CqerlOpts = prepare_cqerl_opts(ConnOpts),
26
:-(
set_cluster_config(Tag, Servers, CqerlOpts),
27
:-(
Res = cqerl_cluster:add_nodes(Tag, Servers, CqerlOpts),
28
:-(
case lists:keyfind(error, 1, Res) of
29 false ->
30
:-(
ok;
31 _ ->
32
:-(
erlang:throw({not_all_nodes_added, Res})
33 end,
34
:-(
Name = mongoose_wpool:make_pool_name(cassandra, HostType, Tag),
35
:-(
Worker = {mongoose_cassandra_worker, [Tag]},
36
:-(
WpoolOpts = [{worker, Worker} | WpoolOptsIn],
37
:-(
mongoose_wpool:start_sup_pool(cassandra, Name, WpoolOpts).
38
39 -spec stop(mongooseim:host_type_or_global(), mongoose_wpool:tag()) -> ok.
40 stop(_, _) ->
41
:-(
ok.
42
43 %% --------------------------------------------------------------
44 %% Internal functions
45 prepare_cqerl_servers(#{servers := Servers}) ->
46
:-(
[cqerl_server(Server) || Server <- Servers].
47
48
:-(
cqerl_server(#{host := Host, port := Port}) -> {Host, Port};
49
:-(
cqerl_server(#{host := Host}) -> Host.
50
51 prepare_cqerl_opts(ConnOpts) ->
52
:-(
lists:flatmap(fun(Opt) -> cqerl_opts(Opt, ConnOpts) end, [keyspace, auth, tcp, tls]).
53
54 cqerl_opts(keyspace, #{keyspace := Keyspace}) ->
55
:-(
[{keyspace, Keyspace}];
56 cqerl_opts(auth, #{auth := #{plain := #{username := UserName, password := Password}}}) ->
57
:-(
[{auth, {cqerl_auth_plain_handler, [{UserName, Password}]}}];
58 cqerl_opts(tcp, #{}) ->
59
:-(
[{tcp_opts, [{keepalive, true}]}]; % always set
60 cqerl_opts(tls, #{tls := TLSOpts}) ->
61
:-(
[{ssl, just_tls:make_ssl_opts(TLSOpts)}];
62 cqerl_opts(_Opt, #{}) ->
63
:-(
[].
64
65 %% make the config survive the restart of 'cqerl_cluster' in case of a network failure
66 set_cluster_config(Tag, Servers, ExtConfig) ->
67
:-(
Clusters = application:get_env(cqerl, cassandra_clusters, []),
68
:-(
ClusterConfig = {Tag, {Servers, ExtConfig}},
69
:-(
NewClusters = lists:keystore(Tag, 1, Clusters, ClusterConfig),
70
:-(
application:set_env(cqerl, cassandra_clusters, NewClusters).
Line Hits Source