./ct_report/coverage/mod_global_distrib_server_mgr.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 -module(mod_global_distrib_server_mgr).
17 -author('piotr.nosek@erlang-solutions.com').
18
19 -include("mongoose.hrl").
20
21 -behaviour(gen_server).
22
23 -export([start_link/2]).
24 -export([get_connection/1, ping_proc/1, get_state_info/1]).
25 -export([force_refresh/1, close_disabled/1]).
26 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
27
28 %% Debug
29 -export([get_enabled_endpoints/1, get_disabled_endpoints/1]).
30
31 -ignore_xref([close_disabled/1, force_refresh/1, get_disabled_endpoints/1,
32 get_enabled_endpoints/1, get_state_info/1, start_link/2]).
33
34 -type endpoint() :: mod_global_distrib_utils:endpoint().
35 -type endpoint_pid_tuple() :: {endpoint(), pid()}.
36
%% Bookkeeping entry for one endpoint's connection pool. Entries live in the
%% `enabled' and `disabled' lists of the manager state.
-record(endpoint_info, {
          endpoint      :: endpoint(),
          %% Pool reference; this is the value pick_connection_pool/1 returns.
          conn_pool_ref :: atom(),
          conn_pool_pid :: pid(),
          %% Monitor placed on conn_pool_pid; 'DOWN' messages are matched on it.
          monitor_ref   :: reference()
         }).
43
44 -type endpoint_info() :: #endpoint_info{}.
45 -type endpoints_changes() :: [{enable | disable, endpoint()}].
46
%% gen_server state of the per-server connection manager.
-record(state, {
          server :: jid:lserver(),
          supervisor :: pid(),
          %% Endpoints whose pools may be handed out by get_connection/1.
          enabled :: [endpoint_info()],
          %% Endpoints no longer advertised; their pools are stopped by
          %% the periodic `disabled_gc' or by close_disabled/1.
          disabled :: [endpoint_info()],
          %% Changes waiting to be applied, one per `process_pending_endpoint' message.
          pending_endpoints :: endpoints_changes(),
          %% Callers of `get_connection_pool' parked while `enabled' is empty.
          pending_gets :: queue:queue(tuple()),
          %% All intervals below are in seconds (converted with timer:seconds/1).
          refresh_interval :: pos_integer(),
          refresh_interval_when_disconnected :: pos_integer(),
          gc_interval :: pos_integer(),
          %% Used by force_refresh to block until refresh is fully done.
          %% Listeners are notified only once and then this list is cleared.
          %% Entries are gen_server `From' tuples (answered with gen_server:reply/2),
          %% not bare pids.
          pending_endpoints_listeners = [] :: [{pid(), term()}],
          %% Contains last result of get_endpoints
          last_endpoints :: [endpoint()] | undefined,
          conn_opts :: map()
         }).
64
65 -type state() :: #state{}.
66 %%--------------------------------------------------------------------
67 %% API
68 %%--------------------------------------------------------------------
69
%% Starts the manager for `Server' and registers it locally under the name
%% produced by mod_global_distrib_utils:server_to_mgr_name/1.
-spec start_link(Server :: jid:lserver(), ServerSup :: pid()) -> {ok, pid()} | {error, any()}.
start_link(Server, ServerSup) when is_binary(Server) ->
    MgrName = mod_global_distrib_utils:server_to_mgr_name(Server),
    InitArgs = [Server, ServerSup],
    gen_server:start_link({local, MgrName}, ?MODULE, InitArgs, []).
74
%% Blocks until all endpoint changes (if there are any) have been applied.
%% A call made while a forced refresh is already in progress does not trigger
%% another refresh; it returns 'ok' once the running one has finished.
-spec force_refresh(Server :: jid:lserver()) -> ok.
force_refresh(Server) ->
    Request = force_refresh,
    do_call(Server, Request).
81
%% Asks the manager to stop the pools of all currently disabled endpoints.
-spec close_disabled(Server :: jid:lserver()) -> ok.
close_disabled(Server) ->
    Request = close_disabled,
    do_call(Server, Request).
85
%% Obtains a connection pool reference from the manager and then takes a
%% connection from that pool. Crashes with badmatch if the manager does not
%% answer with `{ok, PoolRef}'.
-spec get_connection(Server :: jid:lserver()) -> {ok, pid()} | no_return().
get_connection(Server) ->
    {ok, PoolRef} = do_call(Server, get_connection_pool),
    Connection = cpool:get_connection(PoolRef),
    {ok, Connection}.
90
%% Named `ping_proc` rather than `ping` to stress that it checks the local
%% manager process, not a remote server.
%% Any failure of the call (including the manager not running) yields `pang'.
-spec ping_proc(Server :: jid:lserver()) -> pong | pang.
ping_proc(Server) ->
    Reply = (catch do_call(Server, ping_proc)),
    case Reply =:= pong of
        true -> pong;
        false -> pang
    end.
99
%% Returns the manager state as a map (see state_info/1 for the keys).
-spec get_state_info(Server :: jid:lserver()) -> map().
get_state_info(Server) ->
    Request = get_state_info,
    do_call(Server, Request).
103
104 %%--------------------------------------------------------------------
105 %% Debug API
106 %%--------------------------------------------------------------------
107
%% Debug helper: endpoints currently in the manager's `enabled' list.
-spec get_enabled_endpoints(Server :: jid:lserver()) -> [endpoint()].
get_enabled_endpoints(Server) ->
    Request = get_enabled_endpoints,
    do_call(Server, Request).
111
%% Debug helper: endpoints currently in the manager's `disabled' list.
-spec get_disabled_endpoints(Server :: jid:lserver()) -> [endpoint()].
get_disabled_endpoints(Server) ->
    Request = get_disabled_endpoints,
    do_call(Server, Request).
115
116 %%--------------------------------------------------------------------
117 %% gen_server callbacks
118 %%--------------------------------------------------------------------
119
%% gen_server callback. Reads the `connections' options of
%% mod_global_distrib_hosts_refresher, performs the first endpoint refresh
%% and arms the periodic refresh and GC timers.
init([Server, Supervisor]) ->
    process_flag(trap_exit, true),

    ConnOpts = gen_mod:get_module_opt(mod_global_distrib_utils:host_type(),
                                      mod_global_distrib_hosts_refresher,
                                      connections),
    %% All three options are mandatory; a missing key crashes init with badmatch.
    #{endpoint_refresh_interval := RefreshInterval,
      endpoint_refresh_interval_when_empty := DisRefreshInterval,
      disabled_gc_interval := GCInterval} = ConnOpts,

    InitialState = #state{server = Server,
                          supervisor = Supervisor,
                          enabled = [],
                          disabled = [],
                          pending_endpoints = [],
                          pending_gets = queue:new(),
                          refresh_interval = RefreshInterval,
                          refresh_interval_when_disconnected = DisRefreshInterval,
                          gc_interval = GCInterval,
                          conn_opts = ConnOpts},

    RefreshedState = refresh_connections(InitialState),
    schedule_refresh(RefreshedState),
    schedule_gc(RefreshedState),

    ?LOG_INFO(ls(#{what => gd_mgr_started}, RefreshedState)),
    {ok, RefreshedState}.
148
%% gen_server callback for synchronous requests.
%%
%% get_connection_pool: with no enabled endpoint the caller is parked in
%% `pending_gets' and answered later from handle_info(process_pending_get, _).
handle_call(get_connection_pool, From, #state{ enabled = [],
                                               pending_gets = Waiting } = State) ->
    Parked = queue:in(From, Waiting),
    {noreply, State#state{ pending_gets = Parked }};
handle_call(get_connection_pool, _From, #state{ enabled = Enabled } = State) ->
    PoolRef = pick_connection_pool(Enabled),
    {reply, {ok, PoolRef}, State};
%% force_refresh: the first caller triggers the refresh and is answered at once
%% if nothing is pending; otherwise it waits as a listener.
handle_call(force_refresh, From, #state{ pending_endpoints_listeners = [] } = State) ->
    Refreshed = refresh_connections(State),
    case Refreshed#state.pending_endpoints of
        [] ->
            {reply, ok, Refreshed};
        _ ->
            {noreply, Refreshed#state{ pending_endpoints_listeners = [From] }}
    end;
%% A forced refresh is already running: just join the listeners.
handle_call(force_refresh, From, #state{ pending_endpoints_listeners = Listeners } = State) ->
    {noreply, State#state{ pending_endpoints_listeners = [From | Listeners] }};
%% close_disabled: the state is returned unchanged here; entries leave the
%% `disabled' list only when the pool's 'DOWN' message is handled.
handle_call(close_disabled, _From, #state{ disabled = Disabled } = State) ->
    StopOne = fun(#endpoint_info{ endpoint = Endpoint }) ->
                      stop_disabled(Endpoint, State)
              end,
    lists:foreach(StopOne, Disabled),
    {reply, ok, State};
handle_call(get_enabled_endpoints, _From, State) ->
    Endpoints = [ Info#endpoint_info.endpoint || Info <- State#state.enabled ],
    {reply, Endpoints, State};
handle_call(get_disabled_endpoints, _From, State) ->
    Endpoints = [ Info#endpoint_info.endpoint || Info <- State#state.disabled ],
    {reply, Endpoints, State};
handle_call(ping_proc, _From, State) ->
    {reply, pong, State};
handle_call(get_state_info, _From, State) ->
    Info = state_info(State),
    {reply, Info, State}.
176
%% gen_server callback for casts.
%% `call_timeout' is sent by do_call/2 when a caller's gen_server:call timed
%% out; it is only logged. Any other cast is reported as unexpected.
handle_cast({call_timeout, FromPid, Msg}, State) ->
    LogMeta = #{what => gd_mgr_call_timeout,
                caller_pid => FromPid, caller_msg => Msg},
    ?LOG_WARNING(ls(LogMeta, State)),
    {noreply, State};
handle_cast(Msg, State) ->
    ?UNEXPECTED_CAST(Msg),
    {noreply, State}.
184
%% gen_server callback for timers, self-sent work items and pool monitors.
%%
%% refresh: periodic endpoint refresh; always re-arms the refresh timer.
handle_info(refresh, State) ->
    PendingBefore = State#state.pending_endpoints,
    NewState = case PendingBefore of
                   [] -> refresh_connections(State);
                   _ -> State % May occur if we are in the middle of force_refresh
               end,
    PendingAfter = NewState#state.pending_endpoints,
    case PendingBefore == PendingAfter of
        true ->
            ok;
        _ ->
            ?LOG_INFO(ls(#{what => gd_mgr_refresh,
                           text => <<"A list of pending_endpoints has changed in GD server manager">>,
                           pending_endpoints_before => PendingBefore,
                           pending_endpoints_after => PendingAfter}, State))
    end,
    schedule_refresh(NewState),
    {noreply, NewState};
%% disabled_gc: stop the pool of every disabled endpoint that reports
%% `no_connections'. The state is not modified here; entries are dropped when
%% the pool's 'DOWN' message arrives.
handle_info(disabled_gc, #state{ disabled = Disabled } = State) ->
    StopIfIdle =
        fun(#endpoint_info{ endpoint = Endpoint, conn_pool_ref = PoolRef }) ->
                try cpool:get_connection(PoolRef, no_wait_for_reconnect) of
                    _ConnPid ->
                        false
                catch
                    error:no_connections ->
                        stop_disabled(Endpoint, State),
                        {true, Endpoint}
                end
        end,
    Stopped = lists:filtermap(StopIfIdle, Disabled),
    case Stopped of
        [] ->
            ok;
        _ ->
            ?LOG_INFO(ls(#{what => gd_mgr_disabled_gc,
                           text => <<"GD server manager stops some inactive endpoints">>,
                           stopped_endpoints => Stopped}, State))
    end,
    schedule_gc(State),
    {noreply, State};
%% process_pending_get: answer one parked get_connection_pool caller per
%% message, as long as at least one endpoint is enabled.
handle_info(process_pending_get, #state{ enabled = [] } = State) ->
    {noreply, State};
handle_info(process_pending_get, #state{ pending_gets = Waiting,
                                         enabled = Enabled } = State) ->
    NewState =
        case queue:out(Waiting) of
            {{value, From}, StillWaiting} ->
                PoolRef = pick_connection_pool(Enabled),
                gen_server:reply(From, {ok, PoolRef}),
                State#state{ pending_gets = StillWaiting };
            {empty, _} ->
                State
        end,
    maybe_schedule_process_get(NewState),
    {noreply, NewState};
%% process_pending_endpoint: apply exactly one pending change per message.
%% A failed change is logged and dropped from the pending list.
handle_info(process_pending_endpoint,
            #state{ pending_endpoints = [{enable, Endpoint} | Remaining] } = State) ->
    AfterEnable =
        case catch enable(Endpoint, State) of
            {ok, EnabledState} ->
                ?LOG_INFO(ls(#{what => gd_endpoint_enabled,
                               text => <<"GD server manager enables pending endpoint">>,
                               endpoint => Endpoint}, State)),
                EnabledState;
            Error ->
                ?LOG_ERROR(ls(#{what => gd_endpoint_enabling_failed,
                                text => <<"GD server manager cannot enable endpoint">>,
                                endpoint => Endpoint, reason => Error}, State)),
                State
        end,

    %% A newly enabled endpoint may unblock parked get_connection_pool callers.
    maybe_schedule_process_get(AfterEnable),
    maybe_schedule_process_endpoint(Remaining),
    Advanced = AfterEnable#state{ pending_endpoints = Remaining },
    {noreply, maybe_notify_endpoints_listeners(Advanced)};
handle_info(process_pending_endpoint,
            #state{ pending_endpoints = [{disable, Endpoint} | Remaining] } = State) ->
    AfterDisable =
        case catch disable(Endpoint, State) of
            {ok, DisabledState} ->
                ?LOG_INFO(ls(#{what => gd_endpoint_disabled,
                               text => <<"GD server manager disables pending endpoint">>,
                               endpoint => Endpoint}, State)),
                DisabledState;
            Error ->
                ?LOG_ERROR(ls(#{what => gd_endpoint_disabling_failed,
                                text => <<"GD server manager cannot disable endpoint">>,
                                endpoint => Endpoint, reason => Error}, State)),
                State
        end,

    maybe_schedule_process_endpoint(Remaining),
    Advanced = AfterDisable#state{ pending_endpoints = Remaining },
    {noreply, maybe_notify_endpoints_listeners(Advanced)};
%% 'DOWN' from a monitored pool: forget the endpoint, whichever list holds it.
handle_info({'DOWN', MonitorRef, _Type, Pid, Reason}, #state{ enabled = Enabled,
                                                              disabled = Disabled } = State) ->
    RefPos = #endpoint_info.monitor_ref,
    {Endpoint, Type, NewState} =
        case lists:keytake(MonitorRef, RefPos, Enabled) of
            {value, #endpoint_info{ endpoint = EnabledEndpoint }, RestEnabled} ->
                {EnabledEndpoint, enabled, State#state{ enabled = RestEnabled }};
            false ->
                case lists:keytake(MonitorRef, RefPos, Disabled) of
                    {value, #endpoint_info{ endpoint = DisabledEndpoint }, RestDisabled} ->
                        {DisabledEndpoint, disabled, State#state{ disabled = RestDisabled }};
                    false ->
                        %% Unknown monitor: log the pid in place of the endpoint.
                        {Pid, unknown, State}
                end
        end,

    %% A plain `shutdown' is reported as `normal' in the log entry.
    LoggedReason =
        case Reason of
            shutdown -> normal;
            _Other -> Reason
        end,
    ?LOG_INFO(ls(#{what => gd_endpoint_closed,
                   text => <<"Disconnected from a GD endpoint">>,
                   type => Type, endpoint => Endpoint, reason => LoggedReason}, State)),
    {noreply, NewState};
handle_info(Msg, State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State}.
305
%% gen_server callback. The state needs no conversion between versions.
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
308
%% gen_server callback. Currently performs no cleanup.
terminate(_Reason, _State) ->
    %% TODO: Cleanup
    ok.
312
313 %%--------------------------------------------------------------------
314 %% Internal functions
315 %%--------------------------------------------------------------------
316
%% Synchronous call to the manager registered for `Server'.
%% If the call exits with a timeout, the manager is notified with a
%% `call_timeout' cast (so it can log the event) and the original exit is
%% re-raised with its stacktrace.
-spec do_call(Server :: jid:lserver(), Msg :: any()) -> any().
do_call(Server, Msg) ->
    Mgr = mod_global_distrib_utils:server_to_mgr_name(Server),
    try
        gen_server:call(Mgr, Msg)
    catch
        exit:{timeout, _} = ExitReason:Stack ->
            %% Best-effort notification; it must never mask the original exit.
            catch gen_server:cast(Mgr, {call_timeout, self(), Msg}),
            erlang:raise(exit, ExitReason, Stack)
    end.
326
%% Arms the next `refresh' timer. The regular interval is used when the last
%% get_endpoints result was a non-empty list; otherwise (empty list or not yet
%% fetched) the "when disconnected" interval applies, which by default retries
%% more often.
-spec schedule_refresh(State :: state()) -> state().
schedule_refresh(#state{ last_endpoints = LastEndpoints } = State) ->
    Interval =
        case LastEndpoints of
            [_|_] -> State#state.refresh_interval;
            _ -> State#state.refresh_interval_when_disconnected
        end,
    do_schedule_refresh(State, Interval).
333
%% Sends `refresh' to the calling process after `Interval' seconds and
%% returns `State' untouched.
do_schedule_refresh(State, Interval) ->
    DelayMs = timer:seconds(Interval),
    _TimerRef = erlang:send_after(DelayMs, self(), refresh),
    State.
337
%% Sends `disabled_gc' to the calling process after `gc_interval' seconds and
%% returns the state untouched.
-spec schedule_gc(State :: state()) -> state().
schedule_gc(#state{ gc_interval = Interval } = State) ->
    DelayMs = timer:seconds(Interval),
    _TimerRef = erlang:send_after(DelayMs, self(), disabled_gc),
    State.
342
%% Sends `process_pending_endpoint' to the calling process unless the list of
%% pending changes is empty, in which case nothing is sent and `nop' is returned.
-spec maybe_schedule_process_endpoint(PendingEndpoints :: list()) -> any().
maybe_schedule_process_endpoint(PendingEndpoints) ->
    case PendingEndpoints of
        [] -> nop;
        _ -> self() ! process_pending_endpoint
    end.
348
%% Sends `process_pending_get' to the calling process when callers are parked
%% in `pending_gets'. Returns `nop' when the queue is empty.
%% Raises error `enabled_is_empty' if there are parked callers but no enabled
%% endpoint, since such a request could never be served.
-spec maybe_schedule_process_get(state()) -> any().
maybe_schedule_process_get(#state{ pending_gets = PendingGets, enabled = Enabled }) ->
    %% queue:queue() is opaque: ask the queue module rather than comparing
    %% against queue:new() structurally.
    case queue:is_empty(PendingGets) of
        true ->
            nop;
        false ->
            Enabled =/= [] orelse error(enabled_is_empty),
            self() ! process_pending_get
    end.
357
%% Once no endpoint changes are pending, replies `ok' to every waiting
%% force_refresh caller and clears the listener list. While changes are still
%% pending the state is returned as is.
-spec maybe_notify_endpoints_listeners(state()) -> state().
maybe_notify_endpoints_listeners(#state{ pending_endpoints = [],
                                         pending_endpoints_listeners = Listeners } = State) ->
    _ = [ gen_server:reply(Listener, ok) || Listener <- Listeners ],
    State#state{ pending_endpoints_listeners = [] };
maybe_notify_endpoints_listeners(StillPending) ->
    StillPending.
365
%% Picks one enabled endpoint uniformly at random and returns its pool
%% reference (the `conn_pool_ref' field, an atom; not a pid).
%% `Enabled' must be non-empty: rand:uniform(0) raises, and callers only
%% invoke this after checking that the list has at least one element.
-spec pick_connection_pool(Enabled :: [endpoint_info(), ...]) -> atom().
pick_connection_pool(Enabled) ->
    Index = rand:uniform(length(Enabled)),
    #endpoint_info{ conn_pool_ref = PoolRef } = lists:nth(Index, Enabled),
    PoolRef.
370
%% Fetches the current endpoint list for the server, computes which endpoints
%% have to be enabled or disabled relative to the `enabled' list, and appends
%% those changes to `pending_endpoints'. Processing is kicked off only if no
%% changes were pending before (otherwise it is already scheduled).
%% Also records the fetched list in `last_endpoints'.
-spec refresh_connections(State :: state()) -> state().
refresh_connections(#state{ server = Server, pending_endpoints = AlreadyPending,
                            last_endpoints = LastEndpoints } = State) ->
    ?LOG_DEBUG(ls(#{what => gd_refreshing_endpoints}, State)),
    Fetched = get_endpoints(Server),
    case Fetched =:= LastEndpoints of
        true ->
            nothing_new;
        false ->
            ?LOG_INFO(ls(#{what => gd_endpoints_change,
                           old_endpoints => LastEndpoints,
                           new_endpoints => Fetched}, State))
    end,
    ?LOG_DEBUG(ls(#{what => gd_fetched_endpoints, fetched_endpoints => Fetched}, State)),

    NewChanges = resolve_pending(Fetched, State#state.enabled),
    log_endpoints_changes(Server, NewChanges, State),

    case AlreadyPending of
        [] -> maybe_schedule_process_endpoint(NewChanges);
        _ -> already_scheduled
    end,

    AllPending = AlreadyPending ++ NewChanges,

    case AllPending of
        [] ->
            no_log;
        _ ->
            ?LOG_DEBUG(ls(#{what => gd_endpoints_update_scheduled,
                            new_changes => NewChanges,
                            pending_changes => AllPending,
                            new_changes_length => length(NewChanges),
                            pending_changes_length => length(AllPending)},
                          State))
    end,
    State#state{ pending_endpoints = AllPending, last_endpoints = Fetched }.
408
%% Looks up the endpoints advertised for `Server' in the mapping module and
%% passes them through mod_global_distrib_utils:resolve_endpoints/1.
-spec get_endpoints(Server :: jid:lserver()) -> [mod_global_distrib_utils:endpoint()].
get_endpoints(Server) ->
    Advertised = mod_global_distrib_mapping:endpoints(Server),
    mod_global_distrib_utils:resolve_endpoints(Advertised).
413
%% Computes the changes needed to move from the currently enabled endpoints to
%% `NewEndpointList':
%%   - an endpoint in the new list that is not enabled yields `{enable, E}',
%%   - an enabled endpoint missing from the new list yields `{disable, E}',
%%   - endpoints present in both produce no change.
%% Enables come first (in the order of the new list), followed by disables.
-spec resolve_pending(NewEndpointList :: [mod_global_distrib_utils:endpoint()],
                      OldEnabled :: [endpoint_info()]) ->
    endpoints_changes().
resolve_pending([], []) ->
    [];
resolve_pending([], [#endpoint_info{ endpoint = ToDisable } | RToDisable]) ->
    %% New list exhausted: whatever is still enabled has to go.
    [{disable, ToDisable} | resolve_pending([], RToDisable)];
resolve_pending([MaybeToEnable | RNewEndpoints], OldEnabled) ->
    case lists:keytake(MaybeToEnable, #endpoint_info.endpoint, OldEnabled) of
        false ->
            [{enable, MaybeToEnable} | resolve_pending(RNewEndpoints, OldEnabled)];
        {value, _AlreadyEnabled, NOldEnabled} ->
            %% Already enabled: drop it so it is not reported for disabling.
            resolve_pending(RNewEndpoints, NOldEnabled)
    end.
426
%% Logs the outcome of a refresh: a debug entry when nothing changed,
%% otherwise an info entry listing endpoints to enable and to disable.
-spec log_endpoints_changes(Server :: jid:lserver(),
                            EndpointsChanges :: endpoints_changes(), term()) -> any().
log_endpoints_changes(Server, EndpointsChanges, State) ->
    case EndpointsChanges of
        [] ->
            ?LOG_DEBUG(ls(#{what => gd_same_endpoints, server => Server,
                            text => <<"No endpoint changes">>}, State));
        _ ->
            ?LOG_INFO(ls(#{what => gd_endpoints_changes, server => Server,
                           to_enable => [ E || {enable, E} <- EndpointsChanges ],
                           to_disable => [ E || {disable, E} <- EndpointsChanges ]}, State))
    end.
436
%% Moves `Endpoint' into the `enabled' list.
%% If the endpoint is still in `disabled', its existing pool entry is reused.
%% Otherwise a new pool is started under the server supervisor and monitored;
%% any failure to start it is returned as `{error, Reason}'.
-spec enable(Endpoint :: endpoint(), State :: state()) -> {ok, state()} | {error, any()}.
enable(Endpoint, #state{ disabled = Disabled, supervisor = Supervisor,
                         enabled = Enabled, server = Server, conn_opts = ConnOpts } = State) ->
    case lists:keytake(Endpoint, #endpoint_info.endpoint, Disabled) of
        {value, Reused, RestDisabled} ->
            {ok, State#state{ enabled = [Reused | Enabled], disabled = RestDisabled }};
        false ->
            StartResult = (catch mod_global_distrib_server_sup:start_pool(Supervisor, Endpoint,
                                                                          Server, ConnOpts)),
            case StartResult of
                {ok, PoolRef, PoolPid} ->
                    Info = #endpoint_info{endpoint = Endpoint,
                                          conn_pool_ref = PoolRef,
                                          conn_pool_pid = PoolPid,
                                          monitor_ref = monitor(process, PoolPid)},
                    {ok, State#state{ enabled = [Info | Enabled] }};
                Error ->
                    {error, Error}
            end
    end.
459
%% Moves `Endpoint' from `enabled' to the head of `disabled'. The pool keeps
%% running. Crashes with badmatch if the endpoint is not currently enabled.
-spec disable(Endpoint :: endpoint(), State :: state()) -> {ok, state()} | {error, any()}.
disable(Endpoint, #state{ disabled = Disabled, enabled = Enabled } = State) ->
    {value, Info, StillEnabled} =
        lists:keytake(Endpoint, #endpoint_info.endpoint, Enabled),
    NewState = State#state{ enabled = StillEnabled, disabled = [Info | Disabled] },
    {ok, NewState}.
464
%% Asks the server supervisor to stop the pool of `Endpoint'.
%% Any result other than `ok' (including a caught exception) is logged as an
%% error and not propagated.
-spec stop_disabled(Endpoint :: endpoint(), State :: state()) -> any().
stop_disabled(Endpoint, State) ->
    Supervisor = State#state.supervisor,
    StopResult = (catch mod_global_distrib_server_sup:stop_pool(Supervisor, Endpoint)),
    case StopResult of
        ok ->
            ok;
        Error ->
            ?LOG_ERROR(ls(#{what => gd_cannot_close_disabled_connection,
                            reason => Error, endpoint => Endpoint},
                          State))
    end.
476
%% Converts the state record into a map keyed by field name.
%% `conn_opts' is deliberately not part of the map.
state_info(#state{} = State) ->
    #{server => State#state.server,
      supervisor => State#state.supervisor,
      enabled => State#state.enabled,
      disabled => State#state.disabled,
      pending_endpoints => State#state.pending_endpoints,
      pending_gets => State#state.pending_gets,
      refresh_interval => State#state.refresh_interval,
      refresh_interval_when_disconnected => State#state.refresh_interval_when_disconnected,
      gc_interval => State#state.gc_interval,
      pending_endpoints_listeners => State#state.pending_endpoints_listeners,
      last_endpoints => State#state.last_endpoints}.
501
502
%% Log State: builds a log report from the state map plus `LogMeta'.
%% Keys in `LogMeta' win over state keys of the same name.
ls(LogMeta, State) ->
    StateMap = state_info(State),
    maps:merge(StateMap, LogMeta).
Line Hits Source