1: %% Stream Management tests 2: -module(sm_SUITE). 3: 4: -compile([export_all, nowarn_export_all]). 5: -include_lib("exml/include/exml.hrl"). 6: -include_lib("eunit/include/eunit.hrl"). 7: -include_lib("escalus/include/escalus.hrl"). 8: -include_lib("common_test/include/ct.hrl"). 9: 10: %% Injected code callbacks 11: -export([rpc_start_hook_handler/3, 12: rpc_stop_hook_handler/2, 13: hook_handler_fn/3, 14: regression_handler/5]). 15: 16: -export([rpc_start_filter_hook_handler/3, 17: rpc_stop_filter_hook_handler/2, 18: filter_hook_handler_fn/3]). 19: 20: -import(distributed_helper, [mim/0, 21: require_rpc_nodes/1, 22: rpc/4]). 23: 24: -import(domain_helper, [host_type/0]). 25: 26: -import(sm_helper, [connect_fresh/3, 27: connect_fresh/4, 28: connect_spec/2, 29: connect_spec/3, 30: connect_same/2, 31: connect_same/3, 32: connect_resume/2, 33: process_initial_stanza/1, 34: send_initial_presence/1, 35: get_ack/1, 36: ack_initial_presence/1]). 37: 38: -define(MOD_SM, mod_stream_management). 39: -define(CONSTRAINT_CHECK_TIMEOUT, 5000). 40: -define(LONG_TIMEOUT, 3600). 41: -define(SHORT_TIMEOUT, 1). 42: -define(SMALL_SM_BUFFER, 3). 43: -define(PING_REQUEST_TIMEOUT, 2). 44: -define(PING_INTERVAL, 5). 45: 46: %%-------------------------------------------------------------------- 47: %% Suite configuration 48: %%-------------------------------------------------------------------- 49: 50: suite() -> 51: require_rpc_nodes([mim]) ++ escalus:suite(). 52: 53: all() -> 54: ct_helper:groups_to_all(groups()). 55: 56: groups() -> 57: G = [ 58: % {parallel, [parallel], parallel_cases()}, 59: % {parallel_manual_ack_freq_1, [parallel], parallel_manual_ack_freq_1_cases()}, 60: % {manual_ack_freq_2, [], manual_ack_freq_2_cases()}, 61: % {stale_h, [], stale_h_cases()}, 62: % {parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()} 63: {ping, [], [ping_timeout]} 64: ], 65: ct_helper:repeat_all_until_any_fail(G, 1000). 66: 67: parallel_cases() -> 68: [server_announces_sm, 69: server_enables_sm_before_session, 70: server_enables_sm_after_session, 71: server_returns_failed_after_start, 72: server_returns_failed_after_auth, 73: server_enables_resumption, 74: client_enables_sm_twice_fails_with_correct_error_stanza, 75: session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza, 76: session_resumed_and_old_session_dead_doesnt_route_error_to_new_session, 77: basic_ack, 78: h_ok_before_session, 79: h_ok_after_session_enabled_before_session, 80: h_ok_after_session_enabled_after_session, 81: h_ok_after_a_chat, 82: h_non_given_closes_stream_gracefully, 83: resend_unacked_on_reconnection, 84: session_established, 85: wait_for_resumption, 86: resume_session, 87: resume_session_with_wrong_h_does_not_leak_sessions, 88: resume_session_with_wrong_sid_returns_item_not_found, 89: resume_session_with_wrong_namespace_is_a_noop, 90: resume_dead_session_results_in_item_not_found, 91: resume_session_kills_old_C2S_gracefully, 92: carboncopy_works, 93: carboncopy_works_after_resume, 94: aggressively_pipelined_resume, 95: replies_are_processed_by_resumed_session, 96: subscription_requests_are_buffered_properly, 97: messages_are_properly_flushed_during_resumption]. 98: 99: parallel_manual_ack_freq_1_cases() -> 100: [client_acks_more_than_sent, 101: too_many_unacked_stanzas, 102: resend_unacked_after_resume_timeout, 103: resume_session_state_send_message_with_ack, 104: resume_session_state_send_message_without_ack, 105: resume_session_state_stop_c2s, 106: server_requests_ack_after_session, 107: resend_more_offline_messages_than_buffer_size, 108: server_requests_ack]. 109: 110: manual_ack_freq_2_cases() -> 111: [server_requests_ack_freq_2]. 112: 113: stale_h_cases() -> 114: [resume_expired_session_returns_correct_h, 115: gc_repeat_after_never_means_no_cleaning, 116: gc_repeat_after_timeout_does_clean]. 117: 118: stream_mgmt_disabled_cases() -> 119: [no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt, 120: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption]. 121: 122: manual_ack_freq_long_session_timeout_cases() -> 123: [preserve_order]. 124: 125: parallel_unacknowledged_message_hook_cases() -> 126: [unacknowledged_message_hook_bounce, 127: unacknowledged_message_hook_offline, 128: unacknowledged_message_hook_resume, 129: unacknowledged_message_hook_filter]. 130: 131: %%-------------------------------------------------------------------- 132: %% Init & teardown 133: %%-------------------------------------------------------------------- 134: 135: init_per_suite(Config) -> 136: NewConfig = dynamic_modules:save_modules(host_type(), Config), 137: NewConfigWithSM = escalus_users:update_userspec(NewConfig, alice, stream_management, true), 138: mongoose_helper:inject_module(?MODULE), 139: escalus:init_per_suite(NewConfigWithSM). 140: 141: end_per_suite(Config) -> 142: escalus_fresh:clean(), 143: dynamic_modules:restore_modules(Config), 144: escalus:end_per_suite(Config). 145: 146: init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hook; 147: Group =:= manual_ack_freq_long_session_timeout; 148: Group =:= parallel_manual_ack_freq_1; 149: Group =:= manual_ack_freq_2 -> 150: dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)), 151: Config; 152: init_per_group(stale_h, Config) -> 153: Config; 154: init_per_group(stream_mgmt_disabled, Config) -> 155: dynamic_modules:stop(host_type(), ?MOD_SM), 156: rpc(mim(), mnesia, delete_table, [sm_session]), 157: Config; 158: init_per_group(ping, Config) -> 159: Config; 160: init_per_group(Group, Config) -> 161: dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)), 162: Config. 163: 164: end_per_group(_Group, _Config) -> 165: ok. 166: 167: init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) -> 168: dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)), 169: escalus:init_per_testcase(CN, Config); 170: init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleaning; 171: CN =:= gc_repeat_after_timeout_does_clean -> 172: dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)), 173: Config2 = register_some_smid_h(Config), 174: escalus:init_per_testcase(CN, Config2); 175: init_per_testcase(ping_timeout = CN, Config) -> 176: ok = rpc(mim(), meck, new, [mod_ping, [passthrough, no_link]]), 177: dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)), 178: escalus:init_per_testcase(CN, Config); 179: init_per_testcase(server_requests_ack_freq_2 = CN, Config) -> 180: escalus:init_per_testcase(CN, Config); 181: init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> 182: register_handler(), 183: escalus:init_per_testcase(CN, Config); 184: init_per_testcase(CaseName, Config) -> 185: escalus:init_per_testcase(CaseName, Config). 186: 187: end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_h; 188: CN =:= gc_repeat_after_never_means_no_cleaning; 189: CN =:= gc_repeat_after_timeout_does_clean -> 190: Name = rpc(mim(), gen_mod, get_module_proc, [host_type(), stream_management_stale_h]), 191: rpc(mim(), ejabberd_sup, stop_child, [Name]), 192: escalus:end_per_testcase(CN, Config); 193: end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) -> 194: unregister_handler(), 195: escalus:end_per_testcase(CN, Config); 196: end_per_testcase(ping_timeout = CN, Config) -> 197: rpc(mim(), meck, unload, [mod_ping]), 198: escalus:end_per_testcase(CN, Config); 199: end_per_testcase(CaseName, Config) -> 200: escalus:end_per_testcase(CaseName, Config). 201: 202: %% Module configuration per group (in case of stale_h group it is per testcase) 203: 204: required_modules(Scope, Name) -> 205: SMConfig = case required_sm_opts(Scope, Name) of 206: stopped -> stopped; 207: ExtraOpts -> maps:merge(common_sm_opts(), ExtraOpts) 208: end, 209: Backend = mongoose_helper:mnesia_or_rdbms_backend(), 210: BaseModules = [ 211: {mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)}, 212: {mod_offline, config_parser_helper:mod_config(mod_offline, #{backend => Backend})} 213: ], 214: case Name of 215: ping_timeout -> 216: BaseModules ++ [{mod_ping, config_parser_helper:mod_config(mod_ping, mod_ping_opts())}]; 217: _ -> 218: BaseModules 219: end. 220: 221: required_sm_opts(group, parallel) -> 222: #{ack_freq => never}; 223: required_sm_opts(group, parallel_manual_ack_freq_1) -> 224: #{ack_freq => 1, 225: resume_timeout => ?LONG_TIMEOUT}; 226: required_sm_opts(group, manual_ack_freq_2) -> 227: #{ack_freq => 2}; 228: required_sm_opts(group, stream_mgmt_disabled) -> 229: stopped; 230: required_sm_opts(group, parallel_unacknowledged_message_hook) -> 231: #{ack_freq => 1}; 232: required_sm_opts(group, manual_ack_freq_long_session_timeout) -> 233: #{ack_freq => 1, buffer_max => 1000}; 234: required_sm_opts(testcase, resume_expired_session_returns_correct_h) -> 235: #{ack_freq => 1, 236: resume_timeout => ?SHORT_TIMEOUT, 237: stale_h => stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT)}; 238: required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) -> 239: #{stale_h => stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT)}; 240: required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) -> 241: #{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)}; 242: required_sm_opts(testcase, ping_timeout) -> 243: #{ack_freq => 1, 244: resume_timeout => ?SHORT_TIMEOUT}. 245: 246: common_sm_opts() -> 247: Backend = ct_helper:get_internal_database(), 248: #{buffer_max => ?SMALL_SM_BUFFER, backend => Backend}. 249: 250: stale_h(RepeatAfter, Geriatric) -> 251: #{enabled => true, 252: repeat_after => RepeatAfter, 253: geriatric => Geriatric}. 254: 255: make_smid() -> 256: base64:encode(crypto:strong_rand_bytes(21)). 257: 258: register_smid(IntSmidId) -> 259: S = {SMID = make_smid(), IntSmidId}, 260: ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]), 261: S. 262: 263: register_some_smid_h(Config) -> 264: TestSmids = lists:map(fun register_smid/1, lists:seq(1, 3)), 265: [{smid_test, TestSmids} | Config]. 266: 267: mod_ping_opts() -> 268: #{send_pings => true, 269: ping_interval => ?PING_INTERVAL, 270: ping_req_timeout => ?PING_REQUEST_TIMEOUT, 271: timeout_action => kill}. 272: 273: %%-------------------------------------------------------------------- 274: %% Tests 275: %%-------------------------------------------------------------------- 276: 277: server_announces_sm(Config) -> 278: AliceSpec = escalus_fresh:freshen_spec(Config, alice), 279: {ok, #client{props = Props}, Features} = escalus_connection:start(AliceSpec, 280: [start_stream]), 281: true = escalus_session:can_use_stream_management(Props, Features). 282: 283: 284: server_enables_sm_before_session(Config) -> 285: connect_fresh(Config, alice, sm_after_bind). 286: 287: server_enables_sm_after_session(Config) -> 288: connect_fresh(Config, alice, sm_after_session). 289: 290: server_returns_failed_after_start(Config) -> 291: Alice = connect_fresh(Config, alice, before_auth), 292: server_returns_failed(Alice). 293: 294: server_returns_failed_after_auth(Config) -> 295: Alice = connect_fresh(Config, alice, auth), 296: server_returns_failed(Alice). 297: 298: server_enables_resumption(Config) -> 299: Alice = connect_fresh(Config, alice, sr_presence), 300: escalus_connection:stop(Alice). 301: 302: server_returns_failed(Alice) -> 303: escalus_connection:send(Alice, escalus_stanza:enable_sm()), 304: escalus:assert(is_sm_failed, [<<"unexpected-request">>], 305: escalus_connection:get_stanza(Alice, enable_sm_failed)). 306: 307: client_enables_sm_twice_fails_with_correct_error_stanza(Config) -> 308: Alice = connect_fresh(Config, alice, sm_before_session), 309: escalus_connection:send(Alice, escalus_stanza:enable_sm()), 310: escalus:assert(is_sm_failed, [<<"unexpected-request">>], 311: escalus_connection:get_stanza(Alice, enable_sm_failed)), 312: escalus:assert(is_stream_end, 313: escalus_connection:get_stanza(Alice, enable_sm_failed)), 314: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)). 315: 316: session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza(Config) -> 317: %% GIVEN USER WITH STREAM RESUMPTION ENABLED 318: Alice = connect_fresh(Config, alice, sr_presence), 319: SMH = escalus_connection:get_sm_h(Alice), 320: %% WHEN USER RESUMES SESSION FROM NEW CLIENT 321: Alice2 = connect_resume(Alice, SMH), 322: process_initial_stanza(Alice2), 323: %% THEN: Old session is gracefully closed with the correct error stanza 324: escalus:assert(is_stream_error, [<<"conflict">>, <<>>], 325: escalus_connection:get_stanza(Alice, close_old_stream)), 326: escalus:assert(is_stream_end, 327: escalus_connection:get_stanza(Alice, close_old_stream)), 328: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)), 329: true = escalus_connection:is_connected(Alice2), 330: escalus_connection:stop(Alice2). 331: 332: session_resumed_and_old_session_dead_doesnt_route_error_to_new_session(Config) -> 333: %% GIVEN USER WITH STREAM RESUMPTION ENABLED 334: Alice = connect_fresh(Config, alice, sr_presence), 335: %% WHEN FIRST SESSION DIES AND USER RESUMES FROM NEW CLIENT 336: Alice2 = sm_helper:kill_and_connect_resume(Alice), 337: process_initial_stanza(Alice2), 338: %% THEN new session does not have any message rerouted 339: false = escalus_client:has_stanzas(Alice2), 340: true = escalus_connection:is_connected(Alice2), 341: escalus_connection:stop(Alice2). 342: 343: basic_ack(Config) -> 344: Alice = connect_fresh(Config, alice, sm_after_session), 345: escalus_connection:send(Alice, escalus_stanza:roster_get()), 346: escalus:assert(is_roster_result, 347: escalus_connection:get_stanza(Alice, roster_result)), 348: escalus_connection:send(Alice, escalus_stanza:sm_request()), 349: escalus:assert(is_sm_ack, 350: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 351: 352: %% Test that "h" value is valid when: 353: %% - SM is enabled *before* the session is established 354: %% - <r/> is sent *before* the session is established 355: h_ok_before_session(Config) -> 356: Alice = connect_fresh(Config, alice, sm_after_bind), 357: escalus_connection:send(Alice, escalus_stanza:sm_request()), 358: escalus:assert(is_sm_ack, [0], 359: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 360: 361: %% Test that "h" value is valid when: 362: %% - SM is enabled *before* the session is established 363: %% - <r/> is sent *after* the session is established 364: h_ok_after_session_enabled_before_session(Config) -> 365: Alice = connect_fresh(Config, alice, sm_before_session), 366: escalus_connection:send(Alice, escalus_stanza:sm_request()), 367: escalus:assert(is_sm_ack, [1], 368: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 369: 370: %% Test that "h" value is valid when: 371: %% - SM is enabled *after* the session is established 372: %% - <r/> is sent *after* the session is established 373: h_ok_after_session_enabled_after_session(Config) -> 374: Alice = connect_fresh(Config, alice, sm_after_session), 375: escalus_connection:send(Alice, escalus_stanza:roster_get()), 376: escalus:assert(is_roster_result, 377: escalus_connection:get_stanza(Alice, roster_result)), 378: escalus_connection:send(Alice, escalus_stanza:sm_request()), 379: escalus:assert(is_sm_ack, [1], 380: escalus_connection:get_stanza(Alice, stream_mgmt_ack)). 381: 382: %% Test that "h" value is valid after exchanging a few messages. 383: h_ok_after_a_chat(ConfigIn) -> 384: Config = escalus_users:update_userspec(ConfigIn, alice, 385: stream_management, true), 386: escalus:fresh_story(Config, [{alice,1}, {bob,1}], fun(Alice, Bob) -> 387: escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi, Bob!">>)), 388: escalus:assert(is_chat_message, [<<"Hi, Bob!">>], 389: escalus:wait_for_stanza(Bob)), 390: escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)), 391: escalus:assert(is_chat_message, [<<"Hi, Alice!">>], 392: escalus:wait_for_stanza(Alice)), 393: escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How's life?">>)), 394: escalus:assert(is_chat_message, [<<"How's life?">>], 395: escalus:wait_for_stanza(Alice)), 396: escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Pretty !@#$%^$">>)), 397: escalus:assert(is_chat_message, [<<"Pretty !@#$%^$">>], 398: escalus:wait_for_stanza(Bob)), 399: escalus:send(Alice, escalus_stanza:sm_request()), 400: escalus:assert(is_sm_ack, [3], escalus:wait_for_stanza(Alice)), 401: %% Ack, so that unacked messages don't go into offline store. 402: escalus:send(Alice, escalus_stanza:sm_ack(3)) 403: end). 404: 405: h_non_given_closes_stream_gracefully(ConfigIn) -> 406: AStanza = #xmlel{name = <<"a">>, 407: attrs = [{<<"xmlns">>, <<"urn:xmpp:sm:3">>}]}, 408: Config = escalus_users:update_userspec(ConfigIn, alice, 409: stream_management, true), 410: escalus:fresh_story(Config, [{alice,1}], fun(Alice) -> 411: C2SPid = mongoose_helper:get_session_pid(Alice), 412: escalus:send(Alice, AStanza), 413: escalus:assert(is_stream_error, 414: [<<"policy-violation">>, <<>>], 415: escalus:wait_for_stanza(Alice)), 416: mongoose_helper:wait_for_pid_to_die(C2SPid), 417: escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)), 418: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)) 419: end). 420: 421: client_acks_more_than_sent(Config) -> 422: Alice = connect_fresh(Config, alice, sm_after_session), 423: escalus:send(Alice, escalus_stanza:sm_ack(5)), 424: StreamErrorStanza = escalus:wait_for_stanza(Alice), 425: %% Assert "undefined-condition" children 426: escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], StreamErrorStanza), 427: %% Assert "handled-count-too-high" children with correct attributes 428: HandledCountSubElement = exml_query:path(StreamErrorStanza, 429: [{element_with_ns, 430: <<"handled-count-too-high">>, 431: <<"urn:xmpp:sm:3">>}]), 432: <<"5">> = exml_query:attr(HandledCountSubElement, <<"h">>), 433: <<"0">> = exml_query:attr(HandledCountSubElement, <<"send-count">>), 434: %% Assert graceful stream end 435: escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)), 436: true = escalus_connection:wait_for_close(Alice, timer:seconds(5)). 437: 438: too_many_unacked_stanzas(Config) -> 439: Bob = connect_fresh(Config, bob, presence), 440: Alice = connect_fresh(Config, alice, sm_presence, manual), 441: get_ack(Alice), 442: [escalus:send(Bob, escalus_stanza:chat_to(Alice, 443: <<(integer_to_binary(N))/binary, ": Hi, Alice!">>)) 444: || N <- lists:seq(1,?SMALL_SM_BUFFER)], 445: escalus:wait_for_stanzas(Alice, ?SMALL_SM_BUFFER * 2), % messages and ack requests 446: escalus:assert(is_stream_error, [<<"resource-constraint">>, 447: <<"too many unacked stanzas">>], 448: %% wait for deferred buffer check 449: escalus:wait_for_stanza(Alice, ?CONSTRAINT_CHECK_TIMEOUT + 1000)). 450: 451: server_requests_ack(Config) -> 452: server_requests_ack(Config, 1). 453: 454: server_requests_ack_freq_2(Config) -> 455: server_requests_ack(Config, 2). 456: 457: server_requests_ack(Config, N) -> 458: Bob = connect_fresh(Config, bob, presence), 459: Alice = connect_fresh(Config, alice, sm_presence, manual), 460: %% ack request after initial presence 461: maybe_assert_ack_request(1, N, Alice), 462: escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)), 463: escalus:assert(is_chat_message, [<<"Hi, Alice!">>], 464: escalus:wait_for_stanza(Alice)), 465: maybe_assert_ack_request(2, N, Alice). 466: 467: maybe_assert_ack_request(StanzasRec, AckRequests, Alice) -> 468: ct:log("StanzasRec: ~p, AckRequests: ~p", [StanzasRec, AckRequests]), 469: case StanzasRec rem AckRequests of 470: 0 -> 471: escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(Alice)); 472: _ -> 473: ok 474: end, 475: StanzasRec. 476: 477: server_requests_ack_after_session(Config) -> 478: Alice = connect_fresh(Config, alice, sm_before_session, manual), 479: escalus:assert(is_sm_ack_request, escalus_connection:get_stanza(Alice, stream_mgmt_req)). 480: 481: resend_more_offline_messages_than_buffer_size(Config) -> 482: %% connect bob and alice 483: Bob = connect_fresh(Config, bob, presence), 484: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 485: 486: % sent some messages - more than unacked buffer size 487: MessagesToSend = ?SMALL_SM_BUFFER + 1, 488: AliceJid = common_helper:get_bjid(AliceSpec), 489: [escalus_connection:send(Bob, escalus_stanza:chat_to(AliceJid, integer_to_binary(I))) 490: || I <- lists:seq(1, MessagesToSend)], 491: mongoose_helper:wait_for_n_offline_messages(AliceJid, MessagesToSend), 492: 493: % connect alice who wants to receive all messages from offline storage 494: Alice = connect_spec(AliceSpec, sm_after_session, manual), 495: send_initial_presence(Alice), 496: escalus:wait_for_stanzas(Alice, MessagesToSend * 2), %messages and ack requests 497: 498: escalus_connection:get_stanza(Alice, presence), 499: get_ack(Alice), % ack request 500: 501: % confirm messages + presence 502: escalus_connection:send(Alice, escalus_stanza:sm_ack(4)), 503: % wait for check constraint message on server side 504: 505: ct:sleep(?CONSTRAINT_CHECK_TIMEOUT + 1000), 506: false = escalus_client:has_stanzas(Alice), 507: % should not receive anything especially any stream errors 508: 509: escalus_connection:stop(Alice), 510: escalus_connection:stop(Bob). 511: 512: resend_unacked_on_reconnection(Config) -> 513: Texts = three_texts(), 514: Bob = connect_fresh(Config, bob, presence), 515: Alice = connect_fresh(Config, alice, sm_presence), 516: AliceSpec = sm_helper:client_to_spec0(Alice), 517: %% Bob sends some messages to Alice. 518: sm_helper:send_messages(Bob, Alice, Texts), 519: %% Alice receives the messages. 520: sm_helper:wait_for_messages(Alice, Texts), 521: %% Alice disconnects without acking the messages. 522: sm_helper:stop_client_and_wait_for_termination(Alice), 523: %% Messages go to the offline store. 524: %% Alice receives the messages from the offline store. 525: NewAlice = connect_spec(AliceSpec, session, manual), 526: send_initial_presence(NewAlice), 527: sm_helper:wait_for_messages(NewAlice, Texts), 528: %% Alice acks the delayed messages so they won't go again 529: %% to the offline store. 530: escalus_connection:send(NewAlice, escalus_stanza:sm_ack(3)). 531: 532: %% Remove wait_for_n_offline_messages and you will get anything, but preserve_order 533: %% TODO Test without wait_for_n_offline_messages. It would require changes in SM 534: %% and more strict tests, reproducing delays in SM and in mod_offline. 535: preserve_order(Config) -> 536: %% connect bob and alice 537: Bob = connect_fresh(Config, bob, presence), 538: Alice = connect_fresh(Config, alice, sr_presence, manual), 539: AliceSpec = sm_helper:client_to_spec(Alice), 540: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"1">>)), 541: 542: %% kill alice connection 543: escalus_connection:kill(Alice), 544: C2SPid = mongoose_helper:get_session_pid(Alice), 545: sm_helper:wait_until_resume_session(C2SPid), 546: 547: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"2">>)), 548: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"3">>)), 549: 550: NewAlice = connect_spec(AliceSpec, session, manual), 551: escalus_connection:send(NewAlice, escalus_stanza:enable_sm([resume])), 552: 553: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"4">>)), 554: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"5">>)), 555: 556: %% Without this check we will get stuff out of order 557: mongoose_helper:wait_for_n_offline_messages(NewAlice, 5), 558: 559: send_initial_presence(NewAlice), 560: %% Without this check we can get "6, 1, 2, 3, 4, 5" messages in the next receive_all_ordered 561: mongoose_helper:wait_for_n_offline_messages(NewAlice, 0), 562: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"6">>)), 563: 564: %% "2, 3, 4, 5, 6, 1" is possible (where only 1 is from offline storage, the rest is from sm) 565: receive_all_ordered(NewAlice, 6), 566: 567: % replace connection 568: NewAlice2 = connect_spec(AliceSpec, session, manual), 569: % allow messages to go to the offline storage 570: mongoose_helper:wait_for_n_offline_messages(NewAlice, 6), 571: 572: send_initial_presence(NewAlice2), 573: 574: % receves messages in correct order 575: receive_all_ordered(NewAlice2, 6), 576: 577: escalus_connection:stop(Bob), 578: escalus_connection:stop(NewAlice2). 579: 580: receive_all_ordered(Conn, Last) -> 581: receive_all_ordered(Conn, 1, Last, []). 582: 583: %% Receive messages from N to Last. 584: %% Ignores acks and presences. 585: %% Handles case when out of order and when not enough stanzas. 586: receive_all_ordered(_Conn, N, Last, Acc) when N > Last -> 587: Texts = lists:map(fun integer_to_binary/1, lists:seq(1, Last)), 588: sm_helper:assert_messages(Acc, Texts); 589: receive_all_ordered(Conn, N, Last, Acc) -> 590: Stanzas = escalus:wait_for_stanzas(Conn, 1), 591: case Stanzas of 592: [#xmlel{name = <<"message">>}] -> 593: receive_all_ordered(Conn, N + 1, Last, Acc ++ Stanzas); 594: [_] -> %% Ack or presence 595: receive_all_ordered(Conn, N, Last, Acc); 596: [] -> 597: ct:fail({timeout_waiting, N, Acc}) 598: end. 599: 600: resend_unacked_after_resume_timeout(Config) -> 601: %% connect bob and alice 602: Bob = connect_fresh(Config, bob, presence), 603: Alice = connect_fresh(Config, alice, sr_presence), 604: AliceSpec = sm_helper:client_to_spec(Alice), 605: 606: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 607: %% kill alice connection 608: escalus_connection:kill(Alice), 609: 610: %% ensure there is no session 611: C2SPid = mongoose_helper:get_session_pid(Alice), 612: sm_helper:wait_until_resume_session(C2SPid), 613: 614: %% alice come back and receives unacked message 615: NewAlice = connect_spec(AliceSpec, session), 616: send_initial_presence(NewAlice), 617: 618: escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], 619: escalus:wait_for_stanzas(NewAlice, 2)), 620: 621: escalus_connection:stop(Bob), 622: escalus_connection:stop(NewAlice). 623: 624: ping_timeout(Config) -> 625: %% make sure there are no leftover stanzas in the history 626: ?assertEqual([], get_stanzas_filtered_by_mod_ping()), 627: 628: %% connect Alice and wait for the session to close 629: % Alice = connect_fresh(Config, alice, sr_session), 630: Alice = connect_fresh(Config, alice, session), 631: escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])), 632: Pred = fun(Stanza) -> escalus_pred:is_sm_enabled([resume], Stanza) end, 633: SM = escalus_connection:receive_stanza(Alice, #{ pred => Pred }), 634: SMH = escalus_connection:get_sm_h(Alice), 635: SMID = exml_query:attr(SM, <<"id">>), 636: % SMID = sm_helper:client_to_smid(Alice), 637: 638: escalus_client:wait_for_stanza(Alice), 639: ct:sleep(?PING_REQUEST_TIMEOUT + ?PING_INTERVAL + timer:seconds(1)), 640: 641: %% attempt to resume the session after the connection drop 642: sm_helper:kill_client_and_wait_for_termination(Alice), 643: NewAlice = connect_same(Alice, auth), 644: escalus_connection:send(NewAlice, escalus_stanza:resume(SMID, SMH)), 645: 646: %% after resume_timeout, we expect the session to be closed 647: escalus_connection:get_stanza(NewAlice, failed_resumption), 648: 649: %% bind a new session and expect unacknowledged messages to be resent 650: escalus_session:session(escalus_session:bind(NewAlice)), 651: send_initial_presence(NewAlice), 652: 653: %% check if the error stanza was handled by mod_ping 654: FilteredStanzas = get_stanzas_filtered_by_mod_ping(), 655: ?assertNotEqual(length(FilteredStanzas), 0), 656: Fun = fun(Stanza) -> 657: escalus:assert(is_iq_error, Stanza), 658: ?assertNotEqual(undefined, 659: exml_query:subelement_with_name_and_ns(Stanza, <<"ping">>, <<"urn:xmpp:ping">>)) 660: end, 661: lists:foreach(Fun, FilteredStanzas), 662: 663: %% kill the connection to avoid errors 664: escalus_connection:kill(NewAlice). 665: 666: resume_expired_session_returns_correct_h(Config) -> 667: %% connect bob and alice 668: Bob = connect_fresh(Config, bob, sr_presence), 669: Alice = connect_fresh(Config, alice, sr_presence, manual), 670: get_ack(Alice), 671: 672: %% Bob sends a message to Alice, and Alice receives it but doesn't acknowledge 673: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 674: escalus:assert(is_chat_message, [<<"msg-1">>], escalus:wait_for_stanza(Alice)), 675: %% alice comes back, but too late, so resumption doesn't work, 676: %% but she receives the previous h = 1 anyway 677: %% NewAlice is also manual ack 678: NewAlice = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Alice), 679: FailedResumption = escalus_connection:get_stanza(NewAlice, failed_resumption), 680: <<"1">> = exml_query:attr(FailedResumption, <<"h">>), 681: %% And we can continue with bind and session 682: escalus_session:session(escalus_session:bind(NewAlice)), 683: send_initial_presence(NewAlice), 684: Stanzas = [escalus_connection:get_stanza(NewAlice, {msg, 1}), 685: escalus_connection:get_stanza(NewAlice, {msg, 2})], 686: escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Stanzas), 687: escalus_connection:stop(Bob), 688: escalus_connection:stop(NewAlice). 689: 690: gc_repeat_after_never_means_no_cleaning(Config) -> 691: [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config), 692: {stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID1]), 693: {stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID2]), 694: {stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID3]). 695: 696: gc_repeat_after_timeout_does_clean(Config) -> 697: [{SMID1, _} | _ ] = ?config(smid_test, Config), 698: mongoose_helper:wait_until(fun() -> 699: rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1]) 700: end, 701: {error, smid_not_found}, 702: #{name => smid_garbage_collected}). 703: 704: resume_session_state_send_message_without_ack(Config) -> 705: resume_session_state_send_message_generic(Config, no_ack). 706: 707: resume_session_state_send_message_with_ack(Config) -> 708: resume_session_state_send_message_generic(Config, ack). 709: 710: resume_session_state_send_message_generic(Config, AckInitialPresence) -> 711: %% connect bob and alice 712: Bob = connect_fresh(Config, bob, presence), 713: Alice = connect_fresh(Config, alice, sr_presence, manual), 714: maybe_ack_initial_presence(Alice, AckInitialPresence), 715: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 716: %% kill alice connection 717: C2SPid = mongoose_helper:get_session_pid(Alice), 718: escalus_connection:kill(Alice), 719: sm_helper:wait_until_resume_session(C2SPid), 720: sm_helper:assert_alive_resources(Alice, 1), 721: 722: %% send some messages and check if c2s can handle it 723: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)), 724: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)), 725: %% suspend the process to ensure that Alice has enough time to reconnect, 726: %% before resumption timeout occurs. 727: ok = rpc(mim(), sys, suspend, [C2SPid]), 728: 729: %% alice comes back and receives unacked message 730: NewAlice = connect_same(Alice, presence), 731: %% now we can resume c2s process of the old connection 732: %% and let it process session resumption timeout 733: ok = rpc(mim(), sys, resume, [C2SPid]), 734: Stanzas = escalus:wait_for_stanzas(NewAlice, 3), 735: 736: % what about order ? 737: % alice receive presence from herself and 3 unacked messages from bob 738: escalus_new_assert:mix_match([is_chat(<<"msg-1">>), 739: is_chat(<<"msg-2">>), 740: is_chat(<<"msg-3">>)], 741: Stanzas), 742: escalus_connection:stop(Bob), 743: escalus_connection:stop(NewAlice). 744: 745: %%for instance it can be done by mod ping 746: resume_session_state_stop_c2s(Config) -> 747: Bob = connect_fresh(Config, bob, presence), 748: Alice = connect_fresh(Config, alice, sr_presence, manual), 749: 750: get_ack(Alice), 751: ack_initial_presence(Alice), 752: 753: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 754: escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)), 755: 756: %% get pid of c2s 757: C2SPid = mongoose_helper:get_session_pid(Alice), 758: %% Wait c2s process to process our presence ack. 759: %% Otherwise, we can receive two initial presences sometimes. 760: sm_helper:wait_for_c2s_unacked_count(C2SPid, 1), 761: 762: % kill alice connection 763: escalus_connection:kill(Alice), 764: % session should be alive 765: sm_helper:assert_alive_resources(Alice, 1), 766: rpc(mim(), mongoose_c2s, stop, [C2SPid, normal]), 767: sm_helper:wait_until_resume_session(C2SPid), 768: %% suspend the process to ensure that Alice has enough time to reconnect, 769: %% before resumption timeout occurs. 770: ok = rpc(mim(), sys, suspend, [C2SPid]), 771: 772: %% alice comes back and receives unacked message 773: NewAlice = connect_same(Alice, presence, manual), 774: %% now we can resume c2s process of the old connection 775: %% and let it process session resumption timeout 776: ok = rpc(mim(), sys, resume, [C2SPid]), 777: 778: escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(NewAlice, msg)), 779: escalus_connection:stop(Bob), 780: escalus_connection:stop(NewAlice). 781: 782: %% This test only verifies the validity of helpers (get_session_pid, 783: %% get_c2s_state_name) written for wait_for_resumption 784: %% testcase. 785: session_established(Config) -> 786: Alice = connect_fresh(Config, alice, presence), 787: C2SPid = mongoose_helper:get_session_pid(Alice), 788: session_established = mongoose_helper:get_c2s_state_name(C2SPid), 789: escalus_connection:stop(Alice). 790: 791: %% Ensure that after a violent disconnection, 792: %% the c2s waits for resumption (but don't resume yet). 793: wait_for_resumption(Config) -> 794: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 795: Bob = connect_fresh(Config, bob, session), 796: Texts = three_texts(), 797: {C2SPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts), 798: sm_helper:wait_until_resume_session(C2SPid). 799: 800: unacknowledged_message_hook_filter(Config) -> 801: FilterText = <<"filter">>, 802: Bob = connect_fresh(Config, bob, presence), 803: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 804: Resource = proplists:get_value(username, AliceSpec), 805: HookHandlerExtra = start_filter_hook_listener(FilterText, Resource), 806: Alice = connect_spec([{resource, Resource} | AliceSpec], sr_presence, manual), 807: %% Ack the presence stanza 808: get_ack(Alice), 809: ack_initial_presence(Alice), 810: Messages = [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>, <<"msg-4">>], 811: All = [<<"msg-1">>, FilterText, <<"msg-2">>, FilterText, <<"msg-3">>, <<"msg-4">>], 812: [ escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, Body)) || Body <- All ], 813: %% kill alice connection 814: C2SPid = mongoose_helper:get_session_pid(Alice), 815: escalus_connection:kill(Alice), 816: sm_helper:wait_until_resume_session(C2SPid), 817: sm_helper:assert_alive_resources(Alice, 1), 818: %% ensure second C2S is registered so all the messages are bounced properly 819: NewAlice = connect_spec([{resource, <<"2">>}| AliceSpec], sr_presence, manual), 820: send_initial_presence(NewAlice), 821: sm_helper:wait_for_resource_count(NewAlice, 2), 822: ok = rpc(mim(), sys, terminate, [C2SPid, normal]), 823: %% verify that the filtered message is never received 824: verify_no_receive_filtertext(NewAlice, FilterText, Messages), 825: stop_hook_listener(HookHandlerExtra), 826: escalus_connection:stop(Bob). 827: 828: verify_no_receive_filtertext(_, _, []) -> 829: ok; 830: verify_no_receive_filtertext(Client, FilterText, Messages) -> 831: case escalus:wait_for_stanzas(Client, 1, 500) of 832: [] -> ct:fail("Messages pending: ~p~n", [Messages]); 833: [St] -> 834: case exml_query:path(St, [{element, <<"body">>}, cdata]) of 835: undefined -> 836: verify_no_receive_filtertext(Client, FilterText, Messages); 837: FilterText -> 838: ct:fail("Composing forwarded from a different c2s process"); 839: Message -> 840: Pred = fun(Msg) -> Msg =/= Message end, 841: {Pending, _} = lists:partition(Pred, Messages), 842: verify_no_receive_filtertext(Client, FilterText, Pending) 843: end 844: end. 845: 846: unacknowledged_message_hook_resume(Config) -> 847: unacknowledged_message_hook_common(fun unacknowledged_message_hook_resume/4, Config). 848: 849: unacknowledged_message_hook_resume(AliceSpec, Resource, SMID, _C2SPid) -> 850: NewAlice = connect_spec(AliceSpec, {resume, SMID, 1}, manual), 851: send_initial_presence(NewAlice), 852: {Resource, NewAlice}. 853: 854: unacknowledged_message_hook_bounce(Config) -> 855: unacknowledged_message_hook_common(fun unacknowledged_message_hook_bounce/4, Config). 856: 857: unacknowledged_message_hook_bounce(AliceSpec, Resource, _SMID, C2SPid) -> 858: NewResource = <<"new_", Resource/binary>>, 859: NewSpec = lists:keystore(resource, 1, AliceSpec, {resource, NewResource}), 860: NewAlice = connect_spec(NewSpec, sr_session, manual), 861: send_initial_presence(NewAlice), 862: %% ensure second C2S is registered so all the messages are bounced properly 863: sm_helper:wait_for_resource_count(NewAlice, 2), 864: ok = rpc(mim(), sys, terminate, [C2SPid, normal]), 865: {NewResource, NewAlice}. 866: 867: unacknowledged_message_hook_offline(Config) -> 868: unacknowledged_message_hook_common(fun unacknowledged_message_hook_offline/4, Config). 869: 870: unacknowledged_message_hook_offline(AliceSpec, Resource, _SMID, C2SPid) -> 871: C2SRef = erlang:monitor(process, C2SPid), 872: sm_helper:wait_for_process_termination(C2SRef), 873: %% reset the session, so old C2S process is stopped 874: NewAlice = connect_spec(AliceSpec, sr_session, manual), 875: %% wait for old C2S termination before send presence. other way 876: %% some of the latest unacknowledged messages can be bounced to 877: %% the new C2S process instead of going to the mod_offline storage. 878: %% looks like all the unacknowledged messages arrive to the new 879: %% C2S, but the message sequence is broken (the bounced messages 880: %% delivered before the messages from the mod_offline storage) 881: send_initial_presence(NewAlice), 882: {Resource, NewAlice}. 883: 884: unacknowledged_message_hook_common(RestartConnectionFN, Config) -> 885: %% connect bob and alice 886: Bob = connect_fresh(Config, bob, presence), 887: 888: AliceSpec0 = escalus_fresh:create_fresh_user(Config, alice), 889: Resource = proplists:get_value(username, AliceSpec0), 890: AliceSpec = [{resource, Resource} | AliceSpec0], 891: HookHandlerExtra = start_hook_listener(Resource), 892: Alice = connect_spec(AliceSpec, sr_presence, manual), 893: %% Ack the presence stanza 894: get_ack(Alice), 895: ack_initial_presence(Alice), 896: 897: SMID = sm_helper:client_to_smid(Alice), 898: 899: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)), 900: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)), 901: %% kill alice connection 902: C2SPid = mongoose_helper:get_session_pid(Alice), 903: escalus_connection:kill(Alice), 904: sm_helper:wait_until_resume_session(C2SPid), 905: sm_helper:assert_alive_resources(Alice, 1), 906: 907: escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(0, Resource, 100)), 908: escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(0, Resource, 100)), 909: ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), 910: 911: %% send some messages and check if c2s can handle it 912: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)), 913: escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-4">>)), 914: escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(0, Resource, 100)), 915: escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(0, Resource, 100)), 916: ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), 917: 918: %% alice comes back and receives unacked message 919: {NewResource, NewAlice} = RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid), 920: 921: mongoose_helper:wait_until( 922: fun() -> 923: Stanza = escalus_connection:get_stanza(NewAlice, msg), 924: escalus:assert(is_chat_message, [<<"msg-4">>], Stanza), 925: ok 926: end, ok), 927: 928: NewC2SPid = mongoose_helper:get_session_pid(NewAlice), 929: escalus_connection:kill(NewAlice), 930: sm_helper:wait_until_resume_session(NewC2SPid), 931: 932: escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 933: escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 934: escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 935: escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(1, NewResource, 100)), 936: ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)), 937: stop_hook_listener(HookHandlerExtra), 938: escalus_connection:stop(Bob). 939: 940: resume_session(Config) -> 941: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 942: Texts = three_texts(), 943: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 944: {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts), 945: %% Resume the session. 946: Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual), 947: %% Alice receives the unacked messages from the previous 948: %% interrupted session. 949: sm_helper:wait_for_messages(Alice, Texts), 950: %% Alice acks the received messages. 951: escalus_connection:send(Alice, escalus_stanza:sm_ack(5)), 952: escalus_connection:stop(Alice) 953: end). 954: 955: resume_session_with_wrong_h_does_not_leak_sessions(Config) -> 956: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 957: Messages = three_texts(), 958: HostType = host_type(), 959: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 960: {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages), 961: %% Resume the session. 962: Alice = connect_spec(AliceSpec, auth, manual), 963: Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 30), 964: escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed), 965: escalus_connection:wait_for_close(Alice, timer:seconds(5)), 966: Fun = fun() -> 967: [] = sm_helper:get_user_present_resources(Alice), 968: sm_helper:get_sid_by_stream_id(HostType, SMID) 969: end, 970: mongoose_helper:wait_until(Fun, {error, smid_not_found}, #{name => smid_is_cleaned}) 971: end). 972: 973: resume_session_with_wrong_sid_returns_item_not_found(Config) -> 974: session_resumption_expects_item_not_found(Config, <<"wrong-sid">>). 975: 976: resume_session_with_wrong_namespace_is_a_noop(Config) -> 977: Alice = connect_fresh(Config, alice, auth), 978: #xmlel{attrs = Attrs} = Resume = escalus_stanza:resume(<<"doesnt_matter">>, 4), 979: Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, <<"not-stream-mgnt">>}), 980: escalus_connection:send(Alice, Resume#xmlel{attrs = Attrs2}), 981: escalus_assert:has_no_stanzas(Alice), 982: [] = sm_helper:get_user_present_resources(Alice), 983: true = escalus_connection:is_connected(Alice), 984: escalus_connection:stop(Alice). 985: 986: resume_dead_session_results_in_item_not_found(Config) -> 987: SMID = base64:encode(crypto:strong_rand_bytes(21)), 988: SID = {os:timestamp(), undefined}, 989: HostType = host_type(), 990: rpc(mim(), ?MOD_SM, register_smid, [HostType, SMID, SID]), 991: session_resumption_expects_item_not_found(Config, SMID). 992: 993: session_resumption_expects_item_not_found(Config, SMID) -> 994: Alice = connect_fresh(Config, alice, auth), 995: Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 2), 996: escalus:assert(is_sm_failed, [<<"item-not-found">>], Resumed), 997: [] = sm_helper:get_user_present_resources(Alice), 998: true = escalus_connection:is_connected(Alice), 999: escalus_connection:stop(Alice). 1000: 1001: resume_session_kills_old_C2S_gracefully(Config) -> 1002: Alice = connect_fresh(Config, alice, sr_presence, manual), 1003: C2SPid = mongoose_helper:get_session_pid(Alice), 1004: 1005: %% Monitor the C2S process and disconnect Alice. 1006: MonitorRef = sm_helper:monitor_session(Alice), 1007: escalus_client:kill_connection(Config, Alice), 1008: 1009: %% Ensure the c2s process is waiting for resumption. 1010: sm_helper:wait_until_resume_session(C2SPid), 1011: 1012: %% Resume the session. 1013: NewAlice = connect_resume(Alice, 1), 1014: 1015: %% C2S process should die gracefully with Reason=normal. 1016: sm_helper:wait_for_process_termination(MonitorRef), 1017: escalus_connection:stop(NewAlice). 1018: 1019: carboncopy_works(Config) -> 1020: escalus:fresh_story(Config, [{alice, 2}, {bob, 1}], fun(Alice1, Alice, Bob) -> 1021: mongoose_helper:enable_carbons([Alice1, Alice]), 1022: escalus_connection:send(Bob, escalus_stanza:chat_to(Alice1, <<"msg-4">>)), 1023: sm_helper:wait_for_messages(Alice1, [<<"msg-4">>]), 1024: carboncopy_helper:wait_for_carbon_chat_with_body(Alice, <<"msg-4">>, #{from => Bob, to => Alice1}) 1025: end). 1026: 1027: carboncopy_works_after_resume(Config) -> 1028: Texts = three_texts(), 1029: escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice1, Bob) -> 1030: AliceSpec = [{resource, <<"res2">>} | sm_helper:client_to_spec(Alice1)], 1031: F = fun(Alice2) -> 1032: [escalus:assert(is_presence_with_type, [<<"available">>], escalus:wait_for_stanza(A)) || A <- [Alice1, Alice2]], 1033: mongoose_helper:enable_carbons([Alice1, Alice2]) 1034: end, 1035: {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, F), 1036: %% Resume the session. 1037: Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual), 1038: wait_for_carbon_with_bodies(Alice1, Texts, #{from => Bob, to => Alice}), 1039: %% Get a presence from Alice1 again 1040: escalus:assert(is_presence_with_type, [<<"available">>], escalus:wait_for_stanza(Alice)), 1041: %% Alice receives an IQ result from the carbon copy enable request 1042: escalus:assert(is_iq_result, [], escalus:wait_for_stanza(Alice)), 1043: %% Alice receives the unacked messages from the previous 1044: %% interrupted session. 1045: sm_helper:wait_for_messages(Alice, Texts), 1046: %% Alice acks the received messages. 1047: escalus_connection:send(Alice, escalus_stanza:sm_ack(5)), 1048: %% Direct send 1049: escalus_connection:send(Bob, escalus_stanza:chat_to(Alice1, <<"msg-4">>)), 1050: sm_helper:wait_for_messages(Alice1, [<<"msg-4">>]), 1051: carboncopy_helper:wait_for_carbon_chat_with_body(Alice, <<"msg-4">>, #{from => Bob, to => Alice1}), 1052: escalus_connection:stop(Alice) 1053: end). 1054: 1055: wait_for_carbon_with_bodies(Client, Texts, Params) -> 1056: [carboncopy_helper:wait_for_carbon_chat_with_body(Client, Text, Params) || Text <- Texts]. 1057: 1058: buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) -> 1059: F = fun(_Client) -> ok end, 1060: buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, F). 1061: 1062: buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, F) -> 1063: Alice = connect_spec(AliceSpec, sr_presence, manual), 1064: F(Alice), 1065: C2SPid = mongoose_helper:get_session_pid(Alice), 1066: %% Bobs sends some messages to Alice. 1067: sm_helper:send_messages(Bob, Alice, Texts), 1068: %% Alice receives them, but doesn't ack. 1069: sm_helper:wait_for_messages(Alice, Texts), 1070: %% Alice's connection is violently terminated. 1071: escalus_client:kill_connection(Config, Alice), 1072: sm_helper:wait_until_resume_session(C2SPid), 1073: SMID = sm_helper:client_to_smid(Alice), 1074: {C2SPid, SMID}. 1075: 1076: aggressively_pipelined_resume(Config) -> 1077: AliceSpec = [{manual_ack, true}, {parser_opts, [{start_tag, <<"stream:stream">>}]} 1078: | escalus_fresh:create_fresh_user(Config, alice)], 1079: UnackedMessages = three_texts(), 1080: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 1081: {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, UnackedMessages), 1082: %% Resume the session. 1083: Alice = escalus_connection:connect(AliceSpec), 1084: 1085: Username = proplists:get_value(username, AliceSpec), 1086: Password = proplists:get_value(password, AliceSpec), 1087: Payload = <<0:8,Username/binary,0:8,Password/binary>>, 1088: Server = proplists:get_value(server, AliceSpec), 1089: 1090: Stream = escalus_stanza:stream_start(Server, <<"jabber:client">>), 1091: Auth = escalus_stanza:auth(<<"PLAIN">>, [#xmlcdata{content = base64:encode(Payload)}]), 1092: AuthStream = escalus_stanza:stream_start(Server, <<"jabber:client">>), 1093: Resume = escalus_stanza:resume(SMID, 2), 1094: 1095: escalus_client:send(Alice, [Stream, Auth, AuthStream, Resume]), 1096: Messages = [escalus_connection:get_stanza(Alice, {get_resumed, I}) || I <- lists:seq(1, 6)], 1097: escalus:assert(is_sm_resumed, [SMID], lists:last(Messages)), 1098: 1099: escalus_connection:stop(Alice) 1100: end). 1101: 1102: %% This is a regression test for a case when a session processes a request, which will 1103: %% receive a response from the server, i.e. will have the same origin SID in mongoose_acc. 1104: %% Without proper handling, the reply would be rejected because the resumed session 1105: %% has new SID. 1106: replies_are_processed_by_resumed_session(Config) -> 1107: %% GIVEN a session and registered special IQ handler (added in init_per_testcase), 1108: %% that waits for old session process to terminate (at this point new process 1109: %% has fully taken over) and then actually sends the reply. 1110: Alice = connect_fresh(Config, alice, sr_presence), 1111: 1112: %% WHEN a client sends IQ request to the special handler... 1113: IQReq = escalus_stanza:iq_get(regression_ns(), []), 1114: escalus:send(Alice, IQReq), 1115: 1116: %% ... goes down and session is resumed. 1117: Alice2 = sm_helper:kill_and_connect_resume(Alice), 1118: 1119: %% THEN the client receives the reply properly. 1120: IQReply = escalus:wait_for_stanza(Alice2), 1121: escalus:assert(is_iq_result, [IQReq], IQReply), 1122: escalus_connection:stop(Alice2). 1123: 1124: %% This is a regression test for a bug, which manifested in following scenario 1125: %% (due to improper presence sub requests buffering): 1126: %% 1. Bob is online, Alice is offline 1127: %% 2. Bob subscribes to Alice's presence; 1128: %% 3. Alice becomes online 1129: %% 4. Bob sends a message to Alice 1130: %% 5. Alice doesn't SM-ack the request or message, terminates the connection 1131: %% 6. Alice reconnects but with session *replace*, not resume 1132: %% 7. Packet rerouting crashes on the buffered sub request, preventing resending whole buffer 1133: %% 8. Alice doesn't receive the buffered message 1134: subscription_requests_are_buffered_properly(Config) -> 1135: AliceSpec = escalus_fresh:create_fresh_user(Config, alice), 1136: MsgBody = <<"buffered">>, 1137: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 1138: % GIVEN Bob's pending subscription to Alice's presence 1139: AliceJid = common_helper:get_bjid(AliceSpec), 1140: escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)), 1141: _RosterPushReq = escalus:wait_for_stanza(Bob), 1142: 1143: % WHEN Alice becomes online... 1144: Alice = connect_spec(AliceSpec, sr_session, manual), 1145: send_initial_presence(Alice), 1146: %% subscribe could come before the initial presence 1147: escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)], 1148: escalus:wait_for_stanzas(Alice, 2)), 1149: 1150: % ...and Bob sends a message to Alice... 1151: escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), 1152: MsgStanza = escalus:wait_for_stanza(Alice), 1153: escalus:assert(is_chat_message, [MsgBody], MsgStanza), 1154: 1155: % ...and Alice terminates connection without acking anything... 1156: escalus_client:kill_connection(Config, Alice), 1157: 1158: % ...and reconnects with session replacement. 1159: Alice2 = connect_spec(AliceSpec, session, manual), 1160: 1161: % THEN Alice receives (without sending initial presence): 1162: % * buffered Bob's message (like above) 1163: % Alice DOESN'T receive: 1164: % * buffered subscription request because it is dropped by ejabberd_sm 1165: % because it's treated like repeated sub request to bare JID, so it's not 1166: % processed by any sub req handler (like mod_roster) 1167: % * buffered available presence from Alice - because it is addressed to another SID 1168: % and Alice2 is a brand new session 1169: escalus:assert(is_chat_message, [MsgBody], escalus:wait_for_stanza(Alice2)), 1170: sm_helper:send_and_receive(Bob, Alice2, <<"flush1">>), 1171: escalus_assert:has_no_stanzas(Alice2), 1172: 1173: %% Only once an initial presence is sent, a subscription request is sent 1174: send_initial_presence(Alice2), 1175: escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)], 1176: escalus:wait_for_stanzas(Alice2, 2)), 1177: 1178: sm_helper:send_and_receive(Bob, Alice2, <<"flush2">>), 1179: escalus_assert:has_no_stanzas(Alice2), 1180: 1181: escalus_connection:stop(Alice2) 1182: end). 1183: 1184: %% This is a regression test for a bug, due to which messages sent to old session 1185: %% in a middle of state handover were not appended properly to SM buffer. 1186: %% Scenario to reproduce: 1187: %% 1. Online Bob and Alice 1188: %% 2. Alice kills the connection 1189: %% 3. Alice's session is suspended 1190: %% 4. Alice resumes session with new connection. At this moment new session is still not 1191: %% present in session table. `resume` request is stuck in old proc mailbox. 1192: %% 5. Bob sends a message to Alice. Only old proc is present in session table so now 1193: %% old session has two messages in mailbox: `resume` and XML from Bob 1194: %% 6. We resume old process and it begins session handover 1195: %% 7. Bob's message is appended to SM buffer in "flush" step 1196: %% 8. With bug fixed, the message is retransmitted properly 1197: messages_are_properly_flushed_during_resumption(Config) -> 1198: escalus:fresh_story(Config, [{bob, 1}], fun(Bob) -> 1199: % GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended 1200: Alice = connect_fresh(Config, alice, sr_presence), 1201: SMH = escalus_connection:get_sm_h(Alice), 1202: escalus_client:kill_connection(Config, Alice), 1203: %% The receiver process would stop now 1204: C2SPid = mongoose_helper:get_session_pid(Alice), 1205: sm_helper:wait_until_resume_session(C2SPid), 1206: 1207: sm_helper:wait_for_queue_length(C2SPid, 0), 1208: ok = rpc(mim(), sys, suspend, [C2SPid]), 1209: 1210: % WHEN new session requests resumption 1211: % we wait until that old session has resumption request enqueued; 1212: % we need it to ensure the order of messages: resume first, Bob's chat second. 1213: % Actual wait and message sent by Bob is done in separate process 1214: % because new client start will block until old process is resumed 1215: 1216: MsgBody = <<"flush-regression">>, 1217: spawn_link(fun() -> 1218: sm_helper:wait_for_queue_length(C2SPid, 1), 1219: 1220: % Bob sends a message... 1221: escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)), 1222: 1223: % ...we ensure that a message is enqueued in Alice's session... 1224: % (2 messages = resume request + Bob's message) 1225: sm_helper:wait_for_queue_length(C2SPid, 2), 1226: 1227: % ...and old process is resumed. 1228: ok = rpc(mim(), sys, resume, [C2SPid]) 1229: end), 1230: Alice2 = connect_resume(Alice, SMH), 1231: % THEN Alice's new session receives Bob's message 1232: RecvMsg = escalus:wait_for_stanza(Alice2), 1233: escalus:assert(is_chat_message, [MsgBody], RecvMsg) 1234: end). 1235: 1236: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt(Config) -> 1237: Alice = connect_fresh(Config, alice, session, manual), 1238: %% Should not crash anything! 1239: escalus_connection:send(Alice, escalus_stanza:enable_sm()), 1240: Response = escalus_connection:get_stanza(Alice, service_unavailable), 1241: escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response), 1242: escalus_connection:stop(Alice). 1243: 1244: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption(Config) -> 1245: Alice = connect_fresh(Config, alice, session, manual), 1246: %% Should not crash anything! 1247: escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])), 1248: Response = escalus_connection:get_stanza(Alice, service_unavailable), 1249: escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response), 1250: escalus_connection:stop(Alice). 1251: 1252: %%-------------------------------------------------------------------- 1253: %% Helpers 1254: %%-------------------------------------------------------------------- 1255: start_hook_listener(Resource) -> 1256: TestCasePid = self(), 1257: rpc(mim(), ?MODULE, rpc_start_hook_handler, [TestCasePid, Resource, host_type()]). 1258: 1259: stop_hook_listener(HookExtra) -> 1260: rpc(mim(), ?MODULE, rpc_stop_hook_handler, [HookExtra, host_type()]). 1261: 1262: rpc_start_hook_handler(TestCasePid, User, HostType) -> 1263: LUser = jid:nodeprep(User), 1264: Extra = #{luser => LUser, pid => TestCasePid}, 1265: gen_hook:add_handler(unacknowledged_message, HostType, 1266: fun ?MODULE:hook_handler_fn/3, 1267: Extra, 50), 1268: Extra. 1269: 1270: rpc_stop_hook_handler(HookExtra, HostType) -> 1271: gen_hook:delete_handler(unacknowledged_message, HostType, 1272: fun ?MODULE:hook_handler_fn/3, 1273: HookExtra, 50). 1274: 1275: hook_handler_fn(Acc, 1276: #{jid := Jid} = _Params, 1277: #{luser := LUser, pid := TestCasePid} = _Extra) -> 1278: {U, _S, R} = jid:to_lower(Jid), 1279: case U of 1280: LUser -> 1281: Counter = mongoose_acc:get(sm_test, counter, 0, Acc), 1282: El = mongoose_acc:element(Acc), 1283: TestCasePid ! {sm_test, Counter, R, El}, 1284: {ok, mongoose_acc:set_permanent(sm_test, counter, Counter + 1, Acc)}; 1285: _ -> {ok, Acc} 1286: end. 1287: 1288: wait_for_unacked_msg_hook(Counter, Res, Timeout) -> 1289: receive 1290: {sm_test, AccCounter, Resource, Stanza} = Msg -> 1291: ?assertEqual(Counter, AccCounter, Msg), 1292: ?assertEqual(Res, Resource, Msg), 1293: Stanza 1294: after Timeout -> 1295: timeout 1296: end. 1297: 1298: start_filter_hook_listener(FilterText, Resource) -> 1299: rpc(mim(), ?MODULE, rpc_start_filter_hook_handler, [FilterText, Resource, host_type()]). 1300: 1301: stop_filter_hook_listener(HookExtra) -> 1302: rpc(mim(), ?MODULE, rpc_stop_filter_hook_handler, [HookExtra, host_type()]). 1303: 1304: rpc_start_filter_hook_handler(FilterText, User, HostType) -> 1305: LUser = jid:nodeprep(User), 1306: Extra = #{luser => LUser, filter_text => FilterText}, 1307: gen_hook:add_handler(filter_unacknowledged_messages, HostType, 1308: fun ?MODULE:filter_hook_handler_fn/3, 1309: Extra, 50), 1310: Extra. 1311: 1312: rpc_stop_filter_hook_handler(HookExtra, HostType) -> 1313: gen_hook:delete_handler(filter_unacknowledged_messages, HostType, 1314: fun ?MODULE:filter_hook_handler_fn/3, 1315: HookExtra, 50). 1316: 1317: filter_hook_handler_fn(Buffer, 1318: #{jid := Jid} = _Params, 1319: #{luser := LUser, filter_text := FilterText} = _Extra) -> 1320: {U, _} = jid:to_lus(Jid), 1321: case U of 1322: LUser -> 1323: F = fun(Acc) -> filter_text(Acc, FilterText) end, 1324: NewBuffer = lists:filter(F, Buffer), 1325: {ok, NewBuffer}; 1326: _ -> {ok, Buffer} 1327: end. 1328: 1329: filter_text(Acc, FilterText) -> 1330: case mongoose_acc:stanza_name(Acc) of 1331: <<"message">> -> 1332: El = mongoose_acc:element(Acc), 1333: FilterText =/= exml_query:path(El, [{element, <<"body">>}, cdata]); 1334: _ -> 1335: true 1336: end. 1337: 1338: is_chat(Content) -> 1339: fun(Stanza) -> escalus_pred:is_chat_message(Content, Stanza) end. 1340: 1341: is_presence(Type) -> 1342: fun(Stanza) -> escalus_pred:is_presence_with_type(Type, Stanza) end. 1343: 1344: three_texts() -> 1345: [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>]. 1346: 1347: get_stanzas_filtered_by_mod_ping() -> 1348: History = rpc(mim(), meck, history, [mod_ping]), 1349: [Stanza || 1350: {_Pid, 1351: {_Mod, 1352: filter_local_packet = _Func, 1353: [{_, _, _, Stanza} = _Acc, _Params, _Extra] = _Args 1354: }, 1355: {stop, drop} = _Result 1356: } <- History 1357: ]. 1358: %%-------------------------------------------------------------------- 1359: %% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session" 1360: %%-------------------------------------------------------------------- 1361: 1362: regression_ns() -> 1363: <<"regression">>. 1364: 1365: register_handler() -> 1366: HostType = host_type(), 1367: rpc(mim(), gen_iq_handler, add_iq_handler_for_domain, 1368: [HostType, regression_ns(), ejabberd_sm, 1369: fun ?MODULE:regression_handler/5, #{}, one_queue]). 1370: 1371: unregister_handler() -> 1372: HostType = host_type(), 1373: rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain, 1374: [HostType, regression_ns(), ejabberd_sm]). 1375: 1376: regression_handler(Acc, _From, _To, IQ, _Extra) -> 1377: %% A bit of a hack - will no longer work when the SID format changes 1378: {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc), 1379: erlang:monitor(process, Pid), 1380: receive 1381: {'DOWN', _, _, _, _} -> 1382: ok 1383: after 1384: 10000 -> 1385: error({c2s_not_stopped_after_timeout, Pid}) 1386: end, 1387: %% We avoid another race condition - there is a short window where user session 1388: %% is not registered in ejabberd_sm: between old process termination and the moment 1389: %% when the new process stores new session in memory. It should be fixed separately. 1390: wait_for_session(mongoose_acc:get(c2s, origin_jid, undefined, Acc), 50, 100), 1391: {Acc, jlib:make_result_iq_reply(IQ)}. 1392: 1393: wait_for_session(JID, Retries, SleepTime) -> 1394: case ejabberd_sm:get_session(JID) of 1395: offline -> 1396: timer:sleep(SleepTime), 1397: wait_for_session(JID, Retries - 1, SleepTime); 1398: _ -> 1399: ok 1400: end. 1401: 1402: maybe_ack_initial_presence(Alice, ack) -> 1403: ack_initial_presence(Alice); 1404: maybe_ack_initial_presence(_Alice, no_ack) -> 1405: ok.