1 |
|
%%%---------------------------------------------------------------------- |
2 |
|
%%% File : rdbms_queries.erl |
3 |
|
%%% Author : Mickael Remond <mremond@process-one.net> |
4 |
|
%%% Purpose : RDBMS queries dependind on back-end |
5 |
|
%%% Created : by Mickael Remond <mremond@process-one.net> |
6 |
|
%%% |
7 |
|
%%% |
8 |
|
%%% ejabberd, Copyright (C) 2002-2011 ProcessOne |
9 |
|
%%% |
10 |
|
%%% This program is free software; you can redistribute it and/or |
11 |
|
%%% modify it under the terms of the GNU General Public License as |
12 |
|
%%% published by the Free Software Foundation; either version 2 of the |
13 |
|
%%% License, or (at your option) any later version. |
14 |
|
%%% |
15 |
|
%%% This program is distributed in the hope that it will be useful, |
16 |
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of |
17 |
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
18 |
|
%%% General Public License for more details. |
19 |
|
%%% |
20 |
|
%%% You should have received a copy of the GNU General Public License |
21 |
|
%%% along with this program; if not, write to the Free Software |
22 |
|
%%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
23 |
|
%%% |
24 |
|
%%%---------------------------------------------------------------------- |
25 |
|
|
26 |
|
-module(rdbms_queries). |
27 |
|
-author("mremond@process-one.net"). |
28 |
|
|
29 |
|
-export([get_db_type/0, |
30 |
|
begin_trans/0, |
31 |
|
get_db_specific_limits/0, |
32 |
|
get_db_specific_limits/1, |
33 |
|
get_db_specific_limits_binaries/0, |
34 |
|
get_db_specific_limits_binaries/1, |
35 |
|
get_db_specific_offset/2, |
36 |
|
add_limit_arg/2, |
37 |
|
limit_offset_sql/0, |
38 |
|
limit_offset_args/2, |
39 |
|
sql_transaction/2, |
40 |
|
count_records_where/3, |
41 |
|
create_bulk_insert_query/3]). |
42 |
|
|
43 |
|
-export([join/2, |
44 |
|
prepare_upsert/6, |
45 |
|
execute_upsert/5]). |
46 |
|
|
47 |
|
-ignore_xref([ |
48 |
|
count_records_where/3, get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0 |
49 |
|
]). |
50 |
|
|
51 |
|
%% We have only two compile time options for db queries: |
52 |
|
%%-define(generic, true). |
53 |
|
%%-define(mssql, true). |
54 |
|
-ifndef(mssql). |
55 |
|
-undef(generic). |
56 |
|
-define(generic, true). |
57 |
|
-endif. |
58 |
|
|
59 |
|
-define(RDBMS_TYPE, (mongoose_rdbms:db_type())). |
60 |
|
|
61 |
|
-include("mongoose.hrl"). |
62 |
|
|
63 |
|
% |
64 |
|
%% ----------------- |
65 |
|
%% Common functions |
66 |
|
|
67 |
|
join([], _Sep) -> |
68 |
:-( |
[]; |
69 |
|
join([H|T], Sep) -> |
70 |
8216 |
[H, [[Sep, X] || X <- T]]. |
71 |
|
|
72 |
|
%% ----------------- |
73 |
|
%% Generic queries |
74 |
|
|
75 |
|
get_db_type() -> |
76 |
:-( |
?RDBMS_TYPE. |
77 |
|
|
78 |
|
-spec execute_upsert(Host :: mongoose_rdbms:server(), |
79 |
|
Name :: atom(), |
80 |
|
InsertParams :: [any()], |
81 |
|
UpdateParams :: [any()], |
82 |
|
UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result(). |
83 |
|
execute_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) -> |
84 |
964 |
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of |
85 |
|
{mysql, _} -> |
86 |
:-( |
mongoose_rdbms:execute(Host, Name, InsertParams ++ UpdateParams); |
87 |
|
{pgsql, _} -> |
88 |
:-( |
mongoose_rdbms:execute(Host, Name, InsertParams ++ UpdateParams); |
89 |
|
{odbc, mssql} -> |
90 |
964 |
mongoose_rdbms:execute(Host, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams); |
91 |
:-( |
NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) |
92 |
|
end. |
93 |
|
|
94 |
|
%% @doc |
95 |
|
%% This functions prepares query for inserting a row or updating it if already exists |
96 |
|
%% Updates can be either fields or literal expressions like "column = tab.column + 1" |
97 |
|
-spec prepare_upsert(Host :: mongoose_rdbms:server(), |
98 |
|
QueryName :: atom(), |
99 |
|
TableName :: atom(), |
100 |
|
InsertFields :: [binary()], |
101 |
|
Updates :: [binary() | {binary(), binary()}], |
102 |
|
UniqueKeyFields :: [binary()]) -> |
103 |
|
{ok, QueryName :: atom()} | {error, already_exists}. |
104 |
|
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields) -> |
105 |
1249 |
SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields), |
106 |
1249 |
Query = iolist_to_binary(SQL), |
107 |
1249 |
?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}), |
108 |
1249 |
Fields = prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields), |
109 |
1249 |
mongoose_rdbms:prepare(Name, Table, Fields, Query). |
110 |
|
|
111 |
|
prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) -> |
112 |
1249 |
UpdateFields = lists:filtermap(fun get_field_name/1, Updates), |
113 |
1249 |
case mongoose_rdbms:db_type() of |
114 |
|
mssql -> |
115 |
1249 |
UniqueKeyFields ++ InsertFields ++ UpdateFields; |
116 |
:-( |
_ -> InsertFields ++ UpdateFields |
117 |
|
end. |
118 |
|
|
119 |
|
upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields) -> |
120 |
1249 |
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of |
121 |
|
{mysql, _} -> |
122 |
:-( |
upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields); |
123 |
|
{pgsql, _} -> |
124 |
:-( |
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields); |
125 |
|
{odbc, mssql} -> |
126 |
1249 |
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields); |
127 |
:-( |
NotSupported -> erlang:error({rdbms_not_supported, NotSupported}) |
128 |
|
end. |
129 |
|
|
130 |
|
mysql_and_pgsql_insert(Table, Columns) -> |
131 |
:-( |
JoinedFields = join(Columns, <<", ">>), |
132 |
:-( |
Placeholders = lists:duplicate(length(Columns), $?), |
133 |
:-( |
["INSERT INTO ", atom_to_binary(Table, utf8), " (", |
134 |
|
JoinedFields, |
135 |
|
") VALUES (", |
136 |
|
join(Placeholders, ", "), |
137 |
|
")"]. |
138 |
|
|
139 |
|
upsert_mysql_query(Table, InsertFields, Updates, [Key | _]) -> |
140 |
:-( |
Insert = mysql_and_pgsql_insert(Table, InsertFields), |
141 |
:-( |
OnConflict = mysql_on_conflict(Updates, Key), |
142 |
:-( |
[Insert, OnConflict]. |
143 |
|
|
144 |
|
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields) -> |
145 |
:-( |
Insert = mysql_and_pgsql_insert(Table, InsertFields), |
146 |
:-( |
OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields), |
147 |
:-( |
[Insert, OnConflict]. |
148 |
|
|
149 |
|
mysql_on_conflict([], Key) -> |
150 |
|
%% Update field to itself (no-op), there is no 'DO NOTHING' in MySQL |
151 |
:-( |
[" ON DUPLICATE KEY UPDATE ", Key, " = ", Key]; |
152 |
|
mysql_on_conflict(UpdateFields, _) -> |
153 |
:-( |
[" ON DUPLICATE KEY UPDATE ", |
154 |
|
update_fields_on_conflict(UpdateFields)]. |
155 |
|
|
156 |
|
pgsql_on_conflict([], UniqueKeyFields) -> |
157 |
:-( |
JoinedKeys = join(UniqueKeyFields, ", "), |
158 |
:-( |
[" ON CONFLICT (", JoinedKeys, ") DO NOTHING"]; |
159 |
|
pgsql_on_conflict(UpdateFields, UniqueKeyFields) -> |
160 |
:-( |
JoinedKeys = join(UniqueKeyFields, ", "), |
161 |
:-( |
[" ON CONFLICT (", JoinedKeys, ")" |
162 |
|
" DO UPDATE SET ", |
163 |
|
update_fields_on_conflict(UpdateFields)]. |
164 |
|
|
165 |
|
update_fields_on_conflict(Updates) -> |
166 |
1246 |
FieldsWithPlaceHolders = [update_field_expression(Update) || Update <- Updates], |
167 |
1246 |
join(FieldsWithPlaceHolders, ", "). |
168 |
|
|
169 |
|
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) -> |
170 |
1249 |
UniqueKeysInSelect = [[" ? AS ", Key] || Key <- UniqueKeyFields], |
171 |
1249 |
BinTab = atom_to_binary(Table, utf8), |
172 |
1249 |
UniqueConstraint = [[BinTab, ".", Key, " = source.", Key] || Key <- UniqueKeyFields], |
173 |
1249 |
JoinedInsertFields = join(InsertFields, ", "), |
174 |
1249 |
Placeholders = lists:duplicate(length(InsertFields), $?), |
175 |
1249 |
["MERGE INTO ", BinTab, " WITH (SERIALIZABLE)" |
176 |
|
" USING (SELECT ", join(UniqueKeysInSelect, ", "), ")" |
177 |
|
" AS source (", join(UniqueKeyFields, ", "), ")" |
178 |
|
" ON (", join(UniqueConstraint, " AND "), ")" |
179 |
|
" WHEN NOT MATCHED THEN INSERT" |
180 |
|
" (", JoinedInsertFields, ")" |
181 |
|
" VALUES (", join(Placeholders, ", "), ")" | mssql_on_conflict(Updates)]. |
182 |
|
|
183 |
3 |
mssql_on_conflict([]) -> ";"; |
184 |
|
mssql_on_conflict(Updates) -> |
185 |
1246 |
[" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"]. |
186 |
|
|
187 |
|
update_field_expression(Update) -> |
188 |
8853 |
case get_field_expression(Update) of |
189 |
25 |
{true, Expr} -> Expr; |
190 |
8828 |
true -> [Update, " = ?"]; |
191 |
:-( |
false -> Update |
192 |
|
end. |
193 |
|
|
194 |
|
get_field_expression({_, FieldExpr}) -> |
195 |
25 |
case binary:match(FieldExpr, <<"=">>) of |
196 |
:-( |
nomatch -> false; |
197 |
25 |
_ -> {true, FieldExpr} |
198 |
|
end; |
199 |
|
get_field_expression(FieldExpr) -> |
200 |
8828 |
binary:match(FieldExpr, <<"=">>) =:= nomatch. |
201 |
|
|
202 |
|
get_field_name({Field, _}) -> |
203 |
25 |
case binary:match(Field, <<"=">>) of |
204 |
25 |
nomatch -> {true, Field}; |
205 |
:-( |
_ -> false |
206 |
|
end; |
207 |
|
get_field_name(FieldExpr) -> |
208 |
8828 |
binary:match(FieldExpr, <<"=">>) =:= nomatch. |
209 |
|
|
210 |
|
%% F can be either a fun or a list of queries |
211 |
|
%% TODO: We should probably move the list of queries transaction |
212 |
|
%% wrapper from the mongoose_rdbms module to this one (rdbms_queries) |
213 |
|
sql_transaction(LServer, F) -> |
214 |
368 |
mongoose_rdbms:sql_transaction(LServer, F). |
215 |
|
|
216 |
|
begin_trans() -> |
217 |
10023 |
begin_trans(?RDBMS_TYPE). |
218 |
|
|
219 |
|
begin_trans(mssql) -> |
220 |
10023 |
rdbms_queries_mssql:begin_trans(); |
221 |
|
begin_trans(_) -> |
222 |
:-( |
[<<"BEGIN;">>]. |
223 |
|
|
224 |
|
%% Count number of records in a table given a where clause |
225 |
|
count_records_where(LServer, Table, WhereClause) -> |
226 |
:-( |
mongoose_rdbms:sql_query( |
227 |
|
LServer, |
228 |
|
[<<"select count(*) from ">>, Table, " ", WhereClause, ";"]). |
229 |
|
|
230 |
|
-spec create_bulk_insert_query(Table :: iodata() | atom(), Fields :: [iodata() | atom()], |
231 |
|
RowsNum :: pos_integer()) -> |
232 |
|
{iodata(), [binary()]}. |
233 |
|
create_bulk_insert_query(Table, Fields, RowsNum) when is_atom(Table) -> |
234 |
225 |
create_bulk_insert_query(atom_to_binary(Table, utf8), Fields, RowsNum); |
235 |
|
create_bulk_insert_query(Table, [Field | _] = Fields, RowsNum) when is_atom(Field) -> |
236 |
225 |
create_bulk_insert_query(Table, [atom_to_binary(F, utf8) || F <- Fields], RowsNum); |
237 |
|
create_bulk_insert_query(Table, Fields, RowsNum) when RowsNum > 0 -> |
238 |
225 |
JoinedFields = join(Fields, <<", ">>), |
239 |
225 |
Placeholders = lists:duplicate(length(Fields), <<"?">>), |
240 |
225 |
PlaceholderSet = [<<"(">>, join(Placeholders, <<", ">>), <<")">>], |
241 |
225 |
PlaceholderSets = lists:duplicate(RowsNum, PlaceholderSet), |
242 |
225 |
JoinedPlaceholderSets = join(PlaceholderSets, <<", ">>), |
243 |
225 |
Sql = [<<"INSERT INTO ">>, Table, <<" (">>, JoinedFields, <<") " |
244 |
|
"VALUES ">>, JoinedPlaceholderSets, <<";">>], |
245 |
225 |
Fields2 = lists:append(lists:duplicate(RowsNum, Fields)), |
246 |
225 |
{Sql, Fields2}. |
247 |
|
|
248 |
|
get_db_specific_limits() -> |
249 |
568 |
do_get_db_specific_limits(?RDBMS_TYPE, "?", true). |
250 |
|
|
251 |
|
get_db_specific_limits_binaries() -> |
252 |
536 |
{LimitSQL, LimitMSSQL} = get_db_specific_limits(), |
253 |
536 |
{list_to_binary(LimitSQL), list_to_binary(LimitMSSQL)}. |
254 |
|
|
255 |
|
-spec get_db_specific_limits(integer()) |
256 |
|
-> {SQL :: nonempty_string(), []} | {[], MSSQL::nonempty_string()}. |
257 |
|
get_db_specific_limits(Limit) -> |
258 |
93 |
LimitStr = integer_to_list(Limit), |
259 |
93 |
do_get_db_specific_limits(?RDBMS_TYPE, LimitStr, false). |
260 |
|
|
261 |
|
-spec get_db_specific_offset(integer(), integer()) -> iolist(). |
262 |
|
get_db_specific_offset(Offset, Limit) -> |
263 |
:-( |
do_get_db_specific_offset(?RDBMS_TYPE, integer_to_list(Offset), integer_to_list(Limit)). |
264 |
|
|
265 |
|
|
266 |
|
%% Arguments: |
267 |
|
%% - Type (atom) - database type |
268 |
|
%% - LimitStr (string) - a field value |
269 |
|
%% - Wrap (boolean) - add parentheses around a field for MSSQL |
270 |
|
do_get_db_specific_limits(mssql, LimitStr, _Wrap = false) -> |
271 |
93 |
{"", "TOP " ++ LimitStr}; |
272 |
|
do_get_db_specific_limits(mssql, LimitStr, _Wrap = true) -> |
273 |
568 |
{"", "TOP (" ++ LimitStr ++ ")"}; |
274 |
|
do_get_db_specific_limits(_, LimitStr, _Wrap) -> |
275 |
:-( |
{"LIMIT " ++ LimitStr, ""}. |
276 |
|
|
277 |
|
do_get_db_specific_offset(mssql, Offset, Limit) -> |
278 |
:-( |
[" OFFSET ", Offset, " ROWS" |
279 |
|
" FETCH NEXT ", Limit, " ROWS ONLY"]; |
280 |
|
do_get_db_specific_offset(_, Offset, _Limit) -> |
281 |
:-( |
[" OFFSET ", Offset]. |
282 |
|
|
283 |
|
add_limit_arg(Limit, Args) -> |
284 |
1842 |
add_limit_arg(?RDBMS_TYPE, Limit, Args). |
285 |
|
|
286 |
|
add_limit_arg(mssql, Limit, Args) -> |
287 |
1842 |
[Limit|Args]; |
288 |
|
add_limit_arg(_, Limit, Args) -> |
289 |
:-( |
Args ++ [Limit]. |
290 |
|
|
291 |
|
get_db_specific_limits_binaries(Limit) -> |
292 |
93 |
{LimitSQL, LimitMSSQL} = get_db_specific_limits(Limit), |
293 |
93 |
{list_to_binary(LimitSQL), list_to_binary(LimitMSSQL)}. |
294 |
|
|
295 |
|
limit_offset_sql() -> |
296 |
212 |
limit_offset_sql(?RDBMS_TYPE). |
297 |
|
|
298 |
|
limit_offset_sql(mssql) -> |
299 |
212 |
<<" OFFSET (?) ROWS FETCH NEXT (?) ROWS ONLY">>; |
300 |
|
limit_offset_sql(_) -> |
301 |
:-( |
<<" LIMIT ? OFFSET ?">>. |
302 |
|
|
303 |
|
limit_offset_args(Limit, Offset) -> |
304 |
72 |
limit_offset_args(?RDBMS_TYPE, Limit, Offset). |
305 |
|
|
306 |
72 |
limit_offset_args(mssql, Limit, Offset) -> [Offset, Limit]; |
307 |
:-( |
limit_offset_args(_, Limit, Offset) -> [Limit, Offset]. |