1 |
|
%%%------------------------------------------------------------------- |
2 |
|
%%% @author Uvarov Michael <arcusfelis@gmail.com> |
3 |
|
%%% @copyright (C) 2013, Uvarov Michael |
4 |
|
%%% @doc Shared shapers. |
5 |
|
%%% @end |
6 |
|
%%%------------------------------------------------------------------- |
7 |
|
-module(shaper_srv). |
8 |
|
-behaviour(gen_server). |
9 |
|
|
10 |
|
%% API Function Exports |
11 |
|
-export([start_link/1, wait/5, reset_all_shapers/1]). |
12 |
|
-ignore_xref([start_link/1, reset_all_shapers/1]). |
13 |
|
|
14 |
|
%% Record definitions |
15 |
|
-record(state, {
    %% Upper bound, in milliseconds, on the delay imposed on a caller;
    %% a computed delay at or above this value is rejected.
    max_delay :: non_neg_integer(),
    %% Number of seconds an unused shaper is kept before cleanup
    ttl :: non_neg_integer(),
    %% Shaper stored per key
    shapers :: map(),
    %% Last access time per key, in native time units
    a_times :: map()
}).
-type state() :: #state{}.
24 |
|
|
25 |
|
%% gen_server Function Exports |
26 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). |
27 |
|
|
28 |
|
%% API Function Definitions |
29 |
|
%% Starts a shaper server registered locally as ProcName.
%% init/1 is given an empty option list, so its built-in defaults apply.
-spec start_link(atom()) -> ignore | {error, _} | {ok, pid()}.
start_link(ProcName) ->
    ServerName = {local, ProcName},
    gen_server:start_link(ServerName, ?MODULE, [], []).
32 |
|
|
33 |
|
%% @doc Throttles the caller before it executes the action.
%% The request is sent to the worker selected for `FromJID'; the call
%% returns `ok' once the worker replies (possibly after a delay), or
%% `{error, max_delay_reached}' when the required delay is too long.
-spec wait(HostType :: mongooseim:host_type(),
           Domain :: jid:server(),
           Action :: atom(),
           FromJID :: jid:jid() | global,
           Size :: integer()) -> ok | {error, max_delay_reached}.
wait(HostType, Domain, Action, FromJID, Size) ->
    Request = {wait, HostType, Domain, Action, FromJID, Size},
    gen_server:call(mongoose_shaper_sup:select_worker(FromJID), Request).
42 |
|
|
43 |
|
%% @doc Tell every shaper worker to drop the shapers it currently holds.
%% Shapers are created again on demand by later `wait/5' calls.
%% The host type argument is not used.
reset_all_shapers(_HostType) ->
    lists:map(fun reset_shapers/1, mongoose_shaper_sup:get_workers()).
46 |
|
|
47 |
|
%% @doc Synchronously ask a single shaper server to drop its shapers.
reset_shapers(ProcName) ->
    Request = reset_shapers,
    gen_server:call(ProcName, Request).
50 |
|
|
51 |
|
%% gen_server Function Definitions |
52 |
|
%% Builds the initial state from the option proplist and schedules the
%% periodic cleanup message.
%% Options (with defaults): max_delay (3000 ms), ttl (120 s),
%% gc_interval (30 s).
init(Args) ->
    MaxDelay = proplists:get_value(max_delay, Args, 3000),
    TTL = proplists:get_value(ttl, Args, 120),
    GCInterval = proplists:get_value(gc_interval, Args, 30),
    %% A delete_old_shapers message arrives every GCInterval seconds
    timer:send_interval(timer:seconds(GCInterval), delete_old_shapers),
    {ok, #state{max_delay = MaxDelay,
                ttl = TTL,
                shapers = #{},
                a_times = #{}}}.
61 |
|
|
62 |
|
%% Serves synchronous requests.
%%
%% {wait, HostType, Domain, Action, FromJID, Size}:
%%   looks up (or creates) the shaper for the key, records the access time
%%   and updates the shaper with Size. Depending on the resulting delay:
%%     * 0                      - replies ok immediately;
%%     * less than max_delay    - replies ok after the delay, without
%%                                blocking the server;
%%     * anything else          - replies {error, max_delay_reached} and
%%                                discards the shaper update.
%%
%% reset_shapers:
%%   forgets all shapers and access times.
handle_call({wait, HostType, Domain, Action, FromJID, Size},
            From, State = #state{max_delay = MaxDelayMs}) ->
    Key = new_key(Domain, Action, FromJID),
    Shaper = find_or_create_shaper(HostType, Key, State),
    State1 = update_access_time(Key, erlang:system_time(), State),
    case shaper:update(Shaper, Size) of
        {UpdatedShaper, 0} ->
            {reply, ok, save_shaper(Key, UpdatedShaper, State1)};
        {UpdatedShaper, DelayMs} when DelayMs < MaxDelayMs ->
            reply_after(DelayMs, From, ok),
            {noreply, save_shaper(Key, UpdatedShaper, State1)};
        {_, _} ->
            %% The update is discarded, but the pre-update shaper is stored
            %% so that every key with an access time also has a shaper.
            %% For an already known key this is a no-op; for a new key it
            %% avoids leaving an access time without a matching shaper.
            {reply, {error, max_delay_reached}, save_shaper(Key, Shaper, State1)}
    end;
handle_call(reset_shapers, _From, State = #state{}) ->
    {reply, ok, init_dicts(State)}.
78 |
|
|
79 |
|
%% No asynchronous requests are supported; any cast is ignored.
handle_cast(_Request, State) ->
    {noreply, State}.
81 |
|
|
82 |
|
%% Handles the periodic delete_old_shapers message by pruning stale shapers;
%% every other message is ignored.
handle_info(delete_old_shapers, State) ->
    Pruned = delete_old_shapers(State),
    {noreply, Pruned};
handle_info(_Other, State) ->
    {noreply, State}.
86 |
|
|
87 |
|
%% Nothing to clean up on shutdown.
terminate(_Why, _FinalState) ->
    ok.
89 |
|
|
90 |
|
%% The state needs no conversion on code upgrade; it is returned unchanged.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
92 |
|
|
93 |
|
%% ------------------------------------------------------------------ |
94 |
|
%% Internal Function Definitions |
95 |
|
%% ------------------------------------------------------------------ |
96 |
|
|
97 |
|
%% A key identifies one shaper: the domain, the action and the sender.
-type key() :: {global | jid:server(), atom(), jid:jid()}.

%% Packs the three identifying values into a key tuple.
-spec new_key(jid:server() | global, atom(), jid:jid()) -> key().
new_key(Domain, Action, FromJID) ->
    Key = {Domain, Action, FromJID},
    Key.
101 |
|
|
102 |
|
%% Returns the shaper stored under Key, or a newly created one when the key
%% is unknown. The new shaper is NOT stored here; the caller decides
%% whether to save it.
-spec find_or_create_shaper(mongooseim:host_type(), key(), state()) ->
    shaper:shaper().
find_or_create_shaper(HostType, Key, #state{shapers = Shapers}) ->
    case maps:find(Key, Shapers) of
        {ok, Existing} -> Existing;
        error -> create_shaper(HostType, Key)
    end.
109 |
|
|
110 |
|
%% Records Now as the last access time of Key, overwriting any older value.
-spec update_access_time(key(), _, state()) -> state().
update_access_time(Key, Now, State = #state{a_times = Times}) ->
    NewTimes = Times#{Key => Now},
    State#state{a_times = NewTimes}.
113 |
|
|
114 |
|
%% Stores Shaper under Key, replacing a previously stored shaper if any.
-spec save_shaper(key(), shaper:shaper(), state()) -> state().
save_shaper(Key, Shaper, State = #state{shapers = Shapers}) ->
    NewShapers = Shapers#{Key => Shaper},
    State#state{shapers = NewShapers}.
117 |
|
|
118 |
|
%% Returns the state with both per-key maps emptied; max_delay and ttl
%% are left untouched.
-spec init_dicts(state()) -> state().
init_dicts(State) ->
    Empty = #{},
    State#state{a_times = Empty, shapers = Empty}.
121 |
|
|
122 |
|
%% Drops every shaper whose last access time is older than the TTL.
%%
%% A key is kept only when its access time is at or after the threshold
%% AND a shaper is stored for it. Keys with an access time but no stored
%% shaper are dropped rather than looked up with maps:get/2, which would
%% crash the server with badkey. Shapers without an access time are
%% dropped as well.
-spec delete_old_shapers(state()) -> state().
delete_old_shapers(State = #state{shapers = Shapers, a_times = Times, ttl = TTL}) ->
    %% Oldest access time (native units) that is still considered fresh
    Min = subtract_seconds(TTL),
    IsAlive = fun(Key, ATime) ->
                      ATime >= Min andalso maps:is_key(Key, Shapers)
              end,
    RecentTimes = maps:filter(IsAlive, Times),
    RecentShapers = maps:with(maps:keys(RecentTimes), Shapers),
    State#state{shapers = RecentShapers, a_times = RecentTimes}.
132 |
|
|
133 |
|
%% Creates a fresh shaper for Key, using the shaper name resolved for it.
-spec create_shaper(mongooseim:host_type(), key()) ->
    none | shaper:shaper().
create_shaper(HostType, Key) ->
    ShaperName = request_shaper_name(HostType, Key),
    shaper:new(ShaperName).
137 |
|
|
138 |
|
%% Resolves the shaper name for a key, falling back to the default shaper
%% when no rule yields one.
-spec request_shaper_name(mongooseim:host_type(), key()) -> atom().
request_shaper_name(HostType, {Domain, Action, FromJID}) ->
    Fallback = default_shaper(),
    get_shaper_name(HostType, Domain, Action, FromJID, Fallback).
141 |
|
|
142 |
|
%% Shaper name used when the access rule does not select one.
-spec default_shaper() -> none.
default_shaper() ->
    none.
144 |
|
|
145 |
|
%% Evaluates the access rule named Action for FromJID. A `deny' result is
%% replaced by Default; any other result is returned as the shaper name.
-spec get_shaper_name(HostType :: mongooseim:host_type(),
                      Domain :: global | jid:server(),
                      Action :: atom(), jid:jid(),
                      Default :: none) -> allow | none.
get_shaper_name(HostType, Domain, Action, FromJID, Default) ->
    case acl:match_rule(HostType, Domain, Action, FromJID) of
        Name when Name =/= deny -> Name;
        deny -> Default
    end.
154 |
|
|
155 |
|
%% @doc Delayed reply to a gen_server caller (a small hack).
%% It has the same effect as
%% timer:apply_after(DelayMs, gen_server, reply, [From, Reply]),
%% but sends the `{Tag, Reply}' message straight to the caller's pid
%% with erlang:send_after/3.
%% NOTE(review): this relies on the message format gen_server uses for
%% replies - confirm when upgrading OTP.
-spec reply_after(pos_integer(), {atom() | pid(), _}, ok) -> reference().
reply_after(DelayMs, {Pid, Tag}, Reply) ->
    Message = {Tag, Reply},
    erlang:send_after(DelayMs, Pid, Message).
161 |
|
|
162 |
|
%% Returns the system time TTL seconds ago, expressed in native time units
%% so it can be compared with values from erlang:system_time/0.
-spec subtract_seconds(integer()) -> integer().
subtract_seconds(TTL) ->
    NowSeconds = erlang:system_time(second),
    erlang:convert_time_unit(NowSeconds - TTL, second, native).