1: %%%-------------------------------------------------------------------
    2: %%% @author Kacper Mentel <kacper.mentel@erlang-solutions.com>
    3: %%% @copyright (C) 2018, Kacper Mentel
    4: %%% @doc
    5: %%% Tests for `mod_event_pusher` RabbitMQ backend (`mod_event_pusher_rabbit`).
    6: %%% @end
    7: %%%-------------------------------------------------------------------
    8: -module(mod_event_pusher_rabbit_SUITE).
    9: 
   10: -include_lib("common_test/include/ct.hrl").
   11: -include_lib("eunit/include/eunit.hrl").
   12: -include_lib("amqp_client/include/amqp_client.hrl").
   13: 
   14: -include("assert_received_match.hrl").
   15: 
   16: -import(distributed_helper, [mim/0, rpc/4]).
   17: -import(domain_helper, [domain/0]).
   18: -import(config_parser_helper, [config/2]).
   19: 
   20: -export([suite/0, all/0, groups/0]).
   21: -export([init_per_suite/1, end_per_suite/1,
   22:          init_per_group/2, end_per_group/2,
   23:          init_per_testcase/2, end_per_testcase/2]).
   24: 
   25: -export([rabbit_pool_starts_with_default_config/1,
   26:          exchanges_are_created_on_module_startup/1]).
   27: -export([connected_users_push_presence_events_when_change_status/1,
   28:          presence_messages_are_properly_formatted/1]).
   29: -export([chat_message_sent_event/1,
   30:          chat_message_sent_event_properly_formatted/1,
   31:          chat_message_received_event/1,
   32:          chat_message_received_event_properly_formatted/1]).
   33: -export([group_chat_message_sent_event/1,
   34:          group_chat_message_sent_event_properly_formatted/1,
   35:          group_chat_message_received_event/1,
   36:          group_chat_message_received_event_properly_formatted/1]).
   37: 
%% Name of the queue declared by the tests to receive pushed events.
-define(QUEUE_NAME, <<"test_queue">>).
%% Exchange type expected by the listening helpers when no custom type is configured.
-define(DEFAULT_EXCHANGE_TYPE, <<"topic">>).
%% Custom exchange names passed to mod_event_pusher's rabbit backend.
-define(PRESENCE_EXCHANGE, <<"custom_presence_exchange">>).
-define(CHAT_MSG_EXCHANGE, <<"custom_chat_msg_exchange">>).
-define(GROUP_CHAT_MSG_EXCHANGE, <<"custom_group_chat_msg_exchange">>).
%% Custom topics; routing keys are built as <<JID, ".", Topic>>.
-define(CHAT_MSG_SENT_TOPIC, <<"custom_chat_msg_sent_topic">>).
-define(CHAT_MSG_RECV_TOPIC, <<"custom_chat_msg_recv_topic">>).
-define(GROUP_CHAT_MSG_SENT_TOPIC, <<"custom_group_chat_msg_sent_topic">>).
-define(GROUP_CHAT_MSG_RECV_TOPIC, <<"custom_group_chat_msg_recv_topic">>).
%% Worker pool configuration used by start_rabbit_wpool/1.
-define(WPOOL_CFG, #{type => rabbit, scope => host, tag => event_pusher,
                     opts => #{workers => 20, strategy => best_worker, call_timeout => 5000},
                     conn_opts => #{confirms_enabled => false,
                                    amqp_host => "localhost",
                                    amqp_port => 5672,
                                    amqp_username => "guest",
                                    amqp_password => "guest",
                                    max_worker_queue_len => 1000}
}).
%% Polling parameters used by ensure_exchange_present/2.
-define(IF_EXCHANGE_EXISTS_RETRIES, 30).
-define(WAIT_FOR_EXCHANGE_INTERVAL, 100). % ms
   58: 
%% A queue-to-exchange binding: which queue is bound to which exchange
%% under which routing key.
-type rabbit_binding() :: {Queue :: binary(),
                           Exchange :: binary(),
                           RoutingKey :: binary()}.
   62: 
   63: %%--------------------------------------------------------------------
   64: %% Suite configuration
   65: %%--------------------------------------------------------------------
   66: 
   67: all() ->
   68:     [
   69:      {group, pool_startup},
   70:      {group, module_startup},
   71:      {group, presence_status_publish},
   72:      {group, chat_message_publish},
   73:      {group, group_chat_message_publish}
   74:     ].
   75: 
   76: groups() ->
   77:     G = [
   78:          {pool_startup, [],
   79:           [
   80:            rabbit_pool_starts_with_default_config
   81:           ]},
   82:          {module_startup, [],
   83:           [
   84:            exchanges_are_created_on_module_startup
   85:           ]},
   86:          {presence_status_publish, [],
   87:           [
   88:            connected_users_push_presence_events_when_change_status,
   89:            presence_messages_are_properly_formatted
   90:           ]},
   91:          {chat_message_publish, [],
   92:           [
   93:            chat_message_sent_event,
   94:            chat_message_sent_event_properly_formatted,
   95:            chat_message_received_event,
   96:            chat_message_received_event_properly_formatted
   97:           ]},
   98:          {group_chat_message_publish, [],
   99:           [
  100:            group_chat_message_sent_event,
  101:            group_chat_message_sent_event_properly_formatted,
  102:            group_chat_message_received_event,
  103:            group_chat_message_received_event_properly_formatted
  104:           ]}
  105:         ],
  106:     ct_helper:repeat_all_until_all_ok(G).
  107: 
  108: suite() ->
  109:     escalus:suite().
  110: 
  111: %%--------------------------------------------------------------------
  112: %% Init & teardown
  113: %%--------------------------------------------------------------------
  114: 
  115: init_per_suite(Config) ->
  116:     start_rabbit_wpool(domain()),
  117:     {ok, _} = application:ensure_all_started(amqp_client),
  118:     case is_rabbitmq_available() of
  119:         true ->
  120:             muc_helper:load_muc(),
  121:             escalus:init_per_suite(Config);
  122:         false ->
  123:             {skip, "RabbitMQ server is not available on default port."}
  124:     end.
  125: 
  126: end_per_suite(Config) ->
  127:     stop_rabbit_wpool(domain()),
  128:     escalus_fresh:clean(),
  129:     muc_helper:unload_muc(),
  130:     escalus:end_per_suite(Config).
  131: 
  132: init_per_group(GroupName, Config0) ->
  133:     Domain = domain(),
  134:     Config = dynamic_modules:save_modules(Domain, Config0),
  135:     dynamic_modules:ensure_modules(Domain, required_modules(GroupName)),
  136:     Config.
  137: 
  138: required_modules(pool_startup) ->
  139:     [{mod_event_pusher, stopped}];
  140: required_modules(GroupName) ->
  141:     [{mod_event_pusher, #{rabbit => mod_event_pusher_rabbit_opts(GroupName)}}].
  142: 
  143: mod_event_pusher_rabbit_opts(GroupName) ->
  144:     ExtraOpts = extra_exchange_opts(GroupName),
  145:     maps:map(fun(Key, Opts) ->
  146:                      config([modules, mod_event_pusher, rabbit, Key], maps:merge(Opts, ExtraOpts))
  147:              end, basic_exchanges()).
  148: 
  149: basic_exchanges() ->
  150:     #{presence_exchange => #{name => ?PRESENCE_EXCHANGE},
  151:       chat_msg_exchange => #{name => ?CHAT_MSG_EXCHANGE,
  152:                              sent_topic => ?CHAT_MSG_SENT_TOPIC,
  153:                              recv_topic => ?CHAT_MSG_RECV_TOPIC},
  154:       groupchat_msg_exchange => #{name => ?GROUP_CHAT_MSG_EXCHANGE,
  155:                                   sent_topic => ?GROUP_CHAT_MSG_SENT_TOPIC,
  156:                                   recv_topic => ?GROUP_CHAT_MSG_RECV_TOPIC}
  157:      }.
  158: 
  159: extra_exchange_opts(module_startup) -> #{type => <<"headers">>};
  160: extra_exchange_opts(_) -> #{}.
  161: 
  162: end_per_group(_, Config) ->
  163:     delete_exchanges(),
  164:     dynamic_modules:restore_modules(Config).
  165: 
  166: init_per_testcase(rabbit_pool_starts_with_default_config, Config) ->
  167:     Config;
  168: init_per_testcase(CaseName, Config0) ->
  169:     Config1 = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]),
  170:     Config2 = maybe_prepare_muc(CaseName, Config1),
  171:     Config = Config2 ++ connect_to_rabbit(),
  172:     escalus:init_per_testcase(CaseName, Config).
  173: 
  174: end_per_testcase(rabbit_pool_starts_with_default_config, _Config) ->
  175:     ok;
  176: end_per_testcase(CaseName, Config) ->
  177:     maybe_cleanup_muc(CaseName, Config),
  178:     close_rabbit_connection(Config),
  179:     escalus:end_per_testcase(CaseName, Config).
  180: 
  181: %%--------------------------------------------------------------------
%% GROUPS pool_startup, module_startup
  183: %%--------------------------------------------------------------------
  184: 
  185: rabbit_pool_starts_with_default_config(_Config) ->
  186:     %% GIVEN
  187:     Domain = domain(),
  188:     DefaultWpoolConfig = #{type => rabbit, scope => host, tag => rabbit_event_pusher_default,
  189:                            opts => #{workers => 10, strategy => best_worker, call_timeout => 5000},
  190:                            conn_opts => #{amqp_port => 5672, confirms_enabled => false, max_worker_queue_len => 1000}},
  191:     RabbitWpool = {rabbit, Domain, rabbit_event_pusher_default},
  192:     %% WHEN
  193:     start_rabbit_wpool(Domain, DefaultWpoolConfig),
  194:     %% THEN
  195:     Pools = rpc(mim(), mongoose_wpool, get_pools, []),
  196:     ?assertMatch(RabbitWpool,
  197:                  lists:keyfind(rabbit_event_pusher_default, 3, Pools)),
  198:     %% CLEANUP
  199:     stop_rabbit_wpool(RabbitWpool).
  200: 
  201: exchanges_are_created_on_module_startup(Config) ->
  202:     %% GIVEN module is started with custom exchange types
  203:     Connection = proplists:get_value(rabbit_connection, Config),
  204:     ExCustomType = <<"headers">>,
  205:     Exchanges = [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE],
  206:     %% THEN exchanges are created
  207:     [?assert(ensure_exchange_present(Connection, {Exchange, ExCustomType}))
  208:      || Exchange <- Exchanges].
  209: 
  210: %%--------------------------------------------------------------------
  211: %% GROUP presence_status_publish
  212: %%--------------------------------------------------------------------
  213: 
  214: connected_users_push_presence_events_when_change_status(Config) ->
  215:     escalus:story(
  216:       Config, [{bob, 1}],
  217:       fun(Bob) ->
  218:               %% GIVEN
  219:               BobJID = client_lower_short_jid(Bob),
  220:               listen_to_presence_events_from_rabbit([BobJID], Config),
  221:               %% WHEN users generate some traffic.
  222:               send_presence_stanzas([Bob], 1),
  223:               %% THEN  wait for presence events from Rabbit.
  224:               ?assertReceivedMatch({#'basic.deliver'{routing_key = BobJID},
  225:                                     #amqp_msg{}}, timer:seconds(5))
  226:       end).
  227: 
  228: presence_messages_are_properly_formatted(Config) ->
  229:     escalus:story(
  230:       Config, [{bob, 1}],
  231:       fun(Bob) ->
  232:               %% GIVEN
  233:               BobJID = client_lower_short_jid(Bob),
  234:               BobFullJID = client_lower_full_jid(Bob),
  235:               listen_to_presence_events_from_rabbit([BobJID], Config),
  236:               %% WHEN user logout
  237:               escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
  238:               %% THEN receive message
  239:               ?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false},
  240:                            get_decoded_message_from_rabbit(BobJID))
  241:       end).
  242: 
  243: %%--------------------------------------------------------------------
  244: %% GROUP chat_message_publish
  245: %%--------------------------------------------------------------------
  246: 
  247: chat_message_sent_event(Config) ->
  248:     escalus:story(
  249:       Config, [{bob, 1}, {alice, 1}],
  250:       fun(Bob, Alice) ->
  251:               %% GIVEN
  252:               BobJID = client_lower_short_jid(Bob),
  253:               BobChatMsgSentRK = chat_msg_sent_rk(BobJID),
  254:               listen_to_chat_msg_sent_events_from_rabbit([BobJID], Config),
  255:               %% WHEN users chat
  256:               escalus:send(Bob,
  257:                            escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)),
  258:               %% THEN  wait for chat message sent events events from Rabbit.
  259:               ?assertReceivedMatch({#'basic.deliver'{
  260:                                        routing_key = BobChatMsgSentRK},
  261:                                     #amqp_msg{}}, timer:seconds(5))
  262:       end).
  263: 
  264: chat_message_received_event(Config) ->
  265:     escalus:story(
  266:       Config, [{bob, 1}, {alice, 1}],
  267:       fun(Bob, Alice) ->
  268:               %% GIVEN
  269:               AliceJID = client_lower_short_jid(Alice),
  270:               AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID),
  271:               listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  272:               %% WHEN users chat
  273:               escalus:send(Bob,
  274:                            escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)),
  275:               escalus:wait_for_stanzas(Alice, 2),
  276:               %% THEN  wait for chat message received events events from
  277:               %% Rabbit.
  278:               ?assertReceivedMatch({#'basic.deliver'{
  279:                                        routing_key = AliceChatMsgRecvRK},
  280:                                     #amqp_msg{}}, timer:seconds(5))
  281:       end).
  282: 
  283: chat_message_sent_event_properly_formatted(Config) ->
  284:     escalus:story(
  285:       Config, [{bob, 1}, {alice, 1}],
  286:       fun(Bob, Alice) ->
  287:               %% GIVEN
  288:               AliceJID = client_lower_short_jid(Alice),
  289:               AliceFullJID = client_lower_full_jid(Alice),
  290:               BobFullJID = client_lower_full_jid(Bob),
  291:               AliceChatMsgSentRK = chat_msg_sent_rk(AliceJID),
  292:               Message = <<"Hi Bob!">>,
  293:               listen_to_chat_msg_sent_events_from_rabbit([AliceJID], Config),
  294:               %% WHEN users chat
  295:               escalus:send(Alice, escalus_stanza:chat_to(Bob, Message)),
  296:               %% THEN
  297:               ?assertMatch(#{<<"from_user_id">> := AliceFullJID,
  298:                              <<"to_user_id">> := BobFullJID,
  299:                              <<"message">> := Message},
  300:                            get_decoded_message_from_rabbit(AliceChatMsgSentRK))
  301:       end).
  302: 
  303: chat_message_received_event_properly_formatted(Config) ->
  304:     escalus:story(
  305:       Config, [{bob, 1}, {alice, 1}],
  306:       fun(Bob, Alice) ->
  307:               %% GIVEN
  308:               AliceJID = client_lower_short_jid(Alice),
  309:               AliceFullJID = client_lower_full_jid(Alice),
  310:               BobFullJID = client_lower_full_jid(Bob),
  311:               AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID),
  312:               Message = <<"Hi Alice!">>,
  313:               listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  314:               %% WHEN users chat
  315:               escalus:send(Bob, escalus_stanza:chat_to(Alice, Message)),
  316:               escalus:wait_for_stanzas(Alice, 2),
  317:               %% THEN
  318:               ?assertMatch(#{<<"from_user_id">> := BobFullJID,
  319:                              <<"to_user_id">> := AliceFullJID,
  320:                              <<"message">> := Message},
  321:                            get_decoded_message_from_rabbit(AliceChatMsgRecvRK))
  322:       end).
  323: 
  324: %%--------------------------------------------------------------------
%% GROUP group_chat_message_publish
  326: %%--------------------------------------------------------------------
  327: 
  328: group_chat_message_sent_event(Config) ->
  329:     escalus:story(
  330:       Config, [{bob, 1}],
  331:       fun(Bob) ->
  332:               %% GIVEN
  333:               Room = ?config(room, Config),
  334:               RoomAddr = muc_helper:room_address(Room),
  335:               BobJID = client_lower_short_jid(Bob),
  336:               BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID),
  337:               listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config),
  338:               %% WHEN users chat
  339:               escalus:send(Bob,
  340:                            muc_helper:stanza_muc_enter_room(Room, nick(Bob))),
  341: 
  342:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr,
  343:                                                             <<"Hi there!">>)),
  344: 
  345:               %% THEN  wait for chat message sent events events from Rabbit.
  346:               ?assertReceivedMatch({#'basic.deliver'{
  347:                                        routing_key = BobGroupChatMsgSentRK},
  348:                                     #amqp_msg{}}, timer:seconds(5))
  349:       end).
  350: 
  351: group_chat_message_received_event(Config) ->
  352:     escalus:story(
  353:       Config, [{bob, 1}, {alice, 1}],
  354:       fun(Bob, Alice) ->
  355:               %% GIVEN
  356:               Room = ?config(room, Config),
  357:               RoomAddr = muc_helper:room_address(Room),
  358:               AliceJID = client_lower_short_jid(Alice),
  359:               AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID),
  360:               listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID],
  361:                                                                Config),
  362:               %% WHEN users chat
  363:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room,
  364:                                                                    nick(Alice))),
  365:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room,
  366:                                                                  nick(Bob))),
  367: 
  368:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr,
  369:                                                             <<"Hi there!">>)),
  370: 
  371:               %% THEN  wait for chat message received events events from Rabbit.
  372:               ?assertReceivedMatch({#'basic.deliver'{
  373:                                        routing_key = AliceGroupChatMsgRecvRK},
  374:                                     #amqp_msg{}}, timer:seconds(5))
  375:       end).
  376: 
  377: group_chat_message_sent_event_properly_formatted(Config) ->
  378:     escalus:story(
  379:       Config, [{bob, 1}],
  380:       fun(Bob) ->
  381:               %% GIVEN
  382:               Room = ?config(room, Config),
  383:               RoomAddr = muc_helper:room_address(Room),
  384:               BobJID = client_lower_short_jid(Bob),
  385:               BobFullJID = client_lower_full_jid(Bob),
  386:               BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID),
  387:               Message = <<"Hi there!">>,
  388:               listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config),
  389:               %% WHEN a user chat
  390:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room,
  391:                                                                  nick(Bob))),
  392: 
  393:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)),
  394:               %% THEN
  395:               ?assertMatch(#{<<"from_user_id">> := BobFullJID,
  396:                              <<"to_user_id">> := RoomAddr,
  397:                              <<"message">> := Message},
  398:                            get_decoded_message_from_rabbit(BobGroupChatMsgSentRK))
  399:       end).
  400: 
  401: group_chat_message_received_event_properly_formatted(Config) ->
  402:     escalus:story(
  403:       Config, [{bob, 1}, {alice, 1}],
  404:       fun(Bob, Alice) ->
  405:               %% GIVEN basic variables
  406:               Room = ?config(room, Config),
  407:               RoomAddr = muc_helper:room_address(Room),
  408:               BobRoomJID = user_room_jid(RoomAddr, Bob),
  409:               AliceJID = client_lower_short_jid(Alice),
  410:               AliceFullJID = client_lower_full_jid(Alice),
  411:               AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID),
  412:               Message = <<"Hi there!">>,
  413:               %% GIVEN users in room
  414:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, nick(Alice))),
  415:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, nick(Bob))),
  416:               % wait for all room stanzas to be processed
  417:               escalus:wait_for_stanzas(Alice, 3),
  418:               escalus:wait_for_stanzas(Bob, 3),
  419:               %% GIVEN Room subscription to Rabbit
  420:               % We subscribe to RMQ now and not earlier to avoid messages other
  421:               % than the one we are testing, `Message` from Bob to Room, like
  422:               % for example affiliations and the like.
  423:               listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  424:               %% WHEN users chat
  425:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)),
  426:               %% THEN
  427:               ?assertMatch(#{<<"from_user_id">> := BobRoomJID,
  428:                              <<"to_user_id">> := AliceFullJID,
  429:                              <<"message">> := Message},
  430:                            get_decoded_message_from_rabbit(AliceGroupChatMsgRecvRK))
  431:       end).
  432: 
  433: %%--------------------------------------------------------------------
  434: %% Test helpers
  435: %%--------------------------------------------------------------------
  436: 
  437: -spec connect_to_rabbit() -> proplists:proplist().
  438: connect_to_rabbit() ->
  439:     {ok, Connection} =
  440:         amqp_connection:start(#amqp_params_network{}),
  441:     {ok, Channel} = amqp_connection:open_channel(Connection),
  442:     [{rabbit_connection, Connection}, {rabbit_channel, Channel}].
  443: 
  444: -spec close_rabbit_connection(Config :: proplists:proplist()) -> ok | term().
  445: close_rabbit_connection(Config) ->
  446:     Connection = proplists:get_value(rabbit_connection, Config),
  447:     Channel = proplists:get_value(rabbit_channel, Config),
  448:     amqp_channel:close(Channel),
  449:     amqp_connection:close(Connection).
  450: 
  451: -spec listen_to_presence_events_from_rabbit(JIDs :: [binary()],
  452:                                             Config :: proplists:proplist()) ->
  453:     ok | term().
  454: listen_to_presence_events_from_rabbit(JIDs, Config) ->
  455:     QueueBindings = presence_bindings(?QUEUE_NAME, JIDs),
  456:     listen_to_events_from_rabbit(QueueBindings, Config).
  457: 
  458: -spec listen_to_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
  459:                                                  Config :: proplists:proplist()) ->
  460:     ok | term().
  461: listen_to_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
  462:     QueueBindings = chat_msg_sent_bindings(?QUEUE_NAME, JIDs),
  463:     listen_to_events_from_rabbit(QueueBindings, Config).
  464: 
  465: -spec listen_to_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
  466:                                                  Config :: proplists:proplist()) ->
  467:     ok | term().
  468: listen_to_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
  469:     QueueBindings = chat_msg_recv_bindings(?QUEUE_NAME, JIDs),
  470:     listen_to_events_from_rabbit(QueueBindings, Config).
  471: 
  472: -spec listen_to_group_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
  473:                                                        Config :: proplists:proplist()) ->
  474:     ok | term().
  475: listen_to_group_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
  476:     QueueBindings = group_chat_msg_sent_bindings(?QUEUE_NAME, JIDs),
  477:     listen_to_events_from_rabbit(QueueBindings, Config).
  478: 
  479: -spec listen_to_group_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
  480:                                                        Config :: proplists:proplist()) ->
  481:     ok | term().
  482: listen_to_group_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
  483:     QueueBindings = group_chat_msg_recv_bindings(?QUEUE_NAME, JIDs),
  484:     listen_to_events_from_rabbit(QueueBindings, Config).
  485: 
  486: -spec listen_to_events_from_rabbit(QueueBindings :: [rabbit_binding()],
  487:                                    Config :: proplists:proplist()) ->
  488:     ok | term().
  489: listen_to_events_from_rabbit(QueueBindings, Config) ->
  490:     Connection = proplists:get_value(rabbit_connection, Config),
  491:     Channel = proplists:get_value(rabbit_channel, Config),
  492:     declare_temporary_rabbit_queue(Channel, ?QUEUE_NAME),
  493:     wait_for_exchanges_to_be_created(Connection,
  494:                                      [{?PRESENCE_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
  495:                                       {?CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
  496:                                       {?GROUP_CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}]),
  497:     bind_queues_to_exchanges(Channel, QueueBindings),
  498:     subscribe_to_rabbit_queue(Channel, ?QUEUE_NAME).
  499: 
  500: -spec wait_for_exchanges_to_be_created(Connection :: pid(),
  501:                                        Exchanges :: [binary()]) -> pid().
  502: wait_for_exchanges_to_be_created(Connection, Exchanges) ->
  503:     [ensure_exchange_present(Connection, Exchange) || Exchange <- Exchanges],
  504:     ok.
  505: 
  506: -spec declare_temporary_rabbit_queue(Channel :: pid(), Queue :: binary()) -> binary().
  507: declare_temporary_rabbit_queue(Channel, Queue) ->
  508:     #'queue.declare_ok'{} =
  509:         amqp_channel:call(Channel, #'queue.declare'{queue = Queue,
  510:                                                     exclusive = true}).
  511: 
  512: -spec presence_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  513:     [rabbit_binding()].
  514: presence_bindings(Queue, JIDs) ->
  515:     [{Queue, ?PRESENCE_EXCHANGE, JID} || JID <- JIDs].
  516: 
  517: -spec chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  518:     [rabbit_binding()].
  519: chat_msg_sent_bindings(Queue, JIDs) ->
  520:     [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_sent_rk(JID)} || JID <- JIDs].
  521: 
  522: -spec chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  523:     [rabbit_binding()].
  524: chat_msg_recv_bindings(Queue, JIDs) ->
  525:     [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_recv_rk(JID)} || JID <- JIDs].
  526: 
  527: -spec group_chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  528:     [rabbit_binding()].
  529: group_chat_msg_sent_bindings(Queue, JIDs) ->
  530:     [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_sent_rk(JID)}
  531:      || JID <- JIDs].
  532: 
  533: -spec group_chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  534:     [rabbit_binding()].
  535: group_chat_msg_recv_bindings(Queue, JIDs) ->
  536:     [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_recv_rk(JID)}
  537:      || JID <- JIDs].
  538: 
  539: -spec bind_queues_to_exchanges(Channel :: pid(),
  540:                                Bindings :: [rabbit_binding()]) ->
  541:     [amqp_client:amqp_method() | ok | blocked | closing].
  542: bind_queues_to_exchanges(Channel, Bindings) ->
  543:     [bind_queue_to_exchange(Channel, Binding) || Binding <- Bindings].
  544: 
  545: -spec bind_queue_to_exchange(Channel :: pid(), rabbit_binding()) ->
  546:     amqp_client:amqp_method() | ok | blocked | closing.
  547: bind_queue_to_exchange(Channel, {Queue, Exchange, RoutingKey}) ->
  548:     #'queue.bind_ok'{} =
  549:         amqp_channel:call(Channel, #'queue.bind'{exchange = Exchange,
  550:                                                  routing_key = RoutingKey,
  551:                                                  queue = Queue}).
  552: 
  553: -spec ensure_exchange_present(Connection :: pid(), Exchange :: binary()) ->
  554:                                      {ok, true} | {timeout, any()}.
  555: ensure_exchange_present(Connection, Exchange) ->
  556:     Opts = #{time_left => ?WAIT_FOR_EXCHANGE_INTERVAL * ?IF_EXCHANGE_EXISTS_RETRIES / 1000,
  557:              sleep_time => ?WAIT_FOR_EXCHANGE_INTERVAL},
  558:     case mongoose_helper:wait_until(fun() ->
  559:                                             is_exchange_present(Connection,
  560:                                                                 Exchange)
  561:                                     end, true, Opts) of
  562:         {ok, true} -> true;
  563:         {timeout, _} ->
  564:             throw(io_lib:format("Exchange has not been created, exchange=~p",
  565:                                 [Exchange]))
  566:     end.
  567: 
  568: -spec is_exchange_present(Connection :: pid(), Exchange :: binary()) -> boolean().
  569: is_exchange_present(Connection, {ExName, ExType}) ->
  570:     {ok, Channel} = amqp_connection:open_channel(Connection),
  571:     try amqp_channel:call(Channel, #'exchange.declare'{exchange = ExName,
  572:                                                        type = ExType,
  573:                                                        %% this option allows to
  574:                                                        %% check if an exchange exists
  575:                                                        passive = true}) of
  576:         {'exchange.declare_ok'} -> true
  577:     catch
  578:         _Error:_Reason -> false
  579:     end.
  580: 
  581: -spec subscribe_to_rabbit_queue(Channel :: pid(), Queue :: binary()) -> ok.
  582: subscribe_to_rabbit_queue(Channel, Queue) ->
  583:     amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
  584:                                                      no_ack = true}, self()),
  585:     receive
  586:         #'basic.consume_ok'{} -> ok
  587:     after
  588:         5000 ->
  589:             throw(io_lib:format("Timeout when subscribing to queue,"
  590:                                 "channel=~p, queue=~p", [Channel, Queue]))
  591:     end.
  592: 
  593: -spec send_presence_stanzas(Users :: [binary()], NumOfMsgs :: non_neg_integer())
  594:                            -> [[ok]] | term().
  595: send_presence_stanzas(Users, NumOfMsgs) ->
  596:     [send_presence_stanza(User, NumOfMsgs) || User <- Users].
  597: 
  598: -spec send_presence_stanza(Users :: [binary()], NumOfMsgs :: non_neg_integer())
  599:                           -> [ok] | term().
  600: send_presence_stanza(User, NumOfMsgs) ->
  601:     [escalus:send(User, escalus_stanza:presence(make_pres_type(X)))
  602:      || X <- lists:seq(1, NumOfMsgs)].
  603: 
  604: -spec get_decoded_message_from_rabbit(RoutingKey :: binary()) ->
  605:     map() | no_return().
  606: get_decoded_message_from_rabbit(RoutingKey) ->
  607:     receive
  608:         {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} ->
  609:             jiffy:decode(Msg, [return_maps])
  610:     after
  611:         5000 -> ct:fail(io_lib:format("Timeout when decoding message, rk=~p",
  612:                                    [RoutingKey]))
  613:     end.
  614: 
  615: %%--------------------------------------------------------------------
  616: %% Utils
  617: %%--------------------------------------------------------------------
  618: 
  619: start_rabbit_wpool(Host) ->
  620:     start_rabbit_wpool(Host, ?WPOOL_CFG).
  621: 
  622: start_rabbit_wpool(Host, WpoolConfig) ->
  623:     rpc(mim(), mongoose_wpool, ensure_started, []),
  624:     rpc(mim(), mongoose_wpool, start_configured_pools, [[WpoolConfig], [Host]]).
  625: 
  626: stop_rabbit_wpool({Pool, Host, Tag}) ->
  627:     rpc(mim(), mongoose_wpool, stop, [Pool, Host, Tag]);
  628: stop_rabbit_wpool(Host)->
  629:     stop_rabbit_wpool({rabbit, Host, event_pusher}).
  630: 
  631: delete_exchanges() ->
  632:     ConnConf = connect_to_rabbit(),
  633:     Channel  = proplists:get_value(rabbit_channel, ConnConf),
  634:     [amqp_channel:call(Channel, #'exchange.delete'{exchange = Ex})
  635:      || Ex <- [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE]],
  636:     close_rabbit_connection(ConnConf).
  637: 
  638: make_pres_type(X) when X rem 2 == 0 ->
  639:     <<"available">>;
  640: make_pres_type(_) ->
  641:     <<"unavailable">>.
  642: 
  643: chat_msg_sent_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_SENT_TOPIC).
  644: 
  645: chat_msg_recv_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_RECV_TOPIC).
  646: 
  647: group_chat_msg_sent_rk(JID) ->
  648:     user_topic_routing_key(JID, ?GROUP_CHAT_MSG_SENT_TOPIC).
  649: 
  650: group_chat_msg_recv_rk(JID) ->
  651:     user_topic_routing_key(JID, ?GROUP_CHAT_MSG_RECV_TOPIC).
  652: 
  653: user_topic_routing_key(JID, Topic) -> <<JID/binary, ".", Topic/binary>>.
  654: 
  655: client_lower_short_jid(Client) ->
  656:     escalus_utils:jid_to_lower(escalus_client:short_jid(Client)).
  657: 
  658: client_lower_full_jid(Client) ->
  659:     escalus_utils:jid_to_lower(escalus_client:full_jid(Client)).
  660: 
  661: nick(User) -> escalus_utils:get_username(User).
  662: 
  663: maybe_prepare_muc(TestCase, Config) when
  664:       TestCase == group_chat_message_sent_event orelse
  665:       TestCase == group_chat_message_received_event orelse
  666:       TestCase == group_chat_message_sent_event_properly_formatted orelse
  667:       TestCase == group_chat_message_received_event_properly_formatted ->
  668:     prepare_muc(Config);
  669: maybe_prepare_muc(_, Config) -> Config.
  670: 
  671: maybe_cleanup_muc(TestCase, Config) when
  672:       TestCase == group_chat_message_sent_event orelse
  673:       TestCase == group_chat_message_received_event orelse
  674:       TestCase == group_chat_message_sent_event_properly_formatted orelse
  675:       TestCase == group_chat_message_received_event_properly_formatted ->
  676:     cleanup_muc(Config);
  677: maybe_cleanup_muc(_, _) -> ok.
  678: 
  679: prepare_muc(Config) ->
  680:     [User | _] = ?config(escalus_users, Config),
  681:     muc_helper:start_room(Config, User, <<"muc_publish">>, <<"user_nick">>,
  682:                           [{persistent, true},
  683:                            {anonymous, false}]).
  684: 
  685: cleanup_muc(Config) ->
  686:     muc_helper:destroy_room(Config).
  687: 
  688: user_room_jid(RoomJID, UserJID) ->
  689:     Nick = nick(UserJID),
  690:     <<RoomJID/binary, "/", Nick/binary>>.
  691: 
  692: is_rabbitmq_available() ->
  693:     try amqp_connection:start(#amqp_params_network{}) of
  694:         {ok, Conn} ->
  695:             amqp_connection:close(Conn),
  696:             true;
  697:         {error, econnrefused} ->
  698:             false
  699:     catch
  700:         _Err ->
  701:             false
  702:     end.