./ct_report/coverage/mod_global_distrib_connection.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_connection).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(gen_server).
21
22 -include("mongoose.hrl").
23 -include("jlib.hrl").
24 -include("global_distrib_metrics.hrl").
25
%% gen_server state of a single outgoing global-distribution connection.
26 -record(state, {
27 socket :: mod_global_distrib_transport:t(), % transport socket returned by mod_global_distrib_transport:wrap/3 in init/1
28 host :: atom(), % metric atom derived from the target server name (binary_to_metric_atom/1)
29 peer :: tuple() | unknown, % result of mod_global_distrib_transport:peername/1, used in log reports
30 conn_id :: binary() % UUID v4 string generated in init/1, used to correlate log reports
31 }).
32
33 -export([start_link/2]).
34 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
35
36 -ignore_xref([start_link/2]).
37
38 %%--------------------------------------------------------------------
39 %% API
40 %%--------------------------------------------------------------------
41
%% @doc Starts a linked, unregistered connection process for the given
%% endpoint and target server. The two values are handed to init/1 as a list.
-spec start_link(Endpoint :: mod_global_distrib_utils:endpoint(),
                 Server :: jid:lserver()) -> {ok, pid()} | {error, any()}.
start_link(Endpoint, Server) ->
    InitArgs = [Endpoint, Server],
    gen_server:start_link(?MODULE, InitArgs, []).
46
%% @doc gen_server callback: establishes the outgoing connection.
%%
%% Steps, in order:
%%   1. generate a connection id and log the attempt at debug level,
%%   2. trap exits and make sure the per-server metrics exist,
%%   3. connect to `{Addr, Port}', wrap the raw socket with the configured
%%      transport options and send the length-prefixed `gd_start' element,
%%   4. switch the socket to `{active, once}' and bump the "established" metric.
%%
%% Returns `{ok, #state{}}' on success. Any failed match in step 3 or 4 is
%% logged as `gd_connection_failed' and turned into `{stop, normal}'.
init([{Addr, Port}, Server]) ->
    ConnID = uuid:uuid_to_string(uuid:get_v4(), binary_standard),
    ?LOG_DEBUG(#{what => gd_new_outgoing_connection,
                 server => Server, address => Addr, port => Port,
                 pid => self(), conn_id => ConnID}),
    process_flag(trap_exit, true),
    MetricServer = mod_global_distrib_utils:binary_to_metric_atom(Server),
    %% Same metrics, same order as before; only the repetition is folded away.
    lists:foreach(
      fun({Metric, Type}) ->
              mod_global_distrib_utils:ensure_metric(Metric, Type)
      end,
      [{?GLOBAL_DISTRIB_MESSAGES_SENT(MetricServer), spiral},
       {?GLOBAL_DISTRIB_SEND_QUEUE_TIME(MetricServer), histogram},
       {?GLOBAL_DISTRIB_OUTGOING_ESTABLISHED(MetricServer), spiral},
       {?GLOBAL_DISTRIB_OUTGOING_ERRORED(MetricServer), spiral},
       {?GLOBAL_DISTRIB_OUTGOING_CLOSED(MetricServer), spiral}]),
    try
        {ok, RawSocket} = gen_tcp:connect(Addr, Port, [binary, {active, false}]),
        {ok, Socket} = mod_global_distrib_transport:wrap(RawSocket, opt(connections),
                                                         #{connect => true}),
        GdStart = gd_start(Server, ConnID),
        ok = mod_global_distrib_transport:send(Socket, <<(byte_size(GdStart)):32, GdStart/binary>>),
        %% Assert the result, as handle_info/2 does: if the socket cannot be
        %% made active the process would never see incoming data or closure.
        ok = mod_global_distrib_transport:setopts(Socket, [{active, once}]),
        mongoose_metrics:update(global, ?GLOBAL_DISTRIB_OUTGOING_ESTABLISHED(MetricServer), 1),
        {ok, #state{socket = Socket, host = MetricServer, conn_id = ConnID,
                    peer = mod_global_distrib_transport:peername(Socket)}}
    catch
        error:{badmatch, Reason}:StackTrace ->
            ?LOG_ERROR(#{what => gd_connection_failed,
                         server => Server, address => Addr, port => Port,
                         reason => Reason, conn_id => ConnID, stacktrace => StackTrace}),
            {stop, normal}
    end.
80
%% @doc gen_server callback: synchronous variant of handle_cast/2.
%% The caller is acknowledged with `ok' right away, before the request is
%% processed; the request itself is then handled exactly like a cast.
handle_call(Request, Caller, State) ->
    gen_server:reply(Caller, ok),
    handle_cast(Request, State).
84
%% @doc gen_server callback: writes one payload to the connection.
%%
%% `Stamp' is compared against erlang:monotonic_time/0 to report how long the
%% payload waited before being sent. The frame written to the socket is a
%% 32-bit length (payload size + 8), a 64-bit system time in microseconds and
%% the payload itself. A failed send is logged and crashes the process.
handle_cast({data, Stamp, Data}, #state{socket = Socket, host = ToHost} = State) ->
    WaitedUS = erlang:convert_time_unit(erlang:monotonic_time() - Stamp,
                                        native, microsecond),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_SEND_QUEUE_TIME(ToHost), WaitedUS),
    SentAt = erlang:system_time(microsecond),
    %% The length field also covers the 8-byte timestamp that precedes the payload.
    FrameSize = byte_size(Data) + 8,
    Frame = <<FrameSize:32, SentAt:64, Data/binary>>,
    case mod_global_distrib_transport:send(Socket, Frame) of
        ok ->
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MESSAGES_SENT(ToHost), 1),
            {noreply, State};
        SendError ->
            ?LOG_ERROR(#{what => gd_cant_send_packet,
                         reason => SendError, packet => Data, conn_id => State#state.conn_id}),
            error(SendError)
    end.
100
%% @doc gen_server callback for socket events.
%%
%% - `{tcp, _, Data}': re-arms `{active, once}' and hands the bytes to the
%%   transport; both steps must succeed or the process crashes.
%% - `{tcp_closed, _}': stops with reason `normal'.
%% - `{tcp_error, _, Reason}': logs, bumps the "errored" metric and stops with
%%   `{error, Reason}'.
%% - anything else is dropped.
handle_info({tcp, _Port, Bytes}, #state{socket = Socket} = State) ->
    ok = mod_global_distrib_transport:setopts(Socket, [{active, once}]),
    %% Incoming bytes are fed to the transport so a TLS connection can advance
    %% its state machine; the decoded result is not used here.
    {ok, _Ignored} = mod_global_distrib_transport:recv_data(Socket, Bytes),
    {noreply, State};
handle_info({tcp_closed, _Port}, State) ->
    {stop, normal, State};
handle_info({tcp_error, _Port, Reason}, State) ->
    ?LOG_ERROR(#{what => gd_outgoing_socket_error,
                 reason => Reason, peer => State#state.peer, conn_id => State#state.conn_id}),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_OUTGOING_ERRORED(State#state.host), 1),
    {stop, {error, Reason}, State};
handle_info(_Other, State) ->
    {noreply, State}.
115
%% @doc gen_server callback: the state is carried over unchanged on upgrade.
code_change(_FromVsn, CurrentState, _Extra) -> {ok, CurrentState}.
118
%% @doc gen_server callback: records the closure and releases the socket.
%%
%% Graceful stops (`normal', `shutdown', `{shutdown, _}') are reported at
%% debug level as `gd_outgoing_connection_closed'; every other reason keeps
%% the original `gd_outgoing_socket_error' error report. The "closed" metric
%% is updated in both cases. Closing the socket is best effort: an exception
%% raised while closing is logged at debug level and does not affect shutdown.
terminate(Reason, State) ->
    #state{socket = Socket, host = Host, peer = Peer, conn_id = ConnID} = State,
    case is_graceful_stop(Reason) of
        true ->
            ?LOG_DEBUG(#{what => gd_outgoing_connection_closed,
                         reason => Reason, peer => Peer, conn_id => ConnID});
        false ->
            ?LOG_ERROR(#{what => gd_outgoing_socket_error,
                         reason => Reason, peer => Peer, conn_id => ConnID})
    end,
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_OUTGOING_CLOSED(Host), 1),
    try
        mod_global_distrib_transport:close(Socket)
    catch
        Class:CloseReason ->
            ?LOG_DEBUG(#{what => gd_outgoing_socket_close_failed,
                         class => Class, reason => CloseReason, conn_id => ConnID})
    end,
    %% The return value of terminate/2 is ignored by gen_server.
    ok.

%% Tells whether a termination reason denotes an orderly stop rather than a failure.
is_graceful_stop(normal) -> true;
is_graceful_stop(shutdown) -> true;
is_graceful_stop({shutdown, _}) -> true;
is_graceful_stop(_) -> false.
125
126 %%--------------------------------------------------------------------
127 %% Helpers
128 %%--------------------------------------------------------------------
129
%% @doc Reads option `Key' of the `mod_global_distrib' module through
%% mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    OwnerModule = mod_global_distrib,
    mod_global_distrib_utils:opt(OwnerModule, Key).
133
%% @doc Serializes the opening `gd_start' element, carrying the local server
%% name and the connection id as attributes, to a binary.
gd_start(Server, ConnID) ->
    Element = #xmlel{name = <<"gd_start">>,
                     attrs = [{<<"server">>, Server},
                              {<<"conn_id">>, ConnID}]},
    exml:to_binary(Element).
Line Hits Source