aboutsummaryrefslogblamecommitdiffstats
path: root/src/cowboy_http.erl
blob: b1c8dbe73707ebf2e4bd0da68a8c0bc31c17031b (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
                                                             














                                                                           



                                                                      
                  




                                




                                               
                                    
                           
                                                  











                                                     







                                            
                                                      



                                                                                 
                                                

   



                                                                                 

                  

                                                





                                                           
                                                   

                                               

                                       

                                                     

                                                                    



                                                                         
                           
                                              









                                
 


                                                                    





                                                                    












                                                                                        
                                                              



                                                           
                                             

                                   
                                                 
                                                                       




                                              

                                                                             














                                                        
                                                                          
                                                       
                                                                      
                                                               










                                                                                                       
            
 





                                                                                      
                                                                               

                                                                          
                                                   
                                                                       
               



                                                                 

                                     





                                                                                         




                                                                                                         


                                                                             





                                               
                                                                                               








                                                                                                   

                                                                                              


                                            
                                                                                         
                                           
                                  


                                                                                                          







                                                           



                                                                    

                                    
                                               





                                                                                  
                                   
                                                                      

                                                             
                                                                




                                                                      
 

                                 




                                                                              













                                                                                                
                                               
 

                                                                   

                                                                       
                                          

                                                                          
                                                                                 
                                                                                  


                                                                                               
                                                    
                                                                          
                                
                                                             
                                              
                                                                          


                                                                              


                                                                             
                                                                 
                                                                                                  
                                                                      



                                                                                                 
                                
                                                             
                                                              
                                                                          

                                                                                  
            

                                                                         






                                              




                                                                                        


                                                                      
                                                                                                  

                                                                      
                                                                                               

                                                                          
                                                                                     




                                                                                     
                                                                                                                 



                                                                                                       
                                                                                                            






                                                                                                         
                                                                                                            


                                                                                                                     


                                                                                                                   


                                                                                                   
                                                                     


















                                                                                         
                                                                                                    








                                                                                                       
                                                 


                                                                             
                                                 
                                                
                                                               



                                                                                         










                                                                                                                     
                 








                                                                                                             








                                                                                                           

            
                                                                    

                                                                                     
                                                                                                     



                                                                                        

            
                                                           

                                                                                     
                                                                                                     


                                                                               

            
                                                         

                                                                                     
                                                                                                     

                                                               

            




                                                                                            

                                                                                      
                                                                              
                                                                      
                                                                                                          
                                      
                                                                      
                                                            
 
                                                   
                                                           
                                                                       








                                                                                  
                                                      
                                        










                                                                                                                                  

                                                             


                                                                                                                          














                                                                                            


                                                                                    




                                                                                                             


                                                     







                                                                            


                                                                                                                           





                                                                                              



                                                                      

                                                                                                                


                                                                                    





































                                                                                           
                                                                                          


                                                                                             


                                                                                                        

                                                                                                



























                                                                                                                                  





                                          

                          


                                                                                            
                                        



                                           














                                                                                                                            
                                                     
                                                                   



                                                                           
                                                                                                           

                                                                                                         
                            
                                                                                                 
                    
                                                                  




                                     
                             

                             


                                 

                             




                                                                            

                                         
          
                                                  
                        

                                               
                                                                       
                                                                    

                                                                                    
                                           
                                        
                                                                                                           

                                                      
                                        



                                                                                           
            
 

                  
                                                          


                                                                      


                                                                                               

                                                                  
                                                              








                                             
                                                        
                                                                                      
                                                                       

                                  
                                                  

                                                                               




                                                                                                     
                                   
                                                                                      
                                                                                           



                                                                                     

                                                              
                                                  

                                                                               

                                                                              
                                                                                                                          
            
 


                                                               

                                                                           
                                 
                                       





                                                                           

                                                                               


                                                                           

                                                                               


                                                                           

                                                                               
                                             
                                                                
                                                                                                           
                                                   
                                                                
                                                                                                          



                                                                            



                    
                                                              


                                                     
                                                       

                                               
                                                                            

                                           

                                                                                                


                             
                                                                 

                                                             
                                                                              



                                                                                                  
                                                
                                                                             
                                                                      
                                                                                          

                                                                                                  

                            

                                                                                           


                             





                                                                                     

                                                                                             




















                                                                                      
                                                                   








                                                                                                    

                                                              














                                                                                           




                                                                              
                                                                                                      





                                                                                
                                                                                       


                                                         
                                                                                   

                                                                 
                                                                             
            


                                                                                                 
                                                                   




                                                                                          
                                                                 
                                             
                                                                                                               








                                                                                                  
            
                                                      



                                                                           
                                                                                                    
                                        


                                                            
                                                                             

                                                                                                 


                                                                   

                                                                

                                                                                            

                                                                   
                                           
                                                                  


                                                                                

                                


                                                                                                    



                                                                   





                                                                                              











                                                                                                                    
                           
            



                                                    




                                                                                               











                                                                         
               
                                                                     
                                                                 
                                                                              















                                                                       
                                  






                                                                               

                                                                                  
                                                                            


                                                                                                   
                                       






                                                                           




                                                                       

                                                                                  
                                                                             
                                                                                          
                                            
                      

                                                                                             
                                   



                                                                                    






                                                                 



                                                                         









                                                                            
 












                                                                 
 
                                                                               
                                                                         
                                                                                         
                                          

                                                  
                                                                    

                                                              


                                                                   
                                   

                                                                             
 


                                                                                   

                                                                                            
                                                            

                                                                                                          

                                                                                                          
                       
                                                                           
                                                      
                                                                  

                                                                   
                                                      
                              
            




                                                                
                                                                     
                                                                 

                                                                       

                                          
            


                                                                        
                                                      

                                                                          

                                                                  
                                                                                                             

                                                                                                      














                                                                                                                 

            
                                                                          
           
                                                                      
                                
                                                                  
                                                        
                                                                         

            







































                                                                                                     
                          

                         
                                      
                    
                                






                                                                                  
                                                                     
                                                                        
                                                                                              
                                        



                                        













                                                                          
                                                                                               

                                 


                                                                
                                                                   


                                                                                      



                                                                                                     

                                                                                            


                                        
                                                                    
                                                                   
                                                                          


                                                                                     
                                                                
            
           
 
                                     

                                 
                                                                      
                                                      
                                            
                                
                                 
 
                                  
           


                                                                                       
 





































                                                                                                       

                    
                                                        


                                         

                                                                         
                                                                                  



                                                                                      
%% Copyright (c) 2016-2017, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(cowboy_http).

-ifdef(OTP_RELEASE).
-compile({nowarn_deprecated_function, [{erlang, get_stacktrace, 0}]}).
-endif.

-export([init/5]).

-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).

-type opts() :: #{
	connection_type => worker | supervisor,
	env => cowboy_middleware:env(),
	idle_timeout => timeout(),
	inactivity_timeout => timeout(),
	linger_timeout => timeout(),
	logger => module(),
	max_authority_length => non_neg_integer(),
	max_empty_lines => non_neg_integer(),
	max_header_name_length => non_neg_integer(),
	max_header_value_length => non_neg_integer(),
	max_headers => non_neg_integer(),
	max_keepalive => non_neg_integer(),
	max_method_length => non_neg_integer(),
	max_request_line_length => non_neg_integer(),
	middlewares => [module()],
	request_timeout => timeout(),
	shutdown_timeout => timeout(),
	stream_handlers => [module()]
}.
-export_type([opts/0]).

-record(ps_request_line, {
	empty_lines = 0 :: non_neg_integer()
}).

-record(ps_header, {
	method = undefined :: binary(),
	authority = undefined :: binary() | undefined,
	path = undefined :: binary(),
	qs = undefined :: binary(),
	version = undefined :: cowboy:http_version(),
	headers = undefined :: map() | undefined, %% @todo better type than map()
	name = undefined :: binary() | undefined
}).

%% @todo We need a state where we wait for the stream process to ask for the body
%% and do not attempt to read from the socket while in that state (we should read
%% up to a certain length, and then wait, basically implementing flow control but
%% by not reading from the socket when the window is empty).

-record(ps_body, {
	length :: non_neg_integer() | undefined,
	received = 0 :: non_neg_integer(),
	%% @todo flow
	transfer_decode_fun :: fun(), %% @todo better type
	transfer_decode_state :: any() %% @todo better type
}).

-record(stream, {
	id = undefined :: cowboy_stream:streamid(),
	%% Stream handlers and their state.
	state = undefined :: {module(), any()},
	%% Request method.
	method = undefined :: binary(),
	%% Client HTTP version for this stream.
	version = undefined :: cowboy:http_version(),
	%% Unparsed te header. Used to know if we can send trailers.
	te :: undefined | binary(),
	%% Expected body size.
	local_expected_size = undefined :: undefined | non_neg_integer(),
	%% Sent body size.
	local_sent_size = 0 :: non_neg_integer(),
	%% Commands queued.
	queue = [] :: cowboy_stream:commands()
}).

-type stream() :: #stream{}.

-record(state, {
	parent :: pid(),
	ref :: ranch:ref(),
	socket :: inet:socket(),
	transport :: module(),
	opts = #{} :: map(),

	%% Remote address and port for the connection.
	peer = undefined :: {inet:ip_address(), inet:port_number()},

	%% Local address and port for the connection.
	sock = undefined :: {inet:ip_address(), inet:port_number()},

	%% Client certificate (TLS only).
	cert :: undefined | binary(),

	timer = undefined :: undefined | reference(),

	%% Identifier for the stream currently being read (or waiting to be received).
	in_streamid = 1 :: pos_integer(),

	%% Parsing state for the current stream or stream-to-be.
	in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},

	%% Identifier for the stream currently being written.
	%% Note that out_streamid =< in_streamid.
	out_streamid = 1 :: pos_integer(),

	%% Whether we finished writing data for the current stream.
	out_state = wait :: wait | chunked | streaming | done,

	%% The connection will be closed after this stream.
	last_streamid = undefined :: pos_integer(),

	%% Currently active HTTP/1.1 streams.
	streams = [] :: [stream()],

	%% Children processes created by streams.
	children = cowboy_children:init() :: cowboy_children:children()
}).

-include_lib("cowlib/include/cow_inline.hrl").
-include_lib("cowlib/include/cow_parse.hrl").

-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
init(Parent, Ref, Socket, Transport, Opts) ->
	Peer0 = Transport:peername(Socket),
	Sock0 = Transport:sockname(Socket),
	Cert1 = case Transport:name() of
		ssl ->
			case ssl:peercert(Socket) of
				{error, no_peercert} ->
					{ok, undefined};
				Cert0 ->
					Cert0
			end;
		_ ->
			{ok, undefined}
	end,
	case {Peer0, Sock0, Cert1} of
		{{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
			LastStreamID = maps:get(max_keepalive, Opts, 100),
			before_loop(set_timeout(#state{
				parent=Parent, ref=Ref, socket=Socket,
				transport=Transport, opts=Opts,
				peer=Peer, sock=Sock, cert=Cert,
				last_streamid=LastStreamID}), <<>>);
		{{error, Reason}, _, _} ->
			terminate(undefined, {socket_error, Reason,
				'A socket error occurred when retrieving the peer name.'});
		{_, {error, Reason}, _} ->
			terminate(undefined, {socket_error, Reason,
				'A socket error occurred when retrieving the sock name.'});
		{_, _, {error, Reason}} ->
			terminate(undefined, {socket_error, Reason,
				'A socket error occurred when retrieving the client TLS certificate.'})
	end.

before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
	%% @todo disable this when we get to the body, until the stream asks for it?
	%% Perhaps have a threshold for how much we're willing to read before waiting.
	Transport:setopts(Socket, [{active, once}]),
	loop(State, Buffer).

loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
		timer=TimerRef, children=Children, in_streamid=InStreamID,
		last_streamid=LastStreamID, streams=Streams}, Buffer) ->
	{OK, Closed, Error} = Transport:messages(),
	InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
	receive
		%% Discard data coming in after the last request
		%% we want to process was received fully.
		{OK, Socket, _} when InStreamID > LastStreamID ->
			before_loop(State, Buffer);
		%% Socket messages.
		{OK, Socket, Data} ->
			%% Only reset the timeout if it is idle_timeout (active streams).
			State1 = case Streams of
				[] -> State;
				_ -> set_timeout(State)
			end,
			parse(<< Buffer/binary, Data/binary >>, State1);
		{Closed, Socket} ->
			terminate(State, {socket_error, closed, 'The socket has been closed.'});
		{Error, Socket, Reason} ->
			terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
		%% Timeouts.
		{timeout, Ref, {shutdown, Pid}} ->
			cowboy_children:shutdown_timeout(Children, Ref, Pid),
			loop(State, Buffer);
		{timeout, TimerRef, Reason} ->
			timeout(State, Reason);
		{timeout, _, _} ->
			loop(State, Buffer);
		%% System messages.
		{'EXIT', Parent, Reason} ->
			terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
		%% Messages pertaining to a stream.
		{{Pid, StreamID}, Msg} when Pid =:= self() ->
			loop(info(State, StreamID, Msg), Buffer);
		%% Exit signal from children.
		Msg = {'EXIT', Pid, _} ->
			loop(down(State, Pid, Msg), Buffer);
		%% Calls from supervisor module.
		{'$gen_call', From, Call} ->
			cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
			loop(State, Buffer);
		%% Unknown messages.
		Msg ->
			cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts),
			loop(State, Buffer)
	after InactivityTimeout ->
		terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
	end.

%% We set request_timeout when there are no active streams,
%% and idle_timeout otherwise.
set_timeout(State0=#state{opts=Opts, streams=Streams}) ->
	State = cancel_timeout(State0),
	{Name, Default} = case Streams of
		[] -> {request_timeout, 5000};
		_ -> {idle_timeout, 60000}
	end,
	TimerRef = case maps:get(Name, Opts, Default) of
		infinity -> undefined;
		Timeout -> erlang:start_timer(Timeout, self(), Name)
	end,
	State#state{timer=TimerRef}.

cancel_timeout(State=#state{timer=TimerRef}) ->
	ok = case TimerRef of
		undefined -> ok;
		_ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}])
	end,
	State#state{timer=undefined}.

-spec timeout(_, _) -> no_return().
timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
	terminate(State, {connection_error, timeout,
		'No request-line received before timeout.'});
timeout(State=#state{in_state=#ps_header{}}, request_timeout) ->
	error_terminate(408, State, {connection_error, timeout,
		'Request headers not received before timeout.'});
timeout(State, idle_timeout) ->
	terminate(State, {connection_error, timeout,
		'Connection idle longer than configuration allows.'}).

parse(<<>>, State) ->
	before_loop(State, <<>>);
%% Do not process requests that come in after the last request
%% and discard the buffer if any to save memory.
parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
		last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
	before_loop(State, <<>>);
parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
	after_parse(parse_request(Buffer, State, EmptyLines));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
	after_parse(parse_header(Buffer,
		State#state{in_state=PS#ps_header{headers=undefined}},
		Headers));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
	after_parse(parse_hd_before_value(Buffer,
		State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
		Headers, Name));
parse(Buffer, State=#state{in_state=#ps_body{}}) ->
	%% @todo We do not want to get the body automatically if the request doesn't ask for it.
	%% We may want to get bodies that are below a threshold without waiting, and buffer them
	%% until the request asks, though.
	after_parse(parse_body(Buffer, State)).

after_parse({request, Req=#{streamid := StreamID, method := Method,
		headers := Headers, version := Version},
		State0=#state{opts=Opts, streams=Streams0}, Buffer}) ->
	try cowboy_stream:init(StreamID, Req, Opts) of
		{Commands, StreamState} ->
			TE = maps:get(<<"te">>, Headers, undefined),
			Streams = [#stream{id=StreamID, state=StreamState,
				method=Method, version=Version, te=TE}|Streams0],
			State1 = case maybe_req_close(State0, Headers, Version) of
				close -> State0#state{streams=Streams, last_streamid=StreamID};
				keepalive -> State0#state{streams=Streams}
			end,
			State = set_timeout(State1),
			parse(Buffer, commands(State, StreamID, Commands))
	catch Class:Exception ->
		cowboy:log(cowboy_stream:make_error_log(init,
			[StreamID, Req, Opts],
			Class, Exception, erlang:get_stacktrace()), Opts),
		early_error(500, State0, {internal_error, {Class, Exception},
			'Unhandled exception in cowboy_stream:init/3.'}, Req),
		parse(Buffer, State0)
	end;
%% Streams are sequential so the body is always about the last stream created
%% unless that stream has terminated.
after_parse({data, StreamID, IsFin, Data, State=#state{opts=Opts,
		streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
	try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
		{Commands, StreamState} ->
			Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
				Stream#stream{state=StreamState}),
			parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
	catch Class:Exception ->
		cowboy:log(cowboy_stream:make_error_log(data,
			[StreamID, IsFin, Data, StreamState0],
			Class, Exception, erlang:get_stacktrace()), Opts),
		stream_reset(State, StreamID, {internal_error, {Class, Exception},
			'Unhandled exception in cowboy_stream:data/4.'})
	end;
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
after_parse({data, _, _, _, State, Buffer}) ->
	before_loop(State, Buffer);
after_parse({more, State, Buffer}) ->
	before_loop(State, Buffer).

%% Request-line.

-spec parse_request(Buffer, State, non_neg_integer())
	-> {request, cowboy_req:req(), State, Buffer}
	| {data, cowboy_stream:streamid(), cowboy_stream:fin(), binary(), State, Buffer}
	| {more, State, Buffer}
	when Buffer::binary(), State::#state{}.
%% Empty lines must be using \r\n.
parse_request(<< $\n, _/bits >>, State, _) ->
	error_terminate(400, State, {connection_error, protocol_error,
		'Empty lines between requests must use the CRLF line terminator. (RFC7230 3.5)'});
parse_request(<< $\s, _/bits >>, State, _) ->
	error_terminate(400, State, {connection_error, protocol_error,
		'The request-line must not begin with a space. (RFC7230 3.1.1, RFC7230 3.5)'});
%% We limit the length of the Request-line to MaxLength to avoid endlessly
%% reading from the socket and eventually crashing.
parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLines) ->
	MaxLength = maps:get(max_request_line_length, Opts, 8000),
	MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
	case match_eol(Buffer, 0) of
		nomatch when byte_size(Buffer) > MaxLength ->
			error_terminate(414, State, {connection_error, limit_reached,
				'The request-line length is larger than configuration allows. (RFC7230 3.1.1)'});
		nomatch ->
			{more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer};
		1 when EmptyLines =:= MaxEmptyLines ->
			error_terminate(400, State, {connection_error, limit_reached,
				'More empty lines were received than configuration allows. (RFC7230 3.5)'});
		1 ->
			<< _:16, Rest/bits >> = Buffer,
			parse_request(Rest, State, EmptyLines + 1);
		_ ->
			case Buffer of
				%% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
				<< "OPTIONS * ", Rest/bits >> ->
					parse_version(Rest, State, <<"OPTIONS">>, undefined, <<"*">>, <<>>);
				<<"CONNECT ", _/bits>> ->
					error_terminate(501, State, {connection_error, no_error,
						'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'});
				<<"TRACE ", _/bits>> ->
					error_terminate(501, State, {connection_error, no_error,
						'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
				%% Accept direct HTTP/2 only at the beginning of the connection.
				<< "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
					%% @todo Might be worth throwing to get a clean stacktrace.
					http2_upgrade(State, Buffer);
				_ ->
					parse_method(Buffer, State, <<>>,
						maps:get(max_method_length, Opts, 32))
			end
	end.

match_eol(<< $\n, _/bits >>, N) ->
	N;
match_eol(<< _, Rest/bits >>, N) ->
	match_eol(Rest, N + 1);
match_eol(_, _) ->
	nomatch.

parse_method(_, State, _, 0) ->
	error_terminate(501, State, {connection_error, limit_reached,
		'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
	case C of
		$\r -> error_terminate(400, State, {connection_error, protocol_error,
			'The method name must not be followed with a line break. (RFC7230 3.1.1)'});
		$\s -> parse_uri(Rest, State, SoFar);
		_ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
		_ -> error_terminate(400, State, {connection_error, protocol_error,
			'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
	end.

parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method)
		when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
			P =:= $p orelse P =:= $P ->
	parse_uri_authority(Rest, State, Method);
parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method)
		when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
			P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S ->
	parse_uri_authority(Rest, State, Method);
parse_uri(<< $/, Rest/bits >>, State, Method) ->
	parse_uri_path(Rest, State, Method, undefined, <<$/>>);
parse_uri(_, State, _) ->
	error_terminate(400, State, {connection_error, protocol_error,
		'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).

%% @todo We probably want to apply max_authority_length also
%% to the host header and to document this option. It might
%% also be useful for HTTP/2 requests.
parse_uri_authority(Rest, State=#state{opts=Opts}, Method) ->
	parse_uri_authority(Rest, State, Method, <<>>,
		maps:get(max_authority_length, Opts, 255)).

parse_uri_authority(_, State, _, _, 0) ->
	error_terminate(414, State, {connection_error, limit_reached,
		'The authority component of the absolute URI is longer than configuration allows. (RFC7230 2.7.1)'});
parse_uri_authority(<<C, Rest/bits>>, State, Method, SoFar, Remaining) ->
	case C of
		$\r ->
			error_terminate(400, State, {connection_error, protocol_error,
				'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
		$@ ->
			error_terminate(400, State, {connection_error, protocol_error,
				'Absolute URIs must not include a userinfo component. (RFC7230 2.7.1)'});
		C when SoFar =:= <<>> andalso
				((C =:= $/) orelse (C =:= $\s) orelse (C =:= $?) orelse (C =:= $#)) ->
			error_terminate(400, State, {connection_error, protocol_error,
				'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
		$: when SoFar =:= <<>> ->
			error_terminate(400, State, {connection_error, protocol_error,
				'Absolute URIs must include a non-empty host component. (RFC7230 2.7.1)'});
		$/ -> parse_uri_path(Rest, State, Method, SoFar, <<"/">>);
		$\s -> parse_version(Rest, State, Method, SoFar, <<"/">>, <<>>);
		$? -> parse_uri_query(Rest, State, Method, SoFar, <<"/">>, <<>>);
		$# -> skip_uri_fragment(Rest, State, Method, SoFar, <<"/">>, <<>>);
		C -> parse_uri_authority(Rest, State, Method, <<SoFar/binary, C>>, Remaining - 1)
	end.

parse_uri_path(<<C, Rest/bits>>, State, Method, Authority, SoFar) ->
	case C of
		$\r -> error_terminate(400, State, {connection_error, protocol_error,
			'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
		$\s -> parse_version(Rest, State, Method, Authority, SoFar, <<>>);
		$? -> parse_uri_query(Rest, State, Method, Authority, SoFar, <<>>);
		$# -> skip_uri_fragment(Rest, State, Method, Authority, SoFar, <<>>);
		_ -> parse_uri_path(Rest, State, Method, Authority, <<SoFar/binary, C>>)
	end.

parse_uri_query(<<C, Rest/bits>>, State, M, A, P, SoFar) ->
	case C of
		$\r -> error_terminate(400, State, {connection_error, protocol_error,
			'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
		$\s -> parse_version(Rest, State, M, A, P, SoFar);
		$# -> skip_uri_fragment(Rest, State, M, A, P, SoFar);
		_ -> parse_uri_query(Rest, State, M, A, P, <<SoFar/binary, C>>)
	end.

skip_uri_fragment(<<C, Rest/bits>>, State, M, A, P, Q) ->
	case C of
		$\r -> error_terminate(400, State, {connection_error, protocol_error,
			'The request-target must not be followed by a line break. (RFC7230 3.1.1)'});
		$\s -> parse_version(Rest, State, M, A, P, Q);
		_ -> skip_uri_fragment(Rest, State, M, A, P, Q)
	end.

parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, A, P, Q) ->
	before_parse_headers(Rest, State, M, A, P, Q, 'HTTP/1.1');
parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, A, P, Q) ->
	before_parse_headers(Rest, State, M, A, P, Q, 'HTTP/1.0');
parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _, _) when C =:= $\s; C =:= $\t ->
	error_terminate(400, State, {connection_error, protocol_error,
		'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
parse_version(<< C, _/bits >>, State, _, _, _, _) when C =:= $\s; C =:= $\t ->
	error_terminate(400, State, {connection_error, protocol_error,
		'The separator between request target and version must be a single SP. (RFC7230 3.1.1)'});
parse_version(_, State, _, _, _, _) ->
	error_terminate(505, State, {connection_error, protocol_error,
		'Unsupported HTTP version. (RFC7230 2.6)'}).

before_parse_headers(Rest, State, M, A, P, Q, V) ->
	parse_header(Rest, State#state{in_state=#ps_header{
		method=M, authority=A, path=P, qs=Q, version=V}}, #{}).

%% Headers.

%% We need two or more bytes in the buffer to continue.
parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
	{more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest};
parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
	request(Rest, S, Headers);
parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
	MaxHeaders = maps:get(max_headers, Opts, 100),
	NumHeaders = maps:size(Headers),
	if
		NumHeaders >= MaxHeaders ->
			error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
				{connection_error, limit_reached,
					'The number of headers is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
		true ->
			parse_header_colon(Buffer, State, Headers)
	end.

parse_header_colon(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
	MaxLength = maps:get(max_header_name_length, Opts, 64),
	case match_colon(Buffer, 0) of
		nomatch when byte_size(Buffer) > MaxLength ->
			error_terminate(431, State#state{in_state=PS#ps_header{headers=Headers}},
				{connection_error, limit_reached,
					'A header name is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
		nomatch ->
			{more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
		_ ->
			parse_hd_name(Buffer, State, Headers, <<>>)
	end.

match_colon(<< $:, _/bits >>, N) ->
	N;
match_colon(<< _, Rest/bits >>, N) ->
	match_colon(Rest, N + 1);
match_colon(_, _) ->
	nomatch.

parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
	parse_hd_before_value(Rest, State, H, SoFar);
parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, <<>>) when ?IS_WS(C) ->
	error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
		{connection_error, protocol_error,
			'Whitespace is not allowed before the header name. (RFC7230 3.2)'});
parse_hd_name(<< C, _/bits >>, State=#state{in_state=PS}, H, _) when ?IS_WS(C) ->
	error_terminate(400, State#state{in_state=PS#ps_header{headers=H}},
		{connection_error, protocol_error,
			'Whitespace is not allowed between the header name and the colon. (RFC7230 3.2.4)'});
parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
	?LOWER(parse_hd_name, Rest, State, H, SoFar).

parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) ->
	parse_hd_before_value(Rest, S, H, N);
parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) ->
	parse_hd_before_value(Rest, S, H, N);
parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
	MaxLength = maps:get(max_header_value_length, Opts, 4096),
	case match_eol(Buffer, 0) of
		nomatch when byte_size(Buffer) > MaxLength ->
			error_terminate(431, State#state{in_state=PS#ps_header{headers=H}},
				{connection_error, limit_reached,
					'A header value is larger than configuration allows. (RFC7230 3.2.5, RFC6585 5)'});
		nomatch ->
			{more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer};
		_ ->
			parse_hd_value(Buffer, State, H, N, <<>>)
	end.

parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers0, Name, SoFar) ->
	Value = clean_value_ws_end(SoFar, byte_size(SoFar) - 1),
	Headers = case maps:get(Name, Headers0, undefined) of
		undefined -> Headers0#{Name => Value};
		%% The cookie header does not use proper HTTP header lists.
		Value0 when Name =:= <<"cookie">> -> Headers0#{Name => << Value0/binary, "; ", Value/binary >>};
		Value0 -> Headers0#{Name => << Value0/binary, ", ", Value/binary >>}
	end,
	parse_header(Rest, S, Headers);
parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
	parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).

clean_value_ws_end(_, -1) ->
	<<>>;
clean_value_ws_end(Value, N) ->
	case binary:at(Value, N) of
		$\s -> clean_value_ws_end(Value, N - 1);
		$\t -> clean_value_ws_end(Value, N - 1);
		_ ->
			S = N + 1,
			<< Value2:S/binary, _/bits >> = Value,
			Value2
	end.

-ifdef(TEST).
clean_value_ws_end_test_() ->
	Tests = [
		{<<>>, <<>>},
		{<<"     ">>, <<>>},
		{<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
			"text/html;level=2;q=0.4, */*;q=0.5   \t   \t    ">>,
			<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
				"text/html;level=2;q=0.4, */*;q=0.5">>}
	],
	[{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].

horse_clean_value_ws_end() ->
	horse:repeat(200000,
		clean_value_ws_end(
			<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
				"text/html;level=2;q=0.4, */*;q=0.5          ">>,
			byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
				"text/html;level=2;q=0.4, */*;q=0.5          ">>) - 1)
	).
-endif.

request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
		in_state=PS=#ps_header{authority=Authority, version=Version}}, Headers) ->
	case maps:get(<<"host">>, Headers, undefined) of
		undefined when Version =:= 'HTTP/1.1' ->
			%% @todo Might want to not close the connection on this and next one.
			error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
				{stream_error, StreamID, protocol_error,
					'HTTP/1.1 requests must include a host header. (RFC7230 5.4)'});
		undefined ->
			request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
		%% @todo When CONNECT requests come in we need to ignore the RawHost
		%% and instead use the Authority as the source of host.
		RawHost when Authority =:= undefined; Authority =:= RawHost ->
			request_parse_host(Buffer, State, Headers, RawHost);
		%% RFC7230 does not explicitly ask us to reject requests
		%% that have a different authority component and host header.
		%% However it DOES ask clients to set them to the same value,
		%% so we enforce that.
		_ ->
			error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
				{stream_error, StreamID, protocol_error,
					'The host header is different than the absolute-form authority component. (RFC7230 5.4)'})
	end.

request_parse_host(Buffer, State=#state{transport=Transport,
		in_streamid=StreamID, in_state=PS}, Headers, RawHost) ->
	try cow_http_hd:parse_host(RawHost) of
		{Host, undefined} ->
			request(Buffer, State, Headers, Host, default_port(Transport:secure()));
		{Host, Port} when Port > 0, Port =< 65535 ->
			request(Buffer, State, Headers, Host, Port);
		_ ->
			error_terminate(400, State, {stream_error, StreamID, protocol_error,
				'The port component of the absolute-form is not in the range 0..65535. (RFC7230 2.7.1)'})
	catch _:_ ->
		error_terminate(400, State#state{in_state=PS#ps_header{headers=Headers}},
			{stream_error, StreamID, protocol_error,
				'The host header is invalid. (RFC7230 5.4)'})
	end.

-spec default_port(boolean()) -> 80 | 443.
default_port(true) -> 443;
default_port(_) -> 80.

%% End of request parsing.

request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert,
		in_streamid=StreamID, in_state=
			PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
		Headers0, Host, Port) ->
	Scheme = case Transport:secure() of
		true -> <<"https">>;
		false -> <<"http">>
	end,
	{Headers, HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers0 of
		#{<<"transfer-encoding">> := TransferEncoding0} ->
			try cow_http_hd:parse_transfer_encoding(TransferEncoding0) of
				[<<"chunked">>] ->
					{maps:remove(<<"content-length">>, Headers0),
						true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
				_ ->
					error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
						{stream_error, StreamID, protocol_error,
							'Cowboy only supports transfer-encoding: chunked. (RFC7230 3.3.1)'})
			catch _:_ ->
				error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
					{stream_error, StreamID, protocol_error,
						'The transfer-encoding header is invalid. (RFC7230 3.3.1)'})
			end;
		#{<<"content-length">> := <<"0">>} ->
			{Headers0, false, 0, undefined, undefined};
		#{<<"content-length">> := BinLength} ->
			Length = try
				cow_http_hd:parse_content_length(BinLength)
			catch _:_ ->
				error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
					{stream_error, StreamID, protocol_error,
						'The content-length header is invalid. (RFC7230 3.3.2)'})
			end,
			{Headers0, true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
		_ ->
			{Headers0, false, 0, undefined, undefined}
	end,
	Req = #{
		ref => Ref,
		pid => self(),
		streamid => StreamID,
		peer => Peer,
		sock => Sock,
		cert => Cert,
		method => Method,
		scheme => Scheme,
		host => Host,
		port => Port,
		path => Path,
		qs => Qs,
		version => Version,
		%% We are transparently taking care of transfer-encodings so
		%% the user code has no need to know about it.
		headers => maps:remove(<<"transfer-encoding">>, Headers),
		has_body => HasBody,
		body_length => BodyLength
	},
	case is_http2_upgrade(Headers, Version) of
		false ->
			State = case HasBody of
				true ->
					State0#state{in_state=#ps_body{
						length = BodyLength,
						transfer_decode_fun = TDecodeFun,
						transfer_decode_state = TDecodeState
					}};
				false ->
					State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}
			end,
			{request, Req, State, Buffer};
		{true, HTTP2Settings} ->
			%% We save the headers in case the upgrade will fail
			%% and we need to pass them to cowboy_stream:early_error.
			http2_upgrade(State0#state{in_state=PS#ps_header{headers=Headers}},
				Buffer, HTTP2Settings, Req)
	end.

%% HTTP/2 upgrade.

%% @todo We must not upgrade to h2c over a TLS connection.
is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
		<<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
	Conns = cow_http_hd:parse_connection(Conn),
	case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
		{true, true} ->
			Protocols = cow_http_hd:parse_upgrade(Upgrade),
			case lists:member(<<"h2c">>, Protocols) of
				true ->
					{true, HTTP2Settings};
				false ->
					false
			end;
		_ ->
			false
	end;
is_http2_upgrade(_, _) ->
	false.

%% Prior knowledge upgrade, without an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
		opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
	case Transport:secure() of
		false ->
			_ = cancel_timeout(State),
			cowboy_http2:init(Parent, Ref, Socket, Transport, Opts,
				Peer, Sock, Cert, Buffer);
		true ->
			error_terminate(400, State, {connection_error, protocol_error,
				'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
	end.

%% Upgrade via an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
		opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer, HTTP2Settings, Req) ->
	%% @todo
	%% However if the client sent a body, we need to read the body in full
	%% and if we can't do that, return a 413 response. Some options are in order.
	%% Always half-closed stream coming from this side.
	try cow_http_hd:parse_http2_settings(HTTP2Settings) of
		Settings ->
			_ = cancel_timeout(State),
			cowboy_http2:init(Parent, Ref, Socket, Transport, Opts,
				Peer, Sock, Cert, Buffer, Settings, Req)
	catch _:_ ->
		error_terminate(400, State, {connection_error, protocol_error,
			'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
	end.

%% Request body parsing.

parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
		PS=#ps_body{received=Received, transfer_decode_fun=TDecode,
			transfer_decode_state=TState0}}) ->
	%% @todo Proper trailers.
	try TDecode(Buffer, TState0) of
		more ->
			%% @todo Asks for 0 or more bytes.
			{more, State, Buffer};
		{more, Data, TState} ->
			%% @todo Asks for 0 or more bytes.
			{data, StreamID, nofin, Data, State#state{in_state=
				PS#ps_body{received=Received + byte_size(Data),
					transfer_decode_state=TState}}, <<>>};
		{more, Data, _Length, TState} when is_integer(_Length) ->
			%% @todo Asks for Length more bytes.
			{data, StreamID, nofin, Data, State#state{in_state=
				PS#ps_body{received=Received + byte_size(Data),
					transfer_decode_state=TState}}, <<>>};
		{more, Data, Rest, TState} ->
			%% @todo Asks for 0 or more bytes.
			{data, StreamID, nofin, Data, State#state{in_state=
				PS#ps_body{received=Received + byte_size(Data),
					transfer_decode_state=TState}}, Rest};
		{done, _HasTrailers, Rest} ->
			{data, StreamID, fin, <<>>, set_timeout(
				State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest};
		{done, Data, _HasTrailers, Rest} ->
			{data, StreamID, fin, Data, set_timeout(
				State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}
	catch _:_ ->
		Reason = {connection_error, protocol_error,
			'Failure to decode the content. (RFC7230 4)'},
		terminate(stream_terminate(State, StreamID, Reason), Reason)
	end.

%% Message handling.

down(State=#state{opts=Opts, children=Children0}, Pid, Msg) ->
	case cowboy_children:down(Children0, Pid) of
		%% The stream was terminated already.
		{ok, undefined, Children} ->
			State#state{children=Children};
		%% The stream is still running.
		{ok, StreamID, Children} ->
			info(State#state{children=Children}, StreamID, Msg);
		%% The process was unknown.
		error ->
			cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
				[Msg, Pid], Opts),
			State
	end.

info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
	case lists:keyfind(StreamID, #stream.id, Streams0) of
		Stream = #stream{state=StreamState0} ->
			try cowboy_stream:info(StreamID, Msg, StreamState0) of
				{Commands, StreamState} ->
					Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
						Stream#stream{state=StreamState}),
					commands(State#state{streams=Streams}, StreamID, Commands)
			catch Class:Exception ->
				cowboy:log(cowboy_stream:make_error_log(info,
					[StreamID, Msg, StreamState0],
					Class, Exception, erlang:get_stacktrace()), Opts),
				stream_reset(State, StreamID, {internal_error, {Class, Exception},
					'Unhandled exception in cowboy_stream:info/3.'})
			end;
		false ->
			cowboy:log(warning, "Received message ~p for unknown stream ~p.~n",
				[Msg, StreamID], Opts),
			State
	end.

%% Commands.

commands(State, _, []) ->
	State;
%% Supervise a child process.
commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
	commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
		StreamID, Tail);
%% Error handling.
commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
	commands(stream_reset(State, StreamID, Error), StreamID, Tail);
%% Commands for a stream currently inactive.
commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
		when Current =/= StreamID ->

	%% @todo We still want to handle some commands...

	Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
	Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
		Stream#stream{queue=Queue ++ Commands}),
	State#state{streams=Streams};
%% Read the request body.
commands(State, StreamID, [{flow, _Length}|Tail]) ->
	%% @todo We only read from socket if buffer is empty, otherwise
	%% we decode the buffer.

	%% @todo Set the body reading length to min(Length, BodyLength)

	commands(State, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
commands(State=#state{out_state=wait}, StreamID, [{error_response, Status, Headers0, Body}|Tail]) ->
	%% We close the connection when the error response is 408, as it
	%% indicates a timeout and the RFC recommends that we stop here. (RFC7231 6.5.7)
	Headers = case Status of
		408 -> Headers0#{<<"connection">> => <<"close">>};
		<<"408", _/bits>> -> Headers0#{<<"connection">> => <<"close">>};
		_ -> Headers0
	end,
	commands(State, StreamID, [{response, Status, Headers, Body}|Tail]);
commands(State, StreamID, [{error_response, _, _, _}|Tail]) ->
	commands(State, StreamID, Tail);
%% Send an informational response.
commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
		StreamID, [{inform, StatusCode, Headers}|Tail]) ->
	%% @todo I'm pretty sure the last stream in the list is the one we want
	%% considering all others are queued.
	#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
	_ = case Version of
		'HTTP/1.1' ->
			Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
				headers_to_list(Headers)));
		%% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
		'HTTP/1.0' ->
			ok
	end,
	commands(State, StreamID, Tail);
%% Send a full response.
%%
%% @todo Kill the stream if it sent a response when one has already been sent.
%% @todo Keep IsFin in the state.
%% @todo Same two things above apply to DATA, possibly promise too.
commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID,
		[{response, StatusCode, Headers0, Body}|Tail]) ->
	%% @todo I'm pretty sure the last stream in the list is the one we want
	%% considering all others are queued.
	#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
	{State, Headers} = connection(State0, Headers0, StreamID, Version),
	%% @todo Ensure content-length is set.
	Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
	case Body of
		{sendfile, O, B, P} ->
			Transport:send(Socket, Response),
			commands(State, StreamID, [{sendfile, fin, O, B, P}|Tail]);
		_ ->
			Transport:send(Socket, [Response, Body]),
			commands(State#state{out_state=done}, StreamID, Tail)
	end;
%% Send response headers and initiate chunked encoding or streaming.
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
		StreamID, [{headers, StatusCode, Headers0}|Tail]) ->
	%% @todo Same as above (about the last stream in the list).
	Stream = #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams0),
	Status = cow_http:status_to_integer(StatusCode),
	ContentLength = maps:get(<<"content-length">>, Headers0, undefined),
	{State1, Headers1} = case {Status, ContentLength, Version} of
		{204, _, 'HTTP/1.1'} ->
			{State0#state{out_state=done}, Headers0};
		{_, undefined, 'HTTP/1.1'} ->
			{State0#state{out_state=chunked}, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
		%% Close the connection after streaming without content-length to HTTP/1.0 client.
		{_, undefined, 'HTTP/1.0'} ->
			{State0#state{out_state=streaming, last_streamid=StreamID}, Headers0};
		%% Stream the response body without chunked transfer-encoding.
		_ ->
			ExpectedSize = cow_http_hd:parse_content_length(ContentLength),
			Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
				Stream#stream{local_expected_size=ExpectedSize}),
			{State0#state{out_state=streaming, streams=Streams}, Headers0}
	end,
	Headers2 = case stream_te(OutState, Stream) of
		trailers -> Headers1;
		_ -> maps:remove(<<"trailer">>, Headers1)
	end,
	{State, Headers} = connection(State1, Headers2, StreamID, Version),
	Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
	commands(State, StreamID, Tail);
%% Send a response body chunk.
%%
%% @todo WINDOW_UPDATE stuff require us to buffer some data.
%% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
		StreamID, [{data, IsFin, Data}|Tail]) ->
	%% Do not send anything when the user asks to send an empty
	%% data frame, as that would break the protocol.
	Size = iolist_size(Data),
	Stream0 = lists:keyfind(StreamID, #stream.id, Streams0),
	Stream = case Size of
		0 ->
			%% We send the last chunk only if version is HTTP/1.1 and IsFin=fin.
			case {OutState, Stream0} of
				{_, #stream{method= <<"HEAD">>}} ->
					ok;
				{chunked, _} when IsFin =:= fin ->
					Transport:send(Socket, <<"0\r\n\r\n">>);
				_ ->
					ok
			end,
			Stream0;
		_ ->
			%% @todo We need to kill the stream if it tries to send data before headers.
			%% @todo Same as above.
			case {OutState, Stream0} of
				{_, #stream{method= <<"HEAD">>}} ->
					Stream0;
				{chunked, _} ->
					Transport:send(Socket, [
						integer_to_binary(Size, 16), <<"\r\n">>, Data,
						case IsFin of
							fin -> <<"\r\n0\r\n\r\n">>;
							nofin -> <<"\r\n">>
						end
					]),
					Stream0;
				{streaming, #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize}} ->
					SentSize = SentSize0 + Size,
					if
						%% undefined is > any integer value.
						SentSize > ExpectedSize ->
							terminate(State0, response_body_too_large);
						true ->
							Transport:send(Socket, Data),
							Stream0#stream{local_sent_size=SentSize}
					end
			end
	end,
	State = case IsFin of
		fin -> State0#state{out_state=done};
		nofin -> State0
	end,
	Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
	commands(State#state{streams=Streams}, StreamID, Tail);
commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},
		StreamID, [{trailers, Trailers}|Tail]) ->
	case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of
		trailers ->
			Transport:send(Socket, [
				<<"0\r\n">>,
				cow_http:headers(maps:to_list(Trailers)),
				<<"\r\n">>
			]);
		no_trailers ->
			Transport:send(Socket, <<"0\r\n\r\n">>);
		not_chunked ->
			ok
	end,
	commands(State#state{out_state=done}, StreamID, Tail);
%% Send a file.
commands(State0=#state{socket=Socket, transport=Transport}, StreamID,
		[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
	%% @todo exit with response_body_too_large if we exceed content-length
	%% We wrap the sendfile call into a try/catch because on OTP-20
	%% and earlier a few different crashes could occur for sockets
	%% that were closing or closed. For example a badarg in
	%% erlang:port_get_data(#Port<...>) or a badmatch like
	%% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
	%%
	%% OTP-21 uses a NIF instead of a port so the implementation
	%% and behavior has dramatically changed and it is unclear
	%% whether it will be necessary in the future.
	%%
	%% This try/catch prevents some noisy logs to be written
	%% when these errors occur.
	try
		Transport:sendfile(Socket, Path, Offset, Bytes),
		State = case IsFin of
			fin -> State0#state{out_state=done}
%% @todo Add the sendfile command.
%			nofin -> State0
		end,
		commands(State, StreamID, Tail)
	catch _:_ ->
		terminate(State0, {socket_error, sendfile_crash,
			'An error occurred when using the sendfile function.'})
	end;
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
		out_state=OutState, opts=Opts, children=Children}, StreamID,
		[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
	%% @todo This should be the last stream running otherwise we need to wait before switching.
	%% @todo If there's streams opened after this one, fail instead of 101.
	State = cancel_timeout(State0),
	%% Before we send the 101 response we need to stop receiving data
	%% from the socket, otherwise the data might be receive before the
	%% call to flush/0 and we end up inadvertently dropping a packet.
	%%
	%% @todo Handle cases where the request came with a body. We need
	%% to process or skip the body before the upgrade can be completed.
	Transport:setopts(Socket, [{active, false}]),
	%% Send a 101 response if necessary, then terminate the stream.
	#state{streams=Streams} = case OutState of
		wait -> info(State, StreamID, {inform, 101, Headers});
		_ -> State
	end,
	#stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams),
	%% @todo We need to shutdown processes here first.
	stream_call_terminate(StreamID, switch_protocol, StreamState, State),
	%% Terminate children processes and flush any remaining messages from the mailbox.
	cowboy_children:terminate(Children),
	flush(Parent),
	%% @todo This is no good because commands return a state normally and here it doesn't
	%% we need to let this module go entirely. Perhaps it should be handled directly in
	%% cowboy_clear/cowboy_tls?
	Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
%% Stream shutdown.
commands(State, StreamID, [stop|Tail]) ->
	%% @todo Do we want to run the commands after a stop?
	%% @todo We currently wait for the stop command before we
	%% continue with the next request/response. In theory, if
	%% the request body was read fully and the response body
	%% was sent fully we should be able to start working on
	%% the next request concurrently. This can be done as a
	%% future optimization.
	maybe_terminate(State, StreamID, Tail);
%% Log event.
commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
	cowboy:log(Log, Opts),
	commands(State, StreamID, Tail);
%% HTTP/1.1 does not support push; ignore.
commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
	commands(State, StreamID, Tail).

%% The set-cookie header is special; we can only send one cookie per header.
headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
	Headers1 = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
	Headers1 ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
headers_to_list(Headers) ->
	maps:to_list(Headers).

%% Flush messages specific to cowboy_http before handing over the
%% connection to another protocol.
flush(Parent) ->
	receive
		{timeout, _, _} ->
			flush(Parent);
		{{Pid, _}, _} when Pid =:= self() ->
			flush(Parent);
		{'EXIT', Pid, _} when Pid =/= Parent ->
			flush(Parent)
	after 0 ->
		ok
	end.

%% @todo In these cases I'm not sure if we should continue processing commands.
maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
	terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
maybe_terminate(State, StreamID, _Tail) ->
	stream_terminate(State, StreamID, normal).

stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
	%% @todo headers
	%% @todo Don't send this if there are no streams left.
%	Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
%		{<<"content-length">>, <<"0">>}
%	])),
	%% @todo update IsFin local
%	stream_terminate(State#state{out_state=done}, StreamID, StreamError).
	stream_terminate(State, StreamID, StreamError).

stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
		out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
		children=Children0}, StreamID, Reason) ->
	#stream{version=Version, local_expected_size=ExpectedSize, local_sent_size=SentSize}
		= lists:keyfind(StreamID, #stream.id, Streams0),
	State1 = #state{streams=Streams1} = case OutState of
		wait when element(1, Reason) =:= internal_error ->
			info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>});
		wait when element(1, Reason) =:= connection_error ->
			info(State0, StreamID, {response, 400, #{<<"content-length">> => <<"0">>}, <<>>});
		wait ->
			info(State0, StreamID, {response, 204, #{}, <<>>});
		chunked when Version =:= 'HTTP/1.1' ->
			info(State0, StreamID, {data, fin, <<>>});
		streaming when ExpectedSize < SentSize ->
			terminate(State0, response_body_too_small);
		_ -> %% done or Version =:= 'HTTP/1.0'
			State0
	end,
	%% Remove the stream from the state.
	{value, #stream{state=StreamState}, Streams}
		= lists:keytake(StreamID, #stream.id, Streams1),
	State2 = State1#state{streams=Streams},
	%% Stop the stream.
	stream_call_terminate(StreamID, Reason, StreamState, State2),
	Children = cowboy_children:shutdown(Children0, StreamID),
	%% We reset the timeout if there are no active streams anymore.
	State = case Streams of
		[] -> set_timeout(State2);
		_ -> State2
	end,
	%% We want to drop the connection if the body was not read fully
	%% and we don't know its length or more remains to be read than
	%% configuration allows.
	%% @todo Only do this if Current =:= StreamID.
	MaxSkipBodyLength = maps:get(max_skip_body_length, Opts, 1000000),
	case InState of
		#ps_body{length=undefined}
				when InStreamID =:= OutStreamID ->
			terminate(State#state{streams=Streams, children=Children}, skip_body_unknown_length);
		#ps_body{length=Len, received=Received}
				when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
			terminate(State#state{streams=Streams, children=Children}, skip_body_too_large);
		_ ->
			%% Move on to the next stream.
			NextOutStreamID = OutStreamID + 1,
			case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
				false ->
					%% @todo This is clearly wrong, if the stream is gone we need to check if
					%% there used to be such a stream, and if there was to send an error.
					State#state{out_streamid=NextOutStreamID, out_state=wait,
						streams=Streams, children=Children};
				#stream{queue=Commands} ->
					%% @todo Remove queue from the stream.
					commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
						streams=Streams, children=Children}, NextOutStreamID, Commands)
			end
	end.

stream_call_terminate(StreamID, Reason, StreamState, #state{opts=Opts}) ->
	try
		cowboy_stream:terminate(StreamID, Reason, StreamState)
	catch Class:Exception ->
		cowboy:log(cowboy_stream:make_error_log(terminate,
			[StreamID, Reason, StreamState],
			Class, Exception, erlang:get_stacktrace()), Opts)
	end.

%% @todo max_reqs also
maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
	Conns = cow_http_hd:parse_connection(Conn),
	case lists:member(<<"keep-alive">>, Conns) of
		true -> keepalive;
		false -> close
	end;
maybe_req_close(_, _, 'HTTP/1.0') ->
	close;
maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
	case connection_hd_is_close(Conn) of
		true -> close;
		false -> keepalive
	end;
maybe_req_close(_State, _, _) ->
	keepalive.

connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
	case connection_hd_is_close(Conn) of
		true -> {State, Headers};
		%% @todo Here we need to remove keep-alive and add close, not just add close.
		false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}}
	end;
connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) ->
	{State, Headers#{<<"connection">> => <<"close">>}};
connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
	case connection_hd_is_close(Conn) of
		true -> {State#state{last_streamid=StreamID}, Headers};
		%% @todo Here we need to set keep-alive only if it wasn't set before.
		false -> {State, Headers}
	end;
connection(State, Headers, _, 'HTTP/1.0') ->
	{State, Headers#{<<"connection">> => <<"keep-alive">>}};
connection(State, Headers, _, _) ->
	{State, Headers}.

connection_hd_is_close(Conn) ->
	Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
	lists:member(<<"close">>, Conns).

stream_te(streaming, _) ->
	not_chunked;
%% No TE header was sent.
stream_te(_, #stream{te=undefined}) ->
	no_trailers;
stream_te(_, #stream{te=TE0}) ->
	try cow_http_hd:parse_te(TE0) of
		{TE1, _} -> TE1
	catch _:_ ->
		%% If we can't parse the TE header, assume we can't send trailers.
		no_trailers
	end.

%% This function is only called when an error occurs on a new stream.
-spec error_terminate(cowboy:http_status(), #state{}, _) -> no_return().
error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=StreamState}, Reason) ->
	PartialReq = case StreamState of
		#ps_request_line{} -> #{
			ref => Ref,
			peer => Peer
		};
		#ps_header{method=Method, path=Path, qs=Qs,
				version=Version, headers=ReqHeaders} -> #{
			ref => Ref,
			peer => Peer,
			method => Method,
			path => Path,
			qs => Qs,
			version => Version,
			headers => case ReqHeaders of
				undefined -> #{};
				_ -> ReqHeaders
			end
		}
	end,
	early_error(StatusCode, State, Reason, PartialReq, #{<<"connection">> => <<"close">>}),
	terminate(State, Reason).

early_error(StatusCode, State, Reason, PartialReq) ->
	early_error(StatusCode, State, Reason, PartialReq, #{}).

early_error(StatusCode0, #state{socket=Socket, transport=Transport,
		opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) ->
	RespHeaders1 = RespHeaders0#{<<"content-length">> => <<"0">>},
	Resp = {response, StatusCode0, RespHeaders1, <<>>},
	try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
		{response, StatusCode, RespHeaders, RespBody} ->
			Transport:send(Socket, [
				cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
				%% @todo We shouldn't send the body when the method is HEAD.
				%% @todo Technically we allow the sendfile tuple.
				RespBody
			])
	catch Class:Exception ->
		cowboy:log(cowboy_stream:make_error_log(early_error,
			[StreamID, Reason, PartialReq, Resp, Opts],
			Class, Exception, erlang:get_stacktrace()), Opts),
		%% We still need to send an error response, so send what we initially
		%% wanted to send. It's better than nothing.
		Transport:send(Socket, cow_http:response(StatusCode0,
			'HTTP/1.1', maps:to_list(RespHeaders1)))
	end,
	ok.

-spec terminate(_, _) -> no_return().
terminate(undefined, Reason) ->
	exit({shutdown, Reason});
terminate(State=#state{streams=Streams, children=Children}, Reason) ->
	terminate_all_streams(State, Streams, Reason),
	cowboy_children:terminate(Children),
	terminate_linger(State),
	exit({shutdown, Reason}).

terminate_all_streams(_, [], _) ->
	ok;
terminate_all_streams(State, [#stream{id=StreamID, state=StreamState}|Tail], Reason) ->
	stream_call_terminate(StreamID, Reason, StreamState, State),
	terminate_all_streams(State, Tail, Reason).

terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
	case Transport:shutdown(Socket, write) of
		ok ->
			case maps:get(linger_timeout, Opts, 1000) of
				0 ->
					ok;
				infinity ->
					terminate_linger_loop(State, undefined);
				Timeout ->
					TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
					terminate_linger_loop(State, TimerRef)
			end;
		{error, _} ->
			ok
	end.

terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
	{OK, Closed, Error} = Transport:messages(),
	%% We may already have a message in the mailbox when we do this
	%% but it's OK because we are shutting down anyway.
	case Transport:setopts(Socket, [{active, once}]) of
		ok ->
			receive
				{OK, Socket, _} ->
					terminate_linger_loop(State, TimerRef);
				{Closed, Socket} ->
					ok;
				{Error, Socket, _} ->
					ok;
				{timeout, TimerRef, linger_timeout} ->
					ok;
				_ ->
					terminate_linger_loop(State, TimerRef)
			end;
		{error, _} ->
			ok
	end.

%% System callbacks.

-spec system_continue(_, _, {#state{}, binary()}) -> ok.
system_continue(_, _, {State, Buffer}) ->
	loop(State, Buffer).

-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
system_terminate(Reason, _, _, {State, _}) ->
	terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}).

-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
system_code_change(Misc, _, _, _) ->
	{ok, Misc}.