diff options
author | Loïc Hoguin <[email protected]> | 2019-10-09 20:54:33 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-10-09 20:54:33 +0200 |
commit | cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633 (patch) | |
tree | c33b7e9398d2c4b4a1c1906e27383185673004df /src/cowboy_http.erl | |
parent | 0c4103984b28c9df1770a0eea0d14ba9cacc49e0 (diff) | |
download | cowboy-cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633.tar.gz cowboy-cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633.tar.bz2 cowboy-cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633.zip |
Implement flow control for HTTP/1.1
When we reach the request body, we now stop reading
from the socket unless asked to. The option
initial_stream_flow_size controls how much data
we read without being asked, as an optimization.
We may also have received additional data along
with the request headers.
This commit also reworks the timeout handling for HTTP/1.1
because the stray timeout message was easily reproducible
after implementing the flow control. The issue should be
gone for good this time.
Diffstat (limited to 'src/cowboy_http.erl')
-rw-r--r-- | src/cowboy_http.erl | 159 |
1 file changed, 83 insertions, 76 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index baedc1c..8ff8ae2 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -33,6 +33,7 @@ http10_keepalive => boolean(), idle_timeout => timeout(), inactivity_timeout => timeout(), + initial_stream_flow_size => non_neg_integer(), linger_timeout => timeout(), logger => module(), max_authority_length => non_neg_integer(), @@ -74,15 +75,9 @@ name = undefined :: binary() | undefined }). -%% @todo We need a state where we wait for the stream process to ask for the body -%% and do not attempt to read from the socket while in that state (we should read -%% up to a certain length, and then wait, basically implementing flow control but -%% by not reading from the socket when the window is empty). - -record(ps_body, { length :: non_neg_integer() | undefined, received = 0 :: non_neg_integer(), - %% @todo flow transfer_decode_fun :: fun(), %% @todo better type transfer_decode_state :: any() %% @todo better type }). @@ -136,6 +131,9 @@ %% Parsing state for the current stream or stream-to-be. in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{}, + %% Flow requested for the current stream. + flow = infinity :: non_neg_integer() | infinity, + %% Identifier for the stream currently being written. %% Note that out_streamid =< in_streamid. 
out_streamid = 1 :: pos_integer(), @@ -174,12 +172,12 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> end, case {Peer0, Sock0, Cert1} of {{ok, Peer}, {ok, Sock}, {ok, Cert}} -> - LastStreamID = maps:get(max_keepalive, Opts, 100), - before_loop(set_timeout(#state{ + State = #state{ parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, - last_streamid=LastStreamID})); + last_streamid=maps:get(max_keepalive, Opts, 100)}, + before_loop(set_timeout(State, request_timeout)); {{error, Reason}, _, _} -> terminate(undefined, {socket_error, Reason, 'A socket error occurred when retrieving the peer name.'}); @@ -191,15 +189,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> 'A socket error occurred when retrieving the client TLS certificate.'}) end. +%% Do not read from the socket unless flow is large enough. +before_loop(State=#state{flow=Flow}) when Flow =< 0 -> + loop(State); before_loop(State=#state{socket=Socket, transport=Transport}) -> - %% @todo disable this when we get to the body, until the stream asks for it? - %% Perhaps have a threshold for how much we're willing to read before waiting. Transport:setopts(Socket, [{active, once}]), loop(State). loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID, - last_streamid=LastStreamID, streams=Streams}) -> + last_streamid=LastStreamID}) -> Messages = Transport:messages(), InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000), receive @@ -209,12 +208,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, before_loop(State); %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - %% Only reset the timeout if it is idle_timeout (active streams). 
- State1 = case Streams of - [] -> State; - _ -> set_timeout(State) - end, - parse(<< Buffer/binary, Data/binary >>, State1); + parse(<< Buffer/binary, Data/binary >>, State); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> @@ -250,13 +244,23 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) end. -%% We set request_timeout when there are no active streams, -%% and idle_timeout otherwise. -set_timeout(State0=#state{opts=Opts, overriden_opts=Override, streams=Streams}) -> +%% We do not set request_timeout if there are active streams. +set_timeout(State=#state{streams=[_|_]}, request_timeout) -> + State; +%% We do not set request_timeout if we are skipping a body. +set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) -> + State; +%% We do not set idle_timeout if there are no active streams, +%% unless when we are skipping a body. +set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout) + when element(1, InState) =/= ps_body -> + State; +%% Otherwise we can set the timeout. +set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) -> State = cancel_timeout(State0), - {Name, Default} = case Streams of - [] -> {request_timeout, 5000}; - _ -> {idle_timeout, 60000} + Default = case Name of + request_timeout -> 5000; + idle_timeout -> 60000 end, Timeout = case Override of %% The timeout may have been overriden for the current stream. @@ -314,9 +318,6 @@ parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) State#state{in_state=PS#ps_header{headers=undefined, name=undefined}}, Headers, Name)); parse(Buffer, State=#state{in_state=#ps_body{}}) -> - %% @todo We do not want to get the body automatically if the request doesn't ask for it. 
- %% We may want to get bodies that are below a threshold without waiting, and buffer them - %% until the request asks, though. after_parse(parse_body(Buffer, State)). after_parse({request, Req=#{streamid := StreamID, method := Method, @@ -324,14 +325,15 @@ after_parse({request, Req=#{streamid := StreamID, method := Method, State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> + Flow = maps:get(initial_stream_flow_size, Opts, 65535), TE = maps:get(<<"te">>, Headers, undefined), Streams = [#stream{id=StreamID, state=StreamState, method=Method, version=Version, te=TE}|Streams0], State1 = case maybe_req_close(State0, Headers, Version) of - close -> State0#state{streams=Streams, last_streamid=StreamID}; - keepalive -> State0#state{streams=Streams} + close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow}; + keepalive -> State0#state{streams=Streams, flow=Flow} end, - State = set_timeout(State1), + State = set_timeout(State1, idle_timeout), parse(Buffer, commands(State, StreamID, Commands)) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(init, @@ -343,26 +345,40 @@ after_parse({request, Req=#{streamid := StreamID, method := Method, end; %% Streams are sequential so the body is always about the last stream created %% unless that stream has terminated. 
-after_parse({data, StreamID, IsFin, Data, State=#state{opts=Opts, buffer=Buffer, +after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer, streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}}) -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream#stream{state=StreamState}), - parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands)) + State1 = set_timeout(State0, case IsFin of + fin -> request_timeout; + nofin -> idle_timeout + end), + State = update_flow(IsFin, Data, State1#state{streams=Streams}), + parse(Buffer, commands(State, StreamID, Commands)) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(data, [StreamID, IsFin, Data, StreamState0], Class, Exception, erlang:get_stacktrace()), Opts), - stream_reset(State, StreamID, {internal_error, {Class, Exception}, + %% @todo Should call parse after this. + stream_terminate(State0, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:data/4.'}) end; %% No corresponding stream. We must skip the body of the previous request %% in order to process the next one. -after_parse({data, _, _, _, State}) -> - before_loop(State); +after_parse({data, _, IsFin, _, State}) -> + before_loop(set_timeout(State, case IsFin of + fin -> request_timeout; + nofin -> idle_timeout + end)); after_parse({more, State}) -> - before_loop(State). + before_loop(set_timeout(State, idle_timeout)). + +update_flow(fin, _, State) -> + State#state{flow=infinity}; +update_flow(nofin, Data, State=#state{flow=Flow0}) -> + State#state{flow=Flow0 - byte_size(Data)}. %% Request-line. @@ -838,29 +854,25 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= %% @todo Proper trailers. try TDecode(Buffer, TState0) of more -> - %% @todo Asks for 0 or more bytes. 
{more, State#state{buffer=Buffer}}; {more, Data, TState} -> - %% @todo Asks for 0 or more bytes. {data, StreamID, nofin, Data, State#state{buffer= <<>>, in_state=PS#ps_body{received=Received + byte_size(Data), transfer_decode_state=TState}}}; {more, Data, _Length, TState} when is_integer(_Length) -> - %% @todo Asks for Length more bytes. {data, StreamID, nofin, Data, State#state{buffer= <<>>, in_state=PS#ps_body{received=Received + byte_size(Data), transfer_decode_state=TState}}}; {more, Data, Rest, TState} -> - %% @todo Asks for 0 or more bytes. {data, StreamID, nofin, Data, State#state{buffer=Rest, in_state=PS#ps_body{received=Received + byte_size(Data), transfer_decode_state=TState}}}; {done, _HasTrailers, Rest} -> - {data, StreamID, fin, <<>>, set_timeout( - State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})}; + {data, StreamID, fin, <<>>, + State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}}; {done, Data, _HasTrailers, Rest} -> - {data, StreamID, fin, Data, set_timeout( - State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})} + {data, StreamID, fin, Data, + State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}} catch _:_ -> Reason = {connection_error, protocol_error, 'Failure to decode the content. (RFC7230 4)'}, @@ -896,7 +908,7 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) -> cowboy:log(cowboy_stream:make_error_log(info, [StreamID, Msg, StreamState0], Class, Exception, erlang:get_stacktrace()), Opts), - stream_reset(State, StreamID, {internal_error, {Class, Exception}, + stream_terminate(State, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:info/3.'}) end; false -> @@ -915,7 +927,7 @@ commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail StreamID, Tail); %% Error handling. 
commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) -> - commands(stream_reset(State, StreamID, Error), StreamID, Tail); + commands(stream_terminate(State, StreamID, Error), StreamID, Tail); %% Commands for a stream currently inactive. commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands) when Current =/= StreamID -> @@ -927,13 +939,22 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command Stream#stream{queue=Queue ++ Commands}), State#state{streams=Streams}; %% Read the request body. -commands(State, StreamID, [{flow, _Length}|Tail]) -> - %% @todo We only read from socket if buffer is empty, otherwise - %% we decode the buffer. - - %% @todo Set the body reading length to min(Length, BodyLength) - - commands(State, StreamID, Tail); +commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID, + [{flow, Size}|Tail]) -> + %% We must read *at least* Size of data otherwise functions + %% like cowboy_req:read_body/1,2 will wait indefinitely. + Flow = if + Flow0 < 0 -> Size; + true -> Flow0 + Size + end, + %% Reenable active mode if necessary. + _ = if + Flow0 =< 0, Flow > 0 -> + Transport:setopts(Socket, [{active, once}]); + true -> + ok + end, + commands(State#state{flow=Flow}, StreamID, Tail); %% Error responses are sent only if a response wasn't sent already. commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID, [{error_response, Status, Headers0, Body}|Tail]) -> @@ -1100,7 +1121,6 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> - %% @todo This should be the last stream running otherwise we need to wait before switching. %% @todo If there's streams opened after this one, fail instead of 101. 
State = cancel_timeout(State0), %% Before we send the 101 response we need to stop receiving data @@ -1127,7 +1147,8 @@ commands(State0=#state{overriden_opts=Opts}, StreamID, [{set_options, SetOpts}|Tail]) -> State1 = case SetOpts of #{idle_timeout := IdleTimeout} -> - set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}}); + set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}}, + idle_timeout); _ -> State0 end, @@ -1209,16 +1230,6 @@ maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) -> maybe_terminate(State, StreamID, _Tail) -> stream_terminate(State, StreamID, normal). -stream_reset(State, StreamID, StreamError={internal_error, _, _}) -> - %% @todo headers - %% @todo Don't send this if there are no streams left. -% Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [ -% {<<"content-length">>, <<"0">>} -% ])), - %% @todo update IsFin local -% stream_terminate(State#state{out_state=done}, StreamID, StreamError). - stream_terminate(State, StreamID, StreamError). - stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState, out_streamid=OutStreamID, out_state=OutState, streams=Streams0, children=Children0}, StreamID, Reason) -> @@ -1241,15 +1252,12 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta %% Remove the stream from the state and reset the overriden options. {value, #stream{state=StreamState}, Streams} = lists:keytake(StreamID, #stream.id, Streams1), - State2 = State1#state{streams=Streams, overriden_opts=#{}}, + State2 = State1#state{streams=Streams, overriden_opts=#{}, flow=infinity}, %% Stop the stream. stream_call_terminate(StreamID, Reason, StreamState, State2), Children = cowboy_children:shutdown(Children0, StreamID), %% We reset the timeout if there are no active streams anymore. 
- State = case Streams of - [] -> set_timeout(State2); - _ -> State2 - end, + State = set_timeout(State2#state{streams=Streams, children=Children}, request_timeout), %% We want to drop the connection if the body was not read fully %% and we don't know its length or more remains to be read than %% configuration allows. @@ -1258,10 +1266,10 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta case InState of #ps_body{length=undefined} when InStreamID =:= OutStreamID -> - terminate(State#state{streams=Streams, children=Children}, skip_body_unknown_length); + terminate(State, skip_body_unknown_length); #ps_body{length=Len, received=Received} when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len -> - terminate(State#state{streams=Streams, children=Children}, skip_body_too_large); + terminate(State, skip_body_too_large); _ -> %% Move on to the next stream. NextOutStreamID = OutStreamID + 1, @@ -1269,12 +1277,11 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta false -> %% @todo This is clearly wrong, if the stream is gone we need to check if %% there used to be such a stream, and if there was to send an error. - State#state{out_streamid=NextOutStreamID, out_state=wait, - streams=Streams, children=Children}; + State#state{out_streamid=NextOutStreamID, out_state=wait}; #stream{queue=Commands} -> %% @todo Remove queue from the stream. - commands(State#state{out_streamid=NextOutStreamID, out_state=wait, - streams=Streams, children=Children}, NextOutStreamID, Commands) + commands(State#state{out_streamid=NextOutStreamID, out_state=wait}, + NextOutStreamID, Commands) end end. |