aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/src/guide/migrating_from_2.6.asciidoc11
-rw-r--r--doc/src/manual/cowboy_http.asciidoc47
-rw-r--r--src/cowboy_http.erl159
3 files changed, 120 insertions, 97 deletions
diff --git a/doc/src/guide/migrating_from_2.6.asciidoc b/doc/src/guide/migrating_from_2.6.asciidoc
index a735ef9..a582ee4 100644
--- a/doc/src/guide/migrating_from_2.6.asciidoc
+++ b/doc/src/guide/migrating_from_2.6.asciidoc
@@ -63,6 +63,13 @@ Cowboy 2.7 requires Erlang/OTP 20.0 or greater.
willing to accept. By default it will accept 10
stream resets every 10 seconds.
+* Flow control for incoming data has been implemented
+ for HTTP/1.1. Cowboy will now wait for the user code
+ to ask for the request body before reading it from
+ the socket. The option `initial_stream_flow_size`
+ controls how much data Cowboy will read without
+ being asked.
+
* The HTTP/1.1 and HTTP/2 option `logger` is now
documented.
@@ -154,7 +161,9 @@ Cowboy 2.7 requires Erlang/OTP 20.0 or greater.
was waiting for more data.
* It was possible for Cowboy to receive stray timeout messages
- for HTTP/1.1 connections. This has been addressed.
+ for HTTP/1.1 connections, resulting in crashes. The timeout
+ handling in HTTP/1.1 has been reworked and the issue should
+ no longer occur.
* The type for the Req object has been updated to accept
custom fields as was already documented.
diff --git a/doc/src/manual/cowboy_http.asciidoc b/doc/src/manual/cowboy_http.asciidoc
index e9837c3..8d89ea2 100644
--- a/doc/src/manual/cowboy_http.asciidoc
+++ b/doc/src/manual/cowboy_http.asciidoc
@@ -17,25 +17,26 @@ as a Ranch protocol.
[source,erlang]
----
opts() :: #{
- chunked => boolean(),
- connection_type => worker | supervisor,
- http10_keepalive => boolean(),
- idle_timeout => timeout(),
- inactivity_timeout => timeout(),
- linger_timeout => timeout(),
- logger => module(),
- max_empty_lines => non_neg_integer(),
- max_header_name_length => non_neg_integer(),
- max_header_value_length => non_neg_integer(),
- max_headers => non_neg_integer(),
- max_keepalive => non_neg_integer(),
- max_method_length => non_neg_integer(),
- max_request_line_length => non_neg_integer(),
- max_skip_body_length => non_neg_integer(),
- proxy_header => boolean(),
- request_timeout => timeout(),
- sendfile => boolean(),
- stream_handlers => [module()]
+ chunked => boolean(),
+ connection_type => worker | supervisor,
+ http10_keepalive => boolean(),
+ idle_timeout => timeout(),
+ inactivity_timeout => timeout(),
+ initial_stream_flow_size => non_neg_integer(),
+ linger_timeout => timeout(),
+ logger => module(),
+ max_empty_lines => non_neg_integer(),
+ max_header_name_length => non_neg_integer(),
+ max_header_value_length => non_neg_integer(),
+ max_headers => non_neg_integer(),
+ max_keepalive => non_neg_integer(),
+ max_method_length => non_neg_integer(),
+ max_request_line_length => non_neg_integer(),
+ max_skip_body_length => non_neg_integer(),
+ proxy_header => boolean(),
+ request_timeout => timeout(),
+ sendfile => boolean(),
+ stream_handlers => [module()]
}
----
@@ -79,6 +80,12 @@ inactivity_timeout (300000)::
Time in ms with nothing received at all before Cowboy closes the connection.
+initial_stream_flow_size (65535)::
+
+Amount of data in bytes Cowboy will read from the socket
+right after a request was fully received. This is a soft
+limit.
+
linger_timeout (1000)::
Time in ms that Cowboy will wait when closing the connection. This is
@@ -144,7 +151,7 @@ Ordered list of stream handlers that will handle all stream events.
== Changelog
-* *2.7*: The `logger` option was added.
+* *2.7*: The `initial_stream_flow_size` and `logger` options were added.
* *2.6*: The `chunked`, `http10_keepalive`, `proxy_header` and `sendfile` options were added.
* *2.5*: The `linger_timeout` option was added.
* *2.2*: The `max_skip_body_length` option was added.
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index baedc1c..8ff8ae2 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -33,6 +33,7 @@
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
+ initial_stream_flow_size => non_neg_integer(),
linger_timeout => timeout(),
logger => module(),
max_authority_length => non_neg_integer(),
@@ -74,15 +75,9 @@
name = undefined :: binary() | undefined
}).
-%% @todo We need a state where we wait for the stream process to ask for the body
-%% and do not attempt to read from the socket while in that state (we should read
-%% up to a certain length, and then wait, basically implementing flow control but
-%% by not reading from the socket when the window is empty).
-
-record(ps_body, {
length :: non_neg_integer() | undefined,
received = 0 :: non_neg_integer(),
- %% @todo flow
transfer_decode_fun :: fun(), %% @todo better type
transfer_decode_state :: any() %% @todo better type
}).
@@ -136,6 +131,9 @@
%% Parsing state for the current stream or stream-to-be.
in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
+ %% Flow requested for the current stream.
+ flow = infinity :: non_neg_integer() | infinity,
+
%% Identifier for the stream currently being written.
%% Note that out_streamid =< in_streamid.
out_streamid = 1 :: pos_integer(),
@@ -174,12 +172,12 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
end,
case {Peer0, Sock0, Cert1} of
{{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
- LastStreamID = maps:get(max_keepalive, Opts, 100),
- before_loop(set_timeout(#state{
+ State = #state{
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
- last_streamid=LastStreamID}));
+ last_streamid=maps:get(max_keepalive, Opts, 100)},
+ before_loop(set_timeout(State, request_timeout));
{{error, Reason}, _, _} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the peer name.'});
@@ -191,15 +189,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
'A socket error occurred when retrieving the client TLS certificate.'})
end.
+%% Do not read from the socket unless flow is large enough.
+before_loop(State=#state{flow=Flow}) when Flow =< 0 ->
+ loop(State);
before_loop(State=#state{socket=Socket, transport=Transport}) ->
- %% @todo disable this when we get to the body, until the stream asks for it?
- %% Perhaps have a threshold for how much we're willing to read before waiting.
Transport:setopts(Socket, [{active, once}]),
loop(State).
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
- last_streamid=LastStreamID, streams=Streams}) ->
+ last_streamid=LastStreamID}) ->
Messages = Transport:messages(),
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
receive
@@ -209,12 +208,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
before_loop(State);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- %% Only reset the timeout if it is idle_timeout (active streams).
- State1 = case Streams of
- [] -> State;
- _ -> set_timeout(State)
- end,
- parse(<< Buffer/binary, Data/binary >>, State1);
+ parse(<< Buffer/binary, Data/binary >>, State);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -250,13 +244,23 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
-%% We set request_timeout when there are no active streams,
-%% and idle_timeout otherwise.
-set_timeout(State0=#state{opts=Opts, overriden_opts=Override, streams=Streams}) ->
+%% We do not set request_timeout if there are active streams.
+set_timeout(State=#state{streams=[_|_]}, request_timeout) ->
+ State;
+%% We do not set request_timeout if we are skipping a body.
+set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) ->
+ State;
+%% We do not set idle_timeout if there are no active streams,
+%% unless when we are skipping a body.
+set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
+ when element(1, InState) =/= ps_body ->
+ State;
+%% Otherwise we can set the timeout.
+set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
State = cancel_timeout(State0),
- {Name, Default} = case Streams of
- [] -> {request_timeout, 5000};
- _ -> {idle_timeout, 60000}
+ Default = case Name of
+ request_timeout -> 5000;
+ idle_timeout -> 60000
end,
Timeout = case Override of
%% The timeout may have been overriden for the current stream.
@@ -314,9 +318,6 @@ parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}})
State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
Headers, Name));
parse(Buffer, State=#state{in_state=#ps_body{}}) ->
- %% @todo We do not want to get the body automatically if the request doesn't ask for it.
- %% We may want to get bodies that are below a threshold without waiting, and buffer them
- %% until the request asks, though.
after_parse(parse_body(Buffer, State)).
after_parse({request, Req=#{streamid := StreamID, method := Method,
@@ -324,14 +325,15 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
+ Flow = maps:get(initial_stream_flow_size, Opts, 65535),
TE = maps:get(<<"te">>, Headers, undefined),
Streams = [#stream{id=StreamID, state=StreamState,
method=Method, version=Version, te=TE}|Streams0],
State1 = case maybe_req_close(State0, Headers, Version) of
- close -> State0#state{streams=Streams, last_streamid=StreamID};
- keepalive -> State0#state{streams=Streams}
+ close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow};
+ keepalive -> State0#state{streams=Streams, flow=Flow}
end,
- State = set_timeout(State1),
+ State = set_timeout(State1, idle_timeout),
parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(init,
@@ -343,26 +345,40 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
end;
%% Streams are sequential so the body is always about the last stream created
%% unless that stream has terminated.
-after_parse({data, StreamID, IsFin, Data, State=#state{opts=Opts, buffer=Buffer,
+after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer,
streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}}) ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
Stream#stream{state=StreamState}),
- parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
+ State1 = set_timeout(State0, case IsFin of
+ fin -> request_timeout;
+ nofin -> idle_timeout
+ end),
+ State = update_flow(IsFin, Data, State1#state{streams=Streams}),
+ parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(data,
[StreamID, IsFin, Data, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
- stream_reset(State, StreamID, {internal_error, {Class, Exception},
+ %% @todo Should call parse after this.
+ stream_terminate(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
end;
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
-after_parse({data, _, _, _, State}) ->
- before_loop(State);
+after_parse({data, _, IsFin, _, State}) ->
+ before_loop(set_timeout(State, case IsFin of
+ fin -> request_timeout;
+ nofin -> idle_timeout
+ end));
after_parse({more, State}) ->
- before_loop(State).
+ before_loop(set_timeout(State, idle_timeout)).
+
+update_flow(fin, _, State) ->
+ State#state{flow=infinity};
+update_flow(nofin, Data, State=#state{flow=Flow0}) ->
+ State#state{flow=Flow0 - byte_size(Data)}.
%% Request-line.
@@ -838,29 +854,25 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
%% @todo Proper trailers.
try TDecode(Buffer, TState0) of
more ->
- %% @todo Asks for 0 or more bytes.
{more, State#state{buffer=Buffer}};
{more, Data, TState} ->
- %% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{buffer= <<>>,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{more, Data, _Length, TState} when is_integer(_Length) ->
- %% @todo Asks for Length more bytes.
{data, StreamID, nofin, Data, State#state{buffer= <<>>,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{more, Data, Rest, TState} ->
- %% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{buffer=Rest,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{done, _HasTrailers, Rest} ->
- {data, StreamID, fin, <<>>, set_timeout(
- State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})};
+ {data, StreamID, fin, <<>>,
+ State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}};
{done, Data, _HasTrailers, Rest} ->
- {data, StreamID, fin, Data, set_timeout(
- State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})}
+ {data, StreamID, fin, Data,
+ State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}}
catch _:_ ->
Reason = {connection_error, protocol_error,
'Failure to decode the content. (RFC7230 4)'},
@@ -896,7 +908,7 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
cowboy:log(cowboy_stream:make_error_log(info,
[StreamID, Msg, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
- stream_reset(State, StreamID, {internal_error, {Class, Exception},
+ stream_terminate(State, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:info/3.'})
end;
false ->
@@ -915,7 +927,7 @@ commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail
StreamID, Tail);
%% Error handling.
commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
- commands(stream_reset(State, StreamID, Error), StreamID, Tail);
+ commands(stream_terminate(State, StreamID, Error), StreamID, Tail);
%% Commands for a stream currently inactive.
commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
when Current =/= StreamID ->
@@ -927,13 +939,22 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command
Stream#stream{queue=Queue ++ Commands}),
State#state{streams=Streams};
%% Read the request body.
-commands(State, StreamID, [{flow, _Length}|Tail]) ->
- %% @todo We only read from socket if buffer is empty, otherwise
- %% we decode the buffer.
-
- %% @todo Set the body reading length to min(Length, BodyLength)
-
- commands(State, StreamID, Tail);
+commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
+ [{flow, Size}|Tail]) ->
+ %% We must read *at least* Size of data otherwise functions
+ %% like cowboy_req:read_body/1,2 will wait indefinitely.
+ Flow = if
+ Flow0 < 0 -> Size;
+ true -> Flow0 + Size
+ end,
+ %% Reenable active mode if necessary.
+ _ = if
+ Flow0 =< 0, Flow > 0 ->
+ Transport:setopts(Socket, [{active, once}]);
+ true ->
+ ok
+ end,
+ commands(State#state{flow=Flow}, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
[{error_response, Status, Headers0, Body}|Tail]) ->
@@ -1100,7 +1121,6 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
- %% @todo This should be the last stream running otherwise we need to wait before switching.
%% @todo If there's streams opened after this one, fail instead of 101.
State = cancel_timeout(State0),
%% Before we send the 101 response we need to stop receiving data
@@ -1127,7 +1147,8 @@ commands(State0=#state{overriden_opts=Opts},
StreamID, [{set_options, SetOpts}|Tail]) ->
State1 = case SetOpts of
#{idle_timeout := IdleTimeout} ->
- set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}});
+ set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
+ idle_timeout);
_ ->
State0
end,
@@ -1209,16 +1230,6 @@ maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
maybe_terminate(State, StreamID, _Tail) ->
stream_terminate(State, StreamID, normal).
-stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
- %% @todo headers
- %% @todo Don't send this if there are no streams left.
-% Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
-% {<<"content-length">>, <<"0">>}
-% ])),
- %% @todo update IsFin local
-% stream_terminate(State#state{out_state=done}, StreamID, StreamError).
- stream_terminate(State, StreamID, StreamError).
-
stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
children=Children0}, StreamID, Reason) ->
@@ -1241,15 +1252,12 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
%% Remove the stream from the state and reset the overriden options.
{value, #stream{state=StreamState}, Streams}
= lists:keytake(StreamID, #stream.id, Streams1),
- State2 = State1#state{streams=Streams, overriden_opts=#{}},
+ State2 = State1#state{streams=Streams, overriden_opts=#{}, flow=infinity},
%% Stop the stream.
stream_call_terminate(StreamID, Reason, StreamState, State2),
Children = cowboy_children:shutdown(Children0, StreamID),
%% We reset the timeout if there are no active streams anymore.
- State = case Streams of
- [] -> set_timeout(State2);
- _ -> State2
- end,
+ State = set_timeout(State2#state{streams=Streams, children=Children}, request_timeout),
%% We want to drop the connection if the body was not read fully
%% and we don't know its length or more remains to be read than
%% configuration allows.
@@ -1258,10 +1266,10 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
case InState of
#ps_body{length=undefined}
when InStreamID =:= OutStreamID ->
- terminate(State#state{streams=Streams, children=Children}, skip_body_unknown_length);
+ terminate(State, skip_body_unknown_length);
#ps_body{length=Len, received=Received}
when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
- terminate(State#state{streams=Streams, children=Children}, skip_body_too_large);
+ terminate(State, skip_body_too_large);
_ ->
%% Move on to the next stream.
NextOutStreamID = OutStreamID + 1,
@@ -1269,12 +1277,11 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
false ->
%% @todo This is clearly wrong, if the stream is gone we need to check if
%% there used to be such a stream, and if there was to send an error.
- State#state{out_streamid=NextOutStreamID, out_state=wait,
- streams=Streams, children=Children};
+ State#state{out_streamid=NextOutStreamID, out_state=wait};
#stream{queue=Commands} ->
%% @todo Remove queue from the stream.
- commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
- streams=Streams, children=Children}, NextOutStreamID, Commands)
+ commands(State#state{out_streamid=NextOutStreamID, out_state=wait},
+ NextOutStreamID, Commands)
end
end.