1: -module(batches_SUITE). 2: -compile([export_all, nowarn_export_all]). 3: -behaviour(gen_server). 4: 5: -include_lib("stdlib/include/assert.hrl"). 6: -define(mod(N), list_to_atom(atom_to_list(?FUNCTION_NAME) ++ integer_to_list(N))). 7: 8: all() -> 9: [ 10: {group, cache}, 11: {group, async_workers} 12: ]. 13: 14: groups() -> 15: [ 16: {cache, [sequence], 17: [ 18: internal_starts_another_cache, 19: external_does_not_start_another_cache, 20: internal_stop_does_stop_the_cache, 21: external_stop_does_nothing, 22: shared_cache_inserts_in_shared_table 23: ]}, 24: {async_workers, [sequence], 25: [ 26: filled_batch_raises_batch_metric, 27: unfilled_batch_raises_flush_metric, 28: timeouts_and_canceled_timers_do_not_need_to_log_messages, 29: prepare_task_works, 30: sync_flushes_down_everything, 31: sync_aggregates_down_everything, 32: aggregating_error_is_handled, 33: async_request 34: ]} 35: ]. 36: 37: init_per_suite(Config) -> 38: meck:new(telemetry, [stub_all, no_link]), 39: meck:new(mongoose_metrics, [stub_all, no_link]), 40: Config. 41: 42: end_per_suite(_Config) -> 43: meck:unload(). 44: 45: init_per_group(_, Config) -> 46: Config. 47: 48: end_per_group(_, _Config) -> 49: ok. 50: 51: init_per_testcase(_TestCase, Config) -> 52: pg:start_link(), 53: mim_ct_sup:start_link(ejabberd_sup), 54: meck:new(gen_mod, [passthrough]), 55: Config. 56: 57: end_per_testcase(_TestCase, _Config) -> 58: meck:unload(gen_mod), 59: ok. 60: 61: cache_config() -> 62: config_parser_helper:default_mod_config(mod_cache_users). 63: 64: cache_config(internal) -> 65: Def = config_parser_helper:default_mod_config(mod_cache_users), 66: Def#{module => internal}; 67: cache_config(Module) -> 68: #{module => Module}. 69: 70: %% Tests 71: internal_starts_another_cache(_) -> 72: mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()), 73: mongoose_user_cache:start_new_cache(host_type(), ?mod(2), cache_config(internal)), 74: L = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)], 75: ?assertEqual(2, length(L)). 76: 77: external_does_not_start_another_cache(_) -> 78: mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()), 79: mongoose_user_cache:start_new_cache(host_type(), ?mod(2), cache_config(?mod(1))), 80: L = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)], 81: ?assertEqual(1, length(L)). 82: 83: internal_stop_does_stop_the_cache(_) -> 84: meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> internal end), 85: mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()), 86: mongoose_user_cache:start_new_cache(host_type(), ?mod(2), cache_config(internal)), 87: L1 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)], 88: ct:pal("Value ~p~n", [L1]), 89: mongoose_user_cache:stop_cache(host_type(), ?mod(2)), 90: L2 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)], 91: ct:pal("Value ~p~n", [L2]), 92: ?assertNotEqual(L1, L2). 93: 94: external_stop_does_nothing(_) -> 95: meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> ?mod(1) end), 96: mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()), 97: mongoose_user_cache:start_new_cache(host_type(), ?mod(2), cache_config(?mod(1))), 98: L1 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)], 99: mongoose_user_cache:stop_cache(host_type(), ?mod(2)), 100: L2 = [S || S = {_Name, _Pid, worker, [segmented_cache]} <- supervisor:which_children(ejabberd_sup)], 101: ?assertEqual(L1, L2). 102: 103: shared_cache_inserts_in_shared_table(_) -> 104: meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> ?mod(1) end), 105: mongoose_user_cache:start_new_cache(host_type(), ?mod(1), cache_config()), 106: mongoose_user_cache:start_new_cache(host_type(), ?mod(2), cache_config(?mod(1))), 107: mongoose_user_cache:merge_entry(host_type(), ?mod(2), some_jid(), #{}), 108: ?assert(mongoose_user_cache:is_member(host_type(), ?mod(1), some_jid())). 109: 110: filled_batch_raises_batch_metric(_) -> 111: Opts = #{host_type => host_type(), 112: pool_id => ?FUNCTION_NAME, 113: batch_size => 1, 114: flush_interval => 1000, 115: flush_callback => fun(_, _) -> ok end, 116: flush_extra => #{host_type => host_type(), queue_length => 0}}, 117: {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []), 118: gen_server:cast(Pid, {task, key, ok}), 119: MetricName = [mongoose_async_pools, '_', batch_flushes], 120: async_helper:wait_until( 121: fun() -> 0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_']) end, true). 122: 123: unfilled_batch_raises_flush_metric(_) -> 124: Opts = #{host_type => host_type(), 125: pool_id => ?FUNCTION_NAME, 126: batch_size => 1000, 127: flush_interval => 5, 128: flush_callback => fun(_, _) -> ok end, 129: flush_extra => #{host_type => host_type(), queue_length => 0}}, 130: {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []), 131: gen_server:cast(Pid, {task, key, ok}), 132: MetricName = [mongoose_async_pools, '_', timed_flushes], 133: async_helper:wait_until( 134: fun() -> 0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_']) end, true). 135: 136: timeouts_and_canceled_timers_do_not_need_to_log_messages(_) -> 137: Timeout = 10, 138: QueueSize = 2, 139: meck:new(logger, [passthrough, unstick]), 140: Opts = #{host_type => host_type(), 141: pool_id => ?FUNCTION_NAME, 142: batch_size => QueueSize, 143: flush_interval => Timeout, 144: flush_callback => fun(_, _) -> ok end, 145: flush_extra => #{host_type => host_type(), queue_length => 0}}, 146: {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []), 147: [ gen_server:cast(Pid, {task, ok}) || _ <- lists:seq(1, QueueSize) ], 148: ct:sleep(Timeout*2), 149: ?assertEqual(0, meck:num_calls(logger, macro_log, '_')). 150: 151: prepare_task_works(_) -> 152: Timeout = 1000, 153: QueueSize = 2, 154: T = self(), 155: meck:new(logger, [passthrough, unstick]), 156: Opts = #{host_type => host_type(), 157: pool_id => ?FUNCTION_NAME, 158: batch_size => QueueSize, 159: flush_interval => Timeout, 160: prep_callback => fun(0, _) -> {error, bad}; 161: (A, _) -> {ok, A + 1} 162: end, 163: flush_callback => fun(Tasks, _) -> T ! {tasks, Tasks}, ok end, 164: flush_extra => #{host_type => host_type(), queue_length => 0}}, 165: {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []), 166: [ gen_server:cast(Pid, {task, N}) || N <- lists:seq(0, QueueSize) ], 167: receive 168: {tasks, Tasks} -> 169: ?assertEqual([ N + 1 || N <- lists:seq(1, QueueSize) ], Tasks) 170: after 171: Timeout*2 -> ct:fail(no_answer_received) 172: end, 173: ?assert(0 < meck:num_calls(logger, macro_log, '_')). 174: 175: sync_flushes_down_everything(_) -> 176: Opts = #{host_type => host_type(), 177: pool_id => ?FUNCTION_NAME, 178: batch_size => 5000, 179: flush_interval => 5000, 180: flush_callback => fun(_, _) -> ok end, 181: flush_extra => #{host_type => host_type(), queue_length => 0}}, 182: {ok, Pid} = gen_server:start_link(mongoose_batch_worker, Opts, []), 183: ?assertEqual(skipped, gen_server:call(Pid, sync)), 184: gen_server:cast(Pid, {task, key, ok}), 185: ?assertEqual(ok, gen_server:call(Pid, sync)), 186: MetricName = [mongoose_async_pools, '_', timed_flushes], 187: ?assert(0 < meck:num_calls(mongoose_metrics, update, ['_', MetricName, '_'])). 188: 189: sync_aggregates_down_everything(_) -> 190: {ok, Server} = gen_server:start_link(?MODULE, [], []), 191: Opts = #{host_type => host_type(), 192: pool_id => ?FUNCTION_NAME, 193: request_callback => fun(Task, _) -> timer:sleep(1), gen_server:send_request(Server, Task) end, 194: aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end, 195: verify_callback => fun(ok, _T, _) -> ok end, 196: flush_extra => #{host_type => host_type()}}, 197: {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), 198: ?assertEqual(skipped, gen_server:call(Pid, sync)), 199: [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ], 200: ?assertEqual(ok, gen_server:call(Pid, sync)), 201: ?assertEqual(500500, gen_server:call(Server, get_acc)). 202: 203: aggregating_error_is_handled(_) -> 204: {ok, Server} = gen_server:start_link(?MODULE, [], []), 205: Opts = #{host_type => host_type(), 206: pool_id => ?FUNCTION_NAME, 207: request_callback => fun(_, _) -> gen_server:send_request(Server, return_error) end, 208: aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end, 209: verify_callback => fun(ok, _T, _) -> ok end, 210: flush_extra => #{host_type => host_type()}}, 211: {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), 212: gen_server:cast(Pid, {task, key, 0}), 213: async_helper:wait_until( 214: fun() -> gen_server:call(Server, get_acc) end, 0). 215: 216: async_request(_) -> 217: {ok, Server} = gen_server:start_link(?MODULE, [], []), 218: Opts = #{host_type => host_type(), 219: pool_id => ?FUNCTION_NAME, 220: request_callback => fun(Task, _) -> timer:sleep(1), gen_server:send_request(Server, Task) end, 221: aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end, 222: verify_callback => fun(ok, _T, _) -> ok end, 223: flush_extra => #{host_type => host_type()}}, 224: {ok, Pid} = gen_server:start_link(mongoose_aggregator_worker, Opts, []), 225: [ gen_server:cast(Pid, {task, key, N}) || N <- lists:seq(1, 1000) ], 226: async_helper:wait_until( 227: fun() -> gen_server:call(Server, get_acc) end, 500500). 228: 229: %% helpers 230: host_type() -> 231: <<"HostType">>. 232: 233: some_jid() -> 234: jid:make_noprep(<<"alice">>, <<"localhost">>, <<>>). 235: 236: init([]) -> 237: {ok, 0}. 238: 239: handle_call(get_acc, _From, Acc) -> 240: {reply, Acc, Acc}; 241: handle_call(return_error, _From, Acc) -> 242: {reply, {error, return_error}, Acc}; 243: handle_call(N, _From, Acc) -> 244: {reply, ok, N + Acc}. 245: 246: handle_cast(_Msg, Acc) -> 247: {noreply, Acc}.