diff options
Diffstat (limited to 'src/cowboy_stream_h.erl')
-rw-r--r-- | src/cowboy_stream_h.erl | 54 |
1 files changed, 36 insertions, 18 deletions
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index 287fd95..5c674dd 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -34,7 +34,8 @@ read_body_timer_ref = undefined :: reference() | undefined, read_body_length = 0 :: non_neg_integer() | infinity, read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()}, - read_body_buffer = <<>> :: binary() + read_body_buffer = <<>> :: binary(), + body_length = 0 :: non_neg_integer() }). %% @todo For shutting down children we need to have a timeout before we terminate @@ -54,17 +55,31 @@ init(_StreamID, Req=#{ref := Ref}, Opts) -> %% If we accumulated enough data or IsFin=fin, send it. %% If not, buffer it. %% If not, buffer it. + -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State) -> {cowboy_stream:commands(), State} when State::#state{}. -data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) -> - {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}}; -data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length -> - {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}}; +data(_StreamID, IsFin, Data, State=#state{ + read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) -> + {[], State#state{ + read_body_is_fin=IsFin, + read_body_buffer= << Buffer/binary, Data/binary >>, + body_length=BodyLen + byte_size(Data)}}; +data(_StreamID, nofin, Data, State=#state{ + read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen}) + when byte_size(Data) + byte_size(Buffer) < ReadLen -> + {[], State#state{ + read_body_buffer= << Buffer/binary, Data/binary >>, + body_length=BodyLen + byte_size(Data)}}; data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, - read_body_timer_ref=TRef, read_body_buffer=Buffer}) -> + read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) -> + BodyLen = BodyLen0 + byte_size(Data), ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]), - Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>}, - {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}. + send_request_body(Pid, Ref, IsFin, BodyLen, <<Buffer/binary, Data/binary>>), + {[], State#state{ + read_body_ref=undefined, + read_body_timer_ref=undefined, + read_body_buffer= <<>>, + body_length=BodyLen}}. -spec info(cowboy_stream:streamid(), any(), State) -> {cowboy_stream:commands(), State} when State::#state{}. @@ -86,15 +101,11 @@ info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}, {internal_error, Exit, 'Stream process crashed.'} ], State}; -%% Request body, no body buffer but IsFin=fin. -%info(_StreamID, {read_body, Ref, _, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) -> -% Pid ! {request_body, Ref, fin, <<>>}, -% {[], State}; %% Request body, body buffered large enough or complete. -info(_StreamID, {read_body, Ref, Length, _}, - State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data}) - when element(1, IsFin) =:= fin; byte_size(Data) >= Length -> - Pid ! {request_body, Ref, IsFin, Data}, +info(_StreamID, {read_body, Ref, Length, _}, State=#state{pid=Pid, + read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) + when IsFin =:= fin; byte_size(Buffer) >= Length -> + send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), {[], State#state{read_body_buffer= <<>>}}; %% Request body, not enough to send yet. info(StreamID, {read_body, Ref, Length, Period}, State) -> @@ -102,8 +113,8 @@ info(StreamID, {read_body, Ref, Length, Period}, State) -> {[{flow, Length}], State#state{read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length}}; %% Request body reading timeout; send what we got. info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref, - read_body_is_fin=IsFin, read_body_buffer=Buffer}) -> - Pid ! {request_body, Ref, IsFin, Buffer}, + read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) -> + send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}; info(_StreamID, {read_body_timeout, _}, State) -> {[], State}; @@ -132,6 +143,13 @@ terminate(_StreamID, _Reason, _State) -> early_error(StreamID, Reason, PartialReq, Resp, Opts) -> cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts). +send_request_body(Pid, Ref, nofin, _, Data) -> + Pid ! {request_body, Ref, nofin, Data}, + ok; +send_request_body(Pid, Ref, fin, BodyLen, Data) -> + Pid ! {request_body, Ref, fin, BodyLen, Data}, + ok. + %% We use ~999999p here instead of ~w because the latter doesn't %% support printable strings. report_crash(_, _, _, normal, _) -> |