./ct_report/coverage/mongoose_cluster_id.COVER.html

1 -module(mongoose_cluster_id).
2
3 -include("mongoose.hrl").
4
5 -export([
6 start/0,
7 get_cached_cluster_id/0,
8 get_backend_cluster_id/0
9 ]).
10
11 % For testing purposes only
12 -export([clean_table/0, clean_cache/0]).
13
14 -ignore_xref([clean_table/0, clean_cache/0, get_backend_cluster_id/0]).
15
16 -record(mongoose_cluster_id, {key :: atom(), value :: cluster_id()}).
17 -type cluster_id() :: binary().
18 -type maybe_cluster_id() :: {ok, cluster_id()} | {error, any()}.
19 -type mongoose_backend() :: rdbms
20 | mnesia
21 | cets.
22
23 -spec start() -> maybe_cluster_id().
24 start() ->
25 %% Consider rewriting this logic, so it does not block the starting process.
26 %% Currently, we have to do an SQL query each time we restart MongooseIM
27 %% application in the tests.
28 96 init_cache(),
29 96 Backend = which_backend_available(),
30 96 IntBackend = which_volatile_backend_available(),
31 96 maybe_prepare_queries(Backend),
32 96 CachedRes = get_cached_cluster_id(IntBackend),
33 96 BackendRes = get_backend_cluster_id(),
34 96 case {CachedRes, BackendRes} of
35 {{ok, ID}, {ok, ID}} ->
36 33 {ok, ID};
37 {{ok, ID}, {error, _}} ->
38 1 set_new_cluster_id(ID, Backend);
39 {{error, _}, {ok, ID}} ->
40 60 set_new_cluster_id(ID, IntBackend);
41 {{error, _}, {error, _}} ->
42 2 make_and_set_new_cluster_id();
43 {{ok, CachedID}, {ok, BackendID}} ->
44
:-(
?LOG_ERROR(#{what => cluster_id_setup_conflict,
45 text => <<"Mnesia and Backend have different cluster IDs">>,
46
:-(
cached_id => CachedID, backend_id => BackendID}),
47
:-(
{error, conflict}
48 end.
49
50 %% Get cached version
51 -spec get_cached_cluster_id() -> maybe_cluster_id().
52 get_cached_cluster_id() ->
53 53 IntBackend = which_volatile_backend_available(),
54 53 get_cached_cluster_id(IntBackend).
55
56 get_cached_cluster_id(mnesia) ->
57 149 T = fun() -> mnesia:read(mongoose_cluster_id, cluster_id) end,
58 149 case mnesia:transaction(T) of
59 {atomic, [#mongoose_cluster_id{value = ClusterID}]} ->
60 87 {ok, ClusterID};
61 {atomic, []} ->
62 62 {error, cluster_id_not_in_cache};
63 {aborted, Reason} ->
64
:-(
{error, Reason}
65 end;
66 get_cached_cluster_id(cets) ->
67
:-(
case ets:lookup(cets_cluster_id, cluster_id) of
68 [{cluster_id, ClusterID}] ->
69
:-(
{ok, ClusterID};
70 [] ->
71
:-(
{error, cluster_id_not_in_cache}
72 end.
73
74 %% ====================================================================
75 %% Internal getters and setters
76 %% ====================================================================
77 -spec get_backend_cluster_id() -> maybe_cluster_id().
78 get_backend_cluster_id() ->
79 99 get_backend_cluster_id(which_backend_available()).
80
81 -spec set_new_cluster_id(cluster_id()) -> maybe_cluster_id().
82 set_new_cluster_id(ID) ->
83 2 set_new_cluster_id(ID, which_backend_available()).
84
85 -spec make_and_set_new_cluster_id() -> maybe_cluster_id().
86 make_and_set_new_cluster_id() ->
87 2 NewID = make_cluster_id(),
88 2 set_new_cluster_id(NewID).
89
90 %% ====================================================================
91 %% Internal functions
92 %% ====================================================================
93 init_cache() ->
94 96 init_cache(which_volatile_backend_available()).
95
96 init_cache(mnesia) ->
97 96 mongoose_mnesia:create_table(mongoose_cluster_id,
98 [{type, set},
99 {record_name, mongoose_cluster_id},
100 {attributes, record_info(fields, mongoose_cluster_id)},
101 {ram_copies, [node()]}
102 ]);
103 init_cache(cets) ->
104
:-(
cets:start(cets_cluster_id, #{}),
105
:-(
cets_discovery:add_table(mongoose_cets_discovery, cets_cluster_id),
106 %% We have to do it, because we want to read from across the cluster
107 %% in the start/0 function.
108
:-(
ok = cets_discovery:wait_for_ready(mongoose_cets_discovery, infinity).
109
110 -spec maybe_prepare_queries(mongoose_backend()) -> ok.
111
:-(
maybe_prepare_queries(mnesia) -> ok;
112 maybe_prepare_queries(rdbms) ->
113 96 mongoose_rdbms:prepare(cluster_insert_new, mongoose_cluster_id, [v],
114 <<"INSERT INTO mongoose_cluster_id(k,v) VALUES ('cluster_id', ?)">>),
115 96 mongoose_rdbms:prepare(cluster_select, mongoose_cluster_id, [],
116 <<"SELECT v FROM mongoose_cluster_id WHERE k='cluster_id'">>),
117 96 ok.
118
119 -spec execute_cluster_insert_new(binary()) -> mongoose_rdbms:query_result().
120 execute_cluster_insert_new(ID) ->
121 3 mongoose_rdbms:execute_successfully(global, cluster_insert_new, [ID]).
122
123 -spec make_cluster_id() -> cluster_id().
124 make_cluster_id() ->
125 2 uuid:uuid_to_string(uuid:get_v4(), binary_standard).
126
127 %% Which backend is enabled
128 -spec which_backend_available() -> mongoose_backend().
129 which_backend_available() ->
130 199 case mongoose_wpool:get_pool_settings(rdbms, global, default) of
131
:-(
undefined -> which_volatile_backend_available();
132 199 _ -> rdbms
133 end.
134
135 which_volatile_backend_available() ->
136 250 case mongoose_config:get_opt(internal_databases) of
137 #{cets := _} ->
138
:-(
cets;
139 #{mnesia := _} ->
140 250 mnesia
141 end.
142
143 -spec set_new_cluster_id(cluster_id(), mongoose_backend()) -> ok | {error, any()}.
144 set_new_cluster_id(ID, rdbms) ->
145 3 try execute_cluster_insert_new(ID) of
146 {updated, 1} ->
147 3 set_new_cluster_id(ID, which_volatile_backend_available()),
148 3 {ok, ID}
149 catch
150 Class:Reason:Stacktrace ->
151
:-(
?LOG_WARNING(#{what => cluster_id_set_failed,
152 text => <<"Error inserting cluster ID into RDBMS">>,
153 cluster_id => ID,
154
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
155
:-(
{error, {Class, Reason}}
156 end;
157 set_new_cluster_id(ID, mnesia) ->
158 63 T = fun() -> mnesia:write(#mongoose_cluster_id{key = cluster_id, value = ID}) end,
159 63 case mnesia:transaction(T) of
160 {atomic, ok} ->
161 63 {ok, ID};
162 {aborted, Reason} ->
163
:-(
{error, Reason}
164 end;
165 set_new_cluster_id(ID, cets) ->
166
:-(
cets:insert_serial(cets_cluster_id, {cluster_id, ID}),
167
:-(
{ok, ID}.
168
169 %% Get cluster ID
170 -spec get_backend_cluster_id(mongoose_backend()) -> maybe_cluster_id().
171 get_backend_cluster_id(rdbms) ->
172 99 try mongoose_rdbms:execute_successfully(global, cluster_select, []) of
173 95 {selected, [{ID}]} -> {ok, ID};
174 4 {selected, []} -> {error, no_value_in_backend}
175 catch
176 Class:Reason:Stacktrace ->
177
:-(
?LOG_WARNING(#{what => cluster_id_get_failed,
178 text => <<"Error getting cluster ID from RDBMS">>,
179
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
180
:-(
{error, {Class, Reason}}
181 end;
182 get_backend_cluster_id(mnesia) ->
183
:-(
get_cached_cluster_id(mnesia);
184 get_backend_cluster_id(cets) ->
185
:-(
get_cached_cluster_id(cets).
186
187 clean_table() ->
188 2 clean_table(which_backend_available()).
189
190 -spec clean_table(mongoose_backend()) -> ok | {error, any()}.
191 clean_table(rdbms) ->
192 2 SQLQuery = [<<"TRUNCATE TABLE mongoose_cluster_id;">>],
193 2 try mongoose_rdbms:sql_query(global, SQLQuery) of
194 2 {selected, _} -> ok;
195
:-(
{updated, _} -> ok;
196
:-(
{error, _} = Err -> Err
197 catch
198 Class:Reason:Stacktrace ->
199
:-(
?LOG_WARNING(#{what => cluster_id_clean_failed,
200 text => <<"Error truncating mongoose_cluster_id table">>,
201 sql_query => SQLQuery,
202
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
203
:-(
{error, {Class, Reason}}
204 end;
205
:-(
clean_table(_) -> ok.
206
207 clean_cache() ->
208 2 clean_cache(which_volatile_backend_available()).
209
210 clean_cache(mnesia) ->
211 2 mnesia:dirty_delete(mongoose_cluster_id, cluster_id);
212 clean_cache(cets) ->
213
:-(
cets:delete(cets_cluster_id, cluster_id).
Line Hits Source