From 871989eef53663285c165fdfb83a5918ebe00d41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 30 Oct 2018 10:03:17 +0100 Subject: Switch to cow_http2_machine for HTTP/2 A common state machine for Gun and Cowboy will be easier to maintain. This fixes numerous issues including some test failures that were hidden because the h2specd_SUITE was flawed. We temporarily depend on Cowlib master until a new version is released. --- src/gun_http2.erl | 774 +++++++++++++++++++----------------------------------- 1 file changed, 274 insertions(+), 500 deletions(-) (limited to 'src/gun_http2.erl') diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 99776c6..a562e59 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -27,22 +27,14 @@ -export([down/1]). -record(stream, { - id :: non_neg_integer(), + id = undefined :: cow_http2:streamid(), + + %% Reference used by the user of Gun to refer to this stream. ref :: reference(), + + %% Process to send messages to. reply_to :: pid(), - %% Whether we finished sending data. - local = nofin :: fin | nofin, - %% Local flow control window (how much we can send). - local_window :: integer(), - %% Buffered data waiting for the flow control window to increase. - local_buffer = queue:new() :: queue:queue( - {fin | nofin, non_neg_integer(), iolist()}), - local_buffer_size = 0 :: non_neg_integer(), - local_trailers = undefined :: undefined | cow_http:headers(), - %% Whether we finished receiving data. - remote = nofin :: fin | nofin, - %% Remote flow control window (how much we accept to receive). - remote_window :: integer(), + %% Content handlers state. handler_state :: undefined | gun_content_handler:state() }). @@ -55,32 +47,12 @@ content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), - local_settings = #{ - initial_window_size => 65535, - max_frame_size => 16384 - } :: map(), - remote_settings = #{ - initial_window_size => 65535 - } :: map(), - - %% Connection-wide flow control window. - local_window = 65535 :: integer(), %% How much we can send. - remote_window = 65535 :: integer(), %% How much we accept to receive. - - streams = [] :: [#stream{}], - stream_id = 1 :: non_neg_integer(), - - %% The client starts by sending a sequence of bytes as a preface, - %% followed by a potentially empty SETTINGS frame. Then the connection - %% is established and continues normally. An exception is when a HEADERS - %% frame is sent followed by CONTINUATION frames: no other frame can be - %% sent in between. - parse_state = undefined :: normal - | {continuation, cowboy_stream:streamid(), cowboy_stream:fin(), binary()}, - - %% HPACK decoding and encoding state. - decode_state = cow_hpack:init() :: cow_hpack:state(), - encode_state = cow_hpack:init() :: cow_hpack:state() + %% HTTP/2 state machine. + http2_machine :: cow_http2_machine:http2_machine(), + + %% Currently active HTTP/2 streams. Streams may be initiated either + %% by the client or by the server through PUSH_PROMISE frames. + streams = [] :: [#stream{}] }). check_options(Opts) -> @@ -97,214 +69,160 @@ do_check_options([{keepalive, infinity}|Opts]) -> do_check_options(Opts); do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); -%% @todo max_frame_size_sent do_check_options([Opt|_]) -> {error, {options, {http2, Opt}}}. name() -> http2. init(Owner, Socket, Transport, Opts) -> + {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts), Handlers = maps:get(content_handlers, Opts, [gun_data_h]), + %% @todo Better validate the preface being received. State = #http2_state{owner=Owner, socket=Socket, transport=Transport, opts=Opts, content_handlers=Handlers, - parse_state=normal}, %% @todo Have a special parse state for preface. - #http2_state{local_settings=Settings} = State, - %% Send the HTTP/2 preface. - Transport:send(Socket, [ - << "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>, - cow_http2:settings(Settings) - ]), + http2_machine=HTTP2Machine}, + Transport:send(Socket, Preface), State. handle(Data, State=#http2_state{buffer=Buffer}) -> parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}). -parse(Data0, State0=#http2_state{buffer=Buffer, parse_state=PS}) -> - %% @todo Parse states: Preface. Continuation. - Data = << Buffer/binary, Data0/binary >>, - case cow_http2:parse(Data) of - {ok, Frame, Rest} when PS =:= normal -> - case frame(Frame, State0) of +parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}) -> + MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine), + case cow_http2:parse(Data, MaxFrameSize) of + {ok, Frame, Rest} -> + case frame(State0, Frame) of close -> close; - State1 -> parse(Rest, State1) + State -> parse(Rest, State) end; - {ok, Frame, Rest} when element(1, PS) =:= continuation -> - case continuation_frame(Frame, State0) of + {ignore, Rest} -> + case ignored_frame(State0) of close -> close; - State1 -> parse(Rest, State1) + State -> parse(Rest, State) end; - {ignore, _} when element(1, PS) =:= continuation -> - terminate(State0, {connection_error, protocol_error, - 'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'}); - {ignore, Rest} -> - parse(Rest, State0); {stream_error, StreamID, Reason, Human, Rest} -> - parse(Rest, stream_reset(State0, StreamID, {stream_error, Reason, Human})); + parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human})); Error = {connection_error, _, _} -> terminate(State0, Error); more -> {state, State0#http2_state{buffer=Data}} end. -%% DATA frame. -frame({data, StreamID, IsFin, Data}, State0=#http2_state{remote_window=ConnWindow}) -> - case get_stream_by_id(StreamID, State0) of - Stream0 = #stream{remote=nofin, remote_window=StreamWindow, handler_state=Handlers0} -> - Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), - {Stream, State} = send_window_update( - Stream0#stream{remote_window=StreamWindow - byte_size(Data), - handler_state=Handlers}, - State0#http2_state{remote_window=ConnWindow - byte_size(Data)}), - remote_fin(Stream, State, IsFin); - _ -> - %% @todo protocol_error if not existing - stream_reset(State0, StreamID, {stream_error, stream_closed, - 'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'}) - end; -%% Single HEADERS frame headers block. -frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, State) -> - stream_decode_init(State, StreamID, IsFin, HeaderBlock); -%% HEADERS frame starting a headers block. Enter continuation mode. -frame({headers, StreamID, IsFin, head_nofin, HeaderBlockFragment}, State) -> - State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}}; -%% Single HEADERS frame headers block with priority. -frame({headers, StreamID, IsFin, head_fin, - _IsExclusive, _DepStreamID, _Weight, HeaderBlock}, State) -> - stream_decode_init(State, StreamID, IsFin, HeaderBlock); -%% @todo HEADERS frame starting a headers block. Enter continuation mode. -%frame(State, {headers, StreamID, IsFin, head_nofin, -% _IsExclusive, _DepStreamID, _Weight, HeaderBlockFragment}) -> -% %% @todo Handle priority. -% State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}}; -%% @todo PRIORITY frame. -%frame(State, {priority, _StreamID, _IsExclusive, _DepStreamID, _Weight}) -> -% %% @todo Validate StreamID? -% %% @todo Handle priority. -% State; -%% @todo RST_STREAM frame. -frame({rst_stream, StreamID, Reason}, State) -> - stream_reset(State, StreamID, {stream_error, Reason, 'Stream reset by server.'}); -%% SETTINGS frame. -frame({settings, Settings}, State=#http2_state{socket=Socket, transport=Transport, - remote_settings=Settings0}) -> - Transport:send(Socket, cow_http2:settings_ack()), - State#http2_state{remote_settings=maps:merge(Settings0, Settings)}; -%% Ack for a previously sent SETTINGS frame. -frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings}) -> - %% @todo Apply SETTINGS that require synchronization. - State; -%% PUSH_PROMISE frame. -%% @todo Continuation. -frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock}, - State=#http2_state{streams=Streams, decode_state=DecodeState0}) -> - case get_stream_by_id(PromisedStreamID, State) of - false -> - case get_stream_by_id(StreamID, State) of - #stream{ref=StreamRef, reply_to=ReplyTo} -> - try cow_hpack:decode(HeaderBlock, DecodeState0) of - {Headers0, DecodeState} -> - {Method, Scheme, Authority, Path, Headers} = try - {value, {_, Method0}, Headers1} = lists:keytake(<<":method">>, 1, Headers0), - {value, {_, Scheme0}, Headers2} = lists:keytake(<<":scheme">>, 1, Headers1), - {value, {_, Authority0}, Headers3} = lists:keytake(<<":authority">>, 1, Headers2), - {value, {_, Path0}, Headers4} = lists:keytake(<<":path">>, 1, Headers3), - {Method0, Scheme0, Authority0, Path0, Headers4} - catch error:badmatch -> - stream_reset(State, StreamID, {stream_error, protocol_error, - 'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'}) - end, - NewStreamRef = make_ref(), - ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method, - iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers}, - NewStream = new_stream(PromisedStreamID, NewStreamRef, ReplyTo, - nofin, fin, State), - State#http2_state{streams=[NewStream|Streams], decode_state=DecodeState} - catch _:_ -> - terminate(State, {connection_error, compression_error, - 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) - end; - _ -> - stream_reset(State, StreamID, {stream_error, stream_closed, - 'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'}) - end; +%% Frames received. + +frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame) -> + case cow_http2_machine:frame(Frame, HTTP2Machine0) of + {ok, HTTP2Machine} -> + maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame); + {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> + data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data); + {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> + headers_frame(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, IsFin, Headers, PseudoHeaders, BodyLen); + {ok, {trailers, StreamID, Trailers}, HTTP2Machine} -> + trailers_frame(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, Trailers); + {ok, {rst_stream, StreamID, Reason}, HTTP2Machine} -> + rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason); + {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} -> + push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, PromisedStreamID, Headers, PseudoHeaders); + {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} -> + terminate(State#http2_state{http2_machine=HTTP2Machine}, + {stop, Frame, 'Server is going away.'}); + {send, SendData, HTTP2Machine} -> + send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData); + {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> + reset_stream(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, {stream_error, Reason, Human}); + {error, Error={connection_error, _, _}, HTTP2Machine} -> + terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) + end. + +maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> + case Frame of + {settings, _} -> Transport:send(Socket, cow_http2:settings_ack()); + {ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque)); + _ -> ok + end, + State. + +data_frame(State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0}, StreamID, IsFin, Data) -> + Stream = #stream{handler_state=Handlers0} = get_stream_by_id(State, StreamID), + Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), + Size = byte_size(Data), + HTTP2Machine = case Size of + %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. + 0 -> + HTTP2Machine0; _ -> - stream_reset(State, StreamID, {stream_error, todo, ''}) - end; -%% PING frame. -frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) -> - Transport:send(Socket, cow_http2:ping_ack(Opaque)), - State; -%% Ack for a previously sent PING frame. -%% -%% @todo Might want to check contents but probably a waste of time. -frame({ping_ack, _Opaque}, State) -> - State; -%% GOAWAY frame. -frame(Frame={goaway, StreamID, _, _}, State) -> - terminate(State, StreamID, {stop, Frame, 'Client is going away.'}); -%% Connection-wide WINDOW_UPDATE frame. -frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow}) - when ConnWindow + Increment > 16#7fffffff -> - terminate(State, {connection_error, flow_control_error, - 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'}); -frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow}) -> - send_data(State#http2_state{local_window=ConnWindow + Increment}); -%% Stream-specific WINDOW_UPDATE frame. -frame({window_update, StreamID, Increment}, State0=#http2_state{streams=Streams0}) -> - case lists:keyfind(StreamID, #stream.id, Streams0) of - #stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff -> - stream_reset(State0, StreamID, {stream_error, flow_control_error, - 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'}); - Stream0 = #stream{local_window=StreamWindow} -> - {State, Stream} = send_data(State0, - Stream0#stream{local_window=StreamWindow + Increment}), - Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream), + Transport:send(Socket, cow_http2:window_update(Size)), + HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), + %% We do not send a stream WINDOW_UPDATE if this was the last DATA frame. + case IsFin of + nofin -> + Transport:send(Socket, cow_http2:window_update(StreamID, Size)), + cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1); + fin -> + HTTP2Machine1 + end + end, + maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, + Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin). + +headers_frame(State=#http2_state{content_handlers=Handlers0}, + StreamID, IsFin, Headers, PseudoHeaders, _BodyLen) -> + Stream = #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), + case PseudoHeaders of + #{status := Status} when Status >= 100, Status =< 199 -> + ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers}, + State; + #{status := Status} -> + ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, + Handlers = case IsFin of + fin -> undefined; + nofin -> + gun_content_handler:init(ReplyTo, StreamRef, + Status, Headers, Handlers0) + end, + maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), + StreamID, remote, IsFin) + end. + +trailers_frame(State, StreamID, Trailers) -> + #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), + %% @todo We probably want to pass this to gun_content_handler? + ReplyTo ! {gun_trailers, self(), StreamRef, Trailers}, + maybe_delete_stream(State, StreamID, remote, fin). + +rst_stream_frame(State=#http2_state{streams=Streams0}, StreamID, Reason) -> + case lists:keytake(StreamID, #stream.id, Streams0) of + {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> + ReplyTo ! {gun_error, self(), StreamRef, Reason}, State#http2_state{streams=Streams}; false -> - %% @todo Receiving this frame on a stream in the idle state is an error. - %% WINDOW_UPDATE frames may be received for a short period of time - %% after a stream is closed. They must be ignored. - State0 - end; -%% Unexpected CONTINUATION frame. -frame({continuation, StreamID, _, _}, State) -> - terminate(State, StreamID, {connection_error, protocol_error, - 'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}). - -continuation_frame({continuation, StreamID, head_fin, HeaderBlockFragment1}, - State=#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment0}}) -> - HeaderBlock = << HeaderBlockFragment0/binary, HeaderBlockFragment1/binary >>, - stream_decode_init(State#http2_state{parse_state=normal}, StreamID, IsFin, HeaderBlock); -continuation_frame({continuation, StreamID, head_nofin, HeaderBlockFragment1}, - State=#http2_state{parse_state= - {continuation, StreamID, IsFin, HeaderBlockFragment0}}) -> - State#http2_state{parse_state={continuation, StreamID, IsFin, - << HeaderBlockFragment0/binary, HeaderBlockFragment1/binary >>}}; -continuation_frame(_, State) -> - terminate(State, {connection_error, protocol_error, - 'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'}). - -send_window_update(Stream=#stream{id=StreamID, remote_window=StreamWindow0}, - State=#http2_state{socket=Socket, transport=Transport, remote_window=ConnWindow0}) -> - %% @todo We should make the windows configurable. - MinConnWindow = 8000000, - MinStreamWindow = 1000000, - ConnWindow = if - ConnWindow0 =< MinConnWindow -> - Transport:send(Socket, cow_http2:window_update(MinConnWindow)), - ConnWindow0 + MinConnWindow; - true -> - ConnWindow0 - end, - StreamWindow = if - StreamWindow0 =< MinStreamWindow -> - Transport:send(Socket, cow_http2:window_update(StreamID, MinStreamWindow)), - StreamWindow0 + MinStreamWindow; - true -> - StreamWindow0 - end, - {Stream#stream{remote_window=StreamWindow}, - State#http2_state{remote_window=ConnWindow}}. + State + end. + +push_promise_frame(State=#http2_state{streams=Streams}, + StreamID, PromisedStreamID, Headers, #{ + method := Method, scheme := Scheme, + authority := Authority, path := Path}) -> + #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), + PromisedStreamRef = make_ref(), + ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, + iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers}, + NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, reply_to=ReplyTo}, + State#http2_state{streams=[NewStream|Streams]}. + +ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> + case cow_http2_machine:ignored_frame(HTTP2Machine0) of + {ok, HTTP2Machine} -> + State#http2_state{http2_machine=HTTP2Machine}; + {error, Error={connection_error, _, _}, HTTP2Machine} -> + terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) + end. close(#http2_state{streams=Streams}) -> close_streams(Streams). @@ -320,193 +238,141 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), State. -request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, - streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo, - Method, Host, Port, Path, Headers) -> - {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), - IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers)) - orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of +request(State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0, streams=Streams}, + StreamRef, ReplyTo, Method, Host, Port, Path, Headers0) -> + IsFin0 = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers0)) + orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers0)) of true -> nofin; false -> fin end, + {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(Method, HTTP2Machine0), + {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0), + {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( + StreamID, HTTP2Machine1, IsFin0, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - Stream = new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, State), - State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}. - -%% @todo Handle Body > 16MB. (split it out into many frames) -request(State0=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, - streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo, - Method, Host, Port, Path, Headers0, Body) -> - Headers = lists:keystore(<<"content-length">>, 1, Headers0, + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, + State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}. + +request(State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0, streams=Streams}, + StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) -> + Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), - {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), - Stream0 = new_stream(StreamID, StreamRef, ReplyTo, nofin, nofin, State0), - {State, Stream} = send_data(State0, Stream0, fin, Body), - State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}. - -prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) -> - Host2 = case Host0 of + {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(Method, HTTP2Machine0), + {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers1), + {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( + StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), + Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, + maybe_send_data(State#http2_state{http2_machine=HTTP2Machine, + streams=[Stream|Streams]}, StreamID, fin, Body). + +prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, Headers0) -> + Host1 = case Host0 of {local, _SocketPath} -> <<>>; Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); _ -> Host0 end, Authority = case lists:keyfind(<<"host">>, 1, Headers0) of {_, Host} -> Host; - _ -> [Host2, $:, integer_to_binary(Port)] + _ -> [Host1, $:, integer_to_binary(Port)] end, %% @todo We also must remove any header found in the connection header. - Headers1 = + Headers = lists:keydelete(<<"host">>, 1, lists:keydelete(<<"connection">>, 1, lists:keydelete(<<"keep-alive">>, 1, lists:keydelete(<<"proxy-connection">>, 1, lists:keydelete(<<"transfer-encoding">>, 1, lists:keydelete(<<"upgrade">>, 1, Headers0)))))), - Headers = [ - {<<":method">>, Method}, - {<<":scheme">>, case Transport of + PseudoHeaders = #{ + method => Method, + scheme => case Transport of gun_tls -> <<"https">>; gun_tcp -> <<"http">> - end}, - {<<":authority">>, Authority}, - {<<":path">>, Path} - |Headers1], - cow_hpack:encode(Headers, EncodeState). - -data(State0, StreamRef, ReplyTo, IsFin, Data) -> - case get_stream_by_ref(StreamRef, State0) of - #stream{local=fin} -> - error_stream_closed(State0, StreamRef, ReplyTo); - Stream0 = #stream{} -> - {State, Stream} = send_data(State0, Stream0, IsFin, Data), - maybe_delete_stream(State, Stream); + end, + authority => Authority, + path => Path + }, + {ok, PseudoHeaders, Headers}. + +data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data) -> + case get_stream_by_ref(State, StreamRef) of + #stream{id=StreamID} -> + case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of + {ok, fin, _} -> + error_stream_closed(State, StreamRef, ReplyTo); + {ok, _, fin} -> + error_stream_closed(State, StreamRef, ReplyTo); + {ok, _, _} -> + maybe_send_data(State, StreamID, IsFin, Data) + end; false -> - error_stream_not_found(State0, StreamRef, ReplyTo) + error_stream_not_found(State, StreamRef, ReplyTo) end. -%% @todo Should we ever want to implement the PRIORITY mechanism, -%% this would be the place to do it. Right now, we just go over -%% all streams and send what we can until either everything is -%% sent or we run out of space in the window. -send_data(State=#http2_state{streams=Streams}) -> - resume_streams(State, Streams, []). - -%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update -%% the local stream windows for all active streams and perhaps -%% resume sending data. -%update_streams_local_window(State=#http2_state{streams=Streams0}, Increment) -> -% Streams = [ -% S#stream{local_window=StreamWindow + Increment} -% || S=#stream{local_window=StreamWindow} <- Streams0], -% resume_streams(State, Streams, []). - -%% When we receive an ack to a SETTINGS frame we sent we need to update -%% the remote stream windows for all active streams. -%update_streams_remote_window(State=#http2_state{streams=Streams0}, Increment) -> -% Streams = [ -% S#stream{remote_window=StreamWindow + Increment} -% || S=#stream{remote_window=StreamWindow} <- Streams0], -% State#http2_state{streams=Streams}. - -resume_streams(State, [], Acc) -> - State#http2_state{streams=lists:reverse(Acc)}; -%% While technically we should never get < 0 here, let's be on the safe side. -resume_streams(State=#http2_state{local_window=ConnWindow}, Streams, Acc) - when ConnWindow =< 0 -> - State#http2_state{streams=lists:reverse(Acc, Streams)}; -%% We rely on send_data/2 to do all the necessary checks about the stream. -resume_streams(State0, [Stream0|Tail], Acc) -> - {State1, Stream} = send_data(State0, Stream0), - resume_streams(State1, Tail, [Stream|Acc]). - -send_data(State, Stream=#stream{local=Local, local_buffer_size=0, local_trailers=Trailers}) - when (Trailers =/= undefined) andalso ((Local =:= idle) orelse (Local =:= nofin)) -> - send_trailers(State, Stream#stream{local_trailers=undefined}, Trailers); -%% @todo It's possible that the stream terminates. We must remove it. -send_data(State=#http2_state{local_window=ConnWindow}, - Stream=#stream{local=IsFin, local_window=StreamWindow, local_buffer_size=BufferSize}) - when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 -> - {State, Stream}; -send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}) -> - %% We know there is an item in the queue. - {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0), - {State, Stream} = send_data(State0, - Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize}, - IsFin, Data, in_r), - send_data(State, Stream). - -send_data(State, Stream, IsFin, Data) -> - send_data(State, Stream, IsFin, Data, in). - -%% We can send trailers immediately if the queue is empty, otherwise we queue. -%% We always send trailer frames even if the window is empty. -send_data(State, Stream=#stream{local_buffer_size=0}, fin, {trailers, Trailers}, _) -> - send_trailers(State, Stream, Trailers); -send_data(State, Stream, fin, {trailers, Trailers}, _) -> - {State, Stream#stream{local_trailers=Trailers}}; -%% Send data immediately if we can, buffer otherwise. -send_data(State=#http2_state{local_window=ConnWindow}, - Stream=#stream{local_window=StreamWindow}, IsFin, Data, In) - when ConnWindow =< 0; StreamWindow =< 0 -> - {State, queue_data(Stream, IsFin, Data, In)}; -send_data(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, - remote_settings=RemoteSettings, local_window=ConnWindow}, - Stream=#stream{id=StreamID, local_window=StreamWindow}, IsFin, Data, In) -> - RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384), - ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity), - MaxSendSize = min( - min(ConnWindow, StreamWindow), - min(RemoteMaxFrameSize, ConfiguredMaxFrameSize) - ), - case Data of -% {sendfile, Offset, Bytes, Path} when Bytes =< MaxSendSize -> -% Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)), -% Transport:sendfile(Socket, Path, Offset, Bytes), -% {State#http2_state{local_window=ConnWindow - Bytes}, -% Stream#stream{local=IsFin, local_window=StreamWindow - Bytes}}; -% {sendfile, Offset, Bytes, Path} -> -% Transport:send(Socket, cow_http2:data_header(StreamID, nofin, MaxSendSize)), -% Transport:sendfile(Socket, Path, Offset, MaxSendSize), -% send_data(State#http2_state{local_window=ConnWindow - MaxSendSize}, -% Stream#stream{local_window=StreamWindow - MaxSendSize}, -% IsFin, {sendfile, Offset + MaxSendSize, Bytes - MaxSendSize, Path}, In); - Iolist0 -> - IolistSize = iolist_size(Iolist0), - if - IolistSize =< MaxSendSize -> - Transport:send(Socket, cow_http2:data(StreamID, IsFin, Iolist0)), - {State#http2_state{local_window=ConnWindow - IolistSize}, - Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize}}; - true -> - {Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0), - Transport:send(Socket, cow_http2:data(StreamID, nofin, Iolist)), - send_data(State#http2_state{local_window=ConnWindow - MaxSendSize}, - Stream#stream{local_window=StreamWindow - MaxSendSize}, - IsFin, More, In) - end +maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0) -> + Data = case is_tuple(Data0) of + false -> {data, Data0}; + true -> Data0 + end, + case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of + {ok, HTTP2Machine} -> + State#http2_state{http2_machine=HTTP2Machine}; + {send, SendData, HTTP2Machine} -> + send_data(State#http2_state{http2_machine=HTTP2Machine}, SendData) end. -send_trailers(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0}, - Stream=#stream{id=StreamID}, Trailers) -> - {HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0), +send_data(State, []) -> + State; +send_data(State0, [{StreamID, IsFin, SendData}|Tail]) -> + State = send_data(State0, StreamID, IsFin, SendData), + send_data(State, Tail). + +send_data(State0, StreamID, IsFin, [Data]) -> + State = send_data_frame(State0, StreamID, IsFin, Data), + maybe_delete_stream(State, StreamID, local, IsFin); +send_data(State0, StreamID, IsFin, [Data|Tail]) -> + State = send_data_frame(State0, StreamID, nofin, Data), + send_data(State, StreamID, IsFin, Tail). + +send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, + StreamID, IsFin, {data, Data}) -> + Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)), + State; +%% @todo Uncomment this once sendfile is supported. +%send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, +% StreamID, IsFin, {sendfile, Offset, Bytes, Path}) -> +% Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)), +% Transport:sendfile(Socket, Path, Offset, Bytes), +% State; +%% The stream is terminated in cow_http2_machine:prepare_trailers. +send_data_frame(State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0}, StreamID, nofin, {trailers, Trailers}) -> + {ok, HeaderBlock, HTTP2Machine} + = cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers), Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), - {State#http2_state{encode_state=EncodeState}, Stream#stream{local=fin}}. + State#http2_state{http2_machine=HTTP2Machine}. -queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) -> - DataSize = case Data of -% {sendfile, _, Bytes, _} -> Bytes; - Iolist -> iolist_size(Iolist) - end, - Q = queue:In({IsFin, DataSize, Data}, Q0), - Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}. +reset_stream(State=#http2_state{socket=Socket, transport=Transport, + streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) -> + Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), + case lists:keytake(StreamID, #stream.id, Streams0) of + {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> + ReplyTo ! {gun_error, self(), StreamRef, StreamError}, + State#http2_state{streams=Streams}; + false -> + State + end. -cancel(State=#http2_state{socket=Socket, transport=Transport}, - StreamRef, ReplyTo) -> - case get_stream_by_ref(StreamRef, State) of +cancel(State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0}, StreamRef, ReplyTo) -> + case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> + {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), - delete_stream(StreamID, State); + delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID); false -> error_stream_not_found(State, StreamRef, ReplyTo) end. @@ -516,114 +382,25 @@ down(#http2_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -terminate(#http2_state{socket=Socket, transport=Transport, streams=Streams}, Reason) -> - %% Because a particular stream is unknown, - %% we're sending the error message to all streams. +terminate(#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine, streams=Streams}, Reason) -> + %% The connection is going away either at the request of the server, + %% or because an error occurred in the protocol. Inform the streams. %% @todo We should not send duplicate messages to processes. %% @todo We should probably also inform the owner process. + + %% @todo Somehow streams aren't removed on receiving a response. _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], - %% @todo LastGoodStreamID - Transport:send(Socket, cow_http2:goaway(0, terminate_reason(Reason), <<>>)), + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + terminate_reason(Reason), <<>>)), close. -terminate(State=#http2_state{socket=Socket, transport=Transport}, StreamID, Reason) -> - case get_stream_by_id(StreamID, State) of - #stream{reply_to=ReplyTo} -> - ReplyTo ! {gun_error, self(), Reason}, - %% @todo LastGoodStreamID - Transport:send(Socket, cow_http2:goaway(0, terminate_reason(Reason), <<>>)), - close; - _ -> - terminate(State, Reason) - end. - terminate_reason({connection_error, Reason, _}) -> Reason; terminate_reason({stop, _, _}) -> no_error. %% Stream functions. -stream_decode_init(State=#http2_state{decode_state=DecodeState0}, StreamID, IsFin, HeaderBlock) -> - try cow_hpack:decode(HeaderBlock, DecodeState0) of - {Headers, DecodeState} -> - stream_pseudo_headers_init(State#http2_state{decode_state=DecodeState}, - StreamID, IsFin, Headers) - catch _:_ -> - terminate(State, {connection_error, compression_error, - 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) - end. - -stream_pseudo_headers_init(State, StreamID, IsFin, Headers0) -> - case pseudo_headers(Headers0, #{}) of - {ok, PseudoHeaders, Headers} -> - stream_resp_init(State, StreamID, IsFin, Headers, PseudoHeaders); -%% @todo When we handle trailers properly: -% {ok, _, _} -> -% stream_malformed(State, StreamID, -% 'A required pseudo-header was not found. (RFC7540 8.1.2.3)'); -%% Or: -% {ok, _, _} -> -% stream_reset(State, StreamID, {stream_error, protocol_error, -% 'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'}) - {error, HumanReadable} -> - stream_reset(State, StreamID, {stream_error, protocol_error, HumanReadable}) - end. - -pseudo_headers([{<<":status">>, _}|_], #{status := _}) -> - {error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'}; -pseudo_headers([{<<":status">>, Status}|Tail], PseudoHeaders) -> - try cow_http:status_to_integer(Status) of - IntStatus -> - pseudo_headers(Tail, PseudoHeaders#{status => IntStatus}) - catch _:_ -> - {error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'} - end; -pseudo_headers([{<<":", _/bits>>, _}|_], _) -> - {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'}; -pseudo_headers(Headers, PseudoHeaders) -> - {ok, PseudoHeaders, Headers}. - -stream_resp_init(State=#http2_state{content_handlers=Handlers0}, - StreamID, IsFin, Headers, PseudoHeaders) -> - case get_stream_by_id(StreamID, State) of - Stream = #stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin} -> - case PseudoHeaders of - #{status := Status} when Status >= 100, Status =< 199 -> - ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers}, - State; - #{status := Status} -> - ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, - Handlers = case IsFin of - fin -> undefined; - nofin -> - gun_content_handler:init(ReplyTo, StreamRef, - Status, Headers, Handlers0) - end, - remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin); - %% @todo For now we assume that it's a trailer if there's no :status. - %% A better state machine is needed to distinguish between that and errors. - _ -> - %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), StreamRef, Headers}, - remote_fin(Stream, State, fin) - end; - _ -> - stream_reset(State, StreamID, {stream_error, stream_closed, - 'HEADERS frame received for a closed or non-existent stream. (RFC7540 6.1)'}) - end. - -stream_reset(State=#http2_state{socket=Socket, transport=Transport, - streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) -> - Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), - case lists:keytake(StreamID, #stream.id, Streams0) of - {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> - ReplyTo ! {gun_error, self(), StreamRef, StreamError}, - State#http2_state{streams=Streams}; - false -> - %% @todo Unknown stream. Not sure what to do here. Check again once all - %% terminate calls have been written. - State - end. - error_stream_closed(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream has already been closed."}}, @@ -635,36 +412,33 @@ error_stream_not_found(State, StreamRef, ReplyTo) -> State. %% Streams. -%% @todo probably change order of args and have state first? - -new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, #http2_state{ - local_settings=#{initial_window_size := RemoteWindow}, - remote_settings=#{initial_window_size := LocalWindow}}) -> - #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, - remote=Remote, remote_window=RemoteWindow, - local=Local, local_window=LocalWindow}. +%% @todo probably change order of args and have state first? Yes. -get_stream_by_id(StreamID, #http2_state{streams=Streams}) -> +get_stream_by_id(#http2_state{streams=Streams}, StreamID) -> lists:keyfind(StreamID, #stream.id, Streams). -get_stream_by_ref(StreamRef, #http2_state{streams=Streams}) -> +get_stream_by_ref(#http2_state{streams=Streams}, StreamRef) -> lists:keyfind(StreamRef, #stream.ref, Streams). -delete_stream(StreamID, State=#http2_state{streams=Streams}) -> - Streams2 = lists:keydelete(StreamID, #stream.id, Streams), - State#http2_state{streams=Streams2}. +store_stream(State=#http2_state{streams=Streams0}, Stream=#stream{id=StreamID}) -> + Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream), + State#http2_state{streams=Streams}. -remote_fin(S=#stream{local=fin}, State, fin) -> - delete_stream(S#stream.id, State); -%% We always replace the stream in the state because -%% the content handler state has changed. -remote_fin(S, State=#http2_state{streams=Streams}, IsFin) -> - Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, - S#stream{remote=IsFin}), - State#http2_state{streams=Streams2}. +maybe_delete_stream(State=#http2_state{http2_machine=HTTP2Machine}, StreamID, local, fin) -> + case cow_http2_machine:get_stream_remote_state(StreamID, HTTP2Machine) of + {ok, fin} -> delete_stream(State, StreamID); + {error, closed} -> delete_stream(State, StreamID); + _ -> State + end; +maybe_delete_stream(State=#http2_state{http2_machine=HTTP2Machine}, StreamID, remote, fin) -> + case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of + {ok, fin, _} -> delete_stream(State, StreamID); + {error, closed} -> delete_stream(State, StreamID); + _ -> State + end; +maybe_delete_stream(State, _, _, _) -> + State. -maybe_delete_stream(State, Stream=#stream{local=fin, remote=fin}) -> - delete_stream(Stream#stream.id, State); -maybe_delete_stream(State=#http2_state{streams=Streams}, Stream) -> - State#http2_state{streams= - lists:keyreplace(Stream#stream.id, #stream.id, Streams, Stream)}. +delete_stream(State=#http2_state{streams=Streams}, StreamID) -> + Streams2 = lists:keydelete(StreamID, #stream.id, Streams), + State#http2_state{streams=Streams2}. -- cgit v1.2.3