%% Stream Management tests
-module(sm_SUITE).

-compile(export_all).
-include_lib("exml/include/exml.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").

%% Injected code callbacks
-export([rpc_start_hook_handler/3,
         rpc_stop_hook_handler/2,
         hook_handler_fn/3,
         regression_handler/5]).

-import(distributed_helper, [mim/0,
                             require_rpc_nodes/1,
                             rpc/4]).

-import(domain_helper, [host_type/0]).

-import(sm_helper, [connect_fresh/3,
                    connect_fresh/4,
                    connect_spec/2,
                    connect_spec/3,
                    connect_same/2,
                    connect_same/3,
                    connect_resume/2,
                    process_initial_stanza/1,
                    send_initial_presence/1,
                    get_ack/1,
                    ack_initial_presence/1]).

-define(MOD_SM, mod_stream_management).
-define(CONSTRAINT_CHECK_TIMEOUT, 5000).
-define(LONG_TIMEOUT, 3600).
-define(SHORT_TIMEOUT, 1).
-define(SMALL_SM_BUFFER, 3).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Common Test suite info: the RPC node requirements for `mim`
%% followed by the escalus suite defaults.
suite() ->
    require_rpc_nodes([mim]) ++ escalus:suite().

%% Every group returned by groups/0 is run.
all() ->
    ct_helper:groups_to_all(groups()).

%% Group definitions: {GroupName, Properties, TestCases}.
%%
%% `stream_mgmt_disabled` and `manual_ack_freq_long_session_timeout` are
%% listed here because this suite defines their case lists, their
%% init_per_group/2 clauses and their required_sm_opts/2 clauses; without
%% an entry in this list none of that code is ever exercised.
%% `stream_mgmt_disabled` stops the stream management module in its group
%% init, so it is placed directly before a group whose init calls
%% dynamic_modules:ensure_modules/2 again.
groups() ->
    P = [parallel],
    [{parallel, P, parallel_cases()},
     {parallel_manual_ack_freq_1, P, parallel_manual_ack_freq_1_cases()},
     {manual_ack_freq_2, [], manual_ack_freq_2_cases()},
     {stale_h, [], stale_h_cases()},
     {stream_mgmt_disabled, [], stream_mgmt_disabled_cases()},
     {manual_ack_freq_long_session_timeout, [], manual_ack_freq_long_session_timeout_cases()},
     {parallel_unacknowledged_message_hook, P, parallel_unacknowledged_message_hook_cases()}].
%% Test cases of the `parallel` group.
parallel_cases() ->
    [
     server_announces_sm,
     server_enables_sm_before_session,
     server_enables_sm_after_session,
     server_returns_failed_after_start,
     server_returns_failed_after_auth,
     server_enables_resumption,
     client_enables_sm_twice_fails_with_correct_error_stanza,
     session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza,
     session_resumed_and_old_session_dead_doesnt_route_error_to_new_session,
     basic_ack,
     h_ok_before_session,
     h_ok_after_session_enabled_before_session,
     h_ok_after_session_enabled_after_session,
     h_ok_after_a_chat,
     h_non_given_closes_stream_gracefully,
     resend_unacked_on_reconnection,
     session_established,
     wait_for_resumption,
     resume_session,
     resume_session_with_wrong_h_does_not_leak_sessions,
     resume_session_with_wrong_sid_returns_item_not_found,
     resume_session_with_wrong_namespace_is_a_noop,
     resume_dead_session_results_in_item_not_found,
     resume_session_kills_old_C2S_gracefully,
     aggressively_pipelined_resume,
     replies_are_processed_by_resumed_session,
     subscription_requests_are_buffered_properly,
     messages_are_properly_flushed_during_resumption,
     messages_are_properly_flushed_during_resumption_p1_fsm_old
    ].

%% Test cases of the `parallel_manual_ack_freq_1` group.
parallel_manual_ack_freq_1_cases() ->
    [
     client_acks_more_than_sent,
     too_many_unacked_stanzas,
     resend_unacked_after_resume_timeout,
     resume_session_state_send_message_with_ack,
     resume_session_state_send_message_without_ack,
     resume_session_state_stop_c2s,
     server_requests_ack_after_session,
     resend_more_offline_messages_than_buffer_size,
     server_requests_ack
    ].

%% Test cases of the `manual_ack_freq_2` group.
manual_ack_freq_2_cases() ->
    [server_requests_ack_freq_2].

%% Test cases of the `stale_h` group.
stale_h_cases() ->
    [
     resume_expired_session_returns_correct_h,
     gc_repeat_after_never_means_no_cleaning,
     gc_repeat_after_timeout_does_clean
    ].
%% Test cases of the `stream_mgmt_disabled` group.
stream_mgmt_disabled_cases() ->
    [
     no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt,
     no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption
    ].

%% Test cases of the `manual_ack_freq_long_session_timeout` group.
manual_ack_freq_long_session_timeout_cases() ->
    [preserve_order].

%% Test cases of the `parallel_unacknowledged_message_hook` group.
parallel_unacknowledged_message_hook_cases() ->
    [
     unacknowledged_message_hook_bounce,
     unacknowledged_message_hook_offline,
     unacknowledged_message_hook_resume
    ].

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Saves the current module set of the host type into Config, marks the
%% `alice` user spec with stream_management = true, injects this module
%% via mongoose_helper:inject_module/1 and runs the escalus suite init.
init_per_suite(Config) ->
    SavedModulesConfig = dynamic_modules:save_modules(host_type(), Config),
    SMUserConfig = escalus_users:update_userspec(SavedModulesConfig, alice,
                                                 stream_management, true),
    mongoose_helper:inject_module(?MODULE),
    escalus:init_per_suite(SMUserConfig).

%% Cleans fresh escalus users, restores the modules saved in
%% init_per_suite/1 and runs the escalus suite teardown.
end_per_suite(Config) ->
    escalus_fresh:clean(),
    dynamic_modules:restore_modules(Config),
    escalus:end_per_suite(Config).

%% Per-group module setup.
%%  - stale_h: nothing here, modules are configured per test case.
%%  - stream_mgmt_disabled: stops the stream management module and
%%    deletes the sm_session mnesia table.
%%  - any other group: ensures the modules from required_modules/2.
init_per_group(stale_h, Config) ->
    Config;
init_per_group(stream_mgmt_disabled, Config) ->
    dynamic_modules:stop(host_type(), ?MOD_SM),
    rpc(mim(), mnesia, delete_table, [sm_session]),
    Config;
init_per_group(Group, Config) ->
    Modules = required_modules(group, Group),
    dynamic_modules:ensure_modules(host_type(), Modules),
    Config.

%% No per-group cleanup.
end_per_group(_Group, _Config) ->
    ok.
%% Per-testcase setup.
%% The stale_h cases configure their modules here (not per group); the two
%% gc_* cases additionally register some stale smids and keep them in
%% Config. replies_are_processed_by_resumed_session registers a handler.
%% Every other case only runs the escalus per-testcase init.
init_per_testcase(resume_expired_session_returns_correct_h = CaseName, Config) ->
    Modules = required_modules(testcase, CaseName),
    dynamic_modules:ensure_modules(host_type(), Modules),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config)
  when CaseName =:= gc_repeat_after_never_means_no_cleaning;
       CaseName =:= gc_repeat_after_timeout_does_clean ->
    Modules = required_modules(testcase, CaseName),
    dynamic_modules:ensure_modules(host_type(), Modules),
    ConfigWithSmids = register_some_smid_h(Config),
    escalus:init_per_testcase(CaseName, ConfigWithSmids);
init_per_testcase(replies_are_processed_by_resumed_session = CaseName, Config) ->
    register_handler(),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).

%% Per-testcase teardown.
%% The stale_h cases stop the stream_management_stale_h child of
%% ejabberd_sup; replies_are_processed_by_resumed_session unregisters its
%% handler. All cases finish with the escalus per-testcase teardown.
end_per_testcase(CaseName, Config)
  when CaseName =:= resume_expired_session_returns_correct_h;
       CaseName =:= gc_repeat_after_never_means_no_cleaning;
       CaseName =:= gc_repeat_after_timeout_does_clean ->
    rpc(mim(), ejabberd_sup, stop_child, [stream_management_stale_h]),
    escalus:end_per_testcase(CaseName, Config);
end_per_testcase(replies_are_processed_by_resumed_session = CaseName, Config) ->
    unregister_handler(),
    escalus:end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config) ->
    escalus:end_per_testcase(CaseName, Config).

%% Module configuration per group (in case of stale_h group it is per testcase)

%% Returns the module list for dynamic_modules:ensure_modules/2:
%% stream management configured from required_sm_opts/2 merged over
%% common_sm_opts/0 (or `stopped` passed through), plus mod_offline.
required_modules(Scope, Name) ->
    SMOpts = case required_sm_opts(Scope, Name) of
                 stopped ->
                     stopped;
                 Overrides ->
                     maps:merge(common_sm_opts(), Overrides)
             end,
    SMModule = {?MOD_SM, config_parser_helper:mod_config(?MOD_SM, SMOpts)},
    [SMModule, {mod_offline, []}].
%% Stream management options specific to a group or a test case.
%% Returns a map of overrides for common_sm_opts/0, or the atom `stopped`.
required_sm_opts(group, parallel) ->
    #{ack_freq => never};
required_sm_opts(group, parallel_manual_ack_freq_1) ->
    #{resume_timeout => ?SHORT_TIMEOUT,
      ack_freq => 1};
required_sm_opts(group, manual_ack_freq_2) ->
    #{ack_freq => 2};
required_sm_opts(group, stream_mgmt_disabled) ->
    stopped;
required_sm_opts(group, parallel_unacknowledged_message_hook) ->
    #{ack_freq => 1};
required_sm_opts(group, manual_ack_freq_long_session_timeout) ->
    #{buffer_max => 1000,
      ack_freq => 1};
required_sm_opts(testcase, resume_expired_session_returns_correct_h) ->
    #{stale_h => stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT),
      resume_timeout => ?SHORT_TIMEOUT,
      ack_freq => 1};
required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
    #{stale_h => stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT)};
required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
    #{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)}.

%% Options shared by every enabled stream management configuration.
common_sm_opts() ->
    #{buffer_max => ?SMALL_SM_BUFFER}.

%% Builds an enabled stale_h option map from the two given values.
stale_h(RepeatAfter, Geriatric) ->
    #{geriatric => Geriatric,
      repeat_after => RepeatAfter,
      enabled => true}.

%% Generates a random smid: 21 strong random bytes, base64-encoded.
make_smid() ->
    RandomBytes = crypto:strong_rand_bytes(21),
    base64:encode(RandomBytes).

%% Registers a freshly generated smid with the given value through
%% register_stale_smid_h on the mim node; returns {SMID, IntSmidId}.
register_smid(IntSmidId) ->
    SMID = make_smid(),
    ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]),
    {SMID, IntSmidId}.

%% Registers three smids (values 1..3) and stores the resulting
%% {SMID, Value} pairs in Config under `smid_test`.
register_some_smid_h(Config) ->
    Registered = [register_smid(Value) || Value <- lists:seq(1, 3)],
    [{smid_test, Registered} | Config].
%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------

%% After only the start_stream step, the received stream features must
%% allow stream management for a fresh alice.
server_announces_sm(Config) ->
    Spec = escalus_fresh:freshen_spec(Config, alice),
    Started = escalus_connection:start(Spec, [start_stream]),
    {ok, #client{props = Props}, Features} = Started,
    true = escalus_session:can_use_stream_management(Props, Features).


%% Connecting a fresh alice up to the sm_after_bind step must succeed.
server_enables_sm_before_session(Config) ->
    connect_fresh(Config, alice, sm_after_bind).

%% Connecting a fresh alice up to the sm_after_session step must succeed.
server_enables_sm_after_session(Config) ->
    connect_fresh(Config, alice, sm_after_session).

%% Enabling SM on a connection stopped at the before_auth step fails.
server_returns_failed_after_start(Config) ->
    Conn = connect_fresh(Config, alice, before_auth),
    server_returns_failed(Conn).

%% Enabling SM on a connection stopped at the auth step fails.
server_returns_failed_after_auth(Config) ->
    Conn = connect_fresh(Config, alice, auth),
    server_returns_failed(Conn).

%% Connects up to the sr_presence step, then stops the connection.
server_enables_resumption(Config) ->
    Conn = connect_fresh(Config, alice, sr_presence),
    escalus_connection:stop(Conn).

%% Sends <enable/> and expects an SM <failed/> with unexpected-request.
server_returns_failed(Alice) ->
    Enable = escalus_stanza:enable_sm(),
    escalus_connection:send(Alice, Enable),
    Failed = escalus_connection:get_stanza(Alice, enable_sm_failed),
    escalus:assert(is_sm_failed, [<<"unexpected-request">>], Failed).

%% A second <enable/> is answered with <failed/> (unexpected-request),
%% followed by a stream end and the connection being closed.
client_enables_sm_twice_fails_with_correct_error_stanza(Config) ->
    Conn = connect_fresh(Config, alice, sm_before_session),
    escalus_connection:send(Conn, escalus_stanza:enable_sm()),
    Failed = escalus_connection:get_stanza(Conn, enable_sm_failed),
    escalus:assert(is_sm_failed, [<<"unexpected-request">>], Failed),
    StreamEnd = escalus_connection:get_stanza(Conn, enable_sm_failed),
    escalus:assert(is_stream_end, StreamEnd),
    true = escalus_connection:wait_for_close(Conn, timer:seconds(5)).
%% Resuming from a second connection closes the first one with a
%% `conflict` stream error and a stream end; the new one stays connected.
session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza(Config) ->
    %% GIVEN a user with stream resumption enabled
    OldConn = connect_fresh(Config, alice, sr_presence),
    SMH = escalus_connection:get_sm_h(OldConn),
    %% WHEN the user resumes the session from a new client
    NewConn = connect_resume(OldConn, SMH),
    process_initial_stanza(NewConn),
    %% THEN the old session is closed gracefully with the correct error stanza
    StreamError = escalus_connection:get_stanza(OldConn, close_old_stream),
    escalus:assert(is_stream_error, [<<"conflict">>, <<>>], StreamError),
    StreamEnd = escalus_connection:get_stanza(OldConn, close_old_stream),
    escalus:assert(is_stream_end, StreamEnd),
    true = escalus_connection:wait_for_close(OldConn, timer:seconds(5)),
    true = escalus_connection:is_connected(NewConn),
    escalus_connection:stop(NewConn).

%% When the first connection is killed before resumption, the resuming
%% connection must have no pending stanzas and must stay connected.
session_resumed_and_old_session_dead_doesnt_route_error_to_new_session(Config) ->
    %% GIVEN a user with stream resumption enabled
    OldConn = connect_fresh(Config, alice, sr_presence),
    %% WHEN the first session dies and the user resumes from a new client
    NewConn = sm_helper:kill_and_connect_resume(OldConn),
    process_initial_stanza(NewConn),
    %% THEN the new session does not have any message rerouted
    false = escalus_client:has_stanzas(NewConn),
    true = escalus_connection:is_connected(NewConn),
    escalus_connection:stop(NewConn).

%% A roster get is answered with a roster result, and a following <r/>
%% is answered with an SM ack.
basic_ack(Config) ->
    Conn = connect_fresh(Config, alice, sm_after_session),
    escalus_connection:send(Conn, escalus_stanza:roster_get()),
    RosterResult = escalus_connection:get_stanza(Conn, roster_result),
    escalus:assert(is_roster_result, RosterResult),
    escalus_connection:send(Conn, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Conn, stream_mgmt_ack),
    escalus:assert(is_sm_ack, Ack).
%% Test that "h" value is valid when:
%% - SM is enabled *before* the session is established
%% - <r/> is sent *before* the session is established
%% The expected ack carries h = 0.
h_ok_before_session(Config) ->
    Conn = connect_fresh(Config, alice, sm_after_bind),
    escalus_connection:send(Conn, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Conn, stream_mgmt_ack),
    escalus:assert(is_sm_ack, [0], Ack).

%% Test that "h" value is valid when:
%% - SM is enabled *before* the session is established
%% - <r/> is sent *after* the session is established
%% The expected ack carries h = 1.
h_ok_after_session_enabled_before_session(Config) ->
    Conn = connect_fresh(Config, alice, sm_before_session),
    escalus_connection:send(Conn, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Conn, stream_mgmt_ack),
    escalus:assert(is_sm_ack, [1], Ack).

%% Test that "h" value is valid when:
%% - SM is enabled *after* the session is established
%% - <r/> is sent *after* the session is established
%% A roster get is sent first; the expected ack carries h = 1.
h_ok_after_session_enabled_after_session(Config) ->
    Conn = connect_fresh(Config, alice, sm_after_session),
    escalus_connection:send(Conn, escalus_stanza:roster_get()),
    RosterResult = escalus_connection:get_stanza(Conn, roster_result),
    escalus:assert(is_roster_result, RosterResult),
    escalus_connection:send(Conn, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Conn, stream_mgmt_ack),
    escalus:assert(is_sm_ack, [1], Ack).

%% Test that "h" value is valid after exchanging a few messages.
h_ok_after_a_chat(ConfigIn) ->
    Config = escalus_users:update_userspec(ConfigIn, alice,
                                           stream_management, true),
    %% Sends Text from Sender to Recipient and asserts that Recipient
    %% receives it as a chat message.
    Chat = fun(Sender, Recipient, Text) ->
                   escalus:send(Sender, escalus_stanza:chat_to(Recipient, Text)),
                   Received = escalus:wait_for_stanza(Recipient),
                   escalus:assert(is_chat_message, [Text], Received)
           end,
    Story = fun(Alice, Bob) ->
                    Chat(Alice, Bob, <<"Hi, Bob!">>),
                    Chat(Bob, Alice, <<"Hi, Alice!">>),
                    Chat(Bob, Alice, <<"How's life?">>),
                    Chat(Alice, Bob, <<"Pretty !@#$%^$">>),
                    escalus:send(Alice, escalus_stanza:sm_request()),
                    Ack = escalus:wait_for_stanza(Alice),
                    escalus:assert(is_sm_ack, [3], Ack),
                    %% Ack, so that unacked messages don't go into offline store.
                    escalus:send(Alice, escalus_stanza:sm_ack(3))
            end,
    escalus:fresh_story(Config, [{alice,1}, {bob,1}], Story).

%% An <a/> element without the "h" attribute must be answered with a
%% policy-violation stream error; the c2s process then dies, a stream end
%% is received and the connection is closed.
h_non_given_closes_stream_gracefully(ConfigIn) ->
    AckWithoutH = #xmlel{name = <<"a">>,
                         attrs = [{<<"xmlns">>, <<"urn:xmpp:sm:3">>}]},
    Config = escalus_users:update_userspec(ConfigIn, alice,
                                           stream_management, true),
    Story = fun(Alice) ->
                    C2SPid = mongoose_helper:get_session_pid(Alice),
                    escalus:send(Alice, AckWithoutH),
                    StreamError = escalus:wait_for_stanza(Alice),
                    escalus:assert(is_stream_error,
                                   [<<"policy-violation">>, <<>>],
                                   StreamError),
                    mongoose_helper:wait_for_pid_to_die(C2SPid),
                    StreamEnd = escalus_connection:get_stanza(Alice, stream_end),
                    escalus:assert(is_stream_end, StreamEnd),
                    true = escalus_connection:wait_for_close(Alice, timer:seconds(5))
            end,
    escalus:fresh_story(Config, [{alice,1}], Story).
%% Acking 5 stanzas right after connecting must produce an
%% undefined-condition stream error with a handled-count-too-high child
%% (h = 5, send-count = 0), then a stream end and a closed connection.
client_acks_more_than_sent(Config) ->
    Conn = connect_fresh(Config, alice, sm_after_session),
    escalus:send(Conn, escalus_stanza:sm_ack(5)),
    StreamError = escalus:wait_for_stanza(Conn),
    %% The error condition itself
    escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], StreamError),
    %% The SM-specific child and its attributes
    TooHighPath = [{element_with_ns, <<"handled-count-too-high">>, <<"urn:xmpp:sm:3">>}],
    TooHigh = exml_query:path(StreamError, TooHighPath),
    <<"5">> = exml_query:attr(TooHigh, <<"h">>),
    <<"0">> = exml_query:attr(TooHigh, <<"send-count">>),
    %% Graceful stream end
    StreamEnd = escalus_connection:get_stanza(Conn, stream_end),
    escalus:assert(is_stream_end, StreamEnd),
    true = escalus_connection:wait_for_close(Conn, timer:seconds(5)).

%% Bob sends ?SMALL_SM_BUFFER messages that alice never acks; after the
%% messages and ack requests, alice must get a resource-constraint
%% stream error.
too_many_unacked_stanzas(Config) ->
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sm_presence, manual),
    get_ack(Alice),
    SendNth = fun(N) ->
                      Text = <<(integer_to_binary(N))/binary, ": Hi, Alice!">>,
                      escalus:send(Bob, escalus_stanza:chat_to(Alice, Text))
              end,
    lists:foreach(SendNth, lists:seq(1, ?SMALL_SM_BUFFER)),
    %% messages and ack requests
    escalus:wait_for_stanzas(Alice, ?SMALL_SM_BUFFER * 2),
    %% wait for deferred buffer check
    StreamError = escalus:wait_for_stanza(Alice, ?CONSTRAINT_CHECK_TIMEOUT + 1000),
    escalus:assert(is_stream_error,
                   [<<"resource-constraint">>, <<"too many unacked stanzas">>],
                   StreamError).

%% Ack-request scenario with an expected ack request after every stanza.
server_requests_ack(Config) ->
    server_requests_ack(Config, 1).

%% Ack-request scenario with an expected ack request after every second
%% stanza.
server_requests_ack_freq_2(Config) ->
    server_requests_ack(Config, 2).
%% Shared scenario: alice receives her initial presence and one message
%% from bob; an ack request is expected whenever the running stanza count
%% is a multiple of N.
server_requests_ack(Config, N) ->
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sm_presence, manual),
    %% ack request after initial presence
    maybe_assert_ack_request(1, N, Alice),
    Greeting = <<"Hi, Alice!">>,
    escalus:send(Bob, escalus_stanza:chat_to(Alice, Greeting)),
    Received = escalus:wait_for_stanza(Alice),
    escalus:assert(is_chat_message, [Greeting], Received),
    maybe_assert_ack_request(2, N, Alice).

%% Asserts that the next stanza is an ack request when StanzasRec is a
%% multiple of AckRequests; otherwise waits for nothing.
%% Always returns StanzasRec.
maybe_assert_ack_request(StanzasRec, AckRequests, Alice) ->
    ct:log("StanzasRec: ~p, AckRequests: ~p", [StanzasRec, AckRequests]),
    _ = StanzasRec rem AckRequests =:= 0
        andalso escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(Alice)),
    StanzasRec.

%% With manual acking, the stanza read after connecting up to the
%% sm_before_session step must be an ack request.
server_requests_ack_after_session(Config) ->
    Alice = connect_fresh(Config, alice, sm_before_session, manual),
    AckRequest = escalus_connection:get_stanza(Alice, stream_mgmt_req),
    escalus:assert(is_sm_ack_request, AckRequest).

%% Bob sends one message more than ?SMALL_SM_BUFFER before alice connects.
%% After she receives and acks them, no further stanza (in particular no
%% stream error) may arrive during the constraint-check period.
resend_more_offline_messages_than_buffer_size(Config) ->
    %% connect bob, create (but do not yet connect) alice
    Bob = connect_fresh(Config, bob, presence),
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),

    %% send some messages - more than the unacked buffer size
    MessagesToSend = ?SMALL_SM_BUFFER + 1,
    AliceJid = common_helper:get_bjid(AliceSpec),
    SendNth = fun(I) ->
                      Msg = escalus_stanza:chat_to(AliceJid, integer_to_binary(I)),
                      escalus_connection:send(Bob, Msg)
              end,
    lists:foreach(SendNth, lists:seq(1, MessagesToSend)),

    %% connect alice who wants to receive all messages from offline storage
    Alice = connect_spec(AliceSpec, sm_after_session, manual),
    mongoose_helper:wait_for_n_offline_messages(Alice, MessagesToSend),
    send_initial_presence(Alice),

    %% messages and ack requests
    escalus:wait_for_stanzas(Alice, MessagesToSend * 2),

    escalus_connection:get_stanza(Alice, presence),
    %% ack request
    get_ack(Alice),

    %% confirm messages + presence
    escalus_connection:send(Alice, escalus_stanza:sm_ack(4)),

    %% wait for check constraint message on server side
    ct:sleep(?CONSTRAINT_CHECK_TIMEOUT + 1000),
    %% should not receive anything, especially any stream errors
    false = escalus_client:has_stanzas(Alice),

    escalus_connection:stop(Alice),
    escalus_connection:stop(Bob).

%% Messages received but never acked by alice must be delivered again
%% once she reconnects.
resend_unacked_on_reconnection(Config) ->
    Texts = three_texts(),
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sm_presence),
    AliceSpec = sm_helper:client_to_spec0(Alice),
    %% Bob sends some messages to Alice and she receives them.
    sm_helper:send_messages(Bob, Alice, Texts),
    sm_helper:wait_for_messages(Alice, Texts),
    %% Alice disconnects without acking the messages,
    %% so the messages go to the offline store.
    sm_helper:stop_client_and_wait_for_termination(Alice),
    %% Alice receives the messages from the offline store.
    NewAlice = connect_spec(AliceSpec, session, manual),
    send_initial_presence(NewAlice),
    sm_helper:wait_for_messages(NewAlice, Texts),
    %% Alice acks the delayed messages so they won't go again
    %% to the offline store.
    escalus_connection:send(NewAlice, escalus_stanza:sm_ack(3)).

%% Remove wait_for_n_offline_messages and you will get anything, but preserve_order
%% TODO Test without wait_for_n_offline_messages. It would require changes in SM
%% and more strict tests, reproducing delays in SM and in mod_offline.
preserve_order(Config) ->
    %% Checks that messages "1".."6" are delivered to alice in order across
    %% a killed connection and two replacement connections.
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    AliceSpec = sm_helper:client_to_spec(Alice),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"1">>)),

    %% kill alice connection
    escalus_connection:kill(Alice),
    C2SPid = mongoose_helper:get_session_pid(Alice),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"2">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"3">>)),

    %% reconnect with the same spec (no initial presence yet) and enable
    %% SM with resumption on the new connection
    NewAlice = connect_spec(AliceSpec, session, manual),
    escalus_connection:send(NewAlice, escalus_stanza:enable_sm([resume])),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"4">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"5">>)),

    %% Without this check we will get stuff out of order
    mongoose_helper:wait_for_n_offline_messages(NewAlice, 5),

    send_initial_presence(NewAlice),
    %% Without this check we can get "6, 1, 2, 3, 4, 5" messages in the next receive_all_ordered
    mongoose_helper:wait_for_n_offline_messages(NewAlice, 0),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"6">>)),

    %% "2, 3, 4, 5, 6, 1" is possible (where only 1 is from offline storage, the rest is from sm)
    receive_all_ordered(NewAlice, 6),

    % replace connection
    NewAlice2 = connect_spec(AliceSpec, session, manual),
    % allow messages to go to the offline storage
    mongoose_helper:wait_for_n_offline_messages(NewAlice, 6),

    send_initial_presence(NewAlice2),

    % receives messages in correct order
    receive_all_ordered(NewAlice2, 6),

    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice2).
%% Receives messages 1..Last on Conn and asserts that their bodies are
%% the integers 1..Last (as binaries), in that order.
receive_all_ordered(Conn, Last) ->
    receive_all_ordered(Conn, 1, Last, []).

%% Receive messages from N to Last.
%% Ignores acks and presences.
%% Handles case when out of order and when not enough stanzas.
%%
%% Acc holds the messages received so far, newest first; it is reversed
%% once before being asserted or reported, instead of appending with ++
%% on every step. Callers are expected to start with Acc = [].
receive_all_ordered(_Conn, N, Last, Acc) when N > Last ->
    Texts = lists:map(fun integer_to_binary/1, lists:seq(1, Last)),
    sm_helper:assert_messages(lists:reverse(Acc), Texts);
receive_all_ordered(Conn, N, Last, Acc) ->
    case escalus:wait_for_stanzas(Conn, 1) of
        [#xmlel{name = <<"message">>} = Message] ->
            receive_all_ordered(Conn, N + 1, Last, [Message | Acc]);
        [_] -> %% Ack or presence
            receive_all_ordered(Conn, N, Last, Acc);
        [] ->
            %% Nothing arrived: report which message was awaited and
            %% what was received so far, in arrival order.
            ct:fail({timeout_waiting, N, lists:reverse(Acc)})
    end.

%% Bob sends a message to alice, alice's connection is killed, and after
%% reconnecting with the same spec she must receive her presence and the
%% unacked message.
resend_unacked_after_resume_timeout(Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence),
    AliceSpec = sm_helper:client_to_spec(Alice),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    %% kill alice connection
    escalus_connection:kill(Alice),

    %% wait until the old c2s process has entered the resume_session state
    C2SPid = mongoose_helper:get_session_pid(Alice),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),

    %% alice comes back and receives the unacked message
    NewAlice = connect_spec(AliceSpec, session),
    send_initial_presence(NewAlice),

    escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)],
                                 escalus:wait_for_stanzas(NewAlice, 2)),

    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).
%% A failed resumption must still report the previous h (= 1); afterwards
%% alice can bind, open a session and receive her presence and the
%% unacked message.
resume_expired_session_returns_correct_h(Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, sr_presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    get_ack(Alice),

    %% Bob sends a message to Alice, and Alice receives it but doesn't acknowledge
    Text = <<"msg-1">>,
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, Text)),
    Received = escalus:wait_for_stanza(Alice),
    escalus:assert(is_chat_message, [Text], Received),
    %% alice comes back, but too late, so resumption doesn't work,
    %% but she receives the previous h = 1 anyway
    %% NewAlice is also manual ack
    NewAlice = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Alice),
    FailedResumption = escalus_connection:get_stanza(NewAlice, failed_resumption),
    <<"1">> = exml_query:attr(FailedResumption, <<"h">>),
    %% And we can continue with bind and session
    Bound = escalus_session:bind(NewAlice),
    escalus_session:session(Bound),
    send_initial_presence(NewAlice),
    First = escalus_connection:get_stanza(NewAlice, {msg, 1}),
    Second = escalus_connection:get_stanza(NewAlice, {msg, 2}),
    escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], [First, Second]),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).

%% The three smids registered in init_per_testcase/2 must still resolve
%% to their stale h values 1, 2 and 3.
gc_repeat_after_never_means_no_cleaning(Config) ->
    [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config),
    AssertStaleH = fun({SMID, H}) ->
                           {stale_h, H} = rpc(mim(), ?MOD_SM, get_session_from_smid,
                                              [host_type(), SMID])
                   end,
    lists:foreach(AssertStaleH, [{SMID1, 1}, {SMID2, 2}, {SMID3, 3}]).

%% Waits until the first registered smid is no longer found.
gc_repeat_after_timeout_does_clean(Config) ->
    [{SMID1, _} | _ ] = ?config(smid_test, Config),
    GetStaleH = fun() ->
                        rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1])
                end,
    mongoose_helper:wait_until(GetStaleH,
                               {error, smid_not_found},
                               #{name => smid_garbage_collected}).
%% Runs the generic scenario below with the `no_ack` flag.
resume_session_state_send_message_without_ack(Config) ->
    resume_session_state_send_message_generic(Config, no_ack).

%% Runs the generic scenario below with the `ack` flag.
resume_session_state_send_message_with_ack(Config) ->
    resume_session_state_send_message_generic(Config, ack).

%% Bob sends one message before and two after alice's connection is
%% killed; a new connection for the same user must then receive all
%% three. AckInitialPresence (ack | no_ack) is passed on to
%% maybe_ack_initial_presence/2.
resume_session_state_send_message_generic(Config, AckInitialPresence) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    maybe_ack_initial_presence(Alice, AckInitialPresence),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    %% kill alice connection
    %% (the session pid is fetched before the kill)
    C2SPid = mongoose_helper:get_session_pid(Alice),
    escalus_connection:kill(Alice),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
    sm_helper:assert_alive_resources(Alice, 1),

    %% send some messages and check if c2s can handle it
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)),
    %% suspend the process to ensure that Alice has enough time to reconnect,
    %% before resumption timeout occurs.
    ok = rpc(mim(), sys, suspend, [C2SPid]),

    %% alice comes back and receives unacked message
    NewAlice = connect_same(Alice, presence),
    %% now we can resume c2s process of the old connection
    %% and let it process session resumption timeout
    ok = rpc(mim(), sys, resume, [C2SPid]),
    Stanzas = escalus:wait_for_stanzas(NewAlice, 3),

    % what about order ?
    % the three stanzas awaited here must be the 3 unacked messages from bob
    % (mix_match below lists no presence)
    escalus_new_assert:mix_match([is_chat(<<"msg-1">>),
                                  is_chat(<<"msg-2">>),
                                  is_chat(<<"msg-3">>)],
                                 Stanzas),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).
%% The c2s process is stopped from outside while it waits for resumption;
%% for instance it can be done by mod ping.
%% The unacked message must still reach alice's new connection.
resume_session_state_stop_c2s(Config) ->
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),

    get_ack(Alice),
    ack_initial_presence(Alice),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)),

    %% get pid of c2s
    C2SPid = mongoose_helper:get_session_pid(Alice),
    %% Wait c2s process to process our presence ack.
    %% Otherwise, we can receive two initial presences sometimes.
    sm_helper:wait_for_c2s_unacked_count(C2SPid, 1),

    % kill alice connection
    escalus_connection:kill(Alice),
    % session should be alive
    sm_helper:assert_alive_resources(Alice, 1),
    % ask the c2s to stop, then wait for it to report the resume_session state
    rpc(mim(), ejabberd_c2s, stop, [C2SPid]),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
    %% suspend the process to ensure that Alice has enough time to reconnect,
    %% before resumption timeout occurs.
    ok = rpc(mim(), sys, suspend, [C2SPid]),

    %% alice comes back and receives unacked message
    NewAlice = connect_same(Alice, presence, manual),
    %% now we can resume c2s process of the old connection
    %% and let it process session resumption timeout
    ok = rpc(mim(), sys, resume, [C2SPid]),

    escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(NewAlice, msg)),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).

%% This test only verifies the validity of helpers (get_session_pid,
%% get_c2s_state_name) written for wait_for_resumption
%% testcase.
session_established(Config) ->
    Conn = connect_fresh(Config, alice, presence),
    SessionPid = mongoose_helper:get_session_pid(Conn),
    session_established = mongoose_helper:get_c2s_state_name(SessionPid),
    escalus_connection:stop(Conn).

%% Ensure that after a violent disconnection,
%% the c2s waits for resumption (but don't resume yet).
wait_for_resumption(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Bob = connect_fresh(Config, bob, session),
    Texts = three_texts(),
    {SessionPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
    mongoose_helper:wait_for_c2s_state_name(SessionPid, resume_session).

%% Common hook scenario with the "resume" reconnection strategy.
unacknowledged_message_hook_resume(Config) ->
    Reconnect = fun unacknowledged_message_hook_resume/4,
    unacknowledged_message_hook_common(Reconnect, Config).

%% Reconnects by resuming session SMID with h = 1 and sends the initial
%% presence. The resource stays the same.
%% Returns {Resource, NewConnection}.
unacknowledged_message_hook_resume(AliceSpec, Resource, SMID, _C2SPid) ->
    Resumed = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
    send_initial_presence(Resumed),
    {Resource, Resumed}.

%% Common hook scenario with the "bounce" reconnection strategy.
unacknowledged_message_hook_bounce(Config) ->
    Reconnect = fun unacknowledged_message_hook_bounce/4,
    unacknowledged_message_hook_common(Reconnect, Config).

%% Connects a second session under the resource "new_" ++ Resource, waits
%% until two resources are registered and then terminates the old c2s
%% process with reason `normal`.
%% Returns {NewResource, NewConnection}.
unacknowledged_message_hook_bounce(AliceSpec, Resource, _SMID, C2SPid) ->
    BounceResource = <<"new_", Resource/binary>>,
    BounceSpec = lists:keystore(resource, 1, AliceSpec, {resource, BounceResource}),
    SecondConn = connect_spec(BounceSpec, sr_session, manual),
    send_initial_presence(SecondConn),
    %% ensure second C2S is registered so all the messages are bounced properly
    sm_helper:wait_for_resource_count(SecondConn, 2),
    ok = rpc(mim(), sys, terminate, [C2SPid, normal]),
    {BounceResource, SecondConn}.

%% Common hook scenario with the "offline" reconnection strategy.
unacknowledged_message_hook_offline(Config) ->
    Reconnect = fun unacknowledged_message_hook_offline/4,
    unacknowledged_message_hook_common(Reconnect, Config).
%% Restart callback: reconnects Alice with the same spec and waits for the
%% old c2s process to terminate before sending the initial presence.
%% Returns {Resource, NewAlice}; the resource stays the same.
unacknowledged_message_hook_offline(AliceSpec, Resource, _SMID, C2SPid) ->
    C2SRef = erlang:monitor(process, C2SPid),
    %% reset the session, so old C2S process is stopped
    NewAlice = connect_spec(AliceSpec, sr_session, manual),
    %% Wait for the old C2S termination before sending presence. Otherwise
    %% some of the latest unacknowledged messages can be bounced to
    %% the new C2S process instead of going to the mod_offline storage.
    %% It looks like all the unacknowledged messages arrive at the new
    %% C2S, but the message sequence is broken (the bounced messages are
    %% delivered before the messages from the mod_offline storage).
    sm_helper:wait_for_process_termination(C2SRef),
    send_initial_presence(NewAlice),
    {Resource, NewAlice}.

%% Common body of the unacknowledged_message hook tests.
%%
%% RestartConnectionFN is called as
%% RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid) and must return
%% {NewResource, NewAlice}: the way Alice comes back is what differs
%% between the test cases.
%%
%% The hook handler is installed on the server node, so it is removed in an
%% `after' clause: a failed assertion must not leave it registered.
unacknowledged_message_hook_common(RestartConnectionFN, Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),

    AliceSpec0 = escalus_fresh:create_fresh_user(Config, alice),
    %% the username doubles as the resource
    Resource = proplists:get_value(username, AliceSpec0),
    AliceSpec = [{resource, Resource} | AliceSpec0],
    HookHandlerExtra = start_hook_listener(Resource),
    try
        unacknowledged_message_hook_scenario(RestartConnectionFN, Bob, AliceSpec, Resource)
    after
        stop_hook_listener(HookHandlerExtra)
    end,
    escalus_connection:stop(Bob).

%% The scenario itself; runs with the hook listener already started.
%% Hook notifications are read with wait_for_unacked_msg_hook/3, which
%% checks the counter and the resource reported by the handler.
unacknowledged_message_hook_scenario(RestartConnectionFN, Bob, AliceSpec, Resource) ->
    Alice = connect_spec(AliceSpec, sr_presence, manual),
    %% Ack the presence stanza
    get_ack(Alice),
    ack_initial_presence(Alice),

    SMID = sm_helper:client_to_smid(Alice),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)),
    %% kill alice connection
    C2SPid = mongoose_helper:get_session_pid(Alice),
    escalus_connection:kill(Alice),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
    sm_helper:assert_alive_resources(Alice, 1),

    %% the hook is expected exactly once per unacked message, with counter 0
    escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),

    %% send some messages and check if c2s can handle it
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-4">>)),
    escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),

    %% alice comes back and receives unacked message
    {NewResource, NewAlice} = RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid),

    %% keep reading stanzas until the last message (msg-4) shows up
    mongoose_helper:wait_until(
        fun() ->
            Stanza = escalus_connection:get_stanza(NewAlice, msg),
            escalus:assert(is_chat_message, [<<"msg-4">>], Stanza),
            ok
        end, ok),

    NewC2SPid = mongoose_helper:get_session_pid(NewAlice),
    escalus_connection:kill(NewAlice),
    mongoose_helper:wait_for_c2s_state_name(NewC2SPid, resume_session),

    %% the second pass over the same messages is reported with counter 1
    escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)).

%% Alice's connection dies with unacked messages buffered; a new connection
%% resumes the stream and receives them.
resume_session(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Texts = three_texts(),
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
        %% Resume the session.
        Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
        %% Alice receives the unacked messages from the previous
        %% interrupted session.
        sm_helper:wait_for_messages(Alice, Texts),
        %% Alice acks the received messages.
        escalus_connection:send(Alice, escalus_stanza:sm_ack(5)),
        escalus_connection:stop(Alice)
    end).

%% Resumption is attempted with h = 30; the expected outcome is an
%% undefined-condition stream error, no present resources for Alice
%% and no SMID entry left behind.
resume_session_with_wrong_h_does_not_leak_sessions(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Messages = three_texts(),
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages),
        %% Try to resume the session.
        Alice = connect_spec(AliceSpec, auth, manual),
        Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 30),
        escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed),

        [] = sm_helper:get_user_present_resources(Alice),
        HostType = host_type(),
        {error, smid_not_found} = sm_helper:get_sid_by_stream_id(HostType, SMID),
        escalus_connection:wait_for_close(Alice, timer:seconds(5))
    end).

resume_session_with_wrong_sid_returns_item_not_found(Config) ->
    session_resumption_expects_item_not_found(Config, <<"wrong-sid">>).

%% A <resume/> element whose xmlns is replaced by <<"not-stream-mgnt">>
%% must produce no stanzas and must not close the connection.
resume_session_with_wrong_namespace_is_a_noop(Config) ->
    Alice = connect_fresh(Config, alice, auth),
    #xmlel{attrs = Attrs} = Resume = escalus_stanza:resume(<<"doesnt_matter">>, 4),
    Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, <<"not-stream-mgnt">>}),
    escalus_connection:send(Alice, Resume#xmlel{attrs = Attrs2}),
    escalus_assert:has_no_stanzas(Alice),
    [] = sm_helper:get_user_present_resources(Alice),
    true = escalus_connection:is_connected(Alice),
    escalus_connection:stop(Alice).
%% Registers a random SMID for a SID that has no process behind it
%% and expects resumption with that SMID to fail with item-not-found.
resume_dead_session_results_in_item_not_found(Config) ->
    RandomBytes = crypto:strong_rand_bytes(21),
    SMID = base64:encode(RandomBytes),
    DeadSID = {os:timestamp(), undefined},
    RegisterArgs = [host_type(), SMID, DeadSID],
    rpc(mim(), ?MOD_SM, register_smid, RegisterArgs),
    session_resumption_expects_item_not_found(Config, SMID).

%% Shared check: resuming SMID (with h = 2) on an authenticated connection
%% yields an item-not-found failure, creates no resource for the user
%% and leaves the connection open.
session_resumption_expects_item_not_found(Config, SMID) ->
    Client = connect_fresh(Config, alice, auth),
    Reply = sm_helper:try_to_resume_stream(Client, SMID, 2),
    escalus:assert(is_sm_failed, [<<"item-not-found">>], Reply),
    [] = sm_helper:get_user_present_resources(Client),
    true = escalus_connection:is_connected(Client),
    escalus_connection:stop(Client).

resume_session_kills_old_C2S_gracefully(Config) ->
    OldAlice = connect_fresh(Config, alice, sr_presence, manual),
    OldC2S = mongoose_helper:get_session_pid(OldAlice),

    %% Start monitoring the C2S process, then cut the connection.
    SessionMon = sm_helper:monitor_session(OldAlice),
    escalus_client:kill_connection(Config, OldAlice),

    %% The c2s process must be waiting for resumption by now.
    mongoose_helper:wait_for_c2s_state_name(OldC2S, resume_session),

    %% A new connection resumes the session.
    ResumedAlice = connect_resume(OldAlice, 1),

    %% The old C2S process should die gracefully with Reason=normal.
    sm_helper:wait_for_process_termination(SessionMon),
    escalus_connection:stop(ResumedAlice).

%% Connects Alice from AliceSpec, lets her receive Texts from Bob without
%% acking them and kills her connection.
%% Returns {C2SPid, SMID} of the interrupted session.
buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) ->
    Victim = connect_spec(AliceSpec, sr_presence, manual),
    %% Bob sends some messages to Alice...
    sm_helper:send_messages(Bob, Victim, Texts),
    %% ...she receives them, but doesn't ack.
    sm_helper:wait_for_messages(Victim, Texts),
    %% Then her connection is violently terminated.
    escalus_client:kill_connection(Config, Victim),
    SessionPid = mongoose_helper:get_session_pid(Victim),
    StreamId = sm_helper:client_to_smid(Victim),
    {SessionPid, StreamId}.
%% Stream start, auth, stream restart and resume are all sent in one go,
%% without waiting for any server reply in between.
aggressively_pipelined_resume(Config) ->
    FreshSpec = escalus_fresh:create_fresh_user(Config, alice),
    ParserOpts = {parser_opts, [{start_tag, <<"stream:stream">>}]},
    AliceSpec = [{manual_ack, true}, ParserOpts | FreshSpec],
    UnackedMessages = three_texts(),
    Story = fun(Bob) ->
        {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, UnackedMessages),
        %% Open a raw connection; nothing is negotiated yet.
        Conn = escalus_connection:connect(AliceSpec),

        User = proplists:get_value(username, AliceSpec),
        Pass = proplists:get_value(password, AliceSpec),
        Credentials = <<0:8,User/binary,0:8,Pass/binary>>,
        Domain = proplists:get_value(server, AliceSpec),

        StreamStart = escalus_stanza:stream_start(Domain, <<"jabber:client">>),
        AuthCData = #xmlcdata{content = base64:encode(Credentials)},
        Auth = escalus_stanza:auth(<<"PLAIN">>, [AuthCData]),
        StreamRestart = escalus_stanza:stream_start(Domain, <<"jabber:client">>),
        Resume = escalus_stanza:resume(SMID, 2),

        escalus_client:send(Conn, [StreamStart, Auth, StreamRestart, Resume]),
        %% Read six replies; the last one must be <resumed/>.
        Replies = lists:map(fun(I) ->
                                escalus_connection:get_stanza(Conn, {get_resumed, I})
                            end, lists:seq(1, 6)),
        escalus:assert(is_sm_resumed, [SMID], lists:last(Replies)),

        escalus_connection:stop(Conn)
    end,
    escalus:fresh_story(Config, [{bob, 1}], Story).

%% This is a regression test for a case when a session processes a request, which will
%% receive a response from the server, i.e. will have the same origin SID in mongoose_acc.
%% Without proper handling, the reply would be rejected because the resumed session
%% has new SID.
replies_are_processed_by_resumed_session(Config) ->
    %% GIVEN a session and registered special IQ handler (added in init_per_testcase),
    %% that waits for old session process to terminate (at this point new process
    %% has fully taken over) and then actually sends the reply.
    OldAlice = connect_fresh(Config, alice, sr_presence),

    %% WHEN a client sends IQ request to the special handler...
    Request = escalus_stanza:iq_get(regression_ns(), []),
    escalus:send(OldAlice, Request),

    %% ... goes down and session is resumed.
    ResumedAlice = sm_helper:kill_and_connect_resume(OldAlice),

    %% THEN the client receives the reply properly.
    Reply = escalus:wait_for_stanza(ResumedAlice),
    escalus:assert(is_iq_result, [Request], Reply),
    escalus_connection:stop(ResumedAlice).

%% This is a regression test for a bug, which manifested in following scenario
%% (due to improper presence sub requests buffering):
%% 1. A is online, B is offline
%% 2. A subscribes to B's presence;
%% 3. B becomes online
%% 4. A sends a message to B
%% 5. B doesn't SM-ack the request or message, terminates the connection
%% 6. B reconnects but with session *replace*, not resume
%% 7. Packet rerouting crashes on the buffered sub request, preventing resending whole buffer
%% 8. B doesn't receive the buffered message
subscription_requests_are_buffered_properly(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Buffered = <<"buffered">>,
    Story = fun(Bob) ->
        % GIVEN Bob's pending subscription to Alice's presence
        AliceBareJid = common_helper:get_bjid(AliceSpec),
        SubscribeReq = escalus_stanza:presence_direct(AliceBareJid, <<"subscribe">>),
        escalus:send(Bob, SubscribeReq),
        _RosterPush = escalus:wait_for_stanza(Bob),

        % WHEN Alice becomes online...
        FirstAlice = connect_spec(AliceSpec, sr_session, manual),
        send_initial_presence(FirstAlice),
        %% subscribe could come before the initial presence
        FirstPresences = escalus:wait_for_stanzas(FirstAlice, 2),
        escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)],
                            FirstPresences),

        % ...and Bob sends a message to Alice...
        escalus:send(Bob, escalus_stanza:chat_to(FirstAlice, Buffered)),
        FirstDelivery = escalus:wait_for_stanza(FirstAlice),
        escalus:assert(is_chat_message, [Buffered], FirstDelivery),

        % ...and Alice terminates connection without acking anything...
        escalus_client:kill_connection(Config, FirstAlice),

        % ...and reconnects with session replacement.
        SecondAlice = connect_spec(AliceSpec, session, manual),

        % THEN Alice receives (without sending initial presence):
        % * buffered Bob's message (like above)
        % Alice DOESN'T receive:
        % * buffered subscription request because it is dropped by ejabberd_sm
        %   because it's treated like repeated sub request to bare JID, so it's not
        %   processed by any sub req handler (like mod_roster)
        % * buffered available presence from Alice - because it is addressed to another SID
        %   and Alice2 is a brand new session
        Rerouted = escalus:wait_for_stanza(SecondAlice),
        escalus:assert(is_chat_message, [Buffered], Rerouted),
        sm_helper:send_and_receive(Bob, SecondAlice, <<"flush1">>),
        escalus_assert:has_no_stanzas(SecondAlice),

        %% Only once an initial presence is sent, a subscription request is sent
        send_initial_presence(SecondAlice),
        SecondPresences = escalus:wait_for_stanzas(SecondAlice, 2),
        escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)],
                            SecondPresences),

        sm_helper:send_and_receive(Bob, SecondAlice, <<"flush2">>),
        escalus_assert:has_no_stanzas(SecondAlice),

        escalus_connection:stop(SecondAlice)
    end,
    escalus:fresh_story(Config, [{bob, 1}], Story).

%% This is a regression test for a bug, due to which messages sent to old session
%% in a middle of state handover were not appended properly to SM buffer.
%% Scenario to reproduce:
%% 1. Online Bob and Alice
%% 2. Alice kills the connection
%% 3. Alice's session is suspended
%% 4. Alice resumes session with new connection. At this moment new session is still not
%%    present in session table. `resume` request is stuck in old proc mailbox.
%% 5. Bob sends a message to Alice. Only old proc is present in session table so now
%%    old session has two messages in mailbox: `resume` and XML from Bob
%% 6. We resume old process and it begins session handover
%% 7. Bob's message is appended to SM buffer in "flush" step
%% 8. With bug fixed, the message is retransmitted properly
messages_are_properly_flushed_during_resumption(Config) ->
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        % GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended
        Alice = connect_fresh(Config, alice, sr_presence),
        SMH = escalus_connection:get_sm_h(Alice),
        escalus_client:kill_connection(Config, Alice),
        %% The receiver process would stop now
        C2SPid = mongoose_helper:get_session_pid(Alice),
        mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),

        %% start from an empty mailbox so the queue lengths below are exact
        sm_helper:wait_for_queue_length(C2SPid, 0),
        ok = rpc(mim(), sys, suspend, [C2SPid]),

        % WHEN new session requests resumption
        % we wait until that old session has resumption request enqueued;
        % we need it to ensure the order of messages: resume first, Bob's chat second.
        % Actual wait and message sent by Bob is done in separate process
        % because new client start will block until old process is resumed

        MsgBody = <<"flush-regression">>,
        spawn_link(fun() ->
                    sm_helper:wait_for_queue_length(C2SPid, 1),

                    % Bob sends a message...
                    escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),

                    % ...we ensure that a message is enqueued in Alice's session...
                    % (2 messages = resume request + Bob's message)
                    sm_helper:wait_for_queue_length(C2SPid, 2),

                    % ...and old process is resumed.
                    ok = rpc(mim(), sys, resume, [C2SPid])
            end),
        Alice2 = connect_resume(Alice, SMH),
        % THEN Alice's new session receives Bob's message
        RecvMsg = escalus:wait_for_stanza(Alice2),
        escalus:assert(is_chat_message, [MsgBody], RecvMsg),
        % Alice2 was connected manually, so close it explicitly
        escalus_connection:stop(Alice2)
    end).

messages_are_properly_flushed_during_resumption_p1_fsm_old(Config) ->
    %% the same as messages_are_properly_flushed_during_resumption,
    %% but tests that buffered by p1_fsm_old messages are delivered
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        Alice = connect_fresh(Config, alice, sr_presence),
        SMH = escalus_connection:get_sm_h(Alice),
        escalus_client:kill_connection(Config, Alice),
        C2SPid = mongoose_helper:get_session_pid(Alice),
        mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
        ok = rpc(mim(), sys, suspend, [C2SPid]),

        %% send some dummy event. ignored by c2s but ensures that
        %% p1_old_fsm buffers the messages, sent after this one
        rpc(mim(), p1_fsm_old, send_all_state_event, [C2SPid, dummy_event]),

        MsgBody = <<"flush-regression">>,
        spawn_link(fun() ->
                    % (2 messages = dummy event + resume request)
                    sm_helper:wait_for_queue_length(C2SPid, 2),

                    % Bob sends a message...
                    escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),

                    % ...we ensure that a message is enqueued in Alice's session...
                    % (3 messages = dummy event + resume request + Bob's message)
                    sm_helper:wait_for_queue_length(C2SPid, 3),

                    % ...and old process is resumed.
                    ok = rpc(mim(), sys, resume, [C2SPid])
            end),
        Alice2 = connect_resume(Alice, SMH),
        % THEN Alice's new session receives Bob's message
        RecvMsg = escalus:wait_for_stanza(Alice2),
        escalus:assert(is_chat_message, [MsgBody], RecvMsg),
        % Alice2 was connected manually, so close it explicitly
        escalus_connection:stop(Alice2)
    end).
%% A plain <enable/> request must be answered with a
%% feature-not-implemented failure.
%% NOTE(review): the "stream mgmt disabled" precondition comes from the test
%% name; the server configuration is not set up in this function.
no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt(Config) ->
    Client = connect_fresh(Config, alice, session, manual),
    %% Should not crash anything!
    EnableReq = escalus_stanza:enable_sm(),
    escalus_connection:send(Client, EnableReq),
    Reply = escalus_connection:get_stanza(Client, service_unavailable),
    escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Reply),
    escalus_connection:stop(Client).

%% Same as above, but the <enable/> request also asks for resumption.
no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption(Config) ->
    Client = connect_fresh(Config, alice, session, manual),
    %% Should not crash anything!
    EnableReq = escalus_stanza:enable_sm([resume]),
    escalus_connection:send(Client, EnableReq),
    Reply = escalus_connection:get_stanza(Client, service_unavailable),
    escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Reply),
    escalus_connection:stop(Client).

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

%% Installs the unacknowledged_message hook handler on the mim node;
%% notifications are sent to the calling process.
%% Returns the handler's Extra map, needed by stop_hook_listener/1.
start_hook_listener(Resource) ->
    Node = mim(),
    rpc(Node, ?MODULE, rpc_start_hook_handler, [self(), Resource, host_type()]).

%% Removes the handler installed by start_hook_listener/1.
stop_hook_listener(HookExtra) ->
    Node = mim(),
    rpc(Node, ?MODULE, rpc_stop_hook_handler, [HookExtra, host_type()]).

%% Called over RPC: registers hook_handler_fn/3 with priority 50.
%% The Extra map carries the nodeprepped user and the pid to notify.
rpc_start_hook_handler(TestCasePid, User, HostType) ->
    Extra = #{luser => jid:nodeprep(User), pid => TestCasePid},
    Handler = fun ?MODULE:hook_handler_fn/3,
    gen_hook:add_handler(unacknowledged_message, HostType, Handler, Extra, 50),
    Extra.

%% Called over RPC: unregisters the handler added by rpc_start_hook_handler/3.
rpc_stop_hook_handler(HookExtra, HostType) ->
    Handler = fun ?MODULE:hook_handler_fn/3,
    gen_hook:delete_handler(unacknowledged_message, HostType, Handler, HookExtra, 50).
%% unacknowledged_message hook handler.
%% For the watched user it sends {sm_test, Counter, LResource, Element} to the
%% test case process and stores Counter + 1 in the accumulator; the accumulator
%% of any other user is passed through untouched.
hook_handler_fn(Acc,
                #{args := [Jid]} = _Params,
                #{luser := LUser, pid := TestCasePid} = _Extra) ->
    {JidUser, _JidServer, JidRes} = jid:to_lower(Jid),
    if
        JidUser =:= LUser ->
            Seen = mongoose_acc:get(sm_test, counter, 0, Acc),
            Element = mongoose_acc:element(Acc),
            TestCasePid ! {sm_test, Seen, JidRes, Element},
            {ok, mongoose_acc:set_permanent(sm_test, counter, Seen + 1, Acc)};
        true ->
            {ok, Acc}
    end.

%% Waits up to Timeout ms for the next hook notification.
%% Returns its stanza after checking counter and resource,
%% or the atom timeout if nothing arrives.
wait_for_unacked_msg_hook(Counter, Res, Timeout) ->
    receive
        {sm_test, AccCounter, Resource, El} = Notification ->
            ?assertEqual(Counter, AccCounter, Notification),
            ?assertEqual(Res, Resource, Notification),
            El
    after Timeout ->
        timeout
    end.

%% Predicate builder: chat message with the given content.
is_chat(Content) ->
    fun(El) -> escalus_pred:is_chat_message(Content, El) end.

%% Predicate builder: presence of the given type.
is_presence(Type) ->
    fun(El) -> escalus_pred:is_presence_with_type(Type, El) end.

%% <<"msg-1">>, <<"msg-2">>, <<"msg-3">>
three_texts() ->
    [<<"msg-", (integer_to_binary(N))/binary>> || N <- [1, 2, 3]].

%%--------------------------------------------------------------------
%% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session"
%%--------------------------------------------------------------------

regression_ns() ->
    <<"regression">>.

%% Registers regression_handler/5 for regression_ns() on the mim node.
register_handler() ->
    Args = [host_type(), regression_ns(), ejabberd_sm,
            fun ?MODULE:regression_handler/5, #{}, one_queue],
    rpc(mim(), gen_iq_handler, add_iq_handler_for_domain, Args).

%% Removes the handler added by register_handler/0.
unregister_handler() ->
    Args = [host_type(), regression_ns(), ejabberd_sm],
    rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain, Args).
%% IQ handler used by replies_are_processed_by_resumed_session.
%% Delays the result IQ until the process that originated the request is
%% down and the user's session is registered again.
%% Raises c2s_not_stopped_after_timeout if that process is still alive
%% after 10 seconds.
regression_handler(Acc, _From, _To, IQ, _Extra) ->
    %% A bit of a hack - will no longer work when the SID format changes
    {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
    MonRef = erlang:monitor(process, Pid),
    receive
        %% match our own monitor only, not an unrelated 'DOWN' message
        {'DOWN', MonRef, process, Pid, _Reason} ->
            ok
    after
        10000 ->
            error({c2s_not_stopped_after_timeout, Pid})
    end,
    %% We avoid another race condition - there is a short window where user session
    %% is not registered in ejabberd_sm: between old process termination and the moment
    %% when the new process stores new session in memory. It should be fixed separately.
    wait_for_session(mongoose_acc:get(c2s, origin_jid, undefined, Acc), 50, 100),
    {Acc, jlib:make_result_iq_reply(IQ)}.

%% Polls ejabberd_sm until JID has a session, checking at most Retries times
%% with SleepTime ms between checks.
%% Returns ok once a session is found; raises {session_not_registered, JID}
%% when the retries are used up instead of polling forever.
wait_for_session(JID, Retries, SleepTime) when Retries > 0 ->
    case ejabberd_sm:get_session(JID) of
        offline ->
            timer:sleep(SleepTime),
            wait_for_session(JID, Retries - 1, SleepTime);
        _ ->
            ok
    end;
wait_for_session(JID, _Retries, _SleepTime) ->
    error({session_not_registered, JID}).

%% Acks the initial presence when asked to (ack); does nothing for no_ack.
maybe_ack_initial_presence(Alice, ack) ->
    ack_initial_presence(Alice);
maybe_ack_initial_presence(_Alice, no_ack) ->
    ok.