1: %%==============================================================================
    2: %% Copyright 2018 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mongoose_rabbit_worker_SUITE).
   18: -compile([export_all, nowarn_export_all]).
   19: -author('kacper.mentel@erlang-solutions.com').
   20: 
   21: -include_lib("common_test/include/ct.hrl").
   22: -include_lib("eunit/include/eunit.hrl").
   23: -include("assert_received_match.hrl").
   24: 
%% Host value handed to the worker under the `host' start option.
-define(HOST, <<"localhost">>).
%% AMQP client modules that this suite replaces with meck mocks.
-define(AMQP_MOCK_MODULES, [amqp_connection, amqp_channel]).
%% Value handed to the worker under the `max_queue_len' start option; the
%% queue-limit test cases publish just below and just above this many requests.
-define(MAX_QUEUE_LEN, 1000).
   28: 
   29: %%--------------------------------------------------------------------
   30: %% Suite configuration
   31: %%--------------------------------------------------------------------
   32: 
   33: all() ->
   34:     [
   35:      no_request_in_worker_queue_is_lost_when_amqp_call_fails,
   36:      worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails,
   37:      worker_processes_msgs_when_queue_msg_len_limit_is_not_reached,
   38:      worker_drops_msgs_when_queue_msg_len_limit_is_reached
   39:     ].
   40: 
   41: %%--------------------------------------------------------------------
   42: %% Init & teardown
   43: %%--------------------------------------------------------------------
   44: 
   45: init_per_suite(Config) ->
   46:     mock_amqp(),
   47:     mock_metrics(),
   48:     Config.
   49: 
   50: end_per_suite(Config) ->
   51:     unload_amqp(),
   52:     meck:unload(mongoose_metrics),
   53:     Config.
   54: 
   55: init_per_testcase(_Case, Config) ->
   56:     WorkerOpts = [{host, ?HOST},
   57:                   {amqp_client_opts, []},
   58:                   {confirms, false},
   59:                   {max_queue_len, ?MAX_QUEUE_LEN}],
   60:     {ok, WorkerPid} = gen_server:start(mongoose_rabbit_worker, WorkerOpts, []),
   61:     Config ++ [{worker_pid, WorkerPid}].
   62: 
   63: end_per_testcase(_Case, Config) ->
   64:     exit(proplists:get_value(worker_pid, Config), kill),
   65:     Config.
   66: 
   67: %%--------------------------------------------------------------------
   68: %% Tests
   69: %%--------------------------------------------------------------------
   70: 
   71: no_request_in_worker_queue_is_lost_when_amqp_call_fails(Config) ->
   72:     %% given
   73:     Self = self(),
   74:     Worker = proplists:get_value(worker_pid, Config),
   75:     Ref = make_ref(),
   76:     Lock = lock_fun(),
   77:     SendBack = send_back_fun(),
   78:     Exception = exception_fun(),
   79: 
   80:     %% when
   81:     gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
   82:     gen_server:cast(Worker, {amqp_publish, {Exception, [ok]}, ok}),
   83:     gen_server:cast(Worker, {amqp_publish, {SendBack, [Self, Ref]}, ok}),
   84:     spawn(fun() ->
   85:                   gen_server:call(Worker, {amqp_call, {Exception, [ok]}})
   86:           end),
   87:     spawn(fun() ->
   88:                   gen_server:call(Worker, {amqp_call, {SendBack, [Self, Ref]}})
   89:           end),
   90: 
   91:     %% unlock the worker
   92:     Worker ! Ref,
   93: 
   94:     %% then
   95:     [?assertReceivedMatch(Ref) || _ <- lists:seq(1,2)].
   96: 
   97: worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails(Config) ->
   98:     %% given
   99:     Worker = proplists:get_value(worker_pid, Config),
  100:     Exception = exception_fun(),
  101:     ConnectionAndChannel0 = get_worker_conn_and_chann(Worker),
  102: 
  103:     %% when amqp_publish fails
  104:     gen_server:cast(Worker, {amqp_publish, {Exception, [ok]}, ok}),
  105:     %% then
  106:     ConnectionAndChannel1 = get_worker_conn_and_chann(Worker),
  107:     ?assertNotMatch(ConnectionAndChannel0, ConnectionAndChannel1),
  108: 
  109:     %% when amqp_call fails
  110:     gen_server:call(Worker, {amqp_call, {Exception, [ok]}}),
  111: 
  112:     %% then
  113:     ConnectionAndChannel2 = get_worker_conn_and_chann(Worker),
  114:     ?assertNotMatch(ConnectionAndChannel1, ConnectionAndChannel2).
  115: 
  116: 
  117: worker_processes_msgs_when_queue_msg_len_limit_is_not_reached(Config) ->
  118:     %% given
  119:     Worker = proplists:get_value(worker_pid, Config),
  120:     Ref = make_ref(),
  121:     Lock = lock_fun(),
  122:     SendBack = send_back_fun(),
  123: 
  124:     %% when
  125:     gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
  126:     gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}),
  127:     [gen_server:cast(Worker, {amqp_publish, ok, ok})
  128:      || _ <- lists:seq(1, ?MAX_QUEUE_LEN-1)],
  129: 
  130:     %% unlock the worker
  131:     Worker ! Ref,
  132: 
  133:     %% then
  134:     ?assertReceivedMatch(Ref, 100).
  135: 
  136: worker_drops_msgs_when_queue_msg_len_limit_is_reached(Config) ->
  137:     %% given
  138:     Worker = proplists:get_value(worker_pid, Config),
  139:     Ref = make_ref(),
  140:     Lock = lock_fun(),
  141:     SendBack = send_back_fun(),
  142: 
  143:     %% when
  144:     gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
  145:     gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}),
  146:     [gen_server:cast(Worker, {amqp_publish, ok, ok})
  147:      || _ <- lists:seq(1, ?MAX_QUEUE_LEN+1)],
  148: 
  149:     %% unlock the worker
  150:     Worker ! Ref,
  151: 
  152:     %% then
  153:     ?assertError({assertReceivedMatch_failed, _},
  154:                  ?assertReceivedMatch(Ref, 100)).
  155: 
  156: %%--------------------------------------------------------------------
  157: %% Helpers
  158: %%--------------------------------------------------------------------
  159: 
  160: mock_amqp() ->
  161:     [meck:new(M, [no_link, passthrough]) || M <- ?AMQP_MOCK_MODULES],
  162:     meck:expect(amqp_connection, start, fun(_) -> {ok, random_pid()} end),
  163:     meck:expect(amqp_connection, open_channel, fun(_) -> {ok, random_pid()} end),
  164:     meck:expect(amqp_connection, close, fun(_) -> ok end),
  165:     meck:expect(amqp_channel, close, fun(_) -> ok end),
  166:     meck:expect(amqp_channel, call,
  167:                 fun(_, {F, A}) when is_function(F) -> apply(F, A);
  168:                    (_, _) -> ok
  169:                 end),
  170:     meck:expect(amqp_channel, call,
  171:                 fun(_, {F, A}, _) when is_function(F) -> apply(F, A);
  172:                    (_, _, _) -> ok
  173:                 end).
  174: 
  175: mock_metrics() ->
  176:     meck:new(mongoose_metrics, [no_link, passthrough]),
  177:     meck:expect(mongoose_metrics, update, fun(_, _, _) -> ok end).
  178: 
  179: unload_amqp() ->
  180:     [meck:unload(M) || M <- ?AMQP_MOCK_MODULES].
  181: 
  182: random_pid() ->
  183:     spawn(fun() -> ok end).
  184: 
  185: get_worker_conn_and_chann(Worker) ->
  186:     State = sys:get_state(Worker),
  187:     {maps:get(connection, State), maps:get(channel, State)}.
  188: 
  189: %% Funs are wrapped into functions for tracing
  190: 
  191: lock_fun() ->
  192:     fun lock/1.
  193: 
  194: lock(R) ->
  195:     receive
  196:         R -> unlocked
  197:     end.
  198: 
  199: send_back_fun() ->
  200:     fun erlang:send/2.
  201: 
  202: exception_fun() ->
  203:     fun exception/1.
  204: 
  205: exception(_) ->
  206:     throw(exception).