./ct_report/coverage/mongoose_wpool_mgr.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 %%==============================================================================
17 %% @doc
18 %% This gen_server is responsible for starting and restarting pools of
19 %% a given type
20 %%==============================================================================
21 -module(mongoose_wpool_mgr).
22
23 -behaviour(gen_server).
24
25 %% API
26 -export([start_link/1]).
27 -export([name/1]).
28 -export([start/5]).
29 -export([stop/3]).
30
31 %% gen_server callbacks
32 -export([init/1,
33 handle_call/3,
34 handle_cast/2,
35 handle_info/2,
36 terminate/2,
37 code_change/3]).
38
39 -ignore_xref([start_link/1]).
40
41 -include("mongoose_logger.hrl").
42
43 -record(state, {type, pools, monitors}).
44
45 -type start_request() :: {start_pool,
46 mongoose_wpool:scope(), mongoose_wpool:tag(),
47 [any()], [any()]}.
48
49 -type stop_request() :: {stop_pool,
50 mongoose_wpool:scope(), mongoose_wpool:tag()}.
51
52 -type request() :: start_request() | stop_request().
53
54 -type monitored_pool() :: {mongoose_wpool:pool_type(), mongoose_wpool:scope(), mongoose_wpool:tag()}.
55
56 -type known_pools() :: #{monitored_pool() := #{monitor := undefined | reference(),
57 wpool_opts := [wpool:option()],
58 conn_opts := [term()]}}.
59
60 -type state() :: #state{type :: atom(),
61 pools :: known_pools(),
62 monitors :: #{reference() := monitored_pool()}}.
63 -type reply() :: ok.
64
65 %%%===================================================================
66 %%% API
67 %%%===================================================================
68
%% @doc Starts the manager gen_server for the given pool type, linked to the
%% caller and registered locally under the name computed by name/1.
start_link(Type) ->
    ServerName = {local, name(Type)},
    gen_server:start_link(ServerName, ?MODULE, [Type], []).
71
%% @doc Requests the manager of the given pool type to start a pool.
%% The manager is started first when it is not registered yet; the caller
%% crashes with badmatch if that does not yield `ok'.
%% Returns whatever the manager replies to the `start_pool' request.
start(Type, Host, Tag, PoolOpts, ConnOpts) ->
    ok = ensure_started(Type),
    Request = {start_pool, Host, Tag, PoolOpts, ConnOpts},
    gen_server:call(name(Type), Request).
75
%% @doc Requests the manager of the given pool type to stop the pool
%% identified by Host and Tag. Returns the manager's reply.
stop(Type, Host, Tag) ->
    Manager = name(Type),
    gen_server:call(Manager, {stop_pool, Host, Tag}).
78
%% @doc Builds the registered name of the manager for the given pool type:
%% the atom `mongoose_wpool_<Type>_mgr'.
-spec name(mongoose_wpool:pool_type()) -> mongoose_wpool:proc_name().
name(Type) ->
    TypeStr = atom_to_list(Type),
    list_to_atom(lists:append(["mongoose_wpool_", TypeStr, "_mgr"])).
82 %%%===================================================================
83 %%% gen_server callbacks
84 %%%===================================================================
85
%% @doc gen_server callback: the manager starts with no known pools and
%% no monitors, remembering only the pool type it is responsible for.
-spec init([any()]) -> {ok, state()}.
init([Type]) ->
    InitialState = #state{type = Type, pools = #{}, monitors = #{}},
    {ok, InitialState}.
89
%% @doc gen_server callback handling the synchronous pool requests.
%%
%% `{start_pool, Host, Tag, WpoolOpts, ConnOpts}' invokes the start callback
%% of this manager's pool type. When the callback returns a two-element tuple
%% whose second element is a pid, that process is monitored, the pool is
%% remembered together with its options, and the callback result is the reply.
%% Any other result is logged and returned unchanged, leaving the state as is.
%%
%% `{stop_pool, Host, Tag}' forgets the pool and replies with the result of
%% maybe_stop_pool/3, or with `{error, unknown_pool}' when the pool is unknown.
-spec handle_call(request(), {pid(), term()}, state()) ->
    {reply, reply(), state()}.
handle_call({start_pool, Host, Tag, WpoolOpts, ConnOpts}, _From,
            #state{type = Type, pools = Pools, monitors = Monitors} = State) ->
    ?LOG_INFO(#{what => pool_starting, pool_type => Type, tag => Tag, server => Host,
                pool_opts => WpoolOpts}),
    case mongoose_wpool:call_start_callback(Type, [Host, Tag, WpoolOpts, ConnOpts]) of
        {_, Pid} = OkReply when is_pid(Pid) ->
            Ref = erlang:monitor(process, Pid),
            Key = {Type, Host, Tag},
            NewMonitors = Monitors#{Ref => Key},
            NewPools = Pools#{Key => #{monitor => Ref,
                                       wpool_opts => WpoolOpts,
                                       conn_opts => ConnOpts}},
            {reply, OkReply, State#state{pools = NewPools, monitors = NewMonitors}};
        Other ->
            %% The tag is part of the pool key, so it is logged to tell apart
            %% failures of different pools on the same host.
            ?LOG_ERROR(#{what => pool_start_failed,
                         pool_type => Type, tag => Tag, server => Host,
                         reason => Other}),
            {reply, Other, State}
    end;
handle_call({stop_pool, Host, Tag}, _From,
            #state{type = Type, pools = Pools, monitors = Monitors} = State) ->
    Key = {Type, Host, Tag},
    case maps:take(Key, Pools) of
        error ->
            {reply, {error, unknown_pool}, State};
        {Pool, NewPools} ->
            %% The pool is forgotten regardless of whether stopping succeeds.
            {Result, NewMonitors} = maybe_stop_pool(Key, Pool, Monitors),
            {reply, Result, State#state{pools = NewPools, monitors = NewMonitors}}
    end.
120
%% @doc gen_server callback: casts are not part of this server's protocol,
%% so any cast is ignored and the state is kept unchanged.
handle_cast(_Request, ManagerState) ->
    {noreply, ManagerState}.
123
%% @doc gen_server callback for out-of-band messages.
%%
%% `{'DOWN', MRef, process, _, Reason}': when MRef belongs to a monitored
%% pool, the monitor is dropped and restart_pool/3 decides what to do next;
%% notifications for unknown references are ignored.
%%
%% `{restart, PoolKey}': attempts to start the pool again, unless it was
%% removed from the known pools in the meantime.
%%
%% Any other message is logged and dropped, so that a stray message cannot
%% crash the manager (and lose the bookkeeping of every pool it tracks).
handle_info({'DOWN', MRef, process, _Pid, Reason}, #state{monitors = Monitors} = State) ->
    case maps:take(MRef, Monitors) of
        error ->
            {noreply, State};
        {PoolKey, RemainingMonitors} ->
            NewState = restart_pool(Reason, PoolKey,
                                    State#state{monitors = RemainingMonitors}),
            {noreply, NewState}
    end;
handle_info({restart, PoolKey}, #state{pools = Pools} = State) ->
    case maps:get(PoolKey, Pools, undefined) of
        undefined -> %% The pool was stopped in the meantime, no need to restart it
            {noreply, State};
        Pool ->
            start_or_schedule_another_restart(PoolKey, Pool, State)
    end;
handle_info(Unexpected, State) ->
    ?LOG_WARNING(#{what => unexpected_info, msg => Unexpected}),
    {noreply, State}.
139
%% @doc gen_server callback: nothing is cleaned up on termination.
terminate(_Why, _ManagerState) ->
    ok.
142
%% @doc gen_server callback: the state needs no conversion on code upgrade
%% and is returned as is.
code_change(_FromVsn, ManagerState, _UpgradeExtra) ->
    {ok, ManagerState}.
145
146 %%%===================================================================
147 %%% Internal functions
148 %%%===================================================================
%% Makes sure the manager for the given pool type is running: if nothing is
%% registered under its name, the type supervisor child is started; otherwise
%% there is nothing to do and `ok' is returned.
ensure_started(Type) ->
    ensure_started(Type, whereis(name(Type))).

%% Dispatches on the result of the registry lookup done by ensure_started/1.
ensure_started(Type, undefined) ->
    do_start_type_sup(Type);
ensure_started(_Type, _Registered) ->
    ok.
%% Starts the child for the given pool type under mongoose_wpool_sup.
%% Returns `ok' when the child was started or was already running; any other
%% result of supervisor:start_child/2 is logged and returned unchanged.
do_start_type_sup(Type) ->
    case supervisor:start_child(mongoose_wpool_sup, mongoose_wpool_sup:child_spec(Type)) of
        {ok, _Child} ->
            ok;
        {error, {already_started, _Child}} ->
            ok;
        Failure ->
            ?LOG_ERROR(#{what => pool_sup_start_failed, pool_type => Type, reason => Failure}),
            Failure
    end.
168
%% Tries to start the pool; when that does not succeed, another restart
%% attempt is scheduled instead. Always returns a `noreply' tuple suitable as
%% a handle_info/2 result.
start_or_schedule_another_restart(PoolKey, Pool, State) ->
    NextState =
        case try_starting(PoolKey, Pool, State) of
            {ok, StartedState} ->
                StartedState;
            _NotStarted ->
                do_schedule_restart(PoolKey, Pool, State)
        end,
    {noreply, NextState}.
176
%% Invokes the start callback for an already known pool, using the options
%% stored with it. On success (a two-element tuple carrying a pid) the new
%% process is monitored and `{ok, NewState}' is returned with the monitor
%% recorded both in the pool entry and in the monitors map. Any other callback
%% result is logged as a warning and returned unchanged.
try_starting({Type, Host, Tag} = PoolKey,
             #{wpool_opts := WpoolOpts, conn_opts := ConnOpts} = Pool,
             #state{pools = Pools, monitors = Monitors} = State) ->
    StartArgs = [Host, Tag, WpoolOpts, ConnOpts],
    case mongoose_wpool:call_start_callback(Type, StartArgs) of
        {_, Pid} when is_pid(Pid) ->
            MRef = erlang:monitor(process, Pid),
            MonitoredPool = Pool#{monitor := MRef},
            {ok, State#state{pools = Pools#{PoolKey => MonitoredPool},
                             monitors = Monitors#{MRef => PoolKey}}};
        Failure ->
            ?LOG_WARNING(#{what => pool_restart_failed, pool_key => PoolKey,
                           reason => Failure}),
            Failure
    end.
192
193
%% Reacts to a monitored pool process going down: logs the event and, if the
%% pool is still among the known pools, schedules its restart. For an unknown
%% pool key only a warning is logged and the state is returned unchanged.
restart_pool(Reason, PoolKey, #state{pools = KnownPools} = State) ->
    ?LOG_ERROR(#{what => pool_restart, pool_key => PoolKey, reason => Reason}),
    Lookup = maps:get(PoolKey, KnownPools, undefined),
    case Lookup of
        undefined ->
            ?LOG_WARNING(#{what => restart_unknown_pool_failed,
                           text => <<"Pool failed to restart - pool name is unknown">>,
                           pool_key => PoolKey}),
            State;
        Pool ->
            do_schedule_restart(PoolKey, Pool, State)
    end.
205
%% Schedules a restart attempt for the pool: a `{restart, PoolKey}' message is
%% delivered to this process after 2 seconds, and the pool entry is marked as
%% not monitored (`monitor := undefined') until the restart succeeds.
%% PoolKey must already be present in the pools map (`:=' update).
do_schedule_restart(PoolKey, Pool, #state{pools = Pools} = State) ->
    %% erlang:send_after/3 is used directly: the message goes to self(), so
    %% there is no need to route the timer through the timer module.
    erlang:send_after(timer:seconds(2), self(), {restart, PoolKey}),
    NewPool = Pool#{monitor := undefined},
    State#state{pools = Pools#{PoolKey := NewPool}}.
210
%% Stops the pool process, if there is one, and returns `{Result, NewMonitors}'.
%%
%% A pool whose monitor is `undefined' has no running process being tracked
%% (this is how do_schedule_restart/3 marks a pool awaiting restart), so there
%% is nothing to terminate. Otherwise the monitor is removed and the pool
%% child is terminated under the type supervisor; a result other than `ok'
%% from supervisor:terminate_child/2 is logged and returned as Result.
maybe_stop_pool(_, #{monitor := undefined}, Monitors) ->
    {ok, Monitors};
maybe_stop_pool({Type, Host, Tag} = Key, #{monitor := Monitor}, Monitors) ->
    %% `flush' also discards a 'DOWN' message that may already be queued for
    %% this monitor, so no stale notification is left in the mailbox.
    erlang:demonitor(Monitor, [flush]),
    SupName = mongoose_wpool_type_sup:name(Type),
    PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag),
    NewMonitors = maps:remove(Monitor, Monitors),
    case supervisor:terminate_child(SupName, PoolName) of
        ok ->
            {ok, NewMonitors};
        Other ->
            ?LOG_WARNING(#{what => pool_stop_failed, pool_key => Key, reason => Other}),
            {Other, NewMonitors}
    end.
Line Hits Source