./ct_report/coverage/mongoose_batch_worker.COVER.html

1 -module(mongoose_batch_worker).
2
3 -behaviour(gen_server).
4
5 -include("mongoose_logger.hrl").
6
7 %% gen_server callbacks
8 -export([init/1,
9 handle_call/3,
10 handle_cast/2,
11 handle_info/2,
12 terminate/2,
13 code_change/3,
14 format_status/2]).
15
%% Function invoked with the batched tasks (in the order they were received)
%% and the pool's extra data whenever a batch is flushed.
-type flush_callback() :: fun((list(), mongoose_async_pools:pool_extra()) -> ok | {error, term()}).

%% State of a single batch worker process.
-record(state, {
          %% Host type and pool this worker belongs to (used in metrics and logs).
          host_type :: mongooseim:host_type(),
          pool_id :: atom(),
          %% Number of queued tasks that triggers an immediate flush.
          batch_size :: non_neg_integer(),
          %% Delay, in milliseconds, of the flush timer.
          flush_interval :: non_neg_integer(),
          %% Reference of the currently armed flush timer, if any.
          flush_interval_tref :: undefined | reference(),
          %% Callback run on every flush; the default does nothing.
          flush_callback = fun(_, _) -> ok end :: flush_callback(),
          %% Pending tasks, newest first; `censored' only appears in the
          %% output of format_status/2.
          flush_queue = [] :: list() | censored,
          %% Number of tasks currently held in flush_queue.
          flush_queue_length = 0 :: non_neg_integer(),
          %% Extra data handed to the flush callback.
          flush_extra = #{} :: map()
         }).
-type state() :: #state{}.

-export_type([flush_callback/0]).
31
32 %% gen_server callbacks
%% Builds the initial worker state from the start arguments. The queue
%% starts empty and no flush timer is armed until the first task arrives.
-spec init({mongooseim:host_type(),
            mongoose_async_pools:pool_id(),
            pos_integer(),
            pos_integer(),
            flush_callback(),
            mongoose_async_pools:pool_extra()}) -> {ok, state()}.
init({HostType, PoolId, Interval, MaxSize, FlushCallback, FlushExtra}) ->
    ?LOG_DEBUG(#{what => batch_worker_start, host_type => HostType, pool_id => PoolId}),
    InitialState = #state{host_type = HostType,
                          pool_id = PoolId,
                          batch_size = MaxSize,
                          flush_interval = Interval,
                          flush_callback = FlushCallback,
                          flush_extra = FlushExtra},
    {ok, InitialState}.
47
%% Synchronous requests.
%% `sync' forces a flush of the pending tasks: replies `skipped' when there
%% is nothing queued, otherwise flushes and replies `ok'. Any other request
%% is reported as unexpected and answered with `unexpected_call'.
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
handle_call(sync, _From, #state{flush_queue = []} = State) ->
    {reply, skipped, State};
handle_call(sync, _From, #state{flush_queue = [_ | _]} = State) ->
    #state{host_type = HostType, pool_id = PoolId} = State,
    %% A forced flush is accounted for under the timed_flushes metric.
    mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
    Flushed = run_flush(State),
    {reply, ok, Flushed};
handle_call(Msg, From, State) ->
    ?UNEXPECTED_CALL(Msg, From),
    {reply, unexpected_call, State}.
59
%% Asynchronous requests.
%% `{task, Task}' enqueues a task (possibly triggering a flush); anything
%% else is reported as unexpected and ignored.
-spec handle_cast({task, term()} | term(), state()) -> {noreply, state()}.
handle_cast({task, Task}, State) ->
    Updated = handle_task(Task, State),
    {noreply, Updated};
handle_cast(Msg, State) ->
    ?UNEXPECTED_CAST(Msg),
    {noreply, State}.
66
%% Raw messages.
%% - A `flush' timeout from the currently armed timer triggers a flush.
%% - A `flush' timeout carrying any other reference is stale and is dropped.
%% - The notification of the asynchronous garbage collection requested in
%%   run_flush/1 is dropped.
%% - Everything else is reported as unexpected.
-spec handle_info({timeout, reference(), flush} | term(), state()) -> {noreply, state()}.
handle_info({timeout, FiredRef, flush}, #state{flush_interval_tref = ArmedRef} = State)
  when FiredRef =:= ArmedRef ->
    #state{host_type = HostType, pool_id = PoolId} = State,
    mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
    Flushed = run_flush(State),
    {noreply, Flushed};
handle_info({timeout, _StaleRef, flush}, State) ->
    %% Timer that expired after it had been replaced or cancelled: ignore.
    {noreply, State};
handle_info({garbage_collect, asynchronous_gc_triggered, true}, State) ->
    {noreply, State};
handle_info(Msg, State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State}.
80
%% Logs the shutdown and, when tasks are still queued, logs a warning and
%% flushes them one last time before the process goes away.
-spec terminate(term(), state()) -> term().
terminate(Reason, #state{flush_queue = Pending} = State) ->
    ?LOG_INFO(log_fields(State, #{what => batch_worker_stopping, reason => Reason})),
    case Pending =:= [] of
        true ->
            ok;
        false ->
            ?LOG_WARNING(log_fields(State, #{what => batch_worker_terminate_requires_flush})),
            do_run_flush(State)
    end.
90
%% Code upgrades need no state migration: the state is returned unchanged.
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_FromVersion, CurrentState, _UpgradeExtra) ->
    {ok, CurrentState}.
94
% Tasks may contain private information, so the queue is replaced with the
% atom `censored' before the state is rendered in status reports or logs.
format_status(_Opt, [_PDict, State | _]) ->
    Redacted = State#state{flush_queue = censored},
    [{data, [{"State", Redacted}]}].
98
99 %% Batched tasks callbacks
%% Adds Task to the front of the queue. The flush timer is armed when the
%% first task of a batch arrives, and the batch is flushed right away once
%% it holds at least batch_size tasks.
handle_task(Task, #state{batch_size = MaxSize,
                         flush_interval = Interval,
                         flush_interval_tref = PreviousTRef,
                         flush_queue = Pending,
                         flush_queue_length = Count} = State) ->
    %% Count is the length *before* this task, so 0 means "first of a batch".
    TRef = maybe_schedule_flush(PreviousTRef, Count, Interval),
    NewCount = Count + 1,
    Queued = State#state{flush_interval_tref = TRef,
                         flush_queue = [Task | Pending],
                         flush_queue_length = NewCount},
    if
        NewCount >= MaxSize ->
            #state{host_type = HostType, pool_id = PoolId} = State,
            mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, batch_flushes], 1),
            run_flush(Queued);
        true ->
            Queued
    end.
118
%% Starts a `flush' timer addressed to this process only when no timer is
%% armed and the queue is empty; otherwise the given timer reference (which
%% may be `undefined') is returned untouched.
maybe_schedule_flush(TRef, QueueLength, Interval)
  when TRef =:= undefined, QueueLength =:= 0 ->
    erlang:start_timer(Interval, self(), flush);
maybe_schedule_flush(TRef, _QueueLength, _Interval) ->
    TRef.
123
%% Cancels the pending flush timer, flushes the queue through the callback
%% and then requests an asynchronous major garbage collection of this
%% process. Returns the state with an empty queue and no timer.
run_flush(#state{flush_interval_tref = PendingTimer} = State) ->
    cancel_and_flush_timer(PendingTimer),
    ?LOG_DEBUG(log_fields(State, #{what => batch_worker_flush})),
    TimerCleared = State#state{flush_interval_tref = undefined},
    Flushed = do_run_flush(TimerCleared),
    %% The completion notice arrives later as a
    %% {garbage_collect, asynchronous_gc_triggered, _} message.
    erlang:garbage_collect(self(), [{async, asynchronous_gc_triggered}, {type, major}]),
    Flushed.
130
%% Cancels the flush timer, if one is armed. Always returns `ok'.
%%
%% The cancellation is asynchronous so the worker never blocks on the timer
%% service. `{info, false}' is required: with the default `{info, true}' an
%% asynchronous cancel makes the runtime send a
%% `{cancel_timer, TRef, Result}' message back to this process, which would
%% be reported as an unexpected message on every flush.
%%
%% A timeout that has already been delivered is not removed from the
%% mailbox here; it is expected to be recognised as stale by its reference.
cancel_and_flush_timer(undefined) ->
    ok;
cancel_and_flush_timer(TRef) ->
    erlang:cancel_timer(TRef, [{async, true}, {info, false}]).
135
%% Hands the queued tasks, in the order they were received, to the flush
%% callback together with the extra data, whose `queue_length' key (which
%% must already exist) is set to the number of tasks in the batch.
%% The queue is emptied whether the callback succeeds or returns an error,
%% so a failed batch is logged and dropped, not retried.
do_run_flush(#state{flush_callback = Callback,
                    flush_queue = Pending,
                    flush_queue_length = Count,
                    flush_extra = Extra} = State) ->
    Batch = lists:reverse(Pending),
    CallbackExtra = Extra#{queue_length := Count},
    case Callback(Batch, CallbackExtra) of
        ok ->
            ok;
        {error, Reason} ->
            ?LOG_ERROR(log_fields(State,
                                  #{what => batch_worker_flush_queue_failed, reason => Reason,
                                    text => <<"flush_callback failed">>}))
    end,
    State#state{flush_queue = [], flush_queue_length = 0}.
149
%% Enriches a log event with the worker's host type, pool id and current
%% queue length. Keys already present in LogMessage are overwritten.
log_fields(#state{host_type = HostType,
                  pool_id = PoolId,
                  flush_queue_length = QueueLength},
           LogMessage) ->
    WorkerFields = #{host_type => HostType,
                     pool_id => PoolId,
                     flush_queue_length => QueueLength},
    maps:merge(LogMessage, WorkerFields).
Line Hits Source