1 |
|
-module(mod_csi). |
2 |
|
|
3 |
|
-xep([{xep, 352}, {version, "1.0.0"}]). |
4 |
|
|
5 |
|
-include("mongoose_config_spec.hrl"). |
6 |
|
-include("jlib.hrl"). |
7 |
|
|
8 |
|
-behaviour(gen_mod). |
9 |
|
-behaviour(mongoose_module_metrics). |
10 |
|
|
11 |
|
%% gen_mod callbacks |
12 |
|
-export([start/2, stop/1, hooks/1, config_spec/0, supported_features/0]). |
13 |
|
|
14 |
|
%% Hook handlers |
15 |
|
-export([c2s_stream_features/3, |
16 |
|
bind2_stream_features/3, |
17 |
|
bind2_enable_features/3, |
18 |
|
xmpp_presend_element/3, |
19 |
|
user_send_xmlel/3, |
20 |
|
handle_user_stopping/3, |
21 |
|
reroute_unacked_messages/3 |
22 |
|
]). |
23 |
|
|
24 |
|
-record(csi_state, { |
25 |
|
state = active :: state(), |
26 |
|
buffer = [] :: [mongoose_acc:t()], |
27 |
|
buffer_max = 20 :: non_neg_integer() |
28 |
|
}). |
29 |
|
|
30 |
|
-type params() :: #{c2s_data := mongoose_c2s:data(), _ := _}. |
31 |
|
-type state() :: active | inactive. |
32 |
|
-type csi_state() :: #csi_state{}. |
33 |
|
|
34 |
|
-spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok. |
35 |
|
start(HostType, _Opts) -> |
36 |
3 |
ensure_metrics(HostType). |
37 |
|
|
38 |
|
-spec stop(mongooseim:host_type()) -> ok. |
39 |
|
stop(_HostType) -> |
40 |
3 |
ok. |
41 |
|
|
42 |
|
hooks(HostType) -> |
43 |
6 |
[ |
44 |
|
{c2s_stream_features, HostType, fun ?MODULE:c2s_stream_features/3, #{}, 60}, |
45 |
|
{bind2_stream_features, HostType, fun ?MODULE:bind2_stream_features/3, #{}, 50}, |
46 |
|
{bind2_enable_features, HostType, fun ?MODULE:bind2_enable_features/3, #{}, 50}, |
47 |
|
{xmpp_presend_element, HostType, fun ?MODULE:xmpp_presend_element/3, #{}, 45}, %% just before stream management! |
48 |
|
{user_send_xmlel, HostType, fun ?MODULE:user_send_xmlel/3, #{}, 60}, |
49 |
|
{user_stop_request, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 95}, |
50 |
|
{user_socket_closed, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 95}, |
51 |
|
{user_socket_error, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 95}, |
52 |
|
{reroute_unacked_messages, HostType, fun ?MODULE:reroute_unacked_messages/3, #{}, 70} |
53 |
|
]. |
54 |
|
|
55 |
|
ensure_metrics(HostType) -> |
56 |
3 |
mongoose_metrics:ensure_metric(HostType, [modCSIInactive], spiral), |
57 |
3 |
mongoose_metrics:ensure_metric(HostType, [modCSIActive], spiral). |
58 |
|
|
59 |
|
-spec config_spec() -> mongoose_config_spec:config_section(). |
60 |
|
config_spec() -> |
61 |
202 |
#section{ |
62 |
|
items = #{<<"buffer_max">> => #option{type = int_or_infinity, |
63 |
|
validate = non_negative}}, |
64 |
|
defaults = #{<<"buffer_max">> => 20} |
65 |
|
}. |
66 |
|
|
67 |
|
-spec supported_features() -> [atom()]. |
68 |
|
supported_features() -> |
69 |
:-( |
[dynamic_domains]. |
70 |
|
|
71 |
|
|
72 |
|
%%% Hook handlers |
73 |
|
-spec c2s_stream_features(Acc, map(), gen_hook:extra()) -> {ok, Acc} when Acc :: [exml:element()]. |
74 |
|
c2s_stream_features(Acc, _, _) -> |
75 |
124 |
{ok, lists:keystore(<<"csi">>, #xmlel.name, Acc, csi())}. |
76 |
|
|
77 |
|
-spec bind2_stream_features(Acc, #{c2s_data := mongoose_c2s:data()}, gen_hook:extra()) -> |
78 |
|
{ok, Acc} when Acc :: [exml:element()]. |
79 |
|
bind2_stream_features(Acc, _, _) -> |
80 |
25 |
SmFeature = #xmlel{name = <<"feature">>, attrs = [{<<"var">>, ?NS_CSI}]}, |
81 |
25 |
{ok, [SmFeature | Acc]}. |
82 |
|
|
83 |
|
-spec bind2_enable_features(SaslAcc, mod_sasl2:c2s_state_data(), gen_hook:extra()) -> |
84 |
|
{ok, SaslAcc} when SaslAcc :: mongoose_acc:t(). |
85 |
|
bind2_enable_features(SaslAcc, Params, _) -> |
86 |
10 |
#{request := BindRequest} = mod_bind2:get_bind_request(SaslAcc), |
87 |
10 |
case exml_query:subelement_with_ns(BindRequest, ?NS_CSI) of |
88 |
|
undefined -> |
89 |
8 |
{ok, SaslAcc}; |
90 |
|
#xmlel{name = <<"active">>} -> |
91 |
1 |
SaslAcc1 = handle_active_request(SaslAcc, Params), |
92 |
1 |
{ok, SaslAcc1}; |
93 |
|
#xmlel{name = <<"inactive">>} -> |
94 |
1 |
SaslAcc1 = handle_inactive_request(SaslAcc, Params), |
95 |
1 |
{ok, SaslAcc1} |
96 |
|
end. |
97 |
|
|
98 |
|
%% The XEP doesn't require any specific server behaviour in response to CSI stanzas, |
99 |
|
%% there are only some suggestions. The implementation in MongooseIM will simply buffer |
100 |
|
%% all packets (up to a configured limit) when the session is "inactive" and will flush |
101 |
|
%% the buffer when it becomes "active" again. |
102 |
|
-spec xmpp_presend_element(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> |
103 |
|
mongoose_c2s_hooks:result(). |
104 |
|
xmpp_presend_element(Acc, #{c2s_data := C2SData}, _Extra) -> |
105 |
186 |
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of |
106 |
|
{ok, Csi} -> |
107 |
64 |
handle_presend_element(Acc, Csi); |
108 |
|
_ -> |
109 |
122 |
{ok, Acc} |
110 |
|
end. |
111 |
|
|
112 |
|
-spec handle_presend_element(mongoose_acc:t(), csi_state()) -> mongoose_c2s_hooks:result(). |
113 |
|
handle_presend_element(Acc, Csi = #csi_state{state = inactive, buffer = Buffer, buffer_max = BMax}) -> |
114 |
41 |
case length(Buffer) + 1 > BMax of |
115 |
|
true -> |
116 |
1 |
NewBuffer = [mark_acc_as_buffered(Acc) | Buffer], |
117 |
1 |
NewCsi = Csi#csi_state{buffer = []}, |
118 |
1 |
ToAcc = [{state_mod, {?MODULE, NewCsi}}, {flush, lists:reverse(NewBuffer)}], |
119 |
1 |
{stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)}; |
120 |
|
_ -> |
121 |
40 |
buffer_acc(Acc, Csi, is_acc_buffered(Acc)) |
122 |
|
end; |
123 |
|
handle_presend_element(Acc, _) -> |
124 |
23 |
{ok, Acc}. |
125 |
|
|
126 |
|
-spec buffer_acc(mongoose_acc:t(), csi_state(), boolean()) -> mongoose_c2s_hooks:result(). |
127 |
|
buffer_acc(Acc, Csi = #csi_state{buffer = Buffer}, false) -> |
128 |
29 |
AccToBuffer = mark_acc_as_buffered(Acc), |
129 |
29 |
NewCsi = Csi#csi_state{buffer = [AccToBuffer | Buffer]}, |
130 |
29 |
{stop, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})}; |
131 |
|
buffer_acc(Acc, _, true) -> |
132 |
11 |
{ok, Acc}. |
133 |
|
|
134 |
|
-spec user_send_xmlel(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> |
135 |
|
mongoose_c2s_hooks:result(). |
136 |
|
user_send_xmlel(Acc, Params, _Extra) -> |
137 |
50 |
El = mongoose_acc:element(Acc), |
138 |
50 |
case exml_query:attr(El, <<"xmlns">>) of |
139 |
|
?NS_CSI -> |
140 |
17 |
{stop, handle_csi_request(Acc, Params, El)}; |
141 |
|
_ -> |
142 |
33 |
{ok, Acc} |
143 |
|
end. |
144 |
|
|
145 |
|
%% here we ensure CSI is active and won't buffer anymore, and the current buffer is given to sm |
146 |
|
-spec handle_user_stopping(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> |
147 |
|
mongoose_c2s_hooks:result(). |
148 |
|
handle_user_stopping(Acc, #{c2s_data := C2SData}, _) -> |
149 |
23 |
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of |
150 |
|
{ok, Csi = #csi_state{buffer = Buffer}} -> |
151 |
2 |
NewCsi = Csi#csi_state{state = active, buffer = []}, |
152 |
2 |
ToAcc = [{state_mod, {?MODULE, NewCsi}}, {flush, lists:reverse(Buffer)}], |
153 |
2 |
{ok, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)}; |
154 |
|
_ -> |
155 |
21 |
{ok, Acc} |
156 |
|
end. |
157 |
|
|
158 |
|
-spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) -> |
159 |
|
mongoose_c2s_hooks:result(). |
160 |
|
reroute_unacked_messages(Acc, #{c2s_data := C2SData}, _) -> |
161 |
66 |
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of |
162 |
|
{ok, Csi = #csi_state{buffer = Buffer}} -> |
163 |
12 |
mongoose_c2s:reroute_buffer(C2SData, Buffer), |
164 |
12 |
NewCsi = Csi#csi_state{state = active, buffer = []}, |
165 |
12 |
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})}; |
166 |
|
_ -> |
167 |
54 |
{ok, Acc} |
168 |
|
end. |
169 |
|
|
170 |
|
-spec handle_csi_request(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) -> |
171 |
|
mongoose_acc:t(). |
172 |
|
handle_csi_request(Acc, Params, #xmlel{name = <<"inactive">>}) -> |
173 |
11 |
handle_inactive_request(Acc, Params); |
174 |
|
handle_csi_request(Acc, Params, #xmlel{name = <<"active">>}) -> |
175 |
5 |
handle_active_request(Acc, Params); |
176 |
|
handle_csi_request(Acc, _, _) -> |
177 |
1 |
{From, To, El} = mongoose_acc:packet(Acc), |
178 |
1 |
Error = jlib:make_error_reply(El, mongoose_xmpp_errors:bad_request()), |
179 |
1 |
ErrorAcc = mongoose_acc:update_stanza(#{from_jid => From, to_jid => To, element => Error }, Acc), |
180 |
1 |
mongoose_c2s_acc:to_acc(Acc, route, ErrorAcc). |
181 |
|
|
182 |
|
-spec handle_inactive_request(mongoose_acc:t(), params()) -> mongoose_acc:t(). |
183 |
|
handle_inactive_request(Acc, #{c2s_data := C2SData} = _Params) -> |
184 |
12 |
HostType = mongoose_c2s:get_host_type(C2SData), |
185 |
12 |
mongoose_metrics:update(HostType, modCSIInactive, 1), |
186 |
12 |
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of |
187 |
|
{error, not_found} -> |
188 |
11 |
BMax = gen_mod:get_module_opt(HostType, ?MODULE, buffer_max), |
189 |
11 |
Csi = #csi_state{state = inactive, buffer_max = BMax}, |
190 |
11 |
mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, Csi}); |
191 |
|
{ok, Csi = #csi_state{}} -> |
192 |
1 |
NewCsi = Csi#csi_state{state = inactive}, |
193 |
1 |
mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi}) |
194 |
|
end. |
195 |
|
|
196 |
|
-spec handle_active_request(mongoose_acc:t(), params()) -> mongoose_acc:t(). |
197 |
|
handle_active_request(Acc, #{c2s_data := C2SData}) -> |
198 |
6 |
HostType = mongoose_c2s:get_host_type(C2SData), |
199 |
6 |
mongoose_metrics:update(HostType, modCSIActive, 1), |
200 |
6 |
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of |
201 |
|
{ok, Csi = #csi_state{state = inactive, buffer = Buffer}} -> |
202 |
5 |
NewCsi = Csi#csi_state{state = active, buffer = []}, |
203 |
5 |
ToAcc = [{state_mod, {?MODULE, NewCsi}}, {flush, lists:reverse(Buffer)}], |
204 |
5 |
mongoose_c2s_acc:to_acc_many(Acc, ToAcc); |
205 |
|
_ -> |
206 |
1 |
Acc |
207 |
|
end. |
208 |
|
|
209 |
|
-spec csi() -> exml:element(). |
210 |
|
csi() -> |
211 |
124 |
#xmlel{name = <<"csi">>, attrs = [{<<"xmlns">>, ?NS_CSI}]}. |
212 |
|
|
213 |
|
-spec mark_acc_as_buffered(mongoose_acc:t()) -> mongoose_acc:t(). |
214 |
|
mark_acc_as_buffered(Acc) -> |
215 |
30 |
mongoose_acc:set(?MODULE, buffered, true, Acc). |
216 |
|
|
217 |
|
-spec is_acc_buffered(mongoose_acc:t()) -> boolean(). |
218 |
|
is_acc_buffered(Acc) -> |
219 |
40 |
mongoose_acc:get(?MODULE, buffered, false, Acc). |