1: -module(ejabberd_c2s_SUITE). 2: -include_lib("eunit/include/eunit.hrl"). 3: -include("ejabberd_c2s.hrl"). 4: -include_lib("exml/include/exml_stream.hrl"). 5: -compile([export_all, nowarn_export_all]). 6: 7: -define(_eq(E, I), ?_assertEqual(E, I)). 8: -define(eq(E, I), ?assertEqual(E, I)). 9: -define(am(E, I), ?assertMatch(E, I)). 10: -define(ne(E, I), ?assert(E =/= I)). 11: 12: 13: all() -> [ 14: c2s_start_stop_test, 15: stream_error_when_invalid_domain, 16: session_established, 17: send_error_when_waiting, 18: c2s_is_killed_when_too_many_messages_in_the_queue 19: ]. 20: 21: init_per_suite(C) -> 22: {ok, _} = application:ensure_all_started(jid), 23: application:start(x), 24: C. 25: 26: end_per_suite(C) -> 27: C. 28: 29: init_per_testcase(_TC, C) -> 30: ejabberd_c2s_SUITE_mocks:setup(), 31: C. 32: 33: end_per_testcase(_TC, C) -> 34: ejabberd_c2s_SUITE_mocks:teardown(), 35: C. 36: 37: c2s_start_stop_test(_) -> 38: {ok, C2SPid} = given_c2s_started(), 39: 40: when_c2s_is_stopped(C2SPid), 41: 42: %% then 43: ?eq(false, erlang:is_process_alive(C2SPid)). 44: 45: 46: stream_error_when_invalid_domain(_) -> 47: ok = meck:new(mongoose_hooks, [passthrough]), 48: {ok, C2SPid} = given_c2s_started(), 49: 50: C2Sactions = when_stream_is_opened(C2SPid, stream_header(<<"badhost">>)), 51: [StreamStart, StreamError, StreamEnd, CloseSocket] = C2Sactions, 52: History = meck:history(mongoose_hooks), 53: HookRes = [ok || {_, {mongoose_hooks, xmpp_send_element, [undefined, #{host_type := undefined} | _]}, _} <- History], 54: ?am([], HookRes), 55: ?am({send, [_P, 56: <<"<?xml version='1.0'?>", 57: "<stream:stream xmlns='jabber:client' ", 58: "xmlns:stream='http://etherx.jabber.org/streams' id='57' ", 59: "from='localhost' xml:lang='en'>">> 60: ]}, 61: StreamStart), 62: ?am({send, [_P, 63: <<"<stream:error>", 64: "<host-unknown xmlns='urn:ietf:params:xml:ns:xmpp-streams'/>", 65: "</stream:error>">>]}, StreamError), 66: ?am({send, [_P, <<"</stream:stream>">>]}, StreamEnd), 67: ?am({close, [_P]}, CloseSocket), 68: ok. 69: 70: session_established(_) -> 71: {ok, C2SPid} = given_c2s_started(), 72: change_state_to(session_established, C2SPid), 73: ?eq(session_established, getstate(C2SPid)), 74: Last = last_stanza(), 75: ?eq(final_iq_response(), Last). 76: 77: send_error_when_waiting(_) -> 78: % this is a regression test for #1252 - when c2s is in state 79: % wait_for_session_or_sm and it fails to send a message 80: % it should be handled properly 81: {ok, C2SPid} = given_c2s_started(), 82: change_state_to(wait_for_session_or_sm, C2SPid), 83: % here we break things to check how c2s will handle error while sending 84: % message in this state 85: meck:expect(ejabberd_socket, send, fun(_, _El) -> error_error_error end), 86: sync_c2s(C2SPid), 87: p1_fsm:send_event(C2SPid, {xmlstreamelement, setsession_stanza()}), 88: sync_c2s(C2SPid), 89: [Close, StreamEnd, StreamError | _] = lists:reverse(received_stanzas()), 90: ?eq(stream_error_response(), 91: StreamError), 92: ?eq(<<"</stream:stream>">>, StreamEnd), 93: ?eq(close, Close), 94: ok. 95: 96: c2s_is_killed_when_too_many_messages_in_the_queue(_) -> 97: meck:new(ejabberd_c2s, [passthrough]), 98: MaxQueueSize = 50, 99: Self = self(), 100: %% We simulate a long running event during which 101: %% there will be many messages put into C2S process message queue 102: meck:expect(ejabberd_c2s, handle_event, 103: fun(go_to_sleep, StateName, ProcState) -> 104: Self ! c2s_going_to_sleep, 105: ct:pal("going to sleep"), 106: receive continue -> ok end, 107: {next_state, StateName, ProcState}; 108: (Event, StateName, ProcState) -> 109: meck:passthrough([Event, StateName, ProcState]) 110: end), 111: {ok, C2SPid} = given_c2s_started([{max_fsm_queue, MaxQueueSize}]), 112: 113: %% We want to monitor the c2s process and not being linked to it 114: Ref = erlang:monitor(process, C2SPid), 115: erlang:unlink(C2SPid), 116: 117: p1_fsm_old:send_all_state_event(C2SPid, go_to_sleep), 118: 119: receive c2s_going_to_sleep -> ok end, 120: 121: meck:unload(ejabberd_c2s), 122: 123: %% We put MaxQueueSize + 1 messages to C2S Process message queue 124: %% while it is asleep 125: %% The process will be killed when it wakes up and tries to process 126: %% next message 127: 128: [p1_fsm_old:send_all_state_event(C2SPid, {event, I}) || 129: I <- lists:seq(1, MaxQueueSize + 1)], 130: C2SPid ! continue, 131: 132: receive 133: {'DOWN', Ref, process, C2SPid, {process_limit,{max_queue, AllMessages}}} -> 134: ct:pal("C2S dead due to message_queue_length=~p, max allowed was ~p", 135: [AllMessages, MaxQueueSize]); 136: Msg -> 137: ct:fail("Other msg: ~p", [Msg]) 138: after timer:seconds(5) -> 139: {message_queue_len, N} = process_info(C2SPid, message_queue_len), 140: ct:fail("timeout waiting c2s exit, with message_queue_len = ~p", [N]) 141: end, 142: ok. 143: 144: last_stanza() -> 145: [H|_] = lists:reverse(received_stanzas()), 146: H. 147: 148: received_stanzas() -> 149: Calls = lists:filtermap(filter_calls(ejabberd_socket, [send, close]), 150: meck:history(ejabberd_socket)), 151: %% ct:pal("Calls: ~p", [Calls]), 152: lists:map(fun extract_stanza/1, Calls). 153: 154: extract_stanza({_, [_, S]}) -> S; 155: extract_stanza({close, _}) -> close. 156: 157: change_state_to(Target, C2SPid) -> 158: Curr = getstate(C2SPid), 159: change_state_to(Curr, Target, C2SPid). 160: 161: change_state_to(T, T, _) -> 162: ok; 163: change_state_to(wait_for_stream, T, C2SPid) -> 164: p1_fsm:send_event(C2SPid, stream_header(<<"localhost">>)), 165: change_state_to(getstate(C2SPid), T, C2SPid); 166: change_state_to(wait_for_feature_before_auth, T, C2SPid) -> 167: p1_fsm:send_event(C2SPid, {xmlstreamelement, auth_stanza()}), 168: change_state_to(getstate(C2SPid), T, C2SPid); 169: change_state_to(wait_for_feature_after_auth, T, C2SPid) -> 170: p1_fsm:send_event(C2SPid, {xmlstreamelement, bind_stanza()}), 171: change_state_to(getstate(C2SPid), T, C2SPid); 172: change_state_to(wait_for_session_or_sm, T, C2SPid) -> 173: p1_fsm:send_event(C2SPid, {xmlstreamelement, setsession_stanza()}), 174: change_state_to(getstate(C2SPid), T, C2SPid); 175: change_state_to(_, _, _) -> 176: error. 177: 178: getstate(C2SPid) -> 179: State = sync_c2s(C2SPid), 180: [_, StateName | _] = State, 181: StateName. 182: 183: when_stream_is_opened(C2SPid, Stanza) -> 184: p1_fsm:send_event(C2SPid, Stanza), 185: sync_c2s(C2SPid), 186: lists:filtermap(filter_calls(ejabberd_socket, [send, close]), 187: meck:history(ejabberd_socket)). 188: 189: filter_calls(_ExpectedMod, Funs) -> 190: fun({_Pid, MFA, _Return}) -> 191: maybe_extract_function_with_args(MFA, Funs) 192: end. 193: 194: maybe_extract_function_with_args({_Mod, Fun, Args}, List) -> 195: case lists:member(Fun, List) of 196: true -> {true, {Fun, Args}}; 197: _ -> false 198: end. 199: 200: sync_c2s(C2SPid) -> catch sys:get_state(C2SPid). 201: 202: stream_valid_header_response() -> 203: R = "<?xml version='1.0'?>" 204: "<stream:stream xmlns='jabber:client' " 205: "xmlns:stream='http://etherx.jabber.org/streams' id='4436' " 206: "from='localhost' xml:lang='en'>", 207: list_to_binary(R). 208: 209: stream_header(Domain) -> 210: #xmlstreamstart{name = <<"stream:stream">>, 211: attrs = [{<<"to">>, Domain}, 212: {<<"xml:lang">>, <<"en">>}, 213: {<<"version">>, <<"1.0">>}, 214: {<<"xmlns">>, <<"jabber:client">>}, 215: {<<"xmlns:stream">>, 216: <<"http://etherx.jabber.org/streams">>}]}. 217: 218: auth_stanza() -> 219: {xmlel, <<"auth">>, 220: [{<<"xmlns">>, <<"urn:ietf:params:xml:ns:xmpp-sasl">>}, 221: {<<"mechanism">>, <<"PLAIN">>}], 222: [{xmlcdata, <<"AGFsaWNFOTkuODk3NzMzAG1hdHlncnlzYQ==">>}]}. 223: 224: bind_stanza() -> 225: {xmlel, <<"iq">>, 226: [{<<"type">>, <<"set">>}, {<<"id">>, <<"4436">>}], 227: [{xmlel, <<"bind">>, 228: [{<<"xmlns">>, <<"urn:ietf:params:xml:ns:xmpp-bind">>}], 229: [{xmlel, <<"resource">>, [], [{xmlcdata, <<"res1">>}]}]}]}. 230: 231: setsession_stanza() -> 232: {xmlel, <<"iq">>, 233: [{<<"type">>, <<"set">>}, {<<"id">>, <<"4436">>}], 234: [{xmlel, <<"session">>, [{<<"xmlns">>, 235: <<"urn:ietf:params:xml:ns:xmpp-session">>}], 236: []}]}. 237: 238: given_c2s_started() -> 239: given_c2s_started([]). 240: 241: given_c2s_started(Opts) -> 242: ejabberd_c2s:start_link({ejabberd_socket, self()}, 243: Opts ++ c2s_default_opts()). 244: 245: when_c2s_is_stopped(Pid) -> 246: stop_c2s(Pid), 247: sync_c2s(Pid). 248: 249: c2s_default_opts() -> 250: [{access, c2s}, 251: {shaper, c2s_shaper}, 252: {max_stanza_size, 65536}]. 253: 254: stop_c2s(C2SPid) when is_pid(C2SPid) -> 255: _R = ejabberd_c2s:stop(C2SPid). 256: 257: jid(Str) -> 258: jid:from_binary(Str). 259: 260: final_iq_response() -> 261: <<"<iq type='result' id='4436'>" 262: "<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>" 263: "</iq>">>. 264: 265: stream_error_response() -> 266: <<"<stream:error>", 267: "<internal-server-error xmlns='urn:ietf:params:xml:ns:xmpp-streams'/>", 268: "</stream:error>">>. 269: