%%==============================================================================
%% Copyright 2015 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================
-module(race_conditions_SUITE).
-compile([export_all, nowarn_export_all]).

-export([handle_delayiq_iq/5]).

-include_lib("escalus/include/escalus.hrl").
-include_lib("exml/include/exml.hrl").
-include_lib("common_test/include/ct.hrl").

-include("mam_helper.hrl"). %% mam? we need assert_equal_extra

-import(distributed_helper, [mim/0, rpc/4]).

-import(domain_helper, [host_type/0]).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% The suite runs a single group, main_group.
all() ->
    [{group, main_group}].

%% Group definitions, passed through ct_helper:repeat_all_until_all_ok/1.
groups() ->
    ct_helper:repeat_all_until_all_ok([{main_group, [], main_group_tests()}]).

%% Names of the test cases that belong to main_group.
main_group_tests() ->
    [
     confirm_c2s_queue_is_flushed_works,
     delayiq_handler_works,
     ignore_iq_result_from_old_session
    ].

%% Suite-level settings are delegated to escalus.
suite() ->
    escalus:suite().

%% Suite setup is delegated to escalus.
init_per_suite(SuiteConfig) ->
    escalus:init_per_suite(SuiteConfig).

%% Calls escalus_fresh:clean/0 first, then lets escalus finish the suite.
end_per_suite(SuiteConfig) ->
    escalus_fresh:clean(),
    escalus:end_per_suite(SuiteConfig).

%% Group setup: calls mongoose_helper:inject_module/1 for this module
%% (handle_delayiq_iq/5 is executed by MongooseIM) and then registers
%% the delayiq IQ handler.
init_per_group(_GroupName, Config) ->
    mongoose_helper:inject_module(?MODULE),
    start_delayiq_handler(),
    Config.

%% Group teardown: unregisters the delayiq IQ handler.
end_per_group(_GroupName, Config) ->
    stop_delayiq_handler(),
    Config.

%% Per-testcase setup is delegated to escalus.
init_per_testcase(TestCase, Config) ->
    escalus:init_per_testcase(TestCase, Config).

%% Per-testcase teardown is delegated to escalus.
end_per_testcase(TestCase, Config) ->
    escalus:end_per_testcase(TestCase, Config).

%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------

%% Check that confirm_c2s_queue_is_flushed/1 works for any regular connection.
%% That function is used in the more complex tests.
confirm_c2s_queue_is_flushed_works(Config) ->
    Story = fun(_FreshConfig, Alice) ->
                    %% Should not crash
                    confirm_c2s_queue_is_flushed(Alice)
            end,
    escalus:fresh_story_with_config(Config, [{alice, 1}], Story).

%% Check that the result IQ is delivered once delayiq processing is resumed.
delayiq_handler_works(Config) ->
    Story = fun(_FreshConfig, Alice) ->
                    Request = send_delayiq(Alice),
                    {ok, HandlerPid} = receive_delayiq_confirmation(),
                    %% HAVING waiting iq handler
                    resume_delayiq(HandlerPid),
                    %% HAVING result IQ stanza put into ejabberd_c2s process
                    %% (if there is any)
                    wait_for_delayiq_handler_to_finish(HandlerPid),
                    receive_delayiq(Alice, Request),
                    confirm_c2s_queue_is_flushed(Alice)
            end,
    escalus:fresh_story_with_config(Config, [{alice, 1}], Story).

%% We reconnect Alice and check that the new connection does not receive
%% the IQ result for an IQ sent using the old connection.
%%
%% If the old and new resources are different, then the stanza would not be
%% routed to the new connection in any case (with or without the
%% check_incoming_accum_for_conflicts check in ejabberd_c2s).
ignore_iq_result_from_old_session(Config) ->
    Story = fun(FreshConfig, Alice) ->
                    send_delayiq(Alice),
                    {ok, HandlerPid} = receive_delayiq_confirmation(),
                    %% HAVING waiting iq handler

                    %% Stop the session, and wait for the session process
                    %% to be dead and deregistered.
                    mongoose_helper:logout_user(Config, Alice),

                    %% Use the same resource for both old and new connection.
                    SameResource = escalus_client:resource(Alice),
                    NewAlice = login_send_and_receive_presence(FreshConfig, alice,
                                                               SameResource),
                    resume_delayiq(HandlerPid),
                    %% HAVING result IQ stanza put into ejabberd_c2s process
                    %% (if there is any).
                    %% We expect that the new c2s process receives the route
                    %% message, but ignores it.
                    wait_for_delayiq_handler_to_finish(HandlerPid),
                    %% RESULT missing delayiq
                    confirm_c2s_queue_is_flushed(NewAlice)
            end,
    escalus:fresh_story_with_config(Config, [{alice, 1}], Story).

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

%% Register handle_delayiq_iq/5 of this module as the IQ handler for the
%% delayiq namespace.
%% It's important to use IQDisc=parallel, because
%% this particular IQ should be executed in a separate process.
start_delayiq_handler() ->
    HandlerArgs = [host_type(), delayiq_ns(), ejabberd_sm,
                   fun ?MODULE:handle_delayiq_iq/5, #{}, parallel],
    rpc(mim(), gen_iq_handler, add_iq_handler_for_domain, HandlerArgs).

%% Unregister the IQ handler for the delayiq namespace
%% (the counterpart of start_delayiq_handler/0).
stop_delayiq_handler() ->
    HandlerArgs = [host_type(), delayiq_ns(), ejabberd_sm],
    rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain, HandlerArgs).

%% Brand new IQ namespace.
%% Send IQ get with this namespace, and MongooseIM would not send you
%% the IQ result until you ask for it (see resume_delayiq/1).
delayiq_ns() ->
    <<"urn:escalus:delayiq">>.

%% Make an XMPP stanza that contains the information
%% needed in the handle_delayiq_iq/5 callback:
%% the encoded pid of the calling process.
delayiq_iq() ->
    Attrs = [{<<"caller_pid">>, encode_pid(self())}],
    Data = #xmlel{name = <<"data">>, attrs = Attrs},
    escalus_stanza:iq_get(delayiq_ns(), [Data]).

%% This function is executed by MongooseIM.
%% It reads the caller pid from the request, sends a confirmation message
%% to the caller, and blocks until the caller sends a resume message.
%% Only then is the empty result IQ returned.
handle_delayiq_iq(Acc, _From, _To, IQ, _Extra) ->
    PidPath = [{element, <<"data">>}, {attr, <<"caller_pid">>}],
    EncodedPid = exml_query:path(mongoose_iq:iq_to_sub_el(IQ), PidPath),
    CallerPid = decode_pid(EncodedPid),
    CallerPid ! {handle_delayiq_iq_confirmation, self()},
    receive
        {handle_delayiq_iq_resume, CallerPid} ->
            ok
    after timer:seconds(60) ->
            %% It's unexpected: provide just enough information for debugging.
            Details = #{reason => handle_delayiq_timeout,
                        caller_pid => CallerPid,
                        mongoose_pid => self(),
                        messages => erlang:process_info(self(), messages)},
            erlang:error(Details)
    end,
    {Acc, mongoose_iq:empty_result_iq(IQ)}.

%% Ensure that MongooseIM started to process our IQ.
%% Returns {ok, HandlerPid}, or {error, Details} if no confirmation
%% arrives within 5 seconds.
receive_delayiq_confirmation() ->
    Timeout = timer:seconds(5),
    receive
        {handle_delayiq_iq_confirmation, HandlerPid} ->
            {ok, HandlerPid}
    after Timeout ->
            {error, #{reason => receive_delayiq_confirmation_timeout,
                      caller_pid => self(),
                      messages => erlang:process_info(self(), messages)}}
    end.

%% Ask MongooseIM to return the result IQ.
resume_delayiq(MongoosePid) ->
    erlang:send(MongoosePid, {handle_delayiq_iq_resume, self()}).

%% Send the XMPP request stanza and return it.
send_delayiq(Client) ->
    Request = delayiq_iq(),
    escalus:send(Client, Request),
    Request.

%% Receive XMPP result stanza.
receive_delayiq(Client, RequestIQ) ->
    Reply = escalus_client:wait_for_stanza(Client),
    escalus:assert(is_iq_result, [RequestIQ], Reply),
    Reply.

%% Monitor the handler process and wait for its 'DOWN' message.
%% Raises an error if the message does not arrive within 5 seconds.
wait_for_delayiq_handler_to_finish(MongoosePid) ->
    Ref = erlang:monitor(process, MongoosePid),
    Timeout = timer:seconds(5),
    receive
        {'DOWN', Ref, process, _Pid, _Reason} ->
            ok
    after Timeout ->
            error(#{reason => wait_for_delayiq_handler_to_finish_timeout,
                    mongoose_pid => MongoosePid,
                    caller_pid => self()})
    end.

%% The simplest way to confirm that the c2s queue is processed is to
%% send a stanza to the process and receive one back.
%% We send an IQ with an unknown namespace and wait for the error reply.
%% Another way is to send a message, but we need two clients for this.
confirm_c2s_queue_is_flushed(Client) ->
    RequestIQ = escalus_stanza:iq_get(
                  <<"urn:escalus:this_iq_would_fail_but_its_expected">>, []),
    escalus:send(Client, RequestIQ),
    %% We will have a timeout here, if something goes wrong.
    ResultIQ = escalus_client:wait_for_stanza(Client),
    %% Expected result
    escalus:assert(is_iq_error, [], ResultIQ),
    %% Just check that it's our IQ
    QueryId = exml_query:attr(RequestIQ, <<"id">>),
    ResultId = exml_query:attr(ResultIQ, <<"id">>),
    ?assert_equal_extra(QueryId, ResultId,
                        #{request_iq => RequestIQ, result_iq => ResultIQ}).

%% Start a client for User with the given Resource and send
%% an available presence. Returns the client.
login_send_presence(Config, User, Resource) ->
    {ok, Conn} = escalus_client:start(Config, User, Resource),
    Presence = escalus_stanza:presence(<<"available">>),
    escalus:send(Conn, Presence),
    Conn.

%% Same as login_send_presence/3, but also waits for one stanza and
%% asserts that it is a presence.
login_send_and_receive_presence(Config, User, Resource) ->
    Conn = login_send_presence(Config, User, Resource),
    Received = escalus_client:wait_for_stanza(Conn),
    escalus:assert(is_presence, Received),
    Conn.

%% pid_to_list encoding would not work over erlang distribution,
%% so the pid is serialized with term_to_binary and base64-encoded.
encode_pid(Pid) ->
    base64:encode(term_to_binary(Pid)).

%% Inverse of encode_pid/1: base64-decode the binary and deserialize the pid.
%%
%% The input comes from an attribute of an incoming stanza and this function
%% runs inside the server node, so the term is decoded with the `safe' option:
%% a payload that would create new atoms or external fun references fails
%% with badarg instead of being materialized.
decode_pid(Pid) ->
    erlang:binary_to_term(base64:decode(Pid), [safe]).