./ct_report/coverage/mod_global_distrib_receiver.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_receiver).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(gen_mod).
21 -behaviour(ranch_protocol).
22 -behaviour(gen_server).
23 -behaviour(mongoose_module_metrics).
24
25 -include("mongoose.hrl").
26 -include("jlib.hrl").
27 -include("global_distrib_metrics.hrl").
28
29 -export([endpoints/0, start_link/4]).
30 -export([start/2, stop/1]).
31 -export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]).
32
33 -define(LISTEN_RETRIES, 5). %% Number of retries in case of eaddrinuse
34 -define(LISTEN_RETRY_DELAY, 1000). %% Milliseconds to retrying in case of eaddrinuse
35
36 -record(state, {
37 socket :: mod_global_distrib_transport:t(),
38 waiting_for :: header | non_neg_integer(),
39 buffer = <<>> :: binary(),
40 host :: undefined | atom(),
41 conn_id = <<>> :: binary(),
42 peer :: tuple() | unknown
43 }).
44
45 -type state() :: #state{}.
46
47 %%--------------------------------------------------------------------
48 %% API
49 %%--------------------------------------------------------------------
50
%% Entry point invoked by ranch for every accepted connection.
%%
%% Spawns a linked process through proc_lib that runs init/1; that process
%% later turns itself into a gen_server via gen_server:enter_loop/3.
%%
%% `Ref' is the ranch listener reference. This module registers its listeners
%% under `{Addr, Port}' tuples (see start_listener/2), so it is typed as
%% `ranch:ref()' and not as an Erlang `reference()'.
%% Only the `ranch_tcp' transport is accepted; any other transport fails
%% with function_clause.
-spec start_link(Ref :: ranch:ref(), Socket :: gen_tcp:socket(), Transport :: ranch_tcp,
                 Opts :: [term()]) -> {ok, pid()}.
start_link(Ref, Socket, ranch_tcp, Opts) ->
    Pid = proc_lib:spawn_link(?MODULE, init, [{Ref, Socket, Opts}]),
    {ok, Pid}.
56
57 %%--------------------------------------------------------------------
58 %% gen_mod API
59 %%--------------------------------------------------------------------
60
%% gen_mod callback: starts the receiver module for `Host'.
%%
%% The options are first extended by prepare_opts/1; the real start-up work
%% (start/0) is handed to mod_global_distrib_utils:start/4 as a callback.
-spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any().
start(Host, Opts0) ->
    StartFun = fun start/0,
    mod_global_distrib_utils:start(?MODULE, Host, prepare_opts(Opts0), StartFun).
65
%% gen_mod callback: stops the receiver module for `Host'.
%%
%% The tear-down work (stop/0) is handed to mod_global_distrib_utils:stop/3
%% as a callback.
-spec stop(Host :: jid:lserver()) -> any().
stop(Host) ->
    StopFun = fun stop/0,
    mod_global_distrib_utils:stop(?MODULE, Host, StopFun).
69
70 %%--------------------------------------------------------------------
71 %% ranch_protocol API
72 %%--------------------------------------------------------------------
73
%% Body of the process spawned by start_link/4.
%%
%% Acknowledges socket ownership with ranch, wraps the raw socket with the
%% configured `tls_opts', arms it for a single incoming message and then
%% enters the gen_server loop. Any failing step crashes the process through
%% an assertive match.
init({ListenerRef, RawSocket, _Opts}) ->
    process_flag(trap_exit, true),
    ok = ranch:accept_ack(ListenerRef),
    TlsOpts = opt(tls_opts),
    {ok, Socket} = mod_global_distrib_transport:wrap(RawSocket, TlsOpts),
    ok = mod_global_distrib_transport:setopts(Socket, [{active, once}]),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_ESTABLISHED, 1),
    Peer = mod_global_distrib_transport:peername(Socket),
    InitialState = #state{socket = Socket, waiting_for = header, peer = Peer},
    gen_server:enter_loop(?MODULE, [], InitialState).
83
84 %%--------------------------------------------------------------------
85 %% gen_server API
86 %%--------------------------------------------------------------------
87
%% gen_server callback handling raw socket messages.
%%
%%   {tcp, _, RawData}      - re-arm the socket and process the received bytes
%%   {tcp_closed, _}        - stop normally
%%   {tcp_error, _, Reason} - log, bump the "errored" metric, stop with an error
%%   anything else          - report as unexpected and carry on
handle_info({tcp, _Socket, RawData}, #state{socket = Socket, buffer = Pending} = State) ->
    do_setopts_and_receive_data(Socket, Pending, RawData, State);
handle_info({tcp_closed, _Socket}, State) ->
    {stop, normal, State};
handle_info({tcp_error, _Socket, Reason}, State) ->
    ?LOG_ERROR(#{what => gd_incoming_socket_error, reason => Reason,
                 text => <<"mod_global_distrib_receiver received tcp_error">>,
                 peer => State#state.peer, conn_id => State#state.conn_id}),
    ErroredMetric = ?GLOBAL_DISTRIB_INCOMING_ERRORED(State#state.host),
    mongoose_metrics:update(global, ErroredMetric, 1),
    {stop, {error, Reason}, State};
handle_info(Unexpected, State) ->
    ?UNEXPECTED_INFO(Unexpected),
    {noreply, State}.
101
%% gen_server callback. This process does not accept casts: any cast makes
%% it exit with reason `bad_cast'.
handle_cast(_Request, _State) ->
    exit(bad_cast).
104
%% gen_server callback. This process does not accept calls: any call makes
%% it exit with reason `bad_call'.
handle_call(_Request, _Caller, _State) ->
    exit(bad_call).
107
%% gen_server callback. No state migration is performed on code upgrade;
%% the state is returned unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
110
%% gen_server callback run when the connection process stops.
%%
%% Logs the closure, bumps the per-host "closed" metric and closes the socket.
%% Closing is best-effort: any exception raised by close/1 is deliberately
%% discarded, since the process is terminating anyway.
terminate(Reason, State) ->
    ?LOG_WARNING(#{what => gd_incoming_socket_closed,
                   peer => State#state.peer, server => State#state.host,
                   reason => Reason, conn_id => State#state.conn_id}),
    ClosedMetric = ?GLOBAL_DISTRIB_INCOMING_CLOSED(State#state.host),
    mongoose_metrics:update(global, ClosedMetric, 1),
    catch mod_global_distrib_transport:close(State#state.socket),
    ignore.
118
119 %%--------------------------------------------------------------------
120 %% Helpers
121 %%--------------------------------------------------------------------
122
%% Prepends a default `endpoints' entry, `[{LocalHost, 5555}]', to the
%% option list, where `LocalHost' is the value of the mandatory `local_host'
%% option.
%%
%% Raises `error({missing_option, local_host})' when `local_host' is absent.
prepare_opts(Opts0) ->
    LocalHostEntry = lists:keyfind(local_host, 1, Opts0),
    case LocalHostEntry of
        {local_host, LocalHost} ->
            DefaultEndpoints = {endpoints, [{LocalHost, 5555}]},
            [DefaultEndpoints | Opts0];
        _ ->
            error({missing_option, local_host}, [Opts0])
    end.
130
%% Start-up work run once the module options are available.
%%
%% Registers the module metrics, starts the worker supervisor under
%% ejabberd_sup, stores the resolved endpoints in this module's ETS table
%% and finally starts one ranch listener per endpoint.
-spec start() -> any().
start() ->
    opt(tls_opts), %% Check for required tls_opts
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_RECV_QUEUE_TIME, histogram),
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_INCOMING_ESTABLISHED, spiral),
    %% Connections that stop before sending `gd_start' are accounted under
    %% the `undefined' host, so those metrics have to exist up front.
    mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_INCOMING_ERRORED(undefined), spiral),
    mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_INCOMING_CLOSED(undefined), spiral),
    WorkerSup = mod_global_distrib_worker_sup,
    WorkerSupSpec = {WorkerSup, {WorkerSup, start_link, []},
                     permanent, 10000, supervisor, [WorkerSup]},
    ejabberd_sup:start_child(WorkerSupSpec),
    ResolvedEndpoints = mod_global_distrib_utils:resolve_endpoints(opt(endpoints)),
    ets:insert(?MODULE, {endpoints, ResolvedEndpoints}),
    start_listeners().
144
%% Tear-down counterpart of start/0: stops every listener first, then the
%% worker supervisor.
-spec stop() -> any().
stop() ->
    stop_listeners(),
    WorkerSup = mod_global_distrib_worker_sup,
    ejabberd_sup:stop_child(WorkerSup).
149
%% Looks up this module's option `Key' through mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Module = ?MODULE,
    mod_global_distrib_utils:opt(Module, Key).
153
%% Re-arms the socket with `{active, once}' and, on success, processes the
%% bytes that were just received.
%%
%% Returns a gen_server handle_info result: a socket already closed stops the
%% process normally, any other setopts failure stops it with
%% `{setopts_failed, Result}'.
do_setopts_and_receive_data(Socket, Buffer, RawData, State) ->
    case mod_global_distrib_transport:setopts(Socket, [{active, once}]) of
        ok ->
            do_receive_data(Socket, Buffer, RawData, State);
        {error, closed} ->
            {stop, normal, State};
        Failure ->
            {stop, {setopts_failed, Failure}, State}
    end.
164
%% Passes the raw bytes through the transport layer, appends the resulting
%% data to the buffer and processes every complete frame it now contains.
%%
%% Returns a gen_server handle_info result: `{error, closed}' stops the
%% process normally, any other failure stops it with
%% `{recv_data_failed, Result}'.
do_receive_data(Socket, Buffer, RawData, State) ->
    case mod_global_distrib_transport:recv_data(Socket, RawData) of
        {ok, Data} ->
            Appended = State#state{buffer = <<Buffer/binary, Data/binary>>},
            {noreply, handle_buffered(Appended)};
        {error, closed} ->
            {stop, normal, State};
        Failure ->
            {stop, {recv_data_failed, Failure}, State}
    end.
175
%% Handles one complete frame.
%%
%% While no host is known, the frame must be the `gd_start' XML element: its
%% `server' and `conn_id' attributes identify the peer, the per-host metrics
%% are registered and the "first packet" metric is bumped.
%%
%% Every later frame is laid out as
%%   <<ClockTime:64, FromSize:16, From:FromSize/binary, Term/binary>>
%% and is forwarded to the worker selected by `From'. `ClockTime' is
%% presumably the sender's system time in microseconds (it is subtracted from
%% the local system time in microseconds) - confirm against the sender.
-spec handle_data(Data :: binary(), state()) -> state().
handle_data(GdStart, State = #state{host = undefined}) ->
    {ok, #xmlel{name = <<"gd_start">>, attrs = Attrs}} = exml:parse(GdStart),
    #{<<"server">> := BinHost, <<"conn_id">> := ConnId} = maps:from_list(Attrs),
    Host = mod_global_distrib_utils:binary_to_metric_atom(BinHost),
    PerHostMetrics = [{?GLOBAL_DISTRIB_MESSAGES_RECEIVED(Host), spiral},
                      {?GLOBAL_DISTRIB_TRANSFER_TIME(Host), histogram},
                      {?GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Host), spiral},
                      {?GLOBAL_DISTRIB_INCOMING_ERRORED(Host), spiral},
                      {?GLOBAL_DISTRIB_INCOMING_CLOSED(Host), spiral}],
    lists:foreach(fun({Metric, Type}) ->
                          mod_global_distrib_utils:ensure_metric(Metric, Type)
                  end, PerHostMetrics),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Host), 1),
    ?LOG_INFO(#{what => gd_incoming_connection, server => BinHost, conn_id => ConnId}),
    State#state{host = Host, conn_id = ConnId};
handle_data(Data, State = #state{host = Host}) ->
    <<ClockTime:64, BinFromSize:16, BinFrom:BinFromSize/binary, BinTerm/binary>> = Data,
    TransferTime = erlang:system_time(microsecond) - ClockTime,
    Worker = mod_global_distrib_worker_sup:get_worker(BinFrom),
    %% Monotonic stamp handed to the worker together with the payload.
    Stamp = erlang:monotonic_time(),
    Request = {data, Host, TransferTime, Stamp, BinTerm},
    ok = mod_global_distrib_utils:cast_or_call(Worker, Request),
    State.
200
%% Consumes as many complete frames from the buffer as possible.
%%
%% The wire format is a 4-byte unsigned big-endian length followed by that
%% many bytes of payload. The function alternates between reading a length
%% (`waiting_for = header') and reading a payload (`waiting_for = Size'),
%% and returns once the buffer holds no complete unit.
-spec handle_buffered(state()) -> state().
handle_buffered(#state{waiting_for = header,
                       buffer = <<Header:4/binary, Rest/binary>>} = State) ->
    FrameSize = binary:decode_unsigned(Header),
    handle_buffered(State#state{waiting_for = FrameSize, buffer = Rest});
handle_buffered(#state{waiting_for = FrameSize, buffer = Buffer} = State)
  %% The integer test spells out what the size comparison alone already
  %% implied: with `waiting_for = header' the comparison is always false,
  %% because any number sorts before any atom.
  when is_integer(FrameSize), byte_size(Buffer) >= FrameSize ->
    <<Frame:FrameSize/binary, Rest/binary>> = Buffer,
    Handled = handle_data(Frame, State),
    handle_buffered(Handled#state{waiting_for = header, buffer = Rest});
handle_buffered(State) ->
    %% Not enough bytes yet for a header or a full frame: wait for more.
    State.
212
%% Returns the value stored under this module's `endpoints' option.
-spec endpoints() -> [mod_global_distrib_utils:endpoint()].
endpoints() ->
    Endpoints = opt(endpoints),
    Endpoints.
216
%% Starts one listener per configured endpoint, each with the full
%% ?LISTEN_RETRIES retry budget. Returns `ok'.
-spec start_listeners() -> any().
start_listeners() ->
    lists:foreach(fun(Endpoint) ->
                          start_listener(Endpoint, ?LISTEN_RETRIES)
                  end, endpoints()),
    ok.
221
%% Starts a ranch TCP listener with 10 acceptors for one endpoint. The
%% endpoint tuple itself serves as the ranch listener reference.
%%
%% When the address is still in use and retries remain, logs the failure,
%% sleeps for ?LISTEN_RETRY_DELAY milliseconds and tries again. Any other
%% result (including `eaddrinuse' with no retries left) is not matched and
%% crashes the caller with `case_clause'.
-spec start_listener(mod_global_distrib_utils:endpoint(),
                     RetriesLeft :: non_neg_integer()) -> any().
start_listener({Addr, Port} = Ref, RetriesLeft) ->
    ?LOG_INFO(#{what => gd_start_listener, address => Addr, port => Port}),
    TransportOpts = [{ip, Addr}, {port, Port}],
    case ranch:start_listener(Ref, 10, ranch_tcp, TransportOpts, ?MODULE, []) of
        {ok, _} ->
            ok;
        {error, eaddrinuse} when RetriesLeft > 0 ->
            ?LOG_ERROR(#{what => gd_start_listener_failed, address => Addr, port => Port,
                         text => <<"Failed to start listener: address in use. Will retry in 1 second.">>}),
            timer:sleep(?LISTEN_RETRY_DELAY),
            start_listener(Ref, RetriesLeft - 1)
    end.
234
%% Stops the ranch listener of every configured endpoint; each endpoint
%% tuple is the reference its listener was started under.
-spec stop_listeners() -> any().
stop_listeners() ->
    lists:foreach(fun(ListenerRef) ->
                          ranch:stop_listener(ListenerRef)
                  end, endpoints()).
Line Hits Source