./ct_report/coverage/mod_global_distrib_bounce.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_bounce).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(gen_mod).
21 -behaviour(gen_server).
22 -behaviour(mongoose_module_metrics).
23
24 -include("mongoose.hrl").
25 -include("jlib.hrl").
26 -include("global_distrib_metrics.hrl").
27
28 -define(MESSAGE_STORE, mod_global_distrib_bounce_message_store).
29 -define(MS_BY_TARGET, mod_global_distrib_bounce_message_store_by_target).
30
31 -export([start_link/0, start/2, stop/1]).
32 -export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]).
33 -export([maybe_store_message/1, reroute_messages/4]).
34 -export([bounce_queue_size/0]).
35
36 -ignore_xref([bounce_queue_size/0, maybe_store_message/1, reroute_messages/4, start_link/0]).
37
38 %%--------------------------------------------------------------------
39 %% gen_mod API
40 %%--------------------------------------------------------------------
41
42 -spec start(Host :: jid:lserver(), Opts :: proplists:proplist()) -> any().
43 start(Host, Opts0) ->
44 21 ResendAfterMs = proplists:get_value(resend_after_ms, Opts0, 200),
45 21 ResendAfter = erlang:convert_time_unit(ResendAfterMs, millisecond, native),
46 21 Opts = [{resend_after, ResendAfter}, {max_retries, 4} | Opts0],
47 21 mod_global_distrib_utils:start(?MODULE, Host, Opts, fun start/0).
48
49 -spec stop(Host :: jid:lserver()) -> any().
50 stop(Host) ->
51 21 mod_global_distrib_utils:stop(?MODULE, Host, fun stop/0).
52
53 -spec start_link() -> {ok, pid()} | {error, any()}.
54 start_link() ->
55 21 gen_server:start_link(?MODULE, [], []).
56
57 %%--------------------------------------------------------------------
58 %% gen_server API
59 %%--------------------------------------------------------------------
60
61 init(_) ->
62 21 self() ! resend,
63 21 {ok, no_state}.
64
65 handle_info(resend, State) ->
66 2148 erlang:send_after(100, self(), resend),
67 2148 Now = erlang:monotonic_time(),
68 2148 resend_messages(Now),
69 2148 {noreply, State}.
70
71 handle_cast(_Message, _State) ->
72
:-(
exit(bad_cast).
73
74 handle_call(_Message, _From, _State) ->
75
:-(
exit(bad_call).
76
77 code_change(_Version, State, _Extra) ->
78
:-(
{ok, State}.
79
80 terminate(_Reason, _State) ->
81
:-(
ignore.
82
83 %%--------------------------------------------------------------------
84 %% Hooks implementation
85 %%--------------------------------------------------------------------
86
87 -spec maybe_store_message(drop) -> drop;
88 ({jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) ->
89 drop | {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}.
90
:-(
maybe_store_message(drop) -> drop;
91 maybe_store_message({From, To, Acc0, Packet} = FPacket) ->
92 39 LocalHost = opt(local_host),
93 39 {ok, ID} = mod_global_distrib:find_metadata(Acc0, id),
94 39 case mod_global_distrib:get_metadata(Acc0, {bounce_ttl, LocalHost}, opt(max_retries)) of
95 0 ->
96 5 ?LOG_DEBUG(#{what => gd_skip_store_message,
97 text => <<"Not storing global message">>,
98 5 gd_id => ID, acc => Acc0, bounce_ttl => 0}),
99 5 ?LOG_IF(error, To#jid.luser == <<>>,
100 #{what => gd_message_to_component_ttl_zero,
101
:-(
gd_id => ID, acc => Acc0}),
102 5 mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
103 5 FPacket;
104 OldTTL ->
105 34 ?LOG_DEBUG(#{what => gd_store_message,
106 text => <<"Storing global message">>,
107 gd_id => ID, acc => Acc0, bounce_ttl => OldTTL,
108 34 resend_after_ms => erlang:convert_time_unit(opt(resend_after), native, millisecond)}),
109 34 Acc = mod_global_distrib:put_metadata(Acc0, {bounce_ttl, LocalHost}, OldTTL - 1),
110 34 ResendAt = erlang:monotonic_time() + opt(resend_after),
111 34 do_insert_in_store(ResendAt, {From, To, Acc, Packet}),
112 34 drop
113 end.
114
115 -spec reroute_messages(SomeAcc :: mongoose_acc:t(),
116 From :: jid:jid(),
117 To :: jid:jid(),
118 TargetHost :: binary()) -> mongoose_acc:t().
119 reroute_messages(Acc, From, To, TargetHost) ->
120 742 Key = get_index_key(From, To),
121 742 StoredMessages =
122 lists:filtermap(
123 fun({_, {ResendAt, FPacket}}) ->
124 10 case ets:take(?MESSAGE_STORE, ResendAt) of
125 10 [_] -> {true, FPacket};
126
:-(
[] -> false
127 end
128 end,
129 ets:take(?MS_BY_TARGET, Key)),
130 742 ?LOG_IF(debug, StoredMessages =/= [],
131 #{what => gd_route_stored,
132 text => <<"Routing multiple previously stored messages">>,
133 3 stored_messages_length => length(StoredMessages), acc => Acc}),
134 742 lists:foreach(pa:bind(fun reroute_message/2, TargetHost), StoredMessages),
135 742 Acc.
136
137 %%--------------------------------------------------------------------
138 %% API for metrics
139 %%--------------------------------------------------------------------
140
141 -spec bounce_queue_size() -> non_neg_integer().
142 bounce_queue_size() ->
143
:-(
case ets:info(?MESSAGE_STORE, size) of
144
:-(
undefined -> 0;
145
:-(
Value -> Value
146 end.
147
148 %%--------------------------------------------------------------------
149 %% Helpers
150 %%--------------------------------------------------------------------
151
152 -spec reroute_message(TargetHost :: binary(),
153 FPacket :: {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) ->
154 any().
155 reroute_message(TargetHost, {From, To, Acc0, Packet}) ->
156 10 Acc = mod_global_distrib:put_metadata(Acc0, target_host_override, TargetHost),
157 10 ejabberd_router:route(From, To, Acc, Packet).
158
159 -spec start() -> any().
160 start() ->
161 21 Host = opt(global_host),
162 21 mod_global_distrib_utils:create_ets(?MESSAGE_STORE, ordered_set),
163 21 mod_global_distrib_utils:create_ets(?MS_BY_TARGET, bag),
164 21 EvalDef = {[{l, [{t, [value, {v, 'Value'}]}]}],[value]},
165 21 QueueSizeDef = {function, ?MODULE, bounce_queue_size, [], eval, EvalDef},
166 21 mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE, QueueSizeDef),
167 21 ejabberd_hooks:add(mod_global_distrib_unknown_recipient, Host, ?MODULE, maybe_store_message, 80),
168 21 ejabberd_hooks:add(mod_global_distrib_known_recipient, Host, ?MODULE, reroute_messages, 80),
169 21 ChildSpec = {?MODULE, {?MODULE, start_link, []}, permanent, 1000, worker, [?MODULE]},
170 21 ejabberd_sup:start_child(ChildSpec).
171
172 -spec stop() -> any().
173 stop() ->
174 21 Host = opt(global_host),
175 21 ejabberd_sup:stop_child(?MODULE),
176 21 ejabberd_hooks:delete(mod_global_distrib_known_recipient, Host, ?MODULE, reroute_messages, 80),
177 21 ejabberd_hooks:delete(mod_global_distrib_unknown_recipient, Host, ?MODULE, maybe_store_message, 80),
178 21 ets:delete(?MS_BY_TARGET),
179 21 ets:delete(?MESSAGE_STORE).
180
181 add_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) ->
182 34 Key = get_index_key(From, To),
183 34 ets:insert(?MS_BY_TARGET, {Key, {ResendAt, FPacket}}).
184
185 delete_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) ->
186 23 Key = get_index_key(From, To),
187 23 ets:delete_object(?MS_BY_TARGET, {Key, {ResendAt, FPacket}}).
188
189 get_index_key(From, To) ->
190 799 {jid:to_lower(From), jid:to_lower(To)}.
191
192 -spec do_insert_in_store(ResendAt :: integer(), {jid:jid(), jid:jid(), mongoose_acc:t(), exml:packet()}) -> any().
193 do_insert_in_store(ResendAt, FPacket) ->
194 34 case ets:insert_new(?MESSAGE_STORE, {ResendAt, FPacket}) of
195 34 true -> add_index(ResendAt, FPacket);
196
:-(
false -> do_insert_in_store(ResendAt + 1, FPacket)
197 end.
198
199 -spec resend_messages(Now :: integer()) -> ok.
200 resend_messages(Now) ->
201 2171 case ets:first(?MESSAGE_STORE) of
202 Key when is_integer(Key) andalso Key < Now ->
203 23 case ets:take(?MESSAGE_STORE, Key) of
204 [{Key, {From, To, _Acc, _Packet} = FPacket}] ->
205 23 delete_index(Key, FPacket),
206 23 mod_global_distrib_mapping:clear_cache(To),
207 23 WorkerKey = mod_global_distrib_utils:recipient_to_worker_key(From, opt(global_host)),
208 23 Worker = mod_global_distrib_worker_sup:get_worker(WorkerKey),
209 23 gen_server:cast(Worker, {route, FPacket});
210 _ ->
211
:-(
ok
212 end,
213 23 resend_messages(Now);
214 _ ->
215 2148 ok
216 end.
217
218 -spec opt(Key :: atom()) -> term().
219 opt(Key) ->
220 177 mod_global_distrib_utils:opt(?MODULE, Key).
Line Hits Source