1: %%' Copyright © 2012-2017 Erlang Solutions Ltd 2: %% 3: %% Licensed under the Apache License, Version 2.0 (the "License"); 4: %% you may not use this file except in compliance with the License. 5: %% You may obtain a copy of the License at 6: %% 7: %% http://www.apache.org/licenses/LICENSE-2.0 8: %% 9: %% Unless required by applicable law or agreed to in writing, software 10: %% distributed under the License is distributed on an "AS IS" BASIS, 11: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12: %% See the License for the specific language governing permissions and 13: %% limitations under the License. 14: %%. 15: 16: -module(mongoose_cassandra_SUITE). 17: -compile([export_all, nowarn_export_all]). 18: 19: %% 20: %%' Imports 21: %% 22: 23: -import(distributed_helper, [mim/0, 24: require_rpc_nodes/1, 25: rpc/4]). 26: 27: %%. 28: %%' Preprocessor directives 29: %% 30: 31: -include_lib("common_test/include/ct.hrl"). 32: -include_lib("eunit/include/eunit.hrl"). 33: 34: %%. 35: %%' Configuration 36: %% 37: 38: -define(TEST_DB_POOL_NAME, default). 39: -define(TEST_KEYSPACE_NAME, "mongooseim"). 40: -define(TEST_TABLE_NAME, "test_table"). 41: 42: %% zazkia TCP proxy API endpoint configuration 43: %% This endpoint will be used to drop Cassandra connections while running some tests from this suite 44: %% zazkia TCP proxy is required to run this suite unless group 'not_so_happy_cases' is disabled 45: -define(TCP_PROXY_API_HOST, "localhost"). 46: -define(TCP_PROXY_API_PORT, 9191). 47: -define(TCP_PROXY_SERVICE_NAME, "cassandra"). 48: 49: %%. 50: %%' Common Test prelude 51: %% 52: 53: suite() -> 54: require_rpc_nodes([mim]) ++ [ 55: {require, ejabberd_node}, 56: {require, ejabberd_cookie} 57: ]. 58: 59: all() -> 60: [ 61: {group, happy_cases}, 62: {group, not_so_happy_cases} 63: %{group, stress_cases} % not intended for CI, uncomment for manual stress test 64: ]. 65: 66: groups() -> 67: G = [ 68: {happy_cases, [parallel], happy_cases()}, 69: {not_so_happy_cases, [sequence], not_so_happy_cases()}, 70: {stress_cases, [sequence], stress_cases()} 71: ], 72: ct_helper:repeat_all_until_all_ok(G). 73: 74: happy_cases() -> 75: [ 76: test_query, 77: empty_write_should_succeed, 78: single_row_read_write_should_succeed, 79: small_batch_read_write_should_succeed, 80: big_batch_read_write_should_succeed, 81: big_objects_read_write_should_succeed 82: ]. 83: 84: not_so_happy_cases() -> 85: [ 86: should_work_after_restart_of_cqerl_cluster, 87: should_work_after_connection_reset 88: ]. 89: 90: stress_cases() -> 91: [ 92: big_batch_read_write_unstable_connection_should_succeed 93: ]. 94: 95: init_per_suite(Config) -> 96: application:ensure_all_started(gun), 97: mongoose_helper:inject_module(?MODULE), 98: 99: case mam_helper:is_cassandra_enabled() of 100: true -> 101: [{pool, ?TEST_DB_POOL_NAME} | Config]; 102: false -> 103: {skip, no_cassandra_configured} 104: end. 105: 106: end_per_suite(Config) -> 107: Config. 108: 109: init_per_group(_GroupName, Config) -> 110: common_group_config(Config). 111: 112: common_group_config(Config) -> 113: Config. 114: 115: end_per_group(_, Config) -> 116: Config. 117: 118: init_per_testcase(_CaseName, Config) -> 119: Config. 120: 121: end_per_testcase(_CaseName, Config) -> 122: Config. 123: 124: %% Queries 125: 126: prepared_queries() -> 127: [ 128: {insert, "INSERT INTO " ?TEST_TABLE_NAME " (f1, f2) VALUES (?, ?)"}, 129: {select_by_f1, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"}, 130: {select_by_f2, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f2 = ? ALLOW FILTERING"}, 131: {delete, "DELETE FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"} 132: ]. 133: 134: %%. 135: %%' Tests 136: %% 137: 138: sanity_check(Config) -> 139: ct:pal("~p sanity check config: ~p", [?MODULE, Config]), 140: ok. 141: 142: test_query(_Config) -> 143: ?assertMatch(ok, call(test_query, [?TEST_DB_POOL_NAME])). 144: 145: empty_write_should_succeed(Config) -> 146: ?assertMatch(ok, cql_write(Config, insert, [])), 147: 148: ok. 149: 150: single_row_read_write_should_succeed(Config) -> 151: Row = #{f1 := F1, f2 := _F2} = #{f1 => int(), f2 => str()}, 152: Rows = [Row], 153: 154: ?assertMatch(ok, cql_write(Config, insert, Rows)), 155: ?assertMatch({ok, [Row]}, cql_read(Config, select_by_f1, #{f1 => F1})), 156: 157: ok. 158: 159: small_batch_read_write_should_succeed(Config) -> 160: F2 = str(), 161: RowCount = 10, 162: Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]), 163: 164: ?assertMatch(ok, cql_write(Config, insert, Rows)), 165: 166: {Result, MaybeRows} = cql_read(Config, select_by_f2, #{f2 => F2}), 167: ?assertMatch({ok, _}, {Result, MaybeRows}), 168: ?assertMatch(RowCount, length(MaybeRows)), 169: ?assertMatch(Rows, lists:sort(MaybeRows)), 170: 171: ok. 172: 173: big_batch_read_write_should_succeed(Config) -> 174: F2 = str(), 175: RowCount = 100, 176: Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]), 177: 178: ?assertMatch(ok, cql_write(Config, insert, Rows)), 179: 180: {Result, MaybeRows} = cql_read(Config, select_by_f2, #{f2 => F2}), 181: ?assertMatch({ok, _}, {Result, MaybeRows}), 182: ?assertMatch(RowCount, length(MaybeRows)), 183: ?assertMatch(Rows, lists:sort(MaybeRows)), 184: 185: ok. 186: 187: big_objects_read_write_should_succeed(Config) -> 188: F2 = str(1024 * 5), %% By default batch size limit is 50kB 189: RowCount = 20, %% 20 * (1024 * 5 * 2) is way over this limit 190: Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]), 191: 192: ?assertMatch(ok, cql_write(Config, insert, Rows)), 193: 194: {Result, MaybeRows} = cql_read(Config, select_by_f2, #{f2 => F2}), 195: ?assertMatch({ok, _}, {Result, MaybeRows}), 196: ?assertMatch(RowCount, length(MaybeRows)), 197: ?assertMatch(Rows, lists:sort(MaybeRows)), 198: 199: ok. 200: 201: should_work_after_connection_reset(_Config) -> 202: reset_all_cassandra_connections(), 203: mongoose_helper:wait_until(fun() -> 204: call(test_query, [?TEST_DB_POOL_NAME]) 205: end, ok). 206: 207: should_work_after_restart_of_cqerl_cluster(_Config) -> 208: Pid1 = call(erlang, whereis, [cqerl_cluster]), 209: call(erlang, exit, [Pid1, kill]), 210: mongoose_helper:wait_until(fun() -> 211: Pid2 = call(erlang, whereis, [cqerl_cluster]), 212: Pid1 =/= Pid2 andalso Pid2 =/= undefined 213: end, true), 214: mongoose_helper:wait_until(fun() -> 215: call(test_query, [?TEST_DB_POOL_NAME]) 216: end, ok). 217: 218: big_batch_read_write_unstable_connection_should_succeed(Config) -> 219: Pid = spawn_link(fun reset_all_cassandra_connections_loop/0), 220: batch_main(Config, 30, timer:seconds(20)), 221: Pid ! exit. 222: 223: %%. 224: %%' Helpers 225: %% 226: 227: %% Changing the Interval to 1 second makes some requests fail - cqerl might be improved there 228: reset_all_cassandra_connections_loop() -> 229: reset_all_cassandra_connections_loop(timer:seconds(5)). 230: 231: reset_all_cassandra_connections_loop(Interval) -> 232: reset_all_cassandra_connections(), 233: receive 234: exit -> 235: ok 236: after Interval -> 237: reset_all_cassandra_connections_loop(Interval) 238: end. 239: 240: reset_all_cassandra_connections() -> 241: {ok, ProxyPid} = gun:open(?TCP_PROXY_API_HOST, ?TCP_PROXY_API_PORT), 242: %% More on this proxy's API: https://github.com/emicklei/zazkia 243: Ref = gun:post(ProxyPid, <<"/routes/" ?TCP_PROXY_SERVICE_NAME "/links/close">>, []), 244: gun:await(ProxyPid, Ref), 245: gun:close(ProxyPid). 246: 247: batch_main(Config, RowCount, TimeLeft) when TimeLeft > 0 -> 248: StartTime = os:system_time(millisecond), 249: F2 = str(), 250: Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]), 251: ?assertMatch(ok, call_until_no_error(cql_write, Config, insert, Rows, 5)), 252: 253: {Result, MaybeRows} = call_until_no_error(cql_read, Config, select_by_f2, #{f2 => F2}, 5), 254: ?assertMatch({ok, _}, {Result, MaybeRows}), 255: ?assertMatch(RowCount, length(MaybeRows)), 256: ?assertMatch(Rows, lists:sort(MaybeRows)), 257: 258: RowsToDelete = [maps:with([f1], Row) || Row <- Rows], 259: ?assertMatch(ok, call_until_no_error(cql_write, Config, delete, RowsToDelete, 5)), 260: 261: EndTime = os:system_time(millisecond), 262: 263: batch_main(Config, RowCount, TimeLeft - (EndTime - StartTime)); 264: batch_main(_Config, _RowCount, _TimeLeft) -> 265: ok. 266: 267: %% The connection is unstable, so the query can fail sometimes. 268: %% In this case log the error and just repeat the failing query. 269: call_until_no_error(F, Config, QueryName, Arg, Retries) -> 270: try call(F, cql_args(Config, [QueryName, Arg])) of 271: {error, Error} when Retries > 0 -> 272: ct:pal("Got error ~p from ~p(~p, ~p)", [Error, F, QueryName, Arg]), 273: timer:sleep(timer:seconds(1)), 274: call_until_no_error(F, Config, QueryName, Arg, Retries - 1); 275: Result -> 276: Result 277: catch 278: error:{badrpc, _} = Error when Retries > 0 -> 279: ct:pal("Got exception error:~p from ~p(~p, ~p)", [Error, F, QueryName, Arg]), 280: timer:sleep(timer:seconds(1)), 281: call_until_no_error(F, Config, QueryName, Arg, Retries - 1) 282: end. 283: 284: getenv_or_fail(EnvVar) -> 285: case os:getenv(EnvVar) of 286: false -> error("environment variable " ++ EnvVar ++ "not defined"); 287: Value -> Value 288: end. 289: 290: call(F, A) -> 291: call(mongoose_cassandra, F, A). 292: 293: call(M, F, A) -> 294: RPCSpec = mim(), 295: rpc(RPCSpec#{timeout => timer:minutes(1)}, M, F, A). 296: 297: cql_args(Config, Args) -> 298: [?config(pool, Config), undefined, ?MODULE | Args]. 299: 300: cql_write(Config, QueryName, Rows) -> 301: call(cql_write, cql_args(Config, [QueryName, Rows])). 302: 303: cql_read(Config, QueryName, Params) -> 304: call(cql_read, cql_args(Config, [QueryName, Params])). 305: 306: int() -> 307: rand:uniform(999999999). 308: 309: str(Bytes) -> 310: erlang:list_to_binary( 311: erlang:integer_to_list( 312: binary:decode_unsigned( 313: crypto:strong_rand_bytes(Bytes)), 16)). 314: 315: str() -> 316: str(16).