./ct_report/coverage/mongoose_batch_worker.COVER.html

1 %% @doc
2 %% This module encapsulates an asynchronous buffer, that accumulates tasks until
3 %% either the buffer is full, or a timeout since the first task expires, and then
4 %% flushes the buffer. It takes the buffer size, the timeout interval, and a
5 %% callback for flushing, as its main parameters. It can also take callbacks for
6 %% initialising data structures, or for preparing them before accumulation.
7 %% @end
8 -module(mongoose_batch_worker).
9
10 -behaviour(gen_server).
11
12 -callback prepare(mongoose_async_pools:task(), mongoose_async_pools:pool_extra()) ->
13 {ok, mongoose_async_pools:task()} | {error, term()}.
14 -callback flush([mongoose_async_pools:task()], mongoose_async_pools:pool_extra()) ->
15 ok | {error, term()}.
16 -optional_callbacks([prepare/2]).
17
18 -include("mongoose_logger.hrl").
19
20 %% gen_server callbacks
21 -export([init/1,
22 handle_call/3,
23 handle_cast/2,
24 handle_info/2,
25 terminate/2,
26 code_change/3,
27 format_status/2]).
28
%% Internal state of a batch worker process.
-record(state, {
          %% Host type this worker belongs to; used in metric updates and log fields.
          host_type :: mongooseim:host_type(),
          %% Pool identifier; used in metric names and log fields.
          pool_id :: mongoose_async_pools:pool_id(),
          %% A flush is triggered once the queue length reaches this value.
          batch_size :: non_neg_integer(),
          %% Delay, in milliseconds, of the flush timer started when the first
          %% task arrives on an empty queue.
          flush_interval :: non_neg_integer(),
          %% Reference of the pending flush timer, `undefined' when none is armed.
          flush_interval_tref :: undefined | reference(),
          %% Called with the accumulated tasks (oldest first) and the extra map.
          flush_callback = fun(_, _) -> ok end :: mongoose_async_pools:flush_callback(),
          %% Optional per-task preprocessing callback; `undefined' disables it.
          prep_callback :: undefined | mongoose_async_pools:prep_callback(),
          %% Accumulated tasks, newest first. The atom `censored' only ever
          %% appears in the copy produced by format_status/2.
          flush_queue = [] :: list() | censored,
          %% Number of tasks currently held in `flush_queue'.
          flush_queue_length = 0 :: non_neg_integer(),
          %% Extra data handed to the prepare and flush callbacks.
          flush_extra = #{} :: map()
         }).
-type state() :: #state{}.
42
43 %% gen_server callbacks
%% gen_server init/1 callback: builds the initial worker state from the options map.
%%
%% The options must contain `host_type', `pool_id', `batch_size', `flush_interval',
%% `flush_callback' (a fun of arity 2) and `flush_extra' (a map); if they do not,
%% no clause matches and the call fails with `function_clause'.
%% `prep_callback' is optional and defaults to `undefined'.
-spec init(map()) -> {ok, state()}.
init(#{host_type := HostType,
       pool_id := PoolId,
       batch_size := BatchSize,
       flush_interval := FlushInterval,
       flush_callback := FlushCallback,
       flush_extra := FlushExtra} = Opts)
  when is_function(FlushCallback, 2), is_map(FlushExtra) ->
    ?LOG_DEBUG(#{what => batch_worker_start, host_type => HostType, pool_id => PoolId}),
    PrepCallback = maps:get(prep_callback, Opts, undefined),
    InitialState = #state{host_type = HostType,
                          pool_id = PoolId,
                          batch_size = BatchSize,
                          flush_interval = FlushInterval,
                          flush_callback = FlushCallback,
                          prep_callback = PrepCallback,
                          flush_extra = FlushExtra},
    {ok, InitialState}.
60
%% gen_server handle_call/3 callback.
%%
%% `sync' forces an immediate flush when tasks are queued and replies `ok';
%% with an empty queue nothing is flushed and the reply is `skipped'.
%% A forced flush is counted under the `timed_flushes' metric.
%% Any other request is logged as unexpected and answered with `unexpected_call'.
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
handle_call(sync, _From, #state{flush_queue = []} = State) ->
    {reply, skipped, State};
handle_call(sync, _From, #state{flush_queue = [_ | _]} = State) ->
    #state{host_type = HostType, pool_id = PoolId} = State,
    MetricName = [mongoose_async_pools, PoolId, timed_flushes],
    mongoose_metrics:update(HostType, MetricName, 1),
    {reply, ok, run_flush(State)};
handle_call(Msg, From, State) ->
    ?UNEXPECTED_CALL(Msg, From),
    {reply, unexpected_call, State}.
71
%% gen_server handle_cast/2 callback.
%%
%% Accepts tasks either as `{task, Key, Task}' (the key is ignored here) or as
%% `{task, Task}'; both are passed to handle_task/2. Anything else is logged
%% as an unexpected cast and the state is left untouched.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast({task, _Key, Task}, State) ->
    NewState = handle_task(Task, State),
    {noreply, NewState};
handle_cast({task, Task}, State) ->
    NewState = handle_task(Task, State),
    {noreply, NewState};
handle_cast(Msg, State) ->
    ?UNEXPECTED_CAST(Msg),
    {noreply, State}.
80
%% gen_server handle_info/2 callback.
%%
%% A `{timeout, Ref, flush}' message whose reference equals the stored timer
%% reference triggers a timed flush (counted under `timed_flushes'); a timeout
%% carrying any other reference belongs to an expired timer and is dropped.
%% The notifications for the asynchronous garbage collection and timer
%% cancellation requested by run_flush/1 are ignored. Everything else is
%% logged as unexpected.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info({timeout, TimerRef, flush}, #state{flush_interval_tref = TimerRef} = State) ->
    #state{host_type = HostType, pool_id = PoolId} = State,
    mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
    {noreply, run_flush(State)};
handle_info({timeout, _StaleRef, flush}, State) ->
    %% expired timeout, ignore
    {noreply, State};
handle_info({garbage_collect, asynchronous_gc_triggered, true}, State) ->
    {noreply, State};
handle_info({cancel_timer, _TimerRef, _Result}, State) ->
    %% timer canceled, ignore
    {noreply, State};
handle_info(Msg, State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State}.
96
%% gen_server terminate/2 callback.
%%
%% Logs that the worker is stopping. If tasks are still queued, a warning is
%% logged and the queue is flushed one last time; in that case the state
%% returned by the flush is the result, otherwise `ok'.
-spec terminate(term(), state()) -> term().
terminate(Reason, State) ->
    ?LOG_INFO(log_fields(State, #{what => batch_worker_stopping, reason => Reason})),
    QueueIsEmpty = State#state.flush_queue =:= [],
    case QueueIsEmpty of
        true ->
            ok;
        false ->
            ?LOG_WARNING(log_fields(State, #{what => batch_worker_terminate_requires_flush})),
            do_run_flush(State)
    end.
106
%% gen_server code_change/3 callback: the state is carried over unchanged.
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_PreviousVersion, CurrentState, _ExtraData) ->
    {ok, CurrentState}.
110
%% gen_server format_status/2 callback.
%% The queued tasks may contain private information, so the queue is replaced
%% by the atom `censored' before the state is handed out for display or logging.
format_status(_Opt, [_PDict, State | _]) ->
    CensoredState = State#state{flush_queue = censored},
    StatusItems = [{"State", CensoredState}],
    [{data, StatusItems}].
114
%% Batched tasks callbacks

%% Processes one incoming task in three steps: arm the flush timer if needed,
%% add the (optionally prepared) task to the queue, then flush if the batch
%% size has been reached. Returns the resulting state.
handle_task(Task, State) ->
    Scheduled = maybe_schedule_flush(State),
    Enqueued = maybe_prep_task(Scheduled, Task),
    maybe_run_flush(Enqueued).
120
%% Starts the flush timer when no timer is armed and the queue is empty, i.e.
%% just before the first task of a new batch is queued. The timer delivers
%% `{timeout, TimerRef, flush}' to this process after `flush_interval' ms.
%% In every other situation the state is returned unchanged.
maybe_schedule_flush(State = #state{flush_interval_tref = undefined,
                                    flush_queue_length = 0}) ->
    TimerRef = erlang:start_timer(State#state.flush_interval, self(), flush),
    State#state{flush_interval_tref = TimerRef};
maybe_schedule_flush(Unchanged) ->
    Unchanged.
127
%% Adds a task to the front of the queue and bumps the queue length.
%%
%% Without a prepare callback the task is queued as is. With one, the callback
%% is invoked with the task and the extra map: `{ok, Prepared}' queues the
%% prepared task, `{error, Reason}' logs the failure and drops the task,
%% leaving the state unchanged.
maybe_prep_task(State = #state{prep_callback = undefined}, Task) ->
    #state{flush_queue = Queue, flush_queue_length = QueueLength} = State,
    State#state{flush_queue = [Task | Queue],
                flush_queue_length = QueueLength + 1};
maybe_prep_task(State = #state{prep_callback = Prepare, flush_extra = Extra}, Task) ->
    case Prepare(Task, Extra) of
        {error, Reason} ->
            ?LOG_ERROR(log_fields(State, #{what => preprocess_callback_failed, reason => Reason})),
            State;
        {ok, Prepared} ->
            #state{flush_queue = Queue, flush_queue_length = QueueLength} = State,
            State#state{flush_queue = [Prepared | Queue],
                        flush_queue_length = QueueLength + 1}
    end.
145
%% Flushes the queue once its length has reached the configured batch size,
%% counting the event under the `batch_flushes' metric; otherwise the state
%% is returned unchanged.
maybe_run_flush(#state{batch_size = BatchSize,
                       flush_queue_length = QueueLength} = State)
  when QueueLength >= BatchSize ->
    #state{host_type = HostType, pool_id = PoolId} = State,
    mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, batch_flushes], 1),
    run_flush(State);
maybe_run_flush(#state{} = State) ->
    State.
156
%% Performs a flush: cancels the pending flush timer, logs the flush, runs the
%% flush callback on a state whose timer reference has been cleared, and then
%% requests an asynchronous major garbage collection of this process.
%% Returns the state produced by the flush.
run_flush(#state{flush_interval_tref = TimerRef} = State) ->
    cancel_and_flush_timer(TimerRef),
    ?LOG_DEBUG(log_fields(State, #{what => batch_worker_flush})),
    Flushed = do_run_flush(State#state{flush_interval_tref = undefined}),
    GcOptions = [{async, asynchronous_gc_triggered}, {type, major}],
    erlang:garbage_collect(self(), GcOptions),
    Flushed.
163
%% Requests asynchronous cancellation of the given timer; `undefined' means
%% there is no timer and yields `ok' straight away. Exceptions raised by the
%% cancellation call are caught rather than propagated.
cancel_and_flush_timer(MaybeTimerRef) ->
    case MaybeTimerRef of
        undefined ->
            ok;
        TimerRef ->
            catch erlang:cancel_timer(TimerRef, [{async, true}])
    end.
168
%% Invokes the flush callback with the queued tasks in arrival order (the queue
%% is kept newest-first, hence the reverse) and with the extra map extended by
%% the current `queue_length'.
%%
%% `queue_length' is set with `=>' so the flush also works when the configured
%% extra map does not already contain that key; `:=' would raise
%% `{badkey, queue_length}' in that case.
%%
%% The queue is emptied whether the callback returns `ok' or `{error, Reason}';
%% a failure is logged and the tasks of that batch are not retried.
%% Any other return value from the callback raises `case_clause'.
do_run_flush(State = #state{flush_callback = FlushCallback,
                            flush_queue_length = Length,
                            flush_queue = Queue,
                            flush_extra = Extra}) ->
    case FlushCallback(lists:reverse(Queue), Extra#{queue_length => Length}) of
        ok ->
            ok;
        {error, Reason} ->
            ?LOG_ERROR(log_fields(State,
                                  #{what => batch_worker_flush_queue_failed, reason => Reason,
                                    text => <<"flush_callback failed">>}))
    end,
    State#state{flush_queue = [], flush_queue_length = 0}.
182
%% Extends a log report map with the worker's host type, pool id and current
%% queue length. Values taken from the state override same-named keys already
%% present in the report.
log_fields(State, LogMessage) ->
    StateFields = #{host_type => State#state.host_type,
                    pool_id => State#state.pool_id,
                    flush_queue_length => State#state.flush_queue_length},
    maps:merge(LogMessage, StateFields).
Line Hits Source