1: %%==============================================================================
    2: %% Copyright 2017 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mod_global_distrib_SUITE).
   18: 
   19: -compile([export_all, nowarn_export_all]).
   20: 
   21: -include_lib("escalus/include/escalus.hrl").
   22: -include_lib("common_test/include/ct.hrl").
   23: -include_lib("eunit/include/eunit.hrl").
   24: -include_lib("escalus/include/escalus_xmlns.hrl").
   25: -include_lib("exml/include/exml.hrl").
   26: 
   27: -define(HOSTS_REFRESH_INTERVAL, 200). %% in ms
   28: 
   29: -import(domain_helper, [domain/0]).
   30: 
   31: %%--------------------------------------------------------------------
   32: %% Suite configuration
   33: %%--------------------------------------------------------------------
   34: 
   35: all() ->
   36:     [
   37:      {group, mod_global_distrib},
   38:      {group, cluster_restart},
   39:      {group, start_checks},
   40:      {group, invalidation},
   41:      {group, multi_connection},
   42:      {group, rebalancing},
   43:      {group, advertised_endpoints},
   44:      {group, hosts_refresher}
   45:     ].
   46: 
   47: groups() ->
   48:     G = [{mod_global_distrib, [],
   49:           [
   50:            test_pm_between_users_at_different_locations,
   51:            test_pm_between_users_before_available_presence,
   52:            test_component_disconnect,
   53:            test_component_on_one_host,
   54:            test_components_in_different_regions,
   55:            test_hidden_component_disco_in_different_region,
   56:            test_pm_with_disconnection_on_other_server,
   57:            test_pm_with_graceful_reconnection_to_different_server,
   58:            test_pm_with_ungraceful_reconnection_to_different_server,
   59:            test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first,
   60:            test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first,
   61:            test_component_unregister,
   62:            test_update_senders_host,
   63:            test_update_senders_host_by_ejd_service,
   64: 
   65:            %% with node 2 disabled
   66:            test_muc_conversation_on_one_host,
   67:            test_global_disco
   68:            %% TODO: Add test case fo global_distrib_addr option
   69:           ]},
   70:          {hosts_refresher, [],
   71:           [test_host_refreshing]},
   72:          {cluster_restart, [],
   73:           [
   74:            test_location_disconnect
   75:           ]},
   76:          {start_checks, [],
   77:           [
   78:            test_error_on_wrong_hosts
   79:           ]},
   80:          {invalidation, [],
   81:           [
   82:            % TODO: Add checks for other mapping refreshes
   83:            refresh_nodes
   84:           ]},
   85:          {multi_connection, [],
   86:           [
   87:            test_in_order_messages_on_multiple_connections,
   88:            test_in_order_messages_on_multiple_connections_with_bounce,
   89:            test_messages_bounced_in_order,
   90: 
   91:            %% with node 2 disabled
   92:            test_muc_conversation_history
   93:           ]},
   94:          {rebalancing, [],
   95:           [
   96:            enable_new_endpoint_on_refresh,
   97:            disable_endpoint_on_refresh,
   98:            wait_for_connection,
   99:            closed_connection_is_removed_from_disabled
  100:           ]},
  101:          {advertised_endpoints, [],
  102:           [
  103:            test_advertised_endpoints_override_endpoints,
  104:            test_pm_between_users_at_different_locations
  105:           ]}
  106:         ],
  107:     ct_helper:repeat_all_until_all_ok(G).
  108: 
  109: suite() ->
  110:     [{require, europe_node1, {hosts, mim, node}},
  111:      {require, europe_node2, {hosts, mim2, node}},
  112:      {require, asia_node, {hosts, reg, node}},
  113:      {require, c2s_port, {hosts, mim, c2s_port}} |
  114:      escalus:suite()].
  115: 
  116: %%--------------------------------------------------------------------
  117: %% Init & teardown
  118: %%--------------------------------------------------------------------
  119: 
  120: init_per_suite(Config) ->
  121:     case {rpc(europe_node1, mongoose_wpool, get_worker, [redis, global, global_distrib]),
  122:           rpc(asia_node, mongoose_wpool, get_worker, [redis, global, global_distrib])} of
  123:         {{ok, _}, {ok, _}} ->
  124:             ok = rpc(europe_node2, mongoose_cluster, join, [ct:get_config(europe_node1)]),
  125: 
  126:             enable_logging(),
  127:             % We have to pass [no_opts] because [] is treated as string and converted
  128:             % automatically to <<>>
  129:             escalus:init_per_suite([{add_advertised_endpoints, []},
  130:                                     {extra_config, []}, {redis_extra_config, [no_opts]} | Config]);
  131:         Result ->
  132:             ct:pal("Redis check result: ~p", [Result]),
  133:             {skip, "GD Redis default pool not available"}
  134:     end.
  135: 
  136: end_per_suite(Config) ->
  137:     disable_logging(),
  138:     escalus_fresh:clean(),
  139:     rpc(europe_node2, mongoose_cluster, leave, []),
  140:     escalus:end_per_suite(Config).
  141: 
  142: init_per_group(start_checks, Config) ->
  143:     Config;
  144: init_per_group(multi_connection, Config) ->
  145:     ExtraConfig = [{resend_after_ms, 20000},
  146:                    %% Disable unused feature to avoid interferance
  147:                    {disabled_gc_interval, 10000},
  148:                    {connections_per_endpoint, 100}],
  149:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  150: init_per_group(invalidation, Config) ->
  151:     Config1 = init_per_group(invalidation_generic, Config),
  152:     NodeBin = <<"fake_node@localhost">>,
  153:     [{node_to_expire, NodeBin} | Config1];
  154: init_per_group(rebalancing, Config) ->
  155:     %% We need to prevent automatic refreshes, because they may interfere with tests
  156:     %% and we need early disabled garbage collection to check its validity
  157:     ExtraConfig = [{endpoint_refresh_interval, 3600},
  158:                    {endpoint_refresh_interval_when_empty, 3600},
  159:                    {disabled_gc_interval, 1}],
  160:     RedisExtraConfig = [{refresh_after, 3600}],
  161:     init_per_group_generic([{extra_config, ExtraConfig},
  162:                             {redis_extra_config, RedisExtraConfig} | Config]);
  163: init_per_group(advertised_endpoints, Config) ->
  164:     lists:foreach(fun({NodeName, _, _}) ->
  165:                           Node = ct:get_config(NodeName),
  166:                           mongoose_helper:inject_module(#{node => Node}, ?MODULE, reload)
  167:                   end, get_hosts()),
  168:     mock_inet_on_each_node(),
  169:     init_per_group_generic(
  170:                [{add_advertised_endpoints,
  171:                  [{asia_node, advertised_endpoints()}]} | Config]);
  172: init_per_group(mod_global_distrib, Config) ->
  173:     %% Disable mod_global_distrib_mapping_redis refresher
  174:     RedisExtraConfig = [{refresh_after, 3600}],
  175:     init_per_group_generic([{redis_extra_config, RedisExtraConfig} | Config]);
  176: init_per_group(_, Config) ->
  177:     init_per_group_generic(Config).
  178: 
  179: init_per_group_generic(Config0) ->
  180:     Config2 =
  181:         lists:foldl(
  182:           fun({NodeName, LocalHost, ReceiverPort}, Config1) ->
  183:                   Opts0 = (?config(extra_config, Config1) ++
  184:                            [{local_host, LocalHost},
  185:                             {hosts_refresh_interval, ?HOSTS_REFRESH_INTERVAL},
  186:                             {global_host, "localhost"},
  187:                             {endpoints, [listen_endpoint(ReceiverPort)]},
  188:                             {tls_opts, [
  189:                                         {certfile, "priv/ssl/fake_server.pem"},
  190:                                         {cafile, "priv/ssl/ca/cacert.pem"}
  191:                                        ]},
  192:                             {redis, ?config(redis_extra_config, Config1)},
  193:                             {resend_after_ms, 500}]),
  194:                   Opts = maybe_add_advertised_endpoints(NodeName, Opts0, Config1),
  195: 
  196:                   VirtHosts = virtual_hosts(NodeName),
  197:                   Node = node_spec(NodeName),
  198:                   Config2 = dynamic_modules:save_modules(Node, VirtHosts, Config1),
  199: 
  200:                   %% To reduce load when sending many messages
  201:                   ModulesToStop = [mod_offline, mod_blocking, mod_privacy, mod_roster, mod_last,
  202:                                    mod_stream_management],
  203:                   [dynamic_modules:ensure_stopped(Node, VirtHost, ModulesToStop) ||
  204:                       VirtHost <- VirtHosts],
  205: 
  206:                   dynamic_modules:ensure_modules(Node, domain(),
  207:                                                  [{mod_global_distrib, Opts},
  208:                                                   {mod_stream_management, [{resume_timeout, 1}]}]),
  209:                   Config2
  210:           end,
  211:           Config0,
  212:           get_hosts()),
  213: 
  214:     wait_for_listeners_to_appear(),
  215: 
  216:     {SomeNode, _, _} = hd(get_hosts()),
  217:     NodesKey = rpc(SomeNode, mod_global_distrib_mapping_redis, nodes_key, []),
  218:     [{nodes_key, NodesKey}, {escalus_user_db, xmpp} | Config2].
  219: 
  220: end_per_group(advertised_endpoints, Config) ->
  221:     Pids = ?config(meck_handlers, Config),
  222:     unmock_inet(Pids),
  223:     escalus_fresh:clean(),
  224:     end_per_group_generic(Config);
  225: end_per_group(start_checks, Config) ->
  226:     escalus_fresh:clean(),
  227:     Config;
  228: end_per_group(invalidation, Config) ->
  229:     redis_query(europe_node1, [<<"HDEL">>, ?config(nodes_key, Config),
  230:                             ?config(node_to_expire, Config)]),
  231:     end_per_group_generic(Config);
  232: end_per_group(_, Config) ->
  233:     end_per_group_generic(Config).
  234: 
  235: end_per_group_generic(Config) ->
  236:     dynamic_modules:restore_modules(#{timeout => timer:seconds(30)},  Config).
  237: 
  238: init_per_testcase(CaseName, Config)
  239:   when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
  240:        CaseName == test_muc_conversation_history ->
  241:     %% There is no helper to load MUC on node2
  242:     %% For now it's easier to hide node2
  243:     %% TODO: Do it right at some point!
  244:     hide_node(europe_node2, Config),
  245:     %% There would be no new connections to europe_node2, but there can be some old ones.
  246:     %% We need to disconnect previous connections.
  247:     {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  248:     trigger_rebalance(asia_node, list_to_binary(EuropeHost)),
  249:     %% Load muc on mim node
  250:     muc_helper:load_muc(),
  251:     RegNode = ct:get_config({hosts, reg, node}),
  252:     %% Wait for muc.localhost to become visible from reg node
  253:     wait_for_domain(RegNode, muc_helper:muc_host()),
  254:     escalus:init_per_testcase(CaseName, Config);
  255: init_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
  256:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server;
  257:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
  258:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
  259:     escalus:init_per_testcase(CN, init_user_eve(Config));
  260: init_per_testcase(CaseName, Config) ->
  261:     escalus:init_per_testcase(CaseName, Config).
  262: 
  263: init_user_eve(Config) ->
  264:     %% Register Eve in reg cluster
  265:     EveSpec = escalus_fresh:create_fresh_user(Config, eve),
  266:     MimPort = ct:get_config({hosts, mim, c2s_port}),
  267:     EveSpec2 = lists:keystore(port, 1, EveSpec, {port, MimPort}),
  268:     %% Register Eve in mim cluster
  269:     escalus:create_users(Config, [{eve, EveSpec2}]),
  270:     [{evespec_reg, EveSpec}, {evespec_mim, EveSpec2} | Config].
  271: 
  272: end_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
  273:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server;
  274:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
  275:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
  276:     MimEveSpec = ?config(evespec_mim, Config),
  277:     %% Clean Eve from reg cluster
  278:     escalus_fresh:clean(),
  279:     %% Clean Eve from mim cluster
  280:     %% For shared databases (i.e. mysql, pgsql...),
  281:     %% removing from one cluster would remove from all clusters.
  282:     %% For mnesia auth backend we need to call removal from each cluster.
  283:     %% That's why there is a catch here.
  284:     catch escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}]),
  285:     generic_end_per_testcase(CN, Config);
  286: end_per_testcase(CaseName, Config)
  287:   when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
  288:        CaseName == test_muc_conversation_history ->
  289:     refresh_mappings(europe_node2, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
  290:     muc_helper:unload_muc(),
  291:     generic_end_per_testcase(CaseName, Config);
  292: end_per_testcase(test_update_senders_host_by_ejd_service = CN, Config) ->
  293:     refresh_mappings(europe_node1, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
  294:     generic_end_per_testcase(CN, Config);
  295: end_per_testcase(CN, Config) when CN == enable_new_endpoint_on_refresh;
  296:                                   CN == disable_endpoint_on_refresh;
  297:                                   CN == wait_for_connection;
  298:                                   CN == closed_connection_is_removed_from_disabled ->
  299:     restart_receiver(asia_node),
  300:     refresh_mappings(asia_node, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
  301:     generic_end_per_testcase(CN, Config);
  302: end_per_testcase(CaseName, Config) ->
  303:     generic_end_per_testcase(CaseName, Config).
  304: 
  305: generic_end_per_testcase(CaseName, Config) ->
  306:     lists:foreach(
  307:       fun({NodeName, _, _}) ->
  308:               %% TODO: Enable refresher only for specific test cases,
  309:               %% as some of them are based on assumption that node(s)
  310:               %% must open new connections during tests.
  311:               pause_refresher(NodeName, CaseName),
  312:               Node = ct:get_config(NodeName),
  313:               SupRef = {mod_global_distrib_outgoing_conns_sup, Node},
  314:               try
  315:                   OutgoingConns = supervisor:which_children(SupRef),
  316:                   lists:foreach(fun ({mod_global_distrib_hosts_refresher, _, _, _}) ->
  317:                                         skip;
  318:                                     ({Id, _, _, _}) ->
  319:                                         supervisor:terminate_child(SupRef, Id)
  320:                                 end, OutgoingConns),
  321:                   [{mod_global_distrib_hosts_refresher, _, worker, _Modules}] =
  322:                     supervisor:which_children(SupRef)
  323:               catch
  324:                   _:{noproc, _} ->
  325:                       ct:pal("Sender supervisor not found in ~p", [NodeName])
  326:               end,
  327:               unpause_refresher(NodeName, CaseName)
  328:       end,
  329:       get_hosts()),
  330:     escalus:end_per_testcase(CaseName, Config).
  331: 
  332: virtual_hosts(asia_node) ->
  333:     [domain()];
  334: virtual_hosts(_) ->
  335:     [domain(), secondary_domain()].
  336: 
  337: secondary_domain() ->
  338:     ct:get_config({hosts, mim, secondary_domain}).
  339: 
  340: %% Refresher is not started at all or stopped for some test cases
  341: -spec pause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
  342: pause_refresher(_, test_error_on_wrong_hosts) ->
  343:     ok;
  344: pause_refresher(asia_node, test_location_disconnect) ->
  345:     ok;
  346: pause_refresher(NodeName, _) ->
  347:     ok = rpc(NodeName, mod_global_distrib_hosts_refresher, pause, []).
  348: 
  349: -spec unpause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
  350: unpause_refresher(_, test_error_on_wrong_hosts) ->
  351:     ok;
  352: unpause_refresher(asia_node, test_location_disconnect) ->
  353:     ok;
  354: unpause_refresher(NodeName, _) ->
  355:     ok = rpc(NodeName, mod_global_distrib_hosts_refresher, unpause, []).
  356: 
  357: %%--------------------------------------------------------------------
  358: %% Service discovery test
  359: %%--------------------------------------------------------------------
  360: 
  361: %% Requires module mod_global_distrib to be started with argument advertised_endpoints
  362: %% for each host in get_hosts().
  363: %% Reads Redis to confirm that endpoints (in Redis) are overwritten
  364: %% with `advertised_endpoints` option value
  365: test_advertised_endpoints_override_endpoints(_Config) ->
  366:     Endps = execute_on_each_node(mod_global_distrib_mapping_redis,
  367:                                  get_endpoints,
  368:                                  [<<"reg1">>]),
  369:     true = lists:all(fun({ok, E}) ->
  370:                              lists:sort(iptuples_to_string(E)) =:=
  371:                                  lists:sort(advertised_endpoints()) end, Endps).
  372: 
  373: %% @doc Verifies that hosts refresher will restart the outgoing connection pool if
  374: %% it goes down for some reason (crash or domain unavailability).
  375: %% Also actually verifies that refresher properly reads host list
  376: %% from backend and starts appropriate pool.
  377: test_host_refreshing(_Config) ->
  378:     mongoose_helper:wait_until(fun() -> trees_for_connections_present() end, true,
  379:                                #{name => trees_for_connections_present}),
  380:     ConnectionSups = out_connection_sups(asia_node),
  381:     {europe_node1, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  382:     EuropeSup = rpc(asia_node, mod_global_distrib_utils, server_to_sup_name, [list_to_binary(EuropeHost)]),
  383:     {_, EuropePid, supervisor, _} = lists:keyfind(EuropeSup, 1, ConnectionSups),
  384:     erlang:exit(EuropePid, kill), % it's ok to kill temporary process
  385:     mongoose_helper:wait_until(fun() -> tree_for_sup_present(asia_node, EuropeSup) end, true,
  386:                                #{name => tree_for_sup_present}).
  387: 
  388: %% When run in mod_global_distrib group - tests simple case of connection
  389: %% between two users connected to different clusters.
  390: %% When run in advertised_endpoints group it tests whether it is possible
  391: %% to connect to a node that is advertising itself with a domain name.
  392: test_pm_between_users_at_different_locations(Config) ->
  393:     escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], fun test_two_way_pm/2).
  394: 
  395: test_pm_between_users_before_available_presence(Config) ->
  396:     Config1 = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
  397:     {ok, Alice} = escalus_client:start(Config1, alice, <<"res1">>),
  398:     {ok, Eve} = escalus_client:start(Config1, eve, <<"res1">>),
  399: 
  400:     test_two_way_pm(Alice, Eve),
  401: 
  402:     escalus_client:stop(Config1, Alice),
  403:     escalus_client:stop(Config1, Eve).
  404: 
  405: test_two_way_pm(Alice, Eve) ->
  406:     %% Ensure that users are properly registered
  407:     %% Otherwise you can get "Unable to route global message... user not found in the routing table"
  408:     %% error, because "escalus_client:start" can return before SM registration is completed.
  409:     wait_for_registration(Alice, ct:get_config({hosts, mim, node})),
  410:     wait_for_registration(Eve, ct:get_config({hosts, reg, node})),
  411: 
  412:     escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi to Eve from Europe1!">>)),
  413:     escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi to Alice from Asia!">>)),
  414: 
  415:     FromAlice = escalus_client:wait_for_stanza(Eve, timer:seconds(15)),
  416:     FromEve = escalus_client:wait_for_stanza(Alice, timer:seconds(15)),
  417: 
  418:     AliceJid = escalus_client:full_jid(Alice),
  419:     EveJid = escalus_client:full_jid(Eve),
  420: 
  421:     escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, <<"Hi to Eve from Europe1!">>],
  422:                    FromAlice),
  423:     escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, <<"Hi to Alice from Asia!">>],
  424:                    FromEve).
  425: 
  426: test_muc_conversation_on_one_host(Config0) ->
  427:     AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
  428:     Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
  429:     escalus:fresh_story(
  430:       Config, [{eve, 1}],
  431:       fun(Eve) ->
  432:               Alice = connect_from_spec(AliceSpec, Config),
  433: 
  434:               RoomJid = ?config(room, Config),
  435:               AliceUsername = escalus_utils:get_username(Alice),
  436:               EveUsername = escalus_utils:get_username(Eve),
  437:               RoomAddr = muc_helper:room_address(RoomJid),
  438: 
  439:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
  440:               escalus:wait_for_stanza(Alice),
  441: 
  442:               escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
  443:               [_, _, _] = escalus:wait_for_stanzas(Eve, 3),
  444: 
  445:               Msg= <<"Hi, Eve!">>,
  446:               escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, Msg)),
  447:               escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Alice)),
  448:               escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Eve)),
  449: 
  450:               Msg2= <<"Hi, Alice!">>,
  451:               escalus:send(Eve, escalus_stanza:groupchat_to(RoomAddr, Msg2)),
  452:               escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Eve)),
  453:               escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Alice))
  454:       end),
  455:     muc_helper:destroy_room(Config).
  456: 
  457: test_muc_conversation_history(Config0) ->
  458:     AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
  459:     Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
  460:     escalus:fresh_story(
  461:       Config, [{eve, 1}],
  462:       fun(Eve) ->
  463:               Alice = connect_from_spec(AliceSpec, Config),
  464: 
  465:               RoomJid = ?config(room, Config),
  466:               AliceUsername = escalus_utils:get_username(Alice),
  467:               RoomAddr = muc_helper:room_address(RoomJid),
  468: 
  469:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
  470:               %% We don't care about presences from Alice, escalus would filter them out
  471:               wait_for_subject(Alice),
  472: 
  473:               send_n_muc_messages(Alice, RoomAddr, 3),
  474: 
  475:               %% Ensure that the messages are received by the room
  476:               %% before trying to login Eve.
  477:               %% Otherwise, Eve would receive some messages from history and
  478:               %% some as regular groupchat messages.
  479:               receive_n_muc_messages(Alice, 3),
  480: 
  481:               EveUsername = escalus_utils:get_username(Eve),
  482:               escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
  483: 
  484:               wait_for_muc_presence(Eve, RoomJid, AliceUsername),
  485:               wait_for_muc_presence(Eve, RoomJid, EveUsername),
  486: 
  487:               %% XEP-0045: After sending the presence broadcast (and only after doing so),
  488:               %% the service MAY then send discussion history, the room subject,
  489:               %% live messages, presence updates, and other in-room traffic.
  490:               receive_n_muc_messages(Eve, 3),
  491:               wait_for_subject(Eve)
  492:       end),
  493:     muc_helper:destroy_room(Config).
  494: 
  495: wait_for_muc_presence(User, RoomJid, FromNickname) ->
  496:     Presence = escalus:wait_for_stanza(User),
  497:     escalus:assert(is_presence, Presence),
  498:     escalus:assert(is_stanza_from, [muc_helper:room_address(RoomJid, FromNickname)], Presence),
  499:     ok.
  500: 
  501: wait_for_subject(User) ->
  502:     Subject = escalus:wait_for_stanza(User),
  503:     escalus:assert(is_groupchat_message, Subject),
  504:     ?assertNotEqual(undefined, exml_query:subelement(Subject, <<"subject">>)),
  505:     ok.
  506: 
  507: send_n_muc_messages(User, RoomAddr, N) ->
  508:     lists:foreach(fun(I) ->
  509:                           Msg = <<"test-", (integer_to_binary(I))/binary>>,
  510:                           escalus:send(User, escalus_stanza:groupchat_to(RoomAddr, Msg))
  511:                   end, lists:seq(1, N)).
  512: 
  513: receive_n_muc_messages(User, N) ->
  514:     lists:foreach(fun(J) ->
  515:                           Msg = <<"test-", (integer_to_binary(J))/binary>>,
  516:                           Stanza = escalus:wait_for_stanza(User),
  517:                           escalus:assert(is_groupchat_message, [Msg], Stanza)
  518:                             end, lists:seq(1, N)).
  519: 
  520: test_component_on_one_host(Config) ->
  521:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  522:                        {port, service_port()}, {component, <<"test_service">>}],
  523: 
  524:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  525: 
  526:     Story = fun(User) ->
  527:                     Msg1 = escalus_stanza:chat_to(Addr, <<"Hi2!">>),
  528:                     escalus:send(User, Msg1),
  529:                     %% Then component receives it
  530:                     Reply1 = escalus:wait_for_stanza(Comp),
  531:                     escalus:assert(is_chat_message, [<<"Hi2!">>], Reply1),
  532: 
  533:                     %% When components sends a reply
  534:                     Msg2 = escalus_stanza:chat_to(User, <<"Oh hi!">>),
  535:                     escalus:send(Comp, escalus_stanza:from(Msg2, Addr)),
  536: 
  537:                     %% Then Alice receives it
  538:                     Reply2 = escalus:wait_for_stanza(User),
  539:                     escalus:assert(is_chat_message, [<<"Oh hi!">>], Reply2),
  540:                     escalus:assert(is_stanza_from, [Addr], Reply2)
  541:             end,
  542: 
  543:     [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
  544: 
  545: %% Ensures that 2 components in distinct data centers can communicate.
  546: test_components_in_different_regions(_Config) ->
  547:     ComponentCommonConfig = [{host, <<"localhost">>}, {password, <<"secret">>},
  548:                              {server, <<"localhost">>}, {component, <<"test_service">>}],
  549:     Comp1Port = ct:get_config({hosts, mim, service_port}),
  550:     Comp2Port = ct:get_config({hosts, reg, service_port}),
  551:     Component1Config = [{port, Comp1Port}, {component, <<"service1">>} | ComponentCommonConfig],
  552:     Component2Config = [{port, Comp2Port}, {component, <<"service2">>} | ComponentCommonConfig],
  553: 
  554:     {Comp1, Addr1, _Name1} = component_helper:connect_component(Component1Config),
  555:     {Comp2, Addr2, _Name2} = component_helper:connect_component(Component2Config),
  556: 
  557:     Msg1 = escalus_stanza:from(escalus_stanza:chat_to(Addr2, <<"Hi from 1!">>), Addr1),
  558:     escalus:send(Comp1, Msg1),
  559:     GotMsg1 = escalus:wait_for_stanza(Comp2),
  560:     escalus:assert(is_chat_message, [<<"Hi from 1!">>], GotMsg1),
  561: 
  562:     Msg2 = escalus_stanza:from(escalus_stanza:chat_to(Addr1, <<"Hi from 2!">>), Addr2),
  563:     escalus:send(Comp2, Msg2),
  564:     GotMsg2 = escalus:wait_for_stanza(Comp1),
  565:     escalus:assert(is_chat_message, [<<"Hi from 2!">>], GotMsg2).
  566: 
  567: %% Ordinary user is not able to discover the hidden component from GD
  568: test_hidden_component_disco_in_different_region(Config) ->
  569:     %% Hidden component from component_SUITE connects to mim1/europe_node1
  570:     HiddenComponentConfig = component_helper:spec(hidden_component, Config),
  571:     {_HiddenComp, HiddenAddr, _} = component_helper:connect_component(HiddenComponentConfig),
  572: 
  573:     escalus:fresh_story(
  574:       Config, [{eve, 1}],
  575:       fun(Eve) ->
  576:               EveServer = escalus_client:server(Eve),
  577:               escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
  578:               DiscoReply = escalus:wait_for_stanza(Eve),
  579:               escalus:assert(is_iq_result, DiscoReply),
  580:               escalus:assert(fun(Stanza) ->
  581:                                      not escalus_pred:has_service(HiddenAddr, Stanza)
  582:                              end, DiscoReply)
  583:       end).
  584: 
  585: test_component_disconnect(Config) ->
  586:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  587:                        {port, service_port()}, {component, <<"test_service">>}],
  588: 
  589:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  590:     component_helper:disconnect_component(Comp, Addr),
  591: 
  592:     Story = fun(User) ->
  593:                     escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
  594:                     Error = escalus:wait_for_stanza(User, 5000),
  595:                     escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
  596:             end,
  597: 
  598:     [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
  599: 
  600: test_location_disconnect(Config) ->
  601:     try
  602:         escalus:fresh_story(
  603:           Config, [{alice, 1}, {eve, 1}],
  604:           fun(Alice, Eve) ->
  605:                   escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
  606: 
  607:                   escalus_client:wait_for_stanza(Eve),
  608: 
  609:                   ok = rpc(asia_node, application, stop, [mongooseim]),
  610:                   %% TODO: Stopping mongooseim alone should probably stop connections too
  611:                   ok = rpc(asia_node, application, stop, [ranch]),
  612: 
  613:                   escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
  614:                   Error = escalus:wait_for_stanza(Alice),
  615:                   escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
  616:           end)
  617:     after
  618:         rpc(asia_node, application, start, [ranch]),
  619:         rpc(asia_node, application, start, [mongooseim])
  620:     end.
  621: 
  622: test_pm_with_disconnection_on_other_server(Config) ->
  623:     escalus:fresh_story(
  624:       Config, [{alice, 1}, {eve, 1}],
  625:       fun(Alice, Eve) ->
  626:               escalus_connection:stop(Eve),
  627:               escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
  628:               FromAliceBounce = escalus_client:wait_for_stanza(Alice, 15000),
  629:               escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], FromAliceBounce)
  630:       end).
  631: 
  632: test_pm_with_graceful_reconnection_to_different_server(Config) ->
  633:     EveSpec = ?config(evespec_reg, Config),
  634:     EveSpec2 = ?config(evespec_mim, Config),
  635:     escalus:fresh_story(
  636:       Config, [{alice, 1}],
  637:       fun(Alice) ->
  638:               Eve = connect_from_spec(EveSpec, Config),
  639: 
  640:               escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi from Asia!">>)),
  641: 
  642:               %% Stop connection and wait for process to die
  643:               EveNode = ct:get_config({hosts, reg, node}),
  644:               mongoose_helper:logout_user(Config, Eve, #{node => EveNode}),
  645: 
  646:               FromEve = escalus_client:wait_for_stanza(Alice),
  647: 
  648:               %% Pause Alice until Eve is reconnected
  649:               AliceNode = ct:get_config({hosts, mim, node}),
  650:               C2sPid = mongoose_helper:get_session_pid(Alice, #{node => AliceNode}),
  651:               ok = rpc(asia_node, sys, suspend, [C2sPid]),
  652: 
  653:               escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi from Europe1!">>)),
  654: 
  655:               NewEve = connect_from_spec(EveSpec2, Config),
  656:               EveNode2 = ct:get_config({hosts, mim, node}),
  657:               wait_for_registration(NewEve, EveNode2),
  658: 
  659:               ok = rpc(asia_node, sys, resume, [C2sPid]),
  660: 
  661: 
  662:               escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi again from Europe1!">>)),
  663:               escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi again from Asia!">>)),
  664: 
  665:               FirstFromAlice = escalus_client:wait_for_stanza(NewEve),
  666:               AgainFromEve = escalus_client:wait_for_stanza(Alice),
  667:               SecondFromAlice = escalus_client:wait_for_stanza(NewEve),
  668: 
  669:               [FromAlice, AgainFromAlice] = order_by_seqnum([FirstFromAlice, SecondFromAlice]),
  670: 
  671:               escalus:assert(is_chat_message, [<<"Hi from Europe1!">>], FromAlice),
  672:               escalus:assert(is_chat_message, [<<"Hi from Asia!">>], FromEve),
  673:               escalus:assert(is_chat_message, [<<"Hi again from Europe1!">>], AgainFromAlice),
  674:               escalus:assert(is_chat_message, [<<"Hi again from Asia!">>], AgainFromEve)
  675:           end).
  676: 
  677: %% Refresh logic can cause two possible behaviours.
  678: %% We test both behaviours here (plus no refresh case)
  679: %% See PR #2392
  680: test_pm_with_ungraceful_reconnection_to_different_server(Config) ->
  681:     %% No refresh
  682:     BeforeResume = fun() -> ok end,
  683:     AfterCheck = fun(Alice, NewEve) ->
  684:             user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
  685:             user_receives(Alice, [<<"Hi from Europe!">>])
  686:          end,
  687:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  688: 
  689: test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first(Config) ->
  690:     %% Same as no refresh
  691:     RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first",
  692:     % Order of nodes is important here in refresh_hosts!
  693:     BeforeResume = fun() -> refresh_hosts([asia_node, europe_node1], RefreshReason) end,
  694:     AfterCheck = fun(Alice, NewEve) ->
  695:             user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
  696:             user_receives(Alice, [<<"Hi from Europe!">>])
  697:          end,
  698:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  699: 
  700: test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first(Config) ->
  701:     %% Asia node overrides Europe value with the older ones,
  702:     %% so we loose some messages during rerouting :(
  703:     RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first",
  704:     BeforeResume = fun() -> refresh_hosts([europe_node1, asia_node], RefreshReason) end,
  705:     AfterCheck = fun(Alice, NewEve) ->
  706:             user_receives(NewEve, [<<"Hi again from Europe1!">>]),
  707:             user_receives(Alice, [<<"Hi from Europe!">>])
  708:          end,
  709:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  710: 
  711: %% Reconnect Eve from asia (reg cluster) to europe (mim)
  712: do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResume, AfterCheck) ->
  713:     Config = escalus_users:update_userspec(Config0, eve, stream_management, true),
  714:     EveSpec = ?config(evespec_reg, Config),
  715:     EveSpec2 = ?config(evespec_mim, Config),
  716:     escalus:fresh_story(
  717:       Config, [{alice, 1}],
  718:       fun(Alice) ->
  719:               {ok, Eve, _} = escalus_connection:start(EveSpec, connect_steps_with_sm()),
  720:               escalus_story:send_initial_presence(Eve),
  721:               escalus_client:wait_for_stanza(Eve),
  722: 
  723:               %% Stop connection and wait for process to die
  724:               EveNode = ct:get_config({hosts, reg, node}),
  725:               C2sPid = mongoose_helper:get_session_pid(Eve, #{node => EveNode}),
  726:               ok = rpc(asia_node, sys, suspend, [C2sPid]),
  727: 
  728:               escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi from Europe1!">>)),
  729: 
  730:               %% Wait for route message to be queued in c2s message queue
  731:               mongoose_helper:wait_for_route_message_count(C2sPid, 1),
  732: 
  733:               %% Time to do bad nasty things with our socket, so once our process wakes up,
  734:               %% it SHOULD detect a dead socket
  735:               escalus_connection:kill(Eve),
  736: 
  737:               %% Connect another one, we hope the message would be rerouted
  738:               NewEve = connect_from_spec(EveSpec2, Config),
  739:               EveNode2 = ct:get_config({hosts, mim, node}),
  740:               wait_for_registration(NewEve, EveNode2),
  741: 
  742:               BeforeResume(),
  743: 
  744:               %% Trigger rerouting
  745:               ok = rpc(asia_node, sys, resume, [C2sPid]),
  746:               C2sPid ! resume_timeout,
  747: 
  748:               %% Let C2sPid to process the message and reroute (and die finally, poor little thing)
  749:               mongoose_helper:wait_for_pid_to_die(C2sPid),
  750: 
  751:               escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi again from Europe1!">>)),
  752:               escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi from Europe!">>)),
  753: 
  754:               AfterCheck(Alice, NewEve)
  755:           end).
  756: 
  757: test_global_disco(Config) ->
  758:     escalus:fresh_story(
  759:       Config, [{alice, 1}, {eve, 1}],
  760:       fun(Alice, Eve) ->
  761:               AliceServer = escalus_client:server(Alice),
  762:               escalus:send(Alice, escalus_stanza:service_discovery(AliceServer)),
  763:               _AliceStanza = escalus:wait_for_stanza(Alice),
  764:               %% TODO: test for duplicate components
  765:               %%escalus:assert(fun has_exactly_one_service/2, [muc_helper:muc_host()], AliceStanza),
  766: 
  767:               EveServer = escalus_client:server(Eve),
  768:               escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
  769:               EveStanza = escalus:wait_for_stanza(Eve),
  770:               escalus:assert(has_service, [muc_helper:muc_host()], EveStanza)
  771:       end).
  772: 
  773: test_component_unregister(_Config) ->
  774:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  775:                        {port, service_port()}, {component, <<"test_service">>}],
  776: 
  777:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  778:     ?assertMatch({ok, _}, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
  779:                               [<<"test_service.localhost">>])),
  780: 
  781:     component_helper:disconnect_component(Comp, Addr),
  782: 
  783:     ?assertEqual(error, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
  784:                             [<<"test_service.localhost">>])).
  785: 
  786: test_error_on_wrong_hosts(_Config) ->
  787:     Opts = [{cookie, "cookie"}, {local_host, "no_such_host"}, {global_host, "localhost"}],
  788:     ?assertException(error, {badrpc, {'EXIT', {"no_such_host is not a member of the host list", _}}},
  789:                      dynamic_modules:start(node_spec(europe_node1), <<"localhost">>,
  790:                                            mod_global_distrib, Opts)).
  791: 
  792: refresh_nodes(Config) ->
  793:     NodesKey = ?config(nodes_key, Config),
  794:     NodeBin = ?config(node_to_expire, Config),
  795:     redis_query(europe_node1, [<<"HSET">>, NodesKey, NodeBin, <<"0">>]),
  796:     refresh_mappings(europe_node1, "by_refresh_nodes"),
  797:     {ok, undefined} = redis_query(europe_node1, [<<"HGET">>, NodesKey, NodeBin]).
  798: 
  799: test_in_order_messages_on_multiple_connections(Config) ->
  800:     escalus:fresh_story(
  801:       Config, [{alice, 1}, {eve, 1}],
  802:       fun(Alice, Eve) ->
  803:               Seq = lists:seq(1, 100),
  804:               lists:foreach(
  805:                 fun(I) ->
  806:                         Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
  807:                         escalus_client:send(Alice, Stanza)
  808:                 end,
  809:                 Seq),
  810:               lists:foreach(
  811:                 fun(I) ->
  812:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  813:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  814:                 end,
  815:                 Seq)
  816:       end).
  817: 
  818: test_in_order_messages_on_multiple_connections_with_bounce(Config) ->
  819:     escalus:fresh_story(
  820:       Config, [{alice, 1}, {eve, 1}],
  821:       fun(Alice, Eve) ->
  822:               %% Send 99 messages, some while server knows the mapping and some when it doesn't
  823:               send_steps(Alice, Eve, 99, <<"reg1">>),
  824:               %% Make sure that the last message is sent when the mapping is known
  825:               set_mapping(europe_node1, Eve, <<"reg1">>),
  826:               escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"100">>)),
  827: 
  828:               %% Check that all stanzas were received in order
  829:               lists:foreach(
  830:                 fun(I) ->
  831:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  832:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  833:                 end,
  834:                 lists:seq(1, 100))
  835:       end).
  836: 
  837: test_messages_bounced_in_order(Config) ->
  838:     escalus:fresh_story(
  839:       Config, [{alice, 1}, {eve, 1}],
  840:       fun(Alice, Eve) ->
  841:               %% Make sure all messages land in bounce storage
  842:               delete_mapping(europe_node1, Eve),
  843: 
  844:               Seq = lists:seq(1, 100),
  845:               lists:foreach(
  846:                 fun(I) ->
  847:                         Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
  848:                         escalus_client:send(Alice, Stanza)
  849:                 end,
  850:                 Seq),
  851: 
  852:               %% Restore the mapping so that bounce eventually succeeds
  853:               ?assertEqual(undefined, get_mapping(europe_node1, Eve)),
  854:               set_mapping(europe_node1, Eve, <<"reg1">>),
  855: 
  856:               lists:foreach(
  857:                 fun(I) ->
  858:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  859:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  860:                 end,
  861:                 Seq)
  862:       end).
  863: 
  864: test_update_senders_host(Config) ->
  865:     escalus:fresh_story(
  866:       Config, [{alice, 1}, {eve, 1}],
  867:       fun(Alice, Eve) ->
  868:               AliceJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Alice)]),
  869:               {ok, <<"localhost.bis">>}
  870:               = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid]),
  871:               ok = rpc(europe_node1, mod_global_distrib_mapping, delete_for_jid, [AliceJid]),
  872:               wait_for_node(asia_node, AliceJid),
  873: 
  874:               %% TODO: Should prevent Redis refresher from executing for a moment,
  875:               %%       as it may collide with this test.
  876: 
  877:               escalus:send(Alice, escalus_stanza:chat_to(Eve, <<"test_update_senders_host">>)),
  878:               escalus:wait_for_stanza(Eve),
  879: 
  880:               {ok, <<"localhost.bis">>}
  881:               = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid])
  882:       end).
  883: wait_for_node(Node,Jid) ->
  884:     mongoose_helper:wait_until(fun() -> rpc(Node, mod_global_distrib_mapping, for_jid, [Jid]) end,
  885:                                error,
  886:                                #{time_left => timer:seconds(10),
  887:                                  sleep_time => timer:seconds(1),
  888:                                  name => rpc}).
  889: 
  890: test_update_senders_host_by_ejd_service(Config) ->
  891:     refresh_hosts([europe_node1, europe_node2, asia_node], "by_test_update_senders_host_by_ejd_service"),
  892:     %% Connects to europe_node1
  893:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  894:                        {port, service_port()}, {component, <<"test_service">>}],
  895: 
  896:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  897: 
  898:     escalus:fresh_story(
  899:       Config, [{eve, 1}],
  900:       fun(Eve) ->
  901:               %% Eve is connected to asia_node
  902:               EveJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Eve)]),
  903:               {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
  904:               {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid]),
  905: 
  906:               ok = rpc(asia_node, mod_global_distrib_mapping, delete_for_jid, [EveJid]),
  907:               wait_for_node(europe_node1, EveJid),
  908:               wait_for_node(europe_node2, EveJid),
  909: 
  910:               %% Component is connected to europe_node1
  911:               %% but we force asia_node to connect to europe_node2 by hiding europe_node1
  912:               %% and forcing rebalance (effectively disabling connections to europe_node1)
  913:               %% to verify routing cache update on both nodes
  914: 
  915:               %% TODO: Should prevent Redis refresher from executing for a moment,
  916:               %%       as it may collide with this test.
  917: 
  918:               hide_node(europe_node1, Config),
  919:               {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  920:               trigger_rebalance(asia_node, list_to_binary(EuropeHost)),
  921: 
  922:               escalus:send(Eve, escalus_stanza:chat_to(Addr, <<"hi">>)),
  923:               escalus:wait_for_stanza(Comp),
  924: 
  925:               {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
  926:               {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid])
  927:       end).
  928: 
  929: %% -------------------------------- Rebalancing --------------------------------
  930: 
  931: enable_new_endpoint_on_refresh(Config) ->
  932:     get_connection(europe_node1, <<"reg1">>),
  933: 
  934:     {Enabled1, _Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
  935: 
  936:     ExtraPort = get_port(reg, gd_extra_endpoint_port),
  937:     NewEndpoint = enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),
  938: 
  939:     {Enabled2, _Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),
  940: 
  941:     %% One new pool and one new endpoint
  942:     [NewEndpoint] = Pools2 -- Pools1,
  943:     [] = Pools1 -- Pools2,
  944:     [NewEndpoint] = Enabled2 -- Enabled1,
  945:     [] = Enabled1 -- Enabled2.
  946: 
  947: disable_endpoint_on_refresh(Config) ->
  948:     ExtraPort = get_port(reg, gd_extra_endpoint_port),
  949:     enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),
  950: 
  951:     get_connection(europe_node1, <<"reg1">>),
  952: 
  953:     {Enabled1, Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
  954:     [_, _] = Enabled1,
  955:     [] = Disabled1,
  956: 
  957:     hide_extra_endpoint(asia_node),
  958:     trigger_rebalance(europe_node1, <<"reg1">>),
  959: 
  960:     {Enabled2, Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),
  961: 
  962:     %% 2 pools open even after disable
  963:     [] = Pools1 -- Pools2,
  964:     [] = Pools2 -- Pools1,
  965:     %% NewEndpoint is no longer enabled
  966:     [] = Enabled2 -- Enabled1,
  967:     [NewEndpoint] = Enabled1 -- Enabled2,
  968:     %% NewEndpoint is now disabled
  969:     [] = Disabled1,
  970:     [NewEndpoint] = Disabled2.
  971: 
  972: wait_for_connection(_Config) ->
  973:     set_endpoints(asia_node, []),
  974:     %% Because of hosts refresher, a pool of connections to asia_node
  975:     %% may already be present here
  976:     mongoose_helper:wait_until(
  977:                                 fun () ->
  978:                                     try trigger_rebalance(europe_node1, <<"reg1">>), true
  979:                                     catch _:_ -> false end
  980:                                 end,
  981:                                 true,
  982:                                 #{name => rebalance, time_left => timer:seconds(5)}),
  983: 
  984:     spawn_connection_getter(europe_node1),
  985: 
  986:     receive
  987:         Unexpected1 -> error({unexpected, Unexpected1})
  988:     after
  989:         2000 -> ok
  990:     end,
  991: 
  992:     refresh_mappings(asia_node, "by_wait_for_connection"),
  993:     trigger_rebalance(europe_node1, <<"reg1">>),
  994: 
  995:     receive
  996:         Conn when is_pid(Conn) -> ok;
  997:         Unexpected2 -> error({unexpected, Unexpected2})
  998:     after
  999:         5000 -> error(timeout)
 1000:     end.
 1001: 
 1002: closed_connection_is_removed_from_disabled(_Config) ->
 1003:     get_connection(europe_node1, <<"reg1">>),
 1004:     set_endpoints(asia_node, []),
 1005:     trigger_rebalance(europe_node1, <<"reg1">>),
 1006: 
 1007:     {[], [_], [_]} = get_outgoing_connections(europe_node1, <<"reg1">>),
 1008: 
 1009:     % Will drop connections and prevent them from reconnecting
 1010:     restart_receiver(asia_node, [listen_endpoint(get_port(reg, gd_supplementary_endpoint_port))]),
 1011: 
 1012:     mongoose_helper:wait_until(fun() -> get_outgoing_connections(europe_node1, <<"reg1">>) end,
 1013:                                {[], [], []},
 1014:                               #{name => get_outgoing_connections}).
 1015: 
 1016: 
 1017: %%--------------------------------------------------------------------
 1018: %% Test helpers
 1019: %%--------------------------------------------------------------------
 1020: 
 1021: get_port(Host, Param) ->
 1022:     case ct:get_config({hosts, Host, Param}) of
 1023:         Port when is_integer(Port) ->
 1024:             Port;
 1025:         Other ->
 1026:             ct:fail({get_port_failed, Host, Param, Other})
 1027:     end.
 1028: 
 1029: get_hosts() ->
 1030:     [
 1031:      {europe_node1, "localhost.bis", get_port(mim, gd_endpoint_port)},
 1032:      {europe_node2, "localhost.bis", get_port(mim2, gd_endpoint_port)},
 1033:      {asia_node, "reg1", get_port(reg, gd_endpoint_port)}
 1034:     ].
 1035: 
 1036: listen_endpoint(NodeName) when is_atom(NodeName) ->
 1037:     {_, _, Port} = lists:keyfind(NodeName, 1, get_hosts()),
 1038:     listen_endpoint(Port);
 1039: listen_endpoint(Port) when is_integer(Port) ->
 1040:     {{127, 0, 0, 1}, Port}.
 1041: 
 1042: %% For dynamic_modules
 1043: node_spec(NodeName) ->
 1044:     #{node => ct:get_config(NodeName), timeout => timer:seconds(30)}.
 1045: 
 1046: rpc(NodeName, M, F, A) ->
 1047:     Node = ct:get_config(NodeName),
 1048:     mongoose_helper:successful_rpc(#{node => Node}, M, F, A, timer:seconds(30)).
 1049: 
 1050: hide_node(NodeName, Config) ->
 1051:     NodesKey = ?config(nodes_key, Config),
 1052:     NodeBin = atom_to_binary(ct:get_config(NodeName), latin1),
 1053:     {ok, <<"1">>} = redis_query(europe_node1, [<<"HDEL">>, NodesKey, NodeBin]).
 1054: 
 1055: connect_from_spec(UserSpec, Config) ->
 1056:     {ok, User} = escalus_client:start(Config, UserSpec, <<"res1">>),
 1057:     escalus_connection:set_filter_predicate(User, fun(S) -> not escalus_pred:is_presence(S) end),
 1058:     escalus_story:send_initial_presence(User),
 1059:     User.
 1060: 
 1061: chat_with_seqnum(To, Text) ->
 1062:     escalus_stanza:set_id(escalus_stanza:chat_to(To, Text),
 1063:                           integer_to_binary(erlang:monotonic_time())).
 1064: 
 1065: order_by_seqnum(Stanzas) ->
 1066:     lists:sort(fun(A, B) -> exml_query:attr(B, <<"id">>) < exml_query:attr(A, <<"id">>) end,
 1067:                Stanzas).
 1068: 
 1069: has_exactly_one_service(Service, #xmlel{children = [#xmlel{children = Services}]}) ->
 1070:     Pred = fun(Item) ->
 1071:                    exml_query:attr(Item, <<"jid">>) =:= Service
 1072:            end,
 1073:     case lists:filter(Pred, Services) of
 1074:         [_] -> true;
 1075:         _ -> false
 1076:     end.
 1077: 
 1078: send_steps(From, To, Max, ToHost) ->
 1079:     next_send_step(From, To, 1, Max, Max div 10, true, ToHost).
 1080: 
 1081: next_send_step(_From, _To, I, Max, _ToReset, _KnowsMapping, _ToHost) when I > Max -> ok;
 1082: next_send_step(From, To, I, Max, 0, KnowsMapping, ToHost) ->
 1083:     ct:log("Reset: I: ~B", [I]),
 1084:     case KnowsMapping of
 1085:         true -> delete_mapping(europe_node1, To);
 1086:         false -> set_mapping(europe_node1, To, ToHost)
 1087:     end,
 1088:     next_send_step(From, To, I, Max, Max div 10, not KnowsMapping, ToHost);
 1089: next_send_step(From, To, I, Max, ToReset, KnowsMapping, ToHost) ->
 1090:     ct:log("I: ~B ~B ~B", [I, Max, ToReset]),
 1091:     Stanza = escalus_stanza:chat_to(To, integer_to_binary(I)),
 1092:     escalus_client:send(From, Stanza),
 1093:     next_send_step(From, To, I + 1, Max, ToReset - 1, KnowsMapping, ToHost).
 1094: 
 1095: get_mapping(Node, Client) ->
 1096:     {FullJid, _BareJid} = jids(Client),
 1097:     {ok, What} = redis_query(Node, [<<"GET">>, FullJid]),
 1098:     What.
 1099: 
 1100: %% Warning! May not work properly with alice or any other user whose
 1101: %% stringprepped JID is different than original one
 1102: delete_mapping(Node, Client) ->
 1103:     {FullJid, BareJid} = jids(Client),
 1104:     redis_query(Node, [<<"DEL">>, FullJid, BareJid]),
 1105:     Jid = rpc(Node, jid, from_binary, [FullJid]),
 1106:     rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).
 1107: 
 1108: set_mapping(Node, Client, Mapping) ->
 1109:     {FullJid, BareJid} = jids(Client),
 1110:     redis_query(Node, [<<"MSET">>, FullJid, Mapping, BareJid, Mapping]),
 1111:     Jid = rpc(Node, jid, from_binary, [FullJid]),
 1112:     rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).
 1113: 
 1114: jids(Client) ->
 1115:     FullJid = escalus_client:full_jid(Client),
 1116:     BareJid = escalus_client:short_jid(Client),
 1117:     {FullJid, BareJid}.
 1118: 
 1119: redis_query(Node, Query) ->
 1120:     {ok, RedisWorker} = rpc(Node, mongoose_wpool, get_worker, [redis, global, global_distrib]),
 1121:     rpc(Node, eredis, q, [RedisWorker, Query]).
 1122: 
 1123: %% A fake address we don't try to connect to.
 1124: %% Used in test_advertised_endpoints_override_endpoints testcase.
 1125: advertised_endpoints() ->
 1126:     [
 1127:      {fake_domain(), get_port(reg, gd_endpoint_port)}
 1128:     ].
 1129: 
 1130: fake_domain() ->
 1131:     "somefakedomain.com".
 1132: 
 1133: iptuples_to_string([]) ->
 1134:     [];
 1135: iptuples_to_string([{Addr, Port} | Endps]) when is_tuple(Addr) ->
 1136:     [{inet_parse:ntoa(Addr), Port} | iptuples_to_string(Endps)];
 1137: iptuples_to_string([E | Endps]) ->
 1138:     [E | iptuples_to_string(Endps)].
 1139: 
 1140: maybe_add_advertised_endpoints(NodeName, Opts, Config) ->
 1141:     Endpoints = proplists:get_value(NodeName, ?config(add_advertised_endpoints, Config), []),
 1142:     case Endpoints of
 1143:         [] ->
 1144:             Opts;
 1145:         E ->
 1146:             Connections = case lists:keyfind(connections, 1, Opts) of
 1147:                               false -> [];
 1148:                               C -> C
 1149:                           end,
 1150:             NewConnections = {connections, [{advertised_endpoints, E} | Connections]},
 1151:             [NewConnections | Opts]
 1152:     end.
 1153: 
 1154: mock_inet_on_each_node() ->
 1155:     Nodes = lists:map(fun({NodeName, _, _}) -> ct:get_config(NodeName) end, get_hosts()),
 1156:     Results = lists:map(fun(Node) -> rpc:block_call(Node, ?MODULE, mock_inet, []) end, Nodes),
 1157:     true = lists:all(fun(Result) -> Result =:= ok end, Results).
 1158: 
 1159: execute_on_each_node(M, F, A) ->
 1160:     lists:map(fun({NodeName, _, _}) -> rpc(NodeName, M, F, A) end, get_hosts()).
 1161: 
 1162: mock_inet() ->
 1163:     %% We don't want to mock inet module itself to avoid strange networking issues
 1164:     meck:new(mod_global_distrib_utils, [non_strict, passthrough, unstick]),
 1165:     meck:expect(mod_global_distrib_utils, getaddrs, fun(_, inet) -> {ok, [{127, 0, 0, 1}]};
 1166:                                                        (_, inet6) -> {error, "No ipv6 address"} end).
 1167: 
 1168: unmock_inet(_Pids) ->
 1169:     execute_on_each_node(meck, unload, [mod_global_distrib_utils]).
 1170: 
 1171: out_connection_sups(Node) ->
 1172:     Children = rpc(Node, supervisor, which_children, [mod_global_distrib_outgoing_conns_sup]),
 1173:     lists:filter(fun({Sup, _, _, _}) -> Sup =/= mod_global_distrib_hosts_refresher end, Children).
 1174: 
 1175: trees_for_connections_present() ->
 1176:     AsiaChildren = out_connection_sups(asia_node),
 1177:     Europe1Children = out_connection_sups(europe_node1),
 1178:     Europe2Children = out_connection_sups(europe_node2),
 1179:     lists:all(fun(Host) -> length(Host) > 0 end, [AsiaChildren, Europe1Children, Europe2Children]).
 1180: 
 1181: tree_for_sup_present(Node, ExpectedSup) ->
 1182:     Children = out_connection_sups(Node),
 1183:     lists:keyfind(ExpectedSup, 1, Children) =/= false.
 1184: 
 1185: 
 1186: %% ------------------------------- rebalancing helpers -----------------------------------
 1187: 
 1188: spawn_connection_getter(SenderNode) ->
 1189:     TestPid = self(),
 1190:     spawn(fun() ->
 1191:                   Conn = get_connection(SenderNode, <<"reg1">>),
 1192:                   TestPid ! Conn
 1193:           end).
 1194: 
 1195: enable_extra_endpoint(ListenNode, SenderNode, Port, _Config) ->
 1196:     OriginalEndpoint = listen_endpoint(ListenNode),
 1197:     NewEndpoint = {{127, 0, 0, 1}, Port},
 1198: 
 1199:     restart_receiver(ListenNode, [NewEndpoint, OriginalEndpoint]),
 1200:     refresh_mappings(ListenNode, "by_enable_extra_endpoint,port=" ++ integer_to_list(Port)),
 1201:     trigger_rebalance(SenderNode, <<"reg1">>),
 1202: 
 1203:     NewEndpoint.
 1204: 
 1205: get_connection(SenderNode, ToDomain) ->
 1206:     rpc(SenderNode, mod_global_distrib_outgoing_conns_sup, get_connection, [ToDomain]).
 1207: 
 1208: hide_extra_endpoint(ListenNode) ->
 1209:     set_endpoints(ListenNode, [listen_endpoint(ListenNode)]).
 1210: 
 1211: set_endpoints(ListenNode, Endpoints) ->
 1212:     {ok, _} = rpc(ListenNode, mod_global_distrib_mapping_redis, set_endpoints, [Endpoints]).
 1213: 
 1214: get_outgoing_connections(NodeName, DestinationDomain) ->
 1215:     Supervisor = rpc(NodeName, mod_global_distrib_utils, server_to_sup_name, [DestinationDomain]),
 1216:     Manager = rpc(NodeName, mod_global_distrib_utils, server_to_mgr_name, [DestinationDomain]),
 1217:     Enabled = rpc(NodeName, mod_global_distrib_server_mgr,
 1218:                   get_enabled_endpoints, [DestinationDomain]),
 1219:     Disabled = rpc(NodeName, mod_global_distrib_server_mgr,
 1220:                    get_disabled_endpoints, [DestinationDomain]),
 1221:     PoolsChildren = rpc(NodeName, supervisor, which_children, [Supervisor]),
 1222:     Pools = [ Id || {Id, _Child, _Type, _Modules} <- PoolsChildren, Id /= Manager ],
 1223:     {Enabled, Disabled, Pools}.
 1224: 
 1225: restart_receiver(NodeName) ->
 1226:     restart_receiver(NodeName, [listen_endpoint(NodeName)]).
 1227: 
 1228: restart_receiver(NodeName, NewEndpoints) ->
 1229:     OldOpts = rpc(NodeName, gen_mod, get_module_opts,
 1230:                   [<<"localhost">>, mod_global_distrib_receiver]),
 1231:     NewOpts = lists:keyreplace(endpoints, 1, OldOpts, {endpoints, NewEndpoints}),
 1232:     Node = node_spec(NodeName),
 1233:     dynamic_modules:restart(Node, <<"localhost">>, mod_global_distrib_receiver, NewOpts).
 1234: 
 1235: trigger_rebalance(NodeName, DestinationDomain) when is_binary(DestinationDomain) ->
 1236:     %% To ensure that the manager exists,
 1237:     %% otherwise we can get noproc error in the force_refresh call
 1238:     ok = rpc(NodeName, mod_global_distrib_outgoing_conns_sup,
 1239:              ensure_server_started, [DestinationDomain]),
 1240:     rpc(NodeName, mod_global_distrib_server_mgr, force_refresh, [DestinationDomain]),
 1241:     StateInfo = rpc(NodeName, mod_global_distrib_server_mgr, get_state_info, [DestinationDomain]),
 1242:     ct:log("mgr_state_info_after_rebalance nodename=~p state_info=~p", [NodeName, StateInfo]),
 1243:     timer:sleep(1000).
 1244: 
 1245: %% -----------------------------------------------------------------------
 1246: %% Escalus-related helpers
 1247: 
 1248: user_receives(User, Bodies) ->
 1249:     ExpectedLength = length(Bodies),
 1250:     Messages = escalus_client:wait_for_stanzas(User, ExpectedLength),
 1251:     SortedMessages = order_by_seqnum(Messages),
 1252:     case length(Messages) of
 1253:         ExpectedLength ->
 1254:             Checks = [escalus_pred:is_chat_message(Body, Stanza) || {Body, Stanza} <- lists:zip(Bodies, SortedMessages)],
 1255:             case lists:all(fun(Check) -> Check end, Checks) of
 1256:                 true ->
 1257:                     ok;
 1258:                 false ->
 1259:                     ct:fail({user_receives_failed, {wanted, Bodies}, {received, SortedMessages}, {check, Checks}})
 1260:             end;
 1261:         _ ->
 1262:             ct:fail({user_receives_not_enough, {wanted, Bodies}, {received, SortedMessages}})
 1263:     end.
 1264: 
 1265: 
 1266: %% -----------------------------------------------------------------------
 1267: %% Refreshing helpers
 1268: 
 1269: %% Reason is a string
 1270: %% NodeName is asia_node, europe_node2, ... in a format used by this suite.
 1271: refresh_mappings(NodeName, Reason) when is_list(Reason) ->
 1272:     rpc(NodeName, mod_global_distrib_mapping_redis, refresh, [Reason]).
 1273: 
 1274: refresh_hosts(NodeNames, Reason) ->
 1275:    [refresh_mappings(NodeName, Reason) || NodeName <- NodeNames].
 1276: 
 1277: 
 1278: %% -----------------------------------------------------------------------
 1279: %% Other helpers
 1280: 
 1281: connect_steps_with_sm() ->
 1282:     [start_stream, stream_features, maybe_use_ssl,
 1283:      authenticate, bind, session, stream_resumption].
 1284: 
 1285: bare_client(Client) ->
 1286:     Client#client{jid = escalus_utils:get_short_jid(Client)}.
 1287: 
 1288: service_port() ->
 1289:     ct:get_config({hosts, mim, service_port}).
 1290: 
 1291: 
 1292: %% -----------------------------------------------------------------------
 1293: %% Waiting helpers
 1294: 
 1295: wait_for_domain(Node, Domain) ->
 1296:     F = fun() ->
 1297:         {ok, Domains} = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
 1298:         lists:member(Domain, Domains)
 1299:         end,
 1300:     mongoose_helper:wait_until(F, true, #{name => {wait_for_domain, Node, Domain}}).
 1301: 
 1302: %% We receive presence BEFORE session is registered in ejabberd_sm.
 1303: %% So, to ensure that we processed do_open_session completely, let's send a "ping".
 1304: %% by calling the c2s process.
 1305: %% That call would only return, when all messages in erlang message queue
 1306: %% are processed.
 1307: wait_for_registration(Client, Node) ->
 1308:     RPCSpec = #{node => Node},
 1309:     mongoose_helper:wait_until(fun() -> is_pid(mongoose_helper:get_session_pid(Client, RPCSpec)) end, true,
 1310:                                #{name => wait_for_session}),
 1311:     C2sPid = mongoose_helper:get_session_pid(Client, RPCSpec),
 1312:     rpc:call(node(C2sPid), ejabberd_c2s, get_info, [C2sPid]),
 1313:     ok.
 1314: 
 1315: 
 1316: %% -----------------------------------------------------------------------
 1317: %% Ensure, that endpoints are up
 1318: 
 1319: wait_for_listeners_to_appear() ->
 1320:     [wait_for_can_connect_to_port(Port) || Port <- receiver_ports(get_hosts())].
 1321: 
 1322: receiver_ports(Hosts) ->
 1323:     lists:map(fun({_NodeName, _LocalHost, ReceiverPort}) -> ReceiverPort end, Hosts).
 1324: 
 1325: wait_for_can_connect_to_port(Port) ->
 1326:     Opts = #{time_left => timer:seconds(30), sleep_time => 1000, name => {can_connect_to_port, Port}},
 1327:     mongoose_helper:wait_until(fun() -> can_connect_to_port(Port) end, true, Opts).
 1328: 
 1329: can_connect_to_port(Port) ->
 1330:     case gen_tcp:connect("127.0.0.1", Port, []) of
 1331:         {ok, Sock} ->
 1332:             gen_tcp:close(Sock),
 1333:             true;
 1334:         Other ->
 1335:             ct:pal("can_connect_to_port port=~p result=~p", [Port, Other]),
 1336:             false
 1337:     end.
 1338: 
 1339: %% -----------------------------------------------------------------------
 1340: %% Custom log levels for GD modules during the tests
 1341: 
 1342: enable_logging() ->
 1343:     mim_loglevel:enable_logging(test_hosts(), custom_loglevels()).
 1344: 
 1345: disable_logging() ->
 1346:     mim_loglevel:disable_logging(test_hosts(), custom_loglevels()).
 1347: 
 1348: custom_loglevels() ->
 1349:     %% for "s2s connection to muc.localhost not found" debugging
 1350:     [{ejabberd_s2s, debug},
 1351:     %% for debugging event=refreshing_own_data_done
 1352:      {mod_global_distrib_mapping_redis, info},
 1353:     %% to know if connection is already started or would be started
 1354:     %% event=outgoing_conn_start_progress
 1355:      {mod_global_distrib_outgoing_conns_sup, info},
 1356:     %% to debug bound connection issues
 1357:      {mod_global_distrib, debug},
 1358:     %% to know all new connections pids
 1359:      {mod_global_distrib_connection, debug},
 1360:     %% to check if gc or refresh is triggered
 1361:      {mod_global_distrib_server_mgr, info},
 1362:    %% To debug incoming connections
 1363: %    {mod_global_distrib_receiver, info},
 1364:    %% to debug global session set/delete
 1365:      {mod_global_distrib_mapping, debug}
 1366:     ].
 1367: 
 1368: test_hosts() -> [mim, mim2, reg].