%%==============================================================================
%% Copyright 2017 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

-module(mod_global_distrib_SUITE).

-compile([export_all, nowarn_export_all]).

-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("escalus/include/escalus_xmlns.hrl").
-include_lib("exml/include/exml.hrl").

-define(HOSTS_REFRESH_INTERVAL, 200). %% in ms

-import(domain_helper, [domain/0]).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

all() ->
    [
     {group, mod_global_distrib},
     {group, cluster_restart},
     {group, start_checks},
     {group, invalidation},
     {group, multi_connection},
     {group, rebalancing},
     {group, advertised_endpoints},
     {group, hosts_refresher}
    ].

groups() ->
    G = [{mod_global_distrib, [],
          [
           test_pm_between_users_at_different_locations,
           test_pm_between_users_before_available_presence,
           test_component_disconnect,
           test_component_on_one_host,
           test_components_in_different_regions,
           test_hidden_component_disco_in_different_region,
           test_pm_with_disconnection_on_other_server,
           test_pm_with_graceful_reconnection_to_different_server,
           test_pm_with_ungraceful_reconnection_to_different_server,
           test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first,
           test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first,
           test_component_unregister,
           test_update_senders_host,
           test_update_senders_host_by_ejd_service,

           %% with node 2 disabled
           test_muc_conversation_on_one_host,
           test_global_disco
           %% TODO: Add a test case for the global_distrib_addr option
          ]},
         {hosts_refresher, [],
          [test_host_refreshing]},
         {cluster_restart, [],
          [
           test_location_disconnect
          ]},
         {start_checks, [],
          [
           test_error_on_wrong_hosts
          ]},
         {invalidation, [],
          [
           %% TODO: Add checks for other mapping refreshes
           refresh_nodes
          ]},
         {multi_connection, [],
          [
           test_in_order_messages_on_multiple_connections,
           test_in_order_messages_on_multiple_connections_with_bounce,
           test_messages_bounced_in_order,

           %% with node 2 disabled
           test_muc_conversation_history
          ]},
         {rebalancing, [],
          [
           enable_new_endpoint_on_refresh,
           disable_endpoint_on_refresh,
           wait_for_connection,
           closed_connection_is_removed_from_disabled
          ]},
         {advertised_endpoints, [],
          [
           test_advertised_endpoints_override_endpoints,
           test_pm_between_users_at_different_locations
          ]}
        ],
    ct_helper:repeat_all_until_all_ok(G).

suite() ->
    [{require, europe_node1, {hosts, mim, node}},
     {require, europe_node2, {hosts, mim2, node}},
     {require, asia_node, {hosts, reg, node}},
     {require, c2s_port, {hosts, mim, c2s_port}} |
     escalus:suite()].

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

init_per_suite(Config) ->
    case {rpc(europe_node1, mongoose_wpool, get_worker, [redis, global, global_distrib]),
          rpc(asia_node, mongoose_wpool, get_worker, [redis, global, global_distrib])} of
        {{ok, _}, {ok, _}} ->
            ok = rpc(europe_node2, mongoose_cluster, join, [ct:get_config(europe_node1)]),

            enable_logging(),
            %% We have to pass [no_opts] because [] is treated as a string
            %% and automatically converted to <<>>
            escalus:init_per_suite([{add_advertised_endpoints, []},
                                    {extra_config, []}, {redis_extra_config, [no_opts]} | Config]);
        Result ->
            ct:pal("Redis check result: ~p", [Result]),
            {skip, "GD Redis default pool not available"}
    end.

end_per_suite(Config) ->
    disable_logging(),
    escalus_fresh:clean(),
    rpc(europe_node2, mongoose_cluster, leave, []),
    escalus:end_per_suite(Config).

init_per_group(start_checks, Config) ->
    Config;
init_per_group(multi_connection, Config) ->
    ExtraConfig = [{resend_after_ms, 20000},
                   %% Disable unused feature to avoid interference
                   {disabled_gc_interval, 10000},
                   {connections_per_endpoint, 100}],
    init_per_group_generic([{extra_config, ExtraConfig} | Config]);
init_per_group(invalidation, Config) ->
    Config1 = init_per_group(invalidation_generic, Config),
    NodeBin = <<"fake_node@localhost">>,
    [{node_to_expire, NodeBin} | Config1];
init_per_group(rebalancing, Config) ->
    %% We need to prevent automatic refreshes, because they may interfere with tests,
    %% and we need early disabled garbage collection to check its validity
    ExtraConfig = [{endpoint_refresh_interval, 3600},
                   {endpoint_refresh_interval_when_empty, 3600},
                   {disabled_gc_interval, 1}],
    RedisExtraConfig = [{refresh_after, 3600}],
    init_per_group_generic([{extra_config, ExtraConfig},
                            {redis_extra_config, RedisExtraConfig} | Config]);
init_per_group(advertised_endpoints, Config) ->
    lists:foreach(fun({NodeName, _, _}) ->
                          Node = ct:get_config(NodeName),
                          mongoose_helper:inject_module(#{node => Node}, ?MODULE, reload)
                  end, get_hosts()),
    mock_inet_on_each_node(),
    init_per_group_generic(
      [{add_advertised_endpoints,
        [{asia_node, advertised_endpoints()}]} | Config]);
init_per_group(mod_global_distrib, Config) ->
    %% Disable the mod_global_distrib_mapping_redis refresher
    RedisExtraConfig = [{refresh_after, 3600}],
    init_per_group_generic([{redis_extra_config, RedisExtraConfig} | Config]);
init_per_group(_, Config) ->
    init_per_group_generic(Config).
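
%% Generic per-group setup: for every node in get_hosts/0 it merges the group's
%% extra options with the defaults below, snapshots the current module
%% configuration (restored later by end_per_group_generic/1), stops modules
%% that would add noise when many messages are sent, and (re)starts
%% mod_global_distrib with the merged options.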

init_per_group_generic(Config0) ->
    Config2 =
        lists:foldl(
          fun({NodeName, LocalHost, ReceiverPort}, Config1) ->
                  Opts0 = (?config(extra_config, Config1) ++
                           [{local_host, LocalHost},
                            {hosts_refresh_interval, ?HOSTS_REFRESH_INTERVAL},
                            {global_host, "localhost"},
                            {endpoints, [listen_endpoint(ReceiverPort)]},
                            {tls_opts, [
                                        {certfile, "priv/ssl/fake_server.pem"},
                                        {cafile, "priv/ssl/ca/cacert.pem"}
                                       ]},
                            {redis, ?config(redis_extra_config, Config1)},
                            {resend_after_ms, 500}]),
                  Opts = maybe_add_advertised_endpoints(NodeName, Opts0, Config1),

                  VirtHosts = virtual_hosts(NodeName),
                  Node = node_spec(NodeName),
                  Config2 = dynamic_modules:save_modules(Node, VirtHosts, Config1),

                  %% To reduce load when sending many messages
                  ModulesToStop = [mod_offline, mod_blocking, mod_privacy, mod_roster, mod_last,
                                   mod_stream_management],
                  [dynamic_modules:ensure_stopped(Node, VirtHost, ModulesToStop) ||
                      VirtHost <- VirtHosts],

                  dynamic_modules:ensure_modules(Node, domain(),
                                                 [{mod_global_distrib, Opts},
                                                  {mod_stream_management, [{resume_timeout, 1}]}]),
                  Config2
          end,
          Config0,
          get_hosts()),

    wait_for_listeners_to_appear(),

    {SomeNode, _, _} = hd(get_hosts()),
    NodesKey = rpc(SomeNode, mod_global_distrib_mapping_redis, nodes_key, []),
    [{nodes_key, NodesKey}, {escalus_user_db, xmpp} | Config2].

end_per_group(advertised_endpoints, Config) ->
    Pids = ?config(meck_handlers, Config),
    unmock_inet(Pids),
    escalus_fresh:clean(),
    end_per_group_generic(Config);
end_per_group(start_checks, Config) ->
    escalus_fresh:clean(),
    Config;
end_per_group(invalidation, Config) ->
    redis_query(europe_node1, [<<"HDEL">>, ?config(nodes_key, Config),
                               ?config(node_to_expire, Config)]),
    end_per_group_generic(Config);
end_per_group(_, Config) ->
    end_per_group_generic(Config).

end_per_group_generic(Config) ->
    dynamic_modules:restore_modules(#{timeout => timer:seconds(30)}, Config).

init_per_testcase(CaseName, Config)
  when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
       CaseName == test_muc_conversation_history ->
    %% There is no helper to load MUC on node2.
    %% For now it's easier to hide node2.
    %% TODO: Do it right at some point!
    hide_node(europe_node2, Config),
    %% There would be no new connections to europe_node2, but there can be some old ones.
    %% We need to disconnect previous connections.
    {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
    trigger_rebalance(asia_node, list_to_binary(EuropeHost)),
    %% Load MUC on the mim node
    muc_helper:load_muc(),
    RegNode = ct:get_config({hosts, reg, node}),
    %% Wait for muc.localhost to become visible from the reg node
    wait_for_domain(RegNode, muc_helper:muc_host()),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
                                   CN == test_pm_with_ungraceful_reconnection_to_different_server;
                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
                                   CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
    escalus:init_per_testcase(CN, init_user_eve(Config));
init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).
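
%% Eve needs accounts in both clusters: a fresh one in reg (asia) and the same
%% credentials registered in mim (europe), so the reconnection test cases can
%% log her in again on the other side. Both specs are kept in Config.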

init_user_eve(Config) ->
    %% Register Eve in the reg cluster
    EveSpec = escalus_fresh:create_fresh_user(Config, eve),
    MimPort = ct:get_config({hosts, mim, c2s_port}),
    EveSpec2 = lists:keystore(port, 1, EveSpec, {port, MimPort}),
    %% Register Eve in the mim cluster
    escalus:create_users(Config, [{eve, EveSpec2}]),
    [{evespec_reg, EveSpec}, {evespec_mim, EveSpec2} | Config].

end_per_testcase(CN, Config) when CN == test_pm_with_graceful_reconnection_to_different_server;
                                  CN == test_pm_with_ungraceful_reconnection_to_different_server;
                                  CN == test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first;
                                  CN == test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first ->
    MimEveSpec = ?config(evespec_mim, Config),
    %% Clean Eve from the reg cluster
    escalus_fresh:clean(),
    %% Clean Eve from the mim cluster.
    %% For shared databases (i.e. mysql, pgsql...),
    %% removing from one cluster would remove from all clusters.
    %% For the mnesia auth backend we need to call removal from each cluster.
    %% That's why there is a catch here.
    catch escalus_users:delete_users(Config, [{mim_eve, MimEveSpec}]),
    generic_end_per_testcase(CN, Config);
end_per_testcase(CaseName, Config)
  when CaseName == test_muc_conversation_on_one_host; CaseName == test_global_disco;
       CaseName == test_muc_conversation_history ->
    refresh_mappings(europe_node2, "by_end_per_testcase,testcase=" ++ atom_to_list(CaseName)),
    muc_helper:unload_muc(),
    generic_end_per_testcase(CaseName, Config);
end_per_testcase(test_update_senders_host_by_ejd_service = CN, Config) ->
    refresh_mappings(europe_node1, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
    generic_end_per_testcase(CN, Config);
end_per_testcase(CN, Config) when CN == enable_new_endpoint_on_refresh;
                                  CN == disable_endpoint_on_refresh;
                                  CN == wait_for_connection;
                                  CN == closed_connection_is_removed_from_disabled ->
    restart_receiver(asia_node),
    refresh_mappings(asia_node, "by_end_per_testcase,testcase=" ++ atom_to_list(CN)),
    generic_end_per_testcase(CN, Config);
end_per_testcase(CaseName, Config) ->
    generic_end_per_testcase(CaseName, Config).

generic_end_per_testcase(CaseName, Config) ->
    lists:foreach(
      fun({NodeName, _, _}) ->
              %% TODO: Enable the refresher only for specific test cases,
              %% as some of them are based on the assumption that node(s)
              %% must open new connections during tests.
              pause_refresher(NodeName, CaseName),
              Node = ct:get_config(NodeName),
              SupRef = {mod_global_distrib_outgoing_conns_sup, Node},
              try
                  OutgoingConns = supervisor:which_children(SupRef),
                  lists:foreach(fun ({mod_global_distrib_hosts_refresher, _, _, _}) ->
                                        skip;
                                    ({Id, _, _, _}) ->
                                        supervisor:terminate_child(SupRef, Id)
                                end, OutgoingConns),
                  [{mod_global_distrib_hosts_refresher, _, worker, _Modules}] =
                      supervisor:which_children(SupRef)
              catch
                  _:{noproc, _} ->
                      ct:pal("Sender supervisor not found in ~p", [NodeName])
              end,
              unpause_refresher(NodeName, CaseName)
      end,
      get_hosts()),
    escalus:end_per_testcase(CaseName, Config).

virtual_hosts(asia_node) ->
    [domain()];
virtual_hosts(_) ->
    [domain(), secondary_domain()].

secondary_domain() ->
    ct:get_config({hosts, mim, secondary_domain}).
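
%% The hosts refresher polls the mapping backend every ?HOSTS_REFRESH_INTERVAL
%% milliseconds and can open new outgoing pools on its own, which is why
%% teardown pauses it while terminating connections (see generic_end_per_testcase/2).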

%% The refresher is not started at all or is stopped for some test cases
-spec pause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
pause_refresher(_, test_error_on_wrong_hosts) ->
    ok;
pause_refresher(asia_node, test_location_disconnect) ->
    ok;
pause_refresher(NodeName, _) ->
    ok = rpc(NodeName, mod_global_distrib_hosts_refresher, pause, []).

-spec unpause_refresher(NodeName :: atom(), CaseName :: atom()) -> ok.
unpause_refresher(_, test_error_on_wrong_hosts) ->
    ok;
unpause_refresher(asia_node, test_location_disconnect) ->
    ok;
unpause_refresher(NodeName, _) ->
    ok = rpc(NodeName, mod_global_distrib_hosts_refresher, unpause, []).

%%--------------------------------------------------------------------
%% Service discovery test
%%--------------------------------------------------------------------

%% Requires mod_global_distrib to be started with the advertised_endpoints
%% option on each host in get_hosts().
%% Reads Redis to confirm that the stored endpoints are overwritten
%% with the `advertised_endpoints` option value.
test_advertised_endpoints_override_endpoints(_Config) ->
    Endps = execute_on_each_node(mod_global_distrib_mapping_redis,
                                 get_endpoints,
                                 [<<"reg1">>]),
    true = lists:all(fun({ok, E}) ->
                             lists:sort(iptuples_to_string(E)) =:=
                                 lists:sort(advertised_endpoints()) end, Endps).

%% @doc Verifies that the hosts refresher will restart the outgoing connection pool if
%% it goes down for some reason (crash or domain unavailability).
%% Also verifies that the refresher properly reads the host list
%% from the backend and starts the appropriate pool.
test_host_refreshing(_Config) ->
    mongoose_helper:wait_until(fun() -> trees_for_connections_present() end, true,
                               #{name => trees_for_connections_present}),
    ConnectionSups = out_connection_sups(asia_node),
    {europe_node1, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
    EuropeSup = rpc(asia_node, mod_global_distrib_utils, server_to_sup_name, [list_to_binary(EuropeHost)]),
    {_, EuropePid, supervisor, _} = lists:keyfind(EuropeSup, 1, ConnectionSups),
    erlang:exit(EuropePid, kill), % it's OK to kill a temporary process
    mongoose_helper:wait_until(fun() -> tree_for_sup_present(asia_node, EuropeSup) end, true,
                               #{name => tree_for_sup_present}).

%% When run in the mod_global_distrib group - tests a simple case of a connection
%% between two users connected to different clusters.
%% When run in the advertised_endpoints group - tests whether it is possible
%% to connect to a node that advertises itself with a domain name.
test_pm_between_users_at_different_locations(Config) ->
    escalus:fresh_story(Config, [{alice, 1}, {eve, 1}], fun test_two_way_pm/2).

test_pm_between_users_before_available_presence(Config) ->
    Config1 = escalus_fresh:create_users(Config, [{alice, 1}, {eve, 1}]),
    {ok, Alice} = escalus_client:start(Config1, alice, <<"res1">>),
    {ok, Eve} = escalus_client:start(Config1, eve, <<"res1">>),

    test_two_way_pm(Alice, Eve),

    escalus_client:stop(Config1, Alice),
    escalus_client:stop(Config1, Eve).

test_two_way_pm(Alice, Eve) ->
    %% Ensure that users are properly registered.
    %% Otherwise you can get an "Unable to route global message... user not found
    %% in the routing table" error,
    %% because "escalus_client:start" can return before SM registration is completed.
    wait_for_registration(Alice, ct:get_config({hosts, mim, node})),
    wait_for_registration(Eve, ct:get_config({hosts, reg, node})),

    escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi to Eve from Europe1!">>)),
    escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi to Alice from Asia!">>)),

    FromAlice = escalus_client:wait_for_stanza(Eve, timer:seconds(15)),
    FromEve = escalus_client:wait_for_stanza(Alice, timer:seconds(15)),

    AliceJid = escalus_client:full_jid(Alice),
    EveJid = escalus_client:full_jid(Eve),

    escalus:assert(is_chat_message_from_to, [AliceJid, EveJid, <<"Hi to Eve from Europe1!">>],
                   FromAlice),
    escalus:assert(is_chat_message_from_to, [EveJid, AliceJid, <<"Hi to Alice from Asia!">>],
                   FromEve).

test_muc_conversation_on_one_host(Config0) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
    Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
    escalus:fresh_story(
      Config, [{eve, 1}],
      fun(Eve) ->
              Alice = connect_from_spec(AliceSpec, Config),

              RoomJid = ?config(room, Config),
              AliceUsername = escalus_utils:get_username(Alice),
              EveUsername = escalus_utils:get_username(Eve),
              RoomAddr = muc_helper:room_address(RoomJid),

              escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
              escalus:wait_for_stanza(Alice),

              escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),
              [_, _, _] = escalus:wait_for_stanzas(Eve, 3),

              Msg = <<"Hi, Eve!">>,
              escalus:send(Alice, escalus_stanza:groupchat_to(RoomAddr, Msg)),
              escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Alice)),
              escalus:assert(is_groupchat_message, [Msg], escalus:wait_for_stanza(Eve)),

              Msg2 = <<"Hi, Alice!">>,
              escalus:send(Eve, escalus_stanza:groupchat_to(RoomAddr, Msg2)),
              escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Eve)),
              escalus:assert(is_groupchat_message, [Msg2], escalus:wait_for_stanza(Alice))
      end),
    muc_helper:destroy_room(Config).

test_muc_conversation_history(Config0) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config0, alice),
    Config = muc_helper:given_fresh_room(Config0, AliceSpec, []),
    escalus:fresh_story(
      Config, [{eve, 1}],
      fun(Eve) ->
              Alice = connect_from_spec(AliceSpec, Config),

              RoomJid = ?config(room, Config),
              AliceUsername = escalus_utils:get_username(Alice),
              RoomAddr = muc_helper:room_address(RoomJid),

              escalus:send(Alice, muc_helper:stanza_muc_enter_room(RoomJid, AliceUsername)),
              %% We don't care about presences from Alice, escalus would filter them out
              wait_for_subject(Alice),

              send_n_muc_messages(Alice, RoomAddr, 3),

              %% Ensure that the messages are received by the room
              %% before trying to log in Eve.
              %% Otherwise, Eve would receive some messages from history and
              %% some as regular groupchat messages.
              receive_n_muc_messages(Alice, 3),

              EveUsername = escalus_utils:get_username(Eve),
              escalus:send(Eve, muc_helper:stanza_muc_enter_room(RoomJid, EveUsername)),

              wait_for_muc_presence(Eve, RoomJid, AliceUsername),
              wait_for_muc_presence(Eve, RoomJid, EveUsername),

              %% XEP-0045: After sending the presence broadcast (and only after doing so),
              %% the service MAY then send discussion history, the room subject,
              %% live messages, presence updates, and other in-room traffic.
              receive_n_muc_messages(Eve, 3),
              wait_for_subject(Eve)
      end),
    muc_helper:destroy_room(Config).

wait_for_muc_presence(User, RoomJid, FromNickname) ->
    Presence = escalus:wait_for_stanza(User),
    escalus:assert(is_presence, Presence),
    escalus:assert(is_stanza_from, [muc_helper:room_address(RoomJid, FromNickname)], Presence),
    ok.

wait_for_subject(User) ->
    Subject = escalus:wait_for_stanza(User),
    escalus:assert(is_groupchat_message, Subject),
    ?assertNotEqual(undefined, exml_query:subelement(Subject, <<"subject">>)),
    ok.

send_n_muc_messages(User, RoomAddr, N) ->
    lists:foreach(fun(I) ->
                          Msg = <<"test-", (integer_to_binary(I))/binary>>,
                          escalus:send(User, escalus_stanza:groupchat_to(RoomAddr, Msg))
                  end, lists:seq(1, N)).

receive_n_muc_messages(User, N) ->
    lists:foreach(fun(J) ->
                          Msg = <<"test-", (integer_to_binary(J))/binary>>,
                          Stanza = escalus:wait_for_stanza(User),
                          escalus:assert(is_groupchat_message, [Msg], Stanza)
                  end, lists:seq(1, N)).

test_component_on_one_host(Config) ->
    ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
                       {port, service_port()}, {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),

    Story = fun(User) ->
                    Msg1 = escalus_stanza:chat_to(Addr, <<"Hi2!">>),
                    escalus:send(User, Msg1),
                    %% Then the component receives it
                    Reply1 = escalus:wait_for_stanza(Comp),
                    escalus:assert(is_chat_message, [<<"Hi2!">>], Reply1),

                    %% When the component sends a reply
                    Msg2 = escalus_stanza:chat_to(User, <<"Oh hi!">>),
                    escalus:send(Comp, escalus_stanza:from(Msg2, Addr)),

                    %% Then Alice receives it
                    Reply2 = escalus:wait_for_stanza(User),
                    escalus:assert(is_chat_message, [<<"Oh hi!">>], Reply2),
                    escalus:assert(is_stanza_from, [Addr], Reply2)
            end,

    [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].

%% Ensures that two components in distinct data centers can communicate.
test_components_in_different_regions(_Config) ->
    ComponentCommonConfig = [{host, <<"localhost">>}, {password, <<"secret">>},
                             {server, <<"localhost">>}, {component, <<"test_service">>}],
    Comp1Port = ct:get_config({hosts, mim, service_port}),
    Comp2Port = ct:get_config({hosts, reg, service_port}),
    Component1Config = [{port, Comp1Port}, {component, <<"service1">>} | ComponentCommonConfig],
    Component2Config = [{port, Comp2Port}, {component, <<"service2">>} | ComponentCommonConfig],

    {Comp1, Addr1, _Name1} = component_helper:connect_component(Component1Config),
    {Comp2, Addr2, _Name2} = component_helper:connect_component(Component2Config),

    Msg1 = escalus_stanza:from(escalus_stanza:chat_to(Addr2, <<"Hi from 1!">>), Addr1),
    escalus:send(Comp1, Msg1),
    GotMsg1 = escalus:wait_for_stanza(Comp2),
    escalus:assert(is_chat_message, [<<"Hi from 1!">>], GotMsg1),

    Msg2 = escalus_stanza:from(escalus_stanza:chat_to(Addr1, <<"Hi from 2!">>), Addr2),
    escalus:send(Comp2, Msg2),
    GotMsg2 = escalus:wait_for_stanza(Comp1),
    escalus:assert(is_chat_message, [<<"Hi from 2!">>], GotMsg2).

%% An ordinary user is not able to discover the hidden component via GD
test_hidden_component_disco_in_different_region(Config) ->
    %% Hidden component from component_SUITE connects to mim1/europe_node1
    HiddenComponentConfig = component_helper:spec(hidden_component, Config),
    {_HiddenComp, HiddenAddr, _} = component_helper:connect_component(HiddenComponentConfig),

    escalus:fresh_story(
      Config, [{eve, 1}],
      fun(Eve) ->
              EveServer = escalus_client:server(Eve),
              escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
              DiscoReply = escalus:wait_for_stanza(Eve),
              escalus:assert(is_iq_result, DiscoReply),
              escalus:assert(fun(Stanza) ->
                                     not escalus_pred:has_service(HiddenAddr, Stanza)
                             end, DiscoReply)
      end).

test_component_disconnect(Config) ->
    ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
                       {port, service_port()}, {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
    component_helper:disconnect_component(Comp, Addr),

    Story = fun(User) ->
                    escalus:send(User, escalus_stanza:chat_to(Addr, <<"Hi!">>)),
                    Error = escalus:wait_for_stanza(User, 5000),
                    escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
            end,

    [escalus:fresh_story(Config, [{User, 1}], Story) || User <- [alice, eve]].

test_location_disconnect(Config) ->
    try
        escalus:fresh_story(
          Config, [{alice, 1}, {eve, 1}],
          fun(Alice, Eve) ->
                  escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),

                  escalus_client:wait_for_stanza(Eve),

                  ok = rpc(asia_node, application, stop, [mongooseim]),
                  %% TODO: Stopping mongooseim alone should probably stop connections too
                  ok = rpc(asia_node, application, stop, [ranch]),

                  escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi again!">>)),
                  Error = escalus:wait_for_stanza(Alice),
                  escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], Error)
          end)
    after
        rpc(asia_node, application, start, [ranch]),
        rpc(asia_node, application, start, [mongooseim])
    end.
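
%% A message sent to a user who has just disconnected on the remote cluster
%% should be bounced back to the sender as a service-unavailable error.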

test_pm_with_disconnection_on_other_server(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              escalus_connection:stop(Eve),
              escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"Hi from Europe1!">>)),
              FromAliceBounce = escalus_client:wait_for_stanza(Alice, 15000),
              escalus:assert(is_error, [<<"cancel">>, <<"service-unavailable">>], FromAliceBounce)
      end).

test_pm_with_graceful_reconnection_to_different_server(Config) ->
    EveSpec = ?config(evespec_reg, Config),
    EveSpec2 = ?config(evespec_mim, Config),
    escalus:fresh_story(
      Config, [{alice, 1}],
      fun(Alice) ->
              Eve = connect_from_spec(EveSpec, Config),

              escalus_client:send(Eve, escalus_stanza:chat_to(Alice, <<"Hi from Asia!">>)),

              %% Stop the connection and wait for the process to die
              EveNode = ct:get_config({hosts, reg, node}),
              mongoose_helper:logout_user(Config, Eve, #{node => EveNode}),

              FromEve = escalus_client:wait_for_stanza(Alice),

              %% Pause Alice until Eve is reconnected
              AliceNode = ct:get_config({hosts, mim, node}),
              C2sPid = mongoose_helper:get_session_pid(Alice, #{node => AliceNode}),
              ok = rpc(asia_node, sys, suspend, [C2sPid]),

              escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi from Europe1!">>)),

              NewEve = connect_from_spec(EveSpec2, Config),
              EveNode2 = ct:get_config({hosts, mim, node}),
              wait_for_registration(NewEve, EveNode2),

              ok = rpc(asia_node, sys, resume, [C2sPid]),

              escalus_client:send(Alice, chat_with_seqnum(Eve, <<"Hi again from Europe1!">>)),
              escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi again from Asia!">>)),

              FirstFromAlice = escalus_client:wait_for_stanza(NewEve),
              AgainFromEve = escalus_client:wait_for_stanza(Alice),
              SecondFromAlice = escalus_client:wait_for_stanza(NewEve),

              [FromAlice, AgainFromAlice] = order_by_seqnum([FirstFromAlice, SecondFromAlice]),

              escalus:assert(is_chat_message, [<<"Hi from Europe1!">>], FromAlice),
              escalus:assert(is_chat_message, [<<"Hi from Asia!">>], FromEve),
              escalus:assert(is_chat_message, [<<"Hi again from Europe1!">>], AgainFromAlice),
              escalus:assert(is_chat_message, [<<"Hi again from Asia!">>], AgainFromEve)
      end).

%% The refresh logic can cause two possible behaviours.
%% We test both behaviours here (plus the no-refresh case).
%% See PR #2392.
test_pm_with_ungraceful_reconnection_to_different_server(Config) ->
    %% No refresh
    BeforeResume = fun() -> ok end,
    AfterCheck = fun(Alice, NewEve) ->
                         user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
                         user_receives(Alice, [<<"Hi from Europe!">>])
                 end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).

test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first(Config) ->
    %% Same as no refresh
    RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_asia_refreshes_first",
    %% The order of nodes is important here in refresh_hosts!
    BeforeResume = fun() -> refresh_hosts([asia_node, europe_node1], RefreshReason) end,
    AfterCheck = fun(Alice, NewEve) ->
                         user_receives(NewEve, [<<"Hi from Europe1!">>, <<"Hi again from Europe1!">>]),
                         user_receives(Alice, [<<"Hi from Europe!">>])
                 end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).

test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first(Config) ->
    %% The asia node overrides the Europe value with the older one,
    %% so we lose some messages during rerouting :(
    RefreshReason = "by_test_pm_with_ungraceful_reconnection_to_different_server_with_europe_refreshes_first",
    BeforeResume = fun() -> refresh_hosts([europe_node1, asia_node], RefreshReason) end,
    AfterCheck = fun(Alice, NewEve) ->
                         user_receives(NewEve, [<<"Hi again from Europe1!">>]),
                         user_receives(Alice, [<<"Hi from Europe!">>])
                 end,
    do_test_pm_with_ungraceful_reconnection_to_different_server(Config, BeforeResume, AfterCheck).

%% Reconnect Eve from asia (the reg cluster) to europe (mim)
do_test_pm_with_ungraceful_reconnection_to_different_server(Config0, BeforeResume, AfterCheck) ->
    Config = escalus_users:update_userspec(Config0, eve, stream_management, true),
    EveSpec = ?config(evespec_reg, Config),
    EveSpec2 = ?config(evespec_mim, Config),
    escalus:fresh_story(
      Config, [{alice, 1}],
      fun(Alice) ->
              {ok, Eve, _} = escalus_connection:start(EveSpec, connect_steps_with_sm()),
              escalus_story:send_initial_presence(Eve),
              escalus_client:wait_for_stanza(Eve),

              %% Stop the connection and wait for the process to die
              EveNode = ct:get_config({hosts, reg, node}),
              C2sPid = mongoose_helper:get_session_pid(Eve, #{node => EveNode}),
              ok = rpc(asia_node, sys, suspend, [C2sPid]),

              escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi from Europe1!">>)),

              %% Wait for the route message to be queued in the c2s message queue
              mongoose_helper:wait_for_route_message_count(C2sPid, 1),

              %% Time to do bad nasty things with our socket, so once our process wakes up,
              %% it SHOULD detect a dead socket
              escalus_connection:kill(Eve),

              %% Connect a new one; we hope the message will be rerouted
              NewEve = connect_from_spec(EveSpec2, Config),
              EveNode2 = ct:get_config({hosts, mim, node}),
              wait_for_registration(NewEve, EveNode2),

              BeforeResume(),

              %% Trigger rerouting
              ok = rpc(asia_node, sys, resume, [C2sPid]),
              C2sPid ! resume_timeout,

              %% Let C2sPid process the message and reroute it (and finally die, poor little thing)
              mongoose_helper:wait_for_pid_to_die(C2sPid),

              escalus_client:send(Alice, chat_with_seqnum(bare_client(Eve), <<"Hi again from Europe1!">>)),
              escalus_client:send(NewEve, escalus_stanza:chat_to(Alice, <<"Hi from Europe!">>)),

              AfterCheck(Alice, NewEve)
      end).
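
%% Services registered in one region (here: MUC loaded on the europe side)
%% should be visible in disco results fetched through the other region.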

test_global_disco(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              AliceServer = escalus_client:server(Alice),
              escalus:send(Alice, escalus_stanza:service_discovery(AliceServer)),
              _AliceStanza = escalus:wait_for_stanza(Alice),
              %% TODO: test for duplicate components
              %% escalus:assert(fun has_exactly_one_service/2, [muc_helper:muc_host()], AliceStanza),

              EveServer = escalus_client:server(Eve),
              escalus:send(Eve, escalus_stanza:service_discovery(EveServer)),
              EveStanza = escalus:wait_for_stanza(Eve),
              escalus:assert(has_service, [muc_helper:muc_host()], EveStanza)
      end).

test_component_unregister(_Config) ->
    ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
                       {port, service_port()}, {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),
    ?assertMatch({ok, _}, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
                              [<<"test_service.localhost">>])),

    component_helper:disconnect_component(Comp, Addr),

    ?assertEqual(error, rpc(europe_node1, mod_global_distrib_mapping, for_domain,
                            [<<"test_service.localhost">>])).

test_error_on_wrong_hosts(_Config) ->
    Opts = [{cookie, "cookie"}, {local_host, "no_such_host"}, {global_host, "localhost"}],
    ?assertException(error, {badrpc, {'EXIT', {"no_such_host is not a member of the host list", _}}},
                     dynamic_modules:start(node_spec(europe_node1), <<"localhost">>,
                                           mod_global_distrib, Opts)).

refresh_nodes(Config) ->
    NodesKey = ?config(nodes_key, Config),
    NodeBin = ?config(node_to_expire, Config),
    redis_query(europe_node1, [<<"HSET">>, NodesKey, NodeBin, <<"0">>]),
    refresh_mappings(europe_node1, "by_refresh_nodes"),
    {ok, undefined} = redis_query(europe_node1, [<<"HGET">>, NodesKey, NodeBin]).

test_in_order_messages_on_multiple_connections(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              Seq = lists:seq(1, 100),
              lists:foreach(
                fun(I) ->
                        Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
                        escalus_client:send(Alice, Stanza)
                end,
                Seq),
              lists:foreach(
                fun(I) ->
                        Stanza = escalus_client:wait_for_stanza(Eve, 5000),
                        escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
                end,
                Seq)
      end).

test_in_order_messages_on_multiple_connections_with_bounce(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              %% Send 99 messages, some while the server knows the mapping and some when it doesn't
              send_steps(Alice, Eve, 99, <<"reg1">>),
              %% Make sure that the last message is sent when the mapping is known
              set_mapping(europe_node1, Eve, <<"reg1">>),
              escalus_client:send(Alice, escalus_stanza:chat_to(Eve, <<"100">>)),

              %% Check that all stanzas were received in order
              lists:foreach(
                fun(I) ->
                        Stanza = escalus_client:wait_for_stanza(Eve, 5000),
                        escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
                end,
                lists:seq(1, 100))
      end).
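
%% While the recipient's mapping is missing, stanzas accumulate in the bounce
%% buffer; once the mapping is restored they are rerouted. This test checks
%% that the rerouted stanzas still arrive in their original order.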

test_messages_bounced_in_order(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              %% Make sure all messages land in the bounce storage
              delete_mapping(europe_node1, Eve),

              Seq = lists:seq(1, 100),
              lists:foreach(
                fun(I) ->
                        Stanza = escalus_stanza:chat_to(Eve, integer_to_binary(I)),
                        escalus_client:send(Alice, Stanza)
                end,
                Seq),

              %% Restore the mapping so that the bounce eventually succeeds
              ?assertEqual(undefined, get_mapping(europe_node1, Eve)),
              set_mapping(europe_node1, Eve, <<"reg1">>),

              lists:foreach(
                fun(I) ->
                        Stanza = escalus_client:wait_for_stanza(Eve, 5000),
                        escalus:assert(is_chat_message, [integer_to_binary(I)], Stanza)
                end,
                Seq)
      end).

test_update_senders_host(Config) ->
    escalus:fresh_story(
      Config, [{alice, 1}, {eve, 1}],
      fun(Alice, Eve) ->
              AliceJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Alice)]),
              {ok, <<"localhost.bis">>}
                  = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid]),
              ok = rpc(europe_node1, mod_global_distrib_mapping, delete_for_jid, [AliceJid]),
              wait_for_node(asia_node, AliceJid),

              %% TODO: Should prevent the Redis refresher from executing for a moment,
              %% as it may collide with this test.

              escalus:send(Alice, escalus_stanza:chat_to(Eve, <<"test_update_senders_host">>)),
              escalus:wait_for_stanza(Eve),

              {ok, <<"localhost.bis">>}
                  = rpc(asia_node, mod_global_distrib_mapping, for_jid, [AliceJid])
      end).

wait_for_node(Node, Jid) ->
    mongoose_helper:wait_until(fun() -> rpc(Node, mod_global_distrib_mapping, for_jid, [Jid]) end,
                               error,
                               #{time_left => timer:seconds(10),
                                 sleep_time => timer:seconds(1),
                                 name => rpc}).

test_update_senders_host_by_ejd_service(Config) ->
    refresh_hosts([europe_node1, europe_node2, asia_node], "by_test_update_senders_host_by_ejd_service"),
    %% Connects to europe_node1
    ComponentConfig = [{server, <<"localhost">>}, {host, <<"localhost">>}, {password, <<"secret">>},
                       {port, service_port()}, {component, <<"test_service">>}],

    {Comp, Addr, _Name} = component_helper:connect_component(ComponentConfig),

    escalus:fresh_story(
      Config, [{eve, 1}],
      fun(Eve) ->
              %% Eve is connected to asia_node
              EveJid = rpc(asia_node, jid, from_binary, [escalus_client:full_jid(Eve)]),
              {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
              {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid]),

              ok = rpc(asia_node, mod_global_distrib_mapping, delete_for_jid, [EveJid]),
              wait_for_node(europe_node1, EveJid),
              wait_for_node(europe_node2, EveJid),

              %% The component is connected to europe_node1,
              %% but we force asia_node to connect to europe_node2 by hiding europe_node1
              %% and forcing a rebalance (effectively disabling connections to europe_node1)
              %% to verify the routing cache update on both nodes

              %% TODO: Should prevent the Redis refresher from executing for a moment,
              %% as it may collide with this test.

              hide_node(europe_node1, Config),
              {_, EuropeHost, _} = lists:keyfind(europe_node1, 1, get_hosts()),
              trigger_rebalance(asia_node, list_to_binary(EuropeHost)),

              escalus:send(Eve, escalus_stanza:chat_to(Addr, <<"hi">>)),
              escalus:wait_for_stanza(Comp),

              {ok, <<"reg1">>} = rpc(europe_node1, mod_global_distrib_mapping, for_jid, [EveJid]),
              {ok, <<"reg1">>} = rpc(europe_node2, mod_global_distrib_mapping, for_jid, [EveJid])
      end).

%% -------------------------------- Rebalancing --------------------------------

enable_new_endpoint_on_refresh(Config) ->
    get_connection(europe_node1, <<"reg1">>),

    {Enabled1, _Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),

    ExtraPort = get_port(reg, gd_extra_endpoint_port),
    NewEndpoint = enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),

    {Enabled2, _Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),

    %% One new pool and one new endpoint
    [NewEndpoint] = Pools2 -- Pools1,
    [] = Pools1 -- Pools2,
    [NewEndpoint] = Enabled2 -- Enabled1,
    [] = Enabled1 -- Enabled2.

disable_endpoint_on_refresh(Config) ->
    ExtraPort = get_port(reg, gd_extra_endpoint_port),
    NewEndpoint = enable_extra_endpoint(asia_node, europe_node1, ExtraPort, Config),

    get_connection(europe_node1, <<"reg1">>),

    {Enabled1, Disabled1, Pools1} = get_outgoing_connections(europe_node1, <<"reg1">>),
    [_, _] = Enabled1,
    [] = Disabled1,

    hide_extra_endpoint(asia_node),
    trigger_rebalance(europe_node1, <<"reg1">>),

    {Enabled2, Disabled2, Pools2} = get_outgoing_connections(europe_node1, <<"reg1">>),

    %% Both pools are still open even after the disable
    [] = Pools1 -- Pools2,
    [] = Pools2 -- Pools1,
    %% NewEndpoint is no longer enabled
    [] = Enabled2 -- Enabled1,
    [NewEndpoint] = Enabled1 -- Enabled2,
    %% NewEndpoint is now disabled
    [] = Disabled1,
    [NewEndpoint] = Disabled2.

wait_for_connection(_Config) ->
    set_endpoints(asia_node, []),
    %% Because of the hosts refresher, a pool of connections to asia_node
    %% may already be present here
    mongoose_helper:wait_until(
      fun () ->
              try trigger_rebalance(europe_node1, <<"reg1">>), true
              catch _:_ -> false end
      end,
      true,
      #{name => rebalance, time_left => timer:seconds(5)}),

    spawn_connection_getter(europe_node1),

    receive
        Unexpected1 -> error({unexpected, Unexpected1})
    after
        2000 -> ok
    end,

    refresh_mappings(asia_node, "by_wait_for_connection"),
    trigger_rebalance(europe_node1, <<"reg1">>),

    receive
        Conn when is_pid(Conn) -> ok;
        Unexpected2 -> error({unexpected, Unexpected2})
    after
        5000 -> error(timeout)
    end.

closed_connection_is_removed_from_disabled(_Config) ->
    get_connection(europe_node1, <<"reg1">>),
    set_endpoints(asia_node, []),
    trigger_rebalance(europe_node1, <<"reg1">>),

    {[], [_], [_]} = get_outgoing_connections(europe_node1, <<"reg1">>),

    %% Will drop connections and prevent them from reconnecting
    restart_receiver(asia_node, [listen_endpoint(get_port(reg, gd_supplementary_endpoint_port))]),

    mongoose_helper:wait_until(fun() -> get_outgoing_connections(europe_node1, <<"reg1">>) end,
                               {[], [], []},
                               #{name => get_outgoing_connections}).
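
%% Note for the rebalancing tests above: connection state is inspected as
%% {Enabled, Disabled, Pools} triples returned by get_outgoing_connections/2
%% (defined below). Enabled/Disabled are the endpoint lists kept by
%% mod_global_distrib_server_mgr; Pools are the per-endpoint supervisor children.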


%%--------------------------------------------------------------------
%% Test helpers
%%--------------------------------------------------------------------

get_port(Host, Param) ->
    case ct:get_config({hosts, Host, Param}) of
        Port when is_integer(Port) ->
            Port;
        Other ->
            ct:fail({get_port_failed, Host, Param, Other})
    end.

get_hosts() ->
    [
     {europe_node1, "localhost.bis", get_port(mim, gd_endpoint_port)},
     {europe_node2, "localhost.bis", get_port(mim2, gd_endpoint_port)},
     {asia_node, "reg1", get_port(reg, gd_endpoint_port)}
    ].

listen_endpoint(NodeName) when is_atom(NodeName) ->
    {_, _, Port} = lists:keyfind(NodeName, 1, get_hosts()),
    listen_endpoint(Port);
listen_endpoint(Port) when is_integer(Port) ->
    {{127, 0, 0, 1}, Port}.

%% For dynamic_modules
node_spec(NodeName) ->
    #{node => ct:get_config(NodeName), timeout => timer:seconds(30)}.

rpc(NodeName, M, F, A) ->
    Node = ct:get_config(NodeName),
    mongoose_helper:successful_rpc(#{node => Node}, M, F, A, timer:seconds(30)).

hide_node(NodeName, Config) ->
    NodesKey = ?config(nodes_key, Config),
    NodeBin = atom_to_binary(ct:get_config(NodeName), latin1),
    {ok, <<"1">>} = redis_query(europe_node1, [<<"HDEL">>, NodesKey, NodeBin]).

connect_from_spec(UserSpec, Config) ->
    {ok, User} = escalus_client:start(Config, UserSpec, <<"res1">>),
    escalus_connection:set_filter_predicate(User, fun(S) -> not escalus_pred:is_presence(S) end),
    escalus_story:send_initial_presence(User),
    User.

chat_with_seqnum(To, Text) ->
    escalus_stanza:set_id(escalus_stanza:chat_to(To, Text),
                          integer_to_binary(erlang:monotonic_time())).

order_by_seqnum(Stanzas) ->
    lists:sort(fun(A, B) -> exml_query:attr(B, <<"id">>) < exml_query:attr(A, <<"id">>) end,
               Stanzas).

has_exactly_one_service(Service, #xmlel{children = [#xmlel{children = Services}]}) ->
    Pred = fun(Item) ->
                   exml_query:attr(Item, <<"jid">>) =:= Service
           end,
    case lists:filter(Pred, Services) of
        [_] -> true;
        _ -> false
    end.

send_steps(From, To, Max, ToHost) ->
    next_send_step(From, To, 1, Max, Max div 10, true, ToHost).

next_send_step(_From, _To, I, Max, _ToReset, _KnowsMapping, _ToHost) when I > Max -> ok;
next_send_step(From, To, I, Max, 0, KnowsMapping, ToHost) ->
    ct:log("Reset: I: ~B", [I]),
    case KnowsMapping of
        true -> delete_mapping(europe_node1, To);
        false -> set_mapping(europe_node1, To, ToHost)
    end,
    next_send_step(From, To, I, Max, Max div 10, not KnowsMapping, ToHost);
next_send_step(From, To, I, Max, ToReset, KnowsMapping, ToHost) ->
    ct:log("I: ~B ~B ~B", [I, Max, ToReset]),
    Stanza = escalus_stanza:chat_to(To, integer_to_binary(I)),
    escalus_client:send(From, Stanza),
    next_send_step(From, To, I + 1, Max, ToReset - 1, KnowsMapping, ToHost).

get_mapping(Node, Client) ->
    {FullJid, _BareJid} = jids(Client),
    {ok, What} = redis_query(Node, [<<"GET">>, FullJid]),
    What.

%% Warning! May not work properly with alice or any other user whose
%% stringprepped JID is different from the original one
delete_mapping(Node, Client) ->
    {FullJid, BareJid} = jids(Client),
    redis_query(Node, [<<"DEL">>, FullJid, BareJid]),
    Jid = rpc(Node, jid, from_binary, [FullJid]),
    rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).

set_mapping(Node, Client, Mapping) ->
    {FullJid, BareJid} = jids(Client),
    redis_query(Node, [<<"MSET">>, FullJid, Mapping, BareJid, Mapping]),
    Jid = rpc(Node, jid, from_binary, [FullJid]),
    rpc(Node, mod_global_distrib_mapping, clear_cache, [Jid]).

jids(Client) ->
    FullJid = escalus_client:full_jid(Client),
    BareJid = escalus_client:short_jid(Client),
    {FullJid, BareJid}.

redis_query(Node, Query) ->
    {ok, RedisWorker} = rpc(Node, mongoose_wpool, get_worker, [redis, global, global_distrib]),
    rpc(Node, eredis, q, [RedisWorker, Query]).

%% A fake address we don't try to connect to.
%% Used in the test_advertised_endpoints_override_endpoints testcase.
advertised_endpoints() ->
    [
     {fake_domain(), get_port(reg, gd_endpoint_port)}
    ].

fake_domain() ->
    "somefakedomain.com".

iptuples_to_string([]) ->
    [];
iptuples_to_string([{Addr, Port} | Endps]) when is_tuple(Addr) ->
    [{inet_parse:ntoa(Addr), Port} | iptuples_to_string(Endps)];
iptuples_to_string([E | Endps]) ->
    [E | iptuples_to_string(Endps)].

maybe_add_advertised_endpoints(NodeName, Opts, Config) ->
    Endpoints = proplists:get_value(NodeName, ?config(add_advertised_endpoints, Config), []),
    case Endpoints of
        [] ->
            Opts;
        E ->
            Connections = case lists:keyfind(connections, 1, Opts) of
                              false -> [];
                              {connections, C} -> C
                          end,
            NewConnections = {connections, [{advertised_endpoints, E} | Connections]},
            [NewConnections | Opts]
    end.

mock_inet_on_each_node() ->
    Nodes = lists:map(fun({NodeName, _, _}) -> ct:get_config(NodeName) end, get_hosts()),
    Results = lists:map(fun(Node) -> rpc:block_call(Node, ?MODULE, mock_inet, []) end, Nodes),
    true = lists:all(fun(Result) -> Result =:= ok end, Results).

execute_on_each_node(M, F, A) ->
    lists:map(fun({NodeName, _, _}) -> rpc(NodeName, M, F, A) end, get_hosts()).

mock_inet() ->
    %% We don't want to mock the inet module itself to avoid strange networking issues
    meck:new(mod_global_distrib_utils, [non_strict, passthrough, unstick]),
    meck:expect(mod_global_distrib_utils, getaddrs, fun(_, inet) -> {ok, [{127, 0, 0, 1}]};
                                                       (_, inet6) -> {error, "No ipv6 address"} end).

unmock_inet(_Pids) ->
    execute_on_each_node(meck, unload, [mod_global_distrib_utils]).

out_connection_sups(Node) ->
    Children = rpc(Node, supervisor, which_children, [mod_global_distrib_outgoing_conns_sup]),
    lists:filter(fun({Sup, _, _, _}) -> Sup =/= mod_global_distrib_hosts_refresher end, Children).

trees_for_connections_present() ->
    AsiaChildren = out_connection_sups(asia_node),
    Europe1Children = out_connection_sups(europe_node1),
    Europe2Children = out_connection_sups(europe_node2),
    lists:all(fun(Host) -> length(Host) > 0 end, [AsiaChildren, Europe1Children, Europe2Children]).

tree_for_sup_present(Node, ExpectedSup) ->
    Children = out_connection_sups(Node),
    lists:keyfind(ExpectedSup, 1, Children) =/= false.


%% ------------------------------- rebalancing helpers -----------------------------------

spawn_connection_getter(SenderNode) ->
    TestPid = self(),
    spawn(fun() ->
                  Conn = get_connection(SenderNode, <<"reg1">>),
                  TestPid ! Conn
          end).

enable_extra_endpoint(ListenNode, SenderNode, Port, _Config) ->
    OriginalEndpoint = listen_endpoint(ListenNode),
    NewEndpoint = {{127, 0, 0, 1}, Port},

    restart_receiver(ListenNode, [NewEndpoint, OriginalEndpoint]),
    refresh_mappings(ListenNode, "by_enable_extra_endpoint,port=" ++ integer_to_list(Port)),
    trigger_rebalance(SenderNode, <<"reg1">>),

    NewEndpoint.

get_connection(SenderNode, ToDomain) ->
    rpc(SenderNode, mod_global_distrib_outgoing_conns_sup, get_connection, [ToDomain]).

hide_extra_endpoint(ListenNode) ->
    set_endpoints(ListenNode, [listen_endpoint(ListenNode)]).

set_endpoints(ListenNode, Endpoints) ->
    {ok, _} = rpc(ListenNode, mod_global_distrib_mapping_redis, set_endpoints, [Endpoints]).

get_outgoing_connections(NodeName, DestinationDomain) ->
    Supervisor = rpc(NodeName, mod_global_distrib_utils, server_to_sup_name, [DestinationDomain]),
    Manager = rpc(NodeName, mod_global_distrib_utils, server_to_mgr_name, [DestinationDomain]),
    Enabled = rpc(NodeName, mod_global_distrib_server_mgr,
                  get_enabled_endpoints, [DestinationDomain]),
    Disabled = rpc(NodeName, mod_global_distrib_server_mgr,
                   get_disabled_endpoints, [DestinationDomain]),
    PoolsChildren = rpc(NodeName, supervisor, which_children, [Supervisor]),
    Pools = [Id || {Id, _Child, _Type, _Modules} <- PoolsChildren, Id /= Manager],
    {Enabled, Disabled, Pools}.

restart_receiver(NodeName) ->
    restart_receiver(NodeName, [listen_endpoint(NodeName)]).

restart_receiver(NodeName, NewEndpoints) ->
    OldOpts = rpc(NodeName, gen_mod, get_module_opts,
                  [<<"localhost">>, mod_global_distrib_receiver]),
    NewOpts = lists:keyreplace(endpoints, 1, OldOpts, {endpoints, NewEndpoints}),
    Node = node_spec(NodeName),
    dynamic_modules:restart(Node, <<"localhost">>, mod_global_distrib_receiver, NewOpts).

trigger_rebalance(NodeName, DestinationDomain) when is_binary(DestinationDomain) ->
    %% Ensure that the manager exists,
    %% otherwise we can get a noproc error in the force_refresh call
    ok = rpc(NodeName, mod_global_distrib_outgoing_conns_sup,
             ensure_server_started, [DestinationDomain]),
    rpc(NodeName, mod_global_distrib_server_mgr, force_refresh, [DestinationDomain]),
    StateInfo = rpc(NodeName, mod_global_distrib_server_mgr, get_state_info, [DestinationDomain]),
    ct:log("mgr_state_info_after_rebalance nodename=~p state_info=~p", [NodeName, StateInfo]),
    timer:sleep(1000).
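
%% Note on ordering checks: user_receives/2 (below) pairs with chat_with_seqnum/2,
%% which stamps each message id with erlang:monotonic_time/0, so the original
%% send order can be recovered with order_by_seqnum/1 even when stanzas
%% arrive interleaved.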

%% -----------------------------------------------------------------------
%% Escalus-related helpers

user_receives(User, Bodies) ->
    ExpectedLength = length(Bodies),
    Messages = escalus_client:wait_for_stanzas(User, ExpectedLength),
    SortedMessages = order_by_seqnum(Messages),
    case length(Messages) of
        ExpectedLength ->
            Checks = [escalus_pred:is_chat_message(Body, Stanza) || {Body, Stanza} <- lists:zip(Bodies, SortedMessages)],
            case lists:all(fun(Check) -> Check end, Checks) of
                true ->
                    ok;
                false ->
                    ct:fail({user_receives_failed, {wanted, Bodies}, {received, SortedMessages}, {check, Checks}})
            end;
        _ ->
            ct:fail({user_receives_not_enough, {wanted, Bodies}, {received, SortedMessages}})
    end.


%% -----------------------------------------------------------------------
%% Refreshing helpers

%% Reason is a string.
%% NodeName is asia_node, europe_node2, ... in the format used by this suite.
refresh_mappings(NodeName, Reason) when is_list(Reason) ->
    rpc(NodeName, mod_global_distrib_mapping_redis, refresh, [Reason]).

refresh_hosts(NodeNames, Reason) ->
    [refresh_mappings(NodeName, Reason) || NodeName <- NodeNames].


%% -----------------------------------------------------------------------
%% Other helpers

connect_steps_with_sm() ->
    [start_stream, stream_features, maybe_use_ssl,
     authenticate, bind, session, stream_resumption].

bare_client(Client) ->
    Client#client{jid = escalus_utils:get_short_jid(Client)}.

service_port() ->
    ct:get_config({hosts, mim, service_port}).


%% -----------------------------------------------------------------------
%% Waiting helpers

wait_for_domain(Node, Domain) ->
    F = fun() ->
                {ok, Domains} = rpc:call(Node, mod_global_distrib_mapping, all_domains, []),
                lists:member(Domain, Domains)
        end,
    mongoose_helper:wait_until(F, true, #{name => {wait_for_domain, Node, Domain}}).

%% We receive the presence BEFORE the session is registered in ejabberd_sm.
%% So, to ensure that do_open_session has completed, let's send a "ping"
%% by calling the c2s process.
%% That call only returns when all messages already in the Erlang message
%% queue have been processed.
wait_for_registration(Client, Node) ->
    RPCSpec = #{node => Node},
    mongoose_helper:wait_until(fun() -> is_pid(mongoose_helper:get_session_pid(Client, RPCSpec)) end, true,
                               #{name => wait_for_session}),
    C2sPid = mongoose_helper:get_session_pid(Client, RPCSpec),
    rpc:call(node(C2sPid), ejabberd_c2s, get_info, [C2sPid]),
    ok.


%% -----------------------------------------------------------------------
%% Ensure that endpoints are up

wait_for_listeners_to_appear() ->
    [wait_for_can_connect_to_port(Port) || Port <- receiver_ports(get_hosts())].

receiver_ports(Hosts) ->
    lists:map(fun({_NodeName, _LocalHost, ReceiverPort}) -> ReceiverPort end, Hosts).

wait_for_can_connect_to_port(Port) ->
    Opts = #{time_left => timer:seconds(30), sleep_time => 1000, name => {can_connect_to_port, Port}},
    mongoose_helper:wait_until(fun() -> can_connect_to_port(Port) end, true, Opts).

can_connect_to_port(Port) ->
    case gen_tcp:connect("127.0.0.1", Port, []) of
        {ok, Sock} ->
            gen_tcp:close(Sock),
            true;
        Other ->
            ct:pal("can_connect_to_port port=~p result=~p", [Port, Other]),
            false
    end.

%% -----------------------------------------------------------------------
%% Custom log levels for GD modules during the tests

enable_logging() ->
    mim_loglevel:enable_logging(test_hosts(), custom_loglevels()).

disable_logging() ->
    mim_loglevel:disable_logging(test_hosts(), custom_loglevels()).

custom_loglevels() ->
    %% for "s2s connection to muc.localhost not found" debugging
    [{ejabberd_s2s, debug},
     %% for debugging event=refreshing_own_data_done
     {mod_global_distrib_mapping_redis, info},
     %% to know if a connection is already started or would be started
     %% event=outgoing_conn_start_progress
     {mod_global_distrib_outgoing_conns_sup, info},
     %% to debug bound connection issues
     {mod_global_distrib, debug},
     %% to know all new connection pids
     {mod_global_distrib_connection, debug},
     %% to check if gc or refresh is triggered
     {mod_global_distrib_server_mgr, info},
     %% to debug incoming connections
     % {mod_global_distrib_receiver, info},
     %% to debug global session set/delete
     {mod_global_distrib_mapping, debug}
    ].

test_hosts() -> [mim, mim2, reg].