./ct_report/coverage/mod_websockets.COVER.html

1 %%%===================================================================
2 %%% @copyright (C) 2016, Erlang Solutions Ltd.
3 %%% @doc Module providing support for websockets in MongooseIM
4 %%% @end
5 %%%===================================================================
6 -module(mod_websockets).
7
8 -behaviour(mongoose_http_handler).
9 -behaviour(cowboy_websocket).
10 -behaviour(mongoose_c2s_socket).
11
12 %% mongoose_http_handler callbacks
13 -export([config_spec/0]).
14
15 %% cowboy_http_websocket_handler callbacks
16 -export([init/2,
17 websocket_init/1,
18 websocket_handle/2,
19 websocket_info/2,
20 terminate/3]).
21
22 %% mongoose_c2s_socket callbacks
23 -export([socket_new/2,
24 socket_peername/1,
25 tcp_to_tls/2,
26 socket_handle_data/2,
27 socket_activate/1,
28 socket_close/1,
29 socket_send_xml/2,
30 get_peer_certificate/2,
31 has_peer_cert/2,
32 is_channel_binding_supported/1,
33 get_tls_last_message/1,
34 is_ssl/1]).
35
36 -include("mongoose.hrl").
37 -include("mongoose_config_spec.hrl").
38 -include("jlib.hrl").
39 -include_lib("exml/include/exml_stream.hrl").
40
41 -define(NS_FRAMING, <<"urn:ietf:params:xml:ns:xmpp-framing">>).
42 -define(NS_COMPONENT, <<"jabber:component:accept">>).
43
44 -record(websocket, {
45 pid :: pid(),
46 peername :: mongoose_transport:peer(),
47 peercert :: undefined | binary()
48 }).
49 -record(ws_state, {
50 peer :: mongoose_transport:peer() | undefined,
51 fsm_pid :: pid() | undefined,
52 parser :: exml_stream:parser() | undefined,
53 opts :: map(),
54 ping_rate :: integer() | none,
55 max_stanza_size :: integer() | infinity,
56 peercert :: undefined | passed | binary()
57 %% the passed value is used to clear the certificate from the handlers state after it's passed down to the socket()
58 }).
59
60 -type socket() :: #websocket{}.
61
62 %% mongoose_http_handler callbacks
63
64 -spec config_spec() -> mongoose_config_spec:config_section().
65 config_spec() ->
66 93 #section{items = #{<<"timeout">> => #option{type = int_or_infinity,
67 validate = non_negative},
68 <<"ping_rate">> => #option{type = integer,
69 validate = positive},
70 <<"max_stanza_size">> => #option{type = int_or_infinity,
71 validate = positive},
72 <<"c2s_state_timeout">> => #option{type = int_or_infinity,
73 validate = non_negative},
74 <<"backwards_compatible_session">> => #option{type = boolean}},
75 defaults = #{<<"timeout">> => 60000,
76 <<"max_stanza_size">> => infinity,
77 <<"c2s_state_timeout">> => 5000,
78 <<"backwards_compatible_session">> => true}
79 }.
80
81 %%--------------------------------------------------------------------
82 %% Common callbacks for all cowboy behaviours
83 %%--------------------------------------------------------------------
84
85 init(Req, Opts = #{timeout := Timeout}) ->
86 32 Peer = cowboy_req:peer(Req),
87 32 PeerCert = cowboy_req:cert(Req),
88 32 Req1 = add_sec_websocket_protocol_header(Req),
89 32 ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets request">>,
90 32 req => Req, opts => Opts}),
91 32 AllModOpts = Opts#{peer => Peer, peer_cert => PeerCert},
92 %% upgrade protocol
93 32 {cowboy_websocket, Req1, AllModOpts, #{idle_timeout => Timeout}}.
94
95 terminate(_Reason, _Req, _State) ->
96 32 ok.
97
98 %%--------------------------------------------------------------------
99 %% cowboy_http_websocket_handler callbacks
100 %%--------------------------------------------------------------------
101
102 % Called for every new websocket connection.
103 websocket_init(Opts = #{peer := Peer, peer_cert := PeerCert, max_stanza_size := MaxStanzaSize}) ->
104 32 PingRate = maps:get(ping_rate, Opts, none),
105 32 maybe_send_ping_request(PingRate),
106 32 ?LOG_DEBUG(#{what => ws_init, text => <<"New websockets connection">>,
107 32 peer => Peer, opts => Opts}),
108 32 State = #ws_state{opts = Opts,
109 ping_rate = PingRate,
110 max_stanza_size = MaxStanzaSize,
111 peer = Peer,
112 peercert = PeerCert},
113 32 {ok, State}.
114
115 % Called when a text message arrives.
116 websocket_handle({text, Msg}, State) ->
117 251 ?LOG_DEBUG(#{what => ws_received, msg => Msg, peer => State#ws_state.peer}),
118 251 handle_text(Msg, State);
119
120 websocket_handle({pong, Payload}, State) ->
121
:-(
?LOG_DEBUG(#{what => ws_pong, text => <<"Received pong frame over WebSockets">>,
122
:-(
msg => Payload, peer => State#ws_state.peer}),
123
:-(
{ok, State};
124
125 % With this callback we can handle other kind of
126 % messages, like binary.
127 websocket_handle(Any, State) ->
128
:-(
?LOG_DEBUG(#{what => ws_received, text => <<"Received non-text over WebSockets">>,
129
:-(
msg => Any, peer => State#ws_state.peer}),
130
:-(
{ok, State}.
131
132 % Other messages from the system are handled here.
133 websocket_info({send_xml, XML}, State) ->
134 319 XML1 = process_server_stream_root(replace_stream_ns(XML)),
135 319 Text = exml:to_iolist(XML1),
136 319 mongoose_metrics:update(global, [data, xmpp, sent, c2s, websocket], iolist_size(Text)),
137 319 ?LOG_DEBUG(#{what => ws_send, text => <<"Sending xml over WebSockets">>,
138 319 packet => Text, peer => State#ws_state.peer}),
139 319 {reply, {text, Text}, State};
140 websocket_info(do_ping, State) ->
141 %% send ping frame to the client
142
:-(
send_ping_request(State#ws_state.ping_rate),
143
:-(
{reply, ping, State};
144 websocket_info(stop, #ws_state{parser = undefined} = State) ->
145
:-(
{stop, State};
146 websocket_info(stop, #ws_state{parser = Parser} = State) ->
147 23 exml_stream:free_parser(Parser),
148 23 {stop, State};
149 websocket_info(Info, State) ->
150
:-(
?LOG_DEBUG(#{what => unexpected_message, msg => Info}),
151
:-(
{ok, State}.
152
153 %%--------------------------------------------------------------------
154 %% Callbacks implementation
155 %%--------------------------------------------------------------------
156
157 handle_text(Text, #ws_state{ parser = undefined } = State) ->
158 32 ParserOpts = get_parser_opts(Text, State),
159 32 {ok, Parser} = exml_stream:new_parser(ParserOpts),
160 32 handle_text(Text, State#ws_state{ parser = Parser });
161 handle_text(Text, #ws_state{parser = Parser} = State) ->
162 251 case exml_stream:parse(Parser, Text) of
163 {ok, NewParser, Elements} ->
164 249 mongoose_metrics:update(global, [data, xmpp, received, c2s, websocket], byte_size(Text)),
165 249 State1 = State#ws_state{ parser = NewParser },
166 249 case maybe_start_fsm(Elements, State1) of
167 {ok, State2} ->
168 249 process_client_elements(Elements, State2);
169 {stop, _} = Shutdown ->
170
:-(
Shutdown
171 end;
172 {error, Reason} ->
173 2 process_parse_error(Reason, State)
174 end.
175
176 process_client_elements(Elements, #ws_state{fsm_pid = FSM} = State) ->
177 249 Elements1 = process_client_stream_start(Elements),
178 249 [send_to_fsm(FSM, process_client_stream_end(replace_stream_ns(Elem))) || Elem <- Elements1],
179 249 {ok, State}.
180
181 process_parse_error(_Reason, #ws_state{fsm_pid = undefined} = State) ->
182
:-(
{stop, State};
183 process_parse_error(Reason, #ws_state{fsm_pid = FSM} = State) ->
184 2 send_to_fsm(FSM, #xmlstreamerror{name = iolist_to_binary(Reason)}),
185 2 {ok, State}.
186
187 send_to_fsm(FSM, Element) ->
188 251 FSM ! {tcp, undefined, Element},
189 251 ok.
190
191 maybe_start_fsm([#xmlel{ name = <<"open">> }],
192 #ws_state{fsm_pid = undefined,
193 opts = #{ip_tuple := IPTuple, port := Port,
194 c2s_state_timeout := StateTimeout,
195 backwards_compatible_session := BackwardsCompatible}} = State) ->
196 32 Opts = #{
197 access => all,
198 shaper => none,
199 max_stanza_size => 0,
200 xml_socket => true,
201 hibernate_after => 0,
202 c2s_state_timeout => StateTimeout,
203 backwards_compatible_session => BackwardsCompatible,
204 port => Port, ip_tuple => IPTuple, proto => tcp},
205 32 do_start_fsm(Opts, State);
206 maybe_start_fsm(_Els, #ws_state{fsm_pid = undefined} = State) ->
207
:-(
{stop, State};
208 maybe_start_fsm(_Els, State) ->
209 217 {ok, State}.
210
211 do_start_fsm(Opts, State = #ws_state{peer = Peer, peercert = PeerCert}) ->
212 32 SocketData = #websocket{pid = self(),
213 peername = Peer,
214 peercert = PeerCert},
215 32 case call_fsm_start(SocketData, Opts) of
216 {ok, Pid} ->
217 32 ?LOG_DEBUG(#{what => ws_c2s_started, c2s_pid => Pid,
218 text => <<"WebSockets starts c2s process">>,
219 32 peer => State#ws_state.peer}),
220 32 NewState = State#ws_state{fsm_pid = Pid, peercert = passed},
221 32 {ok, NewState};
222 {error, Reason} ->
223
:-(
?LOG_WARNING(#{what => ws_c2s_start_failed, reason => Reason,
224 text => <<"WebSockets fails to start c2s process">>,
225
:-(
peer => State#ws_state.peer}),
226
:-(
{stop, State#ws_state{peercert = passed}}
227 end.
228
229 call_fsm_start(SocketData, #{hibernate_after := HibernateAfterTimeout} = Opts) ->
230 32 mongoose_c2s:start({?MODULE, SocketData, Opts},
231 [{hibernate_after, HibernateAfterTimeout}]).
232
233 %%--------------------------------------------------------------------
234 %% Helpers for handling
235 %% https://datatracker.ietf.org/doc/rfc7395/
236 %%--------------------------------------------------------------------
237
238 process_client_stream_start([#xmlel{ name = <<"open">>, attrs = Attrs }]) ->
239 64 Attrs1 = lists:keyreplace(<<"xmlns">>, 1, Attrs, {<<"xmlns">>, ?NS_CLIENT}),
240 64 Attrs2 = [{<<"xmlns:stream">>, ?NS_STREAM} | Attrs1],
241 64 NewStart = #xmlstreamstart{ name = <<"stream:stream">>, attrs = Attrs2 },
242 64 [NewStart];
243 process_client_stream_start(Elements) ->
244 185 Elements.
245
246 process_client_stream_end(#xmlel{ name = <<"close">> }) ->
247 30 #xmlstreamend{ name = <<"stream:stream">> };
248 process_client_stream_end(Element) ->
249 219 Element.
250
251 process_server_stream_root(#xmlstreamstart{ name = <<"stream", _/binary>>, attrs = Attrs }) ->
252 64 Attrs1 = lists:keydelete(<<"xmlns:stream">>, 1, Attrs),
253 64 Attrs2 = lists:keyreplace(<<"xmlns">>, 1, Attrs1, {<<"xmlns">>, ?NS_FRAMING}),
254 64 #xmlel{ name = <<"open">>, attrs = Attrs2 };
255 process_server_stream_root(#xmlstreamend{ name = <<"stream", _/binary>> }) ->
256 32 #xmlel{ name = <<"close">>, attrs = [{<<"xmlns">>, ?NS_FRAMING}] };
257 process_server_stream_root(Element) ->
258 223 Element.
259
260 replace_stream_ns(#xmlel{ name = <<"stream:", ElementName/binary>> } = Element) ->
261 66 Element#xmlel{ name = ElementName, attrs = [{<<"xmlns">>, ?NS_STREAM} | Element#xmlel.attrs] };
262 replace_stream_ns(Element) ->
263 502 case should_have_jabber_client(Element) of
264 true ->
265 248 JabberClient = {<<"xmlns">>, <<"jabber:client">>},
266 248 NewAtrrs = lists:keystore(<<"xmlns">>, 1,
267 Element#xmlel.attrs, JabberClient),
268 248 Element#xmlel{attrs = NewAtrrs};
269 false ->
270 254 Element
271 end.
272
273 get_parser_opts(Text, #ws_state{ max_stanza_size = infinity }) ->
274 22 [{max_element_size, 0} | get_parser_opts(Text)];
275 get_parser_opts(Text, #ws_state{ max_stanza_size = MaxStanzaSize }) ->
276 10 [{max_element_size, MaxStanzaSize} | get_parser_opts(Text)].
277
278 get_parser_opts(<<"<open", _/binary>>) ->
279 32 [{infinite_stream, true}]; % new-type WS
280 get_parser_opts(_) ->
281
:-(
[]. % old-type WS
282
283 %%--------------------------------------------------------------------
284 %% Helpers
285 %%--------------------------------------------------------------------
286 128 should_have_jabber_client(#xmlel{name = <<"iq">>}) -> true;
287 60 should_have_jabber_client(#xmlel{name = <<"message">>}) -> true;
288 60 should_have_jabber_client(#xmlel{name = <<"presence">>}) -> true;
289 254 should_have_jabber_client(_) -> false.
290
291 send_ping_request(PingRate) ->
292
:-(
Dest = self(),
293
:-(
?LOG_DEBUG(#{what => ws_schedule_ping,
294 text => <<"Sending websocket ping request">>,
295
:-(
ping_rate => PingRate}),
296
:-(
erlang:send_after(PingRate, Dest, do_ping).
297
298 maybe_send_ping_request(none) ->
299 32 ok;
300 maybe_send_ping_request(PingRate) ->
301
:-(
send_ping_request(PingRate).
302
303
304 %%--------------------------------------------------------------------
305 %% add_sec_websocket_protocol_header
306 %%--------------------------------------------------------------------
307
308 add_sec_websocket_protocol_header(Req) ->
309 32 case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
310 undefined ->
311 %% Server should not set "sec-websocket-protocol" response header
312 %% if it does not present in the request
313
:-(
Req;
314 Protocols ->
315 32 case case_insensitive_match(<<"xmpp">>, Protocols) of
316 {matched, MatchedProtocol} ->
317 32 cowboy_req:set_resp_header(<<"Sec-WebSocket-Protocol">>, MatchedProtocol, Req);
318 nomatch ->
319 %% Do not agree with client, do not add a response header
320
:-(
?LOG_DEBUG(#{what => ws_unknown_protocol,
321 text => <<"Header sec-websocket-protocol does not contain xmpp option">>,
322
:-(
protocols => Protocols}),
323
:-(
Req
324 end
325 end.
326
327 case_insensitive_match(LowerPattern, [Case | Cases]) ->
328 32 LowerCase = jid:str_tolower(Case),
329 32 case LowerCase of
330 LowerPattern ->
331 32 {matched, Case};
332 _ ->
333
:-(
case_insensitive_match(LowerPattern, Cases)
334 end;
335 case_insensitive_match(_, []) ->
336
:-(
nomatch.
337
338 %% mongoose_c2s_socket callbacks
339
340 -spec socket_new(socket(), mongoose_listener:options()) -> socket().
341 socket_new(Socket, _LOpts) ->
342 32 Socket.
343
344 -spec socket_peername(socket()) -> {inet:ip_address(), inet:port_number()}.
345 socket_peername(#websocket{peername = PeerName}) ->
346 64 PeerName.
347
348 -spec tcp_to_tls(socket(), mongoose_listener:options()) ->
349 {ok, socket()} | {error, term()}.
350 tcp_to_tls(_Socket, _LOpts) ->
351
:-(
{error, tls_not_allowed_on_websockets}.
352
353 -spec socket_handle_data(socket(), {tcp | ssl, term(), term()}) ->
354 iodata() | {raw, [exml:element()]} | {error, term()}.
355 socket_handle_data(_Socket, {_Kind, _Term, Packet}) ->
356 251 {raw, [Packet]}.
357
358 -spec socket_activate(socket()) -> ok.
359 socket_activate(_Socket) ->
360 283 ok.
361
362 -spec socket_close(socket()) -> ok.
363 socket_close(#websocket{pid = Pid}) ->
364 32 Pid ! stop,
365 32 ok.
366
367 -spec socket_send_xml(socket(), iodata() | exml:element() | [exml:element()]) ->
368 ok | {error, term()}.
369 socket_send_xml(#websocket{pid = Pid}, XMLs) when is_list(XMLs) ->
370 319 [Pid ! {send_xml, XML} || XML <- XMLs],
371 319 ok;
372 socket_send_xml(#websocket{pid = Pid}, XML) ->
373
:-(
Pid ! {send_xml, XML},
374
:-(
ok.
375
376 -spec has_peer_cert(socket(), mongoose_listener:options()) -> boolean().
377 has_peer_cert(Socket, LOpts) ->
378 6 get_peer_certificate(Socket, LOpts) /= no_peer_cert.
379
380
381 -spec get_peer_certificate(socket(), mongoose_listener:options()) ->
382 mongoose_transport:peercert_return().
383 get_peer_certificate(#websocket{peercert = undefined}, _) ->
384
:-(
no_peer_cert;
385 get_peer_certificate(#websocket{peercert = PeerCert}, _) ->
386 9 Decoded = public_key:pkix_decode_cert(PeerCert, plain),
387 9 {ok, Decoded}.
388
389 -spec is_channel_binding_supported(socket()) -> boolean().
390 is_channel_binding_supported(_Socket) ->
391 58 false.
392
393 -spec get_tls_last_message(socket()) -> {ok, binary()} | {error, term()}.
394 get_tls_last_message(_Socket) ->
395
:-(
{error, tls_not_allowed_on_websockets}.
396
397 -spec is_ssl(socket()) -> boolean().
398 is_ssl(_Socket) ->
399 96 false.
Line Hits Source