1: %%==============================================================================
    2: %% Copyright 2014 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(cluster_commands_SUITE).
   18: -compile([export_all, nowarn_export_all]).
   19: 
   20: -import(distributed_helper, [add_node_to_cluster/2,
   21:                              is_sm_distributed/0,
   22:                              mim/0, mim2/0, mim3/0,
   23:                              remove_node_from_cluster/2,
   24:                              require_rpc_nodes/1,
   25:                              rpc/4]).
   26: -import(mongooseimctl_helper, [mongooseimctl/3, rpc_call/3]).
   27: -import(domain_helper, [host_type/1]).
   28: 
   29: -include_lib("eunit/include/eunit.hrl").
   30: -include_lib("common_test/include/ct.hrl").
   31: -define(eq(Expected, Actual), ?assertEqual(Expected, Actual)).
   32: -define(ne(A, B), ?assertNot(A == B)).
   33: 
   34: %%--------------------------------------------------------------------
   35: %% Suite configuration
   36: %%--------------------------------------------------------------------
   37: 
   38: all() ->
   39:     [{group, clustered},
   40:      {group, clustering_two},
   41:      {group, clustering_three}].
   42: 
   43: groups() ->
   44:     [{clustered, [], [one_to_one_message]},
   45:      {clustering_two, [], clustering_two_tests()},
   46:      {clustering_three, [], clustering_three_tests()}].
   47: 
   48: suite() ->
   49:     require_rpc_nodes([mim, mim2, mim3]) ++ escalus:suite().
   50: 
   51: clustering_two_tests() ->
   52:     [commands_without_args,
   53:      join_successful_prompt,
   54:      join_successful_force,
   55:      leave_successful_prompt,
   56:      leave_successful_force,
   57:      join_unsuccessful,
   58:      leave_unsuccessful,
   59:      leave_but_no_cluster,
   60:      join_twice,
   61:      leave_using_rpc,
   62:      leave_twice,
   63:      join_twice_using_rpc,
   64:      join_twice_in_parallel_using_rpc].
   65: 
   66: clustering_three_tests() ->
   67:     [cluster_of_three,
   68:      leave_the_three,
   69:      %remove_dead_from_cluster, % TODO: Breaks cover
   70:      remove_alive_from_cluster].
   71: 
   72: %%--------------------------------------------------------------------
   73: %% Init & teardown
   74: %%--------------------------------------------------------------------
   75: 
   76: init_per_suite(Config) ->
   77:     #{node := Node1} = RPCNode1 = mim(),
   78:     #{node := Node2} = RPCNode2 = mim2(),
   79:     #{node := Node3} = RPCNode3 = mim3(),
   80:     Config1 = ejabberd_node_utils:init(RPCNode1, Config),
   81:     Config2 = ejabberd_node_utils:init(RPCNode2, Config1),
   82:     Config3 = ejabberd_node_utils:init(RPCNode3, Config2),
   83:     NodeCtlPath = distributed_helper:ctl_path(Node1, Config3),
   84:     Node2CtlPath = distributed_helper:ctl_path(Node2, Config3),
   85:     Node3CtlPath = distributed_helper:ctl_path(Node3, Config3),
   86:     escalus:init_per_suite([{ctl_path_atom(Node1), NodeCtlPath},
   87:                             {ctl_path_atom(Node2), Node2CtlPath},
   88:                             {ctl_path_atom(Node3), Node3CtlPath}]
   89:                            ++ Config3).
   90: 
   91: end_per_suite(Config) ->
   92:     escalus:end_per_suite(Config).
   93: 
   94: init_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
   95:     Node2 = mim2(),
   96:     Config1 = add_node_to_cluster(Node2, Config),
   97:     case is_sm_distributed() of
   98:         true ->
   99:             escalus:create_users(Config1, escalus:get_users([alice, clusterguy]));
  100:         {false, Backend} ->
  101:             ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
  102:             Node2 = mim2(),
  103:             remove_node_from_cluster(Node2, Config1),
  104:             {skip, nondistributed_sm}
  105:     end;
  106: 
  107: init_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
  108:     case is_sm_distributed() of
  109:         true ->
  110:             ok;
  111:         {false, Backend} ->
  112:             ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
  113:             {skip, nondistributed_sm}
  114:     end;
  115: 
  116: init_per_group(_GroupName, Config) ->
  117:     escalus:create_users(Config).
  118: 
  119: end_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
  120:     escalus:delete_users(Config, escalus:get_users([alice, clusterguy])),
  121:     Node2 = mim2(),
  122:     remove_node_from_cluster(Node2, Config);
  123: 
  124: %% Users are gone after mnesia cleaning
  125: %% hence there is no need to delete them manually
  126: end_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
  127:     ok;
  128: end_per_group(_GroupName, Config) ->
  129:     escalus:delete_users(Config).
  130: 
  131: init_per_testcase(CaseName, Config) ->
  132:     escalus:init_per_testcase(CaseName, Config).
  133: 
  134: end_per_testcase(cluster_of_three, Config) ->
  135:     Node2 = mim2(),
  136:     Node3 = mim3(),
  137:     remove_node_from_cluster(Node2, Config),
  138:     remove_node_from_cluster(Node3, Config),
  139:     escalus:end_per_testcase(cluster_of_three, Config);
  140: 
  141: end_per_testcase(CaseName, Config) when CaseName == remove_alive_from_cluster
  142:                                    orelse CaseName == remove_dead_from_cluster->
  143:     Node3 = mim3(),
  144:     Node2 = mim2(),
  145:     remove_node_from_cluster(Node3, Config),
  146:     remove_node_from_cluster(Node2, Config),
  147:     escalus:end_per_testcase(CaseName, Config);
  148: 
  149: end_per_testcase(CaseName, Config) when CaseName == join_successful_prompt
  150:                                    orelse CaseName == join_successful_force
  151:                                    orelse CaseName == leave_unsuccessful_prompt
  152:                                    orelse CaseName == leave_unsuccessful_force
  153:                                    orelse CaseName == join_twice
  154:                                    orelse CaseName == join_twice_using_rpc
  155:                                    orelse CaseName == join_twice_in_parallel_using_rpc ->
  156:     Node2 = mim2(),
  157:     remove_node_from_cluster(Node2, Config),
  158:     escalus:end_per_testcase(CaseName, Config);
  159: 
  160: end_per_testcase(CaseName, Config) ->
  161:     escalus:end_per_testcase(CaseName, Config).
  162: 
  163: %%--------------------------------------------------------------------
  164: %% Message tests
  165: %%--------------------------------------------------------------------
  166: 
  167: one_to_one_message(ConfigIn) ->
  168:     %% Given Alice connected to node one and ClusterGuy connected to node two
  169:     Metrics = [{[global, data, dist], [{recv_oct, '>'}, {send_oct, '>'}]}],
  170:     Config = [{mongoose_metrics, Metrics} | ConfigIn],
  171:     escalus:story(Config, [{alice, 1}, {clusterguy, 1}], fun(Alice, ClusterGuy) ->
  172:         %% When Alice sends a message to ClusterGuy
  173:         Msg1 = escalus_stanza:chat_to(ClusterGuy, <<"Hi!">>),
  174:         escalus:send(Alice, Msg1),
  175:         %% Then he receives it
  176:         Stanza1 = escalus:wait_for_stanza(ClusterGuy, 5000),
  177:         escalus:assert(is_chat_message, [<<"Hi!">>], Stanza1),
  178: 
  179:         %% When ClusterGuy sends a response
  180:         Msg2 = escalus_stanza:chat_to(Alice, <<"Oh hi!">>),
  181:         escalus:send(ClusterGuy, Msg2),
  182:         %% Then Alice also receives it
  183:         Stanza2 = escalus:wait_for_stanza(Alice, 5000),
  184:         escalus:assert(is_chat_message, [<<"Oh hi!">>], Stanza2)
  185:     end).
  186: 
  187: %%--------------------------------------------------------------------
  188: %% Manage cluster commands tests
  189: %%--------------------------------------------------------------------
  190: 
  191: commands_without_args(Config) ->
  192:     {Res1, 1} = mongooseimctl_interactive("join_cluster", [], "yes\n", Config),
  193:     ?assertMatch({match, _}, re:run(Res1, "This command requires one argument: other node's name")),
  194:     {Res2, 1} = mongooseimctl_interactive("remove_from_cluster", [], "yes\n",Config),
  195:     ?assertMatch({match, _}, re:run(Res2, "This command requires one argument: other node's name")).
  196: 
  197: join_successful_prompt(Config) ->
  198:     %% given
  199:     #{node := Node2} = RPCSpec2 = mim2(),
  200:     %% when
  201:     {_, OpCode} =
  202:         mongooseimctl_interactive("join_cluster", [atom_to_list(Node2)], "yes\n", Config),
  203:     %% then
  204:     distributed_helper:verify_result(RPCSpec2, add),
  205:     ?eq(0, OpCode).
  206: 
  207: join_successful_force(Config) ->
  208:     %% given
  209:     #{node := Node2} = RPCSpec2 = mim2(),
  210:     %% when
  211:     {_, OpCode} = mongooseimctl_force("join_cluster", [atom_to_list(Node2)], "--force", Config),
  212:     %% then
  213:     distributed_helper:verify_result(RPCSpec2, add),
  214:     ?eq(0, OpCode).
  215: 
  216: leave_successful_prompt(Config) ->
  217:     %% given
  218:     Node2 = mim2(),
  219:     add_node_to_cluster(Node2, Config),
  220:     %% when
  221:     {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
  222:     %% then
  223:     distributed_helper:verify_result(Node2, remove),
  224:     ?eq(0, OpCode).
  225: 
  226: leave_successful_force(Config) ->
  227:     %% given
  228:     Node2 = mim2(),
  229:     add_node_to_cluster(Node2, Config),
  230:     %% when
  231:     {_, OpCode} = mongooseimctl_force("leave_cluster", [], "-f", Config),
  232:     %% then
  233:     distributed_helper:verify_result(Node2, remove),
  234:     ?eq(0, OpCode).
  235: 
  236: join_unsuccessful(Config) ->
  237:     %% given
  238:     Node2 = mim2(),
  239:     %% when
  240:     {_, OpCode} = mongooseimctl_interactive("join_cluster", [], "no\n", Config),
  241:     %% then
  242:     distributed_helper:verify_result(Node2, remove),
  243:     ?ne(0, OpCode).
  244: 
  245: leave_unsuccessful(Config) ->
  246:     %% given
  247:     Node2 = mim(),
  248:     add_node_to_cluster(Node2, Config),
  249:     %% when
  250:     {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "no\n", Config),
  251:     %% then
  252:     distributed_helper:verify_result(Node2, add),
  253:     ?ne(0, OpCode).
  254: 
  255: leave_but_no_cluster(Config) ->
  256:     %% given
  257:     Node2 = mim2(),
  258:     %% when
  259:     {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
  260:     %% then
  261:     distributed_helper:verify_result(Node2, remove),
  262:     ?ne(0, OpCode).
  263: 
  264: join_twice(Config) ->
  265:     %% given
  266:     #{node := Node2} = RPCSpec2 = mim2(),
  267:     %% when
  268:     {_, OpCode1} = mongooseimctl_interactive("join_cluster",
  269:                                              [atom_to_list(Node2)], "yes\n", Config),
  270:     {_, OpCode2} = mongooseimctl_interactive("join_cluster",
  271:                                              [atom_to_list(Node2)], "yes\n", Config),
  272:     %% then
  273:     distributed_helper:verify_result(RPCSpec2, add),
  274:     ?eq(0, OpCode1),
  275:     ?ne(0, OpCode2).
  276: 
  277: %% This function checks that it's ok to call mongoose_cluster:join/1 twice
  278: join_twice_using_rpc(_Config) ->
  279:     %% given
  280:     #{node := Node1} = mim(),
  281:     RPCSpec2 = mim2(),
  282:     Timeout = timer:seconds(60),
  283:     %% when
  284:     ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
  285:     ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
  286:     %% then
  287:     distributed_helper:verify_result(RPCSpec2, add),
  288:     ok.
  289: 
  290: %% Check, that global transaction allows to run only one cluster operation at the time.
  291: %% It should technically behave the same way as join_twice_using_rpc test (i.e. not fail).
  292: join_twice_in_parallel_using_rpc(_Config) ->
  293:     %% given
  294:     #{node := Node1} = mim(),
  295:     RPCSpec2 = mim2(),
  296:     Timeout = timer:seconds(60),
  297:     %% when
  298:     Pid1 = proc_lib:spawn_link(fun() ->
  299:         ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
  300:         end),
  301:     Pid2 = proc_lib:spawn_link(fun() ->
  302:         ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
  303:         end),
  304:     %% then
  305:     distributed_helper:verify_result(RPCSpec2, add),
  306:     wait_for_process_to_stop(Pid1, Timeout),
  307:     wait_for_process_to_stop(Pid2, Timeout),
  308:     ok.
  309: 
  310: leave_using_rpc(Config) ->
  311:     %% given
  312:     Node1 = mim(),
  313:     Node2 = mim2(),
  314:     add_node_to_cluster(Node2, Config),
  315:     %% when
  316:     Result = distributed_helper:rpc(Node1#{timeout => timer:seconds(30)},
  317:                                     mongoose_server_api, leave_cluster, []),
  318:     ct:pal("leave_using_rpc result ~p~n", [Result]),
  319:     %% then
  320:     distributed_helper:verify_result(Node2, remove),
  321:     ok.
  322: 
  323: leave_twice(Config) ->
  324:     %% given
  325:     Node2 = mim2(),
  326:     add_node_to_cluster(Node2, Config),
  327:     %% when
  328:     {_, OpCode1} = mongooseimctl_force("leave_cluster", [], "--force", Config),
  329:     {_, OpCode2} = mongooseimctl_force("leave_cluster", [], "-f", Config),
  330:     %% then
  331:     distributed_helper:verify_result(Node2, remove),
  332:     ?eq(0, OpCode1),
  333:     ?ne(0, OpCode2).
  334: 
  335: cluster_of_three(Config) ->
  336:     %% given
  337:     #{node := ClusterMemberNodeName} = ClusterMember = mim(),
  338:     #{node := Node2Nodename} = Node2 = mim2(),
  339:     #{node := Node3Nodename} = Node3 = mim3(),
  340:     %% when
  341:     {_, OpCode1} = mongooseimctl_force(Node2Nodename, "join_cluster",
  342:                                        [atom_to_list(ClusterMemberNodeName)], "-f", Config),
  343:     {_, OpCode2} = mongooseimctl_force(Node3Nodename, "join_cluster",
  344:                                        [atom_to_list(ClusterMemberNodeName)], "-f", Config),
  345:     %% then
  346:     ?eq(0, OpCode1),
  347:     ?eq(0, OpCode2),
  348:     nodes_clustered(Node2, ClusterMember, true),
  349:     nodes_clustered(Node3, ClusterMember, true),
  350:     nodes_clustered(Node2, Node3, true).
  351: 
  352: leave_the_three(Config) ->
  353:     %% given
  354:     Timeout = timer:seconds(60),
  355:     #{node := ClusterMemberNode} = ClusterMember = mim(),
  356:     #{node := Node2Nodename} = Node2 = mim2(),
  357:     #{node := Node3Nodename} = Node3 = mim3(),
  358:     ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [ClusterMemberNode]),
  359:     ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [ClusterMemberNode]),
  360:     %% when
  361:     {_, OpCode1} = mongooseimctl_interactive(Node2Nodename, "leave_cluster", [], "yes\n", Config),
  362:     nodes_clustered(Node2, ClusterMember, false),
  363:     nodes_clustered(Node3, ClusterMember, true),
  364:     {_, OpCode2} = mongooseimctl_interactive(Node3Nodename, "leave_cluster", [], "yes\n", Config),
  365:     %% then
  366:     nodes_clustered(Node3, ClusterMember, false),
  367:     nodes_clustered(Node2, Node3, false),
  368:     ?eq(0, OpCode1),
  369:     ?eq(0, OpCode2).
  370: 
  371: remove_dead_from_cluster(Config) ->
  372:     % given
  373:     Timeout = timer:seconds(60),
  374:     #{node := Node1Nodename} = Node1 = mim(),
  375:     #{node := _Node2Nodename} = Node2 = mim2(),
  376:     #{node := Node3Nodename} = Node3 = mim3(),
  377:     ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
  378:     ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
  379:     %% when
  380:     distributed_helper:stop_node(Node3Nodename, Config),
  381:     {_, OpCode1} = mongooseimctl_interactive(Node1, "remove_from_cluster",
  382:                                              [atom_to_list(Node3Nodename)], "yes\n", Config),
  383:     %% then
  384:     ?eq(0, OpCode1),
  385:     % node is down hence its not in mnesia cluster
  386:     have_node_in_mnesia(Node1, Node2, true),
  387:     have_node_in_mnesia(Node1, Node3, false),
  388:     have_node_in_mnesia(Node2, Node3, false),
  389:     % after node awakening nodes are clustered again
  390:     distributed_helper:start_node(Node3Nodename, Config),
  391:     have_node_in_mnesia(Node1, Node3, true),
  392:     have_node_in_mnesia(Node2, Node3, true).
  393: 
  394: remove_alive_from_cluster(Config) ->
  395:     % given
  396:     Timeout = timer:seconds(60),
  397:     #{node := Node1Name} = Node1 = mim(),
  398:     #{node := Node2Name} = Node2 = mim2(),
  399:     Node3 = mim3(),
  400:     ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Name]),
  401:     ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Name]),
  402:     %% when
  403:     %% Node2 is still running
  404:     {_, OpCode1} = mongooseimctl_force(Node1Name, "remove_from_cluster",
  405:                                        [atom_to_list(Node2Name)], "-f", Config),
  406:     %% then
  407:     ?eq(0, OpCode1),
  408:     % node is down hence its not in mnesia cluster
  409:     have_node_in_mnesia(Node1, Node3, true),
  410:     have_node_in_mnesia(Node1, Node2, false),
  411:     have_node_in_mnesia(Node3, Node2, false).
  412: 
  413: 
  414: 
  415: %% Helpers
  416: mongooseimctl_interactive(C, A, R, Config) ->
  417:     #{node := DefaultNode} = mim(),
  418:     mongooseimctl_interactive(DefaultNode, C, A, R, Config).
  419: mongooseimctl_interactive(Node, Cmd, Args, Response, Config) ->
  420:     CtlCmd = escalus_config:get_config(ctl_path_atom(Node), Config),
  421:     run_interactive(string:join([CtlCmd, Cmd | normalize_args(Args)], " "), Response).
  422: 
  423: normalize_args(Args) ->
  424:     lists:map(fun
  425:                   (Arg) when is_binary(Arg) ->
  426:                       binary_to_list(Arg);
  427:                   (Arg) when is_list(Arg) ->
  428:                       Arg
  429:               end, Args).
  430: 
  431: mongooseimctl_force(Command, Args, ForceFlag, Config) ->
  432:     #{node := DefaultNode} = mim(),
  433:     mongooseimctl_force(DefaultNode, Command, Args, ForceFlag, Config).
  434: mongooseimctl_force(Node, Cmd, Args, ForceFlag, Config) ->
  435:     mongooseimctl_helper:mongooseimctl(Node, Cmd, [ForceFlag | Args], Config).
  436: 
  437: ctl_path_atom(NodeName) ->
  438:     CtlString = atom_to_list(NodeName) ++ "_ctl",
  439:     list_to_atom(CtlString).
  440: 
  441: %% Long timeout for mnesia and ejabberd app restart
  442: run_interactive(Cmd, Response) ->
  443:     run_interactive(Cmd, Response, timer:seconds(60)).
  444: 
  445: run_interactive(Cmd, Response, Timeout) ->
  446:     Port = erlang:open_port({spawn, Cmd}, [exit_status]),
  447:     %% respond to interactive question (yes/no)
  448:     Port ! {self(), {command, Response}},
  449:     mongooseimctl_helper:loop(Cmd, [], Port, [], Timeout).
  450: 
  451: nodes_clustered(#{node := Node1Name} = Node1, #{node := Node2Name} = Node2, ShouldBe) ->
  452:     DbNodes1 = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
  453:     DbNodes2 = distributed_helper:rpc(Node2, mnesia, system_info, [db_nodes]),
  454:     Pairs = [{Node1Name, DbNodes2, ShouldBe},
  455:         {Node2Name, DbNodes1, ShouldBe},
  456:         {Node1Name, DbNodes1, true},
  457:         {Node2Name, DbNodes2, true}],
  458:     [?assertEqual(ShouldBelong, lists:member(Element, List))
  459:         || {Element, List, ShouldBelong} <- Pairs].
  460: 
  461: have_node_in_mnesia(Node1, #{node := Node2}, ShouldBe) ->
  462:     DbNodes1 = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
  463:     ?assertEqual(ShouldBe, lists:member(Node2, DbNodes1)).
  464: 
  465: wait_for_process_to_stop(Pid, Timeout) ->
  466:     erlang:monitor(process, Pid),
  467:     receive
  468:         {'DOWN', _, process, Pid, _} -> ok
  469:     after Timeout ->
  470:             ct:fail(wait_for_process_to_stop_timeout)
  471:     end.