./ct_report/coverage/mongoose_tcp_listener.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mongoose_tcp_listener).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -include("mongoose.hrl").
21
22 -behaviour(supervisor).
23
24 %% We do not block on send anymore.
25 -define(TCP_SEND_TIMEOUT, 15000).
26
27 -export([start_listener/1, start_link/1, init/1]).
28
29 %% Internal
30 -export([start_accept_loop/3, accept_loop/3]).
31
32 -ignore_xref([start_link/1, start_accept_loop/3]).
33
34 -type options() :: #{module := module(),
35 port := inet:port_number(),
36 ip_tuple := inet:ip_address(),
37 ip_address := string(),
38 ip_version := 4 | 6,
39 proto := tcp,
40 num_acceptors := pos_integer(),
41 backlog := non_neg_integer(),
42 proxy_protocol := boolean(),
43 atom() => any()}.
44
45 -type connection_details() :: #{
46 proxy := boolean(),
47 version => 1 | 2,
48 src_address := inet:ip_address() | binary(),
49 src_port := inet:port_number(),
50 dest_address := inet:ip_address() | binary(),
51 dest_port := inet:port_number()
52 }.
53 -export_type([options/0, connection_details/0]).
54
55 %%--------------------------------------------------------------------
56 %% API
57 %%--------------------------------------------------------------------
58
59 -spec start_listener(options()) -> ok.
60 start_listener(Opts = #{proto := tcp}) ->
61 434 ListenerId = mongoose_listener_config:listener_id(Opts),
62 434 mongoose_listener_sup:start_child(listener_child_spec(ListenerId, Opts)).
63
64 listener_child_spec(ListenerId, Opts) ->
65 434 #{id => ListenerId,
66 start => {?MODULE, start_link, [Opts]},
67 restart => permanent,
68 shutdown => 1000,
69 type => supervisor,
70 modules => [?MODULE]}.
71
72 -spec start_link(options()) -> any().
73 start_link(Opts) ->
74 434 supervisor:start_link(?MODULE, Opts).
75
76 -spec init(options()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
77 init(Opts = #{module := Module, num_acceptors := NumAcceptors}) ->
78 434 try
79 434 ListenSocket = listen_tcp(Opts),
80 434 Id = mongoose_listener_config:listener_id(Opts),
81 434 Children = [make_childspec({Id, I}, ListenSocket, Module, Opts)
82 434 || I <- lists:seq(1, NumAcceptors)],
83 434 {ok, {#{strategy => one_for_one, intensity => 100, period => 1}, Children}}
84 catch
85
:-(
Error -> exit(Error)
86 end.
87
88 %%--------------------------------------------------------------------
89 %% Helpers
90 %%--------------------------------------------------------------------
91
92 -spec start_accept_loop(Socket :: port(),
93 Module :: module(),
94 Opts :: options()) -> {ok, pid()}.
95 start_accept_loop(ListenSock, Module, Opts) ->
96 43401 Pid = proc_lib:spawn_link(?MODULE, accept_loop, [ListenSock, Module, Opts]),
97 43401 {ok, Pid}.
98
99 -spec accept_loop(Socket :: port(),
100 Module :: module(),
101 Opts :: options()) -> no_return().
102 accept_loop(ListenSocket, Module, Opts = #{proxy_protocol := ProxyProtocol}) ->
103 47999 case do_accept(ListenSocket, ProxyProtocol) of
104 {ok, Socket, ConnectionDetails} ->
105 4520 ?LOG_INFO(#{what => tcp_accepted,
106 socket => Socket, handler_module => Module,
107 4520 conn_details => ConnectionDetails}),
108 4520 ejabberd_socket:start(
109 Module, gen_tcp, Socket, Opts, ConnectionDetails),
110 4520 ?MODULE:accept_loop(ListenSocket, Module, Opts);
111 {error, Reason} ->
112
:-(
?LOG_INFO(#{what => tcp_accept_failed,
113 listen_socket => ListenSocket,
114
:-(
reason => Reason, handler_module => Module}),
115
:-(
?MODULE:accept_loop(ListenSocket, Module, Opts)
116 end.
117
118 -spec do_accept(gen_tcp:socket(), boolean()) ->
119 {ok, gen_tcp:socket(), connection_details()} | {error, term()}.
120 do_accept(ListenSocket, ProxyProtocol) ->
121 47999 case gen_tcp:accept(ListenSocket) of
122 {ok, Socket} when ProxyProtocol ->
123 2 read_proxy_header(Socket);
124 {ok, Socket} ->
125 4519 {ok, {DestAddr, DestPort}} = inet:sockname(Socket),
126 4519 {ok, {SrcAddr, SrcPort}} = inet:peername(Socket),
127 4519 {ok, Socket, #{proxy => false,
128 src_address => SrcAddr,
129 src_port => SrcPort,
130 dest_address => DestAddr,
131 dest_port => DestPort}};
132 Other ->
133
:-(
Other
134 end.
135
136 -spec read_proxy_header(gen_tcp:socket()) -> {ok, gen_tcp:socket(), connection_details()}.
137 read_proxy_header(Socket) ->
138 2 {ok, ProxyInfo} = ranch_tcp:recv_proxy_header(Socket, 1000),
139 1 {ok, Socket, #{proxy => true,
140 src_address => maps:get(src_address, ProxyInfo),
141 src_port => maps:get(src_port, ProxyInfo),
142 dest_address => maps:get(dest_address, ProxyInfo),
143 dest_port => maps:get(dest_port, ProxyInfo),
144 version => maps:get(version, ProxyInfo)
145 }}.
146
147 -spec make_childspec(Id :: term(), ListenSock :: port(),
148 Module :: module(), Opts :: options()) ->
149 supervisor:child_spec().
150 make_childspec(Id, ListenSock, Module, Opts) ->
151 43400 #{id => Id,
152 start => {?MODULE, start_accept_loop, [ListenSock, Module, Opts]},
153 restart => permanent,
154 shutdown => brutal_kill,
155 type => worker,
156 modules => [?MODULE]}.
157
158 -spec listen_tcp(options()) -> port().
159 listen_tcp(Opts = #{port := Port}) ->
160 434 SockOpts = prepare_socket_opts(Opts),
161 434 Res = listen_or_retry(Port, SockOpts, 10),
162 434 case Res of
163 {ok, ListenSocket} ->
164 434 ListenSocket;
165 {error, Reason} ->
166
:-(
error(#{what => mongoose_tcp_listener_init_failed,
167 reason => Reason,
168 text => inet:format_error(Reason),
169 options => Opts})
170 end.
171
172 prepare_socket_opts(#{ip_version := IPVersion, ip_tuple := IPTuple, backlog := Backlog}) ->
173 434 [binary,
174 {packet, 0},
175 {active, false},
176 {reuseaddr, true},
177 {nodelay, true},
178 {send_timeout, ?TCP_SEND_TIMEOUT},
179 {keepalive, true},
180 {send_timeout_close, true},
181 mongoose_listener_config:address_family(IPVersion),
182 {ip, IPTuple},
183 {backlog, Backlog}].
184
185 %% Process exit and socket release are not transactional
186 %% So, there can be a short period of time when we can't bind
187 listen_or_retry(Port, SockOpts, Retries) ->
188 434 case gen_tcp:listen(Port, SockOpts) of
189 {ok, ListenSocket} ->
190 434 {ok, ListenSocket};
191 {error, eaddrinuse} when Retries > 0 ->
192
:-(
timer:sleep(100),
193
:-(
listen_or_retry(Port, SockOpts, Retries-1);
194 {error, Reason} ->
195
:-(
{error, Reason}
196 end.
Line Hits Source