./ct_report/coverage/mod_stream_management.COVER.html

1 -module(mod_stream_management).
2 -xep([{xep, 198}, {version, "1.6.1"}]).
3 -behaviour(gen_mod).
4 -behaviour(gen_statem).
5 -behaviour(mongoose_module_metrics).
6
7 %% `gen_mod' callbacks
8 -export([start/2,
9 stop/1,
10 hooks/1,
11 config_spec/0,
12 supported_features/0,
13 process_buffer_and_ack/1]).
14
15 %% hooks handlers
16 -export([c2s_stream_features/3,
17 session_cleanup/3,
18 user_send_packet/3,
19 user_receive_packet/3,
20 xmpp_presend_element/3,
21 user_send_xmlel/3,
22 foreign_event/3,
23 handle_user_stopping/3,
24 user_terminate/3,
25 reroute_unacked_messages/3
26 ]).
27
28 %% gen_statem callbacks
29 -export([callback_mode/0, init/1, handle_event/4, terminate/3]).
30
31 %% helpers
32 -export([handle_resume/3, if_not_already_enabled_create_sm_state/1]).
33 -export([stream_error/2, register_smid_return_enabled_stanza/1]).
34
35 %% API for inspection and tests
36 -export([get_sid/2, get_stale_h/2, get_session_from_smid/2,
37 register_smid/3, register_stale_smid_h/3, remove_stale_smid_h/2]).
38 -ignore_xref([get_sid/2, get_stale_h/2, get_session_from_smid/2,
39 register_smid/3, register_stale_smid_h/3, remove_stale_smid_h/2]).
40
41 -include("mongoose.hrl").
42 -include("jlib.hrl").
43 -include("mongoose_config_spec.hrl").
44 -define(STREAM_MGMT_H_MAX, (1 bsl 32 - 1)).
45 -define(CONSTRAINT_CHECK_TIMEOUT, 5000). %% 5 seconds
46 -define(IS_STREAM_MGMT_STOP(R), R =:= {shutdown, ?MODULE}; R =:= {shutdown, resumed}).
47 -define(IS_ALLOWED_STATE(S), S =:= wait_for_session_establishment; S =:= session_established).
48
49 -record(sm_state, {
50 buffer = [] :: [mongoose_acc:t()],
51 buffer_size = 0 :: non_neg_integer(),
52 counter_in = 0 :: short(),
53 counter_out = 0 :: short(),
54 buffer_max = 100 :: buffer_max(),
55 ack_freq = 1 :: ack_freq(),
56 peer :: undefined | {gen_statem, gen_statem:from()} | {sid, ejabberd_sm:sid()}
57 }).
58
59 -type sm_state() :: #sm_state{}.
60 -type maybe_sm_state() :: {error, not_found} | #sm_state{}.
61 -type c2s_state() :: mongoose_c2s:state(resume_session).
62
63 -type buffer_max() :: pos_integer() | infinity | no_buffer.
64 -type ack_freq() :: pos_integer() | never.
65
66 -type resume_return() :: {ok, #{resumed := exml:element(),
67 forward := [exml:element()],
68 c2s_state := mongoose_c2s:state(),
69 c2s_data := mongoose_c2s:data()}}
70 | {stream_mgmt_error, exml:element()}
71 | {error, exml:element()}
72 | {error, exml:element(), term()}.
73
74 %% Type base64:ascii_binary() is not exported
75 -type smid() :: binary().
76 -type short() :: 0..?STREAM_MGMT_H_MAX.
77
78 -type maybe_smid() :: {sid, ejabberd_sm:sid()}
79 | {stale_h, non_neg_integer()}
80 | {error, smid_not_found}.
81
82 -export_type([smid/0, short/0, sm_state/0, c2s_state/0]).
83
84 %%
85 %% `gen_mod' callbacks
86 %%
87
%% `gen_mod' start callback: initialises the stream management backend for
%% the given host type and then logs that the module is starting.
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
start(HostType, ModuleOpts) ->
    mod_stream_management_backend:init(HostType, ModuleOpts),
    ?LOG_INFO(#{what => stream_management_starting}).
92
%% `gen_mod' stop callback: logs the shutdown first, then stops the backend
%% for the given host type. Always returns `ok'.
-spec stop(mongooseim:host_type()) -> ok.
stop(HostType) ->
    ?LOG_INFO(#{what => stream_management_stopping}),
    mod_stream_management_backend:stop(HostType),
    ok.
98
%% Full hook list of the module: the two hooks declared here, followed by the
%% hooks of the SASL2 companion module and finally the c2s hooks.
-spec hooks(mongooseim:host_type()) -> gen_hook:hook_list().
hooks(HostType) ->
    OwnHooks = [{c2s_stream_features, HostType, fun ?MODULE:c2s_stream_features/3, #{}, 50},
                {session_cleanup, HostType, fun ?MODULE:session_cleanup/3, #{}, 50}],
    Sasl2Hooks = mod_stream_management_sasl2:hooks(HostType),
    C2SHooks = c2s_hooks(HostType),
    OwnHooks ++ Sasl2Hooks ++ C2SHooks.
104
%% Hooks run inside the c2s process. Each row of the table below is
%% `{HookName, Handler, Priority}'; it is expanded into the five-element
%% tuple expected by `gen_hook', with the host type as tag and empty extra.
-spec c2s_hooks(mongooseim:host_type()) -> gen_hook:hook_list(mongoose_c2s_hooks:fn()).
c2s_hooks(HostType) ->
    Table = [{user_send_packet, fun ?MODULE:user_send_packet/3, 20},
             {user_receive_packet, fun ?MODULE:user_receive_packet/3, 10},
             {xmpp_presend_element, fun ?MODULE:xmpp_presend_element/3, 50},
             {user_send_xmlel, fun ?MODULE:user_send_xmlel/3, 50},
             {foreign_event, fun ?MODULE:foreign_event/3, 50},
             %% All three ways of losing the connection share one handler.
             {user_stop_request, fun ?MODULE:handle_user_stopping/3, 100},
             {user_socket_closed, fun ?MODULE:handle_user_stopping/3, 100},
             {user_socket_error, fun ?MODULE:handle_user_stopping/3, 100},
             {user_terminate, fun ?MODULE:user_terminate/3, 50},
             {reroute_unacked_messages, fun ?MODULE:reroute_unacked_messages/3, 80}],
    [{HookName, HostType, Handler, #{}, Priority} || {HookName, Handler, Priority} <- Table].
119
%% Configuration schema of the module. After parsing, the options are passed
%% through `process_buffer_and_ack/1'.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    Flag = #option{type = boolean},
    PositiveInteger = #option{type = integer, validate = positive},
    Items = #{<<"backend">> => #option{type = atom, validate = {module, ?MODULE}},
              <<"buffer">> => Flag,
              <<"buffer_max">> => #option{type = int_or_infinity, validate = positive},
              <<"ack">> => Flag,
              <<"ack_freq">> => PositiveInteger,
              <<"resume_timeout">> => PositiveInteger,
              <<"stale_h">> => stale_h_config_spec()},
    Defaults = #{<<"backend">> => mnesia,
                 <<"buffer">> => true,
                 <<"buffer_max">> => 100,
                 <<"ack">> => true,
                 <<"ack_freq">> => 1,
                 <<"resume_timeout">> => 600}, % seconds
    #section{items = Items,
             process = fun ?MODULE:process_buffer_and_ack/1,
             defaults = Defaults}.
143
%% Features advertised by this module: it declares support for dynamic domains.
-spec supported_features() -> [atom()].
supported_features() ->
    [dynamic_domains].
145
%% Post-processing step of the config section: folds the boolean `buffer' and
%% `ack' switches into the `buffer_max' and `ack_freq' options (see
%% `check_buffer/2' and `check_ack/2'). Both keys must be present in the map.
-spec process_buffer_and_ack(map()) -> map().
process_buffer_and_ack(Opts = #{buffer := Buffer, ack := Ack}) ->
    check_ack(check_buffer(Opts, Buffer), Ack).
149
%% When buffering is switched off (`false'), `buffer_max' is forced to the
%% marker atom `no_buffer'; for any other value the options are unchanged.
check_buffer(Opts, Buffer) when Buffer =/= false ->
    Opts;
check_buffer(Opts, false) ->
    Opts#{buffer_max => no_buffer}.
154
%% When ack requests are switched off (`false'), `ack_freq' is forced to the
%% marker atom `never'; for any other value the options are unchanged.
check_ack(Opts, Ack) when Ack =/= false ->
    Opts;
check_ack(Opts, false) ->
    Opts#{ack_freq => never}.
159
%% Schema of the nested `stale_h' section. The section is always included,
%% so its defaults apply even when it is absent from the configuration.
stale_h_config_spec() ->
    PositiveInteger = #option{type = integer, validate = positive},
    Items = #{<<"enabled">> => #option{type = boolean},
              <<"repeat_after">> => PositiveInteger,
              <<"geriatric">> => PositiveInteger},
    Defaults = #{<<"enabled">> => false,
                 <<"repeat_after">> => 1800, % seconds
                 <<"geriatric">> => 3600},   % seconds
    #section{items = Items,
             include = always,
             defaults = Defaults}.
173
174 %%
175 %% hooks handlers
176 %%
177
%% Counts a stanza received from the user: when stream management is enabled
%% for this session and the element is not a stream management element
%% itself, `counter_in' is advanced and the new state is stored in the
%% accumulator. In every other case the accumulator passes through untouched.
-spec user_send_packet(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
    mongoose_c2s_hooks:result().
user_send_packet(Acc, #{c2s_data := StateData}, _Extra) ->
    MaybeSmState = get_mod_state(StateData),
    IsSmElement = is_sm_element(Acc),
    case {MaybeSmState, IsSmElement} of
        {#sm_state{counter_in = Handled} = SmState, false} ->
            Counted = SmState#sm_state{counter_in = incr_counter(Handled)},
            {ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, Counted})};
        {_, _} ->
            {ok, Acc}
    end;
user_send_packet(Acc, _Params, _Extra) ->
    {ok, Acc}.
190
%% Guards against delivering a stanza to the wrong connection. Two checks are
%% always evaluated (origin SID first, receiver SID second); if either reports
%% a conflict, the stanza is dropped with a warning and the hook chain stops.
-spec user_receive_packet(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
    mongoose_c2s_hooks:result().
user_receive_packet(Acc, #{c2s_data := StateData, c2s_state := C2SState}, _) ->
    OriginConflict = is_conflict_incoming_acc(Acc, StateData),
    ReceiverConflict = is_conflict_receiver_sid(Acc, StateData),
    if
        OriginConflict =:= true ->
            %% A race condition detected: same jid, but different sids
            C2SSid = mongoose_c2s:get_sid(StateData),
            OriginSid = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
            ?LOG_WARNING(#{what => conflict_check_failed,
                           text => <<"Drop Acc that is addressed to another connection "
                                     "(origin SID check failed)">>,
                           c2s_sid => C2SSid, origin_sid => OriginSid,
                           acc => Acc, state_name => C2SState, c2s_state => StateData}),
            {stop, Acc};
        ReceiverConflict =:= true ->
            C2SSid = mongoose_c2s:get_sid(StateData),
            ReceiverSID = mongoose_acc:get(c2s, receiver_sid, undefined, Acc),
            ?LOG_WARNING(#{what => conflict_check_failed,
                           text => <<"Drop Acc that is addressed to another connection "
                                     "(receiver SID check failed)">>,
                           c2s_sid => C2SSid, receiver_sid => ReceiverSID,
                           acc => Acc, state_name => C2SState, c2s_state => StateData}),
            {stop, Acc};
        true ->
            {ok, Acc}
    end;
user_receive_packet(Acc, _Params, _Extra) ->
    {ok, Acc}.
220
%% Called right before an element is written to the socket.
%%  - stream management not enabled: nothing to do;
%%  - stanza of type `probe': passed through without buffering;
%%  - buffering disabled (`no_buffer'): only the ack request logic runs;
%%  - otherwise the stanza is buffered and the ack request logic runs.
-spec xmpp_presend_element(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
    mongoose_c2s_hooks:result().
xmpp_presend_element(Acc, #{c2s_data := StateData, c2s_state := C2SState}, _Extra) ->
    MaybeSmState = get_mod_state(StateData),
    StanzaType = mongoose_acc:stanza_type(Acc),
    case {MaybeSmState, StanzaType} of
        {{error, not_found}, _} ->
            {ok, Acc};
        {_, <<"probe">>} ->
            {ok, Acc};
        {#sm_state{buffer_max = no_buffer} = Unbuffered, _} ->
            maybe_send_ack_request(Acc, Unbuffered);
        {Buffered, _} ->
            UserJid = mongoose_c2s:get_jid(StateData),
            handle_buffer_and_ack(Acc, C2SState, UserJid, Buffered)
    end.
235
%% Prepends the outgoing stanza to the unacked buffer. If the buffer becomes
%% larger than `buffer_max', a `check_buffer_full' timeout is scheduled
%% (handled in `foreign_event/3'), giving the client
%% ?CONSTRAINT_CHECK_TIMEOUT milliseconds to acknowledge before the check runs.
%%
%% Note that the buffered copy (possibly transformed by the "unacknowledged
%% message" notification) is distinct from the accumulator that continues
%% down the hook chain, which is derived from the incoming `Acc'.
-spec handle_buffer_and_ack(mongoose_acc:t(), c2s_state(), jid:jid(), sm_state()) ->
    mongoose_c2s_hooks:result().
handle_buffer_and_ack(Acc, C2SState, Jid, #sm_state{buffer = Unacked,
                                                    buffer_max = Limit,
                                                    buffer_size = Size} = SmState) ->
    GrownSize = Size + 1,
    TimerActions = case is_buffer_full(GrownSize, Limit) of
                       false ->
                           [];
                       true ->
                           {{timeout, ?MODULE}, ?CONSTRAINT_CHECK_TIMEOUT, check_buffer_full}
                   end,
    ToBuffer = notify_unacknowledged_msg_if_in_resume_state(Acc, Jid, C2SState),
    GrownState = SmState#sm_state{buffer = [ToBuffer | Unacked], buffer_size = GrownSize},
    AccWithActions = mongoose_c2s_acc:to_acc(Acc, actions, TimerActions),
    maybe_send_ack_request(AccWithActions, GrownState).
251
%% While the session is in the `resume_session' state, a stanza being
%% buffered goes through `maybe_notify_unacknowledged_msg/2'. In any other
%% state the accumulator is returned as is.
notify_unacknowledged_msg_if_in_resume_state(Acc, Jid, ?EXT_C2S_STATE(resume_session)) ->
    maybe_notify_unacknowledged_msg(Acc, Jid);
notify_unacknowledged_msg_if_in_resume_state(Acc, _Jid, _OtherState) ->
    Acc.
256
%% True when the given size exceeds the configured maximum. An `infinity'
%% limit is never exceeded.
-spec is_buffer_full(non_neg_integer(), buffer_max()) -> boolean().
is_buffer_full(_BufferSize, infinity) ->
    false;
is_buffer_full(BufferSize, BufferMax) ->
    BufferSize > BufferMax.
264
%% Stores the stream management state in the accumulator and, whenever the
%% number of stanzas sent so far (`counter_out + buffer_size') is a multiple
%% of `ack_freq', additionally queues an ack request to be sent to the client.
%%
%% `ack_freq' can be the atom `never' (set by `check_ack/2' when acks are
%% disabled). The guard tests for that explicitly and first, so that `rem' is
%% only evaluated on an integer frequency. Previously the guard compared the
%% atom literal `ack_freq' with `never', which is always true, and the
%% `never' case was skipped only because `rem' on an atom makes a guard fail.
-spec maybe_send_ack_request(mongoose_acc:t(), sm_state()) ->
    mongoose_c2s_hooks:result().
maybe_send_ack_request(Acc, #sm_state{buffer_size = BufferSize,
                                      counter_out = Out,
                                      ack_freq = AckFreq} = SmState)
  when AckFreq =/= never, 0 =:= (Out + BufferSize) rem AckFreq ->
    Stanza = mod_stream_management_stanzas:stream_mgmt_request(),
    ToAcc = [{socket_send, Stanza}, {state_mod, {?MODULE, SmState}}],
    {ok, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)};
maybe_send_ack_request(Acc, SmState) ->
    {ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, SmState})}.
276
%% Handles a non-stanza element sent by the user. Elements in the stream
%% management namespace are dispatched to `handle_stream_mgmt/3'; everything
%% else is treated like a regular packet by `user_send_packet/3'.
-spec user_send_xmlel(Acc, Params, Extra) -> Result when
      Acc :: mongoose_acc:t(),
      Params :: mongoose_c2s_hooks:params(),
      Extra :: gen_hook:extra(),
      Result :: mongoose_c2s_hooks:result().
user_send_xmlel(Acc, Params, Extra) ->
    Element = mongoose_acc:element(Acc),
    Namespace = exml_query:attr(Element, <<"xmlns">>),
    case Namespace of
        ?NS_STREAM_MGNT_3 ->
            handle_stream_mgmt(Acc, Params, Element);
        _ ->
            user_send_packet(Acc, Params, Extra)
    end.
290
%% Handles events addressed to this module inside the c2s process:
%%  - a `{resume, H}' call tagged with this module: the result of
%%    `handle_resume_call/3' decides whether the process stops with
%%    `{shutdown, resumed}' or with `bad_stream_management_request';
%%  - the `check_buffer_full' timeout: if the buffer is still over the limit,
%%    a resource-constraint stream error is sent and the process stops;
%%  - anything else is ignored.
-spec foreign_event(Acc, Params, Extra) -> Result when
      Acc :: mongoose_acc:t(),
      Params :: mongoose_c2s_hooks:params(),
      Extra :: gen_hook:extra(),
      Result :: mongoose_c2s_hooks:result().
foreign_event(Acc, #{c2s_data := StateData,
                     event_type := {call, From},
                     event_tag := ?MODULE,
                     event_content := {resume, H}}, _Extra) ->
    case handle_resume_call(StateData, From, H) of
        error ->
            {stop, mongoose_c2s_acc:to_acc(Acc, hard_stop, bad_stream_management_request)};
        {ok, SmState} ->
            FlushedStateData = mongoose_c2s:merge_mod_state(StateData, #{?MODULE => SmState}),
            ToAcc = [{c2s_data, FlushedStateData}, {hard_stop, {shutdown, resumed}}],
            {stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)}
    end;
foreign_event(Acc, #{c2s_data := StateData,
                     event_type := {timeout, ?MODULE},
                     event_content := check_buffer_full}, _Extra) ->
    #sm_state{buffer_size = Size, buffer_max = Limit} = get_mod_state(StateData),
    case is_buffer_full(Size, Limit) of
        false ->
            {ok, Acc};
        true ->
            Lang = mongoose_c2s:get_lang(StateData),
            Err = mongoose_xmpp_errors:stream_resource_constraint(
                    Lang, <<"too many unacked stanzas">>),
            mongoose_c2s:c2s_stream_error(StateData, Err),
            {stop, mongoose_c2s_acc:to_acc(Acc, hard_stop, too_many_unacked_stanzas)}
    end;
foreign_event(Acc, _Params, _Extra) ->
    {ok, Acc}.
321
%% Runs when the user's connection is going away (stop request, socket closed
%% or socket error). Without stream management the event is passed on. With
%% it, the process switches to the `resume_session' state instead of
%% terminating: this module becomes the callback module, a `resume_timeout'
%% timer is armed, the process hibernates, and the hook chain is stopped.
-spec handle_user_stopping(Acc, Params, Extra) -> Result when
      Acc :: mongoose_acc:t(),
      Params :: mongoose_c2s_hooks:params(),
      Extra :: gen_hook:extra(),
      Result :: mongoose_c2s_hooks:result().
handle_user_stopping(Acc, #{c2s_data := StateData}, #{host_type := HostType}) ->
    case get_mod_state(StateData) of
        {error, not_found} ->
            {ok, Acc};
        SmState ->
            ResumeTimeout = get_resume_timeout(HostType),
            NotifiedState = notify_unacknowledged_messages(Acc, StateData, SmState),
            Actions = [{push_callback_module, ?MODULE},
                       {{timeout, ?MODULE}, ResumeTimeout, resume_timeout},
                       hibernate],
            ToAcc = [{c2s_state, ?EXT_C2S_STATE(resume_session)},
                     {actions, Actions},
                     {state_mod, {?MODULE, NotifiedState}}],
            {stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)}
    end.
338
%% Runs `maybe_notify_unacknowledged_msg/2' over every buffered stanza and
%% stores the results back in the same positions. The buffer is traversed
%% from its last element to its first; since new stanzas are prepended to
%% the buffer elsewhere in this module, that means oldest first.
-spec notify_unacknowledged_messages(mongoose_acc:t(), mongoose_c2s:data(), sm_state()) ->
    sm_state().
notify_unacknowledged_messages(_Acc, StateData, #sm_state{buffer = Buffer} = SmState) ->
    Jid = mongoose_c2s:get_jid(StateData),
    Notify = fun(Buffered, Done) -> [maybe_notify_unacknowledged_msg(Buffered, Jid) | Done] end,
    SmState#sm_state{buffer = lists:foldr(Notify, [], Buffer)}.
345
%% Only stanzas named `message' trigger the unacknowledged-message
%% notification; every other accumulator is returned unchanged.
-spec maybe_notify_unacknowledged_msg(mongoose_acc:t(), jid:jid()) -> mongoose_acc:t().
maybe_notify_unacknowledged_msg(Acc, Jid) ->
    case mongoose_acc:stanza_name(Acc) =:= <<"message">> of
        true -> notify_unacknowledged_msg(Acc, Jid);
        false -> Acc
    end.
352
%% Runs the `unacknowledged_message' hook for the stanza and returns the
%% resulting accumulator after passing it through `mongoose_acc:strip/1'.
-spec notify_unacknowledged_msg(mongoose_acc:t(), jid:jid()) -> mongoose_acc:t().
notify_unacknowledged_msg(Acc, Jid) ->
    mongoose_acc:strip(mongoose_hooks:unacknowledged_message(Acc, Jid)).
357
%% Hook handler: looks up the stream management state of the session and
%% delegates to `maybe_handle_stream_mgmt_reroute/5'.
-spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
    mongoose_c2s_hooks:result().
reroute_unacked_messages(Acc,
                         #{c2s_data := StateData, reason := Reason},
                         #{host_type := HostType}) ->
    maybe_handle_stream_mgmt_reroute(Acc, StateData, HostType, Reason,
                                     get_mod_state(StateData)).
363
%% Stops the `user_terminate' hook chain when the termination reason is one
%% of the two shutdown reasons produced by this module
%% (`{shutdown, ?MODULE}' or `{shutdown, resumed}'); otherwise lets it go on.
-spec user_terminate(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
    mongoose_c2s_hooks:result().
user_terminate(Acc, #{reason := Reason}, _Extra)
  when ?IS_STREAM_MGMT_STOP(Reason) ->
    %% We stop here because this termination was triggered internally
    {stop, Acc};
user_terminate(Acc, _Params, _Extra) ->
    {ok, Acc}.
370
%% Reroutes the unacked buffer when the session has stream management state,
%% and stores the emptied state back in the accumulator. Sessions without
%% stream management are left alone.
%%
%% NOTE(review): for a termination triggered by this module the SMID is
%% removed here and then again inside `handle_user_terminate/3', with the
%% same arguments. This looks redundant, but `do_remove_smid/3' is not
%% visible here, so the double call is preserved - confirm before removing.
-spec maybe_handle_stream_mgmt_reroute(
        mongoose_acc:t(), mongoose_c2s:data(), mongooseim:host_type(), term(), maybe_sm_state()) ->
    mongoose_c2s_hooks:result().
maybe_handle_stream_mgmt_reroute(Acc, _StateData, _HostType, _Reason, {error, not_found}) ->
    {ok, Acc};
maybe_handle_stream_mgmt_reroute(Acc, StateData, HostType, Reason,
                                 #sm_state{counter_in = H} = SmState)
  when ?IS_STREAM_MGMT_STOP(Reason) ->
    do_remove_smid(HostType, mongoose_c2s:get_sid(StateData), H),
    Emptied = handle_user_terminate(SmState, StateData, HostType),
    {ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, Emptied})};
maybe_handle_stream_mgmt_reroute(Acc, StateData, HostType, _Reason, #sm_state{} = SmState) ->
    Emptied = handle_user_terminate(SmState, StateData, HostType),
    {ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, Emptied})}.
385
%% Final cleanup for a session with stream management: removes the SMID
%% registration (passing the current `counter_in'), reroutes whatever is left
%% in the buffer, and returns the state with an empty buffer.
-spec handle_user_terminate(sm_state(), mongoose_c2s:data(), mongooseim:host_type()) -> sm_state().
handle_user_terminate(#sm_state{counter_in = H} = SmState, StateData, HostType) ->
    do_remove_smid(HostType, mongoose_c2s:get_sid(StateData), H),
    reroute_buffer(StateData, SmState),
    SmState#sm_state{buffer = [], buffer_size = 0}.
392
%% Reroutes the unacked buffer. When the state records a `gen_statem' peer,
%% the buffer is sent to that peer's pid; otherwise it is rerouted through
%% the generic `mongoose_c2s:reroute_buffer/2'.
reroute_buffer(StateData, #sm_state{buffer = Unacked, peer = {gen_statem, {PeerPid, _Tag}}}) ->
    mongoose_c2s:reroute_buffer_to_pid(StateData, PeerPid, Unacked);
reroute_buffer(StateData, #sm_state{buffer = Unacked}) ->
    mongoose_c2s:reroute_buffer(StateData, Unacked).
397
%% `gen_statem' terminate callback used while this module is the callback
%% module. The original reason is only logged; `mongoose_c2s:terminate/3' is
%% always invoked with `{shutdown, ?MODULE}' as the reason.
-spec terminate(term(), c2s_state(), mongoose_c2s:data()) -> term().
terminate(Reason, C2SState, StateData) ->
    ?LOG_DEBUG(#{what => stream_mgmt_statem_terminate,
                 reason => Reason,
                 c2s_state => C2SState,
                 c2s_data => StateData}),
    mongoose_c2s:terminate({shutdown, ?MODULE}, C2SState, StateData).
403
%% Dispatches a stream management element by its name and the current c2s
%% state:
%%  - `a', `r' and `enable' are accepted in the states allowed by
%%    ?IS_ALLOWED_STATE;
%%  - `resume' is accepted only in `{wait_for_feature_after_auth, _}';
%%  - any other combination is reported via `unexpected_sm_request/3'.
-spec handle_stream_mgmt(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) ->
    mongoose_c2s_hooks:result().
handle_stream_mgmt(Acc, #{c2s_state := C2SState} = Params, #xmlel{name = <<"a">>} = Request)
  when ?IS_ALLOWED_STATE(C2SState) ->
    handle_a(Acc, Params, Request);
handle_stream_mgmt(Acc, #{c2s_state := C2SState} = Params, #xmlel{name = <<"r">>})
  when ?IS_ALLOWED_STATE(C2SState) ->
    handle_r(Acc, Params);
handle_stream_mgmt(Acc, #{c2s_state := C2SState} = Params, #xmlel{name = <<"enable">>} = Request)
  when ?IS_ALLOWED_STATE(C2SState) ->
    handle_enable(Acc, Params, Request);
handle_stream_mgmt(Acc,
                   #{c2s_state := {wait_for_feature_after_auth, _}} = Params,
                   #xmlel{name = <<"resume">>} = Request) ->
    handle_resume_request(Acc, Params, Request);
handle_stream_mgmt(Acc, #{c2s_data := StateData, c2s_state := C2SState}, _Request) ->
    unexpected_sm_request(Acc, StateData, C2SState);
handle_stream_mgmt(Acc, _Params, _Request) ->
    {ok, Acc}.
421
%% Answers an ack request from the client with an ack stanza built from the
%% current `counter_in'. If stream management is not enabled for the session,
%% the request is logged and otherwise ignored.
-spec handle_r(mongoose_acc:t(), mongoose_c2s_hooks:params()) ->
    mongoose_c2s_hooks:result().
handle_r(Acc, #{c2s_data := StateData}) ->
    case get_mod_state(StateData) of
        #sm_state{counter_in = Handled} ->
            Ack = mod_stream_management_stanzas:stream_mgmt_ack(Handled),
            {ok, mongoose_c2s_acc:to_acc(Acc, socket_send, Ack)};
        {error, not_found} ->
            ?LOG_WARNING(#{what => unexpected_r, c2s_state => StateData,
                           text => <<"received <r/> but stream management is off!">>}),
            {ok, Acc}
    end.
434
%% Processes an ack from the client.
%%  - stream management not enabled: ignored;
%%  - `h' could not be parsed: a policy-violation stream error is sent and
%%    the connection is stopped;
%%  - otherwise the acknowledged stanzas are dropped from the buffer (see
%%    `do_handle_ack/2') and the outcome is finalised by `finish_handle_a/3'.
-spec handle_a(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) ->
    mongoose_c2s_hooks:result().
handle_a(Acc, #{c2s_data := StateData}, El) ->
    MaybeSmState = get_mod_state(StateData),
    ParsedH = stream_mgmt_parse_h(El),
    case {MaybeSmState, ParsedH} of
        {{error, not_found}, _} ->
            {ok, Acc};
        {_, invalid_h_attribute} ->
            Violation = mongoose_xmpp_errors:policy_violation(?MYLANG, <<"Invalid h attribute">>),
            mongoose_c2s:c2s_stream_error(StateData, Violation),
            {stop, mongoose_c2s_acc:to_acc(Acc, hard_stop, invalid_h_attribute)};
        {SmState, H} ->
            finish_handle_a(StateData, Acc, do_handle_ack(SmState, H))
    end.
449
%% Finalises ack handling from the result of `do_handle_ack/2':
%%  - `{error, ErrorStanza, Reason}': the stream error is sent to the client
%%    and the hook chain stops, requesting a stop with `Reason';
%%  - an updated `#sm_state{}': stored in the accumulator.
%%
%% The spec now names the three-element error tuple that the first clause
%% actually matches and that `do_handle_ack/2' returns; it previously
%% declared a two-element `{error, term()}' that no clause accepts.
-spec finish_handle_a(mongoose_c2s:data(), mongoose_acc:t(),
                      sm_state() | {error, exml:element(), term()}) ->
    mongoose_c2s_hooks:result().
finish_handle_a(StateData, Acc, {error, ErrorStanza, Reason}) ->
    mongoose_c2s:c2s_stream_error(StateData, ErrorStanza),
    {stop, mongoose_c2s_acc:to_acc(Acc, stop, Reason)};
finish_handle_a(_StateData, Acc, #sm_state{} = NewSmState) ->
    {ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewSmState})}.
457
%% Applies an acknowledged count `Acked' to the state. The number of newly
%% acknowledged stanzas is the distance from the previous `counter_out'
%% (`calc_to_drop/2'). If the client claims more than is buffered, an
%% `undefined-condition' stream error carrying the handled-count element is
%% returned. Otherwise that many elements are cut off the tail of the
%% buffer and `counter_out' is updated.
-spec do_handle_ack(sm_state(), non_neg_integer()) -> sm_state() | {error, exml:element(), term()}.
do_handle_ack(#sm_state{counter_out = OldAcked,
                        buffer_size = BufferSize,
                        buffer = Buffer} = SmState, Acked) ->
    ToDrop = calc_to_drop(Acked, OldAcked),
    if
        BufferSize < ToDrop ->
            #xmlel{children = Children} = BaseError =
                mongoose_xmpp_errors:undefined_condition(
                  ?MYLANG, <<"You acknowledged more stanzas that what has been sent">>),
            HandledCount = mod_stream_management_stanzas:sm_handled_count_too_high_stanza(
                             Acked, OldAcked),
            ErrorStanza = BaseError#xmlel{children = [HandledCount | Children]},
            {error, ErrorStanza, {shutdown, sm_handled_count_too_high_stanza}};
        true ->
            Remaining = BufferSize - ToDrop,
            %% Keep the first `Remaining' elements of the buffer; the rest was acked.
            {StillUnacked, _Acked} = lists:split(Remaining, Buffer),
            SmState#sm_state{counter_out = Acked,
                             buffer_size = Remaining,
                             buffer = StillUnacked}
    end.
476
%% Advances the handled-stanzas counter by one, wrapping to 0 after
%% ?STREAM_MGMT_H_MAX (2^32 - 1).
%%
%% The counter must be able to reach ?STREAM_MGMT_H_MAX itself: that is the
%% upper bound of `short()', and `calc_to_drop/2' computes distances modulo
%% ?STREAM_MGMT_H_MAX + 1. The previous guard (`< ?STREAM_MGMT_H_MAX - 1')
%% wrapped one step too early, so the maximum value was never produced.
-spec incr_counter(short()) -> short().
incr_counter(Incoming) when Incoming < ?STREAM_MGMT_H_MAX ->
    Incoming + 1;
incr_counter(_Incoming) ->
    0.
482
%% Number of stanzas acknowledged between the previous count `OldAcked' and
%% the new count `Acked'. A new count smaller than the old one is treated as
%% a counter wrap-around past ?STREAM_MGMT_H_MAX.
-spec calc_to_drop(short(), short()) -> short().
calc_to_drop(Acked, OldAcked) when Acked < OldAcked ->
    Acked + ?STREAM_MGMT_H_MAX - OldAcked + 1;
calc_to_drop(Acked, OldAcked) ->
    Acked - OldAcked.
488
%% Handles an `enable' request: answers with a stream error if stream
%% management is already enabled for this session, otherwise continues with
%% a freshly built state in `do_handle_enable/4'.
-spec handle_enable(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) ->
    mongoose_c2s_hooks:result().
handle_enable(Acc, #{c2s_data := StateData}, El) ->
    case if_not_already_enabled_create_sm_state(StateData) of
        error -> stream_error(Acc, StateData);
        FreshSmState -> do_handle_enable(Acc, StateData, FreshSmState, El)
    end.
498
%% Enables stream management according to the `resume' attribute:
%%  - absent: plain `enabled' reply;
%%  - `true' or `1': a resumption id is registered and returned in the reply;
%%  - any other value: stream error.
%% On success the new state is stored, the reply is queued for sending and
%% the hook chain stops.
-spec do_handle_enable(mongoose_acc:t(), mongoose_c2s:data(), sm_state(), exml:element()) ->
    mongoose_c2s_hooks:result().
do_handle_enable(Acc, StateData, SmState, El) ->
    Reply = case exml_query:attr(El, <<"resume">>, false) of
                false ->
                    {enabled, mod_stream_management_stanzas:stream_mgmt_enabled()};
                Resume when Resume =:= <<"true">>; Resume =:= <<"1">> ->
                    {enabled, register_smid_return_enabled_stanza(StateData)};
                _ ->
                    invalid
            end,
    case Reply of
        {enabled, Stanza} ->
            ToAcc = [{state_mod, {?MODULE, SmState}}, {socket_send, Stanza}],
            {stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)};
        invalid ->
            stream_error(Acc, StateData)
    end.
514
%% Generates a new resumption id, registers it for the session's SID (the
%% registration must return `ok') and builds the `enabled' reply carrying
%% the id together with `resume="true"'.
-spec register_smid_return_enabled_stanza(mongoose_c2s:data()) -> exml:element().
register_smid_return_enabled_stanza(StateData) ->
    SMID = make_smid(),
    Sid = mongoose_c2s:get_sid(StateData),
    HostType = mongoose_c2s:get_host_type(StateData),
    ok = register_smid(HostType, SMID, Sid),
    EnabledAttrs = [{<<"id">>, SMID}, {<<"resume">>, <<"true">>}],
    mod_stream_management_stanzas:stream_mgmt_enabled(EnabledAttrs).
522
%% Returns `error' when the session already has stream management state,
%% otherwise a new state built for the session's host type.
-spec if_not_already_enabled_create_sm_state(mongoose_c2s:data()) -> sm_state() | error.
if_not_already_enabled_create_sm_state(StateData) ->
    case get_mod_state(StateData) of
        {error, not_found} ->
            build_sm_handler(mongoose_c2s:get_host_type(StateData));
        #sm_state{} ->
            error
    end.
531
%% Translates the outcome of `handle_resume/3' into a hook result:
%%  - success: the new c2s state and data are installed and the `resumed'
%%    element followed by the stanzas to forward is queued for sending;
%%  - `stream_mgmt_error': delegated to `stream_mgmt_error/4';
%%  - three-element error: a stream error is sent and a stop is requested;
%%  - two-element error: the error stanza is queued for sending and the hook
%%    chain stops.
-spec handle_resume_request(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) ->
    mongoose_c2s_hooks:result().
handle_resume_request(Acc, #{c2s_state := C2SState, c2s_data := C2SData}, El) ->
    case handle_resume(C2SData, C2SState, El) of
        {ok, #{resumed := Resumed, forward := ToForward,
               c2s_state := ResumedState, c2s_data := ResumedData}} ->
            ToAcc = [{c2s_state, ResumedState},
                     {c2s_data, ResumedData},
                     {socket_send, [Resumed | ToForward]}],
            {ok, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)};
        {stream_mgmt_error, ErrorStanza} ->
            stream_mgmt_error(Acc, C2SData, C2SState, ErrorStanza);
        {error, ErrorStanza, Reason} ->
            mongoose_c2s:c2s_stream_error(C2SData, ErrorStanza),
            {stop, mongoose_c2s_acc:to_acc(Acc, stop, Reason)};
        {error, ErrorStanza} ->
            {stop, mongoose_c2s_acc:to_acc(Acc, socket_send, ErrorStanza)}
    end.
548
%% This runs on the new process.
%% Validates a `resume' request and, if it is well formed, looks up the
%% previous session by its resumption id. A `bad-request' failure is returned
%% when the previous id is missing, when `h' cannot be parsed, or when the
%% current session already has stream management state.
-spec handle_resume(mongoose_c2s:data(), mongoose_c2s:state(), exml:element()) ->
    resume_return().
handle_resume(C2SData, C2SState, El) ->
    PrevId = get_previd(El),
    ParsedH = stream_mgmt_parse_h(El),
    MaybeSmState = get_mod_state(C2SData),
    case {PrevId, ParsedH, MaybeSmState} of
        {undefined, _, _} ->
            {error, mod_stream_management_stanzas:stream_mgmt_failed(<<"bad-request">>)};
        {_, invalid_h_attribute, _} ->
            {error, mod_stream_management_stanzas:stream_mgmt_failed(<<"bad-request">>)};
        {_, _, #sm_state{}} ->
            {error, mod_stream_management_stanzas:stream_mgmt_failed(<<"bad-request">>)};
        {SMID, H, {error, not_found}} ->
            HostType = mongoose_c2s:get_host_type(C2SData),
            PreviousSession = get_session_from_smid(HostType, SMID),
            do_handle_resume(C2SData, C2SState, SMID, H, PreviousSession)
    end.
565
%% Continues a resumption depending on what the resumption id resolved to:
%%  - a live session (`{sid, _}'): its state is fetched from the old process,
%%    merged with the new connection's state and the session is resumed;
%%  - a stale entry (`{stale_h, _}'): `item-not-found' is returned together
%%    with the last known `h';
%%  - nothing: `item-not-found'.
-spec do_handle_resume(StateData, C2SState, SMID, H, FromSMID) -> HookResult when
      StateData :: mongoose_c2s:data(),
      C2SState :: c2s_state(),
      SMID :: smid(),
      H :: non_neg_integer(),
      FromSMID :: maybe_smid(),
      HookResult :: resume_return().
do_handle_resume(StateData, _C2SState, SMID, H, {sid, {_TS, OldPid}}) ->
    case get_peer_state(OldPid, H) of
        {ok, OldStateData} ->
            Merged = mongoose_c2s:merge_states(OldStateData, StateData),
            do_resume(Merged, SMID);
        {error, ErrorStanza, Reason} ->
            {error, ErrorStanza, Reason};
        {exception, {Class, Why, Stacktrace}} ->
            ?LOG_WARNING(#{what => resumption_error,
                           text => <<"Resumption error because of invalid response">>,
                           class => Class, reason => Why, stacktrace => Stacktrace,
                           pid => OldPid, c2s_state => StateData}),
            {stream_mgmt_error,
             mod_stream_management_stanzas:stream_mgmt_failed(<<"item-not-found">>)}
    end;
do_handle_resume(StateData, _C2SState, SMID, _H, {stale_h, StaleH}) ->
    ?LOG_WARNING(#{what => resumption_error, reason => session_resumption_timed_out,
                   smid => SMID, stale_h => StaleH, c2s_state => StateData}),
    StaleAttrs = [{<<"h">>, integer_to_binary(StaleH)}],
    {stream_mgmt_error,
     mod_stream_management_stanzas:stream_mgmt_failed(<<"item-not-found">>, StaleAttrs)};
do_handle_resume(StateData, _C2SState, SMID, _H, {error, smid_not_found}) ->
    ?LOG_WARNING(#{what => resumption_error, reason => no_previous_session_for_smid,
                   smid => SMID, c2s_state => StateData}),
    {stream_mgmt_error,
     mod_stream_management_stanzas:stream_mgmt_failed(<<"item-not-found">>)}.
594
%% Final step of a successful resumption; this runs on the new process.
%% Opens the session, registers the SMID for the new sid and collects the
%% `resumed' element together with the buffered stanzas to send.
-spec do_resume(StateData, SMID) -> HookResult when
      StateData :: mongoose_c2s:data(),
      SMID :: smid(),
      HookResult :: resume_return().
do_resume(StateData, SMID) ->
    {_ReplacedPids, OpenedData} = mongoose_c2s:open_session(StateData),
    ok = register_smid(OpenedData, SMID),
    {Resumed, ToForward} = get_all_stanzas_to_forward(OpenedData, SMID),
    Result = #{c2s_state => session_established,
               c2s_data => OpenedData,
               resumed => Resumed,
               forward => ToForward},
    {ok, Result}.
606
%% Registers `SMID' for the sid and host type stored in `StateData'.
%% Anything but `ok' from `register_smid/3' crashes the caller with a
%% `badmatch', so the only possible return value is `ok'.
-spec register_smid(mongoose_c2s:data(), smid()) -> ok.
register_smid(StateData, SMID) ->
    Sid = mongoose_c2s:get_sid(StateData),
    HostType = mongoose_c2s:get_host_type(StateData),
    ok = register_smid(HostType, SMID, Sid).
611
%% Builds the `resumed' element (carrying the inbound counter) and the list of
%% buffered stanzas. The buffer is reversed before the stanzas are produced;
%% each one goes through `maybe_add_timestamp/5' with the bare server JID as
%% the source.
-spec get_all_stanzas_to_forward(mongoose_c2s:data(), smid()) -> {exml:element(), [exml:element()]}.
get_all_stanzas_to_forward(StateData, SMID) ->
    #sm_state{counter_in = Counter, buffer = Buffer} = get_mod_state(StateData),
    Resumed = mod_stream_management_stanzas:stream_mgmt_resumed(SMID, Counter),
    LServer = mongoose_c2s:get_lserver(StateData),
    FromServer = jid:make_noprep(<<>>, LServer, <<>>),
    ToStanza =
        fun(BufferedAcc) ->
                TS = mongoose_acc:timestamp(BufferedAcc),
                Packet = mongoose_acc:element(BufferedAcc),
                StanzaName = mongoose_acc:stanza_name(BufferedAcc),
                StanzaType = mongoose_acc:stanza_type(BufferedAcc),
                maybe_add_timestamp(Packet, StanzaName, StanzaType, TS, FromServer)
        end,
    {Resumed, [ToStanza(Item) || Item <- lists:reverse(Buffer)]}.
626
%% Only messages whose type is neither `error' nor `headline' are passed to
%% `jlib:maybe_append_delay/4'; every other stanza is returned untouched.
maybe_add_timestamp(Packet, <<"message">>, Type, _TS, _FromServer)
  when Type =:= <<"error">>; Type =:= <<"headline">> ->
    Packet;
maybe_add_timestamp(Packet, <<"message">>, _Type, TS, FromServer) ->
    jlib:maybe_append_delay(Packet, FromServer, TS, <<"SM Storage">>);
maybe_add_timestamp(Packet, _StanzaName, _StanzaType, _TS, _FromServer) ->
    Packet.
635
%% A conflict means: the JID is the same, but the SID is not.
%% A JID looks like alice@localhost/res1, a SID looks like `{now(), pid()}'.
%% This can happen when the actions on an accumulator were started by one
%% process, but the resulting stanzas were routed to another process that has
%% the same JID and a different SID - usually while a user is reconnecting.
%% Both the origin_sid and origin_jid props should be defined, but developers
%% are not forced to set both, so an accumulator carrying only one of them
%% (or none) is never treated as a conflict.
%% "Incoming" means that the stanza is coming from ejabberd_router.
-spec is_conflict_incoming_acc(mongoose_acc:t(), mongoose_c2s:data()) -> boolean().
is_conflict_incoming_acc(Acc, StateData) ->
    OriginJid = mongoose_acc:get(c2s, origin_jid, undefined, Acc),
    OriginSid = mongoose_acc:get(c2s, origin_sid, undefined, Acc),
    case {OriginJid, OriginSid} of
        {undefined, _} ->
            false;
        {_, undefined} ->
            false;
        {_, _} ->
            StateJid = mongoose_c2s:get_jid(StateData),
            SameJid = jid:are_equal(OriginJid, StateJid),
            StateSid = mongoose_c2s:get_sid(StateData),
            %% A response addressed to the process this session was resumed
            %% from is still valid, so that sid does not count as a conflict.
            OldSid = maybe_get_resumed_from_sid(get_mod_state(StateData)),
            SameJid andalso OriginSid =/= StateSid andalso OriginSid =/= OldSid
    end.
665
%% Returns the sid stored as `{sid, Sid}' in the `peer' field of the state,
%% or `undefined' for any other input.
-spec maybe_get_resumed_from_sid(maybe_sm_state()) -> undefined | ejabberd_sm:sid().
maybe_get_resumed_from_sid(MaybeSmState) ->
    case MaybeSmState of
        #sm_state{peer = {sid, ResumedFrom}} -> ResumedFrom;
        _ -> undefined
    end.
671
%% True when the accumulator names a `receiver_sid' different from the sid of
%% this session. The own sid is used as the default, so an accumulator without
%% that property never conflicts.
-spec is_conflict_receiver_sid(mongoose_acc:t(), mongoose_c2s:data()) -> boolean().
is_conflict_receiver_sid(Acc, StateData) ->
    OwnSid = mongoose_c2s:get_sid(StateData),
    ReceiverSid = mongoose_acc:get(c2s, receiver_sid, OwnSid, Acc),
    ReceiverSid =/= OwnSid.
677
%% Sends an `unexpected-request' stream management failure as a stream error
%% and asks the c2s process to stop with `{shutdown, stream_error}'.
-spec stream_error(mongoose_acc:t(), mongoose_c2s:data()) ->
    {stop, mongoose_acc:t()}.
stream_error(Acc, StateData) ->
    Failed = mod_stream_management_stanzas:stream_mgmt_failed(<<"unexpected-request">>),
    mongoose_c2s:c2s_stream_error(StateData, Failed),
    StopAcc = mongoose_c2s_acc:to_acc(Acc, stop, {shutdown, stream_error}),
    {stop, StopAcc}.
684
%% Answers a stream management request that is not expected in the current
%% state with an `unexpected-request' failure, via `stream_mgmt_error/4'.
-spec unexpected_sm_request(mongoose_acc:t(), mongoose_c2s:data(), c2s_state()) ->
    {stop, mongoose_acc:t()}.
unexpected_sm_request(Acc, StateData, C2SState) ->
    stream_mgmt_error(
      Acc, StateData, C2SState,
      mod_stream_management_stanzas:stream_mgmt_failed(<<"unexpected-request">>)).
690
%% Reports a stream management failure to the client. When
%% `mongoose_c2s:maybe_retry_state/1' answers `{stop, {shutdown, retries}}'
%% the process is asked to stop with that reason; otherwise the returned
%% state is installed and `Err' is sent on the socket.
-spec stream_mgmt_error(
        mongoose_acc:t(), mongoose_c2s:data(), c2s_state(), exml:element()) ->
    {stop, mongoose_acc:t()}.
stream_mgmt_error(Acc, _StateData, C2SState, Err) ->
    NewAcc =
        case mongoose_c2s:maybe_retry_state(C2SState) of
            {stop, {shutdown, retries}} ->
                mongoose_c2s_acc:to_acc(Acc, stop, {shutdown, retries});
            NewC2SState ->
                Actions = [{c2s_state, NewC2SState}, {socket_send, Err}],
                mongoose_c2s_acc:to_acc_many(Acc, Actions)
        end,
    {stop, NewAcc}.
702
%% Creates a fresh stream management state whose `buffer_max' and `ack_freq'
%% come from the module options of the host type.
-spec build_sm_handler(mongooseim:host_type()) -> sm_state().
build_sm_handler(HostType) ->
    MaxBuffer = get_buffer_max(HostType),
    Frequency = get_ack_freq(HostType),
    #sm_state{ack_freq = Frequency, buffer_max = MaxBuffer}.
708
%% Fetches this module's state from the c2s data. An `{ok, State}' answer is
%% unwrapped; any other answer is returned as it is.
-spec get_mod_state(mongoose_c2s:data()) -> maybe_sm_state().
get_mod_state(StateData) ->
    Lookup = mongoose_c2s:get_mod_state(StateData, ?MODULE),
    case Lookup of
        {ok, SmState} -> SmState;
        _NotOk -> Lookup
    end.
715
%% Calls the process `Pid' with a `{resume, H}' request. Any exception raised
%% by the call is converted into `{exception, {Class, Reason, Stacktrace}}'
%% instead of crashing the caller.
-spec get_peer_state(pid(), non_neg_integer()) ->
    {ok, mongoose_c2s:data()} | {error, exml:element(), term()} | {exception, tuple()}.
get_peer_state(Pid, H) ->
    try mongoose_c2s:call(Pid, ?MODULE, {resume, H}) of
        Reply -> Reply
    catch
        Class:Reason:Stacktrace -> {exception, {Class, Reason, Stacktrace}}
    end.
724
%% Parses the `h' attribute of `El'.
%% Returns the value as a non-negative integer, or `invalid_h_attribute' when
%% the attribute is missing, is not a valid integer, or is negative.
%% Only the `badarg' raised by the conversion is handled; the attribute
%% lookup happens outside the `try', so an unexpected failure there is not
%% silently turned into `invalid_h_attribute'.
-spec stream_mgmt_parse_h(exml:element()) -> invalid_h_attribute | non_neg_integer().
stream_mgmt_parse_h(El) ->
    HAttr = exml_query:attr(El, <<"h">>),
    try binary_to_integer(HAttr) of
        H when H >= 0 -> H;
        _Negative -> invalid_h_attribute
    catch
        error:badarg -> invalid_h_attribute
    end.
731
%% Reads the `previd' attribute of `El'; `undefined' is the default passed to
%% the attribute lookup.
-spec get_previd(exml:element()) -> undefined | binary().
get_previd(El) ->
    PrevId = exml_query:attr(El, <<"previd">>, undefined),
    PrevId.
735
%% Hook handler: stores the `sm' feature element in the feature list, replacing
%% an element already named `sm' if there is one.
-spec c2s_stream_features(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: [exml:element()],
      Params :: map(),
      Extra :: gen_hook:extra().
c2s_stream_features(Acc, _, _) ->
    SmFeature = mod_stream_management_stanzas:sm(),
    Features = lists:keystore(<<"sm">>, #xmlel.name, Acc, SmFeature),
    {ok, Features}.
742
%% Hook handler: removes the SMID registered for the session being cleaned up.
-spec session_cleanup(Acc, Params, Extra) -> {ok, Acc} when
      Acc :: mongoose_acc:t(),
      Params :: #{sid := ejabberd_sm:sid()},
      Extra :: gen_hook:extra().
session_cleanup(Acc, #{sid := SID}, #{host_type := HostType}) ->
    CleanedAcc = remove_smid(Acc, HostType, SID),
    {ok, CleanedAcc}.
749
750
%% Removes the SMID of `Sid', using the `h' value stored in the accumulator
%% (or `undefined' when absent), and records the outcome of the removal in the
%% accumulator under `{stream_mgmt, smid}'.
-spec remove_smid(mongoose_acc:t(), mongooseim:host_type(), ejabberd_sm:sid()) ->
    mongoose_acc:t().
remove_smid(Acc, HostType, Sid) ->
    LastH = mongoose_acc:get(stream_mgmt, h, undefined, Acc),
    Outcome = do_remove_smid(HostType, Sid, LastH),
    mongoose_acc:set(stream_mgmt, smid, Outcome, Acc).
757
%% Unregisters the SMID of `Sid' and returns the result of that operation.
%% When a SMID was found and `H' is defined, the pair is also stored as a
%% stale `h'. Storing it is best effort: a failure does not change the return
%% value, but it is logged instead of being silently dropped.
do_remove_smid(HostType, Sid, H) ->
    MaybeSMID = unregister_smid(HostType, Sid),
    case MaybeSMID of
        {ok, SMID} when H =/= undefined ->
            case register_stale_smid_h(HostType, SMID, H) of
                {error, Reason} ->
                    ?LOG_WARNING(#{what => stream_mgmt_register_stale_h_failed,
                                   reason => Reason, smid => SMID, h => H,
                                   host_type => HostType});
                _Ok ->
                    ok
            end;
        _ ->
            ok
    end,
    MaybeSMID.
767
768 %%
769 %% `gen_statem' callbacks
770 %%
771
%% `gen_statem' callback: all events are handled by `handle_event/4'.
-spec callback_mode() -> gen_statem:callback_mode_result().
callback_mode() ->
    Mode = handle_event_function,
    Mode.
775
%% `gen_statem' callback. Always refuses to start, whatever the argument is;
%% the stop reason states that this callback is not meant to be invoked.
-spec init(term()) -> gen_statem:init_result(c2s_state(), mongoose_c2s:data()).
init(_Args) ->
    StopReason = this_should_have_never_been_called,
    {stop, StopReason}.
779
%% `gen_statem' callback. Two events are handled here while in the
%% `resume_session' state:
%%  * a `{resume, H}' call tagged with this module: the process always stops
%%    with `{shutdown, resumed}', storing the returned state when the
%%    hand-over succeeded;
%%  * the `resume_timeout' timeout: the process stops with
%%    `{shutdown, ?MODULE}'.
%% Every other event is delegated to `mongoose_c2s:handle_event/4'.
-spec handle_event(gen_statem:event_type(), term(), c2s_state(), mongoose_c2s:data()) ->
    gen_statem:event_handler_result(c2s_state()).
handle_event({call, From}, #{event_tag := ?MODULE, event_content := {resume, H}},
             ?EXT_C2S_STATE(resume_session), StateData) ->
    StopReason = {shutdown, resumed},
    case handle_resume_call(StateData, From, H) of
        error ->
            {stop, StopReason};
        {ok, SmState} ->
            Updated = mongoose_c2s:merge_mod_state(StateData, #{?MODULE => SmState}),
            {stop, StopReason, Updated}
    end;
handle_event({timeout, ?MODULE}, resume_timeout, ?EXT_C2S_STATE(resume_session), StateData) ->
    {stop, {shutdown, ?MODULE}, StateData};
handle_event(EventType, EventContent, C2SState, StateData) ->
    mongoose_c2s:handle_event(EventType, EventContent, C2SState, StateData).
795
%% Serves a resume request; this runs on the old process.
%% The `h' value is acknowledged first. When that yields a state record, a
%% `conflict' stream error is sent on this session, the state prepared for the
%% new session is sent as the reply to `From', and the reduced state to keep
%% locally is returned. Any other acknowledgement result is sent as the reply
%% unchanged and `error' is returned.
-spec handle_resume_call(
        mongoose_c2s:data(), gen_statem:from(), non_neg_integer()) ->
    {ok, sm_state()} | error.
handle_resume_call(StateData, From, H) ->
    case do_handle_ack(get_mod_state(StateData), H) of
        #sm_state{} = AckedSmState ->
            mongoose_c2s:c2s_stream_error(StateData, mongoose_xmpp_errors:stream_conflict()),
            PassSmState = pipeline_future_sm_state(StateData, AckedSmState),
            pass_c2s_data_to_new_session(StateData, PassSmState, From),
            {ok, sm_state_to_keep(AckedSmState, From)};
        HandledAck ->
            gen_statem:reply(From, HandledAck),
            error
    end.
813
%% Prepares the state handed over to the new session: the sid of this session
%% is stored as `{sid, Sid}' in `peer', then routed messages still waiting in
%% the mailbox are added by `recover_messages/1'.
-spec pipeline_future_sm_state(mongoose_c2s:data(), sm_state()) -> sm_state().
pipeline_future_sm_state(StateData, SmState) ->
    OwnSid = mongoose_c2s:get_sid(StateData),
    recover_messages(SmState#sm_state{peer = {sid, OwnSid}}).
819
%% Replies to the caller `From' with `{ok, Data}', where `Data' is the c2s data
%% of this session with the given stream management state merged in.
-spec pass_c2s_data_to_new_session(mongoose_c2s:data(), sm_state(), gen_statem:from()) -> ok.
pass_c2s_data_to_new_session(StateData, PassSmState, From) ->
    HandedOver = mongoose_c2s:merge_mod_state(StateData, #{?MODULE => PassSmState}),
    gen_statem:reply(From, {ok, HandedOver}).
825
%% State kept by the dying c2s process: the buffer is emptied and `peer'
%% refers to the caller that resumed this session, so that messages can be
%% rerouted to it later.
-spec sm_state_to_keep(sm_state(), gen_statem:from()) -> sm_state().
sm_state_to_keep(SmState, From) ->
    Emptied = SmState#sm_state{buffer = [], buffer_size = 0},
    Emptied#sm_state{peer = {gen_statem, From}}.
831
832 %%
833 %% API for `mongoose_c2s'
834 %%
835
%% Drains `{route, Acc}' messages that are already in the mailbox of the
%% calling process. The zero timeout makes this return as soon as no such
%% message is left. Whether an accumulator is buffered is decided by
%% `maybe_buffer_acc/3' from its stanza name.
-spec recover_messages(sm_state()) -> sm_state().
recover_messages(SmState) ->
    receive
        {route, Acc} ->
            IsMessage = is_message(mongoose_acc:stanza_name(Acc)),
            Buffered = maybe_buffer_acc(SmState, Acc, IsMessage),
            recover_messages(Buffered)
    after 0 ->
        SmState
    end.
844
%% Puts `Acc' in front of the buffer and increments the buffer size when the
%% flag is `true'; returns the state unchanged when it is `false'.
-spec maybe_buffer_acc(sm_state(), mongoose_acc:t(), boolean()) -> sm_state().
maybe_buffer_acc(SmState, _Acc, false) ->
    SmState;
maybe_buffer_acc(#sm_state{} = SmState, Acc, true) ->
    Buffer = SmState#sm_state.buffer,
    Size = SmState#sm_state.buffer_size,
    SmState#sm_state{buffer = [Acc | Buffer], buffer_size = Size + 1}.
850
%% IQs and presences are allowed to come to the same SID only, so the stanza
%% name is checked: only `<<"message">>' yields `true'.
-spec is_message(binary()) -> boolean().
is_message(StanzaName) ->
    StanzaName =:= <<"message">>.
855
%% True when the `xmlns' attribute of the accumulator's element equals
%% `?NS_STREAM_MGNT_3'.
-spec is_sm_element(mongoose_acc:t()) -> boolean().
is_sm_element(Acc) ->
    Xmlns = exml_query:attr(mongoose_acc:element(Acc), <<"xmlns">>),
    Xmlns =:= ?NS_STREAM_MGNT_3.
860
%% Generates a new SMID: 21 strong random bytes, base64-encoded
%% (28 characters, no padding needed).
-spec make_smid() -> smid().
make_smid() ->
    RandomBytes = crypto:strong_rand_bytes(21),
    base64:encode(RandomBytes).
864
865 %% Getters
%% Looks up the sid registered for `SMID'; when there is none, falls back to
%% the stale `h' lookup.
-spec get_session_from_smid(mongooseim:host_type(), smid()) -> maybe_smid().
get_session_from_smid(HostType, SMID) ->
    case get_sid(HostType, SMID) of
        {error, smid_not_found} ->
            get_stale_h(HostType, SMID);
        {sid, _SID} = Found ->
            Found
    end.
873
%% Reads the stale `h' stored for `SMID'. When the stale_h feature is disabled
%% for the host type nothing is read and `{error, smid_not_found}' is returned.
-spec get_stale_h(mongooseim:host_type(), smid()) ->
    {stale_h, non_neg_integer()} | {error, smid_not_found}.
get_stale_h(HostType, SMID) ->
    case is_stale_h_enabled(HostType) of
        true -> read_stale_h(HostType, SMID);
        false -> {error, smid_not_found}
    end.
881
%% Reads the `buffer_max' option of this module for the host type.
-spec get_buffer_max(mongooseim:host_type()) -> buffer_max().
get_buffer_max(HostType) ->
    BufferMax = gen_mod:get_module_opt(HostType, ?MODULE, buffer_max),
    BufferMax.
885
%% Reads the `ack_freq' option of this module for the host type.
-spec get_ack_freq(mongooseim:host_type()) -> ack_freq().
get_ack_freq(HostType) ->
    AckFreq = gen_mod:get_module_opt(HostType, ?MODULE, ack_freq),
    AckFreq.
889
%% Reads the `resume_timeout' option of this module for the host type and
%% converts it with `timer:seconds/1' (seconds to milliseconds).
-spec get_resume_timeout(mongooseim:host_type()) -> pos_integer().
get_resume_timeout(HostType) ->
    Seconds = gen_mod:get_module_opt(HostType, ?MODULE, resume_timeout),
    timer:seconds(Seconds).
893
%% Stores `H' as the stale `h' of `SMID'. Does nothing and returns `ok' when
%% the stale_h feature is disabled for the host type.
-spec register_stale_smid_h(mongooseim:host_type(), smid(), short()) -> ok | {error, any()}.
register_stale_smid_h(HostType, SMID, H) ->
    case is_stale_h_enabled(HostType) of
        true -> write_stale_h(HostType, SMID, H);
        false -> ok
    end.
900
%% Deletes the stale `h' stored for `SMID'. Does nothing and returns `ok' when
%% the stale_h feature is disabled for the host type.
-spec remove_stale_smid_h(mongooseim:host_type(), smid()) -> ok | {error, any()}.
remove_stale_smid_h(HostType, SMID) ->
    case is_stale_h_enabled(HostType) of
        true -> delete_stale_h(HostType, SMID);
        false -> ok
    end.
907
%% Reads the nested `[stale_h, enabled]' option of this module for the host type.
-spec is_stale_h_enabled(mongooseim:host_type()) -> boolean().
is_stale_h_enabled(HostType) ->
    OptionPath = [stale_h, enabled],
    gen_mod:get_module_opt(HostType, ?MODULE, OptionPath).
911
912 %% Backend operations
913
%% Backend operation: registers the association between `SMID' and `SID'.
-spec register_smid(HostType, SMID, SID) ->
    ok | {error, term()} when
      HostType :: mongooseim:host_type(),
      SMID :: smid(),
      SID :: ejabberd_sm:sid().
register_smid(HostType, SMID, SID) ->
    Result = mod_stream_management_backend:register_smid(HostType, SMID, SID),
    Result.
921
%% Backend operation: unregisters the SMID associated with `SID'.
-spec unregister_smid(mongooseim:host_type(), ejabberd_sm:sid()) ->
    {ok, SMID :: smid()} | {error, smid_not_found}.
unregister_smid(HostType, SID) ->
    Result = mod_stream_management_backend:unregister_smid(HostType, SID),
    Result.
926
%% Backend operation: looks up the sid registered for `SMID'.
-spec get_sid(mongooseim:host_type(), smid()) ->
    {sid, ejabberd_sm:sid()} | {error, smid_not_found}.
get_sid(HostType, SMID) ->
    Result = mod_stream_management_backend:get_sid(HostType, SMID),
    Result.
931
932 %% stale_h
933
%% Backend operation: stores `H' as the stale `h' of `SMID'.
-spec write_stale_h(HostType, SMID, H) -> ok | {error, any()} when
      HostType :: mongooseim:host_type(),
      SMID :: smid(),
      H :: non_neg_integer().
write_stale_h(HostType, SMID, H) ->
    Result = mod_stream_management_backend:write_stale_h(HostType, SMID, H),
    Result.
940
%% Backend operation: deletes the stale `h' stored for `SMID'.
-spec delete_stale_h(HostType, SMID) -> ok | {error, any()} when
      HostType :: mongooseim:host_type(),
      SMID :: smid().
delete_stale_h(HostType, SMID) ->
    Result = mod_stream_management_backend:delete_stale_h(HostType, SMID),
    Result.
946
%% Backend operation: reads the stale `h' stored for `SMID'.
-spec read_stale_h(HostType, SMID) ->
    {stale_h, non_neg_integer()} | {error, smid_not_found} when
      HostType :: mongooseim:host_type(),
      SMID :: smid().
read_stale_h(HostType, SMID) ->
    Result = mod_stream_management_backend:read_stale_h(HostType, SMID),
    Result.
Line Hits Source