1: %%==============================================================================
    2: %% Copyright 2017 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mod_global_distrib_SUITE).
   18: 
   19: -compile([export_all, nowarn_export_all]).
   20: 
   21: -include_lib("escalus/include/escalus.hrl").
   22: -include_lib("common_test/include/ct.hrl").
   23: -include_lib("eunit/include/eunit.hrl").
   24: -include_lib("escalus/include/escalus_xmlns.hrl").
   25: -include_lib("exml/include/exml.hrl").
   26: 
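%% Passed to mod_global_distrib as hosts_refresh_interval (see set_opts/2);
%% kept short so that tests depending on the refresher complete quickly.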
   27: -define(HOSTS_REFRESH_INTERVAL, 200). %% in ms
   28: 
   29: -import(domain_helper, [domain/0]).
   30: -import(config_parser_helper, [config/2, mod_config/2]).
   31: 
   32: %%--------------------------------------------------------------------
   33: %% Suite configuration
   34: %%--------------------------------------------------------------------
   35: 
   36: all() ->
   37:     [
   38:      {group, mod_global_distrib},
   39:      {group, cluster_restart},
   40:      {group, start_checks},
   41:      {group, invalidation},
   42:      {group, multi_connection},
   43:      {group, rebalancing},
   44:      {group, advertised_endpoints},
   45:      {group, hosts_refresher}
   46:     ].
   47: 
   48: groups() ->
   49:     [{mod_global_distrib, [],
   50:           [
   51:            test_pm_between_users_at_different_locations,
   52:            test_pm_between_users_before_available_presence,
           test_component_disconnect,
   54:            test_component_on_one_host,
   55:            test_components_in_different_regions,
   56:            test_hidden_component_disco_in_different_region,
   57:            test_pm_with_disconnection_on_other_server,
   58:            test_pm_with_graceful_reconnection_to_different_server,
   59:            test_pm_with_ungraceful_reconnection_to_different_server,
   60:            test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first,
   61:            test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first,
   62:            test_component_unregister,
   63:            test_update_senders_host,
   64:            test_update_senders_host_by_ejd_service,
   65: 
   66:            %% with node 2 disabled
   67:            test_muc_conversation_on_one_host,
   68:            test_global_disco
   69:           ]},
   70:          {hosts_refresher, [],
   71:           [test_host_refreshing]},
   72:          {cluster_restart, [],
   73:           [
   74:            test_location_disconnect
   75:           ]},
   76:          {start_checks, [],
   77:           [
   78:            test_error_on_wrong_hosts
   79:           ]},
   80:          {invalidation, [],
   81:           [
           %% TODO: Add checks for other mapping refreshes
   83:            refresh_nodes
   84:           ]},
   85:          {multi_connection, [],
   86:           [
   87:            test_in_order_messages_on_multiple_connections,
   88:            test_in_order_messages_on_multiple_connections_with_bounce,
   89:            test_messages_bounced_in_order,
   90: 
   91:            %% with node 2 disabled
   92:            test_muc_conversation_history
   93:           ]},
   94:          {rebalancing, [],
   95:           [
   96:            enable_new_endpoint_on_refresh,
   97:            disable_endpoint_on_refresh,
   98:            wait_for_connection,
   99:            closed_connection_is_removed_from_disabled
  100:           ]},
  101:          {advertised_endpoints, [],
  102:           [
  103:            test_advertised_endpoints_override_endpoints,
  104:            test_pm_between_users_at_different_locations
  105:           ]}
  106:     ].
  107: 
  108: suite() ->
  109:     [{require, europe_node1, {hosts, mim, node}},
  110:      {require, europe_node2, {hosts, mim2, node}},
  111:      {require, asia_node, {hosts, reg, node}},
  112:      {require, c2s_port, {hosts, mim, c2s_port}} |
  113:      escalus:suite()].
  114: 
  115: %%--------------------------------------------------------------------
  116: %% Init & teardown
  117: %%--------------------------------------------------------------------
  118: 
  119: init_per_suite(Config) ->
  120:     case {rpc(europe_node1, mongoose_wpool, get_worker, [redis, global, global_distrib]),
  121:           rpc(asia_node, mongoose_wpool, get_worker, [redis, global, global_distrib])} of
  122:         {{ok, _}, {ok, _}} ->
  123:             ok = rpc(europe_node2, mongoose_cluster, join, [ct:get_config(europe_node1)]),
  124: 
  125:             enable_logging(),
  126:             escalus:init_per_suite([{add_advertised_endpoints, []}, {extra_config, #{}} | Config]);
  127:         Result ->
  128:             ct:pal("Redis check result: ~p", [Result]),
  129:             {skip, "GD Redis default pool not available"}
  130:     end.
  131: 
  132: end_per_suite(Config) ->
  133:     disable_logging(),
  134:     escalus_fresh:clean(),
  135:     rpc(europe_node2, mongoose_cluster, leave, []),
  136:     escalus:end_per_suite(Config).
  137: 
  138: init_per_group(start_checks, Config) ->
  139:     NodeName = europe_node1,
  140:     NodeSpec = node_spec(NodeName),
  141:     Config1 = dynamic_modules:save_modules(NodeSpec, [domain()], Config),
  142:     dynamic_modules:ensure_modules(NodeSpec, domain(), [{mod_global_distrib, stopped}]),
  143:     Config1;
  144: init_per_group(multi_connection, Config) ->
  145:     ExtraConfig = #{bounce => #{resend_after_ms => 20000},
  146:                     connections => #{%% Disable unused feature to avoid interference
  147:                                      connections_per_endpoint => 100,
  148:                                      disabled_gc_interval => 10000}},
  149:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  150: init_per_group(invalidation, Config) ->
  151:     Config1 = init_per_group(invalidation_generic, Config),
  152:     NodeBin = <<"fake_node@localhost">>,
  153:     [{node_to_expire, NodeBin} | Config1];
  154: init_per_group(rebalancing, Config) ->
    %% We need to prevent automatic refreshes, because they may interfere with tests,
    %% and we need a short disabled-connections GC interval to verify that the GC works
  157:     ExtraConfig = #{connections => #{endpoint_refresh_interval => 3600,
  158:                                      endpoint_refresh_interval_when_empty => 3600,
  159:                                      disabled_gc_interval => 1},
  160:                     redis => #{refresh_after => 3600}},
  161:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  162: init_per_group(advertised_endpoints, Config) ->
  163:     lists:foreach(fun({NodeName, _, _}) ->
  164:                           Node = ct:get_config(NodeName),
  165:                           mongoose_helper:inject_module(#{node => Node}, ?MODULE, reload)
  166:                   end, get_hosts()),
  167:     mock_inet_on_each_node(),
  168:     init_per_group_generic(
  169:                [{add_advertised_endpoints,
  170:                  [{asia_node, advertised_endpoints()}]} | Config]);
  171: init_per_group(mod_global_distrib, Config) ->
  172:     %% Disable mod_global_distrib_mapping_redis refresher
  173:     ExtraConfig = #{redis => #{refresh_after => 3600}},
  174:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  175: init_per_group(_, Config) ->
  176:     init_per_group_generic(Config).
  177: 
  178: init_per_group_generic(Config0) ->
  179:     Config2 = lists:foldl(fun init_modules_per_node/2, Config0, get_hosts()),
  180:     wait_for_listeners_to_appear(),
  181:     {SomeNode, _, _} = hd(get_hosts()),
  182:     NodesKey = rpc(SomeNode, mod_global_distrib_mapping_redis, nodes_key, []),
  183:     [{nodes_key, NodesKey}, {escalus_user_db, xmpp} | Config2].
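
%% nodes_key is the Redis hash in which GD registers its nodes; tests use it to
%% hide nodes (HDEL in hide_node/2) and to expire fake ones (refresh_nodes/1).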
  184: 
  185: init_modules_per_node({NodeName, LocalHost, ReceiverPort}, Config0) ->
  186:     Extra0 = module_opts(?config(extra_config, Config0)),
  187:     EndpointOpts = endpoint_opts(NodeName, ReceiverPort, Config0),
  188:     ConnExtra = maps:merge(EndpointOpts, maps:get(connections, Extra0, #{})),
  189:     Extra = Extra0#{local_host => LocalHost, connections => ConnExtra},
  190:     Opts = module_opts(Extra),
  191: 
  192:     VirtHosts = virtual_hosts(NodeName),
  193:     Node = node_spec(NodeName),
  194:     Config1 = dynamic_modules:save_modules(Node, VirtHosts, Config0),
  195: 
  196:     %% To reduce load when sending many messages
  197:     ModulesToStop = [mod_offline, mod_blocking, mod_privacy, mod_roster, mod_last,
  198:                      mod_stream_management],
  199:     [dynamic_modules:ensure_stopped(Node, VirtHost, ModulesToStop) || VirtHost <- VirtHosts],
  200: 
  201:     SMOpts = config_parser_helper:mod_config(mod_stream_management, #{resume_timeout => 1}),
  202:     dynamic_modules:ensure_modules(Node, domain(), [{mod_global_distrib, Opts},
  203:                                                     {mod_stream_management, SMOpts}]),
  204:     Config1.
  205: 
  206: module_opts(ExtraOpts) ->
  207:     lists:foldl(fun set_opts/2, ExtraOpts, [common, defaults, connections, redis, bounce]).
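
%% Illustrative example (shape only):
%%   module_opts(#{redis => #{refresh_after => 3600}})
%% merges in the common opts, fills in mod_global_distrib defaults and runs each
%% subsection through the config parser, yielding roughly
%%   #{global_host => <<"localhost">>, hosts_refresh_interval => 200,
%%     redis => #{refresh_after => 3600, ...}, ...}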
  208: 
  209: set_opts(common, Opts) ->
  210:     maps:merge(#{global_host => <<"localhost">>,
  211:                  hosts_refresh_interval => ?HOSTS_REFRESH_INTERVAL}, Opts);
  212: set_opts(defaults, Opts) ->
  213:     mod_config(mod_global_distrib, Opts);
  214: set_opts(connections, #{connections := ConnExtra} = Opts) ->
  215:     TLSOpts = config([modules, mod_global_distrib, connections, tls],
  216:                      #{certfile => "priv/ssl/fake_server.pem",
  217:                        cacertfile => "priv/ssl/ca/cacert.pem"}),
  218:     Opts#{connections := config([modules, mod_global_distrib, connections],
  219:                                 maps:merge(#{tls => TLSOpts}, ConnExtra))};
  220: set_opts(redis, #{redis := RedisExtra} = Opts) ->
  221:     Opts#{redis := config([modules, mod_global_distrib, redis], RedisExtra)};
  222: set_opts(bounce, #{bounce := BounceExtra} = Opts) ->
  223:     Opts#{bounce := config([modules, mod_global_distrib, bounce], BounceExtra)};
  224: set_opts(_, Opts) ->
  225:     Opts.
  226: 
  227: end_per_group(advertised_endpoints, Config) ->
  228:     Pids = ?config(meck_handlers, Config),
  229:     unmock_inet(Pids),
  230:     escalus_fresh:clean(),
  231:     end_per_group_generic(Config);
  232: end_per_group(start_checks, Config) ->
  233:     escalus_fresh:clean(),
  234:     Config;
  235: end_per_group(invalidation, Config) ->
    redis_query(europe_node1, [<<"HDEL">>, ?config(nodes_key, Config),
                               ?config(node_to_expire, Config)]),
  238:     end_per_group_generic(Config);
  239: end_per_group(_, Config) ->
  240:     end_per_group_generic(Config).
  241: 
  242: end_per_group_generic(Config) ->
  243:     dynamic_modules:restore_modules(#{timeout => timer:seconds(30)}, Config).
  244: 
  245: init_per_testcase(CaseName, Config)
  246:   when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
  247:        CaseName == test_muc_conversation_history ->
  248:     %% There is no helper to load MUC on node2
  249:     %% For now it's easier to hide node2
  250:     %% TODO: Do it right at some point!
  251:     hide_node(europe_node2, Config),
    %% There will be no new connections to europe_node2, but some old ones may remain.
  253:     %% We need to disconnect previous connections.
  254:     {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  255:     trigger_rebalance(asia_node, EuropeHost),
  256:     %% Load muc on mim node
  257:     muc_helper:load_muc(),
  258:     RegNode = ct:get_config({hosts, reg, node}),
  259:     %% Wait for muc.localhost to become visible from reg node
  260:     wait_for_domain(RegNode, muc_helper:muc_host()),
  261:     escalus:init_per_testcase(CaseName, Config);
  262: init_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
  263:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server;
  264:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
  265:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
  266:     escalus:init_per_testcase(CN, init_user_eve(Config));
  267: init_per_testcase(CaseName, Config) ->
  268:     escalus:init_per_testcase(CaseName, Config).
  269: 
  270: init_user_eve(Config) ->
  271:     %% Register Eve in reg cluster
  272:     EveSpec = escalus_fresh:create_fresh_user(Config, eve),
  273:     MimPort = ct:get_config({hosts, mim, c2s_port}),
  274:     EveSpec2 = lists:keystore(port, 1, EveSpec, {port, MimPort}),
  275:     %% Register Eve in mim cluster
  276:     escalus:create_users(Config, [{eve, EveSpec2}]),
  277:     [{evespec_reg, EveSpec}, {evespec_mim, EveSpec2} | Config].
  278: 
  279: end_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
  280:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server;
  281:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
  282:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
  283:     MimEveSpec = ?config(evespec_mim, Config),
  284:     %% Clean Eve from reg cluster
  285:     escalus_fresh:clean(),
  286:     %% Clean Eve from mim cluster
    %% For shared databases (e.g. mysql, pgsql...),
    %% removing her from one cluster removes her from all clusters,
    %% but for the mnesia auth backend the removal has to be run in each cluster.
    %% That's why there is a catch here.
  291:     catch escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}]),
  292:     generic_end_per_testcase(CN, Config);
  293: end_per_testcase(CaseName, Config)
  294:   when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
  295:        CaseName == test_muc_conversation_history ->
  296:     refresh_mappings(europe_node2, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
  297:     muc_helper:unload_muc(),
  298:     generic_end_per_testcase(CaseName, Config);
  299: end_per_testcase(test_update_senders_host_by_ejd_service = CN, Config) ->
  300:     refresh_mappings(europe_node1, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
  301:     generic_end_per_testcase(CN, Config);
  302: end_per_testcase(CN, Config) when CN == enable_new_endpoint_on_refresh;
  303:                                   CN == disable_endpoint_on_refresh;
  304:                                   CN == wait_for_connection;
  305:                                   CN == closed_connection_is_removed_from_disabled ->
  306:     restart_receiver(asia_node),
  307:     refresh_mappings(asia_node, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
  308:     generic_end_per_testcase(CN, Config);
  309: end_per_testcase(CaseName, Config) ->
  310:     generic_end_per_testcase(CaseName, Config).
  311: 
  312: generic_end_per_testcase(CaseName, Config) ->
  313:     lists:foreach(
  314:       fun({NodeName, _, _}) ->
  315:               %% TODO: Enable refresher only for specific test cases,
  316:               %% as some of them are based on assumption that node(s)
  317:               %% must open new connections during tests.
  318:               pause_refresher(NodeName, CaseName),
  319:               Node = ct:get_config(NodeName),
  320:               SupRef = {mod_global_distrib_outgoing_conns_sup, Node},
  321:               try
  322:                   OutgoingConns = supervisor:which_children(SupRef),
  323:                   lists:foreach(fun ({mod_global_distrib_hosts_refresher, _, _, _}) ->
  324:                                         skip;
  325:                                     ({Id, _, _, _}) ->
  326:                                         supervisor:terminate_child(SupRef, Id)
  327:                                 end, OutgoingConns),
  328:                   [{mod_global_distrib_hosts_refresher, _, worker, _Modules}] =
  329:                     supervisor:which_children(SupRef)
  330:               catch
  331:                   _:{noproc, _} ->
  332:                       ct:pal("Sender supervisor not found in ~p", [NodeName])
  333:               end,
  334:               unpause_refresher(NodeName, CaseName)
  335:       end,
  336:       get_hosts()),
  337:     escalus:end_per_testcase(CaseName, Config).
  338: 
  339: virtual_hosts(asia_node) ->
  340:     [domain()];
  341: virtual_hosts(_) ->
  342:     [domain(), secondary_domain()].
  343: 
  344: secondary_domain() ->
  345:     ct:get_config({hosts, mim, secondary_domain}).
  346: 
%% For some test cases the refresher is not started at all, or is already stopped
  348: -spec pause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
  349: pause_refresher(_, test_error_on_wrong_hosts) ->
  350:     ok;
  351: pause_refresher(asia_node, test_location_disconnect) ->
  352:     ok;
  353: pause_refresher(NodeName, _) ->
  354:     ok = rpc(NodeName, mod_global_distrib_hosts_refresher, pause, []).
  355: 
  356: -spec unpause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
  357: unpause_refresher(_, test_error_on_wrong_hosts) ->
  358:     ok;
  359: unpause_refresher(asia_node, test_location_disconnect) ->
  360:     ok;
  361: unpause_refresher(NodeName, _) ->
  362:     ok = rpc(NodeName, mod_global_distrib_hosts_refresher, unpause, []).
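
%% test_error_on_wrong_hosts never starts mod_global_distrib, and asia_node is
%% stopped during test_location_disconnect, so in those cases there is no
%% refresher process to pause or unpause.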
  363: 
  364: %%--------------------------------------------------------------------
  365: %% Service discovery test
  366: %%--------------------------------------------------------------------
  367: 
%% Requires mod_global_distrib to be started with the advertised_endpoints option
%% for each host in get_hosts().
%% Reads Redis to confirm that the endpoints stored there are overwritten
%% with the `advertised_endpoints` option value.
  372: test_advertised_endpoints_override_endpoints(_Config) ->
  373:     Endps = execute_on_each_node(mod_global_distrib_mapping_redis, get_endpoints, [<<"reg1">>]),
  374:     true = lists:all(
  375:              fun(E) ->
  376:                      lists:sort(E) =:= lists:sort(advertised_endpoints())
  377:              end, Endps).
  378: 
%% @doc Verifies that the hosts refresher restarts the outgoing connection pool
%% if it goes down for some reason (crash or domain unavailability).
%% Also verifies that the refresher properly reads the host list
%% from the backend and starts the appropriate pool.
  383: test_host_refreshing(_Config) ->
  384:     mongoose_helper:wait_until(fun() -> trees_for_connections_present() end, true,
  385:                                #{name => trees_for_connections_present}),
  386:     ConnectionSups = out_connection_sups(asia_node),
  387:     {europe_node1, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  388:     EuropeSup = rpc(asia_node, mod_global_distrib_utils, server_to_sup_name, [EuropeHost]),
  389:     {_, EuropePid, supervisor, _} = lists:keyfind(EuropeSup, 1, ConnectionSups),
    erlang:exit(EuropePid, kill), % it's OK to kill a temporary process
  391:     mongoose_helper:wait_until(fun() -> tree_for_sup_present(asia_node, EuropeSup) end, true,
  392:                                #{name => tree_for_sup_present}).
  393: 
%% When run in the mod_global_distrib group, this tests the simple case of a
%% connection between two users connected to different clusters.
%% When run in the advertised_endpoints group, it tests whether it is possible
%% to connect to a node that advertises itself with a domain name.
  398: test_pm_between_users_at_different_locations(Config) ->
  399:     escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], fun test_two_way_pm/2).
  400: 
  401: test_pm_between_users_before_available_presence(Config) ->
  402:     Config1 = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
  403:     {ok, Alice} = escalus_client:start(Config1, alice, <<"res1">>),
  404:     {ok, Eve} = escalus_client:start(Config1, eve, <<"res1">>),
  405: 
  406:     test_two_way_pm(Alice, Eve),
  407: 
  408:     escalus_client:stop(Config1, Alice),
  409:     escalus_client:stop(Config1, Eve).
  410: 
  411: test_two_way_pm(Alice, Eve) ->
    %% Ensure that the users are properly registered.
    %% Otherwise you can get an "Unable to route global message... user not found
    %% in the routing table" error, because escalus_client:start/3 can return
    %% before SM registration is completed.
  415:     wait_for_registration(Alice, ct:get_config({hosts, mim, node})),
  416:     wait_for_registration(Eve, ct:get_config({hosts, reg, node})),
  417: 
  418:     escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi to Eve from Europe1!">>)),
  419:     escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi to Alice from Asia!">>)),
  420: 
  421:     FromAlice = escalus_client:wait_for_stanza(Eve, timer:seconds(15)),
  422:     FromEve = escalus_client:wait_for_stanza(Alice, timer:seconds(15)),
  423: 
  424:     AliceJid = escalus_client:full_jid(Alice),
  425:     EveJid = escalus_client:full_jid(Eve),
  426: 
  427:     escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, <<"Hi to Eve from Europe1!">>],
  428:                    FromAlice),
  429:     escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, <<"Hi to Alice from Asia!">>],
  430:                    FromEve).
  431: 
  432: test_muc_conversation_on_one_host(Config0) ->
  433:     AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
  434:     Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
  435:     escalus:fresh_story(
  436:       Config, [{eve, 1}],
  437:       fun(Eve) ->
  438:               Alice = connect_from_spec(AliceSpec, Config),
  439: 
  440:               RoomJid = ?config(room, Config),
  441:               AliceUsername = escalus_utils:get_username(Alice),
  442:               EveUsername = escalus_utils:get_username(Eve),
  443:               RoomAddr = muc_helper:room_address(RoomJid),
  444: 
  445:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
  446:               escalus:wait_for_stanza(Alice),
  447: 
  448:               escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
  449:               [_, _, _] = escalus:wait_for_stanzas(Eve, 3),
  450: 
              Msg = <<"Hi, Eve!">>,
  452:               escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, Msg)),
  453:               escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Alice)),
  454:               escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Eve)),
  455: 
              Msg2 = <<"Hi, Alice!">>,
  457:               escalus:send(Eve, escalus_stanza:groupchat_to(RoomAddr, Msg2)),
  458:               escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Eve)),
  459:               escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Alice))
  460:       end),
  461:     muc_helper:destroy_room(Config).
  462: 
  463: test_muc_conversation_history(Config0) ->
  464:     AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
  465:     Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
  466:     escalus:fresh_story(
  467:       Config, [{eve, 1}],
  468:       fun(Eve) ->
  469:               Alice = connect_from_spec(AliceSpec, Config),
  470: 
  471:               RoomJid = ?config(room, Config),
  472:               AliceUsername = escalus_utils:get_username(Alice),
  473:               RoomAddr = muc_helper:room_address(RoomJid),
  474: 
  475:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
              %% We don't care about presences from Alice; escalus would filter them out
  477:               wait_for_subject(Alice),
  478: 
  479:               send_n_muc_messages(Alice, RoomAddr, 3),
  480: 
  481:               %% Ensure that the messages are received by the room
  482:               %% before trying to login Eve.
  483:               %% Otherwise, Eve would receive some messages from history and
  484:               %% some as regular groupchat messages.
  485:               receive_n_muc_messages(Alice, 3),
  486: 
  487:               EveUsername = escalus_utils:get_username(Eve),
  488:               escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
  489: 
  490:               wait_for_muc_presence(Eve, RoomJid, AliceUsername),
  491:               wait_for_muc_presence(Eve, RoomJid, EveUsername),
  492: 
  493:               %% XEP-0045: After sending the presence broadcast (and only after doing so),
  494:               %% the service MAY then send discussion history, the room subject,
  495:               %% live messages, presence updates, and other in-room traffic.
  496:               receive_n_muc_messages(Eve, 3),
  497:               wait_for_subject(Eve)
  498:       end),
  499:     muc_helper:destroy_room(Config).
  500: 
  501: wait_for_muc_presence(User, RoomJid, FromNickname) ->
  502:     Presence = escalus:wait_for_stanza(User),
  503:     escalus:assert(is_presence, Presence),
  504:     escalus:assert(is_stanza_from, [muc_helper:room_address(RoomJid, FromNickname)], Presence),
  505:     ok.
  506: 
  507: wait_for_subject(User) ->
  508:     Subject = escalus:wait_for_stanza(User),
  509:     escalus:assert(is_groupchat_message, Subject),
  510:     ?assertNotEqual(undefined, exml_query:subelement(Subject, <<"subject">>)),
  511:     ok.
  512: 
  513: send_n_muc_messages(User, RoomAddr, N) ->
  514:     lists:foreach(fun(I) ->
  515:                           Msg = <<"test-", (integer_to_binary(I))/binary>>,
  516:                           escalus:send(User, escalus_stanza:groupchat_to(RoomAddr, Msg))
  517:                   end, lists:seq(1, N)).
  518: 
  519: receive_n_muc_messages(User, N) ->
  520:     lists:foreach(fun(J) ->
  521:                           Msg = <<"test-", (integer_to_binary(J))/binary>>,
  522:                           Stanza = escalus:wait_for_stanza(User),
  523:                           escalus:assert(is_groupchat_message, [Msg], Stanza)
                  end, lists:seq(1, N)).
  525: 
  526: test_component_on_one_host(Config) ->
  527:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  528:                        {port, service_port()}, {component, <<"test_service">>}],
  529: 
  530:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  531: 
  532:     Story = fun(User) ->
  533:                     Msg1 = escalus_stanza:chat_to(Addr, <<"Hi2!">>),
  534:                     escalus:send(User, Msg1),
                    %% Then the component receives it
  536:                     Reply1 = escalus:wait_for_stanza(Comp),
  537:                     escalus:assert(is_chat_message, [<<"Hi2!">>], Reply1),
  538: 
                    %% When the component sends a reply
  540:                     Msg2 = escalus_stanza:chat_to(User, <<"Oh hi!">>),
  541:                     escalus:send(Comp, escalus_stanza:from(Msg2, Addr)),
  542: 
                    %% Then the user receives it
  544:                     Reply2 = escalus:wait_for_stanza(User),
  545:                     escalus:assert(is_chat_message, [<<"Oh hi!">>], Reply2),
  546:                     escalus:assert(is_stanza_from, [Addr], Reply2)
  547:             end,
  548: 
  549:     [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
  550: 
  551: %% Ensures that 2 components in distinct data centers can communicate.
  552: test_components_in_different_regions(_Config) ->
  553:     ComponentCommonConfig = [{host, <<"localhost">>}, {password, <<"secret">>},
  554:                              {server, <<"localhost">>}, {component, <<"test_service">>}],
  555:     Comp1Port = ct:get_config({hosts, mim, service_port}),
  556:     Comp2Port = ct:get_config({hosts, reg, service_port}),
  557:     Component1Config = [{port, Comp1Port}, {component, <<"service1">>} | ComponentCommonConfig],
  558:     Component2Config = [{port, Comp2Port}, {component, <<"service2">>} | ComponentCommonConfig],
  559: 
  560:     {Comp1, Addr1, _Name1} = component_helper:connect_component(Component1Config),
  561:     {Comp2, Addr2, _Name2} = component_helper:connect_component(Component2Config),
  562: 
  563:     Msg1 = escalus_stanza:from(escalus_stanza:chat_to(Addr2, <<"Hi from 1!">>), Addr1),
  564:     escalus:send(Comp1, Msg1),
  565:     GotMsg1 = escalus:wait_for_stanza(Comp2),
  566:     escalus:assert(is_chat_message, [<<"Hi from 1!">>], GotMsg1),
  567: 
  568:     Msg2 = escalus_stanza:from(escalus_stanza:chat_to(Addr1, <<"Hi from 2!">>), Addr2),
  569:     escalus:send(Comp2, Msg2),
  570:     GotMsg2 = escalus:wait_for_stanza(Comp1),
  571:     escalus:assert(is_chat_message, [<<"Hi from 2!">>], GotMsg2).
  572: 
%% An ordinary user is not able to discover the hidden component via GD
  574: test_hidden_component_disco_in_different_region(Config) ->
  575:     %% Hidden component from component_SUITE connects to mim1/europe_node1
  576:     HiddenComponentConfig = component_helper:spec(hidden_component, Config),
  577:     {_HiddenComp, HiddenAddr, _} = component_helper:connect_component(HiddenComponentConfig),
  578: 
  579:     escalus:fresh_story(
  580:       Config, [{eve, 1}],
  581:       fun(Eve) ->
  582:               EveServer = escalus_client:server(Eve),
  583:               escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
  584:               DiscoReply = escalus:wait_for_stanza(Eve),
  585:               escalus:assert(is_iq_result, DiscoReply),
  586:               escalus:assert(fun(Stanza) ->
  587:                                      not escalus_pred:has_service(HiddenAddr, Stanza)
  588:                              end, DiscoReply)
  589:       end).
  590: 
  591: test_component_disconnect(Config) ->
  592:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  593:                        {port, service_port()}, {component, <<"test_service">>}],
  594: 
  595:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  596:     component_helper:disconnect_component(Comp, Addr),
  597: 
  598:     Story = fun(User) ->
  599:                     escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
  600:                     Error = escalus:wait_for_stanza(User, 5000),
  601:                     escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
  602:             end,
  603: 
  604:     [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
  605: 
  606: test_location_disconnect(Config) ->
  607:     try
  608:         escalus:fresh_story(
  609:           Config, [{alice, 1}, {eve, 1}],
  610:           fun(Alice, Eve) ->
  611:                   escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
  612: 
  613:                   escalus_client:wait_for_stanza(Eve),
  614: 
  615:                   ok = rpc(asia_node, application, stop, [mongooseim]),
  616:                   %% TODO: Stopping mongooseim alone should probably stop connections too
  617:                   ok = rpc(asia_node, application, stop, [ranch]),
  618: 
  619:                   escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
  620:                   Error = escalus:wait_for_stanza(Alice),
  621:                   escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
  622:           end)
  623:     after
  624:         rpc(asia_node, application, start, [ranch]),
  625:         rpc(asia_node, application, start, [mongooseim])
  626:     end.
  627: 
  628: test_pm_with_disconnection_on_other_server(Config) ->
  629:     escalus:fresh_story(
  630:       Config, [{alice, 1}, {eve, 1}],
  631:       fun(Alice, Eve) ->
  632:               escalus_connection:stop(Eve),
  633:               escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
  634:               FromAliceBounce = escalus_client:wait_for_stanza(Alice, 15000),
  635:               escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], FromAliceBounce)
  636:       end).
  637: 
  638: test_pm_with_graceful_reconnection_to_different_server(Config) ->
  639:     EveSpec = ?config(evespec_reg, Config),
  640:     EveSpec2 = ?config(evespec_mim, Config),
  641:     escalus:fresh_story(
  642:       Config, [{alice, 1}],
  643:       fun(Alice) ->
  644:               Eve = connect_from_spec(EveSpec, Config),
  645: 
  646:               escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi from Asia!">>)),
  647: 
  648:               %% Stop connection and wait for process to die
  649:               EveNode = ct:get_config({hosts, reg, node}),
  650:               mongoose_helper:logout_user(Config, Eve, #{node => EveNode}),
  651: 
  652:               FromEve = escalus_client:wait_for_stanza(Alice),
  653: 
  654:               %% Pause Alice until Eve is reconnected
  655:               AliceNode = ct:get_config({hosts, mim, node}),
  656:               C2sPid = mongoose_helper:get_session_pid(Alice, #{node => AliceNode}),
  657:               ok = rpc(asia_node, sys, suspend, [C2sPid]),
  658: 
  659:               escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi from Europe1!">>)),
  660: 
  661:               NewEve = connect_from_spec(EveSpec2, Config),
  662:               EveNode2 = ct:get_config({hosts, mim, node}),
  663:               wait_for_registration(NewEve, EveNode2),
  664: 
  665:               ok = rpc(asia_node, sys, resume, [C2sPid]),
  666: 
  667: 
  668:               escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi again from Europe1!">>)),
  669:               escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi again from Asia!">>)),
  670: 
  671:               FirstFromAlice = escalus_client:wait_for_stanza(NewEve),
  672:               AgainFromEve = escalus_client:wait_for_stanza(Alice),
  673:               SecondFromAlice = escalus_client:wait_for_stanza(NewEve),
  674: 
  675:               [FromAlice, AgainFromAlice] = order_by_seqnum([FirstFromAlice, SecondFromAlice]),
  676: 
  677:               escalus:assert(is_chat_message, [<<"Hi from Europe1!">>], FromAlice),
  678:               escalus:assert(is_chat_message, [<<"Hi from Asia!">>], FromEve),
  679:               escalus:assert(is_chat_message, [<<"Hi again from Europe1!">>], AgainFromAlice),
  680:               escalus:assert(is_chat_message, [<<"Hi again from Asia!">>], AgainFromEve)
  681:           end).
  682: 
%% The refresh logic can cause two possible behaviours.
%% We test both behaviours here (plus the no-refresh case).
%% See PR #2392.
  686: test_pm_with_ungraceful_reconnection_to_different_server(Config) ->
  687:     %% No refresh
  688:     BeforeResume = fun() -> ok end,
  689:     AfterCheck = fun(Alice, NewEve) ->
  690:             user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
  691:             user_receives(Alice, [<<"Hi from Europe!">>])
  692:          end,
  693:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  694: 
  695: test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first(Config) ->
  696:     %% Same as no refresh
  697:     RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first",
    %% The order of nodes passed to refresh_hosts is important here!
  699:     BeforeResume = fun() -> refresh_hosts([asia_node, europe_node1], RefreshReason) end,
  700:     AfterCheck = fun(Alice, NewEve) ->
  701:             user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
  702:             user_receives(Alice, [<<"Hi from Europe!">>])
  703:          end,
  704:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  705: 
  706: test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first(Config) ->
    %% The asia node overrides the Europe value with an older one,
    %% so we lose some messages during rerouting :(
  709:     RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first",
  710:     BeforeResume = fun() -> refresh_hosts([europe_node1, asia_node], RefreshReason) end,
  711:     AfterCheck = fun(Alice, NewEve) ->
  712:             user_receives(NewEve, [<<"Hi again from Europe1!">>]),
  713:             user_receives(Alice, [<<"Hi from Europe!">>])
  714:          end,
  715:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  716: 
  717: %% Reconnect Eve from asia (reg cluster) to europe (mim)
  718: do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResume, AfterCheck) ->
  719:     Config = escalus_users:update_userspec(Config0, eve, stream_management, true),
  720:     EveSpec = ?config(evespec_reg, Config),
  721:     EveSpec2 = ?config(evespec_mim, Config),
  722:     escalus:fresh_story(
  723:       Config, [{alice, 1}],
  724:       fun(Alice) ->
  725:               {ok, Eve, _} = escalus_connection:start(EveSpec, connect_steps_with_sm()),
  726:               escalus_story:send_initial_presence(Eve),
  727:               escalus_client:wait_for_stanza(Eve),
  728: 
  729:               %% Stop connection and wait for process to die
  730:               EveNode = ct:get_config({hosts, reg, node}),
  731:               C2sPid = mongoose_helper:get_session_pid(Eve, #{node => EveNode}),
  732:               ok = rpc(asia_node, sys, suspend, [C2sPid]),
  733: 
  734:               escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi from Europe1!">>)),
  735: 
  736:               %% Wait for route message to be queued in c2s message queue
  737:               mongoose_helper:wait_for_route_message_count(C2sPid, 1),
  738: 
  739:               %% Time to do bad nasty things with our socket, so once our process wakes up,
  740:               %% it SHOULD detect a dead socket
  741:               escalus_connection:kill(Eve),
  742: 
              %% Connect another one; we hope the message will be rerouted
  744:               NewEve = connect_from_spec(EveSpec2, Config),
  745:               EveNode2 = ct:get_config({hosts, mim, node}),
  746:               wait_for_registration(NewEve, EveNode2),
  747: 
  748:               BeforeResume(),
  749: 
  750:               %% Trigger rerouting
  751:               ok = rpc(asia_node, sys, resume, [C2sPid]),
  752:               C2sPid ! resume_timeout,
  753: 
              %% Let C2sPid process the message and reroute it (and finally die, poor little thing)
  755:               mongoose_helper:wait_for_pid_to_die(C2sPid),
  756: 
  757:               escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi again from Europe1!">>)),
  758:               escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi from Europe!">>)),
  759: 
  760:               AfterCheck(Alice, NewEve)
  761:           end).
  762: 
  763: test_global_disco(Config) ->
  764:     escalus:fresh_story(
  765:       Config, [{alice, 1}, {eve, 1}],
  766:       fun(Alice, Eve) ->
  767:               AliceServer = escalus_client:server(Alice),
  768:               escalus:send(Alice, escalus_stanza:service_discovery(AliceServer)),
  769:               _AliceStanza = escalus:wait_for_stanza(Alice),
  770:               %% TODO: test for duplicate components
  771:               %%escalus:assert(fun has_exactly_one_service/2, [muc_helper:muc_host()], AliceStanza),
  772: 
  773:               EveServer = escalus_client:server(Eve),
  774:               escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
  775:               EveStanza = escalus:wait_for_stanza(Eve),
  776:               escalus:assert(has_service, [muc_helper:muc_host()], EveStanza)
  777:       end).
  778: 
  779: test_component_unregister(_Config) ->
  780:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  781:                        {port, service_port()}, {component, <<"test_service">>}],
  782: 
  783:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  784:     ?assertMatch({ok, _}, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
  785:                               [<<"test_service.localhost">>])),
  786: 
  787:     component_helper:disconnect_component(Comp, Addr),
  788: 
  789:     ?assertEqual(error, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
  790:                             [<<"test_service.localhost">>])).
  791: 
  792: test_error_on_wrong_hosts(_Config) ->
  793:     Opts = module_opts(#{local_host => <<"no_such_host">>}),
  794:     ?assertException(error, {badrpc, {'EXIT', {#{what := check_host_failed,
  795:                                                  domain := <<"no_such_host">>}, _}}},
  796:                      dynamic_modules:ensure_modules(node_spec(europe_node1), <<"localhost">>,
  797:                                                     [{mod_global_distrib, Opts}])).
  798: 
  799: refresh_nodes(Config) ->
  800:     NodesKey = ?config(nodes_key, Config),
  801:     NodeBin = ?config(node_to_expire, Config),
  802:     redis_query(europe_node1, [<<"HSET">>, NodesKey, NodeBin, <<"0">>]),
  803:     refresh_mappings(europe_node1, "by_refresh_nodes"),
  804:     {ok, undefined} = redis_query(europe_node1, [<<"HGET">>, NodesKey, NodeBin]).
  805: 
  806: test_in_order_messages_on_multiple_connections(Config) ->
  807:     escalus:fresh_story(
  808:       Config, [{alice, 1}, {eve, 1}],
  809:       fun(Alice, Eve) ->
  810:               Seq = lists:seq(1, 100),
  811:               lists:foreach(
  812:                 fun(I) ->
  813:                         Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
  814:                         escalus_client:send(Alice, Stanza)
  815:                 end,
  816:                 Seq),
  817:               lists:foreach(
  818:                 fun(I) ->
  819:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  820:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  821:                 end,
  822:                 Seq)
  823:       end).
  824: 
  825: test_in_order_messages_on_multiple_connections_with_bounce(Config) ->
  826:     escalus:fresh_story(
  827:       Config, [{alice, 1}, {eve, 1}],
  828:       fun(Alice, Eve) ->
              %% Send 99 messages, some while the server knows the mapping and some while it doesn't
  830:               send_steps(Alice, Eve, 99, <<"reg1">>),
  831:               %% Make sure that the last message is sent when the mapping is known
  832:               set_mapping(europe_node1, Eve, <<"reg1">>),
  833:               escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"100">>)),
  834: 
  835:               %% Check that all stanzas were received in order
  836:               lists:foreach(
  837:                 fun(I) ->
  838:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  839:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  840:                 end,
  841:                 lists:seq(1, 100))
  842:       end).
  843: 
  844: test_messages_bounced_in_order(Config) ->
  845:     escalus:fresh_story(
  846:       Config, [{alice, 1}, {eve, 1}],
  847:       fun(Alice, Eve) ->
  848:               %% Make sure all messages land in bounce storage
  849:               delete_mapping(europe_node1, Eve),
  850: 
  851:               Seq = lists:seq(1, 100),
  852:               lists:foreach(
  853:                 fun(I) ->
  854:                         Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
  855:                         escalus_client:send(Alice, Stanza)
  856:                 end,
  857:                 Seq),
  858: 
  859:               %% Restore the mapping so that bounce eventually succeeds
  860:               ?assertEqual(undefined, get_mapping(europe_node1, Eve)),
  861:               set_mapping(europe_node1, Eve, <<"reg1">>),
  862: 
  863:               lists:foreach(
  864:                 fun(I) ->
  865:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  866:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  867:                 end,
  868:                 Seq)
  869:       end).
  870: 
  871: test_update_senders_host(Config) ->
  872:     escalus:fresh_story(
  873:       Config, [{alice, 1}, {eve, 1}],
  874:       fun(Alice, Eve) ->
  875:               AliceJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Alice)]),
  876:               {ok, <<"localhost.bis">>}
  877:               = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid]),
  878:               ok = rpc(europe_node1, mod_global_distrib_mapping, delete_for_jid, [AliceJid]),
  879:               wait_for_node(asia_node, AliceJid),
  880: 
  881:               %% TODO: Should prevent Redis refresher from executing for a moment,
  882:               %%       as it may collide with this test.
  883: 
  884:               escalus:send(Alice, escalus_stanza:chat_to(Eve, <<"test_update_senders_host">>)),
  885:               escalus:wait_for_stanza(Eve),
  886: 
  887:               {ok, <<"localhost.bis">>}
  888:               = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid])
      end).

wait_for_node(Node, Jid) ->
  891:     mongoose_helper:wait_until(fun() -> rpc(Node, mod_global_distrib_mapping, for_jid, [Jid]) end,
  892:                                error,
  893:                                #{time_left => timer:seconds(10),
  894:                                  sleep_time => timer:seconds(1),
  895:                                  name => rpc}).
  896: 
  897: test_update_senders_host_by_ejd_service(Config) ->
  898:     refresh_hosts([europe_node1, europe_node2, asia_node], "by_test_update_senders_host_by_ejd_service"),
  899:     %% Connects to europe_node1
  900:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  901:                        {port, service_port()}, {component, <<"test_service">>}],
  902: 
  903:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  904: 
  905:     escalus:fresh_story(
  906:       Config, [{eve, 1}],
  907:       fun(Eve) ->
  908:               %% Eve is connected to asia_node
  909:               EveJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Eve)]),
  910:               {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
  911:               {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid]),
  912: 
  913:               ok = rpc(asia_node, mod_global_distrib_mapping, delete_for_jid, [EveJid]),
  914:               wait_for_node(europe_node1, EveJid),
  915:               wait_for_node(europe_node2, EveJid),
  916: 
  917:               %% Component is connected to europe_node1
  918:               %% but we force asia_node to connect to europe_node2 by hiding europe_node1
  919:               %% and forcing rebalance (effectively disabling connections to europe_node1)
  920:               %% to verify routing cache update on both nodes
  921: 
  922:               %% TODO: Should prevent Redis refresher from executing for a moment,
  923:               %%       as it may collide with this test.
  924: 
  925:               hide_node(europe_node1, Config),
  926:               {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  927:               trigger_rebalance(asia_node, EuropeHost),
  928: 
  929:               escalus:send(Eve, escalus_stanza:chat_to(Addr, <<"hi">>)),
  930:               escalus:wait_for_stanza(Comp),
  931: 
  932:               {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
  933:               {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid])
  934:       end).
  935: 
  936: %% -------------------------------- Rebalancing --------------------------------
  937: 
  938: enable_new_endpoint_on_refresh(Config) ->
  939:     get_connection(europe_node1, <<"reg1">>),
  940: 
  941:     {Enabled1, _Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
  942: 
  943:     ExtraPort = get_port(reg, gd_extra_endpoint_port),
  944:     NewEndpoint = resolved_endpoint(ExtraPort),
  945:     enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),
  946: 
  947:     {Enabled2, _Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),
  948: 
  949:     %% One new pool and one new endpoint
  950:     [NewEndpoint] = Pools2 -- Pools1,
  951:     [] = Pools1 -- Pools2,
  952:     [NewEndpoint] = Enabled2 -- Enabled1,
  953:     [] = Enabled1 -- Enabled2.
  954: 
  955: disable_endpoint_on_refresh(Config) ->
  956:     ExtraPort = get_port(reg, gd_extra_endpoint_port),
  957:     NewEndpoint = resolved_endpoint(ExtraPort),
  958:     enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),
  959: 
  960:     get_connection(europe_node1, <<"reg1">>),
  961: 
  962:     {Enabled1, Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
  963:     [_, _] = Enabled1,
  964:     [] = Disabled1,
  965: 
  966:     hide_extra_endpoint(asia_node),
  967:     trigger_rebalance(europe_node1, <<"reg1">>),
  968: 
  969:     {Enabled2, Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),
  970: 
  971:     %% 2 pools open even after disable
  972:     [] = Pools1 -- Pools2,
  973:     [] = Pools2 -- Pools1,
  974:     %% NewEndpoint is no longer enabled
  975:     [] = Enabled2 -- Enabled1,
  976:     [NewEndpoint] = Enabled1 -- Enabled2,
  977:     %% NewEndpoint is now disabled
  978:     [] = Disabled1,
  979:     [NewEndpoint] = Disabled2.
  980: 
  981: wait_for_connection(_Config) ->
  982:     set_endpoints(asia_node, []),
  983:     %% Because of hosts refresher, a pool of connections to asia_node
  984:     %% may already be present here
    mongoose_helper:wait_until(
      fun() ->
              try trigger_rebalance(europe_node1, <<"reg1">>), true
              catch _:_ -> false end
      end,
      true,
      #{name => rebalance, time_left => timer:seconds(5)}),
  992: 
  993:     spawn_connection_getter(europe_node1),
  994: 
  995:     receive
  996:         Unexpected1 -> error({unexpected, Unexpected1})
  997:     after
  998:         2000 -> ok
  999:     end,
 1000: 
 1001:     refresh_mappings(asia_node, "by_wait_for_connection"),
 1002:     trigger_rebalance(europe_node1, <<"reg1">>),
 1003: 
 1004:     receive
 1005:         Conn when is_pid(Conn) -> ok;
 1006:         Unexpected2 -> error({unexpected, Unexpected2})
 1007:     after
 1008:         5000 -> error(timeout)
 1009:     end.
 1010: 
 1011: closed_connection_is_removed_from_disabled(_Config) ->
 1012:     get_connection(europe_node1, <<"reg1">>),
 1013:     set_endpoints(asia_node, []),
 1014:     trigger_rebalance(europe_node1, <<"reg1">>),
 1015: 
 1016:     {[], [_], [_]} = get_outgoing_connections(europe_node1, <<"reg1">>),
 1017: 
    %% Will drop connections and prevent them from reconnecting
    restart_receiver(asia_node, [get_port(reg, gd_supplementary_endpoint_port)]),

    mongoose_helper:wait_until(fun() -> get_outgoing_connections(europe_node1, <<"reg1">>) end,
                               {[], [], []},
                               #{name => get_outgoing_connections}).
 1024: 
 1025: 
 1026: %%--------------------------------------------------------------------
 1027: %% Test helpers
 1028: %%--------------------------------------------------------------------
 1029: 
 1030: get_port(Host, Param) ->
 1031:     case ct:get_config({hosts, Host, Param}) of
 1032:         Port when is_integer(Port) ->
 1033:             Port;
 1034:         Other ->
 1035:             ct:fail({get_port_failed, Host, Param, Other})
 1036:     end.
 1037: 
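%% Each entry is {ConfigNodeName, LocalHost, ReceiverPort}: the ct config alias
%% of the node, its GD local_host, and the port its GD receiver listens on.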
 1038: get_hosts() ->
 1039:     [
 1040:      {europe_node1, <<"localhost.bis">>, get_port(mim, gd_endpoint_port)},
 1041:      {europe_node2, <<"localhost.bis">>, get_port(mim2, gd_endpoint_port)},
 1042:      {asia_node, <<"reg1">>, get_port(reg, gd_endpoint_port)}
 1043:     ].
 1044: 
 1045: listen_endpoint(NodeName) ->
 1046:     endpoint(listen_port(NodeName)).
 1047: 
 1048: listen_port(NodeName) ->
 1049:     {_, _, Port} = lists:keyfind(NodeName, 1, get_hosts()),
 1050:     Port.
 1051: 
 1052: resolved_endpoint(Port) when is_integer(Port) ->
 1053:     {{127, 0, 0, 1}, Port}.
 1054: 
 1055: endpoint(Port) when is_integer(Port) ->
 1056:     {"127.0.0.1", Port}.
 1057: 
 1058: %% For dynamic_modules
 1059: node_spec(NodeName) ->
 1060:     #{node => ct:get_config(NodeName), timeout => timer:seconds(30)}.
 1061: 
 1062: rpc(NodeName, M, F, A) ->
 1063:     Node = ct:get_config(NodeName),
 1064:     mongoose_helper:successful_rpc(#{node => Node}, M, F, A, timer:seconds(30)).
 1065: 
 1066: hide_node(NodeName, Config) ->
 1067:     NodesKey = ?config(nodes_key, Config),
 1068:     NodeBin = atom_to_binary(ct:get_config(NodeName), latin1),
 1069:     {ok, <<"1">>} = redis_query(europe_node1, [<<"HDEL">>, NodesKey, NodeBin]).
 1070: 
 1071: connect_from_spec(UserSpec, Config) ->
 1072:     {ok, User} = escalus_client:start(Config, UserSpec, <<"res1">>),
 1073:     escalus_connection:set_filter_predicate(User, fun(S) -> not escalus_pred:is_presence(S) end),
 1074:     escalus_story:send_initial_presence(User),
 1075:     User.
 1076: 
 1077: chat_with_seqnum(To, Text) ->
 1078:     escalus_stanza:set_id(escalus_stanza:chat_to(To, Text),
 1079:                           integer_to_binary(erlang:monotonic_time())).
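
%% The id attribute doubles as a sequence number (a monotonic timestamp), so
%% order_by_seqnum/1 can restore the sending order by comparing ids numerically.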
 1080: 
order_by_seqnum(Stanzas) ->
    lists:sort(fun(A, B) ->
                       binary_to_integer(exml_query:attr(A, <<"id">>)) <
                           binary_to_integer(exml_query:attr(B, <<"id">>))
               end, Stanzas).
 1084: 
 1085: has_exactly_one_service(Service, #xmlel{children = [#xmlel{children = Services}]}) ->
 1086:     Pred = fun(Item) ->
 1087:                    exml_query:attr(Item, <<"jid">>) =:= Service
 1088:            end,
 1089:     case lists:filter(Pred, Services) of
 1090:         [_] -> true;
 1091:         _ -> false
 1092:     end.
 1093: 
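%% Send Max stanzas, toggling the recipient's mapping every Max div 10 of them,
%% so some are routed with a known mapping and some have to be bounced.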
 1094: send_steps(From, To, Max, ToHost) ->
 1095:     next_send_step(From, To, 1, Max, Max div 10, true, ToHost).
 1096: 
 1097: next_send_step(_From, _To, I, Max, _ToReset, _KnowsMapping, _ToHost) when I > Max -> ok;
 1098: next_send_step(From, To, I, Max, 0, KnowsMapping, ToHost) ->
 1099:     ct:log("Reset: I: ~B", [I]),
 1100:     case KnowsMapping of
 1101:         true -> delete_mapping(europe_node1, To);
 1102:         false -> set_mapping(europe_node1, To, ToHost)
 1103:     end,
 1104:     next_send_step(From, To, I, Max, Max div 10, not KnowsMapping, ToHost);
 1105: next_send_step(From, To, I, Max, ToReset, KnowsMapping, ToHost) ->
 1106:     ct:log("I: ~B ~B ~B", [I, Max, ToReset]),
 1107:     Stanza = escalus_stanza:chat_to(To, integer_to_binary(I)),
 1108:     escalus_client:send(From, Stanza),
 1109:     next_send_step(From, To, I + 1, Max, ToReset - 1, KnowsMapping, ToHost).
 1110: 
 1111: get_mapping(Node, Client) ->
 1112:     {FullJid, _BareJid} = jids(Client),
 1113:     {ok, What} = redis_query(Node, [<<"GET">>, FullJid]),
 1114:     What.
 1115: 
%% Warning! May not work properly with alice or any other user whose
%% stringprepped JID differs from the original one
 1118: delete_mapping(Node, Client) ->
 1119:     {FullJid, BareJid} = jids(Client),
 1120:     redis_query(Node, [<<"DEL">>, FullJid, BareJid]),
 1121:     Jid = rpc(Node, jid, from_binary, [FullJid]),
 1122:     rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).
 1123: 
 1124: set_mapping(Node, Client, Mapping) ->
 1125:     {FullJid, BareJid} = jids(Client),
 1126:     redis_query(Node, [<<"MSET">>, FullJid, Mapping, BareJid, Mapping]),
 1127:     Jid = rpc(Node, jid, from_binary, [FullJid]),
 1128:     rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).
 1129: 
 1130: jids(Client) ->
 1131:     FullJid = escalus_client:full_jid(Client),
 1132:     BareJid = escalus_client:short_jid(Client),
 1133:     {FullJid, BareJid}.
 1134: 
 1135: redis_query(Node, Query) ->
 1136:     {ok, RedisWorker} = rpc(Node, mongoose_wpool, get_worker, [redis, global, global_distrib]),
 1137:     rpc(Node, eredis, q, [RedisWorker, Query]).
 1138: 
%% A fake address that we never try to connect to.
%% Used in the test_advertised_endpoints_override_endpoints testcase.
 1141: advertised_endpoints() ->
 1142:     [
 1143:      {fake_domain(), get_port(reg, gd_endpoint_port)}
 1144:     ].
 1145: 
 1146: fake_domain() ->
 1147:     "somefakedomain.com".
 1148: 
 1149: iptuples_to_string([]) ->
 1150:     [];
 1151: iptuples_to_string([{Addr, Port} | Endps]) when is_tuple(Addr) ->
 1152:     [{inet_parse:ntoa(Addr), Port} | iptuples_to_string(Endps)];
 1153: iptuples_to_string([E | Endps]) ->
 1154:     [E | iptuples_to_string(Endps)].
 1155: 
 1156: endpoint_opts(NodeName, ReceiverPort, Config) ->
 1157:     Endpoints = [endpoint(ReceiverPort)],
 1158:     AdvertisedEndpoints =
 1159:         proplists:get_value(NodeName, ?config(add_advertised_endpoints, Config), Endpoints),
 1160:     #{endpoints => Endpoints,
 1161:       resolved_endpoints => [resolved_endpoint(ReceiverPort)],
 1162:       advertised_endpoints => AdvertisedEndpoints}.
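
%% For example (hypothetical port), endpoint_opts(asia_node, 6666, Config),
%% when the add_advertised_endpoints list in Config has no entry for the
%% node, yields:
%%   #{endpoints => [{"127.0.0.1", 6666}],
%%     resolved_endpoints => [{{127, 0, 0, 1}, 6666}],
%%     advertised_endpoints => [{"127.0.0.1", 6666}]}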
 1163: 
 1164: mock_inet_on_each_node() ->
 1165:     Nodes = lists:map(fun({NodeName, _, _}) -> ct:get_config(NodeName) end, get_hosts()),
 1166:     Results = lists:map(fun(Node) -> rpc:block_call(Node, ?MODULE, mock_inet, []) end, Nodes),
 1167:     true = lists:all(fun(Result) -> Result =:= ok end, Results).
 1168: 
 1169: execute_on_each_node(M, F, A) ->
 1170:     lists:map(fun({NodeName, _, _}) -> rpc(NodeName, M, F, A) end, get_hosts()).
 1171: 
 1172: mock_inet() ->
    %% We mock mod_global_distrib_utils rather than the inet module itself,
    %% to avoid strange networking issues
 1174:     meck:new(mod_global_distrib_utils, [non_strict, passthrough, unstick]),
 1175:     meck:expect(mod_global_distrib_utils, getaddrs, fun(_, inet) -> {ok, [{127, 0, 0, 1}]};
 1176:                                                        (_, inet6) -> {error, "No ipv6 address"} end).
 1177: 
 1178: unmock_inet(_Pids) ->
 1179:     execute_on_each_node(meck, unload, [mod_global_distrib_utils]).
 1180: 
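%% Lists the children of the outgoing connections supervisor, filtering out
%% the hosts refresher so that only per-domain connection supervisors remain.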
 1181: out_connection_sups(Node) ->
 1182:     Children = rpc(Node, supervisor, which_children, [mod_global_distrib_outgoing_conns_sup]),
 1183:     lists:filter(fun({Sup, _, _, _}) -> Sup =/= mod_global_distrib_hosts_refresher end, Children).
 1184: 
 1185: trees_for_connections_present() ->
 1186:     AsiaChildren = out_connection_sups(asia_node),
 1187:     Europe1Children = out_connection_sups(europe_node1),
 1188:     Europe2Children = out_connection_sups(europe_node2),
 1189:     lists:all(fun(Host) -> length(Host) > 0 end, [AsiaChildren, Europe1Children, Europe2Children]).
 1190: 
 1191: tree_for_sup_present(Node, ExpectedSup) ->
 1192:     Children = out_connection_sups(Node),
 1193:     lists:keyfind(ExpectedSup, 1, Children) =/= false.
 1194: 
 1195: 
 1196: %% ------------------------------- rebalancing helpers -----------------------------------
 1197: 
 1198: spawn_connection_getter(SenderNode) ->
 1199:     TestPid = self(),
 1200:     spawn(fun() ->
 1201:                   Conn = get_connection(SenderNode, <<"reg1">>),
 1202:                   TestPid ! Conn
 1203:           end).
 1204: 
 1205: enable_extra_endpoint(ListenNode, SenderNode, Port, _Config) ->
 1206:     restart_receiver(ListenNode, [Port, listen_port(ListenNode)]),
 1207:     set_endpoints(ListenNode, [Port, listen_port(ListenNode)]),
 1208:     trigger_rebalance(SenderNode, <<"reg1">>).
 1209: 
 1210: get_connection(SenderNode, ToDomain) ->
 1211:     rpc(SenderNode, mod_global_distrib_outgoing_conns_sup, get_connection, [ToDomain]).
 1212: 
 1213: hide_extra_endpoint(ListenNode) ->
 1214:     set_endpoints(ListenNode, [listen_port(ListenNode)]).
 1215: 
 1216: set_endpoints(ListenNode, Ports) ->
 1217:     Endpoints = [endpoint(Port) || Port <- Ports],
 1218:     {ok, _} = rpc(ListenNode, mod_global_distrib_mapping_redis, set_endpoints, [Endpoints]).
 1219: 
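%% Returns {EnabledEndpoints, DisabledEndpoints, ConnectionPoolIds} for the
%% given destination domain; {[], [], []} means the node has no outgoing
%% connection state left for that domain.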
 1220: get_outgoing_connections(NodeName, DestinationDomain) ->
 1221:     Supervisor = rpc(NodeName, mod_global_distrib_utils, server_to_sup_name, [DestinationDomain]),
 1222:     Manager = rpc(NodeName, mod_global_distrib_utils, server_to_mgr_name, [DestinationDomain]),
 1223:     Enabled = rpc(NodeName, mod_global_distrib_server_mgr,
 1224:                   get_enabled_endpoints, [DestinationDomain]),
 1225:     Disabled = rpc(NodeName, mod_global_distrib_server_mgr,
 1226:                    get_disabled_endpoints, [DestinationDomain]),
 1227:     PoolsChildren = rpc(NodeName, supervisor, which_children, [Supervisor]),
 1228:     Pools = [ Id || {Id, _Child, _Type, _Modules} <- PoolsChildren, Id /= Manager ],
 1229:     {Enabled, Disabled, Pools}.
 1230: 
 1231: restart_receiver(NodeName) ->
 1232:     restart_receiver(NodeName, [listen_port(NodeName)]).
 1233: 
 1234: restart_receiver(NodeName, NewPorts) ->
 1235:     OldOpts = #{connections := OldConnOpts} = rpc(NodeName, gen_mod, get_module_opts,
 1236:                                                   [<<"localhost">>, mod_global_distrib_receiver]),
 1237:     NewConnOpts = OldConnOpts#{endpoints := [endpoint(Port) || Port <- NewPorts],
 1238:                                resolved_endpoints := [resolved_endpoint(Port) || Port <- NewPorts]},
 1239:     NewOpts = OldOpts#{connections := NewConnOpts},
 1240:     Node = node_spec(NodeName),
 1241:     dynamic_modules:restart(Node, <<"localhost">>, mod_global_distrib_receiver, NewOpts).
 1242: 
 1243: trigger_rebalance(NodeName, DestinationDomain) when is_binary(DestinationDomain) ->
    %% Ensure that the manager exists;
    %% otherwise the force_refresh call could fail with a noproc error
 1246:     ok = rpc(NodeName, mod_global_distrib_outgoing_conns_sup,
 1247:              ensure_server_started, [DestinationDomain]),
 1248:     rpc(NodeName, mod_global_distrib_server_mgr, force_refresh, [DestinationDomain]),
 1249:     StateInfo = rpc(NodeName, mod_global_distrib_server_mgr, get_state_info, [DestinationDomain]),
 1250:     ct:log("mgr_state_info_after_rebalance nodename=~p state_info=~p", [NodeName, StateInfo]),
 1251:     timer:sleep(1000).
 1252: 
 1253: %% -----------------------------------------------------------------------
 1254: %% Escalus-related helpers
 1255: 
 1256: user_receives(User, Bodies) ->
 1257:     ExpectedLength = length(Bodies),
 1258:     Messages = escalus_client:wait_for_stanzas(User, ExpectedLength),
 1259:     SortedMessages = order_by_seqnum(Messages),
 1260:     case length(Messages) of
 1261:         ExpectedLength ->
            Checks = [escalus_pred:is_chat_message(Body, Stanza)
                      || {Body, Stanza} <- lists:zip(Bodies, SortedMessages)],
 1263:             case lists:all(fun(Check) -> Check end, Checks) of
 1264:                 true ->
 1265:                     ok;
 1266:                 false ->
 1267:                     ct:fail({user_receives_failed, {wanted, Bodies}, {received, SortedMessages}, {check, Checks}})
 1268:             end;
 1269:         _ ->
 1270:             ct:fail({user_receives_not_enough, {wanted, Bodies}, {received, SortedMessages}})
 1271:     end.
 1272: 
 1273: 
 1274: %% -----------------------------------------------------------------------
 1275: %% Refreshing helpers
 1276: 
%% Reason is a string.
%% NodeName is asia_node, europe_node2, ... in the node-name format used by this suite.
 1279: refresh_mappings(NodeName, Reason) when is_list(Reason) ->
 1280:     rpc(NodeName, mod_global_distrib_mapping_redis, refresh, [Reason]).
 1281: 
 1282: refresh_hosts(NodeNames, Reason) ->
    [refresh_mappings(NodeName, Reason) || NodeName <- NodeNames].
 1284: 
 1285: 
 1286: %% -----------------------------------------------------------------------
 1287: %% Other helpers
 1288: 
 1289: connect_steps_with_sm() ->
 1290:     [start_stream, stream_features, maybe_use_ssl,
 1291:      authenticate, bind, session, stream_resumption].
 1292: 
 1293: bare_client(Client) ->
 1294:     Client#client{jid = escalus_utils:get_short_jid(Client)}.
 1295: 
 1296: service_port() ->
 1297:     ct:get_config({hosts, mim, service_port}).
 1298: 
 1299: 
 1300: %% -----------------------------------------------------------------------
 1301: %% Waiting helpers
 1302: 
 1303: wait_for_domain(Node, Domain) ->
 1304:     F = fun() ->
 1305:                 Domains = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
 1306:                 lists:member(Domain, Domains)
 1307:         end,
 1308:     mongoose_helper:wait_until(F, true, #{name => {wait_for_domain, Node, Domain}}).
 1309: 
%% We receive presence BEFORE the session is registered in ejabberd_sm.
%% So, to ensure that do_open_session has been processed completely, we send
%% a "ping" by making a synchronous call to the c2s process. That call only
%% returns once all earlier messages in the process mailbox have been handled.
 1315: wait_for_registration(Client, Node) ->
 1316:     RPCSpec = #{node => Node},
 1317:     mongoose_helper:wait_until(fun() -> is_pid(mongoose_helper:get_session_pid(Client, RPCSpec)) end, true,
 1318:                                #{name => wait_for_session}),
 1319:     C2sPid = mongoose_helper:get_session_pid(Client, RPCSpec),
 1320:     rpc:call(node(C2sPid), ejabberd_c2s, get_info, [C2sPid]),
 1321:     ok.
 1322: 
 1323: 
 1324: %% -----------------------------------------------------------------------
 1325: %% Ensure, that endpoints are up
 1326: 
 1327: wait_for_listeners_to_appear() ->
 1328:     [wait_for_can_connect_to_port(Port) || Port <- receiver_ports(get_hosts())].
 1329: 
 1330: receiver_ports(Hosts) ->
 1331:     lists:map(fun({_NodeName, _LocalHost, ReceiverPort}) -> ReceiverPort end, Hosts).
 1332: 
 1333: wait_for_can_connect_to_port(Port) ->
 1334:     Opts = #{time_left => timer:seconds(30), sleep_time => 1000, name => {can_connect_to_port, Port}},
 1335:     mongoose_helper:wait_until(fun() -> can_connect_to_port(Port) end, true, Opts).
 1336: 
 1337: can_connect_to_port(Port) ->
 1338:     case gen_tcp:connect("127.0.0.1", Port, []) of
 1339:         {ok, Sock} ->
 1340:             gen_tcp:close(Sock),
 1341:             true;
 1342:         Other ->
 1343:             ct:pal("can_connect_to_port port=~p result=~p", [Port, Other]),
 1344:             false
 1345:     end.
 1346: 
 1347: %% -----------------------------------------------------------------------
 1348: %% Custom log levels for GD modules during the tests
 1349: 
 1350: enable_logging() ->
 1351:     mim_loglevel:enable_logging(test_hosts(), custom_loglevels()).
 1352: 
 1353: disable_logging() ->
 1354:     mim_loglevel:disable_logging(test_hosts(), custom_loglevels()).
 1355: 
custom_loglevels() ->
    [%% for "s2s connection to muc.localhost not found" debugging
     {ejabberd_s2s, debug},
     %% for debugging event=refreshing_own_data_done
     {mod_global_distrib_mapping_redis, info},
     %% to know if a connection is already started or would be started
     %% event=outgoing_conn_start_progress
     {mod_global_distrib_outgoing_conns_sup, info},
     %% to debug bound connection issues
     {mod_global_distrib, debug},
     %% to know the pids of all new connections
     {mod_global_distrib_connection, debug},
     %% to check if gc or refresh is triggered
     {mod_global_distrib_server_mgr, info},
     %% to debug incoming connections
     %% {mod_global_distrib_receiver, info},
     %% to debug global session set/delete
     {mod_global_distrib_mapping, debug}
    ].
 1375: 
 1376: test_hosts() -> [mim, mim2, reg].