1: %% Stream Management tests
    2: -module(sm_SUITE).
    3: 
    4: -compile(export_all).
    5: -include_lib("exml/include/exml.hrl").
    6: -include_lib("eunit/include/eunit.hrl").
    7: -include_lib("escalus/include/escalus.hrl").
    8: -include_lib("common_test/include/ct.hrl").
    9: 
   10: %% Injected code callbacks
   11: -export([rpc_start_hook_handler/3,
   12:          rpc_stop_hook_handler/2,
   13:          hook_handler_fn/3,
   14:          regression_handler/5]).
   15: 
   16: -import(distributed_helper, [mim/0,
   17:                              require_rpc_nodes/1,
   18:                              rpc/4]).
   19: 
   20: -import(domain_helper, [host_type/0]).
   21: 
   22: -import(sm_helper, [connect_fresh/3,
   23:                     connect_fresh/4,
   24:                     connect_spec/2,
   25:                     connect_spec/3,
   26:                     connect_same/2,
   27:                     connect_same/3,
   28:                     connect_resume/2,
   29:                     process_initial_stanza/1,
   30:                     send_initial_presence/1,
   31:                     get_ack/1,
   32:                     ack_initial_presence/1]).
   33: 
   34: -define(MOD_SM, mod_stream_management).
   35: -define(CONSTRAINT_CHECK_TIMEOUT, 5000).
   36: -define(LONG_TIMEOUT, 3600).
   37: -define(SHORT_TIMEOUT, 3).
   38: -define(SMALL_SM_BUFFER, 3).
   39: 
   40: %%--------------------------------------------------------------------
   41: %% Suite configuration
   42: %%--------------------------------------------------------------------
   43: 
   44: suite() ->
   45:     require_rpc_nodes([mim]) ++ escalus:suite().
   46: 
   47: all() ->
   48:     ct_helper:groups_to_all(groups()).
   49: 
   50: groups() ->
   51:     P = [parallel],
   52:     [{parallel, P, parallel_cases()},
   53:      {parallel_manual_ack_freq_1, P, parallel_manual_ack_freq_1_cases()},
   54:      {manual_ack_freq_2, [], manual_ack_freq_2_cases()},
   55:      {stale_h, [], stale_h_cases()},
   56:      {parallel_unacknowledged_message_hook, P, parallel_unacknowledged_message_hook_cases()}].
   57: 
   58: parallel_cases() ->
   59:     [server_announces_sm,
   60:      server_enables_sm_before_session,
   61:      server_enables_sm_after_session,
   62:      server_returns_failed_after_start,
   63:      server_returns_failed_after_auth,
   64:      server_enables_resumption,
   65:      client_enables_sm_twice_fails_with_correct_error_stanza,
   66:      session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza,
   67:      session_resumed_and_old_session_dead_doesnt_route_error_to_new_session,
   68:      basic_ack,
   69:      h_ok_before_session,
   70:      h_ok_after_session_enabled_before_session,
   71:      h_ok_after_session_enabled_after_session,
   72:      h_ok_after_a_chat,
   73:      h_non_given_closes_stream_gracefully,
   74:      resend_unacked_on_reconnection,
   75:      session_established,
   76:      wait_for_resumption,
   77:      resume_session,
   78:      resume_session_with_wrong_h_does_not_leak_sessions,
   79:      resume_session_with_wrong_sid_returns_item_not_found,
   80:      resume_session_with_wrong_namespace_is_a_noop,
   81:      resume_dead_session_results_in_item_not_found,
   82:      resume_session_kills_old_C2S_gracefully,
   83:      aggressively_pipelined_resume,
   84:      replies_are_processed_by_resumed_session,
   85:      subscription_requests_are_buffered_properly,
   86:      messages_are_properly_flushed_during_resumption,
   87:      messages_are_properly_flushed_during_resumption_p1_fsm_old].
   88: 
   89: parallel_manual_ack_freq_1_cases() ->
   90:     [client_acks_more_than_sent,
   91:      too_many_unacked_stanzas,
   92:      resend_unacked_after_resume_timeout,
   93:      resume_session_state_send_message_with_ack,
   94:      resume_session_state_send_message_without_ack,
   95:      resume_session_state_stop_c2s,
   96:      server_requests_ack_after_session,
   97:      resend_more_offline_messages_than_buffer_size,
   98:      server_requests_ack].
   99: 
  100: manual_ack_freq_2_cases() ->
  101:     [server_requests_ack_freq_2].
  102: 
  103: stale_h_cases() ->
  104:     [resume_expired_session_returns_correct_h,
  105:      gc_repeat_after_never_means_no_cleaning,
  106:      gc_repeat_after_timeout_does_clean].
  107: 
  108: stream_mgmt_disabled_cases() ->
  109:     [no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt,
  110:      no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption].
  111: 
  112: manual_ack_freq_long_session_timeout_cases() ->
  113:     [preserve_order].
  114: 
  115: parallel_unacknowledged_message_hook_cases() ->
  116:     [unacknowledged_message_hook_bounce,
  117:      unacknowledged_message_hook_offline,
  118:      unacknowledged_message_hook_resume].
  119: 
  120: %%--------------------------------------------------------------------
  121: %% Init & teardown
  122: %%--------------------------------------------------------------------
  123: 
  124: init_per_suite(Config) ->
  125:     NewConfig = dynamic_modules:save_modules(host_type(), Config),
  126:     NewConfigWithSM = escalus_users:update_userspec(NewConfig, alice, stream_management, true),
  127:     mongoose_helper:inject_module(?MODULE),
  128:     escalus:init_per_suite(NewConfigWithSM).
  129: 
  130: end_per_suite(Config) ->
  131:     escalus_fresh:clean(),
  132:     dynamic_modules:restore_modules(Config),
  133:     escalus:end_per_suite(Config).
  134: 
  135: init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hook;
  136:                                    Group =:= manual_ack_freq_long_session_timeout;
  137:                                    Group =:= parallel_manual_ack_freq_1;
  138:                                    Group =:= manual_ack_freq_2 ->
  139:     dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
  140:     Config;
  141: init_per_group(stale_h, Config) ->
  142:     Config;
  143: init_per_group(stream_mgmt_disabled, Config) ->
  144:     dynamic_modules:stop(host_type(), ?MOD_SM),
  145:     rpc(mim(), mnesia, delete_table, [sm_session]),
  146:     Config;
  147: init_per_group(Group, Config) ->
  148:     dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
  149:     Config.
  150: 
  151: end_per_group(_Group, _Config) ->
  152:     ok.
  153: 
  154: init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) ->
  155:     dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
  156:     escalus:init_per_testcase(CN, Config);
  157: init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleaning;
  158:                                    CN =:= gc_repeat_after_timeout_does_clean ->
  159:     dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
  160:     Config2 = register_some_smid_h(Config),
  161:     escalus:init_per_testcase(CN, Config2);
  162: init_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
  163:     escalus:init_per_testcase(CN, Config);
  164: init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
  165:     register_handler(),
  166:     escalus:init_per_testcase(CN, Config);
  167: init_per_testcase(CaseName, Config) ->
  168:     escalus:init_per_testcase(CaseName, Config).
  169: 
  170: end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_h;
  171:                                   CN =:= gc_repeat_after_never_means_no_cleaning;
  172:                                   CN =:= gc_repeat_after_timeout_does_clean ->
  173:     rpc(mim(), ejabberd_sup, stop_child, [stream_management_stale_h]),
  174:     escalus:end_per_testcase(CN, Config);
  175: end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
  176:     unregister_handler(),
  177:     escalus:end_per_testcase(CN, Config);
  178: end_per_testcase(CaseName, Config) ->
  179:     escalus:end_per_testcase(CaseName, Config).
  180: 
  181: %% Module configuration per group (in case of stale_h group it is per testcase)
  182: 
  183: required_modules(Scope, Name) ->
  184:     SMConfig = case required_sm_opts(Scope, Name) of
  185:                    stopped -> stopped;
  186:                    ExtraOpts -> merge_proplists(common_sm_opts(), ExtraOpts)
  187:                end,
  188:     [{mod_stream_management, SMConfig}, {mod_offline, []}].
  189: 
  190: required_sm_opts(group, parallel) ->
  191:     [{ack_freq, never}];
  192: required_sm_opts(group, parallel_manual_ack_freq_1) ->
  193:     [{ack_freq, 1},
  194:      {resume_timeout, ?SHORT_TIMEOUT}];
  195: required_sm_opts(group, manual_ack_freq_2) ->
  196:     [{ack_freq, 2}];
  197: required_sm_opts(group, stream_mgmt_disabled) ->
  198:     stopped;
  199: required_sm_opts(group, parallel_unacknowledged_message_hook) ->
  200:     [{ack_freq, 1}];
  201: required_sm_opts(group, manual_ack_freq_long_session_timeout) ->
  202:     [{ack_freq, 1}, {buffer_max, 1000}];
  203: required_sm_opts(testcase, resume_expired_session_returns_correct_h) ->
  204:     [{ack_freq, 1},
  205:      {resume_timeout, ?SHORT_TIMEOUT} | stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT)];
  206: required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
  207:     stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT);
  208: required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
  209:     stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT).
  210: 
  211: common_sm_opts() ->
  212:     [{buffer_max, ?SMALL_SM_BUFFER}].
  213: 
  214: merge_proplists(Defaults, Values) ->
  215:     Values ++ delete_keys(proplists:get_keys(Values), Defaults).
  216: 
  217: delete_keys(Keys, List) ->
  218:     lists:foldl(fun proplists:delete/2, List, Keys).
  219: 
  220: stale_h(RepeatAfter, Geriatric) ->
  221:     [{stale_h, [{enabled, true},
  222:                 {stale_h_repeat_after, RepeatAfter},
  223:                 {stale_h_geriatric, Geriatric}]}].
  224: 
  225: make_smid() ->
  226:     base64:encode(crypto:strong_rand_bytes(21)).
  227: 
  228: register_smid(IntSmidId) ->
  229:     S = {SMID = make_smid(), IntSmidId},
  230:     ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]),
  231:     S.
  232: 
  233: register_some_smid_h(Config) ->
  234:     TestSmids = lists:map(fun register_smid/1, lists:seq(1, 3)),
  235:     [{smid_test, TestSmids} | Config].
  236: 
  237: %%--------------------------------------------------------------------
  238: %% Tests
  239: %%--------------------------------------------------------------------
  240: 
  241: server_announces_sm(Config) ->
  242:     AliceSpec = escalus_fresh:freshen_spec(Config, alice),
  243:     {ok, #client{props = Props}, Features} = escalus_connection:start(AliceSpec,
  244:                                                                       [start_stream]),
  245:     true = escalus_session:can_use_stream_management(Props, Features).
  246: 
  247: 
  248: server_enables_sm_before_session(Config) ->
  249:     connect_fresh(Config, alice, sm_after_bind).
  250: 
  251: server_enables_sm_after_session(Config) ->
  252:     connect_fresh(Config, alice, sm_after_session).
  253: 
  254: server_returns_failed_after_start(Config) ->
  255:     Alice = connect_fresh(Config, alice, before_auth),
  256:     server_returns_failed(Alice).
  257: 
  258: server_returns_failed_after_auth(Config) ->
  259:     Alice = connect_fresh(Config, alice, auth),
  260:     server_returns_failed(Alice).
  261: 
  262: server_enables_resumption(Config) ->
  263:     Alice = connect_fresh(Config, alice, sr_presence),
  264:     escalus_connection:stop(Alice).
  265: 
  266: server_returns_failed(Alice) ->
  267:     escalus_connection:send(Alice, escalus_stanza:enable_sm()),
  268:     escalus:assert(is_sm_failed, [<<"unexpected-request">>],
  269:                    escalus_connection:get_stanza(Alice, enable_sm_failed)).
  270: 
  271: client_enables_sm_twice_fails_with_correct_error_stanza(Config) ->
  272:     Alice = connect_fresh(Config, alice, sm_before_session),
  273:     escalus_connection:send(Alice, escalus_stanza:enable_sm()),
  274:     escalus:assert(is_sm_failed, [<<"unexpected-request">>],
  275:                    escalus_connection:get_stanza(Alice, enable_sm_failed)),
  276:     escalus:assert(is_stream_end,
  277:                    escalus_connection:get_stanza(Alice, enable_sm_failed)),
  278:     true = escalus_connection:wait_for_close(Alice, timer:seconds(5)).
  279: 
  280: session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza(Config) ->
  281:     %% GIVEN USER WITH STREAM RESUMPTION ENABLED
  282:     Alice = connect_fresh(Config, alice, sr_presence),
  283:     SMH = escalus_connection:get_sm_h(Alice),
  284:     %% WHEN USER RESUMES SESSION FROM NEW CLIENT
  285:     Alice2 = connect_resume(Alice, SMH),
  286:     process_initial_stanza(Alice2),
  287:     %% THEN: Old session is gracefully closed with the correct error stanza
  288:     escalus:assert(is_stream_error, [<<"conflict">>, <<>>],
  289:                    escalus_connection:get_stanza(Alice, close_old_stream)),
  290:     escalus:assert(is_stream_end,
  291:                    escalus_connection:get_stanza(Alice, close_old_stream)),
  292:     true = escalus_connection:wait_for_close(Alice, timer:seconds(5)),
  293:     true = escalus_connection:is_connected(Alice2),
  294:     escalus_connection:stop(Alice2).
  295: 
  296: session_resumed_and_old_session_dead_doesnt_route_error_to_new_session(Config) ->
  297:     %% GIVEN USER WITH STREAM RESUMPTION ENABLED
  298:     Alice = connect_fresh(Config, alice, sr_presence),
  299:     %% WHEN FIRST SESSION DIES AND USER RESUMES FROM NEW CLIENT
  300:     Alice2 = sm_helper:kill_and_connect_resume(Alice),
  301:     process_initial_stanza(Alice2),
  302:     %% THEN new session does not have any message rerouted
  303:     false = escalus_client:has_stanzas(Alice2),
  304:     true = escalus_connection:is_connected(Alice2),
  305:     escalus_connection:stop(Alice2).
  306: 
  307: basic_ack(Config) ->
  308:     Alice = connect_fresh(Config, alice, sm_after_session),
  309:     escalus_connection:send(Alice, escalus_stanza:roster_get()),
  310:     escalus:assert(is_roster_result,
  311:                    escalus_connection:get_stanza(Alice, roster_result)),
  312:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  313:     escalus:assert(is_sm_ack,
  314:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  315: 
  316: %% Test that "h" value is valid when:
  317: %% - SM is enabled *before* the session is established
  318: %% - <r/> is sent *before* the session is established
  319: h_ok_before_session(Config) ->
  320:     Alice = connect_fresh(Config, alice, sm_after_bind),
  321:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  322:     escalus:assert(is_sm_ack, [0],
  323:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  324: 
  325: %% Test that "h" value is valid when:
  326: %% - SM is enabled *before* the session is established
  327: %% - <r/> is sent *after* the session is established
  328: h_ok_after_session_enabled_before_session(Config) ->
  329:     Alice = connect_fresh(Config, alice, sm_before_session),
  330:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  331:     escalus:assert(is_sm_ack, [1],
  332:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  333: 
  334: %% Test that "h" value is valid when:
  335: %% - SM is enabled *after* the session is established
  336: %% - <r/> is sent *after* the session is established
  337: h_ok_after_session_enabled_after_session(Config) ->
  338:     Alice = connect_fresh(Config, alice, sm_after_session),
  339:     escalus_connection:send(Alice, escalus_stanza:roster_get()),
  340:     escalus:assert(is_roster_result,
  341:                    escalus_connection:get_stanza(Alice, roster_result)),
  342:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  343:     escalus:assert(is_sm_ack, [1],
  344:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  345: 
  346: %% Test that "h" value is valid after exchanging a few messages.
  347: h_ok_after_a_chat(ConfigIn) ->
  348:     Config = escalus_users:update_userspec(ConfigIn, alice,
  349:                                            stream_management, true),
  350:     escalus:fresh_story(Config, [{alice,1}, {bob,1}], fun(Alice, Bob) ->
  351:         escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi, Bob!">>)),
  352:         escalus:assert(is_chat_message, [<<"Hi, Bob!">>],
  353:                        escalus:wait_for_stanza(Bob)),
  354:         escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)),
  355:         escalus:assert(is_chat_message, [<<"Hi, Alice!">>],
  356:                        escalus:wait_for_stanza(Alice)),
  357:         escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How's life?">>)),
  358:         escalus:assert(is_chat_message, [<<"How's life?">>],
  359:                        escalus:wait_for_stanza(Alice)),
  360:         escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Pretty !@#$%^$">>)),
  361:         escalus:assert(is_chat_message, [<<"Pretty !@#$%^$">>],
  362:                        escalus:wait_for_stanza(Bob)),
  363:         escalus:send(Alice, escalus_stanza:sm_request()),
  364:         escalus:assert(is_sm_ack, [3], escalus:wait_for_stanza(Alice)),
  365:         %% Ack, so that unacked messages don't go into offline store.
  366:         escalus:send(Alice, escalus_stanza:sm_ack(3))
  367:     end).
  368: 
  369: h_non_given_closes_stream_gracefully(ConfigIn) ->
  370:     AStanza = #xmlel{name = <<"a">>,
  371:                attrs = [{<<"xmlns">>, <<"urn:xmpp:sm:3">>}]},
  372:     Config = escalus_users:update_userspec(ConfigIn, alice,
  373:                                            stream_management, true),
  374:     escalus:fresh_story(Config, [{alice,1}], fun(Alice) ->
  375:         C2SPid = mongoose_helper:get_session_pid(Alice),
  376:         escalus:send(Alice, AStanza),
  377:         escalus:assert(is_stream_error,
  378:                        [<<"policy-violation">>, <<>>],
  379:                        escalus:wait_for_stanza(Alice)),
  380:         mongoose_helper:wait_for_pid_to_die(C2SPid),
  381:         escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)),
  382:         true = escalus_connection:wait_for_close(Alice, timer:seconds(5))
  383:     end).
  384: 
  385: client_acks_more_than_sent(Config) ->
  386:     Alice = connect_fresh(Config, alice, sm_after_session),
  387:     escalus:send(Alice, escalus_stanza:sm_ack(5)),
  388:     StreamErrorStanza = escalus:wait_for_stanza(Alice),
  389:     %% Assert "undefined-condition" children
  390:     escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], StreamErrorStanza),
  391:     %% Assert "handled-count-too-high" children with correct attributes
  392:     HandledCountSubElement = exml_query:path(StreamErrorStanza,
  393:                                              [{element_with_ns,
  394:                                                <<"handled-count-too-high">>,
  395:                                                <<"urn:xmpp:sm:3">>}]),
  396:     <<"5">> = exml_query:attr(HandledCountSubElement, <<"h">>),
  397:     <<"0">> = exml_query:attr(HandledCountSubElement, <<"send-count">>),
  398:     %% Assert graceful stream end
  399:     escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)),
  400:     true = escalus_connection:wait_for_close(Alice, timer:seconds(5)).
  401: 
  402: too_many_unacked_stanzas(Config) ->
  403:     Bob = connect_fresh(Config, bob, presence),
  404:     Alice = connect_fresh(Config, alice, sm_presence, manual),
  405:     get_ack(Alice),
  406:     [escalus:send(Bob, escalus_stanza:chat_to(Alice,
  407:         <<(integer_to_binary(N))/binary, ": Hi, Alice!">>))
  408:      || N <- lists:seq(1,?SMALL_SM_BUFFER)],
  409:     escalus:wait_for_stanzas(Alice, ?SMALL_SM_BUFFER * 2), % messages and ack requests
  410:     escalus:assert(is_stream_error, [<<"resource-constraint">>,
  411:                                      <<"too many unacked stanzas">>],
  412:                    %% wait for deferred buffer check
  413:                    escalus:wait_for_stanza(Alice, ?CONSTRAINT_CHECK_TIMEOUT + 1000)).
  414: 
  415: server_requests_ack(Config) ->
  416:     server_requests_ack(Config, 1).
  417: 
  418: server_requests_ack_freq_2(Config) ->
  419:     server_requests_ack(Config, 2).
  420: 
  421: server_requests_ack(Config, N) ->
  422:     Bob = connect_fresh(Config, bob, presence),
  423:     Alice = connect_fresh(Config, alice, sm_presence, manual),
  424:     %% ack request after initial presence
  425:     maybe_assert_ack_request(1, N, Alice),
  426:     escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)),
  427:     escalus:assert(is_chat_message, [<<"Hi, Alice!">>],
  428:                    escalus:wait_for_stanza(Alice)),
  429:     maybe_assert_ack_request(2, N, Alice).
  430: 
  431: maybe_assert_ack_request(StanzasRec, AckRequests, Alice) ->
  432:     ct:log("StanzasRec: ~p, AckRequests: ~p", [StanzasRec, AckRequests]),
  433:     case StanzasRec rem AckRequests of
  434:         0 ->
  435:             escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(Alice));
  436:         _ ->
  437:             ok
  438:     end,
  439:     StanzasRec.
  440: 
  441: server_requests_ack_after_session(Config) ->
  442:     Alice = connect_fresh(Config, alice, sm_before_session, manual),
  443:     escalus:assert(is_sm_ack_request, escalus_connection:get_stanza(Alice, stream_mgmt_req)).
  444: 
  445: resend_more_offline_messages_than_buffer_size(Config) ->
  446:     %% connect bob and alice
  447:     Bob = connect_fresh(Config, bob, presence),
  448:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  449: 
  450:     % sent some messages - more than unacked buffer size
  451:     MessagesToSend = ?SMALL_SM_BUFFER + 1,
  452:     JID = common_helper:get_bjid(AliceSpec),
  453:     [escalus_connection:send(Bob, escalus_stanza:chat_to(JID, integer_to_binary(I)))
  454:      || I <- lists:seq(1, MessagesToSend)],
  455: 
  456:     % connect alice who wants to receive all messages from offline storage
  457:     Alice = connect_spec(AliceSpec, sm_after_session, manual),
  458:     mongoose_helper:wait_for_n_offline_messages(Alice, MessagesToSend),
  459:     send_initial_presence(Alice),
  460: 
  461:     escalus:wait_for_stanzas(Alice, MessagesToSend * 2), %messages and ack requests
  462: 
  463:     escalus_connection:get_stanza(Alice, presence),
  464:     get_ack(Alice), % ack request
  465: 
  466:     % confirm messages + presence
  467:     escalus_connection:send(Alice, escalus_stanza:sm_ack(4)),
  468:     % wait for check constraint message on server side
  469: 
  470:     ct:sleep(?CONSTRAINT_CHECK_TIMEOUT + 1000),
  471:     false = escalus_client:has_stanzas(Alice),
  472:     % should not receive anything especially any stream errors
  473: 
  474:     escalus_connection:stop(Alice),
  475:     escalus_connection:stop(Bob).
  476: 
  477: resend_unacked_on_reconnection(Config) ->
  478:     Texts = three_texts(),
  479:     Bob = connect_fresh(Config, bob, presence),
  480:     Alice = connect_fresh(Config, alice, sm_presence),
  481:     AliceSpec = sm_helper:client_to_spec0(Alice),
  482:     %% Bob sends some messages to Alice.
  483:     sm_helper:send_messages(Bob, Alice, Texts),
  484:     %% Alice receives the messages.
  485:     sm_helper:wait_for_messages(Alice, Texts),
  486:     %% Alice disconnects without acking the messages.
  487:     sm_helper:stop_client_and_wait_for_termination(Alice),
  488:     %% Messages go to the offline store.
  489:     %% Alice receives the messages from the offline store.
  490:     NewAlice = connect_spec(AliceSpec, session, manual),
  491:     send_initial_presence(NewAlice),
  492:     sm_helper:wait_for_messages(NewAlice, Texts),
  493:     %% Alice acks the delayed messages so they won't go again
  494:     %% to the offline store.
  495:     escalus_connection:send(NewAlice, escalus_stanza:sm_ack(3)).
  496: 
  497: %% Remove wait_for_n_offline_messages and you will get anything, but preserve_order
  498: %% TODO Test without wait_for_n_offline_messages. It would require changes in SM
  499: %%      and more strict tests, reproducing delays in SM and in mod_offline.
  500: preserve_order(Config) ->
  501:     %% connect bob and alice
  502:     Bob = connect_fresh(Config, bob, presence),
  503:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  504:     AliceSpec = sm_helper:client_to_spec(Alice),
  505:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"1">>)),
  506: 
  507:     %% kill alice connection
  508:     escalus_connection:kill(Alice),
  509:     C2SPid = mongoose_helper:get_session_pid(Alice),
  510:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  511: 
  512:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"2">>)),
  513:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"3">>)),
  514: 
  515:     NewAlice = connect_spec(AliceSpec, session, manual),
  516:     escalus_connection:send(NewAlice, escalus_stanza:enable_sm([resume])),
  517: 
  518:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"4">>)),
  519:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"5">>)),
  520: 
  521:     %% Without this check we will get stuff out of order
  522:     mongoose_helper:wait_for_n_offline_messages(NewAlice, 5),
  523: 
  524:     send_initial_presence(NewAlice),
  525:     %% Without this check we can get "6, 1, 2, 3, 4, 5" messages in the next receive_all_ordered
  526:     mongoose_helper:wait_for_n_offline_messages(NewAlice, 0),
  527:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"6">>)),
  528: 
  529:     %% "2, 3, 4, 5, 6, 1" is possible (where only 1 is from offline storage, the rest is from sm)
  530:     receive_all_ordered(NewAlice, 6),
  531: 
  532:     % replace connection
  533:     NewAlice2 = connect_spec(AliceSpec, session, manual),
  534:     % allow messages to go to the offline storage
  535:     mongoose_helper:wait_for_n_offline_messages(NewAlice, 6),
  536: 
  537:     send_initial_presence(NewAlice2),
  538: 
  539:     % receves messages in correct order
  540:     receive_all_ordered(NewAlice2, 6),
  541: 
  542:     escalus_connection:stop(Bob),
  543:     escalus_connection:stop(NewAlice2).
  544: 
  545: receive_all_ordered(Conn, Last) ->
  546:     receive_all_ordered(Conn, 1, Last, []).
  547: 
  548: %% Receive messages from N to Last.
  549: %% Ignores acks and presences.
  550: %% Handles case when out of order and when not enough stanzas.
  551: receive_all_ordered(_Conn, N, Last, Acc) when N > Last ->
  552:     Texts = lists:map(fun integer_to_binary/1, lists:seq(1, Last)),
  553:     sm_helper:assert_messages(Acc, Texts);
  554: receive_all_ordered(Conn, N, Last, Acc) ->
  555:     Stanzas = escalus:wait_for_stanzas(Conn, 1),
  556:     case Stanzas of
  557:         [#xmlel{name = <<"message">>}] ->
  558:             receive_all_ordered(Conn, N + 1, Last, Acc ++ Stanzas);
  559:         [_] -> %% Ack or presence
  560:             receive_all_ordered(Conn, N, Last, Acc);
  561:         [] ->
  562:             ct:fail({timeout_waiting, N, Acc})
  563:     end.
  564: 
  565: resend_unacked_after_resume_timeout(Config) ->
  566:     %% connect bob and alice
  567:     Bob = connect_fresh(Config, bob, presence),
  568:     Alice = connect_fresh(Config, alice, sr_presence),
  569:     AliceSpec = sm_helper:client_to_spec(Alice),
  570: 
  571:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  572:     %% kill alice connection
  573:     escalus_connection:kill(Alice),
  574: 
  575:     %% ensure there is no session
  576:     C2SPid = mongoose_helper:get_session_pid(Alice),
  577:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  578: 
  579:     %% alice come back and receives unacked message
  580:     NewAlice = connect_spec(AliceSpec, session),
  581:     send_initial_presence(NewAlice),
  582: 
  583:     escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)],
  584:                                  escalus:wait_for_stanzas(NewAlice, 2)),
  585: 
  586:     escalus_connection:stop(Bob),
  587:     escalus_connection:stop(NewAlice).
  588: 
  589: resume_expired_session_returns_correct_h(Config) ->
  590:     %% connect bob and alice
  591:     Bob = connect_fresh(Config, bob, sr_presence),
  592:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  593:     get_ack(Alice),
  594: 
  595:     %% Bob sends a message to Alice, and Alice receives it but doesn't acknowledge
  596:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  597:     escalus:assert(is_chat_message, [<<"msg-1">>], escalus:wait_for_stanza(Alice)),
  598:     %% alice comes back, but too late, so resumption doesn't work,
  599:     %% but she receives the previous h = 1 anyway
  600:     %% NewAlice is also manual ack
  601:     NewAlice = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Alice),
  602:     FailedResumption = escalus_connection:get_stanza(NewAlice, failed_resumption),
  603:     <<"1">> = exml_query:attr(FailedResumption, <<"h">>),
  604:     %% And we can continue with bind and session
  605:     escalus_session:session(escalus_session:bind(NewAlice)),
  606:     send_initial_presence(NewAlice),
  607:     Stanzas = [escalus_connection:get_stanza(NewAlice, {msg, 1}),
  608:                escalus_connection:get_stanza(NewAlice, {msg, 2})],
  609:     escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Stanzas),
  610:     escalus_connection:stop(Bob),
  611:     escalus_connection:stop(NewAlice).
  612: 
  613: gc_repeat_after_never_means_no_cleaning(Config) ->
  614:     [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config),
  615:     {stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID1]),
  616:     {stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID2]),
  617:     {stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID3]).
  618: 
  619: gc_repeat_after_timeout_does_clean(Config) ->
  620:     [{SMID1, _} | _ ] = ?config(smid_test, Config),
  621:     mongoose_helper:wait_until(fun() ->
  622:                                        rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1])
  623:                                end,
  624:                                {error, smid_not_found},
  625:                                #{name => smid_garbage_collected}).
  626: 
  627: resume_session_state_send_message_without_ack(Config) ->
  628:     resume_session_state_send_message_generic(Config, no_ack).
  629: 
  630: resume_session_state_send_message_with_ack(Config) ->
  631:     resume_session_state_send_message_generic(Config, ack).
  632: 
  633: resume_session_state_send_message_generic(Config, AckInitialPresence) ->
  634:     %% connect bob and alice
  635:     Bob = connect_fresh(Config, bob, presence),
  636:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  637:     maybe_ack_initial_presence(Alice, AckInitialPresence),
  638:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  639:     %% kill alice connection
  640:     C2SPid = mongoose_helper:get_session_pid(Alice),
  641:     escalus_connection:kill(Alice),
  642:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  643:     sm_helper:assert_alive_resources(Alice, 1),
  644: 
  645:     %% send some messages and check if c2s can handle it
  646:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)),
  647:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)),
  648:     %% suspend the process to ensure that Alice has enough time to reconnect,
  649:     %% before resumption timeout occurs.
  650:     ok = rpc(mim(), sys, suspend, [C2SPid]),
  651: 
  652:     %% alice comes back and receives unacked message
  653:     NewAlice = connect_same(Alice, presence),
  654:     %% now we can resume c2s process of the old connection
  655:     %% and let it process session resumption timeout
  656:     ok = rpc(mim(), sys, resume, [C2SPid]),
  657:     Stanzas = escalus:wait_for_stanzas(NewAlice, 3),
  658: 
  659:     % what about order ?
  660:     % alice receive presence from herself and 3 unacked messages from bob
  661:     escalus_new_assert:mix_match([is_chat(<<"msg-1">>),
  662:                                   is_chat(<<"msg-2">>),
  663:                                   is_chat(<<"msg-3">>)],
  664:                                  Stanzas),
  665:     escalus_connection:stop(Bob),
  666:     escalus_connection:stop(NewAlice).
  667: 
  668: %%for instance it can be done by mod ping
  669: resume_session_state_stop_c2s(Config) ->
  670:     Bob = connect_fresh(Config, bob, presence),
  671:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  672: 
  673:     get_ack(Alice),
  674:     ack_initial_presence(Alice),
  675: 
  676:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  677:     escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)),
  678: 
  679:     %% get pid of c2s
  680:     C2SPid = mongoose_helper:get_session_pid(Alice),
  681:     %% Wait c2s process to process our presence ack.
  682:     %% Otherwise, we can receive two initial presences sometimes.
  683:     sm_helper:wait_for_c2s_unacked_count(C2SPid, 1),
  684: 
  685:     % kill alice connection
  686:     escalus_connection:kill(Alice),
  687:     % session should be alive
  688:     sm_helper:assert_alive_resources(Alice, 1),
  689:     rpc(mim(), ejabberd_c2s, stop, [C2SPid]),
  690:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  691:     %% suspend the process to ensure that Alice has enough time to reconnect,
  692:     %% before resumption timeout occurs.
  693:     ok = rpc(mim(), sys, suspend, [C2SPid]),
  694: 
  695:     %% alice comes back and receives unacked message
  696:     NewAlice = connect_same(Alice, presence, manual),
  697:     %% now we can resume c2s process of the old connection
  698:     %% and let it process session resumption timeout
  699:     ok = rpc(mim(), sys, resume, [C2SPid]),
  700: 
  701:     escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(NewAlice, msg)),
  702:     escalus_connection:stop(Bob),
  703:     escalus_connection:stop(NewAlice).
  704: 
  705: %% This test only verifies the validity of helpers (get_session_pid,
  706: %% get_c2s_state_name) written for wait_for_resumption
  707: %% testcase.
  708: session_established(Config) ->
  709:     Alice = connect_fresh(Config, alice, presence),
  710:     C2SPid = mongoose_helper:get_session_pid(Alice),
  711:     session_established = mongoose_helper:get_c2s_state_name(C2SPid),
  712:     escalus_connection:stop(Alice).
  713: 
  714: %% Ensure that after a violent disconnection,
  715: %% the c2s waits for resumption (but don't resume yet).
  716: wait_for_resumption(Config) ->
  717:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  718:     Bob = connect_fresh(Config, bob, session),
  719:     Texts = three_texts(),
  720:     {C2SPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
  721:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session).
  722: 
  723: unacknowledged_message_hook_resume(Config) ->
  724:     unacknowledged_message_hook_common(fun unacknowledged_message_hook_resume/4, Config).
  725: 
  726: unacknowledged_message_hook_resume(AliceSpec, Resource, SMID, _C2SPid) ->
  727:     NewAlice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
  728:     send_initial_presence(NewAlice),
  729:     {Resource, NewAlice}.
  730: 
  731: unacknowledged_message_hook_bounce(Config) ->
  732:     unacknowledged_message_hook_common(fun unacknowledged_message_hook_bounce/4, Config).
  733: 
  734: unacknowledged_message_hook_bounce(AliceSpec, Resource, _SMID, C2SPid) ->
  735:     NewResource = <<"new_", Resource/binary>>,
  736:     NewSpec = lists:keystore(resource, 1, AliceSpec, {resource, NewResource}),
  737:     NewAlice = connect_spec(NewSpec, sr_session, manual),
  738:     send_initial_presence(NewAlice),
  739:     %% ensure second C2S is registered so all the messages are bounced properly
  740:     sm_helper:wait_for_resource_count(NewAlice, 2),
  741:     ok = rpc(mim(), sys, terminate, [C2SPid, normal]),
  742:     {NewResource, NewAlice}.
  743: 
  744: unacknowledged_message_hook_offline(Config) ->
  745:     unacknowledged_message_hook_common(fun unacknowledged_message_hook_offline/4, Config).
  746: 
  747: unacknowledged_message_hook_offline(AliceSpec, Resource, _SMID, C2SPid) ->
  748:     C2SRef = erlang:monitor(process, C2SPid),
  749:     %%reset the session, so old C2S process is stopped
  750:     NewAlice = connect_spec(AliceSpec, sr_session, manual),
  751:     %% wait for old C2S termination before send presence. other way
  752:     %% some of the latest unacknowledged messages can be bounced to
  753:     %% the new C2S process instead of going to the mod_offline storage.
  754:     %% looks like all the unacknowledged messages arrive to the new
  755:     %% C2S, but the message sequence is broken (the bounced messages
  756:     %% delivered before the messages from the mod_offline storage)
  757:     sm_helper:wait_for_process_termination(C2SRef),
  758:     send_initial_presence(NewAlice),
  759:     {Resource, NewAlice}.
  760: 
  761: unacknowledged_message_hook_common(RestartConnectionFN, Config) ->
  762:     %% connect bob and alice
  763:     Bob = connect_fresh(Config, bob, presence),
  764: 
  765:     AliceSpec0 = escalus_fresh:create_fresh_user(Config, alice),
  766:     Resource = proplists:get_value(username, AliceSpec0),
  767:     AliceSpec = [{resource, Resource} | AliceSpec0],
  768:     HookHandlerExtra = start_hook_listener(Resource),
  769:     Alice = connect_spec(AliceSpec, sr_presence, manual),
  770:     %% Ack the presence stanza
  771:     get_ack(Alice),
  772:     ack_initial_presence(Alice),
  773: 
  774:     SMID = sm_helper:client_to_smid(Alice),
  775: 
  776:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  777:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)),
  778:     %% kill alice connection
  779:     C2SPid = mongoose_helper:get_session_pid(Alice),
  780:     escalus_connection:kill(Alice),
  781:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  782:     sm_helper:assert_alive_resources(Alice, 1),
  783: 
  784:     escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(0, Resource, 100)),
  785:     escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(0, Resource, 100)),
  786:     ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),
  787: 
  788:     %% send some messages and check if c2s can handle it
  789:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)),
  790:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-4">>)),
  791:     escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(0, Resource, 100)),
  792:     escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(0, Resource, 100)),
  793:     ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),
  794: 
  795:     %% alice comes back and receives unacked message
  796:     {NewResource, NewAlice} = RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid),
  797: 
  798:     mongoose_helper:wait_until(
  799:         fun() ->
  800:             Stanza = escalus_connection:get_stanza(NewAlice, msg),
  801:             escalus:assert(is_chat_message, [<<"msg-4">>], Stanza),
  802:             ok
  803:         end, ok),
  804: 
  805:     NewC2SPid = mongoose_helper:get_session_pid(NewAlice),
  806:     escalus_connection:kill(NewAlice),
  807:     mongoose_helper:wait_for_c2s_state_name(NewC2SPid, resume_session),
  808: 
  809:     escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
  810:     escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
  811:     escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
  812:     escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
  813:     ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),
  814:     stop_hook_listener(HookHandlerExtra),
  815:     escalus_connection:stop(Bob).
  816: 
  817: resume_session(Config) ->
  818:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  819:     Texts = three_texts(),
  820:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
  821:         {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
  822:         %% Resume the session.
  823:         Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
  824:         %% Alice receives the unacked messages from the previous
  825:         %% interrupted session.
  826:         sm_helper:wait_for_messages(Alice, Texts),
  827:         %% Alice acks the received messages.
  828:         escalus_connection:send(Alice, escalus_stanza:sm_ack(5)),
  829:         escalus_connection:stop(Alice)
  830:     end).
  831: 
  832: resume_session_with_wrong_h_does_not_leak_sessions(Config) ->
  833:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  834:     Messages = three_texts(),
  835:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
  836:         {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages),
  837:         %% Resume the session.
  838:         Alice = connect_spec(AliceSpec, auth, manual),
  839:         Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 30),
  840:         escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed),
  841: 
  842:         [] = sm_helper:get_user_present_resources(Alice),
  843:         {error, smid_not_found} = sm_helper:get_sid_by_stream_id(SMID),
  844:         escalus_connection:wait_for_close(Alice, timer:seconds(5))
  845:     end).
  846: 
  847: resume_session_with_wrong_sid_returns_item_not_found(Config) ->
  848:     session_resumption_expects_item_not_found(Config, <<"wrong-sid">>).
  849: 
  850: resume_session_with_wrong_namespace_is_a_noop(Config) ->
  851:     Alice = connect_fresh(Config, alice, auth),
  852:     #xmlel{attrs = Attrs} = Resume = escalus_stanza:resume(<<"doesnt_matter">>, 4),
  853:     Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, <<"not-stream-mgnt">>}),
  854:     escalus_connection:send(Alice, Resume#xmlel{attrs = Attrs2}),
  855:     escalus_assert:has_no_stanzas(Alice),
  856:     [] = sm_helper:get_user_present_resources(Alice),
  857:     true = escalus_connection:is_connected(Alice),
  858:     escalus_connection:stop(Alice).
  859: 
  860: resume_dead_session_results_in_item_not_found(Config) ->
  861:     SMID = base64:encode(crypto:strong_rand_bytes(21)),
  862:     SID = {os:timestamp(), undefined},
  863:     rpc(mim(), ?MOD_SM, register_smid, [SMID, SID]),
  864:     session_resumption_expects_item_not_found(Config, SMID).
  865: 
  866: session_resumption_expects_item_not_found(Config, SMID) ->
  867:     Alice = connect_fresh(Config, alice, auth),
  868:     Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 2),
  869:     escalus:assert(is_sm_failed, [<<"item-not-found">>], Resumed),
  870:     [] = sm_helper:get_user_present_resources(Alice),
  871:     true = escalus_connection:is_connected(Alice),
  872:     escalus_connection:stop(Alice).
  873: 
  874: resume_session_kills_old_C2S_gracefully(Config) ->
  875:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  876:     C2SPid = mongoose_helper:get_session_pid(Alice),
  877: 
  878:     %% Monitor the C2S process and disconnect Alice.
  879:     MonitorRef = sm_helper:monitor_session(Alice),
  880:     escalus_client:kill_connection(Config, Alice),
  881: 
  882:     %% Ensure the c2s process is waiting for resumption.
  883:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  884: 
  885:     %% Resume the session.
  886:     NewAlice = connect_resume(Alice, 1),
  887: 
  888:     %% C2S process should die gracefully with Reason=normal.
  889:     sm_helper:wait_for_process_termination(MonitorRef),
  890:     escalus_connection:stop(NewAlice).
  891: 
  892: buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) ->
  893:     Alice = connect_spec(AliceSpec, sr_presence, manual),
  894:     %% Bobs sends some messages to Alice.
  895:     sm_helper:send_messages(Bob, Alice, Texts),
  896:     %% Alice receives them, but doesn't ack.
  897:     sm_helper:wait_for_messages(Alice, Texts),
  898:     %% Alice's connection is violently terminated.
  899:     escalus_client:kill_connection(Config, Alice),
  900:     C2SPid = mongoose_helper:get_session_pid(Alice),
  901:     SMID = sm_helper:client_to_smid(Alice),
  902:     {C2SPid, SMID}.
  903: 
  904: aggressively_pipelined_resume(Config) ->
  905:     AliceSpec = [{manual_ack, true}, {parser_opts, [{start_tag, <<"stream:stream">>}]}
  906:                  | escalus_fresh:create_fresh_user(Config, alice)],
  907:     UnackedMessages = three_texts(),
  908:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
  909:         {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, UnackedMessages),
  910:         %% Resume the session.
  911:         Alice = escalus_connection:connect(AliceSpec),
  912: 
  913:         Username = proplists:get_value(username, AliceSpec),
  914:         Password = proplists:get_value(password, AliceSpec),
  915:         Payload = <<0:8,Username/binary,0:8,Password/binary>>,
  916:         Server = proplists:get_value(server, AliceSpec),
  917: 
  918:         Stream = escalus_stanza:stream_start(Server, <<"jabber:client">>),
  919:         Auth = escalus_stanza:auth(<<"PLAIN">>, [#xmlcdata{content = base64:encode(Payload)}]),
  920:         AuthStream = escalus_stanza:stream_start(Server, <<"jabber:client">>),
  921:         Resume = escalus_stanza:resume(SMID, 2),
  922: 
  923:         escalus_client:send(Alice, [Stream, Auth, AuthStream, Resume]),
  924:         Messages = [escalus_connection:get_stanza(Alice, {get_resumed, I}) || I <- lists:seq(1, 6)],
  925:         escalus:assert(is_sm_resumed, [SMID], lists:last(Messages)),
  926: 
  927:         escalus_connection:stop(Alice)
  928:     end).
  929: 
  930: %% This is a regression test for a case when a session processes a request, which will
  931: %% receive a response from the server, i.e. will have the same origin SID in mongoose_acc.
  932: %% Without proper handling, the reply would be rejected because the resumed session
  933: %% has new SID.
  934: replies_are_processed_by_resumed_session(Config) ->
  935:     %% GIVEN a session and registered special IQ handler (added in init_per_testcase),
  936:     %% that waits for old session process to terminate (at this point new process
  937:     %% has fully taken over) and then actually sends the reply.
  938:     Alice = connect_fresh(Config, alice, sr_presence),
  939: 
  940:     %% WHEN a client sends IQ request to the special handler...
  941:     IQReq = escalus_stanza:iq_get(regression_ns(), []),
  942:     escalus:send(Alice, IQReq),
  943: 
  944:     %% ... goes down and session is resumed.
  945:     Alice2 = sm_helper:kill_and_connect_resume(Alice),
  946: 
  947:     %% THEN the client receives the reply properly.
  948:     IQReply = escalus:wait_for_stanza(Alice2),
  949:     escalus:assert(is_iq_result, [IQReq], IQReply),
  950:     escalus_connection:stop(Alice2).
  951: 
  952: %% This is a regression test for a bug, which manifested in following scenario
  953: %% (due to improper presence sub requests buffering):
  954: %% 1. A is online, B is offline
  955: %% 2. A subscribes to B's presence;
  956: %% 3. B becomes online
  957: %% 4. A sends a message to B
  958: %% 5. B doesn't SM-ack the request or message, terminates the connection
  959: %% 6. B reconnects but with session *replace*, not resume
  960: %% 7. Packet rerouting crashes on the buffered sub request, preventing resending whole buffer
  961: %% 8. B doesn't receive the buffered message
  962: subscription_requests_are_buffered_properly(Config) ->
  963:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  964:     MsgBody = <<"buffered">>,
  965:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
  966:         % GIVEN Bob's pending subscription to Alice's presence
  967:         AliceJid = common_helper:get_bjid(AliceSpec),
  968:         escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)),
  969:         _RosterPushReq = escalus:wait_for_stanza(Bob),
  970: 
  971:         % WHEN Alice becomes online...
  972:         Alice = connect_spec(AliceSpec, sr_session, manual),
  973:         send_initial_presence(Alice),
  974:         %% subscribe could come before the initial presence
  975:         escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)],
  976:                             escalus:wait_for_stanzas(Alice, 2)),
  977: 
  978:         % ...and Bob sends a message to Alice...
  979:         escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),
  980:         MsgStanza = escalus:wait_for_stanza(Alice),
  981:         escalus:assert(is_chat_message, [MsgBody], MsgStanza),
  982: 
  983:         % ...and Alice terminates connection without acking anything...
  984:         escalus_client:kill_connection(Config, Alice),
  985: 
  986:         % ...and reconnects with session replacement.
  987:         Alice2 = connect_spec(AliceSpec, session, manual),
  988: 
  989:         % THEN Alice receives (without sending initial presence):
  990:         % * buffered Bob's message (like above)
  991:         % Alice DOESN'T receive:
  992:         % * buffered subscription request because it is dropped by ejabberd_sm
  993:         %   because it's treated like repeated sub request to bare JID, so it's not
  994:         %   processed by any sub req handler (like mod_roster)
  995:         % * buffered available presence from Alice - because it is addressed to another SID
  996:         %   and Alice2 is a brand new session
  997:         escalus:assert(is_chat_message, [MsgBody], escalus:wait_for_stanza(Alice2)),
  998:         sm_helper:send_and_receive(Bob, Alice2, <<"flush1">>),
  999:         escalus_assert:has_no_stanzas(Alice2),
 1000: 
 1001:         %% Only once an initial presence is sent, a subscription request is sent
 1002:         send_initial_presence(Alice2),
 1003:         escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)],
 1004:                             escalus:wait_for_stanzas(Alice2, 2)),
 1005: 
 1006:         sm_helper:send_and_receive(Bob, Alice2, <<"flush2">>),
 1007:         escalus_assert:has_no_stanzas(Alice2),
 1008: 
 1009:         escalus_connection:stop(Alice2)
 1010:     end).
 1011: 
 1012: %% This is a regression test for a bug, due to which messages sent to old session
 1013: %% in a middle of state handover were not appended properly to SM buffer.
 1014: %% Scenario to reproduce:
 1015: %% 1. Online Bob and Alice
 1016: %% 2. Alice kills the connection
 1017: %% 3. Alice's session is suspended
 1018: %% 4. Alice resumes session with new connection. At this moment new session is still not
 1019: %%    present in session table. `resume` request is stuck in old proc mailbox.
 1020: %% 5. Bob sends a message to Alice. Only old proc is present in session table so now
 1021: %%    old session has two messages in mailbox: `resume` and XML from Bob
 1022: %% 6. We resume old process and it begins session handover
 1023: %% 7. Bob's message is appended to SM buffer in "flush" step
 1024: %% 8. With bug fixed, the message is retransmitted properly
 1025: messages_are_properly_flushed_during_resumption(Config) ->
 1026:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
 1027:         % GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended
 1028:         Alice = connect_fresh(Config, alice, sr_presence),
 1029:         SMH = escalus_connection:get_sm_h(Alice),
 1030:         escalus_client:kill_connection(Config, Alice),
 1031:         %% The receiver process would stop now
 1032:         C2SPid = mongoose_helper:get_session_pid(Alice),
 1033:         mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
 1034: 
 1035:         sm_helper:wait_for_queue_length(C2SPid, 0),
 1036:         ok = rpc(mim(), sys, suspend, [C2SPid]),
 1037: 
 1038:         % WHEN new session requests resumption
 1039:         % we wait until that old session has resumption request enqueued;
 1040:         % we need it to ensure the order of messages: resume first, Bob's chat second.
 1041:         % Actual wait and message sent by Bob is done in separate process
 1042:         % because new client start will block until old process is resumed
 1043: 
 1044:         MsgBody = <<"flush-regression">>,
 1045:         spawn_link(fun() ->
 1046:                       sm_helper:wait_for_queue_length(C2SPid, 1),
 1047: 
 1048:                       % Bob sends a message...
 1049:                       escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),
 1050: 
 1051:                       % ...we ensure that a message is enqueued in Alice's session...
 1052:                       % (2 messages = resume request + Bob's message)
 1053:                       sm_helper:wait_for_queue_length(C2SPid, 2),
 1054: 
 1055:                       % ...and old process is resumed.
 1056:                       ok = rpc(mim(), sys, resume, [C2SPid])
 1057:               end),
 1058:         Alice2 = connect_resume(Alice, SMH),
 1059:         % THEN Alice's new session receives Bob's message
 1060:         RecvMsg = escalus:wait_for_stanza(Alice2),
 1061:         escalus:assert(is_chat_message, [MsgBody], RecvMsg)
 1062:       end).
 1063: 
 1064: messages_are_properly_flushed_during_resumption_p1_fsm_old(Config) ->
 1065:     %% the same as messages_are_properly_flushed_during_resumption,
 1066:     %% but tests that buffered by p1_fsm_old messages are delivered
 1067:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
 1068:         Alice = connect_fresh(Config, alice, sr_presence),
 1069:         SMH = escalus_connection:get_sm_h(Alice),
 1070:         escalus_client:kill_connection(Config, Alice),
 1071:         C2SPid = mongoose_helper:get_session_pid(Alice),
 1072:         mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
 1073:         ok = rpc(mim(), sys, suspend, [C2SPid]),
 1074: 
 1075:         %% send some dummy event. ignored by c2s but ensures that
 1076:         %% p1_old_fsm buffers the messages, sent after this one
 1077:         rpc(mim(), p1_fsm_old, send_all_state_event, [C2SPid, dummy_event]),
 1078: 
 1079:         MsgBody = <<"flush-regression">>,
 1080:         spawn_link(fun() ->
 1081:                     sm_helper:wait_for_queue_length(C2SPid, 2),
 1082: 
 1083:                     % Bob sends a message...
 1084:                     escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),
 1085: 
 1086:                     % ...we ensure that a message is enqueued in Alice's session...
 1087:                     % (2 messages = resume request + Bob's message)
 1088:                     sm_helper:wait_for_queue_length(C2SPid, 3),
 1089: 
 1090:                     % ...and old process is resumed.
 1091:                     ok = rpc(mim(), sys, resume, [C2SPid])
 1092:               end),
 1093:         Alice2 = connect_resume(Alice, SMH),
 1094:         % THEN Alice's new session receives Bob's message
 1095:         RecvMsg = escalus:wait_for_stanza(Alice2),
 1096:         escalus:assert(is_chat_message, [MsgBody], RecvMsg)
 1097:       end).
 1098: 
 1099: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt(Config) ->
 1100:     Alice = connect_fresh(Config, alice, session, manual),
 1101:     %% Should not crash anything!
 1102:     escalus_connection:send(Alice, escalus_stanza:enable_sm()),
 1103:     Response = escalus_connection:get_stanza(Alice, service_unavailable),
 1104:     escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response),
 1105:     escalus_connection:stop(Alice).
 1106: 
 1107: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption(Config) ->
 1108:     Alice = connect_fresh(Config, alice, session, manual),
 1109:     %% Should not crash anything!
 1110:     escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])),
 1111:     Response = escalus_connection:get_stanza(Alice, service_unavailable),
 1112:     escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response),
 1113:     escalus_connection:stop(Alice).
 1114: 
 1115: %%--------------------------------------------------------------------
 1116: %% Helpers
 1117: %%--------------------------------------------------------------------
 1118: start_hook_listener(Resource) ->
 1119:     TestCasePid = self(),
 1120:     rpc(mim(), ?MODULE, rpc_start_hook_handler, [TestCasePid, Resource, host_type()]).
 1121: 
 1122: stop_hook_listener(HookExtra) ->
 1123:     rpc(mim(), ?MODULE, rpc_stop_hook_handler, [HookExtra, host_type()]).
 1124: 
 1125: rpc_start_hook_handler(TestCasePid, User, HostType) ->
 1126:     LUser = jid:nodeprep(User),
 1127:     Extra = #{luser => LUser, pid => TestCasePid},
 1128:     gen_hook:add_handler(unacknowledged_message, HostType,
 1129:                          fun ?MODULE:hook_handler_fn/3,
 1130:                          Extra, 50),
 1131:     Extra.
 1132: 
 1133: rpc_stop_hook_handler(HookExtra, HostType) ->
 1134:     gen_hook:delete_handler(unacknowledged_message, HostType,
 1135:                             fun ?MODULE:hook_handler_fn/3,
 1136:                             HookExtra, 50).
 1137: 
 1138: hook_handler_fn(Acc,
 1139:                 #{args := [Jid]} = _Params,
 1140:                 #{luser := LUser, pid := TestCasePid} = _Extra) ->
 1141:     {U, _S, R} = jid:to_lower(Jid),
 1142:     case U of
 1143:         LUser ->
 1144:             Counter = mongoose_acc:get(sm_test, counter, 0, Acc),
 1145:             El = mongoose_acc:element(Acc),
 1146:             TestCasePid ! {sm_test, Counter, R, El},
 1147:             {ok, mongoose_acc:set_permanent(sm_test, counter, Counter + 1, Acc)};
 1148:         _ -> {ok, Acc}
 1149:     end.
 1150: 
 1151: wait_for_unacked_msg_hook(Counter, Res, Timeout) ->
 1152:     receive
 1153:         {sm_test, AccCounter, Resource, Stanza} = Msg ->
 1154:             ?assertEqual(Counter, AccCounter, Msg),
 1155:             ?assertEqual(Res, Resource, Msg),
 1156:             Stanza
 1157:     after Timeout ->
 1158:         timeout
 1159:     end.
 1160: 
 1161: is_chat(Content) ->
 1162:     fun(Stanza) -> escalus_pred:is_chat_message(Content, Stanza) end.
 1163: 
 1164: is_presence(Type) ->
 1165:     fun(Stanza) -> escalus_pred:is_presence_with_type(Type, Stanza) end.
 1166: 
 1167: three_texts() ->
 1168:     [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>].
 1169: 
 1170: %%--------------------------------------------------------------------
 1171: %% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session"
 1172: %%--------------------------------------------------------------------
 1173: 
 1174: regression_ns() ->
 1175:     <<"regression">>.
 1176: 
 1177: register_handler() ->
 1178:     HostType = host_type(),
 1179:     rpc(mim(), gen_iq_handler, add_iq_handler_for_domain,
 1180:         [HostType, regression_ns(), ejabberd_sm,
 1181:          fun ?MODULE:regression_handler/5, #{}, one_queue]).
 1182: 
 1183: unregister_handler() ->
 1184:     HostType = host_type(),
 1185:     rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain,
 1186:         [HostType, regression_ns(), ejabberd_sm]).
 1187: 
 1188: regression_handler(Acc, _From, _To, IQ, _Extra) ->
 1189:     %% A bit of a hack - will no longer work when the SID format changes
 1190:     {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
 1191:     erlang:monitor(process, Pid),
 1192:     receive
 1193:         {'DOWN', _, _, _, _} ->
 1194:             ok
 1195:     after
 1196:         10000 ->
 1197:             error({c2s_not_stopped_after_timeout, Pid})
 1198:     end,
 1199:     %% We avoid another race condition - there is a short window where user session
 1200:     %% is not registered in ejabberd_sm: between old process termination and the moment
 1201:     %% when the new process stores new session in memory. It should be fixed separately.
 1202:     wait_for_session(mongoose_acc:get(c2s, origin_jid, undefined, Acc), 50, 100),
 1203:     {Acc, jlib:make_result_iq_reply(IQ)}.
 1204: 
 1205: wait_for_session(JID, Retries, SleepTime) ->
 1206:     case ejabberd_sm:get_session(JID) of
 1207:         offline ->
 1208:             timer:sleep(SleepTime),
 1209:             wait_for_session(JID, Retries - 1, SleepTime);
 1210:         _ ->
 1211:             ok
 1212:     end.
 1213: 
 1214: maybe_ack_initial_presence(Alice, ack) ->
 1215:     ack_initial_presence(Alice);
 1216: maybe_ack_initial_presence(_Alice, no_ack) ->
 1217:     ok.