./ct_report/coverage/mod_websockets.COVER.html

1 %%%===================================================================
2 %%% @copyright (C) 2016, Erlang Solutions Ltd.
3 %%% @doc Module providing support for websockets in MongooseIM
4 %%% @end
5 %%%===================================================================
6 -module(mod_websockets).
7
8 -behaviour(mongoose_http_handler).
9 -behaviour(cowboy_websocket).
10 -behaviour(mongoose_c2s_socket).
11
12 %% mongoose_http_handler callbacks
13 -export([config_spec/0]).
14
15 %% cowboy_http_websocket_handler callbacks
16 -export([init/2,
17 websocket_init/1,
18 websocket_handle/2,
19 websocket_info/2,
20 terminate/3]).
21
22 %% mongoose_c2s_socket callbacks
23 -export([socket_new/2,
24 socket_peername/1,
25 tcp_to_tls/2,
26 socket_handle_data/2,
27 socket_activate/1,
28 socket_close/1,
29 socket_send_xml/2,
30 get_peer_certificate/2,
31 has_peer_cert/2,
32 is_channel_binding_supported/1,
33 get_tls_last_message/1,
34 is_ssl/1]).
35
36 -include("mongoose.hrl").
37 -include("mongoose_config_spec.hrl").
38 -include("jlib.hrl").
39 -include_lib("exml/include/exml_stream.hrl").
40
41 -define(NS_FRAMING, <<"urn:ietf:params:xml:ns:xmpp-framing">>).
42 -define(NS_COMPONENT, <<"jabber:component:accept">>).
43
%% Socket term handed to the c2s process (built in do_start_fsm/2).
%% `pid' is the process that called do_start_fsm/2; socket_close/1 and
%% socket_send_xml/2 send their messages to it.
%% `peercert' is fed to public_key:pkix_decode_cert/2 in get_peer_certificate/2
%% (presumably a DER-encoded certificate -- confirm against cowboy_req:cert/1).
44 -record(websocket, {
45 pid :: pid(),
46 peername :: mongoose_transport:peer(),
47 peercert :: undefined | binary()
48 }).
%% State of the websocket handler process.
%% `fsm_pid' and `parser' start as undefined: the parser is created on the
%% first text frame (handle_text/2) and the c2s process is started on the
%% first `open' element (maybe_start_fsm/2).
49 -record(ws_state, {
50 peer :: mongoose_transport:peer() | undefined,
51 fsm_pid :: pid() | undefined,
52 parser :: exml_stream:parser() | undefined,
53 opts :: map(),
54 ping_rate :: integer() | none,
55 max_stanza_size :: integer() | infinity,
56 peercert :: undefined | passed | binary()
57 %% the `passed' value is used to clear the certificate from the handler's state after it has been passed down to the socket()
58 }).
59
60 -type socket() :: #websocket{}.
61
62 %% mongoose_http_handler callbacks
63
%% @doc Configuration schema of the websockets handler.
%% Every option except `ping_rate' has a default value.
-spec config_spec() -> mongoose_config_spec:config_section().
config_spec() ->
    Items = #{<<"timeout">> => #option{type = int_or_infinity,
                                       validate = non_negative},
              <<"ping_rate">> => #option{type = integer,
                                         validate = positive},
              <<"max_stanza_size">> => #option{type = int_or_infinity,
                                               validate = positive},
              <<"c2s_state_timeout">> => #option{type = int_or_infinity,
                                                 validate = non_negative},
              <<"backwards_compatible_session">> => #option{type = boolean}},
    Defaults = #{<<"timeout">> => 60000,
                 <<"max_stanza_size">> => infinity,
                 <<"c2s_state_timeout">> => 5000,
                 <<"backwards_compatible_session">> => true},
    #section{items = Items, defaults = Defaults}.
80
81 %%--------------------------------------------------------------------
82 %% Common callbacks for all cowboy behaviours
83 %%--------------------------------------------------------------------
84
%% Cowboy entry point for a new HTTP request on the websockets endpoint.
%% Stores the peer address and peer certificate in the handler options and
%% asks Cowboy to upgrade to the websocket protocol, using the configured
%% `timeout' as the idle timeout.
init(Req, #{timeout := Timeout} = Opts) ->
    Peer = cowboy_req:peer(Req),
    PeerCert = cowboy_req:cert(Req),
    UpgradedReq = add_sec_websocket_protocol_header(Req),
    ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets request">>,
                 req => Req, opts => Opts}),
    HandlerOpts = Opts#{peer => Peer, peer_cert => PeerCert},
    %% upgrade protocol
    {cowboy_websocket, UpgradedReq, HandlerOpts, #{idle_timeout => Timeout}}.
94
%% Called by Cowboy when the websocket handler terminates.
%% If no c2s process was started, there is nobody to notify.
%% Otherwise the c2s process is told whether the connection was closed
%% (normal, stop, timeout, remote, {remote, _, _}) or failed (anything else).
%% The state is only pattern-matched; it is never bound, so no
%% unused-variable warnings are produced.
terminate(_Reason, _Req, #ws_state{fsm_pid = undefined}) ->
    ok;
terminate(Reason, _Req, #ws_state{fsm_pid = FSM}) when Reason =:= normal;
                                                       Reason =:= stop;
                                                       Reason =:= timeout;
                                                       Reason =:= remote ->
    FSM ! {websockets_closed, undefined},
    ok;
terminate({remote, _, _}, _Req, #ws_state{fsm_pid = FSM}) ->
    FSM ! {websockets_closed, undefined},
    ok;
terminate(_Reason, _Req, #ws_state{fsm_pid = FSM}) ->
    FSM ! {websockets_error, undefined},
    ok.
109
110 %%--------------------------------------------------------------------
111 %% cowboy_http_websocket_handler callbacks
112 %%--------------------------------------------------------------------
113
% Called for every new websocket connection.
% Schedules the first ping (when `ping_rate' is configured) and builds the
% initial handler state; the parser and the c2s process are created later.
websocket_init(#{peer := Peer,
                 peer_cert := PeerCert,
                 max_stanza_size := MaxStanzaSize} = Opts) ->
    PingRate = maps:get(ping_rate, Opts, none),
    maybe_send_ping_request(PingRate),
    ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets connection">>,
                 peer => Peer, opts => Opts}),
    {ok, #ws_state{peer = Peer,
                   peercert = PeerCert,
                   opts = Opts,
                   ping_rate = PingRate,
                   max_stanza_size = MaxStanzaSize}}.
126
% Called for every frame received from the client.
% Text frames carry XMPP data and go to handle_text/2.
websocket_handle({text, Data}, State) ->
    ?LOG_DEBUG(#{what => ws_received, msg => Data, peer => State#ws_state.peer}),
    handle_text(Data, State);

% Pong frames are only logged.
websocket_handle({pong, Payload}, State) ->
    ?LOG_DEBUG(#{what => ws_pong, text => <<"Received pong frame over WebSockets">>,
                 msg => Payload, peer => State#ws_state.peer}),
    {ok, State};

% Any other frame kind (binary, for example) is logged and ignored.
websocket_handle(Frame, State) ->
    ?LOG_DEBUG(#{what => ws_received, text => <<"Received non-text over WebSockets">>,
                 msg => Frame, peer => State#ws_state.peer}),
    {ok, State}.
143
% Erlang messages delivered to the handler process are handled here.
% {send_xml, XML}: convert the element to its websocket framing form and
% send it to the client as a text frame.
websocket_info({send_xml, XML}, State) ->
    Renamed = replace_stream_ns(XML),
    Framed = process_server_stream_root(Renamed),
    Text = exml:to_iolist(Framed),
    mongoose_metrics:update(global, [data, xmpp, sent, c2s, websocket], iolist_size(Text)),
    ?LOG_DEBUG(#{what => ws_send, text => <<"Sending xml over WebSockets">>,
                 packet => Text, peer => State#ws_state.peer}),
    {reply, {text, Text}, State};
% do_ping: schedule the next ping and send a ping frame to the client.
websocket_info(do_ping, State) ->
    send_ping_request(State#ws_state.ping_rate),
    {reply, ping, State};
% stop: release the parser, if one was created, and stop the handler.
websocket_info(stop, #ws_state{parser = Parser} = State) ->
    case Parser of
        undefined -> ok;
        _ -> exml_stream:free_parser(Parser)
    end,
    {stop, State};
% Anything else is unexpected; log it and carry on.
websocket_info(Unexpected, State) ->
    ?LOG_DEBUG(#{what => unexpected_message, msg => Unexpected}),
    {ok, State}.
164
165 %%--------------------------------------------------------------------
166 %% Callbacks implementation
167 %%--------------------------------------------------------------------
168
%% Parses a text frame and forwards the resulting elements.
%% On the first frame no parser exists yet, so one is created with options
%% derived from the frame content and the stanza size limit.
handle_text(Text, #ws_state{parser = undefined} = State) ->
    {ok, Parser} = exml_stream:new_parser(get_parser_opts(Text, State)),
    handle_text(Text, State#ws_state{parser = Parser});
handle_text(Text, #ws_state{parser = Parser} = State) ->
    case exml_stream:parse(Parser, Text) of
        {error, Reason} ->
            process_parse_error(Reason, State);
        {ok, NewParser, Elements} ->
            mongoose_metrics:update(global, [data, xmpp, received, c2s, websocket],
                                    byte_size(Text)),
            Parsed = State#ws_state{parser = NewParser},
            case maybe_start_fsm(Elements, Parsed) of
                {stop, _} = Shutdown ->
                    Shutdown;
                {ok, Started} ->
                    process_client_elements(Elements, Started)
            end
    end.
187
%% Translates the parsed client elements from websocket framing form and
%% sends each of them, in order, to the c2s process.
process_client_elements(Elements, #ws_state{fsm_pid = FSM} = State) ->
    Forward = fun(Elem) ->
                      Translated = process_client_stream_end(replace_stream_ns(Elem)),
                      send_to_fsm(FSM, Translated)
              end,
    lists:foreach(Forward, process_client_stream_start(Elements)),
    {ok, State}.
192
%% Handles an XML parse error.
%% Without a c2s process the handler simply stops; otherwise the error is
%% reported to the c2s process as an #xmlstreamerror{} and the handler goes on.
process_parse_error(Reason, #ws_state{fsm_pid = FSM} = State) ->
    case FSM of
        undefined ->
            {stop, State};
        _ ->
            send_to_fsm(FSM, #xmlstreamerror{name = iolist_to_binary(Reason)}),
            {ok, State}
    end.
198
%% Delivers one element to the c2s process, wrapped in a `tcp'-tagged
%% message with an undefined socket field.
send_to_fsm(FSM, Element) ->
    Message = {tcp, undefined, Element},
    FSM ! Message,
    ok.
202
%% Starts the c2s process when needed.
%% - No c2s process yet and the frame is a single `open' element: start one.
%% - No c2s process yet and anything else arrived: stop the handler.
%% - A c2s process already exists: nothing to do.
maybe_start_fsm([#xmlel{name = <<"open">>}],
                #ws_state{fsm_pid = undefined,
                          opts = #{ip_tuple := IPTuple,
                                   port := Port,
                                   c2s_state_timeout := StateTimeout,
                                   backwards_compatible_session := BackwardsCompatible}
                         } = State) ->
    ListenerOpts = #{access => all,
                     shaper => none,
                     max_stanza_size => 0,
                     xml_socket => true,
                     hibernate_after => 0,
                     c2s_state_timeout => StateTimeout,
                     backwards_compatible_session => BackwardsCompatible,
                     port => Port,
                     ip_tuple => IPTuple,
                     proto => tcp},
    do_start_fsm(ListenerOpts, State);
maybe_start_fsm(_Elements, #ws_state{fsm_pid = undefined} = State) ->
    {stop, State};
maybe_start_fsm(_Elements, State) ->
    {ok, State}.
222
%% Builds the socket term for this connection and starts the c2s process.
%% Whatever the outcome, the peer certificate in the handler state is
%% replaced with `passed', since it has been handed over in the socket term.
do_start_fsm(Opts, #ws_state{peer = Peer, peercert = PeerCert} = State) ->
    Socket = #websocket{pid = self(), peername = Peer, peercert = PeerCert},
    Cleared = State#ws_state{peercert = passed},
    case call_fsm_start(Socket, Opts) of
        {ok, Pid} ->
            ?LOG_DEBUG(#{what => ws_c2s_started, c2s_pid => Pid,
                         text => <<"WebSockets starts c2s process">>,
                         peer => Peer}),
            {ok, Cleared#ws_state{fsm_pid = Pid}};
        {error, Reason} ->
            ?LOG_WARNING(#{what => ws_c2s_start_failed, reason => Reason,
                           text => <<"WebSockets fails to start c2s process">>,
                           peer => Peer}),
            {stop, Cleared}
    end.
240
%% Starts a mongoose_c2s process that uses this module as its socket
%% implementation; `hibernate_after' from Opts is passed as a start option.
call_fsm_start(SocketData, #{hibernate_after := HibernateAfterTimeout} = Opts) ->
    StartArgs = {?MODULE, SocketData, Opts},
    StartOpts = [{hibernate_after, HibernateAfterTimeout}],
    mongoose_c2s:start(StartArgs, StartOpts).
244
245 %%--------------------------------------------------------------------
246 %% Helpers for handling
247 %% https://datatracker.ietf.org/doc/rfc7395/
248 %%--------------------------------------------------------------------
249
%% Turns a lone `open' element from the client into a `stream:stream' stream
%% start: an existing xmlns attribute is replaced with ?NS_CLIENT and an
%% xmlns:stream attribute is prepended. Any other input is returned unchanged.
process_client_stream_start([#xmlel{name = <<"open">>, attrs = Attrs}]) ->
    ClientNs = {<<"xmlns">>, ?NS_CLIENT},
    StreamNs = {<<"xmlns:stream">>, ?NS_STREAM},
    StreamAttrs = [StreamNs | lists:keyreplace(<<"xmlns">>, 1, Attrs, ClientNs)],
    [#xmlstreamstart{name = <<"stream:stream">>, attrs = StreamAttrs}];
process_client_stream_start(Elements) ->
    Elements.
257
%% Turns a `close' element from the client into a `stream:stream' stream end;
%% every other element passes through untouched.
process_client_stream_end(Element) ->
    case Element of
        #xmlel{name = <<"close">>} ->
            #xmlstreamend{name = <<"stream:stream">>};
        _ ->
            Element
    end.
262
%% Converts server-side stream start/end records into websocket framing
%% elements: a stream start becomes `open' (xmlns:stream removed, an existing
%% xmlns replaced with ?NS_FRAMING) and a stream end becomes `close'.
%% Everything else is returned as is.
process_server_stream_root(#xmlstreamstart{name = <<"stream", _/binary>>, attrs = Attrs}) ->
    WithoutStreamNs = lists:keydelete(<<"xmlns:stream">>, 1, Attrs),
    FramingNs = {<<"xmlns">>, ?NS_FRAMING},
    #xmlel{name = <<"open">>,
           attrs = lists:keyreplace(<<"xmlns">>, 1, WithoutStreamNs, FramingNs)};
process_server_stream_root(#xmlstreamend{name = <<"stream", _/binary>>}) ->
    #xmlel{name = <<"close">>, attrs = [{<<"xmlns">>, ?NS_FRAMING}]};
process_server_stream_root(Other) ->
    Other.
271
%% Adjusts namespaces for the websocket framing.
%% An element named `stream:Name' is renamed to `Name' and gets an xmlns
%% attribute set to ?NS_STREAM prepended. iq, message and presence elements
%% get their xmlns attribute set to jabber:client. Anything else is returned
%% unchanged.
replace_stream_ns(#xmlel{name = <<"stream:", LocalName/binary>>, attrs = Attrs} = Element) ->
    Element#xmlel{name = LocalName, attrs = [{<<"xmlns">>, ?NS_STREAM} | Attrs]};
replace_stream_ns(Element) ->
    case should_have_jabber_client(Element) of
        false ->
            Element;
        true ->
            Attrs = lists:keystore(<<"xmlns">>, 1, Element#xmlel.attrs,
                                   {<<"xmlns">>, <<"jabber:client">>}),
            Element#xmlel{attrs = Attrs}
    end.
284
%% Parser options for a new connection: the element size limit (0 when the
%% configured max stanza size is `infinity') followed by the options derived
%% from the first frame.
get_parser_opts(Text, #ws_state{max_stanza_size = MaxStanzaSize}) ->
    MaxElementSize = case MaxStanzaSize of
                         infinity -> 0;
                         _ -> MaxStanzaSize
                     end,
    [{max_element_size, MaxElementSize} | get_parser_opts(Text)].
289
%% Picks parser options from the first frame: frames starting with `<open'
%% use the new-type websocket framing, anything else the old-type one.
get_parser_opts(FirstFrame) ->
    case FirstFrame of
        <<"<open", _/binary>> ->
            [{infinite_stream, true}]; % new-type WS
        _ ->
            [] % old-type WS
    end.
294
295 %%--------------------------------------------------------------------
296 %% Helpers
297 %%--------------------------------------------------------------------
%% True for the stanza elements (iq, message, presence) that must carry the
%% jabber:client namespace; false for anything else.
should_have_jabber_client(#xmlel{name = Name}) ->
    lists:member(Name, [<<"iq">>, <<"message">>, <<"presence">>]);
should_have_jabber_client(_) ->
    false.
302
%% Schedules a `do_ping' message to the calling process after PingRate
%% milliseconds and returns the timer reference.
send_ping_request(PingRate) ->
    ?LOG_DEBUG(#{what => ws_schedule_ping,
                 text => <<"Sending websocket ping request">>,
                 ping_rate => PingRate}),
    erlang:send_after(PingRate, self(), do_ping).
309
%% Schedules a ping only when a ping rate is configured.
maybe_send_ping_request(PingRate) ->
    case PingRate of
        none -> ok;
        _ -> send_ping_request(PingRate)
    end.
314
315
316 %%--------------------------------------------------------------------
317 %% add_sec_websocket_protocol_header
318 %%--------------------------------------------------------------------
319
%% Sets the Sec-WebSocket-Protocol response header when the client offered
%% the xmpp subprotocol. The request is returned unchanged when the client
%% sent no sec-websocket-protocol header or did not offer xmpp.
add_sec_websocket_protocol_header(Req) ->
    case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
        undefined ->
            %% The server must not set the "sec-websocket-protocol" response
            %% header if it is not present in the request
            Req;
        Protocols ->
            agree_on_xmpp_protocol(Protocols, Req)
    end.

%% Echoes back the client's own spelling of the xmpp subprotocol, if offered.
agree_on_xmpp_protocol(Protocols, Req) ->
    case case_insensitive_match(<<"xmpp">>, Protocols) of
        nomatch ->
            %% Do not agree with client, do not add a response header
            ?LOG_DEBUG(#{what => ws_unknown_protocol,
                         text => <<"Header sec-websocket-protocol does not contain xmpp option">>,
                         protocols => Protocols}),
            Req;
        {matched, MatchedProtocol} ->
            cowboy_req:set_resp_header(<<"Sec-WebSocket-Protocol">>, MatchedProtocol, Req)
    end.
338
%% Finds the first candidate whose lowercased form equals LowerPattern
%% (which must already be lowercase). Returns {matched, Candidate} with the
%% candidate's original spelling, or nomatch.
case_insensitive_match(_LowerPattern, []) ->
    nomatch;
case_insensitive_match(LowerPattern, [Candidate | Rest]) ->
    case jid:str_tolower(Candidate) =:= LowerPattern of
        true ->
            {matched, Candidate};
        false ->
            case_insensitive_match(LowerPattern, Rest)
    end.
349
350 %% mongoose_c2s_socket callbacks
351
%% @doc The socket term is already complete when it reaches the c2s process,
%% so it is returned unchanged; the listener options are not used.
-spec socket_new(socket(), mongoose_listener:options()) -> socket().
socket_new(WsSocket, _ListenerOpts) ->
    WsSocket.
355
%% @doc Returns the peer address stored in the socket term.
-spec socket_peername(socket()) -> {inet:ip_address(), inet:port_number()}.
socket_peername(#websocket{} = Socket) ->
    Socket#websocket.peername.
359
%% @doc Upgrading to TLS is never possible on a websocket socket.
-spec tcp_to_tls(socket(), mongoose_listener:options()) ->
    {ok, socket()} | {error, term()}.
tcp_to_tls(_WsSocket, _ListenerOpts) ->
    Reason = tls_not_allowed_on_websockets,
    {error, Reason}.
364
%% @doc Incoming data is an already parsed element (see send_to_fsm/2), so
%% it is returned as a one-element `raw' list.
-spec socket_handle_data(socket(), {tcp | ssl, term(), term()}) ->
    iodata() | {raw, [exml:element()]} | {error, term()}.
socket_handle_data(_WsSocket, {_Tag, _Ref, Element}) ->
    {raw, [Element]}.
369
%% @doc Nothing needs to be activated for a websocket socket; always `ok'.
-spec socket_activate(socket()) -> ok.
socket_activate(_WsSocket) ->
    ok.
373
%% @doc Closes the connection by sending `stop' to the websocket handler
%% process (handled in websocket_info/2).
-spec socket_close(socket()) -> ok.
socket_close(#websocket{pid = HandlerPid}) ->
    HandlerPid ! stop,
    ok.
378
%% @doc Sends XML to the client by passing it to the websocket handler
%% process: one `send_xml' message per element of a list, or a single
%% message for a non-list term.
-spec socket_send_xml(socket(), iodata() | exml:element() | [exml:element()]) ->
    ok | {error, term()}.
socket_send_xml(#websocket{pid = HandlerPid}, XMLs) when is_list(XMLs) ->
    lists:foreach(fun(XML) -> HandlerPid ! {send_xml, XML} end, XMLs);
socket_send_xml(#websocket{pid = HandlerPid}, XML) ->
    HandlerPid ! {send_xml, XML},
    ok.
387
%% @doc Tells whether the client presented a certificate.
%% The `peercert' field is either `undefined' or the certificate binary, so
%% presence is decided on the field itself; the certificate is not decoded
%% just to discard the result. The listener options are not used.
-spec has_peer_cert(socket(), mongoose_listener:options()) -> boolean().
has_peer_cert(#websocket{peercert = undefined}, _LOpts) ->
    false;
has_peer_cert(#websocket{}, _LOpts) ->
    true.
391
392
%% @doc Returns the decoded peer certificate, or `no_peer_cert' when the
%% client presented none. The listener options are not used.
-spec get_peer_certificate(socket(), mongoose_listener:options()) ->
    mongoose_transport:peercert_return().
get_peer_certificate(#websocket{peercert = PeerCert}, _LOpts) ->
    case PeerCert of
        undefined ->
            no_peer_cert;
        _ ->
            {ok, public_key:pkix_decode_cert(PeerCert, plain)}
    end.
400
%% @doc Channel binding is never reported as supported for websocket sockets.
-spec is_channel_binding_supported(socket()) -> boolean().
is_channel_binding_supported(_WsSocket) ->
    false.
404
%% @doc There is no TLS session owned by this socket type, so this always
%% returns an error.
-spec get_tls_last_message(socket()) -> {ok, binary()} | {error, term()}.
get_tls_last_message(_WsSocket) ->
    Reason = tls_not_allowed_on_websockets,
    {error, Reason}.
408
%% @doc A websocket socket is never reported as an SSL socket.
-spec is_ssl(socket()) -> boolean().
is_ssl(_WsSocket) ->
    false.
Line Hits Source