1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mongoose_tcp_listener). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-include("mongoose.hrl"). |
21 |
|
|
22 |
|
-behaviour(supervisor). |
23 |
|
|
24 |
|
%% We do not block on send anymore. |
25 |
|
-define(TCP_SEND_TIMEOUT, 15000). |
26 |
|
|
27 |
|
-export([start_link/6, init/1]). |
28 |
|
|
29 |
|
%% Internal |
30 |
|
-export([start_accept_loop/3, accept_loop/4]). |
31 |
|
|
32 |
|
-ignore_xref([start_link/6, start_accept_loop/3]). |
33 |
|
|
34 |
|
-type connection_details() :: #{ |
35 |
|
proxy := boolean(), |
36 |
|
version => 1 | 2, |
37 |
|
src_address := inet:ip_address() | binary(), |
38 |
|
src_port := inet:port_number(), |
39 |
|
dest_address := inet:ip_address() | binary(), |
40 |
|
dest_port := inet:port_number() |
41 |
|
}. |
42 |
|
-export_type([connection_details/0]). |
43 |
|
|
44 |
|
%%-------------------------------------------------------------------- |
45 |
|
%% API |
46 |
|
%%-------------------------------------------------------------------- |
47 |
|
|
48 |
|
-spec start_link(Id :: mongoose_listener_config:listener_id(), |
49 |
|
Module :: atom(), |
50 |
|
Opts :: [any(), ...], |
51 |
|
SockOpts :: [gen_tcp:listen_option()], |
52 |
|
Port :: inet:port_number(), |
53 |
|
IPS :: [any()]) -> any(). |
54 |
|
start_link(Id, Module, Opts, SockOpts, Port, IPS) -> |
55 |
446 |
supervisor:start_link(?MODULE, {Id, Module, Opts, SockOpts, Port, IPS}). |
56 |
|
|
57 |
|
-spec init({Id :: mongoose_listener_config:listener_id(), |
58 |
|
Module :: atom(), |
59 |
|
Opts :: [any(), ...], |
60 |
|
SockOpts :: [gen_tcp:listen_option()], |
61 |
|
Port :: inet:port_number(), |
62 |
|
IPS :: [any()]}) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. |
63 |
|
init({Id, Module, Opts, SockOpts, Port, IPS}) -> |
64 |
446 |
try |
65 |
446 |
AcceptorsNum = proplists:get_value(acceptors_num, Opts, 100), |
66 |
446 |
ListenSocket = listen_tcp(Id, Module, SockOpts, Port, IPS), |
67 |
446 |
Children = [make_childspec({Id, I}, ListenSocket, Module, Opts) |
68 |
446 |
|| I <- lists:seq(1, AcceptorsNum)], |
69 |
446 |
{ok, {#{strategy => one_for_one, intensity => 100, period => 1}, Children}} |
70 |
|
catch |
71 |
:-( |
Error -> exit(Error) |
72 |
|
end. |
73 |
|
|
74 |
|
%%-------------------------------------------------------------------- |
75 |
|
%% Helpers |
76 |
|
%%-------------------------------------------------------------------- |
77 |
|
|
78 |
|
-spec start_accept_loop(Socket :: port(), |
79 |
|
Module :: ejabberd_listener:mod(), |
80 |
|
Opts :: ejabberd_listener:opts()) -> {ok, pid()}. |
81 |
|
start_accept_loop(ListenSock, Module, Opts) -> |
82 |
44601 |
ProxyProtocol = proplists:get_value(proxy_protocol, Opts, false), |
83 |
44601 |
Pid = proc_lib:spawn_link(?MODULE, accept_loop, [ListenSock, Module, Opts, ProxyProtocol]), |
84 |
44601 |
{ok, Pid}. |
85 |
|
|
86 |
|
-spec accept_loop(Socket :: port(), |
87 |
|
Module :: ejabberd_listener:mod(), |
88 |
|
Opts :: ejabberd_listener:opts(), |
89 |
|
ProxyProtocol :: boolean()) -> no_return(). |
90 |
|
accept_loop(ListenSocket, Module, Opts, ProxyProtocol) -> |
91 |
50954 |
case do_accept(ListenSocket, ProxyProtocol) of |
92 |
|
{ok, Socket, ConnectionDetails} -> |
93 |
6275 |
?LOG_INFO(#{what => tcp_accepted, |
94 |
|
socket => Socket, handler_module => Module, |
95 |
6275 |
conn_details => ConnectionDetails}), |
96 |
6275 |
ejabberd_socket:start( |
97 |
|
Module, gen_tcp, Socket, [{connection_details, ConnectionDetails} | Opts]), |
98 |
6275 |
?MODULE:accept_loop(ListenSocket, Module, Opts, ProxyProtocol); |
99 |
|
{error, Reason} -> |
100 |
:-( |
?LOG_INFO(#{what => tcp_accept_failed, |
101 |
|
listen_socket => ListenSocket, |
102 |
:-( |
reason => Reason, handler_module => Module}), |
103 |
:-( |
?MODULE:accept_loop(ListenSocket, Module, Opts, ProxyProtocol) |
104 |
|
end. |
105 |
|
|
106 |
|
-spec do_accept(gen_tcp:socket(), boolean()) -> |
107 |
|
{ok, gen_tcp:socket(), connection_details()} | {error, term()}. |
108 |
|
do_accept(ListenSocket, ProxyProtocol) -> |
109 |
50954 |
case gen_tcp:accept(ListenSocket) of |
110 |
|
{ok, Socket} when ProxyProtocol -> |
111 |
2 |
read_proxy_header(Socket); |
112 |
|
{ok, Socket} -> |
113 |
6274 |
{ok, {DestAddr, DestPort}} = inet:sockname(Socket), |
114 |
6274 |
{ok, {SrcAddr, SrcPort}} = inet:peername(Socket), |
115 |
6274 |
{ok, Socket, #{proxy => false, |
116 |
|
src_address => SrcAddr, |
117 |
|
src_port => SrcPort, |
118 |
|
dest_address => DestAddr, |
119 |
|
dest_port => DestPort}}; |
120 |
|
Other -> |
121 |
:-( |
Other |
122 |
|
end. |
123 |
|
|
124 |
|
-spec read_proxy_header(gen_tcp:socket()) -> {ok, gen_tcp:socket(), connection_details()}. |
125 |
|
read_proxy_header(Socket) -> |
126 |
2 |
{ok, ProxyInfo} = ranch_tcp:recv_proxy_header(Socket, 1000), |
127 |
1 |
{ok, Socket, #{proxy => true, |
128 |
|
src_address => maps:get(src_address, ProxyInfo), |
129 |
|
src_port => maps:get(src_port, ProxyInfo), |
130 |
|
dest_address => maps:get(dest_address, ProxyInfo), |
131 |
|
dest_port => maps:get(dest_port, ProxyInfo), |
132 |
|
version => maps:get(version, ProxyInfo) |
133 |
|
}}. |
134 |
|
|
135 |
|
-spec make_childspec(Id :: term(), ListenSock :: port(), |
136 |
|
Module :: module(), Opts :: [any()]) -> |
137 |
|
supervisor:child_spec(). |
138 |
|
make_childspec(Id, ListenSock, Module, Opts) -> |
139 |
44600 |
#{id => Id, |
140 |
|
start => {?MODULE, start_accept_loop, [ListenSock, Module, Opts]}, |
141 |
|
restart => permanent, |
142 |
|
shutdown => brutal_kill, |
143 |
|
type => worker, |
144 |
|
modules => [?MODULE]}. |
145 |
|
|
146 |
|
-spec listen_tcp(Id :: mongoose_listener_config:listener_id(), |
147 |
|
Module :: atom(), |
148 |
|
SockOpts :: [gen_tcp:listen_option()], |
149 |
|
Port :: inet:port_number(), |
150 |
|
IPS :: [any()]) -> port(). |
151 |
|
listen_tcp(Id, Module, SockOpts, Port, IPS) -> |
152 |
446 |
DefaultSockOpts = [binary, |
153 |
|
{backlog, 100}, |
154 |
|
{packet, 0}, |
155 |
|
{active, false}, |
156 |
|
{reuseaddr, true}, |
157 |
|
{nodelay, true}, |
158 |
|
{send_timeout, ?TCP_SEND_TIMEOUT}, |
159 |
|
{keepalive, true}, |
160 |
|
{send_timeout_close, true}], |
161 |
446 |
FinalSockOpts = override_sock_opts(SockOpts, DefaultSockOpts), |
162 |
446 |
Res = listen_or_retry(Port, FinalSockOpts, 10), |
163 |
446 |
case Res of |
164 |
|
{ok, ListenSocket} -> |
165 |
446 |
ListenSocket; |
166 |
|
{error, Reason} -> |
167 |
:-( |
ejabberd_listener:socket_error(Reason, Id, Module, SockOpts, Port, IPS) |
168 |
|
end. |
169 |
|
|
170 |
|
%% Process exit and socket release are not transactional |
171 |
|
%% So, there can be a short period of time when we can't bind |
172 |
|
listen_or_retry(Port, SockOpts, Retries) -> |
173 |
446 |
case gen_tcp:listen(Port, SockOpts) of |
174 |
|
{ok, ListenSocket} -> |
175 |
446 |
{ok, ListenSocket}; |
176 |
|
{error, eaddrinuse} when Retries > 0 -> |
177 |
:-( |
timer:sleep(100), |
178 |
:-( |
listen_or_retry(Port, SockOpts, Retries-1); |
179 |
|
{error, Reason} -> |
180 |
:-( |
{error, Reason} |
181 |
|
end. |
182 |
|
|
183 |
|
override_sock_opts([], Opts) -> |
184 |
446 |
Opts; |
185 |
|
override_sock_opts([Override | OverrideOpts], Opts) -> |
186 |
892 |
NewOpts = do_override(Override, Opts), |
187 |
892 |
override_sock_opts(OverrideOpts, NewOpts). |
188 |
|
|
189 |
|
do_override({ip, _} = IP, Opts) -> |
190 |
446 |
lists:keystore(ip, 1, Opts, IP); |
191 |
|
do_override({backlog, _} = Backlog, Opts) -> |
192 |
:-( |
lists:keystore(backlog, 1, Opts, Backlog); |
193 |
|
do_override(inet6, Opts) -> |
194 |
:-( |
[inet6 | lists:delete(inet6, Opts)]; |
195 |
|
do_override(inet, Opts) -> |
196 |
446 |
[inet | lists:delete(inet, Opts)]; |
197 |
|
do_override(_, Opts) -> |
198 |
:-( |
Opts. |