1: %%==============================================================================
    2: %% Copyright 2015 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: -module(race_conditions_SUITE).
   17: -compile([export_all, nowarn_export_all]).
   18: 
   19: -export([handle_delayiq_iq/5]).
   20: 
   21: -include_lib("escalus/include/escalus.hrl").
   22: -include_lib("exml/include/exml.hrl").
   23: -include_lib("common_test/include/ct.hrl").
   24: 
   25: -include("mam_helper.hrl"). %% mam? we need assert_equal_extra
   26: 
   27: -import(distributed_helper, [mim/0, rpc/4]).
   28: 
   29: -import(domain_helper, [host_type/0]).
   30: 
   31: %%--------------------------------------------------------------------
   32: %% Suite configuration
   33: %%--------------------------------------------------------------------
   34: all() ->
   35:     [{group, main_group}].
   36: 
   37: groups() ->
   38:     G = [{main_group, [], main_group_tests()}],
   39:     ct_helper:repeat_all_until_all_ok(G).
   40: 
   41: main_group_tests() ->
   42:     [confirm_c2s_queue_is_flushed_works,
   43:      delayiq_handler_works,
   44:      ignore_iq_result_from_old_session].
   45: 
   46: suite() ->
   47:     escalus:suite().
   48: 
   49: init_per_suite(Config) ->
   50:     escalus:init_per_suite(Config).
   51: 
   52: end_per_suite(Config) ->
   53:     escalus_fresh:clean(),
   54:     escalus:end_per_suite(Config).
   55: 
   56: init_per_group(_, Config) ->
   57:     mongoose_helper:inject_module(?MODULE),
   58:     start_delayiq_handler(),
   59:     Config.
   60: 
   61: end_per_group(_GroupName, Config) ->
   62:     stop_delayiq_handler(),
   63:     Config.
   64: 
   65: init_per_testcase(CaseName, Config) ->
   66:     escalus:init_per_testcase(CaseName, Config).
   67: 
   68: end_per_testcase(CaseName, Config) ->
   69:     escalus:end_per_testcase(CaseName, Config).
   70: 
   71: %%--------------------------------------------------------------------
   72: %% Test cases
   73: %%--------------------------------------------------------------------
   74: 
   75: %% Check, that confirm_c2s_queue_is_flushed works for any regular connection.
   76: %% This function is used in more complex tests.
   77: confirm_c2s_queue_is_flushed_works(Config) ->
   78:     escalus:fresh_story_with_config(Config, [{alice, 1}],
   79:         fun(_FreshConfig, Alice) ->
   80:             %% Should not crash
   81:             confirm_c2s_queue_is_flushed(Alice)
   82:         end).
   83: 
%% Checks that the result IQ is delivered once delayiq processing is resumed.
%%
%% Flow: send the delayiq request; wait until the server-side handler reports
%% it has started (it then blocks); resume it; wait for the handler process
%% to exit; expect the result IQ on the client; finally check that nothing
%% else is left queued for the client.
delayiq_handler_works(Config) ->
    escalus:fresh_story_with_config(Config, [{alice, 1}],
        fun(_FreshConfig, Alice) ->
            RequestIQ = send_delayiq(Alice),
            %% DelayIqConfirmation is the pid of the blocked handler process
            {ok, DelayIqConfirmation} = receive_delayiq_confirmation(),
            %% HAVING waiting iq handler
            resume_delayiq(DelayIqConfirmation),
            %% HAVING result IQ stanza put into mongoose_c2s process
            %% (if there is any)
            wait_for_delayiq_handler_to_finish(DelayIqConfirmation),
            receive_delayiq(Alice, RequestIQ),
            confirm_c2s_queue_is_flushed(Alice)
        end).
   98: 
%% We reconnect Alice and check that the new connection does not receive
%% the IQ result for an IQ sent using the old connection.
%%
%% The same resource is used for both connections on purpose: if the old and
%% new resources were different, then the stanza would not be routed to the
%% new connection in any case, and the check below would be meaningless.
ignore_iq_result_from_old_session(Config) ->
    escalus:fresh_story_with_config(Config, [{alice, 1}],
        fun(FreshConfig, Alice) ->
            send_delayiq(Alice),
            %% DelayIqConfirmation is the pid of the blocked handler process
            {ok, DelayIqConfirmation} = receive_delayiq_confirmation(),
            %% HAVING waiting iq handler

            %% Stop the session.
            %% And wait for the session process to be dead
            %% and deregistered.
            mongoose_helper:logout_user(Config, Alice),

            %% Use the same resource for both old and new connection.
            Resource = escalus_client:resource(Alice),
            Alice2 = login_send_and_receive_presence(FreshConfig, alice, Resource),
            resume_delayiq(DelayIqConfirmation),
            %% HAVING result IQ stanza put into mongoose_c2s process
            %% (if there is any)
            %% We expect, that Alice2 c2s process receives the route message,
            %% but would ignore it.
            wait_for_delayiq_handler_to_finish(DelayIqConfirmation),
            %% RESULT missing delayiq:
            %% confirm_c2s_queue_is_flushed/1 asserts that the first stanza
            %% Alice2 gets is the error for its own probe IQ, so a leaked
            %% delayiq result would make it fail.
            confirm_c2s_queue_is_flushed(Alice2)
        end).
  128: 
  129: %%--------------------------------------------------------------------
  130: %% Helpers
  131: %%--------------------------------------------------------------------
  132: 
  133: start_delayiq_handler() ->
  134:     NS = delayiq_ns(),
  135:     HostType = host_type(),
  136:     %% Register our module as an iq handler.
  137:     %% It's important to use IQDisc=parallel, because
  138:     %% this particular IQ should be executed in a separate process.
  139:     rpc(mim(), gen_iq_handler, add_iq_handler_for_domain,
  140:         [HostType, NS, ejabberd_sm,
  141:          fun ?MODULE:handle_delayiq_iq/5, #{}, parallel]).
  142: 
  143: stop_delayiq_handler() ->
  144:     NS = delayiq_ns(),
  145:     HostType = host_type(),
  146:     %% Register our module as an iq handler.
  147:     %% It's important to use IQDisc=parallel, because
  148:     %% this particular IQ should be executed in a separate process.
  149:     rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain,
  150:         [HostType, NS, ejabberd_sm]).
  151: 
  152: %% Brand new IQ namespace.
  153: %% Send IQ get with this namespace, and MongooseIM would send you
  154: %% IQ result, until you ask for it, using an RPC.
  155: delayiq_ns() ->
  156:     <<"urn:escalus:delayiq">>.
  157: 
  158: %% Make an XMPP stanza, that contains information
  159: %% needed in handle_delayiq_iq/4 callback
  160: delayiq_iq() ->
  161:     BinCallerPid = encode_pid(self()),
  162:     Payload = #xmlel{name = <<"data">>,
  163:                      attrs = [{<<"caller_pid">>, BinCallerPid}]},
  164:     escalus_stanza:iq_get(delayiq_ns(), [Payload]).
  165: 
  166: %% This function is executed by MongooseIM
  167: handle_delayiq_iq(Acc, _From, _To, IQ, _Extra) ->
  168:     SubEl = mongoose_iq:iq_to_sub_el(IQ),
  169:     BinCallerPid = exml_query:path(SubEl, [{element, <<"data">>},
  170:                                            {attr, <<"caller_pid">>}]),
  171:     CallerPid = decode_pid(BinCallerPid),
  172:     CallerPid ! {handle_delayiq_iq_confirmation, self()},
  173:     receive
  174:         {handle_delayiq_iq_resume, CallerPid} ->
  175:             ok
  176:         after timer:seconds(60) ->
  177:             %% provide just enough information for debugging
  178:             erlang:error(#{reason => handle_delayiq_timeout, %% it's unexpected
  179:                            caller_pid => CallerPid,
  180:                            mongoose_pid => self(),
  181:                            messages => erlang:process_info(self(), messages)})
  182:     end,
  183:     ResultIQ = mongoose_iq:empty_result_iq(IQ),
  184:     {Acc, ResultIQ}.
  185: 
  186: %% Ensure, that MongooseIM started to process our IQ
  187: receive_delayiq_confirmation() ->
  188:     receive
  189:         {handle_delayiq_iq_confirmation, MongoosePid} ->
  190:             {ok, MongoosePid}
  191:         after timer:seconds(5) ->
  192:             {error, #{reason => receive_delayiq_confirmation_timeout,
  193:                       caller_pid => self(),
  194:                       messages => erlang:process_info(self(), messages)}}
  195:     end.
  196: 
  197: %% Ask MongooseIM to return result IQ
  198: resume_delayiq(MongoosePid) ->
  199:     MongoosePid ! {handle_delayiq_iq_resume, self()}.
  200: 
  201: %% Send XMPP request stanza.
  202: send_delayiq(Client) ->
  203:     RequestIQ = delayiq_iq(),
  204:     escalus:send(Client, RequestIQ),
  205:     RequestIQ.
  206: 
  207: %% Receive XMPP result stanza.
  208: receive_delayiq(Client, RequestIQ) ->
  209:     ResultIQ = escalus_client:wait_for_stanza(Client),
  210:     escalus:assert(is_iq_result, [RequestIQ], ResultIQ),
  211:     ResultIQ.
  212: 
  213: wait_for_delayiq_handler_to_finish(MongoosePid) ->
  214:     MonRef = erlang:monitor(process, MongoosePid),
  215:     receive
  216:         {'DOWN', MonRef, process, _, _} ->
  217:             ok
  218:     after timer:seconds(5) ->
  219:         error(#{reason => wait_for_delayiq_handler_to_finish_timeout,
  220:                 mongoose_pid => MongoosePid,
  221:                 caller_pid => self()})
  222:     end.
  223: 
  224: %% The simplest way to confirm, that c2s queue is processed is to
  225: %% send and receive a stanza from the process.
  226: confirm_c2s_queue_is_flushed(Client) ->
  227:     BadNS = <<"urn:escalus:this_iq_would_fail_but_its_expected">>,
  228:     %% We can just send an unknown IQ and wait for error.
  229:     %% Another way is to send a message, but we need two clients for this.
  230:     RequestIQ = escalus_stanza:iq_get(BadNS, []),
  231:     escalus:send(Client, RequestIQ),
  232:     %% We will have a timeout here, if something goes wrong.
  233:     ResultIQ = escalus_client:wait_for_stanza(Client),
  234:     %% Expected result
  235:     escalus:assert(is_iq_error, [], ResultIQ),
  236:     %% Just check that it's our IQ
  237:     QueryId = exml_query:attr(RequestIQ, <<"id">>),
  238:     ResultId = exml_query:attr(ResultIQ, <<"id">>),
  239:     ?assert_equal_extra(QueryId, ResultId,
  240:                         #{request_iq => RequestIQ, result_iq => ResultIQ}).
  241: 
  242: login_send_presence(Config, User, Resource) ->
  243:     {ok, Client} = escalus_client:start(Config, User, Resource),
  244:     escalus:send(Client, escalus_stanza:presence(<<"available">>)),
  245:     Client.
  246: 
  247: login_send_and_receive_presence(Config, User, Resource) ->
  248:     Client = login_send_presence(Config, User, Resource),
  249:     P = escalus_client:wait_for_stanza(Client),
  250:     escalus:assert(is_presence, P),
  251:     Client.
  252: 
  253: %% pid_to_list encoding would not work over erlang distribution.
  254: encode_pid(Pid) ->
  255:     base64:encode(erlang:term_to_binary(Pid)).
  256: 
  257: decode_pid(Pid) ->
  258:     erlang:binary_to_term(base64:decode(Pid)).