1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mod_global_distrib_bounce). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-behaviour(gen_mod). |
21 |
|
-behaviour(gen_server). |
22 |
|
-behaviour(mongoose_module_metrics). |
23 |
|
|
24 |
|
-include("mongoose.hrl"). |
25 |
|
-include("jlib.hrl"). |
26 |
|
-include("global_distrib_metrics.hrl"). |
27 |
|
|
28 |
|
-define(MESSAGE_STORE, mod_global_distrib_bounce_message_store). |
29 |
|
-define(MS_BY_TARGET, mod_global_distrib_bounce_message_store_by_target). |
30 |
|
|
31 |
|
-export([start_link/0, start/2, stop/1, deps/2]). |
32 |
|
-export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]). |
33 |
|
-export([maybe_store_message/1, reroute_messages/4]). |
34 |
|
-export([bounce_queue_size/0]). |
35 |
|
|
36 |
|
-ignore_xref([bounce_queue_size/0, maybe_store_message/1, reroute_messages/4, start_link/0]). |
37 |
|
|
38 |
|
%%-------------------------------------------------------------------- |
39 |
|
%% gen_mod API |
40 |
|
%%-------------------------------------------------------------------- |
41 |
|
|
42 |
|
%% gen_mod start callback.
%% Creates the two ETS tables used by this module (the message store as an
%% ordered_set, the by-target index as a bag), registers a function-based
%% metric that evaluates bounce_queue_size/0, installs the hooks listed by
%% hooks/1 and finally starts this module as a permanent worker child of
%% ejabberd_sup. The module options are not used here.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, _Opts) ->
    mod_global_distrib_utils:create_ets(?MESSAGE_STORE, ordered_set),
    mod_global_distrib_utils:create_ets(?MS_BY_TARGET, bag),
    QueueSizeMetric = {function, ?MODULE, bounce_queue_size, [], eval,
                       {[{l, [{t, [value, {v, 'Value'}]}]}], [value]}},
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE,
                                   QueueSizeMetric),
    ejabberd_hooks:add(hooks(HostType)),
    ejabberd_sup:start_child({?MODULE, {?MODULE, start_link, []},
                              permanent, 1000, worker, [?MODULE]}).
52 |
|
|
53 |
|
%% gen_mod stop callback.
%% Stops the worker child registered under ?MODULE, removes the hooks that
%% start/2 installed, and then deletes both ETS tables (index first, then the
%% message store). Returns the result of the last ets:delete/1 call.
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
    ejabberd_sup:stop_child(?MODULE),
    InstalledHooks = hooks(HostType),
    ejabberd_hooks:delete(InstalledHooks),
    ets:delete(?MS_BY_TARGET),
    ets:delete(?MESSAGE_STORE).
59 |
|
|
60 |
|
%% gen_mod deps callback.
%% Declares a single hard dependency on mod_global_distrib_utils, handing it
%% the same options this module was given.
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(_HostType, Opts) ->
    UtilsDep = {mod_global_distrib_utils, Opts, hard},
    [UtilsDep].
63 |
|
|
64 |
|
%% Builds the hook registrations for the given host type: each entry maps a
%% hook name to a handler exported from this module, all at priority 80.
hooks(HostType) ->
    [{HookName, HostType, ?MODULE, Handler, 80}
     || {HookName, Handler} <-
            [{mod_global_distrib_unknown_recipient, maybe_store_message},
             {mod_global_distrib_known_recipient, reroute_messages}]].
67 |
|
|
68 |
|
%% Starts the (unregistered) gen_server implemented by this module, linked to
%% the caller, with an empty init argument and no start options.
-spec start_link() -> {ok, pid()} | {error, any()}.
start_link() ->
    InitArg = [],
    StartOpts = [],
    gen_server:start_link(?MODULE, InitArg, StartOpts).
71 |
|
|
72 |
|
%%-------------------------------------------------------------------- |
73 |
|
%% gen_server API |
74 |
|
%%-------------------------------------------------------------------- |
75 |
|
|
76 |
|
%% gen_server init callback.
%% Sends the first `resend' message to itself so that the periodic resend
%% loop in handle_info/2 starts right away; the server keeps no real state.
init(_) ->
    erlang:send(self(), resend),
    {ok, no_state}.
79 |
|
|
80 |
|
%% gen_server handle_info callback for the periodic `resend' tick.
%% Re-arms the timer for 100 ms from now before doing any work, then resends
%% every stored message whose scheduled time is earlier than the current
%% monotonic time. Only the `resend' message is handled.
handle_info(resend, State) ->
    erlang:send_after(100, self(), resend),
    resend_messages(erlang:monotonic_time()),
    {noreply, State}.
85 |
|
|
86 |
|
%% gen_server handle_cast callback.
%% This server accepts no casts: any cast terminates the process with reason
%% bad_cast.
handle_cast(_Message, _State) ->
    erlang:exit(bad_cast).
88 |
|
|
89 |
|
%% gen_server handle_call callback.
%% This server accepts no calls: any call terminates the process with reason
%% bad_call.
handle_call(_Message, _From, _State) ->
    erlang:exit(bad_call).
91 |
|
|
92 |
|
%% gen_server code_change callback.
%% No state migration is performed; the state is returned as it was given.
code_change(_Version, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
94 |
|
|
95 |
|
%% gen_server terminate callback.
%% Performs no cleanup; always returns the atom `ignore' regardless of the
%% reason or state.
terminate(_Reason, _State) ->
    ignore.
97 |
|
|
98 |
|
%%-------------------------------------------------------------------- |
99 |
|
%% Hooks implementation |
100 |
|
%%-------------------------------------------------------------------- |
101 |
|
|
102 |
|
%% Handler for the mod_global_distrib_unknown_recipient hook (see hooks/1).
%%
%% A `drop' input is passed through untouched. For a {From, To, Acc, Packet}
%% tuple the bounce TTL stored in the accumulator metadata under
%% {bounce_ttl, LocalHost} is looked up, with the [bounce, max_retries] option
%% passed as the third argument of get_metadata/3 (presumably the default used
%% when no TTL has been recorded yet -- confirm against mod_global_distrib).
%%
%%  * TTL =:= 0: the message is not stored. A debug entry is logged, an error
%%    entry is logged as well when the recipient's luser is empty, the
%%    ?GLOBAL_DISTRIB_STOP_TTL_ZERO metric is incremented and the original
%%    tuple is returned unchanged.
%%  * any other TTL: the TTL in the accumulator is decremented, the message is
%%    put in the store to be resent [bounce, resend_after_ms] milliseconds
%%    from now (monotonic time, native units) and `drop' is returned.
%%
%% Crashes with badmatch if the accumulator carries no `id' metadata.
-spec maybe_store_message(drop) -> drop;
                         ({jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) ->
                                 drop | {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}.
maybe_store_message(drop) ->
    drop;
maybe_store_message({From, To, Acc0, Packet} = FPacket) ->
    LocalHost = opt(local_host),
    {ok, ID} = mod_global_distrib:find_metadata(Acc0, id),
    TTLKey = {bounce_ttl, LocalHost},
    MaxRetries = opt([bounce, max_retries]),
    case mod_global_distrib:get_metadata(Acc0, TTLKey, MaxRetries) of
        0 ->
            %% Retries exhausted: hand the packet back instead of storing it.
            ?LOG_DEBUG(#{what => gd_skip_store_message,
                         text => <<"Not storing global message">>,
                         gd_id => ID, acc => Acc0, bounce_ttl => 0}),
            ?LOG_IF(error, To#jid.luser == <<>>,
                    #{what => gd_message_to_component_ttl_zero,
                      gd_id => ID, acc => Acc0}),
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
            FPacket;
        RemainingTTL ->
            ResendAfterMs = opt([bounce, resend_after_ms]),
            ?LOG_DEBUG(#{what => gd_store_message,
                         text => <<"Storing global message">>,
                         gd_id => ID, acc => Acc0, bounce_ttl => RemainingTTL,
                         resend_after_ms => ResendAfterMs}),
            Acc = mod_global_distrib:put_metadata(Acc0, TTLKey, RemainingTTL - 1),
            %% The store is keyed by native-unit monotonic timestamps.
            Delay = erlang:convert_time_unit(ResendAfterMs, millisecond, native),
            ResendAt = erlang:monotonic_time() + Delay,
            do_insert_in_store(ResendAt, {From, To, Acc, Packet}),
            drop
    end.
132 |
|
|
133 |
|
%% Handler for the mod_global_distrib_known_recipient hook (see hooks/1).
%%
%% Removes every index entry recorded for the (From, To) pair and, for each
%% one whose message is still present in the message store, removes it from
%% the store as well and routes it with TargetHost set as the
%% target_host_override metadata. Entries whose message is already gone from
%% the store are skipped. The accumulator passed in is returned unchanged.
-spec reroute_messages(SomeAcc :: mongoose_acc:t(),
                       From :: jid:jid(),
                       To :: jid:jid(),
                       TargetHost :: binary()) -> mongoose_acc:t().
reroute_messages(Acc, From, To, TargetHost) ->
    Key = get_index_key(From, To),
    IndexEntries = ets:take(?MS_BY_TARGET, Key),
    StoredMessages = lists:filtermap(fun take_stored_message/1, IndexEntries),
    ?LOG_IF(debug, StoredMessages =/= [],
            #{what => gd_route_stored,
              text => <<"Routing multiple previously stored messages">>,
              stored_messages_length => length(StoredMessages), acc => Acc}),
    lists:foreach(fun(FPacket) -> reroute_message(TargetHost, FPacket) end,
                  StoredMessages),
    Acc.

%% filtermap callback for reroute_messages/4: claims the message referenced by
%% one index entry. Yields {true, FPacket} when the entry was still in the
%% message store (and has now been removed from it), false when it was not.
take_stored_message({_IndexKey, {ResendAt, FPacket}}) ->
    case ets:take(?MESSAGE_STORE, ResendAt) of
        [_] -> {true, FPacket};
        [] -> false
    end.
154 |
|
|
155 |
|
%%-------------------------------------------------------------------- |
156 |
|
%% API for metrics |
157 |
|
%%-------------------------------------------------------------------- |
158 |
|
|
159 |
|
%% Metrics probe: number of entries currently held in the message store.
%% Reports 0 when ets:info/2 answers `undefined', i.e. when the table does not
%% exist.
-spec bounce_queue_size() -> non_neg_integer().
bounce_queue_size() ->
    StoreSize = ets:info(?MESSAGE_STORE, size),
    case StoreSize of
        undefined -> 0;
        Count -> Count
    end.
165 |
|
|
166 |
|
%%-------------------------------------------------------------------- |
167 |
|
%% Helpers |
168 |
|
%%-------------------------------------------------------------------- |
169 |
|
|
170 |
|
%% Routes one previously stored message again, after recording TargetHost in
%% the accumulator metadata under target_host_override.
-spec reroute_message(TargetHost :: binary(),
                      FPacket :: {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) ->
                             any().
reroute_message(TargetHost, {From, To, Acc0, Packet}) ->
    ejabberd_router:route(
      From, To,
      mod_global_distrib:put_metadata(Acc0, target_host_override, TargetHost),
      Packet).
176 |
|
|
177 |
|
%% Records a stored message in the by-target index, keyed by the lower-cased
%% (From, To) pair, so that reroute_messages/4 can find it later.
add_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) ->
    IndexEntry = {get_index_key(From, To), {ResendAt, FPacket}},
    ets:insert(?MS_BY_TARGET, IndexEntry).
180 |
|
|
181 |
|
%% Removes exactly one entry from the by-target index: the one that
%% add_index/2 wrote for this ResendAt / message combination.
delete_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) ->
    IndexEntry = {get_index_key(From, To), {ResendAt, FPacket}},
    ets:delete_object(?MS_BY_TARGET, IndexEntry).
184 |
|
|
185 |
|
%% Key used in the by-target index: both JIDs passed through jid:to_lower/1.
get_index_key(From, To) ->
    LowerFrom = jid:to_lower(From),
    LowerTo = jid:to_lower(To),
    {LowerFrom, LowerTo}.
187 |
|
|
188 |
|
%% Inserts a message into the message store under the timestamp ResendAt and
%% adds the matching by-target index entry. The store uses the timestamp as
%% its key, so if that exact timestamp is already taken the insert is retried
%% with ResendAt + 1 until a free key is found; the index entry is written
%% with the key that was finally used.
-spec do_insert_in_store(ResendAt :: integer(),
                         {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) -> any().
do_insert_in_store(ResendAt, FPacket) ->
    Inserted = ets:insert_new(?MESSAGE_STORE, {ResendAt, FPacket}),
    case Inserted of
        true ->
            add_index(ResendAt, FPacket);
        false ->
            do_insert_in_store(ResendAt + 1, FPacket)
    end.
195 |
|
|
196 |
|
%% Resends every stored message whose key (its scheduled resend time) is
%% strictly earlier than Now. Messages are taken from the front of the store
%% one at a time until the first key is not earlier than Now or the table is
%% empty (ets:first/1 then yields a non-integer).
-spec resend_messages(Now :: integer()) -> ok.
resend_messages(Now) ->
    case ets:first(?MESSAGE_STORE) of
        Key when is_integer(Key), Key < Now ->
            resend_due_message(Key),
            resend_messages(Now);
        _ ->
            ok
    end.

%% Takes the message stored under Key out of the message store and, if it was
%% still there, drops its index entry, clears the mapping cache for the
%% recipient and casts {route, FPacket} to the worker selected from the sender
%% and the global_host option. Does nothing when the entry has already been
%% removed (e.g. by reroute_messages/4 between ets:first/1 and ets:take/2).
resend_due_message(Key) ->
    case ets:take(?MESSAGE_STORE, Key) of
        [{Key, {From, To, _Acc, _Packet} = FPacket}] ->
            delete_index(Key, FPacket),
            mod_global_distrib_mapping:clear_cache(To),
            GlobalHost = opt(global_host),
            WorkerKey = mod_global_distrib_utils:recipient_to_worker_key(From, GlobalHost),
            Worker = mod_global_distrib_worker_sup:get_worker(WorkerKey),
            gen_server:cast(Worker, {route, FPacket});
        _ ->
            ok
    end.
215 |
|
|
216 |
|
%% Reads one of this module's options (a single key or a key path) through
%% mod_global_distrib_utils:opt/2.
-spec opt(gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value().
opt(KeyOrPath) ->
    mod_global_distrib_utils:opt(?MODULE, KeyOrPath).