./ct_report/coverage/mod_global_distrib_server_mgr.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 -module(mod_global_distrib_server_mgr).
17 -author('piotr.nosek@erlang-solutions.com').
18
19 -include("mongoose.hrl").
20
21 -behaviour(gen_server).
22
23 -export([start_link/2]).
24 -export([get_connection/1, ping_proc/1, get_state_info/1]).
25 -export([force_refresh/1, close_disabled/1]).
26 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
27
28 %% Debug
29 -export([get_enabled_endpoints/1, get_disabled_endpoints/1]).
30
31 -ignore_xref([close_disabled/1, force_refresh/1, get_disabled_endpoints/1,
32 get_enabled_endpoints/1, get_state_info/1, start_link/2]).
33
34 -type endpoint() :: mod_global_distrib_utils:endpoint().
35 -type endpoint_pid_tuple() :: {endpoint(), pid()}.
36
37 -record(endpoint_info, {
38 endpoint :: endpoint(),
39 conn_pool_ref :: atom(),
40 conn_pool_pid :: pid(),
41 monitor_ref :: reference()
42 }).
43
44 -type endpoint_info() :: #endpoint_info{}.
45 -type endpoints_changes() :: [{enable | disable, endpoint()}].
46
%% State of the manager process for a single remote server.
-record(state, {
          %% Remote server this manager is responsible for
          server :: jid:lserver(),
          %% Supervisor used to start and stop connection pools
          supervisor :: pid(),
          %% Endpoints whose pools may be handed out to callers
          enabled :: [endpoint_info()],
          %% Endpoints moved out of `enabled'; their pools keep running until
          %% they are stopped by disabled_gc or close_disabled
          disabled :: [endpoint_info()],
          %% Enable/disable operations waiting to be applied one by one
          pending_endpoints :: endpoints_changes(),
          %% Callers of get_connection_pool waiting for an enabled endpoint
          pending_gets :: queue:queue(tuple()),
          %% Refresh period (seconds) used when the last endpoint list was non-empty
          refresh_interval :: pos_integer(),
          %% Refresh period (seconds) used otherwise
          refresh_interval_when_disconnected :: pos_integer(),
          %% Period (seconds) of the garbage collection of disabled endpoints
          gc_interval :: pos_integer(),
          %% Used by force_refresh to block until refresh is fully done.
          %% Listeners are notified only once and then this list is cleared.
          %% Each listener is the `From' handle received in handle_call/3,
          %% i.e. a {Pid, Tag} tuple and not a bare pid.
          pending_endpoints_listeners = [] :: [{pid(), term()}],
          %% Contains the last result of get_endpoints
          last_endpoints :: [endpoint()] | undefined
         }).
63
64 -type state() :: #state{}.
65 %%--------------------------------------------------------------------
66 %% API
67 %%--------------------------------------------------------------------
68
%% Starts the manager for `Server' and registers it locally under the name
%% returned by mod_global_distrib_utils:server_to_mgr_name/1.
-spec start_link(Server :: jid:lserver(), ServerSup :: pid()) -> {ok, pid()} | {error, any()}.
start_link(Server, ServerSup) when is_binary(Server) ->
    RegisteredName = {local, mod_global_distrib_utils:server_to_mgr_name(Server)},
    InitArgs = [Server, ServerSup],
    gen_server:start_link(RegisteredName, ?MODULE, InitArgs, []).
73
%% Returns only after all endpoint changes (if there are any) have been applied.
%% A call made while a forced refresh is already in progress does not trigger
%% another refresh; it returns 'ok' once the refresh in progress has finished.
-spec force_refresh(Server :: jid:lserver()) -> ok.
force_refresh(Server) ->
    Request = force_refresh,
    do_call(Server, Request).
80
%% Asks the manager of `Server' to stop the pools of all disabled endpoints.
-spec close_disabled(Server :: jid:lserver()) -> ok.
close_disabled(Server) ->
    Request = close_disabled,
    do_call(Server, Request).
84
%% Asks the manager of `Server' for a connection pool and takes a connection
%% from that pool in the caller's process.
%% Crashes (badmatch) if the manager replies with anything but {ok, PoolRef}.
-spec get_connection(Server :: jid:lserver()) -> {ok, pid()} | no_return().
get_connection(Server) ->
    {ok, PoolRef} = do_call(Server, get_connection_pool),
    Connection = cpool:get_connection(PoolRef),
    {ok, Connection}.
89
%% `ping_proc` instead of just `ping` to emphasize that this call does not ping
%% some remote server but the manager process instead.
%% Returns `pong' when the manager answers and `pang' for any other reply or
%% for any exception raised by the call (e.g. the manager is not running or
%% the call timed out). Swallowing the exception is deliberate: the result
%% is a liveness indicator.
-spec ping_proc(Server :: jid:lserver()) -> pong | pang.
ping_proc(Server) ->
    try do_call(Server, ping_proc) of
        pong -> pong;
        _Unexpected -> pang
    catch
        _Class:_Reason -> pang
    end.
98
%% Returns the manager state of `Server' as a map (see state_info/1).
-spec get_state_info(Server :: jid:lserver()) -> map().
get_state_info(Server) ->
    Request = get_state_info,
    do_call(Server, Request).
102
103 %%--------------------------------------------------------------------
104 %% Debug API
105 %%--------------------------------------------------------------------
106
%% Debug helper: endpoints currently enabled in the manager of `Server'.
-spec get_enabled_endpoints(Server :: jid:lserver()) -> [endpoint()].
get_enabled_endpoints(Server) ->
    Request = get_enabled_endpoints,
    do_call(Server, Request).
110
%% Debug helper: endpoints currently disabled in the manager of `Server'.
-spec get_disabled_endpoints(Server :: jid:lserver()) -> [endpoint()].
get_disabled_endpoints(Server) ->
    Request = get_disabled_endpoints,
    do_call(Server, Request).
114
115 %%--------------------------------------------------------------------
116 %% gen_server callbacks
117 %%--------------------------------------------------------------------
118
%% gen_server callback.
%% Reads the refresh and GC intervals from the mod_global_distrib_sender
%% options, performs the first endpoint refresh and arms both periodic timers.
init([Server, Supervisor]) ->
    process_flag(trap_exit, true),

    SenderOpt = fun(Key) -> mod_global_distrib_utils:opt(mod_global_distrib_sender, Key) end,
    RefreshInterval = SenderOpt(endpoint_refresh_interval),
    DisRefreshInterval = SenderOpt(endpoint_refresh_interval_when_empty),
    GCInterval = SenderOpt(disabled_gc_interval),

    InitialState = #state{server = Server,
                          supervisor = Supervisor,
                          enabled = [],
                          disabled = [],
                          pending_endpoints = [],
                          pending_gets = queue:new(),
                          refresh_interval = RefreshInterval,
                          refresh_interval_when_disconnected = DisRefreshInterval,
                          gc_interval = GCInterval},

    %% The first refresh only schedules endpoint changes; they are applied
    %% later, one by one, by the process_pending_endpoint messages.
    Refreshed = refresh_connections(InitialState),
    schedule_refresh(Refreshed),
    schedule_gc(Refreshed),

    ?LOG_INFO(ls(#{what => gd_mgr_started}, Refreshed)),
    {ok, Refreshed}.
145
%% gen_server callback handling synchronous requests:
%%   get_connection_pool    - replies {ok, PoolRef}; when no endpoint is enabled
%%                            the caller is queued and answered later by the
%%                            process_pending_get handler
%%   force_refresh          - refreshes endpoints; the reply is delayed until all
%%                            pending endpoint changes have been processed
%%   close_disabled         - requests stopping the pools of all disabled endpoints
%%   get_enabled_endpoints,
%%   get_disabled_endpoints - debug listings
%%   ping_proc              - liveness check, replies pong
%%   get_state_info         - state as a map
handle_call(get_connection_pool, From, #state{ enabled = [] } = State) ->
    Waiting = queue:in(From, State#state.pending_gets),
    {noreply, State#state{ pending_gets = Waiting }};
handle_call(get_connection_pool, _From, #state{ enabled = Enabled } = State) ->
    PoolRef = pick_connection_pool(Enabled),
    {reply, {ok, PoolRef}, State};
handle_call(force_refresh, From, #state{ pending_endpoints_listeners = [] } = State) ->
    %% No refresh is being awaited yet, so this call triggers one.
    case refresh_connections(State) of
        #state{ pending_endpoints = [] } = Refreshed ->
            {reply, ok, Refreshed};
        Refreshed ->
            {noreply, Refreshed#state{ pending_endpoints_listeners = [From] }}
    end;
handle_call(force_refresh, From, #state{ pending_endpoints_listeners = Waiting } = State) ->
    %% Someone already waits for the changes to be applied; just join them.
    {noreply, State#state{ pending_endpoints_listeners = [From | Waiting] }};
handle_call(close_disabled, _From, #state{ disabled = Disabled } = State) ->
    StopPool = fun(#endpoint_info{ endpoint = Endpoint }) ->
                       stop_disabled(Endpoint, State)
               end,
    lists:foreach(StopPool, Disabled),
    %% The state is returned unchanged here.
    {reply, ok, State};
handle_call(get_enabled_endpoints, _From, State) ->
    Endpoints = [ Info#endpoint_info.endpoint || Info <- State#state.enabled ],
    {reply, Endpoints, State};
handle_call(get_disabled_endpoints, _From, State) ->
    Endpoints = [ Info#endpoint_info.endpoint || Info <- State#state.disabled ],
    {reply, Endpoints, State};
handle_call(ping_proc, _From, State) ->
    {reply, pong, State};
handle_call(get_state_info, _From, State) ->
    Info = state_info(State),
    {reply, Info, State}.
173
%% gen_server callback.
%% {call_timeout, Pid, Msg} is sent by do_call/2 when a caller gave up waiting
%% for a reply; it is only logged. Any other cast is reported as unexpected.
handle_cast({call_timeout, CallerPid, CallerMsg}, State) ->
    ?LOG_WARNING(ls(#{what => gd_mgr_call_timeout,
                      caller_pid => CallerPid,
                      caller_msg => CallerMsg}, State)),
    {noreply, State};
handle_cast(Unexpected, State) ->
    ?UNEXPECTED_CAST(Unexpected),
    {noreply, State}.
181
%% gen_server callback handling timers, self-sent work items and monitors:
%%   refresh                  - periodic endpoint refresh (re-arms its timer)
%%   disabled_gc              - stops disabled endpoints whose pool reports
%%                              no_connections (re-arms its timer)
%%   process_pending_get      - answers one queued get_connection_pool caller
%%   process_pending_endpoint - applies one pending enable/disable change
%%   'DOWN'                   - a monitored pool process terminated
handle_info(refresh, #state{ pending_endpoints = PendingBefore } = State) ->
    Refreshed =
        case PendingBefore of
            [] -> refresh_connections(State);
            _ -> State % May occur if we are in the middle of force_refresh
        end,
    PendingAfter = Refreshed#state.pending_endpoints,
    case PendingBefore == PendingAfter of
        true ->
            ok;
        _ ->
            ?LOG_INFO(ls(#{what => gd_mgr_refresh,
                           text => <<"A list of pending_endpoints has changed in GD server manager">>,
                           pending_endpoints_before => PendingBefore,
                           pending_endpoints_after => PendingAfter}, State))
    end,
    schedule_refresh(Refreshed),
    {noreply, Refreshed};
handle_info(disabled_gc, #state{ disabled = Disabled } = State) ->
    StoppedEndpoints =
        lists:flatmap(fun(Info) -> gc_disabled_endpoint(Info, State) end, Disabled),
    case StoppedEndpoints of
        [] ->
            ok;
        _ ->
            ?LOG_INFO(ls(#{what => gd_mgr_disabled_gc,
                           text => <<"GD server manager stops some inactive endpoints">>,
                           stopped_endpoints => StoppedEndpoints}, State))
    end,
    schedule_gc(State),
    {noreply, State};
handle_info(process_pending_get, #state{ enabled = [] } = State) ->
    %% Nothing to hand out; queued callers keep waiting.
    {noreply, State};
handle_info(process_pending_get, #state{} = State) ->
    NState = serve_one_pending_get(State),
    maybe_schedule_process_get(NState),
    {noreply, NState};
handle_info(process_pending_endpoint,
            #state{ pending_endpoints = [{enable, Endpoint} | Remaining] } = State) ->
    Applied = apply_pending_enable(Endpoint, State),
    %% Order matters: the get is scheduled before the next endpoint change.
    maybe_schedule_process_get(Applied),
    maybe_schedule_process_endpoint(Remaining),
    {noreply, finish_pending_change(Applied, Remaining)};
handle_info(process_pending_endpoint,
            #state{ pending_endpoints = [{disable, Endpoint} | Remaining] } = State) ->
    Applied = apply_pending_disable(Endpoint, State),
    maybe_schedule_process_endpoint(Remaining),
    {noreply, finish_pending_change(Applied, Remaining)};
handle_info({'DOWN', MonitorRef, _Type, Pid, Reason}, #state{} = State) ->
    {Endpoint, Type, NState} = forget_monitored_pool(MonitorRef, Pid, State),
    LoggedReason =
        case Reason of
            shutdown -> normal;
            _Other -> Reason
        end,
    ?LOG_INFO(ls(#{what => gd_endpoint_closed,
                   text => <<"Disconnected from a GD endpoint">>,
                   type => Type, endpoint => Endpoint, reason => LoggedReason}, State)),
    {noreply, NState};
handle_info(Unexpected, State) ->
    ?UNEXPECTED_INFO(Unexpected),
    {noreply, State}.

%% Stops a disabled endpoint if its pool reports no connections.
%% Returns [Endpoint] when a stop was requested and [] otherwise.
-spec gc_disabled_endpoint(endpoint_info(), state()) -> [endpoint()].
gc_disabled_endpoint(#endpoint_info{ endpoint = Endpoint, conn_pool_ref = PoolRef }, State) ->
    try cpool:get_connection(PoolRef, no_wait_for_reconnect) of
        _ConnPid ->
            []
    catch
        error:no_connections ->
            stop_disabled(Endpoint, State),
            [Endpoint]
    end.

%% Replies to the oldest queued get_connection_pool caller, if there is one.
-spec serve_one_pending_get(state()) -> state().
serve_one_pending_get(#state{ pending_gets = PendingGets, enabled = Enabled } = State) ->
    case queue:out(PendingGets) of
        {empty, _} ->
            State;
        {{value, From}, StillWaiting} ->
            PoolRef = pick_connection_pool(Enabled),
            gen_server:reply(From, {ok, PoolRef}),
            State#state{ pending_gets = StillWaiting }
    end.

%% Tries to enable `Endpoint'; on any failure logs it and keeps the old state.
-spec apply_pending_enable(endpoint(), state()) -> state().
apply_pending_enable(Endpoint, State) ->
    case catch enable(Endpoint, State) of
        {ok, Enabled} ->
            ?LOG_INFO(ls(#{what => gd_endpoint_enabled,
                           text => <<"GD server manager enables pending endpoint">>,
                           endpoint => Endpoint}, State)),
            Enabled;
        Error ->
            ?LOG_ERROR(ls(#{what => gd_endpoint_enabling_failed,
                            text => <<"GD server manager cannot enable endpoint">>,
                            endpoint => Endpoint, reason => Error}, State)),
            State
    end.

%% Tries to disable `Endpoint'; on any failure logs it and keeps the old state.
-spec apply_pending_disable(endpoint(), state()) -> state().
apply_pending_disable(Endpoint, State) ->
    case catch disable(Endpoint, State) of
        {ok, Disabled} ->
            ?LOG_INFO(ls(#{what => gd_endpoint_disabled,
                           text => <<"GD server manager disables pending endpoint">>,
                           endpoint => Endpoint}, State)),
            Disabled;
        Error ->
            ?LOG_ERROR(ls(#{what => gd_endpoint_disabling_failed,
                            text => <<"GD server manager cannot disable endpoint">>,
                            endpoint => Endpoint, reason => Error}, State)),
            State
    end.

%% Stores the remaining pending changes and, when none are left, notifies the
%% force_refresh callers waiting for them.
-spec finish_pending_change(state(), endpoints_changes()) -> state().
finish_pending_change(State, Remaining) ->
    maybe_notify_endpoints_listeners(State#state{ pending_endpoints = Remaining }).

%% Removes the endpoint owning `MonitorRef' from the enabled or disabled list.
%% Returns {Endpoint, enabled | disabled, NewState}, or {Pid, unknown, State}
%% when the monitor reference is not known.
-spec forget_monitored_pool(reference(), pid(), state()) ->
    {endpoint() | pid(), enabled | disabled | unknown, state()}.
forget_monitored_pool(MonitorRef, Pid, #state{ enabled = Enabled, disabled = Disabled } = State) ->
    KeyPos = #endpoint_info.monitor_ref,
    case lists:keytake(MonitorRef, KeyPos, Enabled) of
        {value, #endpoint_info{ endpoint = Endpoint }, NEnabled} ->
            {Endpoint, enabled, State#state{ enabled = NEnabled }};
        false ->
            case lists:keytake(MonitorRef, KeyPos, Disabled) of
                {value, #endpoint_info{ endpoint = Endpoint }, NDisabled} ->
                    {Endpoint, disabled, State#state{ disabled = NDisabled }};
                false ->
                    {Pid, unknown, State}
            end
    end.
302
%% gen_server callback: the state needs no conversion between code versions.
code_change(_FromVersion, State, _ExtraArgs) ->
    {ok, State}.
305
%% gen_server callback: currently performs no cleanup.
terminate(_ExitReason, _LastState) ->
    %% TODO: Cleanup
    ok.
309
310 %%--------------------------------------------------------------------
311 %% Internal functions
312 %%--------------------------------------------------------------------
313
%% Performs a gen_server:call/2 to the manager registered for `Server'.
%% When the call exits with a timeout, the manager is told about it with a
%% {call_timeout, CallerPid, Msg} cast (so it can log the event) and the
%% original exit is re-raised with its stacktrace.
-spec do_call(Server :: jid:lserver(), Msg :: any()) -> any().
do_call(Server, Msg) ->
    Manager = mod_global_distrib_utils:server_to_mgr_name(Server),
    try gen_server:call(Manager, Msg) of
        Reply ->
            Reply
    catch
        exit:{timeout, _} = ExitReason:Stack ->
            catch gen_server:cast(Manager, {call_timeout, self(), Msg}),
            erlang:raise(exit, ExitReason, Stack)
    end.
323
%% Arms the next `refresh' timer. The regular interval is used when the last
%% fetched endpoint list is non-empty; otherwise the "when disconnected"
%% interval is used, so that the manager tries more often by default when
%% get_endpoints returns an empty list.
-spec schedule_refresh(State :: state()) -> state().
schedule_refresh(#state{ last_endpoints = LastEndpoints } = State) ->
    Interval =
        case LastEndpoints of
            [_ | _] -> State#state.refresh_interval;
            _ -> State#state.refresh_interval_when_disconnected
        end,
    do_schedule_refresh(State, Interval).
330
%% Sends `refresh' to the calling process after `Interval' seconds and
%% returns `State' untouched.
do_schedule_refresh(State, Interval) ->
    DelayMs = timer:seconds(Interval),
    _TimerRef = erlang:send_after(DelayMs, self(), refresh),
    State.
334
%% Sends `disabled_gc' to the calling process after the configured GC interval
%% (in seconds) and returns the state untouched.
-spec schedule_gc(State :: state()) -> state().
schedule_gc(State) ->
    DelayMs = timer:seconds(State#state.gc_interval),
    _TimerRef = erlang:send_after(DelayMs, self(), disabled_gc),
    State.
339
%% Sends `process_pending_endpoint' to the calling process unless the list of
%% pending endpoint changes is empty.
-spec maybe_schedule_process_endpoint(PendingEndpoints :: list()) -> any().
maybe_schedule_process_endpoint(PendingEndpoints) ->
    case PendingEndpoints of
        [] -> nop;
        _ -> erlang:send(self(), process_pending_endpoint)
    end.
345
%% Sends `process_pending_get' to the calling process when there are queued
%% get_connection_pool callers.
%% Raises error(enabled_is_empty) if callers are queued but no endpoint is
%% enabled; this check is kept exactly as in the original implementation.
%% Emptiness is checked with queue:is_empty/1 instead of comparing against
%% queue:new(), because the queue representation is opaque.
-spec maybe_schedule_process_get(state()) -> any().
maybe_schedule_process_get(#state{ pending_gets = PendingGets, enabled = Enabled }) ->
    case queue:is_empty(PendingGets) of
        true ->
            nop;
        false ->
            Enabled =/= [] orelse error(enabled_is_empty),
            self() ! process_pending_get
    end.
354
%% Once no endpoint changes are pending, replies `ok' to every waiting
%% force_refresh caller and clears the listener list.
%% While changes are still pending, the state is returned as is.
-spec maybe_notify_endpoints_listeners(state()) -> state().
maybe_notify_endpoints_listeners(#state{ pending_endpoints = [] } = State) ->
    Reply = fun(From) -> gen_server:reply(From, ok) end,
    lists:foreach(Reply, State#state.pending_endpoints_listeners),
    State#state{ pending_endpoints_listeners = [] };
maybe_notify_endpoints_listeners(StillPending) ->
    StillPending.
362
%% Picks the connection pool reference of a uniformly random enabled endpoint.
%% The returned value is the `conn_pool_ref' field, which the endpoint_info
%% record declares as an atom; the previous spec (pid() | no_connections)
%% did not match what the function returns.
%% `Enabled' must be non-empty; both callers in this module check that first.
-spec pick_connection_pool(Enabled :: [endpoint_info()]) -> atom().
pick_connection_pool(Enabled) ->
    Position = rand:uniform(length(Enabled)),
    #endpoint_info{ conn_pool_ref = PoolRef } = lists:nth(Position, Enabled),
    PoolRef.
367
%% Fetches the current endpoint list, computes the changes against the enabled
%% endpoints and appends them to the pending changes.
%% Processing is kicked off here only when nothing was pending before;
%% otherwise a process_pending_endpoint message is already on its way.
%% The fetched list is remembered in `last_endpoints'.
-spec refresh_connections(State :: state()) -> state().
refresh_connections(#state{ server = Server,
                            pending_endpoints = AlreadyPending,
                            last_endpoints = LastEndpoints } = State) ->
    ?LOG_DEBUG(ls(#{what => gd_refreshing_endpoints}, State)),
    {ok, Fetched} = get_endpoints(Server),
    case Fetched =:= LastEndpoints of
        true ->
            nothing_new;
        false ->
            ?LOG_INFO(ls(#{what => gd_endpoints_change,
                           old_endpoints => LastEndpoints,
                           new_endpoints => Fetched}, State))
    end,
    ?LOG_DEBUG(ls(#{what => gd_fetched_endpoints, fetched_endpoints => Fetched}, State)),

    NewChanges = resolve_pending(Fetched, State#state.enabled),
    log_endpoints_changes(Server, NewChanges, State),

    AlreadyPending =/= [] orelse maybe_schedule_process_endpoint(NewChanges),

    AllChanges = AlreadyPending ++ NewChanges,
    case AllChanges of
        [] ->
            no_log;
        [_ | _] ->
            ?LOG_DEBUG(ls(#{what => gd_endpoints_update_scheduled,
                            new_changes => NewChanges,
                            pending_changes => AllChanges,
                            new_changes_length => length(NewChanges),
                            pending_changes_length => length(AllChanges)},
                          State))
    end,
    State#state{ pending_endpoints = AllChanges, last_endpoints = Fetched }.
405
%% Returns the resolved endpoints of `Server'.
%% The {global_distrib_addr, Server} config option takes precedence; when it
%% is not set, the endpoints come from mod_global_distrib_mapping.
-spec get_endpoints(Server :: jid:lserver()) -> {ok, [mod_global_distrib_utils:endpoint()]}.
get_endpoints(Server) ->
    Lookup =
        case mongoose_config:lookup_opt({global_distrib_addr, Server}) of
            {ok, Configured} -> {ok, Configured};
            {error, not_found} -> mod_global_distrib_mapping:endpoints(Server)
        end,
    {ok, Unresolved} = Lookup,
    {ok, mod_global_distrib_utils:resolve_endpoints(Unresolved)}.
415
%% Computes the changes needed to go from the enabled endpoints to the new
%% endpoint list: {enable, E} for every new endpoint not matched by an enabled
%% one (in the order of the new list), followed by {disable, E} for every
%% enabled endpoint absent from the new list (in their original order).
%% Each enabled endpoint matches at most one entry of the new list.
%% NOTE(review): the spec types OldEnabled as [endpoint_pid_tuple()], while the
%% code matches #endpoint_info{} records - the spec looks outdated; confirm.
-spec resolve_pending(NewEndpointList :: [mod_global_distrib_utils:endpoint()],
                      OldEnabled :: [endpoint_pid_tuple()]) ->
    endpoints_changes().
resolve_pending(NewEndpointList, OldEnabled) ->
    Classify =
        fun(Candidate, {ToEnableAcc, Unmatched}) ->
                case lists:keytake(Candidate, #endpoint_info.endpoint, Unmatched) of
                    false -> {[Candidate | ToEnableAcc], Unmatched};
                    {value, _, StillUnmatched} -> {ToEnableAcc, StillUnmatched}
                end
        end,
    {ToEnableRev, ToDisable} = lists:foldl(Classify, {[], OldEnabled}, NewEndpointList),
    Enables = [ {enable, Endpoint} || Endpoint <- lists:reverse(ToEnableRev) ],
    Disables = lists:map(fun(#endpoint_info{ endpoint = Endpoint }) ->
                                 {disable, Endpoint}
                         end, ToDisable),
    Enables ++ Disables.
428
%% Logs the computed endpoint changes: at debug level when there are none,
%% at info level (split into to_enable / to_disable) otherwise.
-spec log_endpoints_changes(Server :: jid:lserver(),
                            EndpointsChanges :: endpoints_changes(), term()) -> any().
log_endpoints_changes(Server, EndpointsChanges, State) ->
    case EndpointsChanges of
        [] ->
            ?LOG_DEBUG(ls(#{what => gd_same_endpoints, server => Server,
                            text => <<"No endpoint changes">>}, State));
        _ ->
            ?LOG_INFO(ls(#{what => gd_endpoints_changes, server => Server,
                           to_enable => [ E || {enable, E} <- EndpointsChanges ],
                           to_disable => [ E || {disable, E} <- EndpointsChanges ]}, State))
    end.
438
%% Moves `Endpoint' to the enabled list.
%% An endpoint found among the disabled ones is re-enabled with its existing
%% pool; otherwise a new pool is started under the supervisor and monitored.
%% Returns {error, Reason} when the pool cannot be started.
-spec enable(Endpoint :: endpoint(), State :: state()) -> {ok, state()} | {error, any()}.
enable(Endpoint, #state{ disabled = Disabled, enabled = Enabled } = State) ->
    case lists:keytake(Endpoint, #endpoint_info.endpoint, Disabled) of
        {value, Known, StillDisabled} ->
            {ok, State#state{ enabled = [Known | Enabled], disabled = StillDisabled }};
        false ->
            start_and_enable(Endpoint, State)
    end.

%% Starts a fresh pool for `Endpoint' and, on success, adds it to the enabled list.
-spec start_and_enable(endpoint(), state()) -> {ok, state()} | {error, any()}.
start_and_enable(Endpoint, #state{ supervisor = Supervisor, server = Server,
                                   enabled = Enabled } = State) ->
    case catch mod_global_distrib_server_sup:start_pool(Supervisor, Endpoint, Server) of
        {ok, PoolRef, PoolPid} ->
            Started = #endpoint_info{endpoint = Endpoint,
                                     conn_pool_ref = PoolRef,
                                     conn_pool_pid = PoolPid,
                                     monitor_ref = monitor(process, PoolPid)},
            {ok, State#state{ enabled = [Started | Enabled] }};
        Failure ->
            {error, Failure}
    end.
460
%% Moves `Endpoint' from the enabled list to the front of the disabled list.
%% Crashes with badmatch when the endpoint is not enabled.
-spec disable(Endpoint :: endpoint(), State :: state()) -> {ok, state()} | {error, any()}.
disable(Endpoint, State) ->
    #state{ enabled = Enabled, disabled = Disabled } = State,
    {value, Info, StillEnabled} = lists:keytake(Endpoint, #endpoint_info.endpoint, Enabled),
    NewState = State#state{ enabled = StillEnabled, disabled = [Info | Disabled] },
    {ok, NewState}.
465
%% Asks the supervisor to stop the pool of `Endpoint'.
%% Any result other than `ok' (including a caught exception) is logged as an
%% error; nothing is raised to the caller.
-spec stop_disabled(Endpoint :: endpoint(), State :: state()) -> any().
stop_disabled(Endpoint, State) ->
    Outcome = (catch mod_global_distrib_server_sup:stop_pool(
                       State#state.supervisor, Endpoint)),
    case Outcome of
        ok ->
            ok;
        Error ->
            ?LOG_ERROR(ls(#{what => gd_cannot_close_disabled_connection,
                            reason => Error, endpoint => Endpoint},
                          State))
    end.
477
%% Converts the state record into a map with one key per record field.
-spec state_info(state()) -> map().
state_info(#state{} = State) ->
    #{server => State#state.server,
      supervisor => State#state.supervisor,
      enabled => State#state.enabled,
      disabled => State#state.disabled,
      pending_endpoints => State#state.pending_endpoints,
      pending_gets => State#state.pending_gets,
      refresh_interval => State#state.refresh_interval,
      refresh_interval_when_disconnected => State#state.refresh_interval_when_disconnected,
      gc_interval => State#state.gc_interval,
      pending_endpoints_listeners => State#state.pending_endpoints_listeners,
      last_endpoints => State#state.last_endpoints}.
502
503
%% Log State: builds log metadata by adding the state fields to `LogMeta'.
%% On a key clash the value from `LogMeta' wins.
-spec ls(map(), state()) -> map().
ls(LogMeta, State) ->
    StateFields = state_info(State),
    maps:merge(StateFields, LogMeta).
Line Hits Source