./ct_report/coverage/mod_global_distrib_server_mgr.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 -module(mod_global_distrib_server_mgr).
17 -author('piotr.nosek@erlang-solutions.com').
18
19 -include("mongoose.hrl").
20
21 -behaviour(gen_server).
22
23 -export([start_link/2]).
24 -export([get_connection/1, ping_proc/1, get_state_info/1]).
25 -export([force_refresh/1, close_disabled/1]).
26 -export([init/1, handle_continue/2, handle_call/3, handle_cast/2,
27 handle_info/2, terminate/2, code_change/3]).
28
29 %% Debug
30 -export([get_enabled_endpoints/1, get_disabled_endpoints/1]).
31
32 -ignore_xref([close_disabled/1, force_refresh/1, get_disabled_endpoints/1,
33 get_enabled_endpoints/1, get_state_info/1, start_link/2]).
34
35 -type endpoint() :: mod_global_distrib_utils:endpoint().
36 -type endpoint_pid_tuple() :: {endpoint(), pid()}.
37
%% Bookkeeping entry for one endpoint and its connection pool.
%% Instances are built in enable/2 from the result of
%% mod_global_distrib_server_sup:start_pool/4 plus a monitor on the pool pid.
38 -record(endpoint_info, {
39 endpoint :: endpoint(),
%% Reference handed to cpool:get_connection/1,2.
40 conn_pool_ref :: atom(),
41 conn_pool_pid :: pid(),
%% Monitor on conn_pool_pid; matched against 'DOWN' messages in handle_info/2.
42 monitor_ref :: reference()
43 }).
44
45 -type endpoint_info() :: #endpoint_info{}.
46 -type endpoints_changes() :: [{enable | disable, endpoint()}].
47
%% gen_server state of the manager process.
48 -record(state, {
49 server :: jid:lserver(),
%% Passed to mod_global_distrib_server_sup:start_pool/4 and stop_pool/2.
50 supervisor :: pid(),
%% Endpoints whose pools may be handed out by pick_connection_pool/1.
51 enabled :: [endpoint_info()],
%% Endpoints moved out of `enabled` by disable/2; their pools are stopped
%% later by the disabled_gc timer or by close_disabled.
52 disabled :: [endpoint_info()],
%% Enable/disable changes still to be applied, one per
%% process_pending_endpoint message.
53 pending_endpoints :: endpoints_changes(),
%% `From` handles of get_connection_pool calls that arrived while `enabled`
%% was empty.
54 pending_gets :: queue:queue(tuple()),
%% The three intervals below are passed through timer:seconds/1,
%% i.e. they are expressed in seconds.
55 refresh_interval :: pos_integer(),
56 refresh_interval_when_disconnected :: pos_integer(),
57 gc_interval :: pos_integer(),
58 %% Used by force_refresh to block until refresh is fully done.
59 %% Listeners are notified only once and then this list is cleared.
%% NOTE(review): the values stored here are the `From` arguments of
%% handle_call/3 (later passed to gen_server:reply/2), not bare pids --
%% confirm whether the type should be [gen_server:from()].
60 pending_endpoints_listeners = [] :: [pid()],
61 %% Contains the last result of get_endpoints
62 last_endpoints :: [endpoint()] | undefined,
%% The `connections` options map read in init/1; forwarded to start_pool/4.
63 conn_opts :: map()
64 }).
65
66 -type state() :: #state{}.
67 %%--------------------------------------------------------------------
68 %% API
69 %%--------------------------------------------------------------------
70
%% Starts the manager for `Server` and links it to the caller.
%% The process is registered locally under the name returned by
%% mod_global_distrib_utils:server_to_mgr_name/1; `ServerSup` is stored in the
%% state as the supervisor used to start and stop connection pools.
71 -spec start_link(Server :: jid:lserver(), ServerSup :: pid()) -> {ok, pid()} | {error, any()}.
72 start_link(Server, ServerSup) when is_binary(Server) ->
73 80 Name = mod_global_distrib_utils:server_to_mgr_name(Server),
74 80 gen_server:start_link({local, Name}, ?MODULE, [Server, ServerSup], []).
75
76 %% Will return only when endpoints changes (if there are any) are fully applied.
77 %% If force refresh is already in progress, new concurrent calls won't trigger another refresh
78 %% but will return 'ok' when the current one is finished.
%% Implemented as a synchronous `force_refresh` call to the manager of `Server`.
79 -spec force_refresh(Server :: jid:lserver()) -> ok.
80 force_refresh(Server) ->
81 10 do_call(Server, force_refresh).
82
%% Asks the manager of `Server` to stop the pool of every endpoint currently
%% in its disabled list (synchronous `close_disabled` call).
83 -spec close_disabled(Server :: jid:lserver()) -> ok.
84 close_disabled(Server) ->
85
:-(
do_call(Server, close_disabled).
86
%% Returns `{ok, Connection}` for `Server`.
%% First obtains a pool reference from the manager (`get_connection_pool`
%% call), then asks cpool for a connection from that pool.
%% Fails with badmatch if the manager replies with anything other than
%% `{ok, PoolRef}`; exits raised by the call itself propagate to the caller.
87 -spec get_connection(Server :: jid:lserver()) -> {ok, pid()} | no_return().
88 get_connection(Server) ->
89 47 {ok, CPoolRef} = do_call(Server, get_connection_pool),
90 36 {ok, cpool:get_connection(CPoolRef)}.
91
92 %% `ping_proc` instead of just `ping` to emphasize that this call does not ping
93 %% some remote server but the manager process instead
%% Returns `pong` only when the manager answers `pong`; any other result,
%% including an exception caught by `catch`, is reported as `pang`.
94 -spec ping_proc(Server :: jid:lserver()) -> pong | pang.
95 ping_proc(Server) ->
96 569 case catch do_call(Server, ping_proc) of
97 502 pong -> pong;
98 67 _Error -> pang
99 end.
100
%% Returns the manager state of `Server` as a map (see state_info/1 for keys).
101 -spec get_state_info(Server :: jid:lserver()) -> map().
102 get_state_info(Server) ->
103 10 do_call(Server, get_state_info).
104
105 %%--------------------------------------------------------------------
106 %% Debug API
107 %%--------------------------------------------------------------------
108
%% Debug helper: returns the endpoints currently in the manager's enabled list.
109 -spec get_enabled_endpoints(Server :: jid:lserver()) -> [endpoint()].
110 get_enabled_endpoints(Server) ->
111 16 do_call(Server, get_enabled_endpoints).
112
%% Debug helper: returns the endpoints currently in the manager's disabled list.
113 -spec get_disabled_endpoints(Server :: jid:lserver()) -> [endpoint()].
114 get_disabled_endpoints(Server) ->
115 16 do_call(Server, get_disabled_endpoints).
116
117 %%--------------------------------------------------------------------
118 %% gen_server callbacks
119 %%--------------------------------------------------------------------
120
%% gen_server init callback.
%% Builds the initial state with empty endpoint lists and defers the first
%% endpoint refresh to handle_continue/2 so that init/1 returns quickly.
121 init([Server, Supervisor]) ->
%% Exit signals are trapped, so they arrive as messages in handle_info/2.
122 80 process_flag(trap_exit, true),
123
124 80 HostType = mod_global_distrib_utils:host_type(),
%% The `connections` option of mod_global_distrib_hosts_refresher must contain
%% the three interval keys matched below; otherwise init crashes with badmatch.
125 80 ConnOpts = gen_mod:get_module_opt(HostType, mod_global_distrib_hosts_refresher, connections),
126 80 #{endpoint_refresh_interval := RefreshInterval,
127 endpoint_refresh_interval_when_empty := DisRefreshInterval,
128 disabled_gc_interval := GCInterval} = ConnOpts,
129
130 80 State = #state{
131 server = Server,
132 supervisor = Supervisor,
133 enabled = [],
134 disabled = [],
135 pending_endpoints = [],
136 pending_gets = queue:new(),
137 refresh_interval = RefreshInterval,
138 refresh_interval_when_disconnected = DisRefreshInterval,
139 gc_interval = GCInterval,
140 conn_opts = ConnOpts
141 },
142
143 80 ?LOG_INFO(ls(#{what => gd_mgr_started}, State)),
144 80 {ok, State, {continue, initial_refresh}}.
145
%% Runs right after init/1: performs the first endpoint refresh and arms the
%% periodic `refresh` and `disabled_gc` timers.
%% The return values of schedule_refresh/1 and schedule_gc/1 are ignored;
%% both return the state they were given.
146 handle_continue(initial_refresh, State) ->
147 80 State2 = refresh_connections(State),
148 77 schedule_refresh(State2),
149 77 schedule_gc(State2),
150 77 {noreply, State2}.
151
%% gen_server call handler.
%%
%% get_connection_pool with no enabled endpoint: the caller is not answered
%% now; its `From` is queued and answered later from the process_pending_get
%% clause of handle_info/2.
152 handle_call(get_connection_pool, From, #state{ enabled = [],
153 pending_gets = PendingGets } = State) ->
154 12 {noreply, State#state{ pending_gets = queue:in(From, PendingGets) }};
%% get_connection_pool with at least one enabled endpoint: reply with a
%% randomly chosen pool reference.
155 handle_call(get_connection_pool, _From, #state{ enabled = Enabled } = State) ->
156 24 {reply, {ok, pick_connection_pool(Enabled)}, State};
%% force_refresh when nobody is waiting yet: refresh now; reply immediately if
%% nothing is pending afterwards, otherwise park the caller as the first
%% listener until all pending changes have been processed.
157 handle_call(force_refresh, From, #state{ pending_endpoints_listeners = [] } = State) ->
158 10 State2 = refresh_connections(State),
159 10 case State2#state.pending_endpoints of
160 1 [] -> {reply, ok, State2};
161 9 _ -> {noreply, State2#state{ pending_endpoints_listeners = [From] }}
162 end;
%% force_refresh while other callers are already waiting: join the listener
%% list without triggering another refresh.
163 handle_call(force_refresh, From, #state{ pending_endpoints_listeners = Listeners } = State) ->
164
:-(
{noreply, State#state{ pending_endpoints_listeners = [From | Listeners] }};
%% close_disabled: ask the supervisor to stop the pool of every disabled
%% endpoint. The state is returned unchanged by this clause.
165 handle_call(close_disabled, _From, #state{ disabled = Disabled } = State) ->
166
:-(
lists:foreach(
167 fun(#endpoint_info{ endpoint = Endpoint }) ->
168
:-(
stop_disabled(Endpoint, State)
169 end, Disabled),
170
:-(
{reply, ok, State};
%% Debug queries: project the endpoint field out of the enabled/disabled lists.
171 handle_call(get_enabled_endpoints, _From, State) ->
172 16 {reply, [ CI#endpoint_info.endpoint || CI <- State#state.enabled ], State};
173 handle_call(get_disabled_endpoints, _From, State) ->
174 16 {reply, [ CI#endpoint_info.endpoint || CI <- State#state.disabled ], State};
%% Liveness probe of this process.
175 handle_call(ping_proc, _From, State) ->
176 502 {reply, pong, State};
177 handle_call(get_state_info, _From, State) ->
178 10 {reply, state_info(State), State}.
179
%% gen_server cast handler.
%% `{call_timeout, FromPid, Msg}` is cast by do_call/2 when a call to this
%% manager exits with a timeout; it is only logged as a warning here.
180 handle_cast({call_timeout, FromPid, Msg}, State) ->
181
:-(
?LOG_WARNING(ls(#{what => gd_mgr_call_timeout,
182
:-(
caller_pid => FromPid, caller_msg => Msg}, State)),
183
:-(
{noreply, State};
%% Any other cast is reported through ?UNEXPECTED_CAST and ignored.
184 handle_cast(Msg, State) ->
185
:-(
?UNEXPECTED_CAST(Msg),
186
:-(
{noreply, State}.
187
%% gen_server info handler: timers, self-sent work items and pool monitors.
%%
%% refresh (periodic timer): re-read the endpoints unless changes from an
%% earlier refresh are still pending, log when the pending list changed, and
%% re-arm the timer.
188 handle_info(refresh, State) ->
189
:-(
State2 = case State#state.pending_endpoints of
190
:-(
[] -> refresh_connections(State);
191
:-(
_ -> State % May occur if we are in the middle of force_refresh
192 end,
193
:-(
case State#state.pending_endpoints == State2#state.pending_endpoints of
194 true ->
195
:-(
ok;
196 _ ->
197
:-(
?LOG_INFO(ls(#{what => gd_mgr_refresh,
198 text => <<"A list of pending_endpoints has changed in GD server manager">>,
199 pending_endpoints_before => State#state.pending_endpoints,
200
:-(
pending_endpoints_after => State2#state.pending_endpoints}, State))
201 end,
202
:-(
schedule_refresh(State2),
203
:-(
{noreply, State2};
%% disabled_gc (periodic timer): probe every disabled pool; a pool for which
%% cpool:get_connection/2 raises error:no_connections is stopped.
%% This clause returns the state unchanged: entries leave the `disabled` list
%% only in the 'DOWN' clause below, once the pool's monitor fires.
204 handle_info(disabled_gc, #state{ disabled = Disabled } = State) ->
205 22 StoppedEndpoints = lists:flatmap(
206 fun(#endpoint_info{ endpoint = Endpoint, conn_pool_ref = ConnPool }) ->
207 3 try cpool:get_connection(ConnPool, no_wait_for_reconnect) of
208 _ConnPid ->
209 2 []
210 catch error:no_connections ->
211 1 stop_disabled(Endpoint, State),
212 1 [Endpoint]
213 end
214 end, Disabled),
215 22 case StoppedEndpoints of
216 [] ->
217 21 ok;
218 _ ->
219 1 ?LOG_INFO(ls(#{what => gd_mgr_disabled_gc,
220 text => <<"GD server manager stops some inactive endpoints">>,
221
:-(
stopped_endpoints => StoppedEndpoints}, State))
222 end,
223 22 schedule_gc(State),
224 22 {noreply, State};
%% process_pending_get with no enabled endpoint: nothing can be served yet,
%% queued callers keep waiting.
225 handle_info(process_pending_get, #state{ enabled = [] } = State) ->
226
:-(
{noreply, State};
%% process_pending_get: answer one queued get_connection_pool caller per
%% message and re-send the message to self while callers remain queued.
227 handle_info(process_pending_get, #state{ pending_gets = PendingGets,
228 enabled = Enabled } = State) ->
229 12 NState =
230 case queue:out(PendingGets) of
231 {{value, From}, NewPendingGets} ->
232 12 CPoolRef = pick_connection_pool(Enabled),
233 12 gen_server:reply(From, {ok, CPoolRef}),
234 12 State#state{ pending_gets = NewPendingGets };
235 {empty, _} ->
236
:-(
State
237 end,
238 12 maybe_schedule_process_get(NState),
239 12 {noreply, NState};
%% process_pending_endpoint, head of the list is {enable, Endpoint}:
%% try to enable it. Whether or not that succeeds, the change is removed from
%% the pending list; on failure the previous state is kept and an error logged.
%% Both log calls are given the pre-change `State`.
240 handle_info(process_pending_endpoint,
241 #state{ pending_endpoints = [{enable, Endpoint} | RPendingEndpoints] } = State) ->
242 105 State2 =
243 case catch enable(Endpoint, State) of
244 {ok, NState0} ->
245 100 ?LOG_INFO(ls(#{what => gd_endpoint_enabled,
246 text => <<"GD server manager enables pending endpoint">>,
247
:-(
endpoint => Endpoint}, State)),
248 100 NState0;
249 Error ->
250 5 ?LOG_ERROR(ls(#{what => gd_endpoint_enabling_failed,
251 text => <<"GD server manager cannot enable endpoint">>,
252
:-(
endpoint => Endpoint, reason => Error}, State)),
253 5 State
254 end,
255
%% NOTE(review): if enabling failed, no endpoint is enabled and gets are
%% queued, maybe_schedule_process_get/1 raises error(enabled_is_empty) here --
%% confirm that crashing the manager in that situation is intended.
256 105 maybe_schedule_process_get(State2),
257 105 maybe_schedule_process_endpoint(RPendingEndpoints),
258 105 State3 = State2#state{ pending_endpoints = RPendingEndpoints },
259 105 State4 = maybe_notify_endpoints_listeners(State3),
260 105 {noreply, State4};
%% process_pending_endpoint, head of the list is {disable, Endpoint}:
%% same flow as above using disable/2; pending gets are not rescheduled here.
261 handle_info(process_pending_endpoint,
262 #state{ pending_endpoints = [{disable, Endpoint} | RPendingEndpoints] } = State) ->
263 3 State2 =
264 case catch disable(Endpoint, State) of
265 {ok, NState0} ->
266 3 ?LOG_INFO(ls(#{what => gd_endpoint_disabled,
267 text => <<"GD server manager disables pending endpoint">>,
268
:-(
endpoint => Endpoint}, State)),
269 3 NState0;
270 Error ->
271
:-(
?LOG_ERROR(ls(#{what => gd_endpoint_disabling_failed,
272 text => <<"GD server manager cannot disable endpoint">>,
273
:-(
endpoint => Endpoint, reason => Error}, State)),
274
:-(
State
275 end,
276
277 3 maybe_schedule_process_endpoint(RPendingEndpoints),
278 3 State3 = State2#state{ pending_endpoints = RPendingEndpoints },
279 3 State4 = maybe_notify_endpoints_listeners(State3),
280 3 {noreply, State4};
%% 'DOWN' from a monitored pool: drop the matching entry from `enabled`, or
%% failing that from `disabled`. If the reference matches neither list, the
%% state is left as is and the pid is logged in place of the endpoint.
281 handle_info({'DOWN', MonitorRef, _Type, Pid, Reason}, #state{ enabled = Enabled,
282 disabled = Disabled } = State) ->
283 98 {Endpoint, Type, NState} =
284 case lists:keytake(MonitorRef, #endpoint_info.monitor_ref, Enabled) of
285 {value, #endpoint_info{ endpoint = Endpoint0 }, NEnabled} ->
286 95 {Endpoint0, enabled, State#state{ enabled = NEnabled }};
287 false ->
288 3 case lists:keytake(MonitorRef, #endpoint_info.monitor_ref, Disabled) of
289 {value, #endpoint_info{ endpoint = Endpoint0 }, NDisabled} ->
290 3 {Endpoint0, disabled, State#state{ disabled = NDisabled }};
291 false ->
292
:-(
{Pid, unknown, State}
293 end
294 end,
295
%% A `shutdown` exit reason is reported as `normal` in the log entry.
296 98 Reason2 =
297 case Reason of
298 98 shutdown -> normal;
299
:-(
_Other -> Reason
300 end,
301 98 ?LOG_INFO(ls(#{what => gd_endpoint_closed,
302 text => <<"Disconnected from a GD endpoint">>,
303
:-(
type => Type, endpoint => Endpoint, reason => Reason2}, State)),
304 98 {noreply, NState};
%% Anything else is reported through ?UNEXPECTED_INFO and ignored.
305 handle_info(Msg, State) ->
306
:-(
?UNEXPECTED_INFO(Msg),
307
:-(
{noreply, State}.
308
%% gen_server code_change callback: the state is carried over unchanged.
309 code_change(_OldVsn, State, _Extra) ->
310
:-(
{ok, State}.
311
%% gen_server terminate callback: currently performs no cleanup.
312 terminate(_Reason, _State) ->
313 %% TODO: Cleanup
314 80 ok.
315
316 %%--------------------------------------------------------------------
317 %% Internal functions
318 %%--------------------------------------------------------------------
319
%% Performs a synchronous call to the manager registered for `Server`.
%% When the call exits with `{timeout, _}`, the manager is notified with a
%% `{call_timeout, CallerPid, Msg}` cast (failures of the cast are ignored) and
%% the original exit is re-raised with its stacktrace. Other exceptions are
%% not intercepted.
320 -spec do_call(Server :: jid:lserver(), Msg :: any()) -> any().
321 do_call(Server, Msg) ->
322 668 MgrName = mod_global_distrib_utils:server_to_mgr_name(Server),
323 668 try
324 668 gen_server:call(MgrName, Msg)
325 catch exit:{timeout,_} = Reason:Stacktrace ->
326
:-(
catch gen_server:cast(MgrName, {call_timeout, self(), Msg}),
327
:-(
erlang:raise(exit, Reason, Stacktrace)
328 end.
329
%% Arms the next `refresh` timer and returns the state unchanged.
%% Uses refresh_interval when the last fetched endpoint list is non-empty and
%% refresh_interval_when_disconnected otherwise (empty list or undefined).
330 -spec schedule_refresh(State :: state()) -> state().
331 schedule_refresh(#state{ refresh_interval = Interval, last_endpoints = [_|_] } = State) ->
332 74 do_schedule_refresh(State, Interval);
333 schedule_refresh(#state{ refresh_interval_when_disconnected = Interval } = State) ->
334 %% Try more often by default when get_endpoints returns empty list
335 3 do_schedule_refresh(State, Interval).
336
%% Sends `refresh` to self after `Interval` seconds; returns `State` as given.
337 do_schedule_refresh(State, Interval) ->
338 77 erlang:send_after(timer:seconds(Interval), self(), refresh),
339 77 State.
340
%% Sends `disabled_gc` to self after gc_interval seconds; returns the state
%% unchanged.
341 -spec schedule_gc(State :: state()) -> state().
342 schedule_gc(#state{ gc_interval = Interval } = State) ->
343 99 erlang:send_after(timer:seconds(Interval), self(), disabled_gc),
344 99 State.
345
%% Sends `process_pending_endpoint` to self when the given list of pending
%% changes is non-empty; returns `nop` for an empty list.
346 -spec maybe_schedule_process_endpoint(PendingEndpoints :: list()) -> any().
347 maybe_schedule_process_endpoint([]) ->
348 82 nop;
349 maybe_schedule_process_endpoint(_) ->
350 109 self() ! process_pending_endpoint.
351
%% Sends `process_pending_get` to self when get requests are queued;
%% returns `nop` when the queue is empty.
%% Raises error(enabled_is_empty) when requests are queued but no endpoint is
%% enabled.
352 -spec maybe_schedule_process_get(state()) -> any().
353 maybe_schedule_process_get(#state{ pending_gets = PendingGets, enabled = Enabled }) ->
%% NOTE(review): emptiness is tested by comparing against queue:new(), which
%% relies on the queue's internal representation; queue:is_empty/1 is the
%% documented way to ask this.
354 117 case PendingGets == queue:new() of
355 105 true -> nop;
356 false ->
357 12 Enabled =/= [] orelse error(enabled_is_empty),
358 12 self() ! process_pending_get
359 end.
360
%% Once no endpoint changes are pending, replies `ok` to every parked
%% force_refresh caller and clears the listener list. While changes are still
%% pending the state is returned untouched.
361 -spec maybe_notify_endpoints_listeners(state()) -> state().
362 maybe_notify_endpoints_listeners(#state{ pending_endpoints = [],
363 pending_endpoints_listeners = Listeners } = State) ->
364 78 lists:foreach(fun(Listener) -> gen_server:reply(Listener, ok) end, Listeners),
365 78 State#state{ pending_endpoints_listeners = [] };
366 maybe_notify_endpoints_listeners(State) ->
367 30 State.
368
%% Picks one entry of `Enabled` uniformly at random and returns its
%% conn_pool_ref. `Enabled` must be non-empty: rand:uniform/1 does not accept 0.
%% NOTE(review): the spec says pid() | no_connections, but the value returned
%% is the conn_pool_ref field, declared as atom() in #endpoint_info{}, and no
%% branch returns no_connections -- confirm and align the spec.
369 -spec pick_connection_pool(Enabled :: [endpoint_info()]) -> pid() | no_connections.
370 pick_connection_pool(Enabled) ->
371 36 #endpoint_info{ conn_pool_ref = PoolRef } = lists:nth(rand:uniform(length(Enabled)), Enabled),
372 36 PoolRef.
373
%% Fetches the current endpoints for the server, computes which endpoints have
%% to be enabled or disabled relative to the `enabled` list, and appends those
%% changes to the pending list. Returns the state with pending_endpoints and
%% last_endpoints updated; the changes themselves are applied later, one per
%% process_pending_endpoint message.
374 -spec refresh_connections(State :: state()) -> state().
375 refresh_connections(#state{ server = Server, pending_endpoints = PendingEndpoints,
376 last_endpoints = LastEndpoints } = State) ->
377 90 ?LOG_DEBUG(ls(#{what => gd_refreshing_endpoints}, State)),
378 90 NewEndpoints = get_endpoints(Server),
%% LastEndpoints is already bound, so the first pattern below matches only
%% when the fetched list equals the previous result.
379 87 case NewEndpoints of
380 LastEndpoints ->
381 5 nothing_new;
382 _ ->
383 82 ?LOG_INFO(ls(#{what => gd_endpoints_change,
384 old_endpoints => LastEndpoints,
385
:-(
new_endpoints => NewEndpoints}, State))
386 end,
387 87 ?LOG_DEBUG(ls(#{what => gd_fetched_endpoints, fetched_endpoints => NewEndpoints}, State)),
388
389 87 NPendingEndpoints = resolve_pending(NewEndpoints, State#state.enabled),
390 87 log_endpoints_changes(Server, NPendingEndpoints, State),
391
%% Processing is kicked off only when nothing was pending before; otherwise
%% a process_pending_endpoint message is assumed to be in flight already.
392 87 case PendingEndpoints of
393 83 [] -> maybe_schedule_process_endpoint(NPendingEndpoints);
394 4 _ -> already_scheduled
395 end,
396
397 87 FinalPendingEndpoints = PendingEndpoints ++ NPendingEndpoints,
398
399 87 case FinalPendingEndpoints of
400 [] ->
401 4 no_log;
402 _ ->
403 83 ?LOG_DEBUG(ls(#{what => gd_endpoints_update_scheduled,
404 new_changes => NPendingEndpoints,
405 pending_changes => FinalPendingEndpoints,
406 new_changes_length => length(NPendingEndpoints),
407 pending_changes_length => length(FinalPendingEndpoints)},
408 83 State))
409 end,
410 87 State#state{ pending_endpoints = FinalPendingEndpoints, last_endpoints = NewEndpoints }.
411
%% Reads the endpoints registered for `Server` from mod_global_distrib_mapping
%% and passes them through mod_global_distrib_utils:resolve_endpoints/1.
412 -spec get_endpoints(Server :: jid:lserver()) -> [mod_global_distrib_utils:endpoint()].
413 get_endpoints(Server) ->
414 90 EndpointsToResolve = mod_global_distrib_mapping:endpoints(Server),
415 90 mod_global_distrib_utils:resolve_endpoints(EndpointsToResolve).
416
%% Computes the changes needed to go from the currently enabled endpoints to
%% the new endpoint list.
%% Each new endpoint already present in `OldEnabled` is consumed without
%% producing a change; each new endpoint that is absent yields {enable, E}.
%% Whatever remains in `OldEnabled` afterwards yields {disable, E}. Enable
%% entries therefore precede disable entries in the result.
%% NOTE(review): the spec types OldEnabled as [endpoint_pid_tuple()], but the
%% clauses match #endpoint_info{} records -- confirm and align the spec.
417 -spec resolve_pending(NewEndpointList :: [mod_global_distrib_utils:endpoint()],
418 OldEnabled :: [endpoint_pid_tuple()]) ->
419 endpoints_changes().
420 resolve_pending([], []) ->
421 87 [];
422 resolve_pending([], [#endpoint_info{ endpoint = ToDisable } | RToDisable]) ->
423 3 [{disable, ToDisable} | resolve_pending([], RToDisable)];
424 resolve_pending([MaybeToEnable | RNewEndpoints], OldEnabled) ->
425 110 case lists:keytake(MaybeToEnable, #endpoint_info.endpoint, OldEnabled) of
426 107 false -> [{enable, MaybeToEnable} | resolve_pending(RNewEndpoints, OldEnabled)];
427 3 {value, _, NOldEnabled} -> resolve_pending(RNewEndpoints, NOldEnabled)
428 end.
429
%% Logs the outcome of a refresh: a debug entry when there are no changes,
%% otherwise an info entry listing the endpoints to enable and to disable.
430 -spec log_endpoints_changes(Server :: jid:lserver(),
431 EndpointsChanges :: endpoints_changes(), term()) -> any().
432 log_endpoints_changes(Server, [], State) ->
433 4 ?LOG_DEBUG(ls(#{what => gd_same_endpoints, server => Server,
434 4 text => <<"No endpoint changes">>}, State));
435 log_endpoints_changes(Server, EndpointsChanges, State) ->
436 83 ?LOG_INFO(ls(#{what => gd_endpoints_changes, server => Server,
437 107 to_enable => [ E || {enable, E} <- EndpointsChanges ],
438 3 to_disable => [ E || {disable, E} <- EndpointsChanges ]}, State)).
439
%% Adds `Endpoint` to the enabled list.
%% If the endpoint is currently disabled, its existing #endpoint_info{} (pool
%% and monitor included) is moved back to `enabled`. Otherwise a new pool is
%% started through the supervisor and monitored.
%% Returns {ok, NewState}, or {error, Reason} when start_pool/4 returns
%% anything other than {ok, Ref, Pid} or raises (the exception is caught).
440 -spec enable(Endpoint :: endpoint(), State :: state()) -> {ok, state()} | {error, any()}.
441 enable(Endpoint, #state{ disabled = Disabled, supervisor = Supervisor,
442 enabled = Enabled, server = Server, conn_opts = ConnOpts } = State) ->
443 105 case lists:keytake(Endpoint, #endpoint_info.endpoint, Disabled) of
444 false ->
445 105 case catch mod_global_distrib_server_sup:start_pool(Supervisor, Endpoint,
446 Server, ConnOpts) of
447 {ok, ConnPoolRef, ConnPoolPid} ->
448 100 MonitorRef = monitor(process, ConnPoolPid),
449 100 EndpointInfo = #endpoint_info{
450 endpoint = Endpoint,
451 conn_pool_ref = ConnPoolRef,
452 conn_pool_pid = ConnPoolPid,
453 monitor_ref = MonitorRef
454 },
455 100 {ok, State#state{ enabled = [EndpointInfo | Enabled] }};
456 Error ->
457 5 {error, Error}
458 end;
459 {value, EndpointInfo, NewDisabled} ->
460
:-(
{ok, State#state{ enabled = [EndpointInfo | Enabled], disabled = NewDisabled }}
461 end.
462
%% Moves `Endpoint` from the enabled list to the head of the disabled list.
%% The pool itself is left running. Crashes with badmatch when the endpoint is
%% not currently enabled; no clause returns the {error, _} mentioned in the
%% spec.
463 -spec disable(Endpoint :: endpoint(), State :: state()) -> {ok, state()} | {error, any()}.
464 disable(Endpoint, #state{ disabled = Disabled, enabled = Enabled } = State) ->
465 3 {value, EndpointInfo, NewEnabled} = lists:keytake(Endpoint, #endpoint_info.endpoint, Enabled),
466 3 {ok, State#state{ enabled = NewEnabled, disabled = [EndpointInfo | Disabled] }}.
467
%% Asks the supervisor to stop the pool of `Endpoint`.
%% Any result other than `ok`, including a caught exception, is logged as an
%% error and otherwise ignored. The state is not modified here.
468 -spec stop_disabled(Endpoint :: endpoint(), State :: state()) -> any().
469 stop_disabled(Endpoint, State) ->
470 1 case catch mod_global_distrib_server_sup:stop_pool(
471 State#state.supervisor, Endpoint) of
472 ok ->
473
:-(
ok;
474 Error ->
475 1 ?LOG_ERROR(ls(#{what => gd_cannot_close_disabled_connection,
476 reason => Error, endpoint => Endpoint},
477
:-(
State))
478 end.
479
%% Converts the state record into a map keyed by field name.
%% The conn_opts field is not included in the resulting map.
480 state_info(#state{
481 server = Server,
482 supervisor = Supervisor,
483 enabled = Enabled,
484 disabled = Disabled,
485 pending_endpoints = PendingEndpoints,
486 pending_gets = PendingGets,
487 refresh_interval = RefreshInterval,
488 refresh_interval_when_disconnected = DisRefreshInterval,
489 gc_interval = GCInterval,
490 pending_endpoints_listeners = PendingEndpointsListeners,
491 last_endpoints = LastEndpoints
492 }) ->
493 463 #{server => Server,
494 supervisor => Supervisor,
495 enabled => Enabled,
496 disabled => Disabled,
497 pending_endpoints => PendingEndpoints,
498 pending_gets => PendingGets,
499 refresh_interval => RefreshInterval,
500 refresh_interval_when_disconnected => DisRefreshInterval,
501 gc_interval => GCInterval,
502 pending_endpoints_listeners => PendingEndpointsListeners,
503 last_endpoints => LastEndpoints}.
504
505
506 %% Log State
%% Builds a log report: the state map from state_info/1 merged with `LogMeta`.
%% On key clashes the value from `LogMeta` wins (second argument of
%% maps:merge/2).
507 ls(LogMeta, State) ->
508 453 maps:merge(state_info(State), LogMeta).
Line Hits Source