1 |
|
%%%=================================================================== |
2 |
|
%%% @copyright (C) 2016, Erlang Solutions Ltd. |
3 |
|
%%% @doc Module providing support for websockets in MongooseIM |
4 |
|
%%% @end |
5 |
|
%%%=================================================================== |
6 |
|
-module(mod_websockets). |
7 |
|
|
8 |
|
-behaviour(mongoose_http_handler). |
9 |
|
-behaviour(cowboy_websocket). |
10 |
|
-behaviour(mongoose_c2s_socket). |
11 |
|
|
12 |
|
%% mongoose_http_handler callbacks |
13 |
|
-export([config_spec/0]). |
14 |
|
|
15 |
|
%% cowboy_http_websocket_handler callbacks |
16 |
|
-export([init/2, |
17 |
|
websocket_init/1, |
18 |
|
websocket_handle/2, |
19 |
|
websocket_info/2, |
20 |
|
terminate/3]). |
21 |
|
|
22 |
|
%% mongoose_c2s_socket callbacks |
23 |
|
-export([socket_new/2, |
24 |
|
socket_peername/1, |
25 |
|
tcp_to_tls/2, |
26 |
|
socket_handle_data/2, |
27 |
|
socket_activate/1, |
28 |
|
socket_close/1, |
29 |
|
socket_send_xml/2, |
30 |
|
get_peer_certificate/2, |
31 |
|
has_peer_cert/2, |
32 |
|
is_channel_binding_supported/1, |
33 |
|
get_tls_last_message/1, |
34 |
|
is_ssl/1]). |
35 |
|
|
36 |
|
-include("mongoose.hrl"). |
37 |
|
-include("mongoose_config_spec.hrl"). |
38 |
|
-include("jlib.hrl"). |
39 |
|
-include_lib("exml/include/exml_stream.hrl"). |
40 |
|
|
41 |
|
-define(NS_FRAMING, <<"urn:ietf:params:xml:ns:xmpp-framing">>). |
42 |
|
-define(NS_COMPONENT, <<"jabber:component:accept">>). |
43 |
|
|
44 |
|
%% Socket handle handed to mongoose_c2s; it is what the
%% mongoose_c2s_socket callbacks in this module receive as `socket()'.
-record(websocket, {
          %% websocket handler process that owns the connection
          pid :: pid(),
          %% peer address as reported by cowboy_req:peer/1
          peername :: mongoose_transport:peer(),
          %% client certificate as reported by cowboy_req:cert/1, if any
          peercert :: undefined | binary()
         }).
49 |
|
%% State of the cowboy websocket handler process.
-record(ws_state, {
          %% peer address of the client
          peer :: mongoose_transport:peer() | undefined,
          %% c2s process serving this connection; undefined until it is started
          fsm_pid :: pid() | undefined,
          %% XML stream parser; created when the first text frame arrives
          parser :: exml_stream:parser() | undefined,
          %% handler options extended with `peer' and `peer_cert' in init/2
          opts :: map(),
          %% interval between ping frames, or `none' when pinging is disabled
          ping_rate :: integer() | none,
          %% limit passed to the XML parser as `max_element_size'
          max_stanza_size :: integer() | infinity,
          %% the `passed' value is used to clear the certificate from the
          %% handler's state after it's passed down to the socket()
          peercert :: undefined | passed | binary()
         }).
59 |
|
|
60 |
|
-type socket() :: #websocket{}. |
61 |
|
|
62 |
|
%% mongoose_http_handler callbacks |
63 |
|
|
64 |
|
%% @doc Configuration section accepted by this HTTP handler.
%% Every option except `ping_rate' has a default value.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    NonNegativeOrInfinity = #option{type = int_or_infinity,
                                    validate = non_negative},
    PositiveOrInfinity = #option{type = int_or_infinity,
                                 validate = positive},
    Items = #{<<"timeout">> => NonNegativeOrInfinity,
              <<"ping_rate">> => #option{type = integer,
                                         validate = positive},
              <<"max_stanza_size">> => PositiveOrInfinity,
              <<"c2s_state_timeout">> => NonNegativeOrInfinity,
              <<"backwards_compatible_session">> => #option{type = boolean}},
    Defaults = #{<<"timeout">> => 60000,
                 <<"max_stanza_size">> => infinity,
                 <<"c2s_state_timeout">> => 5000,
                 <<"backwards_compatible_session">> => true},
    #section{items = Items, defaults = Defaults}.
80 |
|
|
81 |
|
%%-------------------------------------------------------------------- |
82 |
|
%% Common callbacks for all cowboy behaviours |
83 |
|
%%-------------------------------------------------------------------- |
84 |
|
|
85 |
|
%% Entry point for an incoming HTTP request: records the peer address and
%% client certificate in the handler options, echoes the xmpp subprotocol
%% header when the client offered it, and asks cowboy to upgrade the
%% connection to a websocket with the configured idle timeout.
init(Req, Opts = #{timeout := Timeout}) ->
    HandlerOpts = Opts#{peer => cowboy_req:peer(Req),
                        peer_cert => cowboy_req:cert(Req)},
    UpgradeReq = add_sec_websocket_protocol_header(Req),
    ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets request">>,
                 req => Req, opts => Opts}),
    %% upgrade protocol
    {cowboy_websocket, UpgradeReq, HandlerOpts, #{idle_timeout => Timeout}}.
94 |
|
|
95 |
|
%% Called when the websocket handler terminates.
%% If no c2s process was started there is nobody to notify. Otherwise the
%% c2s process receives `{websockets_closed, undefined}' for the reasons
%% `normal', `stop', `timeout', `remote' and `{remote, _, _}', and
%% `{websockets_error, undefined}' for any other reason.
terminate(_Reason, _Req, #ws_state{fsm_pid = undefined}) ->
    ok;
terminate(Reason, _Req, #ws_state{fsm_pid = FSM}) ->
    FSM ! {termination_event(Reason), undefined},
    ok.

%% Maps a termination reason to the tag of the message sent to the c2s process.
termination_event(normal) -> websockets_closed;
termination_event(stop) -> websockets_closed;
termination_event(timeout) -> websockets_closed;
termination_event(remote) -> websockets_closed;
termination_event({remote, _, _}) -> websockets_closed;
termination_event(_Other) -> websockets_error.
109 |
|
|
110 |
|
%%-------------------------------------------------------------------- |
111 |
|
%% cowboy_http_websocket_handler callbacks |
112 |
|
%%-------------------------------------------------------------------- |
113 |
|
|
114 |
|
%% Called for every new websocket connection, in the handler process.
%% Schedules the first ping when `ping_rate' is configured and builds the
%% handler state from the options prepared in init/2.
websocket_init(Opts = #{peer := Peer, peer_cert := PeerCert, max_stanza_size := MaxStanzaSize}) ->
    PingRate = case Opts of
                   #{ping_rate := Rate} -> Rate;
                   #{} -> none
               end,
    maybe_send_ping_request(PingRate),
    ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets connection">>,
                 peer => Peer, opts => Opts}),
    {ok, #ws_state{peer = Peer,
                   peercert = PeerCert,
                   opts = Opts,
                   ping_rate = PingRate,
                   max_stanza_size = MaxStanzaSize}}.
126 |
|
|
127 |
|
%% Called for every frame received from the client.
%% Text frames are parsed as XML and forwarded; pong frames and any other
%% frame type (e.g. binary) are only logged.
websocket_handle({text, Text}, State) ->
    ?LOG_DEBUG(#{what => ws_received, msg => Text, peer => State#ws_state.peer}),
    handle_text(Text, State);
websocket_handle({pong, Data}, State) ->
    ?LOG_DEBUG(#{what => ws_pong, text => <<"Received pong frame over WebSockets">>,
                 msg => Data, peer => State#ws_state.peer}),
    {ok, State};
websocket_handle(Frame, State) ->
    ?LOG_DEBUG(#{what => ws_received, text => <<"Received non-text over WebSockets">>,
                 msg => Frame, peer => State#ws_state.peer}),
    {ok, State}.
143 |
|
|
144 |
|
%% Handles Erlang messages delivered to the handler process:
%%   {send_xml, XML} - converts the element to its websocket framing form,
%%                     updates the sent-bytes metric and sends it as text;
%%   do_ping         - schedules the next ping and sends a ping frame;
%%   stop            - frees the XML parser (if any) and stops the handler;
%%   anything else   - logged and ignored.
websocket_info({send_xml, XML}, State) ->
    Framed = process_server_stream_root(replace_stream_ns(XML)),
    Payload = exml:to_iolist(Framed),
    mongoose_metrics:update(global, [data, xmpp, sent, c2s, websocket], iolist_size(Payload)),
    ?LOG_DEBUG(#{what => ws_send, text => <<"Sending xml over WebSockets">>,
                 packet => Payload, peer => State#ws_state.peer}),
    {reply, {text, Payload}, State};
websocket_info(do_ping, #ws_state{ping_rate = PingRate} = State) ->
    %% send ping frame to the client
    send_ping_request(PingRate),
    {reply, ping, State};
websocket_info(stop, #ws_state{parser = Parser} = State) ->
    release_parser(Parser),
    {stop, State};
websocket_info(Unexpected, State) ->
    ?LOG_DEBUG(#{what => unexpected_message, msg => Unexpected}),
    {ok, State}.

%% Frees the stream parser unless none has been created yet.
release_parser(undefined) ->
    ok;
release_parser(Parser) ->
    exml_stream:free_parser(Parser).
164 |
|
|
165 |
|
%%-------------------------------------------------------------------- |
166 |
|
%% Callbacks implementation |
167 |
|
%%-------------------------------------------------------------------- |
168 |
|
|
169 |
|
%% Parses a text frame and forwards the resulting elements to the c2s
%% process. The parser is created lazily on the first frame, because its
%% options depend on how that frame begins.
handle_text(Text, #ws_state{parser = undefined} = State) ->
    {ok, FreshParser} = exml_stream:new_parser(get_parser_opts(Text, State)),
    handle_text(Text, State#ws_state{parser = FreshParser});
handle_text(Text, #ws_state{parser = Parser} = State) ->
    case exml_stream:parse(Parser, Text) of
        {error, Reason} ->
            process_parse_error(Reason, State);
        {ok, NextParser, Elements} ->
            mongoose_metrics:update(global, [data, xmpp, received, c2s, websocket], byte_size(Text)),
            forward_parsed(Elements, State#ws_state{parser = NextParser})
    end.

%% Makes sure a c2s process exists for the parsed elements, then hands them over.
forward_parsed(Elements, State) ->
    case maybe_start_fsm(Elements, State) of
        {stop, _} = Shutdown ->
            Shutdown;
        {ok, ReadyState} ->
            process_client_elements(Elements, ReadyState)
    end.
187 |
|
|
188 |
|
%% Converts the parsed client elements from websocket framing to stream
%% form and sends each of them, in order, to the c2s process.
process_client_elements(Elements, #ws_state{fsm_pid = FSM} = State) ->
    Forward = fun(Elem) ->
                      send_to_fsm(FSM, process_client_stream_end(replace_stream_ns(Elem)))
              end,
    lists:foreach(Forward, process_client_stream_start(Elements)),
    {ok, State}.
192 |
|
|
193 |
|
%% Reacts to an XML parse error. Without a c2s process the handler simply
%% stops; otherwise the error is passed on to the c2s process as an
%% #xmlstreamerror{} and the handler keeps running.
process_parse_error(_Reason, #ws_state{fsm_pid = undefined} = State) ->
    {stop, State};
process_parse_error(Reason, #ws_state{fsm_pid = FSM} = State) ->
    StreamError = #xmlstreamerror{name = iolist_to_binary(Reason)},
    send_to_fsm(FSM, StreamError),
    {ok, State}.
198 |
|
|
199 |
|
%% Delivers an element to the c2s process as a `{tcp, undefined, Element}'
%% message; socket_handle_data/2 unpacks the third field of that tuple.
send_to_fsm(FSM, Element) ->
    Message = {tcp, undefined, Element},
    FSM ! Message,
    ok.
202 |
|
|
203 |
|
%% Starts the c2s process when the first thing the client sends is a
%% single <open/> element. Any other input before a c2s process exists
%% stops the handler; once the process exists this is a no-op.
maybe_start_fsm([#xmlel{name = <<"open">>}],
                #ws_state{fsm_pid = undefined,
                          opts = #{ip_tuple := IPTuple, port := Port,
                                   c2s_state_timeout := StateTimeout,
                                   backwards_compatible_session := BackwardsCompatible}} = State) ->
    C2SOpts = #{proto => tcp,
                ip_tuple => IPTuple,
                port => Port,
                access => all,
                shaper => none,
                xml_socket => true,
                max_stanza_size => 0,
                hibernate_after => 0,
                c2s_state_timeout => StateTimeout,
                backwards_compatible_session => BackwardsCompatible},
    do_start_fsm(C2SOpts, State);
maybe_start_fsm(_Elements, #ws_state{fsm_pid = undefined} = State) ->
    {stop, State};
maybe_start_fsm(_Elements, State) ->
    {ok, State}.
222 |
|
|
223 |
|
%% Starts a c2s process bound to this handler process. The peer
%% certificate travels in the #websocket{} socket, so whatever the
%% outcome it is replaced with `passed' in the handler state.
do_start_fsm(Opts, State = #ws_state{peer = Peer, peercert = PeerCert}) ->
    Socket = #websocket{pid = self(), peername = Peer, peercert = PeerCert},
    CertPassed = State#ws_state{peercert = passed},
    case call_fsm_start(Socket, Opts) of
        {error, Reason} ->
            ?LOG_WARNING(#{what => ws_c2s_start_failed, reason => Reason,
                           text => <<"WebSockets fails to start c2s process">>,
                           peer => State#ws_state.peer}),
            {stop, CertPassed};
        {ok, C2SPid} ->
            ?LOG_DEBUG(#{what => ws_c2s_started, c2s_pid => C2SPid,
                         text => <<"WebSockets starts c2s process">>,
                         peer => State#ws_state.peer}),
            {ok, CertPassed#ws_state{fsm_pid = C2SPid}}
    end.
240 |
|
|
241 |
|
%% Starts mongoose_c2s with this module as the socket implementation,
%% passing `hibernate_after' from the options as a start option.
call_fsm_start(SocketData, #{hibernate_after := HibernateAfterTimeout} = Opts) ->
    StartOpts = [{hibernate_after, HibernateAfterTimeout}],
    mongoose_c2s:start({?MODULE, SocketData, Opts}, StartOpts).
244 |
|
|
245 |
|
%%-------------------------------------------------------------------- |
246 |
|
%% Helpers for handling |
247 |
|
%% https://datatracker.ietf.org/doc/rfc7395/ |
248 |
|
%%-------------------------------------------------------------------- |
249 |
|
|
250 |
|
%% Turns a lone <open/> framing element from the client into a
%% `stream:stream' stream start: an existing xmlns attribute is replaced
%% with ?NS_CLIENT and an xmlns:stream attribute is prepended.
%% Any other list of elements is returned untouched.
process_client_stream_start([#xmlel{name = <<"open">>, attrs = Attrs}]) ->
    ClientNsAttrs = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, ?NS_CLIENT}),
    StreamAttrs = [{<<"xmlns:stream">>, ?NS_STREAM} | ClientNsAttrs],
    [#xmlstreamstart{name = <<"stream:stream">>, attrs = StreamAttrs}];
process_client_stream_start(Other) ->
    Other.
257 |
|
|
258 |
|
%% Turns a <close/> framing element from the client into a
%% `stream:stream' stream end; everything else passes through unchanged.
process_client_stream_end(#xmlel{name = <<"close">>}) ->
    #xmlstreamend{name = <<"stream:stream">>};
process_client_stream_end(Other) ->
    Other.
262 |
|
|
263 |
|
%% Converts stream start/end produced by the server into the websocket
%% framing elements <open/> and <close/> (namespace ?NS_FRAMING).
%% For a stream start the xmlns:stream attribute is dropped and an existing
%% xmlns attribute is replaced. Other elements pass through unchanged.
process_server_stream_root(#xmlstreamstart{name = <<"stream", _/binary>>, attrs = Attrs}) ->
    WithoutStreamNs = lists:keydelete(<<"xmlns:stream">>, 1, Attrs),
    FramingAttrs = lists:keyreplace(<<"xmlns">>, 1, WithoutStreamNs,
                                    {<<"xmlns">>, ?NS_FRAMING}),
    #xmlel{name = <<"open">>, attrs = FramingAttrs};
process_server_stream_root(#xmlstreamend{name = <<"stream", _/binary>>}) ->
    #xmlel{name = <<"close">>, attrs = [{<<"xmlns">>, ?NS_FRAMING}]};
process_server_stream_root(Other) ->
    Other.
271 |
|
|
272 |
|
%% Adjusts element namespaces for websocket framing:
%%  * an element named `stream:Name' loses the prefix and gets an
%%    xmlns attribute set to ?NS_STREAM prepended;
%%  * iq, message and presence elements get their xmlns attribute set to
%%    jabber:client (added when missing, replaced otherwise);
%%  * everything else is returned unchanged.
replace_stream_ns(#xmlel{name = <<"stream:", LocalName/binary>>, attrs = Attrs} = Element) ->
    Element#xmlel{name = LocalName, attrs = [{<<"xmlns">>, ?NS_STREAM} | Attrs]};
replace_stream_ns(Element) ->
    case should_have_jabber_client(Element) of
        false ->
            Element;
        true ->
            #xmlel{attrs = Attrs} = Element,
            NewAttrs = lists:keystore(<<"xmlns">>, 1, Attrs,
                                      {<<"xmlns">>, <<"jabber:client">>}),
            Element#xmlel{attrs = NewAttrs}
    end.
284 |
|
|
285 |
|
%% Builds the exml_stream parser options for a connection whose first
%% frame is Text. A `max_stanza_size' of `infinity' is passed to the
%% parser as a `max_element_size' of 0.
get_parser_opts(Text, #ws_state{max_stanza_size = MaxStanzaSize}) ->
    MaxElementSize = case MaxStanzaSize of
                         infinity -> 0;
                         _ -> MaxStanzaSize
                     end,
    [{max_element_size, MaxElementSize} | get_parser_opts(Text)].
289 |
|
|
290 |
|
%% Chooses parser options from the beginning of the first frame:
%% a frame starting with `<open' selects the new-type websocket framing
%% (infinite stream); anything else gets the old-type defaults.
get_parser_opts(<<"<open", _Rest/binary>>) ->
    [{infinite_stream, true}]; % new-type WS
get_parser_opts(_OtherFirstFrame) ->
    []. % old-type WS
294 |
|
|
295 |
|
%%-------------------------------------------------------------------- |
296 |
|
%% Helpers |
297 |
|
%%-------------------------------------------------------------------- |
298 |
453 |
%% True for the elements that get the jabber:client namespace in
%% replace_stream_ns/1: iq, message and presence.
should_have_jabber_client(#xmlel{name = Name}) ->
    lists:member(Name, [<<"iq">>, <<"message">>, <<"presence">>]);
should_have_jabber_client(_NotAnElement) ->
    false.
302 |
|
|
303 |
|
%% Schedules a `do_ping' message to the calling process after PingRate
%% milliseconds. Returns the timer reference from erlang:send_after/3.
send_ping_request(PingRate) ->
    ?LOG_DEBUG(#{what => ws_schedule_ping,
                 text => <<"Sending websocket ping request">>,
                 ping_rate => PingRate}),
    erlang:send_after(PingRate, self(), do_ping).
309 |
|
|
310 |
|
%% Schedules the first ping unless pinging is disabled (`none').
maybe_send_ping_request(none) ->
    ok;
maybe_send_ping_request(Interval) ->
    send_ping_request(Interval).
314 |
|
|
315 |
|
|
316 |
|
%%-------------------------------------------------------------------- |
317 |
|
%% add_sec_websocket_protocol_header |
318 |
|
%%-------------------------------------------------------------------- |
319 |
|
|
320 |
|
%% Sets the Sec-WebSocket-Protocol response header when the client offered
%% the xmpp subprotocol; otherwise returns the request unchanged.
add_sec_websocket_protocol_header(Req) ->
    case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
        undefined ->
            %% The server should not set the "sec-websocket-protocol"
            %% response header if it is not present in the request
            Req;
        Protocols ->
            agree_on_xmpp_protocol(Protocols, Req)
    end.

%% Echoes back the client's own spelling of "xmpp" if it is among the
%% offered protocols.
agree_on_xmpp_protocol(Protocols, Req) ->
    case case_insensitive_match(<<"xmpp">>, Protocols) of
        nomatch ->
            %% Do not agree with client, do not add a response header
            ?LOG_DEBUG(#{what => ws_unknown_protocol,
                         text => <<"Header sec-websocket-protocol does not contain xmpp option">>,
                         protocols => Protocols}),
            Req;
        {matched, MatchedProtocol} ->
            cowboy_req:set_resp_header(<<"Sec-WebSocket-Protocol">>, MatchedProtocol, Req)
    end.
338 |
|
|
339 |
|
%% Finds the first candidate whose lowercased form equals LowerPattern
%% (which must already be lowercase). Returns `{matched, Candidate}' with
%% the candidate as originally spelled, or `nomatch'.
case_insensitive_match(_LowerPattern, []) ->
    nomatch;
case_insensitive_match(LowerPattern, [Candidate | Rest]) ->
    case jid:str_tolower(Candidate) of
        LowerPattern ->
            {matched, Candidate};
        _ ->
            case_insensitive_match(LowerPattern, Rest)
    end.
349 |
|
|
350 |
|
%% mongoose_c2s_socket callbacks |
351 |
|
|
352 |
|
%% The #websocket{} record is already a complete socket, so it is
%% returned as is; the listener options are not used.
-spec socket_new(socket(), mongoose_listener:options()) -> socket().
socket_new(WsSocket, _ListenerOpts) ->
    WsSocket.
355 |
|
|
356 |
|
%% Returns the peer address stored in the socket.
-spec socket_peername(socket()) -> {inet:ip_address(), inet:port_number()}.
socket_peername(#websocket{peername = Peer}) ->
    Peer.
359 |
|
|
360 |
|
%% Upgrading to TLS is not supported on websockets; always returns an error.
-spec tcp_to_tls(socket(), mongoose_listener:options()) ->
    {ok, socket()} | {error, term()}.
tcp_to_tls(_WsSocket, _ListenerOpts) ->
    {error, tls_not_allowed_on_websockets}.
364 |
|
|
365 |
|
%% Unpacks a message produced by send_to_fsm/2. The payload is already a
%% parsed element, so it is returned as a one-element `raw' list.
-spec socket_handle_data(socket(), {tcp | ssl, term(), term()}) ->
    iodata() | {raw, [exml:element()]} | {error, term()}.
socket_handle_data(_WsSocket, {_Tag, _Ref, Element}) ->
    {raw, [Element]}.
369 |
|
|
370 |
|
%% Nothing to activate for a websocket; always returns `ok'.
-spec socket_activate(socket()) -> ok.
socket_activate(_WsSocket) ->
    ok.
373 |
|
|
374 |
|
%% Closes the connection by sending `stop' to the websocket handler
%% process, which handles it in websocket_info/2.
-spec socket_close(socket()) -> ok.
socket_close(#websocket{pid = HandlerPid}) ->
    HandlerPid ! stop,
    ok.
378 |
|
|
379 |
|
%% Hands outgoing XML to the websocket handler process. A list is sent
%% element by element, each as its own `{send_xml, XML}' message; any
%% other term is sent as a single message.
-spec socket_send_xml(socket(), iodata() | exml:element() | [exml:element()]) ->
    ok | {error, term()}.
socket_send_xml(#websocket{pid = HandlerPid}, Elements) when is_list(Elements) ->
    _ = [HandlerPid ! {send_xml, Element} || Element <- Elements],
    ok;
socket_send_xml(#websocket{pid = HandlerPid}, Element) ->
    HandlerPid ! {send_xml, Element},
    ok.
387 |
|
|
388 |
|
%% True when get_peer_certificate/2 yields a certificate for the socket.
-spec has_peer_cert(socket(), mongoose_listener:options()) -> boolean().
has_peer_cert(WsSocket, ListenerOpts) ->
    case get_peer_certificate(WsSocket, ListenerOpts) of
        no_peer_cert -> false;
        _ -> true
    end.
391 |
|
|
392 |
|
|
393 |
|
%% Returns the client certificate stored in the socket, decoded with
%% public_key:pkix_decode_cert/2 in `plain' mode, or `no_peer_cert' when
%% none was stored.
-spec get_peer_certificate(socket(), mongoose_listener:options()) ->
    mongoose_transport:peercert_return().
get_peer_certificate(#websocket{peercert = undefined}, _ListenerOpts) ->
    no_peer_cert;
get_peer_certificate(#websocket{peercert = DerCert}, _ListenerOpts) ->
    {ok, public_key:pkix_decode_cert(DerCert, plain)}.
400 |
|
|
401 |
|
%% Channel binding is never reported as supported on websockets.
-spec is_channel_binding_supported(socket()) -> boolean().
is_channel_binding_supported(_WsSocket) ->
    false.
404 |
|
|
405 |
|
%% There is no TLS session at this level; always returns an error.
-spec get_tls_last_message(socket()) -> {ok, binary()} | {error, term()}.
get_tls_last_message(_WsSocket) ->
    {error, tls_not_allowed_on_websockets}.
408 |
|
|
409 |
|
%% The websocket socket is never reported as an SSL socket.
-spec is_ssl(socket()) -> boolean().
is_ssl(_WsSocket) ->
    false.