
1 %% This module is designed assuming the fact, that records inserted
2 %% into domains or events table could appear in any order.
3 %% I.e. events with ids [1, 2, 3] could appear as [1, 3] for a short amount of time.
4 %% We also assume, event ids are never reused.
5 -module(mongoose_domain_loader).
6 -export([initial_load/0,
7 check_for_updates/0]).
9 %% For tests
10 -export([find_gaps_between/2]).
11 -ignore_xref([find_gaps_between/2]).
13 -include("mongoose_logger.hrl").
15 %% There are two important functions, called by service_domain_db:
16 %% - initial_load
17 %% - check_for_updates
18 -spec initial_load() -> skip | ok.
19 initial_load() ->
20 359 case mongoose_loader_state:get(undefined) of
21 undefined ->
22 %% If mongoose_loader_state is undefined,
23 %% this means we start for the first time with this core process
24 345 cold_load();
25 reset ->
26 %% Case when state has been reset without restarting core
27 %% For example, when we detected out-of-sync situation
28 11 cold_load(),
29 11 remove_outdated_domains_from_core(),
30 11 ok;
31 _ ->
32 %% Already synced to some point.
33 %% Just read updates from the event table.
34 3 skip
35 end.
37 %% Load from the domain table
38 cold_load() ->
39 %% We assume that to sync successfully we need conditions:
40 %% - events table does not contain gaps (if it contain a gap, a record could be missing).
41 %% - we have to check the whole event table for gaps.
42 %% - we don't care about gaps in the domain table - even if some state
43 %% is not visible yet in the domain table, it would be visible once
44 %% we try to fix the event gap.
45 356 {MinEventId, MaxEventId} = mongoose_domain_sql:get_minmax_event_id(),
46 %% It's important to get gaps info before the loading of domains
47 354 Gaps = find_gaps_between(MinEventId, MaxEventId),
48 %% Do domain loading from the main domain table
49 354 load_data_from_base(0, 1000),
50 %% Try to fix gaps
51 354 fix_gaps(Gaps),
52 354 State = #{min_event_id => MinEventId, max_event_id => MaxEventId},
53 354 mongoose_loader_state:set(State),
54 354 ok.
56 load_data_from_base(FromId, PageSize) ->
57 354 try
58 354 load_data_from_base_loop(FromId, PageSize, 0)
59 catch Class:Reason:Stacktrace ->
60 1 Text = <<"Loading initial domains from RDBMS failed">>,
61 1 ?LOG_CRITICAL(#{what => load_domains_from_base_failed,
62 text => Text,
63 from_id => FromId,
64 class => Class, reason => Reason,
stacktrace => Stacktrace}),
66 1 service_domain_db:restart()
67 end.
69 load_data_from_base_loop(FromId, PageSize, Loaded) ->
70 %% Crash on init if select fails.
71 362 case mongoose_domain_sql:select_from(FromId, PageSize) of
72 353 [] -> {ok, #{count => Loaded}};
73 Rows ->
74 9 PageMaxId = row_to_id(lists:last(Rows)),
75 8 insert_rows_to_core(Rows),
76 8 ?LOG_INFO(#{what => load_data_from_base,
77 8 count => length(Rows)}),
78 8 load_data_from_base_loop(PageMaxId, PageSize, Loaded + length(Rows))
79 end.
81 remove_outdated_domains_from_core() ->
82 11 CurrentSource = self(),
83 11 OutdatedDomains = mongoose_domain_core:get_all_outdated(CurrentSource),
84 11 remove_domains(OutdatedDomains),
85 11 ?LOG_WARNING(#{what => remove_outdated_domains_from_core,
count => length(OutdatedDomains)}),
87 11 ok.
89 %% If this function fails
90 %% (for example, if the database is not available at this moment),
91 %% it is safe to just call it again
92 -spec check_for_updates() -> empty_db | no_new_updates | ok.
93 check_for_updates() ->
94 690 MinMax = mongoose_domain_sql:get_minmax_event_id(),
95 688 State = mongoose_loader_state:get(undefined),
96 688 case check_for_updates(MinMax, State) of
more_pages -> check_for_updates();
98 685 Other -> Other
99 end.
101 check_for_updates({null, null}, _State) ->
102 299 empty_db; %% empty db
103 check_for_updates({Min, Max},
104 #{min_event_id := Min, max_event_id := Max}) ->
105 149 no_new_updates; %% no new updates
106 check_for_updates(MinMax = {Min, Max},
107 #{min_event_id := OldMin, max_event_id := OldMax})
108 when is_integer(Min), is_integer(Max) ->
109 238 {MinEventId, MaxEventId} = limit_max_id(OldMax, MinMax, 1000),
110 238 check_if_id_is_still_relevant(OldMax, MinEventId),
111 238 NewGapsFromBelow =
112 case {OldMin, OldMax} of
113 {MinEventId, _} ->
114 119 []; %% MinEventId is the same
115 {null, null} ->
116 105 []; %% Starting from an empty table
117 _ when MinEventId > OldMin ->
118 12 []; %% someone cleaned event table by removing some events
119 _ -> % MinEventId < OldMin
120 %% Race condition detected, check for new gaps
121 2 lists:seq(MinEventId, OldMin)
122 end,
123 238 FromId = case {OldMin, OldMax} of
124 105 {null, null} -> MinEventId;
125 133 _ -> OldMax + 1
126 end,
127 238 NewGapsFromThePage =
128 case OldMax of
129 MaxEventId ->
130 2 [];
131 _ ->
132 236 Rows = mongoose_domain_sql:select_updates_between(FromId, MaxEventId),
133 236 apply_changes(Rows),
134 236 Ids = rows_to_ids(Rows),
135 236 ids_to_gaps(FromId, MaxEventId, Ids)
136 end,
137 238 fix_gaps(NewGapsFromBelow ++ NewGapsFromThePage),
138 237 State2 = #{min_event_id => MinEventId, max_event_id => MaxEventId},
139 237 mongoose_loader_state:set(State2),
140 237 case MaxEventId < Max of
true -> more_pages;
142 237 false -> ok
143 end.
145 limit_max_id(null, {MinEventId, MaxEventId}, PageSize) ->
146 105 {MinEventId, min(MaxEventId, MinEventId + PageSize)};
147 limit_max_id(OldMax, {MinEventId, MaxEventId}, PageSize) ->
148 133 {MinEventId, min(MaxEventId, OldMax + PageSize)}.
150 rows_to_ids(Rows) ->
151 242 [row_to_id(Row) || Row <- Rows].
153 check_if_id_is_still_relevant(null, _MinEventId) ->
154 %% Starting from the empty event table
155 105 ok;
156 check_if_id_is_still_relevant(OldMax, MinEventId) when OldMax < MinEventId ->
157 %% Looks like this node has no DB connection for a long time.
158 %% But the event log in the DB has been truncated by some other node
159 %% meanwhile. We have to load the whole set of data from DB.
160 12 Text = <<"DB domain log had some updates to domains deleted,"
161 " which we have not applied yet. Have to crash.">>,
162 12 ?LOG_CRITICAL(#{what => events_log_out_of_sync,
text => Text}),
164 12 service_domain_db:restart();
165 check_if_id_is_still_relevant(_OldMax, _MinEventId) ->
166 121 ok.
168 apply_changes([]) ->
170 apply_changes(Rows) ->
171 242 Results = lists:map(fun apply_change/1, Rows),
172 242 ?LOG_INFO(#{what => load_updated_domains,
173 skips => count(skip, Results),
174 deletes => count(delete, Results),
175 242 inserts => count(insert, Results)}).
177 count(X, List) ->
count(X, List, 0).
180 count(X, [X|T], Count) ->
count(X, T, Count + 1);
182 count(X, [_|T], Count) ->
count(X, T, Count);
184 count(_, [], Count) ->
187 apply_change({_Id, <<>>, null}) -> %% Skip dummy domain
188 30 skip;
189 apply_change({_Id, Domain, null}) ->
190 %% Removed or disabled domain.
191 %% According to the SQL query, the HostType is null when:
192 %% - There is no record for the domain in the domain_settings table.
193 %% - Or domain_settings.enabled equals false.
194 103 mongoose_domain_core:delete(Domain),
195 103 delete;
196 apply_change({_Id, Domain, HostType}) ->
197 %% Inserted, reinserted (removed & inserted) or enabled record.
198 172 maybe_insert_to_core(Domain, HostType),
199 172 insert.
201 insert_rows_to_core(Rows) ->
202 8 lists:foreach(fun insert_row_to_core/1, Rows).
204 insert_row_to_core({_Id, Domain, HostType}) ->
205 11 maybe_insert_to_core(Domain, HostType).
207 maybe_insert_to_core(Domain, HostType) ->
208 183 Source = self(),
209 183 case mongoose_domain_core:insert(Domain, HostType, Source) of
210 {error, bad_insert} ->
211 %% we already have such dynamic domain paired with
212 %% another host type, enforce update of the domain.
213 1 mongoose_domain_core:delete(Domain),
214 1 mongoose_domain_core:insert(Domain, HostType, Source);
215 182 _ -> ok %% ignore other errors
216 end.
218 remove_domains(DomainsWithHostTypes) ->
219 11 lists:foreach(fun remove_domain/1, DomainsWithHostTypes).
221 remove_domain({Domain, _HostType}) ->
224 row_to_id({Id, _Domain, _HostType}) ->
225 313 mongoose_rdbms:result_to_integer(Id).
227 find_gaps_between(null, null) ->
228 283 [];
229 find_gaps_between(MinEventId, MaxEventId) when (MaxEventId - MinEventId) < 100 ->
230 %% For small sets just grab ids without aggregating
231 79 Ids = mongoose_domain_sql:get_event_ids_between(MinEventId, MaxEventId),
232 79 ids_to_gaps(MinEventId, MaxEventId, Ids);
233 find_gaps_between(MinEventId, MaxEventId) ->
234 4 Expected = MaxEventId - MinEventId + 1,
235 4 Count = mongoose_domain_sql:count_events_between_ids(MinEventId, MaxEventId),
236 4 case Count of
Expected -> [];
238 _ ->
239 %% Recursive binary search using COUNT
240 4 Mid = MinEventId + (MaxEventId - MinEventId) div 2,
241 4 find_gaps_between(MinEventId, Mid) ++ find_gaps_between(Mid + 1, MaxEventId)
242 end.
244 ids_to_gaps(MinEventId, MaxEventId, Ids) ->
245 315 AllIds = lists:seq(MinEventId, MaxEventId),
246 %% Find missing ids
247 315 ordsets:subtract(AllIds, Ids).
249 fix_gaps(Gaps) ->
250 %% Retries are only for extra safety, one try would be enough usually
251 592 fix_gaps(Gaps, 3).
253 fix_gaps([], _Retries) ->
254 591 ok;
255 fix_gaps(Gaps, Retries) when Retries > 0 ->
256 %% A gap is an event id without a record. But it has records above and below.
257 %% It occurs pretty rarely.
258 %%
259 %% There are two reasons for it:
260 %% - a transaction is very slow, and not committed yet (but the key is already
261 %% autoincremented, so a gap appears).
262 %% - a transaction is aborted, so the key would never be used.
263 %%
264 %% There is no easy way to check for a reason.
265 %%
266 %% fix_gaps tries to insert_dummy_event with a gap event id.
267 %% This makes the state of transaction for gap events obvious:
268 %% - if this insert fails, this means the actual record finally
269 %% appears and we can read it.
270 %% - if this insert passes - the transaction, that initially used this id has failed.
271 %% (or that transaction would get aborted, which is still fine for a consistent sync.
272 %% The transactions are restarted in mongoose_domain_sql:transaction/1.
273 %% But it should rarely happen)
274 %%
275 %% RDBMS servers do not overwrite data when INSERT operation is used.
276 %% i.e. only one insert for a key succeeded.
277 7 [catch mongoose_domain_sql:insert_dummy_event(Id) || Id <- Gaps],
278 %% The gaps should be filled at this point
279 6 Rows = lists:append([mongoose_domain_sql:select_updates_between(Id, Id) || Id <- Gaps]),
280 6 ?LOG_WARNING(#{what => domain_fix_gaps, gaps => Gaps, rows => Rows}),
281 6 apply_changes(lists:usort(Rows)),
282 6 Ids = rows_to_ids(Rows),
283 %% We still retry to fill the gaps, in case insert_dummy_event fails
284 %% It could fail, if there is a database connectivity issues, for example
285 6 fix_gaps(Gaps -- Ids, Retries - 1).
