1: %%==============================================================================
    2: %% Copyright 2018 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mongoose_wpool_SUITE).
   18: -compile([export_all, nowarn_export_all]).
   19: -author('konrad.zemek@erlang-solutions.com').
   20: 
   21: -include_lib("common_test/include/ct.hrl").
   22: -include_lib("eunit/include/eunit.hrl").
   23: 
   24: %%--------------------------------------------------------------------
   25: %% Suite configuration
   26: %%--------------------------------------------------------------------
   27: 
   28: all() ->
   29:     [
   30:      get_pools_returns_pool_names,
   31:      stats_passes_through_to_wpool_stats,
   32:      two_distinct_redis_pools_are_started,
   33:      generic_pools_are_started_for_all_vhosts,
   34:      host_specific_pools_are_preserved,
   35:      pools_for_different_tag_are_expanded_with_host_specific_config_preserved,
   36:      global_pool_is_used_by_default,
   37:      request_behaves_as_gen_server_send_request,
   38:      dead_pool_is_restarted,
   39:      dead_pool_is_stopped_before_restarted,
   40:      redis_pool_cant_be_started_with_available_worker_strategy,
   41:      cassandra_prepare_opts
   42:     ].
   43: 
   44: %%--------------------------------------------------------------------
   45: %% Init & teardown
   46: %%--------------------------------------------------------------------
   47: 
   48: init_per_suite(Config) ->
   49:     ok = meck:new(wpool, [no_link, passthrough]),
   50:     ok = meck:new(mongoose_wpool, [no_link, passthrough]),
   51:     mongoose_config:set_opts(opts()),
   52:     Self = self(),
   53:     spawn(fun() ->
   54:                   register(test_helper, self()),
   55:                   mongoose_wpool:ensure_started(),
   56:                   Self ! ready,
   57:                   receive stop -> ok end
   58:           end),
   59:     receive ready -> ok end,
   60:     Config.
   61: 
   62: end_per_suite(Config) ->
   63:     meck:unload(wpool),
   64:     whereis(test_helper) ! stop,
   65:     mongoose_config:erase_opts(),
   66:     Config.
   67: 
   68: opts() ->
   69:     #{hosts => [<<"a.com">>, <<"b.com">>, <<"c.eu">>], host_types => []}.
   70: 
   71: init_per_testcase(_Case, Config) ->
   72:     cleanup_pools(),
   73:     Config.
   74: 
   75: end_per_testcase(_Case, Config) ->
   76:     cleanup_pools(),
   77:     Config.
   78: 
   79: %%--------------------------------------------------------------------
   80: %% Tests
   81: %%--------------------------------------------------------------------
   82: 
   83: get_pools_returns_pool_names(_Config) ->
   84:     mongoose_wpool:start(generic, <<"b">>, c, [{workers, 1}]),
   85:     mongoose_wpool:start(generic, <<"e">>, f, [{workers, 1}]),
   86:     mongoose_wpool:start(generic, <<"h">>, i, [{workers, 1}]),
   87: 
   88:     ?assertEqual([{generic, <<"b">>, c}, {generic, <<"e">>, f}, {generic, <<"h">>, i}],
   89:                  ordsets:from_list(mongoose_wpool:get_pools())).
   90: 
   91: stats_passes_through_to_wpool_stats(_Config) ->
   92:     mongoose_wpool:start(generic, global, z, [{workers, 1}]),
   93:     Ref = make_ref(),
   94:     meck:expect(wpool, stats, fun(_Name) -> Ref end),
   95:     ?assertEqual(Ref, mongoose_wpool:stats(generic, global, z)).
   96: 
   97: filter_calls_to_start_sup_pool(Pid) ->
   98:     H = meck_history:get_history(Pid, mongoose_wpool),
   99:     F = fun({_, {mongoose_wpool, start_sup_pool, [_, PN, Args]}, _}) -> {true, {PN, Args}};
  100:            (_) -> false
  101:         end,
  102:     lists:filtermap(F, H).
  103: 
  104: two_distinct_redis_pools_are_started(_C) ->
  105:     PoolName1 = mongoose_wpool:make_pool_name(redis, global, default),
  106:     PoolName2 = mongoose_wpool:make_pool_name(redis, global, global_dist),
  107:     meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock([PoolName1, PoolName2])),
  108:     Pools = [#{type => redis, scope => global, tag => default, opts => #{workers => 2},
  109:                conn_opts => #{host => "localhost",
  110:                               port => 1805,
  111:                               database => 0,
  112:                               password => ""}},
  113:              #{type => redis, scope => global, tag => global_dist, opts => #{workers => 4},
  114:                conn_opts => #{host => "localhost2",
  115:                               port => 1806,
  116:                               database => 0,
  117:                               password => ""}}],
  118: 
  119:     [{ok, PoolName1}, {ok, PoolName2}] = mongoose_wpool:start_configured_pools(Pools),
  120: 
  121:     MgrPid = whereis(mongoose_wpool_mgr:name(redis)),
  122:     [{PoolName1, CallArgs1}, {PoolName2, CallArgs2}] = filter_calls_to_start_sup_pool(MgrPid),
  123:     ?assertEqual(2, proplists:get_value(workers, CallArgs1)),
  124:     ?assertEqual(4, proplists:get_value(workers, CallArgs2)),
  125:     {eredis_client, Props1} = proplists:get_value(worker, CallArgs1),
  126:     {eredis_client, Props2} = proplists:get_value(worker, CallArgs2),
  127:     ?assertMatch(#{host := "localhost", port := 1805}, proplists:to_map(Props1)),
  128:     ?assertMatch(#{host := "localhost2", port := 1806}, proplists:to_map(Props2)).
  129: 
  130: generic_pools_are_started_for_all_vhosts(_C) ->
  131:     Pools = [#{type => generic, scope => host, tag => default, opts => #{}, conn_opts => #{}}],
  132:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  133:     ?assertMatch([_, _, _], StartRes),
  134:     ?assertMatch([{generic, <<"a.com">>, default},
  135:                   {generic, <<"b.com">>, default},
  136:                   {generic, <<"c.eu">>, default}],
  137:                  ordsets:from_list(mongoose_wpool:get_pools())).
  138: 
  139: host_specific_pools_are_preserved(_C) ->
  140:     Pools = [#{type => generic, scope => host, tag => default, opts => #{}, conn_opts => #{}},
  141:              #{type => generic, scope => <<"b.com">>, tag => default,
  142:                opts => #{workers => 12}, conn_opts => #{}}],
  143:     Expanded = mongoose_wpool:expand_pools(Pools, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]),
  144:     ?assertMatch([#{type := generic, host_type := <<"a.com">>, tag := default,
  145:                     opts := [], conn_opts := #{}},
  146:                   #{type := generic, host_type := <<"c.eu">>, tag := default,
  147:                     opts := [], conn_opts := #{}},
  148:                   #{type := generic, host_type := <<"b.com">>, tag := default,
  149:                     opts := [{workers, 12}], conn_opts := #{}}],
  150:                  Expanded).
  151: 
  152: pools_for_different_tag_are_expanded_with_host_specific_config_preserved(_C) ->
  153:     Pools = [#{type => generic, scope => host, tag => default, opts => #{}, conn_opts => #{}},
  154:              #{type => generic, scope => <<"b.com">>, tag => default,
  155:                opts => #{workers => 12}, conn_opts => #{}},
  156:              #{type => generic, scope => host, tag => other_tag, opts => #{}, conn_opts => #{}}],
  157:     Expanded = mongoose_wpool:expand_pools(Pools, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]),
  158:     ?assertMatch([#{type := generic, host_type := <<"a.com">>, tag := default,
  159:                     opts := [], conn_opts := #{}},
  160:                   #{type := generic, host_type := <<"c.eu">>, tag := default,
  161:                     opts := [], conn_opts := #{}},
  162:                   #{type := generic, host_type := <<"b.com">>, tag := default,
  163:                     opts := [{workers, 12}], conn_opts := #{}},
  164:                   #{type := generic, host_type := <<"a.com">>, tag := other_tag,
  165:                     opts := [], conn_opts := #{}},
  166:                   #{type := generic, host_type := <<"b.com">>, tag := other_tag,
  167:                     opts := [], conn_opts := #{}},
  168:                   #{type := generic, host_type := <<"c.eu">>, tag := other_tag,
  169:                     opts := [], conn_opts := #{}}],
  170:                  Expanded).
  171: 
  172: global_pool_is_used_by_default(_C) ->
  173:     Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}},
  174:              #{type => generic, scope => <<"a.com">>, tag => default,
  175:                opts => #{}, conn_opts => #{}}],
  176:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  177:     ?assertMatch([{ok, _}, {ok, _}], StartRes),
  178:     meck:expect(wpool, call, fun(Name, _Req, _Strat, _Timeout) -> Name end),
  179:     ?assertEqual(mongoose_wpool:make_pool_name(generic, <<"a.com">>, default),
  180:                  mongoose_wpool:call(generic, <<"a.com">>, default, request)),
  181: 
  182:     ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default),
  183:                  mongoose_wpool:call(generic, <<"b.com">>, default, request)),
  184: 
  185:     ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default),
  186:                  mongoose_wpool:call(generic, global, default, request)).
  187: 
  188: request_behaves_as_gen_server_send_request(_C) ->
  189:     Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}} ],
  190:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  191:     ?assertMatch([{ok, _}], StartRes),
  192:     Req1 = mongoose_wpool:send_request(generic, {?MODULE, echo, [send_request]}),
  193:     ?assertEqual({reply, {ok, send_request}}, gen_server:wait_response(Req1, 5000)).
  194: 
  195: dead_pool_is_restarted(_C) ->
  196:     Size = 3,
  197:     {PoolName, KillingSwitch} = start_killable_pool(Size, kill_and_restart),
  198:     %% set the switch to kill every new worker
  199:     set_killing_switch(KillingSwitch, true),
  200:     Pids = [whereis(wpool_pool:worker_name(PoolName, I)) || I <- lists:seq(1, Size)],
  201:     %% kill existing workers so they will be restarted
  202:     [erlang:exit(Pid, kill) || Pid <- Pids, Pid =/= undefined],
  203: 
  204:     wait_until_pool_is_dead(PoolName),
  205:     %% set the switch to stop killing workers
  206:     set_killing_switch(KillingSwitch, false),
  207: 
  208:     %% wait until the pool is restarted by the manager
  209:     Fun = fun() ->
  210:                   case erlang:whereis(PoolName) of
  211:                       undefined -> false;
  212:                       _ -> true
  213:                   end
  214:           end,
  215: 
  216:     async_helper:wait_until(Fun, true),
  217: 
  218:     meck:unload(killing_workers).
  219: 
  220: dead_pool_is_stopped_before_restarted(_C) ->
  221:     Size = 3,
  222:     Tag = kill_stop_before_restart,
  223:     {PoolName, KillingSwitch} = start_killable_pool(Size, Tag),
  224:     %% set the switch to kill every new worker
  225:     set_killing_switch(KillingSwitch, true),
  226:     %% kill existing workers so they will be restarted
  227:     [erlang:exit(whereis(wpool_pool:worker_name(PoolName, I)), kill) ||
  228:      I <- lists:seq(1, Size)],
  229: 
  230:     wait_until_pool_is_dead(PoolName),
  231:     %% stop the pool before it's restarted
  232:     mongoose_wpool:stop(generic, global, Tag),
  233:     %% set the switch to stop killing workers
  234:     set_killing_switch(KillingSwitch, false),
  235:     %% wait 4s (the restart attempt will happen after 2s)
  236:     %% and check if the pool is started, it should not be
  237:     timer:sleep(timer:seconds(4)),
  238:     ?assertEqual(undefined, erlang:whereis(PoolName)),
  239:     meck:unload(killing_workers).
  240: 
  241: %% --- available_worker strategy is banned for some backends --
  242: 
  243: redis_pool_cant_be_started_with_available_worker_strategy(_Config) ->
  244:     Type = redis,
  245:     Host = global,
  246:     Tag = default,
  247:     PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag),
  248:     meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)),
  249:     PoolDef = [#{type => Type, scope => Host, tag => Tag, opts => #{strategy => available_worker},
  250:                  conn_opts => #{address => "localhost", port => 1805}}],
  251:     ?assertError({strategy_not_supported, Type, Host, Tag, available_worker},
  252:                  mongoose_wpool:start_configured_pools(PoolDef)).
  253: 
  254: cassandra_prepare_opts(_Config) ->
  255:     %% Check that we pass auth options in the correct format to the Cassandra driver
  256:     AuthCfg = #{auth => #{plain => #{username => <<"user">>, password => <<"password">>}}},
  257:     ?assertEqual([{auth, {cqerl_auth_plain_handler, [{<<"user">>, <<"password">>}]}},
  258:                   {tcp_opts, [{keepalive, true}]}],
  259:                   mongoose_wpool_cassandra:prepare_cqerl_opts(AuthCfg)).
  260: 
  261: %%--------------------------------------------------------------------
  262: %% Helpers
  263: %%--------------------------------------------------------------------
  264: 
  265: start_sup_pool_mock(PoolNames) when is_list(PoolNames) ->
  266:     fun(Type, PN, Opts) ->
  267:             case lists:member(PN, PoolNames) of
  268:                 true ->
  269:                     {ok, PN}; %% we don't really need a pid here for mocking
  270:                 _ ->
  271:                     meck:passthrough([Type, PN, Opts])
  272:             end
  273:     end;
  274: start_sup_pool_mock(PoolName) ->
  275:     start_sup_pool_mock([PoolName]).
  276: 
  277: cleanup_pools() ->
  278:     lists:foreach(fun({Type, Host, Tag}) -> mongoose_wpool:stop(Type, Host, Tag) end,
  279:                   mongoose_wpool:get_pools()).
  280: 
  281: start_killable_pool(Size, Tag) ->
  282:     KillingSwitch = ets:new(killing_switch, [public]),
  283:     ets:insert(KillingSwitch, {kill_worker, false}),
  284:     meck:new(killing_workers, [non_strict]),
  285:     meck:expect(killing_workers, handle_worker_creation, kill_worker_fun(KillingSwitch)),
  286:     Pools = [#{type => generic, scope => global, tag => Tag,
  287:                opts => #{workers => Size,
  288:                          enable_callbacks => true,
  289:                          callbacks => [killing_workers]},
  290:                conn_opts => #{}}],
  291:     [{ok, _Pid}] = mongoose_wpool:start_configured_pools(Pools),
  292:     PoolName = mongoose_wpool:make_pool_name(generic, global, Tag),
  293:     {PoolName, KillingSwitch}.
  294: 
  295: kill_worker_fun(KillingSwitch) ->
  296:     fun(AWorkerName) ->
  297:             case ets:lookup_element(KillingSwitch, kill_worker, 2) of
  298:                 true ->
  299:                     ct:pal("I'll be killing ~p", [AWorkerName]),
  300:                     erlang:exit(whereis(AWorkerName), kill);
  301:                 _ ->
  302:                     ok
  303:             end
  304:     end.
  305: 
  306: wait_until_pool_is_dead(PoolName) ->
  307:     %% Wait until the pool is killed due to too many restarts
  308:     Pid = whereis(PoolName),
  309:     MRef = erlang:monitor(process, Pid),
  310:     receive
  311:         {'DOWN', MRef, process, Pid, _} ->
  312:             ok
  313:     after
  314:         timer:minutes(3) ->
  315:             ct:fail("pool not stopped")
  316:     end.
  317: 
  318: set_killing_switch(KillingSwitch, Value) ->
  319:     ets:update_element(KillingSwitch, kill_worker, {2, Value}).
  320: 
  321: echo(Val) -> Val.