1 |
|
-module(mongoose_wpool_cassandra). |
2 |
|
-behaviour(mongoose_wpool). |
3 |
|
|
4 |
|
-export([init/0]). |
5 |
|
-export([start/4]). |
6 |
|
-export([stop/2]). |
7 |
|
|
8 |
|
%% -------------------------------------------------------------- |
9 |
|
%% mongoose_wpool callbacks |
10 |
|
%% One-time initialisation for the Cassandra pool type.
%%
%% Makes sure the `cqerl' application and its dependencies are running, then
%% sets the `maps' key of the cqerl application environment to `true'.
%% NOTE(review): presumably this makes cqerl return rows as maps - confirm
%% against the cqerl documentation.
%%
%% Returns `ok' (the result of application:set_env/3). Crashes with `badmatch'
%% if cqerl cannot be started.
init() ->
    %% ensure_all_started/1 returns {ok, StartedApps}. StartedApps is [] only
    %% when everything was already running, so accept any list here; an
    %% {error, _} result still fails the match.
    {ok, _Started} = application:ensure_all_started(cqerl),
    application:set_env(cqerl, maps, true).
13 |
|
|
14 |
|
%% Starts a Cassandra worker pool for the given host type and tag.
%%
%% Steps, in order:
%%   1. copy the `workers' option (default 20) into the cqerl `num_clients'
%%      application environment key;
%%   2. fill the cqerl options with defaults (see extend_config/1);
%%   3. store the cluster definition in the cqerl environment
%%      (see set_cluster_config/3);
%%   4. register the servers with cqerl_cluster under Tag;
%%   5. start the supervised pool of mongoose_cassandra_worker workers.
%%
%% Throws {not_all_nodes_added, Results} when any element of the
%% cqerl_cluster:add_nodes/3 result is a tuple tagged `error'.
%% Otherwise returns whatever mongoose_wpool:start_sup_pool/3 returns.
start(HostType, Tag, WpoolOptsIn, CqerlOpts) ->
    NumClients = proplists:get_value(workers, WpoolOptsIn, 20),
    application:set_env(cqerl, num_clients, NumClients),
    Config = extend_config(CqerlOpts),
    Nodes = proplists:get_value(servers, Config),
    set_cluster_config(Tag, Nodes, Config),
    AddResults = cqerl_cluster:add_nodes(Tag, Nodes, Config),
    ok = ensure_all_nodes_added(AddResults),
    PoolName = mongoose_wpool:make_pool_name(cassandra, HostType, Tag),
    PoolOpts = [{worker, {mongoose_cassandra_worker, [Tag]}} | WpoolOptsIn],
    mongoose_wpool:start_sup_pool(cassandra, PoolName, PoolOpts).

%% Returns `ok' when no result tuple is tagged `error'; otherwise throws the
%% whole result list so the caller can see which nodes failed.
ensure_all_nodes_added(AddResults) ->
    case lists:keymember(error, 1, AddResults) of
        true ->
            erlang:throw({not_all_nodes_added, AddResults});
        false ->
            ok
    end.
31 |
|
|
32 |
|
%% Pool stop callback. Both arguments (presumably the same HostType and Tag
%% that were passed to start/4) are ignored; nothing is torn down here and
%% the function always returns `ok'.
stop(_HostType, _Tag) ->
    ok.
34 |
|
|
35 |
|
%% -------------------------------------------------------------- |
36 |
|
%% Internal functions |
37 |
|
%% Completes a cqerl option list with defaults.
%%
%% PoolConfig must be a list of {Key, Value} tuples. Keys present in
%% PoolConfig override the defaults from default_config/0; missing keys are
%% filled in. The result is again a list of {Key, Value} tuples, produced by
%% maps:to_list/1.
extend_config(PoolConfig) ->
    Overrides = maps:from_list(PoolConfig),
    Merged = maps:merge(default_config(), Overrides),
    maps:to_list(Merged).

%% Values used for any option the pool configuration does not set.
default_config() ->
    #{servers => [{"localhost", 9042}],
      tcp_opts => [{keepalive, true}],
      keyspace => mongooseim}.
45 |
|
|
46 |
|
%% Records the cluster definition in the `cassandra_clusters' key of the cqerl
%% application environment, so that the configuration survives a restart of
%% 'cqerl_cluster' in case of a network failure.
%%
%% The entry is stored as {Tag, {Servers, ExtConfig}}. An existing entry with
%% the same Tag is replaced; otherwise the new entry is appended.
%% Returns `ok' (the result of application:set_env/3).
set_cluster_config(Tag, Servers, ExtConfig) ->
    Known = application:get_env(cqerl, cassandra_clusters, []),
    Updated = lists:keystore(Tag, 1, Known, {Tag, {Servers, ExtConfig}}),
    application:set_env(cqerl, cassandra_clusters, Updated).