%% Stream Management tests
-module(sm_SUITE).

-compile(export_all).
-include_lib("exml/include/exml.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("escalus/include/escalus.hrl").
-include_lib("common_test/include/ct.hrl").

%% Injected code callbacks
-export([rpc_start_hook_handler/3,
         rpc_stop_hook_handler/2,
         hook_handler_fn/3,
         regression_handler/5]).

-import(distributed_helper, [mim/0,
                             require_rpc_nodes/1,
                             rpc/4]).

-import(domain_helper, [host_type/0]).

-import(sm_helper, [connect_fresh/3,
                    connect_fresh/4,
                    connect_spec/2,
                    connect_spec/3,
                    connect_same/2,
                    connect_same/3,
                    connect_resume/2,
                    process_initial_stanza/1,
                    send_initial_presence/1,
                    get_ack/1,
                    ack_initial_presence/1]).

-define(MOD_SM, mod_stream_management).
-define(CONSTRAINT_CHECK_TIMEOUT, 5000).
-define(LONG_TIMEOUT, 3600).
-define(SHORT_TIMEOUT, 1).
-define(SMALL_SM_BUFFER, 3).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Suite-level Common Test info: the `mim` RPC node requirement combined
%% with whatever escalus:suite/0 returns.
suite() ->
    require_rpc_nodes([mim]) ++ escalus:suite().

%% The list of things to run is derived from groups/0, so a group that is
%% missing from groups/0 is never executed.
all() ->
    ct_helper:groups_to_all(groups()).

%% Group definitions: {GroupName, Properties, Cases}.
%%
%% `stream_mgmt_disabled` and `manual_ack_freq_long_session_timeout` are
%% listed here because this suite defines their case lists
%% (stream_mgmt_disabled_cases/0, manual_ack_freq_long_session_timeout_cases/0)
%% and handles both names in init_per_group/2 and required_sm_opts/2;
%% leaving them out meant those cases were silently never run.
groups() ->
    P = [parallel],
    [{parallel, P, parallel_cases()},
     {parallel_manual_ack_freq_1, P, parallel_manual_ack_freq_1_cases()},
     {manual_ack_freq_2, [], manual_ack_freq_2_cases()},
     {stale_h, [], stale_h_cases()},
     {stream_mgmt_disabled, [], stream_mgmt_disabled_cases()},
     {manual_ack_freq_long_session_timeout, P, manual_ack_freq_long_session_timeout_cases()},
     {parallel_unacknowledged_message_hook, P, parallel_unacknowledged_message_hook_cases()}].
%% Cases of the `parallel` group (see groups/0).
%% required_sm_opts(group, parallel) configures this group with
%% `ack_freq => never`.
parallel_cases() ->
    [server_announces_sm,
     server_enables_sm_before_session,
     server_enables_sm_after_session,
     server_returns_failed_after_start,
     server_returns_failed_after_auth,
     server_enables_resumption,
     client_enables_sm_twice_fails_with_correct_error_stanza,
     session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza,
     session_resumed_and_old_session_dead_doesnt_route_error_to_new_session,
     basic_ack,
     h_ok_before_session,
     h_ok_after_session_enabled_before_session,
     h_ok_after_session_enabled_after_session,
     h_ok_after_a_chat,
     h_non_given_closes_stream_gracefully,
     resend_unacked_on_reconnection,
     session_established,
     wait_for_resumption,
     resume_session,
     resume_session_with_wrong_h_does_not_leak_sessions,
     resume_session_with_wrong_sid_returns_item_not_found,
     resume_session_with_wrong_namespace_is_a_noop,
     resume_dead_session_results_in_item_not_found,
     resume_session_kills_old_C2S_gracefully,
     aggressively_pipelined_resume,
     replies_are_processed_by_resumed_session,
     subscription_requests_are_buffered_properly,
     messages_are_properly_flushed_during_resumption,
     messages_are_properly_flushed_during_resumption_p1_fsm_old].

%% Cases of the `parallel_manual_ack_freq_1` group.
%% required_sm_opts/2 configures it with `ack_freq => 1` and
%% `resume_timeout => ?SHORT_TIMEOUT`.
parallel_manual_ack_freq_1_cases() ->
    [client_acks_more_than_sent,
     too_many_unacked_stanzas,
     resend_unacked_after_resume_timeout,
     resume_session_state_send_message_with_ack,
     resume_session_state_send_message_without_ack,
     resume_session_state_stop_c2s,
     server_requests_ack_after_session,
     resend_more_offline_messages_than_buffer_size,
     server_requests_ack].

%% Cases of the `manual_ack_freq_2` group (`ack_freq => 2`).
manual_ack_freq_2_cases() ->
    [server_requests_ack_freq_2].

%% Cases of the `stale_h` group. Module options for these are chosen per
%% test case in init_per_testcase/2, not per group.
stale_h_cases() ->
    [resume_expired_session_returns_correct_h,
     gc_repeat_after_never_means_no_cleaning,
     gc_repeat_after_timeout_does_clean].
%% Cases meant to run while mod_stream_management is stopped
%% (see init_per_group(stream_mgmt_disabled, _)).
stream_mgmt_disabled_cases() ->
    [no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt,
     no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption].

%% Cases run with `ack_freq => 1, buffer_max => 1000`
%% (see required_sm_opts/2).
manual_ack_freq_long_session_timeout_cases() ->
    [preserve_order].

%% Cases exercising the unacknowledged-message hook (`ack_freq => 1`).
parallel_unacknowledged_message_hook_cases() ->
    [unacknowledged_message_hook_bounce,
     unacknowledged_message_hook_offline,
     unacknowledged_message_hook_resume].

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Saves the current module set, marks alice's user spec with
%% `stream_management => true`, injects this module on the server node and
%% hands over to escalus.
init_per_suite(Config) ->
    SavedConfig = dynamic_modules:save_modules(host_type(), Config),
    ConfigWithSM = escalus_users:update_userspec(SavedConfig, alice,
                                                 stream_management, true),
    mongoose_helper:inject_module(?MODULE),
    escalus:init_per_suite(ConfigWithSM).

%% Cleans fresh users, restores the saved modules and ends the escalus suite.
end_per_suite(Config) ->
    escalus_fresh:clean(),
    dynamic_modules:restore_modules(Config),
    escalus:end_per_suite(Config).

%% Per-group module setup:
%%  - stale_h: nothing here, modules are configured per test case;
%%  - stream_mgmt_disabled: stop the SM module and drop the sm_session table;
%%  - every other group: ensure the modules from required_modules(group, Group).
init_per_group(stale_h, Config) ->
    Config;
init_per_group(stream_mgmt_disabled, Config) ->
    dynamic_modules:stop(host_type(), ?MOD_SM),
    rpc(mim(), mnesia, delete_table, [sm_session]),
    Config;
init_per_group(Group, Config) ->
    HostType = host_type(),
    Modules = required_modules(group, Group),
    dynamic_modules:ensure_modules(HostType, Modules),
    Config.

%% No per-group teardown is needed.
end_per_group(_Group, _Config) ->
    ok.
%% Per-testcase setup.
%% The three stale_h cases configure their own modules; the two gc_* cases
%% additionally pre-register a few stale SMIDs. The "replies are processed"
%% case installs a handler first. Everything else goes straight to escalus.
init_per_testcase(resume_expired_session_returns_correct_h = CaseName, Config) ->
    dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CaseName)),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config)
  when CaseName =:= gc_repeat_after_never_means_no_cleaning;
       CaseName =:= gc_repeat_after_timeout_does_clean ->
    dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CaseName)),
    ConfigWithSmids = register_some_smid_h(Config),
    escalus:init_per_testcase(CaseName, ConfigWithSmids);
init_per_testcase(replies_are_processed_by_resumed_session = CaseName, Config) ->
    register_handler(),
    escalus:init_per_testcase(CaseName, Config);
init_per_testcase(CaseName, Config) ->
    escalus:init_per_testcase(CaseName, Config).

%% Per-testcase teardown.
%% The stale_h cases stop the child named by
%% gen_mod:get_module_proc(HostType, stream_management_stale_h);
%% the "replies are processed" case removes its handler.
end_per_testcase(CaseName, Config)
  when CaseName =:= resume_expired_session_returns_correct_h;
       CaseName =:= gc_repeat_after_never_means_no_cleaning;
       CaseName =:= gc_repeat_after_timeout_does_clean ->
    ProcName = rpc(mim(), gen_mod, get_module_proc,
                   [host_type(), stream_management_stale_h]),
    rpc(mim(), ejabberd_sup, stop_child, [ProcName]),
    escalus:end_per_testcase(CaseName, Config);
end_per_testcase(replies_are_processed_by_resumed_session = CaseName, Config) ->
    unregister_handler(),
    escalus:end_per_testcase(CaseName, Config);
end_per_testcase(CaseName, Config) ->
    escalus:end_per_testcase(CaseName, Config).

%% Module configuration per group (in case of stale_h group it is per testcase)

%% Builds the module list for the given scope (`group` | `testcase`) and name:
%% mod_stream_management with common_sm_opts/0 merged under the scope-specific
%% options (or `stopped` passed through as-is), plus mod_offline with defaults.
required_modules(Scope, Name) ->
    SMOpts = case required_sm_opts(Scope, Name) of
                 stopped -> stopped;
                 Overrides -> maps:merge(common_sm_opts(), Overrides)
             end,
    SMModule = {mod_stream_management,
                config_parser_helper:mod_config(mod_stream_management, SMOpts)},
    OfflineModule = {mod_offline,
                     config_parser_helper:mod_config(mod_offline, #{})},
    [SMModule, OfflineModule].
%% Scope-specific mod_stream_management options.
%% Returns a map of overrides (merged over common_sm_opts/0 by
%% required_modules/2) or the atom `stopped`.
required_sm_opts(group, parallel) ->
    #{ack_freq => never};
required_sm_opts(group, parallel_manual_ack_freq_1) ->
    #{ack_freq => 1, resume_timeout => ?SHORT_TIMEOUT};
required_sm_opts(group, manual_ack_freq_2) ->
    #{ack_freq => 2};
required_sm_opts(group, stream_mgmt_disabled) ->
    stopped;
required_sm_opts(group, parallel_unacknowledged_message_hook) ->
    #{ack_freq => 1};
required_sm_opts(group, manual_ack_freq_long_session_timeout) ->
    #{ack_freq => 1, buffer_max => 1000};
required_sm_opts(testcase, resume_expired_session_returns_correct_h) ->
    StaleH = stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT),
    #{ack_freq => 1, resume_timeout => ?SHORT_TIMEOUT, stale_h => StaleH};
required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
    StaleH = stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT),
    #{stale_h => StaleH};
required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
    StaleH = stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT),
    #{stale_h => StaleH}.

%% Options shared by every configuration that starts the SM module.
common_sm_opts() ->
    #{buffer_max => ?SMALL_SM_BUFFER}.

%% Builds an enabled `stale_h` option map from the two given values.
stale_h(RepeatAfter, Geriatric) ->
    #{enabled => true, repeat_after => RepeatAfter, geriatric => Geriatric}.

%% Random SMID: 21 strong random bytes, base64-encoded.
make_smid() ->
    RandomBytes = crypto:strong_rand_bytes(21),
    base64:encode(RandomBytes).

%% Registers a freshly generated SMID with the given integer as its stale `h`
%% on the server node and returns {SMID, IntSmidId}.
register_smid(IntSmidId) ->
    SMID = make_smid(),
    ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]),
    {SMID, IntSmidId}.

%% Registers three SMIDs (with h = 1, 2, 3) and stores them in Config
%% under the `smid_test` key.
register_some_smid_h(Config) ->
    Registered = [register_smid(H) || H <- lists:seq(1, 3)],
    [{smid_test, Registered} | Config].
%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------

%% Opens a stream for a fresh alice (only the start_stream step) and checks
%% that the returned props/features allow stream management.
server_announces_sm(Config) ->
    FreshSpec = escalus_fresh:freshen_spec(Config, alice),
    Steps = [start_stream],
    {ok, #client{props = Props}, Features} = escalus_connection:start(FreshSpec, Steps),
    true = escalus_session:can_use_stream_management(Props, Features).


%% Connecting up to the sm_after_bind stage must succeed.
server_enables_sm_before_session(Config) ->
    connect_fresh(Config, alice, sm_after_bind).

%% Connecting up to the sm_after_session stage must succeed.
server_enables_sm_after_session(Config) ->
    connect_fresh(Config, alice, sm_after_session).

%% <enable/> sent at the before_auth stage is rejected.
server_returns_failed_after_start(Config) ->
    server_returns_failed(connect_fresh(Config, alice, before_auth)).

%% <enable/> sent at the auth stage is rejected.
server_returns_failed_after_auth(Config) ->
    server_returns_failed(connect_fresh(Config, alice, auth)).

%% Connecting up to sr_presence must succeed; the connection is then stopped.
server_enables_resumption(Config) ->
    Client = connect_fresh(Config, alice, sr_presence),
    escalus_connection:stop(Client).

%% Sends <enable/> and expects an SM <failed/> with `unexpected-request`.
server_returns_failed(Alice) ->
    escalus_connection:send(Alice, escalus_stanza:enable_sm()),
    Failed = escalus_connection:get_stanza(Alice, enable_sm_failed),
    escalus:assert(is_sm_failed, [<<"unexpected-request">>], Failed).

%% Enabling SM a second time yields <failed/> with `unexpected-request`,
%% followed by a stream end and a closed connection.
client_enables_sm_twice_fails_with_correct_error_stanza(Config) ->
    Alice = connect_fresh(Config, alice, sm_before_session),
    server_returns_failed(Alice),
    StreamEnd = escalus_connection:get_stanza(Alice, enable_sm_failed),
    escalus:assert(is_stream_end, StreamEnd),
    true = escalus_connection:wait_for_close(Alice, timer:seconds(5)).
%% Resuming from a second connection closes the first one with a `conflict`
%% stream error followed by a stream end.
session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza(Config) ->
    %% GIVEN USER WITH STREAM RESUMPTION ENABLED
    OldAlice = connect_fresh(Config, alice, sr_presence),
    SMH = escalus_connection:get_sm_h(OldAlice),
    %% WHEN USER RESUMES SESSION FROM NEW CLIENT
    NewAlice = connect_resume(OldAlice, SMH),
    process_initial_stanza(NewAlice),
    %% THEN: Old session is gracefully closed with the correct error stanza
    StreamError = escalus_connection:get_stanza(OldAlice, close_old_stream),
    escalus:assert(is_stream_error, [<<"conflict">>, <<>>], StreamError),
    StreamEnd = escalus_connection:get_stanza(OldAlice, close_old_stream),
    escalus:assert(is_stream_end, StreamEnd),
    true = escalus_connection:wait_for_close(OldAlice, timer:seconds(5)),
    true = escalus_connection:is_connected(NewAlice),
    escalus_connection:stop(NewAlice).

%% When the first connection is killed before resumption, the resumed
%% connection must not have any extra stanzas waiting.
session_resumed_and_old_session_dead_doesnt_route_error_to_new_session(Config) ->
    %% GIVEN USER WITH STREAM RESUMPTION ENABLED
    OldAlice = connect_fresh(Config, alice, sr_presence),
    %% WHEN FIRST SESSION DIES AND USER RESUMES FROM NEW CLIENT
    NewAlice = sm_helper:kill_and_connect_resume(OldAlice),
    process_initial_stanza(NewAlice),
    %% THEN new session does not have any message rerouted
    false = escalus_client:has_stanzas(NewAlice),
    true = escalus_connection:is_connected(NewAlice),
    escalus_connection:stop(NewAlice).

%% A roster get is answered, and a following <r/> is answered with an ack.
basic_ack(Config) ->
    Alice = connect_fresh(Config, alice, sm_after_session),
    escalus_connection:send(Alice, escalus_stanza:roster_get()),
    RosterResult = escalus_connection:get_stanza(Alice, roster_result),
    escalus:assert(is_roster_result, RosterResult),
    escalus_connection:send(Alice, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Alice, stream_mgmt_ack),
    escalus:assert(is_sm_ack, Ack).
%% Test that "h" value is valid when:
%% - SM is enabled *before* the session is established
%% - <r/> is sent *before* the session is established
%% Expected h: 0.
h_ok_before_session(Config) ->
    Alice = connect_fresh(Config, alice, sm_after_bind),
    escalus_connection:send(Alice, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Alice, stream_mgmt_ack),
    escalus:assert(is_sm_ack, [0], Ack).

%% Test that "h" value is valid when:
%% - SM is enabled *before* the session is established
%% - <r/> is sent *after* the session is established
%% Expected h: 1.
h_ok_after_session_enabled_before_session(Config) ->
    Alice = connect_fresh(Config, alice, sm_before_session),
    escalus_connection:send(Alice, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Alice, stream_mgmt_ack),
    escalus:assert(is_sm_ack, [1], Ack).

%% Test that "h" value is valid when:
%% - SM is enabled *after* the session is established
%% - <r/> is sent *after* the session is established
%% One roster get is sent in between, so the expected h is 1.
h_ok_after_session_enabled_after_session(Config) ->
    Alice = connect_fresh(Config, alice, sm_after_session),
    escalus_connection:send(Alice, escalus_stanza:roster_get()),
    RosterResult = escalus_connection:get_stanza(Alice, roster_result),
    escalus:assert(is_roster_result, RosterResult),
    escalus_connection:send(Alice, escalus_stanza:sm_request()),
    Ack = escalus_connection:get_stanza(Alice, stream_mgmt_ack),
    escalus:assert(is_sm_ack, [1], Ack).

%% Test that "h" value is valid after exchanging a few messages.
h_ok_after_a_chat(ConfigIn) ->
    Config = escalus_users:update_userspec(ConfigIn, alice, stream_management, true),
    Story =
        fun(Alice, Bob) ->
                %% Alice -> Bob
                escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi, Bob!">>)),
                BobMsg1 = escalus:wait_for_stanza(Bob),
                escalus:assert(is_chat_message, [<<"Hi, Bob!">>], BobMsg1),
                %% Bob -> Alice, twice
                escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)),
                AliceMsg1 = escalus:wait_for_stanza(Alice),
                escalus:assert(is_chat_message, [<<"Hi, Alice!">>], AliceMsg1),
                escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How's life?">>)),
                AliceMsg2 = escalus:wait_for_stanza(Alice),
                escalus:assert(is_chat_message, [<<"How's life?">>], AliceMsg2),
                %% Alice -> Bob
                escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Pretty !@#$%^$">>)),
                BobMsg2 = escalus:wait_for_stanza(Bob),
                escalus:assert(is_chat_message, [<<"Pretty !@#$%^$">>], BobMsg2),
                %% Alice asks for an ack and expects h = 3
                escalus:send(Alice, escalus_stanza:sm_request()),
                Ack = escalus:wait_for_stanza(Alice),
                escalus:assert(is_sm_ack, [3], Ack),
                %% Ack, so that unacked messages don't go into offline store.
                escalus:send(Alice, escalus_stanza:sm_ack(3))
        end,
    escalus:fresh_story(Config, [{alice,1}, {bob,1}], Story).

%% An <a/> element without the "h" attribute must end the stream with a
%% `policy-violation` stream error, a stream end and a closed connection;
%% the c2s process must die.
h_non_given_closes_stream_gracefully(ConfigIn) ->
    AckWithoutH = #xmlel{name = <<"a">>,
                         attrs = [{<<"xmlns">>, <<"urn:xmpp:sm:3">>}]},
    Config = escalus_users:update_userspec(ConfigIn, alice, stream_management, true),
    Story =
        fun(Alice) ->
                C2SPid = mongoose_helper:get_session_pid(Alice),
                escalus:send(Alice, AckWithoutH),
                StreamError = escalus:wait_for_stanza(Alice),
                escalus:assert(is_stream_error, [<<"policy-violation">>, <<>>], StreamError),
                mongoose_helper:wait_for_pid_to_die(C2SPid),
                StreamEnd = escalus_connection:get_stanza(Alice, stream_end),
                escalus:assert(is_stream_end, StreamEnd),
                true = escalus_connection:wait_for_close(Alice, timer:seconds(5))
        end,
    escalus:fresh_story(Config, [{alice,1}], Story).
%% Acking 5 stanzas right after connecting must produce an
%% `undefined-condition` stream error carrying a <handled-count-too-high/>
%% child with h="5" and send-count="0", followed by a graceful stream end.
client_acks_more_than_sent(Config) ->
    Alice = connect_fresh(Config, alice, sm_after_session),
    escalus:send(Alice, escalus_stanza:sm_ack(5)),
    StreamError = escalus:wait_for_stanza(Alice),
    %% Assert "undefined-condition" children
    escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], StreamError),
    %% Assert "handled-count-too-high" children with correct attributes
    TooHighPath = [{element_with_ns, <<"handled-count-too-high">>, <<"urn:xmpp:sm:3">>}],
    TooHigh = exml_query:path(StreamError, TooHighPath),
    <<"5">> = exml_query:attr(TooHigh, <<"h">>),
    <<"0">> = exml_query:attr(TooHigh, <<"send-count">>),
    %% Assert graceful stream end
    StreamEnd = escalus_connection:get_stanza(Alice, stream_end),
    escalus:assert(is_stream_end, StreamEnd),
    true = escalus_connection:wait_for_close(Alice, timer:seconds(5)).

%% Bob sends ?SMALL_SM_BUFFER messages which Alice never acks; after the
%% messages and ack requests arrive, a `resource-constraint` stream error
%% is expected.
too_many_unacked_stanzas(Config) ->
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sm_presence, manual),
    get_ack(Alice),
    SendNth = fun(N) ->
                      Text = <<(integer_to_binary(N))/binary, ": Hi, Alice!">>,
                      escalus:send(Bob, escalus_stanza:chat_to(Alice, Text))
              end,
    lists:foreach(SendNth, lists:seq(1, ?SMALL_SM_BUFFER)),
    escalus:wait_for_stanzas(Alice, ?SMALL_SM_BUFFER * 2), % messages and ack requests
    %% wait for deferred buffer check
    StreamError = escalus:wait_for_stanza(Alice, ?CONSTRAINT_CHECK_TIMEOUT + 1000),
    escalus:assert(is_stream_error,
                   [<<"resource-constraint">>, <<"too many unacked stanzas">>],
                   StreamError).

%% Ack-request check for a group configured with ack_freq 1.
server_requests_ack(Config) ->
    server_requests_ack(Config, 1).

%% Ack-request check for a group configured with ack_freq 2.
server_requests_ack_freq_2(Config) ->
    server_requests_ack(Config, 2).
%% Checks that the server asks for an ack after every N-th stanza delivered
%% to Alice: first after the initial presence (stanza 1), then after one
%% chat message from Bob (stanza 2).
server_requests_ack(Config, N) ->
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sm_presence, manual),
    %% ack request after initial presence
    maybe_assert_ack_request(1, N, Alice),
    escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)),
    ChatMsg = escalus:wait_for_stanza(Alice),
    escalus:assert(is_chat_message, [<<"Hi, Alice!">>], ChatMsg),
    maybe_assert_ack_request(2, N, Alice).

%% Expects an ack request on Alice's stream only when StanzasRec is a
%% multiple of AckRequests. Always returns StanzasRec.
maybe_assert_ack_request(StanzasRec, AckRequests, Alice) ->
    ct:log("StanzasRec: ~p, AckRequests: ~p", [StanzasRec, AckRequests]),
    case StanzasRec rem AckRequests =:= 0 of
        true ->
            escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(Alice));
        false ->
            ok
    end,
    StanzasRec.

%% With manual acking, an ack request must arrive once the session is set up.
server_requests_ack_after_session(Config) ->
    Alice = connect_fresh(Config, alice, sm_before_session, manual),
    AckRequest = escalus_connection:get_stanza(Alice, stream_mgmt_req),
    escalus:assert(is_sm_ack_request, AckRequest).

%% Bob sends more messages than the SM buffer holds while Alice is offline;
%% once Alice connects and acks, no stream error may follow.
resend_more_offline_messages_than_buffer_size(Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),

    % sent some messages - more than unacked buffer size
    MessagesToSend = ?SMALL_SM_BUFFER + 1,
    JID = common_helper:get_bjid(AliceSpec),
    SendNth = fun(I) ->
                      Msg = escalus_stanza:chat_to(JID, integer_to_binary(I)),
                      escalus_connection:send(Bob, Msg)
              end,
    lists:foreach(SendNth, lists:seq(1, MessagesToSend)),

    % connect alice who wants to receive all messages from offline storage
    Alice = connect_spec(AliceSpec, sm_after_session, manual),
    mongoose_helper:wait_for_n_offline_messages(Alice, MessagesToSend),
    send_initial_presence(Alice),

    escalus:wait_for_stanzas(Alice, MessagesToSend * 2), %messages and ack requests

    escalus_connection:get_stanza(Alice, presence),
    get_ack(Alice), % ack request

    % confirm messages + presence
    escalus_connection:send(Alice, escalus_stanza:sm_ack(4)),
    % wait for check constraint message on server side

    ct:sleep(?CONSTRAINT_CHECK_TIMEOUT + 1000),
    false = escalus_client:has_stanzas(Alice),
    % should not receive anything especially any stream errors

    escalus_connection:stop(Alice),
    escalus_connection:stop(Bob).

%% Messages Alice received but never acked must be delivered again after
%% she reconnects.
resend_unacked_on_reconnection(Config) ->
    Texts = three_texts(),
    Bob = connect_fresh(Config, bob, presence),
    OldAlice = connect_fresh(Config, alice, sm_presence),
    AliceSpec = sm_helper:client_to_spec0(OldAlice),
    %% Bob sends some messages to Alice.
    sm_helper:send_messages(Bob, OldAlice, Texts),
    %% Alice receives the messages.
    sm_helper:wait_for_messages(OldAlice, Texts),
    %% Alice disconnects without acking the messages.
    sm_helper:stop_client_and_wait_for_termination(OldAlice),
    %% Messages go to the offline store.
    %% Alice receives the messages from the offline store.
    NewAlice = connect_spec(AliceSpec, session, manual),
    send_initial_presence(NewAlice),
    sm_helper:wait_for_messages(NewAlice, Texts),
    %% Alice acks the delayed messages so they won't go again
    %% to the offline store.
    escalus_connection:send(NewAlice, escalus_stanza:sm_ack(3)).

%% Remove wait_for_n_offline_messages and you will get anything, but preserve_order
%% TODO Test without wait_for_n_offline_messages. It would require changes in SM
%% and more strict tests, reproducing delays in SM and in mod_offline.
preserve_order(Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    AliceSpec = sm_helper:client_to_spec(Alice),
    %% Every numbered message goes from Bob to Alice's short JID.
    SendToAlice = fun(Text) ->
                          Msg = escalus_stanza:chat_to_short_jid(Alice, Text),
                          escalus_connection:send(Bob, Msg)
                  end,
    SendToAlice(<<"1">>),

    %% kill alice connection
    escalus_connection:kill(Alice),
    C2SPid = mongoose_helper:get_session_pid(Alice),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),

    SendToAlice(<<"2">>),
    SendToAlice(<<"3">>),

    NewAlice = connect_spec(AliceSpec, session, manual),
    escalus_connection:send(NewAlice, escalus_stanza:enable_sm([resume])),

    SendToAlice(<<"4">>),
    SendToAlice(<<"5">>),

    %% Without this check we will get stuff out of order
    mongoose_helper:wait_for_n_offline_messages(NewAlice, 5),

    send_initial_presence(NewAlice),
    %% Without this check we can get "6, 1, 2, 3, 4, 5" messages in the next receive_all_ordered
    mongoose_helper:wait_for_n_offline_messages(NewAlice, 0),
    SendToAlice(<<"6">>),

    %% "2, 3, 4, 5, 6, 1" is possible (where only 1 is from offline storage, the rest is from sm)
    receive_all_ordered(NewAlice, 6),

    % replace connection
    NewAlice2 = connect_spec(AliceSpec, session, manual),
    % allow messages to go to the offline storage
    mongoose_helper:wait_for_n_offline_messages(NewAlice, 6),

    send_initial_presence(NewAlice2),

    % receives messages in correct order
    receive_all_ordered(NewAlice2, 6),

    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice2).
%% Receives chat messages numbered 1..Last on Conn and asserts they arrived
%% in that order (via sm_helper:assert_messages/2).
receive_all_ordered(Conn, Last) ->
    receive_all_ordered(Conn, 1, Last, []).

%% Receive messages from N to Last.
%% Ignores acks and presences.
%% Handles case when out of order and when not enough stanzas.
%%
%% RevAcc holds the message stanzas received so far, newest first. Prepending
%% and reversing once at the end avoids re-copying the accumulator on every
%% step, which `Acc ++ Stanzas` did.
receive_all_ordered(_Conn, N, Last, RevAcc) when N > Last ->
    Texts = lists:map(fun integer_to_binary/1, lists:seq(1, Last)),
    sm_helper:assert_messages(lists:reverse(RevAcc), Texts);
receive_all_ordered(Conn, N, Last, RevAcc) ->
    case escalus:wait_for_stanzas(Conn, 1) of
        [#xmlel{name = <<"message">>} = Message] ->
            receive_all_ordered(Conn, N + 1, Last, [Message | RevAcc]);
        [_] -> %% Ack or presence
            receive_all_ordered(Conn, N, Last, RevAcc);
        [] ->
            %% Report what was received so far in arrival order.
            ct:fail({timeout_waiting, N, lists:reverse(RevAcc)})
    end.

%% Alice's connection is killed with one unacked message from Bob; after
%% reconnecting with the same spec she must get her presence and the message.
resend_unacked_after_resume_timeout(Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence),
    AliceSpec = sm_helper:client_to_spec(Alice),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    %% kill alice connection
    escalus_connection:kill(Alice),

    %% wait until the old c2s process reports the resume_session state
    %% NOTE(review): the previous comment here said "ensure there is no
    %% session", but this only waits for resume_session -- confirm whether
    %% the test is meant to wait for the resume timeout to expire.
    C2SPid = mongoose_helper:get_session_pid(Alice),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),

    %% alice come back and receives unacked message
    NewAlice = connect_spec(AliceSpec, session),
    send_initial_presence(NewAlice),

    Received = escalus:wait_for_stanzas(NewAlice, 2),
    escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Received),

    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).
%% A failed resumption must still report the last h (1) of the expired
%% session, and the client must be able to go on with bind and session.
resume_expired_session_returns_correct_h(Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, sr_presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    get_ack(Alice),

    %% Bob sends a message to Alice, and Alice receives it but doesn't acknowledge
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    Unacked = escalus:wait_for_stanza(Alice),
    escalus:assert(is_chat_message, [<<"msg-1">>], Unacked),
    %% alice comes back, but too late, so resumption doesn't work,
    %% but she receives the previous h = 1 anyway
    %% NewAlice is also manual ack
    NewAlice = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Alice),
    FailedResumption = escalus_connection:get_stanza(NewAlice, failed_resumption),
    <<"1">> = exml_query:attr(FailedResumption, <<"h">>),
    %% And we can continue with bind and session
    Bound = escalus_session:bind(NewAlice),
    escalus_session:session(Bound),
    send_initial_presence(NewAlice),
    First = escalus_connection:get_stanza(NewAlice, {msg, 1}),
    Second = escalus_connection:get_stanza(NewAlice, {msg, 2}),
    escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], [First, Second]),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).

%% The three SMIDs registered in init_per_testcase/2 must still resolve to
%% their stale h values (1, 2, 3).
gc_repeat_after_never_means_no_cleaning(Config) ->
    [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config),
    Lookup = fun(SMID) ->
                     rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID])
             end,
    {stale_h, 1} = Lookup(SMID1),
    {stale_h, 2} = Lookup(SMID2),
    {stale_h, 3} = Lookup(SMID3).

%% The first registered SMID must eventually be reported as not found.
gc_repeat_after_timeout_does_clean(Config) ->
    [{SMID1, _} | _ ] = ?config(smid_test, Config),
    GetStaleH = fun() ->
                        rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1])
                end,
    mongoose_helper:wait_until(GetStaleH,
                               {error, smid_not_found},
                               #{name => smid_garbage_collected}).
%% Variant where the initial presence is not acked.
resume_session_state_send_message_without_ack(Config) ->
    resume_session_state_send_message_generic(Config, no_ack).

%% Variant where the initial presence is acked.
resume_session_state_send_message_with_ack(Config) ->
    resume_session_state_send_message_generic(Config, ack).

%% Bob sends one message before and two after Alice's connection is killed;
%% the c2s in resume_session state must keep them, and Alice must get all
%% three after reconnecting.
resume_session_state_send_message_generic(Config, AckInitialPresence) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    maybe_ack_initial_presence(Alice, AckInitialPresence),
    SendToAlice = fun(Text) ->
                          Msg = escalus_stanza:chat_to_short_jid(Alice, Text),
                          escalus_connection:send(Bob, Msg)
                  end,
    SendToAlice(<<"msg-1">>),
    %% kill alice connection
    C2SPid = mongoose_helper:get_session_pid(Alice),
    escalus_connection:kill(Alice),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
    sm_helper:assert_alive_resources(Alice, 1),

    %% send some messages and check if c2s can handle it
    SendToAlice(<<"msg-2">>),
    SendToAlice(<<"msg-3">>),
    %% suspend the process to ensure that Alice has enough time to reconnect,
    %% before resumption timeout occurs.
    ok = rpc(mim(), sys, suspend, [C2SPid]),

    %% alice comes back and receives unacked message
    NewAlice = connect_same(Alice, presence),
    %% now we can resume c2s process of the old connection
    %% and let it process session resumption timeout
    ok = rpc(mim(), sys, resume, [C2SPid]),
    Received = escalus:wait_for_stanzas(NewAlice, 3),

    % what about order ?
    % exactly three stanzas are awaited and matched against the three chats
    ExpectedChats = [is_chat(<<"msg-1">>),
                     is_chat(<<"msg-2">>),
                     is_chat(<<"msg-3">>)],
    escalus_new_assert:mix_match(ExpectedChats, Received),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).
%%for instance it can be done by mod ping
%% Stopping the c2s process of a killed connection must leave it in
%% resume_session state, and the unacked message must reach the
%% reconnected client.
resume_session_state_stop_c2s(Config) ->
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),

    get_ack(Alice),
    ack_initial_presence(Alice),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    FirstDelivery = escalus_connection:get_stanza(Alice, msg),
    escalus:assert(is_chat_message, [<<"msg-1">>], FirstDelivery),

    %% get pid of c2s
    C2SPid = mongoose_helper:get_session_pid(Alice),
    %% Wait c2s process to process our presence ack.
    %% Otherwise, we can receive two initial presences sometimes.
    sm_helper:wait_for_c2s_unacked_count(C2SPid, 1),

    % kill alice connection
    escalus_connection:kill(Alice),
    % session should be alive
    sm_helper:assert_alive_resources(Alice, 1),
    rpc(mim(), ejabberd_c2s, stop, [C2SPid]),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
    %% suspend the process to ensure that Alice has enough time to reconnect,
    %% before resumption timeout occurs.
    ok = rpc(mim(), sys, suspend, [C2SPid]),

    %% alice comes back and receives unacked message
    NewAlice = connect_same(Alice, presence, manual),
    %% now we can resume c2s process of the old connection
    %% and let it process session resumption timeout
    ok = rpc(mim(), sys, resume, [C2SPid]),

    Redelivered = escalus_connection:get_stanza(NewAlice, msg),
    escalus:assert(is_chat_message, [<<"msg-1">>], Redelivered),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).

%% This test only verifies the validity of helpers (get_session_pid,
%% get_c2s_state_name) written for wait_for_resumption
%% testcase.
session_established(Config) ->
    Client = connect_fresh(Config, alice, presence),
    SessionPid = mongoose_helper:get_session_pid(Client),
    session_established = mongoose_helper:get_c2s_state_name(SessionPid),
    escalus_connection:stop(Client).

%% Ensure that after a violent disconnection,
%% the c2s waits for resumption (but don't resume yet).
wait_for_resumption(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Bob = connect_fresh(Config, bob, session),
    Texts = three_texts(),
    {C2SPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session).

%% Hook test where Alice comes back by resuming the old session.
unacknowledged_message_hook_resume(Config) ->
    Reconnect = fun unacknowledged_message_hook_resume/4,
    unacknowledged_message_hook_common(Reconnect, Config).

%% Reconnects with `{resume, SMID, 1}` and sends initial presence.
%% Returns {Resource, NewClient} with the resource unchanged.
unacknowledged_message_hook_resume(AliceSpec, Resource, SMID, _C2SPid) ->
    Resumed = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
    send_initial_presence(Resumed),
    {Resource, Resumed}.

%% Hook test where Alice comes back under a different resource.
unacknowledged_message_hook_bounce(Config) ->
    Reconnect = fun unacknowledged_message_hook_bounce/4,
    unacknowledged_message_hook_common(Reconnect, Config).

%% Connects a second session under "new_" ++ Resource, waits until both
%% resources are registered, then terminates the old c2s process.
%% Returns {NewResource, NewClient}.
unacknowledged_message_hook_bounce(AliceSpec, Resource, _SMID, C2SPid) ->
    NewResource = <<"new_", Resource/binary>>,
    SpecWithNewResource = lists:keystore(resource, 1, AliceSpec,
                                         {resource, NewResource}),
    NewAlice = connect_spec(SpecWithNewResource, sr_session, manual),
    send_initial_presence(NewAlice),
    %% ensure second C2S is registered so all the messages are bounced properly
    sm_helper:wait_for_resource_count(NewAlice, 2),
    ok = rpc(mim(), sys, terminate, [C2SPid, normal]),
    {NewResource, NewAlice}.

%% Hook test where Alice comes back with the same spec after the old c2s
%% process has terminated.
unacknowledged_message_hook_offline(Config) ->
    Reconnect = fun unacknowledged_message_hook_offline/4,
    unacknowledged_message_hook_common(Reconnect, Config).
%% Restart callback used by unacknowledged_message_hook_offline/1.
%% Opens a new session with the same spec and returns {Resource, NewAlice}.
unacknowledged_message_hook_offline(AliceSpec, Resource, _SMID, C2SPid) ->
    C2SRef = erlang:monitor(process, C2SPid),
    %% reset the session, so old C2S process is stopped
    NewAlice = connect_spec(AliceSpec, sr_session, manual),
    %% wait for old C2S termination before send presence. other way
    %% some of the latest unacknowledged messages can be bounced to
    %% the new C2S process instead of going to the mod_offline storage.
    %% looks like all the unacknowledged messages arrive to the new
    %% C2S, but the message sequence is broken (the bounced messages
    %% delivered before the messages from the mod_offline storage)
    sm_helper:wait_for_process_termination(C2SRef),
    send_initial_presence(NewAlice),
    {Resource, NewAlice}.

%% Common body of the unacknowledged_message hook tests.
%%
%% RestartConnectionFN is a fun of arity 4, called as
%% RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid); it must bring
%% Alice back and return {NewResource, NewAlice}.
%%
%% The hook handler is registered on the server node, so it is removed in
%% an `after' section: a failing assertion must not leave it installed.
unacknowledged_message_hook_common(RestartConnectionFN, Config) ->
    %% connect bob and prepare alice
    Bob = connect_fresh(Config, bob, presence),

    AliceSpec0 = escalus_fresh:create_fresh_user(Config, alice),
    Resource = proplists:get_value(username, AliceSpec0),
    AliceSpec = [{resource, Resource} | AliceSpec0],
    HookHandlerExtra = start_hook_listener(Resource),
    try
        run_unacknowledged_message_hook_scenario(RestartConnectionFN, Bob, AliceSpec, Resource)
    after
        stop_hook_listener(HookHandlerExtra)
    end,
    escalus_connection:stop(Bob).

%% The scenario executed by unacknowledged_message_hook_common/2 while
%% the hook listener is installed. Must run in the process that called
%% start_hook_listener/1, because hook notifications are sent to it.
run_unacknowledged_message_hook_scenario(RestartConnectionFN, Bob, AliceSpec, Resource) ->
    Alice = connect_spec(AliceSpec, sr_presence, manual),
    %% Ack the presence stanza
    get_ack(Alice),
    ack_initial_presence(Alice),

    SMID = sm_helper:client_to_smid(Alice),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)),
    %% kill alice connection
    C2SPid = mongoose_helper:get_session_pid(Alice),
    escalus_connection:kill(Alice),
    mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
    sm_helper:assert_alive_resources(Alice, 1),

    %% the hook is run once for each unacked message and not more
    escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),

    %% send some messages and check if c2s can handle it
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-4">>)),
    escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),

    %% alice comes back and receives unacked message
    {NewResource, NewAlice} = RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid),

    %% keep reading until the last message (msg-4) is received
    mongoose_helper:wait_until(
      fun() ->
              Stanza = escalus_connection:get_stanza(NewAlice, msg),
              escalus:assert(is_chat_message, [<<"msg-4">>], Stanza),
              ok
      end, ok),

    NewC2SPid = mongoose_helper:get_session_pid(NewAlice),
    escalus_connection:kill(NewAlice),
    mongoose_helper:wait_for_c2s_state_name(NewC2SPid, resume_session),

    %% the second run of the hook reports counter 1 for every message
    escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)).

%% Alice dies with unacked messages buffered, resumes the stream and
%% gets the buffered messages again.
resume_session(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Texts = three_texts(),
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
        %% Resume the session.
        Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
        %% Alice receives the unacked messages from the previous
        %% interrupted session.
        sm_helper:wait_for_messages(Alice, Texts),
        %% Alice acks the received messages.
        escalus_connection:send(Alice, escalus_stanza:sm_ack(5)),
        escalus_connection:stop(Alice)
    end).

%% A resumption attempt with h = 30 must end with an `undefined-condition'
%% stream error. Afterwards Alice has no resources and the SMID is
%% no longer known.
resume_session_with_wrong_h_does_not_leak_sessions(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    Messages = three_texts(),
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages),
        %% Try to resume the session with a wrong h value.
        Alice = connect_spec(AliceSpec, auth, manual),
        Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 30),
        escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed),

        [] = sm_helper:get_user_present_resources(Alice),
        HostType = host_type(),
        {error, smid_not_found} = sm_helper:get_sid_by_stream_id(HostType, SMID),
        escalus_connection:wait_for_close(Alice, timer:seconds(5))
    end).

%% Resuming with the stream id <<"wrong-sid">> yields item-not-found.
resume_session_with_wrong_sid_returns_item_not_found(Config) ->
    session_resumption_expects_item_not_found(Config, <<"wrong-sid">>).

%% A resume element with its xmlns replaced by <<"not-stream-mgnt">>
%% gets no reply, creates no session and keeps the connection open.
resume_session_with_wrong_namespace_is_a_noop(Config) ->
    Alice = connect_fresh(Config, alice, auth),
    #xmlel{attrs = Attrs} = Resume = escalus_stanza:resume(<<"doesnt_matter">>, 4),
    Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, <<"not-stream-mgnt">>}),
    escalus_connection:send(Alice, Resume#xmlel{attrs = Attrs2}),
    escalus_assert:has_no_stanzas(Alice),
    [] = sm_helper:get_user_present_resources(Alice),
    true = escalus_connection:is_connected(Alice),
    escalus_connection:stop(Alice).
%% Registers a random SMID for a SID whose pid part is `undefined' and
%% expects a resumption attempt for it to fail with item-not-found.
resume_dead_session_results_in_item_not_found(Config) ->
    RandomBytes = crypto:strong_rand_bytes(21),
    SMID = base64:encode(RandomBytes),
    DeadSID = {os:timestamp(), undefined},
    RegisterArgs = [host_type(), SMID, DeadSID],
    rpc(mim(), ?MOD_SM, register_smid, RegisterArgs),
    session_resumption_expects_item_not_found(Config, SMID).

%% Authenticates Alice, tries to resume the stream SMID and checks that
%% the answer is an SM `failed' with item-not-found, that no session was
%% created and that the connection stays open.
session_resumption_expects_item_not_found(Config, SMID) ->
    Alice = connect_fresh(Config, alice, auth),
    Answer = sm_helper:try_to_resume_stream(Alice, SMID, 2),
    escalus:assert(is_sm_failed, [<<"item-not-found">>], Answer),
    [] = sm_helper:get_user_present_resources(Alice),
    true = escalus_connection:is_connected(Alice),
    escalus_connection:stop(Alice).

resume_session_kills_old_C2S_gracefully(Config) ->
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    OldC2S = mongoose_helper:get_session_pid(Alice),

    %% Start monitoring the session, then cut Alice's connection.
    SessionMonitor = sm_helper:monitor_session(Alice),
    escalus_client:kill_connection(Config, Alice),

    %% The c2s process has to be waiting for resumption now.
    mongoose_helper:wait_for_c2s_state_name(OldC2S, resume_session),

    %% Resume the session on a new connection.
    ResumedAlice = connect_resume(Alice, 1),

    %% C2S process should die gracefully with Reason=normal.
    sm_helper:wait_for_process_termination(SessionMonitor),
    escalus_connection:stop(ResumedAlice).

%% Connects Alice, lets Bob send her Texts, which she receives but never
%% acks, and kills her connection. Returns {C2SPid, SMID} of that session.
buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) ->
    Alice = connect_spec(AliceSpec, sr_presence, manual),
    %% Bob -> Alice: all the texts.
    sm_helper:send_messages(Bob, Alice, Texts),
    %% Alice gets them, no ack is sent.
    sm_helper:wait_for_messages(Alice, Texts),
    %% Violent termination of Alice's connection.
    escalus_client:kill_connection(Config, Alice),
    SessionPid = mongoose_helper:get_session_pid(Alice),
    StreamId = sm_helper:client_to_smid(Alice),
    {SessionPid, StreamId}.
%% Resumes a session by sending the stream start, the SASL PLAIN auth,
%% the post-auth stream start and the resume request in a single send,
%% without waiting for any server reply in between.
%% Six stanzas are then read and the last one has to be `resumed'
%% for the SMID of the interrupted session.
aggressively_pipelined_resume(Config) ->
    AliceSpec = [{manual_ack, true}, {parser_opts, [{start_tag, <<"stream:stream">>}]}
                 | escalus_fresh:create_fresh_user(Config, alice)],
    UnackedMessages = three_texts(),
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, UnackedMessages),
        %% Resume the session.
        Alice = escalus_connection:connect(AliceSpec),

        %% SASL PLAIN payload: NUL, username, NUL, password
        Username = proplists:get_value(username, AliceSpec),
        Password = proplists:get_value(password, AliceSpec),
        Payload = <<0:8,Username/binary,0:8,Password/binary>>,
        Server = proplists:get_value(server, AliceSpec),

        Stream = escalus_stanza:stream_start(Server, <<"jabber:client">>),
        Auth = escalus_stanza:auth(<<"PLAIN">>, [#xmlcdata{content = base64:encode(Payload)}]),
        AuthStream = escalus_stanza:stream_start(Server, <<"jabber:client">>),
        Resume = escalus_stanza:resume(SMID, 2),

        %% Everything goes out in one send call.
        escalus_client:send(Alice, [Stream, Auth, AuthStream, Resume]),
        %% Only the last of the six received stanzas is asserted on.
        Messages = [escalus_connection:get_stanza(Alice, {get_resumed, I}) || I <- lists:seq(1, 6)],
        escalus:assert(is_sm_resumed, [SMID], lists:last(Messages)),

        escalus_connection:stop(Alice)
    end).

%% This is a regression test for a case when a session processes a request, which will
%% receive a response from the server, i.e. will have the same origin SID in mongoose_acc.
%% Without proper handling, the reply would be rejected because the resumed session
%% has new SID.
replies_are_processed_by_resumed_session(Config) ->
    %% GIVEN a session and registered special IQ handler (added in init_per_testcase),
    %% that waits for old session process to terminate (at this point new process
    %% has fully taken over) and then actually sends the reply.
    Alice = connect_fresh(Config, alice, sr_presence),

    %% WHEN a client sends IQ request to the special handler...
    IQReq = escalus_stanza:iq_get(regression_ns(), []),
    escalus:send(Alice, IQReq),

    %% ... goes down and session is resumed.
    Alice2 = sm_helper:kill_and_connect_resume(Alice),

    %% THEN the client receives the reply properly.
    IQReply = escalus:wait_for_stanza(Alice2),
    escalus:assert(is_iq_result, [IQReq], IQReply),
    escalus_connection:stop(Alice2).

%% This is a regression test for a bug, which manifested in following scenario
%% (due to improper presence sub requests buffering):
%% 1. A is online, B is offline
%% 2. A subscribes to B's presence;
%% 3. B becomes online
%% 4. A sends a message to B
%% 5. B doesn't SM-ack the request or message, terminates the connection
%% 6. B reconnects but with session *replace*, not resume
%% 7. Packet rerouting crashes on the buffered sub request, preventing resending whole buffer
%% 8. B doesn't receive the buffered message
%% In the code below Bob plays the role of A and Alice the role of B.
subscription_requests_are_buffered_properly(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    MsgBody = <<"buffered">>,
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        % GIVEN Bob's pending subscription to Alice's presence
        AliceJid = common_helper:get_bjid(AliceSpec),
        escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)),
        _RosterPushReq = escalus:wait_for_stanza(Bob),

        % WHEN Alice becomes online...
        Alice = connect_spec(AliceSpec, sr_session, manual),
        send_initial_presence(Alice),
        %% subscribe could come before the initial presence
        escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)],
                            escalus:wait_for_stanzas(Alice, 2)),

        % ...and Bob sends a message to Alice...
        escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),
        MsgStanza = escalus:wait_for_stanza(Alice),
        escalus:assert(is_chat_message, [MsgBody], MsgStanza),

        % ...and Alice terminates connection without acking anything...
        escalus_client:kill_connection(Config, Alice),

        % ...and reconnects with session replacement.
        Alice2 = connect_spec(AliceSpec, session, manual),

        % THEN Alice receives (without sending initial presence):
        % * buffered Bob's message (like above)
        % Alice DOESN'T receive:
        % * buffered subscription request because it is dropped by ejabberd_sm
        %   because it's treated like repeated sub request to bare JID, so it's not
        %   processed by any sub req handler (like mod_roster)
        % * buffered available presence from Alice - because it is addressed to another SID
        %   and Alice2 is a brand new session
        escalus:assert(is_chat_message, [MsgBody], escalus:wait_for_stanza(Alice2)),
        %% a round trip of a fresh message, so that the "no stanzas" check
        %% below is made after that message has been delivered
        sm_helper:send_and_receive(Bob, Alice2, <<"flush1">>),
        escalus_assert:has_no_stanzas(Alice2),

        %% Only once an initial presence is sent, a subscription request is sent
        send_initial_presence(Alice2),
        escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)],
                            escalus:wait_for_stanzas(Alice2, 2)),

        sm_helper:send_and_receive(Bob, Alice2, <<"flush2">>),
        escalus_assert:has_no_stanzas(Alice2),

        escalus_connection:stop(Alice2)
    end).

%% This is a regression test for a bug, due to which messages sent to old session
%% in a middle of state handover were not appended properly to SM buffer.
%% Scenario to reproduce:
%% 1. Online Bob and Alice
%% 2. Alice kills the connection
%% 3. Alice's session is suspended
%% 4. Alice resumes session with new connection. At this moment new session is still not
%%    present in session table. `resume` request is stuck in old proc mailbox.
%% 5. Bob sends a message to Alice. Only old proc is present in session table so now
%%    old session has two messages in mailbox: `resume` and XML from Bob
%% 6. We resume old process and it begins session handover
%% 7. Bob's message is appended to SM buffer in "flush" step
%% 8. With bug fixed, the message is retransmitted properly
messages_are_properly_flushed_during_resumption(Config) ->
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        % GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended
        Alice = connect_fresh(Config, alice, sr_presence),
        SMH = escalus_connection:get_sm_h(Alice),
        escalus_client:kill_connection(Config, Alice),
        %% The receiver process would stop now
        C2SPid = mongoose_helper:get_session_pid(Alice),
        mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),

        %% the mailbox has to be empty before suspending, as the queue
        %% lengths awaited below are absolute numbers
        sm_helper:wait_for_queue_length(C2SPid, 0),
        ok = rpc(mim(), sys, suspend, [C2SPid]),

        % WHEN new session requests resumption
        % we wait until that old session has resumption request enqueued;
        % we need it to ensure the order of messages: resume first, Bob's chat second.
        % Actual wait and message sent by Bob is done in separate process
        % because new client start will block until old process is resumed

        MsgBody = <<"flush-regression">>,
        spawn_link(fun() ->
                    %% 1 message = the resume request
                    sm_helper:wait_for_queue_length(C2SPid, 1),

                    % Bob sends a message...
                    escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),

                    % ...we ensure that a message is enqueued in Alice's session...
                    % (2 messages = resume request + Bob's message)
                    sm_helper:wait_for_queue_length(C2SPid, 2),

                    % ...and old process is resumed.
                    ok = rpc(mim(), sys, resume, [C2SPid])
              end),
        Alice2 = connect_resume(Alice, SMH),
        % THEN Alice's new session receives Bob's message
        RecvMsg = escalus:wait_for_stanza(Alice2),
        escalus:assert(is_chat_message, [MsgBody], RecvMsg)
    end).

messages_are_properly_flushed_during_resumption_p1_fsm_old(Config) ->
    %% the same as messages_are_properly_flushed_during_resumption,
    %% but tests that buffered by p1_fsm_old messages are delivered
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        Alice = connect_fresh(Config, alice, sr_presence),
        SMH = escalus_connection:get_sm_h(Alice),
        escalus_client:kill_connection(Config, Alice),
        C2SPid = mongoose_helper:get_session_pid(Alice),
        mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
        ok = rpc(mim(), sys, suspend, [C2SPid]),

        %% send some dummy event. ignored by c2s but ensures that
        %% p1_old_fsm buffers the messages, sent after this one
        rpc(mim(), p1_fsm_old, send_all_state_event, [C2SPid, dummy_event]),

        MsgBody = <<"flush-regression">>,
        spawn_link(fun() ->
                    %% the queue lengths are one bigger than in the test above
                    %% (presumably because of the dummy event sent earlier):
                    %% 2 messages = dummy event + resume request
                    sm_helper:wait_for_queue_length(C2SPid, 2),

                    % Bob sends a message...
                    escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),

                    % ...we ensure that a message is enqueued in Alice's session...
                    % (3 messages = dummy event + resume request + Bob's message)
                    sm_helper:wait_for_queue_length(C2SPid, 3),

                    % ...and old process is resumed.
                    ok = rpc(mim(), sys, resume, [C2SPid])
              end),
        Alice2 = connect_resume(Alice, SMH),
        % THEN Alice's new session receives Bob's message
        RecvMsg = escalus:wait_for_stanza(Alice2),
        escalus:assert(is_chat_message, [MsgBody], RecvMsg)
    end).
%% A client asking to enable stream management must get an SM `failed'
%% with feature-not-implemented, and nothing should crash.
no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt(Config) ->
    Alice = connect_fresh(Config, alice, session, manual),
    EnableRequest = escalus_stanza:enable_sm(),
    %% Should not crash anything!
    escalus_connection:send(Alice, EnableRequest),
    Failure = escalus_connection:get_stanza(Alice, service_unavailable),
    escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Failure),
    escalus_connection:stop(Alice).

%% Same as above, but the client asks for resumption as well.
no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption(Config) ->
    Alice = connect_fresh(Config, alice, session, manual),
    EnableRequest = escalus_stanza:enable_sm([resume]),
    %% Should not crash anything!
    escalus_connection:send(Alice, EnableRequest),
    Failure = escalus_connection:get_stanza(Alice, service_unavailable),
    escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Failure),
    escalus_connection:stop(Alice).

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

%% Installs the unacknowledged_message hook handler on the mim node.
%% The calling process gets the notifications. The returned term has to
%% be passed to stop_hook_listener/1.
start_hook_listener(Resource) ->
    Listener = self(),
    Node = mim(),
    HandlerArgs = [Listener, Resource, host_type()],
    rpc(Node, ?MODULE, rpc_start_hook_handler, HandlerArgs).

%% Removes the handler installed by start_hook_listener/1.
stop_hook_listener(HookExtra) ->
    Node = mim(),
    HandlerArgs = [HookExtra, host_type()],
    rpc(Node, ?MODULE, rpc_stop_hook_handler, HandlerArgs).

%% Executed on the server node (see start_hook_listener/1).
rpc_start_hook_handler(TestCasePid, User, HostType) ->
    Extra = #{luser => jid:nodeprep(User), pid => TestCasePid},
    Handler = fun ?MODULE:hook_handler_fn/3,
    gen_hook:add_handler(unacknowledged_message, HostType, Handler, Extra, 50),
    Extra.

%% Executed on the server node (see stop_hook_listener/1).
rpc_stop_hook_handler(HookExtra, HostType) ->
    Handler = fun ?MODULE:hook_handler_fn/3,
    gen_hook:delete_handler(unacknowledged_message, HostType, Handler, HookExtra, 50).
%% Handler of the unacknowledged_message hook.
%% When the user part of the JID equals LUser, the test case process gets
%% {sm_test, Counter, LResource, Element} and the sm_test counter stored
%% in the accumulator is incremented. Other users are left alone.
hook_handler_fn(Acc, #{args := [Jid]}, #{luser := LUser, pid := TestCasePid}) ->
    {U, _S, R} = jid:to_lower(Jid),
    if
        U =:= LUser ->
            Seen = mongoose_acc:get(sm_test, counter, 0, Acc),
            TestCasePid ! {sm_test, Seen, R, mongoose_acc:element(Acc)},
            {ok, mongoose_acc:set_permanent(sm_test, counter, Seen + 1, Acc)};
        true ->
            {ok, Acc}
    end.

%% Takes the next hook notification from the mailbox, checks its counter
%% and resource, and returns the reported stanza.
%% Returns `timeout' when nothing arrives within Timeout milliseconds.
wait_for_unacked_msg_hook(Counter, Res, Timeout) ->
    receive
        {sm_test, ReportedCounter, ReportedRes, Element} = Notification ->
            ?assertEqual(Counter, ReportedCounter, Notification),
            ?assertEqual(Res, ReportedRes, Notification),
            Element
    after Timeout ->
            timeout
    end.

%% Predicate: a chat message with the given body.
is_chat(Content) ->
    fun(El) ->
            escalus_pred:is_chat_message(Content, El)
    end.

%% Predicate: a presence of the given type.
is_presence(Type) ->
    fun(El) ->
            escalus_pred:is_presence_with_type(Type, El)
    end.

%% <<"msg-1">>, <<"msg-2">> and <<"msg-3">>
three_texts() ->
    [<<"msg-", (integer_to_binary(N))/binary>> || N <- [1, 2, 3]].

%%--------------------------------------------------------------------
%% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session"
%%--------------------------------------------------------------------

regression_ns() ->
    <<"regression">>.

%% Adds regression_handler/5 as the IQ handler of regression_ns().
register_handler() ->
    HostType = host_type(),
    Node = mim(),
    Handler = fun ?MODULE:regression_handler/5,
    Args = [HostType, regression_ns(), ejabberd_sm, Handler, #{}, one_queue],
    rpc(Node, gen_iq_handler, add_iq_handler_for_domain, Args).

%% Removes the IQ handler added by register_handler/0.
unregister_handler() ->
    HostType = host_type(),
    Node = mim(),
    Args = [HostType, regression_ns(), ejabberd_sm],
    rpc(Node, gen_iq_handler, remove_iq_handler_for_domain, Args).
%% IQ handler for regression_ns(), executed on the server node.
%% Replies only after the c2s process which sent the request has
%% terminated and the user's session is registered again.
%% Raises {c2s_not_stopped_after_timeout, Pid} when that process is
%% still alive after 10 seconds.
regression_handler(Acc, _From, _To, IQ, _Extra) ->
    %% A bit of a hack - will no longer work when the SID format changes
    {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
    MonitorRef = erlang:monitor(process, Pid),
    receive
        %% Match our own monitor only: an unrelated 'DOWN' message in the
        %% mailbox must not be mistaken for the c2s termination.
        {'DOWN', MonitorRef, process, Pid, _Reason} ->
            ok
    after
        10000 ->
            erlang:demonitor(MonitorRef, [flush]),
            error({c2s_not_stopped_after_timeout, Pid})
    end,
    %% We avoid another race condition - there is a short window where user session
    %% is not registered in ejabberd_sm: between old process termination and the moment
    %% when the new process stores new session in memory. It should be fixed separately.
    wait_for_session(mongoose_acc:get(c2s, origin_jid, undefined, Acc), 50, 100),
    {Acc, jlib:make_result_iq_reply(IQ)}.

%% Polls ejabberd_sm until a session for JID is found.
%% Sleeps SleepTime milliseconds between the attempts, at most Retries times.
%% Returns ok on success; raises {session_not_found_after_retries, JID}
%% once the retries are used up (instead of polling forever).
wait_for_session(JID, Retries, SleepTime) ->
    case ejabberd_sm:get_session(JID) of
        offline when Retries > 0 ->
            timer:sleep(SleepTime),
            wait_for_session(JID, Retries - 1, SleepTime);
        offline ->
            error({session_not_found_after_retries, JID});
        _ ->
            ok
    end.

%% Acks the initial presence only when asked to (`ack').
maybe_ack_initial_presence(Alice, ack) ->
    ack_initial_presence(Alice);
maybe_ack_initial_presence(_Alice, no_ack) ->
    ok.