1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
%%==============================================================================
%% @doc
%% This gen_server is responsible for starting and restarting pools of
%% a given type.
%%==============================================================================
21 |
|
-module(mongoose_wpool_mgr). |
22 |
|
|
23 |
|
-behaviour(gen_server). |
24 |
|
|
25 |
|
%% API |
26 |
|
-export([start_link/1]). |
27 |
|
-export([name/1]). |
28 |
|
-export([start/5]). |
29 |
|
-export([stop/3]). |
30 |
|
|
31 |
|
%% gen_server callbacks |
32 |
|
-export([init/1, |
33 |
|
handle_call/3, |
34 |
|
handle_cast/2, |
35 |
|
handle_info/2, |
36 |
|
terminate/2, |
37 |
|
code_change/3]). |
38 |
|
|
39 |
|
-ignore_xref([start_link/1]). |
40 |
|
|
41 |
|
-include("mongoose.hrl"). |
42 |
|
-include("mongoose_logger.hrl"). |
43 |
|
|
44 |
|
%% Manager state:
%%   type     - the pool type this manager process serves,
%%   pools    - every pool the manager knows about, keyed by {Type, Scope, Tag},
%%   monitors - reverse index from a monitor reference to the key of the pool it watches.
-record(state, {type, pools, monitors}).

%% Synchronous request asking the manager to start a pool:
%% scope, tag, worker pool options and connection options.
-type start_request() :: {start_pool,
                          mongoose_wpool:scope(), mongoose_wpool:tag(),
                          [any()], [any()]}.

%% Synchronous request asking the manager to stop a pool identified by scope and tag.
-type stop_request() :: {stop_pool,
                         mongoose_wpool:scope(), mongoose_wpool:tag()}.

-type request() :: start_request() | stop_request().

-type monitored_pool() :: {mongoose_wpool:pool_type(), mongoose_wpool:scope(), mongoose_wpool:tag()}.

%% `monitor' is `undefined' while a pool has no monitored process
%% (i.e. a restart has been scheduled but has not succeeded yet).
-type known_pools() :: #{monitored_pool() := #{monitor := undefined | reference(),
                                               wpool_opts := [wpool:option()],
                                               conn_opts := [term()]}}.

-type state() :: #state{type :: atom(),
                        pools :: known_pools(),
                        monitors :: #{reference() := monitored_pool()}}.

%% handle_call/3 replies with `ok' or the result of supervisor:terminate_child/2 for stop
%% requests, `{error, unknown_pool}' for pools it does not know, and passes the start
%% callback's result through verbatim for start requests - hence the open-ended union.
-type reply() :: ok | {error, term()} | term().
65 |
|
|
66 |
|
%%%=================================================================== |
67 |
|
%%% API |
68 |
|
%%%=================================================================== |
69 |
|
|
70 |
|
%% Starts the manager process for pools of `Type'. The process is registered
%% locally under name(Type) and receives [Type] as its init argument.
start_link(Type) ->
    RegisteredName = {local, name(Type)},
    gen_server:start_link(RegisteredName, ?MODULE, [Type], []).
72 |
|
|
73 |
|
%% Asks the manager of `Type' to start the pool identified by `Host' and `Tag'.
%% The manager itself is started first when it is not registered yet; anything
%% other than `ok' from that step crashes the caller with a badmatch.
%% Returns whatever the manager replies to the `start_pool' call.
start(Type, Host, Tag, PoolOpts, ConnOpts) ->
    ok = ensure_started(Type),
    Request = {start_pool, Host, Tag, PoolOpts, ConnOpts},
    gen_server:call(name(Type), Request).
76 |
|
|
77 |
|
%% Asks the manager of `Type' to stop the pool identified by `Host' and `Tag'.
%% Returns whatever the manager replies to the `stop_pool' call.
stop(Type, Host, Tag) ->
    Manager = name(Type),
    gen_server:call(Manager, {stop_pool, Host, Tag}).
79 |
|
|
80 |
|
%% Builds the registered name of the manager process for a pool type:
%% `mongoose_wpool_<Type>_mgr'.
-spec name(mongoose_wpool:pool_type()) -> mongoose_wpool:proc_name().
name(Type) ->
    Parts = ["mongoose_wpool_", atom_to_list(Type), "_mgr"],
    list_to_atom(lists:append(Parts)).
83 |
|
%%%=================================================================== |
84 |
|
%%% gen_server callbacks |
85 |
|
%%%=================================================================== |
86 |
|
|
87 |
|
%% gen_server init callback: remembers the pool type and starts with
%% no known pools and no monitors.
-spec init([any()]) -> {ok, state()}.
init([Type]) ->
    InitialState = #state{type = Type, pools = #{}, monitors = #{}},
    {ok, InitialState}.
90 |
|
|
91 |
|
%% Serves the two synchronous requests of the manager.
%%
%% {start_pool, Host, Tag, WpoolOpts, ConnOpts}
%%   Runs the start callback of the pool type. When it yields a 2-tuple whose second
%%   element is a pid, that pid is monitored, the pool (with its options) is recorded
%%   under {Type, Host, Tag} and the callback's result is returned to the caller.
%%   Any other result is logged and returned unchanged; the state is left untouched.
%%
%% {stop_pool, Host, Tag}
%%   Forgets the pool and stops it via maybe_stop_pool/3. Replies
%%   `{error, unknown_pool}' when no pool is recorded under {Type, Host, Tag}.
-spec handle_call(request(), {pid(), term()}, state()) ->
    {reply, reply(), state()}.
handle_call({start_pool, Host, Tag, WpoolOpts, ConnOpts}, _From,
            #state{type = Type, pools = Pools, monitors = Monitors} = State) ->
    ?LOG_INFO(#{what => pool_starting, pool_type => Type, tag => Tag, server => Host,
                pool_opts => WpoolOpts}),
    case mongoose_wpool:call_start_callback(Type, [Host, Tag, WpoolOpts, ConnOpts]) of
        {_, Pid} = OkReply when is_pid(Pid) ->
            Ref = erlang:monitor(process, Pid),
            Key = {Type, Host, Tag},
            %% Keep both directions: Ref -> Key to resolve 'DOWN' messages,
            %% Key -> pool details to be able to restart or stop the pool later.
            NewMonitors = Monitors#{Ref => Key},
            NewPools = Pools#{Key => #{monitor => Ref,
                                       wpool_opts => WpoolOpts,
                                       conn_opts => ConnOpts}},
            {reply, OkReply, State#state{pools = NewPools, monitors = NewMonitors}};
        Other ->
            %% The tag is part of the pool's identity, so it is logged together with
            %% the type and the server to tell apart pools that differ only by tag.
            ?LOG_ERROR(#{what => pool_start_failed,
                         pool_type => Type, tag => Tag, server => Host, reason => Other}),
            {reply, Other, State}
    end;
handle_call({stop_pool, Host, Tag}, _From,
            #state{type = Type, pools = Pools, monitors = Monitors} = State) ->
    Key = {Type, Host, Tag},
    case maps:take(Key, Pools) of
        error ->
            {reply, {error, unknown_pool}, State};
        {Pool, NewPools} ->
            %% The pool is forgotten regardless of the outcome of stopping it;
            %% the outcome is only reported back to the caller.
            {Result, NewMonitors} = maybe_stop_pool(Key, Pool, Monitors),
            {reply, Result, State#state{pools = NewPools, monitors = NewMonitors}}
    end.
121 |
|
|
122 |
|
%% No casts are part of this server's protocol: any cast is dropped
%% and the state is kept as it is.
handle_cast(_Request, CurrentState) ->
    {noreply, CurrentState}.
124 |
|
|
125 |
|
%% Handles out-of-band messages.
%%
%% {'DOWN', MRef, process, _Pid, Reason}
%%   A monitored process went down. If the reference is still tracked, it is dropped
%%   from the monitors and restart_pool/3 decides what to do with the pool.
%%
%% {restart, PoolKey}
%%   A scheduled restart attempt. It is only carried out when the pool is still known
%%   and has no monitored process at the moment.
%%
%% Anything else is logged and dropped, so that a stray message cannot crash the
%% manager (and with it the bookkeeping of all pools of this type).
handle_info({'DOWN', MRef, process, _Pid, Reason}, #state{monitors = Monitors} = State) ->
    case maps:take(MRef, Monitors) of
        error ->
            %% The reference is not tracked (any more), nothing to restart
            {noreply, State};
        {PoolKey, RemainingMonitors} ->
            NewState = restart_pool(Reason, PoolKey, State#state{monitors = RemainingMonitors}),
            {noreply, NewState}
    end;
handle_info({restart, PoolKey}, #state{pools = Pools} = State) ->
    case maps:get(PoolKey, Pools, undefined) of
        undefined -> %% The pool was stopped in the meantime, no need to restart it
            {noreply, State};
        #{monitor := undefined} = Pool ->
            start_or_schedule_another_restart(PoolKey, Pool, State);
        _AlreadyMonitored ->
            %% The pool was started again (by a start_pool request) before this timer
            %% fired. Trying to start it once more could only fail, and the failure
            %% path would reset the monitor to `undefined', after which a stop_pool
            %% request would no longer terminate the running pool.
            {noreply, State}
    end;
handle_info(Msg, State) ->
    ?LOG_WARNING(#{what => unexpected_message, msg => Msg}),
    {noreply, State}.
140 |
|
|
141 |
|
%% gen_server terminate callback: performs no cleanup and returns `ok'.
terminate(_StopReason, _FinalState) ->
    ok.
143 |
|
|
144 |
|
%% gen_server code_change callback: the state is carried over unchanged.
code_change(_PreviousVsn, CurrentState, _ExtraArgs) ->
    {ok, CurrentState}.
146 |
|
|
147 |
|
%%%=================================================================== |
148 |
|
%%% Internal functions |
149 |
|
%%%=================================================================== |
150 |
|
%% Makes sure the manager for `Type' is running: when nothing is registered under
%% name(Type), the manager is started through do_start_type_sup/1 and that call's
%% result is returned; otherwise `ok' is returned right away.
ensure_started(Type) ->
    case whereis(name(Type)) =:= undefined of
        true ->
            do_start_type_sup(Type);
        false ->
            ok
    end.
158 |
|
%% Starts the child described by mongoose_wpool_sup:child_spec(Type) under
%% mongoose_wpool_sup. Returns `ok' when the child was started or is already
%% running; any other result is logged as an error and returned as is.
do_start_type_sup(Type) ->
    Spec = mongoose_wpool_sup:child_spec(Type),
    StartResult = supervisor:start_child(mongoose_wpool_sup, Spec),
    case StartResult of
        {ok, _Child} ->
            ok;
        {error, {already_started, _Child}} ->
            ok;
        Failure ->
            ?LOG_ERROR(#{what => pool_sup_start_failed, pool_type => Type, reason => Failure}),
            Failure
    end.
169 |
|
|
170 |
|
%% Tries to start the pool again. On success the state returned by try_starting/3
%% is used; on any other outcome another restart attempt is scheduled.
%% Returns a `{noreply, State}' tuple ready to be handed back from handle_info/2.
start_or_schedule_another_restart(PoolKey, Pool, State) ->
    NextState = case try_starting(PoolKey, Pool, State) of
                    {ok, RestartedState} ->
                        RestartedState;
                    _Failure ->
                        do_schedule_restart(PoolKey, Pool, State)
                end,
    {noreply, NextState}.
177 |
|
|
178 |
|
%% Runs the start callback of the pool type with the options stored for the pool.
%%
%% Returns `{ok, NewState}' when the callback yields a 2-tuple whose second element
%% is a pid: the pid is monitored and both the monitors and the pool entry are updated
%% with the new reference.
%% Returns `{error, CallbackResult}' otherwise, after logging a warning. The failure
%% is tagged on purpose: handing the callback's result back as is would let a result
%% of the shape `{ok, NotAPid}' be mistaken by the caller for `{ok, NewState}'.
try_starting({Type, Host, Tag} = PoolKey,
             #{wpool_opts := WpoolOpts, conn_opts := ConnOpts} = Pool,
             #state{pools = Pools, monitors = Monitors} = State) ->
    case mongoose_wpool:call_start_callback(Type, [Host, Tag, WpoolOpts, ConnOpts]) of
        {_, Pid} when is_pid(Pid) ->
            Ref = erlang:monitor(process, Pid),
            NewMonitors = Monitors#{Ref => PoolKey},
            NewPools = Pools#{PoolKey => Pool#{monitor := Ref}},
            {ok, State#state{pools = NewPools, monitors = NewMonitors}};
        Other ->
            ?LOG_WARNING(#{what => pool_restart_failed, pool_key => PoolKey,
                           reason => Other}),
            {error, Other}
    end.
193 |
|
|
194 |
|
|
195 |
|
%% Reacts to a monitored pool going down with `Reason': the event is logged and,
%% when the pool is still known, a restart attempt is scheduled. For an unknown
%% pool only a warning is logged and the state is returned unchanged.
restart_pool(Reason, PoolKey, #state{pools = KnownPools} = State) ->
    ?LOG_ERROR(#{what => pool_restart, pool_key => PoolKey, reason => Reason}),
    case maps:find(PoolKey, KnownPools) of
        {ok, Pool} ->
            do_schedule_restart(PoolKey, Pool, State);
        error ->
            ?LOG_WARNING(#{what => restart_unknown_pool_failed,
                           text => <<"Pool failed to restart - pool name is unknown">>,
                           pool_key => PoolKey}),
            State
    end.
206 |
|
|
207 |
|
%% Schedules a restart attempt for the pool: a `{restart, PoolKey}' message is sent
%% to this process after 2 seconds and the pool entry is marked as having no
%% monitored process. Returns the updated state.
%%
%% erlang:send_after/3 is used instead of timer:send_after/2, as recommended by
%% the timer module documentation for plain delayed messages.
do_schedule_restart(PoolKey, Pool, #state{pools = Pools} = State) ->
    erlang:send_after(timer:seconds(2), self(), {restart, PoolKey}),
    NewPool = Pool#{monitor := undefined},
    %% `:=' on purpose: only a pool that is already known may be rescheduled
    State#state{pools = Pools#{PoolKey := NewPool}}.
211 |
|
|
212 |
|
%% Stops a pool that is being removed from the manager's bookkeeping.
%% Returns `{Result, NewMonitors}'.
%%
%% A pool whose monitor is `undefined' has no monitored process, so there is nothing
%% to terminate: the result is `ok' and the monitors are returned unchanged.
%% Otherwise the monitor is removed and the pool is terminated under the supervisor
%% of its type. A result other than `ok' from the supervisor is logged and returned
%% in place of `ok'; the monitor is dropped from the monitors in either case.
maybe_stop_pool(_, #{monitor := undefined}, Monitors) ->
    {ok, Monitors};
maybe_stop_pool({Type, Host, Tag} = Key, #{monitor := Monitor}, Monitors) ->
    %% `flush' also removes a 'DOWN' message for this monitor that may already be
    %% waiting in the mailbox, so no stale notification for a stopped pool is left.
    erlang:demonitor(Monitor, [flush]),
    SupName = mongoose_wpool_type_sup:name(Type),
    PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag),
    NewMonitors = maps:remove(Monitor, Monitors),
    case supervisor:terminate_child(SupName, PoolName) of
        ok ->
            {ok, NewMonitors};
        Other ->
            ?LOG_WARNING(#{what => pool_stop_failed, pool_key => Key, reason => Other}),
            {Other, NewMonitors}
    end.