diff options
-rw-r--r-- | doc/src/guide/migrating_from_2.6.asciidoc | 11 | ||||
-rw-r--r-- | doc/src/manual/cowboy_http.asciidoc | 47 | ||||
-rw-r--r-- | src/cowboy_http.erl | 159 |
3 files changed, 120 insertions, 97 deletions
diff --git a/doc/src/guide/migrating_from_2.6.asciidoc b/doc/src/guide/migrating_from_2.6.asciidoc index a735ef9..a582ee4 100644 --- a/doc/src/guide/migrating_from_2.6.asciidoc +++ b/doc/src/guide/migrating_from_2.6.asciidoc @@ -63,6 +63,13 @@ Cowboy 2.7 requires Erlang/OTP 20.0 or greater. willing to accept. By default it will accept 10 stream resets every 10 seconds. +* Flow control for incoming data has been implemented + for HTTP/1.1. Cowboy will now wait for the user code + to ask for the request body before reading it from + the socket. The option `initial_stream_flow_size` + controls how much data Cowboy will read without + being asked. + * The HTTP/1.1 and HTTP/2 option `logger` is now documented. @@ -154,7 +161,9 @@ Cowboy 2.7 requires Erlang/OTP 20.0 or greater. was waiting for more data. * It was possible for Cowboy to receive stray timeout messages - for HTTP/1.1 connections. This has been addressed. + for HTTP/1.1 connections, resulting in crashes. The timeout + handling in HTTP/1.1 has been reworked and the issue should + no longer occur. * The type for the Req object has been updated to accept custom fields as was already documented. diff --git a/doc/src/manual/cowboy_http.asciidoc b/doc/src/manual/cowboy_http.asciidoc index e9837c3..8d89ea2 100644 --- a/doc/src/manual/cowboy_http.asciidoc +++ b/doc/src/manual/cowboy_http.asciidoc @@ -17,25 +17,26 @@ as a Ranch protocol. [source,erlang] ---- opts() :: #{ - chunked => boolean(), - connection_type => worker | supervisor, - http10_keepalive => boolean(), - idle_timeout => timeout(), - inactivity_timeout => timeout(), - linger_timeout => timeout(), - logger => module(), - max_empty_lines => non_neg_integer(), - max_header_name_length => non_neg_integer(), - max_header_value_length => non_neg_integer(), - max_headers => non_neg_integer(), - max_keepalive => non_neg_integer(), - max_method_length => non_neg_integer(), - max_request_line_length => non_neg_integer(), - max_skip_body_length => non_neg_integer(), - proxy_header => boolean(), - request_timeout => timeout(), - sendfile => boolean(), - stream_handlers => [module()] + chunked => boolean(), + connection_type => worker | supervisor, + http10_keepalive => boolean(), + idle_timeout => timeout(), + inactivity_timeout => timeout(), + initial_stream_flow_size => non_neg_integer(), + linger_timeout => timeout(), + logger => module(), + max_empty_lines => non_neg_integer(), + max_header_name_length => non_neg_integer(), + max_header_value_length => non_neg_integer(), + max_headers => non_neg_integer(), + max_keepalive => non_neg_integer(), + max_method_length => non_neg_integer(), + max_request_line_length => non_neg_integer(), + max_skip_body_length => non_neg_integer(), + proxy_header => boolean(), + request_timeout => timeout(), + sendfile => boolean(), + stream_handlers => [module()] } ---- @@ -79,6 +80,12 @@ inactivity_timeout (300000):: Time in ms with nothing received at all before Cowboy closes the connection. +initial_stream_flow_size (65535):: + +Amount of data in bytes Cowboy will read from the socket +right after a request was fully received. This is a soft +limit. + linger_timeout (1000):: Time in ms that Cowboy will wait when closing the connection. This is @@ -144,7 +151,7 @@ Ordered list of stream handlers that will handle all stream events. == Changelog -* *2.7*: The `logger` option was added. +* *2.7*: The `initial_stream_flow_size` and `logger` options were added. * *2.6*: The `chunked`, `http10_keepalive`, `proxy_header` and `sendfile` options were added. * *2.5*: The `linger_timeout` option was added. * *2.2*: The `max_skip_body_length` option was added. diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index baedc1c..8ff8ae2 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -33,6 +33,7 @@ http10_keepalive => boolean(), idle_timeout => timeout(), inactivity_timeout => timeout(), + initial_stream_flow_size => non_neg_integer(), linger_timeout => timeout(), logger => module(), max_authority_length => non_neg_integer(), @@ -74,15 +75,9 @@ name = undefined :: binary() | undefined }). -%% @todo We need a state where we wait for the stream process to ask for the body -%% and do not attempt to read from the socket while in that state (we should read -%% up to a certain length, and then wait, basically implementing flow control but -%% by not reading from the socket when the window is empty). - -record(ps_body, { length :: non_neg_integer() | undefined, received = 0 :: non_neg_integer(), - %% @todo flow transfer_decode_fun :: fun(), %% @todo better type transfer_decode_state :: any() %% @todo better type }). @@ -136,6 +131,9 @@ %% Parsing state for the current stream or stream-to-be. in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{}, + %% Flow requested for the current stream. + flow = infinity :: non_neg_integer() | infinity, + %% Identifier for the stream currently being written. %% Note that out_streamid =< in_streamid. out_streamid = 1 :: pos_integer(), @@ -174,12 +172,12 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> end, case {Peer0, Sock0, Cert1} of {{ok, Peer}, {ok, Sock}, {ok, Cert}} -> - LastStreamID = maps:get(max_keepalive, Opts, 100), - before_loop(set_timeout(#state{ + State = #state{ parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, - last_streamid=LastStreamID})); + last_streamid=maps:get(max_keepalive, Opts, 100)}, + before_loop(set_timeout(State, request_timeout)); {{error, Reason}, _, _} -> terminate(undefined, {socket_error, Reason, 'A socket error occurred when retrieving the peer name.'}); @@ -191,15 +189,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> 'A socket error occurred when retrieving the client TLS certificate.'}) end. +%% Do not read from the socket unless flow is large enough. +before_loop(State=#state{flow=Flow}) when Flow =< 0 -> + loop(State); before_loop(State=#state{socket=Socket, transport=Transport}) -> - %% @todo disable this when we get to the body, until the stream asks for it? - %% Perhaps have a threshold for how much we're willing to read before waiting. Transport:setopts(Socket, [{active, once}]), loop(State). loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID, - last_streamid=LastStreamID, streams=Streams}) -> + last_streamid=LastStreamID}) -> Messages = Transport:messages(), InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000), receive @@ -209,12 +208,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, before_loop(State); %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - %% Only reset the timeout if it is idle_timeout (active streams). - State1 = case Streams of - [] -> State; - _ -> set_timeout(State) - end, - parse(<< Buffer/binary, Data/binary >>, State1); + parse(<< Buffer/binary, Data/binary >>, State); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> @@ -250,13 +244,23 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) end. -%% We set request_timeout when there are no active streams, -%% and idle_timeout otherwise. -set_timeout(State0=#state{opts=Opts, overriden_opts=Override, streams=Streams}) -> +%% We do not set request_timeout if there are active streams. +set_timeout(State=#state{streams=[_|_]}, request_timeout) -> + State; +%% We do not set request_timeout if we are skipping a body. +set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) -> + State; +%% We do not set idle_timeout if there are no active streams, +%% unless when we are skipping a body. +set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout) + when element(1, InState) =/= ps_body -> + State; +%% Otherwise we can set the timeout. +set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) -> State = cancel_timeout(State0), - {Name, Default} = case Streams of - [] -> {request_timeout, 5000}; - _ -> {idle_timeout, 60000} + Default = case Name of + request_timeout -> 5000; + idle_timeout -> 60000 end, Timeout = case Override of %% The timeout may have been overriden for the current stream. @@ -314,9 +318,6 @@ parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) State#state{in_state=PS#ps_header{headers=undefined, name=undefined}}, Headers, Name)); parse(Buffer, State=#state{in_state=#ps_body{}}) -> - %% @todo We do not want to get the body automatically if the request doesn't ask for it. - %% We may want to get bodies that are below a threshold without waiting, and buffer them - %% until the request asks, though. after_parse(parse_body(Buffer, State)). after_parse({request, Req=#{streamid := StreamID, method := Method, @@ -324,14 +325,15 @@ after_parse({request, Req=#{streamid := StreamID, method := Method, State0=#state{opts=Opts, buffer=Buffer, streams=Streams0}}) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> + Flow = maps:get(initial_stream_flow_size, Opts, 65535), TE = maps:get(<<"te">>, Headers, undefined), Streams = [#stream{id=StreamID, state=StreamState, method=Method, version=Version, te=TE}|Streams0], State1 = case maybe_req_close(State0, Headers, Version) of - close -> State0#state{streams=Streams, last_streamid=StreamID}; - keepalive -> State0#state{streams=Streams} + close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow}; + keepalive -> State0#state{streams=Streams, flow=Flow} end, - State = set_timeout(State1), + State = set_timeout(State1, idle_timeout), parse(Buffer, commands(State, StreamID, Commands)) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(init, @@ -343,26 +345,40 @@ after_parse({request, Req=#{streamid := StreamID, method := Method, end; %% Streams are sequential so the body is always about the last stream created %% unless that stream has terminated. -after_parse({data, StreamID, IsFin, Data, State=#state{opts=Opts, buffer=Buffer, +after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer, streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}}) -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream#stream{state=StreamState}), - parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands)) + State1 = set_timeout(State0, case IsFin of + fin -> request_timeout; + nofin -> idle_timeout + end), + State = update_flow(IsFin, Data, State1#state{streams=Streams}), + parse(Buffer, commands(State, StreamID, Commands)) catch Class:Exception -> cowboy:log(cowboy_stream:make_error_log(data, [StreamID, IsFin, Data, StreamState0], Class, Exception, erlang:get_stacktrace()), Opts), - stream_reset(State, StreamID, {internal_error, {Class, Exception}, + %% @todo Should call parse after this. + stream_terminate(State0, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:data/4.'}) end; %% No corresponding stream. We must skip the body of the previous request %% in order to process the next one. -after_parse({data, _, _, _, State}) -> - before_loop(State); +after_parse({data, _, IsFin, _, State}) -> + before_loop(set_timeout(State, case IsFin of + fin -> request_timeout; + nofin -> idle_timeout + end)); after_parse({more, State}) -> - before_loop(State). + before_loop(set_timeout(State, idle_timeout)). + +update_flow(fin, _, State) -> + State#state{flow=infinity}; +update_flow(nofin, Data, State=#state{flow=Flow0}) -> + State#state{flow=Flow0 - byte_size(Data)}. %% Request-line. @@ -838,29 +854,25 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= %% @todo Proper trailers. try TDecode(Buffer, TState0) of more -> - %% @todo Asks for 0 or more bytes. {more, State#state{buffer=Buffer}}; {more, Data, TState} -> - %% @todo Asks for 0 or more bytes. {data, StreamID, nofin, Data, State#state{buffer= <<>>, in_state=PS#ps_body{received=Received + byte_size(Data), transfer_decode_state=TState}}}; {more, Data, _Length, TState} when is_integer(_Length) -> - %% @todo Asks for Length more bytes. {data, StreamID, nofin, Data, State#state{buffer= <<>>, in_state=PS#ps_body{received=Received + byte_size(Data), transfer_decode_state=TState}}}; {more, Data, Rest, TState} -> - %% @todo Asks for 0 or more bytes. {data, StreamID, nofin, Data, State#state{buffer=Rest, in_state=PS#ps_body{received=Received + byte_size(Data), transfer_decode_state=TState}}}; {done, _HasTrailers, Rest} -> - {data, StreamID, fin, <<>>, set_timeout( - State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})}; + {data, StreamID, fin, <<>>, + State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}}; {done, Data, _HasTrailers, Rest} -> - {data, StreamID, fin, Data, set_timeout( - State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}})} + {data, StreamID, fin, Data, + State#state{buffer=Rest, in_streamid=StreamID + 1, in_state=#ps_request_line{}}} catch _:_ -> Reason = {connection_error, protocol_error, 'Failure to decode the content. (RFC7230 4)'}, @@ -896,7 +908,7 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) -> cowboy:log(cowboy_stream:make_error_log(info, [StreamID, Msg, StreamState0], Class, Exception, erlang:get_stacktrace()), Opts), - stream_reset(State, StreamID, {internal_error, {Class, Exception}, + stream_terminate(State, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:info/3.'}) end; false -> @@ -915,7 +927,7 @@ commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail StreamID, Tail); %% Error handling. commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) -> - commands(stream_reset(State, StreamID, Error), StreamID, Tail); + commands(stream_terminate(State, StreamID, Error), StreamID, Tail); %% Commands for a stream currently inactive. commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands) when Current =/= StreamID -> @@ -927,13 +939,22 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command Stream#stream{queue=Queue ++ Commands}), State#state{streams=Streams}; %% Read the request body. -commands(State, StreamID, [{flow, _Length}|Tail]) -> - %% @todo We only read from socket if buffer is empty, otherwise - %% we decode the buffer. - - %% @todo Set the body reading length to min(Length, BodyLength) - - commands(State, StreamID, Tail); +commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID, + [{flow, Size}|Tail]) -> + %% We must read *at least* Size of data otherwise functions + %% like cowboy_req:read_body/1,2 will wait indefinitely. + Flow = if + Flow0 < 0 -> Size; + true -> Flow0 + Size + end, + %% Reenable active mode if necessary. + _ = if + Flow0 =< 0, Flow > 0 -> + Transport:setopts(Socket, [{active, once}]); + true -> + ok + end, + commands(State#state{flow=Flow}, StreamID, Tail); %% Error responses are sent only if a response wasn't sent already. commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID, [{error_response, Status, Headers0, Body}|Tail]) -> @@ -1100,7 +1121,6 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> - %% @todo This should be the last stream running otherwise we need to wait before switching. %% @todo If there's streams opened after this one, fail instead of 101. State = cancel_timeout(State0), %% Before we send the 101 response we need to stop receiving data @@ -1127,7 +1147,8 @@ commands(State0=#state{overriden_opts=Opts}, StreamID, [{set_options, SetOpts}|Tail]) -> State1 = case SetOpts of #{idle_timeout := IdleTimeout} -> - set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}}); + set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}}, + idle_timeout); _ -> State0 end, @@ -1209,16 +1230,6 @@ maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) -> maybe_terminate(State, StreamID, _Tail) -> stream_terminate(State, StreamID, normal). -stream_reset(State, StreamID, StreamError={internal_error, _, _}) -> - %% @todo headers - %% @todo Don't send this if there are no streams left. -% Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [ -% {<<"content-length">>, <<"0">>} -% ])), - %% @todo update IsFin local -% stream_terminate(State#state{out_state=done}, StreamID, StreamError). - stream_terminate(State, StreamID, StreamError). - stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InState, out_streamid=OutStreamID, out_state=OutState, streams=Streams0, children=Children0}, StreamID, Reason) -> @@ -1241,15 +1252,12 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta %% Remove the stream from the state and reset the overriden options. {value, #stream{state=StreamState}, Streams} = lists:keytake(StreamID, #stream.id, Streams1), - State2 = State1#state{streams=Streams, overriden_opts=#{}}, + State2 = State1#state{streams=Streams, overriden_opts=#{}, flow=infinity}, %% Stop the stream. stream_call_terminate(StreamID, Reason, StreamState, State2), Children = cowboy_children:shutdown(Children0, StreamID), %% We reset the timeout if there are no active streams anymore. - State = case Streams of - [] -> set_timeout(State2); - _ -> State2 - end, + State = set_timeout(State2#state{streams=Streams, children=Children}, request_timeout), %% We want to drop the connection if the body was not read fully %% and we don't know its length or more remains to be read than %% configuration allows. @@ -1258,10 +1266,10 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta case InState of #ps_body{length=undefined} when InStreamID =:= OutStreamID -> - terminate(State#state{streams=Streams, children=Children}, skip_body_unknown_length); + terminate(State, skip_body_unknown_length); #ps_body{length=Len, received=Received} when InStreamID =:= OutStreamID, Received + MaxSkipBodyLength < Len -> - terminate(State#state{streams=Streams, children=Children}, skip_body_too_large); + terminate(State, skip_body_too_large); _ -> %% Move on to the next stream. NextOutStreamID = OutStreamID + 1, @@ -1269,12 +1277,11 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta false -> %% @todo This is clearly wrong, if the stream is gone we need to check if %% there used to be such a stream, and if there was to send an error. - State#state{out_streamid=NextOutStreamID, out_state=wait, - streams=Streams, children=Children}; + State#state{out_streamid=NextOutStreamID, out_state=wait}; #stream{queue=Commands} -> %% @todo Remove queue from the stream. - commands(State#state{out_streamid=NextOutStreamID, out_state=wait, - streams=Streams, children=Children}, NextOutStreamID, Commands) + commands(State#state{out_streamid=NextOutStreamID, out_state=wait}, + NextOutStreamID, Commands) end end. |