%%==============================================================================
%% Copyright 2014 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

%% Common Test suite exercising the cluster management commands
%% (join_cluster, leave_cluster, remove_from_cluster), driven both through the
%% mongooseimctl script and through RPC, against the mim, mim2 and mim3 nodes.
-module(cluster_commands_SUITE).
-compile([export_all, nowarn_export_all]).

-import(distributed_helper, [add_node_to_cluster/2,
                             is_sm_distributed/0,
                             mim/0, mim2/0, mim3/0,
                             remove_node_from_cluster/2,
                             require_rpc_nodes/1,
                             rpc/4]).
-import(mongooseimctl_helper, [mongooseimctl/3, rpc_call/3]).
-import(domain_helper, [host_type/1]).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%% Shorthand assertions used by the test cases:
%% ?eq(Expected, Actual) asserts equality, ?ne(A, B) asserts that A == B is false.
-define(eq(Expected, Actual), ?assertEqual(Expected, Actual)).
-define(ne(A, B), ?assertNot(A == B)).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Groups executed by this suite, in order.
all() ->
    [{group, clustered},
     {group, clustering_two},
     {group, clustering_three}].

%% clustered: message routing between two clustered nodes;
%% clustering_two: cluster commands involving mim and mim2;
%% clustering_three: cluster commands involving mim, mim2 and mim3.
groups() ->
    [{clustered, [], [one_to_one_message]},
     {clustering_two, [], clustering_two_tests()},
     {clustering_three, [], clustering_three_tests()}].

%% Requires RPC access to all three nodes, plus escalus' own requirements.
suite() ->
    require_rpc_nodes([mim, mim2, mim3]) ++ escalus:suite().

%% Test cases of the clustering_two group; they run sequentially in this order.
clustering_two_tests() ->
    [join_successful_prompt,
     join_successful_force,
     leave_successful_prompt,
     leave_successful_force,
     join_unsuccessful,
     leave_unsuccessful,
     leave_but_no_cluster,
     join_twice,
     leave_using_rpc,
     leave_twice,
     join_twice_using_rpc,
     join_twice_in_parallel_using_rpc].

%% Test cases of the clustering_three group.
%% remove_dead_from_cluster is deliberately left out. TODO: it breaks cover.
clustering_three_tests() ->
    [cluster_of_three,
     leave_the_three,
     remove_alive_from_cluster].

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Runs ejabberd_node_utils:init/2 for mim, mim2 and mim3 (in that order),
%% then stores each node's mongooseimctl path in the config under the key
%% produced by ctl_path_atom/1, before handing over to escalus.
init_per_suite(Config) ->
    RPCSpecs = [mim(), mim2(), mim3()],
    NodeNames = lists:map(fun(#{node := Name}) -> Name end, RPCSpecs),
    InitedConfig = lists:foldl(fun ejabberd_node_utils:init/2, Config, RPCSpecs),
    CtlPathEntries = [{ctl_path_atom(Name), distributed_helper:ctl_path(Name, InitedConfig)}
                      || Name <- NodeNames],
    escalus:init_per_suite(CtlPathEntries ++ InitedConfig).

%% Delegates suite teardown to escalus.
end_per_suite(Config) ->
    escalus:end_per_suite(Config).

%% Sets up a test group.
%%
%% clustered (and a group named mnesia, which groups/0 does not currently
%% define): joins mim2 to mim's cluster and registers alice and clusterguy.
%% If the session manager backend is not distributed, mim2 is detached again
%% and the group is skipped.
%%
%% clustering_two / clustering_three: only check that the session manager
%% backend is distributed; the test cases do their own clustering. The config
%% list is returned unchanged, as Common Test expects from init_per_group/2
%% (the previous version returned the atom ok).
init_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
    Node2 = mim2(),
    Config1 = add_node_to_cluster(Node2, Config),
    case is_sm_distributed() of
        true ->
            escalus:create_users(Config1, escalus:get_users([alice, clusterguy]));
        {false, Backend} ->
            ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
            %% Undo the clustering done above before skipping the group.
            remove_node_from_cluster(Node2, Config1),
            {skip, nondistributed_sm}
    end;

init_per_group(Group, Config) when Group == clustering_two orelse Group == clustering_three ->
    case is_sm_distributed() of
        true ->
            Config;
        {false, Backend} ->
            ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
            {skip, nondistributed_sm}
    end;

init_per_group(_GroupName, Config) ->
    escalus:create_users(Config).

%% Tears down a test group; mirrors init_per_group/2.
end_per_group(Group, Config) when Group == clustered orelse Group == mnesia ->
    escalus:delete_users(Config, escalus:get_users([alice, clusterguy])),
    Node2 = mim2(),
    remove_node_from_cluster(Node2, Config);

%% Users are gone after mnesia cleaning
%% hence there is no need to delete them manually
end_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
    ok;
end_per_group(_GroupName, Config) ->
    escalus:delete_users(Config).

%% Delegates per-testcase setup to escalus.
init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).

%% Tears down a test case. Nodes that the case may leave clustered with mim
%% are detached again (in the order given by nodes_to_detach/1), then escalus'
%% per-testcase cleanup runs.
%%
%% leave_unsuccessful is listed because it joins mim2 and then declines the
%% leave, leaving mim2 clustered. The previous guard named the non-existent
%% cases leave_unsuccessful_prompt / leave_unsuccessful_force, so that cleanup
%% never ran.
end_per_testcase(CaseName, Config) ->
    lists:foreach(fun(Node) -> remove_node_from_cluster(Node, Config) end,
                  nodes_to_detach(CaseName)),
    escalus:end_per_testcase(CaseName, Config).

%% RPC specs of the nodes to remove from the cluster after CaseName, in
%% removal order. Cases not listed need no cluster cleanup.
nodes_to_detach(cluster_of_three) ->
    [mim2(), mim3()];
nodes_to_detach(CaseName) when CaseName == remove_alive_from_cluster;
                               CaseName == remove_dead_from_cluster ->
    [mim3(), mim2()];
nodes_to_detach(CaseName) when CaseName == join_successful_prompt;
                               CaseName == join_successful_force;
                               CaseName == leave_unsuccessful;
                               CaseName == join_twice;
                               CaseName == join_twice_using_rpc;
                               CaseName == join_twice_in_parallel_using_rpc ->
    [mim2()];
nodes_to_detach(_CaseName) ->
    [].

%%--------------------------------------------------------------------
%% Message tests
%%--------------------------------------------------------------------

%% Alice (connected to node one) and ClusterGuy (connected to node two)
%% exchange one chat message in each direction. Metric expectations on
%% [global, data, dist] are passed to escalus under the mongoose_metrics key.
one_to_one_message(ConfigIn) ->
    DistMetrics = [{[global, data, dist], [{recv_oct, '>'}, {send_oct, '>'}]}],
    Config = [{mongoose_metrics, DistMetrics} | ConfigIn],
    escalus:story(Config, [{alice, 1}, {clusterguy, 1}],
                  fun(Alice, ClusterGuy) ->
                          chat_and_expect(Alice, ClusterGuy, <<"Hi!">>),
                          chat_and_expect(ClusterGuy, Alice, <<"Oh hi!">>)
                  end).

%% Sends a chat message with Body from Sender to Recipient and asserts that
%% Recipient gets a chat message with that body within 5 seconds.
chat_and_expect(Sender, Recipient, Body) ->
    escalus:send(Sender, escalus_stanza:chat_to(Recipient, Body)),
    Received = escalus:wait_for_stanza(Recipient, 5000),
    escalus:assert(is_chat_message, [Body], Received).

%%--------------------------------------------------------------------
%% Manage cluster commands tests
%%--------------------------------------------------------------------


%% mim joins mim2 via the interactive join_cluster command answered "yes";
%% the command must exit with 0 and the nodes must be clustered.
join_successful_prompt(Config) ->
    #{node := Node2} = RPCSpec2 = mim2(),
    TargetArgs = [atom_to_list(Node2)],
    {_, ExitCode} = mongooseimctl_interactive("join_cluster", TargetArgs, "yes\n", Config),
    distributed_helper:verify_result(RPCSpec2, add),
    ?eq(0, ExitCode).

%% Same as join_successful_prompt, but skips the prompt with --force.
join_successful_force(Config) ->
    #{node := Node2} = RPCSpec2 = mim2(),
    TargetArgs = [atom_to_list(Node2)],
    {_, ExitCode} = mongooseimctl_force("join_cluster", TargetArgs, "--force", Config),
    distributed_helper:verify_result(RPCSpec2, add),
    ?eq(0, ExitCode).

%% mim2 joins mim; then leave_cluster runs on mim and is confirmed with "yes".
%% The command must exit with 0 and the nodes must no longer be clustered.
leave_successful_prompt(Config) ->
    %% given
    Node2 = mim2(),
    add_node_to_cluster(Node2, Config),
    %% when
    {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
    %% then
    distributed_helper:verify_result(Node2, remove),
    ?eq(0, OpCode).

%% Same as leave_successful_prompt, but skips the prompt with -f.
leave_successful_force(Config) ->
    %% given
    Node2 = mim2(),
    add_node_to_cluster(Node2, Config),
    %% when
    {_, OpCode} = mongooseimctl_force("leave_cluster", [], "-f", Config),
    %% then
    distributed_helper:verify_result(Node2, remove),
    ?eq(0, OpCode).

%% join_cluster targeting mim2 is run on mim but declined with "no": the
%% command must exit non-zero and the nodes must stay unclustered.
join_unsuccessful(Config) ->
    %% given
    #{node := Node2Name} = Node2 = mim2(),
    %% when
    %% Pass the target node, so that a failure is caused by the "no" answer
    %% rather than by the node argument being absent.
    {_, OpCode} = mongooseimctl_interactive("join_cluster", [atom_to_list(Node2Name)],
                                            "no\n", Config),
    %% then
    distributed_helper:verify_result(Node2, remove),
    ?ne(0, OpCode).

%% mim2 joins mim; then leave_cluster runs on mim but is declined with "no".
%% The command must exit non-zero and mim2 must still be clustered.
%% The previous version used mim() here, which made the check vacuous.
leave_unsuccessful(Config) ->
    %% given
    Node2 = mim2(),
    add_node_to_cluster(Node2, Config),
    try
        %% when
        {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "no\n", Config),
        %% then
        distributed_helper:verify_result(Node2, add),
        ?ne(0, OpCode)
    after
        %% The declined leave keeps mim2 clustered; detach it so the following
        %% cases (e.g. leave_but_no_cluster) start from an unclustered setup.
        remove_node_from_cluster(Node2, Config)
    end.

%% leave_cluster on a node that is not in a cluster must exit non-zero, even
%% when confirmed.
leave_but_no_cluster(Config) ->
    %% given
    Node2 = mim2(),
    %% when
    {_, OpCode} = mongooseimctl_interactive("leave_cluster", [], "yes\n", Config),
    %% then
    distributed_helper:verify_result(Node2, remove),
    ?ne(0, OpCode).

%% Running join_cluster targeting mim2 twice: the first run must exit with 0,
%% the second must exit non-zero, and the nodes end up clustered.
join_twice(Config) ->
    %% given
    #{node := Node2} = RPCSpec2 = mim2(),
    %% when
    {_, OpCode1} = mongooseimctl_interactive("join_cluster",
                                             [atom_to_list(Node2)], "yes\n", Config),
    {_, OpCode2} = mongooseimctl_interactive("join_cluster",
                                             [atom_to_list(Node2)], "yes\n", Config),
    %% then
    distributed_helper:verify_result(RPCSpec2, add),
    ?eq(0, OpCode1),
    ?ne(0, OpCode2).

%% This function checks that it's ok to call mongoose_cluster:join/1 twice:
%% both RPC calls made on mim2 must return ok and the nodes end up clustered.
join_twice_using_rpc(_Config) ->
    %% given
    #{node := Node1} = mim(),
    RPCSpec2 = mim2(),
    Timeout = timer:seconds(60),
    %% when
    ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
    ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
    %% then
    distributed_helper:verify_result(RPCSpec2, add),
    ok.

%% Check, that global transaction allows to run only one cluster operation at the time.
%% It should technically behave the same way as join_twice_using_rpc test (i.e. not fail).
%% Two concurrent joins are started; both must return ok.
join_twice_in_parallel_using_rpc(_Config) ->
    %% given
    #{node := Node1} = mim(),
    RPCSpec2 = mim2(),
    Timeout = timer:seconds(60),
    %% when
    Join = fun() ->
                   ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
           end,
    %% spawn_link ties each joiner to the test process, so a failed join
    %% (badmatch on the RPC result) is propagated to the test case.
    Pid1 = proc_lib:spawn_link(Join),
    Pid2 = proc_lib:spawn_link(Join),
    %% then
    %% Wait for both joins to finish before checking the cluster; checking
    %% first (as before) could observe a join that is still in progress.
    wait_for_process_to_stop(Pid1, Timeout),
    wait_for_process_to_stop(Pid2, Timeout),
    distributed_helper:verify_result(RPCSpec2, add),
    ok.

%% mim2 joins mim; then mongoose_server_api:leave_cluster/0 is called on mim
%% over RPC and the nodes must no longer be clustered. The RPC result is only
%% logged, not asserted.
leave_using_rpc(Config) ->
    %% given
    Node1 = mim(),
    Node2 = mim2(),
    add_node_to_cluster(Node2, Config),
    %% when
    Result = distributed_helper:rpc(Node1#{timeout => timer:seconds(30)},
                                    mongoose_server_api, leave_cluster, []),
    ct:pal("leave_using_rpc result ~p~n", [Result]),
    %% then
    distributed_helper:verify_result(Node2, remove),
    ok.

%% mim2 joins mim; then leave_cluster is forced twice on mim. The first run
%% must exit with 0 and the second must exit non-zero.
leave_twice(Config) ->
    Node2 = mim2(),
    add_node_to_cluster(Node2, Config),
    {_, FirstCode} = mongooseimctl_force("leave_cluster", [], "--force", Config),
    {_, SecondCode} = mongooseimctl_force("leave_cluster", [], "-f", Config),
    distributed_helper:verify_result(Node2, remove),
    ?eq(0, FirstCode),
    ?eq(true, SecondCode /= 0) =:= ok andalso ok,
    ?ne(0, SecondCode).

%% mim2 and mim3 each join mim with a forced join_cluster run on their own
%% ctl. All three nodes must end up pairwise clustered.
cluster_of_three(Config) ->
    #{node := MemberName} = Member = mim(),
    #{node := Name2} = Spec2 = mim2(),
    #{node := Name3} = Spec3 = mim3(),
    MemberArgs = [atom_to_list(MemberName)],
    {_, Code2} = mongooseimctl_force(Name2, "join_cluster", MemberArgs, "-f", Config),
    {_, Code3} = mongooseimctl_force(Name3, "join_cluster", MemberArgs, "-f", Config),
    ?eq(0, Code2),
    ?eq(0, Code3),
    nodes_clustered(Spec2, Member, true),
    nodes_clustered(Spec3, Member, true),
    nodes_clustered(Spec2, Spec3, true).

%% mim2 and mim3 join mim over RPC, then each runs an interactive,
%% confirmed leave_cluster in turn; after each leave that node must be out of
%% the cluster.
leave_the_three(Config) ->
    Timeout = timer:seconds(60),
    #{node := MemberName} = Member = mim(),
    #{node := Name2} = Spec2 = mim2(),
    #{node := Name3} = Spec3 = mim3(),
    [ok = rpc(Spec#{timeout => Timeout}, mongoose_cluster, join, [MemberName])
     || Spec <- [Spec2, Spec3]],
    {_, Code2} = mongooseimctl_interactive(Name2, "leave_cluster", [], "yes\n", Config),
    nodes_clustered(Spec2, Member, false),
    nodes_clustered(Spec3, Member, true),
    {_, Code3} = mongooseimctl_interactive(Name3, "leave_cluster", [], "yes\n", Config),
    nodes_clustered(Spec3, Member, false),
    nodes_clustered(Spec2, Spec3, false),
    ?eq(0, Code2),
    ?eq(0, Code3).

%% mim2 and mim3 join mim; mim3 is stopped and removed from the cluster with a
%% confirmed remove_from_cluster run on mim's ctl. After restarting mim3 the
%% nodes are expected to be clustered again. Currently not listed in
%% clustering_three_tests().
remove_dead_from_cluster(Config) ->
    % given
    Timeout = timer:seconds(60),
    #{node := Node1Nodename} = Node1 = mim(),
    Node2 = mim2(),
    #{node := Node3Nodename} = Node3 = mim3(),
    ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
    ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Nodename]),
    %% when
    distributed_helper:stop_node(Node3Nodename, Config),
    %% mongooseimctl_interactive/5 takes a node name (it calls atom_to_list on
    %% it), not the RPC spec map that was passed here before.
    {_, OpCode1} = mongooseimctl_interactive(Node1Nodename, "remove_from_cluster",
                                             [atom_to_list(Node3Nodename)], "yes\n", Config),
    %% then
    ?eq(0, OpCode1),
    % node is down hence its not in mnesia cluster
    have_node_in_mnesia(Node1, Node2, true),
    have_node_in_mnesia(Node1, Node3, false),
    have_node_in_mnesia(Node2, Node3, false),
    % after node awakening nodes are clustered again
    distributed_helper:start_node(Node3Nodename, Config),
    have_node_in_mnesia(Node1, Node3, true),
    have_node_in_mnesia(Node2, Node3, true).

%% mim2 and mim3 join mim; mim2 (still running) is removed with a forced
%% remove_from_cluster run on mim's ctl. mim and mim3 must stay clustered,
%% and neither of them may still list mim2 among its mnesia db_nodes.
remove_alive_from_cluster(Config) ->
    % given
    Timeout = timer:seconds(60),
    #{node := Node1Name} = Node1 = mim(),
    #{node := Node2Name} = Node2 = mim2(),
    Node3 = mim3(),
    ok = rpc(Node2#{timeout => Timeout}, mongoose_cluster, join, [Node1Name]),
    ok = rpc(Node3#{timeout => Timeout}, mongoose_cluster, join, [Node1Name]),
    %% when
    %% Node2 is still running
    {_, OpCode1} = mongooseimctl_force(Node1Name, "remove_from_cluster",
                                       [atom_to_list(Node2Name)], "-f", Config),
    %% then
    ?eq(0, OpCode1),
    % Node2 is still up, but it has been removed from the mnesia cluster
    have_node_in_mnesia(Node1, Node3, true),
    have_node_in_mnesia(Node1, Node2, false),
    have_node_in_mnesia(Node3, Node2, false).



%% Helpers

%% Runs an interactive ctl command on mim (the default node); see
%% mongooseimctl_interactive/5.
mongooseimctl_interactive(C, A, R, Config) ->
    #{node := DefaultNode} = mim(),
    mongooseimctl_interactive(DefaultNode, C, A, R, Config).
%% Runs `<ctl> Cmd Args...` for the node named Node (its ctl path is looked up
%% in Config under ctl_path_atom(Node)) and feeds Response to the command's
%% interactive prompt. Returns whatever run_interactive/2 returns.
mongooseimctl_interactive(Node, Cmd, Args, Response, Config) ->
    CtlPath = escalus_config:get_config(ctl_path_atom(Node), Config),
    CommandLine = string:join([CtlPath, Cmd | normalize_args(Args)], " "),
    run_interactive(CommandLine, Response).

%% Converts each binary argument to a string; string arguments pass through.
%% Any other argument type raises function_clause.
normalize_args(Args) ->
    [normalize_arg(Arg) || Arg <- Args].

normalize_arg(Arg) when is_binary(Arg) ->
    binary_to_list(Arg);
normalize_arg(Arg) when is_list(Arg) ->
    Arg.

%% Runs a ctl command on mim (the default node) with ForceFlag prepended to
%% Args; see mongooseimctl_force/5.
mongooseimctl_force(Command, Args, ForceFlag, Config) ->
    #{node := DefaultNode} = distributed_helper:mim(),
    mongooseimctl_force(DefaultNode, Command, Args, ForceFlag, Config).

%% Runs Cmd on Node through mongooseimctl_helper with ForceFlag as the first
%% argument, so the command does not prompt.
mongooseimctl_force(Node, Cmd, Args, ForceFlag, Config) ->
    FlaggedArgs = [ForceFlag | Args],
    mongooseimctl_helper:mongooseimctl(Node, Cmd, FlaggedArgs, Config).

%% Config key under which a node's ctl path is stored: '<NodeName>_ctl'.
ctl_path_atom(NodeName) ->
    list_to_atom(atom_to_list(NodeName) ++ "_ctl").

%% Long timeout for mnesia and ejabberd app restart
run_interactive(Cmd, Response) ->
    DefaultTimeout = timer:seconds(60),
    run_interactive(Cmd, Response, DefaultTimeout).

%% Spawns Cmd through a port, writes Response to its stdin (the answer to the
%% yes/no question) and collects the result with mongooseimctl_helper:loop/5.
run_interactive(Cmd, Response, Timeout) ->
    Port = erlang:open_port({spawn, Cmd}, [exit_status]),
    erlang:send(Port, {self(), {command, Response}}),
    mongooseimctl_helper:loop(Cmd, [], Port, [], Timeout).

%% Asserts whether the two nodes appear in each other's mnesia db_nodes
%% (ShouldBe) and that each node appears in its own db_nodes.
nodes_clustered(#{node := NameA} = SpecA, #{node := NameB} = SpecB, ShouldBe) ->
    DbNodesA = distributed_helper:rpc(SpecA, mnesia, system_info, [db_nodes]),
    DbNodesB = distributed_helper:rpc(SpecB, mnesia, system_info, [db_nodes]),
    Checks = [{NameA, DbNodesB, ShouldBe},
              {NameB, DbNodesA, ShouldBe},
              {NameA, DbNodesA, true},
              {NameB, DbNodesB, true}],
    lists:map(fun({Name, DbNodes, Expected}) ->
                      ?assertEqual(Expected, lists:member(Name, DbNodes))
              end, Checks).

%% Asserts whether the node described by the second spec is (ShouldBe) listed
%% in the mnesia db_nodes of the node described by the first spec.
have_node_in_mnesia(Spec, #{node := Expected}, ShouldBe) ->
    DbNodes = distributed_helper:rpc(Spec, mnesia, system_info, [db_nodes]),
    ?assertEqual(ShouldBe, lists:member(Expected, DbNodes)).

%% Blocks until Pid terminates and returns ok. Fails the test case with
%% wait_for_process_to_stop_timeout if Pid is still alive after Timeout ms.
%% The 'DOWN' message is matched on this call's own monitor reference, so a
%% stray 'DOWN' for the same pid cannot be consumed by mistake. On timeout the
%% monitor is removed (and any late 'DOWN' flushed) before failing.
wait_for_process_to_stop(Pid, Timeout) ->
    Ref = erlang:monitor(process, Pid),
    receive
        {'DOWN', Ref, process, Pid, _Reason} -> ok
    after Timeout ->
        erlang:demonitor(Ref, [flush]),
        ct:fail(wait_for_process_to_stop_timeout)
    end.