./ct_report/coverage/mongoose_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mongoose_rdbms.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Serve RDBMS connection
5 %%% Created : 8 Dec 2004 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%% Copyright 2016 Erlang Solutions Ltd.
10 %%%
11 %%% This program is free software; you can redistribute it and/or
12 %%% modify it under the terms of the GNU General Public License as
13 %%% published by the Free Software Foundation; either version 2 of the
14 %%% License, or (at your option) any later version.
15 %%%
16 %%% This program is distributed in the hope that it will be useful,
17 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
18 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 %%% General Public License for more details.
20 %%%
21 %%% You should have received a copy of the GNU General Public License
22 %%% along with this program; if not, write to the Free Software
23 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 %%%
25 %%%----------------------------------------------------------------------
26
27 -module(mongoose_rdbms).
28 -author('alexey@process-one.net').
29 -author('konrad.zemek@erlang-solutions.com').
30
31 -behaviour(gen_server).
32
33 %% Part of SQL query string, produced by use_escaped/1 function
34 -type sql_query_part() :: iodata().
35 -type sql_query() :: iodata().
36
37 %% Blob data type to be used inside SQL queries
38 -opaque escaped_binary() :: {escaped_binary, sql_query_part()}.
39 %% Unicode string to be used inside SQL queries
40 -opaque escaped_string() :: {escaped_string, sql_query_part()}.
41 %% Unicode string to be used inside LIKE conditions
42 -opaque escaped_like() :: {escaped_like, sql_query_part()}.
43 -opaque escaped_integer() :: {escaped_integer, sql_query_part()}.
44 -opaque escaped_boolean() :: {escaped_boolean, sql_query_part()}.
45 -opaque escaped_null() :: {escaped_null, sql_query_part()}.
46 -opaque escaped_value() :: escaped_string() | escaped_binary() | escaped_integer() |
47 escaped_boolean() | escaped_null().
48
49 -ifdef(gen_server_request_id).
50 -type request_id() :: gen_server:request_id().
51 -else.
52 -type request_id() :: term().
53 -endif.
54
55 -export_type([escaped_binary/0,
56 escaped_string/0,
57 escaped_like/0,
58 escaped_integer/0,
59 escaped_boolean/0,
60 escaped_null/0,
61 escaped_value/0,
62 sql_query/0,
63 sql_query_part/0]).
64
65 -export([process_options/1]).
66
67 %% External exports
68 -export([prepare/4,
69 prepared/1,
70 execute/3, execute/4,
71 execute_cast/3, execute_cast/4,
72 execute_request/3, execute_request/4,
73 execute_wrapped_request/4, execute_wrapped_request/5,
74 execute_successfully/3, execute_successfully/4,
75 sql_query/2, sql_query/3,
76 sql_query_cast/2, sql_query_cast/3,
77 sql_query_request/2, sql_query_request/3,
78 sql_transaction/2, sql_transaction/3,
79 sql_transaction_request/2, sql_transaction_request/3,
80 sql_dirty/2, sql_dirty/3,
81 sql_query_t/1,
82 transaction_with_delayed_retry/3,
83 to_bool/1,
84 db_engine/1,
85 db_type/0,
86 use_escaped/1]).
87
88 %% Unicode escaping
89 -export([escape_string/1, use_escaped_string/1]).
90
91 %% Integer escaping
92 -export([escape_integer/1, use_escaped_integer/1]).
93
94 %% Boolean escaping
95 -export([escape_boolean/1, use_escaped_boolean/1]).
96
97 %% LIKE escaping
98 -export([escape_like/1, escape_prepared_like/1, escape_like_prefix/1, use_escaped_like/1]).
99
100 %% BLOB escaping
101 -export([escape_binary/2, unescape_binary/2, use_escaped_binary/1]).
102
103 %% Null escaping
104 %% (to keep uniform pattern of passing values)
105 -export([escape_null/0, use_escaped_null/1]).
106
107 %% count / integral types decoding
108 -export([result_to_integer/1, selected_to_integer/1]).
109
110 -export([character_to_integer/1]).
111
112 %% gen_server callbacks
113 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
114
115 %% External exports
116 -ignore_xref([
117 execute/4, execute_cast/3, execute_cast/4,
118 execute_request/3, execute_request/4,
119 execute_wrapped_request/4, execute_wrapped_request/5,
120 execute_successfully/4,
121 sql_query/3, sql_query_cast/2, sql_query_cast/3,
122 sql_query_request/2, sql_query_request/3,
123 sql_transaction/3, sql_transaction_request/2, sql_transaction_request/3,
124 sql_dirty/3, sql_query_t/1,
125 use_escaped/1,
126 escape_like/1, escape_like_prefix/1, use_escaped_like/1,
127 escape_binary/2, use_escaped_binary/1,
128 escape_integer/1, use_escaped_integer/1,
129 escape_string/1, use_escaped_string/1,
130 escape_boolean/1, use_escaped_boolean/1,
131 escape_null/0, use_escaped_null/1
132 ]).
133
134 %% internal usage
135 -export([get_db_info/1]).
136
137 -include("mongoose.hrl").
138
139 -record(state, {db_ref,
140 prepared = #{} :: #{binary() => term()},
141 keepalive_interval :: undefined | pos_integer(),
142 query_timeout :: pos_integer()
143 }).
144 -type state() :: #state{}.
145
146 -define(DEFAULT_POOL_TAG, default).
147 -define(STATE_KEY, mongoose_rdbms_state).
148 -define(MAX_TRANSACTION_RESTARTS, 10).
149 -define(TRANSACTION_TIMEOUT, 60000). % milliseconds
150 -define(KEEPALIVE_QUERY, <<"SELECT 1;">>).
151 %% The value is arbitrary; supervisor will restart the connection once
152 %% the retry counter runs out. We just attempt to reduce log pollution.
153 -define(CONNECT_RETRIES, 5).
154
155 -type query_name() :: atom().
156 -type query_params() :: [term()].
157 -type request_wrapper() :: fun((fun(() -> T)) -> T).
158 -type rdbms_msg() :: {sql_query, _}
159 | {sql_transaction, fun()}
160 | {sql_dirty, fun()}
161 | {sql_execute, atom(), [binary() | boolean() | integer()]}
162 | {sql_execute_wrapped, atom(), [binary() | boolean() | integer()], request_wrapper()}.
163 -type single_query_result() :: {selected, [tuple()]} |
164 {updated, non_neg_integer() | undefined} |
165 {updated, non_neg_integer(), [tuple()]} |
166 {aborted, Reason :: term()} |
167 {error, Reason :: string() | duplicate_key}.
168 -type query_result() :: single_query_result() | [single_query_result()].
169 -type transaction_result() :: {aborted, _} | {atomic, _} | {error, _}.
170 -type dirty_result() :: {ok, any()} | {error, any()}.
171 -export_type([query_name/0, query_result/0, transaction_result/0]).
172
173 -type options() :: #{driver := pgsql | mysql | odbc,
174 max_start_interval := pos_integer(),
175 query_timeout := pos_integer(),
176 atom() => any()}.
177
178 -export_type([options/0]).
179
180 %%%----------------------------------------------------------------------
181 %%% API
182 %%%----------------------------------------------------------------------
183
%% @doc Validate and normalise RDBMS connection options.
%% ODBC options carrying `settings' are returned untouched. Options with
%% `host', `database', `username' and `password' go through TLS
%% normalisation and get a port filled in. Anything else raises an
%% `invalid_rdbms_connection_options' error.
-spec process_options(map()) -> options().
process_options(#{driver := odbc, settings := _} = Opts) ->
    Opts;
process_options(#{host := _, database := _, username := _, password := _} = Opts) ->
    WithTls = process_tls_options(Opts),
    ensure_db_port(WithTls);
process_options(Opts) ->
    error(#{what => invalid_rdbms_connection_options, options => Opts}).
191
%% Normalise the `tls' section of the connection options.
%% MySQL does not accept the `required' key; PostgreSQL gets
%% `required => false' unless the caller set it; other shapes pass through.
process_tls_options(#{driver := mysql, tls := #{required := _}} = Opts) ->
    error(#{what => invalid_rdbms_tls_options, options => Opts,
            text => <<"The 'required' option is not supported for MySQL">>});
process_tls_options(#{driver := pgsql, tls := TLSOpts} = Opts) ->
    Defaults = #{required => false},
    %% Caller-provided keys win over the defaults.
    Opts#{tls := maps:merge(Defaults, TLSOpts)};
process_tls_options(Opts) ->
    Opts.
199
%% Fill in the driver's default port when the options carry none:
%% 5432 for pgsql, 3306 for mysql. An explicit `port' is left alone.
ensure_db_port(#{port := _} = Opts) ->
    Opts;
ensure_db_port(#{driver := pgsql} = Opts) ->
    maps:put(port, 5432, Opts);
ensure_db_port(#{driver := mysql} = Opts) ->
    maps:put(port, 3306, Opts).
203
%% @doc Register a prepared statement under `Name'.
%% An atom table name and atom field names are converted to UTF-8
%% binaries first. The statement is flattened to a binary and stored in
%% the `prepared_statements' ETS table; an already used name yields
%% `{error, already_exists}'.
-spec prepare(
        query_name(), Table :: binary() | atom(), Fields :: [binary() | atom()], Statement :: iodata()) ->
    {ok, query_name()} | {error, already_exists}.
prepare(Name, Table, Fields, Statement) when is_atom(Table) ->
    BinTable = atom_to_binary(Table, utf8),
    prepare(Name, BinTable, Fields, Statement);
prepare(Name, Table, [First | _] = Fields, Statement) when is_atom(First) ->
    BinFields = [atom_to_binary(Field, utf8) || Field <- Fields],
    prepare(Name, Table, BinFields, Statement);
prepare(Name, Table, Fields, Statement) when is_atom(Name), is_binary(Table) ->
    %% Crash early on anything that is not a binary field name.
    true = lists:all(fun is_binary/1, Fields),
    Row = {Name, Table, Fields, iolist_to_binary(Statement)},
    case ets:insert_new(prepared_statements, Row) of
        true -> {ok, Name};
        false -> {error, already_exists}
    end.
218
%% @doc Tell whether a statement named `Name' is present in the
%% `prepared_statements' ETS table.
-spec prepared(atom()) -> boolean().
prepared(QueryName) ->
    ets:member(prepared_statements, QueryName).
222
%% @doc Execute the prepared statement `Name' on the default pool.
-spec execute(mongooseim:host_type_or_global(), query_name(), query_params()) -> query_result().
execute(HostType, Name, Parameters) ->
    PoolTag = ?DEFAULT_POOL_TAG,
    execute(HostType, PoolTag, Name, Parameters).

%% @doc Execute the prepared statement `Name' on the pool tagged `PoolTag'.
-spec execute(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
    query_result().
execute(HostType, PoolTag, Name, Parameters)
  when is_atom(PoolTag), is_atom(Name), is_list(Parameters) ->
    Msg = {sql_execute, Name, Parameters},
    sql_call(HostType, PoolTag, Msg).
231
%% @doc Fire-and-forget variant of execute/3 using the default pool.
%% NOTE(review): the spec advertises query_result(), but outside of a
%% transaction the value comes from mongoose_wpool:cast/4 - confirm what
%% that returns.
-spec execute_cast(mongooseim:host_type_or_global(), query_name(), query_params()) -> query_result().
execute_cast(HostType, Name, Parameters) ->
    PoolTag = ?DEFAULT_POOL_TAG,
    execute_cast(HostType, PoolTag, Name, Parameters).

%% @doc Fire-and-forget variant of execute/4.
-spec execute_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
    query_result().
execute_cast(HostType, PoolTag, Name, Parameters)
  when is_atom(PoolTag), is_atom(Name), is_list(Parameters) ->
    Msg = {sql_execute, Name, Parameters},
    sql_cast(HostType, PoolTag, Msg).
240
%% @doc Asynchronous variant of execute/3: returns a request id instead of
%% the query result. Uses the default pool.
-spec execute_request(mongooseim:host_type_or_global(), query_name(), query_params()) -> request_id().
execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
    PoolTag = ?DEFAULT_POOL_TAG,
    execute_request(HostType, PoolTag, Name, Parameters).

%% @doc Asynchronous variant of execute/4.
-spec execute_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
    request_id().
execute_request(HostType, PoolTag, Name, Parameters)
  when is_atom(PoolTag), is_atom(Name), is_list(Parameters) ->
    Msg = {sql_execute, Name, Parameters},
    sql_request(HostType, PoolTag, Msg).
249
%% @doc Like execute_request/3, but the execution is handed to `Wrapper',
%% which receives a zero-arity fun running the statement.
-spec execute_wrapped_request(mongooseim:host_type_or_global(), query_name(), query_params(), request_wrapper()) ->
    request_id().
execute_wrapped_request(HostType, Name, Parameters, Wrapper) ->
    PoolTag = ?DEFAULT_POOL_TAG,
    execute_wrapped_request(HostType, PoolTag, Name, Parameters, Wrapper).

%% @doc Like execute_request/4, with the execution passed through `Wrapper'.
-spec execute_wrapped_request(
        mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params(), request_wrapper()) ->
    request_id().
execute_wrapped_request(HostType, PoolTag, Name, Parameters, Wrapper)
  when is_atom(PoolTag), is_atom(Name), is_list(Parameters), is_function(Wrapper) ->
    Msg = {sql_execute_wrapped, Name, Parameters, Wrapper},
    sql_request(HostType, PoolTag, Msg).
261
%% @doc Same as execute/3, but fails loudly on any error.
-spec execute_successfully(mongooseim:host_type_or_global(), query_name(), query_params()) ->
    query_result().
execute_successfully(HostType, Name, Parameters) ->
    execute_successfully(HostType, ?DEFAULT_POOL_TAG, Name, Parameters).

%% @doc Same as execute/4, but fails loudly on any error.
%% `{selected, _}', `{updated, _}' and `{updated, _, _}' (all listed as
%% successful shapes in single_query_result()) are returned as is. Any
%% other result is logged and raised as an error carrying the log map.
%% Errors raised by execute/4 itself are logged and re-raised with their
%% original stacktrace.
-spec execute_successfully(mongooseim:host_type_or_global(), mongoose_wpool:tag(), query_name(), query_params()) ->
    query_result().
execute_successfully(HostType, PoolTag, Name, Parameters) ->
    try execute(HostType, PoolTag, Name, Parameters) of
        {selected, _} = Result ->
            Result;
        {updated, _} = Result ->
            Result;
        {updated, _, _} = Result ->
            %% Update that also returns rows; previously misreported as a failure.
            Result;
        Other ->
            Log = execute_failure_log(HostType, Name, Parameters, Other),
            ?LOG_ERROR(Log),
            error(Log)
    catch error:Reason:Stacktrace ->
            Base = execute_failure_log(HostType, Name, Parameters, Reason),
            ?LOG_ERROR(Base#{stacktrace => Stacktrace}),
            erlang:raise(error, Reason, Stacktrace)
    end.

%% Build the structured log entry shared by both failure paths.
execute_failure_log(HostType, Name, Parameters, Reason) ->
    #{what => sql_execute_failed, host => HostType, statement_name => Name,
      statement_query => query_name_to_string(Name),
      statement_params => Parameters, reason => Reason}.
290
%% Look up the SQL text registered for a prepared statement name.
%% Returns the atom `not_found' when the name is unknown.
query_name_to_string(Name) ->
    Rows = ets:lookup(prepared_statements, Name),
    case Rows of
        [{_, _Table, _Fields, Statement}] -> Statement;
        [] -> not_found
    end.
298
%% @doc Run a raw SQL query on the default pool.
-spec sql_query(mongooseim:host_type_or_global(), Query :: any()) -> query_result().
sql_query(HostType, Query) ->
    PoolTag = ?DEFAULT_POOL_TAG,
    sql_query(HostType, PoolTag, Query).

%% @doc Run a raw SQL query on the pool tagged `PoolTag'.
-spec sql_query(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) ->
    query_result().
sql_query(HostType, PoolTag, Query) ->
    Msg = {sql_query, Query},
    sql_call(HostType, PoolTag, Msg).
307
%% @doc Asynchronous raw SQL query on the default pool; returns a request id.
-spec sql_query_request(mongooseim:host_type_or_global(), Query :: any()) -> request_id().
sql_query_request(HostType, Query) ->
    PoolTag = ?DEFAULT_POOL_TAG,
    sql_query_request(HostType, PoolTag, Query).

%% @doc Asynchronous raw SQL query on the pool tagged `PoolTag'.
-spec sql_query_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) ->
    request_id().
sql_query_request(HostType, PoolTag, Query) ->
    Msg = {sql_query, Query},
    sql_request(HostType, PoolTag, Msg).
316
%% @doc Fire-and-forget raw SQL query on the default pool.
%% NOTE(review): spec says query_result(); outside of a transaction the
%% value comes from mongoose_wpool:cast/4 - confirm its return value.
-spec sql_query_cast(mongooseim:host_type_or_global(), Query :: any()) -> query_result().
sql_query_cast(HostType, Query) ->
    PoolTag = ?DEFAULT_POOL_TAG,
    sql_query_cast(HostType, PoolTag, Query).

%% @doc Fire-and-forget raw SQL query on the pool tagged `PoolTag'.
-spec sql_query_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Query :: any()) ->
    query_result().
sql_query_cast(HostType, PoolTag, Query) ->
    Msg = {sql_query, Query},
    sql_cast(HostType, PoolTag, Msg).
325
%% @doc Run a transaction on the default pool. The last argument is
%% either a list of queries or a fun executed inside the transaction.
-spec sql_transaction(mongooseim:host_type_or_global(), fun() | maybe_improper_list()) ->
    transaction_result().
sql_transaction(HostType, Msg) ->
    PoolTag = ?DEFAULT_POOL_TAG,
    sql_transaction(HostType, PoolTag, Msg).

%% @doc Run a transaction on the pool tagged `PoolTag'.
%% A list of queries is wrapped into a fun that runs each of them through
%% sql_query_t/1; a fun is sent to the pool as an `sql_transaction' message.
-spec sql_transaction(mongooseim:host_type_or_global(), atom(), fun() | maybe_improper_list()) ->
    transaction_result().
sql_transaction(HostType, PoolTag, Queries) when is_atom(PoolTag), is_list(Queries) ->
    RunAll = fun() -> lists:map(fun sql_query_t/1, Queries) end,
    sql_transaction(HostType, PoolTag, RunAll);
sql_transaction(HostType, PoolTag, F) when is_atom(PoolTag), is_function(F) ->
    Msg = {sql_transaction, F},
    sql_call(HostType, PoolTag, Msg).
340
%% @doc Asynchronous transaction on the default pool; returns a request id.
%% Accepts either a list of queries or a fun.
-spec sql_transaction_request(mongooseim:host_type_or_global(), fun() | maybe_improper_list()) ->
    request_id().
sql_transaction_request(HostType, Queries) ->
    PoolTag = ?DEFAULT_POOL_TAG,
    sql_transaction_request(HostType, PoolTag, Queries).

%% @doc Asynchronous transaction on the pool tagged `PoolTag'.
%% A list of queries is wrapped into a fun that runs each of them through
%% sql_query_t/1.
-spec sql_transaction_request(mongooseim:host_type_or_global(), atom(), fun() | maybe_improper_list()) ->
    request_id().
sql_transaction_request(HostType, PoolTag, Queries) when is_atom(PoolTag), is_list(Queries) ->
    RunAll = fun() -> lists:map(fun sql_query_t/1, Queries) end,
    sql_transaction_request(HostType, PoolTag, RunAll);
sql_transaction_request(HostType, PoolTag, F) when is_atom(PoolTag), is_function(F) ->
    Msg = {sql_transaction, F},
    sql_request(HostType, PoolTag, Msg).
355
%% @doc Run a transaction, retrying aborted attempts after a pause.
%% `Info' must contain `retries' (number of extra attempts) and `delay'
%% (pause passed to timer:sleep/1); the whole map is also merged into the
%% log events emitted on abort and failure.
-spec transaction_with_delayed_retry(mongooseim:host_type_or_global(), fun() | maybe_improper_list(), map()) ->
    transaction_result().
transaction_with_delayed_retry(HostType, F, Info) ->
    Retries = maps:get(retries, Info),
    Delay = maps:get(delay, Info),
    do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info).
363
%% Retry loop behind transaction_with_delayed_retry/3.
%% A committed transaction is returned as is. An aborted one is retried
%% after sleeping `Delay' while retries remain. Every other outcome is
%% logged and raised as an error carrying the log map.
do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) ->
    case mongoose_rdbms:sql_transaction(HostType, F) of
        {atomic, _} = Committed ->
            Committed;
        {aborted, Reason} when Retries > 0 ->
            ?LOG_WARNING(Info#{what => rdbms_transaction_aborted,
                               text => <<"Transaction aborted. Restart">>,
                               reason => Reason, retries_left => Retries}),
            timer:sleep(Delay),
            do_transaction_with_delayed_retry(HostType, F, Retries - 1, Delay, Info);
        Failed ->
            Err = Info#{what => mam_transaction_failed,
                        text => <<"Transaction failed. Do not restart">>,
                        reason => Failed},
            ?LOG_ERROR(Err),
            erlang:error(Err)
    end.
382
%% @doc Run `F' as an `sql_dirty' operation on the default pool.
-spec sql_dirty(mongooseim:host_type_or_global(), fun()) -> any() | no_return().
sql_dirty(HostType, F) ->
    PoolTag = ?DEFAULT_POOL_TAG,
    sql_dirty(HostType, PoolTag, F).

%% @doc Run `F' as an `sql_dirty' operation on the pool tagged `PoolTag'.
%% An `{ok, Result}' reply is unwrapped; an `{error, Error}' reply is
%% thrown as `Error'.
-spec sql_dirty(mongooseim:host_type_or_global(), atom(), fun()) -> any() | no_return().
sql_dirty(HostType, PoolTag, F) when is_function(F) ->
    Reply = sql_call(HostType, PoolTag, {sql_dirty, F}),
    case Reply of
        {ok, Result} -> Result;
        {error, Error} -> throw(Error)
    end.
393
%% TODO: Better spec for RPC calls
%% Synchronous dispatch of an RDBMS message. When get_state/0 yields a
%% state the message is run in place as a nested operation and the state
%% is stored back; otherwise the message is sent to the worker pool.
-spec sql_call(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_call(HostType, PoolTag, Msg) ->
    case get_state() of
        undefined ->
            sql_call0(HostType, PoolTag, Msg);
        CurrentState ->
            {Result, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Result
    end.

%% Send the message to the pool, stamped with the monotonic time (ms) of
%% the call.
-spec sql_call0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_call0(HostType, PoolTag, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Cmd = {sql_cmd, Msg, SentAt},
    mongoose_wpool:call(rdbms, HostType, PoolTag, Cmd).
409
%% Asynchronous dispatch of an RDBMS message. When get_state/0 yields a
%% state the message is run in place as a nested operation; otherwise it
%% is sent to the pool with mongoose_wpool:send_request/4.
-spec sql_request(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_request(HostType, PoolTag, Msg) ->
    case get_state() of
        undefined ->
            sql_request0(HostType, PoolTag, Msg);
        CurrentState ->
            {Result, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Result
    end.

%% Send the request to the pool, stamped with the monotonic time (ms) of
%% the call.
-spec sql_request0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_request0(HostType, PoolTag, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Cmd = {sql_cmd, Msg, SentAt},
    mongoose_wpool:send_request(rdbms, HostType, PoolTag, Cmd).
424
%% Fire-and-forget dispatch of an RDBMS message. When get_state/0 yields a
%% state the message is run in place as a nested operation; otherwise it
%% is cast to the pool.
-spec sql_cast(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_cast(HostType, PoolTag, Msg) ->
    case get_state() of
        undefined ->
            sql_cast0(HostType, PoolTag, Msg);
        CurrentState ->
            {Result, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Result
    end.

%% Cast the message to the pool, stamped with the monotonic time (ms) of
%% the call.
-spec sql_cast0(mongooseim:host_type_or_global(), mongoose_wpool:tag(), Msg :: rdbms_msg()) -> any().
sql_cast0(HostType, PoolTag, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Cmd = {sql_cmd, Msg, SentAt},
    mongoose_wpool:cast(rdbms, HostType, PoolTag, Cmd).
439
%% @doc Ask a connection worker for its database type and reference.
%% A pid is called directly with a 5000 ms timeout; any other target is
%% resolved through the `rdbms' worker pool.
-spec get_db_info(Target :: mongooseim:host_type_or_global() | pid()) ->
    {ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
get_db_info(Worker) when is_pid(Worker) ->
    wpool_process:call(Worker, get_db_info, 5000);
get_db_info(HostType) ->
    mongoose_wpool:call(rdbms, HostType, get_db_info).
446
%% This function is intended to be used from inside an sql_transaction:
%% it runs the query against the state returned by get_state/0.
sql_query_t(Query) ->
    sql_query_t(Query, get_state()).

%% Run the query and turn `{error, Reason}' (alone, or as the first
%% `error'-tagged element of a result list) into a thrown
%% `{aborted, #{reason, sql_query}}'. Other results are returned as is.
sql_query_t(Query, State) ->
    abort_on_query_error(Query, sql_query_internal(Query, State)).

abort_on_query_error(Query, {error, Reason}) ->
    throw({aborted, #{reason => Reason, sql_query => Query}});
abort_on_query_error(Query, Results) when is_list(Results) ->
    case lists:keyfind(error, 1, Results) of
        {error, Reason} ->
            throw({aborted, #{reason => Reason, sql_query => Query}});
        _ ->
            Results
    end;
abort_on_query_error(_Query, Result) ->
    Result.
466
%% Only for binaries. Escapes just the content, byte by byte; the caller
%% has to add '%' afterwards.
%% The escape character is '$' as '\' causes issues in PostgreSQL.
%% The returned value is NOT safe to use outside of a prepared statement.
-spec escape_prepared_like(binary()) -> binary().
escape_prepared_like(Bin) ->
    << (escape_prepared_like_character(Byte)) || <<Byte>> <= Bin >>.
473
%% @doc Escape a value for a LIKE condition matching anywhere in the text.
%% The escaped text is quoted and wrapped in '%' on both sides. Percent
%% and underscore are escaped because they are LIKE wildcards.
%% INFO: Used in mod_vcard_rdbms.
%% Searches in the middle of text, non-efficient.
-spec escape_like(binary() | string()) -> escaped_like().
escape_like(S) ->
    Escaped = escape_like_internal(S),
    {escaped_like, [$', $%, Escaped, $%, $']}.

%% @doc Escape a value for a LIKE condition matching a prefix: the escaped
%% text is quoted and followed by a single '%'.
-spec escape_like_prefix(binary() | string()) -> escaped_like().
escape_like_prefix(S) ->
    Escaped = escape_like_internal(S),
    {escaped_like, [$', Escaped, $%, $']}.
486
%% @doc Escape a blob through the configured RDBMS backend and tag the
%% result as `escaped_binary'. The host type argument is not used.
-spec escape_binary(mongooseim:host_type_or_global(), binary()) -> escaped_binary().
escape_binary(_HostType, Bin) when is_binary(Bin) ->
    Escaped = mongoose_rdbms_backend:escape_binary(Bin),
    {escaped_binary, Escaped}.
490
%% @doc Escape a string for use in a query; the result includes the
%% surrounding '' quotes and is tagged as `escaped_string'.
-spec escape_string(binary() | string()) -> escaped_string().
escape_string(S) ->
    Escaped = escape_string_internal(S),
    {escaped_string, Escaped}.
495
%% @doc Render an integer as a decimal binary tagged `escaped_integer'.
-spec escape_integer(integer()) -> escaped_integer().
escape_integer(I) when is_integer(I) ->
    Digits = integer_to_binary(I),
    {escaped_integer, Digits}.
499
%% Booleans are rendered as the quoted literals '1' and '0'.
%% We can't just reuse escaped_integer here because of this pgsql error:
%% column \"match_all\" is of type boolean but expression is of type integer
-spec escape_boolean(boolean()) -> escaped_boolean().
escape_boolean(false) ->
    {escaped_boolean, "'0'"};
escape_boolean(true) ->
    {escaped_boolean, "'1'"}.
508
%% @doc Tagged SQL null literal, so that null can be passed around the
%% same way as other escaped values.
-spec escape_null() -> escaped_null().
escape_null() ->
    Literal = "null",
    {escaped_null, Literal}.
512
513
%% @doc SQL-injection check.
%% Call this function just before using a value from escape_string/1
%% inside a query. Anything that is not tagged `escaped_string' is logged
%% and rejected with an error.
-spec use_escaped_string(escaped_string()) -> sql_query_part().
use_escaped_string({escaped_string, Sql}) ->
    Sql;
use_escaped_string(Other) ->
    %% We need to print an error, because in some places
    %% the error can be just ignored, because of too wide catches.
    Trace = erlang:process_info(self(), current_stacktrace),
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Other,
                 stacktrace => Trace}),
    erlang:error({use_escaped_string, Other}).
525
%% @doc Unwrap a value produced by escape_binary/2. Any other term is
%% logged and rejected with an error.
-spec use_escaped_binary(escaped_binary()) -> sql_query_part().
use_escaped_binary({escaped_binary, Sql}) ->
    Sql;
use_escaped_binary(Other) ->
    Trace = erlang:process_info(self(), current_stacktrace),
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Other,
                 stacktrace => Trace}),
    erlang:error({use_escaped_binary, Other}).
533
%% @doc Unwrap a value tagged `escaped_like'. Any other term is logged and
%% rejected with an error.
-spec use_escaped_like(escaped_like()) -> sql_query_part().
use_escaped_like({escaped_like, Sql}) ->
    Sql;
use_escaped_like(Other) ->
    Trace = erlang:process_info(self(), current_stacktrace),
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Other,
                 stacktrace => Trace}),
    erlang:error({use_escaped_like, Other}).
541
%% @doc Unwrap a value produced by escape_integer/1. Any other term is
%% logged and rejected with an error.
-spec use_escaped_integer(escaped_integer()) -> sql_query_part().
use_escaped_integer({escaped_integer, Sql}) ->
    Sql;
use_escaped_integer(Other) ->
    Trace = erlang:process_info(self(), current_stacktrace),
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Other,
                 stacktrace => Trace}),
    erlang:error({use_escaped_integer, Other}).
549
%% @doc Unwrap a value produced by escape_boolean/1. Any other term is
%% logged and rejected with an error.
-spec use_escaped_boolean(escaped_boolean()) -> sql_query_part().
use_escaped_boolean({escaped_boolean, Sql}) ->
    Sql;
use_escaped_boolean(Other) ->
    Trace = erlang:process_info(self(), current_stacktrace),
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Other,
                 stacktrace => Trace}),
    erlang:error({use_escaped_boolean, Other}).
557
%% @doc Unwrap a value produced by escape_null/0. Any other term is logged
%% and rejected with an error.
-spec use_escaped_null(escaped_null()) -> sql_query_part().
use_escaped_null({escaped_null, Sql}) ->
    Sql;
use_escaped_null(Other) ->
    Trace = erlang:process_info(self(), current_stacktrace),
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Other,
                 stacktrace => Trace}),
    erlang:error({use_escaped_null, Other}).
565
%% Use this function if the type of the escaped value is unknown.
%% It dispatches on the tag to the matching use_escaped_* function.
%% Be aware, you can't pass escaped_like() here: it is logged and rejected
%% like any other unknown term.
-spec use_escaped(Value) -> sql_query_part() when
      Value :: escaped_value().
use_escaped({escaped_string, _} = Escaped) ->
    use_escaped_string(Escaped);
use_escaped({escaped_binary, _} = Escaped) ->
    use_escaped_binary(Escaped);
use_escaped({escaped_integer, _} = Escaped) ->
    use_escaped_integer(Escaped);
use_escaped({escaped_boolean, _} = Escaped) ->
    use_escaped_boolean(Escaped);
use_escaped({escaped_null, _} = Escaped) ->
    use_escaped_null(Escaped);
use_escaped(Other) ->
    Trace = erlang:process_info(self(), current_stacktrace),
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Other,
                 stacktrace => Trace}),
    erlang:error({use_escaped, Other}).
584
%% Escape one byte for a prepared LIKE pattern: '%', '_' and the escape
%% character '$' itself get a '$' prefix, every other byte is kept.
-spec escape_prepared_like_character(char()) -> binary().
escape_prepared_like_character(Char) ->
    case Char of
        $% -> <<"$%">>;
        $_ -> <<"$_">>;
        $$ -> <<"$$">>;
        _ -> <<Char>>
    end.
590
%% Escape every character of the input for use inside a LIKE pattern.
%% A binary is processed as a list of bytes and converted back to a
%% binary; a list yields a list of characters and escape sequences.
-spec escape_like_internal(binary() | string()) -> binary() | string().
escape_like_internal(Bin) when is_binary(Bin) ->
    Bytes = binary_to_list(Bin),
    list_to_binary(escape_like_internal(Bytes));
escape_like_internal(Chars) when is_list(Chars) ->
    [escape_like_character(Char) || Char <- Chars].
596
%% Escape and quote a string. The backend's escape_string/1 is used when
%% mongoose_backend:is_exported/4 reports it; otherwise generic escaping
%% between single quotes is applied.
escape_string_internal(S) ->
    HasBackendEscape = mongoose_backend:is_exported(global, ?MODULE, escape_string, 1),
    case HasBackendEscape of
        true ->
            mongoose_rdbms_backend:escape_string(S);
        false ->
            %% generic escaping
            [$', escape_characters(S), $']
    end.
605
%% Apply escape_character/1 to every character of the input.
%% A binary is processed as a list of bytes and converted back to a binary.
escape_characters(Bin) when is_binary(Bin) ->
    Bytes = binary_to_list(Bin),
    list_to_binary(escape_characters(Bytes));
escape_characters(Chars) when is_list(Chars) ->
    [escape_character(Char) || Char <- Chars].
610
%% Escape one character for a LIKE pattern: the wildcards '%' and '_' get
%% a backslash prefix, everything else goes through escape_character/1.
escape_like_character(Char) ->
    case Char of
        $% -> "\\%";
        $_ -> "\\_";
        _ -> escape_character(Char)
    end.
614
%% Characters to escape.
%% Control characters, double quote and backslash become backslash
%% sequences, a single quote is doubled, any other character is returned
%% unchanged (as the integer itself).
escape_character(Char) ->
    case Char of
        $\0 -> "\\0";
        $\n -> "\\n";
        $\t -> "\\t";
        $\b -> "\\b";
        $\r -> "\\r";
        $' -> "''";
        $" -> "\\\"";
        $\\ -> "\\\\";
        _ -> Char
    end.
625
626
%% @doc Decode a blob read from the database through the configured RDBMS
%% backend. The host type argument is not used.
-spec unescape_binary(mongooseim:host_type_or_global(), binary()) -> binary().
unescape_binary(_HostType, Escaped) when is_binary(Escaped) ->
    mongoose_rdbms_backend:unescape_binary(Escaped).
630
631
%% @doc Normalise a numeric column value: integers pass through, binaries
%% are parsed as decimal integers.
-spec result_to_integer(binary() | integer()) -> integer().
result_to_integer(Bin) when is_binary(Bin) ->
    binary_to_integer(Bin);
result_to_integer(Int) when is_integer(Int) ->
    Int.
637
%% Extract the integer from a `selected' result holding exactly one row
%% with exactly one column (e.g. a COUNT query).
selected_to_integer({selected, [{Value}]}) ->
    result_to_integer(Value).
640
%% Converts a value from a CHAR(1) field to integer: a one-byte binary
%% yields its byte value, an integer is returned unchanged.
character_to_integer(Code) when is_integer(Code) -> Code;
character_to_integer(<<Code>>) -> Code.
644
%% pgsql returns booleans as "t" or "f".
%% Truthy values are "t", "true", "1" (as strings or binaries), the atom
%% `true' and the integer 1; everything else is false.
-spec to_bool(binary() | string() | atom() | integer() | any()) -> boolean().
to_bool(Bin) when is_binary(Bin) ->
    to_bool(binary_to_list(Bin));
to_bool(Value) ->
    lists:member(Value, ["t", "true", "1", true, 1]).
655
656 %%%----------------------------------------------------------------------
657 %%% Callback functions from gen_server
658 %%%----------------------------------------------------------------------
%% @doc gen_server init callback: open the database connection.
%% Traps exits, connects with ?CONNECT_RETRIES attempts, schedules the
%% keepalive and builds the initial state. If connect/4 returns anything
%% other than `{ok, DbRef}' the server stops with that value as the
%% reason, which the spec now reflects.
-spec init(options()) -> {ok, state()} | {stop, term()}.
init(Opts = #{query_timeout := QueryTimeout, max_start_interval := MaxStartInterval}) ->
    process_flag(trap_exit, true),
    KeepaliveInterval = maps:get(keepalive_interval, Opts, undefined),
    % retries are delayed exponentially, max_start_interval limits the delay
    % e.g. if the limit is 30, the delays are: 2, 4, 8, 16, 30, 30, ...
    case connect(Opts, ?CONNECT_RETRIES, 2, MaxStartInterval) of
        {ok, DbRef} ->
            schedule_keepalive(KeepaliveInterval),
            {ok, #state{db_ref = DbRef,
                        keepalive_interval = KeepaliveInterval,
                        query_timeout = QueryTimeout}};
        Error ->
            {stop, Error}
    end.
674
675
%% gen_server call handler.
%% `sql_cmd' runs the command and replies with its result; when
%% abort_on_driver_error/1 asks for it, the server also stops after
%% replying. `get_db_info' replies with the engine and the connection
%% reference. Any other request is reported and answered with
%% `{error, badarg}'.
handle_call({sql_cmd, Command, Timestamp}, From, State) ->
    {Result, NewState} = run_sql_cmd(Command, From, State, Timestamp),
    Verdict = abort_on_driver_error(Result),
    case Verdict of
        continue -> {reply, Result, NewState};
        {stop, Reason} -> {stop, Reason, Result, NewState}
    end;
handle_call(get_db_info, _From, #state{db_ref = DbRef} = State) ->
    Reply = {ok, db_engine(global), DbRef},
    {reply, Reply, State};
handle_call(Request, From, State) ->
    ?UNEXPECTED_CALL(Request, From),
    {reply, {error, badarg}, State}.
687
%% gen_server cast handler.
%% `sql_cmd' runs the command and drops the result; the server stops when
%% abort_on_driver_error/1 asks for it. Any other cast is reported and
%% ignored.
handle_cast({sql_cmd, Command, Timestamp}, State) ->
    {Result, NewState} = run_sql_cmd(Command, undefined, State, Timestamp),
    Verdict = abort_on_driver_error(Result),
    case Verdict of
        continue -> {noreply, NewState};
        {stop, Reason} -> {stop, Reason, NewState}
    end;
handle_cast(Request, State) ->
    ?UNEXPECTED_CAST(Request),
    {noreply, State}.
697
%% gen_server code upgrade callback: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
700
%% gen_server info handler.
%% `keepalive' runs ?KEEPALIVE_QUERY: on success the next keepalive is
%% scheduled, on error the server stops with `{keepalive_failed, Error}'.
%% An 'EXIT' message stops the server with the whole message as the
%% reason. Anything else is reported and ignored.
handle_info(keepalive, #state{keepalive_interval = Interval} = State) ->
    case sql_query_internal([?KEEPALIVE_QUERY], State) of
        {selected, _} ->
            schedule_keepalive(Interval),
            {noreply, State};
        {error, _} = Error ->
            {stop, {keepalive_failed, Error}, State}
    end;
handle_info({'EXIT', _Pid, _Reason} = ExitMsg, State) ->
    {stop, ExitMsg, State};
handle_info(Info, State) ->
    ?UNEXPECTED_INFO(Info),
    {noreply, State}.
714
%% gen_server terminate callback: best-effort disconnect. The old-style
%% `catch' is kept on purpose so that no failure of the backend's
%% disconnect/1 escapes from here.
-spec terminate(Reason :: term(), state()) -> any().
terminate(_Reason, #state{db_ref = Connection}) ->
    catch mongoose_rdbms_backend:disconnect(Connection).
718
719 %%%----------------------------------------------------------------------
720 %%% Internal functions
721 %%%----------------------------------------------------------------------
722
%% Run a queued SQL command unless it waited in the queue for too long.
%% `Timestamp' is the monotonic time (ms) taken when the request was
%% issued. Requests at least ?TRANSACTION_TIMEOUT old are discarded with
%% `{error, timeout}'.
%% The discard branch returns `{Result, State}' like every other branch:
%% handle_call/3 and handle_cast/2 destructure the return value as a pair,
%% so the former `{reply, {error, timeout}, State}' triple crashed the
%% worker with badmatch instead of reporting the timeout.
-spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) ->
    {Result :: term(), state()}.
run_sql_cmd(Command, _From, State, Timestamp) ->
    Now = erlang:monotonic_time(millisecond),
    case Now - Timestamp of
        Age when Age < ?TRANSACTION_TIMEOUT ->
            outer_op(Command, State);
        Age ->
            ?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow,
                         text => <<"Discarding request">>, age => Age, command => Command}),
            {{error, timeout}, State}
    end.
735
%% @doc Top-level operations; called from the gen_server callbacks via
%% run_sql_cmd/4. Returns the operation result paired with the state.
%% For `sql_execute_wrapped' any exception escaping the wrapper is logged
%% and reported as `{error, Reason}' with the state unchanged.
-spec outer_op(rdbms_msg(), state()) -> {query_result()
                                         | transaction_result()
                                         | dirty_result(), state()}.
outer_op({sql_query, Query}, State) ->
    QueryResult = sql_query_internal(Query, State),
    {QueryResult, State};
outer_op({sql_transaction, F}, State) ->
    outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "", State);
outer_op({sql_dirty, F}, State) ->
    sql_dirty_internal(F, State);
outer_op({sql_execute, Name, Params}, State) ->
    sql_execute(outer_op, Name, Params, State);
outer_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
    Execute = fun() -> sql_execute(outer_op, Name, Params, State) end,
    try
        Wrapper(Execute)
    catch
        _Class:Error ->
            ?LOG_ERROR(#{what => sql_execute_wrapped_failed, reason => Error,
                         statement_name => Name, wrapper_fun => Wrapper}),
            {{error, Error}, State}
    end.
757
%% @doc Handles operations issued by client code while it is already inside
%% a nested operation (for example a query or an execute made from within a
%% running transaction fun).
-spec nested_op(rdbms_msg(), state()) -> any().
nested_op({sql_query, Query}, State) ->
    %% XXX - use sql_query_t here instead? Most likely that would break
    %% callers who expect {error, _} tuples (sql_query_t turns
    %% these into throws).
    QueryResult = sql_query_internal(Query, State),
    {QueryResult, State};
nested_op({sql_transaction, F}, State) ->
    %% A transaction started inside another transaction.
    inner_transaction(F, State);
nested_op({sql_execute, Name, Params}, State) ->
    sql_execute(nested_op, Name, Params, State);
nested_op({sql_execute_wrapped, Name, Params, Wrapper}, State) ->
    %% Unlike outer_op/2, exceptions from the wrapper are not caught here.
    Execute = fun() -> sql_execute(nested_op, Name, Params, State) end,
    Wrapper(Execute).
773
%% @doc Runs a nested transaction fun exactly once - nested transactions are
%% never retried, only outer ones are.
%%
%% `F' is evaluated under `catch'. A result that is already tagged as
%% `{aborted, _}', `{'EXIT', _}' or `{atomic, _}' is passed through
%% untouched; any other value is wrapped as `{atomic, Value}'.
-spec inner_transaction(fun(), state()) -> transaction_result() | {'EXIT', any()}.
inner_transaction(F, _State) ->
    Outcome = (catch F()),
    case Outcome of
        {aborted, _Reason} ->
            Outcome;
        {'EXIT', _Reason} ->
            Outcome;
        {atomic, _Value} ->
            Outcome;
        Value ->
            {atomic, Value}
    end.
787
%% @doc Runs `F' as an outer SQL transaction, restarting it when it aborts.
%%
%% Each attempt issues the begin-transaction query, stores the state in the
%% process dictionary while `F' runs, and takes it back afterwards.
%% Depending on the outcome of `F':
%% <ul>
%% <li>`{aborted, _}' with restarts left: roll back and try again with
%%     `NRestarts - 1';</li>
%% <li>`{aborted, _}' with no restarts left: log, roll back and return
%%     `{aborted, Reason}' (for the map format only the `reason' value is
%%     returned);</li>
%% <li>`{'EXIT', Reason}': roll back and return `{aborted, Reason}';</li>
%% <li>anything else: commit and return `{atomic, Result}'.</li>
%% </ul>
-spec outer_transaction(F :: fun(),
                        NRestarts :: 0..10,
                        Reason :: any(), state()) -> {transaction_result(), state()}.
outer_transaction(F, NRestarts, _Reason, State) ->
    sql_query_internal(rdbms_queries:begin_trans(), State),
    put_state(State),
    {Outcome, StackTrace} = apply_transaction_function(F),
    NewState = erase_state(),
    Rollback = [<<"rollback;">>],
    case Outcome of
        {aborted, AbortReason} when NRestarts > 0 ->
            %% Restarts left: undo this attempt and run the fun again.
            sql_query_internal(Rollback, NewState),
            outer_transaction(F, NRestarts - 1, AbortReason, NewState);
        {aborted, #{reason := AbortReason, sql_query := SqlQuery}}
          when NRestarts =:= 0 ->
            %% Out of restarts; map-format abort that carries the SQL query.
            ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
                         restarts => ?MAX_TRANSACTION_RESTARTS, last_abort_reason => AbortReason,
                         last_sql_query => iolist_to_binary(SqlQuery),
                         stacktrace => StackTrace, state => NewState}),
            sql_query_internal(Rollback, NewState),
            {{aborted, AbortReason}, NewState};
        {aborted, AbortReason} when NRestarts =:= 0 ->
            %% Out of restarts; old abort format.
            ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
                         restarts => ?MAX_TRANSACTION_RESTARTS,
                         last_abort_reason => AbortReason, stacktrace => StackTrace,
                         state => NewState}),
            sql_query_internal(Rollback, NewState),
            {{aborted, AbortReason}, NewState};
        {'EXIT', ExitReason} ->
            %% The SQL transaction is aborted on EXIT from the outer txn only.
            sql_query_internal(Rollback, NewState),
            {{aborted, ExitReason}, NewState};
        Value ->
            %% The outer transaction succeeded: commit it.
            sql_query_internal([<<"commit;">>], NewState),
            {{atomic, Value}, NewState}
    end.
827
%% @doc Evaluates the transaction fun and normalises how it finished.
%%
%% Returns `{Value, []}' on a normal return, `{Thrown, StackTrace}' when the
%% fun throws, and `{{'EXIT', Reason}, StackTrace}' (after logging) for any
%% other exception class.
-spec apply_transaction_function(F :: fun()) -> {any(), list()}.
apply_transaction_function(F) ->
    try F() of
        Value ->
            {Value, []}
    catch
        throw:Thrown:StackTrace ->
            {Thrown, StackTrace};
        Class:Reason:StackTrace ->
            ?LOG_ERROR(#{what => rdbms_outer_transaction_failed, class => Class,
                         reason => Reason, stacktrace => StackTrace}),
            {{'EXIT', Reason}, StackTrace}
    end.
840
%% Sends a raw query to the backend using the connection and the query
%% timeout kept in the state, and returns the backend result.
%% One specific ODBC error is reported as `{updated, 0}' instead (workaround
%% for an odbc bug).
sql_query_internal(Query, #state{db_ref = DBRef, query_timeout = QueryTimeout}) ->
    QueryResult = mongoose_rdbms_backend:query(DBRef, Query, QueryTimeout),
    case QueryResult of
        {error, "No SQL-driver information available."} ->
            {updated, 0};
        _ ->
            QueryResult
    end.
848
%% Runs `F' outside of any transaction, with the state available in the
%% process dictionary while it runs.
%% Returns `{{ok, Value}, State}' when `F' returns normally and
%% `{{error, Reason}, State}' when it raises an exception of any class; the
%% state is the one taken back from the process dictionary afterwards.
sql_dirty_internal(F, State) ->
    put_state(State),
    Outcome = try
                  {ok, F()}
              catch
                  _Class:Reason ->
                      {error, Reason}
              end,
    {Outcome, erase_state()}.
860
%% @doc Executes the prepared statement registered under `Name' with
%% `Params'.
%%
%% The statement is prepared first if this worker has not prepared it yet,
%% and the resulting state is stored in the process dictionary before
%% executing. Exceptions raised by the backend are logged and re-raised with
%% their original class, reason and stacktrace. The result is then passed to
%% `check_execute_result/4', which may throw for `nested_op'.
-spec sql_execute(Type :: atom(), query_name(), query_params(), state()) ->
    {query_result(), state()}.
sql_execute(Type, Name, Params, State = #state{db_ref = DBRef, query_timeout = QueryTimeout}) ->
    %% Postgres allows a statement to be prepared only once, so the updated
    %% state (which remembers the prepared statement) must be kept.
    {StatementRef, NewState} = prepare_statement(Name, State),
    put_state(NewState),
    try mongoose_rdbms_backend:execute(DBRef, StatementRef, Params, QueryTimeout) of
        Res ->
            %% This section is not covered by the catch clause below.
            check_execute_result(Type, Res, Name, Params),
            {Res, NewState}
    catch
        Class:Reason:StackTrace ->
            ?LOG_ERROR(#{what => rdbms_sql_execute_failed, statement_name => Name,
                         class => Class, reason => Reason, params => Params,
                         stacktrace => StackTrace}),
            erlang:raise(Class, Reason, StackTrace)
    end.
876
%% Similar check as in sql_query_t.
%% Results of top-level executes are always accepted. For nested executes an
%% `{error, Reason}' result is turned into a thrown `{aborted, Map}' that
%% carries the reason, the statement name and the parameters; any other
%% tuple is accepted.
check_execute_result(outer_op, _Res, _Name, _Params) ->
    ok;
check_execute_result(nested_op, {error, Reason}, Name, Params) ->
    throw({aborted, #{reason => Reason, statement_name => Name, params => Params}});
check_execute_result(nested_op, Res, _Name, _Params) when is_tuple(Res) ->
    %% Res is not a list (i.e. executes are one query and one result set only)
    ok.
888
%% @doc Returns the backend reference of the prepared statement `Name'.
%%
%% References are remembered in the `prepared' map of the state. On the
%% first use of a name its definition is looked up, the statement is
%% prepared on the connection, and the new reference is stored in the
%% returned state.
-spec prepare_statement(query_name(), state()) -> {Ref :: term(), state()}.
prepare_statement(Name, State = #state{db_ref = DBRef, prepared = Prepared}) ->
    case maps:get(Name, Prepared, undefined) of
        undefined ->
            {_, Table, Fields, Statement} = lookup_statement(Name),
            {ok, NewRef} = mongoose_rdbms_backend:prepare(DBRef, Name, Table, Fields, Statement),
            {NewRef, State#state{prepared = Prepared#{Name => NewRef}}};
        KnownRef ->
            {KnownRef, State}
    end.
900
%% Fetches the record stored under `Name' in the `prepared_statements' ETS
%% table. Raises `{lookup_statement_failed, Name}' when there is no entry.
lookup_statement(Name) ->
    case ets:lookup(prepared_statements, Name) of
        [] ->
            error({lookup_statement_failed, Name});
        [Statement] ->
            Statement
    end.
906
%% @doc Decides whether a result indicates a driver failure that should stop
%% the worker.
%%
%% Two error strings (both noted as mysql driver errors) map to
%% `{stop, timeout}' and `{stop, closed}'; everything else is `continue'.
-spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}.
abort_on_driver_error(Result) ->
    case Result of
        {error, "query timed out"} ->
            %% mysql driver error
            {stop, timeout};
        {error, "Failed sending data on socket" ++ _} ->
            %% mysql driver error
            {stop, closed};
        _ ->
            continue
    end.
914
%% @doc Returns the backend name registered globally for this module, or
%% `undefined' when the lookup fails with `error:badarg'.
%% The host type argument is ignored.
-spec db_engine(mongooseim:host_type_or_global()) -> odbc | mysql | pgsql | undefined.
db_engine(_HostType) ->
    try mongoose_backend:get_backend_name(global, ?MODULE) of
        Engine ->
            Engine
    catch
        error:badarg ->
            undefined
    end.
919
%% @doc Used to optimise queries for different databases: returns `mssql'
%% when the `rdbms_server_type' option is `mssql', and `generic' otherwise.
%% @todo Should be refactored to use host types with this module.
%% Also, this parameter should not be global, but pool-name parameterized
-spec db_type() -> mssql | generic.
db_type() ->
    ServerType = mongoose_config:get_opt(rdbms_server_type),
    case ServerType =:= mssql of
        true -> mssql;
        false -> generic
    end.
929
%% @doc Opens a backend connection, retrying on failure.
%%
%% `Retry' is the number of retries left; when it is 0 the backend error is
%% returned as-is. Otherwise the failure is logged, the process sleeps for a
%% random number of seconds between 1 and `RetryAfter', and the attempt is
%% repeated with `Retry - 1'. The next `RetryAfter' is the square of the
%% current one, capped at `MaxRetryDelay'.
-spec connect(options(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
              MaxRetryDelay :: non_neg_integer()) -> {ok, term()} | {error, any()}.
connect(#{query_timeout := QueryTimeout} = Options, Retry, RetryAfter, MaxRetryDelay) ->
    case mongoose_rdbms_backend:connect(Options, QueryTimeout) of
        {ok, _} = Ok ->
            Ok;
        Error when Retry =:= 0 ->
            Error;
        Error ->
            %% rand:uniform/1 only accepts N >= 1, while the spec allows a
            %% delay of 0 (either passed in directly or produced by a
            %% MaxRetryDelay of 0), so the upper bound is clamped to 1.
            SleepFor = rand:uniform(max(1, RetryAfter)),
            Backend = mongoose_backend:get_backend_name(global, ?MODULE),
            ?LOG_ERROR(#{what => rdbms_connection_attempt_error, backend => Backend,
                         error => Error, sleep_for => SleepFor}),
            timer:sleep(timer:seconds(SleepFor)),
            NextRetryDelay = RetryAfter * RetryAfter,
            connect(Options, Retry - 1, min(MaxRetryDelay, NextRetryDelay), MaxRetryDelay)
    end.
947
948
%% @doc Arms the keepalive timer.
%%
%% For an integer interval (in seconds) a `keepalive' message is scheduled
%% to be sent to the calling process and the timer reference is returned.
%% `undefined' means keepalive is off and yields `ok'. Any other value is
%% logged as a wrong interval and also yields `ok'.
-spec schedule_keepalive(KeepaliveInterval :: undefined | pos_integer()) -> any().
schedule_keepalive(KeepaliveInterval) when is_integer(KeepaliveInterval) ->
    erlang:send_after(timer:seconds(KeepaliveInterval), self(), keepalive);
schedule_keepalive(undefined) ->
    ok;
schedule_keepalive(Other) ->
    ?LOG_ERROR(#{what => rdbms_wrong_keepalive_interval, reason => Other}),
    ok.
960
961 %% ----- process state access, for convenient tracing
962
%% Stores the worker state in the process dictionary under `?STATE_KEY' and
%% returns whatever was stored there before.
put_state(State) ->
    erlang:put(?STATE_KEY, State).
965
%% Removes the worker state from the process dictionary and returns it
%% (`undefined' when nothing was stored).
erase_state() ->
    erlang:erase(?STATE_KEY).
968
%% Reads the worker state from the process dictionary without removing it
%% (`undefined' when nothing is stored).
get_state() ->
    erlang:get(?STATE_KEY).
Line Hits Source