1: %%==============================================================================
    2: %% Copyright 2018 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mongoose_wpool_SUITE).
   18: -compile([export_all, nowarn_export_all]).
   19: -author('konrad.zemek@erlang-solutions.com').
   20: 
   21: -include_lib("common_test/include/ct.hrl").
   22: -include_lib("eunit/include/eunit.hrl").
   23: 
   24: %%--------------------------------------------------------------------
   25: %% Suite configuration
   26: %%--------------------------------------------------------------------
   27: 
   28: all() ->
   29:     [
   30:      get_pools_returns_pool_names,
   31:      stats_passes_through_to_wpool_stats,
   32:      a_global_riak_pool_is_started,
   33:      two_distinct_redis_pools_are_started,
   34:      generic_pools_are_started_for_all_vhosts,
   35:      host_specific_pools_are_preserved,
   36:      pools_for_different_tag_are_expanded_with_host_specific_config_preserved,
   37:      global_pool_is_used_by_default,
   38:      request_behaves_as_gen_server_send_request,
   39:      dead_pool_is_restarted,
   40:      dead_pool_is_stopped_before_restarted,
   41:      riak_pool_cant_be_started_with_available_worker_strategy,
   42:      redis_pool_cant_be_started_with_available_worker_strategy
   43:     ].
   44: 
   45: %%--------------------------------------------------------------------
   46: %% Init & teardown
   47: %%--------------------------------------------------------------------
   48: 
   49: init_per_suite(Config) ->
   50:     ok = meck:new(wpool, [no_link, passthrough]),
   51:     ok = meck:new(mongoose_wpool, [no_link, passthrough]),
   52:     [mongoose_config:set_opt(Key, Value) || {Key, Value} <- opts()],
   53:     Self = self(),
   54:     spawn(fun() ->
   55:                   register(test_helper, self()),
   56:                   mongoose_wpool:ensure_started(),
   57:                   Self ! ready,
   58:                   receive stop -> ok end
   59:           end),
   60:     receive ready -> ok end,
   61:     Config.
   62: 
   63: end_per_suite(Config) ->
   64:     meck:unload(wpool),
   65:     whereis(test_helper) ! stop,
   66:     [mongoose_config:unset_opt(Key) || {Key, _Value} <- opts()],
   67:     Config.
   68: 
   69: opts() ->
   70:     [{hosts, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]},
   71:      {host_types, []}].
   72: 
   73: init_per_testcase(_Case, Config) ->
   74:     cleanup_pools(),
   75:     Config.
   76: 
   77: end_per_testcase(_Case, Config) ->
   78:     cleanup_pools(),
   79:     Config.
   80: 
   81: %%--------------------------------------------------------------------
   82: %% Tests
   83: %%--------------------------------------------------------------------
   84: 
   85: get_pools_returns_pool_names(_Config) ->
   86:     mongoose_wpool:start(generic, <<"b">>, c, [{workers, 1}]),
   87:     mongoose_wpool:start(generic, <<"e">>, f, [{workers, 1}]),
   88:     mongoose_wpool:start(generic, <<"h">>, i, [{workers, 1}]),
   89: 
   90:     ?assertEqual([{generic, <<"b">>, c}, {generic, <<"e">>, f}, {generic, <<"h">>, i}],
   91:                  ordsets:from_list(mongoose_wpool:get_pools())).
   92: 
   93: stats_passes_through_to_wpool_stats(_Config) ->
   94:     mongoose_wpool:start(generic, global, z, [{workers, 1}]),
   95:     Ref = make_ref(),
   96:     meck:expect(wpool, stats, fun(_Name) -> Ref end),
   97:     ?assertEqual(Ref, mongoose_wpool:stats(generic, global, z)).
   98: 
   99: a_global_riak_pool_is_started(_Config) ->
  100:     PoolName = mongoose_wpool:make_pool_name(riak, global, default),
  101:     meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)),
  102:     Pool = #{type => riak, scope => global, tag => default,
  103:              opts => #{workers => 12},
  104:              conn_opts => #{address => "localhost", port => 1805}},
  105:     [{ok, PoolName}] = mongoose_wpool:start_configured_pools([Pool]),
  106: 
  107:     MgrPid = whereis(mongoose_wpool_mgr:name(riak)),
  108:     [{PoolName, CallArgs}] = filter_calls_to_start_sup_pool(MgrPid),
  109:     ?assertEqual(12, proplists:get_value(workers, CallArgs)),
  110:     ?assertMatch({riakc_pb_socket, _}, proplists:get_value(worker, CallArgs)).
  111: 
  112: filter_calls_to_start_sup_pool(Pid) ->
  113:     H = meck_history:get_history(Pid, mongoose_wpool),
  114:     F = fun({_, {mongoose_wpool, start_sup_pool, [_, PN, Args]}, _}) -> {true, {PN, Args}};
  115:            (_) -> false
  116:         end,
  117:     lists:filtermap(F, H).
  118: 
  119: two_distinct_redis_pools_are_started(_C) ->
  120:     PoolName1 = mongoose_wpool:make_pool_name(redis, global, default),
  121:     PoolName2 = mongoose_wpool:make_pool_name(redis, global, global_dist),
  122:     meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock([PoolName1, PoolName2])),
  123:     Pools = [#{type => redis, scope => global, tag => default, opts => #{workers => 2},
  124:                conn_opts => #{host => "localhost",
  125:                               port => 1805,
  126:                               database => 0,
  127:                               password => ""}},
  128:              #{type => redis, scope => global, tag => global_dist, opts => #{workers => 4},
  129:                conn_opts => #{host => "localhost2",
  130:                               port => 1806,
  131:                               database => 0,
  132:                               password => ""}}],
  133: 
  134:     [{ok, PoolName1}, {ok, PoolName2}] = mongoose_wpool:start_configured_pools(Pools),
  135: 
  136:     MgrPid = whereis(mongoose_wpool_mgr:name(redis)),
  137:     [{PoolName1, CallArgs1}, {PoolName2, CallArgs2}] = filter_calls_to_start_sup_pool(MgrPid),
  138:     ?assertEqual(2, proplists:get_value(workers, CallArgs1)),
  139:     ?assertEqual(4, proplists:get_value(workers, CallArgs2)),
  140:     ?assertMatch({eredis_client, ["localhost", 1805 | _]}, proplists:get_value(worker, CallArgs1)),
  141:     ?assertMatch({eredis_client, ["localhost2", 1806 | _]}, proplists:get_value(worker, CallArgs2)).
  142: 
  143: generic_pools_are_started_for_all_vhosts(_C) ->
  144:     Pools = [#{type => generic, scope => host, tag => default, opts => #{}, conn_opts => #{}}],
  145:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  146:     ?assertMatch([_, _, _], StartRes),
  147:     ?assertMatch([{generic, <<"a.com">>, default},
  148:                   {generic, <<"b.com">>, default},
  149:                   {generic, <<"c.eu">>, default}],
  150:                  ordsets:from_list(mongoose_wpool:get_pools())).
  151: 
  152: host_specific_pools_are_preserved(_C) ->
  153:     Pools = [#{type => generic, scope => host, tag => default, opts => #{}, conn_opts => #{}},
  154:              #{type => generic, scope => <<"b.com">>, tag => default,
  155:                opts => #{workers => 12}, conn_opts => #{}}],
  156:     Expanded = mongoose_wpool:expand_pools(Pools, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]),
  157:     ?assertMatch([#{type := generic, host_type := <<"a.com">>, tag := default,
  158:                     opts := [], conn_opts := #{}},
  159:                   #{type := generic, host_type := <<"c.eu">>, tag := default,
  160:                     opts := [], conn_opts := #{}},
  161:                   #{type := generic, host_type := <<"b.com">>, tag := default,
  162:                     opts := [{workers, 12}], conn_opts := #{}}],
  163:                  Expanded).
  164: 
  165: pools_for_different_tag_are_expanded_with_host_specific_config_preserved(_C) ->
  166:     Pools = [#{type => generic, scope => host, tag => default, opts => #{}, conn_opts => #{}},
  167:              #{type => generic, scope => <<"b.com">>, tag => default,
  168:                opts => #{workers => 12}, conn_opts => #{}},
  169:              #{type => generic, scope => host, tag => other_tag, opts => #{}, conn_opts => #{}}],
  170:     Expanded = mongoose_wpool:expand_pools(Pools, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]),
  171:     ?assertMatch([#{type := generic, host_type := <<"a.com">>, tag := default,
  172:                     opts := [], conn_opts := #{}},
  173:                   #{type := generic, host_type := <<"c.eu">>, tag := default,
  174:                     opts := [], conn_opts := #{}},
  175:                   #{type := generic, host_type := <<"b.com">>, tag := default,
  176:                     opts := [{workers, 12}], conn_opts := #{}},
  177:                   #{type := generic, host_type := <<"a.com">>, tag := other_tag,
  178:                     opts := [], conn_opts := #{}},
  179:                   #{type := generic, host_type := <<"b.com">>, tag := other_tag,
  180:                     opts := [], conn_opts := #{}},
  181:                   #{type := generic, host_type := <<"c.eu">>, tag := other_tag,
  182:                     opts := [], conn_opts := #{}}],
  183:                  Expanded).
  184: 
  185: global_pool_is_used_by_default(_C) ->
  186:     Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}},
  187:              #{type => generic, scope => <<"a.com">>, tag => default,
  188:                opts => #{}, conn_opts => #{}}],
  189:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  190:     ?assertMatch([{ok, _}, {ok, _}], StartRes),
  191:     meck:expect(wpool, call, fun(Name, _Req, _Strat, _Timeout) -> Name end),
  192:     ?assertEqual(mongoose_wpool:make_pool_name(generic, <<"a.com">>, default),
  193:                  mongoose_wpool:call(generic, <<"a.com">>, default, request)),
  194: 
  195:     ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default),
  196:                  mongoose_wpool:call(generic, <<"b.com">>, default, request)),
  197: 
  198:     ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default),
  199:                  mongoose_wpool:call(generic, global, default, request)).
  200: 
  201: request_behaves_as_gen_server_send_request(_C) ->
  202:     Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}} ],
  203:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  204:     ?assertMatch([{ok, _}], StartRes),
  205:     Req1 = mongoose_wpool:send_request(generic, {?MODULE, echo, [send_request]}),
  206:     ?assertEqual({reply, {ok, send_request}}, gen_server:wait_response(Req1, 5000)).
  207: 
  208: dead_pool_is_restarted(_C) ->
  209:     Size = 3,
  210:     {PoolName, KillingSwitch} = start_killable_pool(Size, kill_and_restart),
  211:     %% set the switch to kill every new worker
  212:     set_killing_switch(KillingSwitch, true),
  213:     Pids = [whereis(wpool_pool:worker_name(PoolName, I)) || I <- lists:seq(1, Size)],
  214:     %% kill existing workers so they will be restarted
  215:     [erlang:exit(Pid, kill) || Pid <- Pids, Pid =/= undefined],
  216: 
  217:     wait_until_pool_is_dead(PoolName),
  218:     %% set the switch to stop killing workers
  219:     set_killing_switch(KillingSwitch, false),
  220: 
  221:     %% wait until the pool is restarted by the manager
  222:     Fun = fun() ->
  223:                   case erlang:whereis(PoolName) of
  224:                       undefined -> false;
  225:                       _ -> true
  226:                   end
  227:           end,
  228: 
  229:     async_helper:wait_until(Fun, true),
  230: 
  231:     meck:unload(killing_workers).
  232: 
  233: dead_pool_is_stopped_before_restarted(_C) ->
  234:     Size = 3,
  235:     Tag = kill_stop_before_restart,
  236:     {PoolName, KillingSwitch} = start_killable_pool(Size, Tag),
  237:     %% set the switch to kill every new worker
  238:     set_killing_switch(KillingSwitch, true),
  239:     %% kill existing workers so they will be restarted
  240:     [erlang:exit(whereis(wpool_pool:worker_name(PoolName, I)), kill) ||
  241:      I <- lists:seq(1, Size)],
  242: 
  243:     wait_until_pool_is_dead(PoolName),
  244:     %% stop the pool before it's restarted
  245:     mongoose_wpool:stop(generic, global, Tag),
  246:     %% set the switch to stop killing workers
  247:     set_killing_switch(KillingSwitch, false),
  248:     %% wait 4s (the restart attempt will happen after 2s)
  249:     %% and check if the pool is started, it should not be
  250:     timer:sleep(timer:seconds(4)),
  251:     ?assertEqual(undefined, erlang:whereis(PoolName)),
  252:     meck:unload(killing_workers).
  253: 
  254: %% --- available_worker strategy is banned for some backends --
  255: 
  256: riak_pool_cant_be_started_with_available_worker_strategy(_Config) ->
  257:     pool_cant_be_started_with_available_worker_strategy(riak).
  258: 
  259: redis_pool_cant_be_started_with_available_worker_strategy(_Config) ->
  260:     pool_cant_be_started_with_available_worker_strategy(redis).
  261: 
  262: pool_cant_be_started_with_available_worker_strategy(Type) ->
  263:     Host = global,
  264:     Tag = default,
  265:     PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag),
  266:     meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)),
  267:     PoolDef = [#{type => Type, scope => Host, tag => Tag, opts => #{strategy => available_worker},
  268:                  conn_opts => #{address => "localhost", port => 1805}}],
  269:     ?assertError({strategy_not_supported, Type, Host, Tag, available_worker},
  270:                  mongoose_wpool:start_configured_pools(PoolDef)).
  271: 
  272: %%--------------------------------------------------------------------
  273: %% Helpers
  274: %%--------------------------------------------------------------------
  275: 
  276: start_sup_pool_mock(PoolNames) when is_list(PoolNames) ->
  277:     fun(Type, PN, Opts) ->
  278:             case lists:member(PN, PoolNames) of
  279:                 true ->
  280:                     {ok, PN}; %% we don't really need a pid here for mocking
  281:                 _ ->
  282:                     meck:passthrough([Type, PN, Opts])
  283:             end
  284:     end;
  285: start_sup_pool_mock(PoolName) ->
  286:     start_sup_pool_mock([PoolName]).
  287: 
  288: cleanup_pools() ->
  289:     lists:foreach(fun({Type, Host, Tag}) -> mongoose_wpool:stop(Type, Host, Tag) end,
  290:                   mongoose_wpool:get_pools()).
  291: 
  292: start_killable_pool(Size, Tag) ->
  293:     KillingSwitch = ets:new(killing_switch, [public]),
  294:     ets:insert(KillingSwitch, {kill_worker, false}),
  295:     meck:new(killing_workers, [non_strict]),
  296:     meck:expect(killing_workers, handle_worker_creation, kill_worker_fun(KillingSwitch)),
  297:     Pools = [#{type => generic, scope => global, tag => Tag,
  298:                opts => #{workers => Size,
  299:                          enable_callbacks => true,
  300:                          callbacks => [killing_workers]},
  301:                conn_opts => #{}}],
  302:     [{ok, _Pid}] = mongoose_wpool:start_configured_pools(Pools),
  303:     PoolName = mongoose_wpool:make_pool_name(generic, global, Tag),
  304:     {PoolName, KillingSwitch}.
  305: 
  306: kill_worker_fun(KillingSwitch) ->
  307:     fun(AWorkerName) ->
  308:             case ets:lookup_element(KillingSwitch, kill_worker, 2) of
  309:                 true ->
  310:                     ct:pal("I'll be killing ~p", [AWorkerName]),
  311:                     erlang:exit(whereis(AWorkerName), kill);
  312:                 _ ->
  313:                     ok
  314:             end
  315:     end.
  316: 
  317: wait_until_pool_is_dead(PoolName) ->
  318:     %% Wait until the pool is killed due to too many restarts
  319:     Pid = whereis(PoolName),
  320:     MRef = erlang:monitor(process, Pid),
  321:     receive
  322:         {'DOWN', MRef, process, Pid, _} ->
  323:             ok
  324:     after
  325:         timer:minutes(3) ->
  326:             ct:fail("pool not stopped")
  327:     end.
  328: 
  329: set_killing_switch(KillingSwitch, Value) ->
  330:     ets:update_element(KillingSwitch, kill_worker, {2, Value}).
  331: 
  332: echo(Val) -> Val.