1: -module(push_http_SUITE).
    2: -compile([export_all, nowarn_export_all]).
    3: 
    4: -include_lib("eunit/include/eunit.hrl").
    5: 
    6: -define(ETS_TABLE, push_http).
    7: 
    8: -import(push_helper, [http_notifications_port/0, http_notifications_host/0]).
    9: -import(domain_helper, [domain/0]).
   10: -import(config_parser_helper, [config/2, mod_event_pusher_http_handler/0]).
   11: 
   12: %%--------------------------------------------------------------------
   13: %% Suite configuration
   14: %%--------------------------------------------------------------------
   15: 
   16: all() ->
   17:     [
   18:         {group, single},
   19:         {group, customised},
   20:         {group, multiple}
   21:     ].
   22: 
   23: groups() ->
   24:     [{single, [sequence], [simple_push]},
   25:      {customised, [sequence], [custom_push]},
   26:      {multiple, [sequence], [push_to_many]}
   27:     ].
   28: 
   29: suite() ->
   30:     distributed_helper:require_rpc_nodes([mim]) ++ escalus:suite().
   31: 
   32: %%--------------------------------------------------------------------
   33: %% Init & teardown
   34: %%--------------------------------------------------------------------
   35: 
   36: init_per_suite(Config) ->
   37:     start_http_listener(),
   38:     start_pool(),
   39:     setup_modules(),
   40:     escalus:init_per_suite(Config).
   41: 
   42: end_per_suite(Config) ->
   43:     stop_pool(),
   44:     stop_http_listener(),
   45:     teardown_modules(),
   46:     escalus_fresh:clean(),
   47:     escalus:end_per_suite(Config).
   48: 
   49: init_per_group(GroupName, Config) ->
   50:     Config2 = dynamic_modules:save_modules(domain(), Config),
   51:     dynamic_modules:ensure_modules(domain(), required_modules(GroupName)),
   52:     escalus:create_users(Config2, escalus:get_users([alice, bob])).
   53: 
   54: end_per_group(_, Config) ->
   55:     dynamic_modules:restore_modules(Config),
   56:     escalus:delete_users(Config, escalus:get_users([alice, bob])).
   57: 
   58: init_per_testcase(CaseName, Config) ->
   59:     create_events_collection(),
   60:     escalus:init_per_testcase(CaseName, Config).
   61: 
   62: end_per_testcase(CaseName, Config) ->
   63:     clear_events_collection(),
   64:     escalus:end_per_testcase(CaseName, Config).
   65: 
   66: required_modules(GroupName) ->
   67:     [{mod_event_pusher, #{http => #{handlers => push_http_handler_opts(GroupName)}}}].
   68: 
   69: push_http_handler_opts(GroupName) ->
   70:     BasicOpts = mod_event_pusher_http_handler(),
   71:     [maps:merge(BasicOpts, ExtraOpts) || ExtraOpts <- push_http_handler_extra_opts(GroupName)].
   72: 
   73: push_http_handler_extra_opts(single) ->
   74:     [#{path => <<"push">>}];
   75: push_http_handler_extra_opts(customised) ->
   76:     [#{path => <<"push">>, callback_module => mod_event_pusher_http_custom}];
   77: push_http_handler_extra_opts(multiple) ->
   78:     [#{path => <<"push">>, callback_module => mod_event_pusher_http_custom},
   79:      #{path => <<"push2">>, callback_module => mod_event_pusher_http_custom_2}].
   80: 
   81: %%--------------------------------------------------------------------
   82: %% Tests
   83: %%--------------------------------------------------------------------
   84: 
   85: simple_push(Config) ->
   86:     escalus:fresh_story(
   87:         Config, [{alice, 1}, {bob, 1}],
   88:         fun(Alice, Bob) ->
   89:             send(Alice, Bob, <<"hej">>),
   90:             [R] = got_push(push, 1),
   91:             check_default_format(Alice, Bob, <<"hej">>, R),
   92:             send(Alice, Bob, <<>>),
   93:             got_no_push(push),
   94:             ok
   95:         end).
   96: 
   97: custom_push(Config) ->
   98:     escalus:fresh_story(
   99:         Config, [{alice, 1}, {bob, 1}],
  100:         fun(Alice, Bob) ->
  101:             send(Alice, Bob, <<"hej">>),
  102:             send(Alice, Bob, <<>>),
  103:             % now we receive them both ways, and with a custom body
  104:             Res = got_push(push, 4),
  105:             ?assertEqual([<<"in-">>,<<"in-hej">>,<<"out-">>,<<"out-hej">>], lists:sort(Res)),
  106:             ok
  107:         end).
  108: 
  109: push_to_many(Config) ->
  110:     escalus:fresh_story(
  111:         Config, [{alice, 1}, {bob, 1}],
  112:         fun(Alice, Bob) ->
  113:             send(Alice, Bob, <<"hej">>),
  114:             send(Alice, Bob, <<>>),
  115:             % now we receive them both ways, and with a custom body
  116:             Res = got_push(push, 4),
  117:             ?assertEqual([<<"in-">>,<<"in-hej">>,<<"out-">>,<<"out-hej">>], lists:sort(Res)),
  118:             % while the other backend sends only those 'out'
  119:             Res2 = got_push(push2, 2),
  120:             ?assertEqual([<<"2-out-">>,<<"2-out-hej">>], lists:sort(Res2)),
  121:             ok
  122:         end).
  123: 
  124: %%--------------------------------------------------------------------
  125: %% Libs
  126: %%--------------------------------------------------------------------
  127: 
  128: got_no_push(Type) ->
  129:     ?assertEqual(0, length(ets:lookup(?ETS_TABLE, {got_http_push, Type})), unwanted_push).
  130: 
  131: got_push(Type, Count)->
  132:     Key = {got_http_push, Type},
  133:     mongoose_helper:wait_until(
  134:       fun() -> length(ets:lookup(?ETS_TABLE, Key)) end,
  135:       Count, #{name => http_request_timeout}),
  136:     Bins = lists:map(fun({_, El}) -> El end, ets:lookup(?ETS_TABLE, Key)),
  137:     ?assertEqual(Count, length(Bins)), % Assert that this didn't magically grow in the meantime
  138:     ets:delete(?ETS_TABLE, Key),
  139:     Bins.
  140: 
  141: create_events_collection() ->
  142:     ets:new(?ETS_TABLE, [duplicate_bag, named_table, public]).
  143: 
  144: clear_events_collection() ->
  145:     ets:delete_all_objects(?ETS_TABLE).
  146: 
  147: start_http_listener() ->
  148:     http_helper:start(http_notifications_port(), '_', fun process_notification/1).
  149: 
  150: stop_http_listener() ->
  151:     http_helper:stop().
  152: 
  153: process_notification(Req) ->
  154:     <<$/, BType/binary>> = cowboy_req:path(Req),
  155:     Type = binary_to_atom(BType, utf8),
  156:     {ok, Body, Req1} = cowboy_req:read_body(Req),
  157:     Req2 = cowboy_req:reply(200, #{<<"content-type">> => <<"text/plain">>}, <<"OK">>, Req1),
  158:     Event = {{got_http_push, Type}, Body},
  159:     ets:insert(?ETS_TABLE, Event),
  160:     Req2.
  161: 
  162: check_default_format(From, To, Body, Msg) ->
  163:     Attrs = lists:map(fun(P) -> list_to_tuple(binary:split(P, <<$=>>)) end, binary:split(Msg, <<$&>>, [global])),
  164:     ?assertEqual(to_lower(escalus_client:username(From)), proplists:get_value(<<"author">>, Attrs)),
  165:     ?assertEqual(to_lower(escalus_client:username(To)), proplists:get_value(<<"receiver">>, Attrs)),
  166:     ?assertEqual(Body, proplists:get_value(<<"message">>, Attrs)),
  167:     ?assertEqual(<<"localhost">>, proplists:get_value(<<"server">>, Attrs)),
  168:     ok.
  169: 
  170: start_pool() ->
  171:     PoolOpts = #{strategy => random_worker, call_timeout => 5000, workers => 10},
  172:     ConnOpts = #{host => http_notifications_host(), request_timeout => 5000},
  173:     Pool = config([outgoing_pools, http, http_pool],
  174:                   #{scope => host_type, opts => PoolOpts, conn_opts => ConnOpts}),
  175:     [{ok, _Pid}] = rpc(mongoose_wpool, start_configured_pools, [[Pool], [<<"localhost">>]]).
  176: 
  177: stop_pool() ->
  178:     rpc(mongoose_wpool, stop, [http, <<"localhost">>, http_pool]).
  179: 
  180: setup_modules() ->
  181:     {Mod, Code} = rpc(dynamic_compile, from_string, [custom_module_code()]),
  182:     rpc(code, load_binary, [Mod, "mod_event_pusher_http_custom.erl", Code]),
  183:     {Mod2, Code2} = rpc(dynamic_compile, from_string, [custom_module_code_2()]),
  184:     rpc(code, load_binary, [Mod2, "mod_event_pusher_http_custom_2.erl", Code2]),
  185:     ok.
  186: 
  187: teardown_modules() ->
  188:     ok.
  189: 
  190: rpc(M, F, A) ->
  191:     distributed_helper:rpc(distributed_helper:mim(), M, F, A).
  192: 
  193: custom_module_code() ->
  194:     "-module(mod_event_pusher_http_custom).
  195:      -export([should_make_req/6, prepare_body/7, prepare_headers/7]).
  196:      should_make_req(Acc, _, _, _, _, _) ->
  197:          case mongoose_acc:stanza_name(Acc) of
  198:              <<\"message\">> -> true;
  199:              _ -> false
  200:          end.
  201:      prepare_headers(_, _, _, _, _, _, _) ->
  202:          mod_event_pusher_http_defaults:prepare_headers(x, x, x, x, x, x, x).
  203:      prepare_body(_Acc, Dir, _Host, Message, _Sender, _Receiver, _Opts) ->
  204:          <<(atom_to_binary(Dir, utf8))/binary, $-, Message/binary>>.
  205:      "
  206: .
  207: 
  208: custom_module_code_2() ->
  209:     "-module(mod_event_pusher_http_custom_2).
  210:      -export([should_make_req/6, prepare_body/7, prepare_headers/7]).
  211:      should_make_req(Acc, out, _, _, _, _) ->
  212:          case mongoose_acc:stanza_name(Acc) of
  213:              <<\"message\">> -> true;
  214:              _ -> false
  215:          end;
  216:      should_make_req(_, in, _, _, _, _) -> false.
  217:      prepare_headers(_, _, _, _, _, _, _) ->
  218:          mod_event_pusher_http_defaults:prepare_headers(x, x, x, x, x, x, x).
  219:      prepare_body(_Acc, Dir, _Host, Message, _Sender, _Receiver, _Opts) ->
  220:          <<$2, $-, (atom_to_binary(Dir, utf8))/binary, $-, Message/binary>>.
  221:      "
  222:     .
  223: 
  224: to_lower(B) ->
  225:     list_to_binary(
  226:         string:to_lower(
  227:             binary_to_list(
  228:                 B
  229:             )
  230:         )
  231:     ).
  232: 
  233: send(Alice, Bob, Body) ->
  234:     Stanza = escalus_stanza:chat_to(Bob, Body),
  235:     escalus_client:send(Alice, Stanza).