1: %%%-------------------------------------------------------------------
    2: %%% @author Kacper Mentel <kacper.mentel@erlang-solutions.com>
    3: %%% @copyright (C) 2018, Kacper Mentel
    4: %%% @doc
    5: %%% Tests for `mod_event_pusher` RabbitMQ backend (`mod_event_pusher_rabbit`).
    6: %%% @end
    7: %%%-------------------------------------------------------------------
    8: -module(mod_event_pusher_rabbit_SUITE).
    9: 
   10: -include_lib("escalus/include/escalus.hrl").
   11: -include_lib("common_test/include/ct.hrl").
   12: -include_lib("eunit/include/eunit.hrl").
   13: -include_lib("escalus/include/escalus_xmlns.hrl").
   14: -include_lib("exml/include/exml.hrl").
   15: -include_lib("amqp_client/include/amqp_client.hrl").
   16: 
   17: -include("assert_received_match.hrl").
   18: 
   19: -import(distributed_helper, [mim/0,
   20:                              rpc/4]).
   21: 
   22: -import(domain_helper, [domain/0]).
   23: 
   24: -export([suite/0, all/0, groups/0]).
   25: -export([init_per_suite/1, end_per_suite/1,
   26:          init_per_group/2, end_per_group/2,
   27:          init_per_testcase/2, end_per_testcase/2]).
   28: 
   29: -export([rabbit_pool_starts_with_default_config/1,
   30:          exchanges_are_created_on_module_startup/1]).
   31: -export([connected_users_push_presence_events_when_change_status/1,
   32:          presence_messages_are_properly_formatted/1]).
   33: -export([chat_message_sent_event/1,
   34:          chat_message_sent_event_properly_formatted/1,
   35:          chat_message_received_event/1,
   36:          chat_message_received_event_properly_formatted/1]).
   37: -export([group_chat_message_sent_event/1,
   38:          group_chat_message_sent_event_properly_formatted/1,
   39:          group_chat_message_received_event/1,
   40:          group_chat_message_received_event_properly_formatted/1]).
   41: 
   42: -define(QUEUE_NAME, <<"test_queue">>).
   43: -define(DEFAULT_EXCHANGE_TYPE, <<"topic">>).
   44: -define(PRESENCE_EXCHANGE, <<"custom_presence_exchange">>).
   45: -define(CHAT_MSG_EXCHANGE, <<"custom_chat_msg_exchange">>).
   46: -define(GROUP_CHAT_MSG_EXCHANGE, <<"custom_group_chat_msg_exchange">>).
   47: -define(CHAT_MSG_SENT_TOPIC, <<"custom_chat_msg_sent_topic">>).
   48: -define(CHAT_MSG_RECV_TOPIC, <<"custom_chat_msg_recv_topic">>).
   49: -define(GROUP_CHAT_MSG_SENT_TOPIC, <<"custom_group_chat_msg_sent_topic">>).
   50: -define(GROUP_CHAT_MSG_RECV_TOPIC, <<"custom_group_chat_msg_recv_topic">>).
   51: -define(MOD_EVENT_PUSHER_RABBIT_CFG,
   52:         [{presence_exchange, [{name, ?PRESENCE_EXCHANGE}]},
   53:          {chat_msg_exchange, [{name, ?CHAT_MSG_EXCHANGE},
   54:                               {sent_topic, ?CHAT_MSG_SENT_TOPIC},
   55:                               {recv_topic, ?CHAT_MSG_RECV_TOPIC}]},
   56:          {groupchat_msg_exchange, [{name, ?GROUP_CHAT_MSG_EXCHANGE},
   57:                                    {sent_topic, ?GROUP_CHAT_MSG_SENT_TOPIC},
   58:                                    {recv_topic, ?GROUP_CHAT_MSG_RECV_TOPIC}]}
   59:         ]).
   60: -define(MOD_EVENT_PUSHER_CFG, [{backends,
   61:                                 [{rabbit, ?MOD_EVENT_PUSHER_RABBIT_CFG}]}]).
   62: -define(WPOOL_CFG, {rabbit, host, event_pusher,
   63:                     [{workers, 20}],
   64:                     [%% enables publisher one-to-one confirms
   65:                      %% disabled by default
   66:                      %% {confirms_enabled, true},
   67:                      {amqp_host, "localhost"},
   68:                      {amqp_port, 5672},
   69:                      {amqp_username, "guest"},
   70:                      {amqp_password, "guest"}
   71:                     ]}).
   72: -define(IF_EXCHANGE_EXISTS_RETRIES, 30).
   73: -define(WAIT_FOR_EXCHANGE_INTERVAL, 100). % ms
   74: 
   75: -type rabbit_binding() :: {Queue :: binary(),
   76:                            Exchange :: binary(),
   77:                            RoutingKey :: binary()}.
   78: 
   79: %%--------------------------------------------------------------------
   80: %% Suite configuration
   81: %%--------------------------------------------------------------------
   82: 
   83: all() ->
   84:     [
   85:      {group, initialization_on_startup},
   86:      {group, presence_status_publish},
   87:      {group, chat_message_publish},
   88:      {group, group_chat_message_publish}
   89:     ].
   90: 
   91: groups() ->
   92:     G = [
   93:          {initialization_on_startup, [],
   94:           [
   95:            rabbit_pool_starts_with_default_config,
   96:            exchanges_are_created_on_module_startup
   97:           ]},
   98:          {presence_status_publish, [],
   99:           [
  100:            connected_users_push_presence_events_when_change_status,
  101:            presence_messages_are_properly_formatted
  102:           ]},
  103:          {chat_message_publish, [],
  104:           [
  105:            chat_message_sent_event,
  106:            chat_message_sent_event_properly_formatted,
  107:            chat_message_received_event,
  108:            chat_message_received_event_properly_formatted
  109:           ]},
  110:          {group_chat_message_publish, [],
  111:           [
  112:            group_chat_message_sent_event,
  113:            group_chat_message_sent_event_properly_formatted,
  114:            group_chat_message_received_event,
  115:            group_chat_message_received_event_properly_formatted
  116:           ]}
  117:         ],
  118:     ct_helper:repeat_all_until_all_ok(G).
  119: 
  120: suite() ->
  121:     escalus:suite().
  122: 
  123: %%--------------------------------------------------------------------
  124: %% Init & teardown
  125: %%--------------------------------------------------------------------
  126: 
  127: init_per_suite(Config) ->
  128:     start_rabbit_wpool(domain()),
  129:     {ok, _} = application:ensure_all_started(amqp_client),
  130:     case is_rabbitmq_available() of
  131:         true ->
  132:             muc_helper:load_muc(),
  133:             escalus:init_per_suite(Config);
  134:         false ->
  135:             {skip, "RabbitMQ server is not available on default port."}
  136:     end.
  137: 
  138: end_per_suite(Config) ->
  139:     stop_rabbit_wpool(domain()),
  140:     escalus_fresh:clean(),
  141:     muc_helper:unload_muc(),
  142:     escalus:end_per_suite(Config).
  143: 
  144: init_per_group(initialization_on_startup, Config) ->
  145:     Config;
  146: init_per_group(_, Config0) ->
  147:     Domain = domain(),
  148:     Config = dynamic_modules:save_modules(Domain, Config0),
  149:     dynamic_modules:ensure_modules(Domain,
  150:                                    [{mod_event_pusher, ?MOD_EVENT_PUSHER_CFG}]),
  151:     Config.
  152: 
  153: end_per_group(initialization_on_startup, Config) ->
  154:     Config;
  155: end_per_group(_, Config) ->
  156:     delete_exchanges(),
  157:     Domain = domain(),
  158:     dynamic_modules:stop(Domain, mod_event_pusher),
  159:     dynamic_modules:restore_modules(Config),
  160:     escalus:delete_users(Config, escalus:get_users([bob, alice])).
  161: 
  162: init_per_testcase(rabbit_pool_starts_with_default_config, Config) ->
  163:     Config;
  164: init_per_testcase(CaseName, Config0) ->
  165:     Config1 = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]),
  166:     Config2 = maybe_prepare_muc(CaseName, Config1),
  167:     Config = Config2 ++ connect_to_rabbit(),
  168:     escalus:init_per_testcase(CaseName, Config).
  169: 
  170: end_per_testcase(rabbit_pool_starts_with_default_config, Config) ->
  171:     Config;
  172: end_per_testcase(exchanges_are_created_on_module_startup, Config) ->
  173:     delete_exchanges(),
  174:     stop_mod_event_pusher_rabbit(),
  175:     close_rabbit_connection(Config),
  176:     Config;
  177: end_per_testcase(CaseName, Config) ->
  178:     maybe_cleanup_muc(CaseName, Config),
  179:     close_rabbit_connection(Config),
  180:     escalus:end_per_testcase(CaseName, Config).
  181: 
  182: 
  183: %%--------------------------------------------------------------------
  184: %% GROUP initialization_on_startup
  185: %%--------------------------------------------------------------------
  186: 
  187: rabbit_pool_starts_with_default_config(_Config) ->
  188:     %% GIVEN
  189:     Domain = domain(),
  190:     DefaultWpoolConfig = {rabbit, host, rabbit_event_pusher_default, [], []},
  191:     RabbitWpool = {rabbit, Domain, rabbit_event_pusher_default},
  192:     %% WHEN
  193:     start_rabbit_wpool(Domain, DefaultWpoolConfig),
  194:     %% THEN
  195:     Pools = rpc(mim(), mongoose_wpool, get_pools, []),
  196:     ?assertMatch(RabbitWpool,
  197:                  lists:keyfind(rabbit_event_pusher_default, 3, Pools)),
  198:     %% CLEANUP
  199:     stop_rabbit_wpool(RabbitWpool).
  200: 
  201: exchanges_are_created_on_module_startup(Config) ->
  202:     %% GIVEN
  203:     Connection = proplists:get_value(rabbit_connection, Config),
  204:     ExCustomType = <<"headers">>,
  205:     Exchanges = [{?PRESENCE_EXCHANGE, ExCustomType},
  206:                  {?CHAT_MSG_EXCHANGE, ExCustomType},
  207:                  {?GROUP_CHAT_MSG_EXCHANGE, ExCustomType}],
  208:     ConfigWithCustomExchangeType =
  209:         extend_config_with_exchange_type(ExCustomType),
  210:     %% WHEN
  211:     start_mod_event_pusher_rabbit(ConfigWithCustomExchangeType),
  212:     %% THEN exchanges are created
  213:     [?assert(ensure_exchange_present(Connection, Exchange))
  214:      || Exchange <- Exchanges].
  215: 
  216: %%--------------------------------------------------------------------
  217: %% GROUP presence_status_publish
  218: %%--------------------------------------------------------------------
  219: 
  220: connected_users_push_presence_events_when_change_status(Config) ->
  221:     escalus:story(
  222:       Config, [{bob, 1}],
  223:       fun(Bob) ->
  224:               %% GIVEN
  225:               BobJID = client_lower_short_jid(Bob),
  226:               listen_to_presence_events_from_rabbit([BobJID], Config),
  227:               %% WHEN users generate some traffic.
  228:               send_presence_stanzas([Bob], 1),
  229:               %% THEN  wait for presence events from Rabbit.
  230:               ?assertReceivedMatch({#'basic.deliver'{routing_key = BobJID},
  231:                                     #amqp_msg{}}, timer:seconds(5))
  232:       end).
  233: 
  234: presence_messages_are_properly_formatted(Config) ->
  235:     escalus:story(
  236:       Config, [{bob, 1}],
  237:       fun(Bob) ->
  238:               %% GIVEN
  239:               BobJID = client_lower_short_jid(Bob),
  240:               BobFullJID = client_lower_full_jid(Bob),
  241:               listen_to_presence_events_from_rabbit([BobJID], Config),
  242:               %% WHEN user logout
  243:               escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
  244:               %% THEN receive message
  245:               ?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false},
  246:                            get_decoded_message_from_rabbit(BobJID))
  247:       end).
  248: 
  249: %%--------------------------------------------------------------------
  250: %% GROUP chat_message_publish
  251: %%--------------------------------------------------------------------
  252: 
  253: chat_message_sent_event(Config) ->
  254:     escalus:story(
  255:       Config, [{bob, 1}, {alice, 1}],
  256:       fun(Bob, Alice) ->
  257:               %% GIVEN
  258:               BobJID = client_lower_short_jid(Bob),
  259:               BobChatMsgSentRK = chat_msg_sent_rk(BobJID),
  260:               listen_to_chat_msg_sent_events_from_rabbit([BobJID], Config),
  261:               %% WHEN users chat
  262:               escalus:send(Bob,
  263:                            escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)),
  264:               %% THEN  wait for chat message sent events events from Rabbit.
  265:               ?assertReceivedMatch({#'basic.deliver'{
  266:                                        routing_key = BobChatMsgSentRK},
  267:                                     #amqp_msg{}}, timer:seconds(5))
  268:       end).
  269: 
  270: chat_message_received_event(Config) ->
  271:     escalus:story(
  272:       Config, [{bob, 1}, {alice, 1}],
  273:       fun(Bob, Alice) ->
  274:               %% GIVEN
  275:               AliceJID = client_lower_short_jid(Alice),
  276:               AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID),
  277:               listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  278:               %% WHEN users chat
  279:               escalus:send(Bob,
  280:                            escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)),
  281:               escalus:wait_for_stanzas(Alice, 2),
  282:               %% THEN  wait for chat message received events events from
  283:               %% Rabbit.
  284:               ?assertReceivedMatch({#'basic.deliver'{
  285:                                        routing_key = AliceChatMsgRecvRK},
  286:                                     #amqp_msg{}}, timer:seconds(5))
  287:       end).
  288: 
  289: chat_message_sent_event_properly_formatted(Config) ->
  290:     escalus:story(
  291:       Config, [{bob, 1}, {alice, 1}],
  292:       fun(Bob, Alice) ->
  293:               %% GIVEN
  294:               AliceJID = client_lower_short_jid(Alice),
  295:               AliceFullJID = client_lower_full_jid(Alice),
  296:               BobFullJID = client_lower_full_jid(Bob),
  297:               AliceChatMsgSentRK = chat_msg_sent_rk(AliceJID),
  298:               Message = <<"Hi Bob!">>,
  299:               listen_to_chat_msg_sent_events_from_rabbit([AliceJID], Config),
  300:               %% WHEN users chat
  301:               escalus:send(Alice, escalus_stanza:chat_to(Bob, Message)),
  302:               %% THEN
  303:               ?assertMatch(#{<<"from_user_id">> := AliceFullJID,
  304:                              <<"to_user_id">> := BobFullJID,
  305:                              <<"message">> := Message},
  306:                            get_decoded_message_from_rabbit(AliceChatMsgSentRK))
  307:       end).
  308: 
  309: chat_message_received_event_properly_formatted(Config) ->
  310:     escalus:story(
  311:       Config, [{bob, 1}, {alice, 1}],
  312:       fun(Bob, Alice) ->
  313:               %% GIVEN
  314:               AliceJID = client_lower_short_jid(Alice),
  315:               AliceFullJID = client_lower_full_jid(Alice),
  316:               BobFullJID = client_lower_full_jid(Bob),
  317:               AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID),
  318:               Message = <<"Hi Alice!">>,
  319:               listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  320:               %% WHEN users chat
  321:               escalus:send(Bob, escalus_stanza:chat_to(Alice, Message)),
  322:               escalus:wait_for_stanzas(Alice, 2),
  323:               %% THEN
  324:               ?assertMatch(#{<<"from_user_id">> := BobFullJID,
  325:                              <<"to_user_id">> := AliceFullJID,
  326:                              <<"message">> := Message},
  327:                            get_decoded_message_from_rabbit(AliceChatMsgRecvRK))
  328:       end).
  329: 
  330: %%--------------------------------------------------------------------
  331: %% GROUP group_message_publish
  332: %%--------------------------------------------------------------------
  333: 
  334: group_chat_message_sent_event(Config) ->
  335:     escalus:story(
  336:       Config, [{bob, 1}],
  337:       fun(Bob) ->
  338:               %% GIVEN
  339:               Room = ?config(room, Config),
  340:               RoomAddr = muc_helper:room_address(Room),
  341:               BobJID = client_lower_short_jid(Bob),
  342:               BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID),
  343:               listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config),
  344:               %% WHEN users chat
  345:               escalus:send(Bob,
  346:                            muc_helper:stanza_muc_enter_room(Room, nick(Bob))),
  347: 
  348:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr,
  349:                                                             <<"Hi there!">>)),
  350: 
  351:               %% THEN  wait for chat message sent events events from Rabbit.
  352:               ?assertReceivedMatch({#'basic.deliver'{
  353:                                        routing_key = BobGroupChatMsgSentRK},
  354:                                     #amqp_msg{}}, timer:seconds(5))
  355:       end).
  356: 
  357: group_chat_message_received_event(Config) ->
  358:     escalus:story(
  359:       Config, [{bob, 1}, {alice, 1}],
  360:       fun(Bob, Alice) ->
  361:               %% GIVEN
  362:               Room = ?config(room, Config),
  363:               RoomAddr = muc_helper:room_address(Room),
  364:               AliceJID = client_lower_short_jid(Alice),
  365:               AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID),
  366:               listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID],
  367:                                                                Config),
  368:               %% WHEN users chat
  369:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room,
  370:                                                                    nick(Alice))),
  371:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room,
  372:                                                                  nick(Bob))),
  373: 
  374:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr,
  375:                                                             <<"Hi there!">>)),
  376: 
  377:               %% THEN  wait for chat message received events events from Rabbit.
  378:               ?assertReceivedMatch({#'basic.deliver'{
  379:                                        routing_key = AliceGroupChatMsgRecvRK},
  380:                                     #amqp_msg{}}, timer:seconds(5))
  381:       end).
  382: 
  383: group_chat_message_sent_event_properly_formatted(Config) ->
  384:     escalus:story(
  385:       Config, [{bob, 1}],
  386:       fun(Bob) ->
  387:               %% GIVEN
  388:               Room = ?config(room, Config),
  389:               RoomAddr = muc_helper:room_address(Room),
  390:               BobJID = client_lower_short_jid(Bob),
  391:               BobFullJID = client_lower_full_jid(Bob),
  392:               BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID),
  393:               Message = <<"Hi there!">>,
  394:               listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config),
  395:               %% WHEN a user chat
  396:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room,
  397:                                                                  nick(Bob))),
  398: 
  399:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)),
  400:               %% THEN
  401:               ?assertMatch(#{<<"from_user_id">> := BobFullJID,
  402:                              <<"to_user_id">> := RoomAddr,
  403:                              <<"message">> := Message},
  404:                            get_decoded_message_from_rabbit(BobGroupChatMsgSentRK))
  405:       end).
  406: 
  407: group_chat_message_received_event_properly_formatted(Config) ->
  408:     escalus:story(
  409:       Config, [{bob, 1}, {alice, 1}],
  410:       fun(Bob, Alice) ->
  411:               %% GIVEN basic variables
  412:               Room = ?config(room, Config),
  413:               RoomAddr = muc_helper:room_address(Room),
  414:               BobRoomJID = user_room_jid(RoomAddr, Bob),
  415:               AliceJID = client_lower_short_jid(Alice),
  416:               AliceFullJID = client_lower_full_jid(Alice),
  417:               AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID),
  418:               Message = <<"Hi there!">>,
  419:               %% GIVEN users in room
  420:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, nick(Alice))),
  421:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, nick(Bob))),
  422:               % wait for all room stanzas to be processed
  423:               escalus:wait_for_stanzas(Alice, 3),
  424:               escalus:wait_for_stanzas(Bob, 3),
  425:               %% GIVEN Room subscription to Rabbit
  426:               % We subscribe to RMQ now and not earlier to avoid messages other
  427:               % than the one we are testing, `Message` from Bob to Room, like
  428:               % for example affiliations and the like.
  429:               listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  430:               %% WHEN users chat
  431:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)),
  432:               %% THEN
  433:               ?assertMatch(#{<<"from_user_id">> := BobRoomJID,
  434:                              <<"to_user_id">> := AliceFullJID,
  435:                              <<"message">> := Message},
  436:                            get_decoded_message_from_rabbit(AliceGroupChatMsgRecvRK))
  437:       end).
  438: 
  439: %%--------------------------------------------------------------------
  440: %% Test helpers
  441: %%--------------------------------------------------------------------
  442: 
  443: -spec connect_to_rabbit() -> proplists:proplist().
  444: connect_to_rabbit() ->
  445:     {ok, Connection} =
  446:         amqp_connection:start(#amqp_params_network{}),
  447:     {ok, Channel} = amqp_connection:open_channel(Connection),
  448:     [{rabbit_connection, Connection}, {rabbit_channel, Channel}].
  449: 
  450: -spec close_rabbit_connection(Config :: proplists:proplist()) -> ok | term().
  451: close_rabbit_connection(Config) ->
  452:     Connection = proplists:get_value(rabbit_connection, Config),
  453:     Channel = proplists:get_value(rabbit_channel, Config),
  454:     amqp_channel:close(Channel),
  455:     amqp_connection:close(Connection).
  456: 
  457: -spec listen_to_presence_events_from_rabbit(JIDs :: [binary()],
  458:                                             Config :: proplists:proplist()) ->
  459:     ok | term().
  460: listen_to_presence_events_from_rabbit(JIDs, Config) ->
  461:     QueueBindings = presence_bindings(?QUEUE_NAME, JIDs),
  462:     listen_to_events_from_rabbit(QueueBindings, Config).
  463: 
  464: -spec listen_to_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
  465:                                                  Config :: proplists:proplist()) ->
  466:     ok | term().
  467: listen_to_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
  468:     QueueBindings = chat_msg_sent_bindings(?QUEUE_NAME, JIDs),
  469:     listen_to_events_from_rabbit(QueueBindings, Config).
  470: 
  471: -spec listen_to_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
  472:                                                  Config :: proplists:proplist()) ->
  473:     ok | term().
  474: listen_to_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
  475:     QueueBindings = chat_msg_recv_bindings(?QUEUE_NAME, JIDs),
  476:     listen_to_events_from_rabbit(QueueBindings, Config).
  477: 
  478: -spec listen_to_group_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
  479:                                                        Config :: proplists:proplist()) ->
  480:     ok | term().
  481: listen_to_group_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
  482:     QueueBindings = group_chat_msg_sent_bindings(?QUEUE_NAME, JIDs),
  483:     listen_to_events_from_rabbit(QueueBindings, Config).
  484: 
  485: -spec listen_to_group_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
  486:                                                        Config :: proplists:proplist()) ->
  487:     ok | term().
  488: listen_to_group_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
  489:     QueueBindings = group_chat_msg_recv_bindings(?QUEUE_NAME, JIDs),
  490:     listen_to_events_from_rabbit(QueueBindings, Config).
  491: 
  492: -spec listen_to_events_from_rabbit(QueueBindings :: [rabbit_binding()],
  493:                                    Config :: proplists:proplist()) ->
  494:     ok | term().
  495: listen_to_events_from_rabbit(QueueBindings, Config) ->
  496:     Connection = proplists:get_value(rabbit_connection, Config),
  497:     Channel = proplists:get_value(rabbit_channel, Config),
  498:     declare_temporary_rabbit_queue(Channel, ?QUEUE_NAME),
  499:     wait_for_exchanges_to_be_created(Connection,
  500:                                      [{?PRESENCE_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
  501:                                       {?CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
  502:                                       {?GROUP_CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}]),
  503:     bind_queues_to_exchanges(Channel, QueueBindings),
  504:     subscribe_to_rabbit_queue(Channel, ?QUEUE_NAME).
  505: 
  506: -spec wait_for_exchanges_to_be_created(Connection :: pid(),
  507:                                        Exchanges :: [binary()]) -> pid().
  508: wait_for_exchanges_to_be_created(Connection, Exchanges) ->
  509:     [ensure_exchange_present(Connection, Exchange) || Exchange <- Exchanges],
  510:     ok.
  511: 
  512: -spec declare_temporary_rabbit_queue(Channel :: pid(), Queue :: binary()) -> binary().
  513: declare_temporary_rabbit_queue(Channel, Queue) ->
  514:     #'queue.declare_ok'{} =
  515:         amqp_channel:call(Channel, #'queue.declare'{queue = Queue,
  516:                                                     exclusive = true}).
  517: 
  518: -spec presence_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  519:     [rabbit_binding()].
  520: presence_bindings(Queue, JIDs) ->
  521:     [{Queue, ?PRESENCE_EXCHANGE, JID} || JID <- JIDs].
  522: 
  523: -spec chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  524:     [rabbit_binding()].
  525: chat_msg_sent_bindings(Queue, JIDs) ->
  526:     [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_sent_rk(JID)} || JID <- JIDs].
  527: 
  528: -spec chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  529:     [rabbit_binding()].
  530: chat_msg_recv_bindings(Queue, JIDs) ->
  531:     [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_recv_rk(JID)} || JID <- JIDs].
  532: 
  533: -spec group_chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  534:     [rabbit_binding()].
  535: group_chat_msg_sent_bindings(Queue, JIDs) ->
  536:     [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_sent_rk(JID)}
  537:      || JID <- JIDs].
  538: 
  539: -spec group_chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  540:     [rabbit_binding()].
  541: group_chat_msg_recv_bindings(Queue, JIDs) ->
  542:     [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_recv_rk(JID)}
  543:      || JID <- JIDs].
  544: 
  545: -spec bind_queues_to_exchanges(Channel :: pid(),
  546:                                Bindings :: [rabbit_binding()]) ->
  547:     [amqp_client:amqp_method() | ok | blocked | closing].
  548: bind_queues_to_exchanges(Channel, Bindings) ->
  549:     [bind_queue_to_exchange(Channel, Binding) || Binding <- Bindings].
  550: 
  551: -spec bind_queue_to_exchange(Channel :: pid(), rabbit_binding()) ->
  552:     amqp_client:amqp_method() | ok | blocked | closing.
  553: bind_queue_to_exchange(Channel, {Queue, Exchange, RoutingKey}) ->
  554:     #'queue.bind_ok'{} =
  555:         amqp_channel:call(Channel, #'queue.bind'{exchange = Exchange,
  556:                                                  routing_key = RoutingKey,
  557:                                                  queue = Queue}).
  558: 
  559: -spec ensure_exchange_present(Connection :: pid(), Exchange :: binary()) ->
  560:                                      {ok, true} | {timeout, any()}.
  561: ensure_exchange_present(Connection, Exchange) ->
  562:     Opts = #{time_left => ?WAIT_FOR_EXCHANGE_INTERVAL * ?IF_EXCHANGE_EXISTS_RETRIES / 1000,
  563:              sleep_time => ?WAIT_FOR_EXCHANGE_INTERVAL},
  564:     case mongoose_helper:wait_until(fun() ->
  565:                                             is_exchange_present(Connection,
  566:                                                                 Exchange)
  567:                                     end, true, Opts) of
  568:         {ok, true} -> true;
  569:         {timeout, _} ->
  570:             throw(io_lib:format("Exchange has not been created, exchange=~p",
  571:                                 [Exchange]))
  572:     end.
  573: 
  574: -spec is_exchange_present(Connection :: pid(), Exchange :: binary()) -> boolean().
  575: is_exchange_present(Connection, {ExName, ExType}) ->
  576:     {ok, Channel} = amqp_connection:open_channel(Connection),
  577:     try amqp_channel:call(Channel, #'exchange.declare'{exchange = ExName,
  578:                                                        type = ExType,
  579:                                                        %% this option allows to
  580:                                                        %% check if an exchange exists
  581:                                                        passive = true}) of
  582:         {'exchange.declare_ok'} -> true
  583:     catch
  584:         _Error:_Reason -> false
  585:     end.
  586: 
  587: -spec subscribe_to_rabbit_queue(Channel :: pid(), Queue :: binary()) -> ok.
  588: subscribe_to_rabbit_queue(Channel, Queue) ->
  589:     amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
  590:                                                      no_ack = true}, self()),
  591:     receive
  592:         #'basic.consume_ok'{} -> ok
  593:     after
  594:         5000 ->
  595:             throw(io_lib:format("Timeout when subscribing to queue,"
  596:                                 "channel=~p, queue=~p", [Channel, Queue]))
  597:     end.
  598: 
  599: -spec send_presence_stanzas(Users :: [binary()], NumOfMsgs :: non_neg_integer())
  600:                            -> [[ok]] | term().
  601: send_presence_stanzas(Users, NumOfMsgs) ->
  602:     [send_presence_stanza(User, NumOfMsgs) || User <- Users].
  603: 
  604: -spec send_presence_stanza(Users :: [binary()], NumOfMsgs :: non_neg_integer())
  605:                           -> [ok] | term().
  606: send_presence_stanza(User, NumOfMsgs) ->
  607:     [escalus:send(User, escalus_stanza:presence(make_pres_type(X)))
  608:      || X <- lists:seq(1, NumOfMsgs)].
  609: 
  610: -spec get_decoded_message_from_rabbit(RoutingKey :: binary()) ->
  611:     map() | no_return().
  612: get_decoded_message_from_rabbit(RoutingKey) ->
  613:     receive
  614:         {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} ->
  615:             jiffy:decode(Msg, [return_maps])
  616:     after
  617:         5000 -> ct:fail(io_lib:format("Timeout when decoding message, rk=~p",
  618:                                    [RoutingKey]))
  619:     end.
  620: 
  621: %%--------------------------------------------------------------------
  622: %% Utils
  623: %%--------------------------------------------------------------------
  624: 
  625: start_mod_event_pusher_rabbit(Config) ->
  626:     dynamic_modules:start(domain(), mod_event_pusher_rabbit, Config).
  627: 
  628: stop_mod_event_pusher_rabbit() ->
  629:     dynamic_modules:stop(domain(), mod_event_pusher_rabbit).
  630: 
  631: start_rabbit_wpool(Host) ->
  632:     start_rabbit_wpool(Host, ?WPOOL_CFG).
  633: 
  634: start_rabbit_wpool(Host, WpoolConfig) ->
  635:     rpc(mim(), mongoose_wpool, ensure_started, []),
  636:     rpc(mim(), mongoose_wpool, start_configured_pools, [[WpoolConfig], [Host]]).
  637: 
  638: stop_rabbit_wpool({Pool, Host, Tag}) ->
  639:     rpc(mim(), mongoose_wpool, stop, [Pool, Host, Tag]);
  640: stop_rabbit_wpool(Host)->
  641:     stop_rabbit_wpool({rabbit, Host, event_pusher}).
  642: 
  643: delete_exchanges() ->
  644:     ConnConf = connect_to_rabbit(),
  645:     Channel  = proplists:get_value(rabbit_channel, ConnConf),
  646:     [amqp_channel:call(Channel, #'exchange.delete'{exchange = Ex})
  647:      || Ex <- [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE]],
  648:     close_rabbit_connection(ConnConf).
  649: 
  650: make_pres_type(X) when X rem 2 == 0 ->
  651:     <<"available">>;
  652: make_pres_type(_) ->
  653:     <<"unavailable">>.
  654: 
  655: chat_msg_sent_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_SENT_TOPIC).
  656: 
  657: chat_msg_recv_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_RECV_TOPIC).
  658: 
  659: group_chat_msg_sent_rk(JID) ->
  660:     user_topic_routing_key(JID, ?GROUP_CHAT_MSG_SENT_TOPIC).
  661: 
  662: group_chat_msg_recv_rk(JID) ->
  663:     user_topic_routing_key(JID, ?GROUP_CHAT_MSG_RECV_TOPIC).
  664: 
  665: user_topic_routing_key(JID, Topic) -> <<JID/binary, ".", Topic/binary>>.
  666: 
  667: client_lower_short_jid(Client) ->
  668:     escalus_utils:jid_to_lower(escalus_client:short_jid(Client)).
  669: 
  670: client_lower_full_jid(Client) ->
  671:     escalus_utils:jid_to_lower(escalus_client:full_jid(Client)).
  672: 
  673: nick(User) -> escalus_utils:get_username(User).
  674: 
  675: maybe_prepare_muc(TestCase, Config) when
  676:       TestCase == group_chat_message_sent_event orelse
  677:       TestCase == group_chat_message_received_event orelse
  678:       TestCase == group_chat_message_sent_event_properly_formatted orelse
  679:       TestCase == group_chat_message_received_event_properly_formatted ->
  680:     prepare_muc(Config);
  681: maybe_prepare_muc(_, Config) -> Config.
  682: 
  683: maybe_cleanup_muc(TestCase, Config) when
  684:       TestCase == group_chat_message_sent_event orelse
  685:       TestCase == group_chat_message_received_event orelse
  686:       TestCase == group_chat_message_sent_event_properly_formatted orelse
  687:       TestCase == group_chat_message_received_event_properly_formatted ->
  688:     cleanup_muc(Config);
  689: maybe_cleanup_muc(_, _) -> ok.
  690: 
  691: prepare_muc(Config) ->
  692:     [User | _] = ?config(escalus_users, Config),
  693:     muc_helper:start_room(Config, User, <<"muc_publish">>, <<"user_nick">>,
  694:                           [{persistent, true},
  695:                            {anonymous, false}]).
  696: 
  697: cleanup_muc(Config) ->
  698:     muc_helper:destroy_room(Config).
  699: 
  700: user_room_jid(RoomJID, UserJID) ->
  701:     Nick = nick(UserJID),
  702:     <<RoomJID/binary, "/", Nick/binary>>.
  703: 
  704: extend_config_with_exchange_type(ExType) ->
  705:     lists:map(fun({Ex, Opts}) when
  706:                         Ex == presence_exchange orelse
  707:                         Ex == chat_msg_exchange orelse
  708:                         Ex == groupchat_msg_exchange ->
  709:                       {Ex, Opts ++ [{type, ExType}]};
  710:                  (Other) -> Other
  711:               end, ?MOD_EVENT_PUSHER_RABBIT_CFG).
  712: 
  713: is_rabbitmq_available() ->
  714:     try amqp_connection:start(#amqp_params_network{}) of
  715:         {ok, Conn} ->
  716:             amqp_connection:close(Conn),
  717:             true;
  718:         {error, econnrefused} ->
  719:             false
  720:     catch
  721:         _Err ->
  722:             false
  723:     end.