./ct_report/coverage/shaper_srv.COVER.html

1 %%%-------------------------------------------------------------------
2 %%% @author Uvarov Michael <arcusfelis@gmail.com>
3 %%% @copyright (C) 2013, Uvarov Michael
4 %%% @doc Shared shapers.
5 %%% @end
6 %%%-------------------------------------------------------------------
7 -module(shaper_srv).
8 -behaviour(gen_server).
9
10 %% API Function Exports
11 -export([start_link/1, wait/5, reset_all_shapers/1]).
12 -ignore_xref([start_link/1, reset_all_shapers/1]).
13
14 %% Record definitions
15 -record(state, {
16 %% Maximum amount of milliseconds a caller may be delayed
17 max_delay :: non_neg_integer(),
18 %% How many seconds to store each shaper
19 ttl :: non_neg_integer(),
20 shapers :: map(), %% key() => shaper, written by save_shaper/3
21 a_times :: map() %% key() => last access time, written by update_access_time/3
22 }).
23 -type state() :: #state{}.
24
25 %% gen_server Function Exports
26 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
27
28 %% API Function Definitions
%% @doc Starts a shaper server registered locally under `ProcName'.
%% The server is initialised with an empty option list, so `init/1'
%% falls back to its built-in defaults.
-spec start_link(atom()) -> ignore | {error, _} | {ok, pid()}.
start_link(ProcName) ->
    ServerName = {local, ProcName},
    InitArgs = [],
    StartOpts = [],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, StartOpts).
32
%% @doc Shapes the caller from executing the action.
%% The worker process is chosen from `FromJID' by
%% `mongoose_shaper_sup:select_worker/1' and is then called synchronously,
%% so the caller stays blocked until the worker replies.
-spec wait(HostType :: mongooseim:host_type(),
           Domain :: jid:server(),
           Action :: atom(),
           FromJID :: jid:jid() | global,
           Size :: integer()) -> ok | {error, max_delay_reached}.
wait(HostType, Domain, Action, FromJID, Size) ->
    Request = {wait, HostType, Domain, Action, FromJID, Size},
    gen_server:call(mongoose_shaper_sup:select_worker(FromJID), Request).
42
%% @doc Ask all shaper servers to forget current shapers and read settings again.
%% The host type argument is ignored: every worker reported by
%% `mongoose_shaper_sup:get_workers/0' is reset.
%% Returns the list of per-worker results.
reset_all_shapers(_HostType) ->
    Workers = mongoose_shaper_sup:get_workers(),
    [reset_shapers(Worker) || Worker <- Workers].
46
%% @doc Ask a single shaper server to forget its shapers.
%% Synchronous: returns once the server has handled the request.
reset_shapers(ProcName) ->
    Request = reset_shapers,
    gen_server:call(ProcName, Request).
50
51 %% gen_server Function Definitions
%% gen_server callback: builds the initial state from the option proplist.
%% Recognised options and their defaults:
%%   max_delay   - 3000 (milliseconds)
%%   ttl         - 120  (seconds)
%%   gc_interval - 30   (seconds between `delete_old_shapers' ticks)
init(Args) ->
    MaxDelay = proplists:get_value(max_delay, Args, 3000),
    TTL = proplists:get_value(ttl, Args, 120),
    GCInterval = proplists:get_value(gc_interval, Args, 30),
    %% Periodic tick handled in handle_info/2 to drop stale shapers
    timer:send_interval(timer:seconds(GCInterval), delete_old_shapers),
    {ok, #state{max_delay = MaxDelay,
                ttl = TTL,
                shapers = #{},
                a_times = #{}}}.
61
%% gen_server callback for synchronous requests.
%%
%% `{wait, HostType, Domain, Action, FromJID, Size}':
%%   Looks up (or creates) the shaper for the key, records the access time
%%   and asks the shaper how long the caller must be delayed.
%%   - zero delay: reply `ok' immediately;
%%   - delay below `max_delay': reply `ok' after the delay, without blocking
%%     the server in the meantime;
%%   - otherwise: reply `{error, max_delay_reached}' immediately.
%%
%% `reset_shapers': drops all stored shapers and access times.
handle_call({wait, HostType, Domain, Action, FromJID, Size},
            From, State = #state{max_delay = MaxDelayMs}) ->
    Key = new_key(Domain, Action, FromJID),
    Shaper = find_or_create_shaper(HostType, Key, State),
    State1 = update_access_time(Key, erlang:system_time(), State),
    case shaper:update(Shaper, Size) of
        {UpdatedShaper, 0} ->
            {reply, ok, save_shaper(Key, UpdatedShaper, State1)};
        {UpdatedShaper, DelayMs} when DelayMs < MaxDelayMs ->
            reply_after(DelayMs, From, ok),
            {noreply, save_shaper(Key, UpdatedShaper, State1)};
        {_, _} ->
            %% The request is rejected, so the shaper update is discarded.
            %% The pre-update shaper is still stored: State1 already holds an
            %% access time for Key, and delete_old_shapers/1 expects every key
            %% in a_times to have a shaper. Without this, a rejected first
            %% request for a new key would crash the next cleanup run.
            {reply, {error, max_delay_reached}, save_shaper(Key, Shaper, State1)}
    end;
handle_call(reset_shapers, _From, State = #state{}) ->
    {reply, ok, init_dicts(State)}.
78
%% gen_server callback: this server defines no casts, so any cast is
%% ignored and the state is returned untouched.
handle_cast(_Request, Unchanged) ->
    {noreply, Unchanged}.
81
%% gen_server callback for plain messages.
%% The periodic `delete_old_shapers' tick triggers a cleanup of stale
%% shapers; every other message is ignored.
handle_info(Info, State) ->
    NextState =
        case Info of
            delete_old_shapers -> delete_old_shapers(State);
            _Other -> State
        end,
    {noreply, NextState}.
86
%% gen_server callback: nothing to clean up on shutdown.
terminate(_Why, _LastState) ->
    ok.
89
%% gen_server callback: the state needs no conversion between code
%% versions, so it is returned as-is.
code_change(_FromVsn, Unchanged, _Extra) ->
    {ok, Unchanged}.
92
93 %% ------------------------------------------------------------------
94 %% Internal Function Definitions
95 %% ------------------------------------------------------------------
96
%% Key under which a shaper and its access time are stored.
-type key() :: {global | jid:server(), atom(), jid:jid()}.

%% Builds the storage key from the request fields, keeping their order.
-spec new_key(jid:server() | global, atom(), jid:jid()) -> key().
new_key(Domain, Action, FromJID) ->
    Key = {Domain, Action, FromJID},
    Key.
101
%% Returns the shaper stored for `Key', or a freshly created one when the
%% key is unknown. The new shaper is NOT stored here; the caller decides
%% whether to save it.
-spec find_or_create_shaper(mongooseim:host_type(), key(), state()) ->
    shaper:shaper().
find_or_create_shaper(HostType, Key, #state{shapers = Known}) ->
    case maps:find(Key, Known) of
        {ok, Existing} -> Existing;
        error -> create_shaper(HostType, Key)
    end.
109
%% Records `Now' as the last access time of `Key', overwriting any
%% previous value.
-spec update_access_time(key(), _, state()) -> state().
update_access_time(Key, Now, State = #state{a_times = Times}) ->
    NewTimes = Times#{Key => Now},
    State#state{a_times = NewTimes}.
113
%% Stores `Shaper' under `Key', replacing any shaper already kept there.
-spec save_shaper(key(), shaper:shaper(), state()) -> state().
save_shaper(Key, Shaper, State = #state{shapers = Shapers}) ->
    NewShapers = Shapers#{Key => Shaper},
    State#state{shapers = NewShapers}.
117
%% Returns the state with both the shaper map and the access-time map
%% emptied; the other fields are kept.
-spec init_dicts(state()) -> state().
init_dicts(State) ->
    Empty = maps:new(),
    State#state{shapers = Empty, a_times = Empty}.
121
%% Drops every shaper whose last access time is older than `ttl' seconds.
%%
%% Only keys with a recent access time are kept, in both maps. A key that
%% has an access time but no stored shaper is tolerated: its access time is
%% kept until it expires and no shaper entry is produced for it. Looking
%% such a key up with maps:get/2 would crash the server with `badkey'.
-spec delete_old_shapers(state()) -> state().
delete_old_shapers(State = #state{shapers = Shapers, a_times = Times, ttl = TTL}) ->
    Min = subtract_seconds(TTL),
    %% Access times are compared in native time units (see subtract_seconds/1)
    RecentTimes = maps:filter(fun(_Key, ATime) -> ATime >= Min end, Times),
    RecentShapers = maps:with(maps:keys(RecentTimes), Shapers),
    State#state{shapers = RecentShapers, a_times = RecentTimes}.
132
%% Creates a new shaper for `Key', using the shaper name resolved for this
%% host type and key.
-spec create_shaper(mongooseim:host_type(), key()) ->
    none | shaper:shaper().
create_shaper(HostType, Key) ->
    ShaperName = request_shaper_name(HostType, Key),
    shaper:new(ShaperName).
137
%% Resolves the shaper name for a key, falling back to the default shaper
%% name when no rule applies.
-spec request_shaper_name(mongooseim:host_type(), key()) -> atom().
request_shaper_name(HostType, {Domain, Action, FromJID}) ->
    Fallback = default_shaper(),
    get_shaper_name(HostType, Domain, Action, FromJID, Fallback).
141
%% Shaper name used when the access rule does not select one.
default_shaper() -> none.
144
%% Evaluates the access rule `Action' for `FromJID' via `acl:match_rule/4'.
%% A `deny' result yields `Default'; any other result is returned unchanged.
-spec get_shaper_name(HostType :: mongooseim:host_type(),
                      Domain :: global | jid:server(),
                      Action :: atom(), jid:jid(),
                      Default :: none) -> allow | none.
get_shaper_name(HostType, Domain, Action, FromJID, Default) ->
    Matched = acl:match_rule(HostType, Domain, Action, FromJID),
    pick_shaper_name(Matched, Default).

%% Maps a rule result to a shaper name: `deny' selects the default.
pick_shaper_name(deny, Default) -> Default;
pick_shaper_name(Name, _Default) -> Name.
154
%% @doc Sends a delayed reply to a pending `gen_server:call/2'.
%% This is a small hack: it is meant as a more efficient equivalent of
%% timer:apply_after(DelayMs, gen_server, reply, [From, Reply]).
%% The `{Tag, Reply}' message is scheduled straight to the caller's pid
%% with `erlang:send_after/3'; the returned value is the timer reference.
%% NOTE(review): this relies on the message format gen_server uses for
%% replies - confirm it still holds for the OTP release in use.
-spec reply_after(pos_integer(), {atom() | pid(), _}, ok) -> reference().
reply_after(DelayMs, {Pid, Tag}, Reply) ->
    ReplyMsg = {Tag, Reply},
    erlang:send_after(DelayMs, Pid, ReplyMsg).
161
%% Returns the system time `TTL' seconds ago, expressed in native time
%% units so it can be compared with values from `erlang:system_time/0'.
-spec subtract_seconds(integer()) -> integer().
subtract_seconds(TTL) ->
    NowSeconds = erlang:system_time(second),
    erlang:convert_time_unit(NowSeconds - TTL, second, native).
Line Hits Source