./ct_report/coverage/mod_global_distrib_receiver.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_receiver).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(gen_mod).
21 -behaviour(ranch_protocol).
22 -behaviour(gen_server).
23 -behaviour(mongoose_module_metrics).
24
25 -include("mongoose.hrl").
26 -include("jlib.hrl").
27 -include("global_distrib_metrics.hrl").
28
29 -export([start_link/3]).
30 -export([start/2, stop/1, deps/2]).
31 -export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]).
32
33 -define(LISTEN_RETRIES, 5). %% Number of retries in case of eaddrinuse
34 -define(LISTEN_RETRY_DELAY, 1000). %% Milliseconds to wait before retrying in case of eaddrinuse
35
%% State kept by each process that serves one incoming connection.
-record(state, {
          %% Socket returned by mod_global_distrib_transport:wrap/2 in init/1.
          socket :: mod_global_distrib_transport:t(),
          %% `header' while the 4-byte length prefix is still expected,
          %% otherwise the byte size of the frame being collected.
          waiting_for :: header | non_neg_integer(),
          %% Bytes received so far that handle_buffered/1 has not consumed.
          buffer = <<>> :: binary(),
          %% Atom derived from the `server' attribute of the `gd_start'
          %% element; `undefined' until that element has been handled.
          host :: undefined | atom(),
          %% Value of the `conn_id' attribute of the `gd_start' element.
          conn_id = <<>> :: binary(),
          %% Result of mod_global_distrib_transport:peername/1, read in init/1.
          peer :: tuple() | unknown
         }).

-type state() :: #state{}.
46
47 %%--------------------------------------------------------------------
48 %% API
49 %%--------------------------------------------------------------------
50
%% @doc ranch_protocol callback: spawns a linked process that runs `init/1'
%% for a newly accepted connection and returns its pid.
%%
%% Only the `ranch_tcp' transport is accepted; any other transport fails with
%% `function_clause'.
%%
%% `Ref' is the ranch listener reference. Listeners of this module are
%% registered under `{Addr, Port}' tuples (see `start_listener/2'), so the
%% argument is typed as `ranch:ref()' rather than `reference()'.
-spec start_link(Ref :: ranch:ref(), Transport :: ranch_tcp,
                 Opts :: [term()]) -> {ok, pid()}.
start_link(Ref, ranch_tcp, Opts) ->
    Pid = proc_lib:spawn_link(?MODULE, init, [{Ref, ranch_tcp, Opts}]),
    {ok, Pid}.
56
57 %%--------------------------------------------------------------------
58 %% gen_mod API
59 %%--------------------------------------------------------------------
60
%% @doc gen_mod callback. Registers the receiver metrics, starts the
%% `mod_global_distrib_worker_sup' supervisor under `ejabberd_sup' and then
%% starts one ranch listener per configured endpoint.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(_HostType, _Opts) ->
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_RECV_QUEUE_TIME, histogram),
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_INCOMING_ESTABLISHED, spiral),
    %% The host is `undefined' until a connection has sent its `gd_start'
    %% element, so the per-host counters are registered for `undefined' too.
    mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_INCOMING_ERRORED(undefined), spiral),
    mod_global_distrib_utils:ensure_metric(?GLOBAL_DISTRIB_INCOMING_CLOSED(undefined), spiral),
    WorkerSup = mod_global_distrib_worker_sup,
    WorkerSupSpec = {WorkerSup,
                     {WorkerSup, start_link, []},
                     permanent, 10000, supervisor,
                     [WorkerSup]},
    ejabberd_sup:start_child(WorkerSupSpec),
    start_listeners().
71
%% @doc gen_mod callback. Stops every listener first and then removes the
%% worker supervisor from `ejabberd_sup'.
-spec stop(mongooseim:host_type()) -> any().
stop(_HostType) ->
    stop_listeners(),
    WorkerSup = mod_global_distrib_worker_sup,
    ejabberd_sup:stop_child(WorkerSup).
76
%% @doc gen_mod callback. Declares a hard dependency on
%% `mod_global_distrib_utils', passing this module's options to it unchanged.
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(_HostType, Opts) ->
    UtilsDep = {mod_global_distrib_utils, Opts, hard},
    [UtilsDep].
80
81 %%--------------------------------------------------------------------
82 %% ranch_protocol API
83 %%--------------------------------------------------------------------
84
%% @doc Body of the process spawned by `start_link/3'. Completes the ranch
%% handshake, wraps the raw socket with the transport configured under the
%% `connections' option, arms it for one message and enters the gen_server
%% loop. Any step that does not return the expected value crashes the process.
init({Ref, ranch_tcp, _Opts}) ->
    process_flag(trap_exit, true),
    {ok, RawSocket} = ranch:handshake(Ref),
    {ok, Socket} = mod_global_distrib_transport:wrap(RawSocket, opt(connections)),
    ok = mod_global_distrib_transport:setopts(Socket, [{active, once}]),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_ESTABLISHED, 1),
    Peer = mod_global_distrib_transport:peername(Socket),
    InitialState = #state{socket = Socket, waiting_for = header, peer = Peer},
    gen_server:enter_loop(?MODULE, [], InitialState).
95
96 %%--------------------------------------------------------------------
97 %% gen_server API
98 %%--------------------------------------------------------------------
99
%% @doc gen_server callback handling socket messages.
%%
%% `tcp'        - re-arms the socket and feeds the payload into the buffer.
%% `tcp_closed' - stops the process with reason `normal'.
%% `tcp_error'  - logs the error, bumps the errored counter of the current
%%                host and stops with `{error, Reason}'.
%% Anything else is reported through `?UNEXPECTED_INFO' and ignored.
handle_info({tcp, _Port, Payload}, #state{socket = Socket, buffer = Pending} = State) ->
    do_setopts_and_receive_data(Socket, Pending, Payload, State);
handle_info({tcp_closed, _Port}, State) ->
    {stop, normal, State};
handle_info({tcp_error, _Port, Reason}, State) ->
    #state{peer = Peer, conn_id = ConnId, host = Host} = State,
    ?LOG_ERROR(#{what => gd_incoming_socket_error,
                 reason => Reason,
                 text => <<"mod_global_distrib_receiver received tcp_error">>,
                 peer => Peer,
                 conn_id => ConnId}),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_ERRORED(Host), 1),
    {stop, {error, Reason}, State};
handle_info(Msg, State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State}.
113
%% @doc gen_server callback. This process accepts no casts: receiving one
%% terminates it with exit reason `bad_cast'.
handle_cast(_Cast, _State) ->
    exit(bad_cast).
116
%% @doc gen_server callback. This process accepts no calls: receiving one
%% terminates it with exit reason `bad_call'.
handle_call(_Request, _Caller, _State) ->
    exit(bad_call).
119
%% @doc gen_server callback. The state needs no conversion on code upgrade,
%% so it is returned unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
122
%% @doc gen_server callback. Logs the reason the connection ended, bumps the
%% closed counter of the current host and closes the socket.
%%
%% Closing is best-effort: a failure of `close/1' is swallowed by `catch'.
%% The function returns `ignore'.
terminate(Reason, #state{peer = Peer, host = Host,
                         conn_id = ConnId, socket = Socket}) ->
    ?LOG_WARNING(#{what => gd_incoming_socket_closed,
                   peer => Peer,
                   server => Host,
                   reason => Reason,
                   conn_id => ConnId}),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_CLOSED(Host), 1),
    catch mod_global_distrib_transport:close(Socket),
    ignore.
130
131 %%--------------------------------------------------------------------
132 %% Helpers
133 %%--------------------------------------------------------------------
134
%% Reads one of this module's options (a single key or a key path) through
%% mod_global_distrib_utils:opt/2.
-spec opt(gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value().
opt(Key) ->
    Module = ?MODULE,
    mod_global_distrib_utils:opt(Module, Key).
138
%% Re-arms the socket with `{active, once}' and, if that succeeds, processes
%% the payload that was just delivered. A socket that is already closed stops
%% the process normally; any other setopts result stops it with
%% `{setopts_failed, Result}'.
do_setopts_and_receive_data(Socket, Buffer, RawData, State) ->
    case mod_global_distrib_transport:setopts(Socket, [{active, once}]) of
        ok ->
            do_receive_data(Socket, Buffer, RawData, State);
        {error, closed} ->
            {stop, normal, State};
        Unexpected ->
            {stop, {setopts_failed, Unexpected}, State}
    end.
149
%% Passes the raw payload through the transport's recv_data/2 and turns the
%% result into a gen_server return value.
do_receive_data(Socket, Buffer, RawData, State) ->
    Received = mod_global_distrib_transport:recv_data(Socket, RawData),
    recv_result_to_reply(Received, Buffer, State).

%% Maps a recv_data/2 result to a gen_server return value: decoded data is
%% appended to the buffer and processed, a closed socket stops the process
%% normally, anything else stops it with `{recv_data_failed, Result}'.
recv_result_to_reply({ok, Data}, Buffer, State) ->
    Appended = <<Buffer/binary, Data/binary>>,
    {noreply, handle_buffered(State#state{buffer = Appended})};
recv_result_to_reply({error, closed}, _Buffer, State) ->
    {stop, normal, State};
recv_result_to_reply(Other, _Buffer, State) ->
    {stop, {recv_data_failed, Other}, State}.
160
%% Handles one complete frame.
%%
%% While no host is known the frame must be a `gd_start' XML element with
%% `server' and `conn_id' attributes; it registers the per-host metrics and
%% stores both values in the state. A frame of any other shape crashes the
%% process on the failed match.
%%
%% Every later frame is laid out as a 64-bit timestamp, a 16-bit length, an
%% identifier of that length and the remaining payload. The identifier picks
%% the worker that receives the payload.
-spec handle_data(Data :: binary(), state()) -> state().
handle_data(GdStart, State = #state{host = undefined}) ->
    {ok, #xmlel{name = <<"gd_start">>, attrs = Attrs}} = exml:parse(GdStart),
    #{<<"server">> := BinHost, <<"conn_id">> := ConnId} = maps:from_list(Attrs),
    Host = mod_global_distrib_utils:binary_to_metric_atom(BinHost),
    PerHostMetrics = [{?GLOBAL_DISTRIB_MESSAGES_RECEIVED(Host), spiral},
                      {?GLOBAL_DISTRIB_TRANSFER_TIME(Host), histogram},
                      {?GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Host), spiral},
                      {?GLOBAL_DISTRIB_INCOMING_ERRORED(Host), spiral},
                      {?GLOBAL_DISTRIB_INCOMING_CLOSED(Host), spiral}],
    lists:foreach(fun({Metric, Type}) ->
                          mod_global_distrib_utils:ensure_metric(Metric, Type)
                  end, PerHostMetrics),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_INCOMING_FIRST_PACKET(Host), 1),
    ?LOG_INFO(#{what => gd_incoming_connection, server => BinHost, conn_id => ConnId}),
    State#state{host = Host, conn_id = ConnId};
handle_data(Data, State = #state{host = Host}) ->
    <<ClockTime:64, BinFromSize:16,
      BinFrom:BinFromSize/binary, BinTerm/binary>> = Data,
    %% NOTE(review): ClockTime is presumably the sender's system time in
    %% microseconds - confirm against the sending side.
    TransferTime = erlang:system_time(microsecond) - ClockTime,
    Worker = mod_global_distrib_worker_sup:get_worker(BinFrom),
    Stamp = erlang:monotonic_time(),
    Message = {data, Host, TransferTime, Stamp, BinTerm},
    ok = mod_global_distrib_utils:cast_or_call(Worker, Message),
    State.
185
%% Consumes as many complete frames from the buffer as are available.
%%
%% A frame is a 4-byte unsigned length prefix followed by that many bytes of
%% data. The function alternates between reading a prefix (`waiting_for' is
%% `header') and reading a body (`waiting_for' is the body size) until the
%% buffer holds too little for the next step, then returns the state with
%% the leftover bytes kept for the next call.
-spec handle_buffered(state()) -> state().
handle_buffered(#state{waiting_for = header,
                       buffer = <<Prefix:4/binary, Remaining/binary>>} = State) ->
    FrameSize = binary:decode_unsigned(Prefix),
    handle_buffered(State#state{waiting_for = FrameSize, buffer = Remaining});
%% The is_integer/1 test only makes the intent explicit: with
%% `waiting_for = header' the size comparison is false anyway, because
%% numbers sort before atoms in Erlang term order.
handle_buffered(#state{waiting_for = FrameSize, buffer = Pending} = State)
  when is_integer(FrameSize), byte_size(Pending) >= FrameSize ->
    <<Frame:FrameSize/binary, Remaining/binary>> = Pending,
    AfterFrame = handle_data(Frame, State),
    handle_buffered(AfterFrame#state{waiting_for = header, buffer = Remaining});
handle_buffered(State) ->
    State.
197
%% Starts a listener for every configured endpoint, giving each one
%% ?LISTEN_RETRIES retries, and returns `ok'.
-spec start_listeners() -> any().
start_listeners() ->
    lists:foreach(fun(Endpoint) ->
                          start_listener(Endpoint, ?LISTEN_RETRIES)
                  end, endpoints()),
    ok.
202
%% Starts a ranch TCP listener on the given endpoint, using the endpoint
%% tuple itself as the ranch listener reference.
%%
%% On `eaddrinuse' with retries left it logs the failure, sleeps for
%% ?LISTEN_RETRY_DELAY milliseconds and tries again with one retry fewer.
%% Any other result, including `eaddrinuse' with no retries left, matches no
%% case clause and crashes the caller with `case_clause'.
-spec start_listener(mod_global_distrib_utils:endpoint(),
                     RetriesLeft :: non_neg_integer()) -> any().
start_listener({Addr, Port} = Ref, RetriesLeft) ->
    ?LOG_INFO(#{what => gd_start_listener, address => Addr, port => Port}),
    RanchOpts = #{max_connections => infinity,
                  num_acceptors => 10,
                  socket_opts => [{ip, Addr}, {port, Port}]},
    StartResult = ranch:start_listener(Ref, ranch_tcp, RanchOpts, ?MODULE, []),
    case StartResult of
        {ok, _} ->
            ok;
        {error, eaddrinuse} when RetriesLeft > 0 ->
            ?LOG_ERROR(#{what => gd_start_listener_failed, address => Addr, port => Port,
                         text => <<"Failed to start listener: address in use. Will retry in 1 second.">>}),
            timer:sleep(?LISTEN_RETRY_DELAY),
            start_listener(Ref, RetriesLeft - 1)
    end.
217
%% Stops the ranch listener of every configured endpoint. The result of each
%% individual stop is discarded.
-spec stop_listeners() -> any().
stop_listeners() ->
    lists:foreach(fun(Endpoint) -> ranch:stop_listener(Endpoint) end,
                  endpoints()).
221
%% Returns the endpoints stored under the `connections' /
%% `resolved_endpoints' option path; each one gets its own listener.
-spec endpoints() -> [mod_global_distrib_utils:endpoint()].
endpoints() ->
    OptionPath = [connections, resolved_endpoints],
    opt(OptionPath).
Line Hits Source