1: %%==============================================================================
    2: %% Copyright 2017 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mod_global_distrib_SUITE).
   18: 
   19: -compile([export_all, nowarn_export_all]).
   20: 
   21: -include_lib("escalus/include/escalus.hrl").
   22: -include_lib("common_test/include/ct.hrl").
   23: -include_lib("eunit/include/eunit.hrl").
   24: -include_lib("escalus/include/escalus_xmlns.hrl").
   25: -include_lib("exml/include/exml.hrl").
   26: 
   27: -define(HOSTS_REFRESH_INTERVAL, 200). %% in ms
   28: 
   29: -import(domain_helper, [domain/0]).
   30: -import(config_parser_helper, [config/2, mod_config/2]).
   31: 
   32: %%--------------------------------------------------------------------
   33: %% Suite configuration
   34: %%--------------------------------------------------------------------
   35: 
   36: all() ->
   37:     [
   38:      {group, mod_global_distrib},
   39:      {group, cluster_restart},
   40:      {group, start_checks},
   41:      {group, invalidation},
   42:      {group, multi_connection},
   43:      {group, rebalancing},
   44:      {group, advertised_endpoints},
   45:      {group, hosts_refresher}
   46:     ].
   47: 
   48: groups() ->
   49:     [{mod_global_distrib, [],
   50:           [
   51:            test_pm_between_users_at_different_locations,
   52:            test_pm_between_users_before_available_presence,
           test_component_disconnect,
   54:            test_component_on_one_host,
   55:            test_components_in_different_regions,
   56:            test_hidden_component_disco_in_different_region,
   57:            test_pm_with_disconnection_on_other_server,
   58:            test_pm_with_graceful_reconnection_to_different_server,
   59:            test_pm_with_ungraceful_reconnection_to_different_server,
   60:            test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first,
   61:            test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first,
   62:            test_component_unregister,
   63:            test_update_senders_host,
   64:            test_update_senders_host_by_ejd_service,
   65: 
   66:            %% with node 2 disabled
   67:            test_muc_conversation_on_one_host,
   68:            test_global_disco
   69:           ]},
   70:          {hosts_refresher, [],
   71:           [test_host_refreshing]},
   72:          {cluster_restart, [],
   73:           [
   74:            test_location_disconnect
   75:           ]},
   76:          {start_checks, [],
   77:           [
   78:            test_error_on_wrong_hosts
   79:           ]},
   80:          {invalidation, [],
   81:           [
           %% TODO: Add checks for other mapping refreshes
   83:            refresh_nodes
   84:           ]},
   85:          {multi_connection, [],
   86:           [
   87:            test_in_order_messages_on_multiple_connections,
   88:            test_in_order_messages_on_multiple_connections_with_bounce,
   89:            test_messages_bounced_in_order,
   90: 
   91:            %% with node 2 disabled
   92:            test_muc_conversation_history
   93:           ]},
   94:          {rebalancing, [],
   95:           [
   96:            enable_new_endpoint_on_refresh,
   97:            disable_endpoint_on_refresh,
   98:            wait_for_connection,
   99:            closed_connection_is_removed_from_disabled
  100:           ]},
  101:          {advertised_endpoints, [],
  102:           [
  103:            test_advertised_endpoints_override_endpoints,
  104:            test_pm_between_users_at_different_locations
  105:           ]}
  106:     ].
  107: 
  108: suite() ->
  109:     [{require, europe_node1, {hosts, mim, node}},
  110:      {require, europe_node2, {hosts, mim2, node}},
  111:      {require, asia_node, {hosts, reg, node}},
  112:      {require, c2s_port, {hosts, mim, c2s_port}} |
  113:      escalus:suite()].
  114: 
  115: %%--------------------------------------------------------------------
  116: %% Init & teardown
  117: %%--------------------------------------------------------------------
  118: 
  119: init_per_suite(Config) ->
  120:     case {rpc(europe_node1, mongoose_wpool, get_worker, [redis, global, global_distrib]),
  121:           rpc(asia_node, mongoose_wpool, get_worker, [redis, global, global_distrib])} of
  122:         {{ok, _}, {ok, _}} ->
  123:             case ct_helper:get_internal_database() of
  124:                 mnesia ->
  125:                     ok = rpc(europe_node2, mongoose_cluster, join, [ct:get_config(europe_node1)]);
  126:                 _ ->
  127:                     ok
  128:             end,
  129:             enable_logging(),
  130:             escalus:init_per_suite([{add_advertised_endpoints, []}, {extra_config, #{}} | Config]);
  131:         Result ->
  132:             ct:pal("Redis check result: ~p", [Result]),
  133:             {skip, "GD Redis default pool not available"}
  134:     end.
  135: 
  136: end_per_suite(Config) ->
  137:     disable_logging(),
  138:     escalus_fresh:clean(),
  139:     rpc(europe_node2, mongoose_cluster, leave, []),
  140:     escalus:end_per_suite(Config).
  141: 
  142: init_per_group(start_checks, Config) ->
  143:     NodeName = europe_node1,
  144:     NodeSpec = node_spec(NodeName),
  145:     Config1 = dynamic_modules:save_modules(NodeSpec, [domain()], Config),
  146:     dynamic_modules:ensure_modules(NodeSpec, domain(), [{mod_global_distrib, stopped}]),
  147:     Config1;
  148: init_per_group(multi_connection, Config) ->
  149:     ExtraConfig = #{bounce => #{resend_after_ms => 20000},
  150:                     connections => #{%% Disable unused feature to avoid interference
  151:                                      connections_per_endpoint => 100,
  152:                                      disabled_gc_interval => 10000}},
  153:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
init_per_group(invalidation, Config) ->
    Config1 = init_per_group_generic(Config),
    NodeBin = <<"fake_node@localhost">>,
    [{node_to_expire, NodeBin} | Config1];
  158: init_per_group(rebalancing, Config) ->
    %% We need to prevent automatic refreshes, because they may interfere with tests,
    %% and we need frequent garbage collection of disabled connections to check its validity.
  161:     ExtraConfig = #{connections => #{endpoint_refresh_interval => 3600,
  162:                                      endpoint_refresh_interval_when_empty => 3600,
  163:                                      disabled_gc_interval => 1},
  164:                     redis => #{refresh_after => 3600}},
  165:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  166: init_per_group(advertised_endpoints, Config) ->
  167:     lists:foreach(fun({NodeName, _, _}) ->
  168:                           Node = ct:get_config(NodeName),
  169:                           mongoose_helper:inject_module(#{node => Node}, ?MODULE, reload)
  170:                   end, get_hosts()),
  171:     mock_inet_on_each_node(),
  172:     init_per_group_generic(
  173:                [{add_advertised_endpoints,
  174:                  [{asia_node, advertised_endpoints()}]} | Config]);
  175: init_per_group(mod_global_distrib, Config) ->
  176:     %% Disable mod_global_distrib_mapping_redis refresher
  177:     ExtraConfig = #{redis => #{refresh_after => 3600}},
  178:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  179: init_per_group(_, Config) ->
  180:     init_per_group_generic(Config).
  181: 
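%% Configures mod_global_distrib on every node from get_hosts() and records the
%% Redis key of the nodes hash, so tests can manipulate node visibility later.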
  182: init_per_group_generic(Config0) ->
  183:     Config2 = lists:foldl(fun init_modules_per_node/2, Config0, get_hosts()),
  184:     wait_for_listeners_to_appear(),
  185:     {SomeNode, _, _} = hd(get_hosts()),
  186:     NodesKey = rpc(SomeNode, mod_global_distrib_mapping_redis, nodes_key, []),
  187:     [{nodes_key, NodesKey}, {escalus_user_db, xmpp} | Config2].
  188: 
  189: init_modules_per_node({NodeName, LocalHost, ReceiverPort}, Config0) ->
  190:     Extra0 = module_opts(?config(extra_config, Config0)),
  191:     EndpointOpts = endpoint_opts(NodeName, ReceiverPort, Config0),
  192:     ConnExtra = maps:merge(EndpointOpts, maps:get(connections, Extra0, #{})),
  193:     Extra = Extra0#{local_host => LocalHost, connections => ConnExtra},
  194:     Opts = module_opts(Extra),
  195: 
  196:     VirtHosts = virtual_hosts(NodeName),
  197:     Node = node_spec(NodeName),
  198:     Config1 = dynamic_modules:save_modules(Node, VirtHosts, Config0),
  199: 
  200:     %% To reduce load when sending many messages
  201:     ModulesToStop = [mod_offline, mod_blocking, mod_privacy, mod_roster, mod_last,
  202:                      mod_stream_management],
  203:     [dynamic_modules:ensure_stopped(Node, VirtHost, ModulesToStop) || VirtHost <- VirtHosts],
  204: 
  205:     SMBackend = ct_helper:get_internal_database(),
  206:     SMOpts = config_parser_helper:mod_config(mod_stream_management,
  207:                                              #{resume_timeout => 1, backend => SMBackend}),
  208:     dynamic_modules:ensure_modules(Node, domain(), [{mod_global_distrib, Opts},
  209:                                                     {mod_stream_management, SMOpts}]),
  210:     Config1.
  211: 
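%% Builds the complete mod_global_distrib option map by layering the defaults
%% and the parsed connections/redis/bounce sections over test-specific extras.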
  212: module_opts(ExtraOpts) ->
  213:     lists:foldl(fun set_opts/2, ExtraOpts, [common, defaults, connections, redis, bounce]).
  214: 
  215: set_opts(common, Opts) ->
  216:     maps:merge(#{global_host => <<"localhost">>,
  217:                  hosts_refresh_interval => ?HOSTS_REFRESH_INTERVAL}, Opts);
  218: set_opts(defaults, Opts) ->
  219:     mod_config(mod_global_distrib, Opts);
  220: set_opts(connections, #{connections := ConnExtra} = Opts) ->
  221:     TLSOpts = config([modules, mod_global_distrib, connections, tls],
  222:                      #{certfile => "priv/ssl/fake_server.pem",
  223:                        cacertfile => "priv/ssl/ca/cacert.pem"}),
  224:     Opts#{connections := config([modules, mod_global_distrib, connections],
  225:                                 maps:merge(#{tls => TLSOpts}, ConnExtra))};
  226: set_opts(redis, #{redis := RedisExtra} = Opts) ->
  227:     Opts#{redis := config([modules, mod_global_distrib, redis], RedisExtra)};
  228: set_opts(bounce, #{bounce := BounceExtra} = Opts) ->
  229:     Opts#{bounce := config([modules, mod_global_distrib, bounce], BounceExtra)};
  230: set_opts(_, Opts) ->
  231:     Opts.
  232: 
  233: end_per_group(advertised_endpoints, Config) ->
  234:     Pids = ?config(meck_handlers, Config),
  235:     unmock_inet(Pids),
  236:     escalus_fresh:clean(),
  237:     end_per_group_generic(Config);
  238: end_per_group(start_checks, Config) ->
  239:     escalus_fresh:clean(),
  240:     Config;
  241: end_per_group(invalidation, Config) ->
    redis_query(europe_node1, [<<"HDEL">>, ?config(nodes_key, Config),
                               ?config(node_to_expire, Config)]),
  244:     end_per_group_generic(Config);
  245: end_per_group(_, Config) ->
  246:     end_per_group_generic(Config).
  247: 
  248: end_per_group_generic(Config) ->
  249:     dynamic_modules:restore_modules(#{timeout => timer:seconds(30)}, Config).
  250: 
  251: init_per_testcase(CaseName, Config)
  252:   when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
  253:        CaseName == test_muc_conversation_history ->
  254:     %% There is no helper to load MUC on node2
  255:     %% For now it's easier to hide node2
  256:     %% TODO: Do it right at some point!
  257:     hide_node(europe_node2, Config),
  258:     %% There would be no new connections to europe_node2, but there can be some old ones.
  259:     %% We need to disconnect previous connections.
  260:     {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  261:     trigger_rebalance(asia_node, EuropeHost),
  262:     %% Load muc on mim node
  263:     muc_helper:load_muc(),
  264:     RegNode = ct:get_config({hosts, reg, node}),
  265:     %% Wait for muc.localhost to become visible from reg node
  266:     wait_for_domain(RegNode, muc_helper:muc_host()),
  267:     escalus:init_per_testcase(CaseName, Config);
  268: init_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
  269:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server;
  270:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
  271:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
  272:     escalus:init_per_testcase(CN, init_user_eve(Config));
  273: init_per_testcase(CaseName, Config) ->
  274:     escalus:init_per_testcase(CaseName, Config).
  275: 
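%% Creates a fresh Eve in the reg (Asia) cluster and registers the same
%% credentials in the mim (Europe) cluster, so the reconnection tests can move
%% her between datacenters.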
  276: init_user_eve(Config) ->
  277:     %% Register Eve in reg cluster
  278:     EveSpec = escalus_fresh:create_fresh_user(Config, eve),
  279:     MimPort = ct:get_config({hosts, mim, c2s_port}),
  280:     EveSpec2 = lists:keystore(port, 1, EveSpec, {port, MimPort}),
  281:     %% Register Eve in mim cluster
  282:     escalus:create_users(Config, [{eve, EveSpec2}]),
  283:     [{evespec_reg, EveSpec}, {evespec_mim, EveSpec2} | Config].
  284: 
  285: end_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
  286:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server;
  287:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
  288:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
  289:     MimEveSpec = ?config(evespec_mim, Config),
  290:     %% Clean Eve from reg cluster
  291:     escalus_fresh:clean(),
  292:     %% Clean Eve from mim cluster
    %% For shared databases (e.g. mysql, pgsql...),
    %% removing a user from one cluster removes them from all clusters.
    %% For the mnesia auth backend we need to trigger the removal in each
    %% cluster separately. That's why there is a catch here.
  297:     catch escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}]),
  298:     generic_end_per_testcase(CN, Config);
  299: end_per_testcase(CaseName, Config)
  300:   when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
  301:        CaseName == test_muc_conversation_history ->
  302:     refresh_mappings(europe_node2, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
  303:     muc_helper:unload_muc(),
  304:     generic_end_per_testcase(CaseName, Config);
  305: end_per_testcase(test_update_senders_host_by_ejd_service = CN, Config) ->
  306:     refresh_mappings(europe_node1, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
  307:     generic_end_per_testcase(CN, Config);
  308: end_per_testcase(CN, Config) when CN == enable_new_endpoint_on_refresh;
  309:                                   CN == disable_endpoint_on_refresh;
  310:                                   CN == wait_for_connection;
  311:                                   CN == closed_connection_is_removed_from_disabled ->
  312:     restart_receiver(asia_node),
  313:     refresh_mappings(asia_node, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
  314:     generic_end_per_testcase(CN, Config);
  315: end_per_testcase(CaseName, Config) ->
  316:     generic_end_per_testcase(CaseName, Config).
  317: 
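%% Closes all outgoing GD connections on every node between test cases, leaving
%% only the hosts refresher under the supervisor, so no case inherits stale
%% connections from a previous one.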
  318: generic_end_per_testcase(CaseName, Config) ->
  319:     lists:foreach(
  320:       fun({NodeName, _, _}) ->
  321:               %% TODO: Enable refresher only for specific test cases,
  322:               %% as some of them are based on assumption that node(s)
  323:               %% must open new connections during tests.
  324:               pause_refresher(NodeName, CaseName),
  325:               Node = ct:get_config(NodeName),
  326:               SupRef = {mod_global_distrib_outgoing_conns_sup, Node},
  327:               try
  328:                   OutgoingConns = supervisor:which_children(SupRef),
  329:                   lists:foreach(fun ({mod_global_distrib_hosts_refresher, _, _, _}) ->
  330:                                         skip;
  331:                                     ({Id, _, _, _}) ->
  332:                                         supervisor:terminate_child(SupRef, Id)
  333:                                 end, OutgoingConns),
  334:                   [{mod_global_distrib_hosts_refresher, _, worker, _Modules}] =
  335:                     supervisor:which_children(SupRef)
  336:               catch
  337:                   _:{noproc, _} ->
  338:                       ct:pal("Sender supervisor not found in ~p", [NodeName])
  339:               end,
  340:               unpause_refresher(NodeName, CaseName)
  341:       end,
  342:       get_hosts()),
  343:     escalus:end_per_testcase(CaseName, Config).
  344: 
  345: virtual_hosts(asia_node) ->
  346:     [domain()];
  347: virtual_hosts(_) ->
  348:     [domain(), secondary_domain()].
  349: 
  350: secondary_domain() ->
  351:     ct:get_config({hosts, mim, secondary_domain}).
  352: 
%% For some test cases the refresher is not started at all, or is already stopped
  354: -spec pause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
  355: pause_refresher(_, test_error_on_wrong_hosts) ->
  356:     ok;
  357: pause_refresher(asia_node, test_location_disconnect) ->
  358:     ok;
  359: pause_refresher(NodeName, _) ->
  360:     ok = rpc(NodeName, mod_global_distrib_hosts_refresher, pause, []).
  361: 
  362: -spec unpause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
  363: unpause_refresher(_, test_error_on_wrong_hosts) ->
  364:     ok;
  365: unpause_refresher(asia_node, test_location_disconnect) ->
  366:     ok;
  367: unpause_refresher(NodeName, _) ->
  368:     ok = rpc(NodeName, mod_global_distrib_hosts_refresher, unpause, []).
  369: 
  370: %%--------------------------------------------------------------------
  371: %% Service discovery test
  372: %%--------------------------------------------------------------------
  373: 
%% Requires mod_global_distrib to be started with the advertised_endpoints
%% option for each host in get_hosts().
%% Reads Redis to confirm that the endpoints stored there are overwritten
%% with the `advertised_endpoints` option value.
  378: test_advertised_endpoints_override_endpoints(_Config) ->
  379:     Endps = execute_on_each_node(mod_global_distrib_mapping_redis, get_endpoints, [<<"reg1">>]),
  380:     true = lists:all(
  381:              fun(E) ->
  382:                      lists:sort(E) =:= lists:sort(advertised_endpoints())
  383:              end, Endps).
  384: 
%% @doc Verifies that the hosts refresher restarts the outgoing connection pool
%% if it goes down for some reason (crash or domain unavailability).
%% Along the way it also verifies that the refresher properly reads the host
%% list from the backend and starts the appropriate pool.
  389: test_host_refreshing(_Config) ->
  390:     mongoose_helper:wait_until(fun() -> trees_for_connections_present() end, true,
  391:                                #{name => trees_for_connections_present,
  392:                                  time_left => timer:seconds(10)}),
  393:     ConnectionSups = out_connection_sups(asia_node),
  394:     {europe_node1, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  395:     EuropeSup = rpc(asia_node, mod_global_distrib_utils, server_to_sup_name, [EuropeHost]),
  396:     {_, EuropePid, supervisor, _} = lists:keyfind(EuropeSup, 1, ConnectionSups),
  397:     erlang:exit(EuropePid, kill), % it's ok to kill temporary process
  398:     mongoose_helper:wait_until(fun() -> tree_for_sup_present(asia_node, EuropeSup) end, true,
  399:                                #{name => tree_for_sup_present}).
  400: 
%% When run in the mod_global_distrib group, tests the simple case of a
%% conversation between two users connected to different clusters.
%% When run in the advertised_endpoints group, tests whether it is possible
%% to connect to a node that advertises itself with a domain name.
  405: test_pm_between_users_at_different_locations(Config) ->
  406:     escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], fun test_two_way_pm/2).
  407: 
  408: test_pm_between_users_before_available_presence(Config) ->
  409:     Config1 = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
  410:     {ok, Alice} = escalus_client:start(Config1, alice, <<"res1">>),
  411:     {ok, Eve} = escalus_client:start(Config1, eve, <<"res1">>),
  412: 
  413:     test_two_way_pm(Alice, Eve),
  414: 
  415:     escalus_client:stop(Config1, Alice),
  416:     escalus_client:stop(Config1, Eve).
  417: 
  418: test_two_way_pm(Alice, Eve) ->
  419:     escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi to Eve from Europe1!">>)),
  420:     escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi to Alice from Asia!">>)),
  421: 
  422:     FromAlice = escalus_client:wait_for_stanza(Eve, timer:seconds(15)),
  423:     FromEve = escalus_client:wait_for_stanza(Alice, timer:seconds(15)),
  424: 
  425:     AliceJid = escalus_client:full_jid(Alice),
  426:     EveJid = escalus_client:full_jid(Eve),
  427: 
  428:     escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, <<"Hi to Eve from Europe1!">>],
  429:                    FromAlice),
  430:     escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, <<"Hi to Alice from Asia!">>],
  431:                    FromEve).
  432: 
  433: test_muc_conversation_on_one_host(Config0) ->
  434:     AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
  435:     Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
  436:     escalus:fresh_story(
  437:       Config, [{eve, 1}],
  438:       fun(Eve) ->
  439:               Alice = connect_from_spec(AliceSpec, Config),
  440: 
  441:               RoomJid = ?config(room, Config),
  442:               AliceUsername = escalus_utils:get_username(Alice),
  443:               EveUsername = escalus_utils:get_username(Eve),
  444:               RoomAddr = muc_helper:room_address(RoomJid),
  445: 
  446:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
  447:               wait_for_muc_presence(Alice, RoomJid, AliceUsername),
  448:               wait_for_subject(Alice),
  449: 
  450:               escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
  451:               wait_for_muc_presence(Eve, RoomJid, AliceUsername),
  452:               wait_for_muc_presence(Eve, RoomJid, EveUsername),
  453:               wait_for_muc_presence(Alice, RoomJid, EveUsername),
  454:               wait_for_subject(Eve),
  455: 
              Msg = <<"Hi, Eve!">>,
  457:               escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, Msg)),
  458:               escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Alice)),
  459:               escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Eve)),
  460: 
              Msg2 = <<"Hi, Alice!">>,
  462:               escalus:send(Eve, escalus_stanza:groupchat_to(RoomAddr, Msg2)),
  463:               escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Eve)),
  464:               escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Alice))
  465:       end),
  466:     muc_helper:destroy_room(Config).
  467: 
  468: test_muc_conversation_history(Config0) ->
  469:     AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
  470:     Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
  471:     escalus:fresh_story(
  472:       Config, [{eve, 1}],
  473:       fun(Eve) ->
  474:               Alice = connect_from_spec(AliceSpec, Config),
  475: 
  476:               RoomJid = ?config(room, Config),
  477:               AliceUsername = escalus_utils:get_username(Alice),
  478:               RoomAddr = muc_helper:room_address(RoomJid),
  479: 
  480:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
  481:               wait_for_muc_presence(Alice, RoomJid, AliceUsername),
  482:               wait_for_subject(Alice),
  483: 
  484:               send_n_muc_messages(Alice, RoomAddr, 3),
  485: 
  486:               %% Ensure that the messages are received by the room
  487:               %% before trying to login Eve.
  488:               %% Otherwise, Eve would receive some messages from history and
  489:               %% some as regular groupchat messages.
  490:               receive_n_muc_messages(Alice, 3),
  491: 
  492:               EveUsername = escalus_utils:get_username(Eve),
  493:               escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
  494: 
  495:               wait_for_muc_presence(Eve, RoomJid, AliceUsername),
  496:               wait_for_muc_presence(Eve, RoomJid, EveUsername),
  497:               wait_for_muc_presence(Alice, RoomJid, EveUsername),
  498: 
  499:               %% XEP-0045: After sending the presence broadcast (and only after doing so),
  500:               %% the service MAY then send discussion history, the room subject,
  501:               %% live messages, presence updates, and other in-room traffic.
  502:               receive_n_muc_messages(Eve, 3),
  503:               wait_for_subject(Eve)
  504:       end),
  505:     muc_helper:destroy_room(Config).
  506: 
  507: wait_for_muc_presence(User, RoomJid, FromNickname) ->
  508:     Presence = escalus:wait_for_stanza(User),
  509:     escalus:assert(is_presence, Presence),
  510:     escalus:assert(is_stanza_from, [muc_helper:room_address(RoomJid, FromNickname)], Presence),
  511:     ok.
  512: 
  513: wait_for_subject(User) ->
  514:     Subject = escalus:wait_for_stanza(User),
  515:     escalus:assert(is_groupchat_message, Subject),
  516:     ?assertNotEqual(undefined, exml_query:subelement(Subject, <<"subject">>)),
  517:     ok.
  518: 
  519: send_n_muc_messages(User, RoomAddr, N) ->
  520:     lists:foreach(fun(I) ->
  521:                           Msg = <<"test-", (integer_to_binary(I))/binary>>,
  522:                           escalus:send(User, escalus_stanza:groupchat_to(RoomAddr, Msg))
  523:                   end, lists:seq(1, N)).
  524: 
  525: receive_n_muc_messages(User, N) ->
  526:     lists:foreach(fun(J) ->
  527:                           Msg = <<"test-", (integer_to_binary(J))/binary>>,
  528:                           Stanza = escalus:wait_for_stanza(User),
  529:                           escalus:assert(is_groupchat_message, [Msg], Stanza)
                  end, lists:seq(1, N)).
  531: 
  532: test_component_on_one_host(Config) ->
  533:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  534:                        {port, service_port()}, {component, <<"test_service">>}],
  535: 
  536:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  537: 
  538:     Story = fun(User) ->
  539:                     Msg1 = escalus_stanza:chat_to(Addr, <<"Hi2!">>),
  540:                     escalus:send(User, Msg1),
                    %% Then the component receives it
  542:                     Reply1 = escalus:wait_for_stanza(Comp),
  543:                     escalus:assert(is_chat_message, [<<"Hi2!">>], Reply1),
  544: 
                    %% When the component sends a reply
  546:                     Msg2 = escalus_stanza:chat_to(User, <<"Oh hi!">>),
  547:                     escalus:send(Comp, escalus_stanza:from(Msg2, Addr)),
  548: 
                    %% Then the user receives it
  550:                     Reply2 = escalus:wait_for_stanza(User),
  551:                     escalus:assert(is_chat_message, [<<"Oh hi!">>], Reply2),
  552:                     escalus:assert(is_stanza_from, [Addr], Reply2)
  553:             end,
  554: 
  555:     [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
  556: 
  557: %% Ensures that 2 components in distinct data centers can communicate.
  558: test_components_in_different_regions(_Config) ->
  559:     ComponentCommonConfig = [{host, <<"localhost">>}, {password, <<"secret">>},
  560:                              {server, <<"localhost">>}, {component, <<"test_service">>}],
  561:     Comp1Port = ct:get_config({hosts, mim, service_port}),
  562:     Comp2Port = ct:get_config({hosts, reg, service_port}),
  563:     Component1Config = [{port, Comp1Port}, {component, <<"service1">>} | ComponentCommonConfig],
  564:     Component2Config = [{port, Comp2Port}, {component, <<"service2">>} | ComponentCommonConfig],
  565: 
  566:     {Comp1, Addr1, _Name1} = component_helper:connect_component(Component1Config),
  567:     {Comp2, Addr2, _Name2} = component_helper:connect_component(Component2Config),
  568: 
  569:     Msg1 = escalus_stanza:from(escalus_stanza:chat_to(Addr2, <<"Hi from 1!">>), Addr1),
  570:     escalus:send(Comp1, Msg1),
  571:     GotMsg1 = escalus:wait_for_stanza(Comp2),
  572:     escalus:assert(is_chat_message, [<<"Hi from 1!">>], GotMsg1),
  573: 
  574:     Msg2 = escalus_stanza:from(escalus_stanza:chat_to(Addr1, <<"Hi from 2!">>), Addr2),
  575:     escalus:send(Comp2, Msg2),
  576:     GotMsg2 = escalus:wait_for_stanza(Comp1),
  577:     escalus:assert(is_chat_message, [<<"Hi from 2!">>], GotMsg2).
  578: 
%% An ordinary user is not able to discover the hidden component via GD
  580: test_hidden_component_disco_in_different_region(Config) ->
  581:     %% Hidden component from component_SUITE connects to mim1/europe_node1
  582:     HiddenComponentConfig = component_helper:spec(hidden_component, Config),
  583:     {_HiddenComp, HiddenAddr, _} = component_helper:connect_component(HiddenComponentConfig),
  584: 
  585:     escalus:fresh_story(
  586:       Config, [{eve, 1}],
  587:       fun(Eve) ->
  588:               EveServer = escalus_client:server(Eve),
  589:               escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
  590:               DiscoReply = escalus:wait_for_stanza(Eve),
  591:               escalus:assert(is_iq_result, DiscoReply),
  592:               escalus:assert(fun(Stanza) ->
  593:                                      not escalus_pred:has_service(HiddenAddr, Stanza)
  594:                              end, DiscoReply)
  595:       end).
  596: 
  597: test_component_disconnect(Config) ->
  598:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  599:                        {port, service_port()}, {component, <<"test_service">>}],
  600: 
  601:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  602:     component_helper:disconnect_component(Comp, Addr),
  603: 
  604:     Story = fun(User) ->
  605:                     escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
  606:                     Error = escalus:wait_for_stanza(User, 5000),
  607:                     escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
  608:             end,
  609: 
  610:     [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
  611: 
  612: test_location_disconnect(Config) ->
  613:     try
  614:         escalus:fresh_story(
  615:           Config, [{alice, 1}, {eve, 1}],
  616:           fun(Alice, Eve) ->
  617:                   escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
  618: 
  619:                   escalus_client:wait_for_stanza(Eve),
  620: 
  621:                   ok = rpc(asia_node, application, stop, [mongooseim]),
  622:                   %% TODO: Stopping mongooseim alone should probably stop connections too
  623:                   ok = rpc(asia_node, application, stop, [ranch]),
  624: 
  625:                   escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
  626:                   Error = escalus:wait_for_stanza(Alice),
  627:                   escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
  628:           end)
  629:     after
  630:         rpc(asia_node, application, start, [ranch]),
  631:         rpc(asia_node, application, start, [mongooseim])
  632:     end.
  633: 
  634: test_pm_with_disconnection_on_other_server(Config) ->
  635:     escalus:fresh_story(
  636:       Config, [{alice, 1}, {eve, 1}],
  637:       fun(Alice, Eve) ->
  638:               escalus_connection:stop(Eve),
  639:               escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
  640:               FromAliceBounce = escalus_client:wait_for_stanza(Alice, 15000),
  641:               escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], FromAliceBounce)
  642:       end).
  643: 
  644: test_pm_with_graceful_reconnection_to_different_server(Config) ->
  645:     EveSpec = ?config(evespec_reg, Config),
  646:     EveSpec2 = ?config(evespec_mim, Config),
  647:     escalus:fresh_story(
  648:       Config, [{alice, 1}],
  649:       fun(Alice) ->
  650:               Eve = connect_from_spec(EveSpec, Config),
  651: 
  652:               escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi from Asia!">>)),
  653: 
  654:               %% Stop connection and wait for process to die
  655:               EveNode = ct:get_config({hosts, reg, node}),
  656:               mongoose_helper:logout_user(Config, Eve, #{node => EveNode}),
  657: 
  658:               FromEve = escalus_client:wait_for_stanza(Alice),
  659: 
  660:               %% Pause Alice until Eve is reconnected
  661:               AliceNode = ct:get_config({hosts, mim, node}),
  662:               C2sPid = mongoose_helper:get_session_pid(Alice, #{node => AliceNode}),
  663:               ok = rpc(asia_node, sys, suspend, [C2sPid]),
  664: 
  665:               escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi from Europe1!">>)),
  666: 
  667:               NewEve = connect_from_spec(EveSpec2, Config),
  668: 
              ok = rpc(asia_node, sys, resume, [C2sPid]),

  672:               escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi again from Europe1!">>)),
  673:               escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi again from Asia!">>)),
  674: 
  675:               FirstFromAlice = escalus_client:wait_for_stanza(NewEve),
  676:               AgainFromEve = escalus_client:wait_for_stanza(Alice),
  677:               SecondFromAlice = escalus_client:wait_for_stanza(NewEve),
  678: 
  679:               [FromAlice, AgainFromAlice] = order_by_seqnum([FirstFromAlice, SecondFromAlice]),
  680: 
  681:               escalus:assert(is_chat_message, [<<"Hi from Europe1!">>], FromAlice),
  682:               escalus:assert(is_chat_message, [<<"Hi from Asia!">>], FromEve),
  683:               escalus:assert(is_chat_message, [<<"Hi again from Europe1!">>], AgainFromAlice),
  684:               escalus:assert(is_chat_message, [<<"Hi again from Asia!">>], AgainFromEve)
  685:           end).
  686: 
%% The refresh logic can cause two possible behaviours.
%% We test both behaviours here (plus the no-refresh case).
%% See PR #2392.
  690: test_pm_with_ungraceful_reconnection_to_different_server(Config) ->
  691:     %% No refresh
  692:     BeforeResume = fun() -> ok end,
  693:     AfterCheck = fun(Alice, NewEve) ->
  694:             user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
  695:             user_receives(Alice, [<<"Hi from Europe!">>])
  696:          end,
  697:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  698: 
  699: test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first(Config) ->
  700:     %% Same as no refresh
  701:     RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first",
    %% Order of nodes is important here in refresh_hosts!
  703:     BeforeResume = fun() -> refresh_hosts([asia_node, europe_node1], RefreshReason) end,
  704:     AfterCheck = fun(Alice, NewEve) ->
  705:             user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
  706:             user_receives(Alice, [<<"Hi from Europe!">>])
  707:          end,
  708:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  709: 
  710: test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first(Config) ->
    %% The Asia node overrides the Europe value with an older one,
    %% so we lose some messages during rerouting :(
  713:     RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first",
  714:     BeforeResume = fun() -> refresh_hosts([europe_node1, asia_node], RefreshReason) end,
  715:     AfterCheck = fun(Alice, NewEve) ->
  716:             user_receives(NewEve, [<<"Hi again from Europe1!">>]),
  717:             user_receives(Alice, [<<"Hi from Europe!">>])
  718:          end,
  719:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  720: 
  721: %% Reconnect Eve from asia (reg cluster) to europe (mim)
  722: do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResume, AfterCheck) ->
  723:     Config = escalus_users:update_userspec(Config0, eve, stream_management, true),
  724:     EveSpec = ?config(evespec_reg, Config),
  725:     EveSpec2 = ?config(evespec_mim, Config),
  726:     escalus:fresh_story(
  727:       Config, [{alice, 1}],
  728:       fun(Alice) ->
  729:               {ok, Eve, _} = escalus_connection:start(EveSpec, connect_steps_with_sm()),
  730:               escalus_story:send_initial_presence(Eve),
  731:               escalus_client:wait_for_stanza(Eve),
  732: 
  733:               %% Stop connection and wait for process to die
  734:               EveNode = ct:get_config({hosts, reg, node}),
  735:               C2sPid = mongoose_helper:get_session_pid(Eve, #{node => EveNode}),
  736:               ok = rpc(asia_node, sys, suspend, [C2sPid]),
  737: 
  738:               escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi from Europe1!">>)),
  739: 
  740:               %% Wait for route message to be queued in c2s message queue
  741:               mongoose_helper:wait_for_route_message_count(C2sPid, 1),
  742: 
  743:               %% Time to do bad nasty things with our socket, so once our process wakes up,
  744:               %% it SHOULD detect a dead socket
  745:               escalus_connection:kill(Eve),
  746: 
  747:               %% Connect another one, we hope the message would be rerouted
  748:               NewEve = connect_from_spec(EveSpec2, Config),
  749: 
  750:               BeforeResume(),
  751: 
  752:               %% Trigger rerouting
  753:               ok = rpc(asia_node, sys, resume, [C2sPid]),
  754: 
              %% Let C2sPid process the message and reroute (and finally die, poor little thing)
  756:               mongoose_helper:wait_for_pid_to_die(C2sPid),
  757: 
  758:               escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi again from Europe1!">>)),
  759:               escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi from Europe!">>)),
  760: 
  761:               AfterCheck(Alice, NewEve)
  762:           end).
  763: 
  764: test_global_disco(Config) ->
  765:     escalus:fresh_story(
  766:       Config, [{alice, 1}, {eve, 1}],
  767:       fun(Alice, Eve) ->
  768:               AliceServer = escalus_client:server(Alice),
  769:               escalus:send(Alice, escalus_stanza:service_discovery(AliceServer)),
  770:               _AliceStanza = escalus:wait_for_stanza(Alice),
  771:               %% TODO: test for duplicate components
  772:               %%escalus:assert(fun has_exactly_one_service/2, [muc_helper:muc_host()], AliceStanza),
  773: 
  774:               EveServer = escalus_client:server(Eve),
  775:               escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
  776:               EveStanza = escalus:wait_for_stanza(Eve),
  777:               escalus:assert(has_service, [muc_helper:muc_host()], EveStanza)
  778:       end).
  779: 
  780: test_component_unregister(_Config) ->
  781:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  782:                        {port, service_port()}, {component, <<"test_service">>}],
  783: 
  784:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  785:     ?assertMatch({ok, _}, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
  786:                               [<<"test_service.localhost">>])),
  787: 
  788:     component_helper:disconnect_component(Comp, Addr),
  789: 
  790:     ?assertEqual(error, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
  791:                             [<<"test_service.localhost">>])).
  792: 
  793: test_error_on_wrong_hosts(_Config) ->
  794:     Opts = module_opts(#{local_host => <<"no_such_host">>}),
  795:     ?assertException(error, {badrpc, {'EXIT', {#{what := check_host_failed,
  796:                                                  domain := <<"no_such_host">>}, _}}},
  797:                      dynamic_modules:ensure_modules(node_spec(europe_node1), <<"localhost">>,
  798:                                                     [{mod_global_distrib, Opts}])).
  799: 
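%% Marks a fake node as expired (heartbeat 0) in the Redis nodes hash and
%% checks that a mapping refresh removes the stale entry.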
  800: refresh_nodes(Config) ->
  801:     NodesKey = ?config(nodes_key, Config),
  802:     NodeBin = ?config(node_to_expire, Config),
  803:     redis_query(europe_node1, [<<"HSET">>, NodesKey, NodeBin, <<"0">>]),
  804:     refresh_mappings(europe_node1, "by_refresh_nodes"),
  805:     {ok, undefined} = redis_query(europe_node1, [<<"HGET">>, NodesKey, NodeBin]).
  806: 
  807: test_in_order_messages_on_multiple_connections(Config) ->
  808:     escalus:fresh_story(
  809:       Config, [{alice, 1}, {eve, 1}],
  810:       fun(Alice, Eve) ->
  811:               Seq = lists:seq(1, 100),
  812:               lists:foreach(
  813:                 fun(I) ->
  814:                         Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
  815:                         escalus_client:send(Alice, Stanza)
  816:                 end,
  817:                 Seq),
  818:               lists:foreach(
  819:                 fun(I) ->
  820:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  821:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  822:                 end,
  823:                 Seq)
  824:       end).
  825: 
  826: test_in_order_messages_on_multiple_connections_with_bounce(Config) ->
  827:     escalus:fresh_story(
  828:       Config, [{alice, 1}, {eve, 1}],
  829:       fun(Alice, Eve) ->
              %% Send 99 messages, some while the server knows the mapping and some while it doesn't
  831:               send_steps(Alice, Eve, 99, <<"reg1">>),
  832:               %% Make sure that the last message is sent when the mapping is known
  833:               set_mapping(europe_node1, Eve, <<"reg1">>),
  834:               escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"100">>)),
  835: 
  836:               %% Check that all stanzas were received in order
  837:               lists:foreach(
  838:                 fun(I) ->
  839:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  840:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  841:                 end,
  842:                 lists:seq(1, 100))
  843:       end).
  844: 
  845: test_messages_bounced_in_order(Config) ->
  846:     escalus:fresh_story(
  847:       Config, [{alice, 1}, {eve, 1}],
  848:       fun(Alice, Eve) ->
  849:               %% Make sure all messages land in bounce storage
  850:               delete_mapping(europe_node1, Eve),
  851: 
  852:               Seq = lists:seq(1, 100),
  853:               lists:foreach(
  854:                 fun(I) ->
  855:                         Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
  856:                         escalus_client:send(Alice, Stanza)
  857:                 end,
  858:                 Seq),
  859: 
  860:               %% Restore the mapping so that bounce eventually succeeds
  861:               ?assertEqual(undefined, get_mapping(europe_node1, Eve)),
  862:               set_mapping(europe_node1, Eve, <<"reg1">>),
  863: 
  864:               lists:foreach(
  865:                 fun(I) ->
  866:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  867:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  868:                 end,
  869:                 Seq)
  870:       end).
  871: 
  872: test_update_senders_host(Config) ->
  873:     escalus:fresh_story(
  874:       Config, [{alice, 1}, {eve, 1}],
  875:       fun(Alice, Eve) ->
  876:               AliceJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Alice)]),
  877:               {ok, <<"localhost.bis">>}
  878:               = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid]),
  879:               ok = rpc(europe_node1, mod_global_distrib_mapping, delete_for_jid, [AliceJid]),
  880:               wait_for_node(asia_node, AliceJid),
  881: 
  882:               %% TODO: Should prevent Redis refresher from executing for a moment,
  883:               %%       as it may collide with this test.
  884: 
  885:               escalus:send(Alice, escalus_stanza:chat_to(Eve, <<"test_update_senders_host">>)),
  886:               escalus:wait_for_stanza(Eve),
  887: 
  888:               {ok, <<"localhost.bis">>}
  889:               = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid])
  890:       end).
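
%% Polls until the node's mapping cache no longer contains Jid,
%% i.e. for_jid/1 returns error.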
wait_for_node(Node, Jid) ->
  892:     mongoose_helper:wait_until(fun() -> rpc(Node, mod_global_distrib_mapping, for_jid, [Jid]) end,
  893:                                error,
  894:                                #{time_left => timer:seconds(10),
  895:                                  sleep_time => timer:seconds(1),
  896:                                  name => rpc}).
  897: 
  898: test_update_senders_host_by_ejd_service(Config) ->
    refresh_hosts([europe_node1, europe_node2, asia_node],
                  "by_test_update_senders_host_by_ejd_service"),
  900:     %% Connects to europe_node1
  901:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  902:                        {port, service_port()}, {component, <<"test_service">>}],
  903: 
  904:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  905: 
  906:     escalus:fresh_story(
  907:       Config, [{eve, 1}],
  908:       fun(Eve) ->
  909:               %% Eve is connected to asia_node
  910:               EveJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Eve)]),
  911:               {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
  912:               {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid]),
  913: 
  914:               ok = rpc(asia_node, mod_global_distrib_mapping, delete_for_jid, [EveJid]),
  915:               wait_for_node(europe_node1, EveJid),
  916:               wait_for_node(europe_node2, EveJid),
  917: 
  918:               %% Component is connected to europe_node1
  919:               %% but we force asia_node to connect to europe_node2 by hiding europe_node1
  920:               %% and forcing rebalance (effectively disabling connections to europe_node1)
  921:               %% to verify routing cache update on both nodes
  922: 
  923:               %% TODO: Should prevent Redis refresher from executing for a moment,
  924:               %%       as it may collide with this test.
  925: 
  926:               hide_node(europe_node1, Config),
  927:               {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  928:               trigger_rebalance(asia_node, EuropeHost),
  929: 
  930:               escalus:send(Eve, escalus_stanza:chat_to(Addr, <<"hi">>)),
  931:               escalus:wait_for_stanza(Comp),
  932: 
  933:               {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
  934:               {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid])
  935:       end).
  936: 
  937: %% -------------------------------- Rebalancing --------------------------------
  938: 
  939: enable_new_endpoint_on_refresh(Config) ->
  940:     get_connection(europe_node1, <<"reg1">>),
  941: 
  942:     {Enabled1, _Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
  943: 
  944:     ExtraPort = get_port(reg, gd_extra_endpoint_port),
  945:     NewEndpoint = resolved_endpoint(ExtraPort),
  946:     enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),
  947: 
  948:     {Enabled2, _Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),
  949: 
  950:     %% One new pool and one new endpoint
  951:     [NewEndpoint] = Pools2 -- Pools1,
  952:     [] = Pools1 -- Pools2,
  953:     [NewEndpoint] = Enabled2 -- Enabled1,
  954:     [] = Enabled1 -- Enabled2.
  955: 
  956: disable_endpoint_on_refresh(Config) ->
  957:     ExtraPort = get_port(reg, gd_extra_endpoint_port),
  958:     NewEndpoint = resolved_endpoint(ExtraPort),
  959:     enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),
  960: 
  961:     get_connection(europe_node1, <<"reg1">>),
  962: 
  963:     {Enabled1, Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
  964:     [_, _] = Enabled1,
  965:     [] = Disabled1,
  966: 
  967:     hide_extra_endpoint(asia_node),
  968:     trigger_rebalance(europe_node1, <<"reg1">>),
  969: 
  970:     {Enabled2, Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),
  971: 
  972:     %% 2 pools open even after disable
  973:     [] = Pools1 -- Pools2,
  974:     [] = Pools2 -- Pools1,
  975:     %% NewEndpoint is no longer enabled
  976:     [] = Enabled2 -- Enabled1,
  977:     [NewEndpoint] = Enabled1 -- Enabled2,
  978:     %% NewEndpoint is now disabled
  979:     [] = Disabled1,
  980:     [NewEndpoint] = Disabled2.
  981: 
  982: wait_for_connection(_Config) ->
  983:     set_endpoints(asia_node, []),
  984:     %% Because of hosts refresher, a pool of connections to asia_node
  985:     %% may already be present here
    mongoose_helper:wait_until(
      fun() ->
              try trigger_rebalance(europe_node1, <<"reg1">>), true
              catch _:_ -> false end
      end,
      true,
      #{name => rebalance, time_left => timer:seconds(5)}),
  993: 
  994:     spawn_connection_getter(europe_node1),
  995: 
  996:     receive
  997:         Unexpected1 -> error({unexpected, Unexpected1})
  998:     after
  999:         2000 -> ok
 1000:     end,
 1001: 
 1002:     refresh_mappings(asia_node, "by_wait_for_connection"),
 1003:     trigger_rebalance(europe_node1, <<"reg1">>),
 1004: 
 1005:     receive
 1006:         Conn when is_pid(Conn) -> ok;
 1007:         Unexpected2 -> error({unexpected, Unexpected2})
 1008:     after
 1009:         5000 -> error(timeout)
 1010:     end.
 1011: 
 1012: closed_connection_is_removed_from_disabled(_Config) ->
 1013:     get_connection(europe_node1, <<"reg1">>),
 1014:     set_endpoints(asia_node, []),
 1015:     trigger_rebalance(europe_node1, <<"reg1">>),
 1016: 
 1017:     {[], [_], [_]} = get_outgoing_connections(europe_node1, <<"reg1">>),
 1018: 
    %% Will drop connections and prevent them from reconnecting
 1020:     restart_receiver(asia_node, [get_port(reg, gd_supplementary_endpoint_port)]),
 1021: 
    mongoose_helper:wait_until(fun() -> get_outgoing_connections(europe_node1, <<"reg1">>) end,
                               {[], [], []},
                               #{name => get_outgoing_connections}).

 1027: %%--------------------------------------------------------------------
 1028: %% Test helpers
 1029: %%--------------------------------------------------------------------
 1030: 
 1031: get_port(Host, Param) ->
 1032:     case ct:get_config({hosts, Host, Param}) of
 1033:         Port when is_integer(Port) ->
 1034:             Port;
 1035:         Other ->
 1036:             ct:fail({get_port_failed, Host, Param, Other})
 1037:     end.
 1038: 
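%% {ConfigName, GD local_host, GD endpoint port} for every node in the setup.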
 1039: get_hosts() ->
 1040:     [
 1041:      {europe_node1, <<"localhost.bis">>, get_port(mim, gd_endpoint_port)},
 1042:      {europe_node2, <<"localhost.bis">>, get_port(mim2, gd_endpoint_port)},
 1043:      {asia_node, <<"reg1">>, get_port(reg, gd_endpoint_port)}
 1044:     ].
 1045: 
 1046: listen_endpoint(NodeName) ->
 1047:     endpoint(listen_port(NodeName)).
 1048: 
 1049: listen_port(NodeName) ->
 1050:     {_, _, Port} = lists:keyfind(NodeName, 1, get_hosts()),
 1051:     Port.
 1052: 
 1053: resolved_endpoint(Port) when is_integer(Port) ->
 1054:     {{127, 0, 0, 1}, Port}.
 1055: 
 1056: endpoint(Port) when is_integer(Port) ->
 1057:     {"127.0.0.1", Port}.
 1058: 
 1059: %% For dynamic_modules
 1060: node_spec(NodeName) ->
 1061:     #{node => ct:get_config(NodeName), timeout => timer:seconds(30)}.
 1062: 
 1063: rpc(NodeName, M, F, A) ->
 1064:     Node = ct:get_config(NodeName),
 1065:     mongoose_helper:successful_rpc(#{node => Node}, M, F, A, timer:seconds(30)).
 1066: 
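%% Deletes the node from the Redis nodes hash, so other clusters stop
%% discovering its endpoints until the mappings are refreshed again.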
 1067: hide_node(NodeName, Config) ->
 1068:     NodesKey = ?config(nodes_key, Config),
 1069:     NodeBin = atom_to_binary(ct:get_config(NodeName), latin1),
 1070:     {ok, <<"1">>} = redis_query(europe_node1, [<<"HDEL">>, NodesKey, NodeBin]).
 1071: 
 1072: connect_from_spec(UserSpec, Config) ->
 1073:     {ok, User} = escalus_client:start(Config, UserSpec, <<"res1">>),
 1074:     escalus_story:send_initial_presence(User),
 1075:     escalus:assert(is_presence, escalus_client:wait_for_stanza(User)),
 1076:     User.
 1077: 
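%% Tags the message id with a monotonic timestamp, so the delivery order can be
%% reconstructed later by order_by_seqnum/1.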
 1078: chat_with_seqnum(To, Text) ->
 1079:     escalus_stanza:set_id(escalus_stanza:chat_to(To, Text),
 1080:                           integer_to_binary(erlang:monotonic_time())).
 1081: 
order_by_seqnum(Stanzas) ->
    %% Ids are monotonic timestamps (possibly negative), so compare them
    %% numerically; ascending order puts the first-sent stanza first.
    lists:sort(fun(A, B) ->
                       binary_to_integer(exml_query:attr(A, <<"id">>)) =<
                           binary_to_integer(exml_query:attr(B, <<"id">>))
               end, Stanzas).
 1085: 
 1086: has_exactly_one_service(Service, #xmlel{children = [#xmlel{children = Services}]}) ->
 1087:     Pred = fun(Item) ->
 1088:                    exml_query:attr(Item, <<"jid">>) =:= Service
 1089:            end,
 1090:     case lists:filter(Pred, Services) of
 1091:         [_] -> true;
 1092:         _ -> false
 1093:     end.
 1094: 
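%% Sends Max messages, toggling between known and unknown recipient mapping
%% every Max div 10 messages, to exercise both direct delivery and bouncing.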
 1095: send_steps(From, To, Max, ToHost) ->
 1096:     next_send_step(From, To, 1, Max, Max div 10, true, ToHost).
 1097: 
 1098: next_send_step(_From, _To, I, Max, _ToReset, _KnowsMapping, _ToHost) when I > Max -> ok;
 1099: next_send_step(From, To, I, Max, 0, KnowsMapping, ToHost) ->
 1100:     ct:log("Reset: I: ~B", [I]),
 1101:     case KnowsMapping of
 1102:         true -> delete_mapping(europe_node1, To);
 1103:         false -> set_mapping(europe_node1, To, ToHost)
 1104:     end,
 1105:     next_send_step(From, To, I, Max, Max div 10, not KnowsMapping, ToHost);
 1106: next_send_step(From, To, I, Max, ToReset, KnowsMapping, ToHost) ->
    ct:log("I: ~B, Max: ~B, ToReset: ~B", [I, Max, ToReset]),
 1108:     Stanza = escalus_stanza:chat_to(To, integer_to_binary(I)),
 1109:     escalus_client:send(From, Stanza),
 1110:     next_send_step(From, To, I + 1, Max, ToReset - 1, KnowsMapping, ToHost).
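%% Taken together: send_steps/4 sends Max chat messages and, every (Max div 10)
%% stanzas, alternates between deleting the recipient's mapping and setting it
%% back to ToHost, so delivery is exercised both with and without a known mapping.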
 1111: 
 1112: get_mapping(Node, Client) ->
 1113:     {FullJid, _BareJid} = jids(Client),
 1114:     {ok, What} = redis_query(Node, [<<"GET">>, FullJid]),
 1115:     What.
 1116: 
%% Warning! May not work properly with alice or any other user whose
%% stringprepped JID differs from the original one, since the mapping keys in
%% Redis are stored stringprepped while the JIDs used here are not.
 1119: delete_mapping(Node, Client) ->
 1120:     {FullJid, BareJid} = jids(Client),
 1121:     redis_query(Node, [<<"DEL">>, FullJid, BareJid]),
 1122:     Jid = rpc(Node, jid, from_binary, [FullJid]),
 1123:     rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).
 1124: 
 1125: set_mapping(Node, Client, Mapping) ->
 1126:     {FullJid, BareJid} = jids(Client),
 1127:     redis_query(Node, [<<"MSET">>, FullJid, Mapping, BareJid, Mapping]),
 1128:     Jid = rpc(Node, jid, from_binary, [FullJid]),
 1129:     rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).
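%% Typical use in a test (Eve here is a hypothetical client): pin Eve's mapping
%% to the Asian region and later remove it again:
%%   set_mapping(europe_node1, Eve, <<"reg1">>),
%%   ... exercise routing ...,
%%   delete_mapping(europe_node1, Eve)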
 1130: 
 1131: jids(Client) ->
 1132:     FullJid = escalus_client:full_jid(Client),
 1133:     BareJid = escalus_client:short_jid(Client),
 1134:     {FullJid, BareJid}.
 1135: 
 1136: redis_query(Node, Query) ->
 1137:     {ok, RedisWorker} = rpc(Node, mongoose_wpool, get_worker, [redis, global, global_distrib]),
 1138:     rpc(Node, eredis, q, [RedisWorker, Query]).
 1139: 
%% A fake address we never try to connect to.
%% Used in the test_advertised_endpoints_override_endpoints testcase.
 1142: advertised_endpoints() ->
 1143:     [
 1144:      {fake_domain(), get_port(reg, gd_endpoint_port)}
 1145:     ].
 1146: 
 1147: fake_domain() ->
 1148:     "somefakedomain.com".
 1149: 
 1150: iptuples_to_string([]) ->
 1151:     [];
 1152: iptuples_to_string([{Addr, Port} | Endps]) when is_tuple(Addr) ->
 1153:     [{inet_parse:ntoa(Addr), Port} | iptuples_to_string(Endps)];
 1154: iptuples_to_string([E | Endps]) ->
 1155:     [E | iptuples_to_string(Endps)].
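%% e.g. iptuples_to_string([{{127, 0, 0, 1}, 5555}, {"example.com", 5556}]) =:=
%%          [{"127.0.0.1", 5555}, {"example.com", 5556}]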
 1156: 
 1157: endpoint_opts(NodeName, ReceiverPort, Config) ->
 1158:     Endpoints = [endpoint(ReceiverPort)],
 1159:     AdvertisedEndpoints =
 1160:         proplists:get_value(NodeName, ?config(add_advertised_endpoints, Config), Endpoints),
 1161:     #{endpoints => Endpoints,
 1162:       resolved_endpoints => [resolved_endpoint(ReceiverPort)],
 1163:       advertised_endpoints => AdvertisedEndpoints}.
 1164: 
 1165: mock_inet_on_each_node() ->
 1166:     Nodes = lists:map(fun({NodeName, _, _}) -> ct:get_config(NodeName) end, get_hosts()),
 1167:     Results = lists:map(fun(Node) -> rpc:block_call(Node, ?MODULE, mock_inet, []) end, Nodes),
 1168:     true = lists:all(fun(Result) -> Result =:= ok end, Results).
 1169: 
 1170: execute_on_each_node(M, F, A) ->
 1171:     lists:map(fun({NodeName, _, _}) -> rpc(NodeName, M, F, A) end, get_hosts()).
 1172: 
 1173: mock_inet() ->
    %% We mock mod_global_distrib_utils rather than the inet module itself,
    %% since mocking inet can cause strange networking issues
 1175:     meck:new(mod_global_distrib_utils, [non_strict, passthrough, unstick]),
 1176:     meck:expect(mod_global_distrib_utils, getaddrs, fun(_, inet) -> {ok, [{127, 0, 0, 1}]};
 1177:                                                        (_, inet6) -> {error, "No ipv6 address"} end).
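%% With the expect above in place, every domain resolves to 127.0.0.1 over IPv4
%% and fails over IPv6, e.g.:
%%   mod_global_distrib_utils:getaddrs("any.example", inet) =:= {ok, [{127, 0, 0, 1}]}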
 1178: 
 1179: unmock_inet(_Pids) ->
 1180:     execute_on_each_node(meck, unload, [mod_global_distrib_utils]).
 1181: 
 1182: out_connection_sups(Node) ->
 1183:     Children = rpc(Node, supervisor, which_children, [mod_global_distrib_outgoing_conns_sup]),
 1184:     lists:filter(fun({Sup, _, _, _}) -> Sup =/= mod_global_distrib_hosts_refresher end, Children).
 1185: 
 1186: trees_for_connections_present() ->
 1187:     AsiaChildren = out_connection_sups(asia_node),
 1188:     Europe1Children = out_connection_sups(europe_node1),
 1189:     Europe2Children = out_connection_sups(europe_node2),
    lists:all(fun(Children) -> Children =/= [] end, [AsiaChildren, Europe1Children, Europe2Children]).
 1191: 
 1192: tree_for_sup_present(Node, ExpectedSup) ->
 1193:     Children = out_connection_sups(Node),
 1194:     lists:keyfind(ExpectedSup, 1, Children) =/= false.
 1195: 
 1196: 
 1197: %% ------------------------------- rebalancing helpers -----------------------------------
 1198: 
 1199: spawn_connection_getter(SenderNode) ->
 1200:     TestPid = self(),
 1201:     spawn(fun() ->
 1202:                   Conn = get_connection(SenderNode, <<"reg1">>),
 1203:                   TestPid ! Conn
 1204:           end).
 1205: 
 1206: enable_extra_endpoint(ListenNode, SenderNode, Port, _Config) ->
 1207:     restart_receiver(ListenNode, [Port, listen_port(ListenNode)]),
 1208:     set_endpoints(ListenNode, [Port, listen_port(ListenNode)]),
 1209:     trigger_rebalance(SenderNode, <<"reg1">>).
 1210: 
 1211: get_connection(SenderNode, ToDomain) ->
 1212:     rpc(SenderNode, mod_global_distrib_outgoing_conns_sup, get_connection, [ToDomain]).
 1213: 
 1214: hide_extra_endpoint(ListenNode) ->
 1215:     set_endpoints(ListenNode, [listen_port(ListenNode)]).
 1216: 
 1217: set_endpoints(ListenNode, Ports) ->
 1218:     Endpoints = [endpoint(Port) || Port <- Ports],
 1219:     {ok, _} = rpc(ListenNode, mod_global_distrib_mapping_redis, set_endpoints, [Endpoints]).
 1220: 
 1221: get_outgoing_connections(NodeName, DestinationDomain) ->
 1222:     Supervisor = rpc(NodeName, mod_global_distrib_utils, server_to_sup_name, [DestinationDomain]),
 1223:     Manager = rpc(NodeName, mod_global_distrib_utils, server_to_mgr_name, [DestinationDomain]),
 1224:     Enabled = rpc(NodeName, mod_global_distrib_server_mgr,
 1225:                   get_enabled_endpoints, [DestinationDomain]),
 1226:     Disabled = rpc(NodeName, mod_global_distrib_server_mgr,
 1227:                    get_disabled_endpoints, [DestinationDomain]),
 1228:     PoolsChildren = rpc(NodeName, supervisor, which_children, [Supervisor]),
 1229:     Pools = [ Id || {Id, _Child, _Type, _Modules} <- PoolsChildren, Id /= Manager ],
 1230:     {Enabled, Disabled, Pools}.
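%% Returns {Enabled, Disabled, PoolIds} for the node's connections to the given
%% domain; {[], [], []} means no outgoing connections at all, which is what the
%% testcase above waits for after the receiver restart.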
 1231: 
 1232: restart_receiver(NodeName) ->
 1233:     restart_receiver(NodeName, [listen_port(NodeName)]).
 1234: 
 1235: restart_receiver(NodeName, NewPorts) ->
 1236:     OldOpts = #{connections := OldConnOpts} = rpc(NodeName, gen_mod, get_module_opts,
 1237:                                                   [<<"localhost">>, mod_global_distrib_receiver]),
 1238:     NewConnOpts = OldConnOpts#{endpoints := [endpoint(Port) || Port <- NewPorts],
 1239:                                resolved_endpoints := [resolved_endpoint(Port) || Port <- NewPorts]},
 1240:     NewOpts = OldOpts#{connections := NewConnOpts},
 1241:     Node = node_spec(NodeName),
 1242:     dynamic_modules:restart(Node, <<"localhost">>, mod_global_distrib_receiver, NewOpts).
 1243: 
trigger_rebalance(NodeName, DestinationDomain) when is_binary(DestinationDomain) ->
    %% Ensure that the manager exists;
    %% otherwise force_refresh could fail with a noproc error
    ok = rpc(NodeName, mod_global_distrib_outgoing_conns_sup,
             ensure_server_started, [DestinationDomain]),
    rpc(NodeName, mod_global_distrib_server_mgr, force_refresh, [DestinationDomain]),
    StateInfo = rpc(NodeName, mod_global_distrib_server_mgr, get_state_info, [DestinationDomain]),
    ct:log("mgr_state_info_after_rebalance nodename=~p state_info=~p", [NodeName, StateInfo]),
    %% Give the manager a moment to apply the new connection layout
    timer:sleep(1000).
 1253: 
 1254: %% -----------------------------------------------------------------------
 1255: %% Escalus-related helpers
 1256: 
 1257: %% Receive messages with Bodies in any order, skipping presences from stream resumption
 1258: user_receives(User, Bodies) ->
 1259:     Opts = #{pred => fun(Stanza) -> not escalus_pred:is_presence(Stanza) end},
 1260:     Checks = [fun(Stanza) -> escalus_pred:is_chat_message(Body, Stanza) end || Body <- Bodies],
 1261:     escalus:assert_many(Checks, [escalus_connection:receive_stanza(User, Opts) || _ <- Bodies]).
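%% e.g., with a hypothetical client Alice already connected:
%%   user_receives(Alice, [<<"Hi">>, <<"Hello">>])
%% passes regardless of the order in which the two messages arrive.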
 1262: 
 1263: %% -----------------------------------------------------------------------
 1264: %% Refreshing helpers
 1265: 
%% Reason is a string.
%% NodeName is asia_node, europe_node2, ... in the format used by this suite.
 1268: refresh_mappings(NodeName, Reason) when is_list(Reason) ->
 1269:     rpc(NodeName, mod_global_distrib_mapping_redis, refresh, [Reason]).
 1270: 
 1271: refresh_hosts(NodeNames, Reason) ->
    [refresh_mappings(NodeName, Reason) || NodeName <- NodeNames].
 1273: 
 1274: 
 1275: %% -----------------------------------------------------------------------
 1276: %% Other helpers
 1277: 
 1278: connect_steps_with_sm() ->
 1279:     [start_stream, stream_features, maybe_use_ssl,
 1280:      authenticate, bind, session, stream_resumption].
 1281: 
 1282: bare_client(Client) ->
 1283:     Client#client{jid = escalus_utils:get_short_jid(Client)}.
 1284: 
 1285: service_port() ->
 1286:     ct:get_config({hosts, mim, service_port}).
 1287: 
 1288: 
 1289: %% -----------------------------------------------------------------------
 1290: %% Waiting helpers
 1291: 
 1292: wait_for_domain(Node, Domain) ->
 1293:     F = fun() ->
 1294:                 Domains = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
 1295:                 lists:member(Domain, Domains)
 1296:         end,
 1297:     mongoose_helper:wait_until(F, true, #{name => {wait_for_domain, Node, Domain}}).
 1298: 
 1299: 
 1300: %% -----------------------------------------------------------------------
%% Ensure that endpoints are up
 1302: 
 1303: wait_for_listeners_to_appear() ->
 1304:     [wait_for_can_connect_to_port(Port) || Port <- receiver_ports(get_hosts())].
 1305: 
 1306: receiver_ports(Hosts) ->
 1307:     lists:map(fun({_NodeName, _LocalHost, ReceiverPort}) -> ReceiverPort end, Hosts).
 1308: 
 1309: wait_for_can_connect_to_port(Port) ->
 1310:     Opts = #{time_left => timer:seconds(30), sleep_time => 1000, name => {can_connect_to_port, Port}},
 1311:     mongoose_helper:wait_until(fun() -> can_connect_to_port(Port) end, true, Opts).
 1312: 
 1313: can_connect_to_port(Port) ->
 1314:     case gen_tcp:connect("127.0.0.1", Port, []) of
 1315:         {ok, Sock} ->
 1316:             gen_tcp:close(Sock),
 1317:             true;
 1318:         Other ->
 1319:             ct:pal("can_connect_to_port port=~p result=~p", [Port, Other]),
 1320:             false
 1321:     end.
 1322: 
 1323: %% -----------------------------------------------------------------------
 1324: %% Custom log levels for GD modules during the tests
 1325: 
 1326: enable_logging() ->
 1327:     mim_loglevel:enable_logging(test_hosts(), custom_loglevels()).
 1328: 
 1329: disable_logging() ->
 1330:     mim_loglevel:disable_logging(test_hosts(), custom_loglevels()).
 1331: 
custom_loglevels() ->
    %% for "s2s connection to muc.localhost not found" debugging
    [{ejabberd_s2s, debug},
     %% for debugging event=refreshing_own_data_done
     {mod_global_distrib_mapping_redis, info},
     %% to know if a connection is already started or would be started,
     %% event=outgoing_conn_start_progress
     {mod_global_distrib_outgoing_conns_sup, info},
     %% to debug bound connection issues
     {mod_global_distrib, debug},
     %% to know the pids of all new connections
     {mod_global_distrib_connection, debug},
     %% to check if gc or refresh is triggered
     {mod_global_distrib_server_mgr, info},
     %% to debug incoming connections
     %% {mod_global_distrib_receiver, info},
     %% to debug global session set/delete
     {mod_global_distrib_mapping, debug}
    ].
 1351: 
 1352: test_hosts() -> [mim, mim2, reg].