path: root/src/cowboy_http.erl
diff options
Diffstat (limited to 'src/cowboy_http.erl')
1 files changed, 83 insertions, 76 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index baedc1c..8ff8ae2 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -33,6 +33,7 @@
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
+ initial_stream_flow_size => non_neg_integer(),
linger_timeout => timeout(),
logger => module(),
max_authority_length => non_neg_integer(),
@@ -74,15 +75,9 @@
name = undefined :: binary() | undefined
-%% @todo We need a state where we wait for the stream process to ask for the body
-%% and do not attempt to read from the socket while in that state (we should read
-%% up to a certain length, and then wait, basically implementing flow control but
-%% by not reading from the socket when the window is empty).
-record(ps_body, {
length :: non_neg_integer() | undefined,
received = 0 :: non_neg_integer(),
- %% @todo flow
transfer_decode_fun :: fun(), %% @todo better type
transfer_decode_state :: any() %% @todo better type
@@ -136,6 +131,9 @@
%% Parsing state for the current stream or stream-to-be.
in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
+ %% Flow requested for the current stream.
+ flow = infinity :: non_neg_integer() | infinity,
%% Identifier for the stream currently being written.
%% Note that out_streamid =< in_streamid.
out_streamid = 1 :: pos_integer(),
@@ -174,12 +172,12 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
case {Peer0, Sock0, Cert1} of
{{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
- LastStreamID = maps:get(max_keepalive, Opts, 100),
- before_loop(set_timeout(#state{
+ State = #state{
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
- last_streamid=LastStreamID}));
+ last_streamid=maps:get(max_keepalive, Opts, 100)},
+ before_loop(set_timeout(State, request_timeout));
{{error, Reason}, _, _} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the peer name.'});
@@ -191,15 +189,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
'A socket error occurred when retrieving the client TLS certificate.'})
+%% Do not read from the socket unless flow is large enough.
+before_loop(State=#state{flow=Flow}) when Flow =< 0 ->
+ loop(State);
before_loop(State=#state{socket=Socket, transport=Transport}) ->
- %% @todo disable this when we get to the body, until the stream asks for it?
- %% Perhaps have a threshold for how much we're willing to read before waiting.
Transport:setopts(Socket, [{active, once}]),
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
- last_streamid=LastStreamID, streams=Streams}) ->
+ last_streamid=LastStreamID}) ->
Messages = Transport:messages(),
InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000),
@@ -209,12 +208,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- %% Only reset the timeout if it is idle_timeout (active streams).
- State1 = case Streams of
- [] -> State;
- _ -> set_timeout(State)
- end,
- parse(<< Buffer/binary, Data/binary >>, State1);
+ parse(<< Buffer/binary, Data/binary >>, State);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -250,13 +244,23 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
-%% We set request_timeout when there are no active streams,
-%% and idle_timeout otherwise.
-set_timeout(State0=#state{opts=Opts, overriden_opts=Override, streams=Streams}) ->
+%% We do not set request_timeout if there are active streams.
+set_timeout(State=#state{streams=[_|_]}, request_timeout) ->
+ State;
+%% We do not set request_timeout if we are skipping a body.
+set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) ->
+ State;
+%% We do not set idle_timeout if there are no active streams,
+%% unless when we are skipping a body.
+set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
+ when element(1, InState) =/= ps_body ->
+ State;
+%% Otherwise we can set the timeout.
+set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
State = cancel_timeout(State0),
- {Name, Default} = case Streams of
- [] -> {request_timeout, 5000};
- _ -> {idle_timeout, 60000}
+ Default = case Name of
+ request_timeout -> 5000;
+ idle_timeout -> 60000
Timeout = case Override of
%% The timeout may have been overriden for the current stream.
@@ -314,9 +318,6 @@ parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}})
State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
Headers, Name));
parse(Buffer, State=#state{in_state=#ps_body{}}) ->
- %% @todo We do not want to get the body automatically if the request doesn't ask for it.
- %% We may want to get bodies that are below a threshold without waiting, and buffer them
- %% until the request asks, though.
after_parse(parse_body(Buffer, State)).
after_parse({request, Req=#{streamid := StreamID, method := Method,
@@ -324,14 +325,15 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
+ Flow = maps:get(initial_stream_flow_size, Opts, 65535),
TE = maps:get(<<"te">>, Headers, undefined),
Streams = [#stream{id=StreamID, state=StreamState,
method=Method, version=Version, te=TE}|Streams0],
State1 = case maybe_req_close(State0, Headers, Version) of
- close -> State0#state{streams=Streams, last_streamid=StreamID};
- keepalive -> State0#state{streams=Streams}
+ close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow};
+ keepalive -> State0#state{streams=Streams, flow=Flow}
- State = set_timeout(State1),
+ State = set_timeout(State1, idle_timeout),
parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception ->
@@ -343,26 +345,40 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
%% Streams are sequential so the body is always about the last stream created
%% unless that stream has terminated.
-after_parse({data, StreamID, IsFin, Data, State=#state{opts=Opts, buffer=Buffer,
+after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer,
streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}}) ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
- parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
+ State1 = set_timeout(State0, case IsFin of
+ fin -> request_timeout;
+ nofin -> idle_timeout
+ end),
+ State = update_flow(IsFin, Data, State1#state{streams=Streams}),
+ parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception ->
[StreamID, IsFin, Data, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
- stream_reset(State, StreamID, {internal_error, {Class, Exception},
+ %% @todo Should call parse after this.
+ stream_terminate(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
-after_parse({data, _, _, _, State}) ->
- before_loop(State);
+after_parse({data, _, IsFin, _, State}) ->
+ before_loop(set_timeout(State, case IsFin of
+ fin -> request_timeout;
+ nofin -> idle_timeout
+ end));
after_parse({more, State}) ->
- before_loop(State).
+ before_loop(set_timeout(State, idle_timeout)).
+update_flow(fin, _, State) ->
+ State#state{flow=infinity};
+update_flow(nofin, Data, State=#state{flow=Flow0}) ->
+ State#state{flow=Flow0 - byte_size(Data)}.
%% Request-line.
@@ -838,29 +854,25 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
%% @todo Proper trailers.
try TDecode(Buffer, TState0) of
more ->
- %% @todo Asks for 0 or more bytes.
{more, State#state{buffer=Buffer}};
{more, Data, TState} ->
- %% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{buffer= <<>>,
in_state=PS#ps_body{received=Received + byte_size(Data),
{more, Data, _Length, TState} when is_integer(_Length) ->
- %% @todo Asks for Length more bytes.
{data, StreamID, nofin, Data, State#state{buffer= <<>>,
in_state=PS#ps_body{received=Received + byte_size(Data),
{more, Data, Rest, TState} ->
- %% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{buffer=Rest,
in_state=PS#ps_body{received=Received + byte_size(Data),
{done, _HasTrailers, Rest} ->
- {data, StreamID, fin, <<>>, set_timeout(
- State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})};
+ {data, StreamID, fin, <<>>,
+ State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}};
{done, Data, _HasTrailers, Rest} ->
- {data, StreamID, fin, Data, set_timeout(
- State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})}
+ {data, StreamID, fin, Data,
+ State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}}
catch _:_ ->
Reason = {connection_error, protocol_error,
'Failure to decode the content. (RFC7230 4)'},
@@ -896,7 +908,7 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
[StreamID, Msg, StreamState0],
Class, Exception, erlang:get_stacktrace()), Opts),
- stream_reset(State, StreamID, {internal_error, {Class, Exception},
+ stream_terminate(State, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:info/3.'})
false ->
@@ -915,7 +927,7 @@ commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail
StreamID, Tail);
%% Error handling.
commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
- commands(stream_reset(State, StreamID, Error), StreamID, Tail);
+ commands(stream_terminate(State, StreamID, Error), StreamID, Tail);
%% Commands for a stream currently inactive.
commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
when Current =/= StreamID ->
@@ -927,13 +939,22 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command
Stream#stream{queue=Queue ++ Commands}),
%% Read the request body.
-commands(State, StreamID, [{flow, _Length}|Tail]) ->
- %% @todo We only read from socket if buffer is empty, otherwise
- %% we decode the buffer.
- %% @todo Set the body reading length to min(Length, BodyLength)
- commands(State, StreamID, Tail);
+commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
+ [{flow, Size}|Tail]) ->
+ %% We must read *at least* Size of data otherwise functions
+ %% like cowboy_req:read_body/1,2 will wait indefinitely.
+ Flow = if
+ Flow0 < 0 -> Size;
+ true -> Flow0 + Size
+ end,
+ %% Reenable active mode if necessary.
+ _ = if
+ Flow0 =< 0, Flow > 0 ->
+ Transport:setopts(Socket, [{active, once}]);
+ true ->
+ ok
+ end,
+ commands(State#state{flow=Flow}, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
[{error_response, Status, Headers0, Body}|Tail]) ->
@@ -1100,7 +1121,6 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
- %% @todo This should be the last stream running otherwise we need to wait before switching.
%% @todo If there's streams opened after this one, fail instead of 101.
State = cancel_timeout(State0),
%% Before we send the 101 response we need to stop receiving data
@@ -1127,7 +1147,8 @@ commands(State0=#state{overriden_opts=Opts},
StreamID, [{set_options, SetOpts}|Tail]) ->
State1 = case SetOpts of
#{idle_timeout := IdleTimeout} ->
- set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}});
+ set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
+ idle_timeout);
_ ->
@@ -1209,16 +1230,6 @@ maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
maybe_terminate(State, StreamID, _Tail) ->
stream_terminate(State, StreamID, normal).
-stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
- %% @todo headers
- %% @todo Don't send this if there are no streams left.
-% Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
-% {<<"content-length">>, <<"0">>}
-% ])),
- %% @todo update IsFin local
-% stream_terminate(State#state{out_state=done}, StreamID, StreamError).
- stream_terminate(State, StreamID, StreamError).
stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState,
out_streamid=OutStreamID, out_state=OutState, streams=Streams0,
children=Children0}, StreamID, Reason) ->
@@ -1241,15 +1252,12 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
%% Remove the stream from the state and reset the overriden options.
{value, #stream{state=StreamState}, Streams}
= lists:keytake(StreamID, #stream.id, Streams1),
- State2 = State1#state{streams=Streams, overriden_opts=#{}},
+ State2 = State1#state{streams=Streams, overriden_opts=#{}, flow=infinity},
%% Stop the stream.
stream_call_terminate(StreamID, Reason, StreamState, State2),
Children = cowboy_children:shutdown(Children0, StreamID),
%% We reset the timeout if there are no active streams anymore.
- State = case Streams of
- [] -> set_timeout(State2);
- _ -> State2
- end,
+ State = set_timeout(State2#state{streams=Streams, children=Children}, request_timeout),
%% We want to drop the connection if the body was not read fully
%% and we don't know its length or more remains to be read than
%% configuration allows.
@@ -1258,10 +1266,10 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
case InState of
when InStreamID =:= OutStreamID ->
- terminate(State#state{streams=Streams, children=Children}, skip_body_unknown_length);
+ terminate(State, skip_body_unknown_length);
#ps_body{length=Len, received=Received}
when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len ->
- terminate(State#state{streams=Streams, children=Children}, skip_body_too_large);
+ terminate(State, skip_body_too_large);
_ ->
%% Move on to the next stream.
NextOutStreamID = OutStreamID + 1,
@@ -1269,12 +1277,11 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
false ->
%% @todo This is clearly wrong, if the stream is gone we need to check if
%% there used to be such a stream, and if there was to send an error.
- State#state{out_streamid=NextOutStreamID, out_state=wait,
- streams=Streams, children=Children};
+ State#state{out_streamid=NextOutStreamID, out_state=wait};
#stream{queue=Commands} ->
%% @todo Remove queue from the stream.
- commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
- streams=Streams, children=Children}, NextOutStreamID, Commands)
+ commands(State#state{out_streamid=NextOutStreamID, out_state=wait},
+ NextOutStreamID, Commands)