aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Loïc Hoguin <[email protected]> 2019-10-09 20:54:33 +0200
committer: Loïc Hoguin <[email protected]> 2019-10-09 20:54:33 +0200
commit: cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633 (patch)
tree: c33b7e9398d2c4b4a1c1906e27383185673004df
parent: 0c4103984b28c9df1770a0eea0d14ba9cacc49e0 (diff)
download: cowboy-cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633.tar.gz
cowboy-cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633.tar.bz2
cowboy-cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633.zip
Implement flow control for HTTP/1.1
Once we reach the request body, we now stop reading from the socket unless asked to. The option initial_stream_flow_size controls how much data we read without being asked, as an optimization. We may also have received additional data along with the request headers. This commit also reworks the timeout handling for HTTP/1.1 because the stray timeout message was easily reproducible after implementing the flow control. The issue should be gone for good this time.
-rw-r--r-- doc/src/guide/migrating_from_2.6.asciidoc 11
-rw-r--r-- doc/src/manual/cowboy_http.asciidoc 47
-rw-r--r-- src/cowboy_http.erl 159
3 files changed, 120 insertions, 97 deletions
diff --git a/doc/src/guide/migrating_from_2.6.asciidoc b/doc/src/guide/migrating_from_2.6.asciidoc
index a735ef9..a582ee4 100644
--- a/doc/src/guide/migrating_from_2.6.asciidoc
+++ b/doc/src/guide/migrating_from_2.6.asciidoc
@@ -63,6 +63,13 @@ Cowboy 2.7 requires Erlang/OTP 20.0 or greater.
willing to accept. By default it will accept 10
stream resets every 10 seconds.
+* Flow control for incoming data has been implemented
+ for HTTP/1.1. Cowboy will now wait for the user code
+ to ask for the request body before reading it from
+ the socket. The option `initial_stream_flow_size`
+ controls how much data Cowboy will read without
+ being asked.
+
* The HTTP/1.1 and HTTP/2 option `logger` is now
documented.
@@ -154,7 +161,9 @@ Cowboy 2.7 requires Erlang/OTP 20.0 or greater.
was waiting for more data.
* It was possible for Cowboy to receive stray timeout messages
- for HTTP/1.1 connections. This has been addressed.
+ for HTTP/1.1 connections, resulting in crashes. The timeout
+ handling in HTTP/1.1 has been reworked and the issue should
+ no longer occur.
* The type for the Req object has been updated to accept
custom fields as was already documented.
diff --git a/doc/src/manual/cowboy_http.asciidoc b/doc/src/manual/cowboy_http.asciidoc
index e9837c3..8d89ea2 100644
--- a/doc/src/manual/cowboy_http.asciidoc
+++ b/doc/src/manual/cowboy_http.asciidoc
@@ -17,25 +17,26 @@ as a Ranch protocol.
[source,erlang]
----
opts() :: #{
- chunked => boolean(),
- connection_type => worker | supervisor,
- http10_keepalive => boolean(),
- idle_timeout => timeout(),
- inactivity_timeout => timeout(),
- linger_timeout => timeout(),
- logger => module(),
- max_empty_lines => non_neg_integer(),
- max_header_name_length => non_neg_integer(),
- max_header_value_length => non_neg_integer(),
- max_headers => non_neg_integer(),
- max_keepalive => non_neg_integer(),
- max_method_length => non_neg_integer(),
- max_request_line_length => non_neg_integer(),
- max_skip_body_length => non_neg_integer(),
- proxy_header => boolean(),
- request_timeout => timeout(),
- sendfile => boolean(),
- stream_handlers => [module()]
+ chunked => boolean(),
+ connection_type => worker | supervisor,
+ http10_keepalive => boolean(),
+ idle_timeout => timeout(),
+ inactivity_timeout => timeout(),
+ initial_stream_flow_size => non_neg_integer(),
+ linger_timeout => timeout(),
+ logger => module(),
+ max_empty_lines => non_neg_integer(),
+ max_header_name_length => non_neg_integer(),
+ max_header_value_length => non_neg_integer(),
+ max_headers => non_neg_integer(),
+ max_keepalive => non_neg_integer(),
+ max_method_length => non_neg_integer(),
+ max_request_line_length => non_neg_integer(),
+ max_skip_body_length => non_neg_integer(),
+ proxy_header => boolean(),
+ request_timeout => timeout(),
+ sendfile => boolean(),
+ stream_handlers => [module()]
}
----
@@ -79,6 +80,12 @@ inactivity_timeout (300000)::
Time in ms with nothing received at all before Cowboy closes the connection.
+initial_stream_flow_size (65535)::
+
+Amount of data in bytes Cowboy will read from the socket
+right after a request was fully received. This is a soft
+limit.
+
linger_timeout (1000)::
Time in ms that Cowboy will wait when closing the connection. This is
@@ -144,7 +151,7 @@ Ordered list of stream handlers that will handle all stream events.
== Changelog
-* *2.7*: The `logger` option was added.
+* *2.7*: The `initial_stream_flow_size` and `logger` options were added.
* *2.6*: The `chunked`, `http10_keepalive`, `proxy_header` and `sendfile` options were added.
* *2.5*: The `linger_timeout` option was added.
* *2.2*: The `max_skip_body_length` option was added.
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index baedc1c..8ff8ae2 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -33,6 +33,7 @@
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
+ initial_stream_flow_size => non_neg_integer(),
linger_timeout => timeout(),
logger => module(),
max_authority_length => non_neg_integer(),
@@ -74,15 +75,9 @@
name = undefined :: binary() | undefined
}).
-%% @todo We need a state where we wait for the stream process to ask for the body
-%% and do not attempt to read from the socket while in that state (we should read
-%% up to a certain length, and then wait, basically implementing flow control but
-%% by not reading from the socket when the window is empty).
-
-record(ps_body, {
length :: non_neg_integer() | undefined,
received = 0 :: non_neg_integer(),
- %% @todo flow
transfer_decode_fun :: fun(), %% @todo better type
transfer_decode_state :: any() %% @todo better type
}).
@@ -136,6 +131,9 @@
%% Parsing state for the current stream or stream-to-be.
in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
+ %% Flow requested for the current stream.
+ flow = infinity :: non_neg_integer() | infinity,
+
%% Identifier for the stream currently being written.
%% Note that out_streamid =< in_streamid.
out_streamid = 1 :: pos_integer(),
@@ -174,12 +172,12 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
end,
case {Peer0, Sock0, Cert1} of
{{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
- LastStreamID = maps:get(max_keepalive, Opts, 100),
- before_loop(set_timeout(#state{
+ State = #state{
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
- last_streamid=LastStreamID}));
+ last_streamid=maps:get(max_keepalive, Opts, 100)},
+ before_loop(set_timeout(State, request_timeout));
{{error, Reason}, _, _} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the peer name.'});
@@ -191,15 +189,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
'A socket error occurred when retrieving the client TLS certificate.'})
end.
+%% Do not read from the socket unless flow is large enough.
+before_loop(State=#state{flow=Flow}) when Flow =< 0 ->
+ loop(State);
before_loop(State=#state{socket=Socket, transport=Transport}) ->
- %% @todo disable this when we get to the body, until the stream asks for it?
- %% Perhaps have a threshold for how much we're willing to read before waiting.
Transport:setopts(Socket, [{active, once}]),
loop(State).
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
- last_streamid=LastStreamID, streams=Streams}) ->
+ last_streamid=LastStreamID}) ->
Messages = Transport:messages(),
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
receive
@@ -209,12 +208,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
before_loop(State);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- %% Only reset the timeout if it is idle_timeout (active streams).
- State1 = case Streams of
- [] -> State;
- _ -> set_timeout(State)
- end,
- parse(<< Buffer/binary, Data/binary >>, State1);
+ parse(<< Buffer/binary, Data/binary >>, State);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -250,13 +244,23 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
-%% We set request_timeout when there are no active streams,
-%% and idle_timeout otherwise.
-set_timeout(State0=#state{opts=Opts, overriden_opts=Override, streams=Streams}) ->
+%% We do not set request_timeout if there are active streams.
+set_timeout(State=#state{streams=[_|_]}, request_timeout) ->
+ State;
+%% We do not set request_timeout if we are skipping a body.
+set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) ->
+ State;
+%% We do not set idle_timeout if there are no active streams,
+%% unless we are skipping a body.
+set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
+ when element(1, InState) =/= ps_body ->
+ State;
+%% Otherwise we can set the timeout.
+set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
State = cancel_timeout(State0),
- {Name, Default} = case Streams of
- [] -> {request_timeout, 5000};
- _ -> {idle_timeout, 60000}
+ Default = case Name of
+ request_timeout -> 5000;
+ idle_timeout -> 60000
end,
Timeout = case Override of
%% The timeout may have been overridden for the current stream.
@@ -314,9 +318,6 @@ parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}})
State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
Headers, Name));
parse(Buffer, State=#state{in_state=#ps_body{}}) ->
- %% @todo We do not want to get the body automatically if the request doesn't ask for it.
- %% We may want to get bodies that are below a threshold without waiting, and buffer them
- %% until the request asks, though.
after_parse(parse_body(Buffer, State)).
after_parse({request, Req=#{streamid := StreamID, method := Method,
@@ -324,14 +325,15 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
+ Flow = maps:get(initial_stream_flow_size, Opts, 65535),
TE = maps:get(<<"te">>, Headers, undefined),
Streams = [#stream{id=StreamID, state=StreamState,
method=Method, version=Version, te=TE}|Streams0],
State1 = case maybe_req_close(State0, Headers, Version) of
- close -> State0#state{streams=Streams, last_streamid=StreamID};
- keepalive -> State0#state{streams=Streams}
+ close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow};
+ keepalive -> State0#state{streams=Streams, flow=Flow}
end,
- State = set_timeout(State1),
+ State = set_timeout(State1, idle_timeout),
parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(init,
@@ -343,26 +345,40 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
end;
%% Streams are sequential so the body is always about the last stream created
%% unless that stream has terminated.
-after_parse({data, StreamID, IsFin, Data, State=#state{opts=Opts, buffer=Buffer,
+after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer,
streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}}) ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
Stream#stream{state=StreamState}),
- parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
+ State1 = set_timeout(State0, case IsFin of
+ fin -> request_timeout;
+ nofin -> idle_timeout
+ end),
+ State = update_flow(IsFin, Data, State1#state{streams=Streams}),
+ parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(data,
[StreamID, IsFin, Data, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
- stream_reset(State, StreamID, {internal_error, {Class, Exception},
+ %% @todo Should call parse after this.
+ stream_terminate(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
end;
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
-after_parse({data, _, _, _, State}) ->
- before_loop(State);
+after_parse({data, _, IsFin, _, State}) ->
+ before_loop(set_timeout(State, case IsFin of
+ fin -> request_timeout;
+ nofin -> idle_timeout
+ end));
after_parse({more, State}) ->
- before_loop(State).
+ before_loop(set_timeout(State, idle_timeout)).
+
+update_flow(fin, _, State) ->
+ State#state{flow=infinity};
+update_flow(nofin, Data, State=#state{flow=Flow0}) ->
+ State#state{flow=Flow0 - byte_size(Data)}.
%% Request-line.
@@ -838,29 +854,25 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
%% @todo Proper trailers.
try TDecode(Buffer, TState0) of
more ->
- %% @todo Asks for 0 or more bytes.
{more, State#state{buffer=Buffer}};
{more, Data, TState} ->
- %% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{buffer= <<>>,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{more, Data, _Length, TState} when is_integer(_Length) ->
- %% @todo Asks for Length more bytes.
{data, StreamID, nofin, Data, State#state{buffer= <<>>,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{more, Data, Rest, TState} ->
- %% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{buffer=Rest,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{done, _HasTrailers, Rest} ->
- {data, StreamID, fin, <<>>, set_timeout(
- State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})};
+ {data, StreamID, fin, <<>>,
+ State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}};
{done, Data, _HasTrailers, Rest} ->
- {data, StreamID, fin, Data, set_timeout(
- State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})}
+ {data, StreamID, fin, Data,
+ State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}}
catch _:_ ->
Reason = {connection_error, protocol_error,
'Failure to decode the content. (RFC7230 4)'},
@@ -896,7 +908,7 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
cowboy:log(cowboy_stream:make_error_log(info,
[StreamID, Msg, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
- stream_reset(State, StreamID, {internal_error, {Class, Exception},
+ stream_terminate(State, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:info/3.'})
end;
false ->
@@ -915,7 +927,7 @@ commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail
StreamID, Tail);
%% Error handling.
commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
- commands(stream_reset(State, StreamID, Error), StreamID, Tail);
+ commands(stream_terminate(State, StreamID, Error), StreamID, Tail);
%% Commands for a stream currently inactive.
commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
when Current =/= StreamID ->
@@ -927,13 +939,22 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command
Stream#stream{queue=Queue ++ Commands}),
State#state{streams=Streams};
%% Read the request body.
-commands(State, StreamID, [{flow, _Length}|Tail]) ->
- %% @todo We only read from socket if buffer is empty, otherwise
- %% we decode the buffer.
-
- %% @todo Set the body reading length to min(Length, BodyLength)
-
- commands(State, StreamID, Tail);
+commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
+ [{flow, Size}|Tail]) ->
+ %% We must read *at least* Size bytes of data, otherwise functions
+ %% like cowboy_req:read_body/1,2 will wait indefinitely.
+ Flow = if
+ Flow0 < 0 -> Size;
+ true -> Flow0 + Size
+ end,
+ %% Reenable active mode if necessary.
+ _ = if
+ Flow0 =< 0, Flow > 0 ->
+ Transport:setopts(Socket, [{active, once}]);
+ true ->
+ ok
+ end,
+ commands(State#state{flow=Flow}, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
[{error_response, Status, Headers0, Body}|Tail]) ->
@@ -1100,7 +1121,6 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
- %% @todo This should be the last stream running otherwise we need to wait before switching.
%% @todo If there's streams opened after this one, fail instead of 101.
State = cancel_timeout(State0),
%% Before we send the 101 response we need to stop receiving data
@@ -1127,7 +1147,8 @@ commands(State0=#state{overriden_opts=Opts},
StreamID, [{set_options, SetOpts}|Tail]) ->
State1 = case SetOpts of
#{idle_timeout := IdleTimeout} ->
- set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}});
+ set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
+ idle_timeout);
_ ->
State0
end,
@@ -1209,16 +1230,6 @@ maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
maybe_terminate(State, StreamID, _Tail) ->
stream_terminate(State, StreamID, normal).
-stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
- %% @todo headers
- %% @todo Don't send this if there are no streams left.
-% Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
-% {<<"content-length">>, <<"0">>}
-% ])),
- %% @todo update IsFin local
-% stream_terminate(State#state{out_state=done}, StreamID, StreamError).
- stream_terminate(State, StreamID, StreamError).
-
stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
children=Children0}, StreamID, Reason) ->
@@ -1241,15 +1252,12 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
%% Remove the stream from the state and reset the overridden options.
{value, #stream{state=StreamState}, Streams}
= lists:keytake(StreamID, #stream.id, Streams1),
- State2 = State1#state{streams=Streams, overriden_opts=#{}},
+ State2 = State1#state{streams=Streams, overriden_opts=#{}, flow=infinity},
%% Stop the stream.
stream_call_terminate(StreamID, Reason, StreamState, State2),
Children = cowboy_children:shutdown(Children0, StreamID),
%% We reset the timeout if there are no active streams anymore.
- State = case Streams of
- [] -> set_timeout(State2);
- _ -> State2
- end,
+ State = set_timeout(State2#state{streams=Streams, children=Children}, request_timeout),
%% We want to drop the connection if the body was not read fully
%% and we don't know its length or more remains to be read than
%% configuration allows.
@@ -1258,10 +1266,10 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
case InState of
#ps_body{length=undefined}
when InStreamID =:= OutStreamID ->
- terminate(State#state{streams=Streams, children=Children}, skip_body_unknown_length);
+ terminate(State, skip_body_unknown_length);
#ps_body{length=Len, received=Received}
when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
- terminate(State#state{streams=Streams, children=Children}, skip_body_too_large);
+ terminate(State, skip_body_too_large);
_ ->
%% Move on to the next stream.
NextOutStreamID = OutStreamID + 1,
@@ -1269,12 +1277,11 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
false ->
%% @todo This is clearly wrong, if the stream is gone we need to check if
%% there used to be such a stream, and if there was to send an error.
- State#state{out_streamid=NextOutStreamID, out_state=wait,
- streams=Streams, children=Children};
+ State#state{out_streamid=NextOutStreamID, out_state=wait};
#stream{queue=Commands} ->
%% @todo Remove queue from the stream.
- commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
- streams=Streams, children=Children}, NextOutStreamID, Commands)
+ commands(State#state{out_streamid=NextOutStreamID, out_state=wait},
+ NextOutStreamID, Commands)
end
end.