From bbfc1569ccffab060c4c2b402a45119fb1f57495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 4 Apr 2018 17:23:37 +0200 Subject: Add initial implementation of Websocket over HTTP/2 Using the current draft: https://tools.ietf.org/html/draft-ietf-httpbis-h2-websockets-01 --- doc/src/manual/cowboy_http2.asciidoc | 19 +- src/cowboy_http2.erl | 57 +++-- src/cowboy_req.erl | 1 + src/cowboy_stream_h.erl | 31 ++- src/cowboy_websocket.erl | 188 +++++++++++----- test/draft_h2_websockets_SUITE.erl | 405 +++++++++++++++++++++++++++++++++++ 6 files changed, 622 insertions(+), 79 deletions(-) create mode 100644 test/draft_h2_websockets_SUITE.erl diff --git a/doc/src/manual/cowboy_http2.asciidoc b/doc/src/manual/cowboy_http2.asciidoc index 517879f..793c6a1 100644 --- a/doc/src/manual/cowboy_http2.asciidoc +++ b/doc/src/manual/cowboy_http2.asciidoc @@ -17,13 +17,14 @@ as a Ranch protocol. [source,erlang] ---- opts() :: #{ - connection_type => worker | supervisor, - env => cowboy_middleware:env(), - inactivity_timeout => timeout(), - middlewares => [module()], - preface_timeout => timeout(), - shutdown_timeout => timeout(), - stream_handlers => [module()] + connection_type => worker | supervisor, + enable_connect_protocol => boolean(), + env => cowboy_middleware:env(), + inactivity_timeout => timeout(), + middlewares => [module()], + preface_timeout => timeout(), + shutdown_timeout => timeout(), + stream_handlers => [module()] } ---- @@ -41,6 +42,10 @@ The default value is given next to the option name: connection_type (supervisor):: Whether the connection process also acts as a supervisor. +enable_connect_protocol (false):: + Whether to enable the extended CONNECT method to allow + protocols like Websocket to be used over an HTTP/2 stream. + env (#{}):: Middleware environment. diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index caa1f07..7f5d738 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -165,9 +165,10 @@ init(Parent, Ref, Socket, Transport, Opts) -> {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, binary() | undefined, binary()) -> ok. init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer) -> - State = #state{parent=Parent, ref=Ref, socket=Socket, + State0 = #state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, opts=Opts, peer=Peer, sock=Sock, cert=Cert, parse_state={preface, sequence, preface_timeout(Opts)}}, + State = settings_init(State0, Opts), preface(State), case Buffer of <<>> -> before_loop(State, Buffer); @@ -188,16 +189,21 @@ init(Parent, Ref, Socket, Transport, Opts, Peer, Sock, Cert, Buffer, _Settings, State1 = stream_handler_init(State0, 1, fin, upgrade, Req), %% We assume that the upgrade will be applied. A stream handler %% must not prevent the normal operations of the server. - State = info(State1, 1, {switch_protocol, #{ + State2 = info(State1, 1, {switch_protocol, #{ <<"connection">> => <<"Upgrade">>, <<"upgrade">> => <<"h2c">> }, ?MODULE, undefined}), %% @todo undefined or #{}? + State = settings_init(State2, Opts), preface(State), case Buffer of <<>> -> before_loop(State, Buffer); _ -> parse(State, Buffer) end. +settings_init(State=#state{next_settings=Settings}, Opts) -> + EnableConnectProtocol = maps:get(enable_connect_protocol, Opts, false), + State#state{next_settings=Settings#{enable_connect_protocol => EnableConnectProtocol}}. + preface(#state{socket=Socket, transport=Transport, next_settings=Settings}) -> %% We send next_settings and use defaults until we get a ack. Transport:send(Socket, cow_http2:settings(Settings)). @@ -413,9 +419,9 @@ frame(State0=#state{socket=Socket, transport=Transport, remote_settings=Settings State end; %% Ack for a previously sent SETTINGS frame. -frame(State=#state{next_settings=_NextSettings}, settings_ack) -> - %% @todo Apply SETTINGS that require synchronization. - State; +frame(State=#state{local_settings=Local0, next_settings=Next}, settings_ack) -> + Local = maps:merge(Local0, Next), + State#state{local_settings=Local, next_settings=#{}}; %% Unexpected PUSH_PROMISE frame. frame(State, {push_promise, _, _, _, _}) -> terminate(State, {connection_error, protocol_error, @@ -637,9 +643,11 @@ commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{local=upgrade}, [{switch_protocol, Headers, ?MODULE, _}|Tail]) -> Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))), commands(State, Stream#stream{local=idle}, Tail); -%% HTTP/2 has no support for the Upgrade mechanism. -commands(State, Stream, [{switch_protocol, _Headers, _Mod, _ModState}|Tail]) -> - %% @todo This is an error. Not sure what to do here yet. +%% Use a different protocol within the stream (CONNECT :protocol). +%% @todo Make sure we error out when the feature is disabled. +commands(State0, #stream{id=StreamID}, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> + State = #state{streams=Streams} = info(State0, StreamID, {headers, 200, Headers}), + Stream = lists:keyfind(StreamID, #stream.id, Streams), commands(State, Stream, Tail); commands(State, Stream=#stream{id=StreamID}, [stop|_Tail]) -> %% @todo Do we want to run the commands after a stop? @@ -840,8 +848,22 @@ stream_decode_init(State=#state{decode_state=DecodeState0}, StreamID, IsFin, Hea 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) end. -stream_pseudo_headers_init(State, StreamID, IsFin, Headers0) -> +stream_pseudo_headers_init(State=#state{local_settings=LocalSettings}, + StreamID, IsFin, Headers0) -> + IsExtendedConnectEnabled = maps:get(enable_connect_protocol, LocalSettings, false), case pseudo_headers(Headers0, #{}) of + {ok, PseudoHeaders=#{method := <<"CONNECT">>, scheme := _, + authority := _, path := _, protocol := _}, Headers} + when IsExtendedConnectEnabled -> + stream_regular_headers_init(State, StreamID, IsFin, Headers, PseudoHeaders); + {ok, #{method := <<"CONNECT">>, scheme := _, + authority := _, path := _}, _} + when IsExtendedConnectEnabled -> + stream_malformed(State, StreamID, + 'The :protocol pseudo-header MUST be sent with an extended CONNECT. (draft_h2_websockets 4)'); + {ok, #{protocol := _}, _} -> + stream_malformed(State, StreamID, + 'The :protocol pseudo-header is only defined for the extended CONNECT. (draft_h2_websockets 4)'); %% @todo Add clause for CONNECT requests (no scheme/path). {ok, PseudoHeaders=#{method := <<"CONNECT">>}, _} -> stream_early_error(State, StreamID, IsFin, 501, PseudoHeaders, @@ -869,13 +891,15 @@ pseudo_headers([{<<":scheme">>, Scheme}|Tail], PseudoHeaders) -> pseudo_headers([{<<":authority">>, _}|_], #{authority := _}) -> {error, 'Multiple :authority pseudo-headers were found. (RFC7540 8.1.2.3)'}; pseudo_headers([{<<":authority">>, Authority}|Tail], PseudoHeaders) -> - %% @todo Probably parse the authority here. pseudo_headers(Tail, PseudoHeaders#{authority => Authority}); pseudo_headers([{<<":path">>, _}|_], #{path := _}) -> {error, 'Multiple :path pseudo-headers were found. (RFC7540 8.1.2.3)'}; pseudo_headers([{<<":path">>, Path}|Tail], PseudoHeaders) -> - %% @todo Probably parse the path here. pseudo_headers(Tail, PseudoHeaders#{path => Path}); +pseudo_headers([{<<":protocol">>, _}|_], #{protocol := _}) -> + {error, 'Multiple :protocol pseudo-headers were found. (RFC7540 8.1.2.3)'}; +pseudo_headers([{<<":protocol">>, Protocol}|Tail], PseudoHeaders) -> + pseudo_headers(Tail, PseudoHeaders#{protocol => Protocol}); pseudo_headers([{<<":", _/bits>>, _}|_], _) -> {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'}; pseudo_headers(Headers, PseudoHeaders) -> @@ -946,7 +970,7 @@ stream_req_init(State, StreamID, IsFin, Headers, PseudoHeaders) -> end. stream_req_init(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert}, - StreamID, IsFin, Headers, #{method := Method, scheme := Scheme, + StreamID, IsFin, Headers, PseudoHeaders=#{method := Method, scheme := Scheme, authority := Authority, path := PathWithQs}, BodyLength) -> try cow_http_hd:parse_host(Authority) of {Host, Port} -> @@ -955,7 +979,7 @@ stream_req_init(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert}, stream_malformed(State, StreamID, 'The path component must not be empty. (RFC7540 8.1.2.3)'); {Path, Qs} -> - Req = #{ + Req0 = #{ ref => Ref, pid => self(), streamid => StreamID, @@ -973,6 +997,13 @@ stream_req_init(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert}, has_body => IsFin =:= nofin, body_length => BodyLength }, + %% We add the protocol information for extended CONNECTs. + Req = case PseudoHeaders of + #{protocol := Protocol} -> + Req0#{protocol => Protocol}; + _ -> + Req0 + end, stream_handler_init(State, StreamID, IsFin, idle, Req) catch _:_ -> stream_malformed(State, StreamID, diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index f5ea458..7b5cc5b 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -413,6 +413,7 @@ parse_header_fun(<<"if-unmodified-since">>) -> fun cow_http_hd:parse_if_unmodifi parse_header_fun(<<"range">>) -> fun cow_http_hd:parse_range/1; parse_header_fun(<<"sec-websocket-extensions">>) -> fun cow_http_hd:parse_sec_websocket_extensions/1; parse_header_fun(<<"sec-websocket-protocol">>) -> fun cow_http_hd:parse_sec_websocket_protocol_req/1; +parse_header_fun(<<"sec-websocket-version">>) -> fun cow_http_hd:parse_sec_websocket_version_req/1; parse_header_fun(<<"upgrade">>) -> fun cow_http_hd:parse_upgrade/1; parse_header_fun(<<"x-forwarded-for">>) -> fun cow_http_hd:parse_x_forwarded_for/1. diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index 93b8417..620975c 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -33,7 +33,7 @@ expect = undefined :: undefined | continue, read_body_ref = undefined :: reference() | undefined, read_body_timer_ref = undefined :: reference() | undefined, - read_body_length = 0 :: non_neg_integer() | infinity, + read_body_length = 0 :: non_neg_integer() | infinity | auto, read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()}, read_body_buffer = <<>> :: binary(), body_length = 0 :: non_neg_integer() @@ -65,8 +65,9 @@ expect(Req) -> end. %% If we receive data and stream is waiting for data: -%% If we accumulated enough data or IsFin=fin, send it. -%% If not, buffer it. +%% If we accumulated enough data or IsFin=fin, send it. +%% If we are in auto mode, send it and update flow control. +%% If not, buffer it. %% If not, buffer it. %% %% We always reset the expect field when we receive data, @@ -75,6 +76,7 @@ expect(Req) -> -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State) -> {cowboy_stream:commands(), State} when State::#state{}. +%% Stream isn't waiting for data. data(_StreamID, IsFin, Data, State=#state{ read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) -> {[], State#state{ @@ -82,6 +84,16 @@ data(_StreamID, IsFin, Data, State=#state{ read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>, body_length=BodyLen + byte_size(Data)}}; +%% Stream is waiting for data using auto mode. +%% +%% There is no buffering done in auto mode. +data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, + read_body_length=auto, body_length=BodyLen}) -> + send_request_body(Pid, Ref, IsFin, BodyLen, Data), + {[{flow, byte_size(Data)}], State#state{ + read_body_ref=undefined, + body_length=BodyLen}}; +%% Stream is waiting for data but we didn't receive enough to send yet. data(_StreamID, nofin, Data, State=#state{ read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen}) when byte_size(Data) + byte_size(Buffer) < ReadLen -> @@ -89,9 +101,11 @@ data(_StreamID, nofin, Data, State=#state{ expect=undefined, read_body_buffer= << Buffer/binary, Data/binary >>, body_length=BodyLen + byte_size(Data)}}; +%% Stream is waiting for data and we received enough to send. data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) -> BodyLen = BodyLen0 + byte_size(Data), + %% @todo Handle the infinity case where no TRef was defined. ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]), send_request_body(Pid, Ref, IsFin, BodyLen, <>), {[], State#state{ @@ -121,6 +135,16 @@ info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}, {internal_error, Exit, 'Stream process crashed.'} ], State}; +%% Request body, auto mode, no body buffered. +info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) -> + {[], State#state{ + read_body_ref=Ref, + read_body_length=auto}}; +%% Request body, auto mode, body buffered or complete. +info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{pid=Pid, + read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) -> + send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), + {[{flow, byte_size(Buffer)}], State#state{read_body_buffer= <<>>}}; %% Request body, body buffered large enough or complete. %% %% We do not send a 100 continue response if the client @@ -136,6 +160,7 @@ info(StreamID, {read_body, Ref, Length, Period}, State=#state{expect=Expect}) -> continue -> [{inform, 100, #{}}, {flow, Length}]; undefined -> [{flow, Length}] end, + %% @todo Handle the case where Period =:= infinity. TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}), {Commands, State#state{ read_body_ref=Ref, diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index df2e1a5..992af52 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -17,6 +17,7 @@ -module(cowboy_websocket). -behaviour(cowboy_sub_protocol). +-export([is_upgrade_request/1]). -export([upgrade/4]). -export([upgrade/5]). -export([takeover/7]). @@ -82,6 +83,25 @@ req = #{} :: map() }). +%% Because the HTTP/1.1 and HTTP/2 handshakes are so different, +%% this function is necessary to figure out whether a request +%% is trying to upgrade to the Websocket protocol. + +-spec is_upgrade_request(cowboy_req:req()) -> boolean(). +is_upgrade_request(#{version := 'HTTP/2', method := <<"CONNECT">>, protocol := Protocol}) -> + <<"websocket">> =:= cowboy_bstr:to_lower(Protocol); +is_upgrade_request(Req=#{version := 'HTTP/1.1', method := <<"GET">>}) -> + ConnTokens = cowboy_req:parse_header(<<"connection">>, Req, []), + case lists:member(<<"upgrade">>, ConnTokens) of + false -> + false; + true -> + UpgradeTokens = cowboy_req:parse_header(<<"upgrade">>, Req), + lists:member(<<"websocket">>, UpgradeTokens) + end; +is_upgrade_request(_) -> + false. + %% Stream process. -spec upgrade(Req, Env, module(), any()) @@ -94,8 +114,7 @@ upgrade(Req, Env, Handler, HandlerState) -> -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). %% @todo Immediately crash if a response has already been sent. -%% @todo Error out if HTTP/2. -upgrade(Req0, Env, Handler, HandlerState, Opts) -> +upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) -> Timeout = maps:get(idle_timeout, Opts, 60000), MaxFrameSize = maps:get(max_frame_size, Opts, infinity), Compress = maps:get(compress, Opts, false), @@ -108,11 +127,15 @@ upgrade(Req0, Env, Handler, HandlerState, Opts) -> try websocket_upgrade(State0, Req0) of {ok, State, Req} -> websocket_handshake(State, Req, HandlerState, Env); - {error, upgrade_required} -> + %% The status code 426 is specific to HTTP/1.1 connections. + {error, upgrade_required} when Version =:= 'HTTP/1.1' -> {ok, cowboy_req:reply(426, #{ <<"connection">> => <<"upgrade">>, <<"upgrade">> => <<"websocket">> - }, Req0), Env} + }, Req0), Env}; + %% Use a generic 400 error for HTTP/2. + {error, upgrade_required} -> + {ok, cowboy_req:reply(400, Req0), Env} catch _:_ -> %% @todo Probably log something here? %% @todo Test that we can have 2 /ws 400 status code in a row on the same connection. @@ -120,27 +143,27 @@ upgrade(Req0, Env, Handler, HandlerState, Opts) -> {ok, cowboy_req:reply(400, Req0), Env} end. -websocket_upgrade(State, Req) -> - ConnTokens = cowboy_req:parse_header(<<"connection">>, Req, []), - case lists:member(<<"upgrade">>, ConnTokens) of +websocket_upgrade(State, Req=#{version := Version}) -> + case is_upgrade_request(Req) of false -> {error, upgrade_required}; + true when Version =:= 'HTTP/1.1' -> + Key = cowboy_req:header(<<"sec-websocket-key">>, Req), + false = Key =:= undefined, + websocket_version(State#state{key=Key}, Req); true -> - UpgradeTokens = cowboy_req:parse_header(<<"upgrade">>, Req, []), - case lists:member(<<"websocket">>, UpgradeTokens) of - false -> - {error, upgrade_required}; - true -> - Version = cowboy_req:header(<<"sec-websocket-version">>, Req), - IntVersion = binary_to_integer(Version), - true = (IntVersion =:= 7) orelse (IntVersion =:= 8) - orelse (IntVersion =:= 13), - Key = cowboy_req:header(<<"sec-websocket-key">>, Req), - false = Key =:= undefined, - websocket_extensions(State#state{key=Key}, Req#{websocket_version => IntVersion}) - end + websocket_version(State, Req) end. +websocket_version(State, Req) -> + WsVersion = cowboy_req:parse_header(<<"sec-websocket-version">>, Req), + case WsVersion of + 7 -> ok; + 8 -> ok; + 13 -> ok + end, + websocket_extensions(State, Req#{websocket_version => WsVersion}). + websocket_extensions(State=#state{compress=Compress}, Req) -> %% @todo We want different options for this. For example %% * compress everything auto @@ -159,11 +182,16 @@ websocket_extensions(State, Req, [], []) -> {ok, State, Req}; websocket_extensions(State, Req, [], [<<", ">>|RespHeader]) -> {ok, State, cowboy_req:set_resp_header(<<"sec-websocket-extensions">>, lists:reverse(RespHeader), Req)}; -websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid}, +%% For HTTP/2 we ARE on the controlling process and do NOT want to update the owner. +websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid, version := Version}, [{<<"permessage-deflate">>, Params}|Tail], RespHeader) -> %% @todo Make deflate options configurable. - Opts = #{level => best_compression, mem_level => 8, strategy => default}, - try cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts#{owner => Pid}) of + Opts0 = #{level => best_compression, mem_level => 8, strategy => default}, + Opts = case Version of + 'HTTP/1.1' -> Opts0#{owner => Pid}; + _ -> Opts0 + end, + try cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts) of {ok, RespExt, Extensions2} -> websocket_extensions(State#state{extensions=Extensions2}, Req, Tail, [<<", ">>, RespExt|RespHeader]); @@ -172,11 +200,15 @@ websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid}, catch exit:{error, incompatible_zlib_version, _} -> websocket_extensions(State, Req, Tail, RespHeader) end; -websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid}, +websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid, version := Version}, [{<<"x-webkit-deflate-frame">>, Params}|Tail], RespHeader) -> %% @todo Make deflate options configurable. - Opts = #{level => best_compression, mem_level => 8, strategy => default}, - try cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, Opts#{owner => Pid}) of + Opts0 = #{level => best_compression, mem_level => 8, strategy => default}, + Opts = case Version of + 'HTTP/1.1' -> Opts0#{owner => Pid}; + _ -> Opts0 + end, + try cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, Opts) of {ok, RespExt, Extensions2} -> websocket_extensions(State#state{extensions=Extensions2}, Req, Tail, [<<", ">>, RespExt|RespHeader]); @@ -192,7 +224,8 @@ websocket_extensions(State, Req, [_|Tail], RespHeader) -> -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). websocket_handshake(State=#state{key=Key}, - Req=#{pid := Pid, streamid := StreamID}, HandlerState, Env) -> + Req=#{version := 'HTTP/1.1', pid := Pid, streamid := StreamID}, + HandlerState, Env) -> Challenge = base64:encode(crypto:hash(sha, << Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)), %% @todo We don't want date and server headers. @@ -202,7 +235,17 @@ websocket_handshake(State=#state{key=Key}, <<"sec-websocket-accept">> => Challenge }, Req), Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}}, - {ok, Req, Env}. + {ok, Req, Env}; +%% For HTTP/2 we do not let the process die, we instead keep it +%% for the Websocket stream. This is because in HTTP/2 we only +%% have a stream, it doesn't take over the whole connection. +websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, + HandlerState, _Env) -> + %% @todo We don't want date and server headers. + Headers = cowboy_req:response_headers(#{}, Req), + Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}}, + takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>, + {State, HandlerState}). %% Connection process. @@ -223,21 +266,34 @@ websocket_handshake(State=#state{key=Key}, -type parse_state() :: #ps_header{} | #ps_payload{}. --spec takeover(pid(), ranch:ref(), inet:socket(), module(), any(), binary(), +-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()}, + module() | undefined, any(), binary(), {#state{}, any()}) -> no_return(). takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, {State0=#state{handler=Handler}, HandlerState}) -> %% @todo We should have an option to disable this behavior. ranch:remove_connection(Ref), + Messages = case Transport of + undefined -> undefined; + _ -> Transport:messages() + end, State = loop_timeout(State0#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - key=undefined, messages=Transport:messages()}), + key=undefined, messages=Messages}), case erlang:function_exported(Handler, websocket_init, 1) of true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer}, websocket_init, undefined, fun before_loop/3); false -> before_loop(State, HandlerState, #ps_header{buffer=Buffer}) end. +%% @todo We probably shouldn't do the setopts if we have not received a socket message. +%% @todo We need to hibernate when HTTP/2 is used too. +before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined}, + HandlerState, ParseState) -> + %% @todo Keep Ref around. + ReadBodyRef = make_ref(), + Pid ! {Stream, {read_body, ReadBodyRef, auto, infinity}}, + loop(State, HandlerState, ParseState); before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true}, HandlerState, ParseState) -> Transport:setopts(Socket, [{active, once}]), @@ -258,19 +314,32 @@ loop_timeout(State=#state{timeout=Timeout, timeout_ref=PrevRef}) -> State#state{timeout_ref=TRef}. -spec loop(#state{}, any(), parse_state()) -> no_return(). -loop(State=#state{parent=Parent, socket=Socket, messages={OK, Closed, Error}, +loop(State=#state{parent=Parent, socket=Socket, messages=Messages, timeout_ref=TRef}, HandlerState, ParseState) -> receive - {OK, Socket, Data} -> + %% Socket messages. (HTTP/1.1) + {OK, Socket, Data} when OK =:= element(1, Messages) -> State2 = loop_timeout(State), parse(State2, HandlerState, ParseState, Data); - {Closed, Socket} -> + {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, HandlerState, {error, closed}); - {Error, Socket, Reason} -> + {Error, Socket, Reason} when Error =:= element(3, Messages) -> terminate(State, HandlerState, {error, Reason}); + %% Body reading messages. (HTTP/2) + {request_body, _Ref, nofin, Data} -> + State2 = loop_timeout(State), + parse(State2, HandlerState, ParseState, Data); + %% @todo We need to handle this case as if it was an {error, closed} + %% but not before we finish processing frames. We probably should have + %% a check in before_loop to let us stop looping if a flag is set. + {request_body, _Ref, fin, _, Data} -> + State2 = loop_timeout(State), + parse(State2, HandlerState, ParseState, Data); + %% Timeouts. {timeout, TRef, ?MODULE} -> websocket_close(State, HandlerState, timeout); {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> + %% @todo This should call before_loop. loop(State, HandlerState, ParseState); %% System messages. {'EXIT', Parent, Reason} -> @@ -282,6 +351,7 @@ loop(State=#state{parent=Parent, socket=Socket, messages={OK, Closed, Error}, %% Calls from supervisor module. {'$gen_call', From, Call} -> cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE), + %% @todo This should call before_loop. loop(State, HandlerState, ParseState); Message -> handler_call(State, HandlerState, ParseState, @@ -341,8 +411,7 @@ parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensio websocket_close(State, HandlerState, Error) end. -dispatch_frame(State=#state{socket=Socket, transport=Transport, - max_frame_size=MaxFrameSize, frag_state=FragState, +dispatch_frame(State=#state{max_frame_size=MaxFrameSize, frag_state=FragState, frag_buffer=SoFar, extensions=Extensions}, HandlerState, #ps_payload{type=Type0, unmasked=Payload0, close_code=CloseCode0}, RemainingData) -> @@ -363,12 +432,12 @@ dispatch_frame(State=#state{socket=Socket, transport=Transport, {close, CloseCode, Payload} -> websocket_close(State, HandlerState, {remote, CloseCode, Payload}); Frame = ping -> - Transport:send(Socket, cow_ws:frame(pong, Extensions)), + transport_send(State, nofin, cow_ws:frame(pong, Extensions)), handler_call(State, HandlerState, #ps_header{buffer=RemainingData}, websocket_handle, Frame, fun parse_header/3); Frame = {ping, Payload} -> - Transport:send(Socket, cow_ws:frame({pong, Payload}, Extensions)), + transport_send(State, nofin, cow_ws:frame({pong, Payload}, Extensions)), handler_call(State, HandlerState, #ps_header{buffer=RemainingData}, websocket_handle, Frame, fun parse_header/3); @@ -415,24 +484,32 @@ handler_call(State=#state{handler=Handler}, HandlerState, erlang:raise(Class, Reason, erlang:get_stacktrace()) end. +transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) -> + Pid ! {Stream, {data, IsFin, Data}}, + ok; +transport_send(#state{socket=Socket, transport=Transport}, _, Data) -> + Transport:send(Socket, Data). + -spec websocket_send(cow_ws:frame(), #state{}) -> ok | stop | {error, atom()}. websocket_send(Frames, State) when is_list(Frames) -> websocket_send_many(Frames, State, []); -websocket_send(Frame, #state{socket=Socket, transport=Transport, extensions=Extensions}) -> - Res = Transport:send(Socket, cow_ws:frame(Frame, Extensions)), +websocket_send(Frame, State=#state{extensions=Extensions}) -> + Data = cow_ws:frame(Frame, Extensions), case is_close_frame(Frame) of - true -> stop; - false -> Res + true -> + _ = transport_send(State, fin, Data), + stop; + false -> + transport_send(State, nofin, Data) end. -websocket_send_many([], #state{socket=Socket, transport=Transport}, Acc) -> - Transport:send(Socket, lists:reverse(Acc)); -websocket_send_many([Frame|Tail], State=#state{socket=Socket, transport=Transport, - extensions=Extensions}, Acc0) -> +websocket_send_many([], State, Acc) -> + transport_send(State, nofin, lists:reverse(Acc)); +websocket_send_many([Frame|Tail], State=#state{extensions=Extensions}, Acc0) -> Acc = [cow_ws:frame(Frame, Extensions)|Acc0], case is_close_frame(Frame) of true -> - _ = Transport:send(Socket, lists:reverse(Acc)), + _ = transport_send(State, fin, lists:reverse(Acc)), stop; false -> websocket_send_many(Tail, State, Acc) @@ -448,23 +525,22 @@ websocket_close(State, HandlerState, Reason) -> websocket_send_close(State, Reason), terminate(State, HandlerState, Reason). -websocket_send_close(#state{socket=Socket, transport=Transport, - extensions=Extensions}, Reason) -> +websocket_send_close(State=#state{extensions=Extensions}, Reason) -> _ = case Reason of Normal when Normal =:= stop; Normal =:= timeout -> - Transport:send(Socket, cow_ws:frame({close, 1000, <<>>}, Extensions)); + transport_send(State, fin, cow_ws:frame({close, 1000, <<>>}, Extensions)); {error, badframe} -> - Transport:send(Socket, cow_ws:frame({close, 1002, <<>>}, Extensions)); + transport_send(State, fin, cow_ws:frame({close, 1002, <<>>}, Extensions)); {error, badencoding} -> - Transport:send(Socket, cow_ws:frame({close, 1007, <<>>}, Extensions)); + transport_send(State, fin, cow_ws:frame({close, 1007, <<>>}, Extensions)); {error, badsize} -> - Transport:send(Socket, cow_ws:frame({close, 1009, <<>>}, Extensions)); + transport_send(State, fin, cow_ws:frame({close, 1009, <<>>}, Extensions)); {crash, _, _} -> - Transport:send(Socket, cow_ws:frame({close, 1011, <<>>}, Extensions)); + transport_send(State, fin, cow_ws:frame({close, 1011, <<>>}, Extensions)); remote -> - Transport:send(Socket, cow_ws:frame(close, Extensions)); + transport_send(State, fin, cow_ws:frame(close, Extensions)); {remote, Code, _} -> - Transport:send(Socket, cow_ws:frame({close, Code, <<>>}, Extensions)) + transport_send(State, fin, cow_ws:frame({close, Code, <<>>}, Extensions)) end, ok. diff --git a/test/draft_h2_websockets_SUITE.erl b/test/draft_h2_websockets_SUITE.erl new file mode 100644 index 0000000..31429df --- /dev/null +++ b/test/draft_h2_websockets_SUITE.erl @@ -0,0 +1,405 @@ +%% Copyright (c) 2018, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(draft_h2_websockets_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-import(ct_helper, [config/2]). +-import(ct_helper, [doc/1]). + +all() -> [{group, enabled}]. + +groups() -> + Tests = ct_helper:all(?MODULE), + [{enabled, [parallel], Tests}]. + +init_per_group(Name = enabled, Config) -> + cowboy_test:init_http(Name, #{ + enable_connect_protocol => true, + env => #{dispatch => cowboy_router:compile(init_routes(Config))} + }, Config). + +end_per_group(Name, _) -> + ok = cowboy:stop_listener(Name). + +init_routes(_) -> [ + {"localhost", [ + {"/ws", ws_echo, []} + ]} +]. + +%% Do a prior knowledge handshake. +do_handshake(Config) -> + {ok, Socket} = gen_tcp:connect("localhost", config(port, Config), [binary, {active, false}]), + %% Send a valid preface. + ok = gen_tcp:send(Socket, ["PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", cow_http2:settings(#{})]), + %% Receive the server preface. + {ok, << Len:24 >>} = gen_tcp:recv(Socket, 3, 1000), + {ok, << 4:8, 0:40, SettingsPayload:Len/binary >>} = gen_tcp:recv(Socket, 6 + Len, 1000), + Settings = cow_http2:parse_settings_payload(SettingsPayload), + %% Send the SETTINGS ack. + ok = gen_tcp:send(Socket, cow_http2:settings_ack()), + %% Receive the SETTINGS ack. + {ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000), + {ok, Socket, Settings}. + +% The SETTINGS_ENABLE_CONNECT_PROTOCOL SETTINGS Parameter. + +% The new parameter name is SETTINGS_ENABLE_CONNECT_PROTOCOL. The +% value of the parameter MUST be 0 or 1. + +% Upon receipt of SETTINGS_ENABLE_CONNECT_PROTOCOL with a value of 1 a +% client MAY use the Extended CONNECT definition of this document when +% creating new streams. Receipt of this parameter by a server does not +% have any impact. +%% @todo ignore_client_enable_setting(Config) -> + +% A sender MUST NOT send a SETTINGS_ENABLE_CONNECT_PROTOCOL parameter +% with the value of 0 after previously sending a value of 1. + +reject_handshake_when_disabled(Config0) -> + doc("Extended CONNECT requests MUST be rejected with a " + "PROTOCOL_ERROR stream error when enable_connect_protocol=false. (draft-01 3)"), + Config = cowboy_test:init_http(disabled, #{ + enable_connect_protocol => false, + env => #{dispatch => cowboy_router:compile(init_routes(Config0))} + }, Config0), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 0. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := false} = Settings, + %% Send a CONNECT :protocol request to upgrade the stream to Websocket. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a PROTOCOL_ERROR stream error. + {ok, << _:24, 3:8, _:8, 1:32, 1:32 >>} = gen_tcp:recv(Socket, 13, 6000), + ok. + +reject_handshake_disabled_by_default(Config0) -> + doc("Extended CONNECT requests MUST be rejected with a " + "PROTOCOL_ERROR stream error with default enable_connect_protocol. (draft-01 3)"), + Config = cowboy_test:init_http(disabled_by_default, #{ + env => #{dispatch => cowboy_router:compile(init_routes(Config0))} + }, Config0), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 0. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := false} = Settings, + %% Send a CONNECT :protocol request to upgrade the stream to Websocket. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a PROTOCOL_ERROR stream error. + {ok, << _:24, 3:8, _:8, 1:32, 1:32 >>} = gen_tcp:recv(Socket, 13, 6000), + ok. + +% The Extended CONNECT Method. + +accept_uppercase_pseudo_header_protocol(Config) -> + doc("The :protocol pseudo header is case insensitive. (draft-01 4)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send a CONNECT :protocol request to upgrade the stream to Websocket. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"WEBSOCKET">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a 200 response. + {ok, << Len1:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000), + {ok, RespHeadersBlock} = gen_tcp:recv(Socket, Len1, 1000), + {RespHeaders, _} = cow_hpack:decode(RespHeadersBlock), + {_, <<"200">>} = lists:keyfind(<<":status">>, 1, RespHeaders), + ok. + +reject_many_pseudo_header_protocol(Config) -> + doc("An extended CONNECT request containing more than one protocol component " + "must be rejected with a PROTOCOL_ERROR stream error. (draft-01 4, RFC7540 8.1.2.6)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send an extended CONNECT request with more than one :protocol pseudo-header. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":protocol">>, <<"mqtt">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a PROTOCOL_ERROR stream error. + {ok, << _:24, 3:8, _:8, 1:32, 1:32 >>} = gen_tcp:recv(Socket, 13, 6000), + ok. + +reject_unknown_pseudo_header_protocol(Config) -> + doc("An extended CONNECT request with an unknown protocol must be rejected " + "with a 400 error. (draft-01 4)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send an extended CONNECT request with an unknown :protocol pseudo-header. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"mqtt">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a 400 response. + {ok, << Len1:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000), + {ok, RespHeadersBlock} = gen_tcp:recv(Socket, Len1, 1000), + {RespHeaders, _} = cow_hpack:decode(RespHeadersBlock), + {_, <<"400">>} = lists:keyfind(<<":status">>, 1, RespHeaders), + ok. + +reject_invalid_pseudo_header_protocol(Config) -> + doc("An extended CONNECT request with an invalid protocol must be rejected " + "with a 400 error. (draft-01 4)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send an extended CONNECT request with an invalid :protocol pseudo-header. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket mqtt">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a 400 response. + {ok, << Len1:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000), + {ok, RespHeadersBlock} = gen_tcp:recv(Socket, Len1, 1000), + {RespHeaders, _} = cow_hpack:decode(RespHeadersBlock), + {_, <<"400">>} = lists:keyfind(<<":status">>, 1, RespHeaders), + ok. + +reject_missing_pseudo_header_scheme(Config) -> + doc("An extended CONNECT request without a scheme component must be rejected " + "with a PROTOCOL_ERROR stream error. (draft-01 4, RFC7540 8.1.2.6)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send an extended CONNECT request without a :scheme pseudo-header. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a PROTOCOL_ERROR stream error. + {ok, << _:24, 3:8, _:8, 1:32, 1:32 >>} = gen_tcp:recv(Socket, 13, 6000), + ok. + +reject_missing_pseudo_header_path(Config) -> + doc("An extended CONNECT request without a path component must be rejected " + "with a PROTOCOL_ERROR stream error. (draft-01 4, RFC7540 8.1.2.6)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send an extended CONNECT request without a :path pseudo-header. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":scheme">>, <<"http">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a PROTOCOL_ERROR stream error. + {ok, << _:24, 3:8, _:8, 1:32, 1:32 >>} = gen_tcp:recv(Socket, 13, 6000), + ok. + +% On requests bearing the :protocol pseudo-header, the :authority +% pseudo-header field is interpreted according to Section 8.1.2.3 of +% [RFC7540] instead of Section 8.3 of [RFC7540]. In particular the +% server MUST not make a new TCP connection to the host and port +% indicated by the :authority. + +reject_missing_pseudo_header_authority(Config) -> + doc("An extended CONNECT request without an authority component must be rejected " + "with a PROTOCOL_ERROR stream error. (draft-01 4, draft-01 5)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send an extended CONNECT request without an :authority pseudo-header. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a PROTOCOL_ERROR stream error. + {ok, << _:24, 3:8, _:8, 1:32, 1:32 >>} = gen_tcp:recv(Socket, 13, 6000), + ok. + +% Using Extended CONNECT To Bootstrap The WebSocket Protocol. + +reject_missing_pseudo_header_protocol(Config) -> + doc("An extended CONNECT request without a protocol component must be rejected " + "with a PROTOCOL_ERROR stream error. (draft-01 4, RFC7540 8.1.2.6)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send an extended CONNECT request without a :scheme pseudo-header. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a PROTOCOL_ERROR stream error. + {ok, << _:24, 3:8, _:8, 1:32, 1:32 >>} = gen_tcp:recv(Socket, 13, 6000), + ok. + +% The scheme of the Target URI [RFC7230] MUST be https for wss schemed +% WebSockets and http for ws schemed WebSockets. The websocket URI is +% still used for proxy autoconfiguration. + +reject_connection_header(Config) -> + doc("An extended CONNECT request with a connection header must be rejected " + "with a PROTOCOL_ERROR stream error. (draft-01 5, RFC7540 8.1.2.6)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send an extended CONNECT request without a :scheme pseudo-header. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"connection">>, <<"upgrade">>}, + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a PROTOCOL_ERROR stream error. + {ok, << _:24, 3:8, _:8, 1:32, 1:32 >>} = gen_tcp:recv(Socket, 13, 6000), + ok. + +reject_upgrade_header(Config) -> + doc("An extended CONNECT request with a upgrade header must be rejected " + "with a PROTOCOL_ERROR stream error. (draft-01 5, RFC7540 8.1.2.6)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send an extended CONNECT request without a :scheme pseudo-header. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"upgrade">>, <<"websocket">>}, + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a PROTOCOL_ERROR stream error. + {ok, << _:24, 3:8, _:8, 1:32, 1:32 >>} = gen_tcp:recv(Socket, 13, 6000), + ok. + +% After successfully processing the opening handshake the peers should +% proceed with The WebSocket Protocol [RFC6455] using the HTTP/2 stream +% from the CONNECT transaction as if it were the TCP connection +% referred to in [RFC6455]. The state of the WebSocket connection at +% this point is OPEN as defined by [RFC6455], Section 4.1. +%% @todo I'm guessing we should test for things like RST_STREAM, +%% closing the connection and others? + +% Examples. + +%% @todo Probably worth testing that we get the correct option +%% over all different connection types (alpn, prior, upgrade). +accept_handshake_when_enabled(Config) -> + doc("Confirm the example for Websocket over HTTP/2 works. (draft-01 5.1)"), + %% Connect to server and confirm that SETTINGS_ENABLE_CONNECT_PROTOCOL = 1. + {ok, Socket, Settings} = do_handshake(Config), + #{enable_connect_protocol := true} = Settings, + %% Send a CONNECT :protocol request to upgrade the stream to Websocket. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a 200 response. + {ok, << Len1:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000), + {ok, RespHeadersBlock} = gen_tcp:recv(Socket, Len1, 1000), + {RespHeaders, _} = cow_hpack:decode(RespHeadersBlock), + {_, <<"200">>} = lists:keyfind(<<":status">>, 1, RespHeaders), + %% Masked text hello echoed back clear by the server. + %% + %% We receive WINDOW_UPDATE frames before the actual data + %% due to flow control updates every time a data frame is received. + Mask = 16#37fa213d, + MaskedHello = ws_SUITE:do_mask(<<"Hello">>, Mask, <<>>), + ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, + <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedHello/binary>>)), + {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000), + {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000), + {ok, <>} = gen_tcp:recv(Socket, 9, 1000), + {ok, <<1:1, 0:3, 1:4, 0:1, 5:7, "Hello">>} = gen_tcp:recv(Socket, Len2, 1000), + ok. + +%% Closing a Websocket stream. + +%% @todo client close frame with END_STREAM +%% @todo server close frame with END_STREAM +%% @todo client other frame with END_STREAM +%% @todo server other frame with END_STREAM +%% @todo client close connection -- cgit v1.2.3