1: %%==============================================================================
    2: %% Copyright 2018 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mongoose_rabbit_worker_SUITE).
   18: -compile([export_all, nowarn_export_all]).
   19: -author('kacper.mentel@erlang-solutions.com').
   20: 
   21: -include_lib("common_test/include/ct.hrl").
   22: -include_lib("eunit/include/eunit.hrl").
   23: -include("assert_received_match.hrl").
   24: 
%% Host passed to the worker under test as its `host' option.
-define(HOST, <<"localhost">>).
%% AMQP client modules that are replaced with meck mocks for the whole suite.
-define(AMQP_MOCK_MODULES, [amqp_connection, amqp_channel]).
%% Value passed to the worker under test as its `max_queue_len' option; the
%% queue-limit test cases size their message bursts relative to it.
-define(MAX_QUEUE_LEN, 1000).
   28: 
   29: %%--------------------------------------------------------------------
   30: %% Suite configuration
   31: %%--------------------------------------------------------------------
   32: 
   33: all() ->
   34:     [
   35:      no_request_in_worker_queue_is_lost_when_amqp_call_fails,
   36:      worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails,
   37:      worker_processes_msgs_when_queue_msg_len_limit_is_not_reached,
   38:      worker_drops_msgs_when_queue_msg_len_limit_is_reached
   39:     ].
   40: 
   41: %%--------------------------------------------------------------------
   42: %% Init & teardown
   43: %%--------------------------------------------------------------------
   44: 
   45: init_per_suite(Config) ->
   46:     mock_amqp(),
   47:     mock_metrics(),
   48:     Config.
   49: 
   50: end_per_suite(Config) ->
   51:     unload_amqp(),
   52:     meck:unload(mongoose_metrics),
   53:     Config.
   54: 
   55: init_per_testcase(_Case, Config) ->
   56:     WorkerOpts = [{host, ?HOST},
   57:                   {amqp_client_opts, []},
   58:                   {confirms, false},
   59:                   {max_queue_len, ?MAX_QUEUE_LEN}],
   60:     {ok, WorkerPid} = gen_server:start(mongoose_rabbit_worker, WorkerOpts, []),
   61:     Config ++ [{worker_pid, WorkerPid}].
   62: 
   63: end_per_testcase(_Case, Config) ->
   64:     exit(proplists:get_value(worker_pid, Config), kill),
   65:     Config.
   66: 
   67: %%--------------------------------------------------------------------
   68: %% Tests
   69: %%--------------------------------------------------------------------
   70: 
   71: no_request_in_worker_queue_is_lost_when_amqp_call_fails(Config) ->
   72:     %% given
   73:     Self = self(),
   74:     Worker = proplists:get_value(worker_pid, Config),
   75:     Ref = make_ref(),
   76:     Lock = lock_fun(),
   77:     SendBack = send_back_fun(),
   78:     Exception = exception_fun(),
   79: 
   80:     %% when
   81:     gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
   82:     gen_server:cast(Worker, {amqp_publish, {Exception, [ok]}, ok}),
   83:     gen_server:cast(Worker, {amqp_publish, {SendBack, [Self, Ref]}, ok}),
   84:     spawn(fun() ->
   85:                   gen_server:call(Worker, {amqp_call, {Exception, [ok]}})
   86:           end),
   87:     spawn(fun() ->
   88:                   gen_server:call(Worker, {amqp_call, {SendBack, [Self, Ref]}})
   89:           end),
   90: 
   91:     %% unlock the worker
   92:     Worker ! Ref,
   93: 
   94:     %% then
   95:     [?assertReceivedMatch(Ref) || _ <- lists:seq(1,2)].
   96: 
   97: worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails(Config) ->
   98:     %% given
   99:     Worker = proplists:get_value(worker_pid, Config),
  100:     Exception = exception_fun(),
  101:     ConnectionAndChannel0 = get_worker_conn_and_chann(Worker),
  102: 
  103:     %% when amqp_publish fails
  104:     gen_server:cast(Worker, {amqp_publish, {Exception, [ok]}, ok}),
  105:     %% then
  106:     ConnectionAndChannel1 = get_worker_conn_and_chann(Worker),
  107:     ?assertNotMatch(ConnectionAndChannel0, ConnectionAndChannel1),
  108: 
  109:     %% when amqp_call fails
  110:     gen_server:call(Worker, {amqp_call, {Exception, [ok]}}),
  111: 
  112:     %% then
  113:     ConnectionAndChannel2 = get_worker_conn_and_chann(Worker),
  114:     ?assertNotMatch(ConnectionAndChannel1, ConnectionAndChannel2),
  115: 
  116:     %% when connection alive but channel fails
  117:     meck:expect(amqp_connection, start, fun(_) -> {ok, random_long_running_pid()} end),
  118:     gen_server:call(Worker, {amqp_call, {Exception, [ok]}}),
  119:     gen_server:call(Worker, {amqp_call, {Exception, [ok]}}),
  120: 
  121:     %% then
  122:     {Connection3, Channel3} = ConnectionAndChannel3 = get_worker_conn_and_chann(Worker),
  123:     ?assert(is_pid(Connection3)),
  124:     ?assert(is_pid(Channel3)),
  125:     ?assertNotMatch(ConnectionAndChannel2, ConnectionAndChannel3).
  126: 
  127: 
  128: worker_processes_msgs_when_queue_msg_len_limit_is_not_reached(Config) ->
  129:     %% given
  130:     Worker = proplists:get_value(worker_pid, Config),
  131:     Ref = make_ref(),
  132:     Lock = lock_fun(),
  133:     SendBack = send_back_fun(),
  134: 
  135:     %% when
  136:     gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
  137:     gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}),
  138:     [gen_server:cast(Worker, {amqp_publish, ok, ok})
  139:      || _ <- lists:seq(1, ?MAX_QUEUE_LEN-1)],
  140: 
  141:     %% unlock the worker
  142:     Worker ! Ref,
  143: 
  144:     %% then
  145:     ?assertReceivedMatch(Ref, 100).
  146: 
  147: worker_drops_msgs_when_queue_msg_len_limit_is_reached(Config) ->
  148:     %% given
  149:     Worker = proplists:get_value(worker_pid, Config),
  150:     Ref = make_ref(),
  151:     Lock = lock_fun(),
  152:     SendBack = send_back_fun(),
  153: 
  154:     %% when
  155:     gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
  156:     gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}),
  157:     [gen_server:cast(Worker, {amqp_publish, ok, ok})
  158:      || _ <- lists:seq(1, ?MAX_QUEUE_LEN+1)],
  159: 
  160:     %% unlock the worker
  161:     Worker ! Ref,
  162: 
  163:     %% then
  164:     ?assertError({assertReceivedMatch_failed, _},
  165:                  ?assertReceivedMatch(Ref, 100)).
  166: 
  167: %%--------------------------------------------------------------------
  168: %% Helpers
  169: %%--------------------------------------------------------------------
  170: 
  171: mock_amqp() ->
  172:     [meck:new(M, [no_link, passthrough]) || M <- ?AMQP_MOCK_MODULES],
  173:     meck:expect(amqp_connection, start, fun(_) -> {ok, random_pid()} end),
  174:     meck:expect(amqp_connection, open_channel, fun(_) -> {ok, random_pid()} end),
  175:     meck:expect(amqp_connection, close, fun(_) -> ok end),
  176:     meck:expect(amqp_channel, close, fun(_) -> ok end),
  177:     meck:expect(amqp_channel, call,
  178:                 fun(_, {F, A}) when is_function(F) -> apply(F, A);
  179:                    (_, _) -> ok
  180:                 end),
  181:     meck:expect(amqp_channel, call,
  182:                 fun(_, {F, A}, _) when is_function(F) -> apply(F, A);
  183:                    (_, _, _) -> ok
  184:                 end).
  185: 
  186: mock_metrics() ->
  187:     meck:new(mongoose_metrics, [no_link, passthrough]),
  188:     meck:expect(mongoose_metrics, update, fun(_, _, _) -> ok end).
  189: 
  190: unload_amqp() ->
  191:     [meck:unload(M) || M <- ?AMQP_MOCK_MODULES].
  192: 
  193: random_pid() ->
  194:     spawn(fun() -> ok end).
  195: 
  196: random_long_running_pid() ->
  197:     spawn(fun() -> timer:sleep(5000) end).
  198: 
  199: get_worker_conn_and_chann(Worker) ->
  200:     State = sys:get_state(Worker),
  201:     {maps:get(connection, State), maps:get(channel, State)}.
  202: 
  203: %% Funs are wrapped into functions for tracing
  204: 
  205: lock_fun() ->
  206:     fun lock/1.
  207: 
  208: lock(R) ->
  209:     receive
  210:         R -> unlocked
  211:     end.
  212: 
  213: send_back_fun() ->
  214:     fun erlang:send/2.
  215: 
  216: exception_fun() ->
  217:     fun exception/1.
  218: 
  219: exception(_) ->
  220:     throw(exception).