./ct_report/coverage/mongoose_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mongoose_rdbms.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Serve RDBMS connection
5 %%% Created : 8 Dec 2004 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%% Copyright 2016 Erlang Solutions Ltd.
10 %%%
11 %%% This program is free software; you can redistribute it and/or
12 %%% modify it under the terms of the GNU General Public License as
13 %%% published by the Free Software Foundation; either version 2 of the
14 %%% License, or (at your option) any later version.
15 %%%
16 %%% This program is distributed in the hope that it will be useful,
17 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
18 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 %%% General Public License for more details.
20 %%%
21 %%% You should have received a copy of the GNU General Public License
22 %%% along with this program; if not, write to the Free Software
23 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 %%%
25 %%%----------------------------------------------------------------------
26
27 -module(mongoose_rdbms).
28 -author('alexey@process-one.net').
29 -author('konrad.zemek@erlang-solutions.com').
30
31 -behaviour(gen_server).
32
33 %% Part of SQL query string, produced by use_escaped/1 function
34 -type sql_query_part() :: iodata().
35 -type sql_query() :: iodata().
36
37 %% Blob data type to be used inside SQL queries
38 -opaque escaped_binary() :: {escaped_binary, sql_query_part()}.
39 %% Unicode string to be used inside SQL queries
40 -opaque escaped_string() :: {escaped_string, sql_query_part()}.
41 %% Unicode string to be used inside LIKE conditions
42 -opaque escaped_like() :: {escaped_like, sql_query_part()}.
43 -opaque escaped_integer() :: {escaped_integer, sql_query_part()}.
44 -opaque escaped_boolean() :: {escaped_boolean, sql_query_part()}.
45 -opaque escaped_null() :: {escaped_null, sql_query_part()}.
46 -opaque escaped_value() :: escaped_string() | escaped_binary() | escaped_integer() |
47 escaped_boolean() | escaped_null().
48
49 -export_type([escaped_binary/0,
50 escaped_string/0,
51 escaped_like/0,
52 escaped_integer/0,
53 escaped_boolean/0,
54 escaped_null/0,
55 escaped_value/0,
56 sql_query/0,
57 sql_query_part/0]).
58
59 %% External exports
60 -export([prepare/4,
61 prepared/1,
62 execute/3,
63 execute_cast/3,
64 execute_successfully/3,
65 sql_query/2,
66 sql_query_cast/2,
67 sql_query_t/1,
68 sql_transaction/2,
69 transaction_with_delayed_retry/3,
70 sql_dirty/2,
71 to_bool/1,
72 db_engine/1,
73 db_type/0,
74 print_state/1,
75 use_escaped/1]).
76
77 %% Unicode escaping
78 -export([escape_string/1,
79 use_escaped_string/1]).
80
81 %% Integer escaping
82 -export([escape_integer/1,
83 use_escaped_integer/1]).
84
85 %% Boolean escaping
86 -export([escape_boolean/1,
87 use_escaped_boolean/1]).
88
89 %% LIKE escaping
90 -export([escape_like/1,
91 escape_prepared_like/1,
92 escape_like_prefix/1,
93 use_escaped_like/1]).
94
95 %% BLOB escaping
96 -export([escape_binary/2,
97 unescape_binary/2,
98 use_escaped_binary/1]).
99
100 %% Null escaping
101 %% (to keep uniform pattern of passing values)
102 -export([escape_null/0,
103 use_escaped_null/1]).
104
105 %% count / integra types decoding
106 -export([result_to_integer/1,
107 selected_to_integer/1]).
108
109 -export([character_to_integer/1]).
110
111 %% gen_server callbacks
112 -export([init/1,
113 handle_call/3,
114 handle_cast/2,
115 handle_info/2,
116 terminate/2,
117 code_change/3]).
118
119 -ignore_xref([behaviour_info/1, print_state/1,
120 sql_query_cast/2, execute_cast/3,
121 sql_query_t/1, use_escaped/1,
122 escape_like/1, escape_like_prefix/1, use_escaped_like/1,
123 escape_binary/2, use_escaped_binary/1,
124 escape_integer/1, use_escaped_integer/1,
125 escape_string/1, use_escaped_string/1,
126 escape_boolean/1, use_escaped_boolean/1,
127 escape_null/0, use_escaped_null/1
128 ]).
129
130 %% internal usage
131 -export([get_db_info/1]).
132
133 -include("mongoose.hrl").
134
135 -record(state, {db_ref,
136 prepared = #{} :: #{binary() => term()},
137 keepalive_interval :: undefined | pos_integer()
138 }).
139 -type state() :: #state{}.
140
141 -define(STATE_KEY, mongoose_rdbms_state).
142 -define(MAX_TRANSACTION_RESTARTS, 10).
143 -define(TRANSACTION_TIMEOUT, 60000). % milliseconds
144 -define(KEEPALIVE_QUERY, <<"SELECT 1;">>).
145 -define(QUERY_TIMEOUT, 5000).
146 %% The value is arbitrary; supervisor will restart the connection once
147 %% the retry counter runs out. We just attempt to reduce log pollution.
148 -define(CONNECT_RETRIES, 5).
149
150 -type server() :: mongooseim:host_type() | global.
151 -type rdbms_msg() :: {sql_query, _}
152 | {sql_transaction, fun()}
153 | {sql_dirty, fun()}
154 | {sql_execute, atom(), [binary() | boolean() | integer()]}.
155 -type single_query_result() :: {selected, [tuple()]} |
156 {updated, non_neg_integer() | undefined} |
157 {updated, non_neg_integer(), [tuple()]} |
158 {aborted, Reason :: term()} |
159 {error, Reason :: string() | duplicate_key}.
160 -type query_result() :: single_query_result() | [single_query_result()].
161 -type transaction_result() :: {aborted, _} | {atomic, _} | {error, _}.
162 -type dirty_result() :: {ok, any()} | {error, any()}.
163 -export_type([query_result/0,
164 server/0]).
165
166 %%%----------------------------------------------------------------------
167 %%% API
168 %%%----------------------------------------------------------------------
169
170 -spec prepare(Name, Table :: binary() | atom(), Fields :: [binary() | atom()],
171 Statement :: iodata()) ->
172 {ok, Name} | {error, already_exists}
173 when Name :: atom().
174 prepare(Name, Table, Fields, Statement) when is_atom(Table) ->
175
:-(
prepare(Name, atom_to_binary(Table, utf8), Fields, Statement);
176 prepare(Name, Table, [Field | _] = Fields, Statement) when is_atom(Field) ->
177
:-(
prepare(Name, Table, [atom_to_binary(F, utf8) || F <- Fields], Statement);
178 prepare(Name, Table, Fields, Statement) when is_atom(Name), is_binary(Table) ->
179
:-(
true = lists:all(fun is_binary/1, Fields),
180
:-(
Tuple = {Name, Table, Fields, iolist_to_binary(Statement)},
181
:-(
case ets:insert_new(prepared_statements, Tuple) of
182
:-(
true -> {ok, Name};
183
:-(
false -> {error, already_exists}
184 end.
185
186 -spec prepared(atom()) -> boolean().
187 prepared(Name) ->
188
:-(
ets:member(prepared_statements, Name).
189
190 -spec execute(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
191 query_result().
192 execute(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
193
:-(
sql_call(HostType, {sql_execute, Name, Parameters}).
194
195 -spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
196 query_result().
197 execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
198
:-(
sql_cast(HostType, {sql_execute, Name, Parameters}).
199
200 %% Same as execute/3, but would fail loudly on any error.
201 -spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
202 query_result().
203 execute_successfully(HostType, Name, Parameters) ->
204
:-(
try execute(HostType, Name, Parameters) of
205 {selected, _} = Result ->
206
:-(
Result;
207 {updated, _} = Result ->
208
:-(
Result;
209 Other ->
210
:-(
Log = #{what => sql_execute_failed, host => HostType, statement_name => Name,
211 statement_query => query_name_to_string(Name),
212 statement_params => Parameters, reason => Other},
213
:-(
?LOG_ERROR(Log),
214
:-(
error(Log)
215 catch error:Reason:Stacktrace ->
216
:-(
Log = #{what => sql_execute_failed, host => HostType, statement_name => Name,
217 statement_query => query_name_to_string(Name),
218 statement_params => Parameters,
219 reason => Reason, stacktrace => Stacktrace},
220
:-(
?LOG_ERROR(Log),
221
:-(
erlang:raise(error, Reason, Stacktrace)
222 end.
223
224 query_name_to_string(Name) ->
225
:-(
case ets:lookup(prepared_statements, Name) of
226 [] ->
227
:-(
not_found;
228 [{_, _Table, _Fields, Statement}] ->
229
:-(
Statement
230 end.
231
232 -spec sql_query(HostType :: server(), Query :: any()) -> query_result().
233 sql_query(HostType, Query) ->
234
:-(
sql_call(HostType, {sql_query, Query}).
235
236 -spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result().
237 sql_query_cast(HostType, Query) ->
238
:-(
sql_cast(HostType, {sql_query, Query}).
239
240 %% @doc SQL transaction based on a list of queries
241 -spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result().
242 sql_transaction(HostType, Queries) when is_list(Queries) ->
243
:-(
F = fun() -> lists:map(fun sql_query_t/1, Queries) end,
244
:-(
sql_transaction(HostType, F);
245 %% SQL transaction, based on a erlang anonymous function (F = fun)
246 sql_transaction(HostType, F) when is_function(F) ->
247 224 sql_call(HostType, {sql_transaction, F}).
248
249 %% This function allows to specify delay between retries.
250 -spec transaction_with_delayed_retry(server(), fun() | maybe_improper_list(), map()) -> transaction_result().
251 transaction_with_delayed_retry(HostType, F, Info) ->
252
:-(
Retries = maps:get(retries, Info),
253
:-(
Delay = maps:get(delay, Info),
254
:-(
do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info).
255
256 do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) ->
257
:-(
Result = mongoose_rdbms:sql_transaction(HostType, F),
258
:-(
case Result of
259 {atomic, _} ->
260
:-(
Result;
261 {aborted, Reason} when Retries > 0 ->
262
:-(
?LOG_WARNING(Info#{what => rdbms_transaction_aborted,
263 text => <<"Transaction aborted. Restart">>,
264
:-(
reason => Reason, retries_left => Retries}),
265
:-(
timer:sleep(Delay),
266
:-(
do_transaction_with_delayed_retry(HostType, F, Retries - 1, Delay, Info);
267 _ ->
268
:-(
Err = Info#{what => mam_transaction_failed,
269 text => <<"Transaction failed. Do not restart">>,
270 reason => Result},
271
:-(
?LOG_ERROR(Err),
272
:-(
erlang:error(Err)
273 end.
274
275 -spec sql_dirty(server(), fun()) -> any() | no_return().
276 sql_dirty(HostType, F) when is_function(F) ->
277
:-(
case sql_call(HostType, {sql_dirty, F}) of
278
:-(
{ok, Result} -> Result;
279
:-(
{error, Error} -> throw(Error)
280 end.
281
282 %% TODO: Better spec for RPC calls
283 -spec sql_call(HostType :: server(), Msg :: rdbms_msg()) -> any().
284 sql_call(HostType, Msg) ->
285 224 case get_state() of
286 224 undefined -> sql_call0(HostType, Msg);
287 State ->
288
:-(
{Res, NewState} = nested_op(Msg, State),
289
:-(
put_state(NewState),
290
:-(
Res
291 end.
292
293 -spec sql_call0(HostType :: server(), Msg :: rdbms_msg()) -> any().
294 sql_call0(HostType, Msg) ->
295 224 Timestamp = erlang:monotonic_time(millisecond),
296 224 mongoose_wpool:call(rdbms, HostType, {sql_cmd, Msg, Timestamp}).
297
298 -spec sql_cast(HostType :: server(), Msg :: rdbms_msg()) -> any().
299 sql_cast(HostType, Msg) ->
300
:-(
case get_state() of
301
:-(
undefined -> sql_cast0(HostType, Msg);
302 State ->
303
:-(
{Res, NewState} = nested_op(Msg, State),
304
:-(
put_state(NewState),
305
:-(
Res
306 end.
307
308 -spec sql_cast0(HostType :: server(), Msg :: rdbms_msg()) -> any().
309 sql_cast0(HostType, Msg) ->
310
:-(
Timestamp = erlang:monotonic_time(millisecond),
311
:-(
mongoose_wpool:cast(rdbms, HostType, {sql_cmd, Msg, Timestamp}).
312
313 -spec get_db_info(Target :: server() | pid()) ->
314 {ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
315 get_db_info(Pid) when is_pid(Pid) ->
316
:-(
wpool_process:call(Pid, get_db_info, 5000);
317 get_db_info(HostType) ->
318
:-(
mongoose_wpool:call(rdbms, HostType, get_db_info).
319
320 %% This function is intended to be used from inside an sql_transaction:
321 sql_query_t(Query) ->
322
:-(
sql_query_t(Query, get_state()).
323
324 sql_query_t(Query, State) ->
325
:-(
QRes = sql_query_internal(Query, State),
326
:-(
case QRes of
327 {error, Reason} ->
328
:-(
throw({aborted, #{reason => Reason, sql_query => Query}});
329 _ when is_list(QRes) ->
330
:-(
case lists:keysearch(error, 1, QRes) of
331 {value, {error, Reason}} ->
332
:-(
throw({aborted, #{reason => Reason, sql_query => Query}});
333 _ ->
334
:-(
QRes
335 end;
336 _ ->
337
:-(
QRes
338 end.
339
340 %% Only for binaries, escapes only the content, '%' has to be added afterwards
341 %% The escape character is '$' as '\' causes issues in PostgreSQL
342 %% Returned value is NOT safe to use outside of a prepared statement
343 -spec escape_prepared_like(binary()) -> binary().
344 escape_prepared_like(S) ->
345
:-(
<< (escape_prepared_like_character(C)) || <<C>> <= S >>.
346
347 %% @doc Escape character that will confuse an SQL engine
348 %% Percent and underscore only need to be escaped for
349 %% pattern matching like statement
350 %% INFO: Used in mod_vcard_rdbms.
351 %% Searches in the middle of text, non-efficient
352 -spec escape_like(binary() | string()) -> escaped_like().
353 escape_like(S) ->
354
:-(
{escaped_like, [$', $%, escape_like_internal(S), $%, $']}.
355
356 -spec escape_like_prefix(binary() | string()) -> escaped_like().
357 escape_like_prefix(S) ->
358
:-(
{escaped_like, [$', escape_like_internal(S), $%, $']}.
359
360 -spec escape_binary(server(), binary()) -> escaped_binary().
361 escape_binary(_HostType, Bin) when is_binary(Bin) ->
362
:-(
{escaped_binary, mongoose_rdbms_backend:escape_binary(Bin)}.
363
364 %% @doc The same as escape, but returns value including ''
365 -spec escape_string(binary() | string()) -> escaped_string().
366 escape_string(S) ->
367
:-(
{escaped_string, escape_string_internal(S)}.
368
369 -spec escape_integer(integer()) -> escaped_integer().
370 escape_integer(I) when is_integer(I) ->
371
:-(
{escaped_integer, integer_to_binary(I)}.
372
373 %% Be aware, that we can't just use escaped_integer here.
374 %% Because of the error in pgsql:
375 %% column \"match_all\" is of type boolean but expression is of type integer
376 -spec escape_boolean(boolean()) -> escaped_boolean().
377 escape_boolean(true) ->
378
:-(
{escaped_boolean, "'1'"};
379 escape_boolean(false) ->
380
:-(
{escaped_boolean, "'0'"}.
381
382 -spec escape_null() -> escaped_null().
383 escape_null() ->
384
:-(
{escaped_null, "null"}.
385
386
387 %% @doc SQL-injection check.
388 %% Call this function just before using value from escape_string/1 inside a query.
389 -spec use_escaped_string(escaped_string()) -> sql_query_part().
390 use_escaped_string({escaped_string, S}) ->
391
:-(
S;
392 use_escaped_string(X) ->
393 %% We need to print an error, because in some places
394 %% the error can be just ignored, because of too wide catches.
395
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
396
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
397
:-(
erlang:error({use_escaped_string, X}).
398
399 -spec use_escaped_binary(escaped_binary()) -> sql_query_part().
400 use_escaped_binary({escaped_binary, S}) ->
401
:-(
S;
402 use_escaped_binary(X) ->
403
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
404
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
405
:-(
erlang:error({use_escaped_binary, X}).
406
407 -spec use_escaped_like(escaped_like()) -> sql_query_part().
408 use_escaped_like({escaped_like, S}) ->
409
:-(
S;
410 use_escaped_like(X) ->
411
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
412
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
413
:-(
erlang:error({use_escaped_like, X}).
414
415 -spec use_escaped_integer(escaped_integer()) -> sql_query_part().
416 use_escaped_integer({escaped_integer, S}) ->
417
:-(
S;
418 use_escaped_integer(X) ->
419
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
420
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
421
:-(
erlang:error({use_escaped_integer, X}).
422
423 -spec use_escaped_boolean(escaped_boolean()) -> sql_query_part().
424 use_escaped_boolean({escaped_boolean, S}) ->
425
:-(
S;
426 use_escaped_boolean(X) ->
427
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
428
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
429
:-(
erlang:error({use_escaped_boolean, X}).
430
431 -spec use_escaped_null(escaped_null()) -> sql_query_part().
432 use_escaped_null({escaped_null, S}) ->
433
:-(
S;
434 use_escaped_null(X) ->
435
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
436
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
437
:-(
erlang:error({use_escaped_null, X}).
438
439 %% Use this function, if type is unknown.
440 %% Be aware, you can't pass escaped_like() there.
441 -spec use_escaped(Value) -> sql_query_part() when
442 Value :: escaped_value().
443 use_escaped({escaped_string, _}=X) ->
444
:-(
use_escaped_string(X);
445 use_escaped({escaped_binary, _}=X) ->
446
:-(
use_escaped_binary(X);
447 use_escaped({escaped_integer, _}=X) ->
448
:-(
use_escaped_integer(X);
449 use_escaped({escaped_boolean, _}=X) ->
450
:-(
use_escaped_boolean(X);
451 use_escaped({escaped_null, _}=X) ->
452
:-(
use_escaped_null(X);
453 use_escaped(X) ->
454
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
455
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
456
:-(
erlang:error({use_escaped, X}).
457
458 -spec escape_prepared_like_character(char()) -> binary().
459
:-(
escape_prepared_like_character($%) -> <<"$%">>;
460
:-(
escape_prepared_like_character($_) -> <<"$_">>;
461
:-(
escape_prepared_like_character($$) -> <<"$$">>;
462
:-(
escape_prepared_like_character(C) -> <<C>>.
463
464 -spec escape_like_internal(binary() | string()) -> binary() | string().
465 escape_like_internal(S) when is_binary(S) ->
466
:-(
list_to_binary(escape_like_internal(binary_to_list(S)));
467 escape_like_internal(S) when is_list(S) ->
468
:-(
[escape_like_character(C) || C <- S].
469
470 escape_string_internal(S) ->
471
:-(
case mongoose_backend:is_exported(global, ?MODULE, escape_string, 1) of
472 true ->
473
:-(
mongoose_rdbms_backend:escape_string(S);
474 false ->
475 %% generic escaping
476
:-(
[$', escape_characters(S), $']
477 end.
478
479 escape_characters(S) when is_binary(S) ->
480
:-(
list_to_binary(escape_characters(binary_to_list(S)));
481 escape_characters(S) when is_list(S) ->
482
:-(
[escape_character(C) || C <- S].
483
484
:-(
escape_like_character($%) -> "\\%";
485
:-(
escape_like_character($_) -> "\\_";
486
:-(
escape_like_character(C) -> escape_character(C).
487
488 %% Characters to escape
489
:-(
escape_character($\0) -> "\\0";
490
:-(
escape_character($\n) -> "\\n";
491
:-(
escape_character($\t) -> "\\t";
492
:-(
escape_character($\b) -> "\\b";
493
:-(
escape_character($\r) -> "\\r";
494
:-(
escape_character($') -> "''";
495
:-(
escape_character($") -> "\\\"";
496
:-(
escape_character($\\) -> "\\\\";
497
:-(
escape_character(C) -> C.
498
499
500 -spec unescape_binary(server(), binary()) -> binary().
501 unescape_binary(_HostType, Bin) when is_binary(Bin) ->
502
:-(
mongoose_rdbms_backend:unescape_binary(Bin).
503
504
505 -spec result_to_integer(binary() | integer()) -> integer().
506 result_to_integer(Int) when is_integer(Int) ->
507
:-(
Int;
508 result_to_integer(Bin) when is_binary(Bin) ->
509
:-(
binary_to_integer(Bin).
510
511 selected_to_integer({selected, [{BInt}]}) ->
512
:-(
result_to_integer(BInt).
513
514 %% Converts a value from a CHAR(1) field to integer
515
:-(
character_to_integer(<<X>>) -> X;
516
:-(
character_to_integer(X) when is_integer(X) -> X.
517
518 %% pgsql returns booleans as "t" or "f"
519 -spec to_bool(binary() | string() | atom() | integer() | any()) -> boolean().
520 to_bool(B) when is_binary(B) ->
521
:-(
to_bool(binary_to_list(B));
522
:-(
to_bool("t") -> true;
523
:-(
to_bool("true") -> true;
524
:-(
to_bool("1") -> true;
525
:-(
to_bool(true) -> true;
526
:-(
to_bool(1) -> true;
527
:-(
to_bool(_) -> false.
528
529 %%%----------------------------------------------------------------------
530 %%% Callback functions from gen_server
531 %%%----------------------------------------------------------------------
532 -spec init(term()) -> {ok, state()}.
533 init(Opts) ->
534
:-(
process_flag(trap_exit, true),
535
:-(
{server, Settings} = lists:keyfind(server, 1, Opts),
536
:-(
KeepaliveInterval = proplists:get_value(keepalive_interval, Opts),
537 % retries are delayed exponentially, this param limits the delay
538 % so if we start with 2 and try 6 times, we have 2, 4, 8, 16, 30
539
:-(
MaxStartInterval = proplists:get_value(start_interval, Opts, 30),
540
:-(
case connect(Settings, ?CONNECT_RETRIES, 2, MaxStartInterval) of
541 {ok, DbRef} ->
542
:-(
schedule_keepalive(KeepaliveInterval),
543
:-(
{ok, #state{db_ref = DbRef, keepalive_interval = KeepaliveInterval}};
544 Error ->
545
:-(
{stop, Error}
546 end.
547
548
549 handle_call({sql_cmd, Command, Timestamp}, From, State) ->
550
:-(
{Result, NewState} = run_sql_cmd(Command, From, State, Timestamp),
551
:-(
case abort_on_driver_error(Result) of
552
:-(
{stop, Reason} -> {stop, Reason, Result, NewState};
553
:-(
continue -> {reply, Result, NewState}
554 end;
555 handle_call(get_db_info, _, #state{db_ref = DbRef} = State) ->
556
:-(
{reply, {ok, db_engine(global), DbRef}, State};
557 handle_call(Request, From, State) ->
558
:-(
?UNEXPECTED_CALL(Request, From),
559
:-(
{reply, {error, badarg}, State}.
560
561 handle_cast({sql_cmd, Command, Timestamp}, State) ->
562
:-(
{Result, NewState} = run_sql_cmd(Command, undefined, State, Timestamp),
563
:-(
case abort_on_driver_error(Result) of
564
:-(
{stop, Reason} -> {stop, Reason, NewState};
565
:-(
continue -> {noreply, NewState}
566 end;
567 handle_cast(Request, State) ->
568
:-(
?UNEXPECTED_CAST(Request),
569
:-(
{noreply, State}.
570
571 code_change(_OldVsn, State, _Extra) ->
572
:-(
{ok, State}.
573
574 handle_info(keepalive, #state{keepalive_interval = KeepaliveInterval} = State) ->
575
:-(
case sql_query_internal([?KEEPALIVE_QUERY], State) of
576 {selected, _} ->
577
:-(
schedule_keepalive(KeepaliveInterval),
578
:-(
{noreply, State};
579 {error, _} = Error ->
580
:-(
{stop, {keepalive_failed, Error}, State}
581 end;
582 handle_info({'EXIT', _Pid, _Reason} = Reason, State) ->
583
:-(
{stop, Reason, State};
584 handle_info(Info, State) ->
585
:-(
?UNEXPECTED_INFO(Info),
586
:-(
{noreply, State}.
587
588 -spec terminate(Reason :: term(), state()) -> any().
589 terminate(_Reason, #state{db_ref = DbRef}) ->
590
:-(
catch mongoose_rdbms_backend:disconnect(DbRef).
591
592 %%----------------------------------------------------------------------
593 %% Func: print_state/1
594 %% Purpose: Prepare the state to be printed on error log
595 %% Returns: State to print
596 %%----------------------------------------------------------------------
597 print_state(State) ->
598
:-(
State.
599 %%%----------------------------------------------------------------------
600 %%% Internal functions
601 %%%----------------------------------------------------------------------
602
603 -spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) ->
604 {Result :: term(), state()}.
605 run_sql_cmd(Command, _From, State, Timestamp) ->
606
:-(
Now = erlang:monotonic_time(millisecond),
607
:-(
case Now - Timestamp of
608 Age when Age < ?TRANSACTION_TIMEOUT ->
609
:-(
outer_op(Command, State);
610 Age ->
611
:-(
?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow,
612
:-(
text => <<"Discarding request">>, age => Age, command => Command}),
613
:-(
{reply, {error, timeout}, State}
614 end.
615
616 %% @doc Only called by handle_call, only handles top level operations.
617 -spec outer_op(rdbms_msg(), state()) -> {query_result()
618 | transaction_result()
619 | dirty_result(), state()}.
620 outer_op({sql_query, Query}, State) ->
621
:-(
{sql_query_internal(Query, State), State};
622 outer_op({sql_transaction, F}, State) ->
623
:-(
outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "", State);
624 outer_op({sql_dirty, F}, State) ->
625
:-(
sql_dirty_internal(F, State);
626 outer_op({sql_execute, Name, Params}, State) ->
627
:-(
sql_execute(Name, Params, State).
628
629 %% @doc Called via sql_query/transaction/bloc from client code when inside a
630 %% nested operation
631 -spec nested_op(rdbms_msg(), state()) -> any().
632 nested_op({sql_query, Query}, State) ->
633 %% XXX - use sql_query_t here insted? Most likely would break
634 %% callers who expect {error, _} tuples (sql_query_t turns
635 %% these into throws)
636
:-(
{sql_query_internal(Query, State), State};
637 nested_op({sql_transaction, F}, State) ->
638 %% Transaction inside a transaction
639
:-(
inner_transaction(F, State);
640 nested_op({sql_execute, Name, Params}, State) ->
641
:-(
sql_execute(Name, Params, State).
642
643 %% @doc Never retry nested transactions - only outer transactions
644 -spec inner_transaction(fun(), state()) -> transaction_result() | {'EXIT', any()}.
645 inner_transaction(F, _State) ->
646
:-(
case catch F() of
647 {aborted, Reason} ->
648
:-(
{aborted, Reason};
649 {'EXIT', Reason} ->
650
:-(
{'EXIT', Reason};
651 {atomic, Res} ->
652
:-(
{atomic, Res};
653 Res ->
654
:-(
{atomic, Res}
655 end.
656
657 -spec outer_transaction(F :: fun(),
658 NRestarts :: 0..10,
659 Reason :: any(), state()) -> {transaction_result(), state()}.
660 outer_transaction(F, NRestarts, _Reason, State) ->
661
:-(
sql_query_internal(rdbms_queries:begin_trans(), State),
662
:-(
put_state(State),
663
:-(
{Result, StackTrace} = apply_transaction_function(F),
664
:-(
NewState = erase_state(),
665
:-(
case Result of
666 {aborted, Reason} when NRestarts > 0 ->
667 %% Retry outer transaction upto NRestarts times.
668
:-(
sql_query_internal([<<"rollback;">>], NewState),
669
:-(
outer_transaction(F, NRestarts - 1, Reason, NewState);
670 {aborted, #{reason := Reason, sql_query := SqlQuery}}
671 when NRestarts =:= 0 ->
672 %% Too many retries of outer transaction.
673
:-(
?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
674 restarts => ?MAX_TRANSACTION_RESTARTS, last_abort_reason => Reason,
675 last_sql_query => iolist_to_binary(SqlQuery),
676
:-(
stacktrace => StackTrace, state => NewState}),
677
:-(
sql_query_internal([<<"rollback;">>], NewState),
678
:-(
{{aborted, Reason}, NewState};
679 {aborted, Reason} when NRestarts =:= 0 -> %% old format for abort
680 %% Too many retries of outer transaction.
681
:-(
?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
682 restarts => ?MAX_TRANSACTION_RESTARTS,
683 last_abort_reason => Reason, stacktrace => StackTrace,
684
:-(
state => NewState}),
685
:-(
sql_query_internal([<<"rollback;">>], NewState),
686
:-(
{{aborted, Reason}, NewState};
687 {'EXIT', Reason} ->
688 %% Abort sql transaction on EXIT from outer txn only.
689
:-(
sql_query_internal([<<"rollback;">>], NewState),
690
:-(
{{aborted, Reason}, NewState};
691 Res ->
692 %% Commit successful outer txn
693
:-(
sql_query_internal([<<"commit;">>], NewState),
694
:-(
{{atomic, Res}, NewState}
695 end.
696
697 -spec apply_transaction_function(F :: fun()) -> {any(), list()}.
698 apply_transaction_function(F) ->
699
:-(
try
700
:-(
{F(), []}
701 catch
702 throw:ThrowResult:StackTrace ->
703
:-(
{ThrowResult, StackTrace};
704 Class:Reason:StackTrace ->
705
:-(
?LOG_ERROR(#{what => rdbms_outer_transaction_failed, class => Class,
706
:-(
reason => Reason, stacktrace => StackTrace}),
707
:-(
{{'EXIT', Reason}, StackTrace}
708 end.
709
710 sql_query_internal(Query, #state{db_ref = DBRef}) ->
711
:-(
case mongoose_rdbms_backend:query(DBRef, Query, ?QUERY_TIMEOUT) of
712 {error, "No SQL-driver information available."} ->
713
:-(
{updated, 0}; %% workaround for odbc bug
714 Result ->
715
:-(
Result
716 end.
717
718 sql_dirty_internal(F, State) ->
719
:-(
put_state(State),
720
:-(
Result =
721
:-(
try F() of
722 Result0 ->
723
:-(
{ok, Result0}
724 catch
725 _C:R ->
726
:-(
{error, R}
727 end,
728
:-(
{Result, erase_state()}.
729
730 -spec sql_execute(Name :: atom(), Params :: [term()], state()) -> {query_result(), state()}.
731 sql_execute(Name, Params, State = #state{db_ref = DBRef}) ->
732
:-(
{StatementRef, NewState} = prepare_statement(Name, State),
733
:-(
Res = try mongoose_rdbms_backend:execute(DBRef, StatementRef, Params, ?QUERY_TIMEOUT)
734 catch Class:Reason:StackTrace ->
735
:-(
?LOG_ERROR(#{what => rdbms_sql_execute_failed, statement_name => Name,
736 class => Class, reason => Reason, params => Params,
737
:-(
stacktrace => StackTrace}),
738
:-(
erlang:raise(Class, Reason, StackTrace)
739 end,
740
:-(
{Res, NewState}.
741
742 -spec prepare_statement(Name :: atom(), state()) -> {Ref :: term(), state()}.
743 prepare_statement(Name, State = #state{db_ref = DBRef, prepared = Prepared}) ->
744
:-(
case maps:get(Name, Prepared, undefined) of
745 undefined ->
746
:-(
{_, Table, Fields, Statement} = lookup_statement(Name),
747
:-(
{ok, Ref} = mongoose_rdbms_backend:prepare(DBRef, Name, Table, Fields, Statement),
748
:-(
{Ref, State#state{prepared = maps:put(Name, Ref, Prepared)}};
749
750 Ref ->
751
:-(
{Ref, State}
752 end.
753
754 lookup_statement(Name) ->
755
:-(
case ets:lookup(prepared_statements, Name) of
756
:-(
[Rec] -> Rec;
757
:-(
[] -> error({lookup_statement_failed, Name})
758 end.
759
760 -spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}.
761 abort_on_driver_error({error, "query timed out"}) -> %% mysql driver error
762
:-(
{stop, timeout};
763 abort_on_driver_error({error, "Failed sending data on socket" ++ _}) -> %% mysql driver error
764
:-(
{stop, closed};
765 abort_on_driver_error(_) ->
766
:-(
continue.
767
768 -spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined.
769 db_engine(_HostType) ->
770 4 try mongoose_backend:get_backend_name(global, ?MODULE)
771 4 catch error:badarg -> undefined end.
772
773 %% @doc Used to optimise queries for different databases.
774 %% @todo Should be refactored to use host types with this module.
775 %% Also, this parameter should not be global, but pool-name parameterized
776 -spec db_type() -> mssql | generic.
777 db_type() ->
778
:-(
case mongoose_config:get_opt(rdbms_server_type) of
779
:-(
mssql -> mssql;
780
:-(
_ -> generic
781 end.
782
783 -spec connect(Settings :: term(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
784 MaxRetryDelay :: non_neg_integer()) -> {ok, term()} | {error, any()}.
785 connect(Settings, Retry, RetryAfter, MaxRetryDelay) ->
786
:-(
case mongoose_rdbms_backend:connect(Settings, ?QUERY_TIMEOUT) of
787 {ok, _} = Ok ->
788
:-(
Ok;
789 Error when Retry =:= 0 ->
790
:-(
Error;
791 Error ->
792
:-(
SleepFor = rand:uniform(RetryAfter),
793
:-(
Backend = mongoose_backend:get_backend_name(global, ?MODULE),
794
:-(
?LOG_ERROR(#{what => rdbms_connection_attempt_error, backend => Backend,
795
:-(
error => Error, sleep_for => SleepFor}),
796
:-(
timer:sleep(timer:seconds(SleepFor)),
797
:-(
NextRetryDelay = RetryAfter * RetryAfter,
798
:-(
connect(Settings, Retry - 1, min(MaxRetryDelay, NextRetryDelay), MaxRetryDelay)
799 end.
800
801
802 -spec schedule_keepalive(KeepaliveInterval :: undefined | pos_integer()) -> any().
803 schedule_keepalive(KeepaliveInterval) ->
804
:-(
case KeepaliveInterval of
805 _ when is_integer(KeepaliveInterval) ->
806
:-(
erlang:send_after(timer:seconds(KeepaliveInterval), self(), keepalive);
807 undefined ->
808
:-(
ok;
809 Other ->
810
:-(
?LOG_ERROR(#{what => rdbms_wrong_keepalive_interval, reason => Other}),
811
:-(
ok
812 end.
813
814 %% ----- process state access, for convenient tracing
815
816 put_state(State) ->
817
:-(
put(?STATE_KEY, State).
818
819 erase_state() ->
820
:-(
erase(?STATE_KEY).
821
822 get_state() ->
823 224 get(?STATE_KEY).
Line Hits Source