1: -module(mod_event_pusher_sns_SUITE).
    2: -compile([export_all, nowarn_export_all]).
    3: -include_lib("escalus/include/escalus.hrl").
    4: -include_lib("common_test/include/ct.hrl").
    5: -include_lib("eunit/include/eunit.hrl").
    6: -include_lib("escalus/include/escalus_xmlns.hrl").
    7: -include_lib("exml/include/exml.hrl").
    8: 
    9: -include("assert_received_match.hrl").
   10: 
   11: -import(domain_helper, [domain/0]).
   12: -import(config_parser_helper, [config/2]).
   13: 
%% Namespace binary for HTTP upload (per the macro name).
%% NOTE(review): not referenced by any code visible in this suite - confirm before relying on it.
-define(NS_HTTP_UPLOAD, <<"urn:xmpp:http:upload">>).
%% NOTE(review): looks like a placeholder S3 endpoint; also not referenced in the visible code.
-define(S3_HOSTNAME, "http://bucket.s3-eu-east-25.example.com").
   16: 
%% Message sent to the test case process by the mocked erlcloud_sns:publish
%% (installed in start_publish_listener/1). Each field mirrors one argument of
%% the intercepted call.
-record(publish, {
    %% recipient ARN argument of the intercepted call
    topic_arn,
    %% message argument, converted with maps:from_list/1
    message,
    %% subject argument, forwarded unchanged
    subject,
    %% attributes argument, forwarded unchanged
    attributes,
    %% AWS config argument, forwarded unchanged
    config
}).
   24: 
   25: %%--------------------------------------------------------------------
   26: %% Suite configuration
   27: %%--------------------------------------------------------------------
   28: 
   29: all() ->
   30:     [
   31:         {group, presence_status_publish},
   32:         {group, message_publish}
   33:     ].
   34: 
   35: groups() ->
   36:     G = [
   37:          {presence_status_publish, [],
   38:           [
   39:            connected_user_changes_status,
   40:            disconnected_user_becomes_unavailable
   41:           ]},
   42:          {message_publish, [],
   43:           [
   44:            pm_messages,
   45:            muc_messages
   46:           ]}
   47:         ],
   48:     ct_helper:repeat_all_until_all_ok(G).
   49: 
   50: suite() ->
   51:     escalus:suite().
   52: 
   53: %%--------------------------------------------------------------------
   54: %% Init & teardown
   55: %%--------------------------------------------------------------------
   56: 
   57: init_per_suite(Config) ->
   58:     case rpc(application, ensure_all_started, [erlcloud]) of
   59:         {ok, _} ->
   60:             %% For mocking with unnamed functions
   61:             mongoose_helper:inject_module(?MODULE),
   62: 
   63:             muc_helper:load_muc(),
   64:             escalus:init_per_suite(Config);
   65:         {error, _} ->
   66:             {skip, "erlcloud dependency is not enabled"}
   67:     end.
   68: 
%% Suite teardown: cleans up the users created through escalus_fresh,
%% unloads MUC (loaded in init_per_suite/1) and finally lets escalus
%% finish the suite.
end_per_suite(Config) ->
    escalus_fresh:clean(),
    muc_helper:unload_muc(),
    escalus:end_per_suite(Config).
   73: 
   74: init_per_group(_, Config0) ->
   75:     Domain = domain(),
   76:     Config = dynamic_modules:save_modules(Domain, Config0),
   77:     dynamic_modules:ensure_modules(Domain, required_modules()),
   78:     Config.
   79: 
   80: required_modules() ->
   81:     [{mod_event_pusher, #{sns => config([modules, mod_event_pusher, sns], sns_opts())}}].
   82: 
   83: sns_opts() ->
   84:     #{presence_updates_topic => "user_presence_updated-dev-1",
   85:       pm_messages_topic => "user_message_sent-dev-1",
   86:       muc_messages_topic => "user_messagegroup_sent-dev-1",
   87:       sns_host => "sns.eu-west-1.amazonaws.com",
   88:       region => "eu-west-1",
   89:       access_key_id => "AKIAIH54ALYGMZTESTID",
   90:       secret_access_key => "buRqHOxXCFUQkiuYgdUAy+XoGKt0Ec6DTESTKEY+",
   91:       account_id => "123456789012"}.
   92: 
   93: end_per_group(_, Config) ->
   94:     dynamic_modules:restore_modules(Config),
   95:     escalus:delete_users(Config, escalus:get_users([bob, alice])).
   96: 
   97: init_per_testcase(muc_messages = C, Config0) ->
   98:     Config = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]),
   99:     start_publish_listener(Config),
  100:     [User | _] = ?config(escalus_users, Config),
  101:     Config2 = muc_helper:start_room(Config, User, <<"muc_publish">>, <<"user_nick">>,
  102:                                     [{persistent, true},
  103:                                      {anonymous, false}]),
  104:     escalus:init_per_testcase(C, Config2);
  105: init_per_testcase(CaseName, Config0) ->
  106:     Config = escalus_fresh:create_users(Config0, [{bob, 1}, {alice, 1}]),
  107:     start_publish_listener(Config),
  108:     escalus:init_per_testcase(CaseName, Config).
  109: 
  110: end_per_testcase(muc_messages, Config) ->
  111:     muc_helper:destroy_room(Config),
  112:     rpc(meck, unload, []),
  113:     end_per_testcase(any, Config);
  114: end_per_testcase(CaseName, Config) ->
  115:     rpc(meck, unload, []),
  116:     escalus:end_per_testcase(CaseName, Config).
  117: 
  118: 
  119: %%--------------------------------------------------------------------
  120: %% GROUP presence_status_publish
  121: %%--------------------------------------------------------------------
  122: 
  123: connected_user_changes_status(Config) ->
  124:     escalus:story(
  125:         Config, [{bob, 1}, {alice, 1}],
  126:         fun(Bob, Alice) ->
  127:             Topic = make_topic_arn(presence_updates_topic),
  128:             BobJID = nick_to_jid(bob, Config),
  129:             AliceJID = nick_to_jid(alice, Config),
  130: 
  131:             %% Available after login
  132:             escalus:wait_for_stanzas(Bob, 1),
  133:             escalus:wait_for_stanzas(Alice, 1),
  134:             ?assertReceivedMatch(#publish{
  135:                 topic_arn = Topic, message = #{user_id := BobJID, present := true}
  136:             }, timer:seconds(5)),
  137: 
  138:             ?assertReceivedMatch(#publish{
  139:                 topic_arn = Topic, message = #{user_id := AliceJID, present := true}
  140:             }, timer:seconds(5)),
  141: 
  142:             %% Unavailable after presence change
  143:             escalus:send(Bob, escalus_stanza:presence(<<"unavailable">>)),
  144:             escalus:send(Alice, escalus_stanza:presence(<<"unavailable">>)),
  145:             ?assertReceivedMatch(#publish{
  146:                 topic_arn = Topic, message = #{user_id := BobJID, present := false}
  147:             }, timer:seconds(5)),
  148:             ?assertReceivedMatch(#publish{
  149:                 topic_arn = Topic, message = #{user_id := AliceJID, present := false}
  150:             }, timer:seconds(5)),
  151: 
  152:             %% Available after presence change
  153:             escalus:send(Bob, escalus_stanza:presence(<<"available">>)),
  154:             ?assertReceivedMatch(#publish{
  155:                 topic_arn = Topic, message = #{user_id := BobJID, present := true}
  156:             }, timer:seconds(5)),
  157: 
  158:             ok
  159:         end).
  160: 
  161: disconnected_user_becomes_unavailable(Config) ->
  162:     escalus:story(
  163:         Config, [{bob, 1}, {alice, 1}],
  164:         fun(_Bob, _Alice) ->
  165:             %% Presences
  166:             ?assertReceivedMatch(#publish{}),
  167:             ?assertReceivedMatch(#publish{})
  168:         end),
  169: 
  170:     BobJID = nick_to_jid(bob, Config),
  171:     AliceJID = nick_to_jid(alice, Config),
  172:     ?assertReceivedMatch(#publish{
  173:         message = #{user_id := BobJID, present := false}
  174:     }, timer:seconds(5)),
  175:     ?assertReceivedMatch(#publish{
  176:         message = #{user_id := AliceJID, present := false}
  177:     }, timer:seconds(5)).
  178: 
  179: %%--------------------------------------------------------------------
  180: %% GROUP message_publish
  181: %%--------------------------------------------------------------------
  182: 
  183: pm_messages(Config) ->
  184:     escalus:story(
  185:         Config, [{bob, 1}, {alice, 1}],
  186:         fun(Bob, Alice) ->
  187:             Topic = make_topic_arn(pm_messages_topic),
  188:             BobJID = nick_to_jid(bob, Config),
  189:             AliceJID = nick_to_jid(alice, Config),
  190: 
  191:             %% Presences
  192:             ?assertReceivedMatch(#publish{}),
  193:             ?assertReceivedMatch(#publish{}),
  194: 
  195:             escalus:send(Alice, escalus_stanza:chat_to(Bob, <<"OH, HAI!">>)),
  196:             ?assertReceivedMatch(#publish{
  197:                 topic_arn = Topic,
  198:                 message = #{from_user_id := AliceJID,
  199:                             to_user_id := BobJID,
  200:                             message := <<"OH, HAI!">>}
  201:             }, timer:seconds(5)),
  202: 
  203:             escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"Hi there!">>)),
  204:             escalus:send(Bob, escalus_stanza:chat_to(Alice, <<"How are you?">>)),
  205:             ?assertReceivedMatch(#publish{
  206:                 topic_arn = Topic,
  207:                 message = #{from_user_id := BobJID,
  208:                             to_user_id := AliceJID,
  209:                             message := <<"Hi there!">>}
  210:             }, timer:seconds(5)),
  211:             ?assertReceivedMatch(#publish{
  212:                 topic_arn = Topic,
  213:                 message = #{from_user_id := BobJID,
  214:                             to_user_id := AliceJID,
  215:                             message := <<"How are you?">>}
  216:             }, timer:seconds(5)),
  217: 
  218:             ok
  219:         end).
  220: 
  221: muc_messages(Config) ->
  222:     escalus:story(
  223:         Config, [{bob, 1}, {alice, 1}],
  224:         fun(Bob, Alice) ->
  225:             Topic = make_topic_arn(muc_messages_topic),
  226:             Room = ?config(room, Config),
  227:             RoomAddr = room_address(Room),
  228:             BobJID = nick_to_jid(bob, Config),
  229: 
  230:             %% Presences
  231:             ?assertReceivedMatch(#publish{}),
  232:             ?assertReceivedMatch(#publish{}),
  233: 
  234:             escalus:send(Alice, stanza_muc_enter_room(Room, nick(Alice))),
  235:             escalus:send(Bob, stanza_muc_enter_room(Room, nick(Bob))),
  236: 
  237:             escalus:send(Bob, escalus_stanza:groupchat_to(RoomAddr, <<"Hi there!">>)),
  238: 
  239:             %% 2x presence, topic and message
  240:             escalus:wait_for_stanzas(Bob, 4),
  241:             escalus:wait_for_stanzas(Alice, 4),
  242: 
  243:             ?assertReceivedMatch(#publish{
  244:                 topic_arn = Topic,
  245:                 message = #{from_user_id := BobJID,
  246:                             to_user_id := RoomAddr,
  247:                             message := <<"Hi there!">>}
  248:             }, timer:seconds(5)),
  249: 
  250:             ok
  251:         end).
  252: 
  253: 
  254: %%--------------------------------------------------------------------
  255: %% Test helpers
  256: %%--------------------------------------------------------------------
  257: 
  258: %% @doc Forwards all erlcloud_sns:publish calls to local PID as messages
  259: start_publish_listener(Config) ->
  260:     TestCasePid = self(),
  261:     ok = rpc(meck, new, [erlcloud_sns, [no_link, passthrough]]),
  262:     ok = rpc(meck, expect,
  263:         [erlcloud_sns, publish,
  264:          fun(topic, RecipientArn, Message, Subject, Attributes, AWSConfig) ->
  265:              TestCasePid ! #publish{topic_arn = RecipientArn,
  266:                                     message = maps:from_list(Message),
  267:                                     subject = Subject,
  268:                                     attributes = Attributes,
  269:                                     config = AWSConfig},
  270:              uuid:uuid_to_string(uuid:get_v4())
  271:          end]),
  272:     Config.
  273: 
  274: -spec rpc(M :: atom(), F :: atom(), A :: [term()]) -> term().
  275: rpc(M, F, A) ->
  276:     Node = ct:get_config({hosts, mim, node}),
  277:     Cookie = escalus_ct:get_config(ejabberd_cookie),
  278:     escalus_rpc:call(Node, M, F, A, 10000, Cookie).
  279: 
  280: make_topic_arn(TopicVar) ->
  281:     #{region := Region, account_id := AccountId, TopicVar := Topic} = sns_opts(),
  282:     string:join(["arn", "aws", "sns", Region, AccountId, Topic], ":").
  283: 
  284: %% @doc Get a binary jid of the user, that tagged with `UserName' in the config.
  285: nick_to_jid(UserName, Config) when is_atom(UserName) ->
  286:     UserSpec = escalus_users:get_userspec(Config, UserName),
  287:     escalus_utils:jid_to_lower(escalus_users:get_jid(Config, UserSpec)).
  288: 
  289: stanza_muc_enter_room(Room, Nick) ->
  290:     stanza_to_room(
  291:         escalus_stanza:presence(<<"available">>,
  292:                                 [#xmlel{name = <<"x">>,
  293:                                         attrs=[{<<"xmlns">>, <<"http://jabber.org/protocol/muc">>}]}
  294:                                 ]),
  295:         Room, Nick).
  296: 
  297: stanza_default_muc_room(Room, Nick) ->
  298:     Form = escalus_stanza:x_data_form(<<"submit">>, []),
  299:     Query = escalus_stanza:query_el(?NS_MUC_OWNER, [Form]),
  300:     IQSet = escalus_stanza:iq(<<"set">>, [Query]),
  301:     stanza_to_room(IQSet, Room, Nick).
  302: 
  303: stanza_to_room(Stanza, Room) ->
  304:     escalus_stanza:to(Stanza, room_address(Room)).
  305: 
  306: stanza_to_room(Stanza, Room, Nick) ->
  307:     escalus_stanza:to(Stanza, room_address(Room, Nick)).
  308: 
  309: room_address(Room) ->
  310:     <<Room/binary, "@", (muc_host())/binary>>.
  311: 
  312: room_address(Room, Nick) ->
  313:     <<Room/binary, "@", (muc_host())/binary, "/", Nick/binary>>.
  314: 
  315: nick(User) -> escalus_utils:get_username(User).
  316: 
  317: muc_host() ->
  318:     muc_helper:muc_host().