./ct_report/coverage/mod_global_distrib_connection.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_connection).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(gen_server).
21
22 -include("mongoose.hrl").
23 -include("jlib.hrl").
24 -include("global_distrib_metrics.hrl").
25
%% Per-connection gen_server state.
-record(state, {
    %% Transport socket obtained from mod_global_distrib_transport:wrap/3.
    socket :: mod_global_distrib_transport:t(),
    %% Metric atom derived from the remote server name; used to address
    %% the per-server metrics.
    host :: atom(),
    %% Value of mod_global_distrib_transport:peername/1 taken at connect
    %% time; only used in log events.
    peer :: tuple() | unknown,
    %% UUID (v4) identifying this connection, announced to the peer in
    %% the `gd_start' element.
    conn_id :: binary()
}).
32
33 -export([start_link/2]).
34 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
35
36 -ignore_xref([start_link/2]).
37
38 %%--------------------------------------------------------------------
39 %% API
40 %%--------------------------------------------------------------------
41
%% Starts an outgoing connection process linked to the caller.
%% `Endpoint' is matched by init/1 as an `{Addr, Port}' pair and `Server' is
%% passed along unchanged as the second init argument.
-spec start_link(Endpoint :: mod_global_distrib_utils:endpoint(),
                 Server :: jid:lserver()) -> {ok, pid()} | {error, any()}.
start_link(Endpoint, Server) ->
    InitArgs = [Endpoint, Server],
    gen_server:start_link(?MODULE, InitArgs, []).
46
%% gen_server callback: establishes the outgoing connection to `{Addr, Port}'.
%%
%% In order, it:
%%   1. generates a connection id and logs the attempt at debug level;
%%   2. starts trapping exits and makes sure the per-server metrics exist;
%%   3. connects over TCP, wraps the raw socket with
%%      mod_global_distrib_transport:wrap/3 (using the `connections' option),
%%      sends the `gd_start' element prefixed with its 32-bit length and
%%      switches the socket to `{active, once}'.
%%
%% Returns `{ok, State}' on success. Any failed assertive match in step 3
%% (connect, wrap, send or setopts) is logged as `gd_connection_failed' and
%% turned into `{stop, normal}'.
init([{Addr, Port}, Server]) ->
    ConnID = uuid:uuid_to_string(uuid:get_v4(), binary_standard),
    ?LOG_DEBUG(#{what => gd_new_outgoing_connection,
                 server => Server, address => Addr, port => Port,
                 pid => self(), conn_id => ConnID}),
    process_flag(trap_exit, true),
    MetricServer = mod_global_distrib_utils:binary_to_metric_atom(Server),
    mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_MESSAGES_SENT(MetricServer), spiral),
    mod_global_distrib_utils:ensure_metric(
      ?GLOBAL_DISTRIB_SEND_QUEUE_TIME(MetricServer), histogram),
    mod_global_distrib_utils:ensure_metric(
      ?GLOBAL_DISTRIB_OUTGOING_ESTABLISHED(MetricServer), spiral),
    mod_global_distrib_utils:ensure_metric(
      ?GLOBAL_DISTRIB_OUTGOING_ERRORED(MetricServer), spiral),
    mod_global_distrib_utils:ensure_metric(
      ?GLOBAL_DISTRIB_OUTGOING_CLOSED(MetricServer), spiral),
    try
        {ok, RawSocket} = gen_tcp:connect(Addr, Port, [binary, {active, false}]),
        {ok, Socket} = mod_global_distrib_transport:wrap(RawSocket, opt(connections),
                                                         [connect]),
        GdStart = gd_start(Server, ConnID),
        ok = mod_global_distrib_transport:send(Socket,
                                               <<(byte_size(GdStart)):32, GdStart/binary>>),
        %% Assert the result, exactly as handle_info/2 does for the same call:
        %% if the socket cannot be made active the process would otherwise sit
        %% on a passive socket and never see incoming data or a close.
        ok = mod_global_distrib_transport:setopts(Socket, [{active, once}]),
        mongoose_metrics:update(global, ?GLOBAL_DISTRIB_OUTGOING_ESTABLISHED(MetricServer), 1),
        {ok, #state{socket = Socket, host = MetricServer, conn_id = ConnID,
                    peer = mod_global_distrib_transport:peername(Socket)}}
    catch
        error:{badmatch, Reason}:StackTrace ->
            ?LOG_ERROR(#{what => gd_connection_failed,
                         server => Server, address => Addr, port => Port,
                         reason => Reason, conn_id => ConnID, stacktrace => StackTrace}),
            {stop, normal}
    end.
79
%% gen_server callback: every call is acknowledged with `ok' right away and
%% the request is then processed exactly like a cast, so the result of
%% handle_cast/2 becomes the result of this callback.
handle_call(Request, Caller, State) ->
    gen_server:reply(Caller, ok),
    handle_cast(Request, State).
83
%% gen_server callback: sends one `{data, Stamp, Data}' packet to the peer.
%%
%% `Stamp' is compared against erlang:monotonic_time/0, and the difference
%% (converted to microseconds) is reported as the send queue time.
%% The frame written to the socket is:
%%   32-bit length (payload size + 8) | 64-bit system time in microseconds | payload
%% A failed send is logged and then crashes the process with the send result
%% as the error reason.
handle_cast({data, EnqueuedAt, Payload},
            #state{socket = Socket, host = ToHost, conn_id = ConnID} = State) ->
    WaitedNative = erlang:monotonic_time() - EnqueuedAt,
    WaitedUS = erlang:convert_time_unit(WaitedNative, native, microsecond),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_SEND_QUEUE_TIME(ToHost), WaitedUS),
    SentAt = erlang:system_time(microsecond),
    %% The length field covers the 8-byte timestamp as well as the payload.
    FrameSize = byte_size(Payload) + 8,
    Frame = <<FrameSize:32, SentAt:64, Payload/binary>>,
    case mod_global_distrib_transport:send(Socket, Frame) of
        ok ->
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MESSAGES_SENT(ToHost), 1);
        Error ->
            ?LOG_ERROR(#{what => gd_cant_send_packet,
                         reason => Error, packet => Payload, conn_id => ConnID}),
            error(Error)
    end,
    {noreply, State}.
99
%% gen_server callback for socket events and stray messages.
%%
%%   `{tcp, _, Bytes}'        - re-arms `{active, once}' and hands the bytes
%%                              to the transport; the decoded result is
%%                              discarded.
%%   `{tcp_closed, _}'        - stops the process with reason `normal'.
%%   `{tcp_error, _, Reason}' - logs the error, bumps the "errored" metric
%%                              and stops with `{error, Reason}'.
%%   anything else            - ignored.
handle_info({tcp, _Port, Bytes}, State = #state{socket = Socket}) ->
    ok = mod_global_distrib_transport:setopts(Socket, [{active, once}]),
    %% Feeding data to drive the TLS state machine (in case of TLS connection)
    {ok, _} = mod_global_distrib_transport:recv_data(Socket, Bytes),
    {noreply, State};
handle_info({tcp_closed, _Port}, State) ->
    {stop, normal, State};
handle_info({tcp_error, _Port, Reason}, State) ->
    Peer = State#state.peer,
    ConnID = State#state.conn_id,
    ?LOG_ERROR(#{what => gd_outgoing_socket_error,
                 reason => Reason, peer => Peer, conn_id => ConnID}),
    ErroredMetric = ?GLOBAL_DISTRIB_OUTGOING_ERRORED(State#state.host),
    mongoose_metrics:update(global, ErroredMetric, 1),
    {stop, {error, Reason}, State};
handle_info(_Other, State) ->
    {noreply, State}.
114
%% gen_server callback: the state is carried over unchanged on code upgrade.
code_change(_FromVsn, State, _Extra) -> {ok, State}.
117
%% gen_server callback: runs when an established connection process stops.
%% Logs the termination reason together with the peer and connection id,
%% bumps the "closed" metric for the remote server and closes the socket.
%% Always returns `ignore'.
terminate(Reason, State) ->
    ?LOG_ERROR(#{what => gd_outgoing_socket_error,
                 reason => Reason, peer => State#state.peer, conn_id => State#state.conn_id}),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_OUTGOING_CLOSED(State#state.host), 1),
    %% Best-effort close: the process is going away regardless, so a failure
    %% while closing the socket is deliberately ignored. An explicit `try'
    %% replaces the old-style `catch Expr'.
    try
        mod_global_distrib_transport:close(State#state.socket)
    catch
        _:_ -> ok
    end,
    ignore.
124
125 %%--------------------------------------------------------------------
126 %% Helpers
127 %%--------------------------------------------------------------------
128
%% Looks up `Key' through mod_global_distrib_utils:opt/2 for the
%% `mod_global_distrib' module.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Module = mod_global_distrib,
    mod_global_distrib_utils:opt(Module, Key).
132
%% Serialises the `gd_start' element that opens a connection; it carries the
%% given server and connection id as its `server' and `conn_id' attributes.
gd_start(Server, ConnID) ->
    Stanza = #xmlel{name = <<"gd_start">>,
                    attrs = [{<<"server">>, Server}, {<<"conn_id">>, ConnID}]},
    exml:to_binary(Stanza).
Line Hits Source