1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : rdbms_queries.erl |
3 |
|
%%% Author : Mickael Remond <mremond@process-one.net> |
4 |
|
%%% Purpose : RDBMS queries dependind on back-end |
5 |
|
%%% Created : by Mickael Remond <mremond@process-one.net> |
6 |
|
%%% |
7 |
|
%%% |
8 |
|
%%% ejabberd, Copyright (C) 2002-2011 ProcessOne |
9 |
|
%%% |
10 |
|
%%% This program is free software; you can redistribute it and/or |
11 |
|
%%% modify it under the terms of the GNU General Public License as |
12 |
|
%%% published by the Free Software Foundation; either version 2 of the |
13 |
|
%%% License, or (at your option) any later version. |
14 |
|
%%% |
15 |
|
%%% This program is distributed in the hope that it will be useful, |
16 |
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of |
17 |
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
18 |
|
%%% General Public License for more details. |
19 |
|
%%% |
20 |
|
%%% You should have received a copy of the GNU General Public License |
21 |
|
%%% along with this program; if not, write to the Free Software |
22 |
|
%%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
23 |
|
%%% |
24 |
|
%%%---------------------------------------------------------------------- |
25 |
|
|
26 |
|
-module(rdbms_queries). |
27 |
|
-author("mremond@process-one.net"). |
28 |
|
|
29 |
|
-export([get_db_type/0, |
30 |
|
begin_trans/0, |
31 |
|
get_db_specific_limits/0, |
32 |
|
get_db_specific_limits/1, |
33 |
|
get_db_specific_limits_binaries/0, |
34 |
|
get_db_specific_limits_binaries/1, |
35 |
|
get_db_specific_offset/2, |
36 |
|
add_limit_arg/2, |
37 |
|
limit_offset_sql/0, |
38 |
|
limit_offset_args/2, |
39 |
|
sql_transaction/2, |
40 |
|
count_records_where/3, |
41 |
|
create_bulk_insert_query/3]). |
42 |
|
|
43 |
|
-export([join/2, |
44 |
|
prepare_upsert/6, |
45 |
|
prepare_upsert/7, |
46 |
|
execute_upsert/5, |
47 |
|
request_upsert/5]). |
48 |
|
|
49 |
|
-ignore_xref([ |
50 |
|
count_records_where/3, get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0 |
51 |
|
]). |
52 |
|
|
53 |
|
%% We have only two compile time options for db queries: |
54 |
|
%%-define(generic, true). |
55 |
|
%%-define(mssql, true). |
56 |
|
-ifndef(mssql). |
57 |
|
-undef(generic). |
58 |
|
-endif. |
59 |
|
|
60 |
|
-define(RDBMS_TYPE, (mongoose_rdbms:db_type())). |
61 |
|
|
62 |
|
-include("mongoose.hrl"). |
63 |
|
|
64 |
|
% |
65 |
|
%% ----------------- |
66 |
|
%% Common functions |
67 |
|
|
68 |
|
join([], _Sep) -> |
69 |
:-( |
[]; |
70 |
|
join([H|T], Sep) -> |
71 |
6072 |
[H, [[Sep, X] || X <- T]]. |
72 |
|
|
73 |
|
%% ----------------- |
74 |
|
%% Generic queries |
75 |
|
|
76 |
|
get_db_type() -> |
77 |
:-( |
?RDBMS_TYPE. |
78 |
|
|
79 |
|
-spec execute_upsert(Host :: mongoose_rdbms:server(), |
80 |
|
Name :: atom(), |
81 |
|
InsertParams :: [any()], |
82 |
|
UpdateParams :: [any()], |
83 |
|
UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result(). |
84 |
|
execute_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) -> |
85 |
1162 |
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of |
86 |
|
{mysql, _} -> |
87 |
:-( |
mongoose_rdbms:execute(Host, Name, InsertParams ++ UpdateParams); |
88 |
|
{pgsql, _} -> |
89 |
1162 |
mongoose_rdbms:execute(Host, Name, InsertParams ++ UpdateParams); |
90 |
|
{odbc, mssql} -> |
91 |
:-( |
mongoose_rdbms:execute(Host, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams); |
92 |
:-( |
NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) |
93 |
|
end. |
94 |
|
|
95 |
|
-spec request_upsert(Host :: mongoose_rdbms:server(), |
96 |
|
Name :: atom(), |
97 |
|
InsertParams :: [any()], |
98 |
|
UpdateParams :: [any()], |
99 |
|
UniqueKeyValues :: [any()]) -> reference(). |
100 |
|
request_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) -> |
101 |
337 |
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of |
102 |
|
{mysql, _} -> |
103 |
:-( |
mongoose_rdbms:execute_request(Host, Name, InsertParams ++ UpdateParams); |
104 |
|
{pgsql, _} -> |
105 |
337 |
mongoose_rdbms:execute_request(Host, Name, InsertParams ++ UpdateParams); |
106 |
|
{odbc, mssql} -> |
107 |
:-( |
mongoose_rdbms:execute_request(Host, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams); |
108 |
:-( |
NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) |
109 |
|
end. |
110 |
|
|
111 |
|
%% @doc |
112 |
|
%% This functions prepares query for inserting a row or updating it if already exists |
113 |
|
%% Updates can be either fields or literal expressions like "column = tab.column + 1" |
114 |
|
-spec prepare_upsert(Host :: mongoose_rdbms:server(), |
115 |
|
QueryName :: atom(), |
116 |
|
TableName :: atom(), |
117 |
|
InsertFields :: [binary()], |
118 |
|
Updates :: [binary() | {assignment | expression, binary(), binary()}], |
119 |
|
UniqueKeyFields :: [binary()]) -> |
120 |
|
{ok, QueryName :: atom()} | {error, already_exists}. |
121 |
|
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields) -> |
122 |
1241 |
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, none). |
123 |
|
|
124 |
|
-spec prepare_upsert(Host :: mongoose_rdbms:server(), |
125 |
|
QueryName :: atom(), |
126 |
|
TableName :: atom(), |
127 |
|
InsertFields :: [ColumnName :: binary()], |
128 |
|
Updates :: [binary() | {assignment | expression, binary(), binary()}], |
129 |
|
UniqueKeyFields :: [binary()], |
130 |
|
IncrementalField :: none | binary()) -> |
131 |
|
{ok, QueryName :: atom()} | {error, already_exists}. |
132 |
|
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> |
133 |
1340 |
SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField), |
134 |
1340 |
Query = iolist_to_binary(SQL), |
135 |
1340 |
?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}), |
136 |
1340 |
Fields = prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields), |
137 |
1340 |
mongoose_rdbms:prepare(Name, Table, Fields, Query). |
138 |
|
|
139 |
|
prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) -> |
140 |
1340 |
UpdateFields = lists:filtermap(fun get_field_name/1, Updates), |
141 |
1340 |
case mongoose_rdbms:db_type() of |
142 |
|
mssql -> |
143 |
:-( |
UniqueKeyFields ++ InsertFields ++ UpdateFields; |
144 |
1340 |
_ -> InsertFields ++ UpdateFields |
145 |
|
end. |
146 |
|
|
147 |
|
upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> |
148 |
1340 |
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of |
149 |
|
{mysql, _} -> |
150 |
:-( |
upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField); |
151 |
|
{pgsql, _} -> |
152 |
1340 |
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField); |
153 |
|
{odbc, mssql} -> |
154 |
:-( |
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields); |
155 |
:-( |
NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) |
156 |
|
end. |
157 |
|
|
158 |
|
mysql_and_pgsql_insert(Table, Columns) -> |
159 |
1340 |
JoinedFields = join(Columns, <<", ">>), |
160 |
1340 |
Placeholders = lists:duplicate(length(Columns), $?), |
161 |
1340 |
["INSERT INTO ", atom_to_binary(Table, utf8), " (", |
162 |
|
JoinedFields, |
163 |
|
") VALUES (", |
164 |
|
join(Placeholders, ", "), |
165 |
|
")"]. |
166 |
|
|
167 |
|
upsert_mysql_query(Table, InsertFields, Updates, [Key | _], IncrementalField) -> |
168 |
:-( |
Insert = mysql_and_pgsql_insert(Table, InsertFields), |
169 |
:-( |
OnConflict = mysql_on_conflict(Table, Updates, Key, IncrementalField), |
170 |
:-( |
[Insert, OnConflict]. |
171 |
|
|
172 |
|
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> |
173 |
1340 |
Insert = mysql_and_pgsql_insert(Table, InsertFields), |
174 |
1340 |
OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields), |
175 |
1340 |
WhereIncrements = pgsql_ensure_increments(Table, IncrementalField), |
176 |
1340 |
[Insert, OnConflict, WhereIncrements]. |
177 |
|
|
178 |
|
mysql_on_conflict(_Table, [], Key, _) -> |
179 |
|
%% Update field to itself (no-op), there is no 'DO NOTHING' in MySQL |
180 |
:-( |
[" ON DUPLICATE KEY UPDATE ", Key, " = ", Key]; |
181 |
|
mysql_on_conflict(_Table, UpdateFields, _, none) -> |
182 |
:-( |
[" ON DUPLICATE KEY UPDATE ", |
183 |
|
update_fields_on_conflict(UpdateFields)]; |
184 |
|
mysql_on_conflict(Table, UpdateFields, _, IncrementalField) -> |
185 |
:-( |
TableName = atom_to_list(Table), |
186 |
:-( |
FieldsWithPlaceHolders = [mysql_fields_with_placeholders(TableName, Update, IncrementalField) |
187 |
:-( |
|| Update <- UpdateFields], |
188 |
:-( |
IncrUpdates = join(FieldsWithPlaceHolders, ", "), |
189 |
:-( |
[" AS alias ON DUPLICATE KEY UPDATE ", IncrUpdates]. |
190 |
|
|
191 |
|
mysql_fields_with_placeholders(TableName, UpdateField, IncrementalField) -> |
192 |
:-( |
Alternatives = case UpdateField of |
193 |
|
{Op, Column, Expression} when Op =:= assignment; Op =:= expression -> |
194 |
:-( |
[Expression, ", ", TableName, ".", Column, ")"]; |
195 |
|
Column -> |
196 |
:-( |
["? , ", TableName, ".", Column, ")"] |
197 |
|
end, |
198 |
:-( |
[ Column, " = IF(", TableName, ".", IncrementalField, " < alias.", IncrementalField, ", " |
199 |
|
| Alternatives]. |
200 |
|
|
201 |
|
pgsql_on_conflict([], UniqueKeyFields) -> |
202 |
4 |
JoinedKeys = join(UniqueKeyFields, ", "), |
203 |
4 |
[" ON CONFLICT (", JoinedKeys, ") DO NOTHING"]; |
204 |
|
pgsql_on_conflict(UpdateFields, UniqueKeyFields) -> |
205 |
1336 |
JoinedKeys = join(UniqueKeyFields, ", "), |
206 |
1336 |
[" ON CONFLICT (", JoinedKeys, ")" |
207 |
|
" DO UPDATE SET ", |
208 |
|
update_fields_on_conflict(UpdateFields)]. |
209 |
|
|
210 |
|
update_fields_on_conflict(Updates) -> |
211 |
1336 |
FieldsWithPlaceHolders = [get_field_expression(Update) || Update <- Updates], |
212 |
1336 |
join(FieldsWithPlaceHolders, ", "). |
213 |
|
|
214 |
|
pgsql_ensure_increments(_Table, none) -> |
215 |
1241 |
[]; |
216 |
|
pgsql_ensure_increments(Table, IncrementalField) -> |
217 |
99 |
TableName = atom_to_list(Table), |
218 |
99 |
[" WHERE ", TableName, ".", IncrementalField, " < EXCLUDED.", IncrementalField]. |
219 |
|
|
220 |
|
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) -> |
221 |
:-( |
UniqueKeysInSelect = [[" ? AS ", Key] || Key <- UniqueKeyFields], |
222 |
:-( |
BinTab = atom_to_binary(Table, utf8), |
223 |
:-( |
UniqueConstraint = [[BinTab, ".", Key, " = source.", Key] || Key <- UniqueKeyFields], |
224 |
:-( |
JoinedInsertFields = join(InsertFields, ", "), |
225 |
:-( |
Placeholders = lists:duplicate(length(InsertFields), $?), |
226 |
:-( |
["MERGE INTO ", BinTab, " WITH (SERIALIZABLE)" |
227 |
|
" USING (SELECT ", join(UniqueKeysInSelect, ", "), ")" |
228 |
|
" AS source (", join(UniqueKeyFields, ", "), ")" |
229 |
|
" ON (", join(UniqueConstraint, " AND "), ")" |
230 |
|
" WHEN NOT MATCHED THEN INSERT" |
231 |
|
" (", JoinedInsertFields, ")" |
232 |
|
" VALUES (", join(Placeholders, ", "), ")" | mssql_on_conflict(Updates)]. |
233 |
|
|
234 |
:-( |
mssql_on_conflict([]) -> ";"; |
235 |
|
mssql_on_conflict(Updates) -> |
236 |
:-( |
[" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"]. |
237 |
|
|
238 |
|
get_field_expression({Op, ColumnName, Expr}) when Op =:= assignment; Op =:= expression -> |
239 |
135 |
[ColumnName, " = ", Expr]; |
240 |
|
get_field_expression(Field) when is_binary(Field) -> |
241 |
9166 |
[Field, " = ?"]. |
242 |
|
|
243 |
|
get_field_name({assignment, Field, _}) when is_binary(Field) -> |
244 |
:-( |
false; |
245 |
|
get_field_name({expression, Field, _}) when is_binary(Field) -> |
246 |
135 |
{true, Field}; |
247 |
|
get_field_name(Field) when is_binary(Field) -> |
248 |
9166 |
true. |
249 |
|
|
250 |
|
%% F can be either a fun or a list of queries |
251 |
|
%% TODO: We should probably move the list of queries transaction |
252 |
|
%% wrapper from the mongoose_rdbms module to this one (rdbms_queries) |
253 |
|
sql_transaction(LServer, F) -> |
254 |
359 |
mongoose_rdbms:sql_transaction(LServer, F). |
255 |
|
|
256 |
|
begin_trans() -> |
257 |
11748 |
begin_trans(?RDBMS_TYPE). |
258 |
|
|
259 |
|
begin_trans(mssql) -> |
260 |
:-( |
rdbms_queries_mssql:begin_trans(); |
261 |
|
begin_trans(_) -> |
262 |
11748 |
[<<"BEGIN;">>]. |
263 |
|
|
264 |
|
%% Count number of records in a table given a where clause |
265 |
|
count_records_where(LServer, Table, WhereClause) -> |
266 |
:-( |
mongoose_rdbms:sql_query( |
267 |
|
LServer, |
268 |
|
[<<"select count(*) from ">>, Table, " ", WhereClause, ";"]). |
269 |
|
|
270 |
|
-spec create_bulk_insert_query(Table :: iodata() | atom(), Fields :: [iodata() | atom()], |
271 |
|
RowsNum :: pos_integer()) -> |
272 |
|
{iodata(), [binary()]}. |
273 |
|
create_bulk_insert_query(Table, Fields, RowsNum) when is_atom(Table) -> |
274 |
222 |
create_bulk_insert_query(atom_to_binary(Table, utf8), Fields, RowsNum); |
275 |
|
create_bulk_insert_query(Table, [Field | _] = Fields, RowsNum) when is_atom(Field) -> |
276 |
222 |
create_bulk_insert_query(Table, [atom_to_binary(F, utf8) || F <- Fields], RowsNum); |
277 |
|
create_bulk_insert_query(Table, Fields, RowsNum) when RowsNum > 0 -> |
278 |
222 |
JoinedFields = join(Fields, <<", ">>), |
279 |
222 |
Placeholders = lists:duplicate(length(Fields), <<"?">>), |
280 |
222 |
PlaceholderSet = [<<"(">>, join(Placeholders, <<", ">>), <<")">>], |
281 |
222 |
PlaceholderSets = lists:duplicate(RowsNum, PlaceholderSet), |
282 |
222 |
JoinedPlaceholderSets = join(PlaceholderSets, <<", ">>), |
283 |
222 |
Sql = [<<"INSERT INTO ">>, Table, <<" (">>, JoinedFields, <<") " |
284 |
|
"VALUES ">>, JoinedPlaceholderSets, <<";">>], |
285 |
222 |
Fields2 = lists:append(lists:duplicate(RowsNum, Fields)), |
286 |
222 |
{Sql, Fields2}. |
287 |
|
|
288 |
|
get_db_specific_limits() -> |
289 |
591 |
do_get_db_specific_limits(?RDBMS_TYPE, "?", true). |
290 |
|
|
291 |
|
get_db_specific_limits_binaries() -> |
292 |
559 |
{LimitSQL, LimitMSSQL} = get_db_specific_limits(), |
293 |
559 |
{list_to_binary(LimitSQL), list_to_binary(LimitMSSQL)}. |
294 |
|
|
295 |
|
-spec get_db_specific_limits(integer()) |
296 |
|
-> {SQL :: nonempty_string(), []} | {[], MSSQL::nonempty_string()}. |
297 |
|
get_db_specific_limits(Limit) -> |
298 |
100 |
LimitStr = integer_to_list(Limit), |
299 |
100 |
do_get_db_specific_limits(?RDBMS_TYPE, LimitStr, false). |
300 |
|
|
301 |
|
-spec get_db_specific_offset(integer(), integer()) -> iolist(). |
302 |
|
get_db_specific_offset(Offset, Limit) -> |
303 |
:-( |
do_get_db_specific_offset(?RDBMS_TYPE, integer_to_list(Offset), integer_to_list(Limit)). |
304 |
|
|
305 |
|
|
306 |
|
%% Arguments: |
307 |
|
%% - Type (atom) - database type |
308 |
|
%% - LimitStr (string) - a field value |
309 |
|
%% - Wrap (boolean) - add parentheses around a field for MSSQL |
310 |
|
do_get_db_specific_limits(mssql, LimitStr, _Wrap = false) -> |
311 |
:-( |
{"", "TOP " ++ LimitStr}; |
312 |
|
do_get_db_specific_limits(mssql, LimitStr, _Wrap = true) -> |
313 |
:-( |
{"", "TOP (" ++ LimitStr ++ ")"}; |
314 |
|
do_get_db_specific_limits(_, LimitStr, _Wrap) -> |
315 |
691 |
{"LIMIT " ++ LimitStr, ""}. |
316 |
|
|
317 |
|
do_get_db_specific_offset(mssql, Offset, Limit) -> |
318 |
:-( |
[" OFFSET ", Offset, " ROWS" |
319 |
|
" FETCH NEXT ", Limit, " ROWS ONLY"]; |
320 |
|
do_get_db_specific_offset(_, Offset, _Limit) -> |
321 |
:-( |
[" OFFSET ", Offset]. |
322 |
|
|
323 |
|
add_limit_arg(Limit, Args) -> |
324 |
1879 |
add_limit_arg(?RDBMS_TYPE, Limit, Args). |
325 |
|
|
326 |
|
add_limit_arg(mssql, Limit, Args) -> |
327 |
:-( |
[Limit|Args]; |
328 |
|
add_limit_arg(_, Limit, Args) -> |
329 |
1879 |
Args ++ [Limit]. |
330 |
|
|
331 |
|
get_db_specific_limits_binaries(Limit) -> |
332 |
100 |
{LimitSQL, LimitMSSQL} = get_db_specific_limits(Limit), |
333 |
100 |
{list_to_binary(LimitSQL), list_to_binary(LimitMSSQL)}. |
334 |
|
|
335 |
|
limit_offset_sql() -> |
336 |
219 |
limit_offset_sql(?RDBMS_TYPE). |
337 |
|
|
338 |
|
limit_offset_sql(mssql) -> |
339 |
:-( |
<<" OFFSET (?) ROWS FETCH NEXT (?) ROWS ONLY">>; |
340 |
|
limit_offset_sql(_) -> |
341 |
219 |
<<" LIMIT ? OFFSET ?">>. |
342 |
|
|
343 |
|
limit_offset_args(Limit, Offset) -> |
344 |
69 |
limit_offset_args(?RDBMS_TYPE, Limit, Offset). |
345 |
|
|
346 |
:-( |
limit_offset_args(mssql, Limit, Offset) -> [Offset, Limit]; |
347 |
69 |
limit_offset_args(_, Limit, Offset) -> [Limit, Offset]. |