./ct_report/coverage/rdbms_queries.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : rdbms_queries.erl
3 %%% Author : Mickael Remond <mremond@process-one.net>
4 %%% Purpose : RDBMS queries depending on back-end
5 %%% Created : by Mickael Remond <mremond@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(rdbms_queries).
27 -author("mremond@process-one.net").
28
29 -export([get_db_type/0,
30 begin_trans/0,
31 get_db_specific_limits/0,
32 get_db_specific_limits/1,
33 get_db_specific_limits_binaries/0,
34 get_db_specific_limits_binaries/1,
35 get_db_specific_offset/2,
36 add_limit_arg/2,
37 limit_offset_sql/0,
38 limit_offset_args/2,
39 sql_transaction/2,
40 count_records_where/3,
41 create_bulk_insert_query/3]).
42
43 -export([join/2,
44 prepare_upsert/6,
45 prepare_upsert/7,
46 execute_upsert/5,
47 request_upsert/5]).
48
49 -ignore_xref([
50 count_records_where/3, get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0
51 ]).
52
53 %% We have only two compile time options for db queries:
54 %%-define(generic, true).
55 %%-define(mssql, true).
56 -ifndef(mssql).
57 -undef(generic).
58 -endif.
59
60 -define(RDBMS_TYPE, (mongoose_rdbms:db_type())).
61
62 -include("mongoose.hrl").
63
64 -ifdef(gen_server_request_id).
65 -type request_id() :: gen_server:request_id().
66 -else.
67 -type request_id() :: term().
68 -endif.
69
70 %
71 %% -----------------
72 %% Common functions
73
%% Intersperses Sep between the elements of a list.
%% An empty list yields `[]'; otherwise the result is the nested list
%% `[First, [[Sep, Second], [Sep, Third], ...]]'.
join([], _Sep) ->
    [];
join([First | Rest], Sep) ->
    Separated = lists:map(fun(Elem) -> [Sep, Elem] end, Rest),
    [First, Separated].
78
79 %% -----------------
80 %% Generic queries
81
%% Returns the database type obtained through the ?RDBMS_TYPE macro
%% (i.e. mongoose_rdbms:db_type()).
get_db_type() ->
    DbType = ?RDBMS_TYPE,
    DbType.
84
%% @doc Executes the prepared upsert statement `Name' via mongoose_rdbms:execute/3.
%% The parameter list is assembled according to the backend:
%% for MySQL and PostgreSQL it is the insert parameters followed by the update
%% parameters; for MSSQL over ODBC the unique key values are put in front of them.
%% Any other `{Engine, Type}' pair raises `{rdbms_not_supported, {Engine, Type}}'.
-spec execute_upsert(Host :: mongoose_rdbms:server(),
                     Name :: atom(),
                     InsertParams :: [any()],
                     UpdateParams :: [any()],
                     UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result().
execute_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
    Backend = {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()},
    Params = case Backend of
                 {mysql, _} ->
                     InsertParams ++ UpdateParams;
                 {pgsql, _} ->
                     InsertParams ++ UpdateParams;
                 {odbc, mssql} ->
                     UniqueKeyValues ++ InsertParams ++ UpdateParams;
                 _ ->
                     erlang:error({rdbms_not_supported, Backend})
             end,
    mongoose_rdbms:execute(Host, Name, Params).
100
%% @doc Same parameter handling as execute_upsert/5, but the statement is submitted
%% through mongoose_rdbms:execute_request/3 and its result (a request id according
%% to the spec) is returned.
%% MySQL and PostgreSQL receive insert parameters followed by update parameters;
%% MSSQL over ODBC receives the unique key values first.
%% Any other `{Engine, Type}' pair raises `{rdbms_not_supported, {Engine, Type}}'.
-spec request_upsert(Host :: mongoose_rdbms:server(),
                     Name :: atom(),
                     InsertParams :: [any()],
                     UpdateParams :: [any()],
                     UniqueKeyValues :: [any()]) -> request_id().
request_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
    Backend = {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()},
    Params = case Backend of
                 {mysql, _} ->
                     InsertParams ++ UpdateParams;
                 {pgsql, _} ->
                     InsertParams ++ UpdateParams;
                 {odbc, mssql} ->
                     UniqueKeyValues ++ InsertParams ++ UpdateParams;
                 _ ->
                     erlang:error({rdbms_not_supported, Backend})
             end,
    mongoose_rdbms:execute_request(Host, Name, Params).
116
%% @doc
%% Prepares a query that inserts a row, or updates it if it already exists.
%% Updates can be either fields or literal expressions like "column = tab.column + 1".
%% Delegates to prepare_upsert/7 with the incremental field set to `none'.
-spec prepare_upsert(Host :: mongoose_rdbms:server(),
                     QueryName :: atom(),
                     TableName :: atom(),
                     InsertFields :: [binary()],
                     Updates :: [binary() | {assignment | expression, binary(), binary()}],
                     UniqueKeyFields :: [binary()]) ->
    {ok, QueryName :: atom()} | {error, already_exists}.
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields) ->
    NoIncrementalField = none,
    prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields,
                   NoIncrementalField).
129
%% @doc Builds the backend-specific upsert SQL, logs it at debug level and registers
%% it with mongoose_rdbms:prepare/4 under `Name'.
%% `IncrementalField' is either `none' or a column name that is forwarded to the
%% query builder (upsert_query/6).
-spec prepare_upsert(Host :: mongoose_rdbms:server(),
                     QueryName :: atom(),
                     TableName :: atom(),
                     InsertFields :: [ColumnName :: binary()],
                     Updates :: [binary() | {assignment | expression, binary(), binary()}],
                     UniqueKeyFields :: [binary()],
                     IncrementalField :: none | binary()) ->
    {ok, QueryName :: atom()} | {error, already_exists}.
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
    Query = iolist_to_binary(
              upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField)),
    ?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}),
    mongoose_rdbms:prepare(Name, Table,
                           prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields),
                           Query).
144
%% Lists the field names passed to mongoose_rdbms:prepare/4 for an upsert.
%% The update fields are those kept by get_field_name/1. When the database type is
%% mssql the unique key fields come first, followed by insert and update fields;
%% otherwise only insert and update fields are listed.
prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) ->
    UpdateFields = lists:filtermap(fun get_field_name/1, Updates),
    KeyPrefix = case mongoose_rdbms:db_type() of
                    mssql -> UniqueKeyFields;
                    _ -> []
                end,
    KeyPrefix ++ InsertFields ++ UpdateFields.
152
%% Selects the upsert query builder matching the `{Engine, Type}' pair reported by
%% mongoose_rdbms. The MSSQL builder does not receive the incremental field.
%% Unsupported pairs raise `{rdbms_not_supported, {Engine, Type}}'.
upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
    Backend = {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()},
    case Backend of
        {mysql, _} ->
            upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField);
        {pgsql, _} ->
            upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField);
        {odbc, mssql} ->
            upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields);
        _ ->
            erlang:error({rdbms_not_supported, Backend})
    end.
163
%% Builds the "INSERT INTO Table (Columns) VALUES (?, ...)" part shared by the MySQL
%% and PostgreSQL upserts, with one `?' placeholder per column.
mysql_and_pgsql_insert(Table, Columns) ->
    ColumnList = join(Columns, <<", ">>),
    QuestionMarks = lists:duplicate(length(Columns), $?),
    TableName = atom_to_binary(Table, utf8),
    ["INSERT INTO ", TableName, " (",
     ColumnList,
     ") VALUES (",
     join(QuestionMarks, ", "),
     ")"].
172
%% MySQL upsert: the common INSERT part followed by the ON DUPLICATE KEY clause.
%% Only the first unique key field is handed to mysql_on_conflict/4; an empty key
%% list does not match this clause.
upsert_mysql_query(Table, InsertFields, Updates, [FirstKey | _], IncrementalField) ->
    [mysql_and_pgsql_insert(Table, InsertFields),
     mysql_on_conflict(Table, Updates, FirstKey, IncrementalField)].
177
%% PostgreSQL upsert: the common INSERT part, the ON CONFLICT clause and, when an
%% incremental field is given, the trailing WHERE condition.
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
    [mysql_and_pgsql_insert(Table, InsertFields),
     pgsql_on_conflict(Updates, UniqueKeyFields),
     pgsql_ensure_increments(Table, IncrementalField)].
183
%% Builds the MySQL "ON DUPLICATE KEY UPDATE" clause.
%% - No updates: the key column is assigned to itself (a no-op), because there is
%%   no 'DO NOTHING' in MySQL.
%% - Updates without an incremental field: plain assignments.
%% - Updates with an incremental field: the inserted row is aliased as `alias' and
%%   every assignment is produced by mysql_fields_with_placeholders/3.
mysql_on_conflict(_Table, [], Key, _IncrementalField) ->
    [" ON DUPLICATE KEY UPDATE ", Key, " = ", Key];
mysql_on_conflict(_Table, UpdateFields, _Key, none) ->
    Assignments = update_fields_on_conflict(UpdateFields),
    [" ON DUPLICATE KEY UPDATE ", Assignments];
mysql_on_conflict(Table, UpdateFields, _Key, IncrementalField) ->
    TableName = atom_to_list(Table),
    ToAssignment = fun(Update) ->
                           mysql_fields_with_placeholders(TableName, Update, IncrementalField)
                   end,
    Assignments = join(lists:map(ToAssignment, UpdateFields), ", "),
    [" AS alias ON DUPLICATE KEY UPDATE ", Assignments].
196
%% Produces one MySQL assignment of the form
%%   Column = IF(Table.Incr < alias.Incr, NewValue, Table.Column)
%% where NewValue is the given expression for `{assignment | expression, Column, Expr}'
%% updates and a `?' placeholder for any other update term (taken as the column itself).
mysql_fields_with_placeholders(TableName, UpdateField, IncrementalField) ->
    {Column, NewValue} =
        case UpdateField of
            {Op, ExprColumn, Expression} when Op =:= assignment; Op =:= expression ->
                {ExprColumn, [Expression, ", "]};
            PlainColumn ->
                {PlainColumn, ["? , "]}
        end,
    KeepCurrent = [TableName, ".", Column, ")"],
    [Column, " = IF(", TableName, ".", IncrementalField, " < alias.", IncrementalField, ", "
     | NewValue ++ KeepCurrent].
206
%% Builds the PostgreSQL "ON CONFLICT (Keys)" clause: DO NOTHING when there are no
%% updates, otherwise DO UPDATE SET followed by the assignments.
pgsql_on_conflict(UpdateFields, UniqueKeyFields) ->
    ConflictTarget = join(UniqueKeyFields, ", "),
    case UpdateFields of
        [] ->
            [" ON CONFLICT (", ConflictTarget, ") DO NOTHING"];
        _ ->
            [" ON CONFLICT (", ConflictTarget, ")"
             " DO UPDATE SET ",
             update_fields_on_conflict(UpdateFields)]
    end.
215
%% Turns every update into its "column = value" form (see get_field_expression/1)
%% and joins the results with ", ".
update_fields_on_conflict(Updates) ->
    Assignments = lists:map(fun get_field_expression/1, Updates),
    join(Assignments, ", ").
219
%% Optional WHERE condition appended to the PostgreSQL upsert: with an incremental
%% field it yields " WHERE Table.Field < EXCLUDED.Field"; with `none' it yields `[]'.
pgsql_ensure_increments(Table, IncrementalField) ->
    case IncrementalField of
        none ->
            [];
        _ ->
            [" WHERE ", atom_to_list(Table), ".", IncrementalField,
             " < EXCLUDED.", IncrementalField]
    end.
225
%% Builds the MSSQL upsert as a MERGE statement.
%% The unique key values are selected as placeholders into `source', matched against
%% the table on every key column, and a row is inserted when nothing matches.
%% The statement is terminated by mssql_on_conflict/1.
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) ->
    TableName = atom_to_binary(Table, utf8),
    SourceColumns = join([[" ? AS ", Key] || Key <- UniqueKeyFields], ", "),
    SourceNames = join(UniqueKeyFields, ", "),
    MatchCondition = join([[TableName, ".", Key, " = source.", Key] || Key <- UniqueKeyFields],
                          " AND "),
    InsertColumns = join(InsertFields, ", "),
    InsertValues = join(lists:duplicate(length(InsertFields), $?), ", "),
    Merge = ["MERGE INTO ", TableName, " WITH (SERIALIZABLE)"
             " USING (SELECT ", SourceColumns, ")"
             " AS source (", SourceNames, ")"
             " ON (", MatchCondition, ")"
             " WHEN NOT MATCHED THEN INSERT"
             " (", InsertColumns, ")"
             " VALUES (", InsertValues, ")"],
    Merge ++ mssql_on_conflict(Updates).
239
%% Tail of the MSSQL MERGE statement: just the terminating ";" when there is nothing
%% to update, otherwise a "WHEN MATCHED THEN UPDATE SET ..." clause followed by ";".
mssql_on_conflict(Updates) ->
    case Updates of
        [] ->
            ";";
        _ ->
            [" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"]
    end.
243
%% Renders a single update as SQL: tagged `assignment'/`expression' tuples become
%% "Column = Expr", a bare binary column becomes "Column = ?".
get_field_expression({assignment, ColumnName, Expr}) ->
    [ColumnName, " = ", Expr];
get_field_expression({expression, ColumnName, Expr}) ->
    [ColumnName, " = ", Expr];
get_field_expression(<<_/binary>> = Field) ->
    [Field, " = ?"].
248
%% lists:filtermap/2 callback used when listing the fields of a prepared upsert:
%% `assignment' updates are dropped, `expression' updates contribute their column
%% name, and bare binary columns are kept as they are.
get_field_name({assignment, <<_/binary>>, _Expr}) ->
    false;
get_field_name({expression, <<_/binary>> = Column, _Expr}) ->
    {true, Column};
get_field_name(<<_/binary>>) ->
    true.
255
%% Thin wrapper around mongoose_rdbms:sql_transaction/2.
%% F is either a fun or a list of queries.
%% TODO: the wrapper handling a list of queries should probably live in this module
%% (rdbms_queries) rather than in mongoose_rdbms.
sql_transaction(LServer, F) ->
    Result = mongoose_rdbms:sql_transaction(LServer, F),
    Result.
261
%% Returns the statement(s) that open a transaction for the current database type.
begin_trans() ->
    DbType = ?RDBMS_TYPE,
    begin_trans(DbType).
264
%% MSSQL delegates to rdbms_queries_mssql:begin_trans/0; every other database type
%% uses a plain "BEGIN;".
begin_trans(DbType) ->
    case DbType of
        mssql -> rdbms_queries_mssql:begin_trans();
        _ -> [<<"BEGIN;">>]
    end.
269
%% Counts the records of Table that satisfy WhereClause.
%% Both arguments are spliced into the query text as-is, so they must already be
%% valid SQL fragments.
count_records_where(LServer, Table, WhereClause) ->
    Query = [<<"select count(*) from ">>, Table, " ", WhereClause, ";"],
    mongoose_rdbms:sql_query(LServer, Query).
275
%% @doc Builds a multi-row INSERT statement with `?' placeholders.
%%
%% `Table' and each element of `Fields' may be given as an atom or as iodata; atoms
%% are converted to UTF-8 binaries. Every field is normalised on its own, so lists
%% mixing atoms and iodata are handled.
%%
%% Returns `{Sql, FieldList}' where `Sql' is iodata of the form
%% "INSERT INTO Table (f1, f2) VALUES (?, ?), (?, ?);" containing `RowsNum'
%% placeholder groups, and `FieldList' is the normalised field list repeated
%% `RowsNum' times (one entry per placeholder).
%%
%% Fails with `function_clause' unless `RowsNum' is a positive integer.
-spec create_bulk_insert_query(Table :: iodata() | atom(), Fields :: [iodata() | atom()],
                               RowsNum :: pos_integer()) ->
    {iodata(), [binary()]}.
create_bulk_insert_query(Table, Fields, RowsNum) when is_atom(Table) ->
    create_bulk_insert_query(atom_to_binary(Table, utf8), Fields, RowsNum);
create_bulk_insert_query(Table, Fields, RowsNum) when is_integer(RowsNum), RowsNum > 0 ->
    %% Convert per element: inspecting only the head would crash on [atom, binary]
    %% and would leave raw atoms in the query for [binary, atom].
    Columns = [bulk_insert_field_name(Field) || Field <- Fields],
    Placeholders = [<<"?">> || _ <- Columns],
    PlaceholderSet = [<<"(">>, lists:join(<<", ">>, Placeholders), <<")">>],
    PlaceholderSets = lists:duplicate(RowsNum, PlaceholderSet),
    Sql = [<<"INSERT INTO ">>, Table, <<" (">>, lists:join(<<", ">>, Columns), <<") "
           "VALUES ">>, lists:join(<<", ">>, PlaceholderSets), <<";">>],
    {Sql, lists:append(lists:duplicate(RowsNum, Columns))}.

%% Converts an atom field name to a UTF-8 binary; iodata is passed through unchanged.
bulk_insert_field_name(Field) when is_atom(Field) ->
    atom_to_binary(Field, utf8);
bulk_insert_field_name(Field) ->
    Field.
293
%% Limit clause pair for the current database type, using a "?" placeholder as the
%% limit value and requesting the wrapped (parenthesised) MSSQL form.
get_db_specific_limits() ->
    DbType = ?RDBMS_TYPE,
    do_get_db_specific_limits(DbType, "?", true).
296
%% Same as get_db_specific_limits/0, with both elements converted to binaries.
get_db_specific_limits_binaries() ->
    {LimitSQL, LimitMSSQL} = get_db_specific_limits(),
    BinSQL = list_to_binary(LimitSQL),
    BinMSSQL = list_to_binary(LimitMSSQL),
    {BinSQL, BinMSSQL}.
300
%% Limit clause pair for the current database type with the literal Limit value
%% embedded in the text; the unwrapped MSSQL form is requested.
-spec get_db_specific_limits(integer())
        -> {SQL :: nonempty_string(), []} | {[], MSSQL::nonempty_string()}.
get_db_specific_limits(Limit) ->
    DbType = ?RDBMS_TYPE,
    do_get_db_specific_limits(DbType, integer_to_list(Limit), false).
306
%% Offset clause for the current database type with the literal Offset (and, where
%% the backend clause needs it, Limit) embedded in the text.
-spec get_db_specific_offset(integer(), integer()) -> iolist().
get_db_specific_offset(Offset, Limit) ->
    OffsetStr = integer_to_list(Offset),
    LimitStr = integer_to_list(Limit),
    do_get_db_specific_offset(?RDBMS_TYPE, OffsetStr, LimitStr).
310
311
%% Returns `{LimitSQL, LimitMSSQL}', exactly one of which is non-empty.
%% Arguments:
%% - Type (atom) - database type
%% - LimitStr (string) - the limit value (or a placeholder) to embed
%% - Wrap (boolean) - for MSSQL only: put parentheses around the value
do_get_db_specific_limits(mssql, LimitStr, false) ->
    Top = "TOP " ++ LimitStr,
    {"", Top};
do_get_db_specific_limits(mssql, LimitStr, true) ->
    Top = "TOP (" ++ LimitStr ++ ")",
    {"", Top};
do_get_db_specific_limits(_Type, LimitStr, _Wrap) ->
    Limit = "LIMIT " ++ LimitStr,
    {Limit, ""}.
322
%% Offset clause as an iolist: MSSQL uses "OFFSET n ROWS FETCH NEXT m ROWS ONLY",
%% every other type uses "OFFSET n" and ignores the limit.
do_get_db_specific_offset(Type, Offset, Limit) ->
    case Type of
        mssql ->
            [" OFFSET ", Offset, " ROWS"
             " FETCH NEXT ", Limit, " ROWS ONLY"];
        _ ->
            [" OFFSET ", Offset]
    end.
328
%% Adds the limit value to the query arguments at the position expected by the
%% current database type.
add_limit_arg(Limit, Args) ->
    DbType = ?RDBMS_TYPE,
    add_limit_arg(DbType, Limit, Args).
331
%% For MSSQL the limit goes in front of the other arguments; for every other type
%% it is appended at the end.
add_limit_arg(Type, Limit, Args) ->
    case Type of
        mssql -> [Limit | Args];
        _ -> Args ++ [Limit]
    end.
336
%% Same as get_db_specific_limits/1, with both elements converted to binaries.
get_db_specific_limits_binaries(Limit) ->
    {LimitSQL, LimitMSSQL} = get_db_specific_limits(Limit),
    BinSQL = list_to_binary(LimitSQL),
    BinMSSQL = list_to_binary(LimitMSSQL),
    {BinSQL, BinMSSQL}.
340
%% Parameterised limit/offset SQL fragment for the current database type.
limit_offset_sql() ->
    DbType = ?RDBMS_TYPE,
    limit_offset_sql(DbType).
343
%% MSSQL takes the offset placeholder first and the row count second; the other
%% types use "LIMIT ? OFFSET ?". The argument order must match limit_offset_args/3.
limit_offset_sql(Type) ->
    case Type of
        mssql -> <<" OFFSET (?) ROWS FETCH NEXT (?) ROWS ONLY">>;
        _ -> <<" LIMIT ? OFFSET ?">>
    end.
348
%% Orders the limit and offset values for the current database type.
limit_offset_args(Limit, Offset) ->
    DbType = ?RDBMS_TYPE,
    limit_offset_args(DbType, Limit, Offset).
351
%% Argument order matching the fragment from limit_offset_sql/1:
%% `[Offset, Limit]' for MSSQL, `[Limit, Offset]' for every other type.
limit_offset_args(Type, Limit, Offset) ->
    case Type of
        mssql -> [Offset, Limit];
        _ -> [Limit, Offset]
    end.
Line Hits Source