diff options
Diffstat (limited to 'src/cowboy_stream_h.erl')
-rw-r--r-- | src/cowboy_stream_h.erl | 32 |
1 files changed, 23 insertions, 9 deletions
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index b834c17..54dcc2d 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -29,7 +29,8 @@ ref = undefined :: ranch:ref(), pid = undefined :: pid(), read_body_ref = undefined :: reference(), - read_body_length = 0 :: non_neg_integer(), + read_body_timer_ref = undefined :: reference(), + read_body_length = 0 :: non_neg_integer() | infinity, read_body_is_fin = nofin :: nofin | fin, read_body_buffer = <<>> :: binary() }). @@ -58,9 +59,11 @@ data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buf {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}}; data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length -> {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}}; -data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_buffer=Buffer}) -> +data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, + read_body_timer_ref=TRef, read_body_buffer=Buffer}) -> + ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]), Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>}, - {[], State#state{read_body_ref=undefined, read_body_buffer= <<>>}}. + {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}. %% @todo proper specs -spec info(_,_,_) -> _. @@ -90,17 +93,26 @@ info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, {internal_error, Exit, 'Stream process crashed.'} ], State}; %% Request body, no body buffer but IsFin=fin. -info(_StreamID, {read_body, Ref, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) -> - Pid ! {request_body, Ref, fin, <<>>}, - {[], State}; +%info(_StreamID, {read_body, Ref, _, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) -> +% Pid ! {request_body, Ref, fin, <<>>}, +% {[], State}; %% Request body, body buffered large enough or complete. -info(_StreamID, {read_body, Ref, Length}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data}) +info(_StreamID, {read_body, Ref, Length, _}, + State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data}) when element(1, IsFin) =:= fin; byte_size(Data) >= Length -> Pid ! {request_body, Ref, IsFin, Data}, {[], State#state{read_body_buffer= <<>>}}; %% Request body, not enough to send yet. -info(_StreamID, {read_body, Ref, Length}, State) -> - {[{flow, Length}], State#state{read_body_ref=Ref, read_body_length=Length}}; +info(StreamID, {read_body, Ref, Length, Period}, State) -> + TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}), + {[{flow, Length}], State#state{read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length}}; +%% Request body reading timeout; send what we got. +info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref, + read_body_is_fin=IsFin, read_body_buffer=Buffer}) -> + Pid ! {request_body, Ref, IsFin, Buffer}, + {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}; +info(_StreamID, {read_body_timeout, _}, State) -> + {[], State}; %% Response. info(_StreamID, Response = {response, _, _, _}, State) -> {[Response], State}; @@ -108,6 +120,8 @@ info(_StreamID, Headers = {headers, _, _}, State) -> {[Headers], State}; info(_StreamID, Data = {data, _, _}, State) -> {[Data], State}; +info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) -> + {[Push], State}; info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) -> {[SwitchProtocol], State}; %% Stray message. |