./ct_report/coverage/ejabberd_s2s_out.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : ejabberd_s2s_out.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Manage outgoing server-to-server connections
5 %%% Created : 6 Dec 2002 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(ejabberd_s2s_out).
27 -author('alexey@process-one.net').
28 -behaviour(p1_fsm).
29
30 % TODO this should be in a separate module after feature/cets is merged
31 -xep([{xep, 220}, {version, "1.1.1"}]).
32
33 %% External exports
34 -export([start/2,
35 start_link/2,
36 start_connection/1,
37 stop_connection/1,
38 terminate_if_waiting_delay/1]).
39
40 %% p1_fsm callbacks (same as gen_fsm)
41 -export([init/1,
42 open_socket/2,
43 wait_for_stream/2,
44 wait_for_validation/2,
45 wait_for_features/2,
46 wait_for_auth_result/2,
47 wait_for_starttls_proceed/2,
48 reopen_socket/2,
49 wait_before_retry/2,
50 stream_established/2,
51 handle_event/3,
52 handle_sync_event/4,
53 handle_info/3,
54 terminate/3,
55 print_state/1,
56 code_change/4]).
57
58 -export_type([connection_info/0]).
59
60 -ignore_xref([open_socket/2, print_state/1,
61 reopen_socket/2, start_link/2, stream_established/2,
62 wait_before_retry/2, wait_for_auth_result/2,
63 wait_for_features/2, wait_for_starttls_proceed/2, wait_for_stream/2,
64 wait_for_stream/2, wait_for_validation/2]).
65
66 -type verify_requester() :: false | {S2SIn :: pid(), Key :: ejabberd_s2s:s2s_dialback_key(), SID :: ejabberd_s2s:stream_id()}.
67
68 -include("mongoose.hrl").
69 -include("jlib.hrl").
70
71 -record(state, {socket,
72 streamid :: ejabberd_s2s:stream_id() | undefined,
73 remote_streamid = <<>> :: ejabberd_s2s:stream_id(),
74 use_v10 :: boolean(),
75 tls = false :: boolean(),
76 tls_required = false :: boolean(),
77 tls_enabled = false :: boolean(),
78 tls_options :: mongoose_tls:options(),
79 authenticated = false :: boolean(),
80 dialback_enabled = true :: boolean(),
81 try_auth = true :: boolean(),
82 from_to :: ejabberd_s2s:fromto(),
83 myname :: jid:lserver(),
84 server :: jid:lserver(),
85 queue :: element_queue(),
86 host_type :: mongooseim:host_type(),
87 delay_to_retry :: non_neg_integer() | undefined,
88 is_registered = false :: boolean(),
89 verify = false :: verify_requester(),
90 timer :: reference()
91 }).
92 -type state() :: #state{}.
93
94 -type connection_info() ::
95 #{pid => pid(),
96 direction => out,
97 statename => statename(),
98 addr => unknown | inet:ip_address(),
99 port => unknown | inet:port_number(),
100 streamid => ejabberd_s2s:stream_id() | undefined,
101 use_v10 => boolean(),
102 tls => boolean(),
103 tls_required => boolean(),
104 tls_enabled => boolean(),
105 tls_options => mongoose_tls:options(),
106 authenticated => boolean(),
107 dialback_enabled => boolean(),
108 try_auth => boolean(),
109 myname => jid:lserver(),
110 server => jid:lserver(),
111 delay_to_retry => undefined | non_neg_integer(),
112 verify => verify_requester()}.
113
114 -type element_queue() :: queue:queue(#xmlel{}).
115 -type statename() :: open_socket
116 | wait_for_stream
117 | wait_for_features
118 | wait_for_auth_result
119 | wait_for_starttls_proceed
120 | wait_for_validation
121 | wait_before_retry.
122
123 %% FSM handler return value
124 -type fsm_return() :: {'stop', Reason :: 'normal', state()}
125 | {'next_state', statename(), state()}
126 | {'next_state', statename(), state(), Timeout :: integer()}.
127
128 -type addr() :: #{ip_tuple := inet:ip_address(),
129 port := inet:port_number(),
130 type := inet | inet6}.
131
132 %%-define(DBGFSM, true).
133
134 -ifdef(DBGFSM).
135 -define(FSMOPTS, [{debug, [trace]}]).
136 -else.
137 -define(FSMOPTS, []).
138 -endif.
139
140 -define(FSMTIMEOUT, 30000).
141
142 %% We do not block on send anymore.
143 -define(TCP_SEND_TIMEOUT, 15000).
144
145 -define(STREAM_HEADER(From, To, Other),
146 <<"<?xml version='1.0'?>",
147 "<stream:stream "
148 "xmlns:stream='http://etherx.jabber.org/streams' "
149 "xmlns='jabber:server' "
150 "xmlns:db='jabber:server:dialback' "
151 "from='", (From)/binary, "' ",
152 "to='", (To)/binary, "' ",
153 (Other)/binary, ">">>
154 ).
155
156 -define(SOCKET_DEFAULT_RESULT, {error, badarg}).
157
158
159 -define(CLOSE_GENERIC(StateName, Reason, StateData),
160 ?LOG_INFO(#{what => s2s_out_closing, text => <<"Closing s2s connection">>,
161 state_name => StateName, reason => Reason,
162 myname => StateData#state.myname, server => StateData#state.server}),
163 {stop, normal, StateData}).
164
165 -define(CLOSE_GENERIC(StateName, Reason, El, StateData),
166 ?LOG_INFO(#{what => s2s_out_closing, text => <<"Closing s2s connection on stanza">>,
167 state_name => StateName, reason => Reason, exml_packet => El,
168 myname => StateData#state.myname, server => StateData#state.server}),
169 {stop, normal, StateData}).
170
171 %%%----------------------------------------------------------------------
172 %%% API
173 %%%----------------------------------------------------------------------
174 -spec start(ejabberd_s2s:fromto(), _) -> {'error', _} | {'ok', 'undefined' | pid()} | {'ok', 'undefined' | pid(), _}.
175 start(FromTo, Type) ->
176 53 supervisor:start_child(ejabberd_s2s_out_sup, [FromTo, Type]).
177
178 -spec start_link(ejabberd_s2s:fromto(), _) -> 'ignore' | {'error', _} | {'ok', pid()}.
179 start_link(FromTo, Type) ->
180 53 p1_fsm:start_link(ejabberd_s2s_out, [FromTo, Type],
181 fsm_limit_opts() ++ ?FSMOPTS).
182
183 start_connection(Pid) ->
184 50 p1_fsm:send_event(Pid, init).
185
186 stop_connection(Pid) ->
187 52 p1_fsm:send_event(Pid, closed).
188
189 %%%----------------------------------------------------------------------
190 %%% Callback functions from p1_fsm
191 %%%----------------------------------------------------------------------
192
193 %%----------------------------------------------------------------------
194 %% Func: init/1
195 %% Returns: {ok, StateName, StateData} |
196 %% {ok, StateName, StateData, Timeout} |
197 %% ignore |
198 %% {stop, StopReason}
199 %%----------------------------------------------------------------------
200 -spec init(list()) -> {'ok', 'open_socket', state()}.
201 init([{From, Server} = FromTo, Type]) ->
202 53 process_flag(trap_exit, true),
203 53 ?LOG_DEBUG(#{what => s2s_out_started,
204 text => <<"New outgoing s2s connection">>,
205 53 from => From, server => Server, type => Type}),
206 53 {ok, HostType} = mongoose_domain_api:get_host_type(From),
207 53 {TLS, TLSRequired} = case mongoose_config:get_opt([{s2s, HostType}, use_starttls]) of
208 UseTls when (UseTls==false) ->
209 24 {false, false};
210 UseTls when (UseTls==true) or (UseTls==optional) ->
211 18 {true, false};
212 UseTls when (UseTls==required) or (UseTls==required_trusted) ->
213 11 {true, true}
214 end,
215 53 UseV10 = TLS,
216 53 {IsRegistered, Verify} = case Type of
217 new ->
218 26 {true, false};
219 {verify, Pid, Key, SID} ->
220 27 start_connection(self()),
221 27 {false, {Pid, Key, SID}}
222 end,
223 53 Timer = erlang:start_timer(mongoose_s2s_lib:timeout(), self(), []),
224 53 {ok, open_socket, #state{use_v10 = UseV10,
225 tls = TLS,
226 tls_required = TLSRequired,
227 tls_options = tls_options(HostType),
228 queue = queue:new(),
229 from_to = FromTo,
230 myname = From,
231 host_type = HostType,
232 server = Server,
233 is_registered = IsRegistered,
234 verify = Verify,
235 timer = Timer}}.
236
237 %%----------------------------------------------------------------------
238 %% Func: StateName/2
239 %% Returns: {next_state, NextStateName, NextStateData} |
240 %% {next_state, NextStateName, NextStateData, Timeout} |
241 %% {stop, Reason, NewStateData}
242 %%----------------------------------------------------------------------
243 -spec open_socket(_, state()) -> fsm_return().
244 open_socket(init, StateData = #state{host_type = HostType}) ->
245 50 log_s2s_out(StateData#state.is_registered,
246 StateData#state.myname,
247 StateData#state.server,
248 StateData#state.tls),
249 50 ?LOG_DEBUG(#{what => s2s_open_socket,
250 myname => StateData#state.myname,
251 server => StateData#state.server,
252 is_registered => StateData#state.is_registered,
253 50 verify => StateData#state.verify}),
254 50 AddrList = get_addr_list(HostType, StateData#state.server),
255 50 case lists:foldl(fun(_, {ok, Socket}) ->
256
:-(
{ok, Socket};
257 (#{ip_tuple := Addr, port := Port, type := Type}, _) ->
258 42 open_socket2(HostType, Type, Addr, Port)
259 end, ?SOCKET_DEFAULT_RESULT, AddrList) of
260 {ok, Socket} ->
261 42 Version = case StateData#state.use_v10 of
262 23 true -> <<" version='1.0'">>;
263 19 false -> <<"">>
264 end,
265 42 NewStateData = StateData#state{socket = Socket,
266 tls_enabled = false,
267 streamid = new_id()},
268 42 send_text(NewStateData,
269 ?STREAM_HEADER(StateData#state.myname, StateData#state.server, Version)),
270 42 {next_state, wait_for_stream, NewStateData, ?FSMTIMEOUT};
271 {error, Reason} ->
272 8 ?LOG_WARNING(#{what => s2s_out_failed, reason => Reason,
273 text => <<"Outgoing s2s connection failed (remote server not found)">>,
274
:-(
myname => StateData#state.myname, server => StateData#state.server}),
275 8 wait_before_reconnect(StateData)
276 end;
277 open_socket(closed, StateData) ->
278 3 ?CLOSE_GENERIC(open_socket, closed, StateData);
279 open_socket(timeout, StateData) ->
280
:-(
?CLOSE_GENERIC(open_socket, timeout, StateData);
281 open_socket(_, StateData) ->
282
:-(
{next_state, open_socket, StateData}.
283
284 -spec open_socket2(mongooseim:host_type(), inet | inet6, inet:ip_address(), inet:port_number()) ->
285 {'error', _} | {'ok', _}.
286 open_socket2(HostType, Type, Addr, Port) ->
287 42 ?LOG_DEBUG(#{what => s2s_out_connecting,
288 42 address => Addr, port => Port}),
289 42 Timeout = outgoing_s2s_timeout(HostType),
290 42 SockOpts = [binary,
291 {packet, 0},
292 {send_timeout, ?TCP_SEND_TIMEOUT},
293 {send_timeout_close, true},
294 {active, false},
295 Type],
296
297 42 case (catch mongoose_transport:connect(s2s, Addr, Port, SockOpts, Timeout)) of
298 42 {ok, _Socket} = R -> R;
299 {error, Reason} = R ->
300
:-(
?LOG_DEBUG(#{what => s2s_out_failed,
301
:-(
address => Addr, port => Port, reason => Reason}),
302
:-(
R;
303 {'EXIT', Reason} ->
304
:-(
?LOG_DEBUG(#{what => s2s_out_failed,
305 text => <<"Failed to open s2s socket because of crashing">>,
306
:-(
address => Addr, port => Port, reason => Reason}),
307
:-(
{error, Reason}
308 end.
309
310 %%----------------------------------------------------------------------
311
312 -spec wait_for_stream(ejabberd:xml_stream_item(), state()) -> fsm_return().
313 wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData0) ->
314 60 RemoteStreamID = xml:get_attr_s(<<"id">>, Attrs),
315 60 StateData = StateData0#state{remote_streamid = RemoteStreamID},
316 60 case {xml:get_attr_s(<<"xmlns">>, Attrs),
317 xml:get_attr_s(<<"xmlns:db">>, Attrs),
318 xml:get_attr_s(<<"version">>, Attrs) == <<"1.0">>} of
319 {<<"jabber:server">>, <<"jabber:server:dialback">>, false} ->
320 26 send_dialback_request(StateData);
321 {<<"jabber:server">>, <<"jabber:server:dialback">>, true} when
322 StateData#state.use_v10 ->
323 34 {next_state, wait_for_features, StateData, ?FSMTIMEOUT};
324 %% Clause added to handle Tigase's workaround for an old ejabberd bug:
325 {<<"jabber:server">>, <<"jabber:server:dialback">>, true} when
326 not StateData#state.use_v10 ->
327
:-(
send_dialback_request(StateData);
328 {<<"jabber:server">>, <<"">>, true} when StateData#state.use_v10 ->
329
:-(
{next_state, wait_for_features, StateData#state{dialback_enabled = false}, ?FSMTIMEOUT};
330 {NSProvided, DB, _} ->
331
:-(
send_element(StateData, mongoose_xmpp_errors:invalid_namespace()),
332
:-(
?LOG_INFO(#{what => s2s_out_closing,
333 text => <<"Closing s2s connection: (invalid namespace)">>,
334 namespace_provided => NSProvided,
335 namespace_expected => <<"jabber:server">>,
336 xmlns_dialback_provided => DB,
337 all_attributes => Attrs,
338
:-(
myname => StateData#state.myname, server => StateData#state.server}),
339
:-(
{stop, normal, StateData}
340 end;
341 wait_for_stream({xmlstreamerror, _}, StateData) ->
342
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
343
:-(
send_text(StateData, ?STREAM_TRAILER),
344
:-(
?CLOSE_GENERIC(wait_for_stream, xmlstreamerror, StateData);
345 wait_for_stream({xmlstreamend, _Name}, StateData) ->
346
:-(
?CLOSE_GENERIC(wait_for_stream, xmlstreamend, StateData);
347 wait_for_stream(timeout, StateData) ->
348
:-(
?CLOSE_GENERIC(wait_for_stream, timeout, StateData);
349 wait_for_stream(closed, StateData) ->
350
:-(
?CLOSE_GENERIC(wait_for_stream, closed, StateData).
351
352
353 -spec wait_for_validation(ejabberd:xml_stream_item(), state()) -> fsm_return().
354 wait_for_validation({xmlstreamelement, El}, StateData = #state{from_to = FromTo}) ->
355 39 case mongoose_s2s_dialback:parse_validity(El) of
356 {step_3, FromTo, StreamID, IsValid} ->
357 14 ?LOG_DEBUG(#{what => s2s_receive_verify,
358 14 from_to => FromTo, stream_id => StreamID, is_valid => IsValid}),
359 14 case StateData#state.verify of
360 false ->
361 %% This is unexpected condition.
362 %% We've received step_3 reply, but there is no matching outgoing connection.
363 %% We could close the connection here.
364
:-(
next_state(wait_for_validation, StateData);
365 {Pid, _Key, _SID} ->
366 14 ejabberd_s2s_in:send_validity_from_s2s_out(Pid, IsValid, FromTo),
367 14 next_state(wait_for_validation, StateData)
368 end;
369 {step_4, FromTo, StreamID, IsValid} ->
370 24 ?LOG_DEBUG(#{what => s2s_receive_result,
371 24 from_to => FromTo, stream_id => StreamID, is_valid => IsValid}),
372 24 #state{tls_enabled = Enabled, tls_required = Required} = StateData,
373 24 case IsValid of
374 true when (Enabled==true) or (Required==false) ->
375 %% Initiating server receives valid verification result from receiving server (Step 4)
376 22 send_queue(StateData, StateData#state.queue),
377 22 ?LOG_INFO(#{what => s2s_out_connected,
378 text => <<"New outgoing s2s connection established">>,
379 tls_enabled => StateData#state.tls_enabled,
380 22 myname => StateData#state.myname, server => StateData#state.server}),
381 22 {next_state, stream_established,
382 StateData#state{queue = queue:new()}};
383 true when (Enabled==false) and (Required==true) ->
384 %% TODO: bounce packets
385 2 ?CLOSE_GENERIC(wait_for_validation, tls_required_but_unavailable, El, StateData);
386 _ ->
387 %% TODO: bounce packets
388
:-(
?CLOSE_GENERIC(wait_for_validation, invalid_dialback_key, El, StateData)
389 end;
390 false ->
391 1 {next_state, wait_for_validation, StateData, ?FSMTIMEOUT*3}
392 end;
393 wait_for_validation({xmlstreamend, _Name}, StateData) ->
394 1 ?CLOSE_GENERIC(wait_for_validation, xmlstreamend, StateData);
395 wait_for_validation({xmlstreamerror, _}, StateData) ->
396
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
397
:-(
send_text(StateData, ?STREAM_TRAILER),
398
:-(
?CLOSE_GENERIC(wait_for_validation, xmlstreamerror, StateData);
399 wait_for_validation(timeout, #state{verify = {VPid, VKey, SID}} = StateData)
400 when is_pid(VPid) and is_binary(VKey) and is_binary(SID) ->
401 %% This is an auxiliary s2s connection for dialback.
402 %% This timeout is normal and doesn't represent a problem.
403
:-(
?LOG_INFO(#{what => s2s_out_validation_timeout,
404 text => <<"Timeout in verify outgoing s2s connection. Stopping">>,
405
:-(
myname => StateData#state.myname, server => StateData#state.server}),
406
:-(
{stop, normal, StateData};
407 wait_for_validation(timeout, StateData) ->
408
:-(
?LOG_INFO(#{what => s2s_out_connect_timeout,
409 text => <<"Connection timeout in outgoing s2s connection. Stopping">>,
410
:-(
myname => StateData#state.myname, server => StateData#state.server}),
411
:-(
{stop, normal, StateData};
412 wait_for_validation(closed, StateData) ->
413 16 ?LOG_INFO(#{what => s2s_out_validation_closed,
414 text => <<"Connection closed when waiting for validation in outgoing s2s connection. Stopping">>,
415 16 myname => StateData#state.myname, server => StateData#state.server}),
416 16 {stop, normal, StateData}.
417
418
419 -spec wait_for_features(ejabberd:xml_stream_item(), state()) -> fsm_return().
420 wait_for_features({xmlstreamelement, El}, StateData) ->
421 34 case El of
422 #xmlel{name = <<"stream:features">>, children = Els} ->
423 34 {SASLEXT, StartTLS, StartTLSRequired} =
424 lists:foldl(
425 fun(#xmlel{name = <<"mechanisms">>, attrs = Attrs1,
426 children = Els1} = _El1, Acc) ->
427 2 Attr = xml:get_attr_s(<<"xmlns">>, Attrs1),
428 2 get_acc_with_new_sext(Attr, Els1, Acc);
429 (#xmlel{name = <<"starttls">>, attrs = Attrs1} = El1, Acc) ->
430 17 Attr = xml:get_attr_s(<<"xmlns">>, Attrs1),
431 17 get_acc_with_new_tls(Attr, El1, Acc);
432 (_, Acc) ->
433
:-(
Acc
434 end, {false, false, false}, Els),
435 34 handle_parsed_features({SASLEXT, StartTLS, StartTLSRequired, StateData});
436 _ ->
437
:-(
send_element(StateData, mongoose_xmpp_errors:bad_format()),
438
:-(
send_text(StateData, ?STREAM_TRAILER),
439
:-(
?CLOSE_GENERIC(wait_for_features, bad_format, El, StateData)
440 end;
441 wait_for_features({xmlstreamend, _Name}, StateData) ->
442
:-(
?CLOSE_GENERIC(wait_for_features, xmlstreamend, StateData);
443 wait_for_features({xmlstreamerror, _}, StateData) ->
444
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
445
:-(
send_text(StateData, ?STREAM_TRAILER),
446
:-(
?CLOSE_GENERIC(wait_for_features, xmlstreamerror, StateData);
447 wait_for_features(timeout, StateData) ->
448
:-(
?CLOSE_GENERIC(wait_for_features, timeout, StateData);
449 wait_for_features(closed, StateData) ->
450
:-(
?CLOSE_GENERIC(wait_for_features, closed, StateData).
451
452
453 -spec wait_for_auth_result(ejabberd:xml_stream_item(), state()) -> fsm_return().
454 wait_for_auth_result({xmlstreamelement, El}, StateData) ->
455 1 case El of
456 #xmlel{name = <<"success">>, attrs = Attrs} ->
457 1 case xml:get_attr_s(<<"xmlns">>, Attrs) of
458 ?NS_SASL ->
459 1 ?LOG_DEBUG(#{what => s2s_auth_success,
460 myname => StateData#state.myname,
461 1 server => StateData#state.server}),
462 1 send_text(StateData,
463 ?STREAM_HEADER(StateData#state.myname, StateData#state.server,
464 <<" version='1.0'">>)),
465 1 {next_state, wait_for_stream,
466 StateData#state{streamid = new_id(),
467 authenticated = true
468 }, ?FSMTIMEOUT};
469 _ ->
470
:-(
send_element(StateData, mongoose_xmpp_errors:bad_format()),
471
:-(
send_text(StateData, ?STREAM_TRAILER),
472
:-(
?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
473 end;
474 #xmlel{name = <<"failure">>, attrs = Attrs} ->
475
:-(
case xml:get_attr_s(<<"xmlns">>, Attrs) of
476 ?NS_SASL ->
477
:-(
?LOG_WARNING(#{what => s2s_auth_failure,
478 text => <<"Received failure result in ejabberd_s2s_out. Restarting">>,
479 myname => StateData#state.myname,
480
:-(
server => StateData#state.server}),
481
:-(
mongoose_transport:close(StateData#state.socket),
482
:-(
{next_state, reopen_socket,
483 StateData#state{socket = undefined}, ?FSMTIMEOUT};
484 _ ->
485
:-(
send_element(StateData, mongoose_xmpp_errors:bad_format()),
486
:-(
send_text(StateData, ?STREAM_TRAILER),
487
:-(
?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
488 end;
489 _ ->
490
:-(
send_element(StateData, mongoose_xmpp_errors:bad_format()),
491
:-(
send_text(StateData, ?STREAM_TRAILER),
492
:-(
?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
493 end;
494 wait_for_auth_result({xmlstreamend, _Name}, StateData) ->
495
:-(
?CLOSE_GENERIC(wait_for_auth_result, xmlstreamend, StateData);
496 wait_for_auth_result({xmlstreamerror, _}, StateData) ->
497
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
498
:-(
send_text(StateData, ?STREAM_TRAILER),
499
:-(
?CLOSE_GENERIC(wait_for_auth_result, xmlstreamerror, StateData);
500 wait_for_auth_result(timeout, StateData) ->
501
:-(
?CLOSE_GENERIC(wait_for_auth_result, timeout, StateData);
502 wait_for_auth_result(closed, StateData) ->
503
:-(
?CLOSE_GENERIC(wait_for_auth_result, closed, StateData).
504
505
506 -spec wait_for_starttls_proceed(ejabberd:xml_stream_item(), state()) -> fsm_return().
507 wait_for_starttls_proceed({xmlstreamelement, El}, StateData) ->
508 17 case El of
509 #xmlel{name = <<"proceed">>, attrs = Attrs} ->
510 17 case xml:get_attr_s(<<"xmlns">>, Attrs) of
511 ?NS_TLS ->
512 17 ?LOG_DEBUG(#{what => s2s_starttls,
513 myname => StateData#state.myname,
514 17 server => StateData#state.server}),
515 17 TLSSocket = mongoose_transport:connect_tls(StateData#state.socket,
516 StateData#state.tls_options),
517 17 NewStateData = StateData#state{socket = TLSSocket,
518 streamid = new_id(),
519 tls_enabled = true},
520 17 send_text(NewStateData,
521 ?STREAM_HEADER(StateData#state.myname, StateData#state.server,
522 <<" version='1.0'">>)),
523 17 {next_state, wait_for_stream, NewStateData, ?FSMTIMEOUT};
524 _ ->
525
:-(
send_element(StateData, mongoose_xmpp_errors:bad_format()),
526
:-(
send_text(StateData, ?STREAM_TRAILER),
527
:-(
?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
528 end;
529 _ ->
530
:-(
?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
531 end;
532 wait_for_starttls_proceed({xmlstreamend, _Name}, StateData) ->
533
:-(
?CLOSE_GENERIC(wait_for_starttls_proceed, xmlstreamend, StateData);
534 wait_for_starttls_proceed({xmlstreamerror, _}, StateData) ->
535
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
536
:-(
send_text(StateData, ?STREAM_TRAILER),
537
:-(
?CLOSE_GENERIC(wait_for_starttls_proceed, xmlstreamerror, StateData);
538 wait_for_starttls_proceed(timeout, StateData) ->
539
:-(
?CLOSE_GENERIC(wait_for_starttls_proceed, timeout, StateData);
540 wait_for_starttls_proceed(closed, StateData) ->
541
:-(
?CLOSE_GENERIC(wait_for_starttls_proceed, closed, StateData).
542
543
544 -spec reopen_socket(ejabberd:xml_stream_item(), state()) -> fsm_return().
545 reopen_socket({xmlstreamelement, _El}, StateData) ->
546
:-(
{next_state, reopen_socket, StateData, ?FSMTIMEOUT};
547 reopen_socket({xmlstreamend, _Name}, StateData) ->
548
:-(
{next_state, reopen_socket, StateData, ?FSMTIMEOUT};
549 reopen_socket({xmlstreamerror, _}, StateData) ->
550
:-(
{next_state, reopen_socket, StateData, ?FSMTIMEOUT};
551 reopen_socket(timeout, StateData) ->
552
:-(
?CLOSE_GENERIC(reopen_socket, timeout, StateData);
553 reopen_socket(closed, StateData) ->
554
:-(
p1_fsm:send_event(self(), init),
555
:-(
{next_state, open_socket, StateData, ?FSMTIMEOUT}.
556
557
558 %% @doc This state is use to avoid reconnecting to often to bad sockets
559 -spec wait_before_retry(ejabberd:xml_stream_item(), state()) -> fsm_return().
560 wait_before_retry(_Event, StateData) ->
561 10 {next_state, wait_before_retry, StateData, ?FSMTIMEOUT}.
562
563 -spec stream_established(ejabberd:xml_stream_item(), state()) -> fsm_return().
564 stream_established({xmlstreamelement, El}, StateData = #state{from_to = FromTo}) ->
565 11 ?LOG_DEBUG(#{what => s2s_out_stream_established, exml_packet => El,
566 11 myname => StateData#state.myname, server => StateData#state.server}),
567 11 case mongoose_s2s_dialback:parse_validity(El) of
568 {step_3, FromTo, StreamID, IsValid} ->
569 11 ?LOG_DEBUG(#{what => s2s_recv_verify,
570 from_to => FromTo, stream_id => StreamID, is_valid => IsValid,
571 11 myname => StateData#state.myname, server => StateData#state.server}),
572 11 case StateData#state.verify of
573 {VPid, _VKey, _SID} ->
574 11 ejabberd_s2s_in:send_validity_from_s2s_out(VPid, IsValid, FromTo);
575 _ ->
576
:-(
ok
577 end;
578 {step_4, _FromTo, _StreamID, _IsValid} ->
579
:-(
ok;
580 false ->
581
:-(
ok
582 end,
583 11 {next_state, stream_established, StateData};
584 stream_established({xmlstreamend, _Name}, StateData) ->
585
:-(
?CLOSE_GENERIC(stream_established, xmlstreamend, StateData);
586 stream_established({xmlstreamerror, _}, StateData) ->
587
:-(
send_element(StateData, mongoose_xmpp_errors:xml_not_well_formed()),
588
:-(
send_text(StateData, ?STREAM_TRAILER),
589
:-(
?CLOSE_GENERIC(stream_established, xmlstreamerror, StateData);
590 stream_established(timeout, StateData) ->
591
:-(
?CLOSE_GENERIC(stream_established, timeout, StateData);
592 stream_established(closed, StateData) ->
593 23 ?CLOSE_GENERIC(stream_established, closed, StateData).
594
595
596 %%----------------------------------------------------------------------
597 %% Func: StateName/3
598 %% Returns: {next_state, NextStateName, NextStateData} |
599 %% {next_state, NextStateName, NextStateData, Timeout} |
600 %% {reply, Reply, NextStateName, NextStateData} |
601 %% {reply, Reply, NextStateName, NextStateData, Timeout} |
602 %% {stop, Reason, NewStateData} |
603 %% {stop, Reason, Reply, NewStateData}
604 %%----------------------------------------------------------------------
605 %%state_name(Event, From, StateData) ->
606 %% Reply = ok,
607 %% {reply, Reply, state_name, StateData}.
608
609 %%----------------------------------------------------------------------
610 %% Func: handle_event/3
611 %% Returns: {next_state, NextStateName, NextStateData} |
612 %% {next_state, NextStateName, NextStateData, Timeout} |
613 %% {stop, Reason, NewStateData}
614 %%----------------------------------------------------------------------
615 handle_event(_Event, StateName, StateData) ->
616
:-(
next_state(StateName, StateData).
617
618 handle_sync_event(get_state_info, _From, StateName, StateData) ->
619 2 {reply, handle_get_state_info(StateName, StateData), StateName, StateData};
620 handle_sync_event(_Event, _From, StateName, StateData) ->
621
:-(
{reply, ok, StateName, StateData, get_timeout_interval(StateName)}.
622
623
624 code_change(_OldVsn, StateName, StateData, _Extra) ->
625
:-(
{ok, StateName, StateData}.
626
627 %%----------------------------------------------------------------------
628 %% Func: handle_info/3
629 %% Returns: {next_state, NextStateName, NextStateData} |
630 %% {next_state, NextStateName, NextStateData, Timeout} |
631 %% {stop, Reason, NewStateData}
632 %%----------------------------------------------------------------------
633 handle_info({send_element, Acc, El}, StateName, StateData) ->
634 103 case StateName of
635 stream_established ->
636 65 cancel_timer(StateData#state.timer),
637 65 Timer = erlang:start_timer(mongoose_s2s_lib:timeout(), self(), []),
638 65 send_element(StateData, El),
639 65 {next_state, StateName, StateData#state{timer = Timer}};
640 %% In this state we bounce all message: We are waiting before
641 %% trying to reconnect
642 wait_before_retry ->
643 20 bounce_element(Acc, El, mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"From s2s">>)),
644 20 {next_state, StateName, StateData};
645 _ ->
646 18 Q = queue:in({Acc, El}, StateData#state.queue),
647 18 next_state(StateName, StateData#state{queue = Q})
648 end;
649 handle_info({timeout, Timer, _}, wait_before_retry,
650 #state{timer = Timer} = StateData) ->
651 8 ?LOG_INFO(#{what => s2s_reconnect_delay_expired,
652 text => <<"Reconnect delay expired: Will now retry to connect when needed.">>,
653 myname => StateData#state.myname,
654 8 server => StateData#state.server}),
655 8 {stop, normal, StateData};
656 handle_info({timeout, Timer, _}, StateName,
657 #state{timer = Timer} = StateData) ->
658
:-(
?CLOSE_GENERIC(StateName, s2s_out_timeout, StateData);
659 handle_info(terminate_if_waiting_before_retry, wait_before_retry, StateData) ->
660
:-(
?CLOSE_GENERIC(wait_before_retry, terminate_if_waiting_before_retry, StateData);
661 handle_info(terminate_if_waiting_before_retry, StateName, StateData) ->
662 13 next_state(StateName, StateData);
663 handle_info(_, StateName, StateData) ->
664
:-(
next_state(StateName, StateData).
665
666 %%----------------------------------------------------------------------
667 %% Func: terminate/3
668 %% Purpose: Shutdown the fsm
669 %% Returns: any
670 %%----------------------------------------------------------------------
671 terminate(Reason, StateName, StateData) ->
672 53 ?LOG_DEBUG(#{what => s2s_out_closed, text => <<"ejabberd_s2s_out terminated">>,
673 reason => Reason, state_name => StateName,
674 53 myname => StateData#state.myname, server => StateData#state.server}),
675 53 case StateData#state.is_registered of
676 false ->
677 14 ok;
678 true ->
679 39 ejabberd_s2s:remove_connection(
680 {StateData#state.myname, StateData#state.server}, self())
681 end,
682 53 E = mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"Bounced by s2s">>),
683 %% bounce queue manage by process and Erlang message queue
684 53 bounce_queue(StateData#state.queue, E),
685 53 case queue:is_empty(StateData#state.queue) of
686 true ->
687 50 ok;
688 false ->
689 3 ?LOG_WARNING(#{what => s2s_terminate_non_empty,
690 state_name => StateName, reason => Reason,
691 queue => lists:sublist(queue:to_list(StateData#state.queue), 10),
692
:-(
authenticated => StateData#state.authenticated})
693 end,
694 53 bounce_messages(E),
695 53 case StateData#state.socket of
696 undefined ->
697 11 ok;
698 _Socket ->
699 42 mongoose_transport:close(StateData#state.socket)
700 end,
701 53 ok.
702
703 %%----------------------------------------------------------------------
704 %% Func: print_state/1
705 %% Purpose: Prepare the state to be printed on error log
706 %% Returns: State to print
707 %%----------------------------------------------------------------------
708 print_state(State) ->
709
:-(
State.
710
711 %%%----------------------------------------------------------------------
712 %%% Internal functions
713 %%%----------------------------------------------------------------------
714
715 -spec send_text(state(), binary()) -> 'ok'.
716 send_text(StateData, Text) ->
717 60 mongoose_transport:send_text(StateData#state.socket, Text).
718
719
720 -spec send_element(state(), exml:element()|mongoose_acc:t()) -> 'ok'.
721 send_element(StateData, #xmlel{} = El) ->
722 137 mongoose_transport:send_element(StateData#state.socket, El).
723
724 -spec send_element(state(), mongoose_acc:t(), exml:element()) -> mongoose_acc:t().
725 send_element(StateData, Acc, El) ->
726 15 mongoose_transport:send_element(StateData#state.socket, El),
727 15 Acc.
728
729
730 -spec send_queue(state(), Q :: element_queue()) -> 'ok'.
731 send_queue(StateData, Q) ->
732 38 case queue:out(Q) of
733 {{value, {Acc, El}}, Q1} ->
734 15 send_element(StateData, Acc, El),
735 15 send_queue(StateData, Q1);
736 {empty, _Q1} ->
737 23 ok
738 end.
739
740
741 %% @doc Bounce a single message (xmlel)
742 -spec bounce_element(Acc :: mongoose_acc:t(), El :: exml:element(), Error :: exml:element()) -> 'ok'.
743 bounce_element(Acc, El, Error) ->
744 36 case mongoose_acc:stanza_type(Acc) of
745
:-(
<<"error">> -> ok;
746
:-(
<<"result">> -> ok;
747 _ ->
748 36 From = mongoose_acc:from_jid(Acc),
749 36 To = mongoose_acc:to_jid(Acc),
750 36 {Acc1, Err} = jlib:make_error_reply(Acc, El, Error),
751 36 ejabberd_router:route(To, From, Acc1, Err)
752 end.
753
754
755 -spec bounce_queue(Q :: element_queue(), Error :: exml:element()) -> 'ok'.
756 bounce_queue(Q, Error) ->
757 64 case queue:out(Q) of
758 {{value, {Acc, El}}, Q1} ->
759 3 bounce_element(Acc, El, Error),
760 3 bounce_queue(Q1, Error);
761 {empty, _} ->
762 61 ok
763 end.
764
765
766 -spec new_id() -> binary().
767 new_id() ->
768 60 mongoose_bin:gen_from_crypto().
769
770
771 -spec cancel_timer(reference()) -> 'ok'.
772 cancel_timer(Timer) ->
773 73 erlang:cancel_timer(Timer),
774 73 receive
775 {timeout, Timer, _} ->
776
:-(
ok
777 after 0 ->
778 73 ok
779 end.
780
781
782 -spec bounce_messages(exml:element()) -> 'ok'.
783 bounce_messages(Error) ->
784 74 receive
785 {send_element, Acc, El} ->
786 13 bounce_element(Acc, El, Error),
787 13 bounce_messages(Error)
788 after 0 ->
789 61 ok
790 end.
791
792
793 -spec send_dialback_request(state()) -> fsm_return().
794 send_dialback_request(StateData) ->
795 41 IsRegistered = case StateData#state.is_registered of
796 false ->
797 27 ejabberd_s2s:try_register(StateData#state.from_to);
798 true ->
799 14 true
800 end,
801 41 NewStateData = StateData#state{is_registered = IsRegistered},
802 41 try
803 41 case IsRegistered of
804 false ->
805 %% Still not registered in the s2s table as an outgoing connection
806 14 ok;
807 true ->
808 27 Key1 = ejabberd_s2s:key(
809 StateData#state.host_type,
810 StateData#state.from_to,
811 StateData#state.remote_streamid),
812 %% Initiating server sends dialback key
813 27 send_element(StateData, mongoose_s2s_dialback:step_1(StateData#state.from_to, Key1))
814 end,
815 41 case StateData#state.verify of
816 false ->
817 14 ok;
818 {_Pid, Key2, SID} ->
819 %% Receiving server sends verification request
820 27 send_element(StateData, mongoose_s2s_dialback:step_2(StateData#state.from_to, Key2, SID))
821 end,
822 41 {next_state, wait_for_validation, NewStateData, ?FSMTIMEOUT*6}
823 catch
824 Class:Reason:Stacktrace ->
825
:-(
?LOG_ERROR(#{what => s2s_out_send_dialback_request_failed,
826 class => Class, reason => Reason, stacktrace => Stacktrace,
827
:-(
myname => StateData#state.myname, server => StateData#state.server}),
828
:-(
{stop, normal, NewStateData}
829 end.
830
831 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
832 %% SRV support
833
834 -include_lib("kernel/include/inet.hrl").
835
836 -spec lookup_services(mongooseim:host_type(), jid:lserver()) -> [addr()].
837 lookup_services(HostType, Server) ->
838 8 case mongoose_s2s_lib:domain_utf8_to_ascii(Server) of
839 1 false -> [];
840 7 ASCIIAddr -> do_lookup_services(HostType, ASCIIAddr)
841 end.
842
843 -spec do_lookup_services(mongooseim:host_type(),jid:lserver()) -> [addr()].
844 do_lookup_services(HostType, Server) ->
845 7 Res = srv_lookup(HostType, Server),
846 7 case Res of
847 {error, Reason} ->
848 7 ?LOG_DEBUG(#{what => s2s_srv_lookup_failed,
849 7 reason => Reason, server => Server}),
850 7 [];
851 {ok, #hostent{h_addr_list = AddrList, h_addrtype = Type}} ->
852 %% Probabilities are not exactly proportional to weights
853 %% for simplicity (higher weights are overvalued)
854
:-(
case (catch lists:map(fun calc_addr_index/1, AddrList)) of
855 {'EXIT', _Reason} ->
856
:-(
[];
857 IndexedAddrs ->
858
:-(
Addrs = [#{ip_tuple => Addr, port => Port, type => Type}
859
:-(
|| {_Index, Addr, Port} <- lists:keysort(1, IndexedAddrs)],
860
:-(
?LOG_DEBUG(#{what => s2s_srv_lookup_success,
861
:-(
addresses => Addrs, server => Server}),
862
:-(
Addrs
863 end
864 end.
865
866
867 -spec srv_lookup(mongooseim:host_type(), jid:lserver()) ->
868 {'error', atom()} | {'ok', inet:hostent()}.
869 srv_lookup(HostType, Server) ->
870 7 #{timeout := TimeoutSec, retries := Retries} = mongoose_config:get_opt([{s2s, HostType}, dns]),
871 7 srv_lookup(Server, timer:seconds(TimeoutSec), Retries).
872
873
874 %% @doc XXX - this behaviour is suboptimal in the case that the domain
875 %% has a "_xmpp-server._tcp." but not a "_jabber._tcp." record and
876 %% we don't get a DNS reply for the "_xmpp-server._tcp." lookup. In this
877 %% case we'll give up when we get the "_jabber._tcp." nxdomain reply.
878 -spec srv_lookup(jid:server(),
879 Timeout :: non_neg_integer(),
880 Retries :: pos_integer()
881 ) -> {'error', atom()} | {'ok', inet:hostent()}.
882 srv_lookup(_Server, _Timeout, Retries) when Retries < 1 ->
883
:-(
{error, timeout};
884 srv_lookup(Server, Timeout, Retries) ->
885 7 case inet_res:getbyname("_xmpp-server._tcp." ++ binary_to_list(Server), srv, Timeout) of
886 {error, _Reason} ->
887 7 case inet_res:getbyname("_jabber._tcp." ++ binary_to_list(Server), srv, Timeout) of
888 {error, timeout} ->
889
:-(
?LOG_ERROR(#{what => s2s_dns_lookup_failed,
890 text => <<"The DNS servers timed out on request for IN SRV."
891 " You should check your DNS configuration.">>,
892 nameserver => inet_db:res_option(nameserver),
893
:-(
server => Server}),
894
:-(
srv_lookup(Server, Timeout, Retries - 1);
895 7 R -> R
896 end;
897
:-(
{ok, _HEnt} = R -> R
898 end.
899
900 -spec lookup_addrs(mongooseim:host_type(), jid:server()) -> [addr()].
901 lookup_addrs(HostType, Server) ->
902 8 Port = outgoing_s2s_port(HostType),
903 8 lists:foldl(fun(Type, []) ->
904 16 [#{ip_tuple => Addr, port => Port, type => Type}
905 16 || Addr <- lookup_addrs_for_type(Server, Type)];
906 (_Type, Addrs) ->
907
:-(
Addrs
908 end, [], outgoing_s2s_types(HostType)).
909
910 -spec lookup_addrs_for_type(jid:lserver(), inet | inet6) -> [inet:ip_address()].
911 lookup_addrs_for_type(Server, Type) ->
912 16 case inet:gethostbyname(binary_to_list(Server), Type) of
913 {ok, #hostent{h_addr_list = Addrs}} ->
914
:-(
?LOG_DEBUG(#{what => s2s_srv_resolve_success,
915
:-(
type => Type, server => Server, addresses => Addrs}),
916
:-(
Addrs;
917 {error, Reason} ->
918 16 ?LOG_DEBUG(#{what => s2s_srv_resolve_failed,
919 16 type => Type, server => Server, reason => Reason}),
920 16 []
921 end.
922
923
924 -spec outgoing_s2s_port(mongooseim:host_type()) -> inet:port_number().
925 outgoing_s2s_port(HostType) ->
926 50 mongoose_config:get_opt([{s2s, HostType}, outgoing, port]).
927
928
929 -spec outgoing_s2s_types(mongooseim:host_type()) -> [inet | inet6, ...].
930 outgoing_s2s_types(HostType) ->
931 %% DISCUSSION: Why prefer IPv4 first?
932 %%
933 %% IPv4 connectivity will be available for everyone for
934 %% many years to come. So, there's absolutely no benefit
935 %% in preferring IPv6 connections which are flaky at best
936 %% nowadays.
937 %%
938 %% On the other hand content providers hesitate putting up
939 %% AAAA records for their sites due to the mentioned
940 %% quality of current IPv6 connectivity. Making IPv6 the a
941 %% `fallback' may avoid these problems elegantly.
942 8 [ip_version_to_type(V) || V <- mongoose_config:get_opt([{s2s, HostType}, outgoing, ip_versions])].
943
944 8 ip_version_to_type(4) -> inet;
945 8 ip_version_to_type(6) -> inet6.
946
947 -spec outgoing_s2s_timeout(mongooseim:host_type()) -> non_neg_integer() | infinity.
948 outgoing_s2s_timeout(HostType) ->
949 42 mongoose_config:get_opt([{s2s, HostType}, outgoing, connection_timeout], 10000).
950
951 %% @doc Human readable S2S logging: Log only new outgoing connections as INFO
952 %% Do not log dialback
953 27 log_s2s_out(false, _, _, _) -> ok;
954 %% Log new outgoing connections:
955 log_s2s_out(_, Myname, Server, Tls) ->
956 23 ?LOG_INFO(#{what => s2s_out,
957 text => <<"Trying to open s2s connection">>,
958 23 myname => Myname, server => Server, tls => Tls}).
959
960 next_state(StateName, StateData) ->
961 45 {next_state, StateName, StateData,
962 get_timeout_interval(StateName)}.
963
964 %% @doc Calculate timeout depending on which state we are in:
965 %% Can return integer > 0 | infinity
966 -spec get_timeout_interval(statename()) -> 'infinity' | non_neg_integer().
967 get_timeout_interval(StateName) ->
968 45 case StateName of
969 %% Validation implies dialback: Networking can take longer:
970 wait_for_validation ->
971 27 ?FSMTIMEOUT*6;
972 %% When stream is established, we only rely on S2S Timeout timer:
973 stream_established ->
974 1 infinity;
975 _ ->
976 17 ?FSMTIMEOUT
977 end.
978
979
980 %% @doc This function is intended to be called at the end of a state
981 %% function that want to wait for a reconnect delay before stopping.
982 -spec wait_before_reconnect(state()) -> fsm_return().
983 wait_before_reconnect(StateData) ->
984 8 E = mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"From s2s (waiting)">>),
985 %% bounce queue manage by process and Erlang message queue
986 8 bounce_queue(StateData#state.queue, E),
987 8 bounce_messages(E),
988 8 cancel_timer(StateData#state.timer),
989 8 Delay = case StateData#state.delay_to_retry of
990 undefined ->
991 %% The initial delay is random between 1 and 15 seconds
992 %% Return a random integer between 1000 and 15000
993 8 MicroSecs = erlang:system_time(microsecond),
994 8 (MicroSecs rem 14000) + 1000;
995 D1 ->
996 %% Duplicate the delay with each successive failed
997 %% reconnection attempt, but don't exceed the max
998
:-(
lists:min([D1 * 2, get_max_retry_delay(StateData#state.host_type)])
999 end,
1000 8 Timer = erlang:start_timer(Delay, self(), []),
1001 8 {next_state, wait_before_retry, StateData#state{timer=Timer,
1002 delay_to_retry = Delay,
1003 queue = queue:new()}}.
1004
1005
1006 %% @doc Get the maximum allowed delay for retry to reconnect (in milliseconds).
1007 %% The default value is 5 minutes.
1008 %% The option {s2s_max_retry_delay, Seconds} can be used (in seconds).
1009 get_max_retry_delay(HostType) ->
1010
:-(
mongoose_config:get_opt([{s2s, HostType}, max_retry_delay]) * 1000.
1011
1012
1013 %% @doc Terminate s2s_out connections that are in state wait_before_retry
1014 -spec terminate_if_waiting_delay(ejabberd_s2s:fromto()) -> ok.
1015 terminate_if_waiting_delay(FromTo) ->
1016 26 Pids = ejabberd_s2s:get_s2s_out_pids(FromTo),
1017 26 lists:foreach(
1018 fun(Pid) ->
1019 13 Pid ! terminate_if_waiting_before_retry
1020 end,
1021 Pids).
1022
1023
1024 -spec fsm_limit_opts() -> [{'max_queue', integer()}].
1025 fsm_limit_opts() ->
1026 53 case mongoose_config:lookup_opt(max_fsm_queue) of
1027 {ok, N} ->
1028 53 [{max_queue, N}];
1029 {error, not_found} ->
1030
:-(
[]
1031 end.
1032
1033 -spec get_addr_list(mongooseim:host_type(), jid:lserver()) -> [addr()].
1034 get_addr_list(HostType, Server) ->
1035 50 lists:foldl(fun(F, []) -> F(HostType, Server);
1036 84 (_, Result) -> Result
1037 end, [], [fun get_predefined_addresses/2,
1038 fun lookup_services/2,
1039 fun lookup_addrs/2]).
1040
1041 %% @doc Get IPs predefined for a given s2s domain in the configuration
1042 -spec get_predefined_addresses(mongooseim:host_type(), jid:lserver()) -> [addr()].
1043 get_predefined_addresses(HostType, Server) ->
1044 50 case mongoose_config:lookup_opt([{s2s, HostType}, address, Server]) of
1045 {ok, #{ip_address := IPAddress} = M} ->
1046 42 {ok, IPTuple} = inet:parse_address(IPAddress),
1047 42 Port = get_predefined_port(HostType, M),
1048 42 [#{ip_tuple => IPTuple, port => Port, type => addr_type(IPTuple)}];
1049 {error, not_found} ->
1050 8 []
1051 end.
1052
1053
:-(
get_predefined_port(_HostType, #{port := Port}) -> Port;
1054 42 get_predefined_port(HostType, _Addr) -> outgoing_s2s_port(HostType).
1055
1056 42 addr_type(Addr) when tuple_size(Addr) =:= 4 -> inet;
1057
:-(
addr_type(Addr) when tuple_size(Addr) =:= 8 -> inet6.
1058
1059 get_acc_with_new_sext(?NS_SASL, Els1, {_SEXT, STLS, STLSReq}) ->
1060 2 NewSEXT =
1061 lists:any(
1062 fun(#xmlel{name = <<"mechanism">>,
1063 children = Els2}) ->
1064 2 case xml:get_cdata(Els2) of
1065 2 <<"EXTERNAL">> -> true;
1066
:-(
_ -> false
1067 end;
1068
:-(
(_) -> false
1069 end, Els1),
1070 2 {NewSEXT, STLS, STLSReq};
1071 get_acc_with_new_sext(_, _, Acc) ->
1072
:-(
Acc.
1073
1074 get_acc_with_new_tls(?NS_TLS, El1, {SEXT, _STLS, _STLSReq}) ->
1075 17 Req = case xml:get_subtag(El1, <<"required">>) of
1076 9 #xmlel{} -> true;
1077 8 false -> false
1078 end,
1079 17 {SEXT, true, Req};
1080 get_acc_with_new_tls(_, _, Acc) ->
1081
:-(
Acc.
1082
1083 tls_options(HostType) ->
1084 53 Ciphers = mongoose_config:get_opt([{s2s, HostType}, ciphers]),
1085 53 Options = #{verify_mode => peer, ciphers => Ciphers},
1086 53 case mongoose_s2s_lib:lookup_certfile(HostType) of
1087 35 {ok, CertFile} -> Options#{certfile => CertFile};
1088 18 {error, not_found} -> Options
1089 end.
1090
1091 calc_addr_index({Priority, Weight, Port, Host}) ->
1092
:-(
N = case Weight of
1093
:-(
0 -> 0;
1094
:-(
_ -> (Weight + 1) * rand:uniform()
1095 end,
1096
:-(
{Priority * 65536 - N, Host, Port}.
1097
1098 handle_parsed_features({false, false, _, StateData = #state{authenticated = true}}) ->
1099 1 send_queue(StateData, StateData#state.queue),
1100 1 ?LOG_INFO(#{what => s2s_out_connected,
1101 text => <<"New outgoing s2s connection established">>,
1102 1 myname => StateData#state.myname, server => StateData#state.server}),
1103 1 {next_state, stream_established,
1104 StateData#state{queue = queue:new()}};
1105 handle_parsed_features({true, _, _, StateData = #state{try_auth = true, is_registered = true}}) ->
1106 1 send_element(StateData,
1107 #xmlel{name = <<"auth">>,
1108 attrs = [{<<"xmlns">>, ?NS_SASL},
1109 {<<"mechanism">>, <<"EXTERNAL">>}],
1110 children =
1111 [#xmlcdata{content = jlib:encode_base64(
1112 StateData#state.myname)}]}),
1113 1 {next_state, wait_for_auth_result,
1114 StateData#state{try_auth = false}, ?FSMTIMEOUT};
1115 handle_parsed_features({_, true, _, StateData = #state{tls = true, tls_enabled = false}}) ->
1116 17 send_element(StateData,
1117 #xmlel{name = <<"starttls">>,
1118 attrs = [{<<"xmlns">>, ?NS_TLS}]}),
1119 17 {next_state, wait_for_starttls_proceed, StateData,
1120 ?FSMTIMEOUT};
1121 handle_parsed_features({_, _, true, StateData = #state{tls = false}}) ->
1122
:-(
?LOG_DEBUG(#{what => s2s_out_restarted,
1123
:-(
myname => StateData#state.myname, server => StateData#state.server}),
1124
:-(
mongoose_transport:close(StateData#state.socket),
1125
:-(
{next_state, reopen_socket,
1126 StateData#state{socket = undefined,
1127 use_v10 = false}, ?FSMTIMEOUT};
1128 handle_parsed_features({_, _, _, StateData = #state{dialback_enabled = true}}) ->
1129 15 send_dialback_request(StateData);
1130 handle_parsed_features({_, _, _, StateData}) ->
1131
:-(
?LOG_DEBUG(#{what => s2s_out_restarted,
1132
:-(
myname => StateData#state.myname, server => StateData#state.server}),
1133 % TODO: clear message queue
1134
:-(
mongoose_transport:close(StateData#state.socket),
1135
:-(
{next_state, reopen_socket, StateData#state{socket = undefined,
1136 use_v10 = false}, ?FSMTIMEOUT}.
1137
1138 handle_get_state_info(StateName, StateData) ->
1139 2 {Addr, Port} = get_peername(StateData#state.socket),
1140 2 #{pid => self(),
1141 direction => out,
1142 statename => StateName,
1143 addr => Addr,
1144 port => Port,
1145 streamid => StateData#state.streamid,
1146 use_v10 => StateData#state.use_v10,
1147 tls => StateData#state.tls,
1148 tls_required => StateData#state.tls_required,
1149 tls_enabled => StateData#state.tls_enabled,
1150 tls_options => StateData#state.tls_options,
1151 authenticated => StateData#state.authenticated,
1152 dialback_enabled => StateData#state.dialback_enabled,
1153 try_auth => StateData#state.try_auth,
1154 myname => StateData#state.myname,
1155 server => StateData#state.server,
1156 delay_to_retry => StateData#state.delay_to_retry,
1157 verify => StateData#state.verify}.
1158
1159 get_peername(undefined) ->
1160
:-(
{unknown, unknown};
1161 get_peername(Socket) ->
1162 2 {ok, {Addr, Port}} = mongoose_transport:peername(Socket),
1163 2 {Addr, Port}.
Line Hits Source