1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
-module(mod_global_distrib_sender). |
17 |
|
-author('konrad.zemek@erlang-solutions.com'). |
18 |
|
|
19 |
|
-behaviour(gen_mod). |
20 |
|
-behaviour(mongoose_module_metrics). |
21 |
|
|
22 |
|
-include("mongoose.hrl"). |
23 |
|
-include("jlib.hrl"). |
24 |
|
-include("global_distrib_metrics.hrl"). |
25 |
|
|
26 |
|
-export([start/2, stop/1, send/2, get_process_for/1]). |
27 |
|
|
28 |
|
%%-------------------------------------------------------------------- |
29 |
|
%% API |
30 |
|
%%-------------------------------------------------------------------- |
31 |
|
|
32 |
|
|
33 |
|
-spec send(jid:lserver() | pid(), {jid:jid(), jid:jid(), mongoose_acc:t(), xmlel:packet()}) -> ok. |
34 |
|
send(Server, {_,_, Acc, _} = Packet) when is_binary(Server) -> |
35 |
:-( |
try get_process_for(Server) of |
36 |
|
Worker -> |
37 |
:-( |
send(Worker, Packet) |
38 |
|
catch Class:Reason:Stacktrace -> |
39 |
:-( |
?LOG_ERROR(#{what => gd_get_process_for_failed, server => Server, acc => Acc, |
40 |
:-( |
class => Class, reason => Reason, stacktrace => Stacktrace}), |
41 |
:-( |
erlang:raise(Class, Reason, Stacktrace) |
42 |
|
end; |
43 |
|
send(Worker, {From, _To, _Acc, _Packet} = FPacket) -> |
44 |
336 |
BinPacket = term_to_binary(FPacket), |
45 |
336 |
BinFrom = mod_global_distrib_utils:recipient_to_worker_key(From, opt(global_host)), |
46 |
336 |
Data = <<(byte_size(BinFrom)):16, BinFrom/binary, BinPacket/binary>>, |
47 |
336 |
Stamp = erlang:monotonic_time(), |
48 |
336 |
ok = mod_global_distrib_utils:cast_or_call(Worker, {data, Stamp, Data}). |
49 |
|
|
50 |
|
%%-------------------------------------------------------------------- |
51 |
|
%% gen_mod API |
52 |
|
%%-------------------------------------------------------------------- |
53 |
|
|
54 |
|
-spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any(). |
55 |
|
start(Host, Opts0) -> |
56 |
21 |
Opts = [{listen_port, 5555}, |
57 |
|
{connections_per_endpoint, 1}, |
58 |
|
{endpoint_refresh_interval_when_empty, 3}, |
59 |
|
{endpoint_refresh_interval, 60}, |
60 |
|
{disabled_gc_interval, 60} | Opts0], |
61 |
21 |
mod_global_distrib_utils:start(?MODULE, Host, Opts, fun start/0). |
62 |
|
|
63 |
|
-spec stop(Host :: jid:lserver()) -> any(). |
64 |
|
stop(Host) -> |
65 |
21 |
mod_global_distrib_utils:stop(?MODULE, Host, fun stop/0). |
66 |
|
|
67 |
|
%%-------------------------------------------------------------------- |
68 |
|
%% Helpers |
69 |
|
%%-------------------------------------------------------------------- |
70 |
|
|
71 |
|
-spec start() -> any(). |
72 |
|
start() -> |
73 |
21 |
opt(tls_opts). %% Check for required tls_opts |
74 |
|
|
75 |
|
-spec stop() -> any(). |
76 |
|
stop() -> |
77 |
21 |
ok. |
78 |
|
|
79 |
|
-spec get_process_for(jid:lserver()) -> pid(). |
80 |
|
get_process_for(Server) -> |
81 |
28 |
mod_global_distrib_outgoing_conns_sup:get_connection(Server). |
82 |
|
|
83 |
|
-spec opt(Key :: atom()) -> term(). |
84 |
|
opt(Key) -> |
85 |
357 |
mod_global_distrib_utils:opt(?MODULE, Key). |