1: %%%------------------------------------------------------------------- 2: %%% @author Kacper Mentel <kacper.mentel@erlang-solutions.com> 3: %%% @copyright (C) 2018, Kacper Mentel 4: %%% @doc 5: %%% Tests for `mod_event_pusher` RabbitMQ backend (`mod_event_pusher_rabbit`). 6: %%% @end 7: %%%------------------------------------------------------------------- 8: -module(mod_event_pusher_rabbit_SUITE). 9: 10: -include_lib("escalus/include/escalus.hrl"). 11: -include_lib("common_test/include/ct.hrl"). 12: -include_lib("eunit/include/eunit.hrl"). 13: -include_lib("escalus/include/escalus_xmlns.hrl"). 14: -include_lib("exml/include/exml.hrl"). 15: -include_lib("amqp_client/include/amqp_client.hrl"). 16: 17: -include("assert_received_match.hrl"). 18: 19: -import(distributed_helper, [mim/0, 20: rpc/4]). 21: 22: -import(domain_helper, [domain/0]). 23: 24: -export([suite/0, all/0, groups/0]). 25: -export([init_per_suite/1, end_per_suite/1, 26: init_per_group/2, end_per_group/2, 27: init_per_testcase/2, end_per_testcase/2]). 28: 29: -export([rabbit_pool_starts_with_default_config/1, 30: exchanges_are_created_on_module_startup/1]). 31: -export([connected_users_push_presence_events_when_change_status/1, 32: presence_messages_are_properly_formatted/1]). 33: -export([chat_message_sent_event/1, 34: chat_message_sent_event_properly_formatted/1, 35: chat_message_received_event/1, 36: chat_message_received_event_properly_formatted/1]). 37: -export([group_chat_message_sent_event/1, 38: group_chat_message_sent_event_properly_formatted/1, 39: group_chat_message_received_event/1, 40: group_chat_message_received_event_properly_formatted/1]). 41: 42: -define(QUEUE_NAME, <<"test_queue">>). 43: -define(DEFAULT_EXCHANGE_TYPE, <<"topic">>). 44: -define(PRESENCE_EXCHANGE, <<"custom_presence_exchange">>). 45: -define(CHAT_MSG_EXCHANGE, <<"custom_chat_msg_exchange">>). 46: -define(GROUP_CHAT_MSG_EXCHANGE, <<"custom_group_chat_msg_exchange">>). 47: -define(CHAT_MSG_SENT_TOPIC, <<"custom_chat_msg_sent_topic">>). 48: -define(CHAT_MSG_RECV_TOPIC, <<"custom_chat_msg_recv_topic">>). 49: -define(GROUP_CHAT_MSG_SENT_TOPIC, <<"custom_group_chat_msg_sent_topic">>). 50: -define(GROUP_CHAT_MSG_RECV_TOPIC, <<"custom_group_chat_msg_recv_topic">>). 51: -define(MOD_EVENT_PUSHER_RABBIT_CFG, 52: [{presence_exchange, [{name, ?PRESENCE_EXCHANGE}]}, 53: {chat_msg_exchange, [{name, ?CHAT_MSG_EXCHANGE}, 54: {sent_topic, ?CHAT_MSG_SENT_TOPIC}, 55: {recv_topic, ?CHAT_MSG_RECV_TOPIC}]}, 56: {groupchat_msg_exchange, [{name, ?GROUP_CHAT_MSG_EXCHANGE}, 57: {sent_topic, ?GROUP_CHAT_MSG_SENT_TOPIC}, 58: {recv_topic, ?GROUP_CHAT_MSG_RECV_TOPIC}]} 59: ]). 60: -define(MOD_EVENT_PUSHER_CFG, [{backends, 61: [{rabbit, ?MOD_EVENT_PUSHER_RABBIT_CFG}]}]). 62: -define(WPOOL_CFG, {rabbit, host, event_pusher, 63: [{workers, 20}], 64: [%% enables publisher one-to-one confirms 65: %% disabled by default 66: %% {confirms_enabled, true}, 67: {amqp_host, "localhost"}, 68: {amqp_port, 5672}, 69: {amqp_username, "guest"}, 70: {amqp_password, "guest"} 71: ]}). 72: -define(IF_EXCHANGE_EXISTS_RETRIES, 30). 73: -define(WAIT_FOR_EXCHANGE_INTERVAL, 100). % ms 74: 75: -type rabbit_binding() :: {Queue :: binary(), 76: Exchange :: binary(), 77: RoutingKey :: binary()}. 78: 79: %%-------------------------------------------------------------------- 80: %% Suite configuration 81: %%-------------------------------------------------------------------- 82: 83: all() -> 84: [ 85: {group, initialization_on_startup}, 86: {group, presence_status_publish}, 87: {group, chat_message_publish}, 88: {group, group_chat_message_publish} 89: ]. 90: 91: groups() -> 92: G = [ 93: {initialization_on_startup, [], 94: [ 95: rabbit_pool_starts_with_default_config, 96: exchanges_are_created_on_module_startup 97: ]}, 98: {presence_status_publish, [], 99: [ 100: connected_users_push_presence_events_when_change_status, 101: presence_messages_are_properly_formatted 102: ]}, 103: {chat_message_publish, [], 104: [ 105: chat_message_sent_event, 106: chat_message_sent_event_properly_formatted, 107: chat_message_received_event, 108: chat_message_received_event_properly_formatted 109: ]}, 110: {group_chat_message_publish, [], 111: [ 112: group_chat_message_sent_event, 113: group_chat_message_sent_event_properly_formatted, 114: group_chat_message_received_event, 115: group_chat_message_received_event_properly_formatted 116: ]} 117: ], 118: ct_helper:repeat_all_until_all_ok(G). 119: 120: suite() -> 121: escalus:suite(). 122: 123: %%-------------------------------------------------------------------- 124: %% Init & teardown 125: %%-------------------------------------------------------------------- 126: 127: init_per_suite(Config) -> 128: start_rabbit_wpool(domain()), 129: {ok, _} = application:ensure_all_started(amqp_client), 130: case is_rabbitmq_available() of 131: true -> 132: muc_helper:load_muc(), 133: escalus:init_per_suite(Config); 134: false -> 135: {skip, "RabbitMQ server is not available on default port."} 136: end. 137: 138: end_per_suite(Config) -> 139: stop_rabbit_wpool(domain()), 140: escalus_fresh:clean(), 141: muc_helper:unload_muc(), 142: escalus:end_per_suite(Config). 143: 144: init_per_group(initialization_on_startup, Config) -> 145: Config; 146: init_per_group(_, Config0) -> 147: Domain = domain(), 148: Config = dynamic_modules:save_modules(Domain, Config0), 149: dynamic_modules:ensure_modules(Domain, 150: [{mod_event_pusher, ?MOD_EVENT_PUSHER_CFG}]), 151: Config. 152: 153: end_per_group(initialization_on_startup, Config) -> 154: Config; 155: end_per_group(_, Config) -> 156: delete_exchanges(), 157: Domain = domain(), 158: dynamic_modules:stop(Domain, mod_event_pusher), 159: dynamic_modules:restore_modules(Config), 160: escalus:delete_users(Config, escalus:get_users([bob, alice])). 161: 162: init_per_testcase(rabbit_pool_starts_with_default_config, Config) -> 163: Config; 164: init_per_testcase(CaseName, Config0) -> 165: Config1 = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]), 166: Config2 = maybe_prepare_muc(CaseName, Config1), 167: Config = Config2 ++ connect_to_rabbit(), 168: escalus:init_per_testcase(CaseName, Config). 169: 170: end_per_testcase(rabbit_pool_starts_with_default_config, Config) -> 171: Config; 172: end_per_testcase(exchanges_are_created_on_module_startup, Config) -> 173: delete_exchanges(), 174: stop_mod_event_pusher_rabbit(), 175: close_rabbit_connection(Config), 176: Config; 177: end_per_testcase(CaseName, Config) -> 178: maybe_cleanup_muc(CaseName, Config), 179: close_rabbit_connection(Config), 180: escalus:end_per_testcase(CaseName, Config). 181: 182: 183: %%-------------------------------------------------------------------- 184: %% GROUP initialization_on_startup 185: %%-------------------------------------------------------------------- 186: 187: rabbit_pool_starts_with_default_config(_Config) -> 188: %% GIVEN 189: Domain = domain(), 190: DefaultWpoolConfig = {rabbit, host, rabbit_event_pusher_default, [], []}, 191: RabbitWpool = {rabbit, Domain, rabbit_event_pusher_default}, 192: %% WHEN 193: start_rabbit_wpool(Domain, DefaultWpoolConfig), 194: %% THEN 195: Pools = rpc(mim(), mongoose_wpool, get_pools, []), 196: ?assertMatch(RabbitWpool, 197: lists:keyfind(rabbit_event_pusher_default, 3, Pools)), 198: %% CLEANUP 199: stop_rabbit_wpool(RabbitWpool). 200: 201: exchanges_are_created_on_module_startup(Config) -> 202: %% GIVEN 203: Connection = proplists:get_value(rabbit_connection, Config), 204: ExCustomType = <<"headers">>, 205: Exchanges = [{?PRESENCE_EXCHANGE, ExCustomType}, 206: {?CHAT_MSG_EXCHANGE, ExCustomType}, 207: {?GROUP_CHAT_MSG_EXCHANGE, ExCustomType}], 208: ConfigWithCustomExchangeType = 209: extend_config_with_exchange_type(ExCustomType), 210: %% WHEN 211: start_mod_event_pusher_rabbit(ConfigWithCustomExchangeType), 212: %% THEN exchanges are created 213: [?assert(ensure_exchange_present(Connection, Exchange)) 214: || Exchange <- Exchanges]. 215: 216: %%-------------------------------------------------------------------- 217: %% GROUP presence_status_publish 218: %%-------------------------------------------------------------------- 219: 220: connected_users_push_presence_events_when_change_status(Config) -> 221: escalus:story( 222: Config, [{bob, 1}], 223: fun(Bob) -> 224: %% GIVEN 225: BobJID = client_lower_short_jid(Bob), 226: listen_to_presence_events_from_rabbit([BobJID], Config), 227: %% WHEN users generate some traffic. 228: send_presence_stanzas([Bob], 1), 229: %% THEN wait for presence events from Rabbit. 230: ?assertReceivedMatch({#'basic.deliver'{routing_key = BobJID}, 231: #amqp_msg{}}, timer:seconds(5)) 232: end). 233: 234: presence_messages_are_properly_formatted(Config) -> 235: escalus:story( 236: Config, [{bob, 1}], 237: fun(Bob) -> 238: %% GIVEN 239: BobJID = client_lower_short_jid(Bob), 240: BobFullJID = client_lower_full_jid(Bob), 241: listen_to_presence_events_from_rabbit([BobJID], Config), 242: %% WHEN user logout 243: escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)), 244: %% THEN receive message 245: ?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false}, 246: get_decoded_message_from_rabbit(BobJID)) 247: end). 248: 249: %%-------------------------------------------------------------------- 250: %% GROUP chat_message_publish 251: %%-------------------------------------------------------------------- 252: 253: chat_message_sent_event(Config) -> 254: escalus:story( 255: Config, [{bob, 1}, {alice, 1}], 256: fun(Bob, Alice) -> 257: %% GIVEN 258: BobJID = client_lower_short_jid(Bob), 259: BobChatMsgSentRK = chat_msg_sent_rk(BobJID), 260: listen_to_chat_msg_sent_events_from_rabbit([BobJID], Config), 261: %% WHEN users chat 262: escalus:send(Bob, 263: escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)), 264: %% THEN wait for chat message sent events events from Rabbit. 265: ?assertReceivedMatch({#'basic.deliver'{ 266: routing_key = BobChatMsgSentRK}, 267: #amqp_msg{}}, timer:seconds(5)) 268: end). 269: 270: chat_message_received_event(Config) -> 271: escalus:story( 272: Config, [{bob, 1}, {alice, 1}], 273: fun(Bob, Alice) -> 274: %% GIVEN 275: AliceJID = client_lower_short_jid(Alice), 276: AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID), 277: listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config), 278: %% WHEN users chat 279: escalus:send(Bob, 280: escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)), 281: escalus:wait_for_stanzas(Alice, 2), 282: %% THEN wait for chat message received events events from 283: %% Rabbit. 284: ?assertReceivedMatch({#'basic.deliver'{ 285: routing_key = AliceChatMsgRecvRK}, 286: #amqp_msg{}}, timer:seconds(5)) 287: end). 288: 289: chat_message_sent_event_properly_formatted(Config) -> 290: escalus:story( 291: Config, [{bob, 1}, {alice, 1}], 292: fun(Bob, Alice) -> 293: %% GIVEN 294: AliceJID = client_lower_short_jid(Alice), 295: AliceFullJID = client_lower_full_jid(Alice), 296: BobFullJID = client_lower_full_jid(Bob), 297: AliceChatMsgSentRK = chat_msg_sent_rk(AliceJID), 298: Message = <<"Hi Bob!">>, 299: listen_to_chat_msg_sent_events_from_rabbit([AliceJID], Config), 300: %% WHEN users chat 301: escalus:send(Alice, escalus_stanza:chat_to(Bob, Message)), 302: %% THEN 303: ?assertMatch(#{<<"from_user_id">> := AliceFullJID, 304: <<"to_user_id">> := BobFullJID, 305: <<"message">> := Message}, 306: get_decoded_message_from_rabbit(AliceChatMsgSentRK)) 307: end). 308: 309: chat_message_received_event_properly_formatted(Config) -> 310: escalus:story( 311: Config, [{bob, 1}, {alice, 1}], 312: fun(Bob, Alice) -> 313: %% GIVEN 314: AliceJID = client_lower_short_jid(Alice), 315: AliceFullJID = client_lower_full_jid(Alice), 316: BobFullJID = client_lower_full_jid(Bob), 317: AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID), 318: Message = <<"Hi Alice!">>, 319: listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config), 320: %% WHEN users chat 321: escalus:send(Bob, escalus_stanza:chat_to(Alice, Message)), 322: escalus:wait_for_stanzas(Alice, 2), 323: %% THEN 324: ?assertMatch(#{<<"from_user_id">> := BobFullJID, 325: <<"to_user_id">> := AliceFullJID, 326: <<"message">> := Message}, 327: get_decoded_message_from_rabbit(AliceChatMsgRecvRK)) 328: end). 329: 330: %%-------------------------------------------------------------------- 331: %% GROUP group_message_publish 332: %%-------------------------------------------------------------------- 333: 334: group_chat_message_sent_event(Config) -> 335: escalus:story( 336: Config, [{bob, 1}], 337: fun(Bob) -> 338: %% GIVEN 339: Room = ?config(room, Config), 340: RoomAddr = muc_helper:room_address(Room), 341: BobJID = client_lower_short_jid(Bob), 342: BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID), 343: listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config), 344: %% WHEN users chat 345: escalus:send(Bob, 346: muc_helper:stanza_muc_enter_room(Room, nick(Bob))), 347: 348: escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, 349: <<"Hi there!">>)), 350: 351: %% THEN wait for chat message sent events events from Rabbit. 352: ?assertReceivedMatch({#'basic.deliver'{ 353: routing_key = BobGroupChatMsgSentRK}, 354: #amqp_msg{}}, timer:seconds(5)) 355: end). 356: 357: group_chat_message_received_event(Config) -> 358: escalus:story( 359: Config, [{bob, 1}, {alice, 1}], 360: fun(Bob, Alice) -> 361: %% GIVEN 362: Room = ?config(room, Config), 363: RoomAddr = muc_helper:room_address(Room), 364: AliceJID = client_lower_short_jid(Alice), 365: AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID), 366: listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID], 367: Config), 368: %% WHEN users chat 369: escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, 370: nick(Alice))), 371: escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, 372: nick(Bob))), 373: 374: escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, 375: <<"Hi there!">>)), 376: 377: %% THEN wait for chat message received events events from Rabbit. 378: ?assertReceivedMatch({#'basic.deliver'{ 379: routing_key = AliceGroupChatMsgRecvRK}, 380: #amqp_msg{}}, timer:seconds(5)) 381: end). 382: 383: group_chat_message_sent_event_properly_formatted(Config) -> 384: escalus:story( 385: Config, [{bob, 1}], 386: fun(Bob) -> 387: %% GIVEN 388: Room = ?config(room, Config), 389: RoomAddr = muc_helper:room_address(Room), 390: BobJID = client_lower_short_jid(Bob), 391: BobFullJID = client_lower_full_jid(Bob), 392: BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID), 393: Message = <<"Hi there!">>, 394: listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config), 395: %% WHEN a user chat 396: escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, 397: nick(Bob))), 398: 399: escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)), 400: %% THEN 401: ?assertMatch(#{<<"from_user_id">> := BobFullJID, 402: <<"to_user_id">> := RoomAddr, 403: <<"message">> := Message}, 404: get_decoded_message_from_rabbit(BobGroupChatMsgSentRK)) 405: end). 406: 407: group_chat_message_received_event_properly_formatted(Config) -> 408: escalus:story( 409: Config, [{bob, 1}, {alice, 1}], 410: fun(Bob, Alice) -> 411: %% GIVEN basic variables 412: Room = ?config(room, Config), 413: RoomAddr = muc_helper:room_address(Room), 414: BobRoomJID = user_room_jid(RoomAddr, Bob), 415: AliceJID = client_lower_short_jid(Alice), 416: AliceFullJID = client_lower_full_jid(Alice), 417: AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID), 418: Message = <<"Hi there!">>, 419: %% GIVEN users in room 420: escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, nick(Alice))), 421: escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, nick(Bob))), 422: % wait for all room stanzas to be processed 423: escalus:wait_for_stanzas(Alice, 3), 424: escalus:wait_for_stanzas(Bob, 3), 425: %% GIVEN Room subscription to Rabbit 426: % We subscribe to RMQ now and not earlier to avoid messages other 427: % than the one we are testing, `Message` from Bob to Room, like 428: % for example affiliations and the like. 429: listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID], Config), 430: %% WHEN users chat 431: escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)), 432: %% THEN 433: ?assertMatch(#{<<"from_user_id">> := BobRoomJID, 434: <<"to_user_id">> := AliceFullJID, 435: <<"message">> := Message}, 436: get_decoded_message_from_rabbit(AliceGroupChatMsgRecvRK)) 437: end). 438: 439: %%-------------------------------------------------------------------- 440: %% Test helpers 441: %%-------------------------------------------------------------------- 442: 443: -spec connect_to_rabbit() -> proplists:proplist(). 444: connect_to_rabbit() -> 445: {ok, Connection} = 446: amqp_connection:start(#amqp_params_network{}), 447: {ok, Channel} = amqp_connection:open_channel(Connection), 448: [{rabbit_connection, Connection}, {rabbit_channel, Channel}]. 449: 450: -spec close_rabbit_connection(Config :: proplists:proplist()) -> ok | term(). 451: close_rabbit_connection(Config) -> 452: Connection = proplists:get_value(rabbit_connection, Config), 453: Channel = proplists:get_value(rabbit_channel, Config), 454: amqp_channel:close(Channel), 455: amqp_connection:close(Connection). 456: 457: -spec listen_to_presence_events_from_rabbit(JIDs :: [binary()], 458: Config :: proplists:proplist()) -> 459: ok | term(). 460: listen_to_presence_events_from_rabbit(JIDs, Config) -> 461: QueueBindings = presence_bindings(?QUEUE_NAME, JIDs), 462: listen_to_events_from_rabbit(QueueBindings, Config). 463: 464: -spec listen_to_chat_msg_sent_events_from_rabbit(JIDs :: [binary()], 465: Config :: proplists:proplist()) -> 466: ok | term(). 467: listen_to_chat_msg_sent_events_from_rabbit(JIDs, Config) -> 468: QueueBindings = chat_msg_sent_bindings(?QUEUE_NAME, JIDs), 469: listen_to_events_from_rabbit(QueueBindings, Config). 470: 471: -spec listen_to_chat_msg_recv_events_from_rabbit(JIDs :: [binary()], 472: Config :: proplists:proplist()) -> 473: ok | term(). 474: listen_to_chat_msg_recv_events_from_rabbit(JIDs, Config) -> 475: QueueBindings = chat_msg_recv_bindings(?QUEUE_NAME, JIDs), 476: listen_to_events_from_rabbit(QueueBindings, Config). 477: 478: -spec listen_to_group_chat_msg_sent_events_from_rabbit(JIDs :: [binary()], 479: Config :: proplists:proplist()) -> 480: ok | term(). 481: listen_to_group_chat_msg_sent_events_from_rabbit(JIDs, Config) -> 482: QueueBindings = group_chat_msg_sent_bindings(?QUEUE_NAME, JIDs), 483: listen_to_events_from_rabbit(QueueBindings, Config). 484: 485: -spec listen_to_group_chat_msg_recv_events_from_rabbit(JIDs :: [binary()], 486: Config :: proplists:proplist()) -> 487: ok | term(). 488: listen_to_group_chat_msg_recv_events_from_rabbit(JIDs, Config) -> 489: QueueBindings = group_chat_msg_recv_bindings(?QUEUE_NAME, JIDs), 490: listen_to_events_from_rabbit(QueueBindings, Config). 491: 492: -spec listen_to_events_from_rabbit(QueueBindings :: [rabbit_binding()], 493: Config :: proplists:proplist()) -> 494: ok | term(). 495: listen_to_events_from_rabbit(QueueBindings, Config) -> 496: Connection = proplists:get_value(rabbit_connection, Config), 497: Channel = proplists:get_value(rabbit_channel, Config), 498: declare_temporary_rabbit_queue(Channel, ?QUEUE_NAME), 499: wait_for_exchanges_to_be_created(Connection, 500: [{?PRESENCE_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}, 501: {?CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}, 502: {?GROUP_CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}]), 503: bind_queues_to_exchanges(Channel, QueueBindings), 504: subscribe_to_rabbit_queue(Channel, ?QUEUE_NAME). 505: 506: -spec wait_for_exchanges_to_be_created(Connection :: pid(), 507: Exchanges :: [binary()]) -> pid(). 508: wait_for_exchanges_to_be_created(Connection, Exchanges) -> 509: [ensure_exchange_present(Connection, Exchange) || Exchange <- Exchanges], 510: ok. 511: 512: -spec declare_temporary_rabbit_queue(Channel :: pid(), Queue :: binary()) -> binary(). 513: declare_temporary_rabbit_queue(Channel, Queue) -> 514: #'queue.declare_ok'{} = 515: amqp_channel:call(Channel, #'queue.declare'{queue = Queue, 516: exclusive = true}). 517: 518: -spec presence_bindings(Queue :: binary(), JIDs :: [binary()]) -> 519: [rabbit_binding()]. 520: presence_bindings(Queue, JIDs) -> 521: [{Queue, ?PRESENCE_EXCHANGE, JID} || JID <- JIDs]. 522: 523: -spec chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) -> 524: [rabbit_binding()]. 525: chat_msg_sent_bindings(Queue, JIDs) -> 526: [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_sent_rk(JID)} || JID <- JIDs]. 527: 528: -spec chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) -> 529: [rabbit_binding()]. 530: chat_msg_recv_bindings(Queue, JIDs) -> 531: [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_recv_rk(JID)} || JID <- JIDs]. 532: 533: -spec group_chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) -> 534: [rabbit_binding()]. 535: group_chat_msg_sent_bindings(Queue, JIDs) -> 536: [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_sent_rk(JID)} 537: || JID <- JIDs]. 538: 539: -spec group_chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) -> 540: [rabbit_binding()]. 541: group_chat_msg_recv_bindings(Queue, JIDs) -> 542: [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_recv_rk(JID)} 543: || JID <- JIDs]. 544: 545: -spec bind_queues_to_exchanges(Channel :: pid(), 546: Bindings :: [rabbit_binding()]) -> 547: [amqp_client:amqp_method() | ok | blocked | closing]. 548: bind_queues_to_exchanges(Channel, Bindings) -> 549: [bind_queue_to_exchange(Channel, Binding) || Binding <- Bindings]. 550: 551: -spec bind_queue_to_exchange(Channel :: pid(), rabbit_binding()) -> 552: amqp_client:amqp_method() | ok | blocked | closing. 553: bind_queue_to_exchange(Channel, {Queue, Exchange, RoutingKey}) -> 554: #'queue.bind_ok'{} = 555: amqp_channel:call(Channel, #'queue.bind'{exchange = Exchange, 556: routing_key = RoutingKey, 557: queue = Queue}). 558: 559: -spec ensure_exchange_present(Connection :: pid(), Exchange :: binary()) -> 560: {ok, true} | {timeout, any()}. 561: ensure_exchange_present(Connection, Exchange) -> 562: Opts = #{time_left => ?WAIT_FOR_EXCHANGE_INTERVAL * ?IF_EXCHANGE_EXISTS_RETRIES / 1000, 563: sleep_time => ?WAIT_FOR_EXCHANGE_INTERVAL}, 564: case mongoose_helper:wait_until(fun() -> 565: is_exchange_present(Connection, 566: Exchange) 567: end, true, Opts) of 568: {ok, true} -> true; 569: {timeout, _} -> 570: throw(io_lib:format("Exchange has not been created, exchange=~p", 571: [Exchange])) 572: end. 573: 574: -spec is_exchange_present(Connection :: pid(), Exchange :: binary()) -> boolean(). 575: is_exchange_present(Connection, {ExName, ExType}) -> 576: {ok, Channel} = amqp_connection:open_channel(Connection), 577: try amqp_channel:call(Channel, #'exchange.declare'{exchange = ExName, 578: type = ExType, 579: %% this option allows to 580: %% check if an exchange exists 581: passive = true}) of 582: {'exchange.declare_ok'} -> true 583: catch 584: _Error:_Reason -> false 585: end. 586: 587: -spec subscribe_to_rabbit_queue(Channel :: pid(), Queue :: binary()) -> ok. 588: subscribe_to_rabbit_queue(Channel, Queue) -> 589: amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue, 590: no_ack = true}, self()), 591: receive 592: #'basic.consume_ok'{} -> ok 593: after 594: 5000 -> 595: throw(io_lib:format("Timeout when subscribing to queue," 596: "channel=~p, queue=~p", [Channel, Queue])) 597: end. 598: 599: -spec send_presence_stanzas(Users :: [binary()], NumOfMsgs :: non_neg_integer()) 600: -> [[ok]] | term(). 601: send_presence_stanzas(Users, NumOfMsgs) -> 602: [send_presence_stanza(User, NumOfMsgs) || User <- Users]. 603: 604: -spec send_presence_stanza(Users :: [binary()], NumOfMsgs :: non_neg_integer()) 605: -> [ok] | term(). 606: send_presence_stanza(User, NumOfMsgs) -> 607: [escalus:send(User, escalus_stanza:presence(make_pres_type(X))) 608: || X <- lists:seq(1, NumOfMsgs)]. 609: 610: -spec get_decoded_message_from_rabbit(RoutingKey :: binary()) -> 611: map() | no_return(). 612: get_decoded_message_from_rabbit(RoutingKey) -> 613: receive 614: {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} -> 615: jiffy:decode(Msg, [return_maps]) 616: after 617: 5000 -> ct:fail(io_lib:format("Timeout when decoding message, rk=~p", 618: [RoutingKey])) 619: end. 620: 621: %%-------------------------------------------------------------------- 622: %% Utils 623: %%-------------------------------------------------------------------- 624: 625: start_mod_event_pusher_rabbit(Config) -> 626: dynamic_modules:start(domain(), mod_event_pusher_rabbit, Config). 627: 628: stop_mod_event_pusher_rabbit() -> 629: dynamic_modules:stop(domain(), mod_event_pusher_rabbit). 630: 631: start_rabbit_wpool(Host) -> 632: start_rabbit_wpool(Host, ?WPOOL_CFG). 633: 634: start_rabbit_wpool(Host, WpoolConfig) -> 635: rpc(mim(), mongoose_wpool, ensure_started, []), 636: rpc(mim(), mongoose_wpool, start_configured_pools, [[WpoolConfig], [Host]]). 637: 638: stop_rabbit_wpool({Pool, Host, Tag}) -> 639: rpc(mim(), mongoose_wpool, stop, [Pool, Host, Tag]); 640: stop_rabbit_wpool(Host)-> 641: stop_rabbit_wpool({rabbit, Host, event_pusher}). 642: 643: delete_exchanges() -> 644: ConnConf = connect_to_rabbit(), 645: Channel = proplists:get_value(rabbit_channel, ConnConf), 646: [amqp_channel:call(Channel, #'exchange.delete'{exchange = Ex}) 647: || Ex <- [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE]], 648: close_rabbit_connection(ConnConf). 649: 650: make_pres_type(X) when X rem 2 == 0 -> 651: <<"available">>; 652: make_pres_type(_) -> 653: <<"unavailable">>. 654: 655: chat_msg_sent_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_SENT_TOPIC). 656: 657: chat_msg_recv_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_RECV_TOPIC). 658: 659: group_chat_msg_sent_rk(JID) -> 660: user_topic_routing_key(JID, ?GROUP_CHAT_MSG_SENT_TOPIC). 661: 662: group_chat_msg_recv_rk(JID) -> 663: user_topic_routing_key(JID, ?GROUP_CHAT_MSG_RECV_TOPIC). 664: 665: user_topic_routing_key(JID, Topic) -> <<JID/binary, ".", Topic/binary>>. 666: 667: client_lower_short_jid(Client) -> 668: escalus_utils:jid_to_lower(escalus_client:short_jid(Client)). 669: 670: client_lower_full_jid(Client) -> 671: escalus_utils:jid_to_lower(escalus_client:full_jid(Client)). 672: 673: nick(User) -> escalus_utils:get_username(User). 674: 675: maybe_prepare_muc(TestCase, Config) when 676: TestCase == group_chat_message_sent_event orelse 677: TestCase == group_chat_message_received_event orelse 678: TestCase == group_chat_message_sent_event_properly_formatted orelse 679: TestCase == group_chat_message_received_event_properly_formatted -> 680: prepare_muc(Config); 681: maybe_prepare_muc(_, Config) -> Config. 682: 683: maybe_cleanup_muc(TestCase, Config) when 684: TestCase == group_chat_message_sent_event orelse 685: TestCase == group_chat_message_received_event orelse 686: TestCase == group_chat_message_sent_event_properly_formatted orelse 687: TestCase == group_chat_message_received_event_properly_formatted -> 688: cleanup_muc(Config); 689: maybe_cleanup_muc(_, _) -> ok. 690: 691: prepare_muc(Config) -> 692: [User | _] = ?config(escalus_users, Config), 693: muc_helper:start_room(Config, User, <<"muc_publish">>, <<"user_nick">>, 694: [{persistent, true}, 695: {anonymous, false}]). 696: 697: cleanup_muc(Config) -> 698: muc_helper:destroy_room(Config). 699: 700: user_room_jid(RoomJID, UserJID) -> 701: Nick = nick(UserJID), 702: <<RoomJID/binary, "/", Nick/binary>>. 703: 704: extend_config_with_exchange_type(ExType) -> 705: lists:map(fun({Ex, Opts}) when 706: Ex == presence_exchange orelse 707: Ex == chat_msg_exchange orelse 708: Ex == groupchat_msg_exchange -> 709: {Ex, Opts ++ [{type, ExType}]}; 710: (Other) -> Other 711: end, ?MOD_EVENT_PUSHER_RABBIT_CFG). 712: 713: is_rabbitmq_available() -> 714: try amqp_connection:start(#amqp_params_network{}) of 715: {ok, Conn} -> 716: amqp_connection:close(Conn), 717: true; 718: {error, econnrefused} -> 719: false 720: catch 721: _Err -> 722: false 723: end.