./ct_report/coverage/mod_stream_management_mnesia.COVER.html

1 -module(mod_stream_management_mnesia).
2 -behaviour(mod_stream_management_backend).
3 -behaviour(gen_server).
4
5 -include("mongoose.hrl").
6 -include("jlib.hrl").
7 -include_lib("stdlib/include/ms_transform.hrl").
8
9 -export([init/2,
10 register_smid/3,
11 unregister_smid/2,
12 get_sid/2]).
13
14 -export([read_stale_h/2,
15 write_stale_h/3,
16 delete_stale_h/2]).
17
18 %% Internal exports
19 -export([start_link/1]).
20
21 %% gen_server callbacks
22 -export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
23
24 -ignore_xref([start_link/1]).
25
26 -record(smgc_state,
27 {gc_repeat_after :: non_neg_integer(),
28 gc_geriatric :: non_neg_integer() }).
29
30 -record(stream_mgmt_stale_h,
31 {smid :: mod_stream_management:smid(),
32 h :: non_neg_integer(),
33 stamp :: non_neg_integer() }).
34
35 -record(sm_session,
36 {smid :: mod_stream_management:smid(),
37 sid :: ejabberd_sm:sid() }).
38
39 init(_HostType, #{stale_h := StaleOpts}) ->
40 383 mnesia:create_table(sm_session, [{ram_copies, [node()]},
41 {attributes, record_info(fields, sm_session)}]),
42 383 mnesia:add_table_index(sm_session, sid),
43 383 mnesia:add_table_copy(sm_session, node(), ram_copies),
44 383 maybe_init_stale_h(StaleOpts),
45 383 ok.
46
47 maybe_init_stale_h(StaleOpts = #{enabled := true}) ->
48 3 ?LOG_INFO(#{what => stream_mgmt_stale_h_start}),
49 3 mnesia:create_table(stream_mgmt_stale_h,
50 [{ram_copies, [node()]},
51 {attributes, record_info(fields, stream_mgmt_stale_h)}]),
52 3 mnesia:add_table_copy(stream_mgmt_stale_h, node(), ram_copies),
53 3 start_cleaner(StaleOpts);
54 380 maybe_init_stale_h(_) -> ok.
55
56 -spec register_smid(HostType, SMID, SID) ->
57 ok | {error, term()} when
58 HostType :: mongooseim:host_type(),
59 SMID :: mod_stream_management:smid(),
60 SID :: ejabberd_sm:sid().
61 register_smid(_HostType, SMID, SID) ->
62 44 try
63 44 mnesia:sync_dirty(fun mnesia:write/1,
64 [#sm_session{smid = SMID, sid = SID}]),
65 44 ok
66 catch exit:Reason ->
67
:-(
{error, Reason}
68 end.
69
70 -spec unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) ->
71 {ok, SMID :: mod_stream_management:smid()} | {error, smid_not_found}.
72 unregister_smid(_HostType, SID) ->
73 3026 case mnesia:dirty_index_read(sm_session, SID, #sm_session.sid) of
74 [] ->
75 2983 {error, smid_not_found};
76 [#sm_session{smid = SMID}] ->
77 43 mnesia:dirty_delete(sm_session, SMID),
78 43 {ok, SMID}
79 end.
80
81 -spec get_sid(mongooseim:host_type(), mod_stream_management:smid()) ->
82 {sid, ejabberd_sm:sid()} | {error, smid_not_found}.
83 get_sid(_HostType, SMID) ->
84 18 case mnesia:dirty_read(sm_session, SMID) of
85 12 [#sm_session{sid = SID}] -> {sid, SID};
86 6 [] -> {error, smid_not_found}
87 end.
88
89 %% stale_h functions
90
91 -spec read_stale_h(HostType, SMID) ->
92 {stale_h, non_neg_integer()} | {error, smid_not_found} when
93 HostType :: mongooseim:host_type(),
94 SMID :: mod_stream_management:smid().
95 read_stale_h(_HostType, SMID) ->
96 25 try
97 25 case mnesia:dirty_read(stream_mgmt_stale_h, SMID) of
98 24 [#stream_mgmt_stale_h{h = H}] -> {stale_h, H};
99 1 [] -> {error, smid_not_found}
100 end
101 catch exit:_Reason ->
102
:-(
{error, smid_not_found}
103 end.
104
105 -spec write_stale_h(HostType, SMID, H) -> ok | {error, any()} when
106 HostType :: mongooseim:host_type(),
107 SMID :: mod_stream_management:smid(),
108 H :: non_neg_integer().
109 write_stale_h(_HostType, SMID, H) ->
110 8 try
111 8 Stamp = erlang:monotonic_time(second),
112 8 mnesia:dirty_write(#stream_mgmt_stale_h{smid = SMID, h = H, stamp = Stamp})
113 catch exit:Reason ->
114
:-(
{error, Reason}
115 end.
116
117 -spec delete_stale_h(HostType, SMID) -> ok | {error, any()} when
118 HostType :: mongooseim:host_type(),
119 SMID :: mod_stream_management:smid().
120 delete_stale_h(_HostType, SMID) ->
121
:-(
try
122
:-(
mnesia:dirty_delete(stream_mgmt_stale_h, SMID)
123 catch exit:Reason ->
124
:-(
{error, Reason}
125 end.
126
127 %% stale_h cleaning logic
128
129 start_cleaner(Opts) ->
130 3 MFA = {?MODULE, start_link, [Opts]},
131 3 ChildSpec = {stream_management_stale_h, MFA, permanent, 5000, worker, [?MODULE]},
132 %% TODO cleaner should be a service
133 3 ejabberd_sup:start_child(ChildSpec).
134
135 start_link(Opts) ->
136 3 gen_server:start_link({local, stream_management_stale_h}, ?MODULE, Opts, []).
137
138 init(#{repeat_after := RepeatAfter, geriatric := GeriatricAge}) ->
139 3 State = #smgc_state{gc_repeat_after = RepeatAfter,
140 gc_geriatric = GeriatricAge},
141 3 schedule_check(State),
142 3 {ok, State}.
143
144 handle_call(Msg, From, State) ->
145
:-(
?UNEXPECTED_CALL(Msg, From),
146
:-(
{reply, ok, State}.
147
148 handle_cast(Msg, State) ->
149
:-(
?UNEXPECTED_CAST(Msg),
150
:-(
{noreply, State}.
151
152 handle_info(check, #smgc_state{gc_geriatric = GeriatricAge} = State) ->
153 2 clear_table(GeriatricAge),
154 2 schedule_check(State),
155 2 {noreply, State};
156 handle_info(Info, State) ->
157
:-(
?UNEXPECTED_INFO(Info),
158
:-(
{noreply, State}.
159
160 schedule_check(#smgc_state{gc_repeat_after = RepeatAfter}) ->
161 5 erlang:send_after(timer:seconds(RepeatAfter), self(), check).
162
163 clear_table(GeriatricAge) ->
164 2 TimeToDie = erlang:monotonic_time(second) - GeriatricAge,
165 2 MS = ets:fun2ms(fun(#stream_mgmt_stale_h{stamp = S}) when S < TimeToDie -> true end),
166 2 ets:select_delete(stream_mgmt_stale_h, MS).
Line Hits Source