aboutsummaryrefslogblamecommitdiffstats
path: root/src/gun_http.erl
blob: ca04b101e7206c7074d94e23e7ab393b4416314c (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
                                                             














                                                                           
                           
                  
                  
                    
                   
                       
                     

                     
                     
                    
                         
                  
                        
 


                                   
                                                                                          
 
                            


                                                                          
                                                                                                                    
 
                 
                                                               
                          




                                                                



                                                  
                                                   
                                                      

                                                    
                                    
                          
                                                                    

                                                            

   
                      
                                             
 

                       




                                                            



                                                                    

                                                                          

                                                                                


                                        

               

                                                      
                                                                  
                                                                                      
                                                                                     
                                                                                      
 

                                         
                       


                                                                 


                                                                  
                                                   
                                                                   


                                                                              
                                                 
                                                        
                                                                

                                                                 

                                                            
                       
                                                                  
                                          
                                                         
                                                                                  
                                        
                                                                          
                                                                                      
                                                         
                                                                                  
                                        
                                                
                                                                                      
                                                         
                                                                                 
                                        




                                                   
                                                                               




                                                                                                        
                                                                                                  
                                                       
                                                                            
                            











                                                                                                        
                                                                            





                                                                               
                                                                   



                                                                                          

                                            
                                                                                                 
                                        
                                                                           







                                                                      
                                                        
                                                                               
                                        

                                            
                                                                      
                                    
                                                                         
                                                                             



                                                                   
                                                                      
                                    
                                                                              
                                                                             


                           

                                                                                   

                                                                        
                                                                      
                                                        
                                   

                                                                                                        













                                                                                               
                                           




                                                                                                            


                                                                                                                


                                                                                                              



                                                                                                            
                                                                  

                                                                                                  


                                                                                                                          

                                                                                                                                         

                                                                                                                                 
                                                            


                                                             
                                    


                                                                                        
                                                                                                                 
                                                          

                                                                                                         
                                           
                            


                                                                                               
                    
                                                                                        
                                                                       
                                                  
                                        
                                                  
                                       
                                                                                               
                                                                        


                                                                 
                                                                                                                

                                                                                           




                                                                      
                                                                           






                                                                                        

                                                                                                        
                                       


                                                                                                      
                           
            
 
                                                 


                                                         

                                         
                                                               





                                                                               
 

                                                               
                                                 
                            
                                         
                               
 
                    
           



                                                                 
                                             
                            
 


                                                                       
                                                                      
                                                                             
                                           

                   

              
                                                                              
                                                                                    
                                                                        
                                                                  
                                                                                     


                                                             

                                                    
                              
                                                                     


                                                                                    

                                                                                  
                                                                                           
 
                                                                              
                                                                                          

                                                                      
                                                                  
                                                                                     

                                
                                                           
                                                             
                                                    

                                                         
                                                                                    
                            
                       
                                                                                  
 



                                                               
                                                              







                                                        


                                                                            
                                 

                                                              
                               

                                                                
                                                    
                                                                           
                                                                              
                                   
                                                        
                                                       
                                   


                                                                                          
                                                                                                         

                                                                                

                                                                                        

                                                          
                                                                   
                                                                           
                                                                                        
                                              
                                                                           
                                                                     
                                                                      
                                          
                                                                               
                                                                                   
                                                                               
                                                                                             



                                                                     
                            
                    
                                                                         

            

































                                                                                                 
                                                                               
                                    



                                                        
                                                                         

            














                                                              


                                                                            
                                           
                                                   
                        
                                            

                            

                                                           


                                                        

                                                           




                                                
                                      
                                                           

                                                    


                                  




                                                                        

            






                                                                         
                                    

            

                                                
                                                                                      
             
                                                         





                                                                       
                    






                                                                                         




                           
                                                                             
                                        

                                                                          

                                                     
                                                         



                                                               
                                                     

                             
                                                    



                                                  





                                                                
                                                                                        
                                                                 


                                                                                                            
                                   
                                                    






                                                                                                         

                           
                    



                                                        
                         
          

                                                                  
                                                                                    
            
                                                                                       
                                                                    
                                                                                      
 
                                                                            





                                                                     




                                                                                         


                           
                                                                                  

                                                                         
                                                                                             

                                                                                                                                   



                                                                                                                   




















                                                                                                
                                                                              

                                                                       

                                                                                       



                                                                 

                                                                                    



                                             
 
                                                                                      
                                                                 







                                                          
                                                                                             
%% Copyright (c) 2014-2019, Loïc Hoguin <essen@ninenines.eu>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(gun_http).

-export([check_options/1]).
-export([name/0]).
-export([init/4]).
-export([handle/2]).
-export([close/2]).
-export([keepalive/1]).
-export([headers/8]).
-export([request/9]).
-export([data/5]).
-export([connect/5]).
-export([cancel/3]).
-export([stream_info/2]).
-export([down/1]).
-export([ws_upgrade/7]).

%% Functions shared with gun_http2.
-export([host_header/3]).

%% Position within an HTTP/1.x message, used for both the response being
%% received (in) and the request being sent (out):
%%  - head: at the start line and headers;
%%  - {body, Length}: inside a body with Length bytes remaining;
%%  - body_close: inside a body that extends until the connection closes;
%%  - body_chunked: inside a body using the chunked transfer-encoding;
%%  - body_trailer: the chunked body ended and trailers are being read.
-type io() :: head | {body, non_neg_integer()} | body_close | body_chunked | body_trailer.

%% Stream reference stored for a CONNECT request: wraps the reference
%% along with the requested destination.
%% @todo Make that a record.
-type connect_info() :: {connect, reference(), gun:connect_destination()}.

%% Stream reference stored for a Websocket upgrade request: wraps the
%% reference along with the data needed to complete the handshake.
%% @todo Make that a record.
-type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options

%% State kept for each request that has been sent and whose response
%% has not been fully received yet.
-record(stream, {
	%% Plain stream reference, or a tagged tuple wrapping it for
	%% CONNECT and Websocket upgrade requests (see stream_ref/1).
	ref :: reference() | connect_info() | websocket_info(),
	%% Process that receives the gun_* messages for this stream.
	reply_to :: pid(),
	%% Request method; given to response_io_from_headers/4 when the
	%% response head is parsed.
	method :: binary(),
	%% When false the gun_response message and the body data are not
	%% forwarded, and no gun_error is sent when the connection closes.
	is_alive :: boolean(),
	%% State returned by gun_content_handler:init/5; stays undefined
	%% when the response has no body or the stream is not alive.
	handler_state :: undefined | gun_content_handler:state()
}).

%% Protocol state for one HTTP/1.x connection.
-record(http_state, {
	%% Owner process, as given to init/4.
	owner :: pid(),
	%% Socket and the transport module used to send data on it.
	socket :: inet:socket() | ssl:sslsocket(),
	transport :: module(),
	%% HTTP version used when building requests.
	version = 'HTTP/1.1' :: cow_http:version(),
	%% Content handlers given to gun_content_handler:init/5 for
	%% every response that has a body.
	content_handlers :: gun_content_handler:opt(),
	%% Whether the connection is closed once the current response ends.
	connection = keepalive :: keepalive | close,
	%% Data received but not processed yet (incomplete response head,
	%% chunk or trailers).
	buffer = <<>> :: binary(),
	%% Streams in flight. The head of the list is the stream whose
	%% response is currently being received.
	streams = [] :: [#stream{}],
	%% Position within the response being received.
	in = head :: io(),
	%% State passed to and returned by cow_http_te:stream_chunked/2;
	%% reset to {0, 0} every time a response head is processed.
	in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()},
	%% Position within the request being sent.
	out = head :: io(),
	%% Fun from the transform_header_name option; init/4 defaults it
	%% to the identity function.
	transform_header_name :: fun((binary()) -> binary())
}).

%% Validate the HTTP protocol options given as a map.
%%
%% Returns ok when every option is valid, otherwise
%% {error, {options, {http, Opt}}} for the first invalid option found.
check_options(Opts) ->
	OptsList = maps:to_list(Opts),
	do_check_options(OptsList).

%% Validate the options one at a time.
%%
%% Takes the options as a list of {Name, Value} pairs. Returns ok when
%% the whole list is valid, or {error, {options, {http, Opt}}} where Opt
%% is the first pair that is unknown or has an invalid value.
do_check_options([]) ->
	ok;
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
	case gun_content_handler:check_option(Handlers) of
		ok -> do_check_options(Opts);
		error -> {error, {options, {http, Opt}}}
	end;
do_check_options([{keepalive, infinity}|Opts]) ->
	do_check_options(Opts);
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
	do_check_options(Opts);
%% The fun is declared as fun((binary()) -> binary()) in the state record,
%% so anything other than arity 1 is rejected here instead of being
%% accepted and failing later when a request is sent.
do_check_options([{transform_header_name, F}|Opts]) when is_function(F, 1) ->
	do_check_options(Opts);
do_check_options([{version, V}|Opts]) when V =:= 'HTTP/1.1'; V =:= 'HTTP/1.0' ->
	do_check_options(Opts);
%% Unknown option or invalid value.
do_check_options([Opt|_]) ->
	{error, {options, {http, Opt}}}.

%% Name of the protocol implemented by this module.
name() ->
	http.

%% Build the initial protocol state for a new connection.
%%
%% Options missing from Opts take their defaults: HTTP/1.1, the
%% gun_data_h content handler and an identity header name transform.
init(Owner, Socket, Transport, Opts) ->
	#http_state{
		owner=Owner,
		socket=Socket,
		transport=Transport,
		version=maps:get(version, Opts, 'HTTP/1.1'),
		content_handlers=maps:get(content_handlers, Opts, [gun_data_h]),
		transform_header_name=maps:get(transform_header_name, Opts,
			fun (N) -> N end)
	}.

%% Process data received from the socket according to the current
%% position in the response (the 'in' field of the state).
%%
%% Returns {state, State} when more data is needed, close when the
%% connection must be closed, a list of such commands, or whatever
%% handle_head/2 returns when a response head is complete.

%% Stop looping when we got no more data.
handle(<<>>, State) ->
	{state, State};
%% Close when server responds and we don't have any open streams.
handle(_, #http_state{streams=[]}) ->
	close;
%% Wait for the full response headers before trying to parse them.
handle(Data, State=#http_state{in=head, buffer=Buffer}) ->
	Data2 = << Buffer/binary, Data/binary >>,
	case binary:match(Data2, <<"\r\n\r\n">>) of
		nomatch -> {state, State#http_state{buffer=Data2}};
		{_, _} -> handle_head(Data2, State#http_state{buffer= <<>>})
	end;
%% Everything sent to the socket until it closes is part of the response body.
handle(Data, State=#http_state{in=body_close}) ->
	{state, send_data_if_alive(Data, State, nofin)};
%% Chunked transfer-encoding may contain both data and trailers.
handle(Data, State=#http_state{in=body_chunked, in_state=InState,
		buffer=Buffer}) ->
	Buffer2 = << Buffer/binary, Data/binary >>,
	case cow_http_te:stream_chunked(Buffer2, InState) of
		%% Not enough data to decode anything; keep it all for later.
		more ->
			{state, State#http_state{buffer=Buffer2}};
		{more, Data2, InState2} ->
			{state, send_data_if_alive(Data2,
				State#http_state{buffer= <<>>, in_state=InState2},
				nofin)};
		%% The 3rd element is an integer here, not leftover data,
		%% so there is nothing to buffer.
		{more, Data2, Length, InState2} when is_integer(Length) ->
			%% @todo See if we can recv faster than one message at a time.
			{state, send_data_if_alive(Data2,
				State#http_state{buffer= <<>>, in_state=InState2},
				nofin)};
		%% Some data was decoded and the undecoded rest must be buffered.
		{more, Data2, Rest, InState2} ->
			%% @todo See if we can recv faster than one message at a time.
			{state, send_data_if_alive(Data2,
				State#http_state{buffer=Rest, in_state=InState2},
				nofin)};
		%% The last chunk was reached without any data to forward.
		%% I suppose it doesn't hurt to append an empty binary.
		{done, HasTrailers, Rest} ->
			chunked_body_done(<<>>, HasTrailers, Rest, State);
		{done, Data2, HasTrailers, Rest} ->
			chunked_body_done(Data2, HasTrailers, Rest, State)
	end;
handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn,
		streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}) ->
	Data2 = << Buffer/binary, Data/binary >>,
	%% Wait for the full trailers before trying to parse them.
	case binary:match(Data2, <<"\r\n\r\n">>) of
		nomatch -> {state, State#http_state{buffer=Data2}};
		{_, _} ->
			{Trailers, Rest} = cow_http:parse_headers(Data2),
			%% @todo We probably want to pass this to gun_content_handler?
			%% NOTE(review): trailers are sent without checking is_alive,
			%% unlike the response and body data -- confirm this is intended.
			ReplyTo ! {gun_trailers, self(), stream_ref(StreamRef), Trailers},
			case Conn of
				keepalive ->
					handle(Rest, end_stream(State#http_state{buffer= <<>>}));
				close ->
					[{state, end_stream(State)}, close]
			end
	end;
%% We know the length of the rest of the body.
handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
	DataSize = byte_size(Data),
	if
		%% More data coming.
		DataSize < Length ->
			{state, send_data_if_alive(Data,
				State#http_state{in={body, Length - DataSize}},
				nofin)};
		%% Stream finished, no rest.
		DataSize =:= Length ->
			State1 = send_data_if_alive(Data, State, fin),
			case Conn of
				keepalive -> {state, end_stream(State1)};
				close -> [{state, end_stream(State1)}, close]
			end;
		%% Stream finished, rest.
		true ->
			<< Body:Length/binary, Rest/bits >> = Data,
			State1 = send_data_if_alive(Body, State, fin),
			case Conn of
				keepalive -> handle(Rest, end_stream(State1));
				close -> [{state, end_stream(State1)}, close]
			end
	end.

%% Finish processing a chunked body once its last chunk was decoded.
%%
%% Data is the final decoded data (possibly empty) and Rest is what
%% follows the last chunk. With trailers the stream stays open and Rest
%% is handled as trailers; without trailers the stream ends and, depending
%% on the connection mode, Rest is handled as the next response or the
%% connection is closed.
chunked_body_done(Data, trailers, Rest, State0) ->
	State = send_data_if_alive(Data, State0, nofin),
	handle(Rest, State#http_state{buffer = <<>>, in=body_trailer});
chunked_body_done(Data, no_trailers, Rest, State0=#http_state{connection=Conn}) ->
	State = send_data_if_alive(Data, State0, fin),
	case Conn of
		keepalive ->
			handle(Rest, end_stream(State#http_state{buffer= <<>>}));
		close ->
			[{state, end_stream(State)}, close]
	end.

%% Parse the response head (status line and headers) found at the start
%% of Data and act on it for the stream at the head of the streams list.
%%
%% Depending on the status and on the kind of stream this completes a
%% Websocket handshake, completes a CONNECT request (possibly upgrading
%% the transport to TLS and/or switching protocol), forwards an
%% informational response, or forwards a normal response and continues
%% with whatever data follows the head.
handle_head(Data, State=#http_state{socket=Socket, transport=Transport,
		version=ClientVersion, content_handlers=Handlers0, connection=Conn,
		streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo,
			method=Method, is_alive=IsAlive}|Tail]}) ->
	{Version, Status, _, Rest} = cow_http:parse_status_line(Data),
	{Headers, Rest2} = cow_http:parse_headers(Rest),
	case {Status, StreamRef} of
		%% 101 response to a Websocket upgrade request.
		{101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} ->
			ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts);
		%% 2xx response to a CONNECT request.
		{_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 ->
			case IsAlive of
				false ->
					ok;
				true ->
					ReplyTo ! {gun_response, self(), RealStreamRef,
						fin, Status, Headers},
					ok
			end,
			%% We expect there to be no additional data after the CONNECT response.
			<<>> = Rest2,
			State2 = end_stream(State#http_state{streams=[Stream|Tail]}),
			NewHost = maps:get(host, Destination),
			NewPort = maps:get(port, Destination),
			case Destination of
				%% TLS to the destination while the connection to the proxy
				%% is itself using gun_tls: go through gun_tls_proxy.
				#{transport := tls} when Transport =:= gun_tls ->
					TLSOpts = maps:get(tls_opts, Destination, []),
					TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity),
					{ok, ProxyPid} = gun_tls_proxy:start_link(NewHost, NewPort,
						TLSOpts, TLSTimeout, Socket, gun_tls),
					%% In this case the switch_protocol is delayed and is handled by
					%% a message sent from gun_tls_proxy once the connection is established,
					%% and handled by the gun module directly.
					[{state, State2#http_state{socket=ProxyPid, transport=gun_tls_proxy}},
						{origin, <<"https">>, NewHost, NewPort, connect},
						{switch_transport, gun_tls_proxy, ProxyPid}];
				%% TLS to the destination over a non-gun_tls connection:
				%% upgrade the existing socket directly.
				#{transport := tls} ->
					TLSOpts = maps:get(tls_opts, Destination, []),
					TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity),
					case gun_tls:connect(Socket, TLSOpts, TLSTimeout) of
						{ok, TLSSocket} ->
							%% Switch to HTTP/2 only when "h2" was negotiated.
							case ssl:negotiated_protocol(TLSSocket) of
								{ok, <<"h2">>} ->
									[{origin, <<"https">>, NewHost, NewPort, connect},
										{switch_transport, gun_tls, TLSSocket},
										{switch_protocol, gun_http2, State2}];
								_ ->
									[{state, State2#http_state{socket=TLSSocket, transport=gun_tls}},
										{origin, <<"https">>, NewHost, NewPort, connect},
										{switch_transport, gun_tls, TLSSocket}]
							end;
						%% The result of a failed TLS connect is returned as is.
						Error ->
							Error
					end;
				%% No TLS requested: the protocol comes from the destination's
				%% protocols option, which defaults to [http].
				_ ->
					case maps:get(protocols, Destination, [http]) of
						[http] ->
							[{state, State2},
								{origin, <<"http">>, NewHost, NewPort, connect}];
						[http2] ->
							[{origin, <<"http">>, NewHost, NewPort, connect},
								{switch_protocol, gun_http2, State2}]
					end
			end;
		%% Informational response: forward it and keep processing with an
		%% unchanged state; the stream is not ended.
		{_, _} when Status >= 100, Status =< 199 ->
			ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
			handle(Rest2, State);
		%% Any other response.
		_ ->
			In = response_io_from_headers(Method, Version, Status, Headers),
			%% 'head' means the next thing to read is another response head,
			%% in other words this response has no body.
			IsFin = case In of head -> fin; _ -> nofin end,
			%% Content handlers are only initialized when the stream is
			%% alive and a body is expected.
			Handlers = case IsAlive of
				false ->
					undefined;
				true ->
					ReplyTo ! {gun_response, self(), stream_ref(StreamRef),
						IsFin, Status, Headers},
					case IsFin of
						fin -> undefined;
						nofin ->
							gun_content_handler:init(ReplyTo, stream_ref(StreamRef),
								Status, Headers, Handlers0)
					end
			end,
			%% Once the connection is marked close it stays that way. HTTP/1.0
			%% on either side also results in close. Otherwise the response
			%% headers decide.
			Conn2 = if
				Conn =:= close -> close;
				Version =:= 'HTTP/1.0' -> close;
				ClientVersion =:= 'HTTP/1.0' -> close;
				true -> conn_from_headers(Version, Headers)
			end,
			%% We always reset in_state even if not chunked.
			if
				IsFin =:= fin, Conn2 =:= close ->
					close;
				IsFin =:= fin ->
					handle(Rest2, end_stream(State#http_state{in=In,
						in_state={0, 0}, connection=Conn2,
						streams=[Stream#stream{handler_state=Handlers}|Tail]}));
				true ->
					handle(Rest2, State#http_state{in=In,
						in_state={0, 0}, connection=Conn2,
						streams=[Stream#stream{handler_state=Handlers}|Tail]})
			end
	end.

%% Return the stream reference to use in messages: the wrapped reference
%% for CONNECT and Websocket streams, or the value itself otherwise.
stream_ref(Ref) ->
	case Ref of
		{connect, Wrapped, _} -> Wrapped;
		{websocket, Wrapped, _, _, _} -> Wrapped;
		_ -> Ref
	end.

%% Pass response body data to the content handlers of the current stream
%% (the head of the streams list) when that stream is alive, and return
%% the updated state. The state is returned unchanged when there is
%% nothing to send or when the current stream is not alive.

%% Empty data is only worth forwarding when it marks the end of the body.
send_data_if_alive(<<>>, State, nofin) ->
	State;
%% @todo What if we receive data when the HEAD method was used?
send_data_if_alive(Data, State=#http_state{streams=[
		Current=#stream{is_alive=true}|Others]}, IsFin) ->
	#stream{handler_state=HandlerState0} = Current,
	HandlerState = gun_content_handler:handle(IsFin, Data, HandlerState0),
	Updated = Current#stream{handler_state=HandlerState},
	State#http_state{streams=[Updated|Others]};
send_data_if_alive(_, State, _) ->
	State.

%% The connection went away. When the current response body is
%% delimited by the connection closing (in=body_close), the first
%% stream is terminated normally by flushing a final empty chunk;
%% every other stream receives an error.
%% @todo Use Reason.
close(_Reason, State=#http_state{in=In, streams=Streams}) ->
	case {In, Streams} of
		{body_close, [_Current|Pending]} ->
			_ = send_data_if_alive(<<>>, State, fin),
			close_streams(Pending);
		_ ->
			close_streams(Streams)
	end.

%% Notify the owner of every stream that is still alive that the
%% connection was lost. Canceled streams (is_alive=false) are skipped
%% silently since their owner asked not to receive further messages.
%%
%% CONNECT and Websocket streams are stored with a wrapped reference;
%% it is unwrapped with stream_ref/1 so the message carries the
%% reference the user was originally given, like every other message
%% sent for a stream.
close_streams([]) ->
	ok;
close_streams([#stream{is_alive=false}|Tail]) ->
	close_streams(Tail);
close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
	ReplyTo ! {gun_error, self(), stream_ref(StreamRef), {closed,
		"The connection was lost."}},
	close_streams(Tail).

%% Keep the connection alive by sending an empty line, which is only
%% possible in-between requests (out=head). Nothing is sent while a
%% CONNECT request is the only stream in progress.
keepalive(State=#http_state{socket=Socket, transport=Transport,
		out=Out, streams=Streams}) ->
	case {Streams, Out} of
		{[#stream{ref={connect, _, _}}], _} ->
			State;
		{_, head} ->
			Transport:send(Socket, <<"\r\n">>),
			State;
		_ ->
			State
	end.

%% Send the request line and headers of a request and register the
%% new stream. Any user-provided transfer-encoding header is dropped;
%% a chunked one is added back when the request has no content-length
%% and the connection speaks HTTP/1.1. A host header is added when the
%% user did not provide one. Only callable in-between requests (out=head).
headers(State=#http_state{socket=Socket, transport=Transport, version=Version,
		out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
	Stripped = lists:keydelete(<<"transfer-encoding">>, 1, Headers),
	HasHost = lists:keymember(<<"host">>, 1, Headers),
	WithHost = if
		HasHost -> Stripped;
		true -> [{<<"host">>, host_header(Transport, Host, Port)}|Stripped]
	end,
	%% The stripped list is the shortest one to search through.
	Conn = conn_from_headers(Version, Stripped),
	Out = request_io_from_headers(Stripped),
	Framed = case {Out, Version} of
		{body_chunked, 'HTTP/1.0'} -> WithHost;
		{body_chunked, _} -> [{<<"transfer-encoding">>, <<"chunked">>}|WithHost];
		_ -> WithHost
	end,
	Request = cow_http:request(Method, Path, Version,
		transform_header_names(State, Framed)),
	Transport:send(Socket, Request),
	new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method).

%% Send a complete request (request line, headers and body) in one go
%% and register the new stream. User-provided content-length and
%% transfer-encoding headers are discarded; the content-length is
%% computed from the body. A host header is added when missing.
%% Only callable in-between requests (out=head).
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
		out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) ->
	NoTE = lists:keydelete(<<"transfer-encoding">>, 1, Headers),
	Stripped = lists:keydelete(<<"content-length">>, 1, NoTE),
	HasHost = lists:keymember(<<"host">>, 1, Headers),
	WithHost = if
		HasHost -> Stripped;
		true -> [{<<"host">>, host_header(Transport, Host, Port)}|Stripped]
	end,
	Transformed = transform_header_names(State, WithHost),
	%% The stripped list is the shortest one to search through.
	Conn = conn_from_headers(Version, Stripped),
	%% The content-length header is added after the name transformation.
	ContentLength = {<<"content-length">>, integer_to_binary(iolist_size(Body))},
	Head = cow_http:request(Method, Path, Version, [ContentLength|Transformed]),
	Transport:send(Socket, [Head, Body]),
	new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method).

%% Build the value of the host header as iodata. The port is omitted
%% when it is the default one for the transport (80 for tcp, 443 for
%% tls). Local sockets produce an empty host.
host_header(Transport, Host0, Port) ->
	Host = case Host0 of
		{local, _} -> <<>>;
		_ when is_tuple(Host0) -> inet:ntoa(Host0);
		_ when is_atom(Host0) -> atom_to_list(Host0);
		_ -> Host0
	end,
	Name = Transport:name(),
	if
		Name =:= tcp, Port =:= 80 -> Host;
		Name =:= tls, Port =:= 443 -> Host;
		true -> [Host, $:, integer_to_binary(Port)]
	end.

%% Apply the configured transform_header_name fun to the name (first
%% element) of every header, leaving the rest of each tuple untouched.
transform_header_names(#http_state{transform_header_name = Transform}, Headers) ->
	lists:map(fun(Header) ->
		setelement(1, Header, Transform(element(1, Header)))
	end, Headers).

%% Send a piece of request body for the stream StreamRef.
%%
%% IsFin is fin for the last piece of the body and nofin otherwise.
%% The updated state is returned; errors are reported to ReplyTo as
%% gun_error messages and leave the state unchanged.

%% We are expecting a new stream: out=head means no request body
%% is currently being sent.
data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _) ->
	error_stream_closed(State, StreamRef, ReplyTo);
%% There are no active streams.
data(State=#http_state{streams=[]}, StreamRef, ReplyTo, _, _) ->
	error_stream_not_found(State, StreamRef, ReplyTo);
%% We can only send data on the last created stream.
data(State=#http_state{socket=Socket, transport=Transport, version=Version,
		out=Out, streams=Streams}, StreamRef, ReplyTo, IsFin, Data) ->
	%% new_stream/4 appends to the list, so the last element is the
	%% most recently created stream.
	case lists:last(Streams) of
		#stream{ref=StreamRef, is_alive=true} ->
			DataLength = iolist_size(Data),
			case Out of
				%% Final piece of a chunked body: terminate with the last
				%% chunk. An empty binary is not wrapped in a chunk of its own.
				body_chunked when Version =:= 'HTTP/1.1', IsFin =:= fin ->
					case Data of
						<<>> ->
							Transport:send(Socket, cow_http_te:last_chunk());
						_ ->
							Transport:send(Socket, [
								cow_http_te:chunk(Data),
								cow_http_te:last_chunk()
							])
					end,
					%% The request is complete; a new one may be sent.
					State#http_state{out=head};
				body_chunked when Version =:= 'HTTP/1.1' ->
					Transport:send(Socket, cow_http_te:chunk(Data)),
					State;
				%% Body with a known content-length: Length is what remains
				%% to be sent. NOTE(review): there is no clause for data
				%% larger than Length; that case fails with case_clause.
				{body, Length} when DataLength =< Length ->
					Transport:send(Socket, Data),
					Length2 = Length - DataLength,
					%% Only the two consistent combinations are accepted;
					%% anything else (fin with bytes remaining, or nofin
					%% with none remaining) fails with if_clause after
					%% the data has already been sent.
					if
						Length2 =:= 0, IsFin =:= fin ->
							State#http_state{out=head};
						Length2 > 0, IsFin =:= nofin ->
							State#http_state{out={body, Length2}}
					end;
				%% HTTP/1.0 has no chunked encoding: data is sent as-is
				%% and the state is left unchanged, even on fin.
				body_chunked -> %% HTTP/1.0
					Transport:send(Socket, Data),
					State
			end;
		_ ->
			error_stream_not_found(State, StreamRef, ReplyTo)
	end.

%% Send a CONNECT request to establish a tunnel through the proxy
%% this connection is attached to.
%%
%% The request is refused with a badstate error when other streams
%% are active. Destination is a map that must contain the host; the
%% port defaults to 1080. When both username and password are given
%% and the user did not set a proxy-authorization header, Basic
%% credentials are added. User-provided content-length and
%% transfer-encoding headers are dropped.
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] ->
	ReplyTo ! {gun_error, self(), StreamRef, {badstate,
		"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
	State;
connect(State=#http_state{socket=Socket, transport=Transport, version=Version},
		StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0) ->
	%% The host ends up in an iolist and must therefore be iodata.
	%% IP address tuples and atom hostnames are converted the same
	%% way host_header/3 does it.
	Host = case Host0 of
		Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
		Atom when is_atom(Atom) -> atom_to_list(Atom);
		_ -> Host0
	end,
	Port = maps:get(port, Destination, 1080),
	Authority = [Host, $:, integer_to_binary(Port)],
	Headers1 = lists:keydelete(<<"content-length">>, 1,
		lists:keydelete(<<"transfer-encoding">>, 1, Headers0)),
	Headers2 = case lists:keymember(<<"host">>, 1, Headers1) of
		false -> [{<<"host">>, Authority}|Headers1];
		true -> Headers1
	end,
	%% Credentials from the destination never override a header
	%% explicitly provided by the user.
	HasProxyAuthorization = lists:keymember(<<"proxy-authorization">>, 1, Headers2),
	Headers3 = case {HasProxyAuthorization, Destination} of
		{false, #{username := UserID, password := Password}} ->
			[{<<"proxy-authorization">>, [
					<<"Basic ">>,
					base64:encode(iolist_to_binary([UserID, $:, Password]))]}
				|Headers2];
		_ ->
			Headers2
	end,
	Headers = transform_header_names(State, Headers3),
	Transport:send(Socket, [
		cow_http:request(<<"CONNECT">>, Authority, Version, Headers)
	]),
	%% The stream reference is wrapped so that the response handling
	%% code can recognize the CONNECT stream and its destination.
	new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>).

%% HTTP/1.x has no way to cancel a request. All that can be done is
%% to stop forwarding the messages of that stream to its owner.
%% An unknown stream results in an error sent to ReplyTo.
cancel(State, StreamRef, ReplyTo) ->
	IsKnown = is_stream(State, StreamRef),
	if
		IsKnown -> cancel_stream(State, StreamRef);
		true -> error_stream_not_found(State, StreamRef, ReplyTo)
	end.

%% Return information about the stream StreamRef as {ok, Info}, where
%% Info is a map with the reference, the process receiving messages
%% and the state (running, or stopping once canceled). Info is
%% undefined when no stream has that reference.
stream_info(#http_state{streams=Streams}, StreamRef) ->
	case lists:keyfind(StreamRef, #stream.ref, Streams) of
		false ->
			{ok, undefined};
		#stream{reply_to=ReplyTo, is_alive=IsAlive} ->
			StreamState = case IsAlive of
				true -> running;
				false -> stopping
			end,
			{ok, #{
				ref => StreamRef,
				reply_to => ReplyTo,
				state => StreamState
			}}
	end.

%% HTTP does not provide any way to figure out what streams are
%% unprocessed, so every stream is reported as killed and none as
%% unprocessed. References are unwrapped to their user-facing form.
down(#http_state{streams=Streams}) ->
	Killed = [stream_ref(Ref) || #stream{ref=Ref} <- Streams],
	{Killed, []}.

%% Tell ReplyTo that the stream no longer accepts data and return
%% the state unchanged.
error_stream_closed(State, StreamRef, ReplyTo) ->
	Reason = {badstate, "The stream has already been closed."},
	ReplyTo ! {gun_error, self(), StreamRef, Reason},
	State.

%% Tell ReplyTo that the stream is unknown and return the state
%% unchanged.
error_stream_not_found(State, StreamRef, ReplyTo) ->
	Reason = {badstate, "The stream cannot be found."},
	ReplyTo ! {gun_error, self(), StreamRef, Reason},
	State.

%% Headers information retrieval.

%% Decide from the connection header whether the connection is kept
%% open (keepalive) or must be closed (close). Without the header the
%% default depends on the version: close for HTTP/1.0, keepalive
%% otherwise. With the header, keepalive requires the keep-alive token.
conn_from_headers(Version, Headers) ->
	case {lists:keyfind(<<"connection">>, 1, Headers), Version} of
		{false, 'HTTP/1.0'} ->
			close;
		{false, _} ->
			keepalive;
		{{_, ConnHd}, _} ->
			Tokens = cow_http_hd:parse_connection(ConnHd),
			case lists:member(<<"keep-alive">>, Tokens) of
				true -> keepalive;
				false -> close
			end
	end.

%% Determine how the request body will be sent: head when the
%% content-length is zero (no body), {body, Length} for a known
%% length, and body_chunked when there is no content-length header.
request_io_from_headers(Headers) ->
	ContentLength = lists:keyfind(<<"content-length">>, 1, Headers),
	case ContentLength of
		false ->
			body_chunked;
		{_, <<"0">>} ->
			head;
		{_, Value} ->
			{body, cow_http_hd:parse_content_length(Value)};
		_ ->
			body_chunked
	end.

%% Determine how the response body is delimited.
%%
%% Responses to HEAD requests and 204/304 responses have no body
%% (head). Otherwise, for HTTP/1.1, the transfer-encoding header takes
%% precedence: chunked gives body_chunked and identity gives
%% body_close. Failing that the content-length decides: zero means no
%% body, any other value gives {body, Length}, and no header at all
%% means the body ends when the connection closes (body_close).
response_io_from_headers(<<"HEAD">>, _, _, _) ->
	head;
response_io_from_headers(_, _, Status, _) when Status =:= 204; Status =:= 304 ->
	head;
response_io_from_headers(_, Version, _Status, Headers) ->
	case {Version, lists:keyfind(<<"transfer-encoding">>, 1, Headers)} of
		{'HTTP/1.1', {_, TE}} ->
			case cow_http_hd:parse_transfer_encoding(TE) of
				[<<"chunked">>] -> body_chunked;
				[<<"identity">>] -> body_close
			end;
		_ ->
			case lists:keyfind(<<"content-length">>, 1, Headers) of
				{_, <<"0">>} ->
					head;
				{_, Value} ->
					{body, cow_http_hd:parse_content_length(Value)};
				_ ->
					body_close
			end
	end.

%% Streams.

%% Register a new live stream at the end of the stream list, so that
%% streams stay ordered from oldest to most recent. The method is
%% stored as a binary.
new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method) ->
	Stream = #stream{ref=StreamRef, reply_to=ReplyTo,
		method=iolist_to_binary(Method), is_alive=true},
	State#http_state{streams=Streams ++ [Stream]}.

%% Tell whether a stream stored under exactly this reference exists.
is_stream(#http_state{streams=Streams}, StreamRef) ->
	lists:keyfind(StreamRef, #stream.ref, Streams) =/= false.

%% Mark the stream(s) stored under StreamRef as no longer alive. The
%% stream stays in the list because its response still has to be read
%% from the socket; only the forwarding of messages stops.
cancel_stream(State=#http_state{streams=Streams}, StreamRef) ->
	Updated = [if
		Ref =:= StreamRef -> Stream#stream{is_alive=false};
		true -> Stream
	end || Stream = #stream{ref=Ref} <- Streams],
	State#http_state{streams=Updated}.

%% Drop the current (first) stream and go back to expecting the head
%% of the next response.
end_stream(State=#http_state{streams=Streams=[_|_]}) ->
	State#http_state{in=head, streams=tl(Streams)}.

%% Websocket upgrade.

%% Send the HTTP/1.1 request asking the server to upgrade the
%% connection to Websocket, and register the corresponding stream.
%%
%% WsOpts may enable the permessage-deflate extension (compress) and
%% list the subprotocols to offer (protocols, a list of
%% {Name, Handler} tuples). A host header is added when the user did
%% not provide one. The stream reference is wrapped together with the
%% key, the requested extensions and the options so that the response
%% can be validated later. Messages for this stream go to the owner.

%% Ensure version is 1.1.
ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) ->
	error; %% @todo
ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, out=head},
		StreamRef, Host, Port, Path, Headers0, WsOpts) ->
	%% GunExtensions records what was requested so that the server
	%% cannot enable an extension we did not ask for.
	{Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of
		true -> {[{<<"sec-websocket-extensions">>,
				<<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>}
			|Headers0],
			[<<"permessage-deflate">>]};
		false -> {Headers0, []}
	end,
	Headers2 = case maps:get(protocols, WsOpts, []) of
		[] -> Headers1;
		ProtoOpt ->
			%% Join the names with ", " and strip the leading separator.
			<< _, _, Proto/bits >> = iolist_to_binary([[<<", ">>, P] || {P, _} <- ProtoOpt]),
			[{<<"sec-websocket-protocol">>, Proto}|Headers1]
	end,
	Key = cow_ws:key(),
	Headers3 = [
		{<<"connection">>, <<"upgrade">>},
		{<<"upgrade">>, <<"websocket">>},
		{<<"sec-websocket-version">>, <<"13">>},
		{<<"sec-websocket-key">>, Key}
		|Headers2
	],
	Headers4 = case lists:keymember(<<"host">>, 1, Headers0) of
		true -> Headers3;
		false -> [{<<"host">>, host_header(Transport, Host, Port)}|Headers3]
	end,
	%% Honor the transform_header_name option like all other requests do.
	Headers = transform_header_names(State, Headers4),
	Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers)),
	new_stream(State#http_state{connection=keepalive, out=head},
		{websocket, StreamRef, Key, GunExtensions, WsOpts}, Owner, <<"GET">>).

%% First step of the validation of the Websocket upgrade response:
%% the sec-websocket-accept header must be present and match the
%% value derived from the key we sent. Returns close on failure.
ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) ->
	%% @todo check upgrade, connection
	case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of
		false ->
			close;
		{_, Accept} ->
			Expected = cow_ws:encode_key(Key),
			if
				Accept =:= Expected ->
					ws_handshake_extensions(Buffer, State, StreamRef,
						Headers, GunExtensions, Opts);
				true ->
					close
			end
	end.

%% Second step: validate the extensions selected by the server, if
%% any, then move on to the subprotocol. Without the header no
%% extension is enabled. Returns close when validation fails.
ws_handshake_extensions(Buffer, State, StreamRef, Headers, GunExtensions, Opts) ->
	case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
		false ->
			ws_handshake_protocols(Buffer, State, StreamRef, Headers, #{}, Opts);
		{_, ExtHd} ->
			Selected = cow_http_hd:parse_sec_websocket_extensions(ExtHd),
			case ws_validate_extensions(Selected, GunExtensions, #{}, Opts) of
				close ->
					close;
				Negotiated ->
					ws_handshake_protocols(Buffer, State, StreamRef, Headers, Negotiated, Opts)
			end
	end.

%% Walk the extensions selected by the server and accumulate the
%% negotiated parameters. Returns the accumulator on success and
%% close when an extension is unknown, was not requested, or has
%% invalid parameters.
ws_validate_extensions(Selected, GunExts, Acc, Opts) ->
	case Selected of
		[] ->
			Acc;
		[{Name = <<"permessage-deflate">>, Params}|Rest] ->
			case lists:member(Name, GunExts) of
				%% Fail the connection if extension was not requested.
				false ->
					close;
				true ->
					case cow_ws:validate_permessage_deflate(Params, Acc, Opts) of
						error -> close;
						{ok, Acc2} -> ws_validate_extensions(Rest, GunExts, Acc2, Opts)
					end
			end;
		%% Fail the connection on unknown extension.
		_ ->
			close
	end.

%% Third step: pick the handler module. Without a subprotocol in the
%% response the default_protocol option is used (gun_ws_h when not
%% set). Otherwise the subprotocol must be one of the configured
%% protocols; close is returned when it is not.
%% @todo Validate protocols.
ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) ->
	Selection = case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of
		false ->
			{ok, maps:get(default_protocol, Opts, gun_ws_h)};
		{_, Proto} ->
			Configured = maps:get(protocols, Opts, []),
			case lists:keyfind(Proto, 1, Configured) of
				{_, Mod} -> {ok, Mod};
				false -> close
			end
	end,
	case Selection of
		{ok, Handler} ->
			ws_handshake_end(Buffer, State, StreamRef,
				Headers, Extensions, Handler, Opts);
		close ->
			close
	end.

%% Final step: hand the connection over to the Websocket protocol
%% module. Data received after the response head is re-sent to
%% ourselves as a regular socket data message so it is not lost.
ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport},
		StreamRef, Headers, Extensions, Handler, Opts) ->
	if
		Buffer =:= <<>> ->
			ok;
		true ->
			%% The first element is the tag of socket data messages.
			{DataTag, _, _} = Transport:messages(),
			self() ! {DataTag, Socket, Buffer},
			ok
	end,
	gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts).