1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : rdbms_queries.erl |
3 |
|
%%% Author : Mickael Remond <mremond@process-one.net> |
4 |
|
%%% Purpose : RDBMS queries depending on back-end |
5 |
|
%%% Created : by Mickael Remond <mremond@process-one.net> |
6 |
|
%%% |
7 |
|
%%% |
8 |
|
%%% ejabberd, Copyright (C) 2002-2011 ProcessOne |
9 |
|
%%% |
10 |
|
%%% This program is free software; you can redistribute it and/or |
11 |
|
%%% modify it under the terms of the GNU General Public License as |
12 |
|
%%% published by the Free Software Foundation; either version 2 of the |
13 |
|
%%% License, or (at your option) any later version. |
14 |
|
%%% |
15 |
|
%%% This program is distributed in the hope that it will be useful, |
16 |
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of |
17 |
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
18 |
|
%%% General Public License for more details. |
19 |
|
%%% |
20 |
|
%%% You should have received a copy of the GNU General Public License |
21 |
|
%%% along with this program; if not, write to the Free Software |
22 |
|
%%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
23 |
|
%%% |
24 |
|
%%%---------------------------------------------------------------------- |
25 |
|
|
26 |
|
-module(rdbms_queries). |
27 |
|
-author("mremond@process-one.net"). |
28 |
|
|
29 |
|
-export([get_db_type/0, |
30 |
|
begin_trans/0, |
31 |
|
get_db_specific_limits/0, |
32 |
|
get_db_specific_limits/1, |
33 |
|
get_db_specific_limits_binaries/0, |
34 |
|
get_db_specific_limits_binaries/1, |
35 |
|
get_db_specific_offset/2, |
36 |
|
add_limit_arg/2, |
37 |
|
limit_offset_sql/0, |
38 |
|
limit_offset_args/2, |
39 |
|
sql_transaction/2, |
40 |
|
count_records_where/3, |
41 |
|
create_bulk_insert_query/3]). |
42 |
|
|
43 |
|
-export([join/2, |
44 |
|
prepare_upsert/6, |
45 |
|
prepare_upsert/7, |
46 |
|
execute_upsert/5, execute_upsert/6, |
47 |
|
request_upsert/5]). |
48 |
|
|
49 |
|
-ignore_xref([ |
50 |
|
execute_upsert/6, count_records_where/3, |
51 |
|
get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0 |
52 |
|
]). |
53 |
|
|
54 |
|
%% We have only two compile time options for db queries: |
55 |
|
%%-define(generic, true). |
56 |
|
%%-define(mssql, true). |
57 |
|
-ifndef(mssql). |
58 |
|
-undef(generic). |
59 |
|
-endif. |
60 |
|
|
61 |
|
-define(RDBMS_TYPE, (mongoose_rdbms:db_type())). |
62 |
|
|
63 |
|
-include("mongoose.hrl"). |
64 |
|
|
65 |
|
-ifdef(gen_server_request_id). |
66 |
|
-type request_id() :: gen_server:request_id(). |
67 |
|
-else. |
68 |
|
-type request_id() :: term(). |
69 |
|
-endif. |
70 |
|
|
71 |
|
% |
72 |
|
%% ----------------- |
73 |
|
%% Common functions |
74 |
|
|
75 |
|
%% Places Separator in front of every element except the first.
%% The result is a nested list ([First, [[Separator, E2], [Separator, E3], ...]]);
%% it is never flattened here.
join([], _Separator) ->
    [];
join([First | Rest], Separator) ->
    [First, lists:map(fun(Item) -> [Separator, Item] end, Rest)].
79 |
|
|
80 |
|
%% ----------------- |
81 |
|
%% Generic queries |
82 |
|
|
83 |
|
%% Returns the value of the ?RDBMS_TYPE macro, i.e. mongoose_rdbms:db_type().
get_db_type() ->
    DbType = ?RDBMS_TYPE,
    DbType.
85 |
|
|
86 |
|
%% Same as execute_upsert/6, using the `default' pool tag.
-spec execute_upsert(HostType :: mongooseim:host_type_or_global(),
                     Name :: atom(),
                     InsertParams :: [any()],
                     UpdateParams :: [any()],
                     UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result().
execute_upsert(HostType, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
    PoolTag = default,
    execute_upsert(HostType, PoolTag, Name, InsertParams, UpdateParams, UniqueKeyValues).
93 |
|
|
94 |
|
%% Executes the prepared upsert statement Name on the given pool.
%% The parameter list is assembled to match the placeholder order of the
%% backend-specific query:
%%   - mysql / pgsql: InsertParams ++ UpdateParams
%%   - odbc + mssql:  UniqueKeyValues ++ InsertParams ++ UpdateParams
%% Any other {Engine, Type} pair raises error({rdbms_not_supported, {Engine, Type}}).
-spec execute_upsert(HostType :: mongooseim:host_type_or_global(),
                     PoolTag :: mongoose_wpool:tag(),
                     Name :: atom(),
                     InsertParams :: [any()],
                     UpdateParams :: [any()],
                     UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result().
execute_upsert(HostType, PoolTag, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
    Backend = {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()},
    Params = case Backend of
                 {Engine, _} when Engine =:= mysql; Engine =:= pgsql ->
                     InsertParams ++ UpdateParams;
                 {odbc, mssql} ->
                     UniqueKeyValues ++ InsertParams ++ UpdateParams;
                 Unsupported ->
                     erlang:error({rdbms_not_supported, Unsupported})
             end,
    mongoose_rdbms:execute(HostType, PoolTag, Name, Params).
110 |
|
|
111 |
|
%% Variant of the upsert execution that goes through
%% mongoose_rdbms:execute_request/3 and returns its request id.
%% Parameters are ordered per backend exactly as in execute_upsert/6:
%%   - mysql / pgsql: InsertParams ++ UpdateParams
%%   - odbc + mssql:  UniqueKeyValues ++ InsertParams ++ UpdateParams
%% Any other {Engine, Type} pair raises error({rdbms_not_supported, {Engine, Type}}).
-spec request_upsert(HostType :: mongooseim:host_type_or_global(),
                     Name :: atom(),
                     InsertParams :: [any()],
                     UpdateParams :: [any()],
                     UniqueKeyValues :: [any()]) -> request_id().
request_upsert(HostType, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
    Backend = {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()},
    Params = case Backend of
                 {Engine, _} when Engine =:= mysql; Engine =:= pgsql ->
                     InsertParams ++ UpdateParams;
                 {odbc, mssql} ->
                     UniqueKeyValues ++ InsertParams ++ UpdateParams;
                 Unsupported ->
                     erlang:error({rdbms_not_supported, Unsupported})
             end,
    mongoose_rdbms:execute_request(HostType, Name, Params).
126 |
|
|
127 |
|
%% @doc |
128 |
|
%% This function prepares a query for inserting a row or updating it if it already exists |
129 |
|
%% Updates can be either fields or literal expressions like "column = tab.column + 1" |
130 |
|
-spec prepare_upsert(HostType :: mongooseim:host_type_or_global(),
                     QueryName :: mongoose_rdbms:query_name(),
                     TableName :: atom(),
                     InsertFields :: [binary()],
                     Updates :: [binary() | {assignment | expression, binary(), binary()}],
                     UniqueKeyFields :: [binary()]) ->
    {ok, QueryName :: mongoose_rdbms:query_name()} | {error, already_exists}.
prepare_upsert(HostType, Name, Table, InsertFields, Updates, UniqueKeyFields) ->
    %% Delegates to prepare_upsert/7 with no incremental field.
    IncrementalField = none,
    prepare_upsert(HostType, Name, Table, InsertFields, Updates, UniqueKeyFields,
                   IncrementalField).
139 |
|
|
140 |
|
%% Builds the backend-specific upsert SQL (see upsert_query/6), logs it at
%% debug level and registers it under Name via mongoose_rdbms:prepare/4.
%% IncrementalField is either `none' or a column name passed on to
%% upsert_query/6.
-spec prepare_upsert(HostType :: mongooseim:host_type_or_global(),
                     QueryName :: mongoose_rdbms:query_name(),
                     TableName :: atom(),
                     InsertFields :: [ColumnName :: binary()],
                     Updates :: [binary() | {assignment | expression, binary(), binary()}],
                     UniqueKeyFields :: [binary()],
                     IncrementalField :: none | binary()) ->
    {ok, QueryName :: mongoose_rdbms:query_name()} | {error, already_exists}.
prepare_upsert(HostType, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
    Query = iolist_to_binary(
              upsert_query(HostType, Table, InsertFields, Updates,
                           UniqueKeyFields, IncrementalField)),
    ?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}),
    PreparedFields = prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields),
    mongoose_rdbms:prepare(Name, Table, PreparedFields, Query).
154 |
|
|
155 |
|
%% Lists the column names bound to the placeholders of the prepared upsert,
%% in placeholder order. Update entries are filtered through get_field_name/1.
%% For mssql the unique key columns come first.
prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) ->
    InsertAndUpdate = InsertFields ++ lists:filtermap(fun get_field_name/1, Updates),
    case mongoose_rdbms:db_type() of
        mssql -> UniqueKeyFields ++ InsertAndUpdate;
        _ -> InsertAndUpdate
    end.
162 |
|
|
163 |
|
%% Picks the upsert SQL generator for the configured backend.
%% Note that the mssql generator is not given IncrementalField.
%% Unsupported {Engine, Type} pairs raise error({rdbms_not_supported, {Engine, Type}}).
upsert_query(HostType, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
    Backend = {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()},
    case Backend of
        {mysql, _} ->
            upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField);
        {pgsql, _} ->
            upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField);
        {odbc, mssql} ->
            upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields);
        Unsupported ->
            erlang:error({rdbms_not_supported, Unsupported})
    end.
173 |
|
|
174 |
|
%% Builds "INSERT INTO <Table> (<Columns>) VALUES (?, ?, ...)" as a nested
%% list, with one `?' placeholder per column.
mysql_and_pgsql_insert(Table, Columns) ->
    ColumnList = join(Columns, <<", ">>),
    QuestionMarks = lists:duplicate(length(Columns), $?),
    TableBin = atom_to_binary(Table, utf8),
    ["INSERT INTO ", TableBin, " (",
     ColumnList,
     ") VALUES (",
     join(QuestionMarks, ", "),
     ")"].
182 |
|
|
183 |
|
%% MySQL upsert: the plain INSERT followed by the ON DUPLICATE KEY UPDATE part.
%% Only the first unique key column is passed on to mysql_on_conflict/4.
upsert_mysql_query(Table, InsertFields, Updates, [FirstKey | _], IncrementalField) ->
    [mysql_and_pgsql_insert(Table, InsertFields),
     mysql_on_conflict(Table, Updates, FirstKey, IncrementalField)].
187 |
|
|
188 |
|
%% PostgreSQL upsert: the plain INSERT, the ON CONFLICT action and, when an
%% incremental field is given, a WHERE guard so the row is only updated if
%% the stored value of that field is lower than the incoming one.
%%
%% With no Updates the conflict action is DO NOTHING, which cannot be followed
%% by a WHERE clause in PostgreSQL (WHERE belongs to DO UPDATE SET only).
%% The guard is therefore left out in that case, matching mysql_on_conflict/4,
%% which also ignores the incremental field when there is nothing to update.
upsert_pgsql_query(Table, InsertFields, [] = Updates, UniqueKeyFields, _IncrementalField) ->
    Insert = mysql_and_pgsql_insert(Table, InsertFields),
    OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields),
    [Insert, OnConflict, []];
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
    Insert = mysql_and_pgsql_insert(Table, InsertFields),
    OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields),
    WhereIncrements = pgsql_ensure_increments(Table, IncrementalField),
    [Insert, OnConflict, WhereIncrements].
193 |
|
|
194 |
|
%% Builds the MySQL "ON DUPLICATE KEY UPDATE" tail.
%%   - no updates: assigns Key to itself, a no-op, because MySQL has no
%%     'DO NOTHING';
%%   - updates without incremental field: plain assignments;
%%   - updates with incremental field: the inserted row is aliased as `alias'
%%     and every assignment is wrapped by mysql_fields_with_placeholders/3.
mysql_on_conflict(_Table, [], Key, _IncrementalField) ->
    [" ON DUPLICATE KEY UPDATE ", Key, " = ", Key];
mysql_on_conflict(_Table, Updates, _Key, none) ->
    [" ON DUPLICATE KEY UPDATE ",
     update_fields_on_conflict(Updates)];
mysql_on_conflict(Table, Updates, _Key, IncrementalField) ->
    TableName = atom_to_list(Table),
    Wrap = fun(Update) ->
                   mysql_fields_with_placeholders(TableName, Update, IncrementalField)
           end,
    GuardedUpdates = lists:map(Wrap, Updates),
    [" AS alias ON DUPLICATE KEY UPDATE ", join(GuardedUpdates, ", ")].
206 |
|
|
207 |
|
%% Produces one guarded MySQL assignment of the form
%%   Column = IF(Table.Incr < alias.Incr, NewValue, Table.Column)
%% where NewValue is the given expression for {assignment | expression, ...}
%% updates and a `?' placeholder for a plain column name.
mysql_fields_with_placeholders(TableName, {Op, Column, Expression}, IncrementalField)
  when Op =:= assignment; Op =:= expression ->
    mysql_incremental_if(TableName, Column, IncrementalField,
                         [Expression, ", ", TableName, ".", Column, ")"]);
mysql_fields_with_placeholders(TableName, Column, IncrementalField) ->
    mysql_incremental_if(TableName, Column, IncrementalField,
                         ["? , ", TableName, ".", Column, ")"]).

%% Prepends the "Column = IF(<increment check>, " head to the given tail.
mysql_incremental_if(TableName, Column, IncrementalField, Tail) ->
    [Column, " = IF(", TableName, ".", IncrementalField, " < alias.", IncrementalField, ", "
     | Tail].
216 |
|
|
217 |
|
%% Builds the PostgreSQL "ON CONFLICT (<keys>)" action: DO NOTHING when there
%% is nothing to update, DO UPDATE SET <assignments> otherwise.
pgsql_on_conflict(Updates, UniqueKeyFields) ->
    ConflictTarget = join(UniqueKeyFields, ", "),
    case Updates of
        [] ->
            [" ON CONFLICT (", ConflictTarget, ") DO NOTHING"];
        _ ->
            [" ON CONFLICT (", ConflictTarget, ") DO UPDATE SET ",
             update_fields_on_conflict(Updates)]
    end.
225 |
|
|
226 |
|
%% Renders every update through get_field_expression/1 and joins the
%% resulting assignments with ", ".
update_fields_on_conflict(Updates) ->
    Assignments = lists:map(fun get_field_expression/1, Updates),
    join(Assignments, ", ").
229 |
|
|
230 |
|
%% Returns the WHERE guard appended to a PostgreSQL upsert so that the update
%% only applies when the stored IncrementalField is lower than the incoming
%% (EXCLUDED) one. Returns [] when IncrementalField is `none'.
pgsql_ensure_increments(Table, IncrementalField) ->
    case IncrementalField of
        none ->
            [];
        _ ->
            [" WHERE ", atom_to_list(Table), ".", IncrementalField,
             " < EXCLUDED.", IncrementalField]
    end.
235 |
|
|
236 |
|
%% Builds an MSSQL MERGE statement that inserts the row when no row matches
%% the unique key and otherwise applies Updates (see mssql_on_conflict/1).
%% Placeholder order in the generated SQL: unique key values first (in the
%% USING sub-select), then the insert values, then any update values.
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) ->
    SourceColumns = lists:map(fun(Key) -> [" ? AS ", Key] end, UniqueKeyFields),
    TableBin = atom_to_binary(Table, utf8),
    MatchConditions = lists:map(fun(Key) -> [TableBin, ".", Key, " = source.", Key] end,
                                UniqueKeyFields),
    InsertColumns = join(InsertFields, ", "),
    QuestionMarks = lists:duplicate(length(InsertFields), $?),
    ["MERGE INTO ", TableBin,
     " WITH (SERIALIZABLE) USING (SELECT ", join(SourceColumns, ", "),
     ") AS source (", join(UniqueKeyFields, ", "),
     ") ON (", join(MatchConditions, " AND "),
     ") WHEN NOT MATCHED THEN INSERT (", InsertColumns,
     ") VALUES (", join(QuestionMarks, ", "),
     ")"
     | mssql_on_conflict(Updates)].
249 |
|
|
250 |
7 |
%% Tail of the MSSQL MERGE statement: just the terminating ";" when there is
%% nothing to update, otherwise a WHEN MATCHED THEN UPDATE SET clause.
mssql_on_conflict(Updates) ->
    case Updates of
        [] ->
            ";";
        _ ->
            [" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"]
    end.
253 |
|
|
254 |
|
%% Renders one update as an SQL assignment:
%%   {assignment | expression, Column, Expr} -> "Column = Expr"
%%   Column (binary)                         -> "Column = ?"
get_field_expression({assignment, ColumnName, Expr}) ->
    [ColumnName, " = ", Expr];
get_field_expression({expression, ColumnName, Expr}) ->
    [ColumnName, " = ", Expr];
get_field_expression(Field) when is_binary(Field) ->
    [Field, " = ?"].
258 |
|
|
259 |
|
%% lists:filtermap/2 callback used by prepared_upsert_fields/3:
%%   plain binary column     -> true          (element kept as is)
%%   {expression, Field, _}  -> {true, Field} (kept, reduced to the field name)
%%   {assignment, Field, _}  -> false         (dropped)
get_field_name(Field) when is_binary(Field) ->
    true;
get_field_name({expression, Field, _Expr}) when is_binary(Field) ->
    {true, Field};
get_field_name({assignment, Field, _Expr}) when is_binary(Field) ->
    false.
265 |
|
|
266 |
|
%% F can be either a fun or a list of queries |
267 |
|
%% TODO: We should probably move the list of queries transaction |
268 |
|
%% wrapper from the mongoose_rdbms module to this one (rdbms_queries) |
269 |
|
sql_transaction(LServer, FunOrQueries) ->
    %% Thin pass-through; the transaction handling lives in mongoose_rdbms.
    Result = mongoose_rdbms:sql_transaction(LServer, FunOrQueries),
    Result.
271 |
|
|
272 |
|
%% Returns the "begin transaction" statement(s) for the configured RDBMS type.
begin_trans() ->
    DbType = ?RDBMS_TYPE,
    begin_trans(DbType).
274 |
|
|
275 |
|
%% mssql delegates to rdbms_queries_mssql:begin_trans/0; every other type
%% uses a plain "BEGIN;".
begin_trans(DbType) ->
    case DbType of
        mssql -> rdbms_queries_mssql:begin_trans();
        _ -> [<<"BEGIN;">>]
    end.
279 |
|
|
280 |
|
%% Count number of records in a table given a where clause |
281 |
|
count_records_where(LServer, Table, WhereClause) ->
    %% NOTE(review): Table and WhereClause are spliced into the SQL text
    %% verbatim, so callers must only pass trusted, already-escaped values.
    Query = [<<"select count(*) from ">>, Table, " ", WhereClause, ";"],
    mongoose_rdbms:sql_query(LServer, Query).
285 |
|
|
286 |
|
%% Builds a multi-row "INSERT INTO Table (F1, F2, ...) VALUES (?, ...), (?, ...);"
%% statement with RowsNum placeholder groups.
%% Returns {Sql, FieldList}, where FieldList is Fields repeated RowsNum times
%% (one entry per placeholder). An atom Table is converted to a binary, and so
%% are the Fields when the first of them is an atom.
-spec create_bulk_insert_query(Table :: iodata() | atom(), Fields :: [iodata() | atom()],
                               RowsNum :: pos_integer()) ->
    {iodata(), [binary()]}.
create_bulk_insert_query(Table, Fields, RowsNum) when is_atom(Table) ->
    create_bulk_insert_query(atom_to_binary(Table, utf8), Fields, RowsNum);
create_bulk_insert_query(Table, [Field | _] = Fields, RowsNum) when is_atom(Field) ->
    BinFields = lists:map(fun(F) -> atom_to_binary(F, utf8) end, Fields),
    create_bulk_insert_query(Table, BinFields, RowsNum);
create_bulk_insert_query(Table, Fields, RowsNum) when RowsNum > 0 ->
    ColumnList = join(Fields, <<", ">>),
    QuestionMarks = lists:duplicate(length(Fields), <<"?">>),
    RowGroup = [<<"(">>, join(QuestionMarks, <<", ">>), <<")">>],
    AllRowGroups = join(lists:duplicate(RowsNum, RowGroup), <<", ">>),
    Sql = [<<"INSERT INTO ">>, Table, <<" (">>, ColumnList, <<") VALUES ">>,
           AllRowGroups, <<";">>],
    {Sql, lists:append(lists:duplicate(RowsNum, Fields))}.
303 |
|
|
304 |
|
%% Limit clauses for prepared statements: the limit value is the "?"
%% placeholder, and the mssql form is wrapped in parentheses ("TOP (?)").
get_db_specific_limits() ->
    Placeholder = "?",
    do_get_db_specific_limits(?RDBMS_TYPE, Placeholder, true).
306 |
|
|
307 |
|
%% Same as get_db_specific_limits/0, with both strings converted to binaries.
get_db_specific_limits_binaries() ->
    {GenericLimit, MSSQLLimit} = get_db_specific_limits(),
    GenericBin = list_to_binary(GenericLimit),
    MSSQLBin = list_to_binary(MSSQLLimit),
    {GenericBin, MSSQLBin}.
310 |
|
|
311 |
|
%% Limit clauses with a literal limit value embedded in the SQL text.
%% Exactly one element of the returned pair is non-empty: the first for
%% "LIMIT N" style backends, the second ("TOP N") for mssql.
-spec get_db_specific_limits(integer())
        -> {SQL :: nonempty_string(), []} | {[], MSSQL::nonempty_string()}.
get_db_specific_limits(Limit) ->
    do_get_db_specific_limits(?RDBMS_TYPE, integer_to_list(Limit), false).
316 |
|
|
317 |
|
%% Offset clause with literal values for the configured RDBMS type
%% (see do_get_db_specific_offset/3).
-spec get_db_specific_offset(integer(), integer()) -> iolist().
get_db_specific_offset(Offset, Limit) ->
    OffsetStr = integer_to_list(Offset),
    LimitStr = integer_to_list(Limit),
    do_get_db_specific_offset(?RDBMS_TYPE, OffsetStr, LimitStr).
320 |
|
|
321 |
|
|
322 |
|
%% Returns {GenericLimit, MSSQLLimit}; exactly one of the two is non-empty.
%% Arguments:
%% - Type (atom) - database type
%% - LimitStr (string) - the limit value or a placeholder
%% - Wrap (boolean) - for MSSQL, put parentheses around the value: "TOP (N)"
do_get_db_specific_limits(mssql, LimitStr, true) ->
    {[], "TOP (" ++ LimitStr ++ ")"};
do_get_db_specific_limits(mssql, LimitStr, false) ->
    {[], "TOP " ++ LimitStr};
do_get_db_specific_limits(_Type, LimitStr, _Wrap) ->
    {"LIMIT " ++ LimitStr, []}.
332 |
|
|
333 |
|
%% mssql needs "OFFSET n ROWS FETCH NEXT m ROWS ONLY"; for every other type
%% only " OFFSET n" is produced and Limit is ignored.
do_get_db_specific_offset(Type, Offset, Limit) ->
    case Type of
        mssql ->
            [" OFFSET ", Offset, " ROWS FETCH NEXT ", Limit, " ROWS ONLY"];
        _ ->
            [" OFFSET ", Offset]
    end.
338 |
|
|
339 |
|
%% Inserts Limit into the argument list at the position the configured
%% RDBMS type expects (see add_limit_arg/3).
add_limit_arg(Limit, Args) ->
    DbType = ?RDBMS_TYPE,
    add_limit_arg(DbType, Limit, Args).
341 |
|
|
342 |
|
%% mssql takes the limit as the first argument; every other type takes it last.
add_limit_arg(DbType, Limit, Args) ->
    case DbType of
        mssql -> [Limit | Args];
        _ -> lists:append(Args, [Limit])
    end.
346 |
|
|
347 |
|
%% Same as get_db_specific_limits/1, with both strings converted to binaries.
get_db_specific_limits_binaries(Limit) ->
    {GenericLimit, MSSQLLimit} = get_db_specific_limits(Limit),
    GenericBin = list_to_binary(GenericLimit),
    MSSQLBin = list_to_binary(MSSQLLimit),
    {GenericBin, MSSQLBin}.
350 |
|
|
351 |
|
%% Parameterised limit/offset SQL fragment for the configured RDBMS type.
limit_offset_sql() ->
    DbType = ?RDBMS_TYPE,
    limit_offset_sql(DbType).
353 |
|
|
354 |
|
%% Note the placeholder order: mssql takes offset first, then limit; the
%% generic form takes limit first, then offset.
limit_offset_sql(DbType) ->
    case DbType of
        mssql -> <<" OFFSET (?) ROWS FETCH NEXT (?) ROWS ONLY">>;
        _ -> <<" LIMIT ? OFFSET ?">>
    end.
358 |
|
|
359 |
|
%% Orders Limit and Offset to match the placeholders of limit_offset_sql/0
%% for the configured RDBMS type.
limit_offset_args(Limit, Offset) ->
    DbType = ?RDBMS_TYPE,
    limit_offset_args(DbType, Limit, Offset).
361 |
|
|
362 |
27 |
%% mssql expects [Offset, Limit]; every other type expects [Limit, Offset].
limit_offset_args(DbType, Limit, Offset) ->
    case DbType of
        mssql -> [Offset, Limit];
        _ -> [Limit, Offset]
    end.