1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mod_global_distrib_utils). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-include("mongoose.hrl"). |
21 |
|
-include("jlib.hrl"). |
22 |
|
|
23 |
|
-behaviour(gen_mod). |
24 |
|
|
25 |
|
%% gen_mod callbacks |
26 |
|
-export([start/2, stop/1]). |
27 |
|
|
28 |
|
-export([ |
29 |
|
opt/2, host_type/0, create_ets/1, create_ets/2, |
30 |
|
cast_or_call/2, cast_or_call/3, cast_or_call/4, |
31 |
|
any_binary_to_atom/1, resolve_endpoints/1, |
32 |
|
binary_to_metric_atom/1, ensure_metric/2, recipient_to_worker_key/2, |
33 |
|
server_to_mgr_name/1, server_to_sup_name/1, maybe_update_mapping/2, |
34 |
|
parse_address/1 |
35 |
|
]). |
36 |
|
|
37 |
|
-export([getaddrs/2]). |
38 |
|
|
39 |
|
-ignore_xref([cast_or_call/3, cast_or_call/4]). |
40 |
|
|
41 |
|
-type domain_name() :: string(). |
42 |
|
-type endpoint() :: {inet:ip_address() | domain_name(), inet:port_number()}. |
43 |
|
|
44 |
|
-export_type([endpoint/0]). |
45 |
|
|
46 |
|
%%-------------------------------------------------------------------- |
47 |
|
%% API |
48 |
|
%%-------------------------------------------------------------------- |
49 |
|
|
50 |
|
-spec binary_to_metric_atom(binary()) -> atom(). |
51 |
|
binary_to_metric_atom(Binary) -> |
52 |
1449 |
List = lists:filtermap(fun |
53 |
664 |
($.) -> {true, $_}; |
54 |
:-( |
($ ) -> {true, $_}; |
55 |
11108 |
(C) when C > 31, C < 127 -> {true, C}; |
56 |
:-( |
(_) -> false |
57 |
|
end, |
58 |
|
unicode:characters_to_list(Binary)), |
59 |
1449 |
list_to_atom(List). |
60 |
|
|
61 |
|
ensure_metric(Metric, Type) -> |
62 |
7289 |
mongoose_metrics:ensure_subscribed_metric(global, Metric, Type). |
63 |
|
|
64 |
|
-spec any_binary_to_atom(binary()) -> atom(). |
65 |
|
any_binary_to_atom(Binary) -> |
66 |
372 |
binary_to_atom(base64:encode(Binary), latin1). |
67 |
|
|
68 |
|
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok. |
69 |
|
start(HostType, #{global_host := GlobalHost, local_host := LocalHost}) -> |
70 |
16 |
check_host(LocalHost), |
71 |
15 |
check_host(GlobalHost), |
72 |
15 |
persistent_term:put({mod_global_distrib, host_type}, HostType). |
73 |
|
|
74 |
|
-spec stop(mongooseim:host_type()) -> any(). |
75 |
|
stop(_HostType) -> |
76 |
15 |
persistent_term:erase({mod_global_distrib, host_type}). |
77 |
|
|
78 |
|
-spec opt(module(), gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value(). |
79 |
|
opt(Module, Key) -> |
80 |
7114 |
gen_mod:get_module_opt(host_type(), Module, Key). |
81 |
|
|
82 |
|
host_type() -> |
83 |
7187 |
persistent_term:get({mod_global_distrib, host_type}). |
84 |
|
|
85 |
|
-spec cast_or_call(Target :: pid() | atom(), Message :: term()) -> any(). |
86 |
|
cast_or_call(Target, Message) -> |
87 |
678 |
cast_or_call(Target, Message, 500). |
88 |
|
|
89 |
|
-spec cast_or_call(Target :: pid() | atom(), Message :: term(), |
90 |
|
SyncWatermark :: non_neg_integer()) -> |
91 |
|
any(). |
92 |
|
cast_or_call(Target, Message, SyncWatermark) -> |
93 |
678 |
cast_or_call(Target, Message, SyncWatermark, 5000). |
94 |
|
|
95 |
|
-spec cast_or_call(Target :: pid() | atom(), Message :: term(), |
96 |
|
SyncWatermark :: non_neg_integer(), Timeout :: pos_integer() | infinity) -> |
97 |
|
any(). |
98 |
|
cast_or_call(Target, Message, SyncWatermark, Timeout) when is_atom(Target), Target =/= undefined -> |
99 |
339 |
cast_or_call(whereis(Target), Message, SyncWatermark, Timeout); |
100 |
|
cast_or_call(Target, Message, SyncWatermark, Timeout) when is_pid(Target) -> |
101 |
678 |
case process_info(Target, message_queue_len) of |
102 |
:-( |
{_, X} when X > SyncWatermark -> gen_server:call(Target, Message, Timeout); |
103 |
678 |
{_, _} -> gen_server:cast(Target, Message) |
104 |
|
end. |
105 |
|
|
106 |
|
-spec create_ets(Names :: [atom()] | atom()) -> any(). |
107 |
|
create_ets(Names) -> |
108 |
15 |
create_ets(Names, set). |
109 |
|
|
110 |
|
-spec create_ets(Names :: [atom()] | atom(), Type :: atom()) -> any(). |
111 |
|
create_ets(Names, Type) when is_list(Names) -> |
112 |
15 |
[create_ets(Name, Type) || Name <- Names]; |
113 |
|
create_ets(Name, Type) -> |
114 |
75 |
ejabberd_sup:create_ets_table(Name, [named_table, public, Type, {read_concurrency, true}]). |
115 |
|
|
116 |
|
-spec resolve_endpoints([{inet:ip_address() | string(), inet:port_number()}]) -> [endpoint()]. |
117 |
|
resolve_endpoints(Endpoints) when is_list(Endpoints) -> |
118 |
83 |
lists:flatmap(fun resolve_endpoint/1, Endpoints). |
119 |
|
|
120 |
|
resolve_endpoint({Addr, _Port} = E) when is_tuple(Addr) -> |
121 |
105 |
[E]; |
122 |
|
resolve_endpoint({Addr, Port}) -> |
123 |
:-( |
case to_ip_tuples(Addr) of |
124 |
|
{ok, IpAddrs} -> |
125 |
:-( |
Resolved = [{IpAddr, Port} || IpAddr <- IpAddrs], |
126 |
:-( |
?LOG_IF(info, is_domain(Addr), #{what => gd_resolve_endpoint, |
127 |
|
text => <<"GD resolved address to IPs">>, |
128 |
:-( |
address => Addr, ip_addresses => IpAddrs}), |
129 |
:-( |
Resolved; |
130 |
|
{error, {Reasonv6, Reasonv4}} -> |
131 |
:-( |
?LOG_ERROR(#{what => gd_resolve_endpoint_failed, |
132 |
|
text => <<"GD Cannot convert address to IP addresses">>, |
133 |
|
address => Addr, |
134 |
|
ipv6_reason => inet:format_error(Reasonv6), |
135 |
:-( |
ipv4_reason => inet:format_error(Reasonv4)}), |
136 |
:-( |
error({domain_not_resolved, {Reasonv6, Reasonv4}}) |
137 |
|
end. |
138 |
|
|
139 |
|
|
140 |
|
-spec recipient_to_worker_key(jid:jid() | jid:ljid(), jid:lserver()) -> binary(). |
141 |
|
recipient_to_worker_key(#jid{} = Jid, GlobalHost) -> |
142 |
372 |
recipient_to_worker_key(jid:to_lower(Jid), GlobalHost); |
143 |
|
recipient_to_worker_key({_, GlobalHost, _} = Jid, GlobalHost) -> |
144 |
341 |
jid:to_binary(Jid); |
145 |
|
recipient_to_worker_key({_, Domain, _}, _GlobalHost) -> |
146 |
31 |
Domain. |
147 |
|
|
148 |
|
-spec server_to_mgr_name(Server :: jid:lserver()) -> atom(). |
149 |
|
server_to_mgr_name(Server) -> |
150 |
837 |
gen_mod:get_module_proc(Server, mod_global_distrib_server_mgr). |
151 |
|
|
152 |
|
-spec server_to_sup_name(Server :: jid:lserver()) -> atom(). |
153 |
|
server_to_sup_name(Server) -> |
154 |
167 |
gen_mod:get_module_proc(Server, mod_global_distrib_server_sup). |
155 |
|
|
156 |
|
%% IMPORTANT! Regarding mod_global_distrib_mapping:insert_for_*/2: |
157 |
|
%% These functions with arity 2 will call cache update functions with |
158 |
|
%% dummy update functions, so they will result only in cache update with |
159 |
|
%% no backend side effects. |
160 |
|
-spec maybe_update_mapping(From :: jid:jid(), mongoose_acc:t()) -> any(). |
161 |
|
maybe_update_mapping(_From, #{name := <<"presence">>, type := <<"unavailable">>}) -> |
162 |
:-( |
ok; |
163 |
|
maybe_update_mapping(#jid{luser = <<>>, lserver = LServer} = From, Acc) -> |
164 |
5 |
case opt(mod_global_distrib, global_host) of |
165 |
:-( |
LServer -> ok; |
166 |
|
_ -> |
167 |
5 |
ensure_domain_inserted(Acc, From#jid.lserver) |
168 |
|
end; |
169 |
|
maybe_update_mapping(From, Acc) -> |
170 |
339 |
case mod_global_distrib_mapping:for_jid(From) of |
171 |
|
error -> |
172 |
9 |
case mod_global_distrib:find_metadata(Acc, origin) of |
173 |
|
%% Lack of 'global_distrib' indicates 100% local routing... |
174 |
|
{error, undefined} -> |
175 |
|
%% .. so we can insert From into cache with local host as mapping |
176 |
:-( |
mod_global_distrib_mapping:cache_jid(From, local_host()); |
177 |
|
{ok, Origin} -> |
178 |
9 |
mod_global_distrib_mapping:cache_jid(From, Origin) |
179 |
|
end; |
180 |
|
_ -> |
181 |
330 |
ok |
182 |
|
end. |
183 |
|
|
184 |
|
%%-------------------------------------------------------------------- |
185 |
|
%% Helpers |
186 |
|
%%-------------------------------------------------------------------- |
187 |
|
|
188 |
|
-spec ensure_domain_inserted(mongoose_acc:t(), jid:lserver()) -> ok. |
189 |
|
ensure_domain_inserted(Acc, Domain) -> |
190 |
5 |
case mod_global_distrib_mapping:for_domain(Domain) of |
191 |
|
error -> |
192 |
|
%% See the comments in the last match of maybe_update_mapping/2 function |
193 |
:-( |
case mod_global_distrib:find_metadata(Acc, origin) of |
194 |
|
{error, undefined} -> |
195 |
:-( |
mod_global_distrib_mapping:cache_domain(Domain, local_host()); |
196 |
|
{ok, Origin} -> |
197 |
:-( |
mod_global_distrib_mapping:cache_domain(Domain, Origin) |
198 |
|
end; |
199 |
|
_ -> |
200 |
5 |
ok |
201 |
|
end. |
202 |
|
|
203 |
|
%% @doc Check that the host is hosted by the server |
204 |
|
-spec check_host(jid:lserver()) -> ok. |
205 |
|
check_host(Domain) -> |
206 |
|
%% There is no clause for a dynamic domain as this module can't be started with dynamic domains |
207 |
31 |
case mongoose_domain_api:get_domain_host_type(Domain) of |
208 |
|
{error, not_found} -> |
209 |
1 |
error(#{what => check_host_failed, |
210 |
|
msg => <<"Domain is not hosted by the server">>, |
211 |
|
domain => Domain}); |
212 |
|
{ok, Domain} -> |
213 |
30 |
ok |
214 |
|
end. |
215 |
|
|
216 |
|
get_addrs_in_parallel(Addr) -> |
217 |
|
%% getaddrs could be pretty slow, so do in parallel |
218 |
|
%% Also, limit the time it could be running to 5 seconds |
219 |
|
%% (would fail with reason timeout if it takes too long) |
220 |
:-( |
F = fun(Ver) -> ?MODULE:getaddrs(Addr, Ver) end, |
221 |
:-( |
[V6, V4] = mongoose_lib:pmap(F, [inet6, inet]), |
222 |
:-( |
{simplify_result(V6), simplify_result(V4)}. |
223 |
|
|
224 |
:-( |
simplify_result({ok, Res}) -> Res; |
225 |
:-( |
simplify_result(Res) -> Res. |
226 |
|
|
227 |
|
-spec to_ip_tuples(Addr :: inet:ip_address() | string()) -> |
228 |
|
{ok, [inet:ip_address()]} | {error, {V6 :: atom(), V4 :: atom()}}. |
229 |
|
to_ip_tuples(Addr) -> |
230 |
:-( |
case get_addrs_in_parallel(Addr) of |
231 |
|
{{error, Reason6}, {error, Reason4}} -> |
232 |
:-( |
{error, {Reason6, Reason4}}; |
233 |
|
{Addrs, {error, Msg}} -> |
234 |
:-( |
?LOG_DEBUG(#{what => resolv_error, address => Addr, reason => Msg, |
235 |
:-( |
text => <<"IPv4 address resolution error">>}), |
236 |
:-( |
Addrs; |
237 |
|
{{error, Msg}, Addrs} -> |
238 |
:-( |
?LOG_DEBUG(#{what => resolv_error, address => Addr, reason => Msg, |
239 |
:-( |
text => <<"IPv6 address resolution error">>}), |
240 |
:-( |
Addrs; |
241 |
|
{{ok, Addrs6}, {ok, Addrs4}} -> |
242 |
:-( |
{ok, Addrs6 ++ Addrs4} |
243 |
|
end. |
244 |
|
|
245 |
|
-spec parse_address(binary() | string() | inet:ip_address()) -> |
246 |
|
{ip, inet:ip_address()} | {domain, domain_name()}. |
247 |
|
parse_address(DomainOrIp) when is_binary(DomainOrIp) -> |
248 |
105 |
parse_address(binary_to_list(DomainOrIp)); |
249 |
|
parse_address(Ip) when is_tuple(Ip) -> |
250 |
:-( |
{ip, Ip}; |
251 |
|
parse_address(DomainOrIp) -> |
252 |
105 |
case inet:parse_address(DomainOrIp) of |
253 |
|
{error, einval} -> |
254 |
:-( |
{domain, DomainOrIp}; |
255 |
|
{ok, Ip} -> |
256 |
105 |
{ip, Ip} |
257 |
|
end. |
258 |
|
|
259 |
|
is_domain(DomainOrIp) -> |
260 |
:-( |
case parse_address(DomainOrIp) of |
261 |
|
{domain, _} -> |
262 |
:-( |
true; |
263 |
|
_ -> |
264 |
:-( |
false |
265 |
|
end. |
266 |
|
|
267 |
|
local_host() -> |
268 |
:-( |
opt(mod_global_distrib, local_host). |
269 |
|
|
270 |
|
getaddrs(Addr, Type) -> |
271 |
:-( |
case inet:getaddrs(Addr, Type) of |
272 |
:-( |
{ok, Addrs} -> {ok, lists:usort(Addrs)}; |
273 |
:-( |
{error, Reason} -> {error, Reason} |
274 |
|
end. |