-module(batches_SUITE).
-compile([export_all, nowarn_export_all]).
-behaviour(gen_server).

-include_lib("stdlib/include/assert.hrl").

%% Expands to an atom made of the enclosing function's name followed by N,
%% e.g. ?mod(1) inside foo/1 yields 'foo1'. It has to be a macro because it
%% relies on ?FUNCTION_NAME.
-define(mod(N), list_to_atom(atom_to_list(?FUNCTION_NAME) ++ integer_to_list(N))).

%% Common Test entry point: the suite consists of two groups.
all() ->
    [{group, Group} || Group <- [cache, async_workers]].

%% Group definitions; both groups use the `sequence` property.
groups() ->
    CacheCases =
        [internal_starts_another_cache,
         external_does_not_start_another_cache,
         internal_stop_does_stop_the_cache,
         external_stop_does_nothing,
         shared_cache_inserts_in_shared_table],
    AsyncWorkerCases =
        [filled_batch_raises_batch_metric,
         unfilled_batch_raises_flush_metric,
         timeouts_and_canceled_timers_do_not_need_to_log_messages,
         prepare_task_works,
         sync_flushes_down_everything,
         sync_aggregates_down_everything,
         aggregating_error_is_handled,
         async_request],
    [{cache, [sequence], CacheCases},
     {async_workers, [sequence], AsyncWorkerCases}].

%% Mocks telemetry and mongoose_metrics for the whole suite. The mocks are
%% created with `no_link`, so they are not tied to the process creating them.
init_per_suite(Config) ->
    SuiteMocks = [telemetry, mongoose_metrics],
    lists:foreach(fun(Mod) -> meck:new(Mod, [stub_all, no_link]) end, SuiteMocks),
    Config.

%% Removes every mock that is still loaded.
end_per_suite(_Config) ->
    meck:unload().

%% No per-group setup is needed.
init_per_group(_GroupName, Config) ->
    Config.

%% No per-group teardown is needed.
end_per_group(_GroupName, _Config) ->
    ok.

%% Starts pg and a supervisor registered as ejabberd_sup, and mocks gen_mod
%% in passthrough mode so individual cases can override single functions.
init_per_testcase(_Case, Config) ->
    pg:start_link(),
    mim_ct_sup:start_link(ejabberd_sup),
    meck:new(gen_mod, [passthrough]),
    Config.

%% Drops the per-case gen_mod mock.
end_per_testcase(_Case, _Config) ->
    meck:unload(gen_mod),
    ok.

%% Tests

%% Starting a second cache with `{module, internal}` must leave two
%% segmented_cache workers under ejabberd_sup.
internal_starts_another_cache(_) ->
    HostType = host_type(),
    mongoose_user_cache:start_new_cache(HostType, ?mod(1), []),
    mongoose_user_cache:start_new_cache(HostType, ?mod(2), [{module, internal}]),
    IsCacheWorker = fun({_Name, _Pid, worker, [segmented_cache]}) -> true;
                       (_Other) -> false
                    end,
    CacheWorkers = lists:filter(IsCacheWorker, supervisor:which_children(ejabberd_sup)),
    ?assertEqual(2, length(CacheWorkers)).
%% A cache configured with `{module, OtherModule}` must not add a second
%% segmented_cache worker: only one is expected under ejabberd_sup.
external_does_not_start_another_cache(_) ->
    HostType = host_type(),
    Owner = ?mod(1),
    mongoose_user_cache:start_new_cache(HostType, Owner, []),
    mongoose_user_cache:start_new_cache(HostType, ?mod(2), [{module, Owner}]),
    ?assertEqual(1, length(segmented_cache_children())).

%% With gen_mod reporting `internal` for the `module` option, stopping the
%% second cache must change the set of segmented_cache workers.
internal_stop_does_stop_the_cache(_) ->
    meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> internal end),
    HostType = host_type(),
    mongoose_user_cache:start_new_cache(HostType, ?mod(1), []),
    mongoose_user_cache:start_new_cache(HostType, ?mod(2), [{module, internal}]),
    Before = segmented_cache_children(),
    ct:pal("Value ~p~n", [Before]),
    mongoose_user_cache:stop_cache(HostType, ?mod(2)),
    After = segmented_cache_children(),
    ct:pal("Value ~p~n", [After]),
    ?assertNotEqual(Before, After).

%% With gen_mod reporting another module for the `module` option, stopping
%% the second cache must leave the segmented_cache workers untouched.
external_stop_does_nothing(_) ->
    Owner = ?mod(1),
    meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> Owner end),
    HostType = host_type(),
    mongoose_user_cache:start_new_cache(HostType, Owner, []),
    mongoose_user_cache:start_new_cache(HostType, ?mod(2), [{module, Owner}]),
    Before = segmented_cache_children(),
    mongoose_user_cache:stop_cache(HostType, ?mod(2)),
    After = segmented_cache_children(),
    ?assertEqual(Before, After).

%% An entry merged through the second (sharing) module must be visible as a
%% member when queried through the first module.
shared_cache_inserts_in_shared_table(_) ->
    Owner = ?mod(1),
    Sharer = ?mod(2),
    meck:expect(gen_mod, get_module_opt, fun(_, _, module, _) -> Owner end),
    HostType = host_type(),
    mongoose_user_cache:start_new_cache(HostType, Owner, []),
    mongoose_user_cache:start_new_cache(HostType, Sharer, [{module, Owner}]),
    mongoose_user_cache:merge_entry(HostType, Sharer, some_jid(), #{}),
    ?assert(mongoose_user_cache:is_member(HostType, Owner, some_jid())).

%% Returns the children of ejabberd_sup that are workers whose module list
%% is exactly [segmented_cache].
segmented_cache_children() ->
    IsCacheWorker = fun({_Name, _Pid, worker, [segmented_cache]}) -> true;
                       (_Other) -> false
                    end,
    lists:filter(IsCacheWorker, supervisor:which_children(ejabberd_sup)).
%% Batch size 1 with a long flush interval: after one task is cast, the
%% batch_flushes metric is expected to be updated.
filled_batch_raises_batch_metric(_) ->
    Worker = start_noop_batch_worker(?FUNCTION_NAME, 1, 1000),
    gen_server:cast(Worker, {task, key, ok}),
    wait_until_metric_updated(batch_flushes).

%% Large batch size with a 5 ms flush interval: after one task is cast, the
%% timed_flushes metric is expected to be updated.
unfilled_batch_raises_flush_metric(_) ->
    Worker = start_noop_batch_worker(?FUNCTION_NAME, 1000, 5),
    gen_server:cast(Worker, {task, key, ok}),
    wait_until_metric_updated(timed_flushes).

%% Casts exactly batch_size tasks, waits twice the flush interval, and checks
%% that logger:macro_log was never called in the meantime.
timeouts_and_canceled_timers_do_not_need_to_log_messages(_) ->
    FlushInterval = 10,
    BatchSize = 2,
    %% logger is mocked before the worker starts so every call is recorded.
    meck:new(logger, [passthrough, unstick]),
    Worker = start_noop_batch_worker(?FUNCTION_NAME, BatchSize, FlushInterval),
    lists:foreach(fun(_) -> gen_server:cast(Worker, {task, ok}) end,
                  lists:seq(1, BatchSize)),
    ct:sleep(FlushInterval * 2),
    ?assertEqual(0, meck:num_calls(logger, macro_log, '_')).

%% Starts a linked mongoose_batch_worker whose flush callback does nothing
%% and returns its pid.
start_noop_batch_worker(PoolId, BatchSize, FlushInterval) ->
    HostType = host_type(),
    WorkerOpts = #{host_type => HostType,
                   pool_id => PoolId,
                   batch_size => BatchSize,
                   flush_interval => FlushInterval,
                   flush_callback => fun(_, _) -> ok end,
                   flush_extra => #{host_type => HostType, queue_length => 0}},
    {ok, Worker} = gen_server:start_link(mongoose_batch_worker, WorkerOpts, []),
    Worker.

%% Waits (via async_helper:wait_until/2) until the mocked
%% mongoose_metrics:update/3 has been called at least once with the
%% mongoose_async_pools metric ending in FlushKind.
wait_until_metric_updated(FlushKind) ->
    Metric = [mongoose_async_pools, '_', FlushKind],
    WasUpdated = fun() ->
                     meck:num_calls(mongoose_metrics, update, ['_', Metric, '_']) > 0
                 end,
    async_helper:wait_until(WasUpdated, true).
%% Uses a prep_callback that rejects task 0 and increments every other task.
%% Casts tasks 0..BatchSize and expects the flush callback to receive only
%% the incremented tasks 1..BatchSize, and logger:macro_log to have been
%% called at least once.
prepare_task_works(_) ->
    FlushInterval = 1000,
    BatchSize = 2,
    TestPid = self(),
    meck:new(logger, [passthrough, unstick]),
    Prepare = fun(0, _) -> {error, bad};
                 (Task, _) -> {ok, Task + 1}
              end,
    %% The flush callback forwards the flushed batch to the test process.
    Flush = fun(Flushed, _) -> TestPid ! {tasks, Flushed}, ok end,
    HostType = host_type(),
    WorkerOpts = #{host_type => HostType,
                   pool_id => ?FUNCTION_NAME,
                   batch_size => BatchSize,
                   flush_interval => FlushInterval,
                   prep_callback => Prepare,
                   flush_callback => Flush,
                   flush_extra => #{host_type => HostType, queue_length => 0}},
    {ok, Worker} = gen_server:start_link(mongoose_batch_worker, WorkerOpts, []),
    lists:foreach(fun(N) -> gen_server:cast(Worker, {task, N}) end,
                  lists:seq(0, BatchSize)),
    receive
        {tasks, Received} ->
            Expected = lists:map(fun(N) -> N + 1 end, lists:seq(1, BatchSize)),
            ?assertEqual(Expected, Received)
    after FlushInterval * 2 ->
        ct:fail(no_answer_received)
    end,
    LogCalls = meck:num_calls(logger, macro_log, '_'),
    ?assert(LogCalls > 0).

%% A `sync` call on an idle worker must answer `skipped`; after one task is
%% cast it must answer `ok`, and the timed_flushes metric must have been
%% updated at least once.
sync_flushes_down_everything(_) ->
    HostType = host_type(),
    WorkerOpts = #{host_type => HostType,
                   pool_id => ?FUNCTION_NAME,
                   batch_size => 5000,
                   flush_interval => 5000,
                   flush_callback => fun(_, _) -> ok end,
                   flush_extra => #{host_type => HostType, queue_length => 0}},
    {ok, Worker} = gen_server:start_link(mongoose_batch_worker, WorkerOpts, []),
    ?assertEqual(skipped, gen_server:call(Worker, sync)),
    gen_server:cast(Worker, {task, key, ok}),
    ?assertEqual(ok, gen_server:call(Worker, sync)),
    Metric = [mongoose_async_pools, '_', timed_flushes],
    UpdateCalls = meck:num_calls(mongoose_metrics, update, ['_', Metric, '_']),
    ?assert(UpdateCalls > 0).
%% Casts the integers 1..1000 under a single key, then calls `sync` and
%% expects the accumulator server (this module) to hold their sum, 500500.
%% A `sync` on the idle aggregator must answer `skipped`.
sync_aggregates_down_everything(_) ->
    {ok, Accumulator} = gen_server:start_link(?MODULE, [], []),
    Request = fun(Task, _) ->
                  timer:sleep(1),
                  gen_server:send_request(Accumulator, Task)
              end,
    Aggregator = start_summing_aggregator(?FUNCTION_NAME, Request),
    ?assertEqual(skipped, gen_server:call(Aggregator, sync)),
    lists:foreach(fun(N) -> gen_server:cast(Aggregator, {task, key, N}) end,
                  lists:seq(1, 1000)),
    ?assertEqual(ok, gen_server:call(Aggregator, sync)),
    ?assertEqual(500500, gen_server:call(Accumulator, get_acc)).

%% The request callback always sends `return_error`, to which the accumulator
%% replies {error, return_error}. The case then waits until the accumulator
%% reports 0.
aggregating_error_is_handled(_) ->
    {ok, Accumulator} = gen_server:start_link(?MODULE, [], []),
    Request = fun(_, _) -> gen_server:send_request(Accumulator, return_error) end,
    Aggregator = start_summing_aggregator(?FUNCTION_NAME, Request),
    gen_server:cast(Aggregator, {task, key, 0}),
    ReadAccumulator = fun() -> gen_server:call(Accumulator, get_acc) end,
    async_helper:wait_until(ReadAccumulator, 0).

%% Starts a linked mongoose_aggregator_worker that aggregates two tasks by
%% adding them and uses RequestCallback to issue requests; returns its pid.
start_summing_aggregator(PoolId, RequestCallback) ->
    HostType = host_type(),
    WorkerOpts = #{host_type => HostType,
                   pool_id => PoolId,
                   request_callback => RequestCallback,
                   aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end,
                   verify_callback => fun(ok, _T, _) -> ok end,
                   flush_extra => #{host_type => HostType}},
    {ok, Aggregator} = gen_server:start_link(mongoose_aggregator_worker, WorkerOpts, []),
    Aggregator.
%% Casts the integers 1..1000 under a single key without calling `sync`, then
%% waits until the accumulator server (this module) reports their sum, 500500.
async_request(_) ->
    {ok, Accumulator} = gen_server:start_link(?MODULE, [], []),
    HostType = host_type(),
    Request = fun(Task, _) ->
                  timer:sleep(1),
                  gen_server:send_request(Accumulator, Task)
              end,
    WorkerOpts = #{host_type => HostType,
                   pool_id => ?FUNCTION_NAME,
                   request_callback => Request,
                   aggregate_callback => fun(T1, T2, _) -> {ok, T1 + T2} end,
                   verify_callback => fun(ok, _T, _) -> ok end,
                   flush_extra => #{host_type => HostType}},
    {ok, Aggregator} = gen_server:start_link(mongoose_aggregator_worker, WorkerOpts, []),
    lists:foreach(fun(N) -> gen_server:cast(Aggregator, {task, key, N}) end,
                  lists:seq(1, 1000)),
    ReadAccumulator = fun() -> gen_server:call(Accumulator, get_acc) end,
    async_helper:wait_until(ReadAccumulator, 500500).

%% helpers

%% Host type used by every test case.
host_type() ->
    <<"HostType">>.

%% JID used by the cache tests: alice@localhost with an empty resource.
some_jid() ->
    jid:make_noprep(<<"alice">>, <<"localhost">>, <<>>).

%% gen_server callbacks: this module doubles as an accumulator server whose
%% state is a running sum.

%% The sum starts at 0.
init([]) ->
    {ok, 0}.

%% get_acc      -> replies with the current sum.
%% return_error -> replies {error, return_error}; the sum is unchanged.
%% anything else is added to the sum and answered with `ok`.
handle_call(get_acc, _From, Sum) ->
    {reply, Sum, Sum};
handle_call(return_error, _From, Sum) ->
    {reply, {error, return_error}, Sum};
handle_call(Increment, _From, Sum) ->
    {reply, ok, Increment + Sum}.

%% Casts are ignored.
handle_cast(_Request, Sum) ->
    {noreply, Sum}.