%%==============================================================================
%% Copyright 2017 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

-module(mod_global_distrib_SUITE).

-compile([export_all, nowarn_export_all]).

-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("exml/include/exml.hrl").

-define(HOSTS_REFRESH_INTERVAL, 200). %% in ms

-import(domain_helper, [domain/0]).
-import(config_parser_helper, [config/2, mod_config/2]).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Common Test callback: the test groups of this suite, in the order listed.
all() ->
    GroupNames = [mod_global_distrib,
                  cluster_restart,
                  start_checks,
                  invalidation,
                  multi_connection,
                  rebalancing,
                  advertised_endpoints,
                  hosts_refresher],
    [{group, GroupName} || GroupName <- GroupNames].

%% Common Test callback: every group is declared without group properties
%% and with the test cases listed in group_test_cases/0.
groups() ->
    [{GroupName, [], TestCases} || {GroupName, TestCases} <- group_test_cases()].

%% Ordered table of {GroupName, TestCases} pairs backing groups/0.
group_test_cases() ->
    [{mod_global_distrib,
      [test_pm_between_users_at_different_locations,
       test_pm_between_users_before_available_presence,
       test_component_disconnect,
       test_component_on_one_host,
       test_components_in_different_regions,
       test_hidden_component_disco_in_different_region,
       test_pm_with_disconnection_on_other_server,
       test_pm_with_graceful_reconnection_to_different_server,
       test_pm_with_ungraceful_reconnection_to_different_server,
       test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first,
       test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first,
       test_component_unregister,
       test_update_senders_host,
       test_update_senders_host_by_ejd_service,

       %% with node 2 disabled
       test_muc_conversation_on_one_host,
       test_global_disco]},
     {hosts_refresher,
      [test_host_refreshing]},
     {cluster_restart,
      [test_location_disconnect]},
     {start_checks,
      [test_error_on_wrong_hosts]},
     {invalidation,
      %% TODO: Add checks for other mapping refreshes
      [refresh_nodes]},
     {multi_connection,
      [test_in_order_messages_on_multiple_connections,
       test_in_order_messages_on_multiple_connections_with_bounce,
       test_messages_bounced_in_order,

       %% with node 2 disabled
       test_muc_conversation_history]},
     {rebalancing,
      [enable_new_endpoint_on_refresh,
       disable_endpoint_on_refresh,
       wait_for_connection,
       closed_connection_is_removed_from_disabled]},
     {advertised_endpoints,
      [test_advertised_endpoints_override_endpoints,
       test_pm_between_users_at_different_locations]}].

%% Common Test callback: declares the node and port configuration entries this
%% suite requires, followed by whatever escalus:suite/0 returns.
suite() ->
    Requirements = [{require, europe_node1, {hosts, mim, node}},
                    {require, europe_node2, {hosts, mim2, node}},
                    {require, asia_node, {hosts, reg, node}},
                    {require, c2s_port, {hosts, mim, c2s_port}}],
    Requirements ++ escalus:suite().

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Skips the whole suite unless the global_distrib Redis pool worker can be
%% fetched on both europe_node1 and asia_node. Otherwise joins europe_node2 to
%% europe_node1, enables logging and hands over to escalus with empty
%% add_advertised_endpoints / extra_config defaults prepended to Config.
init_per_suite(Config) ->
    WorkerArgs = [redis, global, global_distrib],
    EuropeWorker = rpc(europe_node1, mongoose_wpool, get_worker, WorkerArgs),
    AsiaWorker = rpc(asia_node, mongoose_wpool, get_worker, WorkerArgs),
    case {EuropeWorker, AsiaWorker} of
        {{ok, _}, {ok, _}} ->
            ClusterNode = ct:get_config(europe_node1),
            ok = rpc(europe_node2, mongoose_cluster, join, [ClusterNode]),

            enable_logging(),
            SuiteDefaults = [{add_advertised_endpoints, []}, {extra_config, #{}}],
            escalus:init_per_suite(SuiteDefaults ++ Config);
        RedisCheck ->
            ct:pal("Redis check result: ~p", [RedisCheck]),
            {skip, "GD Redis default pool not available"}
    end.

%% Reverses init_per_suite/1: logging off, fresh users cleaned,
%% europe_node2 leaves the cluster, then escalus teardown.
end_per_suite(Config) ->
    disable_logging(),
    escalus_fresh:clean(),
    rpc(europe_node2, mongoose_cluster, leave, []),
    escalus:end_per_suite(Config).

%% Per-group setup. Every clause except start_checks ends up in
%% init_per_group_generic/1, optionally after adding group-specific entries
%% (extra_config, add_advertised_endpoints) to Config.
init_per_group(start_checks, Config) ->
    %% Remember the current modules and make sure mod_global_distrib is stopped
    Node = node_spec(europe_node1),
    SavedConfig = dynamic_modules:save_modules(Node, [domain()], Config),
    dynamic_modules:ensure_modules(Node, domain(), [{mod_global_distrib, stopped}]),
    SavedConfig;
init_per_group(multi_connection, Config) ->
    BounceOpts = #{resend_after_ms => 20000},
    %% Disable unused feature to avoid interference
    ConnOpts = #{connections_per_endpoint => 100,
                 disabled_gc_interval => 10000},
    ExtraConfig = #{bounce => BounceOpts, connections => ConnOpts},
    init_per_group_generic([{extra_config, ExtraConfig} | Config]);
init_per_group(invalidation, Config) ->
    %% invalidation_generic has no clause of its own, so this call is served
    %% by the catch-all clause at the bottom.
    GroupConfig = init_per_group(invalidation_generic, Config),
    [{node_to_expire, <<"fake_node@localhost">>} | GroupConfig];
init_per_group(rebalancing, Config) ->
    %% We need to prevent automatic refreshes, because they may interfere with tests
    %% and we need early disabled garbage collection to check its validity
    ConnOpts = #{endpoint_refresh_interval => 3600,
                 endpoint_refresh_interval_when_empty => 3600,
                 disabled_gc_interval => 1},
    RedisOpts = #{refresh_after => 3600},
    ExtraConfig = #{connections => ConnOpts, redis => RedisOpts},
    init_per_group_generic([{extra_config, ExtraConfig} | Config]);
init_per_group(advertised_endpoints, Config) ->
    %% Inject this suite module into every node before mocking inet there
    InjectSuite = fun({NodeName, _, _}) ->
                          Node = ct:get_config(NodeName),
                          mongoose_helper:inject_module(#{node => Node}, ?MODULE, reload)
                  end,
    lists:foreach(InjectSuite, get_hosts()),
    mock_inet_on_each_node(),
    AdvertisedEndpoints = [{asia_node, advertised_endpoints()}],
    init_per_group_generic([{add_advertised_endpoints, AdvertisedEndpoints} | Config]);
init_per_group(mod_global_distrib, Config) ->
    %% Disable mod_global_distrib_mapping_redis refresher
    RedisOpts = #{refresh_after => 3600},
    init_per_group_generic([{extra_config, #{redis => RedisOpts}} | Config]);
init_per_group(_, Config) ->
    init_per_group_generic(Config).

%% Configures modules on every host returned by get_hosts/0, waits for the
%% listeners, and returns Config extended with the Redis nodes key (read from
%% the first host) and the xmpp escalus user database setting.
init_per_group_generic(Config0) ->
    ConfigWithSavedModules = lists:foldl(fun init_modules_per_node/2, Config0, get_hosts()),
    wait_for_listeners_to_appear(),
    {AnyNode, _, _} = hd(get_hosts()),
    NodesKey = rpc(AnyNode, mod_global_distrib_mapping_redis, nodes_key, []),
    [{nodes_key, NodesKey}, {escalus_user_db, xmpp} | ConfigWithSavedModules].

%% Fold step for one {NodeName, LocalHost, ReceiverPort} host entry: saves the
%% node's current modules into Config, stops a fixed set of modules on each
%% virtual host and (re)starts mod_global_distrib and mod_stream_management.
init_modules_per_node({NodeName, LocalHost, ReceiverPort}, Config0) ->
    BaseOpts = module_opts(?config(extra_config, Config0)),
    EndpointOpts = endpoint_opts(NodeName, ReceiverPort, Config0),
    %% On key clash the connection options already in BaseOpts win
    ConnOpts = maps:merge(EndpointOpts, maps:get(connections, BaseOpts, #{})),
    GDOpts = module_opts(BaseOpts#{local_host => LocalHost, connections => ConnOpts}),

    VirtHosts = virtual_hosts(NodeName),
    Node = node_spec(NodeName),
    SavedConfig = dynamic_modules:save_modules(Node, VirtHosts, Config0),

    %% To reduce load when sending many messages
    ModulesToStop = [mod_offline, mod_blocking, mod_privacy, mod_roster, mod_last,
                     mod_stream_management],
    lists:foreach(fun(VirtHost) ->
                          dynamic_modules:ensure_stopped(Node, VirtHost, ModulesToStop)
                  end, VirtHosts),

    SMBackend = ct_helper:get_internal_database(),
    SMOpts = config_parser_helper:mod_config(mod_stream_management,
                                             #{resume_timeout => 1, backend => SMBackend}),
    ModulesToStart = [{mod_global_distrib, GDOpts},
                      {mod_stream_management, SMOpts}],
    dynamic_modules:ensure_modules(Node, domain(), ModulesToStart),
    SavedConfig.

%% Runs the option map through every set_opts/2 step, in this fixed order.
module_opts(ExtraOpts) ->
    Steps = [common, defaults, connections, redis, bounce],
    lists:foldl(fun set_opts/2, ExtraOpts, Steps).

%% One configuration step over the option map; used as the fold function in
%% module_opts/1. The connections/redis/bounce steps only apply when the
%% corresponding key is present, otherwise the map is returned untouched.
set_opts(common, Opts) ->
    CommonDefaults = #{global_host => <<"localhost">>,
                       hosts_refresh_interval => ?HOSTS_REFRESH_INTERVAL},
    maps:merge(CommonDefaults, Opts);
set_opts(defaults, Opts) ->
    mod_config(mod_global_distrib, Opts);
set_opts(connections, #{connections := ConnExtra} = Opts) ->
    TLSDefaults = #{certfile => "priv/ssl/fake_server.pem",
                    cacertfile => "priv/ssl/ca/cacert.pem"},
    TLSOpts = config([modules, mod_global_distrib, connections, tls], TLSDefaults),
    ConnOpts = config([modules, mod_global_distrib, connections],
                      maps:merge(#{tls => TLSOpts}, ConnExtra)),
    Opts#{connections := ConnOpts};
set_opts(redis, #{redis := RedisExtra} = Opts) ->
    RedisOpts = config([modules, mod_global_distrib, redis], RedisExtra),
    Opts#{redis := RedisOpts};
set_opts(bounce, #{bounce := BounceExtra} = Opts) ->
    BounceOpts = config([modules, mod_global_distrib, bounce], BounceExtra),
    Opts#{bounce := BounceOpts};
set_opts(_, Opts) ->
    Opts.

%% Per-group teardown; all groups except start_checks restore the saved modules.
end_per_group(advertised_endpoints, Config) ->
    MeckHandlers = ?config(meck_handlers, Config),
    unmock_inet(MeckHandlers),
    escalus_fresh:clean(),
    end_per_group_generic(Config);
end_per_group(start_checks, Config) ->
    escalus_fresh:clean(),
    Config;
end_per_group(invalidation, Config) ->
    %% Drop the fake node entry from the Redis nodes hash
    NodesKey = ?config(nodes_key, Config),
    NodeBin = ?config(node_to_expire, Config),
    redis_query(europe_node1, [<<"HDEL">>, NodesKey, NodeBin]),
    end_per_group_generic(Config);
end_per_group(_, Config) ->
    end_per_group_generic(Config).

%% Restores the modules saved in Config, with a 30 second timeout.
end_per_group_generic(Config) ->
    RestoreOpts = #{timeout => timer:seconds(30)},
    dynamic_modules:restore_modules(RestoreOpts, Config).

%% Per-testcase setup: MUC-based cases hide europe_node2 and load MUC,
%% the reconnection cases register Eve in both clusters first.
init_per_testcase(CaseName, Config)
  when CaseName =:= test_muc_conversation_on_one_host;
       CaseName =:= test_global_disco;
       CaseName =:= test_muc_conversation_history ->
    %% There is no helper to load MUC on node2
    %% For now it's easier to hide node2
    %% TODO: Do it right at some point!
    hide_node(europe_node2, Config),
    %% There would be no new connections to europe_node2, but there can be some old ones.
    %% We need to disconnect previous connections.
    {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
    trigger_rebalance(asia_node, EuropeHost),
    %% Load muc on mim node
    muc_helper:load_muc(),
    RegNode = ct:get_config({hosts, reg, node}),
    %% Wait for muc.localhost to become visible from reg node
    wait_for_domain(RegNode, muc_helper:muc_host()),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CN, Config)
  when CN =:= test_pm_with_graceful_reconnection_to_different_server;
       CN =:= test_pm_with_ungraceful_reconnection_to_different_server;
       CN =:= test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
       CN =:= test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
    ConfigWithEve = init_user_eve(Config),
    escalus:init_per_testcase(CN, ConfigWithEve);
init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).

%% Creates a fresh Eve and registers her a second time with the mim c2s port.
%% Both user specs are stored in Config (evespec_reg / evespec_mim).
init_user_eve(Config) ->
    %% Register Eve in reg cluster
    RegEveSpec = escalus_fresh:create_fresh_user(Config, eve),
    MimPort = ct:get_config({hosts, mim, c2s_port}),
    MimEveSpec = lists:keystore(port, 1, RegEveSpec, {port, MimPort}),
    %% Register Eve in mim cluster
    escalus:create_users(Config, [{eve, MimEveSpec}]),
    [{evespec_reg, RegEveSpec}, {evespec_mim, MimEveSpec} | Config].

%% Per-testcase teardown; every clause finishes in generic_end_per_testcase/2.
end_per_testcase(CN, Config)
  when CN =:= test_pm_with_graceful_reconnection_to_different_server;
       CN =:= test_pm_with_ungraceful_reconnection_to_different_server;
       CN =:= test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
       CN =:= test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
    MimEveSpec = ?config(evespec_mim, Config),
    %% Clean Eve from reg cluster
    escalus_fresh:clean(),
    %% Clean Eve from mim cluster
    %% For shared databases (i.e. mysql, pgsql...),
    %% removing from one cluster would remove from all clusters.
    %% For mnesia auth backend we need to call removal from each cluster.
    %% That's why there is a catch here.
    catch escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}]),
    generic_end_per_testcase(CN, Config);
end_per_testcase(CaseName, Config)
  when CaseName =:= test_muc_conversation_on_one_host;
       CaseName =:= test_global_disco;
       CaseName =:= test_muc_conversation_history ->
    refresh_mappings(europe_node2, end_per_testcase_reason(CaseName)),
    muc_helper:unload_muc(),
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(test_update_senders_host_by_ejd_service = CN, Config) ->
    refresh_mappings(europe_node1, end_per_testcase_reason(CN)),
    generic_end_per_testcase(CN, Config);
end_per_testcase(CN, Config)
  when CN =:= enable_new_endpoint_on_refresh;
       CN =:= disable_endpoint_on_refresh;
       CN =:= wait_for_connection;
       CN =:= closed_connection_is_removed_from_disabled ->
    restart_receiver(asia_node),
    refresh_mappings(asia_node, end_per_testcase_reason(CN)),
    generic_end_per_testcase(CN, Config);
end_per_testcase(CaseName, Config) ->
    generic_end_per_testcase(CaseName, Config).

%% Reason string passed to refresh_mappings/2 from end_per_testcase/2.
end_per_testcase_reason(CaseName) ->
    "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName).

%% For every host: pauses the hosts refresher, terminates all outgoing
%% connection children except the refresher itself, and unpauses it again.
%% Finally runs the escalus per-testcase teardown.
generic_end_per_testcase(CaseName, Config) ->
    %% TODO: Enable refresher only for specific test cases,
    %% as some of them are based on assumption that node(s)
    %% must open new connections during tests.
    CloseOutgoing = fun({NodeName, _, _}) ->
                            pause_refresher(NodeName, CaseName),
                            terminate_outgoing_conns(NodeName),
                            unpause_refresher(NodeName, CaseName)
                    end,
    lists:foreach(CloseOutgoing, get_hosts()),
    escalus:end_per_testcase(CaseName, Config).

%% Terminates every child of the node's outgoing connections supervisor except
%% the hosts refresher, then asserts the refresher is the only child left.
%% A missing supervisor (noproc) is only logged.
terminate_outgoing_conns(NodeName) ->
    Node = ct:get_config(NodeName),
    SupRef = {mod_global_distrib_outgoing_conns_sup, Node},
    try
        TerminateUnlessRefresher =
            fun({mod_global_distrib_hosts_refresher, _, _, _}) ->
                    skip;
               ({ChildId, _, _, _}) ->
                    supervisor:terminate_child(SupRef, ChildId)
            end,
        lists:foreach(TerminateUnlessRefresher, supervisor:which_children(SupRef)),
        [{mod_global_distrib_hosts_refresher, _, worker, _Modules}] =
            supervisor:which_children(SupRef)
    catch
        _:{noproc, _} ->
            ct:pal("Sender supervisor not found in ~p", [NodeName])
    end.

%% asia_node serves only the primary domain, other nodes also the secondary one.
virtual_hosts(asia_node) ->
    [domain()];
virtual_hosts(_) ->
    [domain(), secondary_domain()].

secondary_domain() ->
    ct:get_config({hosts, mim, secondary_domain}).

%% Refresher is not started at all or stopped for some test cases
-spec pause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
pause_refresher(NodeName, CaseName) ->
    toggle_refresher(NodeName, CaseName, pause).

-spec unpause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
unpause_refresher(NodeName, CaseName) ->
    toggle_refresher(NodeName, CaseName, unpause).

%% Calls mod_global_distrib_hosts_refresher:Action() on the node, except for
%% the node/testcase combinations where the call is skipped.
toggle_refresher(_, test_error_on_wrong_hosts, _Action) ->
    ok;
toggle_refresher(asia_node, test_location_disconnect, _Action) ->
    ok;
toggle_refresher(NodeName, _, Action) ->
    ok = rpc(NodeName, mod_global_distrib_hosts_refresher, Action, []).

%%--------------------------------------------------------------------
%% Service discovery test
%%--------------------------------------------------------------------

%% Requires module mod_global_distrib to be started with argument advertised_endpoints
%% for each host in get_hosts().
%% Reads Redis to confirm that endpoints (in Redis) are overwritten
%% with `advertised_endpoints` option value
test_advertised_endpoints_override_endpoints(_Config) ->
    EndpointsPerNode = execute_on_each_node(mod_global_distrib_mapping_redis, get_endpoints,
                                            [<<"reg1">>]),
    %% Order of endpoints is irrelevant, hence the sort on both sides
    MatchesAdvertised = fun(Endpoints) ->
                                lists:sort(Endpoints) =:= lists:sort(advertised_endpoints())
                        end,
    true = lists:all(MatchesAdvertised, EndpointsPerNode).

%% @doc Verifies that hosts refresher will restart the outgoing connection pool if
%% it goes down for some reason (crash or domain unavailability).
%% Also actually verifies that refresher properly reads host list
%% from backend and starts appropriate pool.
test_host_refreshing(_Config) ->
    InitialWaitOpts = #{name => trees_for_connections_present,
                        time_left => timer:seconds(10)},
    mongoose_helper:wait_until(fun() -> trees_for_connections_present() end, true,
                               InitialWaitOpts),
    ConnectionSups = out_connection_sups(asia_node),
    {europe_node1, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
    EuropeSup = rpc(asia_node, mod_global_distrib_utils, server_to_sup_name, [EuropeHost]),
    {_, EuropePid, supervisor, _} = lists:keyfind(EuropeSup, 1, ConnectionSups),
    erlang:exit(EuropePid, kill), % it's ok to kill temporary process
    %% The killed supervisor tree is expected to come back
    SupIsBack = fun() -> tree_for_sup_present(asia_node, EuropeSup) end,
    mongoose_helper:wait_until(SupIsBack, true, #{name => tree_for_sup_present}).

%% When run in mod_global_distrib group - tests simple case of connection
%% between two users connected to different clusters.
%% When run in advertised_endpoints group it tests whether it is possible
%% to connect to a node that is advertising itself with a domain name.
test_pm_between_users_at_different_locations(Config) ->
    Users = [{alice, 1}, {eve, 1}],
    escalus:fresh_story(Config, Users, fun test_two_way_pm/2).

%% Same exchange as test_two_way_pm/2, but the clients are started directly
%% with escalus_client:start/3 instead of going through a story.
test_pm_between_users_before_available_presence(Config) ->
    FreshConfig = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
    Resource = <<"res1">>,
    {ok, Alice} = escalus_client:start(FreshConfig, alice, Resource),
    {ok, Eve} = escalus_client:start(FreshConfig, eve, Resource),

    test_two_way_pm(Alice, Eve),

    escalus_client:stop(FreshConfig, Alice),
    escalus_client:stop(FreshConfig, Eve).

%% Alice and Eve each send one chat message to the other; both messages must
%% arrive (within 15 seconds) with the expected sender, recipient and body.
test_two_way_pm(Alice, Eve) ->
    ToEve = <<"Hi to Eve from Europe1!">>,
    ToAlice = <<"Hi to Alice from Asia!">>,
    escalus_client:send(Alice, escalus_stanza:chat_to(Eve, ToEve)),
    escalus_client:send(Eve, escalus_stanza:chat_to(Alice, ToAlice)),

    Timeout = timer:seconds(15),
    ReceivedByEve = escalus_client:wait_for_stanza(Eve, Timeout),
    ReceivedByAlice = escalus_client:wait_for_stanza(Alice, Timeout),

    AliceJid = escalus_client:full_jid(Alice),
    EveJid = escalus_client:full_jid(Eve),

    escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, ToEve], ReceivedByEve),
    escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, ToAlice], ReceivedByAlice).

%% Alice (room owner) and Eve both enter a fresh MUC room and exchange one
%% groupchat message each; every message must reach sender and peer.
test_muc_conversation_on_one_host(Config0) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
    Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
    Story =
        fun(Eve) ->
                Alice = connect_from_spec(AliceSpec, Config),

                RoomJid = ?config(room, Config),
                RoomAddr = muc_helper:room_address(RoomJid),
                AliceNick = escalus_utils:get_username(Alice),
                EveNick = escalus_utils:get_username(Eve),

                %% Alice enters: own presence, then the room subject
                escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceNick)),
                wait_for_muc_presence(Alice, RoomJid, AliceNick),
                wait_for_subject(Alice),

                %% Eve enters: she sees Alice and herself, Alice sees Eve
                escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveNick)),
                wait_for_muc_presence(Eve, RoomJid, AliceNick),
                wait_for_muc_presence(Eve, RoomJid, EveNick),
                wait_for_muc_presence(Alice, RoomJid, EveNick),
                wait_for_subject(Eve),

                %% The sender gets its own message reflected before the peer is checked
                Exchange =
                    fun(Sender, Peer, Text) ->
                            escalus:send(Sender, escalus_stanza:groupchat_to(RoomAddr, Text)),
                            escalus:assert(is_groupchat_message, [Text],
                                           escalus:wait_for_stanza(Sender)),
                            escalus:assert(is_groupchat_message, [Text],
                                           escalus:wait_for_stanza(Peer))
                    end,
                Exchange(Alice, Eve, <<"Hi, Eve!">>),
                Exchange(Eve, Alice, <<"Hi, Alice!">>)
        end,
    escalus:fresh_story(Config, [{eve, 1}], Story),
    muc_helper:destroy_room(Config).

%% Alice posts three messages into a fresh room before Eve joins;
%% Eve must then receive those three messages followed by the subject.
test_muc_conversation_history(Config0) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
    Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
    MessageCount = 3,
    Story =
        fun(Eve) ->
                Alice = connect_from_spec(AliceSpec, Config),

                RoomJid = ?config(room, Config),
                AliceNick = escalus_utils:get_username(Alice),
                RoomAddr = muc_helper:room_address(RoomJid),

                escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceNick)),
                wait_for_muc_presence(Alice, RoomJid, AliceNick),
                wait_for_subject(Alice),

                send_n_muc_messages(Alice, RoomAddr, MessageCount),

                %% Ensure that the messages are received by the room
                %% before trying to login Eve.
                %% Otherwise, Eve would receive some messages from history and
                %% some as regular groupchat messages.
                receive_n_muc_messages(Alice, MessageCount),

                EveNick = escalus_utils:get_username(Eve),
                escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveNick)),

                wait_for_muc_presence(Eve, RoomJid, AliceNick),
                wait_for_muc_presence(Eve, RoomJid, EveNick),
                wait_for_muc_presence(Alice, RoomJid, EveNick),

                %% XEP-0045: After sending the presence broadcast (and only after doing so),
                %% the service MAY then send discussion history, the room subject,
                %% live messages, presence updates, and other in-room traffic.
                receive_n_muc_messages(Eve, MessageCount),
                wait_for_subject(Eve)
        end,
    escalus:fresh_story(Config, [{eve, 1}], Story),
    muc_helper:destroy_room(Config).

%% Expects the next stanza for User to be a presence sent from
%% FromNickname's address in the room.
wait_for_muc_presence(User, RoomJid, FromNickname) ->
    ExpectedFrom = muc_helper:room_address(RoomJid, FromNickname),
    Stanza = escalus:wait_for_stanza(User),
    escalus:assert(is_presence, Stanza),
    escalus:assert(is_stanza_from, [ExpectedFrom], Stanza),
    ok.

%% Expects the next stanza for User to be a groupchat message
%% that carries a <subject/> child element.
wait_for_subject(User) ->
    Stanza = escalus:wait_for_stanza(User),
    escalus:assert(is_groupchat_message, Stanza),
    ?assertNotEqual(undefined, exml_query:subelement(Stanza, <<"subject">>)),
    ok.

%% Sends groupchat messages "test-1" .. "test-N" from User to the room.
send_n_muc_messages(User, RoomAddr, N) ->
    Bodies = [<<"test-", (integer_to_binary(I))/binary>> || I <- lists:seq(1, N)],
    lists:foreach(fun(Body) ->
                          escalus:send(User, escalus_stanza:groupchat_to(RoomAddr, Body))
                  end, Bodies).

%% Expects User to receive groupchat messages "test-1" .. "test-N", in order.
receive_n_muc_messages(User, N) ->
    Bodies = [<<"test-", (integer_to_binary(J))/binary>> || J <- lists:seq(1, N)],
    lists:foreach(fun(Body) ->
                          Stanza = escalus:wait_for_stanza(User),
                          escalus:assert(is_groupchat_message, [Body], Stanza)
                  end, Bodies).

%% A single connected component exchanges a chat message and a reply with
%% alice and then with eve, each in a separate fresh story.
test_component_on_one_host(Config) ->
    ComponentConfig = [{server, <<"localhost">>},
                       {host, <<"localhost">>},
                       {password, <<"secret">>},
                       {port, service_port()},
                       {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),

    Story = fun(User) ->
                    %% When the user writes to the component
                    ToComponent = escalus_stanza:chat_to(Addr, <<"Hi2!">>),
                    escalus:send(User, ToComponent),
                    %% Then component receives it
                    AtComponent = escalus:wait_for_stanza(Comp),
                    escalus:assert(is_chat_message, [<<"Hi2!">>], AtComponent),

                    %% When the component sends a reply
                    Reply = escalus_stanza:chat_to(User, <<"Oh hi!">>),
                    escalus:send(Comp, escalus_stanza:from(Reply, Addr)),

                    %% Then the user receives it, coming from the component
                    AtUser = escalus:wait_for_stanza(User),
                    escalus:assert(is_chat_message, [<<"Oh hi!">>], AtUser),
                    escalus:assert(is_stanza_from, [Addr], AtUser)
            end,

    [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].

%% Ensures that 2 components in distinct data centers can communicate.
test_components_in_different_regions(_Config) ->
    ComponentCommonConfig = [{host, <<"localhost">>}, {password, <<"secret">>},
                             {server, <<"localhost">>}, {component, <<"test_service">>}],
    MimPort = ct:get_config({hosts, mim, service_port}),
    RegPort = ct:get_config({hosts, reg, service_port}),
    %% The per-component entries are prepended to the common settings
    MimComponentConfig = [{port, MimPort}, {component, <<"service1">>} | ComponentCommonConfig],
    RegComponentConfig = [{port, RegPort}, {component, <<"service2">>} | ComponentCommonConfig],

    {Comp1, Addr1, _Name1} = component_helper:connect_component(MimComponentConfig),
    {Comp2, Addr2, _Name2} = component_helper:connect_component(RegComponentConfig),

    %% service1 -> service2
    FromFirst = escalus_stanza:from(escalus_stanza:chat_to(Addr2, <<"Hi from 1!">>), Addr1),
    escalus:send(Comp1, FromFirst),
    AtSecond = escalus:wait_for_stanza(Comp2),
    escalus:assert(is_chat_message, [<<"Hi from 1!">>], AtSecond),

    %% service2 -> service1
    FromSecond = escalus_stanza:from(escalus_stanza:chat_to(Addr1, <<"Hi from 2!">>), Addr2),
    escalus:send(Comp2, FromSecond),
    AtFirst = escalus:wait_for_stanza(Comp1),
    escalus:assert(is_chat_message, [<<"Hi from 2!">>], AtFirst).

%% Ordinary user is not able to discover the hidden component from GD
test_hidden_component_disco_in_different_region(Config) ->
    %% Hidden component from component_SUITE connects to mim1/europe_node1
    HiddenComponentConfig = component_helper:spec(hidden_component, Config),
    {_HiddenComp, HiddenAddr, _} = component_helper:connect_component(HiddenComponentConfig),

    IsHiddenAbsent = fun(Stanza) ->
                             not escalus_pred:has_service(HiddenAddr, Stanza)
                     end,
    Story = fun(Eve) ->
                    EveServer = escalus_client:server(Eve),
                    escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
                    DiscoReply = escalus:wait_for_stanza(Eve),
                    escalus:assert(is_iq_result, DiscoReply),
                    escalus:assert(IsHiddenAbsent, DiscoReply)
            end,
    escalus:fresh_story(Config, [{eve, 1}], Story).

%% A message sent to a component that has already disconnected must come back
%% as a cancel/service-unavailable error, for both alice and eve.
test_component_disconnect(Config) ->
    ComponentConfig = [{server, <<"localhost">>},
                       {host, <<"localhost">>},
                       {password, <<"secret">>},
                       {port, service_port()},
                       {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
    component_helper:disconnect_component(Comp, Addr),

    ExpectBounce = fun(User) ->
                           escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
                           Error = escalus:wait_for_stanza(User, 5000),
                           escalus:assert(is_error,
                                          [<<"cancel">>, <<"service-unavailable">>], Error)
                   end,

    [escalus:fresh_story(Config, [{User, 1}], ExpectBounce) || User <- [alice, eve]].

%% After mongooseim and ranch are stopped on asia_node, a message from Alice to
%% Eve must bounce with service-unavailable. Both applications are started
%% again afterwards, whatever the outcome of the story.
test_location_disconnect(Config) ->
    Story =
        fun(Alice, Eve) ->
                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),

                escalus_client:wait_for_stanza(Eve),

                ok = rpc(asia_node, application, stop, [mongooseim]),
                %% TODO: Stopping mongooseim alone should probably stop connections too
                ok = rpc(asia_node, application, stop, [ranch]),

                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
                Error = escalus:wait_for_stanza(Alice),
                escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
        end,
    try
        escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story)
    after
        rpc(asia_node, application, start, [ranch]),
        rpc(asia_node, application, start, [mongooseim])
    end.

%% Eve's connection is stopped before Alice writes to her, so Alice must get
%% a service-unavailable bounce (awaited for up to 15 seconds).
test_pm_with_disconnection_on_other_server(Config) ->
    Story =
        fun(Alice, Eve) ->
                escalus_connection:stop(Eve),
                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
                Bounce = escalus_client:wait_for_stanza(Alice, 15000),
                escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Bounce)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% Eve connects with her reg-cluster spec, logs out and reconnects with her
%% mim-cluster spec (both prepared by init_user_eve/1). Messages sent by Alice
%% around the reconnection must all reach the new Eve session.
test_pm_with_graceful_reconnection_to_different_server(Config) ->
    RegEveSpec = ?config(evespec_reg, Config),
    MimEveSpec = ?config(evespec_mim, Config),
    Story =
        fun(Alice) ->
                Eve = connect_from_spec(RegEveSpec, Config),

                escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi from Asia!">>)),

                %% Stop connection and wait for process to die
                EveNode = ct:get_config({hosts, reg, node}),
                mongoose_helper:logout_user(Config, Eve, #{node => EveNode}),

                FromEve = escalus_client:wait_for_stanza(Alice),

                %% Pause Alice until Eve is reconnected
                AliceNode = ct:get_config({hosts, mim, node}),
                AliceC2sPid = mongoose_helper:get_session_pid(Alice, #{node => AliceNode}),
                %% NOTE(review): the pid is looked up on the mim node but suspended and
                %% resumed through asia_node - confirm this indirection is intended.
                ok = rpc(asia_node, sys, suspend, [AliceC2sPid]),

                escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi from Europe1!">>)),

                NewEve = connect_from_spec(MimEveSpec, Config),

                ok = rpc(asia_node, sys, resume, [AliceC2sPid]),

                escalus_client:send(Alice,
                                    chat_with_seqnum(Eve, <<"Hi again from Europe1!">>)),
                escalus_client:send(NewEve,
                                    escalus_stanza:chat_to(Alice, <<"Hi again from Asia!">>)),

                FirstAtNewEve = escalus_client:wait_for_stanza(NewEve),
                AgainFromEve = escalus_client:wait_for_stanza(Alice),
                SecondAtNewEve = escalus_client:wait_for_stanza(NewEve),

                %% Arrival order at NewEve is not asserted; order by sequence number instead
                [FromAlice, AgainFromAlice] = order_by_seqnum([FirstAtNewEve, SecondAtNewEve]),

                escalus:assert(is_chat_message, [<<"Hi from Europe1!">>], FromAlice),
                escalus:assert(is_chat_message, [<<"Hi from Asia!">>], FromEve),
                escalus:assert(is_chat_message, [<<"Hi again from Europe1!">>], AgainFromAlice),
                escalus:assert(is_chat_message, [<<"Hi again from Asia!">>], AgainFromEve)
        end,
    escalus:fresh_story(Config, [{alice, 1}], Story).

%% Refresh logic can cause two possible behaviours.
%% We test both behaviours here (plus no refresh case)
%% See PR #2392
test_pm_with_ungraceful_reconnection_to_different_server(Config) ->
    %% No refresh
    BeforeResume = fun() -> ok end,
    AfterCheck = ungraceful_reconnection_check([<<"Hi from Europe1!">>,
                                                <<"Hi again from Europe1!">>]),
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).

test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first(Config) ->
    %% Same as no refresh
    RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first",
    %% Order of nodes is important here in refresh_hosts!
    BeforeResume = fun() -> refresh_hosts([asia_node, europe_node1], RefreshReason) end,
    AfterCheck = ungraceful_reconnection_check([<<"Hi from Europe1!">>,
                                                <<"Hi again from Europe1!">>]),
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).

test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first(Config) ->
    %% Asia node overrides Europe value with the older ones,
    %% so we lose some messages during rerouting :(
    RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first",
    BeforeResume = fun() -> refresh_hosts([europe_node1, asia_node], RefreshReason) end,
    AfterCheck = ungraceful_reconnection_check([<<"Hi again from Europe1!">>]),
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).

%% Builds the AfterCheck fun shared by the ungraceful reconnection cases:
%% NewEve must receive ExpectedByNewEve and Alice the greeting sent by NewEve.
ungraceful_reconnection_check(ExpectedByNewEve) ->
    fun(Alice, NewEve) ->
            user_receives(NewEve, ExpectedByNewEve),
            user_receives(Alice, [<<"Hi from Europe!">>])
    end.

%% Reconnect Eve from asia (reg cluster) to europe (mim)
%%
%% BeforeResume is a fun/0 run just before Eve's old, suspended c2s process is
%% resumed; AfterCheck is a fun(Alice, NewEve) run at the very end to verify
%% which messages arrived.
do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResume, AfterCheck) ->
    Config = escalus_users:update_userspec(Config0, eve, stream_management, true),
    RegEveSpec = ?config(evespec_reg, Config),
    MimEveSpec = ?config(evespec_mim, Config),
    Story =
        fun(Alice) ->
                {ok, Eve, _} = escalus_connection:start(RegEveSpec, connect_steps_with_sm()),
                escalus_story:send_initial_presence(Eve),
                escalus_client:wait_for_stanza(Eve),

                %% Stop connection and wait for process to die
                EveNode = ct:get_config({hosts, reg, node}),
                OldC2sPid = mongoose_helper:get_session_pid(Eve, #{node => EveNode}),
                ok = rpc(asia_node, sys, suspend, [OldC2sPid]),

                escalus_client:send(Alice,
                                    chat_with_seqnum(bare_client(Eve), <<"Hi from Europe1!">>)),

                %% Wait for route message to be queued in c2s message queue
                mongoose_helper:wait_for_route_message_count(OldC2sPid, 1),

                %% Time to do bad nasty things with our socket, so once our process wakes up,
                %% it SHOULD detect a dead socket
                escalus_connection:kill(Eve),

                %% Connect another one, we hope the message would be rerouted
                NewEve = connect_from_spec(MimEveSpec, Config),

                BeforeResume(),

                %% Trigger rerouting
                ok = rpc(asia_node, sys, resume, [OldC2sPid]),

                %% Let the old c2s process handle the message, reroute it and terminate
                mongoose_helper:wait_for_pid_to_die(OldC2sPid),

                escalus_client:send(Alice,
                                    chat_with_seqnum(bare_client(Eve),
                                                     <<"Hi again from Europe1!">>)),
                escalus_client:send(NewEve,
                                    escalus_stanza:chat_to(Alice, <<"Hi from Europe!">>)),

                AfterCheck(Alice, NewEve)
        end,
    escalus:fresh_story(Config, [{alice, 1}], Story).
%% Both users run service discovery against their own server;
%% the MUC host has to be listed in Eve's result.
test_global_disco(Config) ->
    DiscoOwnServer =
        fun(Client) ->
                Server = escalus_client:server(Client),
                escalus:send(Client, escalus_stanza:service_discovery(Server)),
                escalus:wait_for_stanza(Client)
        end,
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              _AliceResult = DiscoOwnServer(Alice),
              %% TODO: test for duplicate components
              %%escalus:assert(fun has_exactly_one_service/2, [muc_helper:muc_host()], AliceStanza),

              EveResult = DiscoOwnServer(Eve),
              escalus:assert(has_service, [muc_helper:muc_host()], EveResult)
      end).

%% The domain mapping of a component is present while the component is connected
%% and is gone (for_domain returns `error') once it is disconnected.
test_component_unregister(_Config) ->
    Spec = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
            {port, service_port()}, {component, <<"test_service">>}],
    LookUp = fun() ->
                     rpc(europe_node1, mod_global_distrib_mapping, for_domain,
                         [<<"test_service.localhost">>])
             end,

    {Component, Address, _Name} = component_helper:connect_component(Spec),
    ?assertMatch({ok, _}, LookUp()),

    component_helper:disconnect_component(Component, Address),
    ?assertEqual(error, LookUp()).

%% Starting mod_global_distrib with an unknown local_host has to fail
%% with a check_host_failed error reported through the RPC.
test_error_on_wrong_hosts(_Config) ->
    BadOpts = module_opts(#{local_host => <<"no_such_host">>}),
    Modules = [{mod_global_distrib, BadOpts}],
    ?assertException(
       error,
       {badrpc, {'EXIT', {#{what := check_host_failed, domain := <<"no_such_host">>}, _}}},
       dynamic_modules:ensure_modules(node_spec(europe_node1), <<"localhost">>, Modules)).

%% A node entry stored with value <<"0">> has to disappear from the nodes hash
%% after a mapping refresh.
refresh_nodes(Config) ->
    HashKey = ?config(nodes_key, Config),
    ExpiredNode = ?config(node_to_expire, Config),
    redis_query(europe_node1, [<<"HSET">>, HashKey, ExpiredNode, <<"0">>]),
    refresh_mappings(europe_node1, "by_refresh_nodes"),
    {ok, undefined} = redis_query(europe_node1, [<<"HGET">>, HashKey, ExpiredNode]).
%% Alice sends 100 numbered messages; Eve must receive them in the sending order.
test_in_order_messages_on_multiple_connections(Config) ->
    Story =
        fun(Alice, Eve) ->
                Bodies = [integer_to_binary(N) || N <- lists:seq(1, 100)],
                lists:foreach(
                  fun(Body) ->
                          escalus_client:send(Alice, escalus_stanza:chat_to(Eve, Body))
                  end,
                  Bodies),
                lists:foreach(
                  fun(Body) ->
                          Received = escalus_client:wait_for_stanza(Eve, 5000),
                          escalus:assert(is_chat_message, [Body], Received)
                  end,
                  Bodies)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% Same ordering check, but Eve's mapping is toggled while the messages are being sent.
test_in_order_messages_on_multiple_connections_with_bounce(Config) ->
    Story =
        fun(Alice, Eve) ->
                %% Send 99 messages, some while server knows the mapping and some when it doesn't
                send_steps(Alice, Eve, 99, <<"reg1">>),
                %% Make sure that the last message is sent when the mapping is known
                set_mapping(europe_node1, Eve, <<"reg1">>),
                LastMessage = escalus_stanza:chat_to(Eve, <<"100">>),
                escalus_client:send(Alice, LastMessage),

                %% Check that all stanzas were received in order
                Expected = [integer_to_binary(N) || N <- lists:seq(1, 100)],
                lists:foreach(
                  fun(Body) ->
                          Received = escalus_client:wait_for_stanza(Eve, 5000),
                          escalus:assert(is_chat_message, [Body], Received)
                  end,
                  Expected)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).
%% All messages are sent while Eve's mapping is missing; after the mapping is
%% restored they must arrive in the sending order.
test_messages_bounced_in_order(Config) ->
    Story =
        fun(Alice, Eve) ->
                %% Make sure all messages land in bounce storage
                delete_mapping(europe_node1, Eve),

                Bodies = [integer_to_binary(N) || N <- lists:seq(1, 100)],
                lists:foreach(
                  fun(Body) ->
                          escalus_client:send(Alice, escalus_stanza:chat_to(Eve, Body))
                  end,
                  Bodies),

                %% Restore the mapping so that bounce eventually succeeds
                ?assertEqual(undefined, get_mapping(europe_node1, Eve)),
                set_mapping(europe_node1, Eve, <<"reg1">>),

                lists:foreach(
                  fun(Body) ->
                          Received = escalus_client:wait_for_stanza(Eve, 5000),
                          escalus:assert(is_chat_message, [Body], Received)
                  end,
                  Bodies)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% After Alice's mapping is deleted, a message sent by her has to bring
%% the mapping seen by asia_node back to <<"localhost.bis">>.
test_update_senders_host(Config) ->
    Story =
        fun(Alice, Eve) ->
                AliceJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Alice)]),
                MappingOnAsia =
                    fun() -> rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid]) end,

                {ok, <<"localhost.bis">>} = MappingOnAsia(),
                ok = rpc(europe_node1, mod_global_distrib_mapping, delete_for_jid, [AliceJid]),
                wait_for_node(asia_node, AliceJid),

                %% TODO: Should prevent Redis refresher from executing for a moment,
                %%       as it may collide with this test.

                Message = escalus_stanza:chat_to(Eve, <<"test_update_senders_host">>),
                escalus:send(Alice, Message),
                escalus:wait_for_stanza(Eve),

                {ok, <<"localhost.bis">>} = MappingOnAsia()
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% Polls Node once per second, for up to 10 seconds, until the mapping lookup
%% for Jid returns `error'.
wait_for_node(Node, Jid) ->
    LookUp = fun() -> rpc(Node, mod_global_distrib_mapping, for_jid, [Jid]) end,
    WaitOpts = #{name => rpc,
                 sleep_time => timer:seconds(1),
                 time_left => timer:seconds(10)},
    mongoose_helper:wait_until(LookUp, error, WaitOpts).
%% Checks that Eve's mapping, once deleted, is visible again as <<"reg1">> on both
%% europe nodes after she sends a message to a component connected to europe_node1.
%% NOTE(review): Comp is never disconnected inside this function - confirm that
%% the cleanup happens elsewhere (e.g. in end_per_testcase).
test_update_senders_host_by_ejd_service(Config) ->
    refresh_hosts([europe_node1, europe_node2, asia_node], "by_test_update_senders_host_by_ejd_service"),
    %% Connects to europe_node1
    ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
                       {port, service_port()}, {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),

    escalus:fresh_story(
      Config, [{eve, 1}],
      fun(Eve) ->
              %% Eve is connected to asia_node
              EveJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Eve)]),
              %% Precondition: both europe nodes map Eve to reg1
              {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
              {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid]),

              %% Drop the mapping and wait until both europe nodes report `error' for it
              ok = rpc(asia_node, mod_global_distrib_mapping, delete_for_jid, [EveJid]),
              wait_for_node(europe_node1, EveJid),
              wait_for_node(europe_node2, EveJid),

              %% Component is connected to europe_node1
              %% but we force asia_node to connect to europe_node2 by hiding europe_node1
              %% and forcing rebalance (effectively disabling connections to europe_node1)
              %% to verify routing cache update on both nodes

              %% TODO: Should prevent Redis refresher from executing for a moment,
              %%       as it may collide with this test.

              hide_node(europe_node1, Config),
              {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
              trigger_rebalance(asia_node, EuropeHost),

              escalus:send(Eve, escalus_stanza:chat_to(Addr, <<"hi">>)),
              escalus:wait_for_stanza(Comp),

              %% The mapping has to be known again on both europe nodes
              {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
              {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid])
      end).
%% -------------------------------- Rebalancing --------------------------------

%% Enabling an extra endpoint on asia_node has to add exactly one enabled endpoint
%% and exactly one pool on europe_node1, without removing anything.
enable_new_endpoint_on_refresh(Config) ->
    get_connection(europe_node1, <<"reg1">>),

    {EnabledBefore, _, PoolsBefore} = get_outgoing_connections(europe_node1, <<"reg1">>),

    Port = get_port(reg, gd_extra_endpoint_port),
    Added = resolved_endpoint(Port),
    enable_extra_endpoint(asia_node, europe_node1, Port, Config),

    {EnabledAfter, _, PoolsAfter} = get_outgoing_connections(europe_node1, <<"reg1">>),

    %% One new pool ...
    [Added] = PoolsAfter -- PoolsBefore,
    [] = PoolsBefore -- PoolsAfter,
    %% ... and one new enabled endpoint
    [Added] = EnabledAfter -- EnabledBefore,
    [] = EnabledBefore -- EnabledAfter.

%% Hiding the extra endpoint moves it from the enabled to the disabled list,
%% while the set of pools stays the same.
disable_endpoint_on_refresh(Config) ->
    Port = get_port(reg, gd_extra_endpoint_port),
    Extra = resolved_endpoint(Port),
    enable_extra_endpoint(asia_node, europe_node1, Port, Config),

    get_connection(europe_node1, <<"reg1">>),

    {EnabledBefore, DisabledBefore, PoolsBefore} =
        get_outgoing_connections(europe_node1, <<"reg1">>),
    %% Two endpoints enabled, none disabled
    [_, _] = EnabledBefore,
    [] = DisabledBefore,

    hide_extra_endpoint(asia_node),
    trigger_rebalance(europe_node1, <<"reg1">>),

    {EnabledAfter, DisabledAfter, PoolsAfter} =
        get_outgoing_connections(europe_node1, <<"reg1">>),

    %% 2 pools open even after disable
    [] = PoolsBefore -- PoolsAfter,
    [] = PoolsAfter -- PoolsBefore,
    %% The extra endpoint is no longer enabled
    [] = EnabledAfter -- EnabledBefore,
    [Extra] = EnabledBefore -- EnabledAfter,
    %% The extra endpoint is now disabled
    [] = DisabledBefore,
    [Extra] = DisabledAfter.
%% A connection request issued while asia_node advertises no endpoints must not be
%% answered within 2 seconds; once the endpoints are back, a pid has to arrive.
wait_for_connection(_Config) ->
    set_endpoints(asia_node, []),
    %% Because of hosts refresher, a pool of connections to asia_node
    %% may already be present here
    RebalanceSucceeds =
        fun() ->
                try trigger_rebalance(europe_node1, <<"reg1">>) of
                    _ -> true
                catch
                    _:_ -> false
                end
        end,
    mongoose_helper:wait_until(RebalanceSucceeds, true,
                               #{time_left => timer:seconds(5), name => rebalance}),

    spawn_connection_getter(europe_node1),

    %% Nothing is expected in the mailbox yet
    receive
        TooEarly -> error({unexpected, TooEarly})
    after
        2000 -> ok
    end,

    refresh_mappings(asia_node, "by_wait_for_connection"),
    trigger_rebalance(europe_node1, <<"reg1">>),

    %% Now the getter has to deliver a connection pid
    receive
        Pid when is_pid(Pid) -> ok;
        NotAPid -> error({unexpected, NotAPid})
    after
        5000 -> error(timeout)
    end.

%% After the receiver is restarted on another port, the disabled endpoint and its
%% pool have to be removed from europe_node1.
closed_connection_is_removed_from_disabled(_Config) ->
    get_connection(europe_node1, <<"reg1">>),
    set_endpoints(asia_node, []),
    trigger_rebalance(europe_node1, <<"reg1">>),

    %% Nothing enabled, one disabled endpoint, one pool
    {[], [_], [_]} = get_outgoing_connections(europe_node1, <<"reg1">>),

    % Will drop connections and prevent them from reconnecting
    SupplementaryPort = get_port(reg, gd_supplementary_endpoint_port),
    restart_receiver(asia_node, [SupplementaryPort]),

    CurrentState = fun() -> get_outgoing_connections(europe_node1, <<"reg1">>) end,
    mongoose_helper:wait_until(CurrentState, {[], [], []},
                               #{name => get_outgoing_connections}).


%%--------------------------------------------------------------------
%% Test helpers
%%--------------------------------------------------------------------

%% Reads an integer port from the CT config entry {hosts, Host, Param};
%% fails the test case when the value is not an integer.
get_port(Host, Param) ->
    Value = ct:get_config({hosts, Host, Param}),
    case is_integer(Value) of
        true ->
            Value;
        false ->
            ct:fail({get_port_failed, Host, Param, Value})
    end.
%% Nodes used by this suite as {NodeName, LocalHost, ReceiverPort} tuples.
%% The ports are read from the CT config with get_port/2.
get_hosts() ->
    [
     {europe_node1, <<"localhost.bis">>, get_port(mim, gd_endpoint_port)},
     {europe_node2, <<"localhost.bis">>, get_port(mim2, gd_endpoint_port)},
     {asia_node, <<"reg1">>, get_port(reg, gd_endpoint_port)}
    ].

%% Endpoint (string address form) on which NodeName listens.
listen_endpoint(NodeName) ->
    endpoint(listen_port(NodeName)).

%% Receiver port of NodeName, taken from get_hosts/0.
%% Crashes with badmatch for a node name that is not listed there.
listen_port(NodeName) ->
    {_, _, Port} = lists:keyfind(NodeName, 1, get_hosts()),
    Port.

%% Endpoint with the address given as an IPv4 tuple.
resolved_endpoint(Port) when is_integer(Port) ->
    {{127, 0, 0, 1}, Port}.

%% Endpoint with the address given as a string.
endpoint(Port) when is_integer(Port) ->
    {"127.0.0.1", Port}.

%% For dynamic_modules
node_spec(NodeName) ->
    #{node => ct:get_config(NodeName), timeout => timer:seconds(30)}.

%% Calls M:F(A) on the node configured under NodeName, with a 30 second timeout.
rpc(NodeName, M, F, A) ->
    Node = ct:get_config(NodeName),
    mongoose_helper:successful_rpc(#{node => Node}, M, F, A, timer:seconds(30)).

%% Removes NodeName from the Redis hash stored under the nodes_key config entry.
%% The query is always executed through europe_node1 and exactly one field
%% has to be deleted.
hide_node(NodeName, Config) ->
    NodesKey = ?config(nodes_key, Config),
    NodeBin = atom_to_binary(ct:get_config(NodeName), latin1),
    {ok, <<"1">>} = redis_query(europe_node1, [<<"HDEL">>, NodesKey, NodeBin]).

%% Starts a client for UserSpec with resource <<"res1">>, sends the initial presence
%% and waits for a presence stanza.
connect_from_spec(UserSpec, Config) ->
    {ok, User} = escalus_client:start(Config, UserSpec, <<"res1">>),
    escalus_story:send_initial_presence(User),
    escalus:assert(is_presence, escalus_client:wait_for_stanza(User)),
    User.

%% Chat stanza whose id is the current erlang:monotonic_time/0 value printed
%% as a decimal integer, so that order_by_seqnum/1 can restore the sending order.
chat_with_seqnum(To, Text) ->
    escalus_stanza:set_id(escalus_stanza:chat_to(To, Text),
                          integer_to_binary(erlang:monotonic_time())).

%% Sorts stanzas produced with chat_with_seqnum/2 from the oldest to the newest.
%% The ids are compared as integers: a comparison of the decimal strings gives
%% a wrong order as soon as the sign or the number of digits differs.
%% The ordering fun uses =< as required by lists:sort/2.
%% Crashes with badarg when a stanza has no integer id.
order_by_seqnum(Stanzas) ->
    SeqNum = fun(Stanza) -> binary_to_integer(exml_query:attr(Stanza, <<"id">>)) end,
    lists:sort(fun(A, B) -> SeqNum(A) =< SeqNum(B) end, Stanzas).

%% True when exactly one item of the single child element of the stanza
%% has its jid attribute equal to Service.
has_exactly_one_service(Service, #xmlel{children = [#xmlel{children = Services}]}) ->
    Pred = fun(Item) ->
                   exml_query:attr(Item, <<"jid">>) =:= Service
           end,
    case lists:filter(Pred, Services) of
        [_] -> true;
        _ -> false
    end.
%% Sends messages numbered 1..Max from From to To. After every (Max div 10) messages
%% the mapping of To on europe_node1 is toggled: deleted when known, set to ToHost otherwise.
send_steps(From, To, Max, ToHost) ->
    Period = Max div 10,
    next_send_step(From, To, 1, Max, Period, true, ToHost).

%% Arguments: sender, recipient, next message number, last message number,
%% messages left until the next toggle, whether the mapping is currently known,
%% and the host used to restore the mapping.
next_send_step(_From, _To, Next, Last, _Left, _Known, _ToHost) when Next > Last ->
    ok;
next_send_step(From, To, Next, Last, 0, Known, ToHost) ->
    ct:log("Reset: I: ~B", [Next]),
    case Known of
        true -> delete_mapping(europe_node1, To);
        false -> set_mapping(europe_node1, To, ToHost)
    end,
    Period = Last div 10,
    next_send_step(From, To, Next, Last, Period, not Known, ToHost);
next_send_step(From, To, Next, Last, Left, Known, ToHost) ->
    ct:log("I: ~B ~B ~B", [Next, Last, Left]),
    Message = escalus_stanza:chat_to(To, integer_to_binary(Next)),
    escalus_client:send(From, Message),
    next_send_step(From, To, Next + 1, Last, Left - 1, Known, ToHost).

%% Value stored in Redis under the full JID of Client.
get_mapping(Node, Client) ->
    {Full, _Bare} = jids(Client),
    {ok, Value} = redis_query(Node, [<<"GET">>, Full]),
    Value.

%% Warning! May not work properly with alice or any other user whose
%% stringprepped JID is different than original one
delete_mapping(Node, Client) ->
    {Full, Bare} = jids(Client),
    redis_query(Node, [<<"DEL">>, Full, Bare]),
    %% Clear the cache entry of the JID on Node as well
    ParsedJid = rpc(Node, jid, from_binary, [Full]),
    rpc(Node, mod_global_distrib_mapping, clear_cache, [ParsedJid]).

%% Stores Mapping for both the full and the bare JID of Client,
%% then clears the cache entry of the JID on Node.
set_mapping(Node, Client, Mapping) ->
    {Full, Bare} = jids(Client),
    redis_query(Node, [<<"MSET">>, Full, Mapping, Bare, Mapping]),
    ParsedJid = rpc(Node, jid, from_binary, [Full]),
    rpc(Node, mod_global_distrib_mapping, clear_cache, [ParsedJid]).

%% {FullJid, BareJid} of an escalus client.
jids(Client) ->
    {escalus_client:full_jid(Client), escalus_client:short_jid(Client)}.
%% Runs a Redis query on Node with a worker taken from the global_distrib pool.
redis_query(Node, Query) ->
    PoolRef = [redis, global, global_distrib],
    {ok, Worker} = rpc(Node, mongoose_wpool, get_worker, PoolRef),
    rpc(Node, eredis, q, [Worker, Query]).

%% A fake address we don't try to connect to.
%% Used in test_advertised_endpoints_override_endpoints testcase.
advertised_endpoints() ->
    [{fake_domain(), get_port(reg, gd_endpoint_port)}].

fake_domain() ->
    "somefakedomain.com".

%% Replaces tuple addresses in {Addr, Port} pairs with their string form;
%% every other element is passed through untouched.
iptuples_to_string(Endpoints) ->
    Convert = fun({Addr, Port}) when is_tuple(Addr) -> {inet_parse:ntoa(Addr), Port};
                 (Other) -> Other
              end,
    lists:map(Convert, Endpoints).

%% Connection options for the receiver listening on ReceiverPort. The advertised
%% endpoints default to the real endpoints unless add_advertised_endpoints
%% in Config holds an entry for NodeName.
endpoint_opts(NodeName, ReceiverPort, Config) ->
    Real = [endpoint(ReceiverPort)],
    Overrides = ?config(add_advertised_endpoints, Config),
    Advertised = proplists:get_value(NodeName, Overrides, Real),
    #{endpoints => Real,
      resolved_endpoints => [resolved_endpoint(ReceiverPort)],
      advertised_endpoints => Advertised}.

%% Calls mock_inet/0 of this module on every node from get_hosts/0;
%% each call has to return ok.
mock_inet_on_each_node() ->
    Nodes = [ct:get_config(NodeName) || {NodeName, _, _} <- get_hosts()],
    Results = [rpc:block_call(Node, ?MODULE, mock_inet, []) || Node <- Nodes],
    true = lists:all(fun(Result) -> Result =:= ok end, Results).

%% Calls M:F(A) on every node from get_hosts/0 and returns the list of results.
execute_on_each_node(M, F, A) ->
    [rpc(NodeName, M, F, A) || {NodeName, _, _} <- get_hosts()].

mock_inet() ->
    %% We don't want to mock inet module itself to avoid strange networking issues
    meck:new(mod_global_distrib_utils, [non_strict, passthrough, unstick]),
    Getaddrs = fun(_, inet) -> {ok, [{127, 0, 0, 1}]};
                  (_, inet6) -> {error, "No ipv6 address"}
               end,
    meck:expect(mod_global_distrib_utils, getaddrs, Getaddrs).

%% Unloads the mod_global_distrib_utils mock on every node.
unmock_inet(_Pids) ->
    execute_on_each_node(meck, unload, [mod_global_distrib_utils]).
%% Children of mod_global_distrib_outgoing_conns_sup on Node,
%% without the hosts refresher.
out_connection_sups(Node) ->
    NotRefresher = fun({Id, _, _, _}) -> Id =/= mod_global_distrib_hosts_refresher end,
    AllChildren = rpc(Node, supervisor, which_children,
                      [mod_global_distrib_outgoing_conns_sup]),
    [Child || Child <- AllChildren, NotRefresher(Child)].

%% True when each of the three nodes has at least one outgoing connection supervisor.
trees_for_connections_present() ->
    PerNode = [out_connection_sups(asia_node),
               out_connection_sups(europe_node1),
               out_connection_sups(europe_node2)],
    lists:all(fun(Sups) -> length(Sups) > 0 end, PerNode).

%% True when Node has an outgoing connection supervisor with id ExpectedSup.
tree_for_sup_present(Node, ExpectedSup) ->
    lists:keymember(ExpectedSup, 1, out_connection_sups(Node)).


%% ------------------------------- rebalancing helpers -----------------------------------

%% Spawns a process that asks SenderNode for a connection to <<"reg1">>
%% and sends the result, as it is, to the calling process.
spawn_connection_getter(SenderNode) ->
    Parent = self(),
    Getter = fun() -> Parent ! get_connection(SenderNode, <<"reg1">>) end,
    spawn(Getter).

%% Makes ListenNode listen on Port in addition to its regular port, advertises
%% both ports and triggers a rebalance on SenderNode.
enable_extra_endpoint(ListenNode, SenderNode, Port, _Config) ->
    restart_receiver(ListenNode, [Port, listen_port(ListenNode)]),
    set_endpoints(ListenNode, [Port, listen_port(ListenNode)]),
    trigger_rebalance(SenderNode, <<"reg1">>).

get_connection(SenderNode, ToDomain) ->
    rpc(SenderNode, mod_global_distrib_outgoing_conns_sup, get_connection, [ToDomain]).

%% Advertises only the regular listen port of ListenNode.
hide_extra_endpoint(ListenNode) ->
    RegularPort = listen_port(ListenNode),
    set_endpoints(ListenNode, [RegularPort]).

%% Stores the endpoints built from Ports for ListenNode.
set_endpoints(ListenNode, Ports) ->
    Endpoints = lists:map(fun endpoint/1, Ports),
    {ok, _} = rpc(ListenNode, mod_global_distrib_mapping_redis, set_endpoints, [Endpoints]).
%% Returns {EnabledEndpoints, DisabledEndpoints, PoolIds} of NodeName for
%% DestinationDomain. PoolIds are the ids of the children of the server supervisor,
%% except for the manager.
get_outgoing_connections(NodeName, DestinationDomain) ->
    Args = [DestinationDomain],
    SupName = rpc(NodeName, mod_global_distrib_utils, server_to_sup_name, Args),
    MgrName = rpc(NodeName, mod_global_distrib_utils, server_to_mgr_name, Args),
    Enabled = rpc(NodeName, mod_global_distrib_server_mgr, get_enabled_endpoints, Args),
    Disabled = rpc(NodeName, mod_global_distrib_server_mgr, get_disabled_endpoints, Args),
    Children = rpc(NodeName, supervisor, which_children, [SupName]),
    Pools = [Id || {Id, _Child, _Type, _Modules} <- Children, Id /= MgrName],
    {Enabled, Disabled, Pools}.

%% Restarts the receiver of NodeName on its regular listen port.
restart_receiver(NodeName) ->
    restart_receiver(NodeName, [listen_port(NodeName)]).

%% Restarts mod_global_distrib_receiver on NodeName with the current options,
%% in which only the endpoints are replaced with the ones built from NewPorts.
restart_receiver(NodeName, NewPorts) ->
    CurrentOpts = rpc(NodeName, gen_mod, get_module_opts,
                      [<<"localhost">>, mod_global_distrib_receiver]),
    #{connections := CurrentConnOpts} = CurrentOpts,
    Endpoints = [endpoint(Port) || Port <- NewPorts],
    Resolved = [resolved_endpoint(Port) || Port <- NewPorts],
    UpdatedConnOpts = CurrentConnOpts#{endpoints := Endpoints,
                                       resolved_endpoints := Resolved},
    UpdatedOpts = CurrentOpts#{connections := UpdatedConnOpts},
    dynamic_modules:restart(node_spec(NodeName), <<"localhost">>,
                            mod_global_distrib_receiver, UpdatedOpts).

%% Forces the server manager of DestinationDomain on NodeName to refresh,
%% logs the manager state and sleeps for one second.
trigger_rebalance(NodeName, DestinationDomain) when is_binary(DestinationDomain) ->
    Args = [DestinationDomain],
    %% To ensure that the manager exists,
    %% otherwise we can get noproc error in the force_refresh call
    ok = rpc(NodeName, mod_global_distrib_outgoing_conns_sup, ensure_server_started, Args),
    rpc(NodeName, mod_global_distrib_server_mgr, force_refresh, Args),
    MgrState = rpc(NodeName, mod_global_distrib_server_mgr, get_state_info, Args),
    ct:log("mgr_state_info_after_rebalance nodename=~p state_info=~p", [NodeName, MgrState]),
    timer:sleep(1000).
%% -----------------------------------------------------------------------
%% Escalus-related helpers

%% Receive messages with Bodies in any order, skipping presences from stream resumption
user_receives(User, Bodies) ->
    SkipPresences = fun(Stanza) -> not escalus_pred:is_presence(Stanza) end,
    ReceiveOpts = #{pred => SkipPresences},
    %% One check per expected body
    Checks = lists:map(
               fun(Body) ->
                       fun(Stanza) -> escalus_pred:is_chat_message(Body, Stanza) end
               end,
               Bodies),
    %% One received stanza per expected body
    Stanzas = [escalus_connection:receive_stanza(User, ReceiveOpts) || _ <- Bodies],
    escalus:assert_many(Checks, Stanzas).

%% -----------------------------------------------------------------------
%% Refreshing helpers

%% Reason is a string
%% NodeName is asia_node, europe_node2, ... in a format used by this suite.
refresh_mappings(NodeName, Reason) when is_list(Reason) ->
    rpc(NodeName, mod_global_distrib_mapping_redis, refresh, [Reason]).

%% Refreshes the mappings on the given nodes, one by one, in the list order.
refresh_hosts(NodeNames, Reason) ->
    lists:map(fun(NodeName) -> refresh_mappings(NodeName, Reason) end, NodeNames).


%% -----------------------------------------------------------------------
%% Other helpers

%% Connection steps ending with stream_resumption.
connect_steps_with_sm() ->
    [start_stream,
     stream_features,
     maybe_use_ssl,
     authenticate,
     bind,
     session,
     stream_resumption].

%% The same client record with the jid replaced by its short form.
bare_client(Client) ->
    ShortJid = escalus_utils:get_short_jid(Client),
    Client#client{jid = ShortJid}.

%% Value of the CT config entry {hosts, mim, service_port}.
service_port() ->
    ct:get_config({hosts, mim, service_port}).


%% -----------------------------------------------------------------------
%% Waiting helpers

%% Waits until Domain is a member of the list returned by
%% mod_global_distrib_mapping:all_domains() on Node.
wait_for_domain(Node, Domain) ->
    IsKnown = fun() ->
                      Known = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
                      lists:member(Domain, Known)
              end,
    mongoose_helper:wait_until(IsKnown, true, #{name => {wait_for_domain, Node, Domain}}).
%% -----------------------------------------------------------------------
%% Ensure, that endpoints are up

%% Waits until every receiver port from get_hosts/0 accepts TCP connections.
wait_for_listeners_to_appear() ->
    lists:map(fun wait_for_can_connect_to_port/1, receiver_ports(get_hosts())).

%% Third elements (receiver ports) of the host tuples.
receiver_ports(Hosts) ->
    PortOf = fun({_NodeName, _LocalHost, ReceiverPort}) -> ReceiverPort end,
    [PortOf(Host) || Host <- Hosts].

%% Tries to connect to Port every second, for up to 30 seconds.
wait_for_can_connect_to_port(Port) ->
    CanConnect = fun() -> can_connect_to_port(Port) end,
    mongoose_helper:wait_until(CanConnect, true,
                               #{name => {can_connect_to_port, Port},
                                 sleep_time => 1000,
                                 time_left => timer:seconds(30)}).

%% True when a TCP connection to 127.0.0.1:Port can be opened;
%% the socket is closed at once. A failure is printed with ct:pal.
can_connect_to_port(Port) ->
    Result = gen_tcp:connect("127.0.0.1", Port, []),
    case Result of
        {ok, Socket} ->
            gen_tcp:close(Socket),
            true;
        _ ->
            ct:pal("can_connect_to_port port=~p result=~p", [Port, Result]),
            false
    end.

%% -----------------------------------------------------------------------
%% Custom log levels for GD modules during the tests

%% Switches the custom log levels on for all test hosts.
enable_logging() ->
    Hosts = test_hosts(),
    mim_loglevel:enable_logging(Hosts, custom_loglevels()).

%% Switches the custom log levels off for all test hosts.
disable_logging() ->
    Hosts = test_hosts(),
    mim_loglevel:disable_logging(Hosts, custom_loglevels()).
%% {Module, Level} pairs used by enable_logging/0 and disable_logging/0.
custom_loglevels() ->
    [
     %% for "s2s connection to muc.localhost not found" debugging
     {ejabberd_s2s, debug},
     %% for debugging event=refreshing_own_data_done
     {mod_global_distrib_mapping_redis, info},
     %% event=outgoing_conn_start_progress tells whether a connection
     %% is already started or is going to be started
     {mod_global_distrib_outgoing_conns_sup, info},
     %% to debug bound connection issues
     {mod_global_distrib, debug},
     %% to know the pids of all new connections
     {mod_global_distrib_connection, debug},
     %% to check if gc or refresh is triggered
     {mod_global_distrib_server_mgr, info},
     %% To debug incoming connections
     % {mod_global_distrib_receiver, info},
     %% to debug global session set/delete
     {mod_global_distrib_mapping, debug}
    ].

%% Hosts for which the custom log levels are applied.
test_hosts() ->
    [mim, mim2, reg].