./ct_report/coverage/mod_global_distrib_bounce.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_bounce).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(gen_mod).
21 -behaviour(gen_server).
22 -behaviour(mongoose_module_metrics).
23
24 -include("mongoose.hrl").
25 -include("jlib.hrl").
26 -include("global_distrib_metrics.hrl").
27
28 -define(MESSAGE_STORE, mod_global_distrib_bounce_message_store).
29 -define(MS_BY_TARGET, mod_global_distrib_bounce_message_store_by_target).
30
31 -export([start_link/0, start/2, stop/1, deps/2]).
32 -export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]).
33 -export([maybe_store_message/1, reroute_messages/4]).
34 -export([bounce_queue_size/0]).
35
36 -ignore_xref([bounce_queue_size/0, maybe_store_message/1, reroute_messages/4, start_link/0]).
37
38 %%--------------------------------------------------------------------
39 %% gen_mod API
40 %%--------------------------------------------------------------------
41
%% @doc gen_mod start callback.
%% Creates both ETS tables, registers the bounce-queue-size metric, installs
%% this module's hooks for `HostType' and, as the last step, starts the resend
%% worker under `ejabberd_sup'. The result of `ejabberd_sup:start_child/1' is
%% returned as-is.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(HostType, _Opts) ->
    %% Primary store: an ordered_set keyed by resend time, so ets:first/1
    %% yields the entry with the smallest key.
    mod_global_distrib_utils:create_ets(?MESSAGE_STORE, ordered_set),
    %% Secondary index keyed by the {From, To} pair; a bag, since one pair may
    %% have several stored messages.
    mod_global_distrib_utils:create_ets(?MS_BY_TARGET, bag),
    %% Function metric that calls bounce_queue_size/0 in this module.
    QueueSizeDef = {function, ?MODULE, bounce_queue_size, [], eval,
                    {[{l, [{t, [value, {v, 'Value'}]}]}], [value]}},
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, QueueSizeDef),
    ejabberd_hooks:add(hooks(HostType)),
    ejabberd_sup:start_child(
      {?MODULE, {?MODULE, start_link, []}, permanent, 1000, worker, [?MODULE]}).
52
%% @doc gen_mod stop callback.
%% Undoes start/2 in order: stops the resend worker, removes the hooks
%% registered for `HostType', then drops the index table followed by the
%% message store. Returns `true' (the result of the final ets:delete/1).
-spec stop(mongooseim:host_type()) -> any().
stop(HostType) ->
    ejabberd_sup:stop_child(?MODULE),
    HookList = hooks(HostType),
    ejabberd_hooks:delete(HookList),
    %% ets:delete/1 either returns true or raises, so the matches below
    %% cannot fail on their own.
    true = ets:delete(?MS_BY_TARGET),
    true = ets:delete(?MESSAGE_STORE).
59
%% @doc gen_mod dependency callback.
%% Declares a single hard dependency on `mod_global_distrib_utils', handing it
%% the very same options this module received.
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(_HostType, Opts) ->
    UtilsDep = {mod_global_distrib_utils, Opts, hard},
    [UtilsDep].
63
%% Builds the hook registrations for HostType: unknown recipients go to
%% maybe_store_message, known recipients to reroute_messages. Both handlers
%% live in this module and are registered with priority 80.
hooks(HostType) ->
    Priority = 80,
    [{HookName, HostType, ?MODULE, Handler, Priority}
     || {HookName, Handler} <-
            [{mod_global_distrib_unknown_recipient, maybe_store_message},
             {mod_global_distrib_known_recipient, reroute_messages}]].
67
%% @doc Starts the resend worker as an unregistered gen_server whose callback
%% module is this module; init/1 receives an empty list.
-spec start_link() -> {ok, pid()} | {error, any()}.
start_link() ->
    InitArg = [],
    StartOpts = [],
    gen_server:start_link(?MODULE, InitArg, StartOpts).
71
72 %%--------------------------------------------------------------------
73 %% gen_server API
74 %%--------------------------------------------------------------------
75
%% gen_server init callback. Posts the first `resend' message to itself so the
%% resend cycle starts as soon as the server enters its loop; the server
%% carries no meaningful state.
init(_Args) ->
    erlang:send(self(), resend),
    {ok, no_state}.
79
%% gen_server handle_info callback.
%%
%% `resend' is the periodic tick: the next tick is scheduled 100 ms ahead
%% before any work is done, then every stored message whose resend time is
%% earlier than the current monotonic time is handed back for routing.
%%
%% Any other message is logged and ignored. Without this clause a stray
%% message (e.g. a late reply or a monitor signal) would crash this permanent
%% worker with function_clause.
handle_info(resend, State) ->
    erlang:send_after(100, self(), resend),
    Now = erlang:monotonic_time(),
    resend_messages(Now),
    {noreply, State};
handle_info(Message, State) ->
    ?LOG_WARNING(#{what => gd_bounce_unexpected_info,
                   text => <<"Ignoring unexpected message in bounce worker">>,
                   message => Message}),
    {noreply, State}.
85
%% gen_server handle_cast callback. This server accepts no casts, so receiving
%% one terminates the process with exit reason bad_cast.
handle_cast(_Request, _State) ->
    erlang:exit(bad_cast).
88
%% gen_server handle_call callback. This server accepts no calls, so receiving
%% one terminates the process with exit reason bad_call.
handle_call(_Request, _Caller, _State) ->
    erlang:exit(bad_call).
91
%% gen_server code_change callback. The state needs no conversion between
%% versions and is returned untouched.
code_change(_OldVsn, State, _Extra) ->
    Unchanged = State,
    {ok, Unchanged}.
94
%% gen_server terminate callback. Performs no cleanup; the atom returned here
%% is simply `ignore'.
terminate(_Why, _State) ->
    ignore.
97
98 %%--------------------------------------------------------------------
99 %% Hooks implementation
100 %%--------------------------------------------------------------------
101
%% @doc Handler for the `mod_global_distrib_unknown_recipient' hook.
%%
%% An already dropped packet stays `drop'. Otherwise the bounce TTL stored in
%% the accumulator under `{bounce_ttl, LocalHost}' is read; the
%% `[bounce, max_retries]' option is passed as the third argument to
%% `get_metadata' (presumably the default used when no TTL is recorded yet --
%% confirm against mod_global_distrib).
%%
%% With a TTL of 0 the packet is returned unchanged (not stored) and the
%% TTL-zero metric is bumped. With any other TTL the packet is stored with the
%% TTL decremented, scheduled for resending `[bounce, resend_after_ms]'
%% milliseconds from now, and `drop' is returned.
-spec maybe_store_message(drop) -> drop;
                         ({jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) ->
    drop | {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}.
maybe_store_message(drop) ->
    drop;
maybe_store_message({From, To, Acc0, Packet} = FPacket) ->
    LocalHost = opt(local_host),
    {ok, ID} = mod_global_distrib:find_metadata(Acc0, id),
    TTLKey = {bounce_ttl, LocalHost},
    case mod_global_distrib:get_metadata(Acc0, TTLKey, opt([bounce, max_retries])) of
        0 ->
            ?LOG_DEBUG(#{what => gd_skip_store_message,
                         text => <<"Not storing global message">>,
                         gd_id => ID, acc => Acc0, bounce_ttl => 0}),
            %% An empty luser is reported at error level (the log key suggests
            %% such a recipient is a component).
            ?LOG_IF(error, To#jid.luser == <<>>,
                    #{what => gd_message_to_component_ttl_zero,
                      gd_id => ID, acc => Acc0}),
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
            FPacket;
        RemainingTTL ->
            ResendAfterMs = opt([bounce, resend_after_ms]),
            ?LOG_DEBUG(#{what => gd_store_message,
                         text => <<"Storing global message">>,
                         gd_id => ID, acc => Acc0, bounce_ttl => RemainingTTL,
                         resend_after_ms => ResendAfterMs}),
            StoredAcc = mod_global_distrib:put_metadata(Acc0, TTLKey, RemainingTTL - 1),
            %% Store keys are in native units of the monotonic clock.
            Delay = erlang:convert_time_unit(ResendAfterMs, millisecond, native),
            do_insert_in_store(erlang:monotonic_time() + Delay,
                               {From, To, StoredAcc, Packet}),
            drop
    end.
132
%% @doc Handler for the `mod_global_distrib_known_recipient' hook.
%%
%% Removes every index entry recorded for the `{From, To}' pair, claims the
%% matching entries from the message store, and routes each claimed message
%% with `TargetHost' set as its target-host override. An index entry whose
%% store entry is already gone is skipped. The accumulator is returned
%% unchanged.
-spec reroute_messages(SomeAcc :: mongoose_acc:t(),
                       From :: jid:jid(),
                       To :: jid:jid(),
                       TargetHost :: binary()) -> mongoose_acc:t().
reroute_messages(Acc, From, To, TargetHost) ->
    IndexKey = get_index_key(From, To),
    %% Keep a message only if this call is the one that removes it from the
    %% primary store.
    ClaimFromStore =
        fun({_, {ResendAt, FPacket}}) ->
                case ets:take(?MESSAGE_STORE, ResendAt) of
                    [] -> false;
                    [_] -> {true, FPacket}
                end
        end,
    StoredMessages = lists:filtermap(ClaimFromStore, ets:take(?MS_BY_TARGET, IndexKey)),
    ?LOG_IF(debug, StoredMessages =/= [],
            #{what => gd_route_stored,
              text => <<"Routing multiple previously stored messages">>,
              stored_messages_length => length(StoredMessages), acc => Acc}),
    lists:foreach(fun(FPacket) -> reroute_message(TargetHost, FPacket) end,
                  StoredMessages),
    Acc.
154
155 %%--------------------------------------------------------------------
156 %% API for metrics
157 %%--------------------------------------------------------------------
158
%% @doc Reports how many messages currently sit in the message store.
%% Yields 0 when `ets:info/2' returns `undefined' for the table.
-spec bounce_queue_size() -> non_neg_integer().
bounce_queue_size() ->
    case ets:info(?MESSAGE_STORE, size) of
        Size when Size =/= undefined -> Size;
        undefined -> 0
    end.
165
166 %%--------------------------------------------------------------------
167 %% Helpers
168 %%--------------------------------------------------------------------
169
%% Routes one stored packet again, after recording TargetHost in its
%% accumulator under the target_host_override metadata key.
-spec reroute_message(TargetHost :: binary(),
                      FPacket :: {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) ->
    any().
reroute_message(TargetHost, {From, To, Acc0, Packet}) ->
    ejabberd_router:route(
      From, To,
      mod_global_distrib:put_metadata(Acc0, target_host_override, TargetHost),
      Packet).
176
%% Records a stored message in the by-target index, under the key derived from
%% its sender and recipient. The index object carries the resend time
%% together with the whole packet.
add_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) ->
    IndexEntry = {get_index_key(From, To), {ResendAt, FPacket}},
    ets:insert(?MS_BY_TARGET, IndexEntry).
180
%% Removes exactly the index object that add_index/2 would have written for
%% this resend time and packet; other objects under the same key are untouched.
delete_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) ->
    IndexEntry = {get_index_key(From, To), {ResendAt, FPacket}},
    ets:delete_object(?MS_BY_TARGET, IndexEntry).
184
%% Index key for a sender/recipient pair: both JIDs passed through
%% jid:to_lower/1, sender first.
get_index_key(From, To) ->
    LowerFrom = jid:to_lower(From),
    LowerTo = jid:to_lower(To),
    {LowerFrom, LowerTo}.
187
%% Stores the packet under ResendAt and indexes it by target. Keys in the
%% store must be unique, so when ResendAt is already taken the insert is
%% retried with the key bumped by one until a free slot is found.
-spec do_insert_in_store(ResendAt :: integer(),
                         {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) -> any().
do_insert_in_store(ResendAt, FPacket) ->
    Inserted = ets:insert_new(?MESSAGE_STORE, {ResendAt, FPacket}),
    case Inserted of
        false -> do_insert_in_store(ResendAt + 1, FPacket);
        true -> add_index(ResendAt, FPacket)
    end.
195
%% Hands every stored message whose resend time is strictly earlier than Now
%% back for routing, starting with the smallest key. Stops at the first key
%% that is not yet due, or when the store is empty (ets:first/1 then returns a
%% non-integer).
-spec resend_messages(Now :: integer()) -> ok.
resend_messages(Now) ->
    case ets:first(?MESSAGE_STORE) of
        Key when is_integer(Key), Key < Now ->
            resend_due(Key, ets:take(?MESSAGE_STORE, Key)),
            resend_messages(Now);
        _ ->
            ok
    end.

%% Processes the result of taking Key from the store. When the entry was
%% still there, its index object is removed, the cached mapping for the
%% recipient is cleared, and the packet is cast to a global-distrib worker
%% picked from the sender and the global_host option. Any other result (e.g.
%% the entry was taken elsewhere in the meantime) is ignored.
resend_due(Key, [{Key, {From, To, _Acc, _Packet} = FPacket}]) ->
    delete_index(Key, FPacket),
    mod_global_distrib_mapping:clear_cache(To),
    WorkerKey = mod_global_distrib_utils:recipient_to_worker_key(From, opt(global_host)),
    gen_server:cast(mod_global_distrib_worker_sup:get_worker(WorkerKey), {route, FPacket});
resend_due(_Key, _Taken) ->
    ok.
215
%% Reads one of this module's options (a single key or a key path) through
%% mod_global_distrib_utils:opt/2.
-spec opt(gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value().
opt(KeyOrPath) ->
    Module = ?MODULE,
    mod_global_distrib_utils:opt(Module, KeyOrPath).
Line Hits Source