./ct_report/coverage/mongoose_wpool_cassandra.COVER.html

1 -module(mongoose_wpool_cassandra).
2 -behaviour(mongoose_wpool).
3
4 -export([init/0]).
5 -export([start/4]).
6 -export([stop/2]).
7
8 %% --------------------------------------------------------------
9 %% mongoose_wpool callbacks
10 init() ->
11
:-(
{ok, []} = application:ensure_all_started(cqerl),
12
:-(
application:set_env(cqerl, maps, true).
13
14 start(HostType, Tag, WpoolOptsIn, CqerlOpts) ->
15
:-(
PoolSize = proplists:get_value(workers, WpoolOptsIn, 20),
16
:-(
application:set_env(cqerl, num_clients, PoolSize),
17
:-(
ExtConfig = extend_config(CqerlOpts),
18
:-(
Servers = proplists:get_value(servers, ExtConfig),
19
:-(
set_cluster_config(Tag, Servers, ExtConfig),
20
:-(
Res = cqerl_cluster:add_nodes(Tag, Servers, ExtConfig),
21
:-(
case lists:keyfind(error, 1, Res) of
22 false ->
23
:-(
ok;
24 _ ->
25
:-(
erlang:throw({not_all_nodes_added, Res})
26 end,
27
:-(
Name = mongoose_wpool:make_pool_name(cassandra, HostType, Tag),
28
:-(
Worker = {mongoose_cassandra_worker, [Tag]},
29
:-(
WpoolOpts = [{worker, Worker} | WpoolOptsIn],
30
:-(
mongoose_wpool:start_sup_pool(cassandra, Name, WpoolOpts).
31
32 stop(_, _) ->
33
:-(
ok.
34
35 %% --------------------------------------------------------------
36 %% Internal functions
37 extend_config(PoolConfig) ->
38
:-(
Defaults = #{
39 servers => [{"localhost", 9042}],
40 tcp_opts => [{keepalive, true}],
41 keyspace => mongooseim
42 },
43
:-(
ConfigMap = maps:merge(Defaults, maps:from_list(PoolConfig)),
44
:-(
maps:to_list(ConfigMap).
45
46 %% make the config survive the restart of 'cqerl_cluster' in case of a network failure
47 set_cluster_config(Tag, Servers, ExtConfig) ->
48
:-(
Clusters = application:get_env(cqerl, cassandra_clusters, []),
49
:-(
ClusterConfig = {Tag, {Servers, ExtConfig}},
50
:-(
NewClusters = lists:keystore(Tag, 1, Clusters, ClusterConfig),
51
:-(
application:set_env(cqerl, cassandra_clusters, NewClusters).
Line Hits Source