./ct_report/coverage/mongoose_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mongoose_rdbms.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Serve RDBMS connection
5 %%% Created : 8 Dec 2004 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%% Copyright 2016 Erlang Solutions Ltd.
10 %%%
11 %%% This program is free software; you can redistribute it and/or
12 %%% modify it under the terms of the GNU General Public License as
13 %%% published by the Free Software Foundation; either version 2 of the
14 %%% License, or (at your option) any later version.
15 %%%
16 %%% This program is distributed in the hope that it will be useful,
17 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
18 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 %%% General Public License for more details.
20 %%%
21 %%% You should have received a copy of the GNU General Public License
22 %%% along with this program; if not, write to the Free Software
23 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 %%%
25 %%%----------------------------------------------------------------------
26
27 -module(mongoose_rdbms).
28 -author('alexey@process-one.net').
29 -author('konrad.zemek@erlang-solutions.com').
30
31 -behaviour(gen_server).
32
33 %% Part of SQL query string, produced by use_escaped/1 function
34 -type sql_query_part() :: iodata().
35 -type sql_query() :: iodata().
36
37 %% Blob data type to be used inside SQL queries
38 -opaque escaped_binary() :: {escaped_binary, sql_query_part()}.
39 %% Unicode string to be used inside SQL queries
40 -opaque escaped_string() :: {escaped_string, sql_query_part()}.
41 %% Unicode string to be used inside LIKE conditions
42 -opaque escaped_like() :: {escaped_like, sql_query_part()}.
43 -opaque escaped_integer() :: {escaped_integer, sql_query_part()}.
44 -opaque escaped_boolean() :: {escaped_boolean, sql_query_part()}.
45 -opaque escaped_null() :: {escaped_null, sql_query_part()}.
46 -opaque escaped_value() :: escaped_string() | escaped_binary() | escaped_integer() |
47 escaped_boolean() | escaped_null().
48
49 -ifdef(gen_server_request_id).
50 -type request_id() :: gen_server:request_id().
51 -else.
52 -type request_id() :: term().
53 -endif.
54
55 -export_type([escaped_binary/0,
56 escaped_string/0,
57 escaped_like/0,
58 escaped_integer/0,
59 escaped_boolean/0,
60 escaped_null/0,
61 escaped_value/0,
62 sql_query/0,
63 sql_query_part/0]).
64
65 -export([process_options/1]).
66
67 %% External exports
68 -export([prepare/4,
69 prepared/1,
70 execute/3, execute/4,
71 execute_cast/3, execute_cast/4,
72 execute_request/3, execute_request/4,
73 execute_wrapped_request/4, execute_wrapped_request/5,
74 execute_successfully/3, execute_successfully/4,
75 sql_query/2, sql_query/3,
76 sql_query_cast/2, sql_query_cast/3,
77 sql_query_request/2, sql_query_request/3,
78 sql_transaction/2, sql_transaction/3,
79 sql_transaction_request/2, sql_transaction_request/3,
80 sql_dirty/2, sql_dirty/3,
81 sql_query_t/1,
82 transaction_with_delayed_retry/3,
83 to_bool/1,
84 db_engine/1,
85 db_type/0,
86 use_escaped/1]).
87
88 %% Unicode escaping
89 -export([escape_string/1, use_escaped_string/1]).
90
91 %% Integer escaping
92 -export([escape_integer/1, use_escaped_integer/1]).
93
94 %% Boolean escaping
95 -export([escape_boolean/1, use_escaped_boolean/1]).
96
97 %% LIKE escaping
98 -export([escape_like/1, escape_prepared_like/1, escape_like_prefix/1, use_escaped_like/1]).
99
100 %% BLOB escaping
101 -export([escape_binary/2, unescape_binary/2, use_escaped_binary/1]).
102
103 %% Null escaping
104 %% (to keep uniform pattern of passing values)
105 -export([escape_null/0, use_escaped_null/1]).
106
107 %% count / integral types decoding
108 -export([result_to_integer/1, selected_to_integer/1]).
109
110 -export([character_to_integer/1]).
111
112 %% gen_server callbacks
113 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
114
115 %% External exports
116 -ignore_xref([
117 execute/4, execute_cast/3, execute_cast/4,
118 execute_request/3, execute_request/4,
119 execute_wrapped_request/4, execute_wrapped_request/5,
120 execute_successfully/4,
121 sql_query/3, sql_query_cast/2, sql_query_cast/3,
122 sql_query_request/2, sql_query_request/3,
123 sql_transaction/3, sql_transaction_request/2, sql_transaction_request/3,
124 sql_dirty/3, sql_query_t/1,
125 use_escaped/1,
126 escape_like/1, escape_like_prefix/1, use_escaped_like/1,
127 escape_binary/2, use_escaped_binary/1,
128 escape_integer/1, use_escaped_integer/1,
129 escape_string/1, use_escaped_string/1,
130 escape_boolean/1, use_escaped_boolean/1,
131 escape_null/0, use_escaped_null/1
132 ]).
133
134 %% internal usage
135 -export([get_db_info/1]).
136
137 -include("mongoose.hrl").
138
139 -record(state, {db_ref,
140 prepared = #{} :: #{binary() => term()},
141 keepalive_interval :: undefined | pos_integer(),
142 query_timeout :: pos_integer()
143 }).
144 -type state() :: #state{}.
145
146 -define(DEFAULT_POOL_TAG, default).
147 -define(STATE_KEY, mongoose_rdbms_state).
148 -define(MAX_TRANSACTION_RESTARTS, 10).
149 -define(TRANSACTION_TIMEOUT, 60000). % milliseconds
150 -define(KEEPALIVE_QUERY, <<"SELECT 1;">>).
151 %% The value is arbitrary; supervisor will restart the connection once
152 %% the retry counter runs out. We just attempt to reduce log pollution.
153 -define(CONNECT_RETRIES, 5).
154
155 -type query_name() :: atom().
156 -type query_params() :: [term()].
157 -type request_wrapper() :: fun((fun(() -> T)) -> T).
158 -type rdbms_msg() :: {sql_query, _}
159 | {sql_transaction, fun()}
160 | {sql_dirty, fun()}
161 | {sql_execute, atom(), [binary() | boolean() | integer()]}
162 | {sql_execute_wrapped, atom(), [binary() | boolean() | integer()], request_wrapper()}.
163 -type single_query_result() :: {selected, [tuple()]} |
164 {updated, non_neg_integer() | undefined} |
165 {updated, non_neg_integer(), [tuple()]} |
166 {aborted, Reason :: term()} |
167 {error, Reason :: string() | duplicate_key}.
168 -type query_result() :: single_query_result() | [single_query_result()].
169 -type transaction_result() :: {aborted, _} | {atomic, _} | {error, _}.
170 -type dirty_result() :: {ok, any()} | {error, any()}.
171 -export_type([query_name/0, query_result/0, transaction_result/0]).
172
173 -type backend() :: pgsql | mysql | odbc.
174 -type options() :: #{driver := backend(),
175 max_start_interval := pos_integer(),
176 query_timeout := pos_integer(),
177 atom() => any()}.
178
179 -export_type([options/0, backend/0]).
180
181 %%%----------------------------------------------------------------------
182 %%% API
183 %%%----------------------------------------------------------------------
184
%% @doc Validate and normalise the options of an RDBMS connection.
%% ODBC options that carry `settings' are returned untouched. Any other option
%% map must contain host, database, username and password; its TLS options are
%% normalised and a port is filled in when missing. Everything else raises an
%% `invalid_rdbms_connection_options' error.
-spec process_options(map()) -> options().
process_options(#{driver := odbc, settings := _} = OdbcOpts) ->
    OdbcOpts;
process_options(#{host := _, database := _, username := _, password := _} = Opts) ->
    WithTls = process_tls_options(Opts),
    ensure_db_port(WithTls);
process_options(Invalid) ->
    error(#{what => invalid_rdbms_connection_options, options => Invalid}).
192
%% Normalise the `tls' section of the connection options:
%% - MySQL with a `required' key is rejected with an error,
%% - PostgreSQL gets `required => false' unless the caller set it,
%% - any other option map is passed through unchanged.
process_tls_options(#{driver := mysql, tls := #{required := _}} = Opts) ->
    error(#{what => invalid_rdbms_tls_options, options => Opts,
            text => <<"The 'required' option is not supported for MySQL">>});
process_tls_options(#{driver := pgsql, tls := TLSOpts} = Opts) ->
    Defaults = #{required => false},
    Opts#{tls := maps:merge(Defaults, TLSOpts)};
process_tls_options(Opts) ->
    Opts.
200
%% Keep an explicitly configured port; otherwise fill in the port used for
%% the driver (5432 for pgsql, 3306 for mysql).
ensure_db_port(Opts) when is_map_key(port, Opts) ->
    Opts;
ensure_db_port(#{driver := pgsql} = Opts) ->
    maps:put(port, 5432, Opts);
ensure_db_port(#{driver := mysql} = Opts) ->
    maps:put(port, 3306, Opts).
204
%% @doc Register a prepared statement under `Name' in the
%% `prepared_statements' ETS table. Atom table and field names are converted
%% to UTF-8 binaries first. Returns `{error, already_exists}' when an entry
%% with that name is already present.
-spec prepare(
        query_name(), Table :: binary() | atom(), Fields :: [binary() | atom()], Statement :: iodata()) ->
    {ok, query_name()} | {error, already_exists}.
prepare(Name, Table, Fields, Statement) when is_atom(Table) ->
    BinTable = atom_to_binary(Table, utf8),
    prepare(Name, BinTable, Fields, Statement);
prepare(Name, Table, [First | _] = Fields, Statement) when is_atom(First) ->
    BinFields = [atom_to_binary(F, utf8) || F <- Fields],
    prepare(Name, Table, BinFields, Statement);
prepare(Name, Table, Fields, Statement) when is_atom(Name), is_binary(Table) ->
    %% every field name must be a binary at this point
    true = lists:all(fun is_binary/1, Fields),
    Entry = {Name, Table, Fields, iolist_to_binary(Statement)},
    Inserted = ets:insert_new(prepared_statements, Entry),
    case Inserted of
        true -> {ok, Name};
        false -> {error, already_exists}
    end.
219
%% @doc Tell whether a statement called `QueryName' is present in the
%% `prepared_statements' ETS table.
-spec prepared(atom()) -> boolean().
prepared(QueryName) ->
    ets:member(prepared_statements, QueryName).
223
%% @doc Execute the prepared statement `Name' with `Parameters' using the
%% default pool tag.
-spec execute(mongooseim:host_type_or_global(), query_name(), query_params()) -> query_result().
execute(HostType, Name, Parameters) ->
    execute(HostType, ?DEFAULT_POOL_TAG, Name, Parameters).

%% @doc Execute the prepared statement `Name' with `Parameters' on the pool
%% identified by `PoolTag'. The request is dispatched through sql_call/3.
-spec execute(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
    query_result().
execute(HostType, PoolTag, Name, Parameters)
  when is_atom(PoolTag), is_atom(Name), is_list(Parameters) ->
    Request = {sql_execute, Name, Parameters},
    sql_call(HostType, PoolTag, Request).
232
%% @doc Cast variant of execute/3, using the default pool tag.
-spec execute_cast(mongooseim:host_type_or_global(), query_name(), query_params()) -> query_result().
execute_cast(HostType, Name, Parameters) ->
    execute_cast(HostType, ?DEFAULT_POOL_TAG, Name, Parameters).

%% @doc Cast variant of execute/4: the `sql_execute' request is dispatched
%% through sql_cast/3.
-spec execute_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
    query_result().
execute_cast(HostType, PoolTag, Name, Parameters)
  when is_atom(PoolTag), is_atom(Name), is_list(Parameters) ->
    Request = {sql_execute, Name, Parameters},
    sql_cast(HostType, PoolTag, Request).
241
%% @doc Request variant of execute/3, using the default pool tag.
-spec execute_request(mongooseim:host_type_or_global(), query_name(), query_params()) -> request_id().
execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
    execute_request(HostType, ?DEFAULT_POOL_TAG, Name, Parameters).

%% @doc Request variant of execute/4: the `sql_execute' request is dispatched
%% through sql_request/3.
-spec execute_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
    request_id().
execute_request(HostType, PoolTag, Name, Parameters)
  when is_atom(PoolTag), is_atom(Name), is_list(Parameters) ->
    Request = {sql_execute, Name, Parameters},
    sql_request(HostType, PoolTag, Request).
250
%% @doc Like execute_request/3, but the execution is handed to `Wrapper',
%% a fun that receives the zero-arity execution fun. Uses the default pool tag.
-spec execute_wrapped_request(mongooseim:host_type_or_global(), query_name(), query_params(), request_wrapper()) ->
    request_id().
execute_wrapped_request(HostType, Name, Parameters, Wrapper) ->
    execute_wrapped_request(HostType, ?DEFAULT_POOL_TAG, Name, Parameters, Wrapper).

%% @doc Like execute_request/4, but sends an `sql_execute_wrapped' request
%% carrying `Wrapper'.
-spec execute_wrapped_request(
        mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params(), request_wrapper()) ->
    request_id().
execute_wrapped_request(HostType, PoolTag, Name, Parameters, Wrapper)
  when is_atom(PoolTag), is_atom(Name), is_list(Parameters), is_function(Wrapper) ->
    Request = {sql_execute_wrapped, Name, Parameters, Wrapper},
    sql_request(HostType, PoolTag, Request).
262
%% Same as execute/3, but would fail loudly on any error.
-spec execute_successfully(mongooseim:host_type_or_global(), query_name(), query_params()) ->
    query_result().
execute_successfully(HostType, Name, Parameters) ->
    execute_successfully(HostType, ?DEFAULT_POOL_TAG, Name, Parameters).

%% Same as execute/4, but only `{selected, _}' and `{updated, _}' results are
%% returned. Any other result is logged and raised as an error; an error
%% raised by execute/4 itself is logged and re-raised with its stacktrace.
-spec execute_successfully(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
    query_result().
execute_successfully(HostType, PoolTag, Name, Parameters) ->
    %% Built lazily so the statement lookup only happens on failure.
    FailureLog =
        fun(Extra) ->
                maps:merge(#{what => sql_execute_failed, host => HostType,
                             statement_name => Name,
                             statement_query => query_name_to_string(Name),
                             statement_params => Parameters},
                           Extra)
        end,
    try execute(HostType, PoolTag, Name, Parameters) of
        {selected, _} = Selected ->
            Selected;
        {updated, _} = Updated ->
            Updated;
        Unexpected ->
            Log = FailureLog(#{reason => Unexpected}),
            ?LOG_ERROR(Log),
            error(Log)
    catch
        error:Reason:Stacktrace ->
            ErrLog = FailureLog(#{reason => Reason, stacktrace => Stacktrace}),
            ?LOG_ERROR(ErrLog),
            erlang:raise(error, Reason, Stacktrace)
    end.
291
%% Look up the statement text registered under `Name' in the
%% `prepared_statements' table; `not_found' when there is no such entry.
query_name_to_string(Name) ->
    Rows = ets:lookup(prepared_statements, Name),
    case Rows of
        [{_Name, _Table, _Fields, Statement}] ->
            Statement;
        [] ->
            not_found
    end.
299
%% @doc Run `Query' using the default pool tag.
-spec sql_query(mongooseim:host_type_or_global(), Query :: any()) -> query_result().
sql_query(HostType, Query) ->
    sql_query(HostType, ?DEFAULT_POOL_TAG, Query).

%% @doc Run `Query' on the pool identified by `PoolTag', via sql_call/3.
-spec sql_query(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) ->
    query_result().
sql_query(HostType, PoolTag, Query) ->
    Request = {sql_query, Query},
    sql_call(HostType, PoolTag, Request).
308
%% @doc Request variant of sql_query/2, using the default pool tag.
-spec sql_query_request(mongooseim:host_type_or_global(), Query :: any()) -> request_id().
sql_query_request(HostType, Query) ->
    sql_query_request(HostType, ?DEFAULT_POOL_TAG, Query).

%% @doc Request variant of sql_query/3, dispatched through sql_request/3.
-spec sql_query_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) ->
    request_id().
sql_query_request(HostType, PoolTag, Query) ->
    Request = {sql_query, Query},
    sql_request(HostType, PoolTag, Request).
317
%% @doc Cast variant of sql_query/2, using the default pool tag.
-spec sql_query_cast(mongooseim:host_type_or_global(), Query :: any()) -> query_result().
sql_query_cast(HostType, Query) ->
    sql_query_cast(HostType, ?DEFAULT_POOL_TAG, Query).

%% @doc Cast variant of sql_query/3, dispatched through sql_cast/3.
-spec sql_query_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) ->
    query_result().
sql_query_cast(HostType, PoolTag, Query) ->
    Request = {sql_query, Query},
    sql_cast(HostType, PoolTag, Request).
326
%% @doc SQL transaction based on a list of queries or on a fun,
%% run with the default pool tag.
-spec sql_transaction(mongooseim:host_type_or_global(), fun() | maybe_improper_list()) ->
    transaction_result().
sql_transaction(HostType, Msg) ->
    sql_transaction(HostType, ?DEFAULT_POOL_TAG, Msg).

%% @doc SQL transaction on the pool identified by `PoolTag'.
%% A list of queries is wrapped into a fun that runs each of them with
%% sql_query_t/1; a fun is sent as an `sql_transaction' request via sql_call/3.
-spec sql_transaction(mongooseim:host_type_or_global(), atom(), fun() | maybe_improper_list()) ->
    transaction_result().
sql_transaction(HostType, PoolTag, Queries) when is_atom(PoolTag), is_list(Queries) ->
    RunAll = fun() -> lists:map(fun sql_query_t/1, Queries) end,
    sql_transaction(HostType, PoolTag, RunAll);
sql_transaction(HostType, PoolTag, F) when is_atom(PoolTag), is_function(F) ->
    Request = {sql_transaction, F},
    sql_call(HostType, PoolTag, Request).
341
%% @doc Request variant of sql_transaction/2, using the default pool tag.
-spec sql_transaction_request(mongooseim:host_type_or_global(), fun() | maybe_improper_list()) ->
    request_id().
sql_transaction_request(HostType, Queries) ->
    sql_transaction_request(HostType, ?DEFAULT_POOL_TAG, Queries).

%% @doc Request variant of sql_transaction/3. A list of queries is wrapped
%% into a fun that runs each of them with sql_query_t/1; a fun is sent as an
%% `sql_transaction' request via sql_request/3.
-spec sql_transaction_request(mongooseim:host_type_or_global(), atom(), fun() | maybe_improper_list()) ->
    request_id().
sql_transaction_request(HostType, PoolTag, Queries) when is_atom(PoolTag), is_list(Queries) ->
    RunAll = fun() -> lists:map(fun sql_query_t/1, Queries) end,
    sql_transaction_request(HostType, PoolTag, RunAll);
sql_transaction_request(HostType, PoolTag, F) when is_atom(PoolTag), is_function(F) ->
    Request = {sql_transaction, F},
    sql_request(HostType, PoolTag, Request).
356
%% Runs a transaction, retrying aborted attempts with a delay in between.
%% `Info' must contain the keys `retries' (number of extra attempts) and
%% `delay' (passed to timer:sleep/1 between attempts); the whole map is also
%% used as the base of the log reports.
-spec transaction_with_delayed_retry(mongooseim:host_type_or_global(), fun() | maybe_improper_list(), map()) ->
    transaction_result().
transaction_with_delayed_retry(HostType, F, Info) ->
    do_transaction_with_delayed_retry(HostType, F,
                                      maps:get(retries, Info),
                                      maps:get(delay, Info),
                                      Info).
364
%% Retry loop behind transaction_with_delayed_retry/3.
%% An `{atomic, _}' result is returned as is. An `{aborted, _}' result is
%% retried after sleeping `Delay' while `Retries' is positive. Any other
%% outcome is logged and raised as an error.
do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) ->
    case mongoose_rdbms:sql_transaction(HostType, F) of
        {atomic, _} = Committed ->
            Committed;
        {aborted, Reason} when Retries > 0 ->
            ?LOG_WARNING(Info#{what => rdbms_transaction_aborted,
                               text => <<"Transaction aborted. Restart">>,
                               reason => Reason, retries_left => Retries}),
            timer:sleep(Delay),
            do_transaction_with_delayed_retry(HostType, F, Retries - 1, Delay, Info);
        Failure ->
            Err = Info#{what => mam_transaction_failed,
                        text => <<"Transaction failed. Do not restart">>,
                        reason => Failure},
            ?LOG_ERROR(Err),
            erlang:error(Err)
    end.
383
%% @doc Run `F' as an `sql_dirty' operation using the default pool tag.
-spec sql_dirty(mongooseim:host_type_or_global(), fun()) -> any() | no_return().
sql_dirty(HostType, F) ->
    sql_dirty(HostType, ?DEFAULT_POOL_TAG, F).

%% @doc Run `F' as an `sql_dirty' operation on the pool identified by
%% `PoolTag'. An `{ok, Result}' reply is unwrapped to `Result'; an
%% `{error, Error}' reply is turned into `throw(Error)'.
-spec sql_dirty(mongooseim:host_type_or_global(), atom(), fun()) -> any() | no_return().
sql_dirty(HostType, PoolTag, F) when is_function(F) ->
    Reply = sql_call(HostType, PoolTag, {sql_dirty, F}),
    case Reply of
        {ok, Result} -> Result;
        {error, Error} -> throw(Error)
    end.
394
%% TODO: Better spec for RPC calls
%% Dispatch `Msg' synchronously. When get_state/0 returns `undefined' the
%% message goes to the pool through sql_call0/3. Otherwise it is handled in
%% place by nested_op/2 and the state it returns is stored with put_state/1.
-spec sql_call(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_call(HostType, PoolTag, Msg) ->
    case get_state() of
        undefined ->
            sql_call0(HostType, PoolTag, Msg);
        CurrentState ->
            {Result, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Result
    end.
405
%% Send `Msg' to the rdbms pool with a synchronous call, stamped with the
%% current monotonic time in milliseconds.
-spec sql_call0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_call0(HostType, PoolTag, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Command = {sql_cmd, Msg, SentAt},
    mongoose_wpool:call(rdbms, HostType, PoolTag, Command).
410
%% Dispatch `Msg' as an asynchronous request. When get_state/0 returns
%% `undefined' the message goes to the pool through sql_request0/3. Otherwise
%% it is handled in place by nested_op/2 and the state it returns is stored
%% with put_state/1.
-spec sql_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_request(HostType, PoolTag, Msg) ->
    case get_state() of
        undefined ->
            sql_request0(HostType, PoolTag, Msg);
        CurrentState ->
            {Result, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Result
    end.
420
%% Send `Msg' to the rdbms pool with send_request, stamped with the current
%% monotonic time in milliseconds.
-spec sql_request0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_request0(HostType, PoolTag, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Command = {sql_cmd, Msg, SentAt},
    mongoose_wpool:send_request(rdbms, HostType, PoolTag, Command).
425
%% Dispatch `Msg' as a cast. When get_state/0 returns `undefined' the message
%% goes to the pool through sql_cast0/3. Otherwise it is handled in place by
%% nested_op/2 and the state it returns is stored with put_state/1.
-spec sql_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_cast(HostType, PoolTag, Msg) ->
    case get_state() of
        undefined ->
            sql_cast0(HostType, PoolTag, Msg);
        CurrentState ->
            {Result, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Result
    end.
435
%% Send `Msg' to the rdbms pool as a cast, stamped with the current
%% monotonic time in milliseconds.
-spec sql_cast0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_cast0(HostType, PoolTag, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Command = {sql_cmd, Msg, SentAt},
    mongoose_wpool:cast(rdbms, HostType, PoolTag, Command).
440
%% Ask a worker for its database info. A pid is queried directly with a
%% 5000 ms timeout; any other target is resolved through the rdbms pool.
-spec get_db_info(Target :: mongooseim:host_type_or_global() | pid()) ->
    {ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
get_db_info(Worker) when is_pid(Worker) ->
    wpool_process:call(Worker, get_db_info, 5000);
get_db_info(HostType) ->
    mongoose_wpool:call(rdbms, HostType, get_db_info).
447
%% This function is intended to be used from inside an sql_transaction:
sql_query_t(Query) ->
    State = get_state(),
    sql_query_t(Query, State).

%% Run `Query' and turn an `{error, Reason}' result, or the first
%% `{error, Reason}' element of a result list, into a throw of
%% `{aborted, #{reason => Reason, sql_query => Query}}'. Any other result is
%% returned unchanged.
sql_query_t(Query, State) ->
    QRes = sql_query_internal(Query, State),
    Failure = case QRes of
                  {error, _} -> QRes;
                  _ when is_list(QRes) -> lists:keyfind(error, 1, QRes);
                  _ -> false
              end,
    case Failure of
        {error, Reason} ->
            throw({aborted, #{reason => Reason, sql_query => Query}});
        _ ->
            QRes
    end.
467
%% Only for binaries, escapes only the content, '%' has to be added afterwards
%% The escape character is '$' as '\' causes issues in PostgreSQL
%% Returned value is NOT safe to use outside of a prepared statement
-spec escape_prepared_like(binary()) -> binary().
escape_prepared_like(S) ->
    << <<(escape_prepared_like_character(Byte))/binary>> || <<Byte>> <= S >>.
474
%% @doc Escape `S' for a LIKE condition and wrap it as '%...%', so the
%% pattern matches anywhere in the text (not efficient).
%% Percent and underscore are escaped because they are LIKE wildcards.
%% INFO: Used in mod_vcard_rdbms.
-spec escape_like(binary() | string()) -> escaped_like().
escape_like(S) ->
    Escaped = escape_like_internal(S),
    {escaped_like, [$', $%, Escaped, $%, $']}.
483
%% @doc Escape `S' for a LIKE condition and wrap it as '...%', so the
%% pattern matches text starting with `S'.
-spec escape_like_prefix(binary() | string()) -> escaped_like().
escape_like_prefix(S) ->
    Escaped = escape_like_internal(S),
    {escaped_like, [$', Escaped, $%, $']}.
487
%% @doc Escape a blob with the backend module's escape_binary/1.
%% The host type argument is not used.
-spec escape_binary(mongooseim:host_type_or_global(), binary()) -> escaped_binary().
escape_binary(_HostType, Bin) when is_binary(Bin) ->
    Escaped = mongoose_rdbms_backend:escape_binary(Bin),
    {escaped_binary, Escaped}.
491
%% @doc The same as escape, but returns value including ''
-spec escape_string(binary() | string()) -> escaped_string().
escape_string(S) ->
    Escaped = escape_string_internal(S),
    {escaped_string, Escaped}.
496
%% @doc Wrap an integer, rendered as a decimal binary, for use in a query.
-spec escape_integer(integer()) -> escaped_integer().
escape_integer(I) when is_integer(I) ->
    Digits = integer_to_binary(I),
    {escaped_integer, Digits}.
500
%% Be aware, that we can't just use escaped_integer here.
%% Because of the error in pgsql:
%% column \"match_all\" is of type boolean but expression is of type integer
-spec escape_boolean(boolean()) -> escaped_boolean().
escape_boolean(Bool) when is_boolean(Bool) ->
    Literal = case Bool of
                  true -> "'1'";
                  false -> "'0'"
              end,
    {escaped_boolean, Literal}.
509
%% @doc The SQL null literal, wrapped like every other escaped value.
-spec escape_null() -> escaped_null().
escape_null() ->
    Literal = "null",
    {escaped_null, Literal}.
513
514
%% @doc SQL-injection check.
%% Call this function just before using value from escape_string/1 inside a query.
-spec use_escaped_string(escaped_string()) -> sql_query_part().
use_escaped_string(Value) ->
    case Value of
        {escaped_string, S} ->
            S;
        _ ->
            %% We need to print an error, because in some places
            %% the error can be just ignored, because of too wide catches.
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_string, Value})
    end.
526
%% @doc Unwrap a value produced by escape_binary/2.
%% Anything else is logged and raised as a `use_escaped_binary' error.
-spec use_escaped_binary(escaped_binary()) -> sql_query_part().
use_escaped_binary(Value) ->
    case Value of
        {escaped_binary, S} ->
            S;
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_binary, Value})
    end.
534
%% @doc Unwrap a value produced by escape_like/1 or escape_like_prefix/1.
%% Anything else is logged and raised as a `use_escaped_like' error.
-spec use_escaped_like(escaped_like()) -> sql_query_part().
use_escaped_like(Value) ->
    case Value of
        {escaped_like, S} ->
            S;
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_like, Value})
    end.
542
%% @doc Unwrap a value produced by escape_integer/1.
%% Anything else is logged and raised as a `use_escaped_integer' error.
-spec use_escaped_integer(escaped_integer()) -> sql_query_part().
use_escaped_integer(Value) ->
    case Value of
        {escaped_integer, S} ->
            S;
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_integer, Value})
    end.
550
%% @doc Unwrap a value produced by escape_boolean/1.
%% Anything else is logged and raised as a `use_escaped_boolean' error.
-spec use_escaped_boolean(escaped_boolean()) -> sql_query_part().
use_escaped_boolean(Value) ->
    case Value of
        {escaped_boolean, S} ->
            S;
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_boolean, Value})
    end.
558
%% @doc Unwrap a value produced by escape_null/0.
%% Anything else is logged and raised as a `use_escaped_null' error.
-spec use_escaped_null(escaped_null()) -> sql_query_part().
use_escaped_null(Value) ->
    case Value of
        {escaped_null, S} ->
            S;
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped_null, Value})
    end.
566
%% Use this function, if type is unknown.
%% Be aware, you can't pass escaped_like() there.
%% Dispatches on the tag of the escaped value; an unknown value is logged
%% and raised as a `use_escaped' error.
-spec use_escaped(Value) -> sql_query_part() when
      Value :: escaped_value().
use_escaped(Value) ->
    case Value of
        {escaped_string, _} ->
            use_escaped_string(Value);
        {escaped_binary, _} ->
            use_escaped_binary(Value);
        {escaped_integer, _} ->
            use_escaped_integer(Value);
        {escaped_boolean, _} ->
            use_escaped_boolean(Value);
        {escaped_null, _} ->
            use_escaped_null(Value);
        _ ->
            ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Value,
                         stacktrace => erlang:process_info(self(), current_stacktrace)}),
            erlang:error({use_escaped, Value})
    end.
585
%% Prefix the LIKE wildcards '%' and '_' and the escape character '$'
%% itself with '$'; every other byte is returned as a one-byte binary.
-spec escape_prepared_like_character(char()) -> binary().
escape_prepared_like_character(C) when C =:= $%; C =:= $_; C =:= $$ ->
    <<$$, C>>;
escape_prepared_like_character(C) ->
    <<C>>.
591
%% Escape every character of `S' with escape_like_character/1.
%% A binary is converted to a list, escaped, and converted back to a binary.
-spec escape_like_internal(binary() | string()) -> binary() | string().
escape_like_internal(S) when is_binary(S) ->
    Chars = binary_to_list(S),
    Escaped = escape_like_internal(Chars),
    list_to_binary(Escaped);
escape_like_internal(S) when is_list(S) ->
    [escape_like_character(Char) || Char <- S].
597
%% Escape a string with the backend's escape_string/1 when
%% mongoose_backend:is_exported/4 reports it as available; otherwise fall
%% back to the generic escaping below, wrapped in single quotes.
escape_string_internal(S) ->
    BackendEscapes = mongoose_backend:is_exported(global, ?MODULE, escape_string, 1),
    case BackendEscapes of
        true ->
            mongoose_rdbms_backend:escape_string(S);
        false ->
            %% generic escaping
            Escaped = escape_characters(S),
            [$', Escaped, $']
    end.
606
%% Escape every character of `S' with escape_character/1.
%% A binary is converted to a list, escaped, and converted back to a binary.
escape_characters(S) when is_binary(S) ->
    Chars = binary_to_list(S),
    Escaped = escape_characters(Chars),
    list_to_binary(Escaped);
escape_characters(S) when is_list(S) ->
    [escape_character(Char) || Char <- S].
611
%% Backslash-escape the LIKE wildcards '%' and '_'; every other character
%% gets the generic escaping of escape_character/1.
escape_like_character(C) ->
    case C of
        $% -> "\\%";
        $_ -> "\\_";
        _ -> escape_character(C)
    end.
615
%% Characters to escape
%% Control characters, double quote and backslash get a backslash escape;
%% a single quote is doubled; anything else is returned unchanged.
escape_character(C) ->
    case C of
        $\0 -> "\\0";
        $\n -> "\\n";
        $\t -> "\\t";
        $\b -> "\\b";
        $\r -> "\\r";
        $' -> "''";
        $" -> "\\\"";
        $\\ -> "\\\\";
        _ -> C
    end.
626
627
%% @doc Decode a blob with the backend module's unescape_binary/1.
%% The host type argument is not used.
-spec unescape_binary(mongooseim:host_type_or_global(), binary()) -> binary().
unescape_binary(_HostType, Escaped) when is_binary(Escaped) ->
    mongoose_rdbms_backend:unescape_binary(Escaped).
631
632
%% @doc Normalise a numeric result: integers pass through, binaries are
%% parsed with binary_to_integer/1.
-spec result_to_integer(binary() | integer()) -> integer().
result_to_integer(Value) when is_binary(Value) ->
    binary_to_integer(Value);
result_to_integer(Value) when is_integer(Value) ->
    Value.
638
%% Extract the single value of a one-row, one-column `selected' result
%% and convert it with result_to_integer/1.
selected_to_integer({selected, Rows}) ->
    [{Value}] = Rows,
    result_to_integer(Value).
641
%% Converts a value from a CHAR(1) field to integer
%% A one-byte binary yields its byte value; an integer is returned as is.
character_to_integer(Value) when is_integer(Value) ->
    Value;
character_to_integer(<<Byte>>) ->
    Byte.
645
%% pgsql returns booleans as "t" or "f"
%% A binary is converted to a string first. The values "t", "true", "1",
%% `true' and the integer 1 are true; everything else is false.
-spec to_bool(binary() | string() | atom() | integer() | any()) -> boolean().
to_bool(Value) when is_binary(Value) ->
    to_bool(binary_to_list(Value));
to_bool(Value) ->
    lists:member(Value, ["t", "true", "1", true, 1]).
656
657 %%%----------------------------------------------------------------------
658 %%% Callback functions from gen_server
659 %%%----------------------------------------------------------------------
%% gen_server init callback: traps exits, connects to the database and, on
%% success, schedules the first keepalive. The spec includes `{stop, _}'
%% because anything other than `{ok, DbRef}' from connect/4 stops the worker.
-spec init(options()) -> {ok, state()} | {stop, term()}.
init(Opts = #{query_timeout := QueryTimeout, max_start_interval := MaxStartInterval}) ->
    process_flag(trap_exit, true),
    KeepaliveInterval = maps:get(keepalive_interval, Opts, undefined),
    % retries are delayed exponentially, max_start_interval limits the delay
    % e.g. if the limit is 30, the delays are: 2, 4, 8, 16, 30, 30, ...
    case connect(Opts, ?CONNECT_RETRIES, 2, MaxStartInterval) of
        {ok, DbRef} ->
            schedule_keepalive(KeepaliveInterval),
            {ok, #state{db_ref = DbRef,
                        keepalive_interval = KeepaliveInterval,
                        query_timeout = QueryTimeout}};
        Error ->
            {stop, Error}
    end.
675
676
%% gen_server call handler.
%% `sql_cmd' runs the command and replies with its result; the worker stops
%% (still replying) when abort_on_driver_error/1 asks for it.
%% `get_db_info' replies with the engine and the connection reference.
%% Any other request is reported and answered with `{error, badarg}'.
handle_call({sql_cmd, Command, Timestamp}, From, State) ->
    {Result, NewState} = run_sql_cmd(Command, From, State, Timestamp),
    Verdict = abort_on_driver_error(Result),
    case Verdict of
        continue -> {reply, Result, NewState};
        {stop, Reason} -> {stop, Reason, Result, NewState}
    end;
handle_call(get_db_info, _From, #state{db_ref = DbRef} = State) ->
    Info = {ok, db_engine(global), DbRef},
    {reply, Info, State};
handle_call(Request, From, State) ->
    ?UNEXPECTED_CALL(Request, From),
    {reply, {error, badarg}, State}.
688
%% gen_server cast handler.
%% `sql_cmd' runs the command with no caller and discards the result; the
%% worker stops when abort_on_driver_error/1 asks for it.
%% Any other message is reported and ignored.
handle_cast({sql_cmd, Command, Timestamp}, State) ->
    {Result, NewState} = run_sql_cmd(Command, undefined, State, Timestamp),
    Verdict = abort_on_driver_error(Result),
    case Verdict of
        continue -> {noreply, NewState};
        {stop, Reason} -> {stop, Reason, NewState}
    end;
handle_cast(Request, State) ->
    ?UNEXPECTED_CAST(Request),
    {noreply, State}.
698
%% gen_server code upgrade callback: the state is kept unchanged.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
701
%% gen_server info handler.
%% `keepalive' runs the keepalive query: on success the next keepalive is
%% scheduled, on error the worker stops with `{keepalive_failed, Error}'.
%% An 'EXIT' message stops the worker, using the whole message as reason.
%% Any other message is reported and ignored.
handle_info(keepalive, #state{keepalive_interval = Interval} = State) ->
    KeepaliveResult = sql_query_internal([?KEEPALIVE_QUERY], State),
    case KeepaliveResult of
        {selected, _} ->
            schedule_keepalive(Interval),
            {noreply, State};
        {error, _} = Error ->
            {stop, {keepalive_failed, Error}, State}
    end;
handle_info({'EXIT', _Pid, _Reason} = ExitMsg, State) ->
    {stop, ExitMsg, State};
handle_info(Info, State) ->
    ?UNEXPECTED_INFO(Info),
    {noreply, State}.
715
%% gen_server terminate callback: disconnect from the database.
%% The old-style `catch' keeps a failing disconnect from crashing shutdown.
-spec terminate(Reason :: term(), state()) -> any().
terminate(_Reason, #state{db_ref = Connection}) ->
    catch mongoose_rdbms_backend:disconnect(Connection).
719
720 %%%----------------------------------------------------------------------
721 %%% Internal functions
722 %%%----------------------------------------------------------------------
723
%% Run a top-level command unless it has waited too long.
%% `Timestamp' is the monotonic time (ms) taken when the request was sent.
%% A request older than ?TRANSACTION_TIMEOUT is discarded with
%% `{error, timeout}'.
%% The timeout branch returns `{Result, State}' like the success branch:
%% callers match on a 2-tuple, so a gen_server-style `{reply, _, _}' tuple
%% here would crash the worker with badmatch.
-spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) ->
    {Result :: term(), state()}.
run_sql_cmd(Command, _From, State, Timestamp) ->
    Now = erlang:monotonic_time(millisecond),
    case Now - Timestamp of
        Age when Age < ?TRANSACTION_TIMEOUT ->
            outer_op(Command, State);
        Age ->
            ?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow,
                         text => <<"Discarding request">>, age => Age, command => Command}),
            {{error, timeout}, State}
    end.
736
%% @doc Only called by handle_call, only handles top level operations.
%% For `sql_execute_wrapped' the execution is handed to `Wrapper'; any
%% exception escaping it is logged and turned into an `{error, _}' result.
-spec outer_op(rdbms_msg(), state()) -> {query_result()
                                         | transaction_result()
                                         | dirty_result(), state()}.
outer_op({sql_query, Query}, State) ->
    QueryResult = sql_query_internal(Query, State),
    {QueryResult, State};
outer_op({sql_transaction, F}, State) ->
    outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "", State);
outer_op({sql_dirty, F}, State) ->
    sql_dirty_internal(F, State);
outer_op({sql_execute, Name, Params}, State) ->
    sql_execute(outer_op, Name, Params, State);
outer_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
    Execute = fun() -> sql_execute(outer_op, Name, Params, State) end,
    try
        Wrapper(Execute)
    catch
        _Class:Error ->
            ?LOG_ERROR(#{what => sql_execute_wrapped_failed, reason => Error,
                         statement_name => Name, wrapper_fun => Wrapper}),
            {{error, Error}, State}
    end.
758
%% @doc Dispatch an operation issued by client code that is already running
%% inside another operation (for example inside a transaction function).
-spec nested_op(rdbms_msg(), state()) -> any().
nested_op({sql_query, Query}, State) ->
    %% XXX - use sql_query_t here instead? That would most likely break
    %% callers expecting {error, _} tuples, because sql_query_t turns
    %% those into throws.
    QueryResult = sql_query_internal(Query, State),
    {QueryResult, State};
nested_op({sql_transaction, F}, State) ->
    %% A transaction requested from within a transaction
    inner_transaction(F, State);
nested_op({sql_execute, Name, Params}, State) ->
    sql_execute(nested_op, Name, Params, State);
nested_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
    Execute = fun() -> sql_execute(nested_op, Name, Params, State) end,
    Wrapper(Execute).
774
%% @doc Run a nested transaction function exactly once; retries are the
%% responsibility of the outer transaction only.
%% `{aborted, _}', `{'EXIT', _}' and `{atomic, _}' outcomes are passed through
%% unchanged; any other value is wrapped as `{atomic, Value}'.
-spec inner_transaction(fun(), state()) -> transaction_result() | {'EXIT', any()}.
inner_transaction(F, _State) ->
    %% Old-style catch: a thrown term becomes the value matched below,
    %% errors and exits become {'EXIT', _}.
    case catch F() of
        {aborted, _} = Aborted ->
            Aborted;
        {'EXIT', _} = Exit ->
            Exit;
        {atomic, _} = Atomic ->
            Atomic;
        Value ->
            {atomic, Value}
    end.
788
%% @doc Run `F' inside a top-level SQL transaction.
%%
%% The transaction is opened, the state is published with put_state/1 for the
%% duration of `F', and then read back with erase_state/0 (sql_execute/4
%% stores an updated state via put_state/1, so the state may have changed).
%% An `{aborted, _}' outcome rolls back and retries while `NRestarts' is
%% positive; once it reaches 0 the abort is logged and returned. An `'EXIT''
%% outcome rolls back without retrying. Anything else commits and is returned
%% as `{atomic, Value}'.
-spec outer_transaction(F :: fun(),
                        NRestarts :: 0..10,
                        Reason :: any(), state()) -> {transaction_result(), state()}.
outer_transaction(F, NRestarts, _Reason, State) ->
    sql_query_internal(rdbms_queries:begin_trans(), State),
    put_state(State),
    {Outcome, StackTrace} = apply_transaction_function(F),
    NewState = erase_state(),
    case Outcome of
        {aborted, AbortReason} when NRestarts > 0 ->
            %% Restarts left: roll back and run the whole transaction again.
            sql_query_internal([<<"rollback;">>], NewState),
            outer_transaction(F, NRestarts - 1, AbortReason, NewState);
        {aborted, #{reason := AbortReason, sql_query := SqlQuery}}
          when NRestarts =:= 0 ->
            %% Out of restarts; map-style abort carrying the failing query.
            ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
                         restarts => ?MAX_TRANSACTION_RESTARTS, last_abort_reason => AbortReason,
                         last_sql_query => iolist_to_binary(SqlQuery),
                         stacktrace => StackTrace, state => NewState}),
            sql_query_internal([<<"rollback;">>], NewState),
            {{aborted, AbortReason}, NewState};
        {aborted, AbortReason} when NRestarts =:= 0 ->
            %% Out of restarts; abort reason in the old (non-map) format.
            ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
                         restarts => ?MAX_TRANSACTION_RESTARTS,
                         last_abort_reason => AbortReason, stacktrace => StackTrace,
                         state => NewState}),
            sql_query_internal([<<"rollback;">>], NewState),
            {{aborted, AbortReason}, NewState};
        {'EXIT', ExitReason} ->
            %% The function crashed: roll back, never retry.
            sql_query_internal([<<"rollback;">>], NewState),
            {{aborted, ExitReason}, NewState};
        Value ->
            %% Success: commit the outer transaction.
            sql_query_internal([<<"commit;">>], NewState),
            {{atomic, Value}, NewState}
    end.
828
%% @doc Call the transaction function and pair its outcome with a stacktrace.
%% A normal return gives `{Value, []}'. A throw is treated as a regular
%% outcome and returned with its stacktrace. Errors and exits are logged and
%% reported as `{{'EXIT', Reason}, StackTrace}'.
-spec apply_transaction_function(F :: fun()) -> {any(), list()}.
apply_transaction_function(F) ->
    try F() of
        Value ->
            {Value, []}
    catch
        throw:Thrown:StackTrace ->
            {Thrown, StackTrace};
        Class:Reason:StackTrace ->
            ?LOG_ERROR(#{what => rdbms_outer_transaction_failed, class => Class,
                         reason => Reason, stacktrace => StackTrace}),
            {{'EXIT', Reason}, StackTrace}
    end.
841
%% Send a raw query to the backend using the connection and query timeout
%% stored in the state, and return the backend's result.
sql_query_internal(Query, #state{db_ref = Connection, query_timeout = Timeout}) ->
    case mongoose_rdbms_backend:query(Connection, Query, Timeout) of
        {error, "No SQL-driver information available."} ->
            %% workaround for odbc bug: report this error as zero updated rows
            {updated, 0};
        Other ->
            Other
    end.
849
%% Run `F' without a transaction. The state is published with put_state/1
%% while `F' runs and read back with erase_state/0 afterwards.
%% Returns `{{ok, Value}, State}', or `{{error, Reason}, State}' when `F'
%% raises an exception of any class.
sql_dirty_internal(F, State) ->
    put_state(State),
    Outcome = try
                  {ok, F()}
              catch
                  _Class:Reason ->
                      {error, Reason}
              end,
    {Outcome, erase_state()}.
861
%% @doc Execute the prepared statement `Name' with `Params'.
%% The statement is prepared on first use. A backend exception is logged and
%% then re-raised with its original class, reason and stacktrace. For
%% `nested_op' an `{error, _}' result is turned into a throw by
%% check_execute_result/4.
-spec sql_execute(Type :: atom(), query_name(), query_params(), state()) ->
    {query_result(), state()}.
sql_execute(Type, Name, Params, State = #state{db_ref = Connection, query_timeout = Timeout}) ->
    %% Postgres allows a statement to be prepared only once, so the state that
    %% remembers the prepared statement must be the one that is kept.
    {StatementRef, UpdatedState} = prepare_statement(Name, State),
    put_state(UpdatedState),
    Result =
        try
            mongoose_rdbms_backend:execute(Connection, StatementRef, Params, Timeout)
        catch
            Class:Reason:StackTrace ->
                ?LOG_ERROR(#{what => rdbms_sql_execute_failed, statement_name => Name,
                             class => Class, reason => Reason, params => Params,
                             stacktrace => StackTrace}),
                erlang:raise(Class, Reason, StackTrace)
        end,
    check_execute_result(Type, Result, Name, Params),
    {Result, UpdatedState}.
877
%% Decide whether an execute result should abort the enclosing operation
%% (the original note describes this as similar to the check in sql_query_t).
%% Top-level results are always accepted. A nested `{error, Reason}' is thrown
%% as `{aborted, Details}'; any other nested tuple is accepted.
check_execute_result(outer_op, _Res, _Name, _Params) ->
    ok;
check_execute_result(nested_op, Res, Name, Params) ->
    %% Res is never a list: an execute runs one query with one result set.
    case Res of
        {error, Reason} ->
            Details = #{reason => Reason, statement_name => Name, params => Params},
            throw({aborted, Details});
        _ when is_tuple(Res) ->
            ok
    end.
889
%% @doc Return the backend reference for the statement `Name', preparing it
%% on first use and remembering the reference in the `prepared' map of the
%% returned state.
-spec prepare_statement(query_name(), state()) -> {Ref :: term(), state()}.
prepare_statement(Name, State = #state{db_ref = Connection, prepared = Prepared}) ->
    case maps:get(Name, Prepared, undefined) of
        KnownRef when KnownRef =/= undefined ->
            {KnownRef, State};
        undefined ->
            {_, Table, Fields, Statement} = lookup_statement(Name),
            {ok, NewRef} =
                mongoose_rdbms_backend:prepare(Connection, Name, Table, Fields, Statement),
            {NewRef, State#state{prepared = Prepared#{Name => NewRef}}}
    end.
901
%% Fetch the record registered under `Name' in the `prepared_statements' ETS
%% table. Raises `{lookup_statement_failed, Name}' when nothing is registered.
lookup_statement(Name) ->
    case ets:lookup(prepared_statements, Name) of
        [] ->
            error({lookup_statement_failed, Name});
        [Statement] ->
            Statement
    end.
907
%% @doc Classify a query result: two known MySQL driver errors ask for a stop
%% (`timeout' for a timed-out query, `closed' for a failed socket send);
%% every other result means `continue'.
-spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}.
abort_on_driver_error({error, "query timed out"}) ->
    %% mysql driver error
    {stop, timeout};
abort_on_driver_error({error, "Failed sending data on socket" ++ _Details}) ->
    %% mysql driver error; only the message prefix is matched
    {stop, closed};
abort_on_driver_error(_Result) ->
    continue.
915
%% @doc Name of the backend registered globally for this module, or
%% `undefined' when the lookup fails with `badarg'. The host type argument is
%% ignored.
-spec db_engine(mongooseim:host_type_or_global()) -> odbc | mysql | pgsql | undefined.
db_engine(_HostType) ->
    try mongoose_backend:get_backend_name(global, ?MODULE) of
        Engine ->
            Engine
    catch
        error:badarg ->
            undefined
    end.
920
%% @doc Database flavour used to optimise queries: `mssql' when the
%% `rdbms_server_type' option is `mssql', `generic' otherwise.
%% @todo Should be refactored to use host types with this module.
%% Also, this parameter should not be global, but parameterized per pool name.
-spec db_type() -> mssql | generic.
db_type() ->
    case mongoose_config:get_opt(rdbms_server_type) of
        mssql = ServerType ->
            ServerType;
        _Other ->
            generic
    end.
930
%% @doc Connect to the database, retrying on failure.
%%
%% `Retry' is the number of retries left; when it is 0 the backend's error is
%% returned as is. Before each retry the process sleeps for a random number of
%% seconds between 1 and `RetryAfter'. The delay bound for the next attempt is
%% `RetryAfter' squared, capped at `MaxRetryDelay'.
%%
%% `RetryAfter = 0' (allowed by the spec) means "retry immediately".
-spec connect(options(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
              MaxRetryDelay :: non_neg_integer()) -> {ok, term()} | {error, any()}.
connect(#{query_timeout := QueryTimeout} = Options, Retry, RetryAfter, MaxRetryDelay) ->
    case mongoose_rdbms_backend:connect(Options, QueryTimeout) of
        {ok, _} = Ok ->
            Ok;
        Error when Retry =:= 0 ->
            Error;
        Error ->
            %% rand:uniform/1 only accepts N >= 1, so a zero delay bound must
            %% not be passed to it.
            SleepFor = case RetryAfter of
                           0 -> 0;
                           _ -> rand:uniform(RetryAfter)
                       end,
            Backend = mongoose_backend:get_backend_name(global, ?MODULE),
            ?LOG_ERROR(#{what => rdbms_connection_attempt_error, backend => Backend,
                         error => Error, sleep_for => SleepFor}),
            timer:sleep(timer:seconds(SleepFor)),
            NextRetryDelay = RetryAfter * RetryAfter,
            connect(Options, Retry - 1, min(MaxRetryDelay, NextRetryDelay), MaxRetryDelay)
    end.
948
949
%% @doc Arrange for a `keepalive' message to be sent to the calling process
%% after `KeepaliveInterval' seconds. `undefined' disables keepalives; any
%% other non-integer value is logged as a configuration error and ignored.
-spec schedule_keepalive(KeepaliveInterval :: undefined | pos_integer()) -> any().
schedule_keepalive(KeepaliveInterval) when is_integer(KeepaliveInterval) ->
    erlang:send_after(timer:seconds(KeepaliveInterval), self(), keepalive);
schedule_keepalive(undefined) ->
    ok;
schedule_keepalive(Other) ->
    ?LOG_ERROR(#{what => rdbms_wrong_keepalive_interval, reason => Other}),
    ok.
961
%% ----- process state access, for convenient tracing
%% The state is kept in the process dictionary under ?STATE_KEY.

%% Store `State'; returns whatever was stored under the key before.
put_state(State) ->
    erlang:put(?STATE_KEY, State).

%% Remove the stored state and return it.
erase_state() ->
    erlang:erase(?STATE_KEY).

%% Read the stored state without removing it.
get_state() ->
    erlang:get(?STATE_KEY).
Line Hits Source