1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2017 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mod_global_distrib_bounce). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
|
20 |
|
-behaviour(gen_mod). |
21 |
|
-behaviour(gen_server). |
22 |
|
-behaviour(mongoose_module_metrics). |
23 |
|
|
24 |
|
-include("mongoose.hrl"). |
25 |
|
-include("jlib.hrl"). |
26 |
|
-include("global_distrib_metrics.hrl"). |
27 |
|
|
28 |
|
-define(MESSAGE_STORE, mod_global_distrib_bounce_message_store). |
29 |
|
-define(MS_BY_TARGET, mod_global_distrib_bounce_message_store_by_target). |
30 |
|
|
31 |
|
-export([start_link/0, start/2, stop/1, hooks/1, deps/2]). |
32 |
|
-export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]). |
33 |
|
-export([maybe_store_message/3, reroute_messages/3]). |
34 |
|
-export([bounce_queue_size/0]). |
35 |
|
|
36 |
|
-ignore_xref([bounce_queue_size/0, start_link/0]). |
37 |
|
|
38 |
|
%%-------------------------------------------------------------------- |
39 |
|
%% gen_mod API |
40 |
|
%%-------------------------------------------------------------------- |
41 |
|
|
42 |
|
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any(). |
43 |
|
start(_HostType, _Opts) -> |
44 |
21 |
mod_global_distrib_utils:create_ets(?MESSAGE_STORE, ordered_set), |
45 |
21 |
mod_global_distrib_utils:create_ets(?MS_BY_TARGET, bag), |
46 |
21 |
EvalDef = {[{l, [{t, [value, {v, 'Value'}]}]}], [value]}, |
47 |
21 |
QueueSizeDef = {function, ?MODULE, bounce_queue_size, [], eval, EvalDef}, |
48 |
21 |
mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, QueueSizeDef), |
49 |
21 |
ChildSpec = {?MODULE, {?MODULE, start_link, []}, permanent, 1000, worker, [?MODULE]}, |
50 |
21 |
ejabberd_sup:start_child(ChildSpec). |
51 |
|
|
52 |
|
-spec stop(mongooseim:host_type()) -> any(). |
53 |
|
stop(_HostType) -> |
54 |
21 |
ejabberd_sup:stop_child(?MODULE), |
55 |
21 |
ets:delete(?MS_BY_TARGET), |
56 |
21 |
ets:delete(?MESSAGE_STORE). |
57 |
|
|
58 |
|
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps(). |
59 |
|
deps(_HostType, Opts) -> |
60 |
106 |
[{mod_global_distrib_utils, Opts, hard}]. |
61 |
|
|
62 |
|
hooks(HostType) -> |
63 |
42 |
[{mod_global_distrib_unknown_recipient, HostType, fun ?MODULE:maybe_store_message/3, #{}, 80}, |
64 |
|
{mod_global_distrib_known_recipient, HostType, fun ?MODULE:reroute_messages/3, #{}, 80}]. |
65 |
|
|
66 |
|
-spec start_link() -> {ok, pid()} | {error, any()}. |
67 |
|
start_link() -> |
68 |
21 |
gen_server:start_link(?MODULE, [], []). |
69 |
|
|
70 |
|
%%-------------------------------------------------------------------- |
71 |
|
%% gen_server API |
72 |
|
%%-------------------------------------------------------------------- |
73 |
|
|
74 |
|
init(_) -> |
75 |
21 |
self() ! resend, |
76 |
21 |
{ok, no_state}. |
77 |
|
|
78 |
|
handle_info(resend, State) -> |
79 |
1232 |
erlang:send_after(100, self(), resend), |
80 |
1232 |
Now = erlang:monotonic_time(), |
81 |
1232 |
resend_messages(Now), |
82 |
1232 |
{noreply, State}. |
83 |
|
|
84 |
|
handle_cast(_Message, _State) -> |
85 |
:-( |
exit(bad_cast). |
86 |
|
|
87 |
|
handle_call(_Message, _From, _State) -> |
88 |
:-( |
exit(bad_call). |
89 |
|
|
90 |
|
code_change(_Version, State, _Extra) -> |
91 |
:-( |
{ok, State}. |
92 |
|
|
93 |
|
terminate(_Reason, _State) -> |
94 |
:-( |
ignore. |
95 |
|
|
96 |
|
%%-------------------------------------------------------------------- |
97 |
|
%% Hooks implementation |
98 |
|
%%-------------------------------------------------------------------- |
99 |
|
|
100 |
|
-spec maybe_store_message(FPacket, Params, Extra) -> {ok, FPacket} | {stop, drop} when |
101 |
|
FPacket :: mongoose_hooks:filter_packet_acc(), |
102 |
|
Params :: map(), |
103 |
|
Extra :: map(). |
104 |
|
maybe_store_message({From, To, Acc0, Packet} = FPacket, _, _) -> |
105 |
54 |
LocalHost = opt(local_host), |
106 |
54 |
{ok, ID} = mod_global_distrib:find_metadata(Acc0, id), |
107 |
54 |
case mod_global_distrib:get_metadata(Acc0, {bounce_ttl, LocalHost}, |
108 |
|
opt([bounce, max_retries])) of |
109 |
|
0 -> |
110 |
8 |
?LOG_DEBUG(#{what => gd_skip_store_message, |
111 |
|
text => <<"Not storing global message">>, |
112 |
8 |
gd_id => ID, acc => Acc0, bounce_ttl => 0}), |
113 |
8 |
?LOG_IF(error, To#jid.luser == <<>>, |
114 |
|
#{what => gd_message_to_component_ttl_zero, |
115 |
:-( |
gd_id => ID, acc => Acc0}), |
116 |
8 |
mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1), |
117 |
8 |
{ok, FPacket}; |
118 |
|
OldTTL -> |
119 |
46 |
ResendAfterMs = opt([bounce, resend_after_ms]), |
120 |
46 |
?LOG_DEBUG(#{what => gd_store_message, |
121 |
|
text => <<"Storing global message">>, |
122 |
|
gd_id => ID, acc => Acc0, bounce_ttl => OldTTL, |
123 |
46 |
resend_after_ms => ResendAfterMs}), |
124 |
46 |
Acc = mod_global_distrib:put_metadata(Acc0, {bounce_ttl, LocalHost}, OldTTL - 1), |
125 |
46 |
ResendAfter = erlang:convert_time_unit(ResendAfterMs, millisecond, native), |
126 |
46 |
ResendAt = erlang:monotonic_time() + ResendAfter, |
127 |
46 |
do_insert_in_store(ResendAt, {From, To, Acc, Packet}), |
128 |
46 |
{stop, drop} |
129 |
|
end. |
130 |
|
|
131 |
|
-spec reroute_messages(Acc, Params, Extra) -> {ok, Acc} when |
132 |
|
Acc :: mongoose_acc:t(), |
133 |
|
Params :: #{from := jid:jid(), to := jid:jid(), target_host := binary()}, |
134 |
|
Extra :: map(). |
135 |
|
reroute_messages(Acc, #{from := From, to := To, target_host := TargetHost}, _) -> |
136 |
747 |
Key = get_index_key(From, To), |
137 |
747 |
StoredMessages = |
138 |
|
lists:filtermap( |
139 |
|
fun({_, {ResendAt, FPacket}}) -> |
140 |
11 |
case ets:take(?MESSAGE_STORE, ResendAt) of |
141 |
11 |
[_] -> {true, FPacket}; |
142 |
:-( |
[] -> false |
143 |
|
end |
144 |
|
end, |
145 |
|
ets:take(?MS_BY_TARGET, Key)), |
146 |
747 |
?LOG_IF(debug, StoredMessages =/= [], |
147 |
|
#{what => gd_route_stored, |
148 |
|
text => <<"Routing multiple previously stored messages">>, |
149 |
3 |
stored_messages_length => length(StoredMessages), acc => Acc}), |
150 |
747 |
lists:foreach(pa:bind(fun reroute_message/2, TargetHost), StoredMessages), |
151 |
747 |
{ok, Acc}. |
152 |
|
|
153 |
|
%%-------------------------------------------------------------------- |
154 |
|
%% API for metrics |
155 |
|
%%-------------------------------------------------------------------- |
156 |
|
|
157 |
|
-spec bounce_queue_size() -> non_neg_integer(). |
158 |
|
bounce_queue_size() -> |
159 |
:-( |
case ets:info(?MESSAGE_STORE, size) of |
160 |
:-( |
undefined -> 0; |
161 |
:-( |
Value -> Value |
162 |
|
end. |
163 |
|
|
164 |
|
%%-------------------------------------------------------------------- |
165 |
|
%% Helpers |
166 |
|
%%-------------------------------------------------------------------- |
167 |
|
|
168 |
|
-spec reroute_message(TargetHost :: binary(), |
169 |
|
FPacket :: {jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) -> |
170 |
|
any(). |
171 |
|
reroute_message(TargetHost, {From, To, Acc0, Packet}) -> |
172 |
11 |
Acc = mod_global_distrib:put_metadata(Acc0, target_host_override, TargetHost), |
173 |
11 |
ejabberd_router:route(From, To, Acc, Packet). |
174 |
|
|
175 |
|
add_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) -> |
176 |
46 |
Key = get_index_key(From, To), |
177 |
46 |
ets:insert(?MS_BY_TARGET, {Key, {ResendAt, FPacket}}). |
178 |
|
|
179 |
|
delete_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) -> |
180 |
33 |
Key = get_index_key(From, To), |
181 |
33 |
ets:delete_object(?MS_BY_TARGET, {Key, {ResendAt, FPacket}}). |
182 |
|
|
183 |
|
get_index_key(From, To) -> |
184 |
826 |
{jid:to_lower(From), jid:to_lower(To)}. |
185 |
|
|
186 |
|
-spec do_insert_in_store(ResendAt :: integer(), |
187 |
|
{jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) -> any(). |
188 |
|
do_insert_in_store(ResendAt, FPacket) -> |
189 |
46 |
case ets:insert_new(?MESSAGE_STORE, {ResendAt, FPacket}) of |
190 |
46 |
true -> add_index(ResendAt, FPacket); |
191 |
:-( |
false -> do_insert_in_store(ResendAt + 1, FPacket) |
192 |
|
end. |
193 |
|
|
194 |
|
-spec resend_messages(Now :: integer()) -> ok. |
195 |
|
resend_messages(Now) -> |
196 |
1265 |
case ets:first(?MESSAGE_STORE) of |
197 |
|
Key when is_integer(Key) andalso Key < Now -> |
198 |
33 |
case ets:take(?MESSAGE_STORE, Key) of |
199 |
|
[{Key, {From, To, _Acc, _Packet} = FPacket}] -> |
200 |
33 |
delete_index(Key, FPacket), |
201 |
33 |
mod_global_distrib_mapping:clear_cache(To), |
202 |
33 |
WorkerKey = mod_global_distrib_utils:recipient_to_worker_key( |
203 |
|
From, opt(global_host)), |
204 |
33 |
Worker = mod_global_distrib_worker_sup:get_worker(WorkerKey), |
205 |
33 |
gen_server:cast(Worker, {route, FPacket}); |
206 |
|
_ -> |
207 |
:-( |
ok |
208 |
|
end, |
209 |
33 |
resend_messages(Now); |
210 |
|
_ -> |
211 |
1232 |
ok |
212 |
|
end. |
213 |
|
|
214 |
|
-spec opt(gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value(). |
215 |
|
opt(Key) -> |
216 |
187 |
mod_global_distrib_utils:opt(?MODULE, Key). |