1: %%============================================================================== 2: %% Copyright 2018 Erlang Solutions Ltd. 3: %% 4: %% Licensed under the Apache License, Version 2.0 (the "License"); 5: %% you may not use this file except in compliance with the License. 6: %% You may obtain a copy of the License at 7: %% 8: %% http://www.apache.org/licenses/LICENSE-2.0 9: %% 10: %% Unless required by applicable law or agreed to in writing, software 11: %% distributed under the License is distributed on an "AS IS" BASIS, 12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13: %% See the License for the specific language governing permissions and 14: %% limitations under the License. 15: %%============================================================================== 16: 17: -module(mongoose_wpool_SUITE). 18: -compile([export_all, nowarn_export_all]). 19: -author('konrad.zemek@erlang-solutions.com'). 20: 21: -include_lib("common_test/include/ct.hrl"). 22: -include_lib("eunit/include/eunit.hrl"). 23: 24: %%-------------------------------------------------------------------- 25: %% Suite configuration 26: %%-------------------------------------------------------------------- 27: 28: all() -> 29: [ 30: get_pools_returns_pool_names, 31: stats_passes_through_to_wpool_stats, 32: a_global_riak_pool_is_started, 33: two_distinct_redis_pools_are_started, 34: generic_pools_are_started_for_all_vhosts, 35: host_specific_pools_are_preseved, 36: pools_for_different_tag_are_expanded_with_host_specific_config_preserved, 37: global_pool_is_used_by_default, 38: dead_pool_is_restarted, 39: dead_pool_is_stopped_before_restarted, 40: riak_pool_cant_be_started_with_available_worker_strategy, 41: redis_pool_cant_be_started_with_available_worker_strategy 42: ]. 43: 44: %%-------------------------------------------------------------------- 45: %% Init & teardown 46: %%-------------------------------------------------------------------- 47: 48: init_per_suite(Config) -> 49: ok = meck:new(wpool, [no_link, passthrough]), 50: ok = meck:new(mongoose_wpool, [no_link, passthrough]), 51: [mongoose_config:set_opt(Key, Value) || {Key, Value} <- opts()], 52: Self = self(), 53: spawn(fun() -> 54: register(test_helper, self()), 55: mongoose_wpool:ensure_started(), 56: Self ! ready, 57: receive stop -> ok end 58: end), 59: receive ready -> ok end, 60: Config. 61: 62: end_per_suite(Config) -> 63: meck:unload(wpool), 64: whereis(test_helper) ! stop, 65: [mongoose_config:unset_opt(Key) || {Key, _Value} <- opts()], 66: Config. 67: 68: opts() -> 69: [{hosts, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]}, 70: {host_types, []}]. 71: 72: init_per_testcase(_Case, Config) -> 73: cleanup_pools(), 74: Config. 75: 76: end_per_testcase(_Case, Config) -> 77: cleanup_pools(), 78: Config. 79: 80: %%-------------------------------------------------------------------- 81: %% Tests 82: %%-------------------------------------------------------------------- 83: 84: get_pools_returns_pool_names(_Config) -> 85: mongoose_wpool:start(generic, <<"b">>, c, [{workers, 1}]), 86: mongoose_wpool:start(generic, <<"e">>, f, [{workers, 1}]), 87: mongoose_wpool:start(generic, <<"h">>, i, [{workers, 1}]), 88: 89: ?assertEqual([{generic, <<"b">>, c}, {generic, <<"e">>, f}, {generic, <<"h">>, i}], 90: ordsets:from_list(mongoose_wpool:get_pools())). 91: 92: 93: stats_passes_through_to_wpool_stats(_Config) -> 94: mongoose_wpool:start(generic, global, z, [{workers, 1}]), 95: 96: Ref = make_ref(), 97: 98: meck:expect(wpool, stats, fun(_Name) -> Ref end), 99: ?assertEqual(Ref, mongoose_wpool:stats(generic, global, z)). 100: 101: a_global_riak_pool_is_started(_Config) -> 102: PoolName = mongoose_wpool:make_pool_name(riak, global, default), 103: meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)), 104: [{ok, PoolName}] = mongoose_wpool:start_configured_pools([{riak, global, default, 105: [{workers, 12}], 106: [{address, "localhost"}, 107: {port, 1805}]}]), 108: 109: MgrPid = whereis(mongoose_wpool_mgr:name(riak)), 110: [{PoolName, CallArgs}] = filter_calls_to_start_sup_pool(MgrPid), 111: ?assertEqual(12, proplists:get_value(workers, CallArgs)), 112: ?assertMatch({riakc_pb_socket, _}, proplists:get_value(worker, CallArgs)), 113: 114: ok. 115: 116: filter_calls_to_start_sup_pool(Pid) -> 117: H = meck_history:get_history(Pid, mongoose_wpool), 118: F = fun({_, {mongoose_wpool, start_sup_pool, [_, PN, Args]}, _}) -> {true, {PN, Args}}; 119: (_) -> false 120: end, 121: lists:filtermap(F, H). 122: 123: two_distinct_redis_pools_are_started(_C) -> 124: PoolName1 = mongoose_wpool:make_pool_name(redis, global, default), 125: PoolName2 = mongoose_wpool:make_pool_name(redis, global, global_dist), 126: meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock([PoolName1, PoolName2])), 127: Pools = [{redis, global, default, [{workers, 2}], 128: [{host, "localhost"}, 129: {port, 1805}]}, 130: {redis, global, global_dist, [{workers, 4}], 131: [{host, "localhost2"}, 132: {port, 1806}]}], 133: 134: [{ok, PoolName1}, {ok, PoolName2}] = mongoose_wpool:start_configured_pools(Pools), 135: 136: MgrPid = whereis(mongoose_wpool_mgr:name(redis)), 137: [{PoolName1, CallArgs1}, {PoolName2, CallArgs2}] = filter_calls_to_start_sup_pool(MgrPid), 138: ?assertEqual(2, proplists:get_value(workers, CallArgs1)), 139: ?assertEqual(4, proplists:get_value(workers, CallArgs2)), 140: ?assertMatch({eredis_client, ["localhost", 1805 | _]}, proplists:get_value(worker, CallArgs1)), 141: ?assertMatch({eredis_client, ["localhost2", 1806 | _]}, proplists:get_value(worker, CallArgs2)), 142: 143: ok. 144: 145: generic_pools_are_started_for_all_vhosts(_C) -> 146: Pools = [{generic, host, default, [], []}], 147: StartRes = mongoose_wpool:start_configured_pools(Pools), 148: ?assertMatch([_, _, _], StartRes), 149: ?assertMatch([{generic, <<"a.com">>, default}, 150: {generic, <<"b.com">>, default}, 151: {generic, <<"c.eu">>, default}], 152: ordsets:from_list(mongoose_wpool:get_pools())). 153: 154: host_specific_pools_are_preseved(_C) -> 155: Pools = [{generic, host, default, [], []}, 156: {generic, <<"b.com">>, default, [{workers, 12}], []}], 157: Expanded = mongoose_wpool:expand_pools(Pools, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]), 158: ?assertMatch([{generic,<<"a.com">>,default,[],[]}, 159: {generic,<<"c.eu">>,default,[],[]}, 160: {generic,<<"b.com">>,default,[{workers,12}],[]}], Expanded). 161: 162: pools_for_different_tag_are_expanded_with_host_specific_config_preserved(_C) -> 163: Pools = [{generic, host, default, [], []}, 164: {generic, <<"b.com">>, default, [{workers, 12}], []}, 165: {generic, host, other_tag, [], []}], 166: Expanded = mongoose_wpool:expand_pools(Pools, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]), 167: ?assertMatch([{generic,<<"a.com">>,default,[],[]}, 168: {generic,<<"c.eu">>,default,[],[]}, 169: {generic,<<"b.com">>,default,[{workers,12}],[]}, 170: {generic,<<"a.com">>,other_tag,[],[]}, 171: {generic,<<"b.com">>,other_tag,[],[]}, 172: {generic,<<"c.eu">>,other_tag,[],[]}], Expanded). 173: 174: global_pool_is_used_by_default(_C) -> 175: Pools = [{generic, global, default, [], []}, 176: {generic, <<"a.com">>, default, [], []}], 177: StartRes = mongoose_wpool:start_configured_pools(Pools), 178: ?assertMatch([_, _], StartRes), 179: meck:expect(wpool, call, fun(Name, _Req, _Strat, _Timeout) -> Name end), 180: ?assertEqual(mongoose_wpool:make_pool_name(generic, <<"a.com">>, default), 181: mongoose_wpool:call(generic, <<"a.com">>, default, request)), 182: 183: ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default), 184: mongoose_wpool:call(generic, <<"b.com">>, default, request)), 185: 186: ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default), 187: mongoose_wpool:call(generic, global, default, request)). 188: 189: dead_pool_is_restarted(_C) -> 190: Size = 3, 191: {PoolName, KillingSwitch} = start_killable_pool(Size, kill_and_restart), 192: %% set the switch to kill every new worker 193: set_killing_switch(KillingSwitch, true), 194: Pids = [whereis(wpool_pool:worker_name(PoolName, I)) || I <- lists:seq(1, Size)], 195: %% kill existing workers so they will be restarted 196: [erlang:exit(Pid, kill) || Pid <- Pids, Pid =/= undefined], 197: 198: wait_until_pool_is_dead(PoolName), 199: %% set the switch to stop killing workers 200: set_killing_switch(KillingSwitch, false), 201: 202: %% wait until the pool is restarted by the manager 203: Fun = fun() -> 204: case erlang:whereis(PoolName) of 205: undefined -> false; 206: _ -> true 207: end 208: end, 209: 210: async_helper:wait_until(Fun, true), 211: 212: meck:unload(killing_workers). 213: 214: dead_pool_is_stopped_before_restarted(_C) -> 215: Size = 3, 216: Tag = kill_stop_before_restart, 217: {PoolName, KillingSwitch} = start_killable_pool(Size, Tag), 218: %% set the switch to kill every new worker 219: set_killing_switch(KillingSwitch, true), 220: %% kill existing workers so they will be restarted 221: [erlang:exit(whereis(wpool_pool:worker_name(PoolName, I)), kill) || 222: I <- lists:seq(1, Size)], 223: 224: wait_until_pool_is_dead(PoolName), 225: %% stop the pool before it's restarted 226: mongoose_wpool:stop(generic, global, Tag), 227: %% set the switch to stop killing workers 228: set_killing_switch(KillingSwitch, false), 229: %% wait 4s (the restart attempt will happen after 2s) 230: %% and check if the pool is started, it should not be 231: timer:sleep(timer:seconds(4)), 232: ?assertEqual(undefined, erlang:whereis(PoolName)), 233: meck:unload(killing_workers). 234: 235: %% --- available_worker strategy is banned for some backends -- 236: 237: riak_pool_cant_be_started_with_available_worker_strategy(_Config) -> 238: pool_cant_be_started_with_available_worker_strategy(riak). 239: 240: redis_pool_cant_be_started_with_available_worker_strategy(_Config) -> 241: pool_cant_be_started_with_available_worker_strategy(redis). 242: 243: pool_cant_be_started_with_available_worker_strategy(Type) -> 244: Host = global, 245: Tag = default, 246: PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag), 247: meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)), 248: PoolDef = [{Type, Host, Tag, [{strategy, available_worker}], 249: [{address, "localhost"}, {port, 1805}]}], 250: ?assertError({strategy_not_supported, Type, Host, Tag, available_worker}, 251: mongoose_wpool:start_configured_pools(PoolDef)). 252: 253: %%-------------------------------------------------------------------- 254: %% Helpers 255: %%-------------------------------------------------------------------- 256: 257: start_sup_pool_mock(PoolNames) when is_list(PoolNames) -> 258: fun(Type, PN, Opts) -> 259: case lists:member(PN, PoolNames) of 260: true -> 261: {ok, PN}; %% we don't realy need a pid here for mocking 262: _ -> 263: meck:passthrough([Type, PN, Opts]) 264: end 265: end; 266: start_sup_pool_mock(PoolName) -> 267: start_sup_pool_mock([PoolName]). 268: 269: cleanup_pools() -> 270: lists:foreach(fun({Type, Host, Tag}) -> mongoose_wpool:stop(Type, Host, Tag) end, 271: mongoose_wpool:get_pools()). 272: 273: start_killable_pool(Size, Tag) -> 274: KillingSwitch = ets:new(killing_switch, [public]), 275: ets:insert(KillingSwitch, {kill_worker, false}), 276: meck:new(killing_workers, [non_strict]), 277: meck:expect(killing_workers, handle_worker_creation, kill_worker_fun(KillingSwitch)), 278: Pools = [{generic, global, Tag, 279: [{workers, Size}, 280: {enable_callbacks, true}, 281: {callbacks, [killing_workers]}], 282: []}], 283: [{ok, _Pid}] = mongoose_wpool:start_configured_pools(Pools), 284: PoolName = mongoose_wpool:make_pool_name(generic, global, Tag), 285: {PoolName, KillingSwitch}. 286: 287: kill_worker_fun(KillingSwitch) -> 288: fun(AWorkerName) -> 289: case ets:lookup_element(KillingSwitch, kill_worker, 2) of 290: true -> 291: ct:pal("I'll be killing ~p", [AWorkerName]), 292: erlang:exit(whereis(AWorkerName), kill); 293: _ -> 294: ok 295: end 296: end. 297: 298: wait_until_pool_is_dead(PoolName) -> 299: %% Wait until the pool is killed due to too many restarts 300: Pid = whereis(PoolName), 301: MRef = erlang:monitor(process, Pid), 302: receive 303: {'DOWN', MRef, process, Pid, _} -> 304: ok 305: after 306: timer:minutes(3) -> 307: ct:fail("pool not stopped") 308: end. 309: 310: set_killing_switch(KillingSwitch, Value) -> 311: ets:update_element(KillingSwitch, kill_worker, {2, Value}).