./ct_report/coverage/mongoose_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mongoose_rdbms.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Serve RDBMS connection
5 %%% Created : 8 Dec 2004 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%% Copyright 2016 Erlang Solutions Ltd.
10 %%%
11 %%% This program is free software; you can redistribute it and/or
12 %%% modify it under the terms of the GNU General Public License as
13 %%% published by the Free Software Foundation; either version 2 of the
14 %%% License, or (at your option) any later version.
15 %%%
16 %%% This program is distributed in the hope that it will be useful,
17 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
18 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 %%% General Public License for more details.
20 %%%
21 %%% You should have received a copy of the GNU General Public License
22 %%% along with this program; if not, write to the Free Software
23 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 %%%
25 %%%----------------------------------------------------------------------
26
27 -module(mongoose_rdbms).
28 -author('alexey@process-one.net').
29 -author('konrad.zemek@erlang-solutions.com').
30
31 -behaviour(gen_server).
32
33 %% Part of SQL query string, produced by use_escaped/1 function
34 -type sql_query_part() :: iodata().
35 -type sql_query() :: iodata().
36
37 %% Blob data type to be used inside SQL queries
38 -opaque escaped_binary() :: {escaped_binary, sql_query_part()}.
39 %% Unicode string to be used inside SQL queries
40 -opaque escaped_string() :: {escaped_string, sql_query_part()}.
41 %% Unicode string to be used inside LIKE conditions
42 -opaque escaped_like() :: {escaped_like, sql_query_part()}.
43 -opaque escaped_integer() :: {escaped_integer, sql_query_part()}.
44 -opaque escaped_boolean() :: {escaped_boolean, sql_query_part()}.
45 -opaque escaped_null() :: {escaped_null, sql_query_part()}.
46 -opaque escaped_value() :: escaped_string() | escaped_binary() | escaped_integer() |
47 escaped_boolean() | escaped_null().
48
49 -ifdef(gen_server_request_id).
50 -type request_id() :: gen_server:request_id().
51 -else.
52 -type request_id() :: term().
53 -endif.
54
55 -export_type([escaped_binary/0,
56 escaped_string/0,
57 escaped_like/0,
58 escaped_integer/0,
59 escaped_boolean/0,
60 escaped_null/0,
61 escaped_value/0,
62 sql_query/0,
63 sql_query_part/0]).
64
65 -export([process_options/1]).
66
67 %% External exports
68 -export([prepare/4,
69 prepared/1,
70 execute/3, execute/4,
71 execute_cast/3, execute_cast/4,
72 execute_request/3, execute_request/4,
73 execute_wrapped_request/4, execute_wrapped_request/5,
74 execute_successfully/3, execute_successfully/4,
75 sql_query/2, sql_query/3,
76 sql_query_cast/2, sql_query_cast/3,
77 sql_query_request/2, sql_query_request/3,
78 sql_transaction/2, sql_transaction/3,
79 sql_transaction_request/2, sql_transaction_request/3,
80 sql_dirty/2, sql_dirty/3,
81 sql_query_t/1,
82 transaction_with_delayed_retry/3,
83 to_bool/1,
84 db_engine/1,
85 db_type/0,
86 use_escaped/1]).
87
88 %% Unicode escaping
89 -export([escape_string/1,
90 use_escaped_string/1]).
91
92 %% Integer escaping
93 -export([escape_integer/1,
94 use_escaped_integer/1]).
95
96 %% Boolean escaping
97 -export([escape_boolean/1,
98 use_escaped_boolean/1]).
99
100 %% LIKE escaping
101 -export([escape_like/1,
102 escape_prepared_like/1,
103 escape_like_prefix/1,
104 use_escaped_like/1]).
105
106 %% BLOB escaping
107 -export([escape_binary/2,
108 unescape_binary/2,
109 use_escaped_binary/1]).
110
111 %% Null escaping
112 %% (to keep uniform pattern of passing values)
113 -export([escape_null/0,
114 use_escaped_null/1]).
115
116 %% count / integral types decoding
117 -export([result_to_integer/1,
118 selected_to_integer/1]).
119
120 -export([character_to_integer/1]).
121
122 %% gen_server callbacks
123 -export([init/1,
124 handle_call/3,
125 handle_cast/2,
126 handle_info/2,
127 terminate/2,
128 code_change/3]).
129
130 -ignore_xref([
131 sql_query_cast/2, sql_query_request/2,
132 execute/4, execute_wrapped_request/5,
133 sql_query/3, sql_query_cast/3, sql_query_request/3,
134 sql_dirty/3, sql_transaction/3,
135 execute_successfully/4, send_request/3,
136 execute_cast/3, execute_cast/4,
137 execute_request/3, execute_request/4,
138 execute_wrapped_request/4,
139 sql_transaction_request/2, sql_transaction_request/3,
140 sql_query_t/1, use_escaped/1,
141 escape_like/1, escape_like_prefix/1, use_escaped_like/1,
142 escape_binary/2, use_escaped_binary/1,
143 escape_integer/1, use_escaped_integer/1,
144 escape_string/1, use_escaped_string/1,
145 escape_boolean/1, use_escaped_boolean/1,
146 escape_null/0, use_escaped_null/1
147 ]).
148
149 %% internal usage
150 -export([get_db_info/1]).
151
152 -include("mongoose.hrl").
153
154 -record(state, {db_ref,
155 prepared = #{} :: #{binary() => term()},
156 keepalive_interval :: undefined | pos_integer(),
157 query_timeout :: pos_integer()
158 }).
159 -type state() :: #state{}.
160
161 -define(DEFAULT_POOL_NAME, default).
162 -define(STATE_KEY, mongoose_rdbms_state).
163 -define(MAX_TRANSACTION_RESTARTS, 10).
164 -define(TRANSACTION_TIMEOUT, 60000). % milliseconds
165 -define(KEEPALIVE_QUERY, <<"SELECT 1;">>).
166 %% The value is arbitrary; supervisor will restart the connection once
167 %% the retry counter runs out. We just attempt to reduce log pollution.
168 -define(CONNECT_RETRIES, 5).
169
170 -type server() :: mongooseim:host_type() | global.
171 -type request_wrapper() :: fun((fun(() -> T)) -> T).
172 -type rdbms_msg() :: {sql_query, _}
173 | {sql_transaction, fun()}
174 | {sql_dirty, fun()}
175 | {sql_execute, atom(), [binary() | boolean() | integer()]}
176 | {sql_execute_wrapped, atom(), [binary() | boolean() | integer()], request_wrapper()}.
177 -type single_query_result() :: {selected, [tuple()]} |
178 {updated, non_neg_integer() | undefined} |
179 {updated, non_neg_integer(), [tuple()]} |
180 {aborted, Reason :: term()} |
181 {error, Reason :: string() | duplicate_key}.
182 -type query_result() :: single_query_result() | [single_query_result()].
183 -type transaction_result() :: {aborted, _} | {atomic, _} | {error, _}.
184 -type dirty_result() :: {ok, any()} | {error, any()}.
185 -export_type([query_result/0,
186 transaction_result/0,
187 server/0]).
188
189 -type options() :: #{driver := pgsql | mysql | odbc,
190 max_start_interval := pos_integer(),
191 query_timeout := pos_integer(),
192 atom() => any()}.
193
194 -export_type([options/0]).
195
196 %%%----------------------------------------------------------------------
197 %%% API
198 %%%----------------------------------------------------------------------
199
%% @doc Validate and normalise RDBMS connection options.
%% ODBC options that carry `settings' are returned untouched. Options with
%% host, database, username and password go through TLS processing and get a
%% port. Anything else raises an `invalid_rdbms_connection_options' error.
-spec process_options(map()) -> options().
process_options(#{driver := odbc, settings := _} = Opts) ->
    Opts;
process_options(#{host := _, database := _, username := _, password := _} = Opts) ->
    WithTLS = process_tls_options(Opts),
    ensure_db_port(WithTLS);
process_options(Opts) ->
    error(#{what => invalid_rdbms_connection_options, options => Opts}).
207
%% TLS option handling. MySQL rejects the `required' key, PostgreSQL gets
%% `required => false' unless the caller set it, other options pass through.
process_tls_options(#{driver := mysql, tls := #{required := _}} = Opts) ->
    error(#{what => invalid_rdbms_tls_options, options => Opts,
            text => <<"The 'required' option is not supported for MySQL">>});
process_tls_options(#{driver := pgsql, tls := TLS} = Opts) ->
    Defaults = #{required => false},
    Opts#{tls := maps:merge(Defaults, TLS)};
process_tls_options(Opts) ->
    Opts.
215
%% Keep an explicitly configured port; otherwise fill in the driver's
%% default (5432 for pgsql, 3306 for mysql).
ensure_db_port(Opts) when is_map_key(port, Opts) ->
    Opts;
ensure_db_port(#{driver := pgsql} = Opts) ->
    maps:put(port, 5432, Opts);
ensure_db_port(#{driver := mysql} = Opts) ->
    maps:put(port, 3306, Opts).
219
%% @doc Register a named prepared statement in the `prepared_statements'
%% ETS table. Table and field names given as atoms are converted to UTF-8
%% binaries first. Returns `{error, already_exists}' when the name is taken.
-spec prepare(Name, Table :: binary() | atom(), Fields :: [binary() | atom()],
              Statement :: iodata()) ->
                     {ok, Name} | {error, already_exists}
                         when Name :: atom().
prepare(Name, Table, Fields, Statement) when is_atom(Table) ->
    BinTable = atom_to_binary(Table, utf8),
    prepare(Name, BinTable, Fields, Statement);
prepare(Name, Table, Fields = [First | _], Statement) when is_atom(First) ->
    BinFields = [atom_to_binary(Field, utf8) || Field <- Fields],
    prepare(Name, Table, BinFields, Statement);
prepare(Name, Table, Fields, Statement) when is_atom(Name), is_binary(Table) ->
    %% Crash early on a field list that mixes binaries with anything else.
    true = lists:all(fun is_binary/1, Fields),
    Row = {Name, Table, Fields, iolist_to_binary(Statement)},
    case ets:insert_new(prepared_statements, Row) of
        false -> {error, already_exists};
        true -> {ok, Name}
    end.
235
%% @doc Tell whether a statement with this name is present in the
%% `prepared_statements' ETS table.
-spec prepared(atom()) -> boolean().
prepared(StatementName) ->
    ets:member(prepared_statements, StatementName).
239
%% @doc Execute a prepared statement in the default pool.
-spec execute(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
                     query_result().
execute(HostType, Name, Parameters) ->
    execute(HostType, ?DEFAULT_POOL_NAME, Name, Parameters).

%% @doc Execute a prepared statement in the given pool and wait for the result.
-spec execute(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
                     query_result().
execute(HostType, PoolName, Name, Parameters)
  when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
    Msg = {sql_execute, Name, Parameters},
    sql_call(HostType, PoolName, Msg).
249
%% @doc Execute a prepared statement in the default pool via sql_cast/3.
-spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
                          query_result().
execute_cast(HostType, Name, Parameters) ->
    execute_cast(HostType, ?DEFAULT_POOL_NAME, Name, Parameters).

%% @doc Execute a prepared statement in the given pool via sql_cast/3.
%% NOTE(review): outside a transaction this goes through mongoose_wpool:cast,
%% so the declared query_result() return type looks too optimistic - confirm.
-spec execute_cast(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
                          query_result().
execute_cast(HostType, PoolName, Name, Parameters)
  when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
    Msg = {sql_execute, Name, Parameters},
    sql_cast(HostType, PoolName, Msg).
259
%% @doc Send an asynchronous execute request to the default pool.
-spec execute_request(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
                             request_id().
execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
    execute_request(HostType, ?DEFAULT_POOL_NAME, Name, Parameters).

%% @doc Send an asynchronous execute request to the given pool.
-spec execute_request(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
                             request_id().
execute_request(HostType, PoolName, Name, Parameters)
  when is_atom(PoolName), is_atom(Name), is_list(Parameters) ->
    Msg = {sql_execute, Name, Parameters},
    sql_request(HostType, PoolName, Msg).
269
%% @doc Like execute_request/3, but the worker runs the statement inside
%% the supplied wrapper fun. Uses the default pool.
-spec execute_wrapped_request(
        HostType :: server(),
        Name :: atom(),
        Parameters :: [term()],
        Wrapper :: request_wrapper()) -> request_id().
execute_wrapped_request(HostType, Name, Parameters, Wrapper) ->
    execute_wrapped_request(HostType, ?DEFAULT_POOL_NAME, Name, Parameters, Wrapper).

%% @doc Like execute_request/4, but the worker runs the statement inside
%% the supplied wrapper fun.
-spec execute_wrapped_request(
        HostType :: server(),
        PoolName :: atom(),
        Name :: atom(),
        Parameters :: [term()],
        Wrapper :: request_wrapper()) -> request_id().
execute_wrapped_request(HostType, PoolName, Name, Parameters, Wrapper)
  when is_atom(PoolName), is_atom(Name), is_list(Parameters), is_function(Wrapper) ->
    Msg = {sql_execute_wrapped, Name, Parameters, Wrapper},
    sql_request(HostType, PoolName, Msg).
287
%% @doc Same as execute/3, but fails loudly: any result other than
%% `{selected, _}' or `{updated, _}' is logged and raised as an error.
-spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
                                  query_result().
execute_successfully(HostType, Name, Parameters) ->
    execute_successfully(HostType, ?DEFAULT_POOL_NAME, Name, Parameters).

%% @doc Same as execute/4, but fails loudly. Errors raised by execute/4
%% itself are logged with their stacktrace and re-raised unchanged.
-spec execute_successfully(HostType :: server(), PoolName :: atom(), Name :: atom(), Parameters :: [term()]) ->
                                  query_result().
execute_successfully(HostType, PoolName, Name, Parameters) ->
    try execute(HostType, PoolName, Name, Parameters) of
        {selected, _} = Selected ->
            Selected;
        {updated, _} = Updated ->
            Updated;
        Unexpected ->
            Report = #{what => sql_execute_failed, host => HostType, statement_name => Name,
                       statement_query => query_name_to_string(Name),
                       statement_params => Parameters, reason => Unexpected},
            ?LOG_ERROR(Report),
            error(Report)
    catch
        error:Reason:Stacktrace ->
            Report = #{what => sql_execute_failed, host => HostType, statement_name => Name,
                       statement_query => query_name_to_string(Name),
                       statement_params => Parameters,
                       reason => Reason, stacktrace => Stacktrace},
            ?LOG_ERROR(Report),
            erlang:raise(error, Reason, Stacktrace)
    end.
316
%% Look up the SQL text registered for a prepared statement name;
%% returns `not_found' when the name is unknown.
query_name_to_string(Name) ->
    Rows = ets:lookup(prepared_statements, Name),
    case Rows of
        [{_Name, _Table, _Fields, Sql}] ->
            Sql;
        [] ->
            not_found
    end.
324
%% @doc Run a raw SQL query in the default pool.
-spec sql_query(HostType :: server(), Query :: any()) -> query_result().
sql_query(HostType, Query) ->
    sql_query(HostType, ?DEFAULT_POOL_NAME, Query).

%% @doc Run a raw SQL query in the given pool and wait for the result.
-spec sql_query(HostType :: server(), PoolName :: atom(), Query :: any()) -> query_result().
sql_query(HostType, PoolName, Query) ->
    Msg = {sql_query, Query},
    sql_call(HostType, PoolName, Msg).
332
%% @doc Send a raw SQL query as an asynchronous request to the default pool.
-spec sql_query_request(HostType :: server(), Query :: any()) -> request_id().
sql_query_request(HostType, Query) ->
    sql_query_request(HostType, ?DEFAULT_POOL_NAME, Query).

%% @doc Send a raw SQL query as an asynchronous request to the given pool.
-spec sql_query_request(HostType :: server(), PoolName :: atom(), Query :: any()) -> request_id().
sql_query_request(HostType, PoolName, Query) ->
    Msg = {sql_query, Query},
    sql_request(HostType, PoolName, Msg).
340
%% @doc Send a raw SQL query to the default pool via sql_cast/3.
-spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result().
sql_query_cast(HostType, Query) ->
    sql_query_cast(HostType, ?DEFAULT_POOL_NAME, Query).

%% @doc Send a raw SQL query to the given pool via sql_cast/3.
-spec sql_query_cast(HostType :: server(), PoolName :: atom(), Query :: any()) -> query_result().
sql_query_cast(HostType, PoolName, Query) ->
    Msg = {sql_query, Query},
    sql_cast(HostType, PoolName, Msg).
348
%% @doc Run an SQL transaction in the default pool. The transaction is
%% given either as a list of queries or as a fun.
-spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result().
sql_transaction(HostType, Msg) ->
    sql_transaction(HostType, ?DEFAULT_POOL_NAME, Msg).

%% @doc Run an SQL transaction in the given pool.
%% A list of queries is turned into a fun that runs each query with
%% sql_query_t/1; a fun is sent to the pool as is.
-spec sql_transaction(server(), atom(), fun() | maybe_improper_list()) -> transaction_result().
sql_transaction(HostType, PoolName, Queries) when is_atom(PoolName), is_list(Queries) ->
    RunAll = fun() -> lists:map(fun sql_query_t/1, Queries) end,
    sql_transaction(HostType, PoolName, RunAll);
sql_transaction(HostType, PoolName, F) when is_atom(PoolName), is_function(F) ->
    Msg = {sql_transaction, F},
    sql_call(HostType, PoolName, Msg).
361
%% @doc Send an SQL transaction as an asynchronous request to the default
%% pool. The transaction is given either as a list of queries or as a fun.
-spec sql_transaction_request(server(), fun() | maybe_improper_list()) -> request_id().
sql_transaction_request(HostType, Queries) ->
    sql_transaction_request(HostType, ?DEFAULT_POOL_NAME, Queries).

%% @doc Send an SQL transaction as an asynchronous request to the given pool.
%% A list of queries is turned into a fun that runs each query with
%% sql_query_t/1; a fun is sent to the pool as is.
-spec sql_transaction_request(server(), atom(), fun() | maybe_improper_list()) -> request_id().
sql_transaction_request(HostType, PoolName, Queries) when is_atom(PoolName), is_list(Queries) ->
    RunAll = fun() -> lists:map(fun sql_query_t/1, Queries) end,
    sql_transaction_request(HostType, PoolName, RunAll);
sql_transaction_request(HostType, PoolName, F) when is_atom(PoolName), is_function(F) ->
    Msg = {sql_transaction, F},
    sql_request(HostType, PoolName, Msg).
374
%% @doc Run a transaction, retrying aborted attempts with a pause in between.
%% `Info' must contain `retries' (number of extra attempts) and `delay'
%% (passed to timer:sleep/1 between attempts). The whole map is also used
%% as the base of the log reports.
-spec transaction_with_delayed_retry(server(), fun() | maybe_improper_list(), map()) -> transaction_result().
transaction_with_delayed_retry(HostType, F, Info) ->
    do_transaction_with_delayed_retry(HostType, F,
                                      maps:get(retries, Info),
                                      maps:get(delay, Info),
                                      Info).
381
%% Retry loop behind transaction_with_delayed_retry/3.
%% An `{atomic, _}' result is returned as is. An `{aborted, _}' result is
%% retried after sleeping `Delay' while retries remain. Any other outcome
%% is logged and raised as an error.
do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) ->
    Outcome = mongoose_rdbms:sql_transaction(HostType, F),
    case Outcome of
        {atomic, _} ->
            Outcome;
        {aborted, Reason} when Retries > 0 ->
            ?LOG_WARNING(Info#{what => rdbms_transaction_aborted,
                               text => <<"Transaction aborted. Restart">>,
                               reason => Reason, retries_left => Retries}),
            timer:sleep(Delay),
            do_transaction_with_delayed_retry(HostType, F, Retries - 1, Delay, Info);
        _ ->
            Report = Info#{what => mam_transaction_failed,
                           text => <<"Transaction failed. Do not restart">>,
                           reason => Outcome},
            ?LOG_ERROR(Report),
            erlang:error(Report)
    end.
400
%% @doc Run a fun through the default pool as an `sql_dirty' operation.
-spec sql_dirty(server(), fun()) -> any() | no_return().
sql_dirty(HostType, F) ->
    sql_dirty(HostType, ?DEFAULT_POOL_NAME, F).

%% @doc Run a fun through the given pool as an `sql_dirty' operation.
%% Unwraps `{ok, Result}' and throws the reason of `{error, Reason}'.
-spec sql_dirty(server(), atom(), fun()) -> any() | no_return().
sql_dirty(HostType, PoolName, F) when is_function(F) ->
    Reply = sql_call(HostType, PoolName, {sql_dirty, F}),
    case Reply of
        {error, Error} -> throw(Error);
        {ok, Result} -> Result
    end.
411
%% TODO: Better spec for RPC calls
%% Synchronous dispatch. When get_state/0 finds no stored state the message
%% goes to the worker pool. Otherwise it is run in place as a nested
%% operation and the resulting state is stored back with put_state/1.
-spec sql_call(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_call(HostType, PoolName, Msg) ->
    case get_state() of
        undefined ->
            sql_call0(HostType, PoolName, Msg);
        CurrentState ->
            {Reply, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Reply
    end.
422
%% Send the command to the rdbms worker pool with a synchronous call.
%% The monotonic timestamp lets the worker measure how long the request waited.
-spec sql_call0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_call0(HostType, PoolName, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Cmd = {sql_cmd, Msg, SentAt},
    mongoose_wpool:call(rdbms, HostType, PoolName, Cmd).
427
%% Asynchronous-request dispatch. When get_state/0 finds no stored state the
%% message goes to the worker pool. Otherwise it is run in place as a nested
%% operation and the resulting state is stored back with put_state/1.
-spec sql_request(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_request(HostType, PoolName, Msg) ->
    case get_state() of
        undefined ->
            sql_request0(HostType, PoolName, Msg);
        CurrentState ->
            {Reply, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Reply
    end.
437
%% Send the command to the rdbms worker pool as an asynchronous request.
%% The monotonic timestamp lets the worker measure how long the request waited.
-spec sql_request0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_request0(HostType, PoolName, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Cmd = {sql_cmd, Msg, SentAt},
    mongoose_wpool:send_request(rdbms, HostType, PoolName, Cmd).
442
%% Cast dispatch. When get_state/0 finds no stored state the message goes
%% to the worker pool. Otherwise it is run in place as a nested operation
%% and the resulting state is stored back with put_state/1.
-spec sql_cast(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_cast(HostType, PoolName, Msg) ->
    case get_state() of
        undefined ->
            sql_cast0(HostType, PoolName, Msg);
        CurrentState ->
            {Reply, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Reply
    end.
452
%% Send the command to the rdbms worker pool as a cast.
%% The monotonic timestamp lets the worker measure how long the request waited.
-spec sql_cast0(HostType :: server(), PoolName :: atom(), Msg :: rdbms_msg()) -> any().
sql_cast0(HostType, PoolName, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Cmd = {sql_cmd, Msg, SentAt},
    mongoose_wpool:cast(rdbms, HostType, PoolName, Cmd).
457
%% Internal: fetch the database engine and connection reference, either from
%% one specific worker process (5000 ms call timeout) or from the pool
%% serving the given host type.
-spec get_db_info(Target :: server() | pid()) ->
                         {ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
get_db_info(Worker) when is_pid(Worker) ->
    wpool_process:call(Worker, get_db_info, 5000);
get_db_info(HostType) ->
    mongoose_wpool:call(rdbms, HostType, get_db_info).
464
%% This function is intended to be used from inside an sql_transaction:
%% it runs the query on the state returned by get_state/0.
sql_query_t(Query) ->
    sql_query_t(Query, get_state()).

%% Run the query and turn any `{error, Reason}' - as the whole result or as
%% an element of a result list - into a thrown `{aborted, _}'.
sql_query_t(Query, State) ->
    case sql_query_internal(Query, State) of
        {error, Reason} ->
            throw({aborted, #{reason => Reason, sql_query => Query}});
        Results when is_list(Results) ->
            case lists:keyfind(error, 1, Results) of
                {error, Reason} ->
                    throw({aborted, #{reason => Reason, sql_query => Query}});
                _ ->
                    Results
            end;
        Other ->
            Other
    end.
484
%% Only for binaries. Escapes just the content; the caller still has to add '%'.
%% The escape character is '$', because '\' causes issues in PostgreSQL.
%% The returned value is NOT safe to use outside of a prepared statement.
-spec escape_prepared_like(binary()) -> binary().
escape_prepared_like(S) ->
    Escaped = [escape_prepared_like_character(Byte) || <<Byte>> <= S],
    iolist_to_binary(Escaped).
491
%% @doc Escape text for a LIKE condition that matches anywhere in the value.
%% Percent and underscore are escaped in addition to the usual characters,
%% and the result is wrapped as '%...%'.
%% INFO: Used in mod_vcard_rdbms.
%% Searches in the middle of text, non-efficient
-spec escape_like(binary() | string()) -> escaped_like().
escape_like(S) ->
    Escaped = escape_like_internal(S),
    {escaped_like, [$', $%, Escaped, $%, $']}.
500
%% @doc Escape text for a LIKE condition that matches a prefix: the result
%% is wrapped as '...%'.
-spec escape_like_prefix(binary() | string()) -> escaped_like().
escape_like_prefix(S) ->
    Escaped = escape_like_internal(S),
    {escaped_like, [$', Escaped, $%, $']}.
504
%% @doc Escape a blob with the backend's escape_binary/1 and tag the result.
%% The host type argument is not used.
-spec escape_binary(server(), binary()) -> escaped_binary().
escape_binary(_HostType, Bin) when is_binary(Bin) ->
    Escaped = mongoose_rdbms_backend:escape_binary(Bin),
    {escaped_binary, Escaped}.
508
%% @doc Escape a string and tag the result. The escaped value already
%% includes the surrounding single quotes.
-spec escape_string(binary() | string()) -> escaped_string().
escape_string(S) ->
    Escaped = escape_string_internal(S),
    {escaped_string, Escaped}.
513
%% @doc Render an integer as decimal text and tag the result.
-spec escape_integer(integer()) -> escaped_integer().
escape_integer(I) when is_integer(I) ->
    Digits = integer_to_binary(I),
    {escaped_integer, Digits}.
517
%% Booleans are rendered as the quoted strings '1' and '0'.
%% A plain escaped integer cannot be used here, because pgsql rejects it:
%% column \"match_all\" is of type boolean but expression is of type integer
-spec escape_boolean(boolean()) -> escaped_boolean().
escape_boolean(Bool) ->
    case Bool of
        true -> {escaped_boolean, "'1'"};
        false -> {escaped_boolean, "'0'"}
    end.
526
%% @doc Tagged SQL NULL literal, so that null values can be passed around
%% in the same way as the other escaped values.
-spec escape_null() -> escaped_null().
escape_null() ->
    Literal = "null",
    {escaped_null, Literal}.
530
531
%% @doc SQL-injection check.
%% Call this function just before using a value from escape_string/1 inside
%% a query. Anything that is not tagged `escaped_string' is logged and
%% raised as an error.
-spec use_escaped_string(escaped_string()) -> sql_query_part().
use_escaped_string(Value) ->
    case Value of
        {escaped_string, S} ->
            S;
        _ ->
            %% We need to print an error, because in some places
            %% the error can be just ignored, because of too wide catches.
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_string, Value})
    end.
543
%% @doc Unwrap a value produced by escape_binary/2. Anything that is not
%% tagged `escaped_binary' is logged and raised as an error.
-spec use_escaped_binary(escaped_binary()) -> sql_query_part().
use_escaped_binary(Value) ->
    case Value of
        {escaped_binary, S} ->
            S;
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_binary, Value})
    end.
551
%% @doc Unwrap a value produced by escape_like/1 or escape_like_prefix/1.
%% Anything that is not tagged `escaped_like' is logged and raised as an error.
-spec use_escaped_like(escaped_like()) -> sql_query_part().
use_escaped_like(Value) ->
    case Value of
        {escaped_like, S} ->
            S;
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_like, Value})
    end.
559
%% @doc Unwrap a value produced by escape_integer/1. Anything that is not
%% tagged `escaped_integer' is logged and raised as an error.
-spec use_escaped_integer(escaped_integer()) -> sql_query_part().
use_escaped_integer(Value) ->
    case Value of
        {escaped_integer, S} ->
            S;
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_integer, Value})
    end.
567
%% @doc Unwrap a value produced by escape_boolean/1. Anything that is not
%% tagged `escaped_boolean' is logged and raised as an error.
-spec use_escaped_boolean(escaped_boolean()) -> sql_query_part().
use_escaped_boolean(Value) ->
    case Value of
        {escaped_boolean, S} ->
            S;
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_boolean, Value})
    end.
575
%% @doc Unwrap a value produced by escape_null/0. Anything that is not
%% tagged `escaped_null' is logged and raised as an error.
-spec use_escaped_null(escaped_null()) -> sql_query_part().
use_escaped_null(Value) ->
    case Value of
        {escaped_null, S} ->
            S;
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_null, Value})
    end.
583
%% Use this function if the type of the escaped value is unknown.
%% Be aware, you can't pass escaped_like() there: it is treated like any
%% other unsupported value, i.e. logged and raised as an error.
-spec use_escaped(Value) -> sql_query_part() when
      Value :: escaped_value().
use_escaped(Value) ->
    case Value of
        {escaped_string, _} ->
            use_escaped_string(Value);
        {escaped_binary, _} ->
            use_escaped_binary(Value);
        {escaped_integer, _} ->
            use_escaped_integer(Value);
        {escaped_boolean, _} ->
            use_escaped_boolean(Value);
        {escaped_null, _} ->
            use_escaped_null(Value);
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped, Value})
    end.
602
%% Escape one byte for a prepared LIKE pattern: '%', '_' and the escape
%% character '$' itself get a '$' prefix, every other byte is kept.
-spec escape_prepared_like_character(char()) -> binary().
escape_prepared_like_character(C) ->
    case C of
        $% -> <<"$%">>;
        $_ -> <<"$_">>;
        $$ -> <<"$$">>;
        _ -> <<C>>
    end.
608
%% Escape every character of a LIKE pattern. A binary is processed through
%% its list form and converted back to a binary.
-spec escape_like_internal(binary() | string()) -> binary() | string().
escape_like_internal(S) when is_binary(S) ->
    Chars = binary_to_list(S),
    list_to_binary(escape_like_internal(Chars));
escape_like_internal(S) when is_list(S) ->
    [escape_like_character(Char) || Char <- S].
614
%% Escape a string and wrap it in single quotes. The is_exported check
%% decides between the backend's escape_string/1 and the generic
%% character-by-character escaping.
escape_string_internal(S) ->
    BackendEscapes = mongoose_backend:is_exported(global, ?MODULE, escape_string, 1),
    case BackendEscapes of
        false ->
            %% generic escaping
            [$', escape_characters(S), $'];
        true ->
            mongoose_rdbms_backend:escape_string(S)
    end.
623
%% Generic escaping of every character. A binary is processed through its
%% list form and converted back to a binary.
escape_characters(S) when is_binary(S) ->
    Chars = binary_to_list(S),
    list_to_binary(escape_characters(Chars));
escape_characters(S) when is_list(S) ->
    [escape_character(Char) || Char <- S].
628
%% Backslash-escape the LIKE wildcards '%' and '_'; every other character
%% gets the generic escaping.
escape_like_character(C) ->
    case C of
        $% -> "\\%";
        $_ -> "\\_";
        _ -> escape_character(C)
    end.
632
%% Characters to escape: control characters, double quote and backslash get
%% a backslash prefix, a single quote is doubled. Everything else is kept.
escape_character(C) ->
    case C of
        $\0 -> "\\0";
        $\n -> "\\n";
        $\t -> "\\t";
        $\b -> "\\b";
        $\r -> "\\r";
        $' -> "''";
        $" -> "\\\"";
        $\\ -> "\\\\";
        _ -> C
    end.
643
644
%% @doc Decode a blob with the backend's unescape_binary/1.
%% The host type argument is not used.
-spec unescape_binary(server(), binary()) -> binary().
unescape_binary(_HostType, Escaped) when is_binary(Escaped) ->
    mongoose_rdbms_backend:unescape_binary(Escaped).
648
649
%% @doc Normalise a count/integer column value: integers pass through,
%% binaries are parsed as decimal text.
-spec result_to_integer(binary() | integer()) -> integer().
result_to_integer(Value) when is_integer(Value) ->
    Value;
result_to_integer(Value) when is_binary(Value) ->
    binary_to_integer(Value).
655
%% Extract the integer from a `selected' result holding exactly one row
%% with exactly one column.
selected_to_integer({selected, [{Value}]}) ->
    result_to_integer(Value).
658
%% Converts a value from a CHAR(1) field to integer: a one-byte binary
%% yields that byte, an integer is returned as is.
character_to_integer(Value) ->
    case Value of
        <<Byte>> -> Byte;
        _ when is_integer(Value) -> Value
    end.
662
%% @doc Convert a database value to a boolean.
%% pgsql returns booleans as "t" or "f". The values "t", "true", "1" (as
%% string or binary), `true' and 1 are true; everything else is false.
-spec to_bool(binary() | string() | atom() | integer() | any()) -> boolean().
to_bool(B) when is_binary(B) ->
    to_bool(binary_to_list(B));
to_bool(Value) ->
    lists:member(Value, ["t", "true", "1", true, 1]).
673
674 %%%----------------------------------------------------------------------
675 %%% Callback functions from gen_server
676 %%%----------------------------------------------------------------------
%% gen_server init callback: trap exits, connect to the database and, on
%% success, schedule the first keepalive.
%% Any non-`{ok, _}' result of connect/4 stops the server with that result
%% as the reason, so the spec lists `{stop, _}' as well.
-spec init(options()) -> {ok, state()} | {stop, term()}.
init(Opts = #{query_timeout := QueryTimeout, max_start_interval := MaxStartInterval}) ->
    process_flag(trap_exit, true),
    KeepaliveInterval = maps:get(keepalive_interval, Opts, undefined),
    % retries are delayed exponentially, max_start_interval limits the delay
    % e.g. if the limit is 30, the delays are: 2, 4, 8, 16, 30, 30, ...
    case connect(Opts, ?CONNECT_RETRIES, 2, MaxStartInterval) of
        {ok, DbRef} ->
            schedule_keepalive(KeepaliveInterval),
            {ok, #state{db_ref = DbRef,
                        keepalive_interval = KeepaliveInterval,
                        query_timeout = QueryTimeout}};
        Error ->
            {stop, Error}
    end.
692
693
%% gen_server call handler.
%% `sql_cmd' runs the command and replies with its result; the worker stops
%% after replying when abort_on_driver_error/1 asks for it.
%% `get_db_info' replies with the database engine and connection reference.
%% Any other request is reported and answered with `{error, badarg}'.
handle_call({sql_cmd, Command, Timestamp}, From, State) ->
    {Result, NewState} = run_sql_cmd(Command, From, State, Timestamp),
    case abort_on_driver_error(Result) of
        continue -> {reply, Result, NewState};
        {stop, Reason} -> {stop, Reason, Result, NewState}
    end;
handle_call(get_db_info, _From, State = #state{db_ref = DbRef}) ->
    Reply = {ok, db_engine(global), DbRef},
    {reply, Reply, State};
handle_call(Request, From, State) ->
    ?UNEXPECTED_CALL(Request, From),
    {reply, {error, badarg}, State}.
705
%% gen_server cast handler.
%% `sql_cmd' runs the command and drops its result; the worker stops when
%% abort_on_driver_error/1 asks for it. Any other cast is reported and ignored.
handle_cast({sql_cmd, Command, Timestamp}, State) ->
    {Result, NewState} = run_sql_cmd(Command, undefined, State, Timestamp),
    case abort_on_driver_error(Result) of
        continue -> {noreply, NewState};
        {stop, Reason} -> {stop, Reason, NewState}
    end;
handle_cast(Request, State) ->
    ?UNEXPECTED_CAST(Request),
    {noreply, State}.
715
%% gen_server code upgrade callback: the state is kept unchanged.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
718
%% gen_server info handler.
%% `keepalive' runs the keepalive query: success schedules the next one, an
%% error stops the worker. An 'EXIT' message stops the worker with the whole
%% message as the reason. Anything else is reported and ignored.
handle_info(keepalive, State = #state{keepalive_interval = Interval}) ->
    case sql_query_internal([?KEEPALIVE_QUERY], State) of
        {selected, _} ->
            schedule_keepalive(Interval),
            {noreply, State};
        {error, _} = Error ->
            {stop, {keepalive_failed, Error}, State}
    end;
handle_info({'EXIT', _Pid, _Reason} = ExitMsg, State) ->
    {stop, ExitMsg, State};
handle_info(Info, State) ->
    ?UNEXPECTED_INFO(Info),
    {noreply, State}.
732
%% gen_server terminate callback: disconnect from the database. `catch'
%% keeps a failing disconnect from raising out of terminate/2.
-spec terminate(Reason :: term(), state()) -> any().
terminate(_Reason, #state{db_ref = Connection}) ->
    catch mongoose_rdbms_backend:disconnect(Connection).
736
737 %%%----------------------------------------------------------------------
738 %%% Internal functions
739 %%%----------------------------------------------------------------------
740
%% Run a command received from a client, unless it waited in the queue for
%% ?TRANSACTION_TIMEOUT milliseconds or longer. `Timestamp' is the client's
%% erlang:monotonic_time(millisecond) taken when the request was sent.
%%
%% Always returns `{Result, State}', as the spec says and as handle_call/3
%% and handle_cast/2 expect. A discarded request yields `{error, timeout}'
%% as its result. (The previous `{reply, {error, timeout}, State}' 3-tuple
%% could not match `{Result, NewState}' in the callers, so a stale request
%% crashed the worker with badmatch instead of returning the timeout.)
-spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) ->
                         {Result :: term(), state()}.
run_sql_cmd(Command, _From, State, Timestamp) ->
    Now = erlang:monotonic_time(millisecond),
    case Now - Timestamp of
        Age when Age < ?TRANSACTION_TIMEOUT ->
            outer_op(Command, State);
        Age ->
            ?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow,
                         text => <<"Discarding request">>, age => Age, command => Command}),
            {{error, timeout}, State}
    end.
753
%% @doc Top-level operations only (reached from handle_call/handle_cast via
%% run_sql_cmd/4). Returns the operation result together with the state.
%% For `sql_execute_wrapped' any exception raised by the wrapper or the
%% statement is logged and returned as `{error, Reason}' with the state
%% unchanged.
-spec outer_op(rdbms_msg(), state()) -> {query_result()
                                         | transaction_result()
                                         | dirty_result(), state()}.
outer_op({sql_query, Query}, State) ->
    QueryResult = sql_query_internal(Query, State),
    {QueryResult, State};
outer_op({sql_transaction, F}, State) ->
    outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "", State);
outer_op({sql_dirty, F}, State) ->
    sql_dirty_internal(F, State);
outer_op({sql_execute, Name, Params}, State) ->
    sql_execute(outer_op, Name, Params, State);
outer_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
    Execute = fun() -> sql_execute(outer_op, Name, Params, State) end,
    try
        Wrapper(Execute)
    catch
        _Class:Error ->
            ?LOG_ERROR(#{what => sql_execute_wrapped_failed, reason => Error,
                         statement_name => Name, wrapper_fun => Wrapper}),
            {{error, Error}, State}
    end.
775
%% @doc Called via sql_query/sql_transaction/execute from client code that
%% is already running inside an operation (a state is stored for the
%% process). Every clause returns `{Result, State}', because sql_call/3,
%% sql_request/3 and sql_cast/3 destructure the return value that way and
%% store the state back.
-spec nested_op(rdbms_msg(), state()) -> any().
nested_op({sql_query, Query}, State) ->
    %% XXX - use sql_query_t here instead? Most likely would break
    %% callers who expect {error, _} tuples (sql_query_t turns
    %% these into throws)
    {sql_query_internal(Query, State), State};
nested_op({sql_transaction, F}, State) ->
    %% Transaction inside a transaction.
    %% inner_transaction/2 returns only the transaction result, so pair it
    %% with a state here. Returning the bare `{atomic, Res}' made the caller
    %% receive `atomic' and store `Res' as the connection state.
    Result = inner_transaction(F, State),
    %% Operations run by F store their updated state themselves; read it
    %% back so the caller does not overwrite it with the older `State'.
    {Result, get_state()};
nested_op({sql_execute, Name, Params}, State) ->
    sql_execute(nested_op, Name, Params, State);
nested_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
    Wrapper(fun() -> sql_execute(nested_op, Name, Params, State) end).
791
%% @doc Runs a transaction function nested inside an outer transaction.
%% Nested transactions are never retried - only the outer one is.
%%
%% Results already tagged `aborted', `'EXIT'' or `atomic' (whether returned or
%% thrown by `F', or produced by `catch' for a crash) are passed through
%% untouched; any other value is wrapped as `{atomic, Value}'.
-spec inner_transaction(fun(), state()) -> transaction_result() | {'EXIT', any()}.
inner_transaction(F, _State) ->
    case catch F() of
        {Tag, _} = Tagged when Tag =:= aborted; Tag =:= 'EXIT'; Tag =:= atomic ->
            Tagged;
        Plain ->
            {atomic, Plain}
    end.
805
%% @doc Runs `F' inside a database transaction, restarting it on abort.
%%
%% Each attempt issues the backend's "begin transaction" query, exposes the
%% state through the process dictionary while `F' runs, then reads it back.
%% Depending on what `F' produced:
%% <ul>
%%   <li>`{aborted, _}' with restarts left - roll back and try again;</li>
%%   <li>`{aborted, _}' with no restarts left - log, roll back, return
%%       `{aborted, Reason}' (the map form additionally logs the last query);</li>
%%   <li>`{'EXIT', _}' - roll back and return `{aborted, Reason}';</li>
%%   <li>anything else - commit and return `{atomic, Value}'.</li>
%% </ul>
-spec outer_transaction(F :: fun(),
                        NRestarts :: 0..10,
                        Reason :: any(), state()) -> {transaction_result(), state()}.
outer_transaction(F, NRestarts, _Reason, State) ->
    sql_query_internal(rdbms_queries:begin_trans(), State),
    put_state(State),
    {Outcome, Trace} = apply_transaction_function(F),
    StateAfter = erase_state(),
    Rollback = [<<"rollback;">>],
    case Outcome of
        {aborted, Why} when NRestarts > 0 ->
            %% Restarts remain: undo this attempt and run F again.
            sql_query_internal(Rollback, StateAfter),
            outer_transaction(F, NRestarts - 1, Why, StateAfter);
        {aborted, #{reason := Why, sql_query := LastQuery}} when NRestarts =:= 0 ->
            %% Restart budget exhausted; the abort carries the failing query.
            ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
                         restarts => ?MAX_TRANSACTION_RESTARTS, last_abort_reason => Why,
                         last_sql_query => iolist_to_binary(LastQuery),
                         stacktrace => Trace, state => StateAfter}),
            sql_query_internal(Rollback, StateAfter),
            {{aborted, Why}, StateAfter};
        {aborted, Why} when NRestarts =:= 0 ->
            %% Restart budget exhausted; abort reason in the old format.
            ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
                         restarts => ?MAX_TRANSACTION_RESTARTS,
                         last_abort_reason => Why, stacktrace => Trace,
                         state => StateAfter}),
            sql_query_internal(Rollback, StateAfter),
            {{aborted, Why}, StateAfter};
        {'EXIT', Why} ->
            %% A crash inside F aborts the SQL transaction (outer level only).
            sql_query_internal(Rollback, StateAfter),
            {{aborted, Why}, StateAfter};
        Value ->
            %% F completed normally: commit.
            sql_query_internal([<<"commit;">>], StateAfter),
            {{atomic, Value}, StateAfter}
    end.
845
%% @doc Calls the transaction function and captures how it ended.
%%
%% Returns `{Value, []}' on a normal return, `{Thrown, StackTrace}' when `F'
%% throws, and `{{'EXIT', Reason}, StackTrace}' (after logging) for any other
%% exception class.
-spec apply_transaction_function(F :: fun()) -> {any(), list()}.
apply_transaction_function(F) ->
    try F() of
        Value ->
            {Value, []}
    catch
        throw:Thrown:Trace ->
            {Thrown, Trace};
        Class:Reason:Trace ->
            ?LOG_ERROR(#{what => rdbms_outer_transaction_failed, class => Class,
                         reason => Reason, stacktrace => Trace}),
            {{'EXIT', Reason}, Trace}
    end.
858
%% Runs a raw query through the backend using the connection and query timeout
%% held in the state. The specific ODBC error
%% "No SQL-driver information available." is reported as `{updated, 0}'
%% (workaround for an odbc bug); every other result is returned unchanged.
sql_query_internal(Query, #state{db_ref = Conn, query_timeout = Timeout}) ->
    Outcome = mongoose_rdbms_backend:query(Conn, Query, Timeout),
    case Outcome of
        {error, "No SQL-driver information available."} ->
            {updated, 0};
        _ ->
            Outcome
    end.
866
%% Runs `F' outside of any transaction. The state is made available through the
%% process dictionary while `F' runs and is read back (and removed) afterwards.
%% Returns `{{ok, Value}, State}' on success, or `{{error, Reason}, State}'
%% when `F' raises an exception of any class.
sql_dirty_internal(F, State) ->
    put_state(State),
    Outcome = try
                  {ok, F()}
              catch
                  _Class:Reason ->
                      {error, Reason}
              end,
    {Outcome, erase_state()}.
878
%% @doc Executes the prepared statement `Name' with `Params'.
%%
%% The statement is prepared on first use. The resulting state is stored in the
%% process dictionary and returned, so the prepared reference is not lost
%% (per the original note, Postgres allows a statement to be prepared only once).
%% Exceptions raised by the backend are logged and re-raised unchanged.
%% For `nested_op' calls an `{error, _}' result is turned into a throw by
%% check_execute_result/4.
-spec sql_execute(Type :: atom(), Name :: atom(), Params :: [term()], state()) ->
    {query_result(), state()}.
sql_execute(Type, Name, Params, #state{db_ref = Conn, query_timeout = Timeout} = State) ->
    {Statement, PreparedState} = prepare_statement(Name, State),
    put_state(PreparedState),
    Outcome =
        try
            mongoose_rdbms_backend:execute(Conn, Statement, Params, Timeout)
        catch
            Class:Reason:Trace ->
                ?LOG_ERROR(#{what => rdbms_sql_execute_failed, statement_name => Name,
                             class => Class, reason => Reason, params => Params,
                             stacktrace => Trace}),
                erlang:raise(Class, Reason, Trace)
        end,
    check_execute_result(Type, Outcome, Name, Params),
    {Outcome, PreparedState}.
894
%% Post-execution check, similar to the one in sql_query_t.
%% Top-level (`outer_op') results are always accepted. For a `nested_op' an
%% `{error, Reason}' result is thrown as `{aborted, Details}', and any other
%% tuple is accepted.
check_execute_result(outer_op, _Result, _StatementName, _Args) ->
    ok;
check_execute_result(nested_op, Result, StatementName, Args) ->
    %% An execute runs one query and yields one result set, so Result is a
    %% tuple, never a list.
    case Result of
        {error, Why} ->
            throw({aborted, #{reason => Why,
                              statement_name => StatementName,
                              params => Args}});
        Tuple when is_tuple(Tuple) ->
            ok
    end.
906
%% @doc Returns the backend reference for the prepared statement `Name'.
%%
%% A reference already cached in the state is reused. Otherwise the statement
%% definition is looked up, prepared on the connection, and the new reference
%% is cached in the returned state.
-spec prepare_statement(Name :: atom(), state()) -> {Ref :: term(), state()}.
prepare_statement(Name, #state{db_ref = Conn, prepared = Cache} = State) ->
    case maps:get(Name, Cache, undefined) of
        undefined ->
            {_, Table, Fields, Sql} = lookup_statement(Name),
            {ok, NewRef} = mongoose_rdbms_backend:prepare(Conn, Name, Table, Fields, Sql),
            {NewRef, State#state{prepared = Cache#{Name => NewRef}}};
        CachedRef ->
            {CachedRef, State}
    end.
918
%% Fetches the record registered under `Name' in the `prepared_statements' ETS
%% table. Raises `error({lookup_statement_failed, Name})' when there is none.
lookup_statement(Name) ->
    case ets:lookup(prepared_statements, Name) of
        [] ->
            error({lookup_statement_failed, Name});
        [Statement] ->
            Statement
    end.
924
%% Decides whether a result should stop further processing. Two error strings
%% (noted in the original as mysql driver errors) map to a stop reason:
%% "query timed out" -> `{stop, timeout}', and anything starting with
%% "Failed sending data on socket" -> `{stop, closed}'. Every other value
%% yields `continue'.
-spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}.
abort_on_driver_error(Result) ->
    case Result of
        {error, "query timed out"} ->
            {stop, timeout};
        {error, "Failed sending data on socket" ++ _} ->
            {stop, closed};
        _ ->
            continue
    end.
932
%% Returns the backend name registered globally for this module, or `undefined'
%% when the lookup fails with `error:badarg'. The host type argument is ignored.
-spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined.
db_engine(_HostType) ->
    try mongoose_backend:get_backend_name(global, ?MODULE) of
        Engine ->
            Engine
    catch
        error:badarg ->
            undefined
    end.
937
%% @doc Reports the database flavour so that queries can be optimised for it:
%% `mssql' when the `rdbms_server_type' option is set to `mssql', `generic'
%% for any other value.
%% @todo Should be refactored to use host types with this module.
%% Also, this parameter should not be global, but parameterized by pool name.
-spec db_type() -> mssql | generic.
db_type() ->
    case mongoose_config:get_opt(rdbms_server_type) =:= mssql of
        true -> mssql;
        false -> generic
    end.
947
%% @doc Opens a backend connection, retrying on failure.
%%
%% `Retry' is the number of retries left; when it reaches 0 the last error is
%% returned as-is. Before each retry the process sleeps for a random number of
%% seconds between 1 and `RetryAfter', and the next `RetryAfter' is the square
%% of the current one, capped at `MaxRetryDelay' (so a delay of 0 or 1 does not
%% grow).
%%
%% `RetryAfter' may legitimately be 0 (the spec allows it, and the cap yields 0
%% when `MaxRetryDelay' is 0). `rand:uniform/1' only accepts integers >= 1, so
%% the value is clamped to at least 1 instead of crashing the retry loop.
-spec connect(options(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
              MaxRetryDelay :: non_neg_integer()) -> {ok, term()} | {error, any()}.
connect(#{query_timeout := QueryTimeout} = Options, Retry, RetryAfter, MaxRetryDelay) ->
    case mongoose_rdbms_backend:connect(Options, QueryTimeout) of
        {ok, _} = Ok ->
            Ok;
        Error when Retry =:= 0 ->
            %% Out of retries: hand the failure back to the caller.
            Error;
        Error ->
            %% Randomise the delay; clamp so rand:uniform/1 never receives 0.
            SleepFor = rand:uniform(max(1, RetryAfter)),
            Backend = mongoose_backend:get_backend_name(global, ?MODULE),
            ?LOG_ERROR(#{what => rdbms_connection_attempt_error, backend => Backend,
                         error => Error, sleep_for => SleepFor}),
            timer:sleep(timer:seconds(SleepFor)),
            NextRetryDelay = RetryAfter * RetryAfter,
            connect(Options, Retry - 1, min(MaxRetryDelay, NextRetryDelay), MaxRetryDelay)
    end.
965
966
%% Arms the keepalive timer. For an integer interval (in seconds) a `keepalive'
%% message is scheduled to be sent to the calling process and the timer
%% reference is returned. `undefined' means keepalive is disabled and returns
%% `ok'. Any other value is logged as an error and `ok' is returned.
-spec schedule_keepalive(KeepaliveInterval :: undefined | pos_integer()) -> any().
schedule_keepalive(Seconds) when is_integer(Seconds) ->
    erlang:send_after(timer:seconds(Seconds), self(), keepalive);
schedule_keepalive(undefined) ->
    ok;
schedule_keepalive(Invalid) ->
    ?LOG_ERROR(#{what => rdbms_wrong_keepalive_interval, reason => Invalid}),
    ok.
978
%% ----- process state access, for convenient tracing
%% The state is kept in the process dictionary under ?STATE_KEY.

%% Stores `State'; returns whatever was stored before (`undefined' if nothing).
put_state(State) ->
    erlang:put(?STATE_KEY, State).

%% Removes the stored state and returns it (`undefined' if nothing was stored).
erase_state() ->
    erlang:erase(?STATE_KEY).

%% Returns the stored state without removing it (`undefined' if nothing is stored).
get_state() ->
    erlang:get(?STATE_KEY).
Line Hits Source