1: %%%-------------------------------------------------------------------
    2: %%% @author Kacper Mentel <kacper.mentel@erlang-solutions.com>
    3: %%% @copyright (C) 2018, Kacper Mentel
    4: %%% @doc
    5: %%% Tests for `mod_event_pusher` RabbitMQ backend (`mod_event_pusher_rabbit`).
    6: %%% @end
    7: %%%-------------------------------------------------------------------
    8: -module(mod_event_pusher_rabbit_SUITE).
    9: 
   10: -include_lib("escalus/include/escalus.hrl").
   11: -include_lib("common_test/include/ct.hrl").
   12: -include_lib("eunit/include/eunit.hrl").
   13: -include_lib("escalus/include/escalus_xmlns.hrl").
   14: -include_lib("exml/include/exml.hrl").
   15: -include_lib("amqp_client/include/amqp_client.hrl").
   16: 
   17: -include("assert_received_match.hrl").
   18: 
   19: -import(distributed_helper, [mim/0,
   20:                              rpc/4]).
   21: 
   22: -import(domain_helper, [domain/0]).
   23: 
   24: -export([suite/0, all/0, groups/0]).
   25: -export([init_per_suite/1, end_per_suite/1,
   26:          init_per_group/2, end_per_group/2,
   27:          init_per_testcase/2, end_per_testcase/2]).
   28: 
   29: -export([rabbit_pool_starts_with_default_config/1,
   30:          exchanges_are_created_on_module_startup/1]).
   31: -export([connected_users_push_presence_events_when_change_status/1,
   32:          presence_messages_are_properly_formatted/1]).
   33: -export([chat_message_sent_event/1,
   34:          chat_message_sent_event_properly_formatted/1,
   35:          chat_message_received_event/1,
   36:          chat_message_received_event_properly_formatted/1]).
   37: -export([group_chat_message_sent_event/1,
   38:          group_chat_message_sent_event_properly_formatted/1,
   39:          group_chat_message_received_event/1,
   40:          group_chat_message_received_event_properly_formatted/1]).
   41: 
%% Name of the queue the listen helpers declare, bind and subscribe to.
-define(QUEUE_NAME, <<"test_queue">>).
%% Exchange type the listen helpers pass when checking that the exchanges
%% exist (the module configuration below does not set a type explicitly).
-define(DEFAULT_EXCHANGE_TYPE, <<"topic">>).
%% Custom exchange names configured for the RabbitMQ backend.
-define(PRESENCE_EXCHANGE, <<"custom_presence_exchange">>).
-define(CHAT_MSG_EXCHANGE, <<"custom_chat_msg_exchange">>).
-define(GROUP_CHAT_MSG_EXCHANGE, <<"custom_group_chat_msg_exchange">>).
%% Custom topics; the routing-key helpers append them to a user's JID.
-define(CHAT_MSG_SENT_TOPIC, <<"custom_chat_msg_sent_topic">>).
-define(CHAT_MSG_RECV_TOPIC, <<"custom_chat_msg_recv_topic">>).
-define(GROUP_CHAT_MSG_SENT_TOPIC, <<"custom_group_chat_msg_sent_topic">>).
-define(GROUP_CHAT_MSG_RECV_TOPIC, <<"custom_group_chat_msg_recv_topic">>).
%% Options of the `rabbit' backend: exchange names and topics per event kind.
-define(MOD_EVENT_PUSHER_RABBIT_CFG,
        [{presence_exchange, [{name, ?PRESENCE_EXCHANGE}]},
         {chat_msg_exchange, [{name, ?CHAT_MSG_EXCHANGE},
                              {sent_topic, ?CHAT_MSG_SENT_TOPIC},
                              {recv_topic, ?CHAT_MSG_RECV_TOPIC}]},
         {groupchat_msg_exchange, [{name, ?GROUP_CHAT_MSG_EXCHANGE},
                                   {sent_topic, ?GROUP_CHAT_MSG_SENT_TOPIC},
                                   {recv_topic, ?GROUP_CHAT_MSG_RECV_TOPIC}]}
        ]).
%% `mod_event_pusher' options used by init_per_group/2: only the rabbit
%% backend is enabled.
-define(MOD_EVENT_PUSHER_CFG, [{backends,
                                [{rabbit, ?MOD_EVENT_PUSHER_RABBIT_CFG}]}]).
%% Worker pool definition that start_rabbit_wpool/1 hands to
%% mongoose_wpool:start_configured_pools/2.
-define(WPOOL_CFG, #{type => rabbit, scope => host, tag => event_pusher,
                     opts => #{workers => 20, strategy => best_worker, call_timeout => 5000},
                     conn_opts => #{confirms_enabled => false,
                                    amqp_host => "localhost",
                                    amqp_port => 5672,
                                    amqp_username => "guest",
                                    amqp_password => "guest",
                                    max_worker_queue_len => 1000}
}).
%% Both values feed the wait_until options built in ensure_exchange_present/2.
-define(IF_EXCHANGE_EXISTS_RETRIES, 30).
-define(WAIT_FOR_EXCHANGE_INTERVAL, 100). % ms

%% A queue bound to an exchange under a given routing key.
-type rabbit_binding() :: {Queue :: binary(),
                           Exchange :: binary(),
                           RoutingKey :: binary()}.
   77: 
   78: %%--------------------------------------------------------------------
   79: %% Suite configuration
   80: %%--------------------------------------------------------------------
   81: 
   82: all() ->
   83:     [
   84:      {group, initialization_on_startup},
   85:      {group, presence_status_publish},
   86:      {group, chat_message_publish},
   87:      {group, group_chat_message_publish}
   88:     ].
   89: 
   90: groups() ->
   91:     G = [
   92:          {initialization_on_startup, [],
   93:           [
   94:            rabbit_pool_starts_with_default_config,
   95:            exchanges_are_created_on_module_startup
   96:           ]},
   97:          {presence_status_publish, [],
   98:           [
   99:            connected_users_push_presence_events_when_change_status,
  100:            presence_messages_are_properly_formatted
  101:           ]},
  102:          {chat_message_publish, [],
  103:           [
  104:            chat_message_sent_event,
  105:            chat_message_sent_event_properly_formatted,
  106:            chat_message_received_event,
  107:            chat_message_received_event_properly_formatted
  108:           ]},
  109:          {group_chat_message_publish, [],
  110:           [
  111:            group_chat_message_sent_event,
  112:            group_chat_message_sent_event_properly_formatted,
  113:            group_chat_message_received_event,
  114:            group_chat_message_received_event_properly_formatted
  115:           ]}
  116:         ],
  117:     ct_helper:repeat_all_until_all_ok(G).
  118: 
  119: suite() ->
  120:     escalus:suite().
  121: 
  122: %%--------------------------------------------------------------------
  123: %% Init & teardown
  124: %%--------------------------------------------------------------------
  125: 
  126: init_per_suite(Config) ->
  127:     start_rabbit_wpool(domain()),
  128:     {ok, _} = application:ensure_all_started(amqp_client),
  129:     case is_rabbitmq_available() of
  130:         true ->
  131:             muc_helper:load_muc(),
  132:             escalus:init_per_suite(Config);
  133:         false ->
  134:             {skip, "RabbitMQ server is not available on default port."}
  135:     end.
  136: 
  137: end_per_suite(Config) ->
  138:     stop_rabbit_wpool(domain()),
  139:     escalus_fresh:clean(),
  140:     muc_helper:unload_muc(),
  141:     escalus:end_per_suite(Config).
  142: 
  143: init_per_group(initialization_on_startup, Config) ->
  144:     Config;
  145: init_per_group(_, Config0) ->
  146:     Domain = domain(),
  147:     Config = dynamic_modules:save_modules(Domain, Config0),
  148:     dynamic_modules:ensure_modules(Domain,
  149:                                    [{mod_event_pusher, ?MOD_EVENT_PUSHER_CFG}]),
  150:     Config.
  151: 
  152: end_per_group(initialization_on_startup, Config) ->
  153:     Config;
  154: end_per_group(_, Config) ->
  155:     delete_exchanges(),
  156:     Domain = domain(),
  157:     dynamic_modules:stop(Domain, mod_event_pusher),
  158:     dynamic_modules:restore_modules(Config),
  159:     escalus:delete_users(Config, escalus:get_users([bob, alice])).
  160: 
  161: init_per_testcase(rabbit_pool_starts_with_default_config, Config) ->
  162:     Config;
  163: init_per_testcase(CaseName, Config0) ->
  164:     Config1 = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]),
  165:     Config2 = maybe_prepare_muc(CaseName, Config1),
  166:     Config = Config2 ++ connect_to_rabbit(),
  167:     escalus:init_per_testcase(CaseName, Config).
  168: 
  169: end_per_testcase(rabbit_pool_starts_with_default_config, Config) ->
  170:     Config;
  171: end_per_testcase(exchanges_are_created_on_module_startup, Config) ->
  172:     delete_exchanges(),
  173:     stop_mod_event_pusher_rabbit(),
  174:     close_rabbit_connection(Config),
  175:     Config;
  176: end_per_testcase(CaseName, Config) ->
  177:     maybe_cleanup_muc(CaseName, Config),
  178:     close_rabbit_connection(Config),
  179:     escalus:end_per_testcase(CaseName, Config).
  180: 
  181: 
  182: %%--------------------------------------------------------------------
  183: %% GROUP initialization_on_startup
  184: %%--------------------------------------------------------------------
  185: 
  186: rabbit_pool_starts_with_default_config(_Config) ->
  187:     %% GIVEN
  188:     Domain = domain(),
  189:     DefaultWpoolConfig = #{type => rabbit, scope => host, tag => rabbit_event_pusher_default,
  190:                            opts => #{workers => 10, strategy => best_worker, call_timeout => 5000},
  191:                            conn_opts => #{amqp_port => 5672, confirms_enabled => false, max_worker_queue_len => 1000}},
  192:     RabbitWpool = {rabbit, Domain, rabbit_event_pusher_default},
  193:     %% WHEN
  194:     start_rabbit_wpool(Domain, DefaultWpoolConfig),
  195:     %% THEN
  196:     Pools = rpc(mim(), mongoose_wpool, get_pools, []),
  197:     ?assertMatch(RabbitWpool,
  198:                  lists:keyfind(rabbit_event_pusher_default, 3, Pools)),
  199:     %% CLEANUP
  200:     stop_rabbit_wpool(RabbitWpool).
  201: 
  202: exchanges_are_created_on_module_startup(Config) ->
  203:     %% GIVEN
  204:     Connection = proplists:get_value(rabbit_connection, Config),
  205:     ExCustomType = <<"headers">>,
  206:     Exchanges = [{?PRESENCE_EXCHANGE, ExCustomType},
  207:                  {?CHAT_MSG_EXCHANGE, ExCustomType},
  208:                  {?GROUP_CHAT_MSG_EXCHANGE, ExCustomType}],
  209:     ConfigWithCustomExchangeType =
  210:         extend_config_with_exchange_type(ExCustomType),
  211:     %% WHEN
  212:     start_mod_event_pusher_rabbit(ConfigWithCustomExchangeType),
  213:     %% THEN exchanges are created
  214:     [?assert(ensure_exchange_present(Connection, Exchange))
  215:      || Exchange <- Exchanges].
  216: 
  217: %%--------------------------------------------------------------------
  218: %% GROUP presence_status_publish
  219: %%--------------------------------------------------------------------
  220: 
  221: connected_users_push_presence_events_when_change_status(Config) ->
  222:     escalus:story(
  223:       Config, [{bob, 1}],
  224:       fun(Bob) ->
  225:               %% GIVEN
  226:               BobJID = client_lower_short_jid(Bob),
  227:               listen_to_presence_events_from_rabbit([BobJID], Config),
  228:               %% WHEN users generate some traffic.
  229:               send_presence_stanzas([Bob], 1),
  230:               %% THEN  wait for presence events from Rabbit.
  231:               ?assertReceivedMatch({#'basic.deliver'{routing_key = BobJID},
  232:                                     #amqp_msg{}}, timer:seconds(5))
  233:       end).
  234: 
  235: presence_messages_are_properly_formatted(Config) ->
  236:     escalus:story(
  237:       Config, [{bob, 1}],
  238:       fun(Bob) ->
  239:               %% GIVEN
  240:               BobJID = client_lower_short_jid(Bob),
  241:               BobFullJID = client_lower_full_jid(Bob),
  242:               listen_to_presence_events_from_rabbit([BobJID], Config),
  243:               %% WHEN user logout
  244:               escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
  245:               %% THEN receive message
  246:               ?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false},
  247:                            get_decoded_message_from_rabbit(BobJID))
  248:       end).
  249: 
  250: %%--------------------------------------------------------------------
  251: %% GROUP chat_message_publish
  252: %%--------------------------------------------------------------------
  253: 
  254: chat_message_sent_event(Config) ->
  255:     escalus:story(
  256:       Config, [{bob, 1}, {alice, 1}],
  257:       fun(Bob, Alice) ->
  258:               %% GIVEN
  259:               BobJID = client_lower_short_jid(Bob),
  260:               BobChatMsgSentRK = chat_msg_sent_rk(BobJID),
  261:               listen_to_chat_msg_sent_events_from_rabbit([BobJID], Config),
  262:               %% WHEN users chat
  263:               escalus:send(Bob,
  264:                            escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)),
  265:               %% THEN  wait for chat message sent events events from Rabbit.
  266:               ?assertReceivedMatch({#'basic.deliver'{
  267:                                        routing_key = BobChatMsgSentRK},
  268:                                     #amqp_msg{}}, timer:seconds(5))
  269:       end).
  270: 
  271: chat_message_received_event(Config) ->
  272:     escalus:story(
  273:       Config, [{bob, 1}, {alice, 1}],
  274:       fun(Bob, Alice) ->
  275:               %% GIVEN
  276:               AliceJID = client_lower_short_jid(Alice),
  277:               AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID),
  278:               listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  279:               %% WHEN users chat
  280:               escalus:send(Bob,
  281:                            escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)),
  282:               escalus:wait_for_stanzas(Alice, 2),
  283:               %% THEN  wait for chat message received events events from
  284:               %% Rabbit.
  285:               ?assertReceivedMatch({#'basic.deliver'{
  286:                                        routing_key = AliceChatMsgRecvRK},
  287:                                     #amqp_msg{}}, timer:seconds(5))
  288:       end).
  289: 
  290: chat_message_sent_event_properly_formatted(Config) ->
  291:     escalus:story(
  292:       Config, [{bob, 1}, {alice, 1}],
  293:       fun(Bob, Alice) ->
  294:               %% GIVEN
  295:               AliceJID = client_lower_short_jid(Alice),
  296:               AliceFullJID = client_lower_full_jid(Alice),
  297:               BobFullJID = client_lower_full_jid(Bob),
  298:               AliceChatMsgSentRK = chat_msg_sent_rk(AliceJID),
  299:               Message = <<"Hi Bob!">>,
  300:               listen_to_chat_msg_sent_events_from_rabbit([AliceJID], Config),
  301:               %% WHEN users chat
  302:               escalus:send(Alice, escalus_stanza:chat_to(Bob, Message)),
  303:               %% THEN
  304:               ?assertMatch(#{<<"from_user_id">> := AliceFullJID,
  305:                              <<"to_user_id">> := BobFullJID,
  306:                              <<"message">> := Message},
  307:                            get_decoded_message_from_rabbit(AliceChatMsgSentRK))
  308:       end).
  309: 
  310: chat_message_received_event_properly_formatted(Config) ->
  311:     escalus:story(
  312:       Config, [{bob, 1}, {alice, 1}],
  313:       fun(Bob, Alice) ->
  314:               %% GIVEN
  315:               AliceJID = client_lower_short_jid(Alice),
  316:               AliceFullJID = client_lower_full_jid(Alice),
  317:               BobFullJID = client_lower_full_jid(Bob),
  318:               AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID),
  319:               Message = <<"Hi Alice!">>,
  320:               listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  321:               %% WHEN users chat
  322:               escalus:send(Bob, escalus_stanza:chat_to(Alice, Message)),
  323:               escalus:wait_for_stanzas(Alice, 2),
  324:               %% THEN
  325:               ?assertMatch(#{<<"from_user_id">> := BobFullJID,
  326:                              <<"to_user_id">> := AliceFullJID,
  327:                              <<"message">> := Message},
  328:                            get_decoded_message_from_rabbit(AliceChatMsgRecvRK))
  329:       end).
  330: 
  331: %%--------------------------------------------------------------------
%% GROUP group_chat_message_publish
  333: %%--------------------------------------------------------------------
  334: 
  335: group_chat_message_sent_event(Config) ->
  336:     escalus:story(
  337:       Config, [{bob, 1}],
  338:       fun(Bob) ->
  339:               %% GIVEN
  340:               Room = ?config(room, Config),
  341:               RoomAddr = muc_helper:room_address(Room),
  342:               BobJID = client_lower_short_jid(Bob),
  343:               BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID),
  344:               listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config),
  345:               %% WHEN users chat
  346:               escalus:send(Bob,
  347:                            muc_helper:stanza_muc_enter_room(Room, nick(Bob))),
  348: 
  349:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr,
  350:                                                             <<"Hi there!">>)),
  351: 
  352:               %% THEN  wait for chat message sent events events from Rabbit.
  353:               ?assertReceivedMatch({#'basic.deliver'{
  354:                                        routing_key = BobGroupChatMsgSentRK},
  355:                                     #amqp_msg{}}, timer:seconds(5))
  356:       end).
  357: 
  358: group_chat_message_received_event(Config) ->
  359:     escalus:story(
  360:       Config, [{bob, 1}, {alice, 1}],
  361:       fun(Bob, Alice) ->
  362:               %% GIVEN
  363:               Room = ?config(room, Config),
  364:               RoomAddr = muc_helper:room_address(Room),
  365:               AliceJID = client_lower_short_jid(Alice),
  366:               AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID),
  367:               listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID],
  368:                                                                Config),
  369:               %% WHEN users chat
  370:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room,
  371:                                                                    nick(Alice))),
  372:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room,
  373:                                                                  nick(Bob))),
  374: 
  375:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr,
  376:                                                             <<"Hi there!">>)),
  377: 
  378:               %% THEN  wait for chat message received events events from Rabbit.
  379:               ?assertReceivedMatch({#'basic.deliver'{
  380:                                        routing_key = AliceGroupChatMsgRecvRK},
  381:                                     #amqp_msg{}}, timer:seconds(5))
  382:       end).
  383: 
  384: group_chat_message_sent_event_properly_formatted(Config) ->
  385:     escalus:story(
  386:       Config, [{bob, 1}],
  387:       fun(Bob) ->
  388:               %% GIVEN
  389:               Room = ?config(room, Config),
  390:               RoomAddr = muc_helper:room_address(Room),
  391:               BobJID = client_lower_short_jid(Bob),
  392:               BobFullJID = client_lower_full_jid(Bob),
  393:               BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID),
  394:               Message = <<"Hi there!">>,
  395:               listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config),
  396:               %% WHEN a user chat
  397:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room,
  398:                                                                  nick(Bob))),
  399: 
  400:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)),
  401:               %% THEN
  402:               ?assertMatch(#{<<"from_user_id">> := BobFullJID,
  403:                              <<"to_user_id">> := RoomAddr,
  404:                              <<"message">> := Message},
  405:                            get_decoded_message_from_rabbit(BobGroupChatMsgSentRK))
  406:       end).
  407: 
  408: group_chat_message_received_event_properly_formatted(Config) ->
  409:     escalus:story(
  410:       Config, [{bob, 1}, {alice, 1}],
  411:       fun(Bob, Alice) ->
  412:               %% GIVEN basic variables
  413:               Room = ?config(room, Config),
  414:               RoomAddr = muc_helper:room_address(Room),
  415:               BobRoomJID = user_room_jid(RoomAddr, Bob),
  416:               AliceJID = client_lower_short_jid(Alice),
  417:               AliceFullJID = client_lower_full_jid(Alice),
  418:               AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID),
  419:               Message = <<"Hi there!">>,
  420:               %% GIVEN users in room
  421:               escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, nick(Alice))),
  422:               escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, nick(Bob))),
  423:               % wait for all room stanzas to be processed
  424:               escalus:wait_for_stanzas(Alice, 3),
  425:               escalus:wait_for_stanzas(Bob, 3),
  426:               %% GIVEN Room subscription to Rabbit
  427:               % We subscribe to RMQ now and not earlier to avoid messages other
  428:               % than the one we are testing, `Message` from Bob to Room, like
  429:               % for example affiliations and the like.
  430:               listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID], Config),
  431:               %% WHEN users chat
  432:               escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)),
  433:               %% THEN
  434:               ?assertMatch(#{<<"from_user_id">> := BobRoomJID,
  435:                              <<"to_user_id">> := AliceFullJID,
  436:                              <<"message">> := Message},
  437:                            get_decoded_message_from_rabbit(AliceGroupChatMsgRecvRK))
  438:       end).
  439: 
  440: %%--------------------------------------------------------------------
  441: %% Test helpers
  442: %%--------------------------------------------------------------------
  443: 
  444: -spec connect_to_rabbit() -> proplists:proplist().
  445: connect_to_rabbit() ->
  446:     {ok, Connection} =
  447:         amqp_connection:start(#amqp_params_network{}),
  448:     {ok, Channel} = amqp_connection:open_channel(Connection),
  449:     [{rabbit_connection, Connection}, {rabbit_channel, Channel}].
  450: 
  451: -spec close_rabbit_connection(Config :: proplists:proplist()) -> ok | term().
  452: close_rabbit_connection(Config) ->
  453:     Connection = proplists:get_value(rabbit_connection, Config),
  454:     Channel = proplists:get_value(rabbit_channel, Config),
  455:     amqp_channel:close(Channel),
  456:     amqp_connection:close(Connection).
  457: 
  458: -spec listen_to_presence_events_from_rabbit(JIDs :: [binary()],
  459:                                             Config :: proplists:proplist()) ->
  460:     ok | term().
  461: listen_to_presence_events_from_rabbit(JIDs, Config) ->
  462:     QueueBindings = presence_bindings(?QUEUE_NAME, JIDs),
  463:     listen_to_events_from_rabbit(QueueBindings, Config).
  464: 
  465: -spec listen_to_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
  466:                                                  Config :: proplists:proplist()) ->
  467:     ok | term().
  468: listen_to_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
  469:     QueueBindings = chat_msg_sent_bindings(?QUEUE_NAME, JIDs),
  470:     listen_to_events_from_rabbit(QueueBindings, Config).
  471: 
  472: -spec listen_to_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
  473:                                                  Config :: proplists:proplist()) ->
  474:     ok | term().
  475: listen_to_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
  476:     QueueBindings = chat_msg_recv_bindings(?QUEUE_NAME, JIDs),
  477:     listen_to_events_from_rabbit(QueueBindings, Config).
  478: 
  479: -spec listen_to_group_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
  480:                                                        Config :: proplists:proplist()) ->
  481:     ok | term().
  482: listen_to_group_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
  483:     QueueBindings = group_chat_msg_sent_bindings(?QUEUE_NAME, JIDs),
  484:     listen_to_events_from_rabbit(QueueBindings, Config).
  485: 
  486: -spec listen_to_group_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
  487:                                                        Config :: proplists:proplist()) ->
  488:     ok | term().
  489: listen_to_group_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
  490:     QueueBindings = group_chat_msg_recv_bindings(?QUEUE_NAME, JIDs),
  491:     listen_to_events_from_rabbit(QueueBindings, Config).
  492: 
  493: -spec listen_to_events_from_rabbit(QueueBindings :: [rabbit_binding()],
  494:                                    Config :: proplists:proplist()) ->
  495:     ok | term().
  496: listen_to_events_from_rabbit(QueueBindings, Config) ->
  497:     Connection = proplists:get_value(rabbit_connection, Config),
  498:     Channel = proplists:get_value(rabbit_channel, Config),
  499:     declare_temporary_rabbit_queue(Channel, ?QUEUE_NAME),
  500:     wait_for_exchanges_to_be_created(Connection,
  501:                                      [{?PRESENCE_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
  502:                                       {?CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
  503:                                       {?GROUP_CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}]),
  504:     bind_queues_to_exchanges(Channel, QueueBindings),
  505:     subscribe_to_rabbit_queue(Channel, ?QUEUE_NAME).
  506: 
  507: -spec wait_for_exchanges_to_be_created(Connection :: pid(),
  508:                                        Exchanges :: [binary()]) -> pid().
  509: wait_for_exchanges_to_be_created(Connection, Exchanges) ->
  510:     [ensure_exchange_present(Connection, Exchange) || Exchange <- Exchanges],
  511:     ok.
  512: 
  513: -spec declare_temporary_rabbit_queue(Channel :: pid(), Queue :: binary()) -> binary().
  514: declare_temporary_rabbit_queue(Channel, Queue) ->
  515:     #'queue.declare_ok'{} =
  516:         amqp_channel:call(Channel, #'queue.declare'{queue = Queue,
  517:                                                     exclusive = true}).
  518: 
  519: -spec presence_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  520:     [rabbit_binding()].
  521: presence_bindings(Queue, JIDs) ->
  522:     [{Queue, ?PRESENCE_EXCHANGE, JID} || JID <- JIDs].
  523: 
  524: -spec chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  525:     [rabbit_binding()].
  526: chat_msg_sent_bindings(Queue, JIDs) ->
  527:     [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_sent_rk(JID)} || JID <- JIDs].
  528: 
  529: -spec chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  530:     [rabbit_binding()].
  531: chat_msg_recv_bindings(Queue, JIDs) ->
  532:     [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_recv_rk(JID)} || JID <- JIDs].
  533: 
  534: -spec group_chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  535:     [rabbit_binding()].
  536: group_chat_msg_sent_bindings(Queue, JIDs) ->
  537:     [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_sent_rk(JID)}
  538:      || JID <- JIDs].
  539: 
  540: -spec group_chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
  541:     [rabbit_binding()].
  542: group_chat_msg_recv_bindings(Queue, JIDs) ->
  543:     [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_recv_rk(JID)}
  544:      || JID <- JIDs].
  545: 
  546: -spec bind_queues_to_exchanges(Channel :: pid(),
  547:                                Bindings :: [rabbit_binding()]) ->
  548:     [amqp_client:amqp_method() | ok | blocked | closing].
  549: bind_queues_to_exchanges(Channel, Bindings) ->
  550:     [bind_queue_to_exchange(Channel, Binding) || Binding <- Bindings].
  551: 
  552: -spec bind_queue_to_exchange(Channel :: pid(), rabbit_binding()) ->
  553:     amqp_client:amqp_method() | ok | blocked | closing.
  554: bind_queue_to_exchange(Channel, {Queue, Exchange, RoutingKey}) ->
  555:     #'queue.bind_ok'{} =
  556:         amqp_channel:call(Channel, #'queue.bind'{exchange = Exchange,
  557:                                                  routing_key = RoutingKey,
  558:                                                  queue = Queue}).
  559: 
  560: -spec ensure_exchange_present(Connection :: pid(), Exchange :: binary()) ->
  561:                                      {ok, true} | {timeout, any()}.
  562: ensure_exchange_present(Connection, Exchange) ->
  563:     Opts = #{time_left => ?WAIT_FOR_EXCHANGE_INTERVAL * ?IF_EXCHANGE_EXISTS_RETRIES / 1000,
  564:              sleep_time => ?WAIT_FOR_EXCHANGE_INTERVAL},
  565:     case mongoose_helper:wait_until(fun() ->
  566:                                             is_exchange_present(Connection,
  567:                                                                 Exchange)
  568:                                     end, true, Opts) of
  569:         {ok, true} -> true;
  570:         {timeout, _} ->
  571:             throw(io_lib:format("Exchange has not been created, exchange=~p",
  572:                                 [Exchange]))
  573:     end.
  574: 
  575: -spec is_exchange_present(Connection :: pid(), Exchange :: binary()) -> boolean().
  576: is_exchange_present(Connection, {ExName, ExType}) ->
  577:     {ok, Channel} = amqp_connection:open_channel(Connection),
  578:     try amqp_channel:call(Channel, #'exchange.declare'{exchange = ExName,
  579:                                                        type = ExType,
  580:                                                        %% this option allows to
  581:                                                        %% check if an exchange exists
  582:                                                        passive = true}) of
  583:         {'exchange.declare_ok'} -> true
  584:     catch
  585:         _Error:_Reason -> false
  586:     end.
  587: 
  588: -spec subscribe_to_rabbit_queue(Channel :: pid(), Queue :: binary()) -> ok.
  589: subscribe_to_rabbit_queue(Channel, Queue) ->
  590:     amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
  591:                                                      no_ack = true}, self()),
  592:     receive
  593:         #'basic.consume_ok'{} -> ok
  594:     after
  595:         5000 ->
  596:             throw(io_lib:format("Timeout when subscribing to queue,"
  597:                                 "channel=~p, queue=~p", [Channel, Queue]))
  598:     end.
  599: 
  600: -spec send_presence_stanzas(Users :: [binary()], NumOfMsgs :: non_neg_integer())
  601:                            -> [[ok]] | term().
  602: send_presence_stanzas(Users, NumOfMsgs) ->
  603:     [send_presence_stanza(User, NumOfMsgs) || User <- Users].
  604: 
  605: -spec send_presence_stanza(Users :: [binary()], NumOfMsgs :: non_neg_integer())
  606:                           -> [ok] | term().
  607: send_presence_stanza(User, NumOfMsgs) ->
  608:     [escalus:send(User, escalus_stanza:presence(make_pres_type(X)))
  609:      || X <- lists:seq(1, NumOfMsgs)].
  610: 
  611: -spec get_decoded_message_from_rabbit(RoutingKey :: binary()) ->
  612:     map() | no_return().
  613: get_decoded_message_from_rabbit(RoutingKey) ->
  614:     receive
  615:         {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} ->
  616:             jiffy:decode(Msg, [return_maps])
  617:     after
  618:         5000 -> ct:fail(io_lib:format("Timeout when decoding message, rk=~p",
  619:                                    [RoutingKey]))
  620:     end.
  621: 
  622: %%--------------------------------------------------------------------
  623: %% Utils
  624: %%--------------------------------------------------------------------
  625: 
  626: start_mod_event_pusher_rabbit(Config) ->
  627:     dynamic_modules:start(domain(), mod_event_pusher_rabbit, Config).
  628: 
  629: stop_mod_event_pusher_rabbit() ->
  630:     dynamic_modules:stop(domain(), mod_event_pusher_rabbit).
  631: 
  632: start_rabbit_wpool(Host) ->
  633:     start_rabbit_wpool(Host, ?WPOOL_CFG).
  634: 
  635: start_rabbit_wpool(Host, WpoolConfig) ->
  636:     rpc(mim(), mongoose_wpool, ensure_started, []),
  637:     rpc(mim(), mongoose_wpool, start_configured_pools, [[WpoolConfig], [Host]]).
  638: 
  639: stop_rabbit_wpool({Pool, Host, Tag}) ->
  640:     rpc(mim(), mongoose_wpool, stop, [Pool, Host, Tag]);
  641: stop_rabbit_wpool(Host)->
  642:     stop_rabbit_wpool({rabbit, Host, event_pusher}).
  643: 
  644: delete_exchanges() ->
  645:     ConnConf = connect_to_rabbit(),
  646:     Channel  = proplists:get_value(rabbit_channel, ConnConf),
  647:     [amqp_channel:call(Channel, #'exchange.delete'{exchange = Ex})
  648:      || Ex <- [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE]],
  649:     close_rabbit_connection(ConnConf).
  650: 
  651: make_pres_type(X) when X rem 2 == 0 ->
  652:     <<"available">>;
  653: make_pres_type(_) ->
  654:     <<"unavailable">>.
  655: 
  656: chat_msg_sent_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_SENT_TOPIC).
  657: 
  658: chat_msg_recv_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_RECV_TOPIC).
  659: 
  660: group_chat_msg_sent_rk(JID) ->
  661:     user_topic_routing_key(JID, ?GROUP_CHAT_MSG_SENT_TOPIC).
  662: 
  663: group_chat_msg_recv_rk(JID) ->
  664:     user_topic_routing_key(JID, ?GROUP_CHAT_MSG_RECV_TOPIC).
  665: 
  666: user_topic_routing_key(JID, Topic) -> <<JID/binary, ".", Topic/binary>>.
  667: 
  668: client_lower_short_jid(Client) ->
  669:     escalus_utils:jid_to_lower(escalus_client:short_jid(Client)).
  670: 
  671: client_lower_full_jid(Client) ->
  672:     escalus_utils:jid_to_lower(escalus_client:full_jid(Client)).
  673: 
  674: nick(User) -> escalus_utils:get_username(User).
  675: 
  676: maybe_prepare_muc(TestCase, Config) when
  677:       TestCase == group_chat_message_sent_event orelse
  678:       TestCase == group_chat_message_received_event orelse
  679:       TestCase == group_chat_message_sent_event_properly_formatted orelse
  680:       TestCase == group_chat_message_received_event_properly_formatted ->
  681:     prepare_muc(Config);
  682: maybe_prepare_muc(_, Config) -> Config.
  683: 
  684: maybe_cleanup_muc(TestCase, Config) when
  685:       TestCase == group_chat_message_sent_event orelse
  686:       TestCase == group_chat_message_received_event orelse
  687:       TestCase == group_chat_message_sent_event_properly_formatted orelse
  688:       TestCase == group_chat_message_received_event_properly_formatted ->
  689:     cleanup_muc(Config);
  690: maybe_cleanup_muc(_, _) -> ok.
  691: 
  692: prepare_muc(Config) ->
  693:     [User | _] = ?config(escalus_users, Config),
  694:     muc_helper:start_room(Config, User, <<"muc_publish">>, <<"user_nick">>,
  695:                           [{persistent, true},
  696:                            {anonymous, false}]).
  697: 
  698: cleanup_muc(Config) ->
  699:     muc_helper:destroy_room(Config).
  700: 
  701: user_room_jid(RoomJID, UserJID) ->
  702:     Nick = nick(UserJID),
  703:     <<RoomJID/binary, "/", Nick/binary>>.
  704: 
  705: extend_config_with_exchange_type(ExType) ->
  706:     lists:map(fun({Ex, Opts}) when
  707:                         Ex == presence_exchange orelse
  708:                         Ex == chat_msg_exchange orelse
  709:                         Ex == groupchat_msg_exchange ->
  710:                       {Ex, Opts ++ [{type, ExType}]};
  711:                  (Other) -> Other
  712:               end, ?MOD_EVENT_PUSHER_RABBIT_CFG).
  713: 
  714: is_rabbitmq_available() ->
  715:     try amqp_connection:start(#amqp_params_network{}) of
  716:         {ok, Conn} ->
  717:             amqp_connection:close(Conn),
  718:             true;
  719:         {error, econnrefused} ->
  720:             false
  721:     catch
  722:         _Err ->
  723:             false
  724:     end.