./ct_report/coverage/mongoose_rabbit_worker.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%
16 %% @doc
17 %% Generic RabbitMQ worker. The module is responsible for interacting with
18 %% RabbitMQ server. Worker holds its own AMQP connection and channel in its
19 %% state.
20 %% @end
21 %%==============================================================================
22
23 -module(mongoose_rabbit_worker).
24 -author('kacper.mentel@erlang-solutions.com').
25
26 -include("mongoose.hrl").
27
28 -behaviour(gen_server).
29
30 -export([start_link/0, list_metrics/1]).
31
32 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
33 terminate/2]).
34
35 -ignore_xref([start_link/0]).
36
37 -type state() :: #{amqp_client_opts := mongoose_amqp:network_params(),
38 connection := pid(),
39 channel := pid(),
40 host := binary(),
41 pool_tag := atom(),
42 confirms := boolean(),
43 max_queue_len := non_neg_integer() | infinity}.
44
45 -type worker_opts() :: state().
46
47 %%%===================================================================
48 %%% Metrics
49 %%%===================================================================
50
51 -define(CONNECTIONS_ACTIVE_METRIC(Tag), [backends, Tag, connections_active]).
52 -define(CONNECTIONS_OPENED_METRIC(Tag), [backends, Tag, connections_opened]).
53 -define(CONNECTIONS_CLOSED_METRIC(Tag), [backends, Tag, connections_closed]).
54 -define(CONNECTIONS_FAILED_METRIC(Tag), [backends, Tag, connections_failed]).
55 -define(MESSAGES_PUBLISHED_METRIC(Tag), [backends, Tag, messages_published]).
56 -define(MESSAGES_FAILED_METRIC(Tag), [backends, Tag, messages_failed]).
57 -define(MESSAGES_TIMEOUT_METRIC(Tag), [backends, Tag, messages_timeout]).
58 -define(MESSAGE_PUBLISH_TIME_METRIC(Tag),
59 [backends, Tag, message_publish_time]).
60 -define(MESSAGE_PAYLOAD_SIZE_METRIC(Tag),
61 [backends, Tag, message_payload_size]).
62
63 %%%===================================================================
64 %%% API
65 %%%===================================================================
66
%%--------------------------------------------------------------------
%% @doc
%% Starts a RabbitMQ worker linked to the calling process. The worker is
%% started unregistered, with an empty init argument and no extra
%% gen_server options.
%% @end
%%--------------------------------------------------------------------
start_link() ->
    InitArg = [],
    GenServerOpts = [],
    gen_server:start_link(?MODULE, InitArg, GenServerOpts).
74
%% Returns the metric names (with their types) used by workers of the pool
%% identified by `Tag': counters are spirals, measurements are histograms.
list_metrics(Tag) ->
    Spirals = [?CONNECTIONS_ACTIVE_METRIC(Tag),
               ?CONNECTIONS_OPENED_METRIC(Tag),
               ?CONNECTIONS_CLOSED_METRIC(Tag),
               ?CONNECTIONS_FAILED_METRIC(Tag),
               ?MESSAGES_PUBLISHED_METRIC(Tag),
               ?MESSAGES_FAILED_METRIC(Tag),
               ?MESSAGES_TIMEOUT_METRIC(Tag)],
    Histograms = [?MESSAGE_PUBLISH_TIME_METRIC(Tag),
                  ?MESSAGE_PAYLOAD_SIZE_METRIC(Tag)],
    [{Name, spiral} || Name <- Spirals] ++
        [{Name, histogram} || Name <- Histograms].
85
86 %%%===================================================================
87 %%% gen_server callbacks
88 %%%===================================================================
89
%% gen_server init callback. Exits are trapped so that terminate/2 gets a
%% chance to run before the worker goes down.
init(WorkerOpts) ->
    process_flag(trap_exit, true),
    %TODO: Refactor with handle_continue when OTP 21 is minimal supported version
    do_init(WorkerOpts).
94
%% Serves a synchronous request unless the mailbox is over the configured
%% limit; in that case the caller is answered with `request_dropped'.
handle_call(Request, From, State) ->
    DroppedReply = {reply, request_dropped, State},
    maybe_handle_request(fun do_handle_call/3, [Request, From, State],
                         DroppedReply).
98
%% Serves an asynchronous request unless the mailbox is over the configured
%% limit; a dropped request leaves the state untouched.
handle_cast(Request, State) ->
    Dropped = {noreply, State},
    maybe_handle_request(fun do_handle_cast/2, [Request, State], Dropped).
101
%% Handles a raw message unless the mailbox is over the configured limit;
%% a dropped message leaves the state untouched.
handle_info(Info, State) ->
    Dropped = {noreply, State},
    maybe_handle_request(fun do_handle_info/2, [Info, State], Dropped).
104
%% gen_server terminate callback: closes the worker's channel and connection.
terminate(_Reason, #{host := Host, pool_tag := PoolTag,
                     connection := Connection, channel := Channel}) ->
    close_rabbit_connection(Connection, Channel, Host, PoolTag),
    ok.
109
110 %%%===================================================================
111 %%% Internal functions
112 %%%===================================================================
113
%% Builds the initial worker state from the option proplist: connects to
%% RabbitMQ, optionally switches the channel to confirm mode and stores the
%% mailbox length limit.
do_init(Opts) ->
    Lookup = fun(Key) -> proplists:get_value(Key, Opts) end,
    Host = Lookup(host),
    PoolTag = Lookup(pool_tag),
    AMQPClientOpts = Lookup(amqp_client_opts),
    {Connection, Channel} =
        establish_rabbit_connection(AMQPClientOpts, Host, PoolTag),
    Confirms = maybe_enable_confirms(Channel, Opts),
    MaxQueueLen = Lookup(max_queue_len),
    State = #{host => Host,
              pool_tag => PoolTag,
              amqp_client_opts => AMQPClientOpts,
              connection => Connection,
              channel => Channel,
              confirms => Confirms,
              max_queue_len => MaxQueueLen},
    {ok, State}.
126
%% Executes a synchronous AMQP method on the worker's channel. On any
%% exception the connection/channel pair is refreshed and the exception
%% class and reason are returned to the caller.
do_handle_call({amqp_call, Method}, _From, #{channel := Channel} = State) ->
    try
        {reply, {ok, amqp_channel:call(Channel, Method)}, State}
    catch
        Class:Reason ->
            {NewConn, NewChannel} = maybe_restart_rabbit_connection(State),
            NewState = State#{connection := NewConn, channel := NewChannel},
            {reply, {Class, Reason}, NewState}
    end.
137
%% Asynchronous publish request: delegates to handle_amqp_publish/3.
do_handle_cast({amqp_publish, AMQPMethod, AMQPPayload}, WorkerState) ->
    handle_amqp_publish(AMQPMethod, AMQPPayload, WorkerState).
140
%% The worker expects no raw messages: report the message and carry on
%% with the state unchanged.
do_handle_info(Req, WorkerState) ->
    ?UNEXPECTED_INFO(Req),
    {noreply, WorkerState}.
144
%% Publishes a message, measures how long the publish (including the optional
%% wait for a broker confirm) took, then updates metrics and logs according
%% to the outcome. Only a channel exception changes the state: the
%% connection/channel pair is refreshed in that case.
-spec handle_amqp_publish(Method :: mongoose_amqp:method(),
                          Payload :: mongoose_amqp:message(),
                          Opts :: worker_opts()) -> {noreply, worker_opts()}.
handle_amqp_publish(Method, Payload, Opts = #{host := Host,
                                              pool_tag := PoolTag}) ->
    {ElapsedTime, Outcome} =
        timer:tc(fun publish_message_and_wait_for_confirm/3,
                 [Method, Payload, Opts]),
    %% Fields shared by every log event emitted below.
    LogCtx = #{method => Method, payload => Payload, opts => Opts},
    case Outcome of
        {channel_exception, Class, Reason} ->
            update_messages_failed_metrics(Host, PoolTag),
            ?LOG_ERROR(LogCtx#{what => rabbit_message_sent_failed,
                               class => Class, reason => Reason}),
            {NewConn, NewChannel} = maybe_restart_rabbit_connection(Opts),
            {noreply, Opts#{connection := NewConn, channel := NewChannel}};
        timeout ->
            update_messages_timeout_metrics(Host, PoolTag),
            ?LOG_ERROR(LogCtx#{what => rabbit_message_sent_failed,
                               reason => timeout}),
            {noreply, Opts};
        false ->
            update_messages_failed_metrics(Host, PoolTag),
            ?LOG_WARNING(LogCtx#{what => rabbit_message_sent_failed,
                                 reason => negative_ack}),
            {noreply, Opts};
        true ->
            update_messages_published_metrics(Host, PoolTag, ElapsedTime,
                                              Payload),
            ?LOG_DEBUG(LogCtx#{what => rabbit_message_sent}),
            {noreply, Opts}
    end.
178
%% Publishes `Payload' with `Method' on the worker's channel and, when
%% publisher confirms are enabled, waits for the broker's confirmation.
%%
%% Returns:
%%   true / false - result of maybe_wait_for_confirms/2 (always `true' when
%%                  confirms are disabled),
%%   timeout      - waiting for the confirm timed out,
%%   {channel_exception, Class, Reason} - the publish call itself raised.
%%
%% Only the publish call is protected by the `try'; an exception raised
%% while waiting for confirms is deliberately not caught here.
-spec publish_message_and_wait_for_confirm(Method :: mongoose_amqp:method(),
                                           Payload :: mongoose_amqp:message(),
                                           worker_opts()) ->
    boolean() | timeout |
    {channel_exception, Class :: error | exit | throw, Reason :: term()}.
publish_message_and_wait_for_confirm(Method, Payload,
                                     #{channel := Channel,
                                       confirms := IsConfirmEnabled}) ->
    try amqp_channel:call(Channel, Method, Payload) of
        _Res ->
            maybe_wait_for_confirms(Channel, IsConfirmEnabled)
    catch
        Error:Reason -> {channel_exception, Error, Reason}
    end.
192
%% Waits for the broker's confirm only when confirms are enabled (the flag is
%% exactly `true'); any other flag value counts as an immediate success.
-spec maybe_wait_for_confirms(Channel :: pid(), boolean()) ->
    boolean() | timeout.
maybe_wait_for_confirms(_Channel, Flag) when Flag =/= true ->
    true;
maybe_wait_for_confirms(Channel, true) ->
    amqp_channel:wait_for_confirms(Channel).
198
%% Returns a usable {Connection, Channel} pair after a channel failure.
%%
%% If the connection process is still alive only a new channel is opened on
%% it; otherwise a whole new connection (and channel) is established.
%%
%% amqp_connection:open_channel/1 returns `{ok, ChannelPid}', so the result
%% is unwrapped before being returned. Returning the tagged tuple as the
%% channel would violate the spec and break every later amqp_channel call.
%% A failure to open the channel crashes the worker with a badmatch, the
%% same way establish_rabbit_connection/3 treats it.
-spec maybe_restart_rabbit_connection(worker_opts()) -> {pid(), pid()}.
maybe_restart_rabbit_connection(#{connection := Conn, host := Host,
                                  pool_tag := PoolTag,
                                  amqp_client_opts := AMQPOpts}) ->
    case is_process_alive(Conn) of
        true ->
            {ok, Channel} = amqp_connection:open_channel(Conn),
            {Conn, Channel};
        false ->
            establish_rabbit_connection(AMQPOpts, Host, PoolTag)
    end.
209
%% Opens a new AMQP connection and a channel on it, updating the connection
%% metrics along the way. When the connection cannot be started the failure
%% is counted and logged, and the calling process exits.
-spec establish_rabbit_connection(Opts :: mongoose_amqp:network_params(),
                                  Host :: jid:server(), PoolTag :: atom())
    -> {pid(), pid()}.
establish_rabbit_connection(AMQPOpts, Host, PoolTag) ->
    case amqp_connection:start(AMQPOpts) of
        {error, Reason} ->
            update_failed_connections_metrics(Host, PoolTag),
            ?LOG_ERROR(#{what => rabbit_connection_failed, reason => Reason,
                         server => Host, pool_tag => PoolTag,
                         opts => AMQPOpts}),
            exit("connection to a Rabbit server failed");
        {ok, ConnPid} ->
            update_success_connections_metrics(Host, PoolTag),
            {ok, ChannelPid} = amqp_connection:open_channel(ConnPid),
            ?LOG_DEBUG(#{what => rabbit_connection_established,
                         server => Host, pool_tag => PoolTag,
                         opts => AMQPOpts}),
            {ConnPid, ChannelPid}
    end.
227
%% Records the connection as closed, then closes the channel and the
%% connection. Closing the channel is best effort: it may already be gone,
%% so any exception raised by that call is ignored.
-spec close_rabbit_connection(Connection :: pid(), Channel :: pid(),
                              Host :: jid:server(), PoolTag :: atom()) ->
    ok | no_return().
close_rabbit_connection(Connection, Channel, Host, PoolTag) ->
    update_closed_connections_metrics(Host, PoolTag),
    try
        amqp_channel:close(Channel)
    catch
        _Class:_Why -> already_closed
    end,
    amqp_connection:close(Connection).
238
%% Switches the channel to publisher-confirm mode when the `confirms' option
%% is `true' and reports whether confirms are enabled. The broker's reply is
%% asserted to be the expected `confirm.select-ok'; anything else crashes.
-spec maybe_enable_confirms(Channel :: pid(), proplists:proplist()) ->
    boolean() | no_return().
maybe_enable_confirms(Channel, Opts) ->
    case proplists:get_value(confirms, Opts) of
        false ->
            false;
        true ->
            ExpectedReply = mongoose_amqp:confirm_select_ok(),
            ExpectedReply =
                amqp_channel:call(Channel, mongoose_amqp:confirm_select()),
            true
    end.
251
%% Records a successfully published message: bumps the published counter and
%% feeds the publish time and the payload size (size of the payload's
%% external term format) into their histograms.
-spec update_messages_published_metrics(Host :: jid:server(),
                                        PoolTag :: atom(),
                                        PublishTime :: non_neg_integer(),
                                        Message :: mongoose_amqp:message()) ->
    any().
update_messages_published_metrics(Host, PoolTag, PublishTime, Payload) ->
    PublishedMetric = ?MESSAGES_PUBLISHED_METRIC(PoolTag),
    TimeMetric = ?MESSAGE_PUBLISH_TIME_METRIC(PoolTag),
    SizeMetric = ?MESSAGE_PAYLOAD_SIZE_METRIC(PoolTag),
    mongoose_metrics:update(Host, PublishedMetric, 1),
    mongoose_metrics:update(Host, TimeMetric, PublishTime),
    PayloadSize = byte_size(term_to_binary(Payload)),
    mongoose_metrics:update(Host, SizeMetric, PayloadSize).
263
%% Counts one message that failed to be published.
-spec update_messages_failed_metrics(Host :: jid:server(), PoolTag :: atom())
    -> any().
update_messages_failed_metrics(Host, PoolTag) ->
    FailedMetric = ?MESSAGES_FAILED_METRIC(PoolTag),
    mongoose_metrics:update(Host, FailedMetric, 1).
268
%% Counts one message whose publish confirmation timed out.
-spec update_messages_timeout_metrics(Host :: jid:server(), PoolTag :: atom())
    -> any().
update_messages_timeout_metrics(Host, PoolTag) ->
    TimeoutMetric = ?MESSAGES_TIMEOUT_METRIC(PoolTag),
    mongoose_metrics:update(Host, TimeoutMetric, 1).
273
%% Records a newly opened connection: one more active, one more opened.
-spec update_success_connections_metrics(Host :: jid:server(), PoolTag :: atom())
    -> any().
update_success_connections_metrics(Host, PoolTag) ->
    ActiveMetric = ?CONNECTIONS_ACTIVE_METRIC(PoolTag),
    OpenedMetric = ?CONNECTIONS_OPENED_METRIC(PoolTag),
    mongoose_metrics:update(Host, ActiveMetric, 1),
    mongoose_metrics:update(Host, OpenedMetric, 1).
279
%% Counts one failed attempt to connect to the RabbitMQ server.
-spec update_failed_connections_metrics(Host :: jid:server(), PoolTag :: atom())
    -> any().
update_failed_connections_metrics(Host, PoolTag) ->
    FailedMetric = ?CONNECTIONS_FAILED_METRIC(PoolTag),
    mongoose_metrics:update(Host, FailedMetric, 1).
284
%% Records a closed connection: one fewer active, one more closed.
-spec update_closed_connections_metrics(Host :: jid:server(), PoolTag :: atom())
    -> any().
update_closed_connections_metrics(Host, PoolTag) ->
    ActiveMetric = ?CONNECTIONS_ACTIVE_METRIC(PoolTag),
    ClosedMetric = ?CONNECTIONS_CLOSED_METRIC(PoolTag),
    mongoose_metrics:update(Host, ActiveMetric, -1),
    mongoose_metrics:update(Host, ClosedMetric, 1).
290
%% Load-shedding wrapper around the real request handlers. The worker state
%% is expected to be the last element of `Args'. When the process mailbox is
%% longer than the state's `max_queue_len' the request is dropped: a warning
%% is logged and the pre-built `Reply' is returned instead of calling
%% `Callback'.
-spec maybe_handle_request(Callback :: function(), Args :: [term()],
                           Reply :: term()) -> term().
maybe_handle_request(Callback, Args, Reply) ->
    WorkerState = lists:last(Args),
    MaxLen = maps:get(max_queue_len, WorkerState),
    case is_msq_queue_max_limit_reached(MaxLen) of
        true ->
            ?LOG_WARNING(#{what => rabbit_worker_request_dropped,
                           reason => queue_message_length_limit_reached,
                           limit => MaxLen}),
            Reply;
        false ->
            erlang:apply(Callback, Args)
    end.
305
%% Tells whether the calling process has more messages in its mailbox than
%% `Limit' allows. A limit of `infinity' is never reached.
-spec is_msq_queue_max_limit_reached(Limit :: infinity | non_neg_integer()) ->
    boolean().
is_msq_queue_max_limit_reached(infinity) ->
    false;
is_msq_queue_max_limit_reached(Limit) ->
    {message_queue_len, QueueLen} = process_info(self(), message_queue_len),
    QueueLen > Limit.
Line Hits Source