./ct_report/coverage/mod_global_distrib.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 %% Intercepts filter_packet hook with maybe_reroute callback.
18 -module(mod_global_distrib).
19 -author('konrad.zemek@erlang-solutions.com').
20
21 -behaviour(gen_mod).
22 -behaviour(mongoose_module_metrics).
23
24 -include("global_distrib_metrics.hrl").
25 -include("jlib.hrl").
26 -include("mongoose.hrl").
27 -include("mongoose_config_spec.hrl").
28
29 -export([deps/2, start/2, stop/1, config_spec/0]).
30 -export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]).
31 -export([maybe_reroute/3]).
32 -export([process_opts/1, process_endpoint/1]).
33
34 -ignore_xref([remove_metadata/2]).
35
36 %%--------------------------------------------------------------------
37 %% gen_mod API
38 %%--------------------------------------------------------------------
39
40 %% Note: while this module should be enabled for all hosts,
41 %% it needs to be started only once - this is why deps/2 and start/2
42 %% do nothing for hosts other than global_host
43
44 -spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
45 deps(HostType, Opts = #{global_host := HostType, bounce := BounceOpts}) ->
46 %% Start each required module with the same opts for simplicity
47 106 [{Mod, Opts, hard} || Mod <- dep_modules() ++ bounce_modules(BounceOpts)];
48 deps(_HostType, #{}) ->
49
:-(
[].
50
51 dep_modules() ->
52 106 [mod_global_distrib_utils, mod_global_distrib_mapping, mod_global_distrib_disco,
53 mod_global_distrib_receiver, mod_global_distrib_hosts_refresher].
54
55 106 bounce_modules(#{enabled := true}) -> [mod_global_distrib_bounce];
56
:-(
bounce_modules(#{enabled := false}) -> [].
57
58 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
59 start(HostType, #{global_host := HostType}) ->
60 21 mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, histogram),
61 21 mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, spiral),
62 21 gen_hook:add_handlers(hooks());
63 start(_HostType, #{}) ->
64
:-(
ok.
65
66 -spec stop(mongooseim:host_type()) -> any().
67 stop(HostType) ->
68 21 case gen_mod:get_module_opt(HostType, ?MODULE, global_host) of
69 21 HostType -> gen_hook:delete_handlers(hooks());
70
:-(
_ -> ok
71 end.
72
73 hooks() ->
74 42 [{filter_packet, global, fun ?MODULE:maybe_reroute/3, #{}, 99}].
75
76 -spec config_spec() -> mongoose_config_spec:config_section().
77 config_spec() ->
78 160 #section{
79 items = #{<<"global_host">> => #option{type = binary,
80 validate = domain},
81 <<"local_host">> => #option{type = binary,
82 validate = domain},
83 <<"message_ttl">> => #option{type = integer,
84 validate = non_negative},
85 <<"hosts_refresh_interval">> => #option{type = integer,
86 validate = non_negative},
87 <<"connections">> => connections_spec(),
88 <<"redis">> => redis_spec(),
89 <<"cache">> => cache_spec(),
90 <<"bounce">> => bounce_spec()
91 },
92 required = [<<"global_host">>, <<"local_host">>],
93 defaults = #{<<"message_ttl">> => 4,
94 <<"hosts_refresh_interval">> => 3000},
95 process = fun ?MODULE:process_opts/1
96 }.
97
98 connections_spec() ->
99 160 #section{
100 items = #{<<"endpoints">> => #list{items = endpoint_spec()},
101 <<"advertised_endpoints">> => #list{items = endpoint_spec()},
102 <<"connections_per_endpoint">> => #option{type = integer,
103 validate = non_negative},
104 <<"endpoint_refresh_interval">> => #option{type = integer,
105 validate = positive},
106 <<"endpoint_refresh_interval_when_empty">> => #option{type = integer,
107 validate = positive},
108 <<"disabled_gc_interval">> => #option{type = integer,
109 validate = positive},
110 <<"tls">> => tls_spec()
111 },
112 defaults = #{<<"connections_per_endpoint">> => 1,
113 <<"endpoint_refresh_interval">> => 60,
114 <<"endpoint_refresh_interval_when_empty">> => 3,
115 <<"disabled_gc_interval">> => 60},
116 include = always
117 }.
118
119 endpoint_spec() ->
120 320 #section{
121 items = #{<<"host">> => #option{type = string,
122 validate = network_address},
123 <<"port">> => #option{type = integer,
124 validate = port}
125 },
126 required = all,
127 process = fun ?MODULE:process_endpoint/1
128 }.
129
130 tls_spec() ->
131 160 TLSSection = mongoose_config_spec:tls([client, server], [fast_tls]),
132 160 TLSSection#section{process = fun mongoose_config_spec:process_fast_tls/1}.
133
134 redis_spec() ->
135 160 #section{
136 items = #{<<"pool">> => #option{type = atom,
137 validate = pool_name},
138 <<"expire_after">> => #option{type = integer,
139 validate = positive},
140 <<"refresh_after">> => #option{type = integer,
141 validate = non_negative}
142 },
143 defaults = #{<<"pool">> => global_distrib,
144 <<"expire_after">> => 120,
145 <<"refresh_after">> => 60},
146 include = always
147 }.
148
149 cache_spec() ->
150 160 #section{
151 items = #{<<"cache_missed">> => #option{type = boolean},
152 <<"domain_lifetime_seconds">> => #option{type = integer,
153 validate = non_negative},
154 <<"jid_lifetime_seconds">> => #option{type = integer,
155 validate = non_negative},
156 <<"max_jids">> => #option{type = integer,
157 validate = non_negative}
158 },
159 defaults = #{<<"cache_missed">> => true,
160 <<"domain_lifetime_seconds">> => 600,
161 <<"jid_lifetime_seconds">> => 5,
162 <<"max_jids">> => 10000
163 },
164 include = always
165 }.
166
167 bounce_spec() ->
168 160 #section{
169 items = #{<<"enabled">> => #option{type = boolean},
170 <<"resend_after_ms">> => #option{type = integer,
171 validate = non_negative},
172 <<"max_retries">> => #option{type = integer,
173 validate = non_negative}
174 },
175 defaults = #{<<"enabled">> => true,
176 <<"resend_after_ms">> => 200,
177 <<"max_retries">> => 4},
178 include = always
179 }.
180
181 -spec process_opts(gen_mod:module_opts()) -> gen_mod:module_opts().
182 process_opts(Opts = #{local_host := LocalHost, connections := Connections}) ->
183
:-(
Opts#{connections := process_connections(LocalHost, Connections)}.
184
185 process_connections(_LocalHost, Opts = #{advertised_endpoints := _,
186 endpoints := Endpoints}) ->
187
:-(
Opts#{resolved_endpoints => mod_global_distrib_utils:resolve_endpoints(Endpoints)};
188 process_connections(LocalHost, Opts = #{endpoints := Endpoints}) ->
189
:-(
process_connections(LocalHost, Opts#{advertised_endpoints => Endpoints});
190 process_connections(LocalHost, Opts) ->
191
:-(
process_connections(LocalHost, Opts#{endpoints => [{binary_to_list(LocalHost), 5555}]}).
192
193 -spec process_endpoint(map()) -> mod_global_distrib_utils:endpoint().
194 process_endpoint(#{host := Host, port := Port}) ->
195
:-(
{Host, Port}.
196
197 %%--------------------------------------------------------------------
198 %% public functions
199 %%--------------------------------------------------------------------
200
201 -spec get_metadata(mongoose_acc:t(), Key :: term(), Default :: term()) -> Value :: term().
202 get_metadata(Acc, Key, Default) ->
203 905 mongoose_acc:get(global_distrib, Key, Default, Acc).
204
205 -spec find_metadata(mongoose_acc:t(), Key :: term()) ->
206 {ok, Value :: term()} | {error, undefined}.
207 find_metadata(Acc, Key) ->
208 2489 try mongoose_acc:get(global_distrib, Key, Acc) of
209 2081 Value -> {ok, Value}
210 catch
211 _:_ ->
212 408 {error, undefined}
213 end.
214
215 -spec put_metadata(mongoose_acc:t(), Key :: term(), Value :: term()) -> mongoose_acc:t().
216 put_metadata(Acc, Key, Value) ->
217 1671 mongoose_acc:set_permanent(global_distrib, Key, Value, Acc).
218
219 -spec remove_metadata(mongoose_acc:t(), Key :: term()) -> mongoose_acc:t().
220 remove_metadata(Acc, Key) ->
221 343 mongoose_acc:delete(global_distrib, Key, Acc).
222
223 %%--------------------------------------------------------------------
224 %% Hooks implementation
225 %%--------------------------------------------------------------------
226
227 -spec maybe_reroute(FPacket, Params, Extra) -> {ok, FPacket} | {stop, drop} when
228 FPacket :: mongoose_hooks:filter_packet_acc(),
229 Params :: map(),
230 Extra :: map().
231 maybe_reroute({#jid{ luser = SameUser, lserver = SameServer } = _From,
232 #jid{ luser = SameUser, lserver = SameServer } = _To,
233 _Acc, _Packet} = FPacket, _, _) ->
234 %% GD is not designed to support two user sessions existing in distinct clusters
235 %% and here we explicitly block routing stanzas between them.
236 %% Without this clause, test_pm_with_ungraceful_reconnection_to_different_server test
237 %% was randomly failing because sometimes 'unavailable' presence from a dead session
238 %% was poisoning reg1 cache. In such case, reg1 tried to route locally stanzas
239 %% from unacked SM buffer, leading to an error, while a brand new, shiny Eve
240 %% on mim1 was waiting.
241 186 {ok, FPacket};
242 maybe_reroute({From, To, _, Packet} = FPacket, _, _) ->
243 827 Acc = maybe_initialize_metadata(FPacket),
244 827 {ok, ID} = find_metadata(Acc, id),
245 827 LocalHost = opt(local_host),
246 827 GlobalHost = opt(global_host),
247 %% If target_host_override is set (typically when routed out of bounce storage),
248 %% host lookup is skipped and messages are routed to target_host_override value.
249 827 TargetHostOverride = get_metadata(Acc, target_host_override, undefined),
250 827 ResultFPacket = case lookup_recipients_host(TargetHostOverride, To, LocalHost, GlobalHost) of
251 {ok, LocalHost} ->
252 %% Continue routing with initialized metadata
253 406 mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
254 From, To, LocalHost),
255 406 ?LOG_DEBUG(#{what => gd_route_local,
256 text => <<"Routing global message to local datacenter">>,
257
:-(
gd_id => ID, local_host => LocalHost, acc => Acc}),
258 406 {ok, TTL} = find_metadata(Acc, ttl),
259 406 mongoose_metrics:update(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, TTL),
260 406 {From, To, Acc, Packet};
261
262 {ok, TargetHost} ->
263 343 mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
264 From, To, TargetHost),
265 343 case find_metadata(Acc, ttl) of
266 {ok, 0} ->
267 %% Just continue routing
268
:-(
?LOG_INFO(#{what => gd_route_ttl_zero,
269 text => <<"Skip global distribution">>,
270
:-(
gd_id => ID, acc => Acc, target_host => TargetHost}),
271
:-(
mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
272
:-(
FPacket;
273 {ok, TTL} ->
274 343 ?LOG_DEBUG(#{what => gd_reroute, ttl => TTL,
275 text => <<"Forward stanza to remote cluster "
276 "using global distribution">>,
277
:-(
gd_id => ID, acc => Acc, target_host => TargetHost}),
278 343 Acc1 = put_metadata(Acc, ttl, TTL - 1),
279 343 Acc2 = remove_metadata(Acc1, target_host_override),
280 %% KNOWN ISSUE: will crash loudly if there are no connections available
281 %% TODO: Discuss behaviour in such scenario
282 343 Worker = get_bound_connection_noisy(TargetHost, ID, Acc2),
283 342 mod_global_distrib_sender:send(Worker, {From, To, Acc2, Packet}),
284 342 drop
285 end;
286
287 error ->
288 78 ?LOG_DEBUG(#{what => gd_route_failed, gd_id => ID, acc => Acc,
289
:-(
text => <<"Unable to route global: user not found in the routing table">>}),
290 78 mongoose_hooks:mod_global_distrib_unknown_recipient(GlobalHost, {From, To, Acc, Packet})
291 end,
292 826 case ResultFPacket of
293 drop ->
294 412 {stop, drop};
295 ResultFPacket ->
296 414 {ok, ResultFPacket}
297 end.
298
299 %%--------------------------------------------------------------------
300 %% Helpers
301 %%--------------------------------------------------------------------
302
303 -spec maybe_initialize_metadata({jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) -> mongoose_acc:t().
304 maybe_initialize_metadata({_From, _To, Acc, _Packet}) ->
305 827 case find_metadata(Acc, origin) of
306 {error, undefined} ->
307 408 Acc1 = put_metadata(Acc, ttl, opt(message_ttl)),
308 408 ID = uuid:uuid_to_string(uuid:get_v4(), binary_standard),
309 408 Acc2 = put_metadata(Acc1, id, ID),
310 408 ?LOG_DEBUG(#{what => gd_init_metadata, gd_id => ID, acc => Acc}),
311 408 put_metadata(Acc2, origin, opt(local_host));
312 _ ->
313 419 Acc
314 end.
315
316 get_bound_connection_noisy(TargetHost, GDID, Acc) ->
317 343 try get_bound_connection(TargetHost, GDID)
318 catch Class:Reason:Stacktrace ->
319 1 ?LOG_ERROR(#{what => gd_get_process_for_failed,
320 gd_id => GDID, acc => Acc, target_host => TargetHost,
321
:-(
class => Class, reason => Reason, stacktrace => Stacktrace}),
322 1 erlang:raise(Class, Reason, Stacktrace)
323 end.
324
325 -spec get_bound_connection(Server :: jid:lserver(), binary()) -> pid().
326 get_bound_connection(Server, GDID) when is_binary(GDID) ->
327 343 get_bound_connection(Server, GDID, get({connection, Server})).
328
329 -spec get_bound_connection(Server :: jid:lserver(), term(), pid() | undefined) -> pid().
330 get_bound_connection(Server, GDID, undefined) ->
331 28 Pid = mod_global_distrib_sender:get_process_for(Server),
332 27 put({connection, Server}, Pid),
333 27 ?LOG_DEBUG(#{what => gd_new_bound_connection,
334
:-(
server => Server, gd_id => GDID, gd_pid => Pid}),
335 27 Pid;
336 get_bound_connection(Server, GDID, Pid) when is_pid(Pid) ->
337 316 case is_process_alive(Pid) of
338 false ->
339 1 ?LOG_DEBUG(#{what => gd_dead_bound_connection,
340
:-(
server => Server, gd_id => GDID, gd_pid => Pid}),
341 1 get_bound_connection(Server, GDID, undefined);
342 true ->
343 315 ?LOG_DEBUG(#{what => gd_reuse_bound_connection,
344
:-(
server => Server, gd_id => GDID, gd_pid => Pid}),
345 315 Pid
346 end.
347
348 -spec lookup_recipients_host(TargetHost :: binary() | undefined,
349 To :: jid:jid(),
350 LocalHost :: binary(),
351 GlobalHost :: binary()) ->
352 {ok, binary()} | error.
353 lookup_recipients_host(undefined, To, LocalHost, GlobalHost) ->
354 793 lookup_recipients_host(To, LocalHost, GlobalHost);
355 lookup_recipients_host(TargetHost, _To, _LocalHost, _GlobalHost) ->
356 34 {ok, TargetHost}.
357
358 -spec lookup_recipients_host(To :: jid:jid(),
359 LocalHost :: binary(),
360 GlobalHost :: binary()) -> {ok, binary()} | error.
361 lookup_recipients_host(#jid{luser = <<>>, lserver = LServer}, LocalHost, GlobalHost)
362 when LServer == LocalHost; LServer == GlobalHost ->
363 3 {ok, LocalHost};
364 lookup_recipients_host(#jid{luser = <<>>, lserver = HostAddressedTo}, _LocalHost, _GlobalHost) ->
365 20 mod_global_distrib_mapping:for_domain(HostAddressedTo);
366 lookup_recipients_host(#jid{lserver = HostAddressedTo} = To, LocalHost, GlobalHost) ->
367 770 case HostAddressedTo of
368
:-(
LocalHost -> {ok, LocalHost};
369 752 GlobalHost -> mod_global_distrib_mapping:for_jid(To);
370 18 _ -> mod_global_distrib_mapping:for_domain(HostAddressedTo)
371 end.
372
373 -spec opt(Key :: atom()) -> term().
374 opt(Key) ->
375 2470 mod_global_distrib_utils:opt(?MODULE, Key).
Line Hits Source