./ct_report/coverage/mongoose_aggregator_worker.COVER.html

1 %% @doc
2 %% This module encapsulates an asynchronous aggregator, that takes tasks classified by key,
3 %% to immediately preprocess and flush, but upon flushing, it submits asynchronous requests
4 %% and continues taking tasks and preprocessing them –aggregating them–, until the first
5 %% request is fulfilled, at which time it puts the next on flush.
6 %%
7 %% `request_callback' is a function that takes the new task and the extra metadata, and
8 %% essentially encapsulates `gen_server:send_request'. The returned reference is what will
9 %% be used in `handle_info' to match pending requests, so it is important that the return
10 %% value of `request_callback' is exactly the one returned by `gen_server:send_request'.
11 %% I have chosen to provide an arbitrary function callback so that it can encapsulate all
12 %% the logic that chooses the gen_server to request to, and possibly calculates other data
13 %% in the meantime.
14 %% @end
15 -module(mongoose_aggregator_worker).
16
17 -behaviour(gen_server).
18
%% Behaviour callbacks. Their arities match the funs this worker receives in its
%% init options (`aggregate_callback' /3, `request_callback' /2 and the optional
%% `verify_callback' /3).
%% NOTE(review): presumably the pool wires these callbacks into those funs — confirm.

%% Combines two tasks into one, or reports why they cannot be combined.
-callback aggregate(mongoose_async_pools:task(),
                    mongoose_async_pools:task(),
                    mongoose_async_pools:pool_extra()) ->
    {ok, mongoose_async_pools:task()} | {error, term()}.

%% Submits a task asynchronously and returns the id of the request.
-callback request(mongoose_async_pools:task(),
                  mongoose_async_pools:pool_extra()) ->
    gen_server:request_id().

%% Optional: inspects a reply together with the task it answers.
-callback verify(term(),
                 mongoose_async_pools:task(),
                 mongoose_async_pools:pool_extra()) ->
    term().

-optional_callbacks([verify/3]).
28
29 -include("mongoose_logger.hrl").
30
31 %% gen_server callbacks
32 -export([init/1,
33 handle_call/3,
34 handle_cast/2,
35 handle_info/2,
36 terminate/2,
37 code_change/3,
38 format_status/2]).
39
%% Either nothing is in flight, or the id of the in-flight request together with
%% the task that was submitted with it.
-type request() :: no_request_pending
                 | {gen_server:request_id(), mongoose_async_pools:task()}.

-record(state, {
          %% Identify the worker in logs and metrics.
          host_type :: mongooseim:host_type(),
          pool_id :: mongoose_async_pools:pool_id(),
          %% The single request currently awaited, if any.
          async_request = no_request_pending :: request(),
          %% Funs supplied through the init options.
          request_callback :: mongoose_async_pools:request_callback(),
          aggregate_callback :: mongoose_async_pools:aggregate_callback(),
          verify_callback :: undefined | mongoose_async_pools:verify_callback(),
          %% Tasks waiting to be requested, by key.
          %% `censored' only appears in the output of format_status/2.
          flush_elems = #{} :: map() | censored,
          %% Keys of flush_elems in the order they were first submitted.
          flush_queue = queue:new() :: queue:queue(),
          %% Extra data handed to every callback invocation.
          flush_extra = #{} :: map()
         }).
-type state() :: #state{}.
53
54 %% gen_server callbacks
%% Builds the initial worker state from the start options.
%% Required keys: `host_type', `pool_id', `request_callback' (fun of arity 2),
%% `aggregate_callback' (fun of arity 3) and `flush_extra' (a map).
%% `verify_callback' is optional and defaults to `undefined'.
%% Options of any other shape do not match this clause.
-spec init(map()) -> {ok, state()}.
init(#{host_type := HostType,
       pool_id := PoolId,
       request_callback := RequestFun,
       aggregate_callback := AggregateFun,
       flush_extra := Extra} = Opts)
  when is_function(RequestFun, 2),
       is_function(AggregateFun, 3),
       is_map(Extra) ->
    ?LOG_DEBUG(#{what => aggregator_worker_start, host_type => HostType, pool_id => PoolId}),
    VerifyFun = maps:get(verify_callback, Opts, undefined),
    InitialState = #state{host_type = HostType,
                          pool_id = PoolId,
                          request_callback = RequestFun,
                          aggregate_callback = AggregateFun,
                          verify_callback = VerifyFun,
                          flush_extra = Extra},
    {ok, InitialState}.
71
%% `sync' requests every task still waiting in the state: replies `skipped' when
%% nothing is waiting and `ok' once all waiting tasks have been requested.
%% Any other call is logged and answered with `unexpected_call'.
-spec handle_call(term(), term(), state()) -> {reply, term(), state()}.
handle_call(sync, _From, #state{flush_elems = Pending} = State) ->
    case map_size(Pending) =:= 0 of
        true ->
            {reply, skipped, State};
        false ->
            {reply, ok, run_flush(State)}
    end;
handle_call(Msg, From, State) ->
    ?UNEXPECTED_CALL(Msg, From),
    {reply, unexpected_call, State}.
81
%% `{task, Key, Value}' submits a new task under `Key'; any other cast is logged
%% and ignored.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast({task, Key, Value}, State) ->
    NewState = handle_task(Key, Value, State),
    {noreply, NewState};
handle_cast(Msg, State) ->
    ?UNEXPECTED_CAST(Msg),
    {noreply, State}.
88
%% Handles the outcome of the request currently in flight.
%%
%% With no request pending every message is unexpected and only logged.
%% Otherwise the message is checked against the pending request id:
%%   * a failure (`{error, _}' from check_response, or an `{error, _}' reply) is
%%     logged and the worker moves on to the next queued task;
%%   * a successful reply is handed to the optional verifier, then the worker
%%     moves on to the next queued task;
%%   * a message that does not belong to the pending request is logged and the
%%     state is left untouched.
-spec handle_info(term(), state()) -> {noreply, state()}.
handle_info(Msg, #state{async_request = no_request_pending} = State) ->
    ?UNEXPECTED_INFO(Msg),
    {noreply, State};
handle_info(Msg, #state{async_request = {AsyncRequest, ReqTask}} = State) ->
    case gen_server:check_response(Msg, AsyncRequest) of
        {error, {Reason, _Ref}} ->
            ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})),
            %% The request is finished and no further message will ever match
            %% its id. Keeping it as pending would stall the worker: every later
            %% task would only be aggregated and never requested.
            {noreply, maybe_request_next(State)};
        {reply, {error, Reason}} ->
            ?LOG_ERROR(log_fields(State, #{what => asynchronous_request_failed, reason => Reason})),
            %% Same as above: the reply consumed the request, so release the slot.
            {noreply, maybe_request_next(State)};
        {reply, Reply} ->
            maybe_verify_reply(Reply, ReqTask, State),
            {noreply, maybe_request_next(State)};
        no_reply ->
            ?UNEXPECTED_INFO(Msg),
            {noreply, State}
    end.
108
%% Logs the shutdown and, when tasks are still waiting in the state, logs a
%% warning and requests all of them before the process goes away.
-spec terminate(term(), state()) -> term().
terminate(Reason, State) ->
    ?LOG_INFO(log_fields(State, #{what => aggregate_worker_stopping, reason => Reason})),
    case map_size(State#state.flush_elems) =:= 0 of
        true ->
            ok;
        false ->
            ?LOG_WARNING(log_fields(State, #{what => aggregate_worker_terminate_requires_flush})),
            run_flush(State)
    end.
118
%% No state migration is performed: the current state is returned as is.
-spec code_change(term(), state(), term()) -> {ok, state()}.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
122
% Don't leak the tasks to logs, they can contain private information.
% Tasks live in two places in the state: the ones waiting in `flush_elems' and
% the one travelling with the in-flight request in `async_request'. Both are
% replaced by the atom `censored'; the request id is kept since it carries no
% task data.
% NOTE(review): assumes `censored' is acceptable where a task() is expected in
% the `request()' type — confirm against mongoose_async_pools:task().
format_status(_Opt, [_PDict, State | _]) ->
    Censored = State#state{flush_elems = censored,
                           async_request = censor_request(State#state.async_request)},
    [{data, [{"State", Censored}]}].

% Hides the task of a pending request, leaving the "nothing pending" marker as is.
censor_request(no_request_pending) ->
    no_request_pending;
censor_request({RequestId, _Task}) ->
    {RequestId, censored}.
126
% With no request in flight the task is requested straight away: there is nothing
% to aggregate it with.
handle_task(_Key, Task, #state{async_request = no_request_pending} = State) ->
    State#state{async_request = make_async_request(Task, State)};
% With a request in flight the task has to wait: it is merged into the task
% already waiting under the same key, or stored as a new entry.
handle_task(Key, Task, #state{flush_elems = Pending} = State) ->
    case maps:find(Key, Pending) of
        {ok, Waiting} ->
            merge_task(Key, Waiting, Task, State);
        error ->
            enqueue_task(Key, Task, State)
    end.

% Replaces the waiting task with the aggregation of it and the new one. When the
% aggregation fails the error is logged and the state is left unchanged.
merge_task(Key, Waiting, Task, #state{aggregate_callback = Aggregate,
                                      flush_elems = Pending,
                                      flush_extra = Extra} = State) ->
    case Aggregate(Waiting, Task, Extra) of
        {ok, Merged} ->
            State#state{flush_elems = maps:update(Key, Merged, Pending)};
        {error, Reason} ->
            ?LOG_ERROR(log_fields(State, #{what => aggregation_failed, reason => Reason})),
            State
    end.

% Stores a task under a key seen for the first time. The key also goes into the
% queue, which fixes the order in which waiting tasks are requested: first in,
% first out.
enqueue_task(Key, Task, #state{flush_elems = Pending, flush_queue = Order} = State) ->
    State#state{flush_elems = maps:put(Key, Task, Pending),
                flush_queue = queue:in(Key, Order)}.
150
% Requests the oldest waiting task, if any. When nothing is waiting the worker is
% marked as having no request pending.
maybe_request_next(#state{flush_elems = Pending, flush_queue = Order} = State) ->
    case queue:out(Order) of
        {empty, _} ->
            State#state{async_request = no_request_pending};
        {{value, Key}, RestOrder} ->
            % Every queued key is expected to have an entry in the map.
            {Task, RestPending} = maps:take(Key, Pending),
            Request = make_async_request(Task, State),
            State#state{async_request = Request,
                        flush_elems = RestPending,
                        flush_queue = RestOrder}
    end.
160
% Bumps the `async_request' metric of this pool, submits the task through the
% request callback and returns the resulting request id paired with the task.
make_async_request(Task, #state{host_type = HostType,
                                pool_id = PoolId,
                                request_callback = Request,
                                flush_extra = Extra}) ->
    mongoose_metrics:update(HostType, [mongoose_async_pools, PoolId, async_request], 1),
    RequestId = Request(Task, Extra),
    {RequestId, Task}.
165
% Hands the reply and the task it answers to the verifier, when one is
% configured; returns `ok' otherwise.
maybe_verify_reply(Reply, ReqTask, #state{verify_callback = Verify, flush_extra = Extra}) ->
    case Verify of
        undefined ->
            ok;
        _ ->
            Verify(Reply, ReqTask, Extra)
    end.
170
% Requests every waiting task, one after another and without awaiting replies,
% until maybe_request_next/1 reports that nothing is left to request.
run_flush(State) ->
    Next = maybe_request_next(State),
    case Next#state.async_request of
        no_request_pending ->
            Next;
        _StillRequesting ->
            run_flush(Next)
    end.
178
% Adds the worker's host type and pool id to a log report; these two keys
% override any value the report already carries for them.
log_fields(State, LogMessage) ->
    Identity = #{host_type => State#state.host_type,
                 pool_id => State#state.pool_id},
    maps:merge(LogMessage, Identity).
Line Hits Source