diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy_http.erl | 10 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 19 | ||||
-rw-r--r-- | src/cowboy_req.erl | 2 | ||||
-rw-r--r-- | src/cowboy_stream_h.erl | 54 |
4 files changed, 48 insertions, 37 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 107fd60..f0f8ed7 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -709,11 +709,13 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= %% @todo Asks for 0 or more bytes. {data, StreamID, nofin, Data, State#state{in_state= PS#ps_body{transfer_decode_state=TState}}, Rest}; - {done, TotalLength, Rest} -> - {data, StreamID, {fin, TotalLength}, <<>>, set_timeout( + %% @todo We probably want to confirm that the total length + %% is the same as the content-length, if one was provided. + {done, _TotalLength, Rest} -> + {data, StreamID, fin, <<>>, set_timeout( State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}; - {done, Data, TotalLength, Rest} -> - {data, StreamID, {fin, TotalLength}, Data, set_timeout( + {done, Data, _TotalLength, Rest} -> + {data, StreamID, fin, Data, set_timeout( State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest} end. diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 77359ee..a446222 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -49,9 +49,7 @@ %% Whether we finished receiving data. remote = nofin :: cowboy_stream:fin(), %% Remote flow control window (how much we accept to receive). - remote_window :: integer(), - %% Request body length. - body_length = 0 :: non_neg_integer() + remote_window :: integer() }). -type stream() :: #stream{}. @@ -289,22 +287,15 @@ frame(State=#state{client_streamid=LastStreamID}, {data, StreamID, _, _}) terminate(State, {connection_error, protocol_error, 'DATA frame received on a stream in idle state. (RFC7540 5.1)'}); frame(State0=#state{remote_window=ConnWindow, streams=Streams}, - {data, StreamID, IsFin0, Data}) -> + {data, StreamID, IsFin, Data}) -> DataLen = byte_size(Data), State = State0#state{remote_window=ConnWindow - DataLen}, case lists:keyfind(StreamID, #stream.id, Streams) of - Stream = #stream{state=StreamState0, remote=nofin, - remote_window=StreamWindow, body_length=Len0} -> - Len = Len0 + DataLen, - IsFin = case IsFin0 of - fin -> {fin, Len}; - nofin -> nofin - end, + Stream = #stream{state=StreamState0, remote=nofin, remote_window=StreamWindow} -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> - commands(State, - Stream#stream{state=StreamState, remote_window=StreamWindow - DataLen, - body_length=Len}, Commands) + commands(State, Stream#stream{state=StreamState, remote=IsFin, + remote_window=StreamWindow - DataLen}, Commands) catch Class:Exception -> cowboy_stream:report_error(data, [StreamID, IsFin, Data, StreamState0], diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 84e1b9d..e916976 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -440,7 +440,7 @@ read_body(Req=#{pid := Pid, streamid := StreamID}, Opts) -> receive {request_body, Ref, nofin, Body} -> {more, Body, Req}; - {request_body, Ref, {fin, BodyLength}, Body} -> + {request_body, Ref, fin, BodyLength, Body} -> {ok, Body, set_body_length(Req, BodyLength)} after Timeout -> exit(timeout) diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index 287fd95..5c674dd 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -34,7 +34,8 @@ read_body_timer_ref = undefined :: reference() | undefined, read_body_length = 0 :: non_neg_integer() | infinity, read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()}, - read_body_buffer = <<>> :: binary() + read_body_buffer = <<>> :: binary(), + body_length = 0 :: non_neg_integer() }). %% @todo For shutting down children we need to have a timeout before we terminate @@ -54,17 +55,31 @@ init(_StreamID, Req=#{ref := Ref}, Opts) -> %% If we accumulated enough data or IsFin=fin, send it. %% If not, buffer it. %% If not, buffer it. + -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State) -> {cowboy_stream:commands(), State} when State::#state{}. -data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) -> - {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}}; -data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length -> - {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}}; +data(_StreamID, IsFin, Data, State=#state{ + read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) -> + {[], State#state{ + read_body_is_fin=IsFin, + read_body_buffer= << Buffer/binary, Data/binary >>, + body_length=BodyLen + byte_size(Data)}}; +data(_StreamID, nofin, Data, State=#state{ + read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen}) + when byte_size(Data) + byte_size(Buffer) < ReadLen -> + {[], State#state{ + read_body_buffer= << Buffer/binary, Data/binary >>, + body_length=BodyLen + byte_size(Data)}}; data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, - read_body_timer_ref=TRef, read_body_buffer=Buffer}) -> + read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) -> + BodyLen = BodyLen0 + byte_size(Data), ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]), - Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>}, - {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}. + send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>), + {[], State#state{ + read_body_ref=undefined, + read_body_timer_ref=undefined, + read_body_buffer= <<>>, + body_length=BodyLen}}. -spec info(cowboy_stream:streamid(), any(), State) -> {cowboy_stream:commands(), State} when State::#state{}. @@ -86,15 +101,11 @@ info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}, {internal_error, Exit, 'Stream process crashed.'} ], State}; -%% Request body, no body buffer but IsFin=fin. -%info(_StreamID, {read_body, Ref, _, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) -> -% Pid ! {request_body, Ref, fin, <<>>}, -% {[], State}; %% Request body, body buffered large enough or complete. -info(_StreamID, {read_body, Ref, Length, _}, - State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data}) - when element(1, IsFin) =:= fin; byte_size(Data) >= Length -> - Pid ! {request_body, Ref, IsFin, Data}, +info(_StreamID, {read_body, Ref, Length, _}, State=#state{pid=Pid, + read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) + when IsFin =:= fin; byte_size(Buffer) >= Length -> + send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), {[], State#state{read_body_buffer= <<>>}}; %% Request body, not enough to send yet. info(StreamID, {read_body, Ref, Length, Period}, State) -> @@ -102,8 +113,8 @@ info(StreamID, {read_body, Ref, Length, Period}, State) -> {[{flow, Length}], State#state{read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length}}; %% Request body reading timeout; send what we got. info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref, - read_body_is_fin=IsFin, read_body_buffer=Buffer}) -> - Pid ! {request_body, Ref, IsFin, Buffer}, + read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) -> + send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}; info(_StreamID, {read_body_timeout, _}, State) -> {[], State}; @@ -132,6 +143,13 @@ terminate(_StreamID, _Reason, _State) -> early_error(StreamID, Reason, PartialReq, Resp, Opts) -> cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts). +send_request_body(Pid, Ref, nofin, _, Data) -> + Pid ! {request_body, Ref, nofin, Data}, + ok; +send_request_body(Pid, Ref, fin, BodyLen, Data) -> + Pid ! {request_body, Ref, fin, BodyLen, Data}, + ok. + %% We use ~999999p here instead of ~w because the latter doesn't %% support printable strings. report_crash(_, _, _, normal, _) -> |