1: -module(batches_SUITE).
    2: -compile([export_all, nowarn_export_all]).
    3: -behaviour(gen_server).
    4: 
    5: -include_lib("stdlib/include/assert.hrl").
    6: -define(mod(N), list_to_atom(atom_to_list(?FUNCTION_NAME) ++ integer_to_list(N))).
    7: 
    8: all() ->
    9:     [
   10:      {group, cache},
   11:      {group, async_workers}
   12:     ].
   13: 
   14: groups() ->
   15:     [
   16:      {cache, [sequence],
   17:       [
   18:        internal_starts_another_cache,
   19:        external_does_not_start_another_cache,
   20:        internal_stop_does_stop_the_cache,
   21:        external_stop_does_nothing,
   22:        shared_cache_inserts_in_shared_table
   23:       ]},
   24:      {async_workers, [sequence],
   25:       [
   26:        filled_batch_raises_batch_metric,
   27:        unfilled_batch_raises_flush_metric,
   28:        timeouts_and_canceled_timers_do_not_need_to_log_messages,
   29:        prepare_task_works,
   30:        sync_flushes_down_everything,
   31:        sync_aggregates_down_everything,
   32:        aggregating_error_is_handled,
   33:        async_request
   34:       ]}
   35:     ].
   36: 
   37: init_per_suite(Config) ->
   38:     meck:new(telemetry, [stub_all, no_link]),
   39:     meck:new(mongoose_metrics, [stub_all, no_link]),
   40:     Config.
   41: 
   42: end_per_suite(_Config) ->
   43:     meck:unload().
   44: 
   45: init_per_group(_, Config) ->
   46:     Config.
   47: 
   48: end_per_group(_, _Config) ->
   49:     ok.
   50: 
   51: init_per_testcase(_TestCase, Config) ->
   52:     pg:start_link(),
   53:     mim_ct_sup:start_link(ejabberd_sup),
   54:     meck:new(gen_mod, [passthrough]),
   55:     Config.
   56: 
   57: end_per_testcase(_TestCase, _Config) ->
   58:     meck:unload(gen_mod),
   59:     ok.
   60: 
   61: %% Tests
   62: internal_starts_another_cache(_) ->
   63:     mongoose_user_cache:start_new_cache(host_type(), ?mod(1), []),
   64:     mongoose_user_cache:start_new_cache(host_type(), ?mod(2), [{module, internal}]),
   65:     L = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   66:     ?assertEqual(2, length(L)).
   67: 
   68: external_does_not_start_another_cache(_) ->
   69:     mongoose_user_cache:start_new_cache(host_type(), ?mod(1), []),
   70:     mongoose_user_cache:start_new_cache(host_type(), ?mod(2), [{module, ?mod(1)}]),
   71:     L = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   72:     ?assertEqual(1, length(L)).
   73: 
   74: internal_stop_does_stop_the_cache(_) ->
   75:     meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> internal end),
   76:     mongoose_user_cache:start_new_cache(host_type(), ?mod(1), []),
   77:     mongoose_user_cache:start_new_cache(host_type(), ?mod(2), [{module, internal}]),
   78:     L1 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   79:     ct:pal("Value ~p~n", [L1]),
   80:     mongoose_user_cache:stop_cache(host_type(), ?mod(2)),
   81:     L2 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   82:     ct:pal("Value ~p~n", [L2]),
   83:     ?assertNotEqual(L1, L2).
   84: 
   85: external_stop_does_nothing(_) ->
   86:     meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> ?mod(1) end),
   87:     mongoose_user_cache:start_new_cache(host_type(), ?mod(1), []),
   88:     mongoose_user_cache:start_new_cache(host_type(), ?mod(2), [{module, ?mod(1)}]),
   89:     L1 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   90:     mongoose_user_cache:stop_cache(host_type(), ?mod(2)),
   91:     L2 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   92:     ?assertEqual(L1, L2).
   93: 
   94: shared_cache_inserts_in_shared_table(_) ->
   95:     meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> ?mod(1) end),
   96:     mongoose_user_cache:start_new_cache(host_type(), ?mod(1), []),
   97:     mongoose_user_cache:start_new_cache(host_type(), ?mod(2), [{module, ?mod(1)}]),
   98:     mongoose_user_cache:merge_entry(host_type(), ?mod(2), some_jid(), #{}),
   99:     ?assert(mongoose_user_cache:is_member(host_type(), ?mod(1), some_jid())).
  100: 
  101: filled_batch_raises_batch_metric(_) ->
  102:     Opts = #{host_type => host_type(),
  103:              pool_id => ?FUNCTION_NAME,
  104:              batch_size => 1,
  105:              flush_interval => 1000,
  106:              flush_callback => fun(_, _) -> ok end,
  107:              flush_extra => #{host_type => host_type(), queue_length => 0}},
  108:     {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
  109:     gen_server:cast(Pid, {task, key, ok}),
  110:     MetricName = [mongoose_async_pools, '_', batch_flushes],
  111:     async_helper:wait_until(
  112:       fun() -> 0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_']) end, true).
  113: 
  114: unfilled_batch_raises_flush_metric(_) ->
  115:     Opts = #{host_type => host_type(),
  116:              pool_id => ?FUNCTION_NAME,
  117:              batch_size => 1000,
  118:              flush_interval => 5,
  119:              flush_callback => fun(_, _) -> ok end,
  120:              flush_extra => #{host_type => host_type(), queue_length => 0}},
  121:     {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
  122:     gen_server:cast(Pid, {task, key, ok}),
  123:     MetricName = [mongoose_async_pools, '_', timed_flushes],
  124:     async_helper:wait_until(
  125:       fun() -> 0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_']) end, true).
  126: 
  127: timeouts_and_canceled_timers_do_not_need_to_log_messages(_) ->
  128:     Timeout = 10,
  129:     QueueSize = 2,
  130:     meck:new(logger, [passthrough, unstick]),
  131:     Opts = #{host_type => host_type(),
  132:              pool_id => ?FUNCTION_NAME,
  133:              batch_size => QueueSize,
  134:              flush_interval => Timeout,
  135:              flush_callback => fun(_, _) -> ok end,
  136:              flush_extra => #{host_type => host_type(), queue_length => 0}},
  137:     {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
  138:     [ gen_server:cast(Pid, {task, ok}) || _ <- lists:seq(1, QueueSize) ],
  139:     ct:sleep(Timeout*2),
  140:     ?assertEqual(0, meck:num_calls(logger, macro_log, '_')).
  141: 
  142: prepare_task_works(_) ->
  143:     Timeout = 1000,
  144:     QueueSize = 2,
  145:     T = self(),
  146:     meck:new(logger, [passthrough, unstick]),
  147:     Opts = #{host_type => host_type(),
  148:              pool_id => ?FUNCTION_NAME,
  149:              batch_size => QueueSize,
  150:              flush_interval => Timeout,
  151:              prep_callback => fun(0, _) -> {error, bad};
  152:                                  (A, _) -> {ok, A + 1}
  153:                               end,
  154:              flush_callback => fun(Tasks, _) -> T ! {tasks, Tasks}, ok end,
  155:              flush_extra => #{host_type => host_type(), queue_length => 0}},
  156:     {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
  157:     [ gen_server:cast(Pid, {task, N}) || N <- lists:seq(0, QueueSize) ],
  158:     receive
  159:         {tasks, Tasks} ->
  160:             ?assertEqual([ N + 1 || N <- lists:seq(1, QueueSize) ], Tasks)
  161:     after
  162:         Timeout*2 -> ct:fail(no_answer_received)
  163:     end,
  164:     ?assert(0 < meck:num_calls(logger, macro_log, '_')).
  165: 
  166: sync_flushes_down_everything(_) ->
  167:     Opts = #{host_type => host_type(),
  168:              pool_id => ?FUNCTION_NAME,
  169:              batch_size => 5000,
  170:              flush_interval => 5000,
  171:              flush_callback => fun(_, _) -> ok end,
  172:              flush_extra => #{host_type => host_type(), queue_length => 0}},
  173:     {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
  174:     ?assertEqual(skipped, gen_server:call(Pid, sync)),
  175:     gen_server:cast(Pid, {task, key, ok}),
  176:     ?assertEqual(ok, gen_server:call(Pid, sync)),
  177:     MetricName = [mongoose_async_pools, '_', timed_flushes],
  178:     ?assert(0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_'])).
  179: 
  180: sync_aggregates_down_everything(_) ->
  181:     {ok, Server} = gen_server:start_link(?MODULE, [], []),
  182:     Opts = #{host_type => host_type(),
  183:              pool_id => ?FUNCTION_NAME,
  184:              request_callback => fun(Task, _) -> timer:sleep(1), gen_server:send_request(Server, Task) end,
  185:              aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end,
  186:              verify_callback => fun(ok, _T, _) -> ok end,
  187:              flush_extra => #{host_type => host_type()}},
  188:     {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
  189:     ?assertEqual(skipped, gen_server:call(Pid, sync)),
  190:     [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ],
  191:     ?assertEqual(ok, gen_server:call(Pid, sync)),
  192:     ?assertEqual(500500, gen_server:call(Server, get_acc)).
  193: 
  194: aggregating_error_is_handled(_) ->
  195:     {ok, Server} = gen_server:start_link(?MODULE, [], []),
  196:     Opts = #{host_type => host_type(),
  197:              pool_id => ?FUNCTION_NAME,
  198:              request_callback => fun(_, _) -> gen_server:send_request(Server, return_error) end,
  199:              aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end,
  200:              verify_callback => fun(ok, _T, _) -> ok end,
  201:              flush_extra => #{host_type => host_type()}},
  202:     {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
  203:     gen_server:cast(Pid, {task, key, 0}),
  204:     async_helper:wait_until(
  205:       fun() -> gen_server:call(Server, get_acc) end, 0).
  206: 
  207: async_request(_) ->
  208:     {ok, Server} = gen_server:start_link(?MODULE, [], []),
  209:     Opts = #{host_type => host_type(),
  210:              pool_id => ?FUNCTION_NAME,
  211:              request_callback => fun(Task, _) -> timer:sleep(1), gen_server:send_request(Server, Task) end,
  212:              aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end,
  213:              verify_callback => fun(ok, _T, _) -> ok end,
  214:              flush_extra => #{host_type => host_type()}},
  215:     {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
  216:     [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ],
  217:     async_helper:wait_until(
  218:       fun() -> gen_server:call(Server, get_acc) end, 500500).
  219: 
  220: %% helpers
  221: host_type() ->
  222:     <<"HostType">>.
  223: 
  224: some_jid() ->
  225:     jid:make_noprep(<<"alice">>, <<"localhost">>, <<>>).
  226: 
  227: init([]) ->
  228:     {ok, 0}.
  229: 
  230: handle_call(get_acc, _From, Acc) ->
  231:     {reply, Acc, Acc};
  232: handle_call(return_error, _From, Acc) ->
  233:     {reply, {error, return_error}, Acc};
  234: handle_call(N, _From, Acc) ->
  235:     {reply, ok, N + Acc}.
  236: 
  237: handle_cast(_Msg, Acc) ->
  238:     {noreply, Acc}.