aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_stream_h.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_stream_h.erl')
-rw-r--r--src/cowboy_stream_h.erl31
1 files changed, 28 insertions, 3 deletions
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
index 93b8417..620975c 100644
--- a/src/cowboy_stream_h.erl
+++ b/src/cowboy_stream_h.erl
@@ -33,7 +33,7 @@
expect = undefined :: undefined | continue,
read_body_ref = undefined :: reference() | undefined,
read_body_timer_ref = undefined :: reference() | undefined,
- read_body_length = 0 :: non_neg_integer() | infinity,
+ read_body_length = 0 :: non_neg_integer() | infinity | auto,
read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()},
read_body_buffer = <<>> :: binary(),
body_length = 0 :: non_neg_integer()
@@ -65,8 +65,9 @@ expect(Req) ->
end.
%% If we receive data and stream is waiting for data:
-%% If we accumulated enough data or IsFin=fin, send it.
-%% If not, buffer it.
+%% If we accumulated enough data or IsFin=fin, send it.
+%% If we are in auto mode, send it and update flow control.
+%% If not, buffer it.
%% If not, buffer it.
%%
%% We always reset the expect field when we receive data,
@@ -75,6 +76,7 @@ expect(Req) ->
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
-> {cowboy_stream:commands(), State} when State::#state{}.
+%% Stream isn't waiting for data.
data(_StreamID, IsFin, Data, State=#state{
read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) ->
{[], State#state{
@@ -82,6 +84,16 @@ data(_StreamID, IsFin, Data, State=#state{
read_body_is_fin=IsFin,
read_body_buffer= << Buffer/binary, Data/binary >>,
body_length=BodyLen + byte_size(Data)}};
+%% Stream is waiting for data using auto mode.
+%%
+%% There is no buffering done in auto mode.
+data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
+ read_body_length=auto, body_length=BodyLen}) ->
+ send_request_body(Pid, Ref, IsFin, BodyLen, Data),
+ {[{flow, byte_size(Data)}], State#state{
+ read_body_ref=undefined,
+ body_length=BodyLen}};
+%% Stream is waiting for data but we didn't receive enough to send yet.
data(_StreamID, nofin, Data, State=#state{
read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen})
when byte_size(Data) + byte_size(Buffer) < ReadLen ->
@@ -89,9 +101,11 @@ data(_StreamID, nofin, Data, State=#state{
expect=undefined,
read_body_buffer= << Buffer/binary, Data/binary >>,
body_length=BodyLen + byte_size(Data)}};
+%% Stream is waiting for data and we received enough to send.
data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref,
read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) ->
BodyLen = BodyLen0 + byte_size(Data),
+ %% @todo Handle the infinity case where no TRef was defined.
ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]),
send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>),
{[], State#state{
@@ -121,6 +135,16 @@ info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref,
{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>},
{internal_error, Exit, 'Stream process crashed.'}
], State};
+%% Request body, auto mode, no body buffered.
+info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) ->
+ {[], State#state{
+ read_body_ref=Ref,
+ read_body_length=auto}};
+%% Request body, auto mode, body buffered or complete.
+info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{pid=Pid,
+ read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) ->
+ send_request_body(Pid, Ref, IsFin, BodyLen, Buffer),
+ {[{flow, byte_size(Buffer)}], State#state{read_body_buffer= <<>>}};
%% Request body, body buffered large enough or complete.
%%
%% We do not send a 100 continue response if the client
@@ -136,6 +160,7 @@ info(StreamID, {read_body, Ref, Length, Period}, State=#state{expect=Expect}) ->
continue -> [{inform, 100, #{}}, {flow, Length}];
undefined -> [{flow, Length}]
end,
+ %% @todo Handle the case where Period =:= infinity.
TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}),
{Commands, State#state{
read_body_ref=Ref,