./ct_report/coverage/mongoose_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mongoose_rdbms.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Serve RDBMS connection
5 %%% Created : 8 Dec 2004 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%% Copyright 2016 Erlang Solutions Ltd.
10 %%%
11 %%% This program is free software; you can redistribute it and/or
12 %%% modify it under the terms of the GNU General Public License as
13 %%% published by the Free Software Foundation; either version 2 of the
14 %%% License, or (at your option) any later version.
15 %%%
16 %%% This program is distributed in the hope that it will be useful,
17 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
18 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 %%% General Public License for more details.
20 %%%
21 %%% You should have received a copy of the GNU General Public License
22 %%% along with this program; if not, write to the Free Software
23 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 %%%
25 %%%----------------------------------------------------------------------
26
27 -module(mongoose_rdbms).
28 -author('alexey@process-one.net').
29 -author('konrad.zemek@erlang-solutions.com').
30
31 -behaviour(gen_server).
32
33 %% Part of SQL query string, produced by use_escaped/1 function
34 -type sql_query_part() :: iodata().
35 -type sql_query() :: iodata().
36
37 %% Blob data type to be used inside SQL queries
38 -opaque escaped_binary() :: {escaped_binary, sql_query_part()}.
39 %% Unicode string to be used inside SQL queries
40 -opaque escaped_string() :: {escaped_string, sql_query_part()}.
41 %% Unicode string to be used inside LIKE conditions
42 -opaque escaped_like() :: {escaped_like, sql_query_part()}.
43 -opaque escaped_integer() :: {escaped_integer, sql_query_part()}.
44 -opaque escaped_boolean() :: {escaped_boolean, sql_query_part()}.
45 -opaque escaped_null() :: {escaped_null, sql_query_part()}.
46 -opaque escaped_value() :: escaped_string() | escaped_binary() | escaped_integer() |
47 escaped_boolean() | escaped_null().
48
49 -ifdef(gen_server_request_id).
50 -type request_id() :: gen_server:request_id().
51 -else.
52 -type request_id() :: term().
53 -endif.
54
55 -export_type([escaped_binary/0,
56 escaped_string/0,
57 escaped_like/0,
58 escaped_integer/0,
59 escaped_boolean/0,
60 escaped_null/0,
61 escaped_value/0,
62 sql_query/0,
63 sql_query_part/0]).
64
65 -export([process_options/1]).
66
67 %% External exports
68 -export([prepare/4,
69 prepared/1,
70 execute/3, execute/4,
71 execute_cast/3, execute_cast/4,
72 execute_request/3, execute_request/4,
73 execute_wrapped_request/4, execute_wrapped_request/5,
74 execute_successfully/3, execute_successfully/4,
75 sql_query/2, sql_query/3,
76 sql_query_cast/2, sql_query_cast/3,
77 sql_query_request/2, sql_query_request/3,
78 sql_transaction/2, sql_transaction/3,
79 sql_transaction_request/2, sql_transaction_request/3,
80 sql_dirty/2, sql_dirty/3,
81 sql_query_t/1,
82 transaction_with_delayed_retry/3,
83 to_bool/1,
84 db_engine/1,
85 db_type/0,
86 use_escaped/1]).
87
88 %% Unicode escaping
89 -export([escape_string/1,
90 use_escaped_string/1]).
91
92 %% Integer escaping
93 -export([escape_integer/1,
94 use_escaped_integer/1]).
95
96 %% Boolean escaping
97 -export([escape_boolean/1,
98 use_escaped_boolean/1]).
99
100 %% LIKE escaping
101 -export([escape_like/1,
102 escape_prepared_like/1,
103 escape_like_prefix/1,
104 use_escaped_like/1]).
105
106 %% BLOB escaping
107 -export([escape_binary/2,
108 unescape_binary/2,
109 use_escaped_binary/1]).
110
111 %% Null escaping
112 %% (to keep uniform pattern of passing values)
113 -export([escape_null/0,
114 use_escaped_null/1]).
115
116 %% count / integra types decoding
117 -export([result_to_integer/1,
118 selected_to_integer/1]).
119
120 -export([character_to_integer/1]).
121
122 %% gen_server callbacks
123 -export([init/1,
124 handle_call/3,
125 handle_cast/2,
126 handle_info/2,
127 terminate/2,
128 code_change/3]).
129
130 -ignore_xref([
131 sql_query_cast/2, sql_query_request/2,
132 execute/4, execute_wrapped_request/5,
133 sql_query_cast/3, sql_query_request/3, sql_dirty/3,
134 execute_cast/3, execute_cast/4,
135 execute_request/3, execute_request/4,
136 execute_wrapped_request/4,
137 sql_transaction_request/2, sql_transaction_request/3,
138 sql_query_t/1, use_escaped/1,
139 escape_like/1, escape_like_prefix/1, use_escaped_like/1,
140 escape_binary/2, use_escaped_binary/1,
141 escape_integer/1, use_escaped_integer/1,
142 escape_string/1, use_escaped_string/1,
143 escape_boolean/1, use_escaped_boolean/1,
144 escape_null/0, use_escaped_null/1
145 ]).
146
147 %% internal usage
148 -export([get_db_info/1]).
149
150 -include("mongoose.hrl").
151
152 -record(state, {db_ref,
153 prepared = #{} :: #{binary() => term()},
154 keepalive_interval :: undefined | pos_integer(),
155 query_timeout :: pos_integer()
156 }).
157 -type state() :: #state{}.
158
159 -define(DEFAULT_POOL, default).
160 -define(STATE_KEY, mongoose_rdbms_state).
161 -define(MAX_TRANSACTION_RESTARTS, 10).
162 -define(TRANSACTION_TIMEOUT, 60000). % milliseconds
163 -define(KEEPALIVE_QUERY, <<"SELECT 1;">>).
164 %% The value is arbitrary; supervisor will restart the connection once
165 %% the retry counter runs out. We just attempt to reduce log pollution.
166 -define(CONNECT_RETRIES, 5).
167
168 -type server() :: mongooseim:host_type() | global.
169 -type request_wrapper() :: fun((fun(() -> T)) -> T).
170 -type rdbms_msg() :: {sql_query, _}
171 | {sql_transaction, fun()}
172 | {sql_dirty, fun()}
173 | {sql_execute, atom(), [binary() | boolean() | integer()]}
174 | {sql_execute_wrapped, atom(), [binary() | boolean() | integer()], request_wrapper()}.
175 -type single_query_result() :: {selected, [tuple()]} |
176 {updated, non_neg_integer() | undefined} |
177 {updated, non_neg_integer(), [tuple()]} |
178 {aborted, Reason :: term()} |
179 {error, Reason :: string() | duplicate_key}.
180 -type query_result() :: single_query_result() | [single_query_result()].
181 -type transaction_result() :: {aborted, _} | {atomic, _} | {error, _}.
182 -type dirty_result() :: {ok, any()} | {error, any()}.
183 -export_type([query_result/0,
184 transaction_result/0,
185 server/0]).
186
187 -type options() :: #{driver := pgsql | mysql | odbc,
188 max_start_interval := pos_integer(),
189 query_timeout := pos_integer(),
190 atom() => any()}.
191
192 -export_type([options/0]).
193
194 %%%----------------------------------------------------------------------
195 %%% API
196 %%%----------------------------------------------------------------------
197
198 -spec process_options(map()) -> options().
199 process_options(Opts = #{driver := odbc, settings := _}) ->
200
:-(
Opts;
201 process_options(Opts = #{host := _Host, database := _DB, username := _User, password := _Pass}) ->
202 88 ensure_db_port(process_tls_options(Opts));
203 process_options(Opts) ->
204
:-(
error(#{what => invalid_rdbms_connection_options, options => Opts}).
205
206 process_tls_options(Opts = #{driver := mysql, tls := #{required := _}}) ->
207
:-(
error(#{what => invalid_rdbms_tls_options, options => Opts,
208 text => <<"The 'required' option is not supported for MySQL">>});
209 process_tls_options(Opts = #{driver := pgsql, tls := TLSOpts}) ->
210 88 Opts#{tls := maps:merge(#{required => false}, TLSOpts)};
211 process_tls_options(Opts) ->
212
:-(
Opts.
213
214
:-(
ensure_db_port(Opts = #{port := _}) -> Opts;
215 88 ensure_db_port(Opts = #{driver := pgsql}) -> Opts#{port => 5432};
216
:-(
ensure_db_port(Opts = #{driver := mysql}) -> Opts#{port => 3306}.
217
218 -spec prepare(Name, Table :: binary() | atom(), Fields :: [binary() | atom()],
219 Statement :: iodata()) ->
220 {ok, Name} | {error, already_exists}
221 when Name :: atom().
222 prepare(Name, Table, Fields, Statement) when is_atom(Table) ->
223 14721 prepare(Name, atom_to_binary(Table, utf8), Fields, Statement);
224 prepare(Name, Table, [Field | _] = Fields, Statement) when is_atom(Field) ->
225 12940 prepare(Name, Table, [atom_to_binary(F, utf8) || F <- Fields], Statement);
226 prepare(Name, Table, Fields, Statement) when is_atom(Name), is_binary(Table) ->
227 14721 true = lists:all(fun is_binary/1, Fields),
228 14721 Tuple = {Name, Table, Fields, iolist_to_binary(Statement)},
229 14721 case ets:insert_new(prepared_statements, Tuple) of
230 4721 true -> {ok, Name};
231 10000 false -> {error, already_exists}
232 end.
233
234 -spec prepared(atom()) -> boolean().
235 prepared(Name) ->
236 3246 ets:member(prepared_statements, Name).
237
238 -spec execute(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
239 query_result().
240 execute(HostType, Name, Parameters) ->
241 24893 execute(HostType, ?DEFAULT_POOL, Name, Parameters).
242
243 -spec execute(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
244 query_result().
245 execute(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
246 89723 sql_call(HostType, PoolName, {sql_execute, Name, Parameters}).
247
248 -spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
249 query_result().
250 execute_cast(HostType, Name, Parameters) ->
251 1 execute_cast(HostType, ?DEFAULT_POOL, Name, Parameters).
252
253 -spec execute_cast(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
254 query_result().
255 execute_cast(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
256 1 sql_cast(HostType, PoolName, {sql_execute, Name, Parameters}).
257
258 -spec execute_request(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
259 request_id().
260 execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
261 1 execute_request(HostType, ?DEFAULT_POOL, Name, Parameters).
262
263 -spec execute_request(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
264 request_id().
265 execute_request(HostType, PoolName, Name, Parameters) when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
266 1 sql_request(HostType, PoolName, {sql_execute, Name, Parameters}).
267
268 -spec execute_wrapped_request(
269 HostType :: server(),
270 Name :: atom(),
271 Parameters :: [term()],
272 Wrapper :: request_wrapper()) -> request_id().
273 execute_wrapped_request(HostType, Name, Parameters, Wrapper) ->
274 13 execute_wrapped_request(HostType, ?DEFAULT_POOL, Name, Parameters, Wrapper).
275
276 -spec execute_wrapped_request(
277 HostType :: server(),
278 PoolName :: atom(),
279 Name :: atom(),
280 Parameters :: [term()],
281 Wrapper :: request_wrapper()) -> request_id().
282 execute_wrapped_request(HostType, PoolName, Name, Parameters, Wrapper)
283 when is_atom(PoolName), is_atom(Name), is_list(Parameters), is_function(Wrapper) ->
284 13 sql_request(HostType, PoolName, {sql_execute_wrapped, Name, Parameters, Wrapper}).
285
286 %% Same as execute/3, but would fail loudly on any error.
287 -spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
288 query_result().
289 execute_successfully(HostType, Name, Parameters) ->
290 64083 execute_successfully(HostType, ?DEFAULT_POOL, Name, Parameters).
291
292 -spec execute_successfully(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
293 query_result().
294 execute_successfully(HostType, PoolName, Name, Parameters) ->
295 64830 try execute(HostType, PoolName, Name, Parameters) of
296 {selected, _} = Result ->
297 40723 Result;
298 {updated, _} = Result ->
299 24081 Result;
300 Other ->
301 25 Log = #{what => sql_execute_failed, host => HostType, statement_name => Name,
302 statement_query => query_name_to_string(Name),
303 statement_params => Parameters, reason => Other},
304 25 ?LOG_ERROR(Log),
305 25 error(Log)
306 catch error:Reason:Stacktrace ->
307 1 Log = #{what => sql_execute_failed, host => HostType, statement_name => Name,
308 statement_query => query_name_to_string(Name),
309 statement_params => Parameters,
310 reason => Reason, stacktrace => Stacktrace},
311
:-(
?LOG_ERROR(Log),
312
:-(
erlang:raise(error, Reason, Stacktrace)
313 end.
314
315 query_name_to_string(Name) ->
316 26 case ets:lookup(prepared_statements, Name) of
317 [] ->
318
:-(
not_found;
319 [{_, _Table, _Fields, Statement}] ->
320 25 Statement
321 end.
322
323 -spec sql_query(HostType :: server(), Query :: any()) -> query_result().
324 sql_query(HostType, Query) ->
325 1104 sql_query(HostType, ?DEFAULT_POOL, Query).
326
327 -spec sql_query(HostType :: server(), PoolName :: atom(), Query :: any()) -> query_result().
328 sql_query(HostType, PoolName, Query) ->
329 1104 sql_call(HostType, PoolName, {sql_query, Query}).
330
331 -spec sql_query_request(HostType :: server(), Query :: any()) -> request_id().
332 sql_query_request(HostType, Query) ->
333 1 sql_query_request(HostType, ?DEFAULT_POOL, Query).
334
335 -spec sql_query_request(HostType :: server(), PoolName :: atom(), Query :: any()) -> request_id().
336 sql_query_request(HostType, PoolName, Query) ->
337 1 sql_request(HostType, PoolName, {sql_query, Query}).
338
339 -spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result().
340 sql_query_cast(HostType, Query) ->
341 1 sql_query_cast(HostType, ?DEFAULT_POOL, Query).
342
343 -spec sql_query_cast(HostType :: server(), PoolName :: atom(), Query :: any()) -> query_result().
344 sql_query_cast(HostType, PoolName, Query) ->
345 1 sql_cast(HostType, PoolName, {sql_query, Query}).
346
347 %% @doc SQL transaction based on a list of queries
348 -spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result().
349 sql_transaction(HostType, Msg) ->
350 10355 sql_transaction(HostType, ?DEFAULT_POOL, Msg).
351
352 -spec sql_transaction(server(), atom(), fun() | maybe_improper_list()) -> transaction_result().
353 sql_transaction(HostType, PoolName, Queries) when is_atom(PoolName), is_list(Queries) ->
354
:-(
F = fun() -> lists:map(fun sql_query_t/1, Queries) end,
355
:-(
sql_transaction(HostType, PoolName, F);
356 %% SQL transaction, based on a erlang anonymous function (F = fun)
357 sql_transaction(HostType, PoolName, F) when is_atom(PoolName), is_function(F) ->
358 10399 sql_call(HostType, PoolName, {sql_transaction, F}).
359
360 %% @doc SQL transaction based on a list of queries
361 -spec sql_transaction_request(server(), fun() | maybe_improper_list()) -> request_id().
362 sql_transaction_request(HostType, Queries) ->
363 1 sql_transaction_request(HostType, ?DEFAULT_POOL, Queries).
364
365 -spec sql_transaction_request(server(), atom(), fun() | maybe_improper_list()) -> request_id().
366 sql_transaction_request(HostType, PoolName, Queries) when is_atom(PoolName), is_list(Queries) ->
367 1 F = fun() -> lists:map(fun sql_query_t/1, Queries) end,
368 1 sql_transaction_request(HostType, PoolName, F);
369 %% SQL transaction, based on a erlang anonymous function (F = fun)
370 sql_transaction_request(HostType, PoolName, F) when is_atom(PoolName), is_function(F) ->
371 1 sql_request(HostType, PoolName, {sql_transaction, F}).
372
373 %% This function allows to specify delay between retries.
374 -spec transaction_with_delayed_retry(server(), fun() | maybe_improper_list(), map()) -> transaction_result().
375 transaction_with_delayed_retry(HostType, F, Info) ->
376 273 Retries = maps:get(retries, Info),
377 273 Delay = maps:get(delay, Info),
378 273 do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info).
379
380 do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) ->
381 273 Result = mongoose_rdbms:sql_transaction(HostType, F),
382 273 case Result of
383 {atomic, _} ->
384 273 Result;
385 {aborted, Reason} when Retries > 0 ->
386
:-(
?LOG_WARNING(Info#{what => rdbms_transaction_aborted,
387 text => <<"Transaction aborted. Restart">>,
388
:-(
reason => Reason, retries_left => Retries}),
389
:-(
timer:sleep(Delay),
390
:-(
do_transaction_with_delayed_retry(HostType, F, Retries - 1, Delay, Info);
391 _ ->
392
:-(
Err = Info#{what => mam_transaction_failed,
393 text => <<"Transaction failed. Do not restart">>,
394 reason => Result},
395
:-(
?LOG_ERROR(Err),
396
:-(
erlang:error(Err)
397 end.
398
399 -spec sql_dirty(server(), fun()) -> any() | no_return().
400 sql_dirty(HostType, F) ->
401
:-(
sql_dirty(HostType, ?DEFAULT_POOL, F).
402
403 -spec sql_dirty(server(), atom(), fun()) -> any() | no_return().
404 sql_dirty(HostType, PoolName, F) when is_function(F) ->
405
:-(
case sql_call(HostType, PoolName, {sql_dirty, F}) of
406
:-(
{ok, Result} -> Result;
407
:-(
{error, Error} -> throw(Error)
408 end.
409
410 %% TODO: Better spec for RPC calls
411 -spec sql_call(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
412 sql_call(HostType, PoolName, Msg) ->
413 101226 case get_state() of
414 80150 undefined -> sql_call0(HostType, PoolName, Msg);
415 State ->
416 21076 {Res, NewState} = nested_op(Msg, State),
417 21063 put_state(NewState),
418 21063 Res
419 end.
420
421 -spec sql_call0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
422 sql_call0(HostType, PoolName, Msg) ->
423 80150 Timestamp = erlang:monotonic_time(millisecond),
424 80150 mongoose_wpool:call(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}).
425
426 -spec sql_request(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
427 sql_request(HostType, PoolName, Msg) ->
428 16 case get_state() of
429 5 undefined -> sql_request0(HostType, PoolName, Msg);
430 State ->
431 11 {Res, NewState} = nested_op(Msg, State),
432
:-(
put_state(NewState),
433
:-(
Res
434 end.
435
436 -spec sql_request0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
437 sql_request0(HostType, PoolName, Msg) ->
438 5 Timestamp = erlang:monotonic_time(millisecond),
439 5 mongoose_wpool:send_request(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}).
440
441 -spec sql_cast(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
442 sql_cast(HostType, PoolName, Msg) ->
443 2 case get_state() of
444 2 undefined -> sql_cast0(HostType, PoolName, Msg);
445 State ->
446
:-(
{Res, NewState} = nested_op(Msg, State),
447
:-(
put_state(NewState),
448
:-(
Res
449 end.
450
451 -spec sql_cast0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
452 sql_cast0(HostType, PoolName, Msg) ->
453 2 Timestamp = erlang:monotonic_time(millisecond),
454 2 mongoose_wpool:cast(rdbms, HostType, PoolName, {sql_cmd, Msg, Timestamp}).
455
456 -spec get_db_info(Target :: server() | pid()) ->
457 {ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
458 get_db_info(Pid) when is_pid(Pid) ->
459 1480 wpool_process:call(Pid, get_db_info, 5000);
460 get_db_info(HostType) ->
461
:-(
mongoose_wpool:call(rdbms, HostType, get_db_info).
462
463 %% This function is intended to be used from inside an sql_transaction:
464 sql_query_t(Query) ->
465 2 sql_query_t(Query, get_state()).
466
467 sql_query_t(Query, State) ->
468 2 QRes = sql_query_internal(Query, State),
469 2 case QRes of
470 {error, Reason} ->
471
:-(
throw({aborted, #{reason => Reason, sql_query => Query}});
472 _ when is_list(QRes) ->
473
:-(
case lists:keysearch(error, 1, QRes) of
474 {value, {error, Reason}} ->
475
:-(
throw({aborted, #{reason => Reason, sql_query => Query}});
476 _ ->
477
:-(
QRes
478 end;
479 _ ->
480 2 QRes
481 end.
482
483 %% Only for binaries, escapes only the content, '%' has to be added afterwards
484 %% The escape character is '$' as '\' causes issues in PostgreSQL
485 %% Returned value is NOT safe to use outside of a prepared statement
486 -spec escape_prepared_like(binary()) -> binary().
487 escape_prepared_like(S) ->
488 18 << (escape_prepared_like_character(C)) || <<C>> <= S >>.
489
490 %% @doc Escape character that will confuse an SQL engine
491 %% Percent and underscore only need to be escaped for
492 %% pattern matching like statement
493 %% INFO: Used in mod_vcard_rdbms.
494 %% Searches in the middle of text, non-efficient
495 -spec escape_like(binary() | string()) -> escaped_like().
496 escape_like(S) ->
497 7 {escaped_like, [$', $%, escape_like_internal(S), $%, $']}.
498
499 -spec escape_like_prefix(binary() | string()) -> escaped_like().
500 escape_like_prefix(S) ->
501
:-(
{escaped_like, [$', escape_like_internal(S), $%, $']}.
502
503 -spec escape_binary(server(), binary()) -> escaped_binary().
504 escape_binary(_HostType, Bin) when is_binary(Bin) ->
505 87 {escaped_binary, mongoose_rdbms_backend:escape_binary(Bin)}.
506
507 %% @doc The same as escape, but returns value including ''
508 -spec escape_string(binary() | string()) -> escaped_string().
509 escape_string(S) ->
510 57 {escaped_string, escape_string_internal(S)}.
511
512 -spec escape_integer(integer()) -> escaped_integer().
513 escape_integer(I) when is_integer(I) ->
514
:-(
{escaped_integer, integer_to_binary(I)}.
515
516 %% Be aware, that we can't just use escaped_integer here.
517 %% Because of the error in pgsql:
518 %% column \"match_all\" is of type boolean but expression is of type integer
519 -spec escape_boolean(boolean()) -> escaped_boolean().
520 escape_boolean(true) ->
521 1 {escaped_boolean, "'1'"};
522 escape_boolean(false) ->
523 1 {escaped_boolean, "'0'"}.
524
525 -spec escape_null() -> escaped_null().
526 escape_null() ->
527 12 {escaped_null, "null"}.
528
529
530 %% @doc SQL-injection check.
531 %% Call this function just before using value from escape_string/1 inside a query.
532 -spec use_escaped_string(escaped_string()) -> sql_query_part().
533 use_escaped_string({escaped_string, S}) ->
534 57 S;
535 use_escaped_string(X) ->
536 %% We need to print an error, because in some places
537 %% the error can be just ignored, because of too wide catches.
538
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
539
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
540
:-(
erlang:error({use_escaped_string, X}).
541
542 -spec use_escaped_binary(escaped_binary()) -> sql_query_part().
543 use_escaped_binary({escaped_binary, S}) ->
544 87 S;
545 use_escaped_binary(X) ->
546
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
547
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
548
:-(
erlang:error({use_escaped_binary, X}).
549
550 -spec use_escaped_like(escaped_like()) -> sql_query_part().
551 use_escaped_like({escaped_like, S}) ->
552 7 S;
553 use_escaped_like(X) ->
554
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
555
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
556
:-(
erlang:error({use_escaped_like, X}).
557
558 -spec use_escaped_integer(escaped_integer()) -> sql_query_part().
559 use_escaped_integer({escaped_integer, S}) ->
560
:-(
S;
561 use_escaped_integer(X) ->
562
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
563
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
564
:-(
erlang:error({use_escaped_integer, X}).
565
566 -spec use_escaped_boolean(escaped_boolean()) -> sql_query_part().
567 use_escaped_boolean({escaped_boolean, S}) ->
568 2 S;
569 use_escaped_boolean(X) ->
570
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
571
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
572
:-(
erlang:error({use_escaped_boolean, X}).
573
574 -spec use_escaped_null(escaped_null()) -> sql_query_part().
575 use_escaped_null({escaped_null, S}) ->
576 12 S;
577 use_escaped_null(X) ->
578
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
579
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
580
:-(
erlang:error({use_escaped_null, X}).
581
582 %% Use this function, if type is unknown.
583 %% Be aware, you can't pass escaped_like() there.
584 -spec use_escaped(Value) -> sql_query_part() when
585 Value :: escaped_value().
586 use_escaped({escaped_string, _}=X) ->
587 57 use_escaped_string(X);
588 use_escaped({escaped_binary, _}=X) ->
589 87 use_escaped_binary(X);
590 use_escaped({escaped_integer, _}=X) ->
591
:-(
use_escaped_integer(X);
592 use_escaped({escaped_boolean, _}=X) ->
593 2 use_escaped_boolean(X);
594 use_escaped({escaped_null, _}=X) ->
595 12 use_escaped_null(X);
596 use_escaped(X) ->
597
:-(
?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => X,
598
:-(
stacktrace => erlang:process_info(self(), current_stacktrace)}),
599
:-(
erlang:error({use_escaped, X}).
600
601 -spec escape_prepared_like_character(char()) -> binary().
602 4 escape_prepared_like_character($%) -> <<"$%">>;
603 3 escape_prepared_like_character($_) -> <<"$_">>;
604
:-(
escape_prepared_like_character($$) -> <<"$$">>;
605 61 escape_prepared_like_character(C) -> <<C>>.
606
607 -spec escape_like_internal(binary() | string()) -> binary() | string().
608 escape_like_internal(S) when is_binary(S) ->
609 7 list_to_binary(escape_like_internal(binary_to_list(S)));
610 escape_like_internal(S) when is_list(S) ->
611 7 [escape_like_character(C) || C <- S].
612
613 escape_string_internal(S) ->
614 57 case mongoose_backend:is_exported(global, ?MODULE, escape_string, 1) of
615 true ->
616
:-(
mongoose_rdbms_backend:escape_string(S);
617 false ->
618 %% generic escaping
619 57 [$', escape_characters(S), $']
620 end.
621
622 escape_characters(S) when is_binary(S) ->
623 57 list_to_binary(escape_characters(binary_to_list(S)));
624 escape_characters(S) when is_list(S) ->
625 57 [escape_character(C) || C <- S].
626
627
:-(
escape_like_character($%) -> "\\%";
628
:-(
escape_like_character($_) -> "\\_";
629 30 escape_like_character(C) -> escape_character(C).
630
631 %% Characters to escape
632
:-(
escape_character($\0) -> "\\0";
633 11006 escape_character($\n) -> "\\n";
634 1000 escape_character($\t) -> "\\t";
635 1000 escape_character($\b) -> "\\b";
636 6 escape_character($\r) -> "\\r";
637 18 escape_character($') -> "''";
638 9 escape_character($") -> "\\\"";
639 9 escape_character($\\) -> "\\\\";
640 17604 escape_character(C) -> C.
641
642
643 -spec unescape_binary(server(), binary()) -> binary().
644 unescape_binary(_HostType, Bin) when is_binary(Bin) ->
645 5242 mongoose_rdbms_backend:unescape_binary(Bin).
646
647
648 -spec result_to_integer(binary() | integer()) -> integer().
649 result_to_integer(Int) when is_integer(Int) ->
650 19915 Int;
651 result_to_integer(Bin) when is_binary(Bin) ->
652
:-(
binary_to_integer(Bin).
653
654 selected_to_integer({selected, [{BInt}]}) ->
655 6318 result_to_integer(BInt).
656
657 %% Converts a value from a CHAR(1) field to integer
658
:-(
character_to_integer(<<X>>) -> X;
659 2350 character_to_integer(X) when is_integer(X) -> X.
660
661 %% pgsql returns booleans as "t" or "f"
662 -spec to_bool(binary() | string() | atom() | integer() | any()) -> boolean().
663 to_bool(B) when is_binary(B) ->
664 4 to_bool(binary_to_list(B));
665 2 to_bool("t") -> true;
666
:-(
to_bool("true") -> true;
667
:-(
to_bool("1") -> true;
668 142 to_bool(true) -> true;
669
:-(
to_bool(1) -> true;
670 567 to_bool(_) -> false.
671
672 %%%----------------------------------------------------------------------
673 %%% Callback functions from gen_server
674 %%%----------------------------------------------------------------------
675 -spec init(options()) -> {ok, state()}.
676 init(Opts = #{query_timeout := QueryTimeout, max_start_interval := MaxStartInterval}) ->
677 2410 process_flag(trap_exit, true),
678 2410 KeepaliveInterval = maps:get(keepalive_interval, Opts, undefined),
679 % retries are delayed exponentially, max_start_interval limits the delay
680 % e.g. if the limit is 30, the delays are: 2, 4, 8, 16, 30, 30, ...
681 2410 case connect(Opts, ?CONNECT_RETRIES, 2, MaxStartInterval) of
682 {ok, DbRef} ->
683 2410 schedule_keepalive(KeepaliveInterval),
684 2410 {ok, #state{db_ref = DbRef,
685 keepalive_interval = KeepaliveInterval,
686 query_timeout = QueryTimeout}};
687 Error ->
688
:-(
{stop, Error}
689 end.
690
691
692 handle_call({sql_cmd, Command, Timestamp}, From, State) ->
693 79765 {Result, NewState} = run_sql_cmd(Command, From, State, Timestamp),
694 79765 case abort_on_driver_error(Result) of
695
:-(
{stop, Reason} -> {stop, Reason, Result, NewState};
696 79765 continue -> {reply, Result, NewState}
697 end;
698 handle_call(get_db_info, _, #state{db_ref = DbRef} = State) ->
699 1480 {reply, {ok, db_engine(global), DbRef}, State};
700 handle_call(Request, From, State) ->
701
:-(
?UNEXPECTED_CALL(Request, From),
702
:-(
{reply, {error, badarg}, State}.
703
704 handle_cast({sql_cmd, Command, Timestamp}, State) ->
705 2 {Result, NewState} = run_sql_cmd(Command, undefined, State, Timestamp),
706 2 case abort_on_driver_error(Result) of
707
:-(
{stop, Reason} -> {stop, Reason, NewState};
708 2 continue -> {noreply, NewState}
709 end;
710 handle_cast(Request, State) ->
711
:-(
?UNEXPECTED_CAST(Request),
712
:-(
{noreply, State}.
713
714 code_change(_OldVsn, State, _Extra) ->
715
:-(
{ok, State}.
716
717 handle_info(keepalive, #state{keepalive_interval = KeepaliveInterval} = State) ->
718
:-(
case sql_query_internal([?KEEPALIVE_QUERY], State) of
719 {selected, _} ->
720
:-(
schedule_keepalive(KeepaliveInterval),
721
:-(
{noreply, State};
722 {error, _} = Error ->
723
:-(
{stop, {keepalive_failed, Error}, State}
724 end;
725 handle_info({'EXIT', _Pid, _Reason} = Reason, State) ->
726
:-(
{stop, Reason, State};
727 handle_info(Info, State) ->
728
:-(
?UNEXPECTED_INFO(Info),
729
:-(
{noreply, State}.
730
731 -spec terminate(Reason :: term(), state()) -> any().
732 terminate(_Reason, #state{db_ref = DbRef}) ->
733 2410 catch mongoose_rdbms_backend:disconnect(DbRef).
734
735 %%%----------------------------------------------------------------------
736 %%% Internal functions
737 %%%----------------------------------------------------------------------
738
739 -spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) ->
740 {Result :: term(), state()}.
741 run_sql_cmd(Command, _From, State, Timestamp) ->
742 79767 Now = erlang:monotonic_time(millisecond),
743 79767 case Now - Timestamp of
744 Age when Age < ?TRANSACTION_TIMEOUT ->
745 79767 outer_op(Command, State);
746 Age ->
747
:-(
?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow,
748
:-(
text => <<"Discarding request">>, age => Age, command => Command}),
749
:-(
{reply, {error, timeout}, State}
750 end.
751
752 %% @doc Only called by handle_call, only handles top level operations.
753 -spec outer_op(rdbms_msg(), state()) -> {query_result()
754 | transaction_result()
755 | dirty_result(), state()}.
756 outer_op({sql_query, Query}, State) ->
757 956 {sql_query_internal(Query, State), State};
758 outer_op({sql_transaction, F}, State) ->
759 10186 outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "", State);
760 outer_op({sql_dirty, F}, State) ->
761
:-(
sql_dirty_internal(F, State);
762 outer_op({sql_execute, Name, Params}, State) ->
763 68623 sql_execute(outer_op, Name, Params, State);
764 outer_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
765 2 try
766 2 Wrapper(fun() -> sql_execute(outer_op, Name, Params, State) end)
767 catch
768 _Class:Error ->
769 1 ?LOG_ERROR(#{what => sql_execute_wrapped_failed, reason => Error,
770
:-(
statement_name => Name, wrapper_fun => Wrapper}),
771 1 {{error, Error}, State}
772 end.
773
774 %% @doc Called via sql_query/transaction/bloc from client code when inside a
775 %% nested operation
776 -spec nested_op(rdbms_msg(), state()) -> any().
777 nested_op({sql_query, Query}, State) ->
778 %% XXX - use sql_query_t here insted? Most likely would break
779 %% callers who expect {error, _} tuples (sql_query_t turns
780 %% these into throws)
781
:-(
{sql_query_internal(Query, State), State};
782 nested_op({sql_transaction, F}, State) ->
783 %% Transaction inside a transaction
784
:-(
inner_transaction(F, State);
785 nested_op({sql_execute, Name, Params}, State) ->
786 21076 sql_execute(nested_op, Name, Params, State);
787 nested_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
788 11 Wrapper(fun() -> sql_execute(nested_op, Name, Params, State) end).
789
790 %% @doc Never retry nested transactions - only outer transactions
791 -spec inner_transaction(fun(), state()) -> transaction_result() | {'EXIT', any()}.
792 inner_transaction(F, _State) ->
793
:-(
case catch F() of
794 {aborted, Reason} ->
795
:-(
{aborted, Reason};
796 {'EXIT', Reason} ->
797
:-(
{'EXIT', Reason};
798 {atomic, Res} ->
799
:-(
{atomic, Res};
800 Res ->
801
:-(
{atomic, Res}
802 end.
803
804 -spec outer_transaction(F :: fun(),
805 NRestarts :: 0..10,
806 Reason :: any(), state()) -> {transaction_result(), state()}.
807 outer_transaction(F, NRestarts, _Reason, State) ->
808 10208 sql_query_internal(rdbms_queries:begin_trans(), State),
809 10208 put_state(State),
810 10208 {Result, StackTrace} = apply_transaction_function(F),
811 10208 NewState = erase_state(),
812 10208 case Result of
813 {aborted, Reason} when NRestarts > 0 ->
814 %% Retry outer transaction upto NRestarts times.
815 22 sql_query_internal([<<"rollback;">>], NewState),
816 22 outer_transaction(F, NRestarts - 1, Reason, NewState);
817 {aborted, #{reason := Reason, sql_query := SqlQuery}}
818 when NRestarts =:= 0 ->
819 %% Too many retries of outer transaction.
820
:-(
?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
821 restarts => ?MAX_TRANSACTION_RESTARTS, last_abort_reason => Reason,
822 last_sql_query => iolist_to_binary(SqlQuery),
823
:-(
stacktrace => StackTrace, state => NewState}),
824
:-(
sql_query_internal([<<"rollback;">>], NewState),
825
:-(
{{aborted, Reason}, NewState};
826 {aborted, Reason} when NRestarts =:= 0 -> %% old format for abort
827 %% Too many retries of outer transaction.
828 2 ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
829 restarts => ?MAX_TRANSACTION_RESTARTS,
830 last_abort_reason => Reason, stacktrace => StackTrace,
831
:-(
state => NewState}),
832 2 sql_query_internal([<<"rollback;">>], NewState),
833 2 {{aborted, Reason}, NewState};
834 {'EXIT', Reason} ->
835 %% Abort sql transaction on EXIT from outer txn only.
836 1 sql_query_internal([<<"rollback;">>], NewState),
837 1 {{aborted, Reason}, NewState};
838 Res ->
839 %% Commit successful outer txn
840 10183 sql_query_internal([<<"commit;">>], NewState),
841 10183 {{atomic, Res}, NewState}
842 end.
843
844 -spec apply_transaction_function(F :: fun()) -> {any(), list()}.
845 apply_transaction_function(F) ->
846 10208 try
847 10208 {F(), []}
848 catch
849 throw:ThrowResult:StackTrace ->
850 24 {ThrowResult, StackTrace};
851 Class:Reason:StackTrace ->
852 1 ?LOG_ERROR(#{what => rdbms_outer_transaction_failed, class => Class,
853
:-(
reason => Reason, stacktrace => StackTrace}),
854 1 {{'EXIT', Reason}, StackTrace}
855 end.
856
857 sql_query_internal(Query, #state{db_ref = DBRef, query_timeout = QueryTimeout}) ->
858 21374 case mongoose_rdbms_backend:query(DBRef, Query, QueryTimeout) of
859 {error, "No SQL-driver information available."} ->
860
:-(
{updated, 0}; %% workaround for odbc bug
861 Result ->
862 21374 Result
863 end.
864
865 sql_dirty_internal(F, State) ->
866
:-(
put_state(State),
867
:-(
Result =
868
:-(
try F() of
869 Result0 ->
870
:-(
{ok, Result0}
871 catch
872 _C:R ->
873
:-(
{error, R}
874 end,
875
:-(
{Result, erase_state()}.
876
877 -spec sql_execute(Type :: atom(), Name :: atom(), Params :: [term()], state()) ->
878 {query_result(), state()}.
879 sql_execute(Type, Name, Params, State = #state{db_ref = DBRef, query_timeout = QueryTimeout}) ->
880 %% Postgres allows to prepare statement only once, so we should take care that NewState is updated
881 89711 {StatementRef, NewState} = prepare_statement(Name, State),
882 89711 put_state(NewState),
883 89711 Res = try mongoose_rdbms_backend:execute(DBRef, StatementRef, Params, QueryTimeout)
884 catch Class:Reason:StackTrace ->
885
:-(
?LOG_ERROR(#{what => rdbms_sql_execute_failed, statement_name => Name,
886 class => Class, reason => Reason, params => Params,
887
:-(
stacktrace => StackTrace}),
888
:-(
erlang:raise(Class, Reason, StackTrace)
889 end,
890 89711 check_execute_result(Type, Res, Name, Params),
891 89687 {Res, NewState}.
892
893 %% Similar check as in sql_query_t
894 check_execute_result(outer_op, _Res, _Name, _Params) ->
895 68624 ok;
896 check_execute_result(nested_op, Res, Name, Params) ->
897 %% Res is not a list (i.e. executes are one query only and one result set only)
898 21087 case Res of
899 {error, Reason} ->
900 24 throw({aborted, #{reason => Reason, statement_name => Name, params => Params}});
901 _ when is_tuple(Res) ->
902 21063 ok
903 end.
904
905 -spec prepare_statement(Name :: atom(), state()) -> {Ref :: term(), state()}.
906 prepare_statement(Name, State = #state{db_ref = DBRef, prepared = Prepared}) ->
907 89711 case maps:get(Name, Prepared, undefined) of
908 undefined ->
909 1791 {_, Table, Fields, Statement} = lookup_statement(Name),
910 1791 {ok, Ref} = mongoose_rdbms_backend:prepare(DBRef, Name, Table, Fields, Statement),
911 1791 {Ref, State#state{prepared = maps:put(Name, Ref, Prepared)}};
912
913 Ref ->
914 87920 {Ref, State}
915 end.
916
917 lookup_statement(Name) ->
918 1791 case ets:lookup(prepared_statements, Name) of
919 1791 [Rec] -> Rec;
920
:-(
[] -> error({lookup_statement_failed, Name})
921 end.
922
923 -spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}.
924 abort_on_driver_error({error, "query timed out"}) -> %% mysql driver error
925
:-(
{stop, timeout};
926 abort_on_driver_error({error, "Failed sending data on socket" ++ _}) -> %% mysql driver error
927
:-(
{stop, closed};
928 abort_on_driver_error(_) ->
929 79767 continue.
930
931 -spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined.
932 db_engine(_HostType) ->
933 8826 try mongoose_backend:get_backend_name(global, ?MODULE)
934
:-(
catch error:badarg -> undefined end.
935
936 %% @doc Used to optimise queries for different databases.
937 %% @todo Should be refactored to use host types with this module.
938 %% Also, this parameter should not be global, but pool-name parameterized
939 -spec db_type() -> mssql | generic.
940 db_type() ->
941 16523 case mongoose_config:get_opt(rdbms_server_type) of
942
:-(
mssql -> mssql;
943 16523 _ -> generic
944 end.
945
946 -spec connect(options(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
947 MaxRetryDelay :: non_neg_integer()) -> {ok, term()} | {error, any()}.
948 connect(#{query_timeout := QueryTimeout} = Options, Retry, RetryAfter, MaxRetryDelay) ->
949 2410 case mongoose_rdbms_backend:connect(Options, QueryTimeout) of
950 {ok, _} = Ok ->
951 2410 Ok;
952 Error when Retry =:= 0 ->
953
:-(
Error;
954 Error ->
955
:-(
SleepFor = rand:uniform(RetryAfter),
956
:-(
Backend = mongoose_backend:get_backend_name(global, ?MODULE),
957
:-(
?LOG_ERROR(#{what => rdbms_connection_attempt_error, backend => Backend,
958
:-(
error => Error, sleep_for => SleepFor}),
959
:-(
timer:sleep(timer:seconds(SleepFor)),
960
:-(
NextRetryDelay = RetryAfter * RetryAfter,
961
:-(
connect(Options, Retry - 1, min(MaxRetryDelay, NextRetryDelay), MaxRetryDelay)
962 end.
963
964
965 -spec schedule_keepalive(KeepaliveInterval :: undefined | pos_integer()) -> any().
966 schedule_keepalive(KeepaliveInterval) ->
967 2410 case KeepaliveInterval of
968 _ when is_integer(KeepaliveInterval) ->
969
:-(
erlang:send_after(timer:seconds(KeepaliveInterval), self(), keepalive);
970 undefined ->
971 2410 ok;
972 Other ->
973
:-(
?LOG_ERROR(#{what => rdbms_wrong_keepalive_interval, reason => Other}),
974
:-(
ok
975 end.
976
977 %% ----- process state access, for convenient tracing
978
979 put_state(State) ->
980 120982 put(?STATE_KEY, State).
981
982 erase_state() ->
983 10208 erase(?STATE_KEY).
984
985 get_state() ->
986 101246 get(?STATE_KEY).
Line Hits Source