./ct_report/coverage/mod_csi.COVER.html

1 -module(mod_csi).
2
3 -xep([{xep, 352}, {version, "1.0.0"}]).
4
5 -include("mongoose_config_spec.hrl").
6 -include("jlib.hrl").
7
8 -behaviour(gen_mod).
9 -behaviour(mongoose_module_metrics).
10
11 %% gen_mod callbacks
12 -export([start/2, stop/1, hooks/1, config_spec/0, supported_features/0, instrumentation/1]).
13
14 %% Hook handlers
15 -export([c2s_stream_features/3,
16 bind2_stream_features/3,
17 bind2_enable_features/3,
18 xmpp_presend_element/3,
19 user_send_xmlel/3,
20 handle_user_stopping/3,
21 reroute_unacked_messages/3
22 ]).
23
24 -record(csi_state, {
25 state = active :: state(),
26 buffer = [] :: [mongoose_acc:t()],
27 buffer_max = 20 :: non_neg_integer()
28 }).
29
30 -type params() :: #{c2s_data := mongoose_c2s:data(), _ := _}.
31 -type state() :: active | inactive.
32 -type csi_state() :: #csi_state{}.
33
34 -spec start(mongooseim:host_type(), gen_mod:module_opts()) -> ok.
35 start(_HostType, _Opts) ->
36
:-(
ok.
37
38 -spec stop(mongooseim:host_type()) -> ok.
39 stop(_HostType) ->
40
:-(
ok.
41
42 hooks(HostType) ->
43
:-(
[
44 {c2s_stream_features, HostType, fun ?MODULE:c2s_stream_features/3, #{}, 60},
45 {bind2_stream_features, HostType, fun ?MODULE:bind2_stream_features/3, #{}, 50},
46 {bind2_enable_features, HostType, fun ?MODULE:bind2_enable_features/3, #{}, 50},
47 {xmpp_presend_element, HostType, fun ?MODULE:xmpp_presend_element/3, #{}, 45}, %% just before stream management!
48 {user_send_xmlel, HostType, fun ?MODULE:user_send_xmlel/3, #{}, 60},
49 {user_stop_request, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 95},
50 {user_socket_closed, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 95},
51 {user_socket_error, HostType, fun ?MODULE:handle_user_stopping/3, #{}, 95},
52 {reroute_unacked_messages, HostType, fun ?MODULE:reroute_unacked_messages/3, #{}, 70}
53 ].
54
55 -spec instrumentation(mongooseim:host_type()) -> [mongoose_instrument:spec()].
56 instrumentation(HostType) ->
57
:-(
[{mod_csi_active, #{host_type => HostType},
58 #{metrics => #{count => spiral}}},
59 {mod_csi_inactive, #{host_type => HostType},
60 #{metrics => #{count => spiral}}}].
61
62 -spec config_spec() -> mongoose_config_spec:config_section().
63 config_spec() ->
64 8 #section{
65 items = #{<<"buffer_max">> => #option{type = int_or_infinity,
66 validate = non_negative}},
67 defaults = #{<<"buffer_max">> => 20}
68 }.
69
70 -spec supported_features() -> [atom()].
71 supported_features() ->
72
:-(
[dynamic_domains].
73
74
75 %%% Hook handlers
76 -spec c2s_stream_features(Acc, map(), gen_hook:extra()) -> {ok, Acc} when Acc :: [exml:element()].
77 c2s_stream_features(Acc, _, _) ->
78
:-(
{ok, lists:keystore(<<"csi">>, #xmlel.name, Acc, csi())}.
79
80 -spec bind2_stream_features(Acc, #{c2s_data := mongoose_c2s:data()}, gen_hook:extra()) ->
81 {ok, Acc} when Acc :: [exml:element()].
82 bind2_stream_features(Acc, _, _) ->
83
:-(
SmFeature = #xmlel{name = <<"feature">>, attrs = [{<<"var">>, ?NS_CSI}]},
84
:-(
{ok, [SmFeature | Acc]}.
85
86 -spec bind2_enable_features(SaslAcc, mod_sasl2:c2s_state_data(), gen_hook:extra()) ->
87 {ok, SaslAcc} when SaslAcc :: mongoose_acc:t().
88 bind2_enable_features(SaslAcc, Params, _) ->
89
:-(
#{request := BindRequest} = mod_bind2:get_bind_request(SaslAcc),
90
:-(
case exml_query:subelement_with_ns(BindRequest, ?NS_CSI) of
91 undefined ->
92
:-(
{ok, SaslAcc};
93 #xmlel{name = <<"active">>} ->
94
:-(
SaslAcc1 = handle_active_request(SaslAcc, Params),
95
:-(
{ok, SaslAcc1};
96 #xmlel{name = <<"inactive">>} ->
97
:-(
SaslAcc1 = handle_inactive_request(SaslAcc, Params),
98
:-(
{ok, SaslAcc1}
99 end.
100
101 %% The XEP doesn't require any specific server behaviour in response to CSI stanzas,
102 %% there are only some suggestions. The implementation in MongooseIM will simply buffer
103 %% all packets (up to a configured limit) when the session is "inactive" and will flush
104 %% the buffer when it becomes "active" again.
105 -spec xmpp_presend_element(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
106 mongoose_c2s_hooks:result().
107 xmpp_presend_element(Acc, #{c2s_data := C2SData}, _Extra) ->
108
:-(
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
109 {ok, Csi} ->
110
:-(
handle_presend_element(Acc, Csi);
111 _ ->
112
:-(
{ok, Acc}
113 end.
114
115 -spec handle_presend_element(mongoose_acc:t(), csi_state()) -> mongoose_c2s_hooks:result().
116 handle_presend_element(Acc, Csi = #csi_state{state = inactive, buffer = Buffer, buffer_max = BMax}) ->
117
:-(
case length(Buffer) + 1 > BMax of
118 true ->
119
:-(
NewBuffer = [mark_acc_as_buffered(Acc) | Buffer],
120
:-(
NewCsi = Csi#csi_state{buffer = []},
121
:-(
ToAcc = [{state_mod, {?MODULE, NewCsi}}, {flush, lists:reverse(NewBuffer)}],
122
:-(
{stop, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)};
123 _ ->
124
:-(
buffer_acc(Acc, Csi, is_acc_buffered(Acc))
125 end;
126 handle_presend_element(Acc, _) ->
127
:-(
{ok, Acc}.
128
129 -spec buffer_acc(mongoose_acc:t(), csi_state(), boolean()) -> mongoose_c2s_hooks:result().
130 buffer_acc(Acc, Csi = #csi_state{buffer = Buffer}, false) ->
131
:-(
AccToBuffer = mark_acc_as_buffered(Acc),
132
:-(
NewCsi = Csi#csi_state{buffer = [AccToBuffer | Buffer]},
133
:-(
{stop, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})};
134 buffer_acc(Acc, _, true) ->
135
:-(
{ok, Acc}.
136
137 -spec user_send_xmlel(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
138 mongoose_c2s_hooks:result().
139 user_send_xmlel(Acc, Params, _Extra) ->
140
:-(
El = mongoose_acc:element(Acc),
141
:-(
case exml_query:attr(El, <<"xmlns">>) of
142 ?NS_CSI ->
143
:-(
{stop, handle_csi_request(Acc, Params, El)};
144 _ ->
145
:-(
{ok, Acc}
146 end.
147
148 %% here we ensure CSI is active and won't buffer anymore, and the current buffer is given to sm
149 -spec handle_user_stopping(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
150 mongoose_c2s_hooks:result().
151 handle_user_stopping(Acc, #{c2s_data := C2SData}, _) ->
152
:-(
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
153 {ok, Csi = #csi_state{buffer = Buffer}} ->
154
:-(
NewCsi = Csi#csi_state{state = active, buffer = []},
155
:-(
ToAcc = [{state_mod, {?MODULE, NewCsi}}, {flush, lists:reverse(Buffer)}],
156
:-(
{ok, mongoose_c2s_acc:to_acc_many(Acc, ToAcc)};
157 _ ->
158
:-(
{ok, Acc}
159 end.
160
161 -spec reroute_unacked_messages(mongoose_acc:t(), mongoose_c2s_hooks:params(), gen_hook:extra()) ->
162 mongoose_c2s_hooks:result().
163 reroute_unacked_messages(Acc, #{c2s_data := C2SData}, _) ->
164
:-(
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
165 {ok, Csi = #csi_state{buffer = Buffer}} ->
166
:-(
mongoose_c2s:reroute_buffer(C2SData, Buffer),
167
:-(
NewCsi = Csi#csi_state{state = active, buffer = []},
168
:-(
{ok, mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})};
169 _ ->
170
:-(
{ok, Acc}
171 end.
172
173 -spec handle_csi_request(mongoose_acc:t(), mongoose_c2s_hooks:params(), exml:element()) ->
174 mongoose_acc:t().
175 handle_csi_request(Acc, Params, #xmlel{name = <<"inactive">>}) ->
176
:-(
handle_inactive_request(Acc, Params);
177 handle_csi_request(Acc, Params, #xmlel{name = <<"active">>}) ->
178
:-(
handle_active_request(Acc, Params);
179 handle_csi_request(Acc, _, _) ->
180
:-(
{From, To, El} = mongoose_acc:packet(Acc),
181
:-(
Error = jlib:make_error_reply(El, mongoose_xmpp_errors:bad_request()),
182
:-(
ErrorAcc = mongoose_acc:update_stanza(#{from_jid => From, to_jid => To, element => Error }, Acc),
183
:-(
mongoose_c2s_acc:to_acc(Acc, route, ErrorAcc).
184
185 -spec handle_inactive_request(mongoose_acc:t(), params()) -> mongoose_acc:t().
186 handle_inactive_request(Acc, #{c2s_data := C2SData} = _Params) ->
187
:-(
HostType = mongoose_c2s:get_host_type(C2SData),
188
:-(
JID = mongoose_c2s:get_jid(C2SData),
189
:-(
mongoose_instrument:execute(mod_csi_inactive, #{host_type => HostType}, #{count => 1, jid => JID}),
190
:-(
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
191 {error, not_found} ->
192
:-(
BMax = gen_mod:get_module_opt(HostType, ?MODULE, buffer_max),
193
:-(
Csi = #csi_state{state = inactive, buffer_max = BMax},
194
:-(
mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, Csi});
195 {ok, Csi = #csi_state{}} ->
196
:-(
NewCsi = Csi#csi_state{state = inactive},
197
:-(
mongoose_c2s_acc:to_acc(Acc, state_mod, {?MODULE, NewCsi})
198 end.
199
200 -spec handle_active_request(mongoose_acc:t(), params()) -> mongoose_acc:t().
201 handle_active_request(Acc, #{c2s_data := C2SData}) ->
202
:-(
HostType = mongoose_c2s:get_host_type(C2SData),
203
:-(
JID = mongoose_c2s:get_jid(C2SData),
204
:-(
mongoose_instrument:execute(mod_csi_active, #{host_type => HostType}, #{count => 1, jid => JID}),
205
:-(
case mongoose_c2s:get_mod_state(C2SData, ?MODULE) of
206 {ok, Csi = #csi_state{state = inactive, buffer = Buffer}} ->
207
:-(
NewCsi = Csi#csi_state{state = active, buffer = []},
208
:-(
ToAcc = [{state_mod, {?MODULE, NewCsi}}, {flush, lists:reverse(Buffer)}],
209
:-(
mongoose_c2s_acc:to_acc_many(Acc, ToAcc);
210 _ ->
211
:-(
Acc
212 end.
213
214 -spec csi() -> exml:element().
215 csi() ->
216
:-(
#xmlel{name = <<"csi">>, attrs = [{<<"xmlns">>, ?NS_CSI}]}.
217
218 -spec mark_acc_as_buffered(mongoose_acc:t()) -> mongoose_acc:t().
219 mark_acc_as_buffered(Acc) ->
220
:-(
mongoose_acc:set(?MODULE, buffered, true, Acc).
221
222 -spec is_acc_buffered(mongoose_acc:t()) -> boolean().
223 is_acc_buffered(Acc) ->
224
:-(
mongoose_acc:get(?MODULE, buffered, false, Acc).
Line Hits Source