%% Stream Management tests
-module(sm_SUITE).

-compile([export_all, nowarn_export_all]).
-include_lib("exml/include/exml.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").

%% Injected code callbacks
-export([rpc_start_hook_handler/3,
         rpc_stop_hook_handler/2,
         hook_handler_fn/3,
         regression_handler/5]).

-export([rpc_start_filter_hook_handler/3,
         rpc_stop_filter_hook_handler/2,
         filter_hook_handler_fn/3]).

-import(distributed_helper, [mim/0,
                             require_rpc_nodes/1,
                             rpc/4]).

-import(domain_helper, [host_type/0]).

-import(sm_helper, [connect_fresh/3,
                    connect_fresh/4,
                    connect_spec/2,
                    connect_spec/3,
                    connect_same/2,
                    connect_same/3,
                    connect_resume/2,
                    process_initial_stanza/1,
                    send_initial_presence/1,
                    get_ack/1,
                    ack_initial_presence/1]).

%% Module under test, used as the target of RPC calls.
-define(MOD_SM, mod_stream_management).
%% Added to ct:sleep/1 and escalus wait timeouts in this suite, i.e. milliseconds.
-define(CONSTRAINT_CHECK_TIMEOUT, 5000).
%% Used as resume_timeout and as stale_h repeat_after/geriatric values.
-define(LONG_TIMEOUT, 3600).
-define(SHORT_TIMEOUT, 1).
%% Default buffer_max of the stream management module in this suite.
-define(SMALL_SM_BUFFER, 3).
%% Passed to mod_ping as ping_req_timeout and ping_interval.
-define(PING_REQUEST_TIMEOUT, 2).
-define(PING_INTERVAL, 5).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Common Test suite info: requires the `mim' RPC node plus the escalus defaults.
suite() ->
    require_rpc_nodes([mim]) ++ escalus:suite().

%% Every group from groups/0, followed by the stand-alone ping_timeout case.
all() ->
    ct_helper:groups_to_all(groups()) ++ [ping_timeout].

%% Group definitions.
%%
%% manual_ack_freq_long_session_timeout and stream_mgmt_disabled used to be
%% missing from this list even though their case lists, init_per_group/2
%% clauses and required_sm_opts/2 clauses exist, so their cases never ran.
%% NOTE(review): the stream_mgmt_disabled test functions are not defined in
%% the part of the file reviewed here - confirm they exist. The group is kept
%% last because its init_per_group/2 clause stops the module under test.
groups() ->
    [
     {parallel, [parallel], parallel_cases()},
     {parallel_manual_ack_freq_1, [parallel], parallel_manual_ack_freq_1_cases()},
     {manual_ack_freq_long_session_timeout, [],
      manual_ack_freq_long_session_timeout_cases()},
     {manual_ack_freq_2, [], manual_ack_freq_2_cases()},
     {stale_h, [], stale_h_cases()},
     {parallel_unacknowledged_message_hook, [parallel],
      parallel_unacknowledged_message_hook_cases()},
     {stream_mgmt_disabled, [], stream_mgmt_disabled_cases()}
    ].

%% Cases of the `parallel' group.
parallel_cases() ->
    [
     %% negotiation
     server_announces_sm,
     server_enables_sm_before_session,
     server_enables_sm_after_session,
     server_returns_failed_after_start,
     server_returns_failed_after_auth,
     server_enables_resumption,
     client_enables_sm_twice_fails_with_correct_error_stanza,
     session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza,
     session_resumed_and_old_session_dead_doesnt_route_error_to_new_session,
     %% acks and the "h" counter
     basic_ack,
     h_ok_before_session,
     h_ok_after_session_enabled_before_session,
     h_ok_after_session_enabled_after_session,
     h_ok_after_a_chat,
     h_non_given_closes_stream_gracefully,
     resend_unacked_on_reconnection,
     %% resumption
     session_established,
     wait_for_resumption,
     resume_session,
     resume_session_with_wrong_h_does_not_leak_sessions,
     resume_session_with_wrong_sid_returns_item_not_found,
     resume_session_with_wrong_namespace_is_a_noop,
     resume_dead_session_results_in_item_not_found,
     resume_session_kills_old_C2S_gracefully,
     carboncopy_works,
     carboncopy_works_after_resume,
     aggressively_pipelined_resume,
     replies_are_processed_by_resumed_session,
     subscription_requests_are_buffered_properly,
     messages_are_properly_flushed_during_resumption
    ].

%% Cases of the `parallel_manual_ack_freq_1' group.
parallel_manual_ack_freq_1_cases() ->
    [
     client_acks_more_than_sent,
     too_many_unacked_stanzas,
     resend_unacked_after_resume_timeout,
     resume_session_state_send_message_with_ack,
     resume_session_state_send_message_without_ack,
     resume_session_state_stop_c2s,
     server_requests_ack_after_session,
     resend_more_offline_messages_than_buffer_size,
     server_requests_ack
    ].

%% Cases of the `manual_ack_freq_2' group.
manual_ack_freq_2_cases() ->
    [
     server_requests_ack_freq_2
    ].

%% Cases of the `stale_h' group.
stale_h_cases() ->
    [
     resume_expired_session_returns_correct_h,
     gc_repeat_after_never_means_no_cleaning,
     gc_repeat_after_timeout_does_clean
    ].

%% Cases of the `stream_mgmt_disabled' group.
stream_mgmt_disabled_cases() ->
    [
     no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt,
     no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption
    ].

%% Cases of the `manual_ack_freq_long_session_timeout' group.
manual_ack_freq_long_session_timeout_cases() ->
    [
     preserve_order
    ].

%% Cases of the `parallel_unacknowledged_message_hook' group.
parallel_unacknowledged_message_hook_cases() ->
    [
     unacknowledged_message_hook_bounce,
     unacknowledged_message_hook_offline,
     unacknowledged_message_hook_resume,
     unacknowledged_message_hook_filter
    ].

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Saves the current module set, marks alice's user spec with
%% stream_management = true, injects this module into the node under test
%% and runs the escalus suite initialisation.
init_per_suite(Config0) ->
    SavedConfig = dynamic_modules:save_modules(host_type(), Config0),
    SMConfig = escalus_users:update_userspec(SavedConfig, alice, stream_management, true),
    mongoose_helper:inject_module(?MODULE),
    escalus:init_per_suite(SMConfig).

%% Cleans the fresh users, restores the saved modules and finishes escalus.
end_per_suite(Config) ->
    escalus_fresh:clean(),
    dynamic_modules:restore_modules(Config),
    escalus:end_per_suite(Config).

%% Per-group module setup.
%% - stale_h: nothing to do here, modules are configured per test case;
%% - stream_mgmt_disabled: the module is stopped and its sm_session table dropped;
%% - any other group: modules from required_modules(group, Group) are ensured.
init_per_group(stale_h, Config) ->
    Config;
init_per_group(stream_mgmt_disabled, Config) ->
    dynamic_modules:stop(host_type(), ?MOD_SM),
    rpc(mim(), mnesia, delete_table, [sm_session]),
    Config;
init_per_group(Group, Config) ->
    dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
    Config.

%% No per-group cleanup is needed.
end_per_group(_Group, _Config) ->
    ok.

%% Per-testcase setup. Cases that need their own module configuration ensure
%% it here; every clause finishes with the escalus per-testcase initialisation.
init_per_testcase(resume_expired_session_returns_correct_h = CaseName, Config) ->
    HostType = host_type(),
    dynamic_modules:ensure_modules(HostType, required_modules(testcase, CaseName)),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config)
  when CaseName =:= gc_repeat_after_never_means_no_cleaning orelse
       CaseName =:= gc_repeat_after_timeout_does_clean ->
    HostType = host_type(),
    dynamic_modules:ensure_modules(HostType, required_modules(testcase, CaseName)),
    %% the registered {SMID, H} pairs are handed to the test via Config
    ConfigWithSmids = register_some_smid_h(Config),
    escalus:init_per_testcase(CaseName, ConfigWithSmids);
init_per_testcase(ping_timeout = CaseName, Config) ->
    %% mod_ping is mocked on the node under test before it gets (re)started
    ok = rpc(mim(), meck, new, [mod_ping, [passthrough, no_link]]),
    HostType = host_type(),
    dynamic_modules:ensure_modules(HostType, required_modules(testcase, CaseName)),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(replies_are_processed_by_resumed_session = CaseName, Config) ->
    register_handler(),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config) ->
    %% all remaining cases (including server_requests_ack_freq_2) need no extra setup
    escalus:init_per_testcase(CaseName, Config).

%% Per-testcase cleanup, mirroring init_per_testcase/2.
end_per_testcase(CaseName, Config)
  when CaseName =:= resume_expired_session_returns_correct_h orelse
       CaseName =:= gc_repeat_after_never_means_no_cleaning orelse
       CaseName =:= gc_repeat_after_timeout_does_clean ->
    %% stop the stale_h child process started for this host type
    ChildName = rpc(mim(), gen_mod, get_module_proc,
                    [host_type(), stream_management_stale_h]),
    rpc(mim(), ejabberd_sup, stop_child, [ChildName]),
    escalus:end_per_testcase(CaseName, Config);
end_per_testcase(replies_are_processed_by_resumed_session = CaseName, Config) ->
    unregister_handler(),
    escalus:end_per_testcase(CaseName, Config);
end_per_testcase(ping_timeout = CaseName, Config) ->
    rpc(mim(), meck, unload, [mod_ping]),
    escalus:end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config) ->
    escalus:end_per_testcase(CaseName, Config).

%% Module configuration per group (in case of stale_h group it is per testcase)

%% Returns the `{Module, Opts}' list handed to dynamic_modules:ensure_modules/2
%% for the given scope (`group' or `testcase') and group/case name.
%%
%% When required_sm_opts/2 answers `stopped', the atom is passed through
%% unchanged instead of being fed to config_parser_helper:mod_config/2 as if
%% it were an options map.
%% NOTE(review): presumably dynamic_modules:ensure_modules/2 treats
%% `{Module, stopped}' as "module must not be running" - confirm.
required_modules(Scope, Name) ->
    SMSpec = case required_sm_opts(Scope, Name) of
                 stopped ->
                     stopped;
                 ExtraOpts ->
                     SMOpts = maps:merge(common_sm_opts(), ExtraOpts),
                     config_parser_helper:mod_config(mod_stream_management, SMOpts)
             end,
    Backend = mongoose_helper:mnesia_or_rdbms_backend(),
    BaseModules = [
        {mod_stream_management, SMSpec},
        {mod_offline, config_parser_helper:mod_config(mod_offline, #{backend => Backend})}
    ],
    case Name of
        ping_timeout ->
            %% only the ping_timeout case needs mod_ping
            BaseModules ++ [{mod_ping, config_parser_helper:mod_config(mod_ping, mod_ping_opts())}];
        _ ->
            BaseModules
    end.

%% Stream management options specific to a group or test case; they are merged
%% over common_sm_opts/0 by required_modules/2. `stopped' is returned instead
%% of a map for the group that runs without stream management.
required_sm_opts(group, parallel) ->
    #{ack_freq => never};
required_sm_opts(group, parallel_manual_ack_freq_1) ->
    #{ack_freq => 1,
      resume_timeout => ?LONG_TIMEOUT};
required_sm_opts(group, manual_ack_freq_2) ->
    #{ack_freq => 2};
required_sm_opts(group, stream_mgmt_disabled) ->
    stopped;
required_sm_opts(group, parallel_unacknowledged_message_hook) ->
    #{ack_freq => 1};
required_sm_opts(group, manual_ack_freq_long_session_timeout) ->
    #{ack_freq => 1, buffer_max => 1000};
required_sm_opts(testcase, resume_expired_session_returns_correct_h) ->
    #{ack_freq => 1,
      resume_timeout => ?SHORT_TIMEOUT,
      stale_h => stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT)};
required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
    #{stale_h => stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT)};
required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
    #{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)};
required_sm_opts(testcase, ping_timeout) ->
    #{ack_freq => 1,
      resume_timeout => ?SHORT_TIMEOUT}.

%% Options shared by every stream management configuration in this suite:
%% a small unacked-stanza buffer and the backend reported by ct_helper.
common_sm_opts() ->
    Backend = ct_helper:get_internal_database(),
    #{buffer_max => ?SMALL_SM_BUFFER, backend => Backend}.

%% Builds an enabled stale_h option map from the two given values.
stale_h(RepeatAfter, Geriatric) ->
    #{enabled => true,
      repeat_after => RepeatAfter,
      geriatric => Geriatric}.

%% Generates a random stream management id: 21 random bytes, base64-encoded.
make_smid() ->
    RandomBytes = crypto:strong_rand_bytes(21),
    base64:encode(RandomBytes).

%% Registers a fresh SMID with the given integer on the node under test
%% and returns the `{SMID, Integer}' pair.
register_smid(IntSmidId) ->
    SMID = make_smid(),
    ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]),
    {SMID, IntSmidId}.

%% Registers three SMIDs (with values 1..3) and stores them in Config
%% under the `smid_test' key.
register_some_smid_h(Config) ->
    TestSmids = [register_smid(N) || N <- lists:seq(1, 3)],
    [{smid_test, TestSmids} | Config].

%% mod_ping options used by the ping_timeout case.
mod_ping_opts() ->
    #{send_pings => true,
      ping_interval => ?PING_INTERVAL,
      ping_req_timeout => ?PING_REQUEST_TIMEOUT,
      timeout_action => kill}.

%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------

%% After the stream is started, the advertised features must allow
%% stream management.
server_announces_sm(Config) ->
    Spec = escalus_fresh:freshen_spec(Config, alice),
    {ok, Client, Features} = escalus_connection:start(Spec, [start_stream]),
    #client{props = Props} = Client,
    true = escalus_session:can_use_stream_management(Props, Features).


%% Connecting up to the `sm_after_bind' step must succeed.
server_enables_sm_before_session(Config) ->
    connect_fresh(Config, alice, sm_after_bind).

%% Connecting up to the `sm_after_session' step must succeed.
server_enables_sm_after_session(Config) ->
    connect_fresh(Config, alice, sm_after_session).

%% Enabling SM on a connection that stopped at `before_auth' must fail.
server_returns_failed_after_start(Config) ->
    server_returns_failed(connect_fresh(Config, alice, before_auth)).

%% Enabling SM on a connection that stopped at `auth' must fail.
server_returns_failed_after_auth(Config) ->
    server_returns_failed(connect_fresh(Config, alice, auth)).

%% Connecting up to the `sr_presence' step must succeed; the connection is
%% stopped afterwards.
server_enables_resumption(Config) ->
    Client = connect_fresh(Config, alice, sr_presence),
    escalus_connection:stop(Client).

%% Sends <enable/> and expects a <failed/> carrying `unexpected-request'.
server_returns_failed(Alice) ->
    escalus_connection:send(Alice, escalus_stanza:enable_sm()),
    Reply = escalus_connection:get_stanza(Alice, enable_sm_failed),
    escalus:assert(is_sm_failed, [<<"unexpected-request">>], Reply).

%% A second <enable/> on a connection that already went through
%% `sm_before_session' is refused with `unexpected-request', after which
%% the stream is ended and the connection closed.
client_enables_sm_twice_fails_with_correct_error_stanza(Config) ->
    Alice = connect_fresh(Config, alice, sm_before_session),
    server_returns_failed(Alice),
    StreamEnd = escalus_connection:get_stanza(Alice, enable_sm_failed),
    escalus:assert(is_stream_end, StreamEnd),
    true = escalus_connection:wait_for_close(Alice, timer:seconds(5)).

session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza(Config) ->
    %% GIVEN USER WITH STREAM RESUMPTION ENABLED
    OldConn = connect_fresh(Config, alice, sr_presence),
    SMH = escalus_connection:get_sm_h(OldConn),
    %% WHEN USER RESUMES SESSION FROM NEW CLIENT
    NewConn = connect_resume(OldConn, SMH),
    process_initial_stanza(NewConn),
    %% THEN: Old session is gracefully closed with the correct error stanza
    Conflict = escalus_connection:get_stanza(OldConn, close_old_stream),
    escalus:assert(is_stream_error, [<<"conflict">>, <<>>], Conflict),
    StreamEnd = escalus_connection:get_stanza(OldConn, close_old_stream),
    escalus:assert(is_stream_end, StreamEnd),
    true = escalus_connection:wait_for_close(OldConn, timer:seconds(5)),
    %% ... while the resuming connection stays up
    true = escalus_connection:is_connected(NewConn),
    escalus_connection:stop(NewConn).

session_resumed_and_old_session_dead_doesnt_route_error_to_new_session(Config) ->
    %% GIVEN USER WITH STREAM RESUMPTION ENABLED
    OldConn = connect_fresh(Config, alice, sr_presence),
    %% WHEN FIRST SESSION DIES AND USER RESUMES FROM NEW CLIENT
    NewConn = sm_helper:kill_and_connect_resume(OldConn),
    process_initial_stanza(NewConn),
    %% THEN new session does not have any message rerouted
    false = escalus_client:has_stanzas(NewConn),
    true = escalus_connection:is_connected(NewConn),
    escalus_connection:stop(NewConn).

%% A roster get is answered, then <r/> is answered with <a/>.
basic_ack(Config) ->
    Alice = connect_fresh(Config, alice, sm_after_session),
    escalus_connection:send(Alice, escalus_stanza:roster_get()),
    RosterResult = escalus_connection:get_stanza(Alice, roster_result),
    escalus:assert(is_roster_result, RosterResult),
    escalus_connection:send(Alice, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Alice, stream_mgmt_ack),
    escalus:assert(is_sm_ack, Ack).

%% Test that "h" value is valid when:
%% - SM is enabled *before* the session is established
%% - <r/> is sent *before* the session is established
h_ok_before_session(Config) ->
    Alice = connect_fresh(Config, alice, sm_after_bind),
    escalus_connection:send(Alice, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Alice, stream_mgmt_ack),
    escalus:assert(is_sm_ack, [0], Ack).

%% Test that "h" value is valid when:
%% - SM is enabled *before* the session is established
%% - <r/> is sent *after* the session is established
h_ok_after_session_enabled_before_session(Config) ->
    Alice = connect_fresh(Config, alice, sm_before_session),
    escalus_connection:send(Alice, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Alice, stream_mgmt_ack),
    escalus:assert(is_sm_ack, [1], Ack).

%% Test that "h" value is valid when:
%% - SM is enabled *after* the session is established
%% - <r/> is sent *after* the session is established
h_ok_after_session_enabled_after_session(Config) ->
    Alice = connect_fresh(Config, alice, sm_after_session),
    escalus_connection:send(Alice, escalus_stanza:roster_get()),
    RosterResult = escalus_connection:get_stanza(Alice, roster_result),
    escalus:assert(is_roster_result, RosterResult),
    escalus_connection:send(Alice, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Alice, stream_mgmt_ack),
    escalus:assert(is_sm_ack, [1], Ack).

%% Test that "h" value is valid after exchanging a few messages.
h_ok_after_a_chat(ConfigIn) ->
    Config = escalus_users:update_userspec(ConfigIn, alice,
                                           stream_management, true),
    Story =
        fun(Alice, Bob) ->
                %% Alice -> Bob
                escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi, Bob!">>)),
                FromAlice1 = escalus:wait_for_stanza(Bob),
                escalus:assert(is_chat_message, [<<"Hi, Bob!">>], FromAlice1),
                %% Bob -> Alice, twice
                escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)),
                FromBob1 = escalus:wait_for_stanza(Alice),
                escalus:assert(is_chat_message, [<<"Hi, Alice!">>], FromBob1),
                escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How's life?">>)),
                FromBob2 = escalus:wait_for_stanza(Alice),
                escalus:assert(is_chat_message, [<<"How's life?">>], FromBob2),
                %% Alice -> Bob
                escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Pretty !@#$%^$">>)),
                FromAlice2 = escalus:wait_for_stanza(Bob),
                escalus:assert(is_chat_message, [<<"Pretty !@#$%^$">>], FromAlice2),
                %% the server must report h = 3 at this point
                escalus:send(Alice, escalus_stanza:sm_request()),
                Ack = escalus:wait_for_stanza(Alice),
                escalus:assert(is_sm_ack, [3], Ack),
                %% Ack, so that unacked messages don't go into offline store.
                escalus:send(Alice, escalus_stanza:sm_ack(3))
        end,
    escalus:fresh_story(Config, [{alice,1}, {bob,1}], Story).

%% An <a/> without the "h" attribute must end the stream with a
%% `policy-violation' stream error, terminate the c2s process and
%% close the connection.
h_non_given_closes_stream_gracefully(ConfigIn) ->
    AckWithoutH = #xmlel{name = <<"a">>,
                         attrs = [{<<"xmlns">>, <<"urn:xmpp:sm:3">>}]},
    Config = escalus_users:update_userspec(ConfigIn, alice,
                                           stream_management, true),
    Story =
        fun(Alice) ->
                C2SPid = mongoose_helper:get_session_pid(Alice),
                escalus:send(Alice, AckWithoutH),
                StreamError = escalus:wait_for_stanza(Alice),
                escalus:assert(is_stream_error,
                               [<<"policy-violation">>, <<>>],
                               StreamError),
                mongoose_helper:wait_for_pid_to_die(C2SPid),
                StreamEnd = escalus_connection:get_stanza(Alice, stream_end),
                escalus:assert(is_stream_end, StreamEnd),
                true = escalus_connection:wait_for_close(Alice, timer:seconds(5))
        end,
    escalus:fresh_story(Config, [{alice,1}], Story).

%% Acking 5 stanzas right after connecting must yield an
%% `undefined-condition' stream error with a <handled-count-too-high/>
%% child (h = 5, send-count = 0), followed by a graceful stream end.
client_acks_more_than_sent(Config) ->
    Alice = connect_fresh(Config, alice, sm_after_session),
    escalus:send(Alice, escalus_stanza:sm_ack(5)),
    StreamError = escalus:wait_for_stanza(Alice),
    %% Assert "undefined-condition" children
    escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], StreamError),
    %% Assert "handled-count-too-high" children with correct attributes
    Path = [{element_with_ns, <<"handled-count-too-high">>, <<"urn:xmpp:sm:3">>}],
    TooHigh = exml_query:path(StreamError, Path),
    <<"5">> = exml_query:attr(TooHigh, <<"h">>),
    <<"0">> = exml_query:attr(TooHigh, <<"send-count">>),
    %% Assert graceful stream end
    StreamEnd = escalus_connection:get_stanza(Alice, stream_end),
    escalus:assert(is_stream_end, StreamEnd),
    true = escalus_connection:wait_for_close(Alice, timer:seconds(5)).

%% Filling the whole unacked buffer without acking must end with a
%% `resource-constraint' stream error.
too_many_unacked_stanzas(Config) ->
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sm_presence, manual),
    get_ack(Alice),
    SendGreeting =
        fun(N) ->
                Text = <<(integer_to_binary(N))/binary, ": Hi, Alice!">>,
                escalus:send(Bob, escalus_stanza:chat_to(Alice, Text))
        end,
    lists:foreach(SendGreeting, lists:seq(1, ?SMALL_SM_BUFFER)),
    %% messages and ack requests
    escalus:wait_for_stanzas(Alice, ?SMALL_SM_BUFFER * 2),
    %% wait for deferred buffer check
    StreamError = escalus:wait_for_stanza(Alice, ?CONSTRAINT_CHECK_TIMEOUT + 1000),
    escalus:assert(is_stream_error,
                   [<<"resource-constraint">>, <<"too many unacked stanzas">>],
                   StreamError).

%% Ack request expected after every stanza.
server_requests_ack(Config) ->
    server_requests_ack(Config, 1).

%% Ack request expected after every second stanza.
server_requests_ack_freq_2(Config) ->
    server_requests_ack(Config, 2).

%% Checks that ack requests arrive with frequency N: once after the initial
%% presence (stanza 1) and once after a chat message (stanza 2).
server_requests_ack(Config, N) ->
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sm_presence, manual),
    %% ack request after initial presence
    maybe_assert_ack_request(1, N, Alice),
    escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)),
    Greeting = escalus:wait_for_stanza(Alice),
    escalus:assert(is_chat_message, [<<"Hi, Alice!">>], Greeting),
    maybe_assert_ack_request(2, N, Alice).

%% Expects an ack request from the server only when the number of received
%% stanzas is a multiple of the ack frequency. Returns StanzasRec.
maybe_assert_ack_request(StanzasRec, AckRequests, Alice) ->
    ct:log("StanzasRec: ~p, AckRequests: ~p", [StanzasRec, AckRequests]),
    StanzasRec rem AckRequests =:= 0
        andalso escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(Alice)),
    StanzasRec.

%% With manual acking, an ack request must follow the `sm_before_session' step.
server_requests_ack_after_session(Config) ->
    Alice = connect_fresh(Config, alice, sm_before_session, manual),
    AckRequest = escalus_connection:get_stanza(Alice, stream_mgmt_req),
    escalus:assert(is_sm_ack_request, AckRequest).

%% More offline messages than the unacked buffer can hold are delivered on
%% login; once the client acks them, no stream error may follow.
resend_more_offline_messages_than_buffer_size(Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),

    %% send some messages - more than unacked buffer size
    MessagesToSend = ?SMALL_SM_BUFFER + 1,
    AliceJid = common_helper:get_bjid(AliceSpec),
    lists:foreach(
      fun(I) ->
              Msg = escalus_stanza:chat_to(AliceJid, integer_to_binary(I)),
              escalus_connection:send(Bob, Msg)
      end,
      lists:seq(1, MessagesToSend)),
    mongoose_helper:wait_for_n_offline_messages(AliceJid, MessagesToSend),

    %% connect alice who wants to receive all messages from offline storage
    Alice = connect_spec(AliceSpec, sm_after_session, manual),
    send_initial_presence(Alice),
    %% messages and ack requests
    escalus:wait_for_stanzas(Alice, MessagesToSend * 2),

    escalus_connection:get_stanza(Alice, presence),
    get_ack(Alice), % ack request

    %% confirm messages + presence
    %% NOTE(review): the acked count is the literal 4, which equals
    %% MessagesToSend, not MessagesToSend + 1 - confirm which one is intended.
    escalus_connection:send(Alice, escalus_stanza:sm_ack(4)),

    %% wait for check constraint message on server side
    ct:sleep(?CONSTRAINT_CHECK_TIMEOUT + 1000),
    %% should not receive anything especially any stream errors
    false = escalus_client:has_stanzas(Alice),

    escalus_connection:stop(Alice),
    escalus_connection:stop(Bob).

%% Messages received but never acked are delivered again after the client
%% reconnects with a new session.
resend_unacked_on_reconnection(Config) ->
    Texts = three_texts(),
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sm_presence),
    AliceSpec = sm_helper:client_to_spec0(Alice),
    %% Bob sends some messages to Alice.
    sm_helper:send_messages(Bob, Alice, Texts),
    %% Alice receives the messages.
    sm_helper:wait_for_messages(Alice, Texts),
    %% Alice disconnects without acking the messages.
    sm_helper:stop_client_and_wait_for_termination(Alice),
    %% Messages go to the offline store.
    %% Alice receives the messages from the offline store.
    NewAlice = connect_spec(AliceSpec, session, manual),
    send_initial_presence(NewAlice),
    sm_helper:wait_for_messages(NewAlice, Texts),
    %% Alice acks the delayed messages so they won't go again
    %% to the offline store.
    escalus_connection:send(NewAlice, escalus_stanza:sm_ack(3)).

%% Remove wait_for_n_offline_messages and you will get anything, but preserve_order
%% TODO Test without wait_for_n_offline_messages. It would require changes in SM
%% and more strict tests, reproducing delays in SM and in mod_offline.
preserve_order(Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    AliceSpec = sm_helper:client_to_spec(Alice),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"1">>)),

    %% kill alice connection
    escalus_connection:kill(Alice),
    C2SPid = mongoose_helper:get_session_pid(Alice),
    sm_helper:wait_until_resume_session(C2SPid),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"2">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"3">>)),

    NewAlice = connect_spec(AliceSpec, session, manual),
    escalus_connection:send(NewAlice, escalus_stanza:enable_sm([resume])),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"4">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"5">>)),

    %% Without this check we will get stuff out of order
    mongoose_helper:wait_for_n_offline_messages(NewAlice, 5),

    send_initial_presence(NewAlice),
    %% Without this check we can get "6, 1, 2, 3, 4, 5" messages in the next receive_all_ordered
    mongoose_helper:wait_for_n_offline_messages(NewAlice, 0),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"6">>)),

    %% "2, 3, 4, 5, 6, 1" is possible
    %% (where only 1 is from offline storage, the rest is from sm)
    receive_all_ordered(NewAlice, 6),

    %% replace connection
    NewAlice2 = connect_spec(AliceSpec, session, manual),
    %% allow messages to go to the offline storage
    mongoose_helper:wait_for_n_offline_messages(NewAlice, 6),

    send_initial_presence(NewAlice2),

    %% receives messages in correct order
    receive_all_ordered(NewAlice2, 6),

    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice2).

%% Receives Last messages on Conn and asserts that their bodies are
%% <<"1">> .. integer_to_binary(Last), in that order.
receive_all_ordered(Conn, Last) ->
    receive_all_ordered(Conn, 1, Last, []).

%% Receive messages from N to Last.
%% Ignores acks and presences.
%% Handles case when out of order and when not enough stanzas.
%%
%% The accumulator holds the messages received so far, newest first; it is
%% reversed once at the end instead of being appended to on every step.
receive_all_ordered(_Conn, N, Last, RevAcc) when N > Last ->
    Texts = [integer_to_binary(I) || I <- lists:seq(1, Last)],
    sm_helper:assert_messages(lists:reverse(RevAcc), Texts);
receive_all_ordered(Conn, N, Last, RevAcc) ->
    case escalus:wait_for_stanzas(Conn, 1) of
        [#xmlel{name = <<"message">>} = Message] ->
            receive_all_ordered(Conn, N + 1, Last, [Message | RevAcc]);
        [_] -> %% Ack or presence
            receive_all_ordered(Conn, N, Last, RevAcc);
        [] ->
            %% report what was received so far in arrival order
            ct:fail({timeout_waiting, N, lists:reverse(RevAcc)})
    end.

%% A message that was never acked is delivered to a brand new session
%% created after the old connection was killed.
resend_unacked_after_resume_timeout(Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence),
    AliceSpec = sm_helper:client_to_spec(Alice),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    %% kill alice connection
    escalus_connection:kill(Alice),

    %% ensure there is no session
    C2SPid = mongoose_helper:get_session_pid(Alice),
    sm_helper:wait_until_resume_session(C2SPid),

    %% alice come back and receives unacked message
    NewAlice = connect_spec(AliceSpec, session),
    send_initial_presence(NewAlice),

    escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)],
                                 escalus:wait_for_stanzas(NewAlice, 2)),

    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).

%% Runs with mod_ping mocked and configured by mod_ping_opts/0; checks that
%% every stanza recorded by get_stanzas_filtered_by_mod_ping/0 is an IQ error
%% carrying a urn:xmpp:ping <ping/> element.
ping_timeout(Config) ->
    %% make sure there are no leftover stanzas in the history
    ?assertEqual([], get_stanzas_filtered_by_mod_ping()),

    %% connect Alice
    Alice = connect_fresh(Config, alice, session),

    %% manually configure stream resumption to avoid ping stanzas
    escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])),
    Pred = fun(Stanza) -> escalus_pred:is_sm_enabled([resume], Stanza) end,
    SM = escalus_connection:receive_stanza(Alice, #{ pred => Pred }),
    SMH = escalus_connection:get_sm_h(Alice),
    SMID = exml_query:attr(SM, <<"id">>),

    escalus_client:wait_for_stanza(Alice),
    %% NOTE(review): ct:sleep/1 takes milliseconds, and only the last summand
    %% is converted with timer:seconds/1. Whether the two PING_* macros are
    %% meant as seconds or milliseconds cannot be told from this file, so the
    %% expression is left as it was - confirm the intended duration.
    ct:sleep(?PING_REQUEST_TIMEOUT + ?PING_INTERVAL + timer:seconds(1)),

    %% attempt to resume the session after the connection drop
    sm_helper:kill_client_and_wait_for_termination(Alice),
    NewAlice = connect_same(Alice, auth),
    escalus_connection:send(NewAlice, escalus_stanza:resume(SMID, SMH)),

    %% after resume_timeout, we expect the session to be closed
    escalus_connection:get_stanza(NewAlice, failed_resumption),

    %% bind a new session and expect unacknowledged messages to be resent
    escalus_session:session(escalus_session:bind(NewAlice)),
    send_initial_presence(NewAlice),

    %% check if the error stanza was handled by mod_ping
    FilteredStanzas = get_stanzas_filtered_by_mod_ping(),
    %% expected value first, same form as the emptiness check at the top
    ?assertNotEqual([], FilteredStanzas),
    Fun = fun(Stanza) ->
        escalus:assert(is_iq_error, Stanza),
        ?assertNotEqual(undefined,
                        exml_query:subelement_with_name_and_ns(Stanza, <<"ping">>, <<"urn:xmpp:ping">>))
    end,
    lists:foreach(Fun, FilteredStanzas),

    %% kill the connection to avoid errors
    escalus_connection:kill(NewAlice).

%% A resumption attempt that comes too late fails, but the <failed/> answer
%% still carries the last known "h" of the expired session.
resume_expired_session_returns_correct_h(Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, sr_presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    get_ack(Alice),

    %% Bob sends a message to Alice, and Alice receives it but doesn't acknowledge
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    escalus:assert(is_chat_message, [<<"msg-1">>], escalus:wait_for_stanza(Alice)),
    %% alice comes back, but too late, so resumption doesn't work,
    %% but she receives the previous h = 1 anyway
    %% NewAlice is also manual ack
    NewAlice = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Alice),
    FailedResumption = escalus_connection:get_stanza(NewAlice, failed_resumption),
    <<"1">> = exml_query:attr(FailedResumption, <<"h">>),
    %% And we can continue with bind and session
    escalus_session:session(escalus_session:bind(NewAlice)),
    send_initial_presence(NewAlice),
    Stanzas = [escalus_connection:get_stanza(NewAlice, {msg, 1}),
               escalus_connection:get_stanza(NewAlice, {msg, 2})],
    escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Stanzas),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).

%% All three SMIDs registered in init_per_testcase/2 must still resolve to
%% their stale "h" values.
gc_repeat_after_never_means_no_cleaning(Config) ->
    [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config),
    {stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID1]),
    {stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID2]),
    {stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID3]).

%% The first registered SMID must eventually be reported as not found.
gc_repeat_after_timeout_does_clean(Config) ->
    [{SMID1, _} | _ ] = ?config(smid_test, Config),
    mongoose_helper:wait_until(fun() ->
                                       rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1])
                               end,
                               {error, smid_not_found},
                               #{name => smid_garbage_collected}).

resume_session_state_send_message_without_ack(Config) ->
    resume_session_state_send_message_generic(Config, no_ack).

resume_session_state_send_message_with_ack(Config) ->
    resume_session_state_send_message_generic(Config, ack).

%% Messages sent to a session waiting for resumption are handed over to a new
%% connection of the same user. AckInitialPresence (`ack' | `no_ack') is passed
%% to maybe_ack_initial_presence/2.
resume_session_state_send_message_generic(Config, AckInitialPresence) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    maybe_ack_initial_presence(Alice, AckInitialPresence),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    %% kill alice connection
    C2SPid = mongoose_helper:get_session_pid(Alice),
    escalus_connection:kill(Alice),
    sm_helper:wait_until_resume_session(C2SPid),
    sm_helper:assert_alive_resources(Alice, 1),

    %% send some messages and check if c2s can handle it
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)),
    %% suspend the process to ensure that Alice has enough time to reconnect,
    %% before resumption timeout occurs.
    ok = rpc(mim(), sys, suspend, [C2SPid]),

    %% alice comes back and receives unacked message
    NewAlice = connect_same(Alice, presence),
    %% now we can resume c2s process of the old connection
    %% and let it process session resumption timeout
    ok = rpc(mim(), sys, resume, [C2SPid]),
    Stanzas = escalus:wait_for_stanzas(NewAlice, 3),

    %% what about order ?
    %% Only the 3 unacked messages from bob are awaited and matched here.
    %% NOTE(review): alice's own presence is presumably consumed by
    %% connect_same(Alice, presence) above - confirm.
    escalus_new_assert:mix_match([is_chat(<<"msg-1">>),
                                  is_chat(<<"msg-2">>),
                                  is_chat(<<"msg-3">>)],
                                 Stanzas),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).

%%for instance it can be done by mod ping
resume_session_state_stop_c2s(Config) ->
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),

    get_ack(Alice),
    ack_initial_presence(Alice),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)),

    %% get pid of c2s
    C2SPid = mongoose_helper:get_session_pid(Alice),
    %% Wait c2s process to process our presence ack.
    %% Otherwise, we can receive two initial presences sometimes.
    sm_helper:wait_for_c2s_unacked_count(C2SPid, 1),

    %% kill alice connection
    escalus_connection:kill(Alice),
    %% session should be alive
    sm_helper:assert_alive_resources(Alice, 1),
    rpc(mim(), mongoose_c2s, stop, [C2SPid, normal]),
    sm_helper:wait_until_resume_session(C2SPid),
    %% suspend the process to ensure that Alice has enough time to reconnect,
    %% before resumption timeout occurs.
    ok = rpc(mim(), sys, suspend, [C2SPid]),

    %% alice comes back and receives unacked message
    NewAlice = connect_same(Alice, presence, manual),
    %% now we can resume c2s process of the old connection
    %% and let it process session resumption timeout
    ok = rpc(mim(), sys, resume, [C2SPid]),

    escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(NewAlice, msg)),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).

%% This test only verifies the validity of helpers (get_session_pid,
%% get_c2s_state_name) written for wait_for_resumption
%% testcase.
session_established(Config) ->
    Alice = connect_fresh(Config, alice, presence),
    C2SPid = mongoose_helper:get_session_pid(Alice),
    session_established = mongoose_helper:get_c2s_state_name(C2SPid),
    escalus_connection:stop(Alice).

%% Ensure that after a violent disconnection,
%% the c2s waits for resumption (but don't resume yet).
wait_for_resumption(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Bob = connect_fresh(Config, bob, session),
    Texts = three_texts(),
    {C2SPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
    sm_helper:wait_until_resume_session(C2SPid).
%% Installs a `filter_unacknowledged_messages` handler that drops messages whose
%% body equals FilterText, lets Bob send a mix of regular and filtered messages
%% to Alice, kills Alice's connection and terminates her old c2s process while a
%% second resource is online. The second resource must receive every regular
%% message and must never receive the filtered text.
unacknowledged_message_hook_filter(Config) ->
    FilterText = <<"filter">>,
    Bob = connect_fresh(Config, bob, presence),
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Resource = proplists:get_value(username, AliceSpec),
    HookHandlerExtra = start_filter_hook_listener(FilterText, Resource),
    Alice = connect_spec([{resource, Resource} | AliceSpec], sr_presence, manual),
    %% Ack the presence stanza
    get_ack(Alice),
    ack_initial_presence(Alice),
    Messages = [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>, <<"msg-4">>],
    All = [<<"msg-1">>, FilterText, <<"msg-2">>, FilterText, <<"msg-3">>, <<"msg-4">>],
    %% The sends are performed only for their side effect.
    lists:foreach(
      fun(Body) ->
              escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, Body))
      end, All),
    %% kill alice connection
    C2SPid = mongoose_helper:get_session_pid(Alice),
    escalus_connection:kill(Alice),
    sm_helper:wait_until_resume_session(C2SPid),
    sm_helper:assert_alive_resources(Alice, 1),
    %% ensure second C2S is registered so all the messages are bounced properly
    NewAlice = connect_spec([{resource, <<"2">>} | AliceSpec], sr_presence, manual),
    send_initial_presence(NewAlice),
    sm_helper:wait_for_resource_count(NewAlice, 2),
    ok = rpc(mim(), sys, terminate, [C2SPid, normal]),
    %% verify that the filtered message is never received
    verify_no_receive_filtertext(NewAlice, FilterText, Messages),
    %% The handler was installed with start_filter_hook_listener/2, so it has to
    %% be removed with the matching stop_filter_hook_listener/1.
    %% stop_hook_listener/1 targets the `unacknowledged_message` hook and would
    %% leave the filter handler registered.
    stop_filter_hook_listener(HookHandlerExtra),
    escalus_connection:stop(Bob).
%% Consumes stanzas from Client one at a time (500 ms timeout each) until every
%% body listed in Messages has been seen.
%% Fails the test case when no stanza arrives while bodies are still pending,
%% or when a stanza whose body equals FilterText is received.
%% Stanzas without a <body/> are skipped.
verify_no_receive_filtertext(_Client, _FilterText, []) ->
    ok;
verify_no_receive_filtertext(Client, FilterText, Messages) ->
    case escalus:wait_for_stanzas(Client, 1, 500) of
        [] ->
            ct:fail("Messages pending: ~p~n", [Messages]);
        [Stanza] ->
            case exml_query:path(Stanza, [{element, <<"body">>}, cdata]) of
                undefined ->
                    verify_no_receive_filtertext(Client, FilterText, Messages);
                FilterText ->
                    ct:fail("Filtered message ~p was delivered to the client~n",
                            [FilterText]);
                Body ->
                    %% Drop the body that has just been received from the pending list.
                    Pending = [Msg || Msg <- Messages, Msg =/= Body],
                    verify_no_receive_filtertext(Client, FilterText, Pending)
            end
    end.

%% Runs the common unacknowledged-message hook scenario, reconnecting Alice by
%% resuming her stream.
unacknowledged_message_hook_resume(Config) ->
    unacknowledged_message_hook_common(fun unacknowledged_message_hook_resume/4, Config).

%% Reconnects Alice by resuming the stream identified by SMID.
%% Returns the resource the hook is expected to report and the new client.
unacknowledged_message_hook_resume(AliceSpec, Resource, SMID, _C2SPid) ->
    NewAlice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
    send_initial_presence(NewAlice),
    {Resource, NewAlice}.

%% Runs the common unacknowledged-message hook scenario, reconnecting Alice
%% with a different resource and terminating the old c2s process.
unacknowledged_message_hook_bounce(Config) ->
    unacknowledged_message_hook_common(fun unacknowledged_message_hook_bounce/4, Config).

%% Connects Alice under the resource `new_<Resource>` and then terminates the
%% old c2s process. Returns the new resource and the new client.
unacknowledged_message_hook_bounce(AliceSpec, Resource, _SMID, C2SPid) ->
    NewResource = <<"new_", Resource/binary>>,
    NewSpec = lists:keystore(resource, 1, AliceSpec, {resource, NewResource}),
    NewAlice = connect_spec(NewSpec, sr_session, manual),
    send_initial_presence(NewAlice),
    %% ensure second C2S is registered so all the messages are bounced properly
    sm_helper:wait_for_resource_count(NewAlice, 2),
    ok = rpc(mim(), sys, terminate, [C2SPid, normal]),
    {NewResource, NewAlice}.

%% Runs the common unacknowledged-message hook scenario, reconnecting Alice
%% only after her old c2s process has terminated.
unacknowledged_message_hook_offline(Config) ->
    unacknowledged_message_hook_common(fun unacknowledged_message_hook_offline/4, Config).
%% Waits until the old c2s process is gone and only then opens a new session
%% with the same spec. Returns the (unchanged) resource and the new client.
unacknowledged_message_hook_offline(AliceSpec, Resource, _SMID, C2SPid) ->
    %% The old C2S must have terminated before the new session sends its
    %% presence. Otherwise some of the latest unacknowledged messages can be
    %% bounced to the new C2S process instead of going to the mod_offline
    %% storage. All the unacknowledged messages would still arrive at the new
    %% C2S, but the message sequence would be broken (the bounced messages
    %% delivered before the messages from the mod_offline storage).
    OldC2SMonitor = erlang:monitor(process, C2SPid),
    sm_helper:wait_for_process_termination(OldC2SMonitor),
    %% Start a brand new session (no resumption).
    NewAlice = connect_spec(AliceSpec, sr_session, manual),
    send_initial_presence(NewAlice),
    {Resource, NewAlice}.

%% Shared scenario for the unacknowledged_message hook tests.
%% RestartConnectionFN is a fun/4 called as
%% RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid); it must bring Alice
%% back and return {NewResource, NewAlice}.
unacknowledged_message_hook_common(RestartConnectionFN, Config) ->
    %% Connect Bob and Alice.
    Bob = connect_fresh(Config, bob, presence),

    BaseSpec = escalus_fresh:create_fresh_user(Config, alice),
    Resource = proplists:get_value(username, BaseSpec),
    AliceSpec = [{resource, Resource} | BaseSpec],
    HookHandlerExtra = start_hook_listener(Resource),
    Alice = connect_spec(AliceSpec, sr_presence, manual),
    %% Ack the presence stanza.
    get_ack(Alice),
    ack_initial_presence(Alice),

    SMID = sm_helper:client_to_smid(Alice),

    BobSends =
        fun(Body) ->
            escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, Body))
        end,
    %% Asserts that the hook reported each body, in order, with the given
    %% counter and resource.
    ExpectHookCalls =
        fun(Counter, Res, Bodies) ->
            lists:foreach(
              fun(Body) ->
                      escalus:assert(is_chat_message, [Body],
                                     wait_for_unacked_msg_hook(Counter, Res, 100))
              end, Bodies)
        end,

    lists:foreach(BobSends, [<<"msg-1">>, <<"msg-2">>]),
    %% Kill Alice's connection.
    C2SPid = mongoose_helper:get_session_pid(Alice),
    escalus_connection:kill(Alice),
    sm_helper:wait_until_resume_session(C2SPid),
    sm_helper:assert_alive_resources(Alice, 1),

    ExpectHookCalls(0, Resource, [<<"msg-1">>, <<"msg-2">>]),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),

    %% Send some more messages and check that the c2s can handle them.
    lists:foreach(BobSends, [<<"msg-3">>, <<"msg-4">>]),
    ExpectHookCalls(0, Resource, [<<"msg-3">>, <<"msg-4">>]),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),

    %% Alice comes back and receives the unacked messages.
    {NewResource, NewAlice} = RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid),

    %% Keep reading message stanzas until the last one ("msg-4") shows up.
    LastMessageArrived =
        fun() ->
            Received = escalus_connection:get_stanza(NewAlice, msg),
            escalus:assert(is_chat_message, [<<"msg-4">>], Received),
            ok
        end,
    mongoose_helper:wait_until(LastMessageArrived, ok),

    NewC2SPid = mongoose_helper:get_session_pid(NewAlice),
    escalus_connection:kill(NewAlice),
    sm_helper:wait_until_resume_session(NewC2SPid),

    %% The hook is run again for all four messages, now with counter 1
    %% and the resource reported by RestartConnectionFN.
    ExpectHookCalls(1, NewResource,
                    [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>, <<"msg-4">>]),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),
    stop_hook_listener(HookHandlerExtra),
    escalus_connection:stop(Bob).

%% Alice's unacked messages are redelivered after she resumes her stream.
resume_session(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Texts = three_texts(),
    Story =
        fun(Bob) ->
            {_C2SPid, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
            %% Resume the session.
            Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
            %% Alice receives the unacked messages from the previous
            %% interrupted session.
            sm_helper:wait_for_messages(Alice, Texts),
            %% Alice acks the received messages.
            escalus_connection:send(Alice, escalus_stanza:sm_ack(5)),
            escalus_connection:stop(Alice)
        end,
    escalus:fresh_story(Config, [{bob, 1}], Story).

%% A resume attempt with h = 30 must end with an `undefined-condition` stream
%% error, a closed connection, no present resources and an unregistered SMID.
resume_session_with_wrong_h_does_not_leak_sessions(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Messages = three_texts(),
    HostType = host_type(),
    Story =
        fun(Bob) ->
            {_C2SPid, SMID} =
                buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages),
            %% Try to resume the session with a bogus `h`.
            Alice = connect_spec(AliceSpec, auth, manual),
            Reply = sm_helper:try_to_resume_stream(Alice, SMID, 30),
            escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Reply),
            escalus_connection:wait_for_close(Alice, timer:seconds(5)),
            SessionIsGone =
                fun() ->
                    [] = sm_helper:get_user_present_resources(Alice),
                    sm_helper:get_sid_by_stream_id(HostType, SMID)
                end,
            mongoose_helper:wait_until(SessionIsGone,
                                       {error, smid_not_found},
                                       #{name => smid_is_cleaned})
        end,
    escalus:fresh_story(Config, [{bob, 1}], Story).

%% Resuming with an SMID that was never issued yields `item-not-found`.
resume_session_with_wrong_sid_returns_item_not_found(Config) ->
    session_resumption_expects_item_not_found(Config, <<"wrong-sid">>).

%% A <resume/> element with a non-SM namespace gets no reply, creates no
%% session and leaves the connection open.
resume_session_with_wrong_namespace_is_a_noop(Config) ->
    Alice = connect_fresh(Config, alice, auth),
    #xmlel{attrs = OrigAttrs} = Resume = escalus_stanza:resume(<<"doesnt_matter">>, 4),
    WrongNsAttrs = lists:keyreplace(<<"xmlns">>, 1, OrigAttrs,
                                    {<<"xmlns">>, <<"not-stream-mgnt">>}),
    escalus_connection:send(Alice, Resume#xmlel{attrs = WrongNsAttrs}),
    escalus_assert:has_no_stanzas(Alice),
    [] = sm_helper:get_user_present_resources(Alice),
    true = escalus_connection:is_connected(Alice),
    escalus_connection:stop(Alice).
%% Registers a random SMID for a SID whose process part is `undefined` and
%% checks that resuming it yields `item-not-found`.
resume_dead_session_results_in_item_not_found(Config) ->
    RandomSMID = base64:encode(crypto:strong_rand_bytes(21)),
    DeadSID = {os:timestamp(), undefined},
    HostType = host_type(),
    rpc(mim(), ?MOD_SM, register_smid, [HostType, RandomSMID, DeadSID]),
    session_resumption_expects_item_not_found(Config, RandomSMID).

%% Tries to resume the stream identified by SMID and expects an SM <failed/>
%% with `item-not-found`, no present resources and a still-open connection.
session_resumption_expects_item_not_found(Config, SMID) ->
    Alice = connect_fresh(Config, alice, auth),
    Reply = sm_helper:try_to_resume_stream(Alice, SMID, 2),
    escalus:assert(is_sm_failed, [<<"item-not-found">>], Reply),
    [] = sm_helper:get_user_present_resources(Alice),
    true = escalus_connection:is_connected(Alice),
    escalus_connection:stop(Alice).

%% After a resumption the monitored old session process must terminate.
resume_session_kills_old_C2S_gracefully(Config) ->
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    OldC2S = mongoose_helper:get_session_pid(Alice),

    %% Monitor the C2S process and disconnect Alice.
    SessionMonitor = sm_helper:monitor_session(Alice),
    escalus_client:kill_connection(Config, Alice),

    %% Ensure the c2s process is waiting for resumption.
    sm_helper:wait_until_resume_session(OldC2S),

    %% Resume the session.
    ResumedAlice = connect_resume(Alice, 1),

    %% The C2S process should die gracefully with Reason=normal.
    sm_helper:wait_for_process_termination(SessionMonitor),
    escalus_connection:stop(ResumedAlice).

%% With carbons enabled on both of Alice's resources, a message Bob sends to
%% the first resource is also carbon-copied to the second one.
carboncopy_works(Config) ->
    Story =
        fun(Alice1, Alice, Bob) ->
            mongoose_helper:enable_carbons([Alice1, Alice]),
            escalus_connection:send(Bob, escalus_stanza:chat_to(Alice1, <<"msg-4">>)),
            sm_helper:wait_for_messages(Alice1, [<<"msg-4">>]),
            carboncopy_helper:wait_for_carbon_chat_with_body(
              Alice, <<"msg-4">>, #{from => Bob, to => Alice1})
        end,
    escalus:fresh_story(Config, [{alice, 2}, {bob, 1}], Story).
%% Carbon copies keep working for a resource (`res2`) whose stream was
%% interrupted and later resumed.
carboncopy_works_after_resume(Config) ->
    Texts = three_texts(),
    Story =
        fun(Alice1, Bob) ->
            AliceSpec = [{resource, <<"res2">>} | sm_helper:client_to_spec(Alice1)],
            %% Run right after the second resource connects: both resources get
            %% an `available` presence, then carbons are enabled on both.
            AfterConnect =
                fun(Alice2) ->
                    [escalus:assert(is_presence_with_type, [<<"available">>],
                                    escalus:wait_for_stanza(Client))
                     || Client <- [Alice1, Alice2]],
                    mongoose_helper:enable_carbons([Alice1, Alice2])
                end,
            {_C2SPid, SMID} =
                buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, AfterConnect),
            %% Resume the session.
            Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
            wait_for_carbon_with_bodies(Alice1, Texts, #{from => Bob, to => Alice}),
            %% Get a presence from Alice1 again.
            escalus:assert(is_presence_with_type, [<<"available">>],
                           escalus:wait_for_stanza(Alice)),
            %% Alice receives an IQ result from the carbon copy enable request.
            escalus:assert(is_iq_result, [], escalus:wait_for_stanza(Alice)),
            %% Alice receives the unacked messages from the previous
            %% interrupted session.
            sm_helper:wait_for_messages(Alice, Texts),
            %% Alice acks the received messages.
            escalus_connection:send(Alice, escalus_stanza:sm_ack(5)),
            %% Direct send.
            escalus_connection:send(Bob, escalus_stanza:chat_to(Alice1, <<"msg-4">>)),
            sm_helper:wait_for_messages(Alice1, [<<"msg-4">>]),
            carboncopy_helper:wait_for_carbon_chat_with_body(
              Alice, <<"msg-4">>, #{from => Bob, to => Alice1}),
            escalus_connection:stop(Alice)
        end,
    escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], Story).

%% Waits, in order, for one carbon copy per entry of Texts on Client.
%% Returns the list of results of the individual waits.
wait_for_carbon_with_bodies(Client, Texts, Params) ->
    WaitForOne =
        fun(Text) ->
            carboncopy_helper:wait_for_carbon_chat_with_body(Client, Text, Params)
        end,
    lists:map(WaitForOne, Texts).

%% Same as buffer_unacked_messages_and_die/5 with a no-op post-connect callback.
buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) ->
    NoOp = fun(_Client) -> ok end,
    buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, NoOp).
%% Connects Alice, calls F(Alice), lets Bob send Texts to her, waits until she
%% has received them (without acking) and kills her connection.
%% Returns {C2SPid, SMID} once the old c2s process is waiting for resumption.
buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, F) ->
    Alice = connect_spec(AliceSpec, sr_presence, manual),
    F(Alice),
    SessionPid = mongoose_helper:get_session_pid(Alice),
    %% Bob sends some messages to Alice.
    sm_helper:send_messages(Bob, Alice, Texts),
    %% Alice receives them, but doesn't ack.
    sm_helper:wait_for_messages(Alice, Texts),
    %% Alice's connection is violently terminated.
    escalus_client:kill_connection(Config, Alice),
    sm_helper:wait_until_resume_session(SessionPid),
    StreamId = sm_helper:client_to_smid(Alice),
    {SessionPid, StreamId}.

%% Sends stream start, SASL PLAIN auth, stream restart and <resume/> in a single
%% batch, then reads six stanzas and expects the last one to be <resumed/>.
aggressively_pipelined_resume(Config) ->
    FreshSpec = escalus_fresh:create_fresh_user(Config, alice),
    AliceSpec = [{manual_ack, true},
                 {parser_opts, [{start_tag, <<"stream:stream">>}]}
                 | FreshSpec],
    UnackedMessages = three_texts(),
    Story =
        fun(Bob) ->
            {_C2SPid, SMID} =
                buffer_unacked_messages_and_die(Config, AliceSpec, Bob, UnackedMessages),
            %% Resume the session over a raw connection.
            Alice = escalus_connection:connect(AliceSpec),

            Username = proplists:get_value(username, AliceSpec),
            Password = proplists:get_value(password, AliceSpec),
            %% SASL PLAIN payload: NUL, username, NUL, password.
            Payload = <<0:8, Username/binary, 0:8, Password/binary>>,
            Server = proplists:get_value(server, AliceSpec),

            Stream = escalus_stanza:stream_start(Server, <<"jabber:client">>),
            Auth = escalus_stanza:auth(<<"PLAIN">>,
                                       [#xmlcdata{content = base64:encode(Payload)}]),
            AuthStream = escalus_stanza:stream_start(Server, <<"jabber:client">>),
            Resume = escalus_stanza:resume(SMID, 2),

            escalus_client:send(Alice, [Stream, Auth, AuthStream, Resume]),
            Replies = lists:map(
                        fun(I) -> escalus_connection:get_stanza(Alice, {get_resumed, I}) end,
                        lists:seq(1, 6)),
            escalus:assert(is_sm_resumed, [SMID], lists:last(Replies)),

            escalus_connection:stop(Alice)
        end,
    escalus:fresh_story(Config, [{bob, 1}], Story).
%% This is a regression test for a case when a session processes a request which
%% will receive a response from the server, i.e. one that will carry the same
%% origin SID in mongoose_acc. Without proper handling, the reply would be
%% rejected because the resumed session has a new SID.
replies_are_processed_by_resumed_session(Config) ->
    %% GIVEN a session and a registered special IQ handler (added in
    %% init_per_testcase) that waits for the old session process to terminate
    %% (at this point the new process has fully taken over) and only then
    %% actually sends the reply.
    Alice = connect_fresh(Config, alice, sr_presence),

    %% WHEN the client sends an IQ request to the special handler...
    Request = escalus_stanza:iq_get(regression_ns(), []),
    escalus:send(Alice, Request),

    %% ...goes down and the session is resumed.
    ResumedAlice = sm_helper:kill_and_connect_resume(Alice),

    %% THEN the client receives the reply properly.
    Reply = escalus:wait_for_stanza(ResumedAlice),
    escalus:assert(is_iq_result, [Request], Reply),
    escalus_connection:stop(ResumedAlice).

%% This is a regression test for a bug which manifested in the following scenario
%% (due to improper buffering of presence subscription requests):
%% 1. Bob is online, Alice is offline
%% 2. Bob subscribes to Alice's presence
%% 3. Alice becomes online
%% 4. Bob sends a message to Alice
%% 5. Alice doesn't SM-ack the request or the message and terminates the connection
%% 6. Alice reconnects, but with session *replace*, not resume
%% 7. Packet rerouting crashes on the buffered sub request, preventing the whole
%%    buffer from being resent
%% 8. Alice doesn't receive the buffered message
subscription_requests_are_buffered_properly(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    MsgBody = <<"buffered">>,
    Story =
        fun(Bob) ->
            %% GIVEN Bob's pending subscription to Alice's presence
            AliceJid = common_helper:get_bjid(AliceSpec),
            escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)),
            _RosterPushReq = escalus:wait_for_stanza(Bob),

            %% WHEN Alice becomes online...
            Alice = connect_spec(AliceSpec, sr_session, manual),
            send_initial_presence(Alice),
            %% (the subscribe could come before the initial presence)
            escalus:assert_many([is_presence(<<"available">>),
                                 is_presence(<<"subscribe">>)],
                                escalus:wait_for_stanzas(Alice, 2)),

            %% ...and Bob sends a message to Alice...
            escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),
            FirstDelivery = escalus:wait_for_stanza(Alice),
            escalus:assert(is_chat_message, [MsgBody], FirstDelivery),

            %% ...and Alice terminates the connection without acking anything...
            escalus_client:kill_connection(Config, Alice),

            %% ...and reconnects with session replacement.
            Alice2 = connect_spec(AliceSpec, session, manual),

            %% THEN Alice receives (without sending initial presence):
            %%  * Bob's buffered message (like above)
            %% Alice DOESN'T receive:
            %%  * the buffered subscription request, because it is dropped by
            %%    ejabberd_sm: it is treated like a repeated sub request to a bare
            %%    JID, so it is not processed by any sub req handler (like mod_roster)
            %%  * the buffered available presence from Alice, because it is
            %%    addressed to another SID and Alice2 is a brand new session
            escalus:assert(is_chat_message, [MsgBody], escalus:wait_for_stanza(Alice2)),
            sm_helper:send_and_receive(Bob, Alice2, <<"flush1">>),
            escalus_assert:has_no_stanzas(Alice2),

            %% Only once an initial presence is sent is the subscription
            %% request delivered.
            send_initial_presence(Alice2),
            escalus:assert_many([is_presence(<<"available">>),
                                 is_presence(<<"subscribe">>)],
                                escalus:wait_for_stanzas(Alice2, 2)),

            sm_helper:send_and_receive(Bob, Alice2, <<"flush2">>),
            escalus_assert:has_no_stanzas(Alice2),

            escalus_connection:stop(Alice2)
        end,
    escalus:fresh_story(Config, [{bob, 1}], Story).

%% This is a regression test for a bug due to which messages sent to the old
%% session in the middle of a state handover were not appended properly to the
%% SM buffer. Scenario to reproduce:
%% 1. Bob and Alice are online
%% 2. Alice kills the connection
%% 3. Alice's session is suspended
%% 4. Alice resumes the session with a new connection. At this moment the new
%%    session is still not present in the session table. The `resume` request
%%    is stuck in the old process mailbox.
%% 5. Bob sends a message to Alice. Only the old process is present in the
%%    session table, so now the old session has two messages in its mailbox:
%%    `resume` and the XML from Bob
%% 6. We resume the old process and it begins the session handover
%% 7. Bob's message is appended to the SM buffer in the "flush" step
%% 8. With the bug fixed, the message is retransmitted properly
messages_are_properly_flushed_during_resumption(Config) ->
    Story =
        fun(Bob) ->
            %% GIVEN (online Bob) and (Alice in resume state);
            %% Alice's session is suspended.
            Alice = connect_fresh(Config, alice, sr_presence),
            SMH = escalus_connection:get_sm_h(Alice),
            escalus_client:kill_connection(Config, Alice),
            %% The receiver process would stop now.
            OldC2S = mongoose_helper:get_session_pid(Alice),
            sm_helper:wait_until_resume_session(OldC2S),

            sm_helper:wait_for_queue_length(OldC2S, 0),
            ok = rpc(mim(), sys, suspend, [OldC2S]),

            %% WHEN the new session requests resumption.
            %% We wait until the old session has the resumption request
            %% enqueued; this is needed to ensure the order of messages:
            %% resume first, Bob's chat second. The actual wait and the message
            %% sent by Bob are done in a separate process, because the new
            %% client start will block until the old process is resumed.
            MsgBody = <<"flush-regression">>,
            EnqueueAndResume =
                fun() ->
                    sm_helper:wait_for_queue_length(OldC2S, 1),

                    %% Bob sends a message...
                    escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),

                    %% ...we ensure that the message is enqueued in Alice's
                    %% session (2 messages = resume request + Bob's message)...
                    sm_helper:wait_for_queue_length(OldC2S, 2),

                    %% ...and the old process is resumed.
                    ok = rpc(mim(), sys, resume, [OldC2S])
                end,
            spawn_link(EnqueueAndResume),
            Alice2 = connect_resume(Alice, SMH),
            %% THEN Alice's new session receives Bob's message.
            Received = escalus:wait_for_stanza(Alice2),
            escalus:assert(is_chat_message, [MsgBody], Received)
        end,
    escalus:fresh_story(Config, [{bob, 1}], Story).

%% An <enable/> request must be answered with an SM <failed/> carrying
%% `feature-not-implemented`.
no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt(Config) ->
    Alice = connect_fresh(Config, alice, session, manual),
    %% Should not crash anything!
    EnableRequest = escalus_stanza:enable_sm(),
    escalus_connection:send(Alice, EnableRequest),
    Failed = escalus_connection:get_stanza(Alice, service_unavailable),
    escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Failed),
    escalus_connection:stop(Alice).

%% Same as above, but the <enable/> request also asks for resumption.
no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption(Config) ->
    Alice = connect_fresh(Config, alice, session, manual),
    %% Should not crash anything!
    EnableRequest = escalus_stanza:enable_sm([resume]),
    escalus_connection:send(Alice, EnableRequest),
    Failed = escalus_connection:get_stanza(Alice, service_unavailable),
    escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Failed),
    escalus_connection:stop(Alice).

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

%% Installs hook_handler_fn/3 on the `mim` node for the `unacknowledged_message`
%% hook; the handler reports to the calling (test case) process.
%% Returns the handler's Extra map, needed later by stop_hook_listener/1.
start_hook_listener(Resource) ->
    Reporter = self(),
    rpc(mim(), ?MODULE, rpc_start_hook_handler, [Reporter, Resource, host_type()]).

%% Removes the handler installed by start_hook_listener/1.
stop_hook_listener(HookExtra) ->
    rpc(mim(), ?MODULE, rpc_stop_hook_handler, [HookExtra, host_type()]).

%% Executed on the `mim` node: registers hook_handler_fn/3 with priority 50.
rpc_start_hook_handler(TestCasePid, User, HostType) ->
    HandlerExtra = #{luser => jid:nodeprep(User), pid => TestCasePid},
    gen_hook:add_handler(unacknowledged_message, HostType,
                         fun ?MODULE:hook_handler_fn/3,
                         HandlerExtra, 50),
    HandlerExtra.

%% Executed on the `mim` node: unregisters hook_handler_fn/3.
rpc_stop_hook_handler(HookExtra, HostType) ->
    gen_hook:delete_handler(unacknowledged_message, HostType,
                            fun ?MODULE:hook_handler_fn/3,
                            HookExtra, 50).

%% `unacknowledged_message` hook handler. For stanzas of the watched user it
%% sends {sm_test, Counter, LResource, Element} to the test case process and
%% stores Counter + 1 in the accumulator; other users' stanzas pass through.
hook_handler_fn(Acc,
                #{jid := Jid} = _Params,
                #{luser := LUser, pid := TestCasePid} = _Extra) ->
    {StanzaUser, _StanzaServer, StanzaResource} = jid:to_lower(Jid),
    case StanzaUser =:= LUser of
        true ->
            Counter = mongoose_acc:get(sm_test, counter, 0, Acc),
            Element = mongoose_acc:element(Acc),
            TestCasePid ! {sm_test, Counter, StanzaResource, Element},
            {ok, mongoose_acc:set_permanent(sm_test, counter, Counter + 1, Acc)};
        false ->
            {ok, Acc}
    end.

%% Waits up to Timeout ms for a report from hook_handler_fn/3.
%% Asserts the reported counter and resource and returns the stanza,
%% or returns `timeout` if nothing arrives in time.
wait_for_unacked_msg_hook(Counter, Res, Timeout) ->
    receive
        {sm_test, ReportedCounter, ReportedResource, Stanza} = Report ->
            ?assertEqual(Counter, ReportedCounter, Report),
            ?assertEqual(Res, ReportedResource, Report),
            Stanza
    after Timeout ->
        timeout
    end.

%% Installs filter_hook_handler_fn/3 on the `mim` node for the
%% `filter_unacknowledged_messages` hook. Returns the handler's Extra map,
%% needed later by stop_filter_hook_listener/1.
start_filter_hook_listener(FilterText, Resource) ->
    rpc(mim(), ?MODULE, rpc_start_filter_hook_handler,
        [FilterText, Resource, host_type()]).

%% Removes the handler installed by start_filter_hook_listener/2.
stop_filter_hook_listener(HookExtra) ->
    rpc(mim(), ?MODULE, rpc_stop_filter_hook_handler, [HookExtra, host_type()]).

%% Executed on the `mim` node: registers filter_hook_handler_fn/3 with priority 50.
rpc_start_filter_hook_handler(FilterText, User, HostType) ->
    HandlerExtra = #{luser => jid:nodeprep(User), filter_text => FilterText},
    gen_hook:add_handler(filter_unacknowledged_messages, HostType,
                         fun ?MODULE:filter_hook_handler_fn/3,
                         HandlerExtra, 50),
    HandlerExtra.

%% Executed on the `mim` node: unregisters filter_hook_handler_fn/3.
rpc_stop_filter_hook_handler(HookExtra, HostType) ->
    gen_hook:delete_handler(filter_unacknowledged_messages, HostType,
                            fun ?MODULE:filter_hook_handler_fn/3,
                            HookExtra, 50).

%% `filter_unacknowledged_messages` hook handler. For the watched user it
%% removes from Buffer every message whose body equals the configured filter
%% text; buffers of other users are returned untouched.
filter_hook_handler_fn(Buffer,
                       #{jid := Jid} = _Params,
                       #{luser := LUser, filter_text := FilterText} = _Extra) ->
    {BufferUser, _BufferServer} = jid:to_lus(Jid),
    case BufferUser =:= LUser of
        true ->
            Keep = fun(Acc) -> filter_text(Acc, FilterText) end,
            {ok, lists:filter(Keep, Buffer)};
        false ->
            {ok, Buffer}
    end.

%% Returns false only for a `message` stanza whose body equals FilterText;
%% true for everything else.
filter_text(Acc, FilterText) ->
    mongoose_acc:stanza_name(Acc) =/= <<"message">>
        orelse FilterText =/= exml_query:path(mongoose_acc:element(Acc),
                                              [{element, <<"body">>}, cdata]).

%% Builds a predicate checking that a stanza is a chat message with body Content.
is_chat(Content) ->
    fun(Stanza) -> escalus_pred:is_chat_message(Content, Stanza) end.
%% Builds a predicate checking that a stanza is a presence of the given Type.
is_presence(Type) ->
    fun(Stanza) -> escalus_pred:is_presence_with_type(Type, Stanza) end.

%% The three message bodies shared by several test cases.
three_texts() ->
    [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>].

%% Collects, from the meck history of mod_ping on the `mim` node, the stanza of
%% every `filter_local_packet` call that returned `{stop, drop}`.
get_stanzas_filtered_by_mod_ping() ->
    History = rpc(mim(), meck, history, [mod_ping]),
    [Stanza ||
        {_Pid,
         {_Mod, filter_local_packet, [{_, _, _, Stanza}, _Params, _Extra]},
         {stop, drop}} <- History].

%%--------------------------------------------------------------------
%% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session"
%%--------------------------------------------------------------------

%% Namespace served by the regression IQ handler.
regression_ns() ->
    <<"regression">>.

%% Registers regression_handler/5 for regression_ns() on the `mim` node.
register_handler() ->
    HostType = host_type(),
    rpc(mim(), gen_iq_handler, add_iq_handler_for_domain,
        [HostType, regression_ns(), ejabberd_sm,
         fun ?MODULE:regression_handler/5, #{}, one_queue]).

%% Removes the handler added by register_handler/0.
unregister_handler() ->
    HostType = host_type(),
    rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain,
        [HostType, regression_ns(), ejabberd_sm]).

%% IQ handler that replies only after the process which originated the request
%% has terminated and a session for the origin JID is registered again.
%% Raises `{c2s_not_stopped_after_timeout, Pid}` if the originating process is
%% still alive after 10 seconds.
regression_handler(Acc, _From, _To, IQ, _Extra) ->
    %% A bit of a hack - will no longer work when the SID format changes
    {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
    MonitorRef = erlang:monitor(process, Pid),
    receive
        %% Match on our own reference so that an unrelated 'DOWN' message in the
        %% mailbox is not mistaken for the termination of the origin process.
        {'DOWN', MonitorRef, process, _, _} ->
            ok
    after
        10000 ->
            %% Do not leave the monitor (or a late 'DOWN') behind.
            erlang:demonitor(MonitorRef, [flush]),
            error({c2s_not_stopped_after_timeout, Pid})
    end,
    %% We avoid another race condition - there is a short window where user session
    %% is not registered in ejabberd_sm: between old process termination and the moment
    %% when the new process stores new session in memory. It should be fixed separately.
    wait_for_session(mongoose_acc:get(c2s, origin_jid, undefined, Acc), 50, 100),
    {Acc, jlib:make_result_iq_reply(IQ)}.
%% Polls ejabberd_sm until a session for JID is registered.
%% Sleeps SleepTime ms between attempts and retries at most Retries times;
%% raises `{session_not_registered, JID}` once the retries are exhausted.
%% (Previously Retries was decremented but never checked, so the loop could
%% run forever.)
wait_for_session(JID, Retries, SleepTime) ->
    case ejabberd_sm:get_session(JID) of
        offline when Retries > 0 ->
            timer:sleep(SleepTime),
            wait_for_session(JID, Retries - 1, SleepTime);
        offline ->
            error({session_not_registered, JID});
        _ ->
            ok
    end.

%% Acks Alice's initial presence when the second argument is `ack`;
%% does nothing for `no_ack`.
maybe_ack_initial_presence(Alice, ack) ->
    ack_initial_presence(Alice);
maybe_ack_initial_presence(_Alice, no_ack) ->
    ok.