1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mod_global_distrib_bounce). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-behaviour(gen_mod). |
21 |
|
-behaviour(gen_server). |
22 |
|
-behaviour(mongoose_module_metrics). |
23 |
|
|
24 |
|
-include("mongoose.hrl"). |
25 |
|
-include("jlib.hrl"). |
26 |
|
-include("global_distrib_metrics.hrl"). |
27 |
|
|
28 |
|
-define(MESSAGE_STORE, mod_global_distrib_bounce_message_store). |
29 |
|
-define(MS_BY_TARGET, mod_global_distrib_bounce_message_store_by_target). |
30 |
|
|
31 |
|
-export([start_link/0, start/2, stop/1]). |
32 |
|
-export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]). |
33 |
|
-export([maybe_store_message/1, reroute_messages/4]). |
34 |
|
-export([bounce_queue_size/0]). |
35 |
|
|
36 |
|
-ignore_xref([bounce_queue_size/0, maybe_store_message/1, reroute_messages/4, start_link/0]). |
37 |
|
|
38 |
|
%%-------------------------------------------------------------------- |
39 |
|
%% gen_mod API |
40 |
|
%%-------------------------------------------------------------------- |
41 |
|
|
42 |
|
%% @doc gen_mod callback: starts the bounce module for `Host'.
%% Reads `resend_after_ms' from the supplied options (200 when absent),
%% converts it to native time units and places it, as `resend_after', together
%% with a `max_retries' entry of 4, in front of the original option list before
%% delegating to `mod_global_distrib_utils:start/4' with `start/0'.
-spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any().
start(Host, Opts0) ->
    DelayMs = proplists:get_value(resend_after_ms, Opts0, 200),
    DelayNative = erlang:convert_time_unit(DelayMs, millisecond, native),
    Prepended = [{resend_after, DelayNative}, {max_retries, 4}],
    mod_global_distrib_utils:start(?MODULE, Host, Prepended ++ Opts0, fun start/0).
48 |
|
|
49 |
|
%% @doc gen_mod callback: delegates shutdown for `Host' to
%% `mod_global_distrib_utils:stop/3', passing `stop/0' as the stop function.
-spec stop(Host :: jid:lserver()) -> any().
stop(Host) ->
    StopFun = fun stop/0,
    mod_global_distrib_utils:stop(?MODULE, Host, StopFun).
52 |
|
|
53 |
|
%% @doc Starts the resend server as a linked, unregistered gen_server with an
%% empty init argument and no start options.
-spec start_link() -> {ok, pid()} | {error, any()}.
start_link() ->
    InitArg = [],
    StartOpts = [],
    gen_server:start_link(?MODULE, InitArg, StartOpts).
56 |
|
|
57 |
|
%%-------------------------------------------------------------------- |
58 |
|
%% gen_server API |
59 |
|
%%-------------------------------------------------------------------- |
60 |
|
|
61 |
|
%% @doc gen_server callback. Sends `resend' to the server itself so that the
%% first resend pass is handled right after initialisation. The server keeps
%% no state of its own, hence the `no_state' placeholder.
init(_Args) ->
    erlang:send(self(), resend),
    {ok, no_state}.
64 |
|
|
65 |
|
%% @doc gen_server callback.
%% `resend' is the periodic tick: the next tick is scheduled 100 ms ahead
%% before any work is done, then resend_messages/1 is run with the current
%% monotonic time so that every stored message due before that instant is
%% handed back for routing.
%% Any other message is logged and dropped; without this clause a stray
%% message would crash the server with `function_clause'.
handle_info(resend, State) ->
    erlang:send_after(100, self(), resend),
    Now = erlang:monotonic_time(),
    resend_messages(Now),
    {noreply, State};
handle_info(Unexpected, State) ->
    %% logger:warning/1 is called directly so this clause relies on no macro
    %% beyond those the module already uses.
    logger:warning(#{what => unexpected_info, module => ?MODULE, msg => Unexpected}),
    {noreply, State}.
70 |
|
|
71 |
|
%% @doc gen_server callback. No cast is handled by this server: whatever is
%% cast, the process exits with reason `bad_cast'.
handle_cast(_Request, _ServerState) ->
    erlang:exit(bad_cast).
73 |
|
|
74 |
|
%% @doc gen_server callback. No call is handled by this server: whatever is
%% requested, the process exits with reason `bad_call'.
handle_call(_Request, _Caller, _ServerState) ->
    erlang:exit(bad_call).
76 |
|
|
77 |
|
%% @doc gen_server callback. The state needs no migration between versions,
%% so it is returned unchanged.
code_change(_OldVsn, ServerState, _ExtraData) ->
    {ok, ServerState}.
79 |
|
|
80 |
|
%% @doc gen_server callback. Nothing is cleaned up here; the function simply
%% returns the atom `ignore'.
terminate(_Why, _ServerState) ->
    ignore.
82 |
|
|
83 |
|
%%-------------------------------------------------------------------- |
84 |
|
%% Hooks implementation |
85 |
|
%%-------------------------------------------------------------------- |
86 |
|
|
87 |
|
%% @doc Handler added to the `mod_global_distrib_unknown_recipient' hook.
%% An already dropped packet stays dropped. Otherwise the per-local-host
%% `bounce_ttl' metadata of the accumulator is consulted (falling back to the
%% `max_retries' option when it is not set):
%%   * TTL of 0  - the packet is not stored; a metric is bumped and the packet
%%                 is returned unchanged so processing continues with it;
%%   * otherwise - the TTL is decremented in the accumulator, the packet is
%%                 stored for a resend `resend_after' native time units from
%%                 now, and `drop' is returned.
-spec maybe_store_message(drop) -> drop;
                         ({jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) ->
    drop | {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}.
maybe_store_message(drop) ->
    drop;
maybe_store_message({From, To, Acc0, Packet} = FPacket) ->
    LocalHost = opt(local_host),
    {ok, ID} = mod_global_distrib:find_metadata(Acc0, id),
    TTLKey = {bounce_ttl, LocalHost},
    case mod_global_distrib:get_metadata(Acc0, TTLKey, opt(max_retries)) of
        0 ->
            %% Retries exhausted: let the packet through without storing it.
            ?LOG_DEBUG(#{what => gd_skip_store_message,
                         text => <<"Not storing global message">>,
                         gd_id => ID, acc => Acc0, bounce_ttl => 0}),
            %% An empty luser is reported at error level.
            ?LOG_IF(error, To#jid.luser == <<>>,
                    #{what => gd_message_to_component_ttl_zero,
                      gd_id => ID, acc => Acc0}),
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
            FPacket;
        TTL ->
            ?LOG_DEBUG(#{what => gd_store_message,
                         text => <<"Storing global message">>,
                         gd_id => ID, acc => Acc0, bounce_ttl => TTL,
                         resend_after_ms => erlang:convert_time_unit(opt(resend_after), native, millisecond)}),
            Acc1 = mod_global_distrib:put_metadata(Acc0, TTLKey, TTL - 1),
            StoredAt = erlang:monotonic_time(),
            ResendAt = StoredAt + opt(resend_after),
            do_insert_in_store(ResendAt, {From, To, Acc1, Packet}),
            drop
    end.
114 |
|
|
115 |
|
%% @doc Handler added to the `mod_global_distrib_known_recipient' hook.
%% Removes every index entry stored for the (From, To) pair, keeps only those
%% packets that could still be taken out of the message store, and routes each
%% of them with `TargetHost' as the target host override. The accumulator
%% passed in is returned untouched.
-spec reroute_messages(SomeAcc :: mongoose_acc:t(),
                       From :: jid:jid(),
                       To :: jid:jid(),
                       TargetHost :: binary()) -> mongoose_acc:t().
reroute_messages(Acc, From, To, TargetHost) ->
    IndexKey = get_index_key(From, To),
    %% A packet is kept only if this call is the one that removes it from the
    %% message store; an entry that is already gone is skipped.
    TakeFromStore =
        fun({_, {ResendAt, FPacket}}) ->
                case ets:take(?MESSAGE_STORE, ResendAt) of
                    [_] -> {true, FPacket};
                    [] -> false
                end
        end,
    IndexEntries = ets:take(?MS_BY_TARGET, IndexKey),
    StoredMessages = lists:filtermap(TakeFromStore, IndexEntries),
    ?LOG_IF(debug, StoredMessages =/= [],
            #{what => gd_route_stored,
              text => <<"Routing multiple previously stored messages">>,
              stored_messages_length => length(StoredMessages), acc => Acc}),
    RouteFun = pa:bind(fun reroute_message/2, TargetHost),
    lists:foreach(RouteFun, StoredMessages),
    Acc.
136 |
|
|
137 |
|
%%-------------------------------------------------------------------- |
138 |
|
%% API for metrics |
139 |
|
%%-------------------------------------------------------------------- |
140 |
|
|
141 |
|
%% @doc Metric callback: number of entries currently in the message store,
%% or 0 when `ets:info/2' reports `undefined' for the table.
-spec bounce_queue_size() -> non_neg_integer().
bounce_queue_size() ->
    size_or_zero(ets:info(?MESSAGE_STORE, size)).

%% Maps the `undefined' answer of ets:info/2 to 0; passes anything else through.
size_or_zero(undefined) -> 0;
size_or_zero(Size) -> Size.
147 |
|
|
148 |
|
%%-------------------------------------------------------------------- |
149 |
|
%% Helpers |
150 |
|
%%-------------------------------------------------------------------- |
151 |
|
|
152 |
|
%% @doc Routes one stored packet after recording `TargetHost' in its
%% accumulator under the `target_host_override' metadata key.
-spec reroute_message(TargetHost :: binary(),
                      FPacket :: {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) ->
    any().
reroute_message(TargetHost, {From, To, Acc0, Packet}) ->
    ejabberd_router:route(
      From, To,
      mod_global_distrib:put_metadata(Acc0, target_host_override, TargetHost),
      Packet).
158 |
|
|
159 |
|
%% @doc Start function handed to `mod_global_distrib_utils:start/4'.
%% In order: creates the two ETS tables (the message store as an
%% `ordered_set', the by-target index as a `bag'), registers the bounce queue
%% size metric, adds the two hook handlers for the global host at priority 80,
%% and finally starts the resend server as a child of `ejabberd_sup'.
-spec start() -> any().
start() ->
    Host = opt(global_host),
    mod_global_distrib_utils:create_ets(?MESSAGE_STORE, ordered_set),
    mod_global_distrib_utils:create_ets(?MS_BY_TARGET, bag),
    QueueSizeDef = {function, ?MODULE, bounce_queue_size, [], eval,
                    {[{l, [{t, [value, {v, 'Value'}]}]}], [value]}},
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, QueueSizeDef),
    Hooks = [{mod_global_distrib_unknown_recipient, maybe_store_message},
             {mod_global_distrib_known_recipient, reroute_messages}],
    lists:foreach(fun({Hook, Handler}) ->
                          ejabberd_hooks:add(Hook, Host, ?MODULE, Handler, 80)
                  end, Hooks),
    ejabberd_sup:start_child(
      {?MODULE, {?MODULE, start_link, []}, permanent, 1000, worker, [?MODULE]}).
171 |
|
|
172 |
|
%% @doc Stop function handed to `mod_global_distrib_utils:stop/3'.
%% In order: stops the resend server child, removes both hook handlers for the
%% global host, then deletes the by-target index table and the message store.
-spec stop() -> any().
stop() ->
    Host = opt(global_host),
    ejabberd_sup:stop_child(?MODULE),
    Hooks = [{mod_global_distrib_known_recipient, reroute_messages},
             {mod_global_distrib_unknown_recipient, maybe_store_message}],
    lists:foreach(fun({Hook, Handler}) ->
                          ejabberd_hooks:delete(Hook, Host, ?MODULE, Handler, 80)
                  end, Hooks),
    ets:delete(?MS_BY_TARGET),
    ets:delete(?MESSAGE_STORE).
180 |
|
|
181 |
|
%% Records the packet in the by-target index under its lower-cased
%% (From, To) key, together with the resend time it is stored under.
add_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) ->
    IndexEntry = {get_index_key(From, To), {ResendAt, FPacket}},
    ets:insert(?MS_BY_TARGET, IndexEntry).
184 |
|
|
185 |
|
%% Removes exactly the index object that add_index/2 would have inserted for
%% this resend time and packet.
delete_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) ->
    IndexEntry = {get_index_key(From, To), {ResendAt, FPacket}},
    ets:delete_object(?MS_BY_TARGET, IndexEntry).
188 |
|
|
189 |
|
%% Builds the by-target index key: the pair of both JIDs passed through
%% jid:to_lower/1.
get_index_key(From, To) ->
    LowerFrom = jid:to_lower(From),
    LowerTo = jid:to_lower(To),
    {LowerFrom, LowerTo}.
191 |
|
|
192 |
|
%% @doc Stores the packet in the message store keyed by `ResendAt' and indexes
%% it by target. If that key is already taken, the insert is retried with the
%% key increased by one until a free key is found.
-spec do_insert_in_store(ResendAt :: integer(), {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) -> any().
do_insert_in_store(ResendAt, FPacket) ->
    Inserted = ets:insert_new(?MESSAGE_STORE, {ResendAt, FPacket}),
    case Inserted of
        false ->
            %% Key collision: try the next integer key.
            do_insert_in_store(ResendAt + 1, FPacket);
        true ->
            add_index(ResendAt, FPacket)
    end.
198 |
|
|
199 |
|
%% @doc Hands every stored message whose key (resend time) is strictly less
%% than `Now' over to a worker for routing. The first key of the store is
%% inspected repeatedly until it is no longer an integer below `Now'
%% (including when the table is empty).
-spec resend_messages(Now :: integer()) -> ok.
resend_messages(Now) ->
    case ets:first(?MESSAGE_STORE) of
        DueKey when is_integer(DueKey), DueKey < Now ->
            resend_stored(DueKey),
            resend_messages(Now);
        _ ->
            ok
    end.

%% Takes the entry stored under `Key' out of the message store and, when it is
%% still there, drops its index entry, clears the mapping cache for the
%% recipient and casts the packet to the worker chosen for it. An entry that
%% has already been removed is skipped.
resend_stored(Key) ->
    case ets:take(?MESSAGE_STORE, Key) of
        [{Key, {From, To, _Acc, _Packet} = FPacket}] ->
            delete_index(Key, FPacket),
            mod_global_distrib_mapping:clear_cache(To),
            WorkerKey = mod_global_distrib_utils:recipient_to_worker_key(From, opt(global_host)),
            Worker = mod_global_distrib_worker_sup:get_worker(WorkerKey),
            gen_server:cast(Worker, {route, FPacket});
        _ ->
            ok
    end.
217 |
|
|
218 |
|
%% @doc Looks up one of this module's options by key through
%% `mod_global_distrib_utils:opt/2'.
-spec opt(Key :: atom()) -> term().
opt(Key) ->
    Module = ?MODULE,
    mod_global_distrib_utils:opt(Module, Key).