1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
%% Routing worker |
18 |
|
|
19 |
|
-module(mod_global_distrib_worker). |
20 |
|
-author('konrad.zemek@erlang-solutions.com'). |
21 |
|
|
22 |
|
-behaviour(gen_server). |
23 |
|
|
24 |
|
-include("mongoose.hrl"). |
25 |
|
-include("jlib.hrl"). |
26 |
|
-include("global_distrib_metrics.hrl"). |
27 |
|
|
28 |
|
-define(TIMEOUT, 60000). % TODO |
29 |
|
|
30 |
|
-export([start_link/1]). |
31 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). |
32 |
|
|
33 |
|
-ignore_xref([start_link/1]). |
34 |
|
|
35 |
|
%%-------------------------------------------------------------------- |
36 |
|
%% API |
37 |
|
%%-------------------------------------------------------------------- |
38 |
|
|
39 |
|
%% Starts a routing worker linked to the calling process and registers it
%% locally under `Name'. The callback module receives an empty argument list.
-spec start_link(Name :: atom()) -> {ok, pid()} | {error, any()}.
start_link(Name) ->
    ServerName = {local, Name},
    gen_server:start_link(ServerName, ?MODULE, [], []).
42 |
|
|
43 |
|
%%-------------------------------------------------------------------- |
44 |
|
%% gen_server API |
45 |
|
%%-------------------------------------------------------------------- |
46 |
|
|
47 |
|
%% gen_server callback. The worker keeps no meaningful state: the atom
%% `state' is only a placeholder. Returning ?TIMEOUT arms the gen_server
%% idle timeout, which is handled by handle_info(timeout, State).
init(_Args) ->
    InitialState = state,
    {ok, InitialState, ?TIMEOUT}.
49 |
|
|
50 |
|
%% Synchronous flavour of the `data' request. The caller is answered with
%% `ok' right away, before the payload is processed; the actual work is then
%% delegated to the matching handle_cast/2 clause, whose return value
%% (a `noreply' tuple) is passed through unchanged.
handle_call({data, _Host, _TransferTime, _Stamp, _Data} = Request, From, State) ->
    gen_server:reply(From, ok),
    handle_cast(Request, State).
53 |
|
|
54 |
|
%% Asynchronous requests handled by the worker:
%%
%%   {data, Host, TransferTime, Stamp, Data}
%%       Records receive-side metrics and then processes the payload via
%%       do_work/1. The queue time is the difference between the current
%%       monotonic time and `Stamp', converted from native units to
%%       microseconds.
%%
%%   {route, {From, To, Acc, Packet}}
%%       Hands the already decoded packet straight to ejabberd_router.
%%
%% Both clauses re-arm the idle timeout in their return value.
handle_cast({data, Host, TransferTime, Stamp, Data}, State) ->
    Now = erlang:monotonic_time(),
    QueueTimeUS = erlang:convert_time_unit(Now - Stamp, native, microsecond),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_RECV_QUEUE_TIME, QueueTimeUS),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_TRANSFER_TIME(Host), TransferTime),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MESSAGES_RECEIVED(Host), 1),
    do_work(Data),
    {noreply, State, ?TIMEOUT};
handle_cast({route, {From, To, Acc, Packet}}, State) ->
    ejabberd_router:route(From, To, Acc, Packet),
    {noreply, State, ?TIMEOUT}.
65 |
|
|
66 |
|
%% Handles raw messages delivered to the worker.
%%
%% `timeout' is generated by gen_server when no message arrived within
%% ?TIMEOUT milliseconds; the idle worker then stops with a `shutdown'
%% reason.
%%
%% Any other message is not part of the worker's protocol. It is logged,
%% so that stray messages are visible instead of silently vanishing, and
%% then dropped; the idle timeout is re-armed.
handle_info(timeout, State) ->
    {stop, {shutdown, spinning_down_idle_worker}, State};
handle_info(Msg, State) ->
    ?LOG_WARNING(#{what => gd_worker_unexpected_info, msg => Msg}),
    {noreply, State, ?TIMEOUT}.
70 |
|
|
71 |
|
%% gen_server callback for hot code upgrades. The state needs no
%% conversion, so it is returned as-is.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
73 |
|
|
74 |
|
%% gen_server callback invoked on shutdown. Performs no cleanup and
%% always returns `ok'.
terminate(_Why, _State) ->
    ok.
76 |
|
|
77 |
|
%%-------------------------------------------------------------------- |
78 |
|
%% Helpers |
79 |
|
%%-------------------------------------------------------------------- |
80 |
|
|
81 |
|
%% Decodes a serialized `{From, To, Acc, Packet}' tuple and routes it.
%%
%% Steps, in order: deserialize `Data', let mod_global_distrib_utils update
%% its mapping for `From', emit a debug log entry, and pass the packet to
%% ejabberd_router. Crashes with `badmatch' if the decoded term is not a
%% 4-tuple.
-spec do_work(Data :: binary()) -> any().
do_work(Data) ->
    %% NOTE(review): binary_to_term/1 is used without the `safe' option.
    %% Presumably `Data' only ever originates from trusted peers - confirm
    %% before exposing this path to untrusted input.
    Decoded = erlang:binary_to_term(Data),
    {From, To, Acc, Packet} = Decoded,
    mod_global_distrib_utils:maybe_update_mapping(From, Acc),
    ?LOG_DEBUG(#{what => gd_route_incoming,
                 acc => Acc,
                 gd_id => mod_global_distrib:get_metadata(Acc, id, "unknown")}),
    ejabberd_router:route(From, To, Acc, Packet).