./ct_report/coverage/mod_global_distrib.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 %% Intercepts filter_packet hook with maybe_reroute callback.
18 -module(mod_global_distrib).
19 -author('konrad.zemek@erlang-solutions.com').
20
21 -behaviour(gen_mod).
22 -behaviour(mongoose_module_metrics).
23
24 -include("global_distrib_metrics.hrl").
25 -include("jlib.hrl").
26 -include("mongoose.hrl").
27 -include("mongoose_config_spec.hrl").
28
29 -export([deps/2, start/2, stop/1, config_spec/0]).
30 -export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]).
31 -export([maybe_reroute/1]).
32 -export([process_opts/1, process_endpoint/1]).
33
34 -ignore_xref([maybe_reroute/1, remove_metadata/2]).
35
36 %%--------------------------------------------------------------------
37 %% gen_mod API
38 %%--------------------------------------------------------------------
39
40 %% Note: while this module should be enabled for all hosts,
41 %% it needs to be started only once - this is why deps/2 and start/2
42 %% do nothing for hosts other than global_host
43
%% gen_mod dependency callback.
%% Dependencies are reported only for the host type that equals the configured
%% `global_host' (and whose options contain a `bounce' section); every other
%% host type gets an empty dependency list, so the helper modules are started
%% only once.
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(HostType, #{global_host := HostType, bounce := BounceOpts} = Opts) ->
    %% Each required module is started with the very same opts, for simplicity.
    Required = dep_modules() ++ bounce_modules(BounceOpts),
    lists:map(fun(Mod) -> {Mod, Opts, hard} end, Required);
deps(_HostType, #{}) ->
    [].
50
%% Modules that are always required, regardless of the bounce setting.
dep_modules() ->
    [
     mod_global_distrib_utils,
     mod_global_distrib_mapping,
     mod_global_distrib_disco,
     mod_global_distrib_receiver,
     mod_global_distrib_hosts_refresher
    ].
54
%% Returns the bounce module when bouncing is enabled in the given bounce
%% options, and no modules otherwise. Only boolean `enabled' values are accepted.
bounce_modules(#{enabled := Enabled}) when is_boolean(Enabled) ->
    case Enabled of
        true -> [mod_global_distrib_bounce];
        false -> []
    end.
57
%% gen_mod start callback.
%% For the host type equal to `global_host' it ensures both global metrics
%% exist and registers the hooks returned by hooks/0; for any other host type
%% it is a no-op.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, #{global_host := HostType}) ->
    Metrics = [{?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, histogram},
               {?GLOBAL_DISTRIB_STOP_TTL_ZERO, spiral}],
    lists:foreach(fun({Name, Type}) ->
                          mongoose_metrics:ensure_metric(global, Name, Type)
                  end, Metrics),
    ejabberd_hooks:add(hooks());
start(_HostType, #{}) ->
    ok.
65
%% gen_mod stop callback.
%% Hooks are removed only when the stopped host type is the configured
%% `global_host', mirroring start/2.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
    GlobalHost = gen_mod:get_module_opt(HostType, ?MODULE, global_host),
    case GlobalHost =:= HostType of
        true -> ejabberd_hooks:delete(hooks());
        false -> ok
    end.
72
%% Hook list shared by start/2 and stop/1: a single global `filter_packet'
%% handler, maybe_reroute, registered with priority 99.
hooks() ->
    FilterPacket = {filter_packet, global, ?MODULE, maybe_reroute, 99},
    [FilterPacket].
75
%% Top-level configuration section of the module.
%% `global_host' and `local_host' are mandatory; `message_ttl' and
%% `hosts_refresh_interval' have defaults. The parsed section is
%% post-processed by process_opts/1.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    Domain = #option{type = binary, validate = domain},
    NonNegInt = #option{type = integer, validate = non_negative},
    Items = #{<<"global_host">> => Domain,
              <<"local_host">> => Domain,
              <<"message_ttl">> => NonNegInt,
              <<"hosts_refresh_interval">> => NonNegInt,
              <<"connections">> => connections_spec(),
              <<"redis">> => redis_spec(),
              <<"cache">> => cache_spec(),
              <<"bounce">> => bounce_spec()},
    Defaults = #{<<"message_ttl">> => 4,
                 <<"hosts_refresh_interval">> => 3000},
    #section{items = Items,
             required = [<<"global_host">>, <<"local_host">>],
             defaults = Defaults,
             format_items = map,
             process = fun ?MODULE:process_opts/1}.
98
%% Spec of the `connections' subsection: endpoint lists, per-endpoint
%% connection count, refresh/GC intervals and TLS settings. The section is
%% always included, so its defaults are present even when it is not configured.
connections_spec() ->
    PositiveInt = #option{type = integer, validate = positive},
    Items = #{<<"endpoints">> => #list{items = endpoint_spec()},
              <<"advertised_endpoints">> => #list{items = endpoint_spec()},
              <<"connections_per_endpoint">> => #option{type = integer,
                                                        validate = non_negative},
              <<"endpoint_refresh_interval">> => PositiveInt,
              <<"endpoint_refresh_interval_when_empty">> => PositiveInt,
              <<"disabled_gc_interval">> => PositiveInt,
              <<"tls">> => tls_spec()},
    Defaults = #{<<"connections_per_endpoint">> => 1,
                 <<"endpoint_refresh_interval">> => 60,
                 <<"endpoint_refresh_interval_when_empty">> => 3,
                 <<"disabled_gc_interval">> => 60},
    #section{items = Items,
             defaults = Defaults,
             format_items = map,
             include = always}.
120
%% Spec of a single endpoint entry: both `host' and `port' are required.
%% The parsed map is converted by process_endpoint/1.
endpoint_spec() ->
    HostOpt = #option{type = string, validate = network_address},
    PortOpt = #option{type = integer, validate = port},
    #section{items = #{<<"host">> => HostOpt, <<"port">> => PortOpt},
             required = all,
             format_items = map,
             process = fun ?MODULE:process_endpoint/1}.
132
%% Spec of the `tls' subsection of `connections'.
%% `certfile' and `cacertfile' are required; `cacertfile' is wrapped under the
%% `cafile' key; `ciphers' defaults to "TLSv1.2:TLSv1.3".
tls_spec() ->
    Items = #{<<"certfile">> => #option{type = string, validate = filename},
              <<"cacertfile">> => #option{type = string,
                                          validate = filename,
                                          wrap = {kv, cafile}},
              <<"ciphers">> => #option{type = string},
              <<"dhfile">> => #option{type = string, validate = filename}},
    Required = [<<"certfile">>, <<"cacertfile">>],
    Defaults = #{<<"ciphers">> => "TLSv1.2:TLSv1.3"},
    #section{items = Items,
             required = Required,
             defaults = Defaults,
             format_items = map}.
148
%% Spec of the `redis' subsection: pool name plus expiry/refresh settings.
%% The section is always included, so its defaults apply when it is omitted.
redis_spec() ->
    Items = #{<<"pool">> => #option{type = atom, validate = pool_name},
              <<"expire_after">> => #option{type = integer, validate = positive},
              <<"refresh_after">> => #option{type = integer,
                                             validate = non_negative}},
    Defaults = #{<<"pool">> => global_distrib,
                 <<"expire_after">> => 120,
                 <<"refresh_after">> => 60},
    #section{items = Items,
             defaults = Defaults,
             format_items = map,
             include = always}.
164
%% Spec of the `cache' subsection: whether misses are cached, domain and JID
%% entry lifetimes, and the maximum number of JIDs. The section is always
%% included.
cache_spec() ->
    NonNegInt = #option{type = integer, validate = non_negative},
    Items = #{<<"cache_missed">> => #option{type = boolean},
              <<"domain_lifetime_seconds">> => NonNegInt,
              <<"jid_lifetime_seconds">> => NonNegInt,
              <<"max_jids">> => NonNegInt},
    Defaults = #{<<"cache_missed">> => true,
                 <<"domain_lifetime_seconds">> => 600,
                 <<"jid_lifetime_seconds">> => 5,
                 <<"max_jids">> => 10000},
    #section{items = Items,
             defaults = Defaults,
             format_items = map,
             include = always}.
183
%% Spec of the `bounce' subsection: on/off switch, resend delay and retry
%% limit. The section is always included, so `enabled' defaults to true when
%% the section is omitted.
bounce_spec() ->
    NonNegInt = #option{type = integer, validate = non_negative},
    Items = #{<<"enabled">> => #option{type = boolean},
              <<"resend_after_ms">> => NonNegInt,
              <<"max_retries">> => NonNegInt},
    Defaults = #{<<"enabled">> => true,
                 <<"resend_after_ms">> => 200,
                 <<"max_retries">> => 4},
    #section{items = Items,
             defaults = Defaults,
             format_items = map,
             include = always}.
198
%% Post-processes the parsed module options by replacing the `connections'
%% value with its processed form (see process_connections/2).
%% Both `local_host' and `connections' must be present.
-spec process_opts(gen_mod:module_opts()) -> gen_mod:module_opts().
process_opts(#{local_host := LocalHost, connections := ConnOpts} = Opts) ->
    Processed = process_connections(LocalHost, ConnOpts),
    Opts#{connections := Processed}.
202
%% Fills in missing connection options and resolves the endpoints:
%%   1. `endpoints' defaults to the local host on port 5555,
%%   2. `advertised_endpoints' defaults to `endpoints',
%%   3. `resolved_endpoints' is computed from `endpoints'.
process_connections(LocalHost, Opts0) ->
    Opts1 = case Opts0 of
                #{endpoints := _} ->
                    Opts0;
                _ ->
                    Opts0#{endpoints => [{binary_to_list(LocalHost), 5555}]}
            end,
    #{endpoints := Endpoints} = Opts1,
    Opts2 = case Opts1 of
                #{advertised_endpoints := _} ->
                    Opts1;
                _ ->
                    Opts1#{advertised_endpoints => Endpoints}
            end,
    Opts2#{resolved_endpoints => mod_global_distrib_utils:resolve_endpoints(Endpoints)}.
210
%% Converts a parsed endpoint section (a map with `host' and `port')
%% into a `{Host, Port}' tuple.
-spec process_endpoint(map()) -> mod_global_distrib_utils:endpoint().
process_endpoint(#{port := Port, host := Host}) ->
    Endpoint = {Host, Port},
    Endpoint.
214
215 %%--------------------------------------------------------------------
216 %% public functions
217 %%--------------------------------------------------------------------
218
%% Reads Key from the `global_distrib' namespace of the accumulator,
%% passing Default to mongoose_acc:get/4 as the fallback value.
-spec get_metadata(mongoose_acc:t(), Key :: term(), Default :: term()) -> Value :: term().
get_metadata(Acc, Key, Default) ->
    Namespace = global_distrib,
    mongoose_acc:get(Namespace, Key, Default, Acc).
222
%% Looks up Key in the `global_distrib' namespace of the accumulator.
%% Returns `{ok, Value}' when the lookup succeeds and `{error, undefined}' when
%% mongoose_acc:get/3 raises an error (presumably for a missing key - verify
%% against mongoose_acc).
%%
%% Only the `error' class is converted: exits and throws are no longer
%% swallowed, so unrelated failures are not disguised as "key not found".
-spec find_metadata(mongoose_acc:t(), Key :: term()) ->
    {ok, Value :: term()} | {error, undefined}.
find_metadata(Acc, Key) ->
    %% The `of' section is not protected, so only the lookup itself is guarded.
    try mongoose_acc:get(global_distrib, Key, Acc) of
        Value -> {ok, Value}
    catch
        error:_ ->
            {error, undefined}
    end.
232
%% Stores Value under Key in the `global_distrib' namespace of the accumulator
%% using mongoose_acc:set_permanent/4 and returns the updated accumulator.
-spec put_metadata(mongoose_acc:t(), Key :: term(), Value :: term()) -> mongoose_acc:t().
put_metadata(Acc, Key, Value) ->
    Namespace = global_distrib,
    mongoose_acc:set_permanent(Namespace, Key, Value, Acc).
236
%% Deletes Key from the `global_distrib' namespace of the accumulator
%% and returns the updated accumulator.
-spec remove_metadata(mongoose_acc:t(), Key :: term()) -> mongoose_acc:t().
remove_metadata(Acc, Key) ->
    Namespace = global_distrib,
    mongoose_acc:delete(Namespace, Key, Acc).
240
241 %%--------------------------------------------------------------------
242 %% Hooks implementation
243 %%--------------------------------------------------------------------
244
%% `filter_packet' hook handler.
%%
%% Decides where a stanza should go:
%%   * `drop' is passed through untouched;
%%   * stanzas between two JIDs with the same user and server are passed
%%     through untouched (see the comment in that clause);
%%   * otherwise the recipient's host is looked up and the stanza is either
%%     routed locally (with global distribution metadata initialized), sent to
%%     a remote cluster (the hook then returns `drop'), or handed to the
%%     "unknown recipient" hook when the lookup fails.
-spec maybe_reroute(drop) -> drop;
                   ({jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) ->
    drop | {jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}.
maybe_reroute(drop) ->
    drop;
maybe_reroute({#jid{luser = SameUser, lserver = SameServer} = _From,
               #jid{luser = SameUser, lserver = SameServer} = _To,
               _Acc, _Packet} = FPacket) ->
    %% GD is not designed to support two sessions of one user existing in
    %% distinct clusters, so routing stanzas between them is explicitly blocked.
    %% Without this clause the
    %% test_pm_with_ungraceful_reconnection_to_different_server test was failing
    %% randomly: an 'unavailable' presence from a dead session sometimes
    %% poisoned the reg1 cache. reg1 then tried to route stanzas from the
    %% unacked SM buffer locally, which led to an error, while a brand new Eve
    %% on mim1 was waiting.
    FPacket;
maybe_reroute({From, To, _, Packet} = FPacket) ->
    MetaAcc = maybe_initialize_metadata(FPacket),
    {ok, GDID} = find_metadata(MetaAcc, id),
    LocalHost = opt(local_host),
    GlobalHost = opt(global_host),
    %% When target_host_override is set (typically when the stanza is routed
    %% out of the bounce storage), the host lookup is skipped and the stanza
    %% goes to the overriding host.
    Override = get_metadata(MetaAcc, target_host_override, undefined),
    Lookup = lookup_recipients_host(Override, To, LocalHost, GlobalHost),
    case Lookup of
        {ok, LocalHost} ->
            %% Recipient is in this datacenter: continue routing with the
            %% initialized metadata.
            mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
                                                              From, To, LocalHost),
            ?LOG_DEBUG(#{what => gd_route_local,
                         text => <<"Routing global message to local datacenter">>,
                         gd_id => GDID, local_host => LocalHost, acc => MetaAcc}),
            {ok, LocalTTL} = find_metadata(MetaAcc, ttl),
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, LocalTTL),
            {From, To, MetaAcc, Packet};

        {ok, RemoteHost} ->
            mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
                                                              From, To, RemoteHost),
            case find_metadata(MetaAcc, ttl) of
                {ok, 0} ->
                    %% TTL exhausted: do not forward, just continue routing
                    %% with the packet exactly as it was received.
                    ?LOG_INFO(#{what => gd_route_ttl_zero,
                                text => <<"Skip global distribution">>,
                                gd_id => GDID, acc => MetaAcc, target_host => RemoteHost}),
                    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
                    FPacket;
                {ok, RemainingTTL} ->
                    ?LOG_DEBUG(#{what => gd_reroute, ttl => RemainingTTL,
                                 text => <<"Forward stanza to remote cluster "
                                           "using global distribution">>,
                                 gd_id => GDID, acc => MetaAcc, target_host => RemoteHost}),
                    Decremented = put_metadata(MetaAcc, ttl, RemainingTTL - 1),
                    Outgoing = remove_metadata(Decremented, target_host_override),
                    %% KNOWN ISSUE: will crash loudly if there are no connections available
                    %% TODO: Discuss behaviour in such scenario
                    Worker = get_bound_connection_noisy(RemoteHost, GDID, Outgoing),
                    mod_global_distrib_sender:send(Worker, {From, To, Outgoing, Packet}),
                    drop
            end;

        error ->
            ?LOG_DEBUG(#{what => gd_route_failed, gd_id => GDID, acc => MetaAcc,
                         text => <<"Unable to route global: user not found in the routing table">>}),
            mongoose_hooks:mod_global_distrib_unknown_recipient(GlobalHost,
                                                                {From, To, MetaAcc, Packet})
    end.
310
311 %%--------------------------------------------------------------------
312 %% Helpers
313 %%--------------------------------------------------------------------
314
%% Ensures the accumulator carries global distribution metadata.
%% When no `origin' is stored yet, `ttl' (from the `message_ttl' option), a
%% freshly generated `id' and `origin' (the `local_host' option) are added;
%% otherwise the accumulator is returned unchanged.
-spec maybe_initialize_metadata({jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) ->
    mongoose_acc:t().
maybe_initialize_metadata({_From, _To, Acc, _Packet}) ->
    case find_metadata(Acc, origin) of
        {error, undefined} ->
            WithTTL = put_metadata(Acc, ttl, opt(message_ttl)),
            GDID = uuid:uuid_to_string(uuid:get_v4(), binary_standard),
            WithID = put_metadata(WithTTL, id, GDID),
            %% The accumulator logged here is the one received, before any
            %% metadata was added.
            ?LOG_DEBUG(#{what => gd_init_metadata, gd_id => GDID, acc => Acc}),
            put_metadata(WithID, origin, opt(local_host));
        _ ->
            Acc
    end.
327
%% Same as get_bound_connection/2, but any exception is logged at error level
%% (with the accumulator and target host for context) and then re-raised with
%% its original class, reason and stacktrace.
get_bound_connection_noisy(TargetHost, GDID, Acc) ->
    try get_bound_connection(TargetHost, GDID) of
        Worker ->
            Worker
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR(#{what => gd_get_process_for_failed,
                         gd_id => GDID, acc => Acc, target_host => TargetHost,
                         class => Class, reason => Reason, stacktrace => Stacktrace}),
            erlang:raise(Class, Reason, Stacktrace)
    end.
336
%% Returns the sender connection bound to the calling process for Server.
%% The binding is kept in the process dictionary under `{connection, Server}';
%% get_bound_connection/3 handles the missing and dead-process cases.
-spec get_bound_connection(Server :: jid:lserver(), binary()) -> pid().
get_bound_connection(Server, GDID) when is_binary(GDID) ->
    Cached = get({connection, Server}),
    get_bound_connection(Server, GDID, Cached).
340
%% Resolves the connection for Server given the value currently cached in the
%% process dictionary:
%%   * `undefined' - a new sender process is obtained and cached;
%%   * a live pid  - it is reused;
%%   * a dead pid  - it is replaced exactly as if nothing had been cached.
-spec get_bound_connection(Server :: jid:lserver(), term(), pid() | undefined) -> pid().
get_bound_connection(Server, GDID, undefined) ->
    NewPid = mod_global_distrib_sender:get_process_for(Server),
    put({connection, Server}, NewPid),
    ?LOG_DEBUG(#{what => gd_new_bound_connection,
                 server => Server, gd_id => GDID, gd_pid => NewPid}),
    NewPid;
get_bound_connection(Server, GDID, Pid) when is_pid(Pid) ->
    case is_process_alive(Pid) of
        true ->
            ?LOG_DEBUG(#{what => gd_reuse_bound_connection,
                         server => Server, gd_id => GDID, gd_pid => Pid}),
            Pid;
        false ->
            ?LOG_DEBUG(#{what => gd_dead_bound_connection,
                         server => Server, gd_id => GDID, gd_pid => Pid}),
            get_bound_connection(Server, GDID, undefined)
    end.
359
%% Determines the host that should receive a stanza addressed to To.
%% An explicit target host (anything other than `undefined') wins and is
%% returned as-is; otherwise the decision is delegated to
%% lookup_recipients_host/3.
-spec lookup_recipients_host(TargetHost :: binary() | undefined,
                             To :: jid:jid(),
                             LocalHost :: binary(),
                             GlobalHost :: binary()) ->
    {ok, binary()} | error.
lookup_recipients_host(TargetHost, To, LocalHost, GlobalHost) ->
    case TargetHost of
        undefined ->
            lookup_recipients_host(To, LocalHost, GlobalHost);
        _ ->
            {ok, TargetHost}
    end.
369
%% Finds the host responsible for the recipient JID:
%%   * a bare domain JID (empty user part) addressed to the local or global
%%     host is handled locally; any other bare domain is looked up per domain;
%%   * a user JID on the local host is handled locally;
%%   * a user JID on the global host is looked up per JID;
%%   * a user JID on any other host is looked up per domain.
-spec lookup_recipients_host(To :: jid:jid(),
                             LocalHost :: binary(),
                             GlobalHost :: binary()) -> {ok, binary()} | error.
lookup_recipients_host(#jid{luser = <<>>, lserver = LServer}, LocalHost, GlobalHost)
  when LServer == LocalHost; LServer == GlobalHost ->
    {ok, LocalHost};
lookup_recipients_host(#jid{luser = <<>>, lserver = Domain}, _LocalHost, _GlobalHost) ->
    mod_global_distrib_mapping:for_domain(Domain);
lookup_recipients_host(#jid{lserver = LocalHost}, LocalHost, _GlobalHost) ->
    {ok, LocalHost};
lookup_recipients_host(#jid{lserver = GlobalHost} = To, _LocalHost, GlobalHost) ->
    mod_global_distrib_mapping:for_jid(To);
lookup_recipients_host(#jid{lserver = Domain}, _LocalHost, _GlobalHost) ->
    mod_global_distrib_mapping:for_domain(Domain).
384
%% Reads this module's option Key through mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Module = ?MODULE,
    mod_global_distrib_utils:opt(Module, Key).
Line Hits Source