1 |
|
%% @doc |
2 |
|
%% This module encapsulates an asynchronous aggregator, that takes tasks classified by key, |
3 |
|
%% to immediately preprocess and flush, but upon flushing, it submits asynchronous requests |
4 |
|
%% and continues taking tasks and preprocessing them — aggregating them — until the first
5 |
|
%% request is fulfilled, at which time it puts the next on flush. |
6 |
|
%% |
7 |
|
%% `request_callback' is a function that takes the new task and the extra metadata, and |
8 |
|
%% essentially encapsulates `gen_server:send_request'. The returned reference is what will |
9 |
|
%% be used in `handle_info' to match pending requests, so it is important that the return |
10 |
|
%% value of `request_callback' is the same as that of `gen_server:send_request'.
11 |
|
%% I have chosen to provide an arbitrary function callback so that it can encapsulate all |
12 |
|
%% the logic that chooses the gen_server to request to, and possibly calculates other data |
13 |
|
%% in the meantime. |
14 |
|
%% @end |
15 |
|
-module(mongoose_aggregator_worker).

-behaviour(gen_server).

%% `gen_server:request_id()' is only exported on newer OTP releases; on older
%% ones fall back to an opaque term() (the flag is set by the build config).
-ifdef(gen_server_request_id).
-type request_id() :: gen_server:request_id().
-else.
-type request_id() :: term().
-endif.

%% Merge a newly submitted task into the task already accumulated for the same key.
-callback aggregate(mongoose_async_pools:task(),
                    mongoose_async_pools:task(),
                    mongoose_async_pools:pool_extra()) ->
    {ok, mongoose_async_pools:task()} | {error, term()}.
%% Fire the asynchronous request for a task. Must return the reference produced
%% by `gen_server:send_request', or `drop' to discard the task without requesting.
-callback request(mongoose_async_pools:task(), mongoose_async_pools:pool_extra()) ->
    request_id() | drop.
%% Optionally inspect the reply of a fulfilled request: `ok' accepts it,
%% `{error, _}' triggers a retry of the same task (see check_response/4).
-callback verify(term(), mongoose_async_pools:task(), mongoose_async_pools:pool_extra()) ->
    term().
-optional_callbacks([verify/3]).
34 |
|
|
35 |
|
-include("mongoose_logger.hrl"). |
36 |
|
|
37 |
|
%% gen_server callbacks |
38 |
|
-export([init/1, |
39 |
|
handle_call/3, |
40 |
|
handle_cast/2, |
41 |
|
handle_info/2, |
42 |
|
terminate/2, |
43 |
|
code_change/3, |
44 |
|
format_status/2]). |
45 |
|
|
46 |
|
%% Either no request is in flight, or the pending request id paired with the
%% task it carried (kept so the task can be retried on failure).
-type request() :: no_request_pending | {request_id(), mongoose_async_pools:task()}.
-record(state, {
          host_type :: mongooseim:host_type(),
          pool_id :: mongoose_async_pools:pool_id(),
          %% At most one request is in flight at any time.
          async_request = no_request_pending :: request(),
          request_callback :: mongoose_async_pools:request_callback(),
          aggregate_callback :: mongoose_async_pools:aggregate_callback(),
          verify_callback :: undefined | mongoose_async_pools:verify_callback(),
          %% Tasks accumulated per key while a request is pending.
          flush_elems = #{} :: map() | censored, % see format_status/2 for censored
          %% Keys in arrival order, so earlier tasks are flushed first.
          flush_queue = queue:new() :: queue:queue(),
          %% Extra metadata handed to the aggregate/request/verify callbacks.
          flush_extra = #{} :: map(),
          total_retries = 3 :: non_neg_integer(),
          %% Remaining retry budget for the current request; reset per request.
          retries_left = 3 :: non_neg_integer()
         }).
-type state() :: #state{}.
|
|
62 |
|
%% gen_server callbacks |
63 |
|
%% Initialise the worker from the pool options; the mandatory callbacks are
%% validated by the guards, the verify callback stays optional.
-spec init(map()) -> {ok, state()}.
init(#{host_type := HostType,
       pool_id := PoolId,
       request_callback := Requestor,
       aggregate_callback := Aggregator,
       flush_extra := FlushExtra} = Opts)
  when is_function(Requestor, 2),
       is_function(Aggregator, 3),
       is_map(FlushExtra) ->
    ?LOG_DEBUG(#{what => aggregator_worker_start, host_type => HostType, pool_id => PoolId}),
    Verifier = maps:get(verify_callback, Opts, undefined),
    State = #state{host_type = HostType,
                   pool_id = PoolId,
                   request_callback = Requestor,
                   aggregate_callback = Aggregator,
                   verify_callback = Verifier,
                   flush_extra = FlushExtra},
    {ok, State}.
79 |
|
|
80 |
|
%% `sync' forces all accumulated tasks out; replies `skipped' when there was
%% nothing to flush. Anything else is logged and rejected.
-spec handle_call(term(), term(), state()) -> {reply, term(), state()}.
handle_call(sync, _From, #state{flush_elems = Elems} = State) when map_size(Elems) =:= 0 ->
    {reply, skipped, State};
handle_call(sync, _From, State) ->
    {reply, ok, run_flush(State)};
handle_call(Msg, From, State) ->
    ?UNEXPECTED_CALL(Msg, From),
    {reply, unexpected_call, State}.
89 |
|
|
90 |
|
%% Tasks arrive as casts; `task' targets one key, `broadcast' touches them all.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast({task, Key, Value}, State) ->
    NewState = handle_task(Key, Value, State),
    {noreply, NewState};
handle_cast({broadcast, Broadcast}, State) ->
    NewState = handle_broadcast(Broadcast, State),
    {noreply, NewState};
handle_cast(Msg, State) ->
    ?UNEXPECTED_CAST(Msg),
    {noreply, State}.
98 |
|
|
99 |
|
%% Replies to the pending asynchronous request land here as info messages.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(Msg, #state{async_request = no_request_pending} = State) ->
    %% Nothing in flight, so any message is unexpected.
    ?UNEXPECTED_INFO(Msg),
    {noreply, State};
handle_info(Msg, #state{async_request = {AsyncRequest, ReqTask}} = State) ->
    Verdict = check_response(Msg, AsyncRequest, ReqTask, State),
    case Verdict of
        ignore ->
            {noreply, State};
        next ->
            {noreply, maybe_request_next(State)};
        retry ->
            {noreply, maybe_request_retry(ReqTask, State)}
    end.
112 |
|
|
113 |
|
%% Retry a failed request, or drop the task and move on once the retry budget
%% is exhausted.
maybe_request_retry(ReqTask, State = #state{retries_left = 0}) ->
    ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_dropped, txt => <<"Async request dropped, no more retries">>, task => ReqTask})),
    cancel_request_retry(State);
maybe_request_retry(ReqTask, State = #state{retries_left = Left}) ->
    %% Re-issue the same task with one fewer retry; the decremented counter
    %% reaches the callback as `retry_number' via make_async_request/2.
    case make_async_request(ReqTask, State#state{async_request = no_request_pending, retries_left = Left - 1}) of
        #state{async_request = no_request_pending} = State2 ->
            %% The request callback chose to drop the task; continue with the queue.
            cancel_request_retry(State2);
        State2 ->
            State2
    end.
123 |
|
|
124 |
|
%% Give up on the current request and move on to the next queued task, if any.
cancel_request_retry(State) ->
    Cleared = State#state{async_request = no_request_pending},
    maybe_request_next(Cleared).
126 |
|
|
127 |
|
%% Classify an incoming message against the pending request:
%% `next' on a verified success, `retry' on any failure, and `ignore' when the
%% message does not belong to this request at all.
check_response(Msg, AsyncRequest, ReqTask, State) ->
    case gen_server:check_response(Msg, AsyncRequest) of
        {error, {Reason, _Ref}} ->
            %% Transport-level failure (e.g. the server died before replying).
            ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})),
            retry;
        {reply, {error, Reason}} ->
            %% The server replied, but with an application-level error.
            ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})),
            retry;
        {reply, Reply} ->
            %% Successful reply; let the optional verify callback veto it.
            case maybe_verify_reply(Reply, ReqTask, State) of
                ok ->
                    next;
                {error, _Reason} ->
                    retry
            end;
        no_reply ->
            %% The message is unrelated to the pending request.
            ?UNEXPECTED_INFO(Msg),
            ignore
    end.
146 |
|
|
147 |
|
%% On shutdown, try not to lose any accumulated tasks: flush whatever is left.
-spec terminate(term(), state()) -> term().
terminate(Reason, #state{flush_elems = Elems} = State) ->
    ?LOG_INFO(log_fields(State, #{what => aggregate_worker_stopping, reason => Reason})),
    if
        map_size(Elems) =:= 0 ->
            ok;
        true ->
            ?LOG_WARNING(log_fields(State, #{what => aggregate_worker_terminate_requires_flush})),
            run_flush(State)
    end.
156 |
|
|
157 |
|
%% No state migration is needed across releases.
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
160 |
|
|
161 |
|
% Don't leak the tasks to logs, can contain private information
format_status(_Opt, [_PDict, State | _]) ->
    Censored = State#state{flush_elems = censored},
    [{data, [{"State", Censored}]}].
164 |
|
|
165 |
|
% If we don't have any request pending, it means that it is the first task submitted,
% so aggregation is not needed.
handle_task(_, Value, #state{async_request = no_request_pending} = State) ->
    make_async_request(Value, State);
handle_task(Key, NewValue, #state{aggregate_callback = Aggregator,
                                  flush_elems = Acc,
                                  flush_queue = Queue,
                                  flush_extra = Extra} = State) ->
    case maps:find(Key, Acc) of
        {ok, OldValue} ->
            %% A task for this key is already waiting: merge the two. On
            %% aggregation failure the old value is kept and the new one dropped.
            case Aggregator(OldValue, NewValue, Extra) of
                {ok, FinalValue} ->
                    State#state{flush_elems = Acc#{Key := FinalValue}};
                {error, Reason} ->
                    ?LOG_ERROR(log_fields(State, #{what => aggregation_failed, reason => Reason})),
                    State
            end;
        error ->
            % The queue is used to ensure the order in which elements are flushed,
            % so that first requests are first flushed.
            State#state{flush_elems = Acc#{Key => NewValue},
                        flush_queue = queue:in(Key, Queue)}
    end.
188 |
|
|
189 |
|
% If we don't have any request pending, it means that it is the first task submitted,
% so aggregation is not needed.
handle_broadcast(Task, #state{async_request = no_request_pending} = State) ->
    make_async_request(Task, State);
handle_broadcast(Task, #state{aggregate_callback = Aggregator,
                              flush_elems = Acc,
                              flush_extra = Extra} = State) ->
    %% Merge the broadcast task into every accumulated element, keeping the old
    %% value for any key whose aggregation fails.
    Merge = fun(_Key, OldValue) ->
                    case Aggregator(OldValue, Task, Extra) of
                        {ok, FinalValue} ->
                            FinalValue;
                        {error, Reason} ->
                            ?LOG_ERROR(log_fields(State, #{what => aggregation_failed, reason => Reason})),
                            OldValue
                    end
            end,
    State#state{flush_elems = maps:map(Merge, Acc)}.
206 |
|
|
207 |
|
%% Pop the next key in flush order and fire its request. If the request
%% callback drops that task, keep popping until a request is actually made or
%% the queue is empty. The retry budget is reset for the new request.
maybe_request_next(#state{flush_elems = Acc, flush_queue = Queue, total_retries = Total} = State) ->
    %% Reset number of retries
    State1 = State#state{retries_left = Total},
    case queue:out(Queue) of
        {{value, Key}, NewQueue} ->
            {Value, NewAcc} = maps:take(Key, Acc),
            State2 = State1#state{flush_elems = NewAcc, flush_queue = NewQueue},
            State3 = make_async_request(Value, State2),
            case State3 of
                #state{async_request = no_request_pending} ->
                    %% Task dropped by the request callback; try the next one.
                    maybe_request_next(State3);
                _ ->
                    State3
            end;
        {empty, _} ->
            State1#state{async_request = no_request_pending}
    end.
224 |
|
|
225 |
|
%% Ask the request callback to fire the request for this task; record the
%% returned reference (paired with the task) unless the callback dropped it.
make_async_request(Request, #state{host_type = HostType, pool_id = PoolId,
                                   request_callback = Requestor, flush_extra = Extra} = State) ->
    #state{total_retries = Total, retries_left = Left} = State,
    RetryNumber = Total - Left,
    case Requestor(Request, Extra#{retry_number => RetryNumber}) of
        drop ->
            State;
        ReqId ->
            mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, async_request], 1),
            State#state{async_request = {ReqId, Request}}
    end.
235 |
|
|
236 |
|
%% Run the optional verify callback on the reply; without one, accept anything.
maybe_verify_reply(_Reply, _ReqTask, #state{verify_callback = undefined}) ->
    ok;
maybe_verify_reply(Reply, ReqTask, #state{verify_callback = Verifier, flush_extra = Extra}) ->
    Verifier(Reply, ReqTask, Extra).
240 |
|
|
241 |
|
%% Drain the whole queue, issuing a request per iteration until nothing is
%% left pending.
%% NOTE(review): each iteration overwrites async_request with the newest
%% reference without awaiting the previous reply, so replies to earlier
%% requests will reach handle_info as `no_reply' and be ignored — confirm this
%% best-effort behaviour is intended for sync flushes.
run_flush(State) ->
    case maybe_request_next(State) of
        State1 = #state{async_request = no_request_pending} ->
            State1;
        State2 ->
            run_flush(State2)
    end.
248 |
|
|
249 |
|
%% Attach the worker's identity to a structured log message.
log_fields(State, LogMessage) ->
    #state{host_type = HostType, pool_id = PoolId} = State,
    LogMessage#{host_type => HostType, pool_id => PoolId}.