1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%% File : ejabberd_receiver.erl |
3 |
|
%%% Author : Alexey Shchepin <alexey@process-one.net> |
4 |
|
%%% Purpose : Socket receiver for C2S and S2S connections |
5 |
|
%%% Created : 10 Nov 2003 by Alexey Shchepin <alexey@process-one.net> |
6 |
|
%%% |
7 |
|
%%% |
8 |
|
%%% ejabberd, Copyright (C) 2002-2011 ProcessOne |
9 |
|
%%% |
10 |
|
%%% This program is free software; you can redistribute it and/or |
11 |
|
%%% modify it under the terms of the GNU General Public License as |
12 |
|
%%% published by the Free Software Foundation; either version 2 of the |
13 |
|
%%% License, or (at your option) any later version. |
14 |
|
%%% |
15 |
|
%%% This program is distributed in the hope that it will be useful, |
16 |
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of |
17 |
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
18 |
|
%%% General Public License for more details. |
19 |
|
%%% |
20 |
|
%%% You should have received a copy of the GNU General Public License |
21 |
|
%%% along with this program; if not, write to the Free Software |
22 |
|
%%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
23 |
|
%%% |
24 |
|
%%%---------------------------------------------------------------------- |
25 |
|
|
26 |
|
-module(ejabberd_receiver). |
27 |
|
-author('alexey@process-one.net'). |
28 |
|
|
29 |
|
-behaviour(gen_server). |
30 |
|
|
31 |
|
%% API |
32 |
|
-export([start_link/4, |
33 |
|
start/4, |
34 |
|
change_shaper/2, |
35 |
|
starttls/2, |
36 |
|
get_socket/1, |
37 |
|
compress/2, |
38 |
|
become_controller/2, |
39 |
|
close/1]). |
40 |
|
|
41 |
|
%% gen_server callbacks |
42 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, |
43 |
|
terminate/2, code_change/3]). |
44 |
|
|
45 |
|
-ignore_xref([start_link/4]). |
46 |
|
|
47 |
|
-include("mongoose.hrl"). |
48 |
|
-include("jlib.hrl"). |
49 |
|
|
50 |
|
-record(state, {socket, |
51 |
|
sock_mod, |
52 |
|
shaper_state, |
53 |
|
c2s_pid, |
54 |
|
max_stanza_size, |
55 |
|
stanza_chunk_size, |
56 |
|
parser, |
57 |
|
timeout, |
58 |
|
hibernate_after = 0 :: non_neg_integer()}). |
59 |
|
-type state() :: #state{}. |
60 |
|
|
61 |
|
%%==================================================================== |
62 |
|
%% API |
63 |
|
%%==================================================================== |
64 |
|
%%-------------------------------------------------------------------- |
65 |
|
%% Function: start_link() -> {ok, Pid} | ignore | {error, Error} |
66 |
|
%% Description: Starts the server |
67 |
|
%%-------------------------------------------------------------------- |
68 |
|
-spec start_link(_, _, _, _) -> 'ignore' | {'error', _} | {'ok', pid()}. |
69 |
|
start_link(Socket, SockMod, Shaper, ConnOpts) -> |
70 |
6397 |
gen_server:start_link( |
71 |
|
?MODULE, [Socket, SockMod, Shaper, ConnOpts], []). |
72 |
|
|
73 |
|
%%-------------------------------------------------------------------- |
74 |
|
%% Function: start() -> {ok, Pid} | ignore | {error, Error} |
75 |
|
%% Description: Starts the server |
76 |
|
%%-------------------------------------------------------------------- |
77 |
|
start(Socket, SockMod, Shaper, ConnOpts) -> |
78 |
6397 |
{ok, Pid} = supervisor:start_child( |
79 |
|
ejabberd_receiver_sup, |
80 |
|
[Socket, SockMod, Shaper, ConnOpts]), |
81 |
6397 |
Pid. |
82 |
|
|
83 |
|
-spec change_shaper(atom() | pid() | {atom(), _} | {'via', _, _}, _) -> 'ok'. |
84 |
|
change_shaper(Pid, Shaper) -> |
85 |
15341 |
gen_server:cast(Pid, {change_shaper, Shaper}). |
86 |
|
|
87 |
|
starttls(Pid, TLSOpts) -> |
88 |
1094 |
gen_server_call_or_noproc(Pid, {starttls, TLSOpts}). |
89 |
|
|
90 |
|
get_socket(Pid) -> |
91 |
1092 |
gen_server_call_or_noproc(Pid, get_socket). |
92 |
|
|
93 |
|
compress(Pid, ZlibSocket) -> |
94 |
10 |
gen_server_call_or_noproc(Pid, {compress, ZlibSocket}). |
95 |
|
|
96 |
|
become_controller(Pid, C2SPid) -> |
97 |
5947 |
gen_server:call(Pid, {become_controller, C2SPid}). |
98 |
|
|
99 |
|
-spec close(atom() | pid() | {atom(), _} | {'via', _, _}) -> 'ok'. |
100 |
|
close(Pid) -> |
101 |
6387 |
gen_server:cast(Pid, close). |
102 |
|
|
103 |
|
%%==================================================================== |
104 |
|
%% gen_server callbacks |
105 |
|
%%==================================================================== |
106 |
|
|
107 |
|
%%-------------------------------------------------------------------- |
108 |
|
%% Function: init(Args) -> {ok, State} | |
109 |
|
%% {ok, State, Timeout} | |
110 |
|
%% ignore | |
111 |
|
%% {stop, Reason} |
112 |
|
%% Description: Initiates the server |
113 |
|
%%-------------------------------------------------------------------- |
114 |
|
-spec init([any(), ...]) -> {'ok', state()}. |
115 |
|
init([Socket, SockMod, Shaper, ConnOpts]) -> |
116 |
6397 |
ShaperState = shaper:new(Shaper), |
117 |
6397 |
Timeout = case SockMod of |
118 |
|
ssl -> |
119 |
:-( |
20; |
120 |
|
_ -> |
121 |
6397 |
infinity |
122 |
|
end, |
123 |
6397 |
MaxStanzaSize = |
124 |
|
case lists:keyfind(max_stanza_size, 1, ConnOpts) of |
125 |
6322 |
{_, Size} -> Size; |
126 |
75 |
_ -> infinity |
127 |
|
end, |
128 |
6397 |
HibernateAfter = |
129 |
|
case lists:keyfind(hibernate_after, 1, ConnOpts) of |
130 |
:-( |
{_, HA} -> HA; |
131 |
6397 |
_ -> 0 |
132 |
|
end, |
133 |
6397 |
{ok, #state{socket = Socket, |
134 |
|
sock_mod = SockMod, |
135 |
|
shaper_state = ShaperState, |
136 |
|
max_stanza_size = MaxStanzaSize, |
137 |
|
stanza_chunk_size = 0, |
138 |
|
timeout = Timeout, |
139 |
|
hibernate_after = HibernateAfter}}. |
140 |
|
|
141 |
|
%%-------------------------------------------------------------------- |
142 |
|
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | |
143 |
|
%% {reply, Reply, State, Timeout} | |
144 |
|
%% {noreply, State} | |
145 |
|
%% {noreply, State, Timeout} | |
146 |
|
%% {stop, Reason, Reply, State} | |
147 |
|
%% {stop, Reason, State} |
148 |
|
%% Description: Handling call messages |
149 |
|
%%-------------------------------------------------------------------- |
150 |
|
handle_call(get_socket, _From, #state{socket = Socket} = State) -> |
151 |
639 |
{reply, {ok, Socket}, State, maybe_hibernate(State)}; |
152 |
|
handle_call({starttls, TLSOpts}, From, #state{socket = TCPSocket} = State) -> |
153 |
|
%% the next message from client is part of TLS handshake, it must |
154 |
|
%% be handled by TLS library (another process in case of just_tls) |
155 |
|
%% so deactivating the socket. |
156 |
1092 |
deactivate_socket(State), |
157 |
|
%% TLS handshake always starts from client's request, let |
158 |
|
%% ejabberd_socket finish starttls negotiation and notify |
159 |
|
%% client that it can start TLS handshake. |
160 |
1092 |
gen_server:reply(From, ok), |
161 |
1092 |
case ejabberd_tls:tcp_to_tls(TCPSocket, TLSOpts) of |
162 |
|
{ok, TLSSocket} -> |
163 |
639 |
StateAfterReset = reset_parser(State), |
164 |
639 |
NewState = StateAfterReset#state{socket = TLSSocket, |
165 |
|
sock_mod = ejabberd_tls}, |
166 |
|
%% fast_tls requires dummy recv_data/2 call to accomplish TLS |
167 |
|
%% handshake. such call is simply ignored by just_tls backend. |
168 |
639 |
case ejabberd_tls:recv_data(TLSSocket, <<"">>) of |
169 |
|
{ok, TLSData} -> |
170 |
639 |
NewState2 = process_data(TLSData, NewState), |
171 |
639 |
{noreply, NewState2, maybe_hibernate(NewState2)}; |
172 |
|
{error, Reason} -> |
173 |
:-( |
?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason, |
174 |
:-( |
c2s_pid => State#state.c2s_pid}), |
175 |
:-( |
{stop, normal, NewState} |
176 |
|
end; |
177 |
|
{error, Reason} -> |
178 |
453 |
?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason, |
179 |
:-( |
c2s_pid => State#state.c2s_pid}), |
180 |
453 |
{stop, normal, State} |
181 |
|
end; |
182 |
|
handle_call({compress, ZlibSocket}, _From, |
183 |
|
#state{c2s_pid = C2SPid} = State) -> |
184 |
10 |
StateAfterReset = reset_parser(State), |
185 |
10 |
NewState = StateAfterReset#state{socket = ZlibSocket, |
186 |
|
sock_mod = ejabberd_zlib}, |
187 |
10 |
case ejabberd_zlib:recv_data(ZlibSocket, "") of |
188 |
|
{ok, ZlibData} -> |
189 |
10 |
NewState2 = process_data(ZlibData, NewState), |
190 |
10 |
{reply, ok, NewState2, maybe_hibernate(NewState2)}; |
191 |
|
{error, inflate_size_exceeded} -> |
192 |
:-( |
apply(gen_fsm(), send_event, |
193 |
|
[C2SPid, {xmlstreamerror, <<"child element too big">>}]), |
194 |
:-( |
{reply, ok, NewState, maybe_hibernate(NewState)}; |
195 |
|
{error, inflate_error} -> |
196 |
:-( |
{stop, normal, ok, NewState} |
197 |
|
end; |
198 |
|
handle_call({become_controller, C2SPid}, _From, State) -> |
199 |
5947 |
StateAfterReset = reset_parser(State), |
200 |
5947 |
NewState = StateAfterReset#state{c2s_pid = C2SPid}, |
201 |
5947 |
activate_socket(NewState), |
202 |
5947 |
Reply = ok, |
203 |
5947 |
{reply, Reply, NewState, maybe_hibernate(NewState)}; |
204 |
|
handle_call(_Request, _From, State) -> |
205 |
:-( |
Reply = ok, |
206 |
:-( |
{reply, Reply, State, maybe_hibernate(State)}. |
207 |
|
|
208 |
|
%%-------------------------------------------------------------------- |
209 |
|
%% Function: handle_cast(Msg, State) -> {noreply, State} | |
210 |
|
%% {noreply, State, Timeout} | |
211 |
|
%% {stop, Reason, State} |
212 |
|
%% Description: Handling cast messages |
213 |
|
%%-------------------------------------------------------------------- |
214 |
|
handle_cast({change_shaper, Shaper}, State) -> |
215 |
15341 |
NewShaperState = shaper:new(Shaper), |
216 |
15341 |
NewState = State#state{shaper_state = NewShaperState}, |
217 |
15341 |
{noreply, NewState, maybe_hibernate(NewState)}; |
218 |
|
handle_cast(close, State) -> |
219 |
1562 |
{stop, normal, State}; |
220 |
|
handle_cast(_Msg, State) -> |
221 |
:-( |
{noreply, State, maybe_hibernate(State)}. |
222 |
|
|
223 |
|
%%-------------------------------------------------------------------- |
224 |
|
%% Function: handle_info(Info, State) -> {noreply, State} | |
225 |
|
%% {noreply, State, Timeout} | |
226 |
|
%% {stop, Reason, State} |
227 |
|
%% Description: Handling all non call/cast messages |
228 |
|
%%-------------------------------------------------------------------- |
229 |
|
handle_info({Tag, _TCPSocket, Data}, |
230 |
|
#state{socket = Socket, |
231 |
|
c2s_pid = C2SPid, |
232 |
|
sock_mod = SockMod} = State) |
233 |
|
when (Tag == tcp) or (Tag == ssl) -> |
234 |
45174 |
case SockMod of |
235 |
|
ejabberd_tls -> |
236 |
1332 |
mongoose_metrics:update(global, |
237 |
|
[data, xmpp, received, encrypted_size], size(Data)), |
238 |
1332 |
case ejabberd_tls:recv_data(Socket, Data) of |
239 |
|
{ok, TLSData} -> |
240 |
920 |
NewState = process_data(TLSData, State), |
241 |
920 |
{noreply, NewState, maybe_hibernate(NewState)}; |
242 |
|
{error, _Reason} -> |
243 |
412 |
{stop, normal, State} |
244 |
|
end; |
245 |
|
ejabberd_zlib -> |
246 |
29 |
mongoose_metrics:update(global, |
247 |
|
[data, xmpp, received, compressed_size], size(Data)), |
248 |
29 |
case ejabberd_zlib:recv_data(Socket, Data) of |
249 |
|
{ok, ZlibData} -> |
250 |
29 |
NewState = process_data(ZlibData, State), |
251 |
29 |
{noreply, NewState, maybe_hibernate(NewState)}; |
252 |
|
{error, inflate_size_exceeded} -> |
253 |
:-( |
apply(gen_fsm(), send_event, |
254 |
|
[C2SPid, {xmlstreamerror, <<"child element too big">>}]), |
255 |
:-( |
{noreply, State, maybe_hibernate(State)}; |
256 |
|
{error, inflate_error} -> |
257 |
:-( |
{stop, normal, State} |
258 |
|
end; |
259 |
|
_ -> |
260 |
43813 |
NewState = process_data(Data, State), |
261 |
43813 |
{noreply, NewState, maybe_hibernate(NewState)} |
262 |
|
end; |
263 |
|
handle_info({Tag, _TCPSocket}, State) |
264 |
|
when (Tag == tcp_closed) or (Tag == ssl_closed) -> |
265 |
3970 |
{stop, normal, State}; |
266 |
|
handle_info({Tag, _TCPSocket, Reason}, State) |
267 |
|
when (Tag == tcp_error) or (Tag == ssl_error) -> |
268 |
:-( |
case Reason of |
269 |
|
timeout -> |
270 |
:-( |
{noreply, State, maybe_hibernate(State)}; |
271 |
|
_ -> |
272 |
:-( |
{stop, normal, State} |
273 |
|
end; |
274 |
|
handle_info({timeout, _Ref, activate}, State) -> |
275 |
669 |
activate_socket(State), |
276 |
669 |
{noreply, State, maybe_hibernate(State)}; |
277 |
|
handle_info(timeout, State) -> |
278 |
:-( |
{noreply, State, hibernate()}; |
279 |
|
handle_info(_Info, State) -> |
280 |
:-( |
{noreply, State, maybe_hibernate(State)}. |
281 |
|
|
282 |
|
%%-------------------------------------------------------------------- |
283 |
|
%% Function: terminate(Reason, State) -> void() |
284 |
|
%% Description: This function is called by a gen_server when it is about to |
285 |
|
%% terminate. It should be the opposite of Module:init/1 and do any necessary |
286 |
|
%% cleaning up. When it returns, the gen_server terminates with Reason. |
287 |
|
%% The return value is ignored. |
288 |
|
%%-------------------------------------------------------------------- |
289 |
|
terminate(_Reason, #state{parser = Parser, |
290 |
|
c2s_pid = C2SPid} = State) -> |
291 |
6397 |
free_parser(Parser), |
292 |
6397 |
case C2SPid of |
293 |
450 |
undefined -> ok; |
294 |
5947 |
_ -> gen_fsm_compat:send_event(C2SPid, closed) |
295 |
|
end, |
296 |
6397 |
catch shutdown_socket_and_wait_for_peer_to_close(State#state.socket, State#state.sock_mod), |
297 |
6397 |
ok. |
298 |
|
|
299 |
|
%%-------------------------------------------------------------------- |
300 |
|
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} |
301 |
|
%% Description: Convert process state when code is changed |
302 |
|
%%-------------------------------------------------------------------- |
303 |
|
code_change(_OldVsn, State, _Extra) -> |
304 |
:-( |
{ok, State}. |
305 |
|
|
306 |
|
%%-------------------------------------------------------------------- |
307 |
|
%%% Internal functions |
308 |
|
%%-------------------------------------------------------------------- |
309 |
|
|
310 |
|
-spec activate_socket(state()) -> 'ok' | {'tcp_closed', _}. |
311 |
|
activate_socket(#state{socket = Socket, sock_mod = gen_tcp}) -> |
312 |
49047 |
inet:setopts(Socket, [{active, once}]), |
313 |
49047 |
PeerName = inet:peername(Socket), |
314 |
49047 |
resolve_peername(PeerName, Socket); |
315 |
|
activate_socket(#state{socket = Socket, sock_mod = SockMod}) -> |
316 |
1598 |
SockMod:setopts(Socket, [{active, once}]), |
317 |
1598 |
PeerName = SockMod:peername(Socket), |
318 |
1598 |
resolve_peername(PeerName, Socket). |
319 |
|
|
320 |
|
resolve_peername({ok, _}, _Socket) -> |
321 |
50496 |
ok; |
322 |
|
resolve_peername({error, _Reason}, Socket) -> |
323 |
149 |
self() ! {tcp_closed, Socket}. |
324 |
|
|
325 |
|
-spec deactivate_socket(state()) -> 'ok' | {'error', _}. |
326 |
|
deactivate_socket(#state{socket = Socket, |
327 |
|
sock_mod = SockMod}) -> |
328 |
1092 |
case SockMod of |
329 |
1092 |
gen_tcp -> inet:setopts(Socket, [{active, false}]); |
330 |
:-( |
_ -> SockMod:setopts(Socket, [{active, false}]) |
331 |
|
end. |
332 |
|
|
333 |
|
%% @doc Data processing for connectors directly generating xmlel in |
334 |
|
%% Erlang data structure. |
335 |
|
%% WARNING: Shaper does not work with Erlang data structure. |
336 |
|
-spec process_data(binary() | maybe_improper_list(), state()) -> state(). |
337 |
|
process_data([], State) -> |
338 |
:-( |
activate_socket(State), |
339 |
:-( |
State; |
340 |
|
process_data(Els, #state{c2s_pid = undefined} = State) when is_list(Els) -> |
341 |
:-( |
State; |
342 |
|
process_data([Element|Els], #state{c2s_pid = C2SPid} = State) |
343 |
|
when element(1, Element) == xmlel; |
344 |
|
element(1, Element) == xmlstreamstart; |
345 |
|
element(1, Element) == xmlstreamelement; |
346 |
|
element(1, Element) == xmlstreamend -> |
347 |
:-( |
catch gen_fsm_compat:send_event(C2SPid, element_wrapper(Element)), |
348 |
:-( |
process_data(Els, State); |
349 |
|
%% Data processing for connectors receivind data as string. |
350 |
|
process_data(Data, #state{parser = Parser, |
351 |
|
shaper_state = ShaperState, |
352 |
|
stanza_chunk_size = ChunkSize, |
353 |
|
c2s_pid = C2SPid} = State) -> |
354 |
45411 |
?LOG_DEBUG(#{what => received_xml_on_stream, packet => Data, c2s_pid => C2SPid}), |
355 |
45411 |
Size = byte_size(Data), |
356 |
45411 |
maybe_run_keep_alive_hook(Size, State), |
357 |
45411 |
{C2SEvents, NewParser} = |
358 |
|
case exml_stream:parse(Parser, Data) of |
359 |
45407 |
{ok, NParser, Elems} -> {[wrap_if_xmlel(E) || E <- Elems], NParser}; |
360 |
4 |
{error, Reason} -> {[{xmlstreamerror, Reason}], Parser} |
361 |
|
end, |
362 |
45411 |
NewChunkSize = update_stanza_size(C2SEvents, ChunkSize, Size), |
363 |
45411 |
{NewShaperState, Pause} = shaper:update(ShaperState, Size), |
364 |
45411 |
[gen_fsm_compat:send_event(C2SPid, Event) || Event <- C2SEvents], |
365 |
45411 |
maybe_pause(Pause, State), |
366 |
45411 |
State#state{parser = NewParser, shaper_state = NewShaperState, stanza_chunk_size = NewChunkSize}. |
367 |
|
|
368 |
29033 |
wrap_if_xmlel(#xmlel{} = E) -> {xmlstreamelement, E}; |
369 |
15602 |
wrap_if_xmlel(E) -> E. |
370 |
|
|
371 |
|
update_stanza_size([_|_], ChunkSize, Size) -> |
372 |
44224 |
mongoose_metrics:update(global, |
373 |
|
[data, xmpp, received, xml_stanza_size], ChunkSize + Size), |
374 |
44224 |
0; |
375 |
|
update_stanza_size(_, ChunkSize, Size) -> |
376 |
1187 |
ChunkSize + Size. |
377 |
|
|
378 |
|
maybe_pause(_, #state{c2s_pid = undefined}) -> |
379 |
514 |
ok; |
380 |
|
maybe_pause(Pause, _State) when Pause > 0 -> |
381 |
868 |
erlang:start_timer(Pause, self(), activate); |
382 |
|
maybe_pause(_, State) -> |
383 |
44029 |
activate_socket(State). |
384 |
|
|
385 |
|
maybe_run_keep_alive_hook(Size, #state{c2s_pid = C2SPid}) |
386 |
|
when Size < 3, is_pid(C2SPid) -> |
387 |
|
%% yes it can happen that the data is shorter than 3 bytes and contain |
388 |
|
%% some part of xml but this will not harm the keep_alive_hook |
389 |
596 |
gen_fsm_compat:send_all_state_event(C2SPid, keep_alive_packet); |
390 |
|
maybe_run_keep_alive_hook(_, _) -> |
391 |
44815 |
ok. |
392 |
|
|
393 |
|
%% @doc Element coming from XML parser are wrapped inside xmlstreamelement |
394 |
|
%% When we receive directly xmlel tuple (from a socket module |
395 |
|
%% speaking directly Erlang XML), we wrap it inside the same |
396 |
|
%% xmlstreamelement coming from the XML parser. |
397 |
|
-spec element_wrapper(exml:element() | tuple()) -> tuple(). |
398 |
|
element_wrapper(#xmlel{} = XMLElement) -> |
399 |
:-( |
{xmlstreamelement, XMLElement}; |
400 |
|
element_wrapper(Element) -> |
401 |
:-( |
Element. |
402 |
|
|
403 |
|
reset_parser(#state{parser = undefined, max_stanza_size = Size} = State) -> |
404 |
5947 |
MaxSize = case Size of |
405 |
75 |
infinity -> 0; |
406 |
5872 |
_ -> Size |
407 |
|
end, |
408 |
5947 |
{ok, NewParser} = exml_stream:new_parser([{start_tag, <<"stream:stream">>}, |
409 |
|
{max_child_size, MaxSize}]), |
410 |
5947 |
State#state{parser = NewParser, stanza_chunk_size = 0}; |
411 |
|
reset_parser(#state{parser = Parser} = State) -> |
412 |
649 |
{ok, NewParser} = exml_stream:reset_parser(Parser), |
413 |
649 |
State#state{parser = NewParser, stanza_chunk_size = 0}. |
414 |
|
|
415 |
|
free_parser(undefined) -> |
416 |
450 |
ok; |
417 |
|
free_parser(Parser) -> |
418 |
5947 |
exml_stream:free_parser(Parser). |
419 |
|
|
420 |
|
gen_server_call_or_noproc(Pid, Message) -> |
421 |
2196 |
try |
422 |
2196 |
gen_server:call(Pid, Message) |
423 |
|
catch |
424 |
|
exit:{noproc, Extra} -> |
425 |
1 |
{error, {noproc, Extra}}; |
426 |
|
exit:{normal, Extra} -> |
427 |
|
% reciver exited with normal status after the gen_server call was sent |
428 |
|
% but before it was processed |
429 |
454 |
{error, {died, Extra}} |
430 |
|
end. |
431 |
|
|
432 |
:-( |
gen_fsm() -> p1_fsm. |
433 |
|
|
434 |
|
-spec hibernate() -> hibernate | infinity. |
435 |
|
hibernate() -> |
436 |
68007 |
case process_info(self(), message_queue_len) of |
437 |
61330 |
{_, 0} -> hibernate; |
438 |
6677 |
_ -> infinity |
439 |
|
end. |
440 |
|
|
441 |
|
-spec maybe_hibernate(state()) -> hibernate | infinity | pos_integer(). |
442 |
68007 |
maybe_hibernate(#state{hibernate_after = 0}) -> hibernate(); |
443 |
:-( |
maybe_hibernate(#state{hibernate_after = HA}) -> HA. |
444 |
|
|
445 |
|
%% gen_tcp:close/2, but trying to ensure that all data is received by peer. |
446 |
|
%% |
447 |
|
%% This is based on tls_connection:workaround_transport_delivery_problems/2 code |
448 |
|
%% https://github.com/erlang/otp/blob/OTP_17.0-rc2/lib/ssl/src/tls_connection.erl#L959 |
449 |
|
%% |
450 |
|
%% There are some more docs why we need it in http://erlang.org/doc/man/gen_tcp.html#close-1 |
451 |
|
shutdown_socket_and_wait_for_peer_to_close(Socket, gen_tcp) -> |
452 |
|
%% Standard trick to try to make sure all |
453 |
|
%% data sent to the tcp port is really delivered to the |
454 |
|
%% peer application before tcp port is closed so that the peer will |
455 |
|
%% get the correct stream end and not only a transport close. |
456 |
5758 |
inet:setopts(Socket, [{active, false}]), |
457 |
5758 |
gen_tcp:shutdown(Socket, write), |
458 |
|
%% Will return when other side has closed or after 30 s |
459 |
|
%% e.g. we do not want to hang if something goes wrong |
460 |
|
%% with the network but we want to maximise the odds that |
461 |
|
%% peer application gets all data sent on the tcp connection. |
462 |
5758 |
gen_tcp:recv(Socket, 0, 30000); |
463 |
|
shutdown_socket_and_wait_for_peer_to_close(Socket, SockMod) -> |
464 |
639 |
SockMod:close(Socket). |