aboutsummaryrefslogblamecommitdiffstats
path: root/src/gun_http2.erl
blob: 30ee06e4ee2c828dc86f0f27c253005da2676552 (plain) (tree)

































                                                                           


                                                                





                                                  
                                                      


















                                                                    




                                                            




                                         
                                       




                                                          


                                                                     


















                                                                                                  
                                              
                                                 


                                                                                        






                                                                                                          
                                                                                                          






                                                                                                                                        








                                                                                                                    






























                                                                                                                                           

                                                                                         










                                                                                       

































                                                                                                                                                  







































                                                                                            
                                                  

              

                                                                                         
                                                                                                                 









                                                                                             



                                                                                            
                                                                                                                 




                                                                                   


                                                                                     




                                                                               









                                                                               



                                                          
                                              



                                               
                                                                                       



                                                              



                                                                                                        




                                                                










                                                                                         





























































                                                                                               

                                               

                                                    












                                                                     
%% Copyright (c) 2016, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(gun_http2).

-export([check_options/1]).
-export([name/0]).
-export([init/4]).
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
-export([request/7]).
-export([request/8]).
-export([data/4]).
-export([cancel/2]).
-export([down/1]).

-record(stream, {
	id :: non_neg_integer(),
	ref :: reference(),
	%% Whether we finished sending data.
	local = nofin :: cowboy_stream:fin(),
	%% Whether we finished receiving data.
	remote = nofin :: cowboy_stream:fin(),
	%% Content handlers state.
	handler_state :: undefined | gun_content_handler:state()
}).

-record(http2_state, {
	owner :: pid(),
	socket :: inet:socket() | ssl:sslsocket(),
	transport :: module(),
	content_handlers :: gun_content_handler:opt(),
	buffer = <<>> :: binary(),

	%% @todo local_settings, next_settings, remote_settings

	streams = [] :: [#stream{}],
	stream_id = 1 :: non_neg_integer(),

	%% HPACK decoding and encoding state.
	decode_state = cow_hpack:init() :: cow_hpack:state(),
	encode_state = cow_hpack:init() :: cow_hpack:state()
}).

check_options(Opts) ->
	do_check_options(maps:to_list(Opts)).

do_check_options([]) ->
	ok;
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
	do_check_options(Opts);
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
	case gun_content_handler:check_option(Handlers) of
		ok -> do_check_options(Opts);
		error -> {error, {options, {http, Opt}}}
	end;
do_check_options([Opt|_]) ->
	{error, {options, {http2, Opt}}}.

name() -> http2.

init(Owner, Socket, Transport, Opts) ->
	%% Send the HTTP/2 preface.
	Transport:send(Socket, [
		<< "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
		cow_http2:settings(#{}) %% @todo Settings.
	]),
	Handlers = maps:get(content_handlers, Opts, [gun_data]),
	#http2_state{owner=Owner, socket=Socket, transport=Transport,
		content_handlers=Handlers}.

handle(Data, State=#http2_state{buffer=Buffer}) ->
	parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}).

parse(Data0, State=#http2_state{buffer=Buffer}) ->
	%% @todo Parse states: Preface. Continuation.
	Data = << Buffer/binary, Data0/binary >>,
	case cow_http2:parse(Data) of
		{ok, Frame, Rest} ->
			parse(Rest, frame(Frame, State));
		{stream_error, StreamID, Reason, Human, Rest} ->
			parse(Rest, stream_reset(State, StreamID, {stream_error, Reason, Human}));
		Error = {connection_error, _, _} ->
			terminate(State, Error);
		more ->
			State#http2_state{buffer=Data}
	end.

%% DATA frame.
frame({data, StreamID, IsFin, Data}, State) ->
	case get_stream_by_id(StreamID, State) of
		Stream = #stream{remote=nofin, handler_state=Handlers0} ->
			Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
			remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin);
		_ ->
			%% @todo protocol_error if not existing
			stream_reset(State, StreamID, {stream_error, stream_closed,
				'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
	end;
%% Single HEADERS frame headers block.
frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
		State=#http2_state{owner=Owner, decode_state=DecodeState0, content_handlers=Handlers0}) ->
	case get_stream_by_id(StreamID, State) of
		Stream = #stream{ref=StreamRef, remote=nofin} ->
			try cow_hpack:decode(HeaderBlock, DecodeState0) of
				{Headers0, DecodeState} ->
					case lists:keytake(<<":status">>, 1, Headers0) of
						{value, {_, Status}, Headers} ->
							Owner ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers},
							%% @todo Change to ReplyTo.
							Handlers = case IsFin of
								fin -> undefined;
								nofin ->
									gun_content_handler:init(Owner, StreamRef,
										Status, Headers, Handlers0)
							end,
							remote_fin(Stream#stream{handler_state=Handlers},
								State#http2_state{decode_state=DecodeState}, IsFin);
						false ->
							stream_reset(State, StreamID, {stream_error, protocol_error,
								'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'})
					end
			catch _:_ ->
				terminate(State, {connection_error, compression_error,
					'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
			end;
		_ ->
			stream_reset(State, StreamID, {stream_error, stream_closed,
				'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
	end;
%% @todo HEADERS frame starting a headers block. Enter continuation mode.
%frame(State, {headers, StreamID, IsFin, head_nofin, HeaderBlockFragment}) ->
%	State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}};
%% @todo Single HEADERS frame headers block with priority.
%frame(State, {headers, StreamID, IsFin, head_fin,
%		_IsExclusive, _DepStreamID, _Weight, HeaderBlock}) ->
%	%% @todo Handle priority.
%	stream_init(State, StreamID, IsFin, HeaderBlock);
%% @todo HEADERS frame starting a headers block. Enter continuation mode.
%frame(State, {headers, StreamID, IsFin, head_nofin,
%		_IsExclusive, _DepStreamID, _Weight, HeaderBlockFragment}) ->
%	%% @todo Handle priority.
%	State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}};
%% @todo PRIORITY frame.
%frame(State, {priority, _StreamID, _IsExclusive, _DepStreamID, _Weight}) ->
%	%% @todo Validate StreamID?
%	%% @todo Handle priority.
%	State;
%% @todo RST_STREAM frame.
frame({rst_stream, StreamID, Reason}, State) ->
	stream_reset(State, StreamID, {stream_error, Reason, 'Stream reset by server.'});
%% SETTINGS frame.
frame({settings, _Settings}, State=#http2_state{socket=Socket, transport=Transport}) ->
	%% @todo Apply SETTINGS.
	Transport:send(Socket, cow_http2:settings_ack()),
	State;
%% Ack for a previously sent SETTINGS frame.
frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings}) ->
	%% @todo Apply SETTINGS that require synchronization.
	State;
%% PUSH_PROMISE frame.
%% @todo Continuation.
frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock},
		State=#http2_state{owner=Owner, decode_state=DecodeState0}) ->
	case get_stream_by_id(PromisedStreamID, State) of
		false ->
			case get_stream_by_id(StreamID, State) of
				#stream{ref=StreamRef} ->
					try cow_hpack:decode(HeaderBlock, DecodeState0) of
						{Headers0, DecodeState} ->
							{Method, Scheme, Authority, Path, Headers} = try
								{value, {_, Method0}, Headers1} = lists:keytake(<<":method">>, 1, Headers0),
								{value, {_, Scheme0}, Headers2} = lists:keytake(<<":scheme">>, 1, Headers1),
								{value, {_, Authority0}, Headers3} = lists:keytake(<<":authority">>, 1, Headers2),
								{value, {_, Path0}, Headers4} = lists:keytake(<<":path">>, 1, Headers3),
								{Method0, Scheme0, Authority0, Path0, Headers4}
							catch error:badmatch ->
								stream_reset(State, StreamID, {stream_error, protocol_error,
									'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'})
							end,
							NewStreamRef = make_ref(),
							Owner ! {gun_push, self(), StreamRef, NewStreamRef, Method,
								iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers},
							new_stream(PromisedStreamID, NewStreamRef, nofin, fin,
								State#http2_state{decode_state=DecodeState})
					catch _:_ ->
						terminate(State, {connection_error, compression_error,
							'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
					end;
				_ ->
					stream_reset(State, StreamID, {stream_error, stream_closed,
						'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
			end;
		_ ->
			stream_reset(State, StreamID, {stream_error, todo, ''})
	end;
%% PING frame.
frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) ->
	Transport:send(Socket, cow_http2:ping_ack(Opaque)),
	State;
%% Ack for a previously sent PING frame.
%%
%% @todo Might want to check contents but probably a waste of time.
frame({ping_ack, _Opaque}, State) ->
	State;
%% GOAWAY frame.
frame(Frame={goaway, _, _, _}, State) ->
	terminate(State, {stop, Frame, 'Client is going away.'});
%% Connection-wide WINDOW_UPDATE frame.
frame({window_update, _Increment}, State) ->
	%% @todo control flow
	State;
%% Stream-specific WINDOW_UPDATE frame.
frame({window_update, _StreamID, _Increment}, State) ->
	%% @todo stream-specific control flow
	State;
%% Unexpected CONTINUATION frame.
frame({continuation, _, _, _}, State) ->
	terminate(State, {connection_error, protocol_error,
		'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}).

parse_status(Status) ->
	<< Code:3/binary, _/bits >> = Status,
	list_to_integer(binary_to_list(Code)).

close(#http2_state{owner=Owner, streams=Streams}) ->
	close_streams(Owner, Streams).

close_streams(_, []) ->
	ok;
close_streams(Owner, [#stream{ref=StreamRef}|Tail]) ->
	Owner ! {gun_error, self(), StreamRef, {closed,
		"The connection was lost."}},
	close_streams(Owner, Tail).

keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
	Transport:send(Socket, cow_http2:ping(0)),
	State.

request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
		stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers) ->
	{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers),
	IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers))
			orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of
		true -> nofin;
		false -> fin
	end,
	Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
	new_stream(StreamID, StreamRef, nofin, IsFin,
		State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}).

%% @todo Handle Body > 16MB. (split it out into many frames)
request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
		stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers0, Body) ->
	Headers = lists:keystore(<<"content-length">>, 1, Headers0,
		{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
	{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers),
	Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
	%% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
	%% Use the length set by the server instead, if any.
	%% @todo Would be better if we didn't have to convert to binary.
	send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384),
	new_stream(StreamID, StreamRef, nofin, fin,
		State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}).

prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) ->
	Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
		{_, Host} -> Host;
		_ -> [Host0, $:, integer_to_binary(Port)]
	end,
	%% @todo We also must remove any header found in the connection header.
	Headers1 =
		lists:keydelete(<<"host">>, 1,
		lists:keydelete(<<"connection">>, 1,
		lists:keydelete(<<"keep-alive">>, 1,
		lists:keydelete(<<"proxy-connection">>, 1,
		lists:keydelete(<<"transfer-encoding">>, 1,
		lists:keydelete(<<"upgrade">>, 1, Headers0)))))),
	Headers = [
		{<<":method">>, Method},
		{<<":scheme">>, case Transport:secure() of
			true -> <<"https">>;
			false -> <<"http">>
		end},
		{<<":authority">>, Authority},
		{<<":path">>, Path}
	|Headers1],
	cow_hpack:encode(Headers, EncodeState).

data(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, IsFin, Data) ->
	case get_stream_by_ref(StreamRef, State) of
		#stream{local=fin} ->
			error_stream_closed(State, StreamRef);
		S = #stream{} ->
			%% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
			%% Use the length set by the server instead, if any.
			%% @todo Would be better if we didn't have to convert to binary.
			send_data(Socket, Transport, S#stream.id, IsFin, iolist_to_binary(Data), 16384),
			local_fin(S, State, IsFin);
		false ->
			error_stream_not_found(State, StreamRef)
	end.

%% This same function is found in cowboy_http2.
send_data(Socket, Transport, StreamID, IsFin, Data, Length) ->
	if
		Length < byte_size(Data) ->
			<< Payload:Length/binary, Rest/bits >> = Data,
			Transport:send(Socket, cow_http2:data(StreamID, nofin, Payload)),
			send_data(Socket, Transport, StreamID, IsFin, Rest, Length);
		true ->
			Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data))
	end.

cancel(State=#http2_state{socket=Socket, transport=Transport},
		StreamRef) ->
	case get_stream_by_ref(StreamRef, State) of
		#stream{id=StreamID} ->
			Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)),
			delete_stream(StreamID, State);
		false ->
			error_stream_not_found(State, StreamRef)
	end.

%% @todo Add unprocessed streams when GOAWAY handling is done.
down(#http2_state{streams=Streams}) ->
	KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
	{KilledStreams, []}.

terminate(#http2_state{owner=Owner}, Reason) ->
	Owner ! {gun_error, self(), Reason},
	%% @todo Send GOAWAY frame.
	%% @todo LastGoodStreamID
	close.

stream_reset(State=#http2_state{owner=Owner, socket=Socket, transport=Transport,
		streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) ->
	Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
	case lists:keytake(StreamID, #stream.id, Streams0) of
		{value, #stream{ref=StreamRef}, Streams} ->
			Owner ! {gun_error, self(), StreamRef, StreamError},
			State#http2_state{streams=Streams};
		false ->
			%% @todo Unknown stream. Not sure what to do here. Check again once all
			%% terminate calls have been written.
			State
	end.

error_stream_closed(State=#http2_state{owner=Owner}, StreamRef) ->
	Owner ! {gun_error, self(), StreamRef, {badstate,
		"The stream has already been closed."}},
	State.

error_stream_not_found(State=#http2_state{owner=Owner}, StreamRef) ->
	Owner ! {gun_error, self(), StreamRef, {badstate,
		"The stream cannot be found."}},
	State.

%% Streams.
%% @todo probably change order of args and have state first?

new_stream(StreamID, StreamRef, Remote, Local,
		State=#http2_state{streams=Streams}) ->
	New = #stream{id=StreamID, ref=StreamRef, remote=Remote, local=Local},
	State#http2_state{streams=[New|Streams]}.

get_stream_by_id(StreamID, #http2_state{streams=Streams}) ->
	lists:keyfind(StreamID, #stream.id, Streams).

get_stream_by_ref(StreamRef, #http2_state{streams=Streams}) ->
	lists:keyfind(StreamRef, #stream.ref, Streams).

delete_stream(StreamID, State=#http2_state{streams=Streams}) ->
	Streams2 = lists:keydelete(StreamID, #stream.id, Streams),
	State#http2_state{streams=Streams2}.

remote_fin(S=#stream{local=fin}, State, fin) ->
	delete_stream(S#stream.id, State);
%% We always replace the stream in the state because
%% the content handler state has changed.
remote_fin(S, State=#http2_state{streams=Streams}, IsFin) ->
	Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams,
		S#stream{remote=IsFin}),
	State#http2_state{streams=Streams2}.

local_fin(_, State, nofin) ->
	State;
local_fin(S=#stream{remote=fin}, State, fin) ->
	delete_stream(S#stream.id, State);
local_fin(S, State=#http2_state{streams=Streams}, IsFin) ->
	Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams,
		S#stream{local=IsFin}),
	State#http2_state{streams=Streams2}.