1: %%==============================================================================
    2: %% Copyright 2014 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(cluster_commands_SUITE).
   18: -compile([export_all, nowarn_export_all]).
   19: 
   20: -import(distributed_helper, [add_node_to_cluster/2,
   21:                              is_sm_distributed/0,
   22:                              mim/0, mim2/0, mim3/0,
   23:                              remove_node_from_cluster/2,
   24:                              require_rpc_nodes/1,
   25:                              rpc/4]).
   26: -import(mongooseimctl_helper, [mongooseimctl/3, rpc_call/3]).
   27: -import(domain_helper, [host_type/1]).
   28: 
   29: -include_lib("eunit/include/eunit.hrl").
   30: -include_lib("common_test/include/ct.hrl").
   31: -define(eq(Expected, Actual), ?assertEqual(Expected, Actual)).
   32: -define(ne(A, B), ?assertNot(A == B)).
   33: 
   34: %%--------------------------------------------------------------------
   35: %% Suite configuration
   36: %%--------------------------------------------------------------------
   37: 
   38: all() ->
   39:     [{group, clustered},
   40:      {group, mnesia},
   41:      {group, clustering_two},
   42:      {group, clustering_three}].
   43: 
   44: groups() ->
   45:     [{clustered, [], [one_to_one_message]},
   46:      {clustering_two, [], clustering_two_tests()},
   47:      {clustering_three, [], clustering_three_tests()},
   48:      {mnesia, [], [set_master_test]}].
   49: 
   50: suite() ->
   51:     require_rpc_nodes([mim, mim2, mim3]) ++ escalus:suite().
   52: 
   53: clustering_two_tests() ->
   54:     [join_successful_prompt,
   55:      join_successful_force,
   56:      leave_successful_prompt,
   57:      leave_successful_force,
   58:      join_unsuccessful,
   59:      leave_unsuccessful,
   60:      leave_but_no_cluster,
   61:      join_twice,
   62:      leave_using_rpc,
   63:      leave_twice,
   64:      join_twice_using_rpc,
   65:      join_twice_in_parallel_using_rpc].
   66: 
   67: clustering_three_tests() ->
   68:     [cluster_of_three,
   69:      leave_the_three,
   70:      %remove_dead_from_cluster, % TODO: Breaks cover
   71:      remove_alive_from_cluster].
   72: 
   73: %%--------------------------------------------------------------------
   74: %% Init & teardown
   75: %%--------------------------------------------------------------------
   76: 
   77: init_per_suite(Config) ->
   78:     #{node := Node1} = RPCNode1 = mim(),
   79:     #{node := Node2} = RPCNode2 = mim2(),
   80:     #{node := Node3} = RPCNode3 = mim3(),
   81:     Config1 = ejabberd_node_utils:init(RPCNode1, Config),
   82:     Config2 = ejabberd_node_utils:init(RPCNode2, Config1),
   83:     Config3 = ejabberd_node_utils:init(RPCNode3, Config2),
   84:     NodeCtlPath = distributed_helper:ctl_path(Node1, Config3),
   85:     Node2CtlPath = distributed_helper:ctl_path(Node2, Config3),
   86:     Node3CtlPath = distributed_helper:ctl_path(Node3, Config3),
   87:     escalus:init_per_suite([{ctl_path_atom(Node1), NodeCtlPath},
   88:                             {ctl_path_atom(Node2), Node2CtlPath},
   89:                             {ctl_path_atom(Node3), Node3CtlPath}]
   90:                            ++ Config3).
   91: 
   92: end_per_suite(Config) ->
   93:     escalus:end_per_suite(Config).
   94: 
   95: init_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
   96:     Node2 = mim2(),
   97:     Config1 = add_node_to_cluster(Node2, Config),
   98:     case is_sm_distributed() of
   99:         true ->
  100:             escalus:create_users(Config1, escalus:get_users([alice, clusterguy]));
        {false, Backend} ->
            ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
            remove_node_from_cluster(Node2, Config1),
            {skip, nondistributed_sm}
  106:     end;
  107: 
init_per_group(Group, Config) when Group == clustering_two orelse Group == clustering_three ->
    case is_sm_distributed() of
        true ->
            Config;
        {false, Backend} ->
            ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
            {skip, nondistributed_sm}
    end;
  116: 
  117: init_per_group(_GroupName, Config) ->
  118:     escalus:create_users(Config).
  119: 
  120: end_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
  121:     escalus:delete_users(Config, escalus:get_users([alice, clusterguy])),
  122:     Node2 = mim2(),
  123:     remove_node_from_cluster(Node2, Config);
  124: 
%% Users are gone after the mnesia cleanup,
%% hence there is no need to delete them manually
  127: end_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
  128:     ok;
  129: end_per_group(_GroupName, Config) ->
  130:     escalus:delete_users(Config).
  131: 
  132: init_per_testcase(CaseName, Config) ->
  133:     escalus:init_per_testcase(CaseName, Config).
  134: 
  135: end_per_testcase(cluster_of_three, Config) ->
  136:     Node2 = mim2(),
  137:     Node3 = mim3(),
  138:     remove_node_from_cluster(Node2, Config),
  139:     remove_node_from_cluster(Node3, Config),
  140:     escalus:end_per_testcase(cluster_of_three, Config);
  141: 
end_per_testcase(CaseName, Config) when CaseName == remove_alive_from_cluster
                                   orelse CaseName == remove_dead_from_cluster ->
  144:     Node3 = mim3(),
  145:     Node2 = mim2(),
  146:     remove_node_from_cluster(Node3, Config),
  147:     remove_node_from_cluster(Node2, Config),
  148:     escalus:end_per_testcase(CaseName, Config);
  149: 
end_per_testcase(CaseName, Config) when CaseName == join_successful_prompt
                                   orelse CaseName == join_successful_force
                                   orelse CaseName == leave_unsuccessful
                                   orelse CaseName == join_twice
                                   orelse CaseName == join_twice_using_rpc
                                   orelse CaseName == join_twice_in_parallel_using_rpc ->
  157:     Node2 = mim2(),
  158:     remove_node_from_cluster(Node2, Config),
  159:     escalus:end_per_testcase(CaseName, Config);
  160: 
  161: end_per_testcase(CaseName, Config) ->
  162:     escalus:end_per_testcase(CaseName, Config).
  163: 
  164: %%--------------------------------------------------------------------
  165: %% Message tests
  166: %%--------------------------------------------------------------------
  167: 
  168: one_to_one_message(ConfigIn) ->
  169:     %% Given Alice connected to node one and ClusterGuy connected to node two
  170:     Metrics = [{[global, data, dist], [{recv_oct, '>'}, {send_oct, '>'}]}],
  171:     Config = [{mongoose_metrics, Metrics} | ConfigIn],
  172:     escalus:story(Config, [{alice, 1}, {clusterguy, 1}], fun(Alice, ClusterGuy) ->
  173:         %% When Alice sends a message to ClusterGuy
  174:         Msg1 = escalus_stanza:chat_to(ClusterGuy, <<"Hi!">>),
  175:         escalus:send(Alice, Msg1),
  176:         %% Then he receives it
  177:         Stanza1 = escalus:wait_for_stanza(ClusterGuy, 5000),
  178:         escalus:assert(is_chat_message, [<<"Hi!">>], Stanza1),
  179: 
  180:         %% When ClusterGuy sends a response
  181:         Msg2 = escalus_stanza:chat_to(Alice, <<"Oh hi!">>),
  182:         escalus:send(ClusterGuy, Msg2),
  183:         %% Then Alice also receives it
  184:         Stanza2 = escalus:wait_for_stanza(Alice, 5000),
  185:         escalus:assert(is_chat_message, [<<"Oh hi!">>], Stanza2)
    end).

%%--------------------------------------------------------------------
  188: %% mnesia tests
  189: %%--------------------------------------------------------------------
  190: 
  191: set_master_test(ConfigIn) ->
    %% Start ejabberd_auth_internal to ensure that the passwd table exists.
    %% We also need at least two running nodes for set_master to work.
  194:     catch distributed_helper:rpc(mim(), ejabberd_auth_internal, start, [host_type(mim1)]),
  195:     catch distributed_helper:rpc(mim2(), ejabberd_auth_internal, start, [host_type(mim2)]),
  196: 
  197:     TableName = passwd,
    NodeList = rpc_call(mnesia, system_info, [running_db_nodes]),
  199:     mongooseimctl("set_master", ["self"], ConfigIn),
  200:     [MasterNode] = rpc_call(mnesia, table_info, [TableName, master_nodes]),
  201:     true = lists:member(MasterNode, NodeList),
  202:     RestNodesList = lists:delete(MasterNode, NodeList),
  203:     OtherNode = hd(RestNodesList),
  204:     mongooseimctl("set_master", [atom_to_list(OtherNode)], ConfigIn),
  205:     [OtherNode] = rpc_call(mnesia, table_info, [TableName, master_nodes]),
  206:     mongooseimctl("set_master", ["self"], ConfigIn),
  207:     [MasterNode] = rpc_call(mnesia, table_info, [TableName, master_nodes]).
  208: 
  209: 
  210: %%--------------------------------------------------------------------
  211: %% Manage cluster commands tests
  212: %%--------------------------------------------------------------------
  213: 
  214: 
  215: join_successful_prompt(Config) ->
  216:     %% given
  217:     #{node := Node2} = RPCSpec2 = mim2(),
  218:     %% when
  219:     {_, OpCode} =
  220:         mongooseimctl_interactive("join_cluster", [atom_to_list(Node2)], "yes\n", Config),
  221:     %% then
  222:     distributed_helper:verify_result(RPCSpec2, add),
  223:     ?eq(0, OpCode).
  224: 
  225: join_successful_force(Config) ->
  226:     %% given
  227:     #{node := Node2} = RPCSpec2 = mim2(),
  228:     %% when
  229:     {_, OpCode} = mongooseimctl_force("join_cluster", [atom_to_list(Node2)], "--force", Config),
  230:     %% then
  231:     distributed_helper:verify_result(RPCSpec2, add),
  232:     ?eq(0, OpCode).
  233: 
  234: leave_successful_prompt(Config) ->
  235:     %% given
  236:     Node2 = mim2(),
  237:     add_node_to_cluster(Node2, Config),
  238:     %% when
  239:     {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
  240:     %% then
  241:     distributed_helper:verify_result(Node2, remove),
  242:     ?eq(0, OpCode).
  243: 
  244: leave_successful_force(Config) ->
  245:     %% given
  246:     Node2 = mim2(),
  247:     add_node_to_cluster(Node2, Config),
  248:     %% when
  249:     {_, OpCode} = mongooseimctl_force("leave_cluster", [], "-f", Config),
  250:     %% then
  251:     distributed_helper:verify_result(Node2, remove),
  252:     ?eq(0, OpCode).
  253: 
  254: join_unsuccessful(Config) ->
  255:     %% given
  256:     Node2 = mim2(),
  257:     %% when
  258:     {_, OpCode} = mongooseimctl_interactive("join_cluster", [], "no\n", Config),
  259:     %% then
  260:     distributed_helper:verify_result(Node2, remove),
  261:     ?ne(0, OpCode).
  262: 
  263: leave_unsuccessful(Config) ->
  264:     %% given
    Node2 = mim2(),
  266:     add_node_to_cluster(Node2, Config),
  267:     %% when
  268:     {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "no\n", Config),
  269:     %% then
  270:     distributed_helper:verify_result(Node2, add),
  271:     ?ne(0, OpCode).
  272: 
  273: leave_but_no_cluster(Config) ->
  274:     %% given
  275:     Node2 = mim2(),
  276:     %% when
  277:     {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
  278:     %% then
  279:     distributed_helper:verify_result(Node2, remove),
  280:     ?ne(0, OpCode).
  281: 
  282: join_twice(Config) ->
  283:     %% given
  284:     #{node := Node2} = RPCSpec2 = mim2(),
  285:     %% when
  286:     {_, OpCode1} = mongooseimctl_interactive("join_cluster",
  287:                                              [atom_to_list(Node2)], "yes\n", Config),
  288:     {_, OpCode2} = mongooseimctl_interactive("join_cluster",
  289:                                              [atom_to_list(Node2)], "yes\n", Config),
  290:     %% then
  291:     distributed_helper:verify_result(RPCSpec2, add),
  292:     ?eq(0, OpCode1),
  293:     ?ne(0, OpCode2).
  294: 
%% This test checks that it's OK to call mongoose_cluster:join/1 twice.
  296: join_twice_using_rpc(_Config) ->
  297:     %% given
  298:     #{node := Node1} = mim(),
  299:     RPCSpec2 = mim2(),
  300:     Timeout = timer:seconds(60),
  301:     %% when
  302:     ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
  303:     ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
  304:     %% then
  305:     distributed_helper:verify_result(RPCSpec2, add),
  306:     ok.
  307: 
%% Check that the global transaction allows only one cluster operation to run at a time.
%% Technically this should behave the same way as the join_twice_using_rpc test (i.e. not fail).
  310: join_twice_in_parallel_using_rpc(_Config) ->
  311:     %% given
  312:     #{node := Node1} = mim(),
  313:     RPCSpec2 = mim2(),
  314:     Timeout = timer:seconds(60),
  315:     %% when
  316:     Pid1 = proc_lib:spawn_link(fun() ->
  317:         ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
  318:         end),
  319:     Pid2 = proc_lib:spawn_link(fun() ->
  320:         ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
  321:         end),
  322:     %% then
  323:     distributed_helper:verify_result(RPCSpec2, add),
  324:     wait_for_process_to_stop(Pid1, Timeout),
  325:     wait_for_process_to_stop(Pid2, Timeout),
  326:     ok.
  327: 
  328: leave_using_rpc(Config) ->
  329:     %% given
  330:     Node1 = mim(),
  331:     Node2 = mim2(),
  332:     add_node_to_cluster(Node2, Config),
  333:     %% when
  334:     Result = distributed_helper:rpc(Node1#{timeout => timer:seconds(30)},
  335:                                     ejabberd_admin, leave_cluster, []),
  336:     ct:pal("leave_using_rpc result ~p~n", [Result]),
  337:     %% then
  338:     distributed_helper:verify_result(Node2, remove),
  339:     ok.
  340: 
  341: leave_twice(Config) ->
  342:     %% given
  343:     Node2 = mim2(),
  344:     add_node_to_cluster(Node2, Config),
  345:     %% when
  346:     {_, OpCode1} = mongooseimctl_force("leave_cluster", [], "--force", Config),
  347:     {_, OpCode2} = mongooseimctl_force("leave_cluster", [], "-f", Config),
  348:     %% then
  349:     distributed_helper:verify_result(Node2, remove),
  350:     ?eq(0, OpCode1),
  351:     ?ne(0, OpCode2).
  352: 
  353: cluster_of_three(Config) ->
  354:     %% given
  355:     #{node := ClusterMemberNodeName} = ClusterMember = mim(),
  356:     #{node := Node2Nodename} = Node2 = mim2(),
  357:     #{node := Node3Nodename} = Node3 = mim3(),
  358:     %% when
  359:     {_, OpCode1} = mongooseimctl_force(Node2Nodename, "join_cluster",
  360:                                        [atom_to_list(ClusterMemberNodeName)], "-f", Config),
  361:     {_, OpCode2} = mongooseimctl_force(Node3Nodename, "join_cluster",
  362:                                        [atom_to_list(ClusterMemberNodeName)], "-f", Config),
  363:     %% then
  364:     ?eq(0, OpCode1),
  365:     ?eq(0, OpCode2),
  366:     nodes_clustered(Node2, ClusterMember, true),
  367:     nodes_clustered(Node3, ClusterMember, true),
  368:     nodes_clustered(Node2, Node3, true).
  369: 
  370: leave_the_three(Config) ->
  371:     %% given
  372:     Timeout = timer:seconds(60),
  373:     #{node := ClusterMemberNode} = ClusterMember = mim(),
  374:     #{node := Node2Nodename} = Node2 = mim2(),
  375:     #{node := Node3Nodename} = Node3 = mim3(),
  376:     ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [ClusterMemberNode]),
  377:     ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [ClusterMemberNode]),
  378:     %% when
  379:     {_, OpCode1} = mongooseimctl_interactive(Node2Nodename, "leave_cluster", [], "yes\n", Config),
  380:     nodes_clustered(Node2, ClusterMember, false),
  381:     nodes_clustered(Node3, ClusterMember, true),
  382:     {_, OpCode2} = mongooseimctl_interactive(Node3Nodename, "leave_cluster", [], "yes\n", Config),
  383:     %% then
  384:     nodes_clustered(Node3, ClusterMember, false),
  385:     nodes_clustered(Node2, Node3, false),
  386:     ?eq(0, OpCode1),
  387:     ?eq(0, OpCode2).
  388: 
  389: remove_dead_from_cluster(Config) ->
  390:     % given
  391:     Timeout = timer:seconds(60),
  392:     #{node := Node1Nodename} = Node1 = mim(),
  393:     #{node := _Node2Nodename} = Node2 = mim2(),
  394:     #{node := Node3Nodename} = Node3 = mim3(),
  395:     ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
  396:     ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
  397:     %% when
  398:     distributed_helper:stop_node(Node3, Config),
    {_, OpCode1} = mongooseimctl_interactive(Node1Nodename, "remove_from_cluster",
                                             [atom_to_list(Node3Nodename)], "yes\n", Config),
  401:     %% then
  402:     ?eq(0, OpCode1),
    % the node is down, hence it's not in the mnesia cluster
  404:     have_node_in_mnesia(Node1, Node2, true),
  405:     have_node_in_mnesia(Node1, Node3, false),
  406:     have_node_in_mnesia(Node2, Node3, false),
    % after the node comes back up, the nodes are clustered again
  408:     distributed_helper:start_node(Node3, Config),
  409:     have_node_in_mnesia(Node1, Node3, true),
  410:     have_node_in_mnesia(Node2, Node3, true).
  411: 
  412: remove_alive_from_cluster(Config) ->
  413:     % given
  414:     Timeout = timer:seconds(60),
  415:     #{node := Node1Name} = Node1 = mim(),
  416:     #{node := Node2Name} = Node2 = mim2(),
  417:     Node3 = mim3(),
  418:     ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Name]),
  419:     ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Name]),
  420:     %% when
  421:     %% Node2 is still running
  422:     {_, OpCode1} = mongooseimctl_force(Node1Name, "remove_from_cluster",
  423:                                        [atom_to_list(Node2Name)], "-f", Config),
  424:     %% then
  425:     ?eq(0, OpCode1),
    % Node2 is still alive, but having been removed it's no longer in the mnesia cluster
  427:     have_node_in_mnesia(Node1, Node3, true),
  428:     have_node_in_mnesia(Node1, Node2, false),
  429:     have_node_in_mnesia(Node3, Node2, false).
  430: 
  431: 
  432: 
  433: %% Helpers
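%% Run a mongooseimctl command that asks an interactive yes/no question,
%% feeding it the given response; defaults to the mim() node.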
mongooseimctl_interactive(Cmd, Args, Response, Config) ->
    #{node := DefaultNode} = mim(),
    mongooseimctl_interactive(DefaultNode, Cmd, Args, Response, Config).
  437: mongooseimctl_interactive(Node, Cmd, Args, Response, Config) ->
  438:     CtlCmd = escalus_config:get_config(ctl_path_atom(Node), Config),
  439:     run_interactive(string:join([CtlCmd, Cmd | normalize_args(Args)], " "), Response).
  440: 
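%% Convert binary arguments to strings so they can be joined into a shell command.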
  441: normalize_args(Args) ->
  442:     lists:map(fun
  443:                   (Arg) when is_binary(Arg) ->
  444:                       binary_to_list(Arg);
  445:                   (Arg) when is_list(Arg) ->
  446:                       Arg
  447:               end, Args).
  448: 
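%% Run a mongooseimctl command non-interactively by passing a force flag
%% ("-f" or "--force"); defaults to the mim() node.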
  449: mongooseimctl_force(Command, Args, ForceFlag, Config) ->
  450:     #{node := DefaultNode} = mim(),
  451:     mongooseimctl_force(DefaultNode, Command, Args, ForceFlag, Config).
  452: mongooseimctl_force(Node, Cmd, Args, ForceFlag, Config) ->
  453:     mongooseimctl_helper:mongooseimctl(Node, Cmd, [ForceFlag | Args], Config).
  454: 
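%% Config key under which a node's mongooseimctl path is stored (see init_per_suite/1).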
  455: ctl_path_atom(NodeName) ->
  456:     CtlString = atom_to_list(NodeName) ++ "_ctl",
  457:     list_to_atom(CtlString).
  458: 
  459: %% Long timeout for mnesia and ejabberd app restart
  460: run_interactive(Cmd, Response) ->
  461:     run_interactive(Cmd, Response, timer:seconds(60)).
  462: 
  463: run_interactive(Cmd, Response, Timeout) ->
  464:     Port = erlang:open_port({spawn, Cmd}, [exit_status]),
  465:     %% respond to interactive question (yes/no)
  466:     Port ! {self(), {command, Response}},
  467:     mongooseimctl_helper:loop(Cmd, [], Port, [], Timeout).
  468: 
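%% Assert whether two nodes see each other in mnesia's db_nodes (checked in both
%% directions); each node must always see itself.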
  469: nodes_clustered(#{node := Node1Name} = Node1, #{node := Node2Name} = Node2, ShouldBe) ->
  470:     DbNodes1 = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
  471:     DbNodes2 = distributed_helper:rpc(Node2, mnesia, system_info, [db_nodes]),
  472:     Pairs = [{Node1Name, DbNodes2, ShouldBe},
  473:         {Node2Name, DbNodes1, ShouldBe},
  474:         {Node1Name, DbNodes1, true},
  475:         {Node2Name, DbNodes2, true}],
  476:     [?assertEqual(ShouldBelong, lists:member(Element, List))
  477:         || {Element, List, ShouldBelong} <- Pairs].
  478: 
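%% Assert whether Node2 appears in Node1's mnesia db_nodes.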
  479: have_node_in_mnesia(Node1, #{node := Node2}, ShouldBe) ->
  480:     DbNodes1 = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
  481:     ?assertEqual(ShouldBe, lists:member(Node2, DbNodes1)).
  482: 
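%% Block until Pid terminates, or fail the test case after Timeout.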
  483: wait_for_process_to_stop(Pid, Timeout) ->
  484:     erlang:monitor(process, Pid),
  485:     receive
  486:         {'DOWN', _, process, Pid, _} -> ok
  487:     after Timeout ->
  488:             ct:fail(wait_for_process_to_stop_timeout)
  489:     end.