./ct_report/coverage/ejabberd_s2s_out.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : ejabberd_s2s_out.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Manage outgoing server-to-server connections
5 %%% Created : 6 Dec 2002 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(ejabberd_s2s_out).
27 -author('alexey@process-one.net').
28 -behaviour(p1_fsm).
29
30 %% External exports
31 -export([start/3,
32 start_link/3,
33 start_connection/1,
34 terminate_if_waiting_delay/2,
35 stop_connection/1]).
36
37 %% p1_fsm callbacks (same as gen_fsm)
38 -export([init/1,
39 open_socket/2,
40 wait_for_stream/2,
41 wait_for_validation/2,
42 wait_for_features/2,
43 wait_for_auth_result/2,
44 wait_for_starttls_proceed/2,
45 relay_to_bridge/2,
46 reopen_socket/2,
47 wait_before_retry/2,
48 stream_established/2,
49 handle_event/3,
50 handle_sync_event/4,
51 handle_info/3,
52 terminate/3,
53 print_state/1,
54 code_change/4]).
55
%% Callbacks and entry points that are only invoked dynamically (by p1_fsm or
%% a supervisor), so xref must not report them as unused exports.
%% Each function is listed exactly once.
-ignore_xref([open_socket/2, print_state/1, relay_to_bridge/2,
              reopen_socket/2, start_link/3, stream_established/2,
              wait_before_retry/2, wait_for_auth_result/2,
              wait_for_features/2, wait_for_starttls_proceed/2,
              wait_for_stream/2, wait_for_validation/2]).
61
62 -include("mongoose.hrl").
63 -include("jlib.hrl").
64
%% State carried between the FSM callbacks of one outgoing s2s connection.
%% Field order is significant (records are tuples) and must not change.
-record(state, {
          %% Socket returned by open_socket2/3; replaced by the TLS socket
          %% after STARTTLS and reset to undefined when the socket is closed
          %% after a SASL failure.
          socket,
          %% Identifier generated with new_id() whenever a stream header is sent.
          streamid,
          %% "id" attribute taken from the remote stream header.
          remote_streamid = <<>>,
          %% When true, version='1.0' is advertised in the first stream header.
          use_v10,
          %% TLS policy derived from the s2s_use_starttls option in init/1.
          tls = false :: boolean(),
          tls_required = false :: boolean(),
          %% Set to true once the socket has been upgraded with STARTTLS.
          tls_enabled = false :: boolean(),
          tls_options = [connect] :: list(),
          %% Set to true after a SASL <success/> has been received.
          authenticated = false :: boolean(),
          %% Set to false when the remote header carries no xmlns:db.
          db_enabled = true :: boolean(),
          %% NOTE(review): not modified in the visible part of the module;
          %% presumably controls whether SASL is attempted - confirm.
          try_auth = true :: boolean(),
          %% Local domain, remote domain, and the queue of {Acc, El} pairs
          %% waiting for the stream to be established.
          myname, server, queue,
          %% NOTE(review): only read for state reports in the visible code;
          %% presumably the current reconnect back-off - confirm.
          delay_to_retry = undefined_delay,
          %% true for connections started with Type = new; such connections
          %% are unregistered from ejabberd_s2s in terminate/3.
          new = false :: boolean(),
          %% Set for connections started with Type = {verify, Pid, Key, SID}.
          verify = false :: false | {pid(), Key :: binary(), SID :: binary()},
          %% {Mod, Fun} of a foreign-protocol bridge found by
          %% mongoose_hooks:find_s2s_bridge/2.
          bridge,
          %% Reference returned by erlang:start_timer/3.
          timer :: reference()
         }).
-type state() :: #state{}.
84
-type element_queue() :: queue:queue(#xmlel{}).

%% Every state name the FSM callbacks in this module can return.
%% reopen_socket and stream_established are included because
%% wait_for_auth_result/2 and wait_for_validation/2 transition to them.
-type statename() :: open_socket
                   | wait_for_stream
                   | wait_for_features
                   | wait_for_auth_result
                   | wait_for_starttls_proceed
                   | wait_for_validation
                   | reopen_socket
                   | wait_before_retry
                   | relay_to_bridge
                   | stream_established.

%% FSM handler return value
-type fsm_return() :: {'stop', Reason :: 'normal', state()}
                    | {'next_state', statename(), state()}
                    | {'next_state', statename(), state(), Timeout :: integer()}.

%% One resolved address of the remote server, as consumed by open_socket/2.
-type addr() :: #{ip_tuple := inet:ip_address(),
                  port := inet:port_number(),
                  type := inet | inet6}.
102
%% Define DBGFSM to start the FSM with the p1_fsm trace debug option:
%%-define(DBGFSM, true).

-ifndef(DBGFSM).
-define(FSMOPTS, []).
-else.
-define(FSMOPTS, [{debug, [trace]}]).
-endif.

%% How start/3 spawns the connection process: directly through p1_fsm when
%% NO_TRANSIENT_SUPERVISORS is defined, otherwise as a child of
%% ejabberd_s2s_out_sup. The macro body refers to the variables From, Host
%% and Type of the calling function.
-ifdef(NO_TRANSIENT_SUPERVISORS).
-define(SUPERVISOR_START, p1_fsm:start(ejabberd_s2s_out, [From, Host, Type],
                                       fsm_limit_opts() ++ ?FSMOPTS)).
-else.
-define(SUPERVISOR_START, supervisor:start_child(ejabberd_s2s_out_sup,
                                                 [From, Host, Type])).
-endif.
119
%% State timeout, in milliseconds, passed back to p1_fsm by most states.
-define(FSMTIMEOUT, 30000).

%% We do not block on send anymore.
%% Used as the send_timeout socket option in open_socket2/3.
-define(TCP_SEND_TIMEOUT, 15000).

%% Maximum delay to wait before retrying to connect after a failed attempt.
%% Specified in seconds. Default value is 5 minutes.
-define(MAX_RETRY_DELAY, 300).

%% Opening stream header sent to the remote server. From and To are binaries;
%% Other is a binary with extra attributes (for example " version='1.0'").
-define(STREAM_HEADER(From, To, Other),
        <<"<?xml version='1.0'?>",
          "<stream:stream "
          "xmlns:stream='http://etherx.jabber.org/streams' "
          "xmlns='jabber:server' "
          "xmlns:db='jabber:server:dialback' "
          "from='", (From)/binary, "' ",
          "to='", (To)/binary, "' ",
          (Other)/binary, ">">>
       ).

%% Result of open_socket/2's fold when the address list is empty.
-define(SOCKET_DEFAULT_RESULT, {error, badarg}).
141
142
%% Log the reason for closing the connection at info level and stop the FSM
%% with reason normal. The macro expands to a comma-separated expression
%% sequence, so it has to be the last thing in a clause body.
-define(CLOSE_GENERIC(StateName, Reason, StateData),
        ?LOG_INFO(#{what => s2s_out_closing, text => <<"Closing s2s connection">>,
                    state_name => StateName, reason => Reason,
                    myname => StateData#state.myname, server => StateData#state.server}),
        {stop, normal, StateData}).

%% Same as CLOSE_GENERIC/3, additionally logging the stanza that caused the close.
-define(CLOSE_GENERIC(StateName, Reason, El, StateData),
        ?LOG_INFO(#{what => s2s_out_closing, text => <<"Closing s2s connection on stanza">>,
                    state_name => StateName, reason => Reason, exml_packet => El,
                    myname => StateData#state.myname, server => StateData#state.server}),
        {stop, normal, StateData}).
154
155 %%%----------------------------------------------------------------------
156 %%% API
157 %%%----------------------------------------------------------------------
%% @doc Start an outgoing s2s connection process.
%% The actual call is chosen at compile time by ?SUPERVISOR_START, which
%% reads the From, Host and Type variables of this function.
-spec start(From :: _, Host :: _, Type :: _) ->
          {'error', _} | {'ok', 'undefined' | pid()} | {'ok', 'undefined' | pid(), _}.
start(From, Host, Type) ->
    ?SUPERVISOR_START.
161
162
%% @doc Start the connection FSM linked to the caller.
%% init/1 receives [From, Host, Type].
-spec start_link(From :: _, Host :: _, Type :: _) ->
          'ignore' | {'error', _} | {'ok', pid()}.
start_link(From, Host, Type) ->
    FsmOpts = fsm_limit_opts() ++ ?FSMOPTS,
    p1_fsm:start_link(ejabberd_s2s_out, [From, Host, Type], FsmOpts).
167
168
%% @doc Send the init event to the connection process; in the open_socket
%% state this triggers the connection attempt.
start_connection(ConnPid) ->
    p1_fsm:send_event(ConnPid, init).
171
172
%% @doc Send the closed event to the connection process; most states react
%% to it by stopping the FSM.
stop_connection(ConnPid) ->
    p1_fsm:send_event(ConnPid, closed).
175
176 %%%----------------------------------------------------------------------
177 %%% Callback functions from p1_fsm
178 %%%----------------------------------------------------------------------
179
180 %%----------------------------------------------------------------------
181 %% Func: init/1
182 %% Returns: {ok, StateName, StateData} |
183 %% {ok, StateName, StateData, Timeout} |
184 %% ignore |
185 %% {stop, StopReason}
186 %%----------------------------------------------------------------------
%% Build the initial FSM state from configuration and the connection type.
%% Type is either new (a regular outgoing connection) or
%% {verify, Pid, Key, SID}; a verify connection sends itself the init event
%% right away. The FSM always starts in the open_socket state.
-spec init([any(), ...]) -> {'ok', 'open_socket', state()}.
init([From, Server, Type]) ->
    process_flag(trap_exit, true),
    ?LOG_DEBUG(#{what => s2s_out_started,
                 text => <<"New outgoing s2s connection">>,
                 from => From, server => Server, type => Type}),
    %% Map the configured STARTTLS policy onto {UseTLS, TLSIsMandatory}.
    {TLS, TLSRequired} =
        case mongoose_config:get_opt(s2s_use_starttls, false) of
            false -> {false, false};
            true -> {true, false};
            optional -> {true, false};
            required -> {true, true};
            required_trusted -> {true, true}
        end,
    %% XMPP 1.0 is only advertised when TLS may be used.
    UseV10 = TLS,
    BaseTLSOpts =
        case mongoose_config:lookup_opt(s2s_certfile) of
            {ok, CertFile} -> [{certfile, CertFile}, connect];
            {error, not_found} -> [connect]
        end,
    TLSOpts =
        case mongoose_config:lookup_opt(s2s_ciphers) of
            {ok, Ciphers} -> [{ciphers, Ciphers} | BaseTLSOpts];
            {error, not_found} -> BaseTLSOpts
        end,
    {New, Verify} =
        case Type of
            new ->
                {true, false};
            {verify, Pid, Key, SID} ->
                start_connection(self()),
                {false, {Pid, Key, SID}}
        end,
    Timer = erlang:start_timer(ejabberd_s2s:timeout(), self(), []),
    InitialState = #state{use_v10 = UseV10,
                          tls = TLS,
                          tls_required = TLSRequired,
                          tls_options = TLSOpts,
                          queue = queue:new(),
                          myname = From,
                          server = Server,
                          new = New,
                          verify = Verify,
                          timer = Timer},
    {ok, open_socket, InitialState}.
232
233 %%----------------------------------------------------------------------
234 %% Func: StateName/2
235 %% Returns: {next_state, NextStateName, NextStateData} |
236 %% {next_state, NextStateName, NextStateData, Timeout} |
237 %% {stop, Reason, NewStateData}
238 %%----------------------------------------------------------------------
%% State open_socket: waiting for the init event to connect to the remote
%% server. On init every resolved address is tried in order until one
%% connects; on success the stream header is sent and the FSM waits for the
%% remote stream. When no address can be reached, a bridge is looked up and,
%% failing that, the FSM goes into the reconnect-delay path.
-spec open_socket(_, state()) -> fsm_return().
open_socket(init, StateData) ->
    #state{myname = MyName, server = Server, new = New,
           tls = TLS, verify = Verify} = StateData,
    log_s2s_out(New, MyName, Server, TLS),
    ?LOG_DEBUG(#{what => s2s_open_socket,
                 myname => MyName,
                 server => Server,
                 new => New,
                 verify => Verify}),
    AddrList = get_addr_list(Server),
    %% Keep the first successful socket; otherwise try the next address.
    TryAddr = fun(_Addr, {ok, _} = Connected) ->
                      Connected;
                 (#{ip_tuple := Addr, port := Port, type := Family}, _LastError) ->
                      open_socket2(Family, Addr, Port)
              end,
    case lists:foldl(TryAddr, ?SOCKET_DEFAULT_RESULT, AddrList) of
        {ok, Socket} ->
            Version = case StateData#state.use_v10 of
                          true -> <<" version='1.0'">>;
                          false -> <<"">>
                      end,
            NewStateData = StateData#state{socket = Socket,
                                           tls_enabled = false,
                                           streamid = new_id()},
            send_text(NewStateData, ?STREAM_HEADER(MyName, Server, Version)),
            {next_state, wait_for_stream, NewStateData, ?FSMTIMEOUT};
        {error, Reason} ->
            ?LOG_WARNING(#{what => s2s_out_failed, reason => Reason,
                           text => <<"Outgoing s2s connection failed (remote server not found)">>,
                           myname => MyName, server => Server}),
            case mongoose_hooks:find_s2s_bridge(MyName, Server) of
                {Mod, Fun, BridgeType} ->
                    ?LOG_INFO(#{what => s2s_out_bridge_found,
                                text => <<"Found a bridge, relay_to_bridge next.">>,
                                type => BridgeType,
                                myname => MyName, server => Server}),
                    {next_state, relay_to_bridge, StateData#state{bridge = {Mod, Fun}}};
                _ ->
                    wait_before_reconnect(StateData)
            end
    end;
open_socket(closed, StateData) ->
    ?CLOSE_GENERIC(open_socket, closed, StateData);
open_socket(timeout, StateData) ->
    ?CLOSE_GENERIC(open_socket, timeout, StateData);
open_socket(_OtherEvent, StateData) ->
    %% Any other event is ignored while the socket is not open yet.
    {next_state, open_socket, StateData}.
290
%% Try to connect to a single address of the remote server.
%% Type is the address family (inet | inet6) and is passed on as a socket
%% option. Returns {ok, Socket} on success and {error, Reason} otherwise.
%% Any exception raised by the connect call (error, exit or throw) is logged
%% with its class and stacktrace and converted to {error, Reason}, so that
%% the caller can move on to the next address.
-spec open_socket2(Type :: inet | inet6,
                   Addr :: inet:ip_address(),
                   Port :: inet:port_number()) -> {'error', _} | {'ok', _}.
open_socket2(Type, Addr, Port) ->
    ?LOG_DEBUG(#{what => s2s_out_connecting,
                 address => Addr, port => Port}),
    Timeout = outgoing_s2s_timeout(),
    SockOpts = [binary,
                {packet, 0},
                {send_timeout, ?TCP_SEND_TIMEOUT},
                {send_timeout_close, true},
                {active, false},
                Type],
    %% Only the connect call itself is protected by the try.
    try ejabberd_socket:connect(Addr, Port, SockOpts, Timeout) of
        {ok, _Socket} = R ->
            R;
        {error, Reason} = R ->
            ?LOG_DEBUG(#{what => s2s_out_failed,
                         address => Addr, port => Port, reason => Reason}),
            R
    catch
        Class:Reason:Stacktrace ->
            ?LOG_DEBUG(#{what => s2s_out_failed,
                         text => <<"Failed to open s2s socket because of crashing">>,
                         address => Addr, port => Port,
                         class => Class, reason => Reason,
                         stacktrace => Stacktrace}),
            {error, Reason}
    end.
317
318 %%----------------------------------------------------------------------
319
%% State wait_for_stream: the local stream header has been sent and the
%% remote stream header is awaited. Depending on the namespaces and version
%% announced by the remote side, the FSM either starts dialback right away or
%% waits for stream features.
-spec wait_for_stream(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData0) ->
    StateData = StateData0#state{remote_streamid = xml:get_attr_s(<<"id">>, Attrs)},
    UseV10 = StateData#state.use_v10,
    StreamNS = xml:get_attr_s(<<"xmlns">>, Attrs),
    DialbackNS = xml:get_attr_s(<<"xmlns:db">>, Attrs),
    RemoteIsV10 = xml:get_attr_s(<<"version">>, Attrs) == <<"1.0">>,
    case {StreamNS, DialbackNS, RemoteIsV10} of
        {<<"jabber:server">>, <<"jabber:server:dialback">>, false} ->
            send_db_request(StateData);
        {<<"jabber:server">>, <<"jabber:server:dialback">>, true} when UseV10 ->
            {next_state, wait_for_features, StateData, ?FSMTIMEOUT};
        %% Clause added to handle Tigase's workaround for an old ejabberd bug:
        {<<"jabber:server">>, <<"jabber:server:dialback">>, true} when not UseV10 ->
            send_db_request(StateData);
        {<<"jabber:server">>, <<"">>, true} when UseV10 ->
            %% No dialback namespace announced: remember that dialback is off.
            {next_state, wait_for_features,
             StateData#state{db_enabled = false}, ?FSMTIMEOUT};
        {NSProvided, DB, _} ->
            send_text(StateData, exml:to_binary(mongoose_xmpp_errors:invalid_namespace())),
            ?LOG_INFO(#{what => s2s_out_closing,
                        text => <<"Closing s2s connection: (invalid namespace)">>,
                        namespace_provided => NSProvided,
                        namespace_expected => <<"jabber:server">>,
                        xmlnsdb_provided => DB,
                        all_attributes => Attrs,
                        myname => StateData#state.myname, server => StateData#state.server}),
            {stop, normal, StateData}
    end;
wait_for_stream({xmlstreamerror, _}, StateData) ->
    NotWellFormed = mongoose_xmpp_errors:xml_not_well_formed_bin(),
    send_text(StateData, <<NotWellFormed/binary, (?STREAM_TRAILER)/binary>>),
    ?CLOSE_GENERIC(wait_for_stream, xmlstreamerror, StateData);
wait_for_stream({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(wait_for_stream, xmlstreamend, StateData);
wait_for_stream(timeout, StateData) ->
    ?CLOSE_GENERIC(wait_for_stream, timeout, StateData);
wait_for_stream(closed, StateData) ->
    ?CLOSE_GENERIC(wait_for_stream, closed, StateData).
359
360
%% State wait_for_validation: a dialback request has been sent and the
%% db:result / db:verify answer of the remote server is awaited.
%% A valid result establishes the stream, unless TLS is required but was not
%% negotiated; a verify answer is forwarded to the process recorded in the
%% verify field of the state.
-spec wait_for_validation(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_for_validation({xmlstreamelement, El}, StateData) ->
    case is_verify_res(El) of
        {result, To, From, Id, Type} ->
            ?LOG_DEBUG(#{what => s2s_receive_result,
                         from => From, to => To, message_id => Id, type => Type}),
            #state{tls_enabled = Enabled, tls_required = Required} = StateData,
            case Type of
                <<"valid">> when Enabled == true; Required == false ->
                    %% Flush everything queued while the stream was being set up.
                    send_queue(StateData, StateData#state.queue),
                    ?LOG_INFO(#{what => s2s_out_connected,
                                text => <<"New outgoing s2s connection established">>,
                                tls_enabled => StateData#state.tls_enabled,
                                myname => StateData#state.myname, server => StateData#state.server}),
                    mongoose_hooks:s2s_connect_hook(StateData#state.myname,
                                                    StateData#state.server),
                    {next_state, stream_established,
                     StateData#state{queue = queue:new()}};
                <<"valid">> when Enabled == false, Required == true ->
                    %% TODO: bounce packets
                    ?CLOSE_GENERIC(wait_for_validation, tls_required_but_unavailable, El, StateData);
                _ ->
                    %% TODO: bounce packets
                    ?CLOSE_GENERIC(wait_for_validation, invalid_dialback_key, El, StateData)
            end;
        {verify, To, From, Id, Type} ->
            ?LOG_DEBUG(#{what => s2s_receive_verify,
                         from => From, to => To, message_id => Id, type => Type}),
            case StateData#state.verify of
                false ->
                    %% TODO: Shouldn't we close the connection here?
                    ok;
                {Pid, _Key, _SID} ->
                    send_event(Type, Pid, StateData)
            end,
            {next_state, wait_for_validation, StateData,
             get_timeout_interval(wait_for_validation)};
        _ ->
            {next_state, wait_for_validation, StateData, ?FSMTIMEOUT*3}
    end;
wait_for_validation({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(wait_for_validation, xmlstreamend, StateData);
wait_for_validation({xmlstreamerror, _}, StateData) ->
    NotWellFormed = mongoose_xmpp_errors:xml_not_well_formed_bin(),
    send_text(StateData, <<NotWellFormed/binary, (?STREAM_TRAILER)/binary>>),
    ?CLOSE_GENERIC(wait_for_validation, xmlstreamerror, StateData);
wait_for_validation(timeout, #state{verify = {VPid, VKey, SID}} = StateData)
  when is_pid(VPid), is_binary(VKey), is_binary(SID) ->
    %% This is an auxiliary s2s connection for dialback.
    %% This timeout is normal and doesn't represent a problem.
    ?LOG_INFO(#{what => s2s_out_validation_timeout,
                text => <<"Timeout in verify outgoing s2s connection. Stopping">>,
                myname => StateData#state.myname, server => StateData#state.server}),
    {stop, normal, StateData};
wait_for_validation(timeout, StateData) ->
    ?LOG_INFO(#{what => s2s_out_connect_timeout,
                text => <<"Connection timeout in outgoing s2s connection. Stopping">>,
                myname => StateData#state.myname, server => StateData#state.server}),
    {stop, normal, StateData};
wait_for_validation(closed, StateData) ->
    ?LOG_INFO(#{what => s2s_out_validation_closed,
                text => <<"Connection closed when waiting for validation in outgoing s2s connection. Stopping">>,
                myname => StateData#state.myname, server => StateData#state.server}),
    {stop, normal, StateData}.
428
429
%% State wait_for_features: the remote server announced XMPP 1.0 and its
%% <stream:features/> element is awaited. The children are folded into a
%% {SASLExternal, StartTLS, StartTLSRequired} triple, which is then handed to
%% handle_parsed_features/1 together with the state.
-spec wait_for_features(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_for_features({xmlstreamelement, El}, StateData) ->
    case El of
        #xmlel{name = <<"stream:features">>, children = Els} ->
            CollectFeature =
                fun(#xmlel{name = <<"mechanisms">>, attrs = MechAttrs,
                           children = MechEls}, Acc) ->
                        MechNS = xml:get_attr_s(<<"xmlns">>, MechAttrs),
                        get_acc_with_new_sext(MechNS, MechEls, Acc);
                   (#xmlel{name = <<"starttls">>, attrs = TLSAttrs} = TLSEl, Acc) ->
                        TLSNS = xml:get_attr_s(<<"xmlns">>, TLSAttrs),
                        get_acc_with_new_tls(TLSNS, TLSEl, Acc);
                   (_OtherChild, Acc) ->
                        Acc
                end,
            {SASLEXT, StartTLS, StartTLSRequired} =
                lists:foldl(CollectFeature, {false, false, false}, Els),
            handle_parsed_features({SASLEXT, StartTLS, StartTLSRequired, StateData});
        _ ->
            BadFormat = mongoose_xmpp_errors:bad_format_bin(),
            send_text(StateData, <<BadFormat/binary, (?STREAM_TRAILER)/binary>>),
            ?CLOSE_GENERIC(wait_for_features, bad_format, El, StateData)
    end;
wait_for_features({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(wait_for_features, xmlstreamend, StateData);
wait_for_features({xmlstreamerror, _}, StateData) ->
    NotWellFormed = mongoose_xmpp_errors:xml_not_well_formed_bin(),
    send_text(StateData, <<NotWellFormed/binary, (?STREAM_TRAILER)/binary>>),
    ?CLOSE_GENERIC(wait_for_features, xmlstreamerror, StateData);
wait_for_features(timeout, StateData) ->
    ?CLOSE_GENERIC(wait_for_features, timeout, StateData);
wait_for_features(closed, StateData) ->
    ?CLOSE_GENERIC(wait_for_features, closed, StateData).
463
464
%% State wait_for_auth_result: a SASL request has been sent and the
%% <success/> or <failure/> answer is awaited. Success restarts the stream
%% as authenticated; failure closes the socket and moves to reopen_socket.
%% Anything else is answered with a bad-format stream error and the
%% connection is closed.
-spec wait_for_auth_result(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_for_auth_result({xmlstreamelement, El}, StateData) ->
    %% Shared reaction to every unexpected element or namespace.
    RejectBadFormat =
        fun() ->
                send_text(StateData,
                          <<(mongoose_xmpp_errors:bad_format_bin())/binary,
                            (?STREAM_TRAILER)/binary>>),
                ?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
        end,
    case El of
        #xmlel{name = <<"success">>, attrs = Attrs} ->
            case xml:get_attr_s(<<"xmlns">>, Attrs) of
                ?NS_SASL ->
                    ?LOG_DEBUG(#{what => s2s_auth_success,
                                 myname => StateData#state.myname,
                                 server => StateData#state.server}),
                    send_text(StateData,
                              ?STREAM_HEADER(StateData#state.myname, StateData#state.server,
                                             <<" version='1.0'">>)),
                    AuthedState = StateData#state{streamid = new_id(),
                                                  authenticated = true},
                    {next_state, wait_for_stream, AuthedState, ?FSMTIMEOUT};
                _ ->
                    RejectBadFormat()
            end;
        #xmlel{name = <<"failure">>, attrs = Attrs} ->
            case xml:get_attr_s(<<"xmlns">>, Attrs) of
                ?NS_SASL ->
                    ?LOG_INFO(#{what => s2s_auth_failure,
                                text => <<"Received failure result in ejabberd_s2s_out. Restarting">>,
                                myname => StateData#state.myname,
                                server => StateData#state.server}),
                    ejabberd_socket:close(StateData#state.socket),
                    {next_state, reopen_socket,
                     StateData#state{socket = undefined}, ?FSMTIMEOUT};
                _ ->
                    RejectBadFormat()
            end;
        _ ->
            RejectBadFormat()
    end;
wait_for_auth_result({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(wait_for_auth_result, xmlstreamend, StateData);
wait_for_auth_result({xmlstreamerror, _}, StateData) ->
    NotWellFormed = mongoose_xmpp_errors:xml_not_well_formed_bin(),
    send_text(StateData, <<NotWellFormed/binary, (?STREAM_TRAILER)/binary>>),
    ?CLOSE_GENERIC(wait_for_auth_result, xmlstreamerror, StateData);
wait_for_auth_result(timeout, StateData) ->
    ?CLOSE_GENERIC(wait_for_auth_result, timeout, StateData);
wait_for_auth_result(closed, StateData) ->
    ?CLOSE_GENERIC(wait_for_auth_result, closed, StateData).
519
520
%% State wait_for_starttls_proceed: <starttls/> has been sent and the
%% <proceed/> answer is awaited. On proceed the socket is upgraded to TLS, a
%% new stream header is sent and the FSM waits for the remote stream again.
%% Every other element closes the connection; the close is logged under this
%% state's own name.
-spec wait_for_starttls_proceed(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_for_starttls_proceed({xmlstreamelement, El}, StateData) ->
    case El of
        #xmlel{name = <<"proceed">>, attrs = Attrs} ->
            case xml:get_attr_s(<<"xmlns">>, Attrs) of
                ?NS_TLS ->
                    ?LOG_DEBUG(#{what => s2s_starttls,
                                 myname => StateData#state.myname,
                                 server => StateData#state.server}),
                    Socket = StateData#state.socket,
                    TLSOpts = get_tls_opts_with_certfile(StateData),
                    TLSOpts2 = get_tls_opts_with_ciphers(TLSOpts),
                    TLSSocket = ejabberd_socket:starttls(Socket, TLSOpts2),
                    NewStateData = StateData#state{socket = TLSSocket,
                                                   streamid = new_id(),
                                                   tls_enabled = true,
                                                   tls_options = TLSOpts2
                                                  },
                    %% The stream is restarted on top of the TLS socket.
                    send_text(NewStateData,
                              ?STREAM_HEADER(StateData#state.myname, StateData#state.server,
                                             <<" version='1.0'">>)),
                    {next_state, wait_for_stream, NewStateData, ?FSMTIMEOUT};
                _ ->
                    send_text(StateData,
                              <<(mongoose_xmpp_errors:bad_format_bin())/binary,
                                (?STREAM_TRAILER)/binary>>),
                    ?CLOSE_GENERIC(wait_for_starttls_proceed, bad_format, El, StateData)
            end;
        _ ->
            %% NOTE(review): unlike the branch above, no stream error is sent
            %% to the peer here before closing - confirm this is intended.
            ?CLOSE_GENERIC(wait_for_starttls_proceed, bad_format, El, StateData)
    end;
wait_for_starttls_proceed({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(wait_for_starttls_proceed, xmlstreamend, StateData);
wait_for_starttls_proceed({xmlstreamerror, _}, StateData) ->
    send_text(StateData,
              <<(mongoose_xmpp_errors:xml_not_well_formed_bin())/binary, (?STREAM_TRAILER)/binary>>),
    ?CLOSE_GENERIC(wait_for_starttls_proceed, xmlstreamerror, StateData);
wait_for_starttls_proceed(timeout, StateData) ->
    ?CLOSE_GENERIC(wait_for_starttls_proceed, timeout, StateData);
wait_for_starttls_proceed(closed, StateData) ->
    ?CLOSE_GENERIC(wait_for_starttls_proceed, closed, StateData).
562
563
%% State reopen_socket: the socket was closed after a SASL failure and the
%% FSM waits for the closed event before connecting again. Stream items that
%% still arrive from the old socket are ignored.
-spec reopen_socket(ejabberd:xml_stream_item(), state()) -> fsm_return().
reopen_socket({StreamItem, _}, StateData)
  when StreamItem =:= xmlstreamelement;
       StreamItem =:= xmlstreamend;
       StreamItem =:= xmlstreamerror ->
    {next_state, reopen_socket, StateData, ?FSMTIMEOUT};
reopen_socket(timeout, StateData) ->
    ?CLOSE_GENERIC(reopen_socket, timeout, StateData);
reopen_socket(closed, StateData) ->
    %% The old socket is gone: trigger a fresh connection attempt.
    p1_fsm:send_event(self(), init),
    {next_state, open_socket, StateData, ?FSMTIMEOUT}.
576
577
%% @doc This state is used to avoid reconnecting too often to bad sockets.
%% Every event is ignored and the FSM simply stays in this state.
-spec wait_before_retry(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_before_retry(_IgnoredEvent, StateData) ->
    {next_state, wait_before_retry, StateData, ?FSMTIMEOUT}.
582
583
%% State relay_to_bridge: no socket could be opened but a bridge was found.
%% The stop event moves the FSM into the reconnect-delay path, closed stops
%% it, and any other event leaves it in this state.
-spec relay_to_bridge(ejabberd:xml_stream_item(), state()) -> fsm_return().
relay_to_bridge(stop, StateData) ->
    wait_before_reconnect(StateData);
relay_to_bridge(closed, StateData) ->
    ?CLOSE_GENERIC(relay_to_bridge, closed, StateData);
relay_to_bridge(_OtherEvent, StateData) ->
    {next_state, relay_to_bridge, StateData}.
591
592
%% State stream_established: the stream is up. Incoming elements are only
%% inspected for dialback verify answers, which are forwarded to the process
%% recorded in the verify field; everything else is ignored.
-spec stream_established(ejabberd:xml_stream_item(), state()) -> fsm_return().
stream_established({xmlstreamelement, El}, StateData) ->
    #state{myname = MyName, server = Server} = StateData,
    ?LOG_DEBUG(#{what => s2s_out_stream_established, exml_packet => El,
                 myname => MyName, server => Server}),
    case is_verify_res(El) of
        {verify, VTo, VFrom, VId, VType} ->
            ?LOG_DEBUG(#{what => s2s_recv_verify,
                         to => VTo, from => VFrom, message_id => VId, type => VType,
                         myname => MyName, server => Server}),
            case StateData#state.verify of
                {VPid, _VKey, _SID} -> send_event(VType, VPid, StateData);
                _ -> ok
            end;
        _ ->
            ok
    end,
    {next_state, stream_established, StateData};
stream_established({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(stream_established, xmlstreamend, StateData);
stream_established({xmlstreamerror, _}, StateData) ->
    NotWellFormed = mongoose_xmpp_errors:xml_not_well_formed_bin(),
    send_text(StateData, <<NotWellFormed/binary, (?STREAM_TRAILER)/binary>>),
    ?CLOSE_GENERIC(stream_established, xmlstreamerror, StateData);
stream_established(timeout, StateData) ->
    ?CLOSE_GENERIC(stream_established, timeout, StateData);
stream_established(closed, StateData) ->
    ?CLOSE_GENERIC(stream_established, closed, StateData).
622
623
624 %%----------------------------------------------------------------------
625 %% Func: StateName/3
626 %% Returns: {next_state, NextStateName, NextStateData} |
627 %% {next_state, NextStateName, NextStateData, Timeout} |
628 %% {reply, Reply, NextStateName, NextStateData} |
629 %% {reply, Reply, NextStateName, NextStateData, Timeout} |
630 %% {stop, Reason, NewStateData} |
631 %% {stop, Reason, Reply, NewStateData}
632 %%----------------------------------------------------------------------
633 %%state_name(Event, From, StateData) ->
634 %% Reply = ok,
635 %% {reply, Reply, state_name, StateData}.
636
637 %%----------------------------------------------------------------------
638 %% Func: handle_event/3
639 %% Returns: {next_state, NextStateName, NextStateData} |
640 %% {next_state, NextStateName, NextStateData, Timeout} |
641 %% {stop, Reason, NewStateData}
642 %%----------------------------------------------------------------------
%% All-state events are ignored: the FSM stays in its current state and the
%% state timeout is re-armed with the interval configured for that state.
handle_event(_Event, StateName, StateData) ->
    Timeout = get_timeout_interval(StateName),
    {next_state, StateName, StateData, Timeout}.
645
646 %%----------------------------------------------------------------------
647 %% Func: handle_sync_event/4
648 %% Returns: The associated StateData for this connection
649 %% {reply, Reply, NextStateName, NextStateData}
650 %% Reply = {state_infos, [{InfoName::atom(), InfoValue::any()}]}
651 %%----------------------------------------------------------------------
%% get_state_infos: reply with {state_infos, Infos}, a property list that
%% describes this connection. The peer address is best effort: when the
%% socket is missing, or peername returns anything other than
%% {ok, {Addr, Port}}, both addr and port are reported as unknown instead of
%% crashing the connection process.
handle_sync_event(get_state_infos, _From, StateName, StateData) ->
    {Addr, Port} = try ejabberd_socket:peername(StateData#state.socket) of
                       {ok, {A, P}} -> {A, P};
                       %% e.g. {error, Reason} for a socket that is already closed
                       _ -> {unknown, unknown}
                   catch
                       _:_ ->
                           {unknown, unknown}
                   end,
    Infos = [
             {direction, out},
             {statename, StateName},
             {addr, Addr},
             {port, Port},
             {streamid, StateData#state.streamid},
             {use_v10, StateData#state.use_v10},
             {tls, StateData#state.tls},
             {tls_required, StateData#state.tls_required},
             {tls_enabled, StateData#state.tls_enabled},
             {tls_options, StateData#state.tls_options},
             {authenticated, StateData#state.authenticated},
             {db_enabled, StateData#state.db_enabled},
             {try_auth, StateData#state.try_auth},
             {myname, StateData#state.myname},
             {server, StateData#state.server},
             {delay_to_retry, StateData#state.delay_to_retry},
             {verify, StateData#state.verify}
            ],
    Reply = {state_infos, Infos},
    {reply, Reply, StateName, StateData};

%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Returns: {next_state, NextStateName, NextStateData}            |
%%          {next_state, NextStateName, NextStateData, Timeout}   |
%%          {reply, Reply, NextStateName, NextStateData}          |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                          |
%%          {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
%% Any other synchronous event is acknowledged with ok and the state timeout
%% is re-armed.
handle_sync_event(_Event, _From, StateName, StateData) ->
    Reply = ok,
    {reply, Reply, StateName, StateData, get_timeout_interval(StateName)}.
693
694
%% Hot code upgrade callback: state name and state data are kept unchanged.
code_change(_OldVsn, CurrentStateName, CurrentStateData, _Extra) ->
    {ok, CurrentStateName, CurrentStateData}.
697
698 %%----------------------------------------------------------------------
699 %% Func: handle_info/3
700 %% Returns: {next_state, NextStateName, NextStateData} |
701 %% {next_state, NextStateName, NextStateData, Timeout} |
702 %% {stop, Reason, NewStateData}
703 %%----------------------------------------------------------------------
%% Raw messages sent to the connection process.
%%   {send_text, Text}        - deprecated: write Text to the socket
%%   {send_element, Acc, El}  - outgoing stanza; sent, bounced, relayed to a
%%                              bridge or queued depending on the state
%%   {timeout, Timer, _}      - the timer stored in the state has fired
%%   terminate_if_waiting_before_retry
%%                            - stop, but only while waiting to retry
handle_info({send_text, Text}, StateName, StateData) ->
    ?LOG_ERROR(#{what => s2s_send_text, text => <<"Deprecated ejabberd_s2s_out send_text">>,
                 myname => StateData#state.myname, server => StateData#state.server,
                 send_text => Text}),
    send_text(StateData, Text),
    cancel_timer(StateData#state.timer),
    Timer = erlang:start_timer(ejabberd_s2s:timeout(), self(), []),
    {next_state, StateName, StateData#state{timer = Timer},
     get_timeout_interval(StateName)};
handle_info({send_element, _Acc, El}, stream_established = StateName, StateData) ->
    %% Restart the timer, then send the stanza.
    cancel_timer(StateData#state.timer),
    Timer = erlang:start_timer(ejabberd_s2s:timeout(), self(), []),
    send_element(StateData, El),
    {next_state, StateName, StateData#state{timer = Timer}};
handle_info({send_element, Acc, El}, wait_before_retry = StateName, StateData) ->
    %% In this state we bounce all message: We are waiting before
    %% trying to reconnect
    bounce_element(Acc, El, mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"From s2s">>)),
    {next_state, StateName, StateData};
handle_info({send_element, Acc, El}, relay_to_bridge = StateName, StateData) ->
    %% In this state we relay all outbound messages
    %% to a foreign protocol bridge such as SMTP, SIP, etc.
    {Mod, Fun} = StateData#state.bridge,
    ?LOG_DEBUG(#{what => s2s_relay_stanza,
                 text => <<"Relaying stanza to bridge">>,
                 bridge_module => Mod, bridge_fun => Fun, acc => Acc,
                 myname => StateData#state.myname, server => StateData#state.server}),
    %% NOTE(review): with the old-style catch only errors and exits count as
    %% a failure; a value thrown by the bridge is treated as success.
    case catch Mod:Fun(El) of
        {'EXIT', Reason} ->
            ?LOG_ERROR(#{what => s2s_relay_to_bridge_failed,
                         bridge_module => Mod, bridge_fun => Fun,
                         reason => Reason, acc => Acc,
                         myname => StateData#state.myname,
                         server => StateData#state.server}),
            bounce_element(Acc, El, mongoose_xmpp_errors:internal_server_error()),
            wait_before_reconnect(StateData);
        _ ->
            {next_state, StateName, StateData}
    end;
handle_info({send_element, Acc, El}, StateName, StateData) ->
    %% The stream is not established yet: keep the stanza for later.
    Q = queue:in({Acc, El}, StateData#state.queue),
    {next_state, StateName, StateData#state{queue = Q},
     get_timeout_interval(StateName)};
handle_info({timeout, Timer, _}, wait_before_retry,
            #state{timer = Timer} = StateData) ->
    ?LOG_INFO(#{what => s2s_reconnect_delay_expired,
                text => <<"Reconnect delay expired: Will now retry to connect when needed.">>,
                myname => StateData#state.myname,
                server => StateData#state.server}),
    {stop, normal, StateData};
handle_info({timeout, Timer, _}, StateName,
            #state{timer = Timer} = StateData) ->
    ?CLOSE_GENERIC(StateName, s2s_out_timeout, StateData);
handle_info(terminate_if_waiting_before_retry, wait_before_retry, StateData) ->
    ?CLOSE_GENERIC(wait_before_retry, terminate_if_waiting_before_retry, StateData);
handle_info(terminate_if_waiting_before_retry, StateName, StateData) ->
    {next_state, StateName, StateData, get_timeout_interval(StateName)};
handle_info(_UnknownMessage, StateName, StateData) ->
    {next_state, StateName, StateData, get_timeout_interval(StateName)}.
766
%%----------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shut down the fsm: unregister the connection when it was
%%          registered, bounce every pending stanza and close the
%%          socket if one is open
%% Returns: ok
%%----------------------------------------------------------------------
terminate(Reason, StateName, StateData) ->
    #state{myname = MyName, server = Server,
           queue = Pending, socket = Socket} = StateData,
    ?LOG_DEBUG(#{what => s2s_out_closed, text => <<"ejabberd_s2s_out terminated">>,
                 reason => Reason, state_name => StateName,
                 myname => MyName, server => Server}),
    case StateData#state.new of
        true ->
            ejabberd_s2s:remove_connection({MyName, Server}, self());
        false ->
            ok
    end,
    BounceError = mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"Bounced by s2s">>),
    %% Stanzas wait in two places: the queue kept in the state and the
    %% Erlang message queue of this process. Bounce both.
    bounce_queue(Pending, BounceError),
    bounce_messages(BounceError),
    Socket =:= undefined orelse ejabberd_socket:close(Socket),
    ok.
794
%%----------------------------------------------------------------------
%% Func: print_state/1
%% Purpose: Produce the term that represents the state in the error log;
%%          the state is returned unchanged
%% Returns: State to print
%%----------------------------------------------------------------------
print_state(FsmState) ->
    FsmState.
802
803 %%%----------------------------------------------------------------------
804 %%% Internal functions
805 %%%----------------------------------------------------------------------
806
%% Write already serialised text to the socket of this connection.
-spec send_text(state(), binary()) -> 'ok'.
send_text(#state{socket = Socket}, Text) ->
    ejabberd_socket:send(Socket, Text).
810
811
%% Serialise an XML element and write it to the socket.
-spec send_element(state(), exml:element()|mongoose_acc:t()) -> 'ok'.
send_element(StateData, #xmlel{} = El) ->
    Serialised = exml:to_binary(El),
    send_text(StateData, Serialised).
815
%% Serialise an XML element, write it to the socket and hand back the
%% accumulator untouched.
-spec send_element(state(), mongoose_acc:t(), exml:element()) -> mongoose_acc:t().
send_element(StateData, Acc, El) ->
    Serialised = exml:to_binary(El),
    send_text(StateData, Serialised),
    Acc.
820
821
%% Send every queued {Acc, El} pair, oldest first.
-spec send_queue(state(), Q :: element_queue()) -> 'ok'.
send_queue(StateData, Q) ->
    lists:foreach(fun({Acc, El}) ->
                          send_element(StateData, Acc, El)
                  end,
                  queue:to_list(Q)).
831
832
%% @doc Bounce a single stanza back to its sender as an error reply.
%% Stanzas of type "error" or "result" are dropped without a reply.
-spec bounce_element(Acc :: mongoose_acc:t(), El :: exml:element(), Error :: exml:element()) -> 'ok'.
bounce_element(Acc, El, Error) ->
    StanzaType = mongoose_acc:stanza_type(Acc),
    IsFinal = StanzaType =:= <<"error">> orelse StanzaType =:= <<"result">>,
    case IsFinal of
        true ->
            ok;
        false ->
            Sender = mongoose_acc:from_jid(Acc),
            Recipient = mongoose_acc:to_jid(Acc),
            {ReplyAcc, Reply} = jlib:make_error_reply(Acc, El, Error),
            %% The reply travels in the opposite direction of the stanza
            ejabberd_router:route(Recipient, Sender, ReplyAcc, Reply)
    end.
845
846
%% Bounce every queued {Acc, El} pair with the given error, oldest first.
-spec bounce_queue(Q :: element_queue(), Error :: exml:element()) -> 'ok'.
bounce_queue(Q, Error) ->
    lists:foreach(fun({Acc, El}) ->
                          bounce_element(Acc, El, Error)
                  end,
                  queue:to_list(Q)).
856
857
%% Generate a fresh identifier by delegating to mongoose_bin.
-spec new_id() -> binary().
new_id() ->
    Id = mongoose_bin:gen_from_crypto(),
    Id.
861
862
%% Cancel a timer and remove its timeout message from the mailbox if it
%% has already been delivered.
-spec cancel_timer(reference()) -> 'ok'.
cancel_timer(TimerRef) ->
    erlang:cancel_timer(TimerRef),
    receive
        {timeout, TimerRef, _Msg} -> ok
    after 0 ->
        ok
    end.
872
873
%% Drain every {send_element, Acc, El} message currently in the mailbox
%% of this process and bounce each one with the given error. Never blocks.
-spec bounce_messages(exml:element()) -> 'ok'.
bounce_messages(BounceError) ->
    receive
        {send_element, Acc, El} ->
            bounce_element(Acc, El, BounceError),
            bounce_messages(BounceError)
    after 0 ->
        ok
    end.
883
884
%% Send the dialback elements for this connection and move on to
%% wait_for_validation.
%%
%% A db:result is sent when the connection is (or has just been)
%% registered with ejabberd_s2s; a db:verify is sent when a verification
%% was requested through the `verify' field. Any exception raised while
%% sending is logged and stops the fsm with reason `normal'.
-spec send_db_request(state()) -> fsm_return().
send_db_request(StateData) ->
    #state{myname = MyName, server = Server} = StateData,
    IsNew = case StateData#state.new of
                true ->
                    true;
                false ->
                    ejabberd_s2s:try_register({MyName, Server})
            end,
    NextStateData = StateData#state{new = IsNew},
    try
        case IsNew of
            true ->
                ResultKey = ejabberd_s2s:key({MyName, Server},
                                             StateData#state.remote_streamid),
                DbResult = #xmlel{name = <<"db:result">>,
                                  attrs = [{<<"from">>, MyName},
                                           {<<"to">>, Server}],
                                  children = [#xmlcdata{content = ResultKey}]},
                send_element(StateData, DbResult);
            false ->
                ok
        end,
        case StateData#state.verify of
            {_Pid, VerifyKey, SID} ->
                DbVerify = #xmlel{name = <<"db:verify">>,
                                  attrs = [{<<"from">>, MyName},
                                           {<<"to">>, Server},
                                           {<<"id">>, SID}],
                                  children = [#xmlcdata{content = VerifyKey}]},
                send_element(StateData, DbVerify);
            false ->
                ok
        end,
        {next_state, wait_for_validation, NextStateData, ?FSMTIMEOUT*6}
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR(#{what => s2s_out_send_db_request_failed,
                         class => Class, reason => Reason, stacktrace => Stacktrace,
                         myname => MyName, server => Server}),
            {stop, normal, NextStateData}
    end.
929
930
%% Classify a dialback element. For db:result and db:verify the tag is
%% returned together with the "to", "from", "id" and "type" attributes
%% (in that order); anything else yields false.
-spec is_verify_res(exml:element()) -> 'false' | {'result', _, _, _, _} | {'verify', _, _, _, _}.
is_verify_res(#xmlel{name = <<"db:result">>, attrs = Attrs}) ->
    To = xml:get_attr_s(<<"to">>, Attrs),
    From = xml:get_attr_s(<<"from">>, Attrs),
    Id = xml:get_attr_s(<<"id">>, Attrs),
    Type = xml:get_attr_s(<<"type">>, Attrs),
    {result, To, From, Id, Type};
is_verify_res(#xmlel{name = <<"db:verify">>, attrs = Attrs}) ->
    To = xml:get_attr_s(<<"to">>, Attrs),
    From = xml:get_attr_s(<<"from">>, Attrs),
    Id = xml:get_attr_s(<<"id">>, Attrs),
    Type = xml:get_attr_s(<<"type">>, Attrs),
    {verify, To, From, Id, Type};
is_verify_res(_Other) ->
    false.
948
949
950 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
951 %% SRV support
952
953 -include_lib("kernel/include/inet.hrl").
954
%% Look up the SRV records of a domain. The domain is first converted to
%% its ASCII form; when that conversion fails no address is returned.
-spec lookup_services(jid:server()) -> [addr()].
lookup_services(Server) ->
    Converted = ejabberd_s2s:domain_utf8_to_ascii(Server),
    case Converted of
        false -> [];
        _ -> do_lookup_services(Converted)
    end.
961
%% Resolve the SRV records of an (ASCII) domain and turn them into a list
%% of addresses sorted by the index computed by calc_addr_index/1.
%%
%% Returns [] when the SRV lookup fails or when a record cannot be
%% indexed. The indexing failure is logged instead of being swallowed
%% silently, and a thrown value can no longer be mistaken for a result.
-spec do_lookup_services(jid:server()) -> [addr()].
do_lookup_services(Server) ->
    case srv_lookup(Server) of
        {error, Reason} ->
            ?LOG_DEBUG(#{what => s2s_srv_lookup_failed,
                         reason => Reason, server => Server}),
            [];
        {ok, #hostent{h_addr_list = AddrList, h_addrtype = Type}} ->
            %% Probabilities are not exactly proportional to weights
            %% for simplicity (higher weights are overvalued)
            try lists:map(fun calc_addr_index/1, AddrList) of
                IndexedAddrs ->
                    Addrs = [#{ip_tuple => Addr, port => Port, type => Type}
                             || {_Index, Addr, Port} <- lists:keysort(1, IndexedAddrs)],
                    ?LOG_DEBUG(#{what => s2s_srv_lookup_success,
                                 addresses => Addrs, server => Server}),
                    Addrs
            catch
                Class:IndexReason ->
                    %% Only the indexing step is protected, exactly as before
                    ?LOG_DEBUG(#{what => s2s_srv_lookup_failed,
                                 class => Class, reason => IndexReason,
                                 server => Server}),
                    []
            end
    end.
984
985
%% SRV lookup using the timeout (in seconds) and the number of retries
%% configured under the s2s_dns option.
-spec srv_lookup(jid:server()) -> {'error', atom()} | {'ok', inet:hostent()}.
srv_lookup(Server) ->
    #{timeout := TimeoutSec, retries := Retries} = mongoose_config:get_opt(s2s_dns),
    TimeoutMs = timer:seconds(TimeoutSec),
    srv_lookup(Server, TimeoutMs, Retries).
990
991
%% @doc Query "_xmpp-server._tcp." and, if that fails, "_jabber._tcp.".
%% Only a timeout of the second query triggers a retry; once the retries
%% are used up `{error, timeout}' is returned.
%%
%% XXX - this behaviour is suboptimal in the case that the domain
%% has a "_xmpp-server._tcp." but not a "_jabber._tcp." record and
%% we don't get a DNS reply for the "_xmpp-server._tcp." lookup. In this
%% case we'll give up when we get the "_jabber._tcp." nxdomain reply.
-spec srv_lookup(jid:server(),
                 Timeout :: non_neg_integer(),
                 Retries :: pos_integer()
                ) -> {'error', atom()} | {'ok', inet:hostent()}.
srv_lookup(_Server, _Timeout, Retries) when Retries < 1 ->
    {error, timeout};
srv_lookup(Server, Timeout, Retries) ->
    Domain = binary_to_list(Server),
    case inet_res:getbyname("_xmpp-server._tcp." ++ Domain, srv, Timeout) of
        {ok, _HEnt} = Found ->
            Found;
        {error, _Reason} ->
            case inet_res:getbyname("_jabber._tcp." ++ Domain, srv, Timeout) of
                {error, timeout} ->
                    ?LOG_ERROR(#{what => s2s_dns_lookup_failed,
                                 text => <<"The DNS servers timed out on request for IN SRV."
                                           " You should check your DNS configuration.">>,
                                 nameserver => inet_db:res_option(nameserver),
                                 server => Server}),
                    srv_lookup(Server, Timeout, Retries - 1);
                Other ->
                    Other
            end
    end.
1017
%% Resolve the server name directly, trying the configured address
%% families in order and stopping at the first one that yields addresses.
%% Every address is paired with the configured outgoing s2s port.
-spec lookup_addrs(jid:server()) -> [addr()].
lookup_addrs(Server) ->
    Port = outgoing_s2s_port(),
    TryType = fun(_Type, [_ | _] = Found) ->
                      Found;
                 (Type, []) ->
                      [#{ip_tuple => Addr, port => Port, type => Type}
                       || Addr <- lookup_addrs(Server, Type)]
              end,
    lists:foldl(TryType, [], outgoing_s2s_types()).
1027
%% Resolve the server name for one address family. A failed resolution
%% is logged at debug level and yields no addresses.
-spec lookup_addrs(jid:lserver(), inet | inet6) -> [inet:ip_address()].
lookup_addrs(Server, Type) ->
    Hostname = binary_to_list(Server),
    case inet:gethostbyname(Hostname, Type) of
        {error, Reason} ->
            ?LOG_DEBUG(#{what => s2s_srv_resolve_failed,
                         type => Type, server => Server, reason => Reason}),
            [];
        {ok, #hostent{h_addr_list = Addrs}} ->
            ?LOG_DEBUG(#{what => s2s_srv_resolve_success,
                         type => Type, server => Server, addresses => Addrs}),
            Addrs
    end.
1040
1041
%% Port used for outgoing s2s connections, read from the
%% [s2s_outgoing, port] option.
-spec outgoing_s2s_port() -> inet:port_number().
outgoing_s2s_port() ->
    OptionPath = [s2s_outgoing, port],
    mongoose_config:get_opt(OptionPath).
1045
1046
%% Address families to try for outgoing connections, in the order given
%% by the [s2s_outgoing, ip_versions] option.
-spec outgoing_s2s_types() -> [inet | inet6, ...].
outgoing_s2s_types() ->
    %% DISCUSSION: Why prefer IPv4 first?
    %%
    %% IPv4 connectivity will be available for everyone for
    %% many years to come. So, there's absolutely no benefit
    %% in preferring IPv6 connections which are flaky at best
    %% nowadays.
    %%
    %% On the other hand content providers hesitate putting up
    %% AAAA records for their sites due to the mentioned
    %% quality of current IPv6 connectivity. Making IPv6 the
    %% `fallback' may avoid these problems elegantly.
    Versions = mongoose_config:get_opt([s2s_outgoing, ip_versions]),
    lists:map(fun ip_version_to_type/1, Versions).
1061
%% Map an IP version number (4 or 6) to the matching inet address family.
ip_version_to_type(Version) when Version =:= 4 -> inet;
ip_version_to_type(Version) when Version =:= 6 -> inet6.
1064
%% Connection timeout for outgoing s2s connections; 10000 is used when
%% the [s2s_outgoing, connection_timeout] option is not set.
-spec outgoing_s2s_timeout() -> non_neg_integer() | infinity.
outgoing_s2s_timeout() ->
    OptionPath = [s2s_outgoing, connection_timeout],
    mongoose_config:get_opt(OptionPath, 10000).
1068
%% @doc Human readable S2S logging: only new outgoing connections are
%% logged (at INFO level). A first argument of `false' marks a dialback
%% connection, which is not logged.
log_s2s_out(false, _Myname, _Server, _Tls) ->
    ok;
log_s2s_out(_New, Myname, Server, Tls) ->
    %% Log new outgoing connections:
    ?LOG_INFO(#{what => s2s_out,
                text => <<"Trying to open s2s connection">>,
                myname => Myname, server => Server, tls => Tls}).
1077
%% @doc Timeout to use for the given fsm state.
%% Returns either an integer or `infinity'.
-spec get_timeout_interval(statename()) -> 'infinity' | non_neg_integer().
get_timeout_interval(wait_for_validation) ->
    %% Validation implies dialback: Networking can take longer:
    ?FSMTIMEOUT*6;
get_timeout_interval(stream_established) ->
    %% When stream is established, we only rely on S2S Timeout timer:
    infinity;
get_timeout_interval(_OtherState) ->
    ?FSMTIMEOUT.
1092
1093
%% @doc This function is intended to be called at the end of a state
%% function that wants to wait for a reconnect delay before stopping.
%%
%% Every pending stanza (state queue and process mailbox) is bounced with
%% remote-server-not-found, the current timer is cancelled and a new
%% timer is started for the retry delay. The fsm then moves to
%% wait_before_retry with an empty queue.
-spec wait_before_reconnect(state()) -> fsm_return().
wait_before_reconnect(StateData) ->
    E = mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"From s2s (waiting)">>),
    %% Stanzas wait both in the state queue and in the Erlang message queue
    bounce_queue(StateData#state.queue, E),
    bounce_messages(E),
    cancel_timer(StateData#state.timer),
    Delay = case StateData#state.delay_to_retry of
                undefined_delay ->
                    %% The initial delay is a random integer between 1000
                    %% and 15000 milliseconds (both included). It is drawn
                    %% from the RNG rather than derived from the clock.
                    rand:uniform(14001) + 999;
                PreviousDelay ->
                    %% Double the delay with each successive failed
                    %% reconnection attempt, but don't exceed the max
                    min(PreviousDelay * 2, get_max_retry_delay())
            end,
    Timer = erlang:start_timer(Delay, self(), []),
    {next_state, wait_before_retry, StateData#state{timer = Timer,
                                                    delay_to_retry = Delay,
                                                    queue = queue:new()}}.
1118
1119
%% @doc Maximum delay between reconnection attempts, in milliseconds.
%% The s2s_max_retry_delay option holds the value in seconds;
%% ?MAX_RETRY_DELAY is used when the option is not set.
get_max_retry_delay() ->
    Seconds = mongoose_config:get_opt(s2s_max_retry_delay, ?MAX_RETRY_DELAY),
    Seconds * 1000.
1125
1126
%% @doc Ask every s2s_out connection registered for {From, To} to
%% terminate if it is currently in state wait_before_retry.
terminate_if_waiting_delay(From, To) ->
    Notify = fun(Pid) ->
                     Pid ! terminate_if_waiting_before_retry
             end,
    lists:foreach(Notify, ejabberd_s2s:get_connections_pids({From, To})).
1136
1137
%% Build the max_queue fsm option from the max_fsm_queue setting; no
%% option is produced when the setting is absent.
-spec fsm_limit_opts() -> [{'max_queue', integer()}].
fsm_limit_opts() ->
    case mongoose_config:lookup_opt(max_fsm_queue) of
        {error, not_found} ->
            [];
        {ok, Limit} ->
            [{max_queue, Limit}]
    end.
1146
%% Collect the addresses to connect to. The sources are consulted in
%% order (configured addresses, SRV records, plain name resolution) and
%% the first non-empty answer wins.
-spec get_addr_list(jid:server()) -> [addr()].
get_addr_list(Server) ->
    Sources = [fun get_predefined_addresses/1,
               fun lookup_services/1,
               fun lookup_addrs/1],
    FirstAnswer = fun(Source, []) -> Source(Server);
                     (_Source, Found) -> Found
                  end,
    lists:foldl(FirstAnswer, [], Sources).
1154
%% @doc Get the address configured for a given s2s domain under the
%% s2s_address option, or [] when the domain has no entry.
-spec get_predefined_addresses(jid:server()) -> [addr()].
get_predefined_addresses(Server) ->
    case mongoose_config:lookup_opt([s2s_address, Server]) of
        {error, not_found} ->
            [];
        {ok, #{ip_address := IPAddress} = Entry} ->
            {ok, IPTuple} = inet:parse_address(IPAddress),
            Port = get_predefined_port(Entry),
            Type = addr_type(IPTuple),
            [#{ip_tuple => IPTuple, port => Port, type => Type}]
    end.
1166
%% Port of a configured address entry; falls back to the general
%% outgoing s2s port when the entry does not define one.
get_predefined_port(#{port := ConfiguredPort}) ->
    ConfiguredPort;
get_predefined_port(_Entry) ->
    outgoing_s2s_port().
1169
%% Address family of an IP tuple: 4 elements is inet, 8 elements is inet6.
addr_type({_, _, _, _}) -> inet;
addr_type({_, _, _, _, _, _, _, _}) -> inet6.
1172
%% Report a validation outcome to the fsm Pid: `valid' for the type
%% <<"valid">>, `invalid' for anything else. The event carries the remote
%% server and the local name of this connection.
send_event(Type, Pid, #state{server = Server, myname = MyName}) ->
    Outcome = case Type of
                  <<"valid">> -> valid;
                  _ -> invalid
              end,
    p1_fsm:send_event(Pid, {Outcome, Server, MyName}).
1183
%% Update the first element of the {SEXT, STLS, STLSReq} accumulator: for
%% the SASL namespace it becomes true when one of the given elements is a
%% "mechanism" whose cdata is EXTERNAL, false otherwise. For any other
%% namespace the accumulator is returned unchanged.
get_acc_with_new_sext(?NS_SASL, Els1, {_SEXT, STLS, STLSReq}) ->
    IsExternal = fun(#xmlel{name = <<"mechanism">>, children = Children}) ->
                         xml:get_cdata(Children) =:= <<"EXTERNAL">>;
                    (_Other) ->
                         false
                 end,
    {lists:any(IsExternal, Els1), STLS, STLSReq};
get_acc_with_new_sext(_NS, _Els, Acc) ->
    Acc.
1198
%% Update the {SEXT, STLS, STLSReq} accumulator for the TLS namespace:
%% STLS becomes true and STLSReq tells whether the element has a
%% "required" child. For any other namespace the accumulator is returned
%% unchanged.
get_acc_with_new_tls(?NS_TLS, El1, {SEXT, _STLS, _STLSReq}) ->
    Required = case xml:get_subtag(El1, <<"required">>) of
                   false -> false;
                   #xmlel{} -> true
               end,
    {SEXT, true, Required};
get_acc_with_new_tls(_NS, _El, Acc) ->
    Acc.
1207
%% TLS options of the connection, with the certfile entry replaced (or
%% added) when a certificate is configured for the local domain under
%% domain_certfile.
get_tls_opts_with_certfile(#state{myname = MyName, tls_options = TLSOpts}) ->
    case mongoose_config:lookup_opt([domain_certfile, MyName]) of
        {ok, CertFile} ->
            lists:keystore(certfile, 1, TLSOpts, {certfile, CertFile});
        {error, not_found} ->
            TLSOpts
    end.
1215
%% Prepend the ciphers configured under s2s_ciphers to the TLS options;
%% the options are returned unchanged when the setting is absent.
get_tls_opts_with_ciphers(TLSOpts) ->
    case mongoose_config:lookup_opt(s2s_ciphers) of
        {ok, Ciphers} ->
            [{ciphers, Ciphers} | TLSOpts];
        {error, not_found} ->
            TLSOpts
    end.
1223
%% Compute the sort index of one SRV record. The index grows with the
%% priority; a non-zero weight subtracts a random amount in
%% [0, Weight + 1) so that heavier records tend to sort first within the
%% same priority. Returns {Index, Host, Port}.
calc_addr_index({Priority, Weight, Port, Host}) ->
    Offset = if
                 Weight =:= 0 -> 0;
                 true -> (Weight + 1) * rand:uniform()
             end,
    Index = Priority * 65536 - Offset,
    {Index, Host, Port}.
1230
%% Decide the next step once the stream features have been parsed.
%% The argument is {SASLExternal, StartTLS, StartTLSRequired, StateData};
%% the clauses are tried in order:
%%   1. already authenticated, no EXTERNAL and no STARTTLS offered:
%%      flush the queue and enter stream_established
%%   2. EXTERNAL offered and still worth trying: send the SASL auth request
%%   3. STARTTLS offered, TLS wanted but not yet enabled: send starttls
%%   4. STARTTLS required but TLS is off: close and reopen the socket
%%   5. dialback enabled: send the dialback request
%%   6. otherwise: close and reopen the socket
handle_parsed_features({false, false, _, #state{authenticated = true} = StateData}) ->
    #state{myname = MyName, server = Server, queue = Pending} = StateData,
    send_queue(StateData, Pending),
    ?LOG_INFO(#{what => s2s_out_connected,
                text => <<"New outgoing s2s connection established">>,
                myname => MyName, server => Server}),
    mongoose_hooks:s2s_connect_hook(MyName, Server),
    {next_state, stream_established, StateData#state{queue = queue:new()}};
handle_parsed_features({true, _, _, #state{try_auth = true, new = New} = StateData})
  when New /= false ->
    AuthzId = #xmlcdata{content = jlib:encode_base64(StateData#state.myname)},
    AuthRequest = #xmlel{name = <<"auth">>,
                         attrs = [{<<"xmlns">>, ?NS_SASL},
                                  {<<"mechanism">>, <<"EXTERNAL">>}],
                         children = [AuthzId]},
    send_element(StateData, AuthRequest),
    {next_state, wait_for_auth_result, StateData#state{try_auth = false}, ?FSMTIMEOUT};
handle_parsed_features({_, true, _, #state{tls = true, tls_enabled = false} = StateData}) ->
    StartTLS = #xmlel{name = <<"starttls">>,
                      attrs = [{<<"xmlns">>, ?NS_TLS}]},
    send_element(StateData, StartTLS),
    {next_state, wait_for_starttls_proceed, StateData, ?FSMTIMEOUT};
handle_parsed_features({_, _, true, #state{tls = false} = StateData}) ->
    ?LOG_DEBUG(#{what => s2s_out_restarted,
                 myname => StateData#state.myname, server => StateData#state.server}),
    ejabberd_socket:close(StateData#state.socket),
    Reopening = StateData#state{socket = undefined, use_v10 = false},
    {next_state, reopen_socket, Reopening, ?FSMTIMEOUT};
handle_parsed_features({_, _, _, #state{db_enabled = true} = StateData}) ->
    send_db_request(StateData);
handle_parsed_features({_, _, _, StateData}) ->
    ?LOG_DEBUG(#{what => s2s_out_restarted,
                 myname => StateData#state.myname, server => StateData#state.server}),
    % TODO: clear message queue
    ejabberd_socket:close(StateData#state.socket),
    Reopening = StateData#state{socket = undefined, use_v10 = false},
    {next_state, reopen_socket, Reopening, ?FSMTIMEOUT}.
Line Hits Source