1: %%%-------------------------------------------------------------------
    2: %%% @author Kacper Mentel <kacper.mentel@erlang-solutions.com>
    3: %%% @copyright (C) 2018, Kacper Mentel
    4: %%% @doc
    5: %%% Tests for `mod_event_pusher` RabbitMQ backend (`mod_event_pusher_rabbit`).
    6: %%% @end
    7: %%%-------------------------------------------------------------------
    8: -module(mod_event_pusher_rabbit_SUITE).
    9: 
   10: -include_lib("common_test/include/ct.hrl").
   11: -include_lib("eunit/include/eunit.hrl").
   12: -include_lib("amqp_client/include/amqp_client.hrl").
   13: 
   14: -include("assert_received_match.hrl").
   15: 
   16: -import(distributed_helper, [mim/0, rpc/4]).
   17: -import(domain_helper, [domain/0]).
   18: -import(config_parser_helper, [config/2]).
   19: 
   20: -export([suite/0, all/0, groups/0]).
   21: -export([init_per_suite/1, end_per_suite/1,
   22:          init_per_group/2, end_per_group/2,
   23:          init_per_testcase/2, end_per_testcase/2]).
   24: 
   25: -export([rabbit_pool_starts_with_default_config/1,
   26:          exchanges_are_created_on_module_startup/1]).
   27: -export([connected_users_push_presence_events_when_change_status/1,
   28:          presence_messages_are_properly_formatted/1]).
   29: -export([chat_message_sent_event/1,
   30:          chat_message_sent_event_properly_formatted/1,
   31:          chat_message_received_event/1,
   32:          chat_message_received_event_properly_formatted/1]).
   33: -export([group_chat_message_sent_event/1,
   34:          group_chat_message_sent_event_properly_formatted/1,
   35:          group_chat_message_received_event/1,
   36:          group_chat_message_received_event_properly_formatted/1]).
   37: 
   38: -define(QUEUE_NAME, <<"test_queue">>).
   39: -define(DEFAULT_EXCHANGE_TYPE, <<"topic">>).
   40: -define(PRESENCE_EXCHANGE, <<"custom_presence_exchange">>).
   41: -define(CHAT_MSG_EXCHANGE, <<"custom_chat_msg_exchange">>).
   42: -define(GROUP_CHAT_MSG_EXCHANGE, <<"custom_group_chat_msg_exchange">>).
   43: -define(CHAT_MSG_SENT_TOPIC, <<"custom_chat_msg_sent_topic">>).
   44: -define(CHAT_MSG_RECV_TOPIC, <<"custom_chat_msg_recv_topic">>).
   45: -define(GROUP_CHAT_MSG_SENT_TOPIC, <<"custom_group_chat_msg_sent_topic">>).
   46: -define(GROUP_CHAT_MSG_RECV_TOPIC, <<"custom_group_chat_msg_recv_topic">>).
   47: -define(WPOOL_CFG, #{scope => host_type,
   48:                      opts => #{workers => 20, strategy => best_worker, call_timeout => 5000}}).
   49: -define(IF_EXCHANGE_EXISTS_RETRIES, 30).
   50: -define(WAIT_FOR_EXCHANGE_INTERVAL, 100). % ms
   51: 
   52: -type rabbit_binding() :: {Queue :: binary(),
   53:                            Exchange :: binary(),
   54:                            RoutingKey :: binary()}.
   55: 
   56: %%--------------------------------------------------------------------
   57: %% Suite configuration
   58: %%--------------------------------------------------------------------
   59: 
   60: all() ->
   61:     [
   62:      {group, pool_startup},
   63:      {group, module_startup},
   64:      {group, presence_status_publish},
   65:      {group, chat_message_publish},
   66:      {group, group_chat_message_publish}
   67:     ].
   68: 
   69: groups() ->
   70:     G = [
   71:          {pool_startup, [],
   72:           [
   73:            rabbit_pool_starts_with_default_config
   74:           ]},
   75:          {module_startup, [],
   76:           [
   77:            exchanges_are_created_on_module_startup
   78:           ]},
   79:          {presence_status_publish, [],
   80:           [
   81:            connected_users_push_presence_events_when_change_status,
   82:            presence_messages_are_properly_formatted
   83:           ]},
   84:          {chat_message_publish, [],
   85:           [
   86:            chat_message_sent_event,
   87:            chat_message_sent_event_properly_formatted,
   88:            chat_message_received_event,
   89:            chat_message_received_event_properly_formatted
   90:           ]},
   91:          {group_chat_message_publish, [],
   92:           [
   93:            group_chat_message_sent_event,
   94:            group_chat_message_sent_event_properly_formatted,
   95:            group_chat_message_received_event,
   96:            group_chat_message_received_event_properly_formatted
   97:           ]}
   98:         ],
   99:     ct_helper:repeat_all_until_all_ok(G).
  100: 
  101: suite() ->
  102:     escalus:suite().
  103: 
  104: %%--------------------------------------------------------------------
  105: %% Init & teardown
  106: %%--------------------------------------------------------------------
  107: 
  108: init_per_suite(Config) ->
  109:     case is_rabbitmq_available() of
  110:         true ->
  111:             start_rabbit_wpool(domain()),
  112:             {ok, _} = application:ensure_all_started(amqp_client),
  113:             muc_helper:load_muc(),
  114:             escalus:init_per_suite(Config);
  115:         false ->
  116:             {skip, "RabbitMQ server is not available on default port."}
  117:     end.
  118: 
  119: end_per_suite(Config) ->
  120:     stop_rabbit_wpool(domain()),
  121:     escalus_fresh:clean(),
  122:     muc_helper:unload_muc(),
  123:     escalus:end_per_suite(Config).
  124: 
  125: init_per_group(GroupName, Config0) ->
  126:     Domain = domain(),
  127:     Config = dynamic_modules:save_modules(Domain, Config0),
  128:     dynamic_modules:ensure_modules(Domain, required_modules(GroupName)),
  129:     Config.
  130: 
  131: required_modules(pool_startup) ->
  132:     [{mod_event_pusher, stopped}];
  133: required_modules(GroupName) ->
  134:     [{mod_event_pusher, #{rabbit => mod_event_pusher_rabbit_opts(GroupName)}}].
  135: 
  136: mod_event_pusher_rabbit_opts(GroupName) ->
  137:     ExtraOpts = extra_exchange_opts(GroupName),
  138:     maps:map(fun(Key, Opts) ->
  139:                      config([modules, mod_event_pusher, rabbit, Key], maps:merge(Opts, ExtraOpts))
  140:              end, basic_exchanges()).
  141: 
  142: basic_exchanges() ->
  143:     #{presence_exchange => #{name => ?PRESENCE_EXCHANGE},
  144:       chat_msg_exchange => #{name => ?CHAT_MSG_EXCHANGE,
  145:                              sent_topic => ?CHAT_MSG_SENT_TOPIC,
  146:                              recv_topic => ?CHAT_MSG_RECV_TOPIC},
  147:       groupchat_msg_exchange => #{name => ?GROUP_CHAT_MSG_EXCHANGE,
  148:                                   sent_topic => ?GROUP_CHAT_MSG_SENT_TOPIC,
  149:                                   recv_topic => ?GROUP_CHAT_MSG_RECV_TOPIC}
  150:      }.
  151: 
  152: extra_exchange_opts(module_startup) -> #{type => <<"headers">>};
  153: extra_exchange_opts(_) -> #{}.
  154: 
  155: end_per_group(_, Config) ->
  156:     delete_exchanges(),
  157:     dynamic_modules:restore_modules(Config).
  158: 
  159: init_per_testcase(rabbit_pool_starts_with_default_config, Config) ->
  160:     Config;
  161: init_per_testcase(CaseName, Config0) ->
  162:     Config1 = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]),
  163:     Config2 = maybe_prepare_muc(CaseName, Config1),
  164:     Config = Config2 ++ connect_to_rabbit(),
  165:     escalus:init_per_testcase(CaseName, Config).
  166: 
  167: end_per_testcase(rabbit_pool_starts_with_default_config, _Config) ->
  168:     ok;
  169: end_per_testcase(CaseName, Config) ->
  170:     maybe_cleanup_muc(CaseName, Config),
  171:     close_rabbit_connection(Config),
  172:     escalus:end_per_testcase(CaseName, Config).
  173: 
  174: %%--------------------------------------------------------------------
  175: %% GROUP initialization_on_startup
  176: %%--------------------------------------------------------------------
  177: 
  178: rabbit_pool_starts_with_default_config(_Config) ->
  179:     %% GIVEN
  180:     Domain = domain(),
  181:     DefaultWpoolConfig = #{type => rabbit, scope => host_type, tag => rabbit_event_pusher_default,
  182:                            opts => #{workers => 10, strategy => best_worker, call_timeout => 5000},
  183:                            conn_opts => #{amqp_port => 5672, confirms_enabled => false, max_worker_queue_len => 1000}},
  184:     RabbitWpool = {rabbit, Domain, rabbit_event_pusher_default},
  185:     %% WHEN
  186:     start_rabbit_wpool(Domain, DefaultWpoolConfig),
  187:     %% THEN
  188:     Pools = rpc(mim(), mongoose_wpool, get_pools, []),
  189:     ?assertMatch(RabbitWpool,
  190:                  lists:keyfind(rabbit_event_pusher_default, 3, Pools)),
  191:     %% CLEANUP
  192:     stop_rabbit_wpool(RabbitWpool).
  193: 
  194: exchanges_are_created_on_module_startup(Config) ->
  195:     %% GIVEN module is started with custom exchange types
  196:     Connection = proplists:get_value(rabbit_connection, Config),
  197:     ExCustomType = <<"headers">>,
  198:     Exchanges = [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE],
  199:     %% THEN exchanges are created
  200:     [?assert(ensure_exchange_present(Connection, {Exchange, ExCustomType}))
  201:      || Exchange <- Exchanges].
  202: 
  203: %%--------------------------------------------------------------------
  204: %% GROUP presence_status_publish
  205: %%--------------------------------------------------------------------
  206: 
  207: connected_users_push_presence_events_when_change_status(Config) ->
  208:     escalus:story(
  209:       Config, [{bob, 1}],
  210:       fun(Bob) ->
  211:               %% GIVEN
  212:               BobJID = client_lower_short_jid(Bob),
  213:               listen_to_presence_events_from_rabbit([BobJID], Config),
  214:               %% WHEN users generate some traffic.
  215:               send_presence_stanzas([Bob], 1),
  216:               %% THEN  wait for presence events from Rabbit.
  217:               ?assertReceivedMatch({#'basic.deliver'{routing_key = BobJID},
  218:                                     #amqp_msg{}}, timer:seconds(5))
  219:       end).
  220: 
  221: presence_messages_are_properly_formatted(Config) ->
  222:     escalus:story(
  223:       Config, [{bob, 1}],
  224:       fun(Bob) ->
  225:               %% GIVEN
  226:               BobJID = client_lower_short_jid(Bob),
  227:               BobFullJID = client_lower_full_jid(Bob),
  228:               listen_to_presence_events_from_rabbit([BobJID], Config),
  229:               %% WHEN user logout
  230:               escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
  231:               %% THEN receive message
  232:               ?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false},
  233:                            get_decoded_message_from_rabbit(BobJID))
  234:       end).
  235: 
  236: %%--------------------------------------------------------------------
  237: %% GROUP chat_message_publish
  238: %%--------------------------------------------------------------------
  239: 
  240: chat_message_sent_event(Config) ->
  241:     escalus:story(
  242:       Config, [{bob, 1}, {alice, 1}],
  243:       fun(Bob, Alice) ->
  244:               %% GIVEN
  245:               BobJID = client_lower_short_jid(Bob),
  246:               BobChatMsgSentRK = chat_msg_sent_rk(BobJID),
  247:               listen_to_chat_msg_sent_events_from_rabbit([BobJID], Config),
  248:               %% WHEN users chat
  249:               escalus:send(Bob,
  250:                            escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)),
  251:               %% THEN  wait for chat message sent events events from Rabbit.
  252:               ?assertReceivedMatch({#'basic.deliver'{
  253:                                        routing_key = BobChatMsgSentRK},
  254:                                     #amqp_msg{}}, timer:seconds(5))
  255:       end).
  256: 
  257: chat_message_received_event(Config) ->
  258:     escalus:story(
  259:       Config, [{bob, 1}, {alice, 1}],
  260:       fun(Bob, Alice) ->
  261:               %% GIVEN
  262:               AliceJID = client_lower_short_jid(Alice),
  263:               AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID),
  264:               listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  265:               %% WHEN users chat
  266:               escalus:send(Bob,
  267:                            escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)),
  268:               escalus:wait_for_stanzas(Alice, 2),
  269:               %% THEN  wait for chat message received events events from
  270:               %% Rabbit.
  271:               ?assertReceivedMatch({#'basic.deliver'{
  272:                                        routing_key = AliceChatMsgRecvRK},
  273:                                     #amqp_msg{}}, timer:seconds(5))
  274:       end).
  275: 
  276: chat_message_sent_event_properly_formatted(Config) ->
  277:     escalus:story(
  278:       Config, [{bob, 1}, {alice, 1}],
  279:       fun(Bob, Alice) ->
  280:               %% GIVEN
  281:               AliceJID = client_lower_short_jid(Alice),
  282:               AliceFullJID = client_lower_full_jid(Alice),
  283:               BobFullJID = client_lower_full_jid(Bob),
  284:               AliceChatMsgSentRK = chat_msg_sent_rk(AliceJID),
  285:               Message = <<"Hi Bob!">>,
  286:               listen_to_chat_msg_sent_events_from_rabbit([AliceJID], Config),
  287:               %% WHEN users chat
  288:               escalus:send(Alice, escalus_stanza:chat_to(Bob, Message)),
  289:               %% THEN
  290:               ?assertMatch(#{<<"from_user_id">> := AliceFullJID,
  291:                              <<"to_user_id">> := BobFullJID,
  292:                              <<"message">> := Message},
  293:                            get_decoded_message_from_rabbit(AliceChatMsgSentRK))
  294:       end).
  295: 
  296: chat_message_received_event_properly_formatted(Config) ->
  297:     escalus:story(
  298:       Config, [{bob, 1}, {alice, 1}],
  299:       fun(Bob, Alice) ->
  300:               %% GIVEN
  301:               AliceJID = client_lower_short_jid(Alice),
  302:               AliceFullJID = client_lower_full_jid(Alice),
  303:               BobFullJID = client_lower_full_jid(Bob),
  304:               AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID),
  305:               Message = <<"Hi Alice!">>,
  306:               listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  307:               %% WHEN users chat
  308:               escalus:send(Bob, escalus_stanza:chat_to(Alice, Message)),
  309:               escalus:wait_for_stanzas(Alice, 2),
  310:               %% THEN
  311:               ?assertMatch(#{<<"from_user_id">> := BobFullJID,
  312:                              <<"to_user_id">> := AliceFullJID,
  313:                              <<"message">> := Message},
  314:                            get_decoded_message_from_rabbit(AliceChatMsgRecvRK))
  315:       end).
  316: 
  317: %%--------------------------------------------------------------------
  318: %% GROUP group_message_publish
  319: %%--------------------------------------------------------------------
  320: 
  321: group_chat_message_sent_event(Config) ->
  322:     escalus:story(
  323:       Config, [{bob, 1}],
  324:       fun(Bob) ->
  325:               %% GIVEN
  326:               Room = ?config(room, Config),
  327:               RoomAddr = muc_helper:room_address(Room),
  328:               BobJID = client_lower_short_jid(Bob),
  329:               BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID),
  330:               listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config),
  331:               %% WHEN users chat
  332:               escalus:send(Bob,
  333:                            muc_helper:stanza_muc_enter_room(Room, nick(Bob))),
  334: 
  335:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr,
  336:                                                             <<"Hi there!">>)),
  337: 
  338:               %% THEN  wait for chat message sent events events from Rabbit.
  339:               ?assertReceivedMatch({#'basic.deliver'{
  340:                                        routing_key = BobGroupChatMsgSentRK},
  341:                                     #amqp_msg{}}, timer:seconds(5))
  342:       end).
  343: 
  344: group_chat_message_received_event(Config) ->
  345:     escalus:story(
  346:       Config, [{bob, 1}, {alice, 1}],
  347:       fun(Bob, Alice) ->
  348:               %% GIVEN
  349:               Room = ?config(room, Config),
  350:               RoomAddr = muc_helper:room_address(Room),
  351:               AliceJID = client_lower_short_jid(Alice),
  352:               AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID),
  353:               listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID],
  354:                                                                Config),
  355:               %% WHEN users chat
  356:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room,
  357:                                                                    nick(Alice))),
  358:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room,
  359:                                                                  nick(Bob))),
  360: 
  361:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr,
  362:                                                             <<"Hi there!">>)),
  363: 
  364:               %% THEN  wait for chat message received events events from Rabbit.
  365:               ?assertReceivedMatch({#'basic.deliver'{
  366:                                        routing_key = AliceGroupChatMsgRecvRK},
  367:                                     #amqp_msg{}}, timer:seconds(5))
  368:       end).
  369: 
  370: group_chat_message_sent_event_properly_formatted(Config) ->
  371:     escalus:story(
  372:       Config, [{bob, 1}],
  373:       fun(Bob) ->
  374:               %% GIVEN
  375:               Room = ?config(room, Config),
  376:               RoomAddr = muc_helper:room_address(Room),
  377:               BobJID = client_lower_short_jid(Bob),
  378:               BobFullJID = client_lower_full_jid(Bob),
  379:               BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID),
  380:               Message = <<"Hi there!">>,
  381:               listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config),
  382:               %% WHEN a user chat
  383:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room,
  384:                                                                  nick(Bob))),
  385: 
  386:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)),
  387:               %% THEN
  388:               ?assertMatch(#{<<"from_user_id">> := BobFullJID,
  389:                              <<"to_user_id">> := RoomAddr,
  390:                              <<"message">> := Message},
  391:                            get_decoded_message_from_rabbit(BobGroupChatMsgSentRK))
  392:       end).
  393: 
  394: group_chat_message_received_event_properly_formatted(Config) ->
  395:     escalus:story(
  396:       Config, [{bob, 1}, {alice, 1}],
  397:       fun(Bob, Alice) ->
  398:               %% GIVEN basic variables
  399:               Room = ?config(room, Config),
  400:               RoomAddr = muc_helper:room_address(Room),
  401:               BobRoomJID = user_room_jid(RoomAddr, Bob),
  402:               AliceJID = client_lower_short_jid(Alice),
  403:               AliceFullJID = client_lower_full_jid(Alice),
  404:               AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID),
  405:               Message = <<"Hi there!">>,
  406:               %% GIVEN users in room
  407:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, nick(Alice))),
  408:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, nick(Bob))),
  409:               % wait for all room stanzas to be processed
  410:               escalus:wait_for_stanzas(Alice, 3),
  411:               escalus:wait_for_stanzas(Bob, 3),
  412:               %% GIVEN Room subscription to Rabbit
  413:               % We subscribe to RMQ now and not earlier to avoid messages other
  414:               % than the one we are testing, `Message` from Bob to Room, like
  415:               % for example affiliations and the like.
  416:               listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  417:               %% WHEN users chat
  418:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)),
  419:               %% THEN
  420:               ?assertMatch(#{<<"from_user_id">> := BobRoomJID,
  421:                              <<"to_user_id">> := AliceFullJID,
  422:                              <<"message">> := Message},
  423:                            get_decoded_message_from_rabbit(AliceGroupChatMsgRecvRK))
  424:       end).
  425: 
  426: %%--------------------------------------------------------------------
  427: %% Test helpers
  428: %%--------------------------------------------------------------------
  429: 
  430: -spec connect_to_rabbit() -> proplists:proplist().
  431: connect_to_rabbit() ->
  432:     {ok, Connection} =
  433:         amqp_connection:start(#amqp_params_network{}),
  434:     {ok, Channel} = amqp_connection:open_channel(Connection),
  435:     [{rabbit_connection, Connection}, {rabbit_channel, Channel}].
  436: 
  437: -spec close_rabbit_connection(Config :: proplists:proplist()) -> ok | term().
  438: close_rabbit_connection(Config) ->
  439:     Connection = proplists:get_value(rabbit_connection, Config),
  440:     Channel = proplists:get_value(rabbit_channel, Config),
  441:     amqp_channel:close(Channel),
  442:     amqp_connection:close(Connection).
  443: 
  444: -spec listen_to_presence_events_from_rabbit(JIDs :: [binary()],
  445:                                             Config :: proplists:proplist()) ->
  446:     ok | term().
  447: listen_to_presence_events_from_rabbit(JIDs, Config) ->
  448:     QueueBindings = presence_bindings(?QUEUE_NAME, JIDs),
  449:     listen_to_events_from_rabbit(QueueBindings, Config).
  450: 
  451: -spec listen_to_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
  452:                                                  Config :: proplists:proplist()) ->
  453:     ok | term().
  454: listen_to_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
  455:     QueueBindings = chat_msg_sent_bindings(?QUEUE_NAME, JIDs),
  456:     listen_to_events_from_rabbit(QueueBindings, Config).
  457: 
  458: -spec listen_to_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
  459:                                                  Config :: proplists:proplist()) ->
  460:     ok | term().
  461: listen_to_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
  462:     QueueBindings = chat_msg_recv_bindings(?QUEUE_NAME, JIDs),
  463:     listen_to_events_from_rabbit(QueueBindings, Config).
  464: 
  465: -spec listen_to_group_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
  466:                                                        Config :: proplists:proplist()) ->
  467:     ok | term().
  468: listen_to_group_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
  469:     QueueBindings = group_chat_msg_sent_bindings(?QUEUE_NAME, JIDs),
  470:     listen_to_events_from_rabbit(QueueBindings, Config).
  471: 
  472: -spec listen_to_group_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
  473:                                                        Config :: proplists:proplist()) ->
  474:     ok | term().
  475: listen_to_group_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
  476:     QueueBindings = group_chat_msg_recv_bindings(?QUEUE_NAME, JIDs),
  477:     listen_to_events_from_rabbit(QueueBindings, Config).
  478: 
  479: -spec listen_to_events_from_rabbit(QueueBindings :: [rabbit_binding()],
  480:                                    Config :: proplists:proplist()) ->
  481:     ok | term().
  482: listen_to_events_from_rabbit(QueueBindings, Config) ->
  483:     Connection = proplists:get_value(rabbit_connection, Config),
  484:     Channel = proplists:get_value(rabbit_channel, Config),
  485:     declare_temporary_rabbit_queue(Channel, ?QUEUE_NAME),
  486:     wait_for_exchanges_to_be_created(Connection,
  487:                                      [{?PRESENCE_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
  488:                                       {?CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
  489:                                       {?GROUP_CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}]),
  490:     bind_queues_to_exchanges(Channel, QueueBindings),
  491:     subscribe_to_rabbit_queue(Channel, ?QUEUE_NAME).
  492: 
  493: -spec wait_for_exchanges_to_be_created(Connection :: pid(),
  494:                                        Exchanges :: [binary()]) -> pid().
  495: wait_for_exchanges_to_be_created(Connection, Exchanges) ->
  496:     [ensure_exchange_present(Connection, Exchange) || Exchange <- Exchanges],
  497:     ok.
  498: 
  499: -spec declare_temporary_rabbit_queue(Channel :: pid(), Queue :: binary()) -> binary().
  500: declare_temporary_rabbit_queue(Channel, Queue) ->
  501:     #'queue.declare_ok'{} =
  502:         amqp_channel:call(Channel, #'queue.declare'{queue = Queue,
  503:                                                     exclusive = true}).
  504: 
  505: -spec presence_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  506:     [rabbit_binding()].
  507: presence_bindings(Queue, JIDs) ->
  508:     [{Queue, ?PRESENCE_EXCHANGE, JID} || JID <- JIDs].
  509: 
  510: -spec chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  511:     [rabbit_binding()].
  512: chat_msg_sent_bindings(Queue, JIDs) ->
  513:     [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_sent_rk(JID)} || JID <- JIDs].
  514: 
  515: -spec chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  516:     [rabbit_binding()].
  517: chat_msg_recv_bindings(Queue, JIDs) ->
  518:     [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_recv_rk(JID)} || JID <- JIDs].
  519: 
  520: -spec group_chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  521:     [rabbit_binding()].
  522: group_chat_msg_sent_bindings(Queue, JIDs) ->
  523:     [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_sent_rk(JID)}
  524:      || JID <- JIDs].
  525: 
  526: -spec group_chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  527:     [rabbit_binding()].
  528: group_chat_msg_recv_bindings(Queue, JIDs) ->
  529:     [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_recv_rk(JID)}
  530:      || JID <- JIDs].
  531: 
  532: -spec bind_queues_to_exchanges(Channel :: pid(),
  533:                                Bindings :: [rabbit_binding()]) ->
  534:     [amqp_client:amqp_method() | ok | blocked | closing].
  535: bind_queues_to_exchanges(Channel, Bindings) ->
  536:     [bind_queue_to_exchange(Channel, Binding) || Binding <- Bindings].
  537: 
  538: -spec bind_queue_to_exchange(Channel :: pid(), rabbit_binding()) ->
  539:     amqp_client:amqp_method() | ok | blocked | closing.
  540: bind_queue_to_exchange(Channel, {Queue, Exchange, RoutingKey}) ->
  541:     #'queue.bind_ok'{} =
  542:         amqp_channel:call(Channel, #'queue.bind'{exchange = Exchange,
  543:                                                  routing_key = RoutingKey,
  544:                                                  queue = Queue}).
  545: 
  546: -spec ensure_exchange_present(Connection :: pid(), Exchange :: binary()) ->
  547:                                      {ok, true} | {timeout, any()}.
  548: ensure_exchange_present(Connection, Exchange) ->
  549:     Opts = #{time_left => ?WAIT_FOR_EXCHANGE_INTERVAL * ?IF_EXCHANGE_EXISTS_RETRIES / 1000,
  550:              sleep_time => ?WAIT_FOR_EXCHANGE_INTERVAL},
  551:     case mongoose_helper:wait_until(fun() ->
  552:                                             is_exchange_present(Connection,
  553:                                                                 Exchange)
  554:                                     end, true, Opts) of
  555:         {ok, true} -> true;
  556:         {timeout, _} ->
  557:             throw(io_lib:format("Exchange has not been created, exchange=~p",
  558:                                 [Exchange]))
  559:     end.
  560: 
  561: -spec is_exchange_present(Connection :: pid(), Exchange :: binary()) -> boolean().
  562: is_exchange_present(Connection, {ExName, ExType}) ->
  563:     {ok, Channel} = amqp_connection:open_channel(Connection),
  564:     try amqp_channel:call(Channel, #'exchange.declare'{exchange = ExName,
  565:                                                        type = ExType,
  566:                                                        %% this option allows to
  567:                                                        %% check if an exchange exists
  568:                                                        passive = true}) of
  569:         {'exchange.declare_ok'} -> true
  570:     catch
  571:         _Error:_Reason -> false
  572:     end.
  573: 
  574: -spec subscribe_to_rabbit_queue(Channel :: pid(), Queue :: binary()) -> ok.
  575: subscribe_to_rabbit_queue(Channel, Queue) ->
  576:     amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
  577:                                                      no_ack = true}, self()),
  578:     receive
  579:         #'basic.consume_ok'{} -> ok
  580:     after
  581:         5000 ->
  582:             throw(io_lib:format("Timeout when subscribing to queue,"
  583:                                 "channel=~p, queue=~p", [Channel, Queue]))
  584:     end.
  585: 
  586: -spec send_presence_stanzas(Users :: [binary()], NumOfMsgs :: non_neg_integer())
  587:                            -> [[ok]] | term().
  588: send_presence_stanzas(Users, NumOfMsgs) ->
  589:     [send_presence_stanza(User, NumOfMsgs) || User <- Users].
  590: 
  591: -spec send_presence_stanza(Users :: [binary()], NumOfMsgs :: non_neg_integer())
  592:                           -> [ok] | term().
  593: send_presence_stanza(User, NumOfMsgs) ->
  594:     [escalus:send(User, escalus_stanza:presence(make_pres_type(X)))
  595:      || X <- lists:seq(1, NumOfMsgs)].
  596: 
  597: -spec get_decoded_message_from_rabbit(RoutingKey :: binary()) ->
  598:     map() | no_return().
  599: get_decoded_message_from_rabbit(RoutingKey) ->
  600:     receive
  601:         {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} ->
  602:             jiffy:decode(Msg, [return_maps])
  603:     after
  604:         5000 -> ct:fail(io_lib:format("Timeout when decoding message, rk=~p",
  605:                                    [RoutingKey]))
  606:     end.
  607: 
  608: %%--------------------------------------------------------------------
  609: %% Utils
  610: %%--------------------------------------------------------------------
  611: 
  612: start_rabbit_wpool(Host) ->
  613:     start_rabbit_wpool(Host, ?WPOOL_CFG).
  614: 
  615: start_rabbit_wpool(Host, WpoolConfig) ->
  616:     rpc(mim(), mongoose_wpool, ensure_started, []),
  617:     Pool = config([outgoing_pools, rabbit, event_pusher], WpoolConfig),
  618:     [{ok, _Pid}] = rpc(mim(), mongoose_wpool, start_configured_pools, [[Pool], [Host]]).
  619: 
  620: stop_rabbit_wpool({Pool, Host, Tag}) ->
  621:     rpc(mim(), mongoose_wpool, stop, [Pool, Host, Tag]);
  622: stop_rabbit_wpool(Host)->
  623:     stop_rabbit_wpool({rabbit, Host, event_pusher}).
  624: 
  625: delete_exchanges() ->
  626:     ConnConf = connect_to_rabbit(),
  627:     Channel  = proplists:get_value(rabbit_channel, ConnConf),
  628:     [amqp_channel:call(Channel, #'exchange.delete'{exchange = Ex})
  629:      || Ex <- [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE]],
  630:     close_rabbit_connection(ConnConf).
  631: 
  632: make_pres_type(X) when X rem 2 == 0 ->
  633:     <<"available">>;
  634: make_pres_type(_) ->
  635:     <<"unavailable">>.
  636: 
  637: chat_msg_sent_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_SENT_TOPIC).
  638: 
  639: chat_msg_recv_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_RECV_TOPIC).
  640: 
  641: group_chat_msg_sent_rk(JID) ->
  642:     user_topic_routing_key(JID, ?GROUP_CHAT_MSG_SENT_TOPIC).
  643: 
  644: group_chat_msg_recv_rk(JID) ->
  645:     user_topic_routing_key(JID, ?GROUP_CHAT_MSG_RECV_TOPIC).
  646: 
  647: user_topic_routing_key(JID, Topic) -> <<JID/binary, ".", Topic/binary>>.
  648: 
  649: client_lower_short_jid(Client) ->
  650:     escalus_utils:jid_to_lower(escalus_client:short_jid(Client)).
  651: 
  652: client_lower_full_jid(Client) ->
  653:     escalus_utils:jid_to_lower(escalus_client:full_jid(Client)).
  654: 
  655: nick(User) -> escalus_utils:get_username(User).
  656: 
  657: maybe_prepare_muc(TestCase, Config) when
  658:       TestCase == group_chat_message_sent_event orelse
  659:       TestCase == group_chat_message_received_event orelse
  660:       TestCase == group_chat_message_sent_event_properly_formatted orelse
  661:       TestCase == group_chat_message_received_event_properly_formatted ->
  662:     prepare_muc(Config);
  663: maybe_prepare_muc(_, Config) -> Config.
  664: 
  665: maybe_cleanup_muc(TestCase, Config) when
  666:       TestCase == group_chat_message_sent_event orelse
  667:       TestCase == group_chat_message_received_event orelse
  668:       TestCase == group_chat_message_sent_event_properly_formatted orelse
  669:       TestCase == group_chat_message_received_event_properly_formatted ->
  670:     cleanup_muc(Config);
  671: maybe_cleanup_muc(_, _) -> ok.
  672: 
  673: prepare_muc(Config) ->
  674:     [User | _] = ?config(escalus_users, Config),
  675:     muc_helper:start_room(Config, User, <<"muc_publish">>, <<"user_nick">>,
  676:                           [{persistent, true},
  677:                            {anonymous, false}]).
  678: 
  679: cleanup_muc(Config) ->
  680:     muc_helper:destroy_room(Config).
  681: 
  682: user_room_jid(RoomJID, UserJID) ->
  683:     Nick = nick(UserJID),
  684:     <<RoomJID/binary, "/", Nick/binary>>.
  685: 
  686: is_rabbitmq_available() ->
  687:     try amqp_connection:start(#amqp_params_network{}) of
  688:         {ok, Conn} ->
  689:             amqp_connection:close(Conn),
  690:             true;
  691:         {error, econnrefused} ->
  692:             false
  693:     catch
  694:         _Err ->
  695:             false
  696:     end.