diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy.erl | 10 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 21 | ||||
-rw-r--r-- | src/cowboy_http3.erl | 304 | ||||
-rw-r--r-- | src/cowboy_quicer.erl | 62 | ||||
-rw-r--r-- | src/cowboy_req.erl | 1 | ||||
-rw-r--r-- | src/cowboy_rest.erl | 1 | ||||
-rw-r--r-- | src/cowboy_stream.erl | 1 | ||||
-rw-r--r-- | src/cowboy_websocket.erl | 1 | ||||
-rw-r--r-- | src/cowboy_webtransport.erl | 292 |
9 files changed, 669 insertions, 24 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl index d46691f..6a5634e 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -95,8 +95,14 @@ start_quic(Ref, TransOpts, ProtoOpts) -> end, SocketOpts = [ {alpn, ["h3"]}, %% @todo Why not binary? - {peer_unidi_stream_count, 3}, %% We only need control and QPACK enc/dec. - {peer_bidi_stream_count, 100} + %% We only need 3 for control and QPACK enc/dec, + %% but we need more for WebTransport. %% @todo Use 3 if WT is disabled. + {peer_unidi_stream_count, 100}, + {peer_bidi_stream_count, 100}, + %% For WebTransport. + %% @todo We probably don't want it enabled if WT isn't used. + {datagram_send_enabled, 1}, + {datagram_receive_enabled, 1} |SocketOpts2], _ListenerPid = spawn(fun() -> {ok, Listener} = quicer:listen(Port, SocketOpts), diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index f4cfa39..0d22fa1 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -154,7 +154,8 @@ }). -spec init(pid(), ranch:ref(), inet:socket(), module(), - ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok. + ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> {ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket), 'A socket error occurred when retrieving the peer name.'), @@ -178,7 +179,8 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, - binary() | undefined, binary()) -> ok. + binary() | undefined, binary()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) -> DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts), @@ -227,7 +229,8 @@ add_period(Time, Period) -> Time + Period. -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, - binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok. + binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer, _Settings, Req=#{method := Method}) -> DynamicBuffer = init_dynamic_buffer_size(Opts), @@ -276,7 +279,7 @@ before_loop(State=#state{opts=#{hibernate := true}}, Buffer) -> before_loop(State, Buffer) -> loop(State, Buffer). --spec loop(#state{}, binary()) -> ok. +-spec loop(#state{}, binary()) -> no_return(). loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, timer=TimerRef, children=Children}, Buffer) -> @@ -1134,7 +1137,9 @@ goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) -> %% in-flight stream creation (at least one round-trip time), the server can send %% another GOAWAY frame with an updated last stream identifier. This ensures %% that a connection can be cleanly shut down without losing requests. + -spec initiate_closing(#state{}, _) -> #state{}. + initiate_closing(State=#state{http2_status=connected, socket=Socket, transport=Transport, opts=Opts}, Reason) -> ok = maybe_socket_error(State, Transport:send(Socket, @@ -1151,7 +1156,9 @@ initiate_closing(State, Reason) -> terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}). %% Switch to 'closing' state and stop accepting new streams. + -spec closing(#state{}, Reason :: term()) -> #state{}. + closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} -> terminate(State, Reason); closing(State0=#state{http2_status=closing_initiated, @@ -1188,6 +1195,7 @@ maybe_socket_error(State, {error, Reason}, Human) -> terminate(State, {socket_error, Reason, Human}). -spec terminate(#state{} | undefined, _) -> no_return(). + terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, @@ -1388,15 +1396,18 @@ terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) -> %% System callbacks. --spec system_continue(_, _, {#state{}, binary()}) -> ok. +-spec system_continue(_, _, {#state{}, binary()}) -> no_return(). + system_continue(_, _, {State, Buffer}) -> before_loop(State, Buffer). -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). + system_terminate(Reason0, _, _, {State, Buffer}) -> Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}, before_loop(initiate_closing(State, Reason), Buffer). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. + system_code_change(Misc, _, _, _) -> {ok, Misc}. diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index da1312e..9aa6be5 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -32,10 +32,10 @@ enable_connect_protocol => boolean(), env => cowboy_middleware:env(), logger => module(), - max_decode_blocked_streams => 0..16#3fffffffffffffff, - max_decode_table_size => 0..16#3fffffffffffffff, - max_encode_blocked_streams => 0..16#3fffffffffffffff, - max_encode_table_size => 0..16#3fffffffffffffff, + max_decode_blocked_streams => 0..16#3fffffffffffffff, + max_decode_table_size => 0..16#3fffffffffffffff, + max_encode_blocked_streams => 0..16#3fffffffffffffff, + max_encode_table_size => 0..16#3fffffffffffffff, max_ignored_frame_size_received => non_neg_integer() | infinity, metrics_callback => cowboy_metrics_h:metrics_callback(), metrics_req_filter => fun((cowboy_req:req()) -> map()), @@ -51,18 +51,30 @@ }. -export_type([opts/0]). +%% HTTP/3 or WebTransport stream. +%% +%% WebTransport sessions involve one bidirectional CONNECT stream +%% that must stay open (and can be used for signaling using the +%% Capsule Protocol) and an application-defined number of +%% unidirectional and bidirectional streams, as well as datagrams. +%% +%% WebTransport sessions run in the CONNECT request process and +%% all events related to the session is sent there as a message. +%% The pid of the process is kept in the state. -record(stream, { id :: cow_http3:stream_id(), %% Whether the stream is currently in a special state. status :: header | {unidi, control | encoder | decoder} - | normal | {data | ignore, non_neg_integer()} | stopping, + | normal | {data | ignore, non_neg_integer()} | stopping + | {webtransport_session, normal | {ignore, non_neg_integer()}} + | {webtransport_stream, cow_http3:stream_id()}, %% Stream buffer. buffer = <<>> :: binary(), %% Stream state. - state = undefined :: undefined | {module, any()} + state = undefined :: undefined | {module(), any()} }). -record(state, { @@ -152,6 +164,9 @@ loop(State0=#state{opts=Opts, children=Children}) -> %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> loop(info(State0, StreamID, Msg)); + %% WebTransport commands. + {'$webtransport_commands', SessionID, Commands} -> + loop(webtransport_commands(State0, SessionID, Commands)); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> loop(down(State0, Pid, Msg)); @@ -164,12 +179,17 @@ handle_quic_msg(State0=#state{opts=Opts}, Msg) -> case cowboy_quicer:handle(Msg) of {data, StreamID, IsFin, Data} -> parse(State0, StreamID, Data, IsFin); + {datagram, Data} -> + parse_datagram(State0, Data); {stream_started, StreamID, StreamType} -> State = stream_new_remote(State0, StreamID, StreamType), loop(State); {stream_closed, StreamID, ErrorCode} -> State = stream_closed(State0, StreamID, ErrorCode), loop(State); + {peer_send_shutdown, StreamID} -> + State = stream_peer_send_shutdown(State0, StreamID), + loop(State); closed -> %% @todo Different error reason if graceful? Reason = {socket_error, closed, 'The socket has been closed.'}, @@ -216,6 +236,56 @@ parse1(State=#state{http3_machine=HTTP3Machine0}, {error, Error={connection_error, _, _}, HTTP3Machine} -> terminate(State#state{http3_machine=HTTP3Machine}, Error) end; +%% @todo Handle when IsFin = fin which must terminate the WT session. +parse1(State=#state{conn=Conn}, Stream=#stream{id=SessionID, status= + {webtransport_session, normal}}, Data, IsFin) -> + case cow_capsule:parse(Data) of + {ok, wt_drain_session, Rest} -> + webtransport_event(State, SessionID, close_initiated), + parse1(State, Stream, Rest, IsFin); + {ok, {wt_close_session, AppCode, AppMsg}, Rest} -> + %% This event will be handled specially and lead + %% to the termination of the session process. + webtransport_event(State, SessionID, {closed, AppCode, AppMsg}), + %% Shutdown the CONNECT stream immediately. + cowboy_quicer:shutdown_stream(Conn, SessionID), + %% @todo Will we receive a {stream_closed,...} after that? + %% If any data is received past that point this is an error. + %% @todo Don't crash, error out properly. + <<>> = Rest, + loop(webtransport_terminate_session(State, Stream)); + more -> + loop(stream_store(State, Stream#stream{buffer=Data})); + %% Ignore unhandled/unknown capsules. + %% @todo Do this when cow_capsule includes some. +% {ok, _, Rest} -> +% parse1(State, Stream, Rest, IsFin); +% {ok, Rest} -> +% parse1(State, Stream, Rest, IsFin); + %% @todo Make the max length configurable? + {skip, Len} when Len =< 8192 -> + loop(stream_store(State, Stream#stream{ + status={webtransport_session, {ignore, Len}}})); + {skip, Len} -> + %% @todo What should be done on capsule error? + error({todo, capsule_too_long, Len}); + error -> + %% @todo What should be done on capsule error? + error({todo, capsule_error, Data}) + end; +parse1(State, Stream=#stream{status= + {webtransport_session, {ignore, Len}}}, Data, IsFin) -> + case Data of + <<_:Len/unit:8, Rest/bits>> -> + parse1(State, Stream#stream{status={webtransport_session, normal}}, Rest, IsFin); + _ -> + loop(stream_store(State, Stream#stream{ + status={webtransport_session, {ignore, Len - byte_size(Data)}}})) + end; +parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID}}, Data, IsFin) -> + webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}), + %% No need to store the stream again, WT streams don't get changed here. + loop(State); parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) -> DataLen = byte_size(Data), if @@ -246,6 +316,9 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) -> {ok, Frame, Rest} -> FrameIsFin = is_fin(IsFin, Rest), parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin); + %% The WebTransport stream header is not a real frame. + {webtransport_stream_header, SessionID, Rest} -> + become_webtransport_stream(State, Stream, bidi, SessionID, Rest, IsFin); {more, Frame = {data, _}, Len} -> %% We're at the end of the data so FrameIsFin is equivalent to IsFin. case IsFin of @@ -317,13 +390,24 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0}, {error, Error={connection_error, _, _}, HTTP3Machine} -> terminate(State0#state{http3_machine=HTTP3Machine}, Error) end; + %% @todo Perhaps do this in cow_http3_machine directly. {ok, push, _} -> terminate(State0, {connection_error, h3_stream_creation_error, 'Only servers can push. (RFC9114 6.2.2)'}); + {ok, {webtransport, SessionID}, Rest} -> + become_webtransport_stream(State0, Stream0, unidi, SessionID, Rest, IsFin); %% Unknown stream types must be ignored. We choose to abort the %% stream instead of reading and discarding the incoming data. {undefined, _} -> - loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)) + loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)); + %% Very unlikely to happen but WebTransport headers may be fragmented + %% as they are more than one byte. The fin flag in this case is an error, + %% but because it happens in WebTransport application data (the Session ID) + %% we only reset the impacted stream and not the entire connection. + more when IsFin =:= fin -> + loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)); + more -> + loop(stream_store(State0, Stream0#stream{buffer=Data})) end. frame(State=#state{http3_machine=HTTP3Machine0}, @@ -449,6 +533,8 @@ headers_to_map([{Name, Value}|Tail], Acc0) -> end, headers_to_map(Tail, Acc). +%% @todo WebTransport CONNECT requests must have extra checks on settings. +%% @todo We may also need to defer them if we didn't get settings. headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> @@ -488,6 +574,18 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer}, send_headers(State0, Stream, fin, StatusCode0, RespHeaders0) end. +%% Datagrams. + +parse_datagram(State, Data0) -> + {SessionID, Data} = cow_http3:parse_datagram(Data0), + case stream_get(State, SessionID) of + #stream{status={webtransport_session, _}} -> + webtransport_event(State, SessionID, {datagram, Data}), + loop(State); + _ -> + error(todo) %% @todo Might be a future WT session or an error. + end. + %% Erlang messages. down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> @@ -654,6 +752,22 @@ commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) -> %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. commands(State0, Stream0=#stream{id=StreamID}, + [{switch_protocol, Headers, cowboy_webtransport, WTState=#{}}|Tail]) -> + State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), + #state{http3_machine=HTTP3Machine0} = State, + Stream1 = #stream{state=StreamState} = stream_get(State, StreamID), + %% The stream becomes a WT session at that point. It is the + %% parent stream of all streams in this WT session. The + %% cowboy_stream state is kept because it will be needed + %% to terminate the stream properly. + HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0), + Stream = Stream1#stream{ + status={webtransport_session, normal}, + state={cowboy_webtransport, WTState#{stream_state => StreamState}} + }, + %% @todo We must propagate the buffer to capsule handling if any. + commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail); +commands(State0, Stream0=#stream{id=StreamID}, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), Stream = stream_get(State, StreamID), @@ -758,6 +872,157 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID}, cowboy_quicer:send(Conn, EncoderID, EncData)), State. +%% We mark the stream as being a WebTransport stream +%% and then continue parsing the data as a WebTransport +%% stream. This function is common for incoming unidi +%% and bidi streams. +become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0}, + Stream0=#stream{id=StreamID}, StreamType, SessionID, Rest, IsFin) -> + case cow_http3_machine:become_webtransport_stream(StreamID, SessionID, HTTP3Machine0) of + {ok, HTTP3Machine} -> + State = State0#state{http3_machine=HTTP3Machine}, + Stream = Stream0#stream{status={webtransport_stream, SessionID}}, + webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}), + %% We don't need to parse the remaining data if there isn't any. + case {Rest, IsFin} of + {<<>>, nofin} -> loop(stream_store(State, Stream)); + _ -> parse(stream_store(State, Stream), StreamID, Rest, IsFin) + end + %% @todo Error conditions. + end. + +webtransport_event(State, SessionID, Event) -> + #stream{ + status={webtransport_session, _}, + state={cowboy_webtransport, #{session_pid := SessionPid}} + } = stream_get(State, SessionID), + SessionPid ! {'$webtransport_event', SessionID, Event}, + ok. + +webtransport_commands(State, SessionID, Commands) -> + case stream_get(State, SessionID) of + Session = #stream{status={webtransport_session, _}} -> + wt_commands(State, Session, Commands); + %% The stream has been terminated, ignore pending commands. + error -> + State + end. + +wt_commands(State, _, []) -> + State; +wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, + [{open_stream, OpenStreamRef, StreamType, InitialData}|Tail]) -> + %% Because opening the stream involves sending a short header + %% we necessarily write data. The InitialData variable allows + %% providing additional data to be sent in the same packet. + StartF = case StreamType of + bidi -> start_bidi_stream; + unidi -> start_unidi_stream + end, + Header = cow_http3:webtransport_stream_header(SessionID, StreamType), + case cowboy_quicer:StartF(Conn, [Header, InitialData]) of + {ok, StreamID} -> + %% @todo Pass Session directly? + webtransport_event(State0, SessionID, + {opened_stream_id, OpenStreamRef, StreamID}), + State = stream_new_local(State0, StreamID, StreamType, + {webtransport_stream, SessionID}), + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]}); +wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, + [{send, datagram, Data}|Tail]) -> + case cowboy_quicer:send_datagram(Conn, cow_http3:datagram(SessionID, Data)) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + case cowboy_quicer:send(Conn, StreamID, Data, nofin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, IsFin, Data}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + case cowboy_quicer:send(Conn, StreamID, Data, IsFin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) -> + %% We must send a WT_DRAIN_SESSION capsule on the CONNECT stream. + Capsule = cow_capsule:wt_drain_session(), + case cowboy_quicer:send(Conn, SessionID, Capsule, nofin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail]) + when Cmd =:= close; element(1, Cmd) =:= close -> + %% We must send a WT_CLOSE_SESSION capsule on the CONNECT stream. + {AppCode, AppMsg} = case Cmd of + close -> {0, <<>>}; + {close, AppCode0} -> {AppCode0, <<>>}; + {close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0} + end, + Capsule = cow_capsule:wt_close_session(AppCode, AppMsg), + case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of + ok -> + State = webtransport_terminate_session(State0, Session), + %% @todo Because the handler is in a separate process + %% we must wait for it to stop and eventually + %% kill the process if it takes too long. + %% @todo We may need to fully close the CONNECT stream (if remote doesn't reset it). + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end. + +webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machine0, + streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) -> + %% Reset/abort the WT streams. + Streams = maps:filtermap(fun + (_, #stream{id=StreamID, status={webtransport_session, _}}) + when StreamID =:= SessionID -> + %% We remove the session stream but do the shutdown outside this function. + false; + (StreamID, #stream{status={webtransport_stream, StreamSessionID}}) + when StreamSessionID =:= SessionID -> + cowboy_quicer:shutdown_stream(Conn, StreamID, + both, cow_http3:error_to_code(wt_session_gone)), + false; + (_, _) -> + true + end, Streams0), + %% Keep the streams in lingering state. + %% We only keep up to 100 streams in this state. @todo Make it configurable? + Terminated = maps:keys(Streams0) -- maps:keys(Streams), + Lingering = lists:sublist(Terminated ++ Lingering0, 100), + %% Update the HTTP3 state machine. + HTTP3Machine = cow_http3_machine:close_webtransport_session(SessionID, HTTP3Machine0), + State#state{ + http3_machine=HTTP3Machine, + streams=Streams, + lingering_streams=Lingering + }. + +stream_peer_send_shutdown(State=#state{conn=Conn}, StreamID) -> + case stream_get(State, StreamID) of + %% Cleanly terminating the CONNECT stream is equivalent + %% to an application error code of 0 and empty message. + Stream = #stream{status={webtransport_session, _}} -> + webtransport_event(State, StreamID, {closed, 0, <<>>}), + %% Shutdown the CONNECT stream fully. + cowboy_quicer:shutdown_stream(Conn, StreamID), + webtransport_terminate_session(State, Stream); + _ -> + State + end. + reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, Stream=#stream{id=StreamID}, Error) -> Reason = case Error of @@ -903,15 +1168,25 @@ terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reas stream_get(#state{streams=Streams}, StreamID) -> maps:get(StreamID, Streams, error). -stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, - StreamID, StreamType) -> +stream_new_local(State, StreamID, StreamType, Status) -> + stream_new(State, StreamID, StreamType, unidi_local, Status). + +stream_new_remote(State, StreamID, StreamType) -> + Status = case StreamType of + unidi -> header; + bidi -> normal + end, + stream_new(State, StreamID, StreamType, unidi_remote, Status). + +stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, + StreamID, StreamType, UnidiType, Status) -> {HTTP3Machine, Status} = case StreamType of unidi -> - {cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0), - header}; + {cow_http3_machine:init_unidi_stream(StreamID, UnidiType, HTTP3Machine0), + Status}; bidi -> {cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0), - normal} + Status} end, Stream = #stream{id=StreamID, status=Status}, State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}. @@ -926,6 +1201,11 @@ stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) -> stream_closed(State=#state{opts=Opts, streams=Streams0, children=Children0}, StreamID, ErrorCode) -> case maps:take(StreamID, Streams0) of + %% In the WT session's case, streams will be + %% removed in webtransport_terminate_session. + {Stream=#stream{status={webtransport_session, _}}, _} -> + webtransport_event(State, StreamID, closed_abruptly), + webtransport_terminate_session(State, Stream); {#stream{state=undefined}, Streams} -> %% Unidi stream has no handler/children. stream_closed1(State#state{streams=Streams}, StreamID); diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl index d9f51f3..aa52fae 100644 --- a/src/cowboy_quicer.erl +++ b/src/cowboy_quicer.erl @@ -23,9 +23,12 @@ -export([shutdown/2]). %% Streams. +-export([start_bidi_stream/2]). -export([start_unidi_stream/2]). -export([send/3]). -export([send/4]). +-export([send_datagram/2]). +-export([shutdown_stream/2]). -export([shutdown_stream/4]). %% Messages. @@ -45,6 +48,9 @@ peercert(_) -> no_quicer(). -spec shutdown(_, _) -> no_return(). shutdown(_, _) -> no_quicer(). +-spec start_bidi_stream(_, _) -> no_return(). +start_bidi_stream(_, _) -> no_quicer(). + -spec start_unidi_stream(_, _) -> no_return(). start_unidi_stream(_, _) -> no_quicer(). @@ -54,6 +60,12 @@ send(_, _, _) -> no_quicer(). -spec send(_, _, _, _) -> no_return(). send(_, _, _, _) -> no_quicer(). +-spec send_datagram(_, _) -> no_return(). +send_datagram(_, _) -> no_quicer(). + +-spec shutdown_stream(_, _) -> no_return(). +shutdown_stream(_, _) -> no_quicer(). + -spec shutdown_stream(_, _, _, _) -> no_return(). shutdown_stream(_, _, _, _) -> no_quicer(). @@ -109,16 +121,26 @@ shutdown(Conn, ErrorCode) -> %% Streams. +-spec start_bidi_stream(quicer_connection_handle(), iodata()) + -> {ok, cow_http3:stream_id()} + | {error, any()}. + +start_bidi_stream(Conn, InitialData) -> + start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_NONE). + -spec start_unidi_stream(quicer_connection_handle(), iodata()) -> {ok, cow_http3:stream_id()} | {error, any()}. -start_unidi_stream(Conn, HeaderData) -> +start_unidi_stream(Conn, InitialData) -> + start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL). + +start_stream(Conn, InitialData, OpenFlag) -> case quicer:start_stream(Conn, #{ active => true, - open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}) of + open_flag => OpenFlag}) of {ok, StreamRef} -> - case quicer:send(StreamRef, HeaderData) of + case quicer:send(StreamRef, InitialData) of {ok, _} -> {ok, StreamID} = quicer:get_stream_id(StreamRef), put({quicer_stream, StreamID}, StreamRef), @@ -156,6 +178,29 @@ send(_Conn, StreamID, Data, IsFin) -> send_flag(nofin) -> ?QUIC_SEND_FLAG_NONE; send_flag(fin) -> ?QUIC_SEND_FLAG_FIN. +-spec send_datagram(quicer_connection_handle(), iodata()) + -> ok | {error, any()}. + +send_datagram(Conn, Data) -> + %% @todo Fix/ignore the Dialyzer error instead of doing this. + DataBin = iolist_to_binary(Data), + Size = byte_size(DataBin), + case quicer:send_dgram(Conn, DataBin) of + {ok, Size} -> + ok; + %% @todo Handle error cases. + Error -> + Error + end. + +-spec shutdown_stream(quicer_connection_handle(), cow_http3:stream_id()) + -> ok. + +shutdown_stream(_Conn, StreamID) -> + StreamRef = get({quicer_stream, StreamID}), + _ = quicer:shutdown_stream(StreamRef), + ok. + -spec shutdown_stream(quicer_connection_handle(), cow_http3:stream_id(), both | receiving, quicer_app_errno()) -> ok. @@ -165,6 +210,7 @@ shutdown_stream(_Conn, StreamID, Dir, ErrorCode) -> _ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity), ok. +%% @todo Are these flags correct for what we want? shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT; shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE. @@ -173,9 +219,11 @@ shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE. %% @todo Probably should have the Conn given as argument too? -spec handle({quic, _, _, _}) -> {data, cow_http3:stream_id(), cow_http:fin(), binary()} + | {datagram, binary()} | {stream_started, cow_http3:stream_id(), unidi | bidi} | {stream_closed, cow_http3:stream_id(), quicer_app_errno()} | closed + | {peer_send_shutdown, cow_http3:stream_id()} | ok | unknown | {socket_error, any()}. @@ -187,6 +235,9 @@ handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) -> _ -> nofin end, {data, StreamID, IsFin, Data}; +%% @todo Match on Conn. +handle({quic, Data, _Conn, Flags}) when is_binary(Data), is_integer(Flags) -> + {datagram, Data}; %% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED. handle({quic, new_stream, StreamRef, #{flags := Flags}}) -> case quicer:setopt(StreamRef, active, true) of @@ -219,8 +270,9 @@ handle({quic, dgram_state_changed, _Conn, _Props}) -> %% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT handle({quic, transport_shutdown, _Conn, _Flags}) -> ok; -handle({quic, peer_send_shutdown, _StreamRef, undefined}) -> - ok; +handle({quic, peer_send_shutdown, StreamRef, undefined}) -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + {peer_send_shutdown, StreamID}; handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) -> ok; handle({quic, shutdown, _Conn, success}) -> diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 933d22e..550054e 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -445,6 +445,7 @@ parse_header_fun(<<"sec-websocket-protocol">>) -> fun cow_http_hd:parse_sec_webs parse_header_fun(<<"sec-websocket-version">>) -> fun cow_http_hd:parse_sec_websocket_version_req/1; parse_header_fun(<<"trailer">>) -> fun cow_http_hd:parse_trailer/1; parse_header_fun(<<"upgrade">>) -> fun cow_http_hd:parse_upgrade/1; +parse_header_fun(<<"wt-available-protocols">>) -> fun cow_http_hd:parse_wt_available_protocols/1; parse_header_fun(<<"x-forwarded-for">>) -> fun cow_http_hd:parse_x_forwarded_for/1. parse_header(Name, Req, Default, ParseFun) -> diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl index 1e4f4f7..9f30fcf 100644 --- a/src/cowboy_rest.erl +++ b/src/cowboy_rest.erl @@ -1622,5 +1622,6 @@ error_terminate(Req, #state{handler=Handler, handler_state=HandlerState}, Class, erlang:raise(Class, Reason, Stacktrace). terminate(Req, #state{handler=Handler, handler_state=HandlerState}) -> + %% @todo I don't think the result is used anywhere? Result = cowboy_handler:terminate(normal, Req, HandlerState, Handler), {ok, Req, Result}. diff --git a/src/cowboy_stream.erl b/src/cowboy_stream.erl index 79e4357..6680bdc 100644 --- a/src/cowboy_stream.erl +++ b/src/cowboy_stream.erl @@ -49,6 +49,7 @@ -type reason() :: normal | switch_protocol | {internal_error, timeout | {error | exit | throw, any()}, human_reason()} | {socket_error, closed | atom(), human_reason()} + %% @todo Or cow_http3:error(). | {stream_error, cow_http2:error(), human_reason()} | {connection_error, cow_http2:error(), human_reason()} | {stop, cow_http2:frame() | {exit, any()}, human_reason()}. diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index dd57730..cb30c3f 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -402,6 +402,7 @@ before_loop(State, HandlerState, ParseState) -> -spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}. +%% @todo Do we really need this for HTTP/2? set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) -> %% Most of the time we don't need to cancel the timer since it %% will have triggered already. But this call is harmless so diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl new file mode 100644 index 0000000..8c8ca39 --- /dev/null +++ b/src/cowboy_webtransport.erl @@ -0,0 +1,292 @@ +%% Copyright (c) Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @todo To enable WebTransport the following options need to be set: +%% +%% QUIC: +%% - max_datagram_frame_size > 0 +%% +%% HTTP/3: +%% - SETTINGS_H3_DATAGRAM = 1 +%% - SETTINGS_ENABLE_CONNECT_PROTOCOL = 1 +%% - SETTINGS_WT_MAX_SESSIONS >= 1 + +%% Cowboy supports versions 07 through 13 of the WebTransport drafts. +%% Cowboy also has some compatibility with version 02. +%% +%% WebTransport CONNECT requests go through cowboy_stream as normal +%% and then an upgrade/switch_protocol is issued (just like Websocket). +%% After that point none of the events go through cowboy_stream except +%% the final terminate event. The request process becomes the process +%% handling all events in the WebTransport session. +%% +%% WebTransport sessions can be ended via a command, via a crash or +%% exit, via the closing of the connection (client or server inititated), +%% via the client ending the session (mirroring the command) or via +%% the client terminating the CONNECT stream. +-module(cowboy_webtransport). + +-export([upgrade/4]). +-export([upgrade/5]). + +%% cowboy_stream. +-export([info/3]). +-export([terminate/3]). + +-type stream_type() :: unidi | bidi. +-type open_stream_ref() :: any(). + +-type event() :: + {stream_open, cow_http3:stream_id(), stream_type()} | + {opened_stream_id, open_stream_ref(), cow_http3:stream_id()} | + {stream_data, cow_http3:stream_id(), cow_http:fin(), binary()} | + {datagram, binary()} | + close_initiated. + +-type commands() :: [ + {open_stream, open_stream_ref(), stream_type(), iodata()} | + {close_stream, cow_http3:stream_id(), cow_http3:wt_app_error_code()} | + {send, cow_http3:stream_id() | datagram, iodata()} | + initiate_close | + close | + {close, cow_http3:wt_app_error_code()} | + {close, cow_http3:wt_app_error_code(), iodata()} +]. +-export_type([commands/0]). + +-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}. + +-callback init(Req, any()) + -> {ok | module(), Req, any()} + | {module(), Req, any(), any()} + when Req::cowboy_req:req(). + +-callback webtransport_init(State) + -> call_result(State) when State::any(). +-optional_callbacks([webtransport_init/1]). + +-callback webtransport_handle(event(), State) + -> call_result(State) when State::any(). +-optional_callbacks([webtransport_handle/2]). + +-callback webtransport_info(any(), State) + -> call_result(State) when State::any(). +-optional_callbacks([webtransport_info/2]). + +-callback terminate(any(), cowboy_req:req(), any()) -> ok. +-optional_callbacks([terminate/3]). + +-type opts() :: #{ + req_filter => fun((cowboy_req:req()) -> map()) +}. +-export_type([opts/0]). + +-record(state, { + id :: cow_http3:stream_id(), + parent :: pid(), + opts = #{} :: opts(), + handler :: module(), + hibernate = false :: boolean(), + req = #{} :: map() +}). + +%% This function mirrors a similar function for Websocket. + +-spec is_upgrade_request(cowboy_req:req()) -> boolean(). + +is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol}) + when Version =:= 'HTTP/3' -> + %% @todo scheme MUST BE "https" + <<"webtransport">> =:= cowboy_bstr:to_lower(Protocol); + +is_upgrade_request(_) -> + false. + +%% Stream process. + +-spec upgrade(Req, Env, module(), any()) + -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). + +upgrade(Req, Env, Handler, HandlerState) -> + upgrade(Req, Env, Handler, HandlerState, #{}). + +-spec upgrade(Req, Env, module(), any(), opts()) + -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). + +%% @todo Immediately crash if a response has already been sent. +upgrade(Req=#{version := 'HTTP/3', pid := Pid, streamid := StreamID}, Env, Handler, HandlerState, Opts) -> + FilteredReq = case maps:get(req_filter, Opts, undefined) of + undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req); + FilterFun -> FilterFun(Req) + end, + State = #state{id=StreamID, parent=Pid, opts=Opts, handler=Handler, req=FilteredReq}, + %% @todo Must ensure the relevant settings are enabled (QUIC and H3). + %% Either we check them BEFORE, or we check them when the handler + %% is OK to initiate a webtransport session. Probably need to + %% check them BEFORE as we need to become (takeover) the webtransport process + %% after we are done with the upgrade. Maybe in cow_http3_machine but + %% it doesn't have QUIC settings currently (max_datagram_size). + case is_upgrade_request(Req) of + true -> + Headers = cowboy_req:response_headers(#{}, Req), + Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, + #{session_pid => self()}}}, + webtransport_init(State, HandlerState); + %% Use 501 Not Implemented to mirror the recommendation in + %% by RFC9220 3 (WebSockets Upgrade over HTTP/3). + false -> + %% @todo I don't think terminate will be called. + {ok, cowboy_req:reply(501, Req), Env} + end. + +webtransport_init(State=#state{handler=Handler}, HandlerState) -> + case erlang:function_exported(Handler, webtransport_init, 1) of + true -> handler_call(State, HandlerState, webtransport_init, undefined); + false -> before_loop(State, HandlerState) + end. + +before_loop(State=#state{hibernate=true}, HandlerState) -> + proc_lib:hibernate(?MODULE, loop, [State#state{hibernate=false}, HandlerState]); +before_loop(State, HandlerState) -> + loop(State, HandlerState). + +-spec loop(#state{}, any()) -> no_return(). + +loop(State=#state{id=SessionID, parent=Parent}, HandlerState) -> + receive + {'$webtransport_event', SessionID, Event={closed, _, _}} -> + terminate_proc(State, HandlerState, Event); + {'$webtransport_event', SessionID, Event=closed_abruptly} -> + terminate_proc(State, HandlerState, Event); + {'$webtransport_event', SessionID, Event} -> + handler_call(State, HandlerState, webtransport_handle, Event); + %% Timeouts. +%% @todo idle_timeout +% {timeout, TRef, ?MODULE} -> +% tick_idle_timeout(State, HandlerState, ParseState); +% {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> +% before_loop(State, HandlerState, ParseState); + %% System messages. + {'EXIT', Parent, Reason} -> + %% @todo We should exit gracefully. + exit(Reason); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {State, HandlerState}); + %% Calls from supervisor module. + {'$gen_call', From, Call} -> + cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE), + before_loop(State, HandlerState); + Message -> + handler_call(State, HandlerState, webtransport_info, Message) + end. + +handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) -> + try case Callback of + webtransport_init -> Handler:webtransport_init(HandlerState); + _ -> Handler:Callback(Message, HandlerState) + end of + {Commands, HandlerState2} when is_list(Commands) -> + handler_call_result(State, HandlerState2, Commands); + {Commands, HandlerState2, hibernate} when is_list(Commands) -> + handler_call_result(State#state{hibernate=true}, HandlerState2, Commands) + catch Class:Reason:Stacktrace -> + %% @todo Do we need to send a close? Let cowboy_http3 detect and handle it? + handler_terminate(State, HandlerState, {crash, Class, Reason}), + erlang:raise(Class, Reason, Stacktrace) + end. + +handler_call_result(State0, HandlerState, Commands) -> + case commands(Commands, State0, ok, []) of + {ok, State} -> + before_loop(State, HandlerState); + {stop, State} -> + terminate_proc(State, HandlerState, stop) + end. + +%% We accumulate the commands that must be sent to the connection process +%% because we want to send everything into one message. Other commands are +%% processed immediately. + +commands([], State, Res, []) -> + {Res, State}; +commands([], State=#state{id=SessionID, parent=Pid}, Res, Commands) -> + Pid ! {'$webtransport_commands', SessionID, lists:reverse(Commands)}, + {Res, State}; +%% {open_stream, OpenStreamRef, StreamType, InitialData}. +commands([Command={open_stream, _, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% {close_stream, StreamID, Code}. +commands([Command={close_stream, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% @todo We must reject send to a remote unidi stream. +%% {send, StreamID | datagram, Data}. +commands([Command={send, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% {send, StreamID, IsFin, Data}. +commands([Command={send, _, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% initiate_close - DRAIN_WT_SESSION +commands([Command=initiate_close|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% close | {close, Code} | {close, Code, Msg} - CLOSE_WT_SESSION +%% @todo At this point the handler must not issue stream or send commands. +commands([Command=close|Tail], State, _, Acc) -> + commands(Tail, State, stop, [Command|Acc]); +commands([Command={close, _}|Tail], State, _, Acc) -> + commands(Tail, State, stop, [Command|Acc]); +commands([Command={close, _, _}|Tail], State, _, Acc) -> + commands(Tail, State, stop, [Command|Acc]). +%% @todo A set_options command could be useful to increase the number of allowed streams +%% or other forms of flow control. Alternatively a flow command. Or both. +%% @todo A shutdown_reason command could be useful for the same reasons as Websocekt. + +-spec terminate_proc(_, _, _) -> no_return(). + +terminate_proc(State, HandlerState, Reason) -> + handler_terminate(State, HandlerState, Reason), + %% @todo This is what should be done if shutdown_reason gets implemented. +% case Shutdown of +% normal -> exit(normal); +% _ -> exit({shutdown, Shutdown}) +% end. + exit(normal). + +handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) -> + cowboy_handler:terminate(Reason, Req, HandlerState, Handler). + +%% cowboy_stream callbacks. +%% +%% We shortcut stream handlers but still need to process some events +%% such as process exiting or termination. We implement the relevant +%% callbacks here. Note that as far as WebTransport is concerned, +%% receiving stream data here would be an error therefore the data +%% callback is not implemented. +%% +%% @todo Better type than map() for the cowboy_stream state. + +-spec info(cowboy_stream:streamid(), any(), State) + -> {cowboy_stream:commands(), State} when State::map(). + +info(StreamID, Msg, WTState=#{stream_state := StreamState0}) -> + {Commands, StreamState} = cowboy_stream:info(StreamID, Msg, StreamState0), + {Commands, WTState#{stream_state => StreamState}}. + +-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), map()) + -> any(). + +terminate(StreamID, Reason, #{stream_state := StreamState}) -> + cowboy_stream:terminate(StreamID, Reason, StreamState). |