./ct_report/coverage/mod_global_distrib.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 %% Intercepts filter_packet hook with maybe_reroute callback.
18 -module(mod_global_distrib).
19 -author('konrad.zemek@erlang-solutions.com').
20
21 -behaviour(gen_mod).
22 -behaviour(mongoose_module_metrics).
23
24 -include("global_distrib_metrics.hrl").
25 -include("jlib.hrl").
26 -include("mongoose.hrl").
27 -include("mongoose_config_spec.hrl").
28
29 -export([deps/2, start/2, stop/1, config_spec/0]).
30 -export([find_metadata/2, get_metadata/3, remove_metadata/2, put_metadata/3]).
31 -export([maybe_reroute/1]).
32 -export([process_endpoints/1, process_bounce/1]).
33
34 -ignore_xref([maybe_reroute/1, remove_metadata/2]).
35
36 %%--------------------------------------------------------------------
37 %% gen_mod API
38 %% See "gen_mod logic" block below in this file
39 %%--------------------------------------------------------------------
40
%% @doc gen_mod callback: computes the dependency list of this module.
%% The actual work is delegated to mod_global_distrib_utils:deps/4, which
%% receives deps/1 (defined in the "gen_mod logic" section) as the callback.
-spec deps(Host :: jid:server(), Opts :: proplists:proplist()) -> gen_mod:deps_list().
deps(Host, Opts) ->
    DepsFun = fun deps/1,
    mod_global_distrib_utils:deps(?MODULE, Host, Opts, DepsFun).
44
%% @doc gen_mod callback: starts the module on the given host.
%% A `{message_ttl, 4}' entry is put in front of the supplied options before
%% they are handed to mod_global_distrib_utils:start/4 together with start/0.
-spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any().
start(Host, Opts0) ->
    DefaultTTL = {message_ttl, 4},
    StartFun = fun start/0,
    mod_global_distrib_utils:start(?MODULE, Host, [DefaultTTL | Opts0], StartFun).
49
%% @doc gen_mod callback: stops the module on the given host by delegating to
%% mod_global_distrib_utils:stop/3 with stop/0 as the callback.
-spec stop(Host :: jid:lserver()) -> any().
stop(Host) ->
    StopFun = fun stop/0,
    mod_global_distrib_utils:stop(?MODULE, Host, StopFun).
53
%% @doc Top-level configuration section of the module.
%% `global_host' and `local_host' are required; the nested sections
%% (`connections', `redis', `cache', `bounce') are described by their own
%% *_spec/0 functions.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    DomainOpt = #option{type = string,
                        validate = domain},
    NonNegativeInt = #option{type = integer,
                             validate = non_negative},
    Items = #{<<"global_host">> => DomainOpt,
              <<"local_host">> => DomainOpt,
              <<"message_ttl">> => NonNegativeInt,
              <<"hosts_refresh_interval">> => NonNegativeInt,
              <<"connections">> => connections_spec(),
              <<"redis">> => redis_spec(),
              <<"cache">> => cache_spec(),
              <<"bounce">> => bounce_spec()},
    #section{items = Items,
             required = [<<"global_host">>, <<"local_host">>]}.
72
%% Configuration section for the `connections' key: endpoint lists,
%% per-endpoint connection count, refresh/GC intervals and the TLS subsection.
connections_spec() ->
    EndpointList = #list{items = endpoints_spec()},
    PositiveInt = #option{type = integer,
                          validate = positive},
    NonNegativeInt = #option{type = integer,
                             validate = non_negative},
    Items = #{<<"endpoints">> => EndpointList,
              <<"advertised_endpoints">> => EndpointList,
              <<"connections_per_endpoint">> => NonNegativeInt,
              <<"endpoint_refresh_interval">> => PositiveInt,
              <<"endpoint_refresh_interval_when_empty">> => PositiveInt,
              <<"disabled_gc_interval">> => PositiveInt,
              <<"tls">> => tls_spec()},
    #section{items = Items}.
88
%% Configuration section describing a single endpoint. Both `host' and `port'
%% are required, and the parsed section is post-processed by
%% process_endpoints/1.
endpoints_spec() ->
    HostOpt = #option{type = string,
                      validate = network_address},
    PortOpt = #option{type = integer,
                      validate = port},
    #section{items = #{<<"host">> => HostOpt,
                       <<"port">> => PortOpt},
             required = all,
             process = fun ?MODULE:process_endpoints/1}.
99
%% Configuration section for the `tls' key of `connections'.
%% `certfile' and `cacertfile' are required; `cacertfile' is wrapped as
%% `{kv, cafile}' and the whole section as `{kv, tls_opts}'.
tls_spec() ->
    FileOpt = #option{type = string,
                      validate = filename},
    CACertOpt = #option{type = string,
                        validate = filename,
                        wrap = {kv, cafile}},
    Items = #{<<"certfile">> => FileOpt,
              <<"cacertfile">> => CACertOpt,
              <<"ciphers">> => #option{type = string},
              <<"dhfile">> => FileOpt},
    #section{items = Items,
             required = [<<"certfile">>, <<"cacertfile">>],
             wrap = {kv, tls_opts}}.
114
%% Configuration section for the `redis' key: pool name plus the
%% `expire_after' (positive) and `refresh_after' (non-negative) integers.
redis_spec() ->
    PoolOpt = #option{type = atom,
                      validate = pool_name},
    ExpireOpt = #option{type = integer,
                        validate = positive},
    RefreshOpt = #option{type = integer,
                         validate = non_negative},
    #section{items = #{<<"pool">> => PoolOpt,
                       <<"expire_after">> => ExpireOpt,
                       <<"refresh_after">> => RefreshOpt}}.
125
%% Configuration section for the `cache' key: a boolean `cache_missed' flag
%% and three non-negative integer settings.
cache_spec() ->
    NonNegativeInt = #option{type = integer,
                             validate = non_negative},
    Items = #{<<"cache_missed">> => #option{type = boolean},
              <<"domain_lifetime_seconds">> => NonNegativeInt,
              <<"jid_lifetime_seconds">> => NonNegativeInt,
              <<"max_jids">> => NonNegativeInt},
    #section{items = Items}.
137
%% Configuration section for the `bounce' key. The parsed section is
%% post-processed by process_bounce/1.
bounce_spec() ->
    NonNegativeInt = #option{type = integer,
                             validate = non_negative},
    Items = #{<<"enabled">> => #option{type = boolean},
              <<"resend_after_ms">> => NonNegativeInt,
              <<"max_retries">> => NonNegativeInt},
    #section{items = Items,
             process = fun ?MODULE:process_bounce/1}.
148
%% Turns a parsed endpoint section into a `{Host, Port}' tuple.
%% The input must contain exactly one `host' and one `port' entry and nothing
%% else; any other shape fails the match below with `badmatch'.
process_endpoints(KV) ->
    Split = proplists:split(KV, [host, port]),
    {[[{host, Host}], [{port, Port}]], []} = Split,
    {Host, Port}.
152
%% Post-processes the parsed `bounce' section.
%% The `enabled' entry is separated from the remaining options; the result is
%% `false' when bouncing is explicitly disabled and the remaining options
%% otherwise (including when `enabled' is absent).
process_bounce(KVs) ->
    Split = proplists:split(KVs, [enabled]),
    {[EnabledOpts], OtherOpts} = Split,
    bounce_value(EnabledOpts, OtherOpts).

%% Picks the final value of the bounce section: `false' only for an explicit
%% `{enabled, false}', the remaining options in every other case.
bounce_value(EnabledOpts, Opts) ->
    case EnabledOpts of
        [{enabled, false}] -> false;
        _ -> Opts
    end.
159
160 %%--------------------------------------------------------------------
161 %% public functions
162 %%--------------------------------------------------------------------
163
%% @doc Reads `Key' from the `global_distrib' namespace of the accumulator,
%% passing `Default' to mongoose_acc:get/4 as the fallback value.
-spec get_metadata(mongoose_acc:t(), Key :: term(), Default :: term()) -> Value :: term().
get_metadata(Acc, Key, Default) ->
    Namespace = global_distrib,
    mongoose_acc:get(Namespace, Key, Default, Acc).
167
%% @doc Looks up `Key' in the `global_distrib' namespace of the accumulator.
%% Returns `{ok, Value}' when mongoose_acc:get/3 returns normally and
%% `{error, undefined}' when it raises an exception of any class.
-spec find_metadata(mongoose_acc:t(), Key :: term()) ->
    {ok, Value :: term()} | {error, undefined}.
find_metadata(Acc, Key) ->
    try
        {ok, mongoose_acc:get(global_distrib, Key, Acc)}
    catch
        _:_ ->
            {error, undefined}
    end.
177
%% @doc Stores `Value' under `Key' in the `global_distrib' namespace of the
%% accumulator using mongoose_acc:set_permanent/4.
-spec put_metadata(mongoose_acc:t(), Key :: term(), Value :: term()) -> mongoose_acc:t().
put_metadata(Acc, Key, Value) ->
    Namespace = global_distrib,
    mongoose_acc:set_permanent(Namespace, Key, Value, Acc).
181
%% @doc Deletes `Key' from the `global_distrib' namespace of the accumulator.
-spec remove_metadata(mongoose_acc:t(), Key :: term()) -> mongoose_acc:t().
remove_metadata(Acc, Key) ->
    Namespace = global_distrib,
    mongoose_acc:delete(Namespace, Key, Acc).
185
186 %%--------------------------------------------------------------------
187 %% Hooks implementation
188 %%--------------------------------------------------------------------
189
%% @doc filter_packet hook handler.
%% Decides whether a stanza stays on the local routing path or is handed to
%% a sender worker for another host:
%% <ul>
%% <li>`drop' is passed through untouched;</li>
%% <li>a stanza whose sender and recipient share the same user and server is
%%     returned unchanged;</li>
%% <li>otherwise the global-distribution metadata is initialised (if needed)
%%     and the recipient's host is looked up. A local recipient continues
%%     routing with the updated accumulator, a recipient on another host is
%%     forwarded (result: `drop') unless the TTL reached 0, and an unknown
%%     recipient is handed to the mod_global_distrib_unknown_recipient hook.</li>
%% </ul>
-spec maybe_reroute(drop) -> drop;
                   ({jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) ->
                       drop | {jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}.
maybe_reroute(drop) ->
    drop;
maybe_reroute({#jid{luser = LUser, lserver = LServer},
               #jid{luser = LUser, lserver = LServer},
               _, _} = Untouched) ->
    %% GD is not designed to support two sessions of the same user living in
    %% distinct clusters, so stanzas between them are explicitly not rerouted.
    %% Without this clause the
    %% test_pm_with_ungraceful_reconnection_to_different_server test failed
    %% randomly: an 'unavailable' presence from a dead session sometimes
    %% poisoned the reg1 cache. reg1 then tried to route stanzas from the
    %% unacked SM buffer locally, which led to an error, while a brand new Eve
    %% on mim1 was waiting.
    Untouched;
maybe_reroute({Sender, Recipient, _, Stanza} = FPacket) ->
    Acc = maybe_initialize_metadata(FPacket),
    {ok, ID} = find_metadata(Acc, id),
    LocalHost = opt(local_host),
    GlobalHost = opt(global_host),
    %% When target_host_override is set (typically for messages routed out of
    %% the bounce storage) the host lookup is skipped and the stanza goes
    %% straight to the overriding host.
    Override = get_metadata(Acc, target_host_override, undefined),
    case lookup_recipients_host(Override, Recipient, LocalHost, GlobalHost) of
        {ok, LocalHost} ->
            %% Recipient is served here: keep routing, now with metadata set.
            mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
                                                              Sender, Recipient, LocalHost),
            ?LOG_DEBUG(#{what => gd_route_local,
                         text => <<"Routing global message to local datacenter">>,
                         gd_id => ID, local_host => LocalHost, acc => Acc}),
            {ok, RemainingTTL} = find_metadata(Acc, ttl),
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, RemainingTTL),
            {Sender, Recipient, Acc, Stanza};

        {ok, RemoteHost} ->
            mongoose_hooks:mod_global_distrib_known_recipient(GlobalHost,
                                                              Sender, Recipient, RemoteHost),
            case find_metadata(Acc, ttl) of
                {ok, 0} ->
                    %% TTL exhausted: do not forward, just continue routing.
                    ?LOG_INFO(#{what => gd_route_ttl_zero,
                                text => <<"Skip global distribution">>,
                                gd_id => ID, acc => Acc, target_host => RemoteHost}),
                    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
                    FPacket;
                {ok, TTL} ->
                    ?LOG_DEBUG(#{what => gd_reroute, ttl => TTL,
                                 text => <<"Forward stanza to remote cluster "
                                           "using global distribution">>,
                                 gd_id => ID, acc => Acc, target_host => RemoteHost}),
                    DecrementedAcc = put_metadata(Acc, ttl, TTL - 1),
                    OutAcc = remove_metadata(DecrementedAcc, target_host_override),
                    %% KNOWN ISSUE: crashes loudly when no connection is available.
                    %% TODO: Discuss behaviour in such scenario
                    Worker = get_bound_connection_noisy(RemoteHost, ID, OutAcc),
                    Outgoing = {Sender, Recipient, OutAcc, Stanza},
                    mod_global_distrib_sender:send(Worker, Outgoing),
                    drop
            end;

        error ->
            ?LOG_DEBUG(#{what => gd_route_failed, gd_id => ID, acc => Acc,
                         text => <<"Unable to route global: user not found in the routing table">>}),
            mongoose_hooks:mod_global_distrib_unknown_recipient(GlobalHost,
                                                                {Sender, Recipient, Acc, Stanza})
    end.
255
256 %%--------------------------------------------------------------------
257 %% Helpers
258 %%--------------------------------------------------------------------
259
%% @doc Ensures the accumulator carries global-distribution metadata.
%% When no `origin' entry can be found, `ttl' (from the `message_ttl' option),
%% a freshly generated `id' and `origin' (from the `local_host' option) are
%% stored; otherwise the accumulator is returned as is.
-spec maybe_initialize_metadata({jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) -> mongoose_acc:t().
maybe_initialize_metadata({_From, _To, Acc, _Packet}) ->
    case find_metadata(Acc, origin) of
        {error, undefined} ->
            InitialTTL = opt(message_ttl),
            WithTTL = put_metadata(Acc, ttl, InitialTTL),
            ID = uuid:uuid_to_string(uuid:get_v4(), binary_standard),
            WithID = put_metadata(WithTTL, id, ID),
            %% The log entry shows the accumulator as it was before initialisation.
            ?LOG_DEBUG(#{what => gd_init_metadata, gd_id => ID, acc => Acc}),
            Origin = opt(local_host),
            put_metadata(WithID, origin, Origin);
        _ ->
            Acc
    end.
272
%% Same as get_bound_connection/2, but any exception is logged at error level
%% (with the accumulator and target host) and then re-raised with its
%% original class, reason and stacktrace.
get_bound_connection_noisy(TargetHost, GDID, Acc) ->
    try
        get_bound_connection(TargetHost, GDID)
    catch
        Class:Reason:Stacktrace ->
            ?LOG_ERROR(#{what => gd_get_process_for_failed,
                         gd_id => GDID, acc => Acc, target_host => TargetHost,
                         class => Class, reason => Reason, stacktrace => Stacktrace}),
            erlang:raise(Class, Reason, Stacktrace)
    end.
281
%% @doc Returns the sender process bound to `Server' for the calling process.
%% The previously bound pid, if any, is read from the process dictionary under
%% `{connection, Server}' and passed on to get_bound_connection/3.
-spec get_bound_connection(Server :: jid:lserver(), binary()) -> pid().
get_bound_connection(Server, GDID) when is_binary(GDID) ->
    Bound = get({connection, Server}),
    get_bound_connection(Server, GDID, Bound).
285
%% @doc Resolves the bound connection given the value found in the process
%% dictionary.
%% `undefined' means nothing is bound yet: a process is obtained from
%% mod_global_distrib_sender:get_process_for/1 and remembered under
%% `{connection, Server}'. A live pid is reused; a dead one is replaced as if
%% nothing had been bound.
-spec get_bound_connection(Server :: jid:lserver(), term(), pid() | undefined) -> pid().
get_bound_connection(Server, GDID, undefined) ->
    NewPid = mod_global_distrib_sender:get_process_for(Server),
    put({connection, Server}, NewPid),
    ?LOG_DEBUG(#{what => gd_new_bound_connection,
                 server => Server, gd_id => GDID, gd_pid => NewPid}),
    NewPid;
get_bound_connection(Server, GDID, Pid) when is_pid(Pid) ->
    case is_process_alive(Pid) of
        true ->
            ?LOG_DEBUG(#{what => gd_reuse_bound_connection,
                         server => Server, gd_id => GDID, gd_pid => Pid}),
            Pid;
        false ->
            ?LOG_DEBUG(#{what => gd_dead_bound_connection,
                         server => Server, gd_id => GDID, gd_pid => Pid}),
            get_bound_connection(Server, GDID, undefined)
    end.
304
305 %%--------------------------------------------------------------------
306 %% gen_mod logic
307 %%--------------------------------------------------------------------
308
%% @doc Builds the list of hard dependencies of this module.
%% Every dependency receives the full option list; the mapping, receiver,
%% sender and bounce modules additionally get their own subsection
%% (`cache', `connections', `bounce') prepended. The bounce module is left
%% out when the `bounce' option is `false'.
-spec deps(Opts :: proplists:proplist()) -> gen_mod:deps_list().
deps(Opts) ->
    UserConnOpts = proplists:get_value(connections, Opts, []),
    %% ukeysort keeps the first tuple of each key, so user-supplied
    %% connection options win over the defaults appended after them.
    ConnOpts = lists:ukeysort(1, UserConnOpts ++ default_conn_opts()) ++ Opts,
    MappingOpts = proplists:get_value(cache, Opts, []) ++ Opts,
    CoreDeps = [{mod_global_distrib_mapping, MappingOpts, hard},
                {mod_global_distrib_disco, Opts, hard},
                {mod_global_distrib_receiver, ConnOpts, hard},
                {mod_global_distrib_sender, ConnOpts, hard},
                {mod_global_distrib_hosts_refresher, Opts, hard}],
    case proplists:get_value(bounce, Opts, []) of
        false ->
            CoreDeps;
        BounceOpts ->
            [{mod_global_distrib_bounce, BounceOpts ++ Opts, hard} | CoreDeps]
    end.

%% Connection options used when the configuration does not provide them.
default_conn_opts() ->
    [{tls_opts, false}].
328
%% @doc Module-wide start logic: makes sure both global-distribution metrics
%% exist and registers maybe_reroute/1 on the global filter_packet hook with
%% priority 99.
-spec start() -> any().
start() ->
    Metrics = [{?GLOBAL_DISTRIB_DELIVERED_WITH_TTL, histogram},
               {?GLOBAL_DISTRIB_STOP_TTL_ZERO, spiral}],
    lists:foreach(fun({Name, Type}) ->
                          mongoose_metrics:ensure_metric(global, Name, Type)
                  end, Metrics),
    ejabberd_hooks:add(filter_packet, global, ?MODULE, maybe_reroute, 99).
334
%% @doc Module-wide stop logic: removes the maybe_reroute/1 handler
%% (priority 99) from the global filter_packet hook.
-spec stop() -> any().
stop() ->
    Priority = 99,
    ejabberd_hooks:delete(filter_packet, global, ?MODULE, maybe_reroute, Priority).
338
339 %%--------------------------------------------------------------------
340
%% @doc Determines the host a stanza should be delivered to.
%% A `TargetHost' other than `undefined' is returned as is; with `undefined'
%% the host is looked up from the recipient JID by lookup_recipients_host/3.
-spec lookup_recipients_host(TargetHost :: binary() | undefined,
                             To :: jid:jid(),
                             LocalHost :: binary(),
                             GlobalHost :: binary()) ->
    {ok, binary()} | error.
lookup_recipients_host(TargetHost, To, LocalHost, GlobalHost) ->
    case TargetHost of
        undefined ->
            lookup_recipients_host(To, LocalHost, GlobalHost);
        _ ->
            {ok, TargetHost}
    end.
350
%% @doc Looks up the host serving the recipient JID.
%% <ul>
%% <li>JID without a user part: the local host when addressed to the local or
%%     the global host, otherwise a domain lookup;</li>
%% <li>JID with a user part: the local host when addressed to it, a per-JID
%%     lookup when addressed to the global host, otherwise a domain
%%     lookup.</li>
%% </ul>
-spec lookup_recipients_host(To :: jid:jid(),
                             LocalHost :: binary(),
                             GlobalHost :: binary()) -> {ok, binary()} | error.
lookup_recipients_host(#jid{luser = <<>>, lserver = Domain}, LocalHost, GlobalHost) ->
    case Domain == LocalHost orelse Domain == GlobalHost of
        true ->
            {ok, LocalHost};
        false ->
            mod_global_distrib_mapping:for_domain(Domain)
    end;
lookup_recipients_host(#jid{lserver = LocalHost}, LocalHost, _GlobalHost) ->
    {ok, LocalHost};
lookup_recipients_host(#jid{lserver = GlobalHost} = To, _LocalHost, GlobalHost) ->
    mod_global_distrib_mapping:for_jid(To);
lookup_recipients_host(#jid{lserver = Domain}, _LocalHost, _GlobalHost) ->
    mod_global_distrib_mapping:for_domain(Domain).
365
%% @doc Reads the option `Key' of this module through
%% mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Module = ?MODULE,
    mod_global_distrib_utils:opt(Module, Key).
Line Hits Source