1 |
|
%%============================================================================== |
2 |
|
%% Copyright 2016 Erlang Solutions Ltd. |
3 |
|
%% |
4 |
|
%% Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
|
%% you may not use this file except in compliance with the License. |
6 |
|
%% You may obtain a copy of the License at |
7 |
|
%% |
8 |
|
%% http://www.apache.org/licenses/LICENSE-2.0 |
9 |
|
%% |
10 |
|
%% Unless required by applicable law or agreed to in writing, software |
11 |
|
%% distributed under the License is distributed on an "AS IS" BASIS, |
12 |
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
|
%% See the License for the specific language governing permissions and |
14 |
|
%% limitations under the License. |
15 |
|
%%============================================================================== |
16 |
|
-module(mongoose_cassandra). |
17 |
|
-author('rafal.slota@erlang-solutions.com'). |
18 |
|
|
19 |
|
-include_lib("cqerl/include/cqerl.hrl"). |
20 |
|
-include("mongoose_logger.hrl"). |
21 |
|
|
22 |
|
%% ==================================================================== |
23 |
|
%% Definitions |
24 |
|
%% ==================================================================== |
25 |
|
|
26 |
|
-define(WRITE_RETRY_COUNT, 3). |
27 |
|
-define(READ_RETRY_COUNT, 3). |
28 |
|
|
29 |
|
%% ==================================================================== |
30 |
|
%% Types |
31 |
|
%% ==================================================================== |
32 |
|
|
33 |
|
-type row() :: #{atom() => term()}. |
34 |
|
-type rows() :: [row()]. |
35 |
|
-type parameters() :: proplists:proplist() | row(). |
36 |
|
-type fold_fun() :: fun((Page :: rows(), fold_accumulator()) -> fold_accumulator()). |
37 |
|
-type fold_accumulator() :: any(). |
38 |
|
|
39 |
|
-type pool_name() :: atom(). |
40 |
|
-type query_name() :: atom() | {atom(), atom()}. |
41 |
|
|
42 |
|
%% ==================================================================== |
43 |
|
%% Exports |
44 |
|
%% ==================================================================== |
45 |
|
|
46 |
|
%% API |
47 |
|
-export([cql_read/5, cql_foldl/7, cql_write/5, cql_write_async/5]). |
48 |
|
-export([now_timestamp/0]). |
49 |
|
|
50 |
|
%% Test queries |
51 |
|
-export([prepared_queries/0, total_count_query_cql/1, test_query_sql/0]). |
52 |
|
-export([test_query/1, test_query/2, total_count_query/2]). |
53 |
|
|
54 |
|
-ignore_xref([behaviour_info/1, prepared_queries/0, test_query/1, test_query/2, |
55 |
|
test_query_sql/0, total_count_query/2, total_count_query_cql/1]). |
56 |
|
|
57 |
|
%% Types |
58 |
|
-export_type([pool_name/0, query_name/0]). |
59 |
|
-export_type([row/0, rows/0, parameters/0, fold_fun/0, fold_accumulator/0]). |
60 |
|
|
61 |
|
%% Callbacks definitions |
62 |
|
-callback prepared_queries() -> list({term(), string()}). |
63 |
|
|
64 |
|
%% ==================================================================== |
65 |
|
%% Cassandra API |
66 |
|
%% ==================================================================== |
67 |
|
|
68 |
|
|
69 |
|
%% -------------------------------------------------------- |
70 |
|
%% @doc Execute batch write query to cassandra (insert, update or delete). |
71 |
|
%% Note that Cassandra doesn't like big batches, therefore this function will try to |
72 |
|
%% split given rows into batches of 20 rows and will fall back to smaller batches if |
73 |
|
%% Cassandra rejects the query due to its size being to big. |
74 |
|
%% -------------------------------------------------------- |
75 |
|
-spec cql_write(PoolName :: pool_name(), UserJID :: jid:jid() | undefined, Module :: atom(), |
76 |
|
QueryName :: query_name(), Rows :: [parameters()]) -> |
77 |
|
ok | {error, Reason :: any()}. |
78 |
|
cql_write(PoolName, UserJID, Module, QueryName, Rows) -> |
79 |
426 |
QueryStr = proplists:get_value(QueryName, Module:prepared_queries()), |
80 |
426 |
Opts = #{ |
81 |
|
retry => ?WRITE_RETRY_COUNT |
82 |
|
}, |
83 |
426 |
mongoose_cassandra_worker:write(PoolName, UserJID, QueryStr, Rows, Opts). |
84 |
|
|
85 |
|
%% -------------------------------------------------------- |
86 |
|
%% @doc Execute async batch write query to cassandra (insert, update or delete). |
87 |
|
%% Note that Cassandra doesn't like big batches and there's not retry login when query size will |
88 |
|
%% be exceeded like in cql_write/5. |
89 |
|
%% -------------------------------------------------------- |
90 |
|
-spec cql_write_async(PoolName :: pool_name(), UserJID :: jid:jid() | undefined, Module :: atom(), |
91 |
|
QueryName :: query_name(), Rows :: [parameters()]) -> |
92 |
|
ok | {error, Reason :: any()}. |
93 |
|
cql_write_async(PoolName, UserJID, Module, QueryName, Rows) -> |
94 |
1363 |
QueryStr = proplists:get_value(QueryName, Module:prepared_queries()), |
95 |
1363 |
Opts = #{ |
96 |
|
retry => ?WRITE_RETRY_COUNT |
97 |
|
}, |
98 |
1363 |
mongoose_cassandra_worker:write_async(PoolName, UserJID, QueryStr, Rows, Opts). |
99 |
|
|
100 |
|
|
101 |
|
%% -------------------------------------------------------- |
102 |
|
%% @doc Execute read query to cassandra (select). |
103 |
|
%% Returns all rows at once even if there are several query pages. |
104 |
|
%% -------------------------------------------------------- |
105 |
|
-spec cql_read(PoolName :: pool_name(), UserJID :: jid:jid() | undefined, Module :: atom(), |
106 |
|
QueryName :: query_name(), Params :: parameters()) -> |
107 |
|
{ok, Rows :: rows()} | {error, Reason :: any()}. |
108 |
|
cql_read(PoolName, UserJID, Module, QueryName, Params) -> |
109 |
848 |
Fun = fun(Page, Acc) -> [Page | Acc] end, |
110 |
848 |
case cql_foldl(PoolName, UserJID, Module, QueryName, Params, Fun, []) of |
111 |
|
{ok, Rows} -> |
112 |
847 |
{ok, lists:append(lists:reverse(Rows))}; |
113 |
|
{error, Reason} -> |
114 |
1 |
{error, Reason} |
115 |
|
end. |
116 |
|
|
117 |
|
|
118 |
|
%% -------------------------------------------------------- |
119 |
|
%% @doc Execute read query to cassandra (select). |
120 |
|
%% This functions behaves much like the lists:foldl/3 but the input are pages from result of given |
121 |
|
%% query. Therefore each execution of given fun gets list of several result rows (by default 100 at |
122 |
|
%% most). |
123 |
|
%% -------------------------------------------------------- |
124 |
|
-spec cql_foldl(PoolName :: pool_name(), UserJID :: jid:jid() | undefined, Module :: atom(), |
125 |
|
QueryName :: query_name(), Params :: parameters(), |
126 |
|
fold_fun(), AccIn :: fold_accumulator()) -> |
127 |
|
{ok, AccOut :: fold_accumulator()} | {error, Reason :: any()}. |
128 |
|
cql_foldl(PoolName, UserJID, Module, QueryName, Params, Fun, AccIn) -> |
129 |
964 |
QueryStr = proplists:get_value(QueryName, Module:prepared_queries()), |
130 |
964 |
Opts = #{ |
131 |
|
retry => ?READ_RETRY_COUNT |
132 |
|
}, |
133 |
964 |
mongoose_cassandra_worker:read(PoolName, UserJID, QueryStr, Params, Fun, AccIn, Opts). |
134 |
|
|
135 |
|
|
136 |
|
%% @doc Return timestamp in microseconds |
137 |
|
now_timestamp() -> |
138 |
145 |
os:system_time(microsecond). |
139 |
|
|
140 |
|
%% ==================================================================== |
141 |
|
%% Internal functions |
142 |
|
%% ==================================================================== |
143 |
|
|
144 |
|
%% ==================================================================== |
145 |
|
%% Queries |
146 |
|
%% ==================================================================== |
147 |
|
|
148 |
|
prepared_queries() -> |
149 |
4 |
[{test_query, test_query_sql()}] ++ |
150 |
|
total_count_queries(). |
151 |
|
|
152 |
|
test_query_sql() -> |
153 |
4 |
"SELECT now() FROM system.local". %% "SELECT 1" for cassandra |
154 |
|
|
155 |
|
total_count_query_cql(T) when is_atom(T) -> |
156 |
12 |
"SELECT COUNT(*) FROM " ++ atom_to_list(T). |
157 |
|
|
158 |
|
|
159 |
|
%% ==================================================================== |
160 |
|
%% Diagnostic utilities |
161 |
|
%% ==================================================================== |
162 |
|
|
163 |
|
-spec test_query(pool_name()) -> ok | {error, Reason :: term()}. |
164 |
|
test_query(PoolName) -> |
165 |
4 |
test_query(PoolName, undefined). |
166 |
|
|
167 |
|
test_query(PoolName, UserJID) -> |
168 |
4 |
try mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, test_query, []) of |
169 |
3 |
{ok, _} -> ok |
170 |
|
catch |
171 |
|
Class:Reason -> |
172 |
:-( |
{error, {Class, Reason}} |
173 |
|
end. |
174 |
|
|
175 |
|
-spec total_count_query(pool_name(), Table :: atom()) -> |
176 |
|
non_neg_integer(). |
177 |
|
total_count_query(PoolName, Table) -> |
178 |
:-( |
UserJID = undefined, |
179 |
:-( |
Res = mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, {total_count_query, Table}, []), |
180 |
:-( |
{ok, [#{count := Count}]} = Res, |
181 |
:-( |
Count. |
182 |
|
|
183 |
|
total_count_queries() -> |
184 |
4 |
[{{total_count_query, T}, total_count_query_cql(T)} || T <- tables()]. |
185 |
|
|
186 |
|
|
187 |
|
tables() -> |
188 |
4 |
[ |
189 |
|
mam_message, |
190 |
|
mam_muc_message, |
191 |
|
mam_config |
192 |
|
%% private_storage, |
193 |
|
%% privacy_default_list, |
194 |
|
%% privacy_list, |
195 |
|
%% privacy_item, |
196 |
|
%% rosterusers, |
197 |
|
%% roster_version |
198 |
|
]. |