./ct_report/coverage/mongoose_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mongoose_rdbms.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Serve RDBMS connection
5 %%% Created : 8 Dec 2004 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%% Copyright 2016 Erlang Solutions Ltd.
10 %%%
11 %%% This program is free software; you can redistribute it and/or
12 %%% modify it under the terms of the GNU General Public License as
13 %%% published by the Free Software Foundation; either version 2 of the
14 %%% License, or (at your option) any later version.
15 %%%
16 %%% This program is distributed in the hope that it will be useful,
17 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
18 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 %%% General Public License for more details.
20 %%%
21 %%% You should have received a copy of the GNU General Public License
22 %%% along with this program; if not, write to the Free Software
23 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 %%%
25 %%%----------------------------------------------------------------------
26
27 -module(mongoose_rdbms).
28 -author('alexey@process-one.net').
29 -author('konrad.zemek@erlang-solutions.com').
30
31 -behaviour(gen_server).
32
33 %% Part of SQL query string, produced by use_escaped/1 function
34 -type sql_query_part() :: iodata().
35 -type sql_query() :: iodata().
36
37 %% Blob data type to be used inside SQL queries
38 -opaque escaped_binary() :: {escaped_binary, sql_query_part()}.
39 %% Unicode string to be used inside SQL queries
40 -opaque escaped_string() :: {escaped_string, sql_query_part()}.
41 %% Unicode string to be used inside LIKE conditions
42 -opaque escaped_like() :: {escaped_like, sql_query_part()}.
43 -opaque escaped_integer() :: {escaped_integer, sql_query_part()}.
44 -opaque escaped_boolean() :: {escaped_boolean, sql_query_part()}.
45 -opaque escaped_null() :: {escaped_null, sql_query_part()}.
46 -opaque escaped_value() :: escaped_string() | escaped_binary() | escaped_integer() |
47 escaped_boolean() | escaped_null().
48
49 -ifdef(gen_server_request_id).
50 -type request_id() :: gen_server:request_id().
51 -else.
52 -type request_id() :: term().
53 -endif.
54
55 -export_type([escaped_binary/0,
56 escaped_string/0,
57 escaped_like/0,
58 escaped_integer/0,
59 escaped_boolean/0,
60 escaped_null/0,
61 escaped_value/0,
62 sql_query/0,
63 sql_query_part/0]).
64
65 -export([process_options/1]).
66
67 %% External exports
68 -export([prepare/4,
69 prepared/1,
70 execute/3,
71 execute_cast/3,
72 execute_request/3,
73 execute_wrapped_request/4,
74 execute_successfully/3,
75 sql_query/2,
76 sql_query_cast/2,
77 sql_query_request/2,
78 sql_query_t/1,
79 sql_transaction/2,
80 sql_transaction_request/2,
81 transaction_with_delayed_retry/3,
82 sql_dirty/2,
83 to_bool/1,
84 db_engine/1,
85 db_type/0,
86 use_escaped/1]).
87
88 %% Unicode escaping
89 -export([escape_string/1,
90 use_escaped_string/1]).
91
92 %% Integer escaping
93 -export([escape_integer/1,
94 use_escaped_integer/1]).
95
96 %% Boolean escaping
97 -export([escape_boolean/1,
98 use_escaped_boolean/1]).
99
100 %% LIKE escaping
101 -export([escape_like/1,
102 escape_prepared_like/1,
103 escape_like_prefix/1,
104 use_escaped_like/1]).
105
106 %% BLOB escaping
107 -export([escape_binary/2,
108 unescape_binary/2,
109 use_escaped_binary/1]).
110
111 %% Null escaping
112 %% (to keep uniform pattern of passing values)
113 -export([escape_null/0,
114 use_escaped_null/1]).
115
116 %% count / integra types decoding
117 -export([result_to_integer/1,
118 selected_to_integer/1]).
119
120 -export([character_to_integer/1]).
121
122 %% gen_server callbacks
123 -export([init/1,
124 handle_call/3,
125 handle_cast/2,
126 handle_info/2,
127 terminate/2,
128 code_change/3]).
129
130 -ignore_xref([sql_query_cast/2, sql_query_request/2,
131 execute_cast/3, execute_request/3,
132 execute_wrapped_request/4,
133 sql_transaction_request/2,
134 sql_query_t/1, use_escaped/1,
135 escape_like/1, escape_like_prefix/1, use_escaped_like/1,
136 escape_binary/2, use_escaped_binary/1,
137 escape_integer/1, use_escaped_integer/1,
138 escape_string/1, use_escaped_string/1,
139 escape_boolean/1, use_escaped_boolean/1,
140 escape_null/0, use_escaped_null/1
141 ]).
142
143 %% internal usage
144 -export([get_db_info/1]).
145
146 -include("mongoose.hrl").
147
148 -record(state, {db_ref,
149 prepared = #{} :: #{binary() => term()},
150 keepalive_interval :: undefined | pos_integer(),
151 query_timeout :: pos_integer()
152 }).
153 -type state() :: #state{}.
154
155 -define(STATE_KEY, mongoose_rdbms_state).
156 -define(MAX_TRANSACTION_RESTARTS, 10).
157 -define(TRANSACTION_TIMEOUT, 60000). % milliseconds
158 -define(KEEPALIVE_QUERY, <<"SELECT 1;">>).
159 %% The value is arbitrary; supervisor will restart the connection once
160 %% the retry counter runs out. We just attempt to reduce log pollution.
161 -define(CONNECT_RETRIES, 5).
162
163 -type server() :: mongooseim:host_type() | global.
164 -type request_wrapper() :: fun((fun(() -> T)) -> T).
165 -type rdbms_msg() :: {sql_query, _}
166 | {sql_transaction, fun()}
167 | {sql_dirty, fun()}
168 | {sql_execute, atom(), [binary() | boolean() | integer()]}
169 | {sql_execute_wrapped, atom(), [binary() | boolean() | integer()], request_wrapper()}.
170 -type single_query_result() :: {selected, [tuple()]} |
171 {updated, non_neg_integer() | undefined} |
172 {updated, non_neg_integer(), [tuple()]} |
173 {aborted, Reason :: term()} |
174 {error, Reason :: string() | duplicate_key}.
175 -type query_result() :: single_query_result() | [single_query_result()].
176 -type transaction_result() :: {aborted, _} | {atomic, _} | {error, _}.
177 -type dirty_result() :: {ok, any()} | {error, any()}.
178 -export_type([query_result/0,
179 transaction_result/0,
180 server/0]).
181
182 -type options() :: #{driver := pgsql | mysql | odbc,
183 max_start_interval := pos_integer(),
184 query_timeout := pos_integer(),
185 atom() => any()}.
186
187 -export_type([options/0]).
188
189 %%%----------------------------------------------------------------------
190 %%% API
191 %%%----------------------------------------------------------------------
192
193 -spec process_options(map()) -> options().
194 process_options(Opts = #{driver := odbc, settings := _}) ->
195 92 Opts;
196 process_options(Opts = #{host := _Host, database := _DB, username := _User, password := _Pass}) ->
197
:-(
ensure_db_port(process_tls_options(Opts));
198 process_options(Opts) ->
199
:-(
error(#{what => invalid_rdbms_connection_options, options => Opts}).
200
201 process_tls_options(Opts = #{driver := mysql, tls := #{required := _}}) ->
202
:-(
error(#{what => invalid_rdbms_tls_options, options => Opts,
203 text => <<"The 'required' option is not supported for MySQL">>});
204 process_tls_options(Opts = #{driver := pgsql, tls := TLSOpts}) ->
205
:-(
Opts#{tls := maps:merge(#{required => false}, TLSOpts)};
206 process_tls_options(Opts) ->
207
:-(
Opts.
208
209
:-(
ensure_db_port(Opts = #{port := _}) -> Opts;
210
:-(
ensure_db_port(Opts = #{driver := pgsql}) -> Opts#{port => 5432};
211
:-(
ensure_db_port(Opts = #{driver := mysql}) -> Opts#{port => 3306}.
212
213 -spec prepare(Name, Table :: binary() | atom(), Fields :: [binary() | atom()],
214 Statement :: iodata()) ->
215 {ok, Name} | {error, already_exists}
216 when Name :: atom().
217 prepare(Name, Table, Fields, Statement) when is_atom(Table) ->
218 24225 prepare(Name, atom_to_binary(Table, utf8), Fields, Statement);
219 prepare(Name, Table, [Field | _] = Fields, Statement) when is_atom(Field) ->
220 20883 prepare(Name, Table, [atom_to_binary(F, utf8) || F <- Fields], Statement);
221 prepare(Name, Table, Fields, Statement) when is_atom(Name), is_binary(Table) ->
222 24225 true = lists:all(fun is_binary/1, Fields),
223 24225 Tuple = {Name, Table, Fields, iolist_to_binary(Statement)},
224 24225 case ets:insert_new(prepared_statements, Tuple) of
225 5494 true -> {ok, Name};
226 18731 false -> {error, already_exists}
227 end.
228
229 -spec prepared(atom()) -> boolean().
230 prepared(Name) ->
231 5410 ets:member(prepared_statements, Name).
232
233 -spec execute(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
234 query_result().
235 execute(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
236 131229 sql_call(HostType, {sql_execute, Name, Parameters}).
237
238 -spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
239 query_result().
240 execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
241 1 sql_cast(HostType, {sql_execute, Name, Parameters}).
242
243 -spec execute_request(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
244 request_id().
245 execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
246 650 sql_request(HostType, {sql_execute, Name, Parameters}).
247
248 -spec execute_wrapped_request(
249 HostType :: server(),
250 Name :: atom(),
251 Parameters :: [term()],
252 Wrapper :: request_wrapper()) -> request_id().
253 execute_wrapped_request(HostType, Name, Parameters, Wrapper)
254 when is_atom(Name), is_list(Parameters), is_function(Wrapper) ->
255 13 sql_request(HostType, {sql_execute_wrapped, Name, Parameters, Wrapper}).
256
257 %% Same as execute/3, but would fail loudly on any error.
258 -spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
259 query_result().
260 execute_successfully(HostType, Name, Parameters) ->
261 102513 try execute(HostType, Name, Parameters) of
262 {selected, _} = Result ->
263 63381 Result;
264 {updated, _} = Result ->
265 38872 Result;
266 Other ->
267
:-(
Log = #{what => sql_execute_failed, host => HostType, statement_name => Name,
268 statement_query => query_name_to_string(Name),
269 statement_params => Parameters, reason => Other},
270
:-(
?LOG_ERROR(Log),
271
:-(
error(Log)
272 catch error:Reason:Stacktrace ->
273
:-(
Log = #{what => sql_execute_failed, host => HostType, statement_name => Name,
274 statement_query => query_name_to_string(Name),
275 statement_params => Parameters,
276 reason => Reason, stacktrace => Stacktrace},
277
:-(
?LOG_ERROR(Log),
278
:-(
erlang:raise(error, Reason, Stacktrace)
279 end.
280
281 query_name_to_string(Name) ->
282
:-(
case ets:lookup(prepared_statements, Name) of
283 [] ->
284
:-(
not_found;
285 [{_, _Table, _Fields, Statement}] ->
286
:-(
Statement
287 end.
288
289 -spec sql_query(HostType :: server(), Query :: any()) -> query_result().
290 sql_query(HostType, Query) ->
291 1330 sql_call(HostType, {sql_query, Query}).
292
293 -spec sql_query_request(HostType :: server(), Query :: any()) -> request_id().
294 sql_query_request(HostType, Query) ->
295 1 sql_request(HostType, {sql_query, Query}).
296
297 -spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result().
298 sql_query_cast(HostType, Query) ->
299 1 sql_cast(HostType, {sql_query, Query}).
300
301 %% @doc SQL transaction based on a list of queries
302 -spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result().
303 sql_transaction(HostType, Queries) when is_list(Queries) ->
304
:-(
F = fun() -> lists:map(fun sql_query_t/1, Queries) end,
305
:-(
sql_transaction(HostType, F);
306 %% SQL transaction, based on a erlang anonymous function (F = fun)
307 sql_transaction(HostType, F) when is_function(F) ->
308 16455 sql_call(HostType, {sql_transaction, F}).
309
310 %% @doc SQL transaction based on a list of queries
311 -spec sql_transaction_request(server(), fun() | maybe_improper_list()) -> request_id().
312 sql_transaction_request(HostType, Queries) when is_list(Queries) ->
313 1 F = fun() -> lists:map(fun sql_query_t/1, Queries) end,
314 1 sql_transaction_request(HostType, F);
315 %% SQL transaction, based on a erlang anonymous function (F = fun)
316 sql_transaction_request(HostType, F) when is_function(F) ->
317 1 sql_request(HostType, {sql_transaction, F}).
318
319 %% This function allows to specify delay between retries.
320 -spec transaction_with_delayed_retry(server(), fun() | maybe_improper_list(), map()) -> transaction_result().
321 transaction_with_delayed_retry(HostType, F, Info) ->
322 297 Retries = maps:get(retries, Info),
323 297 Delay = maps:get(delay, Info),
324 297 do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info).
325
326 do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) ->
327 297 Result = mongoose_rdbms:sql_transaction(HostType, F),
328 297 case Result of
329 {atomic, _} ->
330 297 Result;
331 {aborted, Reason} when Retries > 0 ->
332
:-(
?LOG_WARNING(Info#{what => rdbms_transaction_aborted,
333 text => <<"Transaction aborted. Restart">>,
334
:-(
reason => Reason, retries_left => Retries}),
335
:-(
timer:sleep(Delay),
336
:-(
do_transaction_with_delayed_retry(HostType, F, Retries - 1, Delay, Info);
337 _ ->
338
:-(
Err = Info#{what => mam_transaction_failed,
339 text => <<"Transaction failed. Do not restart">>,
340 reason => Result},
341
:-(
?LOG_ERROR(Err),
342
:-(
erlang:error(Err)
343 end.
344
345 -spec sql_dirty(server(), fun()) -> any() | no_return().
346 sql_dirty(HostType, F) when is_function(F) ->
347
:-(
case sql_call(HostType, {sql_dirty, F}) of
348
:-(
{ok, Result} -> Result;
349
:-(
{error, Error} -> throw(Error)
350 end.
351
352 %% TODO: Better spec for RPC calls
353 -spec sql_call(HostType :: server(), Msg :: rdbms_msg()) -> any().
354 sql_call(HostType, Msg) ->
355 149014 case get_state() of
356 111097 undefined -> sql_call0(HostType, Msg);
357 State ->
358 37917 {Res, NewState} = nested_op(Msg, State),
359 37649 put_state(NewState),
360 37649 Res
361 end.
362
363 -spec sql_call0(HostType :: server(), Msg :: rdbms_msg()) -> any().
364 sql_call0(HostType, Msg) ->
365 111097 Timestamp = erlang:monotonic_time(millisecond),
366 111097 mongoose_wpool:call(rdbms, HostType, {sql_cmd, Msg, Timestamp}).
367
368 -spec sql_request(HostType :: server(), Msg :: rdbms_msg()) -> any().
369 sql_request(HostType, Msg) ->
370 665 case get_state() of
371 654 undefined -> sql_request0(HostType, Msg);
372 State ->
373 11 {Res, NewState} = nested_op(Msg, State),
374
:-(
put_state(NewState),
375
:-(
Res
376 end.
377
378 -spec sql_request0(HostType :: server(), Msg :: rdbms_msg()) -> any().
379 sql_request0(HostType, Msg) ->
380 654 Timestamp = erlang:monotonic_time(millisecond),
381 654 mongoose_wpool:send_request(rdbms, HostType, {sql_cmd, Msg, Timestamp}).
382
383 -spec sql_cast(HostType :: server(), Msg :: rdbms_msg()) -> any().
384 sql_cast(HostType, Msg) ->
385 2 case get_state() of
386 2 undefined -> sql_cast0(HostType, Msg);
387 State ->
388
:-(
{Res, NewState} = nested_op(Msg, State),
389
:-(
put_state(NewState),
390
:-(
Res
391 end.
392
393 -spec sql_cast0(HostType :: server(), Msg :: rdbms_msg()) -> any().
394 sql_cast0(HostType, Msg) ->
395 2 Timestamp = erlang:monotonic_time(millisecond),
396 2 mongoose_wpool:cast(rdbms, HostType, {sql_cmd, Msg, Timestamp}).
397
398 -spec get_db_info(Target :: server() | pid()) ->
399 {ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
400 get_db_info(Pid) when is_pid(Pid) ->
401 650 wpool_process:call(Pid, get_db_info, 5000);
402 get_db_info(HostType) ->
403
:-(
mongoose_wpool:call(rdbms, HostType, get_db_info).
404
405 %% This function is intended to be used from inside an sql_transaction:
406 sql_query_t(Query) ->
407 2 sql_query_t(Query, get_state()).
408
409 sql_query_t(Query, State) ->
410 2 QRes = sql_query_internal(Query, State),
411 2 case QRes of
412 {error, Reason} ->
413
:-(
throw({aborted, #{reason => Reason, sql_query => Query}});
414 _ when is_list(QRes) ->
415
:-(
case lists:keysearch(error, 1, QRes) of
416 {value, {error, Reason}} ->
417
:-(
throw({aborted, #{reason => Reason, sql_query => Query}});
418 _ ->
419
:-(
QRes
420 end;
421 _ ->
422 2 QRes
423 end.
424
425 %% Only for binaries, escapes only the content, '%' has to be added afterwards
426 %% The escape character is '$' as '\' causes issues in PostgreSQL
427 %% Returned value is NOT safe to use outside of a prepared statement
428 -spec escape_prepared_like(binary()) -> binary().
429 escape_prepared_like(S) ->
430 18 << (escape_prepared_like_character(C)) || <<C>> <= S >>.
431
432 %% @doc Escape character that will confuse an SQL engine
433 %% Percent and underscore only need to be escaped for
434 %% pattern matching like statement
435 %% INFO: Used in mod_vcard_rdbms.
436 %% Searches in the middle of text, non-efficient
437 -spec escape_like(binary() | string()) -> escaped_like().
438 escape_like(S) ->
439 7 {escaped_like, [$', $%, escape_like_internal(S), $%, $']}.
440
441 -spec escape_like_prefix(binary() | string()) -> escaped_like().
442 escape_like_prefix(S) ->
443
:-(
{escaped_like, [$', escape_like_internal(S), $%, $']}.
444
445 -spec escape_binary(server(), binary()) -> escaped_binary().
446 escape_binary(_HostType, Bin) when is_binary(Bin) ->
447 87 {escaped_binary, mongoose_rdbms_backend:escape_binary(Bin)}.
448
449 %% @doc The same as escape, but returns value including ''
450 -spec escape_string(binary() | string()) -> escaped_string().
451 escape_string(S) ->
452 57 {escaped_string, escape_string_internal(S)}.
453
454 -spec escape_integer(integer()) -> escaped_integer().
455 escape_integer(I) when is_integer(I) ->
456
:-(
{escaped_integer, integer_to_binary(I)}.
457
458 %% Be aware, that we can't just use escaped_integer here.
459 %% Because of the error in pgsql:
460 %% column \"match_all\" is of type boolean but expression is of type integer
461 -spec escape_boolean(boolean()) -> escaped_boolean().
462 escape_boolean(true) ->
463 1 {escaped_boolean, "'1'"};
464 escape_boolean(false) ->
465 1 {escaped_boolean, "'0'"}.
466
467 -spec escape_null() -> escaped_null().
468 escape_null() ->
469 12 {escaped_null, "null"}.
470
471
472 %% @doc SQL-injection check.
473 %% Call this function just before using value from escape_string/1 inside a query.
474 -spec use_escaped_string(escaped_string()) -> sql_query_part().
475 use_escaped_string({escaped_string, S}) ->
476 57 S;
477 use_escaped_string(X) ->
478 %% We need to print an error, because in some places
479 %% the error can be just ignored, because of too wide catches.
480
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
481
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
482
:-(
erlang:error({use_escaped_string, X}).
483
484 -spec use_escaped_binary(escaped_binary()) -> sql_query_part().
485 use_escaped_binary({escaped_binary, S}) ->
486 87 S;
487 use_escaped_binary(X) ->
488
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
489
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
490
:-(
erlang:error({use_escaped_binary, X}).
491
492 -spec use_escaped_like(escaped_like()) -> sql_query_part().
493 use_escaped_like({escaped_like, S}) ->
494 7 S;
495 use_escaped_like(X) ->
496
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
497
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
498
:-(
erlang:error({use_escaped_like, X}).
499
500 -spec use_escaped_integer(escaped_integer()) -> sql_query_part().
501 use_escaped_integer({escaped_integer, S}) ->
502
:-(
S;
503 use_escaped_integer(X) ->
504
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
505
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
506
:-(
erlang:error({use_escaped_integer, X}).
507
508 -spec use_escaped_boolean(escaped_boolean()) -> sql_query_part().
509 use_escaped_boolean({escaped_boolean, S}) ->
510 2 S;
511 use_escaped_boolean(X) ->
512
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
513
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
514
:-(
erlang:error({use_escaped_boolean, X}).
515
516 -spec use_escaped_null(escaped_null()) -> sql_query_part().
517 use_escaped_null({escaped_null, S}) ->
518 12 S;
519 use_escaped_null(X) ->
520
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
521
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
522
:-(
erlang:error({use_escaped_null, X}).
523
524 %% Use this function, if type is unknown.
525 %% Be aware, you can't pass escaped_like() there.
526 -spec use_escaped(Value) -> sql_query_part() when
527 Value :: escaped_value().
528 use_escaped({escaped_string, _}=X) ->
529 57 use_escaped_string(X);
530 use_escaped({escaped_binary, _}=X) ->
531 87 use_escaped_binary(X);
532 use_escaped({escaped_integer, _}=X) ->
533
:-(
use_escaped_integer(X);
534 use_escaped({escaped_boolean, _}=X) ->
535 2 use_escaped_boolean(X);
536 use_escaped({escaped_null, _}=X) ->
537 12 use_escaped_null(X);
538 use_escaped(X) ->
539
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
540
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
541
:-(
erlang:error({use_escaped, X}).
542
543 -spec escape_prepared_like_character(char()) -> binary().
544 4 escape_prepared_like_character($%) -> <<"$%">>;
545 3 escape_prepared_like_character($_) -> <<"$_">>;
546
:-(
escape_prepared_like_character($$) -> <<"$$">>;
547 61 escape_prepared_like_character(C) -> <<C>>.
548
549 -spec escape_like_internal(binary() | string()) -> binary() | string().
550 escape_like_internal(S) when is_binary(S) ->
551 7 list_to_binary(escape_like_internal(binary_to_list(S)));
552 escape_like_internal(S) when is_list(S) ->
553 7 [escape_like_character(C) || C <- S].
554
555 escape_string_internal(S) ->
556 57 case mongoose_backend:is_exported(global, ?MODULE, escape_string, 1) of
557 true ->
558 57 mongoose_rdbms_backend:escape_string(S);
559 false ->
560 %% generic escaping
561
:-(
[$', escape_characters(S), $']
562 end.
563
564 escape_characters(S) when is_binary(S) ->
565
:-(
list_to_binary(escape_characters(binary_to_list(S)));
566 escape_characters(S) when is_list(S) ->
567
:-(
[escape_character(C) || C <- S].
568
569
:-(
escape_like_character($%) -> "\\%";
570
:-(
escape_like_character($_) -> "\\_";
571 30 escape_like_character(C) -> escape_character(C).
572
573 %% Characters to escape
574
:-(
escape_character($\0) -> "\\0";
575
:-(
escape_character($\n) -> "\\n";
576
:-(
escape_character($\t) -> "\\t";
577
:-(
escape_character($\b) -> "\\b";
578
:-(
escape_character($\r) -> "\\r";
579
:-(
escape_character($') -> "''";
580
:-(
escape_character($") -> "\\\"";
581
:-(
escape_character($\\) -> "\\\\";
582 30 escape_character(C) -> C.
583
584
585 -spec unescape_binary(server(), binary()) -> binary().
586 unescape_binary(_HostType, Bin) when is_binary(Bin) ->
587 6431 mongoose_rdbms_backend:unescape_binary(Bin).
588
589
590 -spec result_to_integer(binary() | integer()) -> integer().
591 result_to_integer(Int) when is_integer(Int) ->
592 28878 Int;
593 result_to_integer(Bin) when is_binary(Bin) ->
594
:-(
binary_to_integer(Bin).
595
596 selected_to_integer({selected, [{BInt}]}) ->
597 9149 result_to_integer(BInt).
598
599 %% Converts a value from a CHAR(1) field to integer
600 2844 character_to_integer(<<X>>) -> X;
601
:-(
character_to_integer(X) when is_integer(X) -> X.
602
603 %% pgsql returns booleans as "t" or "f"
604 -spec to_bool(binary() | string() | atom() | integer() | any()) -> boolean().
605 to_bool(B) when is_binary(B) ->
606
:-(
to_bool(binary_to_list(B));
607
:-(
to_bool("t") -> true;
608
:-(
to_bool("true") -> true;
609
:-(
to_bool("1") -> true;
610
:-(
to_bool(true) -> true;
611 205 to_bool(1) -> true;
612 808 to_bool(_) -> false.
613
614 %%%----------------------------------------------------------------------
615 %%% Callback functions from gen_server
616 %%%----------------------------------------------------------------------
617 -spec init(options()) -> {ok, state()}.
618 init(Opts = #{query_timeout := QueryTimeout, max_start_interval := MaxStartInterval}) ->
619 460 process_flag(trap_exit, true),
620 460 KeepaliveInterval = maps:get(keepalive_interval, Opts, undefined),
621 % retries are delayed exponentially, max_start_interval limits the delay
622 % e.g. if the limit is 30, the delays are: 2, 4, 8, 16, 30, 30, ...
623 460 case connect(Opts, ?CONNECT_RETRIES, 2, MaxStartInterval) of
624 {ok, DbRef} ->
625 460 schedule_keepalive(KeepaliveInterval),
626 460 {ok, #state{db_ref = DbRef,
627 keepalive_interval = KeepaliveInterval,
628 query_timeout = QueryTimeout}};
629 Error ->
630
:-(
{stop, Error}
631 end.
632
633
634 handle_call({sql_cmd, Command, Timestamp}, From, State) ->
635 111751 {Result, NewState} = run_sql_cmd(Command, From, State, Timestamp),
636 111751 case abort_on_driver_error(Result) of
637
:-(
{stop, Reason} -> {stop, Reason, Result, NewState};
638 111751 continue -> {reply, Result, NewState}
639 end;
640 handle_call(get_db_info, _, #state{db_ref = DbRef} = State) ->
641 650 {reply, {ok, db_engine(global), DbRef}, State};
642 handle_call(Request, From, State) ->
643
:-(
?UNEXPECTED_CALL(Request, From),
644
:-(
{reply, {error, badarg}, State}.
645
646 handle_cast({sql_cmd, Command, Timestamp}, State) ->
647 2 {Result, NewState} = run_sql_cmd(Command, undefined, State, Timestamp),
648 2 case abort_on_driver_error(Result) of
649
:-(
{stop, Reason} -> {stop, Reason, NewState};
650 2 continue -> {noreply, NewState}
651 end;
652 handle_cast(Request, State) ->
653
:-(
?UNEXPECTED_CAST(Request),
654
:-(
{noreply, State}.
655
656 code_change(_OldVsn, State, _Extra) ->
657
:-(
{ok, State}.
658
659 handle_info(keepalive, #state{keepalive_interval = KeepaliveInterval} = State) ->
660
:-(
case sql_query_internal([?KEEPALIVE_QUERY], State) of
661 {selected, _} ->
662
:-(
schedule_keepalive(KeepaliveInterval),
663
:-(
{noreply, State};
664 {error, _} = Error ->
665
:-(
{stop, {keepalive_failed, Error}, State}
666 end;
667 handle_info({'EXIT', _Pid, _Reason} = Reason, State) ->
668
:-(
{stop, Reason, State};
669 handle_info(Info, State) ->
670
:-(
?UNEXPECTED_INFO(Info),
671
:-(
{noreply, State}.
672
673 -spec terminate(Reason :: term(), state()) -> any().
674 terminate(_Reason, #state{db_ref = DbRef}) ->
675 450 catch mongoose_rdbms_backend:disconnect(DbRef).
676
677 %%%----------------------------------------------------------------------
678 %%% Internal functions
679 %%%----------------------------------------------------------------------
680
681 -spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) ->
682 {Result :: term(), state()}.
683 run_sql_cmd(Command, _From, State, Timestamp) ->
684 111753 Now = erlang:monotonic_time(millisecond),
685 111753 case Now - Timestamp of
686 Age when Age < ?TRANSACTION_TIMEOUT ->
687 111753 outer_op(Command, State);
688 Age ->
689
:-(
?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow,
690
:-(
text => <<"Discarding request">>, age => Age, command => Command}),
691
:-(
{reply, {error, timeout}, State}
692 end.
693
694 %% @doc Only called by handle_call, only handles top level operations.
695 -spec outer_op(rdbms_msg(), state()) -> {query_result()
696 | transaction_result()
697 | dirty_result(), state()}.
698 outer_op({sql_query, Query}, State) ->
699 964 {sql_query_internal(Query, State), State};
700 outer_op({sql_transaction, F}, State) ->
701 16456 outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "", State);
702 outer_op({sql_dirty, F}, State) ->
703
:-(
sql_dirty_internal(F, State);
704 outer_op({sql_execute, Name, Params}, State) ->
705 94331 sql_execute(outer_op, Name, Params, State);
706 outer_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
707 2 try
708 2 Wrapper(fun() -> sql_execute(outer_op, Name, Params, State) end)
709 catch
710 _Class:Error ->
711 1 ?LOG_ERROR(#{what => sql_execute_wrapped_failed, reason => Error,
712
:-(
statement_name => Name, wrapper_fun => Wrapper}),
713 1 {{error, Error}, State}
714 end.
715
716 %% @doc Called via sql_query/transaction/bloc from client code when inside a
717 %% nested operation
718 -spec nested_op(rdbms_msg(), state()) -> any().
719 nested_op({sql_query, Query}, State) ->
720 %% XXX - use sql_query_t here insted? Most likely would break
721 %% callers who expect {error, _} tuples (sql_query_t turns
722 %% these into throws)
723 368 {sql_query_internal(Query, State), State};
724 nested_op({sql_transaction, F}, State) ->
725 %% Transaction inside a transaction
726
:-(
inner_transaction(F, State);
727 nested_op({sql_execute, Name, Params}, State) ->
728 37549 sql_execute(nested_op, Name, Params, State);
729 nested_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
730 11 Wrapper(fun() -> sql_execute(nested_op, Name, Params, State) end).
731
732 %% @doc Never retry nested transactions - only outer transactions
733 -spec inner_transaction(fun(), state()) -> transaction_result() | {'EXIT', any()}.
734 inner_transaction(F, _State) ->
735
:-(
case catch F() of
736 {aborted, Reason} ->
737
:-(
{aborted, Reason};
738 {'EXIT', Reason} ->
739
:-(
{'EXIT', Reason};
740 {atomic, Res} ->
741
:-(
{atomic, Res};
742 Res ->
743
:-(
{atomic, Res}
744 end.
745
746 -spec outer_transaction(F :: fun(),
747 NRestarts :: 0..10,
748 Reason :: any(), state()) -> {transaction_result(), state()}.
749 outer_transaction(F, NRestarts, _Reason, State) ->
750 16703 sql_query_internal(rdbms_queries:begin_trans(), State),
751 16703 put_state(State),
752 16703 {Result, StackTrace} = apply_transaction_function(F),
753 16703 NewState = erase_state(),
754 16703 case Result of
755 {aborted, Reason} when NRestarts > 0 ->
756 %% Retry outer transaction upto NRestarts times.
757 247 sql_query_internal([<<"rollback;">>], NewState),
758 247 outer_transaction(F, NRestarts - 1, Reason, NewState);
759 {aborted, #{reason := Reason, sql_query := SqlQuery}}
760 when NRestarts =:= 0 ->
761 %% Too many retries of outer transaction.
762
:-(
?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
763 restarts => ?MAX_TRANSACTION_RESTARTS, last_abort_reason => Reason,
764 last_sql_query => iolist_to_binary(SqlQuery),
765
:-(
stacktrace => StackTrace, state => NewState}),
766
:-(
sql_query_internal([<<"rollback;">>], NewState),
767
:-(
{{aborted, Reason}, NewState};
768 {aborted, Reason} when NRestarts =:= 0 -> %% old format for abort
769 %% Too many retries of outer transaction.
770 24 ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
771 restarts => ?MAX_TRANSACTION_RESTARTS,
772 last_abort_reason => Reason, stacktrace => StackTrace,
773
:-(
state => NewState}),
774 24 sql_query_internal([<<"rollback;">>], NewState),
775 24 {{aborted, Reason}, NewState};
776 {'EXIT', Reason} ->
777 %% Abort sql transaction on EXIT from outer txn only.
778 9 sql_query_internal([<<"rollback;">>], NewState),
779 9 {{aborted, Reason}, NewState};
780 Res ->
781 %% Commit successful outer txn
782 16423 sql_query_internal([<<"commit;">>], NewState),
783 16423 {{atomic, Res}, NewState}
784 end.
785
786 -spec apply_transaction_function(F :: fun()) -> {any(), list()}.
787 apply_transaction_function(F) ->
788 16703 try
789 16703 {F(), []}
790 catch
791 throw:ThrowResult:StackTrace ->
792 271 {ThrowResult, StackTrace};
793 Class:Reason:StackTrace ->
794 9 ?LOG_ERROR(#{what => rdbms_outer_transaction_failed, class => Class,
795
:-(
reason => Reason, stacktrace => StackTrace}),
796 9 {{'EXIT', Reason}, StackTrace}
797 end.
798
799 sql_query_internal(Query, #state{db_ref = DBRef, query_timeout = QueryTimeout}) ->
800 34740 case mongoose_rdbms_backend:query(DBRef, Query, QueryTimeout) of
801 {error, "No SQL-driver information available."} ->
802
:-(
{updated, 0}; %% workaround for odbc bug
803 Result ->
804 34740 Result
805 end.
806
807 sql_dirty_internal(F, State) ->
808
:-(
put_state(State),
809
:-(
Result =
810
:-(
try F() of
811 Result0 ->
812
:-(
{ok, Result0}
813 catch
814 _C:R ->
815
:-(
{error, R}
816 end,
817
:-(
{Result, erase_state()}.
818
819 -spec sql_execute(Type :: atom(), Name :: atom(), Params :: [term()], state()) ->
820 {query_result(), state()}.
821 sql_execute(Type, Name, Params, State = #state{db_ref = DBRef, query_timeout = QueryTimeout}) ->
822 %% Postgres allows to prepare statement only once, so we should take care that NewState is updated
823 131892 {StatementRef, NewState} = prepare_statement(Name, State),
824 131892 put_state(NewState),
825 131892 Res = try mongoose_rdbms_backend:execute(DBRef, StatementRef, Params, QueryTimeout)
826 catch Class:Reason:StackTrace ->
827 8 ?LOG_ERROR(#{what => rdbms_sql_execute_failed, statement_name => Name,
828 class => Class, reason => Reason, params => Params,
829
:-(
stacktrace => StackTrace}),
830 8 erlang:raise(Class, Reason, StackTrace)
831 end,
832 131884 check_execute_result(Type, Res, Name, Params),
833 131613 {Res, NewState}.
834
835 %% Similar check as in sql_query_t
836 check_execute_result(outer_op, _Res, _Name, _Params) ->
837 94332 ok;
838 check_execute_result(nested_op, Res, Name, Params) ->
839 %% Res is not a list (i.e. executes are one query only and one result set only)
840 37552 case Res of
841 {error, Reason} ->
842 271 throw({aborted, #{reason => Reason, statement_name => Name, params => Params}});
843 _ when is_tuple(Res) ->
844 37281 ok
845 end.
846
847 -spec prepare_statement(Name :: atom(), state()) -> {Ref :: term(), state()}.
848 prepare_statement(Name, State = #state{db_ref = DBRef, prepared = Prepared}) ->
849 131892 case maps:get(Name, Prepared, undefined) of
850 undefined ->
851 3129 {_, Table, Fields, Statement} = lookup_statement(Name),
852 3129 {ok, Ref} = mongoose_rdbms_backend:prepare(DBRef, Name, Table, Fields, Statement),
853 3129 {Ref, State#state{prepared = maps:put(Name, Ref, Prepared)}};
854
855 Ref ->
856 128763 {Ref, State}
857 end.
858
859 lookup_statement(Name) ->
860 3129 case ets:lookup(prepared_statements, Name) of
861 3129 [Rec] -> Rec;
862
:-(
[] -> error({lookup_statement_failed, Name})
863 end.
864
865 -spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}.
866 abort_on_driver_error({error, "query timed out"}) -> %% mysql driver error
867
:-(
{stop, timeout};
868 abort_on_driver_error({error, "Failed sending data on socket" ++ _}) -> %% mysql driver error
869
:-(
{stop, closed};
870 abort_on_driver_error(_) ->
871 111753 continue.
872
873 -spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined.
874 db_engine(_HostType) ->
875 10483 try mongoose_backend:get_backend_name(global, ?MODULE)
876
:-(
catch error:badarg -> undefined end.
877
878 %% @doc Used to optimise queries for different databases.
879 %% @todo Should be refactored to use host types with this module.
880 %% Also, this parameter should not be global, but pool-name parameterized
881 -spec db_type() -> mssql | generic.
882 db_type() ->
883 25561 case mongoose_config:get_opt(rdbms_server_type) of
884 25561 mssql -> mssql;
885
:-(
_ -> generic
886 end.
887
888 -spec connect(options(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
889 MaxRetryDelay :: non_neg_integer()) -> {ok, term()} | {error, any()}.
890 connect(#{query_timeout := QueryTimeout} = Options, Retry, RetryAfter, MaxRetryDelay) ->
891 460 case mongoose_rdbms_backend:connect(Options, QueryTimeout) of
892 {ok, _} = Ok ->
893 460 Ok;
894 Error when Retry =:= 0 ->
895
:-(
Error;
896 Error ->
897
:-(
SleepFor = rand:uniform(RetryAfter),
898
:-(
Backend = mongoose_backend:get_backend_name(global, ?MODULE),
899
:-(
?LOG_ERROR(#{what => rdbms_connection_attempt_error, backend => Backend,
900
:-(
error => Error, sleep_for => SleepFor}),
901
:-(
timer:sleep(timer:seconds(SleepFor)),
902
:-(
NextRetryDelay = RetryAfter * RetryAfter,
903
:-(
connect(Options, Retry - 1, min(MaxRetryDelay, NextRetryDelay), MaxRetryDelay)
904 end.
905
906
907 -spec schedule_keepalive(KeepaliveInterval :: undefined | pos_integer()) -> any().
908 schedule_keepalive(KeepaliveInterval) ->
909 460 case KeepaliveInterval of
910 _ when is_integer(KeepaliveInterval) ->
911
:-(
erlang:send_after(timer:seconds(KeepaliveInterval), self(), keepalive);
912 undefined ->
913 460 ok;
914 Other ->
915
:-(
?LOG_ERROR(#{what => rdbms_wrong_keepalive_interval, reason => Other}),
916
:-(
ok
917 end.
918
919 %% ----- process state access, for convenient tracing
920
921 put_state(State) ->
922 186244 put(?STATE_KEY, State).
923
924 erase_state() ->
925 16703 erase(?STATE_KEY).
926
927 get_state() ->
928 149683 get(?STATE_KEY).
Line Hits Source