./ct_report/coverage/rdbms_queries.COVER.html

1 %%%----------------------------------------------------------------------
2 %%% File : rdbms_queries.erl
3 %%% Author : Mickael Remond <mremond@process-one.net>
4 %%% Purpose : RDBMS queries depending on back-end
5 %%% Created : by Mickael Remond <mremond@process-one.net>
6 %%%
7 %%%
8 %%% ejabberd, Copyright (C) 2002-2011 ProcessOne
9 %%%
10 %%% This program is free software; you can redistribute it and/or
11 %%% modify it under the terms of the GNU General Public License as
12 %%% published by the Free Software Foundation; either version 2 of the
13 %%% License, or (at your option) any later version.
14 %%%
15 %%% This program is distributed in the hope that it will be useful,
16 %%% but WITHOUT ANY WARRANTY; without even the implied warranty of
17 %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 %%% General Public License for more details.
19 %%%
20 %%% You should have received a copy of the GNU General Public License
21 %%% along with this program; if not, write to the Free Software
22 %%% Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 %%%
24 %%%----------------------------------------------------------------------
25
26 -module(rdbms_queries).
27 -author("mremond@process-one.net").
28
29 -export([get_db_type/0,
30 begin_trans/0,
31 get_db_specific_limits/0,
32 get_db_specific_limits/1,
33 get_db_specific_limits_binaries/0,
34 get_db_specific_limits_binaries/1,
35 get_db_specific_offset/2,
36 add_limit_arg/2,
37 limit_offset_sql/0,
38 limit_offset_args/2,
39 sql_transaction/2,
40 count_records_where/3,
41 create_bulk_insert_query/3]).
42
43 -export([join/2,
44 prepare_upsert/6,
45 execute_upsert/5]).
46
47 -ignore_xref([
48 count_records_where/3, get_db_specific_limits/1, get_db_specific_offset/2, get_db_type/0
49 ]).
50
51 %% We have only two compile time options for db queries:
52 %%-define(generic, true).
53 %%-define(mssql, true).
54 -ifndef(mssql).
55 -undef(generic).
56 -define(generic, true).
57 -endif.
58
59 -define(RDBMS_TYPE, (mongoose_rdbms:db_type())).
60
61 -include("mongoose.hrl").
62
63 %
64 %% -----------------
65 %% Common functions
66
%% Interleaves Sep between the elements of a list, producing nested iodata.
%% An empty list yields []. Otherwise the result is a two-element list: the
%% first element, followed by a list of [Sep, Element] pairs for the rest.
join([], _Sep) ->
    [];
join([First | Rest], Sep) ->
    [First, lists:map(fun(Item) -> [Sep, Item] end, Rest)].
71
72 %% -----------------
73 %% Generic queries
74
%% Returns the database type reported by the ?RDBMS_TYPE macro
%% (which expands to mongoose_rdbms:db_type()).
get_db_type() ->
    DbType = ?RDBMS_TYPE,
    DbType.
77
%% @doc
%% Executes the prepared upsert statement Name on Host.
%% The parameter order depends on the backend:
%%  - mysql and pgsql engines: insert parameters, then update parameters;
%%  - odbc engine with mssql type: unique key values first, then insert
%%    parameters, then update parameters.
%% Any other {Engine, Type} pair raises error {rdbms_not_supported, {Engine, Type}}.
-spec execute_upsert(Host :: mongoose_rdbms:server(),
                     Name :: atom(),
                     InsertParams :: [any()],
                     UpdateParams :: [any()],
                     UniqueKeyValues :: [any()]) -> mongoose_rdbms:query_result().
execute_upsert(Host, Name, InsertParams, UpdateParams, UniqueKeyValues) ->
    Backend = {mongoose_rdbms:db_engine(Host), mongoose_rdbms:db_type()},
    Params =
        case Backend of
            {mysql, _} ->
                InsertParams ++ UpdateParams;
            {pgsql, _} ->
                InsertParams ++ UpdateParams;
            {odbc, mssql} ->
                UniqueKeyValues ++ InsertParams ++ UpdateParams;
            Unsupported ->
                erlang:error({rdbms_not_supported, Unsupported})
        end,
    mongoose_rdbms:execute(Host, Name, Params).
93
%% @doc
%% Prepares a query that inserts a row, or updates it if it already exists.
%% Each update is either a plain field name or a literal expression such as
%% "column = tab.column + 1".
%% The generated SQL is logged at debug level and then registered through
%% mongoose_rdbms:prepare/4 under Name.
-spec prepare_upsert(Host :: mongoose_rdbms:server(),
                     QueryName :: atom(),
                     TableName :: atom(),
                     InsertFields :: [binary()],
                     Updates :: [binary() | {binary(), binary()}],
                     UniqueKeyFields :: [binary()]) ->
    {ok, QueryName :: atom()} | {error, already_exists}.
prepare_upsert(Host, Name, Table, InsertFields, Updates, UniqueKeyFields) ->
    Query = iolist_to_binary(
              upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields)),
    ?LOG_DEBUG(#{what => rdbms_upsert_query, name => Name, query => Query}),
    mongoose_rdbms:prepare(Name, Table,
                           prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields),
                           Query).
110
%% Lists the field names bound to the placeholders of an upsert query, in
%% placeholder order. Updates for which get_field_name/1 returns false are
%% left out. For the mssql database type the unique key fields come first.
prepared_upsert_fields(InsertFields, Updates, UniqueKeyFields) ->
    BoundUpdates = lists:filtermap(fun get_field_name/1, Updates),
    InsertThenUpdate = InsertFields ++ BoundUpdates,
    case mongoose_rdbms:db_type() of
        mssql -> UniqueKeyFields ++ InsertThenUpdate;
        _ -> InsertThenUpdate
    end.
118
%% Builds the upsert SQL (as iodata) for the backend configured on Host.
%% Dispatches on {Engine, Type}. Unsupported pairs raise
%% error {rdbms_not_supported, {Engine, Type}}.
upsert_query(Host, Table, InsertFields, Updates, UniqueKeyFields) ->
    Engine = mongoose_rdbms:db_engine(Host),
    Type = mongoose_rdbms:db_type(),
    case {Engine, Type} of
        {mysql, _} ->
            upsert_mysql_query(Table, InsertFields, Updates, UniqueKeyFields);
        {pgsql, _} ->
            upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields);
        {odbc, mssql} ->
            upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields);
        Unsupported ->
            erlang:error({rdbms_not_supported, Unsupported})
    end.
129
%% Builds the "INSERT INTO Table (Columns) VALUES (?, ...)" part shared by the
%% MySQL and PostgreSQL upserts, with one `?' placeholder per column.
mysql_and_pgsql_insert(Table, Columns) ->
    ColumnList = join(Columns, <<", ">>),
    ValueList = join(lists:duplicate(length(Columns), $?), ", "),
    ["INSERT INTO ", atom_to_binary(Table, utf8), " (",
     ColumnList,
     ") VALUES (",
     ValueList,
     ")"].
138
%% MySQL upsert: the shared INSERT followed by an ON DUPLICATE KEY UPDATE
%% clause. Only the first unique key field is passed on; it is used for the
%% no-op update when there are no updates.
upsert_mysql_query(Table, InsertFields, Updates, [FirstKey | _]) ->
    InsertPart = mysql_and_pgsql_insert(Table, InsertFields),
    ConflictPart = mysql_on_conflict(Updates, FirstKey),
    [InsertPart, ConflictPart].
143
%% PostgreSQL upsert: the shared INSERT followed by an ON CONFLICT clause
%% over all unique key fields.
upsert_pgsql_query(Table, InsertFields, Updates, UniqueKeyFields) ->
    InsertPart = mysql_and_pgsql_insert(Table, InsertFields),
    ConflictPart = pgsql_on_conflict(Updates, UniqueKeyFields),
    [InsertPart, ConflictPart].
148
%% Builds the MySQL " ON DUPLICATE KEY UPDATE ..." clause.
%% With no updates, Key is assigned to itself (a no-op), because MySQL has
%% no 'DO NOTHING'. Otherwise the clause lists the update expressions.
mysql_on_conflict(Updates, Key) ->
    case Updates of
        [] ->
            [" ON DUPLICATE KEY UPDATE ", Key, " = ", Key];
        _ ->
            [" ON DUPLICATE KEY UPDATE ",
             update_fields_on_conflict(Updates)]
    end.
155
%% Builds the PostgreSQL " ON CONFLICT (keys) ..." clause:
%% DO NOTHING when there are no updates, DO UPDATE SET ... otherwise.
pgsql_on_conflict(Updates, UniqueKeyFields) ->
    Keys = join(UniqueKeyFields, ", "),
    case Updates of
        [] ->
            [" ON CONFLICT (", Keys, ") DO NOTHING"];
        _ ->
            [" ON CONFLICT (", Keys, ") DO UPDATE SET ",
             update_fields_on_conflict(Updates)]
    end.
164
%% Renders every update with update_field_expression/1 and joins the
%% results with ", ".
update_fields_on_conflict(Updates) ->
    join(lists:map(fun update_field_expression/1, Updates), ", ").
168
%% MSSQL upsert expressed as a MERGE statement.
%% The unique key values are selected as a one-row "source" (one `?' per key),
%% matched against the table on every unique key, and inserted when there is
%% no match. The WHEN MATCHED part and the closing ";" come from
%% mssql_on_conflict/1.
upsert_mssql_query(Table, InsertFields, Updates, UniqueKeyFields) ->
    SourceColumns = lists:map(fun(Key) -> [" ? AS ", Key] end, UniqueKeyFields),
    TableName = atom_to_binary(Table, utf8),
    MatchConditions = lists:map(
                        fun(Key) -> [TableName, ".", Key, " = source.", Key] end,
                        UniqueKeyFields),
    InsertColumns = join(InsertFields, ", "),
    InsertPlaceholders = lists:duplicate(length(InsertFields), $?),
    ["MERGE INTO ", TableName,
     " WITH (SERIALIZABLE) USING (SELECT ", join(SourceColumns, ", "),
     ") AS source (", join(UniqueKeyFields, ", "),
     ") ON (", join(MatchConditions, " AND "),
     ") WHEN NOT MATCHED THEN INSERT (", InsertColumns,
     ") VALUES (", join(InsertPlaceholders, ", "),
     ")" | mssql_on_conflict(Updates)].
182
%% Tail of the MSSQL MERGE statement: just the terminating ";" when there is
%% nothing to update, otherwise a WHEN MATCHED THEN UPDATE SET clause
%% followed by ";".
mssql_on_conflict(Updates) ->
    case Updates of
        [] ->
            ";";
        _ ->
            [" WHEN MATCHED THEN UPDATE SET ", update_fields_on_conflict(Updates), ";"]
    end.
186
%% Renders a single update as SQL, based on get_field_expression/1:
%%  - true: a plain field name, rendered as "Field = ?";
%%  - {true, Expr}: Expr is used as is;
%%  - false: the update itself is used as is.
%% NOTE(review): for a {Field, Expr} tuple whose Expr contains no "=", the
%% `false' branch returns the tuple itself, which is not valid iodata.
%% Presumably such input is never passed; confirm against callers.
update_field_expression(Update) ->
    Classification = get_field_expression(Update),
    case Classification of
        true -> [Update, " = ?"];
        {true, Expression} -> Expression;
        false -> Update
    end.
193
%% Classifies an update for rendering.
%% For a {Field, Expr} tuple: {true, Expr} when Expr contains "=", false
%% otherwise. For a bare binary: true when it contains no "=", false when
%% it does.
get_field_expression({_Field, Expr}) ->
    case binary:match(Expr, <<"=">>) =/= nomatch of
        true -> {true, Expr};
        false -> false
    end;
get_field_expression(Field) ->
    case binary:match(Field, <<"=">>) of
        nomatch -> true;
        _ -> false
    end.
201
%% lists:filtermap/2 callback that selects the updates bound to a placeholder.
%% For a {Field, Expr} tuple: {true, Field} when Field contains no "=",
%% false otherwise. For a bare binary: true (keep it) when it contains no
%% "=", false when it does.
get_field_name({Field, _Expr}) ->
    case binary:match(Field, <<"=">>) =:= nomatch of
        true -> {true, Field};
        false -> false
    end;
get_field_name(Field) ->
    case binary:match(Field, <<"=">>) of
        nomatch -> true;
        _ -> false
    end.
209
%% Runs F as a transaction on LServer by delegating to
%% mongoose_rdbms:sql_transaction/2.
%% F is either a fun or a list of queries.
%% TODO: the list-of-queries transaction wrapper should probably move from
%% mongoose_rdbms into this module (rdbms_queries).
sql_transaction(LServer, F) ->
    Result = mongoose_rdbms:sql_transaction(LServer, F),
    Result.
215
%% Returns the statement(s) that open a transaction for the database type
%% given by ?RDBMS_TYPE.
begin_trans() ->
    DbType = ?RDBMS_TYPE,
    begin_trans(DbType).
218
%% Transaction-opening statement(s) for a database type: mssql delegates to
%% rdbms_queries_mssql:begin_trans/0, every other type uses "BEGIN;".
begin_trans(DbType) ->
    case DbType of
        mssql -> rdbms_queries_mssql:begin_trans();
        _ -> [<<"BEGIN;">>]
    end.
223
%% Counts the records of Table selected by WhereClause, by running
%% "select count(*) from Table WhereClause;" on LServer.
%% NOTE(review): Table and WhereClause are spliced into the SQL text verbatim;
%% presumably callers only pass trusted values. Confirm against callers.
count_records_where(LServer, Table, WhereClause) ->
    Query = [<<"select count(*) from ">>, Table, " ", WhereClause, ";"],
    mongoose_rdbms:sql_query(LServer, Query).
229
%% @doc
%% Builds a multi-row INSERT statement with `?' placeholders.
%% Table and every element of Fields may be given as an atom or as iodata;
%% atoms are converted to UTF-8 binaries. Each field is normalised on its own,
%% so lists mixing atoms and iodata are accepted, as the spec allows.
%% RowsNum must be greater than 0.
%% Returns {Sql, FieldList}: Sql is the statement as iodata with RowsNum
%% placeholder groups, and FieldList is the normalised field list repeated
%% RowsNum times (one entry per placeholder, in order).
-spec create_bulk_insert_query(Table :: iodata() | atom(), Fields :: [iodata() | atom()],
                               RowsNum :: pos_integer()) ->
    {iodata(), [binary()]}.
create_bulk_insert_query(Table, Fields, RowsNum) when RowsNum > 0 ->
    TableName = bulk_insert_name(Table),
    FieldNames = [bulk_insert_name(Field) || Field <- Fields],
    JoinedFields = join(FieldNames, <<", ">>),
    Placeholders = lists:duplicate(length(FieldNames), <<"?">>),
    PlaceholderSet = [<<"(">>, join(Placeholders, <<", ">>), <<")">>],
    PlaceholderSets = lists:duplicate(RowsNum, PlaceholderSet),
    JoinedPlaceholderSets = join(PlaceholderSets, <<", ">>),
    Sql = [<<"INSERT INTO ">>, TableName, <<" (">>, JoinedFields, <<") "
           "VALUES ">>, JoinedPlaceholderSets, <<";">>],
    {Sql, lists:append(lists:duplicate(RowsNum, FieldNames))}.

%% Converts an atom table/field name to a UTF-8 binary; iodata passes through.
bulk_insert_name(Name) when is_atom(Name) ->
    atom_to_binary(Name, utf8);
bulk_insert_name(Name) ->
    Name.
247
%% Limit clause pair for prepared statements: the limit value is the `?'
%% placeholder, with wrapping requested for MSSQL.
%% See do_get_db_specific_limits/3 for the shape of the result.
get_db_specific_limits() ->
    DbType = ?RDBMS_TYPE,
    do_get_db_specific_limits(DbType, "?", true).
250
%% Same as get_db_specific_limits/0, with both elements converted to binaries.
get_db_specific_limits_binaries() ->
    {GenericLimit, MssqlLimit} = get_db_specific_limits(),
    {list_to_binary(GenericLimit), list_to_binary(MssqlLimit)}.
254
%% Limit clause pair with the integer Limit rendered inline and no wrapping.
%% One element of the pair is the clause for the current database type
%% and the other is the empty string.
-spec get_db_specific_limits(integer())
        -> {SQL :: nonempty_string(), []} | {[], MSSQL::nonempty_string()}.
get_db_specific_limits(Limit) ->
    do_get_db_specific_limits(?RDBMS_TYPE, integer_to_list(Limit), false).
260
%% Offset clause (iodata) for the current database type, with Offset and
%% Limit rendered inline as decimal strings.
-spec get_db_specific_offset(integer(), integer()) -> iolist().
get_db_specific_offset(Offset, Limit) ->
    OffsetStr = integer_to_list(Offset),
    LimitStr = integer_to_list(Limit),
    do_get_db_specific_offset(?RDBMS_TYPE, OffsetStr, LimitStr).
264
265
%% Returns {GenericLimit, MssqlLimit}; exactly one element is non-empty.
%% Arguments:
%% - Type (atom) - database type
%% - LimitStr (string) - the limit value, or a placeholder
%% - Wrap (boolean) - for MSSQL, put parentheses around the value
%% MSSQL gets "TOP N" or "TOP (N)" in the second element; every other case
%% gets "LIMIT N" in the first element.
do_get_db_specific_limits(mssql, LimitStr, false) ->
    {"", "TOP " ++ LimitStr};
do_get_db_specific_limits(mssql, LimitStr, true) ->
    {"", lists:append(["TOP (", LimitStr, ")"])};
do_get_db_specific_limits(_Type, LimitStr, _Wrap) ->
    {"LIMIT " ++ LimitStr, ""}.
276
%% Offset clause as iodata. MSSQL uses OFFSET ... ROWS FETCH NEXT ... ROWS ONLY
%% and therefore needs Limit; every other type emits only " OFFSET Offset".
do_get_db_specific_offset(DbType, Offset, Limit) ->
    case DbType of
        mssql ->
            [" OFFSET ", Offset, " ROWS FETCH NEXT ", Limit, " ROWS ONLY"];
        _ ->
            [" OFFSET ", Offset]
    end.
282
%% Adds Limit to the query arguments Args at the position required by the
%% database type given by ?RDBMS_TYPE.
add_limit_arg(Limit, Args) ->
    DbType = ?RDBMS_TYPE,
    add_limit_arg(DbType, Limit, Args).
285
%% For mssql the limit is the first argument; for every other database type
%% it is appended as the last one.
add_limit_arg(DbType, Limit, Args) ->
    case DbType of
        mssql -> [Limit | Args];
        _ -> Args ++ [Limit]
    end.
290
%% Same as get_db_specific_limits/1, with both elements converted to binaries.
get_db_specific_limits_binaries(Limit) ->
    {GenericLimit, MssqlLimit} = get_db_specific_limits(Limit),
    {list_to_binary(GenericLimit), list_to_binary(MssqlLimit)}.
294
%% Parameterised limit/offset SQL fragment for the database type given by
%% ?RDBMS_TYPE.
limit_offset_sql() ->
    DbType = ?RDBMS_TYPE,
    limit_offset_sql(DbType).
297
%% Limit/offset fragment with two `?' placeholders. For mssql the offset
%% placeholder comes first; for every other type the limit comes first.
limit_offset_sql(DbType) ->
    case DbType of
        mssql -> <<" OFFSET (?) ROWS FETCH NEXT (?) ROWS ONLY">>;
        _ -> <<" LIMIT ? OFFSET ?">>
    end.
302
%% Orders Limit and Offset for the database type given by ?RDBMS_TYPE.
limit_offset_args(Limit, Offset) ->
    DbType = ?RDBMS_TYPE,
    limit_offset_args(DbType, Limit, Offset).
305
%% Argument order for the limit/offset fragment: [Offset, Limit] for mssql,
%% [Limit, Offset] for every other database type.
limit_offset_args(DbType, Limit, Offset) ->
    case DbType of
        mssql -> [Offset, Limit];
        _ -> [Limit, Offset]
    end.
Line Hits Source