./ct_report/coverage/rdbms_queries.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : rdbms_queries.erl
3 %%% Author : Mickael Remond <mremond@process-one.net>
4 %%% Purpose : RDBMS queries dependind on back-end
5 %%% Created : by Mickael Remond <mremond@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(rdbms_queries).
27 -author("mremond@process-one.net").
28
29 -export([get_db_type/0,
30 begin_trans/0,
31 get_db_specific_limits/0,
32 get_db_specific_limits/1,
33 get_db_specific_limits_binaries/0,
34 get_db_specific_limits_binaries/1,
35 get_db_specific_offset/2,
36 add_limit_arg/2,
37 limit_offset_sql/0,
38 limit_offset_args/2,
39 sql_transaction/2,
40 count_records_where/3,
41 create_bulk_insert_query/3]).
42
43 -export([join/2,
44 prepare_upsert/6,
45 prepare_upsert/7,
46 execute_upsert/5, execute_upsert/6,
47 request_upsert/5]).
48
49 -ignore_xref([
50 execute_upsert/6, count_records_where/3,
51 get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0
52 ]).
53
54 %% We have only two compile time options for db queries:
55 %%-define(generic, true).
56 %%-define(mssql, true).
57 -ifndef(mssql).
58 -undef(generic).
59 -endif.
60
61 -define(RDBMS_TYPE, (mongoose_rdbms:db_type())).
62
63 -include("mongoose.hrl").
64
65 -ifdef(gen_server_request_id).
66 -type request_id() :: gen_server:request_id().
67 -else.
68 -type request_id() :: term().
69 -endif.
70
71 %
72 %% -----------------
73 %% Common functions
74
75 join([], _Sep) ->
76
:-(
[];
77 join([H|T], Sep) ->
78 7656 [H, [[Sep, X] || X <- T]].
79
80 %% -----------------
81 %% Generic queries
82
83 get_db_type() ->
84
:-(
?RDBMS_TYPE.
85
86 -spec execute_upsert(HostType :: mongooseim:host_type_or_global(),
87 Name :: atom(),
88 InsertParams :: [any()],
89 UpdateParams :: [any()],
90 UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result().
91 execute_upsert(HostType, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
92 1672 execute_upsert(HostType, default, Name, InsertParams, UpdateParams, UniqueKeyValues).
93
94 -spec execute_upsert(HostType :: mongooseim:host_type_or_global(),
95 PoolTag :: mongoose_wpool:tag(),
96 Name :: atom(),
97 InsertParams :: [any()],
98 UpdateParams :: [any()],
99 UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result().
100 execute_upsert(HostType, PoolTag, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
101 1675 case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of
102 {mysql, _} ->
103
:-(
mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams ++ UpdateParams);
104 {pgsql, _} ->
105 1675 mongoose_rdbms:execute(HostType, PoolTag, Name, InsertParams ++ UpdateParams);
106 {odbc, mssql} ->
107
:-(
mongoose_rdbms:execute(HostType, PoolTag, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams);
108
:-(
NotSupported -> erlang:error({rdbms_not_supported, NotSupported})
109 end.
110
111 -spec request_upsert(HostType :: mongooseim:host_type_or_global(),
112 Name :: atom(),
113 InsertParams :: [any()],
114 UpdateParams :: [any()],
115 UniqueKeyValues :: [any()]) -> request_id().
116 request_upsert(HostType, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
117 479 case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of
118 {mysql, _} ->
119
:-(
mongoose_rdbms:execute_request(HostType, Name, InsertParams ++ UpdateParams);
120 {pgsql, _} ->
121 479 mongoose_rdbms:execute_request(HostType, Name, InsertParams ++ UpdateParams);
122 {odbc, mssql} ->
123
:-(
mongoose_rdbms:execute_request(HostType, Name, UniqueKeyValues ++ InsertParams ++ UpdateParams);
124
:-(
NotSupported -> erlang:error({rdbms_not_supported, NotSupported})
125 end.
126
127 %% @doc
128 %% This functions prepares query for inserting a row or updating it if already exists
129 %% Updates can be either fields or literal expressions like "column = tab.column + 1"
130 -spec prepare_upsert(HostType :: mongooseim:host_type_or_global(),
131 QueryName :: mongoose_rdbms:query_name(),
132 TableName :: atom(),
133 InsertFields :: [binary()],
134 Updates :: [binary() | {assignment | expression, binary(), binary()}],
135 UniqueKeyFields :: [binary()]) ->
136 {ok, QueryName :: mongoose_rdbms:query_name()} | {error, already_exists}.
137 prepare_upsert(HostType, Name, Table, InsertFields, Updates, UniqueKeyFields) ->
138 1580 prepare_upsert(HostType, Name, Table, InsertFields, Updates, UniqueKeyFields, none).
139
140 -spec prepare_upsert(HostType :: mongooseim:host_type_or_global(),
141 QueryName :: mongoose_rdbms:query_name(),
142 TableName :: atom(),
143 InsertFields :: [ColumnName :: binary()],
144 Updates :: [binary() | {assignment | expression, binary(), binary()}],
145 UniqueKeyFields :: [binary()],
146 IncrementalField :: none | binary()) ->
147 {ok, QueryName :: mongoose_rdbms:query_name()} | {error, already_exists}.
148 prepare_upsert(HostType, Name, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
149 1686 SQL = upsert_query(HostType, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField),
150 1686 Query = iolist_to_binary(SQL),
151 1686 ?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}),
152 1686 Fields = prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields),
153 1686 mongoose_rdbms:prepare(Name, Table, Fields, Query).
154
155 prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) ->
156 1686 UpdateFields = lists:filtermap(fun get_field_name/1, Updates),
157 1686 case mongoose_rdbms:db_type() of
158 mssql ->
159
:-(
UniqueKeyFields ++ InsertFields ++ UpdateFields;
160 1686 _ -> InsertFields ++ UpdateFields
161 end.
162
163 upsert_query(HostType, Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
164 1686 case {mongoose_rdbms:db_engine(HostType), mongoose_rdbms:db_type()} of
165 {mysql, _} ->
166
:-(
upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField);
167 {pgsql, _} ->
168 1686 upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField);
169 {odbc, mssql} ->
170
:-(
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields);
171
:-(
NotSupported -> erlang:error({rdbms_not_supported, NotSupported})
172 end.
173
174 mysql_and_pgsql_insert(Table, Columns) ->
175 1686 JoinedFields = join(Columns, <<", ">>),
176 1686 Placeholders = lists:duplicate(length(Columns), $?),
177 1686 ["INSERT INTO ", atom_to_binary(Table, utf8), " (",
178 JoinedFields,
179 ") VALUES (",
180 join(Placeholders, ", "),
181 ")"].
182
183 upsert_mysql_query(Table, InsertFields, Updates, [Key | _], IncrementalField) ->
184
:-(
Insert = mysql_and_pgsql_insert(Table, InsertFields),
185
:-(
OnConflict = mysql_on_conflict(Table, Updates, Key, IncrementalField),
186
:-(
[Insert, OnConflict].
187
188 upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields, IncrementalField) ->
189 1686 Insert = mysql_and_pgsql_insert(Table, InsertFields),
190 1686 OnConflict = pgsql_on_conflict(Updates, UniqueKeyFields),
191 1686 WhereIncrements = pgsql_ensure_increments(Table, IncrementalField),
192 1686 [Insert, OnConflict, WhereIncrements].
193
194 mysql_on_conflict(_Table, [], Key, _) ->
195 %% Update field to itself (no-op), there is no 'DO NOTHING' in MySQL
196
:-(
[" ON DUPLICATE KEY UPDATE ", Key, " = ", Key];
197 mysql_on_conflict(_Table, UpdateFields, _, none) ->
198
:-(
[" ON DUPLICATE KEY UPDATE ",
199 update_fields_on_conflict(UpdateFields)];
200 mysql_on_conflict(Table, UpdateFields, _, IncrementalField) ->
201
:-(
TableName = atom_to_list(Table),
202
:-(
FieldsWithPlaceHolders = [mysql_fields_with_placeholders(TableName, Update, IncrementalField)
203
:-(
|| Update <- UpdateFields],
204
:-(
IncrUpdates = join(FieldsWithPlaceHolders, ", "),
205
:-(
[" AS alias ON DUPLICATE KEY UPDATE ", IncrUpdates].
206
207 mysql_fields_with_placeholders(TableName, UpdateField, IncrementalField) ->
208
:-(
Alternatives = case UpdateField of
209 {Op, Column, Expression} when Op =:= assignment; Op =:= expression ->
210
:-(
[Expression, ", ", TableName, ".", Column, ")"];
211 Column ->
212
:-(
["? , ", TableName, ".", Column, ")"]
213 end,
214
:-(
[ Column, " = IF(", TableName, ".", IncrementalField, " < alias.", IncrementalField, ", "
215 | Alternatives].
216
217 pgsql_on_conflict([], UniqueKeyFields) ->
218 7 JoinedKeys = join(UniqueKeyFields, ", "),
219 7 [" ON CONFLICT (", JoinedKeys, ") DO NOTHING"];
220 pgsql_on_conflict(UpdateFields, UniqueKeyFields) ->
221 1679 JoinedKeys = join(UniqueKeyFields, ", "),
222 1679 [" ON CONFLICT (", JoinedKeys, ")"
223 " DO UPDATE SET ",
224 update_fields_on_conflict(UpdateFields)].
225
226 update_fields_on_conflict(Updates) ->
227 1679 FieldsWithPlaceHolders = [get_field_expression(Update) || Update <- Updates],
228 1679 join(FieldsWithPlaceHolders, ", ").
229
230 pgsql_ensure_increments(_Table, none) ->
231 1580 [];
232 pgsql_ensure_increments(Table, IncrementalField) ->
233 106 TableName = atom_to_list(Table),
234 106 [" WHERE ", TableName, ".", IncrementalField, " < EXCLUDED.", IncrementalField].
235
236 upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) ->
237
:-(
UniqueKeysInSelect = [[" ? AS ", Key] || Key <- UniqueKeyFields],
238
:-(
BinTab = atom_to_binary(Table, utf8),
239
:-(
UniqueConstraint = [[BinTab, ".", Key, " = source.", Key] || Key <- UniqueKeyFields],
240
:-(
JoinedInsertFields = join(InsertFields, ", "),
241
:-(
Placeholders = lists:duplicate(length(InsertFields), $?),
242
:-(
["MERGE INTO ", BinTab, " WITH (SERIALIZABLE)"
243 " USING (SELECT ", join(UniqueKeysInSelect, ", "), ")"
244 " AS source (", join(UniqueKeyFields, ", "), ")"
245 " ON (", join(UniqueConstraint, " AND "), ")"
246 " WHEN NOT MATCHED THEN INSERT"
247 " (", JoinedInsertFields, ")"
248 " VALUES (", join(Placeholders, ", "), ")" | mssql_on_conflict(Updates)].
249
250
:-(
mssql_on_conflict([]) -> ";";
251 mssql_on_conflict(Updates) ->
252
:-(
[" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"].
253
254 get_field_expression({Op, ColumnName, Expr}) when Op =:= assignment; Op =:= expression ->
255 144 [ColumnName, " = ", Expr];
256 get_field_expression(Field) when is_binary(Field) ->
257 11513 [Field, " = ?"].
258
259 get_field_name({assignment, Field, _}) when is_binary(Field) ->
260
:-(
false;
261 get_field_name({expression, Field, _}) when is_binary(Field) ->
262 144 {true, Field};
263 get_field_name(Field) when is_binary(Field) ->
264 11513 true.
265
266 %% F can be either a fun or a list of queries
267 %% TODO: We should probably move the list of queries transaction
268 %% wrapper from the mongoose_rdbms module to this one (rdbms_queries)
269 sql_transaction(LServer, F) ->
270 456 mongoose_rdbms:sql_transaction(LServer, F).
271
272 begin_trans() ->
273 16676 begin_trans(?RDBMS_TYPE).
274
275 begin_trans(mssql) ->
276
:-(
rdbms_queries_mssql:begin_trans();
277 begin_trans(_) ->
278 16676 [<<"BEGIN;">>].
279
280 %% Count number of records in a table given a where clause
281 count_records_where(LServer, Table, WhereClause) ->
282
:-(
mongoose_rdbms:sql_query(
283 LServer,
284 [<<"select count(*) from ">>, Table, " ", WhereClause, ";"]).
285
286 -spec create_bulk_insert_query(Table :: iodata() | atom(), Fields :: [iodata() | atom()],
287 RowsNum :: pos_integer()) ->
288 {iodata(), [binary()]}.
289 create_bulk_insert_query(Table, Fields, RowsNum) when is_atom(Table) ->
290 285 create_bulk_insert_query(atom_to_binary(Table, utf8), Fields, RowsNum);
291 create_bulk_insert_query(Table, [Field | _] = Fields, RowsNum) when is_atom(Field) ->
292 285 create_bulk_insert_query(Table, [atom_to_binary(F, utf8) || F <- Fields], RowsNum);
293 create_bulk_insert_query(Table, Fields, RowsNum) when RowsNum > 0 ->
294 285 JoinedFields = join(Fields, <<", ">>),
295 285 Placeholders = lists:duplicate(length(Fields), <<"?">>),
296 285 PlaceholderSet = [<<"(">>, join(Placeholders, <<", ">>), <<")">>],
297 285 PlaceholderSets = lists:duplicate(RowsNum, PlaceholderSet),
298 285 JoinedPlaceholderSets = join(PlaceholderSets, <<", ">>),
299 285 Sql = [<<"INSERT INTO ">>, Table, <<" (">>, JoinedFields, <<") "
300 "VALUES ">>, JoinedPlaceholderSets, <<";">>],
301 285 Fields2 = lists:append(lists:duplicate(RowsNum, Fields)),
302 285 {Sql, Fields2}.
303
304 get_db_specific_limits() ->
305 438 do_get_db_specific_limits(?RDBMS_TYPE, "?", true).
306
307 get_db_specific_limits_binaries() ->
308 395 {LimitSQL, LimitMSSQL} = get_db_specific_limits(),
309 395 {list_to_binary(LimitSQL), list_to_binary(LimitMSSQL)}.
310
311 -spec get_db_specific_limits(integer())
312 -> {SQL :: nonempty_string(), []} | {[], MSSQL::nonempty_string()}.
313 get_db_specific_limits(Limit) ->
314 148 LimitStr = integer_to_list(Limit),
315 148 do_get_db_specific_limits(?RDBMS_TYPE, LimitStr, false).
316
317 -spec get_db_specific_offset(integer(), integer()) -> iolist().
318 get_db_specific_offset(Offset, Limit) ->
319
:-(
do_get_db_specific_offset(?RDBMS_TYPE, integer_to_list(Offset), integer_to_list(Limit)).
320
321
322 %% Arguments:
323 %% - Type (atom) - database type
324 %% - LimitStr (string) - a field value
325 %% - Wrap (boolean) - add parentheses around a field for MSSQL
326 do_get_db_specific_limits(mssql, LimitStr, _Wrap = false) ->
327
:-(
{"", "TOP " ++ LimitStr};
328 do_get_db_specific_limits(mssql, LimitStr, _Wrap = true) ->
329
:-(
{"", "TOP (" ++ LimitStr ++ ")"};
330 do_get_db_specific_limits(_, LimitStr, _Wrap) ->
331 586 {"LIMIT " ++ LimitStr, ""}.
332
333 do_get_db_specific_offset(mssql, Offset, Limit) ->
334
:-(
[" OFFSET ", Offset, " ROWS"
335 " FETCH NEXT ", Limit, " ROWS ONLY"];
336 do_get_db_specific_offset(_, Offset, _Limit) ->
337
:-(
[" OFFSET ", Offset].
338
339 add_limit_arg(Limit, Args) ->
340 2030 add_limit_arg(?RDBMS_TYPE, Limit, Args).
341
342 add_limit_arg(mssql, Limit, Args) ->
343
:-(
[Limit|Args];
344 add_limit_arg(_, Limit, Args) ->
345 2030 Args ++ [Limit].
346
347 get_db_specific_limits_binaries(Limit) ->
348 148 {LimitSQL, LimitMSSQL} = get_db_specific_limits(Limit),
349 148 {list_to_binary(LimitSQL), list_to_binary(LimitMSSQL)}.
350
351 limit_offset_sql() ->
352 237 limit_offset_sql(?RDBMS_TYPE).
353
354 limit_offset_sql(mssql) ->
355
:-(
<<" OFFSET (?) ROWS FETCH NEXT (?) ROWS ONLY">>;
356 limit_offset_sql(_) ->
357 237 <<" LIMIT ? OFFSET ?">>.
358
359 limit_offset_args(Limit, Offset) ->
360 27 limit_offset_args(?RDBMS_TYPE, Limit, Offset).
361
362
:-(
limit_offset_args(mssql, Limit, Offset) -> [Offset, Limit];
363 27 limit_offset_args(_, Limit, Offset) -> [Limit, Offset].
Line Hits Source