1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : mongoose_transport.erl |
3 |
|
%%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com> |
4 |
|
%%% Purpose : transport module for s2s and components connection |
5 |
|
%%% Created : 18 Jan 2017 |
6 |
|
%%%---------------------------------------------------------------------- |
7 |
|
|
8 |
|
-module(mongoose_transport). |
9 |
|
-author('piotr.nosek@erlang-solutions.com'). |
10 |
|
|
11 |
|
-include("mongoose.hrl"). |
12 |
|
-include("jlib.hrl"). |
13 |
|
-include_lib("public_key/include/public_key.hrl"). |
14 |
|
|
15 |
|
-behaviour(gen_server). |
16 |
|
|
17 |
|
%%---------------------------------------------------------------------- |
18 |
|
%% Types |
19 |
|
%%---------------------------------------------------------------------- |
20 |
|
|
21 |
|
-type send_xml_input() :: {xmlstreamelement, exml:element()} |
22 |
|
| jlib:xmlstreamstart() |
23 |
|
| jlib:xmlstreamend(). |
24 |
|
-type peer() :: {inet:ip_address(), inet:port_number()}. |
25 |
|
-type peername_return() :: {ok, peer()} | {error, inet:posix()}. |
26 |
|
-type peercert_return() :: no_peer_cert | {ok, #'Certificate'{}}. |
27 |
|
|
28 |
|
-type stanza_size() :: pos_integer() | infinity. |
29 |
|
-type connection_type() :: s2s | component | undefined. |
30 |
|
|
31 |
|
-type options() :: #{max_stanza_size := stanza_size(), |
32 |
|
hibernate_after := non_neg_integer(), |
33 |
|
channel => connection_type(), |
34 |
|
atom() => any()}. |
35 |
|
|
36 |
|
-export_type([socket_data/0, send_xml_input/0, peer/0, peername_return/0, peercert_return/0]). |
37 |
|
|
38 |
|
-type socket_module() :: gen_tcp | mongoose_tls. |
39 |
|
-type socket() :: gen_tcp:socket() | mongoose_tls:socket(). |
40 |
|
|
41 |
|
-record(socket_data, {sockmod = gen_tcp :: socket_module(), |
42 |
|
socket :: term(), |
43 |
|
receiver :: pid(), |
44 |
|
connection_type :: connection_type(), |
45 |
|
connection_details :: mongoose_tcp_listener:connection_details() |
46 |
|
}). |
47 |
|
|
48 |
|
-type socket_data() :: #socket_data{}. |
49 |
|
|
50 |
|
-record(state, {socket :: socket(), |
51 |
|
sockmod = gen_tcp :: socket_module(), |
52 |
|
shaper_state :: mongoose_shaper:shaper(), |
53 |
|
dest_pid :: undefined | pid(), %% gen_fsm_compat pid |
54 |
|
max_stanza_size :: stanza_size(), |
55 |
|
parser :: exml_stream:parser(), |
56 |
|
connection_type :: connection_type(), |
57 |
|
hibernate_after = 0 :: non_neg_integer()}). |
58 |
|
-type state() :: #state{}. |
59 |
|
|
60 |
|
%% transport API |
61 |
|
-export([accept/4, connect/5, close/1, send_text/2, send_element/2]). |
62 |
|
-export([wait_for_tls_handshake/2, wait_for_tls_handshake/3, |
63 |
|
connect_tls/2, get_peer_certificate/1]). |
64 |
|
-export([monitor/1, peername/1, change_shaper/2]). |
65 |
|
-export([get_all_trasport_processes/0]). |
66 |
|
|
67 |
|
%% gen_server API |
68 |
|
-export([start_link/3, init/1, terminate/2, |
69 |
|
handle_cast/2, handle_call/3, handle_info/2]). |
70 |
|
|
71 |
|
-ignore_xref([start_link/3, get_all_trasport_processes/0, wait_for_tls_handshake/2]). |
72 |
|
|
73 |
|
%%---------------------------------------------------------------------- |
74 |
|
%% Transport API |
75 |
|
%%---------------------------------------------------------------------- |
76 |
|
|
77 |
|
-spec accept(module(), gen_tcp:socket(), |
78 |
|
mongoose_tcp_listener:options(), |
79 |
|
mongoose_tcp_listener:connection_details()) -> ok. |
80 |
|
accept(Module, Socket, Opts, ConnectionDetails) -> |
81 |
83 |
Receiver = start_child(Socket, none, Opts), |
82 |
83 |
ConnectionType = maps:get(connection_type, Opts), |
83 |
83 |
SocketData = #socket_data{sockmod = gen_tcp, |
84 |
|
socket = Socket, |
85 |
|
receiver = Receiver, |
86 |
|
connection_type = ConnectionType, |
87 |
|
connection_details = ConnectionDetails}, |
88 |
83 |
case gen_tcp:controlling_process(Socket, Receiver) of |
89 |
|
ok -> |
90 |
83 |
case Module:start(SocketData, Opts) of |
91 |
|
{ok, DestPid} -> |
92 |
83 |
set_dest_pid(Receiver, DestPid); |
93 |
|
{error, _Reason} -> |
94 |
:-( |
gen_tcp:close(Socket), |
95 |
:-( |
close(SocketData) |
96 |
|
end; |
97 |
|
{error, _Reason} -> |
98 |
:-( |
gen_tcp:close(Socket) |
99 |
|
end. |
100 |
|
|
101 |
|
-spec connect(ConnectionType :: connection_type(), |
102 |
|
Addr :: atom() | string() | inet:ip_address(), |
103 |
|
Port :: inet:port_number(), |
104 |
|
Opts :: [gen_tcp:connect_option()], |
105 |
|
Timeout :: non_neg_integer() | infinity |
106 |
|
) -> {'error', atom()} | {'ok', socket_data()}. |
107 |
|
connect(ConnectionType, Addr, Port, Opts, Timeout) -> |
108 |
42 |
case gen_tcp:connect(Addr, Port, Opts, Timeout) of |
109 |
|
{ok, Socket} -> |
110 |
|
%% Receiver options are configurable only for listeners |
111 |
|
%% It might make sense to make them configurable for |
112 |
|
%% outgoing s2s connections as well |
113 |
42 |
ReceiverOpts = #{max_stanza_size => infinity, |
114 |
|
hibernate_after => 0, |
115 |
|
connection_type => ConnectionType}, |
116 |
42 |
Receiver = start_child(Socket, none, ReceiverOpts), |
117 |
42 |
{SrcAddr, SrcPort} = case inet:sockname(Socket) of |
118 |
42 |
{ok, {A, P}} -> {A, P}; |
119 |
:-( |
{error, _} -> {unknown, unknown} |
120 |
|
end, |
121 |
42 |
ConnectionDetails = #{dest_address => Addr, dest_port => Port, proxy => false, |
122 |
|
src_address => SrcAddr, src_port => SrcPort}, |
123 |
42 |
SocketData = #socket_data{sockmod = gen_tcp, |
124 |
|
socket = Socket, |
125 |
|
receiver = Receiver, |
126 |
|
connection_type = ConnectionType, |
127 |
|
connection_details = ConnectionDetails}, |
128 |
42 |
DestPid = self(), |
129 |
42 |
case gen_tcp:controlling_process(Socket, Receiver) of |
130 |
|
ok -> |
131 |
42 |
set_dest_pid(Receiver, DestPid), |
132 |
42 |
{ok, SocketData}; |
133 |
|
{error, _Reason} = Error -> |
134 |
:-( |
gen_tcp:close(Socket), |
135 |
:-( |
Error |
136 |
|
end; |
137 |
|
{error, _Reason} = Error -> |
138 |
:-( |
Error |
139 |
|
end. |
140 |
|
|
141 |
|
-spec close(socket_data()) -> ok. |
142 |
|
close(#socket_data{receiver = Receiver}) -> |
143 |
110 |
gen_server:cast(Receiver, close). |
144 |
|
|
145 |
|
-spec wait_for_tls_handshake(socket_data(), mongoose_tls:options()) -> socket_data(). |
146 |
|
wait_for_tls_handshake(#socket_data{receiver = Receiver} = SocketData, TLSOpts) -> |
147 |
:-( |
tcp_to_tls(Receiver, TLSOpts#{connect => false}), |
148 |
:-( |
update_socket(SocketData). |
149 |
|
|
150 |
|
-spec wait_for_tls_handshake(socket_data(), mongoose_tls:options(), exml:element()) -> |
151 |
|
socket_data(). |
152 |
|
wait_for_tls_handshake(#socket_data{receiver = Receiver} = SocketData, TLSOpts, El) -> |
153 |
22 |
tcp_to_tls(Receiver, TLSOpts#{connect => false}), |
154 |
22 |
send_element(SocketData, El), %% send last negotiation chunk via tcp |
155 |
22 |
update_socket(SocketData). |
156 |
|
|
157 |
|
-spec connect_tls(socket_data(), mongoose_tls:options()) -> socket_data(). |
158 |
|
connect_tls(#socket_data{receiver = Receiver} = SocketData, TLSOpts) -> |
159 |
17 |
tcp_to_tls(Receiver, TLSOpts#{connect => true}), |
160 |
17 |
update_socket(SocketData). |
161 |
|
|
162 |
|
-spec send_text(socket_data(), binary()) -> ok. |
163 |
|
send_text(SocketData, Data) -> |
164 |
510 |
#socket_data{sockmod = SockMod, socket = Socket, |
165 |
|
connection_type = ConnectionType} = SocketData, |
166 |
510 |
case catch SockMod:send(Socket, Data) of |
167 |
|
ok -> |
168 |
510 |
update_transport_metrics(byte_size(Data), sent, ConnectionType), |
169 |
510 |
ok; |
170 |
|
{error, timeout} -> |
171 |
:-( |
?LOG_INFO(#{what => socket_error, reason => timeout, |
172 |
:-( |
socket => SockMod}), |
173 |
:-( |
exit(normal); |
174 |
|
Error -> |
175 |
:-( |
?LOG_INFO(#{what => socket_error, reason => Error, |
176 |
:-( |
socket => SockMod}), |
177 |
:-( |
exit(normal) |
178 |
|
end. |
179 |
|
|
180 |
|
|
181 |
|
-spec send_element(socket_data(), exml:element()) -> ok. |
182 |
|
send_element(SocketData, El) -> |
183 |
331 |
BinEl = exml:to_binary(El), |
184 |
331 |
mongoose_metrics:update(global, [data, xmpp, sent, xml_stanza_size], byte_size(BinEl)), |
185 |
331 |
send_text(SocketData, BinEl). |
186 |
|
|
187 |
|
-spec get_peer_certificate(socket_data()) -> mongoose_tls:cert(). |
188 |
|
get_peer_certificate(#socket_data{sockmod = mongoose_tls, socket = Socket}) -> |
189 |
25 |
mongoose_tls:get_peer_certificate(Socket); |
190 |
|
get_peer_certificate(_SocketData) -> |
191 |
:-( |
no_peer_cert. |
192 |
|
|
193 |
|
-spec peername(socket_data()) -> mongoose_transport:peername_return(). |
194 |
|
peername(#socket_data{connection_details = #{src_address := SrcAddr, |
195 |
|
src_port := SrcPort}}) -> |
196 |
3 |
{ok, {SrcAddr, SrcPort}}. |
197 |
|
|
198 |
|
-spec monitor(socket_data()) -> reference(). |
199 |
|
monitor(#socket_data{receiver = Receiver}) -> |
200 |
31 |
erlang:monitor(process, Receiver). |
201 |
|
|
202 |
|
-spec change_shaper(socket_data(), _) -> any(). |
203 |
|
change_shaper(#socket_data{receiver = Receiver}, Shaper) -> |
204 |
57 |
gen_server:cast(Receiver, {change_shaper, Shaper}). |
205 |
|
|
206 |
|
get_all_trasport_processes() -> |
207 |
:-( |
Connections = supervisor:which_children(mongoose_transport_sup), |
208 |
:-( |
get_transport_info(Connections). |
209 |
|
%%---------------------------------------------------------------------- |
210 |
|
%% gen_server interfaces |
211 |
|
%%---------------------------------------------------------------------- |
212 |
|
-spec start_link(port(), atom(), options()) -> |
213 |
|
ignore | {error, _} | {ok, pid()}. |
214 |
|
start_link(Socket, Shaper, Opts) -> |
215 |
125 |
gen_server:start_link(?MODULE, [Socket, Shaper, Opts], []). |
216 |
|
|
217 |
|
init([Socket, Shaper, Opts]) -> |
218 |
125 |
ShaperState = mongoose_shaper:new(Shaper), |
219 |
125 |
#{max_stanza_size := MaxStanzaSize, |
220 |
|
hibernate_after := HibernateAfter, |
221 |
|
connection_type := ConnectionType} = Opts, |
222 |
125 |
Parser = new_parser(MaxStanzaSize), |
223 |
125 |
{ok, #state{socket = Socket, |
224 |
|
shaper_state = ShaperState, |
225 |
|
max_stanza_size = MaxStanzaSize, |
226 |
|
connection_type = ConnectionType, |
227 |
|
parser = Parser, |
228 |
|
hibernate_after = HibernateAfter}}. |
229 |
|
|
230 |
|
handle_call(get_socket, _From, #state{socket = Socket} = State) -> |
231 |
39 |
{reply, {ok, Socket}, State, hibernate_or_timeout(State)}; |
232 |
|
handle_call({tcp_to_tls, TLSOpts}, From, #state{socket = TCPSocket} = State0) -> |
233 |
|
%% the next message from client is part of TLS handshake, it must |
234 |
|
%% be handled by TLS library (another process in case of just_tls) |
235 |
|
%% so deactivating the socket. |
236 |
39 |
deactivate_socket(State0), |
237 |
|
%% TLS handshake always starts from client's request, let |
238 |
|
%% server finish starttls negotiation and notify client |
239 |
|
%% that it can start TLS handshake. |
240 |
39 |
gen_server:reply(From, ok), |
241 |
39 |
case mongoose_tls:tcp_to_tls(TCPSocket, TLSOpts) of |
242 |
|
{ok, TLSSocket} -> |
243 |
39 |
State1 = reset_parser(State0), |
244 |
39 |
State2 = State1#state{socket = TLSSocket, sockmod = mongoose_tls}, |
245 |
|
%% fast_tls requires dummy recv_data/2 call to accomplish TLS |
246 |
|
%% handshake. such call is simply ignored by just_tls backend. |
247 |
39 |
case mongoose_tls:recv_data(TLSSocket, <<>>) of |
248 |
|
{ok, TLSData} -> |
249 |
39 |
State3 = process_data(TLSData, State2), |
250 |
39 |
{noreply, State3, hibernate_or_timeout(State3)}; |
251 |
|
{error, Reason} -> |
252 |
:-( |
?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason, |
253 |
:-( |
dest_pid => State2#state.dest_pid}), |
254 |
:-( |
{stop, normal, State2} |
255 |
|
end; |
256 |
|
{error, Reason} -> |
257 |
:-( |
?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason, |
258 |
:-( |
dest_pid => State0#state.dest_pid}), |
259 |
:-( |
{stop, normal, State0} |
260 |
|
end; |
261 |
|
handle_call({set_dest_pid, DestPid}, _From, #state{dest_pid = undefined} = State) -> |
262 |
125 |
StateAfterReset = reset_parser(State), |
263 |
125 |
NewState = StateAfterReset#state{dest_pid = DestPid}, |
264 |
125 |
activate_socket(NewState), |
265 |
125 |
{reply, ok, NewState, hibernate_or_timeout(NewState)}; |
266 |
|
handle_call(_Request, _From, State) -> |
267 |
:-( |
{reply, ok, State, hibernate_or_timeout(State)}. |
268 |
|
|
269 |
|
handle_cast({change_shaper, Shaper}, State) -> |
270 |
57 |
NewShaperState = mongoose_shaper:new(Shaper), |
271 |
57 |
NewState = State#state{shaper_state = NewShaperState}, |
272 |
57 |
{noreply, NewState, hibernate_or_timeout(NewState)}; |
273 |
|
handle_cast(close, State) -> |
274 |
51 |
{stop, normal, State}; |
275 |
|
handle_cast(_Msg, State) -> |
276 |
:-( |
{noreply, State, hibernate_or_timeout(State)}. |
277 |
|
|
278 |
|
handle_info({tcp, _TCPSocket, Data}, #state{sockmod = gen_tcp} = State) -> |
279 |
357 |
NewState = process_data(Data, State), |
280 |
357 |
{noreply, NewState, hibernate_or_timeout(NewState)}; |
281 |
|
handle_info({Tag, _TCPSocket, Data}, |
282 |
|
#state{socket = Socket, |
283 |
|
sockmod = mongoose_tls} = State) when Tag == tcp; Tag == ssl -> |
284 |
220 |
case mongoose_tls:recv_data(Socket, Data) of |
285 |
|
{ok, TLSData} -> |
286 |
220 |
NewState = process_data(TLSData, State), |
287 |
220 |
{noreply, NewState, hibernate_or_timeout(NewState)}; |
288 |
|
{error, Reason} -> |
289 |
:-( |
?LOG_WARNING(#{what => transport_tls_recv_error, socket => Socket, reason => Reason}), |
290 |
:-( |
{stop, normal, State} |
291 |
|
end; |
292 |
|
handle_info({Tag, _Socket}, State) when Tag == tcp_closed; Tag == ssl_closed -> |
293 |
74 |
{stop, normal, State}; |
294 |
|
handle_info({Tag, _Socket, Reason}, State) when Tag == tcp_error; Tag == ssl_error -> |
295 |
:-( |
case Reason of |
296 |
|
timeout -> |
297 |
:-( |
{noreply, State, hibernate_or_timeout(State)}; |
298 |
|
_ -> |
299 |
:-( |
{stop, normal, State} |
300 |
|
end; |
301 |
|
handle_info({timeout, _Ref, activate}, State) -> |
302 |
:-( |
activate_socket(State), |
303 |
:-( |
{noreply, State, hibernate_or_timeout(State)}; |
304 |
|
handle_info(timeout, State) -> |
305 |
:-( |
{noreply, State, hibernate()}; |
306 |
|
handle_info(_Info, State) -> |
307 |
:-( |
{noreply, State, hibernate_or_timeout(State)}. |
308 |
|
|
309 |
|
terminate(_Reason, #state{parser = Parser, dest_pid = DestPid, |
310 |
|
socket = Socket, sockmod = SockMod}) -> |
311 |
125 |
free_parser(Parser), |
312 |
125 |
case DestPid of |
313 |
:-( |
undefined -> ok; |
314 |
125 |
_ -> gen_fsm_compat:send_event(DestPid, closed) |
315 |
|
end, |
316 |
125 |
catch shutdown_socket_and_wait_for_peer_to_close(Socket, SockMod), |
317 |
125 |
ok. |
318 |
|
|
319 |
|
%%---------------------------------------------------------------------- |
320 |
|
%% local API helpers |
321 |
|
%%---------------------------------------------------------------------- |
322 |
|
-spec start_child(port(), atom(), options()) -> pid(). |
323 |
|
start_child(Socket, Shaper, Opts) -> |
324 |
125 |
{ok, Receiver} = supervisor:start_child(mongoose_transport_sup, |
325 |
|
[Socket, Shaper, Opts]), |
326 |
125 |
Receiver. |
327 |
|
|
328 |
|
get_transport_info(ConnectionList) when is_list(ConnectionList) -> |
329 |
:-( |
[get_transport_info(Pid) || {_, Pid, _, _} <- ConnectionList, is_pid(Pid)]; |
330 |
|
get_transport_info(TransportPid) when is_pid(TransportPid) -> |
331 |
:-( |
State = sys:get_state(TransportPid), |
332 |
:-( |
maps:from_list(lists:zip(record_info(fields, state),tl(tuple_to_list(State)))). |
333 |
|
|
334 |
|
-spec tcp_to_tls(pid(), mongoose_tls:options()) -> ok | {error, any()}. |
335 |
|
tcp_to_tls(Receiver, TLSOpts) -> |
336 |
39 |
gen_server_call_or_noproc(Receiver, {tcp_to_tls, TLSOpts}). |
337 |
|
|
338 |
|
-spec update_socket(socket_data()) -> socket_data(). |
339 |
|
update_socket(#socket_data{receiver = Receiver} = SocketData) -> |
340 |
39 |
case gen_server_call_or_noproc(Receiver, get_socket) of |
341 |
|
{ok, TLSSocket} -> |
342 |
39 |
SocketData#socket_data{socket = TLSSocket, sockmod = mongoose_tls}; |
343 |
|
{error, _} -> |
344 |
:-( |
exit(invalid_socket_after_upgrade_to_tls) |
345 |
|
end. |
346 |
|
|
347 |
|
-spec set_dest_pid(pid(), pid()) -> ok | {error, any()}. |
348 |
|
set_dest_pid(Receiver, DestPid) -> |
349 |
125 |
gen_server:call(Receiver, {set_dest_pid, DestPid}). |
350 |
|
|
351 |
|
-spec gen_server_call_or_noproc(pid(), any()) -> Ret :: any() | {error, any()}. |
352 |
|
gen_server_call_or_noproc(Pid, Message) -> |
353 |
78 |
try |
354 |
78 |
gen_server:call(Pid, Message) |
355 |
|
catch |
356 |
|
exit:{noproc, Extra} -> |
357 |
:-( |
{error, {noproc, Extra}}; |
358 |
|
exit:{normal, Extra} -> |
359 |
|
% reciver exited with normal status after the gen_server call was sent |
360 |
|
% but before it was processed |
361 |
:-( |
{error, {died, Extra}} |
362 |
|
end. |
363 |
|
|
364 |
|
%%-------------------------------------------------------------------- |
365 |
|
%% internal functions |
366 |
|
%%-------------------------------------------------------------------- |
367 |
|
|
368 |
|
-spec activate_socket(state()) -> 'ok' | {'tcp_closed', _}. |
369 |
|
activate_socket(#state{socket = Socket, sockmod = gen_tcp}) -> |
370 |
482 |
inet:setopts(Socket, [{active, once}]), |
371 |
482 |
PeerName = inet:peername(Socket), |
372 |
482 |
resolve_peername(PeerName, Socket); |
373 |
|
activate_socket(#state{socket = Socket, sockmod = mongoose_tls}) -> |
374 |
259 |
mongoose_tls:setopts(Socket, [{active, once}]), |
375 |
259 |
PeerName = mongoose_tls:peername(Socket), |
376 |
259 |
resolve_peername(PeerName, Socket). |
377 |
|
|
378 |
|
resolve_peername({ok, _}, _Socket) -> |
379 |
738 |
ok; |
380 |
|
resolve_peername({error, _Reason}, Socket) -> |
381 |
3 |
self() ! {tcp_closed, Socket}. |
382 |
|
|
383 |
|
-spec deactivate_socket(state()) -> 'ok' | {'error', _}. |
384 |
|
deactivate_socket(#state{socket = Socket, sockmod = gen_tcp}) -> |
385 |
39 |
inet:setopts(Socket, [{active, false}]); |
386 |
|
deactivate_socket(#state{socket = Socket, sockmod = mongoose_tls}) -> |
387 |
:-( |
mongoose_tls:setopts(Socket, [{active, false}]). |
388 |
|
|
389 |
|
-spec process_data(binary(), state()) -> state(). |
390 |
|
process_data(Data, #state{parser = Parser, |
391 |
|
shaper_state = ShaperState, |
392 |
|
dest_pid = DestPid} = State) -> |
393 |
616 |
?LOG_DEBUG(#{what => received_xml_on_stream, packet => Data, dest_pid => DestPid}), |
394 |
616 |
Size = byte_size(Data), |
395 |
616 |
{Events, NewParser} = |
396 |
|
case exml_stream:parse(Parser, Data) of |
397 |
|
{ok, NParser, Elems} -> |
398 |
615 |
{[wrap_xml_elements_and_update_metrics(E) || E <- Elems], NParser}; |
399 |
|
{error, Reason} -> |
400 |
1 |
{[{xmlstreamerror, Reason}], Parser} |
401 |
|
end, |
402 |
616 |
{NewShaperState, Pause} = mongoose_shaper:update(ShaperState, Size), |
403 |
616 |
update_transport_metrics(Size, received, State#state.connection_type), |
404 |
616 |
[gen_fsm_compat:send_event(DestPid, Event) || Event <- Events], |
405 |
616 |
maybe_pause(Pause, State), |
406 |
616 |
State#state{parser = NewParser, shaper_state = NewShaperState}. |
407 |
|
|
408 |
|
wrap_xml_elements_and_update_metrics(#xmlel{} = E) -> |
409 |
311 |
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)), |
410 |
311 |
{xmlstreamelement, E}; |
411 |
|
wrap_xml_elements_and_update_metrics(E) -> |
412 |
170 |
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)), |
413 |
170 |
E. |
414 |
|
|
415 |
|
-spec update_transport_metrics(non_neg_integer(), |
416 |
|
sent | received, |
417 |
|
connection_type()) -> ok. |
418 |
|
update_transport_metrics(_Size, _Action, undefined) -> |
419 |
:-( |
ok; |
420 |
|
update_transport_metrics(Size, Action, ConnectionType) -> |
421 |
1126 |
mongoose_metrics:update(global, [data, xmpp, Action, ConnectionType], Size), |
422 |
1126 |
ok. |
423 |
|
|
424 |
|
-spec maybe_pause(Delay :: non_neg_integer(), state()) -> any(). |
425 |
|
maybe_pause(_, #state{dest_pid = undefined}) -> |
426 |
:-( |
ok; |
427 |
|
maybe_pause(Pause, _State) when Pause > 0 -> |
428 |
:-( |
erlang:start_timer(Pause, self(), activate); |
429 |
|
maybe_pause(_, State) -> |
430 |
616 |
activate_socket(State). |
431 |
|
|
432 |
|
-spec new_parser(stanza_size()) -> exml_stream:parser(). |
433 |
|
new_parser(MaxStanzaSize) -> |
434 |
125 |
MaxSize = case MaxStanzaSize of |
435 |
42 |
infinity -> 0; |
436 |
83 |
_ -> MaxStanzaSize |
437 |
|
end, |
438 |
125 |
{ok, NewParser} = exml_stream:new_parser([{max_element_size, MaxSize}]), |
439 |
125 |
NewParser. |
440 |
|
|
441 |
|
-spec reset_parser(state()) -> state(). |
442 |
|
reset_parser(#state{parser = Parser} = State) -> |
443 |
164 |
{ok, NewParser} = exml_stream:reset_parser(Parser), |
444 |
164 |
State#state{parser = NewParser}. |
445 |
|
|
446 |
|
-spec free_parser(exml_stream:parser()) -> ok. |
447 |
|
free_parser(Parser) -> |
448 |
125 |
exml_stream:free_parser(Parser). |
449 |
|
|
450 |
|
-spec hibernate() -> hibernate | infinity. |
451 |
|
hibernate() -> |
452 |
837 |
case process_info(self(), message_queue_len) of |
453 |
637 |
{_, 0} -> hibernate; |
454 |
200 |
_ -> infinity |
455 |
|
end. |
456 |
|
|
457 |
|
-spec hibernate_or_timeout(state()) -> hibernate | infinity | pos_integer(). |
458 |
837 |
hibernate_or_timeout(#state{hibernate_after = 0}) -> hibernate(); |
459 |
:-( |
hibernate_or_timeout(#state{hibernate_after = HA}) -> HA. |
460 |
|
|
461 |
|
-spec shutdown_socket_and_wait_for_peer_to_close(socket(), socket_module() ) -> ok. |
462 |
|
%% gen_tcp:close/2, but trying to ensure that all data is received by peer. |
463 |
|
%% |
464 |
|
%% This is based on tls_connection:workaround_transport_delivery_problems/2 code |
465 |
|
%% https://github.com/erlang/otp/blob/OTP_17.0-rc2/lib/ssl/src/tls_connection.erl#L959 |
466 |
|
%% |
467 |
|
%% There are some more docs why we need it in http://erlang.org/doc/man/gen_tcp.html#close-1 |
468 |
|
shutdown_socket_and_wait_for_peer_to_close(Socket, gen_tcp) -> |
469 |
|
%% Standard trick to try to make sure all |
470 |
|
%% data sent to the tcp port is really delivered to the |
471 |
|
%% peer application before tcp port is closed so that the peer will |
472 |
|
%% get the correct stream end and not only a transport close. |
473 |
86 |
inet:setopts(Socket, [{active, false}]), |
474 |
86 |
gen_tcp:shutdown(Socket, write), |
475 |
|
%% Will return when other side has closed or after 30 s |
476 |
|
%% e.g. we do not want to hang if something goes wrong |
477 |
|
%% with the network but we want to maximise the odds that |
478 |
|
%% peer application gets all data sent on the tcp connection. |
479 |
86 |
gen_tcp:recv(Socket, 0, 30000); |
480 |
|
shutdown_socket_and_wait_for_peer_to_close(Socket, mongoose_tls) -> |
481 |
39 |
mongoose_tls:close(Socket). |