%%==============================================================================
%% Copyright 2018 Erlang Solutions Ltd.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%==============================================================================

-module(mongoose_wpool_SUITE).
-compile([export_all, nowarn_export_all]).
-author('konrad.zemek@erlang-solutions.com').

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

%%--------------------------------------------------------------------
%% Suite configuration
%%--------------------------------------------------------------------

%% Common Test callback: returns the names of the test cases defined in
%% this suite. The order of the list is kept exactly as it was.
all() ->
    [
     get_pools_returns_pool_names,
     stats_passes_through_to_wpool_stats,
     two_distinct_redis_pools_are_started,
     generic_pools_are_started_for_all_vhosts,
     host_specific_pools_are_preserved,
     pools_for_different_tag_are_expanded_with_host_specific_config_preserved,
     global_pool_is_used_by_default,
     request_behaves_as_gen_server_send_request,
     dead_pool_is_restarted,
     dead_pool_is_stopped_before_restarted,
     redis_pool_cant_be_started_with_available_worker_strategy,
     cassandra_prepare_opts
    ].

%%--------------------------------------------------------------------
%% Init & teardown
%%--------------------------------------------------------------------

%% Installs passthrough mocks for wpool and mongoose_wpool, sets the suite
%% options and starts mongoose_wpool from a helper process registered as
%% test_helper. The helper stays alive until it receives `stop'.
init_per_suite(Config) ->
    ok = meck:new(wpool, [no_link, passthrough]),
    ok = meck:new(mongoose_wpool, [no_link, passthrough]),
    mongoose_config:set_opts(opts()),
    Self = self(),
    %% The helper is spawned without a link on purpose: its lifetime must not
    %% be tied to the process executing init_per_suite/1.
    spawn(fun() ->
                  register(test_helper, self()),
                  mongoose_wpool:ensure_started(),
                  Self ! ready,
                  receive stop -> ok end
          end),
    %% Because the helper is not linked, a crash in it would otherwise go
    %% unnoticed and this receive would block; fail explicitly instead.
    receive
        ready -> ok
    after timer:seconds(30) ->
        ct:fail(test_helper_not_ready)
    end,
    Config.

%% Reverts everything init_per_suite/1 set up: unloads both mocks (the
%% mongoose_wpool one included), stops the helper and erases the options.
end_per_suite(Config) ->
    meck:unload(wpool),
    meck:unload(mongoose_wpool),
    whereis(test_helper) ! stop,
    mongoose_config:erase_opts(),
    Config.

%% Options installed via mongoose_config:set_opts/1 for the whole suite.
opts() ->
    #{hosts => [<<"a.com">>, <<"b.com">>, <<"c.eu">>], host_types => []}.

%% Every test case starts with no pools left over from a previous one.
init_per_testcase(_Case, Config) ->
    cleanup_pools(),
    Config.

%% Stops whatever pools the test case left running.
end_per_testcase(_Case, Config) ->
    cleanup_pools(),
    Config.

%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------

%% Starts three generic pools and expects get_pools/0 to list exactly them.
get_pools_returns_pool_names(_Config) ->
    mongoose_wpool:start(generic, <<"b">>, c, [{workers, 1}]),
    mongoose_wpool:start(generic, <<"e">>, f, [{workers, 1}]),
    mongoose_wpool:start(generic, <<"h">>, i, [{workers, 1}]),

    ?assertEqual([{generic, <<"b">>, c}, {generic, <<"e">>, f}, {generic, <<"h">>, i}],
                 ordsets:from_list(mongoose_wpool:get_pools())).

%% Mocks wpool:stats/1 to return a unique reference and expects
%% mongoose_wpool:stats/3 to hand that very value back.
stats_passes_through_to_wpool_stats(_Config) ->
    mongoose_wpool:start(generic, global, z, [{workers, 1}]),
    Ref = make_ref(),
    meck:expect(wpool, stats, fun(_Name) -> Ref end),
    ?assertEqual(Ref, mongoose_wpool:stats(generic, global, z)).

%% Returns {PoolName, Args} for every mongoose_wpool:start_sup_pool/3 call
%% found in the history that meck_history:get_history/2 reports for Pid.
%% Entries of any other shape are skipped.
filter_calls_to_start_sup_pool(Pid) ->
    History = meck_history:get_history(Pid, mongoose_wpool),
    [{PoolName, Args}
     || {_, {mongoose_wpool, start_sup_pool, [_, PoolName, Args]}, _} <- History].

%% Starts two global redis pools with different tags and checks that each
%% start_sup_pool/3 call carried its own worker count and connection data.
two_distinct_redis_pools_are_started(_C) ->
    DefaultName = mongoose_wpool:make_pool_name(redis, global, default),
    DistName = mongoose_wpool:make_pool_name(redis, global, global_dist),
    meck:expect(mongoose_wpool, start_sup_pool,
                start_sup_pool_mock([DefaultName, DistName])),
    PoolDefs = [redis_pool_def(default, 2, "localhost", 1805),
                redis_pool_def(global_dist, 4, "localhost2", 1806)],

    [{ok, DefaultName}, {ok, DistName}] = mongoose_wpool:start_configured_pools(PoolDefs),

    Manager = whereis(mongoose_wpool_mgr:name(redis)),
    [{DefaultName, DefaultArgs}, {DistName, DistArgs}] =
        filter_calls_to_start_sup_pool(Manager),
    ?assertEqual(2, proplists:get_value(workers, DefaultArgs)),
    ?assertEqual(4, proplists:get_value(workers, DistArgs)),
    {eredis_client, DefaultProps} = proplists:get_value(worker, DefaultArgs),
    {eredis_client, DistProps} = proplists:get_value(worker, DistArgs),
    ?assertMatch(#{host := "localhost", port := 1805}, proplists:to_map(DefaultProps)),
    ?assertMatch(#{host := "localhost2", port := 1806}, proplists:to_map(DistProps)).

%% Builds the definition of a global redis pool with the given tag, worker
%% count, host and port; database and password are the same for both pools.
redis_pool_def(Tag, Workers, Host, Port) ->
    #{type => redis, scope => global, tag => Tag, opts => #{workers => Workers},
      conn_opts => #{host => Host,
                     port => Port,
                     database => 0,
                     password => ""}}.

%% A single pool definition with scope `host' must result in one started
%% pool per host listed in opts/0.
generic_pools_are_started_for_all_vhosts(_C) ->
    HostScoped = #{type => generic, scope => host, tag => default,
                   opts => #{}, conn_opts => #{}},
    Started = mongoose_wpool:start_configured_pools([HostScoped]),
    ?assertMatch([_, _, _], Started),
    ?assertMatch([{generic, <<"a.com">>, default},
                  {generic, <<"b.com">>, default},
                  {generic, <<"c.eu">>, default}],
                 ordsets:from_list(mongoose_wpool:get_pools())).

%% When a host-scoped definition and a definition for one specific host
%% share a tag, expansion must keep the specific one for that host.
host_specific_pools_are_preserved(_C) ->
    HostScoped = #{type => generic, scope => host, tag => default,
                   opts => #{}, conn_opts => #{}},
    ForBCom = HostScoped#{scope := <<"b.com">>, opts := #{workers => 12}},
    Hosts = [<<"a.com">>, <<"b.com">>, <<"c.eu">>],
    Expanded = mongoose_wpool:expand_pools([HostScoped, ForBCom], Hosts),
    ?assertMatch([#{type := generic, host_type := <<"a.com">>, tag := default,
                    opts := [], conn_opts := #{}},
                  #{type := generic, host_type := <<"c.eu">>, tag := default,
                    opts := [], conn_opts := #{}},
                  #{type := generic, host_type := <<"b.com">>, tag := default,
                    opts := [{workers, 12}], conn_opts := #{}}],
                 Expanded).

%% Same scenario as host_specific_pools_are_preserved/1, plus a second
%% host-scoped definition under another tag: that one must be expanded to
%% every host, while the b.com-specific `default' pool keeps its options.
pools_for_different_tag_are_expanded_with_host_specific_config_preserved(_C) ->
    HostScoped = #{type => generic, scope => host, tag => default,
                   opts => #{}, conn_opts => #{}},
    ForBCom = HostScoped#{scope := <<"b.com">>, opts := #{workers => 12}},
    OtherTag = HostScoped#{tag := other_tag},
    Hosts = [<<"a.com">>, <<"b.com">>, <<"c.eu">>],
    Expanded = mongoose_wpool:expand_pools([HostScoped, ForBCom, OtherTag], Hosts),
    ?assertMatch([#{type := generic, host_type := <<"a.com">>, tag := default,
                    opts := [], conn_opts := #{}},
                  #{type := generic, host_type := <<"c.eu">>, tag := default,
                    opts := [], conn_opts := #{}},
                  #{type := generic, host_type := <<"b.com">>, tag := default,
                    opts := [{workers, 12}], conn_opts := #{}},
                  #{type := generic, host_type := <<"a.com">>, tag := other_tag,
                    opts := [], conn_opts := #{}},
                  #{type := generic, host_type := <<"b.com">>, tag := other_tag,
                    opts := [], conn_opts := #{}},
                  #{type := generic, host_type := <<"c.eu">>, tag := other_tag,
                    opts := [], conn_opts := #{}}],
                 Expanded).

%% With a global pool and a pool specific to a.com configured, calls made
%% for a.com must hit the a.com pool, while calls for any other host (and
%% for `global' itself) must hit the global pool. wpool:call/4 is mocked to
%% return the pool name it was invoked with.
global_pool_is_used_by_default(_C) ->
    GlobalDef = #{type => generic, scope => global, tag => default,
                  opts => #{}, conn_opts => #{}},
    HostDef = GlobalDef#{scope := <<"a.com">>},
    Started = mongoose_wpool:start_configured_pools([GlobalDef, HostDef]),
    ?assertMatch([{ok, _}, {ok, _}], Started),
    meck:expect(wpool, call,
                fun(PoolName, _Request, _Strategy, _Timeout) -> PoolName end),

    HostPool = mongoose_wpool:make_pool_name(generic, <<"a.com">>, default),
    ?assertEqual(HostPool, mongoose_wpool:call(generic, <<"a.com">>, default, request)),

    GlobalPoolForOtherHost = mongoose_wpool:make_pool_name(generic, global, default),
    ?assertEqual(GlobalPoolForOtherHost,
                 mongoose_wpool:call(generic, <<"b.com">>, default, request)),

    GlobalPool = mongoose_wpool:make_pool_name(generic, global, default),
    ?assertEqual(GlobalPool, mongoose_wpool:call(generic, global, default, request)).

%% Sends an asynchronous {Module, Function, Args} request to the global
%% generic pool and collects the reply with gen_server:wait_response/2.
request_behaves_as_gen_server_send_request(_C) ->
    GlobalDef = #{type => generic, scope => global, tag => default,
                  opts => #{}, conn_opts => #{}},
    ?assertMatch([{ok, _}], mongoose_wpool:start_configured_pools([GlobalDef])),
    RequestId = mongoose_wpool:send_request(generic, {?MODULE, echo, [send_request]}),
    ?assertEqual({reply, {ok, send_request}}, gen_server:wait_response(RequestId, 5000)).

%% Kills a pool by making every (re)started worker die, then lets workers
%% live again and expects the pool name to become registered once more.
%% The killing_workers mock is unloaded even when the test fails, so that
%% a later meck:new(killing_workers, ...) is not affected by this test.
dead_pool_is_restarted(_C) ->
    Size = 3,
    {PoolName, KillingSwitch} = start_killable_pool(Size, kill_and_restart),
    try
        %% set the switch to kill every new worker
        set_killing_switch(KillingSwitch, true),
        %% kill existing workers so they will be restarted
        kill_pool_workers(PoolName, Size),

        wait_until_pool_is_dead(PoolName),
        %% set the switch to stop killing workers
        set_killing_switch(KillingSwitch, false),

        %% wait until the pool is restarted by the manager
        IsRegistered = fun() -> erlang:whereis(PoolName) =/= undefined end,
        async_helper:wait_until(IsRegistered, true)
    after
        meck:unload(killing_workers)
    end.

%% Kills a pool the same way, but stops it through mongoose_wpool:stop/3
%% before any restart can happen, and expects it to stay down.
dead_pool_is_stopped_before_restarted(_C) ->
    Size = 3,
    Tag = kill_stop_before_restart,
    {PoolName, KillingSwitch} = start_killable_pool(Size, Tag),
    try
        %% set the switch to kill every new worker
        set_killing_switch(KillingSwitch, true),
        %% kill existing workers so they will be restarted
        kill_pool_workers(PoolName, Size),

        wait_until_pool_is_dead(PoolName),
        %% stop the pool before it's restarted
        mongoose_wpool:stop(generic, global, Tag),
        %% set the switch to stop killing workers
        set_killing_switch(KillingSwitch, false),
        %% wait 4s (the restart attempt will happen after 2s)
        %% and check if the pool is started, it should not be
        timer:sleep(timer:seconds(4)),
        ?assertEqual(undefined, erlang:whereis(PoolName))
    after
        meck:unload(killing_workers)
    end.

%% Sends a kill signal to every currently registered worker of PoolName.
%% Workers whose name is not registered at lookup time are skipped, because
%% erlang:exit(undefined, kill) would raise badarg.
kill_pool_workers(PoolName, Size) ->
    Pids = [whereis(wpool_pool:worker_name(PoolName, I)) || I <- lists:seq(1, Size)],
    lists:foreach(fun(Pid) -> erlang:exit(Pid, kill) end,
                  [Pid || Pid <- Pids, Pid =/= undefined]).

%% --- available_worker strategy is banned for some backends --

%% Starting a redis pool configured with the available_worker strategy
%% must raise a strategy_not_supported error.
redis_pool_cant_be_started_with_available_worker_strategy(_Config) ->
    PoolName = mongoose_wpool:make_pool_name(redis, global, default),
    meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)),
    PoolDef = [#{type => redis, scope => global, tag => default,
                 opts => #{strategy => available_worker},
                 conn_opts => #{address => "localhost", port => 1805}}],
    ?assertError({strategy_not_supported, redis, global, default, available_worker},
                 mongoose_wpool:start_configured_pools(PoolDef)).

cassandra_prepare_opts(_Config) ->
    %% Check that we pass auth options in the correct format to the Cassandra driver
    AuthCfg = #{auth => #{plain => #{username => <<"user">>, password => <<"password">>}}},
    Expected = [{auth, {cqerl_auth_plain_handler, [{<<"user">>, <<"password">>}]}},
                {tcp_opts, [{keepalive, true}]}],
    ?assertEqual(Expected, mongoose_wpool_cassandra:prepare_cqerl_opts(AuthCfg)).

%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------

%% Builds a replacement for mongoose_wpool:start_sup_pool/3. For the given
%% pool name (or any name from the given list) the replacement answers
%% {ok, PoolName} without starting anything; every other call is forwarded
%% to the real implementation through meck:passthrough/1.
start_sup_pool_mock(PoolName) when not is_list(PoolName) ->
    start_sup_pool_mock([PoolName]);
start_sup_pool_mock(PoolNames) ->
    fun(Type, PN, Opts) ->
            IsMocked = lists:member(PN, PoolNames),
            case IsMocked of
                false ->
                    meck:passthrough([Type, PN, Opts]);
                true ->
                    %% we don't really need a pid here for mocking
                    {ok, PN}
            end
    end.

%% Stops every pool currently reported by mongoose_wpool:get_pools/0.
cleanup_pools() ->
    lists:foreach(fun stop_pool/1, mongoose_wpool:get_pools()).

%% Stops the single pool identified by a {Type, Host, Tag} triple.
stop_pool({Type, Host, Tag}) ->
    mongoose_wpool:stop(Type, Host, Tag).

%% Starts a global generic pool of Size workers under Tag whose worker
%% creation callback (the non-strict killing_workers mock) kills each new
%% worker while the returned ETS "killing switch" holds `true'.
%% Returns {PoolName, KillingSwitch}; the switch starts as `false'.
start_killable_pool(Size, Tag) ->
    KillingSwitch = ets:new(killing_switch, [public]),
    ets:insert(KillingSwitch, {kill_worker, false}),
    meck:new(killing_workers, [non_strict]),
    meck:expect(killing_workers, handle_worker_creation, kill_worker_fun(KillingSwitch)),
    Pools = [#{type => generic, scope => global, tag => Tag,
               opts => #{workers => Size,
                         enable_callbacks => true,
                         callbacks => [killing_workers]},
               conn_opts => #{}}],
    [{ok, _Pid}] = mongoose_wpool:start_configured_pools(Pools),
    PoolName = mongoose_wpool:make_pool_name(generic, global, Tag),
    {PoolName, KillingSwitch}.

%% Returns the fun used as killing_workers:handle_worker_creation/1: it
%% kills the process registered under the worker name when the switch is
%% `true' and returns `ok' otherwise.
kill_worker_fun(KillingSwitch) ->
    fun(AWorkerName) ->
            case ets:lookup_element(KillingSwitch, kill_worker, 2) of
                true ->
                    ct:pal("I'll be killing ~p", [AWorkerName]),
                    erlang:exit(whereis(AWorkerName), kill);
                _ ->
                    ok
            end
    end.

%% Wait until the pool is killed due to too many restarts.
%% Returns `ok' once the process registered as PoolName is down and fails
%% the test case if that does not happen within three minutes.
wait_until_pool_is_dead(PoolName) ->
    wait_until_down(whereis(PoolName)).

%% The name may already be unregistered by the time it is looked up; the
%% pool is dead then, and monitoring `undefined' would raise badarg.
wait_until_down(undefined) ->
    ok;
wait_until_down(Pid) ->
    MRef = erlang:monitor(process, Pid),
    receive
        {'DOWN', MRef, process, Pid, _} ->
            ok
    after
        timer:minutes(3) ->
            ct:fail("pool not stopped")
    end.

%% Flips the killing switch created by start_killable_pool/2.
set_killing_switch(KillingSwitch, Value) ->
    ets:update_element(KillingSwitch, kill_worker, {2, Value}).

%% Returns its argument unchanged; it is the function named in the request
%% sent by request_behaves_as_gen_server_send_request/1.
echo(Val) -> Val.