%%==============================================================================
%% Copyright 2018 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

-module(mongoose_rabbit_worker_SUITE).
-compile([export_all, nowarn_export_all]).
-author('kacper.mentel@erlang-solutions.com').

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include("assert_received_match.hrl").

-define(HOST, <<"localhost">>).
-define(AMQP_MOCK_MODULES, [amqp_connection, amqp_channel]).
-define(MAX_QUEUE_LEN, 1000).

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Names of the test cases that make up this suite.
all() ->
    [no_request_in_worker_queue_is_lost_when_amqp_call_fails,
     worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails,
     worker_processes_msgs_when_queue_msg_len_limit_is_not_reached,
     worker_drops_msgs_when_queue_msg_len_limit_is_reached].

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Installs the AMQP client mocks and the metrics mock once for the whole
%% suite; the Common Test configuration is handed back untouched.
init_per_suite(Config) ->
    mock_amqp(),
    mock_metrics(),
    Config.
%% Unloads every mock installed by init_per_suite/1 and returns Config as is.
end_per_suite(Config) ->
    unload_amqp(),
    meck:unload(mongoose_metrics),
    Config.

%% Starts a fresh, unlinked mongoose_rabbit_worker for each test case and
%% publishes its pid in the returned configuration under the worker_pid key.
init_per_testcase(_Case, Config) ->
    WorkerOpts = [{host, ?HOST},
                  {amqp_client_opts, []},
                  {confirms, false},
                  {max_queue_len, ?MAX_QUEUE_LEN}],
    {ok, WorkerPid} = gen_server:start(mongoose_rabbit_worker, WorkerOpts, []),
    %% Prepend instead of append: proplists:get_value/2 returns the first
    %% matching entry, so the pid started above always wins over any
    %% worker_pid already present in Config, and no list copy is needed.
    [{worker_pid, WorkerPid} | Config].

%% Kills the worker started for the test case that has just finished.
end_per_testcase(_Case, Config) ->
    exit(proplists:get_value(worker_pid, Config), kill),
    Config.

%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------

%% Queues a failing request in front of a "send Ref back" request, once via
%% cast (amqp_publish) and once via call (amqp_call), while the worker is
%% blocked, then unblocks it and expects Ref to come back twice, i.e. the
%% requests queued behind the failing ones were still served.
no_request_in_worker_queue_is_lost_when_amqp_call_fails(Config) ->
    %% given
    Self = self(),
    Worker = proplists:get_value(worker_pid, Config),
    Ref = make_ref(),
    Lock = lock_fun(),
    SendBack = send_back_fun(),
    Exception = exception_fun(),

    %% when
    %% The first request keeps the worker busy until it receives Ref, so
    %% everything sent below piles up in its message queue.
    gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
    gen_server:cast(Worker, {amqp_publish, {Exception, [ok]}, ok}),
    gen_server:cast(Worker, {amqp_publish, {SendBack, [Self, Ref]}, ok}),
    %% gen_server:call/2 blocks its caller, so the synchronous requests are
    %% issued from helper processes to keep the test process free.
    spawn(fun() ->
                  gen_server:call(Worker, {amqp_call, {Exception, [ok]}})
          end),
    spawn(fun() ->
                  gen_server:call(Worker, {amqp_call, {SendBack, [Self, Ref]}})
          end),

    %% unlock the worker
    Worker ! Ref,

    %% then
    %% One Ref is expected from the cast path and one from the call path.
    [?assertReceivedMatch(Ref) || _ <- lists:seq(1,2)].
%% Checks that the {Connection, Channel} pair held in the worker state
%% changes after a failing amqp_publish and again after a failing amqp_call.
%% The pairs are compared as whole values, so the assertion holds as soon as
%% at least one element of the pair differs.
worker_creates_fresh_amqp_conection_and_channel_when_amqp_call_fails(Config) ->
    %% given
    Worker = proplists:get_value(worker_pid, Config),
    Exception = exception_fun(),
    ConnectionAndChannel0 = get_worker_conn_and_chann(Worker),

    %% when amqp_publish fails
    gen_server:cast(Worker, {amqp_publish, {Exception, [ok]}, ok}),

    %% then
    ConnectionAndChannel1 = get_worker_conn_and_chann(Worker),
    %% Value comparison: ?assertNotEqual reports both terms on failure,
    %% whereas ?assertNotMatch expects a pattern as its first argument.
    ?assertNotEqual(ConnectionAndChannel0, ConnectionAndChannel1),

    %% when amqp_call fails
    gen_server:call(Worker, {amqp_call, {Exception, [ok]}}),

    %% then
    ConnectionAndChannel2 = get_worker_conn_and_chann(Worker),
    ?assertNotEqual(ConnectionAndChannel1, ConnectionAndChannel2).

%% Blocks the worker, queues a "send Ref back" request followed by
%% ?MAX_QUEUE_LEN - 1 filler requests, unblocks the worker and expects Ref to
%% arrive within the timeout passed to ?assertReceivedMatch.
worker_processes_msgs_when_queue_msg_len_limit_is_not_reached(Config) ->
    %% given
    Worker = proplists:get_value(worker_pid, Config),
    Ref = make_ref(),
    Lock = lock_fun(),
    SendBack = send_back_fun(),

    %% when
    gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
    gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}),
    %% Filler requests; only the resulting queue length matters here.
    lists:foreach(fun(_) ->
                          gen_server:cast(Worker, {amqp_publish, ok, ok})
                  end,
                  lists:seq(1, ?MAX_QUEUE_LEN - 1)),

    %% unlock the worker
    Worker ! Ref,

    %% then
    ?assertReceivedMatch(Ref, 100).

%% Same scenario as above but with ?MAX_QUEUE_LEN + 1 filler requests; the
%% "send Ref back" request is then expected NOT to be served, so waiting for
%% Ref must fail with assertReceivedMatch_failed.
worker_drops_msgs_when_queue_msg_len_limit_is_reached(Config) ->
    %% given
    Worker = proplists:get_value(worker_pid, Config),
    Ref = make_ref(),
    Lock = lock_fun(),
    SendBack = send_back_fun(),

    %% when
    gen_server:cast(Worker, {amqp_publish, {Lock, [Ref]}, ok}),
    gen_server:cast(Worker, {amqp_publish, {SendBack, [self(), Ref]}, ok}),
    %% Filler requests; only the resulting queue length matters here.
    lists:foreach(fun(_) ->
                          gen_server:cast(Worker, {amqp_publish, ok, ok})
                  end,
                  lists:seq(1, ?MAX_QUEUE_LEN + 1)),

    %% unlock the worker
    Worker ! Ref,

    %% then
    ?assertError({assertReceivedMatch_failed, _},
                 ?assertReceivedMatch(Ref, 100)).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

%% Replaces the AMQP client entry points used by the suite with stubs:
%% starting a connection / opening a channel yields a throw-away pid, closing
%% is a no-op, and amqp_channel:call/2,3 runs a {Fun, Args} request in place.
mock_amqp() ->
    lists:foreach(fun(Module) -> meck:new(Module, [no_link, passthrough]) end,
                  ?AMQP_MOCK_MODULES),
    meck:expect(amqp_connection, start, fun(_) -> {ok, random_pid()} end),
    meck:expect(amqp_connection, open_channel, fun(_) -> {ok, random_pid()} end),
    meck:expect(amqp_connection, close, fun(_) -> ok end),
    meck:expect(amqp_channel, close, fun(_) -> ok end),
    meck:expect(amqp_channel, call,
                fun(_, Request) -> run_mocked_call(Request) end),
    meck:expect(amqp_channel, call,
                fun(_, Request, _) -> run_mocked_call(Request) end).

%% Shared body of the mocked amqp_channel:call/2,3: a {Fun, Args} request is
%% applied, any other request is answered with ok.
run_mocked_call({F, A}) when is_function(F) ->
    apply(F, A);
run_mocked_call(_) ->
    ok.

%% Stubs mongoose_metrics:update/3 so that it always returns ok.
mock_metrics() ->
    meck:new(mongoose_metrics, [no_link, passthrough]),
    meck:expect(mongoose_metrics, update, fun(_, _, _) -> ok end).

%% Unloads the AMQP mocks; returns the list of meck:unload/1 results.
unload_amqp() ->
    lists:map(fun meck:unload/1, ?AMQP_MOCK_MODULES).

%% Pid of a process that returns immediately; used only as a unique value.
random_pid() ->
    erlang:spawn(fun() -> ok end).

%% Reads the worker state with sys:get_state/1 and returns its connection and
%% channel entries as a pair.
get_worker_conn_and_chann(Worker) ->
    WorkerState = sys:get_state(Worker),
    Connection = maps:get(connection, WorkerState),
    Channel = maps:get(channel, WorkerState),
    {Connection, Channel}.

%% Funs are wrapped into functions for tracing

%% Fun that blocks its caller until the given term arrives as a message.
lock_fun() ->
    fun lock/1.

%% Waits (without timeout) for a message equal to UnlockMsg.
lock(UnlockMsg) ->
    receive
        UnlockMsg -> unlocked
    end.

%% Fun that sends its second argument to its first (erlang:send/2).
send_back_fun() ->
    fun erlang:send/2.

%% Fun that always fails by throwing.
exception_fun() ->
    fun exception/1.

%% Throws the atom exception regardless of the argument.
exception(_) ->
    throw(exception).