1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mod_global_distrib_mapping_redis). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-behaviour(mod_global_distrib_mapping_backend). |
21 |
|
|
22 |
|
-include("mongoose.hrl"). |
23 |
|
|
24 |
|
-define(JIDS_ETS, mod_global_distrib_mapping_redis_jids). |
25 |
|
-define(DOMAINS_ETS, mod_global_distrib_mapping_redis_domains). |
26 |
|
-define(PUBLIC_DOMAINS_ETS, mod_global_distrib_mapping_redis_public_domains). |
27 |
|
|
28 |
|
-export([start/1, stop/0, |
29 |
|
put_session/1, get_session/1, delete_session/1, |
30 |
|
put_domain/2, get_domain/1, delete_domain/1, |
31 |
|
get_endpoints/1, get_domains/0, get_public_domains/0, get_hosts/0]). |
32 |
|
|
33 |
|
-export([refresh/0, refresh/1]). |
34 |
|
|
35 |
|
-export([init/1, handle_info/2]). |
36 |
|
|
37 |
|
%% Only for debug & tests! |
38 |
|
-export([nodes_key/0, set_endpoints/1]). |
39 |
|
|
40 |
|
-ignore_xref([handle_info/2, init/1, nodes_key/0, refresh/0, refresh/1, set_endpoints/1]). |
41 |
|
|
42 |
|
%%-------------------------------------------------------------------- |
43 |
|
%% API |
44 |
|
%%-------------------------------------------------------------------- |
45 |
|
|
46 |
|
-spec start(gen_mod:module_opts()) -> any(). |
47 |
|
start(#{redis := #{refresh_after := RefreshAfter}}) -> |
48 |
21 |
mod_global_distrib_utils:create_ets([?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]), |
49 |
|
|
50 |
21 |
Refresher = {mod_global_distrib_redis_refresher, |
51 |
|
{gen_server, start_link, [?MODULE, RefreshAfter, []]}, |
52 |
|
permanent, 1000, supervisor, [?MODULE]}, |
53 |
21 |
ejabberd_sup:start_child(Refresher), |
54 |
21 |
ok. |
55 |
|
|
56 |
|
-spec stop() -> ok. |
57 |
|
stop() -> |
58 |
21 |
lists:foreach(fun(Id) -> ejabberd_sup:stop_child(Id) end, |
59 |
|
[?MODULE, mod_global_distrib_redis_refresher]), |
60 |
21 |
[ets:delete(Tab) || Tab <- [?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]], |
61 |
21 |
ok. |
62 |
|
|
63 |
|
-spec put_session(jid:literal_jid()) -> ok. |
64 |
|
put_session(Jid) -> |
65 |
142 |
ets:insert(?JIDS_ETS, {Jid}), |
66 |
142 |
do_put(Jid, opt(local_host)). |
67 |
|
|
68 |
|
-spec get_session(jid:literal_jid()) -> {ok, jid:lserver()} | error. |
69 |
|
get_session(Jid) -> |
70 |
75 |
do_get(Jid). |
71 |
|
|
72 |
|
-spec delete_session(jid:literal_jid()) -> ok. |
73 |
|
delete_session(Jid) -> |
74 |
146 |
ets:delete(?JIDS_ETS, Jid), |
75 |
146 |
do_delete(Jid). |
76 |
|
|
77 |
|
-spec put_domain(jid:lserver(), IsHidden :: boolean()) -> ok. |
78 |
|
put_domain(Domain, IsHidden) -> |
79 |
10 |
ets:insert(?DOMAINS_ETS, {Domain}), |
80 |
10 |
{ok, _} = q([<<"SADD">>, domains_key(), Domain]), |
81 |
10 |
do_put(Domain, opt(local_host)), |
82 |
10 |
case IsHidden of |
83 |
|
false -> |
84 |
9 |
ets:insert(?PUBLIC_DOMAINS_ETS, {Domain}), |
85 |
9 |
{ok, _} = q([<<"SADD">>, public_domains_key(), Domain]), |
86 |
9 |
ok; |
87 |
|
true -> |
88 |
1 |
ok |
89 |
|
end. |
90 |
|
|
91 |
|
-spec get_domain(jid:lserver()) -> {ok, jid:lserver()} | error. |
92 |
|
get_domain(Domain) -> |
93 |
17 |
do_get(Domain). |
94 |
|
|
95 |
|
-spec delete_domain(jid:lserver()) -> ok. |
96 |
|
delete_domain(Domain) -> |
97 |
12 |
ets:delete(?DOMAINS_ETS, Domain), |
98 |
12 |
ets:delete(?PUBLIC_DOMAINS_ETS, Domain), |
99 |
12 |
do_delete(Domain), |
100 |
12 |
{ok, _} = q([<<"SREM">>, domains_key(), Domain]), |
101 |
12 |
ok. |
102 |
|
|
103 |
|
-spec get_domains() -> [jid:lserver()]. |
104 |
|
get_domains() -> |
105 |
3 |
get_domains(fun domains_key/2). |
106 |
|
|
107 |
|
-spec get_public_domains() -> [jid:lserver()]. |
108 |
|
get_public_domains() -> |
109 |
3 |
get_domains(fun public_domains_key/2). |
110 |
|
|
111 |
|
-spec get_endpoints(Host :: jid:lserver()) -> [mod_global_distrib_utils:endpoint()]. |
112 |
|
get_endpoints(Host) -> |
113 |
121 |
Nodes = get_nodes(Host), |
114 |
121 |
get_endpoints_for_nodes(Host, Nodes). |
115 |
|
|
116 |
|
get_endpoints_for_nodes(_Host, []) -> |
117 |
:-( |
[]; |
118 |
|
get_endpoints_for_nodes(Host, Nodes) -> |
119 |
121 |
EndpointKeys = [endpoints_key(Host, Node) || Node <- Nodes], |
120 |
121 |
{ok, BinEndpoints} = q([<<"SUNION">> | EndpointKeys]), |
121 |
121 |
lists:map(fun binary_to_endpoint/1, BinEndpoints). |
122 |
|
|
123 |
|
%%-------------------------------------------------------------------- |
124 |
|
%% gen_server API |
125 |
|
%%-------------------------------------------------------------------- |
126 |
|
|
127 |
|
init(RefreshAfter) -> |
128 |
21 |
refresh_and_schedule_next("initial_autorefresh", RefreshAfter), |
129 |
21 |
{ok, RefreshAfter}. |
130 |
|
|
131 |
|
handle_info(refresh, RefreshAfter) -> |
132 |
:-( |
refresh_and_schedule_next("autorefresh", RefreshAfter), |
133 |
:-( |
{noreply, RefreshAfter}. |
134 |
|
|
135 |
|
refresh() -> |
136 |
:-( |
refresh("reason_unknown"). |
137 |
|
|
138 |
|
refresh_and_schedule_next(Reason, RefreshAfter) -> |
139 |
21 |
Reason2 = Reason ++ ",next_refresh_in=" ++ integer_to_list(RefreshAfter), |
140 |
21 |
refresh(Reason2), |
141 |
21 |
erlang:send_after(timer:seconds(RefreshAfter), self(), refresh). |
142 |
|
|
143 |
|
-spec refresh(Reason :: string()) -> ok. |
144 |
|
refresh(Reason) -> |
145 |
38 |
?LOG_DEBUG(#{what => gd_refreshing_own_hosts}), |
146 |
38 |
refresh_hosts(), |
147 |
38 |
?LOG_DEBUG(#{what => gd_refreshing_own_nodes}), |
148 |
38 |
refresh_nodes(), |
149 |
38 |
?LOG_DEBUG(#{what => gd_refreshing_own_jids}), |
150 |
38 |
refresh_jids(), |
151 |
38 |
?LOG_DEBUG(#{what => gd_refreshing_own_endpoints}), |
152 |
38 |
refresh_endpoints(), |
153 |
38 |
?LOG_DEBUG(#{what => gd_refreshing_own_domains}), |
154 |
38 |
refresh_domains(), |
155 |
38 |
?LOG_DEBUG(#{what => gd_refreshing_own_public_domains}), |
156 |
38 |
refresh_public_domains(), |
157 |
38 |
?LOG_INFO(#{what => gd_refreshing_own_data_done, reason => Reason}), |
158 |
38 |
ok. |
159 |
|
|
160 |
|
%%-------------------------------------------------------------------- |
161 |
|
%% Helpers |
162 |
|
%%-------------------------------------------------------------------- |
163 |
|
|
164 |
|
-spec q(Args :: list()) -> {ok, term()}. |
165 |
|
q(Args) -> |
166 |
1994 |
{ok, Worker} = mongoose_wpool:get_worker(redis, global, pool()), |
167 |
1994 |
case eredis:q(Worker, Args) of |
168 |
|
{ok, _} = OKRes -> |
169 |
1994 |
OKRes; |
170 |
|
Error -> |
171 |
:-( |
?LOG_ERROR(#{what => gd_redis_query_error, worker => Worker, |
172 |
:-( |
redis_query => Args, reason => Error}), |
173 |
:-( |
error(Error) |
174 |
|
end. |
175 |
|
|
176 |
|
-spec nodes_key() -> binary(). |
177 |
|
nodes_key() -> |
178 |
83 |
LocalHost = opt(local_host), |
179 |
83 |
nodes_key(LocalHost). |
180 |
|
|
181 |
|
-spec nodes_key(Host :: jid:lserver()) -> binary(). |
182 |
|
nodes_key(Host) when is_binary(Host) -> |
183 |
216 |
<<Host/binary, "#{nodes}">>. |
184 |
|
|
185 |
|
-spec endpoints_key() -> binary(). |
186 |
|
endpoints_key() -> |
187 |
43 |
key(<<"endpoints">>). |
188 |
|
|
189 |
|
-spec endpoints_key(Host :: jid:lserver(), Node :: binary()) -> binary(). |
190 |
|
endpoints_key(Host, Node) -> |
191 |
149 |
key(Host, Node, <<"endpoints">>). |
192 |
|
|
193 |
|
-spec domains_key() -> binary(). |
194 |
|
domains_key() -> |
195 |
60 |
key(<<"domains">>). |
196 |
|
|
197 |
|
-spec domains_key(Host :: jid:lserver(), Node :: binary()) -> binary(). |
198 |
|
domains_key(Host, Node) -> |
199 |
6 |
key(Host, Node, <<"domains">>). |
200 |
|
|
201 |
|
-spec public_domains_key() -> binary(). |
202 |
|
public_domains_key() -> |
203 |
47 |
key(<<"public_domains">>). |
204 |
|
|
205 |
|
-spec public_domains_key(Host :: jid:lserver(), Node :: binary()) -> binary(). |
206 |
|
public_domains_key(Host, Node) -> |
207 |
7 |
key(Host, Node, <<"public_domains">>). |
208 |
|
|
209 |
|
-spec key(Type :: binary()) -> binary(). |
210 |
|
key(Type) -> |
211 |
150 |
LocalHost = opt(local_host), |
212 |
150 |
Node = atom_to_binary(node(), latin1), |
213 |
150 |
key(LocalHost, Node, Type). |
214 |
|
|
215 |
|
-spec key(Host :: jid:lserver(), Node :: binary(), Type :: binary()) -> binary(). |
216 |
|
key(Host, Node, Type) -> |
217 |
312 |
<<Host/binary, "#", Node/binary, "#{", Type/binary, "}">>. |
218 |
|
|
219 |
|
-spec opt(gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value(). |
220 |
|
opt(Key) -> |
221 |
3001 |
mod_global_distrib_utils:opt(mod_global_distrib_mapping, Key). |
222 |
|
|
223 |
|
-spec expire_after() -> pos_integer(). |
224 |
|
expire_after() -> |
225 |
338 |
opt([redis, expire_after]). |
226 |
|
|
227 |
|
-spec pool() -> atom(). |
228 |
|
pool() -> |
229 |
1994 |
opt([redis, pool]). |
230 |
|
|
231 |
|
-spec do_put(Key :: binary(), Host :: binary()) -> ok. |
232 |
|
do_put(Key, Host) -> |
233 |
152 |
{ok, _} = q([<<"SET">>, Key, Host, <<"EX">>, expire_after()]), |
234 |
152 |
ok. |
235 |
|
|
236 |
|
-spec do_get(Key :: binary()) -> {ok, Host :: binary()} | error. |
237 |
|
do_get(Key) -> |
238 |
92 |
case q([<<"GET">>, Key]) of |
239 |
50 |
{ok, undefined} -> error; |
240 |
42 |
{ok, Host} -> {ok, Host} |
241 |
|
end. |
242 |
|
|
243 |
|
-spec do_delete(Key :: binary()) -> ok. |
244 |
|
do_delete(Key) -> |
245 |
158 |
LocalHost = opt(local_host), |
246 |
158 |
case q([<<"GET">>, Key]) of |
247 |
149 |
{ok, LocalHost} -> {ok, _} = q([<<"DEL">>, Key]); |
248 |
9 |
_ -> ok |
249 |
|
end, |
250 |
158 |
ok. |
251 |
|
|
252 |
|
-spec get_domains(KeyFun :: fun((Host :: binary(), Node :: binary()) -> Key :: binary())) -> |
253 |
|
[jid:lserver()]. |
254 |
|
get_domains(KeyFun) -> |
255 |
6 |
Hosts = get_hosts(), |
256 |
6 |
Nodes = lists:flatmap(fun(Host) -> [{Host, Node} || Node <- get_nodes(Host)] end, Hosts), |
257 |
6 |
Keys = [KeyFun(Host, Node) || {Host, Node} <- Nodes], |
258 |
6 |
{ok, Domains} = q([<<"SUNION">> | Keys]), |
259 |
6 |
Domains. |
260 |
|
|
261 |
|
-spec refresh_hosts() -> any(). |
262 |
|
refresh_hosts() -> |
263 |
38 |
q([<<"SADD">>, <<"hosts">>, opt(local_host)]). |
264 |
|
|
265 |
|
-spec get_hosts() -> [Host :: jid:lserver()]. |
266 |
|
get_hosts() -> |
267 |
613 |
{ok, Hosts} = q([<<"SMEMBERS">>, <<"hosts">>]), |
268 |
613 |
Hosts. |
269 |
|
|
270 |
|
-spec refresh_nodes() -> any(). |
271 |
|
refresh_nodes() -> |
272 |
38 |
NodesKey = nodes_key(), |
273 |
38 |
Now = erlang:system_time(second), |
274 |
38 |
case get_expired_nodes(Now) of |
275 |
37 |
[] -> ok; |
276 |
1 |
ExpiredNodes -> q([<<"HDEL">>, NodesKey | ExpiredNodes]) |
277 |
|
end, |
278 |
38 |
q([<<"HSET">>, NodesKey, atom_to_binary(node(), latin1), Now]). |
279 |
|
|
280 |
|
-spec get_nodes(Host :: jid:lserver()) -> [Node :: binary()]. |
281 |
|
get_nodes(Host) -> |
282 |
133 |
{ok, Nodes} = q([<<"HKEYS">>, nodes_key(Host)]), |
283 |
133 |
Nodes. |
284 |
|
|
285 |
|
-spec get_expired_nodes(Now :: integer()) -> [Node :: binary()]. |
286 |
|
get_expired_nodes(Now) -> |
287 |
38 |
{ok, Results} = q([<<"HGETALL">>, nodes_key()]), |
288 |
38 |
lists:foldl( |
289 |
|
fun |
290 |
|
(Val, [{Key} | Acc]) -> |
291 |
54 |
Stamp = binary_to_integer(Val), |
292 |
54 |
case Stamp + expire_after() > Now of |
293 |
53 |
true -> Acc; |
294 |
1 |
false -> [Key | Acc] |
295 |
|
end; |
296 |
|
(Key, Acc) -> |
297 |
54 |
[{Key} | Acc] |
298 |
|
end, |
299 |
|
[], |
300 |
|
Results). |
301 |
|
|
302 |
|
-spec refresh_jids() -> any(). |
303 |
|
refresh_jids() -> |
304 |
38 |
ets:foldl(fun({Jid}, _) -> refresh_jid(Jid) end, [], ?JIDS_ETS). |
305 |
|
|
306 |
|
-spec refresh_jid(Jid :: binary()) -> any(). |
307 |
|
refresh_jid(Jid) -> |
308 |
12 |
LocalHost = opt(local_host), |
309 |
12 |
q([<<"SET">>, Jid, LocalHost, <<"EX">>, expire_after()]). %% TODO: log error |
310 |
|
|
311 |
|
-spec refresh_endpoints() -> any(). |
312 |
|
refresh_endpoints() -> |
313 |
38 |
set_endpoints(opt([connections, advertised_endpoints])). |
314 |
|
|
315 |
|
-spec set_endpoints(Endpoints :: [mod_global_distrib_utils:endpoint()]) -> any(). |
316 |
|
set_endpoints(Endpoints) -> |
317 |
43 |
EndpointsKey = endpoints_key(), |
318 |
43 |
BinEndpoints = lists:map(fun endpoint_to_binary/1, Endpoints), |
319 |
43 |
refresh_set(EndpointsKey, BinEndpoints). |
320 |
|
|
321 |
|
-spec endpoint_to_binary(mod_global_distrib_utils:endpoint()) -> binary(). |
322 |
|
endpoint_to_binary({IpAddr, Port}) when is_tuple(IpAddr) -> |
323 |
:-( |
iolist_to_binary([inet:ntoa(IpAddr), "#", integer_to_binary(Port)]); |
324 |
|
endpoint_to_binary({Domain, Port}) when is_list(Domain) -> |
325 |
43 |
iolist_to_binary([unicode:characters_to_binary(Domain), "#", integer_to_binary(Port)]). |
326 |
|
|
327 |
|
-spec binary_to_endpoint(binary()) -> mod_global_distrib_utils:endpoint(). |
328 |
|
binary_to_endpoint(Bin) -> |
329 |
148 |
[Addr, BinPort] = binary:split(Bin, <<"#">>), |
330 |
148 |
{_, IpAddrOrDomain} = mod_global_distrib_utils:parse_address(Addr), |
331 |
148 |
Port = binary_to_integer(BinPort), |
332 |
148 |
{IpAddrOrDomain, Port}. |
333 |
|
|
334 |
|
-spec refresh_domains() -> any(). |
335 |
|
refresh_domains() -> |
336 |
38 |
LocalHost = opt(local_host), |
337 |
38 |
DomainsKey = domains_key(), |
338 |
38 |
Domains = [Domain || {Domain} <- ets:tab2list(?DOMAINS_ETS)], |
339 |
38 |
refresh_set(DomainsKey, Domains), |
340 |
38 |
lists:foreach(fun(Domain) -> |
341 |
1 |
q([<<"SET">>, Domain, LocalHost, <<"EX">>, expire_after()]) |
342 |
|
end, |
343 |
|
Domains). |
344 |
|
|
345 |
|
-spec refresh_public_domains() -> any(). |
346 |
|
refresh_public_domains() -> |
347 |
38 |
DomainsKey = public_domains_key(), |
348 |
38 |
Domains = [Domain || {Domain} <- ets:tab2list(?PUBLIC_DOMAINS_ETS)], |
349 |
38 |
refresh_set(DomainsKey, Domains). |
350 |
|
|
351 |
|
-spec refresh_set(Key :: binary(), Members :: [binary()]) -> any(). |
352 |
|
refresh_set(Key, Members) -> |
353 |
119 |
q([<<"PERSIST">>, Key]), |
354 |
|
|
355 |
119 |
ToDelete = |
356 |
|
case catch q([<<"SMEMBERS">>, Key]) of |
357 |
|
{ok, ExistingMembers} -> |
358 |
119 |
ordsets:subtract(ordsets:from_list(ExistingMembers), |
359 |
|
ordsets:from_list(Members)); |
360 |
|
_ -> |
361 |
:-( |
[] |
362 |
|
end, |
363 |
|
|
364 |
119 |
case ToDelete of |
365 |
108 |
[] -> ok; |
366 |
11 |
_ -> q([<<"SREM">>, Key | ToDelete]) |
367 |
|
end, |
368 |
|
|
369 |
119 |
case Members of |
370 |
76 |
[] -> ok; |
371 |
43 |
_ -> q([<<"SADD">>, Key | Members]) |
372 |
|
end, |
373 |
119 |
q([<<"EXPIRE">>, Key, expire_after()]). |