1 |
|
%%%=================================================================== |
2 |
|
%%% @copyright (C) 2016, Erlang Solutions Ltd. |
3 |
|
%%% @doc Module providing support for websockets in MongooseIM |
4 |
|
%%% @end |
5 |
|
%%%=================================================================== |
6 |
|
-module(mod_websockets). |
7 |
|
|
8 |
|
-behaviour(mongoose_http_handler). |
9 |
|
-behaviour(cowboy_websocket). |
10 |
|
-behaviour(mongoose_c2s_socket). |
11 |
|
|
12 |
|
%% mongoose_http_handler callbacks |
13 |
|
-export([config_spec/0]). |
14 |
|
|
15 |
|
%% cowboy_http_websocket_handler callbacks |
16 |
|
-export([init/2, |
17 |
|
websocket_init/1, |
18 |
|
websocket_handle/2, |
19 |
|
websocket_info/2, |
20 |
|
terminate/3]). |
21 |
|
|
22 |
|
%% mongoose_c2s_socket callbacks |
23 |
|
-export([socket_new/2, |
24 |
|
socket_peername/1, |
25 |
|
tcp_to_tls/2, |
26 |
|
socket_handle_data/2, |
27 |
|
socket_activate/1, |
28 |
|
socket_close/1, |
29 |
|
socket_send_xml/2, |
30 |
|
get_peer_certificate/2, |
31 |
|
has_peer_cert/2, |
32 |
|
is_channel_binding_supported/1, |
33 |
|
get_tls_last_message/1, |
34 |
|
is_ssl/1]). |
35 |
|
|
36 |
|
-include("mongoose.hrl"). |
37 |
|
-include("mongoose_config_spec.hrl"). |
38 |
|
-include("jlib.hrl"). |
39 |
|
-include_lib("exml/include/exml_stream.hrl"). |
40 |
|
|
41 |
|
-define(NS_FRAMING, <<"urn:ietf:params:xml:ns:xmpp-framing">>). |
42 |
|
-define(NS_COMPONENT, <<"jabber:component:accept">>). |
43 |
|
|
44 |
|
-record(websocket, { |
45 |
|
pid :: pid(), |
46 |
|
peername :: mongoose_transport:peer(), |
47 |
|
peercert :: undefined | binary() |
48 |
|
}). |
49 |
|
-record(ws_state, { |
50 |
|
peer :: mongoose_transport:peer() | undefined, |
51 |
|
fsm_pid :: pid() | undefined, |
52 |
|
parser :: exml_stream:parser() | undefined, |
53 |
|
opts :: map(), |
54 |
|
ping_rate :: integer() | none, |
55 |
|
max_stanza_size :: integer() | infinity, |
56 |
|
peercert :: undefined | passed | binary() |
57 |
|
%% the passed value is used to clear the certificate from the handlers state after it's passed down to the socket() |
58 |
|
}). |
59 |
|
|
60 |
|
-type socket() :: #websocket{}. |
61 |
|
|
62 |
|
%% mongoose_http_handler callbacks |
63 |
|
|
64 |
|
-spec config_spec() -> mongoose_config_spec:config_section(). |
65 |
|
config_spec() -> |
66 |
104 |
#section{items = #{<<"timeout">> => #option{type = int_or_infinity, |
67 |
|
validate = non_negative}, |
68 |
|
<<"ping_rate">> => #option{type = integer, |
69 |
|
validate = positive}, |
70 |
|
<<"max_stanza_size">> => #option{type = int_or_infinity, |
71 |
|
validate = positive}, |
72 |
|
<<"c2s_state_timeout">> => #option{type = int_or_infinity, |
73 |
|
validate = non_negative}, |
74 |
|
<<"backwards_compatible_session">> => #option{type = boolean}}, |
75 |
|
defaults = #{<<"timeout">> => 60000, |
76 |
|
<<"max_stanza_size">> => infinity, |
77 |
|
<<"c2s_state_timeout">> => 5000, |
78 |
|
<<"backwards_compatible_session">> => true} |
79 |
|
}. |
80 |
|
|
81 |
|
%%-------------------------------------------------------------------- |
82 |
|
%% Common callbacks for all cowboy behaviours |
83 |
|
%%-------------------------------------------------------------------- |
84 |
|
|
85 |
|
init(Req, Opts = #{timeout := Timeout}) -> |
86 |
49 |
Peer = cowboy_req:peer(Req), |
87 |
49 |
PeerCert = cowboy_req:cert(Req), |
88 |
49 |
Req1 = add_sec_websocket_protocol_header(Req), |
89 |
49 |
?LOG_DEBUG(#{what => ws_init, text => <<"New websockets request">>, |
90 |
49 |
req => Req, opts => Opts}), |
91 |
49 |
AllModOpts = Opts#{peer => Peer, peer_cert => PeerCert}, |
92 |
|
%% upgrade protocol |
93 |
49 |
{cowboy_websocket, Req1, AllModOpts, #{idle_timeout => Timeout}}. |
94 |
|
|
95 |
|
terminate(_Reason, _Req, _State) -> |
96 |
49 |
ok. |
97 |
|
|
98 |
|
%%-------------------------------------------------------------------- |
99 |
|
%% cowboy_http_websocket_handler callbacks |
100 |
|
%%-------------------------------------------------------------------- |
101 |
|
|
102 |
|
% Called for every new websocket connection. |
103 |
|
websocket_init(Opts = #{peer := Peer, peer_cert := PeerCert, max_stanza_size := MaxStanzaSize}) -> |
104 |
49 |
PingRate = maps:get(ping_rate, Opts, none), |
105 |
49 |
maybe_send_ping_request(PingRate), |
106 |
49 |
?LOG_DEBUG(#{what => ws_init, text => <<"New websockets connection">>, |
107 |
49 |
peer => Peer, opts => Opts}), |
108 |
49 |
State = #ws_state{opts = Opts, |
109 |
|
ping_rate = PingRate, |
110 |
|
max_stanza_size = MaxStanzaSize, |
111 |
|
peer = Peer, |
112 |
|
peercert = PeerCert}, |
113 |
49 |
{ok, State}. |
114 |
|
|
115 |
|
% Called when a text message arrives. |
116 |
|
websocket_handle({text, Msg}, State) -> |
117 |
386 |
?LOG_DEBUG(#{what => ws_received, msg => Msg, peer => State#ws_state.peer}), |
118 |
386 |
handle_text(Msg, State); |
119 |
|
|
120 |
|
websocket_handle({pong, Payload}, State) -> |
121 |
:-( |
?LOG_DEBUG(#{what => ws_pong, text => <<"Received pong frame over WebSockets">>, |
122 |
:-( |
msg => Payload, peer => State#ws_state.peer}), |
123 |
:-( |
{ok, State}; |
124 |
|
|
125 |
|
% With this callback we can handle other kind of |
126 |
|
% messages, like binary. |
127 |
|
websocket_handle(Any, State) -> |
128 |
:-( |
?LOG_DEBUG(#{what => ws_received, text => <<"Received non-text over WebSockets">>, |
129 |
:-( |
msg => Any, peer => State#ws_state.peer}), |
130 |
:-( |
{ok, State}. |
131 |
|
|
132 |
|
% Other messages from the system are handled here. |
133 |
|
websocket_info({send_xml, XML}, State) -> |
134 |
512 |
XML1 = process_server_stream_root(replace_stream_ns(XML)), |
135 |
512 |
Text = exml:to_iolist(XML1), |
136 |
512 |
mongoose_metrics:update(global, [data, xmpp, sent, c2s, websocket], iolist_size(Text)), |
137 |
512 |
?LOG_DEBUG(#{what => ws_send, text => <<"Sending xml over WebSockets">>, |
138 |
512 |
packet => Text, peer => State#ws_state.peer}), |
139 |
512 |
{reply, {text, Text}, State}; |
140 |
|
websocket_info(do_ping, State) -> |
141 |
|
%% send ping frame to the client |
142 |
:-( |
send_ping_request(State#ws_state.ping_rate), |
143 |
:-( |
{reply, ping, State}; |
144 |
|
websocket_info(stop, #ws_state{parser = undefined} = State) -> |
145 |
:-( |
{stop, State}; |
146 |
|
websocket_info(stop, #ws_state{parser = Parser} = State) -> |
147 |
23 |
exml_stream:free_parser(Parser), |
148 |
23 |
{stop, State}; |
149 |
|
websocket_info(Info, State) -> |
150 |
:-( |
?LOG_DEBUG(#{what => unexpected_message, msg => Info}), |
151 |
:-( |
{ok, State}. |
152 |
|
|
153 |
|
%%-------------------------------------------------------------------- |
154 |
|
%% Callbacks implementation |
155 |
|
%%-------------------------------------------------------------------- |
156 |
|
|
157 |
|
handle_text(Text, #ws_state{ parser = undefined } = State) -> |
158 |
49 |
ParserOpts = get_parser_opts(Text, State), |
159 |
49 |
{ok, Parser} = exml_stream:new_parser(ParserOpts), |
160 |
49 |
handle_text(Text, State#ws_state{ parser = Parser }); |
161 |
|
handle_text(Text, #ws_state{parser = Parser} = State) -> |
162 |
386 |
case exml_stream:parse(Parser, Text) of |
163 |
|
{ok, NewParser, Elements} -> |
164 |
384 |
mongoose_metrics:update(global, [data, xmpp, received, c2s, websocket], byte_size(Text)), |
165 |
384 |
State1 = State#ws_state{ parser = NewParser }, |
166 |
384 |
case maybe_start_fsm(Elements, State1) of |
167 |
|
{ok, State2} -> |
168 |
384 |
process_client_elements(Elements, State2); |
169 |
|
{stop, _} = Shutdown -> |
170 |
:-( |
Shutdown |
171 |
|
end; |
172 |
|
{error, Reason} -> |
173 |
2 |
process_parse_error(Reason, State) |
174 |
|
end. |
175 |
|
|
176 |
|
process_client_elements(Elements, #ws_state{fsm_pid = FSM} = State) -> |
177 |
384 |
Elements1 = process_client_stream_start(Elements), |
178 |
384 |
[send_to_fsm(FSM, process_client_stream_end(replace_stream_ns(Elem))) || Elem <- Elements1], |
179 |
384 |
{ok, State}. |
180 |
|
|
181 |
|
process_parse_error(_Reason, #ws_state{fsm_pid = undefined} = State) -> |
182 |
:-( |
{stop, State}; |
183 |
|
process_parse_error(Reason, #ws_state{fsm_pid = FSM} = State) -> |
184 |
2 |
send_to_fsm(FSM, #xmlstreamerror{name = iolist_to_binary(Reason)}), |
185 |
2 |
{ok, State}. |
186 |
|
|
187 |
|
send_to_fsm(FSM, Element) -> |
188 |
386 |
FSM ! {tcp, undefined, Element}, |
189 |
386 |
ok. |
190 |
|
|
191 |
|
maybe_start_fsm([#xmlel{ name = <<"open">> }], |
192 |
|
#ws_state{fsm_pid = undefined, |
193 |
|
opts = #{ip_tuple := IPTuple, port := Port, |
194 |
|
c2s_state_timeout := StateTimeout, |
195 |
|
backwards_compatible_session := BackwardsCompatible}} = State) -> |
196 |
49 |
Opts = #{ |
197 |
|
access => all, |
198 |
|
shaper => none, |
199 |
|
max_stanza_size => 0, |
200 |
|
xml_socket => true, |
201 |
|
hibernate_after => 0, |
202 |
|
c2s_state_timeout => StateTimeout, |
203 |
|
backwards_compatible_session => BackwardsCompatible, |
204 |
|
port => Port, ip_tuple => IPTuple, proto => tcp}, |
205 |
49 |
do_start_fsm(Opts, State); |
206 |
|
maybe_start_fsm(_Els, #ws_state{fsm_pid = undefined} = State) -> |
207 |
:-( |
{stop, State}; |
208 |
|
maybe_start_fsm(_Els, State) -> |
209 |
335 |
{ok, State}. |
210 |
|
|
211 |
|
do_start_fsm(Opts, State = #ws_state{peer = Peer, peercert = PeerCert}) -> |
212 |
49 |
SocketData = #websocket{pid = self(), |
213 |
|
peername = Peer, |
214 |
|
peercert = PeerCert}, |
215 |
49 |
case call_fsm_start(SocketData, Opts) of |
216 |
|
{ok, Pid} -> |
217 |
49 |
?LOG_DEBUG(#{what => ws_c2s_started, c2s_pid => Pid, |
218 |
|
text => <<"WebSockets starts c2s process">>, |
219 |
49 |
peer => State#ws_state.peer}), |
220 |
49 |
NewState = State#ws_state{fsm_pid = Pid, peercert = passed}, |
221 |
49 |
{ok, NewState}; |
222 |
|
{error, Reason} -> |
223 |
:-( |
?LOG_WARNING(#{what => ws_c2s_start_failed, reason => Reason, |
224 |
|
text => <<"WebSockets fails to start c2s process">>, |
225 |
:-( |
peer => State#ws_state.peer}), |
226 |
:-( |
{stop, State#ws_state{peercert = passed}} |
227 |
|
end. |
228 |
|
|
229 |
|
call_fsm_start(SocketData, #{hibernate_after := HibernateAfterTimeout} = Opts) -> |
230 |
49 |
mongoose_c2s:start({?MODULE, SocketData, Opts}, |
231 |
|
[{hibernate_after, HibernateAfterTimeout}]). |
232 |
|
|
233 |
|
%%-------------------------------------------------------------------- |
234 |
|
%% Helpers for handling |
235 |
|
%% https://datatracker.ietf.org/doc/rfc7395/ |
236 |
|
%%-------------------------------------------------------------------- |
237 |
|
|
238 |
|
process_client_stream_start([#xmlel{ name = <<"open">>, attrs = Attrs }]) -> |
239 |
98 |
Attrs1 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, ?NS_CLIENT}), |
240 |
98 |
Attrs2 = [{<<"xmlns:stream">>, ?NS_STREAM} | Attrs1], |
241 |
98 |
NewStart = #xmlstreamstart{ name = <<"stream:stream">>, attrs = Attrs2 }, |
242 |
98 |
[NewStart]; |
243 |
|
process_client_stream_start(Elements) -> |
244 |
286 |
Elements. |
245 |
|
|
246 |
|
process_client_stream_end(#xmlel{ name = <<"close">> }) -> |
247 |
47 |
#xmlstreamend{ name = <<"stream:stream">> }; |
248 |
|
process_client_stream_end(Element) -> |
249 |
337 |
Element. |
250 |
|
|
251 |
|
process_server_stream_root(#xmlstreamstart{ name = <<"stream", _/binary>>, attrs = Attrs }) -> |
252 |
98 |
Attrs1 = lists:keydelete(<<"xmlns:stream">>, 1, Attrs), |
253 |
98 |
Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs1, {<<"xmlns">>, ?NS_FRAMING}), |
254 |
98 |
#xmlel{ name = <<"open">>, attrs = Attrs2 }; |
255 |
|
process_server_stream_root(#xmlstreamend{ name = <<"stream", _/binary>> }) -> |
256 |
49 |
#xmlel{ name = <<"close">>, attrs = [{<<"xmlns">>, ?NS_FRAMING}] }; |
257 |
|
process_server_stream_root(Element) -> |
258 |
365 |
Element. |
259 |
|
|
260 |
|
replace_stream_ns(#xmlel{ name = <<"stream:", ElementName/binary>> } = Element) -> |
261 |
100 |
Element#xmlel{ name = ElementName, attrs = [{<<"xmlns">>, ?NS_STREAM} | Element#xmlel.attrs] }; |
262 |
|
replace_stream_ns(Element) -> |
263 |
796 |
case should_have_jabber_client(Element) of |
264 |
|
true -> |
265 |
406 |
JabberClient = {<<"xmlns">>, <<"jabber:client">>}, |
266 |
406 |
NewAtrrs = lists:keystore(<<"xmlns">>, 1, |
267 |
|
Element#xmlel.attrs, JabberClient), |
268 |
406 |
Element#xmlel{attrs = NewAtrrs}; |
269 |
|
false -> |
270 |
390 |
Element |
271 |
|
end. |
272 |
|
|
273 |
|
get_parser_opts(Text, #ws_state{ max_stanza_size = infinity }) -> |
274 |
39 |
[{max_element_size, 0} | get_parser_opts(Text)]; |
275 |
|
get_parser_opts(Text, #ws_state{ max_stanza_size = MaxStanzaSize }) -> |
276 |
10 |
[{max_element_size, MaxStanzaSize} | get_parser_opts(Text)]. |
277 |
|
|
278 |
|
get_parser_opts(<<"<open", _/binary>>) -> |
279 |
49 |
[{infinite_stream, true}]; % new-type WS |
280 |
|
get_parser_opts(_) -> |
281 |
:-( |
[]. % old-type WS |
282 |
|
|
283 |
|
%%-------------------------------------------------------------------- |
284 |
|
%% Helpers |
285 |
|
%%-------------------------------------------------------------------- |
286 |
228 |
should_have_jabber_client(#xmlel{name = <<"iq">>}) -> true; |
287 |
75 |
should_have_jabber_client(#xmlel{name = <<"message">>}) -> true; |
288 |
103 |
should_have_jabber_client(#xmlel{name = <<"presence">>}) -> true; |
289 |
390 |
should_have_jabber_client(_) -> false. |
290 |
|
|
291 |
|
send_ping_request(PingRate) -> |
292 |
:-( |
Dest = self(), |
293 |
:-( |
?LOG_DEBUG(#{what => ws_schedule_ping, |
294 |
|
text => <<"Sending websocket ping request">>, |
295 |
:-( |
ping_rate => PingRate}), |
296 |
:-( |
erlang:send_after(PingRate, Dest, do_ping). |
297 |
|
|
298 |
|
maybe_send_ping_request(none) -> |
299 |
49 |
ok; |
300 |
|
maybe_send_ping_request(PingRate) -> |
301 |
:-( |
send_ping_request(PingRate). |
302 |
|
|
303 |
|
|
304 |
|
%%-------------------------------------------------------------------- |
305 |
|
%% add_sec_websocket_protocol_header |
306 |
|
%%-------------------------------------------------------------------- |
307 |
|
|
308 |
|
add_sec_websocket_protocol_header(Req) -> |
309 |
49 |
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of |
310 |
|
undefined -> |
311 |
|
%% Server should not set "sec-websocket-protocol" response header |
312 |
|
%% if it does not present in the request |
313 |
:-( |
Req; |
314 |
|
Protocols -> |
315 |
49 |
case case_insensitive_match(<<"xmpp">>, Protocols) of |
316 |
|
{matched, MatchedProtocol} -> |
317 |
49 |
cowboy_req:set_resp_header(<<"Sec-WebSocket-Protocol">>, MatchedProtocol, Req); |
318 |
|
nomatch -> |
319 |
|
%% Do not agree with client, do not add a response header |
320 |
:-( |
?LOG_DEBUG(#{what => ws_unknown_protocol, |
321 |
|
text => <<"Header sec-websocket-protocol does not contain xmpp option">>, |
322 |
:-( |
protocols => Protocols}), |
323 |
:-( |
Req |
324 |
|
end |
325 |
|
end. |
326 |
|
|
327 |
|
case_insensitive_match(LowerPattern, [Case | Cases]) -> |
328 |
49 |
LowerCase = jid:str_tolower(Case), |
329 |
49 |
case LowerCase of |
330 |
|
LowerPattern -> |
331 |
49 |
{matched, Case}; |
332 |
|
_ -> |
333 |
:-( |
case_insensitive_match(LowerPattern, Cases) |
334 |
|
end; |
335 |
|
case_insensitive_match(_, []) -> |
336 |
:-( |
nomatch. |
337 |
|
|
338 |
|
%% mongoose_c2s_socket callbacks |
339 |
|
|
340 |
|
-spec socket_new(socket(), mongoose_listener:options()) -> socket(). |
341 |
|
socket_new(Socket, _LOpts) -> |
342 |
49 |
Socket. |
343 |
|
|
344 |
|
-spec socket_peername(socket()) -> {inet:ip_address(), inet:port_number()}. |
345 |
|
socket_peername(#websocket{peername = PeerName}) -> |
346 |
98 |
PeerName. |
347 |
|
|
348 |
|
-spec tcp_to_tls(socket(), mongoose_listener:options()) -> |
349 |
|
{ok, socket()} | {error, term()}. |
350 |
|
tcp_to_tls(_Socket, _LOpts) -> |
351 |
:-( |
{error, tls_not_allowed_on_websockets}. |
352 |
|
|
353 |
|
-spec socket_handle_data(socket(), {tcp | ssl, term(), term()}) -> |
354 |
|
iodata() | {raw, [exml:element()]} | {error, term()}. |
355 |
|
socket_handle_data(_Socket, {_Kind, _Term, Packet}) -> |
356 |
386 |
{raw, [Packet]}. |
357 |
|
|
358 |
|
-spec socket_activate(socket()) -> ok. |
359 |
|
socket_activate(_Socket) -> |
360 |
435 |
ok. |
361 |
|
|
362 |
|
-spec socket_close(socket()) -> ok. |
363 |
|
socket_close(#websocket{pid = Pid}) -> |
364 |
49 |
Pid ! stop, |
365 |
49 |
ok. |
366 |
|
|
367 |
|
-spec socket_send_xml(socket(), iodata() | exml:element() | [exml:element()]) -> |
368 |
|
ok | {error, term()}. |
369 |
|
socket_send_xml(#websocket{pid = Pid}, XMLs) when is_list(XMLs) -> |
370 |
512 |
[Pid ! {send_xml, XML} || XML <- XMLs], |
371 |
512 |
ok; |
372 |
|
socket_send_xml(#websocket{pid = Pid}, XML) -> |
373 |
:-( |
Pid ! {send_xml, XML}, |
374 |
:-( |
ok. |
375 |
|
|
376 |
|
-spec has_peer_cert(socket(), mongoose_listener:options()) -> boolean(). |
377 |
|
has_peer_cert(Socket, LOpts) -> |
378 |
6 |
get_peer_certificate(Socket, LOpts) /= no_peer_cert. |
379 |
|
|
380 |
|
|
381 |
|
-spec get_peer_certificate(socket(), mongoose_listener:options()) -> |
382 |
|
mongoose_transport:peercert_return(). |
383 |
|
get_peer_certificate(#websocket{peercert = undefined}, _) -> |
384 |
:-( |
no_peer_cert; |
385 |
|
get_peer_certificate(#websocket{peercert = PeerCert}, _) -> |
386 |
9 |
Decoded = public_key:pkix_decode_cert(PeerCert, plain), |
387 |
9 |
{ok, Decoded}. |
388 |
|
|
389 |
|
-spec is_channel_binding_supported(socket()) -> boolean(). |
390 |
|
is_channel_binding_supported(_Socket) -> |
391 |
92 |
false. |
392 |
|
|
393 |
|
-spec get_tls_last_message(socket()) -> {ok, binary()} | {error, term()}. |
394 |
|
get_tls_last_message(_Socket) -> |
395 |
:-( |
{error, tls_not_allowed_on_websockets}. |
396 |
|
|
397 |
|
-spec is_ssl(socket()) -> boolean(). |
398 |
|
is_ssl(_Socket) -> |
399 |
147 |
false. |