./ct_report/coverage/mongoose_cets_discovery_rdbms.COVER.html

1 %% @doc MongooseIM RDBMS backend for cets_discovery.
2 -module(mongoose_cets_discovery_rdbms).
3 -behaviour(cets_discovery).
4 -export([init/1, get_nodes/1]).
5
6 %% these functions are exported for testing purposes only.
7 -export([select/1, insert_new/5, update_existing/3, delete_node_from_db/1,
8 cluster_name_with_vsn/1]).
9 -ignore_xref([select/1, insert_new/5, update_existing/3, delete_node_from_db/1,
10 cluster_name_with_vsn/1]).
11
12 -include("mongoose_logger.hrl").
13
14 -ifdef(TEST).
15 -include_lib("eunit/include/eunit.hrl").
16 -endif.
17
18 -type opts() :: #{cluster_name := binary(), node_name_to_insert := binary(),
19 last_query_info => map(), expire_time => non_neg_integer(),
20 node_ip_binary => binary(),
21 any() => any()}.
22
23 -type state() :: #{cluster_name := binary(), node_name_to_insert := binary(),
24 last_query_info := map(), expire_time := non_neg_integer(),
25 node_ip_binary := binary(), address_pairs := #{binary() => binary()}}.
26
27 -spec init(opts()) -> state().
28 init(Opts = #{cluster_name := ClusterName, node_name_to_insert := Node})
29 when is_binary(ClusterName), is_binary(Node) ->
30 52 Keys = [node_name_to_insert, expire_time, last_query_info, node_ip_binary],
31 52 StateOpts = maps:merge(defaults(), maps:with(Keys, Opts)),
32 52 StateOpts#{cluster_name => cluster_name_with_vsn(ClusterName)}.
33
34 cluster_name_with_vsn(ClusterName) ->
35 53 {ok, CetsVsn} = application:get_key(cets, vsn),
36 53 [MajorVsn, MinorVsn | _] = string:tokens(CetsVsn, "."),
37 53 iolist_to_binary([ClusterName, $-, MajorVsn, $., MinorVsn]).
38
39 defaults() ->
40 52 #{expire_time => 60 * 60 * 1, %% 1 hour in seconds
41 last_query_info => #{},
42 node_ip_binary => <<>>,
43 address_pairs => #{}}.
44
45 -spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}.
46 get_nodes(State = #{cluster_name := ClusterName, node_name_to_insert := Node}) ->
47 582 case is_rdbms_running() of
48 true ->
49 448 try try_register(ClusterName, Node, State) of
50 {Num, Nodes, Info, AddrPairs} ->
51 448 mongoose_node_num:set_node_num(Num),
52 448 {{ok, [binary_to_atom(N) || N <- Nodes]},
53 State#{last_query_info => Info, address_pairs => AddrPairs}}
54 catch Class:Reason:Stacktrace ->
55
:-(
?LOG_ERROR(#{what => discovery_failed_select, class => Class,
56
:-(
reason => Reason, stacktrace => Stacktrace}),
57
:-(
{{error, Reason}, State}
58 end;
59 false ->
60 134 {{error, rdbms_not_running}, State}
61 end.
62
63 is_rdbms_running() ->
64 582 try mongoose_wpool:get_worker(rdbms, global) of
65 448 {ok, _} -> true;
66 4 _ -> false
67 catch _:_ ->
68 130 false
69 end.
70
71 try_register(ClusterName, Node, State = #{node_ip_binary := Address})
72 when is_binary(Node), is_binary(ClusterName) ->
73 448 prepare(),
74 448 Timestamp = timestamp(),
75 448 {selected, Rows} = select(ClusterName),
76 448 Nodes = [element(1, Row) || Row <- Rows],
77 448 Nums = [element(2, Row) || Row <- Rows],
78 448 Addresses = [element(3, Row) || Row <- Rows],
79 448 AddrPairs = maps:from_list(lists:zip(Nodes, Addresses)),
80 448 AlreadyRegistered = lists:member(Node, Nodes),
81 448 NodeNum =
82 case AlreadyRegistered of
83 true ->
84 424 update_existing(Node, Address, Timestamp),
85 424 {value, {_, Num, _Addr, _TS}} = lists:keysearch(Node, 1, Rows),
86 424 Num;
87 false ->
88 24 Num = first_free_num(lists:usort(Nums)),
89 24 delete_node_from_db(Node), % Delete node if it was a member of another cluster
90 %% Could fail with duplicate node_num reason.
91 %% In this case just wait for the next get_nodes call.
92 24 case insert_new(ClusterName, Node, Num, Address, Timestamp) of
93 1 {error, _} -> 0; %% return default node num
94 23 {updated, 1} -> Num
95 end
96 end,
97 448 RunCleaningResult = run_cleaning(Timestamp, Rows, State),
98 %% This could be used for debugging
99 448 Info = #{already_registered => AlreadyRegistered, timestamp => Timestamp,
100 address => Address,
101 node_num => Num, last_rows => Rows, run_cleaning_result => RunCleaningResult},
102 448 {NodeNum, skip_expired_nodes(Nodes, RunCleaningResult), Info, AddrPairs}.
103
104 skip_expired_nodes(Nodes, {removed, ExpiredNodes}) ->
105 448 (Nodes -- ExpiredNodes).
106
107 run_cleaning(Timestamp, Rows, State) ->
108 448 #{expire_time := ExpireTime, node_name_to_insert := CurrentNode} = State,
109 448 ExpiredNodes = [DbNode || {DbNode, _Num, _Addr, DbTS} <- Rows,
110 1142 is_expired(DbTS, Timestamp, ExpireTime),
111 2 DbNode =/= CurrentNode],
112 448 [delete_node_from_db(DbNode) || DbNode <- ExpiredNodes],
113 448 case ExpiredNodes of
114 447 [] -> ok;
115 [_ | _] ->
116 1 ?LOG_WARNING(#{what => cets_expired_nodes,
117 text => <<"Expired nodes are detected in discovery_nodes table">>,
118
:-(
expired_nodes => ExpiredNodes})
119 end,
120 448 {removed, ExpiredNodes}.
121
122 is_expired(DbTS, Timestamp, ExpireTime) when is_integer(Timestamp),
123 is_integer(ExpireTime),
124 is_integer(DbTS) ->
125 1142 (Timestamp - DbTS) > ExpireTime. %% compare seconds
126
127 prepare() ->
128 448 T = discovery_nodes,
129 448 mongoose_rdbms_timestamp:prepare(),
130 448 mongoose_rdbms:prepare(cets_disco_select, T, [cluster_name], select()),
131 448 mongoose_rdbms:prepare(cets_disco_insert_new, T,
132 [cluster_name, node_name, node_num, address, updated_timestamp], insert_new()),
133 448 mongoose_rdbms:prepare(cets_disco_update_existing, T,
134 [updated_timestamp, address, node_name], update_existing()),
135 448 mongoose_rdbms:prepare(cets_delete_node_from_db, T,
136 [node_name], delete_node_from_db()).
137
138 select() ->
139 448 <<"SELECT node_name, node_num, address, updated_timestamp FROM discovery_nodes WHERE cluster_name = ?">>.
140
141 select(ClusterName) ->
142 451 mongoose_rdbms:execute_successfully(global, cets_disco_select, [ClusterName]).
143
144 insert_new() ->
145 448 <<"INSERT INTO discovery_nodes (cluster_name, node_name, node_num, address, updated_timestamp)"
146 " VALUES (?, ?, ?, ?, ?)">>.
147
148 insert_new(ClusterName, NodeName, NodeNum, Address, UpdatedTimestamp) ->
149 31 mongoose_rdbms:execute(global, cets_disco_insert_new,
150 [ClusterName, NodeName, NodeNum, Address, UpdatedTimestamp]).
151
152 update_existing() ->
153 448 <<"UPDATE discovery_nodes SET updated_timestamp = ?, address = ? WHERE node_name = ?">>.
154
155 update_existing(NodeName, Address, UpdatedTimestamp) ->
156 425 mongoose_rdbms:execute(global, cets_disco_update_existing, [UpdatedTimestamp, Address, NodeName]).
157
158 delete_node_from_db() ->
159 448 <<"DELETE FROM discovery_nodes WHERE node_name = ?">>.
160
161 delete_node_from_db(Node) ->
162 28 mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, [Node]).
163
164 %% in seconds
165 timestamp() ->
166 % We could use Erlang timestamp os:system_time(second).
167 % But we use the database server time as a central source of truth.
168 448 mongoose_rdbms_timestamp:select().
169
170 %% Returns a next free node id based on the currently registered ids
171 first_free_num(Nums) ->
172 %% 0 is default node_num, so lets start from 1
173 24 [FirstFreeNum | _] = lists:seq(1, length(Nums)+1) -- Nums,
174 24 FirstFreeNum.
175
176 -ifdef(TEST).
177
178 jid_to_opt_binary_test_() ->
179 [?_assertEqual(1, first_free_num([])),
180 ?_assertEqual(3, first_free_num([1, 2, 5])),
181 ?_assertEqual(1, first_free_num([2, 5])),
182 ?_assertEqual(3, first_free_num([1, 2]))].
183
184 -endif.
Line Hits Source