./ct_report/coverage/mongoose_cets_discovery_rdbms.COVER.html

1 %% @doc MongooseIM RDBMS backend for cets_discovery.
2 -module(mongoose_cets_discovery_rdbms).
3 -behaviour(cets_discovery).
4 -export([init/1, get_nodes/1]).
5
6 -include("mongoose_logger.hrl").
7
8 -ifdef(TEST).
9 -include_lib("eunit/include/eunit.hrl").
10 -endif.
11
%% Options accepted by init/1.
%% cluster_name and node_name_to_insert are required (init/1 matches on them);
%% last_query_info and expire_time are optional and get defaults.
-type opts() :: #{cluster_name := binary(), node_name_to_insert := binary(),
                  last_query_info => map(), expire_time => non_neg_integer()}.
%% Discovery state: the options with all defaults filled in.
-type state() :: #{cluster_name := binary(), node_name_to_insert := binary(),
                   last_query_info := map(), expire_time := non_neg_integer()}.

%% @doc Builds the initial state by overlaying the given options on defaults().
%% Crashes with function_clause when a required key is missing.
-spec init(opts()) -> state().
init(Opts = #{cluster_name := _, node_name_to_insert := _}) ->
    maps:merge(defaults(), Opts).

%% Default values for the optional keys of opts().
defaults() ->
    #{expire_time => 60 * 60 * 1, %% 1 hour in seconds
      last_query_info => #{}}.
23
%% @doc Registers this node in the database (when the RDBMS pool is running)
%% and returns the nodes found for the cluster.
%% On success the node number is stored via mongoose_node_num:set_node_num/1
%% and the debugging info is kept in the state under last_query_info.
%% Any exception raised while checking the pool or registering is logged and
%% reported as {error, Reason}; the state is left unchanged in that case.
-spec get_nodes(state()) -> {cets_discovery:get_nodes_result(), state()}.
get_nodes(State = #{cluster_name := ClusterName, node_name_to_insert := Node}) ->
    try register_if_running(is_rdbms_running(), ClusterName, Node, State) of
        skip ->
            {{error, rdbms_not_running}, State};
        {Num, Nodes, Info} ->
            mongoose_node_num:set_node_num(Num),
            {{ok, Nodes}, State#{last_query_info => Info}}
    catch Class:Reason:Stacktrace ->
        ?LOG_ERROR(#{what => discovery_failed_select, class => Class,
                     reason => Reason, stacktrace => Stacktrace}),
        {{error, Reason}, State}
    end.

%% Runs the registration only when the RDBMS pool is available.
register_if_running(true, ClusterName, Node, State) ->
    try_register(ClusterName, Node, State);
register_if_running(false, _ClusterName, _Node, _State) ->
    skip.
44
%% Returns true when a worker can be fetched from the global rdbms pool.
%% Any other return value or any exception is treated as "not running",
%% which is why the catch-all here is intentional.
is_rdbms_running() ->
    try
        {ok, _} = mongoose_wpool:get_worker(rdbms, global),
        true
    catch _:_ ->
        false
    end.
52
%% Registers this node in the discovery_nodes table, or refreshes its
%% timestamp when it is already there, then cleans up expired rows.
%% Returns {NodeNum, Nodes, Info} where:
%%   NodeNum - the number stored for this node (existing or newly picked),
%%   Nodes   - node names (atoms) selected for the cluster, minus expired ones,
%%   Info    - a map with details of this run, usable for debugging.
%% Note: a newly inserted node is not part of Nodes, because the rows are
%% selected before the insert.
try_register(ClusterName, NodeBin, State) when is_binary(NodeBin), is_binary(ClusterName) ->
    prepare(),
    Timestamp = timestamp(),
    Node = binary_to_atom(NodeBin),
    {selected, Rows} = select(ClusterName),
    Zipped = [{binary_to_atom(DbNodeBin), Num, TS} || {DbNodeBin, Num, TS} <- Rows],
    {Nodes, Nums, _Timestamps} = lists:unzip3(Zipped),
    %% A single lookup tells us both whether we are registered and our number.
    {AlreadyRegistered, NodeNum} =
        case lists:keyfind(Node, 1, Zipped) of
            {Node, ExistingNum, _TS} ->
                update_existing(ClusterName, NodeBin, Timestamp),
                {true, ExistingNum};
            false ->
                FreeNum = next_free_num(lists:usort(Nums)),
                %% Could fail with duplicate node_num reason.
                %% In this case just wait for the next get_nodes call.
                insert_new(ClusterName, NodeBin, Timestamp, FreeNum),
                {false, FreeNum}
        end,
    RunCleaningResult = run_cleaning(ClusterName, Timestamp, Rows, State),
    %% This could be used for debugging
    Info = #{already_registered => AlreadyRegistered, timestamp => Timestamp,
             node_num => NodeNum, last_rows => Rows,
             run_cleaning_result => RunCleaningResult},
    Nodes2 = skip_expired_nodes(Nodes, RunCleaningResult),
    {NodeNum, Nodes2, Info}.
80
%% Drops the nodes that run_cleaning/4 has just removed from the database.
%% When cleaning was skipped the list is returned untouched.
skip_expired_nodes(Nodes, CleaningResult) ->
    case CleaningResult of
        {removed, ExpiredNodes} ->
            lists:subtract(Nodes, ExpiredNodes);
        {skip, _} ->
            Nodes
    end.
85
%% Deletes rows of nodes whose timestamp is older than expire_time.
%% Rows are the rows selected BEFORE this node refreshed its own timestamp,
%% so the row of the current node (node_name_to_insert in State) can look
%% expired here even though it has just been updated. It is therefore never
%% treated as expired; otherwise a node returning after a long downtime would
%% delete its own registration.
%% Returns {skip, nothing_expired} or {removed, ExpiredNodes} (atoms).
run_cleaning(ClusterName, Timestamp, Rows, State) ->
    CurrentNodeBin = maps:get(node_name_to_insert, State, undefined),
    ExpiredBins = [DbNodeBin || {DbNodeBin, _Num, DbTS} <- Rows,
                   DbNodeBin =/= CurrentNodeBin,
                   is_expired(DbTS, Timestamp, State)],
    case ExpiredBins of
        [] ->
            {skip, nothing_expired};
        [_ | _] ->
            ExpiredNodes = [binary_to_atom(DbNodeBin) || DbNodeBin <- ExpiredBins],
            lists:foreach(fun(DbNodeBin) ->
                                  delete_node_from_db(ClusterName, DbNodeBin)
                          end, ExpiredBins),
            ?LOG_WARNING(#{what => cets_expired_nodes,
                           text => <<"Expired nodes are detected in discovery_nodes table">>,
                           expired_nodes => ExpiredNodes}),
            {removed, ExpiredNodes}
    end.
100
%% True when the row timestamp is more than expire_time older than Timestamp.
%% Both timestamps and expire_time are compared as seconds.
is_expired(DbTS, Timestamp, #{expire_time := ExpireTime}) when is_integer(DbTS) ->
    AgeSeconds = Timestamp - DbTS,
    AgeSeconds > ExpireTime.
103
%% Executes the prepared cets_delete_node_from_db statement for the given
%% cluster and node name.
delete_node_from_db(ClusterName, Node) ->
    Params = [ClusterName, Node],
    mongoose_rdbms:execute_successfully(global, cets_delete_node_from_db, Params).
106
%% Prepares the timestamp query and every statement this module runs against
%% the discovery_nodes table. Statements are prepared in the listed order and
%% the result of the last mongoose_rdbms:prepare/4 call is returned.
prepare() ->
    mongoose_rdbms_timestamp:prepare(),
    Statements =
        [{cets_disco_select, [cluster_name], select()},
         {cets_disco_insert_new,
          [cluster_name, node_name, node_num, updated_timestamp], insert_new()},
         {cets_disco_update_existing,
          [updated_timestamp, cluster_name, node_name], update_existing()},
         {cets_delete_node_from_db, [cluster_name, node_name], delete_node_from_db()}],
    Results = [mongoose_rdbms:prepare(Name, discovery_nodes, Fields, Query)
               || {Name, Fields, Query} <- Statements],
    lists:last(Results).
117
%% SQL text of the cets_disco_select statement.
select() ->
    <<"SELECT node_name, node_num, updated_timestamp"
      " FROM discovery_nodes"
      " WHERE cluster_name = ?">>.
120
%% Runs the prepared cets_disco_select statement for the given cluster.
select(ClusterName) ->
    Params = [ClusterName],
    mongoose_rdbms:execute_successfully(global, cets_disco_select, Params).
123
%% SQL text of the cets_disco_insert_new statement.
insert_new() ->
    <<"INSERT INTO discovery_nodes"
      " (cluster_name, node_name, node_num, updated_timestamp)"
      " VALUES (?, ?, ?, ?)">>.
127
%% Runs the prepared cets_disco_insert_new statement.
%% Parameters are passed in the column order of the statement
%% (cluster_name, node_name, node_num, updated_timestamp), which differs
%% from the argument order of this function.
insert_new(ClusterName, Node, Timestamp, Num) ->
    Params = [ClusterName, Node, Num, Timestamp],
    mongoose_rdbms:execute(global, cets_disco_insert_new, Params).
130
%% SQL text of the cets_disco_update_existing statement.
update_existing() ->
    <<"UPDATE discovery_nodes"
      " SET updated_timestamp = ?"
      " WHERE cluster_name = ? AND node_name = ?">>.
133
%% SQL text of the cets_delete_node_from_db statement.
delete_node_from_db() ->
    <<"DELETE FROM discovery_nodes"
      " WHERE cluster_name = ? AND node_name = ?">>.
136
%% Runs the prepared cets_disco_update_existing statement.
%% The timestamp goes first because it is the first placeholder in the SQL.
update_existing(ClusterName, Node, Timestamp) ->
    Params = [Timestamp, ClusterName, Node],
    mongoose_rdbms:execute(global, cets_disco_update_existing, Params).
139
%% Current time in seconds.
%% The database server time is used as the central source of truth,
%% instead of the local Erlang clock (os:system_time(second)).
timestamp() ->
    Seconds = mongoose_rdbms_timestamp:select(),
    Seconds.
145
%% Picks a node number that is not in the given sorted, de-duplicated list of
%% registered numbers: 0 for an empty list, otherwise one more than the last
%% element of the first run of consecutive numbers.
next_free_num([]) ->
    0;
next_free_num([Last]) ->
    Last + 1;
next_free_num([Current, Next | Rest]) when Current + 1 =:= Next ->
    %% Current is directly followed by Next, so keep walking the run
    next_free_num([Next | Rest]);
next_free_num([Current | _]) ->
    %% Gap after Current
    Current + 1.
154
155 -ifdef(TEST).
156
%% EUnit generator for next_free_num/1 (despite the function name).
%% Each case is {ExpectedNum, RegisteredNums}.
jid_to_opt_binary_test_() ->
    Cases = [{0, []},
             {3, [1, 2, 5]},
             {3, [1, 2]}],
    [?_assertEqual(Expected, next_free_num(Registered)) || {Expected, Registered} <- Cases].
161
162 -endif.
Line Hits Source