1: %%============================================================================== 2: %% Copyright 2018 Erlang Solutions Ltd. 3: %% 4: %% Licensed under the Apache License, Version 2.0 (the "License"); 5: %% you may not use this file except in compliance with the License. 6: %% You may obtain a copy of the License at 7: %% 8: %% http://www.apache.org/licenses/LICENSE-2.0 9: %% 10: %% Unless required by applicable law or agreed to in writing, software 11: %% distributed under the License is distributed on an "AS IS" BASIS, 12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13: %% See the License for the specific language governing permissions and 14: %% limitations under the License. 15: %%============================================================================== 16: 17: -module(mongoose_wpool_SUITE). 18: -compile([export_all, nowarn_export_all]). 19: -author('konrad.zemek@erlang-solutions.com'). 20: 21: -include_lib("common_test/include/ct.hrl"). 22: -include_lib("eunit/include/eunit.hrl"). 23: 24: %%-------------------------------------------------------------------- 25: %% Suite configuration 26: %%-------------------------------------------------------------------- 27: 28: all() -> 29: [ 30: get_pools_returns_pool_names, 31: stats_passes_through_to_wpool_stats, 32: a_global_riak_pool_is_started, 33: two_distinct_redis_pools_are_started, 34: generic_pools_are_started_for_all_vhosts, 35: host_specific_pools_are_preserved, 36: pools_for_different_tag_are_expanded_with_host_specific_config_preserved, 37: global_pool_is_used_by_default, 38: request_behaves_as_gen_server_send_request, 39: dead_pool_is_restarted, 40: dead_pool_is_stopped_before_restarted, 41: riak_pool_cant_be_started_with_available_worker_strategy, 42: redis_pool_cant_be_started_with_available_worker_strategy 43: ]. 44: 45: %%-------------------------------------------------------------------- 46: %% Init & teardown 47: %%-------------------------------------------------------------------- 48: 49: init_per_suite(Config) -> 50: ok = meck:new(wpool, [no_link, passthrough]), 51: ok = meck:new(mongoose_wpool, [no_link, passthrough]), 52: [mongoose_config:set_opt(Key, Value) || {Key, Value} <- opts()], 53: Self = self(), 54: spawn(fun() -> 55: register(test_helper, self()), 56: mongoose_wpool:ensure_started(), 57: Self ! ready, 58: receive stop -> ok end 59: end), 60: receive ready -> ok end, 61: Config. 62: 63: end_per_suite(Config) -> 64: meck:unload(wpool), 65: whereis(test_helper) ! stop, 66: [mongoose_config:unset_opt(Key) || {Key, _Value} <- opts()], 67: Config. 68: 69: opts() -> 70: [{hosts, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]}, 71: {host_types, []}]. 72: 73: init_per_testcase(_Case, Config) -> 74: cleanup_pools(), 75: Config. 76: 77: end_per_testcase(_Case, Config) -> 78: cleanup_pools(), 79: Config. 80: 81: %%-------------------------------------------------------------------- 82: %% Tests 83: %%-------------------------------------------------------------------- 84: 85: get_pools_returns_pool_names(_Config) -> 86: mongoose_wpool:start(generic, <<"b">>, c, [{workers, 1}]), 87: mongoose_wpool:start(generic, <<"e">>, f, [{workers, 1}]), 88: mongoose_wpool:start(generic, <<"h">>, i, [{workers, 1}]), 89: 90: ?assertEqual([{generic, <<"b">>, c}, {generic, <<"e">>, f}, {generic, <<"h">>, i}], 91: ordsets:from_list(mongoose_wpool:get_pools())). 92: 93: stats_passes_through_to_wpool_stats(_Config) -> 94: mongoose_wpool:start(generic, global, z, [{workers, 1}]), 95: Ref = make_ref(), 96: meck:expect(wpool, stats, fun(_Name) -> Ref end), 97: ?assertEqual(Ref, mongoose_wpool:stats(generic, global, z)). 98: 99: a_global_riak_pool_is_started(_Config) -> 100: PoolName = mongoose_wpool:make_pool_name(riak, global, default), 101: meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)), 102: Pool = #{type => riak, scope => global, tag => default, 103: opts => #{workers => 12}, 104: conn_opts => #{address => "localhost", port => 1805}}, 105: [{ok, PoolName}] = mongoose_wpool:start_configured_pools([Pool]), 106: 107: MgrPid = whereis(mongoose_wpool_mgr:name(riak)), 108: [{PoolName, CallArgs}] = filter_calls_to_start_sup_pool(MgrPid), 109: ?assertEqual(12, proplists:get_value(workers, CallArgs)), 110: ?assertMatch({riakc_pb_socket, _}, proplists:get_value(worker, CallArgs)). 111: 112: filter_calls_to_start_sup_pool(Pid) -> 113: H = meck_history:get_history(Pid, mongoose_wpool), 114: F = fun({_, {mongoose_wpool, start_sup_pool, [_, PN, Args]}, _}) -> {true, {PN, Args}}; 115: (_) -> false 116: end, 117: lists:filtermap(F, H). 118: 119: two_distinct_redis_pools_are_started(_C) -> 120: PoolName1 = mongoose_wpool:make_pool_name(redis, global, default), 121: PoolName2 = mongoose_wpool:make_pool_name(redis, global, global_dist), 122: meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock([PoolName1, PoolName2])), 123: Pools = [#{type => redis, scope => global, tag => default, opts => #{workers => 2}, 124: conn_opts => #{host => "localhost", 125: port => 1805, 126: database => 0, 127: password => ""}}, 128: #{type => redis, scope => global, tag => global_dist, opts => #{workers => 4}, 129: conn_opts => #{host => "localhost2", 130: port => 1806, 131: database => 0, 132: password => ""}}], 133: 134: [{ok, PoolName1}, {ok, PoolName2}] = mongoose_wpool:start_configured_pools(Pools), 135: 136: MgrPid = whereis(mongoose_wpool_mgr:name(redis)), 137: [{PoolName1, CallArgs1}, {PoolName2, CallArgs2}] = filter_calls_to_start_sup_pool(MgrPid), 138: ?assertEqual(2, proplists:get_value(workers, CallArgs1)), 139: ?assertEqual(4, proplists:get_value(workers, CallArgs2)), 140: ?assertMatch({eredis_client, ["localhost", 1805 | _]}, proplists:get_value(worker, CallArgs1)), 141: ?assertMatch({eredis_client, ["localhost2", 1806 | _]}, proplists:get_value(worker, CallArgs2)). 142: 143: generic_pools_are_started_for_all_vhosts(_C) -> 144: Pools = [#{type => generic, scope => host, tag => default, opts => #{}, conn_opts => #{}}], 145: StartRes = mongoose_wpool:start_configured_pools(Pools), 146: ?assertMatch([_, _, _], StartRes), 147: ?assertMatch([{generic, <<"a.com">>, default}, 148: {generic, <<"b.com">>, default}, 149: {generic, <<"c.eu">>, default}], 150: ordsets:from_list(mongoose_wpool:get_pools())). 151: 152: host_specific_pools_are_preserved(_C) -> 153: Pools = [#{type => generic, scope => host, tag => default, opts => #{}, conn_opts => #{}}, 154: #{type => generic, scope => <<"b.com">>, tag => default, 155: opts => #{workers => 12}, conn_opts => #{}}], 156: Expanded = mongoose_wpool:expand_pools(Pools, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]), 157: ?assertMatch([#{type := generic, host_type := <<"a.com">>, tag := default, 158: opts := [], conn_opts := #{}}, 159: #{type := generic, host_type := <<"c.eu">>, tag := default, 160: opts := [], conn_opts := #{}}, 161: #{type := generic, host_type := <<"b.com">>, tag := default, 162: opts := [{workers, 12}], conn_opts := #{}}], 163: Expanded). 164: 165: pools_for_different_tag_are_expanded_with_host_specific_config_preserved(_C) -> 166: Pools = [#{type => generic, scope => host, tag => default, opts => #{}, conn_opts => #{}}, 167: #{type => generic, scope => <<"b.com">>, tag => default, 168: opts => #{workers => 12}, conn_opts => #{}}, 169: #{type => generic, scope => host, tag => other_tag, opts => #{}, conn_opts => #{}}], 170: Expanded = mongoose_wpool:expand_pools(Pools, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]), 171: ?assertMatch([#{type := generic, host_type := <<"a.com">>, tag := default, 172: opts := [], conn_opts := #{}}, 173: #{type := generic, host_type := <<"c.eu">>, tag := default, 174: opts := [], conn_opts := #{}}, 175: #{type := generic, host_type := <<"b.com">>, tag := default, 176: opts := [{workers, 12}], conn_opts := #{}}, 177: #{type := generic, host_type := <<"a.com">>, tag := other_tag, 178: opts := [], conn_opts := #{}}, 179: #{type := generic, host_type := <<"b.com">>, tag := other_tag, 180: opts := [], conn_opts := #{}}, 181: #{type := generic, host_type := <<"c.eu">>, tag := other_tag, 182: opts := [], conn_opts := #{}}], 183: Expanded). 184: 185: global_pool_is_used_by_default(_C) -> 186: Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}}, 187: #{type => generic, scope => <<"a.com">>, tag => default, 188: opts => #{}, conn_opts => #{}}], 189: StartRes = mongoose_wpool:start_configured_pools(Pools), 190: ?assertMatch([{ok, _}, {ok, _}], StartRes), 191: meck:expect(wpool, call, fun(Name, _Req, _Strat, _Timeout) -> Name end), 192: ?assertEqual(mongoose_wpool:make_pool_name(generic, <<"a.com">>, default), 193: mongoose_wpool:call(generic, <<"a.com">>, default, request)), 194: 195: ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default), 196: mongoose_wpool:call(generic, <<"b.com">>, default, request)), 197: 198: ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default), 199: mongoose_wpool:call(generic, global, default, request)). 200: 201: request_behaves_as_gen_server_send_request(_C) -> 202: Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}} ], 203: StartRes = mongoose_wpool:start_configured_pools(Pools), 204: ?assertMatch([{ok, _}], StartRes), 205: Req1 = mongoose_wpool:send_request(generic, {?MODULE, echo, [send_request]}), 206: ?assertEqual({reply, {ok, send_request}}, gen_server:wait_response(Req1, 5000)). 207: 208: dead_pool_is_restarted(_C) -> 209: Size = 3, 210: {PoolName, KillingSwitch} = start_killable_pool(Size, kill_and_restart), 211: %% set the switch to kill every new worker 212: set_killing_switch(KillingSwitch, true), 213: Pids = [whereis(wpool_pool:worker_name(PoolName, I)) || I <- lists:seq(1, Size)], 214: %% kill existing workers so they will be restarted 215: [erlang:exit(Pid, kill) || Pid <- Pids, Pid =/= undefined], 216: 217: wait_until_pool_is_dead(PoolName), 218: %% set the switch to stop killing workers 219: set_killing_switch(KillingSwitch, false), 220: 221: %% wait until the pool is restarted by the manager 222: Fun = fun() -> 223: case erlang:whereis(PoolName) of 224: undefined -> false; 225: _ -> true 226: end 227: end, 228: 229: async_helper:wait_until(Fun, true), 230: 231: meck:unload(killing_workers). 232: 233: dead_pool_is_stopped_before_restarted(_C) -> 234: Size = 3, 235: Tag = kill_stop_before_restart, 236: {PoolName, KillingSwitch} = start_killable_pool(Size, Tag), 237: %% set the switch to kill every new worker 238: set_killing_switch(KillingSwitch, true), 239: %% kill existing workers so they will be restarted 240: [erlang:exit(whereis(wpool_pool:worker_name(PoolName, I)), kill) || 241: I <- lists:seq(1, Size)], 242: 243: wait_until_pool_is_dead(PoolName), 244: %% stop the pool before it's restarted 245: mongoose_wpool:stop(generic, global, Tag), 246: %% set the switch to stop killing workers 247: set_killing_switch(KillingSwitch, false), 248: %% wait 4s (the restart attempt will happen after 2s) 249: %% and check if the pool is started, it should not be 250: timer:sleep(timer:seconds(4)), 251: ?assertEqual(undefined, erlang:whereis(PoolName)), 252: meck:unload(killing_workers). 253: 254: %% --- available_worker strategy is banned for some backends -- 255: 256: riak_pool_cant_be_started_with_available_worker_strategy(_Config) -> 257: pool_cant_be_started_with_available_worker_strategy(riak). 258: 259: redis_pool_cant_be_started_with_available_worker_strategy(_Config) -> 260: pool_cant_be_started_with_available_worker_strategy(redis). 261: 262: pool_cant_be_started_with_available_worker_strategy(Type) -> 263: Host = global, 264: Tag = default, 265: PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag), 266: meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)), 267: PoolDef = [#{type => Type, scope => Host, tag => Tag, opts => #{strategy => available_worker}, 268: conn_opts => #{address => "localhost", port => 1805}}], 269: ?assertError({strategy_not_supported, Type, Host, Tag, available_worker}, 270: mongoose_wpool:start_configured_pools(PoolDef)). 271: 272: %%-------------------------------------------------------------------- 273: %% Helpers 274: %%-------------------------------------------------------------------- 275: 276: start_sup_pool_mock(PoolNames) when is_list(PoolNames) -> 277: fun(Type, PN, Opts) -> 278: case lists:member(PN, PoolNames) of 279: true -> 280: {ok, PN}; %% we don't really need a pid here for mocking 281: _ -> 282: meck:passthrough([Type, PN, Opts]) 283: end 284: end; 285: start_sup_pool_mock(PoolName) -> 286: start_sup_pool_mock([PoolName]). 287: 288: cleanup_pools() -> 289: lists:foreach(fun({Type, Host, Tag}) -> mongoose_wpool:stop(Type, Host, Tag) end, 290: mongoose_wpool:get_pools()). 291: 292: start_killable_pool(Size, Tag) -> 293: KillingSwitch = ets:new(killing_switch, [public]), 294: ets:insert(KillingSwitch, {kill_worker, false}), 295: meck:new(killing_workers, [non_strict]), 296: meck:expect(killing_workers, handle_worker_creation, kill_worker_fun(KillingSwitch)), 297: Pools = [#{type => generic, scope => global, tag => Tag, 298: opts => #{workers => Size, 299: enable_callbacks => true, 300: callbacks => [killing_workers]}, 301: conn_opts => #{}}], 302: [{ok, _Pid}] = mongoose_wpool:start_configured_pools(Pools), 303: PoolName = mongoose_wpool:make_pool_name(generic, global, Tag), 304: {PoolName, KillingSwitch}. 305: 306: kill_worker_fun(KillingSwitch) -> 307: fun(AWorkerName) -> 308: case ets:lookup_element(KillingSwitch, kill_worker, 2) of 309: true -> 310: ct:pal("I'll be killing ~p", [AWorkerName]), 311: erlang:exit(whereis(AWorkerName), kill); 312: _ -> 313: ok 314: end 315: end. 316: 317: wait_until_pool_is_dead(PoolName) -> 318: %% Wait until the pool is killed due to too many restarts 319: Pid = whereis(PoolName), 320: MRef = erlang:monitor(process, Pid), 321: receive 322: {'DOWN', MRef, process, Pid, _} -> 323: ok 324: after 325: timer:minutes(3) -> 326: ct:fail("pool not stopped") 327: end. 328: 329: set_killing_switch(KillingSwitch, Value) -> 330: ets:update_element(KillingSwitch, kill_worker, {2, Value}). 331: 332: echo(Val) -> Val.