1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mod_global_distrib_receiver). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-behaviour(gen_mod). |
21 |
|
-behaviour(ranch_protocol). |
22 |
|
-behaviour(gen_server). |
23 |
|
-behaviour(mongoose_module_metrics). |
24 |
|
|
25 |
|
-include("mongoose.hrl"). |
26 |
|
-include("jlib.hrl"). |
27 |
|
-include("global_distrib_metrics.hrl"). |
28 |
|
|
29 |
|
-export([start_link/3]). |
30 |
|
-export([start/2, stop/1, deps/2]). |
31 |
|
-export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]). |
32 |
|
|
33 |
|
-define(LISTEN_RETRIES, 5). %% Number of retries in case of eaddrinuse |
34 |
|
-define(LISTEN_RETRY_DELAY, 1000). %% Milliseconds to retrying in case of eaddrinuse |
35 |
|
|
36 |
|
-record(state, { |
37 |
|
socket :: mod_global_distrib_transport:t(), |
38 |
|
waiting_for :: header | non_neg_integer(), |
39 |
|
buffer = <<>> :: binary(), |
40 |
|
host :: undefined | atom(), |
41 |
|
conn_id = <<>> :: binary(), |
42 |
|
peer :: tuple() | unknown |
43 |
|
}). |
44 |
|
|
45 |
|
-type state() :: #state{}. |
46 |
|
|
47 |
|
%%-------------------------------------------------------------------- |
48 |
|
%% API |
49 |
|
%%-------------------------------------------------------------------- |
50 |
|
|
51 |
|
-spec start_link(Ref :: reference(), Transport :: ranch_tcp, |
52 |
|
Opts :: [term()]) -> {ok, pid()}. |
53 |
|
start_link(Ref, ranch_tcp, Opts) -> |
54 |
589 |
Pid = proc_lib:spawn_link(?MODULE, init, [{Ref, ranch_tcp, Opts}]), |
55 |
589 |
{ok, Pid}. |
56 |
|
|
57 |
|
%%-------------------------------------------------------------------- |
58 |
|
%% gen_mod API |
59 |
|
%%-------------------------------------------------------------------- |
60 |
|
|
61 |
|
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any(). |
62 |
|
start(_HostType, _Opts) -> |
63 |
28 |
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_RECV_QUEUE_TIME, histogram), |
64 |
28 |
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_INCOMING_ESTABLISHED, spiral), |
65 |
28 |
mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_INCOMING_ERRORED(undefined), spiral), |
66 |
28 |
mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_INCOMING_CLOSED(undefined), spiral), |
67 |
28 |
ChildMod = mod_global_distrib_worker_sup, |
68 |
28 |
Child = {ChildMod, {ChildMod, start_link, []}, permanent, 10000, supervisor, [ChildMod]}, |
69 |
28 |
ejabberd_sup:start_child(Child), |
70 |
28 |
start_listeners(). |
71 |
|
|
72 |
|
-spec stop(mongooseim:host_type()) -> any(). |
73 |
|
stop(_HostType) -> |
74 |
28 |
stop_listeners(), |
75 |
28 |
ejabberd_sup:stop_child(mod_global_distrib_worker_sup). |
76 |
|
|
77 |
|
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps(). |
78 |
|
deps(_HostType, Opts) -> |
79 |
113 |
[{mod_global_distrib_utils, Opts, hard}]. |
80 |
|
|
81 |
|
%%-------------------------------------------------------------------- |
82 |
|
%% ranch_protocol API |
83 |
|
%%-------------------------------------------------------------------- |
84 |
|
|
85 |
|
init({Ref, ranch_tcp, _Opts}) -> |
86 |
589 |
process_flag(trap_exit, true), |
87 |
589 |
{ok, RawSocket} = ranch:handshake(Ref), |
88 |
589 |
ConnOpts = opt(connections), |
89 |
589 |
{ok, Socket} = mod_global_distrib_transport:wrap(RawSocket, ConnOpts), |
90 |
589 |
ok = mod_global_distrib_transport:setopts(Socket, [{active, once}]), |
91 |
589 |
mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_ESTABLISHED, 1), |
92 |
589 |
State = #state{socket = Socket, waiting_for = header, |
93 |
|
peer = mod_global_distrib_transport:peername(Socket)}, |
94 |
589 |
gen_server:enter_loop(?MODULE, [], State). |
95 |
|
|
96 |
|
%%-------------------------------------------------------------------- |
97 |
|
%% gen_server API |
98 |
|
%%-------------------------------------------------------------------- |
99 |
|
|
100 |
|
handle_info({tcp, _Socket, RawData}, #state{socket = Socket, buffer = Buffer} = State) -> |
101 |
2208 |
do_setopts_and_receive_data(Socket, Buffer, RawData, State); |
102 |
|
handle_info({tcp_closed, _Socket}, State) -> |
103 |
566 |
{stop, normal, State}; |
104 |
|
handle_info({tcp_error, _Socket, Reason}, State) -> |
105 |
:-( |
?LOG_ERROR(#{what => gd_incoming_socket_error, reason => Reason, |
106 |
|
text => <<"mod_global_distrib_receiver received tcp_error">>, |
107 |
:-( |
peer => State#state.peer, conn_id => State#state.conn_id}), |
108 |
:-( |
mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_ERRORED(State#state.host), 1), |
109 |
:-( |
{stop, {error, Reason}, State}; |
110 |
|
handle_info(Msg, State) -> |
111 |
:-( |
?UNEXPECTED_INFO(Msg), |
112 |
:-( |
{noreply, State}. |
113 |
|
|
114 |
|
handle_cast(_Message, _State) -> |
115 |
:-( |
exit(bad_cast). |
116 |
|
|
117 |
|
handle_call(_Message, _From, _State) -> |
118 |
:-( |
exit(bad_call). |
119 |
|
|
120 |
|
code_change(_Version, State, _Extra) -> |
121 |
:-( |
{ok, State}. |
122 |
|
|
123 |
|
terminate(Reason, State) -> |
124 |
589 |
?LOG_WARNING(#{what => gd_incoming_socket_closed, |
125 |
|
peer => State#state.peer, server => State#state.host, |
126 |
:-( |
reason => Reason, conn_id => State#state.conn_id}), |
127 |
589 |
mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_CLOSED(State#state.host), 1), |
128 |
589 |
catch mod_global_distrib_transport:close(State#state.socket), |
129 |
589 |
ignore. |
130 |
|
|
131 |
|
%%-------------------------------------------------------------------- |
132 |
|
%% Helpers |
133 |
|
%%-------------------------------------------------------------------- |
134 |
|
|
135 |
|
-spec opt(gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value(). |
136 |
|
opt(Key) -> |
137 |
645 |
mod_global_distrib_utils:opt(?MODULE, Key). |
138 |
|
|
139 |
|
do_setopts_and_receive_data(Socket, Buffer, RawData, State) -> |
140 |
2208 |
SetOptsResult = mod_global_distrib_transport:setopts(Socket, [{active, once}]), |
141 |
2208 |
case SetOptsResult of |
142 |
|
ok -> |
143 |
2208 |
do_receive_data(Socket, Buffer, RawData, State); |
144 |
|
{error, closed} -> |
145 |
:-( |
{stop, normal, State}; |
146 |
|
_ -> |
147 |
:-( |
{stop, {setopts_failed, SetOptsResult}, State} |
148 |
|
end. |
149 |
|
|
150 |
|
do_receive_data(Socket, Buffer, RawData, State) -> |
151 |
2208 |
case mod_global_distrib_transport:recv_data(Socket, RawData) of |
152 |
|
{ok, Data} -> |
153 |
2200 |
NewState = handle_buffered(State#state{buffer = <<Buffer/binary, Data/binary>>}), |
154 |
2200 |
{noreply, NewState}; |
155 |
|
{error, closed} -> |
156 |
7 |
{stop, normal, State}; |
157 |
|
Other -> |
158 |
1 |
{stop, {recv_data_failed, Other}, State} |
159 |
|
end. |
160 |
|
|
161 |
|
-spec handle_data(Data :: binary(), state()) -> state(). |
162 |
|
handle_data(GdStart, State = #state{host = undefined}) -> |
163 |
556 |
{ok, #xmlel{name = <<"gd_start">>, attrs = Attrs}} = exml:parse(GdStart), |
164 |
556 |
#{<<"server">> := BinHost, <<"conn_id">> := ConnId} = maps:from_list(Attrs), |
165 |
556 |
Host = mod_global_distrib_utils:binary_to_metric_atom(BinHost), |
166 |
556 |
mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_MESSAGES_RECEIVED(Host), spiral), |
167 |
556 |
mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_TRANSFER_TIME(Host), histogram), |
168 |
556 |
mod_global_distrib_utils:ensure_metric( |
169 |
|
?GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Host), spiral), |
170 |
556 |
mod_global_distrib_utils:ensure_metric( |
171 |
|
?GLOBAL_DISTRIB_INCOMING_ERRORED(Host), spiral), |
172 |
556 |
mod_global_distrib_utils:ensure_metric( |
173 |
|
?GLOBAL_DISTRIB_INCOMING_CLOSED(Host), spiral), |
174 |
556 |
mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Host), 1), |
175 |
556 |
?LOG_INFO(#{what => gd_incoming_connection, server => BinHost, conn_id => ConnId}), |
176 |
556 |
State#state{host = Host, conn_id = ConnId}; |
177 |
|
handle_data(Data, State = #state{host = Host}) -> |
178 |
341 |
<<ClockTime:64, BinFromSize:16, _/binary>> = Data, |
179 |
341 |
TransferTime = erlang:system_time(microsecond) - ClockTime, |
180 |
341 |
<<_:80, BinFrom:BinFromSize/binary, BinTerm/binary>> = Data, |
181 |
341 |
Worker = mod_global_distrib_worker_sup:get_worker(BinFrom), |
182 |
341 |
Stamp = erlang:monotonic_time(), |
183 |
341 |
ok = mod_global_distrib_utils:cast_or_call(Worker, {data, Host, TransferTime, Stamp, BinTerm}), |
184 |
341 |
State. |
185 |
|
|
186 |
|
-spec handle_buffered(state()) -> state(). |
187 |
|
handle_buffered(#state{waiting_for = header, buffer = <<Header:4/binary, Rest/binary>>} = State) -> |
188 |
897 |
Size = binary:decode_unsigned(Header), |
189 |
897 |
handle_buffered(State#state{waiting_for = Size, buffer = Rest}); |
190 |
|
handle_buffered(#state{waiting_for = Size, buffer = Buffer} = State) |
191 |
|
when byte_size(Buffer) >= Size -> |
192 |
897 |
<<Data:Size/binary, Rest/binary>> = Buffer, |
193 |
897 |
NewState = handle_data(Data, State), |
194 |
897 |
handle_buffered(NewState#state{waiting_for = header, buffer = Rest}); |
195 |
|
handle_buffered(State) -> |
196 |
2200 |
State. |
197 |
|
|
198 |
|
-spec start_listeners() -> any(). |
199 |
|
start_listeners() -> |
200 |
28 |
[start_listener(Endpoint, ?LISTEN_RETRIES) || Endpoint <- endpoints()], |
201 |
28 |
ok. |
202 |
|
|
203 |
|
-spec start_listener(mod_global_distrib_utils:endpoint(), |
204 |
|
RetriesLeft :: non_neg_integer()) -> any(). |
205 |
|
start_listener({Addr, Port} = Ref, RetriesLeft) -> |
206 |
30 |
?LOG_INFO(#{what => gd_start_listener, address => Addr, port => Port}), |
207 |
30 |
SocketOpts = [{ip, Addr}, {port, Port}], |
208 |
30 |
RanchOpts = #{max_connections => infinity, num_acceptors => 10, socket_opts => SocketOpts}, |
209 |
30 |
case ranch:start_listener(Ref, ranch_tcp, RanchOpts, ?MODULE, []) of |
210 |
30 |
{ok, _} -> ok; |
211 |
|
{error, eaddrinuse} when RetriesLeft > 0 -> |
212 |
:-( |
?LOG_ERROR(#{what => gd_start_listener_failed, address => Addr, port => Port, |
213 |
:-( |
text => <<"Failed to start listener: address in use. Will retry in 1 second.">>}), |
214 |
:-( |
timer:sleep(?LISTEN_RETRY_DELAY), |
215 |
:-( |
start_listener(Ref, RetriesLeft - 1) |
216 |
|
end. |
217 |
|
|
218 |
|
-spec stop_listeners() -> any(). |
219 |
|
stop_listeners() -> |
220 |
28 |
lists:foreach(fun ranch:stop_listener/1, endpoints()). |
221 |
|
|
222 |
|
-spec endpoints() -> [mod_global_distrib_utils:endpoint()]. |
223 |
|
endpoints() -> |
224 |
56 |
opt([connections, resolved_endpoints]). |