1 |
|
%% ``The contents of this file are subject to the Erlang Public License, |
2 |
|
%% Version 1.1, (the "License"); you may not use this file except in |
3 |
|
%% compliance with the License. You should have received a copy of the |
4 |
|
%% Erlang Public License along with this software. If not, it can be |
5 |
|
%% retrieved via the world wide web at http://www.erlang.org/. |
6 |
|
%% |
7 |
|
%% Software distributed under the License is distributed on an "AS IS" |
8 |
|
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See |
9 |
|
%% the License for the specific language governing rights and limitations |
10 |
|
%% under the License. |
11 |
|
%% |
12 |
|
%% The Initial Developer of the Original Code is Ericsson Utvecklings AB. |
13 |
|
%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings |
14 |
|
%% AB. All Rights Reserved.'' |
15 |
|
%% |
16 |
|
%% The code has been modified and improved by ProcessOne. |
17 |
|
%% Copyright 2007-2011, ProcessOne |
18 |
|
%% |
19 |
|
%% The change adds the following features: |
20 |
|
%% - You can send exit(priority_shutdown) to the p1_fsm process to |
21 |
|
%% terminate immediatetly. If the fsm trap_exit process flag has been |
22 |
|
%% set to true, the FSM terminate function will called. |
23 |
|
%% - You can pass the gen_fsm options to control resource usage. |
24 |
|
%% {max_queue, N} will exit the process with priority_shutdown |
25 |
|
%% - You can limit the time processing a message (TODO): If the |
26 |
|
%% message processing does not return in a given period of time, the |
27 |
|
%% process will be terminated. |
28 |
|
%% - You might customize the State data before sending it to error_logger |
29 |
|
%% in case of a crash (just export the function print_state/1) |
30 |
|
%% $Id$ |
31 |
|
%% |
32 |
|
-module(p1_fsm_old). |
33 |
|
|
34 |
|
%%%----------------------------------------------------------------- |
35 |
|
%%% |
36 |
|
%%% This state machine is somewhat more pure than state_lib. It is |
37 |
|
%%% still based on State dispatching (one function per state), but |
38 |
|
%%% allows a function handle_event to take care of events in all states. |
39 |
|
%%% It's not that pure anymore :( We also allow synchronized event sending. |
40 |
|
%%% |
41 |
|
%%% If the Parent process terminates the Module:terminate/2 |
42 |
|
%%% function is called. |
43 |
|
%%% |
44 |
|
%%% The user module should export: |
45 |
|
%%% |
46 |
|
%%% init(Args) |
47 |
|
%%% ==> {ok, StateName, StateData} |
48 |
|
%%% {ok, StateName, StateData, Timeout} |
49 |
|
%%% ignore |
50 |
|
%%% {stop, Reason} |
51 |
|
%%% |
52 |
|
%%% StateName(Msg, StateData) |
53 |
|
%%% |
54 |
|
%%% ==> {next_state, NewStateName, NewStateData} |
55 |
|
%%% {next_state, NewStateName, NewStateData, Timeout} |
56 |
|
%%% {stop, Reason, NewStateData} |
57 |
|
%%% Reason = normal | shutdown | Term terminate(State) is called |
58 |
|
%%% |
59 |
|
%%% StateName(Msg, From, StateData) |
60 |
|
%%% |
61 |
|
%%% ==> {next_state, NewStateName, NewStateData} |
62 |
|
%%% {next_state, NewStateName, NewStateData, Timeout} |
63 |
|
%%% {reply, Reply, NewStateName, NewStateData} |
64 |
|
%%% {reply, Reply, NewStateName, NewStateData, Timeout} |
65 |
|
%%% {stop, Reason, NewStateData} |
66 |
|
%%% Reason = normal | shutdown | Term terminate(State) is called |
67 |
|
%%% |
68 |
|
%%% handle_event(Msg, StateName, StateData) |
69 |
|
%%% |
70 |
|
%%% ==> {next_state, NewStateName, NewStateData} |
71 |
|
%%% {next_state, NewStateName, NewStateData, Timeout} |
72 |
|
%%% {stop, Reason, Reply, NewStateData} |
73 |
|
%%% {stop, Reason, NewStateData} |
74 |
|
%%% Reason = normal | shutdown | Term terminate(State) is called |
75 |
|
%%% |
76 |
|
%%% handle_sync_event(Msg, From, StateName, StateData) |
77 |
|
%%% |
78 |
|
%%% ==> {next_state, NewStateName, NewStateData} |
79 |
|
%%% {next_state, NewStateName, NewStateData, Timeout} |
80 |
|
%%% {reply, Reply, NewStateName, NewStateData} |
81 |
|
%%% {reply, Reply, NewStateName, NewStateData, Timeout} |
82 |
|
%%% {stop, Reason, Reply, NewStateData} |
83 |
|
%%% {stop, Reason, NewStateData} |
84 |
|
%%% Reason = normal | shutdown | Term terminate(State) is called |
85 |
|
%%% |
86 |
|
%%% handle_info(Info, StateName) (e.g. {'EXIT', P, R}, {nodedown, N}, ... |
87 |
|
%%% |
88 |
|
%%% ==> {next_state, NewStateName, NewStateData} |
89 |
|
%%% {next_state, NewStateName, NewStateData, Timeout} |
90 |
|
%%% {stop, Reason, NewStateData} |
91 |
|
%%% Reason = normal | shutdown | Term terminate(State) is called |
92 |
|
%%% |
93 |
|
%%% terminate(Reason, StateName, StateData, Messages) |
94 |
|
%%% Let the user module clean up |
95 |
|
%%% always called when server terminates |
96 |
|
%%% |
97 |
|
%%% ==> the return value is ignored |
98 |
|
%%% |
99 |
|
%%% |
100 |
|
%%% The work flow (of the fsm) can be described as follows: |
101 |
|
%%% |
102 |
|
%%% User module fsm |
103 |
|
%%% ----------- ------- |
104 |
|
%%% start -----> start |
105 |
|
%%% init <----- . |
106 |
|
%%% |
107 |
|
%%% loop |
108 |
|
%%% StateName <----- . |
109 |
|
%%% |
110 |
|
%%% handle_event <----- . |
111 |
|
%%% |
112 |
|
%%% handle__sunc_event <----- . |
113 |
|
%%% |
114 |
|
%%% handle_info <----- . |
115 |
|
%%% |
116 |
|
%%% terminate <----- . |
117 |
|
%%% |
118 |
|
%%% |
119 |
|
%%% --------------------------------------------------- |
120 |
|
|
121 |
|
-export([start/3, start/4, |
122 |
|
start_link/3, start_link/4, |
123 |
|
send_event/2, sync_send_event/2, sync_send_event/3, |
124 |
|
send_all_state_event/2, |
125 |
|
sync_send_all_state_event/2, sync_send_all_state_event/3, |
126 |
|
reply/2, |
127 |
|
start_timer/2, send_event_after/2, cancel_timer/1, |
128 |
|
enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]). |
129 |
|
|
130 |
|
%% Internal exports |
131 |
|
-export([init_it/6, print_event/3, |
132 |
|
system_continue/3, |
133 |
|
system_terminate/4, |
134 |
|
system_code_change/4, |
135 |
|
format_status/2]). |
136 |
|
|
137 |
|
-ignore_xref([ |
138 |
|
start/3, start/4, start_link/3, start_link/4, send_event/2, sync_send_event/2, sync_send_event/3, |
139 |
|
send_all_state_event/2, sync_send_all_state_event/2, sync_send_all_state_event/3, |
140 |
|
reply/2, start_timer/2, send_event_after/2, cancel_timer/1, |
141 |
|
enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7, init_it/6, print_event/3, |
142 |
|
system_continue/3, system_terminate/4, system_code_change/4, format_status/2, behaviour_info/1 |
143 |
|
]). |
144 |
|
|
145 |
|
-import(error_logger, [format/2]). |
146 |
|
|
147 |
|
%%% Internal gen_fsm state |
148 |
|
%%% This state is used to defined resource control values: |
149 |
|
-record(limits, {max_queue}). |
150 |
|
|
151 |
|
%%% --------------------------------------------------- |
152 |
|
%%% Interface functions. |
153 |
|
%%% --------------------------------------------------- |
154 |
|
|
155 |
|
-callback init(Args :: term()) -> |
156 |
|
{ok, StateName :: atom(), StateData :: term()} | |
157 |
|
{ok, StateName :: atom(), StateData :: term(), timeout()} | |
158 |
|
{stop, Reason :: term()} | ignore. |
159 |
|
-callback handle_event(Event :: term(), StateName :: atom(), |
160 |
|
StateData :: term()) -> |
161 |
|
{next_state, NextStateName :: atom(), NewStateData :: term()} | |
162 |
|
{next_state, NextStateName :: atom(), NewStateData :: term(), |
163 |
|
timeout() | hibernate} | |
164 |
|
{stop, Reason :: term(), NewStateData :: term()}. |
165 |
|
-callback handle_sync_event(Event :: term(), From :: {pid(), Tag :: term()}, |
166 |
|
StateName :: atom(), StateData :: term()) -> |
167 |
|
{reply, Reply :: term(), NextStateName :: atom(), NewStateData :: term()} | |
168 |
|
{reply, Reply :: term(), NextStateName :: atom(), NewStateData :: term(), |
169 |
|
timeout() | hibernate} | |
170 |
|
{next_state, NextStateName :: atom(), NewStateData :: term()} | |
171 |
|
{next_state, NextStateName :: atom(), NewStateData :: term(), |
172 |
|
timeout() | hibernate} | |
173 |
|
{stop, Reason :: term(), Reply :: term(), NewStateData :: term()} | |
174 |
|
{stop, Reason :: term(), NewStateData :: term()}. |
175 |
|
-callback handle_info(Info :: term(), StateName :: atom(), |
176 |
|
StateData :: term()) -> |
177 |
|
{next_state, NextStateName :: atom(), NewStateData :: term()} | |
178 |
|
{next_state, NextStateName :: atom(), NewStateData :: term(), |
179 |
|
timeout() | hibernate} | |
180 |
|
{stop, Reason :: normal | term(), NewStateData :: term()}. |
181 |
|
-callback terminate(Reason :: normal | shutdown | {shutdown, term()} | term(), |
182 |
|
StateName :: atom(), StateData :: term(), |
183 |
|
BufferedMessages :: list()) -> |
184 |
|
term(). |
185 |
|
-callback code_change(OldVsn :: term() | {down, term()}, StateName :: atom(), |
186 |
|
StateData :: term(), Extra :: term()) -> |
187 |
|
{ok, NextStateName :: atom(), NewStateData :: term()}. |
188 |
|
|
189 |
|
%%% --------------------------------------------------- |
190 |
|
%%% Starts a generic state machine. |
191 |
|
%%% start(Mod, Args, Options) |
192 |
|
%%% start(Name, Mod, Args, Options) |
193 |
|
%%% start_link(Mod, Args, Options) |
194 |
|
%%% start_link(Name, Mod, Args, Options) where: |
195 |
|
%%% Name ::= {local, atom()} | {global, atom()} |
196 |
|
%%% Mod ::= atom(), callback module implementing the 'real' fsm |
197 |
|
%%% Args ::= term(), init arguments (to Mod:init/1) |
198 |
|
%%% Options ::= [{debug, [Flag]}] |
199 |
|
%%% Flag ::= trace | log | {logfile, File} | statistics | debug |
200 |
|
%%% (debug == log && statistics) |
201 |
|
%%% Returns: {ok, Pid} | |
202 |
|
%%% {error, {already_started, Pid}} | |
203 |
|
%%% {error, Reason} |
204 |
|
%%% --------------------------------------------------- |
205 |
|
start(Mod, Args, Options) -> |
206 |
:-( |
gen:start(?MODULE, nolink, Mod, Args, Options). |
207 |
|
|
208 |
|
start(Name, Mod, Args, Options) -> |
209 |
:-( |
gen:start(?MODULE, nolink, Name, Mod, Args, Options). |
210 |
|
|
211 |
|
start_link(Mod, Args, Options) -> |
212 |
4360 |
gen:start(?MODULE, link, Mod, Args, Options). |
213 |
|
|
214 |
|
start_link(Name, Mod, Args, Options) -> |
215 |
:-( |
gen:start(?MODULE, link, Name, Mod, Args, Options). |
216 |
|
|
217 |
|
|
218 |
|
send_event({global, Name}, Event) -> |
219 |
:-( |
catch global:send(Name, {'$gen_event', Event}), |
220 |
:-( |
ok; |
221 |
|
send_event(Name, Event) -> |
222 |
139 |
Name ! {'$gen_event', Event}, |
223 |
139 |
ok. |
224 |
|
|
225 |
|
sync_send_event(Name, Event) -> |
226 |
12 |
case catch gen:call(Name, '$gen_sync_event', Event) of |
227 |
|
{ok, Res} -> |
228 |
11 |
Res; |
229 |
|
{'EXIT', Reason} -> |
230 |
1 |
exit({Reason, {?MODULE, sync_send_event, [Name, Event]}}) |
231 |
|
end. |
232 |
|
|
233 |
|
sync_send_event(Name, Event, Timeout) -> |
234 |
:-( |
case catch gen:call(Name, '$gen_sync_event', Event, Timeout) of |
235 |
|
{ok, Res} -> |
236 |
:-( |
Res; |
237 |
|
{'EXIT', Reason} -> |
238 |
:-( |
exit({Reason, {?MODULE, sync_send_event, [Name, Event, Timeout]}}) |
239 |
|
end. |
240 |
|
|
241 |
|
send_all_state_event({global, Name}, Event) -> |
242 |
:-( |
catch global:send(Name, {'$gen_all_state_event', Event}), |
243 |
:-( |
ok; |
244 |
|
send_all_state_event(Name, Event) -> |
245 |
1 |
Name ! {'$gen_all_state_event', Event}, |
246 |
1 |
ok. |
247 |
|
|
248 |
|
sync_send_all_state_event(Name, Event) -> |
249 |
:-( |
case catch gen:call(Name, '$gen_sync_all_state_event', Event) of |
250 |
|
{ok, Res} -> |
251 |
:-( |
Res; |
252 |
|
{'EXIT', Reason} -> |
253 |
:-( |
exit({Reason, {?MODULE, sync_send_all_state_event, [Name, Event]}}) |
254 |
|
end. |
255 |
|
|
256 |
|
sync_send_all_state_event(Name, Event, Timeout) -> |
257 |
39 |
case catch gen:call(Name, '$gen_sync_all_state_event', Event, Timeout) of |
258 |
|
{ok, Res} -> |
259 |
39 |
Res; |
260 |
|
{'EXIT', Reason} -> |
261 |
:-( |
exit({Reason, {?MODULE, sync_send_all_state_event, |
262 |
|
[Name, Event, Timeout]}}) |
263 |
|
end. |
264 |
|
|
265 |
|
%% Designed to be only callable within one of the callbacks |
266 |
|
%% hence using the self() of this instance of the process. |
267 |
|
%% This is to ensure that timers don't go astray in global |
268 |
|
%% e.g. when straddling a failover, or turn up in a restarted |
269 |
|
%% instance of the process. |
270 |
|
|
271 |
|
%% Returns Ref, sends event {timeout, Ref, Msg} after Time |
272 |
|
%% to the (then) current state. |
273 |
|
start_timer(Time, Msg) -> |
274 |
:-( |
erlang:start_timer(Time, self(), {'$gen_timer', Msg}). |
275 |
|
|
276 |
|
%% Returns Ref, sends Event after Time to the (then) current state. |
277 |
|
send_event_after(Time, Event) -> |
278 |
:-( |
erlang:start_timer(Time, self(), {'$gen_event', Event}). |
279 |
|
|
280 |
|
%% Returns the remaing time for the timer if Ref referred to |
281 |
|
%% an active timer/send_event_after, false otherwise. |
282 |
|
cancel_timer(Ref) -> |
283 |
:-( |
case erlang:cancel_timer(Ref) of |
284 |
|
false -> |
285 |
:-( |
receive {timeout, Ref, _} -> 0 |
286 |
:-( |
after 0 -> false |
287 |
|
end; |
288 |
|
RemainingTime -> |
289 |
:-( |
RemainingTime |
290 |
|
end. |
291 |
|
|
292 |
|
%% enter_loop/4, 5, 6 |
293 |
|
%% Makes an existing process into a gen_fsm. |
294 |
|
%% The calling process will enter the gen_fsm receive loop and become a |
295 |
|
%% gen_fsm process. |
296 |
|
%% The process *must* have been started using one of the start functions |
297 |
|
%% in proc_lib, see proc_lib(3). |
298 |
|
%% The user is responsible for any initialization of the process, |
299 |
|
%% including registering a name for it. |
300 |
|
enter_loop(Mod, Options, StateName, StateData) -> |
301 |
:-( |
enter_loop(Mod, Options, StateName, StateData, self(), infinity). |
302 |
|
|
303 |
|
enter_loop(Mod, Options, StateName, StateData, ServerName = {_, _}) -> |
304 |
:-( |
enter_loop(Mod, Options, StateName, StateData, ServerName, infinity); |
305 |
|
enter_loop(Mod, Options, StateName, StateData, Timeout) -> |
306 |
:-( |
enter_loop(Mod, Options, StateName, StateData, self(), Timeout). |
307 |
|
|
308 |
|
enter_loop(Mod, Options, StateName, StateData, ServerName, Timeout) -> |
309 |
:-( |
Name = get_proc_name(ServerName), |
310 |
:-( |
Parent = get_parent(), |
311 |
:-( |
Debug = maybe_debug_options(Options), |
312 |
:-( |
Limits = limit_options(Options), |
313 |
:-( |
Queue = queue:new(), |
314 |
:-( |
QueueLen = 0, |
315 |
:-( |
loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, |
316 |
|
Limits, Queue, QueueLen). |
317 |
|
|
318 |
|
maybe_debug_options(Opts) -> |
319 |
4360 |
case lists:keyfind(debug, 1, Opts) of |
320 |
|
{_, Options} -> |
321 |
:-( |
sys:debug_options(Options); |
322 |
|
false -> |
323 |
4360 |
[] |
324 |
|
end. |
325 |
|
|
326 |
|
get_proc_name(Pid) when is_pid(Pid) -> |
327 |
:-( |
Pid; |
328 |
|
get_proc_name({local, Name}) -> |
329 |
:-( |
case process_info(self(), registered_name) of |
330 |
|
{registered_name, Name} -> |
331 |
:-( |
Name; |
332 |
|
{registered_name, _Name} -> |
333 |
:-( |
exit(process_not_registered); |
334 |
|
[] -> |
335 |
:-( |
exit(process_not_registered) |
336 |
|
end; |
337 |
|
get_proc_name({global, Name}) -> |
338 |
:-( |
case global:whereis_name(Name) of |
339 |
|
undefined -> |
340 |
:-( |
exit(process_not_registered_globally); |
341 |
|
Pid when Pid==self() -> |
342 |
:-( |
Name; |
343 |
|
_Pid -> |
344 |
:-( |
exit(process_not_registered_globally) |
345 |
|
end. |
346 |
|
|
347 |
|
get_parent() -> |
348 |
:-( |
case get('$ancestors') of |
349 |
|
[Parent | _] when is_pid(Parent) -> |
350 |
:-( |
Parent; |
351 |
|
[Parent | _] when is_atom(Parent) -> |
352 |
:-( |
name_to_pid(Parent); |
353 |
|
_ -> |
354 |
:-( |
exit(process_was_not_started_by_proc_lib) |
355 |
|
end. |
356 |
|
|
357 |
|
name_to_pid(Name) -> |
358 |
:-( |
case whereis(Name) of |
359 |
|
undefined -> |
360 |
:-( |
case global:whereis_name(Name) of |
361 |
|
undefined -> |
362 |
:-( |
exit(could_not_find_registerd_name); |
363 |
|
Pid -> |
364 |
:-( |
Pid |
365 |
|
end; |
366 |
|
Pid -> |
367 |
:-( |
Pid |
368 |
|
end. |
369 |
|
|
370 |
|
%%% --------------------------------------------------- |
371 |
|
%%% Initiate the new process. |
372 |
|
%%% Register the name using the Rfunc function |
373 |
|
%%% Calls the Mod:init/Args function. |
374 |
|
%%% Finally an acknowledge is sent to Parent and the main |
375 |
|
%%% loop is entered. |
376 |
|
%%% --------------------------------------------------- |
377 |
|
init_it(Starter, self, Name, Mod, Args, Options) -> |
378 |
:-( |
init_it(Starter, self(), Name, Mod, Args, Options); |
379 |
|
init_it(Starter, Parent, Name0, Mod, Args, Options) -> |
380 |
4360 |
Name = name(Name0), |
381 |
4360 |
Debug = maybe_debug_options(Options), |
382 |
4360 |
Limits = limit_options(Options), |
383 |
4360 |
Queue = queue:new(), |
384 |
4360 |
QueueLen = 0, |
385 |
4360 |
case catch Mod:init(Args) of |
386 |
|
{ok, StateName, StateData} -> |
387 |
:-( |
proc_lib:init_ack(Starter, {ok, self()}), |
388 |
:-( |
loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits, Queue, QueueLen); |
389 |
|
{ok, StateName, StateData, Timeout} -> |
390 |
3910 |
proc_lib:init_ack(Starter, {ok, self()}), |
391 |
3910 |
loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits, Queue, QueueLen); |
392 |
|
{stop, Reason} -> |
393 |
:-( |
proc_lib:init_ack(Starter, {error, Reason}), |
394 |
:-( |
exit(Reason); |
395 |
|
ignore -> |
396 |
:-( |
proc_lib:init_ack(Starter, ignore), |
397 |
:-( |
exit(normal); |
398 |
|
{'EXIT', Reason} -> |
399 |
450 |
proc_lib:init_ack(Starter, {error, Reason}), |
400 |
450 |
exit(Reason); |
401 |
|
Else -> |
402 |
:-( |
Error = {bad_return_value, Else}, |
403 |
:-( |
proc_lib:init_ack(Starter, {error, Error}), |
404 |
:-( |
exit(Error) |
405 |
|
end. |
406 |
|
|
407 |
:-( |
name({local, Name}) -> Name; |
408 |
:-( |
name({global, Name}) -> Name; |
409 |
4360 |
name(Pid) when is_pid(Pid) -> Pid. |
410 |
|
|
411 |
|
%%----------------------------------------------------------------- |
412 |
|
%% The MAIN loop |
413 |
|
%%----------------------------------------------------------------- |
414 |
|
loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug, |
415 |
|
Limits, Queue, QueueLen) |
416 |
|
when QueueLen > 0 -> |
417 |
:-( |
case queue:out(Queue) of |
418 |
|
{{value, Msg}, Queue1} -> |
419 |
:-( |
decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate, |
420 |
|
Debug, Limits, Queue1, QueueLen - 1, false); |
421 |
|
{empty, _} -> |
422 |
:-( |
Reason = internal_queue_error, |
423 |
:-( |
error_info(Mod, Reason, Name, hibernate, StateName, StateData, Debug), |
424 |
:-( |
exit(Reason) |
425 |
|
end; |
426 |
|
loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug, |
427 |
|
Limits, _Queue, _QueueLen) -> |
428 |
11578 |
proc_lib:hibernate(?MODULE, wake_hib, |
429 |
|
[Parent, Name, StateName, StateData, Mod, |
430 |
|
Debug, Limits]); |
431 |
|
%% First we test if we have reach a defined limit ... |
432 |
|
loop(Parent, Name, StateName, StateData, Mod, Time, Debug, |
433 |
|
Limits, Queue, QueueLen) -> |
434 |
30919 |
try |
435 |
30919 |
message_queue_len(Limits, QueueLen) |
436 |
|
%% TODO: We can add more limit checking here... |
437 |
|
catch |
438 |
|
{process_limit, Limit} -> |
439 |
:-( |
Reason = {process_limit, Limit}, |
440 |
:-( |
Msg = {'EXIT', Parent, {error, {process_limit, Limit}}}, |
441 |
:-( |
terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug, Queue) |
442 |
|
end, |
443 |
30919 |
process_message(Parent, Name, StateName, StateData, |
444 |
|
Mod, Time, Debug, Limits, Queue, QueueLen). |
445 |
|
%% ... then we can process a new message: |
446 |
|
process_message(Parent, Name, StateName, StateData, Mod, Time, Debug, |
447 |
|
Limits, Queue, QueueLen) -> |
448 |
30919 |
{Msg, Queue1, QueueLen1} = collect_messages(Queue, QueueLen, Time), |
449 |
30919 |
decode_msg(Msg, Parent, Name, StateName, StateData, Mod, Time, |
450 |
|
Debug, Limits, Queue1, QueueLen1, false). |
451 |
|
|
452 |
|
collect_messages(Queue, QueueLen, Time) -> |
453 |
44490 |
receive |
454 |
|
Input -> |
455 |
13571 |
case Input of |
456 |
|
{'EXIT', _Parent, priority_shutdown} -> |
457 |
:-( |
{Input, Queue, QueueLen}; |
458 |
|
_ -> |
459 |
13571 |
collect_messages( |
460 |
|
queue:in(Input, Queue), QueueLen + 1, Time) |
461 |
|
end |
462 |
|
after 0 -> |
463 |
30919 |
case queue:out(Queue) of |
464 |
|
{{value, Msg}, Queue1} -> |
465 |
13511 |
{Msg, Queue1, QueueLen - 1}; |
466 |
|
{empty, _} -> |
467 |
17408 |
receive |
468 |
|
Input -> |
469 |
17408 |
{Input, Queue, QueueLen} |
470 |
|
after Time -> |
471 |
:-( |
{{'$gen_event', timeout}, Queue, QueueLen} |
472 |
|
end |
473 |
|
end |
474 |
|
end. |
475 |
|
|
476 |
|
|
477 |
|
wake_hib(Parent, Name, StateName, StateData, Mod, Debug, |
478 |
|
Limits) -> |
479 |
11578 |
Msg = receive |
480 |
|
Input -> |
481 |
11578 |
Input |
482 |
|
end, |
483 |
11578 |
Queue = queue:new(), |
484 |
11578 |
QueueLen = 0, |
485 |
11578 |
decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate, |
486 |
|
Debug, Limits, Queue, QueueLen, true). |
487 |
|
|
488 |
|
decode_msg(Msg, Parent, Name, StateName, StateData, Mod, Time, Debug, |
489 |
|
Limits, Queue, QueueLen, Hib) -> |
490 |
42497 |
put('$internal_queue_len', QueueLen), |
491 |
42497 |
case Msg of |
492 |
|
{system, From, Req} -> |
493 |
40 |
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, |
494 |
|
[Name, StateName, StateData, |
495 |
|
Mod, Time, Limits, Queue, QueueLen], Hib); |
496 |
|
{'EXIT', Parent, Reason} -> |
497 |
:-( |
terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug, Queue); |
498 |
|
_Msg when Debug == [] -> |
499 |
42457 |
handle_msg(Msg, Parent, Name, StateName, StateData, |
500 |
|
Mod, Time, Limits, Queue, QueueLen); |
501 |
|
_Msg -> |
502 |
:-( |
Debug1 = sys:handle_debug(Debug, fun print_event/3, |
503 |
|
{Name, StateName}, {in, Msg}), |
504 |
:-( |
handle_msg(Msg, Parent, Name, StateName, StateData, |
505 |
|
Mod, Time, Debug1, Limits, Queue, QueueLen) |
506 |
|
end. |
507 |
|
|
508 |
|
%%----------------------------------------------------------------- |
509 |
|
%% Callback functions for system messages handling. |
510 |
|
%%----------------------------------------------------------------- |
511 |
|
system_continue(Parent, Debug, [Name, StateName, StateData, |
512 |
|
Mod, Time, Limits, Queue, QueueLen]) -> |
513 |
33 |
loop(Parent, Name, StateName, StateData, Mod, Time, Debug, |
514 |
|
Limits, Queue, QueueLen). |
515 |
|
|
516 |
|
-spec system_terminate(term(), _, _, [term(), ...]) -> no_return(). |
517 |
|
system_terminate(Reason, _Parent, Debug, |
518 |
|
[Name, StateName, StateData, Mod, |
519 |
|
_Time, _Limits, Queue, _QueueLen]) -> |
520 |
7 |
terminate(Reason, Name, [], Mod, StateName, StateData, Debug, Queue). |
521 |
|
|
522 |
|
system_code_change([Name, StateName, StateData, Mod, Time, |
523 |
|
Limits, Queue, QueueLen], |
524 |
|
_Module, OldVsn, Extra) -> |
525 |
:-( |
case catch Mod:code_change(OldVsn, StateName, StateData, Extra) of |
526 |
|
{ok, NewStateName, NewStateData} -> |
527 |
:-( |
{ok, [Name, NewStateName, NewStateData, Mod, Time, |
528 |
|
Limits, Queue, QueueLen]}; |
529 |
:-( |
Else -> Else |
530 |
|
end. |
531 |
|
|
532 |
|
%%----------------------------------------------------------------- |
533 |
|
%% Format debug messages. Print them as the call-back module sees |
534 |
|
%% them, not as the real erlang messages. Use trace for that. |
535 |
|
%%----------------------------------------------------------------- |
536 |
|
print_event(Dev, {in, Msg}, {Name, StateName}) -> |
537 |
:-( |
case Msg of |
538 |
|
{'$gen_event', Event} -> |
539 |
:-( |
io:format(Dev, "*DBG* ~p got event ~p in state ~w~n", |
540 |
|
[Name, Event, StateName]); |
541 |
|
{'$gen_all_state_event', Event} -> |
542 |
:-( |
io:format(Dev, |
543 |
|
"*DBG* ~p got all_state_event ~p in state ~w~n", |
544 |
|
[Name, Event, StateName]); |
545 |
|
{timeout, Ref, {'$gen_timer', Message}} -> |
546 |
:-( |
io:format(Dev, |
547 |
|
"*DBG* ~p got timer ~p in state ~w~n", |
548 |
|
[Name, {timeout, Ref, Message}, StateName]); |
549 |
|
{timeout, _Ref, {'$gen_event', Event}} -> |
550 |
:-( |
io:format(Dev, |
551 |
|
"*DBG* ~p got timer ~p in state ~w~n", |
552 |
|
[Name, Event, StateName]); |
553 |
|
_ -> |
554 |
:-( |
io:format(Dev, "*DBG* ~p got ~p in state ~w~n", |
555 |
|
[Name, Msg, StateName]) |
556 |
|
end; |
557 |
|
print_event(Dev, {out, Msg, To, StateName}, Name) -> |
558 |
:-( |
io:format(Dev, "*DBG* ~p sent ~p to ~w~n" |
559 |
|
" and switched to state ~w~n", |
560 |
|
[Name, Msg, To, StateName]); |
561 |
|
print_event(Dev, return, {Name, StateName}) -> |
562 |
:-( |
io:format(Dev, "*DBG* ~p switched to state ~w~n", |
563 |
|
[Name, StateName]). |
564 |
|
|
565 |
|
relay_messages(MRef, TRef, Clone, Queue) -> |
566 |
:-( |
lists:foreach( |
567 |
:-( |
fun(Msg) -> Clone ! Msg end, |
568 |
|
queue:to_list(Queue)), |
569 |
:-( |
relay_messages(MRef, TRef, Clone). |
570 |
|
|
571 |
|
relay_messages(MRef, TRef, Clone) -> |
572 |
:-( |
receive |
573 |
|
{'DOWN', MRef, process, Clone, Reason} -> |
574 |
:-( |
Reason; |
575 |
|
{'EXIT', _Parent, _Reason} -> |
576 |
:-( |
{migrated, Clone}; |
577 |
|
{timeout, TRef, timeout} -> |
578 |
:-( |
{migrated, Clone}; |
579 |
|
Msg -> |
580 |
:-( |
Clone ! Msg, |
581 |
:-( |
relay_messages(MRef, TRef, Clone) |
582 |
|
end. |
583 |
|
|
584 |
|
handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, |
585 |
|
Limits, Queue, QueueLen) -> %No debug here |
586 |
42457 |
From = from(Msg), |
587 |
42457 |
case catch dispatch(Msg, Mod, StateName, StateData) of |
588 |
|
{next_state, NStateName, NStateData} -> |
589 |
1 |
loop(Parent, Name, NStateName, NStateData, |
590 |
|
Mod, infinity, [], Limits, Queue, QueueLen); |
591 |
|
{next_state, NStateName, NStateData, Time1} -> |
592 |
38514 |
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], |
593 |
|
Limits, Queue, QueueLen); |
594 |
|
{reply, Reply, NStateName, NStateData} when From =/= undefined -> |
595 |
17 |
reply(From, Reply), |
596 |
17 |
loop(Parent, Name, NStateName, NStateData, |
597 |
|
Mod, infinity, [], Limits, Queue, QueueLen); |
598 |
|
{reply, Reply, NStateName, NStateData, Time1} when From =/= undefined -> |
599 |
22 |
reply(From, Reply), |
600 |
22 |
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], |
601 |
|
Limits, Queue, QueueLen); |
602 |
|
{migrate, NStateData, {Node, M, F, A}, Time1} -> |
603 |
:-( |
Reason = case catch rpc:call(Node, M, F, A, 5000) of |
604 |
|
{badrpc, _} = Err -> |
605 |
:-( |
{migration_error, Err}; |
606 |
|
{'EXIT', _} = Err -> |
607 |
:-( |
{migration_error, Err}; |
608 |
|
{error, _} = Err -> |
609 |
:-( |
{migration_error, Err}; |
610 |
|
{ok, Clone} -> |
611 |
:-( |
process_flag(trap_exit, true), |
612 |
:-( |
MRef = erlang:monitor(process, Clone), |
613 |
:-( |
TRef = erlang:start_timer(Time1, self(), timeout), |
614 |
:-( |
relay_messages(MRef, TRef, Clone, Queue); |
615 |
|
Reply -> |
616 |
:-( |
{migration_error, {bad_reply, Reply}} |
617 |
|
end, |
618 |
:-( |
terminate(Reason, Name, Msg, Mod, StateName, NStateData, [], Queue); |
619 |
|
{stop, Reason, NStateData} -> |
620 |
3890 |
terminate(Reason, Name, Msg, Mod, StateName, NStateData, [], Queue); |
621 |
|
{stop, Reason, Reply, NStateData} when From =/= undefined -> |
622 |
:-( |
{'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, StateName, |
623 |
|
NStateData, [], Queue)), |
624 |
:-( |
reply(From, Reply), |
625 |
:-( |
exit(R); |
626 |
|
{'EXIT', What} -> |
627 |
13 |
terminate(What, Name, Msg, Mod, StateName, StateData, [], Queue); |
628 |
|
Reply -> |
629 |
:-( |
terminate({bad_return_value, Reply}, Name, Msg, Mod, |
630 |
|
StateName, StateData, [], Queue) |
631 |
|
end. |
632 |
|
|
633 |
|
handle_msg(Msg, Parent, Name, StateName, StateData, |
634 |
|
Mod, _Time, Debug, Limits, Queue, QueueLen) -> |
635 |
:-( |
From = from(Msg), |
636 |
:-( |
case catch dispatch(Msg, Mod, StateName, StateData) of |
637 |
|
{next_state, NStateName, NStateData} -> |
638 |
:-( |
Debug1 = sys:handle_debug(Debug, fun print_event/3, |
639 |
|
{Name, NStateName}, return), |
640 |
:-( |
loop(Parent, Name, NStateName, NStateData, |
641 |
|
Mod, infinity, Debug1, Limits, Queue, QueueLen); |
642 |
|
{next_state, NStateName, NStateData, Time1} -> |
643 |
:-( |
Debug1 = sys:handle_debug(Debug, fun print_event/3, |
644 |
|
{Name, NStateName}, return), |
645 |
:-( |
loop(Parent, Name, NStateName, NStateData, |
646 |
|
Mod, Time1, Debug1, Limits, Queue, QueueLen); |
647 |
|
{reply, Reply, NStateName, NStateData} when From =/= undefined -> |
648 |
:-( |
Debug1 = reply(Name, From, Reply, Debug, NStateName), |
649 |
:-( |
loop(Parent, Name, NStateName, NStateData, |
650 |
|
Mod, infinity, Debug1, Limits, Queue, QueueLen); |
651 |
|
{reply, Reply, NStateName, NStateData, Time1} when From =/= undefined -> |
652 |
:-( |
Debug1 = reply(Name, From, Reply, Debug, NStateName), |
653 |
:-( |
loop(Parent, Name, NStateName, NStateData, |
654 |
|
Mod, Time1, Debug1, Limits, Queue, QueueLen); |
655 |
|
{migrate, NStateData, {Node, M, F, A}, Time1} -> |
656 |
:-( |
Reason = case catch rpc:call(Node, M, F, A, Time1) of |
657 |
|
{badrpc, R} -> |
658 |
:-( |
{migration_error, R}; |
659 |
|
{'EXIT', R} -> |
660 |
:-( |
{migration_error, R}; |
661 |
|
{error, R} -> |
662 |
:-( |
{migration_error, R}; |
663 |
|
{ok, Clone} -> |
664 |
:-( |
process_flag(trap_exit, true), |
665 |
:-( |
MRef = erlang:monitor(process, Clone), |
666 |
:-( |
TRef = erlang:start_timer(Time1, self(), timeout), |
667 |
:-( |
relay_messages(MRef, TRef, Clone, Queue); |
668 |
|
Reply -> |
669 |
:-( |
{migration_error, {bad_reply, Reply}} |
670 |
|
end, |
671 |
:-( |
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug, Queue); |
672 |
|
{stop, Reason, NStateData} -> |
673 |
:-( |
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug, Queue); |
674 |
|
{stop, Reason, Reply, NStateData} when From =/= undefined -> |
675 |
:-( |
{'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, StateName, |
676 |
|
NStateData, Debug, Queue)), |
677 |
:-( |
reply(Name, From, Reply, Debug, StateName), |
678 |
:-( |
exit(R); |
679 |
|
{'EXIT', What} -> |
680 |
:-( |
terminate(What, Name, Msg, Mod, StateName, StateData, Debug, Queue); |
681 |
|
Reply -> |
682 |
:-( |
terminate({bad_return_value, Reply}, Name, Msg, Mod, |
683 |
|
StateName, StateData, Debug, Queue) |
684 |
|
end. |
685 |
|
|
686 |
|
dispatch({'$gen_event', Event}, Mod, StateName, StateData) -> |
687 |
27731 |
Mod:StateName(Event, StateData); |
688 |
|
dispatch({'$gen_all_state_event', Event}, Mod, StateName, StateData) -> |
689 |
467 |
Mod:handle_event(Event, StateName, StateData); |
690 |
|
dispatch({'$gen_sync_event', From, Event}, Mod, StateName, StateData) -> |
691 |
11 |
Mod:StateName(Event, From, StateData); |
692 |
|
dispatch({'$gen_sync_all_state_event', From, Event}, |
693 |
|
Mod, StateName, StateData) -> |
694 |
39 |
Mod:handle_sync_event(Event, From, StateName, StateData); |
695 |
|
dispatch({timeout, Ref, {'$gen_timer', Msg}}, Mod, StateName, StateData) -> |
696 |
:-( |
Mod:StateName({timeout, Ref, Msg}, StateData); |
697 |
|
dispatch({timeout, _Ref, {'$gen_event', Event}}, Mod, StateName, StateData) -> |
698 |
:-( |
Mod:StateName(Event, StateData); |
699 |
|
dispatch(Info, Mod, StateName, StateData) -> |
700 |
14209 |
Mod:handle_info(Info, StateName, StateData). |
701 |
|
|
702 |
11 |
from({'$gen_sync_event', From, _Event}) -> From; |
703 |
39 |
from({'$gen_sync_all_state_event', From, _Event}) -> From; |
704 |
42407 |
from(_) -> undefined. |
705 |
|
|
706 |
|
%% Send a reply to the client. |
707 |
|
reply({To, Tag}, Reply) -> |
708 |
50 |
catch To ! {Tag, Reply}. |
709 |
|
|
710 |
|
reply(Name, {To, Tag}, Reply, Debug, StateName) -> |
711 |
:-( |
reply({To, Tag}, Reply), |
712 |
:-( |
sys:handle_debug(Debug, fun print_event/3, Name, |
713 |
|
{out, Reply, To, StateName}). |
714 |
|
|
715 |
|
%%% --------------------------------------------------- |
716 |
|
%%% Terminate the server. |
717 |
|
%%% --------------------------------------------------- |
718 |
|
|
719 |
|
terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug, Queue) -> |
720 |
3910 |
List = queue:to_list(Queue), |
721 |
3910 |
case catch Mod:terminate(Reason, StateName, StateData, List) of |
722 |
|
{'EXIT', R} -> |
723 |
:-( |
error_info(Mod, R, Name, Msg, StateName, StateData, Debug), |
724 |
:-( |
exit(R); |
725 |
|
_ -> |
726 |
3910 |
case Reason of |
727 |
|
normal -> |
728 |
3899 |
exit(normal); |
729 |
|
shutdown -> |
730 |
:-( |
exit(shutdown); |
731 |
|
priority_shutdown -> |
732 |
|
%% Priority shutdown should be considered as |
733 |
|
%% shutdown by SASL |
734 |
:-( |
exit(shutdown); |
735 |
|
{handover_session, _From} -> |
736 |
11 |
exit(normal); |
737 |
|
{process_limit, _Limit} -> |
738 |
:-( |
exit(Reason); |
739 |
|
{migrated, _Clone} -> |
740 |
:-( |
exit(normal); |
741 |
|
_ -> |
742 |
:-( |
error_info(Mod, Reason, Name, Msg, StateName, StateData, Debug), |
743 |
:-( |
exit(Reason) |
744 |
|
end |
745 |
|
end. |
746 |
|
|
747 |
|
error_info(Mod, Reason, Name, Msg, StateName, StateData, Debug) -> |
748 |
:-( |
Reason1 = |
749 |
|
case Reason of |
750 |
|
{undef, [{M, F, A}|MFAs]} -> |
751 |
:-( |
case code:is_loaded(M) of |
752 |
|
false -> |
753 |
:-( |
{'module could not be loaded', [{M, F, A}|MFAs]}; |
754 |
|
_ -> |
755 |
:-( |
case erlang:function_exported(M, F, length(A)) of |
756 |
|
true -> |
757 |
:-( |
Reason; |
758 |
|
false -> |
759 |
:-( |
{'function not exported', [{M, F, A}|MFAs]} |
760 |
|
end |
761 |
|
end; |
762 |
|
_ -> |
763 |
:-( |
Reason |
764 |
|
end, |
765 |
:-( |
StateToPrint = case erlang:function_exported(Mod, print_state, 1) of |
766 |
:-( |
true -> (catch Mod:print_state(StateData)); |
767 |
:-( |
false -> StateData |
768 |
|
end, |
769 |
:-( |
Str = "** State machine ~p terminating \n" ++ |
770 |
|
get_msg_str(Msg) ++ |
771 |
|
"** When State == ~p~n" |
772 |
|
"** Data == ~p~n" |
773 |
|
"** Reason for termination = ~n** ~p~n", |
774 |
:-( |
format(Str, [Name, get_msg(Msg), StateName, StateToPrint, Reason1]), |
775 |
:-( |
sys:print_log(Debug), |
776 |
:-( |
ok. |
777 |
|
|
778 |
|
get_msg_str({'$gen_event', _Event}) -> |
779 |
:-( |
"** Last event in was ~p~n"; |
780 |
|
get_msg_str({'$gen_sync_event', _Event}) -> |
781 |
:-( |
"** Last sync event in was ~p~n"; |
782 |
|
get_msg_str({'$gen_all_state_event', _Event}) -> |
783 |
:-( |
"** Last event in was ~p (for all states)~n"; |
784 |
|
get_msg_str({'$gen_sync_all_state_event', _Event}) -> |
785 |
:-( |
"** Last sync event in was ~p (for all states)~n"; |
786 |
|
get_msg_str({timeout, _Ref, {'$gen_timer', _Msg}}) -> |
787 |
:-( |
"** Last timer event in was ~p~n"; |
788 |
|
get_msg_str({timeout, _Ref, {'$gen_event', _Msg}}) -> |
789 |
:-( |
"** Last timer event in was ~p~n"; |
790 |
|
get_msg_str(_Msg) -> |
791 |
:-( |
"** Last message in was ~p~n". |
792 |
|
|
793 |
:-( |
get_msg({'$gen_event', Event}) -> Event; |
794 |
:-( |
get_msg({'$gen_sync_event', Event}) -> Event; |
795 |
:-( |
get_msg({'$gen_all_state_event', Event}) -> Event; |
796 |
:-( |
get_msg({'$gen_sync_all_state_event', Event}) -> Event; |
797 |
:-( |
get_msg({timeout, Ref, {'$gen_timer', Msg}}) -> {timeout, Ref, Msg}; |
798 |
:-( |
get_msg({timeout, _Ref, {'$gen_event', Event}}) -> Event; |
799 |
:-( |
get_msg(Msg) -> Msg. |
800 |
|
|
801 |
|
%%----------------------------------------------------------------- |
802 |
|
%% Status information |
803 |
|
%%----------------------------------------------------------------- |
804 |
|
format_status(Opt, StatusData) -> |
805 |
17 |
[PDict, SysState, Parent, Debug, [Name, StateName, StateData, Mod, _Time | _]] = |
806 |
|
StatusData, |
807 |
17 |
NameTag = if is_pid(Name) -> |
808 |
17 |
pid_to_list(Name); |
809 |
|
is_atom(Name) -> |
810 |
:-( |
Name |
811 |
|
end, |
812 |
17 |
Header = lists:concat(["Status for state machine ", NameTag]), |
813 |
17 |
Log = sys:get_log(Debug), |
814 |
17 |
Specfic = |
815 |
|
case erlang:function_exported(Mod, format_status, 2) of |
816 |
|
true -> |
817 |
:-( |
case catch Mod:format_status(Opt, [PDict, StateData]) of |
818 |
:-( |
{'EXIT', _} -> [{data, [{"StateData", StateData}]}]; |
819 |
:-( |
Else -> Else |
820 |
|
end; |
821 |
|
_ -> |
822 |
17 |
[{data, [{"StateData", StateData}]}] |
823 |
|
end, |
824 |
17 |
[{header, Header}, |
825 |
|
{data, [{"Status", SysState}, |
826 |
|
{"Parent", Parent}, |
827 |
|
{"Logged events", Log}, |
828 |
|
{"StateName", StateName}]} | |
829 |
|
Specfic]. |
830 |
|
|
831 |
|
%%----------------------------------------------------------------- |
832 |
|
%% Resources limit management |
833 |
|
%%----------------------------------------------------------------- |
834 |
|
%% Extract know limit options |
835 |
|
limit_options(Options) -> |
836 |
4360 |
limit_options(Options, #limits{}). |
837 |
|
limit_options([], Limits) -> |
838 |
4360 |
Limits; |
839 |
|
%% Maximum number of messages allowed in the process message queue |
840 |
|
limit_options([{max_queue, N}|Options], Limits) |
841 |
|
when is_integer(N) -> |
842 |
4360 |
NewLimits = Limits#limits{max_queue=N}, |
843 |
4360 |
limit_options(Options, NewLimits); |
844 |
|
limit_options([_|Options], Limits) -> |
845 |
:-( |
limit_options(Options, Limits). |
846 |
|
|
847 |
|
%% Throw max_queue if we have reach the max queue size |
848 |
|
%% Returns ok otherwise |
849 |
|
message_queue_len(#limits{max_queue = undefined}, _QueueLen) -> |
850 |
:-( |
ok; |
851 |
|
message_queue_len(#limits{max_queue = MaxQueue}, QueueLen) -> |
852 |
30919 |
Pid = self(), |
853 |
30919 |
case process_info(Pid, message_queue_len) of |
854 |
|
{message_queue_len, N} when N + QueueLen > MaxQueue -> |
855 |
:-( |
throw({process_limit, {max_queue, N + QueueLen}}); |
856 |
|
_ -> |
857 |
30919 |
ok |
858 |
|
end. |