./ct_report/coverage/mongoose_rdbms.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : mongoose_rdbms.erl
3 %%% Author : Alexey Shchepin <alexey@process-one.net>
4 %%% Purpose : Serve RDBMS connection
5 %%% Created : 8 Dec 2004 by Alexey Shchepin <alexey@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%% Copyright 2016 Erlang Solutions Ltd.
10 %%%
11 %%% This program is free software; you can redistribute it and/or
12 %%% modify it under the terms of the GNU General Public License as
13 %%% published by the Free Software Foundation; either version 2 of the
14 %%% License, or (at your option) any later version.
15 %%%
16 %%% This program is distributed in the hope that it will be useful,
17 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
18 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 %%% General Public License for more details.
20 %%%
21 %%% You should have received a copy of the GNU General Public License
22 %%% along with this program; if not, write to the Free Software
23 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
24 %%%
25 %%%----------------------------------------------------------------------
26
27 -module(mongoose_rdbms).
28 -author('alexey@process-one.net').
29 -author('konrad.zemek@erlang-solutions.com').
30
31 -behaviour(gen_server).
32
33 %% Part of SQL query string, produced by use_escaped/1 function
34 -type sql_query_part() :: iodata().
35 -type sql_query() :: iodata().
36
37 %% Blob data type to be used inside SQL queries
38 -opaque escaped_binary() :: {escaped_binary, sql_query_part()}.
39 %% Unicode string to be used inside SQL queries
40 -opaque escaped_string() :: {escaped_string, sql_query_part()}.
41 %% Unicode string to be used inside LIKE conditions
42 -opaque escaped_like() :: {escaped_like, sql_query_part()}.
43 -opaque escaped_integer() :: {escaped_integer, sql_query_part()}.
44 -opaque escaped_boolean() :: {escaped_boolean, sql_query_part()}.
45 -opaque escaped_null() :: {escaped_null, sql_query_part()}.
46 -opaque escaped_value() :: escaped_string() | escaped_binary() | escaped_integer() |
47 escaped_boolean() | escaped_null().
48
49 -export_type([escaped_binary/0,
50 escaped_string/0,
51 escaped_like/0,
52 escaped_integer/0,
53 escaped_boolean/0,
54 escaped_null/0,
55 escaped_value/0,
56 sql_query/0,
57 sql_query_part/0]).
58
59 %% External exports
60 -export([prepare/4,
61 prepared/1,
62 execute/3,
63 execute_cast/3,
64 execute_request/3,
65 execute_successfully/3,
66 sql_query/2,
67 sql_query_cast/2,
68 sql_query_request/2,
69 sql_query_t/1,
70 sql_transaction/2,
71 transaction_with_delayed_retry/3,
72 sql_dirty/2,
73 to_bool/1,
74 db_engine/1,
75 db_type/0,
76 use_escaped/1]).
77
78 %% Unicode escaping
79 -export([escape_string/1,
80 use_escaped_string/1]).
81
82 %% Integer escaping
83 -export([escape_integer/1,
84 use_escaped_integer/1]).
85
86 %% Boolean escaping
87 -export([escape_boolean/1,
88 use_escaped_boolean/1]).
89
90 %% LIKE escaping
91 -export([escape_like/1,
92 escape_prepared_like/1,
93 escape_like_prefix/1,
94 use_escaped_like/1]).
95
96 %% BLOB escaping
97 -export([escape_binary/2,
98 unescape_binary/2,
99 use_escaped_binary/1]).
100
101 %% Null escaping
102 %% (to keep uniform pattern of passing values)
103 -export([escape_null/0,
104 use_escaped_null/1]).
105
106 %% count / integral types decoding
107 -export([result_to_integer/1,
108 selected_to_integer/1]).
109
110 -export([character_to_integer/1]).
111
112 %% gen_server callbacks
113 -export([init/1,
114 handle_call/3,
115 handle_cast/2,
116 handle_info/2,
117 terminate/2,
118 code_change/3]).
119
120 -ignore_xref([sql_query_cast/2, sql_query_request/2, execute_cast/3, execute_request/3,
121 sql_query_t/1, use_escaped/1,
122 escape_like/1, escape_like_prefix/1, use_escaped_like/1,
123 escape_binary/2, use_escaped_binary/1,
124 escape_integer/1, use_escaped_integer/1,
125 escape_string/1, use_escaped_string/1,
126 escape_boolean/1, use_escaped_boolean/1,
127 escape_null/0, use_escaped_null/1
128 ]).
129
130 %% internal usage
131 -export([get_db_info/1]).
132
133 -include("mongoose.hrl").
134
135 -record(state, {db_ref,
136 prepared = #{} :: #{binary() => term()},
137 keepalive_interval :: undefined | pos_integer()
138 }).
139 -type state() :: #state{}.
140
141 -define(STATE_KEY, mongoose_rdbms_state).
142 -define(MAX_TRANSACTION_RESTARTS, 10).
143 -define(TRANSACTION_TIMEOUT, 60000). % milliseconds
144 -define(KEEPALIVE_QUERY, <<"SELECT 1;">>).
145 -define(QUERY_TIMEOUT, 5000).
146 %% The value is arbitrary; supervisor will restart the connection once
147 %% the retry counter runs out. We just attempt to reduce log pollution.
148 -define(CONNECT_RETRIES, 5).
149
150 -type server() :: mongooseim:host_type() | global.
151 -type rdbms_msg() :: {sql_query, _}
152 | {sql_transaction, fun()}
153 | {sql_dirty, fun()}
154 | {sql_execute, atom(), [binary() | boolean() | integer()]}.
155 -type single_query_result() :: {selected, [tuple()]} |
156 {updated, non_neg_integer() | undefined} |
157 {updated, non_neg_integer(), [tuple()]} |
158 {aborted, Reason :: term()} |
159 {error, Reason :: string() | duplicate_key}.
160 -type query_result() :: single_query_result() | [single_query_result()].
161 -type transaction_result() :: {aborted, _} | {atomic, _} | {error, _}.
162 -type dirty_result() :: {ok, any()} | {error, any()}.
163 -export_type([query_result/0,
164 server/0]).
165
166 %%%----------------------------------------------------------------------
167 %%% API
168 %%%----------------------------------------------------------------------
169
%% @doc Register a prepared statement in the `prepared_statements' ETS table.
%% An atom table name, or a field list whose first element is an atom, is
%% converted to UTF-8 binaries first. The statement is flattened to a binary
%% before it is stored.
%% Returns `{ok, Name}' when `ets:insert_new/2' accepts the tuple and
%% `{error, already_exists}' when it refuses it.
-spec prepare(Name, Table :: binary() | atom(), Fields :: [binary() | atom()],
              Statement :: iodata()) ->
          {ok, Name} | {error, already_exists}
      when Name :: atom().
prepare(Name, Table, Fields, Statement) when is_atom(Table) ->
    BinTable = atom_to_binary(Table, utf8),
    prepare(Name, BinTable, Fields, Statement);
prepare(Name, Table, [First | _] = Fields, Statement) when is_atom(First) ->
    BinFields = lists:map(fun(Field) -> atom_to_binary(Field, utf8) end, Fields),
    prepare(Name, Table, BinFields, Statement);
prepare(Name, Table, Fields, Statement) when is_atom(Name), is_binary(Table) ->
    %% Assert that every field name is a binary at this point.
    true = lists:all(fun is_binary/1, Fields),
    Record = {Name, Table, Fields, iolist_to_binary(Statement)},
    Inserted = ets:insert_new(prepared_statements, Record),
    if
        Inserted -> {ok, Name};
        true -> {error, already_exists}
    end.
185
%% @doc Tell whether the `prepared_statements' ETS table has an entry for
%% the given statement name.
-spec prepared(atom()) -> boolean().
prepared(StatementName) ->
    ets:member(prepared_statements, StatementName).
189
%% @doc Execute the prepared statement Name with Parameters by issuing an
%% `{sql_execute, Name, Parameters}' command through sql_call/2.
-spec execute(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
          query_result().
execute(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
    Command = {sql_execute, Name, Parameters},
    sql_call(HostType, Command).
194
%% @doc Same command as execute/3, but dispatched through sql_cast/2.
-spec execute_cast(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
          query_result().
execute_cast(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
    Command = {sql_execute, Name, Parameters},
    sql_cast(HostType, Command).
199
%% @doc Same command as execute/3, but dispatched through sql_request/2.
-spec execute_request(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
          reference().
execute_request(HostType, Name, Parameters) when is_atom(Name), is_list(Parameters) ->
    Command = {sql_execute, Name, Parameters},
    sql_request(HostType, Command).
204
%% @doc Same as execute/3, but fails loudly on any error.
%% Every success shape declared in single_query_result() is returned as is:
%% `{selected, Rows}', `{updated, Count}' and `{updated, Count, Rows}'.
%% Any other result is logged and raised as an `error' carrying the log map.
%% An `error'-class exception raised by execute/3 is logged and re-raised
%% with its original stacktrace.
-spec execute_successfully(HostType :: server(), Name :: atom(), Parameters :: [term()]) ->
          query_result().
execute_successfully(HostType, Name, Parameters) ->
    try execute(HostType, Name, Parameters) of
        {selected, _} = Result ->
            Result;
        {updated, _} = Result ->
            Result;
        {updated, _, _} = Result ->
            %% Update that also returns rows; a success per single_query_result().
            Result;
        Other ->
            Log = #{what => sql_execute_failed, host => HostType, statement_name => Name,
                    statement_query => query_name_to_string(Name),
                    statement_params => Parameters, reason => Other},
            ?LOG_ERROR(Log),
            %% Raised from the `of' section, so the catch below does not see it.
            error(Log)
    catch error:Reason:Stacktrace ->
            Log = #{what => sql_execute_failed, host => HostType, statement_name => Name,
                    statement_query => query_name_to_string(Name),
                    statement_params => Parameters,
                    reason => Reason, stacktrace => Stacktrace},
            ?LOG_ERROR(Log),
            erlang:raise(error, Reason, Stacktrace)
    end.
228
%% @doc Fetch the statement stored for Name in the `prepared_statements' ETS
%% table, or the atom `not_found' when the table has no such entry.
query_name_to_string(Name) ->
    Found = ets:lookup(prepared_statements, Name),
    case Found of
        [{_Name, _Table, _Fields, Statement}] ->
            Statement;
        [] ->
            not_found
    end.
236
%% @doc Run Query by issuing a `{sql_query, Query}' command through sql_call/2.
-spec sql_query(HostType :: server(), Query :: any()) -> query_result().
sql_query(HostType, Query) ->
    Command = {sql_query, Query},
    sql_call(HostType, Command).
240
%% @doc Same command as sql_query/2, but dispatched through sql_request/2.
-spec sql_query_request(HostType :: server(), Query :: any()) -> reference().
sql_query_request(HostType, Query) ->
    Command = {sql_query, Query},
    sql_request(HostType, Command).
244
%% @doc Same command as sql_query/2, but dispatched through sql_cast/2.
-spec sql_query_cast(HostType :: server(), Query :: any()) -> query_result().
sql_query_cast(HostType, Query) ->
    Command = {sql_query, Query},
    sql_cast(HostType, Command).
248
%% @doc Run an SQL transaction.
%% The second argument is either a list of queries, each run through
%% sql_query_t/1 in order, or an Erlang fun that is sent to sql_call/2 as a
%% `{sql_transaction, F}' command.
-spec sql_transaction(server(), fun() | maybe_improper_list()) -> transaction_result().
sql_transaction(HostType, Queries) when is_list(Queries) ->
    %% Wrap the query list in a fun and fall through to the fun-based clause.
    RunAll = fun() -> lists:map(fun sql_query_t/1, Queries) end,
    sql_transaction(HostType, RunAll);
sql_transaction(HostType, F) when is_function(F) ->
    Command = {sql_transaction, F},
    sql_call(HostType, Command).
257
%% @doc Run a transaction, sleeping between retries.
%% Info must contain the keys `retries' and `delay'; the delay is passed to
%% `timer:sleep/1'. The whole Info map is also merged into the log events
%% emitted while retrying.
-spec transaction_with_delayed_retry(server(), fun() | maybe_improper_list(), map()) -> transaction_result().
transaction_with_delayed_retry(HostType, F, Info) ->
    RetryCount = maps:get(retries, Info),
    SleepMs = maps:get(delay, Info),
    do_transaction_with_delayed_retry(HostType, F, RetryCount, SleepMs, Info).
264
%% @doc Retry loop behind transaction_with_delayed_retry/3.
%% An `{atomic, _}' result is returned as is. An `{aborted, _}' result with
%% retries left is logged as a warning, followed by a sleep of Delay and
%% another attempt. Anything else is logged and raised as an `error'.
do_transaction_with_delayed_retry(HostType, F, Retries, Delay, Info) ->
    Outcome = mongoose_rdbms:sql_transaction(HostType, F),
    case Outcome of
        {atomic, _} ->
            Outcome;
        {aborted, AbortReason} when Retries > 0 ->
            ?LOG_WARNING(Info#{what => rdbms_transaction_aborted,
                               text => <<"Transaction aborted. Restart">>,
                               reason => AbortReason, retries_left => Retries}),
            timer:sleep(Delay),
            do_transaction_with_delayed_retry(HostType, F, Retries - 1, Delay, Info);
        _ ->
            Failure = Info#{what => mam_transaction_failed,
                            text => <<"Transaction failed. Do not restart">>,
                            reason => Outcome},
            ?LOG_ERROR(Failure),
            erlang:error(Failure)
    end.
283
%% @doc Issue a `{sql_dirty, F}' command through sql_call/2.
%% An `{ok, Result}' reply is unwrapped to Result; an `{error, Error}' reply
%% is thrown as Error.
-spec sql_dirty(server(), fun()) -> any() | no_return().
sql_dirty(HostType, F) when is_function(F) ->
    Reply = sql_call(HostType, {sql_dirty, F}),
    case Reply of
        {error, Error} -> throw(Error);
        {ok, Result} -> Result
    end.
290
%% TODO: Better spec for RPC calls
%% @doc Dispatch Msg synchronously.
%% With no state stored under ?STATE_KEY in the process dictionary, the
%% message goes to the pool via sql_call0/2. Otherwise it is run in place
%% with nested_op/2 and the state it returns is stored back.
-spec sql_call(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_call(HostType, Msg) ->
    case get_state() of
        undefined ->
            sql_call0(HostType, Msg);
        CurrentState ->
            {Reply, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Reply
    end.
301
%% @doc Send Msg to the `rdbms' pool with `mongoose_wpool:call/3', stamped
%% with the current monotonic time in milliseconds.
-spec sql_call0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_call0(HostType, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Request = {sql_cmd, Msg, SentAt},
    mongoose_wpool:call(rdbms, HostType, Request).
306
%% @doc Dispatch Msg as a request.
%% With no state stored under ?STATE_KEY in the process dictionary, the
%% message goes to the pool via sql_request0/2. Otherwise it is run in place
%% with nested_op/2 and the state it returns is stored back.
-spec sql_request(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_request(HostType, Msg) ->
    case get_state() of
        undefined ->
            sql_request0(HostType, Msg);
        CurrentState ->
            {Reply, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Reply
    end.
316
%% @doc Send Msg to the `rdbms' pool with `mongoose_wpool:send_request/3',
%% stamped with the current monotonic time in milliseconds.
-spec sql_request0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_request0(HostType, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Request = {sql_cmd, Msg, SentAt},
    mongoose_wpool:send_request(rdbms, HostType, Request).
321
%% @doc Dispatch Msg as a cast.
%% With no state stored under ?STATE_KEY in the process dictionary, the
%% message goes to the pool via sql_cast0/2. Otherwise it is run in place
%% with nested_op/2 and the state it returns is stored back.
-spec sql_cast(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_cast(HostType, Msg) ->
    case get_state() of
        undefined ->
            sql_cast0(HostType, Msg);
        CurrentState ->
            {Reply, UpdatedState} = nested_op(Msg, CurrentState),
            put_state(UpdatedState),
            Reply
    end.
331
%% @doc Send Msg to the `rdbms' pool with `mongoose_wpool:cast/3', stamped
%% with the current monotonic time in milliseconds.
-spec sql_cast0(HostType :: server(), Msg :: rdbms_msg()) -> any().
sql_cast0(HostType, Msg) ->
    SentAt = erlang:monotonic_time(millisecond),
    Request = {sql_cmd, Msg, SentAt},
    mongoose_wpool:cast(rdbms, HostType, Request).
336
%% @doc Send a `get_db_info' call either to one specific worker pid (with a
%% 5000 ms timeout) or, for any other target, to the `rdbms' pool.
-spec get_db_info(Target :: server() | pid()) ->
          {ok, DbType :: atom(), DbRef :: term()} | {error, any()}.
get_db_info(Target) when is_pid(Target) ->
    wpool_process:call(Target, get_db_info, 5000);
get_db_info(Target) ->
    mongoose_wpool:call(rdbms, Target, get_db_info).
343
%% @doc Run Query against the state stored in the process dictionary.
%% Intended to be used from inside an sql_transaction.
sql_query_t(Query) ->
    TxState = get_state(),
    sql_query_t(Query, TxState).
347
%% @doc Run Query with State and turn query errors into throws.
%% A top-level `{error, Reason}' result, or the first `{error, Reason}'
%% element of a result list, is thrown as
%% `{aborted, #{reason => Reason, sql_query => Query}}'.
%% Any other result is returned unchanged.
sql_query_t(Query, State) ->
    throw_on_query_error(Query, sql_query_internal(Query, State)).

%% Throws on an error result; otherwise returns the result as is.
throw_on_query_error(Query, {error, Reason}) ->
    throw({aborted, #{reason => Reason, sql_query => Query}});
throw_on_query_error(Query, Results) when is_list(Results) ->
    case lists:keyfind(error, 1, Results) of
        {error, Reason} ->
            throw({aborted, #{reason => Reason, sql_query => Query}});
        _ ->
            Results
    end;
throw_on_query_error(_Query, Result) ->
    Result.
363
%% @doc Escape the content of a LIKE pattern byte by byte.
%% Only for binaries; escapes only the content, '%' has to be added afterwards.
%% The escape character is '$' as '\' causes issues in PostgreSQL.
%% The returned value is NOT safe to use outside of a prepared statement.
-spec escape_prepared_like(binary()) -> binary().
escape_prepared_like(S) ->
    Escaped = [escape_prepared_like_character(Byte) || <<Byte>> <= S],
    iolist_to_binary(Escaped).
370
%% @doc Escape characters that would confuse an SQL engine and wrap the
%% result as a quoted pattern with '%' on both sides.
%% Percent and underscore only need to be escaped for pattern matching in
%% LIKE statements.
%% INFO: Used in mod_vcard_rdbms.
%% Searches in the middle of text, which is not efficient.
-spec escape_like(binary() | string()) -> escaped_like().
escape_like(S) ->
    Pattern = [$', $%, escape_like_internal(S), $%, $'],
    {escaped_like, Pattern}.
379
%% @doc Like escape_like/1, but the quoted pattern has '%' only at the end.
-spec escape_like_prefix(binary() | string()) -> escaped_like().
escape_like_prefix(S) ->
    Pattern = [$', escape_like_internal(S), $%, $'],
    {escaped_like, Pattern}.
383
%% @doc Escape a blob with `mongoose_rdbms_backend:escape_binary/1' and tag
%% the result as `escaped_binary'. The host type argument is not used.
-spec escape_binary(server(), binary()) -> escaped_binary().
escape_binary(_HostType, Bin) when is_binary(Bin) ->
    Escaped = mongoose_rdbms_backend:escape_binary(Bin),
    {escaped_binary, Escaped}.
387
%% @doc The same as escape, but the returned value includes the ''.
%% The result is tagged as `escaped_string'.
-spec escape_string(binary() | string()) -> escaped_string().
escape_string(S) ->
    Quoted = escape_string_internal(S),
    {escaped_string, Quoted}.
392
%% @doc Render an integer as a decimal binary tagged as `escaped_integer'.
-spec escape_integer(integer()) -> escaped_integer().
escape_integer(I) when is_integer(I) ->
    Digits = integer_to_binary(I),
    {escaped_integer, Digits}.
396
%% @doc Render a boolean as the quoted string '1' or '0'.
%% Be aware that we can't just use escaped_integer here, because of this
%% error in pgsql:
%% column \"match_all\" is of type boolean but expression is of type integer
-spec escape_boolean(boolean()) -> escaped_boolean().
escape_boolean(true) ->
    {escaped_boolean, "'1'"};
escape_boolean(false) ->
    {escaped_boolean, "'0'"}.
405
%% @doc Return the SQL null literal tagged as `escaped_null'.
-spec escape_null() -> escaped_null().
escape_null() ->
    Literal = "null",
    {escaped_null, Literal}.
409
410
%% @doc SQL-injection check.
%% Call this function just before using a value from escape_string/1 inside
%% a query. Unwraps an `{escaped_string, S}' tuple; any other value is logged
%% and raised as `{use_escaped_string, Value}'.
-spec use_escaped_string(escaped_string()) -> sql_query_part().
use_escaped_string({escaped_string, Payload}) ->
    Payload;
use_escaped_string(Unexpected) ->
    %% The error is logged as well as raised, because some callers have
    %% catches that are too wide and would otherwise swallow it silently.
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Unexpected,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    erlang:error({use_escaped_string, Unexpected}).
422
%% @doc Unwrap an `{escaped_binary, S}' tuple; any other value is logged and
%% raised as `{use_escaped_binary, Value}'.
-spec use_escaped_binary(escaped_binary()) -> sql_query_part().
use_escaped_binary({escaped_binary, Payload}) ->
    Payload;
use_escaped_binary(Unexpected) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Unexpected,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    erlang:error({use_escaped_binary, Unexpected}).
430
%% @doc Unwrap an `{escaped_like, S}' tuple; any other value is logged and
%% raised as `{use_escaped_like, Value}'.
-spec use_escaped_like(escaped_like()) -> sql_query_part().
use_escaped_like({escaped_like, Payload}) ->
    Payload;
use_escaped_like(Unexpected) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Unexpected,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    erlang:error({use_escaped_like, Unexpected}).
438
%% @doc Unwrap an `{escaped_integer, S}' tuple; any other value is logged and
%% raised as `{use_escaped_integer, Value}'.
-spec use_escaped_integer(escaped_integer()) -> sql_query_part().
use_escaped_integer({escaped_integer, Payload}) ->
    Payload;
use_escaped_integer(Unexpected) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Unexpected,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    erlang:error({use_escaped_integer, Unexpected}).
446
%% @doc Unwrap an `{escaped_boolean, S}' tuple; any other value is logged and
%% raised as `{use_escaped_boolean, Value}'.
-spec use_escaped_boolean(escaped_boolean()) -> sql_query_part().
use_escaped_boolean({escaped_boolean, Payload}) ->
    Payload;
use_escaped_boolean(Unexpected) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Unexpected,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    erlang:error({use_escaped_boolean, Unexpected}).
454
%% @doc Unwrap an `{escaped_null, S}' tuple; any other value is logged and
%% raised as `{use_escaped_null, Value}'.
-spec use_escaped_null(escaped_null()) -> sql_query_part().
use_escaped_null({escaped_null, Payload}) ->
    Payload;
use_escaped_null(Unexpected) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Unexpected,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    erlang:error({use_escaped_null, Unexpected}).
462
%% @doc Unwrap an escaped value whose type is unknown.
%% Accepts the string, binary, integer, boolean and null wrappers.
%% Be aware that you can't pass escaped_like() here: it is logged and raised
%% as `{use_escaped, Value}', like any other unexpected value.
-spec use_escaped(Value) -> sql_query_part() when
      Value :: escaped_value().
use_escaped({Tag, Payload}) when Tag =:= escaped_string;
                                 Tag =:= escaped_binary;
                                 Tag =:= escaped_integer;
                                 Tag =:= escaped_boolean;
                                 Tag =:= escaped_null ->
    Payload;
use_escaped(Unexpected) ->
    ?LOG_ERROR(#{what => rdbms_use_escaped_failure, value => Unexpected,
                 stacktrace => erlang:process_info(self(), current_stacktrace)}),
    erlang:error({use_escaped, Unexpected}).
481
%% @doc Prefix '%', '_' and '$' with the '$' escape character; every other
%% character is emitted as a single byte.
-spec escape_prepared_like_character(char()) -> binary().
escape_prepared_like_character(C) when C =:= $%; C =:= $_; C =:= $$ ->
    <<$$, C>>;
escape_prepared_like_character(C) ->
    <<C>>.
487
%% @doc Escape every character of S with escape_like_character/1.
%% A binary is converted to a list, escaped, and converted back to a binary;
%% a list yields a list of escaped characters.
-spec escape_like_internal(binary() | string()) -> binary() | string().
escape_like_internal(S) when is_binary(S) ->
    Chars = binary_to_list(S),
    list_to_binary(escape_like_internal(Chars));
escape_like_internal(S) when is_list(S) ->
    lists:map(fun escape_like_character/1, S).
493
%% @doc Quote and escape a string.
%% Uses the backend's own `escape_string/1' when
%% `mongoose_backend:is_exported/4' reports one, and generic escaping wrapped
%% in single quotes otherwise.
escape_string_internal(S) ->
    HasBackendEscape = mongoose_backend:is_exported(global, ?MODULE, escape_string, 1),
    case HasBackendEscape of
        false ->
            %% generic escaping
            [$', escape_characters(S), $'];
        true ->
            mongoose_rdbms_backend:escape_string(S)
    end.
502
%% @doc Escape every character of S with escape_character/1.
%% A binary is converted to a list, escaped, and converted back to a binary;
%% a list yields a list of escaped characters.
escape_characters(S) when is_binary(S) ->
    Chars = binary_to_list(S),
    list_to_binary(escape_characters(Chars));
escape_characters(S) when is_list(S) ->
    lists:map(fun escape_character/1, S).
507
%% @doc Backslash-escape the LIKE wildcards '%' and '_'; every other
%% character goes through escape_character/1.
escape_like_character(C) when C =:= $%; C =:= $_ ->
    [$\\, C];
escape_like_character(C) ->
    escape_character(C).
511
%% @doc Escape a single character for use inside a quoted SQL string.
%% NUL, newline, tab, backspace, carriage return, double quote and backslash
%% get a backslash escape; a single quote is doubled. Any other character is
%% returned unchanged.
escape_character(C) ->
    case C of
        $\0 -> "\\0";
        $\n -> "\\n";
        $\t -> "\\t";
        $\b -> "\\b";
        $\r -> "\\r";
        $' -> "''";
        $" -> "\\\"";
        $\\ -> "\\\\";
        _ -> C
    end.
522
523
%% @doc Decode a blob with `mongoose_rdbms_backend:unescape_binary/1'.
%% The host type argument is not used.
-spec unescape_binary(server(), binary()) -> binary().
unescape_binary(_HostType, Escaped) when is_binary(Escaped) ->
    mongoose_rdbms_backend:unescape_binary(Escaped).
527
528
%% @doc Normalise a result value to an integer: an integer is returned as is
%% and a binary is parsed with `binary_to_integer/1'.
-spec result_to_integer(binary() | integer()) -> integer().
result_to_integer(Value) when is_integer(Value) ->
    Value;
result_to_integer(Value) when is_binary(Value) ->
    binary_to_integer(Value).
534
%% @doc Extract the integer from a `{selected, [{Value}]}' result holding
%% exactly one single-column row.
selected_to_integer({selected, [{Value}]}) ->
    result_to_integer(Value).
537
%% @doc Convert a value from a CHAR(1) field to an integer.
%% A one-byte binary yields its byte value; an integer is returned as is.
character_to_integer(<<Byte>>) ->
    Byte;
character_to_integer(Code) when is_integer(Code) ->
    Code.
541
%% @doc Convert a database value to a boolean.
%% pgsql returns booleans as "t" or "f". A binary is converted to a list
%% first; the values "t", "true", "1", `true' and 1 are true and everything
%% else is false.
-spec to_bool(binary() | string() | atom() | integer() | any()) -> boolean().
to_bool(B) when is_binary(B) ->
    to_bool(binary_to_list(B));
to_bool(Value) ->
    lists:member(Value, ["t", "true", "1", true, 1]).
552
553 %%%----------------------------------------------------------------------
554 %%% Callback functions from gen_server
555 %%%----------------------------------------------------------------------
%% @doc gen_server init callback: trap exits and connect to the database.
%% Opts must contain a `{server, Settings}' tuple. `keepalive_interval' is
%% optional and `start_interval' defaults to 30.
%% Returns `{ok, State}' once connected, or `{stop, Error}' when connect/4
%% gives up.
-spec init(term()) -> {ok, state()} | {stop, term()}.
init(Opts) ->
    process_flag(trap_exit, true),
    {server, Settings} = lists:keyfind(server, 1, Opts),
    KeepaliveInterval = proplists:get_value(keepalive_interval, Opts),
    %% Connection retries are delayed with a growing interval; this
    %% parameter caps that delay.
    MaxStartInterval = proplists:get_value(start_interval, Opts, 30),
    case connect(Settings, ?CONNECT_RETRIES, 2, MaxStartInterval) of
        {ok, DbRef} ->
            schedule_keepalive(KeepaliveInterval),
            {ok, #state{db_ref = DbRef, keepalive_interval = KeepaliveInterval}};
        Error ->
            {stop, Error}
    end.
571
572
%% @doc gen_server call handler.
%% `{sql_cmd, Command, Timestamp}' runs the command and replies with its
%% result; the server also stops when abort_on_driver_error/1 asks for it.
%% `get_db_info' replies with the db engine and the connection reference.
%% Any other request is reported as unexpected and answered with
%% `{error, badarg}'.
handle_call({sql_cmd, Command, Timestamp}, From, State) ->
    {Reply, NextState} = run_sql_cmd(Command, From, State, Timestamp),
    case abort_on_driver_error(Reply) of
        continue ->
            {reply, Reply, NextState};
        {stop, Reason} ->
            {stop, Reason, Reply, NextState}
    end;
handle_call(get_db_info, _From, #state{db_ref = DbRef} = State) ->
    Info = {ok, db_engine(global), DbRef},
    {reply, Info, State};
handle_call(Request, From, State) ->
    ?UNEXPECTED_CALL(Request, From),
    {reply, {error, badarg}, State}.
584
%% @doc gen_server cast handler.
%% `{sql_cmd, Command, Timestamp}' runs the command and discards its result;
%% the server stops when abort_on_driver_error/1 asks for it.
%% Any other message is reported as unexpected.
handle_cast({sql_cmd, Command, Timestamp}, State) ->
    {Outcome, NextState} = run_sql_cmd(Command, undefined, State, Timestamp),
    case abort_on_driver_error(Outcome) of
        continue ->
            {noreply, NextState};
        {stop, Reason} ->
            {stop, Reason, NextState}
    end;
handle_cast(Request, State) ->
    ?UNEXPECTED_CAST(Request),
    {noreply, State}.
594
%% @doc gen_server code upgrade callback; the state is kept unchanged.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
597
%% @doc gen_server info handler.
%% `keepalive' runs ?KEEPALIVE_QUERY: a `{selected, _}' result schedules the
%% next keepalive and an `{error, _}' result stops the server.
%% An `{'EXIT', Pid, Reason}' message stops the server, with the whole
%% message as the stop reason.
%% Any other message is reported as unexpected.
handle_info(keepalive, #state{keepalive_interval = Interval} = State) ->
    case sql_query_internal([?KEEPALIVE_QUERY], State) of
        {error, _} = Failure ->
            {stop, {keepalive_failed, Failure}, State};
        {selected, _} ->
            schedule_keepalive(Interval),
            {noreply, State}
    end;
handle_info({'EXIT', _Pid, _Why} = ExitMsg, State) ->
    {stop, ExitMsg, State};
handle_info(Info, State) ->
    ?UNEXPECTED_INFO(Info),
    {noreply, State}.
611
%% @doc gen_server terminate callback: disconnect from the database.
%% The call is wrapped in `catch', so a failing disconnect does not
%% propagate out of terminate/2.
-spec terminate(Reason :: term(), state()) -> any().
terminate(_Reason, #state{db_ref = Connection}) ->
    catch mongoose_rdbms_backend:disconnect(Connection).
615
616 %%%----------------------------------------------------------------------
617 %%% Internal functions
618 %%%----------------------------------------------------------------------
619
%% @doc Run Command unless it waited too long before being picked up.
%% Timestamp is the monotonic time in milliseconds at which the request was
%% sent. A request younger than ?TRANSACTION_TIMEOUT is run with outer_op/2.
%% An older one is logged and discarded with an `{error, timeout}' result.
%% Always returns a `{Result, State}' pair, which is the shape both
%% handle_call/3 and handle_cast/2 match on.
-spec run_sql_cmd(Command :: any(), From :: any(), State :: state(), Timestamp :: integer()) ->
          {Result :: term(), state()}.
run_sql_cmd(Command, _From, State, Timestamp) ->
    Now = erlang:monotonic_time(millisecond),
    case Now - Timestamp of
        Age when Age < ?TRANSACTION_TIMEOUT ->
            outer_op(Command, State);
        Age ->
            ?LOG_ERROR(#{what => rdbms_db_not_available_or_too_slow,
                         text => <<"Discarding request">>, age => Age, command => Command}),
            %% Not a gen_server-style `{reply, _, _}' triple: the callers
            %% destructure a 2-tuple and would crash with badmatch on it.
            {{error, timeout}, State}
    end.
632
%% @doc Handle a top-level operation; reached through run_sql_cmd/4.
%% Dispatches on the command: a plain query, a transaction (retried up to
%% ?MAX_TRANSACTION_RESTARTS times), a dirty fun, or a prepared statement.
-spec outer_op(rdbms_msg(), state()) -> {query_result()
                                         | transaction_result()
                                         | dirty_result(), state()}.
outer_op(Command, State) ->
    case Command of
        {sql_query, Query} ->
            {sql_query_internal(Query, State), State};
        {sql_transaction, F} ->
            outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "", State);
        {sql_dirty, F} ->
            sql_dirty_internal(F, State);
        {sql_execute, Name, Params} ->
            sql_execute(Name, Params, State)
    end.
645
%% @doc Called via sql_query/sql_transaction/execute from client code when
%% already inside a nested operation, i.e. when a state is stored in the
%% process dictionary.
%% Always returns a `{Result, State}' pair: sql_call/2, sql_request/2 and
%% sql_cast/2 destructure the return value that way and store the state back.
-spec nested_op(rdbms_msg(), state()) -> {any(), state()}.
nested_op({sql_query, Query}, State) ->
    %% XXX - use sql_query_t here instead? Most likely would break
    %% callers who expect {error, _} tuples (sql_query_t turns
    %% these into throws)
    {sql_query_internal(Query, State), State};
nested_op({sql_transaction, F}, State) ->
    %% Transaction inside a transaction.
    %% inner_transaction/2 returns only the transaction result, so it is
    %% paired with a state here. The state is re-read from the process
    %% dictionary because operations run by F may have replaced it.
    Result = inner_transaction(F, State),
    {Result, get_state()};
nested_op({sql_execute, Name, Params}, State) ->
    sql_execute(Name, Params, State).
659
%% @doc Run a nested transaction fun exactly once.
%% Never retry nested transactions - only outer transactions.
%% An `{aborted, _}', `{'EXIT', _}' or `{atomic, _}' outcome of `catch F()'
%% is returned as is; any other value Res becomes `{atomic, Res}'.
-spec inner_transaction(fun(), state()) -> transaction_result() | {'EXIT', any()}.
inner_transaction(F, _State) ->
    case catch F() of
        {Tag, _} = Tagged when Tag =:= aborted; Tag =:= 'EXIT'; Tag =:= atomic ->
            Tagged;
        Plain ->
            {atomic, Plain}
    end.
673
%% @doc Run F as a top-level SQL transaction.
%% Issues the begin-transaction query, stores State in the process
%% dictionary while F runs, and erases it again afterwards. The outcome is
%% then handled by finish_outer_transaction/5.
-spec outer_transaction(F :: fun(),
                        NRestarts :: 0..10,
                        Reason :: any(), state()) -> {transaction_result(), state()}.
outer_transaction(F, NRestarts, _Reason, State) ->
    sql_query_internal(rdbms_queries:begin_trans(), State),
    put_state(State),
    {Outcome, StackTrace} = apply_transaction_function(F),
    FinalState = erase_state(),
    finish_outer_transaction(Outcome, StackTrace, F, NRestarts, FinalState).

%% Roll back and retry, roll back and give up, or commit, depending on the
%% outcome of the transaction fun and the number of restarts left.
finish_outer_transaction({aborted, Reason}, _StackTrace, F, NRestarts, State)
  when NRestarts > 0 ->
    %% Retry the outer transaction up to NRestarts times.
    sql_query_internal([<<"rollback;">>], State),
    outer_transaction(F, NRestarts - 1, Reason, State);
finish_outer_transaction({aborted, #{reason := Reason, sql_query := SqlQuery}},
                         StackTrace, _F, NRestarts, State)
  when NRestarts =:= 0 ->
    %% Too many retries of the outer transaction.
    ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
                 restarts => ?MAX_TRANSACTION_RESTARTS, last_abort_reason => Reason,
                 last_sql_query => iolist_to_binary(SqlQuery),
                 stacktrace => StackTrace, state => State}),
    sql_query_internal([<<"rollback;">>], State),
    {{aborted, Reason}, State};
finish_outer_transaction({aborted, Reason}, StackTrace, _F, NRestarts, State)
  when NRestarts =:= 0 ->
    %% Old format for abort; too many retries of the outer transaction.
    ?LOG_ERROR(#{what => rdbms_sql_transaction_restarts_exceeded,
                 restarts => ?MAX_TRANSACTION_RESTARTS,
                 last_abort_reason => Reason, stacktrace => StackTrace,
                 state => State}),
    sql_query_internal([<<"rollback;">>], State),
    {{aborted, Reason}, State};
finish_outer_transaction({'EXIT', Reason}, _StackTrace, _F, _NRestarts, State) ->
    %% Abort the SQL transaction on EXIT from the outer transaction only.
    sql_query_internal([<<"rollback;">>], State),
    {{aborted, Reason}, State};
finish_outer_transaction(Res, _StackTrace, _F, _NRestarts, State) ->
    %% Commit a successful outer transaction.
    sql_query_internal([<<"commit;">>], State),
    {{atomic, Res}, State}.
713
%% @doc Call F and pair its outcome with a stacktrace.
%% A normal return gives `{Value, []}'. A throw gives the thrown term with
%% its stacktrace. Any other exception is logged and reported as
%% `{{'EXIT', Reason}, StackTrace}'.
-spec apply_transaction_function(F :: fun()) -> {any(), list()}.
apply_transaction_function(F) ->
    try F() of
        Value ->
            {Value, []}
    catch
        throw:Thrown:StackTrace ->
            {Thrown, StackTrace};
        Class:Reason:StackTrace ->
            ?LOG_ERROR(#{what => rdbms_outer_transaction_failed, class => Class,
                         reason => Reason, stacktrace => StackTrace}),
            {{'EXIT', Reason}, StackTrace}
    end.
726
%% @doc Run Query on the connection held in the state, with ?QUERY_TIMEOUT.
%% The "No SQL-driver information available." error is mapped to
%% `{updated, 0}' as a workaround for an odbc bug.
sql_query_internal(Query, #state{db_ref = Connection}) ->
    Reply = mongoose_rdbms_backend:query(Connection, Query, ?QUERY_TIMEOUT),
    case Reply of
        {error, "No SQL-driver information available."} ->
            {updated, 0}; %% workaround for odbc bug
        _ ->
            Reply
    end.
734
%% @doc Run F with State stored in the process dictionary.
%% Returns `{{ok, Value}, State1}' on a normal return and
%% `{{error, Reason}, State1}' on any exception, where State1 is the state
%% erased from the process dictionary afterwards.
sql_dirty_internal(F, State) ->
    put_state(State),
    Outcome =
        try
            {ok, F()}
        catch
            _Class:Reason ->
                {error, Reason}
        end,
    {Outcome, erase_state()}.
746
%% @doc Execute the prepared statement Name with Params.
%% The statement is prepared on this connection first if needed. Any
%% exception raised by the backend is logged and re-raised with its original
%% class and stacktrace.
-spec sql_execute(Name :: atom(), Params :: [term()], state()) -> {query_result(), state()}.
sql_execute(Name, Params, State = #state{db_ref = Connection}) ->
    {StatementRef, NextState} = prepare_statement(Name, State),
    Result =
        try
            mongoose_rdbms_backend:execute(Connection, StatementRef, Params, ?QUERY_TIMEOUT)
        catch
            Class:Reason:StackTrace ->
                ?LOG_ERROR(#{what => rdbms_sql_execute_failed, statement_name => Name,
                             class => Class, reason => Reason, params => Params,
                             stacktrace => StackTrace}),
                erlang:raise(Class, Reason, StackTrace)
        end,
    {Result, NextState}.
758
%% @doc Return the backend reference for the statement Name.
%% A reference already present in the state's `prepared' map is reused.
%% Otherwise the statement is looked up, prepared on the connection, and the
%% new reference is stored in the returned state.
-spec prepare_statement(Name :: atom(), state()) -> {Ref :: term(), state()}.
prepare_statement(Name, State = #state{db_ref = Connection, prepared = Cache}) ->
    case maps:get(Name, Cache, undefined) of
        Known when Known =/= undefined ->
            {Known, State};
        undefined ->
            {_Name, Table, Fields, Statement} = lookup_statement(Name),
            {ok, Fresh} = mongoose_rdbms_backend:prepare(Connection, Name, Table,
                                                         Fields, Statement),
            {Fresh, State#state{prepared = maps:put(Name, Fresh, Cache)}}
    end.
770
%% @doc Fetch the record stored for Name in the `prepared_statements' ETS
%% table; raises `{lookup_statement_failed, Name}' when there is none.
lookup_statement(Name) ->
    Found = ets:lookup(prepared_statements, Name),
    case Found of
        [] -> error({lookup_statement_failed, Name});
        [Record] -> Record
    end.
776
%% @doc Decide whether a query result means the worker should stop.
%% Two mysql driver errors stop it: "query timed out" with reason `timeout'
%% and "Failed sending data on socket..." with reason `closed'. Every other
%% result gives `continue'.
-spec abort_on_driver_error(_) -> continue | {stop, timeout | closed}.
abort_on_driver_error(Result) ->
    case Result of
        {error, "query timed out"} -> %% mysql driver error
            {stop, timeout};
        {error, "Failed sending data on socket" ++ _Rest} -> %% mysql driver error
            {stop, closed};
        _ ->
            continue
    end.
784
%% @doc Return the backend name registered globally for this module, or
%% `undefined' when the lookup fails with `badarg'. The host type argument
%% is not used.
-spec db_engine(HostType :: server()) -> odbc | mysql | pgsql | undefined.
db_engine(_HostType) ->
    try
        mongoose_backend:get_backend_name(global, ?MODULE)
    catch
        error:badarg ->
            undefined
    end.
789
%% @doc Used to optimise queries for different databases.
%% Returns `mssql' when the `rdbms_server_type' option is `mssql', and
%% `generic' for any other value.
%% @todo Should be refactored to use host types with this module.
%% Also, this parameter should not be global, but pool-name parameterized
-spec db_type() -> mssql | generic.
db_type() ->
    ServerType = mongoose_config:get_opt(rdbms_server_type),
    case ServerType =:= mssql of
        true -> mssql;
        false -> generic
    end.
799
%% @doc Connect to the database, retrying on failure.
%% Retry is the number of retries left. RetryAfter is the upper bound, in
%% seconds, of the random sleep before the next attempt. MaxRetryDelay caps
%% that bound.
%% The bound doubles after each failed attempt, so starting from 2 with a
%% cap of 30 it goes 2, 4, 8, 16, 30.
%% Returns `{ok, DbRef}' on success, or the last backend error once the
%% retries run out.
-spec connect(Settings :: tuple(), Retry :: non_neg_integer(), RetryAfter :: non_neg_integer(),
              MaxRetryDelay :: non_neg_integer()) -> {ok, term()} | {error, any()}.
connect(Settings, Retry, RetryAfter, MaxRetryDelay) ->
    case mongoose_rdbms_backend:connect(Settings, ?QUERY_TIMEOUT) of
        {ok, _} = Ok ->
            Ok;
        Error when Retry =:= 0 ->
            Error;
        Error ->
            %% Randomised sleep of 1..RetryAfter seconds.
            SleepFor = rand:uniform(RetryAfter),
            Backend = mongoose_backend:get_backend_name(global, ?MODULE),
            ?LOG_ERROR(#{what => rdbms_connection_attempt_error, backend => Backend,
                         error => Error, sleep_for => SleepFor}),
            timer:sleep(timer:seconds(SleepFor)),
            %% Exponential backoff: double the bound. Squaring it would skip
            %% from 4 straight to 16 and hit the cap one attempt early.
            NextRetryDelay = RetryAfter * 2,
            connect(Settings, Retry - 1, min(MaxRetryDelay, NextRetryDelay), MaxRetryDelay)
    end.
817
818
%% @doc Schedule the next `keepalive' message to this process.
%% An integer interval is given in seconds. `undefined' schedules nothing;
%% any other value is logged as an error and also schedules nothing.
-spec schedule_keepalive(KeepaliveInterval :: undefined | pos_integer()) -> any().
schedule_keepalive(Seconds) when is_integer(Seconds) ->
    erlang:send_after(timer:seconds(Seconds), self(), keepalive);
schedule_keepalive(undefined) ->
    ok;
schedule_keepalive(Other) ->
    ?LOG_ERROR(#{what => rdbms_wrong_keepalive_interval, reason => Other}),
    ok.
830
831 %% ----- process state access, for convenient tracing
832
%% @doc Store the state in the process dictionary under ?STATE_KEY.
%% Returns whatever `put/2' returns, i.e. the previously stored value.
put_state(NewState) ->
    put(?STATE_KEY, NewState).
835
%% @doc Remove the state stored under ?STATE_KEY from the process dictionary
%% and return it (`undefined' when nothing was stored).
erase_state() ->
    erase(?STATE_KEY).
838
%% @doc Read the state stored under ?STATE_KEY in the process dictionary
%% (`undefined' when nothing is stored).
get_state() ->
    get(?STATE_KEY).
Line Hits Source