1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
-module(mod_global_distrib_server_mgr). |
17 |
|
-author('piotr.nosek@erlang-solutions.com'). |
18 |
|
|
19 |
|
-include("mongoose.hrl"). |
20 |
|
|
21 |
|
-behaviour(gen_server). |
22 |
|
|
23 |
|
-export([start_link/2]). |
24 |
|
-export([get_connection/1, ping_proc/1, get_state_info/1]). |
25 |
|
-export([force_refresh/1, close_disabled/1]). |
26 |
|
-export([init/1, handle_continue/2, handle_call/3, handle_cast/2, |
27 |
|
handle_info/2, terminate/2, code_change/3]). |
28 |
|
|
29 |
|
%% Debug |
30 |
|
-export([get_enabled_endpoints/1, get_disabled_endpoints/1]). |
31 |
|
|
32 |
|
-ignore_xref([close_disabled/1, force_refresh/1, get_disabled_endpoints/1, |
33 |
|
get_enabled_endpoints/1, get_state_info/1, start_link/2]). |
34 |
|
|
35 |
|
-type endpoint() :: mod_global_distrib_utils:endpoint(). |
36 |
|
-type endpoint_pid_tuple() :: {endpoint(), pid()}. |
37 |
|
|
38 |
|
-record(endpoint_info, { |
39 |
|
endpoint :: endpoint(), |
40 |
|
conn_pool_ref :: atom(), |
41 |
|
conn_pool_pid :: pid(), |
42 |
|
monitor_ref :: reference() |
43 |
|
}). |
44 |
|
|
45 |
|
-type endpoint_info() :: #endpoint_info{}. |
46 |
|
-type endpoints_changes() :: [{enable | disable, endpoint()}]. |
47 |
|
|
48 |
|
-record(state, { |
49 |
|
server :: jid:lserver(), |
50 |
|
supervisor :: pid(), |
51 |
|
enabled :: [endpoint_info()], |
52 |
|
disabled :: [endpoint_info()], |
53 |
|
pending_endpoints :: endpoints_changes(), |
54 |
|
pending_gets :: queue:queue(tuple()), |
55 |
|
refresh_interval :: pos_integer(), |
56 |
|
refresh_interval_when_disconnected :: pos_integer(), |
57 |
|
gc_interval :: pos_integer(), |
58 |
|
%% Used by force_refresh to block until refresh is fully done. |
59 |
|
%% Listeners are notified only once and then this list is cleared. |
60 |
|
pending_endpoints_listeners = [] :: [pid()], |
61 |
|
%% Containts last result of get_endpoints |
62 |
|
last_endpoints :: [endpoint()] | undefined, |
63 |
|
conn_opts :: map() |
64 |
|
}). |
65 |
|
|
66 |
|
-type state() :: #state{}. |
67 |
|
%%-------------------------------------------------------------------- |
68 |
|
%% API |
69 |
|
%%-------------------------------------------------------------------- |
70 |
|
|
71 |
|
-spec start_link(Server :: jid:lserver(), ServerSup :: pid()) -> {ok, pid()} | {error, any()}. |
72 |
|
start_link(Server, ServerSup) when is_binary(Server) -> |
73 |
80 |
Name = mod_global_distrib_utils:server_to_mgr_name(Server), |
74 |
80 |
gen_server:start_link({local, Name}, ?MODULE, [Server, ServerSup], []). |
75 |
|
|
76 |
|
%% Will return only when endpoints changes (if there are any) are fully applied. |
77 |
|
%% If force refresh is already in progress, new concurrent calls won't trigger another refresh |
78 |
|
%% but will return 'ok' when the current one is finished. |
79 |
|
-spec force_refresh(Server :: jid:lserver()) -> ok. |
80 |
|
force_refresh(Server) -> |
81 |
10 |
do_call(Server, force_refresh). |
82 |
|
|
83 |
|
-spec close_disabled(Server :: jid:lserver()) -> ok. |
84 |
|
close_disabled(Server) -> |
85 |
:-( |
do_call(Server, close_disabled). |
86 |
|
|
87 |
|
-spec get_connection(Server :: jid:lserver()) -> {ok, pid()} | no_return(). |
88 |
|
get_connection(Server) -> |
89 |
47 |
{ok, CPoolRef} = do_call(Server, get_connection_pool), |
90 |
36 |
{ok, cpool:get_connection(CPoolRef)}. |
91 |
|
|
92 |
|
%% `ping_proc` instead of just `ping` to emphasize that this call does not ping |
93 |
|
%% some remote server but the manager process instead |
94 |
|
-spec ping_proc(Server :: jid:lserver()) -> pong | pang. |
95 |
|
ping_proc(Server) -> |
96 |
569 |
case catch do_call(Server, ping_proc) of |
97 |
502 |
pong -> pong; |
98 |
67 |
_Error -> pang |
99 |
|
end. |
100 |
|
|
101 |
|
-spec get_state_info(Server :: jid:lserver()) -> map(). |
102 |
|
get_state_info(Server) -> |
103 |
10 |
do_call(Server, get_state_info). |
104 |
|
|
105 |
|
%%-------------------------------------------------------------------- |
106 |
|
%% Debug API |
107 |
|
%%-------------------------------------------------------------------- |
108 |
|
|
109 |
|
-spec get_enabled_endpoints(Server :: jid:lserver()) -> [endpoint()]. |
110 |
|
get_enabled_endpoints(Server) -> |
111 |
16 |
do_call(Server, get_enabled_endpoints). |
112 |
|
|
113 |
|
-spec get_disabled_endpoints(Server :: jid:lserver()) -> [endpoint()]. |
114 |
|
get_disabled_endpoints(Server) -> |
115 |
16 |
do_call(Server, get_disabled_endpoints). |
116 |
|
|
117 |
|
%%-------------------------------------------------------------------- |
118 |
|
%% gen_server callbacks |
119 |
|
%%-------------------------------------------------------------------- |
120 |
|
|
121 |
|
init([Server, Supervisor]) -> |
122 |
80 |
process_flag(trap_exit, true), |
123 |
|
|
124 |
80 |
HostType = mod_global_distrib_utils:host_type(), |
125 |
80 |
ConnOpts = gen_mod:get_module_opt(HostType, mod_global_distrib_hosts_refresher, connections), |
126 |
80 |
#{endpoint_refresh_interval := RefreshInterval, |
127 |
|
endpoint_refresh_interval_when_empty := DisRefreshInterval, |
128 |
|
disabled_gc_interval := GCInterval} = ConnOpts, |
129 |
|
|
130 |
80 |
State = #state{ |
131 |
|
server = Server, |
132 |
|
supervisor = Supervisor, |
133 |
|
enabled = [], |
134 |
|
disabled = [], |
135 |
|
pending_endpoints = [], |
136 |
|
pending_gets = queue:new(), |
137 |
|
refresh_interval = RefreshInterval, |
138 |
|
refresh_interval_when_disconnected = DisRefreshInterval, |
139 |
|
gc_interval = GCInterval, |
140 |
|
conn_opts = ConnOpts |
141 |
|
}, |
142 |
|
|
143 |
80 |
?LOG_INFO(ls(#{what => gd_mgr_started}, State)), |
144 |
80 |
{ok, State, {continue, initial_refresh}}. |
145 |
|
|
146 |
|
handle_continue(initial_refresh, State) -> |
147 |
80 |
State2 = refresh_connections(State), |
148 |
77 |
schedule_refresh(State2), |
149 |
77 |
schedule_gc(State2), |
150 |
77 |
{noreply, State2}. |
151 |
|
|
152 |
|
handle_call(get_connection_pool, From, #state{ enabled = [], |
153 |
|
pending_gets = PendingGets } = State) -> |
154 |
12 |
{noreply, State#state{ pending_gets = queue:in(From, PendingGets) }}; |
155 |
|
handle_call(get_connection_pool, _From, #state{ enabled = Enabled } = State) -> |
156 |
24 |
{reply, {ok, pick_connection_pool(Enabled)}, State}; |
157 |
|
handle_call(force_refresh, From, #state{ pending_endpoints_listeners = [] } = State) -> |
158 |
10 |
State2 = refresh_connections(State), |
159 |
10 |
case State2#state.pending_endpoints of |
160 |
1 |
[] -> {reply, ok, State2}; |
161 |
9 |
_ -> {noreply, State2#state{ pending_endpoints_listeners = [From] }} |
162 |
|
end; |
163 |
|
handle_call(force_refresh, From, #state{ pending_endpoints_listeners = Listeners } = State) -> |
164 |
:-( |
{noreply, State#state{ pending_endpoints_listeners = [From | Listeners] }}; |
165 |
|
handle_call(close_disabled, _From, #state{ disabled = Disabled } = State) -> |
166 |
:-( |
lists:foreach( |
167 |
|
fun(#endpoint_info{ endpoint = Endpoint }) -> |
168 |
:-( |
stop_disabled(Endpoint, State) |
169 |
|
end, Disabled), |
170 |
:-( |
{reply, ok, State}; |
171 |
|
handle_call(get_enabled_endpoints, _From, State) -> |
172 |
16 |
{reply, [ CI#endpoint_info.endpoint || CI <- State#state.enabled ], State}; |
173 |
|
handle_call(get_disabled_endpoints, _From, State) -> |
174 |
16 |
{reply, [ CI#endpoint_info.endpoint || CI <- State#state.disabled ], State}; |
175 |
|
handle_call(ping_proc, _From, State) -> |
176 |
502 |
{reply, pong, State}; |
177 |
|
handle_call(get_state_info, _From, State) -> |
178 |
10 |
{reply, state_info(State), State}. |
179 |
|
|
180 |
|
handle_cast({call_timeout, FromPid, Msg}, State) -> |
181 |
:-( |
?LOG_WARNING(ls(#{what => gd_mgr_call_timeout, |
182 |
:-( |
caller_pid => FromPid, caller_msg => Msg}, State)), |
183 |
:-( |
{noreply, State}; |
184 |
|
handle_cast(Msg, State) -> |
185 |
:-( |
?UNEXPECTED_CAST(Msg), |
186 |
:-( |
{noreply, State}. |
187 |
|
|
188 |
|
handle_info(refresh, State) -> |
189 |
:-( |
State2 = case State#state.pending_endpoints of |
190 |
:-( |
[] -> refresh_connections(State); |
191 |
:-( |
_ -> State % May occur if we are in the middle of force_refresh |
192 |
|
end, |
193 |
:-( |
case State#state.pending_endpoints == State2#state.pending_endpoints of |
194 |
|
true -> |
195 |
:-( |
ok; |
196 |
|
_ -> |
197 |
:-( |
?LOG_INFO(ls(#{what => gd_mgr_refresh, |
198 |
|
text => <<"A list of pending_endpoints has changed in GD server manager">>, |
199 |
|
pending_endpoints_before => State#state.pending_endpoints, |
200 |
:-( |
pending_endpoints_after => State2#state.pending_endpoints}, State)) |
201 |
|
end, |
202 |
:-( |
schedule_refresh(State2), |
203 |
:-( |
{noreply, State2}; |
204 |
|
handle_info(disabled_gc, #state{ disabled = Disabled } = State) -> |
205 |
22 |
StoppedEndpoints = lists:flatmap( |
206 |
|
fun(#endpoint_info{ endpoint = Endpoint, conn_pool_ref = ConnPool }) -> |
207 |
3 |
try cpool:get_connection(ConnPool, no_wait_for_reconnect) of |
208 |
|
_ConnPid -> |
209 |
2 |
[] |
210 |
|
catch error:no_connections -> |
211 |
1 |
stop_disabled(Endpoint, State), |
212 |
1 |
[Endpoint] |
213 |
|
end |
214 |
|
end, Disabled), |
215 |
22 |
case StoppedEndpoints of |
216 |
|
[] -> |
217 |
21 |
ok; |
218 |
|
_ -> |
219 |
1 |
?LOG_INFO(ls(#{what => gd_mgr_disabled_gc, |
220 |
|
text => <<"GD server manager stops some inactive endpoints">>, |
221 |
:-( |
stopped_endpoints => StoppedEndpoints}, State)) |
222 |
|
end, |
223 |
22 |
schedule_gc(State), |
224 |
22 |
{noreply, State}; |
225 |
|
handle_info(process_pending_get, #state{ enabled = [] } = State) -> |
226 |
:-( |
{noreply, State}; |
227 |
|
handle_info(process_pending_get, #state{ pending_gets = PendingGets, |
228 |
|
enabled = Enabled } = State) -> |
229 |
12 |
NState = |
230 |
|
case queue:out(PendingGets) of |
231 |
|
{{value, From}, NewPendingGets} -> |
232 |
12 |
CPoolRef = pick_connection_pool(Enabled), |
233 |
12 |
gen_server:reply(From, {ok, CPoolRef}), |
234 |
12 |
State#state{ pending_gets = NewPendingGets }; |
235 |
|
{empty, _} -> |
236 |
:-( |
State |
237 |
|
end, |
238 |
12 |
maybe_schedule_process_get(NState), |
239 |
12 |
{noreply, NState}; |
240 |
|
handle_info(process_pending_endpoint, |
241 |
|
#state{ pending_endpoints = [{enable, Endpoint} | RPendingEndpoints] } = State) -> |
242 |
105 |
State2 = |
243 |
|
case catch enable(Endpoint, State) of |
244 |
|
{ok, NState0} -> |
245 |
100 |
?LOG_INFO(ls(#{what => gd_endpoint_enabled, |
246 |
|
text => <<"GD server manager enables pending endpoint">>, |
247 |
:-( |
endpoint => Endpoint}, State)), |
248 |
100 |
NState0; |
249 |
|
Error -> |
250 |
5 |
?LOG_ERROR(ls(#{what => gd_endpoint_enabling_failed, |
251 |
|
text => <<"GD server manager cannot enable endpoint">>, |
252 |
:-( |
endpoint => Endpoint, reason => Error}, State)), |
253 |
5 |
State |
254 |
|
end, |
255 |
|
|
256 |
105 |
maybe_schedule_process_get(State2), |
257 |
105 |
maybe_schedule_process_endpoint(RPendingEndpoints), |
258 |
105 |
State3 = State2#state{ pending_endpoints = RPendingEndpoints }, |
259 |
105 |
State4 = maybe_notify_endpoints_listeners(State3), |
260 |
105 |
{noreply, State4}; |
261 |
|
handle_info(process_pending_endpoint, |
262 |
|
#state{ pending_endpoints = [{disable, Endpoint} | RPendingEndpoints] } = State) -> |
263 |
3 |
State2 = |
264 |
|
case catch disable(Endpoint, State) of |
265 |
|
{ok, NState0} -> |
266 |
3 |
?LOG_INFO(ls(#{what => gd_endpoint_disabled, |
267 |
|
text => <<"GD server manager disables pending endpoint">>, |
268 |
:-( |
endpoint => Endpoint}, State)), |
269 |
3 |
NState0; |
270 |
|
Error -> |
271 |
:-( |
?LOG_ERROR(ls(#{what => gd_endpoint_disabling_failed, |
272 |
|
text => <<"GD server manager cannot disable endpoint">>, |
273 |
:-( |
endpoint => Endpoint, reason => Error}, State)), |
274 |
:-( |
State |
275 |
|
end, |
276 |
|
|
277 |
3 |
maybe_schedule_process_endpoint(RPendingEndpoints), |
278 |
3 |
State3 = State2#state{ pending_endpoints = RPendingEndpoints }, |
279 |
3 |
State4 = maybe_notify_endpoints_listeners(State3), |
280 |
3 |
{noreply, State4}; |
281 |
|
handle_info({'DOWN', MonitorRef, _Type, Pid, Reason}, #state{ enabled = Enabled, |
282 |
|
disabled = Disabled } = State) -> |
283 |
98 |
{Endpoint, Type, NState} = |
284 |
|
case lists:keytake(MonitorRef, #endpoint_info.monitor_ref, Enabled) of |
285 |
|
{value, #endpoint_info{ endpoint = Endpoint0 }, NEnabled} -> |
286 |
95 |
{Endpoint0, enabled, State#state{ enabled = NEnabled }}; |
287 |
|
false -> |
288 |
3 |
case lists:keytake(MonitorRef, #endpoint_info.monitor_ref, Disabled) of |
289 |
|
{value, #endpoint_info{ endpoint = Endpoint0 }, NDisabled} -> |
290 |
3 |
{Endpoint0, disabled, State#state{ disabled = NDisabled }}; |
291 |
|
false -> |
292 |
:-( |
{Pid, unknown, State} |
293 |
|
end |
294 |
|
end, |
295 |
|
|
296 |
98 |
Reason2 = |
297 |
|
case Reason of |
298 |
98 |
shutdown -> normal; |
299 |
:-( |
_Other -> Reason |
300 |
|
end, |
301 |
98 |
?LOG_INFO(ls(#{what => gd_endpoint_closed, |
302 |
|
text => <<"Disconnected from a GD endpoint">>, |
303 |
:-( |
type => Type, endpoint => Endpoint, reason => Reason2}, State)), |
304 |
98 |
{noreply, NState}; |
305 |
|
handle_info(Msg, State) -> |
306 |
:-( |
?UNEXPECTED_INFO(Msg), |
307 |
:-( |
{noreply, State}. |
308 |
|
|
309 |
|
code_change(_OldVsn, State, _Extra) -> |
310 |
:-( |
{ok, State}. |
311 |
|
|
312 |
|
terminate(_Reason, _State) -> |
313 |
|
%% TODO: Cleanup |
314 |
80 |
ok. |
315 |
|
|
316 |
|
%%-------------------------------------------------------------------- |
317 |
|
%% Internal functions |
318 |
|
%%-------------------------------------------------------------------- |
319 |
|
|
320 |
|
-spec do_call(Server :: jid:lserver(), Msg :: any()) -> any(). |
321 |
|
do_call(Server, Msg) -> |
322 |
668 |
MgrName = mod_global_distrib_utils:server_to_mgr_name(Server), |
323 |
668 |
try |
324 |
668 |
gen_server:call(MgrName, Msg) |
325 |
|
catch exit:{timeout,_} = Reason:Stacktrace -> |
326 |
:-( |
catch gen_server:cast(MgrName, {call_timeout, self(), Msg}), |
327 |
:-( |
erlang:raise(exit, Reason, Stacktrace) |
328 |
|
end. |
329 |
|
|
330 |
|
-spec schedule_refresh(State :: state()) -> state(). |
331 |
|
schedule_refresh(#state{ refresh_interval = Interval, last_endpoints = [_|_] } = State) -> |
332 |
74 |
do_schedule_refresh(State, Interval); |
333 |
|
schedule_refresh(#state{ refresh_interval_when_disconnected = Interval } = State) -> |
334 |
|
%% Try more often by default when get_endpoints returns empty list |
335 |
3 |
do_schedule_refresh(State, Interval). |
336 |
|
|
337 |
|
do_schedule_refresh(State, Interval) -> |
338 |
77 |
erlang:send_after(timer:seconds(Interval), self(), refresh), |
339 |
77 |
State. |
340 |
|
|
341 |
|
-spec schedule_gc(State :: state()) -> state(). |
342 |
|
schedule_gc(#state{ gc_interval = Interval } = State) -> |
343 |
99 |
erlang:send_after(timer:seconds(Interval), self(), disabled_gc), |
344 |
99 |
State. |
345 |
|
|
346 |
|
-spec maybe_schedule_process_endpoint(PendingEndpoints :: list()) -> any(). |
347 |
|
maybe_schedule_process_endpoint([]) -> |
348 |
82 |
nop; |
349 |
|
maybe_schedule_process_endpoint(_) -> |
350 |
109 |
self() ! process_pending_endpoint. |
351 |
|
|
352 |
|
-spec maybe_schedule_process_get(state()) -> any(). |
353 |
|
maybe_schedule_process_get(#state{ pending_gets = PendingGets, enabled = Enabled }) -> |
354 |
117 |
case PendingGets == queue:new() of |
355 |
105 |
true -> nop; |
356 |
|
false -> |
357 |
12 |
Enabled =/= [] orelse error(enabled_is_empty), |
358 |
12 |
self() ! process_pending_get |
359 |
|
end. |
360 |
|
|
361 |
|
-spec maybe_notify_endpoints_listeners(state()) -> state(). |
362 |
|
maybe_notify_endpoints_listeners(#state{ pending_endpoints = [], |
363 |
|
pending_endpoints_listeners = Listeners } = State) -> |
364 |
78 |
lists:foreach(fun(Listener) -> gen_server:reply(Listener, ok) end, Listeners), |
365 |
78 |
State#state{ pending_endpoints_listeners = [] }; |
366 |
|
maybe_notify_endpoints_listeners(State) -> |
367 |
30 |
State. |
368 |
|
|
369 |
|
-spec pick_connection_pool(Enabled :: [endpoint_info()]) -> pid() | no_connections. |
370 |
|
pick_connection_pool(Enabled) -> |
371 |
36 |
#endpoint_info{ conn_pool_ref = PoolRef } = lists:nth(rand:uniform(length(Enabled)), Enabled), |
372 |
36 |
PoolRef. |
373 |
|
|
374 |
|
-spec refresh_connections(State :: state()) -> state(). |
375 |
|
refresh_connections(#state{ server = Server, pending_endpoints = PendingEndpoints, |
376 |
|
last_endpoints = LastEndpoints } = State) -> |
377 |
90 |
?LOG_DEBUG(ls(#{what => gd_refreshing_endpoints}, State)), |
378 |
90 |
NewEndpoints = get_endpoints(Server), |
379 |
87 |
case NewEndpoints of |
380 |
|
LastEndpoints -> |
381 |
5 |
nothing_new; |
382 |
|
_ -> |
383 |
82 |
?LOG_INFO(ls(#{what => gd_endpoints_change, |
384 |
|
old_endpoints => LastEndpoints, |
385 |
:-( |
new_endpoints => NewEndpoints}, State)) |
386 |
|
end, |
387 |
87 |
?LOG_DEBUG(ls(#{what => gd_fetched_endpoints, fetched_endpoints => NewEndpoints}, State)), |
388 |
|
|
389 |
87 |
NPendingEndpoints = resolve_pending(NewEndpoints, State#state.enabled), |
390 |
87 |
log_endpoints_changes(Server, NPendingEndpoints, State), |
391 |
|
|
392 |
87 |
case PendingEndpoints of |
393 |
83 |
[] -> maybe_schedule_process_endpoint(NPendingEndpoints); |
394 |
4 |
_ -> already_scheduled |
395 |
|
end, |
396 |
|
|
397 |
87 |
FinalPendingEndpoints = PendingEndpoints ++ NPendingEndpoints, |
398 |
|
|
399 |
87 |
case FinalPendingEndpoints of |
400 |
|
[] -> |
401 |
4 |
no_log; |
402 |
|
_ -> |
403 |
83 |
?LOG_DEBUG(ls(#{what => gd_endpoints_update_scheduled, |
404 |
|
new_changes => NPendingEndpoints, |
405 |
|
pending_changes => FinalPendingEndpoints, |
406 |
|
new_changes_length => length(NPendingEndpoints), |
407 |
|
pending_changes_length => length(FinalPendingEndpoints)}, |
408 |
83 |
State)) |
409 |
|
end, |
410 |
87 |
State#state{ pending_endpoints = FinalPendingEndpoints, last_endpoints = NewEndpoints }. |
411 |
|
|
412 |
|
-spec get_endpoints(Server :: jid:lserver()) -> [mod_global_distrib_utils:endpoint()]. |
413 |
|
get_endpoints(Server) -> |
414 |
90 |
EndpointsToResolve = mod_global_distrib_mapping:endpoints(Server), |
415 |
90 |
mod_global_distrib_utils:resolve_endpoints(EndpointsToResolve). |
416 |
|
|
417 |
|
-spec resolve_pending(NewEndpointList :: [mod_global_distrib_utils:endpoint()], |
418 |
|
OldEnabled :: [endpoint_pid_tuple()]) -> |
419 |
|
endpoints_changes(). |
420 |
|
resolve_pending([], []) -> |
421 |
87 |
[]; |
422 |
|
resolve_pending([], [#endpoint_info{ endpoint = ToDisable } | RToDisable]) -> |
423 |
3 |
[{disable, ToDisable} | resolve_pending([], RToDisable)]; |
424 |
|
resolve_pending([MaybeToEnable | RNewEndpoints], OldEnabled) -> |
425 |
110 |
case lists:keytake(MaybeToEnable, #endpoint_info.endpoint, OldEnabled) of |
426 |
107 |
false -> [{enable, MaybeToEnable} | resolve_pending(RNewEndpoints, OldEnabled)]; |
427 |
3 |
{value, _, NOldEnabled} -> resolve_pending(RNewEndpoints, NOldEnabled) |
428 |
|
end. |
429 |
|
|
430 |
|
-spec log_endpoints_changes(Server :: jid:lserver(), |
431 |
|
EndpointsChanges :: endpoints_changes(), term()) -> any(). |
432 |
|
log_endpoints_changes(Server, [], State) -> |
433 |
4 |
?LOG_DEBUG(ls(#{what => gd_same_endpoints, server => Server, |
434 |
4 |
text => <<"No endpoint changes">>}, State)); |
435 |
|
log_endpoints_changes(Server, EndpointsChanges, State) -> |
436 |
83 |
?LOG_INFO(ls(#{what => gd_endpoints_changes, server => Server, |
437 |
107 |
to_enable => [ E || {enable, E} <- EndpointsChanges ], |
438 |
3 |
to_disable => [ E || {disable, E} <- EndpointsChanges ]}, State)). |
439 |
|
|
440 |
|
-spec enable(Endpoint :: endpoint(), State :: state()) -> {ok, state()} | {error, any()}. |
441 |
|
enable(Endpoint, #state{ disabled = Disabled, supervisor = Supervisor, |
442 |
|
enabled = Enabled, server = Server, conn_opts = ConnOpts } = State) -> |
443 |
105 |
case lists:keytake(Endpoint, #endpoint_info.endpoint, Disabled) of |
444 |
|
false -> |
445 |
105 |
case catch mod_global_distrib_server_sup:start_pool(Supervisor, Endpoint, |
446 |
|
Server, ConnOpts) of |
447 |
|
{ok, ConnPoolRef, ConnPoolPid} -> |
448 |
100 |
MonitorRef = monitor(process, ConnPoolPid), |
449 |
100 |
EndpointInfo = #endpoint_info{ |
450 |
|
endpoint = Endpoint, |
451 |
|
conn_pool_ref = ConnPoolRef, |
452 |
|
conn_pool_pid = ConnPoolPid, |
453 |
|
monitor_ref = MonitorRef |
454 |
|
}, |
455 |
100 |
{ok, State#state{ enabled = [EndpointInfo | Enabled] }}; |
456 |
|
Error -> |
457 |
5 |
{error, Error} |
458 |
|
end; |
459 |
|
{value, EndpointInfo, NewDisabled} -> |
460 |
:-( |
{ok, State#state{ enabled = [EndpointInfo | Enabled], disabled = NewDisabled }} |
461 |
|
end. |
462 |
|
|
463 |
|
-spec disable(Endpoint :: endpoint(), State :: state()) -> {ok, state()} | {error, any()}. |
464 |
|
disable(Endpoint, #state{ disabled = Disabled, enabled = Enabled } = State) -> |
465 |
3 |
{value, EndpointInfo, NewEnabled} = lists:keytake(Endpoint, #endpoint_info.endpoint, Enabled), |
466 |
3 |
{ok, State#state{ enabled = NewEnabled, disabled = [EndpointInfo | Disabled] }}. |
467 |
|
|
468 |
|
-spec stop_disabled(Endpoint :: endpoint(), State :: state()) -> any(). |
469 |
|
stop_disabled(Endpoint, State) -> |
470 |
1 |
case catch mod_global_distrib_server_sup:stop_pool( |
471 |
|
State#state.supervisor, Endpoint) of |
472 |
|
ok -> |
473 |
:-( |
ok; |
474 |
|
Error -> |
475 |
1 |
?LOG_ERROR(ls(#{what => gd_cannot_close_disabled_connection, |
476 |
|
reason => Error, endpoint => Endpoint}, |
477 |
:-( |
State)) |
478 |
|
end. |
479 |
|
|
480 |
|
state_info(#state{ |
481 |
|
server = Server, |
482 |
|
supervisor = Supervisor, |
483 |
|
enabled = Enabled, |
484 |
|
disabled = Disabled, |
485 |
|
pending_endpoints = PendingEndpoints, |
486 |
|
pending_gets = PendingGets, |
487 |
|
refresh_interval = RefreshInterval, |
488 |
|
refresh_interval_when_disconnected = DisRefreshInterval, |
489 |
|
gc_interval = GCInterval, |
490 |
|
pending_endpoints_listeners = PendingEndpointsListeners, |
491 |
|
last_endpoints = LastEndpoints |
492 |
|
}) -> |
493 |
463 |
#{server => Server, |
494 |
|
supervisor => Supervisor, |
495 |
|
enabled => Enabled, |
496 |
|
disabled => Disabled, |
497 |
|
pending_endpoints => PendingEndpoints, |
498 |
|
pending_gets => PendingGets, |
499 |
|
refresh_interval => RefreshInterval, |
500 |
|
refresh_interval_when_disconnected => DisRefreshInterval, |
501 |
|
gc_interval => GCInterval, |
502 |
|
pending_endpoints_listeners => PendingEndpointsListeners, |
503 |
|
last_endpoints => LastEndpoints}. |
504 |
|
|
505 |
|
|
506 |
|
%% Log State |
507 |
|
ls(LogMeta, State) -> |
508 |
453 |
maps:merge(state_info(State), LogMeta). |