./ct_report/coverage/mongoose_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mongoose_rdbms.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Serve RDBMS connection
5 %%% Created : 8 Dec 2004 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%% Copyright 2016 Erlang Solutions Ltd.
10 %%%
11 %%% This program is free software; you can redistribute it and/or
12 %%% modify it under the terms of the GNU General Public License as
13 %%% published by the Free Software Foundation; either version 2 of the
14 %%% License, or (at your option) any later version.
15 %%%
16 %%% This program is distributed in the hope that it will be useful,
17 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
18 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 %%% General Public License for more details.
20 %%%
21 %%% You should have received a copy of the GNU General Public License
22 %%% along with this program; if not, write to the Free Software
23 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 %%%
25 %%%----------------------------------------------------------------------
26
27 -module(mongoose_rdbms).
28 -author('alexey@process-one.net').
29 -author('konrad.zemek@erlang-solutions.com').
30
31 -behaviour(gen_server).
32
33 %% Part of SQL query string, produced by use_escaped/1 function
34 -type sql_query_part() :: iodata().
35 -type sql_query() :: iodata().
36
37 %% Blob data type to be used inside SQL queries
38 -opaque escaped_binary() :: {escaped_binary, sql_query_part()}.
39 %% Unicode string to be used inside SQL queries
40 -opaque escaped_string() :: {escaped_string, sql_query_part()}.
41 %% Unicode string to be used inside LIKE conditions
42 -opaque escaped_like() :: {escaped_like, sql_query_part()}.
43 -opaque escaped_integer() :: {escaped_integer, sql_query_part()}.
44 -opaque escaped_boolean() :: {escaped_boolean, sql_query_part()}.
45 -opaque escaped_null() :: {escaped_null, sql_query_part()}.
46 -opaque escaped_value() :: escaped_string() | escaped_binary() | escaped_integer() |
47 escaped_boolean() | escaped_null().
48
49 -export_type([escaped_binary/0,
50 escaped_string/0,
51 escaped_like/0,
52 escaped_integer/0,
53 escaped_boolean/0,
54 escaped_null/0,
55 escaped_value/0,
56 sql_query/0,
57 sql_query_part/0]).
58
59 -export([process_options/1]).
60
61 %% External exports
62 -export([prepare/4,
63 prepared/1,
64 execute/3,
65 execute_cast/3,
66 execute_request/3,
67 execute_successfully/3,
68 sql_query/2,
69 sql_query_cast/2,
70 sql_query_request/2,
71 sql_query_t/1,
72 sql_transaction/2,
73 sql_transaction_request/2,
74 transaction_with_delayed_retry/3,
75 sql_dirty/2,
76 to_bool/1,
77 db_engine/1,
78 db_type/0,
79 use_escaped/1]).
80
81 %% Unicode escaping
82 -export([escape_string/1,
83 use_escaped_string/1]).
84
85 %% Integer escaping
86 -export([escape_integer/1,
87 use_escaped_integer/1]).
88
89 %% Boolean escaping
90 -export([escape_boolean/1,
91 use_escaped_boolean/1]).
92
93 %% LIKE escaping
94 -export([escape_like/1,
95 escape_prepared_like/1,
96 escape_like_prefix/1,
97 use_escaped_like/1]).
98
99 %% BLOB escaping
100 -export([escape_binary/2,
101 unescape_binary/2,
102 use_escaped_binary/1]).
103
104 %% Null escaping
105 %% (to keep uniform pattern of passing values)
106 -export([escape_null/0,
107 use_escaped_null/1]).
108
109 %% count / integral types decoding
110 -export([result_to_integer/1,
111 selected_to_integer/1]).
112
113 -export([character_to_integer/1]).
114
115 %% gen_server callbacks
116 -export([init/1,
117 handle_call/3,
118 handle_cast/2,
119 handle_info/2,
120 terminate/2,
121 code_change/3]).
122
123 -ignore_xref([sql_query_cast/2, sql_query_request/2,
124 execute_cast/3, execute_request/3,
125 sql_transaction_request/2,
126 sql_query_t/1, use_escaped/1,
127 escape_like/1, escape_like_prefix/1, use_escaped_like/1,
128 escape_binary/2, use_escaped_binary/1,
129 escape_integer/1, use_escaped_integer/1,
130 escape_string/1, use_escaped_string/1,
131 escape_boolean/1, use_escaped_boolean/1,
132 escape_null/0, use_escaped_null/1
133 ]).
134
135 %% internal usage
136 -export([get_db_info/1]).
137
138 -include("mongoose.hrl").
139
140 -record(state, {db_ref,
141 prepared = #{} :: #{binary() => term()},
142 keepalive_interval :: undefined | pos_integer()
143 }).
144 -type state() :: #state{}.
145
146 -define(STATE_KEY, mongoose_rdbms_state).
147 -define(MAX_TRANSACTION_RESTARTS, 10).
148 -define(TRANSACTION_TIMEOUT, 60000). % milliseconds
149 -define(KEEPALIVE_QUERY, <<"SELECT 1;">>).
150 -define(QUERY_TIMEOUT, 5000).
151 %% The value is arbitrary; supervisor will restart the connection once
152 %% the retry counter runs out. We just attempt to reduce log pollution.
153 -define(CONNECT_RETRIES, 5).
154
155 -type server() :: mongooseim:host_type() | global.
156 -type rdbms_msg() :: {sql_query, _}
157 | {sql_transaction, fun()}
158 | {sql_dirty, fun()}
159 | {sql_execute, atom(), [binary() | boolean() | integer()]}.
160 -type single_query_result() :: {selected, [tuple()]} |
161 {updated, non_neg_integer() | undefined} |
162 {updated, non_neg_integer(), [tuple()]} |
163 {aborted, Reason :: term()} |
164 {error, Reason :: string() | duplicate_key}.
165 -type query_result() :: single_query_result() | [single_query_result()].
166 -type transaction_result() :: {aborted, _} | {atomic, _} | {error, _}.
167 -type dirty_result() :: {ok, any()} | {error, any()}.
168 -export_type([query_result/0,
169 server/0]).
170
171 -type options() :: #{driver := pgsql | mysql | odbc,
172 max_start_interval := pos_integer(),
173 atom() => any()}.
174
175 -export_type([options/0]).
176
177 %%%----------------------------------------------------------------------
178 %%% API
179 %%%----------------------------------------------------------------------
180
%% @doc Validate and normalise the options of an RDBMS connection.
%% ODBC options that carry `settings' are returned unchanged. Options with
%% `host', `database', `username' and `password' go through
%% process_tls_options/1 and ensure_db_port/1. Anything else raises an error
%% whose reason is a map with `what => invalid_rdbms_connection_options'.
-spec process_options(map()) -> options().
process_options(#{driver := odbc, settings := _} = Options) ->
    Options;
process_options(#{host := _, database := _, username := _, password := _} = Options) ->
    WithTLS = process_tls_options(Options),
    ensure_db_port(WithTLS);
process_options(Invalid) ->
    error(#{what => invalid_rdbms_connection_options, options => Invalid}).
188
%% Normalise the `tls' section of the connection options.
%% - mysql with a `required' key in `tls': rejected with an error map.
%% - pgsql with a `tls' section: `required' defaults to `false' when missing.
%% - everything else: returned unchanged.
process_tls_options(#{driver := mysql, tls := #{required := _}} = Options) ->
    error(#{what => invalid_rdbms_tls_options, options => Options,
            text => <<"The 'required' option is not supported for MySQL">>});
process_tls_options(#{driver := pgsql, tls := TLS} = Options) ->
    %% Keys present in TLS win over the default.
    WithDefault = maps:merge(#{required => false}, TLS),
    Options#{tls := WithDefault};
process_tls_options(Options) ->
    Options.
196
%% Make sure the options contain a `port'. An existing port is kept;
%% otherwise pgsql gets 5432 and mysql gets 3306. There is no clause for
%% other drivers without a port.
ensure_db_port(#{port := _} = Options) ->
    Options;
ensure_db_port(#{driver := pgsql} = Options) ->
    maps:put(port, 5432, Options);
ensure_db_port(#{driver := mysql} = Options) ->
    maps:put(port, 3306, Options).
200
%% @doc Register a named statement in the `prepared_statements' ETS table.
%% An atom table name, and field lists whose first element is an atom, are
%% converted to UTF-8 binaries first. The statement is flattened to a binary.
%% Returns `{ok, Name}' when the entry was inserted and
%% `{error, already_exists}' when `Name' is already present.
-spec prepare(Name, Table :: binary() | atom(), Fields :: [binary() | atom()],
              Statement :: iodata()) ->
    {ok, Name} | {error, already_exists}
      when Name :: atom().
prepare(Name, Table, Fields, Statement) when is_atom(Table) ->
    TableBin = atom_to_binary(Table, utf8),
    prepare(Name, TableBin, Fields, Statement);
prepare(Name, Table, [First | _] = Fields, Statement) when is_atom(First) ->
    FieldBins = [atom_to_binary(Field, utf8) || Field <- Fields],
    prepare(Name, Table, FieldBins, Statement);
prepare(Name, Table, Fields, Statement) when is_atom(Name), is_binary(Table) ->
    %% Assert that every field is a binary by now.
    true = lists:all(fun is_binary/1, Fields),
    Entry = {Name, Table, Fields, iolist_to_binary(Statement)},
    case ets:insert_new(prepared_statements, Entry) of
        false -> {error, already_exists};
        true -> {ok, Name}
    end.
216
%% @doc Tell whether a statement called `StatementName' is present in the
%% `prepared_statements' ETS table.
-spec prepared(atom()) -> boolean().
prepared(StatementName) ->
    ets:member(prepared_statements, StatementName).
220
%% @doc Issue an `sql_execute' command for the statement `Name' with
%% `Parameters' through sql_call/2 and return its result.
-spec execute(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
    query_result().
execute(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
    Command = {sql_execute, Name, Parameters},
    sql_call(HostType, Command).
225
%% @doc Same command as execute/3, but handed to sql_cast/2.
-spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
    query_result().
execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
    Command = {sql_execute, Name, Parameters},
    sql_cast(HostType, Command).
230
%% @doc Same command as execute/3, but handed to sql_request/2.
-spec execute_request(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
    gen_server:request_id().
execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
    Command = {sql_execute, Name, Parameters},
    sql_request(HostType, Command).
235
%% @doc Like execute/3, but fails loudly.
%% `{selected, _}' and `{updated, _}' results are returned as they are. Any
%% other result is logged and raised as an error carrying the log report.
%% Errors raised by execute/3 itself are logged and re-raised with their
%% original stacktrace.
-spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
    query_result().
execute_successfully(HostType, Name, Parameters) ->
    %% Builds the failure report; only evaluated on the failure paths.
    FailureReport =
        fun(Details) ->
                Details#{what => sql_execute_failed, host => HostType,
                         statement_name => Name,
                         statement_query => query_name_to_string(Name),
                         statement_params => Parameters}
        end,
    try execute(HostType, Name, Parameters) of
        {Tag, _} = Success when Tag =:= selected; Tag =:= updated ->
            Success;
        Unexpected ->
            Report = FailureReport(#{reason => Unexpected}),
            ?LOG_ERROR(Report),
            error(Report)
    catch
        error:Reason:Stacktrace ->
            CrashReport = FailureReport(#{reason => Reason, stacktrace => Stacktrace}),
            ?LOG_ERROR(CrashReport),
            erlang:raise(error, Reason, Stacktrace)
    end.
259
%% Look up the statement text registered under `Name' in the
%% `prepared_statements' ETS table; `not_found' when there is no entry.
query_name_to_string(Name) ->
    case ets:lookup(prepared_statements, Name) of
        [{_Name, _Table, _Fields, Text}] ->
            Text;
        [] ->
            not_found
    end.
267
%% @doc Issue an `sql_query' command for `Query' through sql_call/2.
-spec sql_query(HostType :: server(), Query :: any()) -> query_result().
sql_query(HostType, Query) ->
    Command = {sql_query, Query},
    sql_call(HostType, Command).
271
%% @doc Issue an `sql_query' command for `Query' through sql_request/2.
-spec sql_query_request(HostType :: server(), Query :: any()) -> gen_server:request_id().
sql_query_request(HostType, Query) ->
    Command = {sql_query, Query},
    sql_request(HostType, Command).
275
%% @doc Issue an `sql_query' command for `Query' through sql_cast/2.
-spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result().
sql_query_cast(HostType, Query) ->
    Command = {sql_query, Query},
    sql_cast(HostType, Command).
279
%% @doc Run an SQL transaction.
%% The second argument is either a list of queries, each of which is run
%% with sql_query_t/1, or an Erlang fun holding the transaction body.
-spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result().
sql_transaction(HostType, Queries) when is_list(Queries) ->
    RunAll = fun() -> lists:map(fun sql_query_t/1, Queries) end,
    sql_call(HostType, {sql_transaction, RunAll});
sql_transaction(HostType, F) when is_function(F) ->
    sql_call(HostType, {sql_transaction, F}).
288
%% @doc Request variant of sql_transaction/2.
%% The second argument is either a list of queries, each of which is run
%% with sql_query_t/1, or an Erlang fun holding the transaction body. The
%% command is handed to sql_request/2, so the declared result is a request
%% id like in execute_request/3 and sql_query_request/2, not a
%% transaction_result().
-spec sql_transaction_request(server(), fun() | maybe_improper_list()) ->
    gen_server:request_id().
sql_transaction_request(HostType, Queries) when is_list(Queries) ->
    F = fun() -> lists:map(fun sql_query_t/1, Queries) end,
    sql_transaction_request(HostType, F);
sql_transaction_request(HostType, F) when is_function(F) ->
    sql_request(HostType, {sql_transaction, F}).
297
%% @doc Run a transaction with a delay between retries.
%% `Info' must contain `retries' (number of retries) and `delay' (passed to
%% timer:sleep/1 before each retry). The whole map is also merged into the
%% log reports.
-spec transaction_with_delayed_retry(server(), fun() | maybe_improper_list(), map()) -> transaction_result().
transaction_with_delayed_retry(HostType, F, Info) ->
    #{retries := Retries} = Info,
    #{delay := Delay} = Info,
    do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info).
304
%% Retry loop behind transaction_with_delayed_retry/3.
%% `{atomic, _}' is returned as it is. An `{aborted, _}' result with retries
%% left is logged, followed by a sleep of `Delay' and another attempt. Any
%% other outcome is logged and raised as an error carrying the report.
do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) ->
    case mongoose_rdbms:sql_transaction(HostType, F) of
        {atomic, _} = Committed ->
            Committed;
        {aborted, Reason} when Retries > 0 ->
            ?LOG_WARNING(Info#{what => rdbms_transaction_aborted,
                               text => <<"Transaction aborted. Restart">>,
                               reason => Reason, retries_left => Retries}),
            timer:sleep(Delay),
            do_transaction_with_delayed_retry(HostType, F, Retries - 1, Delay, Info);
        Failure ->
            Err = Info#{what => mam_transaction_failed,
                        text => <<"Transaction failed. Do not restart">>,
                        reason => Failure},
            ?LOG_ERROR(Err),
            erlang:error(Err)
    end.
323
%% @doc Issue an `sql_dirty' command for the fun `F' through sql_call/2.
%% An `{ok, Result}' answer is unwrapped; an `{error, Error}' answer is
%% thrown.
-spec sql_dirty(server(), fun()) -> any() | no_return().
sql_dirty(HostType, F) when is_function(F) ->
    Answer = sql_call(HostType, {sql_dirty, F}),
    case Answer of
        {error, Error} -> throw(Error);
        {ok, Result} -> Result
    end.
330
%% TODO: Better spec for RPC calls
%% Dispatch a command. When no state is stored in the process dictionary the
%% command goes to sql_call0/2; otherwise it is run in place as a nested
%% operation and the updated state is stored back.
-spec sql_call(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_call(HostType, Msg) ->
    sql_call_with_state(get_state(), HostType, Msg).

sql_call_with_state(undefined, HostType, Msg) ->
    sql_call0(HostType, Msg);
sql_call_with_state(State, _HostType, Msg) ->
    {Res, NewState} = nested_op(Msg, State),
    put_state(NewState),
    Res.
341
%% Wrap the command with the current monotonic time in milliseconds and
%% pass it to mongoose_wpool:call/3 for the `rdbms' pool.
-spec sql_call0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_call0(HostType, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Request = {sql_cmd, Msg, SentAt},
    mongoose_wpool:call(rdbms, HostType, Request).
346
%% Request flavour of sql_call/2. When no state is stored in the process
%% dictionary the command goes to sql_request0/2; otherwise it is run in
%% place as a nested operation and the updated state is stored back.
-spec sql_request(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_request(HostType, Msg) ->
    sql_request_with_state(get_state(), HostType, Msg).

sql_request_with_state(undefined, HostType, Msg) ->
    sql_request0(HostType, Msg);
sql_request_with_state(State, _HostType, Msg) ->
    {Res, NewState} = nested_op(Msg, State),
    put_state(NewState),
    Res.
356
%% Wrap the command with the current monotonic time in milliseconds and
%% pass it to mongoose_wpool:send_request/3 for the `rdbms' pool.
-spec sql_request0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_request0(HostType, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Request = {sql_cmd, Msg, SentAt},
    mongoose_wpool:send_request(rdbms, HostType, Request).
361
%% Cast flavour of sql_call/2. When no state is stored in the process
%% dictionary the command goes to sql_cast0/2; otherwise it is run in place
%% as a nested operation and the updated state is stored back.
-spec sql_cast(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_cast(HostType, Msg) ->
    sql_cast_with_state(get_state(), HostType, Msg).

sql_cast_with_state(undefined, HostType, Msg) ->
    sql_cast0(HostType, Msg);
sql_cast_with_state(State, _HostType, Msg) ->
    {Res, NewState} = nested_op(Msg, State),
    put_state(NewState),
    Res.
371
%% Wrap the command with the current monotonic time in milliseconds and
%% pass it to mongoose_wpool:cast/3 for the `rdbms' pool.
-spec sql_cast0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_cast0(HostType, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Request = {sql_cmd, Msg, SentAt},
    mongoose_wpool:cast(rdbms, HostType, Request).
376
%% @doc Ask a worker for its database info.
%% A pid is called directly through wpool_process:call/3 with a timeout of
%% 5000; any other target is treated as a host type and goes through
%% mongoose_wpool:call/3 for the `rdbms' pool.
-spec get_db_info(Target :: server() | pid()) ->
    {ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
get_db_info(Worker) when is_pid(Worker) ->
    wpool_process:call(Worker, get_db_info, 5000);
get_db_info(HostType) ->
    mongoose_wpool:call(rdbms, HostType, get_db_info).
383
%% Intended to be called from inside an sql_transaction: runs `Query' with
%% the state currently stored in the process dictionary.
sql_query_t(Query) ->
    CurrentState = get_state(),
    sql_query_t(Query, CurrentState).
387
%% Run `Query' and turn error results into a throw of
%% `{aborted, #{reason => Reason, sql_query => Query}}'. For a list of
%% results only the first element tagged `error' is inspected, and it aborts
%% only when it is a two-element `{error, Reason}' tuple. Every other result
%% is returned unchanged.
sql_query_t(Query, State) ->
    Abort = fun(Reason) ->
                    throw({aborted, #{reason => Reason, sql_query => Query}})
            end,
    case sql_query_internal(Query, State) of
        {error, Reason} ->
            Abort(Reason);
        Results when is_list(Results) ->
            case lists:keyfind(error, 1, Results) of
                {error, Reason} -> Abort(Reason);
                _ -> Results
            end;
        Other ->
            Other
    end.
403
%% Only for binaries. Escapes just the content; '%' has to be added
%% afterwards. The escape character is '$', as '\' causes issues in
%% PostgreSQL.
%% The returned value is NOT safe to use outside of a prepared statement.
-spec escape_prepared_like(binary()) -> binary().
escape_prepared_like(S) ->
    Escaped = [escape_prepared_like_character(Byte) || <<Byte>> <= S],
    iolist_to_binary(Escaped).
410
%% @doc Escape text for a LIKE condition that matches anywhere in a value.
%% The escaped text is wrapped as '%...%', quotes included. Percent and
%% underscore only need escaping for pattern matching in LIKE statements.
%% INFO: Used in mod_vcard_rdbms.
%% Searches in the middle of text, so it is not efficient.
-spec escape_like(binary() | string()) -> escaped_like().
escape_like(S) ->
    Escaped = escape_like_internal(S),
    {escaped_like, [$', $%, Escaped, $%, $']}.
419
%% @doc Escape text for a LIKE condition that matches a prefix: the escaped
%% text is wrapped as '...%', quotes included.
-spec escape_like_prefix(binary() | string()) -> escaped_like().
escape_like_prefix(S) ->
    Escaped = escape_like_internal(S),
    {escaped_like, [$', Escaped, $%, $']}.
423
%% @doc Escape a blob with mongoose_rdbms_backend:escape_binary/1 and tag
%% the result. The host type argument is not used.
-spec escape_binary(server(), binary()) -> escaped_binary().
escape_binary(_HostType, Bin) when is_binary(Bin) ->
    Escaped = mongoose_rdbms_backend:escape_binary(Bin),
    {escaped_binary, Escaped}.
427
%% @doc Escape a string and tag the result. The escaped value already
%% includes the surrounding ''.
-spec escape_string(binary() | string()) -> escaped_string().
escape_string(S) ->
    Escaped = escape_string_internal(S),
    {escaped_string, Escaped}.
432
%% @doc Render an integer as a decimal binary and tag the result.
-spec escape_integer(integer()) -> escaped_integer().
escape_integer(Int) when is_integer(Int) ->
    Digits = erlang:integer_to_binary(Int),
    {escaped_integer, Digits}.
436
%% Be aware that we can't just use escaped_integer here, because pgsql
%% answers with the error:
%% column \"match_all\" is of type boolean but expression is of type integer
%% Booleans are therefore rendered as the quoted strings '1' and '0'.
-spec escape_boolean(boolean()) -> escaped_boolean().
escape_boolean(true) ->
    {escaped_boolean, [$', $1, $']};
escape_boolean(false) ->
    {escaped_boolean, [$', $0, $']}.
445
%% @doc Tagged SQL `null' literal.
-spec escape_null() -> escaped_null().
escape_null() ->
    {escaped_null, [$n, $u, $l, $l]}.
449
450
%% @doc SQL-injection check.
%% Call this function just before using a value from escape_string/1 inside
%% a query. Anything that is not tagged `escaped_string' is logged and
%% raises `{use_escaped_string, Value}'.
-spec use_escaped_string(escaped_string()) -> sql_query_part().
use_escaped_string({escaped_string, Part}) ->
    Part;
use_escaped_string(Untagged) ->
    %% The error is logged here because some callers have catches so wide
    %% that the raised error would otherwise go unnoticed.
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Untagged,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    error({use_escaped_string, Untagged}).
462
%% @doc Unwrap a value produced by escape_binary/2. Anything that is not
%% tagged `escaped_binary' is logged and raises
%% `{use_escaped_binary, Value}'.
-spec use_escaped_binary(escaped_binary()) -> sql_query_part().
use_escaped_binary({escaped_binary, Part}) ->
    Part;
use_escaped_binary(Untagged) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Untagged,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    error({use_escaped_binary, Untagged}).
470
%% @doc Unwrap a value produced by escape_like/1 or escape_like_prefix/1.
%% Anything that is not tagged `escaped_like' is logged and raises
%% `{use_escaped_like, Value}'.
-spec use_escaped_like(escaped_like()) -> sql_query_part().
use_escaped_like({escaped_like, Part}) ->
    Part;
use_escaped_like(Untagged) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Untagged,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    error({use_escaped_like, Untagged}).
478
%% @doc Unwrap a value produced by escape_integer/1. Anything that is not
%% tagged `escaped_integer' is logged and raises
%% `{use_escaped_integer, Value}'.
-spec use_escaped_integer(escaped_integer()) -> sql_query_part().
use_escaped_integer({escaped_integer, Part}) ->
    Part;
use_escaped_integer(Untagged) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Untagged,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    error({use_escaped_integer, Untagged}).
486
%% @doc Unwrap a value produced by escape_boolean/1. Anything that is not
%% tagged `escaped_boolean' is logged and raises
%% `{use_escaped_boolean, Value}'.
-spec use_escaped_boolean(escaped_boolean()) -> sql_query_part().
use_escaped_boolean({escaped_boolean, Part}) ->
    Part;
use_escaped_boolean(Untagged) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Untagged,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    error({use_escaped_boolean, Untagged}).
494
%% @doc Unwrap a value produced by escape_null/0. Anything that is not
%% tagged `escaped_null' is logged and raises `{use_escaped_null, Value}'.
-spec use_escaped_null(escaped_null()) -> sql_query_part().
use_escaped_null({escaped_null, Part}) ->
    Part;
use_escaped_null(Untagged) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Untagged,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    error({use_escaped_null, Untagged}).
502
%% Use this function if the type of the escaped value is unknown.
%% It unwraps values tagged escaped_string, escaped_binary, escaped_integer,
%% escaped_boolean or escaped_null. Be aware that escaped_like() is NOT
%% accepted here: it is logged and raises `{use_escaped, Value}', like any
%% other unsupported term.
-spec use_escaped(Value) -> sql_query_part() when
      Value :: escaped_value().
use_escaped({Kind, Part}) when Kind =:= escaped_string;
                               Kind =:= escaped_binary;
                               Kind =:= escaped_integer;
                               Kind =:= escaped_boolean;
                               Kind =:= escaped_null ->
    Part;
use_escaped(Unsupported) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Unsupported,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    error({use_escaped, Unsupported}).
521
%% Prefix '%', '_' and '$' with the escape character '$'; every other
%% character is emitted as a single byte.
-spec escape_prepared_like_character(char()) -> binary().
escape_prepared_like_character(C) when C =:= $%; C =:= $_; C =:= $$ ->
    <<$$, C>>;
escape_prepared_like_character(C) ->
    <<C>>.
527
%% Escape every character of the input with escape_like_character/1.
%% A binary is processed byte by byte and yields a binary; a list yields a
%% list of the per-character results.
-spec escape_like_internal(binary() | string()) -> binary() | string().
escape_like_internal(Bin) when is_binary(Bin) ->
    list_to_binary([escape_like_character(Byte) || <<Byte>> <= Bin]);
escape_like_internal(Chars) when is_list(Chars) ->
    lists:map(fun escape_like_character/1, Chars).
533
%% Escape a string. When mongoose_backend reports escape_string/1 as
%% exported for this module, escaping is delegated to
%% mongoose_rdbms_backend:escape_string/1; otherwise the generic escaping is
%% applied and the result is wrapped in single quotes.
escape_string_internal(S) ->
    BackendEscapes = mongoose_backend:is_exported(global, ?MODULE, escape_string, 1),
    case BackendEscapes of
        false ->
            %% generic escaping
            [$', escape_characters(S), $'];
        true ->
            mongoose_rdbms_backend:escape_string(S)
    end.
542
%% Escape every character of the input with escape_character/1.
%% A binary is processed byte by byte and yields a binary; a list yields a
%% list of the per-character results.
escape_characters(Bin) when is_binary(Bin) ->
    list_to_binary([escape_character(Byte) || <<Byte>> <= Bin]);
escape_characters(Chars) when is_list(Chars) ->
    lists:map(fun escape_character/1, Chars).
547
%% Escape one character for a LIKE pattern: '%' and '_' get a backslash in
%% front, everything else goes through escape_character/1.
escape_like_character(C) when C =:= $%; C =:= $_ ->
    [$\\, C];
escape_like_character(C) ->
    escape_character(C).
551
%% Characters to escape.
%% NUL, newline, tab, backspace, carriage return, double quote and backslash
%% become backslash sequences; a single quote is doubled; any other
%% character is returned unchanged.
escape_character(C) ->
    case C of
        $\0 -> "\\0";
        $\n -> "\\n";
        $\t -> "\\t";
        $\b -> "\\b";
        $\r -> "\\r";
        $'  -> "''";
        $"  -> "\\\"";
        $\\ -> "\\\\";
        _   -> C
    end.
562
563
%% @doc Decode a blob with mongoose_rdbms_backend:unescape_binary/1.
%% The host type argument is not used.
-spec unescape_binary(server(), binary()) -> binary().
unescape_binary(_HostType, Escaped) when is_binary(Escaped) ->
    mongoose_rdbms_backend:unescape_binary(Escaped).
567
568
%% @doc Convert a value to an integer: a binary is parsed with
%% binary_to_integer/1, an integer is returned as it is.
-spec result_to_integer(binary() | integer()) -> integer().
result_to_integer(Bin) when is_binary(Bin) ->
    erlang:binary_to_integer(Bin);
result_to_integer(Int) when is_integer(Int) ->
    Int.
574
%% Extract the integer from a `selected' result that consists of exactly
%% one row with exactly one column.
selected_to_integer({selected, [{Value}]}) ->
    result_to_integer(Value).
577
%% Converts a value from a CHAR(1) field to an integer: a one-byte binary
%% yields its byte, an integer is returned as it is.
character_to_integer(Int) when is_integer(Int) ->
    Int;
character_to_integer(<<Byte>>) ->
    Byte.
581
%% pgsql returns booleans as "t" or "f".
%% Binaries are converted to lists first. The values "t", "true", "1",
%% `true' and 1 map to `true'; everything else maps to `false'.
-spec to_bool(binary() | string() | atom() | integer() | any()) -> boolean().
to_bool(Bin) when is_binary(Bin) ->
    to_bool(binary_to_list(Bin));
to_bool(Value) ->
    lists:member(Value, ["t", "true", "1", true, 1]).
592
593 %%%----------------------------------------------------------------------
594 %%% Callback functions from gen_server
595 %%%----------------------------------------------------------------------
%% @doc gen_server init callback: open the database connection.
%% Traps exits, reads the optional `keepalive_interval' from the options and
%% connects with up to ?CONNECT_RETRIES retries. On success the first
%% keepalive is scheduled and the initial state is returned. When the
%% connection cannot be established the callback returns `{stop, Error}',
%% which is why the spec lists that return as well.
-spec init(options()) -> {ok, state()} | {stop, term()}.
init(Opts = #{max_start_interval := MaxStartInterval}) ->
    process_flag(trap_exit, true),
    KeepaliveInterval = maps:get(keepalive_interval, Opts, undefined),
    %% Each retry sleeps a random number of seconds up to a bound that
    %% starts at 2, grows after every failed attempt and is capped by
    %% max_start_interval.
    case connect(Opts, ?CONNECT_RETRIES, 2, MaxStartInterval) of
        {ok, DbRef} ->
            schedule_keepalive(KeepaliveInterval),
            {ok, #state{db_ref = DbRef, keepalive_interval = KeepaliveInterval}};
        Error ->
            {stop, Error}
    end.
609
610
%% gen_server call handler.
%% - `{sql_cmd, Command, Timestamp}': run the command and reply with its
%%   result; stop the worker (still replying) when the result is one of the
%%   driver errors recognised by abort_on_driver_error/1.
%% - `get_db_info': reply with the db engine and the connection reference.
%% - anything else: reported as unexpected, answered with `{error, badarg}'.
handle_call({sql_cmd, Command, Timestamp}, From, State) ->
    {Reply, StateAfter} = run_sql_cmd(Command, From, State, Timestamp),
    Verdict = abort_on_driver_error(Reply),
    case Verdict of
        continue -> {reply, Reply, StateAfter};
        {stop, Reason} -> {stop, Reason, Reply, StateAfter}
    end;
handle_call(get_db_info, _From, #state{db_ref = DbRef} = State) ->
    Info = {ok, db_engine(global), DbRef},
    {reply, Info, State};
handle_call(Request, From, State) ->
    ?UNEXPECTED_CALL(Request, From),
    {reply, {error, badarg}, State}.
622
%% gen_server cast handler.
%% `{sql_cmd, Command, Timestamp}' runs the command without a caller; the
%% worker stops when the result is one of the driver errors recognised by
%% abort_on_driver_error/1. Anything else is reported as unexpected.
handle_cast({sql_cmd, Command, Timestamp}, State) ->
    {Outcome, StateAfter} = run_sql_cmd(Command, undefined, State, Timestamp),
    Verdict = abort_on_driver_error(Outcome),
    case Verdict of
        continue -> {noreply, StateAfter};
        {stop, Reason} -> {stop, Reason, StateAfter}
    end;
handle_cast(Request, State) ->
    ?UNEXPECTED_CAST(Request),
    {noreply, State}.
632
%% gen_server code upgrade callback: the state is kept as it is.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
635
%% gen_server info handler.
%% - `keepalive': run the keepalive query; reschedule when it returns
%%   `{selected, _}', stop with `{keepalive_failed, Error}' when it returns
%%   an error tuple.
%% - `{'EXIT', Pid, Reason}': stop, using the whole message as stop reason.
%% - anything else: reported as unexpected.
handle_info(keepalive, #state{keepalive_interval = Interval} = State) ->
    case sql_query_internal([?KEEPALIVE_QUERY], State) of
        {error, _} = Error ->
            {stop, {keepalive_failed, Error}, State};
        {selected, _} ->
            schedule_keepalive(Interval),
            {noreply, State}
    end;
handle_info({'EXIT', _Pid, _Reason} = ExitMsg, State) ->
    {stop, ExitMsg, State};
handle_info(Info, State) ->
    ?UNEXPECTED_INFO(Info),
    {noreply, State}.
649
%% gen_server terminate callback: disconnect from the database. The call is
%% wrapped in `catch', so a failing disconnect does not crash the shutdown.
-spec terminate(Reason :: term(), state()) -> any().
terminate(_Reason, #state{db_ref = Connection}) ->
    catch mongoose_rdbms_backend:disconnect(Connection).
653
654 %%%----------------------------------------------------------------------
655 %%% Internal functions
656 %%%----------------------------------------------------------------------
657
%% Run a queued SQL command unless it waited too long.
%% `Timestamp' is the monotonic time (ms) at which the request was issued.
%% Requests younger than ?TRANSACTION_TIMEOUT are executed with outer_op/2;
%% older ones are logged and answered with `{error, timeout}'.
%% The result is always a `{Result, State}' pair: both handle_call/3 and
%% handle_cast/2 match on that shape, so returning a gen_server-style
%% `{reply, _, _}' triple here would crash the worker with a badmatch.
-spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) ->
    {Result :: term(), state()}.
run_sql_cmd(Command, _From, State, Timestamp) ->
    Now = erlang:monotonic_time(millisecond),
    case Now - Timestamp of
        Age when Age < ?TRANSACTION_TIMEOUT ->
            outer_op(Command, State);
        Age ->
            ?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow,
                         text => <<"Discarding request">>, age => Age, command => Command}),
            {{error, timeout}, State}
    end.
670
%% @doc Dispatch a top-level operation; reached from the gen_server
%% callbacks through run_sql_cmd/4. Every clause yields a
%% `{Result, State}' pair.
-spec outer_op(rdbms_msg(), state()) -> {query_result()
                                         | transaction_result()
                                         | dirty_result(), state()}.
outer_op({sql_query, Query}, State) ->
    QueryResult = sql_query_internal(Query, State),
    {QueryResult, State};
outer_op({sql_transaction, F}, State) ->
    outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "", State);
outer_op({sql_dirty, F}, State) ->
    sql_dirty_internal(F, State);
outer_op({sql_execute, Name, Params}, State) ->
    sql_execute(Name, Params, State).
683
%% @doc Called via sql_query/sql_transaction/execute from client code when
%% already inside a nested operation, i.e. when a state is stored in the
%% process dictionary. Every clause returns a `{Result, State}' pair, because
%% sql_call/2, sql_request/2 and sql_cast/2 match on that shape and store the
%% second element back as the connection state.
-spec nested_op(rdbms_msg(), state()) -> {term(), state()}.
nested_op({sql_query, Query}, State) ->
    %% XXX - use sql_query_t here instead? Most likely would break
    %% callers who expect {error, _} tuples (sql_query_t turns
    %% these into throws)
    {sql_query_internal(Query, State), State};
nested_op({sql_transaction, F}, State) ->
    %% Transaction inside a transaction.
    %% inner_transaction/2 returns only the transaction result, so it has to
    %% be paired with a state here. Operations run by F store their updated
    %% state in the process dictionary, hence the state is read back after F
    %% has finished.
    Result = inner_transaction(F, State),
    {Result, get_state()};
nested_op({sql_execute, Name, Params}, State) ->
    sql_execute(Name, Params, State).
697
%% @doc Run a nested transaction body exactly once - nested transactions are
%% never retried, only outer ones are. `{aborted, _}', `{'EXIT', _}' and
%% `{atomic, _}' outcomes (returned or caught) are passed through; any other
%% value is wrapped as `{atomic, Value}'. The state argument is not used.
-spec inner_transaction(fun(), state()) -> transaction_result() | {'EXIT', any()}.
inner_transaction(F, _State) ->
    case catch F() of
        {aborted, _} = Aborted ->
            Aborted;
        {'EXIT', _} = Exit ->
            Exit;
        {atomic, _} = Atomic ->
            Atomic;
        Value ->
            {atomic, Value}
    end.
711
%% Run a top-level transaction: begin, execute `F' with the state stored in
%% the process dictionary, then commit or roll back depending on the outcome.
%% Aborted transactions are retried while `NRestarts' is positive.
-spec outer_transaction(F :: fun(),
                        NRestarts :: 0..10,
                        Reason :: any(), state()) -> {transaction_result(), state()}.
outer_transaction(F, NRestarts, _Reason, State) ->
    sql_query_internal(rdbms_queries:begin_trans(), State),
    put_state(State),
    {Outcome, StackTrace} = apply_transaction_function(F),
    StateAfter = erase_state(),
    finish_outer_transaction(Outcome, StackTrace, F, NRestarts, StateAfter).

%% Decide what to do with the outcome of the transaction body.
finish_outer_transaction({aborted, Reason}, _StackTrace, F, NRestarts, State)
  when NRestarts > 0 ->
    %% Retry outer transaction up to NRestarts times.
    sql_query_internal([<<"rollback;">>], State),
    outer_transaction(F, NRestarts - 1, Reason, State);
finish_outer_transaction({aborted, #{reason := Reason, sql_query := SqlQuery}},
                         StackTrace, _F, 0, State) ->
    %% Too many retries of outer transaction.
    ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
                 restarts => ?MAX_TRANSACTION_RESTARTS, last_abort_reason => Reason,
                 last_sql_query => iolist_to_binary(SqlQuery),
                 stacktrace => StackTrace, state => State}),
    sql_query_internal([<<"rollback;">>], State),
    {{aborted, Reason}, State};
finish_outer_transaction({aborted, Reason}, StackTrace, _F, 0, State) ->
    %% Old format for abort. Too many retries of outer transaction.
    ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
                 restarts => ?MAX_TRANSACTION_RESTARTS,
                 last_abort_reason => Reason, stacktrace => StackTrace,
                 state => State}),
    sql_query_internal([<<"rollback;">>], State),
    {{aborted, Reason}, State};
finish_outer_transaction({'EXIT', Reason}, _StackTrace, _F, _NRestarts, State) ->
    %% Abort sql transaction on EXIT from outer txn only.
    sql_query_internal([<<"rollback;">>], State),
    {{aborted, Reason}, State};
finish_outer_transaction(Res, _StackTrace, _F, _NRestarts, State) ->
    %% Commit successful outer txn
    sql_query_internal([<<"commit;">>], State),
    {{atomic, Res}, State}.
751
%% Run the transaction body and capture how it ended.
%% - normal return: `{Value, []}'
%% - throw: `{ThrownTerm, StackTrace}'
%% - error or exit: logged and reported as `{{'EXIT', Reason}, StackTrace}'
-spec apply_transaction_function(F :: fun()) -> {any(), list()}.
apply_transaction_function(F) ->
    try F() of
        Value ->
            {Value, []}
    catch
        throw:Thrown:StackTrace ->
            {Thrown, StackTrace};
        Class:Reason:StackTrace ->
            ?LOG_ERROR(#{what => rdbms_outer_transaction_failed, class => Class,
                         reason => Reason, stacktrace => StackTrace}),
            {{'EXIT', Reason}, StackTrace}
    end.
764
%% Run a query on the connection held in the state, with ?QUERY_TIMEOUT.
%% The error "No SQL-driver information available." is reported as
%% `{updated, 0}' (workaround for an odbc bug); every other result is
%% returned unchanged.
sql_query_internal(Query, #state{db_ref = Connection}) ->
    Answer = mongoose_rdbms_backend:query(Connection, Query, ?QUERY_TIMEOUT),
    case Answer of
        {error, "No SQL-driver information available."} ->
            {updated, 0}; %% workaround for odbc bug
        _ ->
            Answer
    end.
772
%% Run `F' outside of a transaction, with the state stored in the process
%% dictionary for the duration of the call. The value of `F' is wrapped as
%% `{ok, Value}'; an exception of any class is turned into `{error, Reason}'.
%% The state is erased from the process dictionary afterwards and returned
%% next to the result.
sql_dirty_internal(F, State) ->
    put_state(State),
    Outcome =
        try
            {ok, F()}
        catch
            _Class:Reason ->
                {error, Reason}
        end,
    {Outcome, erase_state()}.
784
%% Execute the prepared statement `Name' with `Params'.
%% The statement is prepared on this connection first if necessary. The
%% returned state is the one from prepare_statement/2. An exception raised by
%% the backend is logged and re-raised with its original class and
%% stacktrace.
-spec sql_execute(Name :: atom(), Params :: [term()], state()) -> {query_result(), state()}.
sql_execute(Name, Params, State = #state{db_ref = Connection}) ->
    {StatementRef, StateAfter} = prepare_statement(Name, State),
    try mongoose_rdbms_backend:execute(Connection, StatementRef, Params, ?QUERY_TIMEOUT) of
        Outcome ->
            {Outcome, StateAfter}
    catch
        Class:Reason:StackTrace ->
            ?LOG_ERROR(#{what => rdbms_sql_execute_failed, statement_name => Name,
                         class => Class, reason => Reason, params => Params,
                         stacktrace => StackTrace}),
            erlang:raise(Class, Reason, StackTrace)
    end.
796
%% Return the backend reference of the statement `Name' for this connection.
%% References are cached in the `prepared' map of the state. On a cache miss
%% the definition is looked up, prepared through the backend and added to the
%% cache.
-spec prepare_statement(Name :: atom(), state()) -> {Ref :: term(), state()}.
prepare_statement(Name, State = #state{db_ref = Connection, prepared = Cache}) ->
    case maps:get(Name, Cache, undefined) of
        undefined ->
            {_, Table, Fields, Statement} = lookup_statement(Name),
            {ok, NewRef} =
                mongoose_rdbms_backend:prepare(Connection, Name, Table, Fields, Statement),
            {NewRef, State#state{prepared = Cache#{Name => NewRef}}};
        CachedRef ->
            {CachedRef, State}
    end.
808
%% Fetch the definition of the statement `Name' from the
%% `prepared_statements' ETS table; raises
%% `{lookup_statement_failed, Name}' when there is no entry.
lookup_statement(Name) ->
    Found = ets:lookup(prepared_statements, Name),
    case Found of
        [] -> error({lookup_statement_failed, Name});
        [Definition] -> Definition
    end.
814
%% Decide whether a result means the worker has to stop. Two mysql driver
%% errors are recognised: "query timed out" stops with `timeout', and a
%% message starting with "Failed sending data on socket" stops with
%% `closed'. Everything else lets the worker continue.
-spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}.
abort_on_driver_error(Result) ->
    case Result of
        {error, "query timed out"} ->
            {stop, timeout};
        {error, "Failed sending data on socket" ++ _} ->
            {stop, closed};
        _ ->
            continue
    end.
822
%% @doc Name of the backend registered for this module under `global', or
%% `undefined' when the lookup fails with `badarg'. The host type argument
%% is not used.
-spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined.
db_engine(_HostType) ->
    try
        mongoose_backend:get_backend_name(global, ?MODULE)
    catch
        error:badarg ->
            undefined
    end.
827
%% @doc Used to optimise queries for different databases: `mssql' when the
%% `rdbms_server_type' option is `mssql', `generic' otherwise.
%% @todo Should be refactored to use host types with this module.
%% Also, this parameter should not be global, but pool-name parameterized
-spec db_type() -> mssql | generic.
db_type() ->
    ServerType = mongoose_config:get_opt(rdbms_server_type),
    case ServerType of
        mssql -> mssql;
        _ -> generic
    end.
837
%% Connect to the database, retrying on failure.
%% `Retry' is the number of retries left. `RetryAfter' is the current upper
%% bound, in seconds, of the randomised sleep before the next attempt.
%% The bound doubles after every failed attempt and is capped by
%% `MaxRetryDelay', which gives the exponential sequence described in
%% init/1 (2, 4, 8, 16, ...). Squaring the bound instead would jump from
%% 4 to 16 to 256.
%% Returns `{ok, DbRef}', or the last error once the retries run out.
-spec connect(options(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
              MaxRetryDelay :: non_neg_integer()) -> {ok, term()} | {error, any()}.
connect(Options, Retry, RetryAfter, MaxRetryDelay) ->
    case mongoose_rdbms_backend:connect(Options, ?QUERY_TIMEOUT) of
        {ok, _} = Ok ->
            Ok;
        Error when Retry =:= 0 ->
            Error;
        Error ->
            SleepFor = rand:uniform(RetryAfter),
            Backend = mongoose_backend:get_backend_name(global, ?MODULE),
            ?LOG_ERROR(#{what => rdbms_connection_attempt_error, backend => Backend,
                         error => Error, sleep_for => SleepFor}),
            timer:sleep(timer:seconds(SleepFor)),
            NextRetryDelay = RetryAfter * 2,
            connect(Options, Retry - 1, min(MaxRetryDelay, NextRetryDelay), MaxRetryDelay)
    end.
855
856
%% Arrange for a `keepalive' message to be sent to this process after
%% `KeepaliveInterval' seconds. `undefined' schedules nothing; any other
%% non-integer value is logged and ignored.
-spec schedule_keepalive(KeepaliveInterval :: undefined | pos_integer()) -> any().
schedule_keepalive(KeepaliveInterval) when is_integer(KeepaliveInterval) ->
    Millis = timer:seconds(KeepaliveInterval),
    erlang:send_after(Millis, self(), keepalive);
schedule_keepalive(undefined) ->
    ok;
schedule_keepalive(Other) ->
    ?LOG_ERROR(#{what => rdbms_wrong_keepalive_interval, reason => Other}),
    ok.
868
869 %% ----- process state access, for convenient tracing
870
%% Store the connection state in the process dictionary under ?STATE_KEY.
%% Returns whatever put/2 returns, i.e. the previously stored value.
put_state(NewState) ->
    erlang:put(?STATE_KEY, NewState).
873
%% Remove the connection state from the process dictionary and return it.
erase_state() ->
    erlang:erase(?STATE_KEY).
876
%% Read the connection state from the process dictionary; `undefined' when
%% nothing is stored.
get_state() ->
    erlang:get(?STATE_KEY).
Line Hits Source