1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%% File : ejabberd_receiver.erl |
3 |
|
%%% Author : Alexey Shchepin <alexey@process-one.net> |
4 |
|
%%% Purpose : Socket receiver for C2S and S2S connections |
5 |
|
%%% Created : 10 Nov 2003 by Alexey Shchepin <alexey@process-one.net> |
6 |
|
%%% |
7 |
|
%%% |
8 |
|
%%% ejabberd, Copyright (C) 2002-2011 ProcessOne |
9 |
|
%%% |
10 |
|
%%% This program is free software; you can redistribute it and/or |
11 |
|
%%% modify it under the terms of the GNU General Public License as |
12 |
|
%%% published by the Free Software Foundation; either version 2 of the |
13 |
|
%%% License, or (at your option) any later version. |
14 |
|
%%% |
15 |
|
%%% This program is distributed in the hope that it will be useful, |
16 |
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of |
17 |
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
18 |
|
%%% General Public License for more details. |
19 |
|
%%% |
20 |
|
%%% You should have received a copy of the GNU General Public License |
21 |
|
%%% along with this program; if not, write to the Free Software |
22 |
|
%%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
23 |
|
%%% |
24 |
|
%%%---------------------------------------------------------------------- |
25 |
|
|
26 |
|
-module(ejabberd_receiver).
-author('alexey@process-one.net').

-behaviour(gen_server).

%% API: starting a receiver process
-export([start_link/4,
         start/4]).

%% API: requests sent to a running receiver
-export([change_shaper/2,
         starttls/2,
         get_socket/1,
         compress/2,
         become_controller/2,
         close/1]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% start_link/4 is excluded from xref checks.
-ignore_xref([start_link/4]).
46 |
|
|
47 |
|
-include("mongoose.hrl"). |
48 |
|
-include("jlib.hrl"). |
49 |
|
|
50 |
|
%% Per-connection state of the receiver process.
-record(state, {
          %% Socket being read; replaced when the connection is upgraded
          %% to TLS (starttls) or to zlib compression (compress).
          socket,
          %% Module used to operate on `socket'; switched to ejabberd_tls or
          %% ejabberd_zlib when the connection is upgraded.
          sock_mod,
          %% Value returned by shaper:new/1 and advanced by shaper:update/2.
          shaper_state,
          %% Process that receives parsed stream events; stays `undefined'
          %% until become_controller/2 is handled.
          c2s_pid,
          %% Taken from the start options; passed to the XML parser as
          %% max_child_size (`infinity' is mapped to 0).
          max_stanza_size,
          %% Bytes received since the parser last produced at least one event.
          stanza_chunk_size,
          %% exml_stream parser; `undefined' until the first reset_parser/1.
          parser,
          %% Set in init/1: 20 when sock_mod is `ssl', `infinity' otherwise.
          timeout,
          %% 0 means "hibernate whenever the mailbox is empty"; any other
          %% value is returned to gen_server as the callback timeout.
          hibernate_after = 0 :: non_neg_integer()
         }).
-type state() :: #state{}.

%% Start options; the two listed keys are mandatory, others are allowed.
-type options() :: #{max_stanza_size := pos_integer() | infinity,
                     hibernate_after := non_neg_integer(),
                     atom() => any()}.
64 |
|
|
65 |
|
%%==================================================================== |
66 |
|
%% API |
67 |
|
%%==================================================================== |
68 |
|
%%-------------------------------------------------------------------- |
69 |
|
%% Function: start_link(Socket, SockMod, Shaper, Opts) -> {ok, Pid} | ignore | {error, Error}
70 |
|
%% Description: Starts the server |
71 |
|
%%-------------------------------------------------------------------- |
72 |
|
%% Starts a receiver linked to the caller; the four arguments are handed
%% to init/1 as a single list.
-spec start_link(port(), ejabberd:sockmod(), atom(), options()) ->
    ignore | {error, _} | {ok, pid()}.
start_link(Socket, SockMod, Shaper, Opts) ->
    InitArgs = [Socket, SockMod, Shaper, Opts],
    gen_server:start_link(?MODULE, InitArgs, []).
76 |
|
|
77 |
|
%% Starts a receiver as a child of ejabberd_receiver_sup and returns its pid.
%% Any result other than {ok, Pid} crashes the caller with badmatch.
-spec start(port(), ejabberd:sockmod(), atom(), options()) -> pid().
start(Socket, SockMod, Shaper, Opts) ->
    ChildArgs = [Socket, SockMod, Shaper, Opts],
    {ok, Pid} = supervisor:start_child(ejabberd_receiver_sup, ChildArgs),
    Pid.
82 |
|
|
83 |
|
%% Asynchronously asks the receiver to replace its shaper state with a new
%% one built from Shaper.
-spec change_shaper(atom() | pid() | {atom(), _} | {'via', _, _}, _) -> 'ok'.
change_shaper(Pid, Shaper) ->
    Request = {change_shaper, Shaper},
    gen_server:cast(Pid, Request).
86 |
|
|
87 |
|
%% Asks the receiver to upgrade its socket to TLS using TLSOpts.
%% Returns the receiver's reply, or {error, _} when the receiver is gone.
starttls(Pid, TLSOpts) ->
    Request = {starttls, TLSOpts},
    gen_server_call_or_noproc(Pid, Request).
89 |
|
|
90 |
|
%% Fetches the socket currently held by the receiver.
%% Returns the receiver's reply, or {error, _} when the receiver is gone.
get_socket(Pid) ->
    Request = get_socket,
    gen_server_call_or_noproc(Pid, Request).
92 |
|
|
93 |
|
%% Tells the receiver to continue reading through ZlibSocket.
%% Returns the receiver's reply, or {error, _} when the receiver is gone.
compress(Pid, ZlibSocket) ->
    Request = {compress, ZlibSocket},
    gen_server_call_or_noproc(Pid, Request).
95 |
|
|
96 |
|
%% Registers C2SPid as the process that will receive the parsed stream
%% events. Unlike the other synchronous calls, exits are not caught here.
become_controller(Pid, C2SPid) ->
    Request = {become_controller, C2SPid},
    gen_server:call(Pid, Request).
98 |
|
|
99 |
|
%% Asynchronously asks the receiver to stop.
-spec close(atom() | pid() | {atom(), _} | {'via', _, _}) -> 'ok'.
close(Pid) ->
    Request = close,
    gen_server:cast(Pid, Request).
102 |
|
|
103 |
|
%%==================================================================== |
104 |
|
%% gen_server callbacks |
105 |
|
%%==================================================================== |
106 |
|
|
107 |
|
%%-------------------------------------------------------------------- |
108 |
|
%% Function: init(Args) -> {ok, State} | |
109 |
|
%% {ok, State, Timeout} | |
110 |
|
%% ignore | |
111 |
|
%% {stop, Reason} |
112 |
|
%% Description: Initiates the server |
113 |
|
%%-------------------------------------------------------------------- |
114 |
|
%% Builds the initial state. Both max_stanza_size and hibernate_after must be
%% present in Opts; a missing key crashes with badmatch. No parser is created
%% here and no controlling process is set.
-spec init([any(), ...]) -> {'ok', state()}.
init([Socket, SockMod, Shaper, Opts]) ->
    InitialShaper = shaper:new(Shaper),
    #{max_stanza_size := MaxStanzaSize, hibernate_after := HibernateAfter} = Opts,
    State = #state{socket = Socket,
                   sock_mod = SockMod,
                   shaper_state = InitialShaper,
                   max_stanza_size = MaxStanzaSize,
                   stanza_chunk_size = 0,
                   timeout = initial_timeout(SockMod),
                   hibernate_after = HibernateAfter},
    {ok, State}.

%% Value stored in the `timeout' field for a given socket module.
initial_timeout(ssl) -> 20;
initial_timeout(_SockMod) -> infinity.
129 |
|
|
130 |
|
%%-------------------------------------------------------------------- |
131 |
|
%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
132 |
|
%% {reply, Reply, State, Timeout} | |
133 |
|
%% {noreply, State} | |
134 |
|
%% {noreply, State, Timeout} | |
135 |
|
%% {stop, Reason, Reply, State} | |
136 |
|
%% {stop, Reason, State} |
137 |
|
%% Description: Handling call messages |
138 |
|
%%-------------------------------------------------------------------- |
139 |
|
%% Serves the synchronous requests issued by the API functions.
%% Unknown requests are answered with `ok'.
handle_call(get_socket, _From, #state{socket = Current} = State) ->
    {reply, {ok, Current}, State, maybe_hibernate(State)};
handle_call({starttls, TLSOpts}, Caller, #state{socket = PlainSocket} = State) ->
    %% The next data sent by the client belongs to the TLS handshake and
    %% must be consumed by the TLS library (another process in case of
    %% just_tls), so the socket is deactivated first.
    deactivate_socket(State),
    %% The handshake always starts with a client request: reply now so that
    %% ejabberd_socket can finish the starttls negotiation and tell the
    %% client it may begin the handshake.
    gen_server:reply(Caller, ok),
    case ejabberd_tls:tcp_to_tls(PlainSocket, TLSOpts) of
        {ok, TLSSocket} ->
            continue_with_tls(TLSSocket, State);
        {error, Reason} ->
            ?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason,
                           c2s_pid => State#state.c2s_pid}),
            {stop, normal, State}
    end;
handle_call({compress, ZlibSocket}, _From, #state{c2s_pid = C2SPid} = State) ->
    Fresh = reset_parser(State),
    ZlibState = Fresh#state{socket = ZlibSocket, sock_mod = ejabberd_zlib},
    case ejabberd_zlib:recv_data(ZlibSocket, "") of
        {ok, Inflated} ->
            Processed = process_data(Inflated, ZlibState),
            {reply, ok, Processed, maybe_hibernate(Processed)};
        {error, inflate_size_exceeded} ->
            FsmMod = gen_fsm(),
            FsmMod:send_event(C2SPid, {xmlstreamerror, <<"child element too big">>}),
            {reply, ok, ZlibState, maybe_hibernate(ZlibState)};
        {error, inflate_error} ->
            {stop, normal, ok, ZlibState}
    end;
handle_call({become_controller, C2SPid}, _From, State) ->
    Fresh = reset_parser(State),
    Controlled = Fresh#state{c2s_pid = C2SPid},
    activate_socket(Controlled),
    {reply, ok, Controlled, maybe_hibernate(Controlled)};
handle_call(_Request, _From, State) ->
    {reply, ok, State, maybe_hibernate(State)}.

%% Second half of the starttls request: switches the state to the TLS socket
%% with a reset parser and processes whatever the TLS layer already yields.
%% The caller has been replied to before this runs, hence `noreply'.
continue_with_tls(TLSSocket, State) ->
    Fresh = reset_parser(State),
    TLSState = Fresh#state{socket = TLSSocket, sock_mod = ejabberd_tls},
    %% fast_tls needs a dummy recv_data/2 call to complete the handshake;
    %% the just_tls backend simply ignores it.
    case ejabberd_tls:recv_data(TLSSocket, <<"">>) of
        {ok, Decrypted} ->
            Processed = process_data(Decrypted, TLSState),
            {noreply, Processed, maybe_hibernate(Processed)};
        {error, Reason} ->
            ?LOG_WARNING(#{what => tcp_to_tls_failed, reason => Reason,
                           c2s_pid => State#state.c2s_pid}),
            {stop, normal, TLSState}
    end.
196 |
|
|
197 |
|
%%-------------------------------------------------------------------- |
198 |
|
%% Function: handle_cast(Msg, State) -> {noreply, State} | |
199 |
|
%% {noreply, State, Timeout} | |
200 |
|
%% {stop, Reason, State} |
201 |
|
%% Description: Handling cast messages |
202 |
|
%%-------------------------------------------------------------------- |
203 |
|
%% Serves asynchronous requests: `close' stops the process, `change_shaper'
%% swaps the shaper state, anything else is ignored.
handle_cast(close, State) ->
    {stop, normal, State};
handle_cast({change_shaper, Shaper}, State) ->
    Reshaped = State#state{shaper_state = shaper:new(Shaper)},
    {noreply, Reshaped, maybe_hibernate(Reshaped)};
handle_cast(_Msg, State) ->
    {noreply, State, maybe_hibernate(State)}.
211 |
|
|
212 |
|
%%-------------------------------------------------------------------- |
213 |
|
%% Function: handle_info(Info, State) -> {noreply, State} | |
214 |
|
%% {noreply, State, Timeout} | |
215 |
|
%% {stop, Reason, State} |
216 |
|
%% Description: Handling all non call/cast messages |
217 |
|
%%-------------------------------------------------------------------- |
218 |
|
%% Handles socket notifications and timers.
%%
%%  - {tcp | ssl, _, Data}: incoming bytes, decoded according to sock_mod and
%%    passed to process_data/2.
%%  - {tcp_closed | ssl_closed, _}: the process stops normally.
%%  - {tcp_error | ssl_error, _, Reason}: `timeout' is ignored, every other
%%    reason stops the process normally.
%%  - {timeout, _, activate}: timer set by maybe_pause/2 has fired, so the
%%    socket is activated again.
%%  - timeout: gen_server idle timeout; try to hibernate.
%%  - anything else is ignored.
handle_info({Tag, _TCPSocket, Data}, #state{sock_mod = SockMod} = State)
  when Tag =:= tcp; Tag =:= ssl ->
    handle_socket_data(SockMod, Data, State);
handle_info({Tag, _TCPSocket}, State)
  when Tag =:= tcp_closed; Tag =:= ssl_closed ->
    {stop, normal, State};
handle_info({Tag, _TCPSocket, timeout}, State)
  when Tag =:= tcp_error; Tag =:= ssl_error ->
    {noreply, State, maybe_hibernate(State)};
handle_info({Tag, _TCPSocket, _Reason}, State)
  when Tag =:= tcp_error; Tag =:= ssl_error ->
    {stop, normal, State};
handle_info({timeout, _Ref, activate}, State) ->
    activate_socket(State),
    {noreply, State, maybe_hibernate(State)};
handle_info(timeout, State) ->
    {noreply, State, hibernate()};
handle_info(_Info, State) ->
    {noreply, State, maybe_hibernate(State)}.

%% Decodes one chunk of socket data for the given socket module and feeds the
%% result to process_data/2. The size of the raw chunk is reported as a metric
%% for the TLS and zlib transports before decoding.
handle_socket_data(ejabberd_tls, Data, #state{socket = Socket} = State) ->
    mongoose_metrics:update(global,
                            [data, xmpp, received, encrypted_size], byte_size(Data)),
    case ejabberd_tls:recv_data(Socket, Data) of
        {ok, TLSData} ->
            NewState = process_data(TLSData, State),
            {noreply, NewState, maybe_hibernate(NewState)};
        {error, _Reason} ->
            {stop, normal, State}
    end;
handle_socket_data(ejabberd_zlib, Data,
                   #state{socket = Socket, c2s_pid = C2SPid} = State) ->
    mongoose_metrics:update(global,
                            [data, xmpp, received, compressed_size], byte_size(Data)),
    case ejabberd_zlib:recv_data(Socket, Data) of
        {ok, ZlibData} ->
            NewState = process_data(ZlibData, State),
            {noreply, NewState, maybe_hibernate(NewState)};
        {error, inflate_size_exceeded} ->
            %% Report the oversized element to the C2S process and keep going.
            FsmMod = gen_fsm(),
            FsmMod:send_event(C2SPid, {xmlstreamerror, <<"child element too big">>}),
            {noreply, State, maybe_hibernate(State)};
        {error, inflate_error} ->
            {stop, normal, State}
    end;
handle_socket_data(_SockMod, Data, State) ->
    NewState = process_data(Data, State),
    {noreply, NewState, maybe_hibernate(NewState)}.
270 |
|
|
271 |
|
%%-------------------------------------------------------------------- |
272 |
|
%% Function: terminate(Reason, State) -> void() |
273 |
|
%% Description: This function is called by a gen_server when it is about to |
274 |
|
%% terminate. It should be the opposite of Module:init/1 and do any necessary |
275 |
|
%% cleaning up. When it returns, the gen_server terminates with Reason. |
276 |
|
%% The return value is ignored. |
277 |
|
%%-------------------------------------------------------------------- |
278 |
|
%% Cleanup on shutdown: frees the parser, tells the controlling process (if
%% one was set) that the connection is closed, then shuts the socket down.
terminate(_Reason, #state{parser = Parser,
                          c2s_pid = C2SPid,
                          socket = Socket,
                          sock_mod = SockMod}) ->
    free_parser(Parser),
    C2SPid =:= undefined orelse gen_fsm_compat:send_event(C2SPid, closed),
    %% Best effort: a failure while closing the socket must not turn the
    %% termination itself into a crash, so every exception is discarded.
    try
        shutdown_socket_and_wait_for_peer_to_close(Socket, SockMod)
    catch
        _:_ -> ok
    end,
    ok.
287 |
|
|
288 |
|
%%-------------------------------------------------------------------- |
289 |
|
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} |
290 |
|
%% Description: Convert process state when code is changed |
291 |
|
%%-------------------------------------------------------------------- |
292 |
|
%% No state conversion is needed on code upgrade: the state is kept as is.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
294 |
|
|
295 |
|
%%-------------------------------------------------------------------- |
296 |
|
%%% Internal functions |
297 |
|
%%-------------------------------------------------------------------- |
298 |
|
|
299 |
|
%% Arms the socket for exactly one more message ({active, once}) and checks
%% that the peer is still known. Plain TCP sockets go through `inet', every
%% other socket module provides its own setopts/2 and peername/1.
%% The result of setopts is not inspected.
-spec activate_socket(state()) -> 'ok' | {'tcp_closed', _}.
activate_socket(#state{socket = Socket, sock_mod = gen_tcp}) ->
    inet:setopts(Socket, [{active, once}]),
    resolve_peername(inet:peername(Socket), Socket);
activate_socket(#state{socket = Socket, sock_mod = SockMod}) ->
    SockMod:setopts(Socket, [{active, once}]),
    resolve_peername(SockMod:peername(Socket), Socket).
308 |
|
|
309 |
|
%% Interprets the result of a peername lookup: when it failed, the calling
%% process sends itself a {tcp_closed, Socket} message (which is also the
%% return value); otherwise nothing happens and `ok' is returned.
resolve_peername({error, _Reason}, Socket) ->
    erlang:send(self(), {tcp_closed, Socket});
resolve_peername({ok, _}, _Socket) ->
    ok.
313 |
|
|
314 |
|
%% Switches the socket to passive mode. Plain TCP sockets go through `inet',
%% every other socket module provides its own setopts/2.
-spec deactivate_socket(state()) -> 'ok' | {'error', _}.
deactivate_socket(#state{socket = Socket, sock_mod = gen_tcp}) ->
    inet:setopts(Socket, [{active, false}]);
deactivate_socket(#state{socket = Socket, sock_mod = SockMod}) ->
    SockMod:setopts(Socket, [{active, false}]).
321 |
|
|
322 |
|
%% @doc Forwards received data to the controlling C2S process.
%%
%% Two kinds of input are handled:
%%
%% * a list of ready-made elements, for connectors that produce Erlang XML
%%   terms directly. Each element is sent as an event and the socket is
%%   activated once the list is exhausted. The list is dropped when no
%%   controlling process is set.
%%   WARNING: Shaper does not work with Erlang data structure.
%%
%% * anything else is treated as a binary chunk of the XML stream: it is
%%   parsed, the resulting events are sent to the C2S process, and the shaper
%%   decides whether the socket is activated now or after a pause.
-spec process_data(binary() | maybe_improper_list(), state()) -> state().
process_data([], State) ->
    activate_socket(State),
    State;
process_data(Els, #state{c2s_pid = undefined} = State) when is_list(Els) ->
    State;
process_data([Element | Rest], #state{c2s_pid = C2SPid} = State)
  when element(1, Element) =:= xmlel;
       element(1, Element) =:= xmlstreamstart;
       element(1, Element) =:= xmlstreamelement;
       element(1, Element) =:= xmlstreamend ->
    %% A failure to deliver one element must not stop the remaining ones.
    try
        gen_fsm_compat:send_event(C2SPid, element_wrapper(Element))
    catch
        _:_ -> ok
    end,
    process_data(Rest, State);
%% Data processing for connectors receiving data as string.
process_data(Data, #state{parser = Parser,
                          shaper_state = ShaperState,
                          stanza_chunk_size = ChunkSize,
                          c2s_pid = C2SPid} = State) ->
    ?LOG_DEBUG(#{what => received_xml_on_stream, packet => Data, c2s_pid => C2SPid}),
    Size = byte_size(Data),
    maybe_run_keep_alive_hook(Size, State),
    {Events, NextParser} = parse_stream_chunk(Parser, Data),
    NextChunkSize = update_stanza_size(Events, ChunkSize, Size),
    {NextShaperState, Pause} = shaper:update(ShaperState, Size),
    lists:foreach(fun(Event) -> gen_fsm_compat:send_event(C2SPid, Event) end, Events),
    maybe_pause(Pause, State),
    State#state{parser = NextParser,
                shaper_state = NextShaperState,
                stanza_chunk_size = NextChunkSize}.

%% Runs the stream parser over one chunk. On success returns the events to
%% send together with the advanced parser; on a parse error returns a single
%% xmlstreamerror event and keeps the previous parser.
parse_stream_chunk(Parser, Data) ->
    case exml_stream:parse(Parser, Data) of
        {ok, NextParser, Elems} ->
            {[wrap_if_xmlel(Elem) || Elem <- Elems], NextParser};
        {error, Reason} ->
            {[{xmlstreamerror, Reason}], Parser}
    end.
356 |
|
|
357 |
24960 |
%% Wraps a parsed #xmlel{} into {xmlstreamelement, _}; every other parser
%% event is passed through unchanged.
wrap_if_xmlel(Parsed) ->
    case Parsed of
        #xmlel{} -> {xmlstreamelement, Parsed};
        _ -> Parsed
    end.
359 |
|
|
360 |
|
%% Tracks how many bytes were received for the data being assembled.
%% When the chunk produced at least one event, the accumulated size is
%% reported as the xml_stanza_size metric and the counter restarts at 0;
%% otherwise the chunk size is added to the counter.
update_stanza_size(C2SEvents, ChunkSize, Size) ->
    Accumulated = ChunkSize + Size,
    case C2SEvents of
        [_ | _] ->
            mongoose_metrics:update(global,
                                    [data, xmpp, received, xml_stanza_size],
                                    Accumulated),
            0;
        _ ->
            Accumulated
    end.
366 |
|
|
367 |
|
%% Decides when the socket may deliver data again after a chunk was handled:
%% never while no controlling process is set, after Pause milliseconds when
%% the shaper asked for a pause (an `activate' timer is started), and
%% immediately otherwise.
maybe_pause(Pause, State) ->
    case State of
        #state{c2s_pid = undefined} ->
            ok;
        _ when Pause > 0 ->
            erlang:start_timer(Pause, self(), activate);
        _ ->
            activate_socket(State)
    end.
373 |
|
|
374 |
|
%% Notifies the controlling process about a keep-alive packet when fewer than
%% three bytes were received and a controlling pid is set.
maybe_run_keep_alive_hook(Size, State) ->
    case State of
        #state{c2s_pid = C2SPid} when is_pid(C2SPid), Size < 3 ->
            %% Such a short chunk can still be a fragment of real XML, but
            %% a spurious keep-alive notification does no harm.
            gen_fsm_compat:send_all_state_event(C2SPid, keep_alive_packet);
        _ ->
            ok
    end.
381 |
|
|
382 |
|
%% @doc Elements produced by the XML parser reach the C2S process wrapped in
%% an xmlstreamelement tuple. An xmlel tuple received directly (from a socket
%% module speaking Erlang XML) gets the same wrapping here; any other tuple is
%% returned unchanged. The rule is the same one wrap_if_xmlel/1 implements.
-spec element_wrapper(exml:element() | tuple()) -> tuple().
element_wrapper(Element) ->
    wrap_if_xmlel(Element).
391 |
|
|
392 |
|
%% Returns the state with a parser ready for a new stream and the chunk
%% counter set to 0. The first call creates the parser, limiting child size
%% to max_stanza_size (`infinity' is passed to the parser as 0); later calls
%% reset the existing parser.
reset_parser(#state{parser = undefined} = State) ->
    ChildLimit = case State#state.max_stanza_size of
                     infinity -> 0;
                     Limit -> Limit
                 end,
    ParserOpts = [{start_tag, <<"stream:stream">>},
                  {max_child_size, ChildLimit}],
    {ok, Created} = exml_stream:new_parser(ParserOpts),
    State#state{parser = Created, stanza_chunk_size = 0};
reset_parser(#state{parser = Existing} = State) ->
    {ok, Reset} = exml_stream:reset_parser(Existing),
    State#state{parser = Reset, stanza_chunk_size = 0}.
403 |
|
|
404 |
|
%% Releases the XML parser, if one was ever created.
free_parser(Parser) ->
    case Parser of
        undefined -> ok;
        _ -> exml_stream:free_parser(Parser)
    end.
408 |
|
|
409 |
|
%% Calls the receiver synchronously, turning the two "receiver is gone" exits
%% into error tuples instead of crashing the caller. Other exits (for example
%% a call timeout) are not caught.
gen_server_call_or_noproc(Pid, Message) ->
    try gen_server:call(Pid, Message) of
        Reply ->
            Reply
    catch
        exit:{normal, Extra} ->
            %% The receiver exited with normal status after the call was
            %% sent but before it was processed.
            {error, {died, Extra}};
        exit:{noproc, Extra} ->
            {error, {noproc, Extra}}
    end.
420 |
|
|
421 |
:-( |
%% Name of the module whose send_event/2 is used to report stream errors
%% raised while inflating compressed data.
-spec gen_fsm() -> p1_fsm.
gen_fsm() ->
    p1_fsm.
422 |
|
|
423 |
|
%% Chooses the gen_server timeout value: `hibernate' when the calling
%% process has an empty mailbox, `infinity' when messages are already waiting.
-spec hibernate() -> hibernate | infinity.
hibernate() ->
    MailboxIsEmpty =
        process_info(self(), message_queue_len) =:= {message_queue_len, 0},
    case MailboxIsEmpty of
        true -> hibernate;
        false -> infinity
    end.
429 |
|
|
430 |
|
%% Timeout element for gen_server callback results. With hibernate_after = 0
%% the decision is delegated to hibernate/0; any other value is returned
%% as is.
-spec maybe_hibernate(state()) -> hibernate | infinity | pos_integer().
maybe_hibernate(#state{hibernate_after = HibernateAfter}) ->
    case HibernateAfter of
        0 -> hibernate();
        _ -> HibernateAfter
    end.
433 |
|
|
434 |
|
%% Closes the socket while trying to make sure the peer has received
%% everything that was sent.
%%
%% Modelled on tls_connection:workaround_transport_delivery_problems/2:
%% https://github.com/erlang/otp/blob/OTP_17.0-rc2/lib/ssl/src/tls_connection.erl#L959
%%
%% Background on why this is needed can be found in
%% http://erlang.org/doc/man/gen_tcp.html#close-1
shutdown_socket_and_wait_for_peer_to_close(Socket, gen_tcp) ->
    %% Usual trick for getting already-sent data delivered to the peer
    %% application before the port goes away, so that the peer sees the
    %% proper stream end rather than a bare transport close: stop active
    %% delivery, close only the write side, then wait on a read.
    inet:setopts(Socket, [{active, false}]),
    gen_tcp:shutdown(Socket, write),
    %% Returns once the other side has closed, or after 30 s at the latest:
    %% we do not want to hang when the network misbehaves, but we do want to
    %% maximise the chance that the peer receives all the data.
    gen_tcp:recv(Socket, 0, 30000);
shutdown_socket_and_wait_for_peer_to_close(Socket, SockMod) ->
    %% Other socket modules are simply asked to close.
    SockMod:close(Socket).