diff options
| -rw-r--r-- | src/gun.erl | 14 | ||||
| -rw-r--r-- | src/gun_http.erl | 53 | ||||
| -rw-r--r-- | src/gun_http2.erl | 415 | ||||
| -rw-r--r-- | src/gun_tunnel.erl | 10 | ||||
| -rw-r--r-- | src/gun_ws.erl | 49 | ||||
| -rw-r--r-- | test/event_SUITE.erl | 183 | ||||
| -rw-r--r-- | test/ws_SUITE.erl | 139 | 
7 files changed, 590 insertions, 273 deletions
| diff --git a/src/gun.erl b/src/gun.erl index 69dbb6b..e441e52 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -229,6 +229,7 @@  	cookie_ignore_informational => boolean(),  	flow => pos_integer(),  	keepalive => timeout(), +	notify_settings_changed => boolean(),  	%% Options copied from cow_http2_machine.  	connection_window_margin_size => 0..16#7fffffff, @@ -708,6 +709,8 @@ connect(ServerPid, Destination, Headers, ReqOpts) ->  	| {push, stream_ref(), binary(), binary(), resp_headers()}  	| {upgrade, [binary()], resp_headers()}  	| {ws, ws_frame()} +	| {up, http | http2 | raw | socks} +	| {notify, settings_changed, map()}  	| {error, {stream_error | connection_error | down, any()} | timeout}.  -spec await(pid(), stream_ref()) -> await_result(). @@ -747,6 +750,8 @@ await(ServerPid, StreamRef, Timeout, MRef) ->  			{ws, Frame};  		{gun_tunnel_up, ServerPid, StreamRef, Protocol} ->  			{up, Protocol}; +		{gun_notify, ServerPid, Type, Info} -> +			{notify, Type, Info};  		{gun_error, ServerPid, StreamRef, Reason} ->  			{error, {stream_error, Reason}};  		{gun_error, ServerPid, Reason} -> @@ -1223,7 +1228,8 @@ connected_ws_only(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{  		protocol=Protocol=gun_ws, protocol_state=ProtoState,  		event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->  	{Commands, EvHandlerState} = Protocol:ws_send(Frames, -		ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), +		ProtoState, dereference_stream_ref(StreamRef, State), +		ReplyTo, EvHandler, EvHandlerState0),  	commands(Commands, State#state{event_handler_state=EvHandlerState});  connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{  		protocol=Protocol=gun_ws, protocol_state=ProtoState, @@ -1312,10 +1318,10 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts},  %% @todo Maybe better standardize the protocol callbacks argument orders.  connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{  		protocol=Protocol, protocol_state=ProtoState, -		event_handler=EvHandler, event_handler_state=EvHandlerState0}) -		when is_list(StreamRef) -> +		event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->  	{Commands, EvHandlerState} = Protocol:ws_send(Frames, -		ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), +		ProtoState, dereference_stream_ref(StreamRef, State), +		ReplyTo, EvHandler, EvHandlerState0),  	commands(Commands, State#state{event_handler_state=EvHandlerState});  %% Catch-all for the StreamRef-free variant.  connected(cast, {ws_send, ReplyTo, _}, _) -> diff --git a/src/gun_http.erl b/src/gun_http.erl index 8b716a5..0eeca05 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -955,56 +955,23 @@ ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) ->  		{_, Accept} ->  			case cow_ws:encode_key(Key) of  				Accept -> -					ws_handshake_extensions(Buffer, State, Ws, Headers); +					ws_handshake_extensions_and_protocol(Buffer, State, Ws, Headers);  				_ ->  					close  			end  	end. -ws_handshake_extensions(Buffer, State, Ws=#websocket{extensions=Extensions0, opts=Opts}, Headers) -> -	case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of -		false -> -			ws_handshake_protocols(Buffer, State, Ws, Headers, #{}); -		{_, ExtHd} -> -			ParsedExtHd = cow_http_hd:parse_sec_websocket_extensions(ExtHd), -			case ws_validate_extensions(ParsedExtHd, Extensions0, #{}, Opts) of +ws_handshake_extensions_and_protocol(Buffer, State, +		Ws=#websocket{extensions=Extensions0, opts=WsOpts}, Headers) -> +	case gun_ws:select_extensions(Headers, Extensions0, WsOpts) of +		close -> +			close; +		Extensions -> +			case gun_ws:select_protocol(Headers, WsOpts) of  				close ->  					close; -				Extensions -> -					ws_handshake_protocols(Buffer, State, Ws, Headers, Extensions) -			end -	end. - -ws_validate_extensions([], _, Acc, _) -> -	Acc; -ws_validate_extensions([{Name = <<"permessage-deflate">>, Params}|Tail], GunExts, Acc, Opts) -> -	case lists:member(Name, GunExts) of -		true -> -			case cow_ws:validate_permessage_deflate(Params, Acc, Opts) of -				{ok, Acc2} -> ws_validate_extensions(Tail, GunExts, Acc2, Opts); -				error -> close -			end; -		%% Fail the connection if extension was not requested. -		false -> -			close -	end; -%% Fail the connection on unknown extension. -ws_validate_extensions(_, _, _, _) -> -	close. - -%% @todo Validate protocols. -ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensions) -> -	case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of -		false -> -			ws_handshake_end(Buffer, State, Ws, Headers, Extensions, -				maps:get(default_protocol, Opts, gun_ws_h)); -		{_, Proto} -> -			ProtoOpt = maps:get(protocols, Opts, []), -			case lists:keyfind(Proto, 1, ProtoOpt) of -				{_, Handler} -> -					ws_handshake_end(Buffer, State, Ws, Headers, Extensions, Handler); -				false -> -					close +				Handler -> +					ws_handshake_end(Buffer, State, Ws, Headers, Extensions, Handler)  			end  	end. diff --git a/src/gun_http2.erl b/src/gun_http2.erl index cb10029..037c193 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -38,17 +38,19 @@  -export([ws_upgrade/11]).  -export([ws_send/6]). +-record(websocket_info, { +	extensions :: [binary()], +	opts :: gun:ws_opts() +}). +  -record(tunnel, { -	%% The tunnel can either go requested->established -	%% or requested->tls_handshake->established, or get -	%% canceled.  	state = requested :: requested | established,  	%% Destination information. -	destination = undefined :: gun:connect_destination(), +	destination = undefined :: undefined | gun:connect_destination(),  	%% Tunnel information. -	info = undefined :: gun:tunnel_info(), +	info = undefined :: gun:tunnel_info() | #websocket_info{},  	%% Protocol module and state of the outer layer. Only initialized  	%% after the TLS handshake has completed when TLS is involved. @@ -81,6 +83,7 @@  }).  -record(http2_state, { +	reply_to :: pid(),  	socket :: inet:socket() | ssl:sslsocket(),  	transport :: module(),  	opts = #{} :: gun:http2_opts(), @@ -136,6 +139,8 @@ do_check_options([{keepalive, infinity}|Opts]) ->  	do_check_options(Opts);  do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->  	do_check_options(Opts); +do_check_options([{notify_settings_changed, B}|Opts]) when is_boolean(B) -> +	do_check_options(Opts);  do_check_options([Opt={Name, _}|Opts]) ->  	%% We blindly accept all cow_http2_machine options.  	HTTP2MachineOpts = [ @@ -166,7 +171,7 @@ opts_name() -> http2_opts.  has_keepalive() -> true.  default_keepalive() -> infinity. -init(_ReplyTo, Socket, Transport, Opts0) -> +init(ReplyTo, Socket, Transport, Opts0) ->  	%% We have different defaults than the protocol in order  	%% to optimize for performance when receiving responses.  	Opts = Opts0#{ @@ -178,8 +183,8 @@ init(_ReplyTo, Socket, Transport, Opts0) ->  	TunnelTransport = maps:get(tunnel_transport, Opts, undefined),  	{ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts#{message_tag => BaseStreamRef}),  	%% @todo Better validate the preface being received. -	State = #http2_state{socket=Socket, transport=Transport, opts=Opts, -		base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport, +	State = #http2_state{reply_to=ReplyTo, socket=Socket, transport=Transport, +		opts=Opts, base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport,  		content_handlers=Handlers, http2_machine=HTTP2Machine},  	Transport:send(Socket, Preface),  	{connected, State}. @@ -283,7 +288,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan  			{update_window(State#http2_state{http2_machine=HTTP2Machine}),  				CookieStore, EvHandlerState};  		{ok, HTTP2Machine} -> -			{maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), +			{maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame),  				CookieStore, EvHandlerState};  		{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->  			data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data, @@ -313,7 +318,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan  				CookieStore, EvHandlerState};  		{send, SendData, HTTP2Machine} ->  			{StateRet, EvHandlerStateRet} = send_data( -				maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), +				maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame),  				SendData, EvHandler, EvHandlerState),  			{StateRet, CookieStore, EvHandlerStateRet};  		{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> @@ -325,11 +330,23 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan  				CookieStore, EvHandlerState}  	end. -maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> +maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket, +		transport=Transport, opts=Opts, http2_machine=HTTP2Machine}, Frame) ->  	case Frame of -		{settings, _} -> Transport:send(Socket, cow_http2:settings_ack()); -		{ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque)); -		_ -> ok +		{settings, _} -> +			%% We notify remote settings changes only if the user requested it. +			_ = case Opts of +				#{notify_settings_changed := true} -> +					ReplyTo ! {gun_notify, self(), settings_changed, +						cow_http2_machine:get_remote_settings(HTTP2Machine)}; +				_ -> +					ok +			end, +			Transport:send(Socket, cow_http2:settings_ack()); +		{ping, Opaque} -> +			Transport:send(Socket, cow_http2:ping_ack(Opaque)); +		_ -> +			ok  	end,  	State. @@ -363,7 +380,13 @@ tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel},  		State, EvHandler, EvHandlerState);  tunnel_commands([{error, _Reason}|_], #stream{id=StreamID},  		State, _EvHandler, EvHandlerState) -> -	{delete_stream(State, StreamID), EvHandlerState}. +	{delete_stream(State, StreamID), EvHandlerState}; +%% @todo Set a timeout for closing the Websocket stream. +tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) -> +	tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState); +%% @todo Maybe we should stop increasing the window when not in active mode. (HTTP/2 Websocket only.) +tunnel_commands([{active, _}|Tail], Stream, State, EvHandler, EvHandlerState) -> +	tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState).  continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) ->  	case ContinueStreamRef of @@ -409,133 +432,212 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,  	end,  	{maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}. -%% @todo Make separate functions for inform/connect/normal. -headers_frame(State=#http2_state{transport=Transport, opts=Opts, -		tunnel_transport=TunnelTransport, content_handlers=Handlers0}, +headers_frame(State0=#http2_state{opts=Opts},  		StreamID, IsFin, Headers, #{status := Status}, _BodyLen,  		CookieStore0, EvHandler, EvHandlerState0) -> -	Stream = get_stream_by_id(State, StreamID), +	Stream = get_stream_by_id(State0, StreamID),  	#stream{ -		ref=StreamRef, -		reply_to=ReplyTo,  		authority=Authority,  		path=Path,  		tunnel=Tunnel  	} = Stream, -	CookieStore = gun_cookies:set_cookie_header(scheme(State), +	CookieStore = gun_cookies:set_cookie_header(scheme(State0),  		Authority, Path, Status, Headers, CookieStore0, Opts), -	RealStreamRef = stream_ref(State, StreamRef), -	if +	{State, EvHandlerState} = if  		Status >= 100, Status =< 199 -> -			ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, -			EvHandlerState = EvHandler:response_inform(#{ +			headers_frame_inform(State0, Stream, Status, Headers, EvHandler, EvHandlerState0); +		Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested, IsFin =:= nofin -> +			headers_frame_connect(State0, Stream, Status, Headers, EvHandler, EvHandlerState0); +		true -> +			headers_frame_response(State0, Stream, IsFin, Status, Headers, EvHandler, EvHandlerState0) +	end, +	{State, CookieStore, EvHandlerState}. + +headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo}, +		Status, Headers, EvHandler, EvHandlerState0) -> +	RealStreamRef = stream_ref(State, StreamRef), +	ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, +	EvHandlerState = EvHandler:response_inform(#{ +		stream_ref => RealStreamRef, +		reply_to => ReplyTo, +		status => Status, +		headers => Headers +	}, EvHandlerState0), +	{State, EvHandlerState}. + +headers_frame_connect(State0=#http2_state{http2_machine=HTTP2Machine0}, +		Stream=#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, tunnel=#tunnel{ +			info=#websocket_info{extensions=Extensions0, opts=WsOpts}}}, +		Status, Headers, EvHandler, EvHandlerState0) -> +	RealStreamRef = stream_ref(State0, StreamRef), +	EvHandlerState1 = EvHandler:response_headers(#{ +		stream_ref => RealStreamRef, +		reply_to => ReplyTo, +		status => Status, +		headers => Headers +	}, EvHandlerState0), +	%% Websocket CONNECT response headers terminate the response but not the stream. +	EvHandlerState = EvHandler:response_end(#{ +		stream_ref => RealStreamRef, +		reply_to => ReplyTo +	}, EvHandlerState1), +	case gun_ws:select_extensions(Headers, Extensions0, WsOpts) of +		close -> +			{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), +			State1 = State0#http2_state{http2_machine=HTTP2Machine}, +			State = reset_stream(State1, StreamID, {stream_error, cancel, +				'The sec-websocket-extensions header is invalid. (RFC6455 9.1, RFC7692 7)'}), +			{State, EvHandlerState}; +		Extensions -> +			case gun_ws:select_protocol(Headers, WsOpts) of +				close -> +					{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), +					State1 = State0#http2_state{http2_machine=HTTP2Machine}, +					State = reset_stream(State1, StreamID, {stream_error, cancel, +						'The sec-websocket-protocol header is invalid. (RFC6455 4.1)'}), +					{State, EvHandlerState}; +				Handler -> +					headers_frame_connect_websocket(State0, Stream, Headers, +						EvHandler, EvHandlerState, Extensions, Handler) +			end +	end; +headers_frame_connect(State=#http2_state{transport=Transport, opts=Opts, tunnel_transport=TunnelTransport}, +		Stream=#stream{ref=StreamRef, reply_to=ReplyTo, tunnel=Tunnel=#tunnel{ +		destination=Destination=#{host := DestHost, port := DestPort}, info=TunnelInfo0}}, +		Status, Headers, EvHandler, EvHandlerState0) -> +	RealStreamRef = stream_ref(State, StreamRef), +	TunnelInfo = TunnelInfo0#{ +		origin_host => DestHost, +		origin_port => DestPort +	}, +	ReplyTo ! {gun_response, self(), RealStreamRef, nofin, Status, Headers}, +	EvHandlerState1 = EvHandler:response_headers(#{ +		stream_ref => RealStreamRef, +		reply_to => ReplyTo, +		status => Status, +		headers => Headers +	}, EvHandlerState0), +	EvHandlerState2 = EvHandler:origin_changed(#{ +		stream_ref => RealStreamRef, +		type => connect, +		origin_scheme => case Destination of +			#{transport := tls} -> <<"https">>; +			_ -> <<"http">> +		end, +		origin_host => DestHost, +		origin_port => DestPort +	}, EvHandlerState1), +	ContinueStreamRef = continue_stream_ref(State, StreamRef), +	OriginSocket = #{ +		gun_pid => self(), +		reply_to => ReplyTo, +		stream_ref => RealStreamRef, +		handle_continue_stream_ref => ContinueStreamRef +	}, +	Proto = gun_tunnel, +	ProtoOpts = case Destination of +		#{transport := tls} -> +			Protocols = maps:get(protocols, Destination, [http2, http]), +			TLSOpts = gun:ensure_alpn_sni(Protocols, maps:get(tls_opts, Destination, []), DestHost), +			HandshakeEvent = #{  				stream_ref => RealStreamRef,  				reply_to => ReplyTo, -				status => Status, -				headers => Headers -			}, EvHandlerState0), -			{State, CookieStore, EvHandlerState}; -		Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested -> -			#tunnel{destination=Destination, info=TunnelInfo0} = Tunnel, -			#{host := DestHost, port := DestPort} = Destination, -			TunnelInfo = TunnelInfo0#{ -				origin_host => DestHost, -				origin_port => DestPort +				tls_opts => TLSOpts, +				timeout => maps:get(tls_handshake_timeout, Destination, infinity)  			}, -			ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, -			EvHandlerState1 = EvHandler:response_headers(#{ +			Opts#{  				stream_ref => RealStreamRef, -				reply_to => ReplyTo, -				status => Status, -				headers => Headers -			}, EvHandlerState0), -			EvHandlerState2 = EvHandler:origin_changed(#{ -				stream_ref => RealStreamRef, -				type => connect, -				origin_scheme => case Destination of -					#{transport := tls} -> <<"https">>; -					_ -> <<"http">> -				end, -				origin_host => DestHost, -				origin_port => DestPort -			}, EvHandlerState1), -			ContinueStreamRef = continue_stream_ref(State, StreamRef), -			OriginSocket = #{ -				gun_pid => self(), -				reply_to => ReplyTo, +				tunnel => #{ +					type => connect, +					transport_name => case TunnelTransport of +						undefined -> Transport:name(); +						_ -> TunnelTransport +					end, +					protocol_name => http2, +					info => TunnelInfo, +					handshake_event => HandshakeEvent, +					protocols => Protocols +				} +			}; +		_ -> +			[NewProtocol] = maps:get(protocols, Destination, [http]), +			Opts#{  				stream_ref => RealStreamRef, -				handle_continue_stream_ref => ContinueStreamRef -			}, -			Proto = gun_tunnel, -			ProtoOpts = case Destination of -				#{transport := tls} -> -					Protocols = maps:get(protocols, Destination, [http2, http]), -					TLSOpts = gun:ensure_alpn_sni(Protocols, maps:get(tls_opts, Destination, []), DestHost), -					HandshakeEvent = #{ -						stream_ref => RealStreamRef, -						reply_to => ReplyTo, -						tls_opts => TLSOpts, -						timeout => maps:get(tls_handshake_timeout, Destination, infinity) -					}, -					Opts#{ -						stream_ref => RealStreamRef, -						tunnel => #{ -							type => connect, -							transport_name => case TunnelTransport of -								undefined -> Transport:name(); -								_ -> TunnelTransport -							end, -							protocol_name => http2, -							info => TunnelInfo, -							handshake_event => HandshakeEvent, -							protocols => Protocols -						} -					}; -				_ -> -					[NewProtocol] = maps:get(protocols, Destination, [http]), -					Opts#{ -						stream_ref => RealStreamRef, -						tunnel => #{ -							type => connect, -							transport_name => case TunnelTransport of -								undefined -> Transport:name(); -								_ -> TunnelTransport -							end, -							protocol_name => http2, -							info => TunnelInfo, -							new_protocol => NewProtocol -						} -					} -			end, -			{tunnel, ProtoState, EvHandlerState} = Proto:init( -				ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2), -			{store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{ -				info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}), -				CookieStore, EvHandlerState}; -		true -> -			ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, -			EvHandlerState1 = EvHandler:response_headers(#{ +				tunnel => #{ +					type => connect, +					transport_name => case TunnelTransport of +						undefined -> Transport:name(); +						_ -> TunnelTransport +					end, +					protocol_name => http2, +					info => TunnelInfo, +					new_protocol => NewProtocol +				} +			} +	end, +	{tunnel, ProtoState, EvHandlerState} = Proto:init( +		ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2), +	{store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, +		info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}), +		EvHandlerState}. + +headers_frame_connect_websocket(State, Stream=#stream{ref=StreamRef, reply_to=ReplyTo, +		tunnel=Tunnel=#tunnel{info=#websocket_info{opts=WsOpts}}}, +		Headers, EvHandler, EvHandlerState0, Extensions, Handler) -> +	RealStreamRef = stream_ref(State, StreamRef), +	ContinueStreamRef = continue_stream_ref(State, StreamRef), +	OriginSocket = #{ +		gun_pid => self(), +		reply_to => ReplyTo, +		stream_ref => RealStreamRef, +		handle_continue_stream_ref => ContinueStreamRef +	}, +	ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}, +	Proto = gun_ws, +	EvHandlerState = EvHandler:protocol_changed(#{ +		stream_ref => RealStreamRef, +		protocol => Proto:name() +	}, EvHandlerState0), +	ProtoOpts = #{ +		stream_ref => RealStreamRef, +		headers => Headers, +		extensions => Extensions, +		flow => maps:get(flow, WsOpts, infinity), +		handler => Handler, +		opts => WsOpts +	}, +	{connected_ws_only, ProtoState} = Proto:init( +		ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), +	{store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, +		protocol=Proto, protocol_state=ProtoState}}), +		EvHandlerState}. + +headers_frame_response(State=#http2_state{content_handlers=Handlers0}, +		Stream=#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, +		IsFin, Status, Headers, EvHandler, EvHandlerState0) -> +	RealStreamRef = stream_ref(State, StreamRef), +	ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, +	EvHandlerState1 = EvHandler:response_headers(#{ +		stream_ref => RealStreamRef, +		reply_to => ReplyTo, +		status => Status, +		headers => Headers +	}, EvHandlerState0), +	{Handlers, EvHandlerState} = case IsFin of +		fin -> +			EvHandlerState2 = EvHandler:response_end(#{  				stream_ref => RealStreamRef, -				reply_to => ReplyTo, -				status => Status, -				headers => Headers -			}, EvHandlerState0), -			{Handlers, EvHandlerState} = case IsFin of -				fin -> -					EvHandlerState2 = EvHandler:response_end(#{ -						stream_ref => RealStreamRef, -						reply_to => ReplyTo -					}, EvHandlerState1), -					{undefined, EvHandlerState2}; -				nofin -> -					{gun_content_handler:init(ReplyTo, RealStreamRef, -						Status, Headers, Handlers0), EvHandlerState1} -			end, -			%% @todo Disable the tunnel if any. -			{maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), -				StreamID, remote, IsFin), -				CookieStore, EvHandlerState} -	end. +				reply_to => ReplyTo +			}, EvHandlerState1), +			{undefined, EvHandlerState2}; +		nofin -> +			{gun_content_handler:init(ReplyTo, RealStreamRef, +				Status, Headers, Handlers0), EvHandlerState1} +	end, +	%% We disable the tunnel, if any, when receiving any non 2xx response. +	{maybe_delete_stream(store_stream(State, +		Stream#stream{handler_state=Handlers, tunnel=undefined}), +		StreamID, remote, IsFin), EvHandlerState}.  trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->  	#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), @@ -1195,7 +1297,58 @@ stream_info(State, RealStreamRef=[StreamRef|_]) ->  down(#http2_state{stream_refs=Refs}) ->  	maps:keys(Refs). -%% Websocket upgrades are currently only accepted when tunneled. +ws_upgrade(State=#http2_state{socket=Socket, transport=Transport, +		http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, +		Host, Port, Path, Headers0, WsOpts, +		CookieStore0, EvHandler, EvHandlerState0) +		when is_reference(StreamRef) -> +	{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( +		<<"CONNECT">>, HTTP2Machine0), +	{ok, PseudoHeaders, Headers1, CookieStore} = prepare_headers(State, +		<<"CONNECT">>, Host, Port, Path, Headers0, CookieStore0), +	{Headers2, GunExtensions} = case maps:get(compress, WsOpts, false) of +		true -> +			{[{<<"sec-websocket-extensions">>, +				<<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>} +			|Headers1], [<<"permessage-deflate">>]}; +		false -> +			{Headers1, []} +	end, +	Headers3 = case maps:get(protocols, WsOpts, []) of +		[] -> +			Headers2; +		ProtoOpt -> +			<< _, _, Proto/bits >> = iolist_to_binary([[<<", ">>, P] || {P, _} <- ProtoOpt]), +			[{<<"sec-websocket-protocol">>, Proto}|Headers2] +	end, +	Headers = [{<<"sec-websocket-version">>, <<"13">>}|Headers3], +	Authority = maps:get(authority, PseudoHeaders), +	RealStreamRef = stream_ref(State, StreamRef), +	RequestEvent = #{ +		stream_ref => RealStreamRef, +		reply_to => ReplyTo, +		function => ?FUNCTION_NAME, +		method => <<"CONNECT">>, +		authority => Authority, +		path => Path, +		headers => Headers +	}, +	EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), +	{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( +		StreamID, HTTP2Machine1, nofin, PseudoHeaders#{protocol => <<"websocket">>}, Headers), +	Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), +	EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), +	RequestEndEvent = #{ +		stream_ref => RealStreamRef, +		reply_to => ReplyTo +	}, +	EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), +	InitialFlow = maps:get(flow, WsOpts, infinity), +	Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, +		authority=Authority, path=Path, tunnel=#tunnel{info=#websocket_info{ +			extensions=GunExtensions, opts=WsOpts}}}, +	{create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), +		CookieStore, EvHandlerState};  ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,  		Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) ->  	case get_stream_by_ref(State, StreamRef) of @@ -1210,7 +1363,11 @@ ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,  		%% @todo Error conditions?  	end. -ws_send(Frames, State0, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) -> +ws_send(Frames, State0, RealStreamRef, ReplyTo, EvHandler, EvHandlerState0) -> +	StreamRef = case RealStreamRef of +		[SR|_] -> SR; +		_ -> RealStreamRef +	end,  	case get_stream_by_ref(State0, StreamRef) of  		Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState}} ->  			{Commands, EvHandlerState1} = Proto:ws_send(Frames, ProtoState, diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl index 7c29684..2594d24 100644 --- a/src/gun_tunnel.erl +++ b/src/gun_tunnel.erl @@ -340,12 +340,12 @@ connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port},  		EvHandler, EvHandlerState0),  	{State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. -cancel(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, +cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},  		StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> -	StreamRef = maybe_dereference(State0, StreamRef0), -	{Commands, EvHandlerState1} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), -	{State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), -	{{state, State}, EvHandlerState}. +	StreamRef = maybe_dereference(State, StreamRef0), +	{ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, StreamRef, +		ReplyTo, EvHandler, EvHandlerState0), +	{State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.  timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) ->  	case Proto:timeout(ProtoState0, Msg, TRef) of diff --git a/src/gun_ws.erl b/src/gun_ws.erl index f413f94..a1fdfae 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -15,12 +15,15 @@  -module(gun_ws).  -export([check_options/1]). +-export([select_extensions/3]). +-export([select_protocol/2]).  -export([name/0]).  -export([opts_name/0]).  -export([has_keepalive/0]).  -export([default_keepalive/0]).  -export([init/4]).  -export([handle/5]). +-export([handle_continue/6]).  -export([update_flow/4]).  -export([closing/4]).  -export([close/4]). @@ -89,6 +92,47 @@ do_check_options([{user_opts, _}|Opts]) ->  do_check_options([Opt|_]) ->  	{error, {options, {ws, Opt}}}. +select_extensions(Headers, Extensions0, Opts) -> +	case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of +		false -> +			#{}; +		{_, ExtHd} -> +			ParsedExtHd = cow_http_hd:parse_sec_websocket_extensions(ExtHd), +			validate_extensions(ParsedExtHd, Extensions0, Opts, #{}) +	end. + +validate_extensions([], _, _, Acc) -> +	Acc; +validate_extensions([{Name = <<"permessage-deflate">>, Params}|Tail], Extensions, Opts, Acc0) -> +	case lists:member(Name, Extensions) of +		true -> +			case cow_ws:validate_permessage_deflate(Params, Acc0, Opts) of +				{ok, Acc} -> validate_extensions(Tail, Extensions, Opts, Acc); +				error -> close +			end; +		%% Fail the connection if extension was not requested. +		false -> +			close +	end; +%% Fail the connection on unknown extension. +validate_extensions(_, _, _, _) -> +	close. + +%% @todo Validate protocols. +select_protocol(Headers, Opts) -> +	case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of +		false -> +			maps:get(default_protocol, Opts, gun_ws_h); +		{_, Proto} -> +			ProtoOpt = maps:get(protocols, Opts, []), +			case lists:keyfind(Proto, 1, ProtoOpt) of +				{_, Handler} -> +					Handler; +				false -> +					close +			end +	end. +  name() -> ws.  opts_name() -> ws_opts.  has_keepalive() -> true. @@ -176,6 +220,11 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke  			closing(Error, State, EvHandler, EvHandlerState)  	end. +handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data}, +		#ws_state{}, CookieStore, _EvHandler, EvHandlerState) +		when is_reference(ContinueStreamRef) -> +	{{send, IsFin, Data}, CookieStore, EvHandlerState}. +  maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) ->  	{[  		{state, State}, diff --git a/test/event_SUITE.erl b/test/event_SUITE.erl index d883dc5..8be10b1 100644 --- a/test/event_SUITE.erl +++ b/test/event_SUITE.erl @@ -34,25 +34,25 @@ groups() ->  	HTTP1Tests = [T || T <- Tests, lists:sublist(atom_to_list(T), 6) =:= "http1_"],  	%% Push is not possible over HTTP/1.1.  	PushTests = [T || T <- Tests, lists:sublist(atom_to_list(T), 5) =:= "push_"], -	%% We currently do not support Websocket over HTTP/2. -	WsTests = [T || T <- Tests, lists:sublist(atom_to_list(T), 3) =:= "ws_"],  	[  		{http, [parallel], Tests -- [cancel_remote, cancel_remote_connect|PushTests]}, -		{http2, [parallel], (Tests -- WsTests) -- HTTP1Tests} +		{http2, [parallel], Tests -- HTTP1Tests}  	].  init_per_suite(Config) -> -	ProtoOpts = #{env => #{ -		dispatch => cowboy_router:compile([{'_', [ -			{"/", hello_h, []}, -			{"/empty", empty_h, []}, -			{"/inform", inform_h, []}, -			{"/push", push_h, []}, -			{"/stream", stream_h, []}, -			{"/trailers", trailers_h, []}, -			{"/ws", ws_echo_h, []} -		]}]) -	}}, +	Routes = [ +		{"/", hello_h, []}, +		{"/empty", empty_h, []}, +		{"/inform", inform_h, []}, +		{"/push", push_h, []}, +		{"/stream", stream_h, []}, +		{"/trailers", trailers_h, []}, +		{"/ws", ws_echo_h, []} +	], +	ProtoOpts = #{ +		enable_connect_protocol => true, +		env => #{dispatch => cowboy_router:compile([{'_', Routes}])} +	},  	{ok, _} = cowboy:start_clear({?MODULE, tcp}, [], ProtoOpts),  	TCPOriginPort = ranch:get_port({?MODULE, tcp}),  	{ok, _} = cowboy:start_tls({?MODULE, tls}, ct_helper:get_certs_from_ets(), ProtoOpts), @@ -1227,8 +1227,10 @@ http1_response_end_body_close(Config) ->  ws_upgrade(Config) ->  	doc("Confirm that the ws_upgrade event callback is called."), +	Protocol = config(name, config(tc_group_properties, Config)),  	{ok, Pid, _} = do_gun_open(Config), -	{ok, _} = gun:await_up(Pid), +	{ok, Protocol} = gun:await_up(Pid), +	ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),  	StreamRef = gun:ws_upgrade(Pid, "/ws"),  	ReplyTo = self(),  	#{ @@ -1258,11 +1260,15 @@ do_ws_upgrade_connect(Config, ProxyProtocol) ->  	StreamRef1 = gun:connect(ConnPid, #{  		host => "localhost",  		port => OriginPort, -		protocols => [OriginProtocol] +		protocols => [case OriginProtocol of +			http -> http; +			http2 -> {http2, #{notify_settings_changed => true}} +		end]  	}, []),  	%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...  	{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),  	{up, OriginProtocol} = gun:await(ConnPid, StreamRef1), +	ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),  	StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),  	#{  		stream_ref := StreamRef2, @@ -1273,8 +1279,10 @@ do_ws_upgrade_connect(Config, ProxyProtocol) ->  ws_upgrade_all_events(Config) ->  	doc("Confirm that a Websocket upgrade triggers all relevant events."), +	Protocol = config(name, config(tc_group_properties, Config)),  	{ok, Pid, OriginPort} = do_gun_open(Config), -	{ok, _} = gun:await_up(Pid), +	{ok, Protocol} = gun:await_up(Pid), +	ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),  	StreamRef = gun:ws_upgrade(Pid, "/ws"),  	ReplyTo = self(),  	#{ @@ -1283,11 +1291,15 @@ ws_upgrade_all_events(Config) ->  		opts := #{}  	} = do_receive_event(ws_upgrade),  	Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]), +	Method = case Protocol of +		http -> <<"GET">>; +		http2 -> <<"CONNECT">> +	end,  	#{  		stream_ref := StreamRef,  		reply_to := ReplyTo,  		function := ws_upgrade, -		method := <<"GET">>, +		method := Method,  		authority := EventAuthority1,  		path := "/ws",  		headers := [_|_] @@ -1297,7 +1309,7 @@ ws_upgrade_all_events(Config) ->  		stream_ref := StreamRef,  		reply_to := ReplyTo,  		function := ws_upgrade, -		method := <<"GET">>, +		method := Method,  		authority := EventAuthority2,  		path := "/ws",  		headers := [_|_] @@ -1311,12 +1323,26 @@ ws_upgrade_all_events(Config) ->  		stream_ref := StreamRef,  		reply_to := ReplyTo  	} = do_receive_event(response_start), -	#{ -		stream_ref := StreamRef, -		reply_to := ReplyTo, -		status := 101, -		headers := [_|_] -	} = do_receive_event(response_inform), +	_ = case Protocol of +		http -> +			#{ +				stream_ref := StreamRef, +				reply_to := ReplyTo, +				status := 101, +				headers := [_|_] +			} = do_receive_event(response_inform); +		http2 -> +			#{ +				stream_ref := StreamRef, +				reply_to := ReplyTo, +				status := 200, +				headers := [_|_] +			} = do_receive_event(response_headers), +			#{ +				stream_ref := StreamRef, +				reply_to := ReplyTo +			} = do_receive_event(response_end) +	end,  	#{  		stream_ref := StreamRef,  		protocol := ws @@ -1343,16 +1369,27 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->  	StreamRef1 = gun:connect(ConnPid, #{  		host => "localhost",  		port => OriginPort, -		protocols => [OriginProtocol] +		protocols => [case OriginProtocol of +			http -> http; +			http2 -> {http2, #{notify_settings_changed => true}} +		end]  	}, []),  	%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...  	{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),  	{up, OriginProtocol} = gun:await(ConnPid, StreamRef1), +	ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),  	%% Skip all CONNECT-related events that may conflict.  	_ = do_receive_event(request_start),  	_ = do_receive_event(request_headers),  	_ = do_receive_event(request_end),  	_ = do_receive_event(response_start), +	case OriginProtocol of +		http -> ok; +		http2 -> +			_ = do_receive_event(response_headers), +%			_ = do_receive_event(response_end), @todo Probably should response_end CONNECT responses for both protocols. +			ok +	end,  	_ = do_receive_event(protocol_changed),  	%% Check the Websocket events.  	StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), @@ -1362,11 +1399,15 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->  		opts := #{}  	} = do_receive_event(ws_upgrade),  	Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]), +	Method = case OriginProtocol of +		http -> <<"GET">>; +		http2 -> <<"CONNECT">> +	end,  	#{  		stream_ref := StreamRef2,  		reply_to := ReplyTo,  		function := ws_upgrade, -		method := <<"GET">>, +		method := Method,  		authority := EventAuthority1,  		path := "/ws",  		headers := [_|_] @@ -1376,7 +1417,7 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->  		stream_ref := StreamRef2,  		reply_to := ReplyTo,  		function := ws_upgrade, -		method := <<"GET">>, +		method := Method,  		authority := EventAuthority2,  		path := "/ws",  		headers := [_|_] @@ -1390,12 +1431,26 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->  		stream_ref := StreamRef2,  		reply_to := ReplyTo  	} = do_receive_event(response_start), -	#{ -		stream_ref := StreamRef2, -		reply_to := ReplyTo, -		status := 101, -		headers := [_|_] -	} = do_receive_event(response_inform), +	_ = case OriginProtocol of +		http -> +			#{ +				stream_ref := StreamRef2, +				reply_to := ReplyTo, +				status := 101, +				headers := [_|_] +			} = do_receive_event(response_inform); +		http2 -> +			#{ +				stream_ref := StreamRef2, +				reply_to := ReplyTo, +				status := 200, +				headers := [_|_] +			} = do_receive_event(response_headers), +			#{ +				stream_ref := StreamRef2, +				reply_to := ReplyTo +			} = do_receive_event(response_end) +	end,  	#{  		stream_ref := StreamRef2,  		protocol := ws @@ -1406,11 +1461,13 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->  ws_recv_frame_start(Config) ->  	doc("Confirm that the ws_recv_frame_start event callback is called."), +	Protocol = config(name, config(tc_group_properties, Config)),  	{ok, Pid, _} = do_gun_open(Config), -	{ok, _} = gun:await_up(Pid), +	{ok, Protocol} = gun:await_up(Pid), +	ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),  	StreamRef = gun:ws_upgrade(Pid, "/ws"),  	{upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef), -	gun:ws_send(Pid, {text, <<"Hello!">>}), +	gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}),  	ReplyTo = self(),  	#{  		stream_ref := StreamRef, @@ -1440,11 +1497,15 @@ do_ws_recv_frame_start_connect(Config, ProxyProtocol) ->  	StreamRef1 = gun:connect(ConnPid, #{  		host => "localhost",  		port => OriginPort, -		protocols => [OriginProtocol] +		protocols => [case OriginProtocol of +			http -> http; +			http2 -> {http2, #{notify_settings_changed => true}} +		end]  	}, []),  	%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...  	{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),  	{up, OriginProtocol} = gun:await(ConnPid, StreamRef1), +	ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),  	StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),  	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),  	gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), @@ -1458,11 +1519,13 @@ do_ws_recv_frame_start_connect(Config, ProxyProtocol) ->  ws_recv_frame_header(Config) ->  	doc("Confirm that the ws_recv_frame_header event callback is called."), +	Protocol = config(name, config(tc_group_properties, Config)),  	{ok, Pid, _} = do_gun_open(Config), -	{ok, _} = gun:await_up(Pid), +	{ok, Protocol} = gun:await_up(Pid), +	ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),  	StreamRef = gun:ws_upgrade(Pid, "/ws"),  	{upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef), -	gun:ws_send(Pid, {text, <<"Hello!">>}), +	gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}),  	ReplyTo = self(),  	#{  		stream_ref := StreamRef, @@ -1496,11 +1559,15 @@ do_ws_recv_frame_header_connect(Config, ProxyProtocol) ->  	StreamRef1 = gun:connect(ConnPid, #{  		host => "localhost",  		port => OriginPort, -		protocols => [OriginProtocol] +		protocols => [case OriginProtocol of +			http -> http; +			http2 -> {http2, #{notify_settings_changed => true}} +		end]  	}, []),  	%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...  	{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),  	{up, OriginProtocol} = gun:await(ConnPid, StreamRef1), +	ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),  	StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),  	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),  	gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), @@ -1518,11 +1585,13 @@ do_ws_recv_frame_header_connect(Config, ProxyProtocol) ->  ws_recv_frame_end(Config) ->  	doc("Confirm that the ws_recv_frame_end event callback is called."), +	Protocol = config(name, config(tc_group_properties, Config)),  	{ok, Pid, _} = do_gun_open(Config), -	{ok, _} = gun:await_up(Pid), +	{ok, Protocol} = gun:await_up(Pid), +	ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),  	StreamRef = gun:ws_upgrade(Pid, "/ws"),  	{upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef), -	gun:ws_send(Pid, {text, <<"Hello!">>}), +	gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}),  	ReplyTo = self(),  	#{  		stream_ref := StreamRef, @@ -1553,11 +1622,15 @@ do_ws_recv_frame_end_connect(Config, ProxyProtocol) ->  	StreamRef1 = gun:connect(ConnPid, #{  		host => "localhost",  		port => OriginPort, -		protocols => [OriginProtocol] +		protocols => [case OriginProtocol of +			http -> http; +			http2 -> {http2, #{notify_settings_changed => true}} +		end]  	}, []),  	%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...  	{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),  	{up, OriginProtocol} = gun:await(ConnPid, StreamRef1), +	ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),  	StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),  	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),  	gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), @@ -1581,11 +1654,13 @@ ws_send_frame_end(Config) ->  	do_ws_send_frame(Config, ?FUNCTION_NAME).  do_ws_send_frame(Config, EventName) -> +	Protocol = config(name, config(tc_group_properties, Config)),  	{ok, Pid, _} = do_gun_open(Config), -	{ok, _} = gun:await_up(Pid), +	{ok, Protocol} = gun:await_up(Pid), +	ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),  	StreamRef = gun:ws_upgrade(Pid, "/ws"),  	{upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef), -	gun:ws_send(Pid, {text, <<"Hello!">>}), +	gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}),  	ReplyTo = self(),  	#{  		stream_ref := StreamRef, @@ -1621,11 +1696,15 @@ do_ws_send_frame_connect(Config, ProxyProtocol, EventName) ->  	StreamRef1 = gun:connect(ConnPid, #{  		host => "localhost",  		port => OriginPort, -		protocols => [OriginProtocol] +		protocols => [case OriginProtocol of +			http -> http; +			http2 -> {http2, #{notify_settings_changed => true}} +		end]  	}, []),  	%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...  	{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),  	{up, OriginProtocol} = gun:await(ConnPid, StreamRef1), +	ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),  	StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),  	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),  	gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), @@ -1641,8 +1720,10 @@ do_ws_send_frame_connect(Config, ProxyProtocol, EventName) ->  ws_protocol_changed(Config) ->  	doc("Confirm that the protocol_changed event callback is called on Websocket upgrade success."), +	Protocol = config(name, config(tc_group_properties, Config)),  	{ok, Pid, _} = do_gun_open(Config), -	{ok, _} = gun:await_up(Pid), +	{ok, Protocol} = gun:await_up(Pid), +	ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),  	_ = gun:ws_upgrade(Pid, "/ws"),  	#{  		protocol := ws @@ -1668,11 +1749,15 @@ do_ws_protocol_changed_connect(Config, ProxyProtocol) ->  	StreamRef1 = gun:connect(ConnPid, #{  		host => "localhost",  		port => OriginPort, -		protocols => [OriginProtocol] +		protocols => [case OriginProtocol of +			http -> http; +			http2 -> {http2, #{notify_settings_changed => true}} +		end]  	}, []),  	%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...  	{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),  	{up, OriginProtocol} = gun:await(ConnPid, StreamRef1), +	ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),  	#{  		stream_ref := StreamRef1,  		protocol := OriginProtocol @@ -1951,6 +2036,7 @@ do_gun_open(Config) ->  do_gun_open(OriginPort, Config) ->  	Opts = #{  		event_handler => {?MODULE, self()}, +		http2_opts => #{notify_settings_changed => true},  		protocols => [config(name, config(tc_group_properties, Config))]  	},  	{ok, Pid} = gun:open("localhost", OriginPort, Opts), @@ -1960,6 +2046,7 @@ do_gun_open_tls(Config) ->  	OriginPort = config(tls_origin_port, Config),  	Opts = #{  		event_handler => {?MODULE, self()}, +		http2_opts => #{notify_settings_changed => true},  		protocols => [config(name, config(tc_group_properties, Config))],  		transport => tls  	}, diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index c04acc2..201403e 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -22,19 +22,30 @@  %% ct.  all() -> -	[{group, ws}]. +	[{group, http}, {group, http2}].  groups() -> -	[{ws, [], ct_helper:all(?MODULE)}]. +	Tests = ct_helper:all(?MODULE), +	HTTP1Tests = [ +		http10_upgrade_error, +		http11_request_error, +		http11_keepalive, +		http11_keepalive_default_silence_pings +	], +	[ +		{http, [], Tests}, +		{http2, [], Tests -- HTTP1Tests} +	].  init_per_suite(Config) ->  	Routes = [  		{"/", ws_echo_h, []},  		{"/reject", ws_reject_h, []}  	], -	{ok, _} = cowboy:start_clear(ws, [], #{env => #{ -		dispatch => cowboy_router:compile([{'_', Routes}]) -	}}), +	{ok, _} = cowboy:start_clear(ws, [], #{ +		enable_connect_protocol => true, +		env => #{dispatch => cowboy_router:compile([{'_', Routes}])} +	}),  	Port = ranch:get_port(ws),  	[{port, Port}|Config]. @@ -45,16 +56,37 @@ end_per_suite(_) ->  await(Config) ->  	doc("Ensure gun:await/2 can be used to receive Websocket frames."), -	{ok, ConnPid} = gun:open("localhost", config(port, Config)), -	{ok, _} = gun:await_up(ConnPid), +	Protocol = config(name, config(tc_group_properties, Config)), +	{ok, ConnPid} = gun:open("localhost", config(port, Config), #{ +		protocols => [Protocol], +		http2_opts => #{notify_settings_changed => true} +	}), +	{ok, Protocol} = gun:await_up(ConnPid), +	do_await_enable_connect_protocol(Protocol, ConnPid),  	StreamRef = gun:ws_upgrade(ConnPid, "/", []),  	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),  	Frame = {text, <<"Hello!">>}, -	gun:ws_send(ConnPid, Frame), +	gun:ws_send(ConnPid, StreamRef, Frame),  	{ws, Frame} = gun:await(ConnPid, StreamRef),  	gun:close(ConnPid). -error_http10_upgrade(Config) -> +headers_normalized_upgrade(Config) -> +	doc("Headers passed to ws_upgrade are normalized before being used."), +	Protocol = config(name, config(tc_group_properties, Config)), +	{ok, ConnPid} = gun:open("localhost", config(port, Config), #{ +		protocols => [Protocol], +		http2_opts => #{notify_settings_changed => true} +	}), +	{ok, Protocol} = gun:await_up(ConnPid), +	do_await_enable_connect_protocol(Protocol, ConnPid), +	StreamRef = gun:ws_upgrade(ConnPid, "/", #{ +		atom_header_name => <<"value">>, +		"string_header_name" => <<"value">> +	}), +	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), +	gun:close(ConnPid). + +http10_upgrade_error(Config) ->  	doc("Attempting to upgrade HTTP/1.0 to Websocket produces an error."),  	{ok, ConnPid} = gun:open("localhost", config(port, Config), #{  		http_opts => #{version => 'HTTP/1.0'} @@ -70,28 +102,7 @@ error_http10_upgrade(Config) ->  		error(timeout)  	end. -headers_normalized_upgrade(Config) -> -	doc("Headers passed to ws_upgrade are normalized before being used."), -	{ok, ConnPid} = gun:open("localhost", config(port, Config)), -	{ok, _} = gun:await_up(ConnPid), -	StreamRef = gun:ws_upgrade(ConnPid, "/", #{ -		atom_header_name => <<"value">>, -		"string_header_name" => <<"value">> -	}), -	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), -	gun:close(ConnPid). - -error_http_request(Config) -> -	doc("Ensure that requests are rejected while using Websocket."), -	{ok, ConnPid} = gun:open("localhost", config(port, Config)), -	{ok, _} = gun:await_up(ConnPid), -	StreamRef1 = gun:ws_upgrade(ConnPid, "/", []), -	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef1), -	StreamRef2 = gun:get(ConnPid, "/"), -	{error, {connection_error, {badstate, _}}} = gun:await(ConnPid, StreamRef2), -	gun:close(ConnPid). - -keepalive(Config) -> +http11_keepalive(Config) ->  	doc("Ensure that Gun automatically sends ping frames."),  	{ok, ConnPid} = gun:open("localhost", config(port, Config), #{  		ws_opts => #{ @@ -106,7 +117,7 @@ keepalive(Config) ->  	{ws, pong} = gun:await(ConnPid, StreamRef),  	gun:close(ConnPid). -keepalive_default_silence_pings(Config) -> +http11_keepalive_default_silence_pings(Config) ->  	doc("Ensure that Gun does not forward ping/pong by default."),  	{ok, ConnPid} = gun:open("localhost", config(port, Config), #{  		ws_opts => #{keepalive => 100} @@ -118,10 +129,25 @@ keepalive_default_silence_pings(Config) ->  	{error, timeout} = gun:await(ConnPid, StreamRef, 1000),  	gun:close(ConnPid). -reject_upgrade(Config) -> -	doc("Ensure Websocket connections can be rejected."), +http11_request_error(Config) -> +	doc("Ensure that HTTP/1.1 requests are rejected while using Websocket."),  	{ok, ConnPid} = gun:open("localhost", config(port, Config)),  	{ok, _} = gun:await_up(ConnPid), +	StreamRef1 = gun:ws_upgrade(ConnPid, "/", []), +	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef1), +	StreamRef2 = gun:get(ConnPid, "/"), +	{error, {connection_error, {badstate, _}}} = gun:await(ConnPid, StreamRef2), +	gun:close(ConnPid). + +reject_upgrade(Config) -> +	doc("Ensure Websocket connections can be rejected."), +	Protocol = config(name, config(tc_group_properties, Config)), +	{ok, ConnPid} = gun:open("localhost", config(port, Config), #{ +		protocols => [Protocol], +		http2_opts => #{notify_settings_changed => true} +	}), +	{ok, Protocol} = gun:await_up(ConnPid), +	do_await_enable_connect_protocol(Protocol, ConnPid),  	StreamRef = gun:ws_upgrade(ConnPid, "/reject", []),  	receive  		{gun_response, ConnPid, StreamRef, nofin, 400, _} -> @@ -134,7 +160,7 @@ reject_upgrade(Config) ->  	end.  reply_to(Config) -> -	doc("Ensure we can send a list of frames in one gun:ws_send call."), +	doc("Ensure the reply_to request option is respected."),  	Self = self(),  	Frame = {text, <<"Hello!">>},  	ReplyTo = spawn(fun() -> @@ -144,36 +170,61 @@ reply_to(Config) ->  		{ws, Frame} = gun:await(ConnPid, StreamRef),  		Self ! {self(), ok}  	end), -	{ok, ConnPid} = gun:open("localhost", config(port, Config)), -	{ok, _} = gun:await_up(ConnPid), +	Protocol = config(name, config(tc_group_properties, Config)), +	{ok, ConnPid} = gun:open("localhost", config(port, Config), #{ +		protocols => [Protocol], +		http2_opts => #{notify_settings_changed => true} +	}), +	{ok, Protocol} = gun:await_up(ConnPid), +	do_await_enable_connect_protocol(Protocol, ConnPid),  	StreamRef = gun:ws_upgrade(ConnPid, "/", [], #{reply_to => ReplyTo}),  	ReplyTo ! {ConnPid, StreamRef}, -	receive {ReplyTo, ready} -> gun:ws_send(ConnPid, Frame) after 1000 -> error(timeout) end, +	receive {ReplyTo, ready} -> gun:ws_send(ConnPid, StreamRef, Frame) after 1000 -> error(timeout) end,  	receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end.  send_many(Config) ->  	doc("Ensure we can send a list of frames in one gun:ws_send call."), -	{ok, ConnPid} = gun:open("localhost", config(port, Config)), -	{ok, _} = gun:await_up(ConnPid), +	Protocol = config(name, config(tc_group_properties, Config)), +	{ok, ConnPid} = gun:open("localhost", config(port, Config), #{ +		protocols => [Protocol], +		http2_opts => #{notify_settings_changed => true} +	}), +	{ok, Protocol} = gun:await_up(ConnPid), +	do_await_enable_connect_protocol(Protocol, ConnPid),  	StreamRef = gun:ws_upgrade(ConnPid, "/", []),  	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),  	Frame1 = {text, <<"Hello!">>},  	Frame2 = {binary, <<"World!">>}, -	gun:ws_send(ConnPid, [Frame1, Frame2]), +	gun:ws_send(ConnPid, StreamRef, [Frame1, Frame2]),  	{ws, Frame1} = gun:await(ConnPid, StreamRef),  	{ws, Frame2} = gun:await(ConnPid, StreamRef),  	gun:close(ConnPid).  send_many_close(Config) ->  	doc("Ensure we can send a list of frames in one gun:ws_send call, including a close frame."), -	{ok, ConnPid} = gun:open("localhost", config(port, Config)), -	{ok, _} = gun:await_up(ConnPid), +	Protocol = config(name, config(tc_group_properties, Config)), +	{ok, ConnPid} = gun:open("localhost", config(port, Config), #{ +		protocols => [Protocol], +		http2_opts => #{notify_settings_changed => true} +	}), +	{ok, Protocol} = gun:await_up(ConnPid), +	do_await_enable_connect_protocol(Protocol, ConnPid),  	StreamRef = gun:ws_upgrade(ConnPid, "/", []),  	{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),  	Frame1 = {text, <<"Hello!">>},  	Frame2 = {binary, <<"World!">>}, -	gun:ws_send(ConnPid, [Frame1, Frame2, close]), +	gun:ws_send(ConnPid, StreamRef, [Frame1, Frame2, close]),  	{ws, Frame1} = gun:await(ConnPid, StreamRef),  	{ws, Frame2} = gun:await(ConnPid, StreamRef),  	{ws, close} = gun:await(ConnPid, StreamRef),  	gun:close(ConnPid). + +%% Internal. + +do_await_enable_connect_protocol(http, _) -> +	ok; +%% We cannot do a CONNECT :protocol request until the server tells us we can. +do_await_enable_connect_protocol(http2, ConnPid) -> +	{notify, settings_changed, #{enable_connect_protocol := true}} +		= gun:await(ConnPid, undefined), %% @todo Maybe have a gun:await/1? +	ok. | 
