1: %%%------------------------------------------------------------------- 2: %%% @author Kacper Mentel <kacper.mentel@erlang-solutions.com> 3: %%% @copyright (C) 2018, Kacper Mentel 4: %%% @doc 5: %%% Tests for `mod_event_pusher` RabbitMQ backend (`mod_event_pusher_rabbit`). 6: %%% @end 7: %%%------------------------------------------------------------------- 8: -module(mod_event_pusher_rabbit_SUITE). 9: 10: -include_lib("escalus/include/escalus.hrl"). 11: -include_lib("common_test/include/ct.hrl"). 12: -include_lib("eunit/include/eunit.hrl"). 13: -include_lib("escalus/include/escalus_xmlns.hrl"). 14: -include_lib("exml/include/exml.hrl"). 15: -include_lib("amqp_client/include/amqp_client.hrl"). 16: 17: -include("assert_received_match.hrl"). 18: 19: -import(distributed_helper, [mim/0, 20: rpc/4]). 21: 22: -import(domain_helper, [domain/0]). 23: 24: -export([suite/0, all/0, groups/0]). 25: -export([init_per_suite/1, end_per_suite/1, 26: init_per_group/2, end_per_group/2, 27: init_per_testcase/2, end_per_testcase/2]). 28: 29: -export([rabbit_pool_starts_with_default_config/1, 30: exchanges_are_created_on_module_startup/1]). 31: -export([connected_users_push_presence_events_when_change_status/1, 32: presence_messages_are_properly_formatted/1]). 33: -export([chat_message_sent_event/1, 34: chat_message_sent_event_properly_formatted/1, 35: chat_message_received_event/1, 36: chat_message_received_event_properly_formatted/1]). 37: -export([group_chat_message_sent_event/1, 38: group_chat_message_sent_event_properly_formatted/1, 39: group_chat_message_received_event/1, 40: group_chat_message_received_event_properly_formatted/1]). 41: 42: -define(QUEUE_NAME, <<"test_queue">>). 43: -define(DEFAULT_EXCHANGE_TYPE, <<"topic">>). 44: -define(PRESENCE_EXCHANGE, <<"custom_presence_exchange">>). 45: -define(CHAT_MSG_EXCHANGE, <<"custom_chat_msg_exchange">>). 46: -define(GROUP_CHAT_MSG_EXCHANGE, <<"custom_group_chat_msg_exchange">>). 47: -define(CHAT_MSG_SENT_TOPIC, <<"custom_chat_msg_sent_topic">>). 48: -define(CHAT_MSG_RECV_TOPIC, <<"custom_chat_msg_recv_topic">>). 49: -define(GROUP_CHAT_MSG_SENT_TOPIC, <<"custom_group_chat_msg_sent_topic">>). 50: -define(GROUP_CHAT_MSG_RECV_TOPIC, <<"custom_group_chat_msg_recv_topic">>). 51: -define(MOD_EVENT_PUSHER_RABBIT_CFG, 52: [{presence_exchange, [{name, ?PRESENCE_EXCHANGE}]}, 53: {chat_msg_exchange, [{name, ?CHAT_MSG_EXCHANGE}, 54: {sent_topic, ?CHAT_MSG_SENT_TOPIC}, 55: {recv_topic, ?CHAT_MSG_RECV_TOPIC}]}, 56: {groupchat_msg_exchange, [{name, ?GROUP_CHAT_MSG_EXCHANGE}, 57: {sent_topic, ?GROUP_CHAT_MSG_SENT_TOPIC}, 58: {recv_topic, ?GROUP_CHAT_MSG_RECV_TOPIC}]} 59: ]). 60: -define(MOD_EVENT_PUSHER_CFG, [{backends, 61: [{rabbit, ?MOD_EVENT_PUSHER_RABBIT_CFG}]}]). 62: -define(WPOOL_CFG, #{type => rabbit, scope => host, tag => event_pusher, 63: opts => #{workers => 20, strategy => best_worker, call_timeout => 5000}, 64: conn_opts => #{confirms_enabled => false, 65: amqp_host => "localhost", 66: amqp_port => 5672, 67: amqp_username => "guest", 68: amqp_password => "guest", 69: max_worker_queue_len => 1000} 70: }). 71: -define(IF_EXCHANGE_EXISTS_RETRIES, 30). 72: -define(WAIT_FOR_EXCHANGE_INTERVAL, 100). % ms 73: 74: -type rabbit_binding() :: {Queue :: binary(), 75: Exchange :: binary(), 76: RoutingKey :: binary()}. 77: 78: %%-------------------------------------------------------------------- 79: %% Suite configuration 80: %%-------------------------------------------------------------------- 81: 82: all() -> 83: [ 84: {group, initialization_on_startup}, 85: {group, presence_status_publish}, 86: {group, chat_message_publish}, 87: {group, group_chat_message_publish} 88: ]. 89: 90: groups() -> 91: G = [ 92: {initialization_on_startup, [], 93: [ 94: rabbit_pool_starts_with_default_config, 95: exchanges_are_created_on_module_startup 96: ]}, 97: {presence_status_publish, [], 98: [ 99: connected_users_push_presence_events_when_change_status, 100: presence_messages_are_properly_formatted 101: ]}, 102: {chat_message_publish, [], 103: [ 104: chat_message_sent_event, 105: chat_message_sent_event_properly_formatted, 106: chat_message_received_event, 107: chat_message_received_event_properly_formatted 108: ]}, 109: {group_chat_message_publish, [], 110: [ 111: group_chat_message_sent_event, 112: group_chat_message_sent_event_properly_formatted, 113: group_chat_message_received_event, 114: group_chat_message_received_event_properly_formatted 115: ]} 116: ], 117: ct_helper:repeat_all_until_all_ok(G). 118: 119: suite() -> 120: escalus:suite(). 121: 122: %%-------------------------------------------------------------------- 123: %% Init & teardown 124: %%-------------------------------------------------------------------- 125: 126: init_per_suite(Config) -> 127: start_rabbit_wpool(domain()), 128: {ok, _} = application:ensure_all_started(amqp_client), 129: case is_rabbitmq_available() of 130: true -> 131: muc_helper:load_muc(), 132: escalus:init_per_suite(Config); 133: false -> 134: {skip, "RabbitMQ server is not available on default port."} 135: end. 136: 137: end_per_suite(Config) -> 138: stop_rabbit_wpool(domain()), 139: escalus_fresh:clean(), 140: muc_helper:unload_muc(), 141: escalus:end_per_suite(Config). 142: 143: init_per_group(initialization_on_startup, Config) -> 144: Config; 145: init_per_group(_, Config0) -> 146: Domain = domain(), 147: Config = dynamic_modules:save_modules(Domain, Config0), 148: dynamic_modules:ensure_modules(Domain, 149: [{mod_event_pusher, ?MOD_EVENT_PUSHER_CFG}]), 150: Config. 151: 152: end_per_group(initialization_on_startup, Config) -> 153: Config; 154: end_per_group(_, Config) -> 155: delete_exchanges(), 156: Domain = domain(), 157: dynamic_modules:stop(Domain, mod_event_pusher), 158: dynamic_modules:restore_modules(Config), 159: escalus:delete_users(Config, escalus:get_users([bob, alice])). 160: 161: init_per_testcase(rabbit_pool_starts_with_default_config, Config) -> 162: Config; 163: init_per_testcase(CaseName, Config0) -> 164: Config1 = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]), 165: Config2 = maybe_prepare_muc(CaseName, Config1), 166: Config = Config2 ++ connect_to_rabbit(), 167: escalus:init_per_testcase(CaseName, Config). 168: 169: end_per_testcase(rabbit_pool_starts_with_default_config, Config) -> 170: Config; 171: end_per_testcase(exchanges_are_created_on_module_startup, Config) -> 172: delete_exchanges(), 173: stop_mod_event_pusher_rabbit(), 174: close_rabbit_connection(Config), 175: Config; 176: end_per_testcase(CaseName, Config) -> 177: maybe_cleanup_muc(CaseName, Config), 178: close_rabbit_connection(Config), 179: escalus:end_per_testcase(CaseName, Config). 180: 181: 182: %%-------------------------------------------------------------------- 183: %% GROUP initialization_on_startup 184: %%-------------------------------------------------------------------- 185: 186: rabbit_pool_starts_with_default_config(_Config) -> 187: %% GIVEN 188: Domain = domain(), 189: DefaultWpoolConfig = #{type => rabbit, scope => host, tag => rabbit_event_pusher_default, 190: opts => #{workers => 10, strategy => best_worker, call_timeout => 5000}, 191: conn_opts => #{amqp_port => 5672, confirms_enabled => false, max_worker_queue_len => 1000}}, 192: RabbitWpool = {rabbit, Domain, rabbit_event_pusher_default}, 193: %% WHEN 194: start_rabbit_wpool(Domain, DefaultWpoolConfig), 195: %% THEN 196: Pools = rpc(mim(), mongoose_wpool, get_pools, []), 197: ?assertMatch(RabbitWpool, 198: lists:keyfind(rabbit_event_pusher_default, 3, Pools)), 199: %% CLEANUP 200: stop_rabbit_wpool(RabbitWpool). 201: 202: exchanges_are_created_on_module_startup(Config) -> 203: %% GIVEN 204: Connection = proplists:get_value(rabbit_connection, Config), 205: ExCustomType = <<"headers">>, 206: Exchanges = [{?PRESENCE_EXCHANGE, ExCustomType}, 207: {?CHAT_MSG_EXCHANGE, ExCustomType}, 208: {?GROUP_CHAT_MSG_EXCHANGE, ExCustomType}], 209: ConfigWithCustomExchangeType = 210: extend_config_with_exchange_type(ExCustomType), 211: %% WHEN 212: start_mod_event_pusher_rabbit(ConfigWithCustomExchangeType), 213: %% THEN exchanges are created 214: [?assert(ensure_exchange_present(Connection, Exchange)) 215: || Exchange <- Exchanges]. 216: 217: %%-------------------------------------------------------------------- 218: %% GROUP presence_status_publish 219: %%-------------------------------------------------------------------- 220: 221: connected_users_push_presence_events_when_change_status(Config) -> 222: escalus:story( 223: Config, [{bob, 1}], 224: fun(Bob) -> 225: %% GIVEN 226: BobJID = client_lower_short_jid(Bob), 227: listen_to_presence_events_from_rabbit([BobJID], Config), 228: %% WHEN users generate some traffic. 229: send_presence_stanzas([Bob], 1), 230: %% THEN wait for presence events from Rabbit. 231: ?assertReceivedMatch({#'basic.deliver'{routing_key = BobJID}, 232: #amqp_msg{}}, timer:seconds(5)) 233: end). 234: 235: presence_messages_are_properly_formatted(Config) -> 236: escalus:story( 237: Config, [{bob, 1}], 238: fun(Bob) -> 239: %% GIVEN 240: BobJID = client_lower_short_jid(Bob), 241: BobFullJID = client_lower_full_jid(Bob), 242: listen_to_presence_events_from_rabbit([BobJID], Config), 243: %% WHEN user logout 244: escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)), 245: %% THEN receive message 246: ?assertMatch(#{<<"user_id">> := BobFullJID, <<"present">> := false}, 247: get_decoded_message_from_rabbit(BobJID)) 248: end). 249: 250: %%-------------------------------------------------------------------- 251: %% GROUP chat_message_publish 252: %%-------------------------------------------------------------------- 253: 254: chat_message_sent_event(Config) -> 255: escalus:story( 256: Config, [{bob, 1}, {alice, 1}], 257: fun(Bob, Alice) -> 258: %% GIVEN 259: BobJID = client_lower_short_jid(Bob), 260: BobChatMsgSentRK = chat_msg_sent_rk(BobJID), 261: listen_to_chat_msg_sent_events_from_rabbit([BobJID], Config), 262: %% WHEN users chat 263: escalus:send(Bob, 264: escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)), 265: %% THEN wait for chat message sent events events from Rabbit. 266: ?assertReceivedMatch({#'basic.deliver'{ 267: routing_key = BobChatMsgSentRK}, 268: #amqp_msg{}}, timer:seconds(5)) 269: end). 270: 271: chat_message_received_event(Config) -> 272: escalus:story( 273: Config, [{bob, 1}, {alice, 1}], 274: fun(Bob, Alice) -> 275: %% GIVEN 276: AliceJID = client_lower_short_jid(Alice), 277: AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID), 278: listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config), 279: %% WHEN users chat 280: escalus:send(Bob, 281: escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>)), 282: escalus:wait_for_stanzas(Alice, 2), 283: %% THEN wait for chat message received events events from 284: %% Rabbit. 285: ?assertReceivedMatch({#'basic.deliver'{ 286: routing_key = AliceChatMsgRecvRK}, 287: #amqp_msg{}}, timer:seconds(5)) 288: end). 289: 290: chat_message_sent_event_properly_formatted(Config) -> 291: escalus:story( 292: Config, [{bob, 1}, {alice, 1}], 293: fun(Bob, Alice) -> 294: %% GIVEN 295: AliceJID = client_lower_short_jid(Alice), 296: AliceFullJID = client_lower_full_jid(Alice), 297: BobFullJID = client_lower_full_jid(Bob), 298: AliceChatMsgSentRK = chat_msg_sent_rk(AliceJID), 299: Message = <<"Hi Bob!">>, 300: listen_to_chat_msg_sent_events_from_rabbit([AliceJID], Config), 301: %% WHEN users chat 302: escalus:send(Alice, escalus_stanza:chat_to(Bob, Message)), 303: %% THEN 304: ?assertMatch(#{<<"from_user_id">> := AliceFullJID, 305: <<"to_user_id">> := BobFullJID, 306: <<"message">> := Message}, 307: get_decoded_message_from_rabbit(AliceChatMsgSentRK)) 308: end). 309: 310: chat_message_received_event_properly_formatted(Config) -> 311: escalus:story( 312: Config, [{bob, 1}, {alice, 1}], 313: fun(Bob, Alice) -> 314: %% GIVEN 315: AliceJID = client_lower_short_jid(Alice), 316: AliceFullJID = client_lower_full_jid(Alice), 317: BobFullJID = client_lower_full_jid(Bob), 318: AliceChatMsgRecvRK = chat_msg_recv_rk(AliceJID), 319: Message = <<"Hi Alice!">>, 320: listen_to_chat_msg_recv_events_from_rabbit([AliceJID], Config), 321: %% WHEN users chat 322: escalus:send(Bob, escalus_stanza:chat_to(Alice, Message)), 323: escalus:wait_for_stanzas(Alice, 2), 324: %% THEN 325: ?assertMatch(#{<<"from_user_id">> := BobFullJID, 326: <<"to_user_id">> := AliceFullJID, 327: <<"message">> := Message}, 328: get_decoded_message_from_rabbit(AliceChatMsgRecvRK)) 329: end). 330: 331: %%-------------------------------------------------------------------- 332: %% GROUP group_message_publish 333: %%-------------------------------------------------------------------- 334: 335: group_chat_message_sent_event(Config) -> 336: escalus:story( 337: Config, [{bob, 1}], 338: fun(Bob) -> 339: %% GIVEN 340: Room = ?config(room, Config), 341: RoomAddr = muc_helper:room_address(Room), 342: BobJID = client_lower_short_jid(Bob), 343: BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID), 344: listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config), 345: %% WHEN users chat 346: escalus:send(Bob, 347: muc_helper:stanza_muc_enter_room(Room, nick(Bob))), 348: 349: escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, 350: <<"Hi there!">>)), 351: 352: %% THEN wait for chat message sent events events from Rabbit. 353: ?assertReceivedMatch({#'basic.deliver'{ 354: routing_key = BobGroupChatMsgSentRK}, 355: #amqp_msg{}}, timer:seconds(5)) 356: end). 357: 358: group_chat_message_received_event(Config) -> 359: escalus:story( 360: Config, [{bob, 1}, {alice, 1}], 361: fun(Bob, Alice) -> 362: %% GIVEN 363: Room = ?config(room, Config), 364: RoomAddr = muc_helper:room_address(Room), 365: AliceJID = client_lower_short_jid(Alice), 366: AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID), 367: listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID], 368: Config), 369: %% WHEN users chat 370: escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, 371: nick(Alice))), 372: escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, 373: nick(Bob))), 374: 375: escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, 376: <<"Hi there!">>)), 377: 378: %% THEN wait for chat message received events events from Rabbit. 379: ?assertReceivedMatch({#'basic.deliver'{ 380: routing_key = AliceGroupChatMsgRecvRK}, 381: #amqp_msg{}}, timer:seconds(5)) 382: end). 383: 384: group_chat_message_sent_event_properly_formatted(Config) -> 385: escalus:story( 386: Config, [{bob, 1}], 387: fun(Bob) -> 388: %% GIVEN 389: Room = ?config(room, Config), 390: RoomAddr = muc_helper:room_address(Room), 391: BobJID = client_lower_short_jid(Bob), 392: BobFullJID = client_lower_full_jid(Bob), 393: BobGroupChatMsgSentRK = group_chat_msg_sent_rk(BobJID), 394: Message = <<"Hi there!">>, 395: listen_to_group_chat_msg_sent_events_from_rabbit([BobJID], Config), 396: %% WHEN a user chat 397: escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, 398: nick(Bob))), 399: 400: escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)), 401: %% THEN 402: ?assertMatch(#{<<"from_user_id">> := BobFullJID, 403: <<"to_user_id">> := RoomAddr, 404: <<"message">> := Message}, 405: get_decoded_message_from_rabbit(BobGroupChatMsgSentRK)) 406: end). 407: 408: group_chat_message_received_event_properly_formatted(Config) -> 409: escalus:story( 410: Config, [{bob, 1}, {alice, 1}], 411: fun(Bob, Alice) -> 412: %% GIVEN basic variables 413: Room = ?config(room, Config), 414: RoomAddr = muc_helper:room_address(Room), 415: BobRoomJID = user_room_jid(RoomAddr, Bob), 416: AliceJID = client_lower_short_jid(Alice), 417: AliceFullJID = client_lower_full_jid(Alice), 418: AliceGroupChatMsgRecvRK = group_chat_msg_recv_rk(AliceJID), 419: Message = <<"Hi there!">>, 420: %% GIVEN users in room 421: escalus:send(Alice, muc_helper:stanza_muc_enter_room(Room, nick(Alice))), 422: escalus:send(Bob, muc_helper:stanza_muc_enter_room(Room, nick(Bob))), 423: % wait for all room stanzas to be processed 424: escalus:wait_for_stanzas(Alice, 3), 425: escalus:wait_for_stanzas(Bob, 3), 426: %% GIVEN Room subscription to Rabbit 427: % We subscribe to RMQ now and not earlier to avoid messages other 428: % than the one we are testing, `Message` from Bob to Room, like 429: % for example affiliations and the like. 430: listen_to_group_chat_msg_recv_events_from_rabbit([AliceJID], Config), 431: %% WHEN users chat 432: escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Message)), 433: %% THEN 434: ?assertMatch(#{<<"from_user_id">> := BobRoomJID, 435: <<"to_user_id">> := AliceFullJID, 436: <<"message">> := Message}, 437: get_decoded_message_from_rabbit(AliceGroupChatMsgRecvRK)) 438: end). 439: 440: %%-------------------------------------------------------------------- 441: %% Test helpers 442: %%-------------------------------------------------------------------- 443: 444: -spec connect_to_rabbit() -> proplists:proplist(). 445: connect_to_rabbit() -> 446: {ok, Connection} = 447: amqp_connection:start(#amqp_params_network{}), 448: {ok, Channel} = amqp_connection:open_channel(Connection), 449: [{rabbit_connection, Connection}, {rabbit_channel, Channel}]. 450: 451: -spec close_rabbit_connection(Config :: proplists:proplist()) -> ok | term(). 452: close_rabbit_connection(Config) -> 453: Connection = proplists:get_value(rabbit_connection, Config), 454: Channel = proplists:get_value(rabbit_channel, Config), 455: amqp_channel:close(Channel), 456: amqp_connection:close(Connection). 457: 458: -spec listen_to_presence_events_from_rabbit(JIDs :: [binary()], 459: Config :: proplists:proplist()) -> 460: ok | term(). 461: listen_to_presence_events_from_rabbit(JIDs, Config) -> 462: QueueBindings = presence_bindings(?QUEUE_NAME, JIDs), 463: listen_to_events_from_rabbit(QueueBindings, Config). 464: 465: -spec listen_to_chat_msg_sent_events_from_rabbit(JIDs :: [binary()], 466: Config :: proplists:proplist()) -> 467: ok | term(). 468: listen_to_chat_msg_sent_events_from_rabbit(JIDs, Config) -> 469: QueueBindings = chat_msg_sent_bindings(?QUEUE_NAME, JIDs), 470: listen_to_events_from_rabbit(QueueBindings, Config). 471: 472: -spec listen_to_chat_msg_recv_events_from_rabbit(JIDs :: [binary()], 473: Config :: proplists:proplist()) -> 474: ok | term(). 475: listen_to_chat_msg_recv_events_from_rabbit(JIDs, Config) -> 476: QueueBindings = chat_msg_recv_bindings(?QUEUE_NAME, JIDs), 477: listen_to_events_from_rabbit(QueueBindings, Config). 478: 479: -spec listen_to_group_chat_msg_sent_events_from_rabbit(JIDs :: [binary()], 480: Config :: proplists:proplist()) -> 481: ok | term(). 482: listen_to_group_chat_msg_sent_events_from_rabbit(JIDs, Config) -> 483: QueueBindings = group_chat_msg_sent_bindings(?QUEUE_NAME, JIDs), 484: listen_to_events_from_rabbit(QueueBindings, Config). 485: 486: -spec listen_to_group_chat_msg_recv_events_from_rabbit(JIDs :: [binary()], 487: Config :: proplists:proplist()) -> 488: ok | term(). 489: listen_to_group_chat_msg_recv_events_from_rabbit(JIDs, Config) -> 490: QueueBindings = group_chat_msg_recv_bindings(?QUEUE_NAME, JIDs), 491: listen_to_events_from_rabbit(QueueBindings, Config). 492: 493: -spec listen_to_events_from_rabbit(QueueBindings :: [rabbit_binding()], 494: Config :: proplists:proplist()) -> 495: ok | term(). 496: listen_to_events_from_rabbit(QueueBindings, Config) -> 497: Connection = proplists:get_value(rabbit_connection, Config), 498: Channel = proplists:get_value(rabbit_channel, Config), 499: declare_temporary_rabbit_queue(Channel, ?QUEUE_NAME), 500: wait_for_exchanges_to_be_created(Connection, 501: [{?PRESENCE_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}, 502: {?CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}, 503: {?GROUP_CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}]), 504: bind_queues_to_exchanges(Channel, QueueBindings), 505: subscribe_to_rabbit_queue(Channel, ?QUEUE_NAME). 506: 507: -spec wait_for_exchanges_to_be_created(Connection :: pid(), 508: Exchanges :: [binary()]) -> pid(). 509: wait_for_exchanges_to_be_created(Connection, Exchanges) -> 510: [ensure_exchange_present(Connection, Exchange) || Exchange <- Exchanges], 511: ok. 512: 513: -spec declare_temporary_rabbit_queue(Channel :: pid(), Queue :: binary()) -> binary(). 514: declare_temporary_rabbit_queue(Channel, Queue) -> 515: #'queue.declare_ok'{} = 516: amqp_channel:call(Channel, #'queue.declare'{queue = Queue, 517: exclusive = true}). 518: 519: -spec presence_bindings(Queue :: binary(), JIDs :: [binary()]) -> 520: [rabbit_binding()]. 521: presence_bindings(Queue, JIDs) -> 522: [{Queue, ?PRESENCE_EXCHANGE, JID} || JID <- JIDs]. 523: 524: -spec chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) -> 525: [rabbit_binding()]. 526: chat_msg_sent_bindings(Queue, JIDs) -> 527: [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_sent_rk(JID)} || JID <- JIDs]. 528: 529: -spec chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) -> 530: [rabbit_binding()]. 531: chat_msg_recv_bindings(Queue, JIDs) -> 532: [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_recv_rk(JID)} || JID <- JIDs]. 533: 534: -spec group_chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) -> 535: [rabbit_binding()]. 536: group_chat_msg_sent_bindings(Queue, JIDs) -> 537: [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_sent_rk(JID)} 538: || JID <- JIDs]. 539: 540: -spec group_chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) -> 541: [rabbit_binding()]. 542: group_chat_msg_recv_bindings(Queue, JIDs) -> 543: [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_recv_rk(JID)} 544: || JID <- JIDs]. 545: 546: -spec bind_queues_to_exchanges(Channel :: pid(), 547: Bindings :: [rabbit_binding()]) -> 548: [amqp_client:amqp_method() | ok | blocked | closing]. 549: bind_queues_to_exchanges(Channel, Bindings) -> 550: [bind_queue_to_exchange(Channel, Binding) || Binding <- Bindings]. 551: 552: -spec bind_queue_to_exchange(Channel :: pid(), rabbit_binding()) -> 553: amqp_client:amqp_method() | ok | blocked | closing. 554: bind_queue_to_exchange(Channel, {Queue, Exchange, RoutingKey}) -> 555: #'queue.bind_ok'{} = 556: amqp_channel:call(Channel, #'queue.bind'{exchange = Exchange, 557: routing_key = RoutingKey, 558: queue = Queue}). 559: 560: -spec ensure_exchange_present(Connection :: pid(), Exchange :: binary()) -> 561: {ok, true} | {timeout, any()}. 562: ensure_exchange_present(Connection, Exchange) -> 563: Opts = #{time_left => ?WAIT_FOR_EXCHANGE_INTERVAL * ?IF_EXCHANGE_EXISTS_RETRIES / 1000, 564: sleep_time => ?WAIT_FOR_EXCHANGE_INTERVAL}, 565: case mongoose_helper:wait_until(fun() -> 566: is_exchange_present(Connection, 567: Exchange) 568: end, true, Opts) of 569: {ok, true} -> true; 570: {timeout, _} -> 571: throw(io_lib:format("Exchange has not been created, exchange=~p", 572: [Exchange])) 573: end. 574: 575: -spec is_exchange_present(Connection :: pid(), Exchange :: binary()) -> boolean(). 576: is_exchange_present(Connection, {ExName, ExType}) -> 577: {ok, Channel} = amqp_connection:open_channel(Connection), 578: try amqp_channel:call(Channel, #'exchange.declare'{exchange = ExName, 579: type = ExType, 580: %% this option allows to 581: %% check if an exchange exists 582: passive = true}) of 583: {'exchange.declare_ok'} -> true 584: catch 585: _Error:_Reason -> false 586: end. 587: 588: -spec subscribe_to_rabbit_queue(Channel :: pid(), Queue :: binary()) -> ok. 589: subscribe_to_rabbit_queue(Channel, Queue) -> 590: amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue, 591: no_ack = true}, self()), 592: receive 593: #'basic.consume_ok'{} -> ok 594: after 595: 5000 -> 596: throw(io_lib:format("Timeout when subscribing to queue," 597: "channel=~p, queue=~p", [Channel, Queue])) 598: end. 599: 600: -spec send_presence_stanzas(Users :: [binary()], NumOfMsgs :: non_neg_integer()) 601: -> [[ok]] | term(). 602: send_presence_stanzas(Users, NumOfMsgs) -> 603: [send_presence_stanza(User, NumOfMsgs) || User <- Users]. 604: 605: -spec send_presence_stanza(Users :: [binary()], NumOfMsgs :: non_neg_integer()) 606: -> [ok] | term(). 607: send_presence_stanza(User, NumOfMsgs) -> 608: [escalus:send(User, escalus_stanza:presence(make_pres_type(X))) 609: || X <- lists:seq(1, NumOfMsgs)]. 610: 611: -spec get_decoded_message_from_rabbit(RoutingKey :: binary()) -> 612: map() | no_return(). 613: get_decoded_message_from_rabbit(RoutingKey) -> 614: receive 615: {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} -> 616: jiffy:decode(Msg, [return_maps]) 617: after 618: 5000 -> ct:fail(io_lib:format("Timeout when decoding message, rk=~p", 619: [RoutingKey])) 620: end. 621: 622: %%-------------------------------------------------------------------- 623: %% Utils 624: %%-------------------------------------------------------------------- 625: 626: start_mod_event_pusher_rabbit(Config) -> 627: dynamic_modules:start(domain(), mod_event_pusher_rabbit, Config). 628: 629: stop_mod_event_pusher_rabbit() -> 630: dynamic_modules:stop(domain(), mod_event_pusher_rabbit). 631: 632: start_rabbit_wpool(Host) -> 633: start_rabbit_wpool(Host, ?WPOOL_CFG). 634: 635: start_rabbit_wpool(Host, WpoolConfig) -> 636: rpc(mim(), mongoose_wpool, ensure_started, []), 637: rpc(mim(), mongoose_wpool, start_configured_pools, [[WpoolConfig], [Host]]). 638: 639: stop_rabbit_wpool({Pool, Host, Tag}) -> 640: rpc(mim(), mongoose_wpool, stop, [Pool, Host, Tag]); 641: stop_rabbit_wpool(Host)-> 642: stop_rabbit_wpool({rabbit, Host, event_pusher}). 643: 644: delete_exchanges() -> 645: ConnConf = connect_to_rabbit(), 646: Channel = proplists:get_value(rabbit_channel, ConnConf), 647: [amqp_channel:call(Channel, #'exchange.delete'{exchange = Ex}) 648: || Ex <- [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE]], 649: close_rabbit_connection(ConnConf). 650: 651: make_pres_type(X) when X rem 2 == 0 -> 652: <<"available">>; 653: make_pres_type(_) -> 654: <<"unavailable">>. 655: 656: chat_msg_sent_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_SENT_TOPIC). 657: 658: chat_msg_recv_rk(JID) -> user_topic_routing_key(JID, ?CHAT_MSG_RECV_TOPIC). 659: 660: group_chat_msg_sent_rk(JID) -> 661: user_topic_routing_key(JID, ?GROUP_CHAT_MSG_SENT_TOPIC). 662: 663: group_chat_msg_recv_rk(JID) -> 664: user_topic_routing_key(JID, ?GROUP_CHAT_MSG_RECV_TOPIC). 665: 666: user_topic_routing_key(JID, Topic) -> <<JID/binary, ".", Topic/binary>>. 667: 668: client_lower_short_jid(Client) -> 669: escalus_utils:jid_to_lower(escalus_client:short_jid(Client)). 670: 671: client_lower_full_jid(Client) -> 672: escalus_utils:jid_to_lower(escalus_client:full_jid(Client)). 673: 674: nick(User) -> escalus_utils:get_username(User). 675: 676: maybe_prepare_muc(TestCase, Config) when 677: TestCase == group_chat_message_sent_event orelse 678: TestCase == group_chat_message_received_event orelse 679: TestCase == group_chat_message_sent_event_properly_formatted orelse 680: TestCase == group_chat_message_received_event_properly_formatted -> 681: prepare_muc(Config); 682: maybe_prepare_muc(_, Config) -> Config. 683: 684: maybe_cleanup_muc(TestCase, Config) when 685: TestCase == group_chat_message_sent_event orelse 686: TestCase == group_chat_message_received_event orelse 687: TestCase == group_chat_message_sent_event_properly_formatted orelse 688: TestCase == group_chat_message_received_event_properly_formatted -> 689: cleanup_muc(Config); 690: maybe_cleanup_muc(_, _) -> ok. 691: 692: prepare_muc(Config) -> 693: [User | _] = ?config(escalus_users, Config), 694: muc_helper:start_room(Config, User, <<"muc_publish">>, <<"user_nick">>, 695: [{persistent, true}, 696: {anonymous, false}]). 697: 698: cleanup_muc(Config) -> 699: muc_helper:destroy_room(Config). 700: 701: user_room_jid(RoomJID, UserJID) -> 702: Nick = nick(UserJID), 703: <<RoomJID/binary, "/", Nick/binary>>. 704: 705: extend_config_with_exchange_type(ExType) -> 706: lists:map(fun({Ex, Opts}) when 707: Ex == presence_exchange orelse 708: Ex == chat_msg_exchange orelse 709: Ex == groupchat_msg_exchange -> 710: {Ex, Opts ++ [{type, ExType}]}; 711: (Other) -> Other 712: end, ?MOD_EVENT_PUSHER_RABBIT_CFG). 713: 714: is_rabbitmq_available() -> 715: try amqp_connection:start(#amqp_params_network{}) of 716: {ok, Conn} -> 717: amqp_connection:close(Conn), 718: true; 719: {error, econnrefused} -> 720: false 721: catch 722: _Err -> 723: false 724: end.