./ct_report/coverage/mongoose_cassandra_worker.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 -module(mongoose_cassandra_worker).
17 -author('rafal.slota@erlang-solutions.com').
18
19 -include("mongoose_logger.hrl").
20 -include_lib("cqerl/include/cqerl.hrl").
21
22 %% ====================================================================
23 %% Exports
24 %% ====================================================================
25
26 %% gen_server callbacks
27 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
28 terminate/2, code_change/3]).
29
30 %% gen_server API
31 -export([write/5, write_async/5, read/7]).
32
33 -behaviour(gen_server).
34
35 %% ====================================================================
36 %% Internal records and definitions
37 %% ====================================================================
38
39 -record(state, {
40 pool_name :: mongoose_cassandra:pool_name(),
41 inflight = #{} :: #{request_id() => request()},
42 cql_tags = #{} :: #{cql_tag() => request_id()},
43 monitors = #{} :: #{cql_mref() => request_id()}
44 }).
45
46 -record(write_action, {
47 query :: cql_query(),
48 %% Rows left to send
49 pending_rows :: mongoose_cassandra:rows(),
50 %% Rows that were send to Cassandra and are awaiting response (write confirmation)
51 current_rows :: mongoose_cassandra:rows()
52 }).
53
54 -record(read_action, {
55 query :: cql_query(),
56 last_result :: cql_result() | undefined %% Last result handle from cqerl
57 }).
58
59 -record(request, {
60 id :: request_id(),
61 action :: action(),
62 opts = #{} :: options(),
63 cql_mref :: cql_mref() | undefined,
64 tag :: cql_tag() | undefined,
65 from :: caller(),
66 retry_left :: non_neg_integer(),
67 timeout_tref :: reference() | undefined
68 }).
69
70 -define(DEFAULT_OPTS, #{
71 batch_size => 20,
72 retry => 3,
73 timeout => timer:minutes(1),
74 %% cqerl options
75 consistency => one,
76 batch_mode => unlogged
77 }).
78
79
80 %% By how much (%) should we resize batch size when Cassandra returns 'batch to big' error
81 -define(BATCH_SHRINK_RATIO, 0.5).
82
83 %% ====================================================================
84 %% Types
85 %% ====================================================================
86
87 %% API types
88
89 -type options() :: #{
90 timeout => non_neg_integer(), %% milliseconds
91 retry => non_neg_integer(), %% how many retry attempts should be made
92 batch_size => non_neg_integer(), %% Due to Cassandra query size limits, this value may be
93 %% adjusted on error (last query retry will use batch size
94 consistency => consistency_level() | consistency_level_int(),
95 batch_mode => batch_mode() | batch_mode_int()
96 }.
97
98 -export_type([options/0]).
99
100 %% Internal types
101
102 -type context_id() :: term() | undefined. %% This value is used as a hash base in worker selection
103 -type query_str() :: binary().
104 -type caller() :: {pid(), term()} | undefined.
105
106 -type request_id() :: reference().
107 -type request() :: #request{}.
108 -type write_action() :: #write_action{}.
109 -type read_action() :: #read_action{}.
110 -type action() :: read_action() | write_action().
111
112 -type cql_result() :: void | #cql_result{}.
113 -type cql_query() :: #cql_query{}.
114 -type cql_mref() :: reference().
115 -type cql_tag() :: term(). %% Normally it's a reference(), but let's not depend on that
116
117 -type worker_state() :: #state{}.
118 -type process_type() :: cqerl_client.
119 -type error_type() :: {down, process_type()} | cancel | cqerl_error.
120 -type error_reason() :: {Code :: non_neg_integer(), Details :: binary(), any()} | timeout | term().
121
122
123 %%====================================================================
124 %% API functions
125 %%====================================================================
126
127
128 %% --------------------------------------------------------
129 %% @doc Execute batch write query to Cassandra (insert, update or delete).
130 %% Note that Cassandra doesn't like big batches, therefore this function will try to
131 %% split given rows into batches of 20 rows and will fall back to smaller batches if
132 %% Cassandra rejects the query due to its size being to big.
133 %% @end
134 %% --------------------------------------------------------
135 -spec write(mongoose_cassandra:pool_name(), context_id(), query_str(),
136 mongoose_cassandra:rows(), options()) ->
137 ok | {error, Reason :: term()}.
138 write(PoolName, ContextId, QueryStr, Rows, Opts) ->
139 419 Opts1 = prepare_options(Opts),
140 419 Call = {write, QueryStr, Rows, Opts1},
141 419 mongoose_cassandra_pool:call_query(PoolName, ContextId, Call).
142
143 %% --------------------------------------------------------
144 %% @doc Same as @see write/5 but asynchronous (without response).
145 %% @end
146 %% --------------------------------------------------------
147 -spec write_async(mongoose_cassandra:pool_name(), context_id(), query_str(),
148 mongoose_cassandra:rows(), options()) ->
149 ok.
150 write_async(PoolName, ContextId, QueryStr, Rows, Opts) ->
151 889 Opts1 = prepare_options(Opts),
152 889 Cast = {write, QueryStr, Rows, Opts1},
153 889 mongoose_cassandra_pool:cast_query(PoolName, ContextId, Cast).
154
155 %% --------------------------------------------------------
156 %% @doc Execute read query to Cassandra (select).
157 %% This functions behaves much like the lists:foldl/3 but the input are pages from result of given
158 %% query. Therefore each execution of given fun gets list of several result rows (by default 100 at
159 %% most).
160 %% @end
161 %% --------------------------------------------------------
162 -spec read(mongoose_cassandra:pool_name(), context_id(), query_str(),
163 mongoose_cassandra:parameters(), mongoose_cassandra:fold_fun(),
164 mongoose_cassandra:fold_accumulator(), options()) ->
165 {ok, mongoose_cassandra:fold_accumulator()} | {error, Reason :: term()}.
166 read(PoolName, ContextId, QueryStr, Params, Fun, AccIn, Opts) ->
167 839 Opts1 = prepare_options(Opts),
168 839 Call = {read, QueryStr, Params, Opts1},
169 839 do_read(PoolName, ContextId, Call, Fun, AccIn).
170
171 do_read(PoolName, ContextId, Call, Fun, AccIn) ->
172 840 case mongoose_cassandra_pool:call_query(PoolName, ContextId, Call) of
173 {finished, Result} ->
174 839 NextAcc = Fun(cqerl:all_rows(Result), AccIn),
175 839 {ok, NextAcc};
176 {partial, Req, Result} ->
177 1 NextAcc = Fun(cqerl:all_rows(Result), AccIn),
178 1 do_read(PoolName, ContextId, {continue, Req}, Fun, NextAcc);
179 Other ->
180
:-(
Other
181 end.
182
183 %%====================================================================
184 %% gen_server callbacks
185 %%====================================================================
186
187 %%--------------------------------------------------------------------
188 %% Function: init(Args) -> {ok, State} |
189 %% {ok, State, Timeout} |
190 %% ignore |
191 %% {stop, Reason}
192 %% Description: Initiates the server
193 %%--------------------------------------------------------------------
194 init([PoolName]) ->
195 1660 {ok, #state{pool_name = PoolName}}.
196
197 %%--------------------------------------------------------------------
198 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
199 %% {reply, Reply, State, Timeout} |
200 %% {noreply, State} |
201 %% {noreply, State, Timeout} |
202 %% {stop, Reason, Reply, State} |
203 %% {stop, Reason, State}
204 %% Description: Handling call messages
205 %%--------------------------------------------------------------------
206 handle_call({write, QueryStr, Rows, Opts}, From, State = #state{}) ->
207 419 Action = new_write_action(QueryStr, Rows),
208 419 Request = new_request(From, Opts, Action),
209 419 RequestId = Request#request.id,
210
211 419 NewRequest = schedule_timeout(maps:get(timeout, Opts), Request),
212 419 NewState = update_req(NewRequest, State),
213 419 {noreply, process_request(RequestId, NewState)};
214 handle_call({read, QueryStr, Params, Opts}, From, State = #state{}) ->
215 839 Action = new_read_action(QueryStr, Params),
216 839 Request = new_request(From, Opts, Action),
217 839 RequestId = Request#request.id,
218
219 839 NewRequest = schedule_timeout(maps:get(timeout, Opts), Request),
220 839 NewState = update_req(NewRequest, State),
221 839 {noreply, process_request(RequestId, NewState)};
222 handle_call({continue, Req}, From, State = #state{}) ->
223 1 RequestId = Req#request.id,
224 1 NewReq = Req#request{from = From},
225 1 NewState = update_req(NewReq, State),
226 1 {noreply, process_request(RequestId, NewState)};
227 handle_call(Msg, From, State) ->
228
:-(
?UNEXPECTED_CALL(Msg, From),
229
:-(
{noreply, State}.
230
231
232 %%--------------------------------------------------------------------
233 %% Function: handle_cast(Msg, State) -> {noreply, State} |
234 %% {noreply, State, Timeout} |
235 %% {stop, Reason, State}
236 %% Description: Handling cast messages
237 %%--------------------------------------------------------------------
238
239 handle_cast({write, QueryStr, Rows, Opts}, #state{} = State) ->
240 889 Action = new_write_action(QueryStr, Rows),
241 889 Request = new_request(undefined, Opts, Action),
242 889 RequestId = Request#request.id,
243
244 889 NewState = State#state{inflight = maps:put(RequestId, Request, State#state.inflight)},
245 889 {noreply, process_request(RequestId, NewState)};
246 handle_cast(Msg, State) ->
247
:-(
?UNEXPECTED_CAST(Msg),
248
:-(
{noreply, State}.
249
250 %%--------------------------------------------------------------------
251 %% Function: handle_info(Info, State) -> {noreply, State} |
252 %% {noreply, State, Timeout} |
253 %% {stop, Reason, State}
254 %% Description: Handling all non call/cast messages
255 %%--------------------------------------------------------------------
256
257 handle_info({cancel, ReqId, Reason}, #state{} = St) ->
258
:-(
{noreply, handle_cancel(ReqId, Reason, St)};
259
260 handle_info({result, _, _} = R, #state{} = St) ->
261 2171 {noreply, handle_result(R, St)};
262
263 handle_info({error, _, _} = Er, #state{} = St) ->
264 3 {noreply, handle_error(Er, St)};
265
266 handle_info({'DOWN', _MRef, _, _, _} = Down, #state{} = St) ->
267
:-(
{noreply, handle_down(Down, St)};
268
269 handle_info({retry, ReqId}, #state{} = St) ->
270 5 case maps:get(ReqId, St#state.inflight, undefined) of
271 undefined ->
272
:-(
?LOG_WARNING(#{what => cassandra_unexpected_retry_request,
273 pool => St#state.pool_name,
274
:-(
state => St, request_id => ReqId}),
275
:-(
{noreply, St};
276 #request{retry_left = TryCount} = Req ->
277 5 NextRequest = Req#request{retry_left = max(TryCount - 1, 0)},
278 5 NextState = update_req(NextRequest, St),
279 5 {noreply, process_request(ReqId, NextState)}
280 end;
281
282 handle_info(Msg, State) ->
283
:-(
?UNEXPECTED_INFO(Msg),
284
:-(
{noreply, State}.
285
286 %%--------------------------------------------------------------------
287 %% Function: terminate(Reason, State) -> void()
288 %% Description: This function is called by a gen_server when it is about to
289 %% terminate. It should be the opposite of Module:init/1 and do any necessary
290 %% cleaning up. When it returns, the gen_server terminates with Reason.
291 %% The return value is ignored.
292 %%--------------------------------------------------------------------
293 terminate(_Reason, _State) ->
294
:-(
ok.
295
296 %%--------------------------------------------------------------------
297 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
298 %% Description: Convert process state when code is changed
299 %%--------------------------------------------------------------------
300 code_change(_OldVsn, State, _Extra) ->
301
:-(
{ok, State}.
302
303 %% ====================================================================
304 %% Implementation
305 %% ====================================================================
306
307 %% Get DBMS client and sends next Cassandra query for given request.
308 -spec process_request(request_id(), worker_state()) -> worker_state().
309 process_request(ReqId, #state{pool_name = PoolName} = State) ->
310 2176 Req = maps:get(ReqId, State#state.inflight),
311 2176 #request{action = Action, opts = Opts} = Req,
312
313 2176 try
314 2176 Client = {ClientPid, _} = get_client(PoolName),
315 2174 MRef = erlang:monitor(process, ClientPid),
316 2174 {Tag, NextAction} = send_next_request(Client, Action, Opts),
317
318 2174 NextReq = Req#request{
319 action = NextAction,
320 tag = Tag,
321 cql_mref = MRef
322 },
323 2174 State#state{
324 inflight = maps:put(ReqId, NextReq, State#state.inflight),
325 cql_tags = maps:put(Tag, ReqId, State#state.cql_tags),
326 monitors = maps:put(MRef, ReqId, State#state.monitors)
327 }
328 catch
329 _:Reason ->
330 %% Logging will be handled in do_handle_error/4
331 2 do_handle_error({down, cqerl_client}, Reason, Req, State)
332 end.
333
334 %% Sends cqerl query for given action().
335 -spec send_next_request(cqerl:client(), action(), options()) -> {cql_tag(), action()}.
336 send_next_request(Client, #write_action{} = Action, Opts) ->
337 1334 #write_action{
338 current_rows = FailedRows,
339 pending_rows = PendingRows,
340 query = Query
341 } = Action,
342 1334 Rows = FailedRows ++ PendingRows,
343 1334 BatchSize = maps:get(batch_size, Opts),
344
345 1334 {NewRows, Tail} = lists:split(min(BatchSize, length(Rows)), Rows),
346 1334 QueryBatch = #cql_query_batch{
347 mode = maps:get(batch_mode, Opts),
348 consistency = maps:get(consistency, Opts),
349 2746 queries = [Query#cql_query{values = NewRow} || NewRow <- NewRows]
350 },
351 1334 Tag = cqerl:send_query(Client, QueryBatch),
352
353 1334 NextAction = Action#write_action{current_rows = NewRows, pending_rows = Tail},
354 1334 {Tag, NextAction};
355 send_next_request(Client, #read_action{} = Action, _Opts) ->
356 840 #read_action{query = Query} = Action,
357
358 840 Tag =
359 case Action#read_action.last_result of
360 undefined ->
361 839 cqerl:send_query(Client, Query);
362 Result ->
363 1 cqerl:fetch_more_async(Result)
364 end,
365
366 840 {Tag, Action}.
367
368 %% Handles query cancel requests (normally scheduled timeout)
369 -spec handle_cancel(request_id(), Reason :: term(), worker_state()) -> worker_state().
370 handle_cancel(ReqId, Reason, State) ->
371
:-(
case maps:get(ReqId, State#state.inflight, undefined) of
372 undefined ->
373
:-(
State;
374 #request{tag = Tag} = Req ->
375
:-(
?LOG_WARNING(#{what => cassandra_aborting_request, reason => Reason,
376 pool => State#state.pool_name,
377 tag => Tag, error_text => error_text(cancel, Reason, Req),
378
:-(
query_type => query_type(Req)}),
379
:-(
cleanup_request(ReqId, State)
380 end.
381
382 -spec handle_result({result, cql_tag(), cql_result()}, worker_state()) -> worker_state().
383 handle_result({result, Tag, Result} = R, #state{} = State) ->
384 2171 case maps:get(Tag, State#state.cql_tags, undefined) of
385 undefined ->
386
:-(
?LOG_WARNING(#{what => cassandra_unexpected_result,
387 pool => State#state.pool_name,
388
:-(
result => R}),
389
:-(
State;
390 ReqId ->
391 2171 Req = #request{cql_mref = MRef} = maps:get(ReqId, State#state.inflight),
392 2171 erlang:demonitor(MRef, [flush]),
393 2171 do_handle_result(Result, Req, State)
394 end.
395
396 %% Handles all 'result' responses from DBMS. Depending on the state of request,
397 %% initiates next query or returs response to the caller.
398 -spec do_handle_result(cql_result(), request(), worker_state()) -> worker_state().
399 do_handle_result(Result, Req, State) ->
400 2171 #request{action = Action, id = ReqId} = Req,
401
402 2171 IsFinished = is_finished(Req, Result),
403 2171 case {IsFinished, Action} of
404 %% Continue with unfinished queries
405 {false, #write_action{} = A} ->
406 23 NextAction = A#write_action{current_rows = []}, %% Remove just saved rows
407 23 NextReq = Req#request{action = NextAction},
408 23 NextState = update_req(NextReq, State),
409
410 23 process_request(ReqId, NextState);
411 {false, #read_action{} = A} ->
412
413 1 NextAction = A#read_action{last_result = Result},
414 1 NextReq = Req#request{action = NextAction},
415 1 NextState = update_req(NextReq, State),
416 1 maybe_reply(Req, {partial, NextReq, Result}),
417 1 NextState;
418
419 %% Reply and cleanup for finished queries
420 {true, #write_action{}} ->
421 1308 maybe_reply(Req, ok),
422 1308 cleanup_request(ReqId, State);
423 {true, #read_action{}} ->
424 839 maybe_reply(Req, {finished, Result}),
425 839 cleanup_request(ReqId, State)
426
427 end.
428
429 -spec handle_error({error, cql_tag(), Reason :: term()}, worker_state()) -> worker_state().
430 handle_error({error, Tag, Reason} = Error, #state{} = State) ->
431 3 case maps:get(Tag, State#state.cql_tags, undefined) of
432 undefined ->
433
:-(
?LOG_WARNING(#{what => cassandra_unexpected_error, error => Error,
434 pool => State#state.pool_name,
435
:-(
tag => Tag, reason => Reason}),
436
:-(
State;
437 ReqId ->
438 3 Req = #request{cql_mref = MRef} = maps:get(ReqId, State#state.inflight),
439 3 erlang:demonitor(MRef, [flush]),
440 3 do_handle_error(cqerl_error, Reason, Req, State)
441 end.
442
443 -spec handle_down({'DOWN', reference(), process, pid(), Info :: term()},
444 worker_state()) -> worker_state().
445 handle_down({'DOWN', MRef, _, _Pid, Info} = _Down, State) ->
446
:-(
case maps:get(MRef, State#state.monitors, undefined) of
447 undefined ->
448
:-(
State;
449 ReqId ->
450
:-(
Req = #request{cql_mref = MRef} = maps:get(ReqId, State#state.inflight),
451
:-(
do_handle_error({down, cqerl_client}, Info, Req, State)
452 end.
453
454 -spec do_handle_error(error_type(), error_reason(), request(), worker_state()) -> worker_state().
455 do_handle_error(Type, Reason, Req, State) ->
456 5 #request{tag = Tag, id = ReqId, opts = Opts, retry_left = RetryLeft} = Req,
457
458 5 case retry_info(Type, Reason, Req, State) of
459 {abort, AbortReason, NextState} ->
460
:-(
?LOG_WARNING(#{what => cassandra_aborting_query, tag => Tag,
461 pool => State#state.pool_name,
462 query_type => query_type(Req), reason => AbortReason,
463
:-(
error_text => error_text(Type, Reason, Req)}),
464
:-(
maybe_reply(Req, {error, Reason}),
465
:-(
cleanup_request(Req, NextState);
466 {retry, WaitFor, NextState} ->
467 5 ?LOG_WARNING(#{what => cassandra_retrying_query, tag => Tag,
468 pool => State#state.pool_name,
469 query_type => query_type(Req), request_opts => Opts,
470
:-(
error_text => error_text(Type, Reason, Req), retry_left => RetryLeft}),
471 5 schedule_retry(ReqId, WaitFor, NextState)
472 end.
473
474 %% Sends given reply is caller is known.
475 -spec maybe_reply(request(), Result :: ok | {ok, term()} |
476 {error, term()} | {finished, term()} | {partial, request(), term()}) -> any().
477 maybe_reply(#request{from = undefined}, _Result) ->
478 889 ok;
479 maybe_reply(#request{from = From}, Result) ->
480 1259 gen_server:reply(From, Result).
481
482 %% Removes all data and monitors associated with the request.
483 -spec cleanup_request(request() | request_id(), worker_state()) -> worker_state().
484 cleanup_request(#request{id = ReqId}, State) ->
485
:-(
cleanup_request(ReqId, State);
486 cleanup_request(ReqId, State) ->
487 2147 #state{inflight = Inflight, monitors = Monitors, cql_tags = Tags} = State,
488 2147 #request{tag = Tag, cql_mref = CqlMRef, timeout_tref = TRef} = maps:get(ReqId, Inflight),
489 2147 catch demonitor(CqlMRef), %% If may be unset at this point and we don't really care about it
490 2147 catch erlang:cancel_timer(TRef),
491
492 2147 NextInflight = maps:remove(ReqId, Inflight),
493 2147 NextMonitors = maps:remove(CqlMRef, Monitors),
494 2147 NextTags = maps:remove(Tag, Tags),
495
496 2147 State#state{inflight = NextInflight, monitors = NextMonitors, cql_tags = NextTags}.
497
498 %% Checks whether request requires more queries to complete.
499 -spec is_finished(request(), cql_result()) -> boolean().
500 is_finished(_Req = #request{action = #write_action{pending_rows = []}}, _Result) ->
501 1308 true;
502 is_finished(_Req = #request{action = #write_action{}}, _Result) ->
503 23 false;
504 is_finished(_Req = #request{action = #read_action{}}, Result) ->
505 840 not cqerl:has_more_pages(Result).
506
507 %% For given query error, returns whether request should be retried and if so - how soon.
508 -spec retry_info(error_type(), error_reason(), request(), worker_state()) ->
509 {abort, AbortReason :: term(), worker_state()} |
510 {retry, Timeout :: non_neg_integer(), worker_state()}.
511 retry_info(_, _, #request{retry_left = 0} = _Req, State) ->
512
:-(
{abort, retry_limit_exceeded, State};
513 retry_info({down, cqerl_client}, _Reason, _Req, State) ->
514 2 {retry, 5 + rand:uniform(20), State};
515 retry_info(cqerl_error, {16#1100 = _WriteTimout, _, _}, _Req, State) ->
516
:-(
{retry, 10 + rand:uniform(50), State};
517 retry_info(cqerl_error, {16#1200 = _ReadTimout, _, _}, _Req, State) ->
518
:-(
{retry, 10 + rand:uniform(50), State};
519 retry_info(cqerl_error, {16#2200 = _WriteToBig, _, _}, Req, State) ->
520 3 #request{id = ReqId, opts = Opts, retry_left = RetryLeft} = Req,
521 3 BatchSize = maps:get(batch_size, Opts),
522 3 NewBatchSize =
523 case RetryLeft of
524 1 -> %% This is the last try, let's use batch size of 1
525 1 1;
526 2 _ -> max(1, round(BatchSize * ?BATCH_SHRINK_RATIO))
527 end,
528
529 3 NextOpts = maps:put(batch_size, NewBatchSize, Opts),
530 3 NextReq = Req#request{opts = NextOpts},
531 3 NextState = State#state{inflight = maps:put(ReqId, NextReq, State#state.inflight)},
532
533 3 {retry, 0, NextState};
534 retry_info(cqerl_error, {connection_closed, _}, _Req, State) ->
535
:-(
{retry, 5 + rand:uniform(20), State};
536 retry_info(_Type, _Reason, _Req, State) ->
537
:-(
{abort, unknown_error, State}.
538
539 %% Returns log message for given query error.
540 error_text(cancel = Category, Reason, _Request) ->
541
:-(
io_lib:format("status=error, category=~s, details=~p",
542 [Category, Reason]);
543 error_text({down, ProcessType}, Reason, Request) ->
544 2 MRef = Request#request.cql_mref,
545 2 io_lib:format("status=error, category=~s, details=~p, process_type=~p, mref=~p",
546 [process_down, Reason, ProcessType, MRef]);
547 error_text(cqerl_error, Reason, _Request) ->
548 3 cql_error_text(Reason).
549
550 %% Returns log message for given cqerl specific query error.
551 cql_error_text({ErrorCode, ErrorText, _}) when is_binary(ErrorText) ->
552 3 CodeText = response_code_to_binary(ErrorCode),
553 3 HexErrorCode = "0x" ++ integer_to_list(ErrorCode, 16),
554 3 io_lib:format("status=error, category=~s, details=~s, code=~p",
555 [CodeText, ErrorText, HexErrorCode]);
556 cql_error_text({ErrorCode, _, ErrorData}) ->
557
:-(
cql_error_text({ErrorCode, <<"undefined">>, ErrorData});
558 cql_error_text({Category, Reason}) ->
559
:-(
io_lib:format("status=error, category=~s, details=~p",
560 [Category, Reason]);
561 cql_error_text(UnknownReason) ->
562
:-(
io_lib:format("status=error, category=~s, details=~p",
563 [unknown, UnknownReason]).
564
565
566 %% ====================================================================
567 %% Helpers
568 %% ====================================================================
569
570 -spec get_client(mongoose_cassandra:pool_name()) ->
571 cqerl:client() | no_return().
572 get_client(PoolName) ->
573 2176 get_client_loop(PoolName, 0).
574
575 -spec get_client_loop(mongoose_cassandra:pool_name(), non_neg_integer()) ->
576 cqerl:client() | no_return().
577 get_client_loop(PoolName, RetryNo) when RetryNo >= 500 ->
578
:-(
?LOG_WARNING(#{what => cassandra_get_client_failed,
579
:-(
pool => PoolName}),
580
:-(
error(cannot_get_cqerl_client, [PoolName, RetryNo]);
581 get_client_loop(PoolName, RetryNo) ->
582 2176 try cqerl:get_client(PoolName) of
583 {ok, {Pid, _Ref} = Client} ->
584 2174 case is_process_alive(Pid) of
585 2174 true -> Client;
586
:-(
_ -> throw({dead, Pid})
587 end
588 catch
589 Class:Reason:Stacktrace ->
590
:-(
?LOG_INFO(#{what => cassandra_get_client_retry,
591 class => Class, reason => Reason, stacktrace => Stacktrace,
592
:-(
pool => PoolName, retry_number => RetryNo}),
593
594
:-(
Wait = rand:uniform(10),
595
:-(
timer:sleep(Wait),
596
:-(
get_client_loop(PoolName, RetryNo + 1)
597 end.
598
599 -spec schedule_retry(request_id(), non_neg_integer(), worker_state()) -> worker_state().
600 schedule_retry(ReqId, Time, State) ->
601 5 erlang:send_after(Time, self(), {retry, ReqId}),
602 5 State.
603
604 -spec prepare_options(options()) -> options().
605 prepare_options(Opts) ->
606 2147 maps:merge(?DEFAULT_OPTS, Opts).
607
608 -spec new_request(From :: caller(), options(), action()) -> request().
609 new_request(From, Opts, Action) ->
610 2147 RequestId = erlang:make_ref(),
611
612 2147 #request{
613 id = RequestId,
614 action = Action,
615 opts = Opts,
616 from = From,
617 retry_left = maps:get(retry, Opts)
618 }.
619
620 -spec new_write_action(query_str(), mongoose_cassandra:rows()) -> write_action().
621 new_write_action(QueryStr, Rows) ->
622 1308 #write_action{
623 query = #cql_query{statement = QueryStr},
624 pending_rows = Rows,
625 current_rows = []
626 }.
627
628 -spec new_read_action(query_str(), mongoose_cassandra:parameters()) -> read_action().
629 new_read_action(QueryStr, Params) ->
630 839 #read_action{
631 query = #cql_query{statement = QueryStr, values = Params}
632 }.
633
634 schedule_timeout(Timeout, Req = #request{id = ReqId}) ->
635 1258 TRef = erlang:send_after(Timeout, self(), {cancel, ReqId, timeout}),
636 1258 Req#request{timeout_tref = TRef}.
637
638 update_req(Req = #request{id = ReqId}, State = #state{inflight = Inflight}) ->
639 1288 State#state{inflight = maps:put(ReqId, Req, Inflight)}.
640
641 query_type(#request{action = #write_action{}}) ->
642 3 write;
643 query_type(#request{action = #read_action{}}) ->
644 2 read.
645
646 %% Source: https://stackoverflow.com/questions/48304330/list-of-cassandra-error-codes
647 response_code_to_binary(Code) when is_integer(Code) ->
648 3 case Code of
649
:-(
16#0000 -> <<"serverError">>;
650
:-(
16#000A -> <<"protocolError">>;
651
:-(
16#0100 -> <<"badCredentials">>;
652
:-(
16#1000 -> <<"unavailableException">>;
653
:-(
16#1001 -> <<"overloaded">>;
654
:-(
16#1002 -> <<"isBootstrapping">>;
655
:-(
16#1003 -> <<"truncateError">>;
656
:-(
16#1100 -> <<"writeTimeout">>;
657
:-(
16#1200 -> <<"readTimeout">>;
658
:-(
16#1300 -> <<"readFailure">>;
659
:-(
16#1400 -> <<"functionFailure">>;
660
:-(
16#1500 -> <<"writeFailure">>;
661
:-(
16#2000 -> <<"syntaxError">>;
662
:-(
16#2100 -> <<"unauthorized">>;
663 3 16#2200 -> <<"invalid">>;
664
:-(
16#2300 -> <<"configError">>;
665
:-(
16#2400 -> <<"alreadyExists">>;
666
:-(
16#2500 -> <<"unprepared">>;
667
:-(
_ -> <<"unknownCode=0x", (integer_to_binary(Code, 16))/bytes>>
668 end;
669 response_code_to_binary(_) ->
670
:-(
<<"invalidErrorCode">>.
Line Hits Source