1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
%%============================================================================== |
17 |
|
%% @doc |
18 |
|
%% This gen_server is responsible for starting and restarting pools of |
19 |
|
%% given type |
20 |
|
%%============================================================================== |
21 |
|
-module(mongoose_wpool_mgr). |
22 |
|
|
23 |
|
-behaviour(gen_server). |
24 |
|
|
25 |
|
%% API |
26 |
|
-export([start_link/1]). |
27 |
|
-export([name/1]). |
28 |
|
-export([start/5]). |
29 |
|
-export([stop/3]). |
30 |
|
|
31 |
|
%% gen_server callbacks |
32 |
|
-export([init/1, |
33 |
|
handle_call/3, |
34 |
|
handle_cast/2, |
35 |
|
handle_info/2, |
36 |
|
terminate/2, |
37 |
|
code_change/3]). |
38 |
|
|
39 |
|
-ignore_xref([start_link/1]). |
40 |
|
|
41 |
|
-include("mongoose_logger.hrl"). |
42 |
|
|
43 |
|
-record(state, {type, pools, monitors}). |
44 |
|
|
45 |
|
-type start_request() :: {start_pool, |
46 |
|
mongoose_wpool:scope(), mongoose_wpool:tag(), |
47 |
|
[any()], [any()]}. |
48 |
|
|
49 |
|
-type stop_request() :: {stop_pool, |
50 |
|
mongoose_wpool:scope(), mongoose_wpool:tag()}. |
51 |
|
|
52 |
|
-type request() :: start_request() | stop_request(). |
53 |
|
|
54 |
|
-type monitored_pool() :: {mongoose_wpool:pool_type(), mongoose_wpool:scope(), mongoose_wpool:tag()}. |
55 |
|
|
56 |
|
-type known_pools() :: #{monitored_pool() := #{monitor := undefined | reference(), |
57 |
|
wpool_opts := [wpool:option()], |
58 |
|
conn_opts := [term()]}}. |
59 |
|
|
60 |
|
-type state() :: #state{type :: atom(), |
61 |
|
pools :: known_pools(), |
62 |
|
monitors :: #{reference() := monitored_pool()}}. |
63 |
|
-type reply() :: ok. |
64 |
|
|
65 |
|
%%%=================================================================== |
66 |
|
%%% API |
67 |
|
%%%=================================================================== |
68 |
|
|
69 |
|
start_link(Type) -> |
70 |
187 |
gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []). |
71 |
|
|
72 |
|
start(Type, Host, Tag, PoolOpts, ConnOpts) -> |
73 |
188 |
ok = ensure_started(Type), |
74 |
188 |
gen_server:call(name(Type), {start_pool, Host, Tag, PoolOpts, ConnOpts}). |
75 |
|
|
76 |
|
stop(Type, Host, Tag) -> |
77 |
184 |
gen_server:call(name(Type), {stop_pool, Host, Tag}). |
78 |
|
|
79 |
|
-spec name(mongoose_wpool:pool_type()) -> mongoose_wpool:proc_name(). |
80 |
|
name(Type) -> |
81 |
934 |
list_to_atom("mongoose_wpool_" ++ atom_to_list(Type) ++ "_mgr"). |
82 |
|
%%%=================================================================== |
83 |
|
%%% gen_server callbacks |
84 |
|
%%%=================================================================== |
85 |
|
|
86 |
|
-spec init([any()]) -> {ok, state()}. |
87 |
|
init([Type]) -> |
88 |
187 |
{ok, #state{type = Type, pools = #{}, monitors = #{}}}. |
89 |
|
|
90 |
|
-spec handle_call(request(), {pid(), term()}, state()) -> |
91 |
|
{reply, reply(), state()}. |
92 |
|
handle_call({start_pool, Host, Tag, WpoolOpts, ConnOpts}, _From, |
93 |
|
#state{type = Type, pools = Pools, monitors = Monitors} = State) -> |
94 |
188 |
?LOG_INFO(#{what => pool_starting, pool_type => Type, tag => Tag, server => Host, |
95 |
188 |
pool_opts => WpoolOpts}), |
96 |
188 |
case mongoose_wpool:call_start_callback(Type, [Host, Tag, WpoolOpts, ConnOpts]) of |
97 |
|
{_, Pid} = OkReply when is_pid(Pid) -> |
98 |
188 |
Ref = erlang:monitor(process, Pid), |
99 |
188 |
Key = {Type, Host, Tag}, |
100 |
188 |
NewMonitors = Monitors#{Ref => Key}, |
101 |
188 |
NewPools = Pools#{Key => #{monitor => Ref, |
102 |
|
wpool_opts => WpoolOpts, |
103 |
|
conn_opts => ConnOpts}}, |
104 |
188 |
{reply, OkReply, State#state{pools = NewPools, monitors = NewMonitors}}; |
105 |
|
Other -> |
106 |
:-( |
?LOG_ERROR(#{what => pool_start_failed, |
107 |
:-( |
pool_type => Type, server => Host, reason => Other}), |
108 |
:-( |
{reply, Other, State} |
109 |
|
end; |
110 |
|
handle_call({stop_pool, Host, Tag}, _From, |
111 |
|
#state{type = Type, pools = Pools, monitors = Monitors} = State) -> |
112 |
184 |
Key = {Type, Host, Tag}, |
113 |
184 |
case maps:take(Key, Pools) of |
114 |
|
error -> |
115 |
:-( |
{reply, {error, unknown_pool}, State}; |
116 |
|
{Pool, NewPools} -> |
117 |
184 |
{Result, NewMonitors} = maybe_stop_pool(Key, Pool, Monitors), |
118 |
184 |
{reply, Result, State#state{pools = NewPools, monitors = NewMonitors}} |
119 |
|
end. |
120 |
|
|
121 |
|
handle_cast(_Msg, State) -> |
122 |
:-( |
{noreply, State}. |
123 |
|
|
124 |
|
handle_info({'DOWN', MRef, process, _Pid, Reason}, #state{monitors = Monitors} = State) -> |
125 |
:-( |
case maps:take(MRef, Monitors) of |
126 |
|
error -> |
127 |
:-( |
{noreply, State}; |
128 |
|
{Details, NewMonitors0} -> |
129 |
:-( |
NewState = restart_pool(Reason, Details, State#state{monitors = NewMonitors0}), |
130 |
:-( |
{noreply, NewState} |
131 |
|
end; |
132 |
|
handle_info({restart, PoolKey}, #state{pools = Pools} = State) -> |
133 |
:-( |
case maps:get(PoolKey, Pools, undefined) of |
134 |
|
undefined -> %% The pool was stopped in the meantime, no need to restart it |
135 |
:-( |
{noreply, State}; |
136 |
|
Pool -> |
137 |
:-( |
start_or_schedule_another_restart(PoolKey, Pool, State) |
138 |
|
end. |
139 |
|
|
140 |
|
terminate(_Reason, _State) -> |
141 |
:-( |
ok. |
142 |
|
|
143 |
|
code_change(_OldVsn, State, _Extra) -> |
144 |
:-( |
{ok, State}. |
145 |
|
|
146 |
|
%%%=================================================================== |
147 |
|
%%% Internal functions |
148 |
|
%%%=================================================================== |
149 |
|
ensure_started(Type) -> |
150 |
188 |
Name = name(Type), |
151 |
188 |
case whereis(Name) of |
152 |
|
undefined -> |
153 |
187 |
do_start_type_sup(Type); |
154 |
|
_ -> |
155 |
1 |
ok |
156 |
|
end. |
157 |
|
do_start_type_sup(Type) -> |
158 |
187 |
ChildSpec = mongoose_wpool_sup:child_spec(Type), |
159 |
187 |
case supervisor:start_child(mongoose_wpool_sup, ChildSpec) of |
160 |
|
{ok, _} -> |
161 |
187 |
ok; |
162 |
|
{error, {already_started, _}} -> |
163 |
:-( |
ok; |
164 |
|
Other -> |
165 |
:-( |
?LOG_ERROR(#{what => pool_sup_start_failed, pool_type => Type, reason => Other}), |
166 |
:-( |
Other |
167 |
|
end. |
168 |
|
|
169 |
|
start_or_schedule_another_restart(PoolKey, Pool, State) -> |
170 |
:-( |
case try_starting(PoolKey, Pool, State) of |
171 |
|
{ok, NewState} -> |
172 |
:-( |
{noreply, NewState}; |
173 |
|
_ -> |
174 |
:-( |
{noreply, do_schedule_restart(PoolKey, Pool, State)} |
175 |
|
end. |
176 |
|
|
177 |
|
try_starting({Type, Host, Tag} = PoolKey, |
178 |
|
#{wpool_opts := WpoolOpts, conn_opts := ConnOpts} = Pool, |
179 |
|
#state{pools = Pools, monitors = Monitors} = State) -> |
180 |
:-( |
case mongoose_wpool:call_start_callback(Type, [Host, Tag, WpoolOpts, ConnOpts]) of |
181 |
|
{_, Pid} when is_pid(Pid) -> |
182 |
:-( |
Ref = erlang:monitor(process, Pid), |
183 |
:-( |
NewMonitors = Monitors#{Ref => PoolKey}, |
184 |
:-( |
NewPools = Pools#{PoolKey => Pool#{monitor := Ref}}, |
185 |
|
|
186 |
:-( |
{ok, State#state{pools = NewPools, monitors = NewMonitors}}; |
187 |
|
Other -> |
188 |
:-( |
?LOG_WARNING(#{what => pool_restart_failed, pool_key => PoolKey, |
189 |
:-( |
reason => Other}), |
190 |
:-( |
Other |
191 |
|
end. |
192 |
|
|
193 |
|
|
194 |
|
restart_pool(Reason, PoolKey, #state{pools = Pools} = State) -> |
195 |
:-( |
?LOG_ERROR(#{what => pool_restart, pool_key => PoolKey, reason => Reason}), |
196 |
:-( |
case maps:get(PoolKey, Pools, undefined) of |
197 |
|
undefined -> |
198 |
:-( |
?LOG_WARNING(#{what => restart_unknown_pool_failed, |
199 |
|
text => <<"Pool failed to restart - pool name is unknown">>, |
200 |
:-( |
pool_key => PoolKey}), |
201 |
:-( |
State; |
202 |
|
Pool -> |
203 |
:-( |
do_schedule_restart(PoolKey, Pool, State) |
204 |
|
end. |
205 |
|
|
206 |
|
do_schedule_restart(PoolKey, Pool, #state{pools = Pools} = State) -> |
207 |
:-( |
timer:send_after(timer:seconds(2), {restart, PoolKey}), |
208 |
:-( |
NewPool = Pool#{monitor := undefined}, |
209 |
:-( |
State#state{pools = Pools#{PoolKey := NewPool}}. |
210 |
|
|
211 |
|
maybe_stop_pool(_, #{monitor := undefined}, Monitors) -> |
212 |
:-( |
{ok, Monitors}; |
213 |
|
maybe_stop_pool({Type, Host, Tag} = Key, #{monitor := Monitor}, Monitors) -> |
214 |
184 |
erlang:demonitor(Monitor), |
215 |
184 |
SupName = mongoose_wpool_type_sup:name(Type), |
216 |
184 |
PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag), |
217 |
184 |
NewMonitors = maps:remove(Monitor, Monitors), |
218 |
184 |
case supervisor:terminate_child(SupName, PoolName) of |
219 |
|
ok -> |
220 |
184 |
{ok, NewMonitors}; |
221 |
|
Other -> |
222 |
:-( |
?LOG_WARNING(#{what => pool_stop_failed, pool_key => Key, reason => Other}), |
223 |
:-( |
{Other, NewMonitors} |
224 |
|
end. |