1 |
|
%%%=================================================================== |
2 |
|
%%% @copyright (C) 2016, Erlang Solutions Ltd. |
3 |
|
%%% @doc Module providing support for websockets in MongooseIM |
4 |
|
%%% @end |
5 |
|
%%%=================================================================== |
6 |
|
-module(mod_websockets). |
7 |
|
|
8 |
|
-behaviour(cowboy_websocket). |
9 |
|
-behaviour(mongoose_transport). |
10 |
|
|
11 |
|
%% cowboy_websocket callbacks
12 |
|
-export([init/2, |
13 |
|
websocket_init/1, |
14 |
|
websocket_handle/2, |
15 |
|
websocket_info/2, |
16 |
|
terminate/3]). |
17 |
|
|
18 |
|
%% ejabberd_socket compatibility |
19 |
|
-export([starttls/2, starttls/3, |
20 |
|
compress/1, compress/3, |
21 |
|
send/2, |
22 |
|
send_xml/2, |
23 |
|
change_shaper/2, |
24 |
|
monitor/1, |
25 |
|
get_sockmod/1, |
26 |
|
close/1, |
27 |
|
peername/1, |
28 |
|
get_peer_certificate/1, |
29 |
|
set_ping/2, |
30 |
|
disable_ping/1]). |
31 |
|
|
32 |
|
-ignore_xref([change_shaper/2, close/1, compress/1, compress/3, disable_ping/1, |
33 |
|
get_peer_certificate/1, get_sockmod/1, send/2, set_ping/2]). |
34 |
|
|
35 |
|
-include("mongoose.hrl"). |
36 |
|
-include("jlib.hrl"). |
37 |
|
-include_lib("exml/include/exml_stream.hrl"). |
38 |
|
|
39 |
|
-define(NS_FRAMING, <<"urn:ietf:params:xml:ns:xmpp-framing">>). |
40 |
|
-define(NS_COMPONENT, <<"jabber:component:accept">>). |
41 |
|
|
42 |
|
-record(websocket, { |
43 |
|
pid :: pid(), |
44 |
|
peername :: mongoose_transport:peer(), |
45 |
|
peercert :: undefined | binary() |
46 |
|
}). |
47 |
|
-record(ws_state, { |
48 |
|
peer :: mongoose_transport:peer() | undefined, |
49 |
|
fsm_pid :: pid() | undefined, |
50 |
|
open_tag :: stream | open | undefined, |
51 |
|
parser :: exml_stream:parser() | undefined, |
52 |
|
opts :: proplists:proplist() | undefined, |
53 |
|
ping_rate :: integer() | none, |
54 |
|
max_stanza_size :: integer() | infinity, |
55 |
|
peercert :: undefined | passed | binary() |
56 |
|
%% `passed' replaces the certificate in the handler's state once the
%% certificate has been handed over to the socket()
57 |
|
}). |
58 |
|
|
59 |
|
-type socket() :: #websocket{}. |
60 |
|
|
61 |
|
%%-------------------------------------------------------------------- |
62 |
|
%% Common callbacks for all cowboy behaviours |
63 |
|
%%-------------------------------------------------------------------- |
64 |
|
|
65 |
|
%% Cowboy handler entry point.
%% Reads the peer address and client certificate from the request, lets
%% add_sec_websocket_protocol_header/1 adjust the response headers and asks
%% cowboy to upgrade the connection to a websocket. The idle timeout is taken
%% from the `timeout' option (60000 when not set).
init(Req, Opts) ->
    PeerInfo = [{peer, cowboy_req:peer(Req)},
                {peercert, cowboy_req:cert(Req)}],
    UpgradedReq = add_sec_websocket_protocol_header(Req),
    ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets request">>,
                 req => Req, opts => Opts}),
    WsOpts = #{idle_timeout => gen_mod:get_opt(timeout, Opts, 60000)},
    %% upgrade protocol; the peer data is prepended to the options so that
    %% websocket_init/1 can read it back
    {cowboy_websocket, UpgradedReq, PeerInfo ++ Opts, WsOpts}.
76 |
|
|
77 |
|
%% Cowboy termination callback: performs no cleanup and returns `ok' for
%% every reason.
terminate(_Reason, _Req, _State) -> ok.
79 |
|
|
80 |
|
%%-------------------------------------------------------------------- |
81 |
|
%% cowboy_websocket callbacks
82 |
|
%%-------------------------------------------------------------------- |
83 |
|
|
84 |
|
%% Called for every new websocket connection.
%% Builds the initial handler state from the option list assembled in init/2
%% and, when a ping rate is configured, schedules the first ping.
websocket_init(Opts) ->
    Rate = gen_mod:get_opt(ping_rate, Opts, none),
    StanzaLimit = gen_mod:get_opt(max_stanza_size, Opts, infinity),
    PeerAddr = gen_mod:get_opt(peer, Opts),
    Cert = gen_mod:get_opt(peercert, Opts),
    maybe_send_ping_request(Rate),
    ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets connection">>,
                 peer => PeerAddr, opts => Opts}),
    {ok, #ws_state{opts = Opts,
                   ping_rate = Rate,
                   max_stanza_size = StanzaLimit,
                   peer = PeerAddr,
                   peercert = Cert}}.
99 |
|
|
100 |
|
%% Called for every frame received from the client.
%% Text and binary frames are both passed to handle_text/2; pong frames and
%% every other kind of frame are only logged and leave the state untouched.
websocket_handle({FrameType, Msg}, State)
  when FrameType =:= text; FrameType =:= binary ->
    ?LOG_DEBUG(#{what => ws_received, msg => Msg, peer => State#ws_state.peer}),
    handle_text(Msg, State);
websocket_handle({pong, Payload}, State) ->
    ?LOG_DEBUG(#{what => ws_pong, text => <<"Received pong frame over WebSockets">>,
                 msg => Payload, peer => State#ws_state.peer}),
    {ok, State};
websocket_handle(Other, State) ->
    %% Anything that is not a text, binary or pong frame ends up here.
    ?LOG_DEBUG(#{what => ws_received, text => <<"Received non-text over WebSockets">>,
                 msg => Other, peer => State#ws_state.peer}),
    {ok, State}.
120 |
|
|
121 |
|
%% Handles Erlang messages delivered to the websocket handler process:
%% outgoing data ({send, _} / {send_xml, _}), ping control
%% ({set_ping, _} / disable_ping / do_ping) and the stop request.
%% Any other message is logged and ignored.
websocket_info({send, Text}, State) ->
    ?LOG_DEBUG(#{what => ws_send, text => <<"Sending text over WebSockets">>,
                 msg => Text, peer => State#ws_state.peer}),
    {reply, {text, Text}, State};
websocket_info({send_xml, XML}, State) ->
    %% translate between stream-style and framing-style elements before
    %% serialising
    Namespaced = replace_stream_ns(XML, State),
    Outgoing = exml:to_iolist(process_server_stream_root(Namespaced, State)),
    ?LOG_DEBUG(#{what => ws_send, text => <<"Sending xml over WebSockets">>,
                 packet => Outgoing, peer => State#ws_state.peer}),
    {reply, {text, Outgoing}, State};
websocket_info({set_ping, Value}, #ws_state{ping_rate = none} = State)
  when is_integer(Value), Value > 0 ->
    %% pings were off, so the timer has to be started here
    %% NOTE(review): if a do_ping scheduled before an earlier disable_ping is
    %% still pending, this looks like it starts a second timer chain - confirm.
    send_ping_request(Value),
    {ok, State#ws_state{ping_rate = Value}};
websocket_info({set_ping, Value}, State)
  when is_integer(Value), Value > 0 ->
    %% a ping timer is already scheduled; only the rate is updated
    {ok, State#ws_state{ping_rate = Value}};
websocket_info(disable_ping, State) ->
    {ok, State#ws_state{ping_rate = none}};
websocket_info(do_ping, #ws_state{ping_rate = none} = State) ->
    %% probably someone disabled pings
    {ok, State};
websocket_info(do_ping, #ws_state{ping_rate = Rate} = State) ->
    %% schedule the next ping, then send a ping frame to the client
    send_ping_request(Rate),
    {reply, ping, State};
websocket_info(stop, #ws_state{parser = Parser} = State) ->
    %% release the XML parser if one was ever created
    case Parser of
        undefined -> ok;
        _ -> exml_stream:free_parser(Parser)
    end,
    {stop, State};
websocket_info(Info, State) ->
    ?LOG_DEBUG(#{what => unexpected_message, msg => Info}),
    {ok, State}.
155 |
|
|
156 |
|
%%-------------------------------------------------------------------- |
157 |
|
%% Callbacks implementation |
158 |
|
%%-------------------------------------------------------------------- |
159 |
|
|
160 |
|
%% Feeds a chunk of client data to the XML stream parser.
%% The parser is created on the first chunk, because its options depend on
%% how that chunk begins (see get_parser_opts/2). Parsed elements may start
%% the FSM process and are then forwarded to it; parse errors are handed to
%% process_parse_error/2.
handle_text(Text, #ws_state{parser = undefined} = State) ->
    {ok, Parser} = exml_stream:new_parser(get_parser_opts(Text, State)),
    handle_text(Text, State#ws_state{parser = Parser});
handle_text(Text, #ws_state{parser = Parser} = State) ->
    case exml_stream:parse(Parser, Text) of
        {error, Reason} ->
            process_parse_error(Reason, State);
        {ok, NewParser, Elements} ->
            WithParser = State#ws_state{parser = NewParser},
            case maybe_start_fsm(Elements, WithParser) of
                {stop, _} = Shutdown ->
                    Shutdown;
                {ok, WithFsm} ->
                    process_client_elements(Elements, WithFsm)
            end
    end.
177 |
|
|
178 |
|
%% Translates the parsed client elements (stream start, namespaces, stream
%% end) and sends each of them, in order, to the FSM process.
%% Returns {ok, State} with the state updated by the stream-start handling.
process_client_elements(Elements, #ws_state{fsm_pid = FSM} = State) ->
    {Translated, NewState} = process_client_stream_start(Elements, State),
    Forward = fun(Elem) ->
                      Namespaced = replace_stream_ns(Elem, NewState),
                      send_to_fsm(FSM, process_client_stream_end(Namespaced, NewState))
              end,
    lists:foreach(Forward, Translated),
    {ok, NewState}.
183 |
|
|
184 |
|
%% Reacts to an XML parse error: without an FSM process the handler is
%% stopped, otherwise the error is forwarded to the FSM as
%% {xmlstreamerror, Reason} and the handler carries on.
process_parse_error(Reason, #ws_state{fsm_pid = FSM} = State) ->
    case FSM of
        undefined ->
            {stop, State};
        _ ->
            send_to_fsm(FSM, {xmlstreamerror, Reason}),
            {ok, State}
    end.
189 |
|
|
190 |
|
%% Delivers a stream event to the FSM process. A bare #xmlel{} is wrapped as
%% {xmlstreamelement, Element}; anything else is sent unchanged.
send_to_fsm(FSM, #xmlel{} = Element) ->
    p1_fsm:send_event(FSM, {xmlstreamelement, Element});
send_to_fsm(FSM, StreamEvent) ->
    p1_fsm:send_event(FSM, StreamEvent).
194 |
|
|
195 |
|
%% Starts the FSM process when the first elements of a session arrive and no
%% FSM is running yet:
%%   * a stream start whose xmlns is ?NS_COMPONENT starts ejabberd_service,
%%     using the `ejabberd_service' option as its options;
%%   * any other stream start stops the handler;
%%   * a lone <open/> element starts ejabberd_c2s.
%% In every other case the state is returned unchanged.
maybe_start_fsm([#xmlstreamstart{name = <<"stream", _/binary>>, attrs = Attrs} | _],
                #ws_state{fsm_pid = undefined, opts = Opts} = State) ->
    IsComponent =
        lists:keyfind(<<"xmlns">>, 1, Attrs) =:= {<<"xmlns">>, ?NS_COMPONENT},
    case IsComponent of
        true ->
            ServiceOpts = gen_mod:get_opt(ejabberd_service, Opts, []),
            do_start_fsm(ejabberd_service, ServiceOpts, State);
        false ->
            {stop, State}
    end;
maybe_start_fsm([#xmlel{name = <<"open">>}],
                #ws_state{fsm_pid = undefined, opts = Opts} = State) ->
    do_start_fsm(ejabberd_c2s, Opts, State);
maybe_start_fsm(_Elements, State) ->
    {ok, State}.
210 |
|
|
211 |
|
%% Starts the given FSM module with a #websocket{} socket that points back at
%% this handler process. Whether or not the start succeeds, the certificate
%% in the handler state is replaced with `passed'.
%% Returns {ok, State} with fsm_pid set, or {stop, State} on failure.
do_start_fsm(FSMModule, Opts, #ws_state{peer = Peer, peercert = PeerCert} = State) ->
    Socket = #websocket{pid = self(), peername = Peer, peercert = PeerCert},
    Cleared = State#ws_state{peercert = passed},
    case call_fsm_start(FSMModule, Socket, [{xml_socket, true} | Opts]) of
        {ok, Pid} ->
            ?LOG_DEBUG(#{what => ws_c2s_started,
                         text => <<"WebSockets starts c2s process">>,
                         c2s_pid => Pid, c2s_module => FSMModule,
                         peer => Peer}),
            {ok, Cleared#ws_state{fsm_pid = Pid}};
        {error, Reason} ->
            ?LOG_WARNING(#{what => ws_c2s_start_failed,
                           text => <<"WebSockets fails to start c2s process">>,
                           reason => Reason, c2s_module => FSMModule,
                           peer => Peer}),
            {stop, Cleared}
    end.
231 |
|
|
232 |
|
%% Calls start/2 of the requested FSM module, passing {?MODULE, SocketData}
%% as the socket. Only ejabberd_c2s and ejabberd_service are supported.
call_fsm_start(ejabberd_c2s, SocketData, Opts) ->
    Socket = {?MODULE, SocketData},
    ejabberd_c2s:start(Socket, Opts);
call_fsm_start(ejabberd_service, SocketData, Opts) ->
    Socket = {?MODULE, SocketData},
    ejabberd_service:start(Socket, Opts).
236 |
|
|
237 |
|
%%-------------------------------------------------------------------- |
238 |
|
%% ejabberd_socket compatibility |
239 |
|
%%-------------------------------------------------------------------- |
240 |
|
%% TLS is not available on websockets: delegates to starttls/3 with empty
%% data, which always throws.
-spec starttls(socket(), _) -> no_return().
starttls(Socket, TLSOptions) ->
    starttls(Socket, TLSOptions, <<>>).
243 |
|
|
244 |
|
%% Always throws {error, tls_not_allowed_on_websockets}; the arguments are
%% ignored.
-spec starttls(socket(), _, _) -> no_return().
starttls(_Socket, _TLSOptions, _Data) ->
    throw({error, tls_not_allowed_on_websockets}).
247 |
|
|
248 |
|
%% Compression is not available on websockets: delegates to compress/3 with
%% empty data and a zero size limit, which always throws.
-spec compress(socket()) -> no_return().
compress(Socket) ->
    compress(Socket, <<>>, 0).
251 |
|
|
252 |
|
%% Always throws {error, compression_not_allowed_on_websockets}; the
%% arguments are ignored.
-spec compress(socket(), _, _) -> no_return().
compress(_Socket, _Data, _InflateSizeLimit) ->
    throw({error, compression_not_allowed_on_websockets}).
255 |
|
|
256 |
|
%% Queues XML for sending to the client.
%% Raw text goes through send/2, an {xmlstreamelement, _} wrapper is
%% stripped, and everything else is posted to the handler process as
%% {send_xml, Item}. Always returns `ok'.
-spec send_xml(socket(), mongoose_transport:send_xml_input()) -> ok.
send_xml(Socket, {xmlstreamraw, RawText}) ->
    send(Socket, RawText);
send_xml(Socket, {xmlstreamelement, Element}) ->
    send_xml(Socket, Element);
send_xml(#websocket{pid = HandlerPid}, StreamItem) ->
    HandlerPid ! {send_xml, StreamItem},
    ok.
264 |
|
|
265 |
|
%% Posts {send, Data} to the handler process, which replies to the client
%% with a text frame. Always returns `ok'.
send(#websocket{pid = HandlerPid}, Data) ->
    HandlerPid ! {send, Data},
    ok.
268 |
|
|
269 |
|
%% Shapers are not applied to websockets yet, so the socket is returned
%% unchanged and the shaper argument is ignored.
%% TODO: we ignore shapers for now
change_shaper(SocketData, _Shaper) -> SocketData.
271 |
|
|
272 |
|
%% Sets up a process monitor on the websocket handler process and returns
%% the monitor reference.
-spec monitor(socket()) -> reference().
monitor(#websocket{pid = HandlerPid}) ->
    erlang:monitor(process, HandlerPid).
275 |
|
|
276 |
|
%% Returns the name of this module; the socket argument is ignored.
get_sockmod(_SocketData) -> ?MODULE.
278 |
|
|
279 |
|
%% Asks the handler process to stop by sending it `stop'.
%% The return value is the sent message, i.e. the atom `stop'.
close(#websocket{pid = HandlerPid}) ->
    erlang:send(HandlerPid, stop).
281 |
|
|
282 |
|
%% Returns the peer recorded in the socket, wrapped in an `ok' tuple.
-spec peername(socket()) -> mongoose_transport:peername_return().
peername(#websocket{peername = Peer}) -> {ok, Peer}.
285 |
|
|
286 |
|
%% Returns `no_peer_cert' when the socket holds no certificate, otherwise
%% {ok, Cert} with the stored certificate decoded by
%% public_key:pkix_decode_cert/2 in `plain' mode.
get_peer_certificate(#websocket{peercert = PeerCert}) ->
    case PeerCert of
        undefined ->
            no_peer_cert;
        _ ->
            {ok, public_key:pkix_decode_cert(PeerCert, plain)}
    end.
291 |
|
|
292 |
|
%% Asks the handler process to use Value as its ping rate.
%% The return value is the sent message, {set_ping, Value}.
set_ping(#websocket{pid = HandlerPid}, Value) ->
    erlang:send(HandlerPid, {set_ping, Value}).
294 |
|
|
295 |
|
%% Asks the handler process to stop pinging the client.
%% The return value is the sent message, the atom `disable_ping'.
disable_ping(#websocket{pid = HandlerPid}) ->
    erlang:send(HandlerPid, disable_ping).
297 |
|
|
298 |
|
%%-------------------------------------------------------------------- |
299 |
|
%% Helpers for handling both |
300 |
|
%% http://datatracker.ietf.org/doc/draft-ietf-xmpp-websocket |
301 |
|
%% and older |
302 |
|
%% http://tools.ietf.org/id/draft-moffitt-xmpp-over-websocket |
303 |
|
%%-------------------------------------------------------------------- |
304 |
|
|
305 |
|
%% Detects how the client opened the stream and records it in open_tag.
%%   * A stream start element is passed through and open_tag becomes `stream'.
%%   * A lone <open/> element is rewritten into a <stream:stream> start (an
%%     existing xmlns attribute is replaced with ?NS_CLIENT and xmlns:stream
%%     is added) and open_tag becomes `open'.
%%   * Anything else is returned untouched.
process_client_stream_start([#xmlstreamstart{name = <<"stream", _/binary>>} | _] = Elements,
                            State) ->
    {Elements, State#ws_state{open_tag = stream}};
process_client_stream_start([#xmlel{name = <<"open">>, attrs = Attrs}], State) ->
    ClientAttrs = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, ?NS_CLIENT}),
    StreamStart = #xmlstreamstart{name = <<"stream:stream">>,
                                  attrs = [{<<"xmlns:stream">>, ?NS_STREAM} | ClientAttrs]},
    {[StreamStart], State#ws_state{open_tag = open}};
process_client_stream_start(Elements, State) ->
    {Elements, State}.
315 |
|
|
316 |
|
%% When the stream was opened with <open/>, a <close/> element from the
%% client is turned into a <stream:stream> end; every other element is
%% returned as is.
process_client_stream_end(#xmlel{name = <<"close">>}, #ws_state{open_tag = open}) ->
    #xmlstreamend{name = <<"stream:stream">>};
process_client_stream_end(Other, _State) ->
    Other.
320 |
|
|
321 |
|
%% Server-to-client counterpart of the stream start/end translation, applied
%% only when the stream was opened with <open/>:
%%   * a stream start becomes an <open/> element (xmlns:stream is dropped and
%%     an existing xmlns is replaced with ?NS_FRAMING);
%%   * a stream end becomes a <close/> element in ?NS_FRAMING.
%% Everything else is returned unchanged.
process_server_stream_root(#xmlstreamstart{name = <<"stream", _/binary>>, attrs = Attrs},
                           #ws_state{open_tag = open}) ->
    WithoutPrefix = lists:keydelete(<<"xmlns:stream">>, 1, Attrs),
    FramingAttrs = lists:keyreplace(<<"xmlns">>, 1, WithoutPrefix,
                                    {<<"xmlns">>, ?NS_FRAMING}),
    #xmlel{name = <<"open">>, attrs = FramingAttrs};
process_server_stream_root(#xmlstreamend{name = <<"stream", _/binary>>},
                           #ws_state{open_tag = open}) ->
    #xmlel{name = <<"close">>, attrs = [{<<"xmlns">>, ?NS_FRAMING}]};
process_server_stream_root(Other, _State) ->
    Other.
331 |
|
|
332 |
|
%% Namespace fix-ups for streams opened with <open/>:
%%   * elements named "stream:X" are renamed to "X" and get an xmlns
%%     attribute of ?NS_STREAM prepended;
%%   * iq, message and presence elements get xmlns set to "jabber:client"
%%     (see should_have_jabber_client/1).
%% Other elements, and all elements of other stream types, are unchanged.
replace_stream_ns(#xmlel{name = <<"stream:", LocalName/binary>>, attrs = Attrs} = Element,
                  #ws_state{open_tag = open}) ->
    Element#xmlel{name = LocalName, attrs = [{<<"xmlns">>, ?NS_STREAM} | Attrs]};
replace_stream_ns(Element, #ws_state{open_tag = open}) ->
    case should_have_jabber_client(Element) of
        false ->
            Element;
        true ->
            NewAttrs = lists:keystore(<<"xmlns">>, 1, Element#xmlel.attrs,
                                      {<<"xmlns">>, <<"jabber:client">>}),
            Element#xmlel{attrs = NewAttrs}
    end;
replace_stream_ns(Element, _State) ->
    Element.
347 |
|
|
348 |
|
%% Builds the exml_stream parser options for the first chunk of a session:
%% the configured stanza size limit (`infinity' is passed on as 0) followed
%% by the options picked by get_parser_opts/1.
get_parser_opts(Text, #ws_state{max_stanza_size = MaxStanzaSize}) ->
    Limit = case MaxStanzaSize of
                infinity -> 0;
                _ -> MaxStanzaSize
            end,
    [{max_child_size, Limit} | get_parser_opts(Text)].
352 |
|
|
353 |
|
%% Chooses parser options from the beginning of the first chunk: data that
%% starts with "<open" gets the new-type websocket options, anything else
%% gets the old-type ones with <stream:stream> as start tag.
get_parser_opts(Text) ->
    case Text of
        <<"<open", _/binary>> ->
            [{infinite_stream, true}, {autoreset, true}]; % new-type WS
        _ ->
            [{start_tag, <<"stream:stream">>}] % old-type WS
    end.
357 |
|
|
358 |
|
%%-------------------------------------------------------------------- |
359 |
|
%% Helpers |
360 |
|
%%-------------------------------------------------------------------- |
361 |
112 |
%% True for iq, message and presence elements, false for everything else
%% (including terms that are not #xmlel{} records).
should_have_jabber_client(#xmlel{name = Name}) ->
    lists:member(Name, [<<"iq">>, <<"message">>, <<"presence">>]);
should_have_jabber_client(_) ->
    false.
365 |
|
|
366 |
|
%% Schedules a `do_ping' message to the calling process after PingRate
%% milliseconds and returns the timer reference.
send_ping_request(PingRate) ->
    Self = self(),
    ?LOG_DEBUG(#{what => ws_schedule_ping,
                 text => <<"Sending websocket ping request">>,
                 ping_rate => PingRate}),
    erlang:send_after(PingRate, Self, do_ping).
372 |
|
|
373 |
|
%% Schedules the first ping unless pinging is switched off (`none').
maybe_send_ping_request(PingRate) ->
    case PingRate of
        none -> ok;
        _ -> send_ping_request(PingRate)
    end.
377 |
|
|
378 |
|
|
379 |
|
%%-------------------------------------------------------------------- |
380 |
|
%% add_sec_websocket_protocol_header |
381 |
|
%%-------------------------------------------------------------------- |
382 |
|
|
383 |
|
%% Answers the client's sec-websocket-protocol header.
%% If the client offered a protocol equal to "xmpp" (ignoring case), that
%% protocol, spelled as the client sent it, is set as the response header.
%% If the header is absent or contains no such protocol, the request is
%% returned unchanged.
add_sec_websocket_protocol_header(Req) ->
    case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
        undefined ->
            %% Server should not set "sec-websocket-protocol" response header
            %% if it is not present in the request
            Req;
        Offered ->
            case case_insensitive_match(<<"xmpp">>, Offered) of
                nomatch ->
                    %% Do not agree with client, do not add a response header
                    ?LOG_DEBUG(#{what => ws_unknown_protocol,
                                 text => <<"Header sec-websocket-protocol does not contain xmpp option">>,
                                 protocols => Offered}),
                    Req;
                {matched, Chosen} ->
                    cowboy_req:set_resp_header(<<"Sec-WebSocket-Protocol">>, Chosen, Req)
            end
    end.
401 |
|
|
402 |
|
%% Finds the first entry whose lower-cased form (via jid:str_tolower/1)
%% equals LowerPattern, which is expected to be lower case already.
%% Returns {matched, Entry} with the entry in its original spelling, or
%% `nomatch' when the list is exhausted.
case_insensitive_match(_LowerPattern, []) ->
    nomatch;
case_insensitive_match(LowerPattern, [Candidate | Rest]) ->
    case jid:str_tolower(Candidate) =:= LowerPattern of
        true ->
            {matched, Candidate};
        false ->
            case_insensitive_match(LowerPattern, Rest)
    end.