./ct_report/coverage/mongoose_batch_worker.COVER.html

1 %% @doc
2 %% This module encapsulates an asynchronous buffer, that accumulates tasks until
3 %% either the buffer is full, or a timeout since the first task expires, and then
4 %% flushes the buffer. It takes the buffer size, the timeout interval, and a
5 %% callback for flushing, as its main parameters. It can also take callbacks for
6 %% initialising data structures, or for preparing them before accumulation.
7 %% @end
8 -module(mongoose_batch_worker).
9
10 -behaviour(gen_server).
11
12 -callback prepare(mongoose_async_pools:task(), mongoose_async_pools:pool_extra()) ->
13 {ok, mongoose_async_pools:task()} | {error, term()}.
14 -callback flush([mongoose_async_pools:task()], mongoose_async_pools:pool_extra()) ->
15 ok | {error, term()}.
16 -optional_callbacks([prepare/2]).
17
18 -include("mongoose_logger.hrl").
19
20 %% gen_server callbacks
21 -export([init/1,
22 handle_call/3,
23 handle_cast/2,
24 handle_info/2,
25 terminate/2,
26 code_change/3,
27 format_status/2]).
28
29 -record(state, {
30 host_type :: mongooseim:host_type(),
31 pool_id :: mongoose_async_pools:pool_id(),
32 batch_size :: non_neg_integer(),
33 flush_interval :: non_neg_integer(), %% milliseconds
34 flush_interval_tref :: undefined | reference(),
35 flush_callback = fun(_, _) -> ok end :: mongoose_async_pools:flush_callback(),
36 prep_callback :: undefined | mongoose_async_pools:prep_callback(),
37 flush_queue = [] :: list() | censored, % see format_status/2 for censored
38 flush_queue_length = 0 :: non_neg_integer(),
39 flush_extra = #{} :: map()
40 }).
41 -type state() :: #state{}.
42
43 %% gen_server callbacks
44 -spec init(map()) -> {ok, state()}.
45 init(#{host_type := HostType,
46 pool_id := PoolId,
47 batch_size := MaxSize,
48 flush_interval := Interval,
49 flush_callback := FlushCallback,
50 flush_extra := FlushExtra} = Opts)
51 when is_function(FlushCallback, 2), is_map(FlushExtra) ->
52 928 ?LOG_DEBUG(#{what => batch_worker_start, host_type => HostType, pool_id => PoolId}),
53 928 {ok, #state{host_type = HostType,
54 pool_id = PoolId,
55 batch_size = MaxSize,
56 flush_interval = Interval,
57 flush_callback = FlushCallback,
58 prep_callback = maps:get(prep_callback, Opts, undefined),
59 flush_extra = FlushExtra}}.
60
61 -spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
62 handle_call(sync, _From, State = #state{host_type = HostType, pool_id = PoolId,
63 flush_queue = [_|_]}) ->
64 273 mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
65 273 {reply, ok, run_flush(State)};
66 handle_call(sync, _From, State = #state{flush_queue = []}) ->
67 3343 {reply, skipped, State};
68 handle_call(Msg, From, State) ->
69
:-(
?UNEXPECTED_CALL(Msg, From),
70
:-(
{reply, unexpected_call, State}.
71
72 -spec handle_cast(term(), state()) -> {noreply, state()}.
73 handle_cast({task, Task}, State) ->
74
:-(
{noreply, handle_task(Task, State)};
75 handle_cast({task, _Key, Task}, State) ->
76 2922 {noreply, handle_task(Task, State)};
77 handle_cast({broadcast, Broadcast}, State) ->
78
:-(
{noreply, handle_task(Broadcast, State)};
79 handle_cast(Msg, State) ->
80
:-(
?UNEXPECTED_CAST(Msg),
81
:-(
{noreply, State}.
82
83 -spec handle_info(term(), state()) -> {noreply, state()}.
84 handle_info({timeout, TimerRef, flush}, State = #state{flush_interval_tref = TimerRef,
85 host_type = HostType,
86 pool_id = PoolId}) ->
87 1070 mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
88 1070 {noreply, run_flush(State)};
89 handle_info({garbage_collect, asynchronous_gc_triggered, true}, State) ->
90 1347 {noreply, State};
91 handle_info({timeout, _, flush}, State) -> % expired timeout, ignore
92 4 {noreply, State};
93 handle_info({cancel_timer, _, _}, State) -> % timer canceled, ignore
94 1347 {noreply, State};
95 handle_info(Msg, State) ->
96
:-(
?UNEXPECTED_INFO(Msg),
97
:-(
{noreply, State}.
98
99 -spec terminate(term(), state()) -> term().
100 terminate(Reason, State) ->
101
:-(
?LOG_INFO(log_fields(State, #{what => batch_worker_stopping, reason => Reason})),
102
:-(
case State#state.flush_queue of
103
:-(
[] -> ok;
104 _ ->
105
:-(
?LOG_WARNING(log_fields(State, #{what => batch_worker_terminate_requires_flush,
106
:-(
reason => Reason})),
107
:-(
do_run_flush(State)
108 end.
109
110 -spec code_change(term(), state(), term()) -> {ok, state()}.
111 code_change(_OldVsn, State, _Extra) ->
112
:-(
{ok, State}.
113
114 % Don't leak the tasks to logs, can contain private information
115 format_status(_Opt, [_PDict, State | _]) ->
116
:-(
[{data, [{"State", State#state{flush_queue = censored}}]}].
117
118 %% Batched tasks callbacks
119 handle_task(Task, State) ->
120 2922 State1 = maybe_schedule_flush(State),
121 2922 State2 = maybe_prep_task(State1, Task),
122 2922 maybe_run_flush(State2).
123
124 maybe_schedule_flush(#state{flush_interval_tref = undefined,
125 flush_queue_length = 0,
126 flush_interval = Interval} = State) ->
127 1347 State#state{flush_interval_tref = erlang:start_timer(Interval, self(), flush)};
128 maybe_schedule_flush(State) ->
129 1575 State.
130
131 maybe_prep_task(#state{prep_callback = undefined,
132 flush_queue = Acc,
133 flush_queue_length = Length} = State, Task) ->
134 2922 State#state{flush_queue = [Task | Acc],
135 flush_queue_length = Length + 1};
136 maybe_prep_task(#state{prep_callback = PrepCallback,
137 flush_queue = Acc,
138 flush_queue_length = Length,
139 flush_extra = Extra} = State, Task) ->
140
:-(
case PrepCallback(Task, Extra) of
141 {ok, ProcessedTask} ->
142
:-(
State#state{flush_queue = [ProcessedTask | Acc],
143 flush_queue_length = Length + 1};
144 {error, Reason} ->
145
:-(
?LOG_ERROR(log_fields(State, #{what => preprocess_callback_failed, reason => Reason})),
146
:-(
State
147 end.
148
149 maybe_run_flush(#state{host_type = HostType,
150 pool_id = PoolId,
151 batch_size = MaxSize,
152 flush_queue_length = Length} = State) ->
153 2922 case Length >= MaxSize of
154 2918 false -> State;
155 true ->
156 4 mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, batch_flushes], 1),
157 4 run_flush(State)
158 end.
159
160 run_flush(State = #state{flush_interval_tref = TRef}) ->
161 1347 cancel_and_flush_timer(TRef),
162 1347 ?LOG_DEBUG(log_fields(State, #{what => batch_worker_flush})),
163 1347 NewState = do_run_flush(State#state{flush_interval_tref = undefined}),
164 1347 erlang:garbage_collect(self(), [{async, asynchronous_gc_triggered}, {type, major}]),
165 1347 NewState.
166
167 cancel_and_flush_timer(undefined) ->
168
:-(
ok;
169 cancel_and_flush_timer(TRef) ->
170 1347 catch erlang:cancel_timer(TRef, [{async, true}]).
171
172 do_run_flush(State = #state{flush_callback = FlushCallback,
173 flush_queue_length = Length,
174 flush_queue = Queue,
175 flush_extra = Extra}) ->
176 1347 case FlushCallback(lists:reverse(Queue), Extra#{queue_length := Length}) of
177 ok ->
178 1347 State#state{flush_queue = [], flush_queue_length = 0};
179 {error, Reason} ->
180
:-(
?LOG_ERROR(log_fields(State,
181 #{what => batch_worker_flush_queue_failed, reason => Reason,
182
:-(
text => <<"flush_callback failed">>})),
183
:-(
State#state{flush_queue = [], flush_queue_length = 0}
184 end.
185
186 log_fields(State, LogMessage) ->
187
:-(
LogMessage#{host_type => State#state.host_type, pool_id => State#state.pool_id,
188 flush_queue_length => State#state.flush_queue_length}.
Line Hits Source