./ct_report/coverage/mod_global_distrib_utils.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_utils).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -include("mongoose.hrl").
21 -include("jlib.hrl").
22
23 -behaviour(gen_mod).
24
25 %% gen_mod callbacks
26 -export([start/2, stop/1]).
27
28 -export([
29 opt/2, host_type/0, create_ets/1, create_ets/2,
30 cast_or_call/2, cast_or_call/3, cast_or_call/4,
31 any_binary_to_atom/1, resolve_endpoints/1,
32 binary_to_metric_atom/1, ensure_metric/2, recipient_to_worker_key/2,
33 server_to_mgr_name/1, server_to_sup_name/1, maybe_update_mapping/2,
34 parse_address/1
35 ]).
36
37 -export([getaddrs/2]).
38
39 -ignore_xref([cast_or_call/3, cast_or_call/4]).
40
41 -type domain_name() :: string().
42 -type endpoint() :: {inet:ip_address() | domain_name(), inet:port_number()}.
43
44 -export_type([endpoint/0]).
45
46 %%--------------------------------------------------------------------
47 %% API
48 %%--------------------------------------------------------------------
49
50 -spec binary_to_metric_atom(binary()) -> atom().
51 binary_to_metric_atom(Binary) ->
52 1363 List = lists:filtermap(fun
53 616 ($.) -> {true, $_};
54
:-(
($ ) -> {true, $_};
55 10380 (C) when C > 31, C < 127 -> {true, C};
56
:-(
(_) -> false
57 end,
58 unicode:characters_to_list(Binary)),
59 1363 list_to_atom(List).
60
61 ensure_metric(Metric, Type) ->
62 6859 mongoose_metrics:ensure_subscribed_metric(global, Metric, Type).
63
64 -spec any_binary_to_atom(binary()) -> atom().
65 any_binary_to_atom(Binary) ->
66 369 binary_to_atom(base64:encode(Binary), latin1).
67
68 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
69 start(HostType, #{global_host := GlobalHost, local_host := LocalHost}) ->
70 16 check_host(LocalHost),
71 15 check_host(GlobalHost),
72 15 persistent_term:put({mod_global_distrib, host_type}, HostType).
73
74 -spec stop(mongooseim:host_type()) -> any().
75 stop(_HostType) ->
76 15 persistent_term:erase({mod_global_distrib, host_type}).
77
78 -spec opt(module(), gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value().
79 opt(Module, Key) ->
80 7002 gen_mod:get_module_opt(host_type(), Module, Key).
81
82 host_type() ->
83 7074 persistent_term:get({mod_global_distrib, host_type}).
84
85 -spec cast_or_call(Target :: pid() | atom(), Message :: term()) -> any().
86 cast_or_call(Target, Message) ->
87 680 cast_or_call(Target, Message, 500).
88
89 -spec cast_or_call(Target :: pid() | atom(), Message :: term(),
90 SyncWatermark :: non_neg_integer()) ->
91 any().
92 cast_or_call(Target, Message, SyncWatermark) ->
93 680 cast_or_call(Target, Message, SyncWatermark, 5000).
94
95 -spec cast_or_call(Target :: pid() | atom(), Message :: term(),
96 SyncWatermark :: non_neg_integer(), Timeout :: pos_integer() | infinity) ->
97 any().
98 cast_or_call(Target, Message, SyncWatermark, Timeout) when is_atom(Target), Target =/= undefined ->
99 340 cast_or_call(whereis(Target), Message, SyncWatermark, Timeout);
100 cast_or_call(Target, Message, SyncWatermark, Timeout) when is_pid(Target) ->
101 680 case process_info(Target, message_queue_len) of
102
:-(
{_, X} when X > SyncWatermark -> gen_server:call(Target, Message, Timeout);
103 680 {_, _} -> gen_server:cast(Target, Message)
104 end.
105
106 -spec create_ets(Names :: [atom()] | atom()) -> any().
107 create_ets(Names) ->
108 15 create_ets(Names, set).
109
110 -spec create_ets(Names :: [atom()] | atom(), Type :: atom()) -> any().
111 create_ets(Names, Type) when is_list(Names) ->
112 15 [create_ets(Name, Type) || Name <- Names];
113 create_ets(Name, Type) ->
114 75 ejabberd_sup:create_ets_table(Name, [named_table, public, Type, {read_concurrency, true}]).
115
116 -spec resolve_endpoints([{inet:ip_address() | string(), inet:port_number()}]) -> [endpoint()].
117 resolve_endpoints(Endpoints) when is_list(Endpoints) ->
118 82 lists:flatmap(fun resolve_endpoint/1, Endpoints).
119
120 resolve_endpoint({Addr, _Port} = E) when is_tuple(Addr) ->
121 103 [E];
122 resolve_endpoint({Addr, Port}) ->
123
:-(
case to_ip_tuples(Addr) of
124 {ok, IpAddrs} ->
125
:-(
Resolved = [{IpAddr, Port} || IpAddr <- IpAddrs],
126
:-(
?LOG_IF(info, is_domain(Addr), #{what => gd_resolve_endpoint,
127 text => <<"GD resolved address to IPs">>,
128
:-(
address => Addr, ip_addresses => IpAddrs}),
129
:-(
Resolved;
130 {error, {Reasonv6, Reasonv4}} ->
131
:-(
?LOG_ERROR(#{what => gd_resolve_endpoint_failed,
132 text => <<"GD Cannot convert address to IP addresses">>,
133 address => Addr,
134 ipv6_reason => inet:format_error(Reasonv6),
135
:-(
ipv4_reason => inet:format_error(Reasonv4)}),
136
:-(
error({domain_not_resolved, {Reasonv6, Reasonv4}})
137 end.
138
139
140 -spec recipient_to_worker_key(jid:jid() | jid:ljid(), jid:lserver()) -> binary().
141 recipient_to_worker_key(#jid{} = Jid, GlobalHost) ->
142 369 recipient_to_worker_key(jid:to_lower(Jid), GlobalHost);
143 recipient_to_worker_key({_, GlobalHost, _} = Jid, GlobalHost) ->
144 341 jid:to_binary(Jid);
145 recipient_to_worker_key({_, Domain, _}, _GlobalHost) ->
146 28 Domain.
147
148 -spec server_to_mgr_name(Server :: jid:lserver()) -> atom().
149 server_to_mgr_name(Server) ->
150 841 gen_mod:get_module_proc(Server, mod_global_distrib_server_mgr).
151
152 -spec server_to_sup_name(Server :: jid:lserver()) -> atom().
153 server_to_sup_name(Server) ->
154 165 gen_mod:get_module_proc(Server, mod_global_distrib_server_sup).
155
156 %% IMPORTANT! Regarding mod_global_distrib_mapping:insert_for_*/2:
157 %% These functions with arity 2 will call cache update functions with
158 %% dummy update functions, so they will result only in cache update with
159 %% no backend side effects.
160 -spec maybe_update_mapping(From :: jid:jid(), mongoose_acc:t()) -> any().
161 maybe_update_mapping(_From, #{name := <<"presence">>, type := <<"unavailable">>}) ->
162
:-(
ok;
163 maybe_update_mapping(#jid{luser = <<>>, lserver = LServer} = From, Acc) ->
164 5 case opt(mod_global_distrib, global_host) of
165
:-(
LServer -> ok;
166 _ ->
167 5 ensure_domain_inserted(Acc, From#jid.lserver)
168 end;
169 maybe_update_mapping(From, Acc) ->
170 340 case mod_global_distrib_mapping:for_jid(From) of
171 error ->
172 9 case mod_global_distrib:find_metadata(Acc, origin) of
173 %% Lack of 'global_distrib' indicates 100% local routing...
174 {error, undefined} ->
175 %% .. so we can insert From into cache with local host as mapping
176
:-(
mod_global_distrib_mapping:cache_jid(From, local_host());
177 {ok, Origin} ->
178 9 mod_global_distrib_mapping:cache_jid(From, Origin)
179 end;
180 _ ->
181 331 ok
182 end.
183
184 %%--------------------------------------------------------------------
185 %% Helpers
186 %%--------------------------------------------------------------------
187
188 -spec ensure_domain_inserted(mongoose_acc:t(), jid:lserver()) -> ok.
189 ensure_domain_inserted(Acc, Domain) ->
190 5 case mod_global_distrib_mapping:for_domain(Domain) of
191 error ->
192 %% See the comments in the last match of maybe_update_mapping/2 function
193
:-(
case mod_global_distrib:find_metadata(Acc, origin) of
194 {error, undefined} ->
195
:-(
mod_global_distrib_mapping:cache_domain(Domain, local_host());
196 {ok, Origin} ->
197
:-(
mod_global_distrib_mapping:cache_domain(Domain, Origin)
198 end;
199 _ ->
200 5 ok
201 end.
202
203 %% @doc Check that the host is hosted by the server
204 -spec check_host(jid:lserver()) -> ok.
205 check_host(Domain) ->
206 %% There is no clause for a dynamic domain as this module can't be started with dynamic domains
207 31 case mongoose_domain_api:get_domain_host_type(Domain) of
208 {error, not_found} ->
209 1 error(#{what => check_host_failed,
210 msg => <<"Domain is not hosted by the server">>,
211 domain => Domain});
212 {ok, Domain} ->
213 30 ok
214 end.
215
216 get_addrs_in_parallel(Addr) ->
217 %% getaddrs could be pretty slow, so do in parallel
218 %% Also, limit the time it could be running to 5 seconds
219 %% (would fail with reason timeout if it takes too long)
220
:-(
F = fun(Ver) -> ?MODULE:getaddrs(Addr, Ver) end,
221
:-(
[V6, V4] = mongoose_lib:pmap(F, [inet6, inet]),
222
:-(
{simplify_result(V6), simplify_result(V4)}.
223
224
:-(
simplify_result({ok, Res}) -> Res;
225
:-(
simplify_result(Res) -> Res.
226
227 -spec to_ip_tuples(Addr :: inet:ip_address() | string()) ->
228 {ok, [inet:ip_address()]} | {error, {V6 :: atom(), V4 :: atom()}}.
229 to_ip_tuples(Addr) ->
230
:-(
case get_addrs_in_parallel(Addr) of
231 {{error, Reason6}, {error, Reason4}} ->
232
:-(
{error, {Reason6, Reason4}};
233 {Addrs, {error, Msg}} ->
234
:-(
?LOG_DEBUG(#{what => resolv_error, address => Addr, reason => Msg,
235
:-(
text => <<"IPv4 address resolution error">>}),
236
:-(
Addrs;
237 {{error, Msg}, Addrs} ->
238
:-(
?LOG_DEBUG(#{what => resolv_error, address => Addr, reason => Msg,
239
:-(
text => <<"IPv6 address resolution error">>}),
240
:-(
Addrs;
241 {{ok, Addrs6}, {ok, Addrs4}} ->
242
:-(
{ok, Addrs6 ++ Addrs4}
243 end.
244
245 -spec parse_address(binary() | string() | inet:ip_address()) ->
246 {ip, inet:ip_address()} | {domain, domain_name()}.
247 parse_address(DomainOrIp) when is_binary(DomainOrIp) ->
248 103 parse_address(binary_to_list(DomainOrIp));
249 parse_address(Ip) when is_tuple(Ip) ->
250
:-(
{ip, Ip};
251 parse_address(DomainOrIp) ->
252 103 case inet:parse_address(DomainOrIp) of
253 {error, einval} ->
254
:-(
{domain, DomainOrIp};
255 {ok, Ip} ->
256 103 {ip, Ip}
257 end.
258
259 is_domain(DomainOrIp) ->
260
:-(
case parse_address(DomainOrIp) of
261 {domain, _} ->
262
:-(
true;
263 _ ->
264
:-(
false
265 end.
266
267 local_host() ->
268
:-(
opt(mod_global_distrib, local_host).
269
270 getaddrs(Addr, Type) ->
271
:-(
case inet:getaddrs(Addr, Type) of
272
:-(
{ok, Addrs} -> {ok, lists:usort(Addrs)};
273
:-(
{error, Reason} -> {error, Reason}
274 end.
Line Hits Source