1 |
|
%% @doc |
2 |
|
%% This module encapsulates an asynchronous buffer that accumulates tasks until |
3 |
|
%% either the buffer is full, or a timeout since the first task expires, and then |
4 |
|
%% flushes the buffer. It takes the buffer size, the timeout interval, and a |
5 |
|
%% callback for flushing, as its main parameters. It can also take callbacks for |
6 |
|
%% initialising data structures, or for preparing them before accumulation. |
7 |
|
%% @end |
8 |
|
-module(mongoose_batch_worker). |
9 |
|
|
10 |
|
-behaviour(gen_server). |
11 |
|
|
12 |
|
%% Behaviour callbacks for batch-worker implementations.
%% prepare/2 receives a single task and returns the (possibly transformed)
%% task to queue; flush/2 receives a whole batch of tasks. Both signal
%% failure with an {error, Reason} tuple. Only flush/2 is mandatory.
%% NOTE(review): the signatures mirror how this worker calls prep_callback
%% and flush_callback; confirm those funs are built from these callbacks.
-callback prepare(mongoose_async_pools:task(),
                  mongoose_async_pools:pool_extra()) ->
    {ok, mongoose_async_pools:task()} | {error, term()}.

-callback flush([mongoose_async_pools:task()],
                mongoose_async_pools:pool_extra()) ->
    ok | {error, term()}.

-optional_callbacks([prepare/2]).
17 |
|
|
18 |
|
-include("mongoose_logger.hrl"). |
19 |
|
|
20 |
|
%% gen_server callbacks |
21 |
|
-export([init/1, |
22 |
|
handle_call/3, |
23 |
|
handle_cast/2, |
24 |
|
handle_info/2, |
25 |
|
terminate/2, |
26 |
|
code_change/3, |
27 |
|
format_status/2]). |
28 |
|
|
29 |
|
%% Worker state. Field order is significant (records are tuples), so keep it.
-record(state, {
          %% Identity of the pool; used for metrics keys and log metadata
          host_type :: mongooseim:host_type(),
          pool_id :: mongoose_async_pools:pool_id(),
          %% A flush is triggered as soon as this many tasks are queued
          batch_size :: non_neg_integer(),
          %% Milliseconds between queueing the first task of a batch and the timed flush
          flush_interval :: non_neg_integer(),
          %% Reference of the armed flush timer, or undefined when none is armed
          flush_interval_tref :: undefined | reference(),
          %% Called with the batch (oldest task first) and flush_extra
          flush_callback = fun(_, _) -> ok end :: mongoose_async_pools:flush_callback(),
          %% Optional per-task transformation applied before queueing
          prep_callback :: undefined | mongoose_async_pools:prep_callback(),
          %% Pending tasks, newest first; format_status/2 swaps it for `censored'
          flush_queue = [] :: list() | censored,
          %% Number of entries in flush_queue, tracked incrementally
          flush_queue_length = 0 :: non_neg_integer(),
          %% Extra data handed to the callbacks
          flush_extra = #{} :: map()
         }).
-type state() :: #state{}.
42 |
|
|
43 |
|
%% gen_server callbacks |
44 |
|
%% @doc Builds the worker state from the pool options map.
%% Required keys: host_type, pool_id, batch_size, flush_interval,
%% flush_callback (a fun of arity 2) and flush_extra (a map). The optional
%% prep_callback key defaults to `undefined'. A missing required key or an
%% ill-typed callback/extra fails with function_clause.
-spec init(map()) -> {ok, state()}.
init(#{host_type := HT,
       pool_id := Pool,
       batch_size := BatchSize,
       flush_interval := FlushInterval,
       flush_callback := OnFlush,
       flush_extra := ExtraData} = Opts)
  when is_function(OnFlush, 2), is_map(ExtraData) ->
    ?LOG_DEBUG(#{what => batch_worker_start, host_type => HT, pool_id => Pool}),
    OnPrep = maps:get(prep_callback, Opts, undefined),
    InitialState = #state{host_type = HT,
                          pool_id = Pool,
                          batch_size = BatchSize,
                          flush_interval = FlushInterval,
                          flush_callback = OnFlush,
                          prep_callback = OnPrep,
                          flush_extra = ExtraData},
    {ok, InitialState}.
60 |
|
|
61 |
|
%% @doc Handles synchronous requests.
%% `sync' flushes the pending queue right away and replies `ok', or replies
%% `skipped' when nothing is queued. Any other request is logged and
%% answered with `unexpected_call'.
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
handle_call(sync, _From, #state{flush_queue = []} = State) ->
    {reply, skipped, State};
handle_call(sync, _From, #state{host_type = HT, pool_id = Pool,
                                flush_queue = [_ | _]} = State) ->
    %% NOTE(review): a forced flush is counted under timed_flushes; confirm intended
    mongoose_metrics:update(HT, [mongoose_async_pools, Pool, timed_flushes], 1),
    {reply, ok, run_flush(State)};
handle_call(Request, From, State) ->
    ?UNEXPECTED_CALL(Request, From),
    {reply, unexpected_call, State}.
71 |
|
|
72 |
|
%% @doc Handles asynchronous messages.
%% Task submissions ({task, Task}, {task, Key, Task} and {broadcast, Task})
%% are queued through handle_task/2; anything else is logged and ignored.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(Msg, State) ->
    case cast_payload(Msg) of
        {ok, Task} ->
            {noreply, handle_task(Task, State)};
        error ->
            ?UNEXPECTED_CAST(Msg),
            {noreply, State}
    end.

%% Extracts the task carried by a cast message. The key of keyed tasks is
%% not used by this worker.
cast_payload({task, Task}) -> {ok, Task};
cast_payload({task, _Key, Task}) -> {ok, Task};
cast_payload({broadcast, Broadcast}) -> {ok, Broadcast};
cast_payload(_Other) -> error.
82 |
|
|
83 |
|
%% @doc Handles raw messages.
%% - {timeout, TRef, flush} for the currently armed timer triggers a timed
%%   flush. If the queue is empty at that point (e.g. prep_callback rejected
%%   the task that armed the timer), the timer is only disarmed, so
%%   flush_callback is never invoked with an empty batch.
%% - Stale flush timeouts, async GC completions and async cancel_timer
%%   replies are byproducts of run_flush/1 and are ignored.
%% - Anything else is logged as unexpected.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info({timeout, TimerRef, flush}, State = #state{flush_interval_tref = TimerRef,
                                                       flush_queue = []}) ->
    %% Nothing to flush; clear the ref so the next queued task re-arms a timer
    {noreply, State#state{flush_interval_tref = undefined}};
handle_info({timeout, TimerRef, flush}, State = #state{flush_interval_tref = TimerRef,
                                                       host_type = HostType,
                                                       pool_id = PoolId}) ->
    mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
    {noreply, run_flush(State)};
handle_info({garbage_collect, asynchronous_gc_triggered, true}, State) ->
    {noreply, State};
handle_info({timeout, _, flush}, State) -> % expired timeout, ignore
    {noreply, State};
handle_info({cancel_timer, _, _}, State) -> % timer canceled, ignore
    {noreply, State};
handle_info(Msg, State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State}.
98 |
|
|
99 |
|
%% @doc Logs the shutdown. If tasks are still pending, it warns and makes a
%% final flush attempt instead of silently dropping them.
-spec terminate(term(), state()) -> term().
terminate(Reason, State) ->
    ?LOG_INFO(log_fields(State, #{what => batch_worker_stopping, reason => Reason})),
    Pending = State#state.flush_queue,
    if
        Pending =:= [] ->
            ok;
        true ->
            ?LOG_WARNING(log_fields(State, #{what => batch_worker_terminate_requires_flush,
                                             reason => Reason})),
            do_run_flush(State)
    end.
109 |
|
|
110 |
|
%% @doc Keeps the state unchanged across code upgrades.
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_FromVsn, OldState, _ExtraArgs) ->
    {ok, OldState}.
113 |
|
|
114 |
|
%% Queued tasks may carry private information, so the queue is replaced by
%% the atom `censored' before the state is reported (status inspection or
%% error logs).
format_status(_Opt, [_PDict, State | _]) ->
    Redacted = State#state{flush_queue = censored},
    [{data, [{"State", Redacted}]}].
117 |
|
|
118 |
|
%% Batched tasks callbacks |
119 |
|
%% Queues one task. The steps run in this order:
%% 1. arm the flush timer if this is the first task of a batch;
%% 2. run the optional prep_callback and enqueue the result;
%% 3. flush immediately if the batch is now full.
handle_task(Task, State) ->
    maybe_run_flush(maybe_prep_task(maybe_schedule_flush(State), Task)).
123 |
|
|
124 |
|
%% Arms the flush timer when no timer is running and the queue is empty,
%% i.e. just before the first task of a new batch is queued.
maybe_schedule_flush(#state{flush_interval_tref = Timer,
                            flush_queue_length = Queued,
                            flush_interval = IntervalMs} = State) ->
    case Timer =:= undefined andalso Queued =:= 0 of
        true ->
            State#state{flush_interval_tref = erlang:start_timer(IntervalMs, self(), flush)};
        false ->
            State
    end.
130 |
|
|
131 |
|
%% Adds Task to the queue. When a prep_callback is configured, the task is
%% passed through it first; a task the callback rejects is logged and dropped.
maybe_prep_task(#state{prep_callback = undefined} = State, Task) ->
    push_to_queue(Task, State);
maybe_prep_task(#state{prep_callback = Prepare, flush_extra = Extra} = State, Task) ->
    case Prepare(Task, Extra) of
        {ok, Prepared} ->
            push_to_queue(Prepared, State);
        {error, Reason} ->
            ?LOG_ERROR(log_fields(State, #{what => preprocess_callback_failed, reason => Reason})),
            State
    end.

%% Prepends Item to the newest-first queue and bumps the cached length.
push_to_queue(Item, #state{flush_queue = Queue, flush_queue_length = Len} = State) ->
    State#state{flush_queue = [Item | Queue], flush_queue_length = Len + 1}.
148 |
|
|
149 |
|
%% Flushes immediately once the queue has reached batch_size and counts it
%% as a batch flush. Otherwise the state is returned untouched.
maybe_run_flush(#state{flush_queue_length = Queued, batch_size = Limit} = State)
  when Queued < Limit ->
    State;
maybe_run_flush(#state{host_type = HostType, pool_id = PoolId} = State) ->
    mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, batch_flushes], 1),
    run_flush(State).
159 |
|
|
160 |
|
%% Flushes the queue now. It disarms the pending timer, hands the batch to
%% do_run_flush/1, then requests an asynchronous major GC of this process.
run_flush(#state{flush_interval_tref = Timer} = State) ->
    cancel_and_flush_timer(Timer),
    ?LOG_DEBUG(log_fields(State, #{what => batch_worker_flush})),
    Flushed = do_run_flush(State#state{flush_interval_tref = undefined}),
    request_major_gc(),
    Flushed.

%% Asks the runtime to garbage-collect this process in the background.
%% Completion arrives as a {garbage_collect, asynchronous_gc_triggered, Result}
%% message.
request_major_gc() ->
    erlang:garbage_collect(self(), [{async, asynchronous_gc_triggered}, {type, major}]).
166 |
|
|
167 |
|
%% Cancels the flush timer, if one is armed.
%% The cancellation is asynchronous: the result arrives later as a
%% {cancel_timer, TRef, Result} message. A timeout that already fired stays
%% in the mailbox as a stale {timeout, TRef, flush} message; handle_info/2
%% ignores both.
%% The reference always comes from erlang:start_timer/3, so the old-style
%% `catch' (which could only hide a badarg) is unnecessary.
-spec cancel_and_flush_timer(undefined | reference()) -> ok.
cancel_and_flush_timer(undefined) ->
    ok;
cancel_and_flush_timer(TRef) when is_reference(TRef) ->
    erlang:cancel_timer(TRef, [{async, true}]).
171 |
|
|
172 |
|
%% Hands the pending batch (oldest task first) to flush_callback, together
%% with flush_extra extended by the current queue_length, then empties the
%% queue.
%% A callback returning {error, Reason} is logged and the batch is dropped
%% rather than retried. Any other return value crashes with case_clause.
do_run_flush(State = #state{flush_callback = FlushCallback,
                            flush_queue_length = Length,
                            flush_queue = Queue,
                            flush_extra = Extra}) ->
    %% Use `=>' rather than `:='. init/1 only requires flush_extra to be a
    %% map (and the record default is #{}), so queue_length may be absent;
    %% `:=' would crash with badkey in that case.
    case FlushCallback(lists:reverse(Queue), Extra#{queue_length => Length}) of
        ok ->
            State#state{flush_queue = [], flush_queue_length = 0};
        {error, Reason} ->
            ?LOG_ERROR(log_fields(State,
                                  #{what => batch_worker_flush_queue_failed, reason => Reason,
                                    text => <<"flush_callback failed">>})),
            State#state{flush_queue = [], flush_queue_length = 0}
    end.
185 |
|
|
186 |
|
%% Enriches a log map with the worker's host type, pool id and current
%% queue length.
log_fields(#state{host_type = HostType,
                  pool_id = PoolId,
                  flush_queue_length = QueueLen}, LogMessage) ->
    LogMessage#{host_type => HostType,
                pool_id => PoolId,
                flush_queue_length => QueueLen}.