1: %%%------------------------------------------------------------------- 2: %%% @author Kacper Mentel <kacper.mentel@erlang-solutions.com> 3: %%% @copyright (C) 2018, Kacper Mentel 4: %%% @doc 5: %%% Tests for `mod_event_pusher` RabbitMQ backend (`mod_event_pusher_rabbit`). 6: %%% @end 7: %%%------------------------------------------------------------------- 8: -module(mod_event_pusher_rabbit_SUITE). 9: 10: -include_lib("common_test/include/ct.hrl"). 11: -include_lib("eunit/include/eunit.hrl"). 12: -include_lib("amqp_client/include/amqp_client.hrl"). 13: 14: -include("assert_received_match.hrl"). 15: 16: -import(distributed_helper, [mim/0, rpc/4]). 17: -import(domain_helper, [domain/0]). 18: -import(config_parser_helper, [config/2]). 19: 20: -export([suite/0, all/0, groups/0]). 21: -export([init_per_suite/1, end_per_suite/1, 22: init_per_group/2, end_per_group/2, 23: init_per_testcase/2, end_per_testcase/2]). 24: 25: -export([rabbit_pool_starts_with_default_config/1, 26: exchanges_are_created_on_module_startup/1]). 27: -export([connected_users_push_presence_events_when_change_status/1, 28: presence_messages_are_properly_formatted/1]). 29: -export([chat_message_sent_event/1, 30: chat_message_sent_event_properly_formatted/1, 31: chat_message_received_event/1, 32: chat_message_received_event_properly_formatted/1]). 33: -export([group_chat_message_sent_event/1, 34: group_chat_message_sent_event_properly_formatted/1, 35: group_chat_message_received_event/1, 36: group_chat_message_received_event_properly_formatted/1]). 37: 38: -define(QUEUE_NAME, <<"test_queue">>). 39: -define(DEFAULT_EXCHANGE_TYPE, <<"topic">>). 40: -define(PRESENCE_EXCHANGE, <<"custom_presence_exchange">>). 41: -define(CHAT_MSG_EXCHANGE, <<"custom_chat_msg_exchange">>). 42: -define(GROUP_CHAT_MSG_EXCHANGE, <<"custom_group_chat_msg_exchange">>). 43: -define(CHAT_MSG_SENT_TOPIC, <<"custom_chat_msg_sent_topic">>). 44: -define(CHAT_MSG_RECV_TOPIC, <<"custom_chat_msg_recv_topic">>). 
-define(GROUP_CHAT_MSG_SENT_TOPIC, <<"custom_group_chat_msg_sent_topic">>).
-define(GROUP_CHAT_MSG_RECV_TOPIC, <<"custom_group_chat_msg_recv_topic">>).
-define(WPOOL_CFG, #{scope => host_type,
                     opts => #{workers => 20, strategy => best_worker, call_timeout => 5000}}).
-define(IF_EXCHANGE_EXISTS_RETRIES, 30).
-define(WAIT_FOR_EXCHANGE_INTERVAL, 100). % ms

-type rabbit_binding() :: {Queue :: binary(),
                           Exchange :: binary(),
                           RoutingKey :: binary()}.

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Every test case lives in a group; the groups run in the order listed.
all() ->
    [{group, GroupName} || GroupName <- [pool_startup,
                                         module_startup,
                                         presence_status_publish,
                                         chat_message_publish,
                                         group_chat_message_publish]].

%% Group definitions, passed through `ct_helper:repeat_all_until_all_ok/1'
%% before being handed to Common Test.
groups() ->
    ct_helper:repeat_all_until_all_ok(
      [{pool_startup, [],
        [rabbit_pool_starts_with_default_config]},
       {module_startup, [],
        [exchanges_are_created_on_module_startup]},
       {presence_status_publish, [],
        [connected_users_push_presence_events_when_change_status,
         presence_messages_are_properly_formatted]},
       {chat_message_publish, [],
        [chat_message_sent_event,
         chat_message_sent_event_properly_formatted,
         chat_message_received_event,
         chat_message_received_event_properly_formatted]},
       {group_chat_message_publish, [],
        [group_chat_message_sent_event,
         group_chat_message_sent_event_properly_formatted,
         group_chat_message_received_event,
         group_chat_message_received_event_properly_formatted]}]).

%% Suite-level Common Test options are delegated to escalus.
suite() ->
    escalus:suite().

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Skips the whole suite when no RabbitMQ broker can be reached; otherwise
%% starts the Rabbit worker pool, the AMQP client application and MUC before
%% handing over to escalus.
init_per_suite(Config) ->
    case is_rabbitmq_available() of
        false ->
            {skip, "RabbitMQ server is not available on default port."};
        true ->
            start_rabbit_wpool(domain()),
            {ok, _} = application:ensure_all_started(amqp_client),
            muc_helper:load_muc(),
            escalus:init_per_suite(Config)
    end.

%% Stops the worker pool, cleans fresh users, unloads MUC and lets escalus
%% finish the suite teardown.
end_per_suite(Config) ->
    stop_rabbit_wpool(domain()),
    escalus_fresh:clean(),
    muc_helper:unload_muc(),
    escalus:end_per_suite(Config).

%% Remembers the currently loaded modules, then makes sure the modules
%% required by the given group are in the expected state.
init_per_group(GroupName, Config0) ->
    HostType = domain(),
    SavedConfig = dynamic_modules:save_modules(HostType, Config0),
    dynamic_modules:ensure_modules(HostType, required_modules(GroupName)),
    SavedConfig.

%% The pool start-up group needs `mod_event_pusher' stopped; every other
%% group runs it with the Rabbit backend configured.
required_modules(pool_startup) ->
    [{mod_event_pusher, stopped}];
required_modules(GroupName) ->
    RabbitOpts = mod_event_pusher_rabbit_opts(GroupName),
    [{mod_event_pusher, #{rabbit => RabbitOpts}}].

%% Builds the Rabbit backend options: each basic exchange definition is
%% merged with the group-specific extras and passed through `config/2'.
mod_event_pusher_rabbit_opts(GroupName) ->
    Extra = extra_exchange_opts(GroupName),
    Expand = fun(ExchangeKey, ExchangeOpts) ->
                     config([modules, mod_event_pusher, rabbit, ExchangeKey],
                            maps:merge(ExchangeOpts, Extra))
             end,
    maps:map(Expand, basic_exchanges()).

%% Custom exchange names and topics used by all groups.
basic_exchanges() ->
    Presence = #{name => ?PRESENCE_EXCHANGE},
    ChatMsg = #{name => ?CHAT_MSG_EXCHANGE,
                sent_topic => ?CHAT_MSG_SENT_TOPIC,
                recv_topic => ?CHAT_MSG_RECV_TOPIC},
    GroupChatMsg = #{name => ?GROUP_CHAT_MSG_EXCHANGE,
                     sent_topic => ?GROUP_CHAT_MSG_SENT_TOPIC,
                     recv_topic => ?GROUP_CHAT_MSG_RECV_TOPIC},
    #{presence_exchange => Presence,
      chat_msg_exchange => ChatMsg,
      groupchat_msg_exchange => GroupChatMsg}.

%% Only the module start-up group overrides the exchange type.
extra_exchange_opts(module_startup) -> #{type => <<"headers">>};
extra_exchange_opts(_) -> #{}.

%% Removes the exchanges created by the group and restores the modules
%% saved in `init_per_group/2'.
end_per_group(_, Config) ->
    delete_exchanges(),
    dynamic_modules:restore_modules(Config).

%% The pool start-up test needs neither users nor a RabbitMQ connection.
%% Every other test case gets fresh users, an optional MUC room and an
%% AMQP connection plus channel appended to the config.
init_per_testcase(rabbit_pool_starts_with_default_config, Config) ->
    Config;
init_per_testcase(CaseName, Config0) ->
    Config1 = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]),
    Config2 = maybe_prepare_muc(CaseName, Config1),
    Config = Config2 ++ connect_to_rabbit(),
    escalus:init_per_testcase(CaseName, Config).

%% Mirrors `init_per_testcase/2': destroys the MUC room (when one was
%% created) and closes the AMQP channel and connection.
end_per_testcase(rabbit_pool_starts_with_default_config, _Config) ->
    ok;
end_per_testcase(CaseName, Config) ->
    maybe_cleanup_muc(CaseName, Config),
    close_rabbit_connection(Config),
    escalus:end_per_testcase(CaseName, Config).

%%--------------------------------------------------------------------
%% GROUPS pool_startup & module_startup
%%--------------------------------------------------------------------

%% Starts a Rabbit pool tagged `rabbit_event_pusher_default' and checks that
%% it shows up in the list returned by `mongoose_wpool:get_pools/0'.
rabbit_pool_starts_with_default_config(_Config) ->
    %% GIVEN
    Domain = domain(),
    DefaultWpoolConfig =
        #{type => rabbit, scope => host_type, tag => rabbit_event_pusher_default,
          opts => #{workers => 10, strategy => best_worker, call_timeout => 5000},
          conn_opts => #{amqp_port => 5672, confirms_enabled => false,
                         max_worker_queue_len => 1000}},
    RabbitWpool = {rabbit, Domain, rabbit_event_pusher_default},
    %% WHEN
    start_rabbit_wpool(Domain, DefaultWpoolConfig),
    try
        %% THEN
        Pools = rpc(mim(), mongoose_wpool, get_pools, []),
        ?assertMatch(RabbitWpool,
                     lists:keyfind(rabbit_event_pusher_default, 3, Pools))
    after
        %% CLEANUP - runs even when the assertion fails, so that a failed
        %% run does not leave the pool behind for the next attempt.
        stop_rabbit_wpool(RabbitWpool)
    end.

%% Checks that all three custom exchanges exist with the `headers' type,
%% which is the type the `module_startup' group configures.
exchanges_are_created_on_module_startup(Config) ->
    %% GIVEN module is started with custom exchange types
    Connection = proplists:get_value(rabbit_connection, Config),
    ExCustomType = <<"headers">>,
    Exchanges = [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE],
    %% THEN exchanges are created
    lists:foreach(
      fun(Exchange) ->
              ?assert(ensure_exchange_present(Connection, {Exchange, ExCustomType}))
      end, Exchanges).

%%--------------------------------------------------------------------
%% GROUP presence_status_publish
%%--------------------------------------------------------------------

%% One presence stanza from Bob must result in a delivery whose routing key
%% is Bob's lower-cased short JID.
connected_users_push_presence_events_when_change_status(Config) ->
    Story =
        fun(Bob) ->
                %% GIVEN
                RoutingKey = client_lower_short_jid(Bob),
                listen_to_presence_events_from_rabbit([RoutingKey], Config),
                %% WHEN users generate some traffic.
                send_presence_stanzas([Bob], 1),
                %% THEN wait for presence events from Rabbit.
                ?assertReceivedMatch({#'basic.deliver'{routing_key = RoutingKey},
                                      #amqp_msg{}}, timer:seconds(5))
        end,
    escalus:story(Config, [{bob, 1}], Story).

%% An `unavailable' presence must be published as a JSON object carrying
%% the full JID and `present = false'.
presence_messages_are_properly_formatted(Config) ->
    Story =
        fun(Bob) ->
                %% GIVEN
                ShortJID = client_lower_short_jid(Bob),
                FullJID = client_lower_full_jid(Bob),
                listen_to_presence_events_from_rabbit([ShortJID], Config),
                %% WHEN user logout
                escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
                %% THEN receive message
                ?assertMatch(#{<<"user_id">> := FullJID, <<"present">> := false},
                             get_decoded_message_from_rabbit(ShortJID))
        end,
    escalus:story(Config, [{bob, 1}], Story).

%%--------------------------------------------------------------------
%% GROUP chat_message_publish
%%--------------------------------------------------------------------

%% A chat message sent by Bob must be delivered under Bob's "sent" routing
%% key.
chat_message_sent_event(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN
                SenderJID = client_lower_short_jid(Bob),
                SentRK = chat_msg_sent_rk(SenderJID),
                listen_to_chat_msg_sent_events_from_rabbit([SenderJID], Config),
                %% WHEN users chat
                Stanza = escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>),
                escalus:send(Bob, Stanza),
                %% THEN wait for chat message sent events from Rabbit.
                ?assertReceivedMatch({#'basic.deliver'{routing_key = SentRK},
                                      #amqp_msg{}}, timer:seconds(5))
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%% A chat message addressed to Alice must be delivered under Alice's
%% "received" routing key.
chat_message_received_event(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN
                RecipientJID = client_lower_short_jid(Alice),
                RecvRK = chat_msg_recv_rk(RecipientJID),
                listen_to_chat_msg_recv_events_from_rabbit([RecipientJID], Config),
                %% WHEN users chat
                Stanza = escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>),
                escalus:send(Bob, Stanza),
                escalus:wait_for_stanzas(Alice, 2),
                %% THEN wait for chat message received events from Rabbit.
                ?assertReceivedMatch({#'basic.deliver'{routing_key = RecvRK},
                                      #amqp_msg{}}, timer:seconds(5))
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%% The "sent" event payload must carry sender, recipient and message body.
chat_message_sent_event_properly_formatted(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN
                Message = <<"Hi Bob!">>,
                BobFullJID = client_lower_full_jid(Bob),
                AliceFullJID = client_lower_full_jid(Alice),
                AliceJID = client_lower_short_jid(Alice),
                SentRK = chat_msg_sent_rk(AliceJID),
                listen_to_chat_msg_sent_events_from_rabbit([AliceJID], Config),
                %% WHEN users chat
                escalus:send(Alice, escalus_stanza:chat_to(Bob, Message)),
                %% THEN
                ?assertMatch(#{<<"from_user_id">> := AliceFullJID,
                               <<"to_user_id">> := BobFullJID,
                               <<"message">> := Message},
                             get_decoded_message_from_rabbit(SentRK))
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%% The "received" event payload must carry sender, recipient and message
%% body.
chat_message_received_event_properly_formatted(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN
                Message = <<"Hi Alice!">>,
                BobFullJID = client_lower_full_jid(Bob),
                AliceFullJID = client_lower_full_jid(Alice),
                AliceJID = client_lower_short_jid(Alice),
                RecvRK = chat_msg_recv_rk(AliceJID),
                listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config),
                %% WHEN users chat
                escalus:send(Bob, escalus_stanza:chat_to(Alice, Message)),
                escalus:wait_for_stanzas(Alice, 2),
                %% THEN
                ?assertMatch(#{<<"from_user_id">> := BobFullJID,
                               <<"to_user_id">> := AliceFullJID,
                               <<"message">> := Message},
                             get_decoded_message_from_rabbit(RecvRK))
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%%--------------------------------------------------------------------
%% GROUP group_chat_message_publish
%%--------------------------------------------------------------------

%% A groupchat message sent by Bob to the room must be delivered under
%% Bob's group-chat "sent" routing key.
group_chat_message_sent_event(Config) ->
    Story =
        fun(Bob) ->
                %% GIVEN
                Room = ?config(room, Config),
                RoomAddr = muc_helper:room_address(Room),
                SenderJID = client_lower_short_jid(Bob),
                SentRK = group_chat_msg_sent_rk(SenderJID),
                listen_to_group_chat_msg_sent_events_from_rabbit([SenderJID], Config),
                %% WHEN users chat
                EnterRoom = muc_helper:stanza_muc_enter_room(Room, nick(Bob)),
                escalus:send(Bob, EnterRoom),

                GroupMsg = escalus_stanza:groupchat_to(RoomAddr, <<"Hi there!">>),
                escalus:send(Bob, GroupMsg),

                %% THEN wait for chat message sent events from Rabbit.
                ?assertReceivedMatch({#'basic.deliver'{routing_key = SentRK},
                                      #amqp_msg{}}, timer:seconds(5))
        end,
    escalus:story(Config, [{bob, 1}], Story).

%% A groupchat message sent by Bob must be delivered under Alice's
%% group-chat "received" routing key once both have entered the room.
group_chat_message_received_event(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN
                Room = ?config(room, Config),
                RoomAddr = muc_helper:room_address(Room),
                RecipientJID = client_lower_short_jid(Alice),
                RecvRK = group_chat_msg_recv_rk(RecipientJID),
                listen_to_group_chat_msg_recv_events_from_rabbit([RecipientJID],
                                                                 Config),
                %% WHEN users chat
                AliceEnters = muc_helper:stanza_muc_enter_room(Room, nick(Alice)),
                escalus:send(Alice, AliceEnters),
                BobEnters = muc_helper:stanza_muc_enter_room(Room, nick(Bob)),
                escalus:send(Bob, BobEnters),

                GroupMsg = escalus_stanza:groupchat_to(RoomAddr, <<"Hi there!">>),
                escalus:send(Bob, GroupMsg),

                %% THEN wait for chat message received events from Rabbit.
                ?assertReceivedMatch({#'basic.deliver'{routing_key = RecvRK},
                                      #amqp_msg{}}, timer:seconds(5))
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%% The group-chat "sent" event payload must name Bob as the sender, the
%% room address as the recipient, and carry the message body.
group_chat_message_sent_event_properly_formatted(Config) ->
    Story =
        fun(Bob) ->
                %% GIVEN
                Message = <<"Hi there!">>,
                Room = ?config(room, Config),
                RoomAddr = muc_helper:room_address(Room),
                BobFullJID = client_lower_full_jid(Bob),
                BobJID = client_lower_short_jid(Bob),
                SentRK = group_chat_msg_sent_rk(BobJID),
                listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config),
                %% WHEN a user chats
                EnterRoom = muc_helper:stanza_muc_enter_room(Room, nick(Bob)),
                escalus:send(Bob, EnterRoom),

                escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)),
                %% THEN
                ?assertMatch(#{<<"from_user_id">> := BobFullJID,
                               <<"to_user_id">> := RoomAddr,
                               <<"message">> := Message},
                             get_decoded_message_from_rabbit(SentRK))
        end,
    escalus:story(Config, [{bob, 1}], Story).

%% The group-chat "received" event payload must name Bob's in-room JID as
%% the sender and Alice's full JID as the recipient.
group_chat_message_received_event_properly_formatted(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN basic variables
                Message = <<"Hi there!">>,
                Room = ?config(room, Config),
                RoomAddr = muc_helper:room_address(Room),
                BobRoomJID = user_room_jid(RoomAddr, Bob),
                AliceFullJID = client_lower_full_jid(Alice),
                AliceJID = client_lower_short_jid(Alice),
                RecvRK = group_chat_msg_recv_rk(AliceJID),
                %% GIVEN users in room
                escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, nick(Alice))),
                escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, nick(Bob))),
                % wait for all room stanzas to be processed
                escalus:wait_for_stanzas(Alice, 3),
                escalus:wait_for_stanzas(Bob, 3),
                %% GIVEN Room subscription to Rabbit
                % The subscription is made only now, after the room traffic
                % has settled, so that the queue does not pick up anything
                % (affiliations and the like) other than the `Message' Bob
                % sends to the room below.
                listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID], Config),
                %% WHEN users chat
                escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)),
                %% THEN
                ?assertMatch(#{<<"from_user_id">> := BobRoomJID,
                               <<"to_user_id">> := AliceFullJID,
                               <<"message">> := Message},
                             get_decoded_message_from_rabbit(RecvRK))
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%%--------------------------------------------------------------------
%% Test helpers
%%--------------------------------------------------------------------

%% Opens an AMQP connection with default network parameters plus one
%% channel, returned as config entries.
-spec connect_to_rabbit() -> proplists:proplist().
connect_to_rabbit() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    [{rabbit_connection, Conn}, {rabbit_channel, Chan}].

%% Closes the channel first, then the connection stored in the config.
-spec close_rabbit_connection(Config :: proplists:proplist()) -> ok | term().
close_rabbit_connection(Config) ->
    Conn = proplists:get_value(rabbit_connection, Config),
    Chan = proplists:get_value(rabbit_channel, Config),
    amqp_channel:close(Chan),
    amqp_connection:close(Conn).

%% Each `listen_to_*' helper binds the test queue with the routing keys
%% derived from the given JIDs and subscribes the calling process to it.

-spec listen_to_presence_events_from_rabbit(JIDs :: [binary()],
                                            Config :: proplists:proplist()) ->
    ok | term().
listen_to_presence_events_from_rabbit(JIDs, Config) ->
    listen_to_events_from_rabbit(presence_bindings(?QUEUE_NAME, JIDs), Config).

-spec listen_to_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
                                                 Config :: proplists:proplist()) ->
    ok | term().
listen_to_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
    listen_to_events_from_rabbit(chat_msg_sent_bindings(?QUEUE_NAME, JIDs), Config).

-spec listen_to_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
                                                 Config :: proplists:proplist()) ->
    ok | term().
listen_to_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
    listen_to_events_from_rabbit(chat_msg_recv_bindings(?QUEUE_NAME, JIDs), Config).

-spec listen_to_group_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
                                                       Config :: proplists:proplist()) ->
    ok | term().
listen_to_group_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
    listen_to_events_from_rabbit(group_chat_msg_sent_bindings(?QUEUE_NAME, JIDs),
                                 Config).

-spec listen_to_group_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
                                                       Config :: proplists:proplist()) ->
    ok | term().
listen_to_group_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
    listen_to_events_from_rabbit(group_chat_msg_recv_bindings(?QUEUE_NAME, JIDs),
                                 Config).

%% Declares the test queue, waits until the three exchanges exist with the
%% default (`topic') type, binds the queue and subscribes the calling
%% process as its consumer.
-spec listen_to_events_from_rabbit(QueueBindings :: [rabbit_binding()],
                                   Config :: proplists:proplist()) ->
    ok | term().
listen_to_events_from_rabbit(QueueBindings, Config) ->
    Connection = proplists:get_value(rabbit_connection, Config),
    Channel = proplists:get_value(rabbit_channel, Config),
    declare_temporary_rabbit_queue(Channel, ?QUEUE_NAME),
    wait_for_exchanges_to_be_created(Connection,
                                     [{?PRESENCE_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
                                      {?CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
                                      {?GROUP_CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}]),
    bind_queues_to_exchanges(Channel, QueueBindings),
    subscribe_to_rabbit_queue(Channel, ?QUEUE_NAME).

%% Blocks until every `{Name, Type}' exchange is present; throws (via
%% `ensure_exchange_present/2') if one of them never appears.
-spec wait_for_exchanges_to_be_created(
        Connection :: pid(),
        Exchanges :: [{Name :: binary(), Type :: binary()}]) -> ok.
wait_for_exchanges_to_be_created(Connection, Exchanges) ->
    lists:foreach(fun(Exchange) ->
                          ensure_exchange_present(Connection, Exchange)
                  end, Exchanges).

%% Declares an exclusive queue on the given channel and asserts the
%% broker's positive reply.
-spec declare_temporary_rabbit_queue(Channel :: pid(), Queue :: binary()) ->
    #'queue.declare_ok'{}.
declare_temporary_rabbit_queue(Channel, Queue) ->
    #'queue.declare_ok'{} =
        amqp_channel:call(Channel, #'queue.declare'{queue = Queue,
                                                    exclusive = true}).

-spec presence_bindings(Queue :: binary(), JIDs :: [binary()]) ->
    [rabbit_binding()].
presence_bindings(Queue, JIDs) ->
    [{Queue, ?PRESENCE_EXCHANGE, JID} || JID <- JIDs].

-spec chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
    [rabbit_binding()].
chat_msg_sent_bindings(Queue, JIDs) ->
    [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_sent_rk(JID)} || JID <- JIDs].

-spec chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
    [rabbit_binding()].
chat_msg_recv_bindings(Queue, JIDs) ->
    [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_recv_rk(JID)} || JID <- JIDs].

-spec group_chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
    [rabbit_binding()].
group_chat_msg_sent_bindings(Queue, JIDs) ->
    [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_sent_rk(JID)}
     || JID <- JIDs].

-spec group_chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
    [rabbit_binding()].
group_chat_msg_recv_bindings(Queue, JIDs) ->
    [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_recv_rk(JID)}
     || JID <- JIDs].

-spec bind_queues_to_exchanges(Channel :: pid(),
                               Bindings :: [rabbit_binding()]) ->
    [amqp_client:amqp_method() | ok | blocked | closing].
bind_queues_to_exchanges(Channel, Bindings) ->
    [bind_queue_to_exchange(Channel, Binding) || Binding <- Bindings].

%% Binds one queue to one exchange and asserts the broker's positive reply.
-spec bind_queue_to_exchange(Channel :: pid(), rabbit_binding()) ->
    amqp_client:amqp_method() | ok | blocked | closing.
bind_queue_to_exchange(Channel, {Queue, Exchange, RoutingKey}) ->
    #'queue.bind_ok'{} =
        amqp_channel:call(Channel, #'queue.bind'{exchange = Exchange,
                                                 routing_key = RoutingKey,
                                                 queue = Queue}).

%% Polls until the `{Name, Type}' exchange exists. Returns `true' on
%% success and throws a formatted message when the wait times out.
-spec ensure_exchange_present(Connection :: pid(),
                              Exchange :: {Name :: binary(), Type :: binary()}) ->
    true | no_return().
ensure_exchange_present(Connection, Exchange) ->
    %% NOTE(review): `sleep_time' is given in ms, while `time_left' is the
    %% same interval multiplied by the retry count and divided by 1000. If
    %% `mongoose_helper:wait_until/3' expects both in ms, the effective
    %% budget is ~3 ms rather than 3 s - confirm against the helper.
    Opts = #{time_left => ?WAIT_FOR_EXCHANGE_INTERVAL * ?IF_EXCHANGE_EXISTS_RETRIES / 1000,
             sleep_time => ?WAIT_FOR_EXCHANGE_INTERVAL},
    IsPresent = fun() -> is_exchange_present(Connection, Exchange) end,
    case mongoose_helper:wait_until(IsPresent, true, Opts) of
        {ok, true} ->
            true;
        {timeout, _} ->
            throw(io_lib:format("Exchange has not been created, exchange=~p",
                                [Exchange]))
    end.

%% Checks for the exchange with a passive declare on a dedicated channel.
%% Any exception raised by the declare call is treated as "not present".
%% On success the probe channel is closed so that repeated polling does not
%% pile up open channels on the connection.
-spec is_exchange_present(Connection :: pid(),
                          Exchange :: {Name :: binary(), Type :: binary()}) ->
    boolean().
is_exchange_present(Connection, {ExName, ExType}) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),
    try amqp_channel:call(Channel, #'exchange.declare'{exchange = ExName,
                                                       type = ExType,
                                                       %% this option allows to
                                                       %% check if an exchange exists
                                                       passive = true}) of
        #'exchange.declare_ok'{} ->
            amqp_channel:close(Channel),
            true
    catch
        _Class:_Reason -> false
    end.

%% Registers the calling process as a no-ack consumer of the queue and
%% waits up to 5 s for the `basic.consume_ok' message.
-spec subscribe_to_rabbit_queue(Channel :: pid(), Queue :: binary()) -> ok.
subscribe_to_rabbit_queue(Channel, Queue) ->
    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
                                                     no_ack = true}, self()),
    receive
        #'basic.consume_ok'{} -> ok
    after
        5000 ->
            throw(io_lib:format("Timeout when subscribing to queue,"
                                "channel=~p, queue=~p", [Channel, Queue]))
    end.

%% Sends `NumOfMsgs' presence stanzas on behalf of every given user.
-spec send_presence_stanzas(Users :: [term()], NumOfMsgs :: non_neg_integer())
    -> [[ok]] | term().
send_presence_stanzas(Users, NumOfMsgs) ->
    [send_presence_stanza(User, NumOfMsgs) || User <- Users].

%% Sends `NumOfMsgs' presence stanzas for one user; the presence type of
%% each stanza is chosen by `make_pres_type/1' from its sequence number.
-spec send_presence_stanza(User :: term(), NumOfMsgs :: non_neg_integer())
    -> [ok] | term().
send_presence_stanza(User, NumOfMsgs) ->
    [escalus:send(User, escalus_stanza:presence(make_pres_type(X)))
     || X <- lists:seq(1, NumOfMsgs)].

%% Waits up to 5 s for a delivery with the given routing key and returns
%% its JSON payload decoded into a map; fails the test case on timeout.
-spec get_decoded_message_from_rabbit(RoutingKey :: binary()) ->
    map() | no_return().
get_decoded_message_from_rabbit(RoutingKey) ->
    receive
        {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} ->
            jiffy:decode(Msg, [return_maps])
    after
        5000 -> ct:fail(io_lib:format("Timeout when decoding message, rk=~p",
                                      [RoutingKey]))
    end.

%%--------------------------------------------------------------------
%% Utils
%%--------------------------------------------------------------------

%% Starts the Rabbit worker pool with the suite's default pool settings.
start_rabbit_wpool(Host) ->
    start_rabbit_wpool(Host, ?WPOOL_CFG).

%% Makes sure the pool machinery is running on the MongooseIM node, then
%% starts exactly one configured Rabbit pool for the given host.
start_rabbit_wpool(Host, WpoolConfig) ->
    Node = mim(),
    rpc(Node, mongoose_wpool, ensure_started, []),
    PoolSpec = config([outgoing_pools, rabbit, event_pusher], WpoolConfig),
    [{ok, _Pid}] = rpc(mim(), mongoose_wpool, start_configured_pools,
                       [[PoolSpec], [Host]]).

%% Accepts either a full `{Pool, Host, Tag}' triple or just a host, in
%% which case the suite's default `event_pusher' pool is stopped.
stop_rabbit_wpool({Pool, Host, Tag}) ->
    rpc(mim(), mongoose_wpool, stop, [Pool, Host, Tag]);
stop_rabbit_wpool(Host) ->
    stop_rabbit_wpool({rabbit, Host, event_pusher}).

%% Deletes the three custom exchanges over a short-lived connection.
delete_exchanges() ->
    ConnConf = connect_to_rabbit(),
    Channel = proplists:get_value(rabbit_channel, ConnConf),
    lists:foreach(
      fun(Exchange) ->
              amqp_channel:call(Channel, #'exchange.delete'{exchange = Exchange})
      end,
      [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE]),
    close_rabbit_connection(ConnConf).

%% Even sequence numbers map to `available', everything else to
%% `unavailable'.
make_pres_type(X) when X rem 2 =:= 0 ->
    <<"available">>;
make_pres_type(_) ->
    <<"unavailable">>.

%% Routing keys have the form `<jid>.<topic>'.
chat_msg_sent_rk(JID) ->
    user_topic_routing_key(JID, ?CHAT_MSG_SENT_TOPIC).

chat_msg_recv_rk(JID) ->
    user_topic_routing_key(JID, ?CHAT_MSG_RECV_TOPIC).

group_chat_msg_sent_rk(JID) ->
    user_topic_routing_key(JID, ?GROUP_CHAT_MSG_SENT_TOPIC).

group_chat_msg_recv_rk(JID) ->
    user_topic_routing_key(JID, ?GROUP_CHAT_MSG_RECV_TOPIC).

user_topic_routing_key(JID, Topic) ->
    <<JID/binary, ".", Topic/binary>>.

%% Lower-cased short JID of an escalus client.
client_lower_short_jid(Client) ->
    ShortJID = escalus_client:short_jid(Client),
    escalus_utils:jid_to_lower(ShortJID).

%% Lower-cased full JID of an escalus client.
client_lower_full_jid(Client) ->
    FullJID = escalus_client:full_jid(Client),
    escalus_utils:jid_to_lower(FullJID).

%% The room nickname of a user is simply its username.
nick(User) ->
    escalus_utils:get_username(User).

%% True for the test cases that need a MUC room prepared and cleaned up.
is_muc_testcase(group_chat_message_sent_event) -> true;
is_muc_testcase(group_chat_message_received_event) -> true;
is_muc_testcase(group_chat_message_sent_event_properly_formatted) -> true;
is_muc_testcase(group_chat_message_received_event_properly_formatted) -> true;
is_muc_testcase(_) -> false.

%% Starts a MUC room for group-chat test cases; returns the config
%% unchanged for every other test case.
maybe_prepare_muc(TestCase, Config) ->
    case is_muc_testcase(TestCase) of
        true -> prepare_muc(Config);
        false -> Config
    end.

%% Destroys the MUC room for group-chat test cases; a no-op otherwise.
maybe_cleanup_muc(TestCase, Config) ->
    case is_muc_testcase(TestCase) of
        true -> cleanup_muc(Config);
        false -> ok
    end.

%% Starts a persistent, non-anonymous room owned by the first escalus user
%% found in the config.
prepare_muc(Config) ->
    [User | _] = ?config(escalus_users, Config),
    muc_helper:start_room(Config, User, <<"muc_publish">>, <<"user_nick">>,
                          [{persistent, true},
                           {anonymous, false}]).

cleanup_muc(Config) ->
    muc_helper:destroy_room(Config).

%% In-room JID of a user: `<room address>/<nick>'.
user_room_jid(RoomJID, UserJID) ->
    Nick = nick(UserJID),
    <<RoomJID/binary, "/", Nick/binary>>.

%% Probes for a RabbitMQ broker by opening (and immediately closing) a
%% connection with default network parameters.
%%
%% Any failure to connect counts as "not available": every `{error, _}'
%% reply (not only `econnrefused') and every exception raised by the
%% connection attempt. This makes the suite skip cleanly instead of
%% crashing in `init_per_suite/1' on, e.g., a timeout or an unreachable
%% host.
is_rabbitmq_available() ->
    try amqp_connection:start(#amqp_params_network{}) of
        {ok, Conn} ->
            amqp_connection:close(Conn),
            true;
        {error, _Reason} ->
            false
    catch
        _Class:_Error ->
            false
    end.