./ct_report/coverage/mod_websockets.COVER.html

1 %%%===================================================================
2 %%% @copyright (C) 2016, Erlang Solutions Ltd.
3 %%% @doc Module providing support for websockets in MongooseIM
4 %%% @end
5 %%%===================================================================
6 -module(mod_websockets).
7
8 -behaviour(cowboy_websocket).
9 -behaviour(mongoose_transport).
10
11 %% cowboy_http_websocket_handler callbacks
12 -export([init/2,
13 websocket_init/1,
14 websocket_handle/2,
15 websocket_info/2,
16 terminate/3]).
17
18 %% ejabberd_socket compatibility
19 -export([starttls/2, starttls/3,
20 compress/1, compress/3,
21 send/2,
22 send_xml/2,
23 change_shaper/2,
24 monitor/1,
25 get_sockmod/1,
26 close/1,
27 peername/1,
28 get_peer_certificate/1,
29 set_ping/2,
30 disable_ping/1]).
31
32 -ignore_xref([change_shaper/2, close/1, compress/1, compress/3, disable_ping/1,
33 get_peer_certificate/1, get_sockmod/1, send/2, set_ping/2]).
34
35 -include("mongoose.hrl").
36 -include("jlib.hrl").
37 -include_lib("exml/include/exml_stream.hrl").
38
39 -define(NS_FRAMING, <<"urn:ietf:params:xml:ns:xmpp-framing">>).
40 -define(NS_COMPONENT, <<"jabber:component:accept">>).
41
42 -record(websocket, {
43 pid :: pid(),
44 peername :: mongoose_transport:peer(),
45 peercert :: undefined | binary()
46 }).
47 -record(ws_state, {
48 peer :: mongoose_transport:peer() | undefined,
49 fsm_pid :: pid() | undefined,
50 open_tag :: stream | open | undefined,
51 parser :: exml_stream:parser() | undefined,
52 opts :: proplists:proplist() | undefined,
53 ping_rate :: integer() | none,
54 max_stanza_size :: integer() | infinity,
55 peercert :: undefined | passed | binary()
56 %% the passed value is used to clear the certificate from the handlers state after it's passed down to the socket()
57 }).
58
59 -type socket() :: #websocket{}.
60
61 %%--------------------------------------------------------------------
62 %% Common callbacks for all cowboy behaviours
63 %%--------------------------------------------------------------------
64
%% @doc Cowboy entry point for a new HTTP request on the websocket endpoint.
%% Reads the peer address and client certificate from the request, lets
%% add_sec_websocket_protocol_header/1 adjust the response headers and asks
%% cowboy to upgrade the connection to the websocket protocol. The peer and
%% certificate are prepended to the handler options so websocket_init/1 can
%% read them. The idle timeout is the `timeout' option (default 60000).
init(Req, Opts) ->
    PeerAddr = cowboy_req:peer(Req),
    ClientCert = cowboy_req:cert(Req),
    ReqWithProtocol = add_sec_websocket_protocol_header(Req),
    ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets request">>,
                 req => Req, opts => Opts}),
    CowboyOpts = #{idle_timeout => gen_mod:get_opt(timeout, Opts, 60000)},
    HandlerOpts = [{peer, PeerAddr}, {peercert, ClientCert} | Opts],
    %% returning the cowboy_websocket atom triggers the protocol upgrade
    {cowboy_websocket, ReqWithProtocol, HandlerOpts, CowboyOpts}.
76
%% @doc Cowboy termination callback; nothing is cleaned up here, so all
%% three arguments are ignored.
terminate(_, _, _) ->
    ok.
79
80 %%--------------------------------------------------------------------
81 %% cowboy_http_websocket_handler callbacks
82 %%--------------------------------------------------------------------
83
%% @doc Called once for every new websocket connection.
%% Builds the initial handler state from the options assembled in init/2 and,
%% when a ping rate is configured, schedules the first ping. `peer' and
%% `peercert' are read without a default.
websocket_init(Opts) ->
    Rate = gen_mod:get_opt(ping_rate, Opts, none),
    SizeLimit = gen_mod:get_opt(max_stanza_size, Opts, infinity),
    PeerAddr = gen_mod:get_opt(peer, Opts),
    ClientCert = gen_mod:get_opt(peercert, Opts),
    InitialState = #ws_state{peer = PeerAddr,
                             peercert = ClientCert,
                             opts = Opts,
                             ping_rate = Rate,
                             max_stanza_size = SizeLimit},
    maybe_send_ping_request(Rate),
    ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets connection">>,
                 peer => PeerAddr, opts => Opts}),
    {ok, InitialState}.
99
%% @doc Called for every frame received from the client.
%% Text and binary frames are both treated as XML text and passed to
%% handle_text/2. Pong frames and any other frame kind are only logged.
websocket_handle({FrameType, Msg}, State)
  when FrameType =:= text; FrameType =:= binary ->
    ?LOG_DEBUG(#{what => ws_received, msg => Msg, peer => State#ws_state.peer}),
    handle_text(Msg, State);
websocket_handle({pong, Payload}, State) ->
    ?LOG_DEBUG(#{what => ws_pong, text => <<"Received pong frame over WebSockets">>,
                 msg => Payload, peer => State#ws_state.peer}),
    {ok, State};
%% Any frame that is not text, binary or pong is logged and dropped.
websocket_handle(Other, State) ->
    ?LOG_DEBUG(#{what => ws_received, text => <<"Received non-text over WebSockets">>,
                 msg => Other, peer => State#ws_state.peer}),
    {ok, State}.
120
%% @doc Handles Erlang messages delivered to the websocket handler process.
%%
%% Recognised messages:
%%   {send, Text}      - reply to the client with Text as a text frame
%%   {send_xml, XML}   - translate stream root/namespaces for the framing in
%%                       use, serialise and reply as a text frame
%%   {set_ping, Value} - set the ping interval; accepted only for positive
%%                       integers, anything else reaches the catch-all clause
%%   disable_ping      - stop scheduling further pings
%%   do_ping           - timer tick scheduled by send_ping_request/1
%%   stop              - free the XML parser (if any) and stop the handler
%% Every other message is logged and ignored.
websocket_info({send, Text}, State) ->
    ?LOG_DEBUG(#{what => ws_send, text => <<"Sending text over WebSockets">>,
                 msg => Text, peer => State#ws_state.peer}),
    {reply, {text, Text}, State};
websocket_info({send_xml, XML}, State) ->
    XML1 = process_server_stream_root(replace_stream_ns(XML, State), State),
    Text = exml:to_iolist(XML1),
    ?LOG_DEBUG(#{what => ws_send, text => <<"Sending xml over WebSockets">>,
                 packet => Text, peer => State#ws_state.peer}),
    {reply, {text, Text}, State};
websocket_info({set_ping, Value}, State = #ws_state{ping_rate = none})
  when is_integer(Value), Value > 0 ->
    %% pings were off, so no do_ping was scheduled from this state: start one
    send_ping_request(Value),
    {ok, State#ws_state{ping_rate = Value}};
websocket_info({set_ping, Value}, State) when is_integer(Value), Value > 0 ->
    %% pings already enabled: only the stored rate changes here; the do_ping
    %% clause below reads it when it schedules the next ping
    {ok, State#ws_state{ping_rate = Value}};
websocket_info(disable_ping, State) ->
    {ok, State#ws_state{ping_rate = none}};
websocket_info(do_ping, State = #ws_state{ping_rate = none}) ->
    %% probably someone disabled pings
    {ok, State};
websocket_info(do_ping, State) ->
    %% schedule the next tick, then send a ping frame to the client
    send_ping_request(State#ws_state.ping_rate),
    {reply, ping, State};
websocket_info(stop, #ws_state{parser = undefined} = State) ->
    {stop, State};
websocket_info(stop, #ws_state{parser = Parser} = State) ->
    exml_stream:free_parser(Parser),
    {stop, State};
websocket_info(Info, State) ->
    ?LOG_DEBUG(#{what => unexpected_message, msg => Info}),
    {ok, State}.
155
156 %%--------------------------------------------------------------------
157 %% Callbacks implementation
158 %%--------------------------------------------------------------------
159
%% @doc Feeds text received from the client into the XML stream parser.
%% The parser is created on the first call, with options chosen from that
%% first chunk (see get_parser_opts/2). Parsed elements may start the FSM
%% (maybe_start_fsm/2) and are then passed to process_client_elements/2.
%% Returns {ok, State} or {stop, State}.
handle_text(Text, State = #ws_state{parser = undefined}) ->
    {ok, FreshParser} = exml_stream:new_parser(get_parser_opts(Text, State)),
    handle_text(Text, State#ws_state{parser = FreshParser});
handle_text(Text, State = #ws_state{parser = CurrentParser}) ->
    case exml_stream:parse(CurrentParser, Text) of
        {error, Reason} ->
            %% the state passed on still holds the pre-parse parser
            process_parse_error(Reason, State);
        {ok, UpdatedParser, Elements} ->
            ParsedState = State#ws_state{parser = UpdatedParser},
            case maybe_start_fsm(Elements, ParsedState) of
                {stop, _} = Shutdown ->
                    Shutdown;
                {ok, RunningState} ->
                    process_client_elements(Elements, RunningState)
            end
    end.
177
%% @doc Translates parsed client elements to the stream form used by the FSM
%% and sends each one to the FSM process, in order.
%% Returns {ok, State} with the open tag kind recorded in the state.
process_client_elements(Elements, #ws_state{fsm_pid = FSM} = State) ->
    {StreamElements, NewState} = process_client_stream_start(Elements, State),
    Forward = fun(Elem) ->
                      Renamed = replace_stream_ns(Elem, NewState),
                      send_to_fsm(FSM, process_client_stream_end(Renamed, NewState))
              end,
    lists:foreach(Forward, StreamElements),
    {ok, NewState}.
183
%% @doc Reacts to an XML parse error.
%% Without an FSM there is nobody to report to, so the handler stops;
%% otherwise the error is forwarded to the FSM as {xmlstreamerror, Reason}
%% and the handler keeps running.
process_parse_error(Reason, #ws_state{fsm_pid = FSM} = State) ->
    case FSM of
        undefined ->
            {stop, State};
        _ ->
            send_to_fsm(FSM, {xmlstreamerror, Reason}),
            {ok, State}
    end.
189
%% @doc Sends a stream event to the FSM process.
%% Plain #xmlel{} elements are wrapped as {xmlstreamelement, Element};
%% anything else is sent unchanged.
send_to_fsm(FSM, Item) ->
    Event = case Item of
                #xmlel{} -> {xmlstreamelement, Item};
                _ -> Item
            end,
    p1_fsm:send_event(FSM, Event).
194
%% @doc Starts the FSM process when the first stream opening arrives and no
%% FSM is running yet.
%% - A stream start whose name begins with "stream" starts ejabberd_service
%%   when its xmlns is the component namespace; any other xmlns stops the
%%   handler.
%% - A single <open> element starts ejabberd_c2s.
%% - Everything else leaves the state untouched.
maybe_start_fsm([#xmlstreamstart{name = <<"stream", _/binary>>, attrs = Attrs} | _],
                State = #ws_state{fsm_pid = undefined, opts = Opts}) ->
    case lists:keyfind(<<"xmlns">>, 1, Attrs) of
        {_, ?NS_COMPONENT} ->
            ComponentOpts = gen_mod:get_opt(ejabberd_service, Opts, []),
            do_start_fsm(ejabberd_service, ComponentOpts, State);
        _ ->
            {stop, State}
    end;
maybe_start_fsm([#xmlel{name = <<"open">>}],
                State = #ws_state{fsm_pid = undefined, opts = Opts}) ->
    do_start_fsm(ejabberd_c2s, Opts, State);
maybe_start_fsm(_Elements, State) ->
    {ok, State}.
210
%% @doc Starts the given FSM module with a #websocket{} socket that points
%% back at this handler process.
%% The peer certificate travels inside the socket record, so the handler
%% state replaces its own copy with `passed' whether or not the start
%% succeeds. Returns {ok, State} with fsm_pid set, or {stop, State}.
do_start_fsm(FSMModule, Opts, State = #ws_state{peer = Peer, peercert = PeerCert}) ->
    Socket = #websocket{pid = self(),
                        peername = Peer,
                        peercert = PeerCert},
    CertHandedOver = State#ws_state{peercert = passed},
    case call_fsm_start(FSMModule, Socket, [{xml_socket, true} | Opts]) of
        {error, Reason} ->
            ?LOG_WARNING(#{what => ws_c2s_start_failed,
                           text => <<"WebSockets fails to start c2s process">>,
                           reason => Reason, c2s_module => FSMModule,
                           peer => Peer}),
            {stop, CertHandedOver};
        {ok, Pid} ->
            ?LOG_DEBUG(#{what => ws_c2s_started,
                         text => <<"WebSockets starts c2s process">>,
                         c2s_pid => Pid, c2s_module => FSMModule,
                         peer => Peer}),
            {ok, CertHandedOver#ws_state{fsm_pid = Pid}}
    end.
231
%% @doc Calls the start function of the selected FSM module, passing this
%% module as the socket module together with the socket record.
%% Only ejabberd_c2s and ejabberd_service are supported.
call_fsm_start(ejabberd_c2s, Socket, StartOpts) ->
    ejabberd_c2s:start({?MODULE, Socket}, StartOpts);
call_fsm_start(ejabberd_service, Socket, StartOpts) ->
    ejabberd_service:start({?MODULE, Socket}, StartOpts).
236
237 %%--------------------------------------------------------------------
238 %% ejabberd_socket compatibility
239 %%--------------------------------------------------------------------
%% @doc TLS cannot be negotiated on a websocket; always throws
%% {error, tls_not_allowed_on_websockets}.
-spec starttls(socket(), _) -> no_return().
starttls(_SocketData, _TLSOpts) ->
    throw({error, tls_not_allowed_on_websockets}).
243
%% @doc Three-argument variant of starttls; the arguments are ignored and
%% {error, tls_not_allowed_on_websockets} is always thrown.
-spec starttls(socket(), _, _) -> no_return().
starttls(_, _, _) ->
    Failure = {error, tls_not_allowed_on_websockets},
    throw(Failure).
247
%% @doc Stream compression is not available on a websocket; always throws
%% {error, compression_not_allowed_on_websockets}.
-spec compress(socket()) -> no_return().
compress(_SocketData) ->
    throw({error, compression_not_allowed_on_websockets}).
251
%% @doc Three-argument variant of compress; the arguments are ignored and
%% {error, compression_not_allowed_on_websockets} is always thrown.
-spec compress(socket(), _, _) -> no_return().
compress(_, _, _) ->
    Failure = {error, compression_not_allowed_on_websockets},
    throw(Failure).
255
%% @doc Asks the websocket handler process to send XML to the client.
%% Raw text ({xmlstreamraw, Text}) goes through send/2; an
%% {xmlstreamelement, XML} wrapper is removed first; everything else is
%% posted to the handler as a {send_xml, XML} message. Always returns ok.
-spec send_xml(socket(), mongoose_transport:send_xml_input()) -> ok.
send_xml(Socket, {xmlstreamraw, RawText}) ->
    send(Socket, RawText);
send_xml(Socket, {xmlstreamelement, Inner}) ->
    send_xml(Socket, Inner);
send_xml(#websocket{pid = HandlerPid}, Payload) ->
    _ = erlang:send(HandlerPid, {send_xml, Payload}),
    ok.
264
%% @doc Asks the websocket handler process to send Data to the client as a
%% text frame. The request is a {send, Data} message; returns ok.
send(#websocket{pid = HandlerPid}, Data) ->
    _ = erlang:send(HandlerPid, {send, Data}),
    ok.
268
%% @doc Shapers are not applied to websockets yet (TODO), so the socket is
%% returned unchanged and the shaper argument is ignored.
change_shaper(Socket, _) ->
    Socket.
271
%% @doc Sets up a process monitor on the websocket handler process that
%% owns the socket and returns the monitor reference.
-spec monitor(socket()) -> reference().
monitor(#websocket{pid = HandlerPid}) ->
    erlang:monitor(process, HandlerPid).
275
%% @doc Returns the socket module for a websocket socket, which is always
%% this module; the socket argument is ignored.
get_sockmod(_) ->
    ?MODULE.
278
%% @doc Asks the websocket handler process to stop by sending it `stop'.
%% Returns the sent message (the atom `stop').
close(#websocket{pid = HandlerPid}) ->
    erlang:send(HandlerPid, stop).
281
%% @doc Returns the peer stored in the socket record, wrapped in an ok tuple.
-spec peername(socket()) -> mongoose_transport:peername_return().
peername(#websocket{peername = Peer}) ->
    {ok, Peer}.
285
%% @doc Returns the client certificate carried by the socket.
%% Yields no_peer_cert when none was stored; otherwise the stored value is
%% decoded with public_key:pkix_decode_cert/2 in `plain' mode and returned
%% as {ok, Decoded}.
get_peer_certificate(#websocket{peercert = Cert}) ->
    case Cert of
        undefined ->
            no_peer_cert;
        _ ->
            {ok, public_key:pkix_decode_cert(Cert, plain)}
    end.
291
%% @doc Asks the websocket handler process to change its ping interval by
%% sending it {set_ping, Value}. Returns the sent message.
set_ping(#websocket{pid = HandlerPid}, Value) ->
    erlang:send(HandlerPid, {set_ping, Value}).
294
%% @doc Asks the websocket handler process to stop pinging by sending it
%% `disable_ping'. Returns the sent message.
disable_ping(#websocket{pid = HandlerPid}) ->
    erlang:send(HandlerPid, disable_ping).
297
298 %%--------------------------------------------------------------------
299 %% Helpers for handling both
300 %% http://datatracker.ietf.org/doc/draft-ietf-xmpp-websocket
301 %% and older
302 %% http://tools.ietf.org/id/draft-moffitt-xmpp-over-websocket
303 %%--------------------------------------------------------------------
304
%% @doc Detects how the client opened the stream and records it in the state.
%% - A stream start whose name begins with "stream" is passed through and
%%   open_tag becomes `stream'.
%% - A single <open> element is rewritten into a <stream:stream> start with
%%   the client and stream namespaces, and open_tag becomes `open'.
%% - Any other input is returned untouched.
process_client_stream_start([#xmlstreamstart{name = <<"stream", _/binary>>} | _] = Elements,
                            State) ->
    {Elements, State#ws_state{open_tag = stream}};
process_client_stream_start([#xmlel{name = <<"open">>, attrs = OpenAttrs}], State) ->
    ClientNs = {<<"xmlns">>, ?NS_CLIENT},
    StreamAttrs = [{<<"xmlns:stream">>, ?NS_STREAM}
                   | lists:keyreplace(<<"xmlns">>, 1, OpenAttrs, ClientNs)],
    StreamStart = #xmlstreamstart{name = <<"stream:stream">>, attrs = StreamAttrs},
    {[StreamStart], State#ws_state{open_tag = open}};
process_client_stream_start(Elements, State) ->
    {Elements, State}.
315
%% @doc When the stream was opened with <open>, turns a client <close>
%% element into a </stream:stream> end; every other element (or open tag
%% kind) is returned unchanged.
process_client_stream_end(#xmlel{name = <<"close">>}, #ws_state{open_tag = open}) ->
    #xmlstreamend{name = <<"stream:stream">>};
process_client_stream_end(Other, _State) ->
    Other.
320
%% @doc When the stream was opened with <open>, turns the server's stream
%% root into framing elements: a stream start becomes <open> (the
%% xmlns:stream attribute is dropped and xmlns is replaced with the framing
%% namespace) and a stream end becomes <close>. Anything else is returned
%% unchanged.
process_server_stream_root(#xmlstreamstart{name = <<"stream", _/binary>>, attrs = StartAttrs},
                           #ws_state{open_tag = open}) ->
    WithoutStreamNs = lists:keydelete(<<"xmlns:stream">>, 1, StartAttrs),
    FramingAttrs = lists:keyreplace(<<"xmlns">>, 1, WithoutStreamNs,
                                    {<<"xmlns">>, ?NS_FRAMING}),
    #xmlel{name = <<"open">>, attrs = FramingAttrs};
process_server_stream_root(#xmlstreamend{name = <<"stream", _/binary>>},
                           #ws_state{open_tag = open}) ->
    #xmlel{name = <<"close">>, attrs = [{<<"xmlns">>, ?NS_FRAMING}]};
process_server_stream_root(Other, _State) ->
    Other.
331
%% @doc Adjusts element namespaces when the stream was opened with <open>.
%% - An element named "stream:X" is renamed to "X" and gets an explicit
%%   xmlns attribute with the stream namespace prepended.
%% - iq, message and presence elements get xmlns set to jabber:client.
%% - Everything else, and every element under any other open tag kind, is
%%   returned unchanged.
replace_stream_ns(#xmlel{name = <<"stream:", LocalName/binary>>, attrs = Attrs} = Element,
                  #ws_state{open_tag = open}) ->
    Element#xmlel{name = LocalName, attrs = [{<<"xmlns">>, ?NS_STREAM} | Attrs]};
replace_stream_ns(Element, #ws_state{open_tag = open}) ->
    case should_have_jabber_client(Element) of
        false ->
            Element;
        true ->
            %% keystore replaces an existing xmlns or appends a new one
            NewAttrs = lists:keystore(<<"xmlns">>, 1, Element#xmlel.attrs,
                                      {<<"xmlns">>, <<"jabber:client">>}),
            Element#xmlel{attrs = NewAttrs}
    end;
replace_stream_ns(Element, _State) ->
    Element.
347
%% @doc Builds the XML stream parser options for a new connection.
%% max_child_size comes from the state's max_stanza_size, with `infinity'
%% mapped to 0; the remaining options depend on the first text received
%% (see get_parser_opts/1).
get_parser_opts(Text, #ws_state{max_stanza_size = Limit}) ->
    MaxChildSize = case Limit of
                       infinity -> 0;
                       _ -> Limit
                   end,
    [{max_child_size, MaxChildSize} | get_parser_opts(Text)].
352
%% @doc Chooses parser options from the first text sent by the client.
%% Text beginning with "<open" selects the new-type websocket framing;
%% anything else is treated as an old-type <stream:stream> stream.
get_parser_opts(FirstText) ->
    case FirstText of
        <<"<open", _/binary>> ->
            %% new-type WS
            [{infinite_stream, true}, {autoreset, true}];
        _ ->
            %% old-type WS
            [{start_tag, <<"stream:stream">>}]
    end.
357
358 %%--------------------------------------------------------------------
359 %% Helpers
360 %%--------------------------------------------------------------------
%% @doc True for iq, message and presence elements, i.e. the elements that
%% replace_stream_ns/2 stamps with the jabber:client namespace; false for
%% any other term.
should_have_jabber_client(#xmlel{name = Name}) ->
    lists:member(Name, [<<"iq">>, <<"message">>, <<"presence">>]);
should_have_jabber_client(_) ->
    false.
365
%% @doc Schedules a `do_ping' message to the calling process after PingRate
%% (passed directly to erlang:send_after/3 as the delay).
%% Returns the timer reference.
send_ping_request(PingRate) ->
    Self = self(),
    ?LOG_DEBUG(#{what => ws_schedule_ping,
                 text => <<"Sending websocket ping request">>,
                 ping_rate => PingRate}),
    erlang:send_after(PingRate, Self, do_ping).
372
%% @doc Schedules the first ping unless pings are disabled (`none'), in
%% which case it just returns ok.
maybe_send_ping_request(PingRate) ->
    case PingRate of
        none -> ok;
        _ -> send_ping_request(PingRate)
    end.
377
378
379 %%--------------------------------------------------------------------
380 %% add_sec_websocket_protocol_header
381 %%--------------------------------------------------------------------
382
%% @doc Negotiates the websocket subprotocol.
%% If the request carries a sec-websocket-protocol header containing "xmpp"
%% (compared case-insensitively), the matched value is echoed back in the
%% Sec-WebSocket-Protocol response header. The request is returned unchanged
%% when the header is absent or does not offer xmpp.
add_sec_websocket_protocol_header(Req) ->
    case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
        undefined ->
            %% The server should not set the "sec-websocket-protocol"
            %% response header if it is not present in the request
            Req;
        Offered ->
            case case_insensitive_match(<<"xmpp">>, Offered) of
                nomatch ->
                    %% Do not agree with client, do not add a response header
                    ?LOG_DEBUG(#{what => ws_unknown_protocol,
                                 text => <<"Header sec-websocket-protocol does not contain xmpp option">>,
                                 protocols => Offered}),
                    Req;
                {matched, Chosen} ->
                    cowboy_req:set_resp_header(<<"Sec-WebSocket-Protocol">>, Chosen, Req)
            end
    end.
401
%% @doc Finds the first list entry whose jid:str_tolower/1 form equals
%% LowerPattern (which must already be lower-case).
%% Returns {matched, Entry} with the entry as originally given, or nomatch
%% when the list is exhausted.
case_insensitive_match(_LowerPattern, []) ->
    nomatch;
case_insensitive_match(LowerPattern, [Candidate | Rest]) ->
    case jid:str_tolower(Candidate) =:= LowerPattern of
        true ->
            {matched, Candidate};
        false ->
            case_insensitive_match(LowerPattern, Rest)
    end.
Line Hits Source