1: %%' Copyright © 2012-2017 Erlang Solutions Ltd
    2: %%
    3: %% Licensed under the Apache License, Version 2.0 (the "License");
    4: %% you may not use this file except in compliance with the License.
    5: %% You may obtain a copy of the License at
    6: %%
    7: %% http://www.apache.org/licenses/LICENSE-2.0
    8: %%
    9: %% Unless required by applicable law or agreed to in writing, software
   10: %% distributed under the License is distributed on an "AS IS" BASIS,
   11: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12: %% See the License for the specific language governing permissions and
   13: %% limitations under the License.
   14: %%.
   15: 
   16: -module(mongoose_cassandra_SUITE).
   17: -compile([export_all, nowarn_export_all]).
   18: 
   19: %%
   20: %%' Imports
   21: %%
   22: 
   23: -import(distributed_helper, [mim/0,
   24:                              require_rpc_nodes/1,
   25:                              rpc/4]).
   26: 
   27: %%.
   28: %%' Preprocessor directives
   29: %%
   30: 
   31: -include_lib("common_test/include/ct.hrl").
   32: -include_lib("eunit/include/eunit.hrl").
   33: 
   34: %%.
   35: %%' Configuration
   36: %%
   37: 
   38: -define(TEST_DB_POOL_NAME,          default).
   39: -define(TEST_KEYSPACE_NAME,         "mongooseim").
   40: -define(TEST_TABLE_NAME,            "test_table").
   41: 
   42: %% zazkia TCP proxy API endpoint configuration
   43: %% This endpoint will be used to drop Cassandra connections while running some tests from this suite
   44: %% zazkia TCP proxy is required to run this suite unless group 'not_so_happy_cases' is disabled
   45: -define(TCP_PROXY_API_HOST,         "localhost").
   46: -define(TCP_PROXY_API_PORT,         9191).
   47: -define(TCP_PROXY_SERVICE_NAME,     "cassandra").
   48: 
   49: %%.
   50: %%' Common Test prelude
   51: %%
   52: 
   53: suite() ->
   54:     require_rpc_nodes([mim]) ++ [
   55:         {require, ejabberd_node},
   56:         {require, ejabberd_cookie}
   57:     ].
   58: 
   59: all() ->
   60:     [
   61:         {group, happy_cases},
   62:         {group, not_so_happy_cases}
   63:         %{group, stress_cases} % not intended for CI, uncomment for manual stress test
   64:     ].
   65: 
   66: groups() ->
   67:     G = [
   68:          {happy_cases,          [parallel], happy_cases()},
   69:          {not_so_happy_cases,   [sequence], not_so_happy_cases()},
   70:          {stress_cases,         [sequence], stress_cases()}
   71:         ],
   72:     ct_helper:repeat_all_until_all_ok(G).
   73: 
   74: happy_cases() ->
   75:     [
   76:         test_query,
   77:         empty_write_should_succeed,
   78:         single_row_read_write_should_succeed,
   79:         small_batch_read_write_should_succeed,
   80:         big_batch_read_write_should_succeed,
   81:         big_objects_read_write_should_succeed
   82:     ].
   83: 
   84: not_so_happy_cases() ->
   85:     [
   86:      should_work_after_restart_of_cqerl_cluster,
   87:      should_work_after_connection_reset
   88:     ].
   89: 
   90: stress_cases() ->
   91:     [
   92:      big_batch_read_write_unstable_connection_should_succeed
   93:     ].
   94: 
   95: init_per_suite(Config) ->
   96:     application:ensure_all_started(gun),
   97:     mongoose_helper:inject_module(?MODULE),
   98: 
   99:     case mam_helper:is_cassandra_enabled() of
  100:         true ->
  101:             [{pool, ?TEST_DB_POOL_NAME} | Config];
  102:         false ->
  103:             {skip, no_cassandra_configured}
  104:     end.
  105: 
  106: end_per_suite(Config) ->
  107:     Config.
  108: 
  109: init_per_group(_GroupName, Config) ->
  110:     common_group_config(Config).
  111: 
  112: common_group_config(Config) ->
  113:     Config.
  114: 
  115: end_per_group(_, Config) ->
  116:     Config.
  117: 
  118: init_per_testcase(_CaseName, Config) ->
  119:     Config.
  120: 
  121: end_per_testcase(_CaseName, Config) ->
  122:     Config.
  123: 
  124: %% Queries
  125: 
  126: prepared_queries() ->
  127:     [
  128:         {insert, "INSERT INTO " ?TEST_TABLE_NAME " (f1, f2) VALUES (?, ?)"},
  129:         {select_by_f1, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"},
  130:         {select_by_f2, "SELECT * FROM " ?TEST_TABLE_NAME " WHERE f2 = ? ALLOW FILTERING"},
  131:         {delete, "DELETE FROM " ?TEST_TABLE_NAME " WHERE f1 = ?"}
  132:     ].
  133: 
  134: %%.
  135: %%' Tests
  136: %%
  137: 
  138: sanity_check(Config) ->
  139:     ct:pal("~p sanity check config: ~p", [?MODULE, Config]),
  140:     ok.
  141: 
  142: test_query(_Config) ->
  143:     ?assertMatch(ok, call(test_query, [?TEST_DB_POOL_NAME])).
  144: 
  145: empty_write_should_succeed(Config) ->
  146:     ?assertMatch(ok, cql_write(Config, insert, [])),
  147: 
  148:     ok.
  149: 
  150: single_row_read_write_should_succeed(Config) ->
  151:     Row = #{f1 := F1, f2 := _F2} = #{f1 => int(), f2 => str()},
  152:     Rows = [Row],
  153: 
  154:     ?assertMatch(ok, cql_write(Config, insert, Rows)),
  155:     ?assertMatch({ok, [Row]}, cql_read(Config, select_by_f1, #{f1 => F1})),
  156: 
  157:     ok.
  158: 
  159: small_batch_read_write_should_succeed(Config) ->
  160:     F2 = str(),
  161:     RowCount = 10,
  162:     Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]),
  163: 
  164:     ?assertMatch(ok, cql_write(Config, insert, Rows)),
  165: 
  166:     {Result, MaybeRows} = cql_read(Config, select_by_f2, #{f2 => F2}),
  167:     ?assertMatch({ok, _}, {Result, MaybeRows}),
  168:     ?assertMatch(RowCount, length(MaybeRows)),
  169:     ?assertMatch(Rows, lists:sort(MaybeRows)),
  170: 
  171:     ok.
  172: 
  173: big_batch_read_write_should_succeed(Config) ->
  174:     F2 = str(),
  175:     RowCount = 100,
  176:     Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]),
  177: 
  178:     ?assertMatch(ok, cql_write(Config, insert, Rows)),
  179: 
  180:     {Result, MaybeRows} = cql_read(Config, select_by_f2, #{f2 => F2}),
  181:     ?assertMatch({ok, _}, {Result, MaybeRows}),
  182:     ?assertMatch(RowCount, length(MaybeRows)),
  183:     ?assertMatch(Rows, lists:sort(MaybeRows)),
  184: 
  185:     ok.
  186: 
  187: big_objects_read_write_should_succeed(Config) ->
  188:     F2 = str(1024 * 5),  %% By default batch size limit is 50kB
  189:     RowCount = 20,       %% 20 * (1024  * 5 * 2) is way over this limit
  190:     Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]),
  191: 
  192:     ?assertMatch(ok, cql_write(Config, insert, Rows)),
  193: 
  194:     {Result, MaybeRows} = cql_read(Config, select_by_f2, #{f2 => F2}),
  195:     ?assertMatch({ok, _}, {Result, MaybeRows}),
  196:     ?assertMatch(RowCount, length(MaybeRows)),
  197:     ?assertMatch(Rows, lists:sort(MaybeRows)),
  198: 
  199:     ok.
  200: 
  201: should_work_after_connection_reset(_Config) ->
  202:     reset_all_cassandra_connections(),
  203:     mongoose_helper:wait_until(fun() ->
  204:                                        call(test_query, [?TEST_DB_POOL_NAME])
  205:                                end, ok).
  206: 
  207: should_work_after_restart_of_cqerl_cluster(_Config) ->
  208:     Pid1 = call(erlang, whereis, [cqerl_cluster]),
  209:     call(erlang, exit, [Pid1, kill]),
  210:     mongoose_helper:wait_until(fun() ->
  211:                                        Pid2 = call(erlang, whereis, [cqerl_cluster]),
  212:                                        Pid1 =/= Pid2 andalso Pid2 =/= undefined
  213:                                end, true),
  214:     mongoose_helper:wait_until(fun() ->
  215:                                        call(test_query, [?TEST_DB_POOL_NAME])
  216:                                end, ok).
  217: 
  218: big_batch_read_write_unstable_connection_should_succeed(Config) ->
  219:     Pid = spawn_link(fun reset_all_cassandra_connections_loop/0),
  220:     batch_main(Config, 30, timer:seconds(20)),
  221:     Pid ! exit.
  222: 
  223: %%.
  224: %%' Helpers
  225: %%
  226: 
  227: %% Changing the Interval to 1 second makes some requests fail - cqerl might be improved there
  228: reset_all_cassandra_connections_loop() ->
  229:     reset_all_cassandra_connections_loop(timer:seconds(5)).
  230: 
  231: reset_all_cassandra_connections_loop(Interval) ->
  232:     reset_all_cassandra_connections(),
  233:     receive
  234:         exit ->
  235:             ok
  236:     after Interval ->
  237:         reset_all_cassandra_connections_loop(Interval)
  238:     end.
  239: 
  240: reset_all_cassandra_connections() ->
  241:     {ok, ProxyPid} = gun:open(?TCP_PROXY_API_HOST, ?TCP_PROXY_API_PORT),
  242:     %% More on this proxy's API: https://github.com/emicklei/zazkia
  243:     Ref = gun:post(ProxyPid, <<"/routes/" ?TCP_PROXY_SERVICE_NAME "/links/close">>, []),
  244:     gun:await(ProxyPid, Ref),
  245:     gun:close(ProxyPid).
  246: 
  247: batch_main(Config, RowCount, TimeLeft) when TimeLeft > 0 ->
  248:     StartTime = os:system_time(millisecond),
  249:     F2 = str(),
  250:     Rows = lists:sort([#{f1 => int(), f2 => F2} || _ <- lists:seq(1, RowCount)]),
  251:     ?assertMatch(ok, call_until_no_error(cql_write, Config, insert, Rows, 5)),
  252: 
  253:     {Result, MaybeRows} = call_until_no_error(cql_read, Config, select_by_f2, #{f2 => F2}, 5),
  254:     ?assertMatch({ok, _}, {Result, MaybeRows}),
  255:     ?assertMatch(RowCount, length(MaybeRows)),
  256:     ?assertMatch(Rows, lists:sort(MaybeRows)),
  257: 
  258:     RowsToDelete = [maps:with([f1], Row) || Row <- Rows],
  259:     ?assertMatch(ok, call_until_no_error(cql_write, Config, delete, RowsToDelete, 5)),
  260: 
  261:     EndTime = os:system_time(millisecond),
  262: 
  263:     batch_main(Config, RowCount, TimeLeft - (EndTime - StartTime));
  264: batch_main(_Config, _RowCount, _TimeLeft) ->
  265:     ok.
  266: 
  267: %% The connection is unstable, so the query can fail sometimes.
  268: %% In this case log the error and just repeat the failing query.
  269: call_until_no_error(F, Config, QueryName, Arg, Retries) ->
  270:     try call(F, cql_args(Config, [QueryName, Arg])) of
  271:         {error, Error} when Retries > 0 ->
  272:             ct:pal("Got error ~p from ~p(~p, ~p)", [Error, F, QueryName, Arg]),
  273:             timer:sleep(timer:seconds(1)),
  274:             call_until_no_error(F, Config, QueryName, Arg, Retries - 1);
  275:         Result ->
  276:             Result
  277:     catch
  278:         error:{badrpc, _} = Error when Retries > 0 ->
  279:             ct:pal("Got exception error:~p from ~p(~p, ~p)", [Error, F, QueryName, Arg]),
  280:             timer:sleep(timer:seconds(1)),
  281:             call_until_no_error(F, Config, QueryName, Arg, Retries - 1)
  282:     end.
  283: 
  284: getenv_or_fail(EnvVar) ->
  285:     case os:getenv(EnvVar) of
  286:         false -> error("environment variable " ++ EnvVar ++ "not defined");
  287:         Value -> Value
  288:     end.
  289: 
  290: call(F, A) ->
  291:     call(mongoose_cassandra, F, A).
  292: 
  293: call(M, F, A) ->
  294:     RPCSpec = mim(),
  295:     rpc(RPCSpec#{timeout => timer:minutes(1)}, M, F, A).
  296: 
  297: cql_args(Config, Args) ->
  298:     [?config(pool, Config), undefined, ?MODULE | Args].
  299: 
  300: cql_write(Config, QueryName, Rows) ->
  301:     call(cql_write, cql_args(Config, [QueryName, Rows])).
  302: 
  303: cql_read(Config, QueryName, Params) ->
  304:     call(cql_read, cql_args(Config, [QueryName, Params])).
  305: 
  306: int() ->
  307:     rand:uniform(999999999).
  308: 
  309: str(Bytes) ->
  310:     erlang:list_to_binary(
  311:         erlang:integer_to_list(
  312:             binary:decode_unsigned(
  313:                 crypto:strong_rand_bytes(Bytes)), 16)).
  314: 
  315: str() ->
  316:     str(16).