1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mod_global_distrib_mapping_redis). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-behaviour(mod_global_distrib_mapping). |
21 |
|
|
22 |
|
-include("mongoose.hrl"). |
23 |
|
|
24 |
|
-define(JIDS_ETS, mod_global_distrib_mapping_redis_jids). |
25 |
|
-define(DOMAINS_ETS, mod_global_distrib_mapping_redis_domains). |
26 |
|
-define(PUBLIC_DOMAINS_ETS, mod_global_distrib_mapping_redis_public_domains). |
27 |
|
|
28 |
|
-export([start/1, stop/0, |
29 |
|
put_session/1, get_session/1, delete_session/1, |
30 |
|
put_domain/2, get_domain/1, delete_domain/1, |
31 |
|
get_endpoints/1, get_domains/0, get_public_domains/0, get_hosts/0]). |
32 |
|
|
33 |
|
-export([refresh/0, refresh/1]). |
34 |
|
|
35 |
|
-export([init/1, handle_info/2]). |
36 |
|
|
37 |
|
%% Only for debug & tests! |
38 |
|
-export([nodes_key/0, set_endpoints/1]). |
39 |
|
|
40 |
|
-ignore_xref([handle_info/2, init/1, nodes_key/0, refresh/0, refresh/1, set_endpoints/1]). |
41 |
|
|
42 |
|
%%-------------------------------------------------------------------- |
43 |
|
%% API |
44 |
|
%%-------------------------------------------------------------------- |
45 |
|
|
46 |
|
-spec start(proplists:proplist()) -> any(). |
47 |
|
start(Opts) -> |
48 |
21 |
RefreshAfter = proplists:get_value(refresh_after, Opts, 60), |
49 |
|
|
50 |
21 |
mod_global_distrib_utils:create_ets([?MODULE, ?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]), |
51 |
21 |
ExpireAfter = proplists:get_value(expire_after, Opts, 120), |
52 |
21 |
ets:insert(?MODULE, {expire_after, ExpireAfter}), |
53 |
21 |
PoolTag = proplists:get_value(pool, Opts, global_distrib), |
54 |
21 |
ets:insert(?MODULE, {pool, PoolTag}), |
55 |
|
|
56 |
21 |
Refresher = {mod_global_distrib_redis_refresher, |
57 |
|
{gen_server, start_link, [?MODULE, RefreshAfter, []]}, |
58 |
|
permanent, 1000, supervisor, [?MODULE]}, |
59 |
21 |
ejabberd_sup:start_child(Refresher), |
60 |
21 |
ok. |
61 |
|
|
62 |
|
-spec stop() -> any(). |
63 |
|
stop() -> |
64 |
21 |
lists:foreach( |
65 |
|
fun(Id) -> |
66 |
42 |
ejabberd_sup:stop_child(Id) |
67 |
|
end, |
68 |
|
[?MODULE, mod_global_distrib_redis_refresher]), |
69 |
21 |
[ets:delete(Tab) || Tab <- [?MODULE, ?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]]. |
70 |
|
|
71 |
|
-spec put_session(Jid :: binary()) -> ok. |
72 |
|
put_session(Jid) -> |
73 |
142 |
ets:insert(?JIDS_ETS, {Jid}), |
74 |
142 |
do_put(Jid, opt(local_host)). |
75 |
|
|
76 |
|
-spec get_session(Jid :: binary()) -> {ok, Host :: binary()} | error. |
77 |
|
get_session(Jid) -> |
78 |
79 |
do_get(Jid). |
79 |
|
|
80 |
|
-spec delete_session(Jid :: binary()) -> ok. |
81 |
|
delete_session(Jid) -> |
82 |
144 |
ets:delete(?JIDS_ETS, Jid), |
83 |
144 |
do_delete(Jid). |
84 |
|
|
85 |
|
-spec put_domain(Domain :: binary(), IsHidden :: boolean()) -> ok. |
86 |
|
put_domain(Domain, IsHidden) -> |
87 |
12 |
ets:insert(?DOMAINS_ETS, {Domain}), |
88 |
12 |
{ok, _} = q([<<"SADD">>, domains_key(), Domain]), |
89 |
12 |
do_put(Domain, opt(local_host)), |
90 |
12 |
case IsHidden of |
91 |
|
false -> |
92 |
11 |
ets:insert(?PUBLIC_DOMAINS_ETS, {Domain}), |
93 |
11 |
{ok, _} = q([<<"SADD">>, public_domains_key(), Domain]), |
94 |
11 |
ok; |
95 |
|
true -> |
96 |
1 |
ok |
97 |
|
end. |
98 |
|
|
99 |
|
-spec get_domain(Domain :: binary()) -> {ok, Host :: binary()} | error. |
100 |
|
get_domain(Domain) -> |
101 |
17 |
do_get(Domain). |
102 |
|
|
103 |
|
-spec delete_domain(Domain :: binary()) -> ok. |
104 |
|
delete_domain(Domain) -> |
105 |
12 |
ets:delete(?DOMAINS_ETS, Domain), |
106 |
12 |
ets:delete(?PUBLIC_DOMAINS_ETS, Domain), |
107 |
12 |
do_delete(Domain), |
108 |
12 |
{ok, _} = q([<<"SREM">>, domains_key(), Domain]), |
109 |
12 |
ok. |
110 |
|
|
111 |
|
-spec get_domains() -> {ok, [Domain :: binary()]}. |
112 |
|
get_domains() -> |
113 |
3 |
get_domains(fun domains_key/2). |
114 |
|
|
115 |
|
-spec get_public_domains() -> {ok, [Domain :: binary()]}. |
116 |
|
get_public_domains() -> |
117 |
3 |
get_domains(fun public_domains_key/2). |
118 |
|
|
119 |
|
-spec get_endpoints(Host :: jid:lserver()) -> {ok, [mod_global_distrib_utils:endpoint()]}. |
120 |
|
get_endpoints(Host) -> |
121 |
84 |
Nodes = get_nodes(Host), |
122 |
84 |
get_endpoints_for_nodes(Host, Nodes). |
123 |
|
|
124 |
|
get_endpoints_for_nodes(_Host, []) -> |
125 |
:-( |
{ok, []}; |
126 |
|
get_endpoints_for_nodes(Host, Nodes) -> |
127 |
84 |
EndpointKeys = [endpoints_key(Host, Node) || Node <- Nodes], |
128 |
84 |
{ok, BinEndpoints} = q([<<"SUNION">> | EndpointKeys]), |
129 |
84 |
{ok, lists:map(fun binary_to_endpoint/1, BinEndpoints)}. |
130 |
|
|
131 |
|
%%-------------------------------------------------------------------- |
132 |
|
%% gen_server API |
133 |
|
%%-------------------------------------------------------------------- |
134 |
|
|
135 |
|
init(RefreshAfter) -> |
136 |
21 |
refresh_and_schedule_next("initial_autorefresh", RefreshAfter), |
137 |
21 |
{ok, RefreshAfter}. |
138 |
|
|
139 |
|
handle_info(refresh, RefreshAfter) -> |
140 |
:-( |
refresh_and_schedule_next("autorefresh", RefreshAfter), |
141 |
:-( |
{noreply, RefreshAfter}. |
142 |
|
|
143 |
|
refresh() -> |
144 |
:-( |
refresh("reason_unknown"). |
145 |
|
|
146 |
|
refresh_and_schedule_next(Reason, RefreshAfter) -> |
147 |
21 |
Reason2 = Reason ++ ",next_refresh_in=" ++ integer_to_list(RefreshAfter), |
148 |
21 |
refresh(Reason2), |
149 |
21 |
erlang:send_after(timer:seconds(RefreshAfter), self(), refresh). |
150 |
|
|
151 |
|
-spec refresh(Reason :: string()) -> ok. |
152 |
|
refresh(Reason) -> |
153 |
40 |
?LOG_DEBUG(#{what => gd_refreshing_own_hosts}), |
154 |
40 |
refresh_hosts(), |
155 |
40 |
?LOG_DEBUG(#{what => gd_refreshing_own_nodes}), |
156 |
40 |
refresh_nodes(), |
157 |
40 |
?LOG_DEBUG(#{what => gd_refreshing_own_jids}), |
158 |
40 |
refresh_jids(), |
159 |
40 |
?LOG_DEBUG(#{what => gd_refreshing_own_endpoints}), |
160 |
40 |
refresh_endpoints(), |
161 |
40 |
?LOG_DEBUG(#{what => gd_refreshing_own_domains}), |
162 |
40 |
refresh_domains(), |
163 |
40 |
?LOG_DEBUG(#{what => gd_refreshing_own_public_domains}), |
164 |
40 |
refresh_public_domains(), |
165 |
40 |
?LOG_INFO(#{what => gd_refreshing_own_data_done, reason => Reason}), |
166 |
40 |
ok. |
167 |
|
|
168 |
|
%%-------------------------------------------------------------------- |
169 |
|
%% Helpers |
170 |
|
%%-------------------------------------------------------------------- |
171 |
|
|
172 |
|
-spec q(Args :: list()) -> {ok, term()}. |
173 |
|
q(Args) -> |
174 |
2374 |
{ok, Worker} = mongoose_wpool:get_worker(redis, global, pool()), |
175 |
2374 |
case eredis:q(Worker, Args) of |
176 |
|
{ok, _} = OKRes -> |
177 |
2374 |
OKRes; |
178 |
|
Error -> |
179 |
:-( |
?LOG_ERROR(#{what => gd_redis_query_error, worker => Worker, |
180 |
:-( |
redis_query => Args, reason => Error}), |
181 |
:-( |
error(Error) |
182 |
|
end. |
183 |
|
|
184 |
|
-spec nodes_key() -> binary(). |
185 |
|
nodes_key() -> |
186 |
87 |
LocalHost = opt(local_host), |
187 |
87 |
nodes_key(LocalHost). |
188 |
|
|
189 |
|
-spec nodes_key(Host :: jid:lserver()) -> binary(). |
190 |
|
nodes_key(Host) when is_binary(Host) -> |
191 |
183 |
<<Host/binary, "#{nodes}">>. |
192 |
|
|
193 |
|
-spec endpoints_key() -> binary(). |
194 |
|
endpoints_key() -> |
195 |
43 |
key(<<"endpoints">>). |
196 |
|
|
197 |
|
-spec endpoints_key(Host :: jid:lserver(), Node :: binary()) -> binary(). |
198 |
|
endpoints_key(Host, Node) -> |
199 |
106 |
key(Host, Node, <<"endpoints">>). |
200 |
|
|
201 |
|
-spec domains_key() -> binary(). |
202 |
|
domains_key() -> |
203 |
64 |
key(<<"domains">>). |
204 |
|
|
205 |
|
-spec domains_key(Host :: jid:lserver(), Node :: binary()) -> binary(). |
206 |
|
domains_key(Host, Node) -> |
207 |
6 |
key(Host, Node, <<"domains">>). |
208 |
|
|
209 |
|
-spec public_domains_key() -> binary(). |
210 |
|
public_domains_key() -> |
211 |
51 |
key(<<"public_domains">>). |
212 |
|
|
213 |
|
-spec public_domains_key(Host :: jid:lserver(), Node :: binary()) -> binary(). |
214 |
|
public_domains_key(Host, Node) -> |
215 |
7 |
key(Host, Node, <<"public_domains">>). |
216 |
|
|
217 |
|
-spec key(Type :: binary()) -> binary(). |
218 |
|
key(Type) -> |
219 |
158 |
LocalHost = opt(local_host), |
220 |
158 |
Node = atom_to_binary(node(), latin1), |
221 |
158 |
key(LocalHost, Node, Type). |
222 |
|
|
223 |
|
-spec key(Host :: jid:lserver(), Node :: binary(), Type :: binary()) -> binary(). |
224 |
|
key(Host, Node, Type) -> |
225 |
277 |
<<Host/binary, "#", Node/binary, "#{", Type/binary, "}">>. |
226 |
|
|
227 |
|
-spec opt(Key :: atom()) -> term(). |
228 |
|
opt(Key) -> |
229 |
687 |
mod_global_distrib_utils:opt(mod_global_distrib_mapping, Key). |
230 |
|
|
231 |
|
-spec expire_after() -> pos_integer(). |
232 |
|
expire_after() -> |
233 |
352 |
ets:lookup_element(?MODULE, expire_after, 2). |
234 |
|
|
235 |
|
-spec pool() -> atom(). |
236 |
|
pool() -> |
237 |
2374 |
ets:lookup_element(?MODULE, pool, 2). |
238 |
|
|
239 |
|
-spec do_put(Key :: binary(), Host :: binary()) -> ok. |
240 |
|
do_put(Key, Host) -> |
241 |
154 |
{ok, _} = q([<<"SET">>, Key, Host, <<"EX">>, expire_after()]), |
242 |
154 |
ok. |
243 |
|
|
244 |
|
-spec do_get(Key :: binary()) -> {ok, Host :: binary()} | error. |
245 |
|
do_get(Key) -> |
246 |
96 |
case q([<<"GET">>, Key]) of |
247 |
42 |
{ok, undefined} -> error; |
248 |
54 |
{ok, Host} -> {ok, Host} |
249 |
|
end. |
250 |
|
|
251 |
|
-spec do_delete(Key :: binary()) -> ok. |
252 |
|
do_delete(Key) -> |
253 |
156 |
LocalHost = opt(local_host), |
254 |
156 |
case q([<<"GET">>, Key]) of |
255 |
149 |
{ok, LocalHost} -> {ok, _} = q([<<"DEL">>, Key]); |
256 |
7 |
_ -> ok |
257 |
|
end, |
258 |
156 |
ok. |
259 |
|
|
260 |
|
-spec get_domains(KeyFun :: fun((Host :: binary(), Node :: binary()) -> Key :: binary())) -> |
261 |
|
{ok, Domains :: [binary()]}. |
262 |
|
get_domains(KeyFun) -> |
263 |
6 |
Hosts = get_hosts(), |
264 |
6 |
Nodes = lists:flatmap(fun(Host) -> [{Host, Node} || Node <- get_nodes(Host)] end, Hosts), |
265 |
6 |
Keys = [KeyFun(Host, Node) || {Host, Node} <- Nodes], |
266 |
6 |
{ok, _Domains} = q([<<"SUNION">> | Keys]). |
267 |
|
|
268 |
|
-spec refresh_hosts() -> any(). |
269 |
|
refresh_hosts() -> |
270 |
40 |
q([<<"SADD">>, <<"hosts">>, opt(local_host)]). |
271 |
|
|
272 |
|
-spec get_hosts() -> [Host :: jid:lserver()]. |
273 |
|
get_hosts() -> |
274 |
1019 |
{ok, Hosts} = q([<<"SMEMBERS">>, <<"hosts">>]), |
275 |
1019 |
Hosts. |
276 |
|
|
277 |
|
-spec refresh_nodes() -> any(). |
278 |
|
refresh_nodes() -> |
279 |
40 |
NodesKey = nodes_key(), |
280 |
40 |
Now = erlang:system_time(second), |
281 |
40 |
case get_expired_nodes(Now) of |
282 |
39 |
[] -> ok; |
283 |
1 |
ExpiredNodes -> q([<<"HDEL">>, NodesKey | ExpiredNodes]) |
284 |
|
end, |
285 |
40 |
q([<<"HSET">>, NodesKey, atom_to_binary(node(), latin1), Now]). |
286 |
|
|
287 |
|
-spec get_nodes(Host :: jid:lserver()) -> [Node :: binary()]. |
288 |
|
get_nodes(Host) -> |
289 |
96 |
{ok, Nodes} = q([<<"HKEYS">>, nodes_key(Host)]), |
290 |
96 |
Nodes. |
291 |
|
|
292 |
|
-spec get_expired_nodes(Now :: integer()) -> [Node :: binary()]. |
293 |
|
get_expired_nodes(Now) -> |
294 |
40 |
{ok, Results} = q([<<"HGETALL">>, nodes_key()]), |
295 |
40 |
lists:foldl( |
296 |
|
fun |
297 |
|
(Val, [{Key} | Acc]) -> |
298 |
56 |
Stamp = binary_to_integer(Val), |
299 |
56 |
case Stamp + expire_after() > Now of |
300 |
55 |
true -> Acc; |
301 |
1 |
false -> [Key | Acc] |
302 |
|
end; |
303 |
|
(Key, Acc) -> |
304 |
56 |
[{Key} | Acc] |
305 |
|
end, |
306 |
|
[], |
307 |
|
Results). |
308 |
|
|
309 |
|
-spec refresh_jids() -> any(). |
310 |
|
refresh_jids() -> |
311 |
40 |
ets:foldl(fun({Jid}, _) -> refresh_jid(Jid) end, [], ?JIDS_ETS). |
312 |
|
|
313 |
|
-spec refresh_jid(Jid :: binary()) -> any(). |
314 |
|
refresh_jid(Jid) -> |
315 |
12 |
LocalHost = opt(local_host), |
316 |
12 |
q([<<"SET">>, Jid, LocalHost, <<"EX">>, expire_after()]). %% TODO: log error |
317 |
|
|
318 |
|
-spec refresh_endpoints() -> any(). |
319 |
|
refresh_endpoints() -> |
320 |
40 |
AdvertisedEndpoints = opt(advertised_endpoints), |
321 |
40 |
LocalEndpoints = mod_global_distrib_receiver:endpoints(), |
322 |
40 |
FinalEndpoints = case AdvertisedEndpoints of |
323 |
39 |
false -> LocalEndpoints; |
324 |
1 |
Endpoints -> Endpoints |
325 |
|
end, |
326 |
40 |
set_endpoints(FinalEndpoints). |
327 |
|
|
328 |
|
-spec set_endpoints(Endpoints :: [mod_global_distrib_utils:endpoint()]) -> any(). |
329 |
|
set_endpoints(Endpoints) -> |
330 |
43 |
EndpointsKey = endpoints_key(), |
331 |
43 |
BinEndpoints = lists:map(fun endpoint_to_binary/1, Endpoints), |
332 |
43 |
refresh_set(EndpointsKey, BinEndpoints). |
333 |
|
|
334 |
|
-spec endpoint_to_binary(mod_global_distrib_utils:endpoint()) -> binary(). |
335 |
|
endpoint_to_binary({IpAddr, Port}) when is_tuple(IpAddr) -> |
336 |
42 |
iolist_to_binary([inet:ntoa(IpAddr), "#", integer_to_binary(Port)]); |
337 |
|
endpoint_to_binary({Domain, Port}) when is_list(Domain) -> |
338 |
1 |
iolist_to_binary([unicode:characters_to_binary(Domain), "#", integer_to_binary(Port)]). |
339 |
|
|
340 |
|
-spec binary_to_endpoint(binary()) -> mod_global_distrib_utils:endpoint(). |
341 |
|
binary_to_endpoint(Bin) -> |
342 |
106 |
[Addr, BinPort] = binary:split(Bin, <<"#">>), |
343 |
106 |
{_, IpAddrOrDomain} = mod_global_distrib_utils:parse_address(Addr), |
344 |
106 |
Port = binary_to_integer(BinPort), |
345 |
106 |
{IpAddrOrDomain, Port}. |
346 |
|
|
347 |
|
-spec refresh_domains() -> any(). |
348 |
|
refresh_domains() -> |
349 |
40 |
LocalHost = opt(local_host), |
350 |
40 |
DomainsKey = domains_key(), |
351 |
40 |
Domains = [Domain || {Domain} <- ets:tab2list(?DOMAINS_ETS)], |
352 |
40 |
refresh_set(DomainsKey, Domains), |
353 |
40 |
lists:foreach(fun(Domain) -> |
354 |
7 |
q([<<"SET">>, Domain, LocalHost, <<"EX">>, expire_after()]) |
355 |
|
end, |
356 |
|
Domains). |
357 |
|
|
358 |
|
-spec refresh_public_domains() -> any(). |
359 |
|
refresh_public_domains() -> |
360 |
40 |
DomainsKey = public_domains_key(), |
361 |
40 |
Domains = [Domain || {Domain} <- ets:tab2list(?PUBLIC_DOMAINS_ETS)], |
362 |
40 |
refresh_set(DomainsKey, Domains). |
363 |
|
|
364 |
|
-spec refresh_set(Key :: binary(), Members :: [binary()]) -> any(). |
365 |
|
refresh_set(Key, Members) -> |
366 |
123 |
q([<<"PERSIST">>, Key]), |
367 |
|
|
368 |
123 |
ToDelete = |
369 |
|
case catch q([<<"SMEMBERS">>, Key]) of |
370 |
|
{ok, ExistingMembers} -> |
371 |
123 |
ordsets:subtract(ordsets:from_list(ExistingMembers), |
372 |
|
ordsets:from_list(Members)); |
373 |
|
_ -> |
374 |
:-( |
[] |
375 |
|
end, |
376 |
|
|
377 |
123 |
case ToDelete of |
378 |
108 |
[] -> ok; |
379 |
15 |
_ -> q([<<"SREM">>, Key | ToDelete]) |
380 |
|
end, |
381 |
|
|
382 |
123 |
case Members of |
383 |
68 |
[] -> ok; |
384 |
55 |
_ -> q([<<"SADD">>, Key | Members]) |
385 |
|
end, |
386 |
123 |
q([<<"EXPIRE">>, Key, expire_after()]). |