1: %%==============================================================================
    2: %% Copyright 2015 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: -module(race_conditions_SUITE).
   17: -compile([export_all, nowarn_export_all]).
   18: 
   19: -export([handle_delayiq_iq/5]).
   20: 
   21: -include_lib("escalus/include/escalus.hrl").
   22: -include_lib("exml/include/exml.hrl").
   23: -include_lib("common_test/include/ct.hrl").
   24: 
   25: -include("mam_helper.hrl"). %% mam? we need assert_equal_extra
   26: 
   27: -import(distributed_helper, [mim/0, rpc/4]).
   28: 
   29: -import(domain_helper, [host_type/0]).
   30: 
   31: %%--------------------------------------------------------------------
   32: %% Suite configuration
   33: %%--------------------------------------------------------------------
   %% Common Test callback: the suite runs a single group, main_group.
   all() ->
       MainGroup = {group, main_group},
       [MainGroup].
   36: 
   %% Common Test callback. Defines main_group (with no group properties) and
   %% returns whatever ct_helper:repeat_all_until_all_ok/1 makes of that list.
   groups() ->
       MainGroup = {main_group, [], main_group_tests()},
       ct_helper:repeat_all_until_all_ok([MainGroup]).
   40: 
   %% Names of the test cases that make up main_group.
   main_group_tests() ->
       [
           confirm_c2s_queue_is_flushed_works,
           delayiq_handler_works,
           ignore_iq_result_from_old_session
       ].
   45: 
   %% Common Test callback: suite settings are taken from escalus as-is.
   suite() -> escalus:suite().
   48: 
   %% Suite setup is fully delegated to escalus.
   init_per_suite(Config) -> escalus:init_per_suite(Config).
   51: 
   %% Suite teardown: calls escalus_fresh:clean/0 first, then hands the
   %% Config over to escalus for its own teardown.
   end_per_suite(Config) ->
       escalus_fresh:clean(),
       escalus:end_per_suite(Config).
   55: 
   %% Group setup: injects this module through mongoose_helper:inject_module/1
   %% (presumably so that MongooseIM can run handle_delayiq_iq/5 -- confirm)
   %% and registers the delayiq IQ handler. Config is returned unchanged.
   init_per_group(_GroupName, Config) ->
       mongoose_helper:inject_module(?MODULE),
       start_delayiq_handler(),
       Config.
   60: 
   %% Group teardown: unregisters the delayiq IQ handler.
   %% Config is returned unchanged.
   end_per_group(_GroupName, Config) ->
       stop_delayiq_handler(),
       Config.
   64: 
   %% Per-testcase setup is fully delegated to escalus.
   init_per_testcase(CaseName, Config) -> escalus:init_per_testcase(CaseName, Config).
   67: 
   %% Per-testcase teardown is fully delegated to escalus.
   end_per_testcase(CaseName, Config) -> escalus:end_per_testcase(CaseName, Config).
   70: 
   71: %%--------------------------------------------------------------------
   72: %% Test cases
   73: %%--------------------------------------------------------------------
   74: 
   %% Sanity check: confirm_c2s_queue_is_flushed/1 must succeed on any regular
   %% connection. The more complex test cases rely on that helper.
   confirm_c2s_queue_is_flushed_works(Config) ->
       Story =
           fun(_FreshConfig, Alice) ->
               %% Must return without crashing
               confirm_c2s_queue_is_flushed(Alice)
           end,
       escalus:fresh_story_with_config(Config, [{alice, 1}], Story).
   83: 
   %% Checks that the result IQ is delivered to the client once the delayiq
   %% processing has been resumed.
   delayiq_handler_works(Config) ->
       Story =
           fun(_FreshConfig, Alice) ->
               Request = send_delayiq(Alice),
               %% GIVEN an IQ handler that is waiting to be resumed
               {ok, HandlerPid} = receive_delayiq_confirmation(),
               resume_delayiq(HandlerPid),
               %% GIVEN the result IQ stanza (if there is any) has been put
               %% into the ejabberd_c2s process
               wait_for_delayiq_handler_to_finish(HandlerPid),
               %% THEN the client gets the result for its request
               receive_delayiq(Alice, Request),
               confirm_c2s_queue_is_flushed(Alice)
           end,
       escalus:fresh_story_with_config(Config, [{alice, 1}], Story).
   98: 
   %% Reconnects Alice and checks that the new connection does not receive the
   %% result of an IQ that was sent using the old connection.
   %%
   %% The old and the new connection deliberately share the same resource: if
   %% the resources were different, then the stanza would not be routed to the
   %% new connection in any case (with or without the
   %% check_incoming_accum_for_conflicts check in ejabberd_c2s).
   ignore_iq_result_from_old_session(Config) ->
       Story =
           fun(FreshConfig, Alice) ->
               send_delayiq(Alice),
               %% GIVEN an IQ handler that is waiting to be resumed
               {ok, HandlerPid} = receive_delayiq_confirmation(),

               %% Stop the session and wait for the session process to be
               %% dead and deregistered.
               %% NOTE(review): the outer Config, not FreshConfig, is passed
               %% here -- confirm that logout_user/2 is fine with that.
               mongoose_helper:logout_user(Config, Alice),

               %% Log in again, reusing the resource of the old connection.
               OldResource = escalus_client:resource(Alice),
               Alice2 = login_send_and_receive_presence(FreshConfig, alice, OldResource),
               resume_delayiq(HandlerPid),
               %% GIVEN the result IQ stanza (if there is any) has been put
               %% into the ejabberd_c2s process. The c2s process of Alice2 is
               %% expected to receive the route message, but to ignore it.
               wait_for_delayiq_handler_to_finish(HandlerPid),
               %% THEN no delayiq result reaches Alice2
               confirm_c2s_queue_is_flushed(Alice2)
           end,
       escalus:fresh_story_with_config(Config, [{alice, 1}], Story).
  129: 
  130: %%--------------------------------------------------------------------
  131: %% Helpers
  132: %%--------------------------------------------------------------------
  133: 
  %% Registers handle_delayiq_iq/5 of this module on the MongooseIM node as the
  %% ejabberd_sm IQ handler for the delayiq namespace.
  start_delayiq_handler() ->
      Namespace = delayiq_ns(),
      %% IQDisc = parallel matters here: this particular IQ has to be
      %% executed in a separate process.
      Args = [host_type(), Namespace, ejabberd_sm,
              fun ?MODULE:handle_delayiq_iq/5, #{}, parallel],
      rpc(mim(), gen_iq_handler, add_iq_handler_for_domain, Args).
  143: 
  %% Unregisters the ejabberd_sm IQ handler for the delayiq namespace on the
  %% MongooseIM node, i.e. undoes start_delayiq_handler/0.
  %% Returns the result of the RPC call.
  stop_delayiq_handler() ->
      Namespace = delayiq_ns(),
      %% The same host type, namespace and component are used as for the
      %% registration, so exactly that handler gets removed.
      Args = [host_type(), Namespace, ejabberd_sm],
      rpc(mim(), gen_iq_handler, remove_iq_handler_for_domain, Args).
  152: 
  %% Brand new IQ namespace, used only by this suite.
  %% When an IQ get with this namespace is sent, MongooseIM does not send the
  %% IQ result until it is asked to do so (see resume_delayiq/1).
  delayiq_ns() -> <<"urn:escalus:delayiq">>.
  158: 
  %% Builds the IQ get stanza for the delayiq namespace.
  %% Its <data/> payload carries the encoded pid of the calling process in the
  %% "caller_pid" attribute; the handle_delayiq_iq/5 callback reads it to know
  %% which process to talk to.
  delayiq_iq() ->
      CallerPidAttr = {<<"caller_pid">>, encode_pid(self())},
      Payload = #xmlel{name = <<"data">>, attrs = [CallerPidAttr]},
      escalus_stanza:iq_get(delayiq_ns(), [Payload]).
  166: 
  %% IQ handler callback, executed by MongooseIM.
  %%
  %% Tells the test process (whose encoded pid travels in the "caller_pid"
  %% attribute of the <data/> payload) that the IQ is being handled, then
  %% blocks until that process sends handle_delayiq_iq_resume. Only after
  %% that the empty result IQ is returned, together with the unchanged Acc.
  %% Raises an error with debugging details if no resume message arrives
  %% within 60 seconds.
  handle_delayiq_iq(Acc, _From, _To, IQ, _Extra) ->
      CallerPidPath = [{element, <<"data">>}, {attr, <<"caller_pid">>}],
      Encoded = exml_query:path(mongoose_iq:iq_to_sub_el(IQ), CallerPidPath),
      CallerPid = decode_pid(Encoded),
      CallerPid ! {handle_delayiq_iq_confirmation, self()},
      ResumeTimeout = timer:seconds(60),
      receive
          {handle_delayiq_iq_resume, CallerPid} ->
              ok
      after ResumeTimeout ->
          %% Not expected to happen; keep just enough information for debugging
          erlang:error(#{reason => handle_delayiq_timeout,
                         caller_pid => CallerPid,
                         mongoose_pid => self(),
                         messages => erlang:process_info(self(), messages)})
      end,
      {Acc, mongoose_iq:empty_result_iq(IQ)}.
  186: 
  %% Waits until MongooseIM confirms that it has started to process our IQ.
  %% Returns {ok, MongoosePid} with the pid of the handler process, or
  %% {error, Details} if no confirmation arrives within 5 seconds.
  receive_delayiq_confirmation() ->
      Timeout = timer:seconds(5),
      receive
          {handle_delayiq_iq_confirmation, MongoosePid} ->
              {ok, MongoosePid}
      after Timeout ->
          Details = #{reason => receive_delayiq_confirmation_timeout,
                      caller_pid => self(),
                      messages => erlang:process_info(self(), messages)},
          {error, Details}
      end.
  197: 
  %% Asks the waiting IQ handler process to go on and return the result IQ.
  %% The message carries the pid of the calling process.
  resume_delayiq(MongoosePid) ->
      ResumeMsg = {handle_delayiq_iq_resume, self()},
      MongoosePid ! ResumeMsg.
  201: 
  %% Sends the delayiq request stanza from Client and returns that stanza.
  send_delayiq(Client) ->
      Request = delayiq_iq(),
      escalus:send(Client, Request),
      Request.
  207: 
  %% Waits for the next stanza on Client, asserts that it is the IQ result
  %% for RequestIQ and returns it.
  receive_delayiq(Client, RequestIQ) ->
      Response = escalus_client:wait_for_stanza(Client),
      escalus:assert(is_iq_result, [RequestIQ], Response),
      Response.
  213: 
  %% Monitors the IQ handler process and waits for it to terminate.
  %% Returns ok, or raises an error if the process is still alive after
  %% 5 seconds.
  wait_for_delayiq_handler_to_finish(MongoosePid) ->
      Monitor = erlang:monitor(process, MongoosePid),
      Timeout = timer:seconds(5),
      receive
          {'DOWN', Monitor, process, _Pid, _Reason} ->
              ok
      after Timeout ->
          erlang:error(#{reason => wait_for_delayiq_handler_to_finish_timeout,
                         mongoose_pid => MongoosePid,
                         caller_pid => self()})
      end.
  224: 
  %% Confirms that the c2s process of Client has worked through its queue.
  %% The simplest way to do that is a round trip: send a stanza to the
  %% process and receive the answer.
  confirm_c2s_queue_is_flushed(Client) ->
      %% An IQ with an unknown namespace is enough: the server answers with
      %% an error. (Sending a message would work too, but needs two clients.)
      UnknownNS = <<"urn:escalus:this_iq_would_fail_but_its_expected">>,
      RequestIQ = escalus_stanza:iq_get(UnknownNS, []),
      escalus:send(Client, RequestIQ),
      %% If something goes wrong, we get a timeout here.
      ResultIQ = escalus_client:wait_for_stanza(Client),
      %% The error is the expected outcome
      escalus:assert(is_iq_error, [], ResultIQ),
      %% Make sure that the error answers our IQ and not some other one
      QueryId = exml_query:attr(RequestIQ, <<"id">>),
      ResultId = exml_query:attr(ResultIQ, <<"id">>),
      ?assert_equal_extra(QueryId, ResultId,
                          #{request_iq => RequestIQ, result_iq => ResultIQ}).
  242: 
  %% Starts a client for User with the given Resource, sends the "available"
  %% presence from it and returns the client.
  login_send_presence(Config, User, Resource) ->
      {ok, Client} = escalus_client:start(Config, User, Resource),
      Available = escalus_stanza:presence(<<"available">>),
      escalus:send(Client, Available),
      Client.
  247: 
  %% Same as login_send_presence/3, but additionally waits for the next
  %% stanza on the new client and asserts that it is a presence.
  login_send_and_receive_presence(Config, User, Resource) ->
      Client = login_send_presence(Config, User, Resource),
      Presence = escalus_client:wait_for_stanza(Client),
      escalus:assert(is_presence, Presence),
      Client.
  253: 
  %% Turns a pid into a base64 binary of its external term format.
  %% pid_to_list encoding would not work over erlang distribution.
  encode_pid(Pid) ->
      Serialized = erlang:term_to_binary(Pid),
      base64:encode(Serialized).
  257: 
  %% Reverses encode_pid/1: takes the base64 binary and returns the pid.
  %%
  %% The binary is read from an incoming stanza, so it is decoded with the
  %% `safe' option, and anything that does not decode to a pid is rejected
  %% with a badmatch error instead of being used as a send target.
  decode_pid(Encoded) ->
      Pid = erlang:binary_to_term(base64:decode(Encoded), [safe]),
      true = is_pid(Pid),
      Pid.