%%==============================================================================
%% Copyright 2014 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

%% Common Test suite exercising cluster management (join/leave) across
%% three test nodes (mim, mim2, mim3) and message routing between two
%% clustered nodes.
-module(cluster_commands_SUITE).
%% export_all is conventional for CT suites: every test case and helper
%% must be callable by name from the CT framework.
-compile([export_all, nowarn_export_all]).

-import(distributed_helper, [add_node_to_cluster/2,
                             is_sm_distributed/0,
                             mim/0, mim2/0, mim3/0,
                             remove_node_from_cluster/2,
                             require_rpc_nodes/1,
                             rpc/4]).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

%% Assertion shorthands built on the EUnit macros included above.
-define(eq(Expected, Actual), ?assertEqual(Expected, Actual)).
-define(ne(A, B), ?assertNot(A == B)).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

all() ->
    [{group, clustered},
     {group, clustering_two}].

groups() ->
    [{clustered, [], [one_to_one_message]},
     {clustering_two, [], clustering_two_tests()}].

%% Requires RPC connectivity to all three nodes before the suite runs.
suite() ->
    require_rpc_nodes([mim, mim2, mim3]) ++ escalus:suite().

clustering_two_tests() ->
    [leave_using_rpc,
     join_twice_using_rpc,
     join_twice_in_parallel_using_rpc].
%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Initialise the node utilities for all three MIM nodes and stash each
%% node's mongooseimctl path in the config under the '<node>_ctl' key
%% produced by ctl_path_atom/1.
init_per_suite(Config) ->
    #{node := Node1} = RPCNode1 = mim(),
    #{node := Node2} = RPCNode2 = mim2(),
    #{node := Node3} = RPCNode3 = mim3(),
    Config1 = ejabberd_node_utils:init(RPCNode1, Config),
    Config2 = ejabberd_node_utils:init(RPCNode2, Config1),
    Config3 = ejabberd_node_utils:init(RPCNode3, Config2),
    NodeCtlPath = distributed_helper:ctl_path(Node1, Config3),
    Node2CtlPath = distributed_helper:ctl_path(Node2, Config3),
    Node3CtlPath = distributed_helper:ctl_path(Node3, Config3),
    escalus:init_per_suite([{ctl_path_atom(Node1), NodeCtlPath},
                            {ctl_path_atom(Node2), Node2CtlPath},
                            {ctl_path_atom(Node3), Node3CtlPath}]
                           ++ Config3).

end_per_suite(Config) ->
    escalus:end_per_suite(Config).

%% The clustered group needs node2 joined to the cluster and the test
%% users created up front. If the session manager backend is not
%% distributed, undo the join and skip the whole group.
%% Note: the original code re-bound `Node2 = mim2()` inside the skip
%% branch even though Node2 was already in scope; the redundant call is
%% removed here.
init_per_group(clustered, Config) ->
    Node2 = mim2(),
    Config1 = add_node_to_cluster(Node2, Config),
    case is_sm_distributed() of
        true ->
            escalus:create_users(Config1, escalus:get_users([alice, clusterguy]));
        {false, Backend} ->
            ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
            remove_node_from_cluster(Node2, Config1),
            {skip, nondistributed_sm}
    end;
%% The clustering groups manage cluster membership per test case, so
%% here we only check that the backend supports distribution.
init_per_group(Group, _Config) when Group =:= clustering_two orelse Group =:= clustering_three ->
    case is_sm_distributed() of
        true ->
            ok;
        {false, Backend} ->
            ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
            {skip, nondistributed_sm}
    end;
init_per_group(_GroupName, Config) ->
    escalus:create_users(Config).
%% Mirror of init_per_group(clustered, ...): drop the test users and
%% detach node2 from the cluster again.
end_per_group(Group, Config) when Group == clustered ->
    escalus:delete_users(Config, escalus:get_users([alice, clusterguy])),
    Node2 = mim2(),
    remove_node_from_cluster(Node2, Config);
%% Users are gone after mnesia cleaning
%% hence there is no need to delete them manually
end_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
    ok;
end_per_group(_GroupName, Config) ->
    escalus:delete_users(Config).

init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).

%% The join_twice* cases leave node2 clustered on success, so detach it
%% here to restore a clean state for the following cases.
end_per_testcase(CaseName, Config) when CaseName == join_twice_using_rpc
                                        orelse CaseName == join_twice_in_parallel_using_rpc ->
    Node2 = mim2(),
    remove_node_from_cluster(Node2, Config),
    escalus:end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config) ->
    escalus:end_per_testcase(CaseName, Config).

%%--------------------------------------------------------------------
%% Message tests
%%--------------------------------------------------------------------

%% A chat message exchanged in both directions between a user on node
%% one and a user on node two must be routed across the cluster; the
%% mongoose_metrics entry additionally asserts that the distribution
%% byte counters (recv_oct/send_oct) grew during the story.
one_to_one_message(ConfigIn) ->
    %% Given Alice connected to node one and ClusterGuy connected to node two
    Metrics = [{[global, data, dist], [{recv_oct, '>'}, {send_oct, '>'}]}],
    Config = [{mongoose_metrics, Metrics} | ConfigIn],
    escalus:story(Config, [{alice, 1}, {clusterguy, 1}], fun(Alice, ClusterGuy) ->
        %% When Alice sends a message to ClusterGuy
        Msg1 = escalus_stanza:chat_to(ClusterGuy, <<"Hi!">>),
        escalus:send(Alice, Msg1),
        %% Then he receives it
        Stanza1 = escalus:wait_for_stanza(ClusterGuy, 5000),
        escalus:assert(is_chat_message, [<<"Hi!">>], Stanza1),

        %% When ClusterGuy sends a response
        Msg2 = escalus_stanza:chat_to(Alice, <<"Oh hi!">>),
        escalus:send(ClusterGuy, Msg2),
        %% Then Alice also receives it
        Stanza2 = escalus:wait_for_stanza(Alice, 5000),
        escalus:assert(is_chat_message, [<<"Oh hi!">>], Stanza2)
    end).

%%--------------------------------------------------------------------
%% Manage cluster commands tests
%%--------------------------------------------------------------------

%% This function checks that it's ok to call mongoose_cluster:join/1 twice
join_twice_using_rpc(_Config) ->
    %% given
    #{node := Node1} = mim(),
    RPCSpec2 = mim2(),
    Timeout = timer:seconds(60),
    %% when: the second join must be idempotent, i.e. also return ok
    ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
    ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
    %% then
    distributed_helper:verify_result(RPCSpec2, add),
    ok.

%% Check, that global transaction allows to run only one cluster operation at the time.
%% It should technically behave the same way as join_twice_using_rpc test (i.e. not fail).
join_twice_in_parallel_using_rpc(_Config) ->
    %% given
    #{node := Node1} = mim(),
    RPCSpec2 = mim2(),
    Timeout = timer:seconds(60),
    %% when: two concurrent joins of the same node; spawn_link means a
    %% failing join crashes this test process too.
    Pid1 = proc_lib:spawn_link(fun() ->
        ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
    end),
    Pid2 = proc_lib:spawn_link(fun() ->
        ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
    end),
    %% then
    %% NOTE(review): verify_result/2 runs before both spawned joins are
    %% waited on — presumably it blocks or retries until the cluster
    %% state settles; confirm against distributed_helper.
    distributed_helper:verify_result(RPCSpec2, add),
    wait_for_process_to_stop(Pid1, Timeout),
    wait_for_process_to_stop(Pid2, Timeout),
    ok.

%% Join node2 to the cluster, then make node1 leave via the
%% mongoose_server_api RPC and verify node2 sees the removal.
leave_using_rpc(Config) ->
    %% given
    Node1 = mim(),
    Node2 = mim2(),
    add_node_to_cluster(Node2, Config),
    %% when
    Result = distributed_helper:rpc(Node1#{timeout => timer:seconds(30)},
                                    mongoose_server_api, leave_cluster, []),
    ct:pal("leave_using_rpc result ~p~n", [Result]),
    %% then
    distributed_helper:verify_result(Node2, remove),
    ok.
%% Helpers

%% Run a mongooseimctl command interactively on the default node.
mongooseimctl_interactive(Cmd, Args, Response, Config) ->
    #{node := DefaultNode} = mim(),
    mongooseimctl_interactive(DefaultNode, Cmd, Args, Response, Config).

%% Run a mongooseimctl command interactively on a specific node, feeding
%% Response to the command's yes/no prompt.
mongooseimctl_interactive(Node, Cmd, Args, Response, Config) ->
    CtlCmd = escalus_config:get_config(ctl_path_atom(Node), Config),
    FullCmd = string:join([CtlCmd, Cmd | normalize_args(Args)], " "),
    run_interactive(FullCmd, Response).

%% Convert each argument to a charlist so it can be joined into a shell
%% command line; charlists pass through untouched.
normalize_args(Args) ->
    [normalize_arg(Arg) || Arg <- Args].

normalize_arg(Arg) when is_binary(Arg) -> binary_to_list(Arg);
normalize_arg(Arg) when is_list(Arg) -> Arg.

%% Config key under which a node's ctl path is stored, e.g. 'mim1_ctl'.
ctl_path_atom(NodeName) ->
    list_to_atom(atom_to_list(NodeName) ++ "_ctl").

%% Long timeout for mnesia and ejabberd app restart
run_interactive(Cmd, Response) ->
    run_interactive(Cmd, Response, timer:seconds(60)).

run_interactive(Cmd, Response, Timeout) ->
    ShellPort = erlang:open_port({spawn, Cmd}, [exit_status]),
    %% respond to interactive question (yes/no)
    ShellPort ! {self(), {command, Response}},
    mongooseimctl_helper:loop(Cmd, [], ShellPort, [], Timeout).

%% Assert cluster membership symmetry: each node must (or must not,
%% per ShouldBe) appear in the other's mnesia db_nodes, and each node
%% always appears in its own list.
nodes_clustered(#{node := Node1Name} = Node1, #{node := Node2Name} = Node2, ShouldBe) ->
    MnesiaNodesA = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
    MnesiaNodesB = distributed_helper:rpc(Node2, mnesia, system_info, [db_nodes]),
    Expectations = [{Node1Name, MnesiaNodesB, ShouldBe},
                    {Node2Name, MnesiaNodesA, ShouldBe},
                    {Node1Name, MnesiaNodesA, true},
                    {Node2Name, MnesiaNodesB, true}],
    [?assertEqual(ShouldBelong, lists:member(Member, NodeList))
     || {Member, NodeList, ShouldBelong} <- Expectations].

%% Assert whether Node2 is (not) listed among Node1's mnesia db_nodes.
have_node_in_mnesia(Node1, #{node := Node2}, ShouldBe) ->
    MnesiaNodes = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
    ?assertEqual(ShouldBe, lists:member(Node2, MnesiaNodes)).
%% Block until Pid terminates, failing the test case after Timeout ms.
%% Fix vs original: the monitor reference was discarded and the receive
%% matched any 'DOWN' for Pid (wildcard ref), so a stale 'DOWN' from an
%% earlier monitor on the same Pid could satisfy the wait. Bind the ref
%% and match on it; on timeout, demonitor with flush so no orphaned
%% 'DOWN' message is left in the mailbox.
wait_for_process_to_stop(Pid, Timeout) ->
    MRef = erlang:monitor(process, Pid),
    receive
        {'DOWN', MRef, process, Pid, _Reason} -> ok
    after Timeout ->
        erlang:demonitor(MRef, [flush]),
        ct:fail(wait_for_process_to_stop_timeout)
    end.