1 |
|
-module(mongoose_wpool_cassandra). |
2 |
|
-behaviour(mongoose_wpool). |
3 |
|
|
4 |
|
-export([init/0]). |
5 |
|
-export([start/4]). |
6 |
|
-export([stop/2]). |
7 |
|
|
8 |
|
-ifdef(TEST). |
9 |
|
-export([prepare_cqerl_opts/1]). |
10 |
|
-endif. |
11 |
|
|
12 |
|
%% -------------------------------------------------------------- |
13 |
|
%% mongoose_wpool callbacks |
14 |
|
%% Pool-type initialisation: makes sure the cqerl application (and its
%% dependencies) is running, then sets the cqerl `maps' environment key to true.
%% Returns ok; crashes with badmatch if cqerl cannot be started.
-spec init() -> ok.
init() ->
    %% ensure_all_started/1 returns {ok, Started}, where Started lists only the
    %% applications started by this very call. It is [] solely when everything
    %% was already running, so the list must not be matched against [].
    {ok, _Started} = application:ensure_all_started(cqerl),
    application:set_env(cqerl, maps, true).
18 |
|
|
19 |
|
%% Start a Cassandra worker pool for the given host type and tag.
%%
%% The `workers' value from the pool options is stored as the cqerl
%% `num_clients' setting. The servers and client options derived from ConnOpts
%% are saved in the cqerl environment and registered with cqerl_cluster under
%% Tag. If any entry returned by cqerl_cluster:add_nodes/3 is tagged `error',
%% {not_all_nodes_added, Results} is thrown. Otherwise a supervised pool of
%% mongoose_cassandra_worker processes is started and its result is returned.
-spec start(mongooseim:host_type_or_global(), mongoose_wpool:tag(),
            mongoose_wpool:pool_opts(), mongoose_wpool:conn_opts()) ->
    {ok, pid()} | {error, any()}.
start(HostType, Tag, WpoolOptsIn, ConnOpts) ->
    NumClients = proplists:get_value(workers, WpoolOptsIn),
    application:set_env(cqerl, num_clients, NumClients),
    Nodes = prepare_cqerl_servers(ConnOpts),
    ClusterOpts = prepare_cqerl_opts(ConnOpts),
    set_cluster_config(Tag, Nodes, ClusterOpts),
    AddResults = cqerl_cluster:add_nodes(Tag, Nodes, ClusterOpts),
    case lists:keymember(error, 1, AddResults) of
        true ->
            erlang:throw({not_all_nodes_added, AddResults});
        false ->
            ok
    end,
    PoolName = mongoose_wpool:make_pool_name(cassandra, HostType, Tag),
    WorkerSpec = {worker, {mongoose_cassandra_worker, [Tag]}},
    mongoose_wpool:start_sup_pool(cassandra, PoolName, [WorkerSpec | WpoolOptsIn]).
38 |
|
|
39 |
|
%% Stop callback: nothing is done here for either argument; always returns ok.
-spec stop(mongooseim:host_type_or_global(), mongoose_wpool:tag()) -> ok.
stop(_HostType, _Tag) ->
    ok.
42 |
|
|
43 |
|
%% -------------------------------------------------------------- |
44 |
|
%% Internal functions |
45 |
|
%% Convert the `servers' list of the connection options into the list of
%% server terms produced by cqerl_server/1, preserving order.
prepare_cqerl_servers(#{servers := ServerSpecs}) ->
    [cqerl_server(Spec) || Spec <- ServerSpecs].
47 |
|
|
48 |
:-( |
%% Map one server entry to {Host, Port} when a port is configured,
%% or to the bare Host otherwise. The `host' key is mandatory.
cqerl_server(#{host := Host} = Server) ->
    case Server of
        #{port := Port} -> {Host, Port};
        #{} -> Host
    end.
50 |
|
|
51 |
|
%% Build the flat cqerl option list from the connection options by
%% concatenating, in this order, the keyspace, auth, tcp and tls option groups.
prepare_cqerl_opts(ConnOpts) ->
    [CqerlOpt || Group <- [keyspace, auth, tcp, tls],
                 CqerlOpt <- cqerl_opts(Group, ConnOpts)].
53 |
|
|
54 |
|
%% Translate one option group of the connection options into a (possibly
%% empty) list of cqerl options. Groups that are absent from the map, or
%% that are not recognised, contribute nothing.
cqerl_opts(keyspace, #{keyspace := Name}) ->
    [{keyspace, Name}];
cqerl_opts(auth, #{auth := #{plain := #{username := User, password := Pass}}}) ->
    Credentials = [{User, Pass}],
    [{auth, {cqerl_auth_plain_handler, Credentials}}];
cqerl_opts(tcp, #{}) ->
    %% TCP keepalive is always set, whatever the connection options contain.
    [{tcp_opts, [{keepalive, true}]}];
cqerl_opts(tls, #{tls := TlsConfig}) ->
    SslOpts = just_tls:make_ssl_opts(TlsConfig),
    [{ssl, SslOpts}];
cqerl_opts(_Other, #{}) ->
    [].
64 |
|
|
65 |
|
%% Store the cluster definition under Tag in the cqerl `cassandra_clusters'
%% environment entry (replacing any previous definition for the same tag), so
%% that the config survives the restart of 'cqerl_cluster' in case of a
%% network failure.
set_cluster_config(Tag, Servers, ExtConfig) ->
    Known = application:get_env(cqerl, cassandra_clusters, []),
    Updated = lists:keystore(Tag, 1, Known, {Tag, {Servers, ExtConfig}}),
    application:set_env(cqerl, cassandra_clusters, Updated).