1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
%% Intercepts filter_packet hook with maybe_reroute callback. |
18 |
|
-module(mod_global_distrib). |
19 |
|
-author('konrad.zemek@erlang-solutions.com'). |
20 |
|
|
21 |
|
-behaviour(gen_mod). |
22 |
|
-behaviour(mongoose_module_metrics). |
23 |
|
|
24 |
|
-include("global_distrib_metrics.hrl"). |
25 |
|
-include("jlib.hrl"). |
26 |
|
-include("mongoose.hrl"). |
27 |
|
-include("mongoose_config_spec.hrl"). |
28 |
|
|
29 |
|
-export([deps/2, start/2, stop/1, config_spec/0]). |
30 |
|
-export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]). |
31 |
|
-export([maybe_reroute/3]). |
32 |
|
-export([process_opts/1, process_endpoint/1]). |
33 |
|
|
34 |
|
-ignore_xref([remove_metadata/2]). |
35 |
|
|
36 |
|
%%-------------------------------------------------------------------- |
37 |
|
%% gen_mod API |
38 |
|
%%-------------------------------------------------------------------- |
39 |
|
|
40 |
|
%% Note: while this module should be enabled for all hosts, |
41 |
|
%% it needs to be started only once - this is why deps/2 and start/2 |
42 |
|
%% do nothing for hosts other than global_host |
43 |
|
|
44 |
|
%% Declares the modules this one depends on. Only the host type that equals the
%% configured `global_host' gets any dependencies; every other host type gets an
%% empty list. Each dependency is a hard dependency started with this module's
%% complete option map (the same options for simplicity).
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(HostType, #{global_host := HostType, bounce := BounceOpts} = Opts) ->
    Required = dep_modules() ++ bounce_modules(BounceOpts),
    lists:map(fun(Module) -> {Module, Opts, hard} end, Required);
deps(_HostType, #{}) ->
    [].
50 |
|
|
51 |
|
%% Modules that are always required by global distribution, in start order.
dep_modules() ->
    [mod_global_distrib_utils,
     mod_global_distrib_mapping,
     mod_global_distrib_disco,
     mod_global_distrib_receiver,
     mod_global_distrib_hosts_refresher].
54 |
|
|
55 |
106 |
%% The bounce module is an extra dependency only when the `bounce' section has
%% `enabled' set to true; any other value of `enabled' is a function_clause.
bounce_modules(#{enabled := false}) ->
    [];
bounce_modules(#{enabled := true}) ->
    [mod_global_distrib_bounce].
57 |
|
|
58 |
|
%% Starts the module. Only the host type equal to `global_host' does any work:
%% it ensures the two global-distribution metrics exist and registers the
%% handlers from hooks/0. All other host types are a no-op returning `ok'.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, #{global_host := GlobalHost}) when HostType =:= GlobalHost ->
    Metrics = [{?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, histogram},
               {?GLOBAL_DISTRIB_STOP_TTL_ZERO, spiral}],
    lists:foreach(fun({Name, Type}) ->
                          mongoose_metrics:ensure_metric(global, Name, Type)
                  end, Metrics),
    gen_hook:add_handlers(hooks());
start(_HostType, #{}) ->
    ok.
65 |
|
|
66 |
|
%% Counterpart of start/2: the handlers from hooks/0 are removed only when
%% HostType is the configured `global_host'; otherwise returns `ok'.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
    ConfiguredGlobalHost = gen_mod:get_module_opt(HostType, ?MODULE, global_host),
    case ConfiguredGlobalHost =:= HostType of
        true -> gen_hook:delete_handlers(hooks());
        false -> ok
    end.
72 |
|
|
73 |
|
%% Handler list shared by start/2 and stop/1: maybe_reroute/3 is attached to the
%% global `filter_packet' hook with an empty extra map and priority 99.
hooks() ->
    Handler = fun ?MODULE:maybe_reroute/3,
    [{filter_packet, global, Handler, #{}, 99}].
75 |
|
|
76 |
|
%% Top-level configuration section of the module.
%% `global_host' and `local_host' are mandatory domain names; `message_ttl'
%% defaults to 4 and `hosts_refresh_interval' to 3000. The nested sections come
%% from dedicated helpers, and the resulting option map is post-processed by
%% process_opts/1.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    DomainOpt = #option{type = binary, validate = domain},
    NonNegIntOpt = #option{type = integer, validate = non_negative},
    Items = #{<<"global_host">> => DomainOpt,
              <<"local_host">> => DomainOpt,
              <<"message_ttl">> => NonNegIntOpt,
              <<"hosts_refresh_interval">> => NonNegIntOpt,
              <<"connections">> => connections_spec(),
              <<"redis">> => redis_spec(),
              <<"cache">> => cache_spec(),
              <<"bounce">> => bounce_spec()},
    Defaults = #{<<"message_ttl">> => 4,
                 <<"hosts_refresh_interval">> => 3000},
    #section{items = Items,
             required = [<<"global_host">>, <<"local_host">>],
             defaults = Defaults,
             process = fun ?MODULE:process_opts/1}.
97 |
|
|
98 |
|
%% `connections' subsection (always included). `endpoints' and
%% `advertised_endpoints' are lists of endpoint sections (see endpoint_spec/0).
%% Defaults: 1 connection per endpoint; refresh intervals of 60 (non-empty) and
%% 3 (empty); `disabled_gc_interval' of 60.
connections_spec() ->
    EndpointList = #list{items = endpoint_spec()},
    PositiveIntOpt = #option{type = integer, validate = positive},
    Items = #{<<"endpoints">> => EndpointList,
              <<"advertised_endpoints">> => EndpointList,
              <<"connections_per_endpoint">> => #option{type = integer,
                                                        validate = non_negative},
              <<"endpoint_refresh_interval">> => PositiveIntOpt,
              <<"endpoint_refresh_interval_when_empty">> => PositiveIntOpt,
              <<"disabled_gc_interval">> => PositiveIntOpt,
              <<"tls">> => tls_spec()},
    Defaults = #{<<"connections_per_endpoint">> => 1,
                 <<"endpoint_refresh_interval">> => 60,
                 <<"endpoint_refresh_interval_when_empty">> => 3,
                 <<"disabled_gc_interval">> => 60},
    #section{items = Items, defaults = Defaults, include = always}.
118 |
|
|
119 |
|
%% A single endpoint: both `host' (a network address, kept as a string) and
%% `port' are required; process_endpoint/1 turns it into a `{Host, Port}' tuple.
endpoint_spec() ->
    Items = #{<<"host">> => #option{type = string, validate = network_address},
              <<"port">> => #option{type = integer, validate = port}},
    #section{items = Items,
             required = all,
             process = fun ?MODULE:process_endpoint/1}.
129 |
|
|
130 |
|
%% TLS options for both the client and the server side, limited to fast_tls.
tls_spec() ->
    Sides = [client, server],
    Implementations = [fast_tls],
    mongoose_config_spec:tls(Sides, Implementations).
132 |
|
|
133 |
|
%% `redis' subsection (always included). Defaults: pool `global_distrib',
%% `expire_after' 120 and `refresh_after' 60.
redis_spec() ->
    Items = #{<<"pool">> => #option{type = atom, validate = pool_name},
              <<"expire_after">> => #option{type = integer, validate = positive},
              <<"refresh_after">> => #option{type = integer, validate = non_negative}},
    Defaults = #{<<"pool">> => global_distrib,
                 <<"expire_after">> => 120,
                 <<"refresh_after">> => 60},
    #section{items = Items, defaults = Defaults, include = always}.
147 |
|
|
148 |
|
%% `cache' subsection (always included). Defaults: `cache_missed' true,
%% `domain_lifetime_seconds' 600, `jid_lifetime_seconds' 5, `max_jids' 10000.
cache_spec() ->
    NonNegIntOpt = #option{type = integer, validate = non_negative},
    Items = #{<<"cache_missed">> => #option{type = boolean},
              <<"domain_lifetime_seconds">> => NonNegIntOpt,
              <<"jid_lifetime_seconds">> => NonNegIntOpt,
              <<"max_jids">> => NonNegIntOpt},
    Defaults = #{<<"cache_missed">> => true,
                 <<"domain_lifetime_seconds">> => 600,
                 <<"jid_lifetime_seconds">> => 5,
                 <<"max_jids">> => 10000},
    #section{items = Items, defaults = Defaults, include = always}.
165 |
|
|
166 |
|
%% `bounce' subsection (always included). Defaults: enabled, `resend_after_ms'
%% 200 and `max_retries' 4.
bounce_spec() ->
    NonNegIntOpt = #option{type = integer, validate = non_negative},
    Items = #{<<"enabled">> => #option{type = boolean},
              <<"resend_after_ms">> => NonNegIntOpt,
              <<"max_retries">> => NonNegIntOpt},
    Defaults = #{<<"enabled">> => true,
                 <<"resend_after_ms">> => 200,
                 <<"max_retries">> => 4},
    #section{items = Items, defaults = Defaults, include = always}.
179 |
|
|
180 |
|
%% Post-processes the parsed module options: the `connections' section is
%% completed by process_connections/2 using the configured `local_host'.
-spec process_opts(gen_mod:module_opts()) -> gen_mod:module_opts().
process_opts(#{local_host := LocalHost, connections := Connections} = Opts) ->
    CompletedConnections = process_connections(LocalHost, Connections),
    Opts#{connections := CompletedConnections}.
183 |
|
|
184 |
|
%% Fills in connection defaults and resolves endpoints:
%%  - `endpoints' defaults to `[{LocalHost (as a string), 5555}]';
%%  - `advertised_endpoints' defaults to the (possibly defaulted) `endpoints';
%%  - `resolved_endpoints' is set to the result of
%%    mod_global_distrib_utils:resolve_endpoints/1 applied to `endpoints'.
process_connections(LocalHost, Opts) ->
    Endpoints = case maps:find(endpoints, Opts) of
                    {ok, Configured} -> Configured;
                    error -> [{binary_to_list(LocalHost), 5555}]
                end,
    Advertised = case maps:find(advertised_endpoints, Opts) of
                     {ok, ConfiguredAdvertised} -> ConfiguredAdvertised;
                     error -> Endpoints
                 end,
    Opts#{endpoints => Endpoints,
          advertised_endpoints => Advertised,
          resolved_endpoints => mod_global_distrib_utils:resolve_endpoints(Endpoints)}.
191 |
|
|
192 |
|
%% Converts a parsed endpoint section into the `{Host, Port}' tuple form.
-spec process_endpoint(map()) -> mod_global_distrib_utils:endpoint().
process_endpoint(#{port := Port, host := Host}) ->
    {Host, Port}.
195 |
|
|
196 |
|
%%-------------------------------------------------------------------- |
197 |
|
%% public functions |
198 |
|
%%-------------------------------------------------------------------- |
199 |
|
|
200 |
|
%% Reads Key from the `global_distrib' namespace of the accumulator via
%% mongoose_acc:get/4, passing Default through (presumably returned when the
%% key is not set — see mongoose_acc).
-spec get_metadata(mongoose_acc:t(), Key :: term(), Default :: term()) -> Value :: term().
get_metadata(Acc, Key, Default) ->
    Namespace = global_distrib,
    mongoose_acc:get(Namespace, Key, Default, Acc).
203 |
|
|
204 |
|
%% Looks up Key in the `global_distrib' namespace of the accumulator.
%% Returns `{ok, Value}' when the key is set and `{error, undefined}' otherwise.
%%
%% A fresh reference is used as the default for mongoose_acc:get/4, so no value
%% stored in the accumulator can ever be mistaken for "absent". This replaces a
%% `try ... catch _:_' around mongoose_acc:get/3, which used an exception for an
%% expected miss on the routing hot path and also silently turned any unrelated
%% crash (e.g. a malformed accumulator) into "not found".
%% NOTE(review): relies on mongoose_acc:get/4 returning its Default argument for
%% a missing key, as get_metadata/3 already does.
-spec find_metadata(mongoose_acc:t(), Key :: term()) ->
          {ok, Value :: term()} | {error, undefined}.
find_metadata(Acc, Key) ->
    Missing = make_ref(),
    case mongoose_acc:get(global_distrib, Key, Missing, Acc) of
        Missing -> {error, undefined};
        Value -> {ok, Value}
    end.
213 |
|
|
214 |
|
%% Stores Key => Value in the `global_distrib' namespace of the accumulator
%% using mongoose_acc:set_permanent/4 and returns the updated accumulator.
-spec put_metadata(mongoose_acc:t(), Key :: term(), Value :: term()) -> mongoose_acc:t().
put_metadata(Acc, Key, Value) ->
    Namespace = global_distrib,
    mongoose_acc:set_permanent(Namespace, Key, Value, Acc).
217 |
|
|
218 |
|
%% Deletes Key from the `global_distrib' namespace of the accumulator using
%% mongoose_acc:delete/3 and returns the updated accumulator.
-spec remove_metadata(mongoose_acc:t(), Key :: term()) -> mongoose_acc:t().
remove_metadata(Acc, Key) ->
    Namespace = global_distrib,
    mongoose_acc:delete(Namespace, Key, Acc).
221 |
|
|
222 |
|
%%-------------------------------------------------------------------- |
223 |
|
%% Hooks implementation |
224 |
|
%%-------------------------------------------------------------------- |
225 |
|
|
226 |
|
%% filter_packet handler registered by hooks/0.
%%
%% Stanzas between two sessions of the same bare JID are passed through
%% untouched. Otherwise GD metadata is initialized (if missing) and the
%% recipient's host is looked up:
%%  - local host      -> continue routing here with the initialized accumulator;
%%  - another host    -> forward via forward_to_remote_host/3 (or continue here
%%                       when the TTL is exhausted);
%%  - unknown (error) -> defer to the mod_global_distrib_unknown_recipient hook.
%% Returns `{stop, drop}' when the stanza was handed over to a GD sender worker
%% (or the unknown-recipient hook returned `drop'), `{ok, FPacket}' otherwise.
-spec maybe_reroute(FPacket, Params, Extra) -> {ok, FPacket} | {stop, drop} when
      FPacket :: mongoose_hooks:filter_packet_acc(),
      Params :: map(),
      Extra :: map().
maybe_reroute({#jid{luser = SameUser, lserver = SameServer} = _From,
               #jid{luser = SameUser, lserver = SameServer} = _To,
               _Acc, _Packet} = FPacket, _, _) ->
    %% GD is not designed to support two user sessions existing in distinct clusters
    %% and here we explicitly block routing stanzas between them.
    %% Without this clause, test_pm_with_ungraceful_reconnection_to_different_server test
    %% was randomly failing because sometimes 'unavailable' presence from a dead session
    %% was poisoning reg1 cache. In such case, reg1 tried to route locally stanzas
    %% from unacked SM buffer, leading to an error, while a brand new, shiny Eve
    %% on mim1 was waiting.
    {ok, FPacket};
maybe_reroute({From, To, _, Packet} = FPacket, _, _) ->
    Acc = maybe_initialize_metadata(FPacket),
    {ok, ID} = find_metadata(Acc, id),
    LocalHost = opt(local_host),
    GlobalHost = opt(global_host),
    %% If target_host_override is set (typically when routed out of bounce storage),
    %% host lookup is skipped and messages are routed to target_host_override value.
    TargetHostOverride = get_metadata(Acc, target_host_override, undefined),
    Result =
        case lookup_recipients_host(TargetHostOverride, To, LocalHost, GlobalHost) of
            {ok, LocalHost} ->
                %% Continue routing with initialized metadata
                mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
                                                                  From, To, LocalHost),
                ?LOG_DEBUG(#{what => gd_route_local,
                             text => <<"Routing global message to local datacenter">>,
                             gd_id => ID, local_host => LocalHost, acc => Acc}),
                {ok, TTL} = find_metadata(Acc, ttl),
                mongoose_metrics:update(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, TTL),
                {From, To, Acc, Packet};
            {ok, TargetHost} ->
                mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
                                                                  From, To, TargetHost),
                forward_to_remote_host({From, To, Acc, Packet}, TargetHost, ID);
            error ->
                ?LOG_DEBUG(#{what => gd_route_failed, gd_id => ID, acc => Acc,
                             text => <<"Unable to route global: user not found in the routing table">>}),
                mongoose_hooks:mod_global_distrib_unknown_recipient(GlobalHost,
                                                                    {From, To, Acc, Packet})
        end,
    %% A fresh variable is used here: matching against the already-bound result
    %% variable would always succeed, but only by accident.
    case Result of
        drop -> {stop, drop};
        NewFPacket -> {ok, NewFPacket}
    end.

%% Sends a stanza whose recipient lives on TargetHost to that cluster.
%% FPacket must carry the accumulator with initialized GD metadata.
%% When the TTL has reached 0 the stanza is not forwarded again. Routing then
%% continues here with the initialized accumulator, consistent with the
%% local-delivery branch; previously the caller's un-initialized packet was
%% returned. Otherwise the TTL is decremented, any target_host_override is
%% cleared, the stanza is handed to the bound sender worker, and `drop' is
%% returned.
%% KNOWN ISSUE: will crash loudly if there are no connections available
%% TODO: Discuss behaviour in such scenario
-spec forward_to_remote_host(mongoose_hooks:filter_packet_acc(), binary(), binary()) ->
          mongoose_hooks:filter_packet_acc() | drop.
forward_to_remote_host({From, To, Acc, Packet} = FPacket, TargetHost, ID) ->
    case find_metadata(Acc, ttl) of
        {ok, 0} ->
            %% Just continue routing
            ?LOG_INFO(#{what => gd_route_ttl_zero,
                        text => <<"Skip global distribution">>,
                        gd_id => ID, acc => Acc, target_host => TargetHost}),
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
            FPacket;
        {ok, TTL} ->
            ?LOG_DEBUG(#{what => gd_reroute, ttl => TTL,
                         text => <<"Forward stanza to remote cluster "
                                   "using global distribution">>,
                         gd_id => ID, acc => Acc, target_host => TargetHost}),
            Acc1 = put_metadata(Acc, ttl, TTL - 1),
            Acc2 = remove_metadata(Acc1, target_host_override),
            Worker = get_bound_connection_noisy(TargetHost, ID, Acc2),
            mod_global_distrib_sender:send(Worker, {From, To, Acc2, Packet}),
            drop
    end.
297 |
|
|
298 |
|
%%-------------------------------------------------------------------- |
299 |
|
%% Helpers |
300 |
|
%%-------------------------------------------------------------------- |
301 |
|
|
302 |
|
%% Ensures the accumulator carries GD metadata. If it already has an `origin'
%% entry it is returned unchanged. Otherwise it gets `ttl' (the configured
%% `message_ttl'), a fresh v4 UUID as `id', and `origin' (the configured
%% `local_host'), in that order.
-spec maybe_initialize_metadata({jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) -> mongoose_acc:t().
maybe_initialize_metadata({_From, _To, Acc, _Packet}) ->
    case find_metadata(Acc, origin) of
        {ok, _Origin} ->
            Acc;
        {error, undefined} ->
            WithTTL = put_metadata(Acc, ttl, opt(message_ttl)),
            GDID = uuid:uuid_to_string(uuid:get_v4(), binary_standard),
            WithID = put_metadata(WithTTL, id, GDID),
            %% The logged accumulator is the one received, before initialization.
            ?LOG_DEBUG(#{what => gd_init_metadata, gd_id => GDID, acc => Acc}),
            put_metadata(WithID, origin, opt(local_host))
    end.
314 |
|
|
315 |
|
%% Same as get_bound_connection/2, but any exception is logged first (with the
%% GD id, accumulator and target host) and then re-raised unchanged, keeping
%% its class, reason and stacktrace.
get_bound_connection_noisy(TargetHost, GDID, Acc) ->
    try
        get_bound_connection(TargetHost, GDID)
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR(#{what => gd_get_process_for_failed,
                         gd_id => GDID, acc => Acc, target_host => TargetHost,
                         class => Class, reason => Reason, stacktrace => Stacktrace}),
            erlang:raise(Class, Reason, Stacktrace)
    end.
323 |
|
|
324 |
|
%% Returns the sender worker the calling process is bound to for Server. The
%% binding is cached in the caller's process dictionary under
%% `{connection, Server}' (undefined when not cached yet).
-spec get_bound_connection(Server :: jid:lserver(), binary()) -> pid().
get_bound_connection(Server, GDID) when is_binary(GDID) ->
    Cached = get({connection, Server}),
    get_bound_connection(Server, GDID, Cached).
327 |
|
|
328 |
|
%% Resolves the cached binding from get_bound_connection/2:
%%  - `undefined': asks mod_global_distrib_sender for a worker, caches it in
%%    the process dictionary and returns it;
%%  - a live pid: reused as-is;
%%  - a dead pid: treated as if nothing was cached.
-spec get_bound_connection(Server :: jid:lserver(), term(), pid() | undefined) -> pid().
get_bound_connection(Server, GDID, undefined) ->
    NewPid = mod_global_distrib_sender:get_process_for(Server),
    put({connection, Server}, NewPid),
    ?LOG_DEBUG(#{what => gd_new_bound_connection,
                 server => Server, gd_id => GDID, gd_pid => NewPid}),
    NewPid;
get_bound_connection(Server, GDID, Pid) when is_pid(Pid) ->
    case is_process_alive(Pid) of
        true ->
            ?LOG_DEBUG(#{what => gd_reuse_bound_connection,
                         server => Server, gd_id => GDID, gd_pid => Pid}),
            Pid;
        false ->
            ?LOG_DEBUG(#{what => gd_dead_bound_connection,
                         server => Server, gd_id => GDID, gd_pid => Pid}),
            get_bound_connection(Server, GDID, undefined)
    end.
346 |
|
|
347 |
|
%% Determines where the recipient lives. An explicit target host (anything other
%% than `undefined') short-circuits the lookup; otherwise the lookup is done by
%% lookup_recipients_host/3.
-spec lookup_recipients_host(TargetHost :: binary() | undefined,
                             To :: jid:jid(),
                             LocalHost :: binary(),
                             GlobalHost :: binary()) ->
          {ok, binary()} | error.
lookup_recipients_host(TargetHost, _To, _LocalHost, _GlobalHost)
  when TargetHost =/= undefined ->
    {ok, TargetHost};
lookup_recipients_host(undefined, To, LocalHost, GlobalHost) ->
    lookup_recipients_host(To, LocalHost, GlobalHost).
356 |
|
|
357 |
|
%% Maps a recipient JID to the host (cluster) that serves it:
%%  - addressed to LocalHost                 -> `{ok, LocalHost}';
%%  - bare domain addressed to GlobalHost    -> `{ok, LocalHost}';
%%  - user JID at GlobalHost                 -> mod_global_distrib_mapping:for_jid/1;
%%  - any other domain                       -> mod_global_distrib_mapping:for_domain/1.
-spec lookup_recipients_host(To :: jid:jid(),
                             LocalHost :: binary(),
                             GlobalHost :: binary()) -> {ok, binary()} | error.
lookup_recipients_host(#jid{luser = LUser, lserver = LServer} = To, LocalHost, GlobalHost) ->
    IsBareDomain = LUser =:= <<>>,
    if
        LServer =:= LocalHost ->
            {ok, LocalHost};
        LServer =:= GlobalHost, IsBareDomain ->
            {ok, LocalHost};
        LServer =:= GlobalHost ->
            mod_global_distrib_mapping:for_jid(To);
        true ->
            mod_global_distrib_mapping:for_domain(LServer)
    end.
371 |
|
|
372 |
|
%% Reads one of this module's options through mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Module = ?MODULE,
    mod_global_distrib_utils:opt(Module, Key).