1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
-module(mongoose_cassandra_worker). |
17 |
|
-author('rafal.slota@erlang-solutions.com'). |
18 |
|
|
19 |
|
-include("mongoose_logger.hrl"). |
20 |
|
-include_lib("cqerl/include/cqerl.hrl"). |
21 |
|
|
22 |
|
%% ==================================================================== |
23 |
|
%% Exports |
24 |
|
%% ==================================================================== |
25 |
|
|
26 |
|
%% gen_server callbacks |
27 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, |
28 |
|
terminate/2, code_change/3]). |
29 |
|
|
30 |
|
%% gen_server API |
31 |
|
-export([write/5, write_async/5, read/7]). |
32 |
|
|
33 |
|
-behaviour(gen_server). |
34 |
|
|
35 |
|
%% ==================================================================== |
36 |
|
%% Internal records and definitions |
37 |
|
%% ==================================================================== |
38 |
|
|
39 |
|
%% Worker state: the pool name plus three lookup tables used to route incoming
%% messages back to the request they belong to.
-record(state, {
    %% Name of the pool from which cqerl clients are obtained
    pool_name :: mongoose_cassandra:pool_name(),
    %% Requests currently being processed, keyed by request id
    inflight = #{} :: #{request_id() => request()},
    %% cqerl query tag -> id of the request that sent the query
    cql_tags = #{} :: #{cql_tag() => request_id()},
    %% Monitor of a cqerl client process -> id of the request using that client
    monitors = #{} :: #{cql_mref() => request_id()}
}).
45 |
|
|
46 |
|
%% Progress of a (possibly multi-batch) write.
-record(write_action, {
    %% Query template; every row provides the values of one statement in a batch
    query :: cql_query(),
    %% Rows left to send
    pending_rows :: mongoose_cassandra:rows(),
    %% Rows that were sent to Cassandra and are awaiting response (write confirmation)
    current_rows :: mongoose_cassandra:rows()
}).
53 |
|
|
54 |
|
%% Progress of a paged read.
-record(read_action, {
    %% Query to execute
    query :: cql_query(),
    %% Last result handle from cqerl; `undefined' until the first page arrived
    last_result :: cql_result() | undefined
}).
58 |
|
|
59 |
|
%% A single read or write request tracked by the worker.
-record(request, {
    %% Unique id; key of the request in #state.inflight
    id :: request_id(),
    %% The read or write being executed, including its progress
    action :: action(),
    opts = #{} :: options(),
    %% Monitor of the cqerl client used for the most recent query
    cql_mref :: cql_mref() | undefined,
    %% cqerl tag of the most recent query
    tag :: cql_tag() | undefined,
    %% gen_server caller awaiting the reply; `undefined' for casts
    from :: caller(),
    %% Number of retry attempts still allowed
    retry_left :: non_neg_integer(),
    %% Timer that delivers the `cancel' message for this request
    timeout_tref :: reference() | undefined
}).
69 |
|
|
70 |
|
%% Default request options. They are merged with the caller's options in
%% prepare_options/1, where the caller's values take precedence.
-define(DEFAULT_OPTS, #{
    batch_size => 20,
    retry => 3,
    timeout => timer:minutes(1),
    %% cqerl options
    consistency => one,
    batch_mode => unlogged
}).
78 |
|
|
79 |
|
|
80 |
|
%% Factor by which the batch size is multiplied when Cassandra returns a
%% 'batch too big' error
-define(BATCH_SHRINK_RATIO, 0.5).
82 |
|
|
83 |
|
%% ==================================================================== |
84 |
|
%% Types |
85 |
|
%% ==================================================================== |
86 |
|
|
87 |
|
%% API types |
88 |
|
|
89 |
|
-type options() :: #{
    %% Request timeout in milliseconds
    timeout => non_neg_integer(),
    %% How many retry attempts should be made
    retry => non_neg_integer(),
    %% Rows per write batch. Due to Cassandra query size limits, this value may be
    %% adjusted on error (the last retry uses a batch size of 1)
    batch_size => non_neg_integer(),
    consistency => consistency_level() | consistency_level_int(),
    batch_mode => batch_mode() | batch_mode_int()
}.
97 |
|
|
98 |
|
-export_type([options/0]). |
99 |
|
|
100 |
|
%% Internal types |
101 |
|
|
102 |
|
%% Used as the hash base when a worker is selected for a query
-type context_id() :: term() | undefined.
-type query_str() :: binary().
%% `From' of a gen_server call; `undefined' when nobody awaits a reply
-type caller() :: {pid(), term()} | undefined.

-type request_id() :: reference().
-type request() :: #request{}.
-type write_action() :: #write_action{}.
-type read_action() :: #read_action{}.
-type action() :: read_action() | write_action().

-type cql_result() :: void | #cql_result{}.
-type cql_query() :: #cql_query{}.
-type cql_mref() :: reference().
%% Normally it's a reference(), but let's not depend on that
-type cql_tag() :: term().

-type worker_state() :: #state{}.
-type process_type() :: cqerl_client.
-type error_type() :: {down, process_type()} | cancel | cqerl_error.
-type error_reason() :: {Code :: non_neg_integer(), Details :: binary(), any()} | timeout | term().
121 |
|
|
122 |
|
|
123 |
|
%%==================================================================== |
124 |
|
%% API functions |
125 |
|
%%==================================================================== |
126 |
|
|
127 |
|
|
128 |
|
%% --------------------------------------------------------
%% @doc Execute a batch write query against Cassandra (insert, update or delete).
%% Cassandra doesn't like big batches, so the rows are sent in batches of
%% `batch_size' rows (20 by default). When Cassandra rejects a batch because it
%% is too big, the write falls back to smaller batches.
%% @end
%% --------------------------------------------------------
-spec write(mongoose_cassandra:pool_name(), context_id(), query_str(),
            mongoose_cassandra:rows(), options()) ->
    ok | {error, Reason :: term()}.
write(PoolName, ContextId, QueryStr, Rows, Opts) ->
    FullOpts = prepare_options(Opts),
    mongoose_cassandra_pool:call_query(PoolName, ContextId,
                                       {write, QueryStr, Rows, FullOpts}).
142 |
|
|
143 |
|
%% --------------------------------------------------------
%% @doc Asynchronous variant of write/5: the query is cast to the worker and
%% no response is delivered to the caller.
%% @end
%% --------------------------------------------------------
-spec write_async(mongoose_cassandra:pool_name(), context_id(), query_str(),
                  mongoose_cassandra:rows(), options()) ->
    ok.
write_async(PoolName, ContextId, QueryStr, Rows, Opts) ->
    FullOpts = prepare_options(Opts),
    mongoose_cassandra_pool:cast_query(PoolName, ContextId,
                                       {write, QueryStr, Rows, FullOpts}).
154 |
|
|
155 |
|
%% --------------------------------------------------------
%% @doc Execute a read query against Cassandra (select).
%% This function behaves much like lists:foldl/3, except that its input is the
%% sequence of result pages of the given query: each invocation of the fun
%% receives the list of rows of one page (by default 100 rows at most).
%% @end
%% --------------------------------------------------------
-spec read(mongoose_cassandra:pool_name(), context_id(), query_str(),
           mongoose_cassandra:parameters(), mongoose_cassandra:fold_fun(),
           mongoose_cassandra:fold_accumulator(), options()) ->
    {ok, mongoose_cassandra:fold_accumulator()} | {error, Reason :: term()}.
read(PoolName, ContextId, QueryStr, Params, Fun, AccIn, Opts) ->
    FirstCall = {read, QueryStr, Params, prepare_options(Opts)},
    do_read(PoolName, ContextId, FirstCall, Fun, AccIn).
170 |
|
|
171 |
|
%% Issues one call to the worker and folds the returned page into the
%% accumulator. A `partial' reply is followed by a `continue' call carrying the
%% request handed back by the worker; a `finished' reply ends the fold. Any
%% other reply is returned to the caller unchanged.
do_read(PoolName, ContextId, Call, Fun, AccIn) ->
    case mongoose_cassandra_pool:call_query(PoolName, ContextId, Call) of
        {partial, Req, Page} ->
            Acc = Fun(cqerl:all_rows(Page), AccIn),
            do_read(PoolName, ContextId, {continue, Req}, Fun, Acc);
        {finished, LastPage} ->
            {ok, Fun(cqerl:all_rows(LastPage), AccIn)};
        Failure ->
            Failure
    end.
182 |
|
|
183 |
|
%%==================================================================== |
184 |
|
%% gen_server callbacks |
185 |
|
%%==================================================================== |
186 |
|
|
187 |
|
%%--------------------------------------------------------------------
%% gen_server callback: starts the worker with an empty set of in-flight
%% requests for the given pool.
%%--------------------------------------------------------------------
init([Pool]) ->
    InitialState = #state{pool_name = Pool},
    {ok, InitialState}.
196 |
|
|
197 |
|
%%--------------------------------------------------------------------
%% gen_server callback: handles synchronous requests.
%%
%% `write' and `read' register a new request with a timeout and send its first
%% query; `continue' resumes a paged read on behalf of a new caller. None of
%% the clauses replies directly: replies are sent later with gen_server:reply/2
%% once Cassandra has answered. Unexpected calls are only logged.
%%--------------------------------------------------------------------
handle_call({write, QueryStr, Rows, Opts}, From, State = #state{}) ->
    {noreply, start_call(From, Opts, new_write_action(QueryStr, Rows), State)};
handle_call({read, QueryStr, Params, Opts}, From, State = #state{}) ->
    {noreply, start_call(From, Opts, new_read_action(QueryStr, Params), State)};
handle_call({continue, Req}, From, State = #state{}) ->
    %% The request record was handed to the caller with the previous page
    Resumed = Req#request{from = From},
    {noreply, process_request(Resumed#request.id, update_req(Resumed, State))};
handle_call(Msg, From, State) ->
    ?UNEXPECTED_CALL(Msg, From),
    {noreply, State}.

%% Creates a request for the given action, arms its timeout timer, stores it
%% and sends its first query.
start_call(From, Opts, Action, State) ->
    Request = schedule_timeout(maps:get(timeout, Opts),
                               new_request(From, Opts, Action)),
    process_request(Request#request.id, update_req(Request, State)).
230 |
|
|
231 |
|
|
232 |
|
%%--------------------------------------------------------------------
%% gen_server callback: handles asynchronous requests.
%%
%% A cast `write' is processed like its call counterpart, but the request has
%% no caller to reply to and no timeout timer is scheduled for it.
%%--------------------------------------------------------------------

handle_cast({write, QueryStr, Rows, Opts}, #state{} = State) ->
    Request = new_request(undefined, Opts, new_write_action(QueryStr, Rows)),
    {noreply, process_request(Request#request.id, update_req(Request, State))};
handle_cast(Msg, State) ->
    ?UNEXPECTED_CAST(Msg),
    {noreply, State}.
249 |
|
|
250 |
|
%%--------------------------------------------------------------------
%% gen_server callback: handles all non call/cast messages.
%%
%%   {cancel, ReqId, Reason}  - request abort (sent by the timeout timer)
%%   {result, _, _}           - query result received for a query tag
%%   {error, _, _}            - query error received for a query tag
%%   {'DOWN', ...}            - a monitored process went down
%%   {retry, ReqId}           - scheduled retry of a failed query
%%--------------------------------------------------------------------

handle_info({cancel, ReqId, Reason}, #state{} = St) ->
    {noreply, handle_cancel(ReqId, Reason, St)};
handle_info({result, _, _} = Result, #state{} = St) ->
    {noreply, handle_result(Result, St)};
handle_info({error, _, _} = Error, #state{} = St) ->
    {noreply, handle_error(Error, St)};
handle_info({'DOWN', _MRef, _, _, _} = Down, #state{} = St) ->
    {noreply, handle_down(Down, St)};
handle_info({retry, ReqId}, #state{} = St) ->
    {noreply, handle_retry(ReqId, St)};
handle_info(Msg, State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State}.

%% Re-sends the query of a request whose retry timer fired, using up one retry
%% attempt. A retry for a request that is no longer in flight is only logged.
handle_retry(ReqId, St) ->
    case maps:get(ReqId, St#state.inflight, undefined) of
        undefined ->
            ?LOG_WARNING(#{what => cassandra_unexpected_retry_request,
                           pool => St#state.pool_name,
                           state => St, request_id => ReqId}),
            St;
        #request{retry_left = Left} = Req ->
            Retried = Req#request{retry_left = max(Left - 1, 0)},
            process_request(ReqId, update_req(Retried, St))
    end.
285 |
|
|
286 |
|
%%--------------------------------------------------------------------
%% gen_server callback: invoked when the worker is about to terminate.
%% Nothing is cleaned up here; the return value is ignored by gen_server.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
    ok.
295 |
|
|
296 |
|
%%--------------------------------------------------------------------
%% gen_server callback: state conversion on code upgrade. The state is
%% kept as it is.
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
302 |
|
|
303 |
|
%% ==================================================================== |
304 |
|
%% Implementation |
305 |
|
%% ==================================================================== |
306 |
|
|
307 |
|
%% Gets a DBMS client and sends the next Cassandra query for the given request.
%%
%% The request must be present in `inflight'. On success the request is stored
%% with its next action, the new query tag and the new client monitor, and the
%% `cql_tags' / `monitors' lookup tables are pointed at them.
%%
%% The tag and monitor left over from the request's previous query (earlier
%% batch, earlier page or failed attempt) are removed from the lookup tables
%% first. Without this every additional query of a request would leave one
%% stale entry in each table for the whole lifetime of the worker.
%%
%% Any exception raised while getting the client or sending the query is
%% treated as a `{down, cqerl_client}' error, so it goes through the regular
%% retry/abort logic.
-spec process_request(request_id(), worker_state()) -> worker_state().
process_request(ReqId, #state{pool_name = PoolName} = State0) ->
    Req = maps:get(ReqId, State0#state.inflight),
    #request{action = Action, opts = Opts, tag = OldTag, cql_mref = OldMRef} = Req,

    %% For the first query of a request both keys are `undefined' and the
    %% removal is a no-op.
    State = State0#state{
        cql_tags = maps:remove(OldTag, State0#state.cql_tags),
        monitors = maps:remove(OldMRef, State0#state.monitors)
    },

    try send_monitored(PoolName, Action, Opts) of
        {MRef, Tag, NextAction} ->
            NextReq = Req#request{
                action = NextAction,
                tag = Tag,
                cql_mref = MRef
            },
            State#state{
                inflight = maps:put(ReqId, NextReq, State#state.inflight),
                cql_tags = maps:put(Tag, ReqId, State#state.cql_tags),
                monitors = maps:put(MRef, ReqId, State#state.monitors)
            }
    catch
        _:Reason ->
            %% Logging will be handled in do_handle_error/4
            do_handle_error({down, cqerl_client}, Reason, Req, State)
    end.

%% Gets a client from the pool, monitors it and sends the next query of the
%% action. If sending raises, the freshly created monitor is released (and a
%% possibly delivered 'DOWN' message flushed) before the exception is re-raised,
%% so a failed attempt leaves no monitor behind.
-spec send_monitored(mongoose_cassandra:pool_name(), action(), options()) ->
    {cql_mref(), cql_tag(), action()}.
send_monitored(PoolName, Action, Opts) ->
    Client = get_client(PoolName),
    MRef = erlang:monitor(process, cqerl:client_to_pid(Client)),
    try send_next_request(Client, Action, Opts) of
        {Tag, NextAction} ->
            {MRef, Tag, NextAction}
    catch
        Class:Reason:Stacktrace ->
            erlang:demonitor(MRef, [flush]),
            erlang:raise(Class, Reason, Stacktrace)
    end.
334 |
|
|
335 |
|
%% Sends the cqerl query for the given action() and returns the query tag
%% together with the updated action.
%%
%% Write: rows of the previous, unconfirmed batch (`current_rows') are put back
%% in front of the pending rows, and the first `batch_size' rows are sent as a
%% single batch. The sent rows become `current_rows', the rest stays pending.
%%
%% Read: the first page is requested with the query itself, subsequent pages
%% through the result handle of the previous page.
-spec send_next_request(cqerl:client(), action(), options()) -> {cql_tag(), action()}.
send_next_request(Client, #write_action{} = Action, Opts) ->
    #write_action{
        current_rows = FailedRows,
        pending_rows = PendingRows,
        query = Query
    } = Action,
    Rows = FailedRows ++ PendingRows,
    %% Every batch has to make progress: with a batch size of 0 the pending
    %% rows would never shrink and empty batches would be sent forever.
    BatchSize = max(1, maps:get(batch_size, Opts)),

    {NewRows, Tail} = lists:split(min(BatchSize, length(Rows)), Rows),
    QueryBatch = #cql_query_batch{
        mode = maps:get(batch_mode, Opts),
        consistency = maps:get(consistency, Opts),
        queries = [Query#cql_query{values = NewRow} || NewRow <- NewRows]
    },
    Tag = cqerl:send_query(Client, QueryBatch),

    NextAction = Action#write_action{current_rows = NewRows, pending_rows = Tail},
    {Tag, NextAction};
send_next_request(Client, #read_action{query = Query, last_result = LastResult} = Action,
                  _Opts) ->
    Tag =
        case LastResult of
            undefined ->
                cqerl:send_query(Client, Query);
            Result ->
                cqerl:fetch_more_async(Result)
        end,

    {Tag, Action}.
368 |
|
|
369 |
|
%% Handles query cancel requests (normally the scheduled timeout). A request
%% that is still in flight is logged and all of its bookkeeping is removed;
%% a cancel for an unknown request is ignored.
%% NOTE(review): no reply is sent to the caller of a cancelled request here;
%% presumably the caller relies on its own call timeout - confirm.
-spec handle_cancel(request_id(), Reason :: term(), worker_state()) -> worker_state().
handle_cancel(ReqId, Reason, #state{inflight = Inflight} = State) ->
    case maps:get(ReqId, Inflight, undefined) of
        #request{tag = Tag} = Req ->
            ?LOG_WARNING(#{what => cassandra_aborting_request, reason => Reason,
                           pool => State#state.pool_name,
                           tag => Tag, error_text => error_text(cancel, Reason, Req),
                           query_type => query_type(Req)}),
            cleanup_request(ReqId, State);
        undefined ->
            State
    end.
382 |
|
|
383 |
|
%% Routes a cqerl result to the request that sent the query with the given
%% tag. The client monitor of that query is released before the result is
%% processed. Results with an unknown tag are logged and dropped.
-spec handle_result({result, cql_tag(), cql_result()}, worker_state()) -> worker_state().
handle_result({result, Tag, Result} = Msg, #state{cql_tags = Tags} = State) ->
    case maps:get(Tag, Tags, undefined) of
        undefined ->
            ?LOG_WARNING(#{what => cassandra_unexpected_result,
                           pool => State#state.pool_name,
                           result => Msg}),
            State;
        ReqId ->
            #request{cql_mref = MRef} = Req = maps:get(ReqId, State#state.inflight),
            erlang:demonitor(MRef, [flush]),
            do_handle_result(Result, Req, State)
    end.
396 |
|
|
397 |
|
%% Handles all 'result' responses from the DBMS. Depending on the state of the
%% request, either initiates the next query or returns the response to the caller:
%%   * finished write   - reply `ok' and forget the request,
%%   * finished read    - reply `{finished, Result}' and forget the request,
%%   * unfinished write - drop the rows just saved and send the next batch,
%%   * unfinished read  - reply `{partial, Request, Result}'; the request stays
%%                        in flight until the caller continues it.
-spec do_handle_result(cql_result(), request(), worker_state()) -> worker_state().
do_handle_result(Result, Req, State) ->
    #request{action = Action, id = ReqId} = Req,

    case {is_finished(Req, Result), Action} of
        %% Reply and cleanup for finished queries
        {true, #write_action{}} ->
            maybe_reply(Req, ok),
            cleanup_request(ReqId, State);
        {true, #read_action{}} ->
            maybe_reply(Req, {finished, Result}),
            cleanup_request(ReqId, State);

        %% Continue with unfinished queries
        {false, #write_action{} = Write} ->
            %% Remove just saved rows
            Continued = Req#request{action = Write#write_action{current_rows = []}},
            process_request(ReqId, update_req(Continued, State));
        {false, #read_action{} = Read} ->
            Continued = Req#request{action = Read#read_action{last_result = Result}},
            maybe_reply(Req, {partial, Continued, Result}),
            update_req(Continued, State)
    end.
429 |
|
|
430 |
|
%% Routes a cqerl error to the request that sent the query with the given tag.
%% The client monitor of that query is released before the error is processed.
%% Errors with an unknown tag are logged and dropped.
-spec handle_error({error, cql_tag(), Reason :: term()}, worker_state()) -> worker_state().
handle_error({error, Tag, Reason} = Error, #state{cql_tags = Tags} = State) ->
    case maps:get(Tag, Tags, undefined) of
        undefined ->
            ?LOG_WARNING(#{what => cassandra_unexpected_error, error => Error,
                           pool => State#state.pool_name,
                           tag => Tag, reason => Reason}),
            State;
        ReqId ->
            #request{cql_mref = MRef} = Req = maps:get(ReqId, State#state.inflight),
            erlang:demonitor(MRef, [flush]),
            do_handle_error(cqerl_error, Reason, Req, State)
    end.
443 |
|
|
444 |
|
%% Handles the 'DOWN' message of a monitored cqerl client. When the monitor
%% belongs to a known request, that request is failed with a
%% `{down, cqerl_client}' error; otherwise the message is ignored.
-spec handle_down({'DOWN', reference(), process, pid(), Info :: term()},
                  worker_state()) -> worker_state().
handle_down({'DOWN', MRef, _, _Pid, Info}, #state{monitors = Monitors} = State) ->
    case maps:get(MRef, Monitors, undefined) of
        undefined ->
            State;
        ReqId ->
            %% Asserts that the request is still using this very monitor
            #request{cql_mref = MRef} = Req = maps:get(ReqId, State#state.inflight),
            do_handle_error({down, cqerl_client}, Info, Req, State)
    end.
454 |
|
|
455 |
|
%% Common handling of a failed query. retry_info/4 decides whether the query
%% is retried (after the returned delay) or the request is aborted, in which
%% case the caller gets `{error, Reason}' and the request is forgotten. Both
%% outcomes are logged.
-spec do_handle_error(error_type(), error_reason(), request(), worker_state()) -> worker_state().
do_handle_error(Type, Reason, Req, State) ->
    #request{tag = Tag, id = ReqId, opts = Opts, retry_left = RetryLeft} = Req,

    case retry_info(Type, Reason, Req, State) of
        {retry, Delay, StateAfter} ->
            ?LOG_WARNING(#{what => cassandra_retrying_query, tag => Tag,
                           pool => State#state.pool_name,
                           query_type => query_type(Req), request_opts => Opts,
                           error_text => error_text(Type, Reason, Req), retry_left => RetryLeft}),
            schedule_retry(ReqId, Delay, StateAfter);
        {abort, AbortReason, StateAfter} ->
            ?LOG_WARNING(#{what => cassandra_aborting_query, tag => Tag,
                           pool => State#state.pool_name,
                           query_type => query_type(Req), reason => AbortReason,
                           error_text => error_text(Type, Reason, Req)}),
            maybe_reply(Req, {error, Reason}),
            cleanup_request(Req, StateAfter)
    end.
474 |
|
|
475 |
|
%% Sends the given reply if the caller is known; requests without a caller
%% (`from = undefined') are skipped.
-spec maybe_reply(request(), Result :: ok | {ok, term()} |
                  {error, term()} | {finished, term()} | {partial, request(), term()}) -> any().
maybe_reply(#request{from = Caller}, Result) ->
    case Caller of
        undefined -> ok;
        _ -> gen_server:reply(Caller, Result)
    end.
482 |
|
|
483 |
|
%% Removes all data and monitors associated with the request.
%%
%% Accepts either the request record or its id; the request must be present in
%% `inflight'. The client monitor and the timeout timer are released and the
%% request is removed from `inflight', `cql_tags' and `monitors'.
-spec cleanup_request(request() | request_id(), worker_state()) -> worker_state().
cleanup_request(#request{id = ReqId}, State) ->
    cleanup_request(ReqId, State);
cleanup_request(ReqId, State) ->
    #state{inflight = Inflight, monitors = Monitors, cql_tags = Tags} = State,
    #request{tag = Tag, cql_mref = CqlMRef, timeout_tref = TRef} = maps:get(ReqId, Inflight),
    release_monitor(CqlMRef),
    cancel_timeout(TRef),

    State#state{
        inflight = maps:remove(ReqId, Inflight),
        %% Removing an `undefined' key is a no-op
        monitors = maps:remove(CqlMRef, Monitors),
        cql_tags = maps:remove(Tag, Tags)
    }.

%% Releases the client monitor of a request. The monitor is unset until the
%% first query has been sent. A 'DOWN' message that was already delivered is
%% flushed, in line with handle_result/2 and handle_error/2.
release_monitor(undefined) ->
    ok;
release_monitor(MRef) ->
    erlang:demonitor(MRef, [flush]),
    ok.

%% Cancels the timeout timer of a request; requests started with a cast have none.
cancel_timeout(undefined) ->
    ok;
cancel_timeout(TRef) ->
    _ = erlang:cancel_timer(TRef),
    ok.
498 |
|
|
499 |
|
%% Tells whether the request is complete, i.e. needs no further queries.
%% A write is complete once no rows are pending; a read is complete once the
%% given result has no more pages.
-spec is_finished(request(), cql_result()) -> boolean().
is_finished(#request{action = #write_action{pending_rows = Pending}}, _Result) ->
    Pending =:= [];
is_finished(#request{action = #read_action{}}, Result) ->
    not cqerl:has_more_pages(Result).
507 |
|
|
508 |
|
%% For the given query error, tells whether the request should be retried and
%% if so - how soon (in milliseconds).
%%   * no retries left              - abort,
%%   * client process down          - retry after a short random delay,
%%   * write (0x1100) or read (0x1200) timeout - retry after a random delay,
%%   * 0x2200, treated as 'write too big' - retry immediately with a smaller
%%     batch size, which is stored in the request kept in the returned state,
%%   * connection closed            - retry after a short random delay,
%%   * anything else                - abort.
-spec retry_info(error_type(), error_reason(), request(), worker_state()) ->
    {abort, AbortReason :: term(), worker_state()} |
    {retry, Timeout :: non_neg_integer(), worker_state()}.
retry_info(_Type, _Reason, #request{retry_left = 0}, State) ->
    {abort, retry_limit_exceeded, State};
retry_info({down, cqerl_client}, _Reason, _Req, State) ->
    {retry, 5 + rand:uniform(20), State};
retry_info(cqerl_error, {Code, _, _}, _Req, State)
  when Code =:= 16#1100; Code =:= 16#1200 ->
    {retry, 10 + rand:uniform(50), State};
retry_info(cqerl_error, {16#2200, _, _}, Req, State) ->
    #request{id = ReqId, opts = Opts, retry_left = RetryLeft} = Req,
    SmallerBatch = shrunk_batch_size(RetryLeft, maps:get(batch_size, Opts)),
    NextReq = Req#request{opts = Opts#{batch_size => SmallerBatch}},
    Inflight = State#state.inflight,
    {retry, 0, State#state{inflight = Inflight#{ReqId => NextReq}}};
retry_info(cqerl_error, {connection_closed, _}, _Req, State) ->
    {retry, 5 + rand:uniform(20), State};
retry_info(_Type, _Reason, _Req, State) ->
    {abort, unknown_error, State}.

%% Batch size for the next attempt: 1 when this is the last try, otherwise the
%% current size scaled by ?BATCH_SHRINK_RATIO (but never less than 1).
shrunk_batch_size(1, _BatchSize) ->
    1;
shrunk_batch_size(_RetryLeft, BatchSize) ->
    max(1, round(BatchSize * ?BATCH_SHRINK_RATIO)).
539 |
|
|
540 |
|
%% Builds the log text (chardata as returned by io_lib:format/2) for the given
%% query error. The text depends on the error type: cancelled request, client
%% process down or error reported by cqerl.
error_text(cancel, Reason, _Request) ->
    io_lib:format("status=error, category=~s, details=~p",
                  [cancel, Reason]);
error_text({down, ProcessType}, Reason, #request{cql_mref = MRef}) ->
    io_lib:format("status=error, category=~s, details=~p, process_type=~p, mref=~p",
                  [process_down, Reason, ProcessType, MRef]);
error_text(cqerl_error, Reason, _Request) ->
    cql_error_text(Reason).
550 |
|
|
551 |
|
%% Builds the log text for a cqerl specific query error.
%%   {Code, Text, Data}  - Cassandra error; the code is shown by name and in hex.
%%                         A non-binary text is replaced with <<"undefined">>.
%%   {Category, Reason}  - any other two-element error
%%   anything else       - reported under the `unknown' category
cql_error_text({Code, Text, _}) when is_binary(Text) ->
    Category = response_code_to_binary(Code),
    HexCode = "0x" ++ integer_to_list(Code, 16),
    io_lib:format("status=error, category=~s, details=~s, code=~p",
                  [Category, Text, HexCode]);
cql_error_text({Code, _NotBinary, Data}) ->
    cql_error_text({Code, <<"undefined">>, Data});
cql_error_text({Category, Reason}) ->
    io_lib:format("status=error, category=~s, details=~p",
                  [Category, Reason]);
cql_error_text(Other) ->
    io_lib:format("status=error, category=~s, details=~p",
                  [unknown, Other]).
565 |
|
|
566 |
|
|
567 |
|
%% ==================================================================== |
568 |
|
%% Helpers |
569 |
|
%% ==================================================================== |
570 |
|
|
571 |
|
%% Returns a cqerl client of the given pool, starting the retry loop at
%% attempt number 0. Raises when no client can be obtained.
-spec get_client(mongoose_cassandra:pool_name()) ->
    cqerl:client() | no_return().
get_client(PoolName) ->
    FirstAttempt = 0,
    get_client_loop(PoolName, FirstAttempt).
575 |
|
|
576 |
|
%% Tries to get a live cqerl client. When cqerl:get_client/1 raises, the attempt
%% is logged and repeated after a random sleep of 1-10 ms; after 500 failed
%% attempts the function gives up with a `cannot_get_cqerl_client' error.
%%
%% Only exceptions raised by cqerl:get_client/1 itself are retried here. The
%% liveness check runs in the `of' section of the `try', which is not protected
%% by the `catch', so `{dead, Pid}' is thrown straight to the caller.
-spec get_client_loop(mongoose_cassandra:pool_name(), non_neg_integer()) ->
    cqerl:client() | no_return().
get_client_loop(PoolName, RetryNo) when RetryNo >= 500 ->
    ?LOG_WARNING(#{what => cassandra_get_client_failed,
                   pool => PoolName}),
    error(cannot_get_cqerl_client, [PoolName, RetryNo]);
get_client_loop(PoolName, RetryNo) ->
    try cqerl:get_client(PoolName) of
        {ok, Client} ->
            %% TODO a dead client is not retried in this loop. Add docs why we
            %% need is_process_alive.
            ensure_alive(Client)
    catch
        Class:Reason:Stacktrace ->
            ?LOG_INFO(#{what => cassandra_get_client_retry,
                        class => Class, reason => Reason, stacktrace => Stacktrace,
                        pool => PoolName, retry_number => RetryNo}),

            timer:sleep(rand:uniform(10)),
            get_client_loop(PoolName, RetryNo + 1)
    end.

%% Returns the client when its process is alive, throws `{dead, Pid}' otherwise.
ensure_alive(Client) ->
    Pid = cqerl:client_to_pid(Client),
    is_process_alive(Pid) orelse throw({dead, Pid}),
    Client.
601 |
|
|
602 |
|
%% Arranges for a `{retry, ReqId}' message to be sent to this process after
%% `Time' milliseconds. The state is returned unchanged.
-spec schedule_retry(request_id(), non_neg_integer(), worker_state()) -> worker_state().
schedule_retry(ReqId, Time, State) ->
    _TimerRef = erlang:send_after(Time, self(), {retry, ReqId}),
    State.
606 |
|
|
607 |
|
%% Completes the caller's options with the defaults from ?DEFAULT_OPTS.
%% Values given by the caller win over the defaults.
-spec prepare_options(options()) -> options().
prepare_options(UserOpts) ->
    Defaults = ?DEFAULT_OPTS,
    maps:merge(Defaults, UserOpts).
610 |
|
|
611 |
|
%% Creates a request with a fresh id for the given caller, options and action.
%% The retry budget is taken from the `retry' option, which must be present.
-spec new_request(From :: caller(), options(), action()) -> request().
new_request(From, Opts, Action) ->
    RetryBudget = maps:get(retry, Opts),
    #request{
        id = erlang:make_ref(),
        action = Action,
        opts = Opts,
        from = From,
        retry_left = RetryBudget
    }.
622 |
|
|
623 |
|
%% Creates a write action in which all rows are still pending.
-spec new_write_action(query_str(), mongoose_cassandra:rows()) -> write_action().
new_write_action(QueryStr, Rows) ->
    Query = #cql_query{statement = QueryStr},
    #write_action{query = Query, pending_rows = Rows, current_rows = []}.
630 |
|
|
631 |
|
%% Creates a read action for the given statement and its parameters. No page
%% has been fetched yet, so `last_result' is left unset.
-spec new_read_action(query_str(), mongoose_cassandra:parameters()) -> read_action().
new_read_action(QueryStr, Params) ->
    Query = #cql_query{statement = QueryStr, values = Params},
    #read_action{query = Query}.
636 |
|
|
637 |
|
%% Starts the timer that sends `{cancel, ReqId, timeout}' to this process after
%% `Timeout' milliseconds and stores the timer reference in the request.
schedule_timeout(Timeout, #request{id = ReqId} = Req) ->
    CancelMsg = {cancel, ReqId, timeout},
    Req#request{timeout_tref = erlang:send_after(Timeout, self(), CancelMsg)}.
640 |
|
|
641 |
|
%% Stores the request in `inflight' under its id, replacing any previous version.
update_req(#request{id = ReqId} = Req, #state{inflight = Inflight} = State) ->
    State#state{inflight = Inflight#{ReqId => Req}}.
643 |
|
|
644 |
|
%% Returns `read' or `write', depending on the action of the request.
query_type(#request{action = #read_action{}}) ->
    read;
query_type(#request{action = #write_action{}}) ->
    write.
648 |
|
|
649 |
|
%% Translates a Cassandra error code into its name. Integer codes without a
%% known name yield <<"unknownCode=0x...">> with the code in hexadecimal;
%% anything that is not an integer yields <<"invalidErrorCode">>.
%% Source: https://stackoverflow.com/questions/48304330/list-of-cassandra-error-codes
response_code_to_binary(16#0000) -> <<"serverError">>;
response_code_to_binary(16#000A) -> <<"protocolError">>;
response_code_to_binary(16#0100) -> <<"badCredentials">>;
response_code_to_binary(16#1000) -> <<"unavailableException">>;
response_code_to_binary(16#1001) -> <<"overloaded">>;
response_code_to_binary(16#1002) -> <<"isBootstrapping">>;
response_code_to_binary(16#1003) -> <<"truncateError">>;
response_code_to_binary(16#1100) -> <<"writeTimeout">>;
response_code_to_binary(16#1200) -> <<"readTimeout">>;
response_code_to_binary(16#1300) -> <<"readFailure">>;
response_code_to_binary(16#1400) -> <<"functionFailure">>;
response_code_to_binary(16#1500) -> <<"writeFailure">>;
response_code_to_binary(16#2000) -> <<"syntaxError">>;
response_code_to_binary(16#2100) -> <<"unauthorized">>;
response_code_to_binary(16#2200) -> <<"invalid">>;
response_code_to_binary(16#2300) -> <<"configError">>;
response_code_to_binary(16#2400) -> <<"alreadyExists">>;
response_code_to_binary(16#2500) -> <<"unprepared">>;
response_code_to_binary(Code) when is_integer(Code) ->
    <<"unknownCode=0x", (integer_to_binary(Code, 16))/bytes>>;
response_code_to_binary(_NotACode) ->
    <<"invalidErrorCode">>.