./ct_report/coverage/rdbms_queries.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : rdbms_queries.erl
3 %%% Author : Mickael Remond <mremond@process-one.net>
4 %%% Purpose : RDBMS queries dependind on back-end
5 %%% Created : by Mickael Remond <mremond@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(rdbms_queries).
27 -author("mremond@process-one.net").
28
29 -export([get_db_type/0,
30 begin_trans/0,
31 get_db_specific_limits/0,
32 get_db_specific_limits/1,
33 get_db_specific_limits_binaries/0,
34 get_db_specific_limits_binaries/1,
35 get_db_specific_offset/2,
36 add_limit_arg/2,
37 limit_offset_sql/0,
38 limit_offset_args/2,
39 sql_transaction/2,
40 count_records_where/3,
41 create_bulk_insert_query/3]).
42
43 -export([join/2,
44 prepare_upsert/6,
45 execute_upsert/5,
46 request_upsert/5]).
47
48 -ignore_xref([
49 request_upsert/5, count_records_where/3,
50 get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0
51 ]).
52
53 %% We have only two compile time options for db queries:
54 %%-define(generic, true).
55 %%-define(mssql, true).
56 -ifndef(mssql).
57 -undef(generic).
58 -define(generic, true).
59 -endif.
60
61 -define(RDBMS_TYPE, (mongoose_rdbms:db_type())).
62
63 -include("mongoose.hrl").
64
65 %
66 %% -----------------
67 %% Common functions
68
69 join([], _Sep) ->
70
:-(
[];
71 join([H|T], Sep) ->
72
:-(
[H, [[Sep, X] || X <- T]].
73
74 %% -----------------
75 %% Generic queries
76
77 get_db_type() ->
78
:-(
?RDBMS_TYPE.
79
80 -spec execute_upsert(Host :: mongoose_rdbms:server(),
81 Name :: atom(),
82 InsertParams :: [any()],
83 UpdateParams :: [any()],
84 UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result().
85 execute_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
86
:-(
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of
87 {mysql, _} ->
88
:-(
mongoose_rdbms:execute(Host, Name, InsertParams ++ UpdateParams);
89 {pgsql, _} ->
90
:-(
mongoose_rdbms:execute(Host, Name, InsertParams ++ UpdateParams);
91 {odbc, mssql} ->
92
:-(
mongoose_rdbms:execute(Host, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams);
93
:-(
NotSupported -> erlang:error({rdbms_not_supported, NotSupported})
94 end.
95
96 -spec request_upsert(Host :: mongoose_rdbms:server(),
97 Name :: atom(),
98 InsertParams :: [any()],
99 UpdateParams :: [any()],
100 UniqueKeyValues :: [any()]) -> reference().
101 request_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
102
:-(
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of
103 {mysql, _} ->
104
:-(
mongoose_rdbms:execute_request(Host, Name, InsertParams ++ UpdateParams);
105 {pgsql, _} ->
106
:-(
mongoose_rdbms:execute_request(Host, Name, InsertParams ++ UpdateParams);
107 {odbc, mssql} ->
108
:-(
mongoose_rdbms:execute_request(Host, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams);
109
:-(
NotSupported -> erlang:error({rdbms_not_supported, NotSupported})
110 end.
111
112 %% @doc
113 %% This functions prepares query for inserting a row or updating it if already exists
114 %% Updates can be either fields or literal expressions like "column = tab.column + 1"
115 -spec prepare_upsert(Host :: mongoose_rdbms:server(),
116 QueryName :: atom(),
117 TableName :: atom(),
118 InsertFields :: [binary()],
119 Updates :: [binary() | {binary(), binary()}],
120 UniqueKeyFields :: [binary()]) ->
121 {ok, QueryName :: atom()} | {error, already_exists}.
122 prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields) ->
123
:-(
SQL = upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields),
124
:-(
Query = iolist_to_binary(SQL),
125
:-(
?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}),
126
:-(
Fields = prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields),
127
:-(
mongoose_rdbms:prepare(Name, Table, Fields, Query).
128
129 prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) ->
130
:-(
UpdateFields = lists:filtermap(fun get_field_name/1, Updates),
131
:-(
case mongoose_rdbms:db_type() of
132 mssql ->
133
:-(
UniqueKeyFields ++ InsertFields ++ UpdateFields;
134
:-(
_ -> InsertFields ++ UpdateFields
135 end.
136
137 upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields) ->
138
:-(
case {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()} of
139 {mysql, _} ->
140
:-(
upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields);
141 {pgsql, _} ->
142
:-(
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields);
143 {odbc, mssql} ->
144
:-(
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields);
145
:-(
NotSupported -> erlang:error({rdbms_not_supported, NotSupported})
146 end.
147
148 mysql_and_pgsql_insert(Table, Columns) ->
149
:-(
JoinedFields = join(Columns, <<", ">>),
150
:-(
Placeholders = lists:duplicate(length(Columns), $?),
151
:-(
["INSERT INTO ", atom_to_binary(Table, utf8), " (",
152 JoinedFields,
153 ") VALUES (",
154 join(Placeholders, ", "),
155 ")"].
156
157 upsert_mysql_query(Table, InsertFields, Updates, [Key | _]) ->
158
:-(
Insert = mysql_and_pgsql_insert(Table, InsertFields),
159
:-(
OnConflict = mysql_on_conflict(Updates, Key),
160
:-(
[Insert, OnConflict].
161
162 upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields) ->
163
:-(
Insert = mysql_and_pgsql_insert(Table, InsertFields),
164
:-(
OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields),
165
:-(
[Insert, OnConflict].
166
167 mysql_on_conflict([], Key) ->
168 %% Update field to itself (no-op), there is no 'DO NOTHING' in MySQL
169
:-(
[" ON DUPLICATE KEY UPDATE ", Key, " = ", Key];
170 mysql_on_conflict(UpdateFields, _) ->
171
:-(
[" ON DUPLICATE KEY UPDATE ",
172 update_fields_on_conflict(UpdateFields)].
173
174 pgsql_on_conflict([], UniqueKeyFields) ->
175
:-(
JoinedKeys = join(UniqueKeyFields, ", "),
176
:-(
[" ON CONFLICT (", JoinedKeys, ") DO NOTHING"];
177 pgsql_on_conflict(UpdateFields, UniqueKeyFields) ->
178
:-(
JoinedKeys = join(UniqueKeyFields, ", "),
179
:-(
[" ON CONFLICT (", JoinedKeys, ")"
180 " DO UPDATE SET ",
181 update_fields_on_conflict(UpdateFields)].
182
183 update_fields_on_conflict(Updates) ->
184
:-(
FieldsWithPlaceHolders = [update_field_expression(Update) || Update <- Updates],
185
:-(
join(FieldsWithPlaceHolders, ", ").
186
187 upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) ->
188
:-(
UniqueKeysInSelect = [[" ? AS ", Key] || Key <- UniqueKeyFields],
189
:-(
BinTab = atom_to_binary(Table, utf8),
190
:-(
UniqueConstraint = [[BinTab, ".", Key, " = source.", Key] || Key <- UniqueKeyFields],
191
:-(
JoinedInsertFields = join(InsertFields, ", "),
192
:-(
Placeholders = lists:duplicate(length(InsertFields), $?),
193
:-(
["MERGE INTO ", BinTab, " WITH (SERIALIZABLE)"
194 " USING (SELECT ", join(UniqueKeysInSelect, ", "), ")"
195 " AS source (", join(UniqueKeyFields, ", "), ")"
196 " ON (", join(UniqueConstraint, " AND "), ")"
197 " WHEN NOT MATCHED THEN INSERT"
198 " (", JoinedInsertFields, ")"
199 " VALUES (", join(Placeholders, ", "), ")" | mssql_on_conflict(Updates)].
200
201
:-(
mssql_on_conflict([]) -> ";";
202 mssql_on_conflict(Updates) ->
203
:-(
[" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"].
204
205 update_field_expression(Update) ->
206
:-(
case get_field_expression(Update) of
207
:-(
{true, Expr} -> Expr;
208
:-(
true -> [Update, " = ?"];
209
:-(
false -> Update
210 end.
211
212 get_field_expression({_, FieldExpr}) ->
213
:-(
case binary:match(FieldExpr, <<"=">>) of
214
:-(
nomatch -> false;
215
:-(
_ -> {true, FieldExpr}
216 end;
217 get_field_expression(FieldExpr) ->
218
:-(
binary:match(FieldExpr, <<"=">>) =:= nomatch.
219
220 get_field_name({Field, _}) ->
221
:-(
case binary:match(Field, <<"=">>) of
222
:-(
nomatch -> {true, Field};
223
:-(
_ -> false
224 end;
225 get_field_name(FieldExpr) ->
226
:-(
binary:match(FieldExpr, <<"=">>) =:= nomatch.
227
228 %% F can be either a fun or a list of queries
229 %% TODO: We should probably move the list of queries transaction
230 %% wrapper from the mongoose_rdbms module to this one (rdbms_queries)
231 sql_transaction(LServer, F) ->
232
:-(
mongoose_rdbms:sql_transaction(LServer, F).
233
234 begin_trans() ->
235
:-(
begin_trans(?RDBMS_TYPE).
236
237 begin_trans(mssql) ->
238
:-(
rdbms_queries_mssql:begin_trans();
239 begin_trans(_) ->
240
:-(
[<<"BEGIN;">>].
241
242 %% Count number of records in a table given a where clause
243 count_records_where(LServer, Table, WhereClause) ->
244
:-(
mongoose_rdbms:sql_query(
245 LServer,
246 [<<"select count(*) from ">>, Table, " ", WhereClause, ";"]).
247
248 -spec create_bulk_insert_query(Table :: iodata() | atom(), Fields :: [iodata() | atom()],
249 RowsNum :: pos_integer()) ->
250 {iodata(), [binary()]}.
251 create_bulk_insert_query(Table, Fields, RowsNum) when is_atom(Table) ->
252
:-(
create_bulk_insert_query(atom_to_binary(Table, utf8), Fields, RowsNum);
253 create_bulk_insert_query(Table, [Field | _] = Fields, RowsNum) when is_atom(Field) ->
254
:-(
create_bulk_insert_query(Table, [atom_to_binary(F, utf8) || F <- Fields], RowsNum);
255 create_bulk_insert_query(Table, Fields, RowsNum) when RowsNum > 0 ->
256
:-(
JoinedFields = join(Fields, <<", ">>),
257
:-(
Placeholders = lists:duplicate(length(Fields), <<"?">>),
258
:-(
PlaceholderSet = [<<"(">>, join(Placeholders, <<", ">>), <<")">>],
259
:-(
PlaceholderSets = lists:duplicate(RowsNum, PlaceholderSet),
260
:-(
JoinedPlaceholderSets = join(PlaceholderSets, <<", ">>),
261
:-(
Sql = [<<"INSERT INTO ">>, Table, <<" (">>, JoinedFields, <<") "
262 "VALUES ">>, JoinedPlaceholderSets, <<";">>],
263
:-(
Fields2 = lists:append(lists:duplicate(RowsNum, Fields)),
264
:-(
{Sql, Fields2}.
265
266 get_db_specific_limits() ->
267
:-(
do_get_db_specific_limits(?RDBMS_TYPE, "?", true).
268
269 get_db_specific_limits_binaries() ->
270
:-(
{LimitSQL, LimitMSSQL} = get_db_specific_limits(),
271
:-(
{list_to_binary(LimitSQL), list_to_binary(LimitMSSQL)}.
272
273 -spec get_db_specific_limits(integer())
274 -> {SQL :: nonempty_string(), []} | {[], MSSQL::nonempty_string()}.
275 get_db_specific_limits(Limit) ->
276
:-(
LimitStr = integer_to_list(Limit),
277
:-(
do_get_db_specific_limits(?RDBMS_TYPE, LimitStr, false).
278
279 -spec get_db_specific_offset(integer(), integer()) -> iolist().
280 get_db_specific_offset(Offset, Limit) ->
281
:-(
do_get_db_specific_offset(?RDBMS_TYPE, integer_to_list(Offset), integer_to_list(Limit)).
282
283
284 %% Arguments:
285 %% - Type (atom) - database type
286 %% - LimitStr (string) - a field value
287 %% - Wrap (boolean) - add parentheses around a field for MSSQL
288 do_get_db_specific_limits(mssql, LimitStr, _Wrap = false) ->
289
:-(
{"", "TOP " ++ LimitStr};
290 do_get_db_specific_limits(mssql, LimitStr, _Wrap = true) ->
291
:-(
{"", "TOP (" ++ LimitStr ++ ")"};
292 do_get_db_specific_limits(_, LimitStr, _Wrap) ->
293
:-(
{"LIMIT " ++ LimitStr, ""}.
294
295 do_get_db_specific_offset(mssql, Offset, Limit) ->
296
:-(
[" OFFSET ", Offset, " ROWS"
297 " FETCH NEXT ", Limit, " ROWS ONLY"];
298 do_get_db_specific_offset(_, Offset, _Limit) ->
299
:-(
[" OFFSET ", Offset].
300
301 add_limit_arg(Limit, Args) ->
302
:-(
add_limit_arg(?RDBMS_TYPE, Limit, Args).
303
304 add_limit_arg(mssql, Limit, Args) ->
305
:-(
[Limit|Args];
306 add_limit_arg(_, Limit, Args) ->
307
:-(
Args ++ [Limit].
308
309 get_db_specific_limits_binaries(Limit) ->
310
:-(
{LimitSQL, LimitMSSQL} = get_db_specific_limits(Limit),
311
:-(
{list_to_binary(LimitSQL), list_to_binary(LimitMSSQL)}.
312
313 limit_offset_sql() ->
314
:-(
limit_offset_sql(?RDBMS_TYPE).
315
316 limit_offset_sql(mssql) ->
317
:-(
<<" OFFSET (?) ROWS FETCH NEXT (?) ROWS ONLY">>;
318 limit_offset_sql(_) ->
319
:-(
<<" LIMIT ? OFFSET ?">>.
320
321 limit_offset_args(Limit, Offset) ->
322
:-(
limit_offset_args(?RDBMS_TYPE, Limit, Offset).
323
324
:-(
limit_offset_args(mssql, Limit, Offset) -> [Offset, Limit];
325
:-(
limit_offset_args(_, Limit, Offset) -> [Limit, Offset].
Line Hits Source