1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2016 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
|
17 |
|
-module(mongoose_rdbms_pgsql). |
18 |
|
-author('konrad.zemek@erlang-solutions.com'). |
19 |
|
-behaviour(mongoose_rdbms_backend). |
20 |
|
|
21 |
|
-include_lib("epgsql/include/epgsql.hrl"). |
22 |
|
|
23 |
|
-type options() :: #{host := string(), |
24 |
|
port := inet:port_number(), |
25 |
|
database := string(), |
26 |
|
username := string(), |
27 |
|
password := string(), |
28 |
|
atom() => any()}. |
29 |
|
|
30 |
|
-export([escape_binary/1, unescape_binary/1, connect/2, disconnect/1, |
31 |
|
query/3, prepare/5, execute/4]). |
32 |
|
|
33 |
|
%% API |
34 |
|
|
35 |
|
-spec escape_binary(binary()) -> iodata(). |
36 |
|
escape_binary(Bin) when is_binary(Bin) -> |
37 |
87 |
[<<"decode('">>, base64:encode(Bin), <<"','base64')">>]. |
38 |
|
|
39 |
|
-spec unescape_binary(binary()) -> binary(). |
40 |
|
unescape_binary(<<"\\x", Bin/binary>>) -> |
41 |
174 |
base16:decode(Bin); |
42 |
|
unescape_binary(Bin) when is_binary(Bin) -> |
43 |
5885 |
Bin. |
44 |
|
|
45 |
|
-spec connect(options(), QueryTimeout :: non_neg_integer()) -> |
46 |
|
{ok, Connection :: term()} | {error, Reason :: any()}. |
47 |
|
connect(Options, QueryTimeout) -> |
48 |
520 |
case epgsql:connect(db_opts(Options)) of |
49 |
|
{ok, Pid} -> |
50 |
520 |
epgsql:squery(Pid, [<<"SET statement_timeout=">>, integer_to_binary(QueryTimeout)]), |
51 |
520 |
epgsql:squery(Pid, <<"SET standard_conforming_strings=off">>), |
52 |
520 |
{ok, Pid}; |
53 |
|
Error -> |
54 |
:-( |
Error |
55 |
|
end. |
56 |
|
|
57 |
|
-spec disconnect(Connection :: epgsql:connection()) -> ok. |
58 |
|
disconnect(Connection) -> |
59 |
510 |
epgsql:close(Connection). |
60 |
|
|
61 |
|
-spec query(Connection :: term(), Query :: any(), |
62 |
|
Timeout :: infinity | non_neg_integer()) -> mongoose_rdbms:query_result(). |
63 |
|
query(Connection, Query, _Timeout) -> |
64 |
37007 |
pgsql_to_rdbms(epgsql:squery(Connection, Query)). |
65 |
|
|
66 |
|
-spec prepare(Connection :: term(), Name :: atom(), Table :: binary(), |
67 |
|
Fields :: [binary()], Statement :: iodata()) -> |
68 |
|
{ok, term()} | {error, any()}. |
69 |
|
prepare(Connection, Name, _Table, _Fields, Statement) -> |
70 |
3836 |
BinName = [atom_to_binary(Name, latin1)], |
71 |
3836 |
ReplacedStatement = replace_question_marks(Statement), |
72 |
3836 |
case epgsql:parse(Connection, BinName, ReplacedStatement, []) of |
73 |
3836 |
{ok, _} -> epgsql:describe(Connection, statement, BinName); |
74 |
:-( |
Error -> Error |
75 |
|
end. |
76 |
|
|
77 |
|
-spec execute(Connection :: term(), StatementRef :: term(), Params :: [term()], |
78 |
|
Timeout :: infinity | non_neg_integer()) -> mongoose_rdbms:query_result(). |
79 |
|
execute(Connection, StatementRef, Params, _Timeout) -> |
80 |
147323 |
pgsql_to_rdbms(epgsql:prepared_query(Connection, StatementRef, Params)). |
81 |
|
|
82 |
|
%% Helpers |
83 |
|
|
84 |
|
-spec db_opts(options()) -> epgsql:connect_opts(). |
85 |
|
db_opts(Options) -> |
86 |
520 |
BasicOpts = maps:with([host, port, database, username, password], Options), |
87 |
520 |
TLSOpts = tls_opts(Options), |
88 |
520 |
maps:merge(BasicOpts#{codecs => [{mongoose_rdbms_pgsql_codec_boolean, []}]}, TLSOpts). |
89 |
|
|
90 |
|
tls_opts(#{tls := TLSOpts}) -> |
91 |
520 |
#{ssl => ssl_mode(TLSOpts), |
92 |
|
ssl_opts => just_tls:make_ssl_opts(maps:remove(required, TLSOpts))}; |
93 |
|
tls_opts(#{}) -> |
94 |
:-( |
#{}. |
95 |
|
|
96 |
520 |
ssl_mode(#{required := true}) -> required; |
97 |
:-( |
ssl_mode(#{required := false}) -> true. |
98 |
|
|
99 |
|
-spec pgsql_to_rdbms(epgsql:reply(term())) -> mongoose_rdbms:query_result(). |
100 |
|
pgsql_to_rdbms(Items) when is_list(Items) -> |
101 |
:-( |
lists:reverse([pgsql_to_rdbms(Item) || Item <- Items]); |
102 |
|
pgsql_to_rdbms({error, #error{codename = unique_violation}}) -> |
103 |
232 |
{error, duplicate_key}; |
104 |
|
pgsql_to_rdbms({error, #error{message = Message}}) -> |
105 |
:-( |
{error, unicode:characters_to_list(Message)}; |
106 |
|
pgsql_to_rdbms({ok, Count}) -> |
107 |
61676 |
{updated, Count}; |
108 |
|
pgsql_to_rdbms({ok, Count, _Column, Value}) -> |
109 |
:-( |
{updated, Count, Value}; |
110 |
|
pgsql_to_rdbms({ok, _Columns, Rows}) -> |
111 |
122422 |
{selected, Rows}. |
112 |
|
|
113 |
|
-spec replace_question_marks(Statement :: iodata()) -> iodata(). |
114 |
|
replace_question_marks(Statement) when is_list(Statement) -> |
115 |
:-( |
replace_question_marks(iolist_to_binary(Statement)); |
116 |
|
replace_question_marks(Statement) when is_binary(Statement) -> |
117 |
3836 |
[Head | Parts] = binary:split(Statement, <<"?">>, [global]), |
118 |
3836 |
Placeholders = [<<"$", (integer_to_binary(I))/binary>> || I <- lists:seq(1, length(Parts))], |
119 |
3836 |
PartsWithPlaceholders = lists:zipwith(fun(A, B) -> [A, B] end, Placeholders, Parts), |
120 |
3836 |
[Head | PartsWithPlaceholders]. |