./ct_report/coverage/mongoose_cluster_id.COVER.html

1 -module(mongoose_cluster_id).
2
3 -include("mongoose.hrl").
4
5 -export([
6 start/0,
7 get_cached_cluster_id/0,
8 get_backend_cluster_id/0
9 ]).
10
11 % For testing purposes only
12 -export([clean_table/0, clean_cache/0]).
13
14 -ignore_xref([clean_table/0, clean_cache/0, get_backend_cluster_id/0]).
15
16 -record(mongoose_cluster_id, {key :: atom(), value :: cluster_id()}).
17 -type cluster_id() :: binary().
18 -type maybe_cluster_id() :: {ok, cluster_id()} | {error, any()}.
19 -type mongoose_backend() :: rdbms
20 | mnesia
21 | cets.
22
23 -spec start() -> maybe_cluster_id().
24 start() ->
25 %% Consider rewriting this logic, so it does not block the starting process.
26 %% Currently, we have to do an SQL query each time we restart MongooseIM
27 %% application in the tests.
28 101 init_cache(),
29 101 Backend = which_backend_available(),
30 101 IntBackend = which_volatile_backend_available(),
31 101 maybe_prepare_queries(Backend),
32 101 cets_long:run_tracked(#{task => wait_for_any_backend,
33 backend => Backend, volatile_backend => IntBackend},
34 101 fun() -> wait_for_any_backend(Backend, IntBackend) end),
35 101 CachedRes = get_cached_cluster_id(IntBackend),
36 101 BackendRes = get_backend_cluster_id(),
37 101 case {CachedRes, BackendRes} of
38 {{ok, ID}, {ok, ID}} ->
39 55 {ok, ID};
40 {{ok, ID}, {error, _}} ->
41
:-(
set_new_cluster_id(ID, Backend);
42 {{error, _}, {ok, ID}} ->
43
:-(
set_new_cluster_id(ID, IntBackend);
44 {{error, _}, {error, _}} ->
45 46 make_and_set_new_cluster_id();
46 {{ok, CachedID}, {ok, BackendID}} ->
47
:-(
?LOG_ERROR(#{what => cluster_id_setup_conflict,
48 text => <<"Mnesia and Backend have different cluster IDs">>,
49
:-(
cached_id => CachedID, backend_id => BackendID}),
50
:-(
{error, conflict}
51 end.
52
53
54 %% If RDBMS is available before CETS - it is enough for us to continue
55 %% the starting procedure
56 wait_for_any_backend(Backend, IntBackend) ->
57 101 Alias = erlang:alias([reply]),
58 101 Pids = lists:append([wait_for_backend_promise(B, Alias) || B <- lists:sort([Backend, IntBackend])]),
59 101 wait_for_first_reply(Alias),
60 %% Interrupt other waiting calls to reduce the logging noise
61 101 [erlang:exit(Pid, shutdown) || Pid <- Pids],
62 101 ok.
63
64 wait_for_first_reply(Alias) ->
65 101 receive
66 {ready, Alias} ->
67 101 ok
68 end.
69
70 wait_for_backend_promise(mnesia, Alias) ->
71 202 Alias ! {ready, Alias},
72 202 [];
73 wait_for_backend_promise(cets, Alias) ->
74
:-(
[spawn(fun() ->
75 %% We have to do it, because we want to read from across the cluster
76 %% in the start/0 function.
77
:-(
ok = cets_discovery:wait_for_ready(mongoose_cets_discovery, infinity),
78
:-(
Alias ! {ready, Alias}
79 end)];
80 wait_for_backend_promise(rdbms, Alias) ->
81
:-(
[spawn(fun() ->
82
:-(
cets_long:run_tracked(#{task => wait_for_rdbms}, fun() -> wait_for_rdbms() end),
83
:-(
Alias ! {ready, Alias}
84 end)].
85
86 wait_for_rdbms() ->
87
:-(
case get_backend_cluster_id(rdbms) of
88 {ok, _} ->
89
:-(
ok;
90 _ ->
91
:-(
timer:sleep(100),
92
:-(
wait_for_rdbms()
93 end.
94
95 %% Get cached version
96 -spec get_cached_cluster_id() -> maybe_cluster_id().
97 get_cached_cluster_id() ->
98 48 IntBackend = which_volatile_backend_available(),
99 48 get_cached_cluster_id(IntBackend).
100
101 get_cached_cluster_id(mnesia) ->
102 250 T = fun() -> mnesia:read(mongoose_cluster_id, cluster_id) end,
103 250 case mnesia:transaction(T) of
104 {atomic, [#mongoose_cluster_id{value = ClusterID}]} ->
105 158 {ok, ClusterID};
106 {atomic, []} ->
107 92 {error, cluster_id_not_in_cache};
108 {aborted, Reason} ->
109
:-(
{error, Reason}
110 end;
111 get_cached_cluster_id(cets) ->
112
:-(
case ets:lookup(cets_cluster_id, cluster_id) of
113 [{cluster_id, ClusterID}] ->
114
:-(
{ok, ClusterID};
115 [] ->
116
:-(
{error, cluster_id_not_in_cache}
117 end.
118
119 %% ====================================================================
120 %% Internal getters and setters
121 %% ====================================================================
122 -spec get_backend_cluster_id() -> maybe_cluster_id().
123 get_backend_cluster_id() ->
124 101 get_backend_cluster_id(which_backend_available()).
125
126 -spec set_new_cluster_id(cluster_id()) -> maybe_cluster_id().
127 set_new_cluster_id(ID) ->
128 46 set_new_cluster_id(ID, which_backend_available()).
129
130 -spec make_and_set_new_cluster_id() -> maybe_cluster_id().
131 make_and_set_new_cluster_id() ->
132 46 NewID = make_cluster_id(),
133 46 set_new_cluster_id(NewID).
134
135 %% ====================================================================
136 %% Internal functions
137 %% ====================================================================
138 init_cache() ->
139 101 init_cache(which_volatile_backend_available()).
140
141 init_cache(mnesia) ->
142 101 mongoose_mnesia:create_table(mongoose_cluster_id,
143 [{type, set},
144 {record_name, mongoose_cluster_id},
145 {attributes, record_info(fields, mongoose_cluster_id)},
146 {ram_copies, [node()]}
147 ]);
148 init_cache(cets) ->
149
:-(
cets:start(cets_cluster_id, #{}),
150
:-(
cets_discovery:add_table(mongoose_cets_discovery, cets_cluster_id).
151
152 -spec maybe_prepare_queries(mongoose_backend()) -> ok.
153 101 maybe_prepare_queries(mnesia) -> ok;
154 maybe_prepare_queries(rdbms) ->
155
:-(
mongoose_rdbms:prepare(cluster_insert_new, mongoose_cluster_id, [v],
156 <<"INSERT INTO mongoose_cluster_id(k,v) VALUES ('cluster_id', ?)">>),
157
:-(
mongoose_rdbms:prepare(cluster_select, mongoose_cluster_id, [],
158 <<"SELECT v FROM mongoose_cluster_id WHERE k='cluster_id'">>),
159
:-(
ok.
160
161 -spec execute_cluster_insert_new(binary()) -> mongoose_rdbms:query_result().
162 execute_cluster_insert_new(ID) ->
163
:-(
mongoose_rdbms:execute_successfully(global, cluster_insert_new, [ID]).
164
165 -spec make_cluster_id() -> cluster_id().
166 make_cluster_id() ->
167 46 uuid:uuid_to_string(uuid:get_v4(), binary_standard).
168
169 %% Which backend is enabled
170 -spec which_backend_available() -> mongoose_backend().
171 which_backend_available() ->
172 248 case mongoose_wpool:get_pool_settings(rdbms, global, default) of
173 248 undefined -> which_volatile_backend_available();
174
:-(
_ -> rdbms
175 end.
176
177 which_volatile_backend_available() ->
178 498 case mongoose_config:get_opt(internal_databases) of
179 #{cets := _} ->
180
:-(
cets;
181 #{mnesia := _} ->
182 498 mnesia
183 end.
184
185 -spec set_new_cluster_id(cluster_id(), mongoose_backend()) -> ok | {error, any()}.
186 set_new_cluster_id(ID, rdbms) ->
187
:-(
try execute_cluster_insert_new(ID) of
188 {updated, 1} ->
189
:-(
set_new_cluster_id(ID, which_volatile_backend_available()),
190
:-(
{ok, ID}
191 catch
192 Class:Reason:Stacktrace ->
193
:-(
?LOG_WARNING(#{what => cluster_id_set_failed,
194 text => <<"Error inserting cluster ID into RDBMS">>,
195 cluster_id => ID,
196
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
197
:-(
{error, {Class, Reason}}
198 end;
199 set_new_cluster_id(ID, mnesia) ->
200 46 T = fun() -> mnesia:write(#mongoose_cluster_id{key = cluster_id, value = ID}) end,
201 46 case mnesia:transaction(T) of
202 {atomic, ok} ->
203 46 {ok, ID};
204 {aborted, Reason} ->
205
:-(
{error, Reason}
206 end;
207 set_new_cluster_id(ID, cets) ->
208
:-(
cets:insert_serial(cets_cluster_id, {cluster_id, ID}),
209
:-(
{ok, ID}.
210
211 %% Get cluster ID
212 -spec get_backend_cluster_id(mongoose_backend()) -> maybe_cluster_id().
213 get_backend_cluster_id(rdbms) ->
214
:-(
try mongoose_rdbms:execute_successfully(global, cluster_select, []) of
215
:-(
{selected, [{ID}]} -> {ok, ID};
216
:-(
{selected, []} -> {error, no_value_in_backend}
217 catch
218 Class:Reason:Stacktrace ->
219
:-(
?LOG_WARNING(#{what => cluster_id_get_failed,
220 text => <<"Error getting cluster ID from RDBMS">>,
221
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
222
:-(
{error, {Class, Reason}}
223 end;
224 get_backend_cluster_id(mnesia) ->
225 101 get_cached_cluster_id(mnesia);
226 get_backend_cluster_id(cets) ->
227
:-(
get_cached_cluster_id(cets).
228
229 clean_table() ->
230
:-(
clean_table(which_backend_available()).
231
232 -spec clean_table(mongoose_backend()) -> ok | {error, any()}.
233 clean_table(rdbms) ->
234
:-(
SQLQuery = [<<"TRUNCATE TABLE mongoose_cluster_id;">>],
235
:-(
try mongoose_rdbms:sql_query(global, SQLQuery) of
236
:-(
{selected, _} -> ok;
237
:-(
{updated, _} -> ok;
238
:-(
{error, _} = Err -> Err
239 catch
240 Class:Reason:Stacktrace ->
241
:-(
?LOG_WARNING(#{what => cluster_id_clean_failed,
242 text => <<"Error truncating mongoose_cluster_id table">>,
243 sql_query => SQLQuery,
244
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
245
:-(
{error, {Class, Reason}}
246 end;
247
:-(
clean_table(_) -> ok.
248
249 clean_cache() ->
250
:-(
clean_cache(which_volatile_backend_available()).
251
252 clean_cache(mnesia) ->
253
:-(
mnesia:dirty_delete(mongoose_cluster_id, cluster_id);
254 clean_cache(cets) ->
255
:-(
cets:delete(cets_cluster_id, cluster_id).
Line Hits Source