1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
%% Intercepts filter_packet hook with maybe_reroute callback. |
18 |
|
-module(mod_global_distrib). |
19 |
|
-author('konrad.zemek@erlang-solutions.com'). |
20 |
|
|
21 |
|
-behaviour(gen_mod). |
22 |
|
-behaviour(mongoose_module_metrics). |
23 |
|
|
24 |
|
-include("global_distrib_metrics.hrl"). |
25 |
|
-include("jlib.hrl"). |
26 |
|
-include("mongoose.hrl"). |
27 |
|
-include("mongoose_config_spec.hrl"). |
28 |
|
|
29 |
|
-export([deps/2, start/2, stop/1, config_spec/0]). |
30 |
|
-export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]). |
31 |
|
-export([maybe_reroute/1]). |
32 |
|
-export([process_endpoints/1, process_bounce/1]). |
33 |
|
|
34 |
|
-ignore_xref([maybe_reroute/1, remove_metadata/2]). |
35 |
|
|
36 |
|
%%-------------------------------------------------------------------- |
37 |
|
%% gen_mod API |
38 |
|
%% See "gen_mod logic" block below in this file |
39 |
|
%%-------------------------------------------------------------------- |
40 |
|
|
41 |
|
%% gen_mod dependency callback. All the work is delegated to
%% mod_global_distrib_utils:deps/4; the local deps/1 is handed over as the
%% function that turns the option list into the dependency list.
-spec deps(Host :: jid:server(), Opts :: proplists:proplist()) -> gen_mod:deps_list().
deps(Host, Opts) ->
    DepsFun = fun deps/1,
    mod_global_distrib_utils:deps(?MODULE, Host, Opts, DepsFun).
44 |
|
|
45 |
|
%% gen_mod start callback. Puts a {message_ttl, 4} entry in front of the
%% supplied options and passes the result, together with start/0, to
%% mod_global_distrib_utils:start/4.
%% NOTE(review): presumably {message_ttl, 4} is meant as a default; how a
%% duplicate message_ttl key is resolved depends on
%% mod_global_distrib_utils:start/4 - confirm there.
-spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any().
start(Host, Opts0) ->
    TTLEntry = {message_ttl, 4},
    StartFun = fun start/0,
    mod_global_distrib_utils:start(?MODULE, Host, [TTLEntry | Opts0], StartFun).
49 |
|
|
50 |
|
%% gen_mod stop callback. Delegates to mod_global_distrib_utils:stop/3,
%% handing over the local stop/0 as the module-specific cleanup function.
-spec stop(Host :: jid:lserver()) -> any().
stop(Host) ->
    StopFun = fun stop/0,
    mod_global_distrib_utils:stop(?MODULE, Host, StopFun).
53 |
|
|
54 |
|
%% Top-level configuration section of this module.
%% "global_host" and "local_host" are mandatory domain strings; the nested
%% "connections", "redis", "cache" and "bounce" sections are described by
%% their own *_spec/0 functions.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    DomainOpt = #option{type = string, validate = domain},
    NonNegIntOpt = #option{type = integer, validate = non_negative},
    Items = #{<<"global_host">> => DomainOpt,
              <<"local_host">> => DomainOpt,
              <<"message_ttl">> => NonNegIntOpt,
              <<"hosts_refresh_interval">> => NonNegIntOpt,
              <<"connections">> => connections_spec(),
              <<"redis">> => redis_spec(),
              <<"cache">> => cache_spec(),
              <<"bounce">> => bounce_spec()},
    #section{items = Items,
             required = [<<"global_host">>, <<"local_host">>]}.
72 |
|
|
73 |
|
%% Specification of the "connections" subsection: endpoint lists, integer
%% tuning knobs and the nested "tls" section.
connections_spec() ->
    EndpointList = #list{items = endpoints_spec()},
    PositiveIntOpt = #option{type = integer, validate = positive},
    NonNegIntOpt = #option{type = integer, validate = non_negative},
    Items = #{<<"endpoints">> => EndpointList,
              <<"advertised_endpoints">> => EndpointList,
              <<"connections_per_endpoint">> => NonNegIntOpt,
              <<"endpoint_refresh_interval">> => PositiveIntOpt,
              <<"endpoint_refresh_interval_when_empty">> => PositiveIntOpt,
              <<"disabled_gc_interval">> => PositiveIntOpt,
              <<"tls">> => tls_spec()},
    #section{items = Items}.
88 |
|
|
89 |
|
%% Specification of a single endpoint entry: both "host" and "port" are
%% required, and the parsed section is post-processed by
%% process_endpoints/1.
endpoints_spec() ->
    HostOpt = #option{type = string, validate = network_address},
    PortOpt = #option{type = integer, validate = port},
    #section{items = #{<<"host">> => HostOpt,
                       <<"port">> => PortOpt},
             required = all,
             process = fun ?MODULE:process_endpoints/1}.
99 |
|
|
100 |
|
%% Specification of the "tls" subsection. "certfile" and "cacertfile" are
%% required; "cacertfile" is wrapped as {kv, cafile} and the whole section
%% is wrapped as {kv, tls_opts}.
tls_spec() ->
    FileOpt = #option{type = string, validate = filename},
    Items = #{<<"certfile">> => FileOpt,
              <<"cacertfile">> => FileOpt#option{wrap = {kv, cafile}},
              <<"ciphers">> => #option{type = string},
              <<"dhfile">> => FileOpt},
    #section{items = Items,
             required = [<<"certfile">>, <<"cacertfile">>],
             wrap = {kv, tls_opts}}.
114 |
|
|
115 |
|
%% Specification of the "redis" subsection: the pool name plus the
%% "expire_after" (positive) and "refresh_after" (non-negative) integers.
redis_spec() ->
    PoolOpt = #option{type = atom, validate = pool_name},
    ExpireOpt = #option{type = integer, validate = positive},
    RefreshOpt = #option{type = integer, validate = non_negative},
    #section{items = #{<<"pool">> => PoolOpt,
                       <<"expire_after">> => ExpireOpt,
                       <<"refresh_after">> => RefreshOpt}}.
125 |
|
|
126 |
|
%% Specification of the "cache" subsection: one boolean flag and three
%% non-negative integer options.
cache_spec() ->
    NonNegIntOpt = #option{type = integer, validate = non_negative},
    Items = #{<<"cache_missed">> => #option{type = boolean},
              <<"domain_lifetime_seconds">> => NonNegIntOpt,
              <<"jid_lifetime_seconds">> => NonNegIntOpt,
              <<"max_jids">> => NonNegIntOpt},
    #section{items = Items}.
137 |
|
|
138 |
|
%% Specification of the "bounce" subsection. The parsed key-value list is
%% post-processed by process_bounce/1.
bounce_spec() ->
    NonNegIntOpt = #option{type = integer, validate = non_negative},
    Items = #{<<"enabled">> => #option{type = boolean},
              <<"resend_after_ms">> => NonNegIntOpt,
              <<"max_retries">> => NonNegIntOpt},
    #section{items = Items,
             process = fun ?MODULE:process_bounce/1}.
148 |
|
|
149 |
|
%% Converts a parsed endpoint section into a {Host, Port} tuple.
%% The input must contain exactly one {host, _} and one {port, _} entry and
%% nothing else; any other shape fails with a badmatch.
process_endpoints(KV) ->
    Split = proplists:split(KV, [host, port]),
    {[[{host, Host}], [{port, Port}]], []} = Split,
    {Host, Port}.
152 |
|
|
153 |
|
%% Post-processes the parsed "bounce" section: separates the entries keyed
%% by `enabled` from the remaining options and lets bounce_value/2 decide
%% the final value.
process_bounce(KVs) ->
    Split = proplists:split(KVs, [enabled]),
    {[EnabledOpts], Opts} = Split,
    bounce_value(EnabledOpts, Opts).
156 |
|
|
157 |
:-( |
%% Returns `false` when the enabled-entries are exactly [{enabled, false}];
%% in every other case the remaining options are returned unchanged.
bounce_value(EnabledOpts, _Opts) when EnabledOpts =:= [{enabled, false}] ->
    false;
bounce_value(_EnabledOpts, Opts) ->
    Opts.
159 |
|
|
160 |
|
%%-------------------------------------------------------------------- |
161 |
|
%% public functions |
162 |
|
%%-------------------------------------------------------------------- |
163 |
|
|
164 |
|
%% Reads the value stored for Key under the `global_distrib` tag of the
%% accumulator via mongoose_acc:get/4, passing Default through as the
%% fallback argument.
-spec get_metadata(mongoose_acc:t(), Key :: term(), Default :: term()) -> Value :: term().
get_metadata(Acc, Key, Default) ->
    Tag = global_distrib,
    mongoose_acc:get(Tag, Key, Default, Acc).
167 |
|
|
168 |
|
%% Looks up Key under the `global_distrib` tag of the accumulator.
%% Returns {ok, Value} when mongoose_acc:get/3 returns normally and
%% {error, undefined} when it raises.
%% NOTE(review): the catch-all clause turns *any* exception from
%% mongoose_acc:get/3 into {error, undefined}, not only a missing key.
-spec find_metadata(mongoose_acc:t(), Key :: term()) ->
    {ok, Value :: term()} | {error, undefined}.
find_metadata(Acc, Key) ->
    try mongoose_acc:get(global_distrib, Key, Acc) of
        Found ->
            {ok, Found}
    catch
        _Class:_Reason ->
            {error, undefined}
    end.
177 |
|
|
178 |
|
%% Stores Value for Key under the `global_distrib` tag of the accumulator
%% using mongoose_acc:set_permanent/4 and returns the updated accumulator.
-spec put_metadata(mongoose_acc:t(), Key :: term(), Value :: term()) -> mongoose_acc:t().
put_metadata(Acc, Key, Value) ->
    Tag = global_distrib,
    mongoose_acc:set_permanent(Tag, Key, Value, Acc).
181 |
|
|
182 |
|
%% Deletes Key from the `global_distrib` tag of the accumulator via
%% mongoose_acc:delete/3 and returns the updated accumulator.
-spec remove_metadata(mongoose_acc:t(), Key :: term()) -> mongoose_acc:t().
remove_metadata(Acc, Key) ->
    Tag = global_distrib,
    mongoose_acc:delete(Tag, Key, Acc).
185 |
|
|
186 |
|
%%-------------------------------------------------------------------- |
187 |
|
%% Hooks implementation |
188 |
|
%%-------------------------------------------------------------------- |
189 |
|
|
190 |
|
%% filter_packet hook handler (registered in start/0).
%%
%% * `drop` is passed through unchanged.
%% * A stanza whose sender and recipient share the same luser and lserver
%%   is passed through unchanged.
%% * Otherwise the global-distribution metadata is initialised if needed and
%%   the recipient's host is looked up:
%%     - local host: the stanza continues routing with the metadata attached;
%%     - other host, TTL 0: the original stanza continues routing locally;
%%     - other host, TTL > 0: TTL is decremented, the stanza is handed to a
%%       sender worker for that host and `drop` is returned;
%%     - lookup failure: the unknown-recipient hook decides the result.
-spec maybe_reroute(drop) -> drop;
                   ({jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) ->
    drop | {jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}.
maybe_reroute(drop) ->
    drop;
maybe_reroute({#jid{luser = LUser, lserver = LServer},
               #jid{luser = LUser, lserver = LServer},
               _Acc, _Packet} = Untouched) ->
    %% GD is not designed to support two sessions of one user living in
    %% distinct clusters, so routing stanzas between them is blocked
    %% explicitly here.
    %% Without this clause the test
    %% test_pm_with_ungraceful_reconnection_to_different_server failed
    %% randomly: an 'unavailable' presence from a dead session sometimes
    %% poisoned the reg1 cache. reg1 then tried to route stanzas from the
    %% unacked SM buffer locally, which ended in an error, while a brand new
    %% Eve session was waiting on mim1.
    Untouched;
maybe_reroute({From, To, _OrigAcc, Packet} = FPacket) ->
    GDAcc = maybe_initialize_metadata(FPacket),
    {ok, GDID} = find_metadata(GDAcc, id),
    Local = opt(local_host),
    Global = opt(global_host),
    %% When target_host_override is set (typically for stanzas leaving the
    %% bounce storage) the host lookup is skipped and the override is used.
    Override = get_metadata(GDAcc, target_host_override, undefined),
    case lookup_recipients_host(Override, To, Local, Global) of
        {ok, Local} ->
            %% Recipient lives here: keep routing, metadata attached.
            mongoose_hooks:mod_global_distrib_known_recipient(Global, From, To, Local),
            ?LOG_DEBUG(#{what => gd_route_local,
                         text => <<"Routing global message to local datacenter">>,
                         gd_id => GDID, local_host => Local, acc => GDAcc}),
            {ok, RemainingTTL} = find_metadata(GDAcc, ttl),
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, RemainingTTL),
            {From, To, GDAcc, Packet};

        {ok, Remote} ->
            mongoose_hooks:mod_global_distrib_known_recipient(Global, From, To, Remote),
            case find_metadata(GDAcc, ttl) of
                {ok, 0} ->
                    %% TTL exhausted: do not forward, just continue routing
                    %% the stanza exactly as it was received.
                    ?LOG_INFO(#{what => gd_route_ttl_zero,
                                text => <<"Skip global distribution">>,
                                gd_id => GDID, acc => GDAcc, target_host => Remote}),
                    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
                    FPacket;
                {ok, CurrentTTL} ->
                    ?LOG_DEBUG(#{what => gd_reroute, ttl => CurrentTTL,
                                 text => <<"Forward stanza to remote cluster "
                                           "using global distribution">>,
                                 gd_id => GDID, acc => GDAcc, target_host => Remote}),
                    Decremented = put_metadata(GDAcc, ttl, CurrentTTL - 1),
                    Outgoing = remove_metadata(Decremented, target_host_override),
                    %% KNOWN ISSUE: crashes loudly when no connection is available.
                    %% TODO: Discuss behaviour in such scenario
                    Sender = get_bound_connection_noisy(Remote, GDID, Outgoing),
                    mod_global_distrib_sender:send(Sender, {From, To, Outgoing, Packet}),
                    drop
            end;

        error ->
            ?LOG_DEBUG(#{what => gd_route_failed, gd_id => GDID, acc => GDAcc,
                         text => <<"Unable to route global: user not found in the routing table">>}),
            mongoose_hooks:mod_global_distrib_unknown_recipient(Global, {From, To, GDAcc, Packet})
    end.
255 |
|
|
256 |
|
%%-------------------------------------------------------------------- |
257 |
|
%% Helpers |
258 |
|
%%-------------------------------------------------------------------- |
259 |
|
|
260 |
|
%% Ensures the accumulator carries global-distribution metadata.
%% When no `origin` entry is present, `ttl` (from the message_ttl option),
%% a freshly generated v4 UUID as `id`, and `origin` (from the local_host
%% option) are stored, in that order. Otherwise the accumulator is returned
%% untouched.
-spec maybe_initialize_metadata({jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) -> mongoose_acc:t().
maybe_initialize_metadata({_From, _To, Acc, _Packet}) ->
    case find_metadata(Acc, origin) of
        {error, undefined} ->
            WithTTL = put_metadata(Acc, ttl, opt(message_ttl)),
            GDID = uuid:uuid_to_string(uuid:get_v4(), binary_standard),
            WithID = put_metadata(WithTTL, id, GDID),
            %% The log entry deliberately carries the accumulator as received.
            ?LOG_DEBUG(#{what => gd_init_metadata, gd_id => GDID, acc => Acc}),
            put_metadata(WithID, origin, opt(local_host));
        _AlreadyInitialized ->
            Acc
    end.
272 |
|
|
273 |
|
%% Same as get_bound_connection/2, but any exception is logged at error
%% level (with class, reason and stacktrace) before being re-raised
%% unchanged.
get_bound_connection_noisy(TargetHost, GDID, Acc) ->
    try
        get_bound_connection(TargetHost, GDID)
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR(#{what => gd_get_process_for_failed,
                         gd_id => GDID, acc => Acc, target_host => TargetHost,
                         class => Class, reason => Reason, stacktrace => Stacktrace}),
            erlang:raise(Class, Reason, Stacktrace)
    end.
281 |
|
|
282 |
|
%% Returns the sender process bound to Server for the calling process.
%% The binding is kept in the process dictionary under
%% {connection, Server}; get_bound_connection/3 validates or creates it.
-spec get_bound_connection(Server :: jid:lserver(), binary()) -> pid().
get_bound_connection(Server, GDID) when is_binary(GDID) ->
    Cached = get({connection, Server}),
    get_bound_connection(Server, GDID, Cached).
285 |
|
|
286 |
|
%% Resolves the connection process for Server given the cached value.
%% * `undefined`: asks mod_global_distrib_sender for a process, stores it
%%   in the process dictionary under {connection, Server} and returns it.
%% * a live pid: returned as is.
%% * a dead pid: handled as if nothing had been cached.
-spec get_bound_connection(Server :: jid:lserver(), term(), pid() | undefined) -> pid().
get_bound_connection(Server, GDID, undefined) ->
    NewPid = mod_global_distrib_sender:get_process_for(Server),
    put({connection, Server}, NewPid),
    ?LOG_DEBUG(#{what => gd_new_bound_connection,
                 server => Server, gd_id => GDID, gd_pid => NewPid}),
    NewPid;
get_bound_connection(Server, GDID, CachedPid) when is_pid(CachedPid) ->
    case is_process_alive(CachedPid) of
        true ->
            ?LOG_DEBUG(#{what => gd_reuse_bound_connection,
                         server => Server, gd_id => GDID, gd_pid => CachedPid}),
            CachedPid;
        false ->
            ?LOG_DEBUG(#{what => gd_dead_bound_connection,
                         server => Server, gd_id => GDID, gd_pid => CachedPid}),
            get_bound_connection(Server, GDID, undefined)
    end.
304 |
|
|
305 |
|
%%-------------------------------------------------------------------- |
306 |
|
%% gen_mod logic |
307 |
|
%%-------------------------------------------------------------------- |
308 |
|
|
309 |
|
%% Builds the list of hard dependencies of this module.
%% Each submodule receives its own subsection options followed by the full
%% option list. Connection options are merged with default_conn_opts/0 and
%% de-duplicated by key with lists:ukeysort/2. The bounce module is added
%% unless the `bounce` option is `false`.
-spec deps(Opts :: proplists:proplist()) -> gen_mod:deps_list().
deps(Opts) ->
    UserConnOpts = proplists:get_value(connections, Opts, []),
    ConnOpts = lists:ukeysort(1, UserConnOpts ++ default_conn_opts()),
    CacheOpts = proplists:get_value(cache, Opts, []),
    BounceOpts = proplists:get_value(bounce, Opts, []),

    BaseDeps = [{mod_global_distrib_mapping, CacheOpts ++ Opts, hard},
                {mod_global_distrib_disco, Opts, hard},
                {mod_global_distrib_receiver, ConnOpts ++ Opts, hard},
                {mod_global_distrib_sender, ConnOpts ++ Opts, hard},
                {mod_global_distrib_hosts_refresher, Opts, hard}],
    case BounceOpts of
        false ->
            BaseDeps;
        _ ->
            BounceDep = {mod_global_distrib_bounce, BounceOpts ++ Opts, hard},
            [BounceDep | BaseDeps]
    end.
325 |
|
|
326 |
|
%% Connection options merged into the user-supplied ones in deps/1:
%% a single {tls_opts, false} entry.
default_conn_opts() ->
    TLSDefault = {tls_opts, false},
    [TLSDefault].
328 |
|
|
329 |
|
%% Module-specific start logic: makes sure both metrics used by
%% maybe_reroute/1 exist and registers maybe_reroute as a global
%% filter_packet hook handler with priority 99.
-spec start() -> any().
start() ->
    Metrics = [{?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, histogram},
               {?GLOBAL_DISTRIB_STOP_TTL_ZERO, spiral}],
    lists:foreach(fun({Name, Type}) ->
                          mongoose_metrics:ensure_metric(global, Name, Type)
                  end, Metrics),
    ejabberd_hooks:add(filter_packet, global, ?MODULE, maybe_reroute, 99).
334 |
|
|
335 |
|
%% Module-specific stop logic: unregisters the filter_packet hook handler
%% that start/0 installed (same hook, handler and priority).
-spec stop() -> any().
stop() ->
    Hook = filter_packet,
    Priority = 99,
    ejabberd_hooks:delete(Hook, global, ?MODULE, maybe_reroute, Priority).
338 |
|
|
339 |
|
%%-------------------------------------------------------------------- |
340 |
|
|
341 |
|
%% Determines the host a stanza should be delivered to.
%% Any TargetHost other than `undefined` is returned as is, without a
%% lookup; `undefined` falls back to lookup_recipients_host/3.
-spec lookup_recipients_host(TargetHost :: binary() | undefined,
                             To :: jid:jid(),
                             LocalHost :: binary(),
                             GlobalHost :: binary()) ->
    {ok, binary()} | error.
lookup_recipients_host(TargetHost, _To, _LocalHost, _GlobalHost)
  when TargetHost =/= undefined ->
    {ok, TargetHost};
lookup_recipients_host(undefined, To, LocalHost, GlobalHost) ->
    lookup_recipients_host(To, LocalHost, GlobalHost).
350 |
|
|
351 |
|
%% Host lookup based on the recipient JID.
%% * Empty luser, addressed to the local or global host: the local host.
%% * Empty luser, any other domain: domain mapping lookup.
%% * Non-empty luser: the local host if addressed to it, a per-JID mapping
%%   lookup if addressed to the global host, otherwise a domain lookup.
-spec lookup_recipients_host(To :: jid:jid(),
                             LocalHost :: binary(),
                             GlobalHost :: binary()) -> {ok, binary()} | error.
lookup_recipients_host(#jid{luser = <<>>, lserver = LServer}, LocalHost, GlobalHost)
  when LServer == LocalHost; LServer == GlobalHost ->
    {ok, LocalHost};
lookup_recipients_host(#jid{luser = <<>>, lserver = Domain}, _LocalHost, _GlobalHost) ->
    mod_global_distrib_mapping:for_domain(Domain);
%% The clauses below are only reached for a non-empty luser.
lookup_recipients_host(#jid{lserver = LocalHost}, LocalHost, _GlobalHost) ->
    {ok, LocalHost};
lookup_recipients_host(#jid{lserver = GlobalHost} = To, _LocalHost, GlobalHost) ->
    mod_global_distrib_mapping:for_jid(To);
lookup_recipients_host(#jid{lserver = Domain}, _LocalHost, _GlobalHost) ->
    mod_global_distrib_mapping:for_domain(Domain).
365 |
|
|
366 |
|
%% Reads this module's option Key through mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Module = ?MODULE,
    mod_global_distrib_utils:opt(Module, Key).