./ct_report/coverage/mod_global_distrib_mapping_redis.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_mapping_redis).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(mod_global_distrib_mapping_backend).
21
22 -include("mongoose.hrl").
23
24 -define(JIDS_ETS, mod_global_distrib_mapping_redis_jids).
25 -define(DOMAINS_ETS, mod_global_distrib_mapping_redis_domains).
26 -define(PUBLIC_DOMAINS_ETS, mod_global_distrib_mapping_redis_public_domains).
27
28 -export([start/1, stop/0,
29 put_session/1, get_session/1, delete_session/1,
30 put_domain/2, get_domain/1, delete_domain/1,
31 get_endpoints/1, get_domains/0, get_public_domains/0, get_hosts/0]).
32
33 -export([refresh/0, refresh/1]).
34
35 -export([init/1, handle_info/2]).
36
37 %% Only for debug & tests!
38 -export([nodes_key/0, set_endpoints/1]).
39
40 -ignore_xref([handle_info/2, init/1, nodes_key/0, refresh/0, refresh/1, set_endpoints/1]).
41
42 %%--------------------------------------------------------------------
43 %% API
44 %%--------------------------------------------------------------------
45
46 -spec start(gen_mod:module_opts()) -> any().
47 start(#{redis := #{refresh_after := RefreshAfter}}) ->
48
:-(
mod_global_distrib_utils:create_ets([?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]),
49
50
:-(
Refresher = {mod_global_distrib_redis_refresher,
51 {gen_server, start_link, [?MODULE, RefreshAfter, []]},
52 permanent, 1000, supervisor, [?MODULE]},
53
:-(
ejabberd_sup:start_child(Refresher),
54
:-(
ok.
55
56 -spec stop() -> ok.
57 stop() ->
58
:-(
lists:foreach(fun(Id) -> ejabberd_sup:stop_child(Id) end,
59 [?MODULE, mod_global_distrib_redis_refresher]),
60
:-(
[ets:delete(Tab) || Tab <- [?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]],
61
:-(
ok.
62
63 -spec put_session(jid:literal_jid()) -> ok.
64 put_session(Jid) ->
65
:-(
ets:insert(?JIDS_ETS, {Jid}),
66
:-(
do_put(Jid, opt(local_host)).
67
68 -spec get_session(jid:literal_jid()) -> {ok, jid:lserver()} | error.
69 get_session(Jid) ->
70
:-(
do_get(Jid).
71
72 -spec delete_session(jid:literal_jid()) -> ok.
73 delete_session(Jid) ->
74
:-(
ets:delete(?JIDS_ETS, Jid),
75
:-(
do_delete(Jid).
76
77 -spec put_domain(jid:lserver(), IsHidden :: boolean()) -> ok.
78 put_domain(Domain, IsHidden) ->
79
:-(
ets:insert(?DOMAINS_ETS, {Domain}),
80
:-(
{ok, _} = q([<<"SADD">>, domains_key(), Domain]),
81
:-(
do_put(Domain, opt(local_host)),
82
:-(
case IsHidden of
83 false ->
84
:-(
ets:insert(?PUBLIC_DOMAINS_ETS, {Domain}),
85
:-(
{ok, _} = q([<<"SADD">>, public_domains_key(), Domain]),
86
:-(
ok;
87 true ->
88
:-(
ok
89 end.
90
91 -spec get_domain(jid:lserver()) -> {ok, jid:lserver()} | error.
92 get_domain(Domain) ->
93
:-(
do_get(Domain).
94
95 -spec delete_domain(jid:lserver()) -> ok.
96 delete_domain(Domain) ->
97
:-(
ets:delete(?DOMAINS_ETS, Domain),
98
:-(
ets:delete(?PUBLIC_DOMAINS_ETS, Domain),
99
:-(
do_delete(Domain),
100
:-(
{ok, _} = q([<<"SREM">>, domains_key(), Domain]),
101
:-(
ok.
102
103 -spec get_domains() -> [jid:lserver()].
104 get_domains() ->
105
:-(
get_domains(fun domains_key/2).
106
107 -spec get_public_domains() -> [jid:lserver()].
108 get_public_domains() ->
109
:-(
get_domains(fun public_domains_key/2).
110
111 -spec get_endpoints(Host :: jid:lserver()) -> [mod_global_distrib_utils:endpoint()].
112 get_endpoints(Host) ->
113
:-(
Nodes = get_nodes(Host),
114
:-(
get_endpoints_for_nodes(Host, Nodes).
115
116 get_endpoints_for_nodes(_Host, []) ->
117
:-(
[];
118 get_endpoints_for_nodes(Host, Nodes) ->
119
:-(
EndpointKeys = [endpoints_key(Host, Node) || Node <- Nodes],
120
:-(
{ok, BinEndpoints} = q([<<"SUNION">> | EndpointKeys]),
121
:-(
lists:map(fun binary_to_endpoint/1, BinEndpoints).
122
123 %%--------------------------------------------------------------------
124 %% gen_server API
125 %%--------------------------------------------------------------------
126
127 init(RefreshAfter) ->
128
:-(
refresh_and_schedule_next("initial_autorefresh", RefreshAfter),
129
:-(
{ok, RefreshAfter}.
130
131 handle_info(refresh, RefreshAfter) ->
132
:-(
refresh_and_schedule_next("autorefresh", RefreshAfter),
133
:-(
{noreply, RefreshAfter}.
134
135 refresh() ->
136
:-(
refresh("reason_unknown").
137
138 refresh_and_schedule_next(Reason, RefreshAfter) ->
139
:-(
Reason2 = Reason ++ ",next_refresh_in=" ++ integer_to_list(RefreshAfter),
140
:-(
refresh(Reason2),
141
:-(
erlang:send_after(timer:seconds(RefreshAfter), self(), refresh).
142
143 -spec refresh(Reason :: string()) -> ok.
144 refresh(Reason) ->
145
:-(
?LOG_DEBUG(#{what => gd_refreshing_own_hosts}),
146
:-(
refresh_hosts(),
147
:-(
?LOG_DEBUG(#{what => gd_refreshing_own_nodes}),
148
:-(
refresh_nodes(),
149
:-(
?LOG_DEBUG(#{what => gd_refreshing_own_jids}),
150
:-(
refresh_jids(),
151
:-(
?LOG_DEBUG(#{what => gd_refreshing_own_endpoints}),
152
:-(
refresh_endpoints(),
153
:-(
?LOG_DEBUG(#{what => gd_refreshing_own_domains}),
154
:-(
refresh_domains(),
155
:-(
?LOG_DEBUG(#{what => gd_refreshing_own_public_domains}),
156
:-(
refresh_public_domains(),
157
:-(
?LOG_INFO(#{what => gd_refreshing_own_data_done, reason => Reason}),
158
:-(
ok.
159
160 %%--------------------------------------------------------------------
161 %% Helpers
162 %%--------------------------------------------------------------------
163
164 -spec q(Args :: list()) -> {ok, term()}.
165 q(Args) ->
166
:-(
{ok, Worker} = mongoose_wpool:get_worker(redis, global, pool()),
167
:-(
case eredis:q(Worker, Args) of
168 {ok, _} = OKRes ->
169
:-(
OKRes;
170 Error ->
171
:-(
?LOG_ERROR(#{what => gd_redis_query_error, worker => Worker,
172
:-(
redis_query => Args, reason => Error}),
173
:-(
error(Error)
174 end.
175
176 -spec nodes_key() -> binary().
177 nodes_key() ->
178
:-(
LocalHost = opt(local_host),
179
:-(
nodes_key(LocalHost).
180
181 -spec nodes_key(Host :: jid:lserver()) -> binary().
182 nodes_key(Host) when is_binary(Host) ->
183
:-(
<<Host/binary, "#{nodes}">>.
184
185 -spec endpoints_key() -> binary().
186 endpoints_key() ->
187
:-(
key(<<"endpoints">>).
188
189 -spec endpoints_key(Host :: jid:lserver(), Node :: binary()) -> binary().
190 endpoints_key(Host, Node) ->
191
:-(
key(Host, Node, <<"endpoints">>).
192
193 -spec domains_key() -> binary().
194 domains_key() ->
195
:-(
key(<<"domains">>).
196
197 -spec domains_key(Host :: jid:lserver(), Node :: binary()) -> binary().
198 domains_key(Host, Node) ->
199
:-(
key(Host, Node, <<"domains">>).
200
201 -spec public_domains_key() -> binary().
202 public_domains_key() ->
203
:-(
key(<<"public_domains">>).
204
205 -spec public_domains_key(Host :: jid:lserver(), Node :: binary()) -> binary().
206 public_domains_key(Host, Node) ->
207
:-(
key(Host, Node, <<"public_domains">>).
208
209 -spec key(Type :: binary()) -> binary().
210 key(Type) ->
211
:-(
LocalHost = opt(local_host),
212
:-(
Node = atom_to_binary(node(), latin1),
213
:-(
key(LocalHost, Node, Type).
214
215 -spec key(Host :: jid:lserver(), Node :: binary(), Type :: binary()) -> binary().
216 key(Host, Node, Type) ->
217
:-(
<<Host/binary, "#", Node/binary, "#{", Type/binary, "}">>.
218
219 -spec opt(gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value().
220 opt(Key) ->
221
:-(
mod_global_distrib_utils:opt(mod_global_distrib_mapping, Key).
222
223 -spec expire_after() -> pos_integer().
224 expire_after() ->
225
:-(
opt([redis, expire_after]).
226
227 -spec pool() -> atom().
228 pool() ->
229
:-(
opt([redis, pool]).
230
231 -spec do_put(Key :: binary(), Host :: binary()) -> ok.
232 do_put(Key, Host) ->
233
:-(
{ok, _} = q([<<"SET">>, Key, Host, <<"EX">>, expire_after()]),
234
:-(
ok.
235
236 -spec do_get(Key :: binary()) -> {ok, Host :: binary()} | error.
237 do_get(Key) ->
238
:-(
case q([<<"GET">>, Key]) of
239
:-(
{ok, undefined} -> error;
240
:-(
{ok, Host} -> {ok, Host}
241 end.
242
243 -spec do_delete(Key :: binary()) -> ok.
244 do_delete(Key) ->
245
:-(
LocalHost = opt(local_host),
246
:-(
case q([<<"GET">>, Key]) of
247
:-(
{ok, LocalHost} -> {ok, _} = q([<<"DEL">>, Key]);
248
:-(
_ -> ok
249 end,
250
:-(
ok.
251
252 -spec get_domains(KeyFun :: fun((Host :: binary(), Node :: binary()) -> Key :: binary())) ->
253 [jid:lserver()].
254 get_domains(KeyFun) ->
255
:-(
Hosts = get_hosts(),
256
:-(
Nodes = lists:flatmap(fun(Host) -> [{Host, Node} || Node <- get_nodes(Host)] end, Hosts),
257
:-(
Keys = [KeyFun(Host, Node) || {Host, Node} <- Nodes],
258
:-(
{ok, Domains} = q([<<"SUNION">> | Keys]),
259
:-(
Domains.
260
261 -spec refresh_hosts() -> any().
262 refresh_hosts() ->
263
:-(
q([<<"SADD">>, <<"hosts">>, opt(local_host)]).
264
265 -spec get_hosts() -> [Host :: jid:lserver()].
266 get_hosts() ->
267
:-(
{ok, Hosts} = q([<<"SMEMBERS">>, <<"hosts">>]),
268
:-(
Hosts.
269
270 -spec refresh_nodes() -> any().
271 refresh_nodes() ->
272
:-(
NodesKey = nodes_key(),
273
:-(
Now = erlang:system_time(second),
274
:-(
case get_expired_nodes(Now) of
275
:-(
[] -> ok;
276
:-(
ExpiredNodes -> q([<<"HDEL">>, NodesKey | ExpiredNodes])
277 end,
278
:-(
q([<<"HSET">>, NodesKey, atom_to_binary(node(), latin1), Now]).
279
280 -spec get_nodes(Host :: jid:lserver()) -> [Node :: binary()].
281 get_nodes(Host) ->
282
:-(
{ok, Nodes} = q([<<"HKEYS">>, nodes_key(Host)]),
283
:-(
Nodes.
284
285 -spec get_expired_nodes(Now :: integer()) -> [Node :: binary()].
286 get_expired_nodes(Now) ->
287
:-(
{ok, Results} = q([<<"HGETALL">>, nodes_key()]),
288
:-(
lists:foldl(
289 fun
290 (Val, [{Key} | Acc]) ->
291
:-(
Stamp = binary_to_integer(Val),
292
:-(
case Stamp + expire_after() > Now of
293
:-(
true -> Acc;
294
:-(
false -> [Key | Acc]
295 end;
296 (Key, Acc) ->
297
:-(
[{Key} | Acc]
298 end,
299 [],
300 Results).
301
302 -spec refresh_jids() -> any().
303 refresh_jids() ->
304
:-(
ets:foldl(fun({Jid}, _) -> refresh_jid(Jid) end, [], ?JIDS_ETS).
305
306 -spec refresh_jid(Jid :: binary()) -> any().
307 refresh_jid(Jid) ->
308
:-(
LocalHost = opt(local_host),
309
:-(
q([<<"SET">>, Jid, LocalHost, <<"EX">>, expire_after()]). %% TODO: log error
310
311 -spec refresh_endpoints() -> any().
312 refresh_endpoints() ->
313
:-(
set_endpoints(opt([connections, advertised_endpoints])).
314
315 -spec set_endpoints(Endpoints :: [mod_global_distrib_utils:endpoint()]) -> any().
316 set_endpoints(Endpoints) ->
317
:-(
EndpointsKey = endpoints_key(),
318
:-(
BinEndpoints = lists:map(fun endpoint_to_binary/1, Endpoints),
319
:-(
refresh_set(EndpointsKey, BinEndpoints).
320
321 -spec endpoint_to_binary(mod_global_distrib_utils:endpoint()) -> binary().
322 endpoint_to_binary({IpAddr, Port}) when is_tuple(IpAddr) ->
323
:-(
iolist_to_binary([inet:ntoa(IpAddr), "#", integer_to_binary(Port)]);
324 endpoint_to_binary({Domain, Port}) when is_list(Domain) ->
325
:-(
iolist_to_binary([unicode:characters_to_binary(Domain), "#", integer_to_binary(Port)]).
326
327 -spec binary_to_endpoint(binary()) -> mod_global_distrib_utils:endpoint().
328 binary_to_endpoint(Bin) ->
329
:-(
[Addr, BinPort] = binary:split(Bin, <<"#">>),
330
:-(
{_, IpAddrOrDomain} = mod_global_distrib_utils:parse_address(Addr),
331
:-(
Port = binary_to_integer(BinPort),
332
:-(
{IpAddrOrDomain, Port}.
333
334 -spec refresh_domains() -> any().
335 refresh_domains() ->
336
:-(
LocalHost = opt(local_host),
337
:-(
DomainsKey = domains_key(),
338
:-(
Domains = [Domain || {Domain} <- ets:tab2list(?DOMAINS_ETS)],
339
:-(
refresh_set(DomainsKey, Domains),
340
:-(
lists:foreach(fun(Domain) ->
341
:-(
q([<<"SET">>, Domain, LocalHost, <<"EX">>, expire_after()])
342 end,
343 Domains).
344
345 -spec refresh_public_domains() -> any().
346 refresh_public_domains() ->
347
:-(
DomainsKey = public_domains_key(),
348
:-(
Domains = [Domain || {Domain} <- ets:tab2list(?PUBLIC_DOMAINS_ETS)],
349
:-(
refresh_set(DomainsKey, Domains).
350
351 -spec refresh_set(Key :: binary(), Members :: [binary()]) -> any().
352 refresh_set(Key, Members) ->
353
:-(
q([<<"PERSIST">>, Key]),
354
355
:-(
ToDelete =
356 case catch q([<<"SMEMBERS">>, Key]) of
357 {ok, ExistingMembers} ->
358
:-(
ordsets:subtract(ordsets:from_list(ExistingMembers),
359 ordsets:from_list(Members));
360 _ ->
361
:-(
[]
362 end,
363
364
:-(
case ToDelete of
365
:-(
[] -> ok;
366
:-(
_ -> q([<<"SREM">>, Key | ToDelete])
367 end,
368
369
:-(
case Members of
370
:-(
[] -> ok;
371
:-(
_ -> q([<<"SADD">>, Key | Members])
372 end,
373
:-(
q([<<"EXPIRE">>, Key, expire_after()]).
Line Hits Source