1 |
|
%% @doc Disallow MongooseIM node connections until cleaning is done |
2 |
|
%% |
3 |
|
%% This module prevents a node from reconnecting, until cleaning activity is |
4 |
|
%% finished. It prevents race conditions. |
5 |
|
%% |
6 |
|
%% This module assume all nodes share the same cookie. |
7 |
|
-module(mongoose_dist_blocker). |
8 |
|
-behaviour(gen_server). |
9 |
|
|
10 |
|
-include("mongoose_logger.hrl"). |
11 |
|
|
12 |
|
%% API |
13 |
|
-export([start_link/0, |
14 |
|
add_cleaner/1, |
15 |
|
cleaning_done/2]). |
16 |
|
|
17 |
|
%% gen_server callbacks |
18 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, |
19 |
|
terminate/2, code_change/3]). |
20 |
|
|
21 |
|
-ignore_xref([start_link/0]). |
22 |
|
|
23 |
|
start_link() -> |
24 |
4 |
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). |
25 |
|
|
26 |
|
%% Register Pid as a cleaner. |
27 |
|
add_cleaner(CleanerPid) -> |
28 |
4 |
gen_server:call(?MODULE, {add_cleaner, CleanerPid}). |
29 |
|
|
30 |
|
%% Cleaner calls must call this function. |
31 |
|
cleaning_done(CleanerPid, Node) -> |
32 |
:-( |
gen_server:call(?MODULE, {cleaning_done, CleanerPid, Node}). |
33 |
|
|
34 |
|
%%-------------------------------------------------------------------- |
35 |
|
%% gen_server callbacks |
36 |
|
%%-------------------------------------------------------------------- |
37 |
|
init([]) -> |
38 |
4 |
net_kernel:monitor_nodes(true), |
39 |
4 |
State = #{cleaners => [], waiting => []}, |
40 |
4 |
State2 = lists:foldl(fun handle_nodeup/2, State, nodes()), |
41 |
4 |
{ok, State2}. |
42 |
|
|
43 |
|
handle_call({add_cleaner, CleanerPid}, _From, State) -> |
44 |
4 |
{reply, ok, handle_add_cleaner(CleanerPid, State)}; |
45 |
|
handle_call({cleaning_done, CleanerPid, Node}, _From, State) -> |
46 |
:-( |
{reply, ok, maybe_unblock(State, handle_cleaning_done(CleanerPid, Node, State))}; |
47 |
|
handle_call(Request, From, State) -> |
48 |
:-( |
?UNEXPECTED_CALL(Request, From), |
49 |
:-( |
{reply, ok, State}. |
50 |
|
|
51 |
|
handle_cast(Msg, State) -> |
52 |
:-( |
?UNEXPECTED_CAST(Msg), |
53 |
:-( |
{noreply, State}. |
54 |
|
|
55 |
|
handle_info({nodeup, Node}, State) -> |
56 |
2 |
{noreply, handle_nodeup(Node, State)}; |
57 |
|
handle_info({nodedown, Node}, State) -> |
58 |
:-( |
{noreply, handle_nodedown(Node, State)}; |
59 |
|
handle_info({'DOWN', _Ref, process, Pid, _Info}, State) -> |
60 |
4 |
{noreply, maybe_unblock(State, handle_cleaner_down(Pid, State))}; |
61 |
|
handle_info(Info, State) -> |
62 |
:-( |
?UNEXPECTED_INFO(Info), |
63 |
:-( |
{noreply, State}. |
64 |
|
|
65 |
|
terminate(_Reason, _State) -> |
66 |
:-( |
ok. |
67 |
|
|
68 |
|
code_change(_OldVsn, State, _Extra) -> |
69 |
:-( |
{ok, State}. |
70 |
|
|
71 |
|
%%-------------------------------------------------------------------- |
72 |
|
%% internal functions |
73 |
|
%%-------------------------------------------------------------------- |
74 |
|
|
75 |
|
handle_nodeup(Node, State) -> |
76 |
|
%% We change the cookie as soon as the node is connected. |
77 |
|
%% Alternative is to do it on nodedown, but because nodedown-s are async, |
78 |
|
%% we would have a high chance of race conditions (so, node could reconnect |
79 |
|
%% before we set cookie). |
80 |
2 |
erlang:set_cookie(Node, blocking_cookie()), |
81 |
2 |
State. |
82 |
|
|
83 |
|
%% Make cookie, that would prevent node from connecting |
84 |
|
blocking_cookie() -> |
85 |
2 |
list_to_atom(atom_to_list(erlang:get_cookie()) ++ "_blocked_by_" ++ atom_to_list(node())). |
86 |
|
|
87 |
|
%% Allow the node to connect to us again |
88 |
|
unblock_node(Node) -> |
89 |
:-( |
erlang:set_cookie(Node, erlang:get_cookie()). |
90 |
|
|
91 |
|
handle_nodedown(Node, State = #{cleaners := []}) -> |
92 |
|
%% Skip waiting when no cleaners |
93 |
:-( |
unblock_node(Node), |
94 |
:-( |
State; |
95 |
|
handle_nodedown(Node, State = #{cleaners := Cleaners, waiting := Waiting}) -> |
96 |
:-( |
New = [{Node, CleanerPid} || CleanerPid <- Cleaners], |
97 |
:-( |
State#{waiting := lists:usort(New ++ Waiting)}. |
98 |
|
|
99 |
|
handle_add_cleaner(CleanerPid, State = #{cleaners := Cleaners}) -> |
100 |
4 |
erlang:monitor(process, CleanerPid), |
101 |
4 |
State#{cleaners := lists:usort([CleanerPid | Cleaners])}. |
102 |
|
|
103 |
|
handle_cleaning_done(CleanerPid, Node, State = #{waiting := Waiting}) -> |
104 |
:-( |
State#{waiting := lists:delete({Node, CleanerPid}, Waiting)}. |
105 |
|
|
106 |
|
handle_cleaner_down(CleanerPid, State = #{cleaners := Cleaners, waiting := Waiting}) -> |
107 |
4 |
State#{cleaners := lists:delete(CleanerPid, Cleaners), |
108 |
:-( |
waiting := [X || {_Node, CleanerPid2} = X <- Waiting, CleanerPid =/= CleanerPid2]}. |
109 |
|
|
110 |
|
%% Unblock nodes when the last cleaner confirms the cleaning is done. |
111 |
|
%% Call this function each time you remove entries from the waiting list. |
112 |
|
maybe_unblock(_OldState = #{waiting := OldWaiting}, NewState = #{waiting := NewWaiting}) -> |
113 |
4 |
OldNodes = cast_waiting_to_nodes(OldWaiting), |
114 |
4 |
NewNodes = cast_waiting_to_nodes(NewWaiting), |
115 |
4 |
CleanedNodes = OldNodes -- NewNodes, |
116 |
4 |
[unblock_node(Node) || Node <- CleanedNodes], |
117 |
4 |
NewState. |
118 |
|
|
119 |
|
cast_waiting_to_nodes(Waiting) -> |
120 |
8 |
lists:usort([Node || {Node, _CleanerPid} <- Waiting]). |