1: %%==============================================================================
    2: %% Copyright 2018 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mongoose_wpool_SUITE).
   18: -compile([export_all, nowarn_export_all]).
   19: -author('konrad.zemek@erlang-solutions.com').
   20: 
   21: -include_lib("common_test/include/ct.hrl").
   22: -include_lib("eunit/include/eunit.hrl").
   23: 
   24: %%--------------------------------------------------------------------
   25: %% Suite configuration
   26: %%--------------------------------------------------------------------
   27: 
   28: all() ->
   29:     [
   30:      get_pools_returns_pool_names,
   31:      stats_passes_through_to_wpool_stats,
   32:      a_global_riak_pool_is_started,
   33:      two_distinct_redis_pools_are_started,
   34:      generic_pools_are_started_for_all_vhosts,
   35:      host_specific_pools_are_preseved,
   36:      pools_for_different_tag_are_expanded_with_host_specific_config_preserved,
   37:      global_pool_is_used_by_default,
   38:      dead_pool_is_restarted,
   39:      dead_pool_is_stopped_before_restarted,
   40:      riak_pool_cant_be_started_with_available_worker_strategy,
   41:      redis_pool_cant_be_started_with_available_worker_strategy
   42:     ].
   43: 
   44: %%--------------------------------------------------------------------
   45: %% Init & teardown
   46: %%--------------------------------------------------------------------
   47: 
   48: init_per_suite(Config) ->
   49:     ok = meck:new(wpool, [no_link, passthrough]),
   50:     ok = meck:new(mongoose_wpool, [no_link, passthrough]),
   51:     [mongoose_config:set_opt(Key, Value) || {Key, Value} <- opts()],
   52:     Self = self(),
   53:     spawn(fun() ->
   54:                   register(test_helper, self()),
   55:                   mongoose_wpool:ensure_started(),
   56:                   Self ! ready,
   57:                   receive stop -> ok end
   58:           end),
   59:     receive ready -> ok end,
   60:     Config.
   61: 
   62: end_per_suite(Config) ->
   63:     meck:unload(wpool),
   64:     whereis(test_helper) ! stop,
   65:     [mongoose_config:unset_opt(Key) || {Key, _Value} <- opts()],
   66:     Config.
   67: 
   68: opts() ->
   69:     [{hosts, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]},
   70:      {host_types, []}].
   71: 
   72: init_per_testcase(_Case, Config) ->
   73:     cleanup_pools(),
   74:     Config.
   75: 
   76: end_per_testcase(_Case, Config) ->
   77:     cleanup_pools(),
   78:     Config.
   79: 
   80: %%--------------------------------------------------------------------
   81: %% Tests
   82: %%--------------------------------------------------------------------
   83: 
   84: get_pools_returns_pool_names(_Config) ->
   85:     mongoose_wpool:start(generic, <<"b">>, c, [{workers, 1}]),
   86:     mongoose_wpool:start(generic, <<"e">>, f, [{workers, 1}]),
   87:     mongoose_wpool:start(generic, <<"h">>, i, [{workers, 1}]),
   88: 
   89:     ?assertEqual([{generic, <<"b">>, c}, {generic, <<"e">>, f}, {generic, <<"h">>, i}],
   90:                  ordsets:from_list(mongoose_wpool:get_pools())).
   91: 
   92: 
   93: stats_passes_through_to_wpool_stats(_Config) ->
   94:     mongoose_wpool:start(generic, global, z, [{workers, 1}]),
   95: 
   96:     Ref = make_ref(),
   97: 
   98:         meck:expect(wpool, stats, fun(_Name) -> Ref end),
   99:         ?assertEqual(Ref, mongoose_wpool:stats(generic, global, z)).
  100: 
  101: a_global_riak_pool_is_started(_Config) ->
  102:     PoolName = mongoose_wpool:make_pool_name(riak, global, default),
  103:     meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)),
  104:     [{ok, PoolName}] = mongoose_wpool:start_configured_pools([{riak, global, default,
  105:                                                                [{workers, 12}],
  106:                                                                [{address, "localhost"},
  107:                                                                 {port, 1805}]}]),
  108: 
  109:     MgrPid = whereis(mongoose_wpool_mgr:name(riak)),
  110:     [{PoolName, CallArgs}] = filter_calls_to_start_sup_pool(MgrPid),
  111:     ?assertEqual(12, proplists:get_value(workers, CallArgs)),
  112:     ?assertMatch({riakc_pb_socket, _}, proplists:get_value(worker, CallArgs)),
  113: 
  114:     ok.
  115: 
  116: filter_calls_to_start_sup_pool(Pid) ->
  117:     H = meck_history:get_history(Pid, mongoose_wpool),
  118:     F = fun({_, {mongoose_wpool, start_sup_pool, [_, PN, Args]}, _}) -> {true, {PN, Args}};
  119:            (_) -> false
  120:         end,
  121:     lists:filtermap(F, H).
  122: 
  123: two_distinct_redis_pools_are_started(_C) ->
  124:     PoolName1 = mongoose_wpool:make_pool_name(redis, global, default),
  125:     PoolName2 = mongoose_wpool:make_pool_name(redis, global, global_dist),
  126:     meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock([PoolName1, PoolName2])),
  127:     Pools = [{redis, global, default, [{workers, 2}],
  128:               [{host, "localhost"},
  129:                {port, 1805}]},
  130:              {redis, global, global_dist, [{workers, 4}],
  131:               [{host, "localhost2"},
  132:                {port, 1806}]}],
  133: 
  134:     [{ok, PoolName1}, {ok, PoolName2}] = mongoose_wpool:start_configured_pools(Pools),
  135: 
  136:     MgrPid = whereis(mongoose_wpool_mgr:name(redis)),
  137:     [{PoolName1, CallArgs1}, {PoolName2, CallArgs2}] = filter_calls_to_start_sup_pool(MgrPid),
  138:     ?assertEqual(2, proplists:get_value(workers, CallArgs1)),
  139:     ?assertEqual(4, proplists:get_value(workers, CallArgs2)),
  140:     ?assertMatch({eredis_client, ["localhost", 1805 | _]}, proplists:get_value(worker, CallArgs1)),
  141:     ?assertMatch({eredis_client, ["localhost2", 1806 | _]}, proplists:get_value(worker, CallArgs2)),
  142: 
  143:     ok.
  144: 
  145: generic_pools_are_started_for_all_vhosts(_C) ->
  146:     Pools = [{generic, host, default, [], []}],
  147:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  148:     ?assertMatch([_, _, _], StartRes),
  149:     ?assertMatch([{generic, <<"a.com">>, default},
  150:                   {generic, <<"b.com">>, default},
  151:                   {generic, <<"c.eu">>, default}],
  152:                  ordsets:from_list(mongoose_wpool:get_pools())).
  153: 
  154: host_specific_pools_are_preseved(_C) ->
  155:     Pools = [{generic, host, default, [], []},
  156:              {generic, <<"b.com">>, default, [{workers, 12}], []}],
  157:     Expanded = mongoose_wpool:expand_pools(Pools, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]),
  158:     ?assertMatch([{generic,<<"a.com">>,default,[],[]},
  159:                   {generic,<<"c.eu">>,default,[],[]},
  160:                   {generic,<<"b.com">>,default,[{workers,12}],[]}], Expanded).
  161: 
  162: pools_for_different_tag_are_expanded_with_host_specific_config_preserved(_C) ->
  163:     Pools = [{generic, host, default, [], []},
  164:              {generic, <<"b.com">>, default, [{workers, 12}], []},
  165:              {generic, host, other_tag, [], []}],
  166:     Expanded = mongoose_wpool:expand_pools(Pools, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]),
  167:     ?assertMatch([{generic,<<"a.com">>,default,[],[]},
  168:                   {generic,<<"c.eu">>,default,[],[]},
  169:                   {generic,<<"b.com">>,default,[{workers,12}],[]},
  170:                   {generic,<<"a.com">>,other_tag,[],[]},
  171:                   {generic,<<"b.com">>,other_tag,[],[]},
  172:                   {generic,<<"c.eu">>,other_tag,[],[]}], Expanded).
  173: 
  174: global_pool_is_used_by_default(_C) ->
  175:     Pools = [{generic, global, default, [], []},
  176:              {generic, <<"a.com">>, default, [], []}],
  177:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  178:     ?assertMatch([_, _], StartRes),
  179:     meck:expect(wpool, call, fun(Name, _Req, _Strat, _Timeout) -> Name end),
  180:     ?assertEqual(mongoose_wpool:make_pool_name(generic, <<"a.com">>, default),
  181:                  mongoose_wpool:call(generic, <<"a.com">>, default, request)),
  182: 
  183:     ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default),
  184:                  mongoose_wpool:call(generic, <<"b.com">>, default, request)),
  185: 
  186:     ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default),
  187:                  mongoose_wpool:call(generic, global, default, request)).
  188: 
  189: dead_pool_is_restarted(_C) ->
  190:     Size = 3,
  191:     {PoolName, KillingSwitch} = start_killable_pool(Size, kill_and_restart),
  192:     %% set the switch to kill every new worker
  193:     set_killing_switch(KillingSwitch, true),
  194:     Pids = [whereis(wpool_pool:worker_name(PoolName, I)) || I <- lists:seq(1, Size)],
  195:     %% kill existing workers so they will be restarted
  196:     [erlang:exit(Pid, kill) || Pid <- Pids, Pid =/= undefined],
  197: 
  198:     wait_until_pool_is_dead(PoolName),
  199:     %% set the switch to stop killing workers
  200:     set_killing_switch(KillingSwitch, false),
  201: 
  202:     %% wait until the pool is restarted by the manager
  203:     Fun = fun() ->
  204:                   case erlang:whereis(PoolName) of
  205:                       undefined -> false;
  206:                       _ -> true
  207:                   end
  208:           end,
  209: 
  210:     async_helper:wait_until(Fun, true),
  211: 
  212:     meck:unload(killing_workers).
  213: 
  214: dead_pool_is_stopped_before_restarted(_C) ->
  215:     Size = 3,
  216:     Tag = kill_stop_before_restart,
  217:     {PoolName, KillingSwitch} = start_killable_pool(Size, Tag),
  218:     %% set the switch to kill every new worker
  219:     set_killing_switch(KillingSwitch, true),
  220:     %% kill existing workers so they will be restarted
  221:     [erlang:exit(whereis(wpool_pool:worker_name(PoolName, I)), kill) ||
  222:      I <- lists:seq(1, Size)],
  223: 
  224:     wait_until_pool_is_dead(PoolName),
  225:     %% stop the pool before it's restarted
  226:     mongoose_wpool:stop(generic, global, Tag),
  227:     %% set the switch to stop killing workers
  228:     set_killing_switch(KillingSwitch, false),
  229:     %% wait 4s (the restart attempt will happen after 2s)
  230:     %% and check if the pool is started, it should not be
  231:     timer:sleep(timer:seconds(4)),
  232:     ?assertEqual(undefined, erlang:whereis(PoolName)),
  233:     meck:unload(killing_workers).
  234: 
  235: %% --- available_worker strategy is banned for some backends --
  236: 
  237: riak_pool_cant_be_started_with_available_worker_strategy(_Config) ->
  238:     pool_cant_be_started_with_available_worker_strategy(riak).
  239: 
  240: redis_pool_cant_be_started_with_available_worker_strategy(_Config) ->
  241:     pool_cant_be_started_with_available_worker_strategy(redis).
  242: 
  243: pool_cant_be_started_with_available_worker_strategy(Type) ->
  244:     Host = global,
  245:     Tag = default,
  246:     PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag),
  247:     meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)),
  248:     PoolDef = [{Type, Host, Tag, [{strategy, available_worker}],
  249:                 [{address, "localhost"}, {port, 1805}]}],
  250:     ?assertError({strategy_not_supported, Type, Host, Tag, available_worker},
  251:                  mongoose_wpool:start_configured_pools(PoolDef)).
  252: 
  253: %%--------------------------------------------------------------------
  254: %% Helpers
  255: %%--------------------------------------------------------------------
  256: 
  257: start_sup_pool_mock(PoolNames) when is_list(PoolNames) ->
  258:     fun(Type, PN, Opts) ->
  259:             case lists:member(PN, PoolNames) of
  260:                 true ->
  261:                     {ok, PN}; %% we don't realy need a pid here for mocking
  262:                 _ ->
  263:                     meck:passthrough([Type, PN, Opts])
  264:             end
  265:     end;
  266: start_sup_pool_mock(PoolName) ->
  267:     start_sup_pool_mock([PoolName]).
  268: 
  269: cleanup_pools() ->
  270:     lists:foreach(fun({Type, Host, Tag}) -> mongoose_wpool:stop(Type, Host, Tag) end,
  271:                   mongoose_wpool:get_pools()).
  272: 
  273: start_killable_pool(Size, Tag) ->
  274:     KillingSwitch = ets:new(killing_switch, [public]),
  275:     ets:insert(KillingSwitch, {kill_worker, false}),
  276:     meck:new(killing_workers, [non_strict]),
  277:     meck:expect(killing_workers, handle_worker_creation, kill_worker_fun(KillingSwitch)),
  278:     Pools = [{generic, global, Tag,
  279:               [{workers, Size},
  280:                {enable_callbacks, true},
  281:                {callbacks, [killing_workers]}],
  282:               []}],
  283:     [{ok, _Pid}] = mongoose_wpool:start_configured_pools(Pools),
  284:     PoolName = mongoose_wpool:make_pool_name(generic, global, Tag),
  285:     {PoolName, KillingSwitch}.
  286: 
  287: kill_worker_fun(KillingSwitch) ->
  288:     fun(AWorkerName) ->
  289:             case ets:lookup_element(KillingSwitch, kill_worker, 2) of
  290:                 true ->
  291:                     ct:pal("I'll be killing ~p", [AWorkerName]),
  292:                     erlang:exit(whereis(AWorkerName), kill);
  293:                 _ ->
  294:                     ok
  295:             end
  296:     end.
  297: 
  298: wait_until_pool_is_dead(PoolName) ->
  299:     %% Wait until the pool is killed due to too many restarts
  300:     Pid = whereis(PoolName),
  301:     MRef = erlang:monitor(process, Pid),
  302:     receive
  303:         {'DOWN', MRef, process, Pid, _} ->
  304:             ok
  305:     after
  306:         timer:minutes(3) ->
  307:             ct:fail("pool not stopped")
  308:     end.
  309: 
  310: set_killing_switch(KillingSwitch, Value) ->
  311:     ets:update_element(KillingSwitch, kill_worker, {2, Value}).