./ct_report/coverage/mongoose_wpool_cassandra.COVER.html

1 -module(mongoose_wpool_cassandra).
2 -behaviour(mongoose_wpool).
3
4 -export([init/0]).
5 -export([start/4]).
6 -export([stop/2]).
7
8 -ifdef(TEST).
9 -export([prepare_cqerl_opts/1]).
10 -endif.
11
12 %% --------------------------------------------------------------
13 %% mongoose_wpool callbacks
14 -spec init() -> ok.
15 init() ->
16 4 {ok, []} = application:ensure_all_started(cqerl),
17 4 application:set_env(cqerl, maps, true).
18
19 -spec start(mongooseim:host_type_or_global(), mongoose_wpool:tag(),
20 mongoose_wpool:pool_opts(), mongoose_wpool:conn_opts()) -> {ok, pid()} | {error, any()}.
21 start(HostType, Tag, WpoolOptsIn, ConnOpts) ->
22 4 PoolSize = proplists:get_value(workers, WpoolOptsIn),
23 4 application:set_env(cqerl, num_clients, PoolSize),
24 4 Servers = prepare_cqerl_servers(ConnOpts),
25 4 CqerlOpts = prepare_cqerl_opts(ConnOpts),
26 4 set_cluster_config(Tag, Servers, CqerlOpts),
27 4 Res = cqerl_cluster:add_nodes(Tag, Servers, CqerlOpts),
28 4 case lists:keyfind(error, 1, Res) of
29 false ->
30 4 ok;
31 _ ->
32
:-(
erlang:throw({not_all_nodes_added, Res})
33 end,
34 4 Name = mongoose_wpool:make_pool_name(cassandra, HostType, Tag),
35 4 Worker = {mongoose_cassandra_worker, [Tag]},
36 4 WpoolOpts = [{worker, Worker} | WpoolOptsIn],
37 4 mongoose_wpool:start_sup_pool(cassandra, Name, WpoolOpts).
38
39 -spec stop(mongooseim:host_type_or_global(), mongoose_wpool:tag()) -> ok.
40 stop(_, _) ->
41
:-(
ok.
42
43 %% --------------------------------------------------------------
44 %% Internal functions
45 prepare_cqerl_servers(#{servers := Servers}) ->
46 4 [cqerl_server(Server) || Server <- Servers].
47
48 4 cqerl_server(#{host := Host, port := Port}) -> {Host, Port};
49
:-(
cqerl_server(#{host := Host}) -> Host.
50
51 prepare_cqerl_opts(ConnOpts) ->
52 4 lists:flatmap(fun(Opt) -> cqerl_opts(Opt, ConnOpts) end, [keyspace, auth, tcp, tls]).
53
54 cqerl_opts(keyspace, #{keyspace := Keyspace}) ->
55 4 [{keyspace, Keyspace}];
56 cqerl_opts(auth, #{auth := #{plain := #{username := UserName, password := Password}}}) ->
57
:-(
[{auth, {cqerl_auth_plain_handler, [{UserName, Password}]}}];
58 cqerl_opts(tcp, #{}) ->
59 4 [{tcp_opts, [{keepalive, true}]}]; % always set
60 cqerl_opts(tls, #{tls := TLSOpts}) ->
61 4 [{ssl, just_tls:make_ssl_opts(TLSOpts)}];
62 cqerl_opts(_Opt, #{}) ->
63 4 [].
64
65 %% make the config survive the restart of 'cqerl_cluster' in case of a network failure
66 set_cluster_config(Tag, Servers, ExtConfig) ->
67 4 Clusters = application:get_env(cqerl, cassandra_clusters, []),
68 4 ClusterConfig = {Tag, {Servers, ExtConfig}},
69 4 NewClusters = lists:keystore(Tag, 1, Clusters, ClusterConfig),
70 4 application:set_env(cqerl, cassandra_clusters, NewClusters).
Line Hits Source