1: -module(ejabberd_c2s_SUITE).
    2: -include_lib("eunit/include/eunit.hrl").
    3: -include("ejabberd_c2s.hrl").
    4: -include_lib("exml/include/exml_stream.hrl").
    5: -compile([export_all, nowarn_export_all]).
    6: 
    %% Short aliases for the EUnit assertion macros used by this suite.
    %% In every alias the first argument is the expected value/pattern and the
    %% second is the expression under test.
    -define(_eq(E, I), ?_assertEqual(E, I)).
    -define(eq(E, I), ?assertEqual(E, I)).
    -define(am(E, I), ?assertMatch(E, I)).
    %% Built on ?assertNotEqual rather than a bare ?assert(E =/= I) so that a
    %% failure reports the offending value instead of only the expression text.
    -define(ne(E, I), ?assertNotEqual(E, I)).
   11: 
   12: 
   %% Common Test callback: the test cases of this suite, in execution order.
   all() ->
       [c2s_start_stop_test,
        stream_error_when_invalid_domain,
        session_established,
        send_error_when_waiting,
        c2s_is_killed_when_too_many_messages_in_the_queue].
   20: 
   %% Common Test callback run once before the suite.
   %% Starts the `jid` application together with its dependencies and returns
   %% the CT config unchanged.
   init_per_suite(Config) ->
       {ok, _Started} = application:ensure_all_started(jid),
       %% NOTE(review): the result of starting `x` is discarded, so a failed
       %% start goes unnoticed here - confirm that this is intended.
       application:start(x),
       Config.
   25: 
   %% Common Test callback run once after the suite; nothing to clean up, the
   %% config is handed back as-is.
   end_per_suite(Config) ->
       Config.
   28: 
   %% Common Test callback run before every test case: installs the mocks
   %% provided by ejabberd_c2s_SUITE_mocks and returns the config unchanged.
   init_per_testcase(_TestCase, Config) ->
       ejabberd_c2s_SUITE_mocks:setup(),
       Config.
   32: 
   %% Common Test callback run after every test case: tears down the mocks
   %% via ejabberd_c2s_SUITE_mocks and returns the config unchanged.
   end_per_testcase(_TestCase, Config) ->
       ejabberd_c2s_SUITE_mocks:teardown(),
       Config.
   36: 
   %% A started c2s process must no longer be alive after it has been stopped.
   c2s_start_stop_test(_Config) ->
       %% given
       {ok, Pid} = given_c2s_started(),

       %% when
       when_c2s_is_stopped(Pid),

       %% then
       ?eq(false, is_process_alive(Pid)).
   44: 
   45: 
   %% Opening a stream addressed to an unknown host must make c2s perform
   %% exactly four socket actions: send the stream start, send a host-unknown
   %% stream error, send the stream end, and close the socket. It also checks
   %% that mongoose_hooks:xmpp_send_element was never called with an undefined
   %% first argument and an undefined host_type.
   stream_error_when_invalid_domain(_Config) ->
       ok = meck:new(mongoose_hooks, [passthrough]),
       {ok, Pid} = given_c2s_started(),

       SocketActions = when_stream_is_opened(Pid, stream_header(<<"badhost">>)),
       [StreamStart, StreamError, StreamEnd, CloseSocket] = SocketActions,

       %% Collect one `ok` per offending hook call; the list must stay empty.
       OffendingHookCalls =
           [ok || {_, {mongoose_hooks, xmpp_send_element,
                       [undefined, #{host_type := undefined} | _]}, _}
                      <- meck:history(mongoose_hooks)],
       ?am([], OffendingHookCalls),

       ?am({send, [_P,
                   <<"<?xml version='1.0'?>",
                     "<stream:stream xmlns='jabber:client' ",
                     "xmlns:stream='http://etherx.jabber.org/streams' id='57' ",
                     "from='localhost' xml:lang='en'>">>]},
           StreamStart),
       ?am({send, [_P,
                   <<"<stream:error>",
                     "<host-unknown xmlns='urn:ietf:params:xml:ns:xmpp-streams'/>",
                     "</stream:error>">>]},
           StreamError),
       ?am({send, [_P, <<"</stream:stream>">>]}, StreamEnd),
       ?am({close, [_P]}, CloseSocket),
       ok.
   69: 
   %% Drives a fresh c2s process to session_established and checks that the
   %% last thing sent on the socket is the session IQ result.
   session_established(_Config) ->
       {ok, Pid} = given_c2s_started(),
       change_state_to(session_established, Pid),
       ?eq(session_established, getstate(Pid)),
       LastSent = last_stanza(),
       ?eq(final_iq_response(), LastSent).
   76: 
   %% Regression test for #1252: when c2s is in state wait_for_session_or_sm
   %% and sending a message fails, the failure must be handled properly - the
   %% last three socket actions are a stream error, the stream end, and close.
   send_error_when_waiting(_Config) ->
       {ok, Pid} = given_c2s_started(),
       change_state_to(wait_for_session_or_sm, Pid),
       %% Break the socket on purpose: from now on every send returns a bogus
       %% value, so we can observe how c2s copes with a failed send in this state.
       meck:expect(ejabberd_socket, send, fun(_, _El) -> error_error_error  end),
       sync_c2s(Pid),
       p1_fsm:send_event(Pid, {xmlstreamelement, setsession_stanza()}),
       sync_c2s(Pid),
       %% Newest first: close, then stream end, then the stream error.
       NewestFirst = lists:reverse(received_stanzas()),
       [Close, StreamEnd, StreamError | _] = NewestFirst,
       ?eq(stream_error_response(), StreamError),
       ?eq(<<"</stream:stream>">>, StreamEnd),
       ?eq(close, Close),
       ok.
   95: 
   %% Checks that a c2s process started with `max_fsm_queue` terminates with
   %% reason {process_limit, {max_queue, _}} once more than that many messages
   %% have piled up in its mailbox.
   %%
   %% The c2s process is parked inside a mocked handle_event/3 while the test
   %% floods its mailbox, and is then released with a `continue` message.
   c2s_is_killed_when_too_many_messages_in_the_queue(_) ->
       ok = meck:new(ejabberd_c2s, [passthrough]),
       MaxQueueSize = 50,
       Self = self(),
       %% We simulate a long running event during which
       %% there will be many messages put into C2S process message queue
       meck:expect(ejabberd_c2s, handle_event,
                   fun(go_to_sleep, StateName, ProcState) ->
                           Self ! c2s_going_to_sleep,
                           ct:pal("going to sleep"),
                           receive continue -> ok end,
                           {next_state, StateName, ProcState};
                      (Event, StateName, ProcState) ->
                           meck:passthrough([Event, StateName, ProcState])
                   end),
       {ok, C2SPid} = given_c2s_started([{max_fsm_queue, MaxQueueSize}]),

       %% We want to monitor the c2s process and not being linked to it
       Ref = erlang:monitor(process, C2SPid),
       erlang:unlink(C2SPid),

       p1_fsm_old:send_all_state_event(C2SPid, go_to_sleep),

       %% Bounded wait: if c2s never reaches the mocked handler, fail with a
       %% clear message instead of blocking until the CT timetrap fires.
       receive
           c2s_going_to_sleep -> ok
       after timer:seconds(5) ->
               ct:fail("timeout waiting for c2s to handle go_to_sleep")
       end,

       meck:unload(ejabberd_c2s),

       %% We put MaxQueueSize + 1 messages to C2S Process message queue
       %% while it is asleep
       %% The process will be killed when it wakes up and tries to process
       %% next message

       [p1_fsm_old:send_all_state_event(C2SPid, {event, I}) ||
        I <- lists:seq(1, MaxQueueSize + 1)],
       C2SPid ! continue,

       receive
           {'DOWN', Ref, process, C2SPid, {process_limit,{max_queue, AllMessages}}} ->
               ct:pal("C2S dead due to message_queue_length=~p, max allowed was ~p",
                      [AllMessages, MaxQueueSize]);
           Msg ->
               ct:fail("Other msg: ~p", [Msg])
       after timer:seconds(5) ->
                 {message_queue_len, N} = process_info(C2SPid, message_queue_len),
                 ct:fail("timeout waiting c2s exit, with message_queue_len = ~p", [N])
       end,
       ok.
  143: 
  %% Returns the most recent item reported by received_stanzas/0.
  %% Uses lists:last/1 instead of reversing the whole list just to take its
  %% head. Crashes if nothing has been recorded yet.
  last_stanza() ->
      lists:last(received_stanzas()).
  147: 
  %% Returns, oldest first, what the mocked ejabberd_socket was asked to do:
  %% the payload of every `send` call and the atom `close` for every `close`
  %% call, as recorded in the meck history.
  received_stanzas() ->
      KeepSendAndClose = filter_calls(ejabberd_socket, [send, close]),
      History = meck:history(ejabberd_socket),
      [extract_stanza(Call) || Call <- lists:filtermap(KeepSendAndClose, History)].
  153: 
  %% Reduces a {Function, Args} pair to what the tests compare against:
  %% the second argument of a two-argument call, or `close` for a close call.
  %% The two-argument clause is tried first, regardless of the function name.
  extract_stanza({_Function, [_Socket, Payload]}) ->
      Payload;
  extract_stanza({close, _Args}) ->
      close.
  156: 
  %% Drives the c2s process from whatever state it is in now to Target.
  change_state_to(Target, C2SPid) ->
      change_state_to(getstate(C2SPid), Target, C2SPid).
  160: 
  %% change_state_to(Current, Target, C2SPid)
  %% Steps the c2s process towards Target by sending, for the Current state,
  %% the single event this suite uses to leave that state, then re-reading the
  %% state and recursing. Returns `ok` once Current equals Target.
  %%
  %% If Current is a state this helper has no step for, it raises
  %% error({cannot_reach_state, Current, Target}). Callers discard the return
  %% value, so returning a plain `error` atom would let a test carry on
  %% against a process in the wrong state.
  change_state_to(Target, Target, _C2SPid) ->
      ok;
  change_state_to(wait_for_stream, Target, C2SPid) ->
      p1_fsm:send_event(C2SPid, stream_header(<<"localhost">>)),
      change_state_to(getstate(C2SPid), Target, C2SPid);
  change_state_to(wait_for_feature_before_auth, Target, C2SPid) ->
      p1_fsm:send_event(C2SPid, {xmlstreamelement, auth_stanza()}),
      change_state_to(getstate(C2SPid), Target, C2SPid);
  change_state_to(wait_for_feature_after_auth, Target, C2SPid) ->
      p1_fsm:send_event(C2SPid, {xmlstreamelement, bind_stanza()}),
      change_state_to(getstate(C2SPid), Target, C2SPid);
  change_state_to(wait_for_session_or_sm, Target, C2SPid) ->
      p1_fsm:send_event(C2SPid, {xmlstreamelement, setsession_stanza()}),
      change_state_to(getstate(C2SPid), Target, C2SPid);
  change_state_to(Current, Target, _C2SPid) ->
      error({cannot_reach_state, Current, Target}).
  177: 
  %% Returns the current state name of the c2s process: the second element of
  %% the list obtained from sync_c2s/1. Crashes with badmatch when sync_c2s/1
  %% returns anything else (for example when the process is gone).
  getstate(C2SPid) ->
      [_, StateName | _] = sync_c2s(C2SPid),
      StateName.
  182: 
  %% Sends the given stream-opening event to the c2s process, waits until the
  %% process answers a state query, and returns the {Function, Args} pairs of
  %% all `send` and `close` calls recorded for the mocked ejabberd_socket.
  when_stream_is_opened(C2SPid, Stanza) ->
      p1_fsm:send_event(C2SPid, Stanza),
      sync_c2s(C2SPid),
      KeepSendAndClose = filter_calls(ejabberd_socket, [send, close]),
      lists:filtermap(KeepSendAndClose, meck:history(ejabberd_socket)).
  188: 
  %% Builds a lists:filtermap/2 callback over meck history entries.
  %%
  %% An entry is kept, as {true, {Function, Args}}, when the call was made to
  %% ExpectedMod and its function name is listed in Funs; otherwise it is
  %% dropped. The MFA is always the second element of a history entry, both
  %% for calls that returned ({Pid, MFA, Result}) and for calls that raised
  %% ({Pid, MFA, Class, Reason, Stacktrace}), so it is read with element/2
  %% rather than by matching on the entry's size.
  filter_calls(ExpectedMod, Funs) ->
      fun(HistoryEntry) ->
              case element(2, HistoryEntry) of
                  {ExpectedMod, _Fun, _Args} = MFA ->
                      maybe_extract_function_with_args(MFA, Funs);
                  {_OtherMod, _Fun, _Args} ->
                      false
              end
      end.
  193: 
  %% filtermap-style predicate on an {Module, Function, Args} triple:
  %% returns {true, {Function, Args}} when Function is a member of List,
  %% and `false` otherwise.
  maybe_extract_function_with_args({_Mod, Fun, Args}, List) ->
      lists:member(Fun, List) andalso {true, {Fun, Args}}.
  199: 
  %% Queries the process state with sys:get_state/1 and returns the result.
  %% The tests use this as a synchronisation point. Because of the `catch`,
  %% a failing query (e.g. the process is already dead) yields a term
  %% describing the failure instead of crashing the caller.
  sync_c2s(Pid) ->
      catch sys:get_state(Pid).
  201: 
  %% The stream-start header, as a binary, with id '4436' and from 'localhost'.
  stream_valid_header_response() ->
      <<"<?xml version='1.0'?>",
        "<stream:stream xmlns='jabber:client' ",
        "xmlns:stream='http://etherx.jabber.org/streams' id='4436' ",
        "from='localhost' xml:lang='en'>">>.
  208: 
  %% Builds the #xmlstreamstart{} event that opens a client stream addressed
  %% to Domain (placed in the `to` attribute).
  stream_header(Domain) ->
      Attrs = [{<<"to">>, Domain},
               {<<"xml:lang">>, <<"en">>},
               {<<"version">>, <<"1.0">>},
               {<<"xmlns">>, <<"jabber:client">>},
               {<<"xmlns:stream">>, <<"http://etherx.jabber.org/streams">>}],
      #xmlstreamstart{name = <<"stream:stream">>, attrs = Attrs}.
  217: 
  %% SASL <auth/> element selecting the PLAIN mechanism, carrying a fixed
  %% base64 payload as character data.
  auth_stanza() ->
      Attrs = [{<<"xmlns">>, <<"urn:ietf:params:xml:ns:xmpp-sasl">>},
               {<<"mechanism">>, <<"PLAIN">>}],
      Payload = {xmlcdata, <<"AGFsaWNFOTkuODk3NzMzAG1hdHlncnlzYQ==">>},
      {xmlel, <<"auth">>, Attrs, [Payload]}.
  223: 
  %% IQ `set` (id 4436) asking to bind the resource "res1".
  bind_stanza() ->
      Resource = {xmlel, <<"resource">>, [], [{xmlcdata, <<"res1">>}]},
      Bind = {xmlel, <<"bind">>,
              [{<<"xmlns">>, <<"urn:ietf:params:xml:ns:xmpp-bind">>}],
              [Resource]},
      {xmlel, <<"iq">>,
       [{<<"type">>, <<"set">>}, {<<"id">>, <<"4436">>}],
       [Bind]}.
  230: 
  %% IQ `set` (id 4436) carrying an empty <session/> element in the
  %% xmpp-session namespace.
  setsession_stanza() ->
      Session = {xmlel, <<"session">>,
                 [{<<"xmlns">>, <<"urn:ietf:params:xml:ns:xmpp-session">>}],
                 []},
      {xmlel, <<"iq">>,
       [{<<"type">>, <<"set">>}, {<<"id">>, <<"4436">>}],
       [Session]}.
  237: 
  %% Starts a c2s process with only the suite's default options.
  given_c2s_started() ->
      NoExtraOpts = [],
      given_c2s_started(NoExtraOpts).
  240: 
  %% Starts a c2s process linked to the caller. The socket is the pair
  %% {ejabberd_socket, self()}; Opts are placed in front of the suite's
  %% default options. Returns whatever ejabberd_c2s:start_link/2 returns.
  given_c2s_started(Opts) ->
      Socket = {ejabberd_socket, self()},
      AllOpts = Opts ++ c2s_default_opts(),
      ejabberd_c2s:start_link(Socket, AllOpts).
  244: 
  %% Asks the c2s process to stop and then issues a state query against it,
  %% returning the (caught) result of that query.
  when_c2s_is_stopped(C2SPid) ->
      stop_c2s(C2SPid),
      sync_c2s(C2SPid).
  248: 
  %% Options appended to every c2s process started by this suite.
  c2s_default_opts() ->
      [
       {access, c2s},
       {shaper, c2s_shaper},
       {max_stanza_size, 65536}
      ].
  253: 
  %% Stops the given c2s process through ejabberd_c2s:stop/1 and returns that
  %% call's result. Only accepts a pid.
  stop_c2s(Pid) when is_pid(Pid) ->
      ejabberd_c2s:stop(Pid).
  256: 
  %% Shorthand for jid:from_binary/1.
  jid(Binary) ->
      jid:from_binary(Binary).
  259: 
  %% Serialized IQ result (id 4436) with an empty <session/> element, as
  %% compared against the last sent stanza in session_established/1.
  final_iq_response() ->
      <<"<iq type='result' id='4436'>",
        "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>",
        "</iq>">>.
  264: 
  %% Serialized <stream:error/> carrying the internal-server-error condition.
  stream_error_response() ->
      <<"<stream:error>"
        "<internal-server-error xmlns='urn:ietf:params:xml:ns:xmpp-streams'/>"
        "</stream:error>">>.
  269: