./ct_report/coverage/mod_websockets.COVER.html

1 %%%===================================================================
2 %%% @copyright (C) 2016, Erlang Solutions Ltd.
3 %%% @doc Module providing support for websockets in MongooseIM
4 %%% @end
5 %%%===================================================================
6 -module(mod_websockets).
7
8 -behaviour(cowboy_websocket).
9 -behaviour(mongoose_transport).
10
11 %% cowboy_http_websocket_handler callbacks
12 -export([init/2,
13 websocket_init/1,
14 websocket_handle/2,
15 websocket_info/2,
16 terminate/3]).
17
18 %% ejabberd_socket compatibility
19 -export([starttls/2, starttls/3,
20 compress/1, compress/3,
21 send/2,
22 send_xml/2,
23 change_shaper/2,
24 monitor/1,
25 get_sockmod/1,
26 close/1,
27 peername/1,
28 get_peer_certificate/1,
29 set_ping/2,
30 disable_ping/1]).
31
32 -ignore_xref([change_shaper/2, close/1, compress/1, compress/3, disable_ping/1,
33 get_peer_certificate/1, get_sockmod/1, send/2, set_ping/2]).
34
35 -include("mongoose.hrl").
36 -include("jlib.hrl").
37 -include_lib("exml/include/exml_stream.hrl").
38
39 -define(NS_FRAMING, <<"urn:ietf:params:xml:ns:xmpp-framing">>).
40 -define(NS_COMPONENT, <<"jabber:component:accept">>).
41
42 -record(websocket, {
43 pid :: pid(),
44 peername :: mongoose_transport:peer(),
45 peercert :: undefined | binary()
46 }).
47 -record(ws_state, {
48 peer :: mongoose_transport:peer() | undefined,
49 fsm_pid :: pid() | undefined,
50 open_tag :: stream | open | undefined,
51 parser :: exml_stream:parser() | undefined,
52 opts :: proplists:proplist() | undefined,
53 ping_rate :: integer() | none,
54 max_stanza_size :: integer() | infinity,
55 peercert :: undefined | passed | binary()
56 %% the 'passed' value is used to clear the certificate from the handler's state after it has been passed down to the socket()
57 }).
58
59 -type socket() :: #websocket{}.
60
61 %%--------------------------------------------------------------------
62 %% Common callbacks for all cowboy behaviours
63 %%--------------------------------------------------------------------
64
%% @doc Cowboy entry point for requests routed to this handler.
%% Reads the peer address and client certificate from the request, lets
%% add_sec_websocket_protocol_header/1 decide on the subprotocol response
%% header and then asks Cowboy to upgrade to `cowboy_websocket'.
%% The handler options are the listener `Opts' extended with `peer' and
%% `peercert'; websocket_init/1 reads those keys back.
init(Req, Opts) ->
    PeerAddr = cowboy_req:peer(Req),
    ClientCert = cowboy_req:cert(Req),
    UpgradeReq = add_sec_websocket_protocol_header(Req),
    ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets request">>,
                 req => Req, opts => Opts}),
    %% 60000 is used when the listener does not configure `timeout'
    IdleTimeout = proplists:get_value(timeout, Opts, 60000),
    HandlerOpts = [{peer, PeerAddr}, {peercert, ClientCert} | Opts],
    {cowboy_websocket, UpgradeReq, HandlerOpts, #{idle_timeout => IdleTimeout}}.
76
%% @doc Termination callback; nothing is cleaned up here, it always returns `ok'.
terminate(_Why, _Request, _HandlerState) ->
    ok.
79
80 %%--------------------------------------------------------------------
81 %% cowboy_http_websocket_handler callbacks
82 %%--------------------------------------------------------------------
83
%% @doc Called for every new websocket connection.
%% Builds the initial #ws_state{} from the handler options assembled in init/2
%% and schedules the first ping when a `ping_rate' is configured.
%% Defaults: `ping_rate' is `none', `max_stanza_size' is `infinity'.
websocket_init(Opts) ->
    PeerAddr = proplists:get_value(peer, Opts),
    PingInterval = proplists:get_value(ping_rate, Opts, none),
    maybe_send_ping_request(PingInterval),
    ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets connection">>,
                 peer => PeerAddr, opts => Opts}),
    {ok, #ws_state{opts = Opts,
                   ping_rate = PingInterval,
                   max_stanza_size = proplists:get_value(max_stanza_size, Opts, infinity),
                   peer = PeerAddr,
                   peercert = proplists:get_value(peercert, Opts)}}.
99
%% @doc Handles a frame received from the client.
%% Text and binary frames are both logged and fed to handle_text/2.
%% Pong frames and every other frame are logged and otherwise ignored.
websocket_handle({FrameType, Msg}, State)
  when FrameType =:= text; FrameType =:= binary ->
    ?LOG_DEBUG(#{what => ws_received, msg => Msg, peer => State#ws_state.peer}),
    handle_text(Msg, State);
websocket_handle({pong, Payload}, State) ->
    ?LOG_DEBUG(#{what => ws_pong, text => <<"Received pong frame over WebSockets">>,
                 msg => Payload, peer => State#ws_state.peer}),
    {ok, State};
%% Any remaining frame kind ends up here; it is only logged.
websocket_handle(Any, State) ->
    ?LOG_DEBUG(#{what => ws_received, text => <<"Received non-text over WebSockets">>,
                 msg => Any, peer => State#ws_state.peer}),
    {ok, State}.
120
%% @doc Handles Erlang messages delivered to the websocket handler process.
%%
%% Messages understood (sent by the socket API functions of this module
%% or by the ping timer):
%%   {send, Text}      - reply with a text frame carrying Text
%%   {send_xml, XML}   - rewrite stream-level elements for the framing in use,
%%                       serialise and reply with a text frame
%%   {set_ping, Value} - Value must be a positive integer; stores the new rate
%%                       and starts the timer if pings were disabled
%%   disable_ping      - sets the rate to `none'
%%   do_ping           - timer tick: reschedule and reply with a ping frame
%%   stop              - free the XML parser (if any) and stop the handler
%% Anything else (including {set_ping, Value} with an invalid Value) is
%% logged and ignored.
%%
%% NOTE(review): disabling pings and enabling them again while a `do_ping'
%% timer is still pending looks like it leaves two timer chains running,
%% because the pending timer is never cancelled - confirm.
websocket_info({send, Text}, State) ->
    ?LOG_DEBUG(#{what => ws_send, text => <<"Sending text over WebSockets">>,
                 msg => Text, peer => State#ws_state.peer}),
    {reply, {text, Text}, State};
websocket_info({send_xml, XML}, State) ->
    XML1 = process_server_stream_root(replace_stream_ns(XML, State), State),
    Text = exml:to_iolist(XML1),
    ?LOG_DEBUG(#{what => ws_send, text => <<"Sending xml over WebSockets">>,
                 packet => Text, peer => State#ws_state.peer}),
    {reply, {text, Text}, State};
websocket_info({set_ping, Value}, State = #ws_state{ping_rate = none})
  when is_integer(Value), Value > 0 ->
    %% pings were off, so no timer is expected to be pending: start one
    send_ping_request(Value),
    {ok, State#ws_state{ping_rate = Value}};
websocket_info({set_ping, Value}, State)
  when is_integer(Value), Value > 0 ->
    %% the new rate is picked up when the next do_ping reschedules the timer
    {ok, State#ws_state{ping_rate = Value}};
websocket_info(disable_ping, State) ->
    {ok, State#ws_state{ping_rate = none}};
websocket_info(do_ping, State = #ws_state{ping_rate = none}) ->
    %% probably someone disabled pings
    {ok, State};
websocket_info(do_ping, State) ->
    %% send ping frame to the client
    send_ping_request(State#ws_state.ping_rate),
    {reply, ping, State};
websocket_info(stop, #ws_state{parser = undefined} = State) ->
    {stop, State};
websocket_info(stop, #ws_state{parser = Parser} = State) ->
    exml_stream:free_parser(Parser),
    {stop, State};
websocket_info(Info, State) ->
    ?LOG_DEBUG(#{what => unexpected_message, msg => Info}),
    {ok, State}.
155
156 %%--------------------------------------------------------------------
157 %% Callbacks implementation
158 %%--------------------------------------------------------------------
159
%% Feeds incoming frame payload to the XML stream parser.
%% The parser is created lazily on the first payload (its options depend on
%% that payload, see get_parser_opts/2). Parsed elements may start the FSM
%% and are then forwarded to it; parse errors go to process_parse_error/2.
handle_text(Text, #ws_state{parser = undefined} = State) ->
    {ok, FreshParser} = exml_stream:new_parser(get_parser_opts(Text, State)),
    handle_text(Text, State#ws_state{parser = FreshParser});
handle_text(Text, #ws_state{parser = Parser} = State) ->
    case exml_stream:parse(Parser, Text) of
        {error, Reason} ->
            %% note: the state passed on still holds the previous parser
            process_parse_error(Reason, State);
        {ok, UpdatedParser, Elements} ->
            WithParser = State#ws_state{parser = UpdatedParser},
            case maybe_start_fsm(Elements, WithParser) of
                {stop, _} = Shutdown ->
                    Shutdown;
                {ok, Ready} ->
                    process_client_elements(Elements, Ready)
            end
    end.
177
%% Translates the parsed client elements to what the FSM expects and sends
%% them to the FSM one by one, in order. Returns `{ok, State}' with the state
%% produced by process_client_stream_start/2.
process_client_elements(Elements, #ws_state{fsm_pid = FSM} = State) ->
    {Translated, NewState} = process_client_stream_start(Elements, State),
    Forward = fun(Elem) ->
                      Rewritten = replace_stream_ns(Elem, NewState),
                      send_to_fsm(FSM, process_client_stream_end(Rewritten, NewState))
              end,
    lists:foreach(Forward, Translated),
    {ok, NewState}.
183
%% Reacts to an XML parse error: without a started FSM the handler stops,
%% otherwise the error is reported to the FSM as `{xmlstreamerror, Reason}'
%% and the connection is kept.
process_parse_error(Reason, #ws_state{fsm_pid = MaybeFSM} = State) ->
    case MaybeFSM of
        undefined ->
            {stop, State};
        FSM ->
            send_to_fsm(FSM, {xmlstreamerror, Reason}),
            {ok, State}
    end.
189
%% Sends an event to the FSM. A bare #xmlel{} is wrapped as
%% `{xmlstreamelement, Element}'; any other term is sent unchanged.
send_to_fsm(FSM, #xmlel{} = Element) ->
    p1_fsm:send_event(FSM, {xmlstreamelement, Element});
send_to_fsm(FSM, StreamElement) ->
    p1_fsm:send_event(FSM, StreamElement).
194
%% Starts the FSM behind this connection when the first stream element is seen
%% and no FSM is running yet:
%%  - an old-style stream start is accepted only for the component namespace
%%    and only when the `service' option holds a map; it starts
%%    ejabberd_service, anything else stops the handler;
%%  - a single `open' element starts ejabberd_c2s with fixed options.
%% In every other case the state is returned untouched.
maybe_start_fsm([#xmlstreamstart{name = <<"stream", _/binary>>, attrs = Attrs} | _],
                #ws_state{fsm_pid = undefined, opts = Opts} = State) ->
    IsComponent = lists:keyfind(<<"xmlns">>, 1, Attrs) =:= {<<"xmlns">>, ?NS_COMPONENT},
    case proplists:get_value(service, Opts) of
        #{} = ServiceOpts when IsComponent ->
            do_start_fsm(ejabberd_service, ServiceOpts, State);
        _ ->
            {stop, State}
    end;
maybe_start_fsm([#xmlel{name = <<"open">>}],
                #ws_state{fsm_pid = undefined} = State) ->
    C2SOpts = #{access => all, shaper => none, xml_socket => true, hibernate_after => 0},
    do_start_fsm(ejabberd_c2s, C2SOpts, State);
maybe_start_fsm(_Els, State) ->
    {ok, State}.
210
%% Builds the #websocket{} socket record for this handler process and starts
%% the given FSM module with it. On success the FSM pid is stored in the
%% state; on failure the handler is told to stop. In both cases the peer
%% certificate in the handler state is replaced by `passed', since the
%% socket record now carries it.
do_start_fsm(FSMModule, Opts, State = #ws_state{peer = Peer, peercert = PeerCert}) ->
    Socket = #websocket{pid = self(), peername = Peer, peercert = PeerCert},
    CertCleared = State#ws_state{peercert = passed},
    case call_fsm_start(FSMModule, Socket, Opts) of
        {error, Reason} ->
            ?LOG_WARNING(#{what => ws_c2s_start_failed,
                           text => <<"WebSockets fails to start c2s process">>,
                           reason => Reason, c2s_module => FSMModule,
                           peer => State#ws_state.peer}),
            {stop, CertCleared};
        {ok, Pid} ->
            ?LOG_DEBUG(#{what => ws_c2s_started,
                         text => <<"WebSockets starts c2s process">>,
                         c2s_pid => Pid, c2s_module => FSMModule,
                         peer => State#ws_state.peer}),
            {ok, CertCleared#ws_state{fsm_pid = Pid}}
    end.
230
%% Static dispatch to the start function of the two supported FSM modules.
%% The socket handed over is `{?MODULE, SocketData}'.
call_fsm_start(ejabberd_c2s, SocketData, Opts) ->
    Socket = {?MODULE, SocketData},
    ejabberd_c2s:start(Socket, Opts);
call_fsm_start(ejabberd_service, SocketData, Opts) ->
    Socket = {?MODULE, SocketData},
    ejabberd_service:start(Socket, Opts).
235
236 %%--------------------------------------------------------------------
237 %% ejabberd_socket compatibility
238 %%--------------------------------------------------------------------
%% @doc STARTTLS is not available on this transport.
%% Delegates to starttls/3 with empty data, which always throws.
-spec starttls(socket(), _) -> no_return().
starttls(Socket, TLSOptions) ->
    starttls(Socket, TLSOptions, <<>>).

%% @doc Always throws `{error, tls_not_allowed_on_websockets}'.
-spec starttls(socket(), _, _) -> no_return().
starttls(_Socket, _TLSOptions, _Payload) ->
    throw({error, tls_not_allowed_on_websockets}).
246
%% @doc Stream compression is not available on this transport.
%% Delegates to compress/3 with empty data and a limit of 0, which always throws.
-spec compress(socket()) -> no_return().
compress(Socket) ->
    compress(Socket, <<>>, 0).

%% @doc Always throws `{error, compression_not_allowed_on_websockets}'.
-spec compress(socket(), _, _) -> no_return().
compress(_Socket, _Payload, _SizeLimit) ->
    throw({error, compression_not_allowed_on_websockets}).
254
%% @doc Asks the websocket handler process to send XML to the client.
%% `{xmlstreamraw, Text}' is sent as plain text via send/2,
%% `{xmlstreamelement, XML}' is unwrapped first, and everything else is
%% posted to the handler as `{send_xml, XML}'. Always returns `ok'.
-spec send_xml(socket(), mongoose_transport:send_xml_input()) -> ok.
send_xml(SocketData, {xmlstreamraw, Text}) ->
    send(SocketData, Text);
send_xml(SocketData, {xmlstreamelement, XML}) ->
    send_xml(SocketData, XML);
send_xml(#websocket{pid = HandlerPid}, XML) ->
    erlang:send(HandlerPid, {send_xml, XML}),
    ok.
263
%% @doc Asks the websocket handler process to send `Data' as a text frame
%% by posting `{send, Data}' to it. Always returns `ok'.
send(#websocket{pid = HandlerPid}, Data) ->
    erlang:send(HandlerPid, {send, Data}),
    ok.
267
%% @doc Shapers are not applied on this transport yet (TODO); the socket is
%% returned unchanged and the shaper argument is ignored.
change_shaper(Socket, _IgnoredShaper) ->
    Socket.
270
%% @doc Monitors the websocket handler process behind the socket and returns
%% the monitor reference.
-spec monitor(socket()) -> reference().
monitor(#websocket{pid = HandlerPid}) ->
    MonitorRef = erlang:monitor(process, HandlerPid),
    MonitorRef.
274
%% @doc The socket module for a websocket socket is always this module.
get_sockmod(_Socket) ->
    ?MODULE.
277
%% @doc Asks the websocket handler process to stop by sending it `stop'.
%% The return value is the sent message (`stop').
close(#websocket{pid = HandlerPid}) ->
    erlang:send(HandlerPid, stop).
280
%% @doc Returns the peer stored in the socket record as `{ok, Peer}'.
-spec peername(socket()) -> mongoose_transport:peername_return().
peername(#websocket{peername = Peer}) ->
    {ok, Peer}.
284
%% @doc Returns the decoded client certificate carried by the socket as
%% `{ok, Cert}', or `no_peer_cert' when the socket has none.
get_peer_certificate(#websocket{peercert = PeerCert}) ->
    case PeerCert of
        undefined ->
            no_peer_cert;
        _ ->
            {ok, public_key:pkix_decode_cert(PeerCert, plain)}
    end.
290
%% @doc Posts `{set_ping, Value}' to the websocket handler process.
%% The return value is the sent message.
set_ping(#websocket{pid = HandlerPid}, Value) ->
    erlang:send(HandlerPid, {set_ping, Value}).
293
%% @doc Posts `disable_ping' to the websocket handler process.
%% The return value is the sent message.
disable_ping(#websocket{pid = HandlerPid}) ->
    erlang:send(HandlerPid, disable_ping).
296
297 %%--------------------------------------------------------------------
298 %% Helpers for handling both
299 %% http://datatracker.ietf.org/doc/draft-ietf-xmpp-websocket
300 %% and older
301 %% http://tools.ietf.org/id/draft-moffitt-xmpp-over-websocket
302 %%--------------------------------------------------------------------
303
%% Detects how the client opened the stream and records it in `open_tag'.
%%  - An old-style stream start is passed through; `open_tag' becomes `stream'.
%%  - A single `open' element is converted to a `stream:stream' start whose
%%    existing xmlns (if present) is replaced by ?NS_CLIENT and which gets an
%%    xmlns:stream attribute; `open_tag' becomes `open'.
%%  - Anything else is returned unchanged.
process_client_stream_start([#xmlstreamstart{name = <<"stream", _/binary>>} | _] = Elements,
                            State) ->
    {Elements, State#ws_state{open_tag = stream}};
process_client_stream_start([#xmlel{name = <<"open">>, attrs = Attrs}], State) ->
    ClientNs = {<<"xmlns">>, ?NS_CLIENT},
    StreamAttrs = [{<<"xmlns:stream">>, ?NS_STREAM}
                   | lists:keyreplace(<<"xmlns">>, 1, Attrs, ClientNs)],
    {[#xmlstreamstart{name = <<"stream:stream">>, attrs = StreamAttrs}],
     State#ws_state{open_tag = open}};
process_client_stream_start(Elements, State) ->
    {Elements, State}.
314
%% When the stream was opened with `open', a client `close' element is
%% converted to a `stream:stream' end; every other element passes through.
process_client_stream_end(#xmlel{name = <<"close">>}, #ws_state{open_tag = open}) ->
    #xmlstreamend{name = <<"stream:stream">>};
process_client_stream_end(Other, _State) ->
    Other.
319
%% Server-to-client counterpart of the stream start/end translation, applied
%% only when the client opened the stream with `open':
%%  - a stream start becomes an `open' element without xmlns:stream and with
%%    its existing xmlns (if present) replaced by ?NS_FRAMING;
%%  - a stream end becomes a `close' element in ?NS_FRAMING.
%% Everything else is returned unchanged.
process_server_stream_root(#xmlstreamstart{name = <<"stream", _/binary>>, attrs = Attrs},
                           #ws_state{open_tag = open}) ->
    WithoutPrefixNs = lists:keydelete(<<"xmlns:stream">>, 1, Attrs),
    FramingNs = {<<"xmlns">>, ?NS_FRAMING},
    #xmlel{name = <<"open">>,
           attrs = lists:keyreplace(<<"xmlns">>, 1, WithoutPrefixNs, FramingNs)};
process_server_stream_root(#xmlstreamend{name = <<"stream", _/binary>>},
                           #ws_state{open_tag = open}) ->
    #xmlel{name = <<"close">>, attrs = [{<<"xmlns">>, ?NS_FRAMING}]};
process_server_stream_root(Other, _State) ->
    Other.
330
%% Adjusts element namespaces for the `open'-style framing, where elements
%% are not nested inside a stream root that declares the namespaces.
%%  - `stream:'-prefixed elements lose the prefix and get an explicit
%%    xmlns = ?NS_STREAM. A first pre-existing xmlns attribute is removed,
%%    so the element does not end up with two xmlns attributes.
%%  - iq/message/presence elements get xmlns = "jabber:client", replacing an
%%    existing xmlns or adding one.
%%  - Everything else, and every element when the framing is not `open',
%%    is returned unchanged.
replace_stream_ns(#xmlel{name = <<"stream:", ElementName/binary>>, attrs = Attrs} = Element,
                  #ws_state{open_tag = open}) ->
    StreamNs = {<<"xmlns">>, ?NS_STREAM},
    Element#xmlel{name = ElementName,
                  attrs = [StreamNs | lists:keydelete(<<"xmlns">>, 1, Attrs)]};
replace_stream_ns(Element, #ws_state{open_tag = open}) ->
    %% Element is not necessarily an #xmlel{} here; the record access below
    %% is only reached when should_have_jabber_client/1 matched an #xmlel{}.
    case should_have_jabber_client(Element) of
        true ->
            JabberClient = {<<"xmlns">>, <<"jabber:client">>},
            NewAttrs = lists:keystore(<<"xmlns">>, 1,
                                      Element#xmlel.attrs, JabberClient),
            Element#xmlel{attrs = NewAttrs};
        false ->
            Element
    end;
replace_stream_ns(Element, _State) ->
    Element.
346
%% Parser options for a new connection: the framing-dependent options from
%% get_parser_opts/1 preceded by `max_child_size'. A `max_stanza_size' of
%% `infinity' is passed to the parser as 0.
get_parser_opts(Text, #ws_state{max_stanza_size = MaxStanzaSize}) ->
    Limit = case MaxStanzaSize of
                infinity -> 0;
                _ -> MaxStanzaSize
            end,
    [{max_child_size, Limit} | get_parser_opts(Text)].
351
%% Chooses parser options from the very first payload: data starting with
%% "<open" selects the new-type websocket framing, anything else the
%% old-type framing with a `stream:stream' start tag.
get_parser_opts(Text) ->
    case Text of
        <<"<open", _/binary>> ->
            [{infinite_stream, true}, {autoreset, true}]; % new-type WS
        _ ->
            [{start_tag, <<"stream:stream">>}] % old-type WS
    end.
356
357 %%--------------------------------------------------------------------
358 %% Helpers
359 %%--------------------------------------------------------------------
%% True only for #xmlel{} elements named iq, message or presence.
should_have_jabber_client(#xmlel{name = Name}) ->
    lists:member(Name, [<<"iq">>, <<"message">>, <<"presence">>]);
should_have_jabber_client(_) ->
    false.
364
%% Schedules a `do_ping' message to the calling process after `PingRate'
%% and returns the timer reference from erlang:send_after/3.
send_ping_request(PingRate) ->
    ?LOG_DEBUG(#{what => ws_schedule_ping,
                 text => <<"Sending websocket ping request">>,
                 ping_rate => PingRate}),
    erlang:send_after(PingRate, self(), do_ping).
371
%% Schedules the first ping unless pings are disabled (`none').
maybe_send_ping_request(PingRate) ->
    case PingRate of
        none -> ok;
        _ -> send_ping_request(PingRate)
    end.
376
377
378 %%--------------------------------------------------------------------
379 %% add_sec_websocket_protocol_header
380 %%--------------------------------------------------------------------
381
%% Negotiates the websocket subprotocol.
%% If the request offers protocols and one of them equals "xmpp" ignoring
%% case, that protocol (as spelled by the client) is echoed back in the
%% response header. Otherwise the request is returned unchanged:
%%  - no header in the request: the server should not set one either;
%%  - no xmpp among the offered protocols: we do not agree with the client.
%% The response header name is given in lowercase, as Cowboy requires for
%% header names passed to cowboy_req:set_resp_header/3.
add_sec_websocket_protocol_header(Req) ->
    case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
        undefined ->
            %% Server should not set "sec-websocket-protocol" response header
            %% if it is not present in the request
            Req;
        Protocols ->
            case case_insensitive_match(<<"xmpp">>, Protocols) of
                {matched, MatchedProtocol} ->
                    cowboy_req:set_resp_header(<<"sec-websocket-protocol">>,
                                               MatchedProtocol, Req);
                nomatch ->
                    %% Do not agree with client, do not add a response header
                    ?LOG_DEBUG(#{what => ws_unknown_protocol,
                                 text => <<"Header sec-websocket-protocol does not contain xmpp option">>,
                                 protocols => Protocols}),
                    Req
            end
    end.
400
%% Returns `{matched, Case}' for the first list entry whose lowercased form
%% (per jid:str_tolower/1) equals `LowerPattern', keeping the entry's original
%% spelling; returns `nomatch' when no entry matches.
case_insensitive_match(_, []) ->
    nomatch;
case_insensitive_match(LowerPattern, [Case | Cases]) ->
    case jid:str_tolower(Case) =:= LowerPattern of
        true ->
            {matched, Case};
        false ->
            case_insensitive_match(LowerPattern, Cases)
    end.
Line Hits Source