1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2018 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
-module(mongoose_cassandra_worker). |
17 |
|
-author('rafal.slota@erlang-solutions.com'). |
18 |
|
|
19 |
|
-include("mongoose_logger.hrl"). |
20 |
|
-include_lib("cqerl/include/cqerl.hrl"). |
21 |
|
|
22 |
|
%% ==================================================================== |
23 |
|
%% Exports |
24 |
|
%% ==================================================================== |
25 |
|
|
26 |
|
%% gen_server callbacks |
27 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, |
28 |
|
terminate/2, code_change/3]). |
29 |
|
|
30 |
|
%% gen_server API |
31 |
|
-export([write/5, write_async/5, read/7]). |
32 |
|
|
33 |
|
-behaviour(gen_server). |
34 |
|
|
35 |
|
%% ==================================================================== |
36 |
|
%% Internal records and definitions |
37 |
|
%% ==================================================================== |
38 |
|
|
39 |
|
%% Worker state: the requests in flight plus reverse lookups from cqerl query tags
%% and client-process monitor references to the request that owns them.
-record(state, {
          pool_name :: mongoose_cassandra:pool_name(),
          inflight = #{} :: #{request_id() => request()},
          cql_tags = #{} :: #{cql_tag() => request_id()},
          monitors = #{} :: #{cql_mref() => request_id()}
         }).

%% A batched write in progress.
-record(write_action, {
          query :: cql_query(),
          %% Rows not sent yet
          pending_rows :: mongoose_cassandra:rows(),
          %% Rows sent to Cassandra whose write confirmation is still awaited
          current_rows :: mongoose_cassandra:rows()
         }).

%% A paged read in progress.
-record(read_action, {
          query :: cql_query(),
          %% Result handle of the last page received from cqerl (used to fetch the next page)
          last_result :: cql_result() | undefined
         }).

%% A single API request; it may span several cqerl queries (batches, pages, retries).
-record(request, {
          id :: request_id(),
          action :: action(),
          opts = #{} :: options(),
          cql_mref :: cql_mref() | undefined,  %% monitor of the client running the current query
          tag :: cql_tag() | undefined,        %% cqerl tag of the current query
          from :: caller(),                    %% gen_server caller; undefined for casts
          retry_left :: non_neg_integer(),
          timeout_tref :: reference() | undefined
         }).
69 |
|
|
70 |
|
%% Options used by prepare_options/1 for every key the caller does not set.
-define(DEFAULT_OPTS, #{
          batch_size => 20,
          retry => 3,
          timeout => timer:minutes(1),
          %% cqerl options
          consistency => one,
          batch_mode => unlogged
         }).

%% Factor the batch size is multiplied by when Cassandra rejects a batch as too big
%% (see retry_info/4); 0.5 halves it.
-define(BATCH_SHRINK_RATIO, 0.5).
82 |
|
|
83 |
|
%% ==================================================================== |
84 |
|
%% Types |
85 |
|
%% ==================================================================== |
86 |
|
|
87 |
|
%% API types

-type options() :: #{
        timeout => non_neg_integer(),    %% milliseconds
        retry => non_neg_integer(),      %% how many retry attempts should be made
        batch_size => non_neg_integer(), %% rows per batch write; due to Cassandra query size
                                         %% limits this value may be shrunk on error (the
                                         %% last retry uses a batch size of 1)
        consistency => consistency_level() | consistency_level_int(),
        batch_mode => batch_mode() | batch_mode_int()
       }.

-export_type([options/0]).
99 |
|
|
100 |
|
%% Internal types

%% context_id() is used as a hash base in worker selection
-type context_id() :: term() | undefined.
-type query_str() :: binary().
-type caller() :: {pid(), term()} | undefined.

-type request_id() :: reference().
-type request() :: #request{}.
-type write_action() :: #write_action{}.
-type read_action() :: #read_action{}.
-type action() :: read_action() | write_action().

-type cql_result() :: void | #cql_result{}.
-type cql_query() :: #cql_query{}.
-type cql_mref() :: reference().
%% Normally a reference(), but let's not depend on that
-type cql_tag() :: term().

-type worker_state() :: #state{}.
-type process_type() :: cqerl_client.
-type error_type() :: {down, process_type()} | cancel | cqerl_error.
-type error_reason() ::
        {Code :: non_neg_integer(), Details :: binary(), any()} | timeout | term().
121 |
|
|
122 |
|
|
123 |
|
%%==================================================================== |
124 |
|
%% API functions |
125 |
|
%%==================================================================== |
126 |
|
|
127 |
|
|
128 |
|
%% --------------------------------------------------------
%% @doc Executes a batched write query (insert, update or delete) in Cassandra
%% and waits for the outcome.
%% Cassandra does not like big batches, so the rows are sent in batches of
%% `batch_size' rows (20 by default). When Cassandra rejects a batch because it
%% is too big, the request is retried with smaller batches.
%% @end
%% --------------------------------------------------------
-spec write(mongoose_cassandra:pool_name(), context_id(), query_str(),
            mongoose_cassandra:rows(), options()) ->
    ok | {error, Reason :: term()}.
write(PoolName, ContextId, QueryStr, Rows, Opts) ->
    Request = {write, QueryStr, Rows, prepare_options(Opts)},
    mongoose_cassandra_pool:call_query(PoolName, ContextId, Request).
142 |
|
|
143 |
|
%% --------------------------------------------------------
%% @doc Same as @see write/5, but asynchronous: the write is cast to a worker
%% and no result is delivered to the caller.
%% @end
%% --------------------------------------------------------
-spec write_async(mongoose_cassandra:pool_name(), context_id(), query_str(),
                  mongoose_cassandra:rows(), options()) ->
    ok.
write_async(PoolName, ContextId, QueryStr, Rows, Opts) ->
    Request = {write, QueryStr, Rows, prepare_options(Opts)},
    mongoose_cassandra_pool:cast_query(PoolName, ContextId, Request).
154 |
|
|
155 |
|
%% --------------------------------------------------------
%% @doc Executes a read query (select) in Cassandra.
%% This function behaves much like lists:foldl/3, but it folds over the pages of
%% the query result. Each call of `Fun' gets the list of rows of one page and the
%% current accumulator, and returns the next accumulator. The page size is set by
%% cqerl (presumably at most 100 rows by default).
%% @end
%% --------------------------------------------------------
-spec read(mongoose_cassandra:pool_name(), context_id(), query_str(),
           mongoose_cassandra:parameters(), mongoose_cassandra:fold_fun(),
           mongoose_cassandra:fold_accumulator(), options()) ->
    {ok, mongoose_cassandra:fold_accumulator()} | {error, Reason :: term()}.
read(PoolName, ContextId, QueryStr, Params, Fun, AccIn, Opts) ->
    FullOpts = prepare_options(Opts),
    do_read(PoolName, ContextId, {read, QueryStr, Params, FullOpts}, Fun, AccIn).
170 |
|
|
171 |
|
%% Folds `Fun' over the pages of a read, issuing one synchronous pool call per page:
%%  - a `{partial, Req, Page}' reply is folded, then a `{continue, Req}' call fetches
%%    the next page;
%%  - a `{finished, Page}' reply is folded and ends the fold with `{ok, Acc}';
%%  - any other reply (e.g. an error) is returned unchanged.
do_read(PoolName, ContextId, Call, Fun, AccIn) ->
    Reply = mongoose_cassandra_pool:call_query(PoolName, ContextId, Call),
    case Reply of
        {partial, Req, Page} ->
            AccNext = Fun(cqerl:all_rows(Page), AccIn),
            do_read(PoolName, ContextId, {continue, Req}, Fun, AccNext);
        {finished, Page} ->
            {ok, Fun(cqerl:all_rows(Page), AccIn)};
        Other ->
            Other
    end.
182 |
|
|
183 |
|
%%==================================================================== |
184 |
|
%% gen_server callbacks |
185 |
|
%%==================================================================== |
186 |
|
|
187 |
|
%%--------------------------------------------------------------------
%% Initiates the worker for the given pool with no requests in flight.
%% Returns {ok, State}.
%%--------------------------------------------------------------------
init([Pool]) ->
    State = #state{pool_name = Pool},
    {ok, State}.
196 |
|
|
197 |
|
%%--------------------------------------------------------------------
%% Handles call messages. Nothing is replied from here. Writes, reads and
%% continuations are answered later through gen_server:reply/2, and unexpected
%% calls are logged and never answered.
%%  - {write, ...} / {read, ...} start a new request with its timeout armed;
%%  - {continue, Req} fetches the next page of a paged read for the new caller.
%%--------------------------------------------------------------------
handle_call({write, QueryStr, Rows, Opts}, From, #state{} = State) ->
    start_request(new_write_action(QueryStr, Rows), From, Opts, State);
handle_call({read, QueryStr, Params, Opts}, From, #state{} = State) ->
    start_request(new_read_action(QueryStr, Params), From, Opts, State);
handle_call({continue, Req}, From, #state{} = State) ->
    ReqId = Req#request.id,
    Resumed = Req#request{from = From},
    {noreply, process_request(ReqId, update_req(Resumed, State))};
handle_call(Msg, From, State) ->
    ?UNEXPECTED_CALL(Msg, From),
    {noreply, State}.

%% Registers a new request for `Action' on behalf of `From', arms its timeout
%% and sends its first query.
start_request(Action, From, Opts, State) ->
    Request = schedule_timeout(maps:get(timeout, Opts), new_request(From, Opts, Action)),
    {noreply, process_request(Request#request.id, update_req(Request, State))}.
230 |
|
|
231 |
|
|
232 |
|
%%--------------------------------------------------------------------
%% Handles cast messages. An asynchronous write has no caller to answer and,
%% unlike the synchronous one, no timeout is scheduled for it.
%%--------------------------------------------------------------------
handle_cast({write, QueryStr, Rows, Opts}, #state{} = State) ->
    Req = new_request(undefined, Opts, new_write_action(QueryStr, Rows)),
    {noreply, process_request(Req#request.id, update_req(Req, State))};
handle_cast(Msg, State) ->
    ?UNEXPECTED_CAST(Msg),
    {noreply, State}.
249 |
|
|
250 |
|
%%--------------------------------------------------------------------
%% Handles all non call/cast messages:
%%  - request cancellations (e.g. the scheduled timeout),
%%  - cqerl results and errors,
%%  - 'DOWN' messages of monitored cqerl clients,
%%  - scheduled retries, which use up one retry of the request.
%%--------------------------------------------------------------------
handle_info({cancel, ReqId, Reason}, #state{} = St) ->
    {noreply, handle_cancel(ReqId, Reason, St)};
handle_info({result, _, _} = Result, #state{} = St) ->
    {noreply, handle_result(Result, St)};
handle_info({error, _, _} = Error, #state{} = St) ->
    {noreply, handle_error(Error, St)};
handle_info({'DOWN', _MRef, _, _, _} = Down, #state{} = St) ->
    {noreply, handle_down(Down, St)};
handle_info({retry, ReqId}, #state{inflight = Inflight} = St) ->
    case Inflight of
        #{ReqId := #request{retry_left = TriesLeft} = Req} ->
            Retried = Req#request{retry_left = max(TriesLeft - 1, 0)},
            {noreply, process_request(ReqId, update_req(Retried, St))};
        _ ->
            ?LOG_WARNING(#{what => cassandra_unexpected_retry_request,
                           pool => St#state.pool_name,
                           state => St, request_id => ReqId}),
            {noreply, St}
    end;
handle_info(Msg, State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State}.
285 |
|
|
286 |
|
%%--------------------------------------------------------------------
%% Called by gen_server right before the worker terminates. Nothing needs
%% releasing here; the return value is ignored by gen_server.
%%--------------------------------------------------------------------
terminate(_Why, _St) ->
    ok.
295 |
|
|
296 |
|
%%--------------------------------------------------------------------
%% Converts the worker state on code change; the state is kept unchanged.
%%--------------------------------------------------------------------
code_change(_OldVsn, St, _Extra) ->
    {ok, St}.
302 |
|
|
303 |
|
%% ==================================================================== |
304 |
|
%% Implementation |
305 |
|
%% ==================================================================== |
306 |
|
|
307 |
|
%% Gets a DBMS client and sends the next Cassandra query for the given request.
%%
%% The cqerl tag and monitor reference of the request's previous query (if any)
%% are dropped from the lookup maps first. By the time this function runs again,
%% that query has already been answered or has failed. Keeping its entries would
%% grow `cql_tags' and `monitors' with every batch, page and retry, because
%% cleanup_request/2 only removes the request's latest tag and monitor.
%%
%% If no client can be obtained or sending fails, the failure is handled like a
%% client crash by do_handle_error/4, which may schedule a retry.
-spec process_request(request_id(), worker_state()) -> worker_state().
process_request(ReqId, #state{pool_name = PoolName} = State0) ->
    Req = maps:get(ReqId, State0#state.inflight),
    #request{action = Action, opts = Opts, tag = OldTag, cql_mref = OldMRef} = Req,
    State = State0#state{
              cql_tags = maps:remove(OldTag, State0#state.cql_tags),
              monitors = maps:remove(OldMRef, State0#state.monitors)
             },

    try
        Client = get_client(PoolName),
        {MRef, Tag, NextAction} = monitor_and_send(Client, Action, Opts),

        NextReq = Req#request{
                    action = NextAction,
                    tag = Tag,
                    cql_mref = MRef
                   },
        State#state{
          inflight = maps:put(ReqId, NextReq, State#state.inflight),
          cql_tags = maps:put(Tag, ReqId, State#state.cql_tags),
          monitors = maps:put(MRef, ReqId, State#state.monitors)
         }
    catch
        _:Reason ->
            %% Logging will be handled in do_handle_error/4
            do_handle_error({down, cqerl_client}, Reason, Req, State)
    end.

%% Monitors the client process and sends the next query through it.
%% Returns {MonitorRef, CqlTag, NextAction}. If sending fails, the monitor is
%% removed again (flushing any 'DOWN' already delivered) before the exception is
%% re-raised, so it does not outlive the failed attempt.
monitor_and_send({ClientPid, _} = Client, Action, Opts) ->
    MRef = erlang:monitor(process, ClientPid),
    try
        {Tag, NextAction} = send_next_request(Client, Action, Opts),
        {MRef, Tag, NextAction}
    catch
        Class:Reason:Stacktrace ->
            erlang:demonitor(MRef, [flush]),
            erlang:raise(Class, Reason, Stacktrace)
    end.
333 |
|
|
334 |
|
%% Sends the next cqerl query for the given action() and returns its cqerl tag
%% together with the updated action.
%%
%% Write action: rows that were sent last time but never confirmed
%% (`current_rows', still set after an error) are resent ahead of the pending
%% rows. At most `batch_size' rows go into one batch query. The batch size is
%% clamped to at least 1. Otherwise a `batch_size' of 0, which options() permits,
%% would keep sending empty batches without ever consuming a pending row.
%%
%% Read action: the first call sends the query itself; later calls fetch the
%% next page of the last result.
-spec send_next_request(cqerl:client(), action(), options()) -> {cql_tag(), action()}.
send_next_request(Client, #write_action{} = Action, Opts) ->
    #write_action{
       current_rows = FailedRows,
       pending_rows = PendingRows,
       query = Query
      } = Action,
    Rows = FailedRows ++ PendingRows,
    BatchSize = max(1, maps:get(batch_size, Opts)),

    {NewRows, Tail} = lists:split(min(BatchSize, length(Rows)), Rows),
    QueryBatch = #cql_query_batch{
                    mode = maps:get(batch_mode, Opts),
                    consistency = maps:get(consistency, Opts),
                    queries = [Query#cql_query{values = NewRow} || NewRow <- NewRows]
                   },
    Tag = cqerl:send_query(Client, QueryBatch),

    NextAction = Action#write_action{current_rows = NewRows, pending_rows = Tail},
    {Tag, NextAction};
send_next_request(Client, #read_action{} = Action, _Opts) ->
    #read_action{query = Query} = Action,

    Tag =
        case Action#read_action.last_result of
            undefined ->
                cqerl:send_query(Client, Query);
            Result ->
                cqerl:fetch_more_async(Result)
        end,

    {Tag, Action}.
367 |
|
|
368 |
|
%% Handles query cancel requests (normally the timeout armed by schedule_timeout/2).
%% For a known request: logs the abort, replies `{error, Reason}' to the request's
%% caller (if it has one, see maybe_reply/2) and drops all state kept for it.
%% Cancel requests for unknown (e.g. already finished) requests are ignored.
-spec handle_cancel(request_id(), Reason :: term(), worker_state()) -> worker_state().
handle_cancel(ReqId, Reason, State) ->
    case maps:get(ReqId, State#state.inflight, undefined) of
        undefined ->
            State;
        #request{tag = Tag} = Req ->
            ?LOG_WARNING(#{what => cassandra_aborting_request, reason => Reason,
                           pool => State#state.pool_name,
                           tag => Tag, error_text => error_text(cancel, Reason, Req),
                           query_type => query_type(Req)}),
            %% Without a reply, a synchronous caller would stay blocked until the call
            %% itself times out (if it ever does), instead of getting the
            %% {error, Reason} result that write/5 and read/7 specify.
            maybe_reply(Req, {error, Reason}),
            cleanup_request(ReqId, State)
    end.
381 |
|
|
382 |
|
%% Handles a 'result' message from cqerl. Looks up the request the query tag
%% belongs to, drops the monitor of the client that ran the query and continues
%% with do_handle_result/3. Results with an unknown tag are logged and ignored.
-spec handle_result({result, cql_tag(), cql_result()}, worker_state()) -> worker_state().
handle_result({result, Tag, Result} = R, #state{cql_tags = Tags} = State) ->
    case Tags of
        #{Tag := ReqId} ->
            #request{cql_mref = MRef} = Req = maps:get(ReqId, State#state.inflight),
            erlang:demonitor(MRef, [flush]),
            do_handle_result(Result, Req, State);
        _ ->
            ?LOG_WARNING(#{what => cassandra_unexpected_result,
                           pool => State#state.pool_name,
                           result => R}),
            State
    end.
395 |
|
|
396 |
|
%% Handles all 'result' responses from the DBMS. Depending on the state of the
%% request, it either sends the next query or returns the response to the caller.
%%  - unfinished write: confirmed rows are dropped and the next batch is sent;
%%  - unfinished read: the page is replied as {partial, NextReq, Result};
%%  - finished write / read: `ok' / {finished, Result} is replied and the
%%    request is cleaned up.
-spec do_handle_result(cql_result(), request(), worker_state()) -> worker_state().
do_handle_result(Result, Req, State) ->
    #request{action = Action, id = ReqId} = Req,

    IsFinished = is_finished(Req, Result),
    case {IsFinished, Action} of
        %% Continue with unfinished queries
        {false, #write_action{} = A} ->
            NextAction = A#write_action{current_rows = []}, %% Remove just saved rows
            NextReq = Req#request{action = NextAction},
            NextState = update_req(NextReq, State),

            process_request(ReqId, NextState);
        {false, #read_action{} = A} ->
            NextAction = A#read_action{last_result = Result},
            %% The caller gets this page now and asks for the next one with a new
            %% {continue, NextReq} call, which attaches that call's caller again.
            %% Until then nobody is waiting, so the stored request must not keep the
            %% already-answered caller: any later reply on it would be misdirected.
            NextReq = Req#request{action = NextAction, from = undefined},
            NextState = update_req(NextReq, State),
            maybe_reply(Req, {partial, NextReq, Result}),
            NextState;

        %% Reply and cleanup for finished queries
        {true, #write_action{}} ->
            maybe_reply(Req, ok),
            cleanup_request(ReqId, State);
        {true, #read_action{}} ->
            maybe_reply(Req, {finished, Result}),
            cleanup_request(ReqId, State)
    end.
428 |
|
|
429 |
|
%% Handles an 'error' message from cqerl. For a known query tag, it drops the
%% monitor of the client that ran the query and lets do_handle_error/4 decide
%% between retry and abort. Errors with an unknown tag are logged and ignored.
-spec handle_error({error, cql_tag(), Reason :: term()}, worker_state()) -> worker_state().
handle_error({error, Tag, Reason} = Error, #state{cql_tags = Tags} = State) ->
    case Tags of
        #{Tag := ReqId} ->
            #request{cql_mref = MRef} = Req = maps:get(ReqId, State#state.inflight),
            erlang:demonitor(MRef, [flush]),
            do_handle_error(cqerl_error, Reason, Req, State);
        _ ->
            ?LOG_WARNING(#{what => cassandra_unexpected_error, error => Error,
                           pool => State#state.pool_name,
                           tag => Tag, reason => Reason}),
            State
    end.
442 |
|
|
443 |
|
%% Handles the 'DOWN' message of a monitored cqerl client. If the monitor belongs
%% to a request in flight, the crash is treated as an error of that request's
%% current query (asserting that the request still holds this monitor). Other
%% 'DOWN' messages are ignored.
-spec handle_down({'DOWN', reference(), process, pid(), Info :: term()},
                  worker_state()) -> worker_state().
handle_down({'DOWN', MRef, _, _Pid, Info}, #state{monitors = Monitors} = State) ->
    case Monitors of
        #{MRef := ReqId} ->
            #request{cql_mref = MRef} = Req = maps:get(ReqId, State#state.inflight),
            do_handle_error({down, cqerl_client}, Info, Req, State);
        _ ->
            State
    end.
453 |
|
|
454 |
|
%% Decides via retry_info/4 what to do with a failed query and logs the decision.
%%  - retry: a {retry, ReqId} message is scheduled after the chosen delay;
%%  - abort: the caller (if any) gets {error, Reason} and the request is cleaned up.
-spec do_handle_error(error_type(), error_reason(), request(), worker_state()) -> worker_state().
do_handle_error(Type, Reason, Req, State) ->
    #request{tag = Tag, id = ReqId, opts = Opts, retry_left = RetryLeft} = Req,
    Pool = State#state.pool_name,

    Decision = retry_info(Type, Reason, Req, State),
    case Decision of
        {retry, WaitFor, NextState} ->
            ?LOG_WARNING(#{what => cassandra_retrying_query, tag => Tag,
                           pool => Pool,
                           query_type => query_type(Req), request_opts => Opts,
                           error_text => error_text(Type, Reason, Req),
                           retry_left => RetryLeft}),
            schedule_retry(ReqId, WaitFor, NextState);
        {abort, AbortReason, NextState} ->
            ?LOG_WARNING(#{what => cassandra_aborting_query, tag => Tag,
                           pool => Pool,
                           query_type => query_type(Req), reason => AbortReason,
                           error_text => error_text(Type, Reason, Req)}),
            maybe_reply(Req, {error, Reason}),
            cleanup_request(Req, NextState)
    end.
473 |
|
|
474 |
|
%% Sends the given reply if the caller is known; requests started by a cast have
%% no caller and get no reply.
-spec maybe_reply(request(), Result :: ok | {ok, term()} |
                  {error, term()} | {finished, term()} | {partial, request(), term()}) -> any().
maybe_reply(#request{from = From}, Result) ->
    case From of
        undefined -> ok;
        _ -> gen_server:reply(From, Result)
    end.
481 |
|
|
482 |
|
%% Removes all data, monitors and timers associated with the request.
%% The request may not have a query monitor or a timeout timer (the field is
%% `undefined' then, e.g. before the first query or for casts), so each one is
%% only released when set. Nothing else is silently swallowed.
-spec cleanup_request(request() | request_id(), worker_state()) -> worker_state().
cleanup_request(#request{id = ReqId}, State) ->
    cleanup_request(ReqId, State);
cleanup_request(ReqId, State) ->
    #state{inflight = Inflight, monitors = Monitors, cql_tags = Tags} = State,
    #request{tag = Tag, cql_mref = CqlMRef, timeout_tref = TRef} = maps:get(ReqId, Inflight),
    demonitor_if_set(CqlMRef),
    cancel_timer_if_set(TRef),

    State#state{inflight = maps:remove(ReqId, Inflight),
                monitors = maps:remove(CqlMRef, Monitors),
                cql_tags = maps:remove(Tag, Tags)}.

%% Drops a query monitor, including a 'DOWN' message already delivered for it.
-spec demonitor_if_set(reference() | undefined) -> ok.
demonitor_if_set(MRef) when is_reference(MRef) ->
    erlang:demonitor(MRef, [flush]),
    ok;
demonitor_if_set(undefined) ->
    ok.

%% Cancels a request timeout timer, if one was armed.
-spec cancel_timer_if_set(reference() | undefined) -> ok.
cancel_timer_if_set(TRef) when is_reference(TRef) ->
    _ = erlang:cancel_timer(TRef),
    ok;
cancel_timer_if_set(undefined) ->
    ok.
497 |
|
|
498 |
|
%% Checks whether the request is complete, i.e. needs no more queries. A write is
%% complete when no rows are pending; a read when cqerl reports no more pages.
-spec is_finished(request(), cql_result()) -> boolean().
is_finished(#request{action = Action}, Result) ->
    case Action of
        #write_action{pending_rows = Pending} -> Pending =:= [];
        #read_action{} -> not cqerl:has_more_pages(Result)
    end.
506 |
|
|
507 |
|
%% For a given query error, returns whether the request should be retried and, if
%% so, after how many milliseconds. Clauses are tried in order, so a request with
%% no retries left is always aborted.
-spec retry_info(error_type(), error_reason(), request(), worker_state()) ->
    {abort, AbortReason :: term(), worker_state()} |
    {retry, Timeout :: non_neg_integer(), worker_state()}.
retry_info(_Type, _Reason, #request{retry_left = 0}, State) ->
    {abort, retry_limit_exceeded, State};
retry_info({down, cqerl_client}, _Reason, _Req, State) ->
    {retry, 5 + rand:uniform(20), State};
retry_info(cqerl_error, {Code, _, _}, _Req, State)
  when Code =:= 16#1100; Code =:= 16#1200 -> %% writeTimeout / readTimeout
    {retry, 10 + rand:uniform(50), State};
retry_info(cqerl_error, {16#2200, _, _}, Req, State) ->
    %% Batch too big: retry right away with a smaller batch
    #request{id = ReqId, opts = Opts, retry_left = RetryLeft} = Req,
    Shrunk = shrink_batch_size(maps:get(batch_size, Opts), RetryLeft),
    NextReq = Req#request{opts = Opts#{batch_size => Shrunk}},
    {retry, 0, State#state{inflight = maps:put(ReqId, NextReq, State#state.inflight)}};
retry_info(cqerl_error, {connection_closed, _}, _Req, State) ->
    {retry, 5 + rand:uniform(20), State};
retry_info(_Type, _Reason, _Req, State) ->
    {abort, unknown_error, State}.

%% New batch size after a "too big" rejection. The last try uses a batch size of
%% 1; otherwise the size shrinks by ?BATCH_SHRINK_RATIO, never below 1.
-spec shrink_batch_size(number(), non_neg_integer()) -> pos_integer().
shrink_batch_size(_BatchSize, 1) ->
    1;
shrink_batch_size(BatchSize, _RetryLeft) ->
    max(1, round(BatchSize * ?BATCH_SHRINK_RATIO)).
538 |
|
|
539 |
|
%% Returns the log message text for a given query error: cancellations, crashes
%% of a process the request depended on, or errors reported by cqerl.
error_text(cancel, Reason, _Request) ->
    io_lib:format("status=error, category=~s, details=~p", [cancel, Reason]);
error_text({down, ProcessType}, Reason, Request) ->
    Args = [process_down, Reason, ProcessType, Request#request.cql_mref],
    io_lib:format("status=error, category=~s, details=~p, process_type=~p, mref=~p", Args);
error_text(cqerl_error, Reason, _Request) ->
    cql_error_text(Reason).
549 |
|
|
550 |
|
%% Returns the log message text for a cqerl-specific query error.
%% Only integer error codes are rendered by name and in hex, and only atom or
%% binary categories are printed with ~s. Any other shape goes to the generic
%% clause. Otherwise integer_to_list/2 or io_lib:format/2 would raise badarg and
%% crash the worker while it is merely logging an error.
cql_error_text({ErrorCode, ErrorText, _}) when is_integer(ErrorCode), is_binary(ErrorText) ->
    CodeText = response_code_to_binary(ErrorCode),
    HexErrorCode = "0x" ++ integer_to_list(ErrorCode, 16),
    io_lib:format("status=error, category=~s, details=~s, code=~p",
                  [CodeText, ErrorText, HexErrorCode]);
cql_error_text({ErrorCode, _, ErrorData}) when is_integer(ErrorCode) ->
    cql_error_text({ErrorCode, <<"undefined">>, ErrorData});
cql_error_text({Category, Reason}) when is_atom(Category); is_binary(Category) ->
    io_lib:format("status=error, category=~s, details=~p",
                  [Category, Reason]);
cql_error_text(UnknownReason) ->
    io_lib:format("status=error, category=~s, details=~p",
                  [unknown, UnknownReason]).
564 |
|
|
565 |
|
|
566 |
|
%% ==================================================================== |
567 |
|
%% Helpers |
568 |
|
%% ==================================================================== |
569 |
|
|
570 |
|
%% Returns a live cqerl client for the pool (see get_client_loop/2), starting the
%% attempt count at 0.
-spec get_client(mongoose_cassandra:pool_name()) ->
    cqerl:client() | no_return().
get_client(PoolName) ->
    FirstAttempt = 0,
    get_client_loop(PoolName, FirstAttempt).
574 |
|
|
575 |
|
%% Tries to obtain a live cqerl client for the pool. After a short random pause
%% (1-10 ms) it tries again whenever cqerl raises, replies with anything but
%% {ok, Client}, or returns a client whose process is already dead. It gives up
%% with an error after 500 failed attempts.
-spec get_client_loop(mongoose_cassandra:pool_name(), non_neg_integer()) ->
    cqerl:client() | no_return().
get_client_loop(PoolName, RetryNo) when RetryNo >= 500 ->
    ?LOG_WARNING(#{what => cassandra_get_client_failed,
                   pool => PoolName}),
    error(cannot_get_cqerl_client, [PoolName, RetryNo]);
get_client_loop(PoolName, RetryNo) ->
    %% All checks run inside the protected part of the `try'. Exceptions raised in
    %% a `try ... of' section are not caught by that try's own `catch' clauses, so
    %% the dead-process throw and unexpected replies would otherwise skip the retry.
    try
        get_live_client(PoolName)
    catch
        Class:Reason:Stacktrace ->
            ?LOG_INFO(#{what => cassandra_get_client_retry,
                        class => Class, reason => Reason, stacktrace => Stacktrace,
                        pool => PoolName, retry_number => RetryNo}),

            Wait = rand:uniform(10),
            timer:sleep(Wait),
            get_client_loop(PoolName, RetryNo + 1)
    end.

%% Asks cqerl for a client of the pool and checks that its process is alive.
%% Raises badmatch for a reply other than {ok, Client} and throws {dead, Pid}
%% for a dead client process.
-spec get_live_client(mongoose_cassandra:pool_name()) -> cqerl:client() | no_return().
get_live_client(PoolName) ->
    {ok, {Pid, _Ref} = Client} = cqerl:get_client(PoolName),
    case is_process_alive(Pid) of
        true -> Client;
        false -> throw({dead, Pid})
    end.
598 |
|
|
599 |
|
%% Makes this worker receive `{retry, ReqId}' after `Time' milliseconds; the state
%% is returned unchanged.
-spec schedule_retry(request_id(), non_neg_integer(), worker_state()) -> worker_state().
schedule_retry(ReqId, Time, State) ->
    _TRef = erlang:send_after(Time, self(), {retry, ReqId}),
    State.
603 |
|
|
604 |
|
%% Completes the caller's options with ?DEFAULT_OPTS; values given by the caller
%% take precedence over the defaults.
-spec prepare_options(options()) -> options().
prepare_options(UserOpts) ->
    Defaults = ?DEFAULT_OPTS,
    maps:merge(Defaults, UserOpts).
607 |
|
|
608 |
|
%% Creates a request with a fresh unique id. Its retry budget is taken from the
%% `retry' option.
-spec new_request(From :: caller(), options(), action()) -> request().
new_request(From, Opts, Action) ->
    Id = erlang:make_ref(),
    Retries = maps:get(retry, Opts),
    #request{id = Id, action = Action, opts = Opts, from = From, retry_left = Retries}.
619 |
|
|
620 |
|
%% Creates a write action with all rows still pending and none sent yet.
-spec new_write_action(query_str(), mongoose_cassandra:rows()) -> write_action().
new_write_action(QueryStr, Rows) ->
    Query = #cql_query{statement = QueryStr},
    #write_action{query = Query, pending_rows = Rows, current_rows = []}.
627 |
|
|
628 |
|
%% Creates a read action for the statement bound to the given parameters; no
%% page has been received yet.
-spec new_read_action(query_str(), mongoose_cassandra:parameters()) -> read_action().
new_read_action(QueryStr, Params) ->
    Query = #cql_query{statement = QueryStr, values = Params},
    #read_action{query = Query}.
633 |
|
|
634 |
|
%% Arms the request timeout: after `Timeout' milliseconds this worker receives
%% `{cancel, ReqId, timeout}'. The timer reference is stored in the request so
%% that it can be cancelled once the request is cleaned up.
schedule_timeout(Timeout, #request{id = ReqId} = Req) ->
    Msg = {cancel, ReqId, timeout},
    Req#request{timeout_tref = erlang:send_after(Timeout, self(), Msg)}.
637 |
|
|
638 |
|
%% Stores (inserts or replaces) the request in the in-flight map under its id.
update_req(#request{id = ReqId} = Req, #state{inflight = Inflight} = State) ->
    NewInflight = Inflight#{ReqId => Req},
    State#state{inflight = NewInflight}.
640 |
|
|
641 |
|
%% Returns `write' or `read', depending on the request's action (used in logs).
query_type(#request{action = Action}) ->
    case Action of
        #write_action{} -> write;
        #read_action{} -> read
    end.
645 |
|
|
646 |
|
%% Maps a Cassandra error code to a readable name for log messages. Unknown
%% integer codes are rendered in hex; non-integers are reported as invalid.
%% Source: https://stackoverflow.com/questions/48304330/list-of-cassandra-error-codes
response_code_to_binary(16#0000) -> <<"serverError">>;
response_code_to_binary(16#000A) -> <<"protocolError">>;
response_code_to_binary(16#0100) -> <<"badCredentials">>;
response_code_to_binary(16#1000) -> <<"unavailableException">>;
response_code_to_binary(16#1001) -> <<"overloaded">>;
response_code_to_binary(16#1002) -> <<"isBootstrapping">>;
response_code_to_binary(16#1003) -> <<"truncateError">>;
response_code_to_binary(16#1100) -> <<"writeTimeout">>;
response_code_to_binary(16#1200) -> <<"readTimeout">>;
response_code_to_binary(16#1300) -> <<"readFailure">>;
response_code_to_binary(16#1400) -> <<"functionFailure">>;
response_code_to_binary(16#1500) -> <<"writeFailure">>;
response_code_to_binary(16#2000) -> <<"syntaxError">>;
response_code_to_binary(16#2100) -> <<"unauthorized">>;
response_code_to_binary(16#2200) -> <<"invalid">>;
response_code_to_binary(16#2300) -> <<"configError">>;
response_code_to_binary(16#2400) -> <<"alreadyExists">>;
response_code_to_binary(16#2500) -> <<"unprepared">>;
response_code_to_binary(Code) when is_integer(Code) ->
    <<"unknownCode=0x", (integer_to_binary(Code, 16))/bytes>>;
response_code_to_binary(_) ->
    <<"invalidErrorCode">>.