%%==============================================================================
%% Copyright 2017 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

-module(mod_global_distrib_SUITE).

-compile([export_all, nowarn_export_all]).

-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("exml/include/exml.hrl").

-define(HOSTS_REFRESH_INTERVAL, 200). %% in ms

-import(domain_helper, [domain/0]).
-import(config_parser_helper, [config/2, mod_config/2]).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Common Test callback: every test group of this suite, in execution order.
all() ->
    GroupNames = [mod_global_distrib,
                  cluster_restart,
                  start_checks,
                  invalidation,
                  multi_connection,
                  rebalancing,
                  advertised_endpoints,
                  hosts_refresher],
    [{group, GroupName} || GroupName <- GroupNames].
%% Common Test callback: test cases of each group.
%% All groups run with an empty property list.
groups() ->
    CasesPerGroup =
        [{mod_global_distrib,
          [test_pm_between_users_at_different_locations,
           test_pm_between_users_before_available_presence,
           test_component_disconnect,
           test_component_on_one_host,
           test_components_in_different_regions,
           test_hidden_component_disco_in_different_region,
           test_pm_with_disconnection_on_other_server,
           test_pm_with_graceful_reconnection_to_different_server,
           test_pm_with_ungraceful_reconnection_to_different_server,
           test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first,
           test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first,
           test_component_unregister,
           test_update_senders_host,
           test_update_senders_host_by_ejd_service,

           %% with node 2 disabled
           test_muc_conversation_on_one_host,
           test_global_disco]},
         {hosts_refresher,
          [test_host_refreshing]},
         {cluster_restart,
          [test_location_disconnect]},
         {start_checks,
          [test_error_on_wrong_hosts]},
         {invalidation,
          [%% TODO: Add checks for other mapping refreshes
           refresh_nodes]},
         {multi_connection,
          [test_in_order_messages_on_multiple_connections,
           test_in_order_messages_on_multiple_connections_with_bounce,
           test_messages_bounced_in_order,

           %% with node 2 disabled
           test_muc_conversation_history]},
         {rebalancing,
          [enable_new_endpoint_on_refresh,
           disable_endpoint_on_refresh,
           wait_for_connection,
           closed_connection_is_removed_from_disabled]},
         {advertised_endpoints,
          [test_advertised_endpoints_override_endpoints,
           test_pm_between_users_at_different_locations]}],
    [{GroupName, [], Cases} || {GroupName, Cases} <- CasesPerGroup].
%% Common Test callback: configuration entries required by this suite,
%% followed by whatever escalus:suite/0 contributes.
suite() ->
    Required = [{require, europe_node1, {hosts, mim, node}},
                {require, europe_node2, {hosts, mim2, node}},
                {require, asia_node, {hosts, reg, node}},
                {require, c2s_port, {hosts, mim, c2s_port}}],
    Required ++ escalus:suite().

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Skips the whole suite unless a worker of the global_distrib Redis pool can
%% be obtained on both europe_node1 and asia_node. Otherwise europe_node2 joins
%% europe_node1's cluster and escalus is initialised with empty defaults for
%% `add_advertised_endpoints' and `extra_config'.
init_per_suite(Config) ->
    PoolArgs = [redis, global, global_distrib],
    EuropeWorker = rpc(europe_node1, mongoose_wpool, get_worker, PoolArgs),
    AsiaWorker = rpc(asia_node, mongoose_wpool, get_worker, PoolArgs),
    case {EuropeWorker, AsiaWorker} of
        {{ok, _}, {ok, _}} ->
            ok = rpc(europe_node2, mongoose_cluster, join, [ct:get_config(europe_node1)]),

            enable_logging(),
            Defaults = [{add_advertised_endpoints, []}, {extra_config, #{}}],
            escalus:init_per_suite(Defaults ++ Config);
        Unavailable ->
            ct:pal("Redis check result: ~p", [Unavailable]),
            {skip, "GD Redis default pool not available"}
    end.

%% Reverts init_per_suite/1: logging off, fresh users removed,
%% europe_node2 leaves the cluster, escalus torn down.
end_per_suite(Config) ->
    disable_logging(),
    escalus_fresh:clean(),
    rpc(europe_node2, mongoose_cluster, leave, []),
    escalus:end_per_suite(Config).
%% Common Test callback: per-group setup.
%% Apart from `start_checks', every group ends up in init_per_group_generic/1,
%% optionally after putting group-specific `extra_config' or
%% `add_advertised_endpoints' entries in front of Config.
init_per_group(start_checks, Config) ->
    Spec = node_spec(europe_node1),
    Saved = dynamic_modules:save_modules(Spec, [domain()], Config),
    dynamic_modules:ensure_modules(Spec, domain(), [{mod_global_distrib, stopped}]),
    Saved;
init_per_group(multi_connection, Config) ->
    Bounce = #{resend_after_ms => 20000},
    Connections = #{%% Disable unused feature to avoid interference
                    connections_per_endpoint => 100,
                    disabled_gc_interval => 10000},
    Extra = #{bounce => Bounce, connections => Connections},
    init_per_group_generic([{extra_config, Extra} | Config]);
init_per_group(invalidation, Config) ->
    %% Generic setup plus the name of the fake node used by refresh_nodes/1
    Initialised = init_per_group_generic(Config),
    [{node_to_expire, <<"fake_node@localhost">>} | Initialised];
init_per_group(rebalancing, Config) ->
    %% We need to prevent automatic refreshes, because they may interfere with tests
    %% and we need early disabled garbage collection to check its validity
    Connections = #{endpoint_refresh_interval => 3600,
                    endpoint_refresh_interval_when_empty => 3600,
                    disabled_gc_interval => 1},
    Redis = #{refresh_after => 3600},
    Extra = #{connections => Connections, redis => Redis},
    init_per_group_generic([{extra_config, Extra} | Config]);
init_per_group(advertised_endpoints, Config) ->
    %% Load this suite module on every host node before mocking inet there
    InjectSuite = fun({NodeName, _, _}) ->
                          Node = ct:get_config(NodeName),
                          mongoose_helper:inject_module(#{node => Node}, ?MODULE, reload)
                  end,
    lists:foreach(InjectSuite, get_hosts()),
    mock_inet_on_each_node(),
    Advertised = [{asia_node, advertised_endpoints()}],
    init_per_group_generic([{add_advertised_endpoints, Advertised} | Config]);
init_per_group(mod_global_distrib, Config) ->
    %% Disable mod_global_distrib_mapping_redis refresher
    Extra = #{redis => #{refresh_after => 3600}},
    init_per_group_generic([{extra_config, Extra} | Config]);
init_per_group(_, Config) ->
    init_per_group_generic(Config).
%% Shared group setup: configures the modules on every host, waits for the
%% listeners and extends Config with the Redis nodes key and the escalus user db.
init_per_group_generic(Config0) ->
    ConfigWithModules = lists:foldl(fun init_modules_per_node/2, Config0, get_hosts()),
    wait_for_listeners_to_appear(),
    {AnyNode, _, _} = hd(get_hosts()),
    NodesKey = rpc(AnyNode, mod_global_distrib_mapping_redis, nodes_key, []),
    [{nodes_key, NodesKey}, {escalus_user_db, xmpp} | ConfigWithModules].

%% Fold step of init_per_group_generic/1 for a single host.
%% Saves the currently running modules, stops the ones listed below and starts
%% mod_global_distrib (plus mod_stream_management) with options derived from the
%% group's `extra_config'. Returns the Config produced by save_modules/3.
init_modules_per_node({NodeName, LocalHost, ReceiverPort}, Config0) ->
    GroupOpts = module_opts(?config(extra_config, Config0)),
    EndpointOpts = endpoint_opts(NodeName, ReceiverPort, Config0),
    %% Options from the group's extra_config take precedence over endpoint options
    Connections = maps:merge(EndpointOpts, maps:get(connections, GroupOpts, #{})),
    GdOpts = module_opts(GroupOpts#{local_host => LocalHost, connections => Connections}),

    Hosts = virtual_hosts(NodeName),
    Spec = node_spec(NodeName),
    Saved = dynamic_modules:save_modules(Spec, Hosts, Config0),

    %% To reduce load when sending many messages
    Unneeded = [mod_offline, mod_blocking, mod_privacy, mod_roster, mod_last,
                mod_stream_management],
    lists:foreach(fun(Host) -> dynamic_modules:ensure_stopped(Spec, Host, Unneeded) end,
                  Hosts),

    SMOpts = config_parser_helper:mod_config(mod_stream_management, #{resume_timeout => 1}),
    ToStart = [{mod_global_distrib, GdOpts}, {mod_stream_management, SMOpts}],
    dynamic_modules:ensure_modules(Spec, domain(), ToStart),
    Saved.

%% Runs ExtraOpts through every set_opts/2 step, in the order listed.
module_opts(ExtraOpts) ->
    Steps = [common, defaults, connections, redis, bounce],
    lists:foldl(fun set_opts/2, ExtraOpts, Steps).
%% One step of module_opts/1.
%% `common' supplies suite-wide values (explicit options win), `defaults' runs
%% the options through mod_config/2, and `connections' / `redis' / `bounce'
%% expand the respective section with config/2 when that section is present.
%% Any other combination leaves the options untouched.
set_opts(common, Opts) ->
    Common = #{global_host => <<"localhost">>,
               hosts_refresh_interval => ?HOSTS_REFRESH_INTERVAL},
    maps:merge(Common, Opts);
set_opts(defaults, Opts) ->
    mod_config(mod_global_distrib, Opts);
set_opts(connections, #{connections := ConnExtra} = Opts) ->
    TLSOpts = config([modules, mod_global_distrib, connections, tls],
                     #{certfile => "priv/ssl/fake_server.pem",
                       cacertfile => "priv/ssl/ca/cacert.pem"}),
    %% Explicit connection options override the TLS section built above
    ConnOpts = maps:merge(#{tls => TLSOpts}, ConnExtra),
    Opts#{connections := config([modules, mod_global_distrib, connections], ConnOpts)};
set_opts(redis, #{redis := RedisExtra} = Opts) ->
    Opts#{redis := config([modules, mod_global_distrib, redis], RedisExtra)};
set_opts(bounce, #{bounce := BounceExtra} = Opts) ->
    Opts#{bounce := config([modules, mod_global_distrib, bounce], BounceExtra)};
set_opts(_, Opts) ->
    Opts.

%% Common Test callback: per-group teardown.
end_per_group(advertised_endpoints, Config) ->
    unmock_inet(?config(meck_handlers, Config)),
    escalus_fresh:clean(),
    end_per_group_generic(Config);
end_per_group(start_checks, Config) ->
    escalus_fresh:clean(),
    Config;
end_per_group(invalidation, Config) ->
    %% Remove the fake node entry written by refresh_nodes/1
    HDel = [<<"HDEL">>, ?config(nodes_key, Config), ?config(node_to_expire, Config)],
    redis_query(europe_node1, HDel),
    end_per_group_generic(Config);
end_per_group(_, Config) ->
    end_per_group_generic(Config).

%% Restores the modules saved during group setup, allowing up to 30 seconds.
end_per_group_generic(Config) ->
    dynamic_modules:restore_modules(#{timeout => timer:seconds(30)}, Config).

%% Common Test callback: per-testcase setup.
init_per_testcase(CaseName, Config)
  when CaseName =:= test_muc_conversation_on_one_host orelse
       CaseName =:= test_global_disco orelse
       CaseName =:= test_muc_conversation_history ->
    %% There is no helper to load MUC on node2
    %% For now it's easier to hide node2
    %% TODO: Do it right at some point!
    hide_node(europe_node2, Config),
    %% There would be no new connections to europe_node2, but there can be some old ones.
    %% We need to disconnect previous connections.
    {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
    trigger_rebalance(asia_node, EuropeHost),
    %% Load muc on mim node
    muc_helper:load_muc(),
    %% Wait for muc.localhost to become visible from reg node
    RegNode = ct:get_config({hosts, reg, node}),
    wait_for_domain(RegNode, muc_helper:muc_host()),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config)
  when CaseName =:= test_pm_with_graceful_reconnection_to_different_server orelse
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server orelse
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first orelse
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
    escalus:init_per_testcase(CaseName, init_user_eve(Config));
init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).

%% Creates Eve in both clusters and stores her two user specs in Config:
%% `evespec_reg' is the fresh spec, `evespec_mim' is the same spec with the
%% port replaced by the mim c2s port.
init_user_eve(Config) ->
    %% Register Eve in reg cluster
    RegSpec = escalus_fresh:create_fresh_user(Config, eve),
    MimPort = ct:get_config({hosts, mim, c2s_port}),
    MimSpec = lists:keystore(port, 1, RegSpec, {port, MimPort}),
    %% Register Eve in mim cluster
    escalus:create_users(Config, [{eve, MimSpec}]),
    [{evespec_reg, RegSpec}, {evespec_mim, MimSpec} | Config].

%% Common Test callback: per-testcase teardown.
%% Every clause finishes with generic_end_per_testcase/2.
end_per_testcase(CaseName, Config)
  when CaseName =:= test_pm_with_graceful_reconnection_to_different_server orelse
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server orelse
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first orelse
       CaseName =:= test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
    MimEveSpec = ?config(evespec_mim, Config),
    %% Clean Eve from reg cluster
    escalus_fresh:clean(),
    %% Clean Eve from mim cluster
    %% For shared databases (i.e. mysql, pgsql...),
    %% removing from one cluster would remove from all clusters.
    %% For mnesia auth backend we need to call removal from each cluster.
    %% The removal is therefore best-effort: a failure is logged and ignored
    %% instead of being silently discarded.
    try
        escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}])
    catch
        Class:Reason ->
            ct:log("Ignoring failed removal of Eve from mim cluster: ~p:~p",
                   [Class, Reason])
    end,
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config)
  when CaseName =:= test_muc_conversation_on_one_host orelse
       CaseName =:= test_global_disco orelse
       CaseName =:= test_muc_conversation_history ->
    refresh_mappings(europe_node2, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
    muc_helper:unload_muc(),
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(test_update_senders_host_by_ejd_service = CaseName, Config) ->
    refresh_mappings(europe_node1, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config)
  when CaseName =:= enable_new_endpoint_on_refresh orelse
       CaseName =:= disable_endpoint_on_refresh orelse
       CaseName =:= wait_for_connection orelse
       CaseName =:= closed_connection_is_removed_from_disabled ->
    restart_receiver(asia_node),
    refresh_mappings(asia_node, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config) ->
    generic_end_per_testcase(CaseName, Config).

%% Terminates all outgoing connection children on every host so that only the
%% hosts refresher remains under the outgoing connections supervisor.
%% The refresher is paused for the duration of the cleanup.
generic_end_per_testcase(CaseName, Config) ->
    StopChild = fun(SupRef) ->
                        fun({mod_global_distrib_hosts_refresher, _, _, _}) ->
                                skip;
                           ({ChildId, _, _, _}) ->
                                supervisor:terminate_child(SupRef, ChildId)
                        end
                end,
    CleanNode =
        fun({NodeName, _, _}) ->
                %% TODO: Enable refresher only for specific test cases,
                %% as some of them are based on assumption that node(s)
                %% must open new connections during tests.
                pause_refresher(NodeName, CaseName),
                Node = ct:get_config(NodeName),
                SupRef = {mod_global_distrib_outgoing_conns_sup, Node},
                try
                    Children = supervisor:which_children(SupRef),
                    lists:foreach(StopChild(SupRef), Children),
                    %% Only the refresher may be left afterwards
                    [{mod_global_distrib_hosts_refresher, _, worker, _Modules}] =
                        supervisor:which_children(SupRef)
                catch
                    _:{noproc, _} ->
                        ct:pal("Sender supervisor not found in ~p", [NodeName])
                end,
                unpause_refresher(NodeName, CaseName)
        end,
    lists:foreach(CleanNode, get_hosts()),
    escalus:end_per_testcase(CaseName, Config).

%% Host types handled on the given node: asia_node serves only the main domain,
%% every other node also serves the secondary domain.
virtual_hosts(asia_node) ->
    [domain()];
virtual_hosts(_) ->
    [domain(), secondary_domain()].

%% Secondary domain of the mim host, as configured for Common Test.
secondary_domain() ->
    ct:get_config({hosts, mim, secondary_domain}).

%% Refresher is not started at all or stopped for some test cases
-spec pause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
pause_refresher(_, test_error_on_wrong_hosts) ->
    ok;
pause_refresher(asia_node, test_location_disconnect) ->
    ok;
pause_refresher(NodeName, _) ->
    ok = rpc(NodeName, mod_global_distrib_hosts_refresher, pause, []).

%% Counterpart of pause_refresher/2, with the same exceptions.
-spec unpause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
unpause_refresher(_, test_error_on_wrong_hosts) ->
    ok;
unpause_refresher(asia_node, test_location_disconnect) ->
    ok;
unpause_refresher(NodeName, _) ->
    ok = rpc(NodeName, mod_global_distrib_hosts_refresher, unpause, []).

%%--------------------------------------------------------------------
%% Service discovery test
%%--------------------------------------------------------------------

%% Requires module mod_global_distrib to be started with argument advertised_endpoints
%% for each host in get_hosts().
%% Reads Redis to confirm that endpoints (in Redis) are overwritten
%% with `advertised_endpoints` option value
test_advertised_endpoints_override_endpoints(_Config) ->
    EndpointsPerNode = execute_on_each_node(mod_global_distrib_mapping_redis, get_endpoints,
                                            [<<"reg1">>]),
    %% Order of endpoints is irrelevant, hence the sorting on both sides
    MatchesAdvertised = fun(Endpoints) ->
                                lists:sort(Endpoints) =:= lists:sort(advertised_endpoints())
                        end,
    true = lists:all(MatchesAdvertised, EndpointsPerNode).

%% @doc Verifies that hosts refresher will restart the outgoing connection pool if
%% it goes down for some reason (crash or domain unavailability).
%% Also actually verifies that refresher properly reads host list
%% from backend and starts appropriate pool.
test_host_refreshing(_Config) ->
    mongoose_helper:wait_until(fun trees_for_connections_present/0, true,
                               #{name => trees_for_connections_present,
                                 time_left => timer:seconds(10)}),
    AsiaOutgoingSups = out_connection_sups(asia_node),
    {europe_node1, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
    SupName = rpc(asia_node, mod_global_distrib_utils, server_to_sup_name, [EuropeHost]),
    {_, SupPid, supervisor, _} = lists:keyfind(SupName, 1, AsiaOutgoingSups),
    erlang:exit(SupPid, kill), % it's ok to kill temporary process
    %% The supervision tree for Europe has to show up again on the Asia node
    SupIsBack = fun() -> tree_for_sup_present(asia_node, SupName) end,
    mongoose_helper:wait_until(SupIsBack, true, #{name => tree_for_sup_present}).

%% When run in mod_global_distrib group - tests simple case of connection
%% between two users connected to different clusters.
%% When run in advertised_endpoints group it tests whether it is possible
%% to connect to a node that is advertising itself with a domain name.
test_pm_between_users_at_different_locations(Config) ->
    Users = [{alice, 1}, {eve, 1}],
    escalus:fresh_story(Config, Users, fun test_two_way_pm/2).
%% Same exchange as test_two_way_pm/2, but the clients are started directly
%% with escalus_client:start/3 instead of through a story.
test_pm_between_users_before_available_presence(Config) ->
    UsersConfig = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
    {ok, Alice} = escalus_client:start(UsersConfig, alice, <<"res1">>),
    {ok, Eve} = escalus_client:start(UsersConfig, eve, <<"res1">>),

    test_two_way_pm(Alice, Eve),

    escalus_client:stop(UsersConfig, Alice),
    escalus_client:stop(UsersConfig, Eve).

%% Alice and Eve send each other one chat message; each must receive the
%% other's message, addressed from and to the expected full JIDs.
test_two_way_pm(Alice, Eve) ->
    ToEve = <<"Hi to Eve from Europe1!">>,
    ToAlice = <<"Hi to Alice from Asia!">>,

    escalus_client:send(Alice, escalus_stanza:chat_to(Eve, ToEve)),
    escalus_client:send(Eve, escalus_stanza:chat_to(Alice, ToAlice)),

    EveGot = escalus_client:wait_for_stanza(Eve, timer:seconds(15)),
    AliceGot = escalus_client:wait_for_stanza(Alice, timer:seconds(15)),

    AliceJid = escalus_client:full_jid(Alice),
    EveJid = escalus_client:full_jid(Eve),

    escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, ToEve], EveGot),
    escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, ToAlice], AliceGot).
%% Alice (room owner) and Eve join a fresh room and exchange one groupchat
%% message in each direction; both occupants must see both messages.
test_muc_conversation_on_one_host(Config0) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
    Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
    Story =
        fun(Eve) ->
                Alice = connect_from_spec(AliceSpec, Config),

                Room = ?config(room, Config),
                AliceNick = escalus_utils:get_username(Alice),
                EveNick = escalus_utils:get_username(Eve),
                RoomAddr = muc_helper:room_address(Room),

                %% Alice enters first and gets her own presence and the subject
                escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, AliceNick)),
                wait_for_muc_presence(Alice, Room, AliceNick),
                wait_for_subject(Alice),

                %% Eve enters: she sees Alice and herself, Alice sees Eve
                escalus:send(Eve, muc_helper:stanza_muc_enter_room(Room, EveNick)),
                wait_for_muc_presence(Eve, Room, AliceNick),
                wait_for_muc_presence(Eve, Room, EveNick),
                wait_for_muc_presence(Alice, Room, EveNick),
                wait_for_subject(Eve),

                FromAlice = <<"Hi, Eve!">>,
                escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, FromAlice)),
                escalus:assert(is_groupchat_message, [FromAlice], escalus:wait_for_stanza(Alice)),
                escalus:assert(is_groupchat_message, [FromAlice], escalus:wait_for_stanza(Eve)),

                FromEve = <<"Hi, Alice!">>,
                escalus:send(Eve, escalus_stanza:groupchat_to(RoomAddr, FromEve)),
                escalus:assert(is_groupchat_message, [FromEve], escalus:wait_for_stanza(Eve)),
                escalus:assert(is_groupchat_message, [FromEve], escalus:wait_for_stanza(Alice))
        end,
    escalus:fresh_story(Config, [{eve, 1}], Story),
    muc_helper:destroy_room(Config).
%% Alice posts three messages to a fresh room before Eve joins; Eve must then
%% receive those three messages followed by the room subject.
test_muc_conversation_history(Config0) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
    Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
    MessageCount = 3,
    Story =
        fun(Eve) ->
                Alice = connect_from_spec(AliceSpec, Config),

                Room = ?config(room, Config),
                AliceNick = escalus_utils:get_username(Alice),
                RoomAddr = muc_helper:room_address(Room),

                escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, AliceNick)),
                wait_for_muc_presence(Alice, Room, AliceNick),
                wait_for_subject(Alice),

                send_n_muc_messages(Alice, RoomAddr, MessageCount),

                %% Ensure that the messages are received by the room
                %% before trying to login Eve.
                %% Otherwise, Eve would receive some messages from history and
                %% some as regular groupchat messages.
                receive_n_muc_messages(Alice, MessageCount),

                EveNick = escalus_utils:get_username(Eve),
                escalus:send(Eve, muc_helper:stanza_muc_enter_room(Room, EveNick)),

                wait_for_muc_presence(Eve, Room, AliceNick),
                wait_for_muc_presence(Eve, Room, EveNick),
                wait_for_muc_presence(Alice, Room, EveNick),

                %% XEP-0045: After sending the presence broadcast (and only after doing so),
                %% the service MAY then send discussion history, the room subject,
                %% live messages, presence updates, and other in-room traffic.
                receive_n_muc_messages(Eve, MessageCount),
                wait_for_subject(Eve)
        end,
    escalus:fresh_story(Config, [{eve, 1}], Story),
    muc_helper:destroy_room(Config).

%% Expects the next stanza of User to be a presence sent from the room
%% occupant address of FromNickname.
wait_for_muc_presence(User, RoomJid, FromNickname) ->
    OccupantAddr = muc_helper:room_address(RoomJid, FromNickname),
    Stanza = escalus:wait_for_stanza(User),
    escalus:assert(is_presence, Stanza),
    escalus:assert(is_stanza_from, [OccupantAddr], Stanza),
    ok.
%% Expects the next stanza of User to be a groupchat message
%% carrying a <subject/> child element.
wait_for_subject(User) ->
    Stanza = escalus:wait_for_stanza(User),
    escalus:assert(is_groupchat_message, Stanza),
    SubjectEl = exml_query:subelement(Stanza, <<"subject">>),
    ?assertNotEqual(undefined, SubjectEl),
    ok.

%% Sends groupchat messages "test-1" .. "test-N" from User to the room.
send_n_muc_messages(User, RoomAddr, N) ->
    SendOne = fun(Seq) ->
                      Body = <<"test-", (integer_to_binary(Seq))/binary>>,
                      escalus:send(User, escalus_stanza:groupchat_to(RoomAddr, Body))
              end,
    lists:foreach(SendOne, lists:seq(1, N)).

%% Expects User to receive groupchat messages "test-1" .. "test-N", in that order.
receive_n_muc_messages(User, N) ->
    ReceiveOne = fun(Seq) ->
                         Expected = <<"test-", (integer_to_binary(Seq))/binary>>,
                         Received = escalus:wait_for_stanza(User),
                         escalus:assert(is_groupchat_message, [Expected], Received)
                 end,
    lists:foreach(ReceiveOne, lists:seq(1, N)).

%% A component connects once; then alice and eve each exchange one message
%% with it in their own fresh story.
test_component_on_one_host(Config) ->
    ComponentSpec = [{server, <<"localhost">>}, {host, <<"localhost">>},
                     {password, <<"secret">>}, {port, service_port()},
                     {component, <<"test_service">>}],

    {Comp, CompAddr, _Name} = component_helper:connect_component(ComponentSpec),

    Story =
        fun(User) ->
                %% When the user sends a message to the component
                ToComponent = escalus_stanza:chat_to(CompAddr, <<"Hi2!">>),
                escalus:send(User, ToComponent),
                %% Then component receives it
                AtComponent = escalus:wait_for_stanza(Comp),
                escalus:assert(is_chat_message, [<<"Hi2!">>], AtComponent),

                %% When components sends a reply
                ToUser = escalus_stanza:chat_to(User, <<"Oh hi!">>),
                escalus:send(Comp, escalus_stanza:from(ToUser, CompAddr)),

                %% Then the user receives it
                AtUser = escalus:wait_for_stanza(User),
                escalus:assert(is_chat_message, [<<"Oh hi!">>], AtUser),
                escalus:assert(is_stanza_from, [CompAddr], AtUser)
        end,

    [escalus:fresh_story(Config, [{UserName, 1}], Story) || UserName <- [alice, eve]].

%% Ensures that 2 components in distinct data centers can communicate.
test_components_in_different_regions(_Config) ->
    Shared = [{host, <<"localhost">>}, {password, <<"secret">>},
              {server, <<"localhost">>}, {component, <<"test_service">>}],
    MimPort = ct:get_config({hosts, mim, service_port}),
    RegPort = ct:get_config({hosts, reg, service_port}),
    %% The per-component entries are put in front of the shared ones
    MimSpec = [{port, MimPort}, {component, <<"service1">>} | Shared],
    RegSpec = [{port, RegPort}, {component, <<"service2">>} | Shared],

    {MimComp, MimAddr, _MimName} = component_helper:connect_component(MimSpec),
    {RegComp, RegAddr, _RegName} = component_helper:connect_component(RegSpec),

    %% service1 -> service2
    ToReg = escalus_stanza:chat_to(RegAddr, <<"Hi from 1!">>),
    escalus:send(MimComp, escalus_stanza:from(ToReg, MimAddr)),
    AtReg = escalus:wait_for_stanza(RegComp),
    escalus:assert(is_chat_message, [<<"Hi from 1!">>], AtReg),

    %% service2 -> service1
    ToMim = escalus_stanza:chat_to(MimAddr, <<"Hi from 2!">>),
    escalus:send(RegComp, escalus_stanza:from(ToMim, RegAddr)),
    AtMim = escalus:wait_for_stanza(MimComp),
    escalus:assert(is_chat_message, [<<"Hi from 2!">>], AtMim).

%% Ordinary user is not able to discover the hidden component from GD
test_hidden_component_disco_in_different_region(Config) ->
    %% Hidden component from component_SUITE connects to mim1/europe_node1
    HiddenSpec = component_helper:spec(hidden_component, Config),
    {_HiddenComp, HiddenAddr, _} = component_helper:connect_component(HiddenSpec),

    HiddenIsAbsent = fun(Stanza) ->
                             not escalus_pred:has_service(HiddenAddr, Stanza)
                     end,
    Story =
        fun(Eve) ->
                Server = escalus_client:server(Eve),
                escalus:send(Eve, escalus_stanza:service_discovery(Server)),
                Reply = escalus:wait_for_stanza(Eve),
                escalus:assert(is_iq_result, Reply),
                escalus:assert(HiddenIsAbsent, Reply)
        end,
    escalus:fresh_story(Config, [{eve, 1}], Story).
%% After the component has disconnected, messages sent to its address by
%% alice and by eve must come back as service-unavailable errors.
test_component_disconnect(Config) ->
    ComponentSpec = [{server, <<"localhost">>}, {host, <<"localhost">>},
                     {password, <<"secret">>}, {port, service_port()},
                     {component, <<"test_service">>}],

    {Comp, CompAddr, _Name} = component_helper:connect_component(ComponentSpec),
    component_helper:disconnect_component(Comp, CompAddr),

    Story =
        fun(User) ->
                escalus:send(User, escalus_stanza:chat_to(CompAddr, <<"Hi!">>)),
                Reply = escalus:wait_for_stanza(User, 5000),
                escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Reply)
        end,

    [escalus:fresh_story(Config, [{UserName, 1}], Story) || UserName <- [alice, eve]].

%% Stops mongooseim and ranch on the Asia node in the middle of a conversation;
%% the next message from Alice to Eve must bounce with service-unavailable.
%% Both applications are started again whatever the outcome of the story.
test_location_disconnect(Config) ->
    Story =
        fun(Alice, Eve) ->
                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),

                escalus_client:wait_for_stanza(Eve),

                ok = rpc(asia_node, application, stop, [mongooseim]),
                %% TODO: Stopping mongooseim alone should probably stop connections too
                ok = rpc(asia_node, application, stop, [ranch]),

                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
                Bounce = escalus:wait_for_stanza(Alice),
                escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Bounce)
        end,
    try
        escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story)
    after
        rpc(asia_node, application, start, [ranch]),
        rpc(asia_node, application, start, [mongooseim])
    end.

%% Eve's connection is stopped before Alice writes to her, so Alice must
%% get a service-unavailable error back.
test_pm_with_disconnection_on_other_server(Config) ->
    Story =
        fun(Alice, Eve) ->
                escalus_connection:stop(Eve),
                escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
                Bounce = escalus_client:wait_for_stanza(Alice, 15000),
                escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Bounce)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).
%% Eve logs out from the reg cluster and logs in again on the mim cluster while
%% Alice's session process is suspended; both of Alice's messages and both of
%% Eve's messages must be delivered.
test_pm_with_graceful_reconnection_to_different_server(Config) ->
    RegEveSpec = ?config(evespec_reg, Config),
    MimEveSpec = ?config(evespec_mim, Config),
    Story =
        fun(Alice) ->
                Eve = connect_from_spec(RegEveSpec, Config),

                escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi from Asia!">>)),

                %% Stop connection and wait for process to die
                EveNode = ct:get_config({hosts, reg, node}),
                mongoose_helper:logout_user(Config, Eve, #{node => EveNode}),

                FromEve = escalus_client:wait_for_stanza(Alice),

                %% Pause Alice until Eve is reconnected
                AliceNode = ct:get_config({hosts, mim, node}),
                AliceC2s = mongoose_helper:get_session_pid(Alice, #{node => AliceNode}),
                ok = rpc(asia_node, sys, suspend, [AliceC2s]),

                escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi from Europe1!">>)),

                NewEve = connect_from_spec(MimEveSpec, Config),

                ok = rpc(asia_node, sys, resume, [AliceC2s]),

                escalus_client:send(Alice,
                                    chat_with_seqnum(Eve, <<"Hi again from Europe1!">>)),
                escalus_client:send(NewEve,
                                    escalus_stanza:chat_to(Alice, <<"Hi again from Asia!">>)),

                EveGot1 = escalus_client:wait_for_stanza(NewEve),
                AgainFromEve = escalus_client:wait_for_stanza(Alice),
                EveGot2 = escalus_client:wait_for_stanza(NewEve),

                %% Arrival order at Eve is not asserted, so sort by sequence number
                [FromAlice, AgainFromAlice] = order_by_seqnum([EveGot1, EveGot2]),

                escalus:assert(is_chat_message, [<<"Hi from Europe1!">>], FromAlice),
                escalus:assert(is_chat_message, [<<"Hi from Asia!">>], FromEve),
                escalus:assert(is_chat_message, [<<"Hi again from Europe1!">>], AgainFromAlice),
                escalus:assert(is_chat_message, [<<"Hi again from Asia!">>], AgainFromEve)
        end,
    escalus:fresh_story(Config, [{alice, 1}], Story).

%% Refresh logic can cause two possible behaviours.
%% We test both behaviours here (plus no refresh case)
%% See PR #2392
test_pm_with_ungraceful_reconnection_to_different_server(Config) ->
    %% No refresh
    NoRefresh = fun() -> ok end,
    ExpectAll =
        fun(Alice, NewEve) ->
                user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
                user_receives(Alice, [<<"Hi from Europe!">>])
        end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, NoRefresh, ExpectAll).

test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first(Config) ->
    %% Same as no refresh
    Reason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first",
    % Order of nodes is important here in refresh_hosts!
    AsiaThenEurope = fun() -> refresh_hosts([asia_node, europe_node1], Reason) end,
    ExpectAll =
        fun(Alice, NewEve) ->
                user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
                user_receives(Alice, [<<"Hi from Europe!">>])
        end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, AsiaThenEurope, ExpectAll).

test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first(Config) ->
    %% Asia node overrides Europe value with the older ones,
    %% so we lose some messages during rerouting :(
    Reason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first",
    EuropeThenAsia = fun() -> refresh_hosts([europe_node1, asia_node], Reason) end,
    ExpectOnlySecond =
        fun(Alice, NewEve) ->
                user_receives(NewEve, [<<"Hi again from Europe1!">>]),
                user_receives(Alice, [<<"Hi from Europe!">>])
        end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, EuropeThenAsia,
                                                                ExpectOnlySecond).
%% Reconnect Eve from asia (reg cluster) to europe (mim)
%% BeforeResume/0 runs after the new connection is up but before the suspended
%% session process is resumed; AfterCheck/2 receives Alice and the new Eve
%% client and asserts what each of them gets.
do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResume, AfterCheck) ->
    Config = escalus_users:update_userspec(Config0, eve, stream_management, true),
    RegEveSpec = ?config(evespec_reg, Config),
    MimEveSpec = ?config(evespec_mim, Config),
    Story =
        fun(Alice) ->
                {ok, Eve, _} = escalus_connection:start(RegEveSpec, connect_steps_with_sm()),
                escalus_story:send_initial_presence(Eve),
                escalus_client:wait_for_stanza(Eve),

                %% Stop connection and wait for process to die
                EveNode = ct:get_config({hosts, reg, node}),
                EveC2s = mongoose_helper:get_session_pid(Eve, #{node => EveNode}),
                ok = rpc(asia_node, sys, suspend, [EveC2s]),

                FirstMsg = chat_with_seqnum(bare_client(Eve), <<"Hi from Europe1!">>),
                escalus_client:send(Alice, FirstMsg),

                %% Wait for route message to be queued in c2s message queue
                mongoose_helper:wait_for_route_message_count(EveC2s, 1),

                %% Time to do bad nasty things with our socket, so once our process wakes up,
                %% it SHOULD detect a dead socket
                escalus_connection:kill(Eve),

                %% Connect another one, we hope the message would be rerouted
                NewEve = connect_from_spec(MimEveSpec, Config),

                BeforeResume(),

                %% Trigger rerouting
                ok = rpc(asia_node, sys, resume, [EveC2s]),

                %% Let C2sPid to process the message and reroute (and die finally, poor little thing)
                mongoose_helper:wait_for_pid_to_die(EveC2s),

                SecondMsg = chat_with_seqnum(bare_client(Eve), <<"Hi again from Europe1!">>),
                escalus_client:send(Alice, SecondMsg),
                escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi from Europe!">>)),

                AfterCheck(Alice, NewEve)
        end,
    escalus:fresh_story(Config, [{alice, 1}], Story).

%% Both users run service discovery against their own server.
%% Only Eve's result is asserted: it must list the MUC host.
test_global_disco(Config) ->
    Story =
        fun(Alice, Eve) ->
                AliceDisco = escalus_stanza:service_discovery(escalus_client:server(Alice)),
                escalus:send(Alice, AliceDisco),
                _AliceStanza = escalus:wait_for_stanza(Alice),
                %% TODO: test for duplicate components
                %%escalus:assert(fun has_exactly_one_service/2, [muc_helper:muc_host()], AliceStanza),

                EveDisco = escalus_stanza:service_discovery(escalus_client:server(Eve)),
                escalus:send(Eve, EveDisco),
                EveStanza = escalus:wait_for_stanza(Eve),
                escalus:assert(has_service, [muc_helper:muc_host()], EveStanza)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% A connected component has a domain mapping on europe_node1;
%% after the component disconnects the lookup returns `error'.
test_component_unregister(_Config) ->
    ComponentConfig = [{server, <<"localhost">>},
                       {host, <<"localhost">>},
                       {password, <<"secret">>},
                       {port, service_port()},
                       {component, <<"test_service">>}],
    ComponentDomain = <<"test_service.localhost">>,

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
    ?assertMatch({ok, _},
                 rpc(europe_node1, mod_global_distrib_mapping, for_domain, [ComponentDomain])),

    component_helper:disconnect_component(Comp, Addr),

    ?assertEqual(error,
                 rpc(europe_node1, mod_global_distrib_mapping, for_domain, [ComponentDomain])).

%% Starting mod_global_distrib with a local_host that does not exist
%% must fail with a check_host_failed error reported through the RPC.
test_error_on_wrong_hosts(_Config) ->
    BadOpts = module_opts(#{local_host => <<"no_such_host">>}),
    ?assertException(error,
                     {badrpc, {'EXIT', {#{what := check_host_failed,
                                          domain := <<"no_such_host">>}, _}}},
                     dynamic_modules:ensure_modules(node_spec(europe_node1), <<"localhost">>,
                                                    [{mod_global_distrib, BadOpts}])).

%% Sets the field of the node-to-expire to <<"0">> in the Redis nodes hash,
%% forces a refresh on europe_node1 and expects the field to be gone.
refresh_nodes(Config) ->
    HashKey = ?config(nodes_key, Config),
    ExpiringNode = ?config(node_to_expire, Config),
    redis_query(europe_node1, [<<"HSET">>, HashKey, ExpiringNode, <<"0">>]),
    refresh_mappings(europe_node1, "by_refresh_nodes"),
    {ok, undefined} = redis_query(europe_node1, [<<"HGET">>, HashKey, ExpiringNode]).

%% Alice sends the bodies <<"1">>..<<"100">> to Eve;
%% Eve has to receive them in exactly that order.
test_in_order_messages_on_multiple_connections(Config) ->
    Story =
        fun(Alice, Eve) ->
                Numbers = lists:seq(1, 100),
                SendOne = fun(N) ->
                                  Msg = escalus_stanza:chat_to(Eve, integer_to_binary(N)),
                                  escalus_client:send(Alice, Msg)
                          end,
                ExpectOne = fun(N) ->
                                    Received = escalus_client:wait_for_stanza(Eve, 5000),
                                    escalus:assert(is_chat_message, [integer_to_binary(N)], Received)
                            end,
                lists:foreach(SendOne, Numbers),
                lists:foreach(ExpectOne, Numbers)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% Same ordering check, but the mapping for Eve on europe_node1 is repeatedly
%% deleted and restored by send_steps/4 while the first 99 messages are sent.
test_in_order_messages_on_multiple_connections_with_bounce(Config) ->
    Story =
        fun(Alice, Eve) ->
                %% 99 messages: some sent while the server knows the mapping, some while not
                send_steps(Alice, Eve, 99, <<"reg1">>),
                %% The mapping must be known when the last message is sent
                set_mapping(europe_node1, Eve, <<"reg1">>),
                LastMsg = escalus_stanza:chat_to(Eve, <<"100">>),
                escalus_client:send(Alice, LastMsg),

                %% Every stanza has to arrive, and in order
                ExpectOne = fun(N) ->
                                    Received = escalus_client:wait_for_stanza(Eve, 5000),
                                    escalus:assert(is_chat_message, [integer_to_binary(N)], Received)
                            end,
                lists:foreach(ExpectOne, lists:seq(1, 100))
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% All 100 messages are sent while Eve has no mapping on europe_node1.
%% Once the mapping is restored they must be delivered in the sending order.
test_messages_bounced_in_order(Config) ->
    Story =
        fun(Alice, Eve) ->
                %% Without a mapping every message should land in bounce storage
                delete_mapping(europe_node1, Eve),

                Numbers = lists:seq(1, 100),
                SendOne = fun(N) ->
                                  Msg = escalus_stanza:chat_to(Eve, integer_to_binary(N)),
                                  escalus_client:send(Alice, Msg)
                          end,
                lists:foreach(SendOne, Numbers),

                %% Bring the mapping back so that bouncing eventually succeeds
                ?assertEqual(undefined, get_mapping(europe_node1, Eve)),
                set_mapping(europe_node1, Eve, <<"reg1">>),

                ExpectOne = fun(N) ->
                                    Received = escalus_client:wait_for_stanza(Eve, 5000),
                                    escalus:assert(is_chat_message, [integer_to_binary(N)], Received)
                            end,
                lists:foreach(ExpectOne, Numbers)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% Alice's mapping is deleted through europe_node1. After asia_node stops
%% seeing it, a message from Alice to Eve must make asia_node resolve Alice's
%% JID to <<"localhost.bis">> again.
test_update_senders_host(Config) ->
    Story =
        fun(Alice, Eve) ->
                AliceJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Alice)]),
                LookupOnAsia =
                    fun() -> rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid]) end,

                {ok, <<"localhost.bis">>} = LookupOnAsia(),
                ok = rpc(europe_node1, mod_global_distrib_mapping, delete_for_jid, [AliceJid]),
                wait_for_node(asia_node, AliceJid),

                %% TODO: Should prevent Redis refresher from executing for a moment,
                %%       as it may collide with this test.

                escalus:send(Alice, escalus_stanza:chat_to(Eve, <<"test_update_senders_host">>)),
                escalus:wait_for_stanza(Eve),

                {ok, <<"localhost.bis">>} = LookupOnAsia()
        end,
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], Story).

%% Polls mod_global_distrib_mapping:for_jid/1 on Node until it returns `error'
%% for Jid. Sleeps one second between attempts, with a ten second time budget.
wait_for_node(Node, Jid) ->
    Lookup = fun() -> rpc(Node, mod_global_distrib_mapping, for_jid, [Jid]) end,
    WaitOpts = #{time_left => timer:seconds(10),
                 sleep_time => timer:seconds(1),
                 name => rpc},
    mongoose_helper:wait_until(Lookup, error, WaitOpts).

%% Verifies that the routing cache for the sender (Eve) is updated on both
%% Europe nodes when her message reaches a component connected to europe_node1
%% through europe_node2.
%%
%% The component connection opened here is always closed again, also when the
%% story fails, so that the "test_service" component does not stay registered
%% for the following test cases.
test_update_senders_host_by_ejd_service(Config) ->
    refresh_hosts([europe_node1, europe_node2, asia_node], "by_test_update_senders_host_by_ejd_service"),
    %% Connects to europe_node1
    ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
                       {port, service_port()}, {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),

    try
        escalus:fresh_story(
          Config, [{eve, 1}],
          fun(Eve) ->
                  %% Eve is connected to asia_node
                  EveJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Eve)]),
                  {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
                  {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid]),

                  ok = rpc(asia_node, mod_global_distrib_mapping, delete_for_jid, [EveJid]),
                  wait_for_node(europe_node1, EveJid),
                  wait_for_node(europe_node2, EveJid),

                  %% Component is connected to europe_node1
                  %% but we force asia_node to connect to europe_node2 by hiding europe_node1
                  %% and forcing rebalance (effectively disabling connections to europe_node1)
                  %% to verify routing cache update on both nodes

                  %% TODO: Should prevent Redis refresher from executing for a moment,
                  %%       as it may collide with this test.

                  hide_node(europe_node1, Config),
                  {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
                  trigger_rebalance(asia_node, EuropeHost),

                  escalus:send(Eve, escalus_stanza:chat_to(Addr, <<"hi">>)),
                  escalus:wait_for_stanza(Comp),

                  {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
                  {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid])
          end)
    after
        %% Same cleanup call as used by test_component_unregister/1
        component_helper:disconnect_component(Comp, Addr)
    end.

%% -------------------------------- Rebalancing --------------------------------

%% After asia_node starts advertising the extra endpoint and europe_node1
%% rebalances, exactly one additional pool and one additional enabled endpoint
%% must show up for <<"reg1">>, and nothing may disappear.
enable_new_endpoint_on_refresh(Config) ->
    get_connection(europe_node1, <<"reg1">>),

    {EnabledBefore, _, PoolsBefore} = get_outgoing_connections(europe_node1, <<"reg1">>),

    ExtraPort = get_port(reg, gd_extra_endpoint_port),
    ExtraEndpoint = resolved_endpoint(ExtraPort),
    enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),

    {EnabledAfter, _, PoolsAfter} = get_outgoing_connections(europe_node1, <<"reg1">>),

    %% Pools: only the extra endpoint was added
    [ExtraEndpoint] = PoolsAfter -- PoolsBefore,
    [] = PoolsBefore -- PoolsAfter,
    %% Enabled endpoints: only the extra endpoint was added
    [ExtraEndpoint] = EnabledAfter -- EnabledBefore,
    [] = EnabledBefore -- EnabledAfter.

%% Once asia_node stops advertising the extra endpoint and europe_node1
%% rebalances, the endpoint moves from the enabled to the disabled list,
%% while the set of pools stays the same.
disable_endpoint_on_refresh(Config) ->
    ExtraPort = get_port(reg, gd_extra_endpoint_port),
    ExtraEndpoint = resolved_endpoint(ExtraPort),
    enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),

    get_connection(europe_node1, <<"reg1">>),

    %% Starting point: two enabled endpoints, none disabled
    {[_, _] = EnabledBefore, [] = _DisabledBefore, PoolsBefore} =
        get_outgoing_connections(europe_node1, <<"reg1">>),

    hide_extra_endpoint(asia_node),
    trigger_rebalance(europe_node1, <<"reg1">>),

    {EnabledAfter, DisabledAfter, PoolsAfter} =
        get_outgoing_connections(europe_node1, <<"reg1">>),

    %% Both pools are still open after the endpoint got disabled
    [] = PoolsBefore -- PoolsAfter,
    [] = PoolsAfter -- PoolsBefore,
    %% The extra endpoint is not enabled any more
    [] = EnabledAfter -- EnabledBefore,
    [ExtraEndpoint] = EnabledBefore -- EnabledAfter,
    %% It is the only disabled endpoint now
    [ExtraEndpoint] = DisabledAfter.

%% With no endpoints advertised by asia_node, a connection request issued from
%% europe_node1 must not be answered within two seconds. After the endpoints are
%% refreshed and europe_node1 rebalances, the requester must receive a pid
%% within five seconds.
wait_for_connection(_Config) ->
    set_endpoints(asia_node, []),
    %% Because of hosts refresher, a pool of connections to asia_node
    %% may already be present here
    RebalanceSucceeds =
        fun() ->
                try trigger_rebalance(europe_node1, <<"reg1">>) of
                    _ -> true
                catch
                    _:_ -> false
                end
        end,
    mongoose_helper:wait_until(RebalanceSucceeds, true,
                               #{name => rebalance, time_left => timer:seconds(5)}),

    spawn_connection_getter(europe_node1),

    %% Any message at all during the next two seconds is a failure
    receive
        TooEarly -> error({unexpected, TooEarly})
    after 2000 ->
            ok
    end,

    refresh_mappings(asia_node, "by_wait_for_connection"),
    trigger_rebalance(europe_node1, <<"reg1">>),

    receive
        ConnPid when is_pid(ConnPid) -> ok;
        NotAPid -> error({unexpected, NotAPid})
    after 5000 ->
            error(timeout)
    end.

%% A disabled endpoint whose connections got closed must be dropped
%% from the disabled list (and its pool must go away as well).
closed_connection_is_removed_from_disabled(_Config) ->
    get_connection(europe_node1, <<"reg1">>),
    set_endpoints(asia_node, []),
    trigger_rebalance(europe_node1, <<"reg1">>),

    %% Nothing enabled, one disabled endpoint, one pool
    {[], [_], [_]} = get_outgoing_connections(europe_node1, <<"reg1">>),

    % Will drop connections and prevent them from reconnecting
    SupplementaryPort = get_port(reg, gd_supplementary_endpoint_port),
    restart_receiver(asia_node, [SupplementaryPort]),

    CurrentConnections = fun() -> get_outgoing_connections(europe_node1, <<"reg1">>) end,
    mongoose_helper:wait_until(CurrentConnections, {[], [], []},
                               #{name => get_outgoing_connections}).


%%--------------------------------------------------------------------
%% Test helpers
%%--------------------------------------------------------------------

%% Reads {hosts, Host, Param} from the CT configuration and returns it when it
%% is an integer; any other value fails the test with get_port_failed.
get_port(Host, Param) ->
    Value = ct:get_config({hosts, Host, Param}),
    case is_integer(Value) of
        true ->
            Value;
        false ->
            ct:fail({get_port_failed, Host, Param, Value})
    end.

%% The nodes used by the suite as {NodeName, LocalHost, ReceiverPort} triples.
get_hosts() ->
    [
     {europe_node1, <<"localhost.bis">>, get_port(mim, gd_endpoint_port)},
     {europe_node2, <<"localhost.bis">>, get_port(mim2, gd_endpoint_port)},
     {asia_node, <<"reg1">>, get_port(reg, gd_endpoint_port)}
    ].

%% String-address endpoint of the receiver port listed for NodeName in get_hosts/0.
listen_endpoint(NodeName) ->
    endpoint(listen_port(NodeName)).

%% Receiver port listed for NodeName in get_hosts/0.
listen_port(NodeName) ->
    {_, _, Port} = lists:keyfind(NodeName, 1, get_hosts()),
    Port.

%% Endpoint on the loopback address given as an IP tuple.
resolved_endpoint(Port) when is_integer(Port) ->
    {{127, 0, 0, 1}, Port}.

%% Endpoint on the loopback address given as a string.
endpoint(Port) when is_integer(Port) ->
    {"127.0.0.1", Port}.

%% For dynamic_modules
node_spec(NodeName) ->
    #{node => ct:get_config(NodeName), timeout => timer:seconds(30)}.

%% Calls M:F(A) on the node configured under NodeName, with a 30 second timeout.
rpc(NodeName, M, F, A) ->
    Node = ct:get_config(NodeName),
    mongoose_helper:successful_rpc(#{node => Node}, M, F, A, timer:seconds(30)).

%% Removes the node configured under NodeName from the Redis nodes hash.
%% The query is always issued through europe_node1 and must delete one field.
hide_node(NodeName, Config) ->
    NodesKey = ?config(nodes_key, Config),
    NodeBin = atom_to_binary(ct:get_config(NodeName), latin1),
    {ok, <<"1">>} = redis_query(europe_node1, [<<"HDEL">>, NodesKey, NodeBin]).

%% Starts a client for UserSpec with resource <<"res1">>, sends the initial
%% presence and asserts that the first stanza received back is a presence.
connect_from_spec(UserSpec, Config) ->
    {ok, User} = escalus_client:start(Config, UserSpec, <<"res1">>),
    escalus_story:send_initial_presence(User),
    escalus:assert(is_presence, escalus_client:wait_for_stanza(User)),
    User.

%% Chat stanza whose id is the current erlang:monotonic_time/0 value,
%% so that stanzas can later be ordered by creation time.
chat_with_seqnum(To, Text) ->
    escalus_stanza:set_id(escalus_stanza:chat_to(To, Text),
                          integer_to_binary(erlang:monotonic_time())).

%% Sorts stanzas created by chat_with_seqnum/2 from the oldest to the newest.
%%
%% The ids are decimal integers, so they are compared as integers. Comparing
%% the raw binaries (as was done before, in descending order) only yields the
%% chronological order while every id is negative and has the same number of
%% digits; it breaks for positive ids and when the number of digits changes.
%% Every stanza is expected to carry an integer id; anything else crashes.
order_by_seqnum(Stanzas) ->
    SeqNum = fun(Stanza) -> binary_to_integer(exml_query:attr(Stanza, <<"id">>)) end,
    lists:sort(fun(A, B) -> SeqNum(A) =< SeqNum(B) end, Stanzas).

%% True when exactly one item of the disco result has its jid attribute equal to Service.
has_exactly_one_service(Service, #xmlel{children = [#xmlel{children = Services}]}) ->
    Pred = fun(Item) ->
                   exml_query:attr(Item, <<"jid">>) =:= Service
           end,
    case lists:filter(Pred, Services) of
        [_] -> true;
        _ -> false
    end.

%% Sends chat messages with the bodies <<"1">>..<<Max>> from From to To and
%% toggles To's mapping on europe_node1 (delete / set to ToHost) after every
%% `Max div 10' messages, starting from the "mapping known" state.
%%
%% The toggle interval is clamped to at least one message. With Max < 10
%% `Max div 10' is 0, which would make next_send_step/7 toggle the mapping
%% forever without ever sending anything.
send_steps(From, To, Max, ToHost) ->
    next_send_step(From, To, 1, Max, max(1, Max div 10), true, ToHost).

%% I            - number of the next message to send
%% ToReset      - messages left until the mapping is toggled
%% KnowsMapping - whether europe_node1 currently has a mapping for To
next_send_step(_From, _To, I, Max, _ToReset, _KnowsMapping, _ToHost) when I > Max -> ok;
next_send_step(From, To, I, Max, 0, KnowsMapping, ToHost) ->
    ct:log("Reset: I: ~B", [I]),
    case KnowsMapping of
        true -> delete_mapping(europe_node1, To);
        false -> set_mapping(europe_node1, To, ToHost)
    end,
    %% Same clamping as in send_steps/4, so that the next clause always gets a
    %% chance to send at least one message before the next toggle
    next_send_step(From, To, I, Max, max(1, Max div 10), not KnowsMapping, ToHost);
next_send_step(From, To, I, Max, ToReset, KnowsMapping, ToHost) ->
    ct:log("I: ~B ~B ~B", [I, Max, ToReset]),
    Stanza = escalus_stanza:chat_to(To, integer_to_binary(I)),
    escalus_client:send(From, Stanza),
    next_send_step(From, To, I + 1, Max, ToReset - 1, KnowsMapping, ToHost).

%% Value stored in Redis under the client's full JID, fetched through Node.
get_mapping(Node, Client) ->
    {FullJid, _BareJid} = jids(Client),
    {ok, What} = redis_query(Node, [<<"GET">>, FullJid]),
    What.

%% Deletes the Redis entries for the client's full and bare JID and clears
%% the mapping cache for the full JID on Node.
%%
%% Warning! May not work properly with alice or any other user whose
%%          stringprepped JID is different than original one
delete_mapping(Node, Client) ->
    {FullJid, BareJid} = jids(Client),
    redis_query(Node, [<<"DEL">>, FullJid, BareJid]),
    Jid = rpc(Node, jid, from_binary, [FullJid]),
    rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).

%% Stores Mapping in Redis for the client's full and bare JID and clears
%% the mapping cache for the full JID on Node.
set_mapping(Node, Client, Mapping) ->
    {FullJid, BareJid} = jids(Client),
    redis_query(Node, [<<"MSET">>, FullJid, Mapping, BareJid, Mapping]),
    Jid = rpc(Node, jid, from_binary, [FullJid]),
    rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).

%% {FullJid, BareJid} of an escalus client.
jids(Client) ->
    FullJid = escalus_client:full_jid(Client),
    BareJid = escalus_client:short_jid(Client),
    {FullJid, BareJid}.

%% Runs an eredis query on Node, using a worker of the
%% {redis, global, global_distrib} pool fetched from that node.
redis_query(Node, Query) ->
    {ok, Worker} = rpc(Node, mongoose_wpool, get_worker, [redis, global, global_distrib]),
    rpc(Node, eredis, q, [Worker, Query]).

%% A fake address we don't try to connect to.
%% Used in test_advertised_endpoints_override_endpoints testcase.
advertised_endpoints() ->
    [{fake_domain(), get_port(reg, gd_endpoint_port)}].

fake_domain() ->
    "somefakedomain.com".

%% Rewrites every {IpTuple, Port} element to {IpString, Port};
%% elements of any other shape are kept as they are.
iptuples_to_string(Endpoints) ->
    Convert = fun({Addr, Port}) when is_tuple(Addr) -> {inet_parse:ntoa(Addr), Port};
                 (Other) -> Other
              end,
    lists:map(Convert, Endpoints).

%% Connection options for a receiver listening on ReceiverPort. The advertised
%% endpoints come from add_advertised_endpoints in Config when there is an
%% entry for NodeName; otherwise the listening endpoints are advertised.
endpoint_opts(NodeName, ReceiverPort, Config) ->
    Listening = [endpoint(ReceiverPort)],
    Overrides = ?config(add_advertised_endpoints, Config),
    Advertised = proplists:get_value(NodeName, Overrides, Listening),
    #{endpoints => Listening,
      resolved_endpoints => [resolved_endpoint(ReceiverPort)],
      advertised_endpoints => Advertised}.

%% Calls mock_inet/0 on every node from get_hosts/0; each call must return ok.
mock_inet_on_each_node() ->
    Nodes = lists:map(fun({NodeName, _, _}) -> ct:get_config(NodeName) end, get_hosts()),
    Results = [rpc:block_call(Node, ?MODULE, mock_inet, []) || Node <- Nodes],
    true = lists:all(fun(Result) -> Result =:= ok end, Results).

%% Results of M:F(A) called through rpc/4 on every node from get_hosts/0.
execute_on_each_node(M, F, A) ->
    CallOnNode = fun({NodeName, _, _}) -> rpc(NodeName, M, F, A) end,
    lists:map(CallOnNode, get_hosts()).

%% Makes mod_global_distrib_utils:getaddrs/2 return the loopback address
%% for inet and an error for inet6.
mock_inet() ->
    %% We don't want to mock inet module itself to avoid strange networking issues
    meck:new(mod_global_distrib_utils, [non_strict, passthrough, unstick]),
    FakeGetAddrs = fun(_, inet) -> {ok, [{127, 0, 0, 1}]};
                      (_, inet6) -> {error, "No ipv6 address"}
                   end,
    meck:expect(mod_global_distrib_utils, getaddrs, FakeGetAddrs).

%% Unloads the mod_global_distrib_utils mock on every node from get_hosts/0.
unmock_inet(_Pids) ->
    execute_on_each_node(meck, unload, [mod_global_distrib_utils]).

%% Children of mod_global_distrib_outgoing_conns_sup on Node,
%% with the hosts refresher child left out.
out_connection_sups(Node) ->
    AllChildren = rpc(Node, supervisor, which_children, [mod_global_distrib_outgoing_conns_sup]),
    [Child || {Id, _, _, _} = Child <- AllChildren,
              Id =/= mod_global_distrib_hosts_refresher].

%% True when each of the three nodes has at least one outgoing connection supervisor.
trees_for_connections_present() ->
    AsiaSups = out_connection_sups(asia_node),
    Europe1Sups = out_connection_sups(europe_node1),
    Europe2Sups = out_connection_sups(europe_node2),
    lists:all(fun(Sups) -> Sups =/= [] end, [AsiaSups, Europe1Sups, Europe2Sups]).

%% True when ExpectedSup is among the outgoing connection supervisors of Node.
tree_for_sup_present(Node, ExpectedSup) ->
    lists:keymember(ExpectedSup, 1, out_connection_sups(Node)).


%% ------------------------------- rebalancing helpers -----------------------------------

%% Spawns a process that asks SenderNode for a connection to <<"reg1">>
%% and sends whatever it gets to the calling process.
spawn_connection_getter(SenderNode) ->
    Caller = self(),
    Getter = fun() -> Caller ! get_connection(SenderNode, <<"reg1">>) end,
    spawn(Getter).

%% Restarts the receiver on ListenNode on Port plus its regular port, stores
%% both as its endpoints and triggers a rebalance for <<"reg1">> on SenderNode.
enable_extra_endpoint(ListenNode, SenderNode, Port, _Config) ->
    restart_receiver(ListenNode, [Port, listen_port(ListenNode)]),
    set_endpoints(ListenNode, [Port, listen_port(ListenNode)]),
    trigger_rebalance(SenderNode, <<"reg1">>).

%% Result of mod_global_distrib_outgoing_conns_sup:get_connection(ToDomain) on SenderNode.
get_connection(SenderNode, ToDomain) ->
    rpc(SenderNode, mod_global_distrib_outgoing_conns_sup, get_connection, [ToDomain]).

%% Stores only the regular listening port as the endpoint list of ListenNode.
hide_extra_endpoint(ListenNode) ->
    set_endpoints(ListenNode, [listen_port(ListenNode)]).

%% Stores the loopback endpoints for Ports via
%% mod_global_distrib_mapping_redis:set_endpoints/1 on ListenNode.
set_endpoints(ListenNode, Ports) ->
    Endpoints = lists:map(fun endpoint/1, Ports),
    {ok, _} = rpc(ListenNode, mod_global_distrib_mapping_redis, set_endpoints, [Endpoints]).

%% Returns {Enabled, Disabled, Pools} for DestinationDomain as seen on NodeName:
%% the enabled and disabled endpoints reported by the server manager, and the
%% ids of the children of the domain supervisor other than the manager itself.
get_outgoing_connections(NodeName, DestinationDomain) ->
    Call = fun(Mod, Fun, Args) -> rpc(NodeName, Mod, Fun, Args) end,
    SupName = Call(mod_global_distrib_utils, server_to_sup_name, [DestinationDomain]),
    MgrName = Call(mod_global_distrib_utils, server_to_mgr_name, [DestinationDomain]),
    Enabled = Call(mod_global_distrib_server_mgr, get_enabled_endpoints, [DestinationDomain]),
    Disabled = Call(mod_global_distrib_server_mgr, get_disabled_endpoints, [DestinationDomain]),
    SupChildren = Call(supervisor, which_children, [SupName]),
    Pools = [Id || {Id, _Child, _Type, _Modules} <- SupChildren, Id /= MgrName],
    {Enabled, Disabled, Pools}.

%% Restarts the receiver of NodeName on its regular listening port.
restart_receiver(NodeName) ->
    restart_receiver(NodeName, [listen_port(NodeName)]).

%% Restarts mod_global_distrib_receiver for <<"localhost">> on NodeName with its
%% current options, except that the endpoints are replaced by the loopback
%% endpoints for NewPorts.
restart_receiver(NodeName, NewPorts) ->
    CurrentOpts = rpc(NodeName, gen_mod, get_module_opts,
                      [<<"localhost">>, mod_global_distrib_receiver]),
    #{connections := CurrentConnOpts} = CurrentOpts,
    Endpoints = [endpoint(Port) || Port <- NewPorts],
    Resolved = [resolved_endpoint(Port) || Port <- NewPorts],
    UpdatedConnOpts = CurrentConnOpts#{endpoints := Endpoints,
                                       resolved_endpoints := Resolved},
    UpdatedOpts = CurrentOpts#{connections := UpdatedConnOpts},
    dynamic_modules:restart(node_spec(NodeName), <<"localhost">>,
                            mod_global_distrib_receiver, UpdatedOpts).

%% Forces the server manager for DestinationDomain on NodeName to refresh,
%% logs its state info and then sleeps for one second.
trigger_rebalance(NodeName, DestinationDomain) when is_binary(DestinationDomain) ->
    %% Make sure the manager is running first;
    %% otherwise the force_refresh call can fail with noproc
    ok = rpc(NodeName, mod_global_distrib_outgoing_conns_sup,
             ensure_server_started, [DestinationDomain]),
    rpc(NodeName, mod_global_distrib_server_mgr, force_refresh, [DestinationDomain]),
    MgrState = rpc(NodeName, mod_global_distrib_server_mgr, get_state_info, [DestinationDomain]),
    ct:log("mgr_state_info_after_rebalance nodename=~p state_info=~p", [NodeName, MgrState]),
    timer:sleep(1000).

%% -----------------------------------------------------------------------
%% Escalus-related helpers

%% Receives as many non-presence stanzas as there are Bodies (presences, e.g.
%% from stream resumption, are skipped) and asserts that they are chat
%% messages with the given bodies, in any order.
user_receives(User, Bodies) ->
    NoPresences = #{pred => fun(Stanza) -> not escalus_pred:is_presence(Stanza) end},
    BodyChecks = [fun(Stanza) -> escalus_pred:is_chat_message(Body, Stanza) end
                  || Body <- Bodies],
    Received = [escalus_connection:receive_stanza(User, NoPresences) || _ <- Bodies],
    escalus:assert_many(BodyChecks, Received).

%% -----------------------------------------------------------------------
%% Refreshing helpers

%% Calls mod_global_distrib_mapping_redis:refresh(Reason) on the given node.
%% Reason is a string.
%% NodeName is asia_node, europe_node2, ... in a format used by this suite.
refresh_mappings(NodeName, Reason) when is_list(Reason) ->
    rpc(NodeName, mod_global_distrib_mapping_redis, refresh, [Reason]).

%% Refreshes the mappings on every node of NodeNames, in list order.
refresh_hosts(NodeNames, Reason) ->
    [refresh_mappings(Name, Reason) || Name <- NodeNames].


%% -----------------------------------------------------------------------
%% Other helpers

%% Connection steps ending with stream_resumption.
connect_steps_with_sm() ->
    [start_stream,
     stream_features,
     maybe_use_ssl,
     authenticate,
     bind,
     session,
     stream_resumption].

%% Copy of Client whose jid field holds the short JID.
bare_client(Client) ->
    ShortJid = escalus_utils:get_short_jid(Client),
    Client#client{jid = ShortJid}.

%% Value of {hosts, mim, service_port} from the CT configuration.
service_port() ->
    ct:get_config({hosts, mim, service_port}).


%% -----------------------------------------------------------------------
%% Waiting helpers

%% Waits until Domain is a member of mod_global_distrib_mapping:all_domains()
%% on Node. Node is passed to rpc:call/4 directly.
wait_for_domain(Node, Domain) ->
    IsDomainKnown =
        fun() ->
                Known = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
                lists:member(Domain, Known)
        end,
    mongoose_helper:wait_until(IsDomainKnown, true, #{name => {wait_for_domain, Node, Domain}}).


%% -----------------------------------------------------------------------
%% Ensure, that endpoints are up

%% Waits until every receiver port from get_hosts/0 accepts TCP connections.
wait_for_listeners_to_appear() ->
    Ports = receiver_ports(get_hosts()),
    [wait_for_can_connect_to_port(Port) || Port <- Ports].

%% Receiver ports of {NodeName, LocalHost, ReceiverPort} triples.
receiver_ports(Hosts) ->
    PortOf = fun({_NodeName, _LocalHost, ReceiverPort}) -> ReceiverPort end,
    lists:map(PortOf, Hosts).

%% Retries once a second, for up to 30 seconds, until Port can be connected to.
wait_for_can_connect_to_port(Port) ->
    Check = fun() -> can_connect_to_port(Port) end,
    mongoose_helper:wait_until(Check, true,
                               #{time_left => timer:seconds(30),
                                 sleep_time => 1000,
                                 name => {can_connect_to_port, Port}}).

%% Tries to open (and immediately close) a TCP connection to 127.0.0.1:Port.
%% Returns true on success; otherwise prints the result and returns false.
can_connect_to_port(Port) ->
    Result = gen_tcp:connect("127.0.0.1", Port, []),
    case Result of
        {ok, Socket} ->
            gen_tcp:close(Socket),
            true;
        _ ->
            ct:pal("can_connect_to_port port=~p result=~p", [Port, Result]),
            false
    end.

%% -----------------------------------------------------------------------
%% Custom log levels for GD modules during the tests

%% Applies custom_loglevels/0 on all test hosts.
enable_logging() ->
    Hosts = test_hosts(),
    mim_loglevel:enable_logging(Hosts, custom_loglevels()).

%% Reverts custom_loglevels/0 on all test hosts.
disable_logging() ->
    Hosts = test_hosts(),
    mim_loglevel:disable_logging(Hosts, custom_loglevels()).

%% {Module, Level} pairs used by enable_logging/0 and disable_logging/0.
%% Purpose of each entry:
%%   ejabberd_s2s                          - "s2s connection to muc.localhost not found" debugging
%%   mod_global_distrib_mapping_redis      - debugging event=refreshing_own_data_done
%%   mod_global_distrib_outgoing_conns_sup - to know if connection is already started or
%%                                           would be started (event=outgoing_conn_start_progress)
%%   mod_global_distrib                    - to debug bound connection issues
%%   mod_global_distrib_connection         - to know all new connections pids
%%   mod_global_distrib_server_mgr         - to check if gc or refresh is triggered
%%   mod_global_distrib_mapping            - to debug global session set/delete
%% To debug incoming connections, {mod_global_distrib_receiver, info} can be added.
custom_loglevels() ->
    [
     {ejabberd_s2s, debug},
     {mod_global_distrib_mapping_redis, info},
     {mod_global_distrib_outgoing_conns_sup, info},
     {mod_global_distrib, debug},
     {mod_global_distrib_connection, debug},
     {mod_global_distrib_server_mgr, info},
     {mod_global_distrib_mapping, debug}
    ].

%% CT host names of the nodes taking part in the suite.
test_hosts() ->
    [mim, mim2, reg].