./ct_report/coverage/mongoose_cets_discovery_rdbms.COVER.html

1 %% @doc MongooseIM RDBMS backend for cets_discovery.
2 -module(mongoose_cets_discovery_rdbms).
3 -behaviour(cets_discovery).
4 -export([init/1, get_nodes/1]).
5
6 %% these functions are exported for testing purposes only.
7 -export([select/1, insert_new/4, update_existing/3, delete_node_from_db/2]).
8 -ignore_xref([select/1, insert_new/4, update_existing/3, delete_node_from_db/2]).
9
10 -include("mongoose_logger.hrl").
11
12 -ifdef(TEST).
13 -include_lib("eunit/include/eunit.hrl").
14 -endif.
15
%% Options accepted by init/1.
%% `cluster_name' and `node_name_to_insert' are mandatory; the remaining known
%% keys are optional and any other key is accepted by the type.
-type opts() :: #{
    cluster_name := binary(),
    node_name_to_insert := binary(),
    last_query_info => map(),
    %% compared against second-resolution timestamps
    expire_time => non_neg_integer(),
    any() => any()
}.
19
%% Backend state as returned by init/1 and threaded through get_nodes/1.
%% All four keys are always present.
-type state() :: #{
    cluster_name := binary(),
    node_name_to_insert := binary(),
    %% debugging details about the most recent successful query
    last_query_info := map(),
    %% compared against second-resolution timestamps
    expire_time := non_neg_integer()
}.
22
%% @doc Builds the backend state from the supplied options.
%% `cluster_name' and `node_name_to_insert' must be present (otherwise the
%% call fails with function_clause). Missing optional keys are taken from
%% defaults/0 and keys that are not part of the state are dropped.
-spec init(opts()) -> state().
init(#{cluster_name := _, node_name_to_insert := _} = Opts) ->
    WithDefaults = maps:merge(defaults(), Opts),
    StateKeys = [cluster_name, node_name_to_insert, last_query_info, expire_time],
    maps:with(StateKeys, WithDefaults).
27
%% Values used for the optional settings when the caller does not provide them.
defaults() ->
    OneHourInSeconds = 60 * 60,
    #{last_query_info => #{},
      expire_time => OneHourInSeconds}.
31
%% @doc Registers this node in the discovery table and returns the nodes of
%% the cluster.
%%
%% Returns `{{ok, Nodes}, NewState}' on success, where `NewState' carries the
%% debugging info of this query in `last_query_info'. Returns
%% `{{error, rdbms_not_running}, State}' when is_rdbms_running/0 is false and
%% `{{error, Reason}, State}' when try_register/3 raises; the exception is
%% logged in that case.
-spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}.
get_nodes(#{cluster_name := ClusterName, node_name_to_insert := Node} = State) ->
    case is_rdbms_running() of
        false ->
            {{error, rdbms_not_running}, State};
        true ->
            %% Only try_register/3 is protected by the try; the `of' section
            %% below runs outside of the catch.
            try try_register(ClusterName, Node, State) of
                {NodeNum, NodeNames, QueryInfo} ->
                    mongoose_node_num:set_node_num(NodeNum),
                    DiscoveredNodes = [binary_to_atom(Name) || Name <- NodeNames],
                    {{ok, DiscoveredNodes}, State#{last_query_info => QueryInfo}}
            catch
                Class:Reason:Stacktrace ->
                    ?LOG_ERROR(#{what => discovery_failed_select, class => Class,
                                 reason => Reason, stacktrace => Stacktrace}),
                    {{error, Reason}, State}
            end
    end.
49
%% Tells whether a worker can be obtained from the global rdbms pool.
%% Only an `{ok, _}' answer counts as running; any other answer, as well as
%% any exception raised by the lookup, yields false.
is_rdbms_running() ->
    Lookup =
        try
            mongoose_wpool:get_worker(rdbms, global)
        catch
            _:_ -> lookup_failed
        end,
    case Lookup of
        {ok, _} -> true;
        _ -> false
    end.
57
%% Registers (or refreshes) Node in the discovery table of ClusterName and
%% removes expired entries.
%%
%% Returns `{NodeNum, Nodes, Info}':
%%   NodeNum - number stored for an already registered node, the freshly
%%             inserted number for a new node, or 0 when the insert failed;
%%   Nodes   - node names read from the table minus the ones just removed
%%             as expired;
%%   Info    - map with details of this run, meant for debugging.
%%
%% Any unexpected database answer crashes with badmatch/case_clause.
try_register(ClusterName, Node, State) when is_binary(Node), is_binary(ClusterName) ->
    prepare(),
    Timestamp = timestamp(),
    {selected, Rows} = select(ClusterName),
    {Nodes, Nums, _Timestamps} = lists:unzip3(Rows),
    AlreadyRegistered = lists:member(Node, Nodes),
    %% The case returns both values explicitly instead of relying on a
    %% variable exported from its branches.
    {NodeNum, ReportedNum} =
        case AlreadyRegistered of
            true ->
                update_existing(ClusterName, Node, Timestamp),
                %% Node is known to be in Rows, so keyfind cannot return false.
                {_, ExistingNum, _TS} = lists:keyfind(Node, 1, Rows),
                {ExistingNum, ExistingNum};
            false ->
                FreeNum = first_free_num(lists:usort(Nums)),
                %% Could fail with duplicate node_num reason.
                %% In this case just wait for the next get_nodes call.
                case insert_new(ClusterName, Node, Timestamp, FreeNum) of
                    {error, _} -> {0, FreeNum}; %% return default node num
                    {updated, 1} -> {FreeNum, FreeNum}
                end
        end,
    RunCleaningResult = run_cleaning(ClusterName, Timestamp, Rows, State),
    %% This could be used for debugging.
    %% node_num is the number found or attempted for this node; it differs
    %% from the returned NodeNum (0) when the insert above failed.
    Info = #{already_registered => AlreadyRegistered, timestamp => Timestamp,
             node_num => ReportedNum, last_rows => Rows,
             run_cleaning_result => RunCleaningResult},
    {NodeNum, skip_expired_nodes(Nodes, RunCleaningResult), Info}.
84
%% Drops the nodes reported as removed by run_cleaning/4 from the node list.
skip_expired_nodes(Nodes, {removed, ExpiredNodes}) ->
    lists:subtract(Nodes, ExpiredNodes).
87
%% Deletes expired rows of other nodes from the discovery table.
%%
%% A row is expired when is_expired/3 says so for its timestamp, given the
%% current Timestamp and the `expire_time' from State. The row of the current
%% node (`node_name_to_insert') is never deleted here.
%% Returns `{removed, ExpiredNodes}' and logs a warning when the list is not
%% empty.
run_cleaning(ClusterName, Timestamp, Rows, State) ->
    #{expire_time := ExpireTime, node_name_to_insert := CurrentNode} = State,
    ExpiredNodes = [Name || {Name, _Num, LastSeen} <- Rows,
                            is_expired(LastSeen, Timestamp, ExpireTime),
                            Name =/= CurrentNode],
    lists:foreach(fun(Name) -> delete_node_from_db(ClusterName, Name) end,
                  ExpiredNodes),
    case ExpiredNodes of
        [] ->
            ok;
        _ ->
            ?LOG_WARNING(#{what => cets_expired_nodes,
                           text => <<"Expired nodes are detected in discovery_nodes table">>,
                           expired_nodes => ExpiredNodes})
    end,
    {removed, ExpiredNodes}.
102
%% True when the row timestamp DbTS is more than ExpireTime older than
%% Timestamp. All three arguments must be integers (seconds).
is_expired(DbTS, Timestamp, ExpireTime)
  when is_integer(DbTS), is_integer(Timestamp), is_integer(ExpireTime) ->
    Age = Timestamp - DbTS,
    Age > ExpireTime. %% compare seconds
107
%% Calls the prepare functions for the timestamp query and for the four
%% statements used by this module. Each field list follows the order of the
%% `?' placeholders in the corresponding query text.
%% Returns whatever the last mongoose_rdbms:prepare/4 call returns.
prepare() ->
    Table = discovery_nodes,
    mongoose_rdbms_timestamp:prepare(),
    mongoose_rdbms:prepare(cets_disco_select, Table,
                           [cluster_name],
                           select()),
    mongoose_rdbms:prepare(cets_disco_insert_new, Table,
                           [cluster_name, node_name, node_num, updated_timestamp],
                           insert_new()),
    mongoose_rdbms:prepare(cets_disco_update_existing, Table,
                           [updated_timestamp, cluster_name, node_name],
                           update_existing()),
    mongoose_rdbms:prepare(cets_delete_node_from_db, Table,
                           [cluster_name, node_name],
                           delete_node_from_db()).
118
%% SQL text of the statement that reads all rows of one cluster.
select() ->
    Query = <<"SELECT node_name, node_num, updated_timestamp FROM discovery_nodes WHERE cluster_name = ?">>,
    Query.
121
%% Runs the cets_disco_select statement for ClusterName through
%% mongoose_rdbms:execute_successfully/3 and returns its result.
select(ClusterName) ->
    Params = [ClusterName],
    mongoose_rdbms:execute_successfully(global, cets_disco_select, Params).
124
%% SQL text of the statement that adds a row for a new node.
insert_new() ->
    Query = <<"INSERT INTO discovery_nodes (cluster_name, node_name, node_num, updated_timestamp)"
              " VALUES (?, ?, ?, ?)">>,
    Query.
128
%% Runs the cets_disco_insert_new statement and returns the result of
%% mongoose_rdbms:execute/3 unchanged.
%% Note that the parameter list puts Num before Timestamp, following the
%% column order of the INSERT, which differs from this function's argument
%% order.
insert_new(ClusterName, Node, Timestamp, Num) ->
    Params = [ClusterName, Node, Num, Timestamp],
    mongoose_rdbms:execute(global, cets_disco_insert_new, Params).
131
%% SQL text of the statement that refreshes the timestamp of a node row.
update_existing() ->
    Query = <<"UPDATE discovery_nodes SET updated_timestamp = ? WHERE cluster_name = ? AND node_name = ?">>,
    Query.
134
%% Runs the cets_disco_update_existing statement and returns the result of
%% mongoose_rdbms:execute/3 unchanged.
%% Timestamp goes first in the parameter list because it fills the SET
%% placeholder of the query.
update_existing(ClusterName, Node, Timestamp) ->
    Params = [Timestamp, ClusterName, Node],
    mongoose_rdbms:execute(global, cets_disco_update_existing, Params).
137
%% SQL text of the statement that removes the row of one node.
delete_node_from_db() ->
    Query = <<"DELETE FROM discovery_nodes WHERE cluster_name = ? AND node_name = ?">>,
    Query.
140
%% Runs the cets_delete_node_from_db statement for the given node through
%% mongoose_rdbms:execute_successfully/3 and returns its result.
delete_node_from_db(ClusterName, Node) ->
    Params = [ClusterName, Node],
    mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, Params).
143
%% Current time in seconds, as returned by mongoose_rdbms_timestamp:select/0.
timestamp() ->
    %% os:system_time(second) would give the local Erlang time instead;
    %% the database server time is used as a central source of truth.
    DbTimestamp = mongoose_rdbms_timestamp:select(),
    DbTimestamp.
149
%% Returns the smallest positive integer that is not among the currently
%% registered node numbers.
first_free_num(Nums) ->
    %% 0 is the default node_num, so candidates start from 1. One more
    %% candidate than there are taken numbers guarantees a free one remains.
    Candidates = lists:seq(1, length(Nums) + 1),
    hd(Candidates -- Nums).
155
156 -ifdef(TEST).
157
%% EUnit generator with one assertion per {Expected, Nums} pair.
%% NOTE(review): the generator name mentions jid_to_opt_binary, but every
%% case exercises first_free_num/1 - consider renaming it.
jid_to_opt_binary_test_() ->
    Cases = [{1, []},
             {3, [1, 2, 5]},
             {1, [2, 5]},
             {3, [1, 2]}],
    [?_assertEqual(Expected, first_free_num(Nums)) || {Expected, Nums} <- Cases].
163
164 -endif.
Line Hits Source