From bbfc1569ccffab060c4c2b402a45119fb1f57495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 4 Apr 2018 17:23:37 +0200 Subject: Add initial implementation of Websocket over HTTP/2 Using the current draft: https://tools.ietf.org/html/draft-ietf-httpbis-h2-websockets-01 --- src/cowboy_stream_h.erl | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) (limited to 'src/cowboy_stream_h.erl') diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index 93b8417..620975c 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -33,7 +33,7 @@ expect = undefined :: undefined | continue, read_body_ref = undefined :: reference() | undefined, read_body_timer_ref = undefined :: reference() | undefined, - read_body_length = 0 :: non_neg_integer() | infinity, + read_body_length = 0 :: non_neg_integer() | infinity | auto, read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()}, read_body_buffer = <<>> :: binary(), body_length = 0 :: non_neg_integer() @@ -65,8 +65,9 @@ expect(Req) -> end. %% If we receive data and stream is waiting for data: -%% If we accumulated enough data or IsFin=fin, send it. -%% If not, buffer it. +%% If we accumulated enough data or IsFin=fin, send it. +%% If we are in auto mode, send it and update flow control. +%% If not, buffer it. %% If not, buffer it. %% %% We always reset the expect field when we receive data, @@ -75,6 +76,7 @@ expect(Req) -> -spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State) -> {cowboy_stream:commands(), State} when State::#state{}. +%% Stream isn't waiting for data. data(_StreamID, IsFin, Data, State=#state{ read_body_ref=undefined, read_body_buffer=Buffer, body_length=BodyLen}) -> {[], State#state{ @@ -82,6 +84,16 @@ data(_StreamID, IsFin, Data, State=#state{ read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>, body_length=BodyLen + byte_size(Data)}}; +%% Stream is waiting for data using auto mode. +%% +%% There is no buffering done in auto mode. +data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, + read_body_length=auto, body_length=BodyLen}) -> + send_request_body(Pid, Ref, IsFin, BodyLen, Data), + {[{flow, byte_size(Data)}], State#state{ + read_body_ref=undefined, + body_length=BodyLen}}; +%% Stream is waiting for data but we didn't receive enough to send yet. data(_StreamID, nofin, Data, State=#state{ read_body_length=ReadLen, read_body_buffer=Buffer, body_length=BodyLen}) when byte_size(Data) + byte_size(Buffer) < ReadLen -> @@ -89,9 +101,11 @@ data(_StreamID, nofin, Data, State=#state{ expect=undefined, read_body_buffer= << Buffer/binary, Data/binary >>, body_length=BodyLen + byte_size(Data)}}; +%% Stream is waiting for data and we received enough to send. data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_timer_ref=TRef, read_body_buffer=Buffer, body_length=BodyLen0}) -> BodyLen = BodyLen0 + byte_size(Data), + %% @todo Handle the infinity case where no TRef was defined. ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]), send_request_body(Pid, Ref, IsFin, BodyLen, <>), {[], State#state{ @@ -121,6 +135,16 @@ info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}, {internal_error, Exit, 'Stream process crashed.'} ], State}; +%% Request body, auto mode, no body buffered. +info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{read_body_buffer= <<>>}) -> + {[], State#state{ + read_body_ref=Ref, + read_body_length=auto}}; +%% Request body, auto mode, body buffered or complete. +info(_StreamID, {read_body, Ref, auto, infinity}, State=#state{pid=Pid, + read_body_is_fin=IsFin, read_body_buffer=Buffer, body_length=BodyLen}) -> + send_request_body(Pid, Ref, IsFin, BodyLen, Buffer), + {[{flow, byte_size(Buffer)}], State#state{read_body_buffer= <<>>}}; %% Request body, body buffered large enough or complete. %% %% We do not send a 100 continue response if the client @@ -136,6 +160,7 @@ info(StreamID, {read_body, Ref, Length, Period}, State=#state{expect=Expect}) -> continue -> [{inform, 100, #{}}, {flow, Length}]; undefined -> [{flow, Length}] end, + %% @todo Handle the case where Period =:= infinity. TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}), {Commands, State#state{ read_body_ref=Ref, -- cgit v1.2.3