%%==============================================================================
%% Copyright 2014 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

%% Common Test suite for the mongooseimctl cluster-management commands
%% (join_cluster / leave_cluster / remove_from_cluster) exercised across
%% up to three MongooseIM nodes: mim, mim2 and mim3.
-module(cluster_commands_SUITE).
%% CT suites conventionally export all callbacks and test cases.
-compile([export_all, nowarn_export_all]).

%% Helpers for driving the distributed test nodes over RPC.
-import(distributed_helper, [add_node_to_cluster/2,
                             is_sm_distributed/0,
                             mim/0, mim2/0, mim3/0,
                             remove_node_from_cluster/2,
                             require_rpc_nodes/1,
                             rpc/4]).
-import(mongooseimctl_helper, [mongooseimctl/3, rpc_call/3]).
-import(domain_helper, [host_type/1]).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%% Assert exact equality of two terms.
-define(eq(Expected, Actual), ?assertEqual(Expected, Actual)).
%% Assert two terms are NOT equal (numeric-coercing comparison).
-define(ne(A, B), ?assertNot(A == B)).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

all() ->
    [{group, clustered},
     {group, clustering_two},
     {group, clustering_three}].

groups() ->
    [{clustered, [], [one_to_one_message]},
     {clustering_two, [], clustering_two_tests()},
     {clustering_three, [], clustering_three_tests()}].

%% All three RPC nodes must be reachable for this suite to run.
suite() ->
    require_rpc_nodes([mim, mim2, mim3]) ++ escalus:suite().
%% Test cases for the two-node clustering group, run in this order.
clustering_two_tests() ->
    [commands_without_args,
     join_successful_prompt,
     join_successful_force,
     leave_successful_prompt,
     leave_successful_force,
     join_unsuccessful,
     leave_unsuccessful,
     leave_but_no_cluster,
     join_twice,
     leave_using_rpc,
     leave_twice,
     join_twice_using_rpc,
     join_twice_in_parallel_using_rpc].

%% Test cases for the three-node clustering group.
clustering_three_tests() ->
    [cluster_of_three,
     leave_the_three,
     %remove_dead_from_cluster, % TODO: Breaks cover
     remove_alive_from_cluster].

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Initialise node utils for each of the three nodes (threading the
%% config through), then stash every node's mongooseimctl path in the
%% config under the key <nodename>_ctl.
init_per_suite(Config) ->
    NodeSpecs = [mim(), mim2(), mim3()],
    ConfigWithNodes = lists:foldl(fun ejabberd_node_utils:init/2, Config, NodeSpecs),
    CtlPaths = [{ctl_path_atom(Node), distributed_helper:ctl_path(Node, ConfigWithNodes)}
                || #{node := Node} <- NodeSpecs],
    escalus:init_per_suite(CtlPaths ++ ConfigWithNodes).

end_per_suite(Config) ->
    escalus:end_per_suite(Config).
%% Set up the `clustered' group: join mim2 to mim's cluster and create the
%% two test users, or skip the whole group when the configured session
%% backend is not distributed.
%% NOTE(review): the guard also matches a `mnesia' group which does not
%% appear in groups/0 — confirm whether this alternative is a leftover.
init_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
    Node2 = mim2(),
    %% Clustering happens before the backend check, so the skip branch
    %% below must undo it.
    Config1 = add_node_to_cluster(Node2, Config),
    case is_sm_distributed() of
        true ->
            escalus:create_users(Config1, escalus:get_users([alice, clusterguy]));
        {false, Backend} ->
            ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
            %% Re-binding Node2 is a no-op match against the same value.
            Node2 = mim2(),
            remove_node_from_cluster(Node2, Config1),
            {skip, nondistributed_sm}
    end;

%% The clustering groups also require a distributed session backend, but
%% each test case manages its own cluster membership.
init_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
    case is_sm_distributed() of
        true ->
            ok;
        {false, Backend} ->
            ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
            {skip, nondistributed_sm}
    end;

%% Any other group just needs the default escalus users.
init_per_group(_GroupName, Config) ->
    escalus:create_users(Config).

%% Tear down the `clustered' group: delete its users and detach mim2.
end_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
    escalus:delete_users(Config, escalus:get_users([alice, clusterguy])),
    Node2 = mim2(),
    remove_node_from_cluster(Node2, Config);

%% Users are gone after mnesia cleaning
%% hence there is no need to delete them manually
end_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
    ok;
end_per_group(_GroupName, Config) ->
    escalus:delete_users(Config).

init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).
%% cluster_of_three joins both mim2 and mim3 to mim, so both peers must be
%% detached again afterwards.
end_per_testcase(cluster_of_three, Config) ->
    Node2 = mim2(),
    Node3 = mim3(),
    remove_node_from_cluster(Node2, Config),
    remove_node_from_cluster(Node3, Config),
    escalus:end_per_testcase(cluster_of_three, Config);

%% The remove_*_from_cluster cases also build a three-node cluster and
%% need both peers removed.
end_per_testcase(CaseName, Config) when CaseName == remove_alive_from_cluster
                                        orelse CaseName == remove_dead_from_cluster ->
    Node3 = mim3(),
    Node2 = mim2(),
    remove_node_from_cluster(Node3, Config),
    remove_node_from_cluster(Node2, Config),
    escalus:end_per_testcase(CaseName, Config);

%% Cases after which mim2 is (or may be) still joined to the cluster.
%% NOTE(review): leave_unsuccessful_prompt and leave_unsuccessful_force do
%% not appear in clustering_two_tests() (the list contains
%% `leave_unsuccessful'), so those two guard alternatives never match —
%% confirm whether the guard or the test list is out of date.
end_per_testcase(CaseName, Config) when CaseName == join_successful_prompt
                                        orelse CaseName == join_successful_force
                                        orelse CaseName == leave_unsuccessful_prompt
                                        orelse CaseName == leave_unsuccessful_force
                                        orelse CaseName == join_twice
                                        orelse CaseName == join_twice_using_rpc
                                        orelse CaseName == join_twice_in_parallel_using_rpc ->
    Node2 = mim2(),
    remove_node_from_cluster(Node2, Config),
    escalus:end_per_testcase(CaseName, Config);

end_per_testcase(CaseName, Config) ->
    escalus:end_per_testcase(CaseName, Config).
%%--------------------------------------------------------------------
%% Message tests
%%--------------------------------------------------------------------

%% A user on node one and a user on node two can exchange chat messages;
%% the inter-node distribution traffic counters must grow while they do.
one_to_one_message(ConfigIn) ->
    DistMetrics = [{[global, data, dist], [{recv_oct, '>'}, {send_oct, '>'}]}],
    Config = [{mongoose_metrics, DistMetrics} | ConfigIn],
    escalus:story(Config, [{alice, 1}, {clusterguy, 1}],
                  fun(Alice, ClusterGuy) ->
                          %% Alice (node one) messages ClusterGuy (node two)...
                          FirstMsg = escalus_stanza:chat_to(ClusterGuy, <<"Hi!">>),
                          escalus:send(Alice, FirstMsg),
                          %% ...and he receives it across the cluster.
                          Received = escalus:wait_for_stanza(ClusterGuy, 5000),
                          escalus:assert(is_chat_message, [<<"Hi!">>], Received),

                          %% ClusterGuy replies...
                          Reply = escalus_stanza:chat_to(Alice, <<"Oh hi!">>),
                          escalus:send(ClusterGuy, Reply),
                          %% ...and Alice receives the reply.
                          ReceivedReply = escalus:wait_for_stanza(Alice, 5000),
                          escalus:assert(is_chat_message, [<<"Oh hi!">>], ReceivedReply)
                  end).

%%--------------------------------------------------------------------
%% Manage cluster commands tests
%%--------------------------------------------------------------------

%% Both cluster-management commands must exit non-zero and complain when
%% the mandatory node-name argument is missing.
commands_without_args(Config) ->
    lists:foreach(
      fun(Command) ->
              {Output, 1} = mongooseimctl_interactive(Command, [], "yes\n", Config),
              ?assertMatch({match, _},
                           re:run(Output, "This command requires one argument: other node's name"))
      end,
      ["join_cluster", "remove_from_cluster"]).
%% Interactive join confirmed with "yes" clusters the nodes and exits 0.
join_successful_prompt(Config) ->
    %% given: the peer node we want to join
    #{node := PeerName} = PeerSpec = mim2(),
    %% when: joining interactively and confirming the prompt
    {_Output, ExitCode} =
        mongooseimctl_interactive("join_cluster", [atom_to_list(PeerName)], "yes\n", Config),
    %% then: the nodes are clustered and the command succeeded
    distributed_helper:verify_result(PeerSpec, add),
    ?eq(0, ExitCode).

%% Forced join (--force) clusters the nodes without any prompt.
join_successful_force(Config) ->
    %% given
    #{node := PeerName} = PeerSpec = mim2(),
    %% when
    {_Output, ExitCode} =
        mongooseimctl_force("join_cluster", [atom_to_list(PeerName)], "--force", Config),
    %% then
    distributed_helper:verify_result(PeerSpec, add),
    ?eq(0, ExitCode).

%% Interactive leave confirmed with "yes" detaches the node and exits 0.
leave_successful_prompt(Config) ->
    %% given: mim2 is clustered with the local node
    PeerSpec = mim2(),
    add_node_to_cluster(PeerSpec, Config),
    %% when
    {_Output, ExitCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
    %% then
    distributed_helper:verify_result(PeerSpec, remove),
    ?eq(0, ExitCode).

%% Forced leave (-f) detaches the node without any prompt.
leave_successful_force(Config) ->
    %% given
    PeerSpec = mim2(),
    add_node_to_cluster(PeerSpec, Config),
    %% when
    {_Output, ExitCode} = mongooseimctl_force("leave_cluster", [], "-f", Config),
    %% then
    distributed_helper:verify_result(PeerSpec, remove),
    ?eq(0, ExitCode).

%% Answering "no" to the join prompt must abort the join.
join_unsuccessful(Config) ->
    %% given
    PeerSpec = mim2(),
    %% when: rejecting the prompt
    {_Output, ExitCode} = mongooseimctl_interactive("join_cluster", [], "no\n", Config),
    %% then: nothing got clustered and the command did not succeed
    distributed_helper:verify_result(PeerSpec, remove),
    ?ne(0, ExitCode).

%% Answering "no" to the leave prompt must keep the cluster intact.
leave_unsuccessful(Config) ->
    %% given
    %% NOTE(review): unlike the leave_successful_* cases this uses mim()
    %% (the local node) rather than mim2(), so the setup clusters the node
    %% with itself — confirm whether mim2() was intended.
    ClusterSpec = mim(),
    add_node_to_cluster(ClusterSpec, Config),
    %% when: rejecting the prompt
    {_Output, ExitCode} = mongooseimctl_interactive("leave_cluster", [], "no\n", Config),
    %% then: still clustered, command did not succeed
    distributed_helper:verify_result(ClusterSpec, add),
    ?ne(0, ExitCode).
%% Leaving while not clustered at all must report failure.
leave_but_no_cluster(Config) ->
    %% given: no cluster exists
    PeerSpec = mim2(),
    %% when
    {_Output, ExitCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
    %% then: still unclustered and the command failed
    distributed_helper:verify_result(PeerSpec, remove),
    ?ne(0, ExitCode).

%% A second interactive join must be rejected once already clustered.
join_twice(Config) ->
    %% given
    #{node := PeerName} = PeerSpec = mim2(),
    JoinOnce = fun() ->
                       mongooseimctl_interactive("join_cluster",
                                                 [atom_to_list(PeerName)], "yes\n", Config)
               end,
    %% when: joining the same peer twice in a row
    {_, FirstCode} = JoinOnce(),
    {_, SecondCode} = JoinOnce(),
    %% then: clustered once; first call succeeded, second was refused
    distributed_helper:verify_result(PeerSpec, add),
    ?eq(0, FirstCode),
    ?ne(0, SecondCode).

%% This function checks that it's ok to call mongoose_cluster:join/1 twice
join_twice_using_rpc(_Config) ->
    %% given
    #{node := TargetNode} = mim(),
    JoinerSpec = mim2(),
    Timeout = timer:seconds(60),
    %% when: the same node joins the same target twice
    ok = rpc(JoinerSpec#{timeout => Timeout}, mongoose_cluster, join, [TargetNode]),
    ok = rpc(JoinerSpec#{timeout => Timeout}, mongoose_cluster, join, [TargetNode]),
    %% then
    distributed_helper:verify_result(JoinerSpec, add),
    ok.

%% Check, that global transaction allows to run only one cluster operation
%% at the time. It should technically behave the same way as
%% join_twice_using_rpc (i.e. not fail).
join_twice_in_parallel_using_rpc(_Config) ->
    %% given
    #{node := TargetNode} = mim(),
    JoinerSpec = mim2(),
    Timeout = timer:seconds(60),
    Join = fun() ->
                   ok = rpc(JoinerSpec#{timeout => Timeout},
                            mongoose_cluster, join, [TargetNode])
           end,
    %% when: two linked processes race to perform the same join
    FirstJoiner = proc_lib:spawn_link(Join),
    SecondJoiner = proc_lib:spawn_link(Join),
    %% then
    distributed_helper:verify_result(JoinerSpec, add),
    wait_for_process_to_stop(FirstJoiner, Timeout),
    wait_for_process_to_stop(SecondJoiner, Timeout),
    ok.
%% Leaving the cluster through the server RPC API (mongoose_server_api)
%% rather than through the ctl script must also detach the node.
leave_using_rpc(Config) ->
    %% given: mim2 is clustered with the local node
    LocalSpec = mim(),
    PeerSpec = mim2(),
    add_node_to_cluster(PeerSpec, Config),
    %% when
    Result = distributed_helper:rpc(LocalSpec#{timeout => timer:seconds(30)},
                                    mongoose_server_api, leave_cluster, []),
    ct:pal("leave_using_rpc result ~p~n", [Result]),
    %% then
    distributed_helper:verify_result(PeerSpec, remove),
    ok.

%% A second forced leave must fail once the node has already left.
leave_twice(Config) ->
    %% given
    PeerSpec = mim2(),
    add_node_to_cluster(PeerSpec, Config),
    %% when: leaving twice, first with --force, then with -f
    {_, FirstCode} = mongooseimctl_force("leave_cluster", [], "--force", Config),
    {_, SecondCode} = mongooseimctl_force("leave_cluster", [], "-f", Config),
    %% then
    distributed_helper:verify_result(PeerSpec, remove),
    ?eq(0, FirstCode),
    ?ne(0, SecondCode).

%% Two peers force-join the first node; afterwards every pair of the
%% three nodes must see each other in the mnesia cluster.
cluster_of_three(Config) ->
    %% given: three standalone nodes
    #{node := MemberName} = Member = mim(),
    #{node := SecondName} = Second = mim2(),
    #{node := ThirdName} = Third = mim3(),
    %% when: both peers join the first node with -f
    {_, JoinCode2} = mongooseimctl_force(SecondName, "join_cluster",
                                         [atom_to_list(MemberName)], "-f", Config),
    {_, JoinCode3} = mongooseimctl_force(ThirdName, "join_cluster",
                                         [atom_to_list(MemberName)], "-f", Config),
    %% then
    ?eq(0, JoinCode2),
    ?eq(0, JoinCode3),
    nodes_clustered(Second, Member, true),
    nodes_clustered(Third, Member, true),
    nodes_clustered(Second, Third, true).
%% Each of the two peers leaves a three-node cluster in turn; after every
%% leave the remaining membership is verified from both sides, and both
%% leave commands must exit 0.
leave_the_three(Config) ->
    %% given: a three-node cluster built over RPC
    Timeout = timer:seconds(60),
    #{node := ClusterMemberNode} = ClusterMember = mim(),
    #{node := Node2Nodename} = Node2 = mim2(),
    #{node := Node3Nodename} = Node3 = mim3(),
    ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [ClusterMemberNode]),
    ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [ClusterMemberNode]),
    %% when: node 2 leaves — node 3 must still be clustered
    {_, OpCode1} = mongooseimctl_interactive(Node2Nodename, "leave_cluster", [], "yes\n", Config),
    nodes_clustered(Node2, ClusterMember, false),
    nodes_clustered(Node3, ClusterMember, true),
    %% ...then node 3 leaves as well
    {_, OpCode2} = mongooseimctl_interactive(Node3Nodename, "leave_cluster", [], "yes\n", Config),
    %% then: nobody is clustered with anybody
    nodes_clustered(Node3, ClusterMember, false),
    nodes_clustered(Node2, Node3, false),
    ?eq(0, OpCode1),
    ?eq(0, OpCode2).

%% A member that died while clustered can be removed by a living member;
%% when the dead node is started again it re-joins the cluster.
remove_dead_from_cluster(Config) ->
    % given
    Timeout = timer:seconds(60),
    #{node := Node1Nodename} = Node1 = mim(),
    #{node := _Node2Nodename} = Node2 = mim2(),
    #{node := Node3Nodename} = Node3 = mim3(),
    ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
    ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
    %% when: node 3 dies and node 1 removes it from the cluster
    distributed_helper:stop_node(Node3Nodename, Config),
    %% Fix: pass the node *name* (an atom) as every other caller does, not
    %% the whole RPC spec map — mongooseimctl_interactive/5 hands its first
    %% argument to ctl_path_atom/1, whose atom_to_list/1 call would badarg
    %% on a map.
    {_, OpCode1} = mongooseimctl_interactive(Node1Nodename, "remove_from_cluster",
                                             [atom_to_list(Node3Nodename)], "yes\n", Config),
    %% then
    ?eq(0, OpCode1),
    % node is down hence its not in mnesia cluster
    have_node_in_mnesia(Node1, Node2, true),
    have_node_in_mnesia(Node1, Node3, false),
    have_node_in_mnesia(Node2, Node3, false),
    % after node awakening nodes are clustered again
    distributed_helper:start_node(Node3Nodename, Config),
    have_node_in_mnesia(Node1, Node3, true),
    have_node_in_mnesia(Node2, Node3, true).
%% A member that is still alive can be force-removed by another member and
%% must disappear from every remaining node's mnesia db_nodes.
remove_alive_from_cluster(Config) ->
    % given: a three-node cluster built over RPC
    Timeout = timer:seconds(60),
    #{node := FirstName} = First = mim(),
    #{node := SecondName} = Second = mim2(),
    Third = mim3(),
    ok = rpc(Second#{timeout => Timeout}, mongoose_cluster, join, [FirstName]),
    ok = rpc(Third#{timeout => Timeout}, mongoose_cluster, join, [FirstName]),
    %% when: removing the (still running) second node from the first one
    {_, ExitCode} = mongooseimctl_force(FirstName, "remove_from_cluster",
                                        [atom_to_list(SecondName)], "-f", Config),
    %% then
    ?eq(0, ExitCode),
    % node is down hence its not in mnesia cluster
    have_node_in_mnesia(First, Third, true),
    have_node_in_mnesia(First, Second, false),
    have_node_in_mnesia(Third, Second, false).


%% Helpers

%% Run an interactive ctl command on the default (mim) node.
mongooseimctl_interactive(Cmd, Args, Response, Config) ->
    #{node := DefaultNode} = mim(),
    mongooseimctl_interactive(DefaultNode, Cmd, Args, Response, Config).

%% Run an interactive ctl command on the given node, feeding Response to
%% the command's yes/no prompt.
mongooseimctl_interactive(Node, Cmd, Args, Response, Config) ->
    CtlPath = escalus_config:get_config(ctl_path_atom(Node), Config),
    run_interactive(string:join([CtlPath, Cmd | normalize_args(Args)], " "), Response).

%% Ensure every command argument is a flat string.
normalize_args(Args) ->
    [normalize_arg(Arg) || Arg <- Args].

normalize_arg(Arg) when is_binary(Arg) -> binary_to_list(Arg);
normalize_arg(Arg) when is_list(Arg) -> Arg.

%% Run a non-interactive ctl command (with --force/-f) on the default node.
mongooseimctl_force(Command, Args, ForceFlag, Config) ->
    #{node := DefaultNode} = mim(),
    mongooseimctl_force(DefaultNode, Command, Args, ForceFlag, Config).

%% Run a non-interactive ctl command on the given node.
mongooseimctl_force(Node, Cmd, Args, ForceFlag, Config) ->
    mongooseimctl_helper:mongooseimctl(Node, Cmd, [ForceFlag | Args], Config).

%% <nodename>_ctl — the config key under which init_per_suite stored the
%% node's ctl script path.
ctl_path_atom(NodeName) ->
    list_to_atom(atom_to_list(NodeName) ++ "_ctl").

%% Long timeout for mnesia and ejabberd app restart
run_interactive(Cmd, Response) ->
    run_interactive(Cmd, Response, timer:seconds(60)).
%% Spawn Cmd in a port, feed it Response (the answer to its interactive
%% yes/no prompt) and collect its output and exit status via the shared
%% mongooseimctl_helper loop.
run_interactive(Cmd, Response, Timeout) ->
    Port = erlang:open_port({spawn, Cmd}, [exit_status]),
    %% respond to interactive question (yes/no)
    Port ! {self(), {command, Response}},
    mongooseimctl_helper:loop(Cmd, [], Port, [], Timeout).

%% Assert that both nodes' views of mnesia db_nodes agree with ShouldBe:
%% each node must (or must not) appear in the other's node list, and each
%% node always sees itself.
nodes_clustered(#{node := Node1Name} = Node1, #{node := Node2Name} = Node2, ShouldBe) ->
    DbNodes1 = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
    DbNodes2 = distributed_helper:rpc(Node2, mnesia, system_info, [db_nodes]),
    Pairs = [{Node1Name, DbNodes2, ShouldBe},
             {Node2Name, DbNodes1, ShouldBe},
             {Node1Name, DbNodes1, true},
             {Node2Name, DbNodes2, true}],
    [?assertEqual(ShouldBelong, lists:member(Element, List))
     || {Element, List, ShouldBelong} <- Pairs].

%% Assert that Node2 does (or does not) appear in Node1's mnesia db_nodes.
have_node_in_mnesia(Node1, #{node := Node2}, ShouldBe) ->
    DbNodes1 = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
    ?assertEqual(ShouldBe, lists:member(Node2, DbNodes1)).

%% Block until Pid terminates, or fail the test case after Timeout ms.
wait_for_process_to_stop(Pid, Timeout) ->
    %% Fix: bind and match the monitor reference so we only consume the
    %% 'DOWN' message belonging to THIS monitor (the original matched any
    %% 'DOWN' for Pid), and drop the monitor cleanly on timeout.
    MonitorRef = erlang:monitor(process, Pid),
    receive
        {'DOWN', MonitorRef, process, Pid, _Reason} -> ok
    after Timeout ->
        erlang:demonitor(MonitorRef, [flush]),
        ct:fail(wait_for_process_to_stop_timeout)
    end.