./ct_report/coverage/mongoose_batch_worker.COVER.html

1 %% @doc
2 %% This module encapsulates an asynchronous buffer, that accumulates tasks until
3 %% either the buffer is full, or a timeout since the first task expires, and then
4 %% flushes the buffer. It takes the buffer size, the timeout interval, and a
5 %% callback for flushing, as its main parameters. It can also take callbacks for
6 %% initialising data structures, or for preparing them before accumulation.
7 %% @end
8 -module(mongoose_batch_worker).
9
10 -behaviour(gen_server).
11
12 -callback prepare(mongoose_async_pools:task(), mongoose_async_pools:pool_extra()) ->
13 {ok, mongoose_async_pools:task()} | {error, term()}.
14 -callback flush([mongoose_async_pools:task()], mongoose_async_pools:pool_extra()) ->
15 ok | {error, term()}.
16 -optional_callbacks([prepare/2]).
17
18 -include("mongoose_logger.hrl").
19
20 %% gen_server callbacks
21 -export([init/1,
22 handle_call/3,
23 handle_cast/2,
24 handle_info/2,
25 terminate/2,
26 code_change/3,
27 format_status/1]).
28
29 -record(state, {
30 host_type :: mongooseim:host_type(),
31 pool_id :: mongoose_async_pools:pool_id(),
32 batch_size :: non_neg_integer(),
33 flush_interval :: non_neg_integer(), %% milliseconds
34 flush_interval_tref :: undefined | reference(),
35 flush_callback = fun(_, _) -> ok end :: mongoose_async_pools:flush_callback(),
36 prep_callback :: undefined | mongoose_async_pools:prep_callback(),
37 flush_queue = [] :: list() | censored, % see format_status/1 for censored
38 flush_queue_length = 0 :: non_neg_integer(),
39 flush_extra = #{} :: map()
40 }).
41 -type state() :: #state{}.
42
43 %% gen_server callbacks
44 -spec init(map()) -> {ok, state()}.
45 init(#{host_type := HostType,
46 pool_id := PoolId,
47 batch_size := MaxSize,
48 flush_interval := Interval,
49 flush_callback := FlushCallback,
50 flush_extra := FlushExtra} = Opts)
51 when is_function(FlushCallback, 2), is_map(FlushExtra) ->
52 880 ?LOG_DEBUG(#{what => batch_worker_start, host_type => HostType, pool_id => PoolId}),
53 880 {ok, #state{host_type = HostType,
54 pool_id = PoolId,
55 batch_size = MaxSize,
56 flush_interval = Interval,
57 flush_callback = FlushCallback,
58 prep_callback = maps:get(prep_callback, Opts, undefined),
59 flush_extra = FlushExtra}}.
60
61 -spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
62 handle_call(sync, _From, State = #state{host_type = HostType, pool_id = PoolId,
63 flush_queue = [_|_]}) ->
64 260 mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
65 260 {reply, ok, run_flush(State)};
66 handle_call(sync, _From, State = #state{flush_queue = []}) ->
67 3388 {reply, skipped, State};
68 handle_call(Msg, From, State) ->
69
:-(
?UNEXPECTED_CALL(Msg, From),
70
:-(
{reply, unexpected_call, State}.
71
72 -spec handle_cast(term(), state()) -> {noreply, state()}.
73 handle_cast({task, Task}, State) ->
74
:-(
{noreply, handle_task(Task, State)};
75 handle_cast({task, _Key, Task}, State) ->
76 2981 {noreply, handle_task(Task, State)};
77 handle_cast({broadcast, Broadcast}, State) ->
78
:-(
{noreply, handle_task(Broadcast, State)};
79 handle_cast(Msg, State) ->
80
:-(
?UNEXPECTED_CAST(Msg),
81
:-(
{noreply, State}.
82
83 -spec handle_info(term(), state()) -> {noreply, state()}.
84 handle_info({timeout, TimerRef, flush}, State = #state{flush_interval_tref = TimerRef,
85 host_type = HostType,
86 pool_id = PoolId}) ->
87 1149 mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
88 1149 {noreply, run_flush(State)};
89 handle_info({garbage_collect, asynchronous_gc_triggered, true}, State) ->
90 1409 {noreply, State};
91 handle_info({timeout, _, flush}, State) -> % expired timeout, ignore
92 5 {noreply, State};
93 handle_info({cancel_timer, _, _}, State) -> % timer canceled, ignore
94 1409 {noreply, State};
95 handle_info(Msg, State) ->
96
:-(
?UNEXPECTED_INFO(Msg),
97
:-(
{noreply, State}.
98
99 -spec terminate(term(), state()) -> term().
100 terminate(Reason, State) ->
101
:-(
?LOG_INFO(log_fields(State, #{what => batch_worker_stopping, reason => Reason})),
102
:-(
case State#state.flush_queue of
103
:-(
[] -> ok;
104 _ ->
105
:-(
?LOG_WARNING(log_fields(State, #{what => batch_worker_terminate_requires_flush,
106
:-(
reason => Reason})),
107
:-(
do_run_flush(State)
108 end.
109
110 -spec code_change(term(), state(), term()) -> {ok, state()}.
111 code_change(_OldVsn, State, _Extra) ->
112
:-(
{ok, State}.
113
114 % Don't leak the tasks to logs, can contain private information
115 -spec format_status(gen_server:format_status()) -> gen_server:format_status().
116 format_status(Status = #{state := State}) ->
117
:-(
Status#{state => State#state{flush_queue = censored}}.
118
119 %% Batched tasks callbacks
120 handle_task(Task, State) ->
121 2981 State1 = maybe_schedule_flush(State),
122 2981 State2 = maybe_prep_task(State1, Task),
123 2981 maybe_run_flush(State2).
124
125 maybe_schedule_flush(#state{flush_interval_tref = undefined,
126 flush_queue_length = 0,
127 flush_interval = Interval} = State) ->
128 1409 State#state{flush_interval_tref = erlang:start_timer(Interval, self(), flush)};
129 maybe_schedule_flush(State) ->
130 1572 State.
131
132 maybe_prep_task(#state{prep_callback = undefined,
133 flush_queue = Acc,
134 flush_queue_length = Length} = State, Task) ->
135 2981 State#state{flush_queue = [Task | Acc],
136 flush_queue_length = Length + 1};
137 maybe_prep_task(#state{prep_callback = PrepCallback,
138 flush_queue = Acc,
139 flush_queue_length = Length,
140 flush_extra = Extra} = State, Task) ->
141
:-(
case PrepCallback(Task, Extra) of
142 {ok, ProcessedTask} ->
143
:-(
State#state{flush_queue = [ProcessedTask | Acc],
144 flush_queue_length = Length + 1};
145 {error, Reason} ->
146
:-(
?LOG_ERROR(log_fields(State, #{what => preprocess_callback_failed, reason => Reason})),
147
:-(
State
148 end.
149
150 maybe_run_flush(#state{host_type = HostType,
151 pool_id = PoolId,
152 batch_size = MaxSize,
153 flush_queue_length = Length} = State) ->
154 2981 case Length >= MaxSize of
155 2981 false -> State;
156 true ->
157
:-(
mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, batch_flushes], 1),
158
:-(
run_flush(State)
159 end.
160
161 run_flush(State = #state{flush_interval_tref = TRef}) ->
162 1409 cancel_and_flush_timer(TRef),
163 1409 ?LOG_DEBUG(log_fields(State, #{what => batch_worker_flush})),
164 1409 NewState = do_run_flush(State#state{flush_interval_tref = undefined}),
165 1409 erlang:garbage_collect(self(), [{async, asynchronous_gc_triggered}, {type, major}]),
166 1409 NewState.
167
168 cancel_and_flush_timer(undefined) ->
169
:-(
ok;
170 cancel_and_flush_timer(TRef) ->
171 1409 catch erlang:cancel_timer(TRef, [{async, true}]).
172
173 do_run_flush(State = #state{flush_callback = FlushCallback,
174 flush_queue_length = Length,
175 flush_queue = Queue,
176 flush_extra = Extra}) ->
177 1409 case FlushCallback(lists:reverse(Queue), Extra#{queue_length := Length}) of
178 ok ->
179 1409 State#state{flush_queue = [], flush_queue_length = 0};
180 {error, Reason} ->
181
:-(
?LOG_ERROR(log_fields(State,
182 #{what => batch_worker_flush_queue_failed, reason => Reason,
183
:-(
text => <<"flush_callback failed">>})),
184
:-(
State#state{flush_queue = [], flush_queue_length = 0}
185 end.
186
187 log_fields(State, LogMessage) ->
188
:-(
LogMessage#{host_type => State#state.host_type, pool_id => State#state.pool_id,
189 flush_queue_length => State#state.flush_queue_length}.
Line Hits Source