1: %%==============================================================================
    2: %% Copyright 2014 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(cluster_commands_SUITE).
   18: -compile([export_all, nowarn_export_all]).
   19: 
   20: -import(distributed_helper, [add_node_to_cluster/2,
   21:                              is_sm_distributed/0,
   22:                              mim/0, mim2/0, mim3/0,
   23:                              remove_node_from_cluster/2,
   24:                              require_rpc_nodes/1,
   25:                              rpc/4]).
   26: 
   27: -include_lib("eunit/include/eunit.hrl").
   28: -include_lib("common_test/include/ct.hrl").
   29: -define(eq(Expected, Actual), ?assertEqual(Expected, Actual)).
   30: -define(ne(A, B), ?assertNot(A == B)).
   31: 
   32: %%--------------------------------------------------------------------
   33: %% Suite configuration
   34: %%--------------------------------------------------------------------
   35: 
   36: all() ->
   37:     [{group, clustered},
   38:      {group, clustering_two}].
   39: 
   40: groups() ->
   41:     [{clustered, [], [one_to_one_message]},
   42:      {clustering_two, [], clustering_two_tests()}].
   43: 
   44: suite() ->
   45:     require_rpc_nodes([mim, mim2, mim3]) ++ escalus:suite().
   46: 
   47: clustering_two_tests() ->
   48:     [leave_using_rpc,
   49:      join_twice_using_rpc,
   50:      join_twice_in_parallel_using_rpc].
   51: 
   52: %%--------------------------------------------------------------------
   53: %% Init & teardown
   54: %%--------------------------------------------------------------------
   55: 
   56: init_per_suite(Config) ->
   57:     #{node := Node1} = RPCNode1 = mim(),
   58:     #{node := Node2} = RPCNode2 = mim2(),
   59:     #{node := Node3} = RPCNode3 = mim3(),
   60:     Config1 = ejabberd_node_utils:init(RPCNode1, Config),
   61:     Config2 = ejabberd_node_utils:init(RPCNode2, Config1),
   62:     Config3 = ejabberd_node_utils:init(RPCNode3, Config2),
   63:     NodeCtlPath = distributed_helper:ctl_path(Node1, Config3),
   64:     Node2CtlPath = distributed_helper:ctl_path(Node2, Config3),
   65:     Node3CtlPath = distributed_helper:ctl_path(Node3, Config3),
   66:     escalus:init_per_suite([{ctl_path_atom(Node1), NodeCtlPath},
   67:                             {ctl_path_atom(Node2), Node2CtlPath},
   68:                             {ctl_path_atom(Node3), Node3CtlPath}]
   69:                            ++ Config3).
   70: 
%% Standard escalus suite teardown; no suite-level cluster state to undo.
end_per_suite(Config) ->
    escalus:end_per_suite(Config).
   73: 
   74: init_per_group(Group, Config) when Group == clustered ->
   75:     Node2 = mim2(),
   76:     Config1 = add_node_to_cluster(Node2, Config),
   77:     case is_sm_distributed() of
   78:         true ->
   79:             escalus:create_users(Config1, escalus:get_users([alice, clusterguy]));
   80:         {false, Backend} ->
   81:             ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
   82:             Node2 = mim2(),
   83:             remove_node_from_cluster(Node2, Config1),
   84:             {skip, nondistributed_sm}
   85:     end;
   86: 
   87: init_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
   88:     case is_sm_distributed() of
   89:         true ->
   90:             ok;
   91:         {false, Backend} ->
   92:             ct:pal("Backend ~p doesn't support distributed tests", [Backend]),
   93:             {skip, nondistributed_sm}
   94:     end;
   95: 
   96: init_per_group(_GroupName, Config) ->
   97:     escalus:create_users(Config).
   98: 
   99: end_per_group(Group, Config) when Group == clustered ->
  100:     escalus:delete_users(Config, escalus:get_users([alice, clusterguy])),
  101:     Node2 = mim2(),
  102:     remove_node_from_cluster(Node2, Config);
  103: 
  104: %% Users are gone after mnesia cleaning
  105: %% hence there is no need to delete them manually
  106: end_per_group(Group, _Config) when Group == clustering_two orelse Group == clustering_three ->
  107:     ok;
  108: end_per_group(_GroupName, Config) ->
  109:     escalus:delete_users(Config).
  110: 
%% Default per-testcase setup; escalus handles connection/config details.
init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).
  113: 
  114: end_per_testcase(CaseName, Config) when CaseName == join_twice_using_rpc
  115:                                    orelse CaseName == join_twice_in_parallel_using_rpc ->
  116:     Node2 = mim2(),
  117:     remove_node_from_cluster(Node2, Config),
  118:     escalus:end_per_testcase(CaseName, Config);
  119: 
  120: end_per_testcase(CaseName, Config) ->
  121:     escalus:end_per_testcase(CaseName, Config).
  122: 
  123: %%--------------------------------------------------------------------
  124: %% Message tests
  125: %%--------------------------------------------------------------------
  126: 
  127: one_to_one_message(ConfigIn) ->
  128:     %% Given Alice connected to node one and ClusterGuy connected to node two
  129:     Metrics = [{[global, data, dist], [{recv_oct, '>'}, {send_oct, '>'}]}],
  130:     Config = [{mongoose_metrics, Metrics} | ConfigIn],
  131:     escalus:story(Config, [{alice, 1}, {clusterguy, 1}], fun(Alice, ClusterGuy) ->
  132:         %% When Alice sends a message to ClusterGuy
  133:         Msg1 = escalus_stanza:chat_to(ClusterGuy, <<"Hi!">>),
  134:         escalus:send(Alice, Msg1),
  135:         %% Then he receives it
  136:         Stanza1 = escalus:wait_for_stanza(ClusterGuy, 5000),
  137:         escalus:assert(is_chat_message, [<<"Hi!">>], Stanza1),
  138: 
  139:         %% When ClusterGuy sends a response
  140:         Msg2 = escalus_stanza:chat_to(Alice, <<"Oh hi!">>),
  141:         escalus:send(ClusterGuy, Msg2),
  142:         %% Then Alice also receives it
  143:         Stanza2 = escalus:wait_for_stanza(Alice, 5000),
  144:         escalus:assert(is_chat_message, [<<"Oh hi!">>], Stanza2)
  145:     end).
  146: 
  147: %%--------------------------------------------------------------------
  148: %% Manage cluster commands tests
  149: %%--------------------------------------------------------------------
  150: 
  151: %% This function checks that it's ok to call mongoose_cluster:join/1 twice
  152: join_twice_using_rpc(_Config) ->
  153:     %% given
  154:     #{node := Node1} = mim(),
  155:     RPCSpec2 = mim2(),
  156:     Timeout = timer:seconds(60),
  157:     %% when
  158:     ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
  159:     ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1]),
  160:     %% then
  161:     distributed_helper:verify_result(RPCSpec2, add),
  162:     ok.
  163: 
  164: %% Check, that global transaction allows to run only one cluster operation at the time.
  165: %% It should technically behave the same way as join_twice_using_rpc test (i.e. not fail).
%% Check, that global transaction allows to run only one cluster operation at the time.
%% It should technically behave the same way as join_twice_using_rpc test (i.e. not fail).
join_twice_in_parallel_using_rpc(_Config) ->
    %% given
    #{node := Node1} = mim(),
    RPCSpec2 = mim2(),
    Timeout = timer:seconds(60),
    %% when: two concurrent joins of the same node; each spawned process is
    %% linked, so a non-ok rpc result crashes this testcase via the link.
    Pid1 = proc_lib:spawn_link(fun() ->
        ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
        end),
    Pid2 = proc_lib:spawn_link(fun() ->
        ok = rpc(RPCSpec2#{timeout => Timeout}, mongoose_cluster, join, [Node1])
        end),
    %% then
    %% NOTE(review): verify_result/2 is called before the spawned joins are
    %% known to have finished — presumably it polls/waits internally; confirm
    %% against distributed_helper, otherwise the wait_for_process_to_stop
    %% calls below should come first.
    distributed_helper:verify_result(RPCSpec2, add),
    wait_for_process_to_stop(Pid1, Timeout),
    wait_for_process_to_stop(Pid2, Timeout),
    ok.
  183: 
  184: leave_using_rpc(Config) ->
  185:     %% given
  186:     Node1 = mim(),
  187:     Node2 = mim2(),
  188:     add_node_to_cluster(Node2, Config),
  189:     %% when
  190:     Result = distributed_helper:rpc(Node1#{timeout => timer:seconds(30)},
  191:                                     mongoose_server_api, leave_cluster, []),
  192:     ct:pal("leave_using_rpc result ~p~n", [Result]),
  193:     %% then
  194:     distributed_helper:verify_result(Node2, remove),
  195:     ok.
  196: 
  197: %% Helpers
  198: mongooseimctl_interactive(C, A, R, Config) ->
  199:     #{node := DefaultNode} = mim(),
  200:     mongooseimctl_interactive(DefaultNode, C, A, R, Config).
  201: mongooseimctl_interactive(Node, Cmd, Args, Response, Config) ->
  202:     CtlCmd = escalus_config:get_config(ctl_path_atom(Node), Config),
  203:     run_interactive(string:join([CtlCmd, Cmd | normalize_args(Args)], " "), Response).
  204: 
  205: normalize_args(Args) ->
  206:     lists:map(fun
  207:                   (Arg) when is_binary(Arg) ->
  208:                       binary_to_list(Arg);
  209:                   (Arg) when is_list(Arg) ->
  210:                       Arg
  211:               end, Args).
  212: 
  213: ctl_path_atom(NodeName) ->
  214:     CtlString = atom_to_list(NodeName) ++ "_ctl",
  215:     list_to_atom(CtlString).
  216: 
%% Long timeout for mnesia and ejabberd app restart.
%% Convenience wrapper around run_interactive/3 with a 60 s timeout.
run_interactive(Cmd, Response) ->
    run_interactive(Cmd, Response, timer:seconds(60)).
  220: 
  221: run_interactive(Cmd, Response, Timeout) ->
  222:     Port = erlang:open_port({spawn, Cmd}, [exit_status]),
  223:     %% respond to interactive question (yes/no)
  224:     Port ! {self(), {command, Response}},
  225:     mongooseimctl_helper:loop(Cmd, [], Port, [], Timeout).
  226: 
  227: nodes_clustered(#{node := Node1Name} = Node1, #{node := Node2Name} = Node2, ShouldBe) ->
  228:     DbNodes1 = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
  229:     DbNodes2 = distributed_helper:rpc(Node2, mnesia, system_info, [db_nodes]),
  230:     Pairs = [{Node1Name, DbNodes2, ShouldBe},
  231:         {Node2Name, DbNodes1, ShouldBe},
  232:         {Node1Name, DbNodes1, true},
  233:         {Node2Name, DbNodes2, true}],
  234:     [?assertEqual(ShouldBelong, lists:member(Element, List))
  235:         || {Element, List, ShouldBelong} <- Pairs].
  236: 
  237: have_node_in_mnesia(Node1, #{node := Node2}, ShouldBe) ->
  238:     DbNodes1 = distributed_helper:rpc(Node1, mnesia, system_info, [db_nodes]),
  239:     ?assertEqual(ShouldBe, lists:member(Node2, DbNodes1)).
  240: 
  241: wait_for_process_to_stop(Pid, Timeout) ->
  242:     erlang:monitor(process, Pid),
  243:     receive
  244:         {'DOWN', _, process, Pid, _} -> ok
  245:     after Timeout ->
  246:             ct:fail(wait_for_process_to_stop_timeout)
  247:     end.