./ct_report/coverage/rdbms_queries.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : rdbms_queries.erl
3 %%% Author : Mickael Remond <mremond@process-one.net>
4 %%% Purpose : RDBMS queries depending on back-end
5 %%% Created : by Mickael Remond <mremond@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(rdbms_queries).
27 -author("mremond@process-one.net").
28
29 -export([get_db_type/0,
30 begin_trans/0,
31 get_db_specific_limits/0,
32 get_db_specific_limits/1,
33 get_db_specific_limits_binaries/0,
34 get_db_specific_limits_binaries/1,
35 get_db_specific_offset/2,
36 add_limit_arg/2,
37 limit_offset_sql/0,
38 limit_offset_args/2,
39 sql_transaction/2,
40 count_records_where/3,
41 create_bulk_insert_query/3]).
42
43 -export([join/2,
44 prepare_upsert/6,
45 prepare_upsert/7,
46 execute_upsert/5,
47 request_upsert/5]).
48
49 -ignore_xref([
50 count_records_where/3, get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0
51 ]).
52
53 %% We have only two compile time options for db queries:
54 %%-define(generic, true).
55 %%-define(mssql, true).
56 -ifndef(mssql).
57 -undef(generic).
58 -endif.
59
60 -define(RDBMS_TYPE, (mongoose_rdbms:db_type())).
61
62 -include("mongoose.hrl").
63
64 %
65 %% -----------------
66 %% Common functions
67
%% @doc Interleaves `Sep' between the elements of a list.
%% The result is nested iodata (the head followed by a list of
%% `[Sep, Elem]' pairs), not a flat binary. An empty list yields `[]'.
join([], _Sep) ->
    [];
join([First | Rest], Sep) ->
    [First, lists:map(fun(Elem) -> [Sep, Elem] end, Rest)].
72
73 %% -----------------
74 %% Generic queries
75
%% @doc Returns whatever `mongoose_rdbms:db_type/0' reports as the database type.
get_db_type() ->
    mongoose_rdbms:db_type().
78
%% @doc Executes the prepared upsert statement `Name'.
%% The parameter order depends on the backend: MySQL and PostgreSQL take
%% the insert parameters followed by the update parameters, while MSSQL
%% over ODBC additionally takes the unique key values first.
%% Any other engine/type pair raises `{rdbms_not_supported, {Engine, Type}}'.
-spec execute_upsert(Host :: mongoose_rdbms:server(),
                     Name :: atom(),
                     InsertParams :: [any()],
                     UpdateParams :: [any()],
                     UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result().
execute_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
    Params =
        case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of
            {Engine, _} when Engine =:= mysql; Engine =:= pgsql ->
                InsertParams ++ UpdateParams;
            {odbc, mssql} ->
                UniqueKeyValues ++ InsertParams ++ UpdateParams;
            NotSupported ->
                erlang:error({rdbms_not_supported, NotSupported})
        end,
    mongoose_rdbms:execute(Host, Name, Params).
94
%% @doc Request variant of the prepared upsert `Name': it goes through
%% `mongoose_rdbms:execute_request/3' and returns a request id.
%% Parameters are ordered per backend: insert then update parameters for
%% MySQL and PostgreSQL, with the unique key values prepended for MSSQL
%% over ODBC. Any other engine/type pair raises
%% `{rdbms_not_supported, {Engine, Type}}'.
-spec request_upsert(Host :: mongoose_rdbms:server(),
                     Name :: atom(),
                     InsertParams :: [any()],
                     UpdateParams :: [any()],
                     UniqueKeyValues :: [any()]) -> gen_server:request_id().
request_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
    Params =
        case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of
            {Engine, _} when Engine =:= mysql; Engine =:= pgsql ->
                InsertParams ++ UpdateParams;
            {odbc, mssql} ->
                UniqueKeyValues ++ InsertParams ++ UpdateParams;
            NotSupported ->
                erlang:error({rdbms_not_supported, NotSupported})
        end,
    mongoose_rdbms:execute_request(Host, Name, Params).
110
111 %% @doc
112 %% This function prepares a query for inserting a row or updating it if it already exists
113 %% Updates can be either fields or literal expressions like "column = tab.column + 1"
-spec prepare_upsert(Host :: mongoose_rdbms:server(),
                     QueryName :: atom(),
                     TableName :: atom(),
                     InsertFields :: [binary()],
                     Updates :: [binary() | {assignment | expression, binary(), binary()}],
                     UniqueKeyFields :: [binary()]) ->
    {ok, QueryName :: atom()} | {error, already_exists}.
prepare_upsert(Host, QueryName, TableName, InsertFields, Updates, UniqueKeyFields) ->
    %% Delegates to prepare_upsert/7 with no incremental field.
    NoIncrementalField = none,
    prepare_upsert(Host, QueryName, TableName, InsertFields, Updates, UniqueKeyFields,
                   NoIncrementalField).
123
%% @doc Builds the backend-specific upsert SQL, flattens it to a binary,
%% logs it at debug level and registers it with `mongoose_rdbms:prepare/4'
%% under `Name'. The field list passed to `prepare/4' comes from
%% `prepared_upsert_fields/3'.
-spec prepare_upsert(Host :: mongoose_rdbms:server(),
                     QueryName :: atom(),
                     TableName :: atom(),
                     InsertFields :: [ColumnName :: binary()],
                     Updates :: [binary() | {assignment | expression, binary(), binary()}],
                     UniqueKeyFields :: [binary()],
                     IncrementalField :: none | binary()) ->
    {ok, QueryName :: atom()} | {error, already_exists}.
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
    Query = iolist_to_binary(
              upsert_query(Host, Table, InsertFields, Updates,
                           UniqueKeyFields, IncrementalField)),
    ?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}),
    mongoose_rdbms:prepare(Name, Table,
                           prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields),
                           Query).
138
%% Computes the ordered field list of the prepared upsert statement:
%% insert fields followed by the fields that `get_field_name/1' keeps from
%% `Updates'. For MSSQL the unique key fields are prepended.
prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) ->
    ParamFields = InsertFields ++ lists:filtermap(fun get_field_name/1, Updates),
    case mongoose_rdbms:db_type() of
        mssql -> UniqueKeyFields ++ ParamFields;
        _ -> ParamFields
    end.
146
%% Picks the upsert SQL builder that matches the configured backend.
%% `IncrementalField' is only passed to the MySQL and PostgreSQL builders.
%% Unsupported engine/type pairs raise `{rdbms_not_supported, {Engine, Type}}'.
upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
    Backend = {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()},
    case Backend of
        {mysql, _} ->
            upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields,
                               IncrementalField);
        {pgsql, _} ->
            upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields,
                               IncrementalField);
        {odbc, mssql} ->
            upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields);
        _ ->
            erlang:error({rdbms_not_supported, Backend})
    end.
157
%% Builds the `INSERT INTO Table (Cols) VALUES (?, ...)' prefix shared by
%% the MySQL and PostgreSQL upserts, with one `?' placeholder per column.
mysql_and_pgsql_insert(Table, Columns) ->
    ColumnList = join(Columns, <<", ">>),
    ValueList = join([$? || _ <- Columns], ", "),
    ["INSERT INTO ", atom_to_binary(Table, utf8), " (",
     ColumnList,
     ") VALUES (",
     ValueList,
     ")"].
166
%% MySQL upsert: the common INSERT prefix followed by an
%% `ON DUPLICATE KEY UPDATE' clause. Only the first unique key field is
%% forwarded to the conflict-clause builder.
upsert_mysql_query(Table, InsertFields, Updates, [FirstKey | _], IncrementalField) ->
    [mysql_and_pgsql_insert(Table, InsertFields),
     mysql_on_conflict(Table, Updates, FirstKey, IncrementalField)].
171
%% PostgreSQL upsert: the common INSERT prefix, an `ON CONFLICT' clause and,
%% when an incremental field is given, a trailing `WHERE' condition.
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
    [mysql_and_pgsql_insert(Table, InsertFields),
     pgsql_on_conflict(Updates, UniqueKeyFields),
     pgsql_ensure_increments(Table, IncrementalField)].
177
%% Builds the MySQL `ON DUPLICATE KEY UPDATE' tail of an upsert.
%%  - no updates: MySQL has no 'DO NOTHING', so the key column is assigned
%%    to itself as a no-op;
%%  - updates without an incremental field: plain assignments;
%%  - updates with an incremental field: the clause is prefixed with
%%    " AS alias" and every assignment is built by
%%    `mysql_fields_with_placeholders/3'.
mysql_on_conflict(_Table, [], Key, _IncrementalField) ->
    [" ON DUPLICATE KEY UPDATE ", Key, " = ", Key];
mysql_on_conflict(_Table, Updates, _Key, none) ->
    [" ON DUPLICATE KEY UPDATE ", update_fields_on_conflict(Updates)];
mysql_on_conflict(Table, Updates, _Key, IncrementalField) ->
    TableName = atom_to_list(Table),
    GuardedUpdates =
        lists:map(fun(Update) ->
                          mysql_fields_with_placeholders(TableName, Update, IncrementalField)
                  end, Updates),
    [" AS alias ON DUPLICATE KEY UPDATE ", join(GuardedUpdates, ", ")].
190
%% Builds one MySQL assignment of the form
%%   Column = IF(Table.Incr < alias.Incr, NewValue, Table.Column)
%% where NewValue is the given expression for `{assignment | expression, _, _}'
%% updates and a `?' placeholder for any other update term.
mysql_fields_with_placeholders(TableName, UpdateField, IncrementalField) ->
    {Column, Alternatives} =
        case UpdateField of
            {Op, Col, Expression} when Op =:= assignment; Op =:= expression ->
                {Col, [Expression, ", ", TableName, ".", Col, ")"]};
            Col ->
                {Col, ["? , ", TableName, ".", Col, ")"]}
        end,
    Condition = [Column, " = IF(", TableName, ".", IncrementalField,
                 " < alias.", IncrementalField, ", "],
    %% Both parts are proper lists, so appending keeps the flat list shape.
    Condition ++ Alternatives.
200
%% Builds the PostgreSQL `ON CONFLICT (Keys)' clause: `DO NOTHING' when
%% there is nothing to update, otherwise `DO UPDATE SET' with the update
%% assignments.
pgsql_on_conflict([], UniqueKeyFields) ->
    ConflictTarget = join(UniqueKeyFields, ", "),
    [" ON CONFLICT (", ConflictTarget, ") DO NOTHING"];
pgsql_on_conflict(Updates, UniqueKeyFields) ->
    ConflictTarget = join(UniqueKeyFields, ", "),
    Assignments = update_fields_on_conflict(Updates),
    [" ON CONFLICT (", ConflictTarget, ") DO UPDATE SET ", Assignments].
209
%% Renders every update as a "column = value" assignment and joins them
%% with ", ".
update_fields_on_conflict(Updates) ->
    join(lists:map(fun get_field_expression/1, Updates), ", ").
213
%% Builds the optional PostgreSQL `WHERE' condition
%% "Table.Field < EXCLUDED.Field". Returns `[]' when no incremental field
%% is configured.
pgsql_ensure_increments(_Table, none) ->
    [];
pgsql_ensure_increments(Table, IncrementalField) ->
    [" WHERE ", atom_to_list(Table), ".", IncrementalField,
     " < EXCLUDED.", IncrementalField].
219
%% Builds the MSSQL upsert as a `MERGE' statement. The unique key values
%% are selected as a one-row `source' table, matched against the target on
%% every unique key column, and the row is inserted when not matched.
%% `mssql_on_conflict/1' supplies the optional `WHEN MATCHED' part and the
%% terminating ";".
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) ->
    TableBin = atom_to_binary(Table, utf8),
    SourceColumns = lists:map(fun(Key) -> [" ? AS ", Key] end, UniqueKeyFields),
    MatchConditions = lists:map(fun(Key) -> [TableBin, ".", Key, " = source.", Key] end,
                                UniqueKeyFields),
    InsertColumns = join(InsertFields, ", "),
    InsertPlaceholders = [$? || _ <- InsertFields],
    ["MERGE INTO ", TableBin, " WITH (SERIALIZABLE)"
     " USING (SELECT ", join(SourceColumns, ", "), ")"
     " AS source (", join(UniqueKeyFields, ", "), ")"
     " ON (", join(MatchConditions, " AND "), ")"
     " WHEN NOT MATCHED THEN INSERT"
     " (", InsertColumns, ")"
     " VALUES (", join(InsertPlaceholders, ", "), ")" | mssql_on_conflict(Updates)].
233
%% Tail of the MSSQL `MERGE' statement: just the terminating ";" when there
%% is nothing to update, otherwise a `WHEN MATCHED THEN UPDATE SET' clause
%% followed by ";".
mssql_on_conflict([]) ->
    ";";
mssql_on_conflict(Updates) ->
    Assignments = update_fields_on_conflict(Updates),
    [" WHEN MATCHED THEN UPDATE SET ", Assignments, ";"].
237
%% Renders one update as SQL. Tagged updates become "Column = Expr";
%% a bare column name becomes "Column = ?".
get_field_expression({assignment, ColumnName, Expr}) ->
    [ColumnName, " = ", Expr];
get_field_expression({expression, ColumnName, Expr}) ->
    [ColumnName, " = ", Expr];
get_field_expression(Field) when is_binary(Field) ->
    [Field, " = ?"].
242
%% `lists:filtermap/2' callback used to build the prepared field list:
%%  - a bare column name is kept as is (`true');
%%  - an `expression' update contributes its column name;
%%  - an `assignment' update contributes no field.
%% NOTE(review): presumably assignments carry no `?' placeholder while
%% expressions do - confirm against the callers.
get_field_name(Field) when is_binary(Field) ->
    true;
get_field_name({expression, Field, _Expr}) when is_binary(Field) ->
    {true, Field};
get_field_name({assignment, Field, _Expr}) when is_binary(Field) ->
    false.
249
250 %% F can be either a fun or a list of queries
251 %% TODO: We should probably move the list of queries transaction
252 %% wrapper from the mongoose_rdbms module to this one (rdbms_queries)
sql_transaction(LServer, FunOrQueries) ->
    %% Thin pass-through; the transaction handling lives in mongoose_rdbms.
    mongoose_rdbms:sql_transaction(LServer, FunOrQueries).
255
%% @doc Returns the statement(s) that open a transaction on the configured
%% database type.
begin_trans() ->
    begin_trans(mongoose_rdbms:db_type()).
258
%% MSSQL delegates to its own module; every other database type uses a
%% plain "BEGIN;".
begin_trans(mssql) ->
    rdbms_queries_mssql:begin_trans();
begin_trans(_OtherType) ->
    [<<"BEGIN;">>].
263
264 %% Count number of records in a table given a where clause
count_records_where(LServer, Table, WhereClause) ->
    %% NOTE: Table and WhereClause are spliced into the SQL text verbatim,
    %% so callers must only pass trusted, already-escaped values.
    Query = [<<"select count(*) from ">>, Table, " ", WhereClause, ";"],
    mongoose_rdbms:sql_query(LServer, Query).
269
%% @doc Builds a multi-row `INSERT INTO Table (Fields) VALUES (?, ...), ...;'
%% statement with `RowsNum' placeholder groups.
%%
%% `Table' and each element of `Fields' may be given as an atom or as
%% iodata; atoms are converted to UTF-8 binaries. Every field is converted
%% individually, so lists mixing atoms and iodata are handled as well.
%%
%% Returns `{Sql, Fields2}', where `Sql' is iodata and `Fields2' is the
%% normalised field list repeated `RowsNum' times (one entry per
%% placeholder, in placeholder order).
%%
%% Fails with `function_clause' unless `RowsNum > 0'.
-spec create_bulk_insert_query(Table :: iodata() | atom(), Fields :: [iodata() | atom()],
                               RowsNum :: pos_integer()) ->
    {iodata(), [binary()]}.
create_bulk_insert_query(Table, Fields, RowsNum) when is_atom(Table) ->
    create_bulk_insert_query(atom_to_binary(Table, utf8), Fields, RowsNum);
create_bulk_insert_query(Table, Fields, RowsNum) when RowsNum > 0 ->
    Columns = [bulk_insert_field(Field) || Field <- Fields],
    JoinedFields = join(Columns, <<", ">>),
    Placeholders = lists:duplicate(length(Columns), <<"?">>),
    PlaceholderSet = [<<"(">>, join(Placeholders, <<", ">>), <<")">>],
    PlaceholderSets = lists:duplicate(RowsNum, PlaceholderSet),
    JoinedPlaceholderSets = join(PlaceholderSets, <<", ">>),
    Sql = [<<"INSERT INTO ">>, Table, <<" (">>, JoinedFields, <<") "
           "VALUES ">>, JoinedPlaceholderSets, <<";">>],
    Fields2 = lists:append(lists:duplicate(RowsNum, Columns)),
    {Sql, Fields2}.

%% Converts an atom field name to a UTF-8 binary; iodata is passed through.
bulk_insert_field(Field) when is_atom(Field) ->
    atom_to_binary(Field, utf8);
bulk_insert_field(Field) ->
    Field.
287
%% @doc Returns the limit fragments for use in prepared statements: the
%% limit value is a `?' placeholder, which is wrapped in parentheses for MSSQL.
get_db_specific_limits() ->
    Placeholder = "?",
    Wrap = true,
    do_get_db_specific_limits(mongoose_rdbms:db_type(), Placeholder, Wrap).
290
%% @doc Same as `get_db_specific_limits/0', with both fragments converted
%% from strings to binaries.
get_db_specific_limits_binaries() ->
    {GenericLimit, MssqlLimit} = get_db_specific_limits(),
    GenericBin = list_to_binary(GenericLimit),
    MssqlBin = list_to_binary(MssqlLimit),
    {GenericBin, MssqlBin}.
294
%% @doc Returns the limit fragments with the literal `Limit' value embedded.
%% Exactly one element of the pair is non-empty: the first ("LIMIT N") for
%% non-MSSQL databases, the second ("TOP N") for MSSQL.
-spec get_db_specific_limits(integer())
        -> {SQL :: nonempty_string(), []} | {[], MSSQL::nonempty_string()}.
get_db_specific_limits(Limit) ->
    LimitStr = integer_to_list(Limit),
    Wrap = false,
    do_get_db_specific_limits(mongoose_rdbms:db_type(), LimitStr, Wrap).
300
%% @doc Returns the OFFSET fragment for the configured database type, with
%% the literal values embedded. `Limit' is only used by the MSSQL variant.
-spec get_db_specific_offset(integer(), integer()) -> iolist().
get_db_specific_offset(Offset, Limit) ->
    DbType = mongoose_rdbms:db_type(),
    OffsetStr = integer_to_list(Offset),
    LimitStr = integer_to_list(Limit),
    do_get_db_specific_offset(DbType, OffsetStr, LimitStr).
304
305
306 %% Arguments:
307 %% - Type (atom) - database type
308 %% - LimitStr (string) - a field value
309 %% - Wrap (boolean) - add parentheses around a field for MSSQL
%% Returns `{GenericFragment, MssqlFragment}'; the fragment that does not
%% apply to the given database type is the empty string.
do_get_db_specific_limits(mssql, LimitStr, false) ->
    {"", lists:append("TOP ", LimitStr)};
do_get_db_specific_limits(mssql, LimitStr, true) ->
    {"", lists:append(["TOP (", LimitStr, ")"])};
do_get_db_specific_limits(_Type, LimitStr, _Wrap) ->
    {lists:append("LIMIT ", LimitStr), ""}.
316
%% MSSQL uses "OFFSET n ROWS FETCH NEXT m ROWS ONLY"; every other database
%% type gets a bare "OFFSET n" and the limit is ignored here.
do_get_db_specific_offset(mssql, OffsetStr, LimitStr) ->
    [" OFFSET ", OffsetStr, " ROWS FETCH NEXT ", LimitStr, " ROWS ONLY"];
do_get_db_specific_offset(_Type, OffsetStr, _LimitStr) ->
    [" OFFSET ", OffsetStr].
322
%% @doc Inserts `Limit' into the query argument list at the position the
%% configured database type expects.
add_limit_arg(Limit, Args) ->
    add_limit_arg(mongoose_rdbms:db_type(), Limit, Args).
325
%% For MSSQL the limit becomes the first argument; for every other
%% database type it becomes the last one.
add_limit_arg(mssql, Limit, Args) ->
    [Limit | Args];
add_limit_arg(_Type, Limit, Args) ->
    lists:append(Args, [Limit]).
330
%% @doc Same as `get_db_specific_limits/1', with both fragments converted
%% from strings to binaries.
get_db_specific_limits_binaries(Limit) ->
    {GenericLimit, MssqlLimit} = get_db_specific_limits(Limit),
    GenericBin = list_to_binary(GenericLimit),
    MssqlBin = list_to_binary(MssqlLimit),
    {GenericBin, MssqlBin}.
334
%% @doc Returns the limit/offset SQL suffix, with `?' placeholders, for the
%% configured database type. Pair it with `limit_offset_args/2' to get the
%% arguments in the matching order.
limit_offset_sql() ->
    limit_offset_sql(mongoose_rdbms:db_type()).
337
%% The placeholder order differs: MSSQL takes offset then limit, every
%% other database type takes limit then offset.
limit_offset_sql(mssql) ->
    <<" OFFSET (?) ROWS FETCH NEXT (?) ROWS ONLY">>;
limit_offset_sql(_Type) ->
    <<" LIMIT ? OFFSET ?">>.
342
%% @doc Returns `Limit' and `Offset' ordered to match the placeholders
%% produced by `limit_offset_sql/0' for the configured database type.
limit_offset_args(Limit, Offset) ->
    limit_offset_args(mongoose_rdbms:db_type(), Limit, Offset).
345
%% MSSQL expects the offset before the limit; every other database type
%% expects the limit first.
limit_offset_args(mssql, Limit, Offset) ->
    [Offset, Limit];
limit_offset_args(_Type, Limit, Offset) ->
    [Limit, Offset].
Line Hits Source