%%==============================================================================
%% Copyright 2017 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================
16 |
|
|
-module(mod_global_distrib_mapping).
-author('konrad.zemek@erlang-solutions.com').

-behaviour(gen_mod).
-behaviour(mongoose_module_metrics).

-include("mongoose.hrl").
-include("jlib.hrl").
-include("global_distrib_metrics.hrl").

%% Names of the two ets_cache tables used by this module.
-define(DOMAIN_TAB, mod_global_distrib_domain_cache_tab).
-define(JID_TAB, mod_global_distrib_jid_cache_tab).

%% gen_mod callbacks
-export([start/2,
         stop/1,
         deps/2]).

%% Domain mapping API
-export([for_domain/1,
         insert_for_domain/1,
         insert_for_domain/2,
         insert_for_domain/3,
         cache_domain/2,
         delete_for_domain/1,
         all_domains/0,
         public_domains/0]).

%% JID mapping API
-export([for_jid/1,
         insert_for_jid/1,
         cache_jid/2,
         delete_for_jid/1,
         clear_cache/1]).

%% Hook handlers
-export([register_subhost/3,
         unregister_subhost/2,
         packet_to_component/3,
         session_opened/5,
         session_closed/5]).

%% Backend queries
-export([endpoints/1,
         hosts/0]).

-ignore_xref([
    delete_for_domain/1, delete_for_jid/1, insert_for_domain/1,
    insert_for_domain/2, insert_for_domain/3, insert_for_jid/1, packet_to_component/3,
    register_subhost/3, session_closed/5, session_opened/5, unregister_subhost/2
]).

-type endpoint() :: mod_global_distrib_utils:endpoint().
45 |
|
|
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
49 |
|
|
%% @doc Look up the host mapped to `Domain'.
%% The lookup goes through `ets_cache:lookup/3' on `?DOMAIN_TAB'; the fun
%% handed to it calls `get_domain/1'. Every call increments the fetch
%% counter by one and reports the duration measured by `timer:tc/3' to the
%% fetch-time metric.
-spec for_domain(Domain :: binary()) -> {ok, Host :: jid:lserver()} | error.
for_domain(Domain) when is_binary(Domain) ->
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MAPPING_FETCHES, 1),
    FromBackend = fun() -> get_domain(Domain) end,
    {Elapsed, Result} = timer:tc(ets_cache, lookup, [?DOMAIN_TAB, Domain, FromBackend]),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MAPPING_FETCH_TIME, Elapsed),
    Result.
56 |
|
|
%% @doc Same as `insert_for_domain(Domain, false)'.
-spec insert_for_domain(Domain :: binary()) -> ok.
insert_for_domain(Domain) when is_binary(Domain) ->
    IsHidden = false,
    insert_for_domain(Domain, IsHidden).

%% @doc Map `Domain' to the host configured under the `local_host' option.
-spec insert_for_domain(Domain :: binary(), IsHidden :: boolean()) -> ok.
insert_for_domain(Domain, IsHidden) when is_binary(Domain) ->
    insert_for_domain(Domain, opt(local_host), IsHidden).

%% @doc Map `Domain' to `Host'.
%% The cache entry is updated through `do_insert_for_domain/3'; the fun
%% passed along forwards the key and `IsHidden' to `put_domain/2'.
-spec insert_for_domain(Domain :: binary(), Host :: binary(), IsHidden :: boolean()) -> ok.
insert_for_domain(Domain, Host, IsHidden) when is_binary(Domain), is_binary(Host) ->
    StoreInBackend = fun(Key) -> put_domain(Key, IsHidden) end,
    do_insert_for_domain(Domain, Host, StoreInBackend).
69 |
|
|
%% @doc Update the cache entry for `Domain' with `Host'.
%% Unlike `insert_for_domain/3', the fun given to `do_insert_for_domain/3'
%% is a no-op returning `ok', so `put_domain/2' is not called.
-spec cache_domain(Domain :: binary(), Host :: binary()) -> ok.
cache_domain(Domain, Host) when is_binary(Domain), is_binary(Host) ->
    NoBackendWrite = fun(_Key) -> ok end,
    do_insert_for_domain(Domain, Host, NoBackendWrite).
73 |
|
|
%% @doc Remove the mapping for `Domain'.
%% Calls `delete_domain/1' first and then deletes the `Domain' key from
%% the `?DOMAIN_TAB' cache; the result of the cache deletion is returned.
-spec delete_for_domain(Domain :: binary()) -> ok.
delete_for_domain(Domain) when is_binary(Domain) ->
    %% Backend first, cache second.
    delete_domain(Domain),
    ets_cache:delete(?DOMAIN_TAB, Domain).
78 |
|
|
%% @doc Look up the host mapped to a JID.
%% A `#jid{}' record is converted with `jid:to_lower/1' before the lookup.
%% For the tuple form the fetch counter is incremented by one, the lookup
%% is delegated to `do_lookup_jid/1', and its duration as measured by
%% `timer:tc' is reported to the fetch-time metric.
-spec for_jid(jid:jid() | jid:ljid()) -> {ok, Host :: jid:lserver()} | error.
for_jid(#jid{} = Jid) ->
    for_jid(jid:to_lower(Jid));
for_jid({_, _, _} = LJid) ->
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MAPPING_FETCHES, 1),
    {Elapsed, Result} = timer:tc(fun() -> do_lookup_jid(LJid) end),
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MAPPING_FETCH_TIME, Elapsed),
    Result.
86 |
|
|
%% @doc Map `Jid' to the host configured under the `local_host' option.
%% `put_session/1' is handed to `do_insert_for_jid/3' as the storing fun.
-spec insert_for_jid(jid:jid() | jid:ljid()) -> ok.
insert_for_jid(Jid) ->
    do_insert_for_jid(Jid, opt(local_host), fun put_session/1).
91 |
|
|
%% @doc Update the cache entries for `Jid' with `Host'.
%% The storing fun given to `do_insert_for_jid/3' is a no-op returning
%% `ok', so `put_session/1' is not called.
-spec cache_jid(jid:jid() | jid:ljid(), Host :: jid:lserver()) -> ok.
cache_jid(Jid, Host) when is_binary(Host) ->
    NoBackendWrite = fun(_BinJid) -> ok end,
    do_insert_for_jid(Jid, Host, NoBackendWrite).
95 |
|
|
%% @doc Drop the cache entries associated with `Jid'.
%% When the server part of the lower-cased JID equals the configured
%% `global_host', every key produced by `normalize_jid/1' is deleted from
%% `?JID_TAB'. For any other server part, that server is deleted from
%% `?DOMAIN_TAB' instead. Only the caches are touched here; no backend
%% delete function is called.
-spec clear_cache(jid:jid()) -> ok.
clear_cache(#jid{} = Jid) ->
    GlobalHost = opt(global_host),
    case jid:to_lower(Jid) of
        {_, GlobalHost, _} = LJid ->
            %% lists:foreach/2 returns `ok', which keeps this branch in line
            %% with the declared return type (a list comprehension would
            %% return a list of results instead).
            lists:foreach(fun(BinJid) -> ets_cache:delete(?JID_TAB, BinJid) end,
                          normalize_jid(LJid));
        {_, SubHost, _} ->
            ets_cache:delete(?DOMAIN_TAB, SubHost)
    end.
105 |
|
|
%% @doc Remove the mapping for a JID.
%% A `#jid{}' record is converted with `jid:to_lower/1' first. For each
%% key returned by `normalize_jid/1', `delete_session/1' is called and the
%% key is then deleted from the `?JID_TAB' cache.
-spec delete_for_jid(jid:jid() | jid:ljid()) -> ok.
delete_for_jid(#jid{} = Jid) ->
    delete_for_jid(jid:to_lower(Jid));
delete_for_jid({_, _, _} = LJid) ->
    RemoveKey = fun(BinJid) ->
                        delete_session(BinJid),
                        ets_cache:delete(?JID_TAB, BinJid)
                end,
    lists:foreach(RemoveKey, normalize_jid(LJid)).
115 |
|
|
%% @doc Return whatever `mod_global_distrib_mapping_backend:get_domains/0'
%% reports.
-spec all_domains() -> [jid:lserver()].
all_domains() ->
    Domains = mod_global_distrib_mapping_backend:get_domains(),
    Domains.
119 |
|
|
%% @doc Return whatever
%% `mod_global_distrib_mapping_backend:get_public_domains/0' reports.
-spec public_domains() -> [jid:lserver()].
public_domains() ->
    Domains = mod_global_distrib_mapping_backend:get_public_domains(),
    Domains.
123 |
|
|
%% @doc Return the endpoints the mapping backend reports for `Host'.
-spec endpoints(Host :: jid:lserver()) -> [endpoint()].
endpoints(Host) ->
    Endpoints = mod_global_distrib_mapping_backend:get_endpoints(Host),
    Endpoints.
127 |
|
|
%% @doc Return the hosts reported by the mapping backend.
-spec hosts() -> [Host :: jid:lserver()].
hosts() ->
    Hosts = mod_global_distrib_mapping_backend:get_hosts(),
    Hosts.
131 |
|
|
%%--------------------------------------------------------------------
%% gen_mod API
%%--------------------------------------------------------------------
135 |
|
|
%% @doc gen_mod start callback.
%% In order: starts the mapping backend with `backend => redis' added to
%% the options, ensures the three mapping metrics exist, reads the cache
%% settings from the `cache' option, registers the hooks from `hooks/1',
%% and finally creates the `?DOMAIN_TAB' and `?JID_TAB' caches. Lifetimes
%% are configured in seconds and converted with `timer:seconds/1'.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, #{cache := CacheOpts} = Opts) ->
    mod_global_distrib_mapping_backend:start(Opts#{backend => redis}),

    lists:foreach(
      fun({Metric, Type}) -> mongoose_metrics:ensure_metric(global, Metric, Type) end,
      [{?GLOBAL_DISTRIB_MAPPING_FETCH_TIME, histogram},
       {?GLOBAL_DISTRIB_MAPPING_FETCHES, spiral},
       {?GLOBAL_DISTRIB_MAPPING_CACHE_MISSES, spiral}]),

    #{cache_missed := CacheMissed,
      domain_lifetime_seconds := DomainSecs,
      jid_lifetime_seconds := JidSecs,
      max_jids := MaxJids} = CacheOpts,
    DomainLifetime = timer:seconds(DomainSecs),
    JidLifetime = timer:seconds(JidSecs),

    %% NOTE(review): hooks are registered before the caches exist; this
    %% ordering is kept exactly as it was.
    ejabberd_hooks:add(hooks(HostType)),

    DomainCacheOpts = [{cache_missed, CacheMissed}, {life_time, DomainLifetime}],
    JidCacheOpts = [{cache_missed, CacheMissed}, {life_time, JidLifetime},
                    {max_size, MaxJids}],
    ets_cache:new(?DOMAIN_TAB, DomainCacheOpts),
    ets_cache:new(?JID_TAB, JidCacheOpts).
156 |
|
|
%% @doc gen_mod stop callback.
%% Deletes the `?JID_TAB' and `?DOMAIN_TAB' caches (in that order),
%% unregisters the hooks from `hooks/1' and stops the mapping backend.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
    lists:foreach(fun(Tab) -> ets_cache:delete(Tab) end, [?JID_TAB, ?DOMAIN_TAB]),
    ejabberd_hooks:delete(hooks(HostType)),
    mod_global_distrib_mapping_backend:stop().
163 |
|
|
%% @doc gen_mod dependency callback.
%% Declares hard dependencies on `mod_global_distrib_utils' and
%% `mod_global_distrib_receiver', both given this module's `Opts'.
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(_HostType, Opts) ->
    [{Module, Opts, hard}
     || Module <- [mod_global_distrib_utils, mod_global_distrib_receiver]].
168 |
|
|
%%--------------------------------------------------------------------
%% Hooks implementation
%%--------------------------------------------------------------------
172 |
|
|
%% @doc Handler registered for `sm_register_connection_hook' (see `hooks/1').
%% Inserts a mapping for `UserJid' via `insert_for_jid/1' and returns the
%% accumulator unchanged.
-spec session_opened(Acc, binary(), ejabberd_sm:sid(), UserJID :: jid:jid(), Info :: list()) ->
    Acc when Acc :: any().
session_opened(Acc, _HostType, _SID, UserJid, _Info) ->
    %% The result of the insert is deliberately not inspected.
    insert_for_jid(UserJid),
    Acc.
178 |
|
|
%% @doc Handler registered for `sm_remove_connection_hook' (see `hooks/1').
%% Removes the mapping for `UserJid' via `delete_for_jid/1' and returns the
%% accumulator unchanged.
-spec session_closed(mongoose_acc:t(), ejabberd_sm:sid(), UserJID :: jid:jid(),
                     Info :: list(), _Status :: any()) -> mongoose_acc:t().
session_closed(Acc, _SID, UserJid, _Info, _Reason) ->
    %% The result of the delete is deliberately not inspected.
    delete_for_jid(UserJid),
    Acc.
188 |
|
|
%% @doc Handler registered for the `packet_to_component' hook (see `hooks/1').
%% Passes the sender and the accumulator to
%% `mod_global_distrib_utils:maybe_update_mapping/2', ignores its result
%% and returns the accumulator unchanged. The recipient is not used.
-spec packet_to_component(Acc :: mongoose_acc:t(), From :: jid:jid(), To :: jid:jid()) ->
    mongoose_acc:t().
packet_to_component(Acc, From, _To) ->
    mod_global_distrib_utils:maybe_update_mapping(From, Acc),
    Acc.
195 |
|
|
%% @doc Handler registered for the `register_subhost' hook (see `hooks/1').
%% `SubHost' is inserted as a domain mapping (with the given `IsHidden'
%% flag) only when the configured `global_host' is the single entry of
%% `?MYHOSTS' that `SubHost' ends with. In every other case nothing is
%% stored and `ok' is returned.
-spec register_subhost(any(), SubHost :: binary(), IsHidden :: boolean()) -> ok.
register_subhost(_, SubHost, IsHidden) ->
    %% True suffix comparison. binary:match/2 reports only the leftmost
    %% occurrence, so using it here would reject a host that appears earlier
    %% in SubHost as well as at its end.
    %% NOTE(review): as before, no label boundary (".") is required in
    %% front of the suffix - confirm whether that is intended.
    EndsWith =
        fun(Host) ->
                binary:longest_common_suffix([SubHost, Host]) =:= byte_size(Host)
        end,

    GlobalHost = opt(global_host),
    case lists:filter(EndsWith, ?MYHOSTS) of
        [GlobalHost] -> insert_for_domain(SubHost, IsHidden);
        _ -> ok
    end.
211 |
|
|
%% @doc Handler registered for the `unregister_subhost' hook (see `hooks/1').
%% Delegates to `delete_for_domain/1'; the first argument is ignored.
-spec unregister_subhost(any(), SubHost :: binary()) -> ok.
unregister_subhost(_Ignored, SubHost) ->
    delete_for_domain(SubHost).
215 |
|
|
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
219 |
|
|
%% Build the hook list passed to ejabberd_hooks:add/1 and
%% ejabberd_hooks:delete/1. The three subhost/component hooks use the
%% `global' scope, the two session hooks use `HostType'; every handler
%% lives in this module and is registered with priority 90.
hooks(HostType) ->
    Priority = 90,
    Handlers = [{register_subhost, global, register_subhost},
                {unregister_subhost, global, unregister_subhost},
                {packet_to_component, global, packet_to_component},
                {sm_register_connection_hook, HostType, session_opened},
                {sm_remove_connection_hook, HostType, session_closed}],
    [{Hook, Scope, ?MODULE, Function, Priority} || {Hook, Scope, Function} <- Handlers].
226 |
|
|
%% Produce the binary keys under which a JID is stored: the binary form of
%% the JID itself and, when jid:to_bare/1 yields a different tuple, the
%% binary form of the bare JID as a second element.
-spec normalize_jid(jid:ljid()) -> [binary()].
normalize_jid({_, _, _} = FullJid) ->
    FullBin = jid:to_binary(FullJid),
    case jid:to_bare(FullJid) of
        FullJid -> [FullBin];
        BareJid -> [FullBin, jid:to_binary(BareJid)]
    end.
233 |
|
|
%% Read option `Key' for this module through mod_global_distrib_utils:opt/2.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Value = mod_global_distrib_utils:opt(?MODULE, Key),
    Value.
237 |
|
|
%% Fetch a session entry from the mapping backend. The cache-miss counter
%% is incremented by one on every call, before the backend is queried.
-spec get_session(Key :: binary()) -> {ok, term()} | error.
get_session(Key) ->
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MAPPING_CACHE_MISSES, 1),
    Result = mod_global_distrib_mapping_backend:get_session(Key),
    Result.
242 |
|
|
%% Log the key at debug level, then store the session entry in the
%% mapping backend.
-spec put_session(Key :: binary()) -> ok.
put_session(Key) ->
    ?LOG_DEBUG(#{what => gd_mapper_put_session, key => Key}),
    Result = mod_global_distrib_mapping_backend:put_session(Key),
    Result.
247 |
|
|
%% Log the key at debug level, then delete the session entry from the
%% mapping backend.
-spec delete_session(Key :: binary()) -> ok.
delete_session(Key) ->
    ?LOG_DEBUG(#{what => gd_mapper_delete_session, key => Key}),
    Result = mod_global_distrib_mapping_backend:delete_session(Key),
    Result.
252 |
|
|
%% Fetch a domain entry from the mapping backend. The cache-miss counter
%% is incremented by one on every call, before the backend is queried.
-spec get_domain(Key :: binary()) -> {ok, term()} | error.
get_domain(Key) ->
    mongoose_metrics:update(global, ?GLOBAL_DISTRIB_MAPPING_CACHE_MISSES, 1),
    Result = mod_global_distrib_mapping_backend:get_domain(Key),
    Result.
257 |
|
|
%% Store a domain entry, together with its hidden flag, in the mapping
%% backend.
-spec put_domain(Key :: binary(), IsHidden :: boolean()) -> ok.
put_domain(Key, IsHidden) ->
    Result = mod_global_distrib_mapping_backend:put_domain(Key, IsHidden),
    Result.
261 |
|
|
%% Delete a domain entry from the mapping backend.
-spec delete_domain(Key :: binary()) -> ok.
delete_domain(Key) ->
    Result = mod_global_distrib_mapping_backend:delete_domain(Key),
    Result.
265 |
|
|
%% Update the ?JID_TAB cache with `{ok, Host}' for every key returned by
%% normalize_jid/1. For each key, a fun that applies `PutSession' to that
%% key is passed to ets_cache:update/4. A #jid{} record is converted with
%% jid:to_lower/1 first.
-spec do_insert_for_jid(jid:jid() | jid:ljid(), Host :: jid:lserver(),
                        PutSession :: fun((binary()) -> ok | error)) -> ok.
do_insert_for_jid(#jid{} = Jid, Host, PutSession) ->
    do_insert_for_jid(jid:to_lower(Jid), Host, PutSession);
do_insert_for_jid({_, _, _} = LJid, Host, PutSession) ->
    UpdateKey = fun(BinJid) ->
                        Store = fun() -> PutSession(BinJid) end,
                        ets_cache:update(?JID_TAB, BinJid, {ok, Host}, Store)
                end,
    lists:foreach(UpdateKey, normalize_jid(LJid)).
276 |
|
|
%% Update the ?DOMAIN_TAB cache entry for `Domain' with `{ok, Host}'. A fun
%% that applies `PutDomain' to `Domain' is passed to ets_cache:update/4.
-spec do_insert_for_domain(Domain :: binary(), Host :: jid:lserver(),
                           PutDomain :: fun((binary()) -> ok | error)) -> ok.
do_insert_for_domain(Domain, Host, PutDomain) ->
    Store = fun() -> PutDomain(Domain) end,
    ets_cache:update(?DOMAIN_TAB, Domain, {ok, Host}, Store).
281 |
|
|
%% Resolve the host for a lower-cased JID through the ?JID_TAB cache.
%% The first lookup uses the binary form of the JID, with get_session/1 on
%% that same key as the fun given to ets_cache:lookup/3. An `{ok, _}'
%% result is returned as is. Otherwise, if jid:to_bare/1 yields a different
%% tuple, a second lookup is made whose fun queries get_session/1 with the
%% bare JID; if the JID is already bare, the first result is returned.
-spec do_lookup_jid(jid:ljid()) -> {ok, Host :: jid:lserver()} | error.
do_lookup_jid({_, _, _} = Jid) ->
    FullBin = jid:to_binary(Jid),
    FullLookup = fun() -> get_session(FullBin) end,
    case ets_cache:lookup(?JID_TAB, FullBin, FullLookup) of
        {ok, _} = Found ->
            Found;
        NotFound ->
            case jid:to_bare(Jid) of
                Jid ->
                    NotFound;
                BareJid ->
                    BareBin = jid:to_binary(BareJid),
                    %% NOTE(review): the cache key here is still the full
                    %% JID while the backend is queried with the bare JID -
                    %% confirm this is intended.
                    ets_cache:lookup(?JID_TAB, FullBin, fun() -> get_session(BareBin) end)
            end
    end.