./ct_report/coverage/mongoose_cluster_id.COVER.html

1 -module(mongoose_cluster_id).
2
3 -include("mongoose.hrl").
4
5 -export([
6 start/0,
7 get_cached_cluster_id/0,
8 get_backend_cluster_id/0
9 ]).
10
11 % For testing purposes only
12 -export([clean_table/0, clean_cache/0]).
13
14 -ignore_xref([clean_table/0, clean_cache/0, get_backend_cluster_id/0]).
15
16 -record(mongoose_cluster_id, {key :: atom(), value :: cluster_id()}).
17 -type cluster_id() :: binary().
18 -type maybe_cluster_id() :: {ok, cluster_id()} | {error, any()}.
19 -type mongoose_backend() :: rdbms
20 | mnesia
21 | cets.
22
23 -spec start() -> maybe_cluster_id().
24 start() ->
25 %% Consider rewriting this logic, so it does not block the starting process.
26 %% Currently, we have to do an SQL query each time we restart MongooseIM
27 %% application in the tests.
28 106 init_cache(),
29 106 Backend = which_backend_available(),
30 106 IntBackend = which_volatile_backend_available(),
31 106 maybe_prepare_queries(Backend),
32 106 cets_long:run_tracked(#{task => wait_for_any_backend,
33 backend => Backend, volatile_backend => IntBackend},
34 106 fun() -> wait_for_any_backend(Backend, IntBackend) end),
35 106 CachedRes = get_cached_cluster_id(IntBackend),
36 106 BackendRes = get_backend_cluster_id(),
37 106 case {CachedRes, BackendRes} of
38 {{ok, ID}, {ok, ID}} ->
39 56 {ok, ID};
40 {{ok, ID}, {error, _}} ->
41 1 set_new_cluster_id(ID, Backend);
42 {{error, _}, {ok, ID}} ->
43 47 set_new_cluster_id(ID, IntBackend);
44 {{error, _}, {error, _}} ->
45 2 make_and_set_new_cluster_id();
46 {{ok, CachedID}, {ok, BackendID}} ->
47
:-(
?LOG_ERROR(#{what => cluster_id_setup_conflict,
48 text => <<"Mnesia and Backend have different cluster IDs">>,
49
:-(
cached_id => CachedID, backend_id => BackendID}),
50
:-(
{error, conflict}
51 end.
52
53
54 %% If RDBMS is available before CETS - it is enough for us to continue
55 %% the starting procedure
56 wait_for_any_backend(Backend, IntBackend) ->
57 106 Alias = erlang:alias([reply]),
58 106 Pids = lists:append([wait_for_backend_promise(B, Alias) || B <- lists:sort([Backend, IntBackend])]),
59 106 wait_for_first_reply(Alias),
60 %% Interrupt other waiting calls to reduce the logging noise
61 106 [erlang:exit(Pid, shutdown) || Pid <- Pids],
62 106 ok.
63
64 wait_for_first_reply(Alias) ->
65 106 receive
66 {ready, Alias} ->
67 106 ok
68 end.
69
70 wait_for_backend_promise(mnesia, Alias) ->
71 106 Alias ! {ready, Alias},
72 106 [];
73 wait_for_backend_promise(cets, Alias) ->
74
:-(
[spawn(fun() ->
75 %% We have to do it, because we want to read from across the cluster
76 %% in the start/0 function.
77
:-(
ok = cets_discovery:wait_for_ready(mongoose_cets_discovery, infinity),
78
:-(
Alias ! {ready, Alias}
79 end)];
80 wait_for_backend_promise(rdbms, Alias) ->
81 106 [spawn(fun() ->
82
:-(
cets_long:run_tracked(#{task => wait_for_rdbms}, fun() -> wait_for_rdbms() end),
83
:-(
Alias ! {ready, Alias}
84 end)].
85
86 wait_for_rdbms() ->
87
:-(
case get_backend_cluster_id(rdbms) of
88 {ok, _} ->
89
:-(
ok;
90 _ ->
91
:-(
timer:sleep(100),
92
:-(
wait_for_rdbms()
93 end.
94
95 %% Get cached version
96 -spec get_cached_cluster_id() -> maybe_cluster_id().
97 get_cached_cluster_id() ->
98 53 IntBackend = which_volatile_backend_available(),
99 53 get_cached_cluster_id(IntBackend).
100
101 get_cached_cluster_id(mnesia) ->
102 159 T = fun() -> mnesia:read(mongoose_cluster_id, cluster_id) end,
103 159 case mnesia:transaction(T) of
104 {atomic, [#mongoose_cluster_id{value = ClusterID}]} ->
105 110 {ok, ClusterID};
106 {atomic, []} ->
107 49 {error, cluster_id_not_in_cache};
108 {aborted, Reason} ->
109
:-(
{error, Reason}
110 end;
111 get_cached_cluster_id(cets) ->
112
:-(
case ets:lookup(cets_cluster_id, cluster_id) of
113 [{cluster_id, ClusterID}] ->
114
:-(
{ok, ClusterID};
115 [] ->
116
:-(
{error, cluster_id_not_in_cache}
117 end.
118
119 %% ====================================================================
120 %% Internal getters and setters
121 %% ====================================================================
122 -spec get_backend_cluster_id() -> maybe_cluster_id().
123 get_backend_cluster_id() ->
124 109 get_backend_cluster_id(which_backend_available()).
125
126 -spec set_new_cluster_id(cluster_id()) -> maybe_cluster_id().
127 set_new_cluster_id(ID) ->
128 2 set_new_cluster_id(ID, which_backend_available()).
129
130 -spec make_and_set_new_cluster_id() -> maybe_cluster_id().
131 make_and_set_new_cluster_id() ->
132 2 NewID = make_cluster_id(),
133 2 set_new_cluster_id(NewID).
134
135 %% ====================================================================
136 %% Internal functions
137 %% ====================================================================
138 init_cache() ->
139 106 init_cache(which_volatile_backend_available()).
140
141 init_cache(mnesia) ->
142 106 mongoose_mnesia:create_table(mongoose_cluster_id,
143 [{type, set},
144 {record_name, mongoose_cluster_id},
145 {attributes, record_info(fields, mongoose_cluster_id)},
146 {ram_copies, [node()]}
147 ]);
148 init_cache(cets) ->
149
:-(
cets:start(cets_cluster_id, #{}),
150
:-(
cets_discovery:add_table(mongoose_cets_discovery, cets_cluster_id).
151
152 -spec maybe_prepare_queries(mongoose_backend()) -> ok.
153
:-(
maybe_prepare_queries(mnesia) -> ok;
154 maybe_prepare_queries(rdbms) ->
155 106 mongoose_rdbms:prepare(cluster_insert_new, mongoose_cluster_id, [v],
156 <<"INSERT INTO mongoose_cluster_id(k,v) VALUES ('cluster_id', ?)">>),
157 106 mongoose_rdbms:prepare(cluster_select, mongoose_cluster_id, [],
158 <<"SELECT v FROM mongoose_cluster_id WHERE k='cluster_id'">>),
159 106 ok.
160
161 -spec execute_cluster_insert_new(binary()) -> mongoose_rdbms:query_result().
162 execute_cluster_insert_new(ID) ->
163 3 mongoose_rdbms:execute_successfully(global, cluster_insert_new, [ID]).
164
165 -spec make_cluster_id() -> cluster_id().
166 make_cluster_id() ->
167 2 uuid:uuid_to_string(uuid:get_v4(), binary_standard).
168
169 %% Which backend is enabled
170 -spec which_backend_available() -> mongoose_backend().
171 which_backend_available() ->
172 219 case mongoose_wpool:get_pool_settings(rdbms, global, default) of
173
:-(
undefined -> which_volatile_backend_available();
174 219 _ -> rdbms
175 end.
176
177 which_volatile_backend_available() ->
178 270 case mongoose_config:get_opt(internal_databases) of
179 #{cets := _} ->
180
:-(
cets;
181 #{mnesia := _} ->
182 270 mnesia
183 end.
184
185 -spec set_new_cluster_id(cluster_id(), mongoose_backend()) -> ok | {error, any()}.
186 set_new_cluster_id(ID, rdbms) ->
187 3 try execute_cluster_insert_new(ID) of
188 {updated, 1} ->
189 3 set_new_cluster_id(ID, which_volatile_backend_available()),
190 3 {ok, ID}
191 catch
192 Class:Reason:Stacktrace ->
193
:-(
?LOG_WARNING(#{what => cluster_id_set_failed,
194 text => <<"Error inserting cluster ID into RDBMS">>,
195 cluster_id => ID,
196
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
197
:-(
{error, {Class, Reason}}
198 end;
199 set_new_cluster_id(ID, mnesia) ->
200 50 T = fun() -> mnesia:write(#mongoose_cluster_id{key = cluster_id, value = ID}) end,
201 50 case mnesia:transaction(T) of
202 {atomic, ok} ->
203 50 {ok, ID};
204 {aborted, Reason} ->
205
:-(
{error, Reason}
206 end;
207 set_new_cluster_id(ID, cets) ->
208
:-(
cets:insert_serial(cets_cluster_id, {cluster_id, ID}),
209
:-(
{ok, ID}.
210
211 %% Get cluster ID
212 -spec get_backend_cluster_id(mongoose_backend()) -> maybe_cluster_id().
213 get_backend_cluster_id(rdbms) ->
214 109 try mongoose_rdbms:execute_successfully(global, cluster_select, []) of
215 105 {selected, [{ID}]} -> {ok, ID};
216 4 {selected, []} -> {error, no_value_in_backend}
217 catch
218 Class:Reason:Stacktrace ->
219
:-(
?LOG_WARNING(#{what => cluster_id_get_failed,
220 text => <<"Error getting cluster ID from RDBMS">>,
221
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
222
:-(
{error, {Class, Reason}}
223 end;
224 get_backend_cluster_id(mnesia) ->
225
:-(
get_cached_cluster_id(mnesia);
226 get_backend_cluster_id(cets) ->
227
:-(
get_cached_cluster_id(cets).
228
229 clean_table() ->
230 2 clean_table(which_backend_available()).
231
232 -spec clean_table(mongoose_backend()) -> ok | {error, any()}.
233 clean_table(rdbms) ->
234 2 SQLQuery = [<<"TRUNCATE TABLE mongoose_cluster_id;">>],
235 2 try mongoose_rdbms:sql_query(global, SQLQuery) of
236 2 {selected, _} -> ok;
237
:-(
{updated, _} -> ok;
238
:-(
{error, _} = Err -> Err
239 catch
240 Class:Reason:Stacktrace ->
241
:-(
?LOG_WARNING(#{what => cluster_id_clean_failed,
242 text => <<"Error truncating mongoose_cluster_id table">>,
243 sql_query => SQLQuery,
244
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
245
:-(
{error, {Class, Reason}}
246 end;
247
:-(
clean_table(_) -> ok.
248
249 clean_cache() ->
250 2 clean_cache(which_volatile_backend_available()).
251
252 clean_cache(mnesia) ->
253 2 mnesia:dirty_delete(mongoose_cluster_id, cluster_id);
254 clean_cache(cets) ->
255
:-(
cets:delete(cets_cluster_id, cluster_id).
Line Hits Source