1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mod_global_distrib_connection). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-behaviour(gen_server). |
21 |
|
|
22 |
|
-include("mongoose.hrl"). |
23 |
|
-include("jlib.hrl"). |
24 |
|
-include("global_distrib_metrics.hrl"). |
25 |
|
|
26 |
|
%% State of a single outgoing global-distribution connection process.
-record(state, {
    %% Socket as returned by mod_global_distrib_transport:wrap/3.
    socket  :: mod_global_distrib_transport:t(),
    %% Metric-name atom derived from the target server name; used as the
    %% argument of the ?GLOBAL_DISTRIB_* metric macros.
    host    :: atom(),
    %% Result of mod_global_distrib_transport:peername/1, kept for logging.
    peer    :: tuple() | unknown,
    %% UUID v4 string generated in init/1; included in log events.
    conn_id :: binary()
}).
32 |
|
|
33 |
|
-export([start_link/2]). |
34 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). |
35 |
|
|
36 |
|
-ignore_xref([start_link/2]). |
37 |
|
|
38 |
|
%%-------------------------------------------------------------------- |
39 |
|
%% API |
40 |
|
%%-------------------------------------------------------------------- |
41 |
|
|
42 |
|
%% @doc Starts a connection process linked to the caller.
%% `Endpoint' is the `{Address, Port}' pair that `init/1' connects to;
%% `Server' is handed to `init/1' alongside it.
-spec start_link(Endpoint :: mod_global_distrib_utils:endpoint(),
                 Server :: jid:lserver()) -> {ok, pid()} | {error, any()}.
start_link(Endpoint, Server) ->
    InitArgs = [Endpoint, Server],
    GenServerOpts = [],
    gen_server:start_link(?MODULE, InitArgs, GenServerOpts).
46 |
|
|
47 |
|
%% @private
%% gen_server callback. Registers the per-server metrics, opens a TCP
%% connection to `{Addr, Port}', wraps it with the configured transport and
%% sends the `gd_start' handshake element prefixed with its 32-bit length.
%%
%% Returns `{ok, State}' once the handshake has been sent and the socket has
%% been switched to `{active, once}'. Any step that does not return the
%% expected value raises `badmatch', which is logged as
%% `gd_connection_failed' and turned into `{stop, normal}'.
init([{Addr, Port}, Server]) ->
    ConnID = uuid:uuid_to_string(uuid:get_v4(), binary_standard),
    ?LOG_DEBUG(#{what => gd_new_outgoing_connection,
                 server => Server, address => Addr, port => Port,
                 pid => self(), conn_id => ConnID}),
    process_flag(trap_exit, true),
    MetricServer = mod_global_distrib_utils:binary_to_metric_atom(Server),
    mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_MESSAGES_SENT(MetricServer), spiral),
    mod_global_distrib_utils:ensure_metric(
      ?GLOBAL_DISTRIB_SEND_QUEUE_TIME(MetricServer), histogram),
    mod_global_distrib_utils:ensure_metric(
      ?GLOBAL_DISTRIB_OUTGOING_ESTABLISHED(MetricServer), spiral),
    mod_global_distrib_utils:ensure_metric(
      ?GLOBAL_DISTRIB_OUTGOING_ERRORED(MetricServer), spiral),
    mod_global_distrib_utils:ensure_metric(
      ?GLOBAL_DISTRIB_OUTGOING_CLOSED(MetricServer), spiral),
    try
        {ok, RawSocket} = gen_tcp:connect(Addr, Port, [binary, {active, false}]),
        {ok, Socket} = mod_global_distrib_transport:wrap(RawSocket, opt(connections),
                                                         #{connect => true}),
        GdStart = gd_start(Server, ConnID),
        ok = mod_global_distrib_transport:send(Socket, <<(byte_size(GdStart)):32, GdStart/binary>>),
        %% Assert the result, as handle_info/2 does: if the socket cannot be
        %% made active, no tcp/tcp_closed message would ever arrive and the
        %% process would keep a dead connection around.
        ok = mod_global_distrib_transport:setopts(Socket, [{active, once}]),
        mongoose_metrics:update(global, ?GLOBAL_DISTRIB_OUTGOING_ESTABLISHED(MetricServer), 1),
        {ok, #state{socket = Socket, host = MetricServer, conn_id = ConnID,
                    peer = mod_global_distrib_transport:peername(Socket)}}
    catch
        error:{badmatch, Reason}:StackTrace ->
            ?LOG_ERROR(#{what => gd_connection_failed,
                         server => Server, address => Addr, port => Port,
                         reason => Reason, conn_id => ConnID, stacktrace => StackTrace}),
            {stop, normal}
    end.
80 |
|
|
81 |
|
%% @private
%% Synchronous entry point: the caller is acknowledged with `ok' before the
%% request is processed, then the request is handled exactly like a cast and
%% the result of handle_cast/2 becomes the callback result.
handle_call(Request, Caller, State) ->
    gen_server:reply(Caller, ok),
    CastResult = handle_cast(Request, State),
    CastResult.
84 |
|
|
85 |
|
%% @private
%% Sends one `{data, Stamp, Data}' message over the connection.
%% `Stamp' is a native-unit monotonic timestamp; the time elapsed since it is
%% reported (in microseconds) to the send-queue-time metric. The frame written
%% to the socket is a 32-bit length, a 64-bit system time in microseconds and
%% then `Data'. A failed send is logged and raised as an error.
handle_cast({data, Stamp, Data}, #state{socket = Socket, host = ToHost} = State) ->
    QueuedFor = erlang:convert_time_unit(erlang:monotonic_time() - Stamp,
                                         native, microsecond),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_SEND_QUEUE_TIME(ToHost), QueuedFor),
    SentAt = erlang:system_time(microsecond),
    %% The length covers the 8-byte timestamp plus the payload.
    FrameSize = byte_size(Data) + 8,
    Frame = <<FrameSize:32, SentAt:64, Data/binary>>,
    case mod_global_distrib_transport:send(Socket, Frame) of
        ok ->
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MESSAGES_SENT(ToHost), 1),
            {noreply, State};
        SendError ->
            ?LOG_ERROR(#{what => gd_cant_send_packet,
                         reason => SendError, packet => Data, conn_id => State#state.conn_id}),
            error(SendError)
    end.
100 |
|
|
101 |
|
%% @private
%% Handles socket messages:
%%   * `tcp'        - re-arms `{active, once}' and passes the bytes to the
%%                    transport; the decoded result is discarded.
%%   * `tcp_closed' - stops the process with reason `normal'.
%%   * `tcp_error'  - logs, bumps the errored metric and stops with
%%                    `{error, Reason}'.
%% Any other message is ignored.
handle_info({tcp, _Port, Bytes}, #state{socket = Socket} = State) ->
    ActiveOnce = [{active, once}],
    ok = mod_global_distrib_transport:setopts(Socket, ActiveOnce),
    %% Feeding data to drive the TLS state machine (in case of TLS connection)
    {ok, _Decoded} = mod_global_distrib_transport:recv_data(Socket, Bytes),
    {noreply, State};
handle_info({tcp_closed, _Port}, State) ->
    {stop, normal, State};
handle_info({tcp_error, _Port, Reason}, State) ->
    ?LOG_ERROR(#{what => gd_outgoing_socket_error,
                 reason => Reason, peer => State#state.peer, conn_id => State#state.conn_id}),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_OUTGOING_ERRORED(State#state.host), 1),
    StopReason = {error, Reason},
    {stop, StopReason, State};
handle_info(_Other, State) ->
    {noreply, State}.
115 |
|
|
116 |
|
%% @private
%% No state migration is performed on code upgrade; the state is kept as is.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
118 |
|
|
119 |
|
%% @private
%% gen_server callback. Logs why the connection process is going down, bumps
%% the closed-connections metric and closes the socket on a best-effort basis.
%%
%% Graceful reasons (`normal', `shutdown', `{shutdown, _}') are logged at
%% info level as `gd_outgoing_connection_closed'; any other reason is logged
%% at error level as `gd_outgoing_socket_error'.
terminate(Reason, State) ->
    log_termination(Reason, State),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_OUTGOING_CLOSED(State#state.host), 1),
    %% Best effort: the socket may already be closed or broken, and a failure
    %% here must not mask the original termination reason.
    try
        mod_global_distrib_transport:close(State#state.socket)
    catch
        _:_ -> ok
    end,
    ignore.

%% Picks the log level from the termination reason so that a peer closing the
%% connection or a supervisor shutdown is not reported as a socket error.
log_termination(Reason, State) ->
    case is_graceful_exit(Reason) of
        true ->
            ?LOG_INFO(#{what => gd_outgoing_connection_closed,
                        reason => Reason, peer => State#state.peer,
                        conn_id => State#state.conn_id});
        false ->
            ?LOG_ERROR(#{what => gd_outgoing_socket_error,
                         reason => Reason, peer => State#state.peer,
                         conn_id => State#state.conn_id})
    end.

%% True for the exit reasons OTP treats as a regular, non-error shutdown.
is_graceful_exit(normal) -> true;
is_graceful_exit(shutdown) -> true;
is_graceful_exit({shutdown, _}) -> true;
is_graceful_exit(_) -> false.
125 |
|
|
126 |
|
%%-------------------------------------------------------------------- |
127 |
|
%% Helpers |
128 |
|
%%-------------------------------------------------------------------- |
129 |
|
|
130 |
|
%% Looks up option `Key' of the `mod_global_distrib' module through
%% mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    OwnerModule = mod_global_distrib,
    mod_global_distrib_utils:opt(OwnerModule, Key).
133 |
|
|
134 |
|
%% Serialises the `gd_start' handshake element, carrying the local server
%% name and the connection id as attributes, to a binary.
gd_start(Server, ConnID) ->
    Handshake = #xmlel{name = <<"gd_start">>,
                       attrs = [{<<"server">>, Server},
                                {<<"conn_id">>, ConnID}]},
    exml:to_binary(Handshake).