1 |
|
-module(mongoose_wpool_cassandra). |
2 |
|
-behaviour(mongoose_wpool). |
3 |
|
|
4 |
|
-export([init/0]). |
5 |
|
-export([start/4]). |
6 |
|
-export([stop/2]). |
7 |
|
|
8 |
|
%% -------------------------------------------------------------- |
9 |
|
%% mongoose_wpool callbacks |
10 |
|
%% mongoose_wpool callback: one-time initialisation for Cassandra pools.
%%
%% Makes sure the 'cqerl' application (and its dependencies) is running and
%% then sets the 'maps' key of the 'cqerl' application environment to 'true'.
%%
%% The result of application:ensure_all_started/1 is matched as {ok, _}
%% rather than {ok, []}: the list holds the applications started by this very
%% call, so it is non-empty whenever 'cqerl' was not running beforehand.
%% Any {error, _} result still fails with 'badmatch'.
-spec init() -> ok.
init() ->
    {ok, _Started} = application:ensure_all_started(cqerl),
    application:set_env(cqerl, maps, true).
14 |
|
|
15 |
|
%% mongoose_wpool callback: starts a Cassandra worker pool.
%%
%% Steps, in order:
%%   1. the 'workers' pool option is stored as the 'num_clients' key of the
%%      'cqerl' application environment;
%%   2. server addresses and client options are derived from ConnOpts;
%%   3. the cluster definition is saved in the 'cqerl' application environment;
%%   4. the nodes are registered with cqerl_cluster under Tag;
%%   5. the supervised pool is started with mongoose_cassandra_worker workers.
%%
%% Throws {not_all_nodes_added, Results} when the list returned by
%% cqerl_cluster:add_nodes/3 contains a tuple tagged 'error'.
-spec start(mongooseim:host_type_or_global(), mongoose_wpool:tag(),
            mongoose_wpool:pool_opts(), mongoose_wpool:conn_opts()) -> {ok, pid()} | {error, any()}.
start(HostType, Tag, WpoolOptsIn, ConnOpts) ->
    NumClients = proplists:get_value(workers, WpoolOptsIn),
    application:set_env(cqerl, num_clients, NumClients),
    Nodes = prepare_cqerl_servers(ConnOpts),
    ClientOpts = prepare_cqerl_opts(ConnOpts),
    set_cluster_config(Tag, Nodes, ClientOpts),
    AddResults = cqerl_cluster:add_nodes(Tag, Nodes, ClientOpts),
    %% a single failed node is enough to abort the pool start
    case lists:keymember(error, 1, AddResults) of
        true ->
            throw({not_all_nodes_added, AddResults});
        false ->
            ok
    end,
    PoolName = mongoose_wpool:make_pool_name(cassandra, HostType, Tag),
    PoolOpts = [{worker, {mongoose_cassandra_worker, [Tag]}} | WpoolOptsIn],
    mongoose_wpool:start_sup_pool(cassandra, PoolName, PoolOpts).
34 |
|
|
35 |
|
%% mongoose_wpool callback: this module does nothing on stop; both arguments
%% are ignored and 'ok' is returned.
-spec stop(mongooseim:host_type_or_global(), mongoose_wpool:tag()) -> ok.
stop(_HostType, _Tag) ->
    ok.
38 |
|
|
39 |
|
%% -------------------------------------------------------------- |
40 |
|
%% Internal functions |
41 |
|
%% Converts every entry under the 'servers' key of the connection options
%% with cqerl_server/1, preserving the order of the entries.
prepare_cqerl_servers(#{servers := ServerSpecs}) ->
    lists:map(fun cqerl_server/1, ServerSpecs).
43 |
|
|
44 |
83 |
%% Converts one server map into an address term:
%% {Host, Port} when a 'port' key is present, the bare Host otherwise.
%% A map without a 'host' key is rejected with 'function_clause'.
cqerl_server(#{host := Host} = Server) ->
    case Server of
        #{port := Port} -> {Host, Port};
        #{} -> Host
    end.
46 |
|
|
47 |
|
%% Builds the flat option list by asking cqerl_opts/2 for each supported
%% option group in a fixed order (keyspace, auth, tcp, tls) and concatenating
%% the results.
prepare_cqerl_opts(ConnOpts) ->
    [Opt || Group <- [keyspace, auth, tcp, tls],
            Opt <- cqerl_opts(Group, ConnOpts)].
49 |
|
|
50 |
|
%% Translates one group of connection options into a (possibly empty) list
%% of client options.
%%
%%   keyspace - [{keyspace, Keyspace}] when the key is configured
%%   auth     - [{auth, {cqerl_auth_plain_handler, [{User, Password}]}}] when
%%              plain username/password credentials are configured
%%   tcp      - always [{tcp_opts, [{keepalive, true}]}]
%%   tls      - [{ssl, SslOpts}], SslOpts built by just_tls:make_ssl_opts/1
%%
%% Any group that is not configured yields [].
cqerl_opts(keyspace, #{keyspace := Keyspace}) ->
    [{keyspace, Keyspace}];
cqerl_opts(auth, #{auth := #{plain := #{username := UserName, password := Password}}}) ->
    %% The handler and its arguments must be wrapped under the 'auth' key;
    %% emitted as a bare {Handler, Args} option the credentials are ignored.
    [{auth, {cqerl_auth_plain_handler, [{UserName, Password}]}}];
cqerl_opts(tcp, #{}) ->
    [{tcp_opts, [{keepalive, true}]}]; % always set
cqerl_opts(tls, #{tls := TLSOpts}) ->
    [{ssl, just_tls:make_ssl_opts(TLSOpts)}];
cqerl_opts(_Opt, #{}) ->
    [].
60 |
|
|
61 |
|
%% Stores the cluster definition for Tag under the 'cassandra_clusters' key of
%% the 'cqerl' application environment, replacing any earlier entry for the
%% same Tag. This makes the config survive the restart of 'cqerl_cluster' in
%% case of a network failure.
set_cluster_config(Tag, Servers, ExtConfig) ->
    Known = application:get_env(cqerl, cassandra_clusters, []),
    Updated = lists:keystore(Tag, 1, Known, {Tag, {Servers, ExtConfig}}),
    application:set_env(cqerl, cassandra_clusters, Updated).