
1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
%% Intercepts the filter_packet hook with the maybe_reroute callback.
18 -module(mod_global_distrib).
19 -author('konrad.zemek@erlang-solutions.com').
21 -behaviour(gen_mod).
22 -behaviour(mongoose_module_metrics).
24 -include("global_distrib_metrics.hrl").
25 -include("jlib.hrl").
26 -include("mongoose.hrl").
27 -include("mongoose_config_spec.hrl").
29 -export([deps/2, start/2, stop/1, config_spec/0]).
30 -export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]).
31 -export([maybe_reroute/3]).
32 -export([process_opts/1, process_endpoint/1]).
34 -ignore_xref([remove_metadata/2]).
%%--------------------------------------------------------------------
%% gen_mod API
%%--------------------------------------------------------------------

%% Note: while this module should be enabled for all hosts,
%% it needs to be started only once - this is why deps/2 and start/2
%% do nothing for hosts other than global_host
%% gen_mod callback listing the modules that have to be started with this one.
%% For the host type equal to global_host every helper module is returned as
%% a hard dependency; for any other host type no dependencies are reported.
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(HostType, Opts = #{global_host := HostType, bounce := BounceOpts}) ->
    %% Start each required module with the same opts for simplicity
    [{Mod, Opts, hard} || Mod <- dep_modules() ++ bounce_modules(BounceOpts)];
deps(_HostType, #{}) ->
    %% Not the global host: nothing has to be started for it
    [].
%% Helper modules that deps/2 always requires, regardless of the bounce settings.
dep_modules() ->
    [
     mod_global_distrib_utils,
     mod_global_distrib_mapping,
     mod_global_distrib_disco,
     mod_global_distrib_receiver,
     mod_global_distrib_hosts_refresher
    ].
%% Returns the bounce module as an extra dependency only when bouncing is enabled.
bounce_modules(#{enabled := Enabled}) when is_boolean(Enabled) ->
    case Enabled of
        true -> [mod_global_distrib_bounce];
        false -> []
    end.
%% gen_mod callback. For the host type equal to global_host it makes sure the
%% global distribution metrics exist and registers the hook handlers; for any
%% other host type it does nothing.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, #{global_host := HostType}) ->
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, histogram),
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, spiral),
    gen_hook:add_handlers(hooks());
start(_HostType, #{}) ->
    %% Not the global host: the module is started only once, for global_host
    ok.
%% gen_mod callback. Removes the hook handlers when HostType is the configured
%% global_host; otherwise returns ok without touching anything.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
    GlobalHost = gen_mod:get_module_opt(HostType, ?MODULE, global_host),
    case GlobalHost =:= HostType of
        true -> gen_hook:delete_handlers(hooks());
        false -> ok
    end.
%% Hook handlers added in start/2 and deleted in stop/1:
%% a single global filter_packet handler with priority 99.
hooks() ->
    Handler = fun ?MODULE:maybe_reroute/3,
    [{filter_packet, global, Handler, #{}, 99}].
%% Configuration section of this module: top-level options, nested sections,
%% required keys and defaults. The parsed options go through process_opts/1.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    DomainOpt = #option{type = binary, validate = domain},
    NonNegIntOpt = #option{type = integer, validate = non_negative},
    Items = #{<<"global_host">> => DomainOpt,
              <<"local_host">> => DomainOpt,
              <<"message_ttl">> => NonNegIntOpt,
              <<"hosts_refresh_interval">> => NonNegIntOpt,
              <<"connections">> => connections_spec(),
              <<"redis">> => redis_spec(),
              <<"cache">> => cache_spec(),
              <<"bounce">> => bounce_spec()},
    Defaults = #{<<"message_ttl">> => 4,
                 <<"hosts_refresh_interval">> => 3000},
    #section{items = Items,
             required = [<<"global_host">>, <<"local_host">>],
             defaults = Defaults,
             process = fun ?MODULE:process_opts/1}.
%% Spec of the "connections" subsection; it is always included, so its defaults
%% are present even when the subsection is not configured.
connections_spec() ->
    EndpointList = #list{items = endpoint_spec()},
    PositiveIntOpt = #option{type = integer, validate = positive},
    Items = #{<<"endpoints">> => EndpointList,
              <<"advertised_endpoints">> => EndpointList,
              <<"connections_per_endpoint">> => #option{type = integer,
                                                        validate = non_negative},
              <<"endpoint_refresh_interval">> => PositiveIntOpt,
              <<"endpoint_refresh_interval_when_empty">> => PositiveIntOpt,
              <<"disabled_gc_interval">> => PositiveIntOpt,
              <<"tls">> => tls_spec()},
    Defaults = #{<<"connections_per_endpoint">> => 1,
                 <<"endpoint_refresh_interval">> => 60,
                 <<"endpoint_refresh_interval_when_empty">> => 3,
                 <<"disabled_gc_interval">> => 60},
    #section{items = Items,
             defaults = Defaults,
             include = always}.
%% Spec of a single endpoint entry: both host and port are required,
%% and the parsed section is converted by process_endpoint/1.
endpoint_spec() ->
    HostOpt = #option{type = string, validate = network_address},
    PortOpt = #option{type = integer, validate = port},
    #section{items = #{<<"host">> => HostOpt,
                       <<"port">> => PortOpt},
             required = all,
             process = fun ?MODULE:process_endpoint/1}.
%% Spec of the "tls" subsection, delegated to mongoose_config_spec:tls/2.
tls_spec() ->
    Modes = [client, server],
    Implementations = [fast_tls],
    mongoose_config_spec:tls(Modes, Implementations).
%% Spec of the "redis" subsection (pool name and expire/refresh settings);
%% always included so that the defaults apply.
redis_spec() ->
    Items = #{<<"pool">> => #option{type = atom, validate = pool_name},
              <<"expire_after">> => #option{type = integer, validate = positive},
              <<"refresh_after">> => #option{type = integer, validate = non_negative}},
    Defaults = #{<<"pool">> => global_distrib,
                 <<"expire_after">> => 120,
                 <<"refresh_after">> => 60},
    #section{items = Items,
             defaults = Defaults,
             include = always}.
%% Spec of the "cache" subsection; always included so that the defaults apply.
cache_spec() ->
    NonNegIntOpt = #option{type = integer, validate = non_negative},
    Items = #{<<"cache_missed">> => #option{type = boolean},
              <<"domain_lifetime_seconds">> => NonNegIntOpt,
              <<"jid_lifetime_seconds">> => NonNegIntOpt,
              <<"max_jids">> => NonNegIntOpt},
    Defaults = #{<<"cache_missed">> => true,
                 <<"domain_lifetime_seconds">> => 600,
                 <<"jid_lifetime_seconds">> => 5,
                 <<"max_jids">> => 10000},
    #section{items = Items,
             defaults = Defaults,
             include = always}.
%% Spec of the "bounce" subsection; always included so that the defaults apply.
%% The resulting `enabled` flag is what deps/2 passes to bounce_modules/1.
bounce_spec() ->
    NonNegIntOpt = #option{type = integer, validate = non_negative},
    Items = #{<<"enabled">> => #option{type = boolean},
              <<"resend_after_ms">> => NonNegIntOpt,
              <<"max_retries">> => NonNegIntOpt},
    Defaults = #{<<"enabled">> => true,
                 <<"resend_after_ms">> => 200,
                 <<"max_retries">> => 4},
    #section{items = Items,
             defaults = Defaults,
             include = always}.
%% Post-processes the parsed module options: the `connections` value is
%% replaced with the result of process_connections/2.
-spec process_opts(gen_mod:module_opts()) -> gen_mod:module_opts().
process_opts(#{local_host := Host, connections := ConnOpts} = Opts) ->
    ProcessedConnOpts = process_connections(Host, ConnOpts),
    Opts#{connections := ProcessedConnOpts}.
%% Fills in the missing endpoint options and resolves the endpoints:
%%  - without `endpoints`, a single endpoint {LocalHost as a string, 5555} is used,
%%  - without `advertised_endpoints`, the value of `endpoints` is used,
%%  - finally `resolved_endpoints` is added, computed from `endpoints`.
process_connections(_LocalHost, #{advertised_endpoints := _,
                                  endpoints := Endpoints} = ConnOpts) ->
    Resolved = mod_global_distrib_utils:resolve_endpoints(Endpoints),
    ConnOpts#{resolved_endpoints => Resolved};
process_connections(LocalHost, #{endpoints := Endpoints} = ConnOpts) ->
    WithAdvertised = ConnOpts#{advertised_endpoints => Endpoints},
    process_connections(LocalHost, WithAdvertised);
process_connections(LocalHost, ConnOpts) ->
    DefaultEndpoints = [{binary_to_list(LocalHost), 5555}],
    process_connections(LocalHost, ConnOpts#{endpoints => DefaultEndpoints}).
%% Converts a parsed endpoint section into a {Host, Port} tuple.
-spec process_endpoint(map()) -> mod_global_distrib_utils:endpoint().
process_endpoint(#{port := Port, host := Host}) ->
    Endpoint = {Host, Port},
    Endpoint.
%%--------------------------------------------------------------------
%% public functions
%%--------------------------------------------------------------------
%% Reads Key from the global_distrib namespace of the accumulator,
%% passing Default on to mongoose_acc:get/4.
-spec get_metadata(mongoose_acc:t(), Key :: term(), Default :: term()) -> Value :: term().
get_metadata(Acc, Key, Default) ->
    Namespace = global_distrib,
    mongoose_acc:get(Namespace, Key, Default, Acc).
%% Looks up Key in the global_distrib namespace of the accumulator.
%% Any exception raised by mongoose_acc:get/3 is reported as {error, undefined}.
-spec find_metadata(mongoose_acc:t(), Key :: term()) ->
    {ok, Value :: term()} | {error, undefined}.
find_metadata(Acc, Key) ->
    try
        {ok, mongoose_acc:get(global_distrib, Key, Acc)}
    catch
        _:_ ->
            {error, undefined}
    end.
%% Stores Value under Key in the global_distrib namespace of the accumulator,
%% using mongoose_acc:set_permanent/4.
-spec put_metadata(mongoose_acc:t(), Key :: term(), Value :: term()) -> mongoose_acc:t().
put_metadata(Acc, Key, Value) ->
    Namespace = global_distrib,
    mongoose_acc:set_permanent(Namespace, Key, Value, Acc).
%% Deletes Key from the global_distrib namespace of the accumulator.
-spec remove_metadata(mongoose_acc:t(), Key :: term()) -> mongoose_acc:t().
remove_metadata(Acc, Key) ->
    Namespace = global_distrib,
    mongoose_acc:delete(Namespace, Key, Acc).
%%--------------------------------------------------------------------
%% Hooks implementation
%%--------------------------------------------------------------------
%% filter_packet hook handler. Decides whether a stanza continues to be routed
%% locally or is handed over to another cluster.
%%
%% Returns {ok, FPacket} when routing should continue and {stop, drop} when the
%% stanza was sent to a remote cluster or when the unknown-recipient hook
%% returned `drop`.
-spec maybe_reroute(FPacket, Params, Extra) -> {ok, FPacket} | {stop, drop} when
      FPacket :: mongoose_hooks:filter_packet_acc(),
      Params :: map(),
      Extra :: map().
maybe_reroute({#jid{ luser = SameUser, lserver = SameServer } = _From,
               #jid{ luser = SameUser, lserver = SameServer } = _To,
               _Acc, _Packet} = FPacket, _, _) ->
    %% GD is not designed to support two user sessions existing in distinct clusters
    %% and here we explicitly block routing stanzas between them.
    %% Without this clause, test_pm_with_ungraceful_reconnection_to_different_server test
    %% was randomly failing because sometimes 'unavailable' presence from a dead session
    %% was poisoning reg1 cache. In such case, reg1 tried to route locally stanzas
    %% from unacked SM buffer, leading to an error, while a brand new, shiny Eve
    %% on mim1 was waiting.
    {ok, FPacket};
maybe_reroute({From, To, _, Packet} = FPacket, _, _) ->
    Acc = maybe_initialize_metadata(FPacket),
    {ok, ID} = find_metadata(Acc, id),
    LocalHost = opt(local_host),
    GlobalHost = opt(global_host),
    %% If target_host_override is set (typically when routed out of bounce storage),
    %% host lookup is skipped and messages are routed to target_host_override value.
    TargetHostOverride = get_metadata(Acc, target_host_override, undefined),
    ResultFPacket =
        case lookup_recipients_host(TargetHostOverride, To, LocalHost, GlobalHost) of
            {ok, LocalHost} ->
                %% Continue routing with initialized metadata
                mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
                                                                  From, To, LocalHost),
                ?LOG_DEBUG(#{what => gd_route_local,
                             text => <<"Routing global message to local datacenter">>,
                             gd_id => ID, local_host => LocalHost, acc => Acc}),
                {ok, TTL} = find_metadata(Acc, ttl),
                mongoose_metrics:update(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, TTL),
                {From, To, Acc, Packet};

            {ok, TargetHost} ->
                mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
                                                                  From, To, TargetHost),
                case find_metadata(Acc, ttl) of
                    {ok, 0} ->
                        %% Just continue routing
                        ?LOG_INFO(#{what => gd_route_ttl_zero,
                                    text => <<"Skip global distribution">>,
                                    gd_id => ID, acc => Acc, target_host => TargetHost}),
                        mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
                        %% NOTE(review): this result expression was missing; the
                        %% unchanged packet is assumed here - confirm against VCS.
                        FPacket;
                    {ok, TTL} ->
                        ?LOG_DEBUG(#{what => gd_reroute, ttl => TTL,
                                     text => <<"Forward stanza to remote cluster "
                                               "using global distribution">>,
                                     gd_id => ID, acc => Acc, target_host => TargetHost}),
                        Acc1 = put_metadata(Acc, ttl, TTL - 1),
                        Acc2 = remove_metadata(Acc1, target_host_override),
                        %% KNOWN ISSUE: will crash loudly if there are no connections available
                        %% TODO: Discuss behaviour in such scenario
                        Worker = get_bound_connection_noisy(TargetHost, ID, Acc2),
                        mod_global_distrib_sender:send(Worker, {From, To, Acc2, Packet}),
                        drop
                end;

            error ->
                ?LOG_DEBUG(#{what => gd_route_failed, gd_id => ID, acc => Acc,
                             text => <<"Unable to route global: user not found in the routing table">>}),
                mongoose_hooks:mod_global_distrib_unknown_recipient(GlobalHost,
                                                                    {From, To, Acc, Packet})
        end,
    %% Translate the branch result into the hook handler return value
    case ResultFPacket of
        drop ->
            {stop, drop};
        _ ->
            {ok, ResultFPacket}
    end.
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
%% Returns the accumulator of the packet with global distribution metadata set.
%% When no `origin` is stored yet, it adds `ttl` (the message_ttl option),
%% `id` (a fresh v4 UUID) and `origin` (the local_host option);
%% otherwise the accumulator is returned unchanged.
-spec maybe_initialize_metadata({jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) -> mongoose_acc:t().
maybe_initialize_metadata({_From, _To, Acc, _Packet}) ->
    case find_metadata(Acc, origin) of
        {error, undefined} ->
            WithTTL = put_metadata(Acc, ttl, opt(message_ttl)),
            GDID = uuid:uuid_to_string(uuid:get_v4(), binary_standard),
            WithID = put_metadata(WithTTL, id, GDID),
            %% The accumulator is logged as it was before initialization
            ?LOG_DEBUG(#{what => gd_init_metadata, gd_id => GDID, acc => Acc}),
            put_metadata(WithID, origin, opt(local_host));
        _ ->
            Acc
    end.
%% Same as get_bound_connection/2, but any exception is logged as an error
%% together with the routing context and then re-raised unchanged.
get_bound_connection_noisy(TargetHost, GDID, Acc) ->
    try
        get_bound_connection(TargetHost, GDID)
    catch
        Kind:Error:Trace ->
            ?LOG_ERROR(#{what => gd_get_process_for_failed,
                         gd_id => GDID, acc => Acc, target_host => TargetHost,
                         class => Kind, reason => Error, stacktrace => Trace}),
            erlang:raise(Kind, Error, Trace)
    end.
%% Returns the sender process bound to Server for the calling process.
%% The binding is kept in the process dictionary under {connection, Server}.
-spec get_bound_connection(Server :: jid:lserver(), binary()) -> pid().
get_bound_connection(Server, GDID) when is_binary(GDID) ->
    Bound = get({connection, Server}),
    get_bound_connection(Server, GDID, Bound).
%% Resolves the bound connection given the value found in the process dictionary:
%%  - undefined: a process is obtained from mod_global_distrib_sender and remembered,
%%  - a live pid: it is reused,
%%  - a dead pid: it is replaced as in the undefined case.
-spec get_bound_connection(Server :: jid:lserver(), term(), pid() | undefined) -> pid().
get_bound_connection(Server, GDID, undefined) ->
    NewPid = mod_global_distrib_sender:get_process_for(Server),
    put({connection, Server}, NewPid),
    ?LOG_DEBUG(#{what => gd_new_bound_connection,
                 server => Server, gd_id => GDID, gd_pid => NewPid}),
    NewPid;
get_bound_connection(Server, GDID, Pid) when is_pid(Pid) ->
    case is_process_alive(Pid) of
        true ->
            ?LOG_DEBUG(#{what => gd_reuse_bound_connection,
                         server => Server, gd_id => GDID, gd_pid => Pid}),
            Pid;
        false ->
            ?LOG_DEBUG(#{what => gd_dead_bound_connection,
                         server => Server, gd_id => GDID, gd_pid => Pid}),
            get_bound_connection(Server, GDID, undefined)
    end.
%% Determines the host that should receive the stanza. A target host other than
%% `undefined` is returned as is; otherwise the host is looked up
%% with lookup_recipients_host/3.
-spec lookup_recipients_host(TargetHost :: binary() | undefined,
                             To :: jid:jid(),
                             LocalHost :: binary(),
                             GlobalHost :: binary()) ->
    {ok, binary()} | error.
lookup_recipients_host(TargetHost, To, LocalHost, GlobalHost) ->
    case TargetHost of
        undefined ->
            lookup_recipients_host(To, LocalHost, GlobalHost);
        _ ->
            {ok, TargetHost}
    end.
%% Looks up the host responsible for the recipient JID:
%%  - a JID with an empty user part addressed to the local or global host
%%    resolves to the local host,
%%  - any JID addressed to the local host resolves to the local host,
%%  - a user JID addressed to the global host is resolved per JID,
%%  - every other domain is resolved per domain.
-spec lookup_recipients_host(To :: jid:jid(),
                             LocalHost :: binary(),
                             GlobalHost :: binary()) -> {ok, binary()} | error.
lookup_recipients_host(#jid{luser = <<>>, lserver = LServer}, LocalHost, GlobalHost)
  when LServer == LocalHost; LServer == GlobalHost ->
    {ok, LocalHost};
lookup_recipients_host(#jid{luser = <<>>, lserver = Domain}, _LocalHost, _GlobalHost) ->
    mod_global_distrib_mapping:for_domain(Domain);
lookup_recipients_host(#jid{lserver = LocalHost}, LocalHost, _GlobalHost) ->
    {ok, LocalHost};
lookup_recipients_host(#jid{lserver = GlobalHost} = To, _LocalHost, GlobalHost) ->
    mod_global_distrib_mapping:for_jid(To);
lookup_recipients_host(#jid{lserver = Domain}, _LocalHost, _GlobalHost) ->
    mod_global_distrib_mapping:for_domain(Domain).
%% Reads an option of this module through mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Module = ?MODULE,
    mod_global_distrib_utils:opt(Module, Key).
Line Hits Source