1 |
|
-module(mod_stream_management). |
2 |
|
-xep([{xep, 198}, {version, "1.6.1"}]). |
3 |
|
-behaviour(gen_mod). |
4 |
|
-behaviour(gen_statem). |
5 |
|
-behaviour(mongoose_module_metrics). |
6 |
|
|
7 |
|
%% `gen_mod' callbacks |
8 |
|
-export([start/2, |
9 |
|
stop/1, |
10 |
|
hooks/1, |
11 |
|
config_spec/0, |
12 |
|
supported_features/0, |
13 |
|
process_buffer_and_ack/1]). |
14 |
|
|
15 |
|
%% hooks handlers |
16 |
|
-export([c2s_stream_features/3, |
17 |
|
session_cleanup/3, |
18 |
|
user_send_packet/3, |
19 |
|
user_receive_packet/3, |
20 |
|
xmpp_presend_element/3, |
21 |
|
user_send_xmlel/3, |
22 |
|
foreign_event/3, |
23 |
|
handle_user_stopping/3, |
24 |
|
user_terminate/3, |
25 |
|
reroute_unacked_messages/3 |
26 |
|
]). |
27 |
|
|
28 |
|
%% gen_statem callbacks |
29 |
|
-export([callback_mode/0, init/1, handle_event/4, terminate/3]). |
30 |
|
|
31 |
|
%% helpers |
32 |
|
-export([handle_resume/3, if_not_already_enabled_create_sm_state/1]). |
33 |
|
-export([stream_error/2, register_smid_return_enabled_stanza/1]). |
34 |
|
|
35 |
|
%% API for inspection and tests |
36 |
|
-export([get_sid/2, get_stale_h/2, get_session_from_smid/2, |
37 |
|
register_smid/3, register_stale_smid_h/3, remove_stale_smid_h/2]). |
38 |
|
-ignore_xref([get_sid/2, get_stale_h/2, get_session_from_smid/2, |
39 |
|
register_smid/3, register_stale_smid_h/3, remove_stale_smid_h/2]). |
40 |
|
|
41 |
|
-include("mongoose.hrl"). |
42 |
|
-include("jlib.hrl"). |
43 |
|
-include("mongoose_config_spec.hrl"). |
44 |
|
-define(STREAM_MGMT_H_MAX, (1 bsl 32 - 1)). |
45 |
|
-define(CONSTRAINT_CHECK_TIMEOUT, 5000). %% 5 seconds |
46 |
|
-define(IS_STREAM_MGMT_STOP(R), R =:= {shutdown, ?MODULE}; R =:= {shutdown, resumed}). |
47 |
|
-define(IS_ALLOWED_STATE(S), S =:= wait_for_session_establishment; S =:= session_established). |
48 |
|
|
49 |
|
-record(sm_state, { |
50 |
|
buffer = [] :: [mongoose_acc:t()], |
51 |
|
buffer_size = 0 :: non_neg_integer(), |
52 |
|
counter_in = 0 :: short(), |
53 |
|
counter_out = 0 :: short(), |
54 |
|
buffer_max = 100 :: buffer_max(), |
55 |
|
ack_freq = 1 :: ack_freq(), |
56 |
|
peer :: undefined | {gen_statem, gen_statem:from()} | {sid, ejabberd_sm:sid()} |
57 |
|
}). |
58 |
|
|
59 |
|
-type sm_state() :: #sm_state{}. |
60 |
|
-type maybe_sm_state() :: {error, not_found} | #sm_state{}. |
61 |
|
-type c2s_state() :: mongoose_c2s:state(resume_session). |
62 |
|
|
63 |
|
-type buffer_max() :: pos_integer() | infinity | no_buffer. |
64 |
|
-type ack_freq() :: pos_integer() | never. |
65 |
|
|
66 |
|
-type resume_return() :: {ok, #{resumed := exml:element(), |
67 |
|
forward := [exml:element()], |
68 |
|
c2s_state := mongoose_c2s:state(), |
69 |
|
c2s_data := mongoose_c2s:data()}} |
70 |
|
| {stream_mgmt_error, exml:element()} |
71 |
|
| {error, exml:element()} |
72 |
|
| {error, exml:element(), term()}. |
73 |
|
|
74 |
|
%% Type base64:ascii_binary() is not exported |
75 |
|
-type smid() :: binary(). |
76 |
|
-type short() :: 0..?STREAM_MGMT_H_MAX. |
77 |
|
|
78 |
|
-type maybe_smid() :: {sid, ejabberd_sm:sid()} |
79 |
|
| {stale_h, non_neg_integer()} |
80 |
|
| {error, smid_not_found}. |
81 |
|
|
82 |
|
-export_type([smid/0, short/0, sm_state/0, c2s_state/0]). |
83 |
|
|
84 |
|
%% |
85 |
|
%% `gen_mod' callbacks |
86 |
|
%% |
87 |
|
|
88 |
|
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok. |
89 |
|
start(HostType, Opts) -> |
90 |
487 |
mod_stream_management_backend:init(HostType, Opts), |
91 |
487 |
?LOG_INFO(#{what => stream_management_starting}). |
92 |
|
|
93 |
|
-spec stop(mongooseim:host_type()) -> ok. |
94 |
|
stop(HostType) -> |
95 |
487 |
?LOG_INFO(#{what => stream_management_stopping}), |
96 |
487 |
mod_stream_management_backend:stop(HostType), |
97 |
487 |
ok. |
98 |
|
|
99 |
|
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list(). |
100 |
|
hooks(HostType) -> |
101 |
|
[{c2s_stream_features, HostType, fun ?MODULE:c2s_stream_features/3, #{}, 50}, |
102 |
|
{session_cleanup, HostType, fun ?MODULE:session_cleanup/3, #{}, 50}] |
103 |
974 |
++ mod_stream_management_sasl2:hooks(HostType) ++ c2s_hooks(HostType). |
104 |
|
|
105 |
|
-spec c2s_hooks(mongooseim:host_type()) -> gen_hook:hook_list(mongoose_c2s_hooks:fn()). |
106 |
|
c2s_hooks(HostType) -> |
107 |
974 |
[ |
108 |
|
{user_send_packet, HostType, fun ?MODULE:user_send_packet/3, #{}, 20}, |
109 |
|
{user_receive_packet, HostType, fun ?MODULE:user_receive_packet/3, #{}, 10}, |
110 |
|
{xmpp_presend_element, HostType, fun ?MODULE:xmpp_presend_element/3, #{}, 50}, |
111 |
|
{user_send_xmlel, HostType, fun ?MODULE:user_send_xmlel/3, #{}, 50}, |
112 |
|
{foreign_event, HostType, fun ?MODULE:foreign_event/3, #{}, 50}, |
113 |
|
{user_stop_request, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 100}, |
114 |
|
{user_socket_closed, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 100}, |
115 |
|
{user_socket_error, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 100}, |
116 |
|
{user_terminate, HostType, fun ?MODULE:user_terminate/3, #{}, 50}, |
117 |
|
{reroute_unacked_messages, HostType, fun ?MODULE:reroute_unacked_messages/3, #{}, 80} |
118 |
|
]. |
119 |
|
|
120 |
|
-spec config_spec() -> mongoose_config_spec:config_section(). |
121 |
|
config_spec() -> |
122 |
208 |
#section{ |
123 |
|
items = #{<<"backend">> => #option{type = atom, validate = {module, ?MODULE}}, |
124 |
|
<<"buffer">> => #option{type = boolean}, |
125 |
|
<<"buffer_max">> => #option{type = int_or_infinity, |
126 |
|
validate = positive}, |
127 |
|
<<"ack">> => #option{type = boolean}, |
128 |
|
<<"ack_freq">> => #option{type = integer, |
129 |
|
validate = positive}, |
130 |
|
<<"resume_timeout">> => #option{type = integer, |
131 |
|
validate = positive}, |
132 |
|
<<"stale_h">> => stale_h_config_spec() |
133 |
|
}, |
134 |
|
process = fun ?MODULE:process_buffer_and_ack/1, |
135 |
|
defaults = #{<<"backend">> => mnesia, |
136 |
|
<<"buffer">> => true, |
137 |
|
<<"buffer_max">> => 100, |
138 |
|
<<"ack">> => true, |
139 |
|
<<"ack_freq">> => 1, |
140 |
|
<<"resume_timeout">> => 600 % seconds |
141 |
|
} |
142 |
|
}. |
143 |
|
|
144 |
204 |
supported_features() -> [dynamic_domains]. |
145 |
|
|
146 |
|
process_buffer_and_ack(Opts = #{buffer := Buffer, ack := Ack}) -> |
147 |
104 |
OptsWithBuffer = check_buffer(Opts, Buffer), |
148 |
104 |
check_ack(OptsWithBuffer, Ack). |
149 |
|
|
150 |
|
check_buffer(Opts, false) -> |
151 |
:-( |
Opts#{buffer_max => no_buffer}; |
152 |
|
check_buffer(Opts, _) -> |
153 |
104 |
Opts. |
154 |
|
|
155 |
|
check_ack(Opts, false) -> |
156 |
:-( |
Opts#{ack_freq => never}; |
157 |
|
check_ack(Opts, _) -> |
158 |
104 |
Opts. |
159 |
|
|
160 |
|
stale_h_config_spec() -> |
161 |
208 |
#section{ |
162 |
|
items = #{<<"enabled">> => #option{type = boolean}, |
163 |
|
<<"repeat_after">> => #option{type = integer, |
164 |
|
validate = positive}, |
165 |
|
<<"geriatric">> => #option{type = integer, |
166 |
|
validate = positive}}, |
167 |
|
include = always, |
168 |
|
defaults = #{<<"enabled">> => false, |
169 |
|
<<"repeat_after">> => 1800, % seconds |
170 |
|
<<"geriatric">> => 3600 % seconds |
171 |
|
} |
172 |
|
}. |
173 |
|
|
174 |
|
%% |
175 |
|
%% hooks handlers |
176 |
|
%% |
177 |
|
|
178 |
|
-spec user_send_packet(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> |
179 |
|
mongoose_c2s_hooks:result(). |
180 |
|
user_send_packet(Acc, #{c2s_data := StateData}, _Extra) -> |
181 |
23607 |
case {get_mod_state(StateData), is_sm_element(Acc)} of |
182 |
|
{#sm_state{counter_in = Counter} = SmState, false} -> |
183 |
146 |
NewSmState = SmState#sm_state{counter_in = incr_counter(Counter)}, |
184 |
146 |
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewSmState})}; |
185 |
|
{_, _} -> |
186 |
23461 |
{ok, Acc} |
187 |
|
end; |
188 |
|
user_send_packet(Acc, _Params, _Extra) -> |
189 |
:-( |
{ok, Acc}. |
190 |
|
|
191 |
|
-spec user_receive_packet(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> |
192 |
|
mongoose_c2s_hooks:result(). |
193 |
|
user_receive_packet(Acc, #{c2s_data := StateData, c2s_state := C2SState}, _) -> |
194 |
40986 |
Check1 = is_conflict_incoming_acc(Acc, StateData), |
195 |
40986 |
Check2 = is_conflict_receiver_sid(Acc, StateData), |
196 |
40986 |
case {Check1, Check2} of |
197 |
|
{true, _} -> %% A race condition detected: same jid, but different sids |
198 |
4 |
C2SSid = mongoose_c2s:get_sid(StateData), |
199 |
4 |
OriginSid = mongoose_acc:get(c2s, origin_sid, undefined, Acc), |
200 |
4 |
?LOG_WARNING(#{what => conflict_check_failed, |
201 |
|
text => <<"Drop Acc that is addressed to another connection " |
202 |
|
"(origin SID check failed)">>, |
203 |
|
c2s_sid => C2SSid, origin_sid => OriginSid, |
204 |
:-( |
acc => Acc, state_name => C2SState, c2s_state => StateData}), |
205 |
4 |
{stop, Acc}; |
206 |
|
{_, true} -> |
207 |
:-( |
C2SSid = mongoose_c2s:get_sid(StateData), |
208 |
:-( |
ReceiverSID = mongoose_acc:get(c2s, receiver_sid, undefined, Acc), |
209 |
:-( |
?LOG_WARNING(#{what => conflict_check_failed, |
210 |
|
text => <<"Drop Acc that is addressed to another connection " |
211 |
|
"(receiver SID check failed)">>, |
212 |
|
c2s_sid => C2SSid, receiver_sid => ReceiverSID, |
213 |
:-( |
acc => Acc, state_name => C2SState, c2s_state => StateData}), |
214 |
:-( |
{stop, Acc}; |
215 |
|
_ -> |
216 |
40982 |
{ok, Acc} |
217 |
|
end; |
218 |
|
user_receive_packet(Acc, _Params, _Extra) -> |
219 |
:-( |
{ok, Acc}. |
220 |
|
|
221 |
|
-spec xmpp_presend_element(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> |
222 |
|
mongoose_c2s_hooks:result(). |
223 |
|
xmpp_presend_element(Acc, #{c2s_data := StateData, c2s_state := C2SState}, _Extra) -> |
224 |
35035 |
case {get_mod_state(StateData), mongoose_acc:stanza_type(Acc)} of |
225 |
|
{{error, not_found}, _} -> |
226 |
34735 |
{ok, Acc}; |
227 |
|
{_, <<"probe">>} -> |
228 |
:-( |
{ok, Acc}; |
229 |
|
{#sm_state{buffer_max = no_buffer} = SmState, _} -> |
230 |
:-( |
maybe_send_ack_request(Acc, SmState); |
231 |
|
{SmState, _} -> |
232 |
300 |
Jid = mongoose_c2s:get_jid(StateData), |
233 |
300 |
handle_buffer_and_ack(Acc, C2SState, Jid, SmState) |
234 |
|
end. |
235 |
|
|
236 |
|
-spec handle_buffer_and_ack(mongoose_acc:t(), c2s_state(), jid:jid(), sm_state()) -> |
237 |
|
mongoose_c2s_hooks:result(). |
238 |
|
handle_buffer_and_ack(Acc, C2SState, Jid, #sm_state{buffer = Buffer, buffer_max = BufferMax, |
239 |
|
buffer_size = BufferSize} = SmState) -> |
240 |
300 |
NewBufferSize = BufferSize + 1, |
241 |
300 |
MaybeActions = case is_buffer_full(NewBufferSize, BufferMax) of |
242 |
|
true -> |
243 |
38 |
{{timeout, ?MODULE}, ?CONSTRAINT_CHECK_TIMEOUT, check_buffer_full}; |
244 |
|
false -> |
245 |
262 |
[] |
246 |
|
end, |
247 |
300 |
Acc1 = notify_unacknowledged_msg_if_in_resume_state(Acc, Jid, C2SState), |
248 |
300 |
NewSmState = SmState#sm_state{buffer = [Acc1 | Buffer], buffer_size = NewBufferSize}, |
249 |
300 |
Acc2 = mongoose_c2s_acc:to_acc(Acc, actions, MaybeActions), |
250 |
300 |
maybe_send_ack_request(Acc2, NewSmState). |
251 |
|
|
252 |
|
notify_unacknowledged_msg_if_in_resume_state(Acc, Jid, ?EXT_C2S_STATE(resume_session)) -> |
253 |
34 |
maybe_notify_unacknowledged_msg(Acc, Jid); |
254 |
|
notify_unacknowledged_msg_if_in_resume_state(Acc, _, _) -> |
255 |
266 |
Acc. |
256 |
|
|
257 |
|
-spec is_buffer_full(non_neg_integer(), buffer_max()) -> boolean(). |
258 |
|
is_buffer_full(_BufferSize, infinity) -> |
259 |
:-( |
false; |
260 |
|
is_buffer_full(BufferSize, BufferMax) when BufferSize =< BufferMax -> |
261 |
263 |
false; |
262 |
|
is_buffer_full(_, _) -> |
263 |
40 |
true. |
264 |
|
|
265 |
|
-spec maybe_send_ack_request(mongoose_acc:t(), sm_state()) -> |
266 |
|
mongoose_c2s_hooks:result(). |
267 |
|
maybe_send_ack_request(Acc, #sm_state{buffer_size = BufferSize, |
268 |
|
counter_out = Out, |
269 |
|
ack_freq = AckFreq} = SmState) |
270 |
|
when 0 =:= (Out + BufferSize) rem AckFreq, ack_freq =/= never -> |
271 |
144 |
Stanza = mod_stream_management_stanzas:stream_mgmt_request(), |
272 |
144 |
ToAcc = [{socket_send, Stanza}, {state_mod, {?MODULE, SmState}}], |
273 |
144 |
{ok, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)}; |
274 |
|
maybe_send_ack_request(Acc, SmState) -> |
275 |
156 |
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, SmState})}. |
276 |
|
|
277 |
|
-spec user_send_xmlel(Acc, Params, Extra) -> Result when |
278 |
|
Acc :: mongoose_acc:t(), |
279 |
|
Params :: mongoose_c2s_hooks:params(), |
280 |
|
Extra :: gen_hook:extra(), |
281 |
|
Result :: mongoose_c2s_hooks:result(). |
282 |
|
user_send_xmlel(Acc, Params, Extra) -> |
283 |
189 |
El = mongoose_acc:element(Acc), |
284 |
189 |
case exml_query:attr(El, <<"xmlns">>) of |
285 |
|
?NS_STREAM_MGNT_3 -> |
286 |
167 |
handle_stream_mgmt(Acc, Params, El); |
287 |
|
_ -> |
288 |
22 |
user_send_packet(Acc, Params, Extra) |
289 |
|
end. |
290 |
|
|
291 |
|
-spec foreign_event(Acc, Params, Extra) -> Result when |
292 |
|
Acc :: mongoose_acc:t(), |
293 |
|
Params :: mongoose_c2s_hooks:params(), |
294 |
|
Extra :: gen_hook:extra(), |
295 |
|
Result :: mongoose_c2s_hooks:result(). |
296 |
|
foreign_event(Acc, #{c2s_data := StateData, |
297 |
|
event_type := {call, From}, |
298 |
|
event_tag := ?MODULE, |
299 |
|
event_content := {resume, H}}, _Extra) -> |
300 |
1 |
case handle_resume_call(StateData, From, H) of |
301 |
|
{ok, SmState} -> |
302 |
1 |
FlushedStateData = mongoose_c2s:merge_mod_state(StateData, #{?MODULE => SmState}), |
303 |
1 |
ToAcc = [{c2s_data, FlushedStateData}, {hard_stop, {shutdown, resumed}}], |
304 |
1 |
{stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)}; |
305 |
|
error -> |
306 |
:-( |
{stop, mongoose_c2s_acc:to_acc(Acc, hard_stop, bad_stream_management_request)} |
307 |
|
end; |
308 |
|
foreign_event(Acc, #{c2s_data := StateData, event_type := {timeout, ?MODULE}, event_content := check_buffer_full}, _Extra) -> |
309 |
3 |
#sm_state{buffer_size = BufferSize, buffer_max = BufferMax} = get_mod_state(StateData), |
310 |
3 |
case is_buffer_full(BufferSize, BufferMax) of |
311 |
|
true -> |
312 |
2 |
Lang = mongoose_c2s:get_lang(StateData), |
313 |
2 |
Err = mongoose_xmpp_errors:stream_resource_constraint(Lang, <<"too many unacked stanzas">>), |
314 |
2 |
mongoose_c2s:c2s_stream_error(StateData, Err), |
315 |
2 |
{stop, mongoose_c2s_acc:to_acc(Acc, hard_stop, too_many_unacked_stanzas)}; |
316 |
|
false -> |
317 |
1 |
{ok, Acc} |
318 |
|
end; |
319 |
|
foreign_event(Acc, _Params, _Extra) -> |
320 |
132 |
{ok, Acc}. |
321 |
|
|
322 |
|
-spec handle_user_stopping(Acc, Params, Extra) -> Result when |
323 |
|
Acc :: mongoose_acc:t(), |
324 |
|
Params :: mongoose_c2s_hooks:params(), |
325 |
|
Extra :: gen_hook:extra(), |
326 |
|
Result :: mongoose_c2s_hooks:result(). |
327 |
|
handle_user_stopping(Acc, #{c2s_data := StateData}, #{host_type := HostType}) -> |
328 |
210 |
case get_mod_state(StateData) of |
329 |
|
{error, not_found} -> |
330 |
150 |
{ok, Acc}; |
331 |
|
SmState -> |
332 |
60 |
Timeout = get_resume_timeout(HostType), |
333 |
60 |
NewSmState = notify_unacknowledged_messages(Acc, StateData, SmState), |
334 |
60 |
Actions = [{push_callback_module, ?MODULE}, {{timeout, ?MODULE}, Timeout, resume_timeout}, hibernate], |
335 |
60 |
ToAcc = [{c2s_state, ?EXT_C2S_STATE(resume_session)}, {actions, Actions}, {state_mod, {?MODULE, NewSmState}}], |
336 |
60 |
{stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)} |
337 |
|
end. |
338 |
|
|
339 |
|
-spec notify_unacknowledged_messages(mongoose_acc:t(), mongoose_c2s:data(), sm_state()) -> |
340 |
|
sm_state(). |
341 |
|
notify_unacknowledged_messages(_, StateData, #sm_state{buffer = Buffer} = SmState) -> |
342 |
60 |
Jid = mongoose_c2s:get_jid(StateData), |
343 |
60 |
NewBuffer = [maybe_notify_unacknowledged_msg(Acc, Jid) || Acc <- lists:reverse(Buffer)], |
344 |
60 |
SmState#sm_state{buffer = lists:reverse(NewBuffer)}. |
345 |
|
|
346 |
|
-spec maybe_notify_unacknowledged_msg(mongoose_acc:t(), jid:jid()) -> mongoose_acc:t(). |
347 |
|
maybe_notify_unacknowledged_msg(Acc, Jid) -> |
348 |
169 |
case mongoose_acc:stanza_name(Acc) of |
349 |
110 |
<<"message">> -> notify_unacknowledged_msg(Acc, Jid); |
350 |
59 |
_ -> Acc |
351 |
|
end. |
352 |
|
|
353 |
|
-spec notify_unacknowledged_msg(mongoose_acc:t(), jid:jid()) -> mongoose_acc:t(). |
354 |
|
notify_unacknowledged_msg(Acc, Jid) -> |
355 |
110 |
NewAcc = mongoose_hooks:unacknowledged_message(Acc, Jid), |
356 |
110 |
mongoose_acc:strip(NewAcc). |
357 |
|
|
358 |
|
-spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> |
359 |
|
mongoose_c2s_hooks:result(). |
360 |
|
reroute_unacked_messages(Acc, #{c2s_data := StateData, reason := Reason}, #{host_type := HostType}) -> |
361 |
6967 |
MaybeSmState = get_mod_state(StateData), |
362 |
6967 |
maybe_handle_stream_mgmt_reroute(Acc, StateData, HostType, Reason, MaybeSmState). |
363 |
|
|
364 |
|
-spec user_terminate(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> |
365 |
|
mongoose_c2s_hooks:result(). |
366 |
|
user_terminate(Acc, #{reason := Reason}, _Extra) when ?IS_STREAM_MGMT_STOP(Reason) -> |
367 |
55 |
{stop, Acc}; %% We stop here because this termination was triggered internally |
368 |
|
user_terminate(Acc, _Params, _Extra) -> |
369 |
6912 |
{ok, Acc}. |
370 |
|
|
371 |
|
-spec maybe_handle_stream_mgmt_reroute( |
372 |
|
mongoose_acc:t(), mongoose_c2s:data(), mongooseim:host_type(), term(), maybe_sm_state()) -> |
373 |
|
mongoose_c2s_hooks:result(). |
374 |
|
maybe_handle_stream_mgmt_reroute(Acc, StateData, HostType, Reason, #sm_state{counter_in = H} = SmState) |
375 |
|
when ?IS_STREAM_MGMT_STOP(Reason) -> |
376 |
55 |
Sid = mongoose_c2s:get_sid(StateData), |
377 |
55 |
do_remove_smid(HostType, Sid, H), |
378 |
55 |
NewSmState = handle_user_terminate(SmState, StateData, HostType), |
379 |
55 |
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewSmState})}; |
380 |
|
maybe_handle_stream_mgmt_reroute(Acc, StateData, HostType, _Reason, #sm_state{} = SmState) -> |
381 |
39 |
NewSmState = handle_user_terminate(SmState, StateData, HostType), |
382 |
39 |
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewSmState})}; |
383 |
|
maybe_handle_stream_mgmt_reroute(Acc, _StateData, _HostType, _Reason, {error, not_found}) -> |
384 |
6873 |
{ok, Acc}. |
385 |
|
|
386 |
|
-spec handle_user_terminate(sm_state(), mongoose_c2s:data(), mongooseim:host_type()) -> sm_state(). |
387 |
|
handle_user_terminate(#sm_state{counter_in = H} = SmState, StateData, HostType) -> |
388 |
94 |
Sid = mongoose_c2s:get_sid(StateData), |
389 |
94 |
do_remove_smid(HostType, Sid, H), |
390 |
94 |
reroute_buffer(StateData, SmState), |
391 |
94 |
SmState#sm_state{buffer = [], buffer_size = 0}. |
392 |
|
|
393 |
|
reroute_buffer(StateData, #sm_state{buffer = Buffer, peer = {gen_statem, {Pid, _}}}) -> |
394 |
12 |
mongoose_c2s:reroute_buffer_to_pid(StateData, Pid, Buffer); |
395 |
|
reroute_buffer(StateData, #sm_state{buffer = Buffer}) -> |
396 |
82 |
mongoose_c2s:reroute_buffer(StateData, Buffer). |
397 |
|
|
398 |
|
-spec terminate(term(), c2s_state(), mongoose_c2s:data()) -> term(). |
399 |
|
terminate(Reason, C2SState, StateData) -> |
400 |
55 |
?LOG_DEBUG(#{what => stream_mgmt_statem_terminate, reason => Reason, |
401 |
55 |
c2s_state => C2SState, c2s_data => StateData}), |
402 |
55 |
mongoose_c2s:terminate({shutdown, ?MODULE}, C2SState, StateData). |
403 |
|
|
404 |
|
-spec handle_stream_mgmt(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) -> |
405 |
|
mongoose_c2s_hooks:result(). |
406 |
|
handle_stream_mgmt(Acc, Params = #{c2s_state := C2SState}, El = #xmlel{name = <<"a">>}) |
407 |
|
when ?IS_ALLOWED_STATE(C2SState) -> |
408 |
61 |
handle_a(Acc, Params, El); |
409 |
|
handle_stream_mgmt(Acc, Params = #{c2s_state := C2SState}, #xmlel{name = <<"r">>}) |
410 |
|
when ?IS_ALLOWED_STATE(C2SState) -> |
411 |
7 |
handle_r(Acc, Params); |
412 |
|
handle_stream_mgmt(Acc, Params = #{c2s_state := C2SState}, El = #xmlel{name = <<"enable">>}) |
413 |
|
when ?IS_ALLOWED_STATE(C2SState) -> |
414 |
81 |
handle_enable(Acc, Params, El); |
415 |
|
handle_stream_mgmt(Acc, Params = #{c2s_state := {wait_for_feature_after_auth, _}}, El = #xmlel{name = <<"resume">>}) -> |
416 |
15 |
handle_resume_request(Acc, Params, El); |
417 |
|
handle_stream_mgmt(Acc, #{c2s_data := StateData, c2s_state := C2SState}, _El) -> |
418 |
3 |
unexpected_sm_request(Acc, StateData, C2SState); |
419 |
|
handle_stream_mgmt(Acc, _Params, _El) -> |
420 |
:-( |
{ok, Acc}. |
421 |
|
|
422 |
|
-spec handle_r(mongoose_acc:t(), mongoose_c2s_hooks:params()) -> |
423 |
|
mongoose_c2s_hooks:result(). |
424 |
|
handle_r(Acc, #{c2s_data := StateData}) -> |
425 |
7 |
case get_mod_state(StateData) of |
426 |
|
{error, not_found} -> |
427 |
:-( |
?LOG_WARNING(#{what => unexpected_r, c2s_state => StateData, |
428 |
:-( |
text => <<"received <r/> but stream management is off!">>}), |
429 |
:-( |
{ok, Acc}; |
430 |
|
#sm_state{counter_in = In} -> |
431 |
7 |
Stanza = mod_stream_management_stanzas:stream_mgmt_ack(In), |
432 |
7 |
{ok, mongoose_c2s_acc:to_acc(Acc, socket_send, Stanza)} |
433 |
|
end. |
434 |
|
|
435 |
|
-spec handle_a(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) -> |
436 |
|
mongoose_c2s_hooks:result(). |
437 |
|
handle_a(Acc, #{c2s_data := StateData}, El) -> |
438 |
61 |
case {get_mod_state(StateData), stream_mgmt_parse_h(El)} of |
439 |
|
{{error, not_found}, _} -> |
440 |
1 |
{ok, Acc}; |
441 |
|
{_, invalid_h_attribute} -> |
442 |
1 |
Stanza = mongoose_xmpp_errors:policy_violation(?MYLANG, <<"Invalid h attribute">>), |
443 |
1 |
mongoose_c2s:c2s_stream_error(StateData, Stanza), |
444 |
1 |
{stop, mongoose_c2s_acc:to_acc(Acc, hard_stop, invalid_h_attribute)}; |
445 |
|
{Handler, H} -> |
446 |
59 |
HandledAck = do_handle_ack(Handler, H), |
447 |
59 |
finish_handle_a(StateData, Acc, HandledAck) |
448 |
|
end. |
449 |
|
|
450 |
|
-spec finish_handle_a(mongoose_c2s:data(), mongoose_acc:t(), sm_state() | {error, term()}) -> |
451 |
|
mongoose_c2s_hooks:result(). |
452 |
|
finish_handle_a(StateData, Acc, {error, ErrorStanza, Reason}) -> |
453 |
2 |
mongoose_c2s:c2s_stream_error(StateData, ErrorStanza), |
454 |
2 |
{stop, mongoose_c2s_acc:to_acc(Acc, stop, Reason)}; |
455 |
|
finish_handle_a(_StateData, Acc, #sm_state{} = NewSmState) -> |
456 |
57 |
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewSmState})}. |
457 |
|
|
458 |
|
-spec do_handle_ack(sm_state(), non_neg_integer()) -> sm_state() | {error, exml:element(), term()}. |
459 |
|
do_handle_ack(#sm_state{counter_out = OldAcked, |
460 |
|
buffer_size = BufferSize, |
461 |
|
buffer = Buffer} = SmState, Acked) -> |
462 |
73 |
ToDrop = calc_to_drop(Acked, OldAcked), |
463 |
73 |
case BufferSize < ToDrop of |
464 |
|
true -> |
465 |
4 |
ErrorStanza0 = #xmlel{children = Children} |
466 |
|
= mongoose_xmpp_errors:undefined_condition( |
467 |
|
?MYLANG, <<"You acknowledged more stanzas that what has been sent">>), |
468 |
4 |
HandledCountField = mod_stream_management_stanzas:sm_handled_count_too_high_stanza(Acked, OldAcked), |
469 |
4 |
ErrorStanza = ErrorStanza0#xmlel{children = [HandledCountField | Children]}, |
470 |
4 |
{error, ErrorStanza, {shutdown, sm_handled_count_too_high_stanza}}; |
471 |
|
false -> |
472 |
69 |
NewSize = BufferSize - ToDrop, |
473 |
69 |
{NewBuffer, _Dropped} = lists:split(NewSize, Buffer), |
474 |
69 |
SmState#sm_state{counter_out = Acked, buffer_size = NewSize, buffer = NewBuffer} |
475 |
|
end. |
476 |
|
|
477 |
|
-spec incr_counter(short()) -> short(). |
478 |
|
incr_counter(Incoming) when Incoming < ?STREAM_MGMT_H_MAX - 1 -> |
479 |
146 |
Incoming + 1; |
480 |
|
incr_counter(_Incoming) -> |
481 |
:-( |
0. |
482 |
|
|
483 |
|
-spec calc_to_drop(short(), short()) -> short(). |
484 |
|
calc_to_drop(Acked, OldAcked) when Acked >= OldAcked -> |
485 |
73 |
Acked - OldAcked; |
486 |
|
calc_to_drop(Acked, OldAcked) -> |
487 |
:-( |
Acked + ?STREAM_MGMT_H_MAX - OldAcked + 1. |
488 |
|
|
489 |
|
-spec handle_enable(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) -> |
490 |
|
mongoose_c2s_hooks:result(). |
491 |
|
handle_enable(Acc, #{c2s_data := StateData}, El) -> |
492 |
81 |
case if_not_already_enabled_create_sm_state(StateData) of |
493 |
|
error -> |
494 |
1 |
stream_error(Acc, StateData); |
495 |
|
SmState -> |
496 |
80 |
do_handle_enable(Acc, StateData, SmState, El) |
497 |
|
end. |
498 |
|
|
499 |
|
-spec do_handle_enable(mongoose_acc:t(), mongoose_c2s:data(), sm_state(), exml:element()) -> |
500 |
|
mongoose_c2s_hooks:result(). |
501 |
|
do_handle_enable(Acc, StateData, SmState, El) -> |
502 |
80 |
case exml_query:attr(El, <<"resume">>, false) of |
503 |
|
false -> |
504 |
37 |
Stanza = mod_stream_management_stanzas:stream_mgmt_enabled(), |
505 |
37 |
ToAcc = [{state_mod, {?MODULE, SmState}}, {socket_send, Stanza}], |
506 |
37 |
{stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)}; |
507 |
|
Attr when Attr =:= <<"true">>; Attr =:= <<"1">> -> |
508 |
43 |
Stanza = register_smid_return_enabled_stanza(StateData), |
509 |
43 |
ToAcc = [{state_mod, {?MODULE, SmState}}, {socket_send, Stanza}], |
510 |
43 |
{stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)}; |
511 |
|
_ -> |
512 |
:-( |
stream_error(Acc, StateData) |
513 |
|
end. |
514 |
|
|
515 |
|
-spec register_smid_return_enabled_stanza(mongoose_c2s:data()) -> exml:element(). |
516 |
|
register_smid_return_enabled_stanza(StateData) -> |
517 |
44 |
SMID = make_smid(), |
518 |
44 |
Sid = mongoose_c2s:get_sid(StateData), |
519 |
44 |
HostType = mongoose_c2s:get_host_type(StateData), |
520 |
44 |
ok = register_smid(HostType, SMID, Sid), |
521 |
44 |
mod_stream_management_stanzas:stream_mgmt_enabled([{<<"id">>, SMID}, {<<"resume">>, <<"true">>}]). |
522 |
|
|
523 |
|
-spec if_not_already_enabled_create_sm_state(mongoose_c2s:data()) -> sm_state() | error. |
524 |
|
if_not_already_enabled_create_sm_state(StateData) -> |
525 |
83 |
case get_mod_state(StateData) of |
526 |
1 |
#sm_state{} -> error; |
527 |
|
{error, not_found} -> |
528 |
82 |
HostType = mongoose_c2s:get_host_type(StateData), |
529 |
82 |
build_sm_handler(HostType) |
530 |
|
end. |
531 |
|
|
532 |
|
-spec handle_resume_request(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) -> |
533 |
|
mongoose_c2s_hooks:result(). |
534 |
|
handle_resume_request(Acc, #{c2s_state := C2SState, c2s_data := C2SData}, El) -> |
535 |
15 |
case handle_resume(C2SData, C2SState, El) of |
536 |
|
{stream_mgmt_error, ErrorStanza} -> |
537 |
4 |
stream_mgmt_error(Acc, C2SData, C2SState, ErrorStanza); |
538 |
|
{error, ErrorStanza, Reason} -> |
539 |
1 |
mongoose_c2s:c2s_stream_error(C2SData, ErrorStanza), |
540 |
1 |
{stop, mongoose_c2s_acc:to_acc(Acc, stop, Reason)}; |
541 |
|
{error, ErrorStanza} -> |
542 |
:-( |
{stop, mongoose_c2s_acc:to_acc(Acc, socket_send, ErrorStanza)}; |
543 |
|
{ok, #{resumed := Resumed, forward := ToForward, |
544 |
|
c2s_state := C2SState1, c2s_data := C2SData1}} -> |
545 |
10 |
ToAcc = [{c2s_state, C2SState1}, {c2s_data, C2SData1}, {socket_send, [Resumed | ToForward]}], |
546 |
10 |
{ok, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)} |
547 |
|
end. |
548 |
|
|
549 |
|
%% This runs on the new process |
550 |
|
-spec handle_resume(mongoose_c2s:data(), mongoose_c2s:state(), exml:element()) -> |
551 |
|
resume_return(). |
552 |
|
handle_resume(C2SData, C2SState, El) -> |
553 |
22 |
case {get_previd(El), stream_mgmt_parse_h(El), get_mod_state(C2SData)} of |
554 |
|
{undefined, _, _} -> |
555 |
1 |
{error, mod_stream_management_stanzas:stream_mgmt_failed(<<"bad-request">>)}; |
556 |
|
{_, invalid_h_attribute, _} -> |
557 |
1 |
{error, mod_stream_management_stanzas:stream_mgmt_failed(<<"bad-request">>)}; |
558 |
|
{_, _, #sm_state{}} -> |
559 |
:-( |
{error, mod_stream_management_stanzas:stream_mgmt_failed(<<"bad-request">>)}; |
560 |
|
{SMID, H, {error, not_found}} -> |
561 |
20 |
HostType = mongoose_c2s:get_host_type(C2SData), |
562 |
20 |
FromSMID = get_session_from_smid(HostType, SMID), |
563 |
20 |
do_handle_resume(C2SData, C2SState, SMID, H, FromSMID) |
564 |
|
end. |
565 |
|
|
566 |
|
-spec do_handle_resume(StateData, C2SState, SMID, H, FromSMID) -> HookResult when |
567 |
|
StateData :: mongoose_c2s:data(), |
568 |
|
C2SState :: c2s_state(), |
569 |
|
SMID :: smid(), |
570 |
|
H :: non_neg_integer(), |
571 |
|
FromSMID :: maybe_smid(), |
572 |
|
HookResult :: resume_return(). |
573 |
|
do_handle_resume(StateData, _C2SState, SMID, H, {sid, {_TS, Pid}}) -> |
574 |
15 |
case get_peer_state(Pid, H) of |
575 |
|
{ok, OldStateData} -> |
576 |
12 |
NewState = mongoose_c2s:merge_states(OldStateData, StateData), |
577 |
12 |
do_resume(NewState, SMID); |
578 |
|
{error, ErrorStanza, Reason} -> |
579 |
2 |
{error, ErrorStanza, Reason}; |
580 |
|
{exception, {C, R, S}} -> |
581 |
1 |
?LOG_WARNING(#{what => resumption_error, |
582 |
|
text => <<"Resumption error because of invalid response">>, |
583 |
:-( |
class => C, reason => R, stacktrace => S, pid => Pid, c2s_state => StateData}), |
584 |
1 |
{stream_mgmt_error, mod_stream_management_stanzas:stream_mgmt_failed(<<"item-not-found">>)} |
585 |
|
end; |
586 |
|
do_handle_resume(StateData, _C2SState, SMID, _H, {stale_h, StaleH}) -> |
587 |
1 |
?LOG_WARNING(#{what => resumption_error, reason => session_resumption_timed_out, |
588 |
:-( |
smid => SMID, stale_h => StaleH, c2s_state => StateData}), |
589 |
1 |
{stream_mgmt_error, mod_stream_management_stanzas:stream_mgmt_failed(<<"item-not-found">>, [{<<"h">>, integer_to_binary(StaleH)}])}; |
590 |
|
do_handle_resume(StateData, _C2SState, SMID, _H, {error, smid_not_found}) -> |
591 |
4 |
?LOG_WARNING(#{what => resumption_error, reason => no_previous_session_for_smid, |
592 |
:-( |
smid => SMID, c2s_state => StateData}), |
593 |
4 |
{stream_mgmt_error, mod_stream_management_stanzas:stream_mgmt_failed(<<"item-not-found">>)}. |
594 |
|
|
595 |
|
%% This runs on the new process |
596 |
|
-spec do_resume(StateData, SMID) -> HookResult when |
597 |
|
StateData :: mongoose_c2s:data(), |
598 |
|
SMID :: smid(), |
599 |
|
HookResult :: resume_return(). |
600 |
|
do_resume(StateData, SMID) -> |
601 |
12 |
{_ReplacedPids, StateData2} = mongoose_c2s:open_session(StateData), |
602 |
12 |
ok = register_smid(StateData2, SMID), |
603 |
12 |
{Resumed, ToForward} = get_all_stanzas_to_forward(StateData2, SMID), |
604 |
12 |
{ok, #{resumed => Resumed, forward => ToForward, |
605 |
|
c2s_state => session_established, c2s_data => StateData2}}. |
606 |
|
|
607 |
|
register_smid(StateData, SMID) -> |
608 |
12 |
Sid = mongoose_c2s:get_sid(StateData), |
609 |
12 |
HostType = mongoose_c2s:get_host_type(StateData), |
610 |
12 |
ok = register_smid(HostType, SMID, Sid). |
611 |
|
|
612 |
|
-spec get_all_stanzas_to_forward(mongoose_c2s:data(), smid()) -> {exml:element(), [exml:element()]}. |
613 |
|
get_all_stanzas_to_forward(StateData, SMID) -> |
614 |
12 |
#sm_state{counter_in = Counter, buffer = Buffer} = get_mod_state(StateData), |
615 |
12 |
Resumed = mod_stream_management_stanzas:stream_mgmt_resumed(SMID, Counter), |
616 |
12 |
LServer = mongoose_c2s:get_lserver(StateData), |
617 |
12 |
FromServer = jid:make_noprep(<<>>, LServer, <<>>), |
618 |
12 |
ToForward = [ begin |
619 |
26 |
TS = mongoose_acc:timestamp(Acc), |
620 |
26 |
Packet = mongoose_acc:element(Acc), |
621 |
26 |
StanzaName = mongoose_acc:stanza_name(Acc), |
622 |
26 |
StanzaType = mongoose_acc:stanza_type(Acc), |
623 |
26 |
maybe_add_timestamp(Packet, StanzaName, StanzaType, TS, FromServer) |
624 |
12 |
end || Acc <- lists:reverse(Buffer)], |
625 |
12 |
{Resumed, ToForward}. |
626 |
|
|
627 |
|
maybe_add_timestamp(Packet, <<"message">>, <<"error">>, _, _) -> |
628 |
:-( |
Packet; |
629 |
|
maybe_add_timestamp(Packet, <<"message">>, <<"headline">>, _, _) -> |
630 |
:-( |
Packet; |
631 |
|
maybe_add_timestamp(Packet, <<"message">>, _, TS, FromServer) -> |
632 |
24 |
jlib:maybe_append_delay(Packet, FromServer, TS, <<"SM Storage">>); |
633 |
|
maybe_add_timestamp(Packet, _StanzaName, _StanzaType, _TS, _FromServer) -> |
634 |
2 |
Packet. |
635 |
|
|
636 |
|
%% If jid is the same, but sid is not, then we have a conflict. |
637 |
|
%% jid example is alice@localhost/res1. |
638 |
|
%% sid example is `{now(), pid()}'. |
639 |
|
%% The conflict can happen, when actions with an accumulator were initiated by |
640 |
|
%% one process but the resulting stanzas were routed to another process with |
641 |
|
%% the same JID but different SID. |
642 |
|
%% The conflict usually happens when a user is reconnecting. |
643 |
|
%% Both origin_sid and origin_jid props should be defined. |
644 |
|
%% But we don't force developers to set both of them, so we should correctly |
645 |
|
%% process stanzas, that have only one properly set. |
646 |
|
%% "Incoming" means that stanza is coming from ejabberd_router. |
647 |
|
-spec is_conflict_incoming_acc(mongoose_acc:t(), mongoose_c2s:data()) -> boolean(). |
648 |
|
is_conflict_incoming_acc(Acc, StateData) -> |
649 |
40986 |
OriginJid = mongoose_acc:get(c2s, origin_jid, undefined, Acc), |
650 |
40986 |
OriginSid = mongoose_acc:get(c2s, origin_sid, undefined, Acc), |
651 |
40986 |
AreOriginsDefined = OriginJid =/= undefined andalso OriginSid =/= undefined, |
652 |
40986 |
case AreOriginsDefined of |
653 |
|
false -> |
654 |
10957 |
false; |
655 |
|
true -> |
656 |
30029 |
StateJid = mongoose_c2s:get_jid(StateData), |
657 |
30029 |
SameJid = jid:are_equal(OriginJid, StateJid), |
658 |
30029 |
StateSid = mongoose_c2s:get_sid(StateData), |
659 |
30029 |
SameSid = OriginSid =:= StateSid, |
660 |
|
% possible to receive response addressed to process which we resumed from - still valid! |
661 |
30029 |
OldSid = maybe_get_resumed_from_sid(get_mod_state(StateData)), |
662 |
30029 |
SameOldSession = OriginSid =:= OldSid, |
663 |
30029 |
SameJid andalso not (SameSid or SameOldSession) |
664 |
|
end. |
665 |
|
|
666 |
|
-spec maybe_get_resumed_from_sid(maybe_sm_state()) -> undefined | ejabberd_sm:sid(). |
667 |
|
maybe_get_resumed_from_sid(#sm_state{peer = {sid, ResumedFrom}}) -> |
668 |
5 |
ResumedFrom; |
669 |
|
maybe_get_resumed_from_sid(_) -> |
670 |
30024 |
undefined. |
671 |
|
|
672 |
|
-spec is_conflict_receiver_sid(mongoose_acc:t(), mongoose_c2s:data()) -> boolean(). |
673 |
|
is_conflict_receiver_sid(Acc, StateData) -> |
674 |
40986 |
StateSid = mongoose_c2s:get_sid(StateData), |
675 |
40986 |
AccSid = mongoose_acc:get(c2s, receiver_sid, StateSid, Acc), |
676 |
40986 |
StateSid =/= AccSid. |
677 |
|
|
678 |
|
-spec stream_error(mongoose_acc:t(), mongoose_c2s:data()) -> |
679 |
|
{stop, mongoose_acc:t()}. |
680 |
|
stream_error(Acc, StateData) -> |
681 |
1 |
Err = mod_stream_management_stanzas:stream_mgmt_failed(<<"unexpected-request">>), |
682 |
1 |
mongoose_c2s:c2s_stream_error(StateData, Err), |
683 |
1 |
{stop, mongoose_c2s_acc:to_acc(Acc, stop, {shutdown, stream_error})}. |
684 |
|
|
685 |
|
-spec unexpected_sm_request(mongoose_acc:t(), mongoose_c2s:data(), c2s_state()) -> |
686 |
|
{stop, mongoose_acc:t()}. |
687 |
|
unexpected_sm_request(Acc, StateData, C2SState) -> |
688 |
3 |
Err = mod_stream_management_stanzas:stream_mgmt_failed(<<"unexpected-request">>), |
689 |
3 |
stream_mgmt_error(Acc, StateData, C2SState, Err). |
690 |
|
|
691 |
|
-spec stream_mgmt_error( |
692 |
|
mongoose_acc:t(), mongoose_c2s:data(), c2s_state(), exml:element()) -> |
693 |
|
{stop, mongoose_acc:t()}. |
694 |
|
stream_mgmt_error(Acc, _StateData, C2SState, Err) -> |
695 |
7 |
case mongoose_c2s:maybe_retry_state(C2SState) of |
696 |
|
{stop, {shutdown, retries}} -> |
697 |
:-( |
{stop, mongoose_c2s_acc:to_acc(Acc, stop, {shutdown, retries})}; |
698 |
|
NewC2SState -> |
699 |
7 |
ToAcc = [{c2s_state, NewC2SState}, {socket_send, Err}], |
700 |
7 |
{stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)} |
701 |
|
end. |
702 |
|
|
703 |
|
-spec build_sm_handler(mongooseim:host_type()) -> sm_state(). |
704 |
|
build_sm_handler(HostType) -> |
705 |
82 |
BufferMax = get_buffer_max(HostType), |
706 |
82 |
AckFreq = get_ack_freq(HostType), |
707 |
82 |
#sm_state{buffer_max = BufferMax, ack_freq = AckFreq}. |
708 |
|
|
709 |
|
-spec get_mod_state(mongoose_c2s:data()) -> maybe_sm_state(). |
710 |
|
get_mod_state(StateData) -> |
711 |
96050 |
case mongoose_c2s:get_mod_state(StateData, ?MODULE) of |
712 |
1109 |
{ok, State} -> State; |
713 |
94941 |
Error -> Error |
714 |
|
end. |
715 |
|
|
716 |
|
-spec get_peer_state(pid(), non_neg_integer()) -> |
717 |
|
{ok, mongoose_c2s:data()} | {error, exml:element(), term()} | {exception, tuple()}. |
718 |
|
get_peer_state(Pid, H) -> |
719 |
15 |
try |
720 |
15 |
mongoose_c2s:call(Pid, ?MODULE, {resume, H}) |
721 |
|
catch |
722 |
1 |
C:R:S -> {exception, {C, R, S}} |
723 |
|
end. |
724 |
|
|
725 |
|
-spec stream_mgmt_parse_h(exml:element()) -> invalid_h_attribute | non_neg_integer(). |
726 |
|
stream_mgmt_parse_h(El) -> |
727 |
83 |
case catch binary_to_integer(exml_query:attr(El, <<"h">>)) of |
728 |
80 |
H when is_integer(H), H >= 0 -> H; |
729 |
3 |
_ -> invalid_h_attribute |
730 |
|
end. |
731 |
|
|
732 |
|
-spec get_previd(exml:element()) -> undefined | binary(). |
733 |
|
get_previd(El) -> |
734 |
22 |
exml_query:attr(El, <<"previd">>, undefined). |
735 |
|
|
736 |
|
-spec c2s_stream_features(Acc, Params, Extra) -> {ok, Acc} when |
737 |
|
Acc :: [exml:element()], |
738 |
|
Params :: map(), |
739 |
|
Extra :: gen_hook:extra(). |
740 |
|
c2s_stream_features(Acc, _, _) -> |
741 |
13632 |
{ok, lists:keystore(<<"sm">>, #xmlel.name, Acc, mod_stream_management_stanzas:sm())}. |
742 |
|
|
743 |
|
-spec session_cleanup(Acc, Params, Extra) -> {ok, Acc} when |
744 |
|
Acc :: mongoose_acc:t(), |
745 |
|
Params :: #{sid := ejabberd_sm:sid()}, |
746 |
|
Extra :: gen_hook:extra(). |
747 |
|
session_cleanup(Acc, #{sid := SID}, #{host_type := HostType}) -> |
748 |
:-( |
{ok, remove_smid(Acc, HostType, SID)}. |
749 |
|
|
750 |
|
|
751 |
|
-spec remove_smid(mongoose_acc:t(), mongooseim:host_type(), ejabberd_sm:sid()) -> |
752 |
|
mongoose_acc:t(). |
753 |
|
remove_smid(Acc, HostType, Sid) -> |
754 |
:-( |
H = mongoose_acc:get(stream_mgmt, h, undefined, Acc), |
755 |
:-( |
MaybeSMID = do_remove_smid(HostType, Sid, H), |
756 |
:-( |
mongoose_acc:set(stream_mgmt, smid, MaybeSMID, Acc). |
757 |
|
|
758 |
|
do_remove_smid(HostType, Sid, H) -> |
759 |
149 |
MaybeSMID = unregister_smid(HostType, Sid), |
760 |
149 |
case MaybeSMID of |
761 |
|
{ok, SMID} when H =/= undefined -> |
762 |
52 |
register_stale_smid_h(HostType, SMID, H); |
763 |
|
_ -> |
764 |
97 |
ok |
765 |
|
end, |
766 |
149 |
MaybeSMID. |
767 |
|
|
768 |
|
%% |
769 |
|
%% `gen_statem' callbacks |
770 |
|
%% |
771 |
|
|
772 |
|
-spec callback_mode() -> gen_statem:callback_mode_result(). |
773 |
|
callback_mode() -> |
774 |
60 |
handle_event_function. |
775 |
|
|
776 |
|
-spec init(term()) -> gen_statem:init_result(c2s_state(), mongoose_c2s:data()). |
777 |
|
init(_) -> |
778 |
:-( |
{stop, this_should_have_never_been_called}. |
779 |
|
|
780 |
|
-spec handle_event(gen_statem:event_type(), term(), c2s_state(), mongoose_c2s:data()) -> |
781 |
|
gen_statem:event_handler_result(c2s_state()). |
782 |
|
handle_event({call, From}, #{event_tag := ?MODULE, event_content := {resume, H}}, |
783 |
|
?EXT_C2S_STATE(resume_session), StateData) -> |
784 |
13 |
case handle_resume_call(StateData, From, H) of |
785 |
|
{ok, SmState} -> |
786 |
11 |
NewStateData = mongoose_c2s:merge_mod_state(StateData, #{?MODULE => SmState}), |
787 |
11 |
{stop, {shutdown, resumed}, NewStateData}; |
788 |
|
error -> |
789 |
2 |
{stop, {shutdown, resumed}} |
790 |
|
end; |
791 |
|
handle_event({timeout, ?MODULE}, resume_timeout, ?EXT_C2S_STATE(resume_session), StateData) -> |
792 |
5 |
{stop, {shutdown, ?MODULE}, StateData}; |
793 |
|
handle_event(EventType, EventContent, C2SState, StateData) -> |
794 |
76 |
mongoose_c2s:handle_event(EventType, EventContent, C2SState, StateData). |
795 |
|
|
796 |
|
%% This runs on the old process |
797 |
|
-spec handle_resume_call( |
798 |
|
mongoose_c2s:data(), gen_statem:from(), non_neg_integer()) -> |
799 |
|
{ok, sm_state()} | error. |
800 |
|
handle_resume_call(StateData, From, H) -> |
801 |
14 |
case do_handle_ack(get_mod_state(StateData), H) of |
802 |
|
#sm_state{} = SmState1 -> |
803 |
12 |
KeepSmState = sm_state_to_keep(SmState1, From), |
804 |
12 |
mongoose_c2s:c2s_stream_error(StateData, mongoose_xmpp_errors:stream_conflict()), |
805 |
12 |
PassSmState = pipeline_future_sm_state(StateData, SmState1), |
806 |
12 |
pass_c2s_data_to_new_session(StateData, PassSmState, From), |
807 |
12 |
{ok, KeepSmState}; |
808 |
|
HandledAck -> |
809 |
2 |
ReplyAction = {reply, From, HandledAck}, |
810 |
2 |
gen_statem:reply(ReplyAction), |
811 |
2 |
error |
812 |
|
end. |
813 |
|
|
814 |
|
-spec pipeline_future_sm_state(mongoose_c2s:data(), sm_state()) -> sm_state(). |
815 |
|
pipeline_future_sm_state(StateData, SmState) -> |
816 |
12 |
Sid = mongoose_c2s:get_sid(StateData), |
817 |
12 |
WithResumedFrom = SmState#sm_state{peer = {sid, Sid}}, |
818 |
12 |
recover_messages(WithResumedFrom). |
819 |
|
|
820 |
|
-spec pass_c2s_data_to_new_session(mongoose_c2s:data(), sm_state(), gen_statem:from()) -> ok. |
821 |
|
pass_c2s_data_to_new_session(StateData, PassSmState, From) -> |
822 |
12 |
FutureStateData = mongoose_c2s:merge_mod_state(StateData, #{?MODULE => PassSmState}), |
823 |
12 |
ReplyAction = {reply, From, {ok, FutureStateData}}, |
824 |
12 |
gen_statem:reply(ReplyAction). |
825 |
|
|
826 |
|
%% The dying c2s will keep no buffer and will only know of the session that resumed him off, |
827 |
|
%% to reroute him messages later |
828 |
|
-spec sm_state_to_keep(sm_state(), gen_statem:from()) -> sm_state(). |
829 |
|
sm_state_to_keep(SmState, From) -> |
830 |
12 |
SmState#sm_state{buffer = [], buffer_size = 0, peer = {gen_statem, From}}. |
831 |
|
|
832 |
|
%% |
833 |
|
%% API for `mongoose_c2s' |
834 |
|
%% |
835 |
|
|
836 |
|
-spec recover_messages(sm_state()) -> sm_state(). |
837 |
|
recover_messages(SmState) -> |
838 |
13 |
receive |
839 |
|
{route, Acc} -> |
840 |
1 |
recover_messages(maybe_buffer_acc(SmState, Acc, is_message(mongoose_acc:stanza_name(Acc)))) |
841 |
|
after 0 -> |
842 |
12 |
SmState |
843 |
|
end. |
844 |
|
|
845 |
|
-spec maybe_buffer_acc(sm_state(), mongoose_acc:t(), boolean()) -> sm_state(). |
846 |
|
maybe_buffer_acc(#sm_state{buffer = Buffer, buffer_size = BufferSize} = SmState, Acc, true) -> |
847 |
1 |
SmState#sm_state{buffer = [Acc | Buffer], buffer_size = BufferSize + 1}; |
848 |
|
maybe_buffer_acc(SmState, _Acc, false) -> |
849 |
:-( |
SmState. |
850 |
|
|
851 |
|
%% IQs and presences are allowed to come to the same SID only |
852 |
|
-spec is_message(binary()) -> boolean(). |
853 |
1 |
is_message(<<"message">>) -> true; |
854 |
:-( |
is_message(_) -> false. |
855 |
|
|
856 |
|
-spec is_sm_element(mongoose_acc:t()) -> boolean(). |
857 |
|
is_sm_element(Acc) -> |
858 |
23607 |
El = mongoose_acc:element(Acc), |
859 |
23607 |
?NS_STREAM_MGNT_3 =:= exml_query:attr(El, <<"xmlns">>). |
860 |
|
|
861 |
|
-spec make_smid() -> smid(). |
862 |
|
make_smid() -> |
863 |
44 |
base64:encode(crypto:strong_rand_bytes(21)). |
864 |
|
|
865 |
|
%% Getters |
866 |
|
-spec get_session_from_smid(mongooseim:host_type(), smid()) -> maybe_smid(). |
867 |
|
get_session_from_smid(HostType, SMID) -> |
868 |
23 |
case get_sid(HostType, SMID) of |
869 |
15 |
{sid, SID} -> {sid, SID}; |
870 |
|
{error, smid_not_found} -> |
871 |
8 |
get_stale_h(HostType, SMID) |
872 |
|
end. |
873 |
|
|
874 |
|
-spec get_stale_h(mongooseim:host_type(), smid()) -> |
875 |
|
{stale_h, non_neg_integer()} | {error, smid_not_found}. |
876 |
|
get_stale_h(HostType, SMID) -> |
877 |
29 |
case is_stale_h_enabled(HostType) of |
878 |
4 |
false -> {error, smid_not_found}; |
879 |
25 |
true -> read_stale_h(HostType, SMID) |
880 |
|
end. |
881 |
|
|
882 |
|
-spec get_buffer_max(mongooseim:host_type()) -> buffer_max(). |
883 |
|
get_buffer_max(HostType) -> |
884 |
82 |
gen_mod:get_module_opt(HostType, ?MODULE, buffer_max). |
885 |
|
|
886 |
|
-spec get_ack_freq(mongooseim:host_type()) -> ack_freq(). |
887 |
|
get_ack_freq(HostType) -> |
888 |
82 |
gen_mod:get_module_opt(HostType, ?MODULE, ack_freq). |
889 |
|
|
890 |
|
-spec get_resume_timeout(mongooseim:host_type()) -> pos_integer(). |
891 |
|
get_resume_timeout(HostType) -> |
892 |
60 |
timer:seconds(gen_mod:get_module_opt(HostType, ?MODULE, resume_timeout)). |
893 |
|
|
894 |
|
-spec register_stale_smid_h(mongooseim:host_type(), smid(), short()) -> ok | {error, any()}. |
895 |
|
register_stale_smid_h(HostType, SMID, H) -> |
896 |
58 |
case is_stale_h_enabled(HostType) of |
897 |
50 |
false -> ok; |
898 |
8 |
true -> write_stale_h(HostType, SMID, H) |
899 |
|
end. |
900 |
|
|
901 |
|
-spec remove_stale_smid_h(mongooseim:host_type(), smid()) -> ok | {error, any()}. |
902 |
|
remove_stale_smid_h(HostType, SMID) -> |
903 |
:-( |
case is_stale_h_enabled(HostType) of |
904 |
:-( |
false -> ok; |
905 |
:-( |
true -> delete_stale_h(HostType, SMID) |
906 |
|
end. |
907 |
|
|
908 |
|
-spec is_stale_h_enabled(mongooseim:host_type()) -> boolean(). |
909 |
|
is_stale_h_enabled(HostType) -> |
910 |
87 |
gen_mod:get_module_opt(HostType, ?MODULE, [stale_h, enabled]). |
911 |
|
|
912 |
|
%% Backend operations |
913 |
|
|
914 |
|
-spec register_smid(HostType, SMID, SID) -> |
915 |
|
ok | {error, term()} when |
916 |
|
HostType :: mongooseim:host_type(), |
917 |
|
SMID :: smid(), |
918 |
|
SID :: ejabberd_sm:sid(). |
919 |
|
register_smid(HostType, SMID, SID) -> |
920 |
57 |
mod_stream_management_backend:register_smid(HostType, SMID, SID). |
921 |
|
|
922 |
|
-spec unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) -> |
923 |
|
{ok, SMID :: smid()} | {error, smid_not_found}. |
924 |
|
unregister_smid(HostType, SID) -> |
925 |
149 |
mod_stream_management_backend:unregister_smid(HostType, SID). |
926 |
|
|
927 |
|
-spec get_sid(mongooseim:host_type(), smid()) -> |
928 |
|
{sid, ejabberd_sm:sid()} | {error, smid_not_found}. |
929 |
|
get_sid(HostType, SMID) -> |
930 |
24 |
mod_stream_management_backend:get_sid(HostType, SMID). |
931 |
|
|
932 |
|
%% stale_h |
933 |
|
|
934 |
|
-spec write_stale_h(HostType, SMID, H) -> ok | {error, any()} when |
935 |
|
HostType :: mongooseim:host_type(), |
936 |
|
SMID :: smid(), |
937 |
|
H :: non_neg_integer(). |
938 |
|
write_stale_h(HostType, SMID, H) -> |
939 |
8 |
mod_stream_management_backend:write_stale_h(HostType, SMID, H). |
940 |
|
|
941 |
|
-spec delete_stale_h(HostType, SMID) -> ok | {error, any()} when |
942 |
|
HostType :: mongooseim:host_type(), |
943 |
|
SMID :: smid(). |
944 |
|
delete_stale_h(HostType, SMID) -> |
945 |
:-( |
mod_stream_management_backend:delete_stale_h(HostType, SMID). |
946 |
|
|
947 |
|
-spec read_stale_h(HostType, SMID) -> |
948 |
|
{stale_h, non_neg_integer()} | {error, smid_not_found} when |
949 |
|
HostType :: mongooseim:host_type(), |
950 |
|
SMID :: smid(). |
951 |
|
read_stale_h(HostType, SMID) -> |
952 |
25 |
mod_stream_management_backend:read_stale_h(HostType, SMID). |