./ct_report/coverage/mongoose_wpool_mgr.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 %%==============================================================================
17 %% @doc
18 %% This gen_server is responsible for starting and restarting pools of
19 %% given type
20 %%==============================================================================
21 -module(mongoose_wpool_mgr).
22
23 -behaviour(gen_server).
24
25 %% API
26 -export([start_link/1]).
27 -export([name/1]).
28 -export([start/5]).
29 -export([stop/3]).
30
31 %% gen_server callbacks
32 -export([init/1,
33 handle_call/3,
34 handle_cast/2,
35 handle_info/2,
36 terminate/2,
37 code_change/3]).
38
39 -ignore_xref([start_link/1]).
40
41 -include("mongoose.hrl").
42 -include("mongoose_logger.hrl").
43
44 -record(state, {type, pools, monitors}).
45
46 -type start_request() :: {start_pool,
47 mongoose_wpool:scope(), mongoose_wpool:tag(),
48 [any()], [any()]}.
49
50 -type stop_request() :: {stop_pool,
51 mongoose_wpool:scope(), mongoose_wpool:tag()}.
52
53 -type request() :: start_request() | stop_request().
54
55 -type monitored_pool() :: {mongoose_wpool:pool_type(), mongoose_wpool:scope(), mongoose_wpool:tag()}.
56
57 -type known_pools() :: #{monitored_pool() := #{monitor := undefined | reference(),
58 wpool_opts := [wpool:option()],
59 conn_opts := [term()]}}.
60
61 -type state() :: #state{type :: atom(),
62 pools :: known_pools(),
63 monitors :: #{reference() := monitored_pool()}}.
64 -type reply() :: ok.
65
66 %%%===================================================================
67 %%% API
68 %%%===================================================================
69
70 start_link(Type) ->
71 83 gen_server:start_link({local, name(Type)}, ?MODULE, [Type], []).
72
73 start(Type, Host, Tag, PoolOpts, ConnOpts) ->
74 135 ok = ensure_started(Type),
75 135 gen_server:call(name(Type), {start_pool, Host, Tag, PoolOpts, ConnOpts}).
76
77 stop(Type, Host, Tag) ->
78 131 gen_server:call(name(Type), {stop_pool, Host, Tag}).
79
80 -spec name(mongoose_wpool:pool_type()) -> mongoose_wpool:proc_name().
81 name(Type) ->
82 567 list_to_atom("mongoose_wpool_" ++ atom_to_list(Type) ++ "_mgr").
83 %%%===================================================================
84 %%% gen_server callbacks
85 %%%===================================================================
86
87 -spec init([any()]) -> {ok, state()}.
88 init([Type]) ->
89 83 {ok, #state{type = Type, pools = #{}, monitors = #{}}}.
90
91 -spec handle_call(request(), {pid(), term()}, state()) ->
92 {reply, reply(), state()}.
93 handle_call({start_pool, Host, Tag, WpoolOpts, ConnOpts}, _From,
94 #state{type = Type, pools = Pools, monitors = Monitors} = State) ->
95 135 ?LOG_INFO(#{what => pool_starting, pool_type => Type, tag => Tag, server => Host,
96 135 pool_opts => WpoolOpts}),
97 135 case mongoose_wpool:call_start_callback(Type, [Host, Tag, WpoolOpts, ConnOpts]) of
98 {_, Pid} = OkReply when is_pid(Pid) ->
99 135 Ref = erlang:monitor(process, Pid),
100 135 Key = {Type, Host, Tag},
101 135 NewMonitors = Monitors#{Ref => Key},
102 135 NewPools = Pools#{Key => #{monitor => Ref,
103 wpool_opts => WpoolOpts,
104 conn_opts => ConnOpts}},
105 135 {reply, OkReply, State#state{pools = NewPools, monitors = NewMonitors}};
106 Other ->
107
:-(
?LOG_ERROR(#{what => pool_start_failed,
108
:-(
pool_type => Type, server => Host, reason => Other}),
109
:-(
{reply, Other, State}
110 end;
111 handle_call({stop_pool, Host, Tag}, _From,
112 #state{type = Type, pools = Pools, monitors = Monitors} = State) ->
113 131 Key = {Type, Host, Tag},
114 131 case maps:take(Key, Pools) of
115 error ->
116
:-(
{reply, {error, unknown_pool}, State};
117 {Pool, NewPools} ->
118 131 {Result, NewMonitors} = maybe_stop_pool(Key, Pool, Monitors),
119 131 {reply, Result, State#state{pools = NewPools, monitors = NewMonitors}}
120 end.
121
122 handle_cast(_Msg, State) ->
123
:-(
{noreply, State}.
124
125 handle_info({'DOWN', MRef, process, _Pid, Reason}, #state{monitors = Monitors} = State) ->
126
:-(
case maps:take(MRef, Monitors) of
127 error ->
128
:-(
{noreply, State};
129 {Details, NewMonitors0} ->
130
:-(
NewState = restart_pool(Reason, Details, State#state{monitors = NewMonitors0}),
131
:-(
{noreply, NewState}
132 end;
133 handle_info({restart, PoolKey}, #state{pools = Pools, monitors = Monitors} = State) ->
134
:-(
case maps:get(PoolKey, Pools, undefined) of
135 undefined -> %% The pool was stopped in the meantime, no need to restart it
136
:-(
{noreply, State};
137 Pool ->
138
:-(
start_or_schedule_another_restart(PoolKey, Pool, State)
139 end.
140
141
142
143 terminate(_Reason, _State) ->
144
:-(
ok.
145
146 code_change(_OldVsn, State, _Extra) ->
147
:-(
{ok, State}.
148
149 %%%===================================================================
150 %%% Internal functions
151 %%%===================================================================
152 ensure_started(Type) ->
153 135 Name = name(Type),
154 135 case whereis(Name) of
155 undefined ->
156 83 do_start_type_sup(Type);
157 _ ->
158 52 ok
159 end.
160 do_start_type_sup(Type) ->
161 83 ChildSpec = mongoose_wpool_sup:child_spec(Type),
162 83 case supervisor:start_child(mongoose_wpool_sup, ChildSpec) of
163 {ok, _} ->
164 83 ok;
165 {error, {already_started, _}} ->
166
:-(
ok;
167 Other ->
168
:-(
?LOG_ERROR(#{what => pool_sup_start_failed, pool_type => Type, reason => Other}),
169
:-(
Other
170 end.
171
172 start_or_schedule_another_restart(PoolKey, Pool, State) ->
173
:-(
case try_starting(PoolKey, Pool, State) of
174 {ok, NewState} ->
175
:-(
{noreply, NewState};
176 _ ->
177
:-(
{noreply, do_schedule_restart(PoolKey, Pool, State)}
178 end.
179
180 try_starting({Type, Host, Tag} = PoolKey,
181 #{wpool_opts := WpoolOpts, conn_opts := ConnOpts} = Pool,
182 #state{pools = Pools, monitors = Monitors} = State) ->
183
:-(
case mongoose_wpool:call_start_callback(Type, [Host, Tag, WpoolOpts, ConnOpts]) of
184 {_, Pid} when is_pid(Pid) ->
185
:-(
Ref = erlang:monitor(process, Pid),
186
:-(
NewMonitors = Monitors#{Ref => PoolKey},
187
:-(
NewPools = Pools#{PoolKey => Pool#{monitor := Ref}},
188
189
:-(
{ok, State#state{pools = NewPools, monitors = NewMonitors}};
190 Other ->
191
:-(
?LOG_WARNING(#{what => pool_restart_failed, pool_key => PoolKey,
192
:-(
reason => Other}),
193
:-(
Other
194 end.
195
196
197 restart_pool(Reason, PoolKey, #state{pools = Pools} = State) ->
198
:-(
?LOG_ERROR(#{what => pool_restart, pool_key => PoolKey, reason => Reason}),
199
:-(
case maps:get(PoolKey, Pools, undefined) of
200 undefined ->
201
:-(
?LOG_WARNING(#{what => restart_unknown_pool_failed,
202 text => <<"Pool failed to restart - pool name is unknown">>,
203
:-(
pool_key => PoolKey}),
204
:-(
State;
205 Pool ->
206
:-(
do_schedule_restart(PoolKey, Pool, State)
207 end.
208
209 do_schedule_restart(PoolKey, Pool, #state{pools = Pools} = State) ->
210
:-(
timer:send_after(timer:seconds(2), {restart, PoolKey}),
211
:-(
NewPool = Pool#{monitor := undefined},
212
:-(
State#state{pools = Pools#{PoolKey := NewPool}}.
213
214 maybe_stop_pool(_, #{monitor := undefined}, Monitors) ->
215
:-(
{ok, Monitors};
216 maybe_stop_pool({Type, Host, Tag} = Key, #{monitor := Monitor}, Monitors) ->
217 131 erlang:demonitor(Monitor),
218 131 SupName = mongoose_wpool_type_sup:name(Type),
219 131 PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag),
220 131 NewMonitors = maps:remove(Monitor, Monitors),
221 131 case supervisor:terminate_child(SupName, PoolName) of
222 ok ->
223 131 {ok, NewMonitors};
224 Other ->
225
:-(
?LOG_WARNING(#{what => pool_stop_failed, pool_key => Key, reason => Other}),
226
:-(
{Other, NewMonitors}
227 end.
Line Hits Source