./ct_report/coverage/ejabberd_s2s_out.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : ejabberd_s2s_out.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Manage outgoing server-to-server connections
5 %%% Created : 6 Dec 2002 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(ejabberd_s2s_out).
27 -author('alexey@process-one.net').
28 -behaviour(p1_fsm).
29
30 %% External exports
31 -export([start/3,
32 start_link/3,
33 start_connection/1,
34 terminate_if_waiting_delay/2,
35 stop_connection/1]).
36
37 %% p1_fsm callbacks (same as gen_fsm)
38 -export([init/1,
39 open_socket/2,
40 wait_for_stream/2,
41 wait_for_validation/2,
42 wait_for_features/2,
43 wait_for_auth_result/2,
44 wait_for_starttls_proceed/2,
45 relay_to_bridge/2,
46 reopen_socket/2,
47 wait_before_retry/2,
48 stream_established/2,
49 handle_event/3,
50 handle_sync_event/4,
51 handle_info/3,
52 terminate/3,
53 print_state/1,
54 code_change/4]).
55
56 -ignore_xref([open_socket/2, print_state/1, relay_to_bridge/2,
57 reopen_socket/2, start_link/3, stream_established/2,
58 wait_before_retry/2, wait_for_auth_result/2,
59 wait_for_features/2, wait_for_starttls_proceed/2, wait_for_stream/2,
60 wait_for_stream/2, wait_for_validation/2]).
61
62 -include("mongoose.hrl").
63 -include("jlib.hrl").
64
%% FSM state of one outgoing s2s connection.
%% The field order defines the record's tuple layout; do not reorder.
-record(state, {
          %% Socket obtained in open_socket/2 (swapped for the TLS socket
          %% after STARTTLS); `undefined' while there is no open socket.
          socket,
          %% Locally generated stream id, renewed whenever a stream header is sent.
          streamid,
          %% Value of the `id' attribute of the remote stream header.
          remote_streamid = <<>>,
          %% Whether version='1.0' is sent in the first stream header;
          %% init/1 sets it to the same value as `tls'.
          use_v10,
          %% STARTTLS may be used (derived from the `use_starttls' option).
          tls = false :: boolean(),
          %% STARTTLS is mandatory (derived from the `use_starttls' option).
          tls_required = false :: boolean(),
          %% Set to `true' once the socket has been upgraded with STARTTLS.
          tls_enabled = false :: boolean(),
          %% Options handed to ejabberd_socket:starttls/2.
          tls_options = [connect] :: list(),
          %% Set to `true' after a SASL <success/> has been received.
          authenticated = false :: boolean(),
          %% Set to `false' when the remote stream header carries no xmlns:db.
          db_enabled = true :: boolean(),
          %% NOTE(review): not modified in the code visible here.
          try_auth = true :: boolean(),
          %% Local domain (`From' in init/1).
          myname,
          %% Remote domain (`Server' in init/1).
          server,
          %% {Acc, El} pairs waiting for the stream to be established.
          queue,
          host_type :: mongooseim:host_type(),
          %% NOTE(review): not modified in the code visible here.
          delay_to_retry = undefined_delay,
          %% `true' when the process was started with Type = new.
          new = false :: boolean(),
          %% Dialback verification data when started with Type = {verify, ...}.
          verify = false :: false | {pid(), Key :: binary(), SID :: binary()},
          %% {Mod, Fun} stored when the find_s2s_bridge hook returns a bridge.
          bridge,
          %% Reference returned by erlang:start_timer/3.
          timer :: reference()
         }).
-type state() :: #state{}.
85
%% Queue of stanzas waiting for the stream to be established. Each entry is
%% the {Acc, El} pair enqueued by handle_info({send_element, Acc, El}, ...).
-type element_queue() :: queue:queue({mongoose_acc:t(), #xmlel{}}).

%% Every state name the FSM callbacks of this module can return.
-type statename() :: open_socket
                   | wait_for_stream
                   | wait_for_features
                   | wait_for_auth_result
                   | wait_for_starttls_proceed
                   | wait_for_validation
                   | reopen_socket
                   | wait_before_retry
                   | relay_to_bridge
                   | stream_established.

%% FSM handler return value
-type fsm_return() :: {'stop', Reason :: 'normal', state()}
                    | {'next_state', statename(), state()}
                    | {'next_state', statename(), state(), Timeout :: integer()}.
99
%% One candidate address of the remote server, as consumed by open_socket/2.
%% (The original referenced the non-existent type inet:ip_adress().)
-type addr() :: #{ip_tuple := inet:ip_address(),
                  port := inet:port_number(),
                  type := inet | inet6}.
103
%% Uncomment to start the FSM with the {debug, [trace]} option.
%%-define(DBGFSM, true).

-ifndef(DBGFSM).
-define(FSMOPTS, []).
-else.
-define(FSMOPTS, [{debug, [trace]}]).
-endif.

%% How start/3 spawns the process: as a child of ejabberd_s2s_out_sup by
%% default, or directly through p1_fsm when NO_TRANSIENT_SUPERVISORS is set.
%% Both expansions rely on the variables From, Host and Type being bound
%% at the place where the macro is used.
-ifndef(NO_TRANSIENT_SUPERVISORS).
-define(SUPERVISOR_START,
        supervisor:start_child(ejabberd_s2s_out_sup, [From, Host, Type])).
-else.
-define(SUPERVISOR_START,
        p1_fsm:start(ejabberd_s2s_out, [From, Host, Type],
                     fsm_limit_opts() ++ ?FSMOPTS)).
-endif.
120
%% Timeout (ms) attached to most `next_state' transitions of this FSM.
-define(FSMTIMEOUT, 30000).

%% We do not block on send anymore.
%% Value (ms) of the `send_timeout' socket option used in open_socket2/4.
-define(TCP_SEND_TIMEOUT, 15000).

%% Opening stream header as a binary. Origin and Target are the `from' and
%% `to' domains; Extra is inserted verbatim before the closing '>'.
-define(STREAM_HEADER(Origin, Target, Extra),
        <<"<?xml version='1.0'?>",
          "<stream:stream "
          "xmlns:stream='http://etherx.jabber.org/streams' "
          "xmlns='jabber:server' "
          "xmlns:db='jabber:server:dialback' "
          "from='", (Origin)/binary, "' ",
          "to='", (Target)/binary, "' ",
          (Extra)/binary, ">">>
       ).

%% Initial accumulator of the connect fold in open_socket/2; it is also the
%% result when the address list is empty.
-define(SOCKET_DEFAULT_RESULT, {error, badarg}).
138
139
%% Log the reason for closing at info level, then stop the FSM normally.
-define(CLOSE_GENERIC(State, Why, Data),
        begin
            ?LOG_INFO(#{what => s2s_out_closing,
                        text => <<"Closing s2s connection">>,
                        state_name => State, reason => Why,
                        myname => Data#state.myname,
                        server => Data#state.server}),
            {stop, normal, Data}
        end).

%% Same as above, additionally logging the stanza that triggered the close.
-define(CLOSE_GENERIC(State, Why, Stanza, Data),
        begin
            ?LOG_INFO(#{what => s2s_out_closing,
                        text => <<"Closing s2s connection on stanza">>,
                        state_name => State, reason => Why,
                        exml_packet => Stanza,
                        myname => Data#state.myname,
                        server => Data#state.server}),
            {stop, normal, Data}
        end).
151
152 %%%----------------------------------------------------------------------
153 %%% API
154 %%%----------------------------------------------------------------------
%% Start an outgoing s2s connection process using ?SUPERVISOR_START.
%% The macro expansion reads the variables From, Host and Type, so the
%% argument names must not be changed.
-spec start(_, _, _) -> {'error', _} | {'ok', 'undefined' | pid()} | {'ok', 'undefined' | pid(), _}.
start(From, Host, Type) ->
    ?SUPERVISOR_START.
158
159
%% Start and link the FSM; init/1 receives [From, Host, Type].
-spec start_link(_, _, _) -> 'ignore' | {'error', _} | {'ok', pid()}.
start_link(From, Host, Type) ->
    FsmOpts = fsm_limit_opts() ++ ?FSMOPTS,
    p1_fsm:start_link(?MODULE, [From, Host, Type], FsmOpts).
164
165
%% Send the `init' event to the FSM; open_socket/2 reacts to it by
%% connecting to the remote server.
start_connection(Pid) ->
    p1_fsm:send_event(Pid, init).
168
169
%% Send the `closed' event to the FSM (the same event the state functions
%% handle in their `closed' clauses).
stop_connection(Pid) ->
    p1_fsm:send_event(Pid, closed).
172
173 %%%----------------------------------------------------------------------
174 %%% Callback functions from p1_fsm
175 %%%----------------------------------------------------------------------
176
177 %%----------------------------------------------------------------------
178 %% Func: init/1
179 %% Returns: {ok, StateName, StateData} |
180 %% {ok, StateName, StateData, Timeout} |
181 %% ignore |
182 %% {stop, StopReason}
183 %%----------------------------------------------------------------------
%% Build the initial state and start in `open_socket'.
%% From/Server are the local and remote domains; Type is either `new' or
%% {verify, Pid, Key, SID}. A verify connection sends itself the `init'
%% event right away.
-spec init([any(), ...]) -> {'ok', 'open_socket', state()}.
init([From, Server, Type]) ->
    process_flag(trap_exit, true),
    ?LOG_DEBUG(#{what => s2s_out_started,
                 text => <<"New outgoing s2s connection">>,
                 from => From, server => Server, type => Type}),
    {ok, HostType} = mongoose_domain_api:get_host_type(From),
    %% Map the use_starttls option onto {tls, tls_required}.
    {UseTLS, RequireTLS} =
        case mongoose_config:get_opt([{s2s, HostType}, use_starttls]) of
            false -> {false, false};
            true -> {true, false};
            optional -> {true, false};
            required -> {true, true};
            required_trusted -> {true, true}
        end,
    {IsNew, Verify} =
        case Type of
            new ->
                {true, false};
            {verify, Pid, Key, SID} ->
                start_connection(self()),
                {false, {Pid, Key, SID}}
        end,
    Timer = erlang:start_timer(ejabberd_s2s:timeout(), self(), []),
    InitialState = #state{use_v10 = UseTLS,
                          tls = UseTLS,
                          tls_required = RequireTLS,
                          tls_options = tls_options(HostType),
                          queue = queue:new(),
                          myname = From,
                          host_type = HostType,
                          server = Server,
                          new = IsNew,
                          verify = Verify,
                          timer = Timer},
    {ok, open_socket, InitialState}.
219
220 %%----------------------------------------------------------------------
221 %% Func: StateName/2
222 %% Returns: {next_state, NextStateName, NextStateData} |
223 %% {next_state, NextStateName, NextStateData, Timeout} |
224 %% {stop, Reason, NewStateData}
225 %%----------------------------------------------------------------------
%% State `open_socket': on `init', try to connect to the remote server and
%% send the stream header. When no address can be connected, ask the
%% find_s2s_bridge hook for a bridge, otherwise wait before reconnecting.
-spec open_socket(_, state()) -> fsm_return().
open_socket(init, #state{host_type = HostType} = StateData) ->
    MyName = StateData#state.myname,
    Server = StateData#state.server,
    log_s2s_out(StateData#state.new, MyName, Server, StateData#state.tls),
    ?LOG_DEBUG(#{what => s2s_open_socket,
                 myname => MyName,
                 server => Server,
                 new => StateData#state.new,
                 verify => StateData#state.verify}),
    AddrList = get_addr_list(HostType, Server),
    %% Addresses are tried in order; after the first successful connect the
    %% remaining ones are skipped. The last error is kept otherwise.
    TryAddr = fun(_AddrInfo, {ok, _} = Done) ->
                      Done;
                 (#{ip_tuple := Addr, port := Port, type := Type}, _LastError) ->
                      open_socket2(HostType, Type, Addr, Port)
              end,
    case lists:foldl(TryAddr, ?SOCKET_DEFAULT_RESULT, AddrList) of
        {ok, Socket} ->
            Version = case StateData#state.use_v10 of
                          true -> <<" version='1.0'">>;
                          false -> <<"">>
                      end,
            Opened = StateData#state{socket = Socket,
                                     tls_enabled = false,
                                     streamid = new_id()},
            send_text(Opened, ?STREAM_HEADER(MyName, Server, Version)),
            {next_state, wait_for_stream, Opened, ?FSMTIMEOUT};
        {error, Reason} ->
            ?LOG_WARNING(#{what => s2s_out_failed, reason => Reason,
                           text => <<"Outgoing s2s connection failed (remote server not found)">>,
                           myname => MyName, server => Server}),
            case mongoose_hooks:find_s2s_bridge(MyName, Server) of
                {Mod, Fun, BridgeType} ->
                    ?LOG_INFO(#{what => s2s_out_bridge_found,
                                text => <<"Found a bridge, relay_to_bridge next.">>,
                                type => BridgeType,
                                myname => MyName, server => Server}),
                    Bridged = StateData#state{bridge = {Mod, Fun}},
                    {next_state, relay_to_bridge, Bridged};
                _ ->
                    wait_before_reconnect(StateData)
            end
    end;
open_socket(closed, StateData) ->
    ?CLOSE_GENERIC(open_socket, closed, StateData);
open_socket(timeout, StateData) ->
    ?CLOSE_GENERIC(open_socket, timeout, StateData);
open_socket(_, StateData) ->
    %% Any other event is ignored in this state.
    {next_state, open_socket, StateData}.
277
%% Try to connect to a single address.
%% Returns {ok, Socket} on success and {error, Reason} when the connect call
%% returns an error or raises an exception. `try' is used instead of the
%% old-style `catch' so that a throw cannot be mistaken for a return value
%% and the exception class/stacktrace are available for logging.
-spec open_socket2(mongooseim:host_type(), inet | inet6, inet:ip_address(), inet:port_number()) ->
          {'error', _} | {'ok', _}.
open_socket2(HostType, Type, Addr, Port) ->
    ?LOG_DEBUG(#{what => s2s_out_connecting,
                 address => Addr, port => Port}),
    Timeout = outgoing_s2s_timeout(HostType),
    SockOpts = [binary,
                {packet, 0},
                {send_timeout, ?TCP_SEND_TIMEOUT},
                {send_timeout_close, true},
                {active, false},
                Type],
    try ejabberd_socket:connect(Addr, Port, SockOpts, Timeout) of
        {ok, _Socket} = Connected ->
            Connected;
        {error, Reason} = Error ->
            ?LOG_DEBUG(#{what => s2s_out_failed,
                         address => Addr, port => Port, reason => Reason}),
            Error
    catch
        Class:Reason:Stacktrace ->
            ?LOG_DEBUG(#{what => s2s_out_failed,
                         text => <<"Failed to open s2s socket because of crashing">>,
                         address => Addr, port => Port,
                         class => Class, reason => Reason,
                         stacktrace => Stacktrace}),
            {error, Reason}
    end.
303
304 %%----------------------------------------------------------------------
305
%% State `wait_for_stream': the stream header has been sent and the remote
%% header is awaited. Depending on the namespaces and version announced by
%% the remote side, either a dialback request is sent or the FSM waits for
%% stream features.
-spec wait_for_stream(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData0) ->
    StateData = StateData0#state{remote_streamid = xml:get_attr_s(<<"id">>, Attrs)},
    NS = xml:get_attr_s(<<"xmlns">>, Attrs),
    NSDB = xml:get_attr_s(<<"xmlns:db">>, Attrs),
    RemoteV10 = xml:get_attr_s(<<"version">>, Attrs) == <<"1.0">>,
    LocalV10 = StateData#state.use_v10,
    case {NS, NSDB, RemoteV10, LocalV10} of
        {<<"jabber:server">>, <<"jabber:server:dialback">>, false, _} ->
            send_db_request(StateData);
        {<<"jabber:server">>, <<"jabber:server:dialback">>, true, true} ->
            {next_state, wait_for_features, StateData, ?FSMTIMEOUT};
        %% Clause added to handle Tigase's workaround for an old ejabberd bug:
        {<<"jabber:server">>, <<"jabber:server:dialback">>, true, false} ->
            send_db_request(StateData);
        {<<"jabber:server">>, <<"">>, true, true} ->
            {next_state, wait_for_features,
             StateData#state{db_enabled = false}, ?FSMTIMEOUT};
        _ ->
            send_text(StateData, exml:to_binary(mongoose_xmpp_errors:invalid_namespace())),
            ?LOG_INFO(#{what => s2s_out_closing,
                        text => <<"Closing s2s connection: (invalid namespace)">>,
                        namespace_provided => NS,
                        namespace_expected => <<"jabber:server">>,
                        xmlnsdb_provided => NSDB,
                        all_attributes => Attrs,
                        myname => StateData#state.myname,
                        server => StateData#state.server}),
            {stop, normal, StateData}
    end;
wait_for_stream({xmlstreamerror, _}, StateData) ->
    NotWellFormed = <<(mongoose_xmpp_errors:xml_not_well_formed_bin())/binary,
                      (?STREAM_TRAILER)/binary>>,
    send_text(StateData, NotWellFormed),
    ?CLOSE_GENERIC(wait_for_stream, xmlstreamerror, StateData);
wait_for_stream({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(wait_for_stream, xmlstreamend, StateData);
wait_for_stream(timeout, StateData) ->
    ?CLOSE_GENERIC(wait_for_stream, timeout, StateData);
wait_for_stream(closed, StateData) ->
    ?CLOSE_GENERIC(wait_for_stream, closed, StateData).
345
346
%% State `wait_for_validation': waits for the dialback result / verify
%% answer from the remote server.
-spec wait_for_validation(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_for_validation({xmlstreamelement, El}, StateData) ->
    case is_verify_res(El) of
        {result, To, From, Id, Type} ->
            ?LOG_DEBUG(#{what => s2s_receive_result,
                         from => From, to => To, message_id => Id, type => Type}),
            TLSEnabled = StateData#state.tls_enabled,
            TLSRequired = StateData#state.tls_required,
            case {Type, TLSEnabled, TLSRequired} of
                {<<"valid">>, Enabled, Required}
                  when Enabled == true; Required == false ->
                    %% Validated: flush the queued stanzas and go live.
                    send_queue(StateData, StateData#state.queue),
                    ?LOG_INFO(#{what => s2s_out_connected,
                                text => <<"New outgoing s2s connection established">>,
                                tls_enabled => StateData#state.tls_enabled,
                                myname => StateData#state.myname,
                                server => StateData#state.server}),
                    mongoose_hooks:s2s_connect_hook(StateData#state.myname,
                                                    StateData#state.server),
                    {next_state, stream_established,
                     StateData#state{queue = queue:new()}};
                {<<"valid">>, Enabled, Required}
                  when Enabled == false, Required == true ->
                    %% TODO: bounce packets
                    ?CLOSE_GENERIC(wait_for_validation, tls_required_but_unavailable,
                                   El, StateData);
                _ ->
                    %% TODO: bounce packets
                    ?CLOSE_GENERIC(wait_for_validation, invalid_dialback_key,
                                   El, StateData)
            end;
        {verify, To, From, Id, Type} ->
            ?LOG_DEBUG(#{what => s2s_receive_verify,
                         from => From, to => To, message_id => Id, type => Type}),
            case StateData#state.verify of
                false ->
                    %% TODO: Shouldn't we close the connection here?
                    ok;
                {Pid, _Key, _SID} ->
                    send_event(Type, Pid, StateData)
            end,
            {next_state, wait_for_validation, StateData,
             get_timeout_interval(wait_for_validation)};
        _ ->
            {next_state, wait_for_validation, StateData, ?FSMTIMEOUT*3}
    end;
wait_for_validation({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(wait_for_validation, xmlstreamend, StateData);
wait_for_validation({xmlstreamerror, _}, StateData) ->
    NotWellFormed = <<(mongoose_xmpp_errors:xml_not_well_formed_bin())/binary,
                      (?STREAM_TRAILER)/binary>>,
    send_text(StateData, NotWellFormed),
    ?CLOSE_GENERIC(wait_for_validation, xmlstreamerror, StateData);
wait_for_validation(timeout, #state{verify = {VPid, VKey, SID}} = StateData)
  when is_pid(VPid), is_binary(VKey), is_binary(SID) ->
    %% This is an auxiliary s2s connection for dialback.
    %% This timeout is normal and doesn't represent a problem.
    ?LOG_INFO(#{what => s2s_out_validation_timeout,
                text => <<"Timeout in verify outgoing s2s connection. Stopping">>,
                myname => StateData#state.myname,
                server => StateData#state.server}),
    {stop, normal, StateData};
wait_for_validation(timeout, StateData) ->
    ?LOG_INFO(#{what => s2s_out_connect_timeout,
                text => <<"Connection timeout in outgoing s2s connection. Stopping">>,
                myname => StateData#state.myname,
                server => StateData#state.server}),
    {stop, normal, StateData};
wait_for_validation(closed, StateData) ->
    ?LOG_INFO(#{what => s2s_out_validation_closed,
                text => <<"Connection closed when waiting for validation in outgoing s2s connection. Stopping">>,
                myname => StateData#state.myname,
                server => StateData#state.server}),
    {stop, normal, StateData}.
414
415
%% State `wait_for_features': expects <stream:features/> and extracts the
%% SASL mechanisms / STARTTLS information from its children. Any other
%% element is answered with a bad-format error and closes the connection.
-spec wait_for_features(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_for_features({xmlstreamelement,
                   #xmlel{name = <<"stream:features">>, children = Children}},
                  StateData) ->
    Collect =
        fun(#xmlel{name = <<"mechanisms">>, attrs = FAttrs, children = FChildren}, Acc) ->
                get_acc_with_new_sext(xml:get_attr_s(<<"xmlns">>, FAttrs), FChildren, Acc);
           (#xmlel{name = <<"starttls">>, attrs = FAttrs} = Feature, Acc) ->
                get_acc_with_new_tls(xml:get_attr_s(<<"xmlns">>, FAttrs), Feature, Acc);
           (_, Acc) ->
                Acc
        end,
    %% Accumulator: {SASL EXTERNAL offered, STARTTLS offered, STARTTLS required}
    {SASLEXT, StartTLS, StartTLSRequired} =
        lists:foldl(Collect, {false, false, false}, Children),
    handle_parsed_features({SASLEXT, StartTLS, StartTLSRequired, StateData});
wait_for_features({xmlstreamelement, El}, StateData) ->
    BadFormat = <<(mongoose_xmpp_errors:bad_format_bin())/binary,
                  (?STREAM_TRAILER)/binary>>,
    send_text(StateData, BadFormat),
    ?CLOSE_GENERIC(wait_for_features, bad_format, El, StateData);
wait_for_features({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(wait_for_features, xmlstreamend, StateData);
wait_for_features({xmlstreamerror, _}, StateData) ->
    NotWellFormed = <<(mongoose_xmpp_errors:xml_not_well_formed_bin())/binary,
                      (?STREAM_TRAILER)/binary>>,
    send_text(StateData, NotWellFormed),
    ?CLOSE_GENERIC(wait_for_features, xmlstreamerror, StateData);
wait_for_features(timeout, StateData) ->
    ?CLOSE_GENERIC(wait_for_features, timeout, StateData);
wait_for_features(closed, StateData) ->
    ?CLOSE_GENERIC(wait_for_features, closed, StateData).
449
450
%% State `wait_for_auth_result': waits for the SASL outcome.
%% <success/> restarts the stream as authenticated, <failure/> closes the
%% socket and moves to `reopen_socket'; anything else (including a wrong
%% namespace) is answered with a bad-format error and closes the connection.
-spec wait_for_auth_result(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_for_auth_result({xmlstreamelement, El}, StateData) ->
    Outcome = case El of
                  #xmlel{name = <<"success">>, attrs = Attrs} ->
                      {success, xml:get_attr_s(<<"xmlns">>, Attrs)};
                  #xmlel{name = <<"failure">>, attrs = Attrs} ->
                      {failure, xml:get_attr_s(<<"xmlns">>, Attrs)};
                  _ ->
                      unexpected
              end,
    case Outcome of
        {success, ?NS_SASL} ->
            ?LOG_DEBUG(#{what => s2s_auth_success,
                         myname => StateData#state.myname,
                         server => StateData#state.server}),
            send_text(StateData,
                      ?STREAM_HEADER(StateData#state.myname, StateData#state.server,
                                     <<" version='1.0'">>)),
            {next_state, wait_for_stream,
             StateData#state{streamid = new_id(), authenticated = true},
             ?FSMTIMEOUT};
        {failure, ?NS_SASL} ->
            ?LOG_INFO(#{what => s2s_auth_failure,
                        text => <<"Received failure result in ejabberd_s2s_out. Restarting">>,
                        myname => StateData#state.myname,
                        server => StateData#state.server}),
            ejabberd_socket:close(StateData#state.socket),
            {next_state, reopen_socket,
             StateData#state{socket = undefined}, ?FSMTIMEOUT};
        _ ->
            send_text(StateData,
                      <<(mongoose_xmpp_errors:bad_format_bin())/binary,
                        (?STREAM_TRAILER)/binary>>),
            ?CLOSE_GENERIC(wait_for_auth_result, bad_format, El, StateData)
    end;
wait_for_auth_result({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(wait_for_auth_result, xmlstreamend, StateData);
wait_for_auth_result({xmlstreamerror, _}, StateData) ->
    NotWellFormed = <<(mongoose_xmpp_errors:xml_not_well_formed_bin())/binary,
                      (?STREAM_TRAILER)/binary>>,
    send_text(StateData, NotWellFormed),
    ?CLOSE_GENERIC(wait_for_auth_result, xmlstreamerror, StateData);
wait_for_auth_result(timeout, StateData) ->
    ?CLOSE_GENERIC(wait_for_auth_result, timeout, StateData);
wait_for_auth_result(closed, StateData) ->
    ?CLOSE_GENERIC(wait_for_auth_result, closed, StateData).
505
506
%% State `wait_for_starttls_proceed': waits for <proceed/> after STARTTLS was
%% requested. On <proceed/> in the TLS namespace the socket is upgraded and a
%% new stream header is sent; anything else closes the connection.
%%
%% The bad-format branches used to log `wait_for_auth_result' as the state
%% name (copy-paste from that state); they now report this state correctly.
-spec wait_for_starttls_proceed(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_for_starttls_proceed({xmlstreamelement, El}, StateData) ->
    case El of
        #xmlel{name = <<"proceed">>, attrs = Attrs} ->
            case xml:get_attr_s(<<"xmlns">>, Attrs) of
                ?NS_TLS ->
                    ?LOG_DEBUG(#{what => s2s_starttls,
                                 myname => StateData#state.myname,
                                 server => StateData#state.server}),
                    Socket = StateData#state.socket,
                    TLSSocket = ejabberd_socket:starttls(Socket, StateData#state.tls_options),
                    NewStateData = StateData#state{socket = TLSSocket,
                                                   streamid = new_id(),
                                                   tls_enabled = true},
                    %% The stream is restarted over the upgraded socket.
                    send_text(NewStateData,
                              ?STREAM_HEADER(StateData#state.myname, StateData#state.server,
                                             <<" version='1.0'">>)),
                    {next_state, wait_for_stream, NewStateData, ?FSMTIMEOUT};
                _ ->
                    send_text(StateData,
                              <<(mongoose_xmpp_errors:bad_format_bin())/binary,
                                (?STREAM_TRAILER)/binary>>),
                    ?CLOSE_GENERIC(wait_for_starttls_proceed, bad_format, El, StateData)
            end;
        _ ->
            %% As in the original, no stream error is sent for a non-<proceed/>
            %% element; the connection is just closed.
            ?CLOSE_GENERIC(wait_for_starttls_proceed, bad_format, El, StateData)
    end;
wait_for_starttls_proceed({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(wait_for_starttls_proceed, xmlstreamend, StateData);
wait_for_starttls_proceed({xmlstreamerror, _}, StateData) ->
    send_text(StateData,
              <<(mongoose_xmpp_errors:xml_not_well_formed_bin())/binary,
                (?STREAM_TRAILER)/binary>>),
    ?CLOSE_GENERIC(wait_for_starttls_proceed, xmlstreamerror, StateData);
wait_for_starttls_proceed(timeout, StateData) ->
    ?CLOSE_GENERIC(wait_for_starttls_proceed, timeout, StateData);
wait_for_starttls_proceed(closed, StateData) ->
    ?CLOSE_GENERIC(wait_for_starttls_proceed, closed, StateData).
544
545
%% State `reopen_socket': entered after the socket was closed following a
%% SASL failure. Stream items are ignored; once `closed' arrives the FSM
%% sends itself `init' and returns to `open_socket'.
-spec reopen_socket(ejabberd:xml_stream_item(), state()) -> fsm_return().
reopen_socket({xmlstreamelement, _El}, StateData) ->
    {next_state, reopen_socket, StateData, ?FSMTIMEOUT};
reopen_socket({xmlstreamend, _Name}, StateData) ->
    {next_state, reopen_socket, StateData, ?FSMTIMEOUT};
reopen_socket({xmlstreamerror, _}, StateData) ->
    {next_state, reopen_socket, StateData, ?FSMTIMEOUT};
reopen_socket(timeout, StateData) ->
    ?CLOSE_GENERIC(reopen_socket, timeout, StateData);
reopen_socket(closed, StateData) ->
    %% start_connection/1 sends the `init' event handled by open_socket/2.
    start_connection(self()),
    {next_state, open_socket, StateData, ?FSMTIMEOUT}.
558
559
%% @doc This state is used to avoid reconnecting too often to bad sockets.
%% Every event is ignored; the FSM simply stays in this state.
-spec wait_before_retry(ejabberd:xml_stream_item(), state()) -> fsm_return().
wait_before_retry(_IgnoredEvent, StateData) ->
    {next_state, wait_before_retry, StateData, ?FSMTIMEOUT}.
564
565
%% State `relay_to_bridge': stanzas are handed to the bridge in
%% handle_info/3. Here `stop' switches to the reconnect delay, `closed'
%% stops the FSM and every other event is ignored.
-spec relay_to_bridge(ejabberd:xml_stream_item(), state()) -> fsm_return().
relay_to_bridge(stop, StateData) ->
    wait_before_reconnect(StateData);
relay_to_bridge(closed, StateData) ->
    ?CLOSE_GENERIC(relay_to_bridge, closed, StateData);
relay_to_bridge(_OtherEvent, StateData) ->
    {next_state, relay_to_bridge, StateData}.
573
574
%% State `stream_established': the stream is usable. Incoming dialback
%% verify answers are forwarded to the verifying process (when this
%% connection has one); all other incoming elements are ignored.
-spec stream_established(ejabberd:xml_stream_item(), state()) -> fsm_return().
stream_established({xmlstreamelement, El}, StateData) ->
    ?LOG_DEBUG(#{what => s2s_out_stream_established, exml_packet => El,
                 myname => StateData#state.myname,
                 server => StateData#state.server}),
    case is_verify_res(El) of
        {verify, VTo, VFrom, VId, VType} ->
            ?LOG_DEBUG(#{what => s2s_recv_verify,
                         to => VTo, from => VFrom, message_id => VId, type => VType,
                         myname => StateData#state.myname,
                         server => StateData#state.server}),
            case StateData#state.verify of
                {VPid, _VKey, _SID} ->
                    send_event(VType, VPid, StateData);
                _NoVerifier ->
                    ok
            end;
        _NotVerify ->
            ok
    end,
    {next_state, stream_established, StateData};
stream_established({xmlstreamend, _Name}, StateData) ->
    ?CLOSE_GENERIC(stream_established, xmlstreamend, StateData);
stream_established({xmlstreamerror, _}, StateData) ->
    NotWellFormed = <<(mongoose_xmpp_errors:xml_not_well_formed_bin())/binary,
                      (?STREAM_TRAILER)/binary>>,
    send_text(StateData, NotWellFormed),
    ?CLOSE_GENERIC(stream_established, xmlstreamerror, StateData);
stream_established(timeout, StateData) ->
    ?CLOSE_GENERIC(stream_established, timeout, StateData);
stream_established(closed, StateData) ->
    ?CLOSE_GENERIC(stream_established, closed, StateData).
604
605
606 %%----------------------------------------------------------------------
607 %% Func: StateName/3
608 %% Returns: {next_state, NextStateName, NextStateData} |
609 %% {next_state, NextStateName, NextStateData, Timeout} |
610 %% {reply, Reply, NextStateName, NextStateData} |
611 %% {reply, Reply, NextStateName, NextStateData, Timeout} |
612 %% {stop, Reason, NewStateData} |
613 %% {stop, Reason, Reply, NewStateData}
614 %%----------------------------------------------------------------------
615 %%state_name(Event, From, StateData) ->
616 %% Reply = ok,
617 %% {reply, Reply, state_name, StateData}.
618
619 %%----------------------------------------------------------------------
620 %% Func: handle_event/3
621 %% Returns: {next_state, NextStateName, NextStateData} |
622 %% {next_state, NextStateName, NextStateData, Timeout} |
623 %% {stop, Reason, NewStateData}
624 %%----------------------------------------------------------------------
%% All-state events are ignored: stay in the current state and re-arm the
%% timeout that get_timeout_interval/1 returns for it.
handle_event(_Event, StateName, StateData) ->
    Timeout = get_timeout_interval(StateName),
    {next_state, StateName, StateData, Timeout}.
627
%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Returns: The associated StateData for this connection
%%   {reply, Reply, NextStateName, NextStateData}
%%   Reply = {state_infos, [{InfoName::atom(), InfoValue::any()}]}
%%----------------------------------------------------------------------
handle_sync_event(get_state_infos, _From, StateName, StateData) ->
    %% The `of' section of a `try' is not protected by its `catch', so a
    %% non-{ok, _} return from peername (e.g. {error, _} on a closed socket)
    %% needs its own clause; otherwise it would crash with try_clause.
    {Addr, Port} = try ejabberd_socket:peername(StateData#state.socket) of
                       {ok, {A, P}} -> {A, P};
                       _ -> {unknown, unknown}
                   catch
                       %% e.g. the socket is still `undefined'
                       _:_ ->
                           {unknown, unknown}
                   end,
    Infos = [
             {direction, out},
             {statename, StateName},
             {addr, Addr},
             {port, Port},
             {streamid, StateData#state.streamid},
             {use_v10, StateData#state.use_v10},
             {tls, StateData#state.tls},
             {tls_required, StateData#state.tls_required},
             {tls_enabled, StateData#state.tls_enabled},
             {tls_options, StateData#state.tls_options},
             {authenticated, StateData#state.authenticated},
             {db_enabled, StateData#state.db_enabled},
             {try_auth, StateData#state.try_auth},
             {myname, StateData#state.myname},
             {server, StateData#state.server},
             {delay_to_retry, StateData#state.delay_to_retry},
             {verify, StateData#state.verify}
            ],
    Reply = {state_infos, Infos},
    {reply, Reply, StateName, StateData};

%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Returns: {next_state, NextStateName, NextStateData} |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {reply, Reply, NextStateName, NextStateData} |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData} |
%%          {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
handle_sync_event(_Event, _From, StateName, StateData) ->
    %% Unknown synchronous events are acknowledged with `ok'.
    Reply = ok,
    {reply, Reply, StateName, StateData, get_timeout_interval(StateName)}.
675
676
%% Code upgrade callback: state name and state data are kept unchanged.
code_change(_OldVsn, CurrentStateName, CurrentStateData, _Extra) ->
    {ok, CurrentStateName, CurrentStateData}.
679
680 %%----------------------------------------------------------------------
681 %% Func: handle_info/3
682 %% Returns: {next_state, NextStateName, NextStateData} |
683 %% {next_state, NextStateName, NextStateData, Timeout} |
684 %% {stop, Reason, NewStateData}
685 %%----------------------------------------------------------------------
%% Handles plain messages sent to the FSM process:
%%   {send_text, Text}          - deprecated raw send
%%   {send_element, Acc, El}    - outgoing stanza; sent, bounced, relayed
%%                                or queued depending on the current state
%%   {timeout, Timer, _}        - expiry of the timer stored in the state
%%   terminate_if_waiting_before_retry
%% Anything else is ignored.
handle_info({send_text, Text}, StateName, StateData) ->
    ?LOG_ERROR(#{what => s2s_send_text, text => <<"Deprecated ejabberd_s2s_out send_text">>,
                 myname => StateData#state.myname, server => StateData#state.server,
                 send_text => Text}),
    send_text(StateData, Text),
    cancel_timer(StateData#state.timer),
    NewTimer = erlang:start_timer(ejabberd_s2s:timeout(), self(), []),
    {next_state, StateName, StateData#state{timer = NewTimer},
     get_timeout_interval(StateName)};
handle_info({send_element, _Acc, El}, stream_established, StateData) ->
    %% Stream is up: restart the timer and send right away.
    cancel_timer(StateData#state.timer),
    NewTimer = erlang:start_timer(ejabberd_s2s:timeout(), self(), []),
    send_element(StateData, El),
    {next_state, stream_established, StateData#state{timer = NewTimer}};
handle_info({send_element, Acc, El}, wait_before_retry, StateData) ->
    %% In this state we bounce all message: We are waiting before
    %% trying to reconnect
    bounce_element(Acc, El,
                   mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"From s2s">>)),
    {next_state, wait_before_retry, StateData};
handle_info({send_element, Acc, El}, relay_to_bridge, StateData) ->
    %% In this state we relay all outbound messages
    %% to a foreign protocol bridge such as SMTP, SIP, etc.
    {Mod, Fun} = StateData#state.bridge,
    ?LOG_DEBUG(#{what => s2s_relay_stanza,
                 text => <<"Relaying stanza to bridge">>,
                 bridge_module => Mod, bridge_fun => Fun, acc => Acc,
                 myname => StateData#state.myname, server => StateData#state.server}),
    case catch Mod:Fun(El) of
        {'EXIT', Reason} ->
            ?LOG_ERROR(#{what => s2s_relay_to_bridge_failed,
                         bridge_module => Mod, bridge_fun => Fun,
                         reason => Reason, acc => Acc,
                         myname => StateData#state.myname,
                         server => StateData#state.server}),
            bounce_element(Acc, El, mongoose_xmpp_errors:internal_server_error()),
            wait_before_reconnect(StateData);
        _ ->
            {next_state, relay_to_bridge, StateData}
    end;
handle_info({send_element, Acc, El}, StateName, StateData) ->
    %% Stream not established yet: keep the stanza until it is.
    Queue = queue:in({Acc, El}, StateData#state.queue),
    {next_state, StateName, StateData#state{queue = Queue},
     get_timeout_interval(StateName)};
handle_info({timeout, Timer, _}, wait_before_retry,
            #state{timer = Timer} = StateData) ->
    ?LOG_INFO(#{what => s2s_reconnect_delay_expired,
                text => <<"Reconnect delay expired: Will now retry to connect when needed.">>,
                myname => StateData#state.myname,
                server => StateData#state.server}),
    {stop, normal, StateData};
handle_info({timeout, Timer, _}, StateName,
            #state{timer = Timer} = StateData) ->
    ?CLOSE_GENERIC(StateName, s2s_out_timeout, StateData);
handle_info(terminate_if_waiting_before_retry, wait_before_retry, StateData) ->
    ?CLOSE_GENERIC(wait_before_retry, terminate_if_waiting_before_retry, StateData);
handle_info(terminate_if_waiting_before_retry, StateName, StateData) ->
    {next_state, StateName, StateData, get_timeout_interval(StateName)};
handle_info(_Unexpected, StateName, StateData) ->
    {next_state, StateName, StateData, get_timeout_interval(StateName)}.
748
749 %%----------------------------------------------------------------------
750 %% Func: terminate/3
751 %% Purpose: Shutdown the fsm
752 %% Returns: any
753 %%----------------------------------------------------------------------
%% Shut the FSM down: unregister `new' connections from ejabberd_s2s,
%% bounce everything still queued (state queue and process mailbox) with
%% remote-server-not-found, and close the socket if there is one.
terminate(Reason, StateName, StateData) ->
    MyName = StateData#state.myname,
    Server = StateData#state.server,
    ?LOG_DEBUG(#{what => s2s_out_closed, text => <<"ejabberd_s2s_out terminated">>,
                 reason => Reason, state_name => StateName,
                 myname => MyName, server => Server}),
    case StateData#state.new of
        false ->
            ok;
        true ->
            ejabberd_s2s:remove_connection({MyName, Server}, self())
    end,
    BounceError = mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"Bounced by s2s">>),
    %% bounce queue manage by process and Erlang message queue
    bounce_queue(StateData#state.queue, BounceError),
    bounce_messages(BounceError),
    case StateData#state.socket of
        undefined ->
            ok;
        OpenSocket ->
            ejabberd_socket:close(OpenSocket)
    end,
    ok.
776
%%----------------------------------------------------------------------
%% Func: print_state/1
%% Purpose: Prepare the state to be printed on error log.
%%          The state is currently handed back unchanged.
%% Returns: State to print
%%----------------------------------------------------------------------
print_state(StateData) ->
    StateData.
784
785 %%%----------------------------------------------------------------------
786 %%% Internal functions
787 %%%----------------------------------------------------------------------
788
%% @doc Write already serialised data to the socket stored in the state.
-spec send_text(state(), binary()) -> 'ok'.
send_text(#state{socket = Socket}, Text) ->
    ejabberd_socket:send(Socket, Text).
792
793
%% @doc Serialise an XML element and send it over the connection socket.
-spec send_element(state(), exml:element()|mongoose_acc:t()) -> 'ok'.
send_element(StateData, #xmlel{} = El) ->
    Serialized = exml:to_binary(El),
    send_text(StateData, Serialized).
797
%% @doc Serialise and send `El', then hand the accumulator back untouched.
-spec send_element(state(), mongoose_acc:t(), exml:element()) -> mongoose_acc:t().
send_element(StateData, Acc, El) ->
    Serialized = exml:to_binary(El),
    send_text(StateData, Serialized),
    Acc.
802
803
%% @doc Send every `{Acc, El}' pair held in the queue, oldest first.
-spec send_queue(state(), Q :: element_queue()) -> 'ok'.
send_queue(StateData, Q) ->
    %% queue:to_list/1 yields the items front to rear, i.e. in the same
    %% order repeated queue:out/1 calls would return them.
    lists:foreach(fun({Acc, El}) ->
                          send_element(StateData, Acc, El)
                  end, queue:to_list(Q)).
813
814
%% @doc Bounce a single stanza back to its sender as an error reply.
%% Stanzas whose type is `error' or `result' are dropped silently;
%% for every other stanza an error reply built from `Error' is routed
%% from the original recipient to the original sender.
-spec bounce_element(Acc :: mongoose_acc:t(), El :: exml:element(), Error :: exml:element()) -> 'ok'.
bounce_element(Acc, El, Error) ->
    StanzaType = mongoose_acc:stanza_type(Acc),
    case lists:member(StanzaType, [<<"error">>, <<"result">>]) of
        true ->
            ok;
        false ->
            Sender = mongoose_acc:from_jid(Acc),
            Recipient = mongoose_acc:to_jid(Acc),
            {ReplyAcc, ErrorReply} = jlib:make_error_reply(Acc, El, Error),
            ejabberd_router:route(Recipient, Sender, ReplyAcc, ErrorReply)
    end.
827
828
%% @doc Bounce every `{Acc, El}' pair held in the queue, oldest first.
-spec bounce_queue(Q :: element_queue(), Error :: exml:element()) -> 'ok'.
bounce_queue(Q, Error) ->
    lists:foreach(fun({Acc, El}) ->
                          bounce_element(Acc, El, Error)
                  end, queue:to_list(Q)).
838
839
%% @doc Generate a fresh identifier by delegating to
%% mongoose_bin:gen_from_crypto/0.
-spec new_id() -> binary().
new_id() ->
    Id = mongoose_bin:gen_from_crypto(),
    Id.
843
844
%% @doc Cancel a timer created with erlang:start_timer/3 and make sure
%% that no `{timeout, Timer, _}' message for it is left in the mailbox.
-spec cancel_timer(reference()) -> 'ok'.
cancel_timer(Timer) ->
    _ = erlang:cancel_timer(Timer),
    %% The timer may have fired before it was cancelled: take its
    %% message out of the mailbox without blocking.
    receive
        {timeout, Timer, _Msg} -> ok
    after 0 ->
        ok
    end.
854
855
%% @doc Bounce every `{send_element, Acc, El}' message that is already
%% waiting in the mailbox of the calling process. Never blocks.
-spec bounce_messages(exml:element()) -> 'ok'.
bounce_messages(Error) ->
    case take_pending_element() of
        {Acc, El} ->
            bounce_element(Acc, El, Error),
            bounce_messages(Error);
        none ->
            ok
    end.

%% Fetch one pending `send_element' message, or `none' if there is none.
take_pending_element() ->
    receive
        {send_element, Acc, El} -> {Acc, El}
    after 0 ->
        none
    end.
865
866
%% @doc Send the dialback requests for this connection.
%% When the `new' flag is not yet set the connection is first offered
%% to ejabberd_s2s:try_register/1. A `db:result' is sent only when the
%% connection is (or just became) registered; a `db:verify' is sent
%% whenever the state carries a verification request. Any exception
%% raised while building or sending the elements is logged and stops
%% the process with reason `normal'.
-spec send_db_request(state()) -> fsm_return().
send_db_request(StateData) ->
    #state{myname = MyName, server = Server} = StateData,
    IsNew = case StateData#state.new of
                true ->
                    true;
                false ->
                    ejabberd_s2s:try_register({MyName, Server})
            end,
    NewStateData = StateData#state{new = IsNew},
    try
        send_db_result_if_new(IsNew, StateData),
        send_db_verify_if_requested(StateData),
        {next_state, wait_for_validation, NewStateData, ?FSMTIMEOUT*6}
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR(#{what => s2s_out_send_db_request_failed,
                         class => Class, reason => Reason, stacktrace => Stacktrace,
                         myname => StateData#state.myname, server => StateData#state.server}),
            {stop, normal, NewStateData}
    end.

%% Send the `db:result' element carrying the dialback key, but only for
%% a connection whose `new' flag is true.
send_db_result_if_new(IsNew, StateData) ->
    case IsNew of
        false ->
            ok;
        true ->
            #state{myname = MyName, server = Server} = StateData,
            Key = ejabberd_s2s:key(StateData#state.host_type,
                                   {MyName, Server},
                                   StateData#state.remote_streamid),
            send_element(StateData,
                         #xmlel{name = <<"db:result">>,
                                attrs = [{<<"from">>, MyName},
                                         {<<"to">>, Server}],
                                children = [#xmlcdata{content = Key}]})
    end.

%% Send the `db:verify' element when the state holds a verify request.
send_db_verify_if_requested(StateData) ->
    case StateData#state.verify of
        false ->
            ok;
        {_Pid, Key, SID} ->
            send_element(StateData,
                         #xmlel{name = <<"db:verify">>,
                                attrs = [{<<"from">>, StateData#state.myname},
                                         {<<"to">>, StateData#state.server},
                                         {<<"id">>, SID}],
                                children = [#xmlcdata{content = Key}]})
    end.
912
913
%% @doc Classify a dialback element.
%% Returns `{result | verify, To, From, Id, Type}' for `db:result' and
%% `db:verify' elements, taking the values from the attributes of the
%% same names, and `false' for anything else.
-spec is_verify_res(exml:element()) -> 'false' | {'result', _, _, _, _} | {'verify', _, _, _, _}.
is_verify_res(#xmlel{name = <<"db:result">>, attrs = Attrs}) ->
    dialback_reply(result, Attrs);
is_verify_res(#xmlel{name = <<"db:verify">>, attrs = Attrs}) ->
    dialback_reply(verify, Attrs);
is_verify_res(_) ->
    false.

%% Build the tagged tuple shared by both dialback element kinds.
dialback_reply(Tag, Attrs) ->
    {Tag,
     xml:get_attr_s(<<"to">>, Attrs),
     xml:get_attr_s(<<"from">>, Attrs),
     xml:get_attr_s(<<"id">>, Attrs),
     xml:get_attr_s(<<"type">>, Attrs)}.
931
932
933 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
934 %% SRV support
935
936 -include_lib("kernel/include/inet.hrl").
937
%% @doc Resolve the SRV records of `Server'.
%% The domain is first converted with ejabberd_s2s:domain_utf8_to_ascii/1;
%% when that conversion returns `false' no lookup is attempted.
-spec lookup_services(mongooseim:host_type(), jid:lserver()) -> [addr()].
lookup_services(HostType, Server) ->
    ASCIIAddr = ejabberd_s2s:domain_utf8_to_ascii(Server),
    case ASCIIAddr of
        false ->
            [];
        _ ->
            do_lookup_services(HostType, ASCIIAddr)
    end.
944
%% @doc Look up the SRV records of an (ASCII) domain and turn them into
%% an ordered address list.
%% Returns `[]' when the lookup fails or when the returned records
%% cannot be indexed; both cases are logged at debug level.
-spec do_lookup_services(mongooseim:host_type(),jid:lserver()) -> [addr()].
do_lookup_services(HostType, Server) ->
    case srv_lookup(HostType, Server) of
        {error, Reason} ->
            ?LOG_DEBUG(#{what => s2s_srv_lookup_failed,
                         reason => Reason, server => Server}),
            [];
        {ok, #hostent{h_addr_list = AddrList, h_addrtype = Type}} ->
            %% Probabilities are not exactly proportional to weights
            %% for simplicity (higher weights are overvalued)
            try lists:map(fun calc_addr_index/1, AddrList) of
                IndexedAddrs ->
                    Addrs = [#{ip_tuple => Addr, port => Port, type => Type}
                             || {_Index, Addr, Port} <- lists:keysort(1, IndexedAddrs)],
                    ?LOG_DEBUG(#{what => s2s_srv_lookup_success,
                                 addresses => Addrs, server => Server}),
                    Addrs
            catch
                %% Only the indexing step is protected. A malformed
                %% record list still yields no addresses, as before,
                %% but the reason is no longer thrown away.
                Class:IndexReason ->
                    ?LOG_DEBUG(#{what => s2s_srv_addr_index_failed,
                                 class => Class, reason => IndexReason,
                                 server => Server}),
                    []
            end
    end.
967
968
%% @doc Run the SRV lookup using the DNS timeout and retry count
%% configured under the `dns' key of the s2s options of the host type.
%% The configured timeout is passed through timer:seconds/1.
-spec srv_lookup(mongooseim:host_type(), jid:lserver()) ->
          {'error', atom()} | {'ok', inet:hostent()}.
srv_lookup(HostType, Server) ->
    DnsOpts = mongoose_config:get_opt([{s2s, HostType}, dns]),
    #{timeout := TimeoutSec, retries := Retries} = DnsOpts,
    TimeoutMs = timer:seconds(TimeoutSec),
    srv_lookup(Server, TimeoutMs, Retries).
974
975
%% @doc Query the `_xmpp-server._tcp.' SRV record of `Server' and, if
%% that query returns an error, the `_jabber._tcp.' record.
%% Only a timeout of the second query is retried, at most `Retries'
%% times in total; `{error, timeout}' is returned once no retries are
%% left.
%%
%% XXX - this behaviour is suboptimal in the case that the domain
%% has a "_xmpp-server._tcp." but not a "_jabber._tcp." record and
%% we don't get a DNS reply for the "_xmpp-server._tcp." lookup. In this
%% case we'll give up when we get the "_jabber._tcp." nxdomain reply.
-spec srv_lookup(jid:server(),
                 Timeout :: non_neg_integer(),
                 Retries :: pos_integer()
                ) -> {'error', atom()} | {'ok', inet:hostent()}.
srv_lookup(_Server, _Timeout, Retries) when Retries < 1 ->
    {error, timeout};
srv_lookup(Server, Timeout, Retries) ->
    Domain = binary_to_list(Server),
    case inet_res:getbyname("_xmpp-server._tcp." ++ Domain, srv, Timeout) of
        {ok, _HEnt} = Found ->
            Found;
        {error, _Reason} ->
            case inet_res:getbyname("_jabber._tcp." ++ Domain, srv, Timeout) of
                {error, timeout} ->
                    ?LOG_ERROR(#{what => s2s_dns_lookup_failed,
                                 text => <<"The DNS servers timed out on request for IN SRV."
                                           " You should check your DNS configuration.">>,
                                 nameserver => inet_db:res_option(nameserver),
                                 server => Server}),
                    srv_lookup(Server, Timeout, Retries - 1);
                Other ->
                    Other
            end
    end.
1001
%% @doc Resolve `Server' directly to addresses, using the configured
%% outgoing s2s port. The address families returned by
%% outgoing_s2s_types/1 are tried in order and the first family that
%% resolves to at least one address wins.
-spec lookup_addrs(mongooseim:host_type(), jid:server()) -> [addr()].
lookup_addrs(HostType, Server) ->
    Port = outgoing_s2s_port(HostType),
    first_resolved_addrs(outgoing_s2s_types(HostType), Server, Port).

%% Walk the address families until one of them yields addresses.
first_resolved_addrs([], _Server, _Port) ->
    [];
first_resolved_addrs([Type | MoreTypes], Server, Port) ->
    case lookup_addrs_for_type(Server, Type) of
        [] ->
            first_resolved_addrs(MoreTypes, Server, Port);
        Resolved ->
            [#{ip_tuple => Addr, port => Port, type => Type} || Addr <- Resolved]
    end.
1011
%% @doc Resolve `Server' with inet:gethostbyname/2 for one address
%% family. A failed resolution is logged at debug level and yields `[]'.
-spec lookup_addrs_for_type(jid:lserver(), inet | inet6) -> [inet:ip_address()].
lookup_addrs_for_type(Server, Type) ->
    HostName = binary_to_list(Server),
    case inet:gethostbyname(HostName, Type) of
        {error, Reason} ->
            ?LOG_DEBUG(#{what => s2s_srv_resolve_failed,
                         type => Type, server => Server, reason => Reason}),
            [];
        {ok, #hostent{h_addr_list = Addrs}} ->
            ?LOG_DEBUG(#{what => s2s_srv_resolve_success,
                         type => Type, server => Server, addresses => Addrs}),
            Addrs
    end.
1024
1025
%% @doc Port configured for outgoing s2s connections of the host type.
-spec outgoing_s2s_port(mongooseim:host_type()) -> inet:port_number().
outgoing_s2s_port(HostType) ->
    OptPath = [{s2s, HostType}, outgoing, port],
    mongoose_config:get_opt(OptPath).
1029
1030
%% @doc Address families to try for outgoing connections, in the order
%% of the configured `ip_versions' option.
%%
%% DISCUSSION: Why prefer IPv4 first?
%%
%% IPv4 connectivity will be available for everyone for many years to
%% come, so there is no benefit in preferring IPv6 connections, which
%% are flaky at best nowadays.
%%
%% On the other hand, content providers hesitate to publish AAAA
%% records for their sites because of the mentioned quality of current
%% IPv6 connectivity. Making IPv6 the `fallback' may avoid these
%% problems elegantly.
-spec outgoing_s2s_types(mongooseim:host_type()) -> [inet | inet6, ...].
outgoing_s2s_types(HostType) ->
    IpVersions = mongoose_config:get_opt([{s2s, HostType}, outgoing, ip_versions]),
    lists:map(fun ip_version_to_type/1, IpVersions).
1045
%% @doc Translate an IP version number into the matching inet address
%% family atom. Only versions 4 and 6 are accepted.
-spec ip_version_to_type(4 | 6) -> inet | inet6.
ip_version_to_type(4) ->
    inet;
ip_version_to_type(6) ->
    inet6.
1048
%% @doc Configured connection timeout for outgoing s2s connections;
%% falls back to 10000 when the option is not set.
-spec outgoing_s2s_timeout(mongooseim:host_type()) -> non_neg_integer() | infinity.
outgoing_s2s_timeout(HostType) ->
    OptPath = [{s2s, HostType}, outgoing, connection_timeout],
    mongoose_config:get_opt(OptPath, 10000).
1052
%% @doc Human readable S2S logging: only new outgoing connections are
%% logged (at INFO level). When the first argument is `false' (the
%% dialback case) nothing is logged.
log_s2s_out(New, Myname, Server, Tls) ->
    case New of
        false ->
            %% Do not log dialback
            ok;
        _ ->
            %% Log new outgoing connections:
            ?LOG_INFO(#{what => s2s_out,
                        text => <<"Trying to open s2s connection">>,
                        myname => Myname, server => Server, tls => Tls})
    end.
1061
%% @doc Timeout to use for the given FSM state.
%% Returns an integer number of milliseconds or `infinity'.
-spec get_timeout_interval(statename()) -> 'infinity' | non_neg_integer().
%% Validation implies dialback: networking can take longer.
get_timeout_interval(wait_for_validation) ->
    ?FSMTIMEOUT*6;
%% When the stream is established, we only rely on the S2S timeout timer.
get_timeout_interval(stream_established) ->
    infinity;
get_timeout_interval(_StateName) ->
    ?FSMTIMEOUT.
1076
1077
%% @doc This function is intended to be called at the end of a state
%% function that wants to wait for a reconnect delay before stopping.
%%
%% It bounces every stanza waiting in the state queue and in the
%% mailbox, cancels the current timer and starts a new one for the
%% reconnect delay, then moves to `wait_before_retry' with an empty
%% queue. The first delay is picked at random; every later delay is
%% twice the previous one, capped by get_max_retry_delay/1.
-spec wait_before_reconnect(state()) -> fsm_return().
wait_before_reconnect(StateData) ->
    E = mongoose_xmpp_errors:remote_server_not_found(<<"en">>, <<"From s2s (waiting)">>),
    %% Stanzas can be waiting both in the state queue and in the
    %% Erlang message queue of this process: bounce both.
    bounce_queue(StateData#state.queue, E),
    bounce_messages(E),
    cancel_timer(StateData#state.timer),
    Delay = case StateData#state.delay_to_retry of
                undefined_delay ->
                    %% The initial delay is random between 1 and 15
                    %% seconds: a uniformly distributed integer in
                    %% 1000..15000 (milliseconds). A real PRNG is used
                    %% instead of the clock value, whose low digits are
                    %% not random and may have coarse granularity.
                    999 + rand:uniform(14001);
                D1 ->
                    %% Double the delay with each successive failed
                    %% reconnection attempt, but don't exceed the max
                    lists:min([D1 * 2, get_max_retry_delay(StateData#state.host_type)])
            end,
    Timer = erlang:start_timer(Delay, self(), []),
    {next_state, wait_before_retry, StateData#state{timer = Timer,
                                                    delay_to_retry = Delay,
                                                    queue = queue:new()}}.
1102
1103
%% @doc Get the maximum allowed delay for retry to reconnect (in milliseconds).
%% The value is read from the `max_retry_delay' s2s option of the host
%% type and multiplied by 1000.
%% NOTE(review): the option is presumably expressed in seconds - confirm
%% against the configuration spec.
get_max_retry_delay(HostType) ->
    MaxRetryDelay = mongoose_config:get_opt([{s2s, HostType}, max_retry_delay]),
    MaxRetryDelay * 1000.
1109
1110
%% @doc Terminate s2s_out connections that are in state wait_before_retry.
%% Every connection process registered for the `{From, To}' pair is sent
%% a `terminate_if_waiting_before_retry' message.
terminate_if_waiting_delay(From, To) ->
    Pids = ejabberd_s2s:get_connections_pids({From, To}),
    _ = [Pid ! terminate_if_waiting_before_retry || Pid <- Pids],
    ok.
1120
1121
%% @doc FSM start options derived from the `max_fsm_queue' option:
%% a `max_queue' entry when the option is set, no options otherwise.
-spec fsm_limit_opts() -> [{'max_queue', integer()}].
fsm_limit_opts() ->
    MaxQueueOpt = mongoose_config:lookup_opt(max_fsm_queue),
    case MaxQueueOpt of
        {error, not_found} ->
            [];
        {ok, N} ->
            [{max_queue, N}]
    end.
1130
%% @doc Addresses to connect to for `Server'.
%% Three sources are consulted in order, and the first one that returns
%% a non-empty list wins: addresses predefined in the configuration,
%% SRV records, and finally a plain host name lookup.
-spec get_addr_list(mongooseim:host_type(), jid:lserver()) -> [addr()].
get_addr_list(HostType, Server) ->
    case get_predefined_addresses(HostType, Server) of
        [] ->
            case lookup_services(HostType, Server) of
                [] ->
                    lookup_addrs(HostType, Server);
                SrvAddrs ->
                    SrvAddrs
            end;
        Predefined ->
            Predefined
    end.
1138
%% @doc Get IPs predefined for a given s2s domain in the configuration.
%% Returns a single-element list when an `address' entry exists for
%% `Server', and `[]' otherwise. The configured `ip_address' must be
%% parseable by inet:parse_address/1.
-spec get_predefined_addresses(mongooseim:host_type(), jid:lserver()) -> [addr()].
get_predefined_addresses(HostType, Server) ->
    case mongoose_config:lookup_opt([{s2s, HostType}, address, Server]) of
        {error, not_found} ->
            [];
        {ok, #{ip_address := IPAddress} = AddrOpts} ->
            {ok, IPTuple} = inet:parse_address(IPAddress),
            Port = get_predefined_port(HostType, AddrOpts),
            Type = addr_type(IPTuple),
            [#{ip_tuple => IPTuple, port => Port, type => Type}]
    end.
1150
%% @doc Port of a predefined address: the `port' key of the address
%% options when present, the general outgoing s2s port otherwise.
get_predefined_port(HostType, Addr) ->
    case Addr of
        #{port := Port} ->
            Port;
        _ ->
            outgoing_s2s_port(HostType)
    end.
1153
%% @doc Address family of an IP tuple: `inet' for a 4-element tuple,
%% `inet6' for an 8-element tuple.
addr_type({_, _, _, _}) ->
    inet;
addr_type({_, _, _, _, _, _, _, _}) ->
    inet6.
1156
%% @doc Notify `Pid' about the outcome of a validation.
%% A type of `<<"valid">>' is reported as `valid', anything else as
%% `invalid'; the event also carries the remote and local server names.
send_event(Type, Pid, StateData) ->
    Outcome = case Type of
                  <<"valid">> -> valid;
                  _ -> invalid
              end,
    p1_fsm:send_event(Pid, {Outcome,
                            StateData#state.server,
                            StateData#state.myname}).
1167
%% @doc Update the SASL EXTERNAL flag of the feature accumulator.
%% For a feature in the SASL namespace the first element of the
%% accumulator becomes `true' when one of the children is a `mechanism'
%% element whose character data is `EXTERNAL', and `false' otherwise.
%% Features from other namespaces leave the accumulator unchanged.
get_acc_with_new_sext(?NS_SASL, Els1, {_SEXT, STLS, STLSReq}) ->
    HasExternal = lists:any(fun is_external_mechanism/1, Els1),
    {HasExternal, STLS, STLSReq};
get_acc_with_new_sext(_, _, Acc) ->
    Acc.

%% True only for a `mechanism' element advertising SASL EXTERNAL.
is_external_mechanism(#xmlel{name = <<"mechanism">>, children = Els2}) ->
    xml:get_cdata(Els2) =:= <<"EXTERNAL">>;
is_external_mechanism(_) ->
    false.
1182
%% @doc Update the STARTTLS flags of the feature accumulator.
%% For a feature in the TLS namespace the second element becomes `true'
%% and the third one tells whether the feature has a `required' subtag.
%% Features from other namespaces leave the accumulator unchanged.
get_acc_with_new_tls(?NS_TLS, El1, {SEXT, _STLS, _STLSReq}) ->
    IsRequired = case xml:get_subtag(El1, <<"required">>) of
                     false -> false;
                     #xmlel{} -> true
                 end,
    {SEXT, true, IsRequired};
get_acc_with_new_tls(_, _, Acc) ->
    Acc.
1191
%% @doc TLS options for an outgoing connection of the host type:
%% `connect', the configured ciphers and, when
%% ejabberd_s2s:lookup_certfile/1 finds one, the certificate file.
tls_options(HostType) ->
    Ciphers = mongoose_config:get_opt([{s2s, HostType}, ciphers]),
    CertFileOpts = case ejabberd_s2s:lookup_certfile(HostType) of
                       {error, not_found} -> [];
                       {ok, CertFile} -> [{certfile, CertFile}]
                   end,
    [connect, {ciphers, Ciphers} | CertFileOpts].
1199
%% @doc Compute the sort key of one SRV record.
%% Takes `{Priority, Weight, Port, Host}' and returns
%% `{Index, Host, Port}'. The index is `Priority * 65536' minus a
%% weight-dependent random amount, so sorting by index orders records
%% by priority first; the amount is 0 for a weight of 0.
calc_addr_index({Priority, Weight, Port, Host}) ->
    Index = Priority * 65536 - weight_offset(Weight),
    {Index, Host, Port}.

%% Random amount in [0, Weight + 1) for a non-zero weight, 0 otherwise.
weight_offset(0) ->
    0;
weight_offset(Weight) ->
    (Weight + 1) * rand:uniform().
1206
%% @doc Decide the next step after the stream features were parsed.
%% The argument is `{SASLExternal, StartTLS, StartTLSRequired, State}'.
%% Clauses are tried in order:
%%   1. already authenticated, no EXTERNAL and no STARTTLS offered:
%%      flush the queue and enter `stream_established';
%%   2. EXTERNAL offered, `try_auth' set and `new' not `false':
%%      send the SASL EXTERNAL auth request;
%%   3. STARTTLS offered, TLS wanted but not yet enabled: send `starttls';
%%   4. STARTTLS required while TLS is disabled locally: close the socket
%%      and reopen it with `use_v10' turned off;
%%   5. dialback enabled: send the dialback request;
%%   6. otherwise: close the socket and reopen it with `use_v10' off.
handle_parsed_features({false, false, _, StateData = #state{authenticated = true}}) ->
    #state{myname = MyName, server = Server, queue = Queue} = StateData,
    send_queue(StateData, Queue),
    ?LOG_INFO(#{what => s2s_out_connected,
                text => <<"New outgoing s2s connection established">>,
                myname => StateData#state.myname, server => StateData#state.server}),
    mongoose_hooks:s2s_connect_hook(MyName, Server),
    {next_state, stream_established, StateData#state{queue = queue:new()}};
handle_parsed_features({true, _, _, StateData = #state{try_auth = true, new = New}}) when
      New /= false ->
    Authzid = jlib:encode_base64(StateData#state.myname),
    AuthEl = #xmlel{name = <<"auth">>,
                    attrs = [{<<"xmlns">>, ?NS_SASL},
                             {<<"mechanism">>, <<"EXTERNAL">>}],
                    children = [#xmlcdata{content = Authzid}]},
    send_element(StateData, AuthEl),
    {next_state, wait_for_auth_result,
     StateData#state{try_auth = false}, ?FSMTIMEOUT};
handle_parsed_features({_, true, _, StateData = #state{tls = true, tls_enabled = false}}) ->
    StartTlsEl = #xmlel{name = <<"starttls">>,
                        attrs = [{<<"xmlns">>, ?NS_TLS}]},
    send_element(StateData, StartTlsEl),
    {next_state, wait_for_starttls_proceed, StateData, ?FSMTIMEOUT};
handle_parsed_features({_, _, true, StateData = #state{tls = false}}) ->
    ?LOG_DEBUG(#{what => s2s_out_restarted,
                 myname => StateData#state.myname, server => StateData#state.server}),
    reopen_without_v10(StateData);
handle_parsed_features({_, _, _, StateData = #state{db_enabled = true}}) ->
    send_db_request(StateData);
handle_parsed_features({_, _, _, StateData}) ->
    ?LOG_DEBUG(#{what => s2s_out_restarted,
                 myname => StateData#state.myname, server => StateData#state.server}),
    % TODO: clear message queue
    reopen_without_v10(StateData).

%% Close the current socket and go to `reopen_socket' with `use_v10'
%% switched off.
reopen_without_v10(StateData) ->
    ejabberd_socket:close(StateData#state.socket),
    {next_state, reopen_socket,
     StateData#state{socket = undefined,
                     use_v10 = false}, ?FSMTIMEOUT}.
Line Hits Source