1: -module(batches_SUITE).
    2: -compile([export_all, nowarn_export_all]).
    3: -behaviour(gen_server).
    4: 
    5: -include_lib("stdlib/include/assert.hrl").
    6: -define(mod(N), list_to_atom(atom_to_list(?FUNCTION_NAME) ++ integer_to_list(N))).
    7: 
    8: all() ->
    9:     [
   10:      {group, cache},
   11:      {group, async_workers}
   12:     ].
   13: 
   14: groups() ->
   15:     [
   16:      {cache, [sequence],
   17:       [
   18:        internal_starts_another_cache,
   19:        external_does_not_start_another_cache,
   20:        internal_stop_does_stop_the_cache,
   21:        external_stop_does_nothing,
   22:        shared_cache_inserts_in_shared_table
   23:       ]},
   24:      {async_workers, [sequence],
   25:       [
   26:        filled_batch_raises_batch_metric,
   27:        unfilled_batch_raises_flush_metric,
   28:        timeouts_and_canceled_timers_do_not_need_to_log_messages,
   29:        prepare_task_works,
   30:        sync_flushes_down_everything,
   31:        sync_aggregates_down_everything,
   32:        aggregating_error_is_handled,
   33:        async_request
   34:       ]}
   35:     ].
   36: 
   37: init_per_suite(Config) ->
   38:     meck:new(telemetry, [stub_all, no_link]),
   39:     meck:new(mongoose_metrics, [stub_all, no_link]),
   40:     Config.
   41: 
   42: end_per_suite(_Config) ->
   43:     meck:unload().
   44: 
   45: init_per_group(_, Config) ->
   46:     Config.
   47: 
   48: end_per_group(_, _Config) ->
   49:     ok.
   50: 
   51: init_per_testcase(_TestCase, Config) ->
   52:     pg:start_link(),
   53:     mim_ct_sup:start_link(ejabberd_sup),
   54:     meck:new(gen_mod, [passthrough]),
   55:     Config.
   56: 
   57: end_per_testcase(_TestCase, _Config) ->
   58:     meck:unload(gen_mod),
   59:     ok.
   60: 
   61: cache_config() ->
   62:     config_parser_helper:default_mod_config(mod_cache_users).
   63: 
   64: cache_config(internal) ->
   65:     Def = config_parser_helper:default_mod_config(mod_cache_users),
   66:     Def#{module => internal};
   67: cache_config(Module) ->
   68:     #{module => Module}.
   69: 
   70: %% Tests
   71: internal_starts_another_cache(_) ->
   72:     mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()),
   73:     mongoose_user_cache:start_new_cache(host_type(), ?mod(2), cache_config(internal)),
   74:     L = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   75:     ?assertEqual(2, length(L)).
   76: 
   77: external_does_not_start_another_cache(_) ->
   78:     mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()),
   79:     mongoose_user_cache:start_new_cache(host_type(), ?mod(2), cache_config(?mod(1))),
   80:     L = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   81:     ?assertEqual(1, length(L)).
   82: 
   83: internal_stop_does_stop_the_cache(_) ->
   84:     meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> internal end),
   85:     mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()),
   86:     mongoose_user_cache:start_new_cache(host_type(), ?mod(2), cache_config(internal)),
   87:     L1 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   88:     ct:pal("Value ~p~n", [L1]),
   89:     mongoose_user_cache:stop_cache(host_type(), ?mod(2)),
   90:     L2 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   91:     ct:pal("Value ~p~n", [L2]),
   92:     ?assertNotEqual(L1, L2).
   93: 
   94: external_stop_does_nothing(_) ->
   95:     meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> ?mod(1) end),
   96:     mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()),
   97:     mongoose_user_cache:start_new_cache(host_type(), ?mod(2), cache_config(?mod(1))),
   98:     L1 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
   99:     mongoose_user_cache:stop_cache(host_type(), ?mod(2)),
  100:     L2 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)],
  101:     ?assertEqual(L1, L2).
  102: 
  103: shared_cache_inserts_in_shared_table(_) ->
  104:     meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> ?mod(1) end),
  105:     mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()),
  106:     mongoose_user_cache:start_new_cache(host_type(), ?mod(2), cache_config(?mod(1))),
  107:     mongoose_user_cache:merge_entry(host_type(), ?mod(2), some_jid(), #{}),
  108:     ?assert(mongoose_user_cache:is_member(host_type(), ?mod(1), some_jid())).
  109: 
  110: filled_batch_raises_batch_metric(_) ->
  111:     Opts = #{host_type => host_type(),
  112:              pool_id => ?FUNCTION_NAME,
  113:              batch_size => 1,
  114:              flush_interval => 1000,
  115:              flush_callback => fun(_, _) -> ok end,
  116:              flush_extra => #{host_type => host_type(), queue_length => 0}},
  117:     {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
  118:     gen_server:cast(Pid, {task, key, ok}),
  119:     MetricName = [mongoose_async_pools, '_', batch_flushes],
  120:     async_helper:wait_until(
  121:       fun() -> 0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_']) end, true).
  122: 
  123: unfilled_batch_raises_flush_metric(_) ->
  124:     Opts = #{host_type => host_type(),
  125:              pool_id => ?FUNCTION_NAME,
  126:              batch_size => 1000,
  127:              flush_interval => 5,
  128:              flush_callback => fun(_, _) -> ok end,
  129:              flush_extra => #{host_type => host_type(), queue_length => 0}},
  130:     {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
  131:     gen_server:cast(Pid, {task, key, ok}),
  132:     MetricName = [mongoose_async_pools, '_', timed_flushes],
  133:     async_helper:wait_until(
  134:       fun() -> 0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_']) end, true).
  135: 
  136: timeouts_and_canceled_timers_do_not_need_to_log_messages(_) ->
  137:     Timeout = 10,
  138:     QueueSize = 2,
  139:     meck:new(logger, [passthrough, unstick]),
  140:     Opts = #{host_type => host_type(),
  141:              pool_id => ?FUNCTION_NAME,
  142:              batch_size => QueueSize,
  143:              flush_interval => Timeout,
  144:              flush_callback => fun(_, _) -> ok end,
  145:              flush_extra => #{host_type => host_type(), queue_length => 0}},
  146:     {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
  147:     [ gen_server:cast(Pid, {task, ok}) || _ <- lists:seq(1, QueueSize) ],
  148:     ct:sleep(Timeout*2),
  149:     ?assertEqual(0, meck:num_calls(logger, macro_log, '_')).
  150: 
  151: prepare_task_works(_) ->
  152:     Timeout = 1000,
  153:     QueueSize = 2,
  154:     T = self(),
  155:     meck:new(logger, [passthrough, unstick]),
  156:     Opts = #{host_type => host_type(),
  157:              pool_id => ?FUNCTION_NAME,
  158:              batch_size => QueueSize,
  159:              flush_interval => Timeout,
  160:              prep_callback => fun(0, _) -> {error, bad};
  161:                                  (A, _) -> {ok, A + 1}
  162:                               end,
  163:              flush_callback => fun(Tasks, _) -> T ! {tasks, Tasks}, ok end,
  164:              flush_extra => #{host_type => host_type(), queue_length => 0}},
  165:     {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
  166:     [ gen_server:cast(Pid, {task, N}) || N <- lists:seq(0, QueueSize) ],
  167:     receive
  168:         {tasks, Tasks} ->
  169:             ?assertEqual([ N + 1 || N <- lists:seq(1, QueueSize) ], Tasks)
  170:     after
  171:         Timeout*2 -> ct:fail(no_answer_received)
  172:     end,
  173:     ?assert(0 < meck:num_calls(logger, macro_log, '_')).
  174: 
  175: sync_flushes_down_everything(_) ->
  176:     Opts = #{host_type => host_type(),
  177:              pool_id => ?FUNCTION_NAME,
  178:              batch_size => 5000,
  179:              flush_interval => 5000,
  180:              flush_callback => fun(_, _) -> ok end,
  181:              flush_extra => #{host_type => host_type(), queue_length => 0}},
  182:     {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []),
  183:     ?assertEqual(skipped, gen_server:call(Pid, sync)),
  184:     gen_server:cast(Pid, {task, key, ok}),
  185:     ?assertEqual(ok, gen_server:call(Pid, sync)),
  186:     MetricName = [mongoose_async_pools, '_', timed_flushes],
  187:     ?assert(0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_'])).
  188: 
  189: sync_aggregates_down_everything(_) ->
  190:     {ok, Server} = gen_server:start_link(?MODULE, [], []),
  191:     Opts = #{host_type => host_type(),
  192:              pool_id => ?FUNCTION_NAME,
  193:              request_callback => fun(Task, _) -> timer:sleep(1), gen_server:send_request(Server, Task) end,
  194:              aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end,
  195:              verify_callback => fun(ok, _T, _) -> ok end,
  196:              flush_extra => #{host_type => host_type()}},
  197:     {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
  198:     ?assertEqual(skipped, gen_server:call(Pid, sync)),
  199:     [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ],
  200:     ?assertEqual(ok, gen_server:call(Pid, sync)),
  201:     ?assertEqual(500500, gen_server:call(Server, get_acc)).
  202: 
  203: aggregating_error_is_handled(_) ->
  204:     {ok, Server} = gen_server:start_link(?MODULE, [], []),
  205:     Opts = #{host_type => host_type(),
  206:              pool_id => ?FUNCTION_NAME,
  207:              request_callback => fun(_, _) -> gen_server:send_request(Server, return_error) end,
  208:              aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end,
  209:              verify_callback => fun(ok, _T, _) -> ok end,
  210:              flush_extra => #{host_type => host_type()}},
  211:     {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
  212:     gen_server:cast(Pid, {task, key, 0}),
  213:     async_helper:wait_until(
  214:       fun() -> gen_server:call(Server, get_acc) end, 0).
  215: 
  216: async_request(_) ->
  217:     {ok, Server} = gen_server:start_link(?MODULE, [], []),
  218:     Opts = #{host_type => host_type(),
  219:              pool_id => ?FUNCTION_NAME,
  220:              request_callback => fun(Task, _) -> timer:sleep(1), gen_server:send_request(Server, Task) end,
  221:              aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end,
  222:              verify_callback => fun(ok, _T, _) -> ok end,
  223:              flush_extra => #{host_type => host_type()}},
  224:     {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []),
  225:     [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ],
  226:     async_helper:wait_until(
  227:       fun() -> gen_server:call(Server, get_acc) end, 500500).
  228: 
  229: %% helpers
  230: host_type() ->
  231:     <<"HostType">>.
  232: 
  233: some_jid() ->
  234:     jid:make_noprep(<<"alice">>, <<"localhost">>, <<>>).
  235: 
  236: init([]) ->
  237:     {ok, 0}.
  238: 
  239: handle_call(get_acc, _From, Acc) ->
  240:     {reply, Acc, Acc};
  241: handle_call(return_error, _From, Acc) ->
  242:     {reply, {error, return_error}, Acc};
  243: handle_call(N, _From, Acc) ->
  244:     {reply, ok, N + Acc}.
  245: 
  246: handle_cast(_Msg, Acc) ->
  247:     {noreply, Acc}.