diff options
Diffstat (limited to 'src/cowboy_http2.erl')
-rw-r--r-- | src/cowboy_http2.erl | 93 |
1 files changed, 65 insertions, 28 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 2b189e2..e57f02c 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -98,6 +98,10 @@ %% by the client or by the server through PUSH_PROMISE frames. streams = [] :: [stream()], + %% HTTP/2 streams that have been reset recently. We are expected + %% to keep receiving additional frames after sending an RST_STREAM. + lingering_streams = [] :: [cowboy_stream:streamid()], + %% Streams can spawn zero or more children which are then managed %% by this module if operating as a supervisor. children = cowboy_children:init() :: cowboy_children:children(), @@ -286,14 +290,12 @@ frame(State=#state{client_streamid=LastStreamID}, {data, StreamID, _, _}) when StreamID > LastStreamID -> terminate(State, {connection_error, protocol_error, 'DATA frame received on a stream in idle state. (RFC7540 5.1)'}); -frame(State0=#state{remote_window=ConnWindow, streams=Streams}, +frame(State0=#state{remote_window=ConnWindow, streams=Streams, lingering_streams=Lingering}, {data, StreamID, IsFin, Data}) -> DataLen = byte_size(Data), State = State0#state{remote_window=ConnWindow - DataLen}, case lists:keyfind(StreamID, #stream.id, Streams) of Stream = #stream{state=flush, remote=nofin, remote_window=StreamWindow} -> - %% @todo We need to cancel streams that we don't want to receive - %% the full body from after we finish flushing the response. after_commands(State, Stream#stream{remote=IsFin, remote_window=StreamWindow - DataLen}); Stream = #stream{state=StreamState0, remote=nofin, remote_window=StreamWindow} -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of @@ -311,9 +313,17 @@ frame(State0=#state{remote_window=ConnWindow, streams=Streams}, stream_reset(State, StreamID, {stream_error, stream_closed, 'DATA frame received for a half-closed (remote) stream. (RFC7540 5.1)'}); false -> - %% @todo What about RST_STREAM? Sigh. - terminate(State, {connection_error, stream_closed, - 'DATA frame received for a closed stream. (RFC7540 5.1)'}) + %% After we send an RST_STREAM frame and terminate a stream, + %% the client still might be sending us some more frames + %% until it can process this RST_STREAM. We therefore ignore + %% DATA frames received for such lingering streams. + case lists:member(StreamID, Lingering) of + true -> + State0; + false -> + terminate(State, {connection_error, stream_closed, + 'DATA frame received for a closed stream. (RFC7540 5.1)'}) + end end; %% HEADERS frame with invalid even-numbered streamid. frame(State, {headers, StreamID, _, _, _}) when StreamID rem 2 =:= 0 -> @@ -574,9 +584,6 @@ status(Status) when is_integer(Status) -> status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0, U =< $9 -> << H, T, U >>. - - - %% @todo Should we ever want to implement the PRIORITY mechanism, %% this would be the place to do it. Right now, we just go over %% all streams and send what we can until either everything is @@ -584,7 +591,6 @@ status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0, send_data(State=#state{streams=Streams}) -> resume_streams(State, Streams, []). -%% @todo When streams terminate we need to remove the stream. resume_streams(State, [], Acc) -> State#state{streams=lists:reverse(Acc)}; %% While technically we should never get < 0 here, let's be on the safe side. @@ -593,8 +599,18 @@ resume_streams(State=#state{local_window=ConnWindow}, Streams, Acc) State#state{streams=lists:reverse(Acc, Streams)}; %% We rely on send_data/2 to do all the necessary checks about the stream. resume_streams(State0, [Stream0|Tail], Acc) -> - {State, Stream} = send_data(State0, Stream0), - resume_streams(State, Tail, [Stream|Acc]). + {State1, Stream} = send_data(State0, Stream0), + case Stream of + %% We are done flushing, remove the stream. + %% Maybe skip the request body if it was not fully read. + #stream{state=flush, local=fin} -> + State = maybe_skip_body(State1, Stream, normal), + resume_streams(State, Tail, Acc); + %% Keep the stream. Either the stream handler is still running, + %% or we are not finished flushing. + _ -> + resume_streams(State1, Tail, [Stream|Acc]) + end. %% @todo We might want to print an error if local=fin. %% @@ -762,43 +778,54 @@ stream_handler_init(State=#state{opts=Opts, 'Unhandled exception in cowboy_stream:init/3.'}) end. -%% @todo We might need to keep track of which stream has been reset so we don't send lots of them. -stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID, - StreamError={internal_error, _, _}) -> - Transport:send(Socket, cow_http2:rst_stream(StreamID, internal_error)), - stream_terminate(State, StreamID, StreamError); -stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID, - StreamError={stream_error, Reason, _}) -> +%% @todo Don't send an RST_STREAM if one was already sent. +stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID, StreamError) -> + Reason = case StreamError of + {internal_error, _, _} -> internal_error; + {stream_error, Reason0, _} -> Reason0 + end, Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), - stream_terminate(State, StreamID, StreamError). + stream_terminate(stream_linger(State, StreamID), StreamID, StreamError). -stream_terminate(State=#state{socket=Socket, transport=Transport, +%% We only keep up to 100 streams in this state. @todo Make it configurable? +stream_linger(State=#state{lingering_streams=Lingering0}, StreamID) -> + Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)], + State#state{lingering_streams=Lingering}. + +stream_terminate(State0=#state{socket=Socket, transport=Transport, streams=Streams0, children=Children0}, StreamID, Reason) -> case lists:keytake(StreamID, #stream.id, Streams0) of %% When the stream terminates normally (without sending RST_STREAM) %% and no response was sent, we need to send a proper response back to the client. - {value, #stream{local=idle}, Streams} when Reason =:= normal -> - State1 = #state{streams=Streams1} = info(State, StreamID, {response, 204, #{}, <<>>}), + {value, Stream=#stream{local=idle}, Streams} when Reason =:= normal -> + State1 = #state{streams=Streams1} = info(State0, StreamID, {response, 204, #{}, <<>>}), + State = maybe_skip_body(State1, Stream, Reason), #stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams1), stream_call_terminate(StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), - State1#state{streams=Streams, children=Children}; + State#state{streams=Streams, children=Children}; %% When a response was sent but not terminated, we need to close the stream. - {value, #stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams} + {value, Stream=#stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams} when Reason =:= normal -> Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)), + State = maybe_skip_body(State0, Stream, Reason), stream_call_terminate(StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=Streams, children=Children}; %% Unless there is still data in the buffer. We can however reset %% a few fields and set a special local state to avoid confusion. + %% + %% We do not reset the stream in this case (to skip the body) + %% because we are still sending data via the buffer. We will + %% reset the stream if necessary once the buffer is empty. {value, Stream=#stream{state=StreamState, local=nofin}, Streams} -> stream_call_terminate(StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), - State#state{streams=[Stream#stream{state=flush, local=flush}|Streams], + State0#state{streams=[Stream#stream{state=flush, local=flush}|Streams], children=Children}; %% Otherwise we sent an RST_STREAM and/or the stream is already closed. - {value, #stream{state=StreamState}, Streams} -> + {value, Stream=#stream{state=StreamState}, Streams} -> + State = maybe_skip_body(State0, Stream, Reason), stream_call_terminate(StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=Streams, children=Children}; @@ -807,9 +834,19 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, %% the cowboy_stream:init call failed, in which case doing nothing %% is correct. false -> - State + State0 end. +%% When the stream stops normally without reading the request +%% body fully we need to tell the client to stop sending it. +%% We do this by sending an RST_STREAM with reason NO_ERROR. (RFC7540 8.1.0) +maybe_skip_body(State=#state{socket=Socket, transport=Transport}, + #stream{id=StreamID, remote=nofin}, normal) -> + Transport:send(Socket, cow_http2:rst_stream(StreamID, no_error)), + stream_linger(State, StreamID); +maybe_skip_body(State, _, _) -> + State. + stream_call_terminate(StreamID, Reason, StreamState) -> try cowboy_stream:terminate(StreamID, Reason, StreamState) |