./ct_report/coverage/mongoose_transport.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mongoose_transport.erl
3 %%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com>
4 %%% Purpose : transport module for s2s and components connection
5 %%% Created : 18 Jan 2017
6 %%%----------------------------------------------------------------------
7
8 -module(mongoose_transport).
9 -author('piotr.nosek@erlang-solutions.com').
10
11 -include("mongoose.hrl").
12 -include("jlib.hrl").
13 -include_lib("public_key/include/public_key.hrl").
14
15 -behaviour(gen_server).
16
17 %%----------------------------------------------------------------------
18 %% Types
19 %%----------------------------------------------------------------------
20
21 -type send_xml_input() :: {xmlstreamelement, exml:element()}
22 | jlib:xmlstreamstart()
23 | jlib:xmlstreamend().
24 -type peer() :: {inet:ip_address(), inet:port_number()}.
25 -type peername_return() :: {ok, peer()} | {error, inet:posix()}.
26 -type peercert_return() :: no_peer_cert | {ok, #'Certificate'{}}.
27
28 -type stanza_size() :: pos_integer() | infinity.
29 -type connection_type() :: s2s | component | undefined.
30
%% Options map a receiver process is started with (see start_link/3, init/1).
%% NOTE(review): init/1 and accept/4 read the `connection_type' key, while this
%% type names an optional `channel' key; `connection_type' is only covered by
%% the generic `atom() => any()' association - confirm which key is intended.
31 -type options() :: #{max_stanza_size := stanza_size(),
32 hibernate_after := non_neg_integer(),
33 channel => connection_type(),
34 atom() => any()}.
35
36 -export_type([socket_data/0, send_xml_input/0, peer/0, peername_return/0, peercert_return/0]).
37
38 -type socket_module() :: gen_tcp | mongoose_tls.
39 -type socket() :: gen_tcp:socket() | mongoose_tls:socket().
40
%% Handle passed around by users of the transport API (built in accept/4 and
%% connect/5). It carries the socket module and socket used for sending, the
%% pid of the receiver gen_server that owns the socket, the connection type
%% used for metrics, and the connection details map read by peername/1.
41 -record(socket_data, {sockmod = gen_tcp :: socket_module(),
42 socket :: term(),
43 receiver :: pid(),
44 connection_type :: connection_type(),
45 connection_details :: mongoose_tcp_listener:connection_details()
46 }).
47
48 -type socket_data() :: #socket_data{}.
49
%% State of the receiver gen_server.
%% dest_pid stays `undefined' until set_dest_pid is handled; parsed stream
%% events are sent to it with gen_fsm_compat:send_event/2.
%% hibernate_after = 0 makes the callbacks choose between `hibernate' and
%% `infinity' based on the message queue length (see hibernate_or_timeout/1);
%% any other value is returned as the gen_server timeout.
50 -record(state, {socket :: socket(),
51 sockmod = gen_tcp :: socket_module(),
52 shaper_state :: mongoose_shaper:shaper(),
53 dest_pid :: undefined | pid(), %% gen_fsm_compat pid
54 max_stanza_size :: stanza_size(),
55 parser :: exml_stream:parser(),
56 connection_type :: connection_type(),
57 hibernate_after = 0 :: non_neg_integer()}).
58 -type state() :: #state{}.
59
60 %% transport API
61 -export([accept/4, connect/5, close/1, send_text/2, send_element/2]).
62 -export([wait_for_tls_handshake/2, wait_for_tls_handshake/3,
63 connect_tls/2, get_peer_certificate/1]).
64 -export([monitor/1, peername/1, change_shaper/2]).
65 -export([get_all_trasport_processes/0]).
66
67 %% gen_server API
68 -export([start_link/3, init/1, terminate/2,
69 handle_cast/2, handle_call/3, handle_info/2]).
70
71 -ignore_xref([start_link/3, get_all_trasport_processes/0, wait_for_tls_handshake/2]).
72
73 %%----------------------------------------------------------------------
74 %% Transport API
75 %%----------------------------------------------------------------------
76
%% @doc Hands an accepted TCP socket over to a freshly started receiver
%% process and calls `Module:start/2' with the resulting socket data.
%%
%% Socket ownership is moved to the receiver with
%% gen_tcp:controlling_process/2. When `Module:start/2' returns `{ok, Pid}',
%% that pid is registered as the receiver's destination. On every failure
%% path both the socket and the receiver process are released.
-spec accept(module(), gen_tcp:socket(),
             mongoose_tcp_listener:options(),
             mongoose_tcp_listener:connection_details()) -> ok.
accept(Module, Socket, Opts, ConnectionDetails) ->
    Receiver = start_child(Socket, none, Opts),
    ConnectionType = maps:get(connection_type, Opts),
    SocketData = #socket_data{sockmod = gen_tcp,
                              socket = Socket,
                              receiver = Receiver,
                              connection_type = ConnectionType,
                              connection_details = ConnectionDetails},
    case gen_tcp:controlling_process(Socket, Receiver) of
        ok ->
            case Module:start(SocketData, Opts) of
                {ok, DestPid} ->
                    set_dest_pid(Receiver, DestPid);
                {error, _Reason} ->
                    gen_tcp:close(Socket),
                    close(SocketData)
            end;
        {error, _Reason} ->
            gen_tcp:close(Socket),
            %% The receiver has already been started; without stopping it
            %% here it would stay alive with no destination pid and a
            %% closed socket.
            close(SocketData)
    end.
100
%% @doc Opens an outgoing TCP connection and wraps it in a `socket_data()'.
%%
%% A receiver process is started for the new socket, socket ownership is
%% transferred to it, and the calling process is registered as the receiver's
%% destination pid. Returns `{ok, SocketData}' on success, or the error from
%% gen_tcp:connect/4 or gen_tcp:controlling_process/2.
-spec connect(ConnectionType :: connection_type(),
              Addr :: atom() | string() | inet:ip_address(),
              Port :: inet:port_number(),
              Opts :: [gen_tcp:connect_option()],
              Timeout :: non_neg_integer() | infinity
             ) -> {'error', atom()} | {'ok', socket_data()}.
connect(ConnectionType, Addr, Port, Opts, Timeout) ->
    case gen_tcp:connect(Addr, Port, Opts, Timeout) of
        {ok, Socket} ->
            %% Receiver options are configurable only for listeners
            %% It might make sense to make them configurable for
            %% outgoing s2s connections as well
            ReceiverOpts = #{max_stanza_size => infinity,
                             hibernate_after => 0,
                             connection_type => ConnectionType},
            Receiver = start_child(Socket, none, ReceiverOpts),
            %% The local address is best effort: it is only recorded in the
            %% connection details.
            {SrcAddr, SrcPort} = case inet:sockname(Socket) of
                                     {ok, {A, P}} -> {A, P};
                                     {error, _} -> {unknown, unknown}
                                 end,
            ConnectionDetails = #{dest_address => Addr, dest_port => Port, proxy => false,
                                  src_address => SrcAddr, src_port => SrcPort},
            SocketData = #socket_data{sockmod = gen_tcp,
                                      socket = Socket,
                                      receiver = Receiver,
                                      connection_type = ConnectionType,
                                      connection_details = ConnectionDetails},
            DestPid = self(),
            case gen_tcp:controlling_process(Socket, Receiver) of
                ok ->
                    set_dest_pid(Receiver, DestPid),
                    {ok, SocketData};
                {error, _Reason} = Error ->
                    gen_tcp:close(Socket),
                    %% Stop the receiver as well, otherwise it would linger
                    %% with no destination pid and a closed socket.
                    close(SocketData),
                    Error
            end;
        {error, _Reason} = Error ->
            Error
    end.
140
%% @doc Asynchronously asks the receiver process to stop; the receiver
%% handles the `close' cast by stopping with reason `normal'.
-spec close(socket_data()) -> ok.
close(#socket_data{receiver = ReceiverPid}) ->
    gen_server:cast(ReceiverPid, close).
144
%% @doc Asks the receiver to upgrade the connection to TLS with
%% `connect => false' added to the options, then returns the socket data
%% refreshed with the TLS socket.
-spec wait_for_tls_handshake(socket_data(), mongoose_tls:options()) -> socket_data().
wait_for_tls_handshake(#socket_data{receiver = ReceiverPid} = SocketData, TLSOpts) ->
    AcceptOpts = TLSOpts#{connect => false},
    tcp_to_tls(ReceiverPid, AcceptOpts),
    update_socket(SocketData).
149
%% @doc Same as wait_for_tls_handshake/2, but additionally sends `El' over
%% the still-plain socket right after the upgrade request was acknowledged
%% and before the TLS socket is fetched from the receiver.
-spec wait_for_tls_handshake(socket_data(), mongoose_tls:options(), exml:element()) ->
    socket_data().
wait_for_tls_handshake(#socket_data{receiver = ReceiverPid} = SocketData, TLSOpts, El) ->
    AcceptOpts = TLSOpts#{connect => false},
    tcp_to_tls(ReceiverPid, AcceptOpts),
    %% send last negotiation chunk via tcp
    send_element(SocketData, El),
    update_socket(SocketData).
156
%% @doc Asks the receiver to upgrade the connection to TLS with
%% `connect => true' added to the options, then returns the socket data
%% refreshed with the TLS socket.
-spec connect_tls(socket_data(), mongoose_tls:options()) -> socket_data().
connect_tls(#socket_data{receiver = ReceiverPid} = SocketData, TLSOpts) ->
    ConnectOpts = TLSOpts#{connect => true},
    tcp_to_tls(ReceiverPid, ConnectOpts),
    update_socket(SocketData).
161
%% @doc Writes raw data to the socket through its socket module.
%%
%% On success the `sent' transport metric is updated with the payload size.
%% Any other outcome of the send (an error tuple, or an exception turned into
%% a term by `catch') is logged at info level and the calling process exits
%% with reason `normal'.
-spec send_text(socket_data(), binary()) -> ok.
send_text(SocketData, Data) ->
    #socket_data{sockmod = Mod, socket = Sock,
                 connection_type = ConnType} = SocketData,
    SendResult = (catch Mod:send(Sock, Data)),
    case SendResult of
        ok ->
            update_transport_metrics(byte_size(Data), sent, ConnType),
            ok;
        {error, timeout} ->
            ?LOG_INFO(#{what => socket_error, reason => timeout,
                        socket => Mod}),
            exit(normal);
        Other ->
            ?LOG_INFO(#{what => socket_error, reason => Other,
                        socket => Mod}),
            exit(normal)
    end.
179
180
%% @doc Serialises an XML element, records its size in the
%% `xml_stanza_size' sent metric and writes it to the socket.
-spec send_element(socket_data(), exml:element()) -> ok.
send_element(SocketData, El) ->
    Serialized = exml:to_binary(El),
    StanzaSize = byte_size(Serialized),
    mongoose_metrics:update(global, [data, xmpp, sent, xml_stanza_size], StanzaSize),
    send_text(SocketData, Serialized).
186
%% @doc Returns the peer certificate for a TLS socket as reported by
%% mongoose_tls; for any other socket data returns `no_peer_cert'.
-spec get_peer_certificate(socket_data()) -> mongoose_tls:cert().
get_peer_certificate(#socket_data{sockmod = mongoose_tls, socket = TLSSocket}) ->
    mongoose_tls:get_peer_certificate(TLSSocket);
get_peer_certificate(_PlainSocketData) ->
    no_peer_cert.
192
%% @doc Returns the address and port stored under `src_address' and
%% `src_port' in the connection details. No socket call is made.
-spec peername(socket_data()) -> peername_return().
peername(#socket_data{connection_details = Details}) ->
    #{src_address := Addr, src_port := Port} = Details,
    {ok, {Addr, Port}}.
197
%% @doc Sets up a process monitor on the receiver and returns its reference.
-spec monitor(socket_data()) -> reference().
monitor(#socket_data{receiver = ReceiverPid}) ->
    erlang:monitor(process, ReceiverPid).
201
%% @doc Asynchronously tells the receiver to replace its shaper state with
%% a new one created from `Shaper'.
-spec change_shaper(socket_data(), _) -> any().
change_shaper(#socket_data{receiver = ReceiverPid}, Shaper) ->
    gen_server:cast(ReceiverPid, {change_shaper, Shaper}).
205
%% @doc Returns one map per child of `mongoose_transport_sup', each holding
%% that receiver's current `#state{}' fields (see get_transport_info/1).
get_all_trasport_processes() ->
    Children = supervisor:which_children(mongoose_transport_sup),
    get_transport_info(Children).
209 %%----------------------------------------------------------------------
210 %% gen_server interfaces
211 %%----------------------------------------------------------------------
%% @doc Starts a linked receiver gen_server for `Socket'; the three
%% arguments are handed to init/1 as a list.
-spec start_link(port(), atom(), options()) ->
    ignore | {error, _} | {ok, pid()}.
start_link(Socket, Shaper, Opts) ->
    InitArgs = [Socket, Shaper, Opts],
    gen_server:start_link(?MODULE, InitArgs, []).
216
%% gen_server callback. Builds the initial receiver state: a fresh shaper,
%% a new XML stream parser limited to the configured stanza size, and the
%% hibernation setting. The options map must contain `max_stanza_size',
%% `hibernate_after' and `connection_type'; dest_pid is left unset.
init([Socket, Shaper, Opts]) ->
    InitialShaper = mongoose_shaper:new(Shaper),
    #{max_stanza_size := MaxSize,
      hibernate_after := HibernateAfter,
      connection_type := ConnType} = Opts,
    InitialState = #state{socket = Socket,
                          shaper_state = InitialShaper,
                          max_stanza_size = MaxSize,
                          connection_type = ConnType,
                          parser = new_parser(MaxSize),
                          hibernate_after = HibernateAfter},
    {ok, InitialState}.
229
%% gen_server callback for synchronous requests:
%%   get_socket              - replies {ok, Socket} with the current socket.
%%   {tcp_to_tls, TLSOpts}   - replies ok early, then upgrades the socket to
%%                             TLS; stops with reason normal if the upgrade
%%                             or the first recv_data call fails.
%%   {set_dest_pid, DestPid} - accepted only while dest_pid is undefined;
%%                             resets the parser, stores the pid and
%%                             activates the socket.
%%   anything else           - replies ok and leaves the state untouched.
230 handle_call(get_socket, _From, #state{socket = Socket} = State) ->
231 39 {reply, {ok, Socket}, State, hibernate_or_timeout(State)};
232 handle_call({tcp_to_tls, TLSOpts}, From, #state{socket = TCPSocket} = State0) ->
233 %% the next message from client is part of TLS handshake, it must
234 %% be handled by TLS library (another process in case of just_tls)
235 %% so deactivating the socket.
236 39 deactivate_socket(State0),
237 %% TLS handshake always starts from client's request, let
238 %% server finish starttls negotiation and notify client
239 %% that it can start TLS handshake.
%% The caller is answered here, before the upgrade runs, which is why the
%% branches below return noreply/stop instead of reply.
240 39 gen_server:reply(From, ok),
241 39 case mongoose_tls:tcp_to_tls(TCPSocket, TLSOpts) of
242 {ok, TLSSocket} ->
%% The parser is reset before the TLS socket is stored in the state.
243 39 State1 = reset_parser(State0),
244 39 State2 = State1#state{socket = TLSSocket, sockmod = mongoose_tls},
245 %% fast_tls requires dummy recv_data/2 call to accomplish TLS
246 %% handshake. such call is simply ignored by just_tls backend.
247 39 case mongoose_tls:recv_data(TLSSocket, <<>>) of
248 {ok, TLSData} ->
249 39 State3 = process_data(TLSData, State2),
250 39 {noreply, State3, hibernate_or_timeout(State3)};
251 {error, Reason} ->
252
:-(
?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason,
253
:-(
dest_pid => State2#state.dest_pid}),
254
:-(
{stop, normal, State2}
255 end;
256 {error, Reason} ->
257
:-(
?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason,
258
:-(
dest_pid => State0#state.dest_pid}),
259
:-(
{stop, normal, State0}
260 end;
261 handle_call({set_dest_pid, DestPid}, _From, #state{dest_pid = undefined} = State) ->
262 125 StateAfterReset = reset_parser(State),
263 125 NewState = StateAfterReset#state{dest_pid = DestPid},
264 125 activate_socket(NewState),
265 125 {reply, ok, NewState, hibernate_or_timeout(NewState)};
%% Catch-all: this also covers set_dest_pid when a destination is already set.
266 handle_call(_Request, _From, State) ->
267
:-(
{reply, ok, State, hibernate_or_timeout(State)}.
268
%% gen_server callback for asynchronous requests:
%%   {change_shaper, Shaper} - swaps in a shaper state built from Shaper.
%%   close                   - stops the receiver with reason normal.
%%   anything else           - ignored.
handle_cast({change_shaper, Shaper}, State) ->
    Reshaped = State#state{shaper_state = mongoose_shaper:new(Shaper)},
    {noreply, Reshaped, hibernate_or_timeout(Reshaped)};
handle_cast(close, State) ->
    {stop, normal, State};
handle_cast(_Unknown, State) ->
    {noreply, State, hibernate_or_timeout(State)}.
277
%% gen_server callback for socket messages, timers and timeouts.
%% Incoming data is parsed and forwarded (after TLS decoding when the socket
%% module is mongoose_tls); a closed socket or a non-timeout socket error
%% stops the receiver with reason normal.
handle_info({tcp, _TCPSocket, Data}, #state{sockmod = gen_tcp} = State) ->
    Processed = process_data(Data, State),
    {noreply, Processed, hibernate_or_timeout(Processed)};
handle_info({Tag, _TCPSocket, Data},
            #state{socket = Socket,
                   sockmod = mongoose_tls} = State) when Tag == tcp; Tag == ssl ->
    case mongoose_tls:recv_data(Socket, Data) of
        {ok, TLSData} ->
            Processed = process_data(TLSData, State),
            {noreply, Processed, hibernate_or_timeout(Processed)};
        {error, Reason} ->
            ?LOG_WARNING(#{what => transport_tls_recv_error, socket => Socket, reason => Reason}),
            {stop, normal, State}
    end;
handle_info({Tag, _Socket}, State) when Tag == tcp_closed; Tag == ssl_closed ->
    {stop, normal, State};
%% A socket error with reason `timeout' keeps the receiver running.
handle_info({Tag, _Socket, timeout}, State) when Tag == tcp_error; Tag == ssl_error ->
    {noreply, State, hibernate_or_timeout(State)};
handle_info({Tag, _Socket, _Reason}, State) when Tag == tcp_error; Tag == ssl_error ->
    {stop, normal, State};
%% Timer started by maybe_pause/2: re-activate the socket once it fires.
handle_info({timeout, _Ref, activate}, State) ->
    activate_socket(State),
    {noreply, State, hibernate_or_timeout(State)};
%% gen_server timeout (hibernate_after elapsed without messages).
handle_info(timeout, State) ->
    {noreply, State, hibernate()};
handle_info(_Unknown, State) ->
    {noreply, State, hibernate_or_timeout(State)}.
308
%% gen_server callback. Frees the XML parser, notifies the destination
%% process (when one is set) with a `closed' event, and then shuts the
%% socket down; any failure during the socket shutdown is ignored.
terminate(_Reason, #state{parser = Parser, dest_pid = DestPid,
                          socket = Socket, sockmod = SockMod}) ->
    free_parser(Parser),
    DestPid =/= undefined andalso gen_fsm_compat:send_event(DestPid, closed),
    catch shutdown_socket_and_wait_for_peer_to_close(Socket, SockMod),
    ok.
318
319 %%----------------------------------------------------------------------
320 %% local API helpers
321 %%----------------------------------------------------------------------
%% Starts a receiver under `mongoose_transport_sup' and returns its pid.
%% Crashes with badmatch if the supervisor does not return `{ok, Pid}'.
-spec start_child(port(), atom(), options()) -> pid().
start_child(Socket, Shaper, Opts) ->
    ChildArgs = [Socket, Shaper, Opts],
    {ok, ReceiverPid} = supervisor:start_child(mongoose_transport_sup, ChildArgs),
    ReceiverPid.
327
%% For a list of supervisor children: returns the info map of every entry
%% whose child is a pid. For a pid: fetches the process state with
%% sys:get_state/1 and returns it as a map keyed by `#state{}' field names.
get_transport_info(ConnectionList) when is_list(ConnectionList) ->
    [get_transport_info(ChildPid) || {_, ChildPid, _, _} <- ConnectionList, is_pid(ChildPid)];
get_transport_info(TransportPid) when is_pid(TransportPid) ->
    StateRecord = sys:get_state(TransportPid),
    FieldNames = record_info(fields, state),
    %% Drop the record tag so values line up with the field names.
    FieldValues = tl(tuple_to_list(StateRecord)),
    maps:from_list(lists:zip(FieldNames, FieldValues)).
333
%% Synchronously asks the receiver to upgrade its socket to TLS. Returns an
%% error tuple instead of exiting when the receiver is gone.
-spec tcp_to_tls(pid(), mongoose_tls:options()) -> ok | {error, any()}.
tcp_to_tls(ReceiverPid, TLSOpts) ->
    Request = {tcp_to_tls, TLSOpts},
    gen_server_call_or_noproc(ReceiverPid, Request).
337
%% Fetches the receiver's current socket and returns socket data pointing at
%% it with the socket module set to mongoose_tls. Exits with
%% `invalid_socket_after_upgrade_to_tls' when the receiver cannot be asked.
-spec update_socket(socket_data()) -> socket_data().
update_socket(#socket_data{receiver = ReceiverPid} = SocketData) ->
    Answer = gen_server_call_or_noproc(ReceiverPid, get_socket),
    case Answer of
        {ok, UpgradedSocket} ->
            SocketData#socket_data{socket = UpgradedSocket, sockmod = mongoose_tls};
        {error, _} ->
            exit(invalid_socket_after_upgrade_to_tls)
    end.
346
%% Synchronously registers `DestPid' as the process that receives parsed
%% stream events from the receiver.
-spec set_dest_pid(pid(), pid()) -> ok | {error, any()}.
set_dest_pid(ReceiverPid, DestPid) ->
    Request = {set_dest_pid, DestPid},
    gen_server:call(ReceiverPid, Request).
350
%% gen_server:call/2 that turns two exit reasons into error tuples:
%% `noproc' (no such process) and `normal' (the process stopped normally
%% while the call was pending). Any other exit is propagated.
-spec gen_server_call_or_noproc(pid(), any()) -> Ret :: any() | {error, any()}.
gen_server_call_or_noproc(Pid, Message) ->
    try gen_server:call(Pid, Message) of
        Reply ->
            Reply
    catch
        exit:{noproc, Extra} ->
            {error, {noproc, Extra}};
        exit:{normal, Extra} ->
            % receiver exited with normal status after the gen_server call was sent
            % but before it was processed
            {error, {died, Extra}}
    end.
363
364 %%--------------------------------------------------------------------
365 %% internal functions
366 %%--------------------------------------------------------------------
367
%% Arms the socket for one more incoming message (`{active, once}') and then
%% checks the peer name; if the peer name cannot be read, a `tcp_closed'
%% message is sent to this process (see resolve_peername/2).
-spec activate_socket(state()) -> 'ok' | {'tcp_closed', _}.
activate_socket(#state{socket = Socket, sockmod = gen_tcp}) ->
    inet:setopts(Socket, [{active, once}]),
    resolve_peername(inet:peername(Socket), Socket);
activate_socket(#state{socket = Socket, sockmod = mongoose_tls}) ->
    mongoose_tls:setopts(Socket, [{active, once}]),
    resolve_peername(mongoose_tls:peername(Socket), Socket).
377
%% Interprets a peername lookup result: `{ok, _}' means the socket is still
%% connected; on `{error, _}' the calling process sends itself
%% `{tcp_closed, Socket}' (the sent message is also the return value).
resolve_peername({ok, _Peer}, _Socket) ->
    ok;
resolve_peername({error, _Reason}, Socket) ->
    ClosedMsg = {tcp_closed, Socket},
    self() ! ClosedMsg.
382
%% Switches the socket to passive mode (`{active, false}') using the API of
%% the current socket module.
-spec deactivate_socket(state()) -> 'ok' | {'error', _}.
deactivate_socket(#state{socket = PlainSocket, sockmod = gen_tcp}) ->
    inet:setopts(PlainSocket, [{active, false}]);
deactivate_socket(#state{socket = TLSSocket, sockmod = mongoose_tls}) ->
    mongoose_tls:setopts(TLSSocket, [{active, false}]).
388
%% Feeds received bytes to the XML stream parser and forwards the resulting
%% events to the destination process. A parse error is forwarded as a
%% single `{xmlstreamerror, Reason}' event and the previous parser is kept.
%% The shaper and the `received' transport metric are updated with the byte
%% count, and the socket is either re-activated or paused (see
%% maybe_pause/2). Returns the state with the new parser and shaper.
-spec process_data(binary(), state()) -> state().
process_data(Data, #state{parser = Parser,
                          shaper_state = ShaperState,
                          dest_pid = DestPid} = State) ->
    ?LOG_DEBUG(#{what => received_xml_on_stream, packet => Data, dest_pid => DestPid}),
    Size = byte_size(Data),
    {Events, NextParser} =
        case exml_stream:parse(Parser, Data) of
            {ok, ParserAfter, Elems} ->
                {lists:map(fun wrap_xml_elements_and_update_metrics/1, Elems), ParserAfter};
            {error, Reason} ->
                {[{xmlstreamerror, Reason}], Parser}
        end,
    {NextShaperState, Pause} = mongoose_shaper:update(ShaperState, Size),
    update_transport_metrics(Size, received, State#state.connection_type),
    lists:foreach(fun(Event) -> gen_fsm_compat:send_event(DestPid, Event) end, Events),
    maybe_pause(Pause, State),
    State#state{parser = NextParser, shaper_state = NextShaperState}.
407
%% Records the XML size of a parsed stream item in the received
%% `xml_stanza_size' metric. `#xmlel{}' items are wrapped as
%% `{xmlstreamelement, E}'; every other item is returned unchanged.
wrap_xml_elements_and_update_metrics(E) ->
    mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)),
    case E of
        #xmlel{} -> {xmlstreamelement, E};
        _ -> E
    end.
414
%% Adds `Size' to the global `[data, xmpp, Action, ConnectionType]' metric.
%% Nothing is recorded when the connection type is `undefined'.
-spec update_transport_metrics(non_neg_integer(),
                               sent | received,
                               connection_type()) -> ok.
update_transport_metrics(_Size, _Action, undefined) ->
    ok;
update_transport_metrics(Size, Action, ConnectionType) ->
    MetricName = [data, xmpp, Action, ConnectionType],
    mongoose_metrics:update(global, MetricName, Size),
    ok.
423
%% Decides how reading continues after a chunk of data was processed:
%% nothing happens while no destination pid is set; a positive delay starts
%% a timer that delivers `{timeout, Ref, activate}' to this process;
%% otherwise the socket is re-activated immediately.
-spec maybe_pause(Delay :: non_neg_integer(), state()) -> any().
maybe_pause(_Delay, #state{dest_pid = undefined}) ->
    ok;
maybe_pause(Delay, State) ->
    case Delay > 0 of
        true -> erlang:start_timer(Delay, self(), activate);
        false -> activate_socket(State)
    end.
431
%% Creates an XML stream parser with the given maximum element size.
%% `infinity' is passed to exml_stream as 0 (presumably exml's "no limit"
%% value - confirm against the exml documentation).
-spec new_parser(stanza_size()) -> exml_stream:parser().
new_parser(infinity) ->
    new_parser(0);
new_parser(MaxSize) ->
    {ok, Parser} = exml_stream:new_parser([{max_element_size, MaxSize}]),
    Parser.
440
%% Resets the XML stream parser held in the state and stores the parser
%% returned by exml_stream:reset_parser/1.
-spec reset_parser(state()) -> state().
reset_parser(#state{parser = OldParser} = State) ->
    {ok, FreshParser} = exml_stream:reset_parser(OldParser),
    State#state{parser = FreshParser}.
445
%% Releases the XML stream parser via exml_stream:free_parser/1.
-spec free_parser(exml_stream:parser()) -> ok.
free_parser(StreamParser) ->
    exml_stream:free_parser(StreamParser).
449
%% Chooses the gen_server timeout value for a receiver that should
%% hibernate as soon as it is idle: `hibernate' when the message queue is
%% empty, `infinity' when more messages are already waiting.
-spec hibernate() -> hibernate | infinity.
hibernate() ->
    {message_queue_len, Pending} = process_info(self(), message_queue_len),
    case Pending of
        0 -> hibernate;
        _ -> infinity
    end.
456
%% Returns the value used as the timeout element of gen_server callback
%% results: with `hibernate_after = 0' the decision is delegated to
%% hibernate/0, otherwise the configured value is returned as is.
-spec hibernate_or_timeout(state()) -> hibernate | infinity | pos_integer().
hibernate_or_timeout(#state{hibernate_after = HibernateAfter}) ->
    case HibernateAfter of
        0 -> hibernate();
        _ -> HibernateAfter
    end.
460
%% Shuts the socket down while trying to ensure that all data already sent
%% is received by the peer. Always returns `ok', as the spec states; the
%% outcome of the individual socket calls is deliberately ignored because
%% this is best-effort cleanup.
%%
%% This is based on tls_connection:workaround_transport_delivery_problems/2 code
%% https://github.com/erlang/otp/blob/OTP_17.0-rc2/lib/ssl/src/tls_connection.erl#L959
%%
%% There are some more docs why we need it in http://erlang.org/doc/man/gen_tcp.html#close-1
-spec shutdown_socket_and_wait_for_peer_to_close(socket(), socket_module()) -> ok.
shutdown_socket_and_wait_for_peer_to_close(Socket, gen_tcp) ->
    %% Standard trick to try to make sure all
    %% data sent to the tcp port is really delivered to the
    %% peer application before tcp port is closed so that the peer will
    %% get the correct stream end and not only a transport close.
    inet:setopts(Socket, [{active, false}]),
    gen_tcp:shutdown(Socket, write),
    %% Will return when other side has closed or after 30 s
    %% e.g. we do not want to hang if something goes wrong
    %% with the network but we want to maximise the odds that
    %% peer application gets all data sent on the tcp connection.
    %% The recv result ({ok, _} or {error, _}) carries no useful
    %% information here, so it is dropped to keep the return value `ok'.
    _ = gen_tcp:recv(Socket, 0, 30000),
    ok;
shutdown_socket_and_wait_for_peer_to_close(Socket, mongoose_tls) ->
    _ = mongoose_tls:close(Socket),
    ok.
Line Hits Source