1: %%============================================================================== 2: %% Copyright 2015 Erlang Solutions Ltd. 3: %% 4: %% Licensed under the Apache License, Version 2.0 (the "License"); 5: %% you may not use this file except in compliance with the License. 6: %% You may obtain a copy of the License at 7: %% 8: %% http://www.apache.org/licenses/LICENSE-2.0 9: %% 10: %% Unless required by applicable law or agreed to in writing, software 11: %% distributed under the License is distributed on an "AS IS" BASIS, 12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13: %% See the License for the specific language governing permissions and 14: %% limitations under the License. 15: %%============================================================================== 16: -module(race_conditions_SUITE). 17: -compile([export_all, nowarn_export_all]). 18: 19: -export([handle_delayiq_iq/5]). 20: 21: -include_lib("escalus/include/escalus.hrl"). 22: -include_lib("exml/include/exml.hrl"). 23: -include_lib("common_test/include/ct.hrl"). 24: 25: -include("mam_helper.hrl"). %% mam? we need assert_equal_extra 26: 27: -import(distributed_helper, [mim/0, rpc/4]). 28: 29: -import(domain_helper, [host_type/0]). 30: 31: %%-------------------------------------------------------------------- 32: %% Suite configuration 33: %%-------------------------------------------------------------------- 34: all() -> 35: [{group, main_group}]. 36: 37: groups() -> 38: G = [{main_group, [], main_group_tests()}], 39: ct_helper:repeat_all_until_all_ok(G). 40: 41: main_group_tests() -> 42: [confirm_c2s_queue_is_flushed_works, 43: delayiq_handler_works, 44: ignore_iq_result_from_old_session]. 45: 46: suite() -> 47: escalus:suite(). 48: 49: init_per_suite(Config) -> 50: escalus:init_per_suite(Config). 51: 52: end_per_suite(Config) -> 53: escalus_fresh:clean(), 54: escalus:end_per_suite(Config). 55: 56: init_per_group(_, Config) -> 57: mongoose_helper:inject_module(?MODULE), 58: start_delayiq_handler(), 59: Config. 60: 61: end_per_group(_GroupName, Config) -> 62: stop_delayiq_handler(), 63: Config. 64: 65: init_per_testcase(CaseName, Config) -> 66: escalus:init_per_testcase(CaseName, Config). 67: 68: end_per_testcase(CaseName, Config) -> 69: escalus:end_per_testcase(CaseName, Config). 70: 71: %%-------------------------------------------------------------------- 72: %% Test cases 73: %%-------------------------------------------------------------------- 74: 75: %% Check, that confirm_c2s_queue_is_flushed works for any regular connection. 76: %% This function is used in more complex tests. 77: confirm_c2s_queue_is_flushed_works(Config) -> 78: escalus:fresh_story_with_config(Config, [{alice, 1}], 79: fun(_FreshConfig, Alice) -> 80: %% Should not crash 81: confirm_c2s_queue_is_flushed(Alice) 82: end). 83: 84: %% Checks, that result IQ is delivered, when delayiq processing is resumed. 85: delayiq_handler_works(Config) -> 86: escalus:fresh_story_with_config(Config, [{alice, 1}], 87: fun(_FreshConfig, Alice) -> 88: RequestIQ = send_delayiq(Alice), 89: {ok, DelayIqConfirmation} = receive_delayiq_confirmation(), 90: %% HAVING waiting iq handler 91: resume_delayiq(DelayIqConfirmation), 92: %% HAVING result IQ stanza put into mongoose_c2s process 93: %% (if there is any) 94: wait_for_delayiq_handler_to_finish(DelayIqConfirmation), 95: receive_delayiq(Alice, RequestIQ), 96: confirm_c2s_queue_is_flushed(Alice) 97: end). 98: 99: %% We reconnect Alice and check that new connection does not receive 100: %% IQ result from IQ, sent using the old connection. 101: %% 102: %% If old and new resources are different, that the stanza would not be routed 103: %% to the new connection in any case 104: ignore_iq_result_from_old_session(Config) -> 105: escalus:fresh_story_with_config(Config, [{alice, 1}], 106: fun(FreshConfig, Alice) -> 107: send_delayiq(Alice), 108: {ok, DelayIqConfirmation} = receive_delayiq_confirmation(), 109: %% HAVING waiting iq handler 110: 111: %% Stop the session. 112: %% And wait for the session process to be dead 113: %% and deregistered. 114: mongoose_helper:logout_user(Config, Alice), 115: 116: %% Use the same resource for both old and new connection. 117: Resource = escalus_client:resource(Alice), 118: Alice2 = login_send_and_receive_presence(FreshConfig, alice, Resource), 119: resume_delayiq(DelayIqConfirmation), 120: %% HAVING result IQ stanza put into mongoose_c2s process 121: %% (if there is any) 122: %% We expect, that Alice2 c2s process receives the route message, 123: %% but would ignore it. 124: wait_for_delayiq_handler_to_finish(DelayIqConfirmation), 125: %% RESULT missing delayiq 126: confirm_c2s_queue_is_flushed(Alice2) 127: end). 128: 129: %%-------------------------------------------------------------------- 130: %% Helpers 131: %%-------------------------------------------------------------------- 132: 133: start_delayiq_handler() -> 134: NS = delayiq_ns(), 135: HostType = host_type(), 136: %% Register our module as an iq handler. 137: %% It's important to use IQDisc=parallel, because 138: %% this particular IQ should be executed in a separate process. 139: rpc(mim(), gen_iq_handler, add_iq_handler_for_domain, 140: [HostType, NS, ejabberd_sm, 141: fun ?MODULE:handle_delayiq_iq/5, #{}, parallel]). 142: 143: stop_delayiq_handler() -> 144: NS = delayiq_ns(), 145: HostType = host_type(), 146: %% Register our module as an iq handler. 147: %% It's important to use IQDisc=parallel, because 148: %% this particular IQ should be executed in a separate process. 149: rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain, 150: [HostType, NS, ejabberd_sm]). 151: 152: %% Brand new IQ namespace. 153: %% Send IQ get with this namespace, and MongooseIM would send you 154: %% IQ result, until you ask for it, using an RPC. 155: delayiq_ns() -> 156: <<"urn:escalus:delayiq">>. 157: 158: %% Make an XMPP stanza, that contains information 159: %% needed in handle_delayiq_iq/4 callback 160: delayiq_iq() -> 161: BinCallerPid = encode_pid(self()), 162: Payload = #xmlel{name = <<"data">>, 163: attrs = [{<<"caller_pid">>, BinCallerPid}]}, 164: escalus_stanza:iq_get(delayiq_ns(), [Payload]). 165: 166: %% This function is executed by MongooseIM 167: handle_delayiq_iq(Acc, _From, _To, IQ, _Extra) -> 168: SubEl = mongoose_iq:iq_to_sub_el(IQ), 169: BinCallerPid = exml_query:path(SubEl, [{element, <<"data">>}, 170: {attr, <<"caller_pid">>}]), 171: CallerPid = decode_pid(BinCallerPid), 172: CallerPid ! {handle_delayiq_iq_confirmation, self()}, 173: receive 174: {handle_delayiq_iq_resume, CallerPid} -> 175: ok 176: after timer:seconds(60) -> 177: %% provide just enough information for debugging 178: erlang:error(#{reason => handle_delayiq_timeout, %% it's unexpected 179: caller_pid => CallerPid, 180: mongoose_pid => self(), 181: messages => erlang:process_info(self(), messages)}) 182: end, 183: ResultIQ = mongoose_iq:empty_result_iq(IQ), 184: {Acc, ResultIQ}. 185: 186: %% Ensure, that MongooseIM started to process our IQ 187: receive_delayiq_confirmation() -> 188: receive 189: {handle_delayiq_iq_confirmation, MongoosePid} -> 190: {ok, MongoosePid} 191: after timer:seconds(5) -> 192: {error, #{reason => receive_delayiq_confirmation_timeout, 193: caller_pid => self(), 194: messages => erlang:process_info(self(), messages)}} 195: end. 196: 197: %% Ask MongooseIM to return result IQ 198: resume_delayiq(MongoosePid) -> 199: MongoosePid ! {handle_delayiq_iq_resume, self()}. 200: 201: %% Send XMPP request stanza. 202: send_delayiq(Client) -> 203: RequestIQ = delayiq_iq(), 204: escalus:send(Client, RequestIQ), 205: RequestIQ. 206: 207: %% Receive XMPP result stanza. 208: receive_delayiq(Client, RequestIQ) -> 209: ResultIQ = escalus_client:wait_for_stanza(Client), 210: escalus:assert(is_iq_result, [RequestIQ], ResultIQ), 211: ResultIQ. 212: 213: wait_for_delayiq_handler_to_finish(MongoosePid) -> 214: MonRef = erlang:monitor(process, MongoosePid), 215: receive 216: {'DOWN', MonRef, process, _, _} -> 217: ok 218: after timer:seconds(5) -> 219: error(#{reason => wait_for_delayiq_handler_to_finish_timeout, 220: mongoose_pid => MongoosePid, 221: caller_pid => self()}) 222: end. 223: 224: %% The simplest way to confirm, that c2s queue is processed is to 225: %% send and receive a stanza from the process. 226: confirm_c2s_queue_is_flushed(Client) -> 227: BadNS = <<"urn:escalus:this_iq_would_fail_but_its_expected">>, 228: %% We can just send an unknown IQ and wait for error. 229: %% Another way is to send a message, but we need two clients for this. 230: RequestIQ = escalus_stanza:iq_get(BadNS, []), 231: escalus:send(Client, RequestIQ), 232: %% We will have a timeout here, if something goes wrong. 233: ResultIQ = escalus_client:wait_for_stanza(Client), 234: %% Expected result 235: escalus:assert(is_iq_error, [], ResultIQ), 236: %% Just check that it's our IQ 237: QueryId = exml_query:attr(RequestIQ, <<"id">>), 238: ResultId = exml_query:attr(ResultIQ, <<"id">>), 239: ?assert_equal_extra(QueryId, ResultId, 240: #{request_iq => RequestIQ, result_iq => ResultIQ}). 241: 242: login_send_presence(Config, User, Resource) -> 243: {ok, Client} = escalus_client:start(Config, User, Resource), 244: escalus:send(Client, escalus_stanza:presence(<<"available">>)), 245: Client. 246: 247: login_send_and_receive_presence(Config, User, Resource) -> 248: Client = login_send_presence(Config, User, Resource), 249: P = escalus_client:wait_for_stanza(Client), 250: escalus:assert(is_presence, P), 251: Client. 252: 253: %% pid_to_list encoding would not work over erlang distribution. 254: encode_pid(Pid) -> 255: base64:encode(erlang:term_to_binary(Pid)). 256: 257: decode_pid(Pid) -> 258: erlang:binary_to_term(base64:decode(Pid)).