1 |
|
%% This module is designed assuming the fact, that records inserted |
2 |
|
%% into domains or events table could appear in any order. |
3 |
|
%% I.e. events with ids [1, 2, 3] could appear as [1, 3] for a short amount of time. |
4 |
|
%% We also assume, event ids are never reused. |
5 |
|
-module(mongoose_domain_loader). |
6 |
|
-export([initial_load/0, |
7 |
|
check_for_updates/0]). |
8 |
|
|
9 |
|
%% For tests |
10 |
|
-export([find_gaps_between/2]). |
11 |
|
-ignore_xref([find_gaps_between/2]). |
12 |
|
|
13 |
|
-include("mongoose_logger.hrl"). |
14 |
|
|
15 |
|
%% There are two important functions, called by service_domain_db: |
16 |
|
%% - initial_load |
17 |
|
%% - check_for_updates |
18 |
|
-spec initial_load() -> skip | ok. |
19 |
|
initial_load() -> |
20 |
105 |
case mongoose_loader_state:get(undefined) of |
21 |
|
undefined -> |
22 |
|
%% If mongoose_loader_state is undefined, |
23 |
|
%% this means we start for the first time with this core process |
24 |
83 |
cold_load(); |
25 |
|
reset -> |
26 |
|
%% Case when state has been reset without restarting core |
27 |
|
%% For example, when we detected out-of-sync situation |
28 |
10 |
cold_load(), |
29 |
10 |
remove_outdated_domains_from_core(), |
30 |
9 |
ok; |
31 |
|
_ -> |
32 |
|
%% Already synced to some point. |
33 |
|
%% Just read updates from the event table. |
34 |
12 |
skip |
35 |
|
end. |
36 |
|
|
37 |
|
%% Load from the domain table |
38 |
|
cold_load() -> |
39 |
|
%% We assume that to sync successfully we need conditions: |
40 |
|
%% - events table does not contain gaps (if it contain a gap, a record could be missing). |
41 |
|
%% - we have to check the whole event table for gaps. |
42 |
|
%% - we don't care about gaps in the domain table - even if some state |
43 |
|
%% is not visible yet in the domain table, it would be visible once |
44 |
|
%% we try to fix the event gap. |
45 |
93 |
{MinEventId, MaxEventId} = mongoose_domain_sql:get_minmax_event_id(), |
46 |
|
%% It's important to get gaps info before the loading of domains |
47 |
93 |
Gaps = find_gaps_between(MinEventId, MaxEventId), |
48 |
|
%% Do domain loading from the main domain table |
49 |
93 |
load_data_from_base(0, 1000), |
50 |
|
%% Try to fix gaps |
51 |
93 |
fix_gaps(Gaps), |
52 |
93 |
State = #{min_event_id => MinEventId, max_event_id => MaxEventId}, |
53 |
93 |
mongoose_loader_state:set(State), |
54 |
93 |
ok. |
55 |
|
|
56 |
|
load_data_from_base(FromId, PageSize) -> |
57 |
93 |
try |
58 |
93 |
load_data_from_base_loop(FromId, PageSize, 0) |
59 |
|
catch Class:Reason:Stacktrace -> |
60 |
1 |
Text = <<"Loading initial domains from RDBMS failed">>, |
61 |
1 |
?LOG_CRITICAL(#{what => load_domains_from_base_failed, |
62 |
|
text => Text, |
63 |
|
from_id => FromId, |
64 |
|
class => Class, reason => Reason, |
65 |
:-( |
stacktrace => Stacktrace}), |
66 |
1 |
service_domain_db:restart() |
67 |
|
end. |
68 |
|
|
69 |
|
load_data_from_base_loop(FromId, PageSize, Loaded) -> |
70 |
|
%% Crash on init if select fails. |
71 |
109 |
case mongoose_domain_sql:select_from(FromId, PageSize) of |
72 |
92 |
[] -> {ok, #{count => Loaded}}; |
73 |
|
Rows -> |
74 |
17 |
PageMaxId = row_to_id(lists:last(Rows)), |
75 |
16 |
insert_rows_to_core(Rows), |
76 |
16 |
?LOG_INFO(#{what => load_data_from_base, |
77 |
16 |
count => length(Rows)}), |
78 |
16 |
load_data_from_base_loop(PageMaxId, PageSize, Loaded + length(Rows)) |
79 |
|
end. |
80 |
|
|
81 |
|
remove_outdated_domains_from_core() -> |
82 |
10 |
CurrentSource = self(), |
83 |
10 |
OutdatedDomains = mongoose_domain_core:get_all_outdated(CurrentSource), |
84 |
10 |
remove_domains(OutdatedDomains), |
85 |
10 |
?LOG_WARNING(#{what => remove_outdated_domains_from_core, |
86 |
:-( |
count => length(OutdatedDomains)}), |
87 |
9 |
ok. |
88 |
|
|
89 |
|
%% If this function fails |
90 |
|
%% (for example, if the database is not available at this moment), |
91 |
|
%% it is safe to just call it again |
92 |
|
-spec check_for_updates() -> empty_db | no_new_updates | ok. |
93 |
|
check_for_updates() -> |
94 |
472 |
MinMax = mongoose_domain_sql:get_minmax_event_id(), |
95 |
471 |
State = mongoose_loader_state:get(undefined), |
96 |
471 |
case check_for_updates(MinMax, State) of |
97 |
:-( |
more_pages -> check_for_updates(); |
98 |
459 |
Other -> Other |
99 |
|
end. |
100 |
|
|
101 |
|
check_for_updates({null, null}, _State) -> |
102 |
74 |
empty_db; %% empty db |
103 |
|
check_for_updates({Min, Max}, |
104 |
|
#{min_event_id := Min, max_event_id := Max}) -> |
105 |
138 |
no_new_updates; %% no new updates |
106 |
|
check_for_updates(MinMax = {Min, Max}, |
107 |
|
#{min_event_id := OldMin, max_event_id := OldMax}) |
108 |
|
when is_integer(Min), is_integer(Max) -> |
109 |
257 |
{MinEventId, MaxEventId} = limit_max_id(OldMax, MinMax, 1000), |
110 |
257 |
check_if_id_is_still_relevant(OldMax, MinEventId), |
111 |
257 |
NewGapsFromBelow = |
112 |
|
case {OldMin, OldMax} of |
113 |
|
{MinEventId, _} -> |
114 |
202 |
[]; %% MinEventId is the same |
115 |
|
{null, null} -> |
116 |
31 |
[]; %% Starting from an empty table |
117 |
|
_ when MinEventId > OldMin -> |
118 |
11 |
[]; %% someone cleaned event table by removing some events |
119 |
|
_ -> % MinEventId < OldMin |
120 |
|
%% Race condition detected, check for new gaps |
121 |
13 |
lists:seq(MinEventId, OldMin) |
122 |
|
end, |
123 |
257 |
FromId = case {OldMin, OldMax} of |
124 |
31 |
{null, null} -> MinEventId; |
125 |
226 |
_ -> OldMax + 1 |
126 |
|
end, |
127 |
257 |
NewGapsFromThePage = |
128 |
|
case OldMax of |
129 |
|
MaxEventId -> |
130 |
3 |
[]; |
131 |
|
_ -> |
132 |
254 |
Rows = mongoose_domain_sql:select_updates_between(FromId, MaxEventId), |
133 |
253 |
apply_changes(Rows), |
134 |
253 |
Ids = rows_to_ids(Rows), |
135 |
253 |
ids_to_gaps(FromId, MaxEventId, Ids) |
136 |
|
end, |
137 |
247 |
fix_gaps(NewGapsFromBelow ++ NewGapsFromThePage), |
138 |
247 |
State2 = #{min_event_id => MinEventId, max_event_id => MaxEventId}, |
139 |
247 |
mongoose_loader_state:set(State2), |
140 |
247 |
case MaxEventId < Max of |
141 |
:-( |
true -> more_pages; |
142 |
247 |
false -> ok |
143 |
|
end. |
144 |
|
|
145 |
|
limit_max_id(null, {MinEventId, MaxEventId}, PageSize) -> |
146 |
31 |
{MinEventId, min(MaxEventId, MinEventId + PageSize)}; |
147 |
|
limit_max_id(OldMax, {MinEventId, MaxEventId}, PageSize) -> |
148 |
226 |
{MinEventId, min(MaxEventId, OldMax + PageSize)}. |
149 |
|
|
150 |
|
rows_to_ids(Rows) -> |
151 |
264 |
[row_to_id(Row) || Row <- Rows]. |
152 |
|
|
153 |
|
check_if_id_is_still_relevant(null, _MinEventId) -> |
154 |
|
%% Starting from the empty event table |
155 |
31 |
ok; |
156 |
|
check_if_id_is_still_relevant(OldMax, MinEventId) when OldMax < MinEventId -> |
157 |
|
%% Looks like this node has no DB connection for a long time. |
158 |
|
%% But the event log in the DB has been truncated by some other node |
159 |
|
%% meanwhile. We have to load the whole set of data from DB. |
160 |
11 |
Text = <<"DB domain log had some updates to domains deleted," |
161 |
|
" which we have not applied yet. Have to crash.">>, |
162 |
11 |
?LOG_CRITICAL(#{what => events_log_out_of_sync, |
163 |
:-( |
text => Text}), |
164 |
11 |
service_domain_db:restart(); |
165 |
|
check_if_id_is_still_relevant(_OldMax, _MinEventId) -> |
166 |
215 |
ok. |
167 |
|
|
168 |
|
apply_changes([]) -> |
169 |
9 |
ok; |
170 |
|
apply_changes(Rows) -> |
171 |
255 |
Results = lists:map(fun apply_change/1, Rows), |
172 |
255 |
?LOG_INFO(#{what => load_updated_domains, |
173 |
|
skips => count(skip, Results), |
174 |
|
deletes => count(delete, Results), |
175 |
255 |
inserts => count(insert, Results)}). |
176 |
|
|
177 |
|
count(X, List) -> |
178 |
:-( |
count(X, List, 0). |
179 |
|
|
180 |
|
count(X, [X|T], Count) -> |
181 |
:-( |
count(X, T, Count + 1); |
182 |
|
count(X, [_|T], Count) -> |
183 |
:-( |
count(X, T, Count); |
184 |
|
count(_, [], Count) -> |
185 |
:-( |
Count. |
186 |
|
|
187 |
|
apply_change({_Id, <<>>, null}) -> %% Skip dummy domain |
188 |
51 |
skip; |
189 |
|
apply_change({_Id, Domain, null}) -> |
190 |
|
%% Removed or disabled domain. |
191 |
|
%% According to the SQL query, the HostType is null when: |
192 |
|
%% - There is no record for the domain in the domain_settings table. |
193 |
|
%% - Or domain_settings.enabled equals false. |
194 |
146 |
mongoose_domain_core:delete(Domain), |
195 |
146 |
delete; |
196 |
|
apply_change({_Id, Domain, HostType}) -> |
197 |
|
%% Inserted, reinserted (removed & inserted) or enabled record. |
198 |
203 |
maybe_insert_to_core(Domain, HostType), |
199 |
203 |
insert. |
200 |
|
|
201 |
|
insert_rows_to_core(Rows) -> |
202 |
16 |
lists:foreach(fun insert_row_to_core/1, Rows). |
203 |
|
|
204 |
|
insert_row_to_core({_Id, Domain, HostType}) -> |
205 |
25 |
maybe_insert_to_core(Domain, HostType). |
206 |
|
|
207 |
|
maybe_insert_to_core(Domain, HostType) -> |
208 |
228 |
Source = self(), |
209 |
228 |
case mongoose_domain_core:insert(Domain, HostType, Source) of |
210 |
|
{error, bad_insert} -> |
211 |
|
%% we already have such dynamic domain paired with |
212 |
|
%% another host type, enforce update of the domain. |
213 |
1 |
mongoose_domain_core:delete(Domain), |
214 |
1 |
mongoose_domain_core:insert(Domain, HostType, Source); |
215 |
227 |
_ -> ok %% ignore other errors |
216 |
|
end. |
217 |
|
|
218 |
|
remove_domains(DomainsWithHostTypes) -> |
219 |
10 |
lists:foreach(fun remove_domain/1, DomainsWithHostTypes). |
220 |
|
|
221 |
|
remove_domain({Domain, _HostType}) -> |
222 |
:-( |
mongoose_domain_core:delete(Domain). |
223 |
|
|
224 |
|
row_to_id({Id, _Domain, _HostType}) -> |
225 |
416 |
mongoose_rdbms:result_to_integer(Id). |
226 |
|
|
227 |
|
find_gaps_between(null, null) -> |
228 |
56 |
[]; |
229 |
|
find_gaps_between(MinEventId, MaxEventId) when (MaxEventId - MinEventId) < 100 -> |
230 |
|
%% For small sets just grab ids without aggregating |
231 |
45 |
Ids = mongoose_domain_sql:get_event_ids_between(MinEventId, MaxEventId), |
232 |
45 |
ids_to_gaps(MinEventId, MaxEventId, Ids); |
233 |
|
find_gaps_between(MinEventId, MaxEventId) -> |
234 |
4 |
Expected = MaxEventId - MinEventId + 1, |
235 |
4 |
Count = mongoose_domain_sql:count_events_between_ids(MinEventId, MaxEventId), |
236 |
4 |
case Count of |
237 |
:-( |
Expected -> []; |
238 |
|
_ -> |
239 |
|
%% Recursive binary search using COUNT |
240 |
4 |
Mid = MinEventId + (MaxEventId - MinEventId) div 2, |
241 |
4 |
find_gaps_between(MinEventId, Mid) ++ find_gaps_between(Mid + 1, MaxEventId) |
242 |
|
end. |
243 |
|
|
244 |
|
ids_to_gaps(MinEventId, MaxEventId, Ids) -> |
245 |
298 |
AllIds = lists:seq(MinEventId, MaxEventId), |
246 |
|
%% Find missing ids |
247 |
289 |
ordsets:subtract(AllIds, Ids). |
248 |
|
|
249 |
|
fix_gaps(Gaps) -> |
250 |
|
%% Retries are only for extra safety, one try would be enough usually |
251 |
340 |
fix_gaps(Gaps, 3). |
252 |
|
|
253 |
|
fix_gaps([], _Retries) -> |
254 |
340 |
ok; |
255 |
|
fix_gaps(Gaps, Retries) when Retries > 0 -> |
256 |
|
%% A gap is an event id without a record. But it has records above and below. |
257 |
|
%% It occurs pretty rarely. |
258 |
|
%% |
259 |
|
%% There are two reasons for it: |
260 |
|
%% - a transaction is very slow, and not committed yet (but the key is already |
261 |
|
%% autoincremented, so a gap appears). |
262 |
|
%% - a transaction is aborted, so the key would never be used. |
263 |
|
%% |
264 |
|
%% There is no easy way to check for a reason. |
265 |
|
%% |
266 |
|
%% fix_gaps tries to insert_dummy_event with a gap event id. |
267 |
|
%% This makes the state of transaction for gap events obvious: |
268 |
|
%% - if this insert fails, this means the actual record finally |
269 |
|
%% appears and we can read it. |
270 |
|
%% - if this insert passes - the transaction, that initially used this id has failed. |
271 |
|
%% (or that transaction would get aborted, which is still fine for a consistent sync. |
272 |
|
%% The transactions are restarted in mongoose_domain_sql:transaction/1. |
273 |
|
%% But it should rarely happen) |
274 |
|
%% |
275 |
|
%% RDBMS servers do not overwrite data when INSERT operation is used. |
276 |
|
%% i.e. only one insert for a key succeeded. |
277 |
11 |
[catch mongoose_domain_sql:insert_dummy_event(Id) || Id <- Gaps], |
278 |
|
%% The gaps should be filled at this point |
279 |
11 |
Rows = lists:append([mongoose_domain_sql:select_updates_between(Id, Id) || Id <- Gaps]), |
280 |
11 |
?LOG_WARNING(#{what => domain_fix_gaps, gaps => Gaps, rows => Rows}), |
281 |
11 |
apply_changes(lists:usort(Rows)), |
282 |
11 |
Ids = rows_to_ids(Rows), |
283 |
|
%% We still retry to fill the gaps, in case insert_dummy_event fails |
284 |
|
%% It could fail, if there is a database connectivity issues, for example |
285 |
11 |
fix_gaps(Gaps -- Ids, Retries - 1). |