1 |
|
-module(mod_stream_management_mnesia). |
2 |
|
-behaviour(mod_stream_management_backend). |
3 |
|
-behaviour(gen_server). |
4 |
|
|
5 |
|
-include("mongoose.hrl"). |
6 |
|
-include("jlib.hrl"). |
7 |
|
-include_lib("stdlib/include/ms_transform.hrl"). |
8 |
|
|
9 |
|
-export([init/2, |
10 |
|
register_smid/3, |
11 |
|
unregister_smid/2, |
12 |
|
get_sid/2]). |
13 |
|
|
14 |
|
-export([read_stale_h/2, |
15 |
|
write_stale_h/3, |
16 |
|
delete_stale_h/2]). |
17 |
|
|
18 |
|
%% Internal exports |
19 |
|
-export([start_link/1]). |
20 |
|
|
21 |
|
%% gen_server callbacks |
22 |
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). |
23 |
|
|
24 |
|
-ignore_xref([start_link/1]). |
25 |
|
|
26 |
|
-record(smgc_state, |
27 |
|
{gc_repeat_after :: non_neg_integer(), |
28 |
|
gc_geriatric :: non_neg_integer() }). |
29 |
|
|
30 |
|
-record(stream_mgmt_stale_h, |
31 |
|
{smid :: mod_stream_management:smid(), |
32 |
|
h :: non_neg_integer(), |
33 |
|
stamp :: non_neg_integer() }). |
34 |
|
|
35 |
|
-record(sm_session, |
36 |
|
{smid :: mod_stream_management:smid(), |
37 |
|
sid :: ejabberd_sm:sid() }). |
38 |
|
|
39 |
|
init(_HostType, #{stale_h := StaleOpts}) -> |
40 |
383 |
mnesia:create_table(sm_session, [{ram_copies, [node()]}, |
41 |
|
{attributes, record_info(fields, sm_session)}]), |
42 |
383 |
mnesia:add_table_index(sm_session, sid), |
43 |
383 |
mnesia:add_table_copy(sm_session, node(), ram_copies), |
44 |
383 |
maybe_init_stale_h(StaleOpts), |
45 |
383 |
ok. |
46 |
|
|
47 |
|
maybe_init_stale_h(StaleOpts = #{enabled := true}) -> |
48 |
3 |
?LOG_INFO(#{what => stream_mgmt_stale_h_start}), |
49 |
3 |
mnesia:create_table(stream_mgmt_stale_h, |
50 |
|
[{ram_copies, [node()]}, |
51 |
|
{attributes, record_info(fields, stream_mgmt_stale_h)}]), |
52 |
3 |
mnesia:add_table_copy(stream_mgmt_stale_h, node(), ram_copies), |
53 |
3 |
start_cleaner(StaleOpts); |
54 |
380 |
maybe_init_stale_h(_) -> ok. |
55 |
|
|
56 |
|
-spec register_smid(HostType, SMID, SID) -> |
57 |
|
ok | {error, term()} when |
58 |
|
HostType :: mongooseim:host_type(), |
59 |
|
SMID :: mod_stream_management:smid(), |
60 |
|
SID :: ejabberd_sm:sid(). |
61 |
|
register_smid(_HostType, SMID, SID) -> |
62 |
44 |
try |
63 |
44 |
mnesia:sync_dirty(fun mnesia:write/1, |
64 |
|
[#sm_session{smid = SMID, sid = SID}]), |
65 |
44 |
ok |
66 |
|
catch exit:Reason -> |
67 |
:-( |
{error, Reason} |
68 |
|
end. |
69 |
|
|
70 |
|
-spec unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) -> |
71 |
|
{ok, SMID :: mod_stream_management:smid()} | {error, smid_not_found}. |
72 |
|
unregister_smid(_HostType, SID) -> |
73 |
2704 |
case mnesia:dirty_index_read(sm_session, SID, #sm_session.sid) of |
74 |
|
[] -> |
75 |
2661 |
{error, smid_not_found}; |
76 |
|
[#sm_session{smid = SMID}] -> |
77 |
43 |
mnesia:dirty_delete(sm_session, SMID), |
78 |
43 |
{ok, SMID} |
79 |
|
end. |
80 |
|
|
81 |
|
-spec get_sid(mongooseim:host_type(), mod_stream_management:smid()) -> |
82 |
|
{sid, ejabberd_sm:sid()} | {error, smid_not_found}. |
83 |
|
get_sid(_HostType, SMID) -> |
84 |
18 |
case mnesia:dirty_read(sm_session, SMID) of |
85 |
12 |
[#sm_session{sid = SID}] -> {sid, SID}; |
86 |
6 |
[] -> {error, smid_not_found} |
87 |
|
end. |
88 |
|
|
89 |
|
%% stale_h functions |
90 |
|
|
91 |
|
-spec read_stale_h(HostType, SMID) -> |
92 |
|
{stale_h, non_neg_integer()} | {error, smid_not_found} when |
93 |
|
HostType :: mongooseim:host_type(), |
94 |
|
SMID :: mod_stream_management:smid(). |
95 |
|
read_stale_h(_HostType, SMID) -> |
96 |
25 |
try |
97 |
25 |
case mnesia:dirty_read(stream_mgmt_stale_h, SMID) of |
98 |
24 |
[#stream_mgmt_stale_h{h = H}] -> {stale_h, H}; |
99 |
1 |
[] -> {error, smid_not_found} |
100 |
|
end |
101 |
|
catch exit:_Reason -> |
102 |
:-( |
{error, smid_not_found} |
103 |
|
end. |
104 |
|
|
105 |
|
-spec write_stale_h(HostType, SMID, H) -> ok | {error, any()} when |
106 |
|
HostType :: mongooseim:host_type(), |
107 |
|
SMID :: mod_stream_management:smid(), |
108 |
|
H :: non_neg_integer(). |
109 |
|
write_stale_h(_HostType, SMID, H) -> |
110 |
8 |
try |
111 |
8 |
Stamp = erlang:monotonic_time(second), |
112 |
8 |
mnesia:dirty_write(#stream_mgmt_stale_h{smid = SMID, h = H, stamp = Stamp}) |
113 |
|
catch exit:Reason -> |
114 |
:-( |
{error, Reason} |
115 |
|
end. |
116 |
|
|
117 |
|
-spec delete_stale_h(HostType, SMID) -> ok | {error, any()} when |
118 |
|
HostType :: mongooseim:host_type(), |
119 |
|
SMID :: mod_stream_management:smid(). |
120 |
|
delete_stale_h(_HostType, SMID) -> |
121 |
:-( |
try |
122 |
:-( |
mnesia:dirty_delete(stream_mgmt_stale_h, SMID) |
123 |
|
catch exit:Reason -> |
124 |
:-( |
{error, Reason} |
125 |
|
end. |
126 |
|
|
127 |
|
%% stale_h cleaning logic |
128 |
|
|
129 |
|
start_cleaner(Opts) -> |
130 |
3 |
MFA = {?MODULE, start_link, [Opts]}, |
131 |
3 |
ChildSpec = {stream_management_stale_h, MFA, permanent, 5000, worker, [?MODULE]}, |
132 |
|
%% TODO cleaner should be a service |
133 |
3 |
ejabberd_sup:start_child(ChildSpec). |
134 |
|
|
135 |
|
start_link(Opts) -> |
136 |
3 |
gen_server:start_link({local, stream_management_stale_h}, ?MODULE, Opts, []). |
137 |
|
|
138 |
|
init(#{repeat_after := RepeatAfter, geriatric := GeriatricAge}) -> |
139 |
3 |
State = #smgc_state{gc_repeat_after = RepeatAfter, |
140 |
|
gc_geriatric = GeriatricAge}, |
141 |
3 |
schedule_check(State), |
142 |
3 |
{ok, State}. |
143 |
|
|
144 |
|
handle_call(Msg, From, State) -> |
145 |
:-( |
?UNEXPECTED_CALL(Msg, From), |
146 |
:-( |
{reply, ok, State}. |
147 |
|
|
148 |
|
handle_cast(Msg, State) -> |
149 |
:-( |
?UNEXPECTED_CAST(Msg), |
150 |
:-( |
{noreply, State}. |
151 |
|
|
152 |
|
handle_info(check, #smgc_state{gc_geriatric = GeriatricAge} = State) -> |
153 |
2 |
clear_table(GeriatricAge), |
154 |
2 |
schedule_check(State), |
155 |
2 |
{noreply, State}; |
156 |
|
handle_info(Info, State) -> |
157 |
:-( |
?UNEXPECTED_INFO(Info), |
158 |
:-( |
{noreply, State}. |
159 |
|
|
160 |
|
schedule_check(#smgc_state{gc_repeat_after = RepeatAfter}) -> |
161 |
5 |
erlang:send_after(timer:seconds(RepeatAfter), self(), check). |
162 |
|
|
163 |
|
clear_table(GeriatricAge) -> |
164 |
2 |
TimeToDie = erlang:monotonic_time(second) - GeriatricAge, |
165 |
2 |
MS = ets:fun2ms(fun(#stream_mgmt_stale_h{stamp = S}) when S < TimeToDie -> true end), |
166 |
2 |
ets:select_delete(stream_mgmt_stale_h, MS). |