./ct_report/coverage/mod_global_distrib_mapping_redis.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_mapping_redis).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(mod_global_distrib_mapping).
21
22 -include("mongoose.hrl").
23
24 -define(JIDS_ETS, mod_global_distrib_mapping_redis_jids).
25 -define(DOMAINS_ETS, mod_global_distrib_mapping_redis_domains).
26 -define(PUBLIC_DOMAINS_ETS, mod_global_distrib_mapping_redis_public_domains).
27
28 -export([start/1, stop/0,
29 put_session/1, get_session/1, delete_session/1,
30 put_domain/2, get_domain/1, delete_domain/1,
31 get_endpoints/1, get_domains/0, get_public_domains/0, get_hosts/0]).
32
33 -export([refresh/0, refresh/1]).
34
35 -export([init/1, handle_info/2]).
36
37 %% Only for debug & tests!
38 -export([nodes_key/0, set_endpoints/1]).
39
40 -ignore_xref([handle_info/2, init/1, nodes_key/0, refresh/0, refresh/1, set_endpoints/1]).
41
42 %%--------------------------------------------------------------------
43 %% API
44 %%--------------------------------------------------------------------
45
%% @doc Starts the Redis mapping backend.
%% Creates the ETS tables used by this module, stores the `expire_after'
%% (default 120) and `pool' (default `global_distrib') options in the ?MODULE
%% table, and starts the refresher process under ejabberd_sup.
%% `refresh_after' (default 60) is handed to init/1 as the refresher state.
-spec start(proplists:proplist()) -> any().
start(Opts) ->
    RefreshAfter = proplists:get_value(refresh_after, Opts, 60),
    ExpireAfter = proplists:get_value(expire_after, Opts, 120),
    PoolTag = proplists:get_value(pool, Opts, global_distrib),

    mod_global_distrib_utils:create_ets([?MODULE, ?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS]),
    ets:insert(?MODULE, {expire_after, ExpireAfter}),
    ets:insert(?MODULE, {pool, PoolTag}),

    %% The refresher is started with gen_server:start_link/3, i.e. it is a
    %% plain gen_server and not a supervisor, so its child type is `worker'.
    Refresher = {mod_global_distrib_redis_refresher,
                 {gen_server, start_link, [?MODULE, RefreshAfter, []]},
                 permanent, 1000, worker, [?MODULE]},
    ejabberd_sup:start_child(Refresher),
    ok.
61
%% @doc Stops the backend.
%% Asks ejabberd_sup to stop the children registered under the ids ?MODULE and
%% mod_global_distrib_redis_refresher, then deletes every ETS table owned by
%% this module. Returns the list of ets:delete/1 results.
-spec stop() -> any().
stop() ->
    ChildIds = [?MODULE, mod_global_distrib_redis_refresher],
    lists:foreach(fun ejabberd_sup:stop_child/1, ChildIds),
    Tables = [?MODULE, ?JIDS_ETS, ?DOMAINS_ETS, ?PUBLIC_DOMAINS_ETS],
    lists:map(fun ets:delete/1, Tables).
70
%% @doc Registers a session JID as owned by the local host.
%% The JID is remembered in the local JIDs ETS table and then written to Redis
%% (with expiry) as a `Jid -> local_host' mapping.
-spec put_session(Jid :: binary()) -> ok.
put_session(Jid) ->
    ets:insert(?JIDS_ETS, {Jid}),
    LocalHost = opt(local_host),
    do_put(Jid, LocalHost).
75
%% @doc Looks up the host stored in Redis under the given JID.
%% Returns `error' when Redis holds no value for the key.
-spec get_session(Jid :: binary()) -> {ok, Host :: binary()} | error.
get_session(Jid) ->
    do_get(Jid).
79
%% @doc Forgets a session JID.
%% Removes the JID from the local ETS table first, then removes the Redis
%% mapping via do_delete/1.
-spec delete_session(Jid :: binary()) -> ok.
delete_session(Jid) ->
    ets:delete(?JIDS_ETS, Jid),
    do_delete(Jid).
84
%% @doc Registers a domain as served by the local host.
%% The domain is stored in the local domains ETS table, added to this node's
%% Redis domains set, and mapped to the local host (with expiry). When the
%% domain is not hidden it is additionally stored in the public-domains ETS
%% table and the public-domains Redis set.
-spec put_domain(Domain :: binary(), IsHidden :: boolean()) -> ok.
put_domain(Domain, IsHidden) ->
    ets:insert(?DOMAINS_ETS, {Domain}),
    {ok, _} = q([<<"SADD">>, domains_key(), Domain]),
    LocalHost = opt(local_host),
    do_put(Domain, LocalHost),
    case IsHidden of
        true ->
            ok;
        false ->
            ets:insert(?PUBLIC_DOMAINS_ETS, {Domain}),
            {ok, _} = q([<<"SADD">>, public_domains_key(), Domain]),
            ok
    end.
98
%% @doc Looks up the host stored in Redis under the given domain.
%% Returns `error' when Redis holds no value for the key.
-spec get_domain(Domain :: binary()) -> {ok, Host :: binary()} | error.
get_domain(Domain) ->
    do_get(Domain).
102
%% @doc Unregisters a domain.
%% Removes the domain from both local ETS tables, removes its Redis mapping
%% via do_delete/1, and removes it from this node's Redis domain sets.
-spec delete_domain(Domain :: binary()) -> ok.
delete_domain(Domain) ->
    ets:delete(?DOMAINS_ETS, Domain),
    ets:delete(?PUBLIC_DOMAINS_ETS, Domain),
    do_delete(Domain),
    {ok, _} = q([<<"SREM">>, domains_key(), Domain]),
    %% put_domain/2 adds non-hidden domains to the public set as well, so the
    %% public set has to be cleaned here too; SREM of a non-member is a no-op.
    {ok, _} = q([<<"SREM">>, public_domains_key(), Domain]),
    ok.
110
%% @doc Returns the union of the Redis domain sets (keys built with
%% domains_key/2) over all known hosts and their nodes.
-spec get_domains() -> {ok, [Domain :: binary()]}.
get_domains() ->
    KeyFun = fun domains_key/2,
    get_domains(KeyFun).
114
%% @doc Returns the union of the Redis public-domain sets (keys built with
%% public_domains_key/2) over all known hosts and their nodes.
-spec get_public_domains() -> {ok, [Domain :: binary()]}.
get_public_domains() ->
    KeyFun = fun public_domains_key/2,
    get_domains(KeyFun).
118
%% @doc Returns the endpoints stored in Redis for every node registered
%% under the given host.
-spec get_endpoints(Host :: jid:lserver()) -> {ok, [mod_global_distrib_utils:endpoint()]}.
get_endpoints(Host) ->
    get_endpoints_for_nodes(Host, get_nodes(Host)).
123
%% Fetches the union of the endpoint sets of the given nodes of Host.
%% An empty node list short-circuits to `{ok, []}' without querying Redis.
get_endpoints_for_nodes(_Host, []) ->
    {ok, []};
get_endpoints_for_nodes(Host, Nodes) ->
    Keys = lists:map(fun(Node) -> endpoints_key(Host, Node) end, Nodes),
    {ok, Raw} = q([<<"SUNION">> | Keys]),
    {ok, [binary_to_endpoint(Bin) || Bin <- Raw]}.
130
131 %%--------------------------------------------------------------------
132 %% gen_server API
133 %%--------------------------------------------------------------------
134
%% gen_server callback: performs the initial refresh, schedules the next one
%% and keeps the refresh interval as the server state.
init(RefreshAfter) ->
    _TimerRef = refresh_and_schedule_next("initial_autorefresh", RefreshAfter),
    {ok, RefreshAfter}.
138
%% gen_server callback: on the `refresh' message, refreshes the data and
%% schedules the next `refresh' message. The state (interval) is unchanged.
handle_info(refresh, RefreshAfter) ->
    _TimerRef = refresh_and_schedule_next("autorefresh", RefreshAfter),
    {noreply, RefreshAfter}.
142
%% Runs a full refresh with the placeholder reason "reason_unknown".
refresh() ->
    Reason = "reason_unknown",
    refresh(Reason).
145
%% Runs a refresh whose logged reason is Reason extended with the interval,
%% then arms a timer that sends `refresh' to the calling process after
%% RefreshAfter seconds. Returns the timer reference.
refresh_and_schedule_next(Reason, RefreshAfter) ->
    FullReason = lists:append([Reason, ",next_refresh_in=", integer_to_list(RefreshAfter)]),
    refresh(FullReason),
    DelayMs = timer:seconds(RefreshAfter),
    erlang:send_after(DelayMs, self(), refresh).
150
%% @doc Re-publishes all data owned by this node to Redis.
%% Runs the refresh steps in a fixed order (hosts, nodes, jids, endpoints,
%% domains, public domains), emitting a debug log entry before each step and
%% an info entry carrying Reason once all of them are done.
-spec refresh(Reason :: string()) -> ok.
refresh(Reason) ->
    Steps = [{gd_refreshing_own_hosts, fun refresh_hosts/0},
             {gd_refreshing_own_nodes, fun refresh_nodes/0},
             {gd_refreshing_own_jids, fun refresh_jids/0},
             {gd_refreshing_own_endpoints, fun refresh_endpoints/0},
             {gd_refreshing_own_domains, fun refresh_domains/0},
             {gd_refreshing_own_public_domains, fun refresh_public_domains/0}],
    lists:foreach(fun({What, Step}) ->
                          ?LOG_DEBUG(#{what => What}),
                          Step()
                  end,
                  Steps),
    ?LOG_INFO(#{what => gd_refreshing_own_data_done, reason => Reason}),
    ok.
167
168 %%--------------------------------------------------------------------
169 %% Helpers
170 %%--------------------------------------------------------------------
171
%% @doc Runs a single Redis command on a worker taken from the configured pool.
%% Returns the `{ok, _}' reply unchanged; any other reply is logged and
%% raised as an error, so callers only ever see `{ok, _}'.
-spec q(Args :: list()) -> {ok, term()}.
q(Args) ->
    {ok, Worker} = mongoose_wpool:get_worker(redis, global, pool()),
    Reply = eredis:q(Worker, Args),
    case Reply of
        {ok, _} ->
            Reply;
        _ ->
            ?LOG_ERROR(#{what => gd_redis_query_error, worker => Worker,
                         redis_query => Args, reason => Reply}),
            error(Reply)
    end.
183
%% @doc Redis key of the nodes hash belonging to the local host.
-spec nodes_key() -> binary().
nodes_key() ->
    nodes_key(opt(local_host)).
188
%% @doc Redis key of the nodes hash belonging to Host: the host name followed
%% by the literal suffix `#{nodes}'.
-spec nodes_key(Host :: jid:lserver()) -> binary().
nodes_key(Host) when is_binary(Host) ->
    Suffix = <<"#{nodes}">>,
    <<Host/binary, Suffix/binary>>.
192
%% @doc Redis key of the endpoints set of the local host and current node.
-spec endpoints_key() -> binary().
endpoints_key() ->
    Type = <<"endpoints">>,
    key(Type).
196
%% @doc Redis key of the endpoints set of the given host and node.
-spec endpoints_key(Host :: jid:lserver(), Node :: binary()) -> binary().
endpoints_key(Host, Node) ->
    Type = <<"endpoints">>,
    key(Host, Node, Type).
200
%% @doc Redis key of the domains set of the local host and current node.
-spec domains_key() -> binary().
domains_key() ->
    Type = <<"domains">>,
    key(Type).
204
%% @doc Redis key of the domains set of the given host and node.
-spec domains_key(Host :: jid:lserver(), Node :: binary()) -> binary().
domains_key(Host, Node) ->
    Type = <<"domains">>,
    key(Host, Node, Type).
208
%% @doc Redis key of the public-domains set of the local host and current node.
-spec public_domains_key() -> binary().
public_domains_key() ->
    Type = <<"public_domains">>,
    key(Type).
212
%% @doc Redis key of the public-domains set of the given host and node.
-spec public_domains_key(Host :: jid:lserver(), Node :: binary()) -> binary().
public_domains_key(Host, Node) ->
    Type = <<"public_domains">>,
    key(Host, Node, Type).
216
%% @doc Builds the Redis key of the given type for the local host and the
%% current Erlang node (node name converted with latin1 encoding).
-spec key(Type :: binary()) -> binary().
key(Type) ->
    Host = opt(local_host),
    key(Host, atom_to_binary(node(), latin1), Type).
222
%% @doc Builds a Redis key of the form `Host#Node#{Type}'.
-spec key(Host :: jid:lserver(), Node :: binary(), Type :: binary()) -> binary().
key(Host, Node, Type) ->
    Prefix = <<Host/binary, "#", Node/binary>>,
    <<Prefix/binary, "#{", Type/binary, "}">>.
226
%% @doc Reads an option of mod_global_distrib_mapping through
%% mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Module = mod_global_distrib_mapping,
    mod_global_distrib_utils:opt(Module, Key).
230
%% @doc Returns the `expire_after' value stored in the ?MODULE ETS table by
%% start/1.
-spec expire_after() -> pos_integer().
expire_after() ->
    ValuePos = 2,
    ets:lookup_element(?MODULE, expire_after, ValuePos).
234
%% @doc Returns the Redis pool tag stored in the ?MODULE ETS table by start/1.
-spec pool() -> atom().
pool() ->
    ValuePos = 2,
    ets:lookup_element(?MODULE, pool, ValuePos).
238
%% @doc Stores `Key -> Host' in Redis with an expiry of expire_after()
%% (passed as the `EX' argument of SET).
-spec do_put(Key :: binary(), Host :: binary()) -> ok.
do_put(Key, Host) ->
    Command = [<<"SET">>, Key, Host, <<"EX">>, expire_after()],
    {ok, _} = q(Command),
    ok.
243
%% @doc Reads the value stored under Key in Redis.
%% A reply of `undefined' is reported as `error'; anything else is returned
%% as `{ok, Host}'.
-spec do_get(Key :: binary()) -> {ok, Host :: binary()} | error.
do_get(Key) ->
    {ok, Value} = q([<<"GET">>, Key]),
    case Value of
        undefined -> error;
        Host -> {ok, Host}
    end.
250
%% @doc Deletes Key from Redis, but only when its current value is the local
%% host; a key holding any other value (or no value) is left alone.
-spec do_delete(Key :: binary()) -> ok.
do_delete(Key) ->
    LocalHost = opt(local_host),
    case q([<<"GET">>, Key]) of
        {ok, Owner} when Owner =:= LocalHost ->
            {ok, _} = q([<<"DEL">>, Key]),
            ok;
        _ ->
            ok
    end.
259
%% @doc Returns the union of the Redis sets addressed by KeyFun(Host, Node)
%% for every node of every known host.
%% When there are no hosts or no nodes there is nothing to query: SUNION
%% requires at least one key, so `{ok, []}' is returned directly (mirroring
%% the empty-list clause of get_endpoints_for_nodes/2).
-spec get_domains(KeyFun :: fun((Host :: binary(), Node :: binary()) -> Key :: binary())) ->
    {ok, Domains :: [binary()]}.
get_domains(KeyFun) ->
    Hosts = get_hosts(),
    Keys = [KeyFun(Host, Node) || Host <- Hosts, Node <- get_nodes(Host)],
    case Keys of
        [] ->
            {ok, []};
        _ ->
            {ok, _Domains} = q([<<"SUNION">> | Keys])
    end.
267
%% @doc Adds the local host to the Redis set stored under the `hosts' key.
-spec refresh_hosts() -> any().
refresh_hosts() ->
    LocalHost = opt(local_host),
    q([<<"SADD">>, <<"hosts">>, LocalHost]).
271
%% @doc Returns all members of the Redis set stored under the `hosts' key.
-spec get_hosts() -> [Host :: jid:lserver()].
get_hosts() ->
    Command = [<<"SMEMBERS">>, <<"hosts">>],
    {ok, Hosts} = q(Command),
    Hosts.
276
%% @doc Refreshes the nodes hash of the local host.
%% Expired node entries (if any) are removed with HDEL, then the current node
%% is (re)written with the current system time in seconds as its timestamp.
-spec refresh_nodes() -> any().
refresh_nodes() ->
    NodesKey = nodes_key(),
    Now = erlang:system_time(second),
    Expired = get_expired_nodes(Now),
    %% HDEL needs at least one field, so skip it when nothing has expired
    Expired =:= [] orelse q([<<"HDEL">>, NodesKey | Expired]),
    ThisNode = atom_to_binary(node(), latin1),
    q([<<"HSET">>, NodesKey, ThisNode, Now]).
286
%% @doc Returns the field names (node names) of the nodes hash of Host.
-spec get_nodes(Host :: jid:lserver()) -> [Node :: binary()].
get_nodes(Host) ->
    Command = [<<"HKEYS">>, nodes_key(Host)],
    {ok, Nodes} = q(Command),
    Nodes.
291
%% @doc Returns the nodes of the local host whose timestamp has expired.
%% The nodes hash is read with HGETALL, whose reply is consumed here as a
%% flat list of alternating field (node) and value (timestamp) entries. A node
%% is still alive when `Stamp + expire_after() > Now'; every other node is
%% returned. The expiry threshold is computed once instead of per entry.
-spec get_expired_nodes(Now :: integer()) -> [Node :: binary()].
get_expired_nodes(Now) ->
    {ok, Results} = q([<<"HGETALL">>, nodes_key()]),
    Threshold = Now - expire_after(),
    collect_expired_nodes(Results, Threshold, []).

%% Walks the flat [Node, Stamp, Node, Stamp, ...] list pairwise and
%% accumulates the nodes whose stamp is not greater than Threshold.
-spec collect_expired_nodes([binary()], integer(), [binary()]) -> [binary()].
collect_expired_nodes([], _Threshold, Acc) ->
    Acc;
collect_expired_nodes([Node, BinStamp | Rest], Threshold, Acc) ->
    case binary_to_integer(BinStamp) > Threshold of
        true -> collect_expired_nodes(Rest, Threshold, Acc);
        false -> collect_expired_nodes(Rest, Threshold, [Node | Acc])
    end.
308
%% @doc Re-publishes every JID held in the local JIDs ETS table.
%% The fold accumulator is ignored; the result is whatever the last
%% refresh_jid/1 call returned (or `[]' for an empty table).
-spec refresh_jids() -> any().
refresh_jids() ->
    RefreshOne = fun({Jid}, _Prev) -> refresh_jid(Jid) end,
    ets:foldl(RefreshOne, [], ?JIDS_ETS).
312
%% @doc Rewrites the `Jid -> local_host' mapping in Redis, renewing its expiry.
-spec refresh_jid(Jid :: binary()) -> any().
refresh_jid(Jid) ->
    Host = opt(local_host),
    Command = [<<"SET">>, Jid, Host, <<"EX">>, expire_after()],
    q(Command). %% TODO: log error
317
%% @doc Publishes this node's endpoints to Redis.
%% The `advertised_endpoints' option wins when it is set to anything other
%% than `false'; otherwise the endpoints reported by
%% mod_global_distrib_receiver are used.
-spec refresh_endpoints() -> any().
refresh_endpoints() ->
    Advertised = opt(advertised_endpoints),
    Local = mod_global_distrib_receiver:endpoints(),
    Chosen = case Advertised of
                 false -> Local;
                 _ -> Advertised
             end,
    set_endpoints(Chosen).
327
%% @doc Replaces the contents of this node's Redis endpoints set with the
%% binary form of the given endpoints.
-spec set_endpoints(Endpoints :: [mod_global_distrib_utils:endpoint()]) -> any().
set_endpoints(Endpoints) ->
    Key = endpoints_key(),
    Encoded = [endpoint_to_binary(Endpoint) || Endpoint <- Endpoints],
    refresh_set(Key, Encoded).
333
%% @doc Serialises an endpoint as `Address#Port'.
%% A tuple address is rendered with inet:ntoa/1; a string (list) address is
%% converted with unicode:characters_to_binary/1.
-spec endpoint_to_binary(mod_global_distrib_utils:endpoint()) -> binary().
endpoint_to_binary({IpAddr, Port}) when is_tuple(IpAddr) ->
    Addr = inet:ntoa(IpAddr),
    iolist_to_binary([Addr, $#, integer_to_binary(Port)]);
endpoint_to_binary({Domain, Port}) when is_list(Domain) ->
    Addr = unicode:characters_to_binary(Domain),
    iolist_to_binary([Addr, $#, integer_to_binary(Port)]).
339
%% @doc Parses an `Address#Port' binary back into an endpoint tuple.
%% The binary is split at the first `#'; the address part is parsed by
%% mod_global_distrib_utils:parse_address/1 (second element of its result is
%% used) and the port part is converted to an integer.
-spec binary_to_endpoint(binary()) -> mod_global_distrib_utils:endpoint().
binary_to_endpoint(Bin) ->
    [RawAddr, RawPort] = binary:split(Bin, <<"#">>),
    {_Kind, Address} = mod_global_distrib_utils:parse_address(RawAddr),
    {Address, binary_to_integer(RawPort)}.
346
%% @doc Re-publishes the locally registered domains.
%% Synchronises this node's Redis domains set with the domains ETS table and
%% rewrites each `Domain -> local_host' mapping, renewing its expiry.
-spec refresh_domains() -> any().
refresh_domains() ->
    Host = opt(local_host),
    SetKey = domains_key(),
    Domains = [Domain || {Domain} <- ets:tab2list(?DOMAINS_ETS)],
    refresh_set(SetKey, Domains),
    Remap = fun(Domain) ->
                    q([<<"SET">>, Domain, Host, <<"EX">>, expire_after()])
            end,
    lists:foreach(Remap, Domains).
357
%% @doc Synchronises this node's Redis public-domains set with the contents
%% of the public-domains ETS table.
-spec refresh_public_domains() -> any().
refresh_public_domains() ->
    SetKey = public_domains_key(),
    Rows = ets:tab2list(?PUBLIC_DOMAINS_ETS),
    refresh_set(SetKey, [Domain || {Domain} <- Rows]).
363
%% @doc Makes the Redis set stored under Key contain exactly Members and
%% renews its expiry.
%% Steps: PERSIST the key so it cannot expire mid-update, remove members that
%% are present in Redis but not in Members, add Members, then set the expiry
%% to expire_after(). SREM/SADD are skipped for empty lists because both
%% commands need at least one member argument.
-spec refresh_set(Key :: binary(), Members :: [binary()]) -> any().
refresh_set(Key, Members) ->
    q([<<"PERSIST">>, Key]),

    %% Reading the current members is best-effort: if it fails, nothing is
    %% removed and the set is only extended. q/1 already logs failed queries
    %% before raising, so the failure is not silent.
    ToDelete =
        try q([<<"SMEMBERS">>, Key]) of
            {ok, ExistingMembers} ->
                ordsets:subtract(ordsets:from_list(ExistingMembers),
                                 ordsets:from_list(Members))
        catch
            _:_ ->
                []
        end,

    case ToDelete of
        [] -> ok;
        _ -> q([<<"SREM">>, Key | ToDelete])
    end,

    case Members of
        [] -> ok;
        _ -> q([<<"SADD">>, Key | Members])
    end,
    q([<<"EXPIRE">>, Key, expire_after()]).
Line Hits Source