./ct_report/coverage/shaper_srv.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc Shared shapers.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(shaper_srv).
8 -behaviour(gen_server).
9
10 %% ------------------------------------------------------------------
11 %% API Function Exports
12 %% ------------------------------------------------------------------
13
14 -export([start_link/1,
15 child_specs/0,
16 wait/5,
17 reset_shapers/1,
18 reset_all_shapers/1]).
19
20 -ignore_xref([reset_all_shapers/1, reset_shapers/1, start_link/1]).
21
22 %% ------------------------------------------------------------------
23 %% Record definitions
24 %% ------------------------------------------------------------------
25
26 -record(state, {
27 %% Maximum amount of milliseconds to wait
28 max_delay :: non_neg_integer(),
29 %% How many seconds to store each shaper
30 ttl :: non_neg_integer(),
31 shapers :: dict:dict(),
32 a_times :: dict:dict()
33 }).
34 -type state() :: #state{}.
35
36 %% ------------------------------------------------------------------
37 %% gen_server Function Exports
38 %% ------------------------------------------------------------------
39
40 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
41 terminate/2, code_change/3]).
42
43 %% ------------------------------------------------------------------
44 %% API Function Definitions
45 %% ------------------------------------------------------------------
46
%% @doc Returns one supervisor child specification per shaper worker.
%% The worker names are computed with an empty binary as the host type.
-spec child_specs() -> [supervisor:child_spec()].
child_specs() ->
    lists:map(fun child_spec/1, worker_names(<<>>)).
50
%% @doc Builds the tuple-style child specification of a single worker:
%% a permanent worker started via start_link/1 with a 5000 ms shutdown.
-spec child_spec(atom()) -> supervisor:child_spec().
child_spec(ProcName) ->
    StartMFA = {?MODULE, start_link, [ProcName]},
    Restart = permanent,
    ShutdownMs = 5000,
    {ProcName, StartMFA, Restart, ShutdownMs, worker, [?MODULE]}.
59
%% @doc Starts a shaper server registered locally as ProcName.
%% The server is initialised with an empty option list, so init/1 falls
%% back to its defaults.
-spec start_link(atom()) -> 'ignore' | {'error', _} | {'ok', pid()}.
start_link(ProcName) ->
    ServerName = {local, ProcName},
    InitArgs = [],
    Options = [],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, Options).
63
%% @doc Common prefix of the registered names of all shaper workers;
%% worker_name/2 appends the worker index to it.
-spec worker_prefix() -> string().
worker_prefix() -> "ejabberd_shaper_".
67
%% Number of shaper workers. The host type is currently ignored, so every
%% host type shares the same fixed pool size.
worker_count(_HostType) -> 10.
70
%% @doc Lists the registered names of all workers, indexed from 0 up to
%% worker_count(HostType) - 1.
-spec worker_names(mongooseim:host_type()) -> [atom()].
worker_names(HostType) ->
    LastIndex = worker_count(HostType) - 1,
    [worker_name(HostType, Index) || Index <- lists:seq(0, LastIndex)].
74
%% @doc Registered name of worker number N. The first argument is ignored:
%% the name consists only of the common prefix followed by the index.
-spec worker_name(jid:server(), integer()) -> atom().
worker_name(_Host, N) ->
    Name = worker_prefix() ++ integer_to_list(N),
    list_to_atom(Name).
78
%% @doc Picks the worker responsible for Tag: the tag is hashed to a worker
%% index, which is then turned into the worker's registered name.
-spec select_worker(mongooseim:host_type(), _) -> atom().
select_worker(HostType, Tag) ->
    worker_name(HostType, worker_number(HostType, Tag)).
83
%% @doc Maps Tag to a worker index in 0..worker_count(HostType)-1 by hashing
%% the term with erlang:phash2/2, so equal tags always hit the same worker.
-spec worker_number(mongooseim:host_type(), _) -> non_neg_integer().
worker_number(HostType, Tag) ->
    Range = worker_count(HostType),
    erlang:phash2(Tag, Range).
87
%% @doc Shapes the caller from executing the action.
%% The request is sent synchronously to the worker selected by FromJID; the
%% call returns `ok' (possibly after a delay imposed by the worker) or
%% `{error, max_delay_reached}'.
-spec wait(HostType :: mongooseim:host_type(),
           Domain :: jid:server(), Action :: atom(),
           FromJID :: jid:jid() | global, Size :: integer()
          ) -> ok | {error, max_delay_reached}.
wait(HostType, Domain, Action, FromJID, Size) ->
    Worker = select_worker(HostType, FromJID),
    Request = {wait, HostType, Domain, Action, FromJID, Size},
    gen_server:call(Worker, Request).
96
%% @doc Ask every shaper server to forget its current shapers.
%% Returns the list of per-worker results, in worker order.
reset_all_shapers(HostType) ->
    lists:map(fun reset_shapers/1, worker_names(HostType)).
100
%% @doc Ask a single server, addressed by its registered name, to forget
%% its shapers. This is a synchronous call.
reset_shapers(ProcName) ->
    Request = reset_shapers,
    gen_server:call(ProcName, Request).
104
105 %% ------------------------------------------------------------------
106 %% gen_server Function Definitions
107 %% ------------------------------------------------------------------
108
%% gen_server callback. Args is a proplist with optional keys:
%%   max_delay   - longest delay, in ms, a caller may be held (default 3000)
%%   ttl         - seconds an unused shaper is kept (default 120)
%%   gc_interval - seconds between `delete_old_shapers' ticks (default 30)
init(Args) ->
    MaxDelayMs = proplists:get_value(max_delay, Args, 3000),
    TTLSec = proplists:get_value(ttl, Args, 120),
    GCIntervalSec = proplists:get_value(gc_interval, Args, 30),
    %% Periodic tick handled in handle_info/2.
    timer:send_interval(timer:seconds(GCIntervalSec), delete_old_shapers),
    {ok, #state{max_delay = MaxDelayMs,
                ttl = TTLSec,
                shapers = dict:new(),
                a_times = dict:new()}}.
118
%% gen_server callback.
%%
%% {wait, ...}: looks up (or creates) the shaper for the request key, records
%% the access time and feeds Size into the shaper. Depending on the delay the
%% shaper asks for, the caller is answered immediately, answered later via
%% reply_after/3, or rejected when the delay is not below max_delay.
%%
%% reset_shapers: drops all stored shapers and access times.
handle_call({wait, HostType, Domain, Action, FromJID, Size},
            Client, State0 = #state{max_delay = LimitMs}) ->
    ShaperKey = new_key(Domain, Action, FromJID),
    Current = find_or_create_shaper(HostType, ShaperKey, State0),
    Touched = update_access_time(ShaperKey, erlang:system_time(), State0),
    case shaper:update(Current, Size) of
        {Next, 0} ->
            %% No delay required: answer right away.
            {reply, ok, save_shaper(ShaperKey, Next, Touched)};
        {Next, WaitMs} when WaitMs < LimitMs ->
            %% Hold the caller for WaitMs without blocking this server.
            reply_after(WaitMs, Client, ok),
            {noreply, save_shaper(ShaperKey, Next, Touched)};
        {_, _} ->
            %% Delay too long: reject; the updated shaper is not stored.
            {reply, {error, max_delay_reached}, Touched}
    end;
handle_call(reset_shapers, _From, State = #state{}) ->
    {reply, ok, init_dicts(State)}.
135
%% gen_server callback. No casts are part of this server's protocol, so any
%% cast is dropped and the state is left untouched.
handle_cast(_Msg, State) -> {noreply, State}.
138
%% gen_server callback. The `delete_old_shapers' tick (scheduled in init/1)
%% triggers garbage collection of stale shapers; every other message is
%% ignored.
handle_info(delete_old_shapers, State) ->
    Cleaned = delete_old_shapers(State),
    {noreply, Cleaned};
handle_info(_Info, State) ->
    {noreply, State}.
143
%% gen_server callback. Nothing to clean up on shutdown.
terminate(_Reason, _State) -> ok.
146
%% gen_server callback. The state is carried over unchanged on code upgrade.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
149
150 %% ------------------------------------------------------------------
151 %% Internal Function Definitions
152 %% ------------------------------------------------------------------
153
%% Key under which a shaper and its access time are stored.
-type key() :: {global | jid:server(), atom(), jid:jid()}.

%% @doc Builds the storage key for a (Domain, Action, FromJID) triple.
-spec new_key(jid:server() | global, atom(), jid:jid()) -> key().
new_key(Domain, Action, FromJID) -> {Domain, Action, FromJID}.
158
%% @doc Returns the shaper stored under Key, or a freshly created one when
%% the key is unknown. A newly created shaper is NOT stored here; the caller
%% decides whether to keep it.
-spec find_or_create_shaper(mongooseim:host_type(), key(), state()) ->
    shaper:shaper().
find_or_create_shaper(HostType, Key, #state{shapers = Shapers}) ->
    case dict:is_key(Key, Shapers) of
        true -> dict:fetch(Key, Shapers);
        false -> create_shaper(HostType, Key)
    end.
166
%% @doc Records Now as the last access time of Key, replacing any previous
%% value.
-spec update_access_time(key(), _, state()) -> state().
update_access_time(Key, Now, #state{a_times = Times} = State) ->
    NewTimes = dict:store(Key, Now, Times),
    State#state{a_times = NewTimes}.
170
%% @doc Stores Shaper under Key, replacing any shaper previously kept there.
-spec save_shaper(key(), shaper:shaper(), state()) -> state().
save_shaper(Key, Shaper, #state{shapers = Shapers} = State) ->
    NewShapers = dict:store(Key, Shaper, Shapers),
    State#state{shapers = NewShapers}.
174
%% @doc Returns State with both the shaper store and the access-time store
%% emptied; all other fields are kept.
-spec init_dicts(state()) -> state().
init_dicts(State) ->
    Empty = dict:new(),
    State#state{shapers = Empty, a_times = Empty}.
178
%% @doc Drops every shaper whose last access is older than the TTL.
%% The result is built from scratch: starting from empty stores, each key
%% whose access time is not older than the threshold is copied over together
%% with its shaper.
-spec delete_old_shapers(state()) -> state().
delete_old_shapers(State = #state{shapers = Shapers, a_times = Times, ttl = TTL}) ->
    Min = subtract_seconds(TTL),
    dict:fold(fun(Key, ATime, Acc) ->
                      keep_recent_shaper(Key, ATime, Min, Shapers, Acc)
              end, init_dicts(State), Times).

%% Copies one entry into Acc when it is recent enough and has a shaper.
%% An access time can exist without a stored shaper: handle_call/3 records
%% the access time first and does not save the shaper when the request is
%% rejected with max_delay_reached. Such keys are skipped instead of being
%% fetched unconditionally, which would crash the server.
-spec keep_recent_shaper(key(), integer(), integer(), dict:dict(), state()) ->
    state().
keep_recent_shaper(_Key, ATime, Min, _Shapers, Acc) when ATime < Min ->
    Acc; %% skip too old
keep_recent_shaper(Key, ATime, _Min, Shapers, Acc) ->
    case dict:find(Key, Shapers) of
        {ok, Shaper} ->
            update_access_time(Key, ATime, save_shaper(Key, Shaper, Acc));
        error ->
            Acc
    end.
189
%% @doc Creates a new shaper for Key: the shaper name is resolved first and
%% then handed to shaper:new/1.
-spec create_shaper(mongooseim:host_type(), key()) ->
    'none' | {'maxrate', _, 0, non_neg_integer()}.
create_shaper(HostType, Key) ->
    ShaperName = request_shaper_name(HostType, Key),
    shaper:new(ShaperName).
194
%% @doc Resolves the shaper name for a key, falling back to the default
%% shaper when no rule applies.
-spec request_shaper_name(mongooseim:host_type(), key()) -> atom().
request_shaper_name(HostType, {Domain, Action, FromJID}) ->
    Fallback = default_shaper(),
    get_shaper_name(HostType, Domain, Action, FromJID, Fallback).
198
%% Shaper name used when the ACL lookup yields `deny'.
default_shaper() -> none.
201
%% @doc Looks up the rule for Action via acl:match_rule/4. A `deny' result
%% is replaced by Default; any other result is returned as is.
-spec get_shaper_name(HostType :: mongooseim:host_type(),
                      Domain :: 'global' | jid:server(),
                      Action :: atom(), jid:jid(),
                      Default :: 'none') -> 'allow' | 'none'.
get_shaper_name(HostType, Domain, Action, FromJID, Default) ->
    RuleResult = acl:match_rule(HostType, Domain, Action, FromJID),
    shaper_name_or_default(RuleResult, Default).

%% Substitutes Default for a `deny' rule result.
shaper_name_or_default(deny, Default) -> Default;
shaper_name_or_default(Name, _Default) -> Name.
211
%% @doc Delivers Reply to a gen_server caller after DelayMs milliseconds.
%% This is a small hack: instead of
%%   timer:apply_after(DelayMs, gen_server, reply, [From, Reply])
%% the `{Tag, Reply}' message is scheduled straight to the caller's process
%% with erlang:send_after/3. Returns the timer reference.
-spec reply_after(pos_integer(), {atom() | pid(), _}, 'ok') -> reference().
reply_after(DelayMs, {Pid, Tag}, Reply) ->
    Message = {Tag, Reply},
    erlang:send_after(DelayMs, Pid, Message).
218
219
%% @doc Returns the system time TTL seconds ago, expressed in native time
%% units so that it can be compared with erlang:system_time/0 values.
-spec subtract_seconds(integer()) -> integer().
subtract_seconds(TTL) ->
    NowSec = erlang:system_time(second),
    erlang:convert_time_unit(NowSec - TTL, second, native).
Line Hits Source