%%==============================================================================
%% Copyright 2017 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

-module(mod_global_distrib_SUITE).

-compile([export_all, nowarn_export_all]).

-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("exml/include/exml.hrl").

-define(HOSTS_REFRESH_INTERVAL, 200). %% in ms

-import(domain_helper, [domain/0]).
-import(config_parser_helper, [config/2, mod_config/2]).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Test groups of this suite, listed in the order they are returned.
%% Every entry is a reference to a group defined in groups/0.
all() ->
    GroupNames = [mod_global_distrib,
                  cluster_restart,
                  start_checks,
                  invalidation,
                  multi_connection,
                  rebalancing,
                  advertised_endpoints,
                  hosts_refresher],
    [{group, Name} || Name <- GroupNames].

%% Group definitions. Each group is declared below as {Name, TestCases};
%% all groups use an empty property list, which is filled in at the end.
groups() ->
    Definitions =
        [{mod_global_distrib,
          [test_pm_between_users_at_different_locations,
           test_pm_between_users_before_available_presence,
           test_component_disconnect,
           test_component_on_one_host,
           test_components_in_different_regions,
           test_hidden_component_disco_in_different_region,
           test_pm_with_disconnection_on_other_server,
           test_pm_with_graceful_reconnection_to_different_server,
           test_pm_with_ungraceful_reconnection_to_different_server,
           test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first,
           test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first,
           test_component_unregister,
           test_update_senders_host,
           test_update_senders_host_by_ejd_service,

           %% with node 2 disabled
           test_muc_conversation_on_one_host,
           test_global_disco]},
         {hosts_refresher,
          [test_host_refreshing]},
         {cluster_restart,
          [test_location_disconnect]},
         {start_checks,
          [test_error_on_wrong_hosts]},
         {invalidation,
          [%% TODO: Add checks for other mapping refreshes
           refresh_nodes]},
         {multi_connection,
          [test_in_order_messages_on_multiple_connections,
           test_in_order_messages_on_multiple_connections_with_bounce,
           test_messages_bounced_in_order,

           %% with node 2 disabled
           test_muc_conversation_history]},
         {rebalancing,
          [enable_new_endpoint_on_refresh,
           disable_endpoint_on_refresh,
           wait_for_connection,
           closed_connection_is_removed_from_disabled]},
         {advertised_endpoints,
          [test_advertised_endpoints_override_endpoints,
           test_pm_between_users_at_different_locations]}],
    [{Name, [], Cases} || {Name, Cases} <- Definitions].

%% Suite-level requirements: node names of the three nodes used by the tests
%% and the c2s port of the `mim` host, followed by whatever escalus requires.
suite() ->
    Required = [{require, europe_node1, {hosts, mim, node}},
                {require, europe_node2, {hosts, mim2, node}},
                {require, asia_node, {hosts, reg, node}},
                {require, c2s_port, {hosts, mim, c2s_port}}],
    Required ++ escalus:suite().

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% The suite runs only when the `global_distrib` Redis pool has a worker on
%% both europe_node1 and asia_node; otherwise the whole suite is skipped.
%% On success europe_node2 joins europe_node1's cluster, logging is enabled
%% and the default (empty) per-group options are put into Config.
init_per_suite(Config) ->
    PoolArgs = [redis, global, global_distrib],
    EuropeWorker = rpc(europe_node1, mongoose_wpool, get_worker, PoolArgs),
    AsiaWorker = rpc(asia_node, mongoose_wpool, get_worker, PoolArgs),
    case {EuropeWorker, AsiaWorker} of
        {{ok, _}, {ok, _}} ->
            ok = rpc(europe_node2, mongoose_cluster, join, [ct:get_config(europe_node1)]),

            enable_logging(),
            Defaults = [{add_advertised_endpoints, []}, {extra_config, #{}}],
            escalus:init_per_suite(Defaults ++ Config);
        RedisCheck ->
            ct:pal("Redis check result: ~p", [RedisCheck]),
            {skip, "GD Redis default pool not available"}
    end.

%% Reverts init_per_suite/1: disables logging, removes fresh users,
%% makes europe_node2 leave the cluster and lets escalus clean up.
end_per_suite(Config) ->
    disable_logging(),
    escalus_fresh:clean(),
    rpc(europe_node2, mongoose_cluster, leave, []),
    escalus:end_per_suite(Config).

%% Per-group setup. Most groups only differ in the `extra_config` map
%% (extra mod_global_distrib options) they hand over to init_per_group_generic/1.
init_per_group(start_checks, Config) ->
    %% This group starts the module itself, so only remember the current
    %% modules of europe_node1 and make sure mod_global_distrib is stopped there.
    Spec = node_spec(europe_node1),
    Saved = dynamic_modules:save_modules(Spec, [domain()], Config),
    dynamic_modules:ensure_modules(Spec, domain(), [{mod_global_distrib, stopped}]),
    Saved;
init_per_group(multi_connection, Config) ->
    Bounce = #{resend_after_ms => 20000},
    %% Disable unused feature to avoid interference
    Connections = #{connections_per_endpoint => 100,
                    disabled_gc_interval => 10000},
    Extra = #{bounce => Bounce, connections => Connections},
    init_per_group_generic([{extra_config, Extra} | Config]);
init_per_group(invalidation, Config) ->
    %% Generic setup without extra options, plus the name of the fake node
    %% used by the group's test case and removed again in end_per_group/2.
    Generic = init_per_group_generic(Config),
    [{node_to_expire, <<"fake_node@localhost">>} | Generic];
init_per_group(rebalancing, Config) ->
    %% We need to prevent automatic refreshes, because they may interfere with tests
    %% and we need early disabled garbage collection to check its validity
    Connections = #{endpoint_refresh_interval => 3600,
                    endpoint_refresh_interval_when_empty => 3600,
                    disabled_gc_interval => 1},
    Extra = #{connections => Connections,
              redis => #{refresh_after => 3600}},
    init_per_group_generic([{extra_config, Extra} | Config]);
init_per_group(advertised_endpoints, Config) ->
    %% Inject this suite module into every node before mocking inet there.
    InjectSuite = fun({NodeName, _, _}) ->
                          Node = ct:get_config(NodeName),
                          mongoose_helper:inject_module(#{node => Node}, ?MODULE, reload)
                  end,
    lists:foreach(InjectSuite, get_hosts()),
    mock_inet_on_each_node(),
    Advertised = [{asia_node, advertised_endpoints()}],
    init_per_group_generic([{add_advertised_endpoints, Advertised} | Config]);
init_per_group(mod_global_distrib, Config) ->
    %% Disable mod_global_distrib_mapping_redis refresher
    Extra = #{redis => #{refresh_after => 3600}},
    init_per_group_generic([{extra_config, Extra} | Config]);
init_per_group(_, Config) ->
    init_per_group_generic(Config).

%% Common group setup: (re)starts the modules on every host returned by
%% get_hosts(), waits for the listeners and stores the Redis nodes key
%% (asked from the first host) together with the escalus user db in Config.
init_per_group_generic(Config0) ->
    WithSavedModules = lists:foldl(fun init_modules_per_node/2, Config0, get_hosts()),
    wait_for_listeners_to_appear(),
    {AnyNode, _, _} = hd(get_hosts()),
    RedisNodesKey = rpc(AnyNode, mod_global_distrib_mapping_redis, nodes_key, []),
    [{nodes_key, RedisNodesKey}, {escalus_user_db, xmpp} | WithSavedModules].

%% Fold step for a single host entry {NodeName, LocalHost, ReceiverPort}.
%% Saves the node's current modules into Config, stops a set of modules on
%% all of its virtual hosts and starts mod_global_distrib and
%% mod_stream_management for domain(). Returns the Config with saved modules.
init_modules_per_node({NodeName, LocalHost, ReceiverPort}, Config0) ->
    BaseOpts = module_opts(?config(extra_config, Config0)),
    Endpoints = endpoint_opts(NodeName, ReceiverPort, Config0),
    %% Connection options from the group take precedence over endpoint options
    Connections = maps:merge(Endpoints, maps:get(connections, BaseOpts, #{})),
    GDOpts = module_opts(BaseOpts#{local_host => LocalHost, connections => Connections}),

    Hosts = virtual_hosts(NodeName),
    Spec = node_spec(NodeName),
    Saved = dynamic_modules:save_modules(Spec, Hosts, Config0),

    %% To reduce load when sending many messages
    Unneeded = [mod_offline, mod_blocking, mod_privacy, mod_roster, mod_last,
                mod_stream_management],
    lists:foreach(fun(Host) ->
                          dynamic_modules:ensure_stopped(Spec, Host, Unneeded)
                  end, Hosts),

    SMOpts = config_parser_helper:mod_config(mod_stream_management, #{resume_timeout => 1}),
    ToStart = [{mod_global_distrib, GDOpts},
               {mod_stream_management, SMOpts}],
    dynamic_modules:ensure_modules(Spec, domain(), ToStart),
    Saved.

%% Builds mod_global_distrib options from ExtraOpts by applying
%% every set_opts/2 step in the order given below.
module_opts(ExtraOpts) ->
    Steps = [common, defaults, connections, redis, bounce],
    lists:foldl(fun(Step, Acc) -> set_opts(Step, Acc) end, ExtraOpts, Steps).

%% One step of module_opts/1. Each clause fills in one part of the options;
%% a step whose key is not present in Opts leaves Opts untouched.
set_opts(common, Opts) ->
    Common = #{global_host => <<"localhost">>,
               hosts_refresh_interval => ?HOSTS_REFRESH_INTERVAL},
    %% Values already present in Opts win over the common ones
    maps:merge(Common, Opts);
set_opts(defaults, Opts) ->
    mod_config(mod_global_distrib, Opts);
set_opts(connections, #{connections := ConnExtra} = Opts) ->
    TLSPath = [modules, mod_global_distrib, connections, tls],
    TLSOpts = config(TLSPath, #{certfile => "priv/ssl/fake_server.pem",
                                cacertfile => "priv/ssl/ca/cacert.pem"}),
    WithTLS = maps:merge(#{tls => TLSOpts}, ConnExtra),
    Opts#{connections := config([modules, mod_global_distrib, connections], WithTLS)};
set_opts(redis, #{redis := RedisExtra} = Opts) ->
    Opts#{redis := config([modules, mod_global_distrib, redis], RedisExtra)};
set_opts(bounce, #{bounce := BounceExtra} = Opts) ->
    Opts#{bounce := config([modules, mod_global_distrib, bounce], BounceExtra)};
set_opts(_, Opts) ->
    Opts.

%% Per-group teardown, mirroring init_per_group/2.
end_per_group(advertised_endpoints, Config) ->
    MeckHandlers = ?config(meck_handlers, Config),
    unmock_inet(MeckHandlers),
    escalus_fresh:clean(),
    end_per_group_generic(Config);
end_per_group(start_checks, Config) ->
    escalus_fresh:clean(),
    Config;
end_per_group(invalidation, Config) ->
    %% Remove the fake node entry from the Redis nodes hash
    NodesKey = ?config(nodes_key, Config),
    FakeNode = ?config(node_to_expire, Config),
    redis_query(europe_node1, [<<"HDEL">>, NodesKey, FakeNode]),
    end_per_group_generic(Config);
end_per_group(_, Config) ->
    end_per_group_generic(Config).

%% Restores the modules saved in Config during group setup.
end_per_group_generic(Config) ->
    RestoreOpts = #{timeout => timer:seconds(30)},
    dynamic_modules:restore_modules(RestoreOpts, Config).

%% Per-testcase setup.
init_per_testcase(CaseName, Config)
  when CaseName =:= test_muc_conversation_on_one_host;
       CaseName =:= test_global_disco;
       CaseName =:= test_muc_conversation_history ->
    %% There is no helper to load MUC on node2
    %% For now it's easier to hide node2
    %% TODO: Do it right at some point!
    hide_node(europe_node2, Config),
    %% There would be no new connections to europe_node2, but there can be some old ones.
    %% We need to disconnect previous connections.
    {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
    trigger_rebalance(asia_node, EuropeHost),
    %% Load muc on mim node
    muc_helper:load_muc(),
    %% Wait for muc.localhost to become visible from reg node
    wait_for_domain(ct:get_config({hosts, reg, node}), muc_helper:muc_host()),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config)
  when CaseName =:= test_pm_with_graceful_reconnection_to_different_server;
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server;
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
    %% The reconnection cases need Eve to exist in both clusters
    escalus:init_per_testcase(CaseName, init_user_eve(Config));
init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).

%% Creates a fresh Eve and registers her in both clusters. The two user
%% specs differ only in the port: the second one uses the c2s port of `mim`.
%% Both specs are stored in Config as `evespec_reg` and `evespec_mim`.
init_user_eve(Config) ->
    %% Register Eve in reg cluster
    RegSpec = escalus_fresh:create_fresh_user(Config, eve),
    MimPort = ct:get_config({hosts, mim, c2s_port}),
    MimSpec = lists:keystore(port, 1, RegSpec, {port, MimPort}),
    %% Register Eve in mim cluster
    escalus:create_users(Config, [{eve, MimSpec}]),
    [{evespec_reg, RegSpec}, {evespec_mim, MimSpec} | Config].

%% Per-testcase teardown. Every clause ends in generic_end_per_testcase/2.
end_per_testcase(CaseName, Config)
  when CaseName =:= test_pm_with_graceful_reconnection_to_different_server;
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server;
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
    MimEveSpec = ?config(evespec_mim, Config),
    %% Clean Eve from reg cluster
    escalus_fresh:clean(),
    %% Clean Eve from mim cluster
    %% For shared databases (i.e. mysql, pgsql...),
    %% removing from one cluster would remove from all clusters.
    %% For mnesia auth backend we need to call removal from each cluster.
    %% That's why there is a catch here.
    catch escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}]),
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config)
  when CaseName =:= test_muc_conversation_on_one_host;
       CaseName =:= test_global_disco;
       CaseName =:= test_muc_conversation_history ->
    Reason = "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName),
    refresh_mappings(europe_node2, Reason),
    muc_helper:unload_muc(),
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(test_update_senders_host_by_ejd_service = CaseName, Config) ->
    Reason = "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName),
    refresh_mappings(europe_node1, Reason),
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config)
  when CaseName =:= enable_new_endpoint_on_refresh;
       CaseName =:= disable_endpoint_on_refresh;
       CaseName =:= wait_for_connection;
       CaseName =:= closed_connection_is_removed_from_disabled ->
    restart_receiver(asia_node),
    Reason = "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName),
    refresh_mappings(asia_node, Reason),
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config) ->
    generic_end_per_testcase(CaseName, Config).

%% On every host: pauses the hosts refresher, terminates all children of the
%% outgoing connections supervisor except the refresher itself, checks that
%% the refresher is the only child left and unpauses the refresher.
%% A missing supervisor is only logged.
generic_end_per_testcase(CaseName, Config) ->
    CleanNode =
        fun({NodeName, _, _}) ->
                %% TODO: Enable refresher only for specific test cases,
                %% as some of them are based on assumption that node(s)
                %% must open new connections during tests.
                pause_refresher(NodeName, CaseName),
                SupRef = {mod_global_distrib_outgoing_conns_sup, ct:get_config(NodeName)},
                TerminateUnlessRefresher =
                    fun({mod_global_distrib_hosts_refresher, _, _, _}) ->
                            skip;
                       ({ChildId, _, _, _}) ->
                            supervisor:terminate_child(SupRef, ChildId)
                    end,
                try
                    Children = supervisor:which_children(SupRef),
                    lists:foreach(TerminateUnlessRefresher, Children),
                    [{mod_global_distrib_hosts_refresher, _, worker, _Modules}] =
                        supervisor:which_children(SupRef)
                catch
                    _:{noproc, _} ->
                        ct:pal("Sender supervisor not found in ~p", [NodeName])
                end,
                unpause_refresher(NodeName, CaseName)
        end,
    lists:foreach(CleanNode, get_hosts()),
    escalus:end_per_testcase(CaseName, Config).

%% Virtual hosts handled on a node: asia_node serves only the main domain,
%% any other node serves the secondary domain as well.
virtual_hosts(asia_node) ->
    [domain()];
virtual_hosts(_) ->
    [domain(), secondary_domain()].

%% Secondary domain of the `mim` host, taken from the CT configuration.
secondary_domain() ->
    ct:get_config({hosts, mim, secondary_domain}).

%% Refresher is not started at all or stopped for some test cases
-spec pause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
pause_refresher(_, test_error_on_wrong_hosts) ->
    ok;
pause_refresher(asia_node, test_location_disconnect) ->
    ok;
pause_refresher(NodeName, _) ->
    ok = rpc(NodeName, mod_global_distrib_hosts_refresher, pause, []).

%% Counterpart of pause_refresher/2, with the same exceptions.
-spec unpause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
unpause_refresher(_, test_error_on_wrong_hosts) ->
    ok;
unpause_refresher(asia_node, test_location_disconnect) ->
    ok;
unpause_refresher(NodeName, _) ->
    ok = rpc(NodeName, mod_global_distrib_hosts_refresher, unpause, []).

%%--------------------------------------------------------------------
%% Service discovery test
%%--------------------------------------------------------------------

%% Requires module mod_global_distrib to be started with argument advertised_endpoints
%% for each host in get_hosts().
%% Reads Redis to confirm that endpoints (in Redis) are overwritten
%% with `advertised_endpoints` option value
test_advertised_endpoints_override_endpoints(_Config) ->
    PerNodeEndpoints = execute_on_each_node(mod_global_distrib_mapping_redis, get_endpoints,
                                            [<<"reg1">>]),
    %% The order of endpoints does not matter, so both sides are sorted
    MatchesAdvertised = fun(Endpoints) ->
                                lists:sort(Endpoints) =:= lists:sort(advertised_endpoints())
                        end,
    true = lists:all(MatchesAdvertised, PerNodeEndpoints).

%% @doc Verifies that hosts refresher will restart the outgoing connection pool if
%% it goes down for some reason (crash or domain unavailability).
%% Also actually verifies that refresher properly reads host list
%% from backend and starts appropriate pool.
test_host_refreshing(_Config) ->
    mongoose_helper:wait_until(fun trees_for_connections_present/0, true,
                               #{name => trees_for_connections_present}),
    AsiaConnSups = out_connection_sups(asia_node),
    {europe_node1, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
    EuropeSupName = rpc(asia_node, mod_global_distrib_utils, server_to_sup_name, [EuropeHost]),
    {_, EuropeSupPid, supervisor, _} = lists:keyfind(EuropeSupName, 1, AsiaConnSups),
    erlang:exit(EuropeSupPid, kill), % it's ok to kill temporary process
    %% The supervisor tree for Europe has to show up again on asia_node
    SupIsBack = fun() -> tree_for_sup_present(asia_node, EuropeSupName) end,
    mongoose_helper:wait_until(SupIsBack, true, #{name => tree_for_sup_present}).

%% When run in mod_global_distrib group - tests simple case of connection
%% between two users connected to different clusters.
%% When run in advertised_endpoints group it tests whether it is possible
%% to connect to a node that is advertising itself with a domain name.
test_pm_between_users_at_different_locations(Config) ->
    Users = [{alice, 1}, {eve, 1}],
    escalus:fresh_story(Config, Users, fun test_two_way_pm/2).

%% Same exchange as in test_two_way_pm/2, but the clients are started
%% directly with escalus_client:start/3 instead of through a story.
test_pm_between_users_before_available_presence(Config) ->
    FreshConfig = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
    {ok, Alice} = escalus_client:start(FreshConfig, alice, <<"res1">>),
    {ok, Eve} = escalus_client:start(FreshConfig, eve, <<"res1">>),

    test_two_way_pm(Alice, Eve),

    escalus_client:stop(FreshConfig, Alice),
    escalus_client:stop(FreshConfig, Eve).

%% Alice and Eve send one chat message to each other and
%% each of them has to receive the message of the other one.
test_two_way_pm(Alice, Eve) ->
    %% Ensure that users are properly registered
    %% Otherwise you can get "Unable to route global message... user not found in the routing table"
    %% error, because "escalus_client:start" can return before SM registration is completed.
    wait_for_registration(Alice, ct:get_config({hosts, mim, node})),
    wait_for_registration(Eve, ct:get_config({hosts, reg, node})),

    TextToEve = <<"Hi to Eve from Europe1!">>,
    TextToAlice = <<"Hi to Alice from Asia!">>,
    escalus_client:send(Alice, escalus_stanza:chat_to(Eve, TextToEve)),
    escalus_client:send(Eve, escalus_stanza:chat_to(Alice, TextToAlice)),

    Timeout = timer:seconds(15),
    ReceivedByEve = escalus_client:wait_for_stanza(Eve, Timeout),
    ReceivedByAlice = escalus_client:wait_for_stanza(Alice, Timeout),

    AliceJid = escalus_client:full_jid(Alice),
    EveJid = escalus_client:full_jid(Eve),

    escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, TextToEve], ReceivedByEve),
    escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, TextToAlice], ReceivedByAlice).

%% Alice (room owner) and Eve join a fresh room and exchange one groupchat
%% message each; both of them have to see both messages.
test_muc_conversation_on_one_host(Config0) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
    Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
    Story =
        fun(Eve) ->
                Alice = connect_from_spec(AliceSpec, Config),

                Room = ?config(room, Config),
                AliceNick = escalus_utils:get_username(Alice),
                EveNick = escalus_utils:get_username(Eve),
                RoomAddr = muc_helper:room_address(Room),

                escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, AliceNick)),
                escalus:wait_for_stanza(Alice),

                escalus:send(Eve, muc_helper:stanza_muc_enter_room(Room, EveNick)),
                %% Eve gets exactly three stanzas after entering the room
                [_, _, _] = escalus:wait_for_stanzas(Eve, 3),

                ToEve = <<"Hi, Eve!">>,
                escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, ToEve)),
                escalus:assert(is_groupchat_message, [ToEve], escalus:wait_for_stanza(Alice)),
                escalus:assert(is_groupchat_message, [ToEve], escalus:wait_for_stanza(Eve)),

                ToAlice = <<"Hi, Alice!">>,
                escalus:send(Eve, escalus_stanza:groupchat_to(RoomAddr, ToAlice)),
                escalus:assert(is_groupchat_message, [ToAlice], escalus:wait_for_stanza(Eve)),
                escalus:assert(is_groupchat_message, [ToAlice], escalus:wait_for_stanza(Alice))
        end,
    escalus:fresh_story(Config, [{eve, 1}], Story),
    muc_helper:destroy_room(Config).

%% Alice sends three messages to a fresh room before Eve joins;
%% Eve then has to receive the presences, the three messages and the subject.
test_muc_conversation_history(Config0) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
    Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
    MessageCount = 3,
    Story =
        fun(Eve) ->
                Alice = connect_from_spec(AliceSpec, Config),

                Room = ?config(room, Config),
                AliceNick = escalus_utils:get_username(Alice),
                RoomAddr = muc_helper:room_address(Room),

                escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, AliceNick)),
                %% We don't care about presences from Alice, escalus would filter them out
                wait_for_subject(Alice),

                send_n_muc_messages(Alice, RoomAddr, MessageCount),

                %% Ensure that the messages are received by the room
                %% before trying to login Eve.
                %% Otherwise, Eve would receive some messages from history and
                %% some as regular groupchat messages.
                receive_n_muc_messages(Alice, MessageCount),

                EveNick = escalus_utils:get_username(Eve),
                escalus:send(Eve, muc_helper:stanza_muc_enter_room(Room, EveNick)),

                wait_for_muc_presence(Eve, Room, AliceNick),
                wait_for_muc_presence(Eve, Room, EveNick),

                %% XEP-0045: After sending the presence broadcast (and only after doing so),
                %% the service MAY then send discussion history, the room subject,
                %% live messages, presence updates, and other in-room traffic.
                receive_n_muc_messages(Eve, MessageCount),
                wait_for_subject(Eve)
        end,
    escalus:fresh_story(Config, [{eve, 1}], Story),
    muc_helper:destroy_room(Config).

%% Expects the next stanza of User to be a presence
%% sent from the occupant FromNickname of the room RoomJid.
wait_for_muc_presence(User, RoomJid, FromNickname) ->
    Stanza = escalus:wait_for_stanza(User),
    OccupantAddr = muc_helper:room_address(RoomJid, FromNickname),
    escalus:assert(is_presence, Stanza),
    escalus:assert(is_stanza_from, [OccupantAddr], Stanza),
    ok.

%% Expects the next stanza of User to be a groupchat message
%% that contains a `subject` subelement.
wait_for_subject(User) ->
    Stanza = escalus:wait_for_stanza(User),
    escalus:assert(is_groupchat_message, Stanza),
    ?assertNotEqual(undefined, exml_query:subelement(Stanza, <<"subject">>)),
    ok.

%% Sends the groupchat messages "test-1" .. "test-N" from User to RoomAddr.
send_n_muc_messages(User, RoomAddr, N) ->
    [escalus:send(User, escalus_stanza:groupchat_to(RoomAddr, numbered_text(I)))
     || I <- lists:seq(1, N)],
    ok.

%% Expects User to receive the groupchat messages "test-1" .. "test-N" in order.
receive_n_muc_messages(User, N) ->
    [escalus:assert(is_groupchat_message, [numbered_text(I)], escalus:wait_for_stanza(User))
     || I <- lists:seq(1, N)],
    ok.

%% Body of the I-th numbered MUC test message.
numbered_text(I) ->
    <<"test-", (integer_to_binary(I))/binary>>.

%% A component connected to one host exchanges a chat message
%% in both directions, first with Alice and then with Eve.
test_component_on_one_host(Config) ->
    ComponentConfig = [{server, <<"localhost">>},
                       {host, <<"localhost">>},
                       {password, <<"secret">>},
                       {port, service_port()},
                       {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),

    Story = fun(User) ->
                    %% When the user sends a message to the component
                    escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi2!">>)),
                    %% Then component receives it
                    FromUser = escalus:wait_for_stanza(Comp),
                    escalus:assert(is_chat_message, [<<"Hi2!">>], FromUser),

                    %% When components sends a reply
                    Reply = escalus_stanza:chat_to(User, <<"Oh hi!">>),
                    escalus:send(Comp, escalus_stanza:from(Reply, Addr)),

                    %% Then the user receives it
                    FromComp = escalus:wait_for_stanza(User),
                    escalus:assert(is_chat_message, [<<"Oh hi!">>], FromComp),
                    escalus:assert(is_stanza_from, [Addr], FromComp)
            end,

    lists:map(fun(User) -> escalus:fresh_story(Config, [{User, 1}], Story) end,
              [alice, eve]).

%% Ensures that 2 components in distinct data centers can communicate.
test_components_in_different_regions(_Config) ->
    Common = [{host, <<"localhost">>}, {password, <<"secret">>},
              {server, <<"localhost">>}, {component, <<"test_service">>}],
    MimPort = ct:get_config({hosts, mim, service_port}),
    RegPort = ct:get_config({hosts, reg, service_port}),
    %% The component names are put in front of the common options
    MimCompConfig = [{port, MimPort}, {component, <<"service1">>} | Common],
    RegCompConfig = [{port, RegPort}, {component, <<"service2">>} | Common],

    {MimComp, MimAddr, _MimName} = component_helper:connect_component(MimCompConfig),
    {RegComp, RegAddr, _RegName} = component_helper:connect_component(RegCompConfig),

    %% First component -> second component
    ToReg = escalus_stanza:chat_to(RegAddr, <<"Hi from 1!">>),
    escalus:send(MimComp, escalus_stanza:from(ToReg, MimAddr)),
    AtReg = escalus:wait_for_stanza(RegComp),
    escalus:assert(is_chat_message, [<<"Hi from 1!">>], AtReg),

    %% Second component -> first component
    ToMim = escalus_stanza:chat_to(MimAddr, <<"Hi from 2!">>),
    escalus:send(RegComp, escalus_stanza:from(ToMim, RegAddr)),
    AtMim = escalus:wait_for_stanza(MimComp),
    escalus:assert(is_chat_message, [<<"Hi from 2!">>], AtMim).

%% Ordinary user is not able to discover the hidden component from GD
test_hidden_component_disco_in_different_region(Config) ->
    %% Hidden component from component_SUITE connects to mim1/europe_node1
    HiddenSpec = component_helper:spec(hidden_component, Config),
    {_HiddenComp, HiddenAddr, _} = component_helper:connect_component(HiddenSpec),

    NotListed = fun(Stanza) ->
                        not escalus_pred:has_service(HiddenAddr, Stanza)
                end,
    Story = fun(Eve) ->
                    Server = escalus_client:server(Eve),
                    escalus:send(Eve, escalus_stanza:service_discovery(Server)),
                    DiscoReply = escalus:wait_for_stanza(Eve),
                    escalus:assert(is_iq_result, DiscoReply),
                    escalus:assert(NotListed, DiscoReply)
            end,
    escalus:fresh_story(Config, [{eve, 1}], Story).

%% After the component has been disconnected, a message sent to its address
%% is answered with a `service-unavailable` error, for both Alice and Eve.
test_component_disconnect(Config) ->
    ComponentConfig = [{server, <<"localhost">>},
                       {host, <<"localhost">>},
                       {password, <<"secret">>},
                       {port, service_port()},
                       {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
    component_helper:disconnect_component(Comp, Addr),

    Story = fun(User) ->
                    escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
                    Reply = escalus:wait_for_stanza(User, 5000),
                    escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Reply)
            end,

    lists:map(fun(User) -> escalus:fresh_story(Config, [{User, 1}], Story) end,
              [alice, eve]).

%% Stops `mongooseim` and `ranch` on asia_node in the middle of a conversation
%% and expects Alice to get a `service-unavailable` error for her next message.
%% Both applications are started again afterwards, whatever the outcome.
test_location_disconnect(Config) ->
    Story =
        fun(Alice, Eve) ->
                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),

                escalus_client:wait_for_stanza(Eve),

                ok = rpc(asia_node, application, stop, [mongooseim]),
                %% TODO: Stopping mongooseim alone should probably stop connections too
                ok = rpc(asia_node, application, stop, [ranch]),

                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
                Reply = escalus:wait_for_stanza(Alice),
                escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Reply)
        end,
    try
        escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story)
    after
        rpc(asia_node, application, start, [ranch]),
        rpc(asia_node, application, start, [mongooseim])
    end.

%% Eve's connection is stopped before Alice writes to her;
%% Alice has to get a `service-unavailable` error back.
test_pm_with_disconnection_on_other_server(Config) ->
    Story =
        fun(Alice, Eve) ->
                escalus_connection:stop(Eve),
                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
                Bounced = escalus_client:wait_for_stanza(Alice, 15000),
                escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Bounced)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% Eve logs out from the reg cluster and logs in again through the mim
%% cluster while Alice's session process is suspended. Messages sent by Alice
%% before and after the reconnection have to reach the new Eve session.
test_pm_with_graceful_reconnection_to_different_server(Config) ->
    RegEveSpec = ?config(evespec_reg, Config),
    MimEveSpec = ?config(evespec_mim, Config),
    Story =
        fun(Alice) ->
                Eve = connect_from_spec(RegEveSpec, Config),

                escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi from Asia!">>)),

                %% Stop connection and wait for process to die
                RegNode = ct:get_config({hosts, reg, node}),
                mongoose_helper:logout_user(Config, Eve, #{node => RegNode}),

                FromEve = escalus_client:wait_for_stanza(Alice),

                %% Pause Alice until Eve is reconnected
                AliceNode = ct:get_config({hosts, mim, node}),
                AliceC2s = mongoose_helper:get_session_pid(Alice, #{node => AliceNode}),
                ok = rpc(asia_node, sys, suspend, [AliceC2s]),

                escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi from Europe1!">>)),

                NewEve = connect_from_spec(MimEveSpec, Config),
                NewEveNode = ct:get_config({hosts, mim, node}),
                wait_for_registration(NewEve, NewEveNode),

                ok = rpc(asia_node, sys, resume, [AliceC2s]),

                escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi again from Europe1!">>)),
                escalus_client:send(NewEve,
                                    escalus_stanza:chat_to(Alice, <<"Hi again from Asia!">>)),

                Received1 = escalus_client:wait_for_stanza(NewEve),
                AgainFromEve = escalus_client:wait_for_stanza(Alice),
                Received2 = escalus_client:wait_for_stanza(NewEve),

                %% Alice's two messages are compared in sequence-number order
                [FromAlice, AgainFromAlice] = order_by_seqnum([Received1, Received2]),

                escalus:assert(is_chat_message, [<<"Hi from Europe1!">>], FromAlice),
                escalus:assert(is_chat_message, [<<"Hi from Asia!">>], FromEve),
                escalus:assert(is_chat_message, [<<"Hi again from Europe1!">>], AgainFromAlice),
                escalus:assert(is_chat_message, [<<"Hi again from Asia!">>], AgainFromEve)
        end,
    escalus:fresh_story(Config, [{alice, 1}], Story).

%% Refresh logic can cause two possible behaviours.
%% We test both behaviours here (plus no refresh case)
%% See PR #2392
test_pm_with_ungraceful_reconnection_to_different_server(Config) ->
    %% No refresh
    NoRefresh = fun() -> ok end,
    ExpectAll =
        fun(Alice, NewEve) ->
                user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
                user_receives(Alice, [<<"Hi from Europe!">>])
        end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, NoRefresh, ExpectAll).

test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first(Config) ->
    %% Same as no refresh
    Reason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first",
    %% Order of nodes is important here in refresh_hosts!
    AsiaThenEurope = fun() -> refresh_hosts([asia_node, europe_node1], Reason) end,
    ExpectAll =
        fun(Alice, NewEve) ->
                user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
                user_receives(Alice, [<<"Hi from Europe!">>])
        end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, AsiaThenEurope, ExpectAll).

test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first(Config) ->
    %% Asia node overrides Europe value with the older ones,
    %% so we lose some messages during rerouting :(
    Reason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first",
    EuropeThenAsia = fun() -> refresh_hosts([europe_node1, asia_node], Reason) end,
    ExpectOnlySecond =
        fun(Alice, NewEve) ->
                user_receives(NewEve, [<<"Hi again from Europe1!">>]),
                user_receives(Alice, [<<"Hi from Europe!">>])
        end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, EuropeThenAsia,
                                                                ExpectOnlySecond).

%% Reconnect Eve from asia (reg cluster) to europe (mim)
%%
%% BeforeResume is a zero-arity fun called right before the suspended session
%% process of the old Eve is resumed. AfterCheck is called at the very end
%% with Alice and the new Eve and verifies what each of them received.
do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResume, AfterCheck) ->
    Config = escalus_users:update_userspec(Config0, eve, stream_management, true),
    RegEveSpec = ?config(evespec_reg, Config),
    MimEveSpec = ?config(evespec_mim, Config),
    Story =
        fun(Alice) ->
                {ok, Eve, _} = escalus_connection:start(RegEveSpec, connect_steps_with_sm()),
                escalus_story:send_initial_presence(Eve),
                escalus_client:wait_for_stanza(Eve),

                %% Suspend the session process of Eve on the reg cluster
                RegNode = ct:get_config({hosts, reg, node}),
                OldC2s = mongoose_helper:get_session_pid(Eve, #{node => RegNode}),
                ok = rpc(asia_node, sys, suspend, [OldC2s]),

                FirstMsg = chat_with_seqnum(bare_client(Eve), <<"Hi from Europe1!">>),
                escalus_client:send(Alice, FirstMsg),

                %% Wait for route message to be queued in c2s message queue
                mongoose_helper:wait_for_route_message_count(OldC2s, 1),

                %% Time to do bad nasty things with our socket, so once our process wakes up,
                %% it SHOULD detect a dead socket
                escalus_connection:kill(Eve),

                %% Connect another one, we hope the message would be rerouted
                NewEve = connect_from_spec(MimEveSpec, Config),
                MimNode = ct:get_config({hosts, mim, node}),
                wait_for_registration(NewEve, MimNode),

                BeforeResume(),

                %% Trigger rerouting
                ok = rpc(asia_node, sys, resume, [OldC2s]),
                OldC2s ! resume_timeout,

                %% Let C2sPid to process the message and reroute (and die finally, poor little thing)
                mongoose_helper:wait_for_pid_to_die(OldC2s),

                SecondMsg = chat_with_seqnum(bare_client(Eve), <<"Hi again from Europe1!">>),
                escalus_client:send(Alice, SecondMsg),
                escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi from Europe!">>)),

                AfterCheck(Alice, NewEve)
        end,
    escalus:fresh_story(Config, [{alice, 1}], Story).

%% Both users run service discovery against their own server;
%% Eve's result must advertise the MUC host.
test_global_disco(Config) ->
    Story =
        fun(Alice, Eve) ->
                AliceQuery = escalus_stanza:service_discovery(escalus_client:server(Alice)),
                escalus:send(Alice, AliceQuery),
                _AliceStanza = escalus:wait_for_stanza(Alice),
                %% TODO: test for duplicate components
                %%escalus:assert(fun has_exactly_one_service/2, [muc_helper:muc_host()], AliceStanza),

                EveQuery = escalus_stanza:service_discovery(escalus_client:server(Eve)),
                escalus:send(Eve, EveQuery),
                EveStanza = escalus:wait_for_stanza(Eve),
                escalus:assert(has_service, [muc_helper:muc_host()], EveStanza)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% The component's domain is resolvable on europe_node1 while the component is
%% connected, and for_domain returns `error` after it disconnects.
test_component_unregister(_Config) ->
    ComponentSpec = [{server, <<"localhost">>},
                     {host, <<"localhost">>},
                     {password, <<"secret">>},
                     {port, service_port()},
                     {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentSpec),
    ?assertMatch({ok, _}, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
                              [<<"test_service.localhost">>])),

    component_helper:disconnect_component(Comp, Addr),

    ?assertEqual(error, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
                            [<<"test_service.localhost">>])).

%% Starting mod_global_distrib with an unknown local_host must fail with
%% a check_host_failed error reported for that domain.
test_error_on_wrong_hosts(_Config) ->
    Opts = module_opts(#{local_host => <<"no_such_host">>}),
    ?assertException(error, {badrpc, {'EXIT', {#{what := check_host_failed,
                                                 domain := <<"no_such_host">>}, _}}},
                     dynamic_modules:ensure_modules(node_spec(europe_node1), <<"localhost">>,
                                                    [{mod_global_distrib, Opts}])).

%% Writes <<"0">> for the node taken from `node_to_expire` into the nodes hash,
%% refreshes mappings on europe_node1, and expects the hash field to be gone.
refresh_nodes(Config) ->
    HashKey = ?config(nodes_key, Config),
    ExpiringNode = ?config(node_to_expire, Config),
    redis_query(europe_node1, [<<"HSET">>, HashKey, ExpiringNode, <<"0">>]),
    refresh_mappings(europe_node1, "by_refresh_nodes"),
    {ok, undefined} = redis_query(europe_node1, [<<"HGET">>, HashKey, ExpiringNode]).

%% Alice sends chat messages with bodies "1".."100" to Eve;
%% Eve must receive them in exactly that order.
test_in_order_messages_on_multiple_connections(Config) ->
    Story =
        fun(Alice, Eve) ->
                Numbers = lists:seq(1, 100),
                SendNth =
                    fun(N) ->
                            Msg = escalus_stanza:chat_to(Eve, integer_to_binary(N)),
                            escalus_client:send(Alice, Msg)
                    end,
                ExpectNth =
                    fun(N) ->
                            Received = escalus_client:wait_for_stanza(Eve, 5000),
                            escalus:assert(is_chat_message, [integer_to_binary(N)], Received)
                    end,
                lists:foreach(SendNth, Numbers),
                lists:foreach(ExpectNth, Numbers)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% Same ordering check, but Eve's mapping on europe_node1 is toggled
%% by send_steps/4 while the first 99 messages are being sent.
test_in_order_messages_on_multiple_connections_with_bounce(Config) ->
    Story =
        fun(Alice, Eve) ->
                %% Send 99 messages, some while server knows the mapping and some when it doesn't
                send_steps(Alice, Eve, 99, <<"reg1">>),
                %% Make sure that the last message is sent when the mapping is known
                set_mapping(europe_node1, Eve, <<"reg1">>),
                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"100">>)),

                %% Check that all stanzas were received in order
                ExpectNth =
                    fun(N) ->
                            Received = escalus_client:wait_for_stanza(Eve, 5000),
                            escalus:assert(is_chat_message, [integer_to_binary(N)], Received)
                    end,
                lists:foreach(ExpectNth, lists:seq(1, 100))
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% All 100 messages are sent while europe_node1 has no mapping for Eve;
%% after the mapping is restored they must arrive in order.
test_messages_bounced_in_order(Config) ->
    Story =
        fun(Alice, Eve) ->
                %% Make sure all messages land in bounce storage
                delete_mapping(europe_node1, Eve),

                Numbers = lists:seq(1, 100),
                SendNth =
                    fun(N) ->
                            Msg = escalus_stanza:chat_to(Eve, integer_to_binary(N)),
                            escalus_client:send(Alice, Msg)
                    end,
                lists:foreach(SendNth, Numbers),

                %% Restore the mapping so that bounce eventually succeeds
                ?assertEqual(undefined, get_mapping(europe_node1, Eve)),
                set_mapping(europe_node1, Eve, <<"reg1">>),

                ExpectNth =
                    fun(N) ->
                            Received = escalus_client:wait_for_stanza(Eve, 5000),
                            escalus:assert(is_chat_message, [integer_to_binary(N)], Received)
                    end,
                lists:foreach(ExpectNth, Numbers)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% Alice's mapping is deleted via europe_node1; once asia_node no longer resolves
%% her JID, a message from Alice must make asia_node resolve it to
%% <<"localhost.bis">> again.
test_update_senders_host(Config) ->
    Story =
        fun(Alice, Eve) ->
                AliceJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Alice)]),
                LookupOnAsia =
                    fun() -> rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid]) end,

                {ok, <<"localhost.bis">>} = LookupOnAsia(),
                ok = rpc(europe_node1, mod_global_distrib_mapping, delete_for_jid, [AliceJid]),
                wait_for_node(asia_node, AliceJid),

                %% TODO: Should prevent Redis refresher from executing for a moment,
                %%       as it may collide with this test.

                escalus:send(Alice, escalus_stanza:chat_to(Eve, <<"test_update_senders_host">>)),
                escalus:wait_for_stanza(Eve),

                {ok, <<"localhost.bis">>} = LookupOnAsia()
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% Calls mongoose_helper:wait_until/3 with `error` as the expected result of
%% mod_global_distrib_mapping:for_jid(Jid) on Node, i.e. waits until Node
%% no longer resolves the JID.
wait_for_node(Node, Jid) ->
    Lookup = fun() -> rpc(Node, mod_global_distrib_mapping, for_jid, [Jid]) end,
    WaitOpts = #{time_left => timer:seconds(10),
                 sleep_time => timer:seconds(1),
                 name => rpc},
    mongoose_helper:wait_until(Lookup, error, WaitOpts).

%% Eve's mapping is deleted, then she messages a component connected to europe_node1
%% while asia_node is forced to route through europe_node2; afterwards both europe
%% nodes must resolve Eve's JID to <<"reg1">> again.
test_update_senders_host_by_ejd_service(Config) ->
    refresh_hosts([europe_node1, europe_node2, asia_node],
                  "by_test_update_senders_host_by_ejd_service"),
    %% Connects to europe_node1
    ComponentSpec = [{server, <<"localhost">>},
                     {host, <<"localhost">>},
                     {password, <<"secret">>},
                     {port, service_port()},
                     {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentSpec),

    Story =
        fun(Eve) ->
                %% Eve is connected to asia_node
                EveJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Eve)]),
                LookupOn =
                    fun(NodeName) ->
                            rpc(NodeName, mod_global_distrib_mapping, for_jid, [EveJid])
                    end,
                {ok, <<"reg1">>} = LookupOn(europe_node1),
                {ok, <<"reg1">>} = LookupOn(europe_node2),

                ok = rpc(asia_node, mod_global_distrib_mapping, delete_for_jid, [EveJid]),
                wait_for_node(europe_node1, EveJid),
                wait_for_node(europe_node2, EveJid),

                %% Component is connected to europe_node1
                %% but we force asia_node to connect to europe_node2 by hiding europe_node1
                %% and forcing rebalance (effectively disabling connections to europe_node1)
                %% to verify routing cache update on both nodes

                %% TODO: Should prevent Redis refresher from executing for a moment,
                %%       as it may collide with this test.

                hide_node(europe_node1, Config),
                {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
                trigger_rebalance(asia_node, EuropeHost),

                escalus:send(Eve, escalus_stanza:chat_to(Addr, <<"hi">>)),
                escalus:wait_for_stanza(Comp),

                {ok, <<"reg1">>} = LookupOn(europe_node1),
                {ok, <<"reg1">>} = LookupOn(europe_node2)
        end,
    escalus:fresh_story(Config, [{eve, 1}], Story).

%% -------------------------------- Rebalancing --------------------------------

%% Enabling an extra endpoint on asia_node must add exactly one pool and
%% exactly one enabled endpoint on europe_node1, and remove none.
enable_new_endpoint_on_refresh(Config) ->
    get_connection(europe_node1, <<"reg1">>),

    {EnabledBefore, _DisabledBefore, PoolsBefore} =
        get_outgoing_connections(europe_node1, <<"reg1">>),

    ExtraPort = get_port(reg, gd_extra_endpoint_port),
    NewEndpoint = resolved_endpoint(ExtraPort),
    enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),

    {EnabledAfter, _DisabledAfter, PoolsAfter} =
        get_outgoing_connections(europe_node1, <<"reg1">>),

    %% One new pool and one new endpoint
    [NewEndpoint] = PoolsAfter -- PoolsBefore,
    [] = PoolsBefore -- PoolsAfter,
    [NewEndpoint] = EnabledAfter -- EnabledBefore,
    [] = EnabledBefore -- EnabledAfter.

%% Hiding the extra endpoint and rebalancing must move it from the enabled
%% to the disabled list on europe_node1 while both pools stay open.
disable_endpoint_on_refresh(Config) ->
    ExtraPort = get_port(reg, gd_extra_endpoint_port),
    NewEndpoint = resolved_endpoint(ExtraPort),
    enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),

    get_connection(europe_node1, <<"reg1">>),

    {EnabledBefore, DisabledBefore, PoolsBefore} =
        get_outgoing_connections(europe_node1, <<"reg1">>),
    %% Two endpoints enabled and none disabled to begin with
    [_, _] = EnabledBefore,
    [] = DisabledBefore,

    hide_extra_endpoint(asia_node),
    trigger_rebalance(europe_node1, <<"reg1">>),

    {EnabledAfter, DisabledAfter, PoolsAfter} =
        get_outgoing_connections(europe_node1, <<"reg1">>),

    %% 2 pools open even after disable
    [] = PoolsBefore -- PoolsAfter,
    [] = PoolsAfter -- PoolsBefore,
    %% NewEndpoint is no longer enabled
    [] = EnabledAfter -- EnabledBefore,
    [NewEndpoint] = EnabledBefore -- EnabledAfter,
    %% NewEndpoint is now disabled
    [] = DisabledBefore,
    [NewEndpoint] = DisabledAfter.

%% With no endpoints advertised by asia_node, a connection request from europe_node1
%% must not complete for 2 seconds; after asia_node refreshes its mappings and
%% europe_node1 rebalances, the requester must deliver a connection pid within 5 seconds.
wait_for_connection(_Config) ->
    set_endpoints(asia_node, []),
    %% Because of hosts refresher, a pool of connections to asia_node
    %% may already be present here
    TryRebalance =
        fun() ->
                try trigger_rebalance(europe_node1, <<"reg1">>), true
                catch _:_ -> false end
        end,
    mongoose_helper:wait_until(TryRebalance, true,
                               #{name => rebalance, time_left => timer:seconds(5)}),

    spawn_connection_getter(europe_node1),

    %% Any message arriving in the next 2 seconds is a failure
    receive
        Early -> error({unexpected, Early})
    after
        2000 -> ok
    end,

    refresh_mappings(asia_node, "by_wait_for_connection"),
    trigger_rebalance(europe_node1, <<"reg1">>),

    receive
        Conn when is_pid(Conn) -> ok;
        NotAPid -> error({unexpected, NotAPid})
    after
        5000 -> error(timeout)
    end.

%% After the only endpoint gets disabled and the receiver on asia_node is restarted
%% on a different port, europe_node1 must end up with no enabled endpoints,
%% no disabled endpoints and no pools for <<"reg1">>.
closed_connection_is_removed_from_disabled(_Config) ->
    get_connection(europe_node1, <<"reg1">>),
    set_endpoints(asia_node, []),
    trigger_rebalance(europe_node1, <<"reg1">>),

    {[], [_], [_]} = get_outgoing_connections(europe_node1, <<"reg1">>),

    % Will drop connections and prevent them from reconnecting
    SupplementaryPort = get_port(reg, gd_supplementary_endpoint_port),
    restart_receiver(asia_node, [SupplementaryPort]),

    CurrentState = fun() -> get_outgoing_connections(europe_node1, <<"reg1">>) end,
    mongoose_helper:wait_until(CurrentState, {[], [], []},
                               #{name => get_outgoing_connections}).


%%--------------------------------------------------------------------
%% Test helpers
%%--------------------------------------------------------------------

%% Reads {hosts, Host, Param} from the CT configuration and returns it;
%% fails the test case when the value is not an integer.
get_port(Host, Param) ->
    Value = ct:get_config({hosts, Host, Param}),
    is_integer(Value) orelse ct:fail({get_port_failed, Host, Param, Value}),
    Value.

%% {NodeName, LocalHost, ReceiverPort} for every node used by this suite.
get_hosts() ->
    EuropePort1 = get_port(mim, gd_endpoint_port),
    EuropePort2 = get_port(mim2, gd_endpoint_port),
    AsiaPort = get_port(reg, gd_endpoint_port),
    [{europe_node1, <<"localhost.bis">>, EuropePort1},
     {europe_node2, <<"localhost.bis">>, EuropePort2},
     {asia_node, <<"reg1">>, AsiaPort}].

%% Endpoint (string address form) of the receiver port listed for NodeName in get_hosts/0.
listen_endpoint(NodeName) ->
    Port = listen_port(NodeName),
    endpoint(Port).

%% Receiver port listed for NodeName in get_hosts/0.
listen_port(NodeName) ->
    Entry = lists:keyfind(NodeName, 1, get_hosts()),
    {_, _, Port} = Entry,
    Port.

%% Endpoint with the loopback address given as an IP tuple.
resolved_endpoint(Port) when is_integer(Port) ->
    Loopback = {127, 0, 0, 1},
    {Loopback, Port}.

%% Endpoint with the loopback address given as a string.
endpoint(Port) when is_integer(Port) ->
    Loopback = "127.0.0.1",
    {Loopback, Port}.

%% For dynamic_modules
node_spec(NodeName) ->
    Timeout = timer:seconds(30),
    #{node => ct:get_config(NodeName), timeout => Timeout}.

%% Calls M:F(A) on the node configured under NodeName via
%% mongoose_helper:successful_rpc/5 with a 30 second timeout.
rpc(NodeName, M, F, A) ->
    RPCSpec = #{node => ct:get_config(NodeName)},
    mongoose_helper:successful_rpc(RPCSpec, M, F, A, timer:seconds(30)).

%% Deletes the node's field from the nodes hash (query issued through europe_node1)
%% and asserts that exactly one field was removed.
hide_node(NodeName, Config) ->
    HashKey = ?config(nodes_key, Config),
    Field = atom_to_binary(ct:get_config(NodeName), latin1),
    {ok, <<"1">>} = redis_query(europe_node1, [<<"HDEL">>, HashKey, Field]).

%% Starts a client for UserSpec with resource <<"res1">>, installs a filter
%% predicate that rejects presence stanzas, and sends the initial presence.
connect_from_spec(UserSpec, Config) ->
    {ok, Client} = escalus_client:start(Config, UserSpec, <<"res1">>),
    NotPresence = fun(Stanza) -> not escalus_pred:is_presence(Stanza) end,
    escalus_connection:set_filter_predicate(Client, NotPresence),
    escalus_story:send_initial_presence(Client),
    Client.

%% Chat stanza whose id is the current erlang:monotonic_time() rendered as a binary.
chat_with_seqnum(To, Text) ->
    Chat = escalus_stanza:chat_to(To, Text),
    SeqNum = integer_to_binary(erlang:monotonic_time()),
    escalus_stanza:set_id(Chat, SeqNum).

%% Sorts stanzas in descending term order of their `id` attribute.
%% NOTE(review): ids set by chat_with_seqnum/2 are compared as binaries here; descending
%% binary order presumably equals send order only while monotonic time values are
%% negative and have the same number of digits - confirm.
order_by_seqnum(Stanzas) ->
    IdOf = fun(Stanza) -> exml_query:attr(Stanza, <<"id">>) end,
    HasGreaterId = fun(A, B) -> IdOf(B) < IdOf(A) end,
    lists:sort(HasGreaterId, Stanzas).

%% Returns true when exactly one of the items nested under the single child of
%% the given element has a `jid` attribute equal to Service, false otherwise.
has_exactly_one_service(Service, #xmlel{children = [#xmlel{children = Services}]}) ->
    IsService = fun(Item) -> exml_query:attr(Item, <<"jid">>) =:= Service end,
    case lists:filter(IsService, Services) of
        [_] -> true;
        _ -> false
    end.

%% Sends chat messages with bodies "1".."Max" from From to To. After every
%% `Max div 10` messages (but at least after every single message) the mapping of To
%% on europe_node1 is toggled: deleted if it was known, set to ToHost otherwise.
%% The sequence starts with the mapping assumed to be known.
send_steps(From, To, Max, ToHost) ->
    %% Clamp the interval to 1: with Max < 10 the plain `Max div 10` is 0, which would
    %% make next_send_step/7 toggle the mapping forever without sending anything.
    next_send_step(From, To, 1, Max, max(1, Max div 10), true, ToHost).

%% next_send_step(From, To, I, Max, ToReset, KnowsMapping, ToHost)
%%   I            - number of the next message to send
%%   ToReset      - messages left until the next mapping toggle
%%   KnowsMapping - whether europe_node1 currently has the mapping for To
next_send_step(_From, _To, I, Max, _ToReset, _KnowsMapping, _ToHost) when I > Max ->
    ok;
next_send_step(From, To, I, Max, 0, KnowsMapping, ToHost) ->
    ct:log("Reset: I: ~B", [I]),
    case KnowsMapping of
        true -> delete_mapping(europe_node1, To);
        false -> set_mapping(europe_node1, To, ToHost)
    end,
    %% Same clamping as in send_steps/4, so this clause can never re-enter itself
    next_send_step(From, To, I, Max, max(1, Max div 10), not KnowsMapping, ToHost);
next_send_step(From, To, I, Max, ToReset, KnowsMapping, ToHost) ->
    ct:log("I: ~B ~B ~B", [I, Max, ToReset]),
    Stanza = escalus_stanza:chat_to(To, integer_to_binary(I)),
    escalus_client:send(From, Stanza),
    next_send_step(From, To, I + 1, Max, ToReset - 1, KnowsMapping, ToHost).

%% Returns the value stored in Redis under the client's full JID
%% (`undefined` is what the tests expect when there is none).
get_mapping(Node, Client) ->
    {FullJid, _BareJid} = jids(Client),
    {ok, What} = redis_query(Node, [<<"GET">>, FullJid]),
    What.

%% Deletes the Redis keys for the client's full and bare JIDs and clears
%% the mapping cache for the full JID on Node.
%% Warning! May not work properly with alice or any other user whose
%% stringprepped JID is different than original one
delete_mapping(Node, Client) ->
    {FullJid, BareJid} = jids(Client),
    redis_query(Node, [<<"DEL">>, FullJid, BareJid]),
    Jid = rpc(Node, jid, from_binary, [FullJid]),
    rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).

%% Stores Mapping in Redis under both the full and the bare JID of the client
%% and clears the mapping cache for the full JID on Node.
set_mapping(Node, Client, Mapping) ->
    {FullJid, BareJid} = jids(Client),
    redis_query(Node, [<<"MSET">>, FullJid, Mapping, BareJid, Mapping]),
    Jid = rpc(Node, jid, from_binary, [FullJid]),
    rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).

%% {FullJid, BareJid} of an escalus client.
jids(Client) ->
    {escalus_client:full_jid(Client), escalus_client:short_jid(Client)}.

%% Runs Query with eredis:q/2 on Node, using a worker obtained there from
%% the {redis, global, global_distrib} pool.
redis_query(Node, Query) ->
    {ok, Worker} = rpc(Node, mongoose_wpool, get_worker, [redis, global, global_distrib]),
    rpc(Node, eredis, q, [Worker, Query]).

%% A fake address we don't try to connect to.
%% Used in test_advertised_endpoints_override_endpoints testcase.
advertised_endpoints() ->
    Port = get_port(reg, gd_endpoint_port),
    [{fake_domain(), Port}].

fake_domain() ->
    "somefakedomain.com".

%% Converts every {IpTuple, Port} entry to {AddressString, Port};
%% entries of any other shape are passed through unchanged.
iptuples_to_string(Endpoints) ->
    Convert = fun({Addr, Port}) when is_tuple(Addr) -> {inet_parse:ntoa(Addr), Port};
                 (Other) -> Other
              end,
    lists:map(Convert, Endpoints).

%% Connection options for a receiver listening on ReceiverPort. Advertised endpoints
%% come from the `add_advertised_endpoints` config entry for NodeName, falling back
%% to the regular endpoints.
endpoint_opts(NodeName, ReceiverPort, Config) ->
    Listen = [endpoint(ReceiverPort)],
    Overrides = ?config(add_advertised_endpoints, Config),
    Advertised = proplists:get_value(NodeName, Overrides, Listen),
    Resolved = [resolved_endpoint(ReceiverPort)],
    #{endpoints => Listen,
      resolved_endpoints => Resolved,
      advertised_endpoints => Advertised}.

%% Runs mock_inet/0 from this module on every node from get_hosts/0
%% and asserts that each call returned ok.
mock_inet_on_each_node() ->
    Nodes = [ct:get_config(NodeName) || {NodeName, _, _} <- get_hosts()],
    Results = [rpc:block_call(Node, ?MODULE, mock_inet, []) || Node <- Nodes],
    true = lists:all(fun(Result) -> Result =:= ok end, Results).

%% Calls M:F(A) through rpc/4 on every node from get_hosts/0; returns the list of results.
execute_on_each_node(M, F, A) ->
    [rpc(NodeName, M, F, A) || {NodeName, _, _} <- get_hosts()].

%% Mocks mod_global_distrib_utils:getaddrs/2 so that every inet lookup yields
%% the loopback address and every inet6 lookup yields an error.
mock_inet() ->
    %% We don't want to mock inet module itself to avoid strange networking issues
    meck:new(mod_global_distrib_utils, [non_strict, passthrough, unstick]),
    FakeGetAddrs = fun(_, inet) -> {ok, [{127, 0, 0, 1}]};
                      (_, inet6) -> {error, "No ipv6 address"}
                   end,
    meck:expect(mod_global_distrib_utils, getaddrs, FakeGetAddrs).

%% Unloads the mod_global_distrib_utils mock on every node; the argument is ignored.
unmock_inet(_Pids) ->
    execute_on_each_node(meck, unload, [mod_global_distrib_utils]).

%% Children of mod_global_distrib_outgoing_conns_sup on Node,
%% without the hosts refresher child.
out_connection_sups(Node) ->
    AllChildren = rpc(Node, supervisor, which_children,
                      [mod_global_distrib_outgoing_conns_sup]),
    NotRefresher = fun({Id, _, _, _}) -> Id =/= mod_global_distrib_hosts_refresher end,
    lists:filter(NotRefresher, AllChildren).

%% True when each of asia_node, europe_node1 and europe_node2 has at least one
%% child returned by out_connection_sups/1.
trees_for_connections_present() ->
    Asia = out_connection_sups(asia_node),
    Europe1 = out_connection_sups(europe_node1),
    Europe2 = out_connection_sups(europe_node2),
    lists:all(fun(Children) -> Children =/= [] end, [Asia, Europe1, Europe2]).

%% True when ExpectedSup is the id of one of the children
%% returned by out_connection_sups/1 for Node.
tree_for_sup_present(Node, ExpectedSup) ->
    lists:keymember(ExpectedSup, 1, out_connection_sups(Node)).


%% ------------------------------- rebalancing helpers -----------------------------------

%% Spawns a process that requests a connection to <<"reg1">> on SenderNode
%% and sends the result to the calling process.
spawn_connection_getter(SenderNode) ->
    Parent = self(),
    Getter = fun() -> Parent ! get_connection(SenderNode, <<"reg1">>) end,
    spawn(Getter).

%% Restarts the receiver on ListenNode with Port added to its regular port, stores
%% both endpoints, and triggers a rebalance for <<"reg1">> on SenderNode.
enable_extra_endpoint(ListenNode, SenderNode, Port, _Config) ->
    Ports = [Port, listen_port(ListenNode)],
    restart_receiver(ListenNode, Ports),
    set_endpoints(ListenNode, Ports),
    trigger_rebalance(SenderNode, <<"reg1">>).

%% Result of mod_global_distrib_outgoing_conns_sup:get_connection(ToDomain) on SenderNode.
get_connection(SenderNode, ToDomain) ->
    rpc(SenderNode, mod_global_distrib_outgoing_conns_sup, get_connection, [ToDomain]).

%% Stores only the regular listen endpoint for ListenNode.
hide_extra_endpoint(ListenNode) ->
    RegularPort = listen_port(ListenNode),
    set_endpoints(ListenNode, [RegularPort]).

%% Stores loopback endpoints for Ports through
%% mod_global_distrib_mapping_redis:set_endpoints/1 on ListenNode.
set_endpoints(ListenNode, Ports) ->
    Endpoints = lists:map(fun endpoint/1, Ports),
    {ok, _} = rpc(ListenNode, mod_global_distrib_mapping_redis, set_endpoints, [Endpoints]).

%% Returns {Enabled, Disabled, Pools} for DestinationDomain as seen on NodeName:
%% the enabled and disabled endpoints reported by the server manager, and the ids of
%% the children of the per-domain supervisor except the manager itself.
get_outgoing_connections(NodeName, DestinationDomain) ->
    OnNode = fun(M, F) -> rpc(NodeName, M, F, [DestinationDomain]) end,
    SupName = OnNode(mod_global_distrib_utils, server_to_sup_name),
    MgrName = OnNode(mod_global_distrib_utils, server_to_mgr_name),
    Enabled = OnNode(mod_global_distrib_server_mgr, get_enabled_endpoints),
    Disabled = OnNode(mod_global_distrib_server_mgr, get_disabled_endpoints),
    Children = rpc(NodeName, supervisor, which_children, [SupName]),
    Pools = [Id || {Id, _Child, _Type, _Modules} <- Children, Id /= MgrName],
    {Enabled, Disabled, Pools}.

%% Restarts the receiver on NodeName on its regular listen port.
restart_receiver(NodeName) ->
    restart_receiver(NodeName, [listen_port(NodeName)]).

%% Restarts mod_global_distrib_receiver for <<"localhost">> on NodeName with its
%% current options, except that the connection endpoints are replaced by loopback
%% endpoints for NewPorts.
restart_receiver(NodeName, NewPorts) ->
    CurrentOpts = rpc(NodeName, gen_mod, get_module_opts,
                      [<<"localhost">>, mod_global_distrib_receiver]),
    #{connections := CurrentConnOpts} = CurrentOpts,
    Endpoints = [endpoint(Port) || Port <- NewPorts],
    Resolved = [resolved_endpoint(Port) || Port <- NewPorts],
    %% `:=` requires both keys to be present in the current options already
    UpdatedConnOpts = CurrentConnOpts#{endpoints := Endpoints,
                                       resolved_endpoints := Resolved},
    UpdatedOpts = CurrentOpts#{connections := UpdatedConnOpts},
    dynamic_modules:restart(node_spec(NodeName), <<"localhost">>,
                            mod_global_distrib_receiver, UpdatedOpts).

%% Forces the server manager for DestinationDomain on NodeName to refresh,
%% logs the manager's state info, and then sleeps for one second.
trigger_rebalance(NodeName, DestinationDomain) when is_binary(DestinationDomain) ->
    Args = [DestinationDomain],
    %% To ensure that the manager exists,
    %% otherwise we can get noproc error in the force_refresh call
    ok = rpc(NodeName, mod_global_distrib_outgoing_conns_sup, ensure_server_started, Args),
    rpc(NodeName, mod_global_distrib_server_mgr, force_refresh, Args),
    StateInfo = rpc(NodeName, mod_global_distrib_server_mgr, get_state_info, Args),
    ct:log("mgr_state_info_after_rebalance nodename=~p state_info=~p", [NodeName, StateInfo]),
    timer:sleep(1000).

%% -----------------------------------------------------------------------
%% Escalus-related helpers

%% Waits for as many stanzas as there are Bodies, orders them with order_by_seqnum/1 and
%% checks that the N-th stanza is a chat message with the N-th body.
%% Fails the test case when fewer stanzas arrive or any body does not match.
user_receives(User, Bodies) ->
    Wanted = length(Bodies),
    Received = escalus_client:wait_for_stanzas(User, Wanted),
    Sorted = order_by_seqnum(Received),
    length(Received) =:= Wanted
        orelse ct:fail({user_receives_not_enough, {wanted, Bodies}, {received, Sorted}}),
    Checks = lists:zipwith(fun escalus_pred:is_chat_message/2, Bodies, Sorted),
    case lists:all(fun(Check) -> Check end, Checks) of
        true ->
            ok;
        false ->
            ct:fail({user_receives_failed, {wanted, Bodies}, {received, Sorted},
                     {check, Checks}})
    end.


%% -----------------------------------------------------------------------
%% Refreshing helpers

%% Reason is a string
%% NodeName is asia_node, europe_node2, ... in a format used by this suite.
refresh_mappings(NodeName, Reason) when is_list(Reason) ->
    rpc(NodeName, mod_global_distrib_mapping_redis, refresh, [Reason]).

%% Refreshes mappings on each of NodeNames, in the given order.
refresh_hosts(NodeNames, Reason) ->
    lists:map(fun(NodeName) -> refresh_mappings(NodeName, Reason) end, NodeNames).


%% -----------------------------------------------------------------------
%% Other helpers

%% Connection steps that end with stream_resumption, used for Eve's stream-management session.
connect_steps_with_sm() ->
    [start_stream,
     stream_features,
     maybe_use_ssl,
     authenticate,
     bind,
     session,
     stream_resumption].

%% Copy of Client whose jid field holds the short (bare) JID.
bare_client(Client) ->
    ShortJid = escalus_utils:get_short_jid(Client),
    Client#client{jid = ShortJid}.

%% Value of {hosts, mim, service_port} from the CT configuration.
service_port() ->
    ct:get_config({hosts, mim, service_port}).


%% -----------------------------------------------------------------------
%% Waiting helpers

%% Waits until Domain is a member of mod_global_distrib_mapping:all_domains() on Node.
wait_for_domain(Node, Domain) ->
    IsKnown =
        fun() ->
                Known = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
                lists:member(Domain, Known)
        end,
    mongoose_helper:wait_until(IsKnown, true, #{name => {wait_for_domain, Node, Domain}}).

%% We receive presence BEFORE session is registered in ejabberd_sm.
%% So, to ensure that we processed do_open_session completely, let's send a "ping".
%% by calling the c2s process.
%% That call would only return, when all messages in erlang message queue
%% are processed.
wait_for_registration(Client, Node) ->
    RPCSpec = #{node => Node},
    SessionPid = fun() -> mongoose_helper:get_session_pid(Client, RPCSpec) end,
    mongoose_helper:wait_until(fun() -> is_pid(SessionPid()) end, true,
                               #{name => wait_for_session}),
    C2sPid = SessionPid(),
    rpc:call(node(C2sPid), ejabberd_c2s, get_info, [C2sPid]),
    ok.


%% -----------------------------------------------------------------------
%% Ensure, that endpoints are up

%% Waits until every receiver port from get_hosts/0 accepts a TCP connection.
wait_for_listeners_to_appear() ->
    lists:map(fun wait_for_can_connect_to_port/1, receiver_ports(get_hosts())).

%% Third element of each {NodeName, LocalHost, ReceiverPort} tuple.
receiver_ports(Hosts) ->
    PortOf = fun({_NodeName, _LocalHost, ReceiverPort}) -> ReceiverPort end,
    lists:map(PortOf, Hosts).

%% Waits (time_left of 30 seconds, sleep_time of 1000) until can_connect_to_port/1
%% returns true for Port.
wait_for_can_connect_to_port(Port) ->
    Probe = fun() -> can_connect_to_port(Port) end,
    WaitOpts = #{time_left => timer:seconds(30),
                 sleep_time => 1000,
                 name => {can_connect_to_port, Port}},
    mongoose_helper:wait_until(Probe, true, WaitOpts).

%% True when a TCP connection to 127.0.0.1:Port can be opened (it is closed right away);
%% otherwise prints the connect result and returns false.
can_connect_to_port(Port) ->
    Result = gen_tcp:connect("127.0.0.1", Port, []),
    case Result of
        {ok, Socket} ->
            gen_tcp:close(Socket),
            true;
        _ ->
            ct:pal("can_connect_to_port port=~p result=~p", [Port, Result]),
            false
    end.

%% -----------------------------------------------------------------------
%% Custom log levels for GD modules during the tests

%% Passes custom_loglevels/0 for all test_hosts/0 to mim_loglevel:enable_logging/2.
enable_logging() ->
    Hosts = test_hosts(),
    mim_loglevel:enable_logging(Hosts, custom_loglevels()).

%% Passes custom_loglevels/0 for all test_hosts/0 to mim_loglevel:disable_logging/2.
disable_logging() ->
    Hosts = test_hosts(),
    mim_loglevel:disable_logging(Hosts, custom_loglevels()).

%% {Module, Level} pairs used by enable_logging/0 and disable_logging/0.
custom_loglevels() ->
    [
     %% for "s2s connection to muc.localhost not found" debugging
     {ejabberd_s2s, debug},
     %% for debugging event=refreshing_own_data_done
     {mod_global_distrib_mapping_redis, info},
     %% to know if connection is already started or would be started
     %% event=outgoing_conn_start_progress
     {mod_global_distrib_outgoing_conns_sup, info},
     %% to debug bound connection issues
     {mod_global_distrib, debug},
     %% to know all new connections pids
     {mod_global_distrib_connection, debug},
     %% to check if gc or refresh is triggered
     {mod_global_distrib_server_mgr, info},
     %% To debug incoming connections
     % {mod_global_distrib_receiver, info},
     %% to debug global session set/delete
     {mod_global_distrib_mapping, debug}
    ].

%% Host names passed to mim_loglevel by enable_logging/0 and disable_logging/0.
test_hosts() ->
    [mim, mim2, reg].