1: %%============================================================================== 2: %% Copyright 2018 Erlang Solutions Ltd. 3: %% 4: %% Licensed under the Apache License, Version 2.0 (the "License"); 5: %% you may not use this file except in compliance with the License. 6: %% You may obtain a copy of the License at 7: %% 8: %% http://www.apache.org/licenses/LICENSE-2.0 9: %% 10: %% Unless required by applicable law or agreed to in writing, software 11: %% distributed under the License is distributed on an "AS IS" BASIS, 12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13: %% See the License for the specific language governing permissions and 14: %% limitations under the License. 15: %%============================================================================== 16: 17: -module(mongoose_wpool_SUITE). 18: -compile([export_all, nowarn_export_all]). 19: -author('konrad.zemek@erlang-solutions.com'). 20: 21: -include_lib("common_test/include/ct.hrl"). 22: -include_lib("eunit/include/eunit.hrl"). 23: 24: %%-------------------------------------------------------------------- 25: %% Suite configuration 26: %%-------------------------------------------------------------------- 27: 28: all() -> 29: [ 30: get_pools_returns_pool_names, 31: stats_passes_through_to_wpool_stats, 32: two_distinct_redis_pools_are_started, 33: generic_pools_are_started_for_all_vhosts, 34: host_specific_pools_are_preserved, 35: pools_for_different_tag_are_expanded_with_host_specific_config_preserved, 36: global_pool_is_used_by_default, 37: request_behaves_as_gen_server_send_request, 38: dead_pool_is_restarted, 39: dead_pool_is_stopped_before_restarted, 40: redis_pool_cant_be_started_with_available_worker_strategy, 41: cassandra_prepare_opts 42: ]. 43: 44: %%-------------------------------------------------------------------- 45: %% Init & teardown 46: %%-------------------------------------------------------------------- 47: 48: init_per_suite(Config) -> 49: ok = meck:new(wpool, [no_link, passthrough]), 50: ok = meck:new(mongoose_wpool, [no_link, passthrough]), 51: mongoose_config:set_opts(opts()), 52: Self = self(), 53: spawn(fun() -> 54: register(test_helper, self()), 55: mongoose_wpool:ensure_started(), 56: Self ! ready, 57: receive stop -> ok end 58: end), 59: receive ready -> ok end, 60: Config. 61: 62: end_per_suite(Config) -> 63: meck:unload(wpool), 64: whereis(test_helper) ! stop, 65: mongoose_config:erase_opts(), 66: Config. 67: 68: opts() -> 69: #{hosts => [<<"a.com">>, <<"b.com">>, <<"c.eu">>], host_types => []}. 70: 71: init_per_testcase(_Case, Config) -> 72: cleanup_pools(), 73: Config. 74: 75: end_per_testcase(_Case, Config) -> 76: cleanup_pools(), 77: Config. 78: 79: %%-------------------------------------------------------------------- 80: %% Tests 81: %%-------------------------------------------------------------------- 82: 83: get_pools_returns_pool_names(_Config) -> 84: mongoose_wpool:start(generic, <<"b">>, c, [{workers, 1}]), 85: mongoose_wpool:start(generic, <<"e">>, f, [{workers, 1}]), 86: mongoose_wpool:start(generic, <<"h">>, i, [{workers, 1}]), 87: 88: ?assertEqual([{generic, <<"b">>, c}, {generic, <<"e">>, f}, {generic, <<"h">>, i}], 89: ordsets:from_list(mongoose_wpool:get_pools())). 90: 91: stats_passes_through_to_wpool_stats(_Config) -> 92: mongoose_wpool:start(generic, global, z, [{workers, 1}]), 93: Ref = make_ref(), 94: meck:expect(wpool, stats, fun(_Name) -> Ref end), 95: ?assertEqual(Ref, mongoose_wpool:stats(generic, global, z)). 96: 97: filter_calls_to_start_sup_pool(Pid) -> 98: H = meck_history:get_history(Pid, mongoose_wpool), 99: F = fun({_, {mongoose_wpool, start_sup_pool, [_, PN, Args]}, _}) -> {true, {PN, Args}}; 100: (_) -> false 101: end, 102: lists:filtermap(F, H). 103: 104: two_distinct_redis_pools_are_started(_C) -> 105: PoolName1 = mongoose_wpool:make_pool_name(redis, global, default), 106: PoolName2 = mongoose_wpool:make_pool_name(redis, global, global_dist), 107: meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock([PoolName1, PoolName2])), 108: Pools = [#{type => redis, scope => global, tag => default, opts => #{workers => 2}, 109: conn_opts => #{host => "localhost", 110: port => 1805, 111: database => 0, 112: password => ""}}, 113: #{type => redis, scope => global, tag => global_dist, opts => #{workers => 4}, 114: conn_opts => #{host => "localhost2", 115: port => 1806, 116: database => 0, 117: password => ""}}], 118: 119: [{ok, PoolName1}, {ok, PoolName2}] = mongoose_wpool:start_configured_pools(Pools), 120: 121: MgrPid = whereis(mongoose_wpool_mgr:name(redis)), 122: [{PoolName1, CallArgs1}, {PoolName2, CallArgs2}] = filter_calls_to_start_sup_pool(MgrPid), 123: ?assertEqual(2, proplists:get_value(workers, CallArgs1)), 124: ?assertEqual(4, proplists:get_value(workers, CallArgs2)), 125: {eredis_client, Props1} = proplists:get_value(worker, CallArgs1), 126: {eredis_client, Props2} = proplists:get_value(worker, CallArgs2), 127: ?assertMatch(#{host := "localhost", port := 1805}, proplists:to_map(Props1)), 128: ?assertMatch(#{host := "localhost2", port := 1806}, proplists:to_map(Props2)). 129: 130: generic_pools_are_started_for_all_vhosts(_C) -> 131: Pools = [#{type => generic, scope => host_type, tag => default, opts => #{}, conn_opts => #{}}], 132: StartRes = mongoose_wpool:start_configured_pools(Pools), 133: ?assertMatch([_, _, _], StartRes), 134: ?assertMatch([{generic, <<"a.com">>, default}, 135: {generic, <<"b.com">>, default}, 136: {generic, <<"c.eu">>, default}], 137: ordsets:from_list(mongoose_wpool:get_pools())). 138: 139: host_specific_pools_are_preserved(_C) -> 140: Pools = [#{type => generic, scope => host_type, tag => default, opts => #{}, conn_opts => #{}}], 141: HostSpecific = [#{type => generic, scope => <<"b.com">>, tag => default, 142: opts => #{workers => 12}, conn_opts => #{}}], 143: Expanded = mongoose_wpool:expand_pools( 144: Pools, HostSpecific, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]), 145: Expected = lists:sort([#{type => generic, host_type => <<"a.com">>, tag => default, 146: opts => [], conn_opts => #{}}, 147: #{type => generic, host_type => <<"c.eu">>, tag => default, 148: opts => [], conn_opts => #{}}, 149: #{type => generic, host_type => <<"b.com">>, tag => default, 150: opts => [{workers, 12}], conn_opts => #{}}]), 151: ?assertMatch(Expected, lists:sort(Expanded)). 152: 153: pools_for_different_tag_are_expanded_with_host_specific_config_preserved(_C) -> 154: Pools = [#{type => generic, scope => host_type, tag => default, opts => #{}, conn_opts => #{}}, 155: #{type => generic, scope => host_type, tag => other_tag, 156: opts => #{}, conn_opts => #{}}], 157: HostSpecific = [#{type => generic, scope => <<"b.com">>, tag => default, 158: opts => #{workers => 12}, conn_opts => #{}}], 159: Expanded = mongoose_wpool:expand_pools( 160: Pools, HostSpecific, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]), 161: Expected = lists:sort([#{type => generic, host_type => <<"a.com">>, tag => default, 162: opts => [], conn_opts => #{}}, 163: #{type => generic, host_type => <<"c.eu">>, tag => default, 164: opts => [], conn_opts => #{}}, 165: #{type => generic, host_type => <<"b.com">>, tag => default, 166: opts => [{workers, 12}], conn_opts => #{}}, 167: #{type => generic, host_type => <<"a.com">>, tag => other_tag, 168: opts => [], conn_opts => #{}}, 169: #{type => generic, host_type => <<"b.com">>, tag => other_tag, 170: opts => [], conn_opts => #{}}, 171: #{type => generic, host_type => <<"c.eu">>, tag => other_tag, 172: opts => [], conn_opts => #{}}]), 173: ?assertMatch(Expected, lists:sort(Expanded)). 174: 175: global_pool_is_used_by_default(_C) -> 176: Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}}, 177: #{type => generic, scope => <<"a.com">>, tag => default, 178: opts => #{}, conn_opts => #{}}], 179: StartRes = mongoose_wpool:start_configured_pools(Pools), 180: ?assertMatch([{ok, _}, {ok, _}], StartRes), 181: meck:expect(wpool, call, fun(Name, _Req, _Strat, _Timeout) -> Name end), 182: ?assertEqual(mongoose_wpool:make_pool_name(generic, <<"a.com">>, default), 183: mongoose_wpool:call(generic, <<"a.com">>, default, request)), 184: 185: ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default), 186: mongoose_wpool:call(generic, <<"b.com">>, default, request)), 187: 188: ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default), 189: mongoose_wpool:call(generic, global, default, request)). 190: 191: request_behaves_as_gen_server_send_request(_C) -> 192: Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}} ], 193: StartRes = mongoose_wpool:start_configured_pools(Pools), 194: ?assertMatch([{ok, _}], StartRes), 195: Req1 = mongoose_wpool:send_request(generic, {?MODULE, echo, [send_request]}), 196: ?assertEqual({reply, {ok, send_request}}, gen_server:wait_response(Req1, 5000)). 197: 198: dead_pool_is_restarted(_C) -> 199: Size = 3, 200: {PoolName, KillingSwitch} = start_killable_pool(Size, kill_and_restart), 201: %% set the switch to kill every new worker 202: set_killing_switch(KillingSwitch, true), 203: Pids = [whereis(wpool_pool:worker_name(PoolName, I)) || I <- lists:seq(1, Size)], 204: %% kill existing workers so they will be restarted 205: [erlang:exit(Pid, kill) || Pid <- Pids, Pid =/= undefined], 206: 207: wait_until_pool_is_dead(PoolName), 208: %% set the switch to stop killing workers 209: set_killing_switch(KillingSwitch, false), 210: 211: %% wait until the pool is restarted by the manager 212: Fun = fun() -> 213: case erlang:whereis(PoolName) of 214: undefined -> false; 215: _ -> true 216: end 217: end, 218: 219: async_helper:wait_until(Fun, true), 220: 221: meck:unload(killing_workers). 222: 223: dead_pool_is_stopped_before_restarted(_C) -> 224: Size = 3, 225: Tag = kill_stop_before_restart, 226: {PoolName, KillingSwitch} = start_killable_pool(Size, Tag), 227: %% set the switch to kill every new worker 228: set_killing_switch(KillingSwitch, true), 229: %% kill existing workers so they will be restarted 230: [erlang:exit(whereis(wpool_pool:worker_name(PoolName, I)), kill) || 231: I <- lists:seq(1, Size)], 232: 233: wait_until_pool_is_dead(PoolName), 234: %% stop the pool before it's restarted 235: mongoose_wpool:stop(generic, global, Tag), 236: %% set the switch to stop killing workers 237: set_killing_switch(KillingSwitch, false), 238: %% wait 4s (the restart attempt will happen after 2s) 239: %% and check if the pool is started, it should not be 240: timer:sleep(timer:seconds(4)), 241: ?assertEqual(undefined, erlang:whereis(PoolName)), 242: meck:unload(killing_workers). 243: 244: %% --- available_worker strategy is banned for some backends -- 245: 246: redis_pool_cant_be_started_with_available_worker_strategy(_Config) -> 247: Type = redis, 248: Host = global, 249: Tag = default, 250: PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag), 251: meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)), 252: PoolDef = [#{type => Type, scope => Host, tag => Tag, opts => #{strategy => available_worker}, 253: conn_opts => #{address => "localhost", port => 1805}}], 254: ?assertError({strategy_not_supported, Type, Host, Tag, available_worker}, 255: mongoose_wpool:start_configured_pools(PoolDef)). 256: 257: cassandra_prepare_opts(_Config) -> 258: %% Check that we pass auth options in the correct format to the Cassandra driver 259: AuthCfg = #{auth => #{plain => #{username => <<"user">>, password => <<"password">>}}}, 260: ?assertEqual([{auth, {cqerl_auth_plain_handler, [{<<"user">>, <<"password">>}]}}, 261: {tcp_opts, [{keepalive, true}]}], 262: mongoose_wpool_cassandra:prepare_cqerl_opts(AuthCfg)). 263: 264: %%-------------------------------------------------------------------- 265: %% Helpers 266: %%-------------------------------------------------------------------- 267: 268: start_sup_pool_mock(PoolNames) when is_list(PoolNames) -> 269: fun(Type, PN, Opts) -> 270: case lists:member(PN, PoolNames) of 271: true -> 272: {ok, PN}; %% we don't really need a pid here for mocking 273: _ -> 274: meck:passthrough([Type, PN, Opts]) 275: end 276: end; 277: start_sup_pool_mock(PoolName) -> 278: start_sup_pool_mock([PoolName]). 279: 280: cleanup_pools() -> 281: lists:foreach(fun({Type, Host, Tag}) -> mongoose_wpool:stop(Type, Host, Tag) end, 282: mongoose_wpool:get_pools()). 283: 284: start_killable_pool(Size, Tag) -> 285: KillingSwitch = ets:new(killing_switch, [public]), 286: ets:insert(KillingSwitch, {kill_worker, false}), 287: meck:new(killing_workers, [non_strict]), 288: meck:expect(killing_workers, handle_worker_creation, kill_worker_fun(KillingSwitch)), 289: Pools = [#{type => generic, scope => global, tag => Tag, 290: opts => #{workers => Size, 291: enable_callbacks => true, 292: callbacks => [killing_workers]}, 293: conn_opts => #{}}], 294: [{ok, _Pid}] = mongoose_wpool:start_configured_pools(Pools), 295: PoolName = mongoose_wpool:make_pool_name(generic, global, Tag), 296: {PoolName, KillingSwitch}. 297: 298: kill_worker_fun(KillingSwitch) -> 299: fun(AWorkerName) -> 300: case ets:lookup_element(KillingSwitch, kill_worker, 2) of 301: true -> 302: ct:pal("I'll be killing ~p", [AWorkerName]), 303: erlang:exit(whereis(AWorkerName), kill); 304: _ -> 305: ok 306: end 307: end. 308: 309: wait_until_pool_is_dead(PoolName) -> 310: %% Wait until the pool is killed due to too many restarts 311: Pid = whereis(PoolName), 312: MRef = erlang:monitor(process, Pid), 313: receive 314: {'DOWN', MRef, process, Pid, _} -> 315: ok 316: after 317: timer:minutes(3) -> 318: ct:fail("pool not stopped") 319: end. 320: 321: set_killing_switch(KillingSwitch, Value) -> 322: ets:update_element(KillingSwitch, kill_worker, {2, Value}). 323: 324: echo(Val) -> Val.