./ct_report/coverage/mod_csi.COVER.html

1 -module(mod_csi).
2
3 -xep([{xep, 352}, {version, "1.0.0"}]).
4
5 -include("mongoose_config_spec.hrl").
6 -include("jlib.hrl").
7
8 -behaviour(gen_mod).
9 -behaviour(mongoose_module_metrics).
10
11 %% gen_mod callbacks
12 -export([start/2, stop/1, hooks/1, config_spec/0, supported_features/0]).
13
14 %% Hook handlers
15 -export([c2s_stream_features/3,
16 bind2_stream_features/3,
17 bind2_enable_features/3,
18 xmpp_presend_element/3,
19 user_send_xmlel/3,
20 handle_user_stopping/3,
21 reroute_unacked_messages/3
22 ]).
23
24 -record(csi_state, {
25 state = active :: state(),
26 buffer = [] :: [mongoose_acc:t()],
27 buffer_max = 20 :: non_neg_integer()
28 }).
29
30 -type params() :: #{c2s_data := mongoose_c2s:data(), _ := _}.
31 -type state() :: active | inactive.
32 -type csi_state() :: #csi_state{}.
33
34 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
35 start(HostType, _Opts) ->
36 3 ensure_metrics(HostType).
37
38 -spec stop(mongooseim:host_type()) -> ok.
39 stop(_HostType) ->
40 3 ok.
41
42 hooks(HostType) ->
43 6 [
44 {c2s_stream_features, HostType, fun ?MODULE:c2s_stream_features/3, #{}, 60},
45 {bind2_stream_features, HostType, fun ?MODULE:bind2_stream_features/3, #{}, 50},
46 {bind2_enable_features, HostType, fun ?MODULE:bind2_enable_features/3, #{}, 50},
47 {xmpp_presend_element, HostType, fun ?MODULE:xmpp_presend_element/3, #{}, 45}, %% just before stream management!
48 {user_send_xmlel, HostType, fun ?MODULE:user_send_xmlel/3, #{}, 60},
49 {user_stop_request, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 95},
50 {user_socket_closed, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 95},
51 {user_socket_error, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 95},
52 {reroute_unacked_messages, HostType, fun ?MODULE:reroute_unacked_messages/3, #{}, 70}
53 ].
54
55 ensure_metrics(HostType) ->
56 3 mongoose_metrics:ensure_metric(HostType, [modCSIInactive], spiral),
57 3 mongoose_metrics:ensure_metric(HostType, [modCSIActive], spiral).
58
59 -spec config_spec() -> mongoose_config_spec:config_section().
60 config_spec() ->
61 208 #section{
62 items = #{<<"buffer_max">> => #option{type = int_or_infinity,
63 validate = non_negative}},
64 defaults = #{<<"buffer_max">> => 20}
65 }.
66
67 -spec supported_features() -> [atom()].
68 supported_features() ->
69
:-(
[dynamic_domains].
70
71
72 %%% Hook handlers
73 -spec c2s_stream_features(Acc, map(), gen_hook:extra()) -> {ok, Acc} when Acc :: [exml:element()].
74 c2s_stream_features(Acc, _, _) ->
75 129 {ok, lists:keystore(<<"csi">>, #xmlel.name, Acc, csi())}.
76
77 -spec bind2_stream_features(Acc, #{c2s_data := mongoose_c2s:data()}, gen_hook:extra()) ->
78 {ok, Acc} when Acc :: [exml:element()].
79 bind2_stream_features(Acc, _, _) ->
80 29 SmFeature = #xmlel{name = <<"feature">>, attrs = [{<<"var">>, ?NS_CSI}]},
81 29 {ok, [SmFeature | Acc]}.
82
83 -spec bind2_enable_features(SaslAcc, mod_sasl2:c2s_state_data(), gen_hook:extra()) ->
84 {ok, SaslAcc} when SaslAcc :: mongoose_acc:t().
85 bind2_enable_features(SaslAcc, Params, _) ->
86 10 #{request := BindRequest} = mod_bind2:get_bind_request(SaslAcc),
87 10 case exml_query:subelement_with_ns(BindRequest, ?NS_CSI) of
88 undefined ->
89 8 {ok, SaslAcc};
90 #xmlel{name = <<"active">>} ->
91 1 SaslAcc1 = handle_active_request(SaslAcc, Params),
92 1 {ok, SaslAcc1};
93 #xmlel{name = <<"inactive">>} ->
94 1 SaslAcc1 = handle_inactive_request(SaslAcc, Params),
95 1 {ok, SaslAcc1}
96 end.
97
98 %% The XEP doesn't require any specific server behaviour in response to CSI stanzas,
99 %% there are only some suggestions. The implementation in MongooseIM will simply buffer
100 %% all packets (up to a configured limit) when the session is "inactive" and will flush
101 %% the buffer when it becomes "active" again.
102 -spec xmpp_presend_element(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
103 mongoose_c2s_hooks:result().
104 xmpp_presend_element(Acc, #{c2s_data := C2SData}, _Extra) ->
105 188 case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
106 {ok, Csi} ->
107 62 handle_presend_element(Acc, Csi);
108 _ ->
109 126 {ok, Acc}
110 end.
111
112 -spec handle_presend_element(mongoose_acc:t(), csi_state()) -> mongoose_c2s_hooks:result().
113 handle_presend_element(Acc, Csi = #csi_state{state = inactive, buffer = Buffer, buffer_max = BMax}) ->
114 39 case length(Buffer) + 1 > BMax of
115 true ->
116 1 NewBuffer = [mark_acc_as_buffered(Acc) | Buffer],
117 1 NewCsi = Csi#csi_state{buffer = []},
118 1 ToAcc = [{state_mod, {?MODULE, NewCsi}}, {flush, lists:reverse(NewBuffer)}],
119 1 {stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)};
120 _ ->
121 38 buffer_acc(Acc, Csi, is_acc_buffered(Acc))
122 end;
123 handle_presend_element(Acc, _) ->
124 23 {ok, Acc}.
125
126 -spec buffer_acc(mongoose_acc:t(), csi_state(), boolean()) -> mongoose_c2s_hooks:result().
127 buffer_acc(Acc, Csi = #csi_state{buffer = Buffer}, false) ->
128 27 AccToBuffer = mark_acc_as_buffered(Acc),
129 27 NewCsi = Csi#csi_state{buffer = [AccToBuffer | Buffer]},
130 27 {stop, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})};
131 buffer_acc(Acc, _, true) ->
132 11 {ok, Acc}.
133
134 -spec user_send_xmlel(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
135 mongoose_c2s_hooks:result().
136 user_send_xmlel(Acc, Params, _Extra) ->
137 50 El = mongoose_acc:element(Acc),
138 50 case exml_query:attr(El, <<"xmlns">>) of
139 ?NS_CSI ->
140 17 {stop, handle_csi_request(Acc, Params, El)};
141 _ ->
142 33 {ok, Acc}
143 end.
144
145 %% here we ensure CSI is active and won't buffer anymore, and the current buffer is given to sm
146 -spec handle_user_stopping(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
147 mongoose_c2s_hooks:result().
148 handle_user_stopping(Acc, #{c2s_data := C2SData}, _) ->
149 23 case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
150 {ok, Csi = #csi_state{buffer = Buffer}} ->
151 2 NewCsi = Csi#csi_state{state = active, buffer = []},
152 2 ToAcc = [{state_mod, {?MODULE, NewCsi}}, {flush, lists:reverse(Buffer)}],
153 2 {ok, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)};
154 _ ->
155 21 {ok, Acc}
156 end.
157
158 -spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
159 mongoose_c2s_hooks:result().
160 reroute_unacked_messages(Acc, #{c2s_data := C2SData}, _) ->
161 70 case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
162 {ok, Csi = #csi_state{buffer = Buffer}} ->
163 12 mongoose_c2s:reroute_buffer(C2SData, Buffer),
164 12 NewCsi = Csi#csi_state{state = active, buffer = []},
165 12 {ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})};
166 _ ->
167 58 {ok, Acc}
168 end.
169
170 -spec handle_csi_request(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) ->
171 mongoose_acc:t().
172 handle_csi_request(Acc, Params, #xmlel{name = <<"inactive">>}) ->
173 11 handle_inactive_request(Acc, Params);
174 handle_csi_request(Acc, Params, #xmlel{name = <<"active">>}) ->
175 5 handle_active_request(Acc, Params);
176 handle_csi_request(Acc, _, _) ->
177 1 {From, To, El} = mongoose_acc:packet(Acc),
178 1 Error = jlib:make_error_reply(El, mongoose_xmpp_errors:bad_request()),
179 1 ErrorAcc = mongoose_acc:update_stanza(#{from_jid => From, to_jid => To, element => Error }, Acc),
180 1 mongoose_c2s_acc:to_acc(Acc, route, ErrorAcc).
181
182 -spec handle_inactive_request(mongoose_acc:t(), params()) -> mongoose_acc:t().
183 handle_inactive_request(Acc, #{c2s_data := C2SData} = _Params) ->
184 12 HostType = mongoose_c2s:get_host_type(C2SData),
185 12 mongoose_metrics:update(HostType, modCSIInactive, 1),
186 12 case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
187 {error, not_found} ->
188 11 BMax = gen_mod:get_module_opt(HostType, ?MODULE, buffer_max),
189 11 Csi = #csi_state{state = inactive, buffer_max = BMax},
190 11 mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, Csi});
191 {ok, Csi = #csi_state{}} ->
192 1 NewCsi = Csi#csi_state{state = inactive},
193 1 mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})
194 end.
195
196 -spec handle_active_request(mongoose_acc:t(), params()) -> mongoose_acc:t().
197 handle_active_request(Acc, #{c2s_data := C2SData}) ->
198 6 HostType = mongoose_c2s:get_host_type(C2SData),
199 6 mongoose_metrics:update(HostType, modCSIActive, 1),
200 6 case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
201 {ok, Csi = #csi_state{state = inactive, buffer = Buffer}} ->
202 5 NewCsi = Csi#csi_state{state = active, buffer = []},
203 5 ToAcc = [{state_mod, {?MODULE, NewCsi}}, {flush, lists:reverse(Buffer)}],
204 5 mongoose_c2s_acc:to_acc_many(Acc, ToAcc);
205 _ ->
206 1 Acc
207 end.
208
209 -spec csi() -> exml:element().
210 csi() ->
211 129 #xmlel{name = <<"csi">>, attrs = [{<<"xmlns">>, ?NS_CSI}]}.
212
213 -spec mark_acc_as_buffered(mongoose_acc:t()) -> mongoose_acc:t().
214 mark_acc_as_buffered(Acc) ->
215 28 mongoose_acc:set(?MODULE, buffered, true, Acc).
216
217 -spec is_acc_buffered(mongoose_acc:t()) -> boolean().
218 is_acc_buffered(Acc) ->
219 38 mongoose_acc:get(?MODULE, buffered, false, Acc).
Line Hits Source