1 |
|
%% @doc MongooseIM RDBMS backend for cets_discovery. |
2 |
|
-module(mongoose_cets_discovery_rdbms). |
3 |
|
-behaviour(cets_discovery). |
4 |
|
-export([init/1, get_nodes/1]). |
5 |
|
|
6 |
|
-include("mongoose_logger.hrl"). |
7 |
|
|
8 |
|
-ifdef(TEST). |
9 |
|
-include_lib("eunit/include/eunit.hrl"). |
10 |
|
-endif. |
11 |
|
|
12 |
|
%% Options accepted by init/1.
%% `cluster_name' and `node_name_to_insert' are mandatory because init/1 and
%% get_nodes/1 pattern-match on them; the remaining keys are optional because
%% defaults/0 fills them in.
-type opts() :: #{cluster_name := binary(), node_name_to_insert := binary(),
                  last_query_info => map(),
                  expire_time => non_neg_integer()}.
%% Backend state: the options merged over the defaults, so every key is present.
-type state() :: #{cluster_name := binary(), node_name_to_insert := binary(),
                   last_query_info := map(),
                   expire_time := non_neg_integer()}.
15 |
|
|
16 |
|
%% @doc cets_discovery callback: builds the backend state.
%% Caller-supplied options take precedence over the values from defaults/0.
%% Crashes with `function_clause' unless both `cluster_name' and
%% `node_name_to_insert' are present.
-spec init(opts()) -> state().
init(#{cluster_name := _, node_name_to_insert := _} = UserOpts) ->
    Defaults = defaults(),
    maps:merge(Defaults, UserOpts).
19 |
|
|
20 |
|
%% Default values for the optional settings:
%% rows older than one hour are treated as expired, and no query has run yet.
defaults() ->
    OneHourInSeconds = 60 * 60 * 1,
    #{last_query_info => #{},
      expire_time => OneHourInSeconds}.
23 |
|
|
24 |
|
%% @doc cets_discovery callback: registers this node in the database and
%% returns the nodes currently listed for the cluster.
%%
%% Results:
%%   {{ok, Nodes}, NewState}              - registration ran; the node number
%%                                          is stored via mongoose_node_num and
%%                                          `last_query_info' is refreshed.
%%   {{error, rdbms_not_running}, State}  - is_rdbms_running/0 returned false.
%%   {{error, Reason}, State}             - an exception was raised while
%%                                          checking or registering; it is logged.
%% Only the check/registration expression is protected by the `try';
%% the success branch runs outside of it.
-spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}.
get_nodes(#{cluster_name := ClusterName, node_name_to_insert := Node} = State) ->
    try is_rdbms_running() andalso try_register(ClusterName, Node, State) of
        false ->
            {{error, rdbms_not_running}, State};
        {Num, Nodes, Info} ->
            mongoose_node_num:set_node_num(Num),
            {{ok, Nodes}, State#{last_query_info => Info}}
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR(#{what => discovery_failed_select, class => Class,
                         reason => Reason, stacktrace => Stacktrace}),
            {{error, Reason}, State}
    end.
44 |
|
|
45 |
|
%% Returns true only when a worker can be fetched from the global rdbms pool.
%% Any other return value, as well as any exception raised by the lookup,
%% is reported as false (this is a deliberate best-effort probe).
is_rdbms_running() ->
    try
        case mongoose_wpool:get_worker(rdbms, global) of
            {ok, _Worker} -> true;
            _NoWorker -> false
        end
    catch
        _Class:_Reason ->
            false
    end.
52 |
|
|
53 |
|
%% Registers this node in the discovery_nodes table, or refreshes its
%% timestamp when it is already there, and runs the expired-rows cleanup.
%%
%% Returns `{NodeNum, Nodes, Info}' where:
%%   NodeNum - the number stored for this node, or the next free number when
%%             the node had to be inserted;
%%   Nodes   - the node names (as atoms) read from the table, minus the ones
%%             removed by run_cleaning/4;
%%   Info    - a map with details of this run, kept only for debugging.
try_register(ClusterName, NodeBin, State) when is_binary(NodeBin), is_binary(ClusterName) ->
    prepare(),
    Timestamp = timestamp(),
    Node = binary_to_atom(NodeBin),
    {selected, Rows} = select(ClusterName),
    Zipped = [{binary_to_atom(DbNodeBin), DbNum, DbTS} || {DbNodeBin, DbNum, DbTS} <- Rows],
    {Nodes, Nums, _Timestamps} = lists:unzip3(Zipped),
    %% A single keyfind tells us both whether we are registered and our number.
    {AlreadyRegistered, NodeNum} =
        case lists:keyfind(Node, 1, Zipped) of
            {Node, ExistingNum, _OldTS} ->
                update_existing(ClusterName, NodeBin, Timestamp),
                {true, ExistingNum};
            false ->
                FreeNum = next_free_num(lists:usort(Nums)),
                %% Could fail with duplicate node_num reason.
                %% In this case just wait for the next get_nodes call.
                %% The failure is logged instead of being silently dropped.
                %% NOTE(review): assumes a successful insert returns
                %% {updated, _} - confirm against mongoose_rdbms:execute/3.
                case insert_new(ClusterName, NodeBin, Timestamp, FreeNum) of
                    {updated, _} ->
                        ok;
                    InsertResult ->
                        ?LOG_WARNING(#{what => cets_disco_insert_new_failed,
                                       cluster_name => ClusterName,
                                       node_name => NodeBin,
                                       node_num => FreeNum,
                                       reason => InsertResult})
                end,
                {false, FreeNum}
        end,
    RunCleaningResult = run_cleaning(ClusterName, Timestamp, Rows, State),
    %% This could be used for debugging
    Info = #{already_registered => AlreadyRegistered, timestamp => Timestamp,
             node_num => NodeNum, last_rows => Rows,
             run_cleaning_result => RunCleaningResult},
    Nodes2 = skip_expired_nodes(Nodes, RunCleaningResult),
    {NodeNum, Nodes2, Info}.
80 |
|
|
81 |
|
%% Drops the nodes that the cleanup step has just removed from the database.
%% When the cleanup was skipped the list is returned untouched.
skip_expired_nodes(AllNodes, {skip, _Why}) ->
    AllNodes;
skip_expired_nodes(AllNodes, {removed, Removed}) ->
    lists:subtract(AllNodes, Removed).
85 |
|
|
86 |
|
%% Deletes rows whose timestamp is older than the configured expire time.
%%
%% Rows     - the `{NodeNameBin, Num, Timestamp}' tuples read from the database
%%            before this node refreshed its own row.
%% Returns `{skip, nothing_expired}' when there is nothing to delete, otherwise
%% `{removed, ExpiredNodes}' with the deleted node names as atoms.
%%
%% The row of the current node (`node_name_to_insert' in State) is never
%% cleaned here: Rows still carries its old timestamp, so a node with a stale
%% row would otherwise delete the entry it has just refreshed and drop itself
%% from the discovered node list.
run_cleaning(ClusterName, Timestamp, Rows, State) ->
    CurrentNodeBin = maps:get(node_name_to_insert, State, undefined),
    ExpiredBins = [DbNodeBin || {DbNodeBin, _Num, DbTS} <- Rows,
                                is_expired(DbTS, Timestamp, State),
                                DbNodeBin =/= CurrentNodeBin],
    case ExpiredBins of
        [] ->
            {skip, nothing_expired};
        _ ->
            ExpiredNodes = [binary_to_atom(DbNodeBin) || DbNodeBin <- ExpiredBins],
            lists:foreach(fun(DbNodeBin) ->
                                  delete_node_from_db(ClusterName, DbNodeBin)
                          end, ExpiredBins),
            ?LOG_WARNING(#{what => cets_expired_nodes,
                           text => <<"Expired nodes are detected in discovery_nodes table">>,
                           expired_nodes => ExpiredNodes}),
            {removed, ExpiredNodes}
    end.
100 |
|
|
101 |
|
%% True when the row timestamp is more than `expire_time' older than the
%% current timestamp. Both values and the limit are compared in seconds.
%% Only integer row timestamps are accepted.
is_expired(DbTS, Timestamp, #{expire_time := MaxAge}) when is_integer(DbTS) ->
    Age = Timestamp - DbTS,
    Age > MaxAge.
103 |
|
|
104 |
|
%% Runs the prepared `cets_delete_node_from_db' statement for one node of the
%% given cluster, using the `global' pool.
delete_node_from_db(ClusterName, NodeBin) ->
    Params = [ClusterName, NodeBin],
    mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, Params).
106 |
|
|
107 |
|
%% Prepares the timestamp query and the four discovery_nodes statements
%% (select, insert, update, delete) used by this module.
prepare() ->
    Table = discovery_nodes,
    mongoose_rdbms_timestamp:prepare(),
    mongoose_rdbms:prepare(cets_disco_select, Table,
                           [cluster_name],
                           select()),
    mongoose_rdbms:prepare(cets_disco_insert_new, Table,
                           [cluster_name, node_name, node_num, updated_timestamp],
                           insert_new()),
    mongoose_rdbms:prepare(cets_disco_update_existing, Table,
                           [updated_timestamp, cluster_name, node_name],
                           update_existing()),
    mongoose_rdbms:prepare(cets_delete_node_from_db, Table,
                           [cluster_name, node_name],
                           delete_node_from_db()).
117 |
|
|
118 |
|
%% SQL text of the statement that lists all rows of one cluster.
select() ->
    <<"SELECT node_name, node_num, updated_timestamp "
      "FROM discovery_nodes WHERE cluster_name = ?">>.
120 |
|
|
121 |
|
%% Executes the prepared `cets_disco_select' statement for the given cluster
%% on the `global' pool.
select(ClusterName) ->
    Params = [ClusterName],
    mongoose_rdbms:execute_successfully(global, cets_disco_select, Params).
123 |
|
|
124 |
|
%% SQL text of the statement that registers a new node row.
insert_new() ->
    <<"INSERT INTO discovery_nodes "
      "(cluster_name, node_name, node_num, updated_timestamp) "
      "VALUES (?, ?, ?, ?)">>.
127 |
|
|
128 |
|
%% Executes the prepared `cets_disco_insert_new' statement on the `global' pool.
%% Note that the statement takes the node number before the timestamp.
%% The raw result is returned; a failure does not raise here.
insert_new(ClusterName, NodeBin, Timestamp, Num) ->
    Params = [ClusterName, NodeBin, Num, Timestamp],
    mongoose_rdbms:execute(global, cets_disco_insert_new, Params).
130 |
|
|
131 |
|
%% SQL text of the statement that refreshes the timestamp of a node row.
update_existing() ->
    <<"UPDATE discovery_nodes SET updated_timestamp = ? "
      "WHERE cluster_name = ? AND node_name = ?">>.
133 |
|
|
134 |
|
%% SQL text of the statement that removes one node row of a cluster.
delete_node_from_db() ->
    <<"DELETE FROM discovery_nodes "
      "WHERE cluster_name = ? AND node_name = ?">>.
136 |
|
|
137 |
|
%% Executes the prepared `cets_disco_update_existing' statement on the
%% `global' pool. The timestamp comes first in the parameter list.
%% The raw result is returned; a failure does not raise here.
update_existing(ClusterName, NodeBin, Timestamp) ->
    Params = [Timestamp, ClusterName, NodeBin],
    mongoose_rdbms:execute(global, cets_disco_update_existing, Params).
139 |
|
|
140 |
|
%% Current time in seconds.
%% The value is read through mongoose_rdbms_timestamp rather than from the
%% local clock (os:system_time(second)), so that the database server time
%% acts as the central source of truth.
timestamp() ->
    Now = mongoose_rdbms_timestamp:select(),
    Now.
145 |
|
|
146 |
|
%% Picks the next free node number given the numbers already in use.
%% The input is expected to be sorted and free of duplicates (the caller
%% passes the result of lists:usort/1).
%% An empty list yields 0. Otherwise the result is the number right after the
%% first unbroken run that starts at the smallest element; numbers below the
%% smallest element are not reused.
next_free_num([]) ->
    0;
next_free_num([Current | [Following | _] = Rest]) when Current + 1 =:= Following ->
    %% Still inside a consecutive run, keep walking.
    next_free_num(Rest);
next_free_num([Current | _]) ->
    %% Either the last element or the element just before a gap.
    Current + 1.
154 |
|
|
155 |
|
-ifdef(TEST). |
156 |
|
|
157 |
|
%% EUnit generator for next_free_num/1.
%% Named after the function it exercises; EUnit finds it by the `_test_' suffix.
next_free_num_test_() ->
    [?_assertEqual(0, next_free_num([])),
     %% A single registered number is followed by its successor.
     ?_assertEqual(2, next_free_num([1])),
     %% A gap inside the list is reused.
     ?_assertEqual(3, next_free_num([1, 2, 5])),
     ?_assertEqual(3, next_free_num([1, 2])),
     %% A gap right after the first element is reused.
     ?_assertEqual(1, next_free_num([0, 2, 3]))].
161 |
|
|
162 |
|
-endif. |