./ct_report/coverage/ejabberd_s2s_out.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : ejabberd_s2s_out.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Manage outgoing server-to-server connections
5 %%% Created : 6 Dec 2002 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(ejabberd_s2s_out).
27 -author('alexey@process-one.net').
28 -behaviour(p1_fsm).
29
30 % TODO this should be in a separate module after feature/cets is merged
31 -xep([{xep, 220}, {version, "1.1.1"}]).
32
33 %% External exports
34 -export([start/2,
35 start_link/2,
36 start_connection/1,
37 stop_connection/1,
38 terminate_if_waiting_delay/1]).
39
40 %% p1_fsm callbacks (same as gen_fsm)
41 -export([init/1,
42 open_socket/2,
43 wait_for_stream/2,
44 wait_for_validation/2,
45 wait_for_features/2,
46 wait_for_auth_result/2,
47 wait_for_starttls_proceed/2,
48 reopen_socket/2,
49 wait_before_retry/2,
50 stream_established/2,
51 handle_event/3,
52 handle_sync_event/4,
53 handle_info/3,
54 terminate/3,
55 print_state/1,
56 code_change/4]).
57
58 -export_type([connection_info/0]).
59
60 -ignore_xref([open_socket/2, print_state/1,
61 reopen_socket/2, start_link/2, stream_established/2,
62 wait_before_retry/2, wait_for_auth_result/2,
63 wait_for_features/2, wait_for_starttls_proceed/2, wait_for_stream/2,
64 wait_for_stream/2, wait_for_validation/2]).
65
66 -type verify_requester() :: false | {S2SIn :: pid(), Key :: ejabberd_s2s:s2s_dialback_key(), SID :: ejabberd_s2s:stream_id()}.
67
68 -include("mongoose.hrl").
69 -include("jlib.hrl").
70
71 -record(state, {socket,
72 streamid :: ejabberd_s2s:stream_id() | undefined,
73 remote_streamid = <<>> :: ejabberd_s2s:stream_id(),
74 use_v10 :: boolean(),
75 tls = false :: boolean(),
76 tls_required = false :: boolean(),
77 tls_enabled = false :: boolean(),
78 tls_options :: mongoose_tls:options(),
79 authenticated = false :: boolean(),
80 dialback_enabled = true :: boolean(),
81 try_auth = true :: boolean(),
82 from_to :: ejabberd_s2s:fromto(),
83 myname :: jid:lserver(),
84 server :: jid:lserver(),
85 queue :: element_queue(),
86 host_type :: mongooseim:host_type(),
87 delay_to_retry :: non_neg_integer() | undefined,
88 is_registered = false :: boolean(),
89 verify = false :: verify_requester(),
90 timer :: reference()
91 }).
92 -type state() :: #state{}.
93
94 -type connection_info() ::
95 #{pid => pid(),
96 direction => out,
97 statename => statename(),
98 addr => unknown | inet:ip_address(),
99 port => unknown | inet:port_number(),
100 streamid => ejabberd_s2s:stream_id() | undefined,
101 use_v10 => boolean(),
102 tls => boolean(),
103 tls_required => boolean(),
104 tls_enabled => boolean(),
105 tls_options => mongoose_tls:options(),
106 authenticated => boolean(),
107 dialback_enabled => boolean(),
108 try_auth => boolean(),
109 myname => jid:lserver(),
110 server => jid:lserver(),
111 delay_to_retry => undefined | non_neg_integer(),
112 verify => verify_requester()}.
113
114 -type element_queue() :: queue:queue(#xmlel{}).
115 -type statename() :: open_socket
116 | wait_for_stream
117 | wait_for_features
118 | wait_for_auth_result
119 | wait_for_starttls_proceed
120 | wait_for_validation
121 | wait_before_retry.
122
123 %% FSM handler return value
124 -type fsm_return() :: {stop, Reason :: normal, state()}
125 | {next_state, statename(), state()}
126 | {next_state, statename(), state(), Timeout :: integer()}.
127
128 -type dns_name() :: string().
129
130 -type addr() :: #{address := inet:ip_address() | dns_name(),
131 port := inet:port_number(),
132 type := inet | inet6}.
133
134 %%-define(DBGFSM, true).
135
136 -ifdef(DBGFSM).
137 -define(FSMOPTS, [{debug, [trace]}]).
138 -else.
139 -define(FSMOPTS, []).
140 -endif.
141
142 -define(FSMTIMEOUT, 30000).
143
144 %% We do not block on send anymore.
145 -define(TCP_SEND_TIMEOUT, 15000).
146
147 -define(STREAM_HEADER(From, To, Other),
148 <<"<?xml version='1.0'?>",
149 "<stream:stream "
150 "xmlns:stream='http://etherx.jabber.org/streams' "
151 "xmlns='jabber:server' "
152 "xmlns:db='jabber:server:dialback' "
153 "from='", (From)/binary, "' ",
154 "to='", (To)/binary, "' ",
155 (Other)/binary, ">">>
156 ).
157
158 -define(SOCKET_DEFAULT_RESULT, {error, badarg}).
159
160
161 -define(CLOSE_GENERIC(StateName, Reason, StateData),
162 ?LOG_INFO(#{what => s2s_out_closing, text => <<"Closing s2s connection">>,
163 state_name => StateName, reason => Reason,
164 myname => StateData#state.myname, server => StateData#state.server}),
165 {stop, normal, StateData}).
166
167 -define(CLOSE_GENERIC(StateName, Reason, El, StateData),
168 ?LOG_INFO(#{what => s2s_out_closing, text => <<"Closing s2s connection on stanza">>,
169 state_name => StateName, reason => Reason, exml_packet => El,
170 myname => StateData#state.myname, server => StateData#state.server}),
171 {stop, normal, StateData}).
172
173 %%%----------------------------------------------------------------------
174 %%% API
175 %%%----------------------------------------------------------------------
176 -spec start(ejabberd_s2s:fromto(), _) -> {error, _} | {ok, undefined | pid()} | {ok, undefined | pid(), _}.
177 start(FromTo, Type) ->
178 60 supervisor:start_child(ejabberd_s2s_out_sup, [FromTo, Type]).
179
180 -spec start_link(ejabberd_s2s:fromto(), _) -> ignore | {error, _} | {ok, pid()}.
181 start_link(FromTo, Type) ->
182 60 p1_fsm:start_link(ejabberd_s2s_out, [FromTo, Type],
183 fsm_limit_opts() ++ ?FSMOPTS).
184
185 start_connection(Pid) ->
186 53 p1_fsm:send_event(Pid, init).
187
188 stop_connection(Pid) ->
189 65 p1_fsm:send_event(Pid, closed).
190
191 %%%----------------------------------------------------------------------
192 %%% Callback functions from p1_fsm
193 %%%----------------------------------------------------------------------
194
195 %%----------------------------------------------------------------------
196 %% Func: init/1
197 %% Returns: {ok, StateName, StateData} |
198 %% {ok, StateName, StateData, Timeout} |
199 %% ignore |
200 %% {stop, StopReason}
201 %%----------------------------------------------------------------------
202 -spec init(list()) -> {ok, open_socket, state()}.
203 init([{From, Server} = FromTo, Type]) ->
204 60 process_flag(trap_exit, true),
205 60 ?LOG_DEBUG(#{what => s2s_out_started,
206 text => <<"New outgoing s2s connection">>,
207 60 from => From, server => Server, type => Type}),
208 60 {ok, HostType} = mongoose_domain_api:get_host_type(From),
209 60 {TLS, TLSRequired} = case mongoose_config:get_opt([{s2s, HostType}, use_starttls]) of
210 UseTls when (UseTls==false) ->
211 29 {false, false};
212 UseTls when (UseTls==true) or (UseTls==optional) ->
213 20 {true, false};
214 UseTls when (UseTls==required) or (UseTls==required_trusted) ->
215 11 {true, true}
216 end,
217 60 UseV10 = TLS,
218 60 {IsRegistered, Verify} = case Type of
219 new ->
220 31 {true, false};
221 {verify, Pid, Key, SID} ->
222 29 start_connection(self()),
223 29 {false, {Pid, Key, SID}}
224 end,
225 60 Timer = erlang:start_timer(mongoose_s2s_lib:timeout(), self(), []),
226 60 {ok, open_socket, #state{use_v10 = UseV10,
227 tls = TLS,
228 tls_required = TLSRequired,
229 tls_options = tls_options(HostType),
230 queue = queue:new(),
231 from_to = FromTo,
232 myname = From,
233 host_type = HostType,
234 server = Server,
235 is_registered = IsRegistered,
236 verify = Verify,
237 timer = Timer}}.
238
239 %%----------------------------------------------------------------------
240 %% Func: StateName/2
241 %% Returns: {next_state, NextStateName, NextStateData} |
242 %% {next_state, NextStateName, NextStateData, Timeout} |
243 %% {stop, Reason, NewStateData}
244 %%----------------------------------------------------------------------
245 -spec open_socket(_, state()) -> fsm_return().
246 open_socket(init, StateData = #state{host_type = HostType}) ->
247 53 log_s2s_out(StateData#state.is_registered,
248 StateData#state.myname,
249 StateData#state.server,
250 StateData#state.tls),
251 53 ?LOG_DEBUG(#{what => s2s_open_socket,
252 myname => StateData#state.myname,
253 server => StateData#state.server,
254 is_registered => StateData#state.is_registered,
255 53 verify => StateData#state.verify}),
256 53 AddrList = get_addr_list(HostType, StateData#state.server),
257 53 case lists:foldl(fun(_, {ok, Socket}) ->
258
:-(
{ok, Socket};
259 (#{address := Addr, port := Port, type := Type}, _) ->
260 45 open_socket2(HostType, Type, Addr, Port)
261 end, ?SOCKET_DEFAULT_RESULT, AddrList) of
262 {ok, Socket} ->
263 45 Version = case StateData#state.use_v10 of
264 24 true -> <<" version='1.0'">>;
265 21 false -> <<"">>
266 end,
267 45 NewStateData = StateData#state{socket = Socket,
268 tls_enabled = false,
269 streamid = new_id()},
270 45 send_text(NewStateData,
271 ?STREAM_HEADER(StateData#state.myname, StateData#state.server, Version)),
272 45 {next_state, wait_for_stream, NewStateData, ?FSMTIMEOUT};
273 {error, Reason} ->
274 8 ?LOG_WARNING(#{what => s2s_out_failed, reason => Reason,
275 text => <<"Outgoing s2s connection failed (remote server not found)">>,
276
:-(
myname => StateData#state.myname, server => StateData#state.server}),
277 8 wait_before_reconnect(StateData)
278 end;
279 open_socket(closed, StateData) ->
280 7 ?CLOSE_GENERIC(open_socket, closed, StateData);
281 open_socket(timeout, StateData) ->
282
:-(
?CLOSE_GENERIC(open_socket, timeout, StateData);
283 open_socket(_, StateData) ->
284
:-(
{next_state, open_socket, StateData}.
285
286 -spec open_socket2(mongooseim:host_type(), inet | inet6, inet:ip_address(), inet:port_number()) ->
287 {error, _} | {ok, _}.
288 open_socket2(HostType, Type, Addr, Port) ->
289 45 ?LOG_DEBUG(#{what => s2s_out_connecting,
290 45 address => Addr, port => Port}),
291 45 Timeout = outgoing_s2s_timeout(HostType),
292 45 SockOpts = [binary,
293 {packet, 0},
294 {send_timeout, ?TCP_SEND_TIMEOUT},
295 {send_timeout_close, true},
296 {active, false},
297 Type],
298
299 45 case (catch mongoose_transport:connect(s2s, Addr, Port, SockOpts, Timeout)) of
300 45 {ok, _Socket} = R -> R;
301 {error, Reason} = R ->
302
:-(
?LOG_DEBUG(#{what => s2s_out_failed,
303
:-(
address => Addr, port => Port, reason => Reason}),
304
:-(
R;
305 {'EXIT', Reason} ->
306
:-(
?LOG_DEBUG(#{what => s2s_out_failed,
307 text => <<"Failed to open s2s socket because of crashing">>,
308
:-(
address => Addr, port => Port, reason => Reason}),
309
:-(
{error, Reason}
310 end.
311
312 %%----------------------------------------------------------------------
313
314 -spec wait_for_stream(ejabberd:xml_stream_item(), state()) -> fsm_return().
315 wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData0) ->
316 63 RemoteStreamID = xml:get_attr_s(<<"id">>, Attrs),
317 63 StateData = StateData0#state{remote_streamid = RemoteStreamID},
318 63 case {xml:get_attr_s(<<"xmlns">>, Attrs),
319 xml:get_attr_s(<<"xmlns:db">>, Attrs),
320 xml:get_attr_s(<<"version">>, Attrs) == <<"1.0">>} of
321 {<<"jabber:server">>, <<"jabber:server:dialback">>, false} ->
322 29 send_dialback_request(StateData);
323 {<<"jabber:server">>, <<"jabber:server:dialback">>, true} when
324 StateData#state.use_v10 ->
325 34 {next_state, wait_for_features, StateData, ?FSMTIMEOUT};
326 %% Clause added to handle Tigase's workaround for an old ejabberd bug:
327 {<<"jabber:server">>, <<"jabber:server:dialback">>, true} when
328 not StateData#state.use_v10 ->
329
:-(
send_dialback_request(StateData);
330 {<<"jabber:server">>, <<"">>, true} when StateData#state.use_v10 ->
331
:-(
{next_state, wait_for_features, StateData#state{dialback_enabled = false}, ?FSMTIMEOUT};
332 {NSProvided, DB, _} ->
333
:-(
send_element(StateData, mongoose_xmpp_errors:invalid_namespace()),
334
:-(
?LOG_INFO(#{what => s2s_out_closing,
335 text => <<"Closing s2s connection: (invalid namespace)">>,
336 namespace_provided => NSProvided,
337 namespace_expected => <<"jabber:server">>,
338 xmlns_dialback_provided => DB,
339 all_attributes => Attrs,
340
:-(
myname => StateData#state.myname, server => StateData#state.server}),
341
:-(
{stop, normal, StateData}
342 end;
343 wait_for_stream({xmlstreamerror, _}, StateData) ->
344
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
345
:-(
send_text(StateData, ?STREAM_TRAILER),
346
:-(
?CLOSE_GENERIC(wait_for_stream, xmlstreamerror, StateData);
347 wait_for_stream({xmlstreamend, _Name}, StateData) ->
348
:-(
?CLOSE_GENERIC(wait_for_stream, xmlstreamend, StateData);
349 wait_for_stream(timeout, StateData) ->
350
:-(
?CLOSE_GENERIC(wait_for_stream, timeout, StateData);
351 wait_for_stream(closed, StateData) ->
352
:-(
?CLOSE_GENERIC(wait_for_stream, closed, StateData).
353
354
355 -spec wait_for_validation(ejabberd:xml_stream_item(), state()) -> fsm_return().
356 wait_for_validation({xmlstreamelement, El}, StateData = #state{from_to = FromTo}) ->
357 42 case mongoose_s2s_dialback:parse_validity(El) of
358 {step_3, FromTo, StreamID, IsValid} ->
359 15 ?LOG_DEBUG(#{what => s2s_receive_verify,
360 15 from_to => FromTo, stream_id => StreamID, is_valid => IsValid}),
361 15 case StateData#state.verify of
362 false ->
363 %% This is unexpected condition.
364 %% We've received step_3 reply, but there is no matching outgoing connection.
365 %% We could close the connection here.
366
:-(
next_state(wait_for_validation, StateData);
367 {Pid, _Key, _SID} ->
368 15 ejabberd_s2s_in:send_validity_from_s2s_out(Pid, IsValid, FromTo),
369 15 next_state(wait_for_validation, StateData)
370 end;
371 {step_4, FromTo, StreamID, IsValid} ->
372 26 ?LOG_DEBUG(#{what => s2s_receive_result,
373 26 from_to => FromTo, stream_id => StreamID, is_valid => IsValid}),
374 26 #state{tls_enabled = Enabled, tls_required = Required} = StateData,
375 26 case IsValid of
376 true when (Enabled==true) or (Required==false) ->
377 %% Initiating server receives valid verification result from receiving server (Step 4)
378 24 send_queue(StateData, StateData#state.queue),
379 24 ?LOG_INFO(#{what => s2s_out_connected,
380 text => <<"New outgoing s2s connection established">>,
381 tls_enabled => StateData#state.tls_enabled,
382 24 myname => StateData#state.myname, server => StateData#state.server}),
383 24 {next_state, stream_established,
384 StateData#state{queue = queue:new()}};
385 true when (Enabled==false) and (Required==true) ->
386 %% TODO: bounce packets
387 2 ?CLOSE_GENERIC(wait_for_validation, tls_required_but_unavailable, El, StateData);
388 _ ->
389 %% TODO: bounce packets
390
:-(
?CLOSE_GENERIC(wait_for_validation, invalid_dialback_key, El, StateData)
391 end;
392 false ->
393 1 {next_state, wait_for_validation, StateData, ?FSMTIMEOUT*3}
394 end;
395 wait_for_validation({xmlstreamend, _Name}, StateData) ->
396 1 ?CLOSE_GENERIC(wait_for_validation, xmlstreamend, StateData);
397 wait_for_validation({xmlstreamerror, _}, StateData) ->
398
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
399
:-(
send_text(StateData, ?STREAM_TRAILER),
400
:-(
?CLOSE_GENERIC(wait_for_validation, xmlstreamerror, StateData);
401 wait_for_validation(timeout, #state{verify = {VPid, VKey, SID}} = StateData)
402 when is_pid(VPid) and is_binary(VKey) and is_binary(SID) ->
403 %% This is an auxiliary s2s connection for dialback.
404 %% This timeout is normal and doesn't represent a problem.
405
:-(
?LOG_INFO(#{what => s2s_out_validation_timeout,
406 text => <<"Timeout in verify outgoing s2s connection. Stopping">>,
407
:-(
myname => StateData#state.myname, server => StateData#state.server}),
408
:-(
{stop, normal, StateData};
409 wait_for_validation(timeout, StateData) ->
410
:-(
?LOG_INFO(#{what => s2s_out_connect_timeout,
411 text => <<"Connection timeout in outgoing s2s connection. Stopping">>,
412
:-(
myname => StateData#state.myname, server => StateData#state.server}),
413
:-(
{stop, normal, StateData};
414 wait_for_validation(closed, StateData) ->
415 17 ?LOG_INFO(#{what => s2s_out_validation_closed,
416 text => <<"Connection closed when waiting for validation in outgoing s2s connection. Stopping">>,
417 17 myname => StateData#state.myname, server => StateData#state.server}),
418 17 {stop, normal, StateData}.
419
420
421 -spec wait_for_features(ejabberd:xml_stream_item(), state()) -> fsm_return().
422 wait_for_features({xmlstreamelement, El}, StateData) ->
423 34 case El of
424 #xmlel{name = <<"stream:features">>, children = Els} ->
425 34 {SASLEXT, StartTLS, StartTLSRequired} =
426 lists:foldl(
427 fun(#xmlel{name = <<"mechanisms">>, attrs = Attrs1,
428 children = Els1} = _El1, Acc) ->
429 2 Attr = xml:get_attr_s(<<"xmlns">>, Attrs1),
430 2 get_acc_with_new_sext(Attr, Els1, Acc);
431 (#xmlel{name = <<"starttls">>, attrs = Attrs1} = El1, Acc) ->
432 17 Attr = xml:get_attr_s(<<"xmlns">>, Attrs1),
433 17 get_acc_with_new_tls(Attr, El1, Acc);
434 (_, Acc) ->
435
:-(
Acc
436 end, {false, false, false}, Els),
437 34 handle_parsed_features({SASLEXT, StartTLS, StartTLSRequired, StateData});
438 _ ->
439
:-(
send_element(StateData, mongoose_xmpp_errors:bad_format()),
440
:-(
send_text(StateData, ?STREAM_TRAILER),
441
:-(
?CLOSE_GENERIC(wait_for_features, bad_format, El, StateData)
442 end;
443 wait_for_features({xmlstreamend, _Name}, StateData) ->
444
:-(
?CLOSE_GENERIC(wait_for_features, xmlstreamend, StateData);
445 wait_for_features({xmlstreamerror, _}, StateData) ->
446
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
447
:-(
send_text(StateData, ?STREAM_TRAILER),
448
:-(
?CLOSE_GENERIC(wait_for_features, xmlstreamerror, StateData);
449 wait_for_features(timeout, StateData) ->
450
:-(
?CLOSE_GENERIC(wait_for_features, timeout, StateData);
451 wait_for_features(closed, StateData) ->
452
:-(
?CLOSE_GENERIC(wait_for_features, closed, StateData).
453
454
455 -spec wait_for_auth_result(ejabberd:xml_stream_item(), state()) -> fsm_return().
456 wait_for_auth_result({xmlstreamelement, El}, StateData) ->
457 1 case El of
458 #xmlel{name = <<"success">>, attrs = Attrs} ->
459 1 case xml:get_attr_s(<<"xmlns">>, Attrs) of
460 ?NS_SASL ->
461 1 ?LOG_DEBUG(#{what => s2s_auth_success,
462 myname => StateData#state.myname,
463 1 server => StateData#state.server}),
464 1 send_text(StateData,
465 ?STREAM_HEADER(StateData#state.myname, StateData#state.server,
466 <<" version='1.0'">>)),
467 1 {next_state, wait_for_stream,
468 StateData#state{streamid = new_id(),
469 authenticated = true
470 }, ?FSMTIMEOUT};
471 _ ->
472
:-(
send_element(StateData, mongoose_xmpp_errors:bad_format()),
473
:-(
send_text(StateData, ?STREAM_TRAILER),
474
:-(
?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
475 end;
476 #xmlel{name = <<"failure">>, attrs = Attrs} ->
477
:-(
case xml:get_attr_s(<<"xmlns">>, Attrs) of
478 ?NS_SASL ->
479
:-(
?LOG_WARNING(#{what => s2s_auth_failure,
480 text => <<"Received failure result in ejabberd_s2s_out. Restarting">>,
481 myname => StateData#state.myname,
482
:-(
server => StateData#state.server}),
483
:-(
mongoose_transport:close(StateData#state.socket),
484
:-(
{next_state, reopen_socket,
485 StateData#state{socket = undefined}, ?FSMTIMEOUT};
486 _ ->
487
:-(
send_element(StateData, mongoose_xmpp_errors:bad_format()),
488
:-(
send_text(StateData, ?STREAM_TRAILER),
489
:-(
?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
490 end;
491 _ ->
492
:-(
send_element(StateData, mongoose_xmpp_errors:bad_format()),
493
:-(
send_text(StateData, ?STREAM_TRAILER),
494
:-(
?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
495 end;
496 wait_for_auth_result({xmlstreamend, _Name}, StateData) ->
497
:-(
?CLOSE_GENERIC(wait_for_auth_result, xmlstreamend, StateData);
498 wait_for_auth_result({xmlstreamerror, _}, StateData) ->
499
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
500
:-(
send_text(StateData, ?STREAM_TRAILER),
501
:-(
?CLOSE_GENERIC(wait_for_auth_result, xmlstreamerror, StateData);
502 wait_for_auth_result(timeout, StateData) ->
503
:-(
?CLOSE_GENERIC(wait_for_auth_result, timeout, StateData);
504 wait_for_auth_result(closed, StateData) ->
505
:-(
?CLOSE_GENERIC(wait_for_auth_result, closed, StateData).
506
507
508 -spec wait_for_starttls_proceed(ejabberd:xml_stream_item(), state()) -> fsm_return().
509 wait_for_starttls_proceed({xmlstreamelement, El}, StateData) ->
510 17 case El of
511 #xmlel{name = <<"proceed">>, attrs = Attrs} ->
512 17 case xml:get_attr_s(<<"xmlns">>, Attrs) of
513 ?NS_TLS ->
514 17 ?LOG_DEBUG(#{what => s2s_starttls,
515 myname => StateData#state.myname,
516 17 server => StateData#state.server}),
517 17 TLSSocket = mongoose_transport:connect_tls(StateData#state.socket,
518 StateData#state.tls_options),
519 17 NewStateData = StateData#state{socket = TLSSocket,
520 streamid = new_id(),
521 tls_enabled = true},
522 17 send_text(NewStateData,
523 ?STREAM_HEADER(StateData#state.myname, StateData#state.server,
524 <<" version='1.0'">>)),
525 17 {next_state, wait_for_stream, NewStateData, ?FSMTIMEOUT};
526 _ ->
527
:-(
send_element(StateData, mongoose_xmpp_errors:bad_format()),
528
:-(
send_text(StateData, ?STREAM_TRAILER),
529
:-(
?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
530 end;
531 _ ->
532
:-(
?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
533 end;
534 wait_for_starttls_proceed({xmlstreamend, _Name}, StateData) ->
535
:-(
?CLOSE_GENERIC(wait_for_starttls_proceed, xmlstreamend, StateData);
536 wait_for_starttls_proceed({xmlstreamerror, _}, StateData) ->
537
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
538
:-(
send_text(StateData, ?STREAM_TRAILER),
539
:-(
?CLOSE_GENERIC(wait_for_starttls_proceed, xmlstreamerror, StateData);
540 wait_for_starttls_proceed(timeout, StateData) ->
541
:-(
?CLOSE_GENERIC(wait_for_starttls_proceed, timeout, StateData);
542 wait_for_starttls_proceed(closed, StateData) ->
543
:-(
?CLOSE_GENERIC(wait_for_starttls_proceed, closed, StateData).
544
545
546 -spec reopen_socket(ejabberd:xml_stream_item(), state()) -> fsm_return().
547 reopen_socket({xmlstreamelement, _El}, StateData) ->
548
:-(
{next_state, reopen_socket, StateData, ?FSMTIMEOUT};
549 reopen_socket({xmlstreamend, _Name}, StateData) ->
550
:-(
{next_state, reopen_socket, StateData, ?FSMTIMEOUT};
551 reopen_socket({xmlstreamerror, _}, StateData) ->
552
:-(
{next_state, reopen_socket, StateData, ?FSMTIMEOUT};
553 reopen_socket(timeout, StateData) ->
554
:-(
?CLOSE_GENERIC(reopen_socket, timeout, StateData);
555 reopen_socket(closed, StateData) ->
556
:-(
p1_fsm:send_event(self(), init),
557
:-(
{next_state, open_socket, StateData, ?FSMTIMEOUT}.
558
559
560 %% @doc This state is use to avoid reconnecting to often to bad sockets
561 -spec wait_before_retry(ejabberd:xml_stream_item(), state()) -> fsm_return().
562 wait_before_retry(_Event, StateData) ->
563 16 {next_state, wait_before_retry, StateData, ?FSMTIMEOUT}.
564
565 -spec stream_established(ejabberd:xml_stream_item(), state()) -> fsm_return().
566 stream_established({xmlstreamelement, El}, StateData = #state{from_to = FromTo}) ->
567 12 ?LOG_DEBUG(#{what => s2s_out_stream_established, exml_packet => El,
568 12 myname => StateData#state.myname, server => StateData#state.server}),
569 12 case mongoose_s2s_dialback:parse_validity(El) of
570 {step_3, FromTo, StreamID, IsValid} ->
571 12 ?LOG_DEBUG(#{what => s2s_recv_verify,
572 from_to => FromTo, stream_id => StreamID, is_valid => IsValid,
573 12 myname => StateData#state.myname, server => StateData#state.server}),
574 12 case StateData#state.verify of
575 {VPid, _VKey, _SID} ->
576 12 ejabberd_s2s_in:send_validity_from_s2s_out(VPid, IsValid, FromTo);
577 _ ->
578
:-(
ok
579 end;
580 {step_4, _FromTo, _StreamID, _IsValid} ->
581
:-(
ok;
582 false ->
583
:-(
ok
584 end,
585 12 {next_state, stream_established, StateData};
586 stream_established({xmlstreamend, _Name}, StateData) ->
587
:-(
?CLOSE_GENERIC(stream_established, xmlstreamend, StateData);
588 stream_established({xmlstreamerror, _}, StateData) ->
589
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
590
:-(
send_text(StateData, ?STREAM_TRAILER),
591
:-(
?CLOSE_GENERIC(stream_established, xmlstreamerror, StateData);
592 stream_established(timeout, StateData) ->
593
:-(
?CLOSE_GENERIC(stream_established, timeout, StateData);
594 stream_established(closed, StateData) ->
595 25 ?CLOSE_GENERIC(stream_established, closed, StateData).
596
597
598 %%----------------------------------------------------------------------
599 %% Func: StateName/3
600 %% Returns: {next_state, NextStateName, NextStateData} |
601 %% {next_state, NextStateName, NextStateData, Timeout} |
602 %% {reply, Reply, NextStateName, NextStateData} |
603 %% {reply, Reply, NextStateName, NextStateData, Timeout} |
604 %% {stop, Reason, NewStateData} |
605 %% {stop, Reason, Reply, NewStateData}
606 %%----------------------------------------------------------------------
607 %%state_name(Event, From, StateData) ->
608 %% Reply = ok,
609 %% {reply, Reply, state_name, StateData}.
610
611 %%----------------------------------------------------------------------
612 %% Func: handle_event/3
613 %% Returns: {next_state, NextStateName, NextStateData} |
614 %% {next_state, NextStateName, NextStateData, Timeout} |
615 %% {stop, Reason, NewStateData}
616 %%----------------------------------------------------------------------
617 handle_event(_Event, StateName, StateData) ->
618
:-(
next_state(StateName, StateData).
619
620 handle_sync_event(get_state_info, _From, StateName, StateData) ->
621 2 {reply, handle_get_state_info(StateName, StateData), StateName, StateData};
622 handle_sync_event(_Event, _From, StateName, StateData) ->
623
:-(
{reply, ok, StateName, StateData, get_timeout_interval(StateName)}.
624
625
626 code_change(_OldVsn, StateName, StateData, _Extra) ->
627
:-(
{ok, StateName, StateData}.
628
629 %%----------------------------------------------------------------------
630 %% Func: handle_info/3
631 %% Returns: {next_state, NextStateName, NextStateData} |
632 %% {next_state, NextStateName, NextStateData, Timeout} |
633 %% {stop, Reason, NewStateData}
634 %%----------------------------------------------------------------------
635 handle_info({send_element, Acc, El}, StateName, StateData) ->
636 97 case StateName of
637 stream_established ->
638 62 cancel_timer(StateData#state.timer),
639 62 Timer = erlang:start_timer(mongoose_s2s_lib:timeout(), self(), []),
640 62 send_element(StateData, El),
641 62 {next_state, StateName, StateData#state{timer = Timer}};
642 %% In this state we bounce all message: We are waiting before
643 %% trying to reconnect
644 wait_before_retry ->
645 12 bounce_element(Acc, El, mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"From s2s">>)),
646 12 {next_state, StateName, StateData};
647 _ ->
648 23 Q = queue:in({Acc, El}, StateData#state.queue),
649 23 next_state(StateName, StateData#state{queue = Q})
650 end;
651 handle_info({timeout, Timer, _}, wait_before_retry,
652 #state{timer = Timer} = StateData) ->
653 7 ?LOG_INFO(#{what => s2s_reconnect_delay_expired,
654 text => <<"Reconnect delay expired: Will now retry to connect when needed.">>,
655 myname => StateData#state.myname,
656 7 server => StateData#state.server}),
657 7 {stop, normal, StateData};
658 handle_info({timeout, Timer, _}, StateName,
659 #state{timer = Timer} = StateData) ->
660
:-(
?CLOSE_GENERIC(StateName, s2s_out_timeout, StateData);
661 handle_info(terminate_if_waiting_before_retry, wait_before_retry, StateData) ->
662
:-(
?CLOSE_GENERIC(wait_before_retry, terminate_if_waiting_before_retry, StateData);
663 handle_info(terminate_if_waiting_before_retry, StateName, StateData) ->
664 14 next_state(StateName, StateData);
665 handle_info(_, StateName, StateData) ->
666
:-(
next_state(StateName, StateData).
667
668 %%----------------------------------------------------------------------
669 %% Func: terminate/3
670 %% Purpose: Shutdown the fsm
671 %% Returns: any
672 %%----------------------------------------------------------------------
673 terminate(Reason, StateName, StateData) ->
674 59 ?LOG_DEBUG(#{what => s2s_out_closed, text => <<"ejabberd_s2s_out terminated">>,
675 reason => Reason, state_name => StateName,
676 59 myname => StateData#state.myname, server => StateData#state.server}),
677 59 case StateData#state.is_registered of
678 false ->
679 15 ok;
680 true ->
681 44 ejabberd_s2s:remove_connection(
682 {StateData#state.myname, StateData#state.server}, self())
683 end,
684 59 E = mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"Bounced by s2s">>),
685 %% bounce queue manage by process and Erlang message queue
686 59 bounce_queue(StateData#state.queue, E),
687 59 case queue:is_empty(StateData#state.queue) of
688 true ->
689 56 ok;
690 false ->
691 3 ?LOG_WARNING(#{what => s2s_terminate_non_empty,
692 state_name => StateName, reason => Reason,
693 queue => lists:sublist(queue:to_list(StateData#state.queue), 10),
694
:-(
authenticated => StateData#state.authenticated})
695 end,
696 59 bounce_messages(E),
697 59 case StateData#state.socket of
698 undefined ->
699 14 ok;
700 _Socket ->
701 45 mongoose_transport:close(StateData#state.socket)
702 end,
703 59 ok.
704
705 %%----------------------------------------------------------------------
706 %% Func: print_state/1
707 %% Purpose: Prepare the state to be printed on error log
708 %% Returns: State to print
709 %%----------------------------------------------------------------------
710 print_state(State) ->
711
:-(
State.
712
713 %%%----------------------------------------------------------------------
714 %%% Internal functions
715 %%%----------------------------------------------------------------------
716
717 -spec send_text(state(), binary()) -> ok.
718 send_text(StateData, Text) ->
719 63 mongoose_transport:send_text(StateData#state.socket, Text).
720
721
722 -spec send_element(state(), exml:element()|mongoose_acc:t()) -> ok.
723 send_element(StateData, #xmlel{} = El) ->
724 138 mongoose_transport:send_element(StateData#state.socket, El).
725
726 -spec send_element(state(), mongoose_acc:t(), exml:element()) -> mongoose_acc:t().
727 send_element(StateData, Acc, El) ->
728 20 mongoose_transport:send_element(StateData#state.socket, El),
729 20 Acc.
730
731
732 -spec send_queue(state(), Q :: element_queue()) -> ok.
733 send_queue(StateData, Q) ->
734 45 case queue:out(Q) of
735 {{value, {Acc, El}}, Q1} ->
736 20 send_element(StateData, Acc, El),
737 20 send_queue(StateData, Q1);
738 {empty, _Q1} ->
739 25 ok
740 end.
741
742
743 %% @doc Bounce a single message (xmlel)
744 -spec bounce_element(Acc :: mongoose_acc:t(), El :: exml:element(), Error :: exml:element()) -> ok.
745 bounce_element(Acc, El, Error) ->
746 27 case mongoose_acc:stanza_type(Acc) of
747
:-(
<<"error">> -> ok;
748
:-(
<<"result">> -> ok;
749 _ ->
750 27 From = mongoose_acc:from_jid(Acc),
751 27 To = mongoose_acc:to_jid(Acc),
752 27 {Acc1, Err} = jlib:make_error_reply(Acc, El, Error),
753 27 ejabberd_router:route(To, From, Acc1, Err)
754 end.
755
756
757 -spec bounce_queue(Q :: element_queue(), Error :: exml:element()) -> ok.
758 bounce_queue(Q, Error) ->
759 70 case queue:out(Q) of
760 {{value, {Acc, El}}, Q1} ->
761 3 bounce_element(Acc, El, Error),
762 3 bounce_queue(Q1, Error);
763 {empty, _} ->
764 67 ok
765 end.
766
767
768 -spec new_id() -> binary().
769 new_id() ->
770 63 mongoose_bin:gen_from_crypto().
771
772
773 -spec cancel_timer(reference()) -> ok.
774 cancel_timer(Timer) ->
775 70 erlang:cancel_timer(Timer),
776 70 receive
777 {timeout, Timer, _} ->
778
:-(
ok
779 after 0 ->
780 70 ok
781 end.
782
783
784 -spec bounce_messages(exml:element()) -> ok.
785 bounce_messages(Error) ->
786 79 receive
787 {send_element, Acc, El} ->
788 12 bounce_element(Acc, El, Error),
789 12 bounce_messages(Error)
790 after 0 ->
791 67 ok
792 end.
793
794
795 -spec send_dialback_request(state()) -> fsm_return().
796 send_dialback_request(StateData) ->
797 44 IsRegistered = case StateData#state.is_registered of
798 false ->
799 29 ejabberd_s2s:try_register(StateData#state.from_to);
800 true ->
801 15 true
802 end,
803 44 NewStateData = StateData#state{is_registered = IsRegistered},
804 44 try
805 44 case IsRegistered of
806 false ->
807 %% Still not registered in the s2s table as an outgoing connection
808 15 ok;
809 true ->
810 29 Key1 = ejabberd_s2s:key(
811 StateData#state.host_type,
812 StateData#state.from_to,
813 StateData#state.remote_streamid),
814 %% Initiating server sends dialback key
815 29 send_element(StateData, mongoose_s2s_dialback:step_1(StateData#state.from_to, Key1))
816 end,
817 44 case StateData#state.verify of
818 false ->
819 15 ok;
820 {_Pid, Key2, SID} ->
821 %% Receiving server sends verification request
822 29 send_element(StateData, mongoose_s2s_dialback:step_2(StateData#state.from_to, Key2, SID))
823 end,
824 44 {next_state, wait_for_validation, NewStateData, ?FSMTIMEOUT*6}
825 catch
826 Class:Reason:Stacktrace ->
827
:-(
?LOG_ERROR(#{what => s2s_out_send_dialback_request_failed,
828 class => Class, reason => Reason, stacktrace => Stacktrace,
829
:-(
myname => StateData#state.myname, server => StateData#state.server}),
830
:-(
{stop, normal, NewStateData}
831 end.
832
833 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
834 %% SRV support
835
836 -include_lib("kernel/include/inet.hrl").
837
838 -spec lookup_services(mongooseim:host_type(), jid:lserver()) -> [addr()].
839 lookup_services(HostType, Server) ->
840 10 case mongoose_s2s_lib:domain_utf8_to_ascii(Server) of
841 1 false -> [];
842 9 ASCIIAddr -> do_lookup_services(HostType, ASCIIAddr)
843 end.
844
845 -spec do_lookup_services(mongooseim:host_type(), jid:lserver()) -> [addr()].
846 do_lookup_services(HostType, Server) ->
847 9 Res = srv_lookup(HostType, Server),
848 9 case Res of
849 {error, Reason} ->
850 7 ?LOG_DEBUG(#{what => s2s_srv_lookup_failed,
851 7 reason => Reason, server => Server}),
852 7 [];
853 {AddrList, Type} ->
854 %% Probabilities are not exactly proportional to weights
855 %% for simplicity (higher weights are overvalued)
856 2 case (catch lists:map(fun calc_addr_index/1, AddrList)) of
857 {'EXIT', _Reason} ->
858
:-(
[];
859 IndexedAddrs ->
860 2 Addrs = [#{address => Addr, port => Port, type => Type}
861 2 || {_Index, Addr, Port} <- lists:keysort(1, IndexedAddrs)],
862 2 ?LOG_DEBUG(#{what => s2s_srv_lookup_success,
863 2 addresses => Addrs, server => Server}),
864 2 Addrs
865 end
866 end.
867
868 -spec srv_lookup(mongooseim:host_type(), jid:lserver()) ->
869 {error, atom()} | {list(), inet | inet6}.
870 srv_lookup(HostType, Server) ->
871 9 #{timeout := TimeoutSec, retries := Retries} = mongoose_config:get_opt([{s2s, HostType}, dns]),
872 9 case srv_lookup(Server, timer:seconds(TimeoutSec), Retries) of
873 {error, Reason} ->
874 6 {error, Reason};
875 {ok, #hostent{h_addr_list = AddrList}} ->
876 3 case get_inet_protocol(Server) of
877 {error, Reason} ->
878 1 {error, Reason};
879 Type ->
880 2 {AddrList, Type}
881 end
882 end.
883
884 -spec get_inet_protocol(jid:lserver()) -> {error, atom()} | inet | inet6.
885 get_inet_protocol(Server) ->
886 3 case inet:getaddr(binary_to_list(Server), inet) of
887 {ok, _IPv4Addr} ->
888 2 inet;
889 {error, _} ->
890 1 case inet:getaddr(binary_to_list(Server), inet6) of
891 {ok, _IPv6Addr} ->
892
:-(
inet6;
893 {error, Reason} ->
894 1 {error, Reason}
895 end
896 end.
897
898 %% @doc XXX - this behaviour is suboptimal in the case that the domain
899 %% has a "_xmpp-server._tcp." but not a "_jabber._tcp." record and
900 %% we don't get a DNS reply for the "_xmpp-server._tcp." lookup. In this
901 %% case we'll give up when we get the "_jabber._tcp." nxdomain reply.
902 -spec srv_lookup(jid:server(),
903 Timeout :: non_neg_integer(),
904 Retries :: pos_integer()
905 ) -> {error, atom()} | {ok, inet:hostent()}.
906 srv_lookup(_Server, _Timeout, Retries) when Retries < 1 ->
907
:-(
{error, timeout};
908 srv_lookup(Server, Timeout, Retries) ->
909 9 case inet_res:getbyname("_xmpp-server._tcp." ++ binary_to_list(Server), srv, Timeout) of
910 {error, _Reason} ->
911 6 case inet_res:getbyname("_jabber._tcp." ++ binary_to_list(Server), srv, Timeout) of
912 {error, timeout} ->
913
:-(
?LOG_ERROR(#{what => s2s_dns_lookup_failed,
914 text => <<"The DNS servers timed out on request for IN SRV."
915 " You should check your DNS configuration.">>,
916 nameserver => inet_db:res_option(nameserver),
917
:-(
server => Server}),
918
:-(
srv_lookup(Server, Timeout, Retries - 1);
919 6 R -> R
920 end;
921 3 {ok, _HEnt} = R -> R
922 end.
923
924 -spec lookup_addrs(mongooseim:host_type(), jid:server()) -> [addr()].
925 lookup_addrs(HostType, Server) ->
926 8 Port = outgoing_s2s_port(HostType),
927 8 lists:foldl(fun(Type, []) ->
928 16 [#{address => Addr, port => Port, type => Type}
929 16 || Addr <- lookup_addrs_for_type(Server, Type)];
930 (_Type, Addrs) ->
931
:-(
Addrs
932 end, [], outgoing_s2s_types(HostType)).
933
934 -spec lookup_addrs_for_type(jid:lserver(), inet | inet6) -> [inet:ip_address()].
935 lookup_addrs_for_type(Server, Type) ->
936 16 case inet:gethostbyname(binary_to_list(Server), Type) of
937 {ok, #hostent{h_addr_list = Addrs}} ->
938
:-(
?LOG_DEBUG(#{what => s2s_srv_resolve_success,
939
:-(
type => Type, server => Server, addresses => Addrs}),
940
:-(
Addrs;
941 {error, Reason} ->
942 16 ?LOG_DEBUG(#{what => s2s_srv_resolve_failed,
943 16 type => Type, server => Server, reason => Reason}),
944 16 []
945 end.
946
947
948 -spec outgoing_s2s_port(mongooseim:host_type()) -> inet:port_number().
949 outgoing_s2s_port(HostType) ->
950 51 mongoose_config:get_opt([{s2s, HostType}, outgoing, port]).
951
952
953 -spec outgoing_s2s_types(mongooseim:host_type()) -> [inet | inet6, ...].
954 outgoing_s2s_types(HostType) ->
955 %% DISCUSSION: Why prefer IPv4 first?
956 %%
957 %% IPv4 connectivity will be available for everyone for
958 %% many years to come. So, there's absolutely no benefit
959 %% in preferring IPv6 connections which are flaky at best
960 %% nowadays.
961 %%
962 %% On the other hand content providers hesitate putting up
963 %% AAAA records for their sites due to the mentioned
964 %% quality of current IPv6 connectivity. Making IPv6 the a
965 %% `fallback' may avoid these problems elegantly.
966 8 [ip_version_to_type(V) || V <- mongoose_config:get_opt([{s2s, HostType}, outgoing, ip_versions])].
967
968 8 ip_version_to_type(4) -> inet;
969 8 ip_version_to_type(6) -> inet6.
970
971 -spec outgoing_s2s_timeout(mongooseim:host_type()) -> non_neg_integer() | infinity.
972 outgoing_s2s_timeout(HostType) ->
973 45 mongoose_config:get_opt([{s2s, HostType}, outgoing, connection_timeout], 10000).
974
975 %% @doc Human readable S2S logging: Log only new outgoing connections as INFO
976 %% Do not log dialback
977 29 log_s2s_out(false, _, _, _) -> ok;
978 %% Log new outgoing connections:
979 log_s2s_out(_, Myname, Server, Tls) ->
980 24 ?LOG_INFO(#{what => s2s_out,
981 text => <<"Trying to open s2s connection">>,
982 24 myname => Myname, server => Server, tls => Tls}).
983
984 next_state(StateName, StateData) ->
985 52 {next_state, StateName, StateData,
986 get_timeout_interval(StateName)}.
987
988 %% @doc Calculate timeout depending on which state we are in:
989 %% Can return integer > 0 | infinity
990 -spec get_timeout_interval(statename()) -> infinity | non_neg_integer().
991 get_timeout_interval(StateName) ->
992 52 case StateName of
993 %% Validation implies dialback: Networking can take longer:
994 wait_for_validation ->
995 30 ?FSMTIMEOUT*6;
996 %% When stream is established, we only rely on S2S Timeout timer:
997 stream_established ->
998 1 infinity;
999 _ ->
1000 21 ?FSMTIMEOUT
1001 end.
1002
1003
1004 %% @doc This function is intended to be called at the end of a state
1005 %% function that want to wait for a reconnect delay before stopping.
1006 -spec wait_before_reconnect(state()) -> fsm_return().
1007 wait_before_reconnect(StateData) ->
1008 8 E = mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"From s2s (waiting)">>),
1009 %% bounce queue manage by process and Erlang message queue
1010 8 bounce_queue(StateData#state.queue, E),
1011 8 bounce_messages(E),
1012 8 cancel_timer(StateData#state.timer),
1013 8 Delay = case StateData#state.delay_to_retry of
1014 undefined ->
1015 %% The initial delay is random between 1 and 15 seconds
1016 %% Return a random integer between 1000 and 15000
1017 8 MicroSecs = erlang:system_time(microsecond),
1018 8 (MicroSecs rem 14000) + 1000;
1019 D1 ->
1020 %% Duplicate the delay with each successive failed
1021 %% reconnection attempt, but don't exceed the max
1022
:-(
lists:min([D1 * 2, get_max_retry_delay(StateData#state.host_type)])
1023 end,
1024 8 Timer = erlang:start_timer(Delay, self(), []),
1025 8 {next_state, wait_before_retry, StateData#state{timer=Timer,
1026 delay_to_retry = Delay,
1027 queue = queue:new()}}.
1028
1029
1030 %% @doc Get the maximum allowed delay for retry to reconnect (in milliseconds).
1031 %% The default value is 5 minutes.
1032 %% The option {s2s_max_retry_delay, Seconds} can be used (in seconds).
1033 get_max_retry_delay(HostType) ->
1034
:-(
mongoose_config:get_opt([{s2s, HostType}, max_retry_delay]) * 1000.
1035
1036
1037 %% @doc Terminate s2s_out connections that are in state wait_before_retry
1038 -spec terminate_if_waiting_delay(ejabberd_s2s:fromto()) -> ok.
1039 terminate_if_waiting_delay(FromTo) ->
1040 28 Pids = ejabberd_s2s:get_s2s_out_pids(FromTo),
1041 28 lists:foreach(
1042 fun(Pid) ->
1043 14 Pid ! terminate_if_waiting_before_retry
1044 end,
1045 Pids).
1046
1047
1048 -spec fsm_limit_opts() -> [{max_queue, integer()}].
1049 fsm_limit_opts() ->
1050 60 case mongoose_config:lookup_opt(max_fsm_queue) of
1051 {ok, N} ->
1052 60 [{max_queue, N}];
1053 {error, not_found} ->
1054
:-(
[]
1055 end.
1056
1057 -spec get_addr_list(mongooseim:host_type(), jid:lserver()) -> [addr()].
1058 get_addr_list(HostType, Server) ->
1059 53 lists:foldl(fun(F, []) -> F(HostType, Server);
1060 88 (_, Result) -> Result
1061 end, [], [fun get_predefined_addresses/2,
1062 fun lookup_services/2,
1063 fun lookup_addrs/2]).
1064
1065 %% @doc Get IPs predefined for a given s2s domain in the configuration
1066 -spec get_predefined_addresses(mongooseim:host_type(), jid:lserver()) -> [addr()].
1067 get_predefined_addresses(HostType, Server) ->
1068 53 case mongoose_config:lookup_opt([{s2s, HostType}, address, Server]) of
1069 {ok, #{ip_address := IPAddress} = M} ->
1070 43 {ok, IPTuple} = inet:parse_address(IPAddress),
1071 43 Port = get_predefined_port(HostType, M),
1072 43 [#{address => IPTuple, port => Port, type => addr_type(IPTuple)}];
1073 {error, not_found} ->
1074 10 []
1075 end.
1076
1077
:-(
get_predefined_port(_HostType, #{port := Port}) -> Port;
1078 43 get_predefined_port(HostType, _Addr) -> outgoing_s2s_port(HostType).
1079
1080 43 addr_type(Addr) when tuple_size(Addr) =:= 4 -> inet;
1081
:-(
addr_type(Addr) when tuple_size(Addr) =:= 8 -> inet6.
1082
1083 get_acc_with_new_sext(?NS_SASL, Els1, {_SEXT, STLS, STLSReq}) ->
1084 2 NewSEXT =
1085 lists:any(
1086 fun(#xmlel{name = <<"mechanism">>,
1087 children = Els2}) ->
1088 2 case xml:get_cdata(Els2) of
1089 2 <<"EXTERNAL">> -> true;
1090
:-(
_ -> false
1091 end;
1092
:-(
(_) -> false
1093 end, Els1),
1094 2 {NewSEXT, STLS, STLSReq};
1095 get_acc_with_new_sext(_, _, Acc) ->
1096
:-(
Acc.
1097
1098 get_acc_with_new_tls(?NS_TLS, El1, {SEXT, _STLS, _STLSReq}) ->
1099 17 Req = case xml:get_subtag(El1, <<"required">>) of
1100 9 #xmlel{} -> true;
1101 8 false -> false
1102 end,
1103 17 {SEXT, true, Req};
1104 get_acc_with_new_tls(_, _, Acc) ->
1105
:-(
Acc.
1106
1107 tls_options(HostType) ->
1108 60 Ciphers = mongoose_config:get_opt([{s2s, HostType}, ciphers]),
1109 60 Options = #{verify_mode => peer, ciphers => Ciphers},
1110 60 case mongoose_s2s_lib:lookup_certfile(HostType) of
1111 37 {ok, CertFile} -> Options#{certfile => CertFile};
1112 23 {error, not_found} -> Options
1113 end.
1114
1115 calc_addr_index({Priority, Weight, Port, Host}) ->
1116 2 N = case Weight of
1117
:-(
0 -> 0;
1118 2 _ -> (Weight + 1) * rand:uniform()
1119 end,
1120 2 {Priority * 65536 - N, Host, Port}.
1121
1122 handle_parsed_features({false, false, _, StateData = #state{authenticated = true}}) ->
1123 1 send_queue(StateData, StateData#state.queue),
1124 1 ?LOG_INFO(#{what => s2s_out_connected,
1125 text => <<"New outgoing s2s connection established">>,
1126 1 myname => StateData#state.myname, server => StateData#state.server}),
1127 1 {next_state, stream_established,
1128 StateData#state{queue = queue:new()}};
1129 handle_parsed_features({true, _, _, StateData = #state{try_auth = true, is_registered = true}}) ->
1130 1 send_element(StateData,
1131 #xmlel{name = <<"auth">>,
1132 attrs = [{<<"xmlns">>, ?NS_SASL},
1133 {<<"mechanism">>, <<"EXTERNAL">>}],
1134 children =
1135 [#xmlcdata{content = jlib:encode_base64(
1136 StateData#state.myname)}]}),
1137 1 {next_state, wait_for_auth_result,
1138 StateData#state{try_auth = false}, ?FSMTIMEOUT};
1139 handle_parsed_features({_, true, _, StateData = #state{tls = true, tls_enabled = false}}) ->
1140 17 send_element(StateData,
1141 #xmlel{name = <<"starttls">>,
1142 attrs = [{<<"xmlns">>, ?NS_TLS}]}),
1143 17 {next_state, wait_for_starttls_proceed, StateData,
1144 ?FSMTIMEOUT};
1145 handle_parsed_features({_, _, true, StateData = #state{tls = false}}) ->
1146
:-(
?LOG_DEBUG(#{what => s2s_out_restarted,
1147
:-(
myname => StateData#state.myname, server => StateData#state.server}),
1148
:-(
mongoose_transport:close(StateData#state.socket),
1149
:-(
{next_state, reopen_socket,
1150 StateData#state{socket = undefined,
1151 use_v10 = false}, ?FSMTIMEOUT};
1152 handle_parsed_features({_, _, _, StateData = #state{dialback_enabled = true}}) ->
1153 15 send_dialback_request(StateData);
1154 handle_parsed_features({_, _, _, StateData}) ->
1155
:-(
?LOG_DEBUG(#{what => s2s_out_restarted,
1156
:-(
myname => StateData#state.myname, server => StateData#state.server}),
1157 % TODO: clear message queue
1158
:-(
mongoose_transport:close(StateData#state.socket),
1159
:-(
{next_state, reopen_socket, StateData#state{socket = undefined,
1160 use_v10 = false}, ?FSMTIMEOUT}.
1161
1162 handle_get_state_info(StateName, StateData) ->
1163 2 {Addr, Port} = get_peername(StateData#state.socket),
1164 2 #{pid => self(),
1165 direction => out,
1166 statename => StateName,
1167 addr => Addr,
1168 port => Port,
1169 streamid => StateData#state.streamid,
1170 use_v10 => StateData#state.use_v10,
1171 tls => StateData#state.tls,
1172 tls_required => StateData#state.tls_required,
1173 tls_enabled => StateData#state.tls_enabled,
1174 tls_options => StateData#state.tls_options,
1175 authenticated => StateData#state.authenticated,
1176 dialback_enabled => StateData#state.dialback_enabled,
1177 try_auth => StateData#state.try_auth,
1178 myname => StateData#state.myname,
1179 server => StateData#state.server,
1180 delay_to_retry => StateData#state.delay_to_retry,
1181 verify => StateData#state.verify}.
1182
1183 get_peername(undefined) ->
1184
:-(
{unknown, unknown};
1185 get_peername(Socket) ->
1186 2 {ok, {Addr, Port}} = mongoose_transport:peername(Socket),
1187 2 {Addr, Port}.
Line Hits Source