./ct_report/coverage/mod_global_distrib_server_mgr.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 -module(mod_global_distrib_server_mgr).
17 -author('piotr.nosek@erlang-solutions.com').
18
19 -include("mongoose.hrl").
20
21 -behaviour(gen_server).
22
23 -export([start_link/2]).
24 -export([get_connection/1, ping_proc/1, get_state_info/1]).
25 -export([force_refresh/1, close_disabled/1]).
26 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
27
28 %% Debug
29 -export([get_enabled_endpoints/1, get_disabled_endpoints/1]).
30
31 -ignore_xref([close_disabled/1, force_refresh/1, get_disabled_endpoints/1,
32 get_enabled_endpoints/1, get_state_info/1, start_link/2]).
33
34 -type endpoint() :: mod_global_distrib_utils:endpoint().
35 -type endpoint_pid_tuple() :: {endpoint(), pid()}.
36
37 -record(endpoint_info, {
38 endpoint :: endpoint(),
39 conn_pool_ref :: atom(),
40 conn_pool_pid :: pid(),
41 monitor_ref :: reference()
42 }).
43
44 -type endpoint_info() :: #endpoint_info{}.
45 -type endpoints_changes() :: [{enable | disable, endpoint()}].
46
47 -record(state, {
48 server :: jid:lserver(),
49 supervisor :: pid(),
50 enabled :: [endpoint_info()],
51 disabled :: [endpoint_info()],
52 pending_endpoints :: endpoints_changes(),
53 pending_gets :: queue:queue(tuple()),
54 refresh_interval :: pos_integer(),
55 refresh_interval_when_disconnected :: pos_integer(),
56 gc_interval :: pos_integer(),
57 %% Used by force_refresh to block until refresh is fully done.
58 %% Listeners are notified only once and then this list is cleared.
59 pending_endpoints_listeners = [] :: [pid()],
60 %% Contains the last result of get_endpoints
61 last_endpoints :: [endpoint()] | undefined
62 }).
63
64 -type state() :: #state{}.
65 %%--------------------------------------------------------------------
66 %% API
67 %%--------------------------------------------------------------------
68
%% Starts the manager process for `Server' and links it to the caller.
%% The process is registered locally under the name returned by
%% mod_global_distrib_utils:server_to_mgr_name/1; `ServerSup' is handed to init/1.
-spec start_link(Server :: jid:lserver(), ServerSup :: pid()) -> {ok, pid()} | {error, any()}.
start_link(Server, ServerSup) when is_binary(Server) ->
    RegisteredName = {local, mod_global_distrib_utils:server_to_mgr_name(Server)},
    InitArgs = [Server, ServerSup],
    gen_server:start_link(RegisteredName, ?MODULE, InitArgs, []).
73
%% Synchronously asks the manager of `Server' to refresh its endpoints.
%% The call returns only when the resulting endpoint changes (if any) are fully applied.
%% While a forced refresh is still in progress, further concurrent calls do not trigger
%% another refresh; they return 'ok' once the running one has finished.
-spec force_refresh(Server :: jid:lserver()) -> ok.
force_refresh(Server) ->
    Request = force_refresh,
    do_call(Server, Request).
80
%% Synchronously asks the manager of `Server' to stop the pools of all endpoints
%% that are currently on its disabled list.
-spec close_disabled(Server :: jid:lserver()) -> ok.
close_disabled(Server) ->
    Request = close_disabled,
    do_call(Server, Request).
84
%% Asks the manager of `Server' for a connection pool and then takes a connection
%% from that pool via cpool:get_connection/1.
%% Crashes with badmatch if the manager replies with anything other than {ok, PoolRef}.
-spec get_connection(Server :: jid:lserver()) -> {ok, pid()} | no_return().
get_connection(Server) ->
    {ok, PoolRef} = do_call(Server, get_connection_pool),
    Connection = cpool:get_connection(PoolRef),
    {ok, Connection}.
89
%% `ping_proc` instead of just `ping` to emphasize that this call does not ping
%% some remote server but the manager process instead.
%%
%% Returns `pong' when the manager process of `Server' answers the call, and `pang'
%% in every other case: an unexpected reply, or any exception raised by the call
%% (e.g. the manager is not running or the call timed out).
%% Swallowing the exception is deliberate here - this function is a liveness probe.
-spec ping_proc(Server :: jid:lserver()) -> pong | pang.
ping_proc(Server) ->
    %% try/of instead of old-style `catch': only the call itself is protected and
    %% exception classes are not conflated with ordinary return values.
    try do_call(Server, ping_proc) of
        pong -> pong;
        _Unexpected -> pang
    catch
        _Class:_Reason -> pang
    end.
98
%% Returns the manager's current state for `Server' as a map (see state_info/1).
-spec get_state_info(Server :: jid:lserver()) -> map().
get_state_info(Server) ->
    Request = get_state_info,
    do_call(Server, Request).
102
103 %%--------------------------------------------------------------------
104 %% Debug API
105 %%--------------------------------------------------------------------
106
%% Debug helper: lists the endpoints the manager of `Server' currently has enabled.
-spec get_enabled_endpoints(Server :: jid:lserver()) -> [endpoint()].
get_enabled_endpoints(Server) ->
    Request = get_enabled_endpoints,
    do_call(Server, Request).
110
%% Debug helper: lists the endpoints the manager of `Server' currently has disabled.
-spec get_disabled_endpoints(Server :: jid:lserver()) -> [endpoint()].
get_disabled_endpoints(Server) ->
    Request = get_disabled_endpoints,
    do_call(Server, Request).
114
115 %%--------------------------------------------------------------------
116 %% gen_server callbacks
117 %%--------------------------------------------------------------------
118
%% gen_server callback.
%% Traps exits, reads the three interval options of mod_global_distrib_sender,
%% performs the first endpoint refresh and arms the refresh and GC timers.
init([Server, Supervisor]) ->
    process_flag(trap_exit, true),

    SenderOpt = fun(Key) -> mod_global_distrib_utils:opt(mod_global_distrib_sender, Key) end,
    RefreshInterval = SenderOpt(endpoint_refresh_interval),
    DisconnectedRefreshInterval = SenderOpt(endpoint_refresh_interval_when_empty),
    GCInterval = SenderOpt(disabled_gc_interval),

    InitialState = #state{server = Server,
                          supervisor = Supervisor,
                          enabled = [],
                          disabled = [],
                          pending_endpoints = [],
                          pending_gets = queue:new(),
                          refresh_interval = RefreshInterval,
                          refresh_interval_when_disconnected = DisconnectedRefreshInterval,
                          gc_interval = GCInterval},

    %% Both schedule_* functions return the state they were given, so they can be chained.
    ReadyState = schedule_gc(schedule_refresh(refresh_connections(InitialState))),

    ?LOG_INFO(ls(#{what => gd_mgr_started}, ReadyState)),
    {ok, ReadyState}.
145
%% gen_server callback for synchronous requests:
%%  * get_connection_pool    - replies {ok, PoolRef}; when nothing is enabled the caller
%%                             is queued in pending_gets and answered later
%%  * force_refresh          - refreshes endpoints; the reply is deferred until all pending
%%                             endpoint changes are applied
%%  * close_disabled         - stops the pools of all disabled endpoints
%%  * get_enabled_endpoints / get_disabled_endpoints - debug listings
%%  * ping_proc              - replies pong
%%  * get_state_info         - replies with the state as a map
%% There is no catch-all clause: any other request crashes with function_clause.
handle_call(get_connection_pool, From, #state{ enabled = [] } = State) ->
    Waiting = queue:in(From, State#state.pending_gets),
    {noreply, State#state{ pending_gets = Waiting }};
handle_call(get_connection_pool, _From, #state{ enabled = Enabled } = State) ->
    PoolRef = pick_connection_pool(Enabled),
    {reply, {ok, PoolRef}, State};
handle_call(force_refresh, From, #state{ pending_endpoints_listeners = [] } = State) ->
    %% No forced refresh is running yet, so start one.
    case refresh_connections(State) of
        #state{ pending_endpoints = [] } = Refreshed ->
            {reply, ok, Refreshed};
        Refreshed ->
            {noreply, Refreshed#state{ pending_endpoints_listeners = [From] }}
    end;
handle_call(force_refresh, From, #state{ pending_endpoints_listeners = Listeners } = State) ->
    %% A forced refresh is already in progress; just wait for it to finish.
    {noreply, State#state{ pending_endpoints_listeners = [From | Listeners] }};
handle_call(close_disabled, _From, #state{ disabled = Disabled } = State) ->
    _ = [stop_disabled(Info#endpoint_info.endpoint, State) || Info <- Disabled],
    {reply, ok, State};
handle_call(get_enabled_endpoints, _From, #state{ enabled = Enabled } = State) ->
    Endpoints = lists:map(fun(Info) -> Info#endpoint_info.endpoint end, Enabled),
    {reply, Endpoints, State};
handle_call(get_disabled_endpoints, _From, #state{ disabled = Disabled } = State) ->
    Endpoints = lists:map(fun(Info) -> Info#endpoint_info.endpoint end, Disabled),
    {reply, Endpoints, State};
handle_call(ping_proc, _From, State) ->
    {reply, pong, State};
handle_call(get_state_info, _From, State) ->
    Info = state_info(State),
    {reply, Info, State}.
173
%% gen_server callback for casts.
%% {call_timeout, Pid, Msg} is sent by do_call/2 when a caller's gen_server:call
%% timed out; it is only logged. Any other cast is reported as unexpected.
handle_cast({call_timeout, CallerPid, CallerMsg}, State) ->
    ?LOG_WARNING(ls(#{what => gd_mgr_call_timeout,
                      caller_pid => CallerPid, caller_msg => CallerMsg}, State)),
    {noreply, State};
handle_cast(Unexpected, State) ->
    ?UNEXPECTED_CAST(Unexpected),
    {noreply, State}.
181
%% gen_server callback for plain messages:
%%  * refresh                  - periodic endpoint refresh (timer re-armed every time)
%%  * disabled_gc              - stops disabled endpoints whose pool has no connections
%%  * process_pending_get      - answers one queued get_connection_pool caller
%%  * process_pending_endpoint - applies the first queued enable/disable change
%%  * 'DOWN'                   - a monitored connection pool process has terminated
%% Anything else is reported as unexpected.
handle_info(refresh, #state{ pending_endpoints = PendingBefore } = State) ->
    Refreshed = case PendingBefore of
                    [] -> refresh_connections(State);
                    _ -> State % May occur if we are in the middle of force_refresh
                end,
    PendingAfter = Refreshed#state.pending_endpoints,
    case PendingBefore == PendingAfter of
        true ->
            ok;
        _ ->
            ?LOG_INFO(ls(#{what => gd_mgr_refresh,
                           text => <<"A list of pending_endpoints has changed in GD server manager">>,
                           pending_endpoints_before => PendingBefore,
                           pending_endpoints_after => PendingAfter}, State))
    end,
    schedule_refresh(Refreshed),
    {noreply, Refreshed};
handle_info(disabled_gc, #state{ disabled = Disabled } = State) ->
    %% An endpoint is collected only when its pool reports no_connections.
    CollectIfIdle =
        fun(#endpoint_info{ endpoint = Endpoint, conn_pool_ref = ConnPool }) ->
                try cpool:get_connection(ConnPool, no_wait_for_reconnect) of
                    _ConnPid ->
                        false
                catch
                    error:no_connections ->
                        stop_disabled(Endpoint, State),
                        {true, Endpoint}
                end
        end,
    case lists:filtermap(CollectIfIdle, Disabled) of
        [] ->
            ok;
        StoppedEndpoints ->
            ?LOG_INFO(ls(#{what => gd_mgr_disabled_gc,
                           text => <<"GD server manager stops some inactive endpoints">>,
                           stopped_endpoints => StoppedEndpoints}, State))
    end,
    schedule_gc(State),
    {noreply, State};
handle_info(process_pending_get, #state{ enabled = [] } = State) ->
    %% Nothing to hand out yet; queued callers keep waiting.
    {noreply, State};
handle_info(process_pending_get, #state{ pending_gets = PendingGets,
                                         enabled = Enabled } = State) ->
    Served = case queue:out(PendingGets) of
                 {empty, _} ->
                     State;
                 {{value, Caller}, StillWaiting} ->
                     gen_server:reply(Caller, {ok, pick_connection_pool(Enabled)}),
                     State#state{ pending_gets = StillWaiting }
             end,
    maybe_schedule_process_get(Served),
    {noreply, Served};
handle_info(process_pending_endpoint,
            #state{ pending_endpoints = [{enable, Endpoint} | Remaining] } = State) ->
    AfterEnable = try_enable_pending(Endpoint, State),
    %% Queued get_connection_pool callers are scheduled before the next endpoint change.
    maybe_schedule_process_get(AfterEnable),
    {noreply, finish_pending_step(Remaining, AfterEnable)};
handle_info(process_pending_endpoint,
            #state{ pending_endpoints = [{disable, Endpoint} | Remaining] } = State) ->
    AfterDisable = try_disable_pending(Endpoint, State),
    {noreply, finish_pending_step(Remaining, AfterDisable)};
handle_info({'DOWN', MonitorRef, _Type, Pid, Reason}, #state{} = State) ->
    {Endpoint, ListName, NewState} = drop_monitored_endpoint(MonitorRef, Pid, State),
    LoggedReason = case Reason of
                       shutdown -> normal;
                       _Other -> Reason
                   end,
    ?LOG_INFO(ls(#{what => gd_endpoint_closed,
                   text => <<"Disconnected from a GD endpoint">>,
                   type => ListName, endpoint => Endpoint, reason => LoggedReason}, State)),
    {noreply, NewState};
handle_info(Unexpected, State) ->
    ?UNEXPECTED_INFO(Unexpected),
    {noreply, State}.

%% Tries to enable a pending endpoint; on failure logs the error and keeps the old state.
try_enable_pending(Endpoint, State) ->
    case catch enable(Endpoint, State) of
        {ok, EnabledState} ->
            ?LOG_INFO(ls(#{what => gd_endpoint_enabled,
                           text => <<"GD server manager enables pending endpoint">>,
                           endpoint => Endpoint}, State)),
            EnabledState;
        Error ->
            ?LOG_ERROR(ls(#{what => gd_endpoint_enabling_failed,
                            text => <<"GD server manager cannot enable endpoint">>,
                            endpoint => Endpoint, reason => Error}, State)),
            State
    end.

%% Tries to disable a pending endpoint; on failure logs the error and keeps the old state.
try_disable_pending(Endpoint, State) ->
    case catch disable(Endpoint, State) of
        {ok, DisabledState} ->
            ?LOG_INFO(ls(#{what => gd_endpoint_disabled,
                           text => <<"GD server manager disables pending endpoint">>,
                           endpoint => Endpoint}, State)),
            DisabledState;
        Error ->
            ?LOG_ERROR(ls(#{what => gd_endpoint_disabling_failed,
                            text => <<"GD server manager cannot disable endpoint">>,
                            endpoint => Endpoint, reason => Error}, State)),
            State
    end.

%% Schedules the next pending change (if any), stores the remaining changes in the
%% state and notifies force_refresh listeners once nothing is left.
finish_pending_step(Remaining, State) ->
    maybe_schedule_process_endpoint(Remaining),
    maybe_notify_endpoints_listeners(State#state{ pending_endpoints = Remaining }).

%% Removes the endpoint owning `MonitorRef' from the enabled or disabled list.
%% Returns {Endpoint, enabled | disabled, NewState}, or {Pid, unknown, State}
%% when the monitor reference belongs to neither list.
drop_monitored_endpoint(MonitorRef, Pid, #state{ enabled = Enabled,
                                                 disabled = Disabled } = State) ->
    RefPos = #endpoint_info.monitor_ref,
    case lists:keytake(MonitorRef, RefPos, Enabled) of
        {value, #endpoint_info{ endpoint = Found }, RestEnabled} ->
            {Found, enabled, State#state{ enabled = RestEnabled }};
        false ->
            case lists:keytake(MonitorRef, RefPos, Disabled) of
                {value, #endpoint_info{ endpoint = Found }, RestDisabled} ->
                    {Found, disabled, State#state{ disabled = RestDisabled }};
                false ->
                    {Pid, unknown, State}
            end
    end.
302
%% gen_server callback: no state migration is performed on code upgrade.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
305
%% gen_server callback: currently nothing is cleaned up on termination.
terminate(_Why, _LastState) ->
    %% TODO: Cleanup
    ok.
309
310 %%--------------------------------------------------------------------
311 %% Internal functions
312 %%--------------------------------------------------------------------
313
%% Performs a gen_server:call to the manager process registered for `Server'.
%% If the call exits with a timeout, the manager is told about it with a
%% {call_timeout, CallerPid, Msg} cast (so it can log the event) and the original
%% exit is re-raised with its stacktrace. Other exceptions propagate untouched.
-spec do_call(Server :: jid:lserver(), Msg :: any()) -> any().
do_call(Server, Msg) ->
    MgrName = mod_global_distrib_utils:server_to_mgr_name(Server),
    try gen_server:call(MgrName, Msg) of
        Reply ->
            Reply
    catch
        exit:{timeout, _} = Reason:Stacktrace ->
            %% Best-effort notification; a failure here must not mask the timeout.
            catch gen_server:cast(MgrName, {call_timeout, self(), Msg}),
            erlang:raise(exit, Reason, Stacktrace)
    end.
323
%% Arms the next `refresh' timer and returns the state unchanged.
%% The regular interval is used when the last get_endpoints result was a non-empty
%% list; otherwise the "when disconnected" interval is used, which by default makes
%% the manager try more often while no endpoints are known.
-spec schedule_refresh(State :: state()) -> state().
schedule_refresh(State) ->
    Interval = case State#state.last_endpoints of
                   [_ | _] -> State#state.refresh_interval;
                   _ -> State#state.refresh_interval_when_disconnected
               end,
    do_schedule_refresh(State, Interval).
330
%% Sends `refresh' to the calling process after `Interval' seconds.
%% Returns `State' unchanged.
do_schedule_refresh(State, Interval) ->
    DelayMs = timer:seconds(Interval),
    _TimerRef = erlang:send_after(DelayMs, self(), refresh),
    State.
334
%% Sends `disabled_gc' to the calling process after the configured gc_interval
%% (in seconds). Returns the state unchanged.
-spec schedule_gc(State :: state()) -> state().
schedule_gc(#state{ gc_interval = IntervalSeconds } = State) ->
    DelayMs = timer:seconds(IntervalSeconds),
    _TimerRef = erlang:send_after(DelayMs, self(), disabled_gc),
    State.
339
%% Sends `process_pending_endpoint' to the calling process when there is at least
%% one pending endpoint change; returns `nop' for an empty list.
-spec maybe_schedule_process_endpoint(PendingEndpoints :: list()) -> any().
maybe_schedule_process_endpoint(PendingEndpoints) ->
    case PendingEndpoints of
        [] -> nop;
        _ -> self() ! process_pending_endpoint
    end.
345
%% Sends `process_pending_get' to the calling process when some get_connection_pool
%% callers are still queued; returns `nop' when the queue is empty.
%% Raises error(enabled_is_empty) if callers are queued but no endpoint is enabled.
-spec maybe_schedule_process_get(state()) -> any().
maybe_schedule_process_get(#state{ pending_gets = PendingGets, enabled = Enabled }) ->
    %% queue:queue() is opaque, so ask the queue module instead of comparing the
    %% term structurally with queue:new().
    case queue:is_empty(PendingGets) of
        true ->
            nop;
        false ->
            Enabled =/= [] orelse error(enabled_is_empty),
            self() ! process_pending_get
    end.
354
%% Once no endpoint changes are pending, replies `ok' to every caller blocked in
%% force_refresh and clears the listener list. While changes are still pending
%% the state is returned untouched.
-spec maybe_notify_endpoints_listeners(state()) -> state().
maybe_notify_endpoints_listeners(#state{ pending_endpoints = [] } = State) ->
    _ = [gen_server:reply(Waiting, ok) || Waiting <- State#state.pending_endpoints_listeners],
    State#state{ pending_endpoints_listeners = [] };
maybe_notify_endpoints_listeners(StillPending) ->
    StillPending.
362
%% Picks the connection pool reference of one enabled endpoint, chosen uniformly
%% at random.
%% `Enabled' must be non-empty: rand:uniform/1 fails for 0, and every caller in
%% this module only calls this function with a non-empty enabled list.
%% The result is the conn_pool_ref field (an atom per the endpoint_info record),
%% not a pid, and `no_connections' is never returned - the spec reflects that.
-spec pick_connection_pool(Enabled :: [endpoint_info(), ...]) -> atom().
pick_connection_pool(Enabled) ->
    Index = rand:uniform(length(Enabled)),
    #endpoint_info{ conn_pool_ref = PoolRef } = lists:nth(Index, Enabled),
    PoolRef.
367
%% Fetches the current endpoint list for the server, works out which endpoints have
%% to be enabled or disabled and appends those changes to pending_endpoints.
%% Processing of the pending list is kicked off here only when nothing was pending
%% before; otherwise a process_pending_endpoint message is already on its way.
%% Returns the state with updated pending_endpoints and last_endpoints.
%% Crashes with badmatch if get_endpoints/1 does not return {ok, _}.
-spec refresh_connections(State :: state()) -> state().
refresh_connections(#state{ server = Server, pending_endpoints = PendingEndpoints,
                            last_endpoints = LastEndpoints } = State) ->
    ?LOG_DEBUG(ls(#{what => gd_refreshing_endpoints}, State)),
    {ok, NewEndpoints} = get_endpoints(Server),
    case NewEndpoints of
        LastEndpoints ->
            nothing_new;
        _ ->
            ?LOG_INFO(ls(#{what => gd_endpoints_change,
                           old_endpoints => LastEndpoints,
                           new_endpoints => NewEndpoints}, State))
    end,
    ?LOG_DEBUG(ls(#{what => gd_fetched_endpoints, fetched_endpoints => NewEndpoints}, State)),

    %% resolve_pending/2 only compares against the endpoints that are already enabled.
    %% When a refresh runs while earlier changes are still queued (e.g. force_refresh
    %% right after start), it would produce the very same change again and the second
    %% attempt would fail and log a spurious error. Drop changes that are already queued.
    NPendingEndpoints = [Change || Change <- resolve_pending(NewEndpoints, State#state.enabled),
                                   not lists:member(Change, PendingEndpoints)],
    log_endpoints_changes(Server, NPendingEndpoints, State),

    case PendingEndpoints of
        [] -> maybe_schedule_process_endpoint(NPendingEndpoints);
        _ -> already_scheduled
    end,

    FinalPendingEndpoints = PendingEndpoints ++ NPendingEndpoints,

    case FinalPendingEndpoints of
        [] ->
            no_log;
        _ ->
            ?LOG_DEBUG(ls(#{what => gd_endpoints_update_scheduled,
                            new_changes => NPendingEndpoints,
                            pending_changes => FinalPendingEndpoints,
                            new_changes_length => length(NPendingEndpoints),
                            pending_changes_length => length(FinalPendingEndpoints)},
                          State))
    end,
    State#state{ pending_endpoints = FinalPendingEndpoints, last_endpoints = NewEndpoints }.
405
%% Returns the resolved endpoints of `Server'.
%% The {global_distrib_addr, Server} config option takes precedence; when it is not
%% set, the endpoints come from mod_global_distrib_mapping:endpoints/1. Either way
%% the list is passed through mod_global_distrib_utils:resolve_endpoints/1.
%% Crashes with badmatch if the chosen source does not yield {ok, _}.
-spec get_endpoints(Server :: jid:lserver()) -> {ok, [mod_global_distrib_utils:endpoint()]}.
get_endpoints(Server) ->
    {ok, ToResolve} =
        case mongoose_config:lookup_opt({global_distrib_addr, Server}) of
            {ok, Configured} -> {ok, Configured};
            {error, not_found} -> mod_global_distrib_mapping:endpoints(Server)
        end,
    {ok, mod_global_distrib_utils:resolve_endpoints(ToResolve)}.
415
%% Computes the changes needed to get from the currently enabled endpoints to
%% `NewEndpointList'. The result lists {enable, E} for every new endpoint that is
%% not enabled yet (in the order of the new list), followed by {disable, E} for
%% every enabled endpoint that no longer appears in the new list.
%% NOTE(review): the clauses match #endpoint_info{} records in OldEnabled, so the
%% element type in the spec below looks like it should be endpoint_info() - confirm.
-spec resolve_pending(NewEndpointList :: [mod_global_distrib_utils:endpoint()],
                      OldEnabled :: [endpoint_pid_tuple()]) ->
    endpoints_changes().
resolve_pending(NewEndpointList, OldEnabled) ->
    Step = fun(Candidate, {RevToEnable, StillEnabled}) ->
                   case lists:keytake(Candidate, #endpoint_info.endpoint, StillEnabled) of
                       false ->
                           {[{enable, Candidate} | RevToEnable], StillEnabled};
                       {value, _AlreadyEnabled, Rest} ->
                           {RevToEnable, Rest}
                   end
           end,
    {RevToEnable, Leftover} = lists:foldl(Step, {[], OldEnabled}, NewEndpointList),
    ToDisable = [{disable, Info#endpoint_info.endpoint} || Info <- Leftover],
    lists:reverse(RevToEnable, ToDisable).
428
%% Logs the outcome of an endpoint refresh: a debug entry when there is nothing to
%% change, otherwise an info entry listing the endpoints to enable and to disable.
-spec log_endpoints_changes(Server :: jid:lserver(),
                            EndpointsChanges :: endpoints_changes(), term()) -> any().
log_endpoints_changes(Server, EndpointsChanges, State) ->
    case EndpointsChanges of
        [] ->
            ?LOG_DEBUG(ls(#{what => gd_same_endpoints, server => Server,
                            text => <<"No endpoint changes">>}, State));
        _ ->
            ?LOG_INFO(ls(#{what => gd_endpoints_changes, server => Server,
                           to_enable => [ E || {enable, E} <- EndpointsChanges ],
                           to_disable => [ E || {disable, E} <- EndpointsChanges ]}, State))
    end.
438
%% Moves `Endpoint' to the enabled list.
%% An endpoint found on the disabled list is simply moved back, reusing its existing
%% endpoint_info. Otherwise a new pool is started under the server supervisor and
%% monitored. Returns {error, Reason} when the pool cannot be started.
-spec enable(Endpoint :: endpoint(), State :: state()) -> {ok, state()} | {error, any()}.
enable(Endpoint, #state{ disabled = Disabled, enabled = Enabled } = State) ->
    case lists:keytake(Endpoint, #endpoint_info.endpoint, Disabled) of
        {value, KnownInfo, StillDisabled} ->
            {ok, State#state{ enabled = [KnownInfo | Enabled], disabled = StillDisabled }};
        false ->
            start_new_endpoint(Endpoint, State)
    end.

%% Starts and monitors a connection pool for an endpoint seen for the first time.
start_new_endpoint(Endpoint, #state{ supervisor = Supervisor, server = Server,
                                     enabled = Enabled } = State) ->
    case catch mod_global_distrib_server_sup:start_pool(Supervisor, Endpoint, Server) of
        {ok, PoolRef, PoolPid} ->
            NewInfo = #endpoint_info{ endpoint = Endpoint,
                                      conn_pool_ref = PoolRef,
                                      conn_pool_pid = PoolPid,
                                      monitor_ref = monitor(process, PoolPid) },
            {ok, State#state{ enabled = [NewInfo | Enabled] }};
        Error ->
            {error, Error}
    end.
460
%% Moves `Endpoint' from the enabled list to the head of the disabled list.
%% Crashes with badmatch when the endpoint is not currently enabled.
-spec disable(Endpoint :: endpoint(), State :: state()) -> {ok, state()} | {error, any()}.
disable(Endpoint, State) ->
    #state{ enabled = Enabled, disabled = Disabled } = State,
    {value, Info, StillEnabled} = lists:keytake(Endpoint, #endpoint_info.endpoint, Enabled),
    NowDisabled = [Info | Disabled],
    {ok, State#state{ enabled = StillEnabled, disabled = NowDisabled }}.
465
%% Asks the server supervisor to stop the pool of `Endpoint'.
%% Any result other than `ok' (including a caught exception) is logged as an error;
%% the state itself is not modified here.
-spec stop_disabled(Endpoint :: endpoint(), State :: state()) -> any().
stop_disabled(Endpoint, State) ->
    Outcome = (catch mod_global_distrib_server_sup:stop_pool(State#state.supervisor, Endpoint)),
    case Outcome of
        ok ->
            ok;
        Error ->
            ?LOG_ERROR(ls(#{what => gd_cannot_close_disabled_connection,
                            reason => Error, endpoint => Endpoint},
                          State))
    end.
477
%% Converts the state record into a map with one key per record field
%% (same names, same values).
state_info(#state{} = State) ->
    #{server => State#state.server,
      supervisor => State#state.supervisor,
      enabled => State#state.enabled,
      disabled => State#state.disabled,
      pending_endpoints => State#state.pending_endpoints,
      pending_gets => State#state.pending_gets,
      refresh_interval => State#state.refresh_interval,
      refresh_interval_when_disconnected => State#state.refresh_interval_when_disconnected,
      gc_interval => State#state.gc_interval,
      pending_endpoints_listeners => State#state.pending_endpoints_listeners,
      last_endpoints => State#state.last_endpoints}.
502
503
%% Log State: builds a log report from the state map plus `LogMeta'.
%% Keys present in `LogMeta' take precedence over state fields of the same name.
ls(LogMeta, State) ->
    StateFields = state_info(State),
    maps:merge(StateFields, LogMeta).
Line Hits Source