1: %%==============================================================================
    2: %% Copyright 2014 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(cluster_commands_SUITE).
   18: -compile([export_all, nowarn_export_all]).
   19: 
   20: -import(distributed_helper, [add_node_to_cluster/2,
   21:                              is_sm_distributed/0,
   22:                              mim/0, mim2/0, mim3/0,
   23:                              remove_node_from_cluster/2,
   24:                              require_rpc_nodes/1,
   25:                              rpc/4]).
   26: -import(mongooseimctl_helper, [mongooseimctl/3, rpc_call/3]).
   27: -import(domain_helper, [host_type/1]).
   28: 
   29: -include_lib("eunit/include/eunit.hrl").
   30: -include_lib("common_test/include/ct.hrl").
%% Shorthand for ?assertEqual/2; the expected value comes first.
-define(eq(Expected, Actual), ?assertEqual(Expected, Actual)).
%% Asserts that A and B do not compare equal under the == operator.
-define(ne(A, B), ?assertNot(A == B)).
   33: 
   34: %%--------------------------------------------------------------------
   35: %% Suite configuration
   36: %%--------------------------------------------------------------------
   37: 
   38: all() ->
   39:     [{group, clustered},
   40:      {group, clustering_two},
   41:      {group, clustering_three}].
   42: 
   43: groups() ->
   44:     [{clustered, [], [one_to_one_message]},
   45:      {clustering_two, [], clustering_two_tests()},
   46:      {clustering_three, [], clustering_three_tests()}].
   47: 
   48: suite() ->
   49:     require_rpc_nodes([mim, mim2, mim3]) ++ escalus:suite().
   50: 
   51: clustering_two_tests() ->
   52:     [join_successful_prompt,
   53:      join_successful_force,
   54:      leave_successful_prompt,
   55:      leave_successful_force,
   56:      join_unsuccessful,
   57:      leave_unsuccessful,
   58:      leave_but_no_cluster,
   59:      join_twice,
   60:      leave_using_rpc,
   61:      leave_twice,
   62:      join_twice_using_rpc,
   63:      join_twice_in_parallel_using_rpc].
   64: 
   65: clustering_three_tests() ->
   66:     [cluster_of_three,
   67:      leave_the_three,
   68:      %remove_dead_from_cluster, % TODO: Breaks cover
   69:      remove_alive_from_cluster].
   70: 
   71: %%--------------------------------------------------------------------
   72: %% Init & teardown
   73: %%--------------------------------------------------------------------
   74: 
   75: init_per_suite(Config) ->
   76:     #{node := Node1} = RPCNode1 = mim(),
   77:     #{node := Node2} = RPCNode2 = mim2(),
   78:     #{node := Node3} = RPCNode3 = mim3(),
   79:     Config1 = ejabberd_node_utils:init(RPCNode1, Config),
   80:     Config2 = ejabberd_node_utils:init(RPCNode2, Config1),
   81:     Config3 = ejabberd_node_utils:init(RPCNode3, Config2),
   82:     NodeCtlPath = distributed_helper:ctl_path(Node1, Config3),
   83:     Node2CtlPath = distributed_helper:ctl_path(Node2, Config3),
   84:     Node3CtlPath = distributed_helper:ctl_path(Node3, Config3),
   85:     escalus:init_per_suite([{ctl_path_atom(Node1), NodeCtlPath},
   86:                             {ctl_path_atom(Node2), Node2CtlPath},
   87:                             {ctl_path_atom(Node3), Node3CtlPath}]
   88:                            ++ Config3).
   89: 
   90: end_per_suite(Config) ->
   91:     escalus:end_per_suite(Config).
   92: 
   93: init_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
   94:     Node2 = mim2(),
   95:     Config1 = add_node_to_cluster(Node2, Config),
   96:     case is_sm_distributed() of
   97:         true ->
   98:             escalus:create_users(Config1, escalus:get_users([alice, clusterguy]));
   99:         {false, Backend} ->
  100:             ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
  101:             Node2 = mim2(),
  102:             remove_node_from_cluster(Node2, Config1),
  103:             {skip, nondistributed_sm}
  104:     end;
  105: 
  106: init_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
  107:     case is_sm_distributed() of
  108:         true ->
  109:             ok;
  110:         {false, Backend} ->
  111:             ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
  112:             {skip, nondistributed_sm}
  113:     end;
  114: 
  115: init_per_group(_GroupName, Config) ->
  116:     escalus:create_users(Config).
  117: 
  118: end_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
  119:     escalus:delete_users(Config, escalus:get_users([alice, clusterguy])),
  120:     Node2 = mim2(),
  121:     remove_node_from_cluster(Node2, Config);
  122: 
  123: %% Users are gone after mnesia cleaning
  124: %% hence there is no need to delete them manually
  125: end_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
  126:     ok;
  127: end_per_group(_GroupName, Config) ->
  128:     escalus:delete_users(Config).
  129: 
  130: init_per_testcase(CaseName, Config) ->
  131:     escalus:init_per_testcase(CaseName, Config).
  132: 
  133: end_per_testcase(cluster_of_three, Config) ->
  134:     Node2 = mim2(),
  135:     Node3 = mim3(),
  136:     remove_node_from_cluster(Node2, Config),
  137:     remove_node_from_cluster(Node3, Config),
  138:     escalus:end_per_testcase(cluster_of_three, Config);
  139: 
  140: end_per_testcase(CaseName, Config) when CaseName == remove_alive_from_cluster
  141:                                    orelse CaseName == remove_dead_from_cluster->
  142:     Node3 = mim3(),
  143:     Node2 = mim2(),
  144:     remove_node_from_cluster(Node3, Config),
  145:     remove_node_from_cluster(Node2, Config),
  146:     escalus:end_per_testcase(CaseName, Config);
  147: 
  148: end_per_testcase(CaseName, Config) when CaseName == join_successful_prompt
  149:                                    orelse CaseName == join_successful_force
  150:                                    orelse CaseName == leave_unsuccessful_prompt
  151:                                    orelse CaseName == leave_unsuccessful_force
  152:                                    orelse CaseName == join_twice
  153:                                    orelse CaseName == join_twice_using_rpc
  154:                                    orelse CaseName == join_twice_in_parallel_using_rpc ->
  155:     Node2 = mim2(),
  156:     remove_node_from_cluster(Node2, Config),
  157:     escalus:end_per_testcase(CaseName, Config);
  158: 
  159: end_per_testcase(CaseName, Config) ->
  160:     escalus:end_per_testcase(CaseName, Config).
  161: 
  162: %%--------------------------------------------------------------------
  163: %% Message tests
  164: %%--------------------------------------------------------------------
  165: 
  166: one_to_one_message(ConfigIn) ->
  167:     %% Given Alice connected to node one and ClusterGuy connected to node two
  168:     Metrics = [{[global, data, dist], [{recv_oct, '>'}, {send_oct, '>'}]}],
  169:     Config = [{mongoose_metrics, Metrics} | ConfigIn],
  170:     escalus:story(Config, [{alice, 1}, {clusterguy, 1}], fun(Alice, ClusterGuy) ->
  171:         %% When Alice sends a message to ClusterGuy
  172:         Msg1 = escalus_stanza:chat_to(ClusterGuy, <<"Hi!">>),
  173:         escalus:send(Alice, Msg1),
  174:         %% Then he receives it
  175:         Stanza1 = escalus:wait_for_stanza(ClusterGuy, 5000),
  176:         escalus:assert(is_chat_message, [<<"Hi!">>], Stanza1),
  177: 
  178:         %% When ClusterGuy sends a response
  179:         Msg2 = escalus_stanza:chat_to(Alice, <<"Oh hi!">>),
  180:         escalus:send(ClusterGuy, Msg2),
  181:         %% Then Alice also receives it
  182:         Stanza2 = escalus:wait_for_stanza(Alice, 5000),
  183:         escalus:assert(is_chat_message, [<<"Oh hi!">>], Stanza2)
  184:     end).
  185: 
  186: %%--------------------------------------------------------------------
  187: %% Manage cluster commands tests
  188: %%--------------------------------------------------------------------
  189: 
  190: 
  191: join_successful_prompt(Config) ->
  192:     %% given
  193:     #{node := Node2} = RPCSpec2 = mim2(),
  194:     %% when
  195:     {_, OpCode} =
  196:         mongooseimctl_interactive("join_cluster", [atom_to_list(Node2)], "yes\n", Config),
  197:     %% then
  198:     distributed_helper:verify_result(RPCSpec2, add),
  199:     ?eq(0, OpCode).
  200: 
  201: join_successful_force(Config) ->
  202:     %% given
  203:     #{node := Node2} = RPCSpec2 = mim2(),
  204:     %% when
  205:     {_, OpCode} = mongooseimctl_force("join_cluster", [atom_to_list(Node2)], "--force", Config),
  206:     %% then
  207:     distributed_helper:verify_result(RPCSpec2, add),
  208:     ?eq(0, OpCode).
  209: 
  210: leave_successful_prompt(Config) ->
  211:     %% given
  212:     Node2 = mim2(),
  213:     add_node_to_cluster(Node2, Config),
  214:     %% when
  215:     {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
  216:     %% then
  217:     distributed_helper:verify_result(Node2, remove),
  218:     ?eq(0, OpCode).
  219: 
  220: leave_successful_force(Config) ->
  221:     %% given
  222:     Node2 = mim2(),
  223:     add_node_to_cluster(Node2, Config),
  224:     %% when
  225:     {_, OpCode} = mongooseimctl_force("leave_cluster", [], "-f", Config),
  226:     %% then
  227:     distributed_helper:verify_result(Node2, remove),
  228:     ?eq(0, OpCode).
  229: 
  230: join_unsuccessful(Config) ->
  231:     %% given
  232:     Node2 = mim2(),
  233:     %% when
  234:     {_, OpCode} = mongooseimctl_interactive("join_cluster", [], "no\n", Config),
  235:     %% then
  236:     distributed_helper:verify_result(Node2, remove),
  237:     ?ne(0, OpCode).
  238: 
  239: leave_unsuccessful(Config) ->
  240:     %% given
  241:     Node2 = mim(),
  242:     add_node_to_cluster(Node2, Config),
  243:     %% when
  244:     {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "no\n", Config),
  245:     %% then
  246:     distributed_helper:verify_result(Node2, add),
  247:     ?ne(0, OpCode).
  248: 
  249: leave_but_no_cluster(Config) ->
  250:     %% given
  251:     Node2 = mim2(),
  252:     %% when
  253:     {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
  254:     %% then
  255:     distributed_helper:verify_result(Node2, remove),
  256:     ?ne(0, OpCode).
  257: 
  258: join_twice(Config) ->
  259:     %% given
  260:     #{node := Node2} = RPCSpec2 = mim2(),
  261:     %% when
  262:     {_, OpCode1} = mongooseimctl_interactive("join_cluster",
  263:                                              [atom_to_list(Node2)], "yes\n", Config),
  264:     {_, OpCode2} = mongooseimctl_interactive("join_cluster",
  265:                                              [atom_to_list(Node2)], "yes\n", Config),
  266:     %% then
  267:     distributed_helper:verify_result(RPCSpec2, add),
  268:     ?eq(0, OpCode1),
  269:     ?ne(0, OpCode2).
  270: 
  271: %% This function checks that it's ok to call mongoose_cluster:join/1 twice
  272: join_twice_using_rpc(_Config) ->
  273:     %% given
  274:     #{node := Node1} = mim(),
  275:     RPCSpec2 = mim2(),
  276:     Timeout = timer:seconds(60),
  277:     %% when
  278:     ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
  279:     ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
  280:     %% then
  281:     distributed_helper:verify_result(RPCSpec2, add),
  282:     ok.
  283: 
  284: %% Check, that global transaction allows to run only one cluster operation at the time.
  285: %% It should technically behave the same way as join_twice_using_rpc test (i.e. not fail).
  286: join_twice_in_parallel_using_rpc(_Config) ->
  287:     %% given
  288:     #{node := Node1} = mim(),
  289:     RPCSpec2 = mim2(),
  290:     Timeout = timer:seconds(60),
  291:     %% when
  292:     Pid1 = proc_lib:spawn_link(fun() ->
  293:         ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
  294:         end),
  295:     Pid2 = proc_lib:spawn_link(fun() ->
  296:         ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
  297:         end),
  298:     %% then
  299:     distributed_helper:verify_result(RPCSpec2, add),
  300:     wait_for_process_to_stop(Pid1, Timeout),
  301:     wait_for_process_to_stop(Pid2, Timeout),
  302:     ok.
  303: 
  304: leave_using_rpc(Config) ->
  305:     %% given
  306:     Node1 = mim(),
  307:     Node2 = mim2(),
  308:     add_node_to_cluster(Node2, Config),
  309:     %% when
  310:     Result = distributed_helper:rpc(Node1#{timeout => timer:seconds(30)},
  311:                                     mongoose_server_api, leave_cluster, []),
  312:     ct:pal("leave_using_rpc result ~p~n", [Result]),
  313:     %% then
  314:     distributed_helper:verify_result(Node2, remove),
  315:     ok.
  316: 
  317: leave_twice(Config) ->
  318:     %% given
  319:     Node2 = mim2(),
  320:     add_node_to_cluster(Node2, Config),
  321:     %% when
  322:     {_, OpCode1} = mongooseimctl_force("leave_cluster", [], "--force", Config),
  323:     {_, OpCode2} = mongooseimctl_force("leave_cluster", [], "-f", Config),
  324:     %% then
  325:     distributed_helper:verify_result(Node2, remove),
  326:     ?eq(0, OpCode1),
  327:     ?ne(0, OpCode2).
  328: 
  329: cluster_of_three(Config) ->
  330:     %% given
  331:     #{node := ClusterMemberNodeName} = ClusterMember = mim(),
  332:     #{node := Node2Nodename} = Node2 = mim2(),
  333:     #{node := Node3Nodename} = Node3 = mim3(),
  334:     %% when
  335:     {_, OpCode1} = mongooseimctl_force(Node2Nodename, "join_cluster",
  336:                                        [atom_to_list(ClusterMemberNodeName)], "-f", Config),
  337:     {_, OpCode2} = mongooseimctl_force(Node3Nodename, "join_cluster",
  338:                                        [atom_to_list(ClusterMemberNodeName)], "-f", Config),
  339:     %% then
  340:     ?eq(0, OpCode1),
  341:     ?eq(0, OpCode2),
  342:     nodes_clustered(Node2, ClusterMember, true),
  343:     nodes_clustered(Node3, ClusterMember, true),
  344:     nodes_clustered(Node2, Node3, true).
  345: 
  346: leave_the_three(Config) ->
  347:     %% given
  348:     Timeout = timer:seconds(60),
  349:     #{node := ClusterMemberNode} = ClusterMember = mim(),
  350:     #{node := Node2Nodename} = Node2 = mim2(),
  351:     #{node := Node3Nodename} = Node3 = mim3(),
  352:     ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [ClusterMemberNode]),
  353:     ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [ClusterMemberNode]),
  354:     %% when
  355:     {_, OpCode1} = mongooseimctl_interactive(Node2Nodename, "leave_cluster", [], "yes\n", Config),
  356:     nodes_clustered(Node2, ClusterMember, false),
  357:     nodes_clustered(Node3, ClusterMember, true),
  358:     {_, OpCode2} = mongooseimctl_interactive(Node3Nodename, "leave_cluster", [], "yes\n", Config),
  359:     %% then
  360:     nodes_clustered(Node3, ClusterMember, false),
  361:     nodes_clustered(Node2, Node3, false),
  362:     ?eq(0, OpCode1),
  363:     ?eq(0, OpCode2).
  364: 
  365: remove_dead_from_cluster(Config) ->
  366:     % given
  367:     Timeout = timer:seconds(60),
  368:     #{node := Node1Nodename} = Node1 = mim(),
  369:     #{node := _Node2Nodename} = Node2 = mim2(),
  370:     #{node := Node3Nodename} = Node3 = mim3(),
  371:     ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
  372:     ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
  373:     %% when
  374:     distributed_helper:stop_node(Node3Nodename, Config),
  375:     {_, OpCode1} = mongooseimctl_interactive(Node1, "remove_from_cluster",
  376:                                              [atom_to_list(Node3Nodename)], "yes\n", Config),
  377:     %% then
  378:     ?eq(0, OpCode1),
  379:     % node is down hence its not in mnesia cluster
  380:     have_node_in_mnesia(Node1, Node2, true),
  381:     have_node_in_mnesia(Node1, Node3, false),
  382:     have_node_in_mnesia(Node2, Node3, false),
  383:     % after node awakening nodes are clustered again
  384:     distributed_helper:start_node(Node3Nodename, Config),
  385:     have_node_in_mnesia(Node1, Node3, true),
  386:     have_node_in_mnesia(Node2, Node3, true).
  387: 
  388: remove_alive_from_cluster(Config) ->
  389:     % given
  390:     Timeout = timer:seconds(60),
  391:     #{node := Node1Name} = Node1 = mim(),
  392:     #{node := Node2Name} = Node2 = mim2(),
  393:     Node3 = mim3(),
  394:     ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Name]),
  395:     ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Name]),
  396:     %% when
  397:     %% Node2 is still running
  398:     {_, OpCode1} = mongooseimctl_force(Node1Name, "remove_from_cluster",
  399:                                        [atom_to_list(Node2Name)], "-f", Config),
  400:     %% then
  401:     ?eq(0, OpCode1),
  402:     % node is down hence its not in mnesia cluster
  403:     have_node_in_mnesia(Node1, Node3, true),
  404:     have_node_in_mnesia(Node1, Node2, false),
  405:     have_node_in_mnesia(Node3, Node2, false).
  406: 
  407: 
  408: 
  409: %% Helpers
  410: mongooseimctl_interactive(C, A, R, Config) ->
  411:     #{node := DefaultNode} = mim(),
  412:     mongooseimctl_interactive(DefaultNode, C, A, R, Config).
  413: mongooseimctl_interactive(Node, Cmd, Args, Response, Config) ->
  414:     CtlCmd = escalus_config:get_config(ctl_path_atom(Node), Config),
  415:     run_interactive(string:join([CtlCmd, Cmd | normalize_args(Args)], " "), Response).
  416: 
  417: normalize_args(Args) ->
  418:     lists:map(fun
  419:                   (Arg) when is_binary(Arg) ->
  420:                       binary_to_list(Arg);
  421:                   (Arg) when is_list(Arg) ->
  422:                       Arg
  423:               end, Args).
  424: 
  425: mongooseimctl_force(Command, Args, ForceFlag, Config) ->
  426:     #{node := DefaultNode} = mim(),
  427:     mongooseimctl_force(DefaultNode, Command, Args, ForceFlag, Config).
  428: mongooseimctl_force(Node, Cmd, Args, ForceFlag, Config) ->
  429:     mongooseimctl_helper:mongooseimctl(Node, Cmd, [ForceFlag | Args], Config).
  430: 
  431: ctl_path_atom(NodeName) ->
  432:     CtlString = atom_to_list(NodeName) ++ "_ctl",
  433:     list_to_atom(CtlString).
  434: 
  435: %% Long timeout for mnesia and ejabberd app restart
  436: run_interactive(Cmd, Response) ->
  437:     run_interactive(Cmd, Response, timer:seconds(60)).
  438: 
  439: run_interactive(Cmd, Response, Timeout) ->
  440:     Port = erlang:open_port({spawn, Cmd}, [exit_status]),
  441:     %% respond to interactive question (yes/no)
  442:     Port ! {self(), {command, Response}},
  443:     mongooseimctl_helper:loop(Cmd, [], Port, [], Timeout).
  444: 
  445: nodes_clustered(#{node := Node1Name} = Node1, #{node := Node2Name} = Node2, ShouldBe) ->
  446:     DbNodes1 = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
  447:     DbNodes2 = distributed_helper:rpc(Node2, mnesia, system_info, [db_nodes]),
  448:     Pairs = [{Node1Name, DbNodes2, ShouldBe},
  449:         {Node2Name, DbNodes1, ShouldBe},
  450:         {Node1Name, DbNodes1, true},
  451:         {Node2Name, DbNodes2, true}],
  452:     [?assertEqual(ShouldBelong, lists:member(Element, List))
  453:         || {Element, List, ShouldBelong} <- Pairs].
  454: 
  455: have_node_in_mnesia(Node1, #{node := Node2}, ShouldBe) ->
  456:     DbNodes1 = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
  457:     ?assertEqual(ShouldBe, lists:member(Node2, DbNodes1)).
  458: 
  459: wait_for_process_to_stop(Pid, Timeout) ->
  460:     erlang:monitor(process, Pid),
  461:     receive
  462:         {'DOWN', _, process, Pid, _} -> ok
  463:     after Timeout ->
  464:             ct:fail(wait_for_process_to_stop_timeout)
  465:     end.