1: %% Stream Management tests 2: -module(sm_SUITE). 3: 4: -compile([export_all, nowarn_export_all]). 5: -include_lib("exml/include/exml.hrl"). 6: -include_lib("eunit/include/eunit.hrl"). 7: -include_lib("escalus/include/escalus.hrl"). 8: -include_lib("common_test/include/ct.hrl"). 9: 10: %% Injected code callbacks 11: -export([rpc_start_hook_handler/3, 12: rpc_stop_hook_handler/2, 13: hook_handler_fn/3, 14: regression_handler/5]). 15: 16: -export([rpc_start_filter_hook_handler/3, 17: rpc_stop_filter_hook_handler/2, 18: filter_hook_handler_fn/3]). 19: 20: -import(distributed_helper, [mim/0, 21: require_rpc_nodes/1, 22: rpc/4]). 23: 24: -import(domain_helper, [host_type/0]). 25: 26: -import(sm_helper, [connect_fresh/3, 27: connect_fresh/4, 28: connect_spec/2, 29: connect_spec/3, 30: connect_same/2, 31: connect_same/3, 32: connect_resume/2, 33: process_initial_stanza/1, 34: send_initial_presence/1, 35: get_ack/1, 36: ack_initial_presence/1]). 37: 38: -define(MOD_SM, mod_stream_management). 39: -define(CONSTRAINT_CHECK_TIMEOUT, 5000). 40: -define(LONG_TIMEOUT, 3600). 41: -define(SHORT_TIMEOUT, 1). 42: -define(SMALL_SM_BUFFER, 3). 43: 44: %%-------------------------------------------------------------------- 45: %% Suite configuration 46: %%-------------------------------------------------------------------- 47: 48: suite() -> 49: require_rpc_nodes([mim]) ++ escalus:suite(). 50: 51: all() -> 52: ct_helper:groups_to_all(groups()). 53: 54: groups() -> 55: P = [parallel], 56: [{parallel, P, parallel_cases()}, 57: {parallel_manual_ack_freq_1, P, parallel_manual_ack_freq_1_cases()}, 58: {manual_ack_freq_2, [], manual_ack_freq_2_cases()}, 59: {stale_h, [], stale_h_cases()}, 60: {parallel_unacknowledged_message_hook, P, parallel_unacknowledged_message_hook_cases()}]. 61: 62: parallel_cases() -> 63: [server_announces_sm, 64: server_enables_sm_before_session, 65: server_enables_sm_after_session, 66: server_returns_failed_after_start, 67: server_returns_failed_after_auth, 68: server_enables_resumption, 69: client_enables_sm_twice_fails_with_correct_error_stanza, 70: session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza, 71: session_resumed_and_old_session_dead_doesnt_route_error_to_new_session, 72: basic_ack, 73: h_ok_before_session, 74: h_ok_after_session_enabled_before_session, 75: h_ok_after_session_enabled_after_session, 76: h_ok_after_a_chat, 77: h_non_given_closes_stream_gracefully, 78: resend_unacked_on_reconnection, 79: session_established, 80: wait_for_resumption, 81: resume_session, 82: resume_session_with_wrong_h_does_not_leak_sessions, 83: resume_session_with_wrong_sid_returns_item_not_found, 84: resume_session_with_wrong_namespace_is_a_noop, 85: resume_dead_session_results_in_item_not_found, 86: resume_session_kills_old_C2S_gracefully, 87: aggressively_pipelined_resume, 88: replies_are_processed_by_resumed_session, 89: subscription_requests_are_buffered_properly, 90: messages_are_properly_flushed_during_resumption, 91: messages_are_properly_flushed_during_resumption_p1_fsm_old]. 92: 93: parallel_manual_ack_freq_1_cases() -> 94: [client_acks_more_than_sent, 95: too_many_unacked_stanzas, 96: resend_unacked_after_resume_timeout, 97: resume_session_state_send_message_with_ack, 98: resume_session_state_send_message_without_ack, 99: resume_session_state_stop_c2s, 100: server_requests_ack_after_session, 101: resend_more_offline_messages_than_buffer_size, 102: server_requests_ack]. 103: 104: manual_ack_freq_2_cases() -> 105: [server_requests_ack_freq_2]. 106: 107: stale_h_cases() -> 108: [resume_expired_session_returns_correct_h, 109: gc_repeat_after_never_means_no_cleaning, 110: gc_repeat_after_timeout_does_clean]. 111: 112: stream_mgmt_disabled_cases() -> 113: [no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt, 114: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption]. 115: 116: manual_ack_freq_long_session_timeout_cases() -> 117: [preserve_order]. 118: 119: parallel_unacknowledged_message_hook_cases() -> 120: [unacknowledged_message_hook_bounce, 121: unacknowledged_message_hook_offline, 122: unacknowledged_message_hook_resume, 123: unacknowledged_message_hook_filter]. 124: 125: %%-------------------------------------------------------------------- 126: %% Init & teardown 127: %%-------------------------------------------------------------------- 128: 129: init_per_suite(Config) -> 130: NewConfig = dynamic_modules:save_modules(host_type(), Config), 131: NewConfigWithSM = escalus_users:update_userspec(NewConfig, alice, stream_management, true), 132: mongoose_helper:inject_module(?MODULE), 133: escalus:init_per_suite(NewConfigWithSM). 134: 135: end_per_suite(Config) -> 136: escalus_fresh:clean(), 137: dynamic_modules:restore_modules(Config), 138: escalus:end_per_suite(Config). 139: 140: init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hook; 141: Group =:= manual_ack_freq_long_session_timeout; 142: Group =:= parallel_manual_ack_freq_1; 143: Group =:= manual_ack_freq_2 -> 144: dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)), 145: Config; 146: init_per_group(stale_h, Config) -> 147: Config; 148: init_per_group(stream_mgmt_disabled, Config) -> 149: dynamic_modules:stop(host_type(), ?MOD_SM), 150: rpc(mim(), mnesia, delete_table, [sm_session]), 151: Config; 152: init_per_group(Group, Config) -> 153: dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)), 154: Config. 155: 156: end_per_group(_Group, _Config) -> 157: ok. 158: 159: init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) -> 160: dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)), 161: escalus:init_per_testcase(CN, Config); 162: init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleaning; 163: CN =:= gc_repeat_after_timeout_does_clean -> 164: dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)), 165: Config2 = register_some_smid_h(Config), 166: escalus:init_per_testcase(CN, Config2); 167: init_per_testcase(server_requests_ack_freq_2 = CN, Config) -> 168: escalus:init_per_testcase(CN, Config); 169: init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> 170: register_handler(), 171: escalus:init_per_testcase(CN, Config); 172: init_per_testcase(CaseName, Config) -> 173: escalus:init_per_testcase(CaseName, Config). 174: 175: end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_h; 176: CN =:= gc_repeat_after_never_means_no_cleaning; 177: CN =:= gc_repeat_after_timeout_does_clean -> 178: Name = rpc(mim(), gen_mod, get_module_proc, [host_type(), stream_management_stale_h]), 179: rpc(mim(), ejabberd_sup, stop_child, [Name]), 180: escalus:end_per_testcase(CN, Config); 181: end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> 182: unregister_handler(), 183: escalus:end_per_testcase(CN, Config); 184: end_per_testcase(CaseName, Config) -> 185: escalus:end_per_testcase(CaseName, Config). 186: 187: %% Module configuration per group (in case of stale_h group it is per testcase) 188: 189: required_modules(Scope, Name) -> 190: SMConfig = case required_sm_opts(Scope, Name) of 191: stopped -> stopped; 192: ExtraOpts -> maps:merge(common_sm_opts(), ExtraOpts) 193: end, 194: [{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)}, 195: {mod_offline, config_parser_helper:mod_config(mod_offline, #{})}]. 196: 197: required_sm_opts(group, parallel) -> 198: #{ack_freq => never}; 199: required_sm_opts(group, parallel_manual_ack_freq_1) -> 200: #{ack_freq => 1, 201: resume_timeout => ?SHORT_TIMEOUT}; 202: required_sm_opts(group, manual_ack_freq_2) -> 203: #{ack_freq => 2}; 204: required_sm_opts(group, stream_mgmt_disabled) -> 205: stopped; 206: required_sm_opts(group, parallel_unacknowledged_message_hook) -> 207: #{ack_freq => 1}; 208: required_sm_opts(group, manual_ack_freq_long_session_timeout) -> 209: #{ack_freq => 1, buffer_max => 1000}; 210: required_sm_opts(testcase, resume_expired_session_returns_correct_h) -> 211: #{ack_freq => 1, 212: resume_timeout => ?SHORT_TIMEOUT, 213: stale_h => stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT)}; 214: required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) -> 215: #{stale_h => stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT)}; 216: required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) -> 217: #{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)}. 218: 219: common_sm_opts() -> 220: #{buffer_max => ?SMALL_SM_BUFFER}. 221: 222: stale_h(RepeatAfter, Geriatric) -> 223: #{enabled => true, 224: repeat_after => RepeatAfter, 225: geriatric => Geriatric}. 226: 227: make_smid() -> 228: base64:encode(crypto:strong_rand_bytes(21)). 229: 230: register_smid(IntSmidId) -> 231: S = {SMID = make_smid(), IntSmidId}, 232: ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]), 233: S. 234: 235: register_some_smid_h(Config) -> 236: TestSmids = lists:map(fun register_smid/1, lists:seq(1, 3)), 237: [{smid_test, TestSmids} | Config]. 238: 239: %%-------------------------------------------------------------------- 240: %% Tests 241: %%-------------------------------------------------------------------- 242: 243: server_announces_sm(Config) -> 244: AliceSpec = escalus_fresh:freshen_spec(Config, alice), 245: {ok, #client{props = Props}, Features} = escalus_connection:start(AliceSpec, 246: [start_stream]), 247: true = escalus_session:can_use_stream_management(Props, Features). 248: 249: 250: server_enables_sm_before_session(Config) -> 251: connect_fresh(Config, alice, sm_after_bind). 252: 253: server_enables_sm_after_session(Config) -> 254: connect_fresh(Config, alice, sm_after_session). 255: 256: server_returns_failed_after_start(Config) -> 257: Alice = connect_fresh(Config, alice, before_auth), 258: server_returns_failed(Alice). 259: 260: server_returns_failed_after_auth(Config) -> 261: Alice = connect_fresh(Config, alice, auth), 262: server_returns_failed(Alice). 263: 264: server_enables_resumption(Config) -> 265: Alice = connect_fresh(Config, alice, sr_presence), 266: escalus_connection:stop(Alice). 267: 268: server_returns_failed(Alice) -> 269: escalus_connection:send(Alice, escalus_stanza:enable_sm()), 270: escalus:assert(is_sm_failed, [<<"unexpected-request">>], 271: escalus_connection:get_stanza(Alice, enable_sm_failed)). 272: 273: client_enables_sm_twice_fails_with_correct_error_stanza(Config) -> 274: Alice = connect_fresh(Config, alice, sm_before_session), 275: escalus_connection:send(Alice, escalus_stanza:enable_sm()), 276: escalus:assert(is_sm_failed, [<<"unexpected-request">>], 277: escalus_connection:get_stanza(Alice, enable_sm_failed)), 278: escalus:assert(is_stream_end, 279: escalus_connection:get_stanza(Alice, enable_sm_failed)), 280: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)). 281: 282: session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza(Config) -> 283: %% GIVEN USER WITH STREAM RESUMPTION ENABLED 284: Alice = connect_fresh(Config, alice, sr_presence), 285: SMH = escalus_connection:get_sm_h(Alice), 286: %% WHEN USER RESUMES SESSION FROM NEW CLIENT 287: Alice2 = connect_resume(Alice, SMH), 288: process_initial_stanza(Alice2), 289: %% THEN: Old session is gracefully closed with the correct error stanza 290: escalus:assert(is_stream_error, [<<"conflict">>, <<>>], 291: escalus_connection:get_stanza(Alice, close_old_stream)), 292: escalus:assert(is_stream_end, 293: escalus_connection:get_stanza(Alice, close_old_stream)), 294: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)), 295: true = escalus_connection:is_connected(Alice2), 296: escalus_connection:stop(Alice2). 297: 298: session_resumed_and_old_session_dead_doesnt_route_error_to_new_session(Config) -> 299: %% GIVEN USER WITH STREAM RESUMPTION ENABLED 300: Alice = connect_fresh(Config, alice, sr_presence), 301: %% WHEN FIRST SESSION DIES AND USER RESUMES FROM NEW CLIENT 302: Alice2 = sm_helper:kill_and_connect_resume(Alice), 303: process_initial_stanza(Alice2), 304: %% THEN new session does not have any message rerouted 305: false = escalus_client:has_stanzas(Alice2), 306: true = escalus_connection:is_connected(Alice2), 307: escalus_connection:stop(Alice2). 308: 309: basic_ack(Config) -> 310: Alice = connect_fresh(Config, alice, sm_after_session), 311: escalus_connection:send(Alice, escalus_stanza:roster_get()), 312: escalus:assert(is_roster_result, 313: escalus_connection:get_stanza(Alice, roster_result)), 314: escalus_connection:send(Alice, escalus_stanza:sm_request()), 315: escalus:assert(is_sm_ack, 316: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 317: 318: %% Test that "h" value is valid when: 319: %% - SM is enabled *before* the session is established 320: %% - <r/> is sent *before* the session is established 321: h_ok_before_session(Config) -> 322: Alice = connect_fresh(Config, alice, sm_after_bind), 323: escalus_connection:send(Alice, escalus_stanza:sm_request()), 324: escalus:assert(is_sm_ack, [0], 325: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 326: 327: %% Test that "h" value is valid when: 328: %% - SM is enabled *before* the session is established 329: %% - <r/> is sent *after* the session is established 330: h_ok_after_session_enabled_before_session(Config) -> 331: Alice = connect_fresh(Config, alice, sm_before_session), 332: escalus_connection:send(Alice, escalus_stanza:sm_request()), 333: escalus:assert(is_sm_ack, [1], 334: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 335: 336: %% Test that "h" value is valid when: 337: %% - SM is enabled *after* the session is established 338: %% - <r/> is sent *after* the session is established 339: h_ok_after_session_enabled_after_session(Config) -> 340: Alice = connect_fresh(Config, alice, sm_after_session), 341: escalus_connection:send(Alice, escalus_stanza:roster_get()), 342: escalus:assert(is_roster_result, 343: escalus_connection:get_stanza(Alice, roster_result)), 344: escalus_connection:send(Alice, escalus_stanza:sm_request()), 345: escalus:assert(is_sm_ack, [1], 346: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 347: 348: %% Test that "h" value is valid after exchanging a few messages. 349: h_ok_after_a_chat(ConfigIn) -> 350: Config = escalus_users:update_userspec(ConfigIn, alice, 351: stream_management, true), 352: escalus:fresh_story(Config, [{alice,1}, {bob,1}], fun(Alice, Bob) -> 353: escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi, Bob!">>)), 354: escalus:assert(is_chat_message, [<<"Hi, Bob!">>], 355: escalus:wait_for_stanza(Bob)), 356: escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)), 357: escalus:assert(is_chat_message, [<<"Hi, Alice!">>], 358: escalus:wait_for_stanza(Alice)), 359: escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How's life?">>)), 360: escalus:assert(is_chat_message, [<<"How's life?">>], 361: escalus:wait_for_stanza(Alice)), 362: escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Pretty !@#$%^$">>)), 363: escalus:assert(is_chat_message, [<<"Pretty !@#$%^$">>], 364: escalus:wait_for_stanza(Bob)), 365: escalus:send(Alice, escalus_stanza:sm_request()), 366: escalus:assert(is_sm_ack, [3], escalus:wait_for_stanza(Alice)), 367: %% Ack, so that unacked messages don't go into offline store. 368: escalus:send(Alice, escalus_stanza:sm_ack(3)) 369: end). 370: 371: h_non_given_closes_stream_gracefully(ConfigIn) -> 372: AStanza = #xmlel{name = <<"a">>, 373: attrs = [{<<"xmlns">>, <<"urn:xmpp:sm:3">>}]}, 374: Config = escalus_users:update_userspec(ConfigIn, alice, 375: stream_management, true), 376: escalus:fresh_story(Config, [{alice,1}], fun(Alice) -> 377: C2SPid = mongoose_helper:get_session_pid(Alice), 378: escalus:send(Alice, AStanza), 379: escalus:assert(is_stream_error, 380: [<<"policy-violation">>, <<>>], 381: escalus:wait_for_stanza(Alice)), 382: mongoose_helper:wait_for_pid_to_die(C2SPid), 383: escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)), 384: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)) 385: end). 386: 387: client_acks_more_than_sent(Config) -> 388: Alice = connect_fresh(Config, alice, sm_after_session), 389: escalus:send(Alice, escalus_stanza:sm_ack(5)), 390: StreamErrorStanza = escalus:wait_for_stanza(Alice), 391: %% Assert "undefined-condition" children 392: escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], StreamErrorStanza), 393: %% Assert "handled-count-too-high" children with correct attributes 394: HandledCountSubElement = exml_query:path(StreamErrorStanza, 395: [{element_with_ns, 396: <<"handled-count-too-high">>, 397: <<"urn:xmpp:sm:3">>}]), 398: <<"5">> = exml_query:attr(HandledCountSubElement, <<"h">>), 399: <<"0">> = exml_query:attr(HandledCountSubElement, <<"send-count">>), 400: %% Assert graceful stream end 401: escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)), 402: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)). 403: 404: too_many_unacked_stanzas(Config) -> 405: Bob = connect_fresh(Config, bob, presence), 406: Alice = connect_fresh(Config, alice, sm_presence, manual), 407: get_ack(Alice), 408: [escalus:send(Bob, escalus_stanza:chat_to(Alice, 409: <<(integer_to_binary(N))/binary, ": Hi, Alice!">>)) 410: || N <- lists:seq(1,?SMALL_SM_BUFFER)], 411: escalus:wait_for_stanzas(Alice, ?SMALL_SM_BUFFER * 2), % messages and ack requests 412: escalus:assert(is_stream_error, [<<"resource-constraint">>, 413: <<"too many unacked stanzas">>], 414: %% wait for deferred buffer check 415: escalus:wait_for_stanza(Alice, ?CONSTRAINT_CHECK_TIMEOUT + 1000)). 416: 417: server_requests_ack(Config) -> 418: server_requests_ack(Config, 1). 419: 420: server_requests_ack_freq_2(Config) -> 421: server_requests_ack(Config, 2). 422: 423: server_requests_ack(Config, N) -> 424: Bob = connect_fresh(Config, bob, presence), 425: Alice = connect_fresh(Config, alice, sm_presence, manual), 426: %% ack request after initial presence 427: maybe_assert_ack_request(1, N, Alice), 428: escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)), 429: escalus:assert(is_chat_message, [<<"Hi, Alice!">>], 430: escalus:wait_for_stanza(Alice)), 431: maybe_assert_ack_request(2, N, Alice). 432: 433: maybe_assert_ack_request(StanzasRec, AckRequests, Alice) -> 434: ct:log("StanzasRec: ~p, AckRequests: ~p", [StanzasRec, AckRequests]), 435: case StanzasRec rem AckRequests of 436: 0 -> 437: escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(Alice)); 438: _ -> 439: ok 440: end, 441: StanzasRec. 442: 443: server_requests_ack_after_session(Config) -> 444: Alice = connect_fresh(Config, alice, sm_before_session, manual), 445: escalus:assert(is_sm_ack_request, escalus_connection:get_stanza(Alice, stream_mgmt_req)). 446: 447: resend_more_offline_messages_than_buffer_size(Config) -> 448: %% connect bob and alice 449: Bob = connect_fresh(Config, bob, presence), 450: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 451: 452: % sent some messages - more than unacked buffer size 453: MessagesToSend = ?SMALL_SM_BUFFER + 1, 454: JID = common_helper:get_bjid(AliceSpec), 455: [escalus_connection:send(Bob, escalus_stanza:chat_to(JID, integer_to_binary(I))) 456: || I <- lists:seq(1, MessagesToSend)], 457: 458: % connect alice who wants to receive all messages from offline storage 459: Alice = connect_spec(AliceSpec, sm_after_session, manual), 460: mongoose_helper:wait_for_n_offline_messages(Alice, MessagesToSend), 461: send_initial_presence(Alice), 462: 463: escalus:wait_for_stanzas(Alice, MessagesToSend * 2), %messages and ack requests 464: 465: escalus_connection:get_stanza(Alice, presence), 466: get_ack(Alice), % ack request 467: 468: % confirm messages + presence 469: escalus_connection:send(Alice, escalus_stanza:sm_ack(4)), 470: % wait for check constraint message on server side 471: 472: ct:sleep(?CONSTRAINT_CHECK_TIMEOUT + 1000), 473: false = escalus_client:has_stanzas(Alice), 474: % should not receive anything especially any stream errors 475: 476: escalus_connection:stop(Alice), 477: escalus_connection:stop(Bob). 478: 479: resend_unacked_on_reconnection(Config) -> 480: Texts = three_texts(), 481: Bob = connect_fresh(Config, bob, presence), 482: Alice = connect_fresh(Config, alice, sm_presence), 483: AliceSpec = sm_helper:client_to_spec0(Alice), 484: %% Bob sends some messages to Alice. 485: sm_helper:send_messages(Bob, Alice, Texts), 486: %% Alice receives the messages. 487: sm_helper:wait_for_messages(Alice, Texts), 488: %% Alice disconnects without acking the messages. 489: sm_helper:stop_client_and_wait_for_termination(Alice), 490: %% Messages go to the offline store. 491: %% Alice receives the messages from the offline store. 492: NewAlice = connect_spec(AliceSpec, session, manual), 493: send_initial_presence(NewAlice), 494: sm_helper:wait_for_messages(NewAlice, Texts), 495: %% Alice acks the delayed messages so they won't go again 496: %% to the offline store. 497: escalus_connection:send(NewAlice, escalus_stanza:sm_ack(3)). 498: 499: %% Remove wait_for_n_offline_messages and you will get anything, but preserve_order 500: %% TODO Test without wait_for_n_offline_messages. It would require changes in SM 501: %% and more strict tests, reproducing delays in SM and in mod_offline. 502: preserve_order(Config) -> 503: %% connect bob and alice 504: Bob = connect_fresh(Config, bob, presence), 505: Alice = connect_fresh(Config, alice, sr_presence, manual), 506: AliceSpec = sm_helper:client_to_spec(Alice), 507: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"1">>)), 508: 509: %% kill alice connection 510: escalus_connection:kill(Alice), 511: C2SPid = mongoose_helper:get_session_pid(Alice), 512: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 513: 514: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"2">>)), 515: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"3">>)), 516: 517: NewAlice = connect_spec(AliceSpec, session, manual), 518: escalus_connection:send(NewAlice, escalus_stanza:enable_sm([resume])), 519: 520: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"4">>)), 521: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"5">>)), 522: 523: %% Without this check we will get stuff out of order 524: mongoose_helper:wait_for_n_offline_messages(NewAlice, 5), 525: 526: send_initial_presence(NewAlice), 527: %% Without this check we can get "6, 1, 2, 3, 4, 5" messages in the next receive_all_ordered 528: mongoose_helper:wait_for_n_offline_messages(NewAlice, 0), 529: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"6">>)), 530: 531: %% "2, 3, 4, 5, 6, 1" is possible (where only 1 is from offline storage, the rest is from sm) 532: receive_all_ordered(NewAlice, 6), 533: 534: % replace connection 535: NewAlice2 = connect_spec(AliceSpec, session, manual), 536: % allow messages to go to the offline storage 537: mongoose_helper:wait_for_n_offline_messages(NewAlice, 6), 538: 539: send_initial_presence(NewAlice2), 540: 541: % receves messages in correct order 542: receive_all_ordered(NewAlice2, 6), 543: 544: escalus_connection:stop(Bob), 545: escalus_connection:stop(NewAlice2). 546: 547: receive_all_ordered(Conn, Last) -> 548: receive_all_ordered(Conn, 1, Last, []). 549: 550: %% Receive messages from N to Last. 551: %% Ignores acks and presences. 552: %% Handles case when out of order and when not enough stanzas. 553: receive_all_ordered(_Conn, N, Last, Acc) when N > Last -> 554: Texts = lists:map(fun integer_to_binary/1, lists:seq(1, Last)), 555: sm_helper:assert_messages(Acc, Texts); 556: receive_all_ordered(Conn, N, Last, Acc) -> 557: Stanzas = escalus:wait_for_stanzas(Conn, 1), 558: case Stanzas of 559: [#xmlel{name = <<"message">>}] -> 560: receive_all_ordered(Conn, N + 1, Last, Acc ++ Stanzas); 561: [_] -> %% Ack or presence 562: receive_all_ordered(Conn, N, Last, Acc); 563: [] -> 564: ct:fail({timeout_waiting, N, Acc}) 565: end. 566: 567: resend_unacked_after_resume_timeout(Config) -> 568: %% connect bob and alice 569: Bob = connect_fresh(Config, bob, presence), 570: Alice = connect_fresh(Config, alice, sr_presence), 571: AliceSpec = sm_helper:client_to_spec(Alice), 572: 573: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 574: %% kill alice connection 575: escalus_connection:kill(Alice), 576: 577: %% ensure there is no session 578: C2SPid = mongoose_helper:get_session_pid(Alice), 579: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 580: 581: %% alice come back and receives unacked message 582: NewAlice = connect_spec(AliceSpec, session), 583: send_initial_presence(NewAlice), 584: 585: escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], 586: escalus:wait_for_stanzas(NewAlice, 2)), 587: 588: escalus_connection:stop(Bob), 589: escalus_connection:stop(NewAlice). 590: 591: resume_expired_session_returns_correct_h(Config) -> 592: %% connect bob and alice 593: Bob = connect_fresh(Config, bob, sr_presence), 594: Alice = connect_fresh(Config, alice, sr_presence, manual), 595: get_ack(Alice), 596: 597: %% Bob sends a message to Alice, and Alice receives it but doesn't acknowledge 598: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 599: escalus:assert(is_chat_message, [<<"msg-1">>], escalus:wait_for_stanza(Alice)), 600: %% alice comes back, but too late, so resumption doesn't work, 601: %% but she receives the previous h = 1 anyway 602: %% NewAlice is also manual ack 603: NewAlice = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Alice), 604: FailedResumption = escalus_connection:get_stanza(NewAlice, failed_resumption), 605: <<"1">> = exml_query:attr(FailedResumption, <<"h">>), 606: %% And we can continue with bind and session 607: escalus_session:session(escalus_session:bind(NewAlice)), 608: send_initial_presence(NewAlice), 609: Stanzas = [escalus_connection:get_stanza(NewAlice, {msg, 1}), 610: escalus_connection:get_stanza(NewAlice, {msg, 2})], 611: escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Stanzas), 612: escalus_connection:stop(Bob), 613: escalus_connection:stop(NewAlice). 614: 615: gc_repeat_after_never_means_no_cleaning(Config) -> 616: [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config), 617: {stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID1]), 618: {stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID2]), 619: {stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID3]). 620: 621: gc_repeat_after_timeout_does_clean(Config) -> 622: [{SMID1, _} | _ ] = ?config(smid_test, Config), 623: mongoose_helper:wait_until(fun() -> 624: rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1]) 625: end, 626: {error, smid_not_found}, 627: #{name => smid_garbage_collected}). 628: 629: resume_session_state_send_message_without_ack(Config) -> 630: resume_session_state_send_message_generic(Config, no_ack). 631: 632: resume_session_state_send_message_with_ack(Config) -> 633: resume_session_state_send_message_generic(Config, ack). 634: 635: resume_session_state_send_message_generic(Config, AckInitialPresence) -> 636: %% connect bob and alice 637: Bob = connect_fresh(Config, bob, presence), 638: Alice = connect_fresh(Config, alice, sr_presence, manual), 639: maybe_ack_initial_presence(Alice, AckInitialPresence), 640: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 641: %% kill alice connection 642: C2SPid = mongoose_helper:get_session_pid(Alice), 643: escalus_connection:kill(Alice), 644: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 645: sm_helper:assert_alive_resources(Alice, 1), 646: 647: %% send some messages and check if c2s can handle it 648: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)), 649: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)), 650: %% suspend the process to ensure that Alice has enough time to reconnect, 651: %% before resumption timeout occurs. 652: ok = rpc(mim(), sys, suspend, [C2SPid]), 653: 654: %% alice comes back and receives unacked message 655: NewAlice = connect_same(Alice, presence), 656: %% now we can resume c2s process of the old connection 657: %% and let it process session resumption timeout 658: ok = rpc(mim(), sys, resume, [C2SPid]), 659: Stanzas = escalus:wait_for_stanzas(NewAlice, 3), 660: 661: % what about order ? 662: % alice receive presence from herself and 3 unacked messages from bob 663: escalus_new_assert:mix_match([is_chat(<<"msg-1">>), 664: is_chat(<<"msg-2">>), 665: is_chat(<<"msg-3">>)], 666: Stanzas), 667: escalus_connection:stop(Bob), 668: escalus_connection:stop(NewAlice). 669: 670: %%for instance it can be done by mod ping 671: resume_session_state_stop_c2s(Config) -> 672: Bob = connect_fresh(Config, bob, presence), 673: Alice = connect_fresh(Config, alice, sr_presence, manual), 674: 675: get_ack(Alice), 676: ack_initial_presence(Alice), 677: 678: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 679: escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)), 680: 681: %% get pid of c2s 682: C2SPid = mongoose_helper:get_session_pid(Alice), 683: %% Wait c2s process to process our presence ack. 684: %% Otherwise, we can receive two initial presences sometimes. 685: sm_helper:wait_for_c2s_unacked_count(C2SPid, 1), 686: 687: % kill alice connection 688: escalus_connection:kill(Alice), 689: % session should be alive 690: sm_helper:assert_alive_resources(Alice, 1), 691: rpc(mim(), ejabberd_c2s, stop, [C2SPid]), 692: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 693: %% suspend the process to ensure that Alice has enough time to reconnect, 694: %% before resumption timeout occurs. 695: ok = rpc(mim(), sys, suspend, [C2SPid]), 696: 697: %% alice comes back and receives unacked message 698: NewAlice = connect_same(Alice, presence, manual), 699: %% now we can resume c2s process of the old connection 700: %% and let it process session resumption timeout 701: ok = rpc(mim(), sys, resume, [C2SPid]), 702: 703: escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(NewAlice, msg)), 704: escalus_connection:stop(Bob), 705: escalus_connection:stop(NewAlice). 706: 707: %% This test only verifies the validity of helpers (get_session_pid, 708: %% get_c2s_state_name) written for wait_for_resumption 709: %% testcase. 710: session_established(Config) -> 711: Alice = connect_fresh(Config, alice, presence), 712: C2SPid = mongoose_helper:get_session_pid(Alice), 713: session_established = mongoose_helper:get_c2s_state_name(C2SPid), 714: escalus_connection:stop(Alice). 715: 716: %% Ensure that after a violent disconnection, 717: %% the c2s waits for resumption (but don't resume yet). 718: wait_for_resumption(Config) -> 719: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 720: Bob = connect_fresh(Config, bob, session), 721: Texts = three_texts(), 722: {C2SPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts), 723: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session). 724: 725: unacknowledged_message_hook_filter(Config) -> 726: FilterText = <<"filter">>, 727: Bob = connect_fresh(Config, bob, presence), 728: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 729: Resource = proplists:get_value(username, AliceSpec), 730: HookHandlerExtra = start_filter_hook_listener(FilterText, Resource), 731: Alice = connect_spec([{resource, Resource} | AliceSpec], sr_presence, manual), 732: %% Ack the presence stanza 733: get_ack(Alice), 734: ack_initial_presence(Alice), 735: Messages = [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>, <<"msg-4">>], 736: All = [<<"msg-1">>, FilterText, <<"msg-2">>, FilterText, <<"msg-3">>, <<"msg-4">>], 737: [ escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, Body)) || Body <- All ], 738: %% kill alice connection 739: C2SPid = mongoose_helper:get_session_pid(Alice), 740: escalus_connection:kill(Alice), 741: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 742: sm_helper:assert_alive_resources(Alice, 1), 743: %% ensure second C2S is registered so all the messages are bounced properly 744: NewAlice = connect_spec([{resource, <<"2">>}| AliceSpec], sr_presence, manual), 745: send_initial_presence(NewAlice), 746: sm_helper:wait_for_resource_count(NewAlice, 2), 747: ok = rpc(mim(), sys, terminate, [C2SPid, normal]), 748: %% verify that the filtered message is never received 749: verify_no_receive_filtertext(NewAlice, FilterText, Messages), 750: stop_hook_listener(HookHandlerExtra), 751: escalus_connection:stop(Bob). 752: 753: verify_no_receive_filtertext(_, _, []) -> 754: ok; 755: verify_no_receive_filtertext(Client, FilterText, Messages) -> 756: case escalus:wait_for_stanzas(Client, 1, 500) of 757: [] -> ct:fail("Messages pending: ~p~n", [Messages]); 758: [St] -> 759: case exml_query:path(St, [{element, <<"body">>}, cdata]) of 760: undefined -> 761: verify_no_receive_filtertext(Client, FilterText, Messages); 762: FilterText -> 763: ct:fail("Composing forwarded from a different c2s process"); 764: Message -> 765: Pred = fun(Msg) -> Msg =/= Message end, 766: {Pending, _} = lists:partition(Pred, Messages), 767: verify_no_receive_filtertext(Client, FilterText, Pending) 768: end 769: end. 770: 771: unacknowledged_message_hook_resume(Config) -> 772: unacknowledged_message_hook_common(fun unacknowledged_message_hook_resume/4, Config). 773: 774: unacknowledged_message_hook_resume(AliceSpec, Resource, SMID, _C2SPid) -> 775: NewAlice = connect_spec(AliceSpec, {resume, SMID, 1}, manual), 776: send_initial_presence(NewAlice), 777: {Resource, NewAlice}. 778: 779: unacknowledged_message_hook_bounce(Config) -> 780: unacknowledged_message_hook_common(fun unacknowledged_message_hook_bounce/4, Config). 781: 782: unacknowledged_message_hook_bounce(AliceSpec, Resource, _SMID, C2SPid) -> 783: NewResource = <<"new_", Resource/binary>>, 784: NewSpec = lists:keystore(resource, 1, AliceSpec, {resource, NewResource}), 785: NewAlice = connect_spec(NewSpec, sr_session, manual), 786: send_initial_presence(NewAlice), 787: %% ensure second C2S is registered so all the messages are bounced properly 788: sm_helper:wait_for_resource_count(NewAlice, 2), 789: ok = rpc(mim(), sys, terminate, [C2SPid, normal]), 790: {NewResource, NewAlice}. 791: 792: unacknowledged_message_hook_offline(Config) -> 793: unacknowledged_message_hook_common(fun unacknowledged_message_hook_offline/4, Config). 794: 795: unacknowledged_message_hook_offline(AliceSpec, Resource, _SMID, C2SPid) -> 796: C2SRef = erlang:monitor(process, C2SPid), 797: %%reset the session, so old C2S process is stopped 798: NewAlice = connect_spec(AliceSpec, sr_session, manual), 799: %% wait for old C2S termination before send presence. other way 800: %% some of the latest unacknowledged messages can be bounced to 801: %% the new C2S process instead of going to the mod_offline storage. 802: %% looks like all the unacknowledged messages arrive to the new 803: %% C2S, but the message sequence is broken (the bounced messages 804: %% delivered before the messages from the mod_offline storage) 805: sm_helper:wait_for_process_termination(C2SRef), 806: send_initial_presence(NewAlice), 807: {Resource, NewAlice}. 808: 809: unacknowledged_message_hook_common(RestartConnectionFN, Config) -> 810: %% connect bob and alice 811: Bob = connect_fresh(Config, bob, presence), 812: 813: AliceSpec0 = escalus_fresh:create_fresh_user(Config, alice), 814: Resource = proplists:get_value(username, AliceSpec0), 815: AliceSpec = [{resource, Resource} | AliceSpec0], 816: HookHandlerExtra = start_hook_listener(Resource), 817: Alice = connect_spec(AliceSpec, sr_presence, manual), 818: %% Ack the presence stanza 819: get_ack(Alice), 820: ack_initial_presence(Alice), 821: 822: SMID = sm_helper:client_to_smid(Alice), 823: 824: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 825: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)), 826: %% kill alice connection 827: C2SPid = mongoose_helper:get_session_pid(Alice), 828: escalus_connection:kill(Alice), 829: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 830: sm_helper:assert_alive_resources(Alice, 1), 831: 832: escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(0, Resource, 100)), 833: escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(0, Resource, 100)), 834: ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), 835: 836: %% send some messages and check if c2s can handle it 837: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)), 838: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-4">>)), 839: escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(0, Resource, 100)), 840: escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(0, Resource, 100)), 841: ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), 842: 843: %% alice comes back and receives unacked message 844: {NewResource, NewAlice} = RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid), 845: 846: mongoose_helper:wait_until( 847: fun() -> 848: Stanza = escalus_connection:get_stanza(NewAlice, msg), 849: escalus:assert(is_chat_message, [<<"msg-4">>], Stanza), 850: ok 851: end, ok), 852: 853: NewC2SPid = mongoose_helper:get_session_pid(NewAlice), 854: escalus_connection:kill(NewAlice), 855: mongoose_helper:wait_for_c2s_state_name(NewC2SPid, resume_session), 856: 857: escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 858: escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 859: escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 860: escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 861: ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), 862: stop_hook_listener(HookHandlerExtra), 863: escalus_connection:stop(Bob). 864: 865: resume_session(Config) -> 866: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 867: Texts = three_texts(), 868: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 869: {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts), 870: %% Resume the session. 871: Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual), 872: %% Alice receives the unacked messages from the previous 873: %% interrupted session. 874: sm_helper:wait_for_messages(Alice, Texts), 875: %% Alice acks the received messages. 876: escalus_connection:send(Alice, escalus_stanza:sm_ack(5)), 877: escalus_connection:stop(Alice) 878: end). 879: 880: resume_session_with_wrong_h_does_not_leak_sessions(Config) -> 881: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 882: Messages = three_texts(), 883: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 884: {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages), 885: %% Resume the session. 886: Alice = connect_spec(AliceSpec, auth, manual), 887: Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 30), 888: escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed), 889: 890: [] = sm_helper:get_user_present_resources(Alice), 891: HostType = host_type(), 892: {error, smid_not_found} = sm_helper:get_sid_by_stream_id(HostType, SMID), 893: escalus_connection:wait_for_close(Alice, timer:seconds(5)) 894: end). 895: 896: resume_session_with_wrong_sid_returns_item_not_found(Config) -> 897: session_resumption_expects_item_not_found(Config, <<"wrong-sid">>). 898: 899: resume_session_with_wrong_namespace_is_a_noop(Config) -> 900: Alice = connect_fresh(Config, alice, auth), 901: #xmlel{attrs = Attrs} = Resume = escalus_stanza:resume(<<"doesnt_matter">>, 4), 902: Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, <<"not-stream-mgnt">>}), 903: escalus_connection:send(Alice, Resume#xmlel{attrs = Attrs2}), 904: escalus_assert:has_no_stanzas(Alice), 905: [] = sm_helper:get_user_present_resources(Alice), 906: true = escalus_connection:is_connected(Alice), 907: escalus_connection:stop(Alice). 908: 909: resume_dead_session_results_in_item_not_found(Config) -> 910: SMID = base64:encode(crypto:strong_rand_bytes(21)), 911: SID = {os:timestamp(), undefined}, 912: HostType = host_type(), 913: rpc(mim(), ?MOD_SM, register_smid, [HostType, SMID, SID]), 914: session_resumption_expects_item_not_found(Config, SMID). 915: 916: session_resumption_expects_item_not_found(Config, SMID) -> 917: Alice = connect_fresh(Config, alice, auth), 918: Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 2), 919: escalus:assert(is_sm_failed, [<<"item-not-found">>], Resumed), 920: [] = sm_helper:get_user_present_resources(Alice), 921: true = escalus_connection:is_connected(Alice), 922: escalus_connection:stop(Alice). 923: 924: resume_session_kills_old_C2S_gracefully(Config) -> 925: Alice = connect_fresh(Config, alice, sr_presence, manual), 926: C2SPid = mongoose_helper:get_session_pid(Alice), 927: 928: %% Monitor the C2S process and disconnect Alice. 929: MonitorRef = sm_helper:monitor_session(Alice), 930: escalus_client:kill_connection(Config, Alice), 931: 932: %% Ensure the c2s process is waiting for resumption. 933: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 934: 935: %% Resume the session. 936: NewAlice = connect_resume(Alice, 1), 937: 938: %% C2S process should die gracefully with Reason=normal. 939: sm_helper:wait_for_process_termination(MonitorRef), 940: escalus_connection:stop(NewAlice). 941: 942: buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) -> 943: Alice = connect_spec(AliceSpec, sr_presence, manual), 944: %% Bobs sends some messages to Alice. 945: sm_helper:send_messages(Bob, Alice, Texts), 946: %% Alice receives them, but doesn't ack. 947: sm_helper:wait_for_messages(Alice, Texts), 948: %% Alice's connection is violently terminated. 949: escalus_client:kill_connection(Config, Alice), 950: C2SPid = mongoose_helper:get_session_pid(Alice), 951: SMID = sm_helper:client_to_smid(Alice), 952: {C2SPid, SMID}. 953: 954: aggressively_pipelined_resume(Config) -> 955: AliceSpec = [{manual_ack, true}, {parser_opts, [{start_tag, <<"stream:stream">>}]} 956: | escalus_fresh:create_fresh_user(Config, alice)], 957: UnackedMessages = three_texts(), 958: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 959: {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, UnackedMessages), 960: %% Resume the session. 961: Alice = escalus_connection:connect(AliceSpec), 962: 963: Username = proplists:get_value(username, AliceSpec), 964: Password = proplists:get_value(password, AliceSpec), 965: Payload = <<0:8,Username/binary,0:8,Password/binary>>, 966: Server = proplists:get_value(server, AliceSpec), 967: 968: Stream = escalus_stanza:stream_start(Server, <<"jabber:client">>), 969: Auth = escalus_stanza:auth(<<"PLAIN">>, [#xmlcdata{content = base64:encode(Payload)}]), 970: AuthStream = escalus_stanza:stream_start(Server, <<"jabber:client">>), 971: Resume = escalus_stanza:resume(SMID, 2), 972: 973: escalus_client:send(Alice, [Stream, Auth, AuthStream, Resume]), 974: Messages = [escalus_connection:get_stanza(Alice, {get_resumed, I}) || I <- lists:seq(1, 6)], 975: escalus:assert(is_sm_resumed, [SMID], lists:last(Messages)), 976: 977: escalus_connection:stop(Alice) 978: end). 979: 980: %% This is a regression test for a case when a session processes a request, which will 981: %% receive a response from the server, i.e. will have the same origin SID in mongoose_acc. 982: %% Without proper handling, the reply would be rejected because the resumed session 983: %% has new SID. 984: replies_are_processed_by_resumed_session(Config) -> 985: %% GIVEN a session and registered special IQ handler (added in init_per_testcase), 986: %% that waits for old session process to terminate (at this point new process 987: %% has fully taken over) and then actually sends the reply. 988: Alice = connect_fresh(Config, alice, sr_presence), 989: 990: %% WHEN a client sends IQ request to the special handler... 991: IQReq = escalus_stanza:iq_get(regression_ns(), []), 992: escalus:send(Alice, IQReq), 993: 994: %% ... goes down and session is resumed. 995: Alice2 = sm_helper:kill_and_connect_resume(Alice), 996: 997: %% THEN the client receives the reply properly. 998: IQReply = escalus:wait_for_stanza(Alice2), 999: escalus:assert(is_iq_result, [IQReq], IQReply), 1000: escalus_connection:stop(Alice2). 1001: 1002: %% This is a regression test for a bug, which manifested in following scenario 1003: %% (due to improper presence sub requests buffering): 1004: %% 1. A is online, B is offline 1005: %% 2. A subscribes to B's presence; 1006: %% 3. B becomes online 1007: %% 4. A sends a message to B 1008: %% 5. B doesn't SM-ack the request or message, terminates the connection 1009: %% 6. B reconnects but with session *replace*, not resume 1010: %% 7. Packet rerouting crashes on the buffered sub request, preventing resending whole buffer 1011: %% 8. B doesn't receive the buffered message 1012: subscription_requests_are_buffered_properly(Config) -> 1013: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 1014: MsgBody = <<"buffered">>, 1015: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 1016: % GIVEN Bob's pending subscription to Alice's presence 1017: AliceJid = common_helper:get_bjid(AliceSpec), 1018: escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)), 1019: _RosterPushReq = escalus:wait_for_stanza(Bob), 1020: 1021: % WHEN Alice becomes online... 1022: Alice = connect_spec(AliceSpec, sr_session, manual), 1023: send_initial_presence(Alice), 1024: %% subscribe could come before the initial presence 1025: escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)], 1026: escalus:wait_for_stanzas(Alice, 2)), 1027: 1028: % ...and Bob sends a message to Alice... 1029: escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), 1030: MsgStanza = escalus:wait_for_stanza(Alice), 1031: escalus:assert(is_chat_message, [MsgBody], MsgStanza), 1032: 1033: % ...and Alice terminates connection without acking anything... 1034: escalus_client:kill_connection(Config, Alice), 1035: 1036: % ...and reconnects with session replacement. 1037: Alice2 = connect_spec(AliceSpec, session, manual), 1038: 1039: % THEN Alice receives (without sending initial presence): 1040: % * buffered Bob's message (like above) 1041: % Alice DOESN'T receive: 1042: % * buffered subscription request because it is dropped by ejabberd_sm 1043: % because it's treated like repeated sub request to bare JID, so it's not 1044: % processed by any sub req handler (like mod_roster) 1045: % * buffered available presence from Alice - because it is addressed to another SID 1046: % and Alice2 is a brand new session 1047: escalus:assert(is_chat_message, [MsgBody], escalus:wait_for_stanza(Alice2)), 1048: sm_helper:send_and_receive(Bob, Alice2, <<"flush1">>), 1049: escalus_assert:has_no_stanzas(Alice2), 1050: 1051: %% Only once an initial presence is sent, a subscription request is sent 1052: send_initial_presence(Alice2), 1053: escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)], 1054: escalus:wait_for_stanzas(Alice2, 2)), 1055: 1056: sm_helper:send_and_receive(Bob, Alice2, <<"flush2">>), 1057: escalus_assert:has_no_stanzas(Alice2), 1058: 1059: escalus_connection:stop(Alice2) 1060: end). 1061: 1062: %% This is a regression test for a bug, due to which messages sent to old session 1063: %% in a middle of state handover were not appended properly to SM buffer. 1064: %% Scenario to reproduce: 1065: %% 1. Online Bob and Alice 1066: %% 2. Alice kills the connection 1067: %% 3. Alice's session is suspended 1068: %% 4. Alice resumes session with new connection. At this moment new session is still not 1069: %% present in session table. `resume` request is stuck in old proc mailbox. 1070: %% 5. Bob sends a message to Alice. Only old proc is present in session table so now 1071: %% old session has two messages in mailbox: `resume` and XML from Bob 1072: %% 6. We resume old process and it begins session handover 1073: %% 7. Bob's message is appended to SM buffer in "flush" step 1074: %% 8. With bug fixed, the message is retransmitted properly 1075: messages_are_properly_flushed_during_resumption(Config) -> 1076: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 1077: % GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended 1078: Alice = connect_fresh(Config, alice, sr_presence), 1079: SMH = escalus_connection:get_sm_h(Alice), 1080: escalus_client:kill_connection(Config, Alice), 1081: %% The receiver process would stop now 1082: C2SPid = mongoose_helper:get_session_pid(Alice), 1083: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 1084: 1085: sm_helper:wait_for_queue_length(C2SPid, 0), 1086: ok = rpc(mim(), sys, suspend, [C2SPid]), 1087: 1088: % WHEN new session requests resumption 1089: % we wait until that old session has resumption request enqueued; 1090: % we need it to ensure the order of messages: resume first, Bob's chat second. 1091: % Actual wait and message sent by Bob is done in separate process 1092: % because new client start will block until old process is resumed 1093: 1094: MsgBody = <<"flush-regression">>, 1095: spawn_link(fun() -> 1096: sm_helper:wait_for_queue_length(C2SPid, 1), 1097: 1098: % Bob sends a message... 1099: escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), 1100: 1101: % ...we ensure that a message is enqueued in Alice's session... 1102: % (2 messages = resume request + Bob's message) 1103: sm_helper:wait_for_queue_length(C2SPid, 2), 1104: 1105: % ...and old process is resumed. 1106: ok = rpc(mim(), sys, resume, [C2SPid]) 1107: end), 1108: Alice2 = connect_resume(Alice, SMH), 1109: % THEN Alice's new session receives Bob's message 1110: RecvMsg = escalus:wait_for_stanza(Alice2), 1111: escalus:assert(is_chat_message, [MsgBody], RecvMsg) 1112: end). 1113: 1114: messages_are_properly_flushed_during_resumption_p1_fsm_old(Config) -> 1115: %% the same as messages_are_properly_flushed_during_resumption, 1116: %% but tests that buffered by p1_fsm_old messages are delivered 1117: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 1118: Alice = connect_fresh(Config, alice, sr_presence), 1119: SMH = escalus_connection:get_sm_h(Alice), 1120: escalus_client:kill_connection(Config, Alice), 1121: C2SPid = mongoose_helper:get_session_pid(Alice), 1122: mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session), 1123: ok = rpc(mim(), sys, suspend, [C2SPid]), 1124: 1125: %% send some dummy event. ignored by c2s but ensures that 1126: %% p1_old_fsm buffers the messages, sent after this one 1127: rpc(mim(), p1_fsm_old, send_all_state_event, [C2SPid, dummy_event]), 1128: 1129: MsgBody = <<"flush-regression">>, 1130: spawn_link(fun() -> 1131: sm_helper:wait_for_queue_length(C2SPid, 2), 1132: 1133: % Bob sends a message... 1134: escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), 1135: 1136: % ...we ensure that a message is enqueued in Alice's session... 1137: % (2 messages = resume request + Bob's message) 1138: sm_helper:wait_for_queue_length(C2SPid, 3), 1139: 1140: % ...and old process is resumed. 1141: ok = rpc(mim(), sys, resume, [C2SPid]) 1142: end), 1143: Alice2 = connect_resume(Alice, SMH), 1144: % THEN Alice's new session receives Bob's message 1145: RecvMsg = escalus:wait_for_stanza(Alice2), 1146: escalus:assert(is_chat_message, [MsgBody], RecvMsg) 1147: end). 1148: 1149: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt(Config) -> 1150: Alice = connect_fresh(Config, alice, session, manual), 1151: %% Should not crash anything! 1152: escalus_connection:send(Alice, escalus_stanza:enable_sm()), 1153: Response = escalus_connection:get_stanza(Alice, service_unavailable), 1154: escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response), 1155: escalus_connection:stop(Alice). 1156: 1157: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption(Config) -> 1158: Alice = connect_fresh(Config, alice, session, manual), 1159: %% Should not crash anything! 1160: escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])), 1161: Response = escalus_connection:get_stanza(Alice, service_unavailable), 1162: escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response), 1163: escalus_connection:stop(Alice). 1164: 1165: %%-------------------------------------------------------------------- 1166: %% Helpers 1167: %%-------------------------------------------------------------------- 1168: start_hook_listener(Resource) -> 1169: TestCasePid = self(), 1170: rpc(mim(), ?MODULE, rpc_start_hook_handler, [TestCasePid, Resource, host_type()]). 1171: 1172: stop_hook_listener(HookExtra) -> 1173: rpc(mim(), ?MODULE, rpc_stop_hook_handler, [HookExtra, host_type()]). 1174: 1175: rpc_start_hook_handler(TestCasePid, User, HostType) -> 1176: LUser = jid:nodeprep(User), 1177: Extra = #{luser => LUser, pid => TestCasePid}, 1178: gen_hook:add_handler(unacknowledged_message, HostType, 1179: fun ?MODULE:hook_handler_fn/3, 1180: Extra, 50), 1181: Extra. 1182: 1183: rpc_stop_hook_handler(HookExtra, HostType) -> 1184: gen_hook:delete_handler(unacknowledged_message, HostType, 1185: fun ?MODULE:hook_handler_fn/3, 1186: HookExtra, 50). 1187: 1188: hook_handler_fn(Acc, 1189: #{args := [Jid]} = _Params, 1190: #{luser := LUser, pid := TestCasePid} = _Extra) -> 1191: {U, _S, R} = jid:to_lower(Jid), 1192: case U of 1193: LUser -> 1194: Counter = mongoose_acc:get(sm_test, counter, 0, Acc), 1195: El = mongoose_acc:element(Acc), 1196: TestCasePid ! {sm_test, Counter, R, El}, 1197: {ok, mongoose_acc:set_permanent(sm_test, counter, Counter + 1, Acc)}; 1198: _ -> {ok, Acc} 1199: end. 1200: 1201: wait_for_unacked_msg_hook(Counter, Res, Timeout) -> 1202: receive 1203: {sm_test, AccCounter, Resource, Stanza} = Msg -> 1204: ?assertEqual(Counter, AccCounter, Msg), 1205: ?assertEqual(Res, Resource, Msg), 1206: Stanza 1207: after Timeout -> 1208: timeout 1209: end. 1210: 1211: start_filter_hook_listener(FilterText, Resource) -> 1212: rpc(mim(), ?MODULE, rpc_start_filter_hook_handler, [FilterText, Resource, host_type()]). 1213: 1214: stop_filter_hook_listener(HookExtra) -> 1215: rpc(mim(), ?MODULE, rpc_stop_filter_hook_handler, [HookExtra, host_type()]). 1216: 1217: rpc_start_filter_hook_handler(FilterText, User, HostType) -> 1218: LUser = jid:nodeprep(User), 1219: Extra = #{luser => LUser, filter_text => FilterText}, 1220: gen_hook:add_handler(filter_unacknowledged_messages, HostType, 1221: fun ?MODULE:filter_hook_handler_fn/3, 1222: Extra, 50), 1223: Extra. 1224: 1225: rpc_stop_filter_hook_handler(HookExtra, HostType) -> 1226: gen_hook:delete_handler(filter_unacknowledged_messages, HostType, 1227: fun ?MODULE:filter_hook_handler_fn/3, 1228: HookExtra, 50). 1229: 1230: filter_hook_handler_fn(Buffer, 1231: #{jid := Jid} = _Params, 1232: #{luser := LUser, filter_text := FilterText} = _Extra) -> 1233: {U, _} = jid:to_lus(Jid), 1234: case U of 1235: LUser -> 1236: F = fun(Acc) -> filter_text(Acc, FilterText) end, 1237: NewBuffer = lists:filter(F, Buffer), 1238: {ok, NewBuffer}; 1239: _ -> {ok, Buffer} 1240: end. 1241: 1242: filter_text(Acc, FilterText) -> 1243: case mongoose_acc:stanza_name(Acc) of 1244: <<"message">> -> 1245: El = mongoose_acc:element(Acc), 1246: FilterText =/= exml_query:path(El, [{element, <<"body">>}, cdata]); 1247: _ -> 1248: true 1249: end. 1250: 1251: is_chat(Content) -> 1252: fun(Stanza) -> escalus_pred:is_chat_message(Content, Stanza) end. 1253: 1254: is_presence(Type) -> 1255: fun(Stanza) -> escalus_pred:is_presence_with_type(Type, Stanza) end. 1256: 1257: three_texts() -> 1258: [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>]. 1259: 1260: %%-------------------------------------------------------------------- 1261: %% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session" 1262: %%-------------------------------------------------------------------- 1263: 1264: regression_ns() -> 1265: <<"regression">>. 1266: 1267: register_handler() -> 1268: HostType = host_type(), 1269: rpc(mim(), gen_iq_handler, add_iq_handler_for_domain, 1270: [HostType, regression_ns(), ejabberd_sm, 1271: fun ?MODULE:regression_handler/5, #{}, one_queue]). 1272: 1273: unregister_handler() -> 1274: HostType = host_type(), 1275: rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain, 1276: [HostType, regression_ns(), ejabberd_sm]). 1277: 1278: regression_handler(Acc, _From, _To, IQ, _Extra) -> 1279: %% A bit of a hack - will no longer work when the SID format changes 1280: {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc), 1281: erlang:monitor(process, Pid), 1282: receive 1283: {'DOWN', _, _, _, _} -> 1284: ok 1285: after 1286: 10000 -> 1287: error({c2s_not_stopped_after_timeout, Pid}) 1288: end, 1289: %% We avoid another race condition - there is a short window where user session 1290: %% is not registered in ejabberd_sm: between old process termination and the moment 1291: %% when the new process stores new session in memory. It should be fixed separately. 1292: wait_for_session(mongoose_acc:get(c2s, origin_jid, undefined, Acc), 50, 100), 1293: {Acc, jlib:make_result_iq_reply(IQ)}. 1294: 1295: wait_for_session(JID, Retries, SleepTime) -> 1296: case ejabberd_sm:get_session(JID) of 1297: offline -> 1298: timer:sleep(SleepTime), 1299: wait_for_session(JID, Retries - 1, SleepTime); 1300: _ -> 1301: ok 1302: end. 1303: 1304: maybe_ack_initial_presence(Alice, ack) -> 1305: ack_initial_presence(Alice); 1306: maybe_ack_initial_presence(_Alice, no_ack) -> 1307: ok.