From 752297b1539b4f74c9329c873f78485a52b5b8bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 16 Jan 2020 17:33:49 +0100 Subject: Fix bugs related to HTTP/1.1 pipelining The flow control is now only set to infinity when we are skipping the request body of the stream that is being terminated. This fixes a bug where it was set to infinity while reading a subsequent request's body, leading to a crash. The timeout is no longer reset on stream termination. Timeout handling is already done when receiving data from the socket and doing a reset on stream termination was leading to the wrong timeout being set or the right timeout being reset needlessly. --- src/cowboy_http.erl | 35 ++++++++++++++++++----------------- test/rfc7230_SUITE.erl | 5 +++-- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index ad2ef75..9f0b865 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -1266,6 +1266,7 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta children=Children0}, StreamID, Reason) -> #stream{version=Version, local_expected_size=ExpectedSize, local_sent_size=SentSize} = lists:keyfind(StreamID, #stream.id, Streams0), + %% Send a response or terminate chunks depending on the current output state. State1 = #state{streams=Streams1} = case OutState of wait when element(1, Reason) =:= internal_error -> info(State0, StreamID, {response, 500, #{<<"content-length">> => <<"0">>}, <<>>}); @@ -1280,19 +1281,15 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta _ -> %% done or Version =:= 'HTTP/1.0' State0 end, - %% Remove the stream from the state and reset the overriden options. + %% Stop the stream, shutdown children and reset overriden options. {value, #stream{state=StreamState}, Streams} = lists:keytake(StreamID, #stream.id, Streams1), - State2 = State1#state{streams=Streams, overriden_opts=#{}, flow=infinity}, - %% Stop the stream. - stream_call_terminate(StreamID, Reason, StreamState, State2), + stream_call_terminate(StreamID, Reason, StreamState, State1), Children = cowboy_children:shutdown(Children0, StreamID), - %% We reset the timeout if there are no active streams anymore. - State = set_timeout(State2#state{streams=Streams, children=Children}, request_timeout), + State = State1#state{overriden_opts=#{}, streams=Streams, children=Children}, %% We want to drop the connection if the body was not read fully %% and we don't know its length or more remains to be read than %% configuration allows. - %% @todo Only do this if Current =:= StreamID. MaxSkipBodyLength = maps:get(max_skip_body_length, Opts, 1000000), case InState of #ps_body{length=undefined} @@ -1301,17 +1298,21 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta #ps_body{length=Len, received=Received} when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len -> terminate(State, skip_body_too_large); + #ps_body{} when InStreamID =:= OutStreamID -> + stream_next(State#state{flow=infinity}); _ -> - %% Move on to the next stream. - NextOutStreamID = OutStreamID + 1, - case lists:keyfind(NextOutStreamID, #stream.id, Streams) of - false -> - State#state{out_streamid=NextOutStreamID, out_state=wait}; - #stream{queue=Commands} -> - %% @todo Remove queue from the stream. - commands(State#state{out_streamid=NextOutStreamID, out_state=wait}, - NextOutStreamID, Commands) - end + stream_next(State) + end. + +stream_next(State=#state{out_streamid=OutStreamID, streams=Streams}) -> + NextOutStreamID = OutStreamID + 1, + case lists:keyfind(NextOutStreamID, #stream.id, Streams) of + false -> + State#state{out_streamid=NextOutStreamID, out_state=wait}; + #stream{queue=Commands} -> + %% @todo Remove queue from the stream. + commands(State#state{out_streamid=NextOutStreamID, out_state=wait}, + NextOutStreamID, Commands) end. stream_call_terminate(StreamID, Reason, StreamState, #state{opts=Opts}) -> diff --git a/test/rfc7230_SUITE.erl b/test/rfc7230_SUITE.erl index 6752c30..5b699e8 100644 --- a/test/rfc7230_SUITE.erl +++ b/test/rfc7230_SUITE.erl @@ -40,6 +40,7 @@ init_routes(_) -> [ {"localhost", [ {"/", hello_h, []}, {"/echo/:key[/:arg]", echo_h, []}, + {"/full/:key[/:arg]", echo_h, []}, {"/length/echo/:key", echo_h, []}, {"/resp/:key[/:arg]", resp_h, []}, {"/send_message", send_message_h, []}, @@ -1553,13 +1554,13 @@ pipeline(Config) -> ConnPid = gun_open(Config), Refs = [{ gun:get(ConnPid, "/"), - gun:delete(ConnPid, "/echo/method") + gun:post(ConnPid, "/full/read_body", [], <<0:800000>>) } || _ <- lists:seq(1, 25)], _ = [begin {response, nofin, 200, _} = gun:await(ConnPid, Ref1), {ok, <<"Hello world!">>} = gun:await_body(ConnPid, Ref1), {response, nofin, 200, _} = gun:await(ConnPid, Ref2), - {ok, <<"DELETE">>} = gun:await_body(ConnPid, Ref2) + {ok, <<0:800000>>} = gun:await_body(ConnPid, Ref2) end || {Ref1, Ref2} <- Refs], ok. -- cgit v1.2.3