./ct_report/coverage/ejabberd_receiver.COVER.html

1 %%%----------------------------------------------------------------------
2 %% File : ejabberd_receiver.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Socket receiver for C2S and S2S connections
5 %%% Created : 10 Nov 2003 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(ejabberd_receiver).
27 -author('alexey@process-one.net').
28
29 -behaviour(gen_server).
30
31 %% API
32 -export([start_link/4,
33 start/4,
34 change_shaper/2,
35 starttls/2,
36 get_socket/1,
37 compress/2,
38 become_controller/2,
39 close/1]).
40
41 %% gen_server callbacks
42 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
43 terminate/2, code_change/3]).
44
45 -ignore_xref([start_link/4]).
46
47 -include("mongoose.hrl").
48 -include("jlib.hrl").
49
50 -record(state, {socket,
51 sock_mod,
52 shaper_state,
53 c2s_pid,
54 max_stanza_size,
55 stanza_chunk_size,
56 parser,
57 timeout,
58 hibernate_after = 0 :: non_neg_integer()}).
59 -type state() :: #state{}.
60
61 -type options() :: #{max_stanza_size := pos_integer() | infinity,
62 hibernate_after := non_neg_integer(),
63 atom() => any()}.
64
65 %%====================================================================
66 %% API
67 %%====================================================================
68 %%--------------------------------------------------------------------
69 %% Function: start_link() -> {ok, Pid} | ignore | {error, Error}
70 %% Description: Starts the server
71 %%--------------------------------------------------------------------
72 -spec start_link(port(), ejabberd:sockmod(), atom(), options()) ->
73 ignore | {error, _} | {ok, pid()}.
74 start_link(Socket, SockMod, Shaper, Opts) ->
75 5502 gen_server:start_link(?MODULE, [Socket, SockMod, Shaper, Opts], []).
76
77 -spec start(port(), ejabberd:sockmod(), atom(), options()) -> pid().
78 start(Socket, SockMod, Shaper, Opts) ->
79 5502 {ok, Pid} = supervisor:start_child(ejabberd_receiver_sup,
80 [Socket, SockMod, Shaper, Opts]),
81 5502 Pid.
82
83 -spec change_shaper(atom() | pid() | {atom(), _} | {'via', _, _}, _) -> 'ok'.
84 change_shaper(Pid, Shaper) ->
85 13428 gen_server:cast(Pid, {change_shaper, Shaper}).
86
87 starttls(Pid, TLSOpts) ->
88 1055 gen_server_call_or_noproc(Pid, {starttls, TLSOpts}).
89
90 get_socket(Pid) ->
91 1053 gen_server_call_or_noproc(Pid, get_socket).
92
93 compress(Pid, ZlibSocket) ->
94 10 gen_server_call_or_noproc(Pid, {compress, ZlibSocket}).
95
96 become_controller(Pid, C2SPid) ->
97 5052 gen_server:call(Pid, {become_controller, C2SPid}).
98
99 -spec close(atom() | pid() | {atom(), _} | {'via', _, _}) -> 'ok'.
100 close(Pid) ->
101 5508 gen_server:cast(Pid, close).
102
103 %%====================================================================
104 %% gen_server callbacks
105 %%====================================================================
106
107 %%--------------------------------------------------------------------
108 %% Function: init(Args) -> {ok, State} |
109 %% {ok, State, Timeout} |
110 %% ignore |
111 %% {stop, Reason}
112 %% Description: Initiates the server
113 %%--------------------------------------------------------------------
114 -spec init([any(), ...]) -> {'ok', state()}.
115 init([Socket, SockMod, Shaper, Opts]) ->
116 5502 ShaperState = shaper:new(Shaper),
117 5502 #{max_stanza_size := MaxStanzaSize, hibernate_after := HibernateAfter} = Opts,
118 5502 Timeout = case SockMod of
119
:-(
ssl -> 20;
120 5502 _ -> infinity
121 end,
122 5502 {ok, #state{socket = Socket,
123 sock_mod = SockMod,
124 shaper_state = ShaperState,
125 max_stanza_size = MaxStanzaSize,
126 stanza_chunk_size = 0,
127 timeout = Timeout,
128 hibernate_after = HibernateAfter}}.
129
130 %%--------------------------------------------------------------------
131 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
132 %% {reply, Reply, State, Timeout} |
133 %% {noreply, State} |
134 %% {noreply, State, Timeout} |
135 %% {stop, Reason, Reply, State} |
136 %% {stop, Reason, State}
137 %% Description: Handling call messages
138 %%--------------------------------------------------------------------
139 handle_call(get_socket, _From, #state{socket = Socket} = State) ->
140 600 {reply, {ok, Socket}, State, maybe_hibernate(State)};
141 handle_call({starttls, TLSOpts}, From, #state{socket = TCPSocket} = State) ->
142 %% the next message from client is part of TLS handshake, it must
143 %% be handled by TLS library (another process in case of just_tls)
144 %% so deactivating the socket.
145 1053 deactivate_socket(State),
146 %% TLS handshake always starts from client's request, let
147 %% ejabberd_socket finish starttls negotiation and notify
148 %% client that it can start TLS handshake.
149 1053 gen_server:reply(From, ok),
150 1053 case ejabberd_tls:tcp_to_tls(TCPSocket, TLSOpts) of
151 {ok, TLSSocket} ->
152 600 StateAfterReset = reset_parser(State),
153 600 NewState = StateAfterReset#state{socket = TLSSocket,
154 sock_mod = ejabberd_tls},
155 %% fast_tls requires dummy recv_data/2 call to accomplish TLS
156 %% handshake. such call is simply ignored by just_tls backend.
157 600 case ejabberd_tls:recv_data(TLSSocket, <<"">>) of
158 {ok, TLSData} ->
159 600 NewState2 = process_data(TLSData, NewState),
160 600 {noreply, NewState2, maybe_hibernate(NewState2)};
161 {error, Reason} ->
162
:-(
?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason,
163
:-(
c2s_pid => State#state.c2s_pid}),
164
:-(
{stop, normal, NewState}
165 end;
166 {error, Reason} ->
167 453 ?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason,
168
:-(
c2s_pid => State#state.c2s_pid}),
169 453 {stop, normal, State}
170 end;
171 handle_call({compress, ZlibSocket}, _From,
172 #state{c2s_pid = C2SPid} = State) ->
173 10 StateAfterReset = reset_parser(State),
174 10 NewState = StateAfterReset#state{socket = ZlibSocket,
175 sock_mod = ejabberd_zlib},
176 10 case ejabberd_zlib:recv_data(ZlibSocket, "") of
177 {ok, ZlibData} ->
178 10 NewState2 = process_data(ZlibData, NewState),
179 10 {reply, ok, NewState2, maybe_hibernate(NewState2)};
180 {error, inflate_size_exceeded} ->
181
:-(
apply(gen_fsm(), send_event,
182 [C2SPid, {xmlstreamerror, <<"child element too big">>}]),
183
:-(
{reply, ok, NewState, maybe_hibernate(NewState)};
184 {error, inflate_error} ->
185
:-(
{stop, normal, ok, NewState}
186 end;
187 handle_call({become_controller, C2SPid}, _From, State) ->
188 5052 StateAfterReset = reset_parser(State),
189 5052 NewState = StateAfterReset#state{c2s_pid = C2SPid},
190 5052 activate_socket(NewState),
191 5052 Reply = ok,
192 5052 {reply, Reply, NewState, maybe_hibernate(NewState)};
193 handle_call(_Request, _From, State) ->
194
:-(
Reply = ok,
195
:-(
{reply, Reply, State, maybe_hibernate(State)}.
196
197 %%--------------------------------------------------------------------
198 %% Function: handle_cast(Msg, State) -> {noreply, State} |
199 %% {noreply, State, Timeout} |
200 %% {stop, Reason, State}
201 %% Description: Handling cast messages
202 %%--------------------------------------------------------------------
203 handle_cast({change_shaper, Shaper}, State) ->
204 13428 NewShaperState = shaper:new(Shaper),
205 13428 NewState = State#state{shaper_state = NewShaperState},
206 13428 {noreply, NewState, maybe_hibernate(NewState)};
207 handle_cast(close, State) ->
208 899 {stop, normal, State};
209 handle_cast(_Msg, State) ->
210
:-(
{noreply, State, maybe_hibernate(State)}.
211
212 %%--------------------------------------------------------------------
213 %% Function: handle_info(Info, State) -> {noreply, State} |
214 %% {noreply, State, Timeout} |
215 %% {stop, Reason, State}
216 %% Description: Handling all non call/cast messages
217 %%--------------------------------------------------------------------
218 handle_info({Tag, _TCPSocket, Data},
219 #state{socket = Socket,
220 c2s_pid = C2SPid,
221 sock_mod = SockMod} = State)
222 when (Tag == tcp) or (Tag == ssl) ->
223 39011 case SockMod of
224 ejabberd_tls ->
225 1130 mongoose_metrics:update(global,
226 [data, xmpp, received, encrypted_size], size(Data)),
227 1130 case ejabberd_tls:recv_data(Socket, Data) of
228 {ok, TLSData} ->
229 718 NewState = process_data(TLSData, State),
230 718 {noreply, NewState, maybe_hibernate(NewState)};
231 {error, _Reason} ->
232 412 {stop, normal, State}
233 end;
234 ejabberd_zlib ->
235 29 mongoose_metrics:update(global,
236 [data, xmpp, received, compressed_size], size(Data)),
237 29 case ejabberd_zlib:recv_data(Socket, Data) of
238 {ok, ZlibData} ->
239 29 NewState = process_data(ZlibData, State),
240 29 {noreply, NewState, maybe_hibernate(NewState)};
241 {error, inflate_size_exceeded} ->
242
:-(
apply(gen_fsm(), send_event,
243 [C2SPid, {xmlstreamerror, <<"child element too big">>}]),
244
:-(
{noreply, State, maybe_hibernate(State)};
245 {error, inflate_error} ->
246
:-(
{stop, normal, State}
247 end;
248 _ ->
249 37852 NewState = process_data(Data, State),
250 37852 {noreply, NewState, maybe_hibernate(NewState)}
251 end;
252 handle_info({Tag, _TCPSocket}, State)
253 when (Tag == tcp_closed) or (Tag == ssl_closed) ->
254 3738 {stop, normal, State};
255 handle_info({Tag, _TCPSocket, Reason}, State)
256 when (Tag == tcp_error) or (Tag == ssl_error) ->
257
:-(
case Reason of
258 timeout ->
259
:-(
{noreply, State, maybe_hibernate(State)};
260 _ ->
261
:-(
{stop, normal, State}
262 end;
263 handle_info({timeout, _Ref, activate}, State) ->
264 398 activate_socket(State),
265 398 {noreply, State, maybe_hibernate(State)};
266 handle_info(timeout, State) ->
267
:-(
{noreply, State, hibernate()};
268 handle_info(_Info, State) ->
269
:-(
{noreply, State, maybe_hibernate(State)}.
270
271 %%--------------------------------------------------------------------
272 %% Function: terminate(Reason, State) -> void()
273 %% Description: This function is called by a gen_server when it is about to
274 %% terminate. It should be the opposite of Module:init/1 and do any necessary
275 %% cleaning up. When it returns, the gen_server terminates with Reason.
276 %% The return value is ignored.
277 %%--------------------------------------------------------------------
278 terminate(_Reason, #state{parser = Parser,
279 c2s_pid = C2SPid} = State) ->
280 5502 free_parser(Parser),
281 5502 case C2SPid of
282 450 undefined -> ok;
283 5052 _ -> gen_fsm_compat:send_event(C2SPid, closed)
284 end,
285 5502 catch shutdown_socket_and_wait_for_peer_to_close(State#state.socket, State#state.sock_mod),
286 5502 ok.
287
288 %%--------------------------------------------------------------------
289 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
290 %% Description: Convert process state when code is changed
291 %%--------------------------------------------------------------------
292 code_change(_OldVsn, State, _Extra) ->
293
:-(
{ok, State}.
294
295 %%--------------------------------------------------------------------
296 %%% Internal functions
297 %%--------------------------------------------------------------------
298
299 -spec activate_socket(state()) -> 'ok' | {'tcp_closed', _}.
300 activate_socket(#state{socket = Socket, sock_mod = gen_tcp}) ->
301 42256 inet:setopts(Socket, [{active, once}]),
302 42256 PeerName = inet:peername(Socket),
303 42256 resolve_peername(PeerName, Socket);
304 activate_socket(#state{socket = Socket, sock_mod = SockMod}) ->
305 1357 SockMod:setopts(Socket, [{active, once}]),
306 1357 PeerName = SockMod:peername(Socket),
307 1357 resolve_peername(PeerName, Socket).
308
309 resolve_peername({ok, _}, _Socket) ->
310 43472 ok;
311 resolve_peername({error, _Reason}, Socket) ->
312 141 self() ! {tcp_closed, Socket}.
313
314 -spec deactivate_socket(state()) -> 'ok' | {'error', _}.
315 deactivate_socket(#state{socket = Socket,
316 sock_mod = SockMod}) ->
317 1053 case SockMod of
318 1053 gen_tcp -> inet:setopts(Socket, [{active, false}]);
319
:-(
_ -> SockMod:setopts(Socket, [{active, false}])
320 end.
321
322 %% @doc Data processing for connectors directly generating xmlel in
323 %% Erlang data structure.
324 %% WARNING: Shaper does not work with Erlang data structure.
325 -spec process_data(binary() | maybe_improper_list(), state()) -> state().
326 process_data([], State) ->
327
:-(
activate_socket(State),
328
:-(
State;
329 process_data(Els, #state{c2s_pid = undefined} = State) when is_list(Els) ->
330
:-(
State;
331 process_data([Element|Els], #state{c2s_pid = C2SPid} = State)
332 when element(1, Element) == xmlel;
333 element(1, Element) == xmlstreamstart;
334 element(1, Element) == xmlstreamelement;
335 element(1, Element) == xmlstreamend ->
336
:-(
catch gen_fsm_compat:send_event(C2SPid, element_wrapper(Element)),
337
:-(
process_data(Els, State);
338 %% Data processing for connectors receivind data as string.
339 process_data(Data, #state{parser = Parser,
340 shaper_state = ShaperState,
341 stanza_chunk_size = ChunkSize,
342 c2s_pid = C2SPid} = State) ->
343 39209 ?LOG_DEBUG(#{what => received_xml_on_stream, packet => Data, c2s_pid => C2SPid}),
344 39209 Size = byte_size(Data),
345 39209 maybe_run_keep_alive_hook(Size, State),
346 39209 {C2SEvents, NewParser} =
347 case exml_stream:parse(Parser, Data) of
348 39205 {ok, NParser, Elems} -> {[wrap_if_xmlel(E) || E <- Elems], NParser};
349 4 {error, Reason} -> {[{xmlstreamerror, Reason}], Parser}
350 end,
351 39209 NewChunkSize = update_stanza_size(C2SEvents, ChunkSize, Size),
352 39209 {NewShaperState, Pause} = shaper:update(ShaperState, Size),
353 39209 [gen_fsm_compat:send_event(C2SPid, Event) || Event <- C2SEvents],
354 39209 maybe_pause(Pause, State),
355 39209 State#state{parser = NewParser, shaper_state = NewShaperState, stanza_chunk_size = NewChunkSize}.
356
357 24960 wrap_if_xmlel(#xmlel{} = E) -> {xmlstreamelement, E};
358 13378 wrap_if_xmlel(E) -> E.
359
360 update_stanza_size([_|_], ChunkSize, Size) ->
361 38220 mongoose_metrics:update(global,
362 [data, xmpp, received, xml_stanza_size], ChunkSize + Size),
363 38220 0;
364 update_stanza_size(_, ChunkSize, Size) ->
365 989 ChunkSize + Size.
366
367 maybe_pause(_, #state{c2s_pid = undefined}) ->
368 514 ok;
369 maybe_pause(Pause, _State) when Pause > 0 ->
370 532 erlang:start_timer(Pause, self(), activate);
371 maybe_pause(_, State) ->
372 38163 activate_socket(State).
373
374 maybe_run_keep_alive_hook(Size, #state{c2s_pid = C2SPid})
375 when Size < 3, is_pid(C2SPid) ->
376 %% yes it can happen that the data is shorter than 3 bytes and contain
377 %% some part of xml but this will not harm the keep_alive_hook
378 472 gen_fsm_compat:send_all_state_event(C2SPid, keep_alive_packet);
379 maybe_run_keep_alive_hook(_, _) ->
380 38737 ok.
381
382 %% @doc Element coming from XML parser are wrapped inside xmlstreamelement
383 %% When we receive directly xmlel tuple (from a socket module
384 %% speaking directly Erlang XML), we wrap it inside the same
385 %% xmlstreamelement coming from the XML parser.
386 -spec element_wrapper(exml:element() | tuple()) -> tuple().
387 element_wrapper(#xmlel{} = XMLElement) ->
388
:-(
{xmlstreamelement, XMLElement};
389 element_wrapper(Element) ->
390
:-(
Element.
391
392 reset_parser(#state{parser = undefined, max_stanza_size = Size} = State) ->
393 5052 MaxSize = case Size of
394
:-(
infinity -> 0;
395 5052 _ -> Size
396 end,
397 5052 {ok, NewParser} = exml_stream:new_parser([{start_tag, <<"stream:stream">>},
398 {max_child_size, MaxSize}]),
399 5052 State#state{parser = NewParser, stanza_chunk_size = 0};
400 reset_parser(#state{parser = Parser} = State) ->
401 610 {ok, NewParser} = exml_stream:reset_parser(Parser),
402 610 State#state{parser = NewParser, stanza_chunk_size = 0}.
403
404 free_parser(undefined) ->
405 450 ok;
406 free_parser(Parser) ->
407 5052 exml_stream:free_parser(Parser).
408
409 gen_server_call_or_noproc(Pid, Message) ->
410 2118 try
411 2118 gen_server:call(Pid, Message)
412 catch
413 exit:{noproc, Extra} ->
414 1 {error, {noproc, Extra}};
415 exit:{normal, Extra} ->
416 % reciver exited with normal status after the gen_server call was sent
417 % but before it was processed
418 454 {error, {died, Extra}}
419 end.
420
421
:-(
gen_fsm() -> p1_fsm.
422
423 -spec hibernate() -> hibernate | infinity.
424 hibernate() ->
425 58687 case process_info(self(), message_queue_len) of
426 53136 {_, 0} -> hibernate;
427 5551 _ -> infinity
428 end.
429
430 -spec maybe_hibernate(state()) -> hibernate | infinity | pos_integer().
431 58687 maybe_hibernate(#state{hibernate_after = 0}) -> hibernate();
432
:-(
maybe_hibernate(#state{hibernate_after = HA}) -> HA.
433
434 %% gen_tcp:close/2, but trying to ensure that all data is received by peer.
435 %%
436 %% This is based on tls_connection:workaround_transport_delivery_problems/2 code
437 %% https://github.com/erlang/otp/blob/OTP_17.0-rc2/lib/ssl/src/tls_connection.erl#L959
438 %%
439 %% There are some more docs why we need it in http://erlang.org/doc/man/gen_tcp.html#close-1
440 shutdown_socket_and_wait_for_peer_to_close(Socket, gen_tcp) ->
441 %% Standard trick to try to make sure all
442 %% data sent to the tcp port is really delivered to the
443 %% peer application before tcp port is closed so that the peer will
444 %% get the correct stream end and not only a transport close.
445 4902 inet:setopts(Socket, [{active, false}]),
446 4902 gen_tcp:shutdown(Socket, write),
447 %% Will return when other side has closed or after 30 s
448 %% e.g. we do not want to hang if something goes wrong
449 %% with the network but we want to maximise the odds that
450 %% peer application gets all data sent on the tcp connection.
451 4902 gen_tcp:recv(Socket, 0, 30000);
452 shutdown_socket_and_wait_for_peer_to_close(Socket, SockMod) ->
453 600 SockMod:close(Socket).
Line Hits Source