./ct_report/coverage/mod_global_distrib_sender.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 -module(mod_global_distrib_sender).
17 -author('konrad.zemek@erlang-solutions.com').
18
19 -behaviour(gen_mod).
20 -behaviour(mongoose_module_metrics).
21
22 -include("mongoose.hrl").
23 -include("jlib.hrl").
24 -include("global_distrib_metrics.hrl").
25
26 -export([start/2, stop/1, send/2, get_process_for/1]).
27
28 %%--------------------------------------------------------------------
29 %% API
30 %%--------------------------------------------------------------------
31
32
33 -spec send(jid:lserver() | pid(), {jid:jid(), jid:jid(), mongoose_acc:t(), xmlel:packet()}) -> ok.
34 send(Server, {_,_, Acc, _} = Packet) when is_binary(Server) ->
35
:-(
try get_process_for(Server) of
36 Worker ->
37
:-(
send(Worker, Packet)
38 catch Class:Reason:Stacktrace ->
39
:-(
?LOG_ERROR(#{what => gd_get_process_for_failed, server => Server, acc => Acc,
40
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
41
:-(
erlang:raise(Class, Reason, Stacktrace)
42 end;
43 send(Worker, {From, _To, _Acc, _Packet} = FPacket) ->
44
:-(
BinPacket = term_to_binary(FPacket),
45
:-(
BinFrom = mod_global_distrib_utils:recipient_to_worker_key(From, opt(global_host)),
46
:-(
Data = <<(byte_size(BinFrom)):16, BinFrom/binary, BinPacket/binary>>,
47
:-(
Stamp = erlang:monotonic_time(),
48
:-(
ok = mod_global_distrib_utils:cast_or_call(Worker, {data, Stamp, Data}).
49
50 %%--------------------------------------------------------------------
51 %% gen_mod API
52 %%--------------------------------------------------------------------
53
54 -spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any().
55 start(Host, Opts0) ->
56
:-(
Opts = [{listen_port, 5555},
57 {connections_per_endpoint, 1},
58 {endpoint_refresh_interval_when_empty, 3},
59 {endpoint_refresh_interval, 60},
60 {disabled_gc_interval, 60} | Opts0],
61
:-(
mod_global_distrib_utils:start(?MODULE, Host, Opts, fun start/0).
62
63 -spec stop(Host :: jid:lserver()) -> any().
64 stop(Host) ->
65
:-(
mod_global_distrib_utils:stop(?MODULE, Host, fun stop/0).
66
67 %%--------------------------------------------------------------------
68 %% Helpers
69 %%--------------------------------------------------------------------
70
71 -spec start() -> any().
72 start() ->
73
:-(
opt(tls_opts). %% Check for required tls_opts
74
75 -spec stop() -> any().
76 stop() ->
77
:-(
ok.
78
79 -spec get_process_for(jid:lserver()) -> pid().
80 get_process_for(Server) ->
81
:-(
mod_global_distrib_outgoing_conns_sup:get_connection(Server).
82
83 -spec opt(Key :: atom()) -> term().
84 opt(Key) ->
85
:-(
mod_global_distrib_utils:opt(?MODULE, Key).
Line Hits Source