./ct_report/coverage/mod_global_distrib_mapping_redis.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_mapping_redis).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(mod_global_distrib_mapping_backend).
21
22 -include("mongoose.hrl").
23
24 -define(JIDS_ETS, mod_global_distrib_mapping_redis_jids).
25 -define(DOMAINS_ETS, mod_global_distrib_mapping_redis_domains).
26 -define(PUBLIC_DOMAINS_ETS, mod_global_distrib_mapping_redis_public_domains).
27
28 -export([start/1, stop/0,
29 put_session/1, get_session/1, delete_session/1,
30 put_domain/2, get_domain/1, delete_domain/1,
31 get_endpoints/1, get_domains/0, get_public_domains/0, get_hosts/0]).
32
33 -export([refresh/0, refresh/1]).
34
35 -export([init/1, handle_info/2]).
36
37 %% Only for debug & tests!
38 -export([nodes_key/0, set_endpoints/1]).
39
40 -ignore_xref([handle_info/2, init/1, nodes_key/0, refresh/0, refresh/1, set_endpoints/1]).
41
42 %%--------------------------------------------------------------------
43 %% API
44 %%--------------------------------------------------------------------
45
46 -spec start(gen_mod:module_opts()) -> any().
47 start(#{redis := #{refresh_after := RefreshAfter}}) ->
48 21 mod_global_distrib_utils:create_ets([?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]),
49
50 21 Refresher = {mod_global_distrib_redis_refresher,
51 {gen_server, start_link, [?MODULE, RefreshAfter, []]},
52 permanent, 1000, supervisor, [?MODULE]},
53 21 ejabberd_sup:start_child(Refresher),
54 21 ok.
55
56 -spec stop() -> ok.
57 stop() ->
58 21 lists:foreach(fun(Id) -> ejabberd_sup:stop_child(Id) end,
59 [?MODULE, mod_global_distrib_redis_refresher]),
60 21 [ets:delete(Tab) || Tab <- [?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]],
61 21 ok.
62
63 -spec put_session(jid:literal_jid()) -> ok.
64 put_session(Jid) ->
65 142 ets:insert(?JIDS_ETS, {Jid}),
66 142 do_put(Jid, opt(local_host)).
67
68 -spec get_session(jid:literal_jid()) -> {ok, jid:lserver()} | error.
69 get_session(Jid) ->
70 80 do_get(Jid).
71
72 -spec delete_session(jid:literal_jid()) -> ok.
73 delete_session(Jid) ->
74 146 ets:delete(?JIDS_ETS, Jid),
75 146 do_delete(Jid).
76
77 -spec put_domain(jid:lserver(), IsHidden :: boolean()) -> ok.
78 put_domain(Domain, IsHidden) ->
79 10 ets:insert(?DOMAINS_ETS, {Domain}),
80 10 {ok, _} = q([<<"SADD">>, domains_key(), Domain]),
81 10 do_put(Domain, opt(local_host)),
82 10 case IsHidden of
83 false ->
84 9 ets:insert(?PUBLIC_DOMAINS_ETS, {Domain}),
85 9 {ok, _} = q([<<"SADD">>, public_domains_key(), Domain]),
86 9 ok;
87 true ->
88 1 ok
89 end.
90
91 -spec get_domain(jid:lserver()) -> {ok, jid:lserver()} | error.
92 get_domain(Domain) ->
93 17 do_get(Domain).
94
95 -spec delete_domain(jid:lserver()) -> ok.
96 delete_domain(Domain) ->
97 12 ets:delete(?DOMAINS_ETS, Domain),
98 12 ets:delete(?PUBLIC_DOMAINS_ETS, Domain),
99 12 do_delete(Domain),
100 12 {ok, _} = q([<<"SREM">>, domains_key(), Domain]),
101 12 ok.
102
103 -spec get_domains() -> [jid:lserver()].
104 get_domains() ->
105 3 get_domains(fun domains_key/2).
106
107 -spec get_public_domains() -> [jid:lserver()].
108 get_public_domains() ->
109 3 get_domains(fun public_domains_key/2).
110
111 -spec get_endpoints(Host :: jid:lserver()) -> [mod_global_distrib_utils:endpoint()].
112 get_endpoints(Host) ->
113 93 Nodes = get_nodes(Host),
114 93 get_endpoints_for_nodes(Host, Nodes).
115
116 get_endpoints_for_nodes(_Host, []) ->
117
:-(
[];
118 get_endpoints_for_nodes(Host, Nodes) ->
119 93 EndpointKeys = [endpoints_key(Host, Node) || Node <- Nodes],
120 93 {ok, BinEndpoints} = q([<<"SUNION">> | EndpointKeys]),
121 93 lists:map(fun binary_to_endpoint/1, BinEndpoints).
122
123 %%--------------------------------------------------------------------
124 %% gen_server API
125 %%--------------------------------------------------------------------
126
127 init(RefreshAfter) ->
128 21 refresh_and_schedule_next("initial_autorefresh", RefreshAfter),
129 21 {ok, RefreshAfter}.
130
131 handle_info(refresh, RefreshAfter) ->
132
:-(
refresh_and_schedule_next("autorefresh", RefreshAfter),
133
:-(
{noreply, RefreshAfter}.
134
135 refresh() ->
136
:-(
refresh("reason_unknown").
137
138 refresh_and_schedule_next(Reason, RefreshAfter) ->
139 21 Reason2 = Reason ++ ",next_refresh_in=" ++ integer_to_list(RefreshAfter),
140 21 refresh(Reason2),
141 21 erlang:send_after(timer:seconds(RefreshAfter), self(), refresh).
142
143 -spec refresh(Reason :: string()) -> ok.
144 refresh(Reason) ->
145 38 ?LOG_DEBUG(#{what => gd_refreshing_own_hosts}),
146 38 refresh_hosts(),
147 38 ?LOG_DEBUG(#{what => gd_refreshing_own_nodes}),
148 38 refresh_nodes(),
149 38 ?LOG_DEBUG(#{what => gd_refreshing_own_jids}),
150 38 refresh_jids(),
151 38 ?LOG_DEBUG(#{what => gd_refreshing_own_endpoints}),
152 38 refresh_endpoints(),
153 38 ?LOG_DEBUG(#{what => gd_refreshing_own_domains}),
154 38 refresh_domains(),
155 38 ?LOG_DEBUG(#{what => gd_refreshing_own_public_domains}),
156 38 refresh_public_domains(),
157 38 ?LOG_INFO(#{what => gd_refreshing_own_data_done, reason => Reason}),
158 38 ok.
159
160 %%--------------------------------------------------------------------
161 %% Helpers
162 %%--------------------------------------------------------------------
163
164 -spec q(Args :: list()) -> {ok, term()}.
165 q(Args) ->
166 1895 {ok, Worker} = mongoose_wpool:get_worker(redis, global, pool()),
167 1895 case eredis:q(Worker, Args) of
168 {ok, _} = OKRes ->
169 1895 OKRes;
170 Error ->
171
:-(
?LOG_ERROR(#{what => gd_redis_query_error, worker => Worker,
172
:-(
redis_query => Args, reason => Error}),
173
:-(
error(Error)
174 end.
175
176 -spec nodes_key() -> binary().
177 nodes_key() ->
178 83 LocalHost = opt(local_host),
179 83 nodes_key(LocalHost).
180
181 -spec nodes_key(Host :: jid:lserver()) -> binary().
182 nodes_key(Host) when is_binary(Host) ->
183 188 <<Host/binary, "#{nodes}">>.
184
185 -spec endpoints_key() -> binary().
186 endpoints_key() ->
187 43 key(<<"endpoints">>).
188
189 -spec endpoints_key(Host :: jid:lserver(), Node :: binary()) -> binary().
190 endpoints_key(Host, Node) ->
191 117 key(Host, Node, <<"endpoints">>).
192
193 -spec domains_key() -> binary().
194 domains_key() ->
195 60 key(<<"domains">>).
196
197 -spec domains_key(Host :: jid:lserver(), Node :: binary()) -> binary().
198 domains_key(Host, Node) ->
199 6 key(Host, Node, <<"domains">>).
200
201 -spec public_domains_key() -> binary().
202 public_domains_key() ->
203 47 key(<<"public_domains">>).
204
205 -spec public_domains_key(Host :: jid:lserver(), Node :: binary()) -> binary().
206 public_domains_key(Host, Node) ->
207 7 key(Host, Node, <<"public_domains">>).
208
209 -spec key(Type :: binary()) -> binary().
210 key(Type) ->
211 150 LocalHost = opt(local_host),
212 150 Node = atom_to_binary(node(), latin1),
213 150 key(LocalHost, Node, Type).
214
215 -spec key(Host :: jid:lserver(), Node :: binary(), Type :: binary()) -> binary().
216 key(Host, Node, Type) ->
217 280 <<Host/binary, "#", Node/binary, "#{", Type/binary, "}">>.
218
219 -spec opt(gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value().
220 opt(Key) ->
221 2902 mod_global_distrib_utils:opt(mod_global_distrib_mapping, Key).
222
223 -spec expire_after() -> pos_integer().
224 expire_after() ->
225 338 opt([redis, expire_after]).
226
227 -spec pool() -> atom().
228 pool() ->
229 1895 opt([redis, pool]).
230
231 -spec do_put(Key :: binary(), Host :: binary()) -> ok.
232 do_put(Key, Host) ->
233 152 {ok, _} = q([<<"SET">>, Key, Host, <<"EX">>, expire_after()]),
234 152 ok.
235
236 -spec do_get(Key :: binary()) -> {ok, Host :: binary()} | error.
237 do_get(Key) ->
238 97 case q([<<"GET">>, Key]) of
239 51 {ok, undefined} -> error;
240 46 {ok, Host} -> {ok, Host}
241 end.
242
243 -spec do_delete(Key :: binary()) -> ok.
244 do_delete(Key) ->
245 158 LocalHost = opt(local_host),
246 158 case q([<<"GET">>, Key]) of
247 149 {ok, LocalHost} -> {ok, _} = q([<<"DEL">>, Key]);
248 9 _ -> ok
249 end,
250 158 ok.
251
252 -spec get_domains(KeyFun :: fun((Host :: binary(), Node :: binary()) -> Key :: binary())) ->
253 [jid:lserver()].
254 get_domains(KeyFun) ->
255 6 Hosts = get_hosts(),
256 6 Nodes = lists:flatmap(fun(Host) -> [{Host, Node} || Node <- get_nodes(Host)] end, Hosts),
257 6 Keys = [KeyFun(Host, Node) || {Host, Node} <- Nodes],
258 6 {ok, Domains} = q([<<"SUNION">> | Keys]),
259 6 Domains.
260
261 -spec refresh_hosts() -> any().
262 refresh_hosts() ->
263 38 q([<<"SADD">>, <<"hosts">>, opt(local_host)]).
264
265 -spec get_hosts() -> [Host :: jid:lserver()].
266 get_hosts() ->
267 565 {ok, Hosts} = q([<<"SMEMBERS">>, <<"hosts">>]),
268 565 Hosts.
269
270 -spec refresh_nodes() -> any().
271 refresh_nodes() ->
272 38 NodesKey = nodes_key(),
273 38 Now = erlang:system_time(second),
274 38 case get_expired_nodes(Now) of
275 37 [] -> ok;
276 1 ExpiredNodes -> q([<<"HDEL">>, NodesKey | ExpiredNodes])
277 end,
278 38 q([<<"HSET">>, NodesKey, atom_to_binary(node(), latin1), Now]).
279
280 -spec get_nodes(Host :: jid:lserver()) -> [Node :: binary()].
281 get_nodes(Host) ->
282 105 {ok, Nodes} = q([<<"HKEYS">>, nodes_key(Host)]),
283 105 Nodes.
284
285 -spec get_expired_nodes(Now :: integer()) -> [Node :: binary()].
286 get_expired_nodes(Now) ->
287 38 {ok, Results} = q([<<"HGETALL">>, nodes_key()]),
288 38 lists:foldl(
289 fun
290 (Val, [{Key} | Acc]) ->
291 54 Stamp = binary_to_integer(Val),
292 54 case Stamp + expire_after() > Now of
293 53 true -> Acc;
294 1 false -> [Key | Acc]
295 end;
296 (Key, Acc) ->
297 54 [{Key} | Acc]
298 end,
299 [],
300 Results).
301
302 -spec refresh_jids() -> any().
303 refresh_jids() ->
304 38 ets:foldl(fun({Jid}, _) -> refresh_jid(Jid) end, [], ?JIDS_ETS).
305
306 -spec refresh_jid(Jid :: binary()) -> any().
307 refresh_jid(Jid) ->
308 12 LocalHost = opt(local_host),
309 12 q([<<"SET">>, Jid, LocalHost, <<"EX">>, expire_after()]). %% TODO: log error
310
311 -spec refresh_endpoints() -> any().
312 refresh_endpoints() ->
313 38 set_endpoints(opt([connections, advertised_endpoints])).
314
315 -spec set_endpoints(Endpoints :: [mod_global_distrib_utils:endpoint()]) -> any().
316 set_endpoints(Endpoints) ->
317 43 EndpointsKey = endpoints_key(),
318 43 BinEndpoints = lists:map(fun endpoint_to_binary/1, Endpoints),
319 43 refresh_set(EndpointsKey, BinEndpoints).
320
321 -spec endpoint_to_binary(mod_global_distrib_utils:endpoint()) -> binary().
322 endpoint_to_binary({IpAddr, Port}) when is_tuple(IpAddr) ->
323
:-(
iolist_to_binary([inet:ntoa(IpAddr), "#", integer_to_binary(Port)]);
324 endpoint_to_binary({Domain, Port}) when is_list(Domain) ->
325 43 iolist_to_binary([unicode:characters_to_binary(Domain), "#", integer_to_binary(Port)]).
326
327 -spec binary_to_endpoint(binary()) -> mod_global_distrib_utils:endpoint().
328 binary_to_endpoint(Bin) ->
329 116 [Addr, BinPort] = binary:split(Bin, <<"#">>),
330 116 {_, IpAddrOrDomain} = mod_global_distrib_utils:parse_address(Addr),
331 116 Port = binary_to_integer(BinPort),
332 116 {IpAddrOrDomain, Port}.
333
334 -spec refresh_domains() -> any().
335 refresh_domains() ->
336 38 LocalHost = opt(local_host),
337 38 DomainsKey = domains_key(),
338 38 Domains = [Domain || {Domain} <- ets:tab2list(?DOMAINS_ETS)],
339 38 refresh_set(DomainsKey, Domains),
340 38 lists:foreach(fun(Domain) ->
341 1 q([<<"SET">>, Domain, LocalHost, <<"EX">>, expire_after()])
342 end,
343 Domains).
344
345 -spec refresh_public_domains() -> any().
346 refresh_public_domains() ->
347 38 DomainsKey = public_domains_key(),
348 38 Domains = [Domain || {Domain} <- ets:tab2list(?PUBLIC_DOMAINS_ETS)],
349 38 refresh_set(DomainsKey, Domains).
350
351 -spec refresh_set(Key :: binary(), Members :: [binary()]) -> any().
352 refresh_set(Key, Members) ->
353 119 q([<<"PERSIST">>, Key]),
354
355 119 ToDelete =
356 case catch q([<<"SMEMBERS">>, Key]) of
357 {ok, ExistingMembers} ->
358 119 ordsets:subtract(ordsets:from_list(ExistingMembers),
359 ordsets:from_list(Members));
360 _ ->
361
:-(
[]
362 end,
363
364 119 case ToDelete of
365 108 [] -> ok;
366 11 _ -> q([<<"SREM">>, Key | ToDelete])
367 end,
368
369 119 case Members of
370 76 [] -> ok;
371 43 _ -> q([<<"SADD">>, Key | Members])
372 end,
373 119 q([<<"EXPIRE">>, Key, expire_after()]).
Line Hits Source