1: %%============================================================================== 2: %% Copyright 2018 Erlang Solutions Ltd. 3: %% 4: %% Licensed under the Apache License, Version 2.0 (the "License"); 5: %% you may not use this file except in compliance with the License. 6: %% You may obtain a copy of the License at 7: %% 8: %% http://www.apache.org/licenses/LICENSE-2.0 9: %% 10: %% Unless required by applicable law or agreed to in writing, software 11: %% distributed under the License is distributed on an "AS IS" BASIS, 12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13: %% See the License for the specific language governing permissions and 14: %% limitations under the License. 15: %%============================================================================== 16: 17: -module(mongoose_rabbit_worker_SUITE). 18: -compile([export_all, nowarn_export_all]). 19: -author('kacper.mentel@erlang-solutions.com'). 20: 21: -include_lib("common_test/include/ct.hrl"). 22: -include_lib("eunit/include/eunit.hrl"). 23: -include("assert_received_match.hrl"). 24: 25: -define(HOST, <<"localhost">>). 26: -define(AMQP_MOCK_MODULES, [amqp_connection, amqp_channel]). 27: -define(MAX_QUEUE_LEN, 1000). 28: 29: %%-------------------------------------------------------------------- 30: %% Suite configuration 31: %%-------------------------------------------------------------------- 32: 33: all() -> 34: [ 35: no_request_in_worker_queue_is_lost_when_amqp_call_fails, 36: worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails, 37: worker_processes_msgs_when_queue_msg_len_limit_is_not_reached, 38: worker_drops_msgs_when_queue_msg_len_limit_is_reached 39: ]. 40: 41: %%-------------------------------------------------------------------- 42: %% Init & teardown 43: %%-------------------------------------------------------------------- 44: 45: init_per_suite(Config) -> 46: mock_amqp(), 47: mock_metrics(), 48: Config. 49: 50: end_per_suite(Config) -> 51: unload_amqp(), 52: meck:unload(mongoose_metrics), 53: Config. 54: 55: init_per_testcase(_Case, Config) -> 56: WorkerOpts = [{host, ?HOST}, 57: {amqp_client_opts, []}, 58: {confirms, false}, 59: {max_queue_len, ?MAX_QUEUE_LEN}], 60: {ok, WorkerPid} = gen_server:start(mongoose_rabbit_worker, WorkerOpts, []), 61: Config ++ [{worker_pid, WorkerPid}]. 62: 63: end_per_testcase(_Case, Config) -> 64: exit(proplists:get_value(worker_pid, Config), kill), 65: Config. 66: 67: %%-------------------------------------------------------------------- 68: %% Tests 69: %%-------------------------------------------------------------------- 70: 71: no_request_in_worker_queue_is_lost_when_amqp_call_fails(Config) -> 72: %% given 73: Self = self(), 74: Worker = proplists:get_value(worker_pid, Config), 75: Ref = make_ref(), 76: Lock = lock_fun(), 77: SendBack = send_back_fun(), 78: Exception = exception_fun(), 79: 80: %% when 81: gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}), 82: gen_server:cast(Worker, {amqp_publish, {Exception, [ok]}, ok}), 83: gen_server:cast(Worker, {amqp_publish, {SendBack, [Self, Ref]}, ok}), 84: spawn(fun() -> 85: gen_server:call(Worker, {amqp_call, {Exception, [ok]}}) 86: end), 87: spawn(fun() -> 88: gen_server:call(Worker, {amqp_call, {SendBack, [Self, Ref]}}) 89: end), 90: 91: %% unlock the worker 92: Worker ! Ref, 93: 94: %% then 95: [?assertReceivedMatch(Ref) || _ <- lists:seq(1,2)]. 96: 97: worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails(Config) -> 98: %% given 99: Worker = proplists:get_value(worker_pid, Config), 100: Exception = exception_fun(), 101: ConnectionAndChannel0 = get_worker_conn_and_chann(Worker), 102: 103: %% when amqp_publish fails 104: gen_server:cast(Worker, {amqp_publish, {Exception, [ok]}, ok}), 105: %% then 106: ConnectionAndChannel1 = get_worker_conn_and_chann(Worker), 107: ?assertNotMatch(ConnectionAndChannel0, ConnectionAndChannel1), 108: 109: %% when amqp_call fails 110: gen_server:call(Worker, {amqp_call, {Exception, [ok]}}), 111: 112: %% then 113: ConnectionAndChannel2 = get_worker_conn_and_chann(Worker), 114: ?assertNotMatch(ConnectionAndChannel1, ConnectionAndChannel2), 115: 116: %% when connection alive but channel fails 117: meck:expect(amqp_connection, start, fun(_) -> {ok, random_long_running_pid()} end), 118: gen_server:call(Worker, {amqp_call, {Exception, [ok]}}), 119: gen_server:call(Worker, {amqp_call, {Exception, [ok]}}), 120: 121: %% then 122: {Connection3, Channel3} = ConnectionAndChannel3 = get_worker_conn_and_chann(Worker), 123: ?assert(is_pid(Connection3)), 124: ?assert(is_pid(Channel3)), 125: ?assertNotMatch(ConnectionAndChannel2, ConnectionAndChannel3). 126: 127: 128: worker_processes_msgs_when_queue_msg_len_limit_is_not_reached(Config) -> 129: %% given 130: Worker = proplists:get_value(worker_pid, Config), 131: Ref = make_ref(), 132: Lock = lock_fun(), 133: SendBack = send_back_fun(), 134: 135: %% when 136: gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}), 137: gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}), 138: [gen_server:cast(Worker, {amqp_publish, ok, ok}) 139: || _ <- lists:seq(1, ?MAX_QUEUE_LEN-1)], 140: 141: %% unlock the worker 142: Worker ! Ref, 143: 144: %% then 145: ?assertReceivedMatch(Ref, 100). 146: 147: worker_drops_msgs_when_queue_msg_len_limit_is_reached(Config) -> 148: %% given 149: Worker = proplists:get_value(worker_pid, Config), 150: Ref = make_ref(), 151: Lock = lock_fun(), 152: SendBack = send_back_fun(), 153: 154: %% when 155: gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}), 156: gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}), 157: [gen_server:cast(Worker, {amqp_publish, ok, ok}) 158: || _ <- lists:seq(1, ?MAX_QUEUE_LEN+1)], 159: 160: %% unlock the worker 161: Worker ! Ref, 162: 163: %% then 164: ?assertError({assertReceivedMatch_failed, _}, 165: ?assertReceivedMatch(Ref, 100)). 166: 167: %%-------------------------------------------------------------------- 168: %% Helpers 169: %%-------------------------------------------------------------------- 170: 171: mock_amqp() -> 172: [meck:new(M, [no_link, passthrough]) || M <- ?AMQP_MOCK_MODULES], 173: meck:expect(amqp_connection, start, fun(_) -> {ok, random_pid()} end), 174: meck:expect(amqp_connection, open_channel, fun(_) -> {ok, random_pid()} end), 175: meck:expect(amqp_connection, close, fun(_) -> ok end), 176: meck:expect(amqp_channel, close, fun(_) -> ok end), 177: meck:expect(amqp_channel, call, 178: fun(_, {F, A}) when is_function(F) -> apply(F, A); 179: (_, _) -> ok 180: end), 181: meck:expect(amqp_channel, call, 182: fun(_, {F, A}, _) when is_function(F) -> apply(F, A); 183: (_, _, _) -> ok 184: end). 185: 186: mock_metrics() -> 187: meck:new(mongoose_metrics, [no_link, passthrough]), 188: meck:expect(mongoose_metrics, update, fun(_, _, _) -> ok end). 189: 190: unload_amqp() -> 191: [meck:unload(M) || M <- ?AMQP_MOCK_MODULES]. 192: 193: random_pid() -> 194: spawn(fun() -> ok end). 195: 196: random_long_running_pid() -> 197: spawn(fun() -> timer:sleep(5000) end). 198: 199: get_worker_conn_and_chann(Worker) -> 200: State = sys:get_state(Worker), 201: {maps:get(connection, State), maps:get(channel, State)}. 202: 203: %% Funs are wrapped into functions for tracing 204: 205: lock_fun() -> 206: fun lock/1. 207: 208: lock(R) -> 209: receive 210: R -> unlocked 211: end. 212: 213: send_back_fun() -> 214: fun erlang:send/2. 215: 216: exception_fun() -> 217: fun exception/1. 218: 219: exception(_) -> 220: throw(exception).