1: -module(mod_aws_sns_SUITE).
    2: -compile([export_all, nowarn_export_all]).
    3: -include_lib("escalus/include/escalus.hrl").
    4: -include_lib("common_test/include/ct.hrl").
    5: -include_lib("eunit/include/eunit.hrl").
    6: -include_lib("escalus/include/escalus_xmlns.hrl").
    7: -include_lib("exml/include/exml.hrl").
    8: 
    9: -include("assert_received_match.hrl").
   10: 
   11: -import(domain_helper, [domain/0]).
   12: 
%% NOTE(review): NS_HTTP_UPLOAD and S3_HOSTNAME are not referenced anywhere
%% else in this suite - possibly leftovers; confirm before removing.
-define(NS_HTTP_UPLOAD, <<"urn:xmpp:http:upload">>).
-define(S3_HOSTNAME, "http://bucket.s3-eu-east-25.example.com").
%% Module options handed to mod_aws_sns in init_per_group/2 and also stored in
%% the CT config under the `sns_config' key. make_topic_arn/2 reads `region',
%% `account_id' and the `*_topic' entries back from there to build the ARNs
%% the test cases expect.
%% NOTE(review): the credentials below look like dummy test values - confirm
%% they are not real keys.
-define(SNS_OPTS,
    [
        {access_key_id, "AKIAIH54ALYGMZTESTID"},
        {secret_access_key, "buRqHOxXCFUQkiuYgdUAy+XoGKt0Ec6DTESTKEY+"},
        {region, "eu-west-1"},
        {account_id, "123456789012"},
        {sns_host, "sns.eu-west-1.amazonaws.com"},
        {plugin_module, mod_event_pusher_sns_defaults},
        {presence_updates_topic, "user_presence_updated-dev-1"},
        {pm_messages_topic, "user_message_sent-dev-1"},
        {muc_messages_topic, "user_messagegroup_sent-dev-1"},
        {muc_host, "muc.@HOST@"}
    ]).
   28: 
%% Message sent to the test case process by the mock installed in
%% start_publish_listener/1, one per intercepted erlcloud_sns:publish call.
-record(publish, {
    topic_arn,   %% recipient ARN argument of the publish call
    message,     %% message argument, converted with maps:from_list/1
    subject,     %% subject argument, passed through unchanged
    attributes,  %% attributes argument, passed through unchanged
    config       %% AWS config argument, passed through unchanged
}).
   36: 
   37: %%--------------------------------------------------------------------
   38: %% Suite configuration
   39: %%--------------------------------------------------------------------
   40: 
   41: all() ->
   42:     [
   43:         {group, presence_status_publish},
   44:         {group, message_publish}
   45:     ].
   46: 
   47: groups() ->
   48:     G = [
   49:          {presence_status_publish, [],
   50:           [
   51:            connected_user_changes_status,
   52:            disconnected_user_becomes_unavailable
   53:           ]},
   54:          {message_publish, [],
   55:           [
   56:            pm_messages,
   57:            muc_messages
   58:           ]}
   59:         ],
   60:     ct_helper:repeat_all_until_all_ok(G).
   61: 
   62: suite() ->
   63:     escalus:suite().
   64: 
   65: %%--------------------------------------------------------------------
   66: %% Init & teardown
   67: %%--------------------------------------------------------------------
   68: 
   69: init_per_suite(Config) ->
   70:     case rpc(application, ensure_all_started, [erlcloud]) of
   71:         {ok, _} ->
   72:             %% For mocking with unnamed functions
   73:             mongoose_helper:inject_module(?MODULE),
   74: 
   75:             muc_helper:load_muc(),
   76:             escalus:init_per_suite(Config);
   77:         {error, _} ->
   78:             {skip, "erlcloud dependency is not enabled"}
   79:     end.
   80: 
   81: end_per_suite(Config) ->
   82:     escalus_fresh:clean(),
   83:     muc_helper:unload_muc(),
   84:     escalus:end_per_suite(Config).
   85: 
   86: init_per_group(_, Config0) ->
   87:     Domain = domain(),
   88:     Config1 = dynamic_modules:save_modules(Domain, Config0),
   89:     Config = [{sns_config, ?SNS_OPTS} | Config1],
   90:     dynamic_modules:ensure_modules(Domain, [{mod_aws_sns, ?SNS_OPTS}]),
   91:     Config.
   92: 
   93: end_per_group(_, Config) ->
   94:     dynamic_modules:restore_modules(Config),
   95:     escalus:delete_users(Config, escalus:get_users([bob, alice])).
   96: 
   97: init_per_testcase(muc_messages = C, Config0) ->
   98:     Config = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]),
   99:     start_publish_listener(Config),
  100:     [User | _] = ?config(escalus_users, Config),
  101:     Config2 = muc_helper:start_room(Config, User, <<"muc_publish">>, <<"user_nick">>,
  102:                                     [{persistent, true},
  103:                                      {anonymous, false}]),
  104:     escalus:init_per_testcase(C, Config2);
  105: init_per_testcase(CaseName, Config0) ->
  106:     Config = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]),
  107:     start_publish_listener(Config),
  108:     escalus:init_per_testcase(CaseName, Config).
  109: 
  110: end_per_testcase(muc_messages, Config) ->
  111:     muc_helper:destroy_room(Config),
  112:     rpc(meck, unload, []),
  113:     end_per_testcase(any, Config);
  114: end_per_testcase(CaseName, Config) ->
  115:     rpc(meck, unload, []),
  116:     escalus:end_per_testcase(CaseName, Config).
  117: 
  118: 
  119: %%--------------------------------------------------------------------
  120: %% GROUP presence_status_publish
  121: %%--------------------------------------------------------------------
  122: 
  123: connected_user_changes_status(Config) ->
  124:     escalus:story(
  125:         Config, [{bob, 1}, {alice, 1}],
  126:         fun(Bob, Alice) ->
  127:             Topic = make_topic_arn(Config, presence_updates_topic),
  128:             BobJID = nick_to_jid(bob, Config),
  129:             AliceJID = nick_to_jid(alice, Config),
  130: 
  131:             %% Available after login
  132:             escalus:wait_for_stanzas(Bob, 1),
  133:             escalus:wait_for_stanzas(Alice, 1),
  134:             ?assertReceivedMatch(#publish{
  135:                 topic_arn = Topic, message = #{user_id := BobJID, present := true}
  136:             }, timer:seconds(5)),
  137: 
  138:             ?assertReceivedMatch(#publish{
  139:                 topic_arn = Topic, message = #{user_id := AliceJID, present := true}
  140:             }, timer:seconds(5)),
  141: 
  142:             %% Unavailable after presence change
  143:             escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
  144:             escalus:send(Alice, escalus_stanza:presence(<<"unavailable">>)),
  145:             ?assertReceivedMatch(#publish{
  146:                 topic_arn = Topic, message = #{user_id := BobJID, present := false}
  147:             }, timer:seconds(5)),
  148:             ?assertReceivedMatch(#publish{
  149:                 topic_arn = Topic, message = #{user_id := AliceJID, present := false}
  150:             }, timer:seconds(5)),
  151: 
  152:             %% Available after presence change
  153:             escalus:send(Bob, escalus_stanza:presence(<<"available">>)),
  154:             ?assertReceivedMatch(#publish{
  155:                 topic_arn = Topic, message = #{user_id := BobJID, present := true}
  156:             }, timer:seconds(5)),
  157: 
  158:             ok
  159:         end).
  160: 
  161: disconnected_user_becomes_unavailable(Config) ->
  162:     escalus:story(
  163:         Config, [{bob, 1}, {alice, 1}],
  164:         fun(_Bob, _Alice) ->
  165:             %% Presences
  166:             ?assertReceivedMatch(#publish{}),
  167:             ?assertReceivedMatch(#publish{})
  168:         end),
  169: 
  170:     BobJID = nick_to_jid(bob, Config),
  171:     AliceJID = nick_to_jid(alice, Config),
  172:     ?assertReceivedMatch(#publish{
  173:         message = #{user_id := BobJID, present := false}
  174:     }, timer:seconds(5)),
  175:     ?assertReceivedMatch(#publish{
  176:         message = #{user_id := AliceJID, present := false}
  177:     }, timer:seconds(5)).
  178: 
  179: %%--------------------------------------------------------------------
  180: %% GROUP message_publish
  181: %%--------------------------------------------------------------------
  182: 
  183: pm_messages(Config) ->
  184:     escalus:story(
  185:         Config, [{bob, 1}, {alice, 1}],
  186:         fun(Bob, Alice) ->
  187:             Topic = make_topic_arn(Config, pm_messages_topic),
  188:             BobJID = nick_to_jid(bob, Config),
  189:             AliceJID = nick_to_jid(alice, Config),
  190: 
  191:             %% Presences
  192:             ?assertReceivedMatch(#publish{}),
  193:             ?assertReceivedMatch(#publish{}),
  194: 
  195:             escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),
  196:             ?assertReceivedMatch(#publish{
  197:                 topic_arn = Topic,
  198:                 message = #{from_user_id := AliceJID,
  199:                             to_user_id := BobJID,
  200:                             message := <<"OH, HAI!">>}
  201:             }, timer:seconds(5)),
  202: 
  203:             escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi there!">>)),
  204:             escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How are you?">>)),
  205:             ?assertReceivedMatch(#publish{
  206:                 topic_arn = Topic,
  207:                 message = #{from_user_id := BobJID,
  208:                             to_user_id := AliceJID,
  209:                             message := <<"Hi there!">>}
  210:             }, timer:seconds(5)),
  211:             ?assertReceivedMatch(#publish{
  212:                 topic_arn = Topic,
  213:                 message = #{from_user_id := BobJID,
  214:                             to_user_id := AliceJID,
  215:                             message := <<"How are you?">>}
  216:             }, timer:seconds(5)),
  217: 
  218:             ok
  219:         end).
  220: 
  221: muc_messages(Config) ->
  222:     escalus:story(
  223:         Config, [{bob, 1}, {alice, 1}],
  224:         fun(Bob, Alice) ->
  225:             Topic = make_topic_arn(Config, muc_messages_topic),
  226:             Room = ?config(room, Config),
  227:             RoomAddr = room_address(Room),
  228:             BobJID = nick_to_jid(bob, Config),
  229: 
  230:             %% Presences
  231:             ?assertReceivedMatch(#publish{}),
  232:             ?assertReceivedMatch(#publish{}),
  233: 
  234:             escalus:send(Alice, stanza_muc_enter_room(Room, nick(Alice))),
  235:             escalus:send(Bob, stanza_muc_enter_room(Room, nick(Bob))),
  236: 
  237:             escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, <<"Hi there!">>)),
  238: 
  239:             %% 2x presence, topic and message
  240:             escalus:wait_for_stanzas(Bob, 4),
  241:             escalus:wait_for_stanzas(Alice, 4),
  242: 
  243:             ?assertReceivedMatch(#publish{
  244:                 topic_arn = Topic,
  245:                 message = #{from_user_id := BobJID,
  246:                             to_user_id := RoomAddr,
  247:                             message := <<"Hi there!">>}
  248:             }, timer:seconds(5)),
  249: 
  250:             ok
  251:         end).
  252: 
  253: 
  254: %%--------------------------------------------------------------------
  255: %% Test helpers
  256: %%--------------------------------------------------------------------
  257: 
  258: %% @doc Forwards all erlcloud_sns:publish calls to local PID as messages
  259: start_publish_listener(Config) ->
  260:     TestCasePid = self(),
  261:     ok = rpc(meck, new, [erlcloud_sns, [no_link, passthrough]]),
  262:     ok = rpc(meck, expect,
  263:         [erlcloud_sns, publish,
  264:          fun(topic, RecipientArn, Message, Subject, Attributes, AWSConfig) ->
  265:              TestCasePid ! #publish{topic_arn = RecipientArn,
  266:                                     message = maps:from_list(Message),
  267:                                     subject = Subject,
  268:                                     attributes = Attributes,
  269:                                     config = AWSConfig},
  270:              uuid:uuid_to_string(uuid:get_v4())
  271:          end]),
  272:     Config.
  273: 
  274: -spec rpc(M :: atom(), F :: atom(), A :: [term()]) -> term().
  275: rpc(M, F, A) ->
  276:     Node = ct:get_config({hosts, mim, node}),
  277:     Cookie = escalus_ct:get_config(ejabberd_cookie),
  278:     escalus_rpc:call(Node, M, F, A, 10000, Cookie).
  279: 
  280: make_topic_arn(Config, TopicVar) ->
  281:     SNSConfig = proplists:get_value(sns_config, Config),
  282:     string:join(["arn", "aws", "sns",
  283:                 proplists:get_value(region, SNSConfig),
  284:                 proplists:get_value(account_id, SNSConfig),
  285:                 proplists:get_value(TopicVar, SNSConfig)], ":").
  286: 
  287: %% @doc Get a binary jid of the user, that tagged with `UserName' in the config.
  288: nick_to_jid(UserName, Config) when is_atom(UserName) ->
  289:     UserSpec = escalus_users:get_userspec(Config, UserName),
  290:     escalus_utils:jid_to_lower(escalus_users:get_jid(Config, UserSpec)).
  291: 
  292: stanza_muc_enter_room(Room, Nick) ->
  293:     stanza_to_room(
  294:         escalus_stanza:presence(<<"available">>,
  295:                                 [#xmlel{name = <<"x">>,
  296:                                         attrs=[{<<"xmlns">>, <<"http://jabber.org/protocol/muc">>}]}
  297:                                 ]),
  298:         Room, Nick).
  299: 
  300: stanza_default_muc_room(Room, Nick) ->
  301:     Form = escalus_stanza:x_data_form(<<"submit">>, []),
  302:     Query = escalus_stanza:query_el(?NS_MUC_OWNER, [Form]),
  303:     IQSet = escalus_stanza:iq(<<"set">>, [Query]),
  304:     stanza_to_room(IQSet, Room, Nick).
  305: 
  306: stanza_to_room(Stanza, Room) ->
  307:     escalus_stanza:to(Stanza, room_address(Room)).
  308: 
  309: stanza_to_room(Stanza, Room, Nick) ->
  310:     escalus_stanza:to(Stanza, room_address(Room, Nick)).
  311: 
  312: room_address(Room) ->
  313:     <<Room/binary, "@", (muc_host())/binary>>.
  314: 
  315: room_address(Room, Nick) ->
  316:     <<Room/binary, "@", (muc_host())/binary, "/", Nick/binary>>.
  317: 
  318: nick(User) -> escalus_utils:get_username(User).
  319: 
  320: muc_host() ->
  321:     muc_helper:muc_host().