1: %%==============================================================================
    2: %% Copyright 2017 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mod_global_distrib_SUITE).
   18: 
   19: -compile([export_all, nowarn_export_all]).
   20: 
   21: -include_lib("escalus/include/escalus.hrl").
   22: -include_lib("common_test/include/ct.hrl").
   23: -include_lib("eunit/include/eunit.hrl").
   24: -include_lib("escalus/include/escalus_xmlns.hrl").
   25: -include_lib("exml/include/exml.hrl").
   26: 
   27: -define(HOSTS_REFRESH_INTERVAL, 200). %% in ms
   28: 
   29: -import(domain_helper, [domain/0]).
   30: -import(config_parser_helper, [config/2, mod_config/2]).
   31: 
   32: %%--------------------------------------------------------------------
   33: %% Suite configuration
   34: %%--------------------------------------------------------------------
   35: 
   36: all() ->
   37:     [
   38:      {group, mod_global_distrib},
   39:      {group, cluster_restart},
   40:      {group, start_checks},
   41:      {group, invalidation},
   42:      {group, multi_connection},
   43:      {group, rebalancing},
   44:      {group, advertised_endpoints},
   45:      {group, hosts_refresher}
   46:     ].
   47: 
   48: groups() ->
   49:     [{mod_global_distrib, [],
   50:           [
   51:            test_pm_between_users_at_different_locations,
   52:            test_pm_between_users_before_available_presence,
           test_component_disconnect,
   54:            test_component_on_one_host,
   55:            test_components_in_different_regions,
   56:            test_hidden_component_disco_in_different_region,
   57:            test_pm_with_disconnection_on_other_server,
   58:            test_pm_with_graceful_reconnection_to_different_server,
   59:            test_pm_with_ungraceful_reconnection_to_different_server,
   60:            test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first,
   61:            test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first,
   62:            test_component_unregister,
   63:            test_update_senders_host,
   64:            test_update_senders_host_by_ejd_service,
   65: 
   66:            %% with node 2 disabled
   67:            test_muc_conversation_on_one_host,
   68:            test_global_disco
   69:           ]},
   70:          {hosts_refresher, [],
   71:           [test_host_refreshing]},
   72:          {cluster_restart, [],
   73:           [
   74:            test_location_disconnect
   75:           ]},
   76:          {start_checks, [],
   77:           [
   78:            test_error_on_wrong_hosts
   79:           ]},
   80:          {invalidation, [],
   81:           [
   82:            % TODO: Add checks for other mapping refreshes
   83:            refresh_nodes
   84:           ]},
   85:          {multi_connection, [],
   86:           [
   87:            test_in_order_messages_on_multiple_connections,
   88:            test_in_order_messages_on_multiple_connections_with_bounce,
   89:            test_messages_bounced_in_order,
   90: 
   91:            %% with node 2 disabled
   92:            test_muc_conversation_history
   93:           ]},
   94:          {rebalancing, [],
   95:           [
   96:            enable_new_endpoint_on_refresh,
   97:            disable_endpoint_on_refresh,
   98:            wait_for_connection,
   99:            closed_connection_is_removed_from_disabled
  100:           ]},
  101:          {advertised_endpoints, [],
  102:           [
  103:            test_advertised_endpoints_override_endpoints,
  104:            test_pm_between_users_at_different_locations
  105:           ]}
  106:     ].
  107: 
  108: suite() ->
  109:     [{require, europe_node1, {hosts, mim, node}},
  110:      {require, europe_node2, {hosts, mim2, node}},
  111:      {require, asia_node, {hosts, reg, node}},
  112:      {require, c2s_port, {hosts, mim, c2s_port}} |
  113:      escalus:suite()].
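
%% The requirements above are read from the common test config; a minimal
%% sketch of the assumed shape (node names below are illustrative only):
%%   {hosts, [{mim,  [{node, mongooseim@localhost}, {c2s_port, 5222}, ...]},
%%            {mim2, [{node, mongooseim2@localhost}, ...]},
%%            {reg,  [{node, reg1@localhost}, ...]}]}.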
  114: 
  115: %%--------------------------------------------------------------------
  116: %% Init & teardown
  117: %%--------------------------------------------------------------------
  118: 
  119: init_per_suite(Config) ->
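    %% Probe the global_distrib Redis pool on both clusters; skip the whole
    %% suite if it is not available.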
  120:     case {rpc(europe_node1, mongoose_wpool, get_worker, [redis, global, global_distrib]),
  121:           rpc(asia_node, mongoose_wpool, get_worker, [redis, global, global_distrib])} of
  122:         {{ok, _}, {ok, _}} ->
  123:             ok = rpc(europe_node2, mongoose_cluster, join, [ct:get_config(europe_node1)]),
  124: 
  125:             enable_logging(),
  126:             escalus:init_per_suite([{add_advertised_endpoints, []}, {extra_config, #{}} | Config]);
  127:         Result ->
  128:             ct:pal("Redis check result: ~p", [Result]),
  129:             {skip, "GD Redis default pool not available"}
  130:     end.
  131: 
  132: end_per_suite(Config) ->
  133:     disable_logging(),
  134:     escalus_fresh:clean(),
  135:     rpc(europe_node2, mongoose_cluster, leave, []),
  136:     escalus:end_per_suite(Config).
  137: 
  138: init_per_group(start_checks, Config) ->
  139:     NodeName = europe_node1,
  140:     NodeSpec = node_spec(NodeName),
  141:     Config1 = dynamic_modules:save_modules(NodeSpec, [domain()], Config),
  142:     dynamic_modules:ensure_modules(NodeSpec, domain(), [{mod_global_distrib, stopped}]),
  143:     Config1;
  144: init_per_group(multi_connection, Config) ->
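    %% resend_after_ms sets how long a bounced message is held before each
    %% resend attempt.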
  145:     ExtraConfig = #{bounce => #{resend_after_ms => 20000},
  146:                     connections => #{%% Disable unused feature to avoid interference
  147:                                      connections_per_endpoint => 100,
  148:                                      disabled_gc_interval => 10000}},
  149:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  150: init_per_group(invalidation, Config) ->
  151:     Config1 = init_per_group(invalidation_generic, Config),
  152:     NodeBin = <<"fake_node@localhost">>,
  153:     [{node_to_expire, NodeBin} | Config1];
  154: init_per_group(rebalancing, Config) ->
    %% We need to prevent automatic refreshes, because they may interfere with the tests,
    %% and we need the garbage collection of disabled connections to run early
    %% so that we can check its validity
  157:     ExtraConfig = #{connections => #{endpoint_refresh_interval => 3600,
  158:                                      endpoint_refresh_interval_when_empty => 3600,
  159:                                      disabled_gc_interval => 1},
  160:                     redis => #{refresh_after => 3600}},
  161:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  162: init_per_group(advertised_endpoints, Config) ->
  163:     lists:foreach(fun({NodeName, _, _}) ->
  164:                           Node = ct:get_config(NodeName),
  165:                           mongoose_helper:inject_module(#{node => Node}, ?MODULE, reload)
  166:                   end, get_hosts()),
  167:     mock_inet_on_each_node(),
  168:     init_per_group_generic(
  169:                [{add_advertised_endpoints,
  170:                  [{asia_node, advertised_endpoints()}]} | Config]);
  171: init_per_group(mod_global_distrib, Config) ->
  172:     %% Disable mod_global_distrib_mapping_redis refresher
  173:     ExtraConfig = #{redis => #{refresh_after => 3600}},
  174:     init_per_group_generic([{extra_config, ExtraConfig} | Config]);
  175: init_per_group(_, Config) ->
  176:     init_per_group_generic(Config).
  177: 
  178: init_per_group_generic(Config0) ->
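    %% Configure mod_global_distrib on every node; nodes_key is the Redis hash
    %% in which live GD nodes are registered.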
  179:     Config2 = lists:foldl(fun init_modules_per_node/2, Config0, get_hosts()),
  180:     wait_for_listeners_to_appear(),
  181:     {SomeNode, _, _} = hd(get_hosts()),
  182:     NodesKey = rpc(SomeNode, mod_global_distrib_mapping_redis, nodes_key, []),
  183:     [{nodes_key, NodesKey}, {escalus_user_db, xmpp} | Config2].
  184: 
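%% Build the full mod_global_distrib options for one node: merge the per-node
%% endpoint options into the shared extra config, stop interfering modules,
%% and (re)start mod_global_distrib with stream management enabled.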
  185: init_modules_per_node({NodeName, LocalHost, ReceiverPort}, Config0) ->
  186:     Extra0 = module_opts(?config(extra_config, Config0)),
  187:     EndpointOpts = endpoint_opts(NodeName, ReceiverPort, Config0),
  188:     ConnExtra = maps:merge(EndpointOpts, maps:get(connections, Extra0, #{})),
  189:     Extra = Extra0#{local_host => LocalHost, connections => ConnExtra},
  190:     Opts = module_opts(Extra),
  191: 
  192:     VirtHosts = virtual_hosts(NodeName),
  193:     Node = node_spec(NodeName),
  194:     Config1 = dynamic_modules:save_modules(Node, VirtHosts, Config0),
  195: 
  196:     %% To reduce load when sending many messages
  197:     ModulesToStop = [mod_offline, mod_blocking, mod_privacy, mod_roster, mod_last,
  198:                      mod_stream_management],
  199:     [dynamic_modules:ensure_stopped(Node, VirtHost, ModulesToStop) || VirtHost <- VirtHosts],
  200: 
  201:     SMBackend = ct_helper:get_internal_database(),
  202:     SMOpts = config_parser_helper:mod_config(mod_stream_management,
  203:                                              #{resume_timeout => 1, backend => SMBackend}),
  204:     dynamic_modules:ensure_modules(Node, domain(), [{mod_global_distrib, Opts},
  205:                                                     {mod_stream_management, SMOpts}]),
  206:     Config1.
  207: 
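%% Layer the per-group extra options over common test values and the config
%% parser defaults to obtain the final module options.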
  208: module_opts(ExtraOpts) ->
  209:     lists:foldl(fun set_opts/2, ExtraOpts, [common, defaults, connections, redis, bounce]).
  210: 
  211: set_opts(common, Opts) ->
  212:     maps:merge(#{global_host => <<"localhost">>,
  213:                  hosts_refresh_interval => ?HOSTS_REFRESH_INTERVAL}, Opts);
  214: set_opts(defaults, Opts) ->
  215:     mod_config(mod_global_distrib, Opts);
  216: set_opts(connections, #{connections := ConnExtra} = Opts) ->
  217:     TLSOpts = config([modules, mod_global_distrib, connections, tls],
  218:                      #{certfile => "priv/ssl/fake_server.pem",
  219:                        cacertfile => "priv/ssl/ca/cacert.pem"}),
  220:     Opts#{connections := config([modules, mod_global_distrib, connections],
  221:                                 maps:merge(#{tls => TLSOpts}, ConnExtra))};
  222: set_opts(redis, #{redis := RedisExtra} = Opts) ->
  223:     Opts#{redis := config([modules, mod_global_distrib, redis], RedisExtra)};
  224: set_opts(bounce, #{bounce := BounceExtra} = Opts) ->
  225:     Opts#{bounce := config([modules, mod_global_distrib, bounce], BounceExtra)};
  226: set_opts(_, Opts) ->
  227:     Opts.
  228: 
  229: end_per_group(advertised_endpoints, Config) ->
  230:     Pids = ?config(meck_handlers, Config),
  231:     unmock_inet(Pids),
  232:     escalus_fresh:clean(),
  233:     end_per_group_generic(Config);
  234: end_per_group(start_checks, Config) ->
  235:     escalus_fresh:clean(),
  236:     Config;
  237: end_per_group(invalidation, Config) ->
    redis_query(europe_node1, [<<"HDEL">>, ?config(nodes_key, Config),
                               ?config(node_to_expire, Config)]),
  240:     end_per_group_generic(Config);
  241: end_per_group(_, Config) ->
  242:     end_per_group_generic(Config).
  243: 
  244: end_per_group_generic(Config) ->
  245:     dynamic_modules:restore_modules(#{timeout => timer:seconds(30)}, Config).
  246: 
  247: init_per_testcase(CaseName, Config)
  248:   when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
  249:        CaseName == test_muc_conversation_history ->
  250:     %% There is no helper to load MUC on node2
  251:     %% For now it's easier to hide node2
  252:     %% TODO: Do it right at some point!
  253:     hide_node(europe_node2, Config),
  254:     %% There would be no new connections to europe_node2, but there can be some old ones.
  255:     %% We need to disconnect previous connections.
  256:     {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  257:     trigger_rebalance(asia_node, EuropeHost),
  258:     %% Load muc on mim node
  259:     muc_helper:load_muc(),
  260:     RegNode = ct:get_config({hosts, reg, node}),
  261:     %% Wait for muc.localhost to become visible from reg node
  262:     wait_for_domain(RegNode, muc_helper:muc_host()),
  263:     escalus:init_per_testcase(CaseName, Config);
  264: init_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
  265:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server;
  266:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
  267:                                    CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
  268:     escalus:init_per_testcase(CN, init_user_eve(Config));
  269: init_per_testcase(CaseName, Config) ->
  270:     escalus:init_per_testcase(CaseName, Config).
  271: 
  272: init_user_eve(Config) ->
  273:     %% Register Eve in reg cluster
  274:     EveSpec = escalus_fresh:create_fresh_user(Config, eve),
  275:     MimPort = ct:get_config({hosts, mim, c2s_port}),
  276:     EveSpec2 = lists:keystore(port, 1, EveSpec, {port, MimPort}),
  277:     %% Register Eve in mim cluster
  278:     escalus:create_users(Config, [{eve, EveSpec2}]),
  279:     [{evespec_reg, EveSpec}, {evespec_mim, EveSpec2} | Config].
  280: 
  281: end_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
  282:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server;
  283:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
  284:                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
  285:     MimEveSpec = ?config(evespec_mim, Config),
  286:     %% Clean Eve from reg cluster
  287:     escalus_fresh:clean(),
  288:     %% Clean Eve from mim cluster
    %% For shared databases (e.g. mysql, pgsql),
    %% removing the user from one cluster removes it from all clusters.
    %% For the mnesia auth backend we need to trigger removal from each cluster.
    %% That's why there is a catch here.
  293:     catch escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}]),
  294:     generic_end_per_testcase(CN, Config);
  295: end_per_testcase(CaseName, Config)
  296:   when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
  297:        CaseName == test_muc_conversation_history ->
  298:     refresh_mappings(europe_node2, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
  299:     muc_helper:unload_muc(),
  300:     generic_end_per_testcase(CaseName, Config);
  301: end_per_testcase(test_update_senders_host_by_ejd_service = CN, Config) ->
  302:     refresh_mappings(europe_node1, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
  303:     generic_end_per_testcase(CN, Config);
  304: end_per_testcase(CN, Config) when CN == enable_new_endpoint_on_refresh;
  305:                                   CN == disable_endpoint_on_refresh;
  306:                                   CN == wait_for_connection;
  307:                                   CN == closed_connection_is_removed_from_disabled ->
  308:     restart_receiver(asia_node),
  309:     refresh_mappings(asia_node, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
  310:     generic_end_per_testcase(CN, Config);
  311: end_per_testcase(CaseName, Config) ->
  312:     generic_end_per_testcase(CaseName, Config).
  313: 
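%% Terminate all outgoing GD connection supervisors on every node (keeping only
%% the hosts refresher), so connections opened by one test cannot leak into the next.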
  314: generic_end_per_testcase(CaseName, Config) ->
  315:     lists:foreach(
  316:       fun({NodeName, _, _}) ->
              %% TODO: Enable the refresher only for specific test cases,
              %% as some of them are based on the assumption that nodes
              %% must open new connections during the tests.
  320:               pause_refresher(NodeName, CaseName),
  321:               Node = ct:get_config(NodeName),
  322:               SupRef = {mod_global_distrib_outgoing_conns_sup, Node},
  323:               try
  324:                   OutgoingConns = supervisor:which_children(SupRef),
  325:                   lists:foreach(fun ({mod_global_distrib_hosts_refresher, _, _, _}) ->
  326:                                         skip;
  327:                                     ({Id, _, _, _}) ->
  328:                                         supervisor:terminate_child(SupRef, Id)
  329:                                 end, OutgoingConns),
  330:                   [{mod_global_distrib_hosts_refresher, _, worker, _Modules}] =
  331:                     supervisor:which_children(SupRef)
  332:               catch
  333:                   _:{noproc, _} ->
  334:                       ct:pal("Sender supervisor not found in ~p", [NodeName])
  335:               end,
  336:               unpause_refresher(NodeName, CaseName)
  337:       end,
  338:       get_hosts()),
  339:     escalus:end_per_testcase(CaseName, Config).
  340: 
  341: virtual_hosts(asia_node) ->
  342:     [domain()];
  343: virtual_hosts(_) ->
  344:     [domain(), secondary_domain()].
  345: 
  346: secondary_domain() ->
  347:     ct:get_config({hosts, mim, secondary_domain}).
  348: 
%% The refresher is either not started at all or is stopped for some test cases
  350: -spec pause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
  351: pause_refresher(_, test_error_on_wrong_hosts) ->
  352:     ok;
  353: pause_refresher(asia_node, test_location_disconnect) ->
  354:     ok;
  355: pause_refresher(NodeName, _) ->
  356:     ok = rpc(NodeName, mod_global_distrib_hosts_refresher, pause, []).
  357: 
  358: -spec unpause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
  359: unpause_refresher(_, test_error_on_wrong_hosts) ->
  360:     ok;
  361: unpause_refresher(asia_node, test_location_disconnect) ->
  362:     ok;
  363: unpause_refresher(NodeName, _) ->
  364:     ok = rpc(NodeName, mod_global_distrib_hosts_refresher, unpause, []).
  365: 
  366: %%--------------------------------------------------------------------
  367: %% Service discovery test
  368: %%--------------------------------------------------------------------
  369: 
%% Requires mod_global_distrib to be started with the advertised_endpoints option
%% for each host in get_hosts().
%% Reads Redis to confirm that the endpoints stored there are overwritten
%% with the `advertised_endpoints` option value.
  374: test_advertised_endpoints_override_endpoints(_Config) ->
  375:     Endps = execute_on_each_node(mod_global_distrib_mapping_redis, get_endpoints, [<<"reg1">>]),
  376:     true = lists:all(
  377:              fun(E) ->
  378:                      lists:sort(E) =:= lists:sort(advertised_endpoints())
  379:              end, Endps).
  380: 
%% @doc Verifies that the hosts refresher restarts the outgoing connection pool
%% if it goes down for some reason (crash or domain unavailability).
%% Also verifies that the refresher properly reads the host list
%% from the backend and starts the appropriate pool.
  385: test_host_refreshing(_Config) ->
  386:     mongoose_helper:wait_until(fun() -> trees_for_connections_present() end, true,
  387:                                #{name => trees_for_connections_present,
  388:                                  time_left => timer:seconds(10)}),
  389:     ConnectionSups = out_connection_sups(asia_node),
  390:     {europe_node1, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  391:     EuropeSup = rpc(asia_node, mod_global_distrib_utils, server_to_sup_name, [EuropeHost]),
  392:     {_, EuropePid, supervisor, _} = lists:keyfind(EuropeSup, 1, ConnectionSups),
  393:     erlang:exit(EuropePid, kill), % it's ok to kill temporary process
  394:     mongoose_helper:wait_until(fun() -> tree_for_sup_present(asia_node, EuropeSup) end, true,
  395:                                #{name => tree_for_sup_present}).
  396: 
%% When run in the mod_global_distrib group, tests the simple case of a conversation
%% between two users connected to different clusters.
%% When run in the advertised_endpoints group, tests whether it is possible
%% to connect to a node that advertises itself with a domain name.
  401: test_pm_between_users_at_different_locations(Config) ->
  402:     escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], fun test_two_way_pm/2).
  403: 
  404: test_pm_between_users_before_available_presence(Config) ->
  405:     Config1 = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
  406:     {ok, Alice} = escalus_client:start(Config1, alice, <<"res1">>),
  407:     {ok, Eve} = escalus_client:start(Config1, eve, <<"res1">>),
  408: 
  409:     test_two_way_pm(Alice, Eve),
  410: 
  411:     escalus_client:stop(Config1, Alice),
  412:     escalus_client:stop(Config1, Eve).
  413: 
  414: test_two_way_pm(Alice, Eve) ->
  415:     escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi to Eve from Europe1!">>)),
  416:     escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi to Alice from Asia!">>)),
  417: 
  418:     FromAlice = escalus_client:wait_for_stanza(Eve, timer:seconds(15)),
  419:     FromEve = escalus_client:wait_for_stanza(Alice, timer:seconds(15)),
  420: 
  421:     AliceJid = escalus_client:full_jid(Alice),
  422:     EveJid = escalus_client:full_jid(Eve),
  423: 
  424:     escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, <<"Hi to Eve from Europe1!">>],
  425:                    FromAlice),
  426:     escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, <<"Hi to Alice from Asia!">>],
  427:                    FromEve).
  428: 
  429: test_muc_conversation_on_one_host(Config0) ->
  430:     AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
  431:     Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
  432:     escalus:fresh_story(
  433:       Config, [{eve, 1}],
  434:       fun(Eve) ->
  435:               Alice = connect_from_spec(AliceSpec, Config),
  436: 
  437:               RoomJid = ?config(room, Config),
  438:               AliceUsername = escalus_utils:get_username(Alice),
  439:               EveUsername = escalus_utils:get_username(Eve),
  440:               RoomAddr = muc_helper:room_address(RoomJid),
  441: 
  442:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
  443:               wait_for_muc_presence(Alice, RoomJid, AliceUsername),
  444:               wait_for_subject(Alice),
  445: 
  446:               escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
  447:               wait_for_muc_presence(Eve, RoomJid, AliceUsername),
  448:               wait_for_muc_presence(Eve, RoomJid, EveUsername),
  449:               wait_for_muc_presence(Alice, RoomJid, EveUsername),
  450:               wait_for_subject(Eve),
  451: 
              Msg = <<"Hi, Eve!">>,
  453:               escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, Msg)),
  454:               escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Alice)),
  455:               escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Eve)),
  456: 
              Msg2 = <<"Hi, Alice!">>,
  458:               escalus:send(Eve, escalus_stanza:groupchat_to(RoomAddr, Msg2)),
  459:               escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Eve)),
  460:               escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Alice))
  461:       end),
  462:     muc_helper:destroy_room(Config).
  463: 
  464: test_muc_conversation_history(Config0) ->
  465:     AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
  466:     Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
  467:     escalus:fresh_story(
  468:       Config, [{eve, 1}],
  469:       fun(Eve) ->
  470:               Alice = connect_from_spec(AliceSpec, Config),
  471: 
  472:               RoomJid = ?config(room, Config),
  473:               AliceUsername = escalus_utils:get_username(Alice),
  474:               RoomAddr = muc_helper:room_address(RoomJid),
  475: 
  476:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
  477:               wait_for_muc_presence(Alice, RoomJid, AliceUsername),
  478:               wait_for_subject(Alice),
  479: 
  480:               send_n_muc_messages(Alice, RoomAddr, 3),
  481: 
  482:               %% Ensure that the messages are received by the room
  483:               %% before trying to login Eve.
  484:               %% Otherwise, Eve would receive some messages from history and
  485:               %% some as regular groupchat messages.
  486:               receive_n_muc_messages(Alice, 3),
  487: 
  488:               EveUsername = escalus_utils:get_username(Eve),
  489:               escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
  490: 
  491:               wait_for_muc_presence(Eve, RoomJid, AliceUsername),
  492:               wait_for_muc_presence(Eve, RoomJid, EveUsername),
  493:               wait_for_muc_presence(Alice, RoomJid, EveUsername),
  494: 
  495:               %% XEP-0045: After sending the presence broadcast (and only after doing so),
  496:               %% the service MAY then send discussion history, the room subject,
  497:               %% live messages, presence updates, and other in-room traffic.
  498:               receive_n_muc_messages(Eve, 3),
  499:               wait_for_subject(Eve)
  500:       end),
  501:     muc_helper:destroy_room(Config).
  502: 
  503: wait_for_muc_presence(User, RoomJid, FromNickname) ->
  504:     Presence = escalus:wait_for_stanza(User),
  505:     escalus:assert(is_presence, Presence),
  506:     escalus:assert(is_stanza_from, [muc_helper:room_address(RoomJid, FromNickname)], Presence),
  507:     ok.
  508: 
  509: wait_for_subject(User) ->
  510:     Subject = escalus:wait_for_stanza(User),
  511:     escalus:assert(is_groupchat_message, Subject),
  512:     ?assertNotEqual(undefined, exml_query:subelement(Subject, <<"subject">>)),
  513:     ok.
  514: 
  515: send_n_muc_messages(User, RoomAddr, N) ->
  516:     lists:foreach(fun(I) ->
  517:                           Msg = <<"test-", (integer_to_binary(I))/binary>>,
  518:                           escalus:send(User, escalus_stanza:groupchat_to(RoomAddr, Msg))
  519:                   end, lists:seq(1, N)).
  520: 
  521: receive_n_muc_messages(User, N) ->
  522:     lists:foreach(fun(J) ->
  523:                           Msg = <<"test-", (integer_to_binary(J))/binary>>,
  524:                           Stanza = escalus:wait_for_stanza(User),
  525:                           escalus:assert(is_groupchat_message, [Msg], Stanza)
                  end, lists:seq(1, N)).
  527: 
  528: test_component_on_one_host(Config) ->
  529:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  530:                        {port, service_port()}, {component, <<"test_service">>}],
  531: 
  532:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  533: 
  534:     Story = fun(User) ->
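                    %% When the user sends a message to the component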
  535:                     Msg1 = escalus_stanza:chat_to(Addr, <<"Hi2!">>),
  536:                     escalus:send(User, Msg1),
                    %% Then the component receives it
  538:                     Reply1 = escalus:wait_for_stanza(Comp),
  539:                     escalus:assert(is_chat_message, [<<"Hi2!">>], Reply1),
  540: 
                    %% When the component sends a reply
  542:                     Msg2 = escalus_stanza:chat_to(User, <<"Oh hi!">>),
  543:                     escalus:send(Comp, escalus_stanza:from(Msg2, Addr)),
  544: 
                    %% Then the user receives it
  546:                     Reply2 = escalus:wait_for_stanza(User),
  547:                     escalus:assert(is_chat_message, [<<"Oh hi!">>], Reply2),
  548:                     escalus:assert(is_stanza_from, [Addr], Reply2)
  549:             end,
  550: 
  551:     [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
  552: 
  553: %% Ensures that 2 components in distinct data centers can communicate.
  554: test_components_in_different_regions(_Config) ->
  555:     ComponentCommonConfig = [{host, <<"localhost">>}, {password, <<"secret">>},
  556:                              {server, <<"localhost">>}, {component, <<"test_service">>}],
  557:     Comp1Port = ct:get_config({hosts, mim, service_port}),
  558:     Comp2Port = ct:get_config({hosts, reg, service_port}),
  559:     Component1Config = [{port, Comp1Port}, {component, <<"service1">>} | ComponentCommonConfig],
  560:     Component2Config = [{port, Comp2Port}, {component, <<"service2">>} | ComponentCommonConfig],
  561: 
  562:     {Comp1, Addr1, _Name1} = component_helper:connect_component(Component1Config),
  563:     {Comp2, Addr2, _Name2} = component_helper:connect_component(Component2Config),
  564: 
  565:     Msg1 = escalus_stanza:from(escalus_stanza:chat_to(Addr2, <<"Hi from 1!">>), Addr1),
  566:     escalus:send(Comp1, Msg1),
  567:     GotMsg1 = escalus:wait_for_stanza(Comp2),
  568:     escalus:assert(is_chat_message, [<<"Hi from 1!">>], GotMsg1),
  569: 
  570:     Msg2 = escalus_stanza:from(escalus_stanza:chat_to(Addr1, <<"Hi from 2!">>), Addr2),
  571:     escalus:send(Comp2, Msg2),
  572:     GotMsg2 = escalus:wait_for_stanza(Comp1),
  573:     escalus:assert(is_chat_message, [<<"Hi from 2!">>], GotMsg2).
  574: 
%% An ordinary user is not able to discover the hidden component via GD
  576: test_hidden_component_disco_in_different_region(Config) ->
  577:     %% Hidden component from component_SUITE connects to mim1/europe_node1
  578:     HiddenComponentConfig = component_helper:spec(hidden_component, Config),
  579:     {_HiddenComp, HiddenAddr, _} = component_helper:connect_component(HiddenComponentConfig),
  580: 
  581:     escalus:fresh_story(
  582:       Config, [{eve, 1}],
  583:       fun(Eve) ->
  584:               EveServer = escalus_client:server(Eve),
  585:               escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
  586:               DiscoReply = escalus:wait_for_stanza(Eve),
  587:               escalus:assert(is_iq_result, DiscoReply),
  588:               escalus:assert(fun(Stanza) ->
  589:                                      not escalus_pred:has_service(HiddenAddr, Stanza)
  590:                              end, DiscoReply)
  591:       end).
  592: 
  593: test_component_disconnect(Config) ->
  594:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  595:                        {port, service_port()}, {component, <<"test_service">>}],
  596: 
  597:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  598:     component_helper:disconnect_component(Comp, Addr),
  599: 
  600:     Story = fun(User) ->
  601:                     escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
  602:                     Error = escalus:wait_for_stanza(User, 5000),
  603:                     escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
  604:             end,
  605: 
  606:     [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].
  607: 
  608: test_location_disconnect(Config) ->
  609:     try
  610:         escalus:fresh_story(
  611:           Config, [{alice, 1}, {eve, 1}],
  612:           fun(Alice, Eve) ->
  613:                   escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
  614: 
  615:                   escalus_client:wait_for_stanza(Eve),
  616: 
  617:                   ok = rpc(asia_node, application, stop, [mongooseim]),
  618:                   %% TODO: Stopping mongooseim alone should probably stop connections too
  619:                   ok = rpc(asia_node, application, stop, [ranch]),
  620: 
  621:                   escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
  622:                   Error = escalus:wait_for_stanza(Alice),
  623:                   escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
  624:           end)
  625:     after
  626:         rpc(asia_node, application, start, [ranch]),
  627:         rpc(asia_node, application, start, [mongooseim])
  628:     end.
  629: 
  630: test_pm_with_disconnection_on_other_server(Config) ->
  631:     escalus:fresh_story(
  632:       Config, [{alice, 1}, {eve, 1}],
  633:       fun(Alice, Eve) ->
  634:               escalus_connection:stop(Eve),
  635:               escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
  636:               FromAliceBounce = escalus_client:wait_for_stanza(Alice, 15000),
  637:               escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], FromAliceBounce)
  638:       end).
  639: 
  640: test_pm_with_graceful_reconnection_to_different_server(Config) ->
  641:     EveSpec = ?config(evespec_reg, Config),
  642:     EveSpec2 = ?config(evespec_mim, Config),
  643:     escalus:fresh_story(
  644:       Config, [{alice, 1}],
  645:       fun(Alice) ->
  646:               Eve = connect_from_spec(EveSpec, Config),
  647: 
  648:               escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi from Asia!">>)),
  649: 
  650:               %% Stop connection and wait for process to die
  651:               EveNode = ct:get_config({hosts, reg, node}),
  652:               mongoose_helper:logout_user(Config, Eve, #{node => EveNode}),
  653: 
  654:               FromEve = escalus_client:wait_for_stanza(Alice),
  655: 
  656:               %% Pause Alice until Eve is reconnected
  657:               AliceNode = ct:get_config({hosts, mim, node}),
  658:               C2sPid = mongoose_helper:get_session_pid(Alice, #{node => AliceNode}),
  659:               ok = rpc(asia_node, sys, suspend, [C2sPid]),
  660: 
  661:               escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi from Europe1!">>)),
  662: 
  663:               NewEve = connect_from_spec(EveSpec2, Config),
  664: 
              ok = rpc(asia_node, sys, resume, [C2sPid]),

  668:               escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi again from Europe1!">>)),
  669:               escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi again from Asia!">>)),
  670: 
  671:               FirstFromAlice = escalus_client:wait_for_stanza(NewEve),
  672:               AgainFromEve = escalus_client:wait_for_stanza(Alice),
  673:               SecondFromAlice = escalus_client:wait_for_stanza(NewEve),
  674: 
  675:               [FromAlice, AgainFromAlice] = order_by_seqnum([FirstFromAlice, SecondFromAlice]),
  676: 
  677:               escalus:assert(is_chat_message, [<<"Hi from Europe1!">>], FromAlice),
  678:               escalus:assert(is_chat_message, [<<"Hi from Asia!">>], FromEve),
  679:               escalus:assert(is_chat_message, [<<"Hi again from Europe1!">>], AgainFromAlice),
  680:               escalus:assert(is_chat_message, [<<"Hi again from Asia!">>], AgainFromEve)
  681:           end).
  682: 
%% The refresh logic can cause two possible behaviours.
%% We test both behaviours here (plus the no-refresh case).
%% See PR #2392.
  686: test_pm_with_ungraceful_reconnection_to_different_server(Config) ->
  687:     %% No refresh
  688:     BeforeResume = fun() -> ok end,
  689:     AfterCheck = fun(Alice, NewEve) ->
  690:             user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
  691:             user_receives(Alice, [<<"Hi from Europe!">>])
  692:          end,
  693:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  694: 
  695: test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first(Config) ->
  696:     %% Same as no refresh
  697:     RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first",
    % The order of nodes passed to refresh_hosts is important here!
  699:     BeforeResume = fun() -> refresh_hosts([asia_node, europe_node1], RefreshReason) end,
  700:     AfterCheck = fun(Alice, NewEve) ->
  701:             user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
  702:             user_receives(Alice, [<<"Hi from Europe!">>])
  703:          end,
  704:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  705: 
  706: test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first(Config) ->
    %% The Asia node overrides the Europe value with the older one,
    %% so we lose some messages during rerouting :(
  709:     RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first",
  710:     BeforeResume = fun() -> refresh_hosts([europe_node1, asia_node], RefreshReason) end,
  711:     AfterCheck = fun(Alice, NewEve) ->
  712:             user_receives(NewEve, [<<"Hi again from Europe1!">>]),
  713:             user_receives(Alice, [<<"Hi from Europe!">>])
  714:          end,
  715:     do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).
  716: 
  717: %% Reconnect Eve from asia (reg cluster) to europe (mim)
  718: do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResume, AfterCheck) ->
  719:     Config = escalus_users:update_userspec(Config0, eve, stream_management, true),
  720:     EveSpec = ?config(evespec_reg, Config),
  721:     EveSpec2 = ?config(evespec_mim, Config),
  722:     escalus:fresh_story(
  723:       Config, [{alice, 1}],
  724:       fun(Alice) ->
  725:               {ok, Eve, _} = escalus_connection:start(EveSpec, connect_steps_with_sm()),
  726:               escalus_story:send_initial_presence(Eve),
  727:               escalus_client:wait_for_stanza(Eve),
  728: 
  729:               %% Stop connection and wait for process to die
  730:               EveNode = ct:get_config({hosts, reg, node}),
  731:               C2sPid = mongoose_helper:get_session_pid(Eve, #{node => EveNode}),
  732:               ok = rpc(asia_node, sys, suspend, [C2sPid]),
  733: 
  734:               escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi from Europe1!">>)),
  735: 
              %% Wait for the route message to be queued in the c2s message queue
  737:               mongoose_helper:wait_for_route_message_count(C2sPid, 1),
  738: 
  739:               %% Time to do bad nasty things with our socket, so once our process wakes up,
  740:               %% it SHOULD detect a dead socket
  741:               escalus_connection:kill(Eve),
  742: 
              %% Connect another client; the message should be rerouted to it
  744:               NewEve = connect_from_spec(EveSpec2, Config),
  745: 
  746:               BeforeResume(),
  747: 
  748:               %% Trigger rerouting
  749:               ok = rpc(asia_node, sys, resume, [C2sPid]),
  750: 
              %% Let C2sPid process the message and reroute (and finally die, poor little thing)
  752:               mongoose_helper:wait_for_pid_to_die(C2sPid),
  753: 
  754:               escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi again from Europe1!">>)),
  755:               escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi from Europe!">>)),
  756: 
  757:               AfterCheck(Alice, NewEve)
  758:           end).
  759: 
  760: test_global_disco(Config) ->
  761:     escalus:fresh_story(
  762:       Config, [{alice, 1}, {eve, 1}],
  763:       fun(Alice, Eve) ->
  764:               AliceServer = escalus_client:server(Alice),
  765:               escalus:send(Alice, escalus_stanza:service_discovery(AliceServer)),
  766:               _AliceStanza = escalus:wait_for_stanza(Alice),
  767:               %% TODO: test for duplicate components
  768:               %%escalus:assert(fun has_exactly_one_service/2, [muc_helper:muc_host()], AliceStanza),
  769: 
  770:               EveServer = escalus_client:server(Eve),
  771:               escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
  772:               EveStanza = escalus:wait_for_stanza(Eve),
  773:               escalus:assert(has_service, [muc_helper:muc_host()], EveStanza)
  774:       end).
  775: 
  776: test_component_unregister(_Config) ->
  777:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  778:                        {port, service_port()}, {component, <<"test_service">>}],
  779: 
  780:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  781:     ?assertMatch({ok, _}, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
  782:                               [<<"test_service.localhost">>])),
  783: 
  784:     component_helper:disconnect_component(Comp, Addr),
  785: 
  786:     ?assertEqual(error, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
  787:                             [<<"test_service.localhost">>])).
  788: 
  789: test_error_on_wrong_hosts(_Config) ->
  790:     Opts = module_opts(#{local_host => <<"no_such_host">>}),
  791:     ?assertException(error, {badrpc, {'EXIT', {#{what := check_host_failed,
  792:                                                  domain := <<"no_such_host">>}, _}}},
  793:                      dynamic_modules:ensure_modules(node_spec(europe_node1), <<"localhost">>,
  794:                                                     [{mod_global_distrib, Opts}])).
  795: 
  796: refresh_nodes(Config) ->
  797:     NodesKey = ?config(nodes_key, Config),
  798:     NodeBin = ?config(node_to_expire, Config),
  799:     redis_query(europe_node1, [<<"HSET">>, NodesKey, NodeBin, <<"0">>]),
  800:     refresh_mappings(europe_node1, "by_refresh_nodes"),
  801:     {ok, undefined} = redis_query(europe_node1, [<<"HGET">>, NodesKey, NodeBin]).
  802: 
  803: test_in_order_messages_on_multiple_connections(Config) ->
  804:     escalus:fresh_story(
  805:       Config, [{alice, 1}, {eve, 1}],
  806:       fun(Alice, Eve) ->
  807:               Seq = lists:seq(1, 100),
  808:               lists:foreach(
  809:                 fun(I) ->
  810:                         Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
  811:                         escalus_client:send(Alice, Stanza)
  812:                 end,
  813:                 Seq),
  814:               lists:foreach(
  815:                 fun(I) ->
  816:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  817:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  818:                 end,
  819:                 Seq)
  820:       end).
  821: 
  822: test_in_order_messages_on_multiple_connections_with_bounce(Config) ->
  823:     escalus:fresh_story(
  824:       Config, [{alice, 1}, {eve, 1}],
  825:       fun(Alice, Eve) ->
              %% Send 99 messages, some while the server knows the mapping and some while it doesn't
  827:               send_steps(Alice, Eve, 99, <<"reg1">>),
  828:               %% Make sure that the last message is sent when the mapping is known
  829:               set_mapping(europe_node1, Eve, <<"reg1">>),
  830:               escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"100">>)),
  831: 
  832:               %% Check that all stanzas were received in order
  833:               lists:foreach(
  834:                 fun(I) ->
  835:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  836:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  837:                 end,
  838:                 lists:seq(1, 100))
  839:       end).
  840: 
  841: test_messages_bounced_in_order(Config) ->
  842:     escalus:fresh_story(
  843:       Config, [{alice, 1}, {eve, 1}],
  844:       fun(Alice, Eve) ->
  845:               %% Make sure all messages land in bounce storage
  846:               delete_mapping(europe_node1, Eve),
  847: 
  848:               Seq = lists:seq(1, 100),
  849:               lists:foreach(
  850:                 fun(I) ->
  851:                         Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
  852:                         escalus_client:send(Alice, Stanza)
  853:                 end,
  854:                 Seq),
  855: 
  856:               %% Restore the mapping so that bounce eventually succeeds
  857:               ?assertEqual(undefined, get_mapping(europe_node1, Eve)),
  858:               set_mapping(europe_node1, Eve, <<"reg1">>),
  859: 
  860:               lists:foreach(
  861:                 fun(I) ->
  862:                         Stanza = escalus_client:wait_for_stanza(Eve, 5000),
  863:                         escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
  864:                 end,
  865:                 Seq)
  866:       end).
  867: 
  868: test_update_senders_host(Config) ->
  869:     escalus:fresh_story(
  870:       Config, [{alice, 1}, {eve, 1}],
  871:       fun(Alice, Eve) ->
  872:               AliceJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Alice)]),
  873:               {ok, <<"localhost.bis">>}
  874:               = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid]),
  875:               ok = rpc(europe_node1, mod_global_distrib_mapping, delete_for_jid, [AliceJid]),
  876:               wait_for_node(asia_node, AliceJid),
  877: 
  878:               %% TODO: Should prevent Redis refresher from executing for a moment,
  879:               %%       as it may collide with this test.
  880: 
  881:               escalus:send(Alice, escalus_stanza:chat_to(Eve, <<"test_update_senders_host">>)),
  882:               escalus:wait_for_stanza(Eve),
  883: 
  884:               {ok, <<"localhost.bis">>}
  885:               = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid])
  886:       end).

%% Wait until Node no longer has a mapping for Jid (for_jid returns error).
wait_for_node(Node, Jid) ->
  888:     mongoose_helper:wait_until(fun() -> rpc(Node, mod_global_distrib_mapping, for_jid, [Jid]) end,
  889:                                error,
  890:                                #{time_left => timer:seconds(10),
  891:                                  sleep_time => timer:seconds(1),
  892:                                  name => rpc}).
  893: 
  894: test_update_senders_host_by_ejd_service(Config) ->
  895:     refresh_hosts([europe_node1, europe_node2, asia_node], "by_test_update_senders_host_by_ejd_service"),
  896:     %% Connects to europe_node1
  897:     ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
  898:                        {port, service_port()}, {component, <<"test_service">>}],
  899: 
  900:     {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
  901: 
  902:     escalus:fresh_story(
  903:       Config, [{eve, 1}],
  904:       fun(Eve) ->
  905:               %% Eve is connected to asia_node
  906:               EveJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Eve)]),
  907:               {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
  908:               {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid]),
  909: 
  910:               ok = rpc(asia_node, mod_global_distrib_mapping, delete_for_jid, [EveJid]),
  911:               wait_for_node(europe_node1, EveJid),
  912:               wait_for_node(europe_node2, EveJid),
  913: 
  914:               %% Component is connected to europe_node1
  915:               %% but we force asia_node to connect to europe_node2 by hiding europe_node1
  916:               %% and forcing rebalance (effectively disabling connections to europe_node1)
  917:               %% to verify routing cache update on both nodes
  918: 
  919:               %% TODO: Should prevent Redis refresher from executing for a moment,
  920:               %%       as it may collide with this test.
  921: 
  922:               hide_node(europe_node1, Config),
  923:               {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
  924:               trigger_rebalance(asia_node, EuropeHost),
  925: 
  926:               escalus:send(Eve, escalus_stanza:chat_to(Addr, <<"hi">>)),
  927:               escalus:wait_for_stanza(Comp),
  928: 
  929:               {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
  930:               {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid])
  931:       end).
  932: 
  933: %% -------------------------------- Rebalancing --------------------------------
  934: 
  935: enable_new_endpoint_on_refresh(Config) ->
  936:     get_connection(europe_node1, <<"reg1">>),
  937: 
  938:     {Enabled1, _Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
  939: 
  940:     ExtraPort = get_port(reg, gd_extra_endpoint_port),
  941:     NewEndpoint = resolved_endpoint(ExtraPort),
  942:     enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),
  943: 
  944:     {Enabled2, _Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),
  945: 
  946:     %% One new pool and one new endpoint
  947:     [NewEndpoint] = Pools2 -- Pools1,
  948:     [] = Pools1 -- Pools2,
  949:     [NewEndpoint] = Enabled2 -- Enabled1,
  950:     [] = Enabled1 -- Enabled2.
  951: 
  952: disable_endpoint_on_refresh(Config) ->
  953:     ExtraPort = get_port(reg, gd_extra_endpoint_port),
  954:     NewEndpoint = resolved_endpoint(ExtraPort),
  955:     enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),
  956: 
  957:     get_connection(europe_node1, <<"reg1">>),
  958: 
  959:     {Enabled1, Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
  960:     [_, _] = Enabled1,
  961:     [] = Disabled1,
  962: 
  963:     hide_extra_endpoint(asia_node),
  964:     trigger_rebalance(europe_node1, <<"reg1">>),
  965: 
  966:     {Enabled2, Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),
  967: 
    %% Both pools remain open even after the endpoint is disabled
  969:     [] = Pools1 -- Pools2,
  970:     [] = Pools2 -- Pools1,
  971:     %% NewEndpoint is no longer enabled
  972:     [] = Enabled2 -- Enabled1,
  973:     [NewEndpoint] = Enabled1 -- Enabled2,
  974:     %% NewEndpoint is now disabled
  975:     [] = Disabled1,
  976:     [NewEndpoint] = Disabled2.
  977: 
  978: wait_for_connection(_Config) ->
  979:     set_endpoints(asia_node, []),
  980:     %% Because of hosts refresher, a pool of connections to asia_node
  981:     %% may already be present here
    mongoose_helper:wait_until(
      fun() ->
              try trigger_rebalance(europe_node1, <<"reg1">>), true
              catch _:_ -> false end
      end,
      true,
      #{name => rebalance, time_left => timer:seconds(5)}),
  989: 
  990:     spawn_connection_getter(europe_node1),
  991: 
  992:     receive
  993:         Unexpected1 -> error({unexpected, Unexpected1})
  994:     after
  995:         2000 -> ok
  996:     end,
  997: 
  998:     refresh_mappings(asia_node, "by_wait_for_connection"),
  999:     trigger_rebalance(europe_node1, <<"reg1">>),
 1000: 
 1001:     receive
 1002:         Conn when is_pid(Conn) -> ok;
 1003:         Unexpected2 -> error({unexpected, Unexpected2})
 1004:     after
 1005:         5000 -> error(timeout)
 1006:     end.
 1007: 
 1008: closed_connection_is_removed_from_disabled(_Config) ->
 1009:     get_connection(europe_node1, <<"reg1">>),
 1010:     set_endpoints(asia_node, []),
 1011:     trigger_rebalance(europe_node1, <<"reg1">>),
 1012: 
 1013:     {[], [_], [_]} = get_outgoing_connections(europe_node1, <<"reg1">>),
 1014: 
 1015:     % Will drop connections and prevent them from reconnecting
 1016:     restart_receiver(asia_node, [get_port(reg, gd_supplementary_endpoint_port)]),
 1017: 
    mongoose_helper:wait_until(fun() -> get_outgoing_connections(europe_node1, <<"reg1">>) end,
                               {[], [], []},
                               #{name => get_outgoing_connections}).

 1023: %%--------------------------------------------------------------------
 1024: %% Test helpers
 1025: %%--------------------------------------------------------------------
 1026: 
 1027: get_port(Host, Param) ->
 1028:     case ct:get_config({hosts, Host, Param}) of
 1029:         Port when is_integer(Port) ->
 1030:             Port;
 1031:         Other ->
 1032:             ct:fail({get_port_failed, Host, Param, Other})
 1033:     end.
 1034: 
 1035: get_hosts() ->
 1036:     [
 1037:      {europe_node1, <<"localhost.bis">>, get_port(mim, gd_endpoint_port)},
 1038:      {europe_node2, <<"localhost.bis">>, get_port(mim2, gd_endpoint_port)},
 1039:      {asia_node, <<"reg1">>, get_port(reg, gd_endpoint_port)}
 1040:     ].
 1041: 
 1042: listen_endpoint(NodeName) ->
 1043:     endpoint(listen_port(NodeName)).
 1044: 
 1045: listen_port(NodeName) ->
 1046:     {_, _, Port} = lists:keyfind(NodeName, 1, get_hosts()),
 1047:     Port.
 1048: 
 1049: resolved_endpoint(Port) when is_integer(Port) ->
 1050:     {{127, 0, 0, 1}, Port}.
 1051: 
 1052: endpoint(Port) when is_integer(Port) ->
 1053:     {"127.0.0.1", Port}.
 1054: 
 1055: %% For dynamic_modules
 1056: node_spec(NodeName) ->
 1057:     #{node => ct:get_config(NodeName), timeout => timer:seconds(30)}.
 1058: 
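%% RPC into the node registered under NodeName in the CT config;
%% fails the test if the call errors out.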
 1059: rpc(NodeName, M, F, A) ->
 1060:     Node = ct:get_config(NodeName),
 1061:     mongoose_helper:successful_rpc(#{node => Node}, M, F, A, timer:seconds(30)).
 1062: 
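%% Delete the node from the Redis nodes hash, so peers treat it as dead
%% until its mappings are refreshed.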
 1063: hide_node(NodeName, Config) ->
 1064:     NodesKey = ?config(nodes_key, Config),
 1065:     NodeBin = atom_to_binary(ct:get_config(NodeName), latin1),
 1066:     {ok, <<"1">>} = redis_query(europe_node1, [<<"HDEL">>, NodesKey, NodeBin]).
 1067: 
 1068: connect_from_spec(UserSpec, Config) ->
 1069:     {ok, User} = escalus_client:start(Config, UserSpec, <<"res1">>),
 1070:     escalus_story:send_initial_presence(User),
 1071:     escalus:assert(is_presence, escalus_client:wait_for_stanza(User)),
 1072:     User.
 1073: 
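%% Stamp the message id with a monotonic timestamp, so received stanzas can be
%% ordered by send time later (see order_by_seqnum/1).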
 1074: chat_with_seqnum(To, Text) ->
 1075:     escalus_stanza:set_id(escalus_stanza:chat_to(To, Text),
 1076:                           integer_to_binary(erlang:monotonic_time())).
 1077: 
%% Sort stanzas by the numeric seqnum id assigned in chat_with_seqnum/2.
order_by_seqnum(Stanzas) ->
    lists:sort(fun(A, B) ->
                       binary_to_integer(exml_query:attr(A, <<"id">>)) =<
                           binary_to_integer(exml_query:attr(B, <<"id">>))
               end, Stanzas).
 1081: 
 1082: has_exactly_one_service(Service, #xmlel{children = [#xmlel{children = Services}]}) ->
 1083:     Pred = fun(Item) ->
 1084:                    exml_query:attr(Item, <<"jid">>) =:= Service
 1085:            end,
 1086:     case lists:filter(Pred, Services) of
 1087:         [_] -> true;
 1088:         _ -> false
 1089:     end.
 1090: 
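%% Send Max messages; every (Max div 10) messages, toggle whether europe_node1
%% knows the recipient's mapping, so some messages are delivered directly and
%% some have to be bounced.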
 1091: send_steps(From, To, Max, ToHost) ->
 1092:     next_send_step(From, To, 1, Max, Max div 10, true, ToHost).
 1093: 
 1094: next_send_step(_From, _To, I, Max, _ToReset, _KnowsMapping, _ToHost) when I > Max -> ok;
 1095: next_send_step(From, To, I, Max, 0, KnowsMapping, ToHost) ->
 1096:     ct:log("Reset: I: ~B", [I]),
 1097:     case KnowsMapping of
 1098:         true -> delete_mapping(europe_node1, To);
 1099:         false -> set_mapping(europe_node1, To, ToHost)
 1100:     end,
 1101:     next_send_step(From, To, I, Max, Max div 10, not KnowsMapping, ToHost);
 1102: next_send_step(From, To, I, Max, ToReset, KnowsMapping, ToHost) ->
 1103:     ct:log("I: ~B ~B ~B", [I, Max, ToReset]),
 1104:     Stanza = escalus_stanza:chat_to(To, integer_to_binary(I)),
 1105:     escalus_client:send(From, Stanza),
 1106:     next_send_step(From, To, I + 1, Max, ToReset - 1, KnowsMapping, ToHost).
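
%% Worked example: send_steps(From, To, 100, <<"reg1">>) sends 100 numbered
%% chats and, after every 10 of them (Max div 10), toggles the sender's view
%% of the recipient's mapping: first it deletes the mapping (forcing a fresh
%% Redis lookup), 10 messages later it sets it back to ToHost, and so on.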
 1107: 
 1108: get_mapping(Node, Client) ->
 1109:     {FullJid, _BareJid} = jids(Client),
 1110:     {ok, What} = redis_query(Node, [<<"GET">>, FullJid]),
 1111:     What.
 1112: 
%% Warning! May not work properly with alice or any other user whose
%% stringprepped JID differs from the original one
 1115: delete_mapping(Node, Client) ->
 1116:     {FullJid, BareJid} = jids(Client),
 1117:     redis_query(Node, [<<"DEL">>, FullJid, BareJid]),
 1118:     Jid = rpc(Node, jid, from_binary, [FullJid]),
 1119:     rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).
 1120: 
 1121: set_mapping(Node, Client, Mapping) ->
 1122:     {FullJid, BareJid} = jids(Client),
 1123:     redis_query(Node, [<<"MSET">>, FullJid, Mapping, BareJid, Mapping]),
 1124:     Jid = rpc(Node, jid, from_binary, [FullJid]),
 1125:     rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).
 1126: 
 1127: jids(Client) ->
 1128:     FullJid = escalus_client:full_jid(Client),
 1129:     BareJid = escalus_client:short_jid(Client),
 1130:     {FullJid, BareJid}.
 1131: 
 1132: redis_query(Node, Query) ->
 1133:     {ok, RedisWorker} = rpc(Node, mongoose_wpool, get_worker, [redis, global, global_distrib]),
 1134:     rpc(Node, eredis, q, [RedisWorker, Query]).
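
%% Usage sketch (the JID is hypothetical; the stored value is the binary host
%% name, e.g. <<"localhost.bis">>):
%%   {ok, Host} = redis_query(europe_node1, [<<"GET">>, <<"alice@localhost/res1">>]).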
 1135: 
 1136: %% A fake address we don't try to connect to.
 1137: %% Used in test_advertised_endpoints_override_endpoints testcase.
 1138: advertised_endpoints() ->
 1139:     [
 1140:      {fake_domain(), get_port(reg, gd_endpoint_port)}
 1141:     ].
 1142: 
 1143: fake_domain() ->
 1144:     "somefakedomain.com".
 1145: 
 1146: iptuples_to_string([]) ->
 1147:     [];
 1148: iptuples_to_string([{Addr, Port} | Endps]) when is_tuple(Addr) ->
 1149:     [{inet_parse:ntoa(Addr), Port} | iptuples_to_string(Endps)];
 1150: iptuples_to_string([E | Endps]) ->
 1151:     [E | iptuples_to_string(Endps)].
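
%% Example: iptuples_to_string([{{127, 0, 0, 1}, 5555}, {"fake.domain", 80}])
%% returns [{"127.0.0.1", 5555}, {"fake.domain", 80}]; already-stringified
%% endpoints pass through unchanged.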
 1152: 
 1153: endpoint_opts(NodeName, ReceiverPort, Config) ->
 1154:     Endpoints = [endpoint(ReceiverPort)],
 1155:     AdvertisedEndpoints =
 1156:         proplists:get_value(NodeName, ?config(add_advertised_endpoints, Config), Endpoints),
 1157:     #{endpoints => Endpoints,
 1158:       resolved_endpoints => [resolved_endpoint(ReceiverPort)],
 1159:       advertised_endpoints => AdvertisedEndpoints}.
 1160: 
 1161: mock_inet_on_each_node() ->
 1162:     Nodes = lists:map(fun({NodeName, _, _}) -> ct:get_config(NodeName) end, get_hosts()),
 1163:     Results = lists:map(fun(Node) -> rpc:block_call(Node, ?MODULE, mock_inet, []) end, Nodes),
 1164:     true = lists:all(fun(Result) -> Result =:= ok end, Results).
 1165: 
 1166: execute_on_each_node(M, F, A) ->
 1167:     lists:map(fun({NodeName, _, _}) -> rpc(NodeName, M, F, A) end, get_hosts()).
 1168: 
 1169: mock_inet() ->
    %% We don't mock the inet module itself, to avoid strange networking issues
 1171:     meck:new(mod_global_distrib_utils, [non_strict, passthrough, unstick]),
 1172:     meck:expect(mod_global_distrib_utils, getaddrs, fun(_, inet) -> {ok, [{127, 0, 0, 1}]};
 1173:                                                        (_, inet6) -> {error, "No ipv6 address"} end).
 1174: 
 1175: unmock_inet(_Pids) ->
 1176:     execute_on_each_node(meck, unload, [mod_global_distrib_utils]).
 1177: 
 1178: out_connection_sups(Node) ->
 1179:     Children = rpc(Node, supervisor, which_children, [mod_global_distrib_outgoing_conns_sup]),
 1180:     lists:filter(fun({Sup, _, _, _}) -> Sup =/= mod_global_distrib_hosts_refresher end, Children).
 1181: 
 1182: trees_for_connections_present() ->
 1183:     AsiaChildren = out_connection_sups(asia_node),
 1184:     Europe1Children = out_connection_sups(europe_node1),
 1185:     Europe2Children = out_connection_sups(europe_node2),
    lists:all(fun(Children) -> Children =/= [] end,
              [AsiaChildren, Europe1Children, Europe2Children]).
 1187: 
 1188: tree_for_sup_present(Node, ExpectedSup) ->
 1189:     Children = out_connection_sups(Node),
 1190:     lists:keyfind(ExpectedSup, 1, Children) =/= false.
 1191: 
 1192: 
 1193: %% ------------------------------- rebalancing helpers -----------------------------------
 1194: 
 1195: spawn_connection_getter(SenderNode) ->
 1196:     TestPid = self(),
 1197:     spawn(fun() ->
 1198:                   Conn = get_connection(SenderNode, <<"reg1">>),
 1199:                   TestPid ! Conn
 1200:           end).
 1201: 
 1202: enable_extra_endpoint(ListenNode, SenderNode, Port, _Config) ->
 1203:     restart_receiver(ListenNode, [Port, listen_port(ListenNode)]),
 1204:     set_endpoints(ListenNode, [Port, listen_port(ListenNode)]),
 1205:     trigger_rebalance(SenderNode, <<"reg1">>).
 1206: 
 1207: get_connection(SenderNode, ToDomain) ->
 1208:     rpc(SenderNode, mod_global_distrib_outgoing_conns_sup, get_connection, [ToDomain]).
 1209: 
 1210: hide_extra_endpoint(ListenNode) ->
 1211:     set_endpoints(ListenNode, [listen_port(ListenNode)]).
 1212: 
 1213: set_endpoints(ListenNode, Ports) ->
 1214:     Endpoints = [endpoint(Port) || Port <- Ports],
 1215:     {ok, _} = rpc(ListenNode, mod_global_distrib_mapping_redis, set_endpoints, [Endpoints]).
 1216: 
 1217: get_outgoing_connections(NodeName, DestinationDomain) ->
 1218:     Supervisor = rpc(NodeName, mod_global_distrib_utils, server_to_sup_name, [DestinationDomain]),
 1219:     Manager = rpc(NodeName, mod_global_distrib_utils, server_to_mgr_name, [DestinationDomain]),
 1220:     Enabled = rpc(NodeName, mod_global_distrib_server_mgr,
 1221:                   get_enabled_endpoints, [DestinationDomain]),
 1222:     Disabled = rpc(NodeName, mod_global_distrib_server_mgr,
 1223:                    get_disabled_endpoints, [DestinationDomain]),
 1224:     PoolsChildren = rpc(NodeName, supervisor, which_children, [Supervisor]),
 1225:     Pools = [ Id || {Id, _Child, _Type, _Modules} <- PoolsChildren, Id /= Manager ],
 1226:     {Enabled, Disabled, Pools}.
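
%% The result is {EnabledEndpoints, DisabledEndpoints, PoolIds}. A sketch of a
%% possible value for a node with a single active endpoint (the exact pool id
%% format is an implementation detail of the per-domain supervisor):
%%   {[{{127, 0, 0, 1}, 5555}], [], [{{127, 0, 0, 1}, 5555}]}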
 1227: 
 1228: restart_receiver(NodeName) ->
 1229:     restart_receiver(NodeName, [listen_port(NodeName)]).
 1230: 
 1231: restart_receiver(NodeName, NewPorts) ->
 1232:     OldOpts = #{connections := OldConnOpts} = rpc(NodeName, gen_mod, get_module_opts,
 1233:                                                   [<<"localhost">>, mod_global_distrib_receiver]),
 1234:     NewConnOpts = OldConnOpts#{endpoints := [endpoint(Port) || Port <- NewPorts],
 1235:                                resolved_endpoints := [resolved_endpoint(Port) || Port <- NewPorts]},
 1236:     NewOpts = OldOpts#{connections := NewConnOpts},
 1237:     Node = node_spec(NodeName),
 1238:     dynamic_modules:restart(Node, <<"localhost">>, mod_global_distrib_receiver, NewOpts).
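
%% A sketch of the part of the receiver opts rewritten above (the full map
%% holds more keys than shown; Port is illustrative):
%%   #{connections => #{endpoints => [{"127.0.0.1", Port}],
%%                      resolved_endpoints => [{{127, 0, 0, 1}, Port}]}}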
 1239: 
 1240: trigger_rebalance(NodeName, DestinationDomain) when is_binary(DestinationDomain) ->
    %% Ensure that the manager exists;
    %% otherwise the force_refresh call below may fail with a noproc error.
 1243:     ok = rpc(NodeName, mod_global_distrib_outgoing_conns_sup,
 1244:              ensure_server_started, [DestinationDomain]),
 1245:     rpc(NodeName, mod_global_distrib_server_mgr, force_refresh, [DestinationDomain]),
 1246:     StateInfo = rpc(NodeName, mod_global_distrib_server_mgr, get_state_info, [DestinationDomain]),
 1247:     ct:log("mgr_state_info_after_rebalance nodename=~p state_info=~p", [NodeName, StateInfo]),
 1248:     timer:sleep(1000).
 1249: 
 1250: %% -----------------------------------------------------------------------
 1251: %% Escalus-related helpers
 1252: 
 1253: %% Receive messages with Bodies in any order, skipping presences from stream resumption
 1254: user_receives(User, Bodies) ->
 1255:     Opts = #{pred => fun(Stanza) -> not escalus_pred:is_presence(Stanza) end},
 1256:     Checks = [fun(Stanza) -> escalus_pred:is_chat_message(Body, Stanza) end || Body <- Bodies],
 1257:     escalus:assert_many(Checks, [escalus_connection:receive_stanza(User, Opts) || _ <- Bodies]).
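
%% Usage sketch: user_receives(Alice, [<<"Hi">>, <<"Bye">>]) succeeds whether
%% the two chat messages arrive as Hi,Bye or Bye,Hi, skipping any presences
%% received in between.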
 1258: 
 1259: %% -----------------------------------------------------------------------
 1260: %% Refreshing helpers
 1261: 
%% Reason is a string.
%% NodeName is asia_node, europe_node2, ... in the format used by this suite.
 1264: refresh_mappings(NodeName, Reason) when is_list(Reason) ->
 1265:     rpc(NodeName, mod_global_distrib_mapping_redis, refresh, [Reason]).
 1266: 
refresh_hosts(NodeNames, Reason) ->
    [refresh_mappings(NodeName, Reason) || NodeName <- NodeNames].
 1269: 
 1270: 
 1271: %% -----------------------------------------------------------------------
 1272: %% Other helpers
 1273: 
 1274: connect_steps_with_sm() ->
 1275:     [start_stream, stream_features, maybe_use_ssl,
 1276:      authenticate, bind, session, stream_resumption].
 1277: 
 1278: bare_client(Client) ->
 1279:     Client#client{jid = escalus_utils:get_short_jid(Client)}.
 1280: 
 1281: service_port() ->
 1282:     ct:get_config({hosts, mim, service_port}).
 1283: 
 1284: 
 1285: %% -----------------------------------------------------------------------
 1286: %% Waiting helpers
 1287: 
 1288: wait_for_domain(Node, Domain) ->
 1289:     F = fun() ->
 1290:                 Domains = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
 1291:                 lists:member(Domain, Domains)
 1292:         end,
 1293:     mongoose_helper:wait_until(F, true, #{name => {wait_for_domain, Node, Domain}}).
 1294: 
 1295: 
 1296: %% -----------------------------------------------------------------------
%% Ensure that endpoints are up
 1298: 
 1299: wait_for_listeners_to_appear() ->
 1300:     [wait_for_can_connect_to_port(Port) || Port <- receiver_ports(get_hosts())].
 1301: 
 1302: receiver_ports(Hosts) ->
 1303:     lists:map(fun({_NodeName, _LocalHost, ReceiverPort}) -> ReceiverPort end, Hosts).
 1304: 
 1305: wait_for_can_connect_to_port(Port) ->
 1306:     Opts = #{time_left => timer:seconds(30), sleep_time => 1000, name => {can_connect_to_port, Port}},
 1307:     mongoose_helper:wait_until(fun() -> can_connect_to_port(Port) end, true, Opts).
 1308: 
 1309: can_connect_to_port(Port) ->
 1310:     case gen_tcp:connect("127.0.0.1", Port, []) of
 1311:         {ok, Sock} ->
 1312:             gen_tcp:close(Sock),
 1313:             true;
 1314:         Other ->
 1315:             ct:pal("can_connect_to_port port=~p result=~p", [Port, Other]),
 1316:             false
 1317:     end.
 1318: 
 1319: %% -----------------------------------------------------------------------
 1320: %% Custom log levels for GD modules during the tests
 1321: 
 1322: enable_logging() ->
 1323:     mim_loglevel:enable_logging(test_hosts(), custom_loglevels()).
 1324: 
 1325: disable_logging() ->
 1326:     mim_loglevel:disable_logging(test_hosts(), custom_loglevels()).
 1327: 
custom_loglevels() ->
    %% for "s2s connection to muc.localhost not found" debugging
    [{ejabberd_s2s, debug},
     %% for debugging event=refreshing_own_data_done
     {mod_global_distrib_mapping_redis, info},
     %% to know if a connection is already started or is about to be started
     %% event=outgoing_conn_start_progress
     {mod_global_distrib_outgoing_conns_sup, info},
     %% to debug bound connection issues
     {mod_global_distrib, debug},
     %% to know the pids of all new connections
     {mod_global_distrib_connection, debug},
     %% to check if gc or refresh is triggered
     {mod_global_distrib_server_mgr, info},
     %% to debug incoming connections
     %% {mod_global_distrib_receiver, info},
     %% to debug global session set/delete
     {mod_global_distrib_mapping, debug}
    ].
 1347: 
 1348: test_hosts() -> [mim, mim2, reg].