./ct_report/coverage/mongoose_cets_discovery_rdbms.COVER.html

1 %% @doc MongooseIM RDBMS backend for cets_discovery.
2 -module(mongoose_cets_discovery_rdbms).
3 -behaviour(cets_discovery).
4 -export([init/1, get_nodes/1]).
5
6 %% these functions are exported for testing purposes only.
7 -export([select/1, insert_new/5, update_existing/4, delete_node_from_db/2]).
8 -ignore_xref([select/1, insert_new/5, update_existing/4, delete_node_from_db/2]).
9
10 -include("mongoose_logger.hrl").
11
12 -ifdef(TEST).
13 -include_lib("eunit/include/eunit.hrl").
14 -endif.
15
%% Options accepted by init/1.
%% cluster_name and node_name_to_insert are mandatory; the other listed keys are
%% optional, and arbitrary additional keys are permitted.
-type opts() :: #{
    cluster_name := binary(),
    node_name_to_insert := binary(),
    last_query_info => map(),
    expire_time => non_neg_integer(),
    node_ip_binary => binary(),
    any() => any()
}.
20
%% Backend state threaded through get_nodes/1. Every key is mandatory.
%% address_pairs maps a binary key to a binary value; try_register/3 fills it
%% with node name => address entries taken from the selected rows.
-type state() :: #{
    cluster_name := binary(),
    node_name_to_insert := binary(),
    last_query_info := map(),
    expire_time := non_neg_integer(),
    node_ip_binary := binary(),
    address_pairs := #{binary() => binary()}
}.
24
%% @doc Builds the initial backend state from the discovery options.
%%
%% `cluster_name' and `node_name_to_insert' are mandatory and must be binaries
%% (anything else fails with `function_clause'). Every other state key falls back
%% to the value from `defaults/0' when it is absent from `Opts'. Keys of `Opts'
%% that are not part of the state are dropped.
-spec init(opts()) -> state().
init(Opts = #{cluster_name := ClusterName, node_name_to_insert := Node})
       when is_binary(ClusterName), is_binary(Node) ->
    %% address_pairs has to be listed here: state() declares it as a mandatory key,
    %% and leaving it out would make maps:with/2 drop the default from defaults/0.
    Keys = [cluster_name, node_name_to_insert, last_query_info, expire_time,
            node_ip_binary, address_pairs],
    maps:with(Keys, maps:merge(defaults(), Opts)).

%% Default values for the state keys that the caller may omit.
defaults() ->
    #{expire_time => 60 * 60 * 1, %% 1 hour in seconds
      last_query_info => #{},
      node_ip_binary => <<>>,
      address_pairs => #{}}.
36
%% @doc cets_discovery callback returning the nodes known for this cluster.
%%
%% When the global RDBMS pool is not available, returns
%% `{{error, rdbms_not_running}, State}' without touching the database.
%% Otherwise registers this node via try_register/3, stores the resulting node
%% number with mongoose_node_num:set_node_num/1 and returns the node names as atoms
%% together with a state updated with the query info and the address pairs.
%% An exception raised by try_register/3 is logged and turned into
%% `{{error, Reason}, State}'.
-spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}.
get_nodes(State = #{cluster_name := ClusterName, node_name_to_insert := Node}) ->
    case is_rdbms_running() of
        false ->
            {{error, rdbms_not_running}, State};
        true ->
            %% Only try_register/3 is protected; the `of' section runs outside the try.
            try try_register(ClusterName, Node, State) of
                {NodeNum, NodeNames, QueryInfo, AddressPairs} ->
                    mongoose_node_num:set_node_num(NodeNum),
                    NodeAtoms = [binary_to_atom(Name) || Name <- NodeNames],
                    NewState = State#{last_query_info => QueryInfo,
                                      address_pairs => AddressPairs},
                    {{ok, NodeAtoms}, NewState}
            catch Class:Reason:Stacktrace ->
                ?LOG_ERROR(#{what => discovery_failed_select, class => Class,
                             reason => Reason, stacktrace => Stacktrace}),
                {{error, Reason}, State}
            end
    end.
54
%% Returns true only when mongoose_wpool:get_worker(rdbms, global) yields {ok, _}.
%% Any other return value, or any exception raised by the call, counts as
%% "not running".
is_rdbms_running() ->
    try
        case mongoose_wpool:get_worker(rdbms, global) of
            {ok, _Worker} -> true;
            _NoWorker -> false
        end
    catch
        _Class:_Reason -> false
    end.
62
%% Refreshes or creates this node's row in discovery_nodes and reports the cluster view.
%%
%% Returns `{NodeNum, Nodes, Info, AddrPairs}':
%%   NodeNum   - the stored number for an already registered node, the newly
%%               inserted number, or 0 when the insert failed;
%%   Nodes     - node names selected for the cluster, minus the expired ones
%%               reported by run_cleaning/4;
%%   Info      - debugging map describing this query;
%%   AddrPairs - map from node name to the address column of its row.
try_register(ClusterName, Node, State = #{node_ip_binary := Address})
       when is_binary(Node), is_binary(ClusterName) ->
    prepare(),
    Timestamp = timestamp(),
    {selected, Rows} = select(ClusterName),
    Nodes = [element(1, Row) || Row <- Rows],
    Addresses = [element(3, Row) || Row <- Rows],
    AddrPairs = maps:from_list(lists:zip(Nodes, Addresses)),
    AlreadyRegistered = lists:member(Node, Nodes),
    {NodeNum, Num} =
        register_node(AlreadyRegistered, ClusterName, Node, Address, Timestamp, Rows),
    RunCleaningResult = run_cleaning(ClusterName, Timestamp, Rows, State),
    %% This could be used for debugging.
    %% node_num is the number found or attempted, even when NodeNum fell back to 0.
    Info = #{already_registered => AlreadyRegistered, timestamp => Timestamp,
             address => Address,
             node_num => Num, last_rows => Rows, run_cleaning_result => RunCleaningResult},
    {NodeNum, skip_expired_nodes(Nodes, RunCleaningResult), Info, AddrPairs}.

%% Updates the existing row or inserts a new one for Node.
%% Returns `{NodeNum, Num}': Num is the number stored in (or chosen for) the row,
%% NodeNum is the number the node should actually use.
register_node(true, ClusterName, Node, Address, Timestamp, Rows) ->
    %% The result of the update is not checked.
    update_existing(ClusterName, Node, Address, Timestamp),
    {_, Num, _Addr, _TS} = lists:keyfind(Node, 1, Rows),
    {Num, Num};
register_node(false, ClusterName, Node, Address, Timestamp, Rows) ->
    Nums = [element(2, Row) || Row <- Rows],
    Num = first_free_num(lists:usort(Nums)),
    %% Could fail with duplicate node_num reason.
    %% In this case just wait for the next get_nodes call.
    case insert_new(ClusterName, Node, Num, Address, Timestamp) of
        {error, _} -> {0, Num}; %% return default node num
        {updated, 1} -> {Num, Num}
    end.
94
%% Removes the nodes reported as expired by run_cleaning/4 from the node list.
skip_expired_nodes(Nodes, {removed, ExpiredNodes}) ->
    lists:subtract(Nodes, ExpiredNodes).
97
%% Deletes the rows of other nodes whose timestamp is older than expire_time.
%% The current node (node_name_to_insert) is never deleted, even if its row is expired.
%% Logs a warning when at least one node was removed.
%% Returns `{removed, ExpiredNodes}'.
run_cleaning(ClusterName, Timestamp, Rows, State) ->
    #{expire_time := MaxAge, node_name_to_insert := ThisNode} = State,
    Expired = [Name || {Name, _Num, _Addr, UpdatedAt} <- Rows,
                       is_expired(UpdatedAt, Timestamp, MaxAge),
                       Name =/= ThisNode],
    lists:foreach(fun(Name) -> delete_node_from_db(ClusterName, Name) end, Expired),
    warn_about_expired(Expired),
    {removed, Expired}.

%% Emits the cets_expired_nodes warning unless the list is empty.
warn_about_expired([]) ->
    ok;
warn_about_expired([_ | _] = ExpiredNodes) ->
    ?LOG_WARNING(#{what => cets_expired_nodes,
                   text => <<"Expired nodes are detected in discovery_nodes table">>,
                   expired_nodes => ExpiredNodes}).
112
%% True when the row is strictly older than ExpireTime.
%% All three arguments are integers; the comparison is done in seconds.
is_expired(DbTS, Timestamp, ExpireTime)
  when is_integer(Timestamp), is_integer(ExpireTime), is_integer(DbTS) ->
    Age = Timestamp - DbTS,
    Age > ExpireTime.
117
%% Prepares the timestamp query and the four statements used on the
%% discovery_nodes table: select, insert, update and delete.
%% Returns the result of the last mongoose_rdbms:prepare/4 call.
prepare() ->
    Table = discovery_nodes,
    InsertFields = [cluster_name, node_name, node_num, address, updated_timestamp],
    UpdateFields = [updated_timestamp, address, cluster_name, node_name],
    DeleteFields = [cluster_name, node_name],
    mongoose_rdbms_timestamp:prepare(),
    mongoose_rdbms:prepare(cets_disco_select, Table, [cluster_name], select()),
    mongoose_rdbms:prepare(cets_disco_insert_new, Table, InsertFields, insert_new()),
    mongoose_rdbms:prepare(cets_disco_update_existing, Table, UpdateFields,
                           update_existing()),
    mongoose_rdbms:prepare(cets_delete_node_from_db, Table, DeleteFields,
                           delete_node_from_db()).
128
%% SQL text of the cets_disco_select statement: all rows of one cluster.
select() ->
    <<"SELECT node_name, node_num, address, updated_timestamp "
      "FROM discovery_nodes WHERE cluster_name = ?">>.
131
%% Runs the prepared cets_disco_select statement for ClusterName on the global pool.
%% Uses execute_successfully, so the result is not matched against an error here.
select(ClusterName) ->
    Params = [ClusterName],
    mongoose_rdbms:execute_successfully(global, cets_disco_select, Params).
134
%% SQL text of the cets_disco_insert_new statement.
insert_new() ->
    <<"INSERT INTO discovery_nodes "
      "(cluster_name, node_name, node_num, address, updated_timestamp) "
      "VALUES (?, ?, ?, ?, ?)">>.
138
%% Runs the prepared cets_disco_insert_new statement on the global pool.
%% The parameter order matches the column order of the INSERT statement.
%% The raw result of mongoose_rdbms:execute/3 is returned to the caller.
insert_new(ClusterName, NodeName, NodeNum, Address, UpdatedTimestamp) ->
    Params = [ClusterName, NodeName, NodeNum, Address, UpdatedTimestamp],
    mongoose_rdbms:execute(global, cets_disco_insert_new, Params).
142
%% SQL text of the cets_disco_update_existing statement.
update_existing() ->
    <<"UPDATE discovery_nodes SET updated_timestamp = ?, address = ? "
      "WHERE cluster_name = ? AND node_name = ?">>.
145
%% Runs the prepared cets_disco_update_existing statement on the global pool.
%% The parameters are reordered to follow the placeholders of the UPDATE statement:
%% timestamp and address first, then the cluster and node names of the WHERE clause.
update_existing(ClusterName, NodeName, Address, UpdatedTimestamp) ->
    Params = [UpdatedTimestamp, Address, ClusterName, NodeName],
    mongoose_rdbms:execute(global, cets_disco_update_existing, Params).
148
%% SQL text of the cets_delete_node_from_db statement.
delete_node_from_db() ->
    <<"DELETE FROM discovery_nodes "
      "WHERE cluster_name = ? AND node_name = ?">>.
151
%% Runs the prepared cets_delete_node_from_db statement for one node of the cluster
%% on the global pool.
delete_node_from_db(ClusterName, Node) ->
    Params = [ClusterName, Node],
    mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, Params).
154
%% Current time in seconds.
timestamp() ->
    %% os:system_time(second) would give the local Erlang time instead,
    %% but the database server time is used as the central source of truth.
    DbNow = mongoose_rdbms_timestamp:select(),
    DbNow.
160
%% Returns the smallest node id, starting from 1, that is not in Nums.
%% 0 is the default node_num, so it is never handed out here.
first_free_num(Nums) ->
    %% Among length(Nums) + 1 candidates at least one is always unused.
    Candidates = lists:seq(1, length(Nums) + 1),
    hd([Candidate || Candidate <- Candidates, not lists:member(Candidate, Nums)]).
166
167 -ifdef(TEST).
168
%% EUnit generator; despite its name it exercises first_free_num/1.
%% Each case is {ExpectedFreeNum, RegisteredNums}.
jid_to_opt_binary_test_() ->
    Cases = [{1, []},
             {3, [1, 2, 5]},
             {1, [2, 5]},
             {3, [1, 2]}],
    [?_assertEqual(Expected, first_free_num(Nums)) || {Expected, Nums} <- Cases].
174
175 -endif.
Line Hits Source