1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
-module(mod_global_distrib_sender). |
17 |
|
-author('konrad.zemek@erlang-solutions.com'). |
18 |
|
|
19 |
|
-behaviour(mongoose_module_metrics). |
20 |
|
|
21 |
|
-include("mongoose.hrl"). |
22 |
|
|
23 |
|
-export([send/2, get_process_for/1]). |
24 |
|
|
25 |
|
%%-------------------------------------------------------------------- |
26 |
|
%% API |
27 |
|
%%-------------------------------------------------------------------- |
28 |
|
|
29 |
|
%% @doc Sends a packet tuple through a global distribution connection worker.
%%
%% When the first argument is a binary it is treated as a server name: the
%% worker is obtained from get_process_for/1 and the call is repeated with
%% that worker. If the lookup raises, the failure is logged and the exception
%% is re-raised with its original class, reason and stacktrace.
%%
%% Otherwise the first argument is used as the worker directly. The whole
%% packet tuple is serialised with term_to_binary/1 and framed as
%% `<<KeySize:16, SenderKey/binary, Payload/binary>>', where SenderKey is
%% derived from the sender by mod_global_distrib_utils:recipient_to_worker_key/2.
%% The frame is handed to the worker as `{data, Timestamp, Frame}' and the
%% result is asserted to be `ok'.
-spec send(jid:lserver() | pid(), {jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) -> ok.
send(Server, {_From, _To, Acc, _El} = Packet) when is_binary(Server) ->
    %% Only the lookup is protected; errors raised while sending propagate
    %% without being logged here.
    Worker =
        try
            get_process_for(Server)
        catch
            Class:Reason:Stacktrace ->
                ?LOG_ERROR(#{what => gd_get_process_for_failed, server => Server, acc => Acc,
                             class => Class, reason => Reason, stacktrace => Stacktrace}),
                erlang:raise(Class, Reason, Stacktrace)
        end,
    send(Worker, Packet);
send(Worker, {From, _To, _Acc, _El} = Packet) ->
    Payload = term_to_binary(Packet),
    GlobalHost = opt(global_host),
    SenderKey = mod_global_distrib_utils:recipient_to_worker_key(From, GlobalHost),
    KeySize = byte_size(SenderKey),
    %% The key length is encoded in 16 bits, followed by the key and payload.
    Frame = <<KeySize:16, SenderKey/binary, Payload/binary>>,
    Timestamp = erlang:monotonic_time(),
    ok = mod_global_distrib_utils:cast_or_call(Worker, {data, Timestamp, Frame}).
45 |
|
|
46 |
|
%%-------------------------------------------------------------------- |
47 |
|
%% Helpers |
48 |
|
%%-------------------------------------------------------------------- |
49 |
|
|
50 |
|
%% @doc Returns the outgoing connection process for the given server, as
%% provided by mod_global_distrib_outgoing_conns_sup:get_connection/1.
-spec get_process_for(jid:lserver()) -> pid().
get_process_for(RemoteServer) ->
    mod_global_distrib_outgoing_conns_sup:get_connection(RemoteServer).
53 |
|
|
54 |
|
%% @doc Reads the option `Key' of the mod_global_distrib module via
%% mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    OwnerModule = mod_global_distrib,
    mod_global_distrib_utils:opt(OwnerModule, Key).