aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_http.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-10-09 20:54:33 +0200
committerLoïc Hoguin <[email protected]>2019-10-09 20:54:33 +0200
commitcc54c207e35f3ab7a2dfc105eef39fe7d3bf1633 (patch)
treec33b7e9398d2c4b4a1c1906e27383185673004df /src/cowboy_http.erl
parent0c4103984b28c9df1770a0eea0d14ba9cacc49e0 (diff)
downloadcowboy-cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633.tar.gz
cowboy-cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633.tar.bz2
cowboy-cc54c207e35f3ab7a2dfc105eef39fe7d3bf1633.zip
Implement flow control for HTTP/1.1
We now stop reading from the socket unless asked to, when we reach the request body. The option initial_stream_flow_size controls how much data we read without being asked, as an optimization. We may also have received additional data along with the request headers. This commit also reworks the timeout handling for HTTP/1.1 because the stray timeout message was easily reproducible after implementing the flow control. The issue should be gone for good this time.
Diffstat (limited to 'src/cowboy_http.erl')
-rw-r--r--src/cowboy_http.erl159
1 files changed, 83 insertions, 76 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index baedc1c..8ff8ae2 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -33,6 +33,7 @@
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
+ initial_stream_flow_size => non_neg_integer(),
linger_timeout => timeout(),
logger => module(),
max_authority_length => non_neg_integer(),
@@ -74,15 +75,9 @@
name = undefined :: binary() | undefined
}).
-%% @todo We need a state where we wait for the stream process to ask for the body
-%% and do not attempt to read from the socket while in that state (we should read
-%% up to a certain length, and then wait, basically implementing flow control but
-%% by not reading from the socket when the window is empty).
-
-record(ps_body, {
length :: non_neg_integer() | undefined,
received = 0 :: non_neg_integer(),
- %% @todo flow
transfer_decode_fun :: fun(), %% @todo better type
transfer_decode_state :: any() %% @todo better type
}).
@@ -136,6 +131,9 @@
%% Parsing state for the current stream or stream-to-be.
in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
+ %% Flow requested for the current stream.
+ flow = infinity :: non_neg_integer() | infinity,
+
%% Identifier for the stream currently being written.
%% Note that out_streamid =< in_streamid.
out_streamid = 1 :: pos_integer(),
@@ -174,12 +172,12 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
end,
case {Peer0, Sock0, Cert1} of
{{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
- LastStreamID = maps:get(max_keepalive, Opts, 100),
- before_loop(set_timeout(#state{
+ State = #state{
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
- last_streamid=LastStreamID}));
+ last_streamid=maps:get(max_keepalive, Opts, 100)},
+ before_loop(set_timeout(State, request_timeout));
{{error, Reason}, _, _} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the peer name.'});
@@ -191,15 +189,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
'A socket error occurred when retrieving the client TLS certificate.'})
end.
+%% Do not read from the socket unless flow is large enough.
+before_loop(State=#state{flow=Flow}) when Flow =< 0 ->
+ loop(State);
before_loop(State=#state{socket=Socket, transport=Transport}) ->
- %% @todo disable this when we get to the body, until the stream asks for it?
- %% Perhaps have a threshold for how much we're willing to read before waiting.
Transport:setopts(Socket, [{active, once}]),
loop(State).
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
- last_streamid=LastStreamID, streams=Streams}) ->
+ last_streamid=LastStreamID}) ->
Messages = Transport:messages(),
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
receive
@@ -209,12 +208,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
before_loop(State);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- %% Only reset the timeout if it is idle_timeout (active streams).
- State1 = case Streams of
- [] -> State;
- _ -> set_timeout(State)
- end,
- parse(<< Buffer/binary, Data/binary >>, State1);
+ parse(<< Buffer/binary, Data/binary >>, State);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -250,13 +244,23 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
-%% We set request_timeout when there are no active streams,
-%% and idle_timeout otherwise.
-set_timeout(State0=#state{opts=Opts, overriden_opts=Override, streams=Streams}) ->
+%% We do not set request_timeout if there are active streams.
+set_timeout(State=#state{streams=[_|_]}, request_timeout) ->
+ State;
+%% We do not set request_timeout if we are skipping a body.
+set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) ->
+ State;
+%% We do not set idle_timeout if there are no active streams,
+%% unless when we are skipping a body.
+set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
+ when element(1, InState) =/= ps_body ->
+ State;
+%% Otherwise we can set the timeout.
+set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
State = cancel_timeout(State0),
- {Name, Default} = case Streams of
- [] -> {request_timeout, 5000};
- _ -> {idle_timeout, 60000}
+ Default = case Name of
+ request_timeout -> 5000;
+ idle_timeout -> 60000
end,
Timeout = case Override of
%% The timeout may have been overriden for the current stream.
@@ -314,9 +318,6 @@ parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}})
State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
Headers, Name));
parse(Buffer, State=#state{in_state=#ps_body{}}) ->
- %% @todo We do not want to get the body automatically if the request doesn't ask for it.
- %% We may want to get bodies that are below a threshold without waiting, and buffer them
- %% until the request asks, though.
after_parse(parse_body(Buffer, State)).
after_parse({request, Req=#{streamid := StreamID, method := Method,
@@ -324,14 +325,15 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
+ Flow = maps:get(initial_stream_flow_size, Opts, 65535),
TE = maps:get(<<"te">>, Headers, undefined),
Streams = [#stream{id=StreamID, state=StreamState,
method=Method, version=Version, te=TE}|Streams0],
State1 = case maybe_req_close(State0, Headers, Version) of
- close -> State0#state{streams=Streams, last_streamid=StreamID};
- keepalive -> State0#state{streams=Streams}
+ close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow};
+ keepalive -> State0#state{streams=Streams, flow=Flow}
end,
- State = set_timeout(State1),
+ State = set_timeout(State1, idle_timeout),
parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(init,
@@ -343,26 +345,40 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
end;
%% Streams are sequential so the body is always about the last stream created
%% unless that stream has terminated.
-after_parse({data, StreamID, IsFin, Data, State=#state{opts=Opts, buffer=Buffer,
+after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer,
streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}}) ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
Stream#stream{state=StreamState}),
- parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
+ State1 = set_timeout(State0, case IsFin of
+ fin -> request_timeout;
+ nofin -> idle_timeout
+ end),
+ State = update_flow(IsFin, Data, State1#state{streams=Streams}),
+ parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception ->
cowboy:log(cowboy_stream:make_error_log(data,
[StreamID, IsFin, Data, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
- stream_reset(State, StreamID, {internal_error, {Class, Exception},
+ %% @todo Should call parse after this.
+ stream_terminate(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
end;
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
-after_parse({data, _, _, _, State}) ->
- before_loop(State);
+after_parse({data, _, IsFin, _, State}) ->
+ before_loop(set_timeout(State, case IsFin of
+ fin -> request_timeout;
+ nofin -> idle_timeout
+ end));
after_parse({more, State}) ->
- before_loop(State).
+ before_loop(set_timeout(State, idle_timeout)).
+
+update_flow(fin, _, State) ->
+ State#state{flow=infinity};
+update_flow(nofin, Data, State=#state{flow=Flow0}) ->
+ State#state{flow=Flow0 - byte_size(Data)}.
%% Request-line.
@@ -838,29 +854,25 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
%% @todo Proper trailers.
try TDecode(Buffer, TState0) of
more ->
- %% @todo Asks for 0 or more bytes.
{more, State#state{buffer=Buffer}};
{more, Data, TState} ->
- %% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{buffer= <<>>,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{more, Data, _Length, TState} when is_integer(_Length) ->
- %% @todo Asks for Length more bytes.
{data, StreamID, nofin, Data, State#state{buffer= <<>>,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{more, Data, Rest, TState} ->
- %% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{buffer=Rest,
in_state=PS#ps_body{received=Received + byte_size(Data),
transfer_decode_state=TState}}};
{done, _HasTrailers, Rest} ->
- {data, StreamID, fin, <<>>, set_timeout(
- State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})};
+ {data, StreamID, fin, <<>>,
+ State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}};
{done, Data, _HasTrailers, Rest} ->
- {data, StreamID, fin, Data, set_timeout(
- State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})}
+ {data, StreamID, fin, Data,
+ State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}}
catch _:_ ->
Reason = {connection_error, protocol_error,
'Failure to decode the content. (RFC7230 4)'},
@@ -896,7 +908,7 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
cowboy:log(cowboy_stream:make_error_log(info,
[StreamID, Msg, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
- stream_reset(State, StreamID, {internal_error, {Class, Exception},
+ stream_terminate(State, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:info/3.'})
end;
false ->
@@ -915,7 +927,7 @@ commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail
StreamID, Tail);
%% Error handling.
commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
- commands(stream_reset(State, StreamID, Error), StreamID, Tail);
+ commands(stream_terminate(State, StreamID, Error), StreamID, Tail);
%% Commands for a stream currently inactive.
commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
when Current =/= StreamID ->
@@ -927,13 +939,22 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command
Stream#stream{queue=Queue ++ Commands}),
State#state{streams=Streams};
%% Read the request body.
-commands(State, StreamID, [{flow, _Length}|Tail]) ->
- %% @todo We only read from socket if buffer is empty, otherwise
- %% we decode the buffer.
-
- %% @todo Set the body reading length to min(Length, BodyLength)
-
- commands(State, StreamID, Tail);
+commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
+ [{flow, Size}|Tail]) ->
+ %% We must read *at least* Size of data otherwise functions
+ %% like cowboy_req:read_body/1,2 will wait indefinitely.
+ Flow = if
+ Flow0 < 0 -> Size;
+ true -> Flow0 + Size
+ end,
+ %% Reenable active mode if necessary.
+ _ = if
+ Flow0 =< 0, Flow > 0 ->
+ Transport:setopts(Socket, [{active, once}]);
+ true ->
+ ok
+ end,
+ commands(State#state{flow=Flow}, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
[{error_response, Status, Headers0, Body}|Tail]) ->
@@ -1100,7 +1121,6 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
- %% @todo This should be the last stream running otherwise we need to wait before switching.
%% @todo If there's streams opened after this one, fail instead of 101.
State = cancel_timeout(State0),
%% Before we send the 101 response we need to stop receiving data
@@ -1127,7 +1147,8 @@ commands(State0=#state{overriden_opts=Opts},
StreamID, [{set_options, SetOpts}|Tail]) ->
State1 = case SetOpts of
#{idle_timeout := IdleTimeout} ->
- set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}});
+ set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
+ idle_timeout);
_ ->
State0
end,
@@ -1209,16 +1230,6 @@ maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
maybe_terminate(State, StreamID, _Tail) ->
stream_terminate(State, StreamID, normal).
-stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
- %% @todo headers
- %% @todo Don't send this if there are no streams left.
-% Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
-% {<<"content-length">>, <<"0">>}
-% ])),
- %% @todo update IsFin local
-% stream_terminate(State#state{out_state=done}, StreamID, StreamError).
- stream_terminate(State, StreamID, StreamError).
-
stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
children=Children0}, StreamID, Reason) ->
@@ -1241,15 +1252,12 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
%% Remove the stream from the state and reset the overriden options.
{value, #stream{state=StreamState}, Streams}
= lists:keytake(StreamID, #stream.id, Streams1),
- State2 = State1#state{streams=Streams, overriden_opts=#{}},
+ State2 = State1#state{streams=Streams, overriden_opts=#{}, flow=infinity},
%% Stop the stream.
stream_call_terminate(StreamID, Reason, StreamState, State2),
Children = cowboy_children:shutdown(Children0, StreamID),
%% We reset the timeout if there are no active streams anymore.
- State = case Streams of
- [] -> set_timeout(State2);
- _ -> State2
- end,
+ State = set_timeout(State2#state{streams=Streams, children=Children}, request_timeout),
%% We want to drop the connection if the body was not read fully
%% and we don't know its length or more remains to be read than
%% configuration allows.
@@ -1258,10 +1266,10 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
case InState of
#ps_body{length=undefined}
when InStreamID =:= OutStreamID ->
- terminate(State#state{streams=Streams, children=Children}, skip_body_unknown_length);
+ terminate(State, skip_body_unknown_length);
#ps_body{length=Len, received=Received}
when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
- terminate(State#state{streams=Streams, children=Children}, skip_body_too_large);
+ terminate(State, skip_body_too_large);
_ ->
%% Move on to the next stream.
NextOutStreamID = OutStreamID + 1,
@@ -1269,12 +1277,11 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
false ->
%% @todo This is clearly wrong, if the stream is gone we need to check if
%% there used to be such a stream, and if there was to send an error.
- State#state{out_streamid=NextOutStreamID, out_state=wait,
- streams=Streams, children=Children};
+ State#state{out_streamid=NextOutStreamID, out_state=wait};
#stream{queue=Commands} ->
%% @todo Remove queue from the stream.
- commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
- streams=Streams, children=Children}, NextOutStreamID, Commands)
+ commands(State#state{out_streamid=NextOutStreamID, out_state=wait},
+ NextOutStreamID, Commands)
end
end.