1: %%==============================================================================
    2: %% Copyright 2018 Erlang Solutions Ltd.
    3: %%
    4: %% Licensed under the Apache License, Version 2.0 (the "License");
    5: %% you may not use this file except in compliance with the License.
    6: %% You may obtain a copy of the License at
    7: %%
    8: %% http://www.apache.org/licenses/LICENSE-2.0
    9: %%
   10: %% Unless required by applicable law or agreed to in writing, software
   11: %% distributed under the License is distributed on an "AS IS" BASIS,
   12: %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   13: %% See the License for the specific language governing permissions and
   14: %% limitations under the License.
   15: %%==============================================================================
   16: 
   17: -module(mongoose_wpool_SUITE).
   18: -compile([export_all, nowarn_export_all]).
   19: -author('konrad.zemek@erlang-solutions.com').
   20: 
   21: -include_lib("common_test/include/ct.hrl").
   22: -include_lib("eunit/include/eunit.hrl").
   23: 
   24: %%--------------------------------------------------------------------
   25: %% Suite configuration
   26: %%--------------------------------------------------------------------
   27: 
   28: all() ->
   29:     [
   30:      get_pools_returns_pool_names,
   31:      stats_passes_through_to_wpool_stats,
   32:      two_distinct_redis_pools_are_started,
   33:      generic_pools_are_started_for_all_vhosts,
   34:      host_specific_pools_are_preserved,
   35:      pools_for_different_tag_are_expanded_with_host_specific_config_preserved,
   36:      global_pool_is_used_by_default,
   37:      request_behaves_as_gen_server_send_request,
   38:      dead_pool_is_restarted,
   39:      dead_pool_is_stopped_before_restarted,
   40:      redis_pool_cant_be_started_with_available_worker_strategy,
   41:      cassandra_prepare_opts
   42:     ].
   43: 
   44: %%--------------------------------------------------------------------
   45: %% Init & teardown
   46: %%--------------------------------------------------------------------
   47: 
   48: init_per_suite(Config) ->
   49:     ok = meck:new(wpool, [no_link, passthrough]),
   50:     ok = meck:new(mongoose_wpool, [no_link, passthrough]),
   51:     mongoose_config:set_opts(opts()),
   52:     Self = self(),
   53:     spawn(fun() ->
   54:                   register(test_helper, self()),
   55:                   mongoose_wpool:ensure_started(),
   56:                   Self ! ready,
   57:                   receive stop -> ok end
   58:           end),
   59:     receive ready -> ok end,
   60:     Config.
   61: 
   62: end_per_suite(Config) ->
   63:     meck:unload(wpool),
   64:     whereis(test_helper) ! stop,
   65:     mongoose_config:erase_opts(),
   66:     Config.
   67: 
   68: opts() ->
   69:     #{hosts => [<<"a.com">>, <<"b.com">>, <<"c.eu">>], host_types => []}.
   70: 
   71: init_per_testcase(_Case, Config) ->
   72:     cleanup_pools(),
   73:     Config.
   74: 
   75: end_per_testcase(_Case, Config) ->
   76:     cleanup_pools(),
   77:     Config.
   78: 
   79: %%--------------------------------------------------------------------
   80: %% Tests
   81: %%--------------------------------------------------------------------
   82: 
   83: get_pools_returns_pool_names(_Config) ->
   84:     mongoose_wpool:start(generic, <<"b">>, c, [{workers, 1}]),
   85:     mongoose_wpool:start(generic, <<"e">>, f, [{workers, 1}]),
   86:     mongoose_wpool:start(generic, <<"h">>, i, [{workers, 1}]),
   87: 
   88:     ?assertEqual([{generic, <<"b">>, c}, {generic, <<"e">>, f}, {generic, <<"h">>, i}],
   89:                  ordsets:from_list(mongoose_wpool:get_pools())).
   90: 
   91: stats_passes_through_to_wpool_stats(_Config) ->
   92:     mongoose_wpool:start(generic, global, z, [{workers, 1}]),
   93:     Ref = make_ref(),
   94:     meck:expect(wpool, stats, fun(_Name) -> Ref end),
   95:     ?assertEqual(Ref, mongoose_wpool:stats(generic, global, z)).
   96: 
   97: filter_calls_to_start_sup_pool(Pid) ->
   98:     H = meck_history:get_history(Pid, mongoose_wpool),
   99:     F = fun({_, {mongoose_wpool, start_sup_pool, [_, PN, Args]}, _}) -> {true, {PN, Args}};
  100:            (_) -> false
  101:         end,
  102:     lists:filtermap(F, H).
  103: 
  104: two_distinct_redis_pools_are_started(_C) ->
  105:     PoolName1 = mongoose_wpool:make_pool_name(redis, global, default),
  106:     PoolName2 = mongoose_wpool:make_pool_name(redis, global, global_dist),
  107:     meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock([PoolName1, PoolName2])),
  108:     Pools = [#{type => redis, scope => global, tag => default, opts => #{workers => 2},
  109:                conn_opts => #{host => "localhost",
  110:                               port => 1805,
  111:                               database => 0,
  112:                               password => ""}},
  113:              #{type => redis, scope => global, tag => global_dist, opts => #{workers => 4},
  114:                conn_opts => #{host => "localhost2",
  115:                               port => 1806,
  116:                               database => 0,
  117:                               password => ""}}],
  118: 
  119:     [{ok, PoolName1}, {ok, PoolName2}] = mongoose_wpool:start_configured_pools(Pools),
  120: 
  121:     MgrPid = whereis(mongoose_wpool_mgr:name(redis)),
  122:     [{PoolName1, CallArgs1}, {PoolName2, CallArgs2}] = filter_calls_to_start_sup_pool(MgrPid),
  123:     ?assertEqual(2, proplists:get_value(workers, CallArgs1)),
  124:     ?assertEqual(4, proplists:get_value(workers, CallArgs2)),
  125:     {eredis_client, Props1} = proplists:get_value(worker, CallArgs1),
  126:     {eredis_client, Props2} = proplists:get_value(worker, CallArgs2),
  127:     ?assertMatch(#{host := "localhost", port := 1805}, proplists:to_map(Props1)),
  128:     ?assertMatch(#{host := "localhost2", port := 1806}, proplists:to_map(Props2)).
  129: 
  130: generic_pools_are_started_for_all_vhosts(_C) ->
  131:     Pools = [#{type => generic, scope => host_type, tag => default, opts => #{}, conn_opts => #{}}],
  132:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  133:     ?assertMatch([_, _, _], StartRes),
  134:     ?assertMatch([{generic, <<"a.com">>, default},
  135:                   {generic, <<"b.com">>, default},
  136:                   {generic, <<"c.eu">>, default}],
  137:                  ordsets:from_list(mongoose_wpool:get_pools())).
  138: 
  139: host_specific_pools_are_preserved(_C) ->
  140:     Pools = [#{type => generic, scope => host_type, tag => default, opts => #{}, conn_opts => #{}}],
  141:     HostSpecific = [#{type => generic, scope => <<"b.com">>, tag => default,
  142:                       opts => #{workers => 12}, conn_opts => #{}}],
  143:     Expanded = mongoose_wpool:expand_pools(
  144:                  Pools, HostSpecific, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]),
  145:     Expected = lists:sort([#{type => generic, host_type => <<"a.com">>, tag => default,
  146:                              opts => [], conn_opts => #{}},
  147:                            #{type => generic, host_type => <<"c.eu">>, tag => default,
  148:                              opts => [], conn_opts => #{}},
  149:                            #{type => generic, host_type => <<"b.com">>, tag => default,
  150:                              opts => [{workers, 12}], conn_opts => #{}}]),
  151:     ?assertMatch(Expected, lists:sort(Expanded)).
  152: 
  153: pools_for_different_tag_are_expanded_with_host_specific_config_preserved(_C) ->
  154:     Pools = [#{type => generic, scope => host_type, tag => default, opts => #{}, conn_opts => #{}},
  155:              #{type => generic, scope => host_type, tag => other_tag,
  156:                opts => #{}, conn_opts => #{}}],
  157:     HostSpecific = [#{type => generic, scope => <<"b.com">>, tag => default,
  158:                       opts => #{workers => 12}, conn_opts => #{}}],
  159:     Expanded = mongoose_wpool:expand_pools(
  160:                  Pools, HostSpecific, [<<"a.com">>, <<"b.com">>, <<"c.eu">>]),
  161:     Expected = lists:sort([#{type => generic, host_type => <<"a.com">>, tag => default,
  162:                              opts => [], conn_opts => #{}},
  163:                            #{type => generic, host_type => <<"c.eu">>, tag => default,
  164:                              opts => [], conn_opts => #{}},
  165:                            #{type => generic, host_type => <<"b.com">>, tag => default,
  166:                              opts => [{workers, 12}], conn_opts => #{}},
  167:                            #{type => generic, host_type => <<"a.com">>, tag => other_tag,
  168:                              opts => [], conn_opts => #{}},
  169:                            #{type => generic, host_type => <<"b.com">>, tag => other_tag,
  170:                              opts => [], conn_opts => #{}},
  171:                            #{type => generic, host_type => <<"c.eu">>, tag => other_tag,
  172:                              opts => [], conn_opts => #{}}]),
  173:     ?assertMatch(Expected, lists:sort(Expanded)).
  174: 
  175: global_pool_is_used_by_default(_C) ->
  176:     Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}},
  177:              #{type => generic, scope => <<"a.com">>, tag => default,
  178:                opts => #{}, conn_opts => #{}}],
  179:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  180:     ?assertMatch([{ok, _}, {ok, _}], StartRes),
  181:     meck:expect(wpool, call, fun(Name, _Req, _Strat, _Timeout) -> Name end),
  182:     ?assertEqual(mongoose_wpool:make_pool_name(generic, <<"a.com">>, default),
  183:                  mongoose_wpool:call(generic, <<"a.com">>, default, request)),
  184: 
  185:     ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default),
  186:                  mongoose_wpool:call(generic, <<"b.com">>, default, request)),
  187: 
  188:     ?assertEqual(mongoose_wpool:make_pool_name(generic, global, default),
  189:                  mongoose_wpool:call(generic, global, default, request)).
  190: 
  191: request_behaves_as_gen_server_send_request(_C) ->
  192:     Pools = [#{type => generic, scope => global, tag => default, opts => #{}, conn_opts => #{}} ],
  193:     StartRes = mongoose_wpool:start_configured_pools(Pools),
  194:     ?assertMatch([{ok, _}], StartRes),
  195:     Req1 = mongoose_wpool:send_request(generic, {?MODULE, echo, [send_request]}),
  196:     ?assertEqual({reply, {ok, send_request}}, gen_server:wait_response(Req1, 5000)).
  197: 
  198: dead_pool_is_restarted(_C) ->
  199:     Size = 3,
  200:     {PoolName, KillingSwitch} = start_killable_pool(Size, kill_and_restart),
  201:     %% set the switch to kill every new worker
  202:     set_killing_switch(KillingSwitch, true),
  203:     Pids = [whereis(wpool_pool:worker_name(PoolName, I)) || I <- lists:seq(1, Size)],
  204:     %% kill existing workers so they will be restarted
  205:     [erlang:exit(Pid, kill) || Pid <- Pids, Pid =/= undefined],
  206: 
  207:     wait_until_pool_is_dead(PoolName),
  208:     %% set the switch to stop killing workers
  209:     set_killing_switch(KillingSwitch, false),
  210: 
  211:     %% wait until the pool is restarted by the manager
  212:     Fun = fun() ->
  213:                   case erlang:whereis(PoolName) of
  214:                       undefined -> false;
  215:                       _ -> true
  216:                   end
  217:           end,
  218: 
  219:     async_helper:wait_until(Fun, true),
  220: 
  221:     meck:unload(killing_workers).
  222: 
  223: dead_pool_is_stopped_before_restarted(_C) ->
  224:     Size = 3,
  225:     Tag = kill_stop_before_restart,
  226:     {PoolName, KillingSwitch} = start_killable_pool(Size, Tag),
  227:     %% set the switch to kill every new worker
  228:     set_killing_switch(KillingSwitch, true),
  229:     %% kill existing workers so they will be restarted
  230:     [erlang:exit(whereis(wpool_pool:worker_name(PoolName, I)), kill) ||
  231:      I <- lists:seq(1, Size)],
  232: 
  233:     wait_until_pool_is_dead(PoolName),
  234:     %% stop the pool before it's restarted
  235:     mongoose_wpool:stop(generic, global, Tag),
  236:     %% set the switch to stop killing workers
  237:     set_killing_switch(KillingSwitch, false),
  238:     %% wait 4s (the restart attempt will happen after 2s)
  239:     %% and check if the pool is started, it should not be
  240:     timer:sleep(timer:seconds(4)),
  241:     ?assertEqual(undefined, erlang:whereis(PoolName)),
  242:     meck:unload(killing_workers).
  243: 
  244: %% --- available_worker strategy is banned for some backends --
  245: 
  246: redis_pool_cant_be_started_with_available_worker_strategy(_Config) ->
  247:     Type = redis,
  248:     Host = global,
  249:     Tag = default,
  250:     PoolName = mongoose_wpool:make_pool_name(Type, Host, Tag),
  251:     meck:expect(mongoose_wpool, start_sup_pool, start_sup_pool_mock(PoolName)),
  252:     PoolDef = [#{type => Type, scope => Host, tag => Tag, opts => #{strategy => available_worker},
  253:                  conn_opts => #{address => "localhost", port => 1805}}],
  254:     ?assertError({strategy_not_supported, Type, Host, Tag, available_worker},
  255:                  mongoose_wpool:start_configured_pools(PoolDef)).
  256: 
  257: cassandra_prepare_opts(_Config) ->
  258:     %% Check that we pass auth options in the correct format to the Cassandra driver
  259:     AuthCfg = #{auth => #{plain => #{username => <<"user">>, password => <<"password">>}}},
  260:     ?assertEqual([{auth, {cqerl_auth_plain_handler, [{<<"user">>, <<"password">>}]}},
  261:                   {tcp_opts, [{keepalive, true}]}],
  262:                   mongoose_wpool_cassandra:prepare_cqerl_opts(AuthCfg)).
  263: 
  264: %%--------------------------------------------------------------------
  265: %% Helpers
  266: %%--------------------------------------------------------------------
  267: 
  268: start_sup_pool_mock(PoolNames) when is_list(PoolNames) ->
  269:     fun(Type, PN, Opts) ->
  270:             case lists:member(PN, PoolNames) of
  271:                 true ->
  272:                     {ok, PN}; %% we don't really need a pid here for mocking
  273:                 _ ->
  274:                     meck:passthrough([Type, PN, Opts])
  275:             end
  276:     end;
  277: start_sup_pool_mock(PoolName) ->
  278:     start_sup_pool_mock([PoolName]).
  279: 
  280: cleanup_pools() ->
  281:     lists:foreach(fun({Type, Host, Tag}) -> mongoose_wpool:stop(Type, Host, Tag) end,
  282:                   mongoose_wpool:get_pools()).
  283: 
  284: start_killable_pool(Size, Tag) ->
  285:     KillingSwitch = ets:new(killing_switch, [public]),
  286:     ets:insert(KillingSwitch, {kill_worker, false}),
  287:     meck:new(killing_workers, [non_strict]),
  288:     meck:expect(killing_workers, handle_worker_creation, kill_worker_fun(KillingSwitch)),
  289:     Pools = [#{type => generic, scope => global, tag => Tag,
  290:                opts => #{workers => Size,
  291:                          enable_callbacks => true,
  292:                          callbacks => [killing_workers]},
  293:                conn_opts => #{}}],
  294:     [{ok, _Pid}] = mongoose_wpool:start_configured_pools(Pools),
  295:     PoolName = mongoose_wpool:make_pool_name(generic, global, Tag),
  296:     {PoolName, KillingSwitch}.
  297: 
  298: kill_worker_fun(KillingSwitch) ->
  299:     fun(AWorkerName) ->
  300:             case ets:lookup_element(KillingSwitch, kill_worker, 2) of
  301:                 true ->
  302:                     ct:pal("I'll be killing ~p", [AWorkerName]),
  303:                     erlang:exit(whereis(AWorkerName), kill);
  304:                 _ ->
  305:                     ok
  306:             end
  307:     end.
  308: 
  309: wait_until_pool_is_dead(PoolName) ->
  310:     %% Wait until the pool is killed due to too many restarts
  311:     Pid = whereis(PoolName),
  312:     MRef = erlang:monitor(process, Pid),
  313:     receive
  314:         {'DOWN', MRef, process, Pid, _} ->
  315:             ok
  316:     after
  317:         timer:minutes(3) ->
  318:             ct:fail("pool not stopped")
  319:     end.
  320: 
  321: set_killing_switch(KillingSwitch, Value) ->
  322:     ets:update_element(KillingSwitch, kill_worker, {2, Value}).
  323: 
  324: echo(Val) -> Val.