1: %%%------------------------------------------------------------------- 2: %%% @author Kacper Mentel <kacper.mentel@erlang-solutions.com> 3: %%% @copyright (C) 2018, Kacper Mentel 4: %%% @doc 5: %%% Tests for `mod_event_pusher` RabbitMQ backend (`mod_event_pusher_rabbit`). 6: %%% @end 7: %%%------------------------------------------------------------------- 8: -module(mod_event_pusher_rabbit_SUITE). 9: 10: -include_lib("common_test/include/ct.hrl"). 11: -include_lib("eunit/include/eunit.hrl"). 12: -include_lib("amqp_client/include/amqp_client.hrl"). 13: 14: -include("assert_received_match.hrl"). 15: 16: -import(distributed_helper, [mim/0, rpc/4]). 17: -import(domain_helper, [domain/0]). 18: -import(config_parser_helper, [config/2]). 19: 20: -export([suite/0, all/0, groups/0]). 21: -export([init_per_suite/1, end_per_suite/1, 22: init_per_group/2, end_per_group/2, 23: init_per_testcase/2, end_per_testcase/2]). 24: 25: -export([rabbit_pool_starts_with_default_config/1, 26: exchanges_are_created_on_module_startup/1]). 27: -export([connected_users_push_presence_events_when_change_status/1, 28: presence_messages_are_properly_formatted/1]). 29: -export([chat_message_sent_event/1, 30: chat_message_sent_event_properly_formatted/1, 31: chat_message_received_event/1, 32: chat_message_received_event_properly_formatted/1]). 33: -export([group_chat_message_sent_event/1, 34: group_chat_message_sent_event_properly_formatted/1, 35: group_chat_message_received_event/1, 36: group_chat_message_received_event_properly_formatted/1]). 37: 38: -define(QUEUE_NAME, <<"test_queue">>). 39: -define(DEFAULT_EXCHANGE_TYPE, <<"topic">>). 40: -define(PRESENCE_EXCHANGE, <<"custom_presence_exchange">>). 41: -define(CHAT_MSG_EXCHANGE, <<"custom_chat_msg_exchange">>). 42: -define(GROUP_CHAT_MSG_EXCHANGE, <<"custom_group_chat_msg_exchange">>). 43: -define(CHAT_MSG_SENT_TOPIC, <<"custom_chat_msg_sent_topic">>). 44: -define(CHAT_MSG_RECV_TOPIC, <<"custom_chat_msg_recv_topic">>). 
-define(GROUP_CHAT_MSG_SENT_TOPIC, <<"custom_group_chat_msg_sent_topic">>).
-define(GROUP_CHAT_MSG_RECV_TOPIC, <<"custom_group_chat_msg_recv_topic">>).
%% Worker pool definition used for the whole suite (tag `event_pusher`).
-define(WPOOL_CFG, #{type => rabbit, scope => host, tag => event_pusher,
                     opts => #{workers => 20, strategy => best_worker, call_timeout => 5000},
                     conn_opts => #{confirms_enabled => false,
                                    amqp_host => "localhost",
                                    amqp_port => 5672,
                                    amqp_username => "guest",
                                    amqp_password => "guest",
                                    max_worker_queue_len => 1000}
                    }).
%% Polling parameters used while waiting for an exchange to show up.
-define(IF_EXCHANGE_EXISTS_RETRIES, 30).
-define(WAIT_FOR_EXCHANGE_INTERVAL, 100). % ms

%% A queue-to-exchange binding together with its routing key.
-type rabbit_binding() :: {Queue :: binary(),
                           Exchange :: binary(),
                           RoutingKey :: binary()}.

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Every group of the suite, in execution order.
all() ->
    [{group, GroupName} || GroupName <- [pool_startup,
                                         module_startup,
                                         presence_status_publish,
                                         chat_message_publish,
                                         group_chat_message_publish]].

%% Group definitions; all groups run with empty properties and are wrapped
%% by `ct_helper:repeat_all_until_all_ok/1`.
groups() ->
    CasesByGroup =
        [{pool_startup,
          [rabbit_pool_starts_with_default_config]},
         {module_startup,
          [exchanges_are_created_on_module_startup]},
         {presence_status_publish,
          [connected_users_push_presence_events_when_change_status,
           presence_messages_are_properly_formatted]},
         {chat_message_publish,
          [chat_message_sent_event,
           chat_message_sent_event_properly_formatted,
           chat_message_received_event,
           chat_message_received_event_properly_formatted]},
         {group_chat_message_publish,
          [group_chat_message_sent_event,
           group_chat_message_sent_event_properly_formatted,
           group_chat_message_received_event,
           group_chat_message_received_event_properly_formatted]}],
    Groups = [{GroupName, [], Cases} || {GroupName, Cases} <- CasesByGroup],
    ct_helper:repeat_all_until_all_ok(Groups).

%% Suite-level Common Test settings are taken from escalus.
suite() ->
    escalus:suite().
%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Suite setup: skips the whole suite when no RabbitMQ broker answers on the
%% default connection parameters.
%%
%% The worker pool is started only once the broker is known to be reachable.
%% When `init_per_suite/1` returns `{skip, _}` Common Test does not run
%% `end_per_suite/1`, so a pool started before the check would never be
%% stopped.
init_per_suite(Config) ->
    %% The availability probe below uses the AMQP client directly.
    {ok, _} = application:ensure_all_started(amqp_client),
    case is_rabbitmq_available() of
        true ->
            start_rabbit_wpool(domain()),
            muc_helper:load_muc(),
            escalus:init_per_suite(Config);
        false ->
            {skip, "RabbitMQ server is not available on default port."}
    end.

%% Suite teardown: stops the pool started in `init_per_suite/1`, removes the
%% fresh users, unloads MUC and lets escalus finish.
end_per_suite(Config) ->
    stop_rabbit_wpool(domain()),
    escalus_fresh:clean(),
    muc_helper:unload_muc(),
    escalus:end_per_suite(Config).

%% Saves the current module configuration of the domain and makes sure the
%% modules required by `GroupName` are in the expected state.
init_per_group(GroupName, Config0) ->
    Domain = domain(),
    Config = dynamic_modules:save_modules(Domain, Config0),
    dynamic_modules:ensure_modules(Domain, required_modules(GroupName)),
    Config.

%% Module requirements of a group: `pool_startup` needs `mod_event_pusher`
%% stopped, every other group needs it running with the rabbit backend.
required_modules(pool_startup) ->
    [{mod_event_pusher, stopped}];
required_modules(GroupName) ->
    [{mod_event_pusher, #{rabbit => mod_event_pusher_rabbit_opts(GroupName)}}].

%% Builds the rabbit backend options: each basic exchange definition is merged
%% with the group-specific extras and passed through
%% `config_parser_helper:config/2`.
mod_event_pusher_rabbit_opts(GroupName) ->
    ExtraOpts = extra_exchange_opts(GroupName),
    maps:map(fun(Key, Opts) ->
                     config([modules, mod_event_pusher, rabbit, Key],
                            maps:merge(Opts, ExtraOpts))
             end, basic_exchanges()).

%% Custom exchange names and topics used by all groups.
basic_exchanges() ->
    #{presence_exchange => #{name => ?PRESENCE_EXCHANGE},
      chat_msg_exchange => #{name => ?CHAT_MSG_EXCHANGE,
                             sent_topic => ?CHAT_MSG_SENT_TOPIC,
                             recv_topic => ?CHAT_MSG_RECV_TOPIC},
      groupchat_msg_exchange => #{name => ?GROUP_CHAT_MSG_EXCHANGE,
                                  sent_topic => ?GROUP_CHAT_MSG_SENT_TOPIC,
                                  recv_topic => ?GROUP_CHAT_MSG_RECV_TOPIC}
     }.

%% Only the `module_startup` group overrides the exchange type.
extra_exchange_opts(module_startup) -> #{type => <<"headers">>};
extra_exchange_opts(_) -> #{}.

%% Deletes the exchanges used by the suite and restores the module
%% configuration saved in `init_per_group/2`.
end_per_group(_, Config) ->
    delete_exchanges(),
    dynamic_modules:restore_modules(Config).
%% Per-testcase setup. The pool startup test needs neither users nor an AMQP
%% connection; every other test gets fresh users, an optional MUC room and
%% the `rabbit_connection` / `rabbit_channel` entries appended to Config.
init_per_testcase(rabbit_pool_starts_with_default_config, Config) ->
    Config;
init_per_testcase(CaseName, Config0) ->
    Config1 = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]),
    Config2 = maybe_prepare_muc(CaseName, Config1),
    Config = Config2 ++ connect_to_rabbit(),
    escalus:init_per_testcase(CaseName, Config).

%% Per-testcase teardown, mirroring `init_per_testcase/2`.
end_per_testcase(rabbit_pool_starts_with_default_config, _Config) ->
    ok;
end_per_testcase(CaseName, Config) ->
    maybe_cleanup_muc(CaseName, Config),
    close_rabbit_connection(Config),
    escalus:end_per_testcase(CaseName, Config).

%%--------------------------------------------------------------------
%% GROUPS pool_startup, module_startup
%%--------------------------------------------------------------------

%% Starts a rabbit pool configured without host and credentials and checks
%% that it shows up among the pools reported by `mongoose_wpool:get_pools/0`.
rabbit_pool_starts_with_default_config(_Config) ->
    %% GIVEN
    Domain = domain(),
    DefaultWpoolConfig = #{type => rabbit, scope => host, tag => rabbit_event_pusher_default,
                           opts => #{workers => 10, strategy => best_worker,
                                     call_timeout => 5000},
                           conn_opts => #{amqp_port => 5672, confirms_enabled => false,
                                          max_worker_queue_len => 1000}},
    RabbitWpool = {rabbit, Domain, rabbit_event_pusher_default},
    %% WHEN
    start_rabbit_wpool(Domain, DefaultWpoolConfig),
    try
        %% THEN
        Pools = rpc(mim(), mongoose_wpool, get_pools, []),
        ?assertMatch(RabbitWpool,
                     lists:keyfind(rabbit_event_pusher_default, 3, Pools))
    after
        %% CLEANUP - runs even when the assertion fails, so a failed run does
        %% not leave the pool behind for the next attempt.
        stop_rabbit_wpool(RabbitWpool)
    end.

%% Checks that the three exchanges exist with the custom `headers` type
%% configured for the `module_startup` group.
exchanges_are_created_on_module_startup(Config) ->
    %% GIVEN module is started with custom exchange types
    Connection = proplists:get_value(rabbit_connection, Config),
    ExCustomType = <<"headers">>,
    Exchanges = [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE],
    %% THEN exchanges are created
    [?assert(ensure_exchange_present(Connection, {Exchange, ExCustomType}))
     || Exchange <- Exchanges].
%%--------------------------------------------------------------------
%% GROUP presence_status_publish
%%--------------------------------------------------------------------

%% A presence stanza sent by Bob produces a delivery whose routing key is
%% Bob's lowercased bare JID.
connected_users_push_presence_events_when_change_status(Config) ->
    Story =
        fun(Bob) ->
                %% GIVEN a queue bound to Bob's presence events
                RoutingKey = client_lower_short_jid(Bob),
                listen_to_presence_events_from_rabbit([RoutingKey], Config),
                %% WHEN Bob generates some traffic
                send_presence_stanzas([Bob], 1),
                %% THEN a presence event arrives from Rabbit
                ?assertReceivedMatch({#'basic.deliver'{routing_key = RoutingKey},
                                      #amqp_msg{}},
                                     timer:seconds(5))
        end,
    escalus:story(Config, [{bob, 1}], Story).

%% The presence event payload carries the full JID and the presence flag.
presence_messages_are_properly_formatted(Config) ->
    Story =
        fun(Bob) ->
                %% GIVEN
                ShortJID = client_lower_short_jid(Bob),
                FullJID = client_lower_full_jid(Bob),
                listen_to_presence_events_from_rabbit([ShortJID], Config),
                %% WHEN Bob goes unavailable
                escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
                %% THEN the decoded event describes an absent user
                Event = get_decoded_message_from_rabbit(ShortJID),
                ?assertMatch(#{<<"user_id">> := FullJID, <<"present">> := false},
                             Event)
        end,
    escalus:story(Config, [{bob, 1}], Story).

%%--------------------------------------------------------------------
%% GROUP chat_message_publish
%%--------------------------------------------------------------------

%% A chat message sent by Bob is published under Bob's "sent" routing key.
chat_message_sent_event(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN
                SenderJID = client_lower_short_jid(Bob),
                SentRK = chat_msg_sent_rk(SenderJID),
                listen_to_chat_msg_sent_events_from_rabbit([SenderJID], Config),
                %% WHEN Bob writes to Alice
                Stanza = escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>),
                escalus:send(Bob, Stanza),
                %% THEN a "message sent" event arrives from Rabbit
                ?assertReceivedMatch({#'basic.deliver'{routing_key = SentRK},
                                      #amqp_msg{}},
                                     timer:seconds(5))
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%% A chat message delivered to Alice is published under Alice's "received"
%% routing key.
chat_message_received_event(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN
                RecipientJID = client_lower_short_jid(Alice),
                RecvRK = chat_msg_recv_rk(RecipientJID),
                listen_to_chat_msg_recv_events_from_rabbit([RecipientJID], Config),
                %% WHEN Bob writes to Alice
                Stanza = escalus_stanza:chat_to(Alice, <<"Oh, hi Alice!">>),
                escalus:send(Bob, Stanza),
                escalus:wait_for_stanzas(Alice, 2),
                %% THEN a "message received" event arrives from Rabbit
                ?assertReceivedMatch({#'basic.deliver'{routing_key = RecvRK},
                                      #amqp_msg{}},
                                     timer:seconds(5))
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%% The "message sent" payload names sender, recipient and message body.
chat_message_sent_event_properly_formatted(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN
                SenderJID = client_lower_short_jid(Alice),
                SenderFullJID = client_lower_full_jid(Alice),
                RecipientFullJID = client_lower_full_jid(Bob),
                SentRK = chat_msg_sent_rk(SenderJID),
                Body = <<"Hi Bob!">>,
                listen_to_chat_msg_sent_events_from_rabbit([SenderJID], Config),
                %% WHEN Alice writes to Bob
                escalus:send(Alice, escalus_stanza:chat_to(Bob, Body)),
                %% THEN
                Event = get_decoded_message_from_rabbit(SentRK),
                ?assertMatch(#{<<"from_user_id">> := SenderFullJID,
                               <<"to_user_id">> := RecipientFullJID,
                               <<"message">> := Body},
                             Event)
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).
%% The "message received" payload names sender, recipient and message body.
chat_message_received_event_properly_formatted(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN
                RecipientJID = client_lower_short_jid(Alice),
                RecipientFullJID = client_lower_full_jid(Alice),
                SenderFullJID = client_lower_full_jid(Bob),
                RecvRK = chat_msg_recv_rk(RecipientJID),
                Body = <<"Hi Alice!">>,
                listen_to_chat_msg_recv_events_from_rabbit([RecipientJID], Config),
                %% WHEN Bob writes to Alice
                escalus:send(Bob, escalus_stanza:chat_to(Alice, Body)),
                escalus:wait_for_stanzas(Alice, 2),
                %% THEN
                Event = get_decoded_message_from_rabbit(RecvRK),
                ?assertMatch(#{<<"from_user_id">> := SenderFullJID,
                               <<"to_user_id">> := RecipientFullJID,
                               <<"message">> := Body},
                             Event)
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%%--------------------------------------------------------------------
%% GROUP group_chat_message_publish
%%--------------------------------------------------------------------

%% A groupchat message sent by Bob is published under Bob's group chat
%% "sent" routing key.
group_chat_message_sent_event(Config) ->
    Story =
        fun(Bob) ->
                %% GIVEN
                Room = ?config(room, Config),
                RoomAddr = muc_helper:room_address(Room),
                SenderJID = client_lower_short_jid(Bob),
                SentRK = group_chat_msg_sent_rk(SenderJID),
                listen_to_group_chat_msg_sent_events_from_rabbit([SenderJID], Config),
                %% WHEN Bob enters the room and writes to it
                EnterRoom = muc_helper:stanza_muc_enter_room(Room, nick(Bob)),
                escalus:send(Bob, EnterRoom),
                GroupMsg = escalus_stanza:groupchat_to(RoomAddr, <<"Hi there!">>),
                escalus:send(Bob, GroupMsg),
                %% THEN a "message sent" event arrives from Rabbit
                ?assertReceivedMatch({#'basic.deliver'{routing_key = SentRK},
                                      #amqp_msg{}},
                                     timer:seconds(5))
        end,
    escalus:story(Config, [{bob, 1}], Story).
%% A groupchat message delivered to Alice is published under Alice's group
%% chat "received" routing key.
group_chat_message_received_event(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN
                Room = ?config(room, Config),
                RoomAddr = muc_helper:room_address(Room),
                RecipientJID = client_lower_short_jid(Alice),
                RecvRK = group_chat_msg_recv_rk(RecipientJID),
                listen_to_group_chat_msg_recv_events_from_rabbit([RecipientJID], Config),
                %% WHEN both users enter the room and Bob writes to it
                AliceEnters = muc_helper:stanza_muc_enter_room(Room, nick(Alice)),
                escalus:send(Alice, AliceEnters),
                BobEnters = muc_helper:stanza_muc_enter_room(Room, nick(Bob)),
                escalus:send(Bob, BobEnters),
                GroupMsg = escalus_stanza:groupchat_to(RoomAddr, <<"Hi there!">>),
                escalus:send(Bob, GroupMsg),
                %% THEN a "message received" event arrives from Rabbit
                ?assertReceivedMatch({#'basic.deliver'{routing_key = RecvRK},
                                      #amqp_msg{}},
                                     timer:seconds(5))
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%% The group chat "message sent" payload names the sender's full JID, the
%% room address and the message body.
group_chat_message_sent_event_properly_formatted(Config) ->
    Story =
        fun(Bob) ->
                %% GIVEN
                Room = ?config(room, Config),
                RoomAddr = muc_helper:room_address(Room),
                SenderJID = client_lower_short_jid(Bob),
                SenderFullJID = client_lower_full_jid(Bob),
                SentRK = group_chat_msg_sent_rk(SenderJID),
                Body = <<"Hi there!">>,
                listen_to_group_chat_msg_sent_events_from_rabbit([SenderJID], Config),
                %% WHEN Bob enters the room and writes to it
                EnterRoom = muc_helper:stanza_muc_enter_room(Room, nick(Bob)),
                escalus:send(Bob, EnterRoom),
                escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Body)),
                %% THEN
                Event = get_decoded_message_from_rabbit(SentRK),
                ?assertMatch(#{<<"from_user_id">> := SenderFullJID,
                               <<"to_user_id">> := RoomAddr,
                               <<"message">> := Body},
                             Event)
        end,
    escalus:story(Config, [{bob, 1}], Story).
%% The group chat "message received" payload names the sender's room JID,
%% the recipient's full JID and the message body.
group_chat_message_received_event_properly_formatted(Config) ->
    Story =
        fun(Bob, Alice) ->
                %% GIVEN basic variables
                Room = ?config(room, Config),
                RoomAddr = muc_helper:room_address(Room),
                SenderRoomJID = user_room_jid(RoomAddr, Bob),
                RecipientJID = client_lower_short_jid(Alice),
                RecipientFullJID = client_lower_full_jid(Alice),
                RecvRK = group_chat_msg_recv_rk(RecipientJID),
                Body = <<"Hi there!">>,
                %% GIVEN users in room
                AliceEnters = muc_helper:stanza_muc_enter_room(Room, nick(Alice)),
                escalus:send(Alice, AliceEnters),
                BobEnters = muc_helper:stanza_muc_enter_room(Room, nick(Bob)),
                escalus:send(Bob, BobEnters),
                %% wait for all room stanzas to be processed
                escalus:wait_for_stanzas(Alice, 3),
                escalus:wait_for_stanzas(Bob, 3),
                %% GIVEN Room subscription to Rabbit
                %% The subscription is made this late on purpose: only the
                %% `Body` message from Bob to the room should reach the queue,
                %% not earlier traffic such as affiliation changes.
                listen_to_group_chat_msg_recv_events_from_rabbit([RecipientJID], Config),
                %% WHEN Bob writes to the room
                escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, Body)),
                %% THEN
                Event = get_decoded_message_from_rabbit(RecvRK),
                ?assertMatch(#{<<"from_user_id">> := SenderRoomJID,
                               <<"to_user_id">> := RecipientFullJID,
                               <<"message">> := Body},
                             Event)
        end,
    escalus:story(Config, [{bob, 1}, {alice, 1}], Story).

%%--------------------------------------------------------------------
%% Test helpers
%%--------------------------------------------------------------------

%% Opens a connection with default network parameters plus one channel and
%% returns both as Config entries.
-spec connect_to_rabbit() -> proplists:proplist().
connect_to_rabbit() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    [{rabbit_connection, Conn}, {rabbit_channel, Chan}].

%% Closes the channel first and then the connection stored in Config.
-spec close_rabbit_connection(Config :: proplists:proplist()) -> ok | term().
close_rabbit_connection(Config) ->
    Conn = proplists:get_value(rabbit_connection, Config),
    Chan = proplists:get_value(rabbit_channel, Config),
    amqp_channel:close(Chan),
    amqp_connection:close(Conn).

%% Subscribes the test process to presence events of the given JIDs.
-spec listen_to_presence_events_from_rabbit(JIDs :: [binary()],
                                            Config :: proplists:proplist()) ->
    ok | term().
listen_to_presence_events_from_rabbit(JIDs, Config) ->
    listen_to_events_from_rabbit(presence_bindings(?QUEUE_NAME, JIDs), Config).

%% Subscribes the test process to "chat message sent" events of the given JIDs.
-spec listen_to_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
                                                 Config :: proplists:proplist()) ->
    ok | term().
listen_to_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
    listen_to_events_from_rabbit(chat_msg_sent_bindings(?QUEUE_NAME, JIDs), Config).

%% Subscribes the test process to "chat message received" events of the
%% given JIDs.
-spec listen_to_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
                                                 Config :: proplists:proplist()) ->
    ok | term().
listen_to_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
    listen_to_events_from_rabbit(chat_msg_recv_bindings(?QUEUE_NAME, JIDs), Config).

%% Subscribes the test process to "group chat message sent" events of the
%% given JIDs.
-spec listen_to_group_chat_msg_sent_events_from_rabbit(JIDs :: [binary()],
                                                       Config :: proplists:proplist()) ->
    ok | term().
listen_to_group_chat_msg_sent_events_from_rabbit(JIDs, Config) ->
    listen_to_events_from_rabbit(group_chat_msg_sent_bindings(?QUEUE_NAME, JIDs),
                                 Config).

%% Subscribes the test process to "group chat message received" events of
%% the given JIDs.
-spec listen_to_group_chat_msg_recv_events_from_rabbit(JIDs :: [binary()],
                                                       Config :: proplists:proplist()) ->
    ok | term().
listen_to_group_chat_msg_recv_events_from_rabbit(JIDs, Config) ->
    listen_to_events_from_rabbit(group_chat_msg_recv_bindings(?QUEUE_NAME, JIDs),
                                 Config).
%% An exchange identified by its name and type.
-type rabbit_exchange() :: {Name :: binary(), Type :: binary()}.

%% Declares the test queue, waits for the three exchanges (with the default
%% type), binds the queue according to `QueueBindings` and subscribes the
%% calling process to the queue.
-spec listen_to_events_from_rabbit(QueueBindings :: [rabbit_binding()],
                                   Config :: proplists:proplist()) ->
    ok | term().
listen_to_events_from_rabbit(QueueBindings, Config) ->
    Connection = proplists:get_value(rabbit_connection, Config),
    Channel = proplists:get_value(rabbit_channel, Config),
    declare_temporary_rabbit_queue(Channel, ?QUEUE_NAME),
    wait_for_exchanges_to_be_created(Connection,
                                     [{?PRESENCE_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
                                      {?CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE},
                                      {?GROUP_CHAT_MSG_EXCHANGE, ?DEFAULT_EXCHANGE_TYPE}]),
    bind_queues_to_exchanges(Channel, QueueBindings),
    subscribe_to_rabbit_queue(Channel, ?QUEUE_NAME).

%% Blocks until every listed exchange exists; throws (see
%% `ensure_exchange_present/2`) when one of them does not appear in time.
-spec wait_for_exchanges_to_be_created(Connection :: pid(),
                                       Exchanges :: [rabbit_exchange()]) -> ok.
wait_for_exchanges_to_be_created(Connection, Exchanges) ->
    lists:foreach(fun(Exchange) ->
                          ensure_exchange_present(Connection, Exchange)
                  end, Exchanges).

%% Declares an exclusive queue named `Queue` on `Channel`.
-spec declare_temporary_rabbit_queue(Channel :: pid(), Queue :: binary()) ->
    #'queue.declare_ok'{}.
declare_temporary_rabbit_queue(Channel, Queue) ->
    #'queue.declare_ok'{} =
        amqp_channel:call(Channel, #'queue.declare'{queue = Queue,
                                                    exclusive = true}).

%% Presence events are routed by the bare JID itself.
-spec presence_bindings(Queue :: binary(), JIDs :: [binary()]) ->
    [rabbit_binding()].
presence_bindings(Queue, JIDs) ->
    [{Queue, ?PRESENCE_EXCHANGE, JID} || JID <- JIDs].

-spec chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
    [rabbit_binding()].
chat_msg_sent_bindings(Queue, JIDs) ->
    [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_sent_rk(JID)} || JID <- JIDs].

-spec chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
    [rabbit_binding()].
chat_msg_recv_bindings(Queue, JIDs) ->
    [{Queue, ?CHAT_MSG_EXCHANGE, chat_msg_recv_rk(JID)} || JID <- JIDs].

-spec group_chat_msg_sent_bindings(Queue :: binary(), JIDs :: [binary()]) ->
    [rabbit_binding()].
group_chat_msg_sent_bindings(Queue, JIDs) ->
    [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_sent_rk(JID)}
     || JID <- JIDs].

-spec group_chat_msg_recv_bindings(Queue :: binary(), JIDs :: [binary()]) ->
    [rabbit_binding()].
group_chat_msg_recv_bindings(Queue, JIDs) ->
    [{Queue, ?GROUP_CHAT_MSG_EXCHANGE, group_chat_msg_recv_rk(JID)}
     || JID <- JIDs].

%% Applies every binding; crashes with `badmatch` on the first one the broker
%% does not confirm.
-spec bind_queues_to_exchanges(Channel :: pid(),
                               Bindings :: [rabbit_binding()]) ->
    [#'queue.bind_ok'{}].
bind_queues_to_exchanges(Channel, Bindings) ->
    [bind_queue_to_exchange(Channel, Binding) || Binding <- Bindings].

-spec bind_queue_to_exchange(Channel :: pid(), rabbit_binding()) ->
    #'queue.bind_ok'{}.
bind_queue_to_exchange(Channel, {Queue, Exchange, RoutingKey}) ->
    #'queue.bind_ok'{} =
        amqp_channel:call(Channel, #'queue.bind'{exchange = Exchange,
                                                 routing_key = RoutingKey,
                                                 queue = Queue}).

%% Polls `is_exchange_present/2` until it returns `true`. Returns `true` on
%% success and throws a formatted message on timeout.
%% NOTE(review): `time_left` evaluates to 3.0 here; confirm that this is the
%% unit `mongoose_helper:wait_until/3` expects, given that `sleep_time` is 100.
-spec ensure_exchange_present(Connection :: pid(), Exchange :: rabbit_exchange()) ->
    true | no_return().
ensure_exchange_present(Connection, Exchange) ->
    Opts = #{time_left => ?WAIT_FOR_EXCHANGE_INTERVAL * ?IF_EXCHANGE_EXISTS_RETRIES / 1000,
             sleep_time => ?WAIT_FOR_EXCHANGE_INTERVAL},
    case mongoose_helper:wait_until(fun() ->
                                            is_exchange_present(Connection,
                                                                Exchange)
                                    end, true, Opts) of
        {ok, true} -> true;
        {timeout, _} ->
            throw(io_lib:format("Exchange has not been created, exchange=~p",
                                [Exchange]))
    end.

%% Checks on a dedicated channel whether an exchange with the given name and
%% type exists. A dedicated channel keeps a failed check from affecting the
%% channel used by the test (presumably the broker closes the channel on a
%% failed passive declare - confirm against the AMQP client docs).
-spec is_exchange_present(Connection :: pid(), Exchange :: rabbit_exchange()) ->
    boolean().
is_exchange_present(Connection, {ExName, ExType}) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),
    try amqp_channel:call(Channel, #'exchange.declare'{exchange = ExName,
                                                       type = ExType,
                                                       %% this option allows to
                                                       %% check if an exchange exists
                                                       passive = true}) of
        {'exchange.declare_ok'} ->
            %% The probe channel is still open on success; close it so that
            %% repeated polling does not pile up channels on the connection.
            amqp_channel:close(Channel),
            true
    catch
        _Error:_Reason -> false
    end.

%% Subscribes the calling process to `Queue` without acknowledgements and
%% waits up to 5 seconds for the consume confirmation message.
-spec subscribe_to_rabbit_queue(Channel :: pid(), Queue :: binary()) -> ok.
subscribe_to_rabbit_queue(Channel, Queue) ->
    amqp_channel:subscribe(Channel, #'basic.consume'{queue = Queue,
                                                     no_ack = true}, self()),
    receive
        #'basic.consume_ok'{} -> ok
    after
        5000 ->
            throw(io_lib:format("Timeout when subscribing to queue,"
                                "channel=~p, queue=~p", [Channel, Queue]))
    end.

%% Sends `NumOfMsgs` presence stanzas on behalf of every user.
-spec send_presence_stanzas(Users :: [escalus:client()],
                            NumOfMsgs :: non_neg_integer())
                           -> [[ok]] | term().
send_presence_stanzas(Users, NumOfMsgs) ->
    [send_presence_stanza(User, NumOfMsgs) || User <- Users].

%% Sends `NumOfMsgs` presence stanzas for one user; the presence type of the
%% X-th stanza is chosen by `make_pres_type/1`.
-spec send_presence_stanza(User :: escalus:client(),
                           NumOfMsgs :: non_neg_integer())
                          -> [ok] | term().
send_presence_stanza(User, NumOfMsgs) ->
    [escalus:send(User, escalus_stanza:presence(make_pres_type(X)))
     || X <- lists:seq(1, NumOfMsgs)].

%% Waits up to 5 seconds for a delivery with the given routing key and
%% returns its JSON payload decoded to a map; fails the test on timeout.
-spec get_decoded_message_from_rabbit(RoutingKey :: binary()) ->
    map() | no_return().
get_decoded_message_from_rabbit(RoutingKey) ->
    receive
        {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = Msg}} ->
            jiffy:decode(Msg, [return_maps])
    after
        5000 -> ct:fail(io_lib:format("Timeout when decoding message, rk=~p",
                                      [RoutingKey]))
    end.

%%--------------------------------------------------------------------
%% Utils
%%--------------------------------------------------------------------

%% Starts the suite's rabbit worker pool (`?WPOOL_CFG`) for `Host`.
start_rabbit_wpool(Host) ->
    start_rabbit_wpool(Host, ?WPOOL_CFG).
%% Makes sure the pool subsystem runs on the MIM node and starts the given
%% pool for `Host` there.
start_rabbit_wpool(Host, WpoolConfig) ->
    Node = mim(),
    rpc(Node, mongoose_wpool, ensure_started, []),
    rpc(Node, mongoose_wpool, start_configured_pools, [[WpoolConfig], [Host]]).

%% Stops a pool given either its full `{Pool, Host, Tag}` identifier or just
%% the host, in which case the suite's `event_pusher` rabbit pool is meant.
stop_rabbit_wpool({Pool, Host, Tag}) ->
    rpc(mim(), mongoose_wpool, stop, [Pool, Host, Tag]);
stop_rabbit_wpool(Host) ->
    rpc(mim(), mongoose_wpool, stop, [rabbit, Host, event_pusher]).

%% Deletes the three exchanges of the suite over a short-lived connection.
delete_exchanges() ->
    ConnConf = connect_to_rabbit(),
    Channel = proplists:get_value(rabbit_channel, ConnConf),
    lists:foreach(fun(Exchange) ->
                          amqp_channel:call(Channel,
                                            #'exchange.delete'{exchange = Exchange})
                  end,
                  [?PRESENCE_EXCHANGE, ?CHAT_MSG_EXCHANGE, ?GROUP_CHAT_MSG_EXCHANGE]),
    close_rabbit_connection(ConnConf).

%% Even sequence numbers map to "available", everything else to "unavailable".
make_pres_type(N) when N rem 2 =:= 0 -> <<"available">>;
make_pres_type(_) -> <<"unavailable">>.

%% Routing keys have the form `<bare JID>.<topic>`.
chat_msg_sent_rk(JID) ->
    user_topic_routing_key(JID, ?CHAT_MSG_SENT_TOPIC).

chat_msg_recv_rk(JID) ->
    user_topic_routing_key(JID, ?CHAT_MSG_RECV_TOPIC).

group_chat_msg_sent_rk(JID) ->
    user_topic_routing_key(JID, ?GROUP_CHAT_MSG_SENT_TOPIC).

group_chat_msg_recv_rk(JID) ->
    user_topic_routing_key(JID, ?GROUP_CHAT_MSG_RECV_TOPIC).

user_topic_routing_key(JID, Topic) ->
    <<JID/binary, $., Topic/binary>>.

%% Lowercased bare JID of an escalus client.
client_lower_short_jid(Client) ->
    ShortJID = escalus_client:short_jid(Client),
    escalus_utils:jid_to_lower(ShortJID).

%% Lowercased full JID of an escalus client.
client_lower_full_jid(Client) ->
    FullJID = escalus_client:full_jid(Client),
    escalus_utils:jid_to_lower(FullJID).

%% The room nick of a user is its username.
nick(User) ->
    escalus_utils:get_username(User).

%% Creates the MUC room for the group chat test cases only.
maybe_prepare_muc(group_chat_message_sent_event, Config) ->
    prepare_muc(Config);
maybe_prepare_muc(group_chat_message_received_event, Config) ->
    prepare_muc(Config);
maybe_prepare_muc(group_chat_message_sent_event_properly_formatted, Config) ->
    prepare_muc(Config);
maybe_prepare_muc(group_chat_message_received_event_properly_formatted, Config) ->
    prepare_muc(Config);
maybe_prepare_muc(_, Config) ->
    Config.
%% Destroys the MUC room for the group chat test cases only.
maybe_cleanup_muc(TestCase, Config) when
      TestCase == group_chat_message_sent_event orelse
      TestCase == group_chat_message_received_event orelse
      TestCase == group_chat_message_sent_event_properly_formatted orelse
      TestCase == group_chat_message_received_event_properly_formatted ->
    cleanup_muc(Config);
maybe_cleanup_muc(_, _) -> ok.

%% Starts a persistent, non-anonymous room owned by the first escalus user.
prepare_muc(Config) ->
    [User | _] = ?config(escalus_users, Config),
    muc_helper:start_room(Config, User, <<"muc_publish">>, <<"user_nick">>,
                          [{persistent, true},
                           {anonymous, false}]).

%% Destroys the room created by `prepare_muc/1`.
cleanup_muc(Config) ->
    muc_helper:destroy_room(Config).

%% Occupant JID of a user in a room: `<room address>/<nick>`.
user_room_jid(RoomJID, UserJID) ->
    Nick = nick(UserJID),
    <<RoomJID/binary, "/", Nick/binary>>.

%% Probes the broker by opening and immediately closing a connection with
%% default network parameters. Any `{error, _}` result means "not available":
%% a connection can fail for reasons other than `econnrefused`, and the suite
%% should then be skipped rather than crash with `try_clause`.
is_rabbitmq_available() ->
    try amqp_connection:start(#amqp_params_network{}) of
        {ok, Conn} ->
            amqp_connection:close(Conn),
            true;
        {error, _Reason} ->
            false
    catch
        _Err ->
            false
    end.