./ct_report/coverage/mongoose_tcp_listener.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mongoose_tcp_listener).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -include("mongoose.hrl").
21
22 -behaviour(supervisor).
23
24 %% We do not block on send anymore.
25 -define(TCP_SEND_TIMEOUT, 15000).
26
27 -export([start_link/6, init/1]).
28
29 %% Internal
30 -export([start_accept_loop/3, accept_loop/4]).
31
32 -ignore_xref([start_link/6, start_accept_loop/3]).
33
34 -type connection_details() :: #{
35 proxy := boolean(),
36 version => 1 | 2,
37 src_address := inet:ip_address() | binary(),
38 src_port := inet:port_number(),
39 dest_address := inet:ip_address() | binary(),
40 dest_port := inet:port_number()
41 }.
42 -export_type([connection_details/0]).
43
44 %%--------------------------------------------------------------------
45 %% API
46 %%--------------------------------------------------------------------
47
48 -spec start_link(Id :: mongoose_listener_config:listener_id(),
49 Module :: atom(),
50 Opts :: [any(), ...],
51 SockOpts :: [gen_tcp:listen_option()],
52 Port :: inet:port_number(),
53 IPS :: [any()]) -> any().
54 start_link(Id, Module, Opts, SockOpts, Port, IPS) ->
55 428 supervisor:start_link(?MODULE, {Id, Module, Opts, SockOpts, Port, IPS}).
56
57 -spec init({Id :: mongoose_listener_config:listener_id(),
58 Module :: atom(),
59 Opts :: [any(), ...],
60 SockOpts :: [gen_tcp:listen_option()],
61 Port :: inet:port_number(),
62 IPS :: [any()]}) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
63 init({Id, Module, Opts, SockOpts, Port, IPS}) ->
64 428 try
65 428 AcceptorsNum = proplists:get_value(acceptors_num, Opts, 100),
66 428 ListenSocket = listen_tcp(Id, Module, SockOpts, Port, IPS),
67 428 Children = [make_childspec({Id, I}, ListenSocket, Module, Opts)
68 428 || I <- lists:seq(1, AcceptorsNum)],
69 428 {ok, {#{strategy => one_for_one, intensity => 100, period => 1}, Children}}
70 catch
71
:-(
Error -> exit(Error)
72 end.
73
74 %%--------------------------------------------------------------------
75 %% Helpers
76 %%--------------------------------------------------------------------
77
78 -spec start_accept_loop(Socket :: port(),
79 Module :: ejabberd_listener:mod(),
80 Opts :: ejabberd_listener:opts()) -> {ok, pid()}.
81 start_accept_loop(ListenSock, Module, Opts) ->
82 42801 ProxyProtocol = proplists:get_value(proxy_protocol, Opts, false),
83 42801 Pid = proc_lib:spawn_link(?MODULE, accept_loop, [ListenSock, Module, Opts, ProxyProtocol]),
84 42801 {ok, Pid}.
85
86 -spec accept_loop(Socket :: port(),
87 Module :: ejabberd_listener:mod(),
88 Opts :: ejabberd_listener:opts(),
89 ProxyProtocol :: boolean()) -> no_return().
90 accept_loop(ListenSocket, Module, Opts, ProxyProtocol) ->
91 46751 case do_accept(ListenSocket, ProxyProtocol) of
92 {ok, Socket, ConnectionDetails} ->
93 3872 ?LOG_INFO(#{what => tcp_accepted,
94 socket => Socket, handler_module => Module,
95 3872 conn_details => ConnectionDetails}),
96 3872 ejabberd_socket:start(
97 Module, gen_tcp, Socket, [{connection_details, ConnectionDetails} | Opts]),
98 3872 ?MODULE:accept_loop(ListenSocket, Module, Opts, ProxyProtocol);
99 {error, Reason} ->
100
:-(
?LOG_INFO(#{what => tcp_accept_failed,
101 listen_socket => ListenSocket,
102
:-(
reason => Reason, handler_module => Module}),
103
:-(
?MODULE:accept_loop(ListenSocket, Module, Opts, ProxyProtocol)
104 end.
105
106 -spec do_accept(gen_tcp:socket(), boolean()) ->
107 {ok, gen_tcp:socket(), connection_details()} | {error, term()}.
108 do_accept(ListenSocket, ProxyProtocol) ->
109 46751 case gen_tcp:accept(ListenSocket) of
110 {ok, Socket} when ProxyProtocol ->
111 2 read_proxy_header(Socket);
112 {ok, Socket} ->
113 3871 {ok, {DestAddr, DestPort}} = inet:sockname(Socket),
114 3871 {ok, {SrcAddr, SrcPort}} = inet:peername(Socket),
115 3871 {ok, Socket, #{proxy => false,
116 src_address => SrcAddr,
117 src_port => SrcPort,
118 dest_address => DestAddr,
119 dest_port => DestPort}};
120 Other ->
121
:-(
Other
122 end.
123
124 -spec read_proxy_header(gen_tcp:socket()) -> {ok, gen_tcp:socket(), connection_details()}.
125 read_proxy_header(Socket) ->
126 2 {ok, ProxyInfo} = ranch_tcp:recv_proxy_header(Socket, 1000),
127 1 {ok, Socket, #{proxy => true,
128 src_address => maps:get(src_address, ProxyInfo),
129 src_port => maps:get(src_port, ProxyInfo),
130 dest_address => maps:get(dest_address, ProxyInfo),
131 dest_port => maps:get(dest_port, ProxyInfo),
132 version => maps:get(version, ProxyInfo)
133 }}.
134
135 -spec make_childspec(Id :: term(), ListenSock :: port(),
136 Module :: module(), Opts :: [any()]) ->
137 supervisor:child_spec().
138 make_childspec(Id, ListenSock, Module, Opts) ->
139 42800 #{id => Id,
140 start => {?MODULE, start_accept_loop, [ListenSock, Module, Opts]},
141 restart => permanent,
142 shutdown => brutal_kill,
143 type => worker,
144 modules => [?MODULE]}.
145
146 -spec listen_tcp(Id :: mongoose_listener_config:listener_id(),
147 Module :: atom(),
148 SockOpts :: [gen_tcp:listen_option()],
149 Port :: inet:port_number(),
150 IPS :: [any()]) -> port().
151 listen_tcp(Id, Module, SockOpts, Port, IPS) ->
152 428 DefaultSockOpts = [binary,
153 {backlog, 100},
154 {packet, 0},
155 {active, false},
156 {reuseaddr, true},
157 {nodelay, true},
158 {send_timeout, ?TCP_SEND_TIMEOUT},
159 {keepalive, true},
160 {send_timeout_close, true}],
161 428 FinalSockOpts = override_sock_opts(SockOpts, DefaultSockOpts),
162 428 Res = listen_or_retry(Port, FinalSockOpts, 10),
163 428 case Res of
164 {ok, ListenSocket} ->
165 428 ListenSocket;
166 {error, Reason} ->
167
:-(
ejabberd_listener:socket_error(Reason, Id, Module, SockOpts, Port, IPS)
168 end.
169
170 %% Process exit and socket release are not transactional
171 %% So, there can be a short period of time when we can't bind
172 listen_or_retry(Port, SockOpts, Retries) ->
173 428 case gen_tcp:listen(Port, SockOpts) of
174 {ok, ListenSocket} ->
175 428 {ok, ListenSocket};
176 {error, eaddrinuse} when Retries > 0 ->
177
:-(
timer:sleep(100),
178
:-(
listen_or_retry(Port, SockOpts, Retries-1);
179 {error, Reason} ->
180
:-(
{error, Reason}
181 end.
182
183 override_sock_opts([], Opts) ->
184 428 Opts;
185 override_sock_opts([Override | OverrideOpts], Opts) ->
186 856 NewOpts = do_override(Override, Opts),
187 856 override_sock_opts(OverrideOpts, NewOpts).
188
189 do_override({ip, _} = IP, Opts) ->
190 428 lists:keystore(ip, 1, Opts, IP);
191 do_override({backlog, _} = Backlog, Opts) ->
192
:-(
lists:keystore(backlog, 1, Opts, Backlog);
193 do_override(inet6, Opts) ->
194
:-(
[inet6 | lists:delete(inet6, Opts)];
195 do_override(inet, Opts) ->
196 428 [inet | lists:delete(inet, Opts)];
197 do_override(_, Opts) ->
198
:-(
Opts.
Line Hits Source