./ct_report/coverage/mod_global_distrib_receiver.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_receiver).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(gen_mod).
21 -behaviour(ranch_protocol).
22 -behaviour(gen_server).
23 -behaviour(mongoose_module_metrics).
24
25 -include("mongoose.hrl").
26 -include("jlib.hrl").
27 -include("global_distrib_metrics.hrl").
28
29 -export([endpoints/0, start_link/4]).
30 -export([start/2, stop/1]).
31 -export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]).
32
33 -define(LISTEN_RETRIES, 5). %% Number of retries in case of eaddrinuse
34 -define(LISTEN_RETRY_DELAY, 1000). %% Milliseconds to retrying in case of eaddrinuse
35
36 -record(state, {
37 socket :: mod_global_distrib_transport:t(),
38 waiting_for :: header | non_neg_integer(),
39 buffer = <<>> :: binary(),
40 host :: undefined | atom(),
41 conn_id = <<>> :: binary(),
42 peer :: tuple() | unknown
43 }).
44
45 -type state() :: #state{}.
46
47 %%--------------------------------------------------------------------
48 %% API
49 %%--------------------------------------------------------------------
50
51 -spec start_link(Ref :: reference(), Socket :: gen_tcp:socket(), Transport :: ranch_tcp,
52 Opts :: [term()]) -> {ok, pid()}.
53 start_link(Ref, Socket, ranch_tcp, Opts) ->
54
:-(
Pid = proc_lib:spawn_link(?MODULE, init, [{Ref, Socket, Opts}]),
55
:-(
{ok, Pid}.
56
57 %%--------------------------------------------------------------------
58 %% gen_mod API
59 %%--------------------------------------------------------------------
60
61 -spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any().
62 start(Host, Opts0) ->
63
:-(
Opts = prepare_opts(Opts0),
64
:-(
mod_global_distrib_utils:start(?MODULE, Host, Opts, fun start/0).
65
66 -spec stop(Host :: jid:lserver()) -> any().
67 stop(Host) ->
68
:-(
mod_global_distrib_utils:stop(?MODULE, Host, fun stop/0).
69
70 %%--------------------------------------------------------------------
71 %% ranch_protocol API
72 %%--------------------------------------------------------------------
73
74 init({Ref, RawSocket, _Opts}) ->
75
:-(
process_flag(trap_exit, true),
76
:-(
ok = ranch:accept_ack(Ref),
77
:-(
{ok, Socket} = mod_global_distrib_transport:wrap(RawSocket, opt(tls_opts)),
78
:-(
ok = mod_global_distrib_transport:setopts(Socket, [{active, once}]),
79
:-(
mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_ESTABLISHED, 1),
80
:-(
State = #state{socket = Socket, waiting_for = header,
81 peer = mod_global_distrib_transport:peername(Socket)},
82
:-(
gen_server:enter_loop(?MODULE, [], State).
83
84 %%--------------------------------------------------------------------
85 %% gen_server API
86 %%--------------------------------------------------------------------
87
88 handle_info({tcp, _Socket, RawData}, #state{socket = Socket, buffer = Buffer} = State) ->
89
:-(
do_setopts_and_receive_data(Socket, Buffer, RawData, State);
90 handle_info({tcp_closed, _Socket}, State) ->
91
:-(
{stop, normal, State};
92 handle_info({tcp_error, _Socket, Reason}, State) ->
93
:-(
?LOG_ERROR(#{what => gd_incoming_socket_error, reason => Reason,
94 text => <<"mod_global_distrib_receiver received tcp_error">>,
95
:-(
peer => State#state.peer, conn_id => State#state.conn_id}),
96
:-(
mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_ERRORED(State#state.host), 1),
97
:-(
{stop, {error, Reason}, State};
98 handle_info(Msg, State) ->
99
:-(
?UNEXPECTED_INFO(Msg),
100
:-(
{noreply, State}.
101
102 handle_cast(_Message, _State) ->
103
:-(
exit(bad_cast).
104
105 handle_call(_Message, _From, _State) ->
106
:-(
exit(bad_call).
107
108 code_change(_Version, State, _Extra) ->
109
:-(
{ok, State}.
110
111 terminate(Reason, State) ->
112
:-(
?LOG_WARNING(#{what => gd_incoming_socket_closed,
113 peer => State#state.peer, server => State#state.host,
114
:-(
reason => Reason, conn_id => State#state.conn_id}),
115
:-(
mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_CLOSED(State#state.host), 1),
116
:-(
catch mod_global_distrib_transport:close(State#state.socket),
117
:-(
ignore.
118
119 %%--------------------------------------------------------------------
120 %% Helpers
121 %%--------------------------------------------------------------------
122
123 prepare_opts(Opts0) ->
124
:-(
case lists:keyfind(local_host, 1, Opts0) of
125 {local_host, LocalHost} ->
126
:-(
[{endpoints, [{LocalHost, 5555}]} | Opts0];
127 _ ->
128
:-(
error({missing_option, local_host}, [Opts0])
129 end.
130
131 -spec start() -> any().
132 start() ->
133
:-(
opt(tls_opts), %% Check for required tls_opts
134
:-(
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_RECV_QUEUE_TIME, histogram),
135
:-(
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_INCOMING_ESTABLISHED, spiral),
136
:-(
mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_INCOMING_ERRORED(undefined), spiral),
137
:-(
mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_INCOMING_CLOSED(undefined), spiral),
138
:-(
ChildMod = mod_global_distrib_worker_sup,
139
:-(
Child = {ChildMod, {ChildMod, start_link, []}, permanent, 10000, supervisor, [ChildMod]},
140
:-(
ejabberd_sup:start_child(Child),
141
:-(
Endpoints = mod_global_distrib_utils:resolve_endpoints(opt(endpoints)),
142
:-(
ets:insert(?MODULE, {endpoints, Endpoints}),
143
:-(
start_listeners().
144
145 -spec stop() -> any().
146 stop() ->
147
:-(
stop_listeners(),
148
:-(
ejabberd_sup:stop_child(mod_global_distrib_worker_sup).
149
150 -spec opt(Key :: atom()) -> term().
151 opt(Key) ->
152
:-(
mod_global_distrib_utils:opt(?MODULE, Key).
153
154 do_setopts_and_receive_data(Socket, Buffer, RawData, State) ->
155
:-(
SetOptsResult = mod_global_distrib_transport:setopts(Socket, [{active, once}]),
156
:-(
case SetOptsResult of
157 ok ->
158
:-(
do_receive_data(Socket, Buffer, RawData, State);
159 {error, closed} ->
160
:-(
{stop, normal, State};
161 _ ->
162
:-(
{stop, {setopts_failed, SetOptsResult}, State}
163 end.
164
165 do_receive_data(Socket, Buffer, RawData, State) ->
166
:-(
case mod_global_distrib_transport:recv_data(Socket, RawData) of
167 {ok, Data} ->
168
:-(
NewState = handle_buffered(State#state{buffer = <<Buffer/binary, Data/binary>>}),
169
:-(
{noreply, NewState};
170 {error, closed} ->
171
:-(
{stop, normal, State};
172 Other ->
173
:-(
{stop, {recv_data_failed, Other}, State}
174 end.
175
176 -spec handle_data(Data :: binary(), state()) -> state().
177 handle_data(GdStart, State = #state{host = undefined}) ->
178
:-(
{ok, #xmlel{name = <<"gd_start">>, attrs = Attrs}} = exml:parse(GdStart),
179
:-(
#{<<"server">> := BinHost, <<"conn_id">> := ConnId} = maps:from_list(Attrs),
180
:-(
Host = mod_global_distrib_utils:binary_to_metric_atom(BinHost),
181
:-(
mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_MESSAGES_RECEIVED(Host), spiral),
182
:-(
mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_TRANSFER_TIME(Host), histogram),
183
:-(
mod_global_distrib_utils:ensure_metric(
184 ?GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Host), spiral),
185
:-(
mod_global_distrib_utils:ensure_metric(
186 ?GLOBAL_DISTRIB_INCOMING_ERRORED(Host), spiral),
187
:-(
mod_global_distrib_utils:ensure_metric(
188 ?GLOBAL_DISTRIB_INCOMING_CLOSED(Host), spiral),
189
:-(
mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Host), 1),
190
:-(
?LOG_INFO(#{what => gd_incoming_connection, server => BinHost, conn_id => ConnId}),
191
:-(
State#state{host = Host, conn_id = ConnId};
192 handle_data(Data, State = #state{host = Host}) ->
193
:-(
<<ClockTime:64, BinFromSize:16, _/binary>> = Data,
194
:-(
TransferTime = erlang:system_time(microsecond) - ClockTime,
195
:-(
<<_:80, BinFrom:BinFromSize/binary, BinTerm/binary>> = Data,
196
:-(
Worker = mod_global_distrib_worker_sup:get_worker(BinFrom),
197
:-(
Stamp = erlang:monotonic_time(),
198
:-(
ok = mod_global_distrib_utils:cast_or_call(Worker, {data, Host, TransferTime, Stamp, BinTerm}),
199
:-(
State.
200
201 -spec handle_buffered(state()) -> state().
202 handle_buffered(#state{waiting_for = header, buffer = <<Header:4/binary, Rest/binary>>} = State) ->
203
:-(
Size = binary:decode_unsigned(Header),
204
:-(
handle_buffered(State#state{waiting_for = Size, buffer = Rest});
205 handle_buffered(#state{waiting_for = Size, buffer = Buffer} = State)
206 when byte_size(Buffer) >= Size ->
207
:-(
<<Data:Size/binary, Rest/binary>> = Buffer,
208
:-(
NewState = handle_data(Data, State),
209
:-(
handle_buffered(NewState#state{waiting_for = header, buffer = Rest});
210 handle_buffered(State) ->
211
:-(
State.
212
213 -spec endpoints() -> [mod_global_distrib_utils:endpoint()].
214 endpoints() ->
215
:-(
opt(endpoints).
216
217 -spec start_listeners() -> any().
218 start_listeners() ->
219
:-(
[start_listener(Endpoint, ?LISTEN_RETRIES) || Endpoint <- endpoints()],
220
:-(
ok.
221
222 -spec start_listener(mod_global_distrib_utils:endpoint(),
223 RetriesLeft :: non_neg_integer()) -> any().
224 start_listener({Addr, Port} = Ref, RetriesLeft) ->
225
:-(
?LOG_INFO(#{what => gd_start_listener, address => Addr, port => Port}),
226
:-(
case ranch:start_listener(Ref, 10, ranch_tcp, [{ip, Addr}, {port, Port}], ?MODULE, []) of
227
:-(
{ok, _} -> ok;
228 {error, eaddrinuse} when RetriesLeft > 0 ->
229
:-(
?LOG_ERROR(#{what => gd_start_listener_failed, address => Addr, port => Port,
230
:-(
text => <<"Failed to start listener: address in use. Will retry in 1 second.">>}),
231
:-(
timer:sleep(?LISTEN_RETRY_DELAY),
232
:-(
start_listener(Ref, RetriesLeft - 1)
233 end.
234
235 -spec stop_listeners() -> any().
236 stop_listeners() ->
237
:-(
lists:foreach(fun ranch:stop_listener/1, endpoints()).
Line Hits Source