./ct_report/coverage/mongoose_wpool_cassandra.COVER.html

1 -module(mongoose_wpool_cassandra).
2 -behaviour(mongoose_wpool).
3
4 -export([init/0]).
5 -export([start/4]).
6 -export([stop/2]).
7
8 %% --------------------------------------------------------------
9 %% mongoose_wpool callbacks
%% @doc Makes sure `cqerl' (and the applications it depends on) is running and
%% sets the `maps' key of the `cqerl' application environment to `true'.
-spec init() -> ok.
init() ->
    %% Accept any list of freshly started applications. The list is empty only
    %% when everything was already running; matching on `[]' would crash with
    %% badmatch the first time cqerl actually has to be started here.
    {ok, _Started} = application:ensure_all_started(cqerl),
    application:set_env(cqerl, maps, true).
14
%% @doc Registers the Cassandra nodes described by `ConnOpts' in `cqerl' under
%% `Tag' and starts the supervised worker pool.
%% Throws `{not_all_nodes_added, Results}' when the result list returned by
%% `cqerl_cluster:add_nodes/3' contains a tuple tagged `error'.
-spec start(mongooseim:host_type_or_global(), mongoose_wpool:tag(),
            mongoose_wpool:pool_opts(), mongoose_wpool:conn_opts()) -> {ok, pid()} | {error, any()}.
start(HostType, Tag, WpoolOptsIn, ConnOpts) ->
    %% The number of cqerl clients follows the configured number of workers.
    NumClients = proplists:get_value(workers, WpoolOptsIn),
    application:set_env(cqerl, num_clients, NumClients),
    Nodes = prepare_cqerl_servers(ConnOpts),
    NodeOpts = prepare_cqerl_opts(ConnOpts),
    %% Store the cluster definition before the nodes are added.
    set_cluster_config(Tag, Nodes, NodeOpts),
    AddResults = cqerl_cluster:add_nodes(Tag, Nodes, NodeOpts),
    case lists:keymember(error, 1, AddResults) of
        true ->
            erlang:throw({not_all_nodes_added, AddResults});
        false ->
            ok
    end,
    PoolName = mongoose_wpool:make_pool_name(cassandra, HostType, Tag),
    WorkerSpec = {worker, {mongoose_cassandra_worker, [Tag]}},
    mongoose_wpool:start_sup_pool(cassandra, PoolName, [WorkerSpec | WpoolOptsIn]).
34
%% @doc Pool stop callback. Does no work: both arguments are ignored and
%% `ok' is always returned.
-spec stop(mongooseim:host_type_or_global(), mongoose_wpool:tag()) -> ok.
stop(_HostType, _Tag) ->
    ok.
38
39 %% --------------------------------------------------------------
40 %% Internal functions
%% Converts every entry under the `servers' key of the connection options
%% with cqerl_server/1, preserving the order of the entries.
prepare_cqerl_servers(#{servers := ServerSpecs}) ->
    [cqerl_server(Spec) || Spec <- ServerSpecs].
43
%% Converts one server map: `{Host, Port}' when the map has a `port' key,
%% otherwise the bare `Host'. The `host' key is mandatory.
cqerl_server(#{host := Host} = Server) ->
    case maps:find(port, Server) of
        {ok, Port} -> {Host, Port};
        error -> Host
    end.
46
%% Collects the option lists produced by cqerl_opts/2 for the `keyspace',
%% `auth', `tcp' and `tls' groups (in that order) into one flat list.
prepare_cqerl_opts(ConnOpts) ->
    Groups = [keyspace, auth, tcp, tls],
    lists:append([cqerl_opts(Group, ConnOpts) || Group <- Groups]).
49
%% Translates one option group of the connection options into a list of
%% client options. Returns `[]' when the group is not present in the
%% connection options; the `tcp' group is the exception and is always emitted.
cqerl_opts(keyspace, #{keyspace := Keyspace}) ->
    [{keyspace, Keyspace}];
cqerl_opts(auth, #{auth := #{plain := #{username := UserName, password := Password}}}) ->
    %% cqerl documents its authentication option as {auth, {Module, Args}}.
    %% Emitting the handler tuple without the `auth' key would leave the
    %% configured credentials unused.
    [{auth, {cqerl_auth_plain_handler, [{UserName, Password}]}}];
cqerl_opts(tcp, #{}) ->
    [{tcp_opts, [{keepalive, true}]}]; % always set
cqerl_opts(tls, #{tls := TLSOpts}) ->
    [{ssl, just_tls:make_ssl_opts(TLSOpts)}];
cqerl_opts(_Opt, #{}) ->
    [].
60
%% Keep the cluster definition in the `cqerl' application environment so the
%% config survives a restart of 'cqerl_cluster' in case of a network failure.
%% An existing entry for the same Tag is replaced; otherwise one is appended.
set_cluster_config(Tag, Servers, ExtConfig) ->
    Known = application:get_env(cqerl, cassandra_clusters, []),
    Updated = lists:keystore(Tag, 1, Known, {Tag, {Servers, ExtConfig}}),
    application:set_env(cqerl, cassandra_clusters, Updated).
Line Hits Source