aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_http3.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_http3.erl')
-rw-r--r--src/cowboy_http3.erl304
1 files changed, 292 insertions, 12 deletions
diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl
index da1312e..9aa6be5 100644
--- a/src/cowboy_http3.erl
+++ b/src/cowboy_http3.erl
@@ -32,10 +32,10 @@
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
logger => module(),
- max_decode_blocked_streams => 0..16#3fffffffffffffff,
- max_decode_table_size => 0..16#3fffffffffffffff,
- max_encode_blocked_streams => 0..16#3fffffffffffffff,
- max_encode_table_size => 0..16#3fffffffffffffff,
+ max_decode_blocked_streams => 0..16#3fffffffffffffff,
+ max_decode_table_size => 0..16#3fffffffffffffff,
+ max_encode_blocked_streams => 0..16#3fffffffffffffff,
+ max_encode_table_size => 0..16#3fffffffffffffff,
max_ignored_frame_size_received => non_neg_integer() | infinity,
metrics_callback => cowboy_metrics_h:metrics_callback(),
metrics_req_filter => fun((cowboy_req:req()) -> map()),
@@ -51,18 +51,30 @@
}.
-export_type([opts/0]).
+%% HTTP/3 or WebTransport stream.
+%%
+%% WebTransport sessions involve one bidirectional CONNECT stream
+%% that must stay open (and can be used for signaling using the
+%% Capsule Protocol) and an application-defined number of
+%% unidirectional and bidirectional streams, as well as datagrams.
+%%
+%% WebTransport sessions run in the CONNECT request process and
+%% all events related to the session is sent there as a message.
+%% The pid of the process is kept in the state.
-record(stream, {
id :: cow_http3:stream_id(),
%% Whether the stream is currently in a special state.
status :: header | {unidi, control | encoder | decoder}
- | normal | {data | ignore, non_neg_integer()} | stopping,
+ | normal | {data | ignore, non_neg_integer()} | stopping
+ | {webtransport_session, normal | {ignore, non_neg_integer()}}
+ | {webtransport_stream, cow_http3:stream_id()},
%% Stream buffer.
buffer = <<>> :: binary(),
%% Stream state.
- state = undefined :: undefined | {module, any()}
+ state = undefined :: undefined | {module(), any()}
}).
-record(state, {
@@ -152,6 +164,9 @@ loop(State0=#state{opts=Opts, children=Children}) ->
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State0, StreamID, Msg));
+ %% WebTransport commands.
+ {'$webtransport_commands', SessionID, Commands} ->
+ loop(webtransport_commands(State0, SessionID, Commands));
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
loop(down(State0, Pid, Msg));
@@ -164,12 +179,17 @@ handle_quic_msg(State0=#state{opts=Opts}, Msg) ->
case cowboy_quicer:handle(Msg) of
{data, StreamID, IsFin, Data} ->
parse(State0, StreamID, Data, IsFin);
+ {datagram, Data} ->
+ parse_datagram(State0, Data);
{stream_started, StreamID, StreamType} ->
State = stream_new_remote(State0, StreamID, StreamType),
loop(State);
{stream_closed, StreamID, ErrorCode} ->
State = stream_closed(State0, StreamID, ErrorCode),
loop(State);
+ {peer_send_shutdown, StreamID} ->
+ State = stream_peer_send_shutdown(State0, StreamID),
+ loop(State);
closed ->
%% @todo Different error reason if graceful?
Reason = {socket_error, closed, 'The socket has been closed.'},
@@ -216,6 +236,56 @@ parse1(State=#state{http3_machine=HTTP3Machine0},
{error, Error={connection_error, _, _}, HTTP3Machine} ->
terminate(State#state{http3_machine=HTTP3Machine}, Error)
end;
+%% @todo Handle when IsFin = fin which must terminate the WT session.
+parse1(State=#state{conn=Conn}, Stream=#stream{id=SessionID, status=
+ {webtransport_session, normal}}, Data, IsFin) ->
+ case cow_capsule:parse(Data) of
+ {ok, wt_drain_session, Rest} ->
+ webtransport_event(State, SessionID, close_initiated),
+ parse1(State, Stream, Rest, IsFin);
+ {ok, {wt_close_session, AppCode, AppMsg}, Rest} ->
+ %% This event will be handled specially and lead
+ %% to the termination of the session process.
+ webtransport_event(State, SessionID, {closed, AppCode, AppMsg}),
+ %% Shutdown the CONNECT stream immediately.
+ cowboy_quicer:shutdown_stream(Conn, SessionID),
+ %% @todo Will we receive a {stream_closed,...} after that?
+ %% If any data is received past that point this is an error.
+ %% @todo Don't crash, error out properly.
+ <<>> = Rest,
+ loop(webtransport_terminate_session(State, Stream));
+ more ->
+ loop(stream_store(State, Stream#stream{buffer=Data}));
+ %% Ignore unhandled/unknown capsules.
+ %% @todo Do this when cow_capsule includes some.
+% {ok, _, Rest} ->
+% parse1(State, Stream, Rest, IsFin);
+% {ok, Rest} ->
+% parse1(State, Stream, Rest, IsFin);
+ %% @todo Make the max length configurable?
+ {skip, Len} when Len =< 8192 ->
+ loop(stream_store(State, Stream#stream{
+ status={webtransport_session, {ignore, Len}}}));
+ {skip, Len} ->
+ %% @todo What should be done on capsule error?
+ error({todo, capsule_too_long, Len});
+ error ->
+ %% @todo What should be done on capsule error?
+ error({todo, capsule_error, Data})
+ end;
+parse1(State, Stream=#stream{status=
+ {webtransport_session, {ignore, Len}}}, Data, IsFin) ->
+ case Data of
+ <<_:Len/unit:8, Rest/bits>> ->
+ parse1(State, Stream#stream{status={webtransport_session, normal}}, Rest, IsFin);
+ _ ->
+ loop(stream_store(State, Stream#stream{
+ status={webtransport_session, {ignore, Len - byte_size(Data)}}}))
+ end;
+parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID}}, Data, IsFin) ->
+ webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}),
+ %% No need to store the stream again, WT streams don't get changed here.
+ loop(State);
parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
DataLen = byte_size(Data),
if
@@ -246,6 +316,9 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
{ok, Frame, Rest} ->
FrameIsFin = is_fin(IsFin, Rest),
parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin);
+ %% The WebTransport stream header is not a real frame.
+ {webtransport_stream_header, SessionID, Rest} ->
+ become_webtransport_stream(State, Stream, bidi, SessionID, Rest, IsFin);
{more, Frame = {data, _}, Len} ->
%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
case IsFin of
@@ -317,13 +390,24 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
{error, Error={connection_error, _, _}, HTTP3Machine} ->
terminate(State0#state{http3_machine=HTTP3Machine}, Error)
end;
+ %% @todo Perhaps do this in cow_http3_machine directly.
{ok, push, _} ->
terminate(State0, {connection_error, h3_stream_creation_error,
'Only servers can push. (RFC9114 6.2.2)'});
+ {ok, {webtransport, SessionID}, Rest} ->
+ become_webtransport_stream(State0, Stream0, unidi, SessionID, Rest, IsFin);
%% Unknown stream types must be ignored. We choose to abort the
%% stream instead of reading and discarding the incoming data.
{undefined, _} ->
- loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error))
+ loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error));
+ %% Very unlikely to happen but WebTransport headers may be fragmented
+ %% as they are more than one byte. The fin flag in this case is an error,
+ %% but because it happens in WebTransport application data (the Session ID)
+ %% we only reset the impacted stream and not the entire connection.
+ more when IsFin =:= fin ->
+ loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error));
+ more ->
+ loop(stream_store(State0, Stream0#stream{buffer=Data}))
end.
frame(State=#state{http3_machine=HTTP3Machine0},
@@ -449,6 +533,8 @@ headers_to_map([{Name, Value}|Tail], Acc0) ->
end,
headers_to_map(Tail, Acc).
+%% @todo WebTransport CONNECT requests must have extra checks on settings.
+%% @todo We may also need to defer them if we didn't get settings.
headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
@@ -488,6 +574,18 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
send_headers(State0, Stream, fin, StatusCode0, RespHeaders0)
end.
+%% Datagrams.
+
+parse_datagram(State, Data0) ->
+ {SessionID, Data} = cow_http3:parse_datagram(Data0),
+ case stream_get(State, SessionID) of
+ #stream{status={webtransport_session, _}} ->
+ webtransport_event(State, SessionID, {datagram, Data}),
+ loop(State);
+ _ ->
+ error(todo) %% @todo Might be a future WT session or an error.
+ end.
+
%% Erlang messages.
down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
@@ -654,6 +752,22 @@ commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) ->
%% Use a different protocol within the stream (CONNECT :protocol).
%% @todo Make sure we error out when the feature is disabled.
commands(State0, Stream0=#stream{id=StreamID},
+ [{switch_protocol, Headers, cowboy_webtransport, WTState=#{}}|Tail]) ->
+ State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
+ #state{http3_machine=HTTP3Machine0} = State,
+ Stream1 = #stream{state=StreamState} = stream_get(State, StreamID),
+ %% The stream becomes a WT session at that point. It is the
+ %% parent stream of all streams in this WT session. The
+ %% cowboy_stream state is kept because it will be needed
+ %% to terminate the stream properly.
+ HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0),
+ Stream = Stream1#stream{
+ status={webtransport_session, normal},
+ state={cowboy_webtransport, WTState#{stream_state => StreamState}}
+ },
+ %% @todo We must propagate the buffer to capsule handling if any.
+ commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
+commands(State0, Stream0=#stream{id=StreamID},
[{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
Stream = stream_get(State, StreamID),
@@ -758,6 +872,157 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID},
cowboy_quicer:send(Conn, EncoderID, EncData)),
State.
+%% We mark the stream as being a WebTransport stream
+%% and then continue parsing the data as a WebTransport
+%% stream. This function is common for incoming unidi
+%% and bidi streams.
+become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0},
+ Stream0=#stream{id=StreamID}, StreamType, SessionID, Rest, IsFin) ->
+ case cow_http3_machine:become_webtransport_stream(StreamID, SessionID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State = State0#state{http3_machine=HTTP3Machine},
+ Stream = Stream0#stream{status={webtransport_stream, SessionID}},
+ webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}),
+ %% We don't need to parse the remaining data if there isn't any.
+ case {Rest, IsFin} of
+ {<<>>, nofin} -> loop(stream_store(State, Stream));
+ _ -> parse(stream_store(State, Stream), StreamID, Rest, IsFin)
+ end
+ %% @todo Error conditions.
+ end.
+
+webtransport_event(State, SessionID, Event) ->
+ #stream{
+ status={webtransport_session, _},
+ state={cowboy_webtransport, #{session_pid := SessionPid}}
+ } = stream_get(State, SessionID),
+ SessionPid ! {'$webtransport_event', SessionID, Event},
+ ok.
+
+webtransport_commands(State, SessionID, Commands) ->
+ case stream_get(State, SessionID) of
+ Session = #stream{status={webtransport_session, _}} ->
+ wt_commands(State, Session, Commands);
+ %% The stream has been terminated, ignore pending commands.
+ error ->
+ State
+ end.
+
+wt_commands(State, _, []) ->
+ State;
+wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID},
+ [{open_stream, OpenStreamRef, StreamType, InitialData}|Tail]) ->
+ %% Because opening the stream involves sending a short header
+ %% we necessarily write data. The InitialData variable allows
+ %% providing additional data to be sent in the same packet.
+ StartF = case StreamType of
+ bidi -> start_bidi_stream;
+ unidi -> start_unidi_stream
+ end,
+ Header = cow_http3:webtransport_stream_header(SessionID, StreamType),
+ case cowboy_quicer:StartF(Conn, [Header, InitialData]) of
+ {ok, StreamID} ->
+ %% @todo Pass Session directly?
+ webtransport_event(State0, SessionID,
+ {opened_stream_id, OpenStreamRef, StreamID}),
+ State = stream_new_local(State0, StreamID, StreamType,
+ {webtransport_stream, SessionID}),
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) ->
+ %% @todo Check that StreamID belongs to Session.
+ error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]});
+wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID},
+ [{send, datagram, Data}|Tail]) ->
+ case cowboy_quicer:send_datagram(Conn, cow_http3:datagram(SessionID, Data)) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) ->
+ %% @todo Check that StreamID belongs to Session.
+ case cowboy_quicer:send(Conn, StreamID, Data, nofin) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, IsFin, Data}|Tail]) ->
+ %% @todo Check that StreamID belongs to Session.
+ case cowboy_quicer:send(Conn, StreamID, Data, IsFin) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) ->
+ %% We must send a WT_DRAIN_SESSION capsule on the CONNECT stream.
+ Capsule = cow_capsule:wt_drain_session(),
+ case cowboy_quicer:send(Conn, SessionID, Capsule, nofin) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
+ when Cmd =:= close; element(1, Cmd) =:= close ->
+ %% We must send a WT_CLOSE_SESSION capsule on the CONNECT stream.
+ {AppCode, AppMsg} = case Cmd of
+ close -> {0, <<>>};
+ {close, AppCode0} -> {AppCode0, <<>>};
+ {close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0}
+ end,
+ Capsule = cow_capsule:wt_close_session(AppCode, AppMsg),
+ case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of
+ ok ->
+ State = webtransport_terminate_session(State0, Session),
+ %% @todo Because the handler is in a separate process
+ %% we must wait for it to stop and eventually
+ %% kill the process if it takes too long.
+ %% @todo We may need to fully close the CONNECT stream (if remote doesn't reset it).
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end.
+
+webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machine0,
+ streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) ->
+ %% Reset/abort the WT streams.
+ Streams = maps:filtermap(fun
+ (_, #stream{id=StreamID, status={webtransport_session, _}})
+ when StreamID =:= SessionID ->
+ %% We remove the session stream but do the shutdown outside this function.
+ false;
+ (StreamID, #stream{status={webtransport_stream, StreamSessionID}})
+ when StreamSessionID =:= SessionID ->
+ cowboy_quicer:shutdown_stream(Conn, StreamID,
+ both, cow_http3:error_to_code(wt_session_gone)),
+ false;
+ (_, _) ->
+ true
+ end, Streams0),
+ %% Keep the streams in lingering state.
+ %% We only keep up to 100 streams in this state. @todo Make it configurable?
+ Terminated = maps:keys(Streams0) -- maps:keys(Streams),
+ Lingering = lists:sublist(Terminated ++ Lingering0, 100),
+ %% Update the HTTP3 state machine.
+ HTTP3Machine = cow_http3_machine:close_webtransport_session(SessionID, HTTP3Machine0),
+ State#state{
+ http3_machine=HTTP3Machine,
+ streams=Streams,
+ lingering_streams=Lingering
+ }.
+
+stream_peer_send_shutdown(State=#state{conn=Conn}, StreamID) ->
+ case stream_get(State, StreamID) of
+ %% Cleanly terminating the CONNECT stream is equivalent
+ %% to an application error code of 0 and empty message.
+ Stream = #stream{status={webtransport_session, _}} ->
+ webtransport_event(State, StreamID, {closed, 0, <<>>}),
+ %% Shutdown the CONNECT stream fully.
+ cowboy_quicer:shutdown_stream(Conn, StreamID),
+ webtransport_terminate_session(State, Stream);
+ _ ->
+ State
+ end.
+
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
Stream=#stream{id=StreamID}, Error) ->
Reason = case Error of
@@ -903,15 +1168,25 @@ terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reas
stream_get(#state{streams=Streams}, StreamID) ->
maps:get(StreamID, Streams, error).
-stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
- StreamID, StreamType) ->
+stream_new_local(State, StreamID, StreamType, Status) ->
+ stream_new(State, StreamID, StreamType, unidi_local, Status).
+
+stream_new_remote(State, StreamID, StreamType) ->
+ Status = case StreamType of
+ unidi -> header;
+ bidi -> normal
+ end,
+ stream_new(State, StreamID, StreamType, unidi_remote, Status).
+
+stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
+ StreamID, StreamType, UnidiType, Status) ->
{HTTP3Machine, Status} = case StreamType of
unidi ->
- {cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0),
- header};
+ {cow_http3_machine:init_unidi_stream(StreamID, UnidiType, HTTP3Machine0),
+ Status};
bidi ->
{cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0),
- normal}
+ Status}
end,
Stream = #stream{id=StreamID, status=Status},
State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
@@ -926,6 +1201,11 @@ stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) ->
stream_closed(State=#state{opts=Opts,
streams=Streams0, children=Children0}, StreamID, ErrorCode) ->
case maps:take(StreamID, Streams0) of
+ %% In the WT session's case, streams will be
+ %% removed in webtransport_terminate_session.
+ {Stream=#stream{status={webtransport_session, _}}, _} ->
+ webtransport_event(State, StreamID, closed_abruptly),
+ webtransport_terminate_session(State, Stream);
{#stream{state=undefined}, Streams} ->
%% Unidi stream has no handler/children.
stream_closed1(State#state{streams=Streams}, StreamID);