./ct_report/coverage/mongoose_tcp_listener.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mongoose_tcp_listener).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -include("mongoose.hrl").
21
22 -behaviour(supervisor).
23
24 %% We do not block on send anymore.
25 -define(TCP_SEND_TIMEOUT, 15000).
26
27 -export([start_listener/1, start_link/1, init/1]).
28
29 %% Internal
30 -export([start_accept_loop/3, accept_loop/3, read_connection_details/2]).
31
32 -ignore_xref([start_link/1, start_accept_loop/3, read_connection_details/2]).
33
34 -type options() :: #{module := module(),
35 port := inet:port_number(),
36 ip_tuple := inet:ip_address(),
37 ip_address := string(),
38 ip_version := 4 | 6,
39 proto := tcp,
40 num_acceptors := pos_integer(),
41 backlog := non_neg_integer(),
42 proxy_protocol := boolean(),
43 atom() => any()}.
44
45 -type connection_details() :: #{
46 proxy := boolean(),
47 version => 1 | 2,
48 src_address := inet:ip_address() | binary(),
49 src_port := inet:port_number(),
50 dest_address := inet:ip_address() | binary(),
51 dest_port := inet:port_number()
52 }.
53 -export_type([options/0, connection_details/0]).
54
55 %%--------------------------------------------------------------------
56 %% API
57 %%--------------------------------------------------------------------
58
59 -spec start_listener(options()) -> ok.
60 start_listener(Opts = #{proto := tcp}) ->
61 193 ListenerId = mongoose_listener_config:listener_id(Opts),
62 193 mongoose_listener_sup:start_child(listener_child_spec(ListenerId, Opts)).
63
64 listener_child_spec(ListenerId, Opts) ->
65 193 #{id => ListenerId,
66 start => {?MODULE, start_link, [Opts]},
67 restart => permanent,
68 shutdown => 1000,
69 type => supervisor,
70 modules => [?MODULE]}.
71
72 -spec start_link(options()) -> any().
73 start_link(Opts) ->
74 193 supervisor:start_link(?MODULE, Opts).
75
76 -spec init(options()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
77 init(Opts = #{module := Module, num_acceptors := NumAcceptors}) ->
78 193 try
79 193 ListenSocket = listen_tcp(Opts),
80 193 Id = mongoose_listener_config:listener_id(Opts),
81 193 Children = [make_childspec({Id, I}, ListenSocket, Module, Opts)
82 193 || I <- lists:seq(1, NumAcceptors)],
83 193 {ok, {#{strategy => one_for_one, intensity => 100, period => 1}, Children}}
84 catch
85
:-(
Error -> exit(Error)
86 end.
87
88 %%--------------------------------------------------------------------
89 %% Helpers
90 %%--------------------------------------------------------------------
91
92 -spec start_accept_loop(Socket :: port(),
93 Module :: module(),
94 Opts :: options()) -> {ok, pid()}.
95 start_accept_loop(ListenSock, Module, Opts) ->
96 19300 Pid = proc_lib:spawn_link(?MODULE, accept_loop, [ListenSock, Module, Opts]),
97 19300 {ok, Pid}.
98
99 -spec accept_loop(Socket :: port(),
100 Module :: module(),
101 Opts :: options()) -> no_return().
102 accept_loop(ListenSocket, Module, Opts) ->
103 19385 case do_accept(ListenSocket, Opts) of
104 {ok, Socket, ConnectionDetails} ->
105 85 ?LOG_INFO(#{what => tcp_accepted,
106 socket => Socket, handler_module => Module,
107 85 conn_details => ConnectionDetails}),
108 85 mongoose_transport:accept(
109 Module, Socket, Opts, ConnectionDetails),
110 85 ?MODULE:accept_loop(ListenSocket, Module, Opts);
111 {error, Reason} ->
112 2 ?LOG_INFO(#{what => tcp_accept_failed,
113 listen_socket => ListenSocket,
114 2 reason => Reason, handler_module => Module}),
115 2 ?MODULE:accept_loop(ListenSocket, Module, Opts)
116 end.
117
118 -spec do_accept(gen_tcp:socket(), options()) ->
119 {ok, gen_tcp:socket(), connection_details()} | {error, term()}.
120 do_accept(ListenSocket, Opts) ->
121 19385 case gen_tcp:accept(ListenSocket) of
122 {ok, Socket} ->
123 87 ?MODULE:read_connection_details(Socket, Opts);
124 Other ->
125
:-(
Other
126 end.
127
128 -spec read_connection_details(gen_tcp:socket(), options()) ->
129 {ok, gen_tcp:socket(), connection_details()} | {error, term()}.
130 read_connection_details(Socket, #{proxy_protocol := true}) ->
131
:-(
read_proxy_header(Socket);
132 read_connection_details(Socket, _Opts) ->
133 85 case {inet:sockname(Socket), inet:peername(Socket)} of
134 {{ok, {DestAddr, DestPort}}, {ok, {SrcAddr, SrcPort}}} ->
135 85 {ok, Socket, #{proxy => false,
136 src_address => SrcAddr,
137 src_port => SrcPort,
138 dest_address => DestAddr,
139 dest_port => DestPort}};
140 Other ->
141
:-(
gen_tcp:close(Socket),
142
:-(
{error, simple_reason(Other)}
143 end.
144
145 simple_reason({{error, Reason}, _}) ->
146
:-(
Reason;
147 simple_reason({_, {error, Reason}}) ->
148
:-(
Reason.
149
150 -spec read_proxy_header(gen_tcp:socket()) ->
151 {ok, gen_tcp:socket(), connection_details()} | {error, term()}.
152 read_proxy_header(Socket) ->
153
:-(
case ranch_tcp:recv_proxy_header(Socket, 1000) of
154 {ok, ProxyInfo} ->
155
:-(
{ok, Socket, #{proxy => true,
156 src_address => maps:get(src_address, ProxyInfo),
157 src_port => maps:get(src_port, ProxyInfo),
158 dest_address => maps:get(dest_address, ProxyInfo),
159 dest_port => maps:get(dest_port, ProxyInfo),
160 version => maps:get(version, ProxyInfo)
161 }};
162 {error, Reason} ->
163
:-(
gen_tcp:close(Socket),
164
:-(
{error, Reason}
165 end.
166
167 -spec make_childspec(Id :: term(), ListenSock :: port(),
168 Module :: module(), Opts :: options()) ->
169 supervisor:child_spec().
170 make_childspec(Id, ListenSock, Module, Opts) ->
171 19300 #{id => Id,
172 start => {?MODULE, start_accept_loop, [ListenSock, Module, Opts]},
173 restart => permanent,
174 shutdown => brutal_kill,
175 type => worker,
176 modules => [?MODULE]}.
177
178 -spec listen_tcp(options()) -> port().
179 listen_tcp(Opts = #{port := Port}) ->
180 193 SockOpts = prepare_socket_opts(Opts),
181 193 Res = listen_or_retry(Port, SockOpts, 10),
182 193 case Res of
183 {ok, ListenSocket} ->
184 193 ListenSocket;
185 {error, Reason} ->
186
:-(
error(#{what => mongoose_tcp_listener_init_failed,
187 reason => Reason,
188 text => inet:format_error(Reason),
189 options => Opts})
190 end.
191
192 prepare_socket_opts(#{ip_version := IPVersion, ip_tuple := IPTuple, backlog := Backlog}) ->
193 193 [binary,
194 {packet, 0},
195 {active, false},
196 {reuseaddr, true},
197 {nodelay, true},
198 {send_timeout, ?TCP_SEND_TIMEOUT},
199 {keepalive, true},
200 {send_timeout_close, true},
201 mongoose_listener_config:address_family(IPVersion),
202 {ip, IPTuple},
203 {backlog, Backlog}].
204
205 %% Process exit and socket release are not transactional
206 %% So, there can be a short period of time when we can't bind
207 listen_or_retry(Port, SockOpts, Retries) ->
208 193 case gen_tcp:listen(Port, SockOpts) of
209 {ok, ListenSocket} ->
210 193 {ok, ListenSocket};
211 {error, eaddrinuse} when Retries > 0 ->
212
:-(
timer:sleep(100),
213
:-(
listen_or_retry(Port, SockOpts, Retries-1);
214 {error, Reason} ->
215
:-(
{error, Reason}
216 end.
Line Hits Source