./ct_report/coverage/mongoose_rabbit_worker.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%
16 %% @doc
17 %% Generic RabbitMQ worker. The module is responsible for interacting with
18 %% RabbitMQ server. Worker holds its own AMQP connection and channel in its
19 %% state.
20 %% @end
21 %%==============================================================================
22
23 -module(mongoose_rabbit_worker).
24 -author('kacper.mentel@erlang-solutions.com').
25
26 -include("mongoose.hrl").
27
28 -behaviour(gen_server).
29
30 -export([start_link/0, list_metrics/1]).
31
32 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
33 terminate/2]).
34
35 -ignore_xref([start_link/0]).
36
37 -type state() :: #{amqp_client_opts := mongoose_amqp:network_params(),
38 connection := pid(),
39 channel := pid(),
40 host := binary(),
41 pool_tag := atom(),
42 confirms := boolean(),
43 max_queue_len := non_neg_integer() | infinity}.
44
45 -type worker_opts() :: state().
46
47 %%%===================================================================
48 %%% Metrics
49 %%%===================================================================
50
51 -define(CONNECTIONS_ACTIVE_METRIC(Tag), [backends, Tag, connections_active]).
52 -define(CONNECTIONS_OPENED_METRIC(Tag), [backends, Tag, connections_opened]).
53 -define(CONNECTIONS_CLOSED_METRIC(Tag), [backends, Tag, connections_closed]).
54 -define(CONNECTIONS_FAILED_METRIC(Tag), [backends, Tag, connections_failed]).
55 -define(MESSAGES_PUBLISHED_METRIC(Tag), [backends, Tag, messages_published]).
56 -define(MESSAGES_FAILED_METRIC(Tag), [backends, Tag, messages_failed]).
57 -define(MESSAGES_TIMEOUT_METRIC(Tag), [backends, Tag, messages_timeout]).
58 -define(MESSAGE_PUBLISH_TIME_METRIC(Tag),
59 [backends, Tag, message_publish_time]).
60 -define(MESSAGE_PAYLOAD_SIZE_METRIC(Tag),
61 [backends, Tag, message_payload_size]).
62
63 %%%===================================================================
64 %%% API
65 %%%===================================================================
66
%%--------------------------------------------------------------------
%% @doc
%% Starts a linked RabbitMQ worker process.
%% The worker is started unregistered and `init/1' receives an empty
%% option list.
%% @end
%%--------------------------------------------------------------------
start_link() ->
    InitArgs = [],
    gen_server:start_link(?MODULE, InitArgs, []).
74
%% @doc Returns the `{MetricName, MetricType}' pairs used by a pool
%% identified by `Tag': seven spiral counters followed by two histograms.
list_metrics(Tag) ->
    Spirals = [?CONNECTIONS_ACTIVE_METRIC(Tag),
               ?CONNECTIONS_OPENED_METRIC(Tag),
               ?CONNECTIONS_CLOSED_METRIC(Tag),
               ?CONNECTIONS_FAILED_METRIC(Tag),
               ?MESSAGES_PUBLISHED_METRIC(Tag),
               ?MESSAGES_FAILED_METRIC(Tag),
               ?MESSAGES_TIMEOUT_METRIC(Tag)],
    Histograms = [?MESSAGE_PUBLISH_TIME_METRIC(Tag),
                  ?MESSAGE_PAYLOAD_SIZE_METRIC(Tag)],
    [{Name, spiral} || Name <- Spirals] ++
        [{Name, histogram} || Name <- Histograms].
85
86 %%%===================================================================
87 %%% gen_server callbacks
88 %%%===================================================================
89
%% gen_server callback. Exits are trapped before the state is built, so
%% exit signals from linked processes arrive as messages and
%% `terminate/2' is called on shutdown.
init(Opts) ->
    process_flag(trap_exit, true),
    %% TODO: move the connection setup into handle_continue once OTP 21
    %% is the minimal supported version.
    do_init(Opts).
94
%% gen_server callback. Delegates to `do_handle_call/3' unless the
%% mailbox limit is exceeded, in which case the caller gets
%% `request_dropped'.
handle_call(Req, From, State) ->
    DroppedReply = {reply, request_dropped, State},
    maybe_handle_request(fun do_handle_call/3, [Req, From, State],
                         DroppedReply).
98
%% gen_server callback. Delegates to `do_handle_cast/2' unless the
%% mailbox limit is exceeded, in which case the cast is dropped.
handle_cast(Req, State) ->
    Dropped = {noreply, State},
    maybe_handle_request(fun do_handle_cast/2, [Req, State], Dropped).
101
%% gen_server callback. Delegates to `do_handle_info/2' unless the
%% mailbox limit is exceeded, in which case the message is dropped.
handle_info(Req, State) ->
    Dropped = {noreply, State},
    maybe_handle_request(fun do_handle_info/2, [Req, State], Dropped).
104
%% gen_server callback. Closes the channel and the connection held in the
%% worker state; the termination reason is ignored.
terminate(_Reason, #{connection := Conn, channel := Chan,
                     host := Host, pool_tag := PoolTag}) ->
    close_rabbit_connection(Conn, Chan, Host, PoolTag),
    ok.
109
110 %%%===================================================================
111 %%% Internal functions
112 %%%===================================================================
113
%% Builds the initial worker state from the option proplist: opens the
%% AMQP connection and channel and optionally enables publisher confirms.
%% Options that are absent end up as `undefined' in the state.
do_init(Opts) ->
    Get = fun(Key) -> proplists:get_value(Key, Opts) end,
    Host = Get(host_type),
    PoolTag = Get(pool_tag),
    AMQPClientOpts = Get(amqp_client_opts),
    {Connection, Channel} =
        establish_rabbit_connection(AMQPClientOpts, Host, PoolTag),
    Confirms = maybe_enable_confirms(Channel, Opts),
    State = #{host => Host,
              pool_tag => PoolTag,
              amqp_client_opts => AMQPClientOpts,
              connection => Connection,
              channel => Channel,
              confirms => Confirms,
              max_queue_len => Get(max_queue_len)},
    {ok, State}.
126
%% Executes a synchronous AMQP method on the worker's channel.
%% On success replies `{ok, Result}'. If the call raises, the caller gets
%% `{Class, Reason}' and the state is updated with a fresh channel (and a
%% fresh connection if the old one is dead).
do_handle_call({amqp_call, Method}, _From, #{channel := Channel} = State) ->
    try amqp_channel:call(Channel, Method) of
        Result ->
            {reply, {ok, Result}, State}
    catch
        Class:Reason ->
            {NewConn, NewChan} = maybe_restart_rabbit_connection(State),
            NewState = State#{connection := NewConn, channel := NewChan},
            {reply, {Class, Reason}, NewState}
    end.
137
%% Handles an asynchronous publish request; any other cast crashes the
%% worker with `function_clause'.
do_handle_cast({amqp_publish, AMQPMethod, Message}, State) ->
    handle_amqp_publish(AMQPMethod, Message, State).
140
%% Reports any raw message through ?UNEXPECTED_INFO and keeps the state
%% unchanged.
do_handle_info(Msg, State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State}.
144
%% @doc Publishes `Payload' with `Method', measures how long the publish
%% (including the optional wait for a confirm) took, and updates metrics
%% and logs according to the outcome:
%%   true                      - published; success metrics updated
%%   false                     - negative ack; failure metric updated
%%   {channel_exception, C, R} - the channel call raised; failure metric
%%                               updated and the channel/connection reopened
%%   timeout                   - no confirm arrived; timeout metric updated
%% Always returns `{noreply, State}' so that it can be used directly as a
%% gen_server cast result.
%%
%% The worker state is logged without `amqp_client_opts': those are the
%% broker connection parameters and presumably carry credentials, which
%% must not end up in log files.
-spec handle_amqp_publish(Method :: mongoose_amqp:method(),
                          Payload :: mongoose_amqp:message(),
                          Opts :: worker_opts()) -> {noreply, worker_opts()}.
handle_amqp_publish(Method, Payload, Opts = #{host := Host,
                                              pool_tag := PoolTag}) ->
    {PublishTime, Result} =
        timer:tc(fun publish_message_and_wait_for_confirm/3,
                 [Method, Payload, Opts]),
    case Result of
        true ->
            update_messages_published_metrics(Host, PoolTag, PublishTime,
                                              Payload),
            ?LOG_DEBUG(#{what => rabbit_message_sent,
                         method => Method, payload => Payload,
                         opts => loggable_opts(Opts)}),
            {noreply, Opts};
        false ->
            update_messages_failed_metrics(Host, PoolTag),
            ?LOG_WARNING(#{what => rabbit_message_sent_failed,
                           reason => negative_ack,
                           method => Method, payload => Payload,
                           opts => loggable_opts(Opts)}),
            {noreply, Opts};
        {channel_exception, Error, Reason} ->
            update_messages_failed_metrics(Host, PoolTag),
            ?LOG_ERROR(#{what => rabbit_message_sent_failed,
                         class => Error, reason => Reason,
                         method => Method, payload => Payload,
                         opts => loggable_opts(Opts)}),
            %% The channel is unusable after an exception: reopen it, and
            %% the whole connection too if that has died.
            {FreshConn, FreshChann} = maybe_restart_rabbit_connection(Opts),
            {noreply, Opts#{connection := FreshConn, channel := FreshChann}};
        timeout ->
            update_messages_timeout_metrics(Host, PoolTag),
            ?LOG_ERROR(#{what => rabbit_message_sent_failed,
                         reason => timeout,
                         method => Method, payload => Payload,
                         opts => loggable_opts(Opts)}),
            {noreply, Opts}
    end.

%% Returns the worker state without the AMQP client parameters, so it is
%% safe to include in a log event.
-spec loggable_opts(worker_opts()) -> map().
loggable_opts(Opts) ->
    maps:without([amqp_client_opts], Opts).
178
%% @doc Sends `Payload' with `Method' over the worker's channel and, when
%% publisher confirms are enabled, waits for the broker's confirm.
%%
%% Returns whatever `maybe_wait_for_confirms/2' returns (`true', `false'
%% or `timeout') when the channel call goes through, or
%% `{channel_exception, Class, Reason}' when the channel call raises.
%% Only the channel call itself is protected by the `try'; the wait for
%% confirms runs in the unprotected `of' section.
-spec publish_message_and_wait_for_confirm(Method :: mongoose_amqp:method(),
                                           Payload :: mongoose_amqp:message(),
                                           worker_opts()) ->
    boolean() | timeout |
    {channel_exception, Class :: error | exit | throw, Reason :: term()}.
publish_message_and_wait_for_confirm(Method, Payload,
                                     #{channel := Channel,
                                       confirms := IsConfirmEnabled}) ->
    try amqp_channel:call(Channel, Method, Payload) of
        _Res ->
            maybe_wait_for_confirms(Channel, IsConfirmEnabled)
    catch
        Error:Reason -> {channel_exception, Error, Reason}
    end.
192
%% Waits for outstanding publisher confirms on `Channel' only when the
%% second argument is exactly `true'; otherwise reports success at once.
-spec maybe_wait_for_confirms(Channel :: pid(), boolean()) ->
    boolean() | timeout.
maybe_wait_for_confirms(Channel, IsConfirmEnabled) ->
    case IsConfirmEnabled of
        true -> amqp_channel:wait_for_confirms(Channel);
        _ -> true
    end.
198
%% Returns a usable `{Connection, Channel}' pair after a channel failure.
%% If the connection process has died, a brand new connection is
%% established; otherwise only a new channel is opened on the existing one.
-spec maybe_restart_rabbit_connection(worker_opts()) -> {pid(), pid()}.
maybe_restart_rabbit_connection(#{connection := OldConn, host := Host,
                                  pool_tag := PoolTag,
                                  amqp_client_opts := AMQPOpts}) ->
    case is_process_alive(OldConn) of
        false ->
            establish_rabbit_connection(AMQPOpts, Host, PoolTag);
        true ->
            {ok, NewChannel} = amqp_connection:open_channel(OldConn),
            {OldConn, NewChannel}
    end.
210
%% Starts an AMQP connection with `AMQPOpts' and opens a channel on it.
%% On success the connection metrics are bumped and `{Connection, Channel}'
%% is returned. On failure the failure metric is bumped, the error is
%% logged and the calling process exits.
%% NOTE(review): `AMQPOpts' is logged verbatim; if it carries credentials
%% they end up in the logs - confirm and redact if so.
-spec establish_rabbit_connection(Opts :: mongoose_amqp:network_params(),
                                  Host :: jid:server(), PoolTag :: atom())
        -> {pid(), pid()}.
establish_rabbit_connection(AMQPOpts, Host, PoolTag) ->
    case amqp_connection:start(AMQPOpts) of
        {error, Reason} ->
            update_failed_connections_metrics(Host, PoolTag),
            ?LOG_ERROR(#{what => rabbit_connection_failed, reason => Reason,
                         server => Host, pool_tag => PoolTag,
                         opts => AMQPOpts}),
            exit("connection to a Rabbit server failed");
        {ok, Conn} ->
            update_success_connections_metrics(Host, PoolTag),
            {ok, Chan} = amqp_connection:open_channel(Conn),
            ?LOG_DEBUG(#{what => rabbit_connection_established,
                         server => Host, pool_tag => PoolTag,
                         opts => AMQPOpts}),
            {Conn, Chan}
    end.
228
%% Updates the "closed connection" metrics, closes the channel on a
%% best-effort basis and finally closes the connection, returning the
%% result of `amqp_connection:close/1'.
-spec close_rabbit_connection(Connection :: pid(), Channel :: pid(),
                              Host :: jid:server(), PoolTag :: atom()) ->
    ok | no_return().
close_rabbit_connection(Connection, Channel, Host, PoolTag) ->
    update_closed_connections_metrics(Host, PoolTag),
    close_channel_quietly(Channel),
    amqp_connection:close(Connection).

%% Closes the channel, deliberately ignoring any exception: the channel
%% may already be gone by the time the worker terminates.
close_channel_quietly(Channel) ->
    try
        amqp_channel:close(Channel)
    catch
        _Class:_Reason -> already_closed
    end.
239
%% @doc Puts `Channel' into publisher-confirm mode when the `confirms'
%% option is `true' and reports whether confirms are enabled.
%%
%% A missing `confirms' option is treated as `false'. Previously it
%% evaluated to `undefined' and crashed worker start-up with `case_clause'.
%% Any value other than `true' or `false' still crashes, so a
%% misconfigured option is not silently accepted.
%%
%% When enabling, the broker's reply is asserted to equal
%% `mongoose_amqp:confirm_select_ok()'; any other reply crashes with
%% `badmatch'.
-spec maybe_enable_confirms(Channel :: pid(), proplists:proplist()) ->
    boolean() | no_return().
maybe_enable_confirms(Channel, Opts) ->
    case proplists:get_value(confirms, Opts, false) of
        true ->
            ConfirmCallRes = mongoose_amqp:confirm_select_ok(),
            ConfirmCallRes =
                amqp_channel:call(Channel, mongoose_amqp:confirm_select()),
            true;
        false ->
            false
    end.
252
%% Records one successfully published message: bumps the published
%% counter and feeds the publish-time and payload-size histograms. The
%% payload size is the byte size of the payload's external term format.
-spec update_messages_published_metrics(Host :: jid:server(),
                                        PoolTag :: atom(),
                                        PublishTime :: non_neg_integer(),
                                        Message :: mongoose_amqp:message()) ->
    any().
update_messages_published_metrics(Host, PoolTag, PublishTime, Payload) ->
    PayloadSize = byte_size(term_to_binary(Payload)),
    mongoose_metrics:update(Host, ?MESSAGES_PUBLISHED_METRIC(PoolTag), 1),
    mongoose_metrics:update(Host, ?MESSAGE_PUBLISH_TIME_METRIC(PoolTag),
                            PublishTime),
    mongoose_metrics:update(Host, ?MESSAGE_PAYLOAD_SIZE_METRIC(PoolTag),
                            PayloadSize).
264
%% Bumps the failed-messages counter of the given pool by one.
-spec update_messages_failed_metrics(Host :: jid:server(), PoolTag :: atom())
        -> any().
update_messages_failed_metrics(Host, PoolTag) ->
    Metric = ?MESSAGES_FAILED_METRIC(PoolTag),
    mongoose_metrics:update(Host, Metric, 1).
269
%% Bumps the timed-out-messages counter of the given pool by one.
-spec update_messages_timeout_metrics(Host :: jid:server(), PoolTag :: atom())
        -> any().
update_messages_timeout_metrics(Host, PoolTag) ->
    Metric = ?MESSAGES_TIMEOUT_METRIC(PoolTag),
    mongoose_metrics:update(Host, Metric, 1).
274
%% Records a newly opened connection: increments both the active and the
%% opened connection counters of the given pool.
-spec update_success_connections_metrics(Host :: jid:server(), PoolTag :: atom())
        -> any().
update_success_connections_metrics(Host, PoolTag) ->
    Active = ?CONNECTIONS_ACTIVE_METRIC(PoolTag),
    Opened = ?CONNECTIONS_OPENED_METRIC(PoolTag),
    mongoose_metrics:update(Host, Active, 1),
    mongoose_metrics:update(Host, Opened, 1).
280
%% Bumps the failed-connections counter of the given pool by one.
-spec update_failed_connections_metrics(Host :: jid:server(), PoolTag :: atom())
        -> any().
update_failed_connections_metrics(Host, PoolTag) ->
    Metric = ?CONNECTIONS_FAILED_METRIC(PoolTag),
    mongoose_metrics:update(Host, Metric, 1).
285
%% Records a closed connection: decrements the active counter and
%% increments the closed counter of the given pool.
-spec update_closed_connections_metrics(Host :: jid:server(), PoolTag :: atom())
        -> any().
update_closed_connections_metrics(Host, PoolTag) ->
    Active = ?CONNECTIONS_ACTIVE_METRIC(PoolTag),
    Closed = ?CONNECTIONS_CLOSED_METRIC(PoolTag),
    mongoose_metrics:update(Host, Active, -1),
    mongoose_metrics:update(Host, Closed, 1).
291
%% Load-shedding wrapper around the gen_server callbacks.
%% `Args' is the callback's argument list and its last element must be the
%% worker state. When the worker's mailbox is longer than the state's
%% `max_queue_len', the request is not executed: a warning is logged and
%% the canned `Reply' is returned instead. Otherwise `Callback' is applied
%% to `Args'.
-spec maybe_handle_request(Callback :: function(), Args :: [term()],
                           Reply :: term()) -> term().
maybe_handle_request(Callback, Args, Reply) ->
    Limit = maps:get(max_queue_len, lists:last(Args)),
    case is_msq_queue_max_limit_reached(Limit) of
        true ->
            ?LOG_WARNING(#{what => rabbit_worker_request_dropped,
                           reason => queue_message_length_limit_reached,
                           limit => Limit}),
            Reply;
        false ->
            erlang:apply(Callback, Args)
    end.
306
%% Tells whether the calling process has more messages in its mailbox
%% than `Limit' allows. A limit of `infinity' never trips, and a mailbox
%% holding exactly `Limit' messages is still within the limit.
-spec is_msq_queue_max_limit_reached(Limit :: infinity | non_neg_integer()) ->
    boolean().
is_msq_queue_max_limit_reached(infinity) ->
    false;
is_msq_queue_max_limit_reached(Limit) ->
    {message_queue_len, QueueLen} = process_info(self(), message_queue_len),
    QueueLen > Limit.
Line Hits Source