1: %%==============================================================================
    2: %% Copyright 2017 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mod_global_distrib_SUITE).
   18: 
   19: -compile([export_all, nowarn_export_all]).
   20: 
   21: -include_lib("escalus/include/escalus.hrl").
   22: -include_lib("common_test/include/ct.hrl").
   23: -include_lib("eunit/include/eunit.hrl").
   24: -include_lib("escalus/include/escalus_xmlns.hrl").
   25: -include_lib("exml/include/exml.hrl").
   26: 
   27: -define(HOSTS_REFRESH_INTERVAL, 200). %% in ms
   28: 
   29: -import(domain_helper, [domain/0]).
   30: -import(config_parser_helper, [config/2, mod_config/2]).
   31: 
   32: %%--------------------------------------------------------------------
   33: %% Suite configuration
   34: %%--------------------------------------------------------------------
   35: 
   36: all() ->
   37:     [
   38:      {group, mod_global_distrib},
   39:      {group, cluster_restart},
   40:      {group, start_checks},
   41:      {group, invalidation},
   42:      {group, multi_connection},
   43:      {group, rebalancing},
   44:      {group, advertised_endpoints},
   45:      {group, hosts_refresher}
   46:     ].
   47: 
   48: groups() ->
   49:     [{mod_global_distrib, [],
   50:           [
   51:            test_pm_between_users_at_different_locations,
   52:            test_pm_between_users_before_available_presence,
           test_component_disconnect,
   54:            test_component_on_one_host,
   55:            test_components_in_different_regions,
   56:            test_hidden_component_disco_in_different_region,
   57:            test_pm_with_disconnection_on_other_server,
   58:            test_pm_with_graceful_reconnection_to_different_server,
   59:            test_pm_with_ungraceful_reconnection_to_different_server,
   60:            test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first,
   61:            test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first,
   62:            test_component_unregister,
   63:            test_update_senders_host,
   64:            test_update_senders_host_by_ejd_service,
   65: 
   66:            %% with node 2 disabled
   67:            test_muc_conversation_on_one_host,
   68:            test_global_disco
   69:           ]},
   70:          {hosts_refresher, [],
   71:           [test_host_refreshing]},
   72:          {cluster_restart, [],
   73:           [
   74:            test_location_disconnect
   75:           ]},
   76:          {start_checks, [],
   77:           [
   78:            test_error_on_wrong_hosts
   79:           ]},
   80:          {invalidation, [],
   81:           [
           %% TODO: Add checks for other mapping refreshes
   83:            refresh_nodes
   84:           ]},
   85:          {multi_connection, [],
   86:           [
   87:            test_in_order_messages_on_multiple_connections,
   88:            test_in_order_messages_on_multiple_connections_with_bounce,
   89:            test_messages_bounced_in_order,
   90: 
   91:            %% with node 2 disabled
   92:            test_muc_conversation_history
   93:           ]},
   94:          {rebalancing, [],
   95:           [
   96:            enable_new_endpoint_on_refresh,
   97:            disable_endpoint_on_refresh,
   98:            wait_for_connection,
   99:            closed_connection_is_removed_from_disabled
  100:           ]},
  101:          {advertised_endpoints, [],
  102:           [
  103:            test_advertised_endpoints_override_endpoints,
  104:            test_pm_between_users_at_different_locations
  105:           ]}
  106:     ].
  107: 
  108: suite() ->
  109:     [{require, europe_node1, {hosts, mim, node}},
  110:      {require, europe_node2, {hosts, mim2, node}},
  111:      {require, asia_node, {hosts, reg, node}},
  112:      {require, c2s_port, {hosts, mim, c2s_port}} |
  113:      escalus:suite()].
  114: 
  115: %%--------------------------------------------------------------------
  116: %% Init & teardown
  117: %%--------------------------------------------------------------------
  118: 
  119: init_per_suite(Config) ->
  120:     case {rpc(europe_node1, mongoose_wpool, get_worker, [redis, global, global_distrib]),
  121:           rpc(asia_node, mongoose_wpool, get_worker, [redis, global, global_distrib])} of
  122:         {{ok, _}, {ok, _}} ->
  123:             ok = rpc(europe_node2, mongoose_cluster, join, [ct:get_config(europe_node1)]),
  124: 
  125:             enable_logging(),
  126:             escalus:init_per_suite([{add_advertised_endpoints, []}, {extra_config, #{}} | Config]);
  127:         Result ->
  128:             ct:pal("Redis check result: ~p", [Result]),
  129:             {skip, "GD Redis default pool not available"}
  130:     end.
  131: 
  132: end_per_suite(Config) ->
  133:     disable_logging(),
  134:     escalus_fresh:clean(),
  135:     rpc(europe_node2, mongoose_cluster, leave, []),
  136:     escalus:end_per_suite(Config).
  137: 
  138: init_per_group(start_checks, Config) ->
  139:     NodeName = europe_node1,
  140:     NodeSpec = node_spec(NodeName),
  141:     Config1 = dynamic_modules:save_modules(NodeSpec, [domain()], Config),
  142:     dynamic_modules:ensure_modules(NodeSpec, domain(), [{mod_global_distrib, stopped}]),
  143:     Config1;
  144: init_per_group(multi_connection, Config) ->
  145:     ExtraConfig = #{bounce => #{resend_after_ms => 20000},
  146:                     connections => #{%% Disable unused feature to avoid interference
  147:                                      connections_per_endpoint => 100,
  148:                                      disabled_gc_interval => 10000}},
  149:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  150: init_per_group(invalidation, Config) ->
  151:     Config1 = init_per_group(invalidation_generic, Config),
  152:     NodeBin = <<"fake_node@localhost">>,
  153:     [{node_to_expire, NodeBin} | Config1];
  154: init_per_group(rebalancing, Config) ->
  155:     %% We need to prevent automatic refreshes, because they may interfere with tests
  156:     %% and we need early disabled garbage collection to check its validity
  157:     ExtraConfig = #{connections => #{endpoint_refresh_interval => 3600,
  158:                                      endpoint_refresh_interval_when_empty => 3600,
  159:                                      disabled_gc_interval => 1},
  160:                     redis => #{refresh_after => 3600}},
  161:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  162: init_per_group(advertised_endpoints, Config) ->
  163:     lists:foreach(fun({NodeName, _, _}) ->
  164:                           Node = ct:get_config(NodeName),
  165:                           mongoose_helper:inject_module(#{node => Node}, ?MODULE, reload)
  166:                   end, get_hosts()),
  167:     mock_inet_on_each_node(),
  168:     init_per_group_generic(
  169:                [{add_advertised_endpoints,
  170:                  [{asia_node, advertised_endpoints()}]} | Config]);
  171: init_per_group(mod_global_distrib, Config) ->
  172:     %% Disable mod_global_distrib_mapping_redis refresher
  173:     ExtraConfig = #{redis => #{refresh_after => 3600}},
  174:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  175: init_per_group(_, Config) ->
  176:     init_per_group_generic(Config).
  177: 
  178: init_per_group_generic(Config0) ->
  179:     Config2 = lists:foldl(fun init_modules_per_node/2, Config0, get_hosts()),
  180:     wait_for_listeners_to_appear(),
  181:     {SomeNode, _, _} = hd(get_hosts()),
  182:     NodesKey = rpc(SomeNode, mod_global_distrib_mapping_redis, nodes_key, []),
  183:     [{nodes_key, NodesKey}, {escalus_user_db, xmpp} | Config2].
  184: 
  185: init_modules_per_node({NodeName, LocalHost, ReceiverPort}, Config0) ->
  186:     Extra0 = module_opts(?config(extra_config, Config0)),
  187:     EndpointOpts = endpoint_opts(NodeName, ReceiverPort, Config0),
  188:     ConnExtra = maps:merge(EndpointOpts, maps:get(connections, Extra0, #{})),
  189:     Extra = Extra0#{local_host => LocalHost, connections => ConnExtra},
  190:     Opts = module_opts(Extra),
  191: 
  192:     VirtHosts = virtual_hosts(NodeName),
  193:     Node = node_spec(NodeName),
  194:     Config1 = dynamic_modules:save_modules(Node, VirtHosts, Config0),
  195: 
  196:     %% To reduce load when sending many messages
  197:     ModulesToStop = [mod_offline, mod_blocking, mod_privacy, mod_roster, mod_last,
  198:                      mod_stream_management],
  199:     [dynamic_modules:ensure_stopped(Node, VirtHost, ModulesToStop) || VirtHost <- VirtHosts],
  200: 
  201:     SMOpts = config_parser_helper:mod_config(mod_stream_management, #{resume_timeout => 1}),
  202:     dynamic_modules:ensure_modules(Node, domain(), [{mod_global_distrib, Opts},
  203:                                                     {mod_stream_management, SMOpts}]),
  204:     Config1.
  205: 
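%% Builds the complete mod_global_distrib option map: the per-group extras are
%% merged with the common settings first, then filled with module defaults, and
%% finally the connections, redis and bounce sections (when present) are parsed
%% with config_parser_helper.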
  206: module_opts(ExtraOpts) ->
  207:     lists:foldl(fun set_opts/2, ExtraOpts, [common, defaults, connections, redis, bounce]).
  208: 
  209: set_opts(common, Opts) ->
  210:     maps:merge(#{global_host => <<"localhost">>,
  211:                  hosts_refresh_interval => ?HOSTS_REFRESH_INTERVAL}, Opts);
  212: set_opts(defaults, Opts) ->
  213:     mod_config(mod_global_distrib, Opts);
  214: set_opts(connections, #{connections := ConnExtra} = Opts) ->
  215:     TLSOpts = config([modules, mod_global_distrib, connections, tls],
  216:                      #{certfile => "priv/ssl/fake_server.pem",
  217:                        cacertfile => "priv/ssl/ca/cacert.pem"}),
  218:     Opts#{connections := config([modules, mod_global_distrib, connections],
  219:                                 maps:merge(#{tls => TLSOpts}, ConnExtra))};
  220: set_opts(redis, #{redis := RedisExtra} = Opts) ->
  221:     Opts#{redis := config([modules, mod_global_distrib, redis], RedisExtra)};
  222: set_opts(bounce, #{bounce := BounceExtra} = Opts) ->
  223:     Opts#{bounce := config([modules, mod_global_distrib, bounce], BounceExtra)};
  224: set_opts(_, Opts) ->
  225:     Opts.
  226: 
  227: end_per_group(advertised_endpoints, Config) ->
  228:     Pids = ?config(meck_handlers, Config),
  229:     unmock_inet(Pids),
  230:     escalus_fresh:clean(),
  231:     end_per_group_generic(Config);
  232: end_per_group(start_checks, Config) ->
  233:     escalus_fresh:clean(),
  234:     Config;
  235: end_per_group(invalidation, Config) ->
    redis_query(europe_node1, [<<"HDEL">>, ?config(nodes_key, Config),
                               ?config(node_to_expire, Config)]),
  238:     end_per_group_generic(Config);
  239: end_per_group(_, Config) ->
  240:     end_per_group_generic(Config).
  241: 
  242: end_per_group_generic(Config) ->
  243:     dynamic_modules:restore_modules(#{timeout => timer:seconds(30)}, Config).
  244: 
  245: init_per_testcase(CaseName, Config)
  246:   when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
  247:        CaseName == test_muc_conversation_history ->
  248:     %% There is no helper to load MUC on node2
  249:     %% For now it's easier to hide node2
  250:     %% TODO: Do it right at some point!
  251:     hide_node(europe_node2, Config),
  252:     %% There would be no new connections to europe_node2, but there can be some old ones.
  253:     %% We need to disconnect previous connections.
  254:     {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  255:     trigger_rebalance(asia_node, EuropeHost),
  256:     %% Load muc on mim node
  257:     muc_helper:load_muc(),
  258:     RegNode = ct:get_config({hosts, reg, node}),
  259:     %% Wait for muc.localhost to become visible from reg node
  260:     wait_for_domain(RegNode, muc_helper:muc_host()),
  261:     escalus:init_per_testcase(CaseName, Config);
  262: init_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
  263:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server;
  264:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
  265:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
  266:     escalus:init_per_testcase(CN, init_user_eve(Config));
  267: init_per_testcase(CaseName, Config) ->
  268:     escalus:init_per_testcase(CaseName, Config).
  269: 
  270: init_user_eve(Config) ->
  271:     %% Register Eve in reg cluster
  272:     EveSpec = escalus_fresh:create_fresh_user(Config, eve),
  273:     MimPort = ct:get_config({hosts, mim, c2s_port}),
  274:     EveSpec2 = lists:keystore(port, 1, EveSpec, {port, MimPort}),
  275:     %% Register Eve in mim cluster
  276:     escalus:create_users(Config, [{eve, EveSpec2}]),
  277:     [{evespec_reg, EveSpec}, {evespec_mim, EveSpec2} | Config].
  278: 
  279: end_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
  280:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server;
  281:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
  282:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
  283:     MimEveSpec = ?config(evespec_mim, Config),
  284:     %% Clean Eve from reg cluster
  285:     escalus_fresh:clean(),
  286:     %% Clean Eve from mim cluster
    %% For shared databases (e.g. mysql, pgsql, ...),
    %% removing from one cluster would remove from all clusters.
    %% For the mnesia auth backend we need to call removal from each cluster;
    %% that's why there is a catch here.
  291:     catch escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}]),
  292:     generic_end_per_testcase(CN, Config);
  293: end_per_testcase(CaseName, Config)
  294:   when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
  295:        CaseName == test_muc_conversation_history ->
  296:     refresh_mappings(europe_node2, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
  297:     muc_helper:unload_muc(),
  298:     generic_end_per_testcase(CaseName, Config);
  299: end_per_testcase(test_update_senders_host_by_ejd_service = CN, Config) ->
  300:     refresh_mappings(europe_node1, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
  301:     generic_end_per_testcase(CN, Config);
  302: end_per_testcase(CN, Config) when CN == enable_new_endpoint_on_refresh;
  303:                                   CN == disable_endpoint_on_refresh;
  304:                                   CN == wait_for_connection;
  305:                                   CN == closed_connection_is_removed_from_disabled ->
  306:     restart_receiver(asia_node),
  307:     refresh_mappings(asia_node, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
  308:     generic_end_per_testcase(CN, Config);
  309: end_per_testcase(CaseName, Config) ->
  310:     generic_end_per_testcase(CaseName, Config).
  311: 
  312: generic_end_per_testcase(CaseName, Config) ->
  313:     lists:foreach(
  314:       fun({NodeName, _, _}) ->
              %% TODO: Enable the refresher only for specific test cases,
              %% as some of them rely on the assumption that nodes
              %% must open new connections during tests.
  318:               pause_refresher(NodeName, CaseName),
  319:               Node = ct:get_config(NodeName),
  320:               SupRef = {mod_global_distrib_outgoing_conns_sup, Node},
  321:               try
  322:                   OutgoingConns = supervisor:which_children(SupRef),
  323:                   lists:foreach(fun ({mod_global_distrib_hosts_refresher, _, _, _}) ->
  324:                                         skip;
  325:                                     ({Id, _, _, _}) ->
  326:                                         supervisor:terminate_child(SupRef, Id)
  327:                                 end, OutgoingConns),
  328:                   [{mod_global_distrib_hosts_refresher, _, worker, _Modules}] =
  329:                     supervisor:which_children(SupRef)
  330:               catch
  331:                   _:{noproc, _} ->
  332:                       ct:pal("Sender supervisor not found in ~p", [NodeName])
  333:               end,
  334:               unpause_refresher(NodeName, CaseName)
  335:       end,
  336:       get_hosts()),
  337:     escalus:end_per_testcase(CaseName, Config).
  338: 
  339: virtual_hosts(asia_node) ->
  340:     [domain()];
  341: virtual_hosts(_) ->
  342:     [domain(), secondary_domain()].
  343: 
  344: secondary_domain() ->
  345:     ct:get_config({hosts, mim, secondary_domain}).
  346: 
%% The refresher is either not started at all, or is stopped for some test cases.
  348: -spec pause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
  349: pause_refresher(_, test_error_on_wrong_hosts) ->
  350:     ok;
  351: pause_refresher(asia_node, test_location_disconnect) ->
  352:     ok;
  353: pause_refresher(NodeName, _) ->
  354:     ok = rpc(NodeName, mod_global_distrib_hosts_refresher, pause, []).
  355: 
  356: -spec unpause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
  357: unpause_refresher(_, test_error_on_wrong_hosts) ->
  358:     ok;
  359: unpause_refresher(asia_node, test_location_disconnect) ->
  360:     ok;
  361: unpause_refresher(NodeName, _) ->
  362:     ok = rpc(NodeName, mod_global_distrib_hosts_refresher, unpause, []).
  363: 
  364: %%--------------------------------------------------------------------
  365: %% Service discovery test
  366: %%--------------------------------------------------------------------
  367: 
%% Requires mod_global_distrib to be started with the advertised_endpoints option
%% for each host in get_hosts().
%% Reads Redis to confirm that the endpoints stored there are overwritten
%% with the value of the `advertised_endpoints` option.
  372: test_advertised_endpoints_override_endpoints(_Config) ->
  373:     Endps = execute_on_each_node(mod_global_distrib_mapping_redis, get_endpoints, [<<"reg1">>]),
  374:     true = lists:all(
  375:              fun(E) ->
  376:                      lists:sort(E) =:= lists:sort(advertised_endpoints())
  377:              end, Endps).
  378: 
%% @doc Verifies that the hosts refresher will restart the outgoing connection pool
%% if it goes down for some reason (crash or domain unavailability).
%% Also verifies that the refresher properly reads the host list
%% from the backend and starts the appropriate pool.
  383: test_host_refreshing(_Config) ->
  384:     mongoose_helper:wait_until(fun() -> trees_for_connections_present() end, true,
  385:                                #{name => trees_for_connections_present,
  386:                                  time_left => timer:seconds(10)}),
  387:     ConnectionSups = out_connection_sups(asia_node),
  388:     {europe_node1, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  389:     EuropeSup = rpc(asia_node, mod_global_distrib_utils, server_to_sup_name, [EuropeHost]),
  390:     {_, EuropePid, supervisor, _} = lists:keyfind(EuropeSup, 1, ConnectionSups),
  391:     erlang:exit(EuropePid, kill), % it's ok to kill temporary process
  392:     mongoose_helper:wait_until(fun() -> tree_for_sup_present(asia_node, EuropeSup) end, true,
  393:                                #{name => tree_for_sup_present}).
  394: 
%% When run in the mod_global_distrib group, this tests the simple case of
%% a conversation between two users connected to different clusters.
%% When run in the advertised_endpoints group, it tests whether it is possible
%% to connect to a node that advertises itself with a domain name.
  399: test_pm_between_users_at_different_locations(Config) ->
  400:     escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], fun test_two_way_pm/2).
  401: 
  402: test_pm_between_users_before_available_presence(Config) ->
  403:     Config1 = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
  404:     {ok, Alice} = escalus_client:start(Config1, alice, <<"res1">>),
  405:     {ok, Eve} = escalus_client:start(Config1, eve, <<"res1">>),
  406: 
  407:     test_two_way_pm(Alice, Eve),
  408: 
  409:     escalus_client:stop(Config1, Alice),
  410:     escalus_client:stop(Config1, Eve).
  411: 
  412: test_two_way_pm(Alice, Eve) ->
  413:     escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi to Eve from Europe1!">>)),
  414:     escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi to Alice from Asia!">>)),
  415: 
  416:     FromAlice = escalus_client:wait_for_stanza(Eve, timer:seconds(15)),
  417:     FromEve = escalus_client:wait_for_stanza(Alice, timer:seconds(15)),
  418: 
  419:     AliceJid = escalus_client:full_jid(Alice),
  420:     EveJid = escalus_client:full_jid(Eve),
  421: 
  422:     escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, <<"Hi to Eve from Europe1!">>],
  423:                    FromAlice),
  424:     escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, <<"Hi to Alice from Asia!">>],
  425:                    FromEve).
  426: 
  427: test_muc_conversation_on_one_host(Config0) ->
  428:     AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
  429:     Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
  430:     escalus:fresh_story(
  431:       Config, [{eve, 1}],
  432:       fun(Eve) ->
  433:               Alice = connect_from_spec(AliceSpec, Config),
  434: 
  435:               RoomJid = ?config(room, Config),
  436:               AliceUsername = escalus_utils:get_username(Alice),
  437:               EveUsername = escalus_utils:get_username(Eve),
  438:               RoomAddr = muc_helper:room_address(RoomJid),
  439: 
  440:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
  441:               wait_for_muc_presence(Alice, RoomJid, AliceUsername),
  442:               wait_for_subject(Alice),
  443: 
  444:               escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
  445:               wait_for_muc_presence(Eve, RoomJid, AliceUsername),
  446:               wait_for_muc_presence(Eve, RoomJid, EveUsername),
  447:               wait_for_muc_presence(Alice, RoomJid, EveUsername),
  448:               wait_for_subject(Eve),
  449: 
              Msg = <<"Hi, Eve!">>,
  451:               escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, Msg)),
  452:               escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Alice)),
  453:               escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Eve)),
  454: 
              Msg2 = <<"Hi, Alice!">>,
  456:               escalus:send(Eve, escalus_stanza:groupchat_to(RoomAddr, Msg2)),
  457:               escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Eve)),
  458:               escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Alice))
  459:       end),
  460:     muc_helper:destroy_room(Config).
  461: 
  462: test_muc_conversation_history(Config0) ->
  463:     AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
  464:     Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
  465:     escalus:fresh_story(
  466:       Config, [{eve, 1}],
  467:       fun(Eve) ->
  468:               Alice = connect_from_spec(AliceSpec, Config),
  469: 
  470:               RoomJid = ?config(room, Config),
  471:               AliceUsername = escalus_utils:get_username(Alice),
  472:               RoomAddr = muc_helper:room_address(RoomJid),
  473: 
  474:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
  475:               wait_for_muc_presence(Alice, RoomJid, AliceUsername),
  476:               wait_for_subject(Alice),
  477: 
  478:               send_n_muc_messages(Alice, RoomAddr, 3),
  479: 
  480:               %% Ensure that the messages are received by the room
              %% before trying to log in Eve.
  482:               %% Otherwise, Eve would receive some messages from history and
  483:               %% some as regular groupchat messages.
  484:               receive_n_muc_messages(Alice, 3),
  485: 
  486:               EveUsername = escalus_utils:get_username(Eve),
  487:               escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
  488: 
  489:               wait_for_muc_presence(Eve, RoomJid, AliceUsername),
  490:               wait_for_muc_presence(Eve, RoomJid, EveUsername),
  491:               wait_for_muc_presence(Alice, RoomJid, EveUsername),
  492: 
  493:               %% XEP-0045: After sending the presence broadcast (and only after doing so),
  494:               %% the service MAY then send discussion history, the room subject,
  495:               %% live messages, presence updates, and other in-room traffic.
  496:               receive_n_muc_messages(Eve, 3),
  497:               wait_for_subject(Eve)
  498:       end),
  499:     muc_helper:destroy_room(Config).
  500: 
  501: wait_for_muc_presence(User, RoomJid, FromNickname) ->
  502:     Presence = escalus:wait_for_stanza(User),
  503:     escalus:assert(is_presence, Presence),
  504:     escalus:assert(is_stanza_from, [muc_helper:room_address(RoomJid, FromNickname)], Presence),
  505:     ok.
  506: 
  507: wait_for_subject(User) ->
  508:     Subject = escalus:wait_for_stanza(User),
  509:     escalus:assert(is_groupchat_message, Subject),
  510:     ?assertNotEqual(undefined, exml_query:subelement(Subject, <<"subject">>)),
  511:     ok.
  512: 
  513: send_n_muc_messages(User, RoomAddr, N) ->
  514:     lists:foreach(fun(I) ->
  515:                           Msg = <<"test-", (integer_to_binary(I))/binary>>,
  516:                           escalus:send(User, escalus_stanza:groupchat_to(RoomAddr, Msg))
  517:                   end, lists:seq(1, N)).
  518: 
  519: receive_n_muc_messages(User, N) ->
    lists:foreach(fun(I) ->
                          Msg = <<"test-", (integer_to_binary(I))/binary>>,
                          Stanza = escalus:wait_for_stanza(User),
                          escalus:assert(is_groupchat_message, [Msg], Stanza)
                  end, lists:seq(1, N)).
  525: 
  526: test_component_on_one_host(Config) ->
  527:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  528:                        {port, service_port()}, {component, <<"test_service">>}],
  529: 
  530:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  531: 
  532:     Story = fun(User) ->
  533:                     Msg1 = escalus_stanza:chat_to(Addr, <<"Hi2!">>),
  534:                     escalus:send(User, Msg1),
                    %% Then the component receives it
  536:                     Reply1 = escalus:wait_for_stanza(Comp),
  537:                     escalus:assert(is_chat_message, [<<"Hi2!">>], Reply1),
  538: 
                    %% When the component sends a reply
  540:                     Msg2 = escalus_stanza:chat_to(User, <<"Oh hi!">>),
  541:                     escalus:send(Comp, escalus_stanza:from(Msg2, Addr)),
  542: 
  543:                     %% Then Alice receives it
  544:                     Reply2 = escalus:wait_for_stanza(User),
  545:                     escalus:assert(is_chat_message, [<<"Oh hi!">>], Reply2),
  546:                     escalus:assert(is_stanza_from, [Addr], Reply2)
  547:             end,
  548: 
  549:     [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
  550: 
  551: %% Ensures that 2 components in distinct data centers can communicate.
  552: test_components_in_different_regions(_Config) ->
  553:     ComponentCommonConfig = [{host, <<"localhost">>}, {password, <<"secret">>},
  554:                              {server, <<"localhost">>}, {component, <<"test_service">>}],
  555:     Comp1Port = ct:get_config({hosts, mim, service_port}),
  556:     Comp2Port = ct:get_config({hosts, reg, service_port}),
  557:     Component1Config = [{port, Comp1Port}, {component, <<"service1">>} | ComponentCommonConfig],
  558:     Component2Config = [{port, Comp2Port}, {component, <<"service2">>} | ComponentCommonConfig],
  559: 
  560:     {Comp1, Addr1, _Name1} = component_helper:connect_component(Component1Config),
  561:     {Comp2, Addr2, _Name2} = component_helper:connect_component(Component2Config),
  562: 
  563:     Msg1 = escalus_stanza:from(escalus_stanza:chat_to(Addr2, <<"Hi from 1!">>), Addr1),
  564:     escalus:send(Comp1, Msg1),
  565:     GotMsg1 = escalus:wait_for_stanza(Comp2),
  566:     escalus:assert(is_chat_message, [<<"Hi from 1!">>], GotMsg1),
  567: 
  568:     Msg2 = escalus_stanza:from(escalus_stanza:chat_to(Addr1, <<"Hi from 2!">>), Addr2),
  569:     escalus:send(Comp2, Msg2),
  570:     GotMsg2 = escalus:wait_for_stanza(Comp1),
  571:     escalus:assert(is_chat_message, [<<"Hi from 2!">>], GotMsg2).
  572: 
%% An ordinary user is not able to discover the hidden component via GD
  574: test_hidden_component_disco_in_different_region(Config) ->
  575:     %% Hidden component from component_SUITE connects to mim1/europe_node1
  576:     HiddenComponentConfig = component_helper:spec(hidden_component, Config),
  577:     {_HiddenComp, HiddenAddr, _} = component_helper:connect_component(HiddenComponentConfig),
  578: 
  579:     escalus:fresh_story(
  580:       Config, [{eve, 1}],
  581:       fun(Eve) ->
  582:               EveServer = escalus_client:server(Eve),
  583:               escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
  584:               DiscoReply = escalus:wait_for_stanza(Eve),
  585:               escalus:assert(is_iq_result, DiscoReply),
  586:               escalus:assert(fun(Stanza) ->
  587:                                      not escalus_pred:has_service(HiddenAddr, Stanza)
  588:                              end, DiscoReply)
  589:       end).
  590: 
  591: test_component_disconnect(Config) ->
  592:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  593:                        {port, service_port()}, {component, <<"test_service">>}],
  594: 
  595:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  596:     component_helper:disconnect_component(Comp, Addr),
  597: 
  598:     Story = fun(User) ->
  599:                     escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
  600:                     Error = escalus:wait_for_stanza(User, 5000),
  601:                     escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
  602:             end,
  603: 
  604:     [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
  605: 
  606: test_location_disconnect(Config) ->
  607:     try
  608:         escalus:fresh_story(
  609:           Config, [{alice, 1}, {eve, 1}],
  610:           fun(Alice, Eve) ->
  611:                   escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
  612: 
  613:                   escalus_client:wait_for_stanza(Eve),
  614: 
  615:                   ok = rpc(asia_node, application, stop, [mongooseim]),
  616:                   %% TODO: Stopping mongooseim alone should probably stop connections too
  617:                   ok = rpc(asia_node, application, stop, [ranch]),
  618: 
  619:                   escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
  620:                   Error = escalus:wait_for_stanza(Alice),
  621:                   escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
  622:           end)
  623:     after
  624:         rpc(asia_node, application, start, [ranch]),
  625:         rpc(asia_node, application, start, [mongooseim])
  626:     end.
  627: 
  628: test_pm_with_disconnection_on_other_server(Config) ->
  629:     escalus:fresh_story(
  630:       Config, [{alice, 1}, {eve, 1}],
  631:       fun(Alice, Eve) ->
  632:               escalus_connection:stop(Eve),
  633:               escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
  634:               FromAliceBounce = escalus_client:wait_for_stanza(Alice, 15000),
  635:               escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], FromAliceBounce)
  636:       end).
  637: 
  638: test_pm_with_graceful_reconnection_to_different_server(Config) ->
  639:     EveSpec = ?config(evespec_reg, Config),
  640:     EveSpec2 = ?config(evespec_mim, Config),
  641:     escalus:fresh_story(
  642:       Config, [{alice, 1}],
  643:       fun(Alice) ->
  644:               Eve = connect_from_spec(EveSpec, Config),
  645: 
  646:               escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi from Asia!">>)),
  647: 
  648:               %% Stop connection and wait for process to die
  649:               EveNode = ct:get_config({hosts, reg, node}),
  650:               mongoose_helper:logout_user(Config, Eve, #{node => EveNode}),
  651: 
  652:               FromEve = escalus_client:wait_for_stanza(Alice),
  653: 
  654:               %% Pause Alice until Eve is reconnected
  655:               AliceNode = ct:get_config({hosts, mim, node}),
  656:               C2sPid = mongoose_helper:get_session_pid(Alice, #{node => AliceNode}),
  657:               ok = rpc(asia_node, sys, suspend, [C2sPid]),
  658: 
  659:               escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi from Europe1!">>)),
  660: 
  661:               NewEve = connect_from_spec(EveSpec2, Config),
  662: 
              ok = rpc(asia_node, sys, resume, [C2sPid]),

  666:               escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi again from Europe1!">>)),
  667:               escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi again from Asia!">>)),
  668: 
  669:               FirstFromAlice = escalus_client:wait_for_stanza(NewEve),
  670:               AgainFromEve = escalus_client:wait_for_stanza(Alice),
  671:               SecondFromAlice = escalus_client:wait_for_stanza(NewEve),
  672: 
  673:               [FromAlice, AgainFromAlice] = order_by_seqnum([FirstFromAlice, SecondFromAlice]),
  674: 
  675:               escalus:assert(is_chat_message, [<<"Hi from Europe1!">>], FromAlice),
  676:               escalus:assert(is_chat_message, [<<"Hi from Asia!">>], FromEve),
  677:               escalus:assert(is_chat_message, [<<"Hi again from Europe1!">>], AgainFromAlice),
  678:               escalus:assert(is_chat_message, [<<"Hi again from Asia!">>], AgainFromEve)
  679:           end).
  680: 
%% Refresh logic can cause two possible behaviours.
%% We test both behaviours here (plus the no-refresh case).
%% See PR #2392.
  684: test_pm_with_ungraceful_reconnection_to_different_server(Config) ->
  685:     %% No refresh
  686:     BeforeResume = fun() -> ok end,
  687:     AfterCheck = fun(Alice, NewEve) ->
  688:             user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
  689:             user_receives(Alice, [<<"Hi from Europe!">>])
  690:          end,
  691:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  692: 
  693: test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first(Config) ->
  694:     %% Same as no refresh
  695:     RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first",
  696:     % Order of nodes is important here in refresh_hosts!
  697:     BeforeResume = fun() -> refresh_hosts([asia_node, europe_node1], RefreshReason) end,
  698:     AfterCheck = fun(Alice, NewEve) ->
  699:             user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
  700:             user_receives(Alice, [<<"Hi from Europe!">>])
  701:          end,
  702:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  703: 
  704: test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first(Config) ->
    %% The Asia node overrides the Europe value with the older one,
    %% so we lose some messages during rerouting :(
  707:     RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first",
  708:     BeforeResume = fun() -> refresh_hosts([europe_node1, asia_node], RefreshReason) end,
  709:     AfterCheck = fun(Alice, NewEve) ->
  710:             user_receives(NewEve, [<<"Hi again from Europe1!">>]),
  711:             user_receives(Alice, [<<"Hi from Europe!">>])
  712:          end,
  713:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  714: 
  715: %% Reconnect Eve from asia (reg cluster) to europe (mim)
  716: do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResume, AfterCheck) ->
  717:     Config = escalus_users:update_userspec(Config0, eve, stream_management, true),
  718:     EveSpec = ?config(evespec_reg, Config),
  719:     EveSpec2 = ?config(evespec_mim, Config),
  720:     escalus:fresh_story(
  721:       Config, [{alice, 1}],
  722:       fun(Alice) ->
  723:               {ok, Eve, _} = escalus_connection:start(EveSpec, connect_steps_with_sm()),
  724:               escalus_story:send_initial_presence(Eve),
  725:               escalus_client:wait_for_stanza(Eve),
  726: 
  727:               %% Stop connection and wait for process to die
  728:               EveNode = ct:get_config({hosts, reg, node}),
  729:               C2sPid = mongoose_helper:get_session_pid(Eve, #{node => EveNode}),
  730:               ok = rpc(asia_node, sys, suspend, [C2sPid]),
  731: 
  732:               escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi from Europe1!">>)),
  733: 
              %% Wait for the route message to be queued in the c2s message queue
  735:               mongoose_helper:wait_for_route_message_count(C2sPid, 1),
  736: 
  737:               %% Time to do bad nasty things with our socket, so once our process wakes up,
  738:               %% it SHOULD detect a dead socket
  739:               escalus_connection:kill(Eve),
  740: 
              %% Connect another one; we hope the message will be rerouted
  742:               NewEve = connect_from_spec(EveSpec2, Config),
  743: 
  744:               BeforeResume(),
  745: 
  746:               %% Trigger rerouting
  747:               ok = rpc(asia_node, sys, resume, [C2sPid]),
  748: 
              %% Let C2sPid process the message and reroute (and finally die, poor little thing)
  750:               mongoose_helper:wait_for_pid_to_die(C2sPid),
  751: 
  752:               escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi again from Europe1!">>)),
  753:               escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi from Europe!">>)),
  754: 
  755:               AfterCheck(Alice, NewEve)
  756:           end).
  757: 
  758: test_global_disco(Config) ->
  759:     escalus:fresh_story(
  760:       Config, [{alice, 1}, {eve, 1}],
  761:       fun(Alice, Eve) ->
  762:               AliceServer = escalus_client:server(Alice),
  763:               escalus:send(Alice, escalus_stanza:service_discovery(AliceServer)),
  764:               _AliceStanza = escalus:wait_for_stanza(Alice),
  765:               %% TODO: test for duplicate components
  766:               %%escalus:assert(fun has_exactly_one_service/2, [muc_helper:muc_host()], AliceStanza),
  767: 
  768:               EveServer = escalus_client:server(Eve),
  769:               escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
  770:               EveStanza = escalus:wait_for_stanza(Eve),
  771:               escalus:assert(has_service, [muc_helper:muc_host()], EveStanza)
  772:       end).
  773: 
  774: test_component_unregister(_Config) ->
  775:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  776:                        {port, service_port()}, {component, <<"test_service">>}],
  777: 
  778:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  779:     ?assertMatch({ok, _}, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
  780:                               [<<"test_service.localhost">>])),
  781: 
  782:     component_helper:disconnect_component(Comp, Addr),
  783: 
  784:     ?assertEqual(error, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
  785:                             [<<"test_service.localhost">>])).
  786: 
  787: test_error_on_wrong_hosts(_Config) ->
  788:     Opts = module_opts(#{local_host => <<"no_such_host">>}),
  789:     ?assertException(error, {badrpc, {'EXIT', {#{what := check_host_failed,
  790:                                                  domain := <<"no_such_host">>}, _}}},
  791:                      dynamic_modules:ensure_modules(node_spec(europe_node1), <<"localhost">>,
  792:                                                     [{mod_global_distrib, Opts}])).
  793: 
  794: refresh_nodes(Config) ->
  795:     NodesKey = ?config(nodes_key, Config),
  796:     NodeBin = ?config(node_to_expire, Config),
  797:     redis_query(europe_node1, [<<"HSET">>, NodesKey, NodeBin, <<"0">>]),
  798:     refresh_mappings(europe_node1, "by_refresh_nodes"),
  799:     {ok, undefined} = redis_query(europe_node1, [<<"HGET">>, NodesKey, NodeBin]).
  800: 
  801: test_in_order_messages_on_multiple_connections(Config) ->
  802:     escalus:fresh_story(
  803:       Config, [{alice, 1}, {eve, 1}],
  804:       fun(Alice, Eve) ->
  805:               Seq = lists:seq(1, 100),
  806:               lists:foreach(
  807:                 fun(I) ->
  808:                         Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
  809:                         escalus_client:send(Alice, Stanza)
  810:                 end,
  811:                 Seq),
  812:               lists:foreach(
  813:                 fun(I) ->
  814:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  815:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  816:                 end,
  817:                 Seq)
  818:       end).
  819: 
  820: test_in_order_messages_on_multiple_connections_with_bounce(Config) ->
  821:     escalus:fresh_story(
  822:       Config, [{alice, 1}, {eve, 1}],
  823:       fun(Alice, Eve) ->
              %% Send 99 messages, some while the server knows the mapping and some while it doesn't
  825:               send_steps(Alice, Eve, 99, <<"reg1">>),
  826:               %% Make sure that the last message is sent when the mapping is known
  827:               set_mapping(europe_node1, Eve, <<"reg1">>),
  828:               escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"100">>)),
  829: 
  830:               %% Check that all stanzas were received in order
  831:               lists:foreach(
  832:                 fun(I) ->
  833:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  834:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  835:                 end,
  836:                 lists:seq(1, 100))
  837:       end).
  838: 
  839: test_messages_bounced_in_order(Config) ->
  840:     escalus:fresh_story(
  841:       Config, [{alice, 1}, {eve, 1}],
  842:       fun(Alice, Eve) ->
  843:               %% Make sure all messages land in bounce storage
  844:               delete_mapping(europe_node1, Eve),
  845: 
  846:               Seq = lists:seq(1, 100),
  847:               lists:foreach(
  848:                 fun(I) ->
  849:                         Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
  850:                         escalus_client:send(Alice, Stanza)
  851:                 end,
  852:                 Seq),
  853: 
  854:               %% Restore the mapping so that bounce eventually succeeds
  855:               ?assertEqual(undefined, get_mapping(europe_node1, Eve)),
  856:               set_mapping(europe_node1, Eve, <<"reg1">>),
  857: 
  858:               lists:foreach(
  859:                 fun(I) ->
  860:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  861:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  862:                 end,
  863:                 Seq)
  864:       end).
  865: 
  866: test_update_senders_host(Config) ->
  867:     escalus:fresh_story(
  868:       Config, [{alice, 1}, {eve, 1}],
  869:       fun(Alice, Eve) ->
  870:               AliceJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Alice)]),
  871:               {ok, <<"localhost.bis">>}
  872:               = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid]),
  873:               ok = rpc(europe_node1, mod_global_distrib_mapping, delete_for_jid, [AliceJid]),
  874:               wait_for_node(asia_node, AliceJid),
  875: 
  876:               %% TODO: Should prevent Redis refresher from executing for a moment,
  877:               %%       as it may collide with this test.
  878: 
  879:               escalus:send(Alice, escalus_stanza:chat_to(Eve, <<"test_update_senders_host">>)),
  880:               escalus:wait_for_stanza(Eve),
  881: 
  882:               {ok, <<"localhost.bis">>}
  883:               = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid])
  884:       end).
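
%% Waits until Node no longer has a mapping for Jid,
%% i.e. mod_global_distrib_mapping:for_jid/1 returns error.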
wait_for_node(Node, Jid) ->
  886:     mongoose_helper:wait_until(fun() -> rpc(Node, mod_global_distrib_mapping, for_jid, [Jid]) end,
  887:                                error,
  888:                                #{time_left => timer:seconds(10),
  889:                                  sleep_time => timer:seconds(1),
  890:                                  name => rpc}).
  891: 
  892: test_update_senders_host_by_ejd_service(Config) ->
  893:     refresh_hosts([europe_node1, europe_node2, asia_node], "by_test_update_senders_host_by_ejd_service"),
  894:     %% Connects to europe_node1
  895:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  896:                        {port, service_port()}, {component, <<"test_service">>}],
  897: 
  898:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  899: 
  900:     escalus:fresh_story(
  901:       Config, [{eve, 1}],
  902:       fun(Eve) ->
  903:               %% Eve is connected to asia_node
  904:               EveJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Eve)]),
  905:               {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
  906:               {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid]),
  907: 
  908:               ok = rpc(asia_node, mod_global_distrib_mapping, delete_for_jid, [EveJid]),
  909:               wait_for_node(europe_node1, EveJid),
  910:               wait_for_node(europe_node2, EveJid),
  911: 
  912:               %% Component is connected to europe_node1
  913:               %% but we force asia_node to connect to europe_node2 by hiding europe_node1
  914:               %% and forcing rebalance (effectively disabling connections to europe_node1)
  915:               %% to verify routing cache update on both nodes
  916: 
  917:               %% TODO: Should prevent Redis refresher from executing for a moment,
  918:               %%       as it may collide with this test.
  919: 
  920:               hide_node(europe_node1, Config),
  921:               {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  922:               trigger_rebalance(asia_node, EuropeHost),
  923: 
  924:               escalus:send(Eve, escalus_stanza:chat_to(Addr, <<"hi">>)),
  925:               escalus:wait_for_stanza(Comp),
  926: 
  927:               {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
  928:               {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid])
  929:       end).
  930: 
  931: %% -------------------------------- Rebalancing --------------------------------
  932: 
  933: enable_new_endpoint_on_refresh(Config) ->
  934:     get_connection(europe_node1, <<"reg1">>),
  935: 
  936:     {Enabled1, _Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
  937: 
  938:     ExtraPort = get_port(reg, gd_extra_endpoint_port),
  939:     NewEndpoint = resolved_endpoint(ExtraPort),
  940:     enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),
  941: 
  942:     {Enabled2, _Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),
  943: 
  944:     %% One new pool and one new endpoint
  945:     [NewEndpoint] = Pools2 -- Pools1,
  946:     [] = Pools1 -- Pools2,
  947:     [NewEndpoint] = Enabled2 -- Enabled1,
  948:     [] = Enabled1 -- Enabled2.
  949: 
  950: disable_endpoint_on_refresh(Config) ->
  951:     ExtraPort = get_port(reg, gd_extra_endpoint_port),
  952:     NewEndpoint = resolved_endpoint(ExtraPort),
  953:     enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),
  954: 
  955:     get_connection(europe_node1, <<"reg1">>),
  956: 
  957:     {Enabled1, Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
  958:     [_, _] = Enabled1,
  959:     [] = Disabled1,
  960: 
  961:     hide_extra_endpoint(asia_node),
  962:     trigger_rebalance(europe_node1, <<"reg1">>),
  963: 
  964:     {Enabled2, Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),
  965: 
  966:     %% 2 pools open even after disable
  967:     [] = Pools1 -- Pools2,
  968:     [] = Pools2 -- Pools1,
  969:     %% NewEndpoint is no longer enabled
  970:     [] = Enabled2 -- Enabled1,
  971:     [NewEndpoint] = Enabled1 -- Enabled2,
  972:     %% NewEndpoint is now disabled
  973:     [] = Disabled1,
  974:     [NewEndpoint] = Disabled2.
  975: 
  976: wait_for_connection(_Config) ->
  977:     set_endpoints(asia_node, []),
  978:     %% Because of hosts refresher, a pool of connections to asia_node
  979:     %% may already be present here
    mongoose_helper:wait_until(
      fun() ->
              try trigger_rebalance(europe_node1, <<"reg1">>), true
              catch _:_ -> false end
      end,
      true,
      #{name => rebalance, time_left => timer:seconds(5)}),
  987: 
  988:     spawn_connection_getter(europe_node1),
  989: 
  990:     receive
  991:         Unexpected1 -> error({unexpected, Unexpected1})
  992:     after
  993:         2000 -> ok
  994:     end,
  995: 
  996:     refresh_mappings(asia_node, "by_wait_for_connection"),
  997:     trigger_rebalance(europe_node1, <<"reg1">>),
  998: 
  999:     receive
 1000:         Conn when is_pid(Conn) -> ok;
 1001:         Unexpected2 -> error({unexpected, Unexpected2})
 1002:     after
 1003:         5000 -> error(timeout)
 1004:     end.
 1005: 
 1006: closed_connection_is_removed_from_disabled(_Config) ->
 1007:     get_connection(europe_node1, <<"reg1">>),
 1008:     set_endpoints(asia_node, []),
 1009:     trigger_rebalance(europe_node1, <<"reg1">>),
 1010: 
 1011:     {[], [_], [_]} = get_outgoing_connections(europe_node1, <<"reg1">>),
 1012: 
 1013:     % Will drop connections and prevent them from reconnecting
 1014:     restart_receiver(asia_node, [get_port(reg, gd_supplementary_endpoint_port)]),
 1015: 
    mongoose_helper:wait_until(fun() -> get_outgoing_connections(europe_node1, <<"reg1">>) end,
                               {[], [], []},
                               #{name => get_outgoing_connections}).

 1021: %%--------------------------------------------------------------------
 1022: %% Test helpers
 1023: %%--------------------------------------------------------------------
 1024: 
 1025: get_port(Host, Param) ->
 1026:     case ct:get_config({hosts, Host, Param}) of
 1027:         Port when is_integer(Port) ->
 1028:             Port;
 1029:         Other ->
 1030:             ct:fail({get_port_failed, Host, Param, Other})
 1031:     end.
 1032: 
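%% {NodeName, LocalHost, ReceiverPort} for each GD-enabled node in the setup.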
 1033: get_hosts() ->
 1034:     [
 1035:      {europe_node1, <<"localhost.bis">>, get_port(mim, gd_endpoint_port)},
 1036:      {europe_node2, <<"localhost.bis">>, get_port(mim2, gd_endpoint_port)},
 1037:      {asia_node, <<"reg1">>, get_port(reg, gd_endpoint_port)}
 1038:     ].
 1039: 
 1040: listen_endpoint(NodeName) ->
 1041:     endpoint(listen_port(NodeName)).
 1042: 
 1043: listen_port(NodeName) ->
 1044:     {_, _, Port} = lists:keyfind(NodeName, 1, get_hosts()),
 1045:     Port.
 1046: 
 1047: resolved_endpoint(Port) when is_integer(Port) ->
 1048:     {{127, 0, 0, 1}, Port}.
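
%% Note: resolved_endpoint/1 above is the parsed {Ip, Port} tuple used in
%% resolved_endpoints and in the rebalancing assertions, while endpoint/1 below
%% is the textual form put into the endpoints option.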
 1049: 
 1050: endpoint(Port) when is_integer(Port) ->
 1051:     {"127.0.0.1", Port}.
 1052: 
 1053: %% For dynamic_modules
 1054: node_spec(NodeName) ->
 1055:     #{node => ct:get_config(NodeName), timeout => timer:seconds(30)}.
 1056: 
 1057: rpc(NodeName, M, F, A) ->
 1058:     Node = ct:get_config(NodeName),
 1059:     mongoose_helper:successful_rpc(#{node => Node}, M, F, A, timer:seconds(30)).
 1060: 
 1061: hide_node(NodeName, Config) ->
 1062:     NodesKey = ?config(nodes_key, Config),
 1063:     NodeBin = atom_to_binary(ct:get_config(NodeName), latin1),
 1064:     {ok, <<"1">>} = redis_query(europe_node1, [<<"HDEL">>, NodesKey, NodeBin]).
 1065: 
 1066: connect_from_spec(UserSpec, Config) ->
 1067:     {ok, User} = escalus_client:start(Config, UserSpec, <<"res1">>),
 1068:     escalus_story:send_initial_presence(User),
 1069:     escalus:assert(is_presence, escalus_client:wait_for_stanza(User)),
 1070:     User.
 1071: 
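%% Tags the message with an id based on erlang:monotonic_time(), so the
%% delivery order can later be checked with order_by_seqnum/1.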
 1072: chat_with_seqnum(To, Text) ->
 1073:     escalus_stanza:set_id(escalus_stanza:chat_to(To, Text),
 1074:                           integer_to_binary(erlang:monotonic_time())).
 1075: 
order_by_seqnum(Stanzas) ->
    %% Ids are taken from erlang:monotonic_time(), so numeric comparison
    %% restores the order in which the stanzas were sent.
    lists:sort(fun(A, B) ->
                       binary_to_integer(exml_query:attr(A, <<"id">>)) =<
                           binary_to_integer(exml_query:attr(B, <<"id">>))
               end, Stanzas).
 1079: 
 1080: has_exactly_one_service(Service, #xmlel{children = [#xmlel{children = Services}]}) ->
 1081:     Pred = fun(Item) ->
 1082:                    exml_query:attr(Item, <<"jid">>) =:= Service
 1083:            end,
 1084:     case lists:filter(Pred, Services) of
 1085:         [_] -> true;
 1086:         _ -> false
 1087:     end.
 1088: 
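%% Sends Max messages, toggling the Redis mapping for To every (Max div 10)
%% messages, so that some messages are routed via a known mapping and some
%% have to be bounced.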
 1089: send_steps(From, To, Max, ToHost) ->
 1090:     next_send_step(From, To, 1, Max, Max div 10, true, ToHost).
 1091: 
 1092: next_send_step(_From, _To, I, Max, _ToReset, _KnowsMapping, _ToHost) when I > Max -> ok;
 1093: next_send_step(From, To, I, Max, 0, KnowsMapping, ToHost) ->
 1094:     ct:log("Reset: I: ~B", [I]),
 1095:     case KnowsMapping of
 1096:         true -> delete_mapping(europe_node1, To);
 1097:         false -> set_mapping(europe_node1, To, ToHost)
 1098:     end,
 1099:     next_send_step(From, To, I, Max, Max div 10, not KnowsMapping, ToHost);
 1100: next_send_step(From, To, I, Max, ToReset, KnowsMapping, ToHost) ->
 1101:     ct:log("I: ~B ~B ~B", [I, Max, ToReset]),
 1102:     Stanza = escalus_stanza:chat_to(To, integer_to_binary(I)),
 1103:     escalus_client:send(From, Stanza),
 1104:     next_send_step(From, To, I + 1, Max, ToReset - 1, KnowsMapping, ToHost).
 1105: 
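%% The domain currently stored in Redis under the client's full JID.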
 1106: get_mapping(Node, Client) ->
 1107:     {FullJid, _BareJid} = jids(Client),
 1108:     {ok, What} = redis_query(Node, [<<"GET">>, FullJid]),
 1109:     What.
 1110: 
%% Warning: may not work properly with alice, or any other user whose
%% stringprepped JID differs from the original one
 1113: delete_mapping(Node, Client) ->
 1114:     {FullJid, BareJid} = jids(Client),
 1115:     redis_query(Node, [<<"DEL">>, FullJid, BareJid]),
 1116:     Jid = rpc(Node, jid, from_binary, [FullJid]),
 1117:     rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).
 1118: 
 1119: set_mapping(Node, Client, Mapping) ->
 1120:     {FullJid, BareJid} = jids(Client),
 1121:     redis_query(Node, [<<"MSET">>, FullJid, Mapping, BareJid, Mapping]),
 1122:     Jid = rpc(Node, jid, from_binary, [FullJid]),
 1123:     rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).
 1124: 
 1125: jids(Client) ->
 1126:     FullJid = escalus_client:full_jid(Client),
 1127:     BareJid = escalus_client:short_jid(Client),
 1128:     {FullJid, BareJid}.
 1129: 
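%% Run a raw Redis command via the node's global_distrib pool,
%% e.g. redis_query(europe_node1, [<<"GET">>, FullJid]).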
 1130: redis_query(Node, Query) ->
 1131:     {ok, RedisWorker} = rpc(Node, mongoose_wpool, get_worker, [redis, global, global_distrib]),
 1132:     rpc(Node, eredis, q, [RedisWorker, Query]).
 1133: 
%% A fake address we never try to connect to.
%% Used in the test_advertised_endpoints_override_endpoints testcase.
 1136: advertised_endpoints() ->
 1137:     [
 1138:      {fake_domain(), get_port(reg, gd_endpoint_port)}
 1139:     ].
 1140: 
 1141: fake_domain() ->
 1142:     "somefakedomain.com".
 1143: 
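%% Convert inet address tuples in an endpoint list to their string form;
%% entries that are already strings are left as they are.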
 1144: iptuples_to_string([]) ->
 1145:     [];
 1146: iptuples_to_string([{Addr, Port} | Endps]) when is_tuple(Addr) ->
 1147:     [{inet_parse:ntoa(Addr), Port} | iptuples_to_string(Endps)];
 1148: iptuples_to_string([E | Endps]) ->
 1149:     [E | iptuples_to_string(Endps)].
 1150: 
 1151: endpoint_opts(NodeName, ReceiverPort, Config) ->
 1152:     Endpoints = [endpoint(ReceiverPort)],
 1153:     AdvertisedEndpoints =
 1154:         proplists:get_value(NodeName, ?config(add_advertised_endpoints, Config), Endpoints),
 1155:     #{endpoints => Endpoints,
 1156:       resolved_endpoints => [resolved_endpoint(ReceiverPort)],
 1157:       advertised_endpoints => AdvertisedEndpoints}.
 1158: 
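%% Install mock_inet/0 on every node of the test topology
%% and check that each call returned ok.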
 1159: mock_inet_on_each_node() ->
 1160:     Nodes = lists:map(fun({NodeName, _, _}) -> ct:get_config(NodeName) end, get_hosts()),
 1161:     Results = lists:map(fun(Node) -> rpc:block_call(Node, ?MODULE, mock_inet, []) end, Nodes),
 1162:     true = lists:all(fun(Result) -> Result =:= ok end, Results).
 1163: 
 1164: execute_on_each_node(M, F, A) ->
 1165:     lists:map(fun({NodeName, _, _}) -> rpc(NodeName, M, F, A) end, get_hosts()).
 1166: 
 1167: mock_inet() ->
    %% We mock mod_global_distrib_utils instead of the inet module itself
    %% to avoid strange networking issues
 1169:     meck:new(mod_global_distrib_utils, [non_strict, passthrough, unstick]),
 1170:     meck:expect(mod_global_distrib_utils, getaddrs, fun(_, inet) -> {ok, [{127, 0, 0, 1}]};
 1171:                                                        (_, inet6) -> {error, "No ipv6 address"} end).
 1172: 
 1173: unmock_inet(_Pids) ->
 1174:     execute_on_each_node(meck, unload, [mod_global_distrib_utils]).
 1175: 
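%% Children of the outgoing connections supervisor, excluding the hosts refresher.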
 1176: out_connection_sups(Node) ->
 1177:     Children = rpc(Node, supervisor, which_children, [mod_global_distrib_outgoing_conns_sup]),
 1178:     lists:filter(fun({Sup, _, _, _}) -> Sup =/= mod_global_distrib_hosts_refresher end, Children).
 1179: 
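%% True when every node runs at least one outgoing connection supervision tree.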
 1180: trees_for_connections_present() ->
 1181:     AsiaChildren = out_connection_sups(asia_node),
 1182:     Europe1Children = out_connection_sups(europe_node1),
 1183:     Europe2Children = out_connection_sups(europe_node2),
 1184:     lists:all(fun(Host) -> length(Host) > 0 end, [AsiaChildren, Europe1Children, Europe2Children]).
 1185: 
 1186: tree_for_sup_present(Node, ExpectedSup) ->
 1187:     Children = out_connection_sups(Node),
 1188:     lists:keyfind(ExpectedSup, 1, Children) =/= false.
 1189: 
 1190: 
 1191: %% ------------------------------- rebalancing helpers -----------------------------------
 1192: 
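%% Fetch a connection in a separate process and send the result back
%% to the test process.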
 1193: spawn_connection_getter(SenderNode) ->
 1194:     TestPid = self(),
 1195:     spawn(fun() ->
 1196:                   Conn = get_connection(SenderNode, <<"reg1">>),
 1197:                   TestPid ! Conn
 1198:           end).
 1199: 
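%% Start listening on an extra port, publish it, and force SenderNode
%% to rebalance onto it.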
 1200: enable_extra_endpoint(ListenNode, SenderNode, Port, _Config) ->
 1201:     restart_receiver(ListenNode, [Port, listen_port(ListenNode)]),
 1202:     set_endpoints(ListenNode, [Port, listen_port(ListenNode)]),
 1203:     trigger_rebalance(SenderNode, <<"reg1">>).
 1204: 
 1205: get_connection(SenderNode, ToDomain) ->
 1206:     rpc(SenderNode, mod_global_distrib_outgoing_conns_sup, get_connection, [ToDomain]).
 1207: 
 1208: hide_extra_endpoint(ListenNode) ->
 1209:     set_endpoints(ListenNode, [listen_port(ListenNode)]).
 1210: 
 1211: set_endpoints(ListenNode, Ports) ->
 1212:     Endpoints = [endpoint(Port) || Port <- Ports],
 1213:     {ok, _} = rpc(ListenNode, mod_global_distrib_mapping_redis, set_endpoints, [Endpoints]).
 1214: 
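%% Returns {EnabledEndpoints, DisabledEndpoints, ConnectionPoolIds} for the
%% connections from NodeName towards DestinationDomain.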
 1215: get_outgoing_connections(NodeName, DestinationDomain) ->
 1216:     Supervisor = rpc(NodeName, mod_global_distrib_utils, server_to_sup_name, [DestinationDomain]),
 1217:     Manager = rpc(NodeName, mod_global_distrib_utils, server_to_mgr_name, [DestinationDomain]),
 1218:     Enabled = rpc(NodeName, mod_global_distrib_server_mgr,
 1219:                   get_enabled_endpoints, [DestinationDomain]),
 1220:     Disabled = rpc(NodeName, mod_global_distrib_server_mgr,
 1221:                    get_disabled_endpoints, [DestinationDomain]),
 1222:     PoolsChildren = rpc(NodeName, supervisor, which_children, [Supervisor]),
 1223:     Pools = [ Id || {Id, _Child, _Type, _Modules} <- PoolsChildren, Id /= Manager ],
 1224:     {Enabled, Disabled, Pools}.
 1225: 
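%% Restart mod_global_distrib_receiver with new listening ports,
%% keeping the remaining options intact.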
 1226: restart_receiver(NodeName) ->
 1227:     restart_receiver(NodeName, [listen_port(NodeName)]).
 1228: 
 1229: restart_receiver(NodeName, NewPorts) ->
 1230:     OldOpts = #{connections := OldConnOpts} = rpc(NodeName, gen_mod, get_module_opts,
 1231:                                                   [<<"localhost">>, mod_global_distrib_receiver]),
 1232:     NewConnOpts = OldConnOpts#{endpoints := [endpoint(Port) || Port <- NewPorts],
 1233:                                resolved_endpoints := [resolved_endpoint(Port) || Port <- NewPorts]},
 1234:     NewOpts = OldOpts#{connections := NewConnOpts},
 1235:     Node = node_spec(NodeName),
 1236:     dynamic_modules:restart(Node, <<"localhost">>, mod_global_distrib_receiver, NewOpts).
 1237: 
 1238: trigger_rebalance(NodeName, DestinationDomain) when is_binary(DestinationDomain) ->
    %% Ensure that the manager exists; otherwise the force_refresh call
    %% below could fail with a noproc error
 1241:     ok = rpc(NodeName, mod_global_distrib_outgoing_conns_sup,
 1242:              ensure_server_started, [DestinationDomain]),
 1243:     rpc(NodeName, mod_global_distrib_server_mgr, force_refresh, [DestinationDomain]),
 1244:     StateInfo = rpc(NodeName, mod_global_distrib_server_mgr, get_state_info, [DestinationDomain]),
 1245:     ct:log("mgr_state_info_after_rebalance nodename=~p state_info=~p", [NodeName, StateInfo]),
 1246:     timer:sleep(1000).
 1247: 
 1248: %% -----------------------------------------------------------------------
 1249: %% Escalus-related helpers
 1250: 
 1251: %% Receive messages with Bodies in any order, skipping presences from stream resumption
 1252: user_receives(User, Bodies) ->
 1253:     Opts = #{pred => fun(Stanza) -> not escalus_pred:is_presence(Stanza) end},
 1254:     Checks = [fun(Stanza) -> escalus_pred:is_chat_message(Body, Stanza) end || Body <- Bodies],
 1255:     escalus:assert_many(Checks, [escalus_connection:receive_stanza(User, Opts) || _ <- Bodies]).
 1256: 
 1257: %% -----------------------------------------------------------------------
 1258: %% Refreshing helpers
 1259: 
%% Reason is a string.
%% NodeName is asia_node, europe_node2, ... in the format used by this suite.
 1262: refresh_mappings(NodeName, Reason) when is_list(Reason) ->
 1263:     rpc(NodeName, mod_global_distrib_mapping_redis, refresh, [Reason]).
 1264: 
refresh_hosts(NodeNames, Reason) ->
    [refresh_mappings(NodeName, Reason) || NodeName <- NodeNames].
 1267: 
 1268: 
 1269: %% -----------------------------------------------------------------------
 1270: %% Other helpers
 1271: 
 1272: connect_steps_with_sm() ->
 1273:     [start_stream, stream_features, maybe_use_ssl,
 1274:      authenticate, bind, session, stream_resumption].
 1275: 
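%% A copy of the client record with the bare JID in the jid field.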
 1276: bare_client(Client) ->
 1277:     Client#client{jid = escalus_utils:get_short_jid(Client)}.
 1278: 
 1279: service_port() ->
 1280:     ct:get_config({hosts, mim, service_port}).
 1281: 
 1282: 
 1283: %% -----------------------------------------------------------------------
 1284: %% Waiting helpers
 1285: 
 1286: wait_for_domain(Node, Domain) ->
 1287:     F = fun() ->
 1288:                 Domains = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
 1289:                 lists:member(Domain, Domains)
 1290:         end,
 1291:     mongoose_helper:wait_until(F, true, #{name => {wait_for_domain, Node, Domain}}).
 1292: 
 1293: 
 1294: %% -----------------------------------------------------------------------
%% Ensure that the endpoints are up
 1296: 
 1297: wait_for_listeners_to_appear() ->
 1298:     [wait_for_can_connect_to_port(Port) || Port <- receiver_ports(get_hosts())].
 1299: 
 1300: receiver_ports(Hosts) ->
 1301:     lists:map(fun({_NodeName, _LocalHost, ReceiverPort}) -> ReceiverPort end, Hosts).
 1302: 
 1303: wait_for_can_connect_to_port(Port) ->
 1304:     Opts = #{time_left => timer:seconds(30), sleep_time => 1000, name => {can_connect_to_port, Port}},
 1305:     mongoose_helper:wait_until(fun() -> can_connect_to_port(Port) end, true, Opts).
 1306: 
 1307: can_connect_to_port(Port) ->
 1308:     case gen_tcp:connect("127.0.0.1", Port, []) of
 1309:         {ok, Sock} ->
 1310:             gen_tcp:close(Sock),
 1311:             true;
 1312:         Other ->
 1313:             ct:pal("can_connect_to_port port=~p result=~p", [Port, Other]),
 1314:             false
 1315:     end.
 1316: 
 1317: %% -----------------------------------------------------------------------
 1318: %% Custom log levels for GD modules during the tests
 1319: 
 1320: enable_logging() ->
 1321:     mim_loglevel:enable_logging(test_hosts(), custom_loglevels()).
 1322: 
 1323: disable_logging() ->
 1324:     mim_loglevel:disable_logging(test_hosts(), custom_loglevels()).
 1325: 
custom_loglevels() ->
    [%% for "s2s connection to muc.localhost not found" debugging
     {ejabberd_s2s, debug},
     %% for debugging event=refreshing_own_data_done
     {mod_global_distrib_mapping_redis, info},
     %% to know if a connection is already started or would be started
     %% event=outgoing_conn_start_progress
     {mod_global_distrib_outgoing_conns_sup, info},
     %% to debug bound connection issues
     {mod_global_distrib, debug},
     %% to know the pids of all new connections
     {mod_global_distrib_connection, debug},
     %% to check if gc or refresh is triggered
     {mod_global_distrib_server_mgr, info},
     %% to debug incoming connections
     %% {mod_global_distrib_receiver, info},
     %% to debug global session set/delete
     {mod_global_distrib_mapping, debug}
    ].
 1345: 
 1346: test_hosts() -> [mim, mim2, reg].