1: %% Stream Management tests
    2: -module(sm_SUITE).
    3: 
    4: -compile([export_all, nowarn_export_all]).
    5: -include_lib("exml/include/exml.hrl").
    6: -include_lib("eunit/include/eunit.hrl").
    7: -include_lib("escalus/include/escalus.hrl").
    8: -include_lib("common_test/include/ct.hrl").
    9: 
   10: %% Injected code callbacks
   11: -export([rpc_start_hook_handler/3,
   12:          rpc_stop_hook_handler/2,
   13:          hook_handler_fn/3,
   14:          regression_handler/5]).
   15: 
   16: -export([rpc_start_filter_hook_handler/3,
   17:          rpc_stop_filter_hook_handler/2,
   18:          filter_hook_handler_fn/3]).
   19: 
   20: -import(distributed_helper, [mim/0,
   21:                              require_rpc_nodes/1,
   22:                              rpc/4]).
   23: 
   24: -import(domain_helper, [host_type/0]).
   25: 
   26: -import(sm_helper, [connect_fresh/3,
   27:                     connect_fresh/4,
   28:                     connect_spec/2,
   29:                     connect_spec/3,
   30:                     connect_same/2,
   31:                     connect_same/3,
   32:                     connect_resume/2,
   33:                     process_initial_stanza/1,
   34:                     send_initial_presence/1,
   35:                     get_ack/1,
   36:                     ack_initial_presence/1]).
   37: 
   38: -define(MOD_SM, mod_stream_management).
   39: -define(CONSTRAINT_CHECK_TIMEOUT, 5000).
   40: -define(LONG_TIMEOUT, 3600).
   41: -define(SHORT_TIMEOUT, 1).
   42: -define(SMALL_SM_BUFFER, 3).
   43: 
   44: %%--------------------------------------------------------------------
   45: %% Suite configuration
   46: %%--------------------------------------------------------------------
   47: 
   48: suite() ->
   49:     require_rpc_nodes([mim]) ++ escalus:suite().
   50: 
   51: all() ->
   52:     ct_helper:groups_to_all(groups()).
   53: 
   54: groups() ->
   55:     P = [parallel],
   56:     [{parallel, P, parallel_cases()},
   57:      {parallel_manual_ack_freq_1, P, parallel_manual_ack_freq_1_cases()},
   58:      {manual_ack_freq_2, [], manual_ack_freq_2_cases()},
   59:      {stale_h, [], stale_h_cases()},
   60:      {parallel_unacknowledged_message_hook, P, parallel_unacknowledged_message_hook_cases()}].
   61: 
   62: parallel_cases() ->
   63:     [server_announces_sm,
   64:      server_enables_sm_before_session,
   65:      server_enables_sm_after_session,
   66:      server_returns_failed_after_start,
   67:      server_returns_failed_after_auth,
   68:      server_enables_resumption,
   69:      client_enables_sm_twice_fails_with_correct_error_stanza,
   70:      session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza,
   71:      session_resumed_and_old_session_dead_doesnt_route_error_to_new_session,
   72:      basic_ack,
   73:      h_ok_before_session,
   74:      h_ok_after_session_enabled_before_session,
   75:      h_ok_after_session_enabled_after_session,
   76:      h_ok_after_a_chat,
   77:      h_non_given_closes_stream_gracefully,
   78:      resend_unacked_on_reconnection,
   79:      session_established,
   80:      wait_for_resumption,
   81:      resume_session,
   82:      resume_session_with_wrong_h_does_not_leak_sessions,
   83:      resume_session_with_wrong_sid_returns_item_not_found,
   84:      resume_session_with_wrong_namespace_is_a_noop,
   85:      resume_dead_session_results_in_item_not_found,
   86:      resume_session_kills_old_C2S_gracefully,
   87:      aggressively_pipelined_resume,
   88:      replies_are_processed_by_resumed_session,
   89:      subscription_requests_are_buffered_properly,
   90:      messages_are_properly_flushed_during_resumption,
   91:      messages_are_properly_flushed_during_resumption_p1_fsm_old].
   92: 
   93: parallel_manual_ack_freq_1_cases() ->
   94:     [client_acks_more_than_sent,
   95:      too_many_unacked_stanzas,
   96:      resend_unacked_after_resume_timeout,
   97:      resume_session_state_send_message_with_ack,
   98:      resume_session_state_send_message_without_ack,
   99:      resume_session_state_stop_c2s,
  100:      server_requests_ack_after_session,
  101:      resend_more_offline_messages_than_buffer_size,
  102:      server_requests_ack].
  103: 
  104: manual_ack_freq_2_cases() ->
  105:     [server_requests_ack_freq_2].
  106: 
  107: stale_h_cases() ->
  108:     [resume_expired_session_returns_correct_h,
  109:      gc_repeat_after_never_means_no_cleaning,
  110:      gc_repeat_after_timeout_does_clean].
  111: 
  112: stream_mgmt_disabled_cases() ->
  113:     [no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt,
  114:      no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption].
  115: 
  116: manual_ack_freq_long_session_timeout_cases() ->
  117:     [preserve_order].
  118: 
  119: parallel_unacknowledged_message_hook_cases() ->
  120:     [unacknowledged_message_hook_bounce,
  121:      unacknowledged_message_hook_offline,
  122:      unacknowledged_message_hook_resume,
  123:      unacknowledged_message_hook_filter].
  124: 
  125: %%--------------------------------------------------------------------
  126: %% Init & teardown
  127: %%--------------------------------------------------------------------
  128: 
  129: init_per_suite(Config) ->
  130:     NewConfig = dynamic_modules:save_modules(host_type(), Config),
  131:     NewConfigWithSM = escalus_users:update_userspec(NewConfig, alice, stream_management, true),
  132:     mongoose_helper:inject_module(?MODULE),
  133:     escalus:init_per_suite(NewConfigWithSM).
  134: 
  135: end_per_suite(Config) ->
  136:     escalus_fresh:clean(),
  137:     dynamic_modules:restore_modules(Config),
  138:     escalus:end_per_suite(Config).
  139: 
  140: init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hook;
  141:                                    Group =:= manual_ack_freq_long_session_timeout;
  142:                                    Group =:= parallel_manual_ack_freq_1;
  143:                                    Group =:= manual_ack_freq_2 ->
  144:     dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
  145:     Config;
  146: init_per_group(stale_h, Config) ->
  147:     Config;
  148: init_per_group(stream_mgmt_disabled, Config) ->
  149:     dynamic_modules:stop(host_type(), ?MOD_SM),
  150:     rpc(mim(), mnesia, delete_table, [sm_session]),
  151:     Config;
  152: init_per_group(Group, Config) ->
  153:     dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
  154:     Config.
  155: 
  156: end_per_group(_Group, _Config) ->
  157:     ok.
  158: 
  159: init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) ->
  160:     dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
  161:     escalus:init_per_testcase(CN, Config);
  162: init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleaning;
  163:                                    CN =:= gc_repeat_after_timeout_does_clean ->
  164:     dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
  165:     Config2 = register_some_smid_h(Config),
  166:     escalus:init_per_testcase(CN, Config2);
  167: init_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
  168:     escalus:init_per_testcase(CN, Config);
  169: init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
  170:     register_handler(),
  171:     escalus:init_per_testcase(CN, Config);
  172: init_per_testcase(CaseName, Config) ->
  173:     escalus:init_per_testcase(CaseName, Config).
  174: 
  175: end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_h;
  176:                                   CN =:= gc_repeat_after_never_means_no_cleaning;
  177:                                   CN =:= gc_repeat_after_timeout_does_clean ->
  178:     Name = rpc(mim(), gen_mod, get_module_proc, [host_type(), stream_management_stale_h]),
  179:     rpc(mim(), ejabberd_sup, stop_child, [Name]),
  180:     escalus:end_per_testcase(CN, Config);
  181: end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
  182:     unregister_handler(),
  183:     escalus:end_per_testcase(CN, Config);
  184: end_per_testcase(CaseName, Config) ->
  185:     escalus:end_per_testcase(CaseName, Config).
  186: 
  187: %% Module configuration per group (in case of stale_h group it is per testcase)
  188: 
  189: required_modules(Scope, Name) ->
  190:     SMConfig = case required_sm_opts(Scope, Name) of
  191:                    stopped -> stopped;
  192:                    ExtraOpts -> maps:merge(common_sm_opts(), ExtraOpts)
  193:                end,
  194:     [{mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)},
  195:      {mod_offline, config_parser_helper:mod_config(mod_offline, #{})}].
  196: 
  197: required_sm_opts(group, parallel) ->
  198:     #{ack_freq => never};
  199: required_sm_opts(group, parallel_manual_ack_freq_1) ->
  200:     #{ack_freq => 1,
  201:       resume_timeout => ?SHORT_TIMEOUT};
  202: required_sm_opts(group, manual_ack_freq_2) ->
  203:     #{ack_freq => 2};
  204: required_sm_opts(group, stream_mgmt_disabled) ->
  205:     stopped;
  206: required_sm_opts(group, parallel_unacknowledged_message_hook) ->
  207:     #{ack_freq => 1};
  208: required_sm_opts(group, manual_ack_freq_long_session_timeout) ->
  209:     #{ack_freq => 1, buffer_max => 1000};
  210: required_sm_opts(testcase, resume_expired_session_returns_correct_h) ->
  211:     #{ack_freq => 1,
  212:       resume_timeout => ?SHORT_TIMEOUT,
  213:       stale_h => stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT)};
  214: required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
  215:     #{stale_h => stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT)};
  216: required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
  217:     #{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)}.
  218: 
  219: common_sm_opts() ->
  220:     #{buffer_max => ?SMALL_SM_BUFFER}.
  221: 
  222: stale_h(RepeatAfter, Geriatric) ->
  223:     #{enabled => true,
  224:       repeat_after => RepeatAfter,
  225:       geriatric => Geriatric}.
  226: 
  227: make_smid() ->
  228:     base64:encode(crypto:strong_rand_bytes(21)).
  229: 
  230: register_smid(IntSmidId) ->
  231:     S = {SMID = make_smid(), IntSmidId},
  232:     ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]),
  233:     S.
  234: 
  235: register_some_smid_h(Config) ->
  236:     TestSmids = lists:map(fun register_smid/1, lists:seq(1, 3)),
  237:     [{smid_test, TestSmids} | Config].
  238: 
  239: %%--------------------------------------------------------------------
  240: %% Tests
  241: %%--------------------------------------------------------------------
  242: 
  243: server_announces_sm(Config) ->
  244:     AliceSpec = escalus_fresh:freshen_spec(Config, alice),
  245:     {ok, #client{props = Props}, Features} = escalus_connection:start(AliceSpec,
  246:                                                                       [start_stream]),
  247:     true = escalus_session:can_use_stream_management(Props, Features).
  248: 
  249: 
  250: server_enables_sm_before_session(Config) ->
  251:     connect_fresh(Config, alice, sm_after_bind).
  252: 
  253: server_enables_sm_after_session(Config) ->
  254:     connect_fresh(Config, alice, sm_after_session).
  255: 
  256: server_returns_failed_after_start(Config) ->
  257:     Alice = connect_fresh(Config, alice, before_auth),
  258:     server_returns_failed(Alice).
  259: 
  260: server_returns_failed_after_auth(Config) ->
  261:     Alice = connect_fresh(Config, alice, auth),
  262:     server_returns_failed(Alice).
  263: 
  264: server_enables_resumption(Config) ->
  265:     Alice = connect_fresh(Config, alice, sr_presence),
  266:     escalus_connection:stop(Alice).
  267: 
  268: server_returns_failed(Alice) ->
  269:     escalus_connection:send(Alice, escalus_stanza:enable_sm()),
  270:     escalus:assert(is_sm_failed, [<<"unexpected-request">>],
  271:                    escalus_connection:get_stanza(Alice, enable_sm_failed)).
  272: 
  273: client_enables_sm_twice_fails_with_correct_error_stanza(Config) ->
  274:     Alice = connect_fresh(Config, alice, sm_before_session),
  275:     escalus_connection:send(Alice, escalus_stanza:enable_sm()),
  276:     escalus:assert(is_sm_failed, [<<"unexpected-request">>],
  277:                    escalus_connection:get_stanza(Alice, enable_sm_failed)),
  278:     escalus:assert(is_stream_end,
  279:                    escalus_connection:get_stanza(Alice, enable_sm_failed)),
  280:     true = escalus_connection:wait_for_close(Alice, timer:seconds(5)).
  281: 
  282: session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza(Config) ->
  283:     %% GIVEN USER WITH STREAM RESUMPTION ENABLED
  284:     Alice = connect_fresh(Config, alice, sr_presence),
  285:     SMH = escalus_connection:get_sm_h(Alice),
  286:     %% WHEN USER RESUMES SESSION FROM NEW CLIENT
  287:     Alice2 = connect_resume(Alice, SMH),
  288:     process_initial_stanza(Alice2),
  289:     %% THEN: Old session is gracefully closed with the correct error stanza
  290:     escalus:assert(is_stream_error, [<<"conflict">>, <<>>],
  291:                    escalus_connection:get_stanza(Alice, close_old_stream)),
  292:     escalus:assert(is_stream_end,
  293:                    escalus_connection:get_stanza(Alice, close_old_stream)),
  294:     true = escalus_connection:wait_for_close(Alice, timer:seconds(5)),
  295:     true = escalus_connection:is_connected(Alice2),
  296:     escalus_connection:stop(Alice2).
  297: 
  298: session_resumed_and_old_session_dead_doesnt_route_error_to_new_session(Config) ->
  299:     %% GIVEN USER WITH STREAM RESUMPTION ENABLED
  300:     Alice = connect_fresh(Config, alice, sr_presence),
  301:     %% WHEN FIRST SESSION DIES AND USER RESUMES FROM NEW CLIENT
  302:     Alice2 = sm_helper:kill_and_connect_resume(Alice),
  303:     process_initial_stanza(Alice2),
  304:     %% THEN new session does not have any message rerouted
  305:     false = escalus_client:has_stanzas(Alice2),
  306:     true = escalus_connection:is_connected(Alice2),
  307:     escalus_connection:stop(Alice2).
  308: 
  309: basic_ack(Config) ->
  310:     Alice = connect_fresh(Config, alice, sm_after_session),
  311:     escalus_connection:send(Alice, escalus_stanza:roster_get()),
  312:     escalus:assert(is_roster_result,
  313:                    escalus_connection:get_stanza(Alice, roster_result)),
  314:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  315:     escalus:assert(is_sm_ack,
  316:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  317: 
  318: %% Test that "h" value is valid when:
  319: %% - SM is enabled *before* the session is established
  320: %% - <r/> is sent *before* the session is established
  321: h_ok_before_session(Config) ->
  322:     Alice = connect_fresh(Config, alice, sm_after_bind),
  323:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  324:     escalus:assert(is_sm_ack, [0],
  325:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  326: 
  327: %% Test that "h" value is valid when:
  328: %% - SM is enabled *before* the session is established
  329: %% - <r/> is sent *after* the session is established
  330: h_ok_after_session_enabled_before_session(Config) ->
  331:     Alice = connect_fresh(Config, alice, sm_before_session),
  332:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  333:     escalus:assert(is_sm_ack, [1],
  334:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  335: 
  336: %% Test that "h" value is valid when:
  337: %% - SM is enabled *after* the session is established
  338: %% - <r/> is sent *after* the session is established
  339: h_ok_after_session_enabled_after_session(Config) ->
  340:     Alice = connect_fresh(Config, alice, sm_after_session),
  341:     escalus_connection:send(Alice, escalus_stanza:roster_get()),
  342:     escalus:assert(is_roster_result,
  343:                    escalus_connection:get_stanza(Alice, roster_result)),
  344:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  345:     escalus:assert(is_sm_ack, [1],
  346:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  347: 
  348: %% Test that "h" value is valid after exchanging a few messages.
  349: h_ok_after_a_chat(ConfigIn) ->
  350:     Config = escalus_users:update_userspec(ConfigIn, alice,
  351:                                            stream_management, true),
  352:     escalus:fresh_story(Config, [{alice,1}, {bob,1}], fun(Alice, Bob) ->
  353:         escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi, Bob!">>)),
  354:         escalus:assert(is_chat_message, [<<"Hi, Bob!">>],
  355:                        escalus:wait_for_stanza(Bob)),
  356:         escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)),
  357:         escalus:assert(is_chat_message, [<<"Hi, Alice!">>],
  358:                        escalus:wait_for_stanza(Alice)),
  359:         escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How's life?">>)),
  360:         escalus:assert(is_chat_message, [<<"How's life?">>],
  361:                        escalus:wait_for_stanza(Alice)),
  362:         escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Pretty !@#$%^$">>)),
  363:         escalus:assert(is_chat_message, [<<"Pretty !@#$%^$">>],
  364:                        escalus:wait_for_stanza(Bob)),
  365:         escalus:send(Alice, escalus_stanza:sm_request()),
  366:         escalus:assert(is_sm_ack, [3], escalus:wait_for_stanza(Alice)),
  367:         %% Ack, so that unacked messages don't go into offline store.
  368:         escalus:send(Alice, escalus_stanza:sm_ack(3))
  369:     end).
  370: 
  371: h_non_given_closes_stream_gracefully(ConfigIn) ->
  372:     AStanza = #xmlel{name = <<"a">>,
  373:                attrs = [{<<"xmlns">>, <<"urn:xmpp:sm:3">>}]},
  374:     Config = escalus_users:update_userspec(ConfigIn, alice,
  375:                                            stream_management, true),
  376:     escalus:fresh_story(Config, [{alice,1}], fun(Alice) ->
  377:         C2SPid = mongoose_helper:get_session_pid(Alice),
  378:         escalus:send(Alice, AStanza),
  379:         escalus:assert(is_stream_error,
  380:                        [<<"policy-violation">>, <<>>],
  381:                        escalus:wait_for_stanza(Alice)),
  382:         mongoose_helper:wait_for_pid_to_die(C2SPid),
  383:         escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)),
  384:         true = escalus_connection:wait_for_close(Alice, timer:seconds(5))
  385:     end).
  386: 
  387: client_acks_more_than_sent(Config) ->
  388:     Alice = connect_fresh(Config, alice, sm_after_session),
  389:     escalus:send(Alice, escalus_stanza:sm_ack(5)),
  390:     StreamErrorStanza = escalus:wait_for_stanza(Alice),
  391:     %% Assert "undefined-condition" children
  392:     escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], StreamErrorStanza),
  393:     %% Assert "handled-count-too-high" children with correct attributes
  394:     HandledCountSubElement = exml_query:path(StreamErrorStanza,
  395:                                              [{element_with_ns,
  396:                                                <<"handled-count-too-high">>,
  397:                                                <<"urn:xmpp:sm:3">>}]),
  398:     <<"5">> = exml_query:attr(HandledCountSubElement, <<"h">>),
  399:     <<"0">> = exml_query:attr(HandledCountSubElement, <<"send-count">>),
  400:     %% Assert graceful stream end
  401:     escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)),
  402:     true = escalus_connection:wait_for_close(Alice, timer:seconds(5)).
  403: 
  404: too_many_unacked_stanzas(Config) ->
  405:     Bob = connect_fresh(Config, bob, presence),
  406:     Alice = connect_fresh(Config, alice, sm_presence, manual),
  407:     get_ack(Alice),
  408:     [escalus:send(Bob, escalus_stanza:chat_to(Alice,
  409:         <<(integer_to_binary(N))/binary, ": Hi, Alice!">>))
  410:      || N <- lists:seq(1,?SMALL_SM_BUFFER)],
  411:     escalus:wait_for_stanzas(Alice, ?SMALL_SM_BUFFER * 2), % messages and ack requests
  412:     escalus:assert(is_stream_error, [<<"resource-constraint">>,
  413:                                      <<"too many unacked stanzas">>],
  414:                    %% wait for deferred buffer check
  415:                    escalus:wait_for_stanza(Alice, ?CONSTRAINT_CHECK_TIMEOUT + 1000)).
  416: 
  417: server_requests_ack(Config) ->
  418:     server_requests_ack(Config, 1).
  419: 
  420: server_requests_ack_freq_2(Config) ->
  421:     server_requests_ack(Config, 2).
  422: 
  423: server_requests_ack(Config, N) ->
  424:     Bob = connect_fresh(Config, bob, presence),
  425:     Alice = connect_fresh(Config, alice, sm_presence, manual),
  426:     %% ack request after initial presence
  427:     maybe_assert_ack_request(1, N, Alice),
  428:     escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)),
  429:     escalus:assert(is_chat_message, [<<"Hi, Alice!">>],
  430:                    escalus:wait_for_stanza(Alice)),
  431:     maybe_assert_ack_request(2, N, Alice).
  432: 
  433: maybe_assert_ack_request(StanzasRec, AckRequests, Alice) ->
  434:     ct:log("StanzasRec: ~p, AckRequests: ~p", [StanzasRec, AckRequests]),
  435:     case StanzasRec rem AckRequests of
  436:         0 ->
  437:             escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(Alice));
  438:         _ ->
  439:             ok
  440:     end,
  441:     StanzasRec.
  442: 
  443: server_requests_ack_after_session(Config) ->
  444:     Alice = connect_fresh(Config, alice, sm_before_session, manual),
  445:     escalus:assert(is_sm_ack_request, escalus_connection:get_stanza(Alice, stream_mgmt_req)).
  446: 
  447: resend_more_offline_messages_than_buffer_size(Config) ->
  448:     %% connect bob and alice
  449:     Bob = connect_fresh(Config, bob, presence),
  450:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  451: 
  452:     % sent some messages - more than unacked buffer size
  453:     MessagesToSend = ?SMALL_SM_BUFFER + 1,
  454:     JID = common_helper:get_bjid(AliceSpec),
  455:     [escalus_connection:send(Bob, escalus_stanza:chat_to(JID, integer_to_binary(I)))
  456:      || I <- lists:seq(1, MessagesToSend)],
  457: 
  458:     % connect alice who wants to receive all messages from offline storage
  459:     Alice = connect_spec(AliceSpec, sm_after_session, manual),
  460:     mongoose_helper:wait_for_n_offline_messages(Alice, MessagesToSend),
  461:     send_initial_presence(Alice),
  462: 
  463:     escalus:wait_for_stanzas(Alice, MessagesToSend * 2), %messages and ack requests
  464: 
  465:     escalus_connection:get_stanza(Alice, presence),
  466:     get_ack(Alice), % ack request
  467: 
  468:     % confirm messages + presence
  469:     escalus_connection:send(Alice, escalus_stanza:sm_ack(4)),
  470:     % wait for check constraint message on server side
  471: 
  472:     ct:sleep(?CONSTRAINT_CHECK_TIMEOUT + 1000),
  473:     false = escalus_client:has_stanzas(Alice),
  474:     % should not receive anything especially any stream errors
  475: 
  476:     escalus_connection:stop(Alice),
  477:     escalus_connection:stop(Bob).
  478: 
  479: resend_unacked_on_reconnection(Config) ->
  480:     Texts = three_texts(),
  481:     Bob = connect_fresh(Config, bob, presence),
  482:     Alice = connect_fresh(Config, alice, sm_presence),
  483:     AliceSpec = sm_helper:client_to_spec0(Alice),
  484:     %% Bob sends some messages to Alice.
  485:     sm_helper:send_messages(Bob, Alice, Texts),
  486:     %% Alice receives the messages.
  487:     sm_helper:wait_for_messages(Alice, Texts),
  488:     %% Alice disconnects without acking the messages.
  489:     sm_helper:stop_client_and_wait_for_termination(Alice),
  490:     %% Messages go to the offline store.
  491:     %% Alice receives the messages from the offline store.
  492:     NewAlice = connect_spec(AliceSpec, session, manual),
  493:     send_initial_presence(NewAlice),
  494:     sm_helper:wait_for_messages(NewAlice, Texts),
  495:     %% Alice acks the delayed messages so they won't go again
  496:     %% to the offline store.
  497:     escalus_connection:send(NewAlice, escalus_stanza:sm_ack(3)).
  498: 
  499: %% Remove wait_for_n_offline_messages and you will get anything, but preserve_order
  500: %% TODO Test without wait_for_n_offline_messages. It would require changes in SM
  501: %%      and more strict tests, reproducing delays in SM and in mod_offline.
  502: preserve_order(Config) ->
  503:     %% connect bob and alice
  504:     Bob = connect_fresh(Config, bob, presence),
  505:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  506:     AliceSpec = sm_helper:client_to_spec(Alice),
  507:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"1">>)),
  508: 
  509:     %% kill alice connection
  510:     escalus_connection:kill(Alice),
  511:     C2SPid = mongoose_helper:get_session_pid(Alice),
  512:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  513: 
  514:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"2">>)),
  515:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"3">>)),
  516: 
  517:     NewAlice = connect_spec(AliceSpec, session, manual),
  518:     escalus_connection:send(NewAlice, escalus_stanza:enable_sm([resume])),
  519: 
  520:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"4">>)),
  521:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"5">>)),
  522: 
  523:     %% Without this check we will get stuff out of order
  524:     mongoose_helper:wait_for_n_offline_messages(NewAlice, 5),
  525: 
  526:     send_initial_presence(NewAlice),
  527:     %% Without this check we can get "6, 1, 2, 3, 4, 5" messages in the next receive_all_ordered
  528:     mongoose_helper:wait_for_n_offline_messages(NewAlice, 0),
  529:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"6">>)),
  530: 
  531:     %% "2, 3, 4, 5, 6, 1" is possible (where only 1 is from offline storage, the rest is from sm)
  532:     receive_all_ordered(NewAlice, 6),
  533: 
  534:     % replace connection
  535:     NewAlice2 = connect_spec(AliceSpec, session, manual),
  536:     % allow messages to go to the offline storage
  537:     mongoose_helper:wait_for_n_offline_messages(NewAlice, 6),
  538: 
  539:     send_initial_presence(NewAlice2),
  540: 
  541:     % receves messages in correct order
  542:     receive_all_ordered(NewAlice2, 6),
  543: 
  544:     escalus_connection:stop(Bob),
  545:     escalus_connection:stop(NewAlice2).
  546: 
  547: receive_all_ordered(Conn, Last) ->
  548:     receive_all_ordered(Conn, 1, Last, []).
  549: 
  550: %% Receive messages from N to Last.
  551: %% Ignores acks and presences.
  552: %% Handles case when out of order and when not enough stanzas.
  553: receive_all_ordered(_Conn, N, Last, Acc) when N > Last ->
  554:     Texts = lists:map(fun integer_to_binary/1, lists:seq(1, Last)),
  555:     sm_helper:assert_messages(Acc, Texts);
  556: receive_all_ordered(Conn, N, Last, Acc) ->
  557:     Stanzas = escalus:wait_for_stanzas(Conn, 1),
  558:     case Stanzas of
  559:         [#xmlel{name = <<"message">>}] ->
  560:             receive_all_ordered(Conn, N + 1, Last, Acc ++ Stanzas);
  561:         [_] -> %% Ack or presence
  562:             receive_all_ordered(Conn, N, Last, Acc);
  563:         [] ->
  564:             ct:fail({timeout_waiting, N, Acc})
  565:     end.
  566: 
  567: resend_unacked_after_resume_timeout(Config) ->
  568:     %% connect bob and alice
  569:     Bob = connect_fresh(Config, bob, presence),
  570:     Alice = connect_fresh(Config, alice, sr_presence),
  571:     AliceSpec = sm_helper:client_to_spec(Alice),
  572: 
  573:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  574:     %% kill alice connection
  575:     escalus_connection:kill(Alice),
  576: 
  577:     %% ensure there is no session
  578:     C2SPid = mongoose_helper:get_session_pid(Alice),
  579:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  580: 
  581:     %% alice come back and receives unacked message
  582:     NewAlice = connect_spec(AliceSpec, session),
  583:     send_initial_presence(NewAlice),
  584: 
  585:     escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)],
  586:                                  escalus:wait_for_stanzas(NewAlice, 2)),
  587: 
  588:     escalus_connection:stop(Bob),
  589:     escalus_connection:stop(NewAlice).
  590: 
  591: resume_expired_session_returns_correct_h(Config) ->
  592:     %% connect bob and alice
  593:     Bob = connect_fresh(Config, bob, sr_presence),
  594:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  595:     get_ack(Alice),
  596: 
  597:     %% Bob sends a message to Alice, and Alice receives it but doesn't acknowledge
  598:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  599:     escalus:assert(is_chat_message, [<<"msg-1">>], escalus:wait_for_stanza(Alice)),
  600:     %% alice comes back, but too late, so resumption doesn't work,
  601:     %% but she receives the previous h = 1 anyway
  602:     %% NewAlice is also manual ack
  603:     NewAlice = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Alice),
  604:     FailedResumption = escalus_connection:get_stanza(NewAlice, failed_resumption),
  605:     <<"1">> = exml_query:attr(FailedResumption, <<"h">>),
  606:     %% And we can continue with bind and session
  607:     escalus_session:session(escalus_session:bind(NewAlice)),
  608:     send_initial_presence(NewAlice),
  609:     Stanzas = [escalus_connection:get_stanza(NewAlice, {msg, 1}),
  610:                escalus_connection:get_stanza(NewAlice, {msg, 2})],
  611:     escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Stanzas),
  612:     escalus_connection:stop(Bob),
  613:     escalus_connection:stop(NewAlice).
  614: 
  615: gc_repeat_after_never_means_no_cleaning(Config) ->
  616:     [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config),
  617:     {stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID1]),
  618:     {stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID2]),
  619:     {stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID3]).
  620: 
  621: gc_repeat_after_timeout_does_clean(Config) ->
  622:     [{SMID1, _} | _ ] = ?config(smid_test, Config),
  623:     mongoose_helper:wait_until(fun() ->
  624:                                        rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1])
  625:                                end,
  626:                                {error, smid_not_found},
  627:                                #{name => smid_garbage_collected}).
  628: 
  629: resume_session_state_send_message_without_ack(Config) ->
  630:     resume_session_state_send_message_generic(Config, no_ack).
  631: 
  632: resume_session_state_send_message_with_ack(Config) ->
  633:     resume_session_state_send_message_generic(Config, ack).
  634: 
  635: resume_session_state_send_message_generic(Config, AckInitialPresence) ->
  636:     %% connect bob and alice
  637:     Bob = connect_fresh(Config, bob, presence),
  638:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  639:     maybe_ack_initial_presence(Alice, AckInitialPresence),
  640:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  641:     %% kill alice connection
  642:     C2SPid = mongoose_helper:get_session_pid(Alice),
  643:     escalus_connection:kill(Alice),
  644:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  645:     sm_helper:assert_alive_resources(Alice, 1),
  646: 
  647:     %% send some messages and check if c2s can handle it
  648:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)),
  649:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)),
  650:     %% suspend the process to ensure that Alice has enough time to reconnect,
  651:     %% before resumption timeout occurs.
  652:     ok = rpc(mim(), sys, suspend, [C2SPid]),
  653: 
  654:     %% alice comes back and receives unacked message
  655:     NewAlice = connect_same(Alice, presence),
  656:     %% now we can resume c2s process of the old connection
  657:     %% and let it process session resumption timeout
  658:     ok = rpc(mim(), sys, resume, [C2SPid]),
  659:     Stanzas = escalus:wait_for_stanzas(NewAlice, 3),
  660: 
  661:     % what about order ?
  662:     % alice receive presence from herself and 3 unacked messages from bob
  663:     escalus_new_assert:mix_match([is_chat(<<"msg-1">>),
  664:                                   is_chat(<<"msg-2">>),
  665:                                   is_chat(<<"msg-3">>)],
  666:                                  Stanzas),
  667:     escalus_connection:stop(Bob),
  668:     escalus_connection:stop(NewAlice).
  669: 
  670: %%for instance it can be done by mod ping
  671: resume_session_state_stop_c2s(Config) ->
  672:     Bob = connect_fresh(Config, bob, presence),
  673:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  674: 
  675:     get_ack(Alice),
  676:     ack_initial_presence(Alice),
  677: 
  678:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  679:     escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)),
  680: 
  681:     %% get pid of c2s
  682:     C2SPid = mongoose_helper:get_session_pid(Alice),
  683:     %% Wait c2s process to process our presence ack.
  684:     %% Otherwise, we can receive two initial presences sometimes.
  685:     sm_helper:wait_for_c2s_unacked_count(C2SPid, 1),
  686: 
  687:     % kill alice connection
  688:     escalus_connection:kill(Alice),
  689:     % session should be alive
  690:     sm_helper:assert_alive_resources(Alice, 1),
  691:     rpc(mim(), ejabberd_c2s, stop, [C2SPid]),
  692:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  693:     %% suspend the process to ensure that Alice has enough time to reconnect,
  694:     %% before resumption timeout occurs.
  695:     ok = rpc(mim(), sys, suspend, [C2SPid]),
  696: 
  697:     %% alice comes back and receives unacked message
  698:     NewAlice = connect_same(Alice, presence, manual),
  699:     %% now we can resume c2s process of the old connection
  700:     %% and let it process session resumption timeout
  701:     ok = rpc(mim(), sys, resume, [C2SPid]),
  702: 
  703:     escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(NewAlice, msg)),
  704:     escalus_connection:stop(Bob),
  705:     escalus_connection:stop(NewAlice).
  706: 
  707: %% This test only verifies the validity of helpers (get_session_pid,
  708: %% get_c2s_state_name) written for wait_for_resumption
  709: %% testcase.
  710: session_established(Config) ->
  711:     Alice = connect_fresh(Config, alice, presence),
  712:     C2SPid = mongoose_helper:get_session_pid(Alice),
  713:     session_established = mongoose_helper:get_c2s_state_name(C2SPid),
  714:     escalus_connection:stop(Alice).
  715: 
  716: %% Ensure that after a violent disconnection,
  717: %% the c2s waits for resumption (but don't resume yet).
  718: wait_for_resumption(Config) ->
  719:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  720:     Bob = connect_fresh(Config, bob, session),
  721:     Texts = three_texts(),
  722:     {C2SPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
  723:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session).
  724: 
  725: unacknowledged_message_hook_filter(Config) ->
  726:     FilterText = <<"filter">>,
  727:     Bob = connect_fresh(Config, bob, presence),
  728:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  729:     Resource = proplists:get_value(username, AliceSpec),
  730:     HookHandlerExtra = start_filter_hook_listener(FilterText, Resource),
  731:     Alice = connect_spec([{resource, Resource} | AliceSpec], sr_presence, manual),
  732:     %% Ack the presence stanza
  733:     get_ack(Alice),
  734:     ack_initial_presence(Alice),
  735:     Messages = [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>, <<"msg-4">>],
  736:     All = [<<"msg-1">>, FilterText, <<"msg-2">>, FilterText, <<"msg-3">>, <<"msg-4">>],
  737:     [ escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, Body)) || Body <- All ],
  738:     %% kill alice connection
  739:     C2SPid = mongoose_helper:get_session_pid(Alice),
  740:     escalus_connection:kill(Alice),
  741:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  742:     sm_helper:assert_alive_resources(Alice, 1),
  743:     %% ensure second C2S is registered so all the messages are bounced properly
  744:     NewAlice = connect_spec([{resource, <<"2">>}| AliceSpec], sr_presence, manual),
  745:     send_initial_presence(NewAlice),
  746:     sm_helper:wait_for_resource_count(NewAlice, 2),
  747:     ok = rpc(mim(), sys, terminate, [C2SPid, normal]),
  748:     %% verify that the filtered message is never received
  749:     verify_no_receive_filtertext(NewAlice, FilterText, Messages),
  750:     stop_hook_listener(HookHandlerExtra),
  751:     escalus_connection:stop(Bob).
  752: 
  753: verify_no_receive_filtertext(_, _, []) ->
  754:     ok;
  755: verify_no_receive_filtertext(Client, FilterText, Messages) ->
  756:     case escalus:wait_for_stanzas(Client, 1, 500) of
  757:         [] -> ct:fail("Messages pending: ~p~n", [Messages]);
  758:         [St] ->
  759:             case exml_query:path(St, [{element, <<"body">>}, cdata]) of
  760:                 undefined ->
  761:                     verify_no_receive_filtertext(Client, FilterText, Messages);
  762:                 FilterText ->
  763:                     ct:fail("Composing forwarded from a different c2s process");
  764:                 Message ->
  765:                     Pred = fun(Msg) -> Msg =/= Message end,
  766:                     {Pending, _} = lists:partition(Pred, Messages),
  767:                     verify_no_receive_filtertext(Client, FilterText, Pending)
  768:             end
  769:     end.
  770: 
  771: unacknowledged_message_hook_resume(Config) ->
  772:     unacknowledged_message_hook_common(fun unacknowledged_message_hook_resume/4, Config).
  773: 
  774: unacknowledged_message_hook_resume(AliceSpec, Resource, SMID, _C2SPid) ->
  775:     NewAlice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
  776:     send_initial_presence(NewAlice),
  777:     {Resource, NewAlice}.
  778: 
  779: unacknowledged_message_hook_bounce(Config) ->
  780:     unacknowledged_message_hook_common(fun unacknowledged_message_hook_bounce/4, Config).
  781: 
  782: unacknowledged_message_hook_bounce(AliceSpec, Resource, _SMID, C2SPid) ->
  783:     NewResource = <<"new_", Resource/binary>>,
  784:     NewSpec = lists:keystore(resource, 1, AliceSpec, {resource, NewResource}),
  785:     NewAlice = connect_spec(NewSpec, sr_session, manual),
  786:     send_initial_presence(NewAlice),
  787:     %% ensure second C2S is registered so all the messages are bounced properly
  788:     sm_helper:wait_for_resource_count(NewAlice, 2),
  789:     ok = rpc(mim(), sys, terminate, [C2SPid, normal]),
  790:     {NewResource, NewAlice}.
  791: 
  792: unacknowledged_message_hook_offline(Config) ->
  793:     unacknowledged_message_hook_common(fun unacknowledged_message_hook_offline/4, Config).
  794: 
  795: unacknowledged_message_hook_offline(AliceSpec, Resource, _SMID, C2SPid) ->
  796:     C2SRef = erlang:monitor(process, C2SPid),
  797:     %%reset the session, so old C2S process is stopped
  798:     NewAlice = connect_spec(AliceSpec, sr_session, manual),
  799:     %% wait for old C2S termination before send presence. other way
  800:     %% some of the latest unacknowledged messages can be bounced to
  801:     %% the new C2S process instead of going to the mod_offline storage.
  802:     %% looks like all the unacknowledged messages arrive to the new
  803:     %% C2S, but the message sequence is broken (the bounced messages
  804:     %% delivered before the messages from the mod_offline storage)
  805:     sm_helper:wait_for_process_termination(C2SRef),
  806:     send_initial_presence(NewAlice),
  807:     {Resource, NewAlice}.
  808: 
  809: unacknowledged_message_hook_common(RestartConnectionFN, Config) ->
  810:     %% connect bob and alice
  811:     Bob = connect_fresh(Config, bob, presence),
  812: 
  813:     AliceSpec0 = escalus_fresh:create_fresh_user(Config, alice),
  814:     Resource = proplists:get_value(username, AliceSpec0),
  815:     AliceSpec = [{resource, Resource} | AliceSpec0],
  816:     HookHandlerExtra = start_hook_listener(Resource),
  817:     Alice = connect_spec(AliceSpec, sr_presence, manual),
  818:     %% Ack the presence stanza
  819:     get_ack(Alice),
  820:     ack_initial_presence(Alice),
  821: 
  822:     SMID = sm_helper:client_to_smid(Alice),
  823: 
  824:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  825:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)),
  826:     %% kill alice connection
  827:     C2SPid = mongoose_helper:get_session_pid(Alice),
  828:     escalus_connection:kill(Alice),
  829:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  830:     sm_helper:assert_alive_resources(Alice, 1),
  831: 
  832:     escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(0, Resource, 100)),
  833:     escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(0, Resource, 100)),
  834:     ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),
  835: 
  836:     %% send some messages and check if c2s can handle it
  837:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)),
  838:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-4">>)),
  839:     escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(0, Resource, 100)),
  840:     escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(0, Resource, 100)),
  841:     ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),
  842: 
  843:     %% alice comes back and receives unacked message
  844:     {NewResource, NewAlice} = RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid),
  845: 
  846:     mongoose_helper:wait_until(
  847:         fun() ->
  848:             Stanza = escalus_connection:get_stanza(NewAlice, msg),
  849:             escalus:assert(is_chat_message, [<<"msg-4">>], Stanza),
  850:             ok
  851:         end, ok),
  852: 
  853:     NewC2SPid = mongoose_helper:get_session_pid(NewAlice),
  854:     escalus_connection:kill(NewAlice),
  855:     mongoose_helper:wait_for_c2s_state_name(NewC2SPid, resume_session),
  856: 
  857:     escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
  858:     escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
  859:     escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
  860:     escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
  861:     ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),
  862:     stop_hook_listener(HookHandlerExtra),
  863:     escalus_connection:stop(Bob).
  864: 
  865: resume_session(Config) ->
  866:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  867:     Texts = three_texts(),
  868:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
  869:         {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
  870:         %% Resume the session.
  871:         Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
  872:         %% Alice receives the unacked messages from the previous
  873:         %% interrupted session.
  874:         sm_helper:wait_for_messages(Alice, Texts),
  875:         %% Alice acks the received messages.
  876:         escalus_connection:send(Alice, escalus_stanza:sm_ack(5)),
  877:         escalus_connection:stop(Alice)
  878:     end).
  879: 
  880: resume_session_with_wrong_h_does_not_leak_sessions(Config) ->
  881:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  882:     Messages = three_texts(),
  883:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
  884:         {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages),
  885:         %% Resume the session.
  886:         Alice = connect_spec(AliceSpec, auth, manual),
  887:         Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 30),
  888:         escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed),
  889: 
  890:         [] = sm_helper:get_user_present_resources(Alice),
  891:         HostType = host_type(),
  892:         {error, smid_not_found} = sm_helper:get_sid_by_stream_id(HostType, SMID),
  893:         escalus_connection:wait_for_close(Alice, timer:seconds(5))
  894:     end).
  895: 
  896: resume_session_with_wrong_sid_returns_item_not_found(Config) ->
  897:     session_resumption_expects_item_not_found(Config, <<"wrong-sid">>).
  898: 
  899: resume_session_with_wrong_namespace_is_a_noop(Config) ->
  900:     Alice = connect_fresh(Config, alice, auth),
  901:     #xmlel{attrs = Attrs} = Resume = escalus_stanza:resume(<<"doesnt_matter">>, 4),
  902:     Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, <<"not-stream-mgnt">>}),
  903:     escalus_connection:send(Alice, Resume#xmlel{attrs = Attrs2}),
  904:     escalus_assert:has_no_stanzas(Alice),
  905:     [] = sm_helper:get_user_present_resources(Alice),
  906:     true = escalus_connection:is_connected(Alice),
  907:     escalus_connection:stop(Alice).
  908: 
  909: resume_dead_session_results_in_item_not_found(Config) ->
  910:     SMID = base64:encode(crypto:strong_rand_bytes(21)),
  911:     SID = {os:timestamp(), undefined},
  912:     HostType = host_type(),
  913:     rpc(mim(), ?MOD_SM, register_smid, [HostType, SMID, SID]),
  914:     session_resumption_expects_item_not_found(Config, SMID).
  915: 
  916: session_resumption_expects_item_not_found(Config, SMID) ->
  917:     Alice = connect_fresh(Config, alice, auth),
  918:     Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 2),
  919:     escalus:assert(is_sm_failed, [<<"item-not-found">>], Resumed),
  920:     [] = sm_helper:get_user_present_resources(Alice),
  921:     true = escalus_connection:is_connected(Alice),
  922:     escalus_connection:stop(Alice).
  923: 
  924: resume_session_kills_old_C2S_gracefully(Config) ->
  925:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  926:     C2SPid = mongoose_helper:get_session_pid(Alice),
  927: 
  928:     %% Monitor the C2S process and disconnect Alice.
  929:     MonitorRef = sm_helper:monitor_session(Alice),
  930:     escalus_client:kill_connection(Config, Alice),
  931: 
  932:     %% Ensure the c2s process is waiting for resumption.
  933:     mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
  934: 
  935:     %% Resume the session.
  936:     NewAlice = connect_resume(Alice, 1),
  937: 
  938:     %% C2S process should die gracefully with Reason=normal.
  939:     sm_helper:wait_for_process_termination(MonitorRef),
  940:     escalus_connection:stop(NewAlice).
  941: 
  942: buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) ->
  943:     Alice = connect_spec(AliceSpec, sr_presence, manual),
  944:     %% Bobs sends some messages to Alice.
  945:     sm_helper:send_messages(Bob, Alice, Texts),
  946:     %% Alice receives them, but doesn't ack.
  947:     sm_helper:wait_for_messages(Alice, Texts),
  948:     %% Alice's connection is violently terminated.
  949:     escalus_client:kill_connection(Config, Alice),
  950:     C2SPid = mongoose_helper:get_session_pid(Alice),
  951:     SMID = sm_helper:client_to_smid(Alice),
  952:     {C2SPid, SMID}.
  953: 
  954: aggressively_pipelined_resume(Config) ->
  955:     AliceSpec = [{manual_ack, true}, {parser_opts, [{start_tag, <<"stream:stream">>}]}
  956:                  | escalus_fresh:create_fresh_user(Config, alice)],
  957:     UnackedMessages = three_texts(),
  958:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
  959:         {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, UnackedMessages),
  960:         %% Resume the session.
  961:         Alice = escalus_connection:connect(AliceSpec),
  962: 
  963:         Username = proplists:get_value(username, AliceSpec),
  964:         Password = proplists:get_value(password, AliceSpec),
  965:         Payload = <<0:8,Username/binary,0:8,Password/binary>>,
  966:         Server = proplists:get_value(server, AliceSpec),
  967: 
  968:         Stream = escalus_stanza:stream_start(Server, <<"jabber:client">>),
  969:         Auth = escalus_stanza:auth(<<"PLAIN">>, [#xmlcdata{content = base64:encode(Payload)}]),
  970:         AuthStream = escalus_stanza:stream_start(Server, <<"jabber:client">>),
  971:         Resume = escalus_stanza:resume(SMID, 2),
  972: 
  973:         escalus_client:send(Alice, [Stream, Auth, AuthStream, Resume]),
  974:         Messages = [escalus_connection:get_stanza(Alice, {get_resumed, I}) || I <- lists:seq(1, 6)],
  975:         escalus:assert(is_sm_resumed, [SMID], lists:last(Messages)),
  976: 
  977:         escalus_connection:stop(Alice)
  978:     end).
  979: 
  980: %% This is a regression test for a case when a session processes a request, which will
  981: %% receive a response from the server, i.e. will have the same origin SID in mongoose_acc.
  982: %% Without proper handling, the reply would be rejected because the resumed session
  983: %% has new SID.
  984: replies_are_processed_by_resumed_session(Config) ->
  985:     %% GIVEN a session and registered special IQ handler (added in init_per_testcase),
  986:     %% that waits for old session process to terminate (at this point new process
  987:     %% has fully taken over) and then actually sends the reply.
  988:     Alice = connect_fresh(Config, alice, sr_presence),
  989: 
  990:     %% WHEN a client sends IQ request to the special handler...
  991:     IQReq = escalus_stanza:iq_get(regression_ns(), []),
  992:     escalus:send(Alice, IQReq),
  993: 
  994:     %% ... goes down and session is resumed.
  995:     Alice2 = sm_helper:kill_and_connect_resume(Alice),
  996: 
  997:     %% THEN the client receives the reply properly.
  998:     IQReply = escalus:wait_for_stanza(Alice2),
  999:     escalus:assert(is_iq_result, [IQReq], IQReply),
 1000:     escalus_connection:stop(Alice2).
 1001: 
 1002: %% This is a regression test for a bug, which manifested in following scenario
 1003: %% (due to improper presence sub requests buffering):
 1004: %% 1. A is online, B is offline
 1005: %% 2. A subscribes to B's presence;
 1006: %% 3. B becomes online
 1007: %% 4. A sends a message to B
 1008: %% 5. B doesn't SM-ack the request or message, terminates the connection
 1009: %% 6. B reconnects but with session *replace*, not resume
 1010: %% 7. Packet rerouting crashes on the buffered sub request, preventing resending whole buffer
 1011: %% 8. B doesn't receive the buffered message
 1012: subscription_requests_are_buffered_properly(Config) ->
 1013:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
 1014:     MsgBody = <<"buffered">>,
 1015:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
 1016:         % GIVEN Bob's pending subscription to Alice's presence
 1017:         AliceJid = common_helper:get_bjid(AliceSpec),
 1018:         escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)),
 1019:         _RosterPushReq = escalus:wait_for_stanza(Bob),
 1020: 
 1021:         % WHEN Alice becomes online...
 1022:         Alice = connect_spec(AliceSpec, sr_session, manual),
 1023:         send_initial_presence(Alice),
 1024:         %% subscribe could come before the initial presence
 1025:         escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)],
 1026:                             escalus:wait_for_stanzas(Alice, 2)),
 1027: 
 1028:         % ...and Bob sends a message to Alice...
 1029:         escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),
 1030:         MsgStanza = escalus:wait_for_stanza(Alice),
 1031:         escalus:assert(is_chat_message, [MsgBody], MsgStanza),
 1032: 
 1033:         % ...and Alice terminates connection without acking anything...
 1034:         escalus_client:kill_connection(Config, Alice),
 1035: 
 1036:         % ...and reconnects with session replacement.
 1037:         Alice2 = connect_spec(AliceSpec, session, manual),
 1038: 
 1039:         % THEN Alice receives (without sending initial presence):
 1040:         % * buffered Bob's message (like above)
 1041:         % Alice DOESN'T receive:
 1042:         % * buffered subscription request because it is dropped by ejabberd_sm
 1043:         %   because it's treated like repeated sub request to bare JID, so it's not
 1044:         %   processed by any sub req handler (like mod_roster)
 1045:         % * buffered available presence from Alice - because it is addressed to another SID
 1046:         %   and Alice2 is a brand new session
 1047:         escalus:assert(is_chat_message, [MsgBody], escalus:wait_for_stanza(Alice2)),
 1048:         sm_helper:send_and_receive(Bob, Alice2, <<"flush1">>),
 1049:         escalus_assert:has_no_stanzas(Alice2),
 1050: 
 1051:         %% Only once an initial presence is sent, a subscription request is sent
 1052:         send_initial_presence(Alice2),
 1053:         escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)],
 1054:                             escalus:wait_for_stanzas(Alice2, 2)),
 1055: 
 1056:         sm_helper:send_and_receive(Bob, Alice2, <<"flush2">>),
 1057:         escalus_assert:has_no_stanzas(Alice2),
 1058: 
 1059:         escalus_connection:stop(Alice2)
 1060:     end).
 1061: 
%% This is a regression test for a bug, due to which messages sent to the old session
%% in the middle of state handover were not appended properly to the SM buffer.
%% Scenario to reproduce:
%% 1. Online Bob and Alice
%% 2. Alice kills the connection
%% 3. Alice's session is suspended
%% 4. Alice resumes session with new connection. At this moment new session is still not
%%    present in session table. `resume` request is stuck in old proc mailbox.
%% 5. Bob sends a message to Alice. Only old proc is present in session table so now
%%    old session has two messages in mailbox: `resume` and XML from Bob
%% 6. We resume old process and it begins session handover
%% 7. Bob's message is appended to SM buffer in "flush" step
%% 8. With bug fixed, the message is retransmitted properly
messages_are_properly_flushed_during_resumption(Config) ->
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        % GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended
        Alice = connect_fresh(Config, alice, sr_presence),
        % remember the client's current SM `h' so it can be passed to connect_resume/2 below
        SMH = escalus_connection:get_sm_h(Alice),
        escalus_client:kill_connection(Config, Alice),
        %% The receiver process would stop now
        C2SPid = mongoose_helper:get_session_pid(Alice),
        mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),

        % start from an empty mailbox, so that the queue lengths awaited below
        % count only the resume request and Bob's message
        sm_helper:wait_for_queue_length(C2SPid, 0),
        ok = rpc(mim(), sys, suspend, [C2SPid]),

        % WHEN new session requests resumption
        % we wait until that old session has resumption request enqueued;
        % we need it to ensure the order of messages: resume first, Bob's chat second.
        % Actual wait and message sent by Bob is done in separate process
        % because new client start will block until old process is resumed

        MsgBody = <<"flush-regression">>,
        spawn_link(fun() ->
                      % queue length 1 = the resume request has been enqueued
                      sm_helper:wait_for_queue_length(C2SPid, 1),

                      % Bob sends a message...
                      escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),

                      % ...we ensure that a message is enqueued in Alice's session...
                      % (2 messages = resume request + Bob's message)
                      sm_helper:wait_for_queue_length(C2SPid, 2),

                      % ...and old process is resumed.
                      ok = rpc(mim(), sys, resume, [C2SPid])
              end),
        % blocks until the spawned process above resumes the old c2s
        Alice2 = connect_resume(Alice, SMH),
        % THEN Alice's new session receives Bob's message
        RecvMsg = escalus:wait_for_stanza(Alice2),
        escalus:assert(is_chat_message, [MsgBody], RecvMsg)
      end).
 1113: 
%% Variant of messages_are_properly_flushed_during_resumption: an extra
%% all-state event is sent to the suspended c2s before the resume request,
%% so the awaited mailbox lengths are one higher (2 and 3).
messages_are_properly_flushed_during_resumption_p1_fsm_old(Config) ->
    %% the same as messages_are_properly_flushed_during_resumption,
    %% but tests that messages buffered by p1_fsm_old are delivered
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        Alice = connect_fresh(Config, alice, sr_presence),
        % remember the client's current SM `h' so it can be passed to connect_resume/2 below
        SMH = escalus_connection:get_sm_h(Alice),
        escalus_client:kill_connection(Config, Alice),
        C2SPid = mongoose_helper:get_session_pid(Alice),
        mongoose_helper:wait_for_c2s_state_name(C2SPid, resume_session),
        ok = rpc(mim(), sys, suspend, [C2SPid]),

        %% send some dummy event. ignored by c2s but ensures that
        %% p1_fsm_old buffers the messages sent after this one
        rpc(mim(), p1_fsm_old, send_all_state_event, [C2SPid, dummy_event]),

        MsgBody = <<"flush-regression">>,
        spawn_link(fun() ->
                    % queue length 2 = presumably the dummy event + the resume request
                    sm_helper:wait_for_queue_length(C2SPid, 2),

                    % Bob sends a message...
                    escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),

                    % ...we ensure that a message is enqueued in Alice's session...
                    % (3 messages = presumably dummy event + resume request + Bob's message)
                    sm_helper:wait_for_queue_length(C2SPid, 3),

                    % ...and old process is resumed.
                    ok = rpc(mim(), sys, resume, [C2SPid])
              end),
        % the spawned process above is the one that resumes the old c2s
        Alice2 = connect_resume(Alice, SMH),
        % THEN Alice's new session receives Bob's message
        RecvMsg = escalus:wait_for_stanza(Alice2),
        escalus:assert(is_chat_message, [MsgBody], RecvMsg)
      end).
 1148: 
 1149: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt(Config) ->
 1150:     Alice = connect_fresh(Config, alice, session, manual),
 1151:     %% Should not crash anything!
 1152:     escalus_connection:send(Alice, escalus_stanza:enable_sm()),
 1153:     Response = escalus_connection:get_stanza(Alice, service_unavailable),
 1154:     escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response),
 1155:     escalus_connection:stop(Alice).
 1156: 
 1157: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption(Config) ->
 1158:     Alice = connect_fresh(Config, alice, session, manual),
 1159:     %% Should not crash anything!
 1160:     escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])),
 1161:     Response = escalus_connection:get_stanza(Alice, service_unavailable),
 1162:     escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response),
 1163:     escalus_connection:stop(Alice).
 1164: 
 1165: %%--------------------------------------------------------------------
 1166: %% Helpers
 1167: %%--------------------------------------------------------------------
 1168: start_hook_listener(Resource) ->
 1169:     TestCasePid = self(),
 1170:     rpc(mim(), ?MODULE, rpc_start_hook_handler, [TestCasePid, Resource, host_type()]).
 1171: 
 1172: stop_hook_listener(HookExtra) ->
 1173:     rpc(mim(), ?MODULE, rpc_stop_hook_handler, [HookExtra, host_type()]).
 1174: 
 1175: rpc_start_hook_handler(TestCasePid, User, HostType) ->
 1176:     LUser = jid:nodeprep(User),
 1177:     Extra = #{luser => LUser, pid => TestCasePid},
 1178:     gen_hook:add_handler(unacknowledged_message, HostType,
 1179:                          fun ?MODULE:hook_handler_fn/3,
 1180:                          Extra, 50),
 1181:     Extra.
 1182: 
 1183: rpc_stop_hook_handler(HookExtra, HostType) ->
 1184:     gen_hook:delete_handler(unacknowledged_message, HostType,
 1185:                             fun ?MODULE:hook_handler_fn/3,
 1186:                             HookExtra, 50).
 1187: 
 1188: hook_handler_fn(Acc,
 1189:                 #{args := [Jid]} = _Params,
 1190:                 #{luser := LUser, pid := TestCasePid} = _Extra) ->
 1191:     {U, _S, R} = jid:to_lower(Jid),
 1192:     case U of
 1193:         LUser ->
 1194:             Counter = mongoose_acc:get(sm_test, counter, 0, Acc),
 1195:             El = mongoose_acc:element(Acc),
 1196:             TestCasePid ! {sm_test, Counter, R, El},
 1197:             {ok, mongoose_acc:set_permanent(sm_test, counter, Counter + 1, Acc)};
 1198:         _ -> {ok, Acc}
 1199:     end.
 1200: 
 1201: wait_for_unacked_msg_hook(Counter, Res, Timeout) ->
 1202:     receive
 1203:         {sm_test, AccCounter, Resource, Stanza} = Msg ->
 1204:             ?assertEqual(Counter, AccCounter, Msg),
 1205:             ?assertEqual(Res, Resource, Msg),
 1206:             Stanza
 1207:     after Timeout ->
 1208:         timeout
 1209:     end.
 1210: 
 1211: start_filter_hook_listener(FilterText, Resource) ->
 1212:     rpc(mim(), ?MODULE, rpc_start_filter_hook_handler, [FilterText, Resource, host_type()]).
 1213: 
 1214: stop_filter_hook_listener(HookExtra) ->
 1215:     rpc(mim(), ?MODULE, rpc_stop_filter_hook_handler, [HookExtra, host_type()]).
 1216: 
 1217: rpc_start_filter_hook_handler(FilterText, User, HostType) ->
 1218:     LUser = jid:nodeprep(User),
 1219:     Extra = #{luser => LUser, filter_text => FilterText},
 1220:     gen_hook:add_handler(filter_unacknowledged_messages, HostType,
 1221:                          fun ?MODULE:filter_hook_handler_fn/3,
 1222:                          Extra, 50),
 1223:     Extra.
 1224: 
 1225: rpc_stop_filter_hook_handler(HookExtra, HostType) ->
 1226:     gen_hook:delete_handler(filter_unacknowledged_messages, HostType,
 1227:                             fun ?MODULE:filter_hook_handler_fn/3,
 1228:                             HookExtra, 50).
 1229: 
 1230: filter_hook_handler_fn(Buffer,
 1231:                        #{jid := Jid} = _Params,
 1232:                        #{luser := LUser, filter_text := FilterText} = _Extra) ->
 1233:     {U, _} = jid:to_lus(Jid),
 1234:     case U of
 1235:         LUser ->
 1236:             F = fun(Acc) -> filter_text(Acc, FilterText) end,
 1237:             NewBuffer = lists:filter(F, Buffer),
 1238:             {ok, NewBuffer};
 1239:         _ -> {ok, Buffer}
 1240:     end.
 1241: 
 1242: filter_text(Acc, FilterText) ->
 1243:     case mongoose_acc:stanza_name(Acc) of
 1244:         <<"message">> ->
 1245:             El = mongoose_acc:element(Acc),
 1246:             FilterText =/= exml_query:path(El, [{element, <<"body">>}, cdata]);
 1247:         _ ->
 1248:             true
 1249:     end.
 1250: 
 1251: is_chat(Content) ->
 1252:     fun(Stanza) -> escalus_pred:is_chat_message(Content, Stanza) end.
 1253: 
 1254: is_presence(Type) ->
 1255:     fun(Stanza) -> escalus_pred:is_presence_with_type(Type, Stanza) end.
 1256: 
 1257: three_texts() ->
 1258:     [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>].
 1259: 
 1260: %%--------------------------------------------------------------------
 1261: %% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session"
 1262: %%--------------------------------------------------------------------
 1263: 
 1264: regression_ns() ->
 1265:     <<"regression">>.
 1266: 
 1267: register_handler() ->
 1268:     HostType = host_type(),
 1269:     rpc(mim(), gen_iq_handler, add_iq_handler_for_domain,
 1270:         [HostType, regression_ns(), ejabberd_sm,
 1271:          fun ?MODULE:regression_handler/5, #{}, one_queue]).
 1272: 
 1273: unregister_handler() ->
 1274:     HostType = host_type(),
 1275:     rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain,
 1276:         [HostType, regression_ns(), ejabberd_sm]).
 1277: 
 1278: regression_handler(Acc, _From, _To, IQ, _Extra) ->
 1279:     %% A bit of a hack - will no longer work when the SID format changes
 1280:     {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
 1281:     erlang:monitor(process, Pid),
 1282:     receive
 1283:         {'DOWN', _, _, _, _} ->
 1284:             ok
 1285:     after
 1286:         10000 ->
 1287:             error({c2s_not_stopped_after_timeout, Pid})
 1288:     end,
 1289:     %% We avoid another race condition - there is a short window where user session
 1290:     %% is not registered in ejabberd_sm: between old process termination and the moment
 1291:     %% when the new process stores new session in memory. It should be fixed separately.
 1292:     wait_for_session(mongoose_acc:get(c2s, origin_jid, undefined, Acc), 50, 100),
 1293:     {Acc, jlib:make_result_iq_reply(IQ)}.
 1294: 
 1295: wait_for_session(JID, Retries, SleepTime) ->
 1296:     case ejabberd_sm:get_session(JID) of
 1297:         offline ->
 1298:             timer:sleep(SleepTime),
 1299:             wait_for_session(JID, Retries - 1, SleepTime);
 1300:         _ ->
 1301:             ok
 1302:     end.
 1303: 
 1304: maybe_ack_initial_presence(Alice, ack) ->
 1305:     ack_initial_presence(Alice);
 1306: maybe_ack_initial_presence(_Alice, no_ack) ->
 1307:     ok.