1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mod_global_distrib_receiver). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-behaviour(gen_mod). |
21 |
|
-behaviour(ranch_protocol). |
22 |
|
-behaviour(gen_server). |
23 |
|
-behaviour(mongoose_module_metrics). |
24 |
|
|
25 |
|
-include("mongoose.hrl"). |
26 |
|
-include("jlib.hrl"). |
27 |
|
-include("global_distrib_metrics.hrl"). |
28 |
|
|
29 |
|
-export([endpoints/0, start_link/4]). |
30 |
|
-export([start/2, stop/1]). |
31 |
|
-export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]). |
32 |
|
|
33 |
|
%% How many additional attempts start_listener/2 makes after ranch reports
%% `eaddrinuse'.
-define(LISTEN_RETRIES, 5).
%% Delay in milliseconds before each of those retries.
-define(LISTEN_RETRY_DELAY, 1000).

%% State of one incoming global-distribution connection process.
-record(state, {
          %% Connection socket, wrapped by mod_global_distrib_transport:wrap/2
          %% with the configured `tls_opts'.
          socket :: mod_global_distrib_transport:t(),
          %% `header' while the 4-byte length prefix of the next frame is
          %% awaited; otherwise the size in bytes of the frame being assembled.
          waiting_for :: header | non_neg_integer(),
          %% Bytes received but not yet consumed as a complete frame.
          buffer = <<>> :: binary(),
          %% Remote host (converted to a metric atom) announced by the
          %% gd_start frame; `undefined' until that frame has been handled.
          host :: undefined | atom(),
          %% Connection id announced by the gd_start frame.
          conn_id = <<>> :: binary(),
          %% Value returned by mod_global_distrib_transport:peername/1.
          peer :: tuple() | unknown
         }).

-type state() :: #state{}.
46 |
|
|
47 |
|
%%-------------------------------------------------------------------- |
48 |
|
%% API |
49 |
|
%%-------------------------------------------------------------------- |
50 |
|
|
51 |
|
%% @doc ranch_protocol entry point. Spawns a linked process that runs init/1
%% with the accepted socket and returns its pid immediately. Only the
%% `ranch_tcp' transport is accepted.
-spec start_link(Ref :: reference(), Socket :: gen_tcp:socket(), Transport :: ranch_tcp,
                 Opts :: [term()]) -> {ok, pid()}.
start_link(Ref, Socket, ranch_tcp, Opts) ->
    InitArg = {Ref, Socket, Opts},
    {ok, proc_lib:spawn_link(?MODULE, init, [InitArg])}.
56 |
|
|
57 |
|
%%-------------------------------------------------------------------- |
58 |
|
%% gen_mod API |
59 |
|
%%-------------------------------------------------------------------- |
60 |
|
|
61 |
|
%% @doc gen_mod callback. Normalises the options with prepare_opts/1 and
%% delegates to mod_global_distrib_utils:start/4, passing start/0 as the
%% function that performs the actual start-up.
-spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any().
start(Host, Opts0) ->
    mod_global_distrib_utils:start(?MODULE, Host, prepare_opts(Opts0), fun start/0).
65 |
|
|
66 |
|
%% @doc gen_mod callback. Delegates to mod_global_distrib_utils:stop/3,
%% passing stop/0 as the function that performs the actual shutdown.
-spec stop(Host :: jid:lserver()) -> any().
stop(Host) ->
    StopFun = fun stop/0,
    mod_global_distrib_utils:stop(?MODULE, Host, StopFun).
69 |
|
|
70 |
|
%%-------------------------------------------------------------------- |
71 |
|
%% ranch_protocol API |
72 |
|
%%-------------------------------------------------------------------- |
73 |
|
|
74 |
|
%% @doc Body of the connection process spawned by start_link/4.
%%
%% Traps exits, acknowledges the accept to ranch, wraps the raw socket with
%% the configured `tls_opts', arms one-shot active mode, counts the new
%% connection and then becomes a gen_server via gen_server:enter_loop/3
%% (which does not return).
init({Ref, RawSocket, _Opts}) ->
    process_flag(trap_exit, true),
    ok = ranch:accept_ack(Ref),
    TlsOpts = opt(tls_opts),
    {ok, Socket} = mod_global_distrib_transport:wrap(RawSocket, TlsOpts),
    ok = mod_global_distrib_transport:setopts(Socket, [{active, once}]),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_ESTABLISHED, 1),
    Peer = mod_global_distrib_transport:peername(Socket),
    InitialState = #state{socket = Socket, waiting_for = header, peer = Peer},
    gen_server:enter_loop(?MODULE, [], InitialState).
83 |
|
|
84 |
|
%%-------------------------------------------------------------------- |
85 |
|
%% gen_server API |
86 |
|
%%-------------------------------------------------------------------- |
87 |
|
|
88 |
|
%% @doc gen_server info callback.
%%
%% - `{tcp, _, Data}': re-arms active-once and feeds the data to the frame
%%   buffer.
%% - `{tcp_closed, _}': stops normally.
%% - `{tcp_error, _, Reason}': logs the error, bumps the per-host
%%   "incoming errored" metric and stops with `{error, Reason}'.
%% - anything else is logged as unexpected and ignored.
handle_info({tcp, _Socket, RawData}, #state{socket = Socket, buffer = Buffer} = State) ->
    do_setopts_and_receive_data(Socket, Buffer, RawData, State);
handle_info({tcp_closed, _Socket}, State) ->
    {stop, normal, State};
handle_info({tcp_error, _Socket, Reason},
            #state{peer = Peer, conn_id = ConnId, host = Host} = State) ->
    ?LOG_ERROR(#{what => gd_incoming_socket_error,
                 reason => Reason,
                 text => <<"mod_global_distrib_receiver received tcp_error">>,
                 peer => Peer,
                 conn_id => ConnId}),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_ERRORED(Host), 1),
    {stop, {error, Reason}, State};
handle_info(Msg, State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State}.
101 |
|
|
102 |
|
%% @doc Casts are not part of this process's protocol; any cast makes the
%% process exit with reason `bad_cast'.
handle_cast(_Cast, _State) ->
    erlang:exit(bad_cast).
104 |
|
|
105 |
|
%% @doc Synchronous calls are not supported; any call makes the process exit
%% with reason `bad_call'.
handle_call(_Request, _From, _State) ->
    erlang:exit(bad_call).
107 |
|
|
108 |
|
%% @doc Hot code upgrade hook; returns the state unchanged.
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
110 |
|
|
111 |
|
%% @doc gen_server terminate callback.
%%
%% Logs the closed connection, bumps the per-host "incoming closed" metric
%% and closes the socket on a best-effort basis: any exception raised while
%% closing is deliberately discarded, because the process is going away anyway.
terminate(Reason, State) ->
    #state{peer = Peer, host = Host, conn_id = ConnId, socket = Socket} = State,
    ?LOG_WARNING(#{what => gd_incoming_socket_closed,
                   peer => Peer,
                   server => Host,
                   reason => Reason,
                   conn_id => ConnId}),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_CLOSED(Host), 1),
    try
        mod_global_distrib_transport:close(Socket)
    catch
        _:_ -> ok
    end,
    ignore.
118 |
|
|
119 |
|
%%-------------------------------------------------------------------- |
120 |
|
%% Helpers |
121 |
|
%%-------------------------------------------------------------------- |
122 |
|
|
123 |
|
%% @doc Validates the module options and fills in the default endpoint.
%%
%% `local_host' is mandatory. Without it the call fails with
%% `error({missing_option, local_host})'.
%%
%% When no `endpoints' option is present, a single default endpoint
%% `{LocalHost, 5555}' is prepended. A user-supplied `endpoints' option is left
%% as the only entry for that key. It therefore cannot be shadowed by the
%% default, whether options are later looked up first-match or last-match.
prepare_opts(Opts0) ->
    case lists:keyfind(local_host, 1, Opts0) of
        {local_host, LocalHost} ->
            case lists:keymember(endpoints, 1, Opts0) of
                true ->
                    Opts0;
                false ->
                    %% 5555 is the default global-distribution listening port.
                    [{endpoints, [{LocalHost, 5555}]} | Opts0]
            end;
        _ ->
            error({missing_option, local_host}, [Opts0])
    end.
130 |
|
|
131 |
|
%% @doc Start function run by mod_global_distrib_utils:start/4.
%%
%% Reads `tls_opts' up front; the original intent is to check that this
%% required option exists (presumably opt/1 raises when it is missing;
%% confirm). It then registers the receiver's global metrics and starts
%% mod_global_distrib_worker_sup under ejabberd_sup. Finally it stores the
%% resolved endpoints in this module's ETS table and starts one ranch
%% listener per endpoint.
-spec start() -> any().
start() ->
    _ = opt(tls_opts),
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_RECV_QUEUE_TIME, histogram),
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_INCOMING_ESTABLISHED, spiral),
    mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_INCOMING_ERRORED(undefined), spiral),
    mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_INCOMING_CLOSED(undefined), spiral),
    WorkerSup = mod_global_distrib_worker_sup,
    WorkerSupSpec = {WorkerSup, {WorkerSup, start_link, []},
                     permanent, 10000, supervisor, [WorkerSup]},
    ejabberd_sup:start_child(WorkerSupSpec),
    Resolved = mod_global_distrib_utils:resolve_endpoints(opt(endpoints)),
    ets:insert(?MODULE, {endpoints, Resolved}),
    start_listeners().
144 |
|
|
145 |
|
%% @doc Stop function run by mod_global_distrib_utils:stop/3. Stops every
%% listener first, then removes the worker supervisor from ejabberd_sup.
-spec stop() -> any().
stop() ->
    _ = stop_listeners(),
    WorkerSup = mod_global_distrib_worker_sup,
    ejabberd_sup:stop_child(WorkerSup).
149 |
|
|
150 |
|
%% @doc Returns this module's option `Key' via mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Module = ?MODULE,
    mod_global_distrib_utils:opt(Module, Key).
153 |
|
|
154 |
|
%% Re-arms one-shot active mode before the received data is processed.
%% A closed socket stops the process normally; any other setopts failure
%% stops it with `{setopts_failed, Failure}'.
do_setopts_and_receive_data(Socket, Buffer, RawData, State) ->
    case mod_global_distrib_transport:setopts(Socket, [{active, once}]) of
        ok ->
            do_receive_data(Socket, Buffer, RawData, State);
        {error, closed} ->
            {stop, normal, State};
        Failure ->
            {stop, {setopts_failed, Failure}, State}
    end.
164 |
|
|
165 |
|
%% Decodes the raw packet via the transport (for example TLS), appends the
%% result to the frame buffer and processes every complete frame. A closed
%% socket stops the process normally; any other result stops it with
%% `{recv_data_failed, Result}'.
do_receive_data(Socket, Buffer, RawData, State) ->
    case mod_global_distrib_transport:recv_data(Socket, RawData) of
        {ok, Decoded} ->
            Appended = State#state{buffer = <<Buffer/binary, Decoded/binary>>},
            {noreply, handle_buffered(Appended)};
        {error, closed} ->
            {stop, normal, State};
        Failure ->
            {stop, {recv_data_failed, Failure}, State}
    end.
175 |
|
|
176 |
|
%% Handles one complete frame (the length prefix has already been stripped).
%%
%% First frame of a connection (`host' still `undefined'): must be a
%% `gd_start' XML element carrying `server' and `conn_id' attributes. It
%% identifies the remote host, creates that host's metrics and stores host
%% and conn_id in the state. A frame of any other shape crashes the process.
%%
%% Every later frame has the layout
%%   <<ClockTime:64, FromSize:16, From:FromSize/binary, Term/binary>>
%% and is dispatched, together with its transfer time, to the worker chosen
%% for `From'. ClockTime is presumably the sender's
%% erlang:system_time(microsecond), since it is subtracted from the local
%% value of that clock.
-spec handle_data(Data :: binary(), state()) -> state().
handle_data(GdStart, State = #state{host = undefined}) ->
    {ok, #xmlel{name = <<"gd_start">>, attrs = Attrs}} = exml:parse(GdStart),
    #{<<"server">> := BinHost, <<"conn_id">> := ConnId} = maps:from_list(Attrs),
    %% NOTE(review): BinHost comes from the network. If binary_to_metric_atom/1
    %% creates new atoms, a misbehaving peer could grow the atom table. Confirm.
    Host = mod_global_distrib_utils:binary_to_metric_atom(BinHost),
    lists:foreach(
      fun({Metric, Type}) -> mod_global_distrib_utils:ensure_metric(Metric, Type) end,
      [{?GLOBAL_DISTRIB_MESSAGES_RECEIVED(Host), spiral},
       {?GLOBAL_DISTRIB_TRANSFER_TIME(Host), histogram},
       {?GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Host), spiral},
       {?GLOBAL_DISTRIB_INCOMING_ERRORED(Host), spiral},
       {?GLOBAL_DISTRIB_INCOMING_CLOSED(Host), spiral}]),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Host), 1),
    ?LOG_INFO(#{what => gd_incoming_connection, server => BinHost, conn_id => ConnId}),
    State#state{host = Host, conn_id = ConnId};
handle_data(Data, State = #state{host = Host}) ->
    %% Decode header and payload in a single match. FromSize, bound earlier in
    %% the same pattern, sizes the From segment, so there is no second pass
    %% with a hard-coded header offset.
    <<ClockTime:64, FromSize:16, BinFrom:FromSize/binary, BinTerm/binary>> = Data,
    TransferTime = erlang:system_time(microsecond) - ClockTime,
    Worker = mod_global_distrib_worker_sup:get_worker(BinFrom),
    Stamp = erlang:monotonic_time(),
    ok = mod_global_distrib_utils:cast_or_call(Worker, {data, Host, TransferTime, Stamp, BinTerm}),
    State.
200 |
|
|
201 |
|
%% Consumes as many complete frames from the buffer as possible.
%% Wire format: a 4-byte big-endian unsigned length followed by that many
%% bytes of frame body. `waiting_for' records which part is expected next.
%% Incomplete data is left in the buffer for the next packet.
-spec handle_buffered(state()) -> state().
handle_buffered(#state{waiting_for = header, buffer = <<FrameSize:32, Tail/binary>>} = State) ->
    handle_buffered(State#state{waiting_for = FrameSize, buffer = Tail});
handle_buffered(#state{waiting_for = FrameSize, buffer = Buffer} = State)
  when is_integer(FrameSize), byte_size(Buffer) >= FrameSize ->
    <<Frame:FrameSize/binary, Tail/binary>> = Buffer,
    Processed = handle_data(Frame, State),
    handle_buffered(Processed#state{waiting_for = header, buffer = Tail});
handle_buffered(State) ->
    State.
212 |
|
|
213 |
|
%% @doc Returns the value of this module's `endpoints' option.
-spec endpoints() -> [mod_global_distrib_utils:endpoint()].
endpoints() ->
    Key = endpoints,
    opt(Key).
216 |
|
|
217 |
|
%% Starts one ranch listener per configured endpoint, each allowed
%% ?LISTEN_RETRIES retries on `eaddrinuse'. Returns `ok'.
-spec start_listeners() -> any().
start_listeners() ->
    lists:foreach(fun(Endpoint) -> start_listener(Endpoint, ?LISTEN_RETRIES) end,
                  endpoints()).
221 |
|
|
222 |
|
%% Starts a ranch TCP listener (10 acceptors) on `{Addr, Port}'. The endpoint
%% tuple itself is used as the listener ref.
%%
%% `{error, eaddrinuse}' is retried up to `RetriesLeft' more times, sleeping
%% ?LISTEN_RETRY_DELAY ms between attempts. Once retries are exhausted, or
%% for any other result, the failure is logged and the caller crashes with
%% `error({gd_start_listener_failed, Ref, Error})' instead of an opaque
%% `case_clause'.
-spec start_listener(mod_global_distrib_utils:endpoint(),
                     RetriesLeft :: non_neg_integer()) -> ok.
start_listener({Addr, Port} = Ref, RetriesLeft) ->
    ?LOG_INFO(#{what => gd_start_listener, address => Addr, port => Port}),
    case ranch:start_listener(Ref, 10, ranch_tcp, [{ip, Addr}, {port, Port}], ?MODULE, []) of
        {ok, _} ->
            ok;
        {error, eaddrinuse} when RetriesLeft > 0 ->
            ?LOG_ERROR(#{what => gd_start_listener_failed, address => Addr, port => Port,
                         text => <<"Failed to start listener: address in use. Will retry in 1 second.">>}),
            timer:sleep(?LISTEN_RETRY_DELAY),
            start_listener(Ref, RetriesLeft - 1);
        Error ->
            ?LOG_ERROR(#{what => gd_start_listener_failed, address => Addr, port => Port,
                         reason => Error, retries_left => RetriesLeft,
                         text => <<"Failed to start listener">>}),
            error({gd_start_listener_failed, Ref, Error})
    end.
234 |
|
|
235 |
|
%% Stops the ranch listener of every configured endpoint; each endpoint tuple
%% is also the listener ref used when it was started. Returns `ok'.
-spec stop_listeners() -> any().
stop_listeners() ->
    lists:foreach(fun(Endpoint) -> ranch:stop_listener(Endpoint) end, endpoints()).