./ct_report/coverage/ejabberd_receiver.COVER.html

1 %%%----------------------------------------------------------------------
2 %% File : ejabberd_receiver.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Socket receiver for C2S and S2S connections
5 %%% Created : 10 Nov 2003 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(ejabberd_receiver).
27 -author('alexey@process-one.net').
28
29 -behaviour(gen_server).
30
31 %% API
32 -export([start_link/4,
33 start/4,
34 change_shaper/2,
35 starttls/2,
36 get_socket/1,
37 compress/2,
38 become_controller/2,
39 close/1]).
40
41 %% gen_server callbacks
42 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
43 terminate/2, code_change/3]).
44
45 -ignore_xref([start_link/4]).
46
47 -include("mongoose.hrl").
48 -include("jlib.hrl").
49
50 -record(state, {socket,
51 sock_mod,
52 shaper_state,
53 c2s_pid,
54 max_stanza_size,
55 stanza_chunk_size,
56 parser,
57 timeout,
58 hibernate_after = 0 :: non_neg_integer()}).
59 -type state() :: #state{}.
60
61 %%====================================================================
62 %% API
63 %%====================================================================
64 %%--------------------------------------------------------------------
65 %% Function: start_link() -> {ok, Pid} | ignore | {error, Error}
66 %% Description: Starts the server
67 %%--------------------------------------------------------------------
68 -spec start_link(_, _, _, _) -> 'ignore' | {'error', _} | {'ok', pid()}.
69 start_link(Socket, SockMod, Shaper, ConnOpts) ->
70 4322 gen_server:start_link(
71 ?MODULE, [Socket, SockMod, Shaper, ConnOpts], []).
72
73 %%--------------------------------------------------------------------
74 %% Function: start() -> {ok, Pid} | ignore | {error, Error}
75 %% Description: Starts the server
76 %%--------------------------------------------------------------------
77 start(Socket, SockMod, Shaper, ConnOpts) ->
78 4322 {ok, Pid} = supervisor:start_child(
79 ejabberd_receiver_sup,
80 [Socket, SockMod, Shaper, ConnOpts]),
81 4322 Pid.
82
83 -spec change_shaper(atom() | pid() | {atom(), _} | {'via', _, _}, _) -> 'ok'.
84 change_shaper(Pid, Shaper) ->
85 9129 gen_server:cast(Pid, {change_shaper, Shaper}).
86
87 starttls(Pid, TLSOpts) ->
88 1094 gen_server_call_or_noproc(Pid, {starttls, TLSOpts}).
89
90 get_socket(Pid) ->
91 1092 gen_server_call_or_noproc(Pid, get_socket).
92
93 compress(Pid, ZlibSocket) ->
94 10 gen_server_call_or_noproc(Pid, {compress, ZlibSocket}).
95
96 become_controller(Pid, C2SPid) ->
97 3872 gen_server:call(Pid, {become_controller, C2SPid}).
98
99 -spec close(atom() | pid() | {atom(), _} | {'via', _, _}) -> 'ok'.
100 close(Pid) ->
101 4312 gen_server:cast(Pid, close).
102
103 %%====================================================================
104 %% gen_server callbacks
105 %%====================================================================
106
107 %%--------------------------------------------------------------------
108 %% Function: init(Args) -> {ok, State} |
109 %% {ok, State, Timeout} |
110 %% ignore |
111 %% {stop, Reason}
112 %% Description: Initiates the server
113 %%--------------------------------------------------------------------
114 -spec init([any(), ...]) -> {'ok', state()}.
115 init([Socket, SockMod, Shaper, ConnOpts]) ->
116 4322 ShaperState = shaper:new(Shaper),
117 4322 Timeout = case SockMod of
118 ssl ->
119
:-(
20;
120 _ ->
121 4322 infinity
122 end,
123 4322 MaxStanzaSize =
124 case lists:keyfind(max_stanza_size, 1, ConnOpts) of
125 4247 {_, Size} -> Size;
126 75 _ -> infinity
127 end,
128 4322 HibernateAfter =
129 case lists:keyfind(hibernate_after, 1, ConnOpts) of
130
:-(
{_, HA} -> HA;
131 4322 _ -> 0
132 end,
133 4322 {ok, #state{socket = Socket,
134 sock_mod = SockMod,
135 shaper_state = ShaperState,
136 max_stanza_size = MaxStanzaSize,
137 stanza_chunk_size = 0,
138 timeout = Timeout,
139 hibernate_after = HibernateAfter}}.
140
141 %%--------------------------------------------------------------------
142 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
143 %% {reply, Reply, State, Timeout} |
144 %% {noreply, State} |
145 %% {noreply, State, Timeout} |
146 %% {stop, Reason, Reply, State} |
147 %% {stop, Reason, State}
148 %% Description: Handling call messages
149 %%--------------------------------------------------------------------
150 handle_call(get_socket, _From, #state{socket = Socket} = State) ->
151 639 {reply, {ok, Socket}, State, maybe_hibernate(State)};
152 handle_call({starttls, TLSOpts}, From, #state{socket = TCPSocket} = State) ->
153 %% the next message from client is part of TLS handshake, it must
154 %% be handled by TLS library (another process in case of just_tls)
155 %% so deactivating the socket.
156 1092 deactivate_socket(State),
157 %% TLS handshake always starts from client's request, let
158 %% ejabberd_socket finish starttls negotiation and notify
159 %% client that it can start TLS handshake.
160 1092 gen_server:reply(From, ok),
161 1092 case ejabberd_tls:tcp_to_tls(TCPSocket, TLSOpts) of
162 {ok, TLSSocket} ->
163 639 StateAfterReset = reset_parser(State),
164 639 NewState = StateAfterReset#state{socket = TLSSocket,
165 sock_mod = ejabberd_tls},
166 %% fast_tls requires dummy recv_data/2 call to accomplish TLS
167 %% handshake. such call is simply ignored by just_tls backend.
168 639 case ejabberd_tls:recv_data(TLSSocket, <<"">>) of
169 {ok, TLSData} ->
170 639 NewState2 = process_data(TLSData, NewState),
171 639 {noreply, NewState2, maybe_hibernate(NewState2)};
172 {error, Reason} ->
173
:-(
?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason,
174
:-(
c2s_pid => State#state.c2s_pid}),
175
:-(
{stop, normal, NewState}
176 end;
177 {error, Reason} ->
178 453 ?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason,
179
:-(
c2s_pid => State#state.c2s_pid}),
180 453 {stop, normal, State}
181 end;
182 handle_call({compress, ZlibSocket}, _From,
183 #state{c2s_pid = C2SPid} = State) ->
184 10 StateAfterReset = reset_parser(State),
185 10 NewState = StateAfterReset#state{socket = ZlibSocket,
186 sock_mod = ejabberd_zlib},
187 10 case ejabberd_zlib:recv_data(ZlibSocket, "") of
188 {ok, ZlibData} ->
189 10 NewState2 = process_data(ZlibData, NewState),
190 10 {reply, ok, NewState2, maybe_hibernate(NewState2)};
191 {error, inflate_size_exceeded} ->
192
:-(
apply(gen_fsm(), send_event,
193 [C2SPid, {xmlstreamerror, <<"child element too big">>}]),
194
:-(
{reply, ok, NewState, maybe_hibernate(NewState)};
195 {error, inflate_error} ->
196
:-(
{stop, normal, ok, NewState}
197 end;
198 handle_call({become_controller, C2SPid}, _From, State) ->
199 3872 StateAfterReset = reset_parser(State),
200 3872 NewState = StateAfterReset#state{c2s_pid = C2SPid},
201 3872 activate_socket(NewState),
202 3872 Reply = ok,
203 3872 {reply, Reply, NewState, maybe_hibernate(NewState)};
204 handle_call(_Request, _From, State) ->
205
:-(
Reply = ok,
206
:-(
{reply, Reply, State, maybe_hibernate(State)}.
207
208 %%--------------------------------------------------------------------
209 %% Function: handle_cast(Msg, State) -> {noreply, State} |
210 %% {noreply, State, Timeout} |
211 %% {stop, Reason, State}
212 %% Description: Handling cast messages
213 %%--------------------------------------------------------------------
214 handle_cast({change_shaper, Shaper}, State) ->
215 9129 NewShaperState = shaper:new(Shaper),
216 9129 NewState = State#state{shaper_state = NewShaperState},
217 9129 {noreply, NewState, maybe_hibernate(NewState)};
218 handle_cast(close, State) ->
219 1075 {stop, normal, State};
220 handle_cast(_Msg, State) ->
221
:-(
{noreply, State, maybe_hibernate(State)}.
222
223 %%--------------------------------------------------------------------
224 %% Function: handle_info(Info, State) -> {noreply, State} |
225 %% {noreply, State, Timeout} |
226 %% {stop, Reason, State}
227 %% Description: Handling all non call/cast messages
228 %%--------------------------------------------------------------------
229 handle_info({Tag, _TCPSocket, Data},
230 #state{socket = Socket,
231 c2s_pid = C2SPid,
232 sock_mod = SockMod} = State)
233 when (Tag == tcp) or (Tag == ssl) ->
234 26518 case SockMod of
235 ejabberd_tls ->
236 1328 mongoose_metrics:update(global,
237 [data, xmpp, received, encrypted_size], size(Data)),
238 1328 case ejabberd_tls:recv_data(Socket, Data) of
239 {ok, TLSData} ->
240 916 NewState = process_data(TLSData, State),
241 916 {noreply, NewState, maybe_hibernate(NewState)};
242 {error, _Reason} ->
243 412 {stop, normal, State}
244 end;
245 ejabberd_zlib ->
246 29 mongoose_metrics:update(global,
247 [data, xmpp, received, compressed_size], size(Data)),
248 29 case ejabberd_zlib:recv_data(Socket, Data) of
249 {ok, ZlibData} ->
250 29 NewState = process_data(ZlibData, State),
251 29 {noreply, NewState, maybe_hibernate(NewState)};
252 {error, inflate_size_exceeded} ->
253
:-(
apply(gen_fsm(), send_event,
254 [C2SPid, {xmlstreamerror, <<"child element too big">>}]),
255
:-(
{noreply, State, maybe_hibernate(State)};
256 {error, inflate_error} ->
257
:-(
{stop, normal, State}
258 end;
259 _ ->
260 25161 NewState = process_data(Data, State),
261 25161 {noreply, NewState, maybe_hibernate(NewState)}
262 end;
263 handle_info({Tag, _TCPSocket}, State)
264 when (Tag == tcp_closed) or (Tag == ssl_closed) ->
265 2382 {stop, normal, State};
266 handle_info({Tag, _TCPSocket, Reason}, State)
267 when (Tag == tcp_error) or (Tag == ssl_error) ->
268
:-(
case Reason of
269 timeout ->
270
:-(
{noreply, State, maybe_hibernate(State)};
271 _ ->
272
:-(
{stop, normal, State}
273 end;
274 handle_info({timeout, _Ref, activate}, State) ->
275 480 activate_socket(State),
276 480 {noreply, State, maybe_hibernate(State)};
277 handle_info(timeout, State) ->
278
:-(
{noreply, State, hibernate()};
279 handle_info(_Info, State) ->
280
:-(
{noreply, State, maybe_hibernate(State)}.
281
282 %%--------------------------------------------------------------------
283 %% Function: terminate(Reason, State) -> void()
284 %% Description: This function is called by a gen_server when it is about to
285 %% terminate. It should be the opposite of Module:init/1 and do any necessary
286 %% cleaning up. When it returns, the gen_server terminates with Reason.
287 %% The return value is ignored.
288 %%--------------------------------------------------------------------
289 terminate(_Reason, #state{parser = Parser,
290 c2s_pid = C2SPid} = State) ->
291 4322 free_parser(Parser),
292 4322 case C2SPid of
293 450 undefined -> ok;
294 3872 _ -> gen_fsm_compat:send_event(C2SPid, closed)
295 end,
296 4322 catch shutdown_socket_and_wait_for_peer_to_close(State#state.socket, State#state.sock_mod),
297 4322 ok.
298
299 %%--------------------------------------------------------------------
300 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
301 %% Description: Convert process state when code is changed
302 %%--------------------------------------------------------------------
303 code_change(_OldVsn, State, _Extra) ->
304
:-(
{ok, State}.
305
306 %%--------------------------------------------------------------------
307 %%% Internal functions
308 %%--------------------------------------------------------------------
309
310 -spec activate_socket(state()) -> 'ok' | {'tcp_closed', _}.
311 activate_socket(#state{socket = Socket, sock_mod = gen_tcp}) ->
312 28381 inet:setopts(Socket, [{active, once}]),
313 28381 PeerName = inet:peername(Socket),
314 28381 resolve_peername(PeerName, Socket);
315 activate_socket(#state{socket = Socket, sock_mod = SockMod}) ->
316 1594 SockMod:setopts(Socket, [{active, once}]),
317 1594 PeerName = SockMod:peername(Socket),
318 1594 resolve_peername(PeerName, Socket).
319
320 resolve_peername({ok, _}, _Socket) ->
321 29825 ok;
322 resolve_peername({error, _Reason}, Socket) ->
323 150 self() ! {tcp_closed, Socket}.
324
325 -spec deactivate_socket(state()) -> 'ok' | {'error', _}.
326 deactivate_socket(#state{socket = Socket,
327 sock_mod = SockMod}) ->
328 1092 case SockMod of
329 1092 gen_tcp -> inet:setopts(Socket, [{active, false}]);
330
:-(
_ -> SockMod:setopts(Socket, [{active, false}])
331 end.
332
333 %% @doc Data processing for connectors directly generating xmlel in
334 %% Erlang data structure.
335 %% WARNING: Shaper does not work with Erlang data structure.
336 -spec process_data(binary() | maybe_improper_list(), state()) -> state().
337 process_data([], State) ->
338
:-(
activate_socket(State),
339
:-(
State;
340 process_data(Els, #state{c2s_pid = undefined} = State) when is_list(Els) ->
341
:-(
State;
342 process_data([Element|Els], #state{c2s_pid = C2SPid} = State)
343 when element(1, Element) == xmlel;
344 element(1, Element) == xmlstreamstart;
345 element(1, Element) == xmlstreamelement;
346 element(1, Element) == xmlstreamend ->
347
:-(
catch gen_fsm_compat:send_event(C2SPid, element_wrapper(Element)),
348
:-(
process_data(Els, State);
349 %% Data processing for connectors receivind data as string.
350 process_data(Data, #state{parser = Parser,
351 shaper_state = ShaperState,
352 stanza_chunk_size = ChunkSize,
353 c2s_pid = C2SPid} = State) ->
354 26755 ?LOG_DEBUG(#{what => received_xml_on_stream, packet => Data, c2s_pid => C2SPid}),
355 26755 Size = byte_size(Data),
356 26755 maybe_run_keep_alive_hook(Size, State),
357 26755 {C2SEvents, NewParser} =
358 case exml_stream:parse(Parser, Data) of
359 26751 {ok, NParser, Elems} -> {[wrap_if_xmlel(E) || E <- Elems], NParser};
360 4 {error, Reason} -> {[{xmlstreamerror, Reason}], Parser}
361 end,
362 26755 NewChunkSize = update_stanza_size(C2SEvents, ChunkSize, Size),
363 26755 {NewShaperState, Pause} = shaper:update(ShaperState, Size),
364 26755 [gen_fsm_compat:send_event(C2SPid, Event) || Event <- C2SEvents],
365 26755 maybe_pause(Pause, State),
366 26755 State#state{parser = NewParser, shaper_state = NewShaperState, stanza_chunk_size = NewChunkSize}.
367
368 16534 wrap_if_xmlel(#xmlel{} = E) -> {xmlstreamelement, E};
369 9408 wrap_if_xmlel(E) -> E.
370
371 update_stanza_size([_|_], ChunkSize, Size) ->
372 25564 mongoose_metrics:update(global,
373 [data, xmpp, received, xml_stanza_size], ChunkSize + Size),
374 25564 0;
375 update_stanza_size(_, ChunkSize, Size) ->
376 1191 ChunkSize + Size.
377
378 maybe_pause(_, #state{c2s_pid = undefined}) ->
379 514 ok;
380 maybe_pause(Pause, _State) when Pause > 0 ->
381 618 erlang:start_timer(Pause, self(), activate);
382 maybe_pause(_, State) ->
383 25623 activate_socket(State).
384
385 maybe_run_keep_alive_hook(Size, #state{c2s_pid = C2SPid})
386 when Size < 3, is_pid(C2SPid) ->
387 %% yes it can happen that the data is shorter than 3 bytes and contain
388 %% some part of xml but this will not harm the keep_alive_hook
389 599 gen_fsm_compat:send_all_state_event(C2SPid, keep_alive_packet);
390 maybe_run_keep_alive_hook(_, _) ->
391 26156 ok.
392
393 %% @doc Element coming from XML parser are wrapped inside xmlstreamelement
394 %% When we receive directly xmlel tuple (from a socket module
395 %% speaking directly Erlang XML), we wrap it inside the same
396 %% xmlstreamelement coming from the XML parser.
397 -spec element_wrapper(exml:element() | tuple()) -> tuple().
398 element_wrapper(#xmlel{} = XMLElement) ->
399
:-(
{xmlstreamelement, XMLElement};
400 element_wrapper(Element) ->
401
:-(
Element.
402
403 reset_parser(#state{parser = undefined, max_stanza_size = Size} = State) ->
404 3872 MaxSize = case Size of
405 75 infinity -> 0;
406 3797 _ -> Size
407 end,
408 3872 {ok, NewParser} = exml_stream:new_parser([{start_tag, <<"stream:stream">>},
409 {max_child_size, MaxSize}]),
410 3872 State#state{parser = NewParser, stanza_chunk_size = 0};
411 reset_parser(#state{parser = Parser} = State) ->
412 649 {ok, NewParser} = exml_stream:reset_parser(Parser),
413 649 State#state{parser = NewParser, stanza_chunk_size = 0}.
414
415 free_parser(undefined) ->
416 450 ok;
417 free_parser(Parser) ->
418 3872 exml_stream:free_parser(Parser).
419
420 gen_server_call_or_noproc(Pid, Message) ->
421 2196 try
422 2196 gen_server:call(Pid, Message)
423 catch
424 exit:{noproc, Extra} ->
425 1 {error, {noproc, Extra}};
426 exit:{normal, Extra} ->
427 % reciver exited with normal status after the gen_server call was sent
428 % but before it was processed
429 454 {error, {died, Extra}}
430 end.
431
432
:-(
gen_fsm() -> p1_fsm.
433
434 -spec hibernate() -> hibernate | infinity.
435 hibernate() ->
436 40875 case process_info(self(), message_queue_len) of
437 36319 {_, 0} -> hibernate;
438 4556 _ -> infinity
439 end.
440
441 -spec maybe_hibernate(state()) -> hibernate | infinity | pos_integer().
442 40875 maybe_hibernate(#state{hibernate_after = 0}) -> hibernate();
443
:-(
maybe_hibernate(#state{hibernate_after = HA}) -> HA.
444
445 %% gen_tcp:close/2, but trying to ensure that all data is received by peer.
446 %%
447 %% This is based on tls_connection:workaround_transport_delivery_problems/2 code
448 %% https://github.com/erlang/otp/blob/OTP_17.0-rc2/lib/ssl/src/tls_connection.erl#L959
449 %%
450 %% There are some more docs why we need it in http://erlang.org/doc/man/gen_tcp.html#close-1
451 shutdown_socket_and_wait_for_peer_to_close(Socket, gen_tcp) ->
452 %% Standard trick to try to make sure all
453 %% data sent to the tcp port is really delivered to the
454 %% peer application before tcp port is closed so that the peer will
455 %% get the correct stream end and not only a transport close.
456 3683 inet:setopts(Socket, [{active, false}]),
457 3683 gen_tcp:shutdown(Socket, write),
458 %% Will return when other side has closed or after 30 s
459 %% e.g. we do not want to hang if something goes wrong
460 %% with the network but we want to maximise the odds that
461 %% peer application gets all data sent on the tcp connection.
462 3683 gen_tcp:recv(Socket, 0, 30000);
463 shutdown_socket_and_wait_for_peer_to_close(Socket, SockMod) ->
464 639 SockMod:close(Socket).
Line Hits Source