./ct_report/coverage/mod_global_distrib_worker.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 %% Routing worker
18
19 -module(mod_global_distrib_worker).
20 -author('konrad.zemek@erlang-solutions.com').
21
22 -behaviour(gen_server).
23
24 -include("mongoose.hrl").
25 -include("jlib.hrl").
26 -include("global_distrib_metrics.hrl").
27
28 -define(TIMEOUT, 60000). % TODO
29
30 -export([start_link/1]).
31 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
32
33 -ignore_xref([start_link/1]).
34
35 %%--------------------------------------------------------------------
36 %% API
37 %%--------------------------------------------------------------------
38
%% Starts a routing worker as a gen_server linked to the calling process
%% and registers it locally under Name.
%% The worker receives an empty argument list; it needs no configuration.
-spec start_link(Name :: atom()) -> {ok, pid()} | {error, any()}.
start_link(Name) ->
    ServerName = {local, Name},
    InitArgs = [],
    StartOpts = [],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, StartOpts).
42
43 %%--------------------------------------------------------------------
44 %% gen_server API
45 %%--------------------------------------------------------------------
46
%% gen_server callback. The worker keeps no real state: the atom `state'
%% is used as a placeholder. The third element arms the idle timeout, so
%% a worker that never receives a message gets a `timeout' info message
%% after ?TIMEOUT milliseconds.
init(_Args) ->
    Placeholder = state,
    {ok, Placeholder, ?TIMEOUT}.
49
%% Synchronous variant of the `data' request.
%% The caller is answered with `ok' right away, before any processing,
%% and the request is then handled exactly as the corresponding cast.
%% The return value is whatever handle_cast/2 returns for that request.
handle_call({data, _Host, _TransferTime, _Stamp, _Data} = Request, Caller, State) ->
    gen_server:reply(Caller, ok),
    handle_cast(Request, State).
53
%% Asynchronous requests. Both clauses re-arm the idle timeout.
%%
%% {route, {From, To, Acc, Packet}}
%%     hands the already decoded stanza straight to ejabberd_router.
%% {data, Host, TransferTime, Stamp, Data}
%%     records the receive metrics for Host and then decodes and routes
%%     the serialized payload Data (see do_work/1).
handle_cast({route, {Sender, Recipient, Acc, Stanza}}, State) ->
    ejabberd_router:route(Sender, Recipient, Acc, Stanza),
    {noreply, State, ?TIMEOUT};
handle_cast({data, Host, TransferTime, Stamp, Payload}, State) ->
    record_receive_metrics(Host, TransferTime, Stamp),
    do_work(Payload),
    {noreply, State, ?TIMEOUT}.

%% Updates the three receive-side metrics for one incoming message.
%% Stamp is subtracted from erlang:monotonic_time/0, so it has to be a
%% monotonic timestamp in native units taken on this node; the difference
%% is reported in microseconds as the time spent queued.
%% TransferTime is passed to the metric untouched.
%% NOTE(review): the unit of TransferTime is not visible here - confirm
%% against the caller.
record_receive_metrics(Host, TransferTime, Stamp) ->
    Elapsed = erlang:monotonic_time() - Stamp,
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_RECV_QUEUE_TIME,
                            erlang:convert_time_unit(Elapsed, native, microsecond)),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_TRANSFER_TIME(Host), TransferTime),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MESSAGES_RECEIVED(Host), 1).
65
%% Raw messages.
%% `timeout' is delivered by gen_server when no message arrived within
%% ?TIMEOUT milliseconds; the idle worker then stops itself with a
%% `shutdown' reason. Any other message is dropped and the idle timeout
%% is re-armed.
handle_info(timeout, State) ->
    StopReason = {shutdown, spinning_down_idle_worker},
    {stop, StopReason, State};
handle_info(_Unexpected, State) ->
    {noreply, State, ?TIMEOUT}.
70
%% gen_server callback for hot code upgrades.
%% The state needs no conversion, so it is returned unchanged.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
73
%% gen_server callback invoked on shutdown.
%% The worker holds no resources, so there is nothing to clean up.
terminate(_Why, _LastState) ->
    ok.
76
77 %%--------------------------------------------------------------------
78 %% Helpers
79 %%--------------------------------------------------------------------
80
%% Decodes one serialized message and routes it locally.
%%
%% Data must decode to a {From, To, Acc, Packet} tuple; any other shape
%% crashes the worker with a badmatch. Before routing, the sender is
%% passed to mod_global_distrib_utils:maybe_update_mapping/2, and the
%% message is logged at debug level together with its global
%% distribution id ("unknown" when the accumulator carries none).
%%
%% NOTE(review): binary_to_term/1 is called without the `safe' option.
%% Confirm that Data can only come from trusted peers before relying on
%% this; switching to `[safe]' would reject terms containing atoms that
%% do not exist on this node yet, so it is not a drop-in change.
-spec do_work(Data :: binary()) -> any().
do_work(Data) ->
    {Sender, Recipient, Acc, Stanza} = binary_to_term(Data),
    mod_global_distrib_utils:maybe_update_mapping(Sender, Acc),
    ?LOG_DEBUG(#{what => gd_route_incoming,
                 acc => Acc,
                 gd_id => mod_global_distrib:get_metadata(Acc, id, "unknown")}),
    ejabberd_router:route(Sender, Recipient, Acc, Stanza).
Line Hits Source