%%==============================================================================
%% Copyright 2014 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

%% Common Test suite exercising MongooseIM clustering commands
%% (join_cluster / leave_cluster / remove_from_cluster / set_master)
%% across up to three test nodes (mim, mim2, mim3).
-module(cluster_commands_SUITE).
%% export_all is conventional in CT suites: every test case must be exported.
-compile([export_all, nowarn_export_all]).

-import(distributed_helper, [add_node_to_cluster/2,
                             is_sm_distributed/0,
                             mim/0, mim2/0, mim3/0,
                             remove_node_from_cluster/2,
                             require_rpc_nodes/1,
                             rpc/4]).
-import(mongooseimctl_helper, [mongooseimctl/3, rpc_call/3]).
-import(domain_helper, [host_type/1]).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%% Assertion shorthands used throughout the suite.
%% NOTE: ?ne intentionally uses == (value comparison with numeric coercion).
-define(eq(Expected, Actual), ?assertEqual(Expected, Actual)).
-define(ne(A, B), ?assertNot(A == B)).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Top-level list of test groups run by Common Test.
all() ->
    [{group, clustered},
     {group, mnesia},
     {group, clustering_two},
     {group, clustering_three}].

%% Group definitions; the clustering_* members are listed in
%% clustering_two_tests/0 and clustering_three_tests/0 below.
groups() ->
    [{clustered, [], [one_to_one_message]},
     {clustering_two, [], clustering_two_tests()},
     {clustering_three, [], clustering_three_tests()},
     {mnesia, [], [set_master_test]}].

%% The whole suite requires RPC access to all three nodes.
suite() ->
    require_rpc_nodes([mim, mim2, mim3]) ++ escalus:suite().
%% Test cases that operate on a two-node cluster (mim + mim2).
clustering_two_tests() ->
    [join_successful_prompt,
     join_successful_force,
     leave_successful_prompt,
     leave_successful_force,
     join_unsuccessful,
     leave_unsuccessful,
     leave_but_no_cluster,
     join_twice,
     leave_using_rpc,
     leave_twice,
     join_twice_using_rpc,
     join_twice_in_parallel_using_rpc].

%% Test cases that operate on a three-node cluster (mim + mim2 + mim3).
clustering_three_tests() ->
    [cluster_of_three,
     leave_the_three,
     %remove_dead_from_cluster, % TODO: Breaks cover
     remove_alive_from_cluster].

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Initialises node utils for every test node and stashes each node's
%% mongooseimctl path in the escalus config under the <nodename>_ctl key
%% (see ctl_path_atom/1).
init_per_suite(Config) ->
    NodeSpecs = [mim(), mim2(), mim3()],
    %% Thread the config through ejabberd_node_utils:init/2 for each node,
    %% exactly as the original chained Config1 -> Config2 -> Config3.
    InitedConfig = lists:foldl(fun ejabberd_node_utils:init/2, Config, NodeSpecs),
    CtlPaths = [{ctl_path_atom(Node), distributed_helper:ctl_path(Node, InitedConfig)}
                || #{node := Node} <- NodeSpecs],
    escalus:init_per_suite(CtlPaths ++ InitedConfig).

end_per_suite(Config) ->
    escalus:end_per_suite(Config).
%% The clustered/mnesia groups need mim2 joined to the cluster plus the
%% alice/clusterguy test users; skip if the session backend is not distributed.
init_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
    Node2 = mim2(),
    Config1 = add_node_to_cluster(Node2, Config),
    case is_sm_distributed() of
        true ->
            escalus:create_users(Config1, escalus:get_users([alice, clusterguy]));
        {false, Backend} ->
            ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
            %% Undo the join before skipping. (The original redundantly
            %% re-bound Node2 = mim2() here; it is already bound above.)
            remove_node_from_cluster(Node2, Config1),
            {skip, nondistributed_sm}
    end;

%% The clustering groups build their own clusters per test case; only the
%% distributed-session-manager precondition is checked here.
init_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
    case is_sm_distributed() of
        true ->
            ok;
        {false, Backend} ->
            ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
            {skip, nondistributed_sm}
    end;

init_per_group(_GroupName, Config) ->
    escalus:create_users(Config).

%% Mirror of init_per_group: drop the users and detach mim2.
end_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
    escalus:delete_users(Config, escalus:get_users([alice, clusterguy])),
    Node2 = mim2(),
    remove_node_from_cluster(Node2, Config);

%% Users are gone after mnesia cleaning
%% hence there is no need to delete them manually
end_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
    ok;
end_per_group(_GroupName, Config) ->
    escalus:delete_users(Config).

init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).
%% Per-testcase cleanup: undo whatever cluster topology the case built.

%% cluster_of_three joins both mim2 and mim3, so both must be removed.
end_per_testcase(cluster_of_three, Config) ->
    Node2 = mim2(),
    Node3 = mim3(),
    remove_node_from_cluster(Node2, Config),
    remove_node_from_cluster(Node3, Config),
    escalus:end_per_testcase(cluster_of_three, Config);

%% The remove_* cases also build a three-node cluster over RPC.
end_per_testcase(CaseName, Config) when CaseName == remove_alive_from_cluster
                                        orelse CaseName == remove_dead_from_cluster->
    Node3 = mim3(),
    Node2 = mim2(),
    remove_node_from_cluster(Node3, Config),
    remove_node_from_cluster(Node2, Config),
    escalus:end_per_testcase(CaseName, Config);

%% Cases that leave mim2 joined to the cluster on success.
%% NOTE(review): leave_unsuccessful_prompt / leave_unsuccessful_force are not
%% test case names in clustering_two_tests/0 (the real case is
%% leave_unsuccessful), so those two guard alternatives never match and
%% leave_unsuccessful falls through to the catch-all clause with no cluster
%% cleanup. Confirm whether that is intentional before changing the guards.
end_per_testcase(CaseName, Config) when CaseName == join_successful_prompt
                                        orelse CaseName == join_successful_force
                                        orelse CaseName == leave_unsuccessful_prompt
                                        orelse CaseName == leave_unsuccessful_force
                                        orelse CaseName == join_twice
                                        orelse CaseName == join_twice_using_rpc
                                        orelse CaseName == join_twice_in_parallel_using_rpc ->
    Node2 = mim2(),
    remove_node_from_cluster(Node2, Config),
    escalus:end_per_testcase(CaseName, Config);

%% Default: no cluster state to undo.
end_per_testcase(CaseName, Config) ->
    escalus:end_per_testcase(CaseName, Config).
%%--------------------------------------------------------------------
%% Message tests
%%--------------------------------------------------------------------

%% Checks that a message from a user on node one reaches a user on node two
%% and back, and that the inter-node distribution metrics grow meanwhile.
one_to_one_message(ConfigIn) ->
    %% Given Alice connected to node one and ClusterGuy connected to node two
    Metrics = [{[global, data, dist], [{recv_oct, '>'}, {send_oct, '>'}]}],
    Config = [{mongoose_metrics, Metrics} | ConfigIn],
    escalus:story(Config, [{alice, 1}, {clusterguy, 1}], fun(Alice, ClusterGuy) ->
        %% When Alice sends a message to ClusterGuy
        Msg1 = escalus_stanza:chat_to(ClusterGuy, <<"Hi!">>),
        escalus:send(Alice, Msg1),
        %% Then he receives it
        Stanza1 = escalus:wait_for_stanza(ClusterGuy, 5000),
        escalus:assert(is_chat_message, [<<"Hi!">>], Stanza1),

        %% When ClusterGuy sends a response
        Msg2 = escalus_stanza:chat_to(Alice, <<"Oh hi!">>),
        escalus:send(ClusterGuy, Msg2),
        %% Then Alice also receives it
        Stanza2 = escalus:wait_for_stanza(Alice, 5000),
        escalus:assert(is_chat_message, [<<"Oh hi!">>], Stanza2)
    end).
%%--------------------------------------------------------------------
%% mnesia tests
%%--------------------------------------------------------------------

%% Sets the mnesia master node for the passwd table via the ctl command,
%% first to the local node ("self"), then to another running db node,
%% and back, asserting table_info(passwd, master_nodes) after each step.
set_master_test(ConfigIn) ->
    %% To ensure that passwd table exists.
    %% We also need at least two nodes for set_master to work.
    %% Best effort: the auth module may already be started, in which case the
    %% RPC raises and we carry on. (Old-style `catch Expr` replaced with an
    %% explicit try/catch that ignores all exception classes.)
    try distributed_helper:rpc(mim(), ejabberd_auth_internal, start, [host_type(mim1)])
    catch _:_ -> ok
    end,
    try distributed_helper:rpc(mim2(), ejabberd_auth_internal, start, [host_type(mim2)])
    catch _:_ -> ok
    end,

    TableName = passwd,
    NodeList = rpc_call(mnesia, system_info, [running_db_nodes]),
    mongooseimctl("set_master", ["self"], ConfigIn),
    [MasterNode] = rpc_call(mnesia, table_info, [TableName, master_nodes]),
    true = lists:member(MasterNode, NodeList),
    RestNodesList = lists:delete(MasterNode, NodeList),
    OtherNode = hd(RestNodesList),
    mongooseimctl("set_master", [atom_to_list(OtherNode)], ConfigIn),
    [OtherNode] = rpc_call(mnesia, table_info, [TableName, master_nodes]),
    mongooseimctl("set_master", ["self"], ConfigIn),
    [MasterNode] = rpc_call(mnesia, table_info, [TableName, master_nodes]).


%%--------------------------------------------------------------------
%% Manage cluster commands tests
%%--------------------------------------------------------------------


%% Joining mim2 interactively (answering "yes") must succeed with exit code 0.
join_successful_prompt(Config) ->
    %% given
    #{node := Node2} = RPCSpec2 = mim2(),
    %% when
    {_, OpCode} =
        mongooseimctl_interactive("join_cluster", [atom_to_list(Node2)], "yes\n", Config),
    %% then
    distributed_helper:verify_result(RPCSpec2, add),
    ?eq(0, OpCode).

%% Joining mim2 with --force (no prompt) must succeed with exit code 0.
join_successful_force(Config) ->
    %% given
    #{node := Node2} = RPCSpec2 = mim2(),
    %% when
    {_, OpCode} = mongooseimctl_force("join_cluster", [atom_to_list(Node2)], "--force", Config),
    %% then
    distributed_helper:verify_result(RPCSpec2, add),
    ?eq(0, OpCode).

%% Leaving the cluster interactively (answering "yes") must succeed.
leave_successful_prompt(Config) ->
    %% given
    Node2 = mim2(),
    add_node_to_cluster(Node2, Config),
    %% when
    {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
    %% then
    distributed_helper:verify_result(Node2, remove),
    ?eq(0, OpCode).
%% Leaving the cluster with the short force flag must succeed.
leave_successful_force(Config) ->
    %% given
    Node2 = mim2(),
    add_node_to_cluster(Node2, Config),
    %% when
    {_, OpCode} = mongooseimctl_force("leave_cluster", [], "-f", Config),
    %% then
    distributed_helper:verify_result(Node2, remove),
    ?eq(0, OpCode).

%% join_cluster without a target node argument must fail (non-zero code).
join_unsuccessful(Config) ->
    %% given
    Node2 = mim2(),
    %% when
    {_, OpCode} = mongooseimctl_interactive("join_cluster", [], "no\n", Config),
    %% then
    distributed_helper:verify_result(Node2, remove),
    ?ne(0, OpCode).

%% Answering "no" to the leave prompt must keep the node clustered.
%% NOTE(review): Node2 is bound to mim() here, while every sibling case uses
%% mim2() -- this looks like a typo. Changing it to mim2() would, however,
%% also require an end_per_testcase/2 cleanup clause for leave_unsuccessful
%% (none exists) and could affect the following leave_but_no_cluster case,
%% so it is flagged rather than changed -- confirm against the helpers.
leave_unsuccessful(Config) ->
    %% given
    Node2 = mim(),
    add_node_to_cluster(Node2, Config),
    %% when
    {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "no\n", Config),
    %% then
    distributed_helper:verify_result(Node2, add),
    ?ne(0, OpCode).

%% Leaving while not clustered at all must fail.
leave_but_no_cluster(Config) ->
    %% given
    Node2 = mim2(),
    %% when
    {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
    %% then
    distributed_helper:verify_result(Node2, remove),
    ?ne(0, OpCode).

%% A second join to the same node must be rejected by the ctl tool.
join_twice(Config) ->
    %% given
    #{node := Node2} = RPCSpec2 = mim2(),
    %% when
    {_, OpCode1} = mongooseimctl_interactive("join_cluster",
                                             [atom_to_list(Node2)], "yes\n", Config),
    {_, OpCode2} = mongooseimctl_interactive("join_cluster",
                                             [atom_to_list(Node2)], "yes\n", Config),
    %% then
    distributed_helper:verify_result(RPCSpec2, add),
    ?eq(0, OpCode1),
    ?ne(0, OpCode2).
%% This function checks that it's ok to call mongoose_cluster:join/1 twice
join_twice_using_rpc(_Config) ->
    %% given
    #{node := Node1} = mim(),
    Spec2 = mim2(),
    JoinTimeout = timer:seconds(60),
    %% when: a repeated join must succeed just like the first one
    ok = rpc(Spec2#{timeout => JoinTimeout}, mongoose_cluster, join, [Node1]),
    ok = rpc(Spec2#{timeout => JoinTimeout}, mongoose_cluster, join, [Node1]),
    %% then
    distributed_helper:verify_result(Spec2, add),
    ok.

%% Check, that global transaction allows to run only one cluster operation at the time.
%% It should technically behave the same way as join_twice_using_rpc test (i.e. not fail).
join_twice_in_parallel_using_rpc(_Config) ->
    %% given
    #{node := Node1} = mim(),
    Spec2 = mim2(),
    JoinTimeout = timer:seconds(60),
    %% when: two concurrent joiners race on the same join operation
    Join = fun() ->
                   ok = rpc(Spec2#{timeout => JoinTimeout}, mongoose_cluster, join, [Node1])
           end,
    FirstJoiner = proc_lib:spawn_link(Join),
    SecondJoiner = proc_lib:spawn_link(Join),
    %% then
    distributed_helper:verify_result(Spec2, add),
    wait_for_process_to_stop(FirstJoiner, JoinTimeout),
    wait_for_process_to_stop(SecondJoiner, JoinTimeout),
    ok.

%% Leaving the cluster through ejabberd_admin over RPC (no ctl involved).
leave_using_rpc(Config) ->
    %% given
    Spec1 = mim(),
    Spec2 = mim2(),
    add_node_to_cluster(Spec2, Config),
    %% when
    Result = distributed_helper:rpc(Spec1#{timeout => timer:seconds(30)},
                                    ejabberd_admin, leave_cluster, []),
    ct:pal("leave_using_rpc result ~p~n", [Result]),
    %% then
    distributed_helper:verify_result(Spec2, remove),
    ok.
%% The first forced leave succeeds; the second has no cluster to leave.
leave_twice(Config) ->
    %% given: the local node is clustered with mim2
    Spec2 = mim2(),
    add_node_to_cluster(Spec2, Config),
    %% when: leaving twice in a row with both spellings of the force flag
    {_, FirstCode} = mongooseimctl_force("leave_cluster", [], "--force", Config),
    {_, SecondCode} = mongooseimctl_force("leave_cluster", [], "-f", Config),
    %% then
    distributed_helper:verify_result(Spec2, remove),
    ?eq(0, FirstCode),
    ?ne(0, SecondCode).

%% Both mim2 and mim3 join mim; afterwards every pair must see each other.
cluster_of_three(Config) ->
    %% given
    #{node := MemberName} = Member = mim(),
    #{node := Name2} = Spec2 = mim2(),
    #{node := Name3} = Spec3 = mim3(),
    %% when
    {_, Code2} = mongooseimctl_force(Name2, "join_cluster",
                                     [atom_to_list(MemberName)], "-f", Config),
    {_, Code3} = mongooseimctl_force(Name3, "join_cluster",
                                     [atom_to_list(MemberName)], "-f", Config),
    %% then
    ?eq(0, Code2),
    ?eq(0, Code3),
    nodes_clustered(Spec2, Member, true),
    nodes_clustered(Spec3, Member, true),
    nodes_clustered(Spec2, Spec3, true).

%% Builds a three-node cluster over RPC, then dismantles it one leave at a
%% time, checking the intermediate topology after the first leave.
leave_the_three(Config) ->
    %% given
    JoinTimeout = timer:seconds(60),
    #{node := MemberName} = Member = mim(),
    #{node := Name2} = Spec2 = mim2(),
    #{node := Name3} = Spec3 = mim3(),
    ok = rpc(Spec2#{timeout => JoinTimeout}, mongoose_cluster, join, [MemberName]),
    ok = rpc(Spec3#{timeout => JoinTimeout}, mongoose_cluster, join, [MemberName]),
    %% when
    {_, Code2} = mongooseimctl_interactive(Name2, "leave_cluster", [], "yes\n", Config),
    nodes_clustered(Spec2, Member, false),
    nodes_clustered(Spec3, Member, true),
    {_, Code3} = mongooseimctl_interactive(Name3, "leave_cluster", [], "yes\n", Config),
    %% then
    nodes_clustered(Spec3, Member, false),
    nodes_clustered(Spec2, Spec3, false),
    ?eq(0, Code2),
    ?eq(0, Code3).
%% Removes a stopped node (mim3) from the cluster and checks that once it is
%% started again it rejoins automatically.
%% NOTE: currently disabled in clustering_three_tests/0 ("Breaks cover").
remove_dead_from_cluster(Config) ->
    % given
    Timeout = timer:seconds(60),
    #{node := Node1Nodename} = Node1 = mim(),
    Node2 = mim2(),
    #{node := Node3Nodename} = Node3 = mim3(),
    ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
    ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
    %% when
    distributed_helper:stop_node(Node3, Config),
    %% BUG FIX: mongooseimctl_interactive/5 expects a node *name* atom
    %% (ctl_path_atom/1 calls atom_to_list/1 on it), not the RPC spec map
    %% that was passed here originally -- cf. cluster_of_three/1 and
    %% remove_alive_from_cluster/1, which both pass the name.
    {_, OpCode1} = mongooseimctl_interactive(Node1Nodename, "remove_from_cluster",
                                             [atom_to_list(Node3Nodename)], "yes\n", Config),
    %% then
    ?eq(0, OpCode1),
    % node is down hence its not in mnesia cluster
    have_node_in_mnesia(Node1, Node2, true),
    have_node_in_mnesia(Node1, Node3, false),
    have_node_in_mnesia(Node2, Node3, false),
    % after node awakening nodes are clustered again
    distributed_helper:start_node(Node3, Config),
    have_node_in_mnesia(Node1, Node3, true),
    have_node_in_mnesia(Node2, Node3, true).

%% Removes a still-running node (mim2) from the cluster with the force flag
%% and checks mnesia membership afterwards.
remove_alive_from_cluster(Config) ->
    % given
    Timeout = timer:seconds(60),
    #{node := Node1Name} = Node1 = mim(),
    #{node := Node2Name} = Node2 = mim2(),
    Node3 = mim3(),
    ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Name]),
    ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Name]),
    %% when
    %% Node2 is still running
    {_, OpCode1} = mongooseimctl_force(Node1Name, "remove_from_cluster",
                                       [atom_to_list(Node2Name)], "-f", Config),
    %% then
    ?eq(0, OpCode1),
    % the removed node must no longer appear in the mnesia cluster
    have_node_in_mnesia(Node1, Node3, true),
    have_node_in_mnesia(Node1, Node2, false),
    have_node_in_mnesia(Node3, Node2, false).



%% Helpers

%% Runs an interactive ctl command on the default node (mim).
mongooseimctl_interactive(C, A, R, Config) ->
    #{node := DefaultNode} = mim(),
    mongooseimctl_interactive(DefaultNode, C, A, R, Config).
%% Builds the full ctl command line for Node and runs it, feeding Response
%% to the interactive yes/no prompt.
mongooseimctl_interactive(Node, Cmd, Args, Response, Config) ->
    CtlPath = escalus_config:get_config(ctl_path_atom(Node), Config),
    FullCmd = string:join([CtlPath, Cmd | normalize_args(Args)], " "),
    run_interactive(FullCmd, Response).

%% Coerces every argument to a flat string: binaries are converted,
%% strings pass through unchanged.
normalize_args(Args) ->
    [if
         is_binary(Arg) -> binary_to_list(Arg);
         is_list(Arg) -> Arg
     end || Arg <- Args].

%% Runs a ctl command on the default node (mim) with a force flag prepended.
mongooseimctl_force(Command, Args, ForceFlag, Config) ->
    #{node := DefaultNode} = mim(),
    mongooseimctl_force(DefaultNode, Command, Args, ForceFlag, Config).

%% Runs a ctl command on Node with ForceFlag as the first argument.
mongooseimctl_force(Node, Cmd, Args, ForceFlag, Config) ->
    mongooseimctl_helper:mongooseimctl(Node, Cmd, [ForceFlag | Args], Config).

%% Maps a node name to the config key under which its ctl path is stored,
%% e.g. 'mim@localhost' -> 'mim@localhost_ctl'.
ctl_path_atom(NodeName) ->
    list_to_atom(atom_to_list(NodeName) ++ "_ctl").

%% Long timeout for mnesia and ejabberd app restart
run_interactive(Cmd, Response) ->
    run_interactive(Cmd, Response, timer:seconds(60)).

%% Spawns Cmd as a port, writes Response to its stdin and collects output
%% via mongooseimctl_helper:loop/5 until exit or Timeout.
run_interactive(Cmd, Response, Timeout) ->
    CtlPort = erlang:open_port({spawn, Cmd}, [exit_status]),
    %% respond to interactive question (yes/no)
    CtlPort ! {self(), {command, Response}},
    mongooseimctl_helper:loop(Cmd, [], CtlPort, [], Timeout).

%% Asserts the pairwise mnesia db_nodes relationship between two nodes:
%% each node must (or must not, per ShouldBe) appear in the other's
%% db_nodes, and each node always appears in its own.
nodes_clustered(#{node := NameA} = SpecA, #{node := NameB} = SpecB, ShouldBe) ->
    DbNodesA = distributed_helper:rpc(SpecA, mnesia, system_info, [db_nodes]),
    DbNodesB = distributed_helper:rpc(SpecB, mnesia, system_info, [db_nodes]),
    Checks = [{NameA, DbNodesB, ShouldBe},
              {NameB, DbNodesA, ShouldBe},
              {NameA, DbNodesA, true},
              {NameB, DbNodesB, true}],
    [?assertEqual(Expected, lists:member(Member, DbNodes))
     || {Member, DbNodes, Expected} <- Checks].

%% Asserts whether NodeB's name shows up in NodeA's mnesia db_nodes.
have_node_in_mnesia(SpecA, #{node := NodeB}, ShouldBe) ->
    DbNodes = distributed_helper:rpc(SpecA, mnesia, system_info, [db_nodes]),
    ?assertEqual(ShouldBe, lists:member(NodeB, DbNodes)).
%% Blocks until Pid terminates, failing the test case after Timeout ms.
wait_for_process_to_stop(Pid, Timeout) ->
    MonitorRef = erlang:monitor(process, Pid),
    receive
        {'DOWN', MonitorRef, process, _, _} ->
            ok
    after Timeout ->
        ct:fail(wait_for_process_to_stop_timeout)
    end.