1: %% Stream Management tests
    2: -module(sm_SUITE).
    3: 
    4: -compile([export_all, nowarn_export_all]).
    5: -include_lib("exml/include/exml.hrl").
    6: -include_lib("eunit/include/eunit.hrl").
    7: -include_lib("escalus/include/escalus.hrl").
    8: -include_lib("common_test/include/ct.hrl").
    9: 
   10: %% Injected code callbacks
   11: -export([rpc_start_hook_handler/3,
   12:          rpc_stop_hook_handler/2,
   13:          hook_handler_fn/3,
   14:          regression_handler/5]).
   15: 
   16: -export([rpc_start_filter_hook_handler/3,
   17:          rpc_stop_filter_hook_handler/2,
   18:          filter_hook_handler_fn/3]).
   19: 
   20: -import(distributed_helper, [mim/0,
   21:                              require_rpc_nodes/1,
   22:                              rpc/4]).
   23: 
   24: -import(domain_helper, [host_type/0]).
   25: 
   26: -import(sm_helper, [connect_fresh/3,
   27:                     connect_fresh/4,
   28:                     connect_spec/2,
   29:                     connect_spec/3,
   30:                     connect_same/2,
   31:                     connect_same/3,
   32:                     connect_resume/2,
   33:                     process_initial_stanza/1,
   34:                     send_initial_presence/1,
   35:                     get_ack/1,
   36:                     ack_initial_presence/1]).
   37: 
   38: -define(MOD_SM, mod_stream_management).
   39: -define(CONSTRAINT_CHECK_TIMEOUT, 5000).
   40: -define(LONG_TIMEOUT, 3600).
   41: -define(SHORT_TIMEOUT, 1).
   42: -define(SMALL_SM_BUFFER, 3).
   43: -define(PING_REQUEST_TIMEOUT, 2).
   44: -define(PING_INTERVAL, 5).
   45: 
   46: %%--------------------------------------------------------------------
   47: %% Suite configuration
   48: %%--------------------------------------------------------------------
   49: 
   50: suite() ->
   51:     require_rpc_nodes([mim]) ++ escalus:suite().
   52: 
   53: all() ->
   54:     ct_helper:groups_to_all(groups()) ++ [ping_timeout].
   55: 
   56: groups() ->
   57:     [
   58:      {parallel, [parallel], parallel_cases()},
   59:      {parallel_manual_ack_freq_1, [parallel], parallel_manual_ack_freq_1_cases()},
   60:      {manual_ack_freq_2, [], manual_ack_freq_2_cases()},
   61:      {stale_h, [], stale_h_cases()},
   62:      {parallel_unacknowledged_message_hook, [parallel], parallel_unacknowledged_message_hook_cases()}
   63:     ].
   64: 
   65: parallel_cases() ->
   66:     [server_announces_sm,
   67:      server_enables_sm_before_session,
   68:      server_enables_sm_after_session,
   69:      server_returns_failed_after_start,
   70:      server_returns_failed_after_auth,
   71:      server_enables_resumption,
   72:      client_enables_sm_twice_fails_with_correct_error_stanza,
   73:      session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza,
   74:      session_resumed_and_old_session_dead_doesnt_route_error_to_new_session,
   75:      basic_ack,
   76:      h_ok_before_session,
   77:      h_ok_after_session_enabled_before_session,
   78:      h_ok_after_session_enabled_after_session,
   79:      h_ok_after_a_chat,
   80:      h_non_given_closes_stream_gracefully,
   81:      resend_unacked_on_reconnection,
   82:      session_established,
   83:      wait_for_resumption,
   84:      resume_session,
   85:      resume_session_with_wrong_h_does_not_leak_sessions,
   86:      resume_session_with_wrong_sid_returns_item_not_found,
   87:      resume_session_with_wrong_namespace_is_a_noop,
   88:      resume_dead_session_results_in_item_not_found,
   89:      resume_session_kills_old_C2S_gracefully,
   90:      carboncopy_works,
   91:      carboncopy_works_after_resume,
   92:      aggressively_pipelined_resume,
   93:      replies_are_processed_by_resumed_session,
   94:      subscription_requests_are_buffered_properly,
   95:      messages_are_properly_flushed_during_resumption].
   96: 
   97: parallel_manual_ack_freq_1_cases() ->
   98:     [client_acks_more_than_sent,
   99:      too_many_unacked_stanzas,
  100:      resend_unacked_after_resume_timeout,
  101:      resume_session_state_send_message_with_ack,
  102:      resume_session_state_send_message_without_ack,
  103:      resume_session_state_stop_c2s,
  104:      server_requests_ack_after_session,
  105:      resend_more_offline_messages_than_buffer_size,
  106:      server_requests_ack].
  107: 
  108: manual_ack_freq_2_cases() ->
  109:     [server_requests_ack_freq_2].
  110: 
  111: stale_h_cases() ->
  112:     [resume_expired_session_returns_correct_h,
  113:      gc_repeat_after_never_means_no_cleaning,
  114:      gc_repeat_after_timeout_does_clean].
  115: 
  116: stream_mgmt_disabled_cases() ->
  117:     [no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt,
  118:      no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption].
  119: 
  120: manual_ack_freq_long_session_timeout_cases() ->
  121:     [preserve_order].
  122: 
  123: parallel_unacknowledged_message_hook_cases() ->
  124:     [unacknowledged_message_hook_bounce,
  125:      unacknowledged_message_hook_offline,
  126:      unacknowledged_message_hook_resume,
  127:      unacknowledged_message_hook_filter].
  128: 
  129: %%--------------------------------------------------------------------
  130: %% Init & teardown
  131: %%--------------------------------------------------------------------
  132: 
  133: init_per_suite(Config) ->
  134:     NewConfig = dynamic_modules:save_modules(host_type(), Config),
  135:     NewConfigWithSM = escalus_users:update_userspec(NewConfig, alice, stream_management, true),
  136:     mongoose_helper:inject_module(?MODULE),
  137:     escalus:init_per_suite(NewConfigWithSM).
  138: 
  139: end_per_suite(Config) ->
  140:     escalus_fresh:clean(),
  141:     dynamic_modules:restore_modules(Config),
  142:     escalus:end_per_suite(Config).
  143: 
  144: init_per_group(Group, Config) when Group =:= parallel_unacknowledged_message_hook;
  145:                                    Group =:= manual_ack_freq_long_session_timeout;
  146:                                    Group =:= parallel_manual_ack_freq_1;
  147:                                    Group =:= manual_ack_freq_2 ->
  148:     dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
  149:     Config;
  150: init_per_group(stale_h, Config) ->
  151:     Config;
  152: init_per_group(stream_mgmt_disabled, Config) ->
  153:     dynamic_modules:stop(host_type(), ?MOD_SM),
  154:     rpc(mim(), mnesia, delete_table, [sm_session]),
  155:     Config;
  156: init_per_group(Group, Config) ->
  157:     dynamic_modules:ensure_modules(host_type(), required_modules(group, Group)),
  158:     Config.
  159: 
  160: end_per_group(_Group, _Config) ->
  161:     ok.
  162: 
  163: init_per_testcase(resume_expired_session_returns_correct_h = CN, Config) ->
  164:     dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
  165:     escalus:init_per_testcase(CN, Config);
  166: init_per_testcase(CN, Config) when CN =:= gc_repeat_after_never_means_no_cleaning;
  167:                                    CN =:= gc_repeat_after_timeout_does_clean ->
  168:     dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
  169:     Config2 = register_some_smid_h(Config),
  170:     escalus:init_per_testcase(CN, Config2);
  171: init_per_testcase(ping_timeout = CN, Config) ->
  172:     ok = rpc(mim(), meck, new, [mod_ping, [passthrough, no_link]]),
  173:     dynamic_modules:ensure_modules(host_type(), required_modules(testcase, CN)),
  174:     escalus:init_per_testcase(CN, Config);
  175: init_per_testcase(server_requests_ack_freq_2 = CN, Config) ->
  176:     escalus:init_per_testcase(CN, Config);
  177: init_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
  178:     register_handler(),
  179:     escalus:init_per_testcase(CN, Config);
  180: init_per_testcase(CaseName, Config) ->
  181:     escalus:init_per_testcase(CaseName, Config).
  182: 
  183: end_per_testcase(CN, Config) when CN =:= resume_expired_session_returns_correct_h;
  184:                                   CN =:= gc_repeat_after_never_means_no_cleaning;
  185:                                   CN =:= gc_repeat_after_timeout_does_clean ->
  186:     Name = rpc(mim(), gen_mod, get_module_proc, [host_type(), stream_management_stale_h]),
  187:     rpc(mim(), ejabberd_sup, stop_child, [Name]),
  188:     escalus:end_per_testcase(CN, Config);
  189: end_per_testcase(replies_are_processed_by_resumed_session = CN, Config) ->
  190:     unregister_handler(),
  191:     escalus:end_per_testcase(CN, Config);
  192: end_per_testcase(ping_timeout = CN, Config) ->
  193:     rpc(mim(), meck, unload, [mod_ping]),
  194:     escalus:end_per_testcase(CN, Config);
  195: end_per_testcase(CaseName, Config) ->
  196:     escalus:end_per_testcase(CaseName, Config).
  197: 
  198: %% Module configuration per group (in case of stale_h group it is per testcase)
  199: 
  200: required_modules(Scope, Name) ->
  201:     SMConfig = case required_sm_opts(Scope, Name) of
  202:                    stopped -> stopped;
  203:                    ExtraOpts -> maps:merge(common_sm_opts(), ExtraOpts)
  204:                end,
  205:     Backend = mongoose_helper:mnesia_or_rdbms_backend(),
  206:     BaseModules = [
  207:      {mod_stream_management, config_parser_helper:mod_config(mod_stream_management, SMConfig)},
  208:      {mod_offline, config_parser_helper:mod_config(mod_offline, #{backend => Backend})}
  209:      ],
  210:      case Name of
  211:         ping_timeout ->
  212:             BaseModules ++ [{mod_ping, config_parser_helper:mod_config(mod_ping, mod_ping_opts())}];
  213:         _ ->
  214:             BaseModules
  215:     end.
  216: 
  217: required_sm_opts(group, parallel) ->
  218:     #{ack_freq => never};
  219: required_sm_opts(group, parallel_manual_ack_freq_1) ->
  220:     #{ack_freq => 1,
  221:       resume_timeout => ?LONG_TIMEOUT};
  222: required_sm_opts(group, manual_ack_freq_2) ->
  223:     #{ack_freq => 2};
  224: required_sm_opts(group, stream_mgmt_disabled) ->
  225:     stopped;
  226: required_sm_opts(group, parallel_unacknowledged_message_hook) ->
  227:     #{ack_freq => 1};
  228: required_sm_opts(group, manual_ack_freq_long_session_timeout) ->
  229:     #{ack_freq => 1, buffer_max => 1000};
  230: required_sm_opts(testcase, resume_expired_session_returns_correct_h) ->
  231:     #{ack_freq => 1,
  232:       resume_timeout => ?SHORT_TIMEOUT,
  233:       stale_h => stale_h(?LONG_TIMEOUT, ?LONG_TIMEOUT)};
  234: required_sm_opts(testcase, gc_repeat_after_never_means_no_cleaning) ->
  235:     #{stale_h => stale_h(?LONG_TIMEOUT, ?SHORT_TIMEOUT)};
  236: required_sm_opts(testcase, gc_repeat_after_timeout_does_clean) ->
  237:     #{stale_h => stale_h(?SHORT_TIMEOUT, ?SHORT_TIMEOUT)};
  238: required_sm_opts(testcase, ping_timeout) ->
  239:     #{ack_freq => 1,
  240:       resume_timeout => ?SHORT_TIMEOUT}.
  241: 
  242: common_sm_opts() ->
  243:     Backend = ct_helper:get_internal_database(),
  244:     #{buffer_max => ?SMALL_SM_BUFFER, backend => Backend}.
  245: 
  246: stale_h(RepeatAfter, Geriatric) ->
  247:     #{enabled => true,
  248:       repeat_after => RepeatAfter,
  249:       geriatric => Geriatric}.
  250: 
  251: make_smid() ->
  252:     base64:encode(crypto:strong_rand_bytes(21)).
  253: 
  254: register_smid(IntSmidId) ->
  255:     S = {SMID = make_smid(), IntSmidId},
  256:     ok = rpc(mim(), ?MOD_SM, register_stale_smid_h, [host_type(), SMID, IntSmidId]),
  257:     S.
  258: 
  259: register_some_smid_h(Config) ->
  260:     TestSmids = lists:map(fun register_smid/1, lists:seq(1, 3)),
  261:     [{smid_test, TestSmids} | Config].
  262: 
  263: mod_ping_opts() ->
  264:     #{send_pings => true,
  265:       ping_interval => ?PING_INTERVAL,
  266:       ping_req_timeout => ?PING_REQUEST_TIMEOUT,
  267:       timeout_action => kill}.
  268: 
  269: %%--------------------------------------------------------------------
  270: %% Tests
  271: %%--------------------------------------------------------------------
  272: 
  273: server_announces_sm(Config) ->
  274:     AliceSpec = escalus_fresh:freshen_spec(Config, alice),
  275:     {ok, #client{props = Props}, Features} = escalus_connection:start(AliceSpec,
  276:                                                                       [start_stream]),
  277:     true = escalus_session:can_use_stream_management(Props, Features).
  278: 
  279: 
  280: server_enables_sm_before_session(Config) ->
  281:     connect_fresh(Config, alice, sm_after_bind).
  282: 
  283: server_enables_sm_after_session(Config) ->
  284:     connect_fresh(Config, alice, sm_after_session).
  285: 
  286: server_returns_failed_after_start(Config) ->
  287:     Alice = connect_fresh(Config, alice, before_auth),
  288:     server_returns_failed(Alice).
  289: 
  290: server_returns_failed_after_auth(Config) ->
  291:     Alice = connect_fresh(Config, alice, auth),
  292:     server_returns_failed(Alice).
  293: 
  294: server_enables_resumption(Config) ->
  295:     Alice = connect_fresh(Config, alice, sr_presence),
  296:     escalus_connection:stop(Alice).
  297: 
  298: server_returns_failed(Alice) ->
  299:     escalus_connection:send(Alice, escalus_stanza:enable_sm()),
  300:     escalus:assert(is_sm_failed, [<<"unexpected-request">>],
  301:                    escalus_connection:get_stanza(Alice, enable_sm_failed)).
  302: 
  303: client_enables_sm_twice_fails_with_correct_error_stanza(Config) ->
  304:     Alice = connect_fresh(Config, alice, sm_before_session),
  305:     escalus_connection:send(Alice, escalus_stanza:enable_sm()),
  306:     escalus:assert(is_sm_failed, [<<"unexpected-request">>],
  307:                    escalus_connection:get_stanza(Alice, enable_sm_failed)),
  308:     escalus:assert(is_stream_end,
  309:                    escalus_connection:get_stanza(Alice, enable_sm_failed)),
  310:     true = escalus_connection:wait_for_close(Alice, timer:seconds(5)).
  311: 
  312: session_resumed_then_old_session_is_closed_gracefully_with_correct_error_stanza(Config) ->
  313:     %% GIVEN USER WITH STREAM RESUMPTION ENABLED
  314:     Alice = connect_fresh(Config, alice, sr_presence),
  315:     SMH = escalus_connection:get_sm_h(Alice),
  316:     %% WHEN USER RESUMES SESSION FROM NEW CLIENT
  317:     Alice2 = connect_resume(Alice, SMH),
  318:     process_initial_stanza(Alice2),
  319:     %% THEN: Old session is gracefully closed with the correct error stanza
  320:     escalus:assert(is_stream_error, [<<"conflict">>, <<>>],
  321:                    escalus_connection:get_stanza(Alice, close_old_stream)),
  322:     escalus:assert(is_stream_end,
  323:                    escalus_connection:get_stanza(Alice, close_old_stream)),
  324:     true = escalus_connection:wait_for_close(Alice, timer:seconds(5)),
  325:     true = escalus_connection:is_connected(Alice2),
  326:     escalus_connection:stop(Alice2).
  327: 
  328: session_resumed_and_old_session_dead_doesnt_route_error_to_new_session(Config) ->
  329:     %% GIVEN USER WITH STREAM RESUMPTION ENABLED
  330:     Alice = connect_fresh(Config, alice, sr_presence),
  331:     %% WHEN FIRST SESSION DIES AND USER RESUMES FROM NEW CLIENT
  332:     Alice2 = sm_helper:kill_and_connect_resume(Alice),
  333:     process_initial_stanza(Alice2),
  334:     %% THEN new session does not have any message rerouted
  335:     false = escalus_client:has_stanzas(Alice2),
  336:     true = escalus_connection:is_connected(Alice2),
  337:     escalus_connection:stop(Alice2).
  338: 
  339: basic_ack(Config) ->
  340:     Alice = connect_fresh(Config, alice, sm_after_session),
  341:     escalus_connection:send(Alice, escalus_stanza:roster_get()),
  342:     escalus:assert(is_roster_result,
  343:                    escalus_connection:get_stanza(Alice, roster_result)),
  344:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  345:     escalus:assert(is_sm_ack,
  346:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  347: 
  348: %% Test that "h" value is valid when:
  349: %% - SM is enabled *before* the session is established
  350: %% - <r/> is sent *before* the session is established
  351: h_ok_before_session(Config) ->
  352:     Alice = connect_fresh(Config, alice, sm_after_bind),
  353:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  354:     escalus:assert(is_sm_ack, [0],
  355:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  356: 
  357: %% Test that "h" value is valid when:
  358: %% - SM is enabled *before* the session is established
  359: %% - <r/> is sent *after* the session is established
  360: h_ok_after_session_enabled_before_session(Config) ->
  361:     Alice = connect_fresh(Config, alice, sm_before_session),
  362:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  363:     escalus:assert(is_sm_ack, [1],
  364:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  365: 
  366: %% Test that "h" value is valid when:
  367: %% - SM is enabled *after* the session is established
  368: %% - <r/> is sent *after* the session is established
  369: h_ok_after_session_enabled_after_session(Config) ->
  370:     Alice = connect_fresh(Config, alice, sm_after_session),
  371:     escalus_connection:send(Alice, escalus_stanza:roster_get()),
  372:     escalus:assert(is_roster_result,
  373:                    escalus_connection:get_stanza(Alice, roster_result)),
  374:     escalus_connection:send(Alice, escalus_stanza:sm_request()),
  375:     escalus:assert(is_sm_ack, [1],
  376:                    escalus_connection:get_stanza(Alice, stream_mgmt_ack)).
  377: 
  378: %% Test that "h" value is valid after exchanging a few messages.
  379: h_ok_after_a_chat(ConfigIn) ->
  380:     Config = escalus_users:update_userspec(ConfigIn, alice,
  381:                                            stream_management, true),
  382:     escalus:fresh_story(Config, [{alice,1}, {bob,1}], fun(Alice, Bob) ->
  383:         escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Hi, Bob!">>)),
  384:         escalus:assert(is_chat_message, [<<"Hi, Bob!">>],
  385:                        escalus:wait_for_stanza(Bob)),
  386:         escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)),
  387:         escalus:assert(is_chat_message, [<<"Hi, Alice!">>],
  388:                        escalus:wait_for_stanza(Alice)),
  389:         escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How's life?">>)),
  390:         escalus:assert(is_chat_message, [<<"How's life?">>],
  391:                        escalus:wait_for_stanza(Alice)),
  392:         escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"Pretty !@#$%^$">>)),
  393:         escalus:assert(is_chat_message, [<<"Pretty !@#$%^$">>],
  394:                        escalus:wait_for_stanza(Bob)),
  395:         escalus:send(Alice, escalus_stanza:sm_request()),
  396:         escalus:assert(is_sm_ack, [3], escalus:wait_for_stanza(Alice)),
  397:         %% Ack, so that unacked messages don't go into offline store.
  398:         escalus:send(Alice, escalus_stanza:sm_ack(3))
  399:     end).
  400: 
  401: h_non_given_closes_stream_gracefully(ConfigIn) ->
  402:     AStanza = #xmlel{name = <<"a">>,
  403:                attrs = [{<<"xmlns">>, <<"urn:xmpp:sm:3">>}]},
  404:     Config = escalus_users:update_userspec(ConfigIn, alice,
  405:                                            stream_management, true),
  406:     escalus:fresh_story(Config, [{alice,1}], fun(Alice) ->
  407:         C2SPid = mongoose_helper:get_session_pid(Alice),
  408:         escalus:send(Alice, AStanza),
  409:         escalus:assert(is_stream_error,
  410:                        [<<"policy-violation">>, <<>>],
  411:                        escalus:wait_for_stanza(Alice)),
  412:         mongoose_helper:wait_for_pid_to_die(C2SPid),
  413:         escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)),
  414:         true = escalus_connection:wait_for_close(Alice, timer:seconds(5))
  415:     end).
  416: 
  417: client_acks_more_than_sent(Config) ->
  418:     Alice = connect_fresh(Config, alice, sm_after_session),
  419:     escalus:send(Alice, escalus_stanza:sm_ack(5)),
  420:     StreamErrorStanza = escalus:wait_for_stanza(Alice),
  421:     %% Assert "undefined-condition" children
  422:     escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], StreamErrorStanza),
  423:     %% Assert "handled-count-too-high" children with correct attributes
  424:     HandledCountSubElement = exml_query:path(StreamErrorStanza,
  425:                                              [{element_with_ns,
  426:                                                <<"handled-count-too-high">>,
  427:                                                <<"urn:xmpp:sm:3">>}]),
  428:     <<"5">> = exml_query:attr(HandledCountSubElement, <<"h">>),
  429:     <<"0">> = exml_query:attr(HandledCountSubElement, <<"send-count">>),
  430:     %% Assert graceful stream end
  431:     escalus:assert(is_stream_end, escalus_connection:get_stanza(Alice, stream_end)),
  432:     true = escalus_connection:wait_for_close(Alice, timer:seconds(5)).
  433: 
  434: too_many_unacked_stanzas(Config) ->
  435:     Bob = connect_fresh(Config, bob, presence),
  436:     Alice = connect_fresh(Config, alice, sm_presence, manual),
  437:     get_ack(Alice),
  438:     [escalus:send(Bob, escalus_stanza:chat_to(Alice,
  439:         <<(integer_to_binary(N))/binary, ": Hi, Alice!">>))
  440:      || N <- lists:seq(1,?SMALL_SM_BUFFER)],
  441:     escalus:wait_for_stanzas(Alice, ?SMALL_SM_BUFFER * 2), % messages and ack requests
  442:     escalus:assert(is_stream_error, [<<"resource-constraint">>,
  443:                                      <<"too many unacked stanzas">>],
  444:                    %% wait for deferred buffer check
  445:                    escalus:wait_for_stanza(Alice, ?CONSTRAINT_CHECK_TIMEOUT + 1000)).
  446: 
  447: server_requests_ack(Config) ->
  448:     server_requests_ack(Config, 1).
  449: 
  450: server_requests_ack_freq_2(Config) ->
  451:     server_requests_ack(Config, 2).
  452: 
  453: server_requests_ack(Config, N) ->
  454:     Bob = connect_fresh(Config, bob, presence),
  455:     Alice = connect_fresh(Config, alice, sm_presence, manual),
  456:     %% ack request after initial presence
  457:     maybe_assert_ack_request(1, N, Alice),
  458:     escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi, Alice!">>)),
  459:     escalus:assert(is_chat_message, [<<"Hi, Alice!">>],
  460:                    escalus:wait_for_stanza(Alice)),
  461:     maybe_assert_ack_request(2, N, Alice).
  462: 
  463: maybe_assert_ack_request(StanzasRec, AckRequests, Alice) ->
  464:     ct:log("StanzasRec: ~p, AckRequests: ~p", [StanzasRec, AckRequests]),
  465:     case StanzasRec rem AckRequests of
  466:         0 ->
  467:             escalus:assert(is_sm_ack_request, escalus:wait_for_stanza(Alice));
  468:         _ ->
  469:             ok
  470:     end,
  471:     StanzasRec.
  472: 
  473: server_requests_ack_after_session(Config) ->
  474:     Alice = connect_fresh(Config, alice, sm_before_session, manual),
  475:     escalus:assert(is_sm_ack_request, escalus_connection:get_stanza(Alice, stream_mgmt_req)).
  476: 
  477: resend_more_offline_messages_than_buffer_size(Config) ->
  478:     %% connect bob and alice
  479:     Bob = connect_fresh(Config, bob, presence),
  480:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  481: 
  482:     % sent some messages - more than unacked buffer size
  483:     MessagesToSend = ?SMALL_SM_BUFFER + 1,
  484:     AliceJid = common_helper:get_bjid(AliceSpec),
  485:     [escalus_connection:send(Bob, escalus_stanza:chat_to(AliceJid, integer_to_binary(I)))
  486:      || I <- lists:seq(1, MessagesToSend)],
  487:     mongoose_helper:wait_for_n_offline_messages(AliceJid, MessagesToSend),
  488: 
  489:     % connect alice who wants to receive all messages from offline storage
  490:     Alice = connect_spec(AliceSpec, sm_after_session, manual),
  491:     send_initial_presence(Alice),
  492:     escalus:wait_for_stanzas(Alice, MessagesToSend * 2), %messages and ack requests
  493: 
  494:     escalus_connection:get_stanza(Alice, presence),
  495:     get_ack(Alice), % ack request
  496: 
  497:     % confirm messages + presence
  498:     escalus_connection:send(Alice, escalus_stanza:sm_ack(4)),
  499:     % wait for check constraint message on server side
  500: 
  501:     ct:sleep(?CONSTRAINT_CHECK_TIMEOUT + 1000),
  502:     false = escalus_client:has_stanzas(Alice),
  503:     % should not receive anything especially any stream errors
  504: 
  505:     escalus_connection:stop(Alice),
  506:     escalus_connection:stop(Bob).
  507: 
  508: resend_unacked_on_reconnection(Config) ->
  509:     Texts = three_texts(),
  510:     Bob = connect_fresh(Config, bob, presence),
  511:     Alice = connect_fresh(Config, alice, sm_presence),
  512:     AliceSpec = sm_helper:client_to_spec0(Alice),
  513:     %% Bob sends some messages to Alice.
  514:     sm_helper:send_messages(Bob, Alice, Texts),
  515:     %% Alice receives the messages.
  516:     sm_helper:wait_for_messages(Alice, Texts),
  517:     %% Alice disconnects without acking the messages.
  518:     sm_helper:stop_client_and_wait_for_termination(Alice),
  519:     %% Messages go to the offline store.
  520:     %% Alice receives the messages from the offline store.
  521:     NewAlice = connect_spec(AliceSpec, session, manual),
  522:     send_initial_presence(NewAlice),
  523:     sm_helper:wait_for_messages(NewAlice, Texts),
  524:     %% Alice acks the delayed messages so they won't go again
  525:     %% to the offline store.
  526:     escalus_connection:send(NewAlice, escalus_stanza:sm_ack(3)).
  527: 
  528: %% Remove wait_for_n_offline_messages and you will get anything, but preserve_order
  529: %% TODO Test without wait_for_n_offline_messages. It would require changes in SM
  530: %%      and more strict tests, reproducing delays in SM and in mod_offline.
  531: preserve_order(Config) ->
  532:     %% connect bob and alice
  533:     Bob = connect_fresh(Config, bob, presence),
  534:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  535:     AliceSpec = sm_helper:client_to_spec(Alice),
  536:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"1">>)),
  537: 
  538:     %% kill alice connection
  539:     escalus_connection:kill(Alice),
  540:     C2SPid = mongoose_helper:get_session_pid(Alice),
  541:     sm_helper:wait_until_resume_session(C2SPid),
  542: 
  543:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"2">>)),
  544:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"3">>)),
  545: 
  546:     NewAlice = connect_spec(AliceSpec, session, manual),
  547:     escalus_connection:send(NewAlice, escalus_stanza:enable_sm([resume])),
  548: 
  549:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"4">>)),
  550:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"5">>)),
  551: 
  552:     %% Without this check we will get stuff out of order
  553:     mongoose_helper:wait_for_n_offline_messages(NewAlice, 5),
  554: 
  555:     send_initial_presence(NewAlice),
  556:     %% Without this check we can get "6, 1, 2, 3, 4, 5" messages in the next receive_all_ordered
  557:     mongoose_helper:wait_for_n_offline_messages(NewAlice, 0),
  558:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"6">>)),
  559: 
  560:     %% "2, 3, 4, 5, 6, 1" is possible (where only 1 is from offline storage, the rest is from sm)
  561:     receive_all_ordered(NewAlice, 6),
  562: 
  563:     % replace connection
  564:     NewAlice2 = connect_spec(AliceSpec, session, manual),
  565:     % allow messages to go to the offline storage
  566:     mongoose_helper:wait_for_n_offline_messages(NewAlice, 6),
  567: 
  568:     send_initial_presence(NewAlice2),
  569: 
  570:     % receves messages in correct order
  571:     receive_all_ordered(NewAlice2, 6),
  572: 
  573:     escalus_connection:stop(Bob),
  574:     escalus_connection:stop(NewAlice2).
  575: 
  576: receive_all_ordered(Conn, Last) ->
  577:     receive_all_ordered(Conn, 1, Last, []).
  578: 
  579: %% Receive messages from N to Last.
  580: %% Ignores acks and presences.
  581: %% Handles case when out of order and when not enough stanzas.
  582: receive_all_ordered(_Conn, N, Last, Acc) when N > Last ->
  583:     Texts = lists:map(fun integer_to_binary/1, lists:seq(1, Last)),
  584:     sm_helper:assert_messages(Acc, Texts);
  585: receive_all_ordered(Conn, N, Last, Acc) ->
  586:     Stanzas = escalus:wait_for_stanzas(Conn, 1),
  587:     case Stanzas of
  588:         [#xmlel{name = <<"message">>}] ->
  589:             receive_all_ordered(Conn, N + 1, Last, Acc ++ Stanzas);
  590:         [_] -> %% Ack or presence
  591:             receive_all_ordered(Conn, N, Last, Acc);
  592:         [] ->
  593:             ct:fail({timeout_waiting, N, Acc})
  594:     end.
  595: 
  596: resend_unacked_after_resume_timeout(Config) ->
  597:     %% connect bob and alice
  598:     Bob = connect_fresh(Config, bob, presence),
  599:     Alice = connect_fresh(Config, alice, sr_presence),
  600:     AliceSpec = sm_helper:client_to_spec(Alice),
  601: 
  602:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  603:     %% kill alice connection
  604:     escalus_connection:kill(Alice),
  605: 
  606:     %% ensure there is no session
  607:     C2SPid = mongoose_helper:get_session_pid(Alice),
  608:     sm_helper:wait_until_resume_session(C2SPid),
  609: 
  610:     %% alice come back and receives unacked message
  611:     NewAlice = connect_spec(AliceSpec, session),
  612:     send_initial_presence(NewAlice),
  613: 
  614:     escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)],
  615:                                  escalus:wait_for_stanzas(NewAlice, 2)),
  616: 
  617:     escalus_connection:stop(Bob),
  618:     escalus_connection:stop(NewAlice).
  619: 
  620: ping_timeout(Config) ->
  621:     %% make sure there are no leftover stanzas in the history
  622:     ?assertEqual([], get_stanzas_filtered_by_mod_ping()),
  623: 
  624:     %% connect Alice
  625:     Alice = connect_fresh(Config, alice, session),
  626: 
  627:     %% manually configure stream resumption to avoid ping stanzas
  628:     escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])),
  629:     Pred = fun(Stanza) -> escalus_pred:is_sm_enabled([resume], Stanza) end,
  630:     SM = escalus_connection:receive_stanza(Alice, #{ pred => Pred }),
  631:     SMH = escalus_connection:get_sm_h(Alice),
  632:     SMID = exml_query:attr(SM, <<"id">>),
  633: 
  634:     escalus_client:wait_for_stanza(Alice),
  635:     ct:sleep(?PING_REQUEST_TIMEOUT + ?PING_INTERVAL + timer:seconds(1)),
  636: 
  637:     %% attempt to resume the session after the connection drop
  638:     sm_helper:kill_client_and_wait_for_termination(Alice),
  639:     NewAlice = connect_same(Alice, auth),
  640:     escalus_connection:send(NewAlice, escalus_stanza:resume(SMID, SMH)),
  641: 
  642:     %% after resume_timeout, we expect the session to be closed
  643:     escalus_connection:get_stanza(NewAlice, failed_resumption),
  644: 
  645:     %% bind a new session and expect unacknowledged messages to be resent
  646:     escalus_session:session(escalus_session:bind(NewAlice)),
  647:     send_initial_presence(NewAlice),
  648: 
  649:     %% check if the error stanza was handled by mod_ping
  650:     FilteredStanzas = get_stanzas_filtered_by_mod_ping(),
  651:     ?assertNotEqual(length(FilteredStanzas), 0),
  652:     Fun = fun(Stanza) ->
  653:             escalus:assert(is_iq_error, Stanza),
  654:             ?assertNotEqual(undefined,
  655:                 exml_query:subelement_with_name_and_ns(Stanza, <<"ping">>, <<"urn:xmpp:ping">>))
  656:           end,
  657:     lists:foreach(Fun, FilteredStanzas),
  658: 
  659:     %% kill the connection to avoid errors
  660:     escalus_connection:kill(NewAlice).
  661: 
  662: resume_expired_session_returns_correct_h(Config) ->
  663:     %% connect bob and alice
  664:     Bob = connect_fresh(Config, bob, sr_presence),
  665:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  666:     get_ack(Alice),
  667: 
  668:     %% Bob sends a message to Alice, and Alice receives it but doesn't acknowledge
  669:     escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
  670:     escalus:assert(is_chat_message, [<<"msg-1">>], escalus:wait_for_stanza(Alice)),
  671:     %% alice comes back, but too late, so resumption doesn't work,
  672:     %% but she receives the previous h = 1 anyway
  673:     %% NewAlice is also manual ack
  674:     NewAlice = sm_helper:kill_and_connect_with_resume_session_without_waiting_for_result(Alice),
  675:     FailedResumption = escalus_connection:get_stanza(NewAlice, failed_resumption),
  676:     <<"1">> = exml_query:attr(FailedResumption, <<"h">>),
  677:     %% And we can continue with bind and session
  678:     escalus_session:session(escalus_session:bind(NewAlice)),
  679:     send_initial_presence(NewAlice),
  680:     Stanzas = [escalus_connection:get_stanza(NewAlice, {msg, 1}),
  681:                escalus_connection:get_stanza(NewAlice, {msg, 2})],
  682:     escalus_new_assert:mix_match([is_presence, is_chat(<<"msg-1">>)], Stanzas),
  683:     escalus_connection:stop(Bob),
  684:     escalus_connection:stop(NewAlice).
  685: 
  686: gc_repeat_after_never_means_no_cleaning(Config) ->
  687:     [{SMID1, _}, {SMID2, _}, {SMID3, _}] = ?config(smid_test, Config),
  688:     {stale_h, 1} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID1]),
  689:     {stale_h, 2} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID2]),
  690:     {stale_h, 3} = rpc(mim(), ?MOD_SM, get_session_from_smid, [host_type(), SMID3]).
  691: 
  692: gc_repeat_after_timeout_does_clean(Config) ->
  693:     [{SMID1, _} | _ ] = ?config(smid_test, Config),
  694:     mongoose_helper:wait_until(fun() ->
  695:                                        rpc(mim(), ?MOD_SM, get_stale_h, [host_type(), SMID1])
  696:                                end,
  697:                                {error, smid_not_found},
  698:                                #{name => smid_garbage_collected}).
  699: 
%% Runs the generic resume-state scenario with the no_ack option.
resume_session_state_send_message_without_ack(Config) ->
    resume_session_state_send_message_generic(Config, no_ack).
  702: 
%% Runs the generic resume-state scenario with the ack option.
resume_session_state_send_message_with_ack(Config) ->
    resume_session_state_send_message_generic(Config, ack).
  705: 
%% Shared body of the resume_session_state_send_message_* test cases.
%% Bob sends one message before Alice's connection is killed and two more
%% while her c2s process waits for resumption; a new connection of the same
%% user must then receive all three.
%% AckInitialPresence (ack | no_ack in the callers) is handed over to
%% maybe_ack_initial_presence/2.
resume_session_state_send_message_generic(Config, AckInitialPresence) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),
    maybe_ack_initial_presence(Alice, AckInitialPresence),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    %% kill alice connection
    C2SPid = mongoose_helper:get_session_pid(Alice),
    escalus_connection:kill(Alice),
    sm_helper:wait_until_resume_session(C2SPid),
    sm_helper:assert_alive_resources(Alice, 1),

    %% send some messages and check if c2s can handle it
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)),
    %% suspend the process to ensure that Alice has enough time to reconnect,
    %% before resumption timeout occurs.
    ok = rpc(mim(), sys, suspend, [C2SPid]),

    %% alice comes back and receives unacked message
    NewAlice = connect_same(Alice, presence),
    %% now we can resume c2s process of the old connection
    %% and let it process session resumption timeout
    ok = rpc(mim(), sys, resume, [C2SPid]),
    Stanzas = escalus:wait_for_stanzas(NewAlice, 3),

    %% The three stanzas must be Bob's unacked messages msg-1..msg-3.
    %% NOTE(review): an earlier comment here also mentioned Alice's own
    %% presence, but only three chat messages are matched; presumably the
    %% presence is consumed by the connection steps above - confirm.
    %% NOTE(review): the arrival order is not asserted explicitly - confirm
    %% whether mix_match is order-insensitive.
    escalus_new_assert:mix_match([is_chat(<<"msg-1">>),
                                  is_chat(<<"msg-2">>),
                                  is_chat(<<"msg-3">>)],
                                 Stanzas),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).
  740: 
%% Alice's connection is killed and her c2s process is additionally told to
%% stop (for instance, this can be done by mod_ping). A new connection of
%% the same user must still receive the message she never acknowledged.
resume_session_state_stop_c2s(Config) ->
    Bob = connect_fresh(Config, bob, presence),
    Alice = connect_fresh(Config, alice, sr_presence, manual),

    get_ack(Alice),
    ack_initial_presence(Alice),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(Alice, msg)),

    %% get pid of c2s
    C2SPid = mongoose_helper:get_session_pid(Alice),
    %% Wait c2s process to process our presence ack.
    %% Otherwise, we can receive two initial presences sometimes.
    sm_helper:wait_for_c2s_unacked_count(C2SPid, 1),

    % kill alice connection
    escalus_connection:kill(Alice),
    % session should be alive
    sm_helper:assert_alive_resources(Alice, 1),
    %% request a stop with reason normal; the process is then expected to be
    %% in the resume-session state (checked on the next line)
    rpc(mim(), mongoose_c2s, stop, [C2SPid, normal]),
    sm_helper:wait_until_resume_session(C2SPid),
    %% suspend the process to ensure that Alice has enough time to reconnect,
    %% before resumption timeout occurs.
    ok = rpc(mim(), sys, suspend, [C2SPid]),

    %% alice comes back and receives unacked message
    NewAlice = connect_same(Alice, presence, manual),
    %% now we can resume c2s process of the old connection
    %% and let it process session resumption timeout
    ok = rpc(mim(), sys, resume, [C2SPid]),

    escalus:assert(is_chat_message, [<<"msg-1">>], escalus_connection:get_stanza(NewAlice, msg)),
    escalus_connection:stop(Bob),
    escalus_connection:stop(NewAlice).
  777: 
  778: %% This test only verifies the validity of helpers (get_session_pid,
  779: %% get_c2s_state_name) written for wait_for_resumption
  780: %% testcase.
  781: session_established(Config) ->
  782:     Alice = connect_fresh(Config, alice, presence),
  783:     C2SPid = mongoose_helper:get_session_pid(Alice),
  784:     session_established = mongoose_helper:get_c2s_state_name(C2SPid),
  785:     escalus_connection:stop(Alice).
  786: 
  787: %% Ensure that after a violent disconnection,
  788: %% the c2s waits for resumption (but don't resume yet).
  789: wait_for_resumption(Config) ->
  790:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  791:     Bob = connect_fresh(Config, bob, session),
  792:     Texts = three_texts(),
  793:     {C2SPid, _} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
  794:     sm_helper:wait_until_resume_session(C2SPid).
  795: 
  796: unacknowledged_message_hook_filter(Config) ->
  797:     FilterText = <<"filter">>,
  798:     Bob = connect_fresh(Config, bob, presence),
  799:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  800:     Resource = proplists:get_value(username, AliceSpec),
  801:     HookHandlerExtra = start_filter_hook_listener(FilterText, Resource),
  802:     Alice = connect_spec([{resource, Resource} | AliceSpec], sr_presence, manual),
  803:     %% Ack the presence stanza
  804:     get_ack(Alice),
  805:     ack_initial_presence(Alice),
  806:     Messages = [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>, <<"msg-4">>],
  807:     All = [<<"msg-1">>, FilterText, <<"msg-2">>, FilterText, <<"msg-3">>, <<"msg-4">>],
  808:     [ escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, Body)) || Body <- All ],
  809:     %% kill alice connection
  810:     C2SPid = mongoose_helper:get_session_pid(Alice),
  811:     escalus_connection:kill(Alice),
  812:     sm_helper:wait_until_resume_session(C2SPid),
  813:     sm_helper:assert_alive_resources(Alice, 1),
  814:     %% ensure second C2S is registered so all the messages are bounced properly
  815:     NewAlice = connect_spec([{resource, <<"2">>}| AliceSpec], sr_presence, manual),
  816:     send_initial_presence(NewAlice),
  817:     sm_helper:wait_for_resource_count(NewAlice, 2),
  818:     ok = rpc(mim(), sys, terminate, [C2SPid, normal]),
  819:     %% verify that the filtered message is never received
  820:     verify_no_receive_filtertext(NewAlice, FilterText, Messages),
  821:     stop_hook_listener(HookHandlerExtra),
  822:     escalus_connection:stop(Bob).
  823: 
  824: verify_no_receive_filtertext(_, _, []) ->
  825:     ok;
  826: verify_no_receive_filtertext(Client, FilterText, Messages) ->
  827:     case escalus:wait_for_stanzas(Client, 1, 500) of
  828:         [] -> ct:fail("Messages pending: ~p~n", [Messages]);
  829:         [St] ->
  830:             case exml_query:path(St, [{element, <<"body">>}, cdata]) of
  831:                 undefined ->
  832:                     verify_no_receive_filtertext(Client, FilterText, Messages);
  833:                 FilterText ->
  834:                     ct:fail("Composing forwarded from a different c2s process");
  835:                 Message ->
  836:                     Pred = fun(Msg) -> Msg =/= Message end,
  837:                     {Pending, _} = lists:partition(Pred, Messages),
  838:                     verify_no_receive_filtertext(Client, FilterText, Pending)
  839:             end
  840:     end.
  841: 
%% Test case: runs the common unacknowledged-message hook scenario with
%% unacknowledged_message_hook_resume/4 as the reconnection strategy.
unacknowledged_message_hook_resume(Config) ->
    unacknowledged_message_hook_common(fun unacknowledged_message_hook_resume/4, Config).
  844: 
  845: unacknowledged_message_hook_resume(AliceSpec, Resource, SMID, _C2SPid) ->
  846:     NewAlice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
  847:     send_initial_presence(NewAlice),
  848:     {Resource, NewAlice}.
  849: 
%% Test case: runs the common unacknowledged-message hook scenario with
%% unacknowledged_message_hook_bounce/4 as the reconnection strategy.
unacknowledged_message_hook_bounce(Config) ->
    unacknowledged_message_hook_common(fun unacknowledged_message_hook_bounce/4, Config).
  852: 
  853: unacknowledged_message_hook_bounce(AliceSpec, Resource, _SMID, C2SPid) ->
  854:     NewResource = <<"new_", Resource/binary>>,
  855:     NewSpec = lists:keystore(resource, 1, AliceSpec, {resource, NewResource}),
  856:     NewAlice = connect_spec(NewSpec, sr_session, manual),
  857:     send_initial_presence(NewAlice),
  858:     %% ensure second C2S is registered so all the messages are bounced properly
  859:     sm_helper:wait_for_resource_count(NewAlice, 2),
  860:     ok = rpc(mim(), sys, terminate, [C2SPid, normal]),
  861:     {NewResource, NewAlice}.
  862: 
%% Test case: runs the common unacknowledged-message hook scenario with
%% unacknowledged_message_hook_offline/4 as the reconnection strategy.
unacknowledged_message_hook_offline(Config) ->
    unacknowledged_message_hook_common(fun unacknowledged_message_hook_offline/4, Config).
  865: 
%% Reconnection strategy: wait for the old c2s process to terminate, then
%% open a new session with the same spec and send the initial presence.
%% Returns the unchanged resource together with the new client.
unacknowledged_message_hook_offline(AliceSpec, Resource, _SMID, C2SPid) ->
    %% Wait for the old C2S to terminate before connecting again and sending
    %% presence. Otherwise some of the latest unacknowledged messages can be
    %% bounced to the new C2S process instead of going to the mod_offline
    %% storage. It looks like all the unacknowledged messages arrive at the
    %% new C2S in that case, but the message sequence is broken (the bounced
    %% messages are delivered before the messages from the mod_offline storage).
    C2SRef = erlang:monitor(process, C2SPid),
    sm_helper:wait_for_process_termination(C2SRef),
    %% open a new session; the old C2S process is already gone at this point
    NewAlice = connect_spec(AliceSpec, sr_session, manual),
    send_initial_presence(NewAlice),
    {Resource, NewAlice}.
  879: 
%% Common scenario for the unacknowledged_message_hook_* test cases.
%% RestartConnectionFN is called as
%%   RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid)
%% and must return {NewResource, NewAlice}: the resource to expect further
%% hook notifications for and the reconnected client.
%% The scenario checks the stanzas reported through wait_for_unacked_msg_hook/3
%% after Alice's first connection dies and again after the new connection dies.
unacknowledged_message_hook_common(RestartConnectionFN, Config) ->
    %% connect bob and alice
    Bob = connect_fresh(Config, bob, presence),

    %% Alice's resource is set to her username; the hook listener is started
    %% for that value
    AliceSpec0 = escalus_fresh:create_fresh_user(Config, alice),
    Resource = proplists:get_value(username, AliceSpec0),
    AliceSpec = [{resource, Resource} | AliceSpec0],
    HookHandlerExtra = start_hook_listener(Resource),
    Alice = connect_spec(AliceSpec, sr_presence, manual),
    %% Ack the presence stanza
    get_ack(Alice),
    ack_initial_presence(Alice),

    SMID = sm_helper:client_to_smid(Alice),

    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-1">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-2">>)),
    %% kill alice connection
    C2SPid = mongoose_helper:get_session_pid(Alice),
    escalus_connection:kill(Alice),
    sm_helper:wait_until_resume_session(C2SPid),
    sm_helper:assert_alive_resources(Alice, 1),

    %% the hook must report exactly msg-1 and msg-2, in that order, and then
    %% nothing more (the third wait times out)
    escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),

    %% send some messages and check if c2s can handle it
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-3">>)),
    escalus_connection:send(Bob, escalus_stanza:chat_to_short_jid(Alice, <<"msg-4">>)),
    escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(0, Resource, 100)),
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),

    %% alice comes back and receives unacked message
    {NewResource, NewAlice} = RestartConnectionFN(AliceSpec, Resource, SMID, C2SPid),

    %% keep reading message stanzas on the new connection until msg-4 shows up;
    %% a failed assertion makes wait_until retry with the next stanza
    %% (presumably - depends on how wait_until treats exceptions, confirm)
    mongoose_helper:wait_until(
        fun() ->
            Stanza = escalus_connection:get_stanza(NewAlice, msg),
            escalus:assert(is_chat_message, [<<"msg-4">>], Stanza),
            ok
        end, ok),

    %% kill the new connection too, without acking anything
    NewC2SPid = mongoose_helper:get_session_pid(NewAlice),
    escalus_connection:kill(NewAlice),
    sm_helper:wait_until_resume_session(NewC2SPid),

    %% all four messages are reported again, now with 1 as the first argument
    %% and for NewResource
    escalus:assert(is_chat_message, [<<"msg-1">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    escalus:assert(is_chat_message, [<<"msg-2">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    escalus:assert(is_chat_message, [<<"msg-3">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    escalus:assert(is_chat_message, [<<"msg-4">>], wait_for_unacked_msg_hook(1, NewResource, 100)),
    %% NOTE(review): this final check uses (0, Resource) rather than
    %% (1, NewResource) - confirm that this is intended.
    ?assertEqual(timeout, wait_for_unacked_msg_hook(0, Resource, 100)),
    stop_hook_listener(HookHandlerExtra),
    escalus_connection:stop(Bob).
  935: 
  936: resume_session(Config) ->
  937:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  938:     Texts = three_texts(),
  939:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
  940:         {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts),
  941:         %% Resume the session.
  942:         Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
  943:         %% Alice receives the unacked messages from the previous
  944:         %% interrupted session.
  945:         sm_helper:wait_for_messages(Alice, Texts),
  946:         %% Alice acks the received messages.
  947:         escalus_connection:send(Alice, escalus_stanza:sm_ack(5)),
  948:         escalus_connection:stop(Alice)
  949:     end).
  950: 
  951: resume_session_with_wrong_h_does_not_leak_sessions(Config) ->
  952:     AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
  953:     Messages = three_texts(),
  954:     HostType = host_type(),
  955:     escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
  956:         {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Messages),
  957:         %% Resume the session.
  958:         Alice = connect_spec(AliceSpec, auth, manual),
  959:         Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 30),
  960:         escalus:assert(is_stream_error, [<<"undefined-condition">>, <<>>], Resumed),
  961:         escalus_connection:wait_for_close(Alice, timer:seconds(5)),
  962:         Fun = fun() ->
  963:                       [] = sm_helper:get_user_present_resources(Alice),
  964:                       sm_helper:get_sid_by_stream_id(HostType, SMID)
  965:               end,
  966:         mongoose_helper:wait_until(Fun, {error, smid_not_found}, #{name => smid_is_cleaned})
  967:     end).
  968: 
%% Resuming with the SMID <<"wrong-sid">> must be rejected with item-not-found
%% (the check itself lives in session_resumption_expects_item_not_found/2).
resume_session_with_wrong_sid_returns_item_not_found(Config) ->
    session_resumption_expects_item_not_found(Config, <<"wrong-sid">>).
  971: 
  972: resume_session_with_wrong_namespace_is_a_noop(Config) ->
  973:     Alice = connect_fresh(Config, alice, auth),
  974:     #xmlel{attrs = Attrs} = Resume = escalus_stanza:resume(<<"doesnt_matter">>, 4),
  975:     Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, <<"not-stream-mgnt">>}),
  976:     escalus_connection:send(Alice, Resume#xmlel{attrs = Attrs2}),
  977:     escalus_assert:has_no_stanzas(Alice),
  978:     [] = sm_helper:get_user_present_resources(Alice),
  979:     true = escalus_connection:is_connected(Alice),
  980:     escalus_connection:stop(Alice).
  981: 
  982: resume_dead_session_results_in_item_not_found(Config) ->
  983:     SMID = base64:encode(crypto:strong_rand_bytes(21)),
  984:     SID = {os:timestamp(), undefined},
  985:     HostType = host_type(),
  986:     rpc(mim(), ?MOD_SM, register_smid, [HostType, SMID, SID]),
  987:     session_resumption_expects_item_not_found(Config, SMID).
  988: 
  989: session_resumption_expects_item_not_found(Config, SMID) ->
  990:     Alice = connect_fresh(Config, alice, auth),
  991:     Resumed = sm_helper:try_to_resume_stream(Alice, SMID, 2),
  992:     escalus:assert(is_sm_failed, [<<"item-not-found">>], Resumed),
  993:     [] = sm_helper:get_user_present_resources(Alice),
  994:     true = escalus_connection:is_connected(Alice),
  995:     escalus_connection:stop(Alice).
  996: 
  997: resume_session_kills_old_C2S_gracefully(Config) ->
  998:     Alice = connect_fresh(Config, alice, sr_presence, manual),
  999:     C2SPid = mongoose_helper:get_session_pid(Alice),
 1000: 
 1001:     %% Monitor the C2S process and disconnect Alice.
 1002:     MonitorRef = sm_helper:monitor_session(Alice),
 1003:     escalus_client:kill_connection(Config, Alice),
 1004: 
 1005:     %% Ensure the c2s process is waiting for resumption.
 1006:     sm_helper:wait_until_resume_session(C2SPid),
 1007: 
 1008:     %% Resume the session.
 1009:     NewAlice = connect_resume(Alice, 1),
 1010: 
 1011:     %% C2S process should die gracefully with Reason=normal.
 1012:     sm_helper:wait_for_process_termination(MonitorRef),
 1013:     escalus_connection:stop(NewAlice).
 1014: 
 1015: carboncopy_works(Config) ->
 1016:     escalus:fresh_story(Config, [{alice, 2}, {bob, 1}], fun(Alice1, Alice, Bob) ->
 1017:         mongoose_helper:enable_carbons([Alice1, Alice]),
 1018:         escalus_connection:send(Bob, escalus_stanza:chat_to(Alice1, <<"msg-4">>)),
 1019:         sm_helper:wait_for_messages(Alice1, [<<"msg-4">>]),
 1020:         carboncopy_helper:wait_for_carbon_chat_with_body(Alice, <<"msg-4">>, #{from => Bob, to => Alice1})
 1021:     end).
 1022: 
%% Carbon copies must keep working for a resource whose session was
%% interrupted and then resumed: Alice's second resource ("res2") dies with
%% unacked messages, resumes, gets the buffered stanzas again and afterwards
%% still receives a carbon of a message sent to the first resource.
carboncopy_works_after_resume(Config) ->
    Texts = three_texts(),
    escalus:fresh_story(Config, [{alice, 1}, {bob, 1}], fun(Alice1, Bob) ->
        AliceSpec = [{resource, <<"res2">>} | sm_helper:client_to_spec(Alice1)],
        %% F runs right after the second resource connects: both resources
        %% consume an "available" presence and then enable carbons
        F = fun(Alice2) ->
            [escalus:assert(is_presence_with_type, [<<"available">>], escalus:wait_for_stanza(A)) || A <- [Alice1, Alice2]],
            mongoose_helper:enable_carbons([Alice1, Alice2])
            end,
        {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, F),
        %% Resume the session.
        Alice = connect_spec(AliceSpec, {resume, SMID, 1}, manual),
        %% the first resource got carbons of the messages sent to "res2"
        wait_for_carbon_with_bodies(Alice1, Texts, #{from => Bob, to => Alice}),
        %% Get a presence from Alice1 again
        escalus:assert(is_presence_with_type, [<<"available">>], escalus:wait_for_stanza(Alice)),
        %% Alice receives an IQ result from the carbon copy enable request
        escalus:assert(is_iq_result, [], escalus:wait_for_stanza(Alice)),
        %% Alice receives the unacked messages from the previous
        %% interrupted session.
        sm_helper:wait_for_messages(Alice, Texts),
        %% Alice acks the received messages.
        escalus_connection:send(Alice, escalus_stanza:sm_ack(5)),
        %% Direct send
        escalus_connection:send(Bob, escalus_stanza:chat_to(Alice1, <<"msg-4">>)),
        sm_helper:wait_for_messages(Alice1, [<<"msg-4">>]),
        carboncopy_helper:wait_for_carbon_chat_with_body(Alice, <<"msg-4">>, #{from => Bob, to => Alice1}),
        escalus_connection:stop(Alice)
    end).
 1050: 
%% Waits on Client for one carbon-copied chat message per element of Texts,
%% in list order, passing Params on to the carbon copy helper.
%% Returns the list of the helper's results.
wait_for_carbon_with_bodies(Client, Texts, Params) ->
    [carboncopy_helper:wait_for_carbon_chat_with_body(Client, Text, Params) || Text <- Texts].
 1053: 
 1054: buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts) ->
 1055:     F = fun(_Client) -> ok end,
 1056:     buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, F).
 1057: 
 1058: buffer_unacked_messages_and_die(Config, AliceSpec, Bob, Texts, F) ->
 1059:     Alice = connect_spec(AliceSpec, sr_presence, manual),
 1060:     F(Alice),
 1061:     C2SPid = mongoose_helper:get_session_pid(Alice),
 1062:     %% Bobs sends some messages to Alice.
 1063:     sm_helper:send_messages(Bob, Alice, Texts),
 1064:     %% Alice receives them, but doesn't ack.
 1065:     sm_helper:wait_for_messages(Alice, Texts),
 1066:     %% Alice's connection is violently terminated.
 1067:     escalus_client:kill_connection(Config, Alice),
 1068:     sm_helper:wait_until_resume_session(C2SPid),
 1069:     SMID = sm_helper:client_to_smid(Alice),
 1070:     {C2SPid, SMID}.
 1071: 
%% Sends stream start, SASL PLAIN auth, the post-auth stream start and the
%% resume request in a single batch, without waiting for any reply in
%% between, and expects the sixth stanza received to be the SM <resumed/>
%% answer for the interrupted session.
aggressively_pipelined_resume(Config) ->
    AliceSpec = [{manual_ack, true}, {parser_opts, [{start_tag, <<"stream:stream">>}]}
                 | escalus_fresh:create_fresh_user(Config, alice)],
    UnackedMessages = three_texts(),
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        {_, SMID} = buffer_unacked_messages_and_die(Config, AliceSpec, Bob, UnackedMessages),
        %% Resume the session.
        Alice = escalus_connection:connect(AliceSpec),

        %% SASL PLAIN payload: empty authzid, then username and password,
        %% each preceded by a zero byte
        Username = proplists:get_value(username, AliceSpec),
        Password = proplists:get_value(password, AliceSpec),
        Payload = <<0:8,Username/binary,0:8,Password/binary>>,
        Server = proplists:get_value(server, AliceSpec),

        Stream = escalus_stanza:stream_start(Server, <<"jabber:client">>),
        Auth = escalus_stanza:auth(<<"PLAIN">>, [#xmlcdata{content = base64:encode(Payload)}]),
        AuthStream = escalus_stanza:stream_start(Server, <<"jabber:client">>),
        Resume = escalus_stanza:resume(SMID, 2),

        %% everything goes out in one send call
        escalus_client:send(Alice, [Stream, Auth, AuthStream, Resume]),
        %% six stanzas are collected; only the last one is checked
        Messages = [escalus_connection:get_stanza(Alice, {get_resumed, I}) || I <- lists:seq(1, 6)],
        escalus:assert(is_sm_resumed, [SMID], lists:last(Messages)),

        escalus_connection:stop(Alice)
    end).
 1097: 
 1098: %% This is a regression test for a case when a session processes a request, which will
 1099: %% receive a response from the server, i.e. will have the same origin SID in mongoose_acc.
 1100: %% Without proper handling, the reply would be rejected because the resumed session
 1101: %% has new SID.
 1102: replies_are_processed_by_resumed_session(Config) ->
 1103:     %% GIVEN a session and registered special IQ handler (added in init_per_testcase),
 1104:     %% that waits for old session process to terminate (at this point new process
 1105:     %% has fully taken over) and then actually sends the reply.
 1106:     Alice = connect_fresh(Config, alice, sr_presence),
 1107: 
 1108:     %% WHEN a client sends IQ request to the special handler...
 1109:     IQReq = escalus_stanza:iq_get(regression_ns(), []),
 1110:     escalus:send(Alice, IQReq),
 1111: 
 1112:     %% ... goes down and session is resumed.
 1113:     Alice2 = sm_helper:kill_and_connect_resume(Alice),
 1114: 
 1115:     %% THEN the client receives the reply properly.
 1116:     IQReply = escalus:wait_for_stanza(Alice2),
 1117:     escalus:assert(is_iq_result, [IQReq], IQReply),
 1118:     escalus_connection:stop(Alice2).
 1119: 
%% This is a regression test for a bug which manifested in the following
%% scenario (due to improper buffering of presence subscription requests):
%% 1. Bob is online, Alice is offline
%% 2. Bob subscribes to Alice's presence
%% 3. Alice becomes online
%% 4. Bob sends a message to Alice
%% 5. Alice doesn't SM-ack the request or message, terminates the connection
%% 6. Alice reconnects but with session *replace*, not resume
%% 7. Packet rerouting crashes on the buffered sub request, preventing
%%    the whole buffer from being resent
%% 8. Alice doesn't receive the buffered message
subscription_requests_are_buffered_properly(Config) ->
    AliceSpec = escalus_fresh:create_fresh_user(Config, alice),
    MsgBody = <<"buffered">>,
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        % GIVEN Bob's pending subscription to Alice's presence
        AliceJid = common_helper:get_bjid(AliceSpec),
        escalus:send(Bob, escalus_stanza:presence_direct(AliceJid, <<"subscribe">>)),
        _RosterPushReq = escalus:wait_for_stanza(Bob),

        % WHEN Alice becomes online...
        Alice = connect_spec(AliceSpec, sr_session, manual),
        send_initial_presence(Alice),
        %% subscribe could come before the initial presence
        escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)],
                            escalus:wait_for_stanzas(Alice, 2)),

        % ...and Bob sends a message to Alice...
        escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),
        MsgStanza = escalus:wait_for_stanza(Alice),
        escalus:assert(is_chat_message, [MsgBody], MsgStanza),

        % ...and Alice terminates connection without acking anything...
        escalus_client:kill_connection(Config, Alice),

        % ...and reconnects with session replacement.
        Alice2 = connect_spec(AliceSpec, session, manual),

        % THEN Alice receives (without sending initial presence):
        % * buffered Bob's message (like above)
        % Alice DOESN'T receive:
        % * buffered subscription request because it is dropped by ejabberd_sm
        %   because it's treated like repeated sub request to bare JID, so it's not
        %   processed by any sub req handler (like mod_roster)
        % * buffered available presence from Alice - because it is addressed to another SID
        %   and Alice2 is a brand new session
        escalus:assert(is_chat_message, [MsgBody], escalus:wait_for_stanza(Alice2)),
        %% a round trip with a marker message, after which nothing else may
        %% be waiting for Alice2
        sm_helper:send_and_receive(Bob, Alice2, <<"flush1">>),
        escalus_assert:has_no_stanzas(Alice2),

        %% Only once an initial presence is sent, a subscription request is sent
        send_initial_presence(Alice2),
        escalus:assert_many([is_presence(<<"available">>), is_presence(<<"subscribe">>)],
                            escalus:wait_for_stanzas(Alice2, 2)),

        sm_helper:send_and_receive(Bob, Alice2, <<"flush2">>),
        escalus_assert:has_no_stanzas(Alice2),

        escalus_connection:stop(Alice2)
    end).
 1179: 
%% This is a regression test for a bug due to which messages sent to the old
%% session in the middle of a state handover were not appended properly to
%% the SM buffer.
%% Scenario to reproduce:
%% 1. Online Bob and Alice
%% 2. Alice kills the connection
%% 3. Alice's session is suspended
%% 4. Alice resumes session with new connection. At this moment new session is still not
%%    present in session table. `resume` request is stuck in old proc mailbox.
%% 5. Bob sends a message to Alice. Only old proc is present in session table so now
%%    old session has two messages in mailbox: `resume` and XML from Bob
%% 6. We resume old process and it begins session handover
%% 7. Bob's message is appended to SM buffer in "flush" step
%% 8. With bug fixed, the message is retransmitted properly
messages_are_properly_flushed_during_resumption(Config) ->
    escalus:fresh_story(Config, [{bob, 1}], fun(Bob) ->
        % GIVEN (online Bob) and (Alice in resume state); Alice's session is suspended
        Alice = connect_fresh(Config, alice, sr_presence),
        SMH = escalus_connection:get_sm_h(Alice),
        escalus_client:kill_connection(Config, Alice),
        %% The receiver process would stop now
        C2SPid = mongoose_helper:get_session_pid(Alice),
        sm_helper:wait_until_resume_session(C2SPid),

        %% the queue must be empty before the process is suspended, so that
        %% the queue lengths checked below count only the new messages
        sm_helper:wait_for_queue_length(C2SPid, 0),
        ok = rpc(mim(), sys, suspend, [C2SPid]),

        % WHEN new session requests resumption
        % we wait until that old session has resumption request enqueued;
        % we need it to ensure the order of messages: resume first, Bob's chat second.
        % Actual wait and message sent by Bob is done in separate process
        % because new client start will block until old process is resumed

        MsgBody = <<"flush-regression">>,
        spawn_link(fun() ->
                      sm_helper:wait_for_queue_length(C2SPid, 1),

                      % Bob sends a message...
                      escalus:send(Bob, escalus_stanza:chat_to(Alice, MsgBody)),

                      % ...we ensure that a message is enqueued in Alice's session...
                      % (2 messages = resume request + Bob's message)
                      sm_helper:wait_for_queue_length(C2SPid, 2),

                      % ...and old process is resumed.
                      ok = rpc(mim(), sys, resume, [C2SPid])
              end),
        Alice2 = connect_resume(Alice, SMH),
        % THEN Alice's new session receives Bob's message
        RecvMsg = escalus:wait_for_stanza(Alice2),
        escalus:assert(is_chat_message, [MsgBody], RecvMsg)
      end).
 1231: 
 1232: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt(Config) ->
 1233:     Alice = connect_fresh(Config, alice, session, manual),
 1234:     %% Should not crash anything!
 1235:     escalus_connection:send(Alice, escalus_stanza:enable_sm()),
 1236:     Response = escalus_connection:get_stanza(Alice, service_unavailable),
 1237:     escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response),
 1238:     escalus_connection:stop(Alice).
 1239: 
 1240: no_crash_if_stream_mgmt_disabled_but_client_requests_stream_mgmt_with_resumption(Config) ->
 1241:     Alice = connect_fresh(Config, alice, session, manual),
 1242:     %% Should not crash anything!
 1243:     escalus_connection:send(Alice, escalus_stanza:enable_sm([resume])),
 1244:     Response = escalus_connection:get_stanza(Alice, service_unavailable),
 1245:     escalus:assert(is_sm_failed, [<<"feature-not-implemented">>], Response),
 1246:     escalus_connection:stop(Alice).
 1247: 
 1248: %%--------------------------------------------------------------------
 1249: %% Helpers
 1250: %%--------------------------------------------------------------------
 1251: start_hook_listener(Resource) ->
 1252:     TestCasePid = self(),
 1253:     rpc(mim(), ?MODULE, rpc_start_hook_handler, [TestCasePid, Resource, host_type()]).
 1254: 
 1255: stop_hook_listener(HookExtra) ->
 1256:     rpc(mim(), ?MODULE, rpc_stop_hook_handler, [HookExtra, host_type()]).
 1257: 
 1258: rpc_start_hook_handler(TestCasePid, User, HostType) ->
 1259:     LUser = jid:nodeprep(User),
 1260:     Extra = #{luser => LUser, pid => TestCasePid},
 1261:     gen_hook:add_handler(unacknowledged_message, HostType,
 1262:                          fun ?MODULE:hook_handler_fn/3,
 1263:                          Extra, 50),
 1264:     Extra.
 1265: 
 1266: rpc_stop_hook_handler(HookExtra, HostType) ->
 1267:     gen_hook:delete_handler(unacknowledged_message, HostType,
 1268:                             fun ?MODULE:hook_handler_fn/3,
 1269:                             HookExtra, 50).
 1270: 
 1271: hook_handler_fn(Acc,
 1272:                 #{jid := Jid} = _Params,
 1273:                 #{luser := LUser, pid := TestCasePid} = _Extra) ->
 1274:     {U, _S, R} = jid:to_lower(Jid),
 1275:     case U of
 1276:         LUser ->
 1277:             Counter = mongoose_acc:get(sm_test, counter, 0, Acc),
 1278:             El = mongoose_acc:element(Acc),
 1279:             TestCasePid ! {sm_test, Counter, R, El},
 1280:             {ok, mongoose_acc:set_permanent(sm_test, counter, Counter + 1, Acc)};
 1281:         _ -> {ok, Acc}
 1282:     end.
 1283: 
 1284: wait_for_unacked_msg_hook(Counter, Res, Timeout) ->
 1285:     receive
 1286:         {sm_test, AccCounter, Resource, Stanza} = Msg ->
 1287:             ?assertEqual(Counter, AccCounter, Msg),
 1288:             ?assertEqual(Res, Resource, Msg),
 1289:             Stanza
 1290:     after Timeout ->
 1291:         timeout
 1292:     end.
 1293: 
 1294: start_filter_hook_listener(FilterText, Resource) ->
 1295:     rpc(mim(), ?MODULE, rpc_start_filter_hook_handler, [FilterText, Resource, host_type()]).
 1296: 
 1297: stop_filter_hook_listener(HookExtra) ->
 1298:     rpc(mim(), ?MODULE, rpc_stop_filter_hook_handler, [HookExtra, host_type()]).
 1299: 
 1300: rpc_start_filter_hook_handler(FilterText, User, HostType) ->
 1301:     LUser = jid:nodeprep(User),
 1302:     Extra = #{luser => LUser, filter_text => FilterText},
 1303:     gen_hook:add_handler(filter_unacknowledged_messages, HostType,
 1304:                          fun ?MODULE:filter_hook_handler_fn/3,
 1305:                          Extra, 50),
 1306:     Extra.
 1307: 
 1308: rpc_stop_filter_hook_handler(HookExtra, HostType) ->
 1309:     gen_hook:delete_handler(filter_unacknowledged_messages, HostType,
 1310:                             fun ?MODULE:filter_hook_handler_fn/3,
 1311:                             HookExtra, 50).
 1312: 
 1313: filter_hook_handler_fn(Buffer,
 1314:                        #{jid := Jid} = _Params,
 1315:                        #{luser := LUser, filter_text := FilterText} = _Extra) ->
 1316:     {U, _} = jid:to_lus(Jid),
 1317:     case U of
 1318:         LUser ->
 1319:             F = fun(Acc) -> filter_text(Acc, FilterText) end,
 1320:             NewBuffer = lists:filter(F, Buffer),
 1321:             {ok, NewBuffer};
 1322:         _ -> {ok, Buffer}
 1323:     end.
 1324: 
 1325: filter_text(Acc, FilterText) ->
 1326:     case mongoose_acc:stanza_name(Acc) of
 1327:         <<"message">> ->
 1328:             El = mongoose_acc:element(Acc),
 1329:             FilterText =/= exml_query:path(El, [{element, <<"body">>}, cdata]);
 1330:         _ ->
 1331:             true
 1332:     end.
 1333: 
 1334: is_chat(Content) ->
 1335:     fun(Stanza) -> escalus_pred:is_chat_message(Content, Stanza) end.
 1336: 
 1337: is_presence(Type) ->
 1338:     fun(Stanza) -> escalus_pred:is_presence_with_type(Type, Stanza) end.
 1339: 
 1340: three_texts() ->
 1341:     [<<"msg-1">>, <<"msg-2">>, <<"msg-3">>].
 1342: 
 1343: get_stanzas_filtered_by_mod_ping() ->
 1344:     History = rpc(mim(), meck, history, [mod_ping]),
 1345:     [Stanza ||
 1346:         {_Pid,
 1347:          {_Mod,
 1348:           filter_local_packet = _Func,
 1349:           [{_, _, _, Stanza} = _Acc, _Params, _Extra] = _Args
 1350:          },
 1351:          {stop, drop} = _Result
 1352:         } <- History
 1353:     ].
 1354: %%--------------------------------------------------------------------
 1355: %% IQ handler necessary for reproducing "replies_are_processed_by_resumed_session"
 1356: %%--------------------------------------------------------------------
 1357: 
 1358: regression_ns() ->
 1359:     <<"regression">>.
 1360: 
 1361: register_handler() ->
 1362:     HostType = host_type(),
 1363:     rpc(mim(), gen_iq_handler, add_iq_handler_for_domain,
 1364:         [HostType, regression_ns(), ejabberd_sm,
 1365:          fun ?MODULE:regression_handler/5, #{}, one_queue]).
 1366: 
 1367: unregister_handler() ->
 1368:     HostType = host_type(),
 1369:     rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain,
 1370:         [HostType, regression_ns(), ejabberd_sm]).
 1371: 
 1372: regression_handler(Acc, _From, _To, IQ, _Extra) ->
 1373:     %% A bit of a hack - will no longer work when the SID format changes
 1374:     {_, Pid} = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
 1375:     erlang:monitor(process, Pid),
 1376:     receive
 1377:         {'DOWN', _, _, _, _} ->
 1378:             ok
 1379:     after
 1380:         10000 ->
 1381:             error({c2s_not_stopped_after_timeout, Pid})
 1382:     end,
 1383:     %% We avoid another race condition - there is a short window where user session
 1384:     %% is not registered in ejabberd_sm: between old process termination and the moment
 1385:     %% when the new process stores new session in memory. It should be fixed separately.
 1386:     wait_for_session(mongoose_acc:get(c2s, origin_jid, undefined, Acc), 50, 100),
 1387:     {Acc, jlib:make_result_iq_reply(IQ)}.
 1388: 
 1389: wait_for_session(JID, Retries, SleepTime) ->
 1390:     case ejabberd_sm:get_session(JID) of
 1391:         offline ->
 1392:             timer:sleep(SleepTime),
 1393:             wait_for_session(JID, Retries - 1, SleepTime);
 1394:         _ ->
 1395:             ok
 1396:     end.
 1397: 
 1398: maybe_ack_initial_presence(Alice, ack) ->
 1399:     ack_initial_presence(Alice);
 1400: maybe_ack_initial_presence(_Alice, no_ack) ->
 1401:     ok.