diff options
Diffstat (limited to 'src/cowboy_http3.erl')
-rw-r--r-- | src/cowboy_http3.erl | 54 |
1 files changed, 53 insertions, 1 deletions
diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index 9aa6be5..740b9e3 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -67,6 +67,7 @@ %% Whether the stream is currently in a special state. status :: header | {unidi, control | encoder | decoder} | normal | {data | ignore, non_neg_integer()} | stopping + | {relaying, normal | {data, non_neg_integer()}, pid()} | {webtransport_session, normal | {ignore, non_neg_integer()}} | {webtransport_stream, cow_http3:stream_id()}, @@ -164,6 +165,8 @@ loop(State0=#state{opts=Opts, children=Children}) -> %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> loop(info(State0, StreamID, Msg)); + {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() -> + loop(relay_command(State0, StreamID, RelayCommand)); %% WebTransport commands. {'$webtransport_commands', SessionID, Commands} -> loop(webtransport_commands(State0, SessionID, Commands)); @@ -299,6 +302,22 @@ parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) -> parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin), StreamID, Rest, IsFin) end; +%% This clause mirrors the {data, Len} clause. +parse1(State, Stream=#stream{status={relaying, {data, Len}, RelayPid}, id=StreamID}, + Data, IsFin) -> + DataLen = byte_size(Data), + if + DataLen < Len -> + %% We don't have the full frame but this is the end of the + %% data we have. So FrameIsFin is equivalent to IsFin here. + loop(frame(State, Stream#stream{status={relaying, {data, Len - DataLen}, RelayPid}}, + {data, Data}, IsFin)); + true -> + <<Data1:Len/binary, Rest/bits>> = Data, + FrameIsFin = is_fin(IsFin, Rest), + parse(frame(State, Stream#stream{status={relaying, normal, RelayPid}}, + {data, Data1}, FrameIsFin), StreamID, Rest, IsFin) + end; parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) -> DataLen = byte_size(Data), if @@ -311,7 +330,7 @@ parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) -> end; %% @todo Clause that discards receiving data for stopping streams. %% We may receive a few more frames after we abort receiving. -parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) -> +parse1(State=#state{opts=Opts}, Stream=#stream{status=Status0, id=StreamID}, Data, IsFin) -> case cow_http3:parse(Data) of {ok, Frame, Rest} -> FrameIsFin = is_fin(IsFin, Rest), @@ -322,6 +341,10 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) -> {more, Frame = {data, _}, Len} -> %% We're at the end of the data so FrameIsFin is equivalent to IsFin. case IsFin of + nofin when element(1, Status0) =:= relaying -> + %% The stream will be stored at the end of processing commands. + Status = setelement(2, Status0, {data, Len}), + loop(frame(State, Stream#stream{status=Status}, Frame, nofin)); nofin -> %% The stream will be stored at the end of processing commands. loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin)); @@ -432,6 +455,9 @@ frame(State=#state{http3_machine=HTTP3Machine0}, terminate(State#state{http3_machine=HTTP3Machine}, Error) end. +data_frame(State, Stream=#stream{status={relaying, _, RelayPid}, id=StreamID}, IsFin, Data) -> + RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data}, + stream_store(State, Stream); data_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of @@ -767,6 +793,18 @@ commands(State0, Stream0=#stream{id=StreamID}, }, %% @todo We must propagate the buffer to capsule handling if any. commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail); +%% There are two data_delivery: stream_handlers and relay. +%% The former just has the data go through stream handlers +%% like normal requests. The latter relays data directly. +commands(State0, Stream0=#stream{id=StreamID}, + [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) -> + State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), + Stream1 = #stream{status=normal} = stream_get(State, StreamID), + #{data_delivery_pid := RelayPid} = ModState, + %% We do not set data_delivery_flow because it is managed by quicer + %% and we do not have an easy way to modify it. + Stream = Stream1#stream{status={relaying, normal, RelayPid}}, + commands(State, Stream, Tail); commands(State0, Stream0=#stream{id=StreamID}, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), @@ -872,6 +910,20 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID}, cowboy_quicer:send(Conn, EncoderID, EncData)), State. +%% Relay data delivery commands. + +relay_command(State, StreamID, DataCmd = {data, _, _}) -> + Stream = stream_get(State, StreamID), + commands(State, Stream, [DataCmd]); +relay_command(State=#state{conn=Conn}, StreamID, active) -> + ok = maybe_socket_error(State, + cowboy_quicer:setopt(Conn, StreamID, active, true)), + State; +relay_command(State=#state{conn=Conn}, StreamID, passive) -> + ok = maybe_socket_error(State, + cowboy_quicer:setopt(Conn, StreamID, active, false)), + State. + %% We mark the stream as being a WebTransport stream %% and then continue parsing the data as a WebTransport %% stream. This function is common for incoming unidi |