aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_http3.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_http3.erl')
-rw-r--r--src/cowboy_http3.erl54
1 files changed, 53 insertions, 1 deletions
diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl
index 9aa6be5..740b9e3 100644
--- a/src/cowboy_http3.erl
+++ b/src/cowboy_http3.erl
@@ -67,6 +67,7 @@
%% Whether the stream is currently in a special state.
status :: header | {unidi, control | encoder | decoder}
| normal | {data | ignore, non_neg_integer()} | stopping
+ | {relaying, normal | {data, non_neg_integer()}, pid()}
| {webtransport_session, normal | {ignore, non_neg_integer()}}
| {webtransport_stream, cow_http3:stream_id()},
@@ -164,6 +165,8 @@ loop(State0=#state{opts=Opts, children=Children}) ->
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State0, StreamID, Msg));
+ {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() ->
+ loop(relay_command(State0, StreamID, RelayCommand));
%% WebTransport commands.
{'$webtransport_commands', SessionID, Commands} ->
loop(webtransport_commands(State0, SessionID, Commands));
@@ -299,6 +302,22 @@ parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin),
StreamID, Rest, IsFin)
end;
+%% This clause mirrors the {data, Len} clause.
+parse1(State, Stream=#stream{status={relaying, {data, Len}, RelayPid}, id=StreamID},
+ Data, IsFin) ->
+ DataLen = byte_size(Data),
+ if
+ DataLen < Len ->
+ %% We don't have the full frame but this is the end of the
+ %% data we have. So FrameIsFin is equivalent to IsFin here.
+ loop(frame(State, Stream#stream{status={relaying, {data, Len - DataLen}, RelayPid}},
+ {data, Data}, IsFin));
+ true ->
+ <<Data1:Len/binary, Rest/bits>> = Data,
+ FrameIsFin = is_fin(IsFin, Rest),
+ parse(frame(State, Stream#stream{status={relaying, normal, RelayPid}},
+ {data, Data1}, FrameIsFin), StreamID, Rest, IsFin)
+ end;
parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) ->
DataLen = byte_size(Data),
if
@@ -311,7 +330,7 @@ parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) ->
end;
%% @todo Clause that discards receiving data for stopping streams.
%% We may receive a few more frames after we abort receiving.
-parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
+parse1(State=#state{opts=Opts}, Stream=#stream{status=Status0, id=StreamID}, Data, IsFin) ->
case cow_http3:parse(Data) of
{ok, Frame, Rest} ->
FrameIsFin = is_fin(IsFin, Rest),
@@ -322,6 +341,10 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
{more, Frame = {data, _}, Len} ->
%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
case IsFin of
+ nofin when element(1, Status0) =:= relaying ->
+ %% The stream will be stored at the end of processing commands.
+ Status = setelement(2, Status0, {data, Len}),
+ loop(frame(State, Stream#stream{status=Status}, Frame, nofin));
nofin ->
%% The stream will be stored at the end of processing commands.
loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin));
@@ -432,6 +455,9 @@ frame(State=#state{http3_machine=HTTP3Machine0},
terminate(State#state{http3_machine=HTTP3Machine}, Error)
end.
+data_frame(State, Stream=#stream{status={relaying, _, RelayPid}, id=StreamID}, IsFin, Data) ->
+ RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data},
+ stream_store(State, Stream);
data_frame(State=#state{opts=Opts},
Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
@@ -767,6 +793,18 @@ commands(State0, Stream0=#stream{id=StreamID},
},
%% @todo We must propagate the buffer to capsule handling if any.
commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
+%% There are two data_delivery: stream_handlers and relay.
+%% The former just has the data go through stream handlers
+%% like normal requests. The latter relays data directly.
+commands(State0, Stream0=#stream{id=StreamID},
+ [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) ->
+ State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
+ Stream1 = #stream{status=normal} = stream_get(State, StreamID),
+ #{data_delivery_pid := RelayPid} = ModState,
+ %% We do not set data_delivery_flow because it is managed by quicer
+ %% and we do not have an easy way to modify it.
+ Stream = Stream1#stream{status={relaying, normal, RelayPid}},
+ commands(State, Stream, Tail);
commands(State0, Stream0=#stream{id=StreamID},
[{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
@@ -872,6 +910,20 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID},
cowboy_quicer:send(Conn, EncoderID, EncData)),
State.
+%% Relay data delivery commands.
+
+relay_command(State, StreamID, DataCmd = {data, _, _}) ->
+ Stream = stream_get(State, StreamID),
+ commands(State, Stream, [DataCmd]);
+relay_command(State=#state{conn=Conn}, StreamID, active) ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:setopt(Conn, StreamID, active, true)),
+ State;
+relay_command(State=#state{conn=Conn}, StreamID, passive) ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:setopt(Conn, StreamID, active, false)),
+ State.
+
%% We mark the stream as being a WebTransport stream
%% and then continue parsing the data as a WebTransport
%% stream. This function is common for incoming unidi