aboutsummaryrefslogblamecommitdiffstats
path: root/src/gun_http2.erl
blob: 99776c6369e02258f1c49d76080c1095129220fe (plain) (tree)
1
                                                             




















                                                                           
                     


                     




                                
                          
                                            
                                     

                                                            



                                                                         
                                                                     
                                              
                                      

                                                                      

                                                                





                                                  
                                     
                                                      

                                  










                                                                             



                                           







                                                                                          









                                                             




                                                            




                                                                    




                                         
                                       
                                                                  
                                                        

                                                                                     
                                                      


                                                        
                                            
           
              



                                                                                 
                                                                   


                                                     
                                                       



                                                             







                                                                                                                 

                                            
                                                                
                                                                                                   
                                                   
                                                 
                       
                                                                


              


                                                                                                       
                                                                                      




                                                                                                

                                                               
                                                                                    


                                                                                                          




                                                                                            


                                                                            
                                                                










                                                                                            

                                                                                         
                  

                                                                                  
                                                         
                                                                           





                                                                                     
                                                                        
                                                                                  


                                                                 
                                                                           












                                                                                                                                                  
                                                                                                                     
                                                                                                                                 


                                                                                                                                










                                                                                                                                  









                                                                                

                                                                           
                                       



                                                                                             

                                                                                 
                                       

                                                                                     


                                                                                                             










                                                                                                
                                 

                                                                     

                                                                                            












                                                                                                         





















                                                                                                     

                                       
 
                    
           

                                                                 
                                             
                            

                                                                    
                                                  

              
                                                                                         

                                                                         
                                                                                                                 





                                                                                             

                                                                                                      

                                                            


                                                                                          

                                                                              
                                                                                                                 
                                                                                


                                                                                                      
 
                                                                               
                             
                                             


                                                               

                                                                  
                                                         
            









                                                                               


                                                 
                     
                                              



                                               

                                                    
                                     



                                                                                  
                        
                                                                          

            










































































                                                                                                     










                                                                                                        













                                                                                                                    

            







                                                                                                
                                                 




                                                                          
                                                              
                                      




                                                                                       
                                                                         






                                                              
                                                                                       

                                                          

                                                                    
                                                                                            
                                 
                                                                                    

              
                                                                                      


                                                              
                                                 
                                                                                                    




                                                
                                                          
                                           
 






































































                                                                                                             
                                                                   


                                                                                      

                                                                              






                                                                                               

                                                           


                                                        

                                                           





                                                            



                                                                          
                                                          
                                                       










                                                                  

                                               

                                                    




                                                                     




                                                                                 
%% Copyright (c) 2016-2018, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(gun_http2).

-export([check_options/1]).
-export([name/0]).
-export([init/4]).
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
-export([request/8]).
-export([request/9]).
-export([data/5]).
-export([cancel/3]).
-export([down/1]).

-record(stream, {
	id :: non_neg_integer(),
	ref :: reference(),
	reply_to :: pid(),
	%% Whether we finished sending data.
	local = nofin :: fin | nofin,
	%% Local flow control window (how much we can send).
	local_window :: integer(),
	%% Buffered data waiting for the flow control window to increase.
	local_buffer = queue:new() :: queue:queue(
		{fin | nofin, non_neg_integer(), iolist()}),
	local_buffer_size = 0 :: non_neg_integer(),
	local_trailers = undefined :: undefined | cow_http:headers(),
	%% Whether we finished receiving data.
	remote = nofin :: fin | nofin,
	%% Remote flow control window (how much we accept to receive).
	remote_window :: integer(),
	%% Content handlers state.
	handler_state :: undefined | gun_content_handler:state()
}).

-record(http2_state, {
	owner :: pid(),
	socket :: inet:socket() | ssl:sslsocket(),
	transport :: module(),
	opts = #{} :: map(), %% @todo
	content_handlers :: gun_content_handler:opt(),
	buffer = <<>> :: binary(),

	local_settings = #{
		initial_window_size => 65535,
		max_frame_size => 16384
	} :: map(),
	remote_settings = #{
		initial_window_size => 65535
	} :: map(),

	%% Connection-wide flow control window.
	local_window = 65535 :: integer(), %% How much we can send.
	remote_window = 65535 :: integer(), %% How much we accept to receive.

	streams = [] :: [#stream{}],
	stream_id = 1 :: non_neg_integer(),

	%% The client starts by sending a sequence of bytes as a preface,
	%% followed by a potentially empty SETTINGS frame. Then the connection
	%% is established and continues normally. An exception is when a HEADERS
	%% frame is sent followed by CONTINUATION frames: no other frame can be
	%% sent in between.
	parse_state = undefined :: normal
		| {continuation, cowboy_stream:streamid(), cowboy_stream:fin(), binary()},

	%% HPACK decoding and encoding state.
	decode_state = cow_hpack:init() :: cow_hpack:state(),
	encode_state = cow_hpack:init() :: cow_hpack:state()
}).

check_options(Opts) ->
	do_check_options(maps:to_list(Opts)).

do_check_options([]) ->
	ok;
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
	case gun_content_handler:check_option(Handlers) of
		ok -> do_check_options(Opts);
		error -> {error, {options, {http, Opt}}}
	end;
do_check_options([{keepalive, infinity}|Opts]) ->
	do_check_options(Opts);
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
	do_check_options(Opts);
%% @todo max_frame_size_sent
do_check_options([Opt|_]) ->
	{error, {options, {http2, Opt}}}.

name() -> http2.

init(Owner, Socket, Transport, Opts) ->
	Handlers = maps:get(content_handlers, Opts, [gun_data_h]),
	State = #http2_state{owner=Owner, socket=Socket,
		transport=Transport, opts=Opts, content_handlers=Handlers,
		parse_state=normal}, %% @todo Have a special parse state for preface.
	#http2_state{local_settings=Settings} = State,
	%% Send the HTTP/2 preface.
	Transport:send(Socket, [
		<< "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
		cow_http2:settings(Settings)
	]),
	State.

handle(Data, State=#http2_state{buffer=Buffer}) ->
	parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}).

parse(Data0, State0=#http2_state{buffer=Buffer, parse_state=PS}) ->
	%% @todo Parse states: Preface. Continuation.
	Data = << Buffer/binary, Data0/binary >>,
	case cow_http2:parse(Data) of
		{ok, Frame, Rest} when PS =:= normal ->
			case frame(Frame, State0) of
				close -> close;
				State1 -> parse(Rest, State1)
			end;
		{ok, Frame, Rest} when element(1, PS) =:= continuation ->
			case continuation_frame(Frame, State0) of
				close -> close;
				State1 -> parse(Rest, State1)
			end;
		{ignore, _} when element(1, PS) =:= continuation ->
			terminate(State0, {connection_error, protocol_error,
				'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'});
		{ignore, Rest} ->
			parse(Rest, State0);
		{stream_error, StreamID, Reason, Human, Rest} ->
			parse(Rest, stream_reset(State0, StreamID, {stream_error, Reason, Human}));
		Error = {connection_error, _, _} ->
			terminate(State0, Error);
		more ->
			{state, State0#http2_state{buffer=Data}}
	end.

%% DATA frame.
frame({data, StreamID, IsFin, Data}, State0=#http2_state{remote_window=ConnWindow}) ->
	case get_stream_by_id(StreamID, State0) of
		Stream0 = #stream{remote=nofin, remote_window=StreamWindow, handler_state=Handlers0} ->
			Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
			{Stream, State} = send_window_update(
				Stream0#stream{remote_window=StreamWindow - byte_size(Data),
					handler_state=Handlers},
				State0#http2_state{remote_window=ConnWindow - byte_size(Data)}),
			remote_fin(Stream, State, IsFin);
		_ ->
			%% @todo protocol_error if not existing
			stream_reset(State0, StreamID, {stream_error, stream_closed,
				'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
	end;
%% Single HEADERS frame headers block.
frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, State) ->
	stream_decode_init(State, StreamID, IsFin, HeaderBlock);
%% HEADERS frame starting a headers block. Enter continuation mode.
frame({headers, StreamID, IsFin, head_nofin, HeaderBlockFragment}, State) ->
	State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}};
%% Single HEADERS frame headers block with priority.
frame({headers, StreamID, IsFin, head_fin,
		_IsExclusive, _DepStreamID, _Weight, HeaderBlock}, State) ->
	stream_decode_init(State, StreamID, IsFin, HeaderBlock);
%% @todo HEADERS frame starting a headers block. Enter continuation mode.
%frame(State, {headers, StreamID, IsFin, head_nofin,
%		_IsExclusive, _DepStreamID, _Weight, HeaderBlockFragment}) ->
%	%% @todo Handle priority.
%	State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}};
%% @todo PRIORITY frame.
%frame(State, {priority, _StreamID, _IsExclusive, _DepStreamID, _Weight}) ->
%	%% @todo Validate StreamID?
%	%% @todo Handle priority.
%	State;
%% @todo RST_STREAM frame.
frame({rst_stream, StreamID, Reason}, State) ->
	stream_reset(State, StreamID, {stream_error, Reason, 'Stream reset by server.'});
%% SETTINGS frame.
frame({settings, Settings}, State=#http2_state{socket=Socket, transport=Transport,
		remote_settings=Settings0}) ->
	Transport:send(Socket, cow_http2:settings_ack()),
	State#http2_state{remote_settings=maps:merge(Settings0, Settings)};
%% Ack for a previously sent SETTINGS frame.
frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings}) ->
	%% @todo Apply SETTINGS that require synchronization.
	State;
%% PUSH_PROMISE frame.
%% @todo Continuation.
frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock},
		State=#http2_state{streams=Streams, decode_state=DecodeState0}) ->
	case get_stream_by_id(PromisedStreamID, State) of
		false ->
			case get_stream_by_id(StreamID, State) of
				#stream{ref=StreamRef, reply_to=ReplyTo} ->
					try cow_hpack:decode(HeaderBlock, DecodeState0) of
						{Headers0, DecodeState} ->
							{Method, Scheme, Authority, Path, Headers} = try
								{value, {_, Method0}, Headers1} = lists:keytake(<<":method">>, 1, Headers0),
								{value, {_, Scheme0}, Headers2} = lists:keytake(<<":scheme">>, 1, Headers1),
								{value, {_, Authority0}, Headers3} = lists:keytake(<<":authority">>, 1, Headers2),
								{value, {_, Path0}, Headers4} = lists:keytake(<<":path">>, 1, Headers3),
								{Method0, Scheme0, Authority0, Path0, Headers4}
							catch error:badmatch ->
								stream_reset(State, StreamID, {stream_error, protocol_error,
									'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'})
							end,
							NewStreamRef = make_ref(),
							ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method,
								iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers},
							NewStream = new_stream(PromisedStreamID, NewStreamRef, ReplyTo,
								nofin, fin, State),
							State#http2_state{streams=[NewStream|Streams], decode_state=DecodeState}
					catch _:_ ->
						terminate(State, {connection_error, compression_error,
							'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
					end;
				_ ->
					stream_reset(State, StreamID, {stream_error, stream_closed,
						'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
			end;
		_ ->
			stream_reset(State, StreamID, {stream_error, todo, ''})
	end;
%% PING frame.
frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) ->
	Transport:send(Socket, cow_http2:ping_ack(Opaque)),
	State;
%% Ack for a previously sent PING frame.
%%
%% @todo Might want to check contents but probably a waste of time.
frame({ping_ack, _Opaque}, State) ->
	State;
%% GOAWAY frame.
frame(Frame={goaway, StreamID, _, _}, State) ->
	terminate(State, StreamID, {stop, Frame, 'Client is going away.'});
%% Connection-wide WINDOW_UPDATE frame.
frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow})
		when ConnWindow + Increment > 16#7fffffff ->
	terminate(State, {connection_error, flow_control_error,
		'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'});
frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow}) ->
	send_data(State#http2_state{local_window=ConnWindow + Increment});
%% Stream-specific WINDOW_UPDATE frame.
frame({window_update, StreamID, Increment}, State0=#http2_state{streams=Streams0}) ->
	case lists:keyfind(StreamID, #stream.id, Streams0) of
		#stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff ->
			stream_reset(State0, StreamID, {stream_error, flow_control_error,
				'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'});
		Stream0 = #stream{local_window=StreamWindow} ->
			{State, Stream} = send_data(State0,
				Stream0#stream{local_window=StreamWindow + Increment}),
			Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream),
			State#http2_state{streams=Streams};
		false ->
			%% @todo Receiving this frame on a stream in the idle state is an error.
			%% WINDOW_UPDATE frames may be received for a short period of time
			%% after a stream is closed. They must be ignored.
			State0
	end;
%% Unexpected CONTINUATION frame.
frame({continuation, StreamID, _, _}, State) ->
	terminate(State, StreamID, {connection_error, protocol_error,
		'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}).

continuation_frame({continuation, StreamID, head_fin, HeaderBlockFragment1},
		State=#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment0}}) ->
	HeaderBlock = << HeaderBlockFragment0/binary, HeaderBlockFragment1/binary >>,
	stream_decode_init(State#http2_state{parse_state=normal}, StreamID, IsFin, HeaderBlock);
continuation_frame({continuation, StreamID, head_nofin, HeaderBlockFragment1},
		State=#http2_state{parse_state=
			{continuation, StreamID, IsFin, HeaderBlockFragment0}}) ->
	State#http2_state{parse_state={continuation, StreamID, IsFin,
		<< HeaderBlockFragment0/binary, HeaderBlockFragment1/binary >>}};
continuation_frame(_, State) ->
	terminate(State, {connection_error, protocol_error,
		'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'}).

send_window_update(Stream=#stream{id=StreamID, remote_window=StreamWindow0},
		State=#http2_state{socket=Socket, transport=Transport, remote_window=ConnWindow0}) ->
	%% @todo We should make the windows configurable.
	MinConnWindow = 8000000,
	MinStreamWindow = 1000000,
	ConnWindow = if
		ConnWindow0 =< MinConnWindow ->
			Transport:send(Socket, cow_http2:window_update(MinConnWindow)),
			ConnWindow0 + MinConnWindow;
		true ->
			ConnWindow0
	end,
	StreamWindow = if
		StreamWindow0 =< MinStreamWindow ->
			Transport:send(Socket, cow_http2:window_update(StreamID, MinStreamWindow)),
			StreamWindow0 + MinStreamWindow;
		true ->
			StreamWindow0
	end,
	{Stream#stream{remote_window=StreamWindow},
		State#http2_state{remote_window=ConnWindow}}.

close(#http2_state{streams=Streams}) ->
	close_streams(Streams).

close_streams([]) ->
	ok;
close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
	ReplyTo ! {gun_error, self(), StreamRef, {closed,
		"The connection was lost."}},
	close_streams(Tail).

keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
	Transport:send(Socket, cow_http2:ping(0)),
	State.

request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
		streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo,
		Method, Host, Port, Path, Headers) ->
	{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers),
	IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers))
			orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of
		true -> nofin;
		false -> fin
	end,
	Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
	Stream = new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, State),
	State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}.

%% @todo Handle Body > 16MB. (split it out into many frames)
request(State0=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
		streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo,
		Method, Host, Port, Path, Headers0, Body) ->
	Headers = lists:keystore(<<"content-length">>, 1, Headers0,
		{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
	{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers),
	Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
	Stream0 = new_stream(StreamID, StreamRef, ReplyTo, nofin, nofin, State0),
	{State, Stream} = send_data(State0, Stream0, fin, Body),
	State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}.

prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) ->
	Host2 = case Host0 of
		{local, _SocketPath} -> <<>>;
		Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
		_ -> Host0
	end,
	Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
		{_, Host} -> Host;
		_ -> [Host2, $:, integer_to_binary(Port)]
	end,
	%% @todo We also must remove any header found in the connection header.
	Headers1 =
		lists:keydelete(<<"host">>, 1,
		lists:keydelete(<<"connection">>, 1,
		lists:keydelete(<<"keep-alive">>, 1,
		lists:keydelete(<<"proxy-connection">>, 1,
		lists:keydelete(<<"transfer-encoding">>, 1,
		lists:keydelete(<<"upgrade">>, 1, Headers0)))))),
	Headers = [
		{<<":method">>, Method},
		{<<":scheme">>, case Transport of
			gun_tls -> <<"https">>;
			gun_tcp -> <<"http">>
		end},
		{<<":authority">>, Authority},
		{<<":path">>, Path}
	|Headers1],
	cow_hpack:encode(Headers, EncodeState).

data(State0, StreamRef, ReplyTo, IsFin, Data) ->
	case get_stream_by_ref(StreamRef, State0) of
		#stream{local=fin} ->
			error_stream_closed(State0, StreamRef, ReplyTo);
		Stream0 = #stream{} ->
			{State, Stream} = send_data(State0, Stream0, IsFin, Data),
			maybe_delete_stream(State, Stream);
		false ->
			error_stream_not_found(State0, StreamRef, ReplyTo)
	end.

%% @todo Should we ever want to implement the PRIORITY mechanism,
%% this would be the place to do it. Right now, we just go over
%% all streams and send what we can until either everything is
%% sent or we run out of space in the window.
send_data(State=#http2_state{streams=Streams}) ->
	resume_streams(State, Streams, []).

%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
%% the local stream windows for all active streams and perhaps
%% resume sending data.
%update_streams_local_window(State=#http2_state{streams=Streams0}, Increment) ->
%	Streams = [
%		S#stream{local_window=StreamWindow + Increment}
%	|| S=#stream{local_window=StreamWindow} <- Streams0],
%	resume_streams(State, Streams, []).

%% When we receive an ack to a SETTINGS frame we sent we need to update
%% the remote stream windows for all active streams.
%update_streams_remote_window(State=#http2_state{streams=Streams0}, Increment) ->
%	Streams = [
%		S#stream{remote_window=StreamWindow + Increment}
%	|| S=#stream{remote_window=StreamWindow} <- Streams0],
%	State#http2_state{streams=Streams}.

resume_streams(State, [], Acc) ->
	State#http2_state{streams=lists:reverse(Acc)};
%% While technically we should never get < 0 here, let's be on the safe side.
resume_streams(State=#http2_state{local_window=ConnWindow}, Streams, Acc)
		when ConnWindow =< 0 ->
	State#http2_state{streams=lists:reverse(Acc, Streams)};
%% We rely on send_data/2 to do all the necessary checks about the stream.
resume_streams(State0, [Stream0|Tail], Acc) ->
	{State1, Stream} = send_data(State0, Stream0),
	resume_streams(State1, Tail, [Stream|Acc]).

send_data(State, Stream=#stream{local=Local, local_buffer_size=0, local_trailers=Trailers})
		when (Trailers =/= undefined) andalso ((Local =:= idle) orelse (Local =:= nofin)) ->
	send_trailers(State, Stream#stream{local_trailers=undefined}, Trailers);
%% @todo It's possible that the stream terminates. We must remove it.
send_data(State=#http2_state{local_window=ConnWindow},
		Stream=#stream{local=IsFin, local_window=StreamWindow, local_buffer_size=BufferSize})
		when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 ->
	{State, Stream};
send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}) ->
	%% We know there is an item in the queue.
	{{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0),
	{State, Stream} = send_data(State0,
		Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize},
		IsFin, Data, in_r),
	send_data(State, Stream).

send_data(State, Stream, IsFin, Data) ->
	send_data(State, Stream, IsFin, Data, in).

%% We can send trailers immediately if the queue is empty, otherwise we queue.
%% We always send trailer frames even if the window is empty.
send_data(State, Stream=#stream{local_buffer_size=0}, fin, {trailers, Trailers}, _) ->
	send_trailers(State, Stream, Trailers);
send_data(State, Stream, fin, {trailers, Trailers}, _) ->
	{State, Stream#stream{local_trailers=Trailers}};
%% Send data immediately if we can, buffer otherwise.
send_data(State=#http2_state{local_window=ConnWindow},
		Stream=#stream{local_window=StreamWindow}, IsFin, Data, In)
		when ConnWindow =< 0; StreamWindow =< 0 ->
	{State, queue_data(Stream, IsFin, Data, In)};
send_data(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
		remote_settings=RemoteSettings, local_window=ConnWindow},
		Stream=#stream{id=StreamID, local_window=StreamWindow}, IsFin, Data, In) ->
	RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384),
	ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity),
	MaxSendSize = min(
		min(ConnWindow, StreamWindow),
		min(RemoteMaxFrameSize, ConfiguredMaxFrameSize)
	),
	case Data of
%		{sendfile, Offset, Bytes, Path} when Bytes =< MaxSendSize ->
%			Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)),
%			Transport:sendfile(Socket, Path, Offset, Bytes),
%			{State#http2_state{local_window=ConnWindow - Bytes},
%				Stream#stream{local=IsFin, local_window=StreamWindow - Bytes}};
%		{sendfile, Offset, Bytes, Path} ->
%			Transport:send(Socket, cow_http2:data_header(StreamID, nofin, MaxSendSize)),
%			Transport:sendfile(Socket, Path, Offset, MaxSendSize),
%			send_data(State#http2_state{local_window=ConnWindow - MaxSendSize},
%				Stream#stream{local_window=StreamWindow - MaxSendSize},
%				IsFin, {sendfile, Offset + MaxSendSize, Bytes - MaxSendSize, Path}, In);
		Iolist0 ->
			IolistSize = iolist_size(Iolist0),
			if
				IolistSize =< MaxSendSize ->
					Transport:send(Socket, cow_http2:data(StreamID, IsFin, Iolist0)),
					{State#http2_state{local_window=ConnWindow - IolistSize},
						Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize}};
				true ->
					{Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0),
					Transport:send(Socket, cow_http2:data(StreamID, nofin, Iolist)),
					send_data(State#http2_state{local_window=ConnWindow - MaxSendSize},
						Stream#stream{local_window=StreamWindow - MaxSendSize},
						IsFin, More, In)
			end
	end.

send_trailers(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0},
		Stream=#stream{id=StreamID}, Trailers) ->
	{HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0),
	Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
	{State#http2_state{encode_state=EncodeState}, Stream#stream{local=fin}}.

queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) ->
	DataSize = case Data of
%		{sendfile, _, Bytes, _} -> Bytes;
		Iolist -> iolist_size(Iolist)
	end,
	Q = queue:In({IsFin, DataSize, Data}, Q0),
	Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}.

cancel(State=#http2_state{socket=Socket, transport=Transport},
		StreamRef, ReplyTo) ->
	case get_stream_by_ref(StreamRef, State) of
		#stream{id=StreamID} ->
			Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)),
			delete_stream(StreamID, State);
		false ->
			error_stream_not_found(State, StreamRef, ReplyTo)
	end.

%% @todo Add unprocessed streams when GOAWAY handling is done.
down(#http2_state{streams=Streams}) ->
	KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
	{KilledStreams, []}.

terminate(#http2_state{socket=Socket, transport=Transport, streams=Streams}, Reason) ->
	%% Because a particular stream is unknown,
	%% we're sending the error message to all streams.
	%% @todo We should not send duplicate messages to processes.
	%% @todo We should probably also inform the owner process.
	_ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams],
	%% @todo LastGoodStreamID
	Transport:send(Socket, cow_http2:goaway(0, terminate_reason(Reason), <<>>)),
	close.

terminate(State=#http2_state{socket=Socket, transport=Transport}, StreamID, Reason) ->
	case get_stream_by_id(StreamID, State) of
		#stream{reply_to=ReplyTo} ->
			ReplyTo ! {gun_error, self(), Reason},
			%% @todo LastGoodStreamID
			Transport:send(Socket, cow_http2:goaway(0, terminate_reason(Reason), <<>>)),
			close;
		_ ->
			terminate(State, Reason)
	end.

terminate_reason({connection_error, Reason, _}) -> Reason;
terminate_reason({stop, _, _}) -> no_error.

%% Stream functions.

stream_decode_init(State=#http2_state{decode_state=DecodeState0}, StreamID, IsFin, HeaderBlock) ->
	try cow_hpack:decode(HeaderBlock, DecodeState0) of
		{Headers, DecodeState} ->
			stream_pseudo_headers_init(State#http2_state{decode_state=DecodeState},
				StreamID, IsFin, Headers)
	catch _:_ ->
		terminate(State, {connection_error, compression_error,
			'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
	end.

stream_pseudo_headers_init(State, StreamID, IsFin, Headers0) ->
	case pseudo_headers(Headers0, #{}) of
		{ok, PseudoHeaders, Headers} ->
			stream_resp_init(State, StreamID, IsFin, Headers, PseudoHeaders);
%% @todo When we handle trailers properly:
%		{ok, _, _} ->
%			stream_malformed(State, StreamID,
%				'A required pseudo-header was not found. (RFC7540 8.1.2.3)');
%% Or:
%		{ok, _, _} ->
%			stream_reset(State, StreamID, {stream_error, protocol_error,
%				'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'})
		{error, HumanReadable} ->
			stream_reset(State, StreamID, {stream_error, protocol_error, HumanReadable})
	end.

pseudo_headers([{<<":status">>, _}|_], #{status := _}) ->
	{error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'};
pseudo_headers([{<<":status">>, Status}|Tail], PseudoHeaders) ->
	try cow_http:status_to_integer(Status) of
		IntStatus ->
			pseudo_headers(Tail, PseudoHeaders#{status => IntStatus})
	catch _:_ ->
		{error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'}
	end;
pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
	{error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
pseudo_headers(Headers, PseudoHeaders) ->
	{ok, PseudoHeaders, Headers}.

stream_resp_init(State=#http2_state{content_handlers=Handlers0},
		StreamID, IsFin, Headers, PseudoHeaders) ->
	case get_stream_by_id(StreamID, State) of
		Stream = #stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin} ->
			case PseudoHeaders of
				#{status := Status} when Status >= 100, Status =< 199 ->
					ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers},
					State;
				#{status := Status} ->
					ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers},
					Handlers = case IsFin of
						fin -> undefined;
						nofin ->
							gun_content_handler:init(ReplyTo, StreamRef,
								Status, Headers, Handlers0)
					end,
					remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin);
				%% @todo For now we assume that it's a trailer if there's no :status.
				%% A better state machine is needed to distinguish between that and errors.
				_ ->
					%% @todo We probably want to pass this to gun_content_handler?
					ReplyTo ! {gun_trailers, self(), StreamRef, Headers},
					remote_fin(Stream, State, fin)
			end;
		_ ->
			stream_reset(State, StreamID, {stream_error, stream_closed,
				'HEADERS frame received for a closed or non-existent stream. (RFC7540 6.1)'})
	end.

stream_reset(State=#http2_state{socket=Socket, transport=Transport,
		streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) ->
	Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
	case lists:keytake(StreamID, #stream.id, Streams0) of
		{value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} ->
			ReplyTo ! {gun_error, self(), StreamRef, StreamError},
			State#http2_state{streams=Streams};
		false ->
			%% @todo Unknown stream. Not sure what to do here. Check again once all
			%% terminate calls have been written.
			State
	end.

error_stream_closed(State, StreamRef, ReplyTo) ->
	ReplyTo ! {gun_error, self(), StreamRef, {badstate,
		"The stream has already been closed."}},
	State.

error_stream_not_found(State, StreamRef, ReplyTo) ->
	ReplyTo ! {gun_error, self(), StreamRef, {badstate,
		"The stream cannot be found."}},
	State.

%% Streams.
%% @todo probably change order of args and have state first?

new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, #http2_state{
		local_settings=#{initial_window_size := RemoteWindow},
		remote_settings=#{initial_window_size := LocalWindow}}) ->
	#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
		remote=Remote, remote_window=RemoteWindow,
		local=Local, local_window=LocalWindow}.

get_stream_by_id(StreamID, #http2_state{streams=Streams}) ->
	lists:keyfind(StreamID, #stream.id, Streams).

get_stream_by_ref(StreamRef, #http2_state{streams=Streams}) ->
	lists:keyfind(StreamRef, #stream.ref, Streams).

delete_stream(StreamID, State=#http2_state{streams=Streams}) ->
	Streams2 = lists:keydelete(StreamID, #stream.id, Streams),
	State#http2_state{streams=Streams2}.

remote_fin(S=#stream{local=fin}, State, fin) ->
	delete_stream(S#stream.id, State);
%% We always replace the stream in the state because
%% the content handler state has changed.
remote_fin(S, State=#http2_state{streams=Streams}, IsFin) ->
	Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams,
		S#stream{remote=IsFin}),
	State#http2_state{streams=Streams2}.

maybe_delete_stream(State, Stream=#stream{local=fin, remote=fin}) ->
	delete_stream(Stream#stream.id, State);
maybe_delete_stream(State=#http2_state{streams=Streams}, Stream) ->
	State#http2_state{streams=
		lists:keyreplace(Stream#stream.id, #stream.id, Streams, Stream)}.