aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/cowboy_http.erl10
-rw-r--r--src/cowboy_http2.erl19
-rw-r--r--src/cowboy_req.erl2
-rw-r--r--src/cowboy_stream_h.erl54
4 files changed, 48 insertions, 37 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 107fd60..f0f8ed7 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -709,11 +709,13 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
%% @todo Asks for 0 or more bytes.
{data, StreamID, nofin, Data, State#state{in_state=
PS#ps_body{transfer_decode_state=TState}}, Rest};
- {done, TotalLength, Rest} ->
- {data, StreamID, {fin, TotalLength}, <<>>, set_timeout(
+ %% @todo We probably want to confirm that the total length
+ %% is the same as the content-length, if one was provided.
+ {done, _TotalLength, Rest} ->
+ {data, StreamID, fin, <<>>, set_timeout(
State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest};
- {done, Data, TotalLength, Rest} ->
- {data, StreamID, {fin, TotalLength}, Data, set_timeout(
+ {done, Data, _TotalLength, Rest} ->
+ {data, StreamID, fin, Data, set_timeout(
State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}
end.
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 77359ee..a446222 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -49,9 +49,7 @@
%% Whether we finished receiving data.
remote = nofin :: cowboy_stream:fin(),
%% Remote flow control window (how much we accept to receive).
- remote_window :: integer(),
- %% Request body length.
- body_length = 0 :: non_neg_integer()
+ remote_window :: integer()
}).
-type stream() :: #stream{}.
@@ -289,22 +287,15 @@ frame(State=#state{client_streamid=LastStreamID}, {data, StreamID, _, _})
terminate(State, {connection_error, protocol_error,
'DATA frame received on a stream in idle state. (RFC7540 5.1)'});
frame(State0=#state{remote_window=ConnWindow, streams=Streams},
- {data, StreamID, IsFin0, Data}) ->
+ {data, StreamID, IsFin, Data}) ->
DataLen = byte_size(Data),
State = State0#state{remote_window=ConnWindow - DataLen},
case lists:keyfind(StreamID, #stream.id, Streams) of
- Stream = #stream{state=StreamState0, remote=nofin,
- remote_window=StreamWindow, body_length=Len0} ->
- Len = Len0 + DataLen,
- IsFin = case IsFin0 of
- fin -> {fin, Len};
- nofin -> nofin
- end,
+ Stream = #stream{state=StreamState0, remote=nofin, remote_window=StreamWindow} ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
- commands(State,
- Stream#stream{state=StreamState, remote_window=StreamWindow - DataLen,
- body_length=Len}, Commands)
+ commands(State, Stream#stream{state=StreamState, remote=IsFin,
+ remote_window=StreamWindow - DataLen}, Commands)
catch Class:Exception ->
cowboy_stream:report_error(data,
[StreamID, IsFin, Data, StreamState0],
diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl
index 84e1b9d..e916976 100644
--- a/src/cowboy_req.erl
+++ b/src/cowboy_req.erl
@@ -440,7 +440,7 @@ read_body(Req=#{pid := Pid, streamid := StreamID}, Opts) ->
receive
{request_body, Ref, nofin, Body} ->
{more, Body, Req};
- {request_body, Ref, {fin, BodyLength}, Body} ->
+ {request_body, Ref, fin, BodyLength, Body} ->
{ok, Body, set_body_length(Req, BodyLength)}
after Timeout ->
exit(timeout)
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
index 287fd95..5c674dd 100644
--- a/src/cowboy_stream_h.erl
+++ b/src/cowboy_stream_h.erl
@@ -34,7 +34,8 @@
read_body_timer_ref = undefined :: reference() | undefined,
read_body_length = 0 :: non_neg_integer() | infinity,
read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
- read_body_buffer = <<>> :: binary()
+ read_body_buffer = <<>> :: binary(),
+ body_length = 0 :: non_neg_integer()
}).
%% @todo For shutting down children we need to have a timeout before we terminate
@@ -54,17 +55,31 @@ init(_StreamID, Req=#{ref := Ref}, Opts) ->
%% If we accumulated enough data or IsFin=fin, send it.
%% If not, buffer it.
%% If not, buffer it.
+
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
-> {cowboy_stream:commands(), State} when State::#state{}.
-data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) ->
- {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}};
-data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length ->
- {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}};
+data(_StreamID, IsFin, Data, State=#state{
+ read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
+ {[], State#state{
+ read_body_is_fin=IsFin,
+ read_body_buffer= << Buffer/binary, Data/binary >>,
+ body_length=BodyLen + byte_size(Data)}};
+data(_StreamID, nofin, Data, State=#state{
+ read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
+ when byte_size(Data) + byte_size(Buffer) < ReadLen ->
+ {[], State#state{
+ read_body_buffer= << Buffer/binary, Data/binary >>,
+ body_length=BodyLen + byte_size(Data)}};
data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
- read_body_timer_ref=TRef, read_body_buffer=Buffer}) ->
+ read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
+ BodyLen = BodyLen0 + byte_size(Data),
ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
- Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>},
- {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}.
+ send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
+ {[], State#state{
+ read_body_ref=undefined,
+ read_body_timer_ref=undefined,
+ read_body_buffer= <<>>,
+ body_length=BodyLen}}.
-spec info(cowboy_stream:streamid(), any(), State)
-> {cowboy_stream:commands(), State} when State::#state{}.
@@ -86,15 +101,11 @@ info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref,
{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>},
{internal_error, Exit, 'Stream process crashed.'}
], State};
-%% Request body, no body buffer but IsFin=fin.
-%info(_StreamID, {read_body, Ref, _, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) ->
-% Pid ! {request_body, Ref, fin, <<>>},
-% {[], State};
%% Request body, body buffered large enough or complete.
-info(_StreamID, {read_body, Ref, Length, _},
- State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data})
- when element(1, IsFin) =:= fin; byte_size(Data) >= Length ->
- Pid ! {request_body, Ref, IsFin, Data},
+info(_StreamID, {read_body, Ref, Length, _}, State=#state{pid=Pid,
+ read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen})
+ when IsFin =:= fin; byte_size(Buffer) >= Length ->
+ send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
{[], State#state{read_body_buffer= <<>>}};
%% Request body, not enough to send yet.
info(StreamID, {read_body, Ref, Length, Period}, State) ->
@@ -102,8 +113,8 @@ info(StreamID, {read_body, Ref, Length, Period}, State) ->
{[{flow, Length}], State#state{read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length}};
%% Request body reading timeout; send what we got.
info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref,
- read_body_is_fin=IsFin, read_body_buffer=Buffer}) ->
- Pid ! {request_body, Ref, IsFin, Buffer},
+ read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
+ send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
{[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}};
info(_StreamID, {read_body_timeout, _}, State) ->
{[], State};
@@ -132,6 +143,13 @@ terminate(_StreamID, _Reason, _State) ->
early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
+send_request_body(Pid, Ref, nofin, _, Data) ->
+ Pid ! {request_body, Ref, nofin, Data},
+ ok;
+send_request_body(Pid, Ref, fin, BodyLen, Data) ->
+ Pid ! {request_body, Ref, fin, BodyLen, Data},
+ ok.
+
%% We use ~999999p here instead of ~w because the latter doesn't
%% support printable strings.
report_crash(_, _, _, normal, _) ->