1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mod_global_distrib_utils). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-include("mongoose.hrl"). |
21 |
|
-include("jlib.hrl"). |
22 |
|
|
23 |
|
-export([ |
24 |
|
start/4, deps/4, stop/3, opt/2, cast_or_call/2, cast_or_call/3, cast_or_call/4, |
25 |
|
create_ets/1, create_ets/2, any_binary_to_atom/1, resolve_endpoints/1, |
26 |
|
binary_to_metric_atom/1, ensure_metric/2, recipient_to_worker_key/2, |
27 |
|
server_to_mgr_name/1, server_to_sup_name/1, maybe_update_mapping/2, |
28 |
|
parse_address/1 |
29 |
|
]). |
30 |
|
|
31 |
|
-export([getaddrs/2]). |
32 |
|
|
33 |
|
-ignore_xref([cast_or_call/3, cast_or_call/4]). |
34 |
|
|
35 |
|
-type domain_name() :: string(). |
36 |
|
-type endpoint() :: {inet:ip_address() | domain_name(), inet:port_number()}. |
37 |
|
|
38 |
|
-export_type([endpoint/0]). |
39 |
|
|
40 |
|
%%-------------------------------------------------------------------- |
41 |
|
%% API |
42 |
|
%%-------------------------------------------------------------------- |
43 |
|
|
44 |
|
-spec binary_to_metric_atom(binary()) -> atom(). |
45 |
|
binary_to_metric_atom(Binary) -> |
46 |
3070 |
List = lists:filtermap(fun |
47 |
1456 |
($.) -> {true, $_}; |
48 |
:-( |
($ ) -> {true, $_}; |
49 |
23928 |
(C) when C > 31, C < 127 -> {true, C}; |
50 |
:-( |
(_) -> false |
51 |
|
end, |
52 |
|
unicode:characters_to_list(Binary)), |
53 |
3070 |
list_to_atom(List). |
54 |
|
|
55 |
|
ensure_metric(Metric, Type) -> |
56 |
15394 |
mongoose_metrics:ensure_subscribed_metric(global, Metric, Type). |
57 |
|
|
58 |
|
-spec any_binary_to_atom(binary()) -> atom(). |
59 |
|
any_binary_to_atom(Binary) -> |
60 |
357 |
binary_to_atom(base64:encode(Binary), latin1). |
61 |
|
|
62 |
|
-spec start(module(), Host :: jid:lserver(), Opts :: proplists:proplist(), |
63 |
|
StartFun :: fun(() -> any())) -> any(). |
64 |
|
start(Module, Host, Opts, StartFun) -> |
65 |
113 |
check_host(global_host, Opts), |
66 |
113 |
check_host(local_host, Opts), |
67 |
|
|
68 |
112 |
{global_host, GlobalHostList} = lists:keyfind(global_host, 1, Opts), |
69 |
112 |
case unicode:characters_to_binary(GlobalHostList) of |
70 |
|
Host -> |
71 |
112 |
create_ets(Module), |
72 |
112 |
populate_opts_ets(Module, Opts), |
73 |
112 |
StartFun(); |
74 |
|
_ -> |
75 |
:-( |
ok |
76 |
|
end. |
77 |
|
|
78 |
|
-spec stop(module(), Host :: jid:lserver(), StopFun :: fun(() -> any())) -> |
79 |
|
any(). |
80 |
|
stop(Module, Host, StopFun) -> |
81 |
112 |
case catch opt(Module, global_host) of |
82 |
|
Host -> |
83 |
112 |
StopFun(), |
84 |
112 |
ets:delete(Module); |
85 |
|
_ -> |
86 |
:-( |
ok |
87 |
|
end. |
88 |
|
|
89 |
|
-spec deps(module(), Host :: jid:lserver(), Opts :: proplists:proplist(), |
90 |
|
DepsFun :: fun((proplists:proplist()) -> gen_mod:deps_list())) -> |
91 |
|
gen_mod:deps_list(). |
92 |
|
deps(_Module, Host, Opts, DepsFun) -> |
93 |
232 |
{global_host, GlobalHostList} = lists:keyfind(global_host, 1, Opts), |
94 |
232 |
case unicode:characters_to_binary(GlobalHostList) of |
95 |
232 |
Host -> DepsFun(Opts); |
96 |
:-( |
_ -> [] |
97 |
|
end. |
98 |
|
|
99 |
|
-spec opt(module(), Key :: atom()) -> Value :: term(). |
100 |
|
opt(Module, Key) -> |
101 |
8286 |
try ets:lookup_element(Module, Key, 2) |
102 |
|
catch |
103 |
|
error:badarg -> |
104 |
:-( |
error(atom_to_list(Module) ++ " required option unset: " ++ atom_to_list(Key)) |
105 |
|
end. |
106 |
|
|
107 |
|
-spec cast_or_call(Target :: pid() | atom(), Message :: term()) -> any(). |
108 |
|
cast_or_call(Target, Message) -> |
109 |
668 |
cast_or_call(Target, Message, 500). |
110 |
|
|
111 |
|
-spec cast_or_call(Target :: pid() | atom(), Message :: term(), |
112 |
|
SyncWatermark :: non_neg_integer()) -> |
113 |
|
any(). |
114 |
|
cast_or_call(Target, Message, SyncWatermark) -> |
115 |
668 |
cast_or_call(Target, Message, SyncWatermark, 5000). |
116 |
|
|
117 |
|
-spec cast_or_call(Target :: pid() | atom(), Message :: term(), |
118 |
|
SyncWatermark :: non_neg_integer(), Timeout :: pos_integer() | infinity) -> |
119 |
|
any(). |
120 |
|
cast_or_call(Target, Message, SyncWatermark, Timeout) when is_atom(Target), Target =/= undefined -> |
121 |
334 |
cast_or_call(whereis(Target), Message, SyncWatermark, Timeout); |
122 |
|
cast_or_call(Target, Message, SyncWatermark, Timeout) when is_pid(Target) -> |
123 |
668 |
case process_info(Target, message_queue_len) of |
124 |
:-( |
{_, X} when X > SyncWatermark -> gen_server:call(Target, Message, Timeout); |
125 |
668 |
{_, _} -> gen_server:cast(Target, Message) |
126 |
|
end. |
127 |
|
|
128 |
|
-spec create_ets(Names :: [atom()] | atom()) -> any(). |
129 |
|
create_ets(Names) -> |
130 |
127 |
create_ets(Names, set). |
131 |
|
|
132 |
|
-spec create_ets(Names :: [atom()] | atom(), Type :: atom()) -> any(). |
133 |
|
create_ets(Names, Type) when is_list(Names) -> |
134 |
15 |
[create_ets(Name, Type) || Name <- Names]; |
135 |
|
create_ets(Name, Type) -> |
136 |
202 |
Self = self(), |
137 |
202 |
Heir = case whereis(ejabberd_sup) of |
138 |
:-( |
undefined -> {heir, none}; |
139 |
:-( |
Self -> {heir, none}; |
140 |
202 |
Pid -> {heir, Pid, testing} |
141 |
|
end, |
142 |
|
|
143 |
202 |
ets:new(Name, [named_table, public, Type, {read_concurrency, true}, Heir]). |
144 |
|
|
145 |
|
-spec resolve_endpoints([{inet:ip_address() | string(), inet:port_number()}]) -> |
146 |
|
[endpoint()]. |
147 |
|
resolve_endpoints(Endpoints) when is_list(Endpoints) -> |
148 |
94 |
lists:flatmap(fun resolve_endpoint/1, Endpoints). |
149 |
|
|
150 |
|
resolve_endpoint({Addr, _Port} = E) when is_tuple(Addr) -> |
151 |
114 |
[E]; |
152 |
|
resolve_endpoint({Addr, Port}) -> |
153 |
:-( |
case to_ip_tuples(Addr) of |
154 |
|
{ok, IpAddrs} -> |
155 |
:-( |
Resolved = [{IpAddr, Port} || IpAddr <- IpAddrs], |
156 |
:-( |
?LOG_IF(info, is_domain(Addr), #{what => gd_resolve_endpoint, |
157 |
|
text => <<"GD resolved address to IPs">>, |
158 |
:-( |
address => Addr, ip_addresses => IpAddrs}), |
159 |
:-( |
Resolved; |
160 |
|
{error, {Reasonv6, Reasonv4}} -> |
161 |
:-( |
?LOG_ERROR(#{what => gd_resolve_endpoint_failed, |
162 |
|
text => <<"GD Cannot convert address to IP addresses">>, |
163 |
|
address => Addr, |
164 |
|
ipv6_reason => inet:format_error(Reasonv6), |
165 |
:-( |
ipv4_reason => inet:format_error(Reasonv4)}), |
166 |
:-( |
error({domain_not_resolved, {Reasonv6, Reasonv4}}) |
167 |
|
end. |
168 |
|
|
169 |
|
|
170 |
|
-spec recipient_to_worker_key(jid:jid() | jid:ljid(), jid:lserver()) -> binary(). |
171 |
|
recipient_to_worker_key(#jid{} = Jid, GlobalHost) -> |
172 |
357 |
recipient_to_worker_key(jid:to_lower(Jid), GlobalHost); |
173 |
|
recipient_to_worker_key({_, GlobalHost, _} = Jid, GlobalHost) -> |
174 |
341 |
jid:to_binary(Jid); |
175 |
|
recipient_to_worker_key({_, Domain, _}, _GlobalHost) -> |
176 |
16 |
Domain. |
177 |
|
|
178 |
|
-spec server_to_mgr_name(Server :: jid:lserver()) -> atom(). |
179 |
|
server_to_mgr_name(Server) -> |
180 |
1246 |
gen_mod:get_module_proc(Server, mod_global_distrib_server_mgr). |
181 |
|
|
182 |
|
-spec server_to_sup_name(Server :: jid:lserver()) -> atom(). |
183 |
|
server_to_sup_name(Server) -> |
184 |
145 |
gen_mod:get_module_proc(Server, mod_global_distrib_server_sup). |
185 |
|
|
186 |
|
%% IMPORTANT! Regarding mod_global_distrib_mapping:insert_for_*/2: |
187 |
|
%% These functions with arity 2 will call cache update functions with |
188 |
|
%% dummy update functions, so they will result only in cache update with |
189 |
|
%% no backend side effects. |
190 |
|
-spec maybe_update_mapping(From :: jid:jid(), mongoose_acc:t()) -> any(). |
191 |
|
maybe_update_mapping(_From, #{name := <<"presence">>, type := <<"unavailable">>}) -> |
192 |
:-( |
ok; |
193 |
|
maybe_update_mapping(#jid{luser = <<>>, lserver = LServer} = From, Acc) -> |
194 |
5 |
case opt(mod_global_distrib, global_host) of |
195 |
:-( |
LServer -> ok; |
196 |
|
_ -> |
197 |
5 |
ensure_domain_inserted(Acc, From#jid.lserver) |
198 |
|
end; |
199 |
|
maybe_update_mapping(From, Acc) -> |
200 |
334 |
case mod_global_distrib_mapping:for_jid(From) of |
201 |
|
error -> |
202 |
9 |
case mod_global_distrib:find_metadata(Acc, origin) of |
203 |
|
%% Lack of 'global_distrib' indicates 100% local routing... |
204 |
|
{error, undefined} -> |
205 |
|
%% .. so we can insert From into cache with local host as mapping |
206 |
:-( |
mod_global_distrib_mapping:cache_jid(From, local_host()); |
207 |
|
{ok, Origin} -> |
208 |
9 |
mod_global_distrib_mapping:cache_jid(From, Origin) |
209 |
|
end; |
210 |
|
_ -> |
211 |
325 |
ok |
212 |
|
end. |
213 |
|
|
214 |
|
%%-------------------------------------------------------------------- |
215 |
|
%% Helpers |
216 |
|
%%-------------------------------------------------------------------- |
217 |
|
|
218 |
|
-spec ensure_domain_inserted(mongoose_acc:t(), jid:lserver()) -> ok. |
219 |
|
ensure_domain_inserted(Acc, Domain) -> |
220 |
5 |
case mod_global_distrib_mapping:for_domain(Domain) of |
221 |
|
error -> |
222 |
|
%% See the comments in the last match of maybe_update_mapping/2 function |
223 |
:-( |
case mod_global_distrib:find_metadata(Acc, origin) of |
224 |
|
{error, undefined} -> |
225 |
:-( |
mod_global_distrib_mapping:cache_domain(Domain, local_host()); |
226 |
|
{ok, Origin} -> |
227 |
:-( |
mod_global_distrib_mapping:cache_domain(Domain, Origin) |
228 |
|
end; |
229 |
|
_ -> |
230 |
5 |
ok |
231 |
|
end. |
232 |
|
|
233 |
|
-spec check_host(local_host | global_host, Opts :: proplists:proplist()) -> true. |
234 |
|
check_host(Key, Opts) -> |
235 |
226 |
{Key, HostList} = lists:keyfind(Key, 1, Opts), |
236 |
226 |
Host = unicode:characters_to_binary(HostList), |
237 |
226 |
lists:member(Host, ?MYHOSTS) orelse error(HostList ++ " is not a member of the host list"). |
238 |
|
|
239 |
|
-spec populate_opts_ets(module(), Opts :: proplists:proplist()) -> any(). |
240 |
|
populate_opts_ets(Module, Opts) -> |
241 |
112 |
[ets:insert(Module, {Key, translate_opt(Value)}) || {Key, Value} <- Opts]. |
242 |
|
|
243 |
|
-spec translate_opt(term()) -> term(). |
244 |
|
translate_opt([Elem | _] = Opt) when is_list(Elem) -> |
245 |
:-( |
[translate_opt(E) || E <- Opt]; |
246 |
|
translate_opt(Opt) when is_list(Opt) -> |
247 |
597 |
case catch unicode:characters_to_binary(Opt) of |
248 |
224 |
Bin when is_binary(Bin) -> Bin; |
249 |
373 |
_ -> Opt |
250 |
|
end; |
251 |
|
translate_opt(Opt) -> |
252 |
605 |
Opt. |
253 |
|
|
254 |
|
-spec to_ip_tuples(Addr :: inet:ip_address() | string()) -> |
255 |
|
{ok, [inet:ip_address()]} | {error, {V6 :: atom(), V4 :: atom()}}. |
256 |
|
to_ip_tuples(Addr) -> |
257 |
:-( |
case {?MODULE:getaddrs(Addr, inet6), ?MODULE:getaddrs(Addr, inet)} of |
258 |
|
{{error, Reason6}, {error, Reason4}} -> |
259 |
:-( |
{error, {Reason6, Reason4}}; |
260 |
|
{Addrs, {error, Msg}} -> |
261 |
:-( |
?LOG_DEBUG(#{what => resolv_error, address => Addr, reason => Msg, |
262 |
:-( |
text => <<"IPv4 address resolution error">>}), |
263 |
:-( |
Addrs; |
264 |
|
{{error, Msg}, Addrs} -> |
265 |
:-( |
?LOG_DEBUG(#{what => resolv_error, address => Addr, reason => Msg, |
266 |
:-( |
text => <<"IPv6 address resolution error">>}), |
267 |
:-( |
Addrs; |
268 |
|
{{ok, Addrs6}, {ok, Addrs4}} -> |
269 |
:-( |
{ok, Addrs6 ++ Addrs4} |
270 |
|
end. |
271 |
|
|
272 |
|
-spec parse_address(binary() | string() | inet:ip_address()) -> |
273 |
|
{ip, inet:ip_address()} | {domain, domain_name()}. |
274 |
|
parse_address(DomainOrIp) when is_binary(DomainOrIp) -> |
275 |
90 |
parse_address(binary_to_list(DomainOrIp)); |
276 |
|
parse_address(Ip) when is_tuple(Ip) -> |
277 |
:-( |
{ip, Ip}; |
278 |
|
parse_address(DomainOrIp) -> |
279 |
90 |
case inet:parse_address(DomainOrIp) of |
280 |
|
{error, einval} -> |
281 |
:-( |
{domain, DomainOrIp}; |
282 |
|
{ok, Ip} -> |
283 |
90 |
{ip, Ip} |
284 |
|
end. |
285 |
|
|
286 |
|
is_domain(DomainOrIp) -> |
287 |
:-( |
case parse_address(DomainOrIp) of |
288 |
|
{domain, _} -> |
289 |
:-( |
true; |
290 |
|
_ -> |
291 |
:-( |
false |
292 |
|
end. |
293 |
|
|
294 |
|
local_host() -> |
295 |
:-( |
opt(mod_global_distrib, local_host). |
296 |
|
|
297 |
|
%% For mocking in tests |
298 |
|
getaddrs(Addr, Type) -> |
299 |
:-( |
inet:getaddrs(Addr, Type). |