1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : rdbms_queries.erl |
3 |
|
%%% Author : Mickael Remond <mremond@process-one.net> |
4 |
|
%%% Purpose : RDBMS queries dependind on back-end |
5 |
|
%%% Created : by Mickael Remond <mremond@process-one.net> |
6 |
|
%%% |
7 |
|
%%% |
8 |
|
%%% ejabberd, Copyright (C) 2002-2011 ProcessOne |
9 |
|
%%% |
10 |
|
%%% This program is free software; you can redistribute it and/or |
11 |
|
%%% modify it under the terms of the GNU General Public License as |
12 |
|
%%% published by the Free Software Foundation; either version 2 of the |
13 |
|
%%% License, or (at your option) any later version. |
14 |
|
%%% |
15 |
|
%%% This program is distributed in the hope that it will be useful, |
16 |
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of |
17 |
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
18 |
|
%%% General Public License for more details. |
19 |
|
%%% |
20 |
|
%%% You should have received a copy of the GNU General Public License |
21 |
|
%%% along with this program; if not, write to the Free Software |
22 |
|
%%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
23 |
|
%%% |
24 |
|
%%%---------------------------------------------------------------------- |
25 |
|
|
26 |
|
-module(rdbms_queries). |
27 |
|
-author("mremond@process-one.net"). |
28 |
|
|
29 |
|
-export([get_db_type/0, |
30 |
|
begin_trans/0, |
31 |
|
get_db_specific_limits/0, |
32 |
|
get_db_specific_limits/1, |
33 |
|
get_db_specific_limits_binaries/0, |
34 |
|
get_db_specific_limits_binaries/1, |
35 |
|
get_db_specific_offset/2, |
36 |
|
add_limit_arg/2, |
37 |
|
limit_offset_sql/0, |
38 |
|
limit_offset_args/2, |
39 |
|
sql_transaction/2, |
40 |
|
count_records_where/3, |
41 |
|
create_bulk_insert_query/3]). |
42 |
|
|
43 |
|
-export([join/2, |
44 |
|
prepare_upsert/6, |
45 |
|
prepare_upsert/7, |
46 |
|
execute_upsert/5, |
47 |
|
request_upsert/5]). |
48 |
|
|
49 |
|
-ignore_xref([ |
50 |
|
count_records_where/3, get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0 |
51 |
|
]). |
52 |
|
|
53 |
|
%% We have only two compile time options for db queries: |
54 |
|
%%-define(generic, true). |
55 |
|
%%-define(mssql, true). |
56 |
|
-ifndef(mssql). |
57 |
|
-undef(generic). |
58 |
|
-endif. |
59 |
|
|
60 |
|
-define(RDBMS_TYPE, (mongoose_rdbms:db_type())). |
61 |
|
|
62 |
|
-include("mongoose.hrl"). |
63 |
|
|
64 |
|
-ifdef(gen_server_request_id). |
65 |
|
-type request_id() :: gen_server:request_id(). |
66 |
|
-else. |
67 |
|
-type request_id() :: term(). |
68 |
|
-endif. |
69 |
|
|
70 |
|
% |
71 |
|
%% ----------------- |
72 |
|
%% Common functions |
73 |
|
|
74 |
|
join([], _Sep) -> |
75 |
:-( |
[]; |
76 |
|
join([H|T], Sep) -> |
77 |
3497 |
[H, [[Sep, X] || X <- T]]. |
78 |
|
|
79 |
|
%% ----------------- |
80 |
|
%% Generic queries |
81 |
|
|
82 |
|
get_db_type() -> |
83 |
:-( |
?RDBMS_TYPE. |
84 |
|
|
85 |
|
-spec execute_upsert(Host :: mongoose_rdbms:server(), |
86 |
|
Name :: atom(), |
87 |
|
InsertParams :: [any()], |
88 |
|
UpdateParams :: [any()], |
89 |
|
UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result(). |
90 |
|
execute_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) -> |
91 |
1672 |
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of |
92 |
|
{mysql, _} -> |
93 |
1672 |
mongoose_rdbms:execute(Host, Name, InsertParams ++ UpdateParams); |
94 |
|
{pgsql, _} -> |
95 |
:-( |
mongoose_rdbms:execute(Host, Name, InsertParams ++ UpdateParams); |
96 |
|
{odbc, mssql} -> |
97 |
:-( |
mongoose_rdbms:execute(Host, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams); |
98 |
:-( |
NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) |
99 |
|
end. |
100 |
|
|
101 |
|
-spec request_upsert(Host :: mongoose_rdbms:server(), |
102 |
|
Name :: atom(), |
103 |
|
InsertParams :: [any()], |
104 |
|
UpdateParams :: [any()], |
105 |
|
UniqueKeyValues :: [any()]) -> request_id(). |
106 |
|
request_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) -> |
107 |
475 |
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of |
108 |
|
{mysql, _} -> |
109 |
475 |
mongoose_rdbms:execute_request(Host, Name, InsertParams ++ UpdateParams); |
110 |
|
{pgsql, _} -> |
111 |
:-( |
mongoose_rdbms:execute_request(Host, Name, InsertParams ++ UpdateParams); |
112 |
|
{odbc, mssql} -> |
113 |
:-( |
mongoose_rdbms:execute_request(Host, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams); |
114 |
:-( |
NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) |
115 |
|
end. |
116 |
|
|
117 |
|
%% @doc |
118 |
|
%% This functions prepares query for inserting a row or updating it if already exists |
119 |
|
%% Updates can be either fields or literal expressions like "column = tab.column + 1" |
120 |
|
-spec prepare_upsert(Host :: mongoose_rdbms:server(), |
121 |
|
QueryName :: atom(), |
122 |
|
TableName :: atom(), |
123 |
|
InsertFields :: [binary()], |
124 |
|
Updates :: [binary() | {assignment | expression, binary(), binary()}], |
125 |
|
UniqueKeyFields :: [binary()]) -> |
126 |
|
{ok, QueryName :: atom()} | {error, already_exists}. |
127 |
|
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields) -> |
128 |
760 |
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, none). |
129 |
|
|
130 |
|
-spec prepare_upsert(Host :: mongoose_rdbms:server(), |
131 |
|
QueryName :: atom(), |
132 |
|
TableName :: atom(), |
133 |
|
InsertFields :: [ColumnName :: binary()], |
134 |
|
Updates :: [binary() | {assignment | expression, binary(), binary()}], |
135 |
|
UniqueKeyFields :: [binary()], |
136 |
|
IncrementalField :: none | binary()) -> |
137 |
|
{ok, QueryName :: atom()} | {error, already_exists}. |
138 |
|
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> |
139 |
865 |
SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField), |
140 |
865 |
Query = iolist_to_binary(SQL), |
141 |
865 |
?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}), |
142 |
865 |
Fields = prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields), |
143 |
865 |
mongoose_rdbms:prepare(Name, Table, Fields, Query). |
144 |
|
|
145 |
|
prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) -> |
146 |
865 |
UpdateFields = lists:filtermap(fun get_field_name/1, Updates), |
147 |
865 |
case mongoose_rdbms:db_type() of |
148 |
|
mssql -> |
149 |
:-( |
UniqueKeyFields ++ InsertFields ++ UpdateFields; |
150 |
865 |
_ -> InsertFields ++ UpdateFields |
151 |
|
end. |
152 |
|
|
153 |
|
upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> |
154 |
865 |
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of |
155 |
|
{mysql, _} -> |
156 |
865 |
upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField); |
157 |
|
{pgsql, _} -> |
158 |
:-( |
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField); |
159 |
|
{odbc, mssql} -> |
160 |
:-( |
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields); |
161 |
:-( |
NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) |
162 |
|
end. |
163 |
|
|
164 |
|
mysql_and_pgsql_insert(Table, Columns) -> |
165 |
865 |
JoinedFields = join(Columns, <<", ">>), |
166 |
865 |
Placeholders = lists:duplicate(length(Columns), $?), |
167 |
865 |
["INSERT INTO ", atom_to_binary(Table, utf8), " (", |
168 |
|
JoinedFields, |
169 |
|
") VALUES (", |
170 |
|
join(Placeholders, ", "), |
171 |
|
")"]. |
172 |
|
|
173 |
|
upsert_mysql_query(Table, InsertFields, Updates, [Key | _], IncrementalField) -> |
174 |
865 |
Insert = mysql_and_pgsql_insert(Table, InsertFields), |
175 |
865 |
OnConflict = mysql_on_conflict(Table, Updates, Key, IncrementalField), |
176 |
865 |
[Insert, OnConflict]. |
177 |
|
|
178 |
|
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) -> |
179 |
:-( |
Insert = mysql_and_pgsql_insert(Table, InsertFields), |
180 |
:-( |
OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields), |
181 |
:-( |
WhereIncrements = pgsql_ensure_increments(Table, IncrementalField), |
182 |
:-( |
[Insert, OnConflict, WhereIncrements]. |
183 |
|
|
184 |
|
mysql_on_conflict(_Table, [], Key, _) -> |
185 |
|
%% Update field to itself (no-op), there is no 'DO NOTHING' in MySQL |
186 |
7 |
[" ON DUPLICATE KEY UPDATE ", Key, " = ", Key]; |
187 |
|
mysql_on_conflict(_Table, UpdateFields, _, none) -> |
188 |
753 |
[" ON DUPLICATE KEY UPDATE ", |
189 |
|
update_fields_on_conflict(UpdateFields)]; |
190 |
|
mysql_on_conflict(Table, UpdateFields, _, IncrementalField) -> |
191 |
105 |
TableName = atom_to_list(Table), |
192 |
105 |
FieldsWithPlaceHolders = [mysql_fields_with_placeholders(TableName, Update, IncrementalField) |
193 |
105 |
|| Update <- UpdateFields], |
194 |
105 |
IncrUpdates = join(FieldsWithPlaceHolders, ", "), |
195 |
105 |
[" AS alias ON DUPLICATE KEY UPDATE ", IncrUpdates]. |
196 |
|
|
197 |
|
mysql_fields_with_placeholders(TableName, UpdateField, IncrementalField) -> |
198 |
497 |
Alternatives = case UpdateField of |
199 |
|
{Op, Column, Expression} when Op =:= assignment; Op =:= expression -> |
200 |
144 |
[Expression, ", ", TableName, ".", Column, ")"]; |
201 |
|
Column -> |
202 |
353 |
["? , ", TableName, ".", Column, ")"] |
203 |
|
end, |
204 |
497 |
[ Column, " = IF(", TableName, ".", IncrementalField, " < alias.", IncrementalField, ", " |
205 |
|
| Alternatives]. |
206 |
|
|
207 |
|
pgsql_on_conflict([], UniqueKeyFields) -> |
208 |
:-( |
JoinedKeys = join(UniqueKeyFields, ", "), |
209 |
:-( |
[" ON CONFLICT (", JoinedKeys, ") DO NOTHING"]; |
210 |
|
pgsql_on_conflict(UpdateFields, UniqueKeyFields) -> |
211 |
:-( |
JoinedKeys = join(UniqueKeyFields, ", "), |
212 |
:-( |
[" ON CONFLICT (", JoinedKeys, ")" |
213 |
|
" DO UPDATE SET ", |
214 |
|
update_fields_on_conflict(UpdateFields)]. |
215 |
|
|
216 |
|
update_fields_on_conflict(Updates) -> |
217 |
753 |
FieldsWithPlaceHolders = [get_field_expression(Update) || Update <- Updates], |
218 |
753 |
join(FieldsWithPlaceHolders, ", "). |
219 |
|
|
220 |
|
pgsql_ensure_increments(_Table, none) -> |
221 |
:-( |
[]; |
222 |
|
pgsql_ensure_increments(Table, IncrementalField) -> |
223 |
:-( |
TableName = atom_to_list(Table), |
224 |
:-( |
[" WHERE ", TableName, ".", IncrementalField, " < EXCLUDED.", IncrementalField]. |
225 |
|
|
226 |
|
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) -> |
227 |
:-( |
UniqueKeysInSelect = [[" ? AS ", Key] || Key <- UniqueKeyFields], |
228 |
:-( |
BinTab = atom_to_binary(Table, utf8), |
229 |
:-( |
UniqueConstraint = [[BinTab, ".", Key, " = source.", Key] || Key <- UniqueKeyFields], |
230 |
:-( |
JoinedInsertFields = join(InsertFields, ", "), |
231 |
:-( |
Placeholders = lists:duplicate(length(InsertFields), $?), |
232 |
:-( |
["MERGE INTO ", BinTab, " WITH (SERIALIZABLE)" |
233 |
|
" USING (SELECT ", join(UniqueKeysInSelect, ", "), ")" |
234 |
|
" AS source (", join(UniqueKeyFields, ", "), ")" |
235 |
|
" ON (", join(UniqueConstraint, " AND "), ")" |
236 |
|
" WHEN NOT MATCHED THEN INSERT" |
237 |
|
" (", JoinedInsertFields, ")" |
238 |
|
" VALUES (", join(Placeholders, ", "), ")" | mssql_on_conflict(Updates)]. |
239 |
|
|
240 |
:-( |
mssql_on_conflict([]) -> ";"; |
241 |
|
mssql_on_conflict(Updates) -> |
242 |
:-( |
[" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"]. |
243 |
|
|
244 |
|
get_field_expression({Op, ColumnName, Expr}) when Op =:= assignment; Op =:= expression -> |
245 |
:-( |
[ColumnName, " = ", Expr]; |
246 |
|
get_field_expression(Field) when is_binary(Field) -> |
247 |
5214 |
[Field, " = ?"]. |
248 |
|
|
249 |
|
get_field_name({assignment, Field, _}) when is_binary(Field) -> |
250 |
:-( |
false; |
251 |
|
get_field_name({expression, Field, _}) when is_binary(Field) -> |
252 |
144 |
{true, Field}; |
253 |
|
get_field_name(Field) when is_binary(Field) -> |
254 |
5567 |
true. |
255 |
|
|
256 |
|
%% F can be either a fun or a list of queries |
257 |
|
%% TODO: We should probably move the list of queries transaction |
258 |
|
%% wrapper from the mongoose_rdbms module to this one (rdbms_queries) |
259 |
|
sql_transaction(LServer, F) -> |
260 |
456 |
mongoose_rdbms:sql_transaction(LServer, F). |
261 |
|
|
262 |
|
begin_trans() -> |
263 |
16377 |
begin_trans(?RDBMS_TYPE). |
264 |
|
|
265 |
|
begin_trans(mssql) -> |
266 |
:-( |
rdbms_queries_mssql:begin_trans(); |
267 |
|
begin_trans(_) -> |
268 |
16377 |
[<<"BEGIN;">>]. |
269 |
|
|
270 |
|
%% Count number of records in a table given a where clause |
271 |
|
count_records_where(LServer, Table, WhereClause) -> |
272 |
:-( |
mongoose_rdbms:sql_query( |
273 |
|
LServer, |
274 |
|
[<<"select count(*) from ">>, Table, " ", WhereClause, ";"]). |
275 |
|
|
276 |
|
-spec create_bulk_insert_query(Table :: iodata() | atom(), Fields :: [iodata() | atom()], |
277 |
|
RowsNum :: pos_integer()) -> |
278 |
|
{iodata(), [binary()]}. |
279 |
|
create_bulk_insert_query(Table, Fields, RowsNum) when is_atom(Table) -> |
280 |
285 |
create_bulk_insert_query(atom_to_binary(Table, utf8), Fields, RowsNum); |
281 |
|
create_bulk_insert_query(Table, [Field | _] = Fields, RowsNum) when is_atom(Field) -> |
282 |
285 |
create_bulk_insert_query(Table, [atom_to_binary(F, utf8) || F <- Fields], RowsNum); |
283 |
|
create_bulk_insert_query(Table, Fields, RowsNum) when RowsNum > 0 -> |
284 |
285 |
JoinedFields = join(Fields, <<", ">>), |
285 |
285 |
Placeholders = lists:duplicate(length(Fields), <<"?">>), |
286 |
285 |
PlaceholderSet = [<<"(">>, join(Placeholders, <<", ">>), <<")">>], |
287 |
285 |
PlaceholderSets = lists:duplicate(RowsNum, PlaceholderSet), |
288 |
285 |
JoinedPlaceholderSets = join(PlaceholderSets, <<", ">>), |
289 |
285 |
Sql = [<<"INSERT INTO ">>, Table, <<" (">>, JoinedFields, <<") " |
290 |
|
"VALUES ">>, JoinedPlaceholderSets, <<";">>], |
291 |
285 |
Fields2 = lists:append(lists:duplicate(RowsNum, Fields)), |
292 |
285 |
{Sql, Fields2}. |
293 |
|
|
294 |
|
get_db_specific_limits() -> |
295 |
228 |
do_get_db_specific_limits(?RDBMS_TYPE, "?", true). |
296 |
|
|
297 |
|
get_db_specific_limits_binaries() -> |
298 |
191 |
{LimitSQL, LimitMSSQL} = get_db_specific_limits(), |
299 |
191 |
{list_to_binary(LimitSQL), list_to_binary(LimitMSSQL)}. |
300 |
|
|
301 |
|
-spec get_db_specific_limits(integer()) |
302 |
|
-> {SQL :: nonempty_string(), []} | {[], MSSQL::nonempty_string()}. |
303 |
|
get_db_specific_limits(Limit) -> |
304 |
148 |
LimitStr = integer_to_list(Limit), |
305 |
148 |
do_get_db_specific_limits(?RDBMS_TYPE, LimitStr, false). |
306 |
|
|
307 |
|
-spec get_db_specific_offset(integer(), integer()) -> iolist(). |
308 |
|
get_db_specific_offset(Offset, Limit) -> |
309 |
:-( |
do_get_db_specific_offset(?RDBMS_TYPE, integer_to_list(Offset), integer_to_list(Limit)). |
310 |
|
|
311 |
|
|
312 |
|
%% Arguments: |
313 |
|
%% - Type (atom) - database type |
314 |
|
%% - LimitStr (string) - a field value |
315 |
|
%% - Wrap (boolean) - add parentheses around a field for MSSQL |
316 |
|
do_get_db_specific_limits(mssql, LimitStr, _Wrap = false) -> |
317 |
:-( |
{"", "TOP " ++ LimitStr}; |
318 |
|
do_get_db_specific_limits(mssql, LimitStr, _Wrap = true) -> |
319 |
:-( |
{"", "TOP (" ++ LimitStr ++ ")"}; |
320 |
|
do_get_db_specific_limits(_, LimitStr, _Wrap) -> |
321 |
376 |
{"LIMIT " ++ LimitStr, ""}. |
322 |
|
|
323 |
|
do_get_db_specific_offset(mssql, Offset, Limit) -> |
324 |
:-( |
[" OFFSET ", Offset, " ROWS" |
325 |
|
" FETCH NEXT ", Limit, " ROWS ONLY"]; |
326 |
|
do_get_db_specific_offset(_, Offset, _Limit) -> |
327 |
:-( |
[" OFFSET ", Offset]. |
328 |
|
|
329 |
|
add_limit_arg(Limit, Args) -> |
330 |
1757 |
add_limit_arg(?RDBMS_TYPE, Limit, Args). |
331 |
|
|
332 |
|
add_limit_arg(mssql, Limit, Args) -> |
333 |
:-( |
[Limit|Args]; |
334 |
|
add_limit_arg(_, Limit, Args) -> |
335 |
1757 |
Args ++ [Limit]. |
336 |
|
|
337 |
|
get_db_specific_limits_binaries(Limit) -> |
338 |
148 |
{LimitSQL, LimitMSSQL} = get_db_specific_limits(Limit), |
339 |
148 |
{list_to_binary(LimitSQL), list_to_binary(LimitMSSQL)}. |
340 |
|
|
341 |
|
limit_offset_sql() -> |
342 |
83 |
limit_offset_sql(?RDBMS_TYPE). |
343 |
|
|
344 |
|
limit_offset_sql(mssql) -> |
345 |
:-( |
<<" OFFSET (?) ROWS FETCH NEXT (?) ROWS ONLY">>; |
346 |
|
limit_offset_sql(_) -> |
347 |
83 |
<<" LIMIT ? OFFSET ?">>. |
348 |
|
|
349 |
|
limit_offset_args(Limit, Offset) -> |
350 |
27 |
limit_offset_args(?RDBMS_TYPE, Limit, Offset). |
351 |
|
|
352 |
:-( |
limit_offset_args(mssql, Limit, Offset) -> [Offset, Limit]; |
353 |
27 |
limit_offset_args(_, Limit, Offset) -> [Limit, Offset]. |