%% @doc
%% This module encapsulates an asynchronous aggregator that takes tasks classified by key.
%% The first task is flushed immediately by submitting an asynchronous request; while that
%% request is in flight, the worker keeps taking tasks and preprocessing them (that is,
%% aggregating them by key) until the request is fulfilled, at which point it flushes the
%% next pending task.
%%
%% `request_callback' is a function that takes the new task and the extra metadata, and
%% essentially encapsulates `gen_server:send_request'. The returned reference is what will
%% be used in `handle_info' to match pending requests, so it is important that the return
%% value of `request_callback' is the same as that of `gen_server:send_request'.
%% I have chosen to provide an arbitrary function callback so that it can encapsulate all
%% the logic that chooses the gen_server to send the request to, and possibly calculates
%% other data in the meantime.
%% @end
15 |
|
-module(mongoose_aggregator_worker). |
16 |
|
|
17 |
|
-behaviour(gen_server). |
18 |
|
|
%% Behaviour contract offered by this module. `verify/3' is optional.
%% NOTE(review): the worker itself only calls the funs handed to init/1
%% (`request_callback', `aggregate_callback', `verify_callback'); presumably
%% those are built from these callbacks by the pool owner -- confirm.

%% Combines two tasks (plus the pool's extra metadata) into a single task,
%% or reports why they cannot be combined.
-callback aggregate(mongoose_async_pools:task(),
                    mongoose_async_pools:task(),
                    mongoose_async_pools:pool_extra()) ->
    {ok, mongoose_async_pools:task()} | {error, term()}.

%% Submits a task and returns the reference identifying the request.
-callback request(mongoose_async_pools:task(),
                  mongoose_async_pools:pool_extra()) ->
    reference().

%% Inspects a reply together with the task it answers; the result is free-form.
-callback verify(term(),
                 mongoose_async_pools:task(),
                 mongoose_async_pools:pool_extra()) ->
    term().

-optional_callbacks([verify/3]).
28 |
|
|
29 |
|
-include("mongoose_logger.hrl"). |
30 |
|
|
31 |
|
%% gen_server callbacks |
32 |
|
-export([init/1, |
33 |
|
handle_call/3, |
34 |
|
handle_cast/2, |
35 |
|
handle_info/2, |
36 |
|
terminate/2, |
37 |
|
code_change/3, |
38 |
|
format_status/2]). |
39 |
|
|
%% Either nothing is in flight, or the request id paired with the task it carries.
-type request() :: no_request_pending
                 | {reference(), mongoose_async_pools:task()}.

-record(state, {
          %% Identify the pool this worker belongs to; used for logs and metrics.
          host_type :: mongooseim:host_type(),
          pool_id :: mongoose_async_pools:pool_id(),
          %% The single request currently awaiting a reply, if any.
          async_request = no_request_pending :: request(),
          %% Funs supplied through init/1.
          request_callback :: mongoose_async_pools:request_callback(),
          aggregate_callback :: mongoose_async_pools:aggregate_callback(),
          verify_callback :: undefined | mongoose_async_pools:verify_callback(),
          %% Tasks waiting to be flushed, indexed by key.
          %% `censored' only ever appears in the copy built by format_status/2.
          flush_elems = #{} :: map() | censored,
          %% Keys of flush_elems in arrival order, so older keys are flushed first.
          flush_queue = queue:new() :: queue:queue(),
          %% Opaque metadata handed to every callback.
          flush_extra = #{} :: map()
         }).

-type state() :: #state{}.
53 |
|
|
54 |
|
%% gen_server callbacks |
%% Builds the initial worker state from the pool options.
%% `host_type', `pool_id', `request_callback' (arity 2), `aggregate_callback'
%% (arity 3) and `flush_extra' (a map) are mandatory; `verify_callback' is
%% optional and defaults to `undefined'.
-spec init(map()) -> {ok, state()}.
init(#{host_type := HostType,
       pool_id := PoolId,
       request_callback := RequestFun,
       aggregate_callback := AggregateFun,
       flush_extra := Extra} = PoolOpts)
  when is_function(RequestFun, 2), is_function(AggregateFun, 3), is_map(Extra) ->
    ?LOG_DEBUG(#{what => aggregator_worker_start, host_type => HostType, pool_id => PoolId}),
    VerifyFun = maps:get(verify_callback, PoolOpts, undefined),
    InitialState = #state{host_type = HostType,
                          pool_id = PoolId,
                          request_callback = RequestFun,
                          aggregate_callback = AggregateFun,
                          verify_callback = VerifyFun,
                          flush_extra = Extra},
    {ok, InitialState}.
71 |
|
|
%% `sync' forces every pending task out: it replies `skipped' when nothing is
%% waiting, otherwise `ok' once run_flush/1 has issued a request for each one.
%% Any other call is logged and answered with `unexpected_call'.
-spec handle_call(term(), term(), state()) -> {reply, term(), state()}.
handle_call(sync, _From, #state{flush_elems = Pending} = State) ->
    case maps:size(Pending) =:= 0 of
        true ->
            {reply, skipped, State};
        false ->
            Flushed = run_flush(State),
            {reply, ok, Flushed}
    end;
handle_call(Request, Caller, State) ->
    ?UNEXPECTED_CALL(Request, Caller),
    {reply, unexpected_call, State}.
81 |
|
|
%% `{task, Key, Value}' feeds a new task into the worker; anything else is
%% logged and ignored.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast({task, TaskKey, Task}, State) ->
    NewState = handle_task(TaskKey, Task, State),
    {noreply, NewState};
handle_cast(Other, State) ->
    ?UNEXPECTED_CAST(Other),
    {noreply, State}.
88 |
|
|
%% Handles raw messages, which are expected to be replies to the in-flight request.
%%
%% With no request in flight, any message is unexpected and is only logged.
%% Otherwise `gen_server:check_response/2' classifies the message against the
%% pending request id:
%%   - `{error, {Reason, _}}': the request itself failed;
%%   - `{reply, {error, Reason}}': the server answered with an error;
%%   - `{reply, Reply}': success, optionally handed to the verify callback;
%%   - `no_reply': the message is not an answer to the pending request.
%%
%% In the first three cases the pending request is finished, so the next queued
%% task (if any) is requested. A failed request must not be kept as pending:
%% no further answer will arrive for it, so the worker would keep aggregating
%% incoming tasks without ever flushing them. The failed task is logged and
%% dropped, not retried.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(Msg, #state{async_request = no_request_pending} = State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State};
handle_info(Msg, #state{async_request = {AsyncRequest, ReqTask}} = State) ->
    case gen_server:check_response(Msg, AsyncRequest) of
        {error, {Reason, _Ref}} ->
            ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})),
            {noreply, maybe_request_next(State)};
        {reply, {error, Reason}} ->
            ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})),
            {noreply, maybe_request_next(State)};
        {reply, Reply} ->
            maybe_verify_reply(Reply, ReqTask, State),
            {noreply, maybe_request_next(State)};
        no_reply ->
            %% Unrelated message: keep waiting for the real answer.
            ?UNEXPECTED_INFO(Msg),
            {noreply, State}
    end.
108 |
|
|
%% Logs the shutdown and, if tasks are still waiting, warns and issues a
%% request for each of them through run_flush/1 before the process goes away.
-spec terminate(term(), state()) -> term().
terminate(Reason, State) ->
    ?LOG_INFO(log_fields(State, #{what => aggregate_worker_stopping, reason => Reason})),
    Pending = State#state.flush_elems,
    case maps:size(Pending) =:= 0 of
        true ->
            ok;
        false ->
            ?LOG_WARNING(log_fields(State, #{what => aggregate_worker_terminate_requires_flush})),
            run_flush(State)
    end.
118 |
|
|
%% No state migration is needed between versions: the state is kept as is.
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_FromVsn, CurrentState, _UpgradeExtra) ->
    {ok, CurrentState}.
122 |
|
|
%% Pending tasks may carry private data, so they are replaced by the atom
%% `censored' in the state reported by this callback.
format_status(_Opt, [_PDict, State | _]) ->
    Redacted = State#state{flush_elems = censored},
    [{data, [{"State", Redacted}]}].
126 |
|
|
%% Takes in one task.
%%
%% With nothing in flight the task is requested right away: there is no other
%% task to aggregate it with.
handle_task(_Key, Task, #state{async_request = no_request_pending} = State) ->
    Request = make_async_request(Task, State),
    State#state{async_request = Request};
%% While a request is in flight, the task is parked in `flush_elems'. A task
%% whose key is already parked is merged with the parked one through the
%% aggregate callback; if that fails, the error is logged and the parked task
%% is left untouched (the incoming one is discarded).
handle_task(Key, Incoming, #state{flush_elems = Pending} = State) ->
    case maps:find(Key, Pending) of
        {ok, Parked} ->
            Aggregate = State#state.aggregate_callback,
            case Aggregate(Parked, Incoming, State#state.flush_extra) of
                {ok, Merged} ->
                    State#state{flush_elems = maps:update(Key, Merged, Pending)};
                {error, Reason} ->
                    ?LOG_ERROR(log_fields(State, #{what => aggregation_failed, reason => Reason})),
                    State
            end;
        error ->
            %% First task for this key: remember the key's arrival position
            %% so that keys are flushed in the order they first showed up.
            State#state{flush_elems = maps:put(Key, Incoming, Pending),
                        flush_queue = queue:in(Key, State#state.flush_queue)}
    end.
150 |
|
|
%% Picks the oldest parked key, if any, and issues a request for its task.
%% With nothing parked, the worker is marked as having no request pending.
maybe_request_next(#state{flush_elems = _, flush_queue = Keys} = State) ->
    request_dequeued(queue:out(Keys), State).

%% Acts on the result of queue:out/1 for maybe_request_next/1.
request_dequeued({empty, _}, State) ->
    State#state{async_request = no_request_pending};
request_dequeued({{value, Key}, RemainingKeys}, #state{flush_elems = Pending} = State) ->
    %% Every queued key is expected to be present in the map; crash otherwise.
    {Task, RemainingTasks} = maps:take(Key, Pending),
    Request = make_async_request(Task, State),
    State#state{async_request = Request,
                flush_elems = RemainingTasks,
                flush_queue = RemainingKeys}.
160 |
|
|
%% Bumps the pool's `async_request' metric, then submits the task through the
%% request callback. Returns the callback's result paired with the task.
make_async_request(Task, #state{host_type = HostType, pool_id = PoolId,
                                request_callback = RequestFun, flush_extra = Extra}) ->
    Metric = [mongoose_async_pools, PoolId, async_request],
    mongoose_metrics:update(HostType, Metric, 1),
    RequestId = RequestFun(Task, Extra),
    {RequestId, Task}.
165 |
|
|
%% Runs the verify callback on a reply when one was configured; otherwise `ok'.
maybe_verify_reply(Reply, ReqTask, #state{verify_callback = VerifyFun, flush_extra = Extra}) ->
    case VerifyFun of
        undefined ->
            ok;
        _ ->
            VerifyFun(Reply, ReqTask, Extra)
    end.
170 |
|
|
%% Issues a request for every parked task, one after the other, until the
%% queue is empty; the returned state has no request pending.
%% The requests are not awaited here, so their replies will reach
%% handle_info/2 while no request is recorded as pending.
run_flush(State) ->
    Next = maybe_request_next(State),
    case Next#state.async_request of
        no_request_pending ->
            Next;
        _InFlight ->
            run_flush(Next)
    end.
178 |
|
|
%% Adds the worker's `host_type' and `pool_id' to a log report map,
%% overriding those keys if the report already had them.
log_fields(State, LogMessage) ->
    Context = #{host_type => State#state.host_type,
                pool_id => State#state.pool_id},
    maps:merge(LogMessage, Context).