./ct_report/coverage/mongoose_cassandra.COVER.html

1 %%==============================================================================
2 %% Copyright 2016 Erlang Solutions Ltd.
3 %%
4 %% Licensed under the Apache License, Version 2.0 (the "License");
5 %% you may not use this file except in compliance with the License.
6 %% You may obtain a copy of the License at
7 %%
8 %% http://www.apache.org/licenses/LICENSE-2.0
9 %%
10 %% Unless required by applicable law or agreed to in writing, software
11 %% distributed under the License is distributed on an "AS IS" BASIS,
12 %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 %% See the License for the specific language governing permissions and
14 %% limitations under the License.
15 %%==============================================================================
16 -module(mongoose_cassandra).
17 -author('rafal.slota@erlang-solutions.com').
18
19 -include_lib("cqerl/include/cqerl.hrl").
20 -include("mongoose_logger.hrl").
21
22 %% ====================================================================
23 %% Definitions
24 %% ====================================================================
25
%% Value passed as the `retry' option by cql_write/5 and cql_write_async/5.
26 -define(WRITE_RETRY_COUNT, 3).
%% Value passed as the `retry' option by cql_foldl/7 (and thus cql_read/5).
27 -define(READ_RETRY_COUNT, 3).
28
29 %% ====================================================================
30 %% Types
31 %% ====================================================================
32
%% A single result row: a map keyed by atoms.
33 -type row() :: #{atom() => term()}.
%% A list of rows, e.g. one page of a query result.
34 -type rows() :: [row()].
%% Query parameters, given either as a proplist or as a map shaped like a row.
35 -type parameters() :: proplists:proplist() | row().
%% Fold callback: receives one page of rows plus the accumulator and returns
%% the new accumulator.
36 -type fold_fun() :: fun((Page :: rows(), fold_accumulator()) -> fold_accumulator()).
%% Accumulator threaded through a fold; its shape is chosen by the caller.
37 -type fold_accumulator() :: any().
38
39 -type pool_name() :: atom().
%% Key used to look a query up in the list returned by Module:prepared_queries().
%% This module uses a plain atom (test_query) and a pair
%% ({total_count_query, Table}).
40 -type query_name() :: atom() | {atom(), atom()}.
41
42 %% ====================================================================
43 %% Exports
44 %% ====================================================================
45
46 %% API
47 -export([cql_read/5, cql_foldl/7, cql_write/5, cql_write_async/5]).
48 -export([now_timestamp/0]).
49
50 %% Test queries
51 -export([prepared_queries/0, total_count_query_cql/1, test_query_sql/0]).
52 -export([test_query/1, test_query/2, total_count_query/2]).
53
54 -ignore_xref([behaviour_info/1, prepared_queries/0, test_query/1, test_query/2,
55 test_query_sql/0, total_count_query/2, total_count_query_cql/1]).
56
57 %% Types
58 -export_type([pool_name/0, query_name/0]).
59 -export_type([row/0, rows/0, parameters/0, fold_fun/0, fold_accumulator/0]).
60
61 %% Callbacks definitions
62 -callback prepared_queries() -> list({term(), string()}).
63
64 %% ====================================================================
65 %% Cassandra API
66 %% ====================================================================
67
68
%% --------------------------------------------------------
%% @doc Run a batched write statement (insert, update or delete) against Cassandra.
%% Cassandra does not like big batches, so the given rows are split into batches
%% of 20 rows, with a fallback to smaller batches when Cassandra rejects a query
%% because it is too big.
%%
%% The statement text is looked up under `QueryName' in the list returned by
%% `Module:prepared_queries()'.
%% --------------------------------------------------------
-spec cql_write(PoolName :: pool_name(), UserJID :: jid:jid() | undefined, Module :: atom(),
                QueryName :: query_name(), Rows :: [parameters()]) ->
                   ok | {error, Reason :: any()}.
cql_write(PoolName, UserJID, Module, QueryName, Rows) ->
    PreparedQueries = Module:prepared_queries(),
    Statement = proplists:get_value(QueryName, PreparedQueries),
    WriteOpts = #{retry => ?WRITE_RETRY_COUNT},
    mongoose_cassandra_worker:write(PoolName, UserJID, Statement, Rows, WriteOpts).
84
%% --------------------------------------------------------
%% @doc Run an asynchronous batched write statement (insert, update or delete)
%% against Cassandra.
%% Cassandra does not like big batches, and unlike cql_write/5 there is no retry
%% logic for the case where the query size is exceeded.
%%
%% The statement text is looked up under `QueryName' in the list returned by
%% `Module:prepared_queries()'.
%% --------------------------------------------------------
-spec cql_write_async(PoolName :: pool_name(), UserJID :: jid:jid() | undefined,
                      Module :: atom(), QueryName :: query_name(),
                      Rows :: [parameters()]) ->
                         ok | {error, Reason :: any()}.
cql_write_async(PoolName, UserJID, Module, QueryName, Rows) ->
    PreparedQueries = Module:prepared_queries(),
    Statement = proplists:get_value(QueryName, PreparedQueries),
    WriteOpts = #{retry => ?WRITE_RETRY_COUNT},
    mongoose_cassandra_worker:write_async(PoolName, UserJID, Statement, Rows, WriteOpts).
99
100
%% --------------------------------------------------------
%% @doc Run a read statement (select) against Cassandra.
%% All result pages are gathered through cql_foldl/7 and returned as one flat
%% list of rows, in the order the pages were delivered.
%% --------------------------------------------------------
-spec cql_read(PoolName :: pool_name(), UserJID :: jid:jid() | undefined, Module :: atom(),
               QueryName :: query_name(), Params :: parameters()) ->
                  {ok, Rows :: rows()} | {error, Reason :: any()}.
cql_read(PoolName, UserJID, Module, QueryName, Params) ->
    %% Pages are prepended while folding, so they come back newest-first.
    CollectPage = fun(Page, Pages) -> [Page | Pages] end,
    case cql_foldl(PoolName, UserJID, Module, QueryName, Params, CollectPage, []) of
        {ok, ReversedPages} ->
            {ok, lists:append(lists:reverse(ReversedPages))};
        {error, _Reason} = Error ->
            Error
    end.
116
117
%% --------------------------------------------------------
%% @doc Run a read statement (select) against Cassandra, folding over its pages.
%% Works much like lists:foldl/3, except that the folded elements are the pages
%% of the query result: every call of the given fun receives a list of several
%% result rows (by default 100 at most) together with the accumulator.
%%
%% The statement text is looked up under `QueryName' in the list returned by
%% `Module:prepared_queries()'.
%% --------------------------------------------------------
-spec cql_foldl(PoolName :: pool_name(), UserJID :: jid:jid() | undefined, Module :: atom(),
                QueryName :: query_name(), Params :: parameters(),
                fold_fun(), AccIn :: fold_accumulator()) ->
                   {ok, AccOut :: fold_accumulator()} | {error, Reason :: any()}.
cql_foldl(PoolName, UserJID, Module, QueryName, Params, Fun, AccIn) ->
    PreparedQueries = Module:prepared_queries(),
    Statement = proplists:get_value(QueryName, PreparedQueries),
    ReadOpts = #{retry => ?READ_RETRY_COUNT},
    mongoose_cassandra_worker:read(PoolName, UserJID, Statement, Params, Fun, AccIn, ReadOpts).
134
135
%% @doc Return the current OS system time as an integer number of microseconds.
-spec now_timestamp() -> integer().
now_timestamp() ->
    os:system_time(microsecond).
139
140 %% ====================================================================
141 %% Internal functions
142 %% ====================================================================
143
144 %% ====================================================================
145 %% Queries
146 %% ====================================================================
147
%% @doc Prepared statements provided by this module: the connectivity test
%% query followed by one row-count query for every table listed in tables/0.
%% Each entry is a `{QueryName, CQL}' pair.
-spec prepared_queries() -> [{query_name(), string()}].
prepared_queries() ->
    [{test_query, test_query_sql()} | total_count_queries()].
151
%% @doc CQL text of the connectivity test query, used as the Cassandra
%% counterpart of "SELECT 1".
-spec test_query_sql() -> string().
test_query_sql() ->
    "SELECT now() FROM system.local".
154
%% @doc Build the CQL text that counts all rows of table `T'.
%% Only atoms are accepted; any other argument fails with function_clause.
-spec total_count_query_cql(T :: atom()) -> string().
total_count_query_cql(T) when is_atom(T) ->
    "SELECT COUNT(*) FROM " ++ atom_to_list(T).
157
158
159 %% ====================================================================
160 %% Diagnostic utilities
161 %% ====================================================================
162
%% @doc Run the connectivity test query on the given pool without a user JID.
%% Equivalent to `test_query(PoolName, undefined)'.
-spec test_query(PoolName :: pool_name()) -> ok | {error, Reason :: term()}.
test_query(PoolName) ->
    NoUserJID = undefined,
    test_query(PoolName, NoUserJID).
166
%% @doc Run the connectivity test query on the given pool for `UserJID'.
%% Returns `ok' when the read succeeds. An `{error, Reason}' result from
%% cql_read/5 is returned unchanged, and any exception raised by the read is
%% converted into `{error, {Class, Reason}}'.
-spec test_query(pool_name(), jid:jid() | undefined) -> ok | {error, Reason :: term()}.
test_query(PoolName, UserJID) ->
    try mongoose_cassandra:cql_read(PoolName, UserJID, ?MODULE, test_query, []) of
        {ok, _Rows} ->
            ok;
        %% The `of' section is not protected by `catch': without this clause an
        %% error tuple would escape as an uncaught try_clause exception instead
        %% of being reported to the caller.
        {error, _Reason} = Error ->
            Error
    catch
        Class:Reason ->
            {error, {Class, Reason}}
    end.
174
%% @doc Return the number of rows in `Table', read through the prepared
%% `{total_count_query, Table}' query of this module with no user JID.
%% Crashes with badmatch unless the read yields exactly one row carrying a
%% `count' key.
-spec total_count_query(pool_name(), Table :: atom()) ->
    non_neg_integer().
total_count_query(PoolName, Table) ->
    QueryName = {total_count_query, Table},
    {ok, [#{count := Count}]} =
        mongoose_cassandra:cql_read(PoolName, undefined, ?MODULE, QueryName, []),
    Count.
182
%% Build one `{{total_count_query, Table}, CQL}' entry for every table
%% returned by tables/0.
total_count_queries() ->
    ToEntry = fun(Table) ->
                      {{total_count_query, Table}, total_count_query_cql(Table)}
              end,
    lists:map(ToEntry, tables()).
185
186
%% Tables for which a total-count query is generated.
%% Listed in the original source but disabled: private_storage,
%% privacy_default_list, privacy_list, privacy_item, rosterusers,
%% roster_version.
tables() ->
    [mam_message, mam_muc_message, mam_config].
Line Hits Source