1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%% |
16 |
|
%% @doc |
17 |
|
%% Generic RabbitMQ worker. The module is responsible for interacting with a |
18 |
|
%% RabbitMQ server. Each worker holds its own AMQP connection and channel in its |
19 |
|
%% state. |
20 |
|
%% @end |
21 |
|
%%============================================================================== |
22 |
|
|
23 |
|
-module(mongoose_rabbit_worker). |
24 |
|
-author('kacper.mentel@erlang-solutions.com'). |
25 |
|
|
26 |
|
-include("mongoose.hrl"). |
27 |
|
|
28 |
|
-behaviour(gen_server). |
29 |
|
|
30 |
|
-export([start_link/0, list_metrics/1]). |
31 |
|
|
32 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, |
33 |
|
terminate/2]). |
34 |
|
|
35 |
|
-ignore_xref([start_link/0]). |
36 |
|
|
37 |
|
%% Worker state: metric/log tags, the parameters used to (re)connect, the
%% live connection and channel, and the request-handling settings.
-type state() :: #{host := binary(),
                   pool_tag := atom(),
                   amqp_client_opts := mongoose_amqp:network_params(),
                   connection := pid(),
                   channel := pid(),
                   confirms := boolean(),
                   max_queue_len := non_neg_integer() | infinity}.

%% Alias used by the internal helpers, which receive the whole state as "opts".
-type worker_opts() :: state().
46 |
|
|
47 |
|
%%%=================================================================== |
48 |
|
%%% Metrics |
49 |
|
%%%=================================================================== |
50 |
|
|
51 |
|
%% Every metric reported by this worker is named [backends, PoolTag, Name].
-define(RABBIT_WORKER_METRIC(Tag, Name), [backends, Tag, Name]).

%% Connection counters.
-define(CONNECTIONS_ACTIVE_METRIC(Tag),
        ?RABBIT_WORKER_METRIC(Tag, connections_active)).
-define(CONNECTIONS_OPENED_METRIC(Tag),
        ?RABBIT_WORKER_METRIC(Tag, connections_opened)).
-define(CONNECTIONS_CLOSED_METRIC(Tag),
        ?RABBIT_WORKER_METRIC(Tag, connections_closed)).
-define(CONNECTIONS_FAILED_METRIC(Tag),
        ?RABBIT_WORKER_METRIC(Tag, connections_failed)).

%% Message counters.
-define(MESSAGES_PUBLISHED_METRIC(Tag),
        ?RABBIT_WORKER_METRIC(Tag, messages_published)).
-define(MESSAGES_FAILED_METRIC(Tag),
        ?RABBIT_WORKER_METRIC(Tag, messages_failed)).
-define(MESSAGES_TIMEOUT_METRIC(Tag),
        ?RABBIT_WORKER_METRIC(Tag, messages_timeout)).

%% Per-message measurements.
-define(MESSAGE_PUBLISH_TIME_METRIC(Tag),
        ?RABBIT_WORKER_METRIC(Tag, message_publish_time)).
-define(MESSAGE_PAYLOAD_SIZE_METRIC(Tag),
        ?RABBIT_WORKER_METRIC(Tag, message_payload_size)).
62 |
|
|
63 |
|
%%%=================================================================== |
64 |
|
%%% API |
65 |
|
%%%=================================================================== |
66 |
|
|
67 |
|
%%--------------------------------------------------------------------
%% @doc
%% Starts an unregistered RabbitMQ worker linked to the calling process.
%% NOTE(review): init/1 receives an empty option list here, so options such
%% as `host_type' and `amqp_client_opts' are read as `undefined'. The function
%% is listed in -ignore_xref; confirm whether this entry point is still used.
%% @end
%%--------------------------------------------------------------------
start_link() ->
    InitArgs = [],
    StartOpts = [],
    gen_server:start_link(?MODULE, InitArgs, StartOpts).
74 |
|
|
75 |
|
%% @doc Lists every metric this worker reports for pool `Tag', paired with its
%% metric type. Connection and message counters are spirals; publish time and
%% payload size are histograms.
-spec list_metrics(Tag :: atom()) -> [{[atom()], spiral | histogram}].
list_metrics(Tag) ->
    Spirals = [?CONNECTIONS_ACTIVE_METRIC(Tag),
               ?CONNECTIONS_OPENED_METRIC(Tag),
               ?CONNECTIONS_CLOSED_METRIC(Tag),
               ?CONNECTIONS_FAILED_METRIC(Tag),
               ?MESSAGES_PUBLISHED_METRIC(Tag),
               ?MESSAGES_FAILED_METRIC(Tag),
               ?MESSAGES_TIMEOUT_METRIC(Tag)],
    Histograms = [?MESSAGE_PUBLISH_TIME_METRIC(Tag),
                  ?MESSAGE_PAYLOAD_SIZE_METRIC(Tag)],
    [{Name, spiral} || Name <- Spirals] ++
        [{Name, histogram} || Name <- Histograms].
85 |
|
|
86 |
|
%%%=================================================================== |
87 |
|
%%% gen_server callbacks |
88 |
|
%%%=================================================================== |
89 |
|
|
90 |
|
%% Traps exits before connecting. gen_server only runs terminate/2 on a
%% shutdown from its parent when the process traps exits, and terminate/2 is
%% where the AMQP connection is closed.
%% The connection is set up synchronously inside init/1.
%% TODO: Refactor with handle_continue when OTP 21 is minimal supported version
init(Opts) ->
    process_flag(trap_exit, true), do_init(Opts).
94 |
|
|
95 |
|
%% Synchronous requests go to do_handle_call/3. When the mailbox exceeds the
%% configured limit, the caller gets `request_dropped' and nothing is executed.
handle_call(Req, From, State) ->
    DroppedReply = {reply, request_dropped, State},
    CallArgs = [Req, From, State],
    maybe_handle_request(fun do_handle_call/3, CallArgs, DroppedReply).
98 |
|
|
99 |
|
%% Asynchronous requests go to do_handle_cast/2. They are dropped, leaving the
%% state unchanged, when the mailbox exceeds the configured limit.
handle_cast(Req, State) ->
    Unchanged = {noreply, State},
    maybe_handle_request(fun do_handle_cast/2, [Req, State], Unchanged).
101 |
|
|
102 |
|
%% Raw messages go to do_handle_info/2. They are subject to the same mailbox
%% limit as calls and casts.
handle_info(Req, State) ->
    Unchanged = {noreply, State},
    maybe_handle_request(fun do_handle_info/2, [Req, State], Unchanged).
104 |
|
|
105 |
|
%% Releases the AMQP channel and connection held in the state when the worker
%% stops. Always returns `ok'.
terminate(_Reason, #{connection := Conn, channel := Chan,
                     host := HostType, pool_tag := Tag}) ->
    close_rabbit_connection(Conn, Chan, HostType, Tag),
    ok.
109 |
|
|
110 |
|
%%%=================================================================== |
111 |
|
%%% Internal functions |
112 |
|
%%%=================================================================== |
113 |
|
|
114 |
|
%% Reads the worker options, opens the AMQP connection and channel, and builds
%% the initial state.
%%
%% Options read from the `Opts' proplist:
%%   host_type        - host used to tag metrics and logs
%%   pool_tag         - pool name used in metric names
%%   amqp_client_opts - connection parameters passed to amqp_connection:start/1
%%   confirms         - whether publisher confirms are enabled on the channel
%%   max_queue_len    - mailbox length above which requests are dropped;
%%                      `infinity' (no limit) when absent
%%
%% gen_server does not call terminate/2 when init fails. So if enabling
%% confirms fails, the freshly opened connection is closed here before the
%% error is re-raised; otherwise it would stay open.
do_init(Opts) ->
    Host = proplists:get_value(host_type, Opts),
    PoolTag = proplists:get_value(pool_tag, Opts),
    AMQPClientOpts = proplists:get_value(amqp_client_opts, Opts),
    {Connection, Channel} =
        establish_rabbit_connection(AMQPClientOpts, Host, PoolTag),
    IsConfirmEnabled =
        try
            maybe_enable_confirms(Channel, Opts)
        catch
            Class:Reason:Stacktrace ->
                %% Best-effort cleanup: a failure here must not hide the
                %% original error, which is re-raised unchanged.
                try close_rabbit_connection(Connection, Channel, Host, PoolTag)
                catch exit:_ -> ok
                end,
                erlang:raise(Class, Reason, Stacktrace)
        end,
    %% `infinity' matches the state() type. A missing value used to be
    %% `undefined', which also never triggered the limit.
    MaxMsgQueueLen = proplists:get_value(max_queue_len, Opts, infinity),
    {ok, #{host => Host, amqp_client_opts => AMQPClientOpts,
           connection => Connection, channel => Channel,
           confirms => IsConfirmEnabled, max_queue_len => MaxMsgQueueLen,
           pool_tag => PoolTag}}.
126 |
|
|
127 |
|
%% Runs a synchronous AMQP method on the worker's channel and replies
%% `{ok, Result}'. If the channel call raises, the caller gets
%% `{Class, Reason}'. The channel, and the connection too if its process is
%% dead, is then replaced in the state for later requests.
do_handle_call({amqp_call, Method}, _From, #{channel := Channel} = State) ->
    try amqp_channel:call(Channel, Method) of
        Result ->
            {reply, {ok, Result}, State}
    catch
        Class:Why ->
            {NewConn, NewChan} = maybe_restart_rabbit_connection(State),
            NewState = State#{connection := NewConn, channel := NewChan},
            {reply, {Class, Why}, NewState}
    end.
137 |
|
|
138 |
|
%% The only accepted cast is an asynchronous publish. Any other cast fails
%% with function_clause.
do_handle_cast({amqp_publish, PublishMethod, Message}, State) ->
    handle_amqp_publish(PublishMethod, Message, State).
140 |
|
|
141 |
|
%% The worker expects no raw messages: report the message and keep the state.
do_handle_info(Msg, State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State}.
144 |
|
|
145 |
|
%% Publishes `Payload' with `Method' on the worker's channel and times the
%% publish, including any wait for confirms. It then updates the message
%% metrics according to the outcome:
%%   true    - counted as published (with publish time and payload)
%%   false   - not confirmed; counted as failed, logged as `negative_ack'
%%   {channel_exception, Class, Reason}
%%           - the publish call raised; counted as failed, and the channel
%%             (plus the connection, if its process died) is replaced in
%%             the returned state
%%   timeout - waiting for confirms timed out; counted as a timeout
%%
%% The state is logged without `amqp_client_opts'. Those are the AMQP
%% connection parameters and presumably carry broker credentials, which must
%% not be written to warning/error logs.
-spec handle_amqp_publish(Method :: mongoose_amqp:method(),
                          Payload :: mongoose_amqp:message(),
                          Opts :: worker_opts()) -> {noreply, worker_opts()}.
handle_amqp_publish(Method, Payload, Opts = #{host := Host,
                                               pool_tag := PoolTag}) ->
    {PublishTime, Result} =
        timer:tc(fun publish_message_and_wait_for_confirm/3,
                 [Method, Payload, Opts]),
    case Result of
        true ->
            update_messages_published_metrics(Host, PoolTag, PublishTime,
                                              Payload),
            ?LOG_DEBUG(#{what => rabbit_message_sent,
                         method => Method, payload => Payload,
                         opts => loggable_opts(Opts)}),
            {noreply, Opts};
        false ->
            update_messages_failed_metrics(Host, PoolTag),
            ?LOG_WARNING(#{what => rabbit_message_sent_failed,
                           reason => negative_ack,
                           method => Method, payload => Payload,
                           opts => loggable_opts(Opts)}),
            {noreply, Opts};
        {channel_exception, Error, Reason} ->
            update_messages_failed_metrics(Host, PoolTag),
            ?LOG_ERROR(#{what => rabbit_message_sent_failed,
                         class => Error, reason => Reason,
                         method => Method, payload => Payload,
                         opts => loggable_opts(Opts)}),
            {FreshConn, FreshChann} = maybe_restart_rabbit_connection(Opts),
            {noreply, Opts#{connection := FreshConn, channel := FreshChann}};
        timeout ->
            update_messages_timeout_metrics(Host, PoolTag),
            ?LOG_ERROR(#{what => rabbit_message_sent_failed, reason => timeout,
                         method => Method, payload => Payload,
                         opts => loggable_opts(Opts)}),
            {noreply, Opts}
    end.

%% Worker state in a form that may be logged: the AMQP connection parameters
%% are removed.
-spec loggable_opts(worker_opts()) -> map().
loggable_opts(Opts) ->
    maps:remove(amqp_client_opts, Opts).
178 |
|
|
179 |
|
%% Publishes `Payload' with `Method' on the worker's channel. When publisher
%% confirms are enabled, it then waits for the broker's verdict.
%%
%% Returns:
%%   true    - the publish was accepted (and acked, when confirms are on)
%%   false   - the broker nacked the message, or the channel refused the
%%             publish (amqp_channel:call/3 returned something other than
%%             `ok', e.g. `blocked' or `closing', meaning it was not sent)
%%   timeout - waiting for confirms timed out
%%   {channel_exception, Class, Reason} - the publish call itself raised
%%
%% Only the publish call is guarded. An exception while waiting for confirms
%% propagates to the caller.
-spec publish_message_and_wait_for_confirm(Method :: mongoose_amqp:method(),
                                           Payload :: mongoose_amqp:message(),
                                           worker_opts()) ->
          boolean() | timeout |
          {channel_exception, error | exit | throw, term()}.
publish_message_and_wait_for_confirm(Method, Payload,
                                     #{channel := Channel,
                                       confirms := IsConfirmEnabled}) ->
    try amqp_channel:call(Channel, Method, Payload) of
        ok ->
            maybe_wait_for_confirms(Channel, IsConfirmEnabled);
        _NotPublished ->
            %% Nothing was sent, so there is nothing to confirm. Report it as
            %% a failed publish instead of counting it as published.
            false
    catch
        Error:Reason -> {channel_exception, Error, Reason}
    end.
192 |
|
|
193 |
|
%% With confirms enabled, defers to amqp_channel:wait_for_confirms/1 for the
%% messages published on `Channel'. Any other flag value reports success
%% immediately.
-spec maybe_wait_for_confirms(Channel :: pid(), boolean()) ->
          boolean() | timeout.
maybe_wait_for_confirms(Channel, IsConfirmEnabled) ->
    case IsConfirmEnabled of
        true -> amqp_channel:wait_for_confirms(Channel);
        _ -> true
    end.
198 |
|
|
199 |
|
%% Produces a fresh {Connection, Channel} pair after a channel operation has
%% failed.
%% - If the current connection process is still alive, a new channel is
%%   opened on it.
%% - Otherwise the dead connection is recorded as closed in the metrics, and a
%%   new connection and channel are established.
%% Crashes if the new channel or connection cannot be opened.
-spec maybe_restart_rabbit_connection(worker_opts()) -> {pid(), pid()}.
maybe_restart_rabbit_connection(#{connection := Conn, host := Host,
                                  pool_tag := PoolTag,
                                  amqp_client_opts := AMQPOpts}) ->
    case is_process_alive(Conn) of
        true ->
            {ok, Channel} = amqp_connection:open_channel(Conn),
            {Conn, Channel};
        false ->
            %% The dead connection was counted as active when it was opened.
            %% Balance that before the replacement is counted, so the
            %% connections_active metric does not grow on every reconnect.
            update_closed_connections_metrics(Host, PoolTag),
            establish_rabbit_connection(AMQPOpts, Host, PoolTag)
    end.
210 |
|
|
211 |
|
%% Starts an AMQP connection with `AMQPOpts', opens one channel on it, and
%% updates the connection metrics for `PoolTag'.
%%
%% Exits with "connection to a Rabbit server failed" if the connection cannot
%% be started. If the connection starts but no channel can be opened, the
%% connection is closed and counted as closed. The function then raises
%% `{badmatch, OpenChannelResult}', as before, so a failed attempt no longer
%% leaves an open connection behind.
%% NOTE(review): `AMQPOpts' is logged as-is; check whether it carries broker
%% credentials.
-spec establish_rabbit_connection(Opts :: mongoose_amqp:network_params(),
                                  Host :: jid:server(), PoolTag :: atom())
                                 -> {pid(), pid()}.
establish_rabbit_connection(AMQPOpts, Host, PoolTag) ->
    case amqp_connection:start(AMQPOpts) of
        {ok, Connection} ->
            update_success_connections_metrics(Host, PoolTag),
            Channel = open_channel_or_close(Connection, Host, PoolTag),
            ?LOG_DEBUG(#{what => rabbit_connection_established,
                         server => Host, pool_tag => PoolTag, opts => AMQPOpts}),
            {Connection, Channel};
        {error, Error} ->
            update_failed_connections_metrics(Host, PoolTag),
            ?LOG_ERROR(#{what => rabbit_connection_failed, reason => Error,
                         server => Host, pool_tag => PoolTag, opts => AMQPOpts}),
            exit("connection to a Rabbit server failed")
    end.

%% Opens a channel on `Connection'. On failure, closes the connection (best
%% effort), records it as closed, and raises `{badmatch, Result}'.
-spec open_channel_or_close(Connection :: pid(), Host :: jid:server(),
                            PoolTag :: atom()) -> pid().
open_channel_or_close(Connection, Host, PoolTag) ->
    case amqp_connection:open_channel(Connection) of
        {ok, Channel} ->
            Channel;
        Other ->
            update_closed_connections_metrics(Host, PoolTag),
            %% The connection may already be gone. Closing is only cleanup;
            %% the channel error below is what gets reported.
            try amqp_connection:close(Connection)
            catch exit:_ -> already_closed
            end,
            erlang:error({badmatch, Other})
    end.
228 |
|
|
229 |
|
%% Records the connection as closed in the metrics, then closes `Channel' and
%% `Connection'. Any failure while closing the channel is ignored. The return
%% value is that of amqp_connection:close/1.
-spec close_rabbit_connection(Connection :: pid(), Channel :: pid(),
                              Host :: jid:server(), PoolTag :: atom()) ->
          ok | no_return().
close_rabbit_connection(Connection, Channel, Host, PoolTag) ->
    update_closed_connections_metrics(Host, PoolTag),
    _ChannelCloseResult =
        try amqp_channel:close(Channel)
        catch _Class:_Why -> already_closed
        end,
    amqp_connection:close(Connection).
239 |
|
|
240 |
|
%% Enables publisher confirms on `Channel' when the `confirms' option is true,
%% and returns whether confirms are on. Crashes in two cases:
%% - the reply to confirm.select is not the expected confirm.select_ok;
%% - `confirms' is neither true nor false, including when it is missing.
-spec maybe_enable_confirms(Channel :: pid(), proplists:proplist()) ->
          boolean() | no_return().
maybe_enable_confirms(Channel, Opts) ->
    case proplists:get_value(confirms, Opts) of
        false ->
            false;
        true ->
            ExpectedReply = mongoose_amqp:confirm_select_ok(),
            ExpectedReply =
                amqp_channel:call(Channel, mongoose_amqp:confirm_select()),
            true
    end.
252 |
|
|
253 |
|
%% Records a successful publish for `PoolTag' in three metrics: the published
%% counter, the publish-time histogram (`PublishTime' as supplied by the
%% caller), and the payload-size histogram (size of the term_to_binary
%% encoding of `Payload').
-spec update_messages_published_metrics(Host :: jid:server(),
                                        PoolTag :: atom(),
                                        PublishTime :: non_neg_integer(),
                                        Message :: mongoose_amqp:message()) ->
          any().
update_messages_published_metrics(Host, PoolTag, PublishTime, Payload) ->
    Update = fun(Metric, Value) -> mongoose_metrics:update(Host, Metric, Value) end,
    Update(?MESSAGES_PUBLISHED_METRIC(PoolTag), 1),
    Update(?MESSAGE_PUBLISH_TIME_METRIC(PoolTag), PublishTime),
    Update(?MESSAGE_PAYLOAD_SIZE_METRIC(PoolTag),
           byte_size(term_to_binary(Payload))).
264 |
|
|
265 |
|
%% Bumps the failed-message counter for `PoolTag'.
-spec update_messages_failed_metrics(Host :: jid:server(), PoolTag :: atom())
                                    -> any().
update_messages_failed_metrics(Host, PoolTag) ->
    FailedMetric = ?MESSAGES_FAILED_METRIC(PoolTag),
    mongoose_metrics:update(Host, FailedMetric, 1).
269 |
|
|
270 |
|
%% Bumps the timed-out-message counter for `PoolTag'.
-spec update_messages_timeout_metrics(Host :: jid:server(), PoolTag :: atom())
                                     -> any().
update_messages_timeout_metrics(Host, PoolTag) ->
    TimeoutMetric = ?MESSAGES_TIMEOUT_METRIC(PoolTag),
    mongoose_metrics:update(Host, TimeoutMetric, 1).
274 |
|
|
275 |
|
%% A connection was opened: one more active and one more opened.
-spec update_success_connections_metrics(Host :: jid:server(), PoolTag :: atom())
                                        -> any().
update_success_connections_metrics(Host, PoolTag) ->
    ActiveMetric = ?CONNECTIONS_ACTIVE_METRIC(PoolTag),
    OpenedMetric = ?CONNECTIONS_OPENED_METRIC(PoolTag),
    mongoose_metrics:update(Host, ActiveMetric, 1),
    mongoose_metrics:update(Host, OpenedMetric, 1).
280 |
|
|
281 |
|
%% Bumps the failed-connection-attempt counter for `PoolTag'.
-spec update_failed_connections_metrics(Host :: jid:server(), PoolTag :: atom())
                                       -> any().
update_failed_connections_metrics(Host, PoolTag) ->
    FailedMetric = ?CONNECTIONS_FAILED_METRIC(PoolTag),
    mongoose_metrics:update(Host, FailedMetric, 1).
285 |
|
|
286 |
|
%% A connection went away: one fewer active and one more closed.
-spec update_closed_connections_metrics(Host :: jid:server(), PoolTag :: atom())
                                       -> any().
update_closed_connections_metrics(Host, PoolTag) ->
    ActiveMetric = ?CONNECTIONS_ACTIVE_METRIC(PoolTag),
    ClosedMetric = ?CONNECTIONS_CLOSED_METRIC(PoolTag),
    mongoose_metrics:update(Host, ActiveMetric, -1),
    mongoose_metrics:update(Host, ClosedMetric, 1).
291 |
|
|
292 |
|
%% Applies `Callback' to `Args' unless this process's mailbox is longer than
%% the state's `max_queue_len'. In that case it logs a warning and returns the
%% pre-built `Reply' without running the callback. The worker state is
%% expected to be the last element of `Args'.
-spec maybe_handle_request(Callback :: function(), Args :: [term()],
                           Reply :: term()) -> term().
maybe_handle_request(Callback, Args, Reply) ->
    Limit = maps:get(max_queue_len, lists:last(Args)),
    case is_msq_queue_max_limit_reached(Limit) of
        true ->
            ?LOG_WARNING(#{what => rabbit_worker_request_dropped,
                           reason => queue_message_length_limit_reached,
                           limit => Limit}),
            Reply;
        false ->
            erlang:apply(Callback, Args)
    end.
306 |
|
|
307 |
|
%% True when the calling process has more than `Limit' messages waiting in
%% its mailbox. Always false for `infinity'.
-spec is_msq_queue_max_limit_reached(Limit :: infinity | non_neg_integer()) ->
          boolean().
is_msq_queue_max_limit_reached(infinity) ->
    false;
is_msq_queue_max_limit_reached(Limit) ->
    {message_queue_len, QueueLen} = process_info(self(), message_queue_len),
    QueueLen > Limit.