%%==============================================================================
%% Copyright 2017 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

-module(mod_global_distrib_SUITE).

-compile([export_all, nowarn_export_all]).

-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("exml/include/exml.hrl").

-define(HOSTS_REFRESH_INTERVAL, 200). %% in ms

-import(domain_helper, [domain/0]).
-import(config_parser_helper, [config/2, mod_config/2]).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Common Test entry point: runs every group of groups/0, in the order below.
all() ->
    GroupNames = [mod_global_distrib,
                  cluster_restart,
                  start_checks,
                  invalidation,
                  multi_connection,
                  rebalancing,
                  advertised_endpoints,
                  hosts_refresher],
    [{group, GroupName} || GroupName <- GroupNames].
%% Test groups of the suite. Cases marked "with node 2 disabled" get europe_node2
%% hidden in init_per_testcase/2, because there is no helper to load MUC on it.
groups() ->
    GlobalDistribCases =
        [test_pm_between_users_at_different_locations,
         test_pm_between_users_before_available_presence,
         test_component_disconnect,
         test_component_on_one_host,
         test_components_in_different_regions,
         test_hidden_component_disco_in_different_region,
         test_pm_with_disconnection_on_other_server,
         test_pm_with_graceful_reconnection_to_different_server,
         test_pm_with_ungraceful_reconnection_to_different_server,
         test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first,
         test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first,
         test_component_unregister,
         test_update_senders_host,
         test_update_senders_host_by_ejd_service,
         %% with node 2 disabled
         test_muc_conversation_on_one_host,
         test_global_disco],
    MultiConnectionCases =
        [test_in_order_messages_on_multiple_connections,
         test_in_order_messages_on_multiple_connections_with_bounce,
         test_messages_bounced_in_order,
         %% with node 2 disabled
         test_muc_conversation_history],
    RebalancingCases =
        [enable_new_endpoint_on_refresh,
         disable_endpoint_on_refresh,
         wait_for_connection,
         closed_connection_is_removed_from_disabled],
    AdvertisedEndpointsCases =
        [test_advertised_endpoints_override_endpoints,
         test_pm_between_users_at_different_locations],
    [{mod_global_distrib, [], GlobalDistribCases},
     {hosts_refresher, [], [test_host_refreshing]},
     {cluster_restart, [], [test_location_disconnect]},
     {start_checks, [], [test_error_on_wrong_hosts]},
     %% TODO: Add checks for other mapping refreshes
     {invalidation, [], [refresh_nodes]},
     {multi_connection, [], MultiConnectionCases},
     {rebalancing, [], RebalancingCases},
     {advertised_endpoints, [], AdvertisedEndpointsCases}].
%% Config keys required from the test spec: the node names of both Europe nodes
%% (mim, mim2) and of the Asia node (reg), plus the c2s port of mim.
suite() ->
    Required = [{require, europe_node1, {hosts, mim, node}},
                {require, europe_node2, {hosts, mim2, node}},
                {require, asia_node, {hosts, reg, node}},
                {require, c2s_port, {hosts, mim, c2s_port}}],
    Required ++ escalus:suite().

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Runs the suite only if the global_distrib Redis pool returns a worker on both
%% europe_node1 and asia_node; otherwise the whole suite is skipped.
%% With the mnesia internal database, europe_node2 is joined to europe_node1.
init_per_suite(Config) ->
    EuropeWorker = rpc(europe_node1, mongoose_wpool, get_worker, [redis, global, global_distrib]),
    AsiaWorker = rpc(asia_node, mongoose_wpool, get_worker, [redis, global, global_distrib]),
    case {EuropeWorker, AsiaWorker} of
        {{ok, _}, {ok, _}} ->
            case ct_helper:get_internal_database() of
                mnesia ->
                    EuropeNode1 = ct:get_config(europe_node1),
                    ok = rpc(europe_node2, mongoose_cluster, join, [EuropeNode1]);
                _OtherDb ->
                    ok
            end,
            enable_logging(),
            InitialConfig = [{add_advertised_endpoints, []}, {extra_config, #{}} | Config],
            escalus:init_per_suite(InitialConfig);
        RedisCheck ->
            ct:pal("Redis check result: ~p", [RedisCheck]),
            {skip, "GD Redis default pool not available"}
    end.

%% Undoes init_per_suite/1.
%% NOTE(review): europe_node2 is asked to leave the cluster even when
%% init_per_suite/1 did not join it (non-mnesia internal database); the result of
%% that call is ignored.
end_per_suite(Config) ->
    disable_logging(),
    escalus_fresh:clean(),
    rpc(europe_node2, mongoose_cluster, leave, []),
    escalus:end_per_suite(Config).
%% Per-group setup.
%% - start_checks saves the module setup of europe_node1 and stops
%%   mod_global_distrib there.
%% - Every other group starts mod_global_distrib on all nodes through
%%   init_per_group_generic/1. Group-specific options are passed in the
%%   `extra_config` entry (merged into the module options by module_opts/1).
init_per_group(start_checks, Config) ->
    NodeSpec = node_spec(europe_node1),
    Config1 = dynamic_modules:save_modules(NodeSpec, [domain()], Config),
    dynamic_modules:ensure_modules(NodeSpec, domain(), [{mod_global_distrib, stopped}]),
    Config1;
init_per_group(multi_connection, Config) ->
    ExtraConfig = #{bounce => #{resend_after_ms => 20000},
                    connections => #{%% Disable unused feature to avoid interference
                                     connections_per_endpoint => 100,
                                     disabled_gc_interval => 10000}},
    init_per_group_generic([{extra_config, ExtraConfig} | Config]);
init_per_group(invalidation, Config) ->
    %% Plain generic setup (previously reached indirectly through the catch-all
    %% clause via a made-up group name). The fake node name is written to Redis by
    %% refresh_nodes/1 and deleted again in end_per_group(invalidation, _).
    Config1 = init_per_group_generic(Config),
    [{node_to_expire, <<"fake_node@localhost">>} | Config1];
init_per_group(rebalancing, Config) ->
    %% We need to prevent automatic refreshes, because they may interfere with tests
    %% and we need early disabled garbage collection to check its validity
    ExtraConfig = #{connections => #{endpoint_refresh_interval => 3600,
                                     endpoint_refresh_interval_when_empty => 3600,
                                     disabled_gc_interval => 1},
                    redis => #{refresh_after => 3600}},
    init_per_group_generic([{extra_config, ExtraConfig} | Config]);
init_per_group(advertised_endpoints, Config) ->
    %% Reload this suite module on every node (presumably so helpers defined here,
    %% such as the inet mocks, can run there).
    lists:foreach(fun({NodeName, _, _}) ->
                          Node = ct:get_config(NodeName),
                          mongoose_helper:inject_module(#{node => Node}, ?MODULE, reload)
                  end, get_hosts()),
    %% NOTE(review): the result of mock_inet_on_each_node/0 is discarded, so no
    %% `meck_handlers` entry is stored for end_per_group(advertised_endpoints, _),
    %% which reads one - confirm unmock_inet/1 does not depend on it.
    mock_inet_on_each_node(),
    init_per_group_generic(
      [{add_advertised_endpoints,
        [{asia_node, advertised_endpoints()}]} | Config]);
init_per_group(mod_global_distrib, Config) ->
    %% Disable mod_global_distrib_mapping_redis refresher
    ExtraConfig = #{redis => #{refresh_after => 3600}},
    init_per_group_generic([{extra_config, ExtraConfig} | Config]);
init_per_group(_, Config) ->
    init_per_group_generic(Config).
%% Starts mod_global_distrib on every node returned by get_hosts/0 and waits for
%% its listeners to appear. Prepends the Redis nodes key and
%% `{escalus_user_db, xmpp}` to the returned Config.
init_per_group_generic(Config0) ->
    ConfigWithModules = lists:foldl(fun init_modules_per_node/2, Config0, get_hosts()),
    wait_for_listeners_to_appear(),
    {FirstNode, _LocalHost, _ReceiverPort} = hd(get_hosts()),
    NodesKey = rpc(FirstNode, mod_global_distrib_mapping_redis, nodes_key, []),
    [{nodes_key, NodesKey}, {escalus_user_db, xmpp} | ConfigWithModules].

%% Fold step of init_per_group_generic/1 for a single node:
%% 1. saves the node's current module setup for its virtual hosts, so that
%%    end_per_group_generic/1 can restore it;
%% 2. stops modules that would only add load;
%% 3. starts mod_global_distrib (with node-specific local host and endpoints) and
%%    mod_stream_management.
init_modules_per_node({NodeName, LocalHost, ReceiverPort}, Config0) ->
    BaseOpts = module_opts(?config(extra_config, Config0)),
    %% Connection options from extra_config take precedence over the endpoints
    ConnOpts = maps:merge(endpoint_opts(NodeName, ReceiverPort, Config0),
                          maps:get(connections, BaseOpts, #{})),
    GlobalDistribOpts = module_opts(BaseOpts#{local_host => LocalHost,
                                              connections => ConnOpts}),

    VirtHosts = virtual_hosts(NodeName),
    Node = node_spec(NodeName),
    SavedConfig = dynamic_modules:save_modules(Node, VirtHosts, Config0),

    %% To reduce load when sending many messages
    ModulesToStop = [mod_offline, mod_blocking, mod_privacy, mod_roster, mod_last,
                     mod_stream_management],
    lists:foreach(fun(VirtHost) ->
                          dynamic_modules:ensure_stopped(Node, VirtHost, ModulesToStop)
                  end, VirtHosts),

    StreamMgmtOpts = config_parser_helper:mod_config(
                       mod_stream_management,
                       #{resume_timeout => 1, backend => ct_helper:get_internal_database()}),
    dynamic_modules:ensure_modules(Node, domain(),
                                   [{mod_global_distrib, GlobalDistribOpts},
                                    {mod_stream_management, StreamMgmtOpts}]),
    SavedConfig.

%% Completes a map of mod_global_distrib options by running it through the
%% set_opts/2 steps in order: common, defaults, connections, redis, bounce.
module_opts(ExtraOpts) ->
    Steps = [common, defaults, connections, redis, bounce],
    lists:foldl(fun(Step, Opts) -> set_opts(Step, Opts) end, ExtraOpts, Steps).
%% One step of module_opts/1: fills in a single section of mod_global_distrib
%% options. A connections/redis/bounce section that is absent from Opts is left
%% absent (catch-all clause).
set_opts(common, Opts) ->
    maps:merge(#{global_host => <<"localhost">>,
                 hosts_refresh_interval => ?HOSTS_REFRESH_INTERVAL}, Opts);
set_opts(defaults, Opts) ->
    mod_config(mod_global_distrib, Opts);
set_opts(connections, #{connections := ConnExtra} = Opts) ->
    TLSOpts = config([modules, mod_global_distrib, connections, tls],
                     #{certfile => "priv/ssl/fake_server.pem",
                       cacertfile => "priv/ssl/ca/cacert.pem"}),
    %% A `tls` entry already present in ConnExtra wins over the test certificates
    Opts#{connections := config([modules, mod_global_distrib, connections],
                                maps:merge(#{tls => TLSOpts}, ConnExtra))};
set_opts(redis, #{redis := RedisExtra} = Opts) ->
    Opts#{redis := config([modules, mod_global_distrib, redis], RedisExtra)};
set_opts(bounce, #{bounce := BounceExtra} = Opts) ->
    Opts#{bounce := config([modules, mod_global_distrib, bounce], BounceExtra)};
set_opts(_, Opts) ->
    Opts.

%% Per-group teardown: undoes the group-specific setup, then restores the module
%% setup saved by the matching init_per_group/2 clause.
end_per_group(advertised_endpoints, Config) ->
    %% NOTE(review): init_per_group(advertised_endpoints, _) does not store
    %% `meck_handlers` in Config, so Pids is presumably `undefined` here -
    %% confirm that unmock_inet/1 does not rely on it.
    Pids = ?config(meck_handlers, Config),
    unmock_inet(Pids),
    escalus_fresh:clean(),
    end_per_group_generic(Config);
end_per_group(start_checks, Config) ->
    escalus_fresh:clean(),
    %% init_per_group(start_checks, _) saved europe_node1's modules before stopping
    %% mod_global_distrib; restore them so the stopped state does not leak into
    %% later groups and suites.
    end_per_group_generic(Config);
end_per_group(invalidation, Config) ->
    redis_query(europe_node1, [<<"HDEL">>, ?config(nodes_key, Config),
                               ?config(node_to_expire, Config)]),
    end_per_group_generic(Config);
end_per_group(_, Config) ->
    end_per_group_generic(Config).

%% Restores the modules saved in Config by dynamic_modules:save_modules/3.
end_per_group_generic(Config) ->
    dynamic_modules:restore_modules(#{timeout => timer:seconds(30)}, Config).

%% Per-testcase setup.
%% - MUC cases: hide europe_node2, drop asia_node's old connections to Europe,
%%   load MUC on the mim node and wait until the reg node sees the MUC domain.
%% - Reconnection cases: register Eve in both clusters (init_user_eve/1).
init_per_testcase(CaseName, Config)
  when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
       CaseName == test_muc_conversation_history ->
    %% There is no helper to load MUC on node2
    %% For now it's easier to hide node2
    %% TODO: Do it right at some point!
    hide_node(europe_node2, Config),
    %% There would be no new connections to europe_node2, but there can be some old ones.
    %% We need to disconnect previous connections.
    {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
    trigger_rebalance(asia_node, EuropeHost),
    %% Load muc on mim node
    muc_helper:load_muc(),
    RegNode = ct:get_config({hosts, reg, node}),
    %% Wait for muc.localhost to become visible from reg node
    wait_for_domain(RegNode, muc_helper:muc_host()),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
                                   CN == test_pm_with_ungraceful_reconnection_to_different_server;
                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
    escalus:init_per_testcase(CN, init_user_eve(Config));
init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).

%% Registers one fresh Eve in both clusters: with the default spec (reg cluster)
%% and with the spec's port switched to the mim c2s port (mim cluster). Both
%% specs are added to Config as evespec_reg and evespec_mim.
init_user_eve(Config) ->
    %% Register Eve in reg cluster
    EveSpec = escalus_fresh:create_fresh_user(Config, eve),
    MimPort = ct:get_config({hosts, mim, c2s_port}),
    EveSpec2 = lists:keystore(port, 1, EveSpec, {port, MimPort}),
    %% Register Eve in mim cluster
    escalus:create_users(Config, [{eve, EveSpec2}]),
    [{evespec_reg, EveSpec}, {evespec_mim, EveSpec2} | Config].

%% Per-testcase teardown: case-specific cleanup followed by
%% generic_end_per_testcase/2.
end_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
                                  CN == test_pm_with_ungraceful_reconnection_to_different_server;
                                  CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
                                  CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
    MimEveSpec = ?config(evespec_mim, Config),
    %% Clean Eve from reg cluster
    escalus_fresh:clean(),
    %% Clean Eve from mim cluster.
    %% For shared databases (i.e. mysql, pgsql...), removing from one cluster
    %% already removed her from all clusters, so this call may fail.
    %% For the mnesia auth backend the removal is needed in each cluster.
    %% A failure is therefore logged rather than raised.
    try
        escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}])
    catch
        Class:Reason ->
            ct:pal("Removing Eve from the mim cluster failed (ignored): ~p:~p",
                   [Class, Reason])
    end,
    generic_end_per_testcase(CN, Config);
end_per_testcase(CaseName, Config)
  when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
       CaseName == test_muc_conversation_history ->
    refresh_mappings(europe_node2, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
    muc_helper:unload_muc(),
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(test_update_senders_host_by_ejd_service = CN, Config) ->
    refresh_mappings(europe_node1, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
    generic_end_per_testcase(CN, Config);
end_per_testcase(CN, Config) when CN == enable_new_endpoint_on_refresh;
                                  CN == disable_endpoint_on_refresh;
                                  CN == wait_for_connection;
                                  CN == closed_connection_is_removed_from_disabled ->
    restart_receiver(asia_node),
    refresh_mappings(asia_node, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
    generic_end_per_testcase(CN, Config);
end_per_testcase(CaseName, Config) ->
    generic_end_per_testcase(CaseName, Config).

%% Common teardown. On every node, with the hosts refresher paused, it terminates
%% all children of mod_global_distrib_outgoing_conns_sup except the hosts
%% refresher, then asserts that only the refresher is left. A missing supervisor
%% (noproc) is only logged.
generic_end_per_testcase(CaseName, Config) ->
    lists:foreach(
      fun({NodeName, _, _}) ->
              %% TODO: Enable refresher only for specific test cases,
              %% as some of them are based on assumption that node(s)
              %% must open new connections during tests.
              pause_refresher(NodeName, CaseName),
              Node = ct:get_config(NodeName),
              SupRef = {mod_global_distrib_outgoing_conns_sup, Node},
              try
                  OutgoingConns = supervisor:which_children(SupRef),
                  lists:foreach(fun ({mod_global_distrib_hosts_refresher, _, _, _}) ->
                                        skip;
                                    ({Id, _, _, _}) ->
                                        supervisor:terminate_child(SupRef, Id)
                                end, OutgoingConns),
                  [{mod_global_distrib_hosts_refresher, _, worker, _Modules}] =
                      supervisor:which_children(SupRef)
              catch
                  _:{noproc, _} ->
                      ct:pal("Sender supervisor not found in ~p", [NodeName])
              end,
              unpause_refresher(NodeName, CaseName)
      end,
      get_hosts()),
    escalus:end_per_testcase(CaseName, Config).

%% asia_node serves only domain(); the Europe nodes also serve the secondary domain.
virtual_hosts(asia_node) ->
    [domain()];
virtual_hosts(_) ->
    [domain(), secondary_domain()].

secondary_domain() ->
    ct:get_config({hosts, mim, secondary_domain}).

%% Refresher is not started at all or stopped for some test cases
-spec pause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
pause_refresher(_, test_error_on_wrong_hosts) ->
    ok;
pause_refresher(asia_node, test_location_disconnect) ->
    ok;
pause_refresher(NodeName, _) ->
    ok = rpc(NodeName, mod_global_distrib_hosts_refresher, pause, []).

-spec unpause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
unpause_refresher(_, test_error_on_wrong_hosts) ->
    ok;
unpause_refresher(asia_node, test_location_disconnect) ->
    ok;
unpause_refresher(NodeName, _) ->
    ok = rpc(NodeName, mod_global_distrib_hosts_refresher, unpause, []).

%%--------------------------------------------------------------------
%% Service discovery test
%%--------------------------------------------------------------------

%% Requires module mod_global_distrib to be started with argument advertised_endpoints
%% for each host in get_hosts().
%% Reads Redis to confirm that the endpoints stored there are overwritten
%% with the `advertised_endpoints` option value.
test_advertised_endpoints_override_endpoints(_Config) ->
    EndpointsPerNode = execute_on_each_node(mod_global_distrib_mapping_redis, get_endpoints,
                                            [<<"reg1">>]),
    IsAdvertised = fun(Endpoints) ->
                           lists:sort(Endpoints) =:= lists:sort(advertised_endpoints())
                   end,
    true = lists:all(IsAdvertised, EndpointsPerNode).

%% @doc Verifies that the hosts refresher restarts the outgoing connection pool
%% when that pool goes down (crash or domain unavailability). It also checks
%% that the refresher reads the host list from the backend and starts the
%% matching pools.
test_host_refreshing(_Config) ->
    mongoose_helper:wait_until(fun trees_for_connections_present/0, true,
                               #{name => trees_for_connections_present,
                                 time_left => timer:seconds(10)}),
    AsiaConnSups = out_connection_sups(asia_node),
    {europe_node1, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
    EuropeSupName = rpc(asia_node, mod_global_distrib_utils, server_to_sup_name, [EuropeHost]),
    {_, EuropeSupPid, supervisor, _} = lists:keyfind(EuropeSupName, 1, AsiaConnSups),
    %% It is a temporary process, so killing it is fine
    erlang:exit(EuropeSupPid, kill),
    IsEuropeTreeBack = fun() -> tree_for_sup_present(asia_node, EuropeSupName) end,
    mongoose_helper:wait_until(IsEuropeTreeBack, true, #{name => tree_for_sup_present}).

%% In the mod_global_distrib group: a plain two-way PM between users connected to
%% different clusters.
%% In the advertised_endpoints group: checks that a node advertising itself with
%% a domain name can be connected to.
test_pm_between_users_at_different_locations(Config) ->
    Story = fun test_two_way_pm/2,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).
%% Same PM exchange as test_pm_between_users_at_different_locations/1. Here the
%% clients are started directly with escalus_client:start/3 instead of through a
%% story, i.e. (as the name says) before any available presence is sent.
test_pm_between_users_before_available_presence(Config) ->
    UsersConfig = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
    {ok, Alice} = escalus_client:start(UsersConfig, alice, <<"res1">>),
    {ok, Eve} = escalus_client:start(UsersConfig, eve, <<"res1">>),

    test_two_way_pm(Alice, Eve),

    escalus_client:stop(UsersConfig, Alice),
    escalus_client:stop(UsersConfig, Eve).

%% Alice and Eve each send one chat message to the other. Each message must arrive
%% within 15 seconds with the expected sender, recipient and body.
test_two_way_pm(Alice, Eve) ->
    BodyForEve = <<"Hi to Eve from Europe1!">>,
    BodyForAlice = <<"Hi to Alice from Asia!">>,
    escalus_client:send(Alice, escalus_stanza:chat_to(Eve, BodyForEve)),
    escalus_client:send(Eve, escalus_stanza:chat_to(Alice, BodyForAlice)),

    Timeout = timer:seconds(15),
    ReceivedByEve = escalus_client:wait_for_stanza(Eve, Timeout),
    ReceivedByAlice = escalus_client:wait_for_stanza(Alice, Timeout),

    AliceJid = escalus_client:full_jid(Alice),
    EveJid = escalus_client:full_jid(Eve),

    escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, BodyForEve], ReceivedByEve),
    escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, BodyForAlice], ReceivedByAlice).
%% Alice enters a fresh MUC room, then Eve enters it too. Each sends one groupchat
%% message, and both occupants must receive both messages. The room is destroyed
%% afterwards.
test_muc_conversation_on_one_host(Config0) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
    Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
    Story =
        fun(Eve) ->
                Alice = connect_from_spec(AliceSpec, Config),

                RoomJid = ?config(room, Config),
                AliceNick = escalus_utils:get_username(Alice),
                EveNick = escalus_utils:get_username(Eve),
                RoomAddr = muc_helper:room_address(RoomJid),

                %% Alice enters first and gets her own presence and the subject
                escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceNick)),
                wait_for_muc_presence(Alice, RoomJid, AliceNick),
                wait_for_subject(Alice),

                %% Eve enters; both occupants see the resulting presences
                escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveNick)),
                wait_for_muc_presence(Eve, RoomJid, AliceNick),
                wait_for_muc_presence(Eve, RoomJid, EveNick),
                wait_for_muc_presence(Alice, RoomJid, EveNick),
                wait_for_subject(Eve),

                FromAlice = <<"Hi, Eve!">>,
                escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, FromAlice)),
                escalus:assert(is_groupchat_message, [FromAlice], escalus:wait_for_stanza(Alice)),
                escalus:assert(is_groupchat_message, [FromAlice], escalus:wait_for_stanza(Eve)),

                FromEve = <<"Hi, Alice!">>,
                escalus:send(Eve, escalus_stanza:groupchat_to(RoomAddr, FromEve)),
                escalus:assert(is_groupchat_message, [FromEve], escalus:wait_for_stanza(Eve)),
                escalus:assert(is_groupchat_message, [FromEve], escalus:wait_for_stanza(Alice))
        end,
    escalus:fresh_story(Config, [{eve, 1}], Story),
    muc_helper:destroy_room(Config).
%% Alice posts three messages to a fresh room before Eve enters. After the
%% occupants' presences, Eve must receive those three messages as discussion
%% history, followed by the room subject.
test_muc_conversation_history(Config0) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
    Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
    Story =
        fun(Eve) ->
                Alice = connect_from_spec(AliceSpec, Config),

                RoomJid = ?config(room, Config),
                AliceNick = escalus_utils:get_username(Alice),
                RoomAddr = muc_helper:room_address(RoomJid),

                escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceNick)),
                wait_for_muc_presence(Alice, RoomJid, AliceNick),
                wait_for_subject(Alice),

                send_n_muc_messages(Alice, RoomAddr, 3),

                %% Ensure that the messages are received by the room
                %% before trying to login Eve.
                %% Otherwise, Eve would receive some messages from history and
                %% some as regular groupchat messages.
                receive_n_muc_messages(Alice, 3),

                EveNick = escalus_utils:get_username(Eve),
                escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveNick)),

                wait_for_muc_presence(Eve, RoomJid, AliceNick),
                wait_for_muc_presence(Eve, RoomJid, EveNick),
                wait_for_muc_presence(Alice, RoomJid, EveNick),

                %% XEP-0045: After sending the presence broadcast (and only after doing so),
                %% the service MAY then send discussion history, the room subject,
                %% live messages, presence updates, and other in-room traffic.
                receive_n_muc_messages(Eve, 3),
                wait_for_subject(Eve)
        end,
    escalus:fresh_story(Config, [{eve, 1}], Story),
    muc_helper:destroy_room(Config).

%% Takes User's next stanza and asserts that it is a presence from the room
%% occupant FromNickname in room RoomJid.
wait_for_muc_presence(User, RoomJid, FromNickname) ->
    OccupantJid = muc_helper:room_address(RoomJid, FromNickname),
    Stanza = escalus:wait_for_stanza(User),
    escalus:assert(is_presence, Stanza),
    escalus:assert(is_stanza_from, [OccupantJid], Stanza),
    ok.
%% Takes User's next stanza and asserts that it is a groupchat message carrying a
%% <subject/> child, i.e. the room subject.
wait_for_subject(User) ->
    Stanza = escalus:wait_for_stanza(User),
    escalus:assert(is_groupchat_message, Stanza),
    SubjectEl = exml_query:subelement(Stanza, <<"subject">>),
    ?assertNotEqual(undefined, SubjectEl),
    ok.

%% Sends N groupchat messages with bodies "test-1" .. "test-N" from User to RoomAddr.
send_n_muc_messages(User, RoomAddr, N) ->
    Bodies = [<<"test-", (integer_to_binary(I))/binary>> || I <- lists:seq(1, N)],
    lists:foreach(fun(Body) ->
                          escalus:send(User, escalus_stanza:groupchat_to(RoomAddr, Body))
                  end, Bodies).

%% Expects User's next N stanzas to be groupchat messages "test-1" .. "test-N",
%% in that order.
receive_n_muc_messages(User, N) ->
    Expected = [<<"test-", (integer_to_binary(I))/binary>> || I <- lists:seq(1, N)],
    lists:foreach(fun(Body) ->
                          Stanza = escalus:wait_for_stanza(User),
                          escalus:assert(is_groupchat_message, [Body], Stanza)
                  end, Expected).

%% A component connected on service_port() exchanges one chat message in each
%% direction with a fresh alice, and then with a fresh eve.
test_component_on_one_host(Config) ->
    ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
                       {port, service_port()}, {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),

    Story = fun(User) ->
                    %% The user writes to the component, which receives the message
                    escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi2!">>)),
                    AtComponent = escalus:wait_for_stanza(Comp),
                    escalus:assert(is_chat_message, [<<"Hi2!">>], AtComponent),

                    %% The component replies from its own address, and the user receives it
                    Reply = escalus_stanza:from(escalus_stanza:chat_to(User, <<"Oh hi!">>), Addr),
                    escalus:send(Comp, Reply),
                    AtUser = escalus:wait_for_stanza(User),
                    escalus:assert(is_chat_message, [<<"Oh hi!">>], AtUser),
                    escalus:assert(is_stanza_from, [Addr], AtUser)
            end,

    [escalus:fresh_story(Config, [{UserName, 1}], Story) || UserName <- [alice, eve]].

%% Ensures that 2 components in distinct data centers can communicate.
test_components_in_different_regions(_Config) ->
    CommonOpts = [{host, <<"localhost">>}, {password, <<"secret">>},
                  {server, <<"localhost">>}, {component, <<"test_service">>}],
    MimServicePort = ct:get_config({hosts, mim, service_port}),
    RegServicePort = ct:get_config({hosts, reg, service_port}),
    %% service1 connects to the mim node's service port, service2 to the reg node's
    MimCompOpts = [{port, MimServicePort}, {component, <<"service1">>} | CommonOpts],
    RegCompOpts = [{port, RegServicePort}, {component, <<"service2">>} | CommonOpts],

    {Comp1, Addr1, _Name1} = component_helper:connect_component(MimCompOpts),
    {Comp2, Addr2, _Name2} = component_helper:connect_component(RegCompOpts),

    %% service1 -> service2
    ToComp2 = escalus_stanza:from(escalus_stanza:chat_to(Addr2, <<"Hi from 1!">>), Addr1),
    escalus:send(Comp1, ToComp2),
    escalus:assert(is_chat_message, [<<"Hi from 1!">>], escalus:wait_for_stanza(Comp2)),

    %% service2 -> service1
    ToComp1 = escalus_stanza:from(escalus_stanza:chat_to(Addr1, <<"Hi from 2!">>), Addr2),
    escalus:send(Comp2, ToComp1),
    escalus:assert(is_chat_message, [<<"Hi from 2!">>], escalus:wait_for_stanza(Comp1)).

%% Ordinary user is not able to discover the hidden component from GD
test_hidden_component_disco_in_different_region(Config) ->
    %% Hidden component from component_SUITE connects to mim1/europe_node1
    HiddenSpec = component_helper:spec(hidden_component, Config),
    {_HiddenComp, HiddenAddr, _} = component_helper:connect_component(HiddenSpec),

    IsHiddenAbsent = fun(Stanza) -> not escalus_pred:has_service(HiddenAddr, Stanza) end,
    escalus:fresh_story(
      Config, [{eve, 1}],
      fun(Eve) ->
              EveServer = escalus_client:server(Eve),
              escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
              Reply = escalus:wait_for_stanza(Eve),
              escalus:assert(is_iq_result, Reply),
              escalus:assert(IsHiddenAbsent, Reply)
      end).
%% A message to a component that has already disconnected bounces back to the
%% sender (alice, then eve) as a cancel/service-unavailable error.
test_component_disconnect(Config) ->
    ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
                       {port, service_port()}, {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
    component_helper:disconnect_component(Comp, Addr),

    Story = fun(User) ->
                    escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
                    Bounce = escalus:wait_for_stanza(User, 5000),
                    escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Bounce)
            end,

    [escalus:fresh_story(Config, [{UserName, 1}], Story) || UserName <- [alice, eve]].

%% Once mongooseim and ranch are stopped on asia_node, Alice's next message to Eve
%% bounces with cancel/service-unavailable. Both applications are started again
%% on asia_node afterwards, whatever the outcome.
test_location_disconnect(Config) ->
    Story =
        fun(Alice, Eve) ->
                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
                escalus_client:wait_for_stanza(Eve),

                ok = rpc(asia_node, application, stop, [mongooseim]),
                %% TODO: Stopping mongooseim alone should probably stop connections too
                ok = rpc(asia_node, application, stop, [ranch]),

                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
                Bounce = escalus:wait_for_stanza(Alice),
                escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Bounce)
        end,
    try
        escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story)
    after
        rpc(asia_node, application, start, [ranch]),
        rpc(asia_node, application, start, [mongooseim])
    end.

%% After Eve disconnects, a message Alice sends her bounces within 15 seconds as
%% a cancel/service-unavailable error.
test_pm_with_disconnection_on_other_server(Config) ->
    Story =
        fun(Alice, Eve) ->
                escalus_connection:stop(Eve),
                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
                Bounce = escalus_client:wait_for_stanza(Alice, 15000),
                escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Bounce)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).
%% Eve (reg cluster) sends Alice a message, logs out and reconnects to the mim
%% cluster as NewEve. Alice's c2s process is suspended until NewEve is connected,
%% so the message Alice sends in the meantime is only processed afterwards.
%% All four messages (two in each direction) must be delivered.
test_pm_with_graceful_reconnection_to_different_server(Config) ->
    EveSpec = ?config(evespec_reg, Config),
    EveSpec2 = ?config(evespec_mim, Config),
    escalus:fresh_story(
      Config, [{alice, 1}],
      fun(Alice) ->
              Eve = connect_from_spec(EveSpec, Config),

              escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi from Asia!">>)),

              %% Stop connection and wait for process to die
              EveNode = ct:get_config({hosts, reg, node}),
              mongoose_helper:logout_user(Config, Eve, #{node => EveNode}),

              FromEve = escalus_client:wait_for_stanza(Alice),

              %% Pause Alice until Eve is reconnected.
              %% Alice's session pid is looked up on the mim node (europe_node1),
              %% so suspend/resume it there too, rather than from asia_node.
              AliceNode = ct:get_config({hosts, mim, node}),
              C2sPid = mongoose_helper:get_session_pid(Alice, #{node => AliceNode}),
              ok = rpc(europe_node1, sys, suspend, [C2sPid]),

              escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi from Europe1!">>)),

              NewEve = connect_from_spec(EveSpec2, Config),

              ok = rpc(europe_node1, sys, resume, [C2sPid]),

              escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi again from Europe1!">>)),
              escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi again from Asia!">>)),

              FirstFromAlice = escalus_client:wait_for_stanza(NewEve),
              AgainFromEve = escalus_client:wait_for_stanza(Alice),
              SecondFromAlice = escalus_client:wait_for_stanza(NewEve),

              %% Put Alice's two messages back in sending order before asserting
              [FromAlice, AgainFromAlice] = order_by_seqnum([FirstFromAlice, SecondFromAlice]),

              escalus:assert(is_chat_message, [<<"Hi from Europe1!">>], FromAlice),
              escalus:assert(is_chat_message, [<<"Hi from Asia!">>], FromEve),
              escalus:assert(is_chat_message, [<<"Hi again from Europe1!">>], AgainFromAlice),
              escalus:assert(is_chat_message, [<<"Hi again from Asia!">>], AgainFromEve)
      end).

%% Refresh logic can cause two possible behaviours.
%% We test both behaviours here (plus no refresh case)
%% See PR #2392
test_pm_with_ungraceful_reconnection_to_different_server(Config) ->
    %% No refresh: NewEve gets both of Alice's messages
    NoRefresh = fun() -> ok end,
    ExpectEverything = fun(Alice, NewEve) ->
                               user_receives(NewEve, [<<"Hi from Europe1!">>,
                                                      <<"Hi again from Europe1!">>]),
                               user_receives(Alice, [<<"Hi from Europe!">>])
                       end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, NoRefresh,
                                                                ExpectEverything).

test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first(Config) ->
    %% Same outcome as with no refresh
    Reason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first",
    % Order of nodes is important here in refresh_hosts!
    AsiaThenEurope = fun() -> refresh_hosts([asia_node, europe_node1], Reason) end,
    ExpectEverything = fun(Alice, NewEve) ->
                               user_receives(NewEve, [<<"Hi from Europe1!">>,
                                                      <<"Hi again from Europe1!">>]),
                               user_receives(Alice, [<<"Hi from Europe!">>])
                       end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, AsiaThenEurope,
                                                                ExpectEverything).

test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first(Config) ->
    %% Asia node overrides Europe value with the older ones,
    %% so we lose some messages during rerouting :(
    Reason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first",
    EuropeThenAsia = fun() -> refresh_hosts([europe_node1, asia_node], Reason) end,
    ExpectFirstLost = fun(Alice, NewEve) ->
                              user_receives(NewEve, [<<"Hi again from Europe1!">>]),
                              user_receives(Alice, [<<"Hi from Europe!">>])
                      end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, EuropeThenAsia,
                                                                ExpectFirstLost).
%% Reconnect Eve from asia (reg cluster) to europe (mim)
%%
%% Shared body of the test_pm_with_ungraceful_reconnection_to_different_server*
%% cases:
%%   1. Eve connects to the reg cluster with stream management enabled.
%%   2. Her c2s process on the reg node is suspended, and Alice sends her a
%%      message that stays queued in that process's mailbox.
%%   3. Eve's connection is killed and she connects to the mim cluster as NewEve.
%%   4. BeforeResume() runs, then the old c2s process is resumed and the test
%%      waits for it to die.
%%   5. Alice sends another message to Eve's bare JID (via bare_client/1) and
%%      NewEve sends one to Alice. AfterCheck(Alice, NewEve) then asserts what
%%      each of them received.
%% BeforeResume is a 0-arity fun; the callers pass either a no-op or a hosts
%% refresh. AfterCheck is a 2-arity fun taking the Alice and NewEve clients.
do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResume, AfterCheck) ->
    Config = escalus_users:update_userspec(Config0, eve, stream_management, true),
    EveSpec = ?config(evespec_reg, Config),
    EveSpec2 = ?config(evespec_mim, Config),
    escalus:fresh_story(
      Config, [{alice, 1}],
      fun(Alice) ->
              {ok, Eve, _} = escalus_connection:start(EveSpec, connect_steps_with_sm()),
              escalus_story:send_initial_presence(Eve),
              %% Consume one stanza (presumably the reflection of Eve's own presence)
              escalus_client:wait_for_stanza(Eve),

              %% Suspend Eve's c2s process on the reg node, so that the message
              %% below stays unprocessed in its mailbox
              EveNode = ct:get_config({hosts, reg, node}),
              C2sPid = mongoose_helper:get_session_pid(Eve, #{node => EveNode}),
              ok = rpc(asia_node, sys, suspend, [C2sPid]),

              escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi from Europe1!">>)),

              %% Wait for route message to be queued in c2s message queue
              mongoose_helper:wait_for_route_message_count(C2sPid, 1),

              %% Time to do bad nasty things with our socket, so once our process wakes up,
              %% it SHOULD detect a dead socket
              escalus_connection:kill(Eve),

              %% Connect another one, we hope the message would be rerouted
              NewEve = connect_from_spec(EveSpec2, Config),

              %% Caller-specific step (e.g. a hosts refresh) before the old
              %% process handles the queued message
              BeforeResume(),

              %% Trigger rerouting
              ok = rpc(asia_node, sys, resume, [C2sPid]),

              %% Let C2sPid to process the message and reroute (and die finally, poor little thing)
              mongoose_helper:wait_for_pid_to_die(C2sPid),

              escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi again from Europe1!">>)),
              escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi from Europe!">>)),

              AfterCheck(Alice, NewEve)
      end).
%% Service discovery on Eve's server must advertise the MUC host.
%% Alice's disco result is fetched but not asserted yet.
test_global_disco(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              escalus:send(Alice, escalus_stanza:service_discovery(escalus_client:server(Alice))),
              _AliceStanza = escalus:wait_for_stanza(Alice),
              %% TODO: test for duplicate components, e.g. by asserting
              %% fun has_exactly_one_service/2 with [muc_helper:muc_host()] on AliceStanza

              escalus:send(Eve, escalus_stanza:service_discovery(escalus_client:server(Eve))),
              DiscoResult = escalus:wait_for_stanza(Eve),
              escalus:assert(has_service, [muc_helper:muc_host()], DiscoResult)
      end).

%% A component connected to the cluster shows up in the global-distrib domain
%% mapping on europe_node1, and is gone from it after it disconnects.
test_component_unregister(_Config) ->
    ServiceDomain = <<"test_service.localhost">>,
    LookupService = fun() ->
                            rpc(europe_node1, mod_global_distrib_mapping, for_domain, [ServiceDomain])
                    end,
    ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
                       {port, service_port()}, {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
    ?assertMatch({ok, _}, LookupService()),

    component_helper:disconnect_component(Comp, Addr),
    ?assertEqual(error, LookupService()).

%% Enabling mod_global_distrib with an unknown local_host must fail with a
%% check_host_failed error naming that domain.
test_error_on_wrong_hosts(_Config) ->
    ModOpts = module_opts(#{local_host => <<"no_such_host">>}),
    Modules = [{mod_global_distrib, ModOpts}],
    ?assertException(error, {badrpc, {'EXIT', {#{what := check_host_failed,
                                                 domain := <<"no_such_host">>}, _}}},
                     dynamic_modules:ensure_modules(node_spec(europe_node1), <<"localhost">>, Modules)).

%% Sets the node_to_expire field of the Redis nodes hash to "0". After a
%% mapping refresh on europe_node1, that field must be gone from the hash.
refresh_nodes(Config) ->
    HashKey = ?config(nodes_key, Config),
    Field = ?config(node_to_expire, Config),
    redis_query(europe_node1, [<<"HSET">>, HashKey, Field, <<"0">>]),
    refresh_mappings(europe_node1, "by_refresh_nodes"),
    {ok, undefined} = redis_query(europe_node1, [<<"HGET">>, HashKey, Field]).
%% Alice sends 100 numbered chat messages to Eve. Eve must receive them in
%% sending order (this runs in the multi_connection group).
test_in_order_messages_on_multiple_connections(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              Bodies = [integer_to_binary(N) || N <- lists:seq(1, 100)],
              SendOne = fun(Body) ->
                                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, Body))
                        end,
              ExpectOne = fun(Body) ->
                                  Received = escalus_client:wait_for_stanza(Eve, 5000),
                                  escalus:assert(is_chat_message, [Body], Received)
                          end,
              lists:foreach(SendOne, Bodies),
              lists:foreach(ExpectOne, Bodies)
      end).

%% Same as above, but while Alice sends, Eve's mapping on europe_node1 is
%% repeatedly deleted and restored (see send_steps/4). Eve must still get all
%% 100 messages, in order.
test_in_order_messages_on_multiple_connections_with_bounce(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              %% Messages 1..99: some sent with the mapping known, some without it
              send_steps(Alice, Eve, 99, <<"reg1">>),
              %% Message 100 is always sent with the mapping in place
              set_mapping(europe_node1, Eve, <<"reg1">>),
              escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"100">>)),

              %% All stanzas arrive in sending order
              ExpectNth = fun(N) ->
                                  Received = escalus_client:wait_for_stanza(Eve, 5000),
                                  escalus:assert(is_chat_message, [integer_to_binary(N)], Received)
                          end,
              lists:foreach(ExpectNth, lists:seq(1, 100))
      end).
%% With Eve's mapping removed on europe_node1, all 100 messages from Alice end
%% up in bounce storage. Once the mapping is restored, they must be delivered
%% to Eve in sending order.
test_messages_bounced_in_order(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              %% Make sure all messages land in bounce storage
              delete_mapping(europe_node1, Eve),

              Seq = lists:seq(1, 100),
              lists:foreach(
                fun(I) ->
                        Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
                        escalus_client:send(Alice, Stanza)
                end,
                Seq),

              %% The mapping is still absent at this point;
              %% restore it so that bounce eventually succeeds
              ?assertEqual(undefined, get_mapping(europe_node1, Eve)),
              set_mapping(europe_node1, Eve, <<"reg1">>),

              lists:foreach(
                fun(I) ->
                        Stanza = escalus_client:wait_for_stanza(Eve, 5000),
                        escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
                end,
                Seq)
      end).

%% Alice's mapping (<<"localhost.bis">>) is deleted, then Alice messages Eve.
%% Afterwards asia_node must resolve Alice to <<"localhost.bis">> again.
test_update_senders_host(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              AliceJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Alice)]),
              {ok, <<"localhost.bis">>}
                  = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid]),
              ok = rpc(europe_node1, mod_global_distrib_mapping, delete_for_jid, [AliceJid]),
              wait_for_node(asia_node, AliceJid),

              %% TODO: Should prevent Redis refresher from executing for a moment,
              %% as it may collide with this test.

              escalus:send(Alice, escalus_stanza:chat_to(Eve, <<"test_update_senders_host">>)),
              escalus:wait_for_stanza(Eve),

              {ok, <<"localhost.bis">>}
                  = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid])
      end).

%% Waits (up to 10 s, polling every second) until the node configured as Node
%% no longer has a global-distrib mapping for Jid, i.e. until
%% mod_global_distrib_mapping:for_jid/1 returns `error` there.
%% The wait is named after the node and JID, so a timeout report shows what was
%% being waited for.
wait_for_node(Node, Jid) ->
    mongoose_helper:wait_until(fun() -> rpc(Node, mod_global_distrib_mapping, for_jid, [Jid]) end,
                               error,
                               #{time_left => timer:seconds(10),
                                 sleep_time => timer:seconds(1),
                                 name => {wait_for_mapping_removal, Node, Jid}}).
%% Eve (on asia_node) messages a component connected to europe_node1. Before that,
%% her mapping is deleted and europe_node1 is hidden, which forces a rebalance.
%% Both europe nodes must afterwards resolve Eve to <<"reg1">> again.
%% The component is always disconnected at the end (even if the story fails),
%% so that "test_service" does not stay registered for later test cases.
test_update_senders_host_by_ejd_service(Config) ->
    refresh_hosts([europe_node1, europe_node2, asia_node], "by_test_update_senders_host_by_ejd_service"),
    %% Connects to europe_node1
    ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
                       {port, service_port()}, {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),

    try
        escalus:fresh_story(
          Config, [{eve, 1}],
          fun(Eve) ->
                  %% Eve is connected to asia_node
                  EveJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Eve)]),
                  {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
                  {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid]),

                  ok = rpc(asia_node, mod_global_distrib_mapping, delete_for_jid, [EveJid]),
                  wait_for_node(europe_node1, EveJid),
                  wait_for_node(europe_node2, EveJid),

                  %% Component is connected to europe_node1
                  %% but we force asia_node to connect to europe_node2 by hiding europe_node1
                  %% and forcing rebalance (effectively disabling connections to europe_node1)
                  %% to verify routing cache update on both nodes

                  %% TODO: Should prevent Redis refresher from executing for a moment,
                  %% as it may collide with this test.

                  hide_node(europe_node1, Config),
                  {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
                  trigger_rebalance(asia_node, EuropeHost),

                  escalus:send(Eve, escalus_stanza:chat_to(Addr, <<"hi">>)),
                  escalus:wait_for_stanza(Comp),

                  {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
                  {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid])
          end)
    after
        %% Mirrors test_component_unregister: do not leave the component registered
        component_helper:disconnect_component(Comp, Addr)
    end.
%% -------------------------------- Rebalancing --------------------------------

%% An extra receiver endpoint is enabled on asia_node and a rebalance is triggered.
%% europe_node1 must gain exactly one pool and one enabled endpoint towards reg1
%% (the extra one), and lose none.
enable_new_endpoint_on_refresh(Config) ->
    Dest = <<"reg1">>,
    get_connection(europe_node1, Dest),

    {EnabledBefore, _DisabledBefore, PoolsBefore} = get_outgoing_connections(europe_node1, Dest),

    ExtraPort = get_port(reg, gd_extra_endpoint_port),
    ExtraEndpoint = resolved_endpoint(ExtraPort),
    enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),

    {EnabledAfter, _DisabledAfter, PoolsAfter} = get_outgoing_connections(europe_node1, Dest),

    %% Exactly one pool and one enabled endpoint were added, nothing was removed
    [ExtraEndpoint] = PoolsAfter -- PoolsBefore,
    [] = PoolsBefore -- PoolsAfter,
    [ExtraEndpoint] = EnabledAfter -- EnabledBefore,
    [] = EnabledBefore -- EnabledAfter.

%% Starting from two enabled endpoints towards reg1, the extra endpoint is
%% hidden on asia_node and a rebalance follows. The endpoint must move from the
%% enabled list to the disabled list, while both pools stay open.
disable_endpoint_on_refresh(Config) ->
    Dest = <<"reg1">>,
    ExtraPort = get_port(reg, gd_extra_endpoint_port),
    ExtraEndpoint = resolved_endpoint(ExtraPort),
    enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),

    get_connection(europe_node1, Dest),

    {EnabledBefore, DisabledBefore, PoolsBefore} = get_outgoing_connections(europe_node1, Dest),
    [_, _] = EnabledBefore,
    [] = DisabledBefore,

    hide_extra_endpoint(asia_node),
    trigger_rebalance(europe_node1, Dest),

    {EnabledAfter, DisabledAfter, PoolsAfter} = get_outgoing_connections(europe_node1, Dest),

    %% Both pools are still open after the disable
    [] = PoolsBefore -- PoolsAfter,
    [] = PoolsAfter -- PoolsBefore,
    %% The extra endpoint left the enabled set...
    [] = EnabledAfter -- EnabledBefore,
    [ExtraEndpoint] = EnabledBefore -- EnabledAfter,
    %% ...and is now the only disabled one
    [] = DisabledBefore,
    [ExtraEndpoint] = DisabledAfter.
%% While asia_node advertises no endpoints, a get_connection/2 call from
%% europe_node1 must not return. Its result (a pid) must arrive only after
%% asia_node refreshes its mappings and europe_node1 rebalances again.
wait_for_connection(_Config) ->
    set_endpoints(asia_node, []),
    %% Because of hosts refresher, a pool of connections to asia_node
    %% may already be present here; trigger_rebalance/2 is retried until it
    %% no longer raises
    RebalanceOk = fun() ->
                          try
                              trigger_rebalance(europe_node1, <<"reg1">>),
                              true
                          catch
                              _:_ -> false
                          end
                  end,
    mongoose_helper:wait_until(RebalanceOk, true,
                               #{name => rebalance, time_left => timer:seconds(5)}),

    spawn_connection_getter(europe_node1),

    %% Nothing may be delivered while there are no endpoints
    receive
        Early -> error({unexpected, Early})
    after 2000 ->
        ok
    end,

    refresh_mappings(asia_node, "by_wait_for_connection"),
    trigger_rebalance(europe_node1, <<"reg1">>),

    receive
        Conn when is_pid(Conn) -> ok;
        Other -> error({unexpected, Other})
    after 5000 ->
        error(timeout)
    end.

%% A connection towards reg1 becomes disabled (asia_node advertises no
%% endpoints). Once asia_node's receiver is restarted on another port, the
%% connection must disappear from the disabled list too.
closed_connection_is_removed_from_disabled(_Config) ->
    Dest = <<"reg1">>,
    get_connection(europe_node1, Dest),
    set_endpoints(asia_node, []),
    trigger_rebalance(europe_node1, Dest),

    %% No enabled endpoint, one disabled endpoint, one pool
    {[], [_], [_]} = get_outgoing_connections(europe_node1, Dest),

    %% Will drop connections and prevent them from reconnecting
    restart_receiver(asia_node, [get_port(reg, gd_supplementary_endpoint_port)]),

    mongoose_helper:wait_until(fun() -> get_outgoing_connections(europe_node1, Dest) end,
                               {[], [], []},
                               #{name => get_outgoing_connections}).


%%--------------------------------------------------------------------
%% Test helpers
%%--------------------------------------------------------------------

%% Reads port Param of Host from the CT config key {hosts, Host, Param}.
%% Fails the test case when the value is not an integer (e.g. when it is missing).
get_port(Host, Param) ->
    Value = ct:get_config({hosts, Host, Param}),
    case is_integer(Value) of
        true -> Value;
        false -> ct:fail({get_port_failed, Host, Param, Value})
    end.
%% {NodeName, LocalHost, ReceiverPort} for every node used by this suite.
get_hosts() ->
    [
     {europe_node1, <<"localhost.bis">>, get_port(mim, gd_endpoint_port)},
     {europe_node2, <<"localhost.bis">>, get_port(mim2, gd_endpoint_port)},
     {asia_node, <<"reg1">>, get_port(reg, gd_endpoint_port)}
    ].

%% Unresolved receiver endpoint ({"127.0.0.1", Port}) of NodeName.
listen_endpoint(NodeName) ->
    endpoint(listen_port(NodeName)).

%% Receiver port of NodeName, as listed in get_hosts/0.
listen_port(NodeName) ->
    {_, _, Port} = lists:keyfind(NodeName, 1, get_hosts()),
    Port.

resolved_endpoint(Port) when is_integer(Port) ->
    {{127, 0, 0, 1}, Port}.

endpoint(Port) when is_integer(Port) ->
    {"127.0.0.1", Port}.

%% For dynamic_modules
node_spec(NodeName) ->
    #{node => ct:get_config(NodeName), timeout => timer:seconds(30)}.

%% Calls M:F(A) on the node configured under NodeName through
%% mongoose_helper:successful_rpc/5, with a 30 s timeout.
rpc(NodeName, M, F, A) ->
    Node = ct:get_config(NodeName),
    mongoose_helper:successful_rpc(#{node => Node}, M, F, A, timer:seconds(30)).

%% Removes NodeName from the Redis nodes hash (key taken from ?config(nodes_key)),
%% querying through europe_node1. Asserts that exactly one field was deleted.
hide_node(NodeName, Config) ->
    NodesKey = ?config(nodes_key, Config),
    NodeBin = atom_to_binary(ct:get_config(NodeName), latin1),
    {ok, <<"1">>} = redis_query(europe_node1, [<<"HDEL">>, NodesKey, NodeBin]).

%% Starts a client from UserSpec with resource <<"res1">>, sends initial presence,
%% and asserts that the next stanza received is a presence.
connect_from_spec(UserSpec, Config) ->
    {ok, User} = escalus_client:start(Config, UserSpec, <<"res1">>),
    escalus_story:send_initial_presence(User),
    escalus:assert(is_presence, escalus_client:wait_for_stanza(User)),
    User.

%% Chat message to To whose id is erlang:monotonic_time() as a binary,
%% used as a sequence number by order_by_seqnum/1.
chat_with_seqnum(To, Text) ->
    escalus_stanza:set_id(escalus_stanza:chat_to(To, Text),
                          integer_to_binary(erlang:monotonic_time())).

%% Sorts stanzas by the sequence number stored in their id attribute by
%% chat_with_seqnum/2, lowest (oldest) first.
%% Ids are compared as integers. Comparing them as binaries is wrong when the
%% numbers differ in length or sign, and erlang:monotonic_time/0 may be negative.
%% Ids that are missing or not integer binaries are compared as-is; in Erlang
%% term order, integers sort before atoms and binaries.
order_by_seqnum(Stanzas) ->
    lists:sort(fun(A, B) -> stanza_seqnum(A) =< stanza_seqnum(B) end, Stanzas).

%% Integer value of Stanza's id attribute, or the raw attribute value when it is not numeric.
stanza_seqnum(Stanza) ->
    Id = exml_query:attr(Stanza, <<"id">>),
    try
        binary_to_integer(Id)
    catch
        error:badarg -> Id
    end.

%% True when the disco#items-like reply lists exactly one item with jid = Service.
has_exactly_one_service(Service, #xmlel{children = [#xmlel{children = Services}]}) ->
    Pred = fun(Item) ->
                   exml_query:attr(Item, <<"jid">>) =:= Service
           end,
    case lists:filter(Pred, Services) of
        [_] -> true;
        _ -> false
    end.
%% Sends chat messages with bodies <<"1">> .. <<"Max">> from From to To. Every
%% Max div 10 messages (but at least every message), To's mapping on
%% europe_node1 is toggled: deleted if known, set back to ToHost otherwise.
%% The mapping is assumed to be known when sending starts.
send_steps(From, To, Max, ToHost) ->
    next_send_step(From, To, 1, Max, erlang:max(1, Max div 10), true, ToHost).

%% ToReset counts the messages left until the next mapping toggle.
%% The reset interval is clamped to at least 1: with Max < 10, Max div 10 = 0
%% would make the reset clause toggle the mapping forever without sending anything.
next_send_step(_From, _To, I, Max, _ToReset, _KnowsMapping, _ToHost) when I > Max -> ok;
next_send_step(From, To, I, Max, 0, KnowsMapping, ToHost) ->
    ct:log("Reset: I: ~B", [I]),
    case KnowsMapping of
        true -> delete_mapping(europe_node1, To);
        false -> set_mapping(europe_node1, To, ToHost)
    end,
    next_send_step(From, To, I, Max, erlang:max(1, Max div 10), not KnowsMapping, ToHost);
next_send_step(From, To, I, Max, ToReset, KnowsMapping, ToHost) ->
    ct:log("I: ~B ~B ~B", [I, Max, ToReset]),
    Stanza = escalus_stanza:chat_to(To, integer_to_binary(I)),
    escalus_client:send(From, Stanza),
    next_send_step(From, To, I + 1, Max, ToReset - 1, KnowsMapping, ToHost).

%% Value stored in Redis (via Node) under Client's full JID, or undefined if absent.
get_mapping(Node, Client) ->
    {FullJid, _BareJid} = jids(Client),
    {ok, What} = redis_query(Node, [<<"GET">>, FullJid]),
    What.

%% Warning! May not work properly with alice or any other user whose
%% stringprepped JID is different than original one
%% Deletes Client's full and bare JID keys in Redis and clears Node's mapping cache for the JID.
delete_mapping(Node, Client) ->
    {FullJid, BareJid} = jids(Client),
    redis_query(Node, [<<"DEL">>, FullJid, BareJid]),
    Jid = rpc(Node, jid, from_binary, [FullJid]),
    rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).

%% Sets both Client's full and bare JID keys in Redis to Mapping and clears Node's mapping cache.
set_mapping(Node, Client, Mapping) ->
    {FullJid, BareJid} = jids(Client),
    redis_query(Node, [<<"MSET">>, FullJid, Mapping, BareJid, Mapping]),
    Jid = rpc(Node, jid, from_binary, [FullJid]),
    rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).

%% {FullJid, BareJid} of an escalus client.
jids(Client) ->
    FullJid = escalus_client:full_jid(Client),
    BareJid = escalus_client:short_jid(Client),
    {FullJid, BareJid}.
%% Runs Query through a worker of the global_distrib Redis pool on NodeName.
redis_query(Node, Query) ->
    {ok, Worker} = rpc(Node, mongoose_wpool, get_worker, [redis, global, global_distrib]),
    rpc(Node, eredis, q, [Worker, Query]).

%% A fake address we don't try to connect to.
%% Used in test_advertised_endpoints_override_endpoints testcase.
advertised_endpoints() ->
    [{fake_domain(), get_port(reg, gd_endpoint_port)}].

fake_domain() ->
    "somefakedomain.com".

%% Converts {IpTuple, Port} endpoints to {"a.b.c.d", Port}; other entries are kept unchanged.
iptuples_to_string(Endpoints) ->
    [case Endpoint of
         {Addr, Port} when is_tuple(Addr) -> {inet_parse:ntoa(Addr), Port};
         _ -> Endpoint
     end || Endpoint <- Endpoints].

%% Receiver connection options for one 127.0.0.1 endpoint on ReceiverPort.
%% Advertised endpoints default to that endpoint, unless add_advertised_endpoints
%% in Config has an entry for NodeName.
endpoint_opts(NodeName, ReceiverPort, Config) ->
    Endpoints = [endpoint(ReceiverPort)],
    Overrides = ?config(add_advertised_endpoints, Config),
    #{endpoints => Endpoints,
      resolved_endpoints => [resolved_endpoint(ReceiverPort)],
      advertised_endpoints => proplists:get_value(NodeName, Overrides, Endpoints)}.

%% Runs mock_inet/0 on every suite node and asserts that each call returned ok.
mock_inet_on_each_node() ->
    Nodes = [ct:get_config(NodeName) || {NodeName, _, _} <- get_hosts()],
    Results = [rpc:block_call(Node, ?MODULE, mock_inet, []) || Node <- Nodes],
    true = lists:all(fun(Result) -> Result =:= ok end, Results).

%% M:F(A) on every suite node; returns the results in get_hosts/0 order.
execute_on_each_node(M, F, A) ->
    [rpc(NodeName, M, F, A) || {NodeName, _, _} <- get_hosts()].

mock_inet() ->
    %% We don't want to mock inet module itself to avoid strange networking issues
    FakeGetaddrs = fun(_, inet) -> {ok, [{127, 0, 0, 1}]};
                      (_, inet6) -> {error, "No ipv6 address"}
                   end,
    meck:new(mod_global_distrib_utils, [non_strict, passthrough, unstick]),
    meck:expect(mod_global_distrib_utils, getaddrs, FakeGetaddrs).

unmock_inet(_Pids) ->
    execute_on_each_node(meck, unload, [mod_global_distrib_utils]).
%% Children of mod_global_distrib_outgoing_conns_sup on Node, except the hosts refresher.
out_connection_sups(Node) ->
    Children = rpc(Node, supervisor, which_children, [mod_global_distrib_outgoing_conns_sup]),
    lists:filter(fun({Sup, _, _, _}) -> Sup =/= mod_global_distrib_hosts_refresher end, Children).

%% True when asia_node, europe_node1 and europe_node2 each have at least one such child.
trees_for_connections_present() ->
    AsiaChildren = out_connection_sups(asia_node),
    Europe1Children = out_connection_sups(europe_node1),
    Europe2Children = out_connection_sups(europe_node2),
    %% Emptiness check by comparison instead of length/1
    lists:all(fun(Children) -> Children =/= [] end,
              [AsiaChildren, Europe1Children, Europe2Children]).

%% True when Node has a child with id ExpectedSup under the outgoing conns supervisor.
tree_for_sup_present(Node, ExpectedSup) ->
    Children = out_connection_sups(Node),
    lists:keyfind(ExpectedSup, 1, Children) =/= false.


%% ------------------------------- rebalancing helpers -----------------------------------

%% Asks SenderNode for a connection to <<"reg1">> in a separate process and sends
%% the result to the caller. The process is linked to the caller, so it cannot
%% outlive a failed test case, and an rpc failure in it fails the test right
%% away instead of only showing up as a later receive timeout.
spawn_connection_getter(SenderNode) ->
    TestPid = self(),
    spawn_link(fun() ->
                       Conn = get_connection(SenderNode, <<"reg1">>),
                       TestPid ! Conn
               end).

%% Restarts ListenNode's receiver on Port plus its regular port, advertises both
%% endpoints, then rebalances SenderNode's connections to <<"reg1">>.
enable_extra_endpoint(ListenNode, SenderNode, Port, _Config) ->
    restart_receiver(ListenNode, [Port, listen_port(ListenNode)]),
    set_endpoints(ListenNode, [Port, listen_port(ListenNode)]),
    trigger_rebalance(SenderNode, <<"reg1">>).

get_connection(SenderNode, ToDomain) ->
    rpc(SenderNode, mod_global_distrib_outgoing_conns_sup, get_connection, [ToDomain]).

%% Advertises only ListenNode's regular port again.
hide_extra_endpoint(ListenNode) ->
    set_endpoints(ListenNode, [listen_port(ListenNode)]).

%% Stores 127.0.0.1 endpoints for Ports in Redis as ListenNode's endpoints.
set_endpoints(ListenNode, Ports) ->
    Endpoints = [endpoint(Port) || Port <- Ports],
    {ok, _} = rpc(ListenNode, mod_global_distrib_mapping_redis, set_endpoints, [Endpoints]).
%% {EnabledEndpoints, DisabledEndpoints, PoolIds} for NodeName's outgoing
%% connections to DestinationDomain. PoolIds are the ids of the per-domain
%% supervisor's children, excluding the manager.
get_outgoing_connections(NodeName, DestinationDomain) ->
    Sup = rpc(NodeName, mod_global_distrib_utils, server_to_sup_name, [DestinationDomain]),
    Mgr = rpc(NodeName, mod_global_distrib_utils, server_to_mgr_name, [DestinationDomain]),
    Enabled = rpc(NodeName, mod_global_distrib_server_mgr,
                  get_enabled_endpoints, [DestinationDomain]),
    Disabled = rpc(NodeName, mod_global_distrib_server_mgr,
                   get_disabled_endpoints, [DestinationDomain]),
    Children = rpc(NodeName, supervisor, which_children, [Sup]),
    Pools = lists:filtermap(fun({Id, _Child, _Type, _Modules}) when Id /= Mgr -> {true, Id};
                               (_) -> false
                            end,
                            Children),
    {Enabled, Disabled, Pools}.

%% Restarts NodeName's receiver on its regular listen port only.
restart_receiver(NodeName) ->
    restart_receiver(NodeName, [listen_port(NodeName)]).

%% Restarts mod_global_distrib_receiver on NodeName (host <<"localhost">>) with
%% endpoints and resolved_endpoints set to 127.0.0.1:NewPorts. All other module
%% options are kept.
restart_receiver(NodeName, NewPorts) ->
    #{connections := ConnOpts} = CurrentOpts =
        rpc(NodeName, gen_mod, get_module_opts, [<<"localhost">>, mod_global_distrib_receiver]),
    UpdatedConnOpts = ConnOpts#{endpoints := [endpoint(P) || P <- NewPorts],
                                resolved_endpoints := [resolved_endpoint(P) || P <- NewPorts]},
    UpdatedOpts = CurrentOpts#{connections := UpdatedConnOpts},
    dynamic_modules:restart(node_spec(NodeName), <<"localhost">>, mod_global_distrib_receiver,
                            UpdatedOpts).

%% Forces NodeName's server manager for DestinationDomain to refresh its
%% endpoints, logs the manager state, then sleeps for one second.
trigger_rebalance(NodeName, DestinationDomain) when is_binary(DestinationDomain) ->
    %% The manager has to exist first, otherwise force_refresh may fail with noproc
    ok = rpc(NodeName, mod_global_distrib_outgoing_conns_sup,
             ensure_server_started, [DestinationDomain]),
    rpc(NodeName, mod_global_distrib_server_mgr, force_refresh, [DestinationDomain]),
    MgrState = rpc(NodeName, mod_global_distrib_server_mgr, get_state_info, [DestinationDomain]),
    ct:log("mgr_state_info_after_rebalance nodename=~p state_info=~p", [NodeName, MgrState]),
    timer:sleep(1000).
%% -----------------------------------------------------------------------
%% Escalus-related helpers

%% Receive messages with Bodies in any order, skipping presences from stream resumption.
%% Exactly length(Bodies) non-presence stanzas are received; each chat body in
%% Bodies must be matched by one of them.
user_receives(User, Bodies) ->
    SkipPresences = fun(Stanza) -> not escalus_pred:is_presence(Stanza) end,
    Checks = [fun(Stanza) -> escalus_pred:is_chat_message(Body, Stanza) end || Body <- Bodies],
    Received = [escalus_connection:receive_stanza(User, #{pred => SkipPresences}) || _ <- Bodies],
    escalus:assert_many(Checks, Received).

%% -----------------------------------------------------------------------
%% Refreshing helpers

%% Calls mod_global_distrib_mapping_redis:refresh/1 on NodeName
%% (asia_node, europe_node2, ... in the format used by this suite).
%% Reason must be a string.
refresh_mappings(NodeName, Reason) when is_list(Reason) ->
    rpc(NodeName, mod_global_distrib_mapping_redis, refresh, [Reason]).

%% Refreshes NodeNames sequentially in the given order; returns the results as a list.
refresh_hosts(NodeNames, Reason) ->
    lists:map(fun(NodeName) -> refresh_mappings(NodeName, Reason) end, NodeNames).


%% -----------------------------------------------------------------------
%% Other helpers

%% Escalus connection steps for a session with stream management (stream_resumption).
connect_steps_with_sm() ->
    [start_stream, stream_features, maybe_use_ssl,
     authenticate, bind, session, stream_resumption].

%% Copy of Client with its jid replaced by the short (bare) JID.
bare_client(Client) ->
    BareJid = escalus_utils:get_short_jid(Client),
    Client#client{jid = BareJid}.

%% Component service port of the mim host, from the CT config.
service_port() ->
    ct:get_config({hosts, mim, service_port}).


%% -----------------------------------------------------------------------
%% Waiting helpers

%% Waits until mod_global_distrib_mapping:all_domains/0 on Node includes Domain.
%% Node is passed directly to rpc:call/4.
wait_for_domain(Node, Domain) ->
    DomainKnown = fun() ->
                          KnownDomains = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
                          lists:member(Domain, KnownDomains)
                  end,
    mongoose_helper:wait_until(DomainKnown, true, #{name => {wait_for_domain, Node, Domain}}).
%% -----------------------------------------------------------------------
%% Ensure that endpoints are up

%% Waits until every receiver port listed in get_hosts/0 accepts TCP connections on 127.0.0.1.
wait_for_listeners_to_appear() ->
    [wait_for_can_connect_to_port(Port) || Port <- receiver_ports(get_hosts())].

receiver_ports(Hosts) ->
    lists:map(fun({_NodeName, _LocalHost, ReceiverPort}) -> ReceiverPort end, Hosts).

%% Polls can_connect_to_port/1 every second, for up to 30 s.
wait_for_can_connect_to_port(Port) ->
    Opts = #{time_left => timer:seconds(30), sleep_time => 1000, name => {can_connect_to_port, Port}},
    mongoose_helper:wait_until(fun() -> can_connect_to_port(Port) end, true, Opts).

%% True if a TCP connection to 127.0.0.1:Port can be opened (it is closed right away).
%% gen_tcp:connect/3 waits with an infinite timeout, and a hanging attempt would
%% block wait_until/3 forever, so each attempt is bounded.
can_connect_to_port(Port) ->
    ConnectTimeout = timer:seconds(1),
    case gen_tcp:connect("127.0.0.1", Port, [], ConnectTimeout) of
        {ok, Sock} ->
            gen_tcp:close(Sock),
            true;
        Other ->
            ct:pal("can_connect_to_port port=~p result=~p", [Port, Other]),
            false
    end.

%% -----------------------------------------------------------------------
%% Custom log levels for GD modules during the tests

enable_logging() ->
    mim_loglevel:enable_logging(test_hosts(), custom_loglevels()).

disable_logging() ->
    mim_loglevel:disable_logging(test_hosts(), custom_loglevels()).
%% {Module, Level} pairs passed to mim_loglevel by enable_logging/0 and disable_logging/0.
custom_loglevels() ->
    [
     %% debugging "s2s connection to muc.localhost not found"
     {ejabberd_s2s, debug},
     %% shows event=refreshing_own_data_done
     {mod_global_distrib_mapping_redis, info},
     %% event=outgoing_conn_start_progress: is a connection already started, or about to be
     {mod_global_distrib_outgoing_conns_sup, info},
     %% bound connection issues
     {mod_global_distrib, debug},
     %% pids of all new connections
     {mod_global_distrib_connection, debug},
     %% whether gc or refresh gets triggered
     {mod_global_distrib_server_mgr, info},
     %% Uncomment to debug incoming connections:
     %% {mod_global_distrib_receiver, info},
     %% global session set/delete
     {mod_global_distrib_mapping, debug}
    ].

%% CT host keys the custom log levels are applied to.
test_hosts() ->
    [mim, mim2, reg].