./ct_report/coverage/mod_global_distrib_utils.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_utils).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -include("mongoose.hrl").
21 -include("jlib.hrl").
22
23 -export([
24 start/4, deps/4, stop/3, opt/2, cast_or_call/2, cast_or_call/3, cast_or_call/4,
25 create_ets/1, create_ets/2, any_binary_to_atom/1, resolve_endpoints/1,
26 binary_to_metric_atom/1, ensure_metric/2, recipient_to_worker_key/2,
27 server_to_mgr_name/1, server_to_sup_name/1, maybe_update_mapping/2,
28 parse_address/1
29 ]).
30
31 -export([getaddrs/2]).
32
33 -ignore_xref([cast_or_call/3, cast_or_call/4]).
34
35 -type domain_name() :: string().
36 -type endpoint() :: {inet:ip_address() | domain_name(), inet:port_number()}.
37
38 -export_type([endpoint/0]).
39
40 %%--------------------------------------------------------------------
41 %% API
42 %%--------------------------------------------------------------------
43
44 -spec binary_to_metric_atom(binary()) -> atom().
45 binary_to_metric_atom(Binary) ->
46 3101 List = lists:filtermap(fun
47 1466 ($.) -> {true, $_};
48
:-(
($ ) -> {true, $_};
49 24132 (C) when C > 31, C < 127 -> {true, C};
50
:-(
(_) -> false
51 end,
52 unicode:characters_to_list(Binary)),
53 3101 list_to_atom(List).
54
55 ensure_metric(Metric, Type) ->
56 15549 mongoose_metrics:ensure_subscribed_metric(global, Metric, Type).
57
58 -spec any_binary_to_atom(binary()) -> atom().
59 any_binary_to_atom(Binary) ->
60 355 binary_to_atom(base64:encode(Binary), latin1).
61
62 -spec start(module(), Host :: jid:lserver(), Opts :: proplists:proplist(),
63 StartFun :: fun(() -> any())) -> any().
64 start(Module, Host, Opts, StartFun) ->
65 113 check_host(global_host, Opts),
66 113 check_host(local_host, Opts),
67
68 112 {global_host, GlobalHostList} = lists:keyfind(global_host, 1, Opts),
69 112 case unicode:characters_to_binary(GlobalHostList) of
70 Host ->
71 112 create_ets(Module),
72 112 populate_opts_ets(Module, Opts),
73 112 StartFun();
74 _ ->
75
:-(
ok
76 end.
77
78 -spec stop(module(), Host :: jid:lserver(), StopFun :: fun(() -> any())) ->
79 any().
80 stop(Module, Host, StopFun) ->
81 112 case catch opt(Module, global_host) of
82 Host ->
83 112 StopFun(),
84 112 ets:delete(Module);
85 _ ->
86
:-(
ok
87 end.
88
89 -spec deps(module(), Host :: jid:lserver(), Opts :: proplists:proplist(),
90 DepsFun :: fun((proplists:proplist()) -> gen_mod:deps_list())) ->
91 gen_mod:deps_list().
92 deps(_Module, Host, Opts, DepsFun) ->
93 232 {global_host, GlobalHostList} = lists:keyfind(global_host, 1, Opts),
94 232 case unicode:characters_to_binary(GlobalHostList) of
95 232 Host -> DepsFun(Opts);
96
:-(
_ -> []
97 end.
98
99 -spec opt(module(), Key :: atom()) -> Value :: term().
100 opt(Module, Key) ->
101 8358 try ets:lookup_element(Module, Key, 2)
102 catch
103 error:badarg ->
104
:-(
error(atom_to_list(Module) ++ " required option unset: " ++ atom_to_list(Key))
105 end.
106
107 -spec cast_or_call(Target :: pid() | atom(), Message :: term()) -> any().
108 cast_or_call(Target, Message) ->
109 668 cast_or_call(Target, Message, 500).
110
111 -spec cast_or_call(Target :: pid() | atom(), Message :: term(),
112 SyncWatermark :: non_neg_integer()) ->
113 any().
114 cast_or_call(Target, Message, SyncWatermark) ->
115 668 cast_or_call(Target, Message, SyncWatermark, 5000).
116
117 -spec cast_or_call(Target :: pid() | atom(), Message :: term(),
118 SyncWatermark :: non_neg_integer(), Timeout :: pos_integer() | infinity) ->
119 any().
120 cast_or_call(Target, Message, SyncWatermark, Timeout) when is_atom(Target), Target =/= undefined ->
121 334 cast_or_call(whereis(Target), Message, SyncWatermark, Timeout);
122 cast_or_call(Target, Message, SyncWatermark, Timeout) when is_pid(Target) ->
123 668 case process_info(Target, message_queue_len) of
124
:-(
{_, X} when X > SyncWatermark -> gen_server:call(Target, Message, Timeout);
125 668 {_, _} -> gen_server:cast(Target, Message)
126 end.
127
128 -spec create_ets(Names :: [atom()] | atom()) -> any().
129 create_ets(Names) ->
130 127 create_ets(Names, set).
131
132 -spec create_ets(Names :: [atom()] | atom(), Type :: atom()) -> any().
133 create_ets(Names, Type) when is_list(Names) ->
134 15 [create_ets(Name, Type) || Name <- Names];
135 create_ets(Name, Type) ->
136 202 Self = self(),
137 202 Heir = case whereis(ejabberd_sup) of
138
:-(
undefined -> {heir, none};
139
:-(
Self -> {heir, none};
140 202 Pid -> {heir, Pid, testing}
141 end,
142
143 202 ets:new(Name, [named_table, public, Type, {read_concurrency, true}, Heir]).
144
145 -spec resolve_endpoints([{inet:ip_address() | string(), inet:port_number()}]) ->
146 [endpoint()].
147 resolve_endpoints(Endpoints) when is_list(Endpoints) ->
148 97 lists:flatmap(fun resolve_endpoint/1, Endpoints).
149
150 resolve_endpoint({Addr, _Port} = E) when is_tuple(Addr) ->
151 118 [E];
152 resolve_endpoint({Addr, Port}) ->
153
:-(
case to_ip_tuples(Addr) of
154 {ok, IpAddrs} ->
155
:-(
Resolved = [{IpAddr, Port} || IpAddr <- IpAddrs],
156
:-(
?LOG_IF(info, is_domain(Addr), #{what => gd_resolve_endpoint,
157 text => <<"GD resolved address to IPs">>,
158
:-(
address => Addr, ip_addresses => IpAddrs}),
159
:-(
Resolved;
160 {error, {Reasonv6, Reasonv4}} ->
161
:-(
?LOG_ERROR(#{what => gd_resolve_endpoint_failed,
162 text => <<"GD Cannot convert address to IP addresses">>,
163 address => Addr,
164 ipv6_reason => inet:format_error(Reasonv6),
165
:-(
ipv4_reason => inet:format_error(Reasonv4)}),
166
:-(
error({domain_not_resolved, {Reasonv6, Reasonv4}})
167 end.
168
169
170 -spec recipient_to_worker_key(jid:jid() | jid:ljid(), jid:lserver()) -> binary().
171 recipient_to_worker_key(#jid{} = Jid, GlobalHost) ->
172 355 recipient_to_worker_key(jid:to_lower(Jid), GlobalHost);
173 recipient_to_worker_key({_, GlobalHost, _} = Jid, GlobalHost) ->
174 341 jid:to_binary(Jid);
175 recipient_to_worker_key({_, Domain, _}, _GlobalHost) ->
176 14 Domain.
177
178 -spec server_to_mgr_name(Server :: jid:lserver()) -> atom().
179 server_to_mgr_name(Server) ->
180 1257 gen_mod:get_module_proc(Server, mod_global_distrib_server_mgr).
181
182 -spec server_to_sup_name(Server :: jid:lserver()) -> atom().
183 server_to_sup_name(Server) ->
184 151 gen_mod:get_module_proc(Server, mod_global_distrib_server_sup).
185
186 %% IMPORTANT! Regarding mod_global_distrib_mapping:insert_for_*/2:
187 %% These functions with arity 2 will call cache update functions with
188 %% dummy update functions, so they will result only in cache update with
189 %% no backend side effects.
190 -spec maybe_update_mapping(From :: jid:jid(), mongoose_acc:t()) -> any().
191 maybe_update_mapping(_From, #{name := <<"presence">>, type := <<"unavailable">>}) ->
192
:-(
ok;
193 maybe_update_mapping(#jid{luser = <<>>, lserver = LServer} = From, Acc) ->
194 5 case opt(mod_global_distrib, global_host) of
195
:-(
LServer -> ok;
196 _ ->
197 5 ensure_domain_inserted(Acc, From#jid.lserver)
198 end;
199 maybe_update_mapping(From, Acc) ->
200 334 case mod_global_distrib_mapping:for_jid(From) of
201 error ->
202 9 case mod_global_distrib:find_metadata(Acc, origin) of
203 %% Lack of 'global_distrib' indicates 100% local routing...
204 {error, undefined} ->
205 %% .. so we can insert From into cache with local host as mapping
206
:-(
mod_global_distrib_mapping:cache_jid(From, local_host());
207 {ok, Origin} ->
208 9 mod_global_distrib_mapping:cache_jid(From, Origin)
209 end;
210 _ ->
211 325 ok
212 end.
213
214 %%--------------------------------------------------------------------
215 %% Helpers
216 %%--------------------------------------------------------------------
217
218 -spec ensure_domain_inserted(mongoose_acc:t(), jid:lserver()) -> ok.
219 ensure_domain_inserted(Acc, Domain) ->
220 5 case mod_global_distrib_mapping:for_domain(Domain) of
221 error ->
222 %% See the comments in the last match of maybe_update_mapping/2 function
223
:-(
case mod_global_distrib:find_metadata(Acc, origin) of
224 {error, undefined} ->
225
:-(
mod_global_distrib_mapping:cache_domain(Domain, local_host());
226 {ok, Origin} ->
227
:-(
mod_global_distrib_mapping:cache_domain(Domain, Origin)
228 end;
229 _ ->
230 5 ok
231 end.
232
233 -spec check_host(local_host | global_host, Opts :: proplists:proplist()) -> true.
234 check_host(Key, Opts) ->
235 226 {Key, HostList} = lists:keyfind(Key, 1, Opts),
236 226 Host = unicode:characters_to_binary(HostList),
237 226 lists:member(Host, ?MYHOSTS) orelse error(HostList ++ " is not a member of the host list").
238
239 -spec populate_opts_ets(module(), Opts :: proplists:proplist()) -> any().
240 populate_opts_ets(Module, Opts) ->
241 112 [ets:insert(Module, {Key, translate_opt(Value)}) || {Key, Value} <- Opts].
242
243 -spec translate_opt(term()) -> term().
244 translate_opt([Elem | _] = Opt) when is_list(Elem) ->
245
:-(
[translate_opt(E) || E <- Opt];
246 translate_opt(Opt) when is_list(Opt) ->
247 597 case catch unicode:characters_to_binary(Opt) of
248 224 Bin when is_binary(Bin) -> Bin;
249 373 _ -> Opt
250 end;
251 translate_opt(Opt) ->
252 605 Opt.
253
254 -spec to_ip_tuples(Addr :: inet:ip_address() | string()) ->
255 {ok, [inet:ip_address()]} | {error, {V6 :: atom(), V4 :: atom()}}.
256 to_ip_tuples(Addr) ->
257
:-(
case {?MODULE:getaddrs(Addr, inet6), ?MODULE:getaddrs(Addr, inet)} of
258 {{error, Reason6}, {error, Reason4}} ->
259
:-(
{error, {Reason6, Reason4}};
260 {Addrs, {error, Msg}} ->
261
:-(
?LOG_DEBUG(#{what => resolv_error, address => Addr, reason => Msg,
262
:-(
text => <<"IPv4 address resolution error">>}),
263
:-(
Addrs;
264 {{error, Msg}, Addrs} ->
265
:-(
?LOG_DEBUG(#{what => resolv_error, address => Addr, reason => Msg,
266
:-(
text => <<"IPv6 address resolution error">>}),
267
:-(
Addrs;
268 {{ok, Addrs6}, {ok, Addrs4}} ->
269
:-(
{ok, Addrs6 ++ Addrs4}
270 end.
271
272 -spec parse_address(binary() | string() | inet:ip_address()) ->
273 {ip, inet:ip_address()} | {domain, domain_name()}.
274 parse_address(DomainOrIp) when is_binary(DomainOrIp) ->
275 94 parse_address(binary_to_list(DomainOrIp));
276 parse_address(Ip) when is_tuple(Ip) ->
277
:-(
{ip, Ip};
278 parse_address(DomainOrIp) ->
279 94 case inet:parse_address(DomainOrIp) of
280 {error, einval} ->
281
:-(
{domain, DomainOrIp};
282 {ok, Ip} ->
283 94 {ip, Ip}
284 end.
285
286 is_domain(DomainOrIp) ->
287
:-(
case parse_address(DomainOrIp) of
288 {domain, _} ->
289
:-(
true;
290 _ ->
291
:-(
false
292 end.
293
294 local_host() ->
295
:-(
opt(mod_global_distrib, local_host).
296
297 %% For mocking in tests
298 getaddrs(Addr, Type) ->
299
:-(
inet:getaddrs(Addr, Type).
Line Hits Source