1 |
|
-module(mongoose_batch_worker). |
2 |
|
|
3 |
|
-behaviour(gen_server). |
4 |
|
|
5 |
|
-include("mongoose_logger.hrl"). |
6 |
|
|
7 |
|
%% gen_server callbacks |
8 |
|
-export([init/1, |
9 |
|
handle_call/3, |
10 |
|
handle_cast/2, |
11 |
|
handle_info/2, |
12 |
|
terminate/2, |
13 |
|
code_change/3, |
14 |
|
format_status/2]). |
15 |
|
|
16 |
|
%% Worker state.
%%
%% Fields without an explicit default start out as `undefined'.
-record(state, {
          %% Identify the pool this worker belongs to; used for metrics and logs.
          host_type :: mongooseim:host_type(),
          pool_id :: atom(),
          %% Queue length at which a flush is triggered immediately.
          batch_size :: non_neg_integer(),
          %% Delay before a non-empty queue is flushed by timer.
          flush_interval :: non_neg_integer(), %% milliseconds
          %% Reference of the pending flush timer, if one is armed.
          flush_interval_tref :: undefined | reference(),
          %% Invoked with the queued tasks and the extra map on every flush.
          flush_callback = fun(_, _) -> ok end :: flush_callback(),
          %% Queued tasks, newest first.
          flush_queue = [] :: list() | censored, % see format_status/2 for censored
          %% Number of tasks in flush_queue, tracked separately from the list.
          flush_queue_length = 0 :: non_neg_integer(),
          %% Extra data handed to flush_callback as its second argument.
          flush_extra = #{} :: map()
         }).

-type state() :: #state{}.

-type flush_callback() :: fun((list(), mongoose_async_pools:pool_extra()) -> ok | {error, term()}).
29 |
|
|
30 |
|
-export_type([flush_callback/0]). |
31 |
|
|
32 |
|
%% gen_server callbacks |
33 |
|
%% Builds the initial worker state from the configuration tuple.
%% The flush timer field is left at its record default here.
-spec init({mongooseim:host_type(),
            mongoose_async_pools:pool_id(),
            pos_integer(),
            pos_integer(),
            flush_callback(),
            mongoose_async_pools:pool_extra()}) -> {ok, state()}.
init({HostType, PoolId, Interval, MaxSize, FlushCallback, FlushExtra}) ->
    ?LOG_DEBUG(#{what => batch_worker_start, host_type => HostType, pool_id => PoolId}),
    InitialState = #state{flush_extra = FlushExtra,
                          flush_callback = FlushCallback,
                          flush_interval = Interval,
                          batch_size = MaxSize,
                          pool_id = PoolId,
                          host_type = HostType},
    {ok, InitialState}.
47 |
|
|
48 |
|
%% Synchronous requests.
%%
%% `sync' flushes the queue right away and replies `ok'; with nothing
%% queued it replies `skipped' and leaves the state untouched.
%% Any other request is logged and answered with `unexpected_call'.
-spec handle_call(term(), {pid(), term()}, state()) -> {reply, term(), state()}.
handle_call(sync, _From, #state{flush_queue = []} = State) ->
    {reply, skipped, State};
handle_call(sync, _From, #state{flush_queue = [_ | _]} = State) ->
    #state{host_type = HostType, pool_id = PoolId} = State,
    mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
    FlushedState = run_flush(State),
    {reply, ok, FlushedState};
handle_call(Request, From, State) ->
    ?UNEXPECTED_CALL(Request, From),
    {reply, unexpected_call, State}.
59 |
|
|
60 |
|
%% Asynchronous requests.
%%
%% `{task, Task}' queues the task via handle_task/2; anything else is
%% logged as unexpected and ignored.
-spec handle_cast({task, term()} | term(), state()) -> {noreply, state()}.
handle_cast({task, Task}, State) ->
    UpdatedState = handle_task(Task, State),
    {noreply, UpdatedState};
handle_cast(Other, State) ->
    ?UNEXPECTED_CAST(Other),
    {noreply, State}.
66 |
|
|
67 |
|
%% Out-of-band messages.
%%
%% A `flush' timeout triggers a flush only when its reference is the one
%% currently stored in the state; a timeout carrying any other reference
%% is ignored. The notification produced by the asynchronous garbage
%% collection requested in run_flush/1 is dropped silently. Everything
%% else is logged as unexpected.
-spec handle_info({timeout, reference(), flush} | term(), state()) -> {noreply, state()}.
handle_info({timeout, FiredRef, flush}, State) ->
    case State#state.flush_interval_tref of
        FiredRef ->
            #state{host_type = HostType, pool_id = PoolId} = State,
            mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, timed_flushes], 1),
            {noreply, run_flush(State)};
        _StaleOrNone -> % expired timeout, ignore
            {noreply, State}
    end;
handle_info({garbage_collect, asynchronous_gc_triggered, true}, State) ->
    {noreply, State};
handle_info(Other, State) ->
    ?UNEXPECTED_INFO(Other),
    {noreply, State}.
80 |
|
|
81 |
|
%% Logs the shutdown and, if tasks are still queued, logs a warning and
%% runs one last flush so that they are handed to the flush callback.
-spec terminate(term(), state()) -> term().
terminate(Reason, State) ->
    ?LOG_INFO(log_fields(State, #{what => batch_worker_stopping, reason => Reason})),
    #state{flush_queue = Pending} = State,
    case Pending =:= [] of
        true ->
            ok;
        false ->
            ?LOG_WARNING(log_fields(State, #{what => batch_worker_terminate_requires_flush})),
            do_run_flush(State)
    end.
90 |
|
|
91 |
|
%% Hot code upgrade hook: the state is returned unchanged.
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_FromVsn, CurrentState, _UpgradeExtra) ->
    {ok, CurrentState}.
94 |
|
|
95 |
|
% Queued tasks can contain private information, so they must not end up in
% logs: the queue is replaced by the atom `censored' in the reported state.
format_status(_Opt, [_PDict, State | _]) ->
    Redacted = State#state{flush_queue = censored},
    [{data, [{"State", Redacted}]}].
98 |
|
|
99 |
|
%% Batched tasks callbacks |
100 |
|
%% Prepends Task to the queue and bumps the tracked length. A flush timer
%% is requested through maybe_schedule_flush/3 using the length *before*
%% the insertion. Once the new length reaches the batch size, the batch
%% metric is bumped and the queue is flushed immediately.
handle_task(Task, State0 = #state{host_type = HostType,
                                  pool_id = PoolId,
                                  batch_size = BatchLimit,
                                  flush_interval = Interval,
                                  flush_interval_tref = CurrentTRef,
                                  flush_queue = Queue,
                                  flush_queue_length = OldLength}) ->
    NewLength = OldLength + 1,
    TRef = maybe_schedule_flush(CurrentTRef, OldLength, Interval),
    Queued = State0#state{flush_interval_tref = TRef,
                          flush_queue = [Task | Queue],
                          flush_queue_length = NewLength},
    case NewLength < BatchLimit of
        true ->
            Queued;
        false ->
            mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, batch_flushes], 1),
            run_flush(Queued)
    end.
118 |
|
|
119 |
|
%% Starts a `flush' timer addressed to this process only when no timer is
%% stored (`undefined') and the queue length is 0; in every other case the
%% reference passed in is returned as is.
maybe_schedule_flush(CurrentTRef, QueueLength, Interval) ->
    case {CurrentTRef, QueueLength} of
        {undefined, 0} ->
            erlang:start_timer(Interval, self(), flush);
        _ ->
            CurrentTRef
    end.
123 |
|
|
124 |
|
%% Cancels the pending flush timer, logs the flush, hands the queue to
%% do_run_flush/1 with the timer reference cleared, and finally requests
%% an asynchronous major garbage collection of this process. The GC
%% request is tagged `asynchronous_gc_triggered'; handle_info/2 has a
%% clause that ignores the resulting notification.
run_flush(#state{flush_interval_tref = PendingTimer} = State) ->
    cancel_and_flush_timer(PendingTimer),
    ?LOG_DEBUG(log_fields(State, #{what => batch_worker_flush})),
    TimerCleared = State#state{flush_interval_tref = undefined},
    Flushed = do_run_flush(TimerCleared),
    erlang:garbage_collect(self(), [{async, asynchronous_gc_triggered}, {type, major}]),
    Flushed.
130 |
|
|
131 |
|
%% Best-effort, asynchronous cancellation of the flush timer.
%%
%% `undefined' means no timer is armed, so there is nothing to do.
%%
%% `{info, false}' is required: with `{async, true}' alone the `info'
%% option defaults to `true', and the runtime would post a
%% `{cancel_timer, TRef, Result}' message back to this process for every
%% cancellation. Nothing in this module expects that message, so it would
%% be reported as an unexpected info message each time.
%%
%% Always returns `ok'. A value that is not a timer reference is ignored
%% rather than crashing the worker; only `badarg' is swallowed.
cancel_and_flush_timer(undefined) ->
    ok;
cancel_and_flush_timer(TRef) ->
    try erlang:cancel_timer(TRef, [{async, true}, {info, false}]) of
        _ -> ok
    catch
        error:badarg -> ok
    end.
135 |
|
|
136 |
|
%% Calls the flush callback with the queued tasks, reversed relative to
%% the stored list (tasks are prepended on arrival), and with the extra
%% map whose `queue_length' is updated to the current length. The `:='
%% update requires that key to be present in the map already.
%%
%% The queue is emptied whether the callback returns `ok' or
%% `{error, Reason}'; a failure is only logged. Any other return value
%% raises `case_clause'.
do_run_flush(State = #state{flush_callback = Callback,
                            flush_queue_length = QueueLength,
                            flush_queue = Newest2Oldest,
                            flush_extra = Extra}) ->
    Tasks = lists:reverse(Newest2Oldest),
    case Callback(Tasks, Extra#{queue_length := QueueLength}) of
        ok ->
            ok;
        {error, Reason} ->
            %% Logged with the pre-reset state so the dropped length is visible.
            ?LOG_ERROR(log_fields(State,
                                  #{what => batch_worker_flush_queue_failed, reason => Reason,
                                    text => <<"flush_callback failed">>}))
    end,
    State#state{flush_queue = [], flush_queue_length = 0}.
149 |
|
|
150 |
|
%% Adds the worker's host type, pool id and current queue length to a log
%% map. These keys overwrite same-named keys already present in LogMessage.
log_fields(#state{host_type = HostType,
                  pool_id = PoolId,
                  flush_queue_length = QueueLength}, LogMessage) ->
    LogMessage#{host_type => HostType,
                pool_id => PoolId,
                flush_queue_length => QueueLength}.