./ct_report/coverage/mongoose_aggregator_worker.COVER.html

1 %% @doc
2 %% This module encapsulates an asynchronous aggregator, that takes tasks classified by key,
3 %% to immediately preprocess and flush, but upon flushing, it submits asynchronous requests
4 %% and continues taking tasks and preprocessing them –aggregating them–, until the first
5 %% request is fulfilled, at which time it puts the next on flush.
6 %%
7 %% `request_callback' is a function that takes the new task and the extra metadata, and
8 %% essentially encapsulates `gen_server:send_request'. The returned reference is what will
9 %% be used in `handle_info' to match pending requests, so it is important that the return
10 %% value of `request_callback' is the same as that of `gen_server:send_request'.
11 %% I have chosen to provide an arbitrary function callback so that it can encapsulate all
12 %% the logic that chooses the gen_server to request to, and possibly calculates other data
13 %% in the meantime.
14 %% @end
15 -module(mongoose_aggregator_worker).
16
17 -behaviour(gen_server).
18
%% Identifier of an in-flight asynchronous request.
%% When the `gen_server_request_id' macro is defined the type is
%% `gen_server:request_id()'; otherwise it falls back to `term()'.
19 -ifdef(gen_server_request_id).
20 -type request_id() :: gen_server:request_id().
21 -else.
22 -type request_id() :: term().
23 -endif.
24
%% Behaviour callbacks.
%%
%% `aggregate/3' receives two tasks plus the pool extra data and returns either
%% `{ok, Task}' with the combined task or `{error, Reason}'.
25 -callback aggregate(mongoose_async_pools:task(),
26 mongoose_async_pools:task(),
27 mongoose_async_pools:pool_extra()) ->
28 {ok, mongoose_async_pools:task()} | {error, term()}.
%% `request/2' receives a task plus the pool extra data and returns either the
%% id of the request it issued or the atom `drop'.
29 -callback request(mongoose_async_pools:task(), mongoose_async_pools:pool_extra()) ->
30 request_id() | drop.
%% `verify/3' receives a term, a task and the pool extra data.
%% NOTE(review): the first argument is presumably the reply to the request, and
%% the worker only handles `ok' and `{error, Reason}' results from its verify
%% function - confirm against mongoose_async_pools.
31 -callback verify(term(), mongoose_async_pools:task(), mongoose_async_pools:pool_extra()) ->
32 term().
%% `verify/3' is optional.
33 -optional_callbacks([verify/3]).
34
35 -include("mongoose_logger.hrl").
36
37 %% gen_server callbacks
38 -export([init/1,
39 handle_call/3,
40 handle_cast/2,
41 handle_info/2,
42 terminate/2,
43 code_change/3,
44 format_status/2]).
45
%% The request currently in flight: either none, or the request id paired with
%% the task that request was made for.
46 -type request() :: no_request_pending | {request_id(), mongoose_async_pools:task()}.
%% Worker state.
47 -record(state, {
%% host_type and pool_id are added to log messages and used in the metric name.
48 host_type :: mongooseim:host_type(),
49 pool_id :: mongoose_async_pools:pool_id(),
%% Request awaiting a reply, if any.
50 async_request = no_request_pending :: request(),
%% Fun of arity 2 called with a task and flush_extra (plus `retry_number').
51 request_callback :: mongoose_async_pools:request_callback(),
%% Fun of arity 3 called with the stored task, the new task and flush_extra.
52 aggregate_callback :: mongoose_async_pools:aggregate_callback(),
%% Optional fun called with the reply, the requested task and flush_extra.
53 verify_callback :: undefined | mongoose_async_pools:verify_callback(),
%% Tasks waiting to be requested, indexed by their key.
54 flush_elems = #{} :: map() | censored, % see format_status/2 for censored
%% Keys of flush_elems in the order in which they must be requested.
55 flush_queue = queue:new() :: queue:queue(),
%% Extra data handed to the callbacks.
56 flush_extra = #{} :: map(),
%% total_retries is the value retries_left is reset to before the next task is
%% requested; retries_left is decremented every time a request is retried.
57 total_retries = 3 :: non_neg_integer(),
58 retries_left = 3 :: non_neg_integer()
59 }).
60 -type state() :: #state{}.
61
62 %% gen_server callbacks
%% Builds the initial worker state from the options map.
%% `host_type', `pool_id', `request_callback' (arity 2), `aggregate_callback'
%% (arity 3) and `flush_extra' (a map) are mandatory; `verify_callback' is
%% optional and defaults to `undefined'.
-spec init(map()) -> {ok, state()}.
init(#{host_type := HostType,
       pool_id := PoolId,
       request_callback := RequestFun,
       aggregate_callback := AggregateFun,
       flush_extra := Extra} = PoolOpts)
  when is_function(RequestFun, 2),
       is_function(AggregateFun, 3),
       is_map(Extra) ->
    ?LOG_DEBUG(#{what => aggregator_worker_start, host_type => HostType, pool_id => PoolId}),
    VerifyFun = maps:get(verify_callback, PoolOpts, undefined),
    InitialState = #state{host_type = HostType,
                          pool_id = PoolId,
                          request_callback = RequestFun,
                          aggregate_callback = AggregateFun,
                          verify_callback = VerifyFun,
                          flush_extra = Extra},
    {ok, InitialState}.
79
%% Synchronous requests.
%% `sync' replies `skipped' when no task is waiting to be flushed; otherwise it
%% flushes everything that is waiting and replies `ok'.
%% Any other call is logged and answered with `unexpected_call'.
-spec handle_call(term(), term(), state()) -> {reply, term(), state()}.
handle_call(sync, _From, #state{flush_elems = Waiting} = State) ->
    case maps:size(Waiting) =:= 0 of
        true ->
            {reply, skipped, State};
        false ->
            Flushed = run_flush(State),
            {reply, ok, Flushed}
    end;
handle_call(Request, Caller, State) ->
    ?UNEXPECTED_CALL(Request, Caller),
    {reply, unexpected_call, State}.
89
%% Asynchronous requests: `{task, Key, Value}' submits a keyed task and
%% `{broadcast, Task}' submits a broadcast task. Anything else is logged and ignored.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast({task, TaskKey, Task}, State) ->
    NewState = handle_task(TaskKey, Task, State),
    {noreply, NewState};
handle_cast({broadcast, Task}, State) ->
    NewState = handle_broadcast(Task, State),
    {noreply, NewState};
handle_cast(Unknown, State) ->
    ?UNEXPECTED_CAST(Unknown),
    {noreply, State}.
98
%% Plain messages. With no request in flight every message is unexpected and
%% only logged. Otherwise the message is checked against the pending request
%% and the worker ignores it, moves on to the next task, or retries the
%% same task.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(Info, #state{async_request = no_request_pending} = State) ->
    ?UNEXPECTED_INFO(Info),
    {noreply, State};
handle_info(Info, #state{async_request = {ReqId, Task}} = State) ->
    NewState =
        case check_response(Info, ReqId, Task, State) of
            ignore -> State;
            next -> maybe_request_next(State);
            retry -> maybe_request_retry(Task, State)
        end,
    {noreply, NewState}.
112
%% Re-issues the request for a task whose previous attempt failed.
%% With no retries left the task is logged as dropped and the worker moves on
%% to the next queued task. Otherwise one retry is consumed and the request is
%% issued again; if the request callback drops it, the worker also moves on.
maybe_request_retry(Task, State = #state{retries_left = 0}) ->
    ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_dropped, txt => <<"Async request dropped, no more retries">>, task => Task})),
    cancel_request_retry(State);
maybe_request_retry(Task, State = #state{retries_left = Remaining}) ->
    Cleared = State#state{async_request = no_request_pending,
                          retries_left = Remaining - 1},
    Retried = make_async_request(Task, Cleared),
    case Retried#state.async_request of
        no_request_pending ->
            cancel_request_retry(Retried);
        _Pending ->
            Retried
    end.
123
%% Gives up on the current task: forgets the pending request and proceeds with
%% the next queued task, if there is one.
cancel_request_retry(State) ->
    Cleared = State#state{async_request = no_request_pending},
    maybe_request_next(Cleared).
126
%% Classifies an incoming message with respect to the pending request.
%% Returns:
%%   `ignore' - the message is not a reply to the pending request (it is logged),
%%   `retry'  - the request failed, replied `{error, Reason}', or the reply was
%%              rejected by the verify callback,
%%   `next'   - the reply was accepted.
check_response(Msg, ReqId, Task, State) ->
    Outcome = gen_server:check_response(Msg, ReqId),
    case Outcome of
        no_reply ->
            ?UNEXPECTED_INFO(Msg),
            ignore;
        {error, {Reason, _ServerRef}} ->
            ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})),
            retry;
        {reply, {error, Reason}} ->
            ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})),
            retry;
        {reply, Reply} ->
            Verdict = maybe_verify_reply(Reply, Task, State),
            case Verdict of
                ok -> next;
                {error, _Why} -> retry
            end
    end.
146
%% Logs that the worker is stopping and, if tasks are still waiting to be
%% flushed, logs a warning and flushes them before returning.
-spec terminate(term(), state()) -> term().
terminate(Reason, State) ->
    ?LOG_INFO(log_fields(State, #{what => aggregate_worker_stopping, reason => Reason})),
    Waiting = State#state.flush_elems,
    case maps:size(Waiting) > 0 of
        false ->
            ok;
        true ->
            ?LOG_WARNING(log_fields(State, #{what => aggregate_worker_terminate_requires_flush})),
            run_flush(State)
    end.
156
%% Code upgrades need no state conversion: the state is returned untouched.
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_FromVersion, CurrentState, _UpgradeData) ->
    {ok, CurrentState}.
160
% Tasks may carry private information, so the waiting tasks are replaced by the
% atom `censored' before the state is handed out for status reports.
format_status(_Opt, [_PDict, State | _]) ->
    Censored = State#state{flush_elems = censored},
    [{data, [{"State", Censored}]}].
164
% Handles a keyed task.
% With no request in flight the task is requested right away, no aggregation needed.
% Otherwise the task is merged with the one already waiting under the same key,
% or stored as a new entry if the key is not waiting yet. A failed aggregation
% is logged and the waiting task is kept unchanged.
handle_task(_, Value, #state{async_request = no_request_pending} = State) ->
    make_async_request(Value, State);
handle_task(Key, NewValue, #state{aggregate_callback = Aggregator,
                                  flush_elems = Waiting,
                                  flush_queue = Order,
                                  flush_extra = Extra} = State) ->
    case maps:find(Key, Waiting) of
        {ok, OldValue} ->
            case Aggregator(OldValue, NewValue, Extra) of
                {error, Reason} ->
                    ?LOG_ERROR(log_fields(State, #{what => aggregation_failed, reason => Reason})),
                    State;
                {ok, Merged} ->
                    State#state{flush_elems = maps:update(Key, Merged, Waiting)}
            end;
        error ->
            % The queue records the order in which keys arrived, so that tasks
            % submitted first are also requested first.
            State#state{flush_elems = maps:put(Key, NewValue, Waiting),
                        flush_queue = queue:in(Key, Order)}
    end.
188
% Handles a broadcast task.
% With no request in flight the task is requested right away.
% Otherwise it is aggregated into every task currently waiting to be flushed;
% entries whose aggregation fails are logged and kept unchanged.
handle_broadcast(Task, #state{async_request = no_request_pending} = State) ->
    make_async_request(Task, State);
handle_broadcast(Task, #state{aggregate_callback = Aggregator,
                              flush_elems = Waiting,
                              flush_extra = Extra} = State) ->
    MergeIntoWaiting =
        fun(_Key, Current) ->
                Result = Aggregator(Current, Task, Extra),
                case Result of
                    {error, Reason} ->
                        ?LOG_ERROR(log_fields(State, #{what => aggregation_failed, reason => Reason})),
                        Current;
                    {ok, Merged} ->
                        Merged
                end
        end,
    Updated = maps:map(MergeIntoWaiting, Waiting),
    State#state{flush_elems = Updated}.
206
%% Requests the next queued task, if any.
%%
%% The retry counter is reset and the previous request is forgotten first.
%% Clearing `async_request' here matters: this function is also entered while
%% the just-finished request is still recorded in the state. Without clearing
%% it, a task dropped by the request callback would leave that stale request in
%% place, and the worker would wait forever for a reply that never comes while
%% new tasks pile up unflushed.
%%
%% Returns the state with the new request pending, or with
%% `no_request_pending' once the queue is exhausted. Tasks dropped by the
%% request callback are skipped.
maybe_request_next(#state{flush_elems = Acc, flush_queue = Queue, total_retries = Total} = State) ->
    State1 = State#state{retries_left = Total, async_request = no_request_pending},
    case queue:out(Queue) of
        {{value, Key}, NewQueue} ->
            %% Every queued key is expected to be present in the map
            {Value, NewAcc} = maps:take(Key, Acc),
            State2 = State1#state{flush_elems = NewAcc, flush_queue = NewQueue},
            case make_async_request(Value, State2) of
                #state{async_request = no_request_pending} = Dropped ->
                    %% The callback dropped this task: try the following one
                    maybe_request_next(Dropped);
                Requested ->
                    Requested
            end;
        {empty, _} ->
            State1
    end.
224
%% Hands a task to the request callback, together with the extra data extended
%% with `retry_number' (how many retries have already been spent).
%% If the callback answers `drop' the state is returned unchanged; otherwise
%% the async_request metric is bumped and the returned request id is stored
%% alongside the task as the pending request.
make_async_request(Request, #state{host_type = HostType, pool_id = PoolId,
                                   request_callback = Requestor, flush_extra = Extra} = State) ->
    #state{total_retries = Total, retries_left = Left} = State,
    RequestExtra = Extra#{retry_number => Total - Left},
    case Requestor(Request, RequestExtra) of
        drop ->
            State;
        ReqId ->
            mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, async_request], 1),
            State#state{async_request = {ReqId, Request}}
    end.
235
%% Runs the verify callback on a reply when one is configured.
%% Without a verify callback every reply is accepted with `ok'.
maybe_verify_reply(Reply, ReqTask, #state{verify_callback = Verifier, flush_extra = Extra}) ->
    case Verifier of
        undefined ->
            ok;
        _ ->
            Verifier(Reply, ReqTask, Extra)
    end.
240
%% Keeps requesting queued tasks, without waiting for replies, until no
%% request is left pending, and returns the resulting state.
run_flush(State) ->
    Next = maybe_request_next(State),
    case Next#state.async_request of
        no_request_pending ->
            Next;
        _Pending ->
            run_flush(Next)
    end.
248
%% Adds the worker's host type and pool id to a log message map.
log_fields(State, LogMessage) ->
    HostType = State#state.host_type,
    PoolId = State#state.pool_id,
    LogMessage#{host_type => HostType, pool_id => PoolId}.
Line Hits Source