./ct_report/coverage/mongoose_cassandra_worker.COVER.html

1 %%==============================================================================
2 %% Copyright 2018 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 -module(mongoose_cassandra_worker).
17 -author('rafal.slota@erlang-solutions.com').
18
19 -include("mongoose_logger.hrl").
20 -include_lib("cqerl/include/cqerl.hrl").
21
22 %% ====================================================================
23 %% Exports
24 %% ====================================================================
25
26 %% gen_server callbacks
27 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
28 terminate/2, code_change/3]).
29
30 %% gen_server API
31 -export([write/5, write_async/5, read/7]).
32
33 -behaviour(gen_server).
34
35 %% ====================================================================
36 %% Internal records and definitions
37 %% ====================================================================
38
39 -record(state, {
40 pool_name :: mongoose_cassandra:pool_name(),
41 inflight = #{} :: #{request_id() => request()},
42 cql_tags = #{} :: #{cql_tag() => request_id()},
43 monitors = #{} :: #{cql_mref() => request_id()}
44 }).
45
46 -record(write_action, {
47 query :: cql_query(),
48 %% Rows left to send
49 pending_rows :: mongoose_cassandra:rows(),
50 %% Rows that were send to Cassandra and are awaiting response (write confirmation)
51 current_rows :: mongoose_cassandra:rows()
52 }).
53
54 -record(read_action, {
55 query :: cql_query(),
56 last_result :: cql_result() | undefined %% Last result handle from cqerl
57 }).
58
59 -record(request, {
60 id :: request_id(),
61 action :: action(),
62 opts = #{} :: options(),
63 cql_mref :: cql_mref() | undefined,
64 tag :: cql_tag() | undefined,
65 from :: caller(),
66 retry_left :: non_neg_integer(),
67 timeout_tref :: reference() | undefined
68 }).
69
70 -define(DEFAULT_OPTS, #{
71 batch_size => 20,
72 retry => 3,
73 timeout => timer:minutes(1),
74 %% cqerl options
75 consistency => one,
76 batch_mode => unlogged
77 }).
78
79
80 %% By how much (%) should we resize batch size when Cassandra returns 'batch to big' error
81 -define(BATCH_SHRINK_RATIO, 0.5).
82
83 %% ====================================================================
84 %% Types
85 %% ====================================================================
86
87 %% API types
88
89 -type options() :: #{
90 timeout => non_neg_integer(), %% milliseconds
91 retry => non_neg_integer(), %% how many retry attempts should be made
92 batch_size => non_neg_integer(), %% Due to Cassandra query size limits, this value may be
93 %% adjusted on error (last query retry will use batch size
94 consistency => consistency_level() | consistency_level_int(),
95 batch_mode => batch_mode() | batch_mode_int()
96 }.
97
98 -export_type([options/0]).
99
100 %% Internal types
101
102 -type context_id() :: term() | undefined. %% This value is used as a hash base in worker selection
103 -type query_str() :: binary().
104 -type caller() :: {pid(), term()} | undefined.
105
106 -type request_id() :: reference().
107 -type request() :: #request{}.
108 -type write_action() :: #write_action{}.
109 -type read_action() :: #read_action{}.
110 -type action() :: read_action() | write_action().
111
112 -type cql_result() :: void | #cql_result{}.
113 -type cql_query() :: #cql_query{}.
114 -type cql_mref() :: reference().
115 -type cql_tag() :: term(). %% Normally it's a reference(), but let's not depend on that
116
117 -type worker_state() :: #state{}.
118 -type process_type() :: cqerl_client.
119 -type error_type() :: {down, process_type()} | cancel | cqerl_error.
120 -type error_reason() :: {Code :: non_neg_integer(), Details :: binary(), any()} | timeout | term().
121
122
123 %%====================================================================
124 %% API functions
125 %%====================================================================
126
127
128 %% --------------------------------------------------------
129 %% @doc Execute batch write query to Cassandra (insert, update or delete).
130 %% Note that Cassandra doesn't like big batches, therefore this function will try to
131 %% split given rows into batches of 20 rows and will fall back to smaller batches if
132 %% Cassandra rejects the query due to its size being to big.
133 %% @end
134 %% --------------------------------------------------------
135 -spec write(mongoose_cassandra:pool_name(), context_id(), query_str(),
136 mongoose_cassandra:rows(), options()) ->
137 ok | {error, Reason :: term()}.
138 write(PoolName, ContextId, QueryStr, Rows, Opts) ->
139
:-(
Opts1 = prepare_options(Opts),
140
:-(
Call = {write, QueryStr, Rows, Opts1},
141
:-(
mongoose_cassandra_pool:call_query(PoolName, ContextId, Call).
142
143 %% --------------------------------------------------------
144 %% @doc Same as @see write/5 but asynchronous (without response).
145 %% @end
146 %% --------------------------------------------------------
147 -spec write_async(mongoose_cassandra:pool_name(), context_id(), query_str(),
148 mongoose_cassandra:rows(), options()) ->
149 ok.
150 write_async(PoolName, ContextId, QueryStr, Rows, Opts) ->
151
:-(
Opts1 = prepare_options(Opts),
152
:-(
Cast = {write, QueryStr, Rows, Opts1},
153
:-(
mongoose_cassandra_pool:cast_query(PoolName, ContextId, Cast).
154
155 %% --------------------------------------------------------
156 %% @doc Execute read query to Cassandra (select).
157 %% This functions behaves much like the lists:foldl/3 but the input are pages from result of given
158 %% query. Therefore each execution of given fun gets list of several result rows (by default 100 at
159 %% most).
160 %% @end
161 %% --------------------------------------------------------
162 -spec read(mongoose_cassandra:pool_name(), context_id(), query_str(),
163 mongoose_cassandra:parameters(), mongoose_cassandra:fold_fun(),
164 mongoose_cassandra:fold_accumulator(), options()) ->
165 {ok, mongoose_cassandra:fold_accumulator()} | {error, Reason :: term()}.
166 read(PoolName, ContextId, QueryStr, Params, Fun, AccIn, Opts) ->
167
:-(
Opts1 = prepare_options(Opts),
168
:-(
Call = {read, QueryStr, Params, Opts1},
169
:-(
do_read(PoolName, ContextId, Call, Fun, AccIn).
170
171 do_read(PoolName, ContextId, Call, Fun, AccIn) ->
172
:-(
case mongoose_cassandra_pool:call_query(PoolName, ContextId, Call) of
173 {finished, Result} ->
174
:-(
NextAcc = Fun(cqerl:all_rows(Result), AccIn),
175
:-(
{ok, NextAcc};
176 {partial, Req, Result} ->
177
:-(
NextAcc = Fun(cqerl:all_rows(Result), AccIn),
178
:-(
do_read(PoolName, ContextId, {continue, Req}, Fun, NextAcc);
179 Other ->
180
:-(
Other
181 end.
182
183 %%====================================================================
184 %% gen_server callbacks
185 %%====================================================================
186
187 %%--------------------------------------------------------------------
188 %% Function: init(Args) -> {ok, State} |
189 %% {ok, State, Timeout} |
190 %% ignore |
191 %% {stop, Reason}
192 %% Description: Initiates the server
193 %%--------------------------------------------------------------------
194 init([PoolName]) ->
195
:-(
{ok, #state{pool_name = PoolName}}.
196
197 %%--------------------------------------------------------------------
198 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
199 %% {reply, Reply, State, Timeout} |
200 %% {noreply, State} |
201 %% {noreply, State, Timeout} |
202 %% {stop, Reason, Reply, State} |
203 %% {stop, Reason, State}
204 %% Description: Handling call messages
205 %%--------------------------------------------------------------------
206 handle_call({write, QueryStr, Rows, Opts}, From, State = #state{}) ->
207
:-(
Action = new_write_action(QueryStr, Rows),
208
:-(
Request = new_request(From, Opts, Action),
209
:-(
RequestId = Request#request.id,
210
211
:-(
NewRequest = schedule_timeout(maps:get(timeout, Opts), Request),
212
:-(
NewState = update_req(NewRequest, State),
213
:-(
{noreply, process_request(RequestId, NewState)};
214 handle_call({read, QueryStr, Params, Opts}, From, State = #state{}) ->
215
:-(
Action = new_read_action(QueryStr, Params),
216
:-(
Request = new_request(From, Opts, Action),
217
:-(
RequestId = Request#request.id,
218
219
:-(
NewRequest = schedule_timeout(maps:get(timeout, Opts), Request),
220
:-(
NewState = update_req(NewRequest, State),
221
:-(
{noreply, process_request(RequestId, NewState)};
222 handle_call({continue, Req}, From, State = #state{}) ->
223
:-(
RequestId = Req#request.id,
224
:-(
NewReq = Req#request{from = From},
225
:-(
NewState = update_req(NewReq, State),
226
:-(
{noreply, process_request(RequestId, NewState)};
227 handle_call(Msg, From, State) ->
228
:-(
?UNEXPECTED_CALL(Msg, From),
229
:-(
{noreply, State}.
230
231
232 %%--------------------------------------------------------------------
233 %% Function: handle_cast(Msg, State) -> {noreply, State} |
234 %% {noreply, State, Timeout} |
235 %% {stop, Reason, State}
236 %% Description: Handling cast messages
237 %%--------------------------------------------------------------------
238
239 handle_cast({write, QueryStr, Rows, Opts}, #state{} = State) ->
240
:-(
Action = new_write_action(QueryStr, Rows),
241
:-(
Request = new_request(undefined, Opts, Action),
242
:-(
RequestId = Request#request.id,
243
244
:-(
NewState = State#state{inflight = maps:put(RequestId, Request, State#state.inflight)},
245
:-(
{noreply, process_request(RequestId, NewState)};
246 handle_cast(Msg, State) ->
247
:-(
?UNEXPECTED_CAST(Msg),
248
:-(
{noreply, State}.
249
250 %%--------------------------------------------------------------------
251 %% Function: handle_info(Info, State) -> {noreply, State} |
252 %% {noreply, State, Timeout} |
253 %% {stop, Reason, State}
254 %% Description: Handling all non call/cast messages
255 %%--------------------------------------------------------------------
256
257 handle_info({cancel, ReqId, Reason}, #state{} = St) ->
258
:-(
{noreply, handle_cancel(ReqId, Reason, St)};
259
260 handle_info({result, _, _} = R, #state{} = St) ->
261
:-(
{noreply, handle_result(R, St)};
262
263 handle_info({error, _, _} = Er, #state{} = St) ->
264
:-(
{noreply, handle_error(Er, St)};
265
266 handle_info({'DOWN', _MRef, _, _, _} = Down, #state{} = St) ->
267
:-(
{noreply, handle_down(Down, St)};
268
269 handle_info({retry, ReqId}, #state{} = St) ->
270
:-(
case maps:get(ReqId, St#state.inflight, undefined) of
271 undefined ->
272
:-(
?LOG_WARNING(#{what => cassandra_unexpected_retry_request,
273 pool => St#state.pool_name,
274
:-(
state => St, request_id => ReqId}),
275
:-(
{noreply, St};
276 #request{retry_left = TryCount} = Req ->
277
:-(
NextRequest = Req#request{retry_left = max(TryCount - 1, 0)},
278
:-(
NextState = update_req(NextRequest, St),
279
:-(
{noreply, process_request(ReqId, NextState)}
280 end;
281
282 handle_info(Msg, State) ->
283
:-(
?UNEXPECTED_INFO(Msg),
284
:-(
{noreply, State}.
285
286 %%--------------------------------------------------------------------
287 %% Function: terminate(Reason, State) -> void()
288 %% Description: This function is called by a gen_server when it is about to
289 %% terminate. It should be the opposite of Module:init/1 and do any necessary
290 %% cleaning up. When it returns, the gen_server terminates with Reason.
291 %% The return value is ignored.
292 %%--------------------------------------------------------------------
293 terminate(_Reason, _State) ->
294
:-(
ok.
295
296 %%--------------------------------------------------------------------
297 %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
298 %% Description: Convert process state when code is changed
299 %%--------------------------------------------------------------------
300 code_change(_OldVsn, State, _Extra) ->
301
:-(
{ok, State}.
302
303 %% ====================================================================
304 %% Implementation
305 %% ====================================================================
306
307 %% Get DBMS client and sends next Cassandra query for given request.
308 -spec process_request(request_id(), worker_state()) -> worker_state().
309 process_request(ReqId, #state{pool_name = PoolName} = State) ->
310
:-(
Req = maps:get(ReqId, State#state.inflight),
311
:-(
#request{action = Action, opts = Opts} = Req,
312
313
:-(
try
314
:-(
Client = get_client(PoolName),
315
:-(
ClientPid = cqerl:client_to_pid(Client),
316
:-(
MRef = erlang:monitor(process, ClientPid),
317
:-(
{Tag, NextAction} = send_next_request(Client, Action, Opts),
318
319
:-(
NextReq = Req#request{
320 action = NextAction,
321 tag = Tag,
322 cql_mref = MRef
323 },
324
:-(
State#state{
325 inflight = maps:put(ReqId, NextReq, State#state.inflight),
326 cql_tags = maps:put(Tag, ReqId, State#state.cql_tags),
327 monitors = maps:put(MRef, ReqId, State#state.monitors)
328 }
329 catch
330 _:Reason ->
331 %% Logging will be handled in do_handle_error/4
332
:-(
do_handle_error({down, cqerl_client}, Reason, Req, State)
333 end.
334
335 %% Sends cqerl query for given action().
336 -spec send_next_request(cqerl:client(), action(), options()) -> {cql_tag(), action()}.
337 send_next_request(Client, #write_action{} = Action, Opts) ->
338
:-(
#write_action{
339 current_rows = FailedRows,
340 pending_rows = PendingRows,
341 query = Query
342 } = Action,
343
:-(
Rows = FailedRows ++ PendingRows,
344
:-(
BatchSize = maps:get(batch_size, Opts),
345
346
:-(
{NewRows, Tail} = lists:split(min(BatchSize, length(Rows)), Rows),
347
:-(
QueryBatch = #cql_query_batch{
348 mode = maps:get(batch_mode, Opts),
349 consistency = maps:get(consistency, Opts),
350
:-(
queries = [Query#cql_query{values = NewRow} || NewRow <- NewRows]
351 },
352
:-(
Tag = cqerl:send_query(Client, QueryBatch),
353
354
:-(
NextAction = Action#write_action{current_rows = NewRows, pending_rows = Tail},
355
:-(
{Tag, NextAction};
356 send_next_request(Client, #read_action{} = Action, _Opts) ->
357
:-(
#read_action{query = Query} = Action,
358
359
:-(
Tag =
360 case Action#read_action.last_result of
361 undefined ->
362
:-(
cqerl:send_query(Client, Query);
363 Result ->
364
:-(
cqerl:fetch_more_async(Result)
365 end,
366
367
:-(
{Tag, Action}.
368
369 %% Handles query cancel requests (normally scheduled timeout)
370 -spec handle_cancel(request_id(), Reason :: term(), worker_state()) -> worker_state().
371 handle_cancel(ReqId, Reason, State) ->
372
:-(
case maps:get(ReqId, State#state.inflight, undefined) of
373 undefined ->
374
:-(
State;
375 #request{tag = Tag} = Req ->
376
:-(
?LOG_WARNING(#{what => cassandra_aborting_request, reason => Reason,
377 pool => State#state.pool_name,
378 tag => Tag, error_text => error_text(cancel, Reason, Req),
379
:-(
query_type => query_type(Req)}),
380
:-(
cleanup_request(ReqId, State)
381 end.
382
383 -spec handle_result({result, cql_tag(), cql_result()}, worker_state()) -> worker_state().
384 handle_result({result, Tag, Result} = R, #state{} = State) ->
385
:-(
case maps:get(Tag, State#state.cql_tags, undefined) of
386 undefined ->
387
:-(
?LOG_WARNING(#{what => cassandra_unexpected_result,
388 pool => State#state.pool_name,
389
:-(
result => R}),
390
:-(
State;
391 ReqId ->
392
:-(
Req = #request{cql_mref = MRef} = maps:get(ReqId, State#state.inflight),
393
:-(
erlang:demonitor(MRef, [flush]),
394
:-(
do_handle_result(Result, Req, State)
395 end.
396
397 %% Handles all 'result' responses from DBMS. Depending on the state of request,
398 %% initiates next query or returs response to the caller.
399 -spec do_handle_result(cql_result(), request(), worker_state()) -> worker_state().
400 do_handle_result(Result, Req, State) ->
401
:-(
#request{action = Action, id = ReqId} = Req,
402
403
:-(
IsFinished = is_finished(Req, Result),
404
:-(
case {IsFinished, Action} of
405 %% Continue with unfinished queries
406 {false, #write_action{} = A} ->
407
:-(
NextAction = A#write_action{current_rows = []}, %% Remove just saved rows
408
:-(
NextReq = Req#request{action = NextAction},
409
:-(
NextState = update_req(NextReq, State),
410
411
:-(
process_request(ReqId, NextState);
412 {false, #read_action{} = A} ->
413
414
:-(
NextAction = A#read_action{last_result = Result},
415
:-(
NextReq = Req#request{action = NextAction},
416
:-(
NextState = update_req(NextReq, State),
417
:-(
maybe_reply(Req, {partial, NextReq, Result}),
418
:-(
NextState;
419
420 %% Reply and cleanup for finished queries
421 {true, #write_action{}} ->
422
:-(
maybe_reply(Req, ok),
423
:-(
cleanup_request(ReqId, State);
424 {true, #read_action{}} ->
425
:-(
maybe_reply(Req, {finished, Result}),
426
:-(
cleanup_request(ReqId, State)
427
428 end.
429
430 -spec handle_error({error, cql_tag(), Reason :: term()}, worker_state()) -> worker_state().
431 handle_error({error, Tag, Reason} = Error, #state{} = State) ->
432
:-(
case maps:get(Tag, State#state.cql_tags, undefined) of
433 undefined ->
434
:-(
?LOG_WARNING(#{what => cassandra_unexpected_error, error => Error,
435 pool => State#state.pool_name,
436
:-(
tag => Tag, reason => Reason}),
437
:-(
State;
438 ReqId ->
439
:-(
Req = #request{cql_mref = MRef} = maps:get(ReqId, State#state.inflight),
440
:-(
erlang:demonitor(MRef, [flush]),
441
:-(
do_handle_error(cqerl_error, Reason, Req, State)
442 end.
443
444 -spec handle_down({'DOWN', reference(), process, pid(), Info :: term()},
445 worker_state()) -> worker_state().
446 handle_down({'DOWN', MRef, _, _Pid, Info} = _Down, State) ->
447
:-(
case maps:get(MRef, State#state.monitors, undefined) of
448 undefined ->
449
:-(
State;
450 ReqId ->
451
:-(
Req = #request{cql_mref = MRef} = maps:get(ReqId, State#state.inflight),
452
:-(
do_handle_error({down, cqerl_client}, Info, Req, State)
453 end.
454
455 -spec do_handle_error(error_type(), error_reason(), request(), worker_state()) -> worker_state().
456 do_handle_error(Type, Reason, Req, State) ->
457
:-(
#request{tag = Tag, id = ReqId, opts = Opts, retry_left = RetryLeft} = Req,
458
459
:-(
case retry_info(Type, Reason, Req, State) of
460 {abort, AbortReason, NextState} ->
461
:-(
?LOG_WARNING(#{what => cassandra_aborting_query, tag => Tag,
462 pool => State#state.pool_name,
463 query_type => query_type(Req), reason => AbortReason,
464
:-(
error_text => error_text(Type, Reason, Req)}),
465
:-(
maybe_reply(Req, {error, Reason}),
466
:-(
cleanup_request(Req, NextState);
467 {retry, WaitFor, NextState} ->
468
:-(
?LOG_WARNING(#{what => cassandra_retrying_query, tag => Tag,
469 pool => State#state.pool_name,
470 query_type => query_type(Req), request_opts => Opts,
471
:-(
error_text => error_text(Type, Reason, Req), retry_left => RetryLeft}),
472
:-(
schedule_retry(ReqId, WaitFor, NextState)
473 end.
474
475 %% Sends given reply is caller is known.
476 -spec maybe_reply(request(), Result :: ok | {ok, term()} |
477 {error, term()} | {finished, term()} | {partial, request(), term()}) -> any().
478 maybe_reply(#request{from = undefined}, _Result) ->
479
:-(
ok;
480 maybe_reply(#request{from = From}, Result) ->
481
:-(
gen_server:reply(From, Result).
482
483 %% Removes all data and monitors associated with the request.
484 -spec cleanup_request(request() | request_id(), worker_state()) -> worker_state().
485 cleanup_request(#request{id = ReqId}, State) ->
486
:-(
cleanup_request(ReqId, State);
487 cleanup_request(ReqId, State) ->
488
:-(
#state{inflight = Inflight, monitors = Monitors, cql_tags = Tags} = State,
489
:-(
#request{tag = Tag, cql_mref = CqlMRef, timeout_tref = TRef} = maps:get(ReqId, Inflight),
490
:-(
catch demonitor(CqlMRef), %% If may be unset at this point and we don't really care about it
491
:-(
catch erlang:cancel_timer(TRef),
492
493
:-(
NextInflight = maps:remove(ReqId, Inflight),
494
:-(
NextMonitors = maps:remove(CqlMRef, Monitors),
495
:-(
NextTags = maps:remove(Tag, Tags),
496
497
:-(
State#state{inflight = NextInflight, monitors = NextMonitors, cql_tags = NextTags}.
498
499 %% Checks whether request requires more queries to complete.
500 -spec is_finished(request(), cql_result()) -> boolean().
501 is_finished(_Req = #request{action = #write_action{pending_rows = []}}, _Result) ->
502
:-(
true;
503 is_finished(_Req = #request{action = #write_action{}}, _Result) ->
504
:-(
false;
505 is_finished(_Req = #request{action = #read_action{}}, Result) ->
506
:-(
not cqerl:has_more_pages(Result).
507
508 %% For given query error, returns whether request should be retried and if so - how soon.
509 -spec retry_info(error_type(), error_reason(), request(), worker_state()) ->
510 {abort, AbortReason :: term(), worker_state()} |
511 {retry, Timeout :: non_neg_integer(), worker_state()}.
512 retry_info(_, _, #request{retry_left = 0} = _Req, State) ->
513
:-(
{abort, retry_limit_exceeded, State};
514 retry_info({down, cqerl_client}, _Reason, _Req, State) ->
515
:-(
{retry, 5 + rand:uniform(20), State};
516 retry_info(cqerl_error, {16#1100 = _WriteTimout, _, _}, _Req, State) ->
517
:-(
{retry, 10 + rand:uniform(50), State};
518 retry_info(cqerl_error, {16#1200 = _ReadTimout, _, _}, _Req, State) ->
519
:-(
{retry, 10 + rand:uniform(50), State};
520 retry_info(cqerl_error, {16#2200 = _WriteToBig, _, _}, Req, State) ->
521
:-(
#request{id = ReqId, opts = Opts, retry_left = RetryLeft} = Req,
522
:-(
BatchSize = maps:get(batch_size, Opts),
523
:-(
NewBatchSize =
524 case RetryLeft of
525 1 -> %% This is the last try, let's use batch size of 1
526
:-(
1;
527
:-(
_ -> max(1, round(BatchSize * ?BATCH_SHRINK_RATIO))
528 end,
529
530
:-(
NextOpts = maps:put(batch_size, NewBatchSize, Opts),
531
:-(
NextReq = Req#request{opts = NextOpts},
532
:-(
NextState = State#state{inflight = maps:put(ReqId, NextReq, State#state.inflight)},
533
534
:-(
{retry, 0, NextState};
535 retry_info(cqerl_error, {connection_closed, _}, _Req, State) ->
536
:-(
{retry, 5 + rand:uniform(20), State};
537 retry_info(_Type, _Reason, _Req, State) ->
538
:-(
{abort, unknown_error, State}.
539
540 %% Returns log message for given query error.
541 error_text(cancel = Category, Reason, _Request) ->
542
:-(
io_lib:format("status=error, category=~s, details=~p",
543 [Category, Reason]);
544 error_text({down, ProcessType}, Reason, Request) ->
545
:-(
MRef = Request#request.cql_mref,
546
:-(
io_lib:format("status=error, category=~s, details=~p, process_type=~p, mref=~p",
547 [process_down, Reason, ProcessType, MRef]);
548 error_text(cqerl_error, Reason, _Request) ->
549
:-(
cql_error_text(Reason).
550
551 %% Returns log message for given cqerl specific query error.
552 cql_error_text({ErrorCode, ErrorText, _}) when is_binary(ErrorText) ->
553
:-(
CodeText = response_code_to_binary(ErrorCode),
554
:-(
HexErrorCode = "0x" ++ integer_to_list(ErrorCode, 16),
555
:-(
io_lib:format("status=error, category=~s, details=~s, code=~p",
556 [CodeText, ErrorText, HexErrorCode]);
557 cql_error_text({ErrorCode, _, ErrorData}) ->
558
:-(
cql_error_text({ErrorCode, <<"undefined">>, ErrorData});
559 cql_error_text({Category, Reason}) ->
560
:-(
io_lib:format("status=error, category=~s, details=~p",
561 [Category, Reason]);
562 cql_error_text(UnknownReason) ->
563
:-(
io_lib:format("status=error, category=~s, details=~p",
564 [unknown, UnknownReason]).
565
566
567 %% ====================================================================
568 %% Helpers
569 %% ====================================================================
570
571 -spec get_client(mongoose_cassandra:pool_name()) ->
572 cqerl:client() | no_return().
573 get_client(PoolName) ->
574
:-(
get_client_loop(PoolName, 0).
575
576 -spec get_client_loop(mongoose_cassandra:pool_name(), non_neg_integer()) ->
577 cqerl:client() | no_return().
578 get_client_loop(PoolName, RetryNo) when RetryNo >= 500 ->
579
:-(
?LOG_WARNING(#{what => cassandra_get_client_failed,
580
:-(
pool => PoolName}),
581
:-(
error(cannot_get_cqerl_client, [PoolName, RetryNo]);
582 get_client_loop(PoolName, RetryNo) ->
583
:-(
try cqerl:get_client(PoolName) of
584 {ok, Client} ->
585
:-(
Pid = cqerl:client_to_pid(Client),
586 %% TODO that would not be retried. Add docs why we need is_process_alive.
587
:-(
case is_process_alive(Pid) of
588
:-(
true -> Client;
589
:-(
_ -> throw({dead, Pid})
590 end
591 catch
592 Class:Reason:Stacktrace ->
593
:-(
?LOG_INFO(#{what => cassandra_get_client_retry,
594 class => Class, reason => Reason, stacktrace => Stacktrace,
595
:-(
pool => PoolName, retry_number => RetryNo}),
596
597
:-(
Wait = rand:uniform(10),
598
:-(
timer:sleep(Wait),
599
:-(
get_client_loop(PoolName, RetryNo + 1)
600 end.
601
602 -spec schedule_retry(request_id(), non_neg_integer(), worker_state()) -> worker_state().
603 schedule_retry(ReqId, Time, State) ->
604
:-(
erlang:send_after(Time, self(), {retry, ReqId}),
605
:-(
State.
606
607 -spec prepare_options(options()) -> options().
608 prepare_options(Opts) ->
609
:-(
maps:merge(?DEFAULT_OPTS, Opts).
610
611 -spec new_request(From :: caller(), options(), action()) -> request().
612 new_request(From, Opts, Action) ->
613
:-(
RequestId = erlang:make_ref(),
614
615
:-(
#request{
616 id = RequestId,
617 action = Action,
618 opts = Opts,
619 from = From,
620 retry_left = maps:get(retry, Opts)
621 }.
622
623 -spec new_write_action(query_str(), mongoose_cassandra:rows()) -> write_action().
624 new_write_action(QueryStr, Rows) ->
625
:-(
#write_action{
626 query = #cql_query{statement = QueryStr},
627 pending_rows = Rows,
628 current_rows = []
629 }.
630
631 -spec new_read_action(query_str(), mongoose_cassandra:parameters()) -> read_action().
632 new_read_action(QueryStr, Params) ->
633
:-(
#read_action{
634 query = #cql_query{statement = QueryStr, values = Params}
635 }.
636
637 schedule_timeout(Timeout, Req = #request{id = ReqId}) ->
638
:-(
TRef = erlang:send_after(Timeout, self(), {cancel, ReqId, timeout}),
639
:-(
Req#request{timeout_tref = TRef}.
640
641 update_req(Req = #request{id = ReqId}, State = #state{inflight = Inflight}) ->
642
:-(
State#state{inflight = maps:put(ReqId, Req, Inflight)}.
643
644 query_type(#request{action = #write_action{}}) ->
645
:-(
write;
646 query_type(#request{action = #read_action{}}) ->
647
:-(
read.
648
649 %% Source: https://stackoverflow.com/questions/48304330/list-of-cassandra-error-codes
650 response_code_to_binary(Code) when is_integer(Code) ->
651
:-(
case Code of
652
:-(
16#0000 -> <<"serverError">>;
653
:-(
16#000A -> <<"protocolError">>;
654
:-(
16#0100 -> <<"badCredentials">>;
655
:-(
16#1000 -> <<"unavailableException">>;
656
:-(
16#1001 -> <<"overloaded">>;
657
:-(
16#1002 -> <<"isBootstrapping">>;
658
:-(
16#1003 -> <<"truncateError">>;
659
:-(
16#1100 -> <<"writeTimeout">>;
660
:-(
16#1200 -> <<"readTimeout">>;
661
:-(
16#1300 -> <<"readFailure">>;
662
:-(
16#1400 -> <<"functionFailure">>;
663
:-(
16#1500 -> <<"writeFailure">>;
664
:-(
16#2000 -> <<"syntaxError">>;
665
:-(
16#2100 -> <<"unauthorized">>;
666
:-(
16#2200 -> <<"invalid">>;
667
:-(
16#2300 -> <<"configError">>;
668
:-(
16#2400 -> <<"alreadyExists">>;
669
:-(
16#2500 -> <<"unprepared">>;
670
:-(
_ -> <<"unknownCode=0x", (integer_to_binary(Code, 16))/bytes>>
671 end;
672 response_code_to_binary(_) ->
673
:-(
<<"invalidErrorCode">>.
Line Hits Source