./ct_report/coverage/mod_global_distrib_mapping_redis.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_mapping_redis).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(mod_global_distrib_mapping).
21
22 -include("mongoose.hrl").
23
24 -define(JIDS_ETS, mod_global_distrib_mapping_redis_jids).
25 -define(DOMAINS_ETS, mod_global_distrib_mapping_redis_domains).
26 -define(PUBLIC_DOMAINS_ETS, mod_global_distrib_mapping_redis_public_domains).
27
28 -export([start/1, stop/0,
29 put_session/1, get_session/1, delete_session/1,
30 put_domain/2, get_domain/1, delete_domain/1,
31 get_endpoints/1, get_domains/0, get_public_domains/0, get_hosts/0]).
32
33 -export([refresh/0, refresh/1]).
34
35 -export([init/1, handle_info/2]).
36
37 %% Only for debug & tests!
38 -export([nodes_key/0, set_endpoints/1]).
39
40 -ignore_xref([handle_info/2, init/1, nodes_key/0, refresh/0, refresh/1, set_endpoints/1]).
41
42 %%--------------------------------------------------------------------
43 %% API
44 %%--------------------------------------------------------------------
45
46 -spec start(proplists:proplist()) -> any().
47 start(Opts) ->
48 21 RefreshAfter = proplists:get_value(refresh_after, Opts, 60),
49
50 21 mod_global_distrib_utils:create_ets([?MODULE, ?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]),
51 21 ExpireAfter = proplists:get_value(expire_after, Opts, 120),
52 21 ets:insert(?MODULE, {expire_after, ExpireAfter}),
53 21 PoolTag = proplists:get_value(pool, Opts, global_distrib),
54 21 ets:insert(?MODULE, {pool, PoolTag}),
55
56 21 Refresher = {mod_global_distrib_redis_refresher,
57 {gen_server, start_link, [?MODULE, RefreshAfter, []]},
58 permanent, 1000, supervisor, [?MODULE]},
59 21 ejabberd_sup:start_child(Refresher),
60 21 ok.
61
62 -spec stop() -> any().
63 stop() ->
64 21 lists:foreach(
65 fun(Id) ->
66 42 ejabberd_sup:stop_child(Id)
67 end,
68 [?MODULE, mod_global_distrib_redis_refresher]),
69 21 [ets:delete(Tab) || Tab <- [?MODULE, ?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]].
70
71 -spec put_session(Jid :: binary()) -> ok.
72 put_session(Jid) ->
73 150 ets:insert(?JIDS_ETS, {Jid}),
74 150 do_put(Jid, opt(local_host)).
75
76 -spec get_session(Jid :: binary()) -> {ok, Host :: binary()} | error.
77 get_session(Jid) ->
78 79 do_get(Jid).
79
80 -spec delete_session(Jid :: binary()) -> ok.
81 delete_session(Jid) ->
82 152 ets:delete(?JIDS_ETS, Jid),
83 152 do_delete(Jid).
84
85 -spec put_domain(Domain :: binary(), IsHidden :: boolean()) -> ok.
86 put_domain(Domain, IsHidden) ->
87 12 ets:insert(?DOMAINS_ETS, {Domain}),
88 12 {ok, _} = q([<<"SADD">>, domains_key(), Domain]),
89 12 do_put(Domain, opt(local_host)),
90 12 case IsHidden of
91 false ->
92 11 ets:insert(?PUBLIC_DOMAINS_ETS, {Domain}),
93 11 {ok, _} = q([<<"SADD">>, public_domains_key(), Domain]),
94 11 ok;
95 true ->
96 1 ok
97 end.
98
99 -spec get_domain(Domain :: binary()) -> {ok, Host :: binary()} | error.
100 get_domain(Domain) ->
101 17 do_get(Domain).
102
103 -spec delete_domain(Domain :: binary()) -> ok.
104 delete_domain(Domain) ->
105 12 ets:delete(?DOMAINS_ETS, Domain),
106 12 ets:delete(?PUBLIC_DOMAINS_ETS, Domain),
107 12 do_delete(Domain),
108 12 {ok, _} = q([<<"SREM">>, domains_key(), Domain]),
109 12 ok.
110
111 -spec get_domains() -> {ok, [Domain :: binary()]}.
112 get_domains() ->
113 3 get_domains(fun domains_key/2).
114
115 -spec get_public_domains() -> {ok, [Domain :: binary()]}.
116 get_public_domains() ->
117 3 get_domains(fun public_domains_key/2).
118
119 -spec get_endpoints(Host :: jid:lserver()) -> {ok, [mod_global_distrib_utils:endpoint()]}.
120 get_endpoints(Host) ->
121 86 Nodes = get_nodes(Host),
122 86 get_endpoints_for_nodes(Host, Nodes).
123
124 get_endpoints_for_nodes(_Host, []) ->
125
:-(
{ok, []};
126 get_endpoints_for_nodes(Host, Nodes) ->
127 86 EndpointKeys = [endpoints_key(Host, Node) || Node <- Nodes],
128 86 {ok, BinEndpoints} = q([<<"SUNION">> | EndpointKeys]),
129 86 {ok, lists:map(fun binary_to_endpoint/1, BinEndpoints)}.
130
131 %%--------------------------------------------------------------------
132 %% gen_server API
133 %%--------------------------------------------------------------------
134
135 init(RefreshAfter) ->
136 21 refresh_and_schedule_next("initial_autorefresh", RefreshAfter),
137 21 {ok, RefreshAfter}.
138
139 handle_info(refresh, RefreshAfter) ->
140
:-(
refresh_and_schedule_next("autorefresh", RefreshAfter),
141
:-(
{noreply, RefreshAfter}.
142
143 refresh() ->
144
:-(
refresh("reason_unknown").
145
146 refresh_and_schedule_next(Reason, RefreshAfter) ->
147 21 Reason2 = Reason ++ ",next_refresh_in=" ++ integer_to_list(RefreshAfter),
148 21 refresh(Reason2),
149 21 erlang:send_after(timer:seconds(RefreshAfter), self(), refresh).
150
151 -spec refresh(Reason :: string()) -> ok.
152 refresh(Reason) ->
153 40 ?LOG_DEBUG(#{what => gd_refreshing_own_hosts}),
154 40 refresh_hosts(),
155 40 ?LOG_DEBUG(#{what => gd_refreshing_own_nodes}),
156 40 refresh_nodes(),
157 40 ?LOG_DEBUG(#{what => gd_refreshing_own_jids}),
158 40 refresh_jids(),
159 40 ?LOG_DEBUG(#{what => gd_refreshing_own_endpoints}),
160 40 refresh_endpoints(),
161 40 ?LOG_DEBUG(#{what => gd_refreshing_own_domains}),
162 40 refresh_domains(),
163 40 ?LOG_DEBUG(#{what => gd_refreshing_own_public_domains}),
164 40 refresh_public_domains(),
165 40 ?LOG_INFO(#{what => gd_refreshing_own_data_done, reason => Reason}),
166 40 ok.
167
168 %%--------------------------------------------------------------------
169 %% Helpers
170 %%--------------------------------------------------------------------
171
172 -spec q(Args :: list()) -> {ok, term()}.
173 q(Args) ->
174 2400 {ok, Worker} = mongoose_wpool:get_worker(redis, global, pool()),
175 2400 case eredis:q(Worker, Args) of
176 {ok, _} = OKRes ->
177 2399 OKRes;
178 Error ->
179
:-(
?LOG_ERROR(#{what => gd_redis_query_error, worker => Worker,
180
:-(
redis_query => Args, reason => Error}),
181
:-(
error(Error)
182 end.
183
184 -spec nodes_key() -> binary().
185 nodes_key() ->
186 87 LocalHost = opt(local_host),
187 87 nodes_key(LocalHost).
188
189 -spec nodes_key(Host :: jid:lserver()) -> binary().
190 nodes_key(Host) when is_binary(Host) ->
191 185 <<Host/binary, "#{nodes}">>.
192
193 -spec endpoints_key() -> binary().
194 endpoints_key() ->
195 43 key(<<"endpoints">>).
196
197 -spec endpoints_key(Host :: jid:lserver(), Node :: binary()) -> binary().
198 endpoints_key(Host, Node) ->
199 109 key(Host, Node, <<"endpoints">>).
200
201 -spec domains_key() -> binary().
202 domains_key() ->
203 64 key(<<"domains">>).
204
205 -spec domains_key(Host :: jid:lserver(), Node :: binary()) -> binary().
206 domains_key(Host, Node) ->
207 6 key(Host, Node, <<"domains">>).
208
209 -spec public_domains_key() -> binary().
210 public_domains_key() ->
211 51 key(<<"public_domains">>).
212
213 -spec public_domains_key(Host :: jid:lserver(), Node :: binary()) -> binary().
214 public_domains_key(Host, Node) ->
215 7 key(Host, Node, <<"public_domains">>).
216
217 -spec key(Type :: binary()) -> binary().
218 key(Type) ->
219 158 LocalHost = opt(local_host),
220 158 Node = atom_to_binary(node(), latin1),
221 158 key(LocalHost, Node, Type).
222
223 -spec key(Host :: jid:lserver(), Node :: binary(), Type :: binary()) -> binary().
224 key(Host, Node, Type) ->
225 280 <<Host/binary, "#", Node/binary, "#{", Type/binary, "}">>.
226
227 -spec opt(Key :: atom()) -> term().
228 opt(Key) ->
229 703 mod_global_distrib_utils:opt(mod_global_distrib_mapping, Key).
230
231 -spec expire_after() -> pos_integer().
232 expire_after() ->
233 360 ets:lookup_element(?MODULE, expire_after, 2).
234
235 -spec pool() -> atom().
236 pool() ->
237 2400 ets:lookup_element(?MODULE, pool, 2).
238
239 -spec do_put(Key :: binary(), Host :: binary()) -> ok.
240 do_put(Key, Host) ->
241 162 {ok, _} = q([<<"SET">>, Key, Host, <<"EX">>, expire_after()]),
242 162 ok.
243
244 -spec do_get(Key :: binary()) -> {ok, Host :: binary()} | error.
245 do_get(Key) ->
246 96 case q([<<"GET">>, Key]) of
247 41 {ok, undefined} -> error;
248 54 {ok, Host} -> {ok, Host}
249 end.
250
251 -spec do_delete(Key :: binary()) -> ok.
252 do_delete(Key) ->
253 164 LocalHost = opt(local_host),
254 164 case q([<<"GET">>, Key]) of
255 157 {ok, LocalHost} -> {ok, _} = q([<<"DEL">>, Key]);
256 7 _ -> ok
257 end,
258 164 ok.
259
260 -spec get_domains(KeyFun :: fun((Host :: binary(), Node :: binary()) -> Key :: binary())) ->
261 {ok, Domains :: [binary()]}.
262 get_domains(KeyFun) ->
263 6 Hosts = get_hosts(),
264 6 Nodes = lists:flatmap(fun(Host) -> [{Host, Node} || Node <- get_nodes(Host)] end, Hosts),
265 6 Keys = [KeyFun(Host, Node) || {Host, Node} <- Nodes],
266 6 {ok, _Domains} = q([<<"SUNION">> | Keys]).
267
268 -spec refresh_hosts() -> any().
269 refresh_hosts() ->
270 40 q([<<"SADD">>, <<"hosts">>, opt(local_host)]).
271
272 -spec get_hosts() -> [Host :: jid:lserver()].
273 get_hosts() ->
274 1017 {ok, Hosts} = q([<<"SMEMBERS">>, <<"hosts">>]),
275 1017 Hosts.
276
277 -spec refresh_nodes() -> any().
278 refresh_nodes() ->
279 40 NodesKey = nodes_key(),
280 40 Now = erlang:system_time(second),
281 40 case get_expired_nodes(Now) of
282 39 [] -> ok;
283 1 ExpiredNodes -> q([<<"HDEL">>, NodesKey | ExpiredNodes])
284 end,
285 40 q([<<"HSET">>, NodesKey, atom_to_binary(node(), latin1), Now]).
286
287 -spec get_nodes(Host :: jid:lserver()) -> [Node :: binary()].
288 get_nodes(Host) ->
289 98 {ok, Nodes} = q([<<"HKEYS">>, nodes_key(Host)]),
290 98 Nodes.
291
292 -spec get_expired_nodes(Now :: integer()) -> [Node :: binary()].
293 get_expired_nodes(Now) ->
294 40 {ok, Results} = q([<<"HGETALL">>, nodes_key()]),
295 40 lists:foldl(
296 fun
297 (Val, [{Key} | Acc]) ->
298 56 Stamp = binary_to_integer(Val),
299 56 case Stamp + expire_after() > Now of
300 55 true -> Acc;
301 1 false -> [Key | Acc]
302 end;
303 (Key, Acc) ->
304 56 [{Key} | Acc]
305 end,
306 [],
307 Results).
308
309 -spec refresh_jids() -> any().
310 refresh_jids() ->
311 40 ets:foldl(fun({Jid}, _) -> refresh_jid(Jid) end, [], ?JIDS_ETS).
312
313 -spec refresh_jid(Jid :: binary()) -> any().
314 refresh_jid(Jid) ->
315 12 LocalHost = opt(local_host),
316 12 q([<<"SET">>, Jid, LocalHost, <<"EX">>, expire_after()]). %% TODO: log error
317
318 -spec refresh_endpoints() -> any().
319 refresh_endpoints() ->
320 40 AdvertisedEndpoints = opt(advertised_endpoints),
321 40 LocalEndpoints = mod_global_distrib_receiver:endpoints(),
322 40 FinalEndpoints = case AdvertisedEndpoints of
323 39 false -> LocalEndpoints;
324 1 Endpoints -> Endpoints
325 end,
326 40 set_endpoints(FinalEndpoints).
327
328 -spec set_endpoints(Endpoints :: [mod_global_distrib_utils:endpoint()]) -> any().
329 set_endpoints(Endpoints) ->
330 43 EndpointsKey = endpoints_key(),
331 43 BinEndpoints = lists:map(fun endpoint_to_binary/1, Endpoints),
332 43 refresh_set(EndpointsKey, BinEndpoints).
333
334 -spec endpoint_to_binary(mod_global_distrib_utils:endpoint()) -> binary().
335 endpoint_to_binary({IpAddr, Port}) when is_tuple(IpAddr) ->
336 42 iolist_to_binary([inet:ntoa(IpAddr), "#", integer_to_binary(Port)]);
337 endpoint_to_binary({Domain, Port}) when is_list(Domain) ->
338 1 iolist_to_binary([unicode:characters_to_binary(Domain), "#", integer_to_binary(Port)]).
339
340 -spec binary_to_endpoint(binary()) -> mod_global_distrib_utils:endpoint().
341 binary_to_endpoint(Bin) ->
342 109 [Addr, BinPort] = binary:split(Bin, <<"#">>),
343 109 {_, IpAddrOrDomain} = mod_global_distrib_utils:parse_address(Addr),
344 109 Port = binary_to_integer(BinPort),
345 109 {IpAddrOrDomain, Port}.
346
347 -spec refresh_domains() -> any().
348 refresh_domains() ->
349 40 LocalHost = opt(local_host),
350 40 DomainsKey = domains_key(),
351 40 Domains = [Domain || {Domain} <- ets:tab2list(?DOMAINS_ETS)],
352 40 refresh_set(DomainsKey, Domains),
353 40 lists:foreach(fun(Domain) ->
354 7 q([<<"SET">>, Domain, LocalHost, <<"EX">>, expire_after()])
355 end,
356 Domains).
357
358 -spec refresh_public_domains() -> any().
359 refresh_public_domains() ->
360 40 DomainsKey = public_domains_key(),
361 40 Domains = [Domain || {Domain} <- ets:tab2list(?PUBLIC_DOMAINS_ETS)],
362 40 refresh_set(DomainsKey, Domains).
363
364 -spec refresh_set(Key :: binary(), Members :: [binary()]) -> any().
365 refresh_set(Key, Members) ->
366 123 q([<<"PERSIST">>, Key]),
367
368 123 ToDelete =
369 case catch q([<<"SMEMBERS">>, Key]) of
370 {ok, ExistingMembers} ->
371 123 ordsets:subtract(ordsets:from_list(ExistingMembers),
372 ordsets:from_list(Members));
373 _ ->
374
:-(
[]
375 end,
376
377 123 case ToDelete of
378 108 [] -> ok;
379 15 _ -> q([<<"SREM">>, Key | ToDelete])
380 end,
381
382 123 case Members of
383 68 [] -> ok;
384 55 _ -> q([<<"SADD">>, Key | Members])
385 end,
386 123 q([<<"EXPIRE">>, Key, expire_after()]).
Line Hits Source