1: %% Stream Management tests 2: -module(sm_SUITE). 3: 4: -compile(export_all). 5: -include_lib("exml/include/exml.hrl"). 6: -include_lib("eunit/include/eunit.hrl"). 7: -include_lib("escalus/include/escalus.hrl"). 8: -include_lib("common_test/include/ct.hrl"). 9: 10: %% Injected code callbacks 11: -export([rpc_start_hook_handler/3, 12: rpc_stop_hook_handler/2, 13: hook_handler_fn/3, 14: regression_handler/5]). 15: 16: -import(distributed_helper, [mim/0, 17: require_rpc_nodes/1, 18: rpc/4]). 19: 20: -import(domain_helper, [host_type/0]). 21: 22: -import(sm_helper, [connect_fresh/3, 23: connect_fresh/4, 24: connect_spec/2, 25: connect_spec/3, 26: connect_same/2, 27: connect_same/3, 28: connect_resume/2, 29: process_initial_stanza/1, 30: send_initial_presence/1, 31: get_ack/1, 32: ack_initial_presence/1]). 33: 34: -define(MOD_SM, mod_stream_management). 35: -define(CONSTRAINT_CHECK_TIMEOUT, 5000). 36: -define(LONG_TIMEOUT, 3600). 37: -define(SHORT_TIMEOUT, 1). 38: -define(SMALL_SM_BUFFER, 3). 39: 40: %%-------------------------------------------------------------------- 41: %% Suite configuration 42: %%-------------------------------------------------------------------- 43: 44: suite() -> 45: require_rpc_nodes([mim]) ++ escalus:suite(). 46: 47: all() -> 48: ct_helper:groups_to_all(groups()). 49: 50: groups() -> 51: P = [parallel], 52: [{parallel, P, parallel_cases()}, 53: {parallel_manual_ack_freq_1, P, parallel_manual_ack_freq_1_cases()}, 54: {manual_ack_freq_2, [], manual_ack_freq_2_cases()}, 55: {stale_h, [], stale_h_cases()}, 56: {parallel_unacknowledged_message_hook, P, parallel_unacknowledged_message_hook_cases()}]. 57: 58: parallel_cases() -> 59: [server_announces_sm, 60: server_enables_sm_before_session, 61: server_enables_sm_after_session, 62: server_returns_failed_after_start, 63: server_returns_failed_after_auth, 64: server_enables_resumption, 65: client_enables_sm_twice_fails_with_correct_error_stanza, 66: session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza, 67: session_resumed_and_old_session_dead_doesnt_route_error_to_new_session, 68: basic_ack, 69: h_ok_before_session, 70: h_ok_after_session_enabled_before_session, 71: h_ok_after_session_enabled_after_session, 72: h_ok_after_a_chat, 73: h_non_given_closes_stream_gracefully, 74: resend_unacked_on_reconnection, 75: session_established, 76: wait_for_resumption, 77: resume_session, 78: resume_session_with_wrong_h_does_not_leak_sessions, 79: resume_session_with_wrong_sid_returns_item_not_found, 80: resume_session_with_wrong_namespace_is_a_noop, 81: resume_dead_session_results_in_item_not_found, 82: resume_session_kills_old_C2S_gracefully, 83: aggressively_pipelined_resume, 84: replies_are_processed_by_resumed_session, 85: subscription_requests_are_buffered_properly, 86: messages_are_properly_flushed_during_resumption, 87: messages_are_properly_flushed_during_resumption_p1_fsm_old]. 88: 89: parallel_manual_ack_freq_1_cases() -> 90: [client_acks_more_than_sent, 91: too_many_unacked_stanzas, 92: resend_unacked_after_resume_timeout, 93: resume_session_state_send_message_with_ack, 94: resume_session_state_send_message_without_ack, 95: resume_session_state_stop_c2s, 96: server_requests_ack_after_session, 97: resend_more_offline_messages_than_buffer_size, 98: server_requests_ack]. 99: 100: manual_ack_freq_2_cases() -> 101: [server_requests_ack_freq_2]. 102: 103: stale_h_cases() -> 104: [resume_expired_session_returns_correct_h, 105: gc_repeat_after_never_means_no_cleaning, 106: gc_repeat_after_timeout_does_clean]. 107: 108: stream_mgmt_disabled_cases() -> 109: [no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt, 110: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption]. 111: 112: manual_ack_freq_long_session_timeout_cases() -> 113: [preserve_order]. 114: 115: parallel_unacknowledged_message_hook_cases() -> 116: [unacknowledged_message_hook_bounce, 117: unacknowledged_message_hook_offline, 118: unacknowledged_message_hook_resume]. 119: 120: %%-------------------------------------------------------------------- 121: %% Init & teardown 122: %%-------------------------------------------------------------------- 123: 124: init_per_suite(Config) -> 125: NewConfig = dynamic_modules:save_modules(host_type(), Config), 126: NewConfigWithSM = escalus_users:update_userspec(NewConfig, alice, stream_management, true), 127: mongoose_helper:inject_module(?MODULE), 128: escalus:init_per_suite(NewConfigWithSM). 129: 130: end_per_suite(Config) -> 131: escalus_fresh:clean(), 132: dynamic_modules:restore_modules(Config), 133: escalus:end_per_suite(Config). 134: 135: init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hook; 136: Group =:= manual_ack_freq_long_session_timeout; 137: Group =:= parallel_manual_ack_freq_1; 138: Group =:= manual_ack_freq_2 -> 139: dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)), 140: Config; 141: init_per_group(stale_h, Config) -> 142: Config; 143: init_per_group(stream_mgmt_disabled, Config) -> 144: dynamic_modules:stop(host_type(), ?MOD_SM), 145: rpc(mim(), mnesia, delete_table, [sm_session]), 146: Config; 147: init_per_group(Group, Config) -> 148: dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)), 149: Config. 150: 151: end_per_group(_Group, _Config) -> 152: ok. 153: 154: init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) -> 155: dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)), 156: escalus:init_per_testcase(CN, Config); 157: init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleaning; 158: CN =:= gc_repeat_after_timeout_does_clean -> 159: dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)), 160: Config2 = register_some_smid_h(Config), 161: escalus:init_per_testcase(CN, Config2); 162: init_per_testcase(server_requests_ack_freq_2 = CN, Config) -> 163: escalus:init_per_testcase(CN, Config); 164: init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> 165: register_handler(), 166: escalus:init_per_testcase(CN, Config); 167: init_per_testcase(CaseName, Config) -> 168: escalus:init_per_testcase(CaseName, Config). 169: 170: end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_h; 171: CN =:= gc_repeat_after_never_means_no_cleaning; 172: CN =:= gc_repeat_after_timeout_does_clean -> 173: rpc(mim(), ejabberd_sup, stop_child, [stream_management_stale_h]), 174: escalus:end_per_testcase(CN, Config); 175: end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> 176: unregister_handler(), 177: escalus:end_per_testcase(CN, Config); 178: end_per_testcase(CaseName, Config) -> 179: escalus:end_per_testcase(CaseName, Config). 180: 181: %% Module configuration per group (in case of stale_h group it is per testcase) 182: 183: required_modules(Scope, Name) -> 184: SMConfig = case required_sm_opts(Scope, Name) of 185: stopped -> stopped; 186: ExtraOpts -> merge_proplists(common_sm_opts(), ExtraOpts) 187: end, 188: [{mod_stream_management, SMConfig}, {mod_offline, []}]. 189: 190: required_sm_opts(group, parallel) -> 191: [{ack_freq, never}]; 192: required_sm_opts(group, parallel_manual_ack_freq_1) -> 193: [{ack_freq, 1}, 194: {resume_timeout, ?SHORT_TIMEOUT}]; 195: required_sm_opts(group, manual_ack_freq_2) -> 196: [{ack_freq, 2}]; 197: required_sm_opts(group, stream_mgmt_disabled) -> 198: stopped; 199: required_sm_opts(group, parallel_unacknowledged_message_hook) -> 200: [{ack_freq, 1}]; 201: required_sm_opts(group, manual_ack_freq_long_session_timeout) -> 202: [{ack_freq, 1}, {buffer_max, 1000}]; 203: required_sm_opts(testcase, resume_expired_session_returns_correct_h) -> 204: [{ack_freq, 1}, 205: {resume_timeout, ?SHORT_TIMEOUT} | stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT)]; 206: required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) -> 207: stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT); 208: required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) -> 209: stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT). 210: 211: common_sm_opts() -> 212: [{buffer_max, ?SMALL_SM_BUFFER}]. 213: 214: merge_proplists(Defaults, Values) -> 215: Values ++ delete_keys(proplists:get_keys(Values), Defaults). 216: 217: delete_keys(Keys, List) -> 218: lists:foldl(fun proplists:delete/2, List, Keys). 219: 220: stale_h(RepeatAfter, Geriatric) -> 221: [{stale_h, [{enabled, true}, 222: {stale_h_repeat_after, RepeatAfter}, 223: {stale_h_geriatric, Geriatric}]}]. 224: 225: make_smid() -> 226: base64:encode(crypto:strong_rand_bytes(21)). 227: 228: register_smid(IntSmidId) -> 229: S = {SMID = make_smid(), IntSmidId}, 230: ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]), 231: S. 232: 233: register_some_smid_h(Config) -> 234: TestSmids = lists:map(fun register_smid/1, lists:seq(1, 3)), 235: [{smid_test, TestSmids} | Config]. 236: 237: %%-------------------------------------------------------------------- 238: %% Tests 239: %%-------------------------------------------------------------------- 240: 241: server_announces_sm(Config) -> 242: AliceSpec = escalus_fresh:freshen_spec(Config, alice), 243: {ok, #client{props = Props}, Features} = escalus_connection:start(AliceSpec, 244: [start_stream]), 245: true = escalus_session:can_use_stream_management(Props, Features). 246: 247: 248: server_enables_sm_before_session(Config) -> 249: connect_fresh(Config, alice, sm_after_bind). 250: 251: server_enables_sm_after_session(Config) -> 252: connect_fresh(Config, alice, sm_after_session). 253: 254: server_returns_failed_after_start(Config) -> 255: Alice = connect_fresh(Config, alice, before_auth), 256: server_returns_failed(Alice). 257: 258: server_returns_failed_after_auth(Config) -> 259: Alice = connect_fresh(Config, alice, auth), 260: server_returns_failed(Alice). 261: 262: server_enables_resumption(Config) -> 263: Alice = connect_fresh(Config, alice, sr_presence), 264: escalus_connection:stop(Alice). 265: 266: server_returns_failed(Alice) -> 267: escalus_connection:send(Alice, escalus_stanza:enable_sm()), 268: escalus:assert(is_sm_failed, [<<"unexpected-request">>], 269: escalus_connection:get_stanza(Alice, enable_sm_failed)). 270: 271: client_enables_sm_twice_fails_with_correct_error_stanza(Config) -> 272: Alice = connect_fresh(Config, alice, sm_before_session), 273: escalus_connection:send(Alice, escalus_stanza:enable_sm()), 274: escalus:assert(is_sm_failed, [<<"unexpected-request">>], 275: escalus_connection:get_stanza(Alice, enable_sm_failed)), 276: escalus:assert(is_stream_end, 277: escalus_connection:get_stanza(Alice, enable_sm_failed)), 278: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)). 279: 280: session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza(Config) -> 281: %% GIVEN USER WITH STREAM RESUMPTION ENABLED 282: Alice = connect_fresh(Config, alice, sr_presence), 283: SMH = escalus_connection:get_sm_h(Alice), 284: %% WHEN USER RESUMES SESSION FROM NEW CLIENT 285: Alice2 = connect_resume(Alice, SMH), 286: process_initial_stanza(Alice2), 287: %% THEN: Old session is gracefully closed with the correct error stanza 288: escalus:assert(is_stream_error, [<<"conflict">>, <<>>], 289: escalus_connection:get_stanza(Alice, close_old_stream)), 290: escalus:assert(is_stream_end, 291: escalus_connection:get_stanza(Alice, close_old_stream)), 292: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)), 293: true = escalus_connection:is_connected(Alice2), 294: escalus_connection:stop(Alice2). 295: 296: session_resumed_and_old_session_dead_doesnt_route_error_to_new_session(Config) -> 297: %% GIVEN USER WITH STREAM RESUMPTION ENABLED 298: Alice = connect_fresh(Config, alice, sr_presence), 299: %% WHEN FIRST SESSION DIES AND USER RESUMES FROM NEW CLIENT 300: Alice2 = sm_helper:kill_and_connect_resume(Alice), 301: process_initial_stanza(Alice2), 302: %% THEN new session does not have any message rerouted 303: false = escalus_client:has_stanzas(Alice2), 304: true = escalus_connection:is_connected(Alice2), 305: escalus_connection:stop(Alice2). 306: 307: basic_ack(Config) -> 308: Alice = connect_fresh(Config, alice, sm_after_session), 309: escalus_connection:send(Alice, escalus_stanza:roster_get()), 310: escalus:assert(is_roster_result, 311: escalus_connection:get_stanza(Alice, roster_result)), 312: escalus_connection:send(Alice, escalus_stanza:sm_request()), 313: escalus:assert(is_sm_ack, 314: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 315: 316: %% Test that "h" value is valid when: 317: %% - SM is enabled *before* the session is established 318: %% - <r/> is sent *before* the session is established 319: h_ok_before_session(Config) -> 320: Alice = connect_fresh(Config, alice, sm_after_bind), 321: escalus_connection:send(Alice, escalus_stanza:sm_request()), 322: escalus:assert(is_sm_ack, [0], 323: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 324: 325: %% Test that "h" value is valid when: 326: %% - SM is enabled *before* the session is established 327: %% - <r/> is sent *after* the session is established 328: h_ok_after_session_enabled_before_session(Config) -> 329: Alice = connect_fresh(Config, alice, sm_before_session), 330: escalus_connection:send(Alice, escalus_stanza:sm_request()), 331: escalus:assert(is_sm_ack, [1], 332: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 333: 334: %% Test that "h" value is valid when: 335: %% - SM is enabled *after* the session is established 336: %% - <r/> is sent *after* the session is established 337: h_ok_after_session_enabled_after_session(Config) -> 338: Alice = connect_fresh(Config, alice, sm_after_session), 339: escalus_connection:send(Alice, escalus_stanza:roster_get()), 340: escalus:assert(is_roster_result, 341: escalus_connection:get_stanza(Alice, roster_result)), 342: escalus_connection:send(Alice, escalus_stanza:sm_request()), 343: escalus:assert(is_sm_ack, [1], 344: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 345: 346: %% Test that "h" value is valid after exchanging a few messages. 347: h_ok_after_a_chat(ConfigIn) -> 348: Config = escalus_users:update_userspec(ConfigIn, alice, 349: stream_management, true), 350: escalus:fresh_story(Config, [{alice,1}, {bob,1}], fun(Alice, Bob) -> 351: escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi, Bob!">>)), 352: escalus:assert(is_chat_message, [<<"Hi, Bob!">>], 353: escalus:wait_for_stanza(Bob)), 354: escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)), 355: escalus:assert(is_chat_message, [<<"Hi, Alice!">>], 356: escalus:wait_for_stanza(Alice)), 357: escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How's life?">>)), 358: escalus:assert(is_chat_message, [<<"How's life?">>], 359: escalus:wait_for_stanza(Alice)), 360: escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Pretty !@#$%^$">>)), 361: escalus:assert(is_chat_message, [<<"Pretty !@#$%^$">>], 362: escalus:wait_for_stanza(Bob)), 363: escalus:send(Alice, escalus_stanza:sm_request()), 364: escalus:assert(is_sm_ack, [3], escalus:wait_for_stanza(Alice)), 365: %% Ack, so that unacked messages don't go into offline store. 366: escalus:send(Alice, escalus_stanza:sm_ack(3)) 367: end). 368: 369: h_non_given_closes_stream_gracefully(ConfigIn) -> 370: AStanza = #xmlel{name = <<"a">>, 371: attrs = [{<<"xmlns">>, <<"urn:xmpp:sm:3">>}]}, 372: Config = escalus_users:update_userspec(ConfigIn, alice, 373: stream_management, true), 374: escalus:fresh_story(Config, [{alice,1}], fun(Alice) -> 375: C2SPid = mongoose_helper:get_session_pid(Alice), 376: escalus:send(Alice, AStanza), 377: escalus:assert(is_stream_error, 378: [<<"policy-violation">>, <<>>], 379: escalus:wait_for_stanza(Alice)), 380: mongoose_helper:wait_for_pid_to_die(C2SPid), 381: escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)), 382: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)) 383: end). 384: 385: client_acks_more_than_sent(Config) -> 386: Alice = connect_fresh(Config, alice, sm_after_session), 387: escalus:send(Alice, escalus_stanza:sm_ack(5)), 388: StreamErrorStanza = escalus:wait_for_stanza(Alice), 389: %% Assert "undefined-condition" children 390: escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], StreamErrorStanza), 391: %% Assert "handled-count-too-high" children with correct attributes 392: HandledCountSubElement = exml_query:path(StreamErrorStanza, 393: [{element_with_ns, 394: <<"handled-count-too-high">>, 395: <<"urn:xmpp:sm:3">>}]), 396: <<"5">> = exml_query:attr(HandledCountSubElement, <<"h">>), 397: <<"0">> = exml_query:attr(HandledCountSubElement, <<"send-count">>), 398: %% Assert graceful stream end 399: escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)), 400: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)). 401: 402: too_many_unacked_stanzas(Config) -> 403: Bob = connect_fresh(Config, bob, presence), 404: Alice = connect_fresh(Config, alice, sm_presence, manual), 405: get_ack(Alice), 406: [escalus:send(Bob, escalus_stanza:chat_to(Alice, 407: <<(integer_to_binary(N))/binary, ": Hi, Alice!">>)) 408: || N <- lists:seq(1,?SMALL_SM_BUFFER)], 409: escalus:wait_for_stanzas(Alice, ?SMALL_SM_BUFFER * 2), % messages and ack requests 410: escalus:assert(is_stream_error, [<<"resource-constraint">>, 411: <<"too many unacked stanzas">>], 412: %% wait for deferred buffer check 413: escalus:wait_for_stanza(Alice, ?CONSTRAINT_CHECK_TIMEOUT + 1000)). 414: 415: server_requests_ack(Config) -> 416: server_requests_ack(Config, 1). 417: 418: server_requests_ack_freq_2(Config) -> 419: server_requests_ack(Config, 2). 420: 421: server_requests_ack(Config, N) -> 422: Bob = connect_fresh(Config, bob, presence), 423: Alice = connect_fresh(Config, alice, sm_presence, manual), 424: %% ack request after initial presence 425: maybe_assert_ack_request(1, N, Alice), 426: escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)), 427: escalus:assert(is_chat_message, [<<"Hi, Alice!">>], 428: escalus:wait_for_stanza(Alice)), 429: maybe_assert_ack_request(2, N, Alice). 430: 431: maybe_assert_ack_request(StanzasRec, AckRequests, Alice) -> 432: ct:log("StanzasRec: ~p, AckRequests: ~p", [StanzasRec, AckRequests]), 433: case StanzasRec rem AckRequests of 434: 0 -> 435: escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(Alice)); 436: _ -> 437: ok 438: end, 439: StanzasRec. 440: 441: server_requests_ack_after_session(Config) -> 442: Alice = connect_fresh(Config, alice, sm_before_session, manual), 443: escalus:assert(is_sm_ack_request, escalus_connection:get_stanza(Alice, stream_mgmt_req)). 444: 445: resend_more_offline_messages_than_buffer_size(Config) -> 446: %% connect bob and alice 447: Bob = connect_fresh(Config, bob, presence), 448: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 449: 450: % sent some messages - more than unacked buffer size 451: MessagesToSend = ?SMALL_SM_BUFFER + 1, 452: JID = common_helper:get_bjid(AliceSpec), 453: [escalus_connection:send(Bob, escalus_stanza:chat_to(JID, integer_to_binary(I))) 454: || I <- lists:seq(1, MessagesToSend)], 455: 456: % connect alice who wants to receive all messages from offline storage 457: Alice = connect_spec(AliceSpec, sm_after_session, manual), 458: mongoose_helper:wait_for_n_offline_messages(Alice, MessagesToSend), 459: send_initial_presence(Alice), 460: 461: escalus:wait_for_stanzas(Alice, MessagesToSend * 2), %messages and ack requests 462: 463: escalus_connection:get_stanza(Alice, presence), 464: get_ack(Alice), % ack request 465: 466: % confirm messages + presence 467: escalus_connection:send(Alice, escalus_stanza:sm_ack(4)), 468: % wait for check constraint message on server side 469: 470: ct:sleep(?CONSTRAINT_CHECK_TIMEOUT + 1000), 471: false = escalus_client:has_stanzas(Alice), 472: % should not receive anything especially any stream errors 473: 474: escalus_connection:stop(Alice), 475: escalus_connection:stop(Bob). 476: 477: resend_unacked_on_reconnection(Config) -> 478: Texts = three_texts(), 479: Bob = connect_fresh(Config, bob, presence), 480: Alice = connect_fresh(Config, alice, sm_presence), 481: AliceSpec = sm_helper:client_to_spec0(Alice), 482: %% Bob sends some messages to Alice. 483: sm_helper:send_messages(Bob, Alice, Texts), 484: %% Alice receives the messages. 485: sm_helper:wait_for_messages(Alice, Texts), 486: %% Alice disconnects without acking the messages. 487: sm_helper:stop_client_and_wait_for_termination(Alice), 488: %% Messages go to the offline store. 489: %% Alice receives the messages from the offline store. 490: NewAlice = connect_spec(AliceSpec, session, manual), 491: send_initial_presence(NewAlice), 492: sm_helper:wait_for_messages(NewAlice, Texts), 493: %% Alice acks the delayed messages so they won't go again 494: %% to the offline store. 495: escalus_connection:send(NewAlice, escalus_stanza:sm_ack(3)). 496: 497: %% Remove wait_for_n_offline_messages and you will get anything, but preserve_order 498: %% TODO Test without wait_for_n_offline_messages. It would require changes in SM 499: %% and more strict tests, reproducing delays in SM and in mod_offline. 500: preserve_order(Config) -> 501: %% connect bob and alice 502: Bob = connect_fresh(Config, bob, presence), 503: Alice = connect_fresh(Config, alice, sr_presence, manual), 504: AliceSpec = sm_helper:client_to_spec(Alice), 505: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"1">>)), 506: 507: %% kill alice connection 508: escalus_connection:kill(Alice), 509: C2SPid = mongoose_helper:get_session_pid(Alice), 510: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 511: 512: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"2">>)), 513: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"3">>)), 514: 515: NewAlice = connect_spec(AliceSpec, session, manual), 516: escalus_connection:send(NewAlice, escalus_stanza:enable_sm([resume])), 517: 518: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"4">>)), 519: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"5">>)), 520: 521: %% Without this check we will get stuff out of order 522: mongoose_helper:wait_for_n_offline_messages(NewAlice, 5), 523: 524: send_initial_presence(NewAlice), 525: %% Without this check we can get "6, 1, 2, 3, 4, 5" messages in the next receive_all_ordered 526: mongoose_helper:wait_for_n_offline_messages(NewAlice, 0), 527: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"6">>)), 528: 529: %% "2, 3, 4, 5, 6, 1" is possible (where only 1 is from offline storage, the rest is from sm) 530: receive_all_ordered(NewAlice, 6), 531: 532: % replace connection 533: NewAlice2 = connect_spec(AliceSpec, session, manual), 534: % allow messages to go to the offline storage 535: mongoose_helper:wait_for_n_offline_messages(NewAlice, 6), 536: 537: send_initial_presence(NewAlice2), 538: 539: % receves messages in correct order 540: receive_all_ordered(NewAlice2, 6), 541: 542: escalus_connection:stop(Bob), 543: escalus_connection:stop(NewAlice2). 544: 545: receive_all_ordered(Conn, Last) -> 546: receive_all_ordered(Conn, 1, Last, []). 547: 548: %% Receive messages from N to Last. 549: %% Ignores acks and presences. 550: %% Handles case when out of order and when not enough stanzas. 551: receive_all_ordered(_Conn, N, Last, Acc) when N > Last -> 552: Texts = lists:map(fun integer_to_binary/1, lists:seq(1, Last)), 553: sm_helper:assert_messages(Acc, Texts); 554: receive_all_ordered(Conn, N, Last, Acc) -> 555: Stanzas = escalus:wait_for_stanzas(Conn, 1), 556: case Stanzas of 557: [#xmlel{name = <<"message">>}] -> 558: receive_all_ordered(Conn, N + 1, Last, Acc ++ Stanzas); 559: [_] -> %% Ack or presence 560: receive_all_ordered(Conn, N, Last, Acc); 561: [] -> 562: ct:fail({timeout_waiting, N, Acc}) 563: end. 564: 565: resend_unacked_after_resume_timeout(Config) -> 566: %% connect bob and alice 567: Bob = connect_fresh(Config, bob, presence), 568: Alice = connect_fresh(Config, alice, sr_presence), 569: AliceSpec = sm_helper:client_to_spec(Alice), 570: 571: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 572: %% kill alice connection 573: escalus_connection:kill(Alice), 574: 575: %% ensure there is no session 576: C2SPid = mongoose_helper:get_session_pid(Alice), 577: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 578: 579: %% alice come back and receives unacked message 580: NewAlice = connect_spec(AliceSpec, session), 581: send_initial_presence(NewAlice), 582: 583: escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], 584: escalus:wait_for_stanzas(NewAlice, 2)), 585: 586: escalus_connection:stop(Bob), 587: escalus_connection:stop(NewAlice). 588: 589: resume_expired_session_returns_correct_h(Config) -> 590: %% connect bob and alice 591: Bob = connect_fresh(Config, bob, sr_presence), 592: Alice = connect_fresh(Config, alice, sr_presence, manual), 593: get_ack(Alice), 594: 595: %% Bob sends a message to Alice, and Alice receives it but doesn't acknowledge 596: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 597: escalus:assert(is_chat_message, [<<"msg-1">>], escalus:wait_for_stanza(Alice)), 598: %% alice comes back, but too late, so resumption doesn't work, 599: %% but she receives the previous h = 1 anyway 600: %% NewAlice is also manual ack 601: NewAlice = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Alice), 602: FailedResumption = escalus_connection:get_stanza(NewAlice, failed_resumption), 603: <<"1">> = exml_query:attr(FailedResumption, <<"h">>), 604: %% And we can continue with bind and session 605: escalus_session:session(escalus_session:bind(NewAlice)), 606: send_initial_presence(NewAlice), 607: Stanzas = [escalus_connection:get_stanza(NewAlice, {msg, 1}), 608: escalus_connection:get_stanza(NewAlice, {msg, 2})], 609: escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Stanzas), 610: escalus_connection:stop(Bob), 611: escalus_connection:stop(NewAlice). 612: 613: gc_repeat_after_never_means_no_cleaning(Config) -> 614: [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config), 615: {stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID1]), 616: {stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID2]), 617: {stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID3]). 618: 619: gc_repeat_after_timeout_does_clean(Config) -> 620: [{SMID1, _} | _ ] = ?config(smid_test, Config), 621: mongoose_helper:wait_until(fun() -> 622: rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1]) 623: end, 624: {error, smid_not_found}, 625: #{name => smid_garbage_collected}). 626: 627: resume_session_state_send_message_without_ack(Config) -> 628: resume_session_state_send_message_generic(Config, no_ack). 629: 630: resume_session_state_send_message_with_ack(Config) -> 631: resume_session_state_send_message_generic(Config, ack). 632: 633: resume_session_state_send_message_generic(Config, AckInitialPresence) -> 634: %% connect bob and alice 635: Bob = connect_fresh(Config, bob, presence), 636: Alice = connect_fresh(Config, alice, sr_presence, manual), 637: maybe_ack_initial_presence(Alice, AckInitialPresence), 638: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 639: %% kill alice connection 640: C2SPid = mongoose_helper:get_session_pid(Alice), 641: escalus_connection:kill(Alice), 642: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 643: sm_helper:assert_alive_resources(Alice, 1), 644: 645: %% send some messages and check if c2s can handle it 646: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)), 647: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)), 648: %% suspend the process to ensure that Alice has enough time to reconnect, 649: %% before resumption timeout occurs. 650: ok = rpc(mim(), sys, suspend, [C2SPid]), 651: 652: %% alice comes back and receives unacked message 653: NewAlice = connect_same(Alice, presence), 654: %% now we can resume c2s process of the old connection 655: %% and let it process session resumption timeout 656: ok = rpc(mim(), sys, resume, [C2SPid]), 657: Stanzas = escalus:wait_for_stanzas(NewAlice, 3), 658: 659: % what about order ? 660: % alice receive presence from herself and 3 unacked messages from bob 661: escalus_new_assert:mix_match([is_chat(<<"msg-1">>), 662: is_chat(<<"msg-2">>), 663: is_chat(<<"msg-3">>)], 664: Stanzas), 665: escalus_connection:stop(Bob), 666: escalus_connection:stop(NewAlice). 667: 668: %%for instance it can be done by mod ping 669: resume_session_state_stop_c2s(Config) -> 670: Bob = connect_fresh(Config, bob, presence), 671: Alice = connect_fresh(Config, alice, sr_presence, manual), 672: 673: get_ack(Alice), 674: ack_initial_presence(Alice), 675: 676: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 677: escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)), 678: 679: %% get pid of c2s 680: C2SPid = mongoose_helper:get_session_pid(Alice), 681: %% Wait c2s process to process our presence ack. 682: %% Otherwise, we can receive two initial presences sometimes. 683: sm_helper:wait_for_c2s_unacked_count(C2SPid, 1), 684: 685: % kill alice connection 686: escalus_connection:kill(Alice), 687: % session should be alive 688: sm_helper:assert_alive_resources(Alice, 1), 689: rpc(mim(), ejabberd_c2s, stop, [C2SPid]), 690: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 691: %% suspend the process to ensure that Alice has enough time to reconnect, 692: %% before resumption timeout occurs. 693: ok = rpc(mim(), sys, suspend, [C2SPid]), 694: 695: %% alice comes back and receives unacked message 696: NewAlice = connect_same(Alice, presence, manual), 697: %% now we can resume c2s process of the old connection 698: %% and let it process session resumption timeout 699: ok = rpc(mim(), sys, resume, [C2SPid]), 700: 701: escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(NewAlice, msg)), 702: escalus_connection:stop(Bob), 703: escalus_connection:stop(NewAlice). 704: 705: %% This test only verifies the validity of helpers (get_session_pid, 706: %% get_c2s_state_name) written for wait_for_resumption 707: %% testcase. 708: session_established(Config) -> 709: Alice = connect_fresh(Config, alice, presence), 710: C2SPid = mongoose_helper:get_session_pid(Alice), 711: session_established = mongoose_helper:get_c2s_state_name(C2SPid), 712: escalus_connection:stop(Alice). 713: 714: %% Ensure that after a violent disconnection, 715: %% the c2s waits for resumption (but don't resume yet). 716: wait_for_resumption(Config) -> 717: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 718: Bob = connect_fresh(Config, bob, session), 719: Texts = three_texts(), 720: {C2SPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts), 721: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session). 722: 723: unacknowledged_message_hook_resume(Config) -> 724: unacknowledged_message_hook_common(fun unacknowledged_message_hook_resume/4, Config). 725: 726: unacknowledged_message_hook_resume(AliceSpec, Resource, SMID, _C2SPid) -> 727: NewAlice = connect_spec(AliceSpec, {resume, SMID, 1}, manual), 728: send_initial_presence(NewAlice), 729: {Resource, NewAlice}. 730: 731: unacknowledged_message_hook_bounce(Config) -> 732: unacknowledged_message_hook_common(fun unacknowledged_message_hook_bounce/4, Config). 733: 734: unacknowledged_message_hook_bounce(AliceSpec, Resource, _SMID, C2SPid) -> 735: NewResource = <<"new_", Resource/binary>>, 736: NewSpec = lists:keystore(resource, 1, AliceSpec, {resource, NewResource}), 737: NewAlice = connect_spec(NewSpec, sr_session, manual), 738: send_initial_presence(NewAlice), 739: %% ensure second C2S is registered so all the messages are bounced properly 740: sm_helper:wait_for_resource_count(NewAlice, 2), 741: ok = rpc(mim(), sys, terminate, [C2SPid, normal]), 742: {NewResource, NewAlice}. 743: 744: unacknowledged_message_hook_offline(Config) -> 745: unacknowledged_message_hook_common(fun unacknowledged_message_hook_offline/4, Config). 746: 747: unacknowledged_message_hook_offline(AliceSpec, Resource, _SMID, C2SPid) -> 748: C2SRef = erlang:monitor(process, C2SPid), 749: %%reset the session, so old C2S process is stopped 750: NewAlice = connect_spec(AliceSpec, sr_session, manual), 751: %% wait for old C2S termination before send presence. other way 752: %% some of the latest unacknowledged messages can be bounced to 753: %% the new C2S process instead of going to the mod_offline storage. 754: %% looks like all the unacknowledged messages arrive to the new 755: %% C2S, but the message sequence is broken (the bounced messages 756: %% delivered before the messages from the mod_offline storage) 757: sm_helper:wait_for_process_termination(C2SRef), 758: send_initial_presence(NewAlice), 759: {Resource, NewAlice}. 760: 761: unacknowledged_message_hook_common(RestartConnectionFN, Config) -> 762: %% connect bob and alice 763: Bob = connect_fresh(Config, bob, presence), 764: 765: AliceSpec0 = escalus_fresh:create_fresh_user(Config, alice), 766: Resource = proplists:get_value(username, AliceSpec0), 767: AliceSpec = [{resource, Resource} | AliceSpec0], 768: HookHandlerExtra = start_hook_listener(Resource), 769: Alice = connect_spec(AliceSpec, sr_presence, manual), 770: %% Ack the presence stanza 771: get_ack(Alice), 772: ack_initial_presence(Alice), 773: 774: SMID = sm_helper:client_to_smid(Alice), 775: 776: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 777: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)), 778: %% kill alice connection 779: C2SPid = mongoose_helper:get_session_pid(Alice), 780: escalus_connection:kill(Alice), 781: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 782: sm_helper:assert_alive_resources(Alice, 1), 783: 784: escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(0, Resource, 100)), 785: escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(0, Resource, 100)), 786: ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), 787: 788: %% send some messages and check if c2s can handle it 789: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)), 790: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-4">>)), 791: escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(0, Resource, 100)), 792: escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(0, Resource, 100)), 793: ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), 794: 795: %% alice comes back and receives unacked message 796: {NewResource, NewAlice} = RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid), 797: 798: mongoose_helper:wait_until( 799: fun() -> 800: Stanza = escalus_connection:get_stanza(NewAlice, msg), 801: escalus:assert(is_chat_message, [<<"msg-4">>], Stanza), 802: ok 803: end, ok), 804: 805: NewC2SPid = mongoose_helper:get_session_pid(NewAlice), 806: escalus_connection:kill(NewAlice), 807: mongoose_helper:wait_for_c2s_state_name(NewC2SPid, resume_session), 808: 809: escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 810: escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 811: escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 812: escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 813: ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), 814: stop_hook_listener(HookHandlerExtra), 815: escalus_connection:stop(Bob). 816: 817: resume_session(Config) -> 818: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 819: Texts = three_texts(), 820: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 821: {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts), 822: %% Resume the session. 823: Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual), 824: %% Alice receives the unacked messages from the previous 825: %% interrupted session. 826: sm_helper:wait_for_messages(Alice, Texts), 827: %% Alice acks the received messages. 828: escalus_connection:send(Alice, escalus_stanza:sm_ack(5)), 829: escalus_connection:stop(Alice) 830: end). 831: 832: resume_session_with_wrong_h_does_not_leak_sessions(Config) -> 833: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 834: Messages = three_texts(), 835: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 836: {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages), 837: %% Resume the session. 838: Alice = connect_spec(AliceSpec, auth, manual), 839: Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 30), 840: escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed), 841: 842: [] = sm_helper:get_user_present_resources(Alice), 843: HostType = host_type(), 844: {error, smid_not_found} = sm_helper:get_sid_by_stream_id(HostType, SMID), 845: escalus_connection:wait_for_close(Alice, timer:seconds(5)) 846: end). 847: 848: resume_session_with_wrong_sid_returns_item_not_found(Config) -> 849: session_resumption_expects_item_not_found(Config, <<"wrong-sid">>). 850: 851: resume_session_with_wrong_namespace_is_a_noop(Config) -> 852: Alice = connect_fresh(Config, alice, auth), 853: #xmlel{attrs = Attrs} = Resume = escalus_stanza:resume(<<"doesnt_matter">>, 4), 854: Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, <<"not-stream-mgnt">>}), 855: escalus_connection:send(Alice, Resume#xmlel{attrs = Attrs2}), 856: escalus_assert:has_no_stanzas(Alice), 857: [] = sm_helper:get_user_present_resources(Alice), 858: true = escalus_connection:is_connected(Alice), 859: escalus_connection:stop(Alice). 860: 861: resume_dead_session_results_in_item_not_found(Config) -> 862: SMID = base64:encode(crypto:strong_rand_bytes(21)), 863: SID = {os:timestamp(), undefined}, 864: HostType = host_type(), 865: rpc(mim(), ?MOD_SM, register_smid, [HostType, SMID, SID]), 866: session_resumption_expects_item_not_found(Config, SMID). 867: 868: session_resumption_expects_item_not_found(Config, SMID) -> 869: Alice = connect_fresh(Config, alice, auth), 870: Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 2), 871: escalus:assert(is_sm_failed, [<<"item-not-found">>], Resumed), 872: [] = sm_helper:get_user_present_resources(Alice), 873: true = escalus_connection:is_connected(Alice), 874: escalus_connection:stop(Alice). 875: 876: resume_session_kills_old_C2S_gracefully(Config) -> 877: Alice = connect_fresh(Config, alice, sr_presence, manual), 878: C2SPid = mongoose_helper:get_session_pid(Alice), 879: 880: %% Monitor the C2S process and disconnect Alice. 881: MonitorRef = sm_helper:monitor_session(Alice), 882: escalus_client:kill_connection(Config, Alice), 883: 884: %% Ensure the c2s process is waiting for resumption. 885: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 886: 887: %% Resume the session. 888: NewAlice = connect_resume(Alice, 1), 889: 890: %% C2S process should die gracefully with Reason=normal. 891: sm_helper:wait_for_process_termination(MonitorRef), 892: escalus_connection:stop(NewAlice). 893: 894: buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) -> 895: Alice = connect_spec(AliceSpec, sr_presence, manual), 896: %% Bobs sends some messages to Alice. 897: sm_helper:send_messages(Bob, Alice, Texts), 898: %% Alice receives them, but doesn't ack. 899: sm_helper:wait_for_messages(Alice, Texts), 900: %% Alice's connection is violently terminated. 901: escalus_client:kill_connection(Config, Alice), 902: C2SPid = mongoose_helper:get_session_pid(Alice), 903: SMID = sm_helper:client_to_smid(Alice), 904: {C2SPid, SMID}. 905: 906: aggressively_pipelined_resume(Config) -> 907: AliceSpec = [{manual_ack, true}, {parser_opts, [{start_tag, <<"stream:stream">>}]} 908: | escalus_fresh:create_fresh_user(Config, alice)], 909: UnackedMessages = three_texts(), 910: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 911: {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, UnackedMessages), 912: %% Resume the session. 913: Alice = escalus_connection:connect(AliceSpec), 914: 915: Username = proplists:get_value(username, AliceSpec), 916: Password = proplists:get_value(password, AliceSpec), 917: Payload = <<0:8,Username/binary,0:8,Password/binary>>, 918: Server = proplists:get_value(server, AliceSpec), 919: 920: Stream = escalus_stanza:stream_start(Server, <<"jabber:client">>), 921: Auth = escalus_stanza:auth(<<"PLAIN">>, [#xmlcdata{content = base64:encode(Payload)}]), 922: AuthStream = escalus_stanza:stream_start(Server, <<"jabber:client">>), 923: Resume = escalus_stanza:resume(SMID, 2), 924: 925: escalus_client:send(Alice, [Stream, Auth, AuthStream, Resume]), 926: Messages = [escalus_connection:get_stanza(Alice, {get_resumed, I}) || I <- lists:seq(1, 6)], 927: escalus:assert(is_sm_resumed, [SMID], lists:last(Messages)), 928: 929: escalus_connection:stop(Alice) 930: end). 931: 932: %% This is a regression test for a case when a session processes a request, which will 933: %% receive a response from the server, i.e. will have the same origin SID in mongoose_acc. 934: %% Without proper handling, the reply would be rejected because the resumed session 935: %% has new SID. 936: replies_are_processed_by_resumed_session(Config) -> 937: %% GIVEN a session and registered special IQ handler (added in init_per_testcase), 938: %% that waits for old session process to terminate (at this point new process 939: %% has fully taken over) and then actually sends the reply. 940: Alice = connect_fresh(Config, alice, sr_presence), 941: 942: %% WHEN a client sends IQ request to the special handler... 943: IQReq = escalus_stanza:iq_get(regression_ns(), []), 944: escalus:send(Alice, IQReq), 945: 946: %% ... goes down and session is resumed. 947: Alice2 = sm_helper:kill_and_connect_resume(Alice), 948: 949: %% THEN the client receives the reply properly. 950: IQReply = escalus:wait_for_stanza(Alice2), 951: escalus:assert(is_iq_result, [IQReq], IQReply), 952: escalus_connection:stop(Alice2). 953: 954: %% This is a regression test for a bug, which manifested in following scenario 955: %% (due to improper presence sub requests buffering): 956: %% 1. A is online, B is offline 957: %% 2. A subscribes to B's presence; 958: %% 3. B becomes online 959: %% 4. A sends a message to B 960: %% 5. B doesn't SM-ack the request or message, terminates the connection 961: %% 6. B reconnects but with session *replace*, not resume 962: %% 7. Packet rerouting crashes on the buffered sub request, preventing resending whole buffer 963: %% 8. B doesn't receive the buffered message 964: subscription_requests_are_buffered_properly(Config) -> 965: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 966: MsgBody = <<"buffered">>, 967: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 968: % GIVEN Bob's pending subscription to Alice's presence 969: AliceJid = common_helper:get_bjid(AliceSpec), 970: escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)), 971: _RosterPushReq = escalus:wait_for_stanza(Bob), 972: 973: % WHEN Alice becomes online... 974: Alice = connect_spec(AliceSpec, sr_session, manual), 975: send_initial_presence(Alice), 976: %% subscribe could come before the initial presence 977: escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)], 978: escalus:wait_for_stanzas(Alice, 2)), 979: 980: % ...and Bob sends a message to Alice... 981: escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), 982: MsgStanza = escalus:wait_for_stanza(Alice), 983: escalus:assert(is_chat_message, [MsgBody], MsgStanza), 984: 985: % ...and Alice terminates connection without acking anything... 986: escalus_client:kill_connection(Config, Alice), 987: 988: % ...and reconnects with session replacement. 989: Alice2 = connect_spec(AliceSpec, session, manual), 990: 991: % THEN Alice receives (without sending initial presence): 992: % * buffered Bob's message (like above) 993: % Alice DOESN'T receive: 994: % * buffered subscription request because it is dropped by ejabberd_sm 995: % because it's treated like repeated sub request to bare JID, so it's not 996: % processed by any sub req handler (like mod_roster) 997: % * buffered available presence from Alice - because it is addressed to another SID 998: % and Alice2 is a brand new session 999: escalus:assert(is_chat_message, [MsgBody], escalus:wait_for_stanza(Alice2)), 1000: sm_helper:send_and_receive(Bob, Alice2, <<"flush1">>), 1001: escalus_assert:has_no_stanzas(Alice2), 1002: 1003: %% Only once an initial presence is sent, a subscription request is sent 1004: send_initial_presence(Alice2), 1005: escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)], 1006: escalus:wait_for_stanzas(Alice2, 2)), 1007: 1008: sm_helper:send_and_receive(Bob, Alice2, <<"flush2">>), 1009: escalus_assert:has_no_stanzas(Alice2), 1010: 1011: escalus_connection:stop(Alice2) 1012: end). 1013: 1014: %% This is a regression test for a bug, due to which messages sent to old session 1015: %% in a middle of state handover were not appended properly to SM buffer. 1016: %% Scenario to reproduce: 1017: %% 1. Online Bob and Alice 1018: %% 2. Alice kills the connection 1019: %% 3. Alice's session is suspended 1020: %% 4. Alice resumes session with new connection. At this moment new session is still not 1021: %% present in session table. `resume` request is stuck in old proc mailbox. 1022: %% 5. Bob sends a message to Alice. Only old proc is present in session table so now 1023: %% old session has two messages in mailbox: `resume` and XML from Bob 1024: %% 6. We resume old process and it begins session handover 1025: %% 7. Bob's message is appended to SM buffer in "flush" step 1026: %% 8. With bug fixed, the message is retransmitted properly 1027: messages_are_properly_flushed_during_resumption(Config) -> 1028: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 1029: % GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended 1030: Alice = connect_fresh(Config, alice, sr_presence), 1031: SMH = escalus_connection:get_sm_h(Alice), 1032: escalus_client:kill_connection(Config, Alice), 1033: %% The receiver process would stop now 1034: C2SPid = mongoose_helper:get_session_pid(Alice), 1035: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 1036: 1037: sm_helper:wait_for_queue_length(C2SPid, 0), 1038: ok = rpc(mim(), sys, suspend, [C2SPid]), 1039: 1040: % WHEN new session requests resumption 1041: % we wait until that old session has resumption request enqueued; 1042: % we need it to ensure the order of messages: resume first, Bob's chat second. 1043: % Actual wait and message sent by Bob is done in separate process 1044: % because new client start will block until old process is resumed 1045: 1046: MsgBody = <<"flush-regression">>, 1047: spawn_link(fun() -> 1048: sm_helper:wait_for_queue_length(C2SPid, 1), 1049: 1050: % Bob sends a message... 1051: escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), 1052: 1053: % ...we ensure that a message is enqueued in Alice's session... 1054: % (2 messages = resume request + Bob's message) 1055: sm_helper:wait_for_queue_length(C2SPid, 2), 1056: 1057: % ...and old process is resumed. 1058: ok = rpc(mim(), sys, resume, [C2SPid]) 1059: end), 1060: Alice2 = connect_resume(Alice, SMH), 1061: % THEN Alice's new session receives Bob's message 1062: RecvMsg = escalus:wait_for_stanza(Alice2), 1063: escalus:assert(is_chat_message, [MsgBody], RecvMsg) 1064: end). 1065: 1066: messages_are_properly_flushed_during_resumption_p1_fsm_old(Config) -> 1067: %% the same as messages_are_properly_flushed_during_resumption, 1068: %% but tests that buffered by p1_fsm_old messages are delivered 1069: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 1070: Alice = connect_fresh(Config, alice, sr_presence), 1071: SMH = escalus_connection:get_sm_h(Alice), 1072: escalus_client:kill_connection(Config, Alice), 1073: C2SPid = mongoose_helper:get_session_pid(Alice), 1074: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 1075: ok = rpc(mim(), sys, suspend, [C2SPid]), 1076: 1077: %% send some dummy event. ignored by c2s but ensures that 1078: %% p1_old_fsm buffers the messages, sent after this one 1079: rpc(mim(), p1_fsm_old, send_all_state_event, [C2SPid, dummy_event]), 1080: 1081: MsgBody = <<"flush-regression">>, 1082: spawn_link(fun() -> 1083: sm_helper:wait_for_queue_length(C2SPid, 2), 1084: 1085: % Bob sends a message... 1086: escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), 1087: 1088: % ...we ensure that a message is enqueued in Alice's session... 1089: % (2 messages = resume request + Bob's message) 1090: sm_helper:wait_for_queue_length(C2SPid, 3), 1091: 1092: % ...and old process is resumed. 1093: ok = rpc(mim(), sys, resume, [C2SPid]) 1094: end), 1095: Alice2 = connect_resume(Alice, SMH), 1096: % THEN Alice's new session receives Bob's message 1097: RecvMsg = escalus:wait_for_stanza(Alice2), 1098: escalus:assert(is_chat_message, [MsgBody], RecvMsg) 1099: end). 1100: 1101: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt(Config) -> 1102: Alice = connect_fresh(Config, alice, session, manual), 1103: %% Should not crash anything! 1104: escalus_connection:send(Alice, escalus_stanza:enable_sm()), 1105: Response = escalus_connection:get_stanza(Alice, service_unavailable), 1106: escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response), 1107: escalus_connection:stop(Alice). 1108: 1109: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption(Config) -> 1110: Alice = connect_fresh(Config, alice, session, manual), 1111: %% Should not crash anything! 1112: escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])), 1113: Response = escalus_connection:get_stanza(Alice, service_unavailable), 1114: escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response), 1115: escalus_connection:stop(Alice). 1116: 1117: %%-------------------------------------------------------------------- 1118: %% Helpers 1119: %%-------------------------------------------------------------------- 1120: start_hook_listener(Resource) -> 1121: TestCasePid = self(), 1122: rpc(mim(), ?MODULE, rpc_start_hook_handler, [TestCasePid, Resource, host_type()]). 1123: 1124: stop_hook_listener(HookExtra) -> 1125: rpc(mim(), ?MODULE, rpc_stop_hook_handler, [HookExtra, host_type()]). 1126: 1127: rpc_start_hook_handler(TestCasePid, User, HostType) -> 1128: LUser = jid:nodeprep(User), 1129: Extra = #{luser => LUser, pid => TestCasePid}, 1130: gen_hook:add_handler(unacknowledged_message, HostType, 1131: fun ?MODULE:hook_handler_fn/3, 1132: Extra, 50), 1133: Extra. 1134: 1135: rpc_stop_hook_handler(HookExtra, HostType) -> 1136: gen_hook:delete_handler(unacknowledged_message, HostType, 1137: fun ?MODULE:hook_handler_fn/3, 1138: HookExtra, 50). 1139: 1140: hook_handler_fn(Acc, 1141: #{args := [Jid]} = _Params, 1142: #{luser := LUser, pid := TestCasePid} = _Extra) -> 1143: {U, _S, R} = jid:to_lower(Jid), 1144: case U of 1145: LUser -> 1146: Counter = mongoose_acc:get(sm_test, counter, 0, Acc), 1147: El = mongoose_acc:element(Acc), 1148: TestCasePid ! {sm_test, Counter, R, El}, 1149: {ok, mongoose_acc:set_permanent(sm_test, counter, Counter + 1, Acc)}; 1150: _ -> {ok, Acc} 1151: end. 1152: 1153: wait_for_unacked_msg_hook(Counter, Res, Timeout) -> 1154: receive 1155: {sm_test, AccCounter, Resource, Stanza} = Msg -> 1156: ?assertEqual(Counter, AccCounter, Msg), 1157: ?assertEqual(Res, Resource, Msg), 1158: Stanza 1159: after Timeout -> 1160: timeout 1161: end. 1162: 1163: is_chat(Content) -> 1164: fun(Stanza) -> escalus_pred:is_chat_message(Content, Stanza) end. 1165: 1166: is_presence(Type) -> 1167: fun(Stanza) -> escalus_pred:is_presence_with_type(Type, Stanza) end. 1168: 1169: three_texts() -> 1170: [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>]. 1171: 1172: %%-------------------------------------------------------------------- 1173: %% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session" 1174: %%-------------------------------------------------------------------- 1175: 1176: regression_ns() -> 1177: <<"regression">>. 1178: 1179: register_handler() -> 1180: HostType = host_type(), 1181: rpc(mim(), gen_iq_handler, add_iq_handler_for_domain, 1182: [HostType, regression_ns(), ejabberd_sm, 1183: fun ?MODULE:regression_handler/5, #{}, one_queue]). 1184: 1185: unregister_handler() -> 1186: HostType = host_type(), 1187: rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain, 1188: [HostType, regression_ns(), ejabberd_sm]). 1189: 1190: regression_handler(Acc, _From, _To, IQ, _Extra) -> 1191: %% A bit of a hack - will no longer work when the SID format changes 1192: {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc), 1193: erlang:monitor(process, Pid), 1194: receive 1195: {'DOWN', _, _, _, _} -> 1196: ok 1197: after 1198: 10000 -> 1199: error({c2s_not_stopped_after_timeout, Pid}) 1200: end, 1201: %% We avoid another race condition - there is a short window where user session 1202: %% is not registered in ejabberd_sm: between old process termination and the moment 1203: %% when the new process stores new session in memory. It should be fixed separately. 1204: wait_for_session(mongoose_acc:get(c2s, origin_jid, undefined, Acc), 50, 100), 1205: {Acc, jlib:make_result_iq_reply(IQ)}. 1206: 1207: wait_for_session(JID, Retries, SleepTime) -> 1208: case ejabberd_sm:get_session(JID) of 1209: offline -> 1210: timer:sleep(SleepTime), 1211: wait_for_session(JID, Retries - 1, SleepTime); 1212: _ -> 1213: ok 1214: end. 1215: 1216: maybe_ack_initial_presence(Alice, ack) -> 1217: ack_initial_presence(Alice); 1218: maybe_ack_initial_presence(_Alice, no_ack) -> 1219: ok.