./ct_report/coverage/mod_global_distrib_bounce.COVER.html

1 %%==============================================================================
2 %% Copyright 2017 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16
17 -module(mod_global_distrib_bounce).
18 -author('konrad.zemek@erlang-solutions.com').
19
20 -behaviour(gen_mod).
21 -behaviour(gen_server).
22 -behaviour(mongoose_module_metrics).
23
24 -include("mongoose.hrl").
25 -include("jlib.hrl").
26 -include("global_distrib_metrics.hrl").
27
28 -define(MESSAGE_STORE, mod_global_distrib_bounce_message_store).
29 -define(MS_BY_TARGET, mod_global_distrib_bounce_message_store_by_target).
30
31 -export([start_link/0, start/2, stop/1, hooks/1, deps/2]).
32 -export([init/1, handle_info/2, handle_cast/2, handle_call/3, code_change/3, terminate/2]).
33 -export([maybe_store_message/3, reroute_messages/3]).
34 -export([bounce_queue_size/0]).
35
36 -ignore_xref([bounce_queue_size/0, start_link/0]).
37
38 %%--------------------------------------------------------------------
39 %% gen_mod API
40 %%--------------------------------------------------------------------
41
%% gen_mod callback: prepares everything the bounce mechanism relies on.
%% Creates the two ETS tables (the `ordered_set' message store and the `bag'
%% index), makes sure the global queue-size metric exists, and starts this
%% module's gen_server as a child of `ejabberd_sup'.
%% Returns whatever `ejabberd_sup:start_child/1' returns.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> any().
start(_HostType, _Opts) ->
    mod_global_distrib_utils:create_ets(?MESSAGE_STORE, ordered_set),
    mod_global_distrib_utils:create_ets(?MS_BY_TARGET, bag),
    mongoose_metrics:ensure_metric(global, ?GLOBAL_DISTRIB_BOUNCE_QUEUE_SIZE,
                                   queue_size_metric_def()),
    ejabberd_sup:start_child(worker_child_spec()).

%% Metric definition handed to `mongoose_metrics:ensure_metric/3': a
%% `function' entry that points at `bounce_queue_size/0' in this module.
queue_size_metric_def() ->
    Eval = {[{l, [{t, [value, {v, 'Value'}]}]}], [value]},
    {function, ?MODULE, bounce_queue_size, [], eval, Eval}.

%% Old-style (tuple) child spec for the resend worker:
%% {Id, StartMFA, Restart, Shutdown, Type, Modules}.
worker_child_spec() ->
    {?MODULE, {?MODULE, start_link, []}, permanent, 1000, worker, [?MODULE]}.
51
%% gen_mod callback: stops the resend worker first, then removes both ETS
%% tables (index table, then the message store).
%% `ets:delete/1' either returns `true' or raises, so the match below never
%% changes the outcome; the function still returns `true'.
-spec stop(mongooseim:host_type()) -> any().
stop(_HostType) ->
    ejabberd_sup:stop_child(?MODULE),
    true = ets:delete(?MS_BY_TARGET),
    ets:delete(?MESSAGE_STORE).
57
%% gen_mod callback: declares a single hard dependency on
%% `mod_global_distrib_utils', passing this module's own options through.
-spec deps(mongooseim:host_type(), gen_mod:module_opts()) -> gen_mod_deps:deps().
deps(_HostType, Opts) ->
    UtilsDep = {mod_global_distrib_utils, Opts, hard},
    [UtilsDep].
61
%% Hook handlers registered for HostType. Each entry has the shape
%% {HookName, HostType, Handler, Extra, Priority}; both handlers use an empty
%% extra map and priority 80.
hooks(HostType) ->
    Handlers = [{mod_global_distrib_unknown_recipient, fun ?MODULE:maybe_store_message/3},
                {mod_global_distrib_known_recipient, fun ?MODULE:reroute_messages/3}],
    [{HookName, HostType, Handler, #{}, 80} || {HookName, Handler} <- Handlers].
65
%% Starts the (unregistered) resend worker linked to the caller, with an empty
%% init argument and no gen_server start options.
-spec start_link() -> {ok, pid()} | {error, any()}.
start_link() ->
    InitArg = [],
    StartOpts = [],
    gen_server:start_link(?MODULE, InitArg, StartOpts).
69
70 %%--------------------------------------------------------------------
71 %% gen_server API
72 %%--------------------------------------------------------------------
73
%% gen_server callback: queues the first `resend' tick to itself so the resend
%% loop starts as soon as initialisation returns. The server keeps no state.
init(_Args) ->
    erlang:send(self(), resend),
    {ok, no_state}.
77
%% gen_server callback: one tick of the resend loop.
%% The next tick is scheduled (100 ms ahead) before any work is done, then
%% every stored message whose resend time is earlier than the current
%% monotonic time is resent. Only the `resend' message is handled.
handle_info(resend, State) ->
    _NextTick = erlang:send_after(100, self(), resend),
    resend_messages(erlang:monotonic_time()),
    {noreply, State}.
83
%% gen_server callback: this server accepts no casts; any cast terminates the
%% process with reason `bad_cast'.
handle_cast(_Request, _State) ->
    erlang:exit(bad_cast).
86
%% gen_server callback: this server accepts no calls; any call terminates the
%% process with reason `bad_call'.
handle_call(_Request, _From, _State) ->
    erlang:exit(bad_call).
89
%% gen_server callback: no state migration is needed, the state is returned
%% untouched.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
92
%% gen_server callback: nothing to clean up; always returns the atom `ignore'.
terminate(_Reason, _State) ->
    ignore.
95
96 %%--------------------------------------------------------------------
97 %% Hooks implementation
98 %%--------------------------------------------------------------------
99
%% Handler for the `mod_global_distrib_unknown_recipient' hook.
%% Reads the bounce TTL kept in the accumulator under {bounce_ttl, LocalHost},
%% passing the `[bounce, max_retries]' option as the third argument of
%% `get_metadata/3' (presumably the default when no TTL was recorded yet -
%% confirm against mod_global_distrib).
%%   * TTL is 0: the packet is not stored; the stop-TTL-zero metric is bumped
%%     and {ok, FPacket} is returned with the packet unchanged. An error is
%%     additionally logged when the recipient has an empty `luser'.
%%   * otherwise: the TTL in the accumulator is decremented, the packet is
%%     put in the store with a resend time of now + `[bounce, resend_after_ms]',
%%     and {stop, drop} is returned.
%% Crashes with badmatch if the accumulator carries no `id' metadata.
-spec maybe_store_message(FPacket, Params, Extra) -> {ok, FPacket} | {stop, drop} when
      FPacket :: mongoose_hooks:filter_packet_acc(),
      Params :: map(),
      Extra :: map().
maybe_store_message({From, To, Acc0, Packet} = FPacket, _Params, _Extra) ->
    LocalHost = opt(local_host),
    {ok, ID} = mod_global_distrib:find_metadata(Acc0, id),
    TTLKey = {bounce_ttl, LocalHost},
    MaxRetries = opt([bounce, max_retries]),
    case mod_global_distrib:get_metadata(Acc0, TTLKey, MaxRetries) of
        0 ->
            ?LOG_DEBUG(#{what => gd_skip_store_message,
                         text => <<"Not storing global message">>,
                         gd_id => ID, acc => Acc0, bounce_ttl => 0}),
            ?LOG_IF(error, To#jid.luser == <<>>,
                    #{what => gd_message_to_component_ttl_zero,
                      gd_id => ID, acc => Acc0}),
            mongoose_metrics:update(global, ?GLOBAL_DISTRIB_STOP_TTL_ZERO, 1),
            {ok, FPacket};
        RemainingTTL ->
            ResendAfterMs = opt([bounce, resend_after_ms]),
            ?LOG_DEBUG(#{what => gd_store_message,
                         text => <<"Storing global message">>,
                         gd_id => ID, acc => Acc0, bounce_ttl => RemainingTTL,
                         resend_after_ms => ResendAfterMs}),
            Acc1 = mod_global_distrib:put_metadata(Acc0, TTLKey, RemainingTTL - 1),
            %% Store keys are native-unit monotonic timestamps.
            Delay = erlang:convert_time_unit(ResendAfterMs, millisecond, native),
            DueAt = erlang:monotonic_time() + Delay,
            do_insert_in_store(DueAt, {From, To, Acc1, Packet}),
            {stop, drop}
    end.
130
%% Handler for the `mod_global_distrib_known_recipient' hook.
%% Removes every index entry stored for the {From, To} pair, claims the
%% matching entries from the message store, and routes each claimed packet
%% with `target_host_override' set to TargetHost. Index entries whose message
%% is no longer in the store are skipped. The accumulator is returned as is.
-spec reroute_messages(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: mongoose_acc:t(),
      Params :: #{from := jid:jid(), to := jid:jid(), target_host := binary()},
      Extra :: map().
reroute_messages(Acc, #{from := From, to := To, target_host := TargetHost}, _Extra) ->
    IndexKey = get_index_key(From, To),
    Indexed = ets:take(?MS_BY_TARGET, IndexKey),
    StoredMessages = lists:filtermap(fun take_if_still_stored/1, Indexed),
    ?LOG_IF(debug, StoredMessages =/= [],
            #{what => gd_route_stored,
              text => <<"Routing multiple previously stored messages">>,
              stored_messages_length => length(StoredMessages), acc => Acc}),
    lists:foreach(pa:bind(fun reroute_message/2, TargetHost), StoredMessages),
    {ok, Acc}.

%% filtermap callback: keeps the packet of an index entry only if its message
%% could still be taken out of the message store under the same resend time.
take_if_still_stored({_IndexKey, {ResendAt, FPacket}}) ->
    case ets:take(?MESSAGE_STORE, ResendAt) of
        [_] -> {true, FPacket};
        [] -> false
    end.
152
153 %%--------------------------------------------------------------------
154 %% API for metrics
155 %%--------------------------------------------------------------------
156
%% Number of messages currently held in the message store; 0 when the table
%% does not exist.
-spec bounce_queue_size() -> non_neg_integer().
bounce_queue_size() ->
    size_or_zero(ets:info(?MESSAGE_STORE, size)).

%% `ets:info/2' yields `undefined' for a missing table.
size_or_zero(undefined) -> 0;
size_or_zero(Size) -> Size.
163
164 %%--------------------------------------------------------------------
165 %% Helpers
166 %%--------------------------------------------------------------------
167
%% Routes one previously stored packet after recording TargetHost in its
%% accumulator under the `target_host_override' metadata key.
-spec reroute_message(TargetHost :: binary(),
                      FPacket :: {jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) ->
    any().
reroute_message(TargetHost, {From, To, StoredAcc, Packet}) ->
    OverriddenAcc = mod_global_distrib:put_metadata(StoredAcc, target_host_override, TargetHost),
    ejabberd_router:route(From, To, OverriddenAcc, Packet).
174
%% Adds a {From, To}-keyed index entry that points at the stored message
%% (its resend time plus the packet itself).
add_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) ->
    IndexEntry = {get_index_key(From, To), {ResendAt, FPacket}},
    ets:insert(?MS_BY_TARGET, IndexEntry).
178
%% Removes exactly the index entry that `add_index/2' created for this
%% resend time and packet; other entries under the same key are left alone.
delete_index(ResendAt, {From, To, _Acc, _Packet} = FPacket) ->
    IndexEntry = {get_index_key(From, To), {ResendAt, FPacket}},
    ets:delete_object(?MS_BY_TARGET, IndexEntry).
182
%% Index key for a sender/recipient pair: both JIDs passed through
%% `jid:to_lower/1'.
get_index_key(From, To) ->
    LowerFrom = jid:to_lower(From),
    LowerTo = jid:to_lower(To),
    {LowerFrom, LowerTo}.
185
%% Stores the packet under ResendAt and indexes it. If that resend time is
%% already taken, retries with the next integer until a free key is found.
-spec do_insert_in_store(ResendAt :: integer(),
                         {jid:jid(), jid:jid(), mongoose_acc:t(), exml:element()}) -> any().
do_insert_in_store(ResendAt, FPacket) ->
    Inserted = ets:insert_new(?MESSAGE_STORE, {ResendAt, FPacket}),
    case Inserted of
        false -> do_insert_in_store(ResendAt + 1, FPacket);
        true -> add_index(ResendAt, FPacket)
    end.
193
%% Resends every stored message whose resend time is strictly earlier than
%% Now. The store is an ordered set, so the loop stops at the first key that
%% is not yet due (or when the table is empty).
-spec resend_messages(Now :: integer()) -> ok.
resend_messages(Now) ->
    case ets:first(?MESSAGE_STORE) of
        DueAt when is_integer(DueAt), DueAt < Now ->
            resend_taken(DueAt, ets:take(?MESSAGE_STORE, DueAt)),
            resend_messages(Now);
        _NotDueOrEmpty ->
            ok
    end.

%% Handles the result of taking the entry stored under DueAt.
%% A taken message has its index entry removed and the mapping cache cleared
%% for the recipient, and is then cast as {route, FPacket} to the worker
%% selected from the sender and the `global_host' option. Anything else
%% (e.g. the entry was already taken elsewhere) is a no-op.
resend_taken(DueAt, [{DueAt, {From, To, _Acc, _Packet} = FPacket}]) ->
    delete_index(DueAt, FPacket),
    mod_global_distrib_mapping:clear_cache(To),
    GlobalHost = opt(global_host),
    WorkerKey = mod_global_distrib_utils:recipient_to_worker_key(From, GlobalHost),
    Worker = mod_global_distrib_worker_sup:get_worker(WorkerKey),
    gen_server:cast(Worker, {route, FPacket});
resend_taken(_DueAt, _Other) ->
    ok.
213
%% Looks up one of this module's options (a single key or a key path) through
%% `mod_global_distrib_utils:opt/2'.
-spec opt(gen_mod:opt_key() | gen_mod:key_path()) -> gen_mod:opt_value().
opt(KeyOrPath) ->
    mod_global_distrib_utils:opt(?MODULE, KeyOrPath).
Line Hits Source