diff options
| -rw-r--r-- | src/cowboy_http3.erl | 72 | ||||
| -rw-r--r-- | src/cowboy_quic.erl | 2 | ||||
| -rw-r--r-- | src/cowboy_webtransport.erl | 46 | ||||
| -rw-r--r-- | test/draft_h3_webtransport_SUITE.erl | 26 | ||||
| -rw-r--r-- | test/handlers/wt_echo_h.erl | 24 |
5 files changed, 93 insertions, 77 deletions
diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index 72b9598..ddd51ad 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -100,7 +100,7 @@ http3_machine :: cow_http3_machine:http3_machine(), %% Specially handled local unidi streams. - local_control_id = undefined :: undefined | cow_http3:stream_id(), + local_control_id = undefined :: undefined | cow_http3:stream_id(), %% @todo We probably don't need this. local_encoder_id = undefined :: undefined | cow_http3:stream_id(), local_decoder_id = undefined :: undefined | cow_http3:stream_id(), @@ -191,13 +191,10 @@ handle_quic_msg(State0=#state{backend=QuicBackend, opts=Opts}, Msg) -> State = stream_new_remote(State0, StreamID, StreamType), loop(State); {stream_reset, StreamID, _ErrorCode} -> - %% @todo Properly handle half-closed. - %% @todo Rename this function. - State = stream_peer_send_shutdown(State0, StreamID), + State = stream_reset_remote(State0, StreamID), loop(State); {stream_stop_sending, StreamID, ErrorCode} -> - %% @todo Properly handle half-closed. - State = stream_closed(State0, StreamID, ErrorCode), + State = stream_stop_sending_remote(State0, StreamID, ErrorCode), loop(State); {conn_closed, transport, _QuicErrno} -> %% @todo Different error reason if graceful? @@ -263,8 +260,6 @@ parse1(State=#state{backend=QuicBackend, conn=Conn}, Stream=#stream{id=SessionID webtransport_event(State, SessionID, {closed, AppCode, AppMsg}), %% Shutdown the CONNECT stream immediately. QuicBackend:send(Conn, SessionID, fin, <<>>), - %% @todo Will we receive a {stream_closed,...} after that? - %% If any data is received past that point this is an error. %% @todo Don't crash, error out properly. <<>> = Rest, loop(webtransport_terminate_session(State, Stream)); @@ -948,7 +943,7 @@ become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0}, {ok, HTTP3Machine} -> State = State0#state{http3_machine=HTTP3Machine}, Stream = Stream0#stream{status={webtransport_stream, SessionID}}, - webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}), + webtransport_event(State, SessionID, {stream_opened, StreamID, StreamType}), %% We don't need to parse the remaining data if there isn't any. case {Rest, IsFin} of {<<>>, nofin} -> loop(stream_store(State, Stream)); @@ -999,13 +994,6 @@ wt_commands(State0=#state{backend=QuicBackend, conn=Conn}, Session=#stream{id=Se wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) -> %% @todo Check that StreamID belongs to Session. error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]}); -wt_commands(State=#state{backend=QuicBackend, conn=Conn}, Session=#stream{id=SessionID}, - [{send, datagram, Data}|Tail]) -> - case QuicBackend:send_datagram(Conn, cow_http3:datagram(SessionID, Data)) of - ok -> - wt_commands(State, Session, Tail) - %% @todo Handle errors. - end; wt_commands(State=#state{backend=QuicBackend, conn=Conn}, Session, [{send, StreamID, Data}|Tail]) -> %% @todo Check that StreamID belongs to Session. case QuicBackend:send(Conn, StreamID, Data) of @@ -1020,7 +1008,15 @@ wt_commands(State=#state{backend=QuicBackend, conn=Conn}, Session, [{send, Strea wt_commands(State, Session, Tail) %% @todo Handle errors. end; -wt_commands(State=#state{backend=QuicBackend, conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) -> +wt_commands(State=#state{backend=QuicBackend, conn=Conn}, Session=#stream{id=SessionID}, + [{send_datagram, Data}|Tail]) -> + case QuicBackend:send_datagram(Conn, cow_http3:datagram(SessionID, Data)) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State=#state{backend=QuicBackend, conn=Conn}, Session=#stream{id=SessionID}, + [drain_session|Tail]) -> %% We must send a WT_DRAIN_SESSION capsule on the CONNECT stream. Capsule = cow_capsule:wt_drain_session(), case QuicBackend:send(Conn, SessionID, Capsule) of @@ -1032,9 +1028,9 @@ wt_commands(State0=#state{backend=QuicBackend, conn=Conn}, Session=#stream{id=Se when Cmd =:= close; element(1, Cmd) =:= close -> %% We must send a WT_CLOSE_SESSION capsule on the CONNECT stream. {AppCode, AppMsg} = case Cmd of - close -> {0, <<>>}; - {close, AppCode0} -> {AppCode0, <<>>}; - {close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0} + close_session -> {0, <<>>}; + {close_session, AppCode0} -> {AppCode0, <<>>}; + {close_session, AppCode0, AppMsg0} -> {AppCode0, AppMsg0} end, Capsule = cow_capsule:wt_close_session(AppCode, AppMsg), case QuicBackend:send(Conn, SessionID, fin, Capsule) of @@ -1075,19 +1071,6 @@ webtransport_terminate_session(State=#state{backend=QuicBackend, conn=Conn, http lingering_streams=Lingering }. -stream_peer_send_shutdown(State=#state{backend=QuicBackend, conn=Conn}, StreamID) -> - case stream_get(State, StreamID) of - %% Cleanly terminating the CONNECT stream is equivalent - %% to an application error code of 0 and empty message. - Stream = #stream{status={webtransport_session, _}} -> - webtransport_event(State, StreamID, {closed, 0, <<>>}), - %% Shutdown the CONNECT stream fully. - QuicBackend:send(Conn, StreamID, fin, <<>>), - webtransport_terminate_session(State, Stream); - _ -> - State - end. - reset_stream(State0=#state{backend=QuicBackend, conn=Conn, http3_machine=HTTP3Machine0}, Stream=#stream{id=StreamID}, Error) -> Reason = case Error of @@ -1255,6 +1238,29 @@ stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, Stream = #stream{id=StreamID, status=Status}, State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}. +stream_reset_remote(State=#state{backend=QuicBackend, conn=Conn}, StreamID) -> + %% Peer will no longer send data. + %% @todo Check remote control streams, they must not be closed. + + case stream_get(State, StreamID) of + %% Cleanly terminating the CONNECT stream is equivalent + %% to an application error code of 0 and empty message. + Stream = #stream{status={webtransport_session, _}} -> + webtransport_event(State, StreamID, {closed, 0, <<>>}), + %% Shutdown the CONNECT stream fully. + QuicBackend:send(Conn, StreamID, fin, <<>>), + webtransport_terminate_session(State, Stream); + _ -> + State + end. + +stream_stop_sending_remote( + %% Peer will no longer receive data. + %% @todo Check local control streams, they must not be closed. + +%% @todo For WT we send an event and clean up if stream is closed right? + + %% Stream closed message for a local (write-only) unidi stream. stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) -> stream_closed1(State, StreamID); diff --git a/src/cowboy_quic.erl b/src/cowboy_quic.erl index 94ea074..3357879 100644 --- a/src/cowboy_quic.erl +++ b/src/cowboy_quic.erl @@ -27,7 +27,7 @@ start_link(Ref, QuicBackend, Conn, Opts) -> {ok, Pid}. -spec connection_process(pid(), corral:ref(), module(), corral_backend:conn(), cowboy:opts()) - -> ok. + -> no_return(). connection_process(Parent, Ref, QuicBackend, Conn, Opts) -> {ok, #{alpn := <<"h3">>}} = QuicBackend:handshake(Conn), diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl index 7bcac55..2a519f0 100644 --- a/src/cowboy_webtransport.erl +++ b/src/cowboy_webtransport.erl @@ -48,20 +48,25 @@ -type open_stream_ref() :: any(). -type event() :: - {stream_open, cow_http3:stream_id(), stream_type()} | - {opened_stream_id, open_stream_ref(), cow_http3:stream_id()} | + {stream_opened, cow_http3:stream_id(), stream_type()} | + {opened_stream_id, open_stream_ref(), cow_http3:stream_id()} | %% Unfortunate but unavoidable. {stream_data, cow_http3:stream_id(), cow_http:fin(), binary()} | + {stream_reset, cow_http3:stream_id(), cow_http3:wt_app_error_code() | undefined} | %% @todo Add. (for undefined see end of 4.4) + {stream_stop_sending, cow_http3:stream_id(), cow_http3:wt_app_error_code() | undefined} | %% @todo Add. {datagram, binary()} | - close_initiated. + drain_session. %% @todo Add a test, command tested, event not. -type commands() :: [ {open_stream, open_stream_ref(), stream_type(), iodata()} | - {close_stream, cow_http3:stream_id(), cow_http3:wt_app_error_code()} | - {send, cow_http3:stream_id() | datagram, iodata()} | - initiate_close | - close | - {close, cow_http3:wt_app_error_code()} | - {close, cow_http3:wt_app_error_code(), iodata()} + {send, cow_http3:stream_id(), iodata()} | + {send, cow_http3:stream_id(), cow_http:fin(), iodata()} | + {reset_stream, cow_http3:stream_id(), cow_http3:wt_app_error_code()} | %% @todo Separated from close_stream. + {stop_sending, cow_http3:stream_id(), cow_http3:wt_app_error_code()} | %% @todo Separated from close_stream. + {send_datagram, iodata()} | + drain_session | + close_session | + {close_session, cow_http3:wt_app_error_code()} | + {close_session, cow_http3:wt_app_error_code(), iodata()} ]. -export_type([commands/0]). @@ -167,9 +172,11 @@ before_loop(State, HandlerState) -> loop(State=#state{id=SessionID, parent=Parent}, HandlerState) -> receive - {'$webtransport_event', SessionID, Event={closed, _, _}} -> + %% Application close via WT_CLOSE_SESSION. + {'$webtransport_event', SessionID, Event={close_session, _, _}} -> %% @todo Renamed. terminate_proc(State, HandlerState, Event); - {'$webtransport_event', SessionID, Event=closed_abruptly} -> + %% Abrupt session close. + {'$webtransport_event', SessionID, Event={session_error, _, _}} -> %% @todo Renamed and added info (atom reason, atom/binary string explaining). terminate_proc(State, HandlerState, Event); {'$webtransport_event', SessionID, Event} -> handler_call(State, HandlerState, webtransport_handle, Event); @@ -233,22 +240,25 @@ commands([Command={open_stream, _, _, _}|Tail], State, Res, Acc) -> commands([Command={close_stream, _, _}|Tail], State, Res, Acc) -> commands(Tail, State, Res, [Command|Acc]); %% @todo We must reject send to a remote unidi stream. -%% {send, StreamID | datagram, Data}. +%% {send, StreamID, Data}. commands([Command={send, _, _}|Tail], State, Res, Acc) -> commands(Tail, State, Res, [Command|Acc]); %% {send, StreamID, IsFin, Data}. commands([Command={send, _, _, _}|Tail], State, Res, Acc) -> commands(Tail, State, Res, [Command|Acc]); -%% initiate_close - DRAIN_WT_SESSION -commands([Command=initiate_close|Tail], State, Res, Acc) -> +%% {send_datagram, Data}. +commands([Command={send_datagram, _}|Tail], State, Res, Acc) -> commands(Tail, State, Res, [Command|Acc]); -%% close | {close, Code} | {close, Code, Msg} - CLOSE_WT_SESSION +%% drain_session - DRAIN_WT_SESSION +commands([Command=drain_session|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% close_session | {close_session, Code} | {close_session, Code, Msg} - CLOSE_WT_SESSION %% @todo At this point the handler must not issue stream or send commands. -commands([Command=close|Tail], State, _, Acc) -> +commands([Command=close_session|Tail], State, _, Acc) -> commands(Tail, State, stop, [Command|Acc]); -commands([Command={close, _}|Tail], State, _, Acc) -> +commands([Command={close_session, _}|Tail], State, _, Acc) -> commands(Tail, State, stop, [Command|Acc]); -commands([Command={close, _, _}|Tail], State, _, Acc) -> +commands([Command={close_session, _, _}|Tail], State, _, Acc) -> commands(Tail, State, stop, [Command|Acc]). %% @todo A set_options command could be useful to increase the number of allowed streams %% or other forms of flow control. Alternatively a flow command. Or both. diff --git a/test/draft_h3_webtransport_SUITE.erl b/test/draft_h3_webtransport_SUITE.erl index c6382ec..131b001 100644 --- a/test/draft_h3_webtransport_SUITE.erl +++ b/test/draft_h3_webtransport_SUITE.erl @@ -273,7 +273,7 @@ datagrams(Config) -> % cow_http3:encode_int(0) % ]), % %% Receive a datagram indicating processing by the WT handler. -% {datagram, SessionID, <<"TEST:close_initiated">>} = do_receive_datagram(Conn), +% {datagram, SessionID, <<"TEST:drain_session">>} = do_receive_datagram(Conn), % ok. wt_drain_session_client(Config) -> @@ -288,7 +288,7 @@ wt_drain_session_client(Config) -> %% Send the WT_DRAIN_SESSION capsule on the CONNECT stream. {ok, _} = quicer:send(ConnectStreamRef, cow_capsule:wt_drain_session()), %% Receive a datagram indicating processing by the WT handler. - {datagram, SessionID, <<"TEST:close_initiated">>} = do_receive_datagram(Conn), + {datagram, SessionID, <<"TEST:drain_session">>} = do_receive_datagram(Conn), ok. wt_drain_session_server(Config) -> @@ -302,7 +302,7 @@ wt_drain_session_server(Config) -> } = do_webtransport_connect(Config), %% Create a bidi stream, send a special instruction to make it initiate the close. {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), - {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:initiate_close">>), + {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:drain_session">>), %% Receive the WT_DRAIN_SESSION capsule on the CONNECT stream. DrainWTSessionCapsule = cow_capsule:wt_drain_session(), {nofin, DrainWTSessionCapsule} = do_receive_data(ConnectStreamRef), @@ -321,7 +321,7 @@ wt_drain_session_continue_client(Config) -> %% Send the WT_DRAIN_SESSION capsule on the CONNECT stream. {ok, _} = quicer:send(ConnectStreamRef, cow_capsule:wt_drain_session()), %% Receive a datagram indicating processing by the WT handler. - {datagram, SessionID, <<"TEST:close_initiated">>} = do_receive_datagram(Conn), + {datagram, SessionID, <<"TEST:drain_session">>} = do_receive_datagram(Conn), %% Create a new bidi stream, send Hello, get Hello back. {ok, ContinueStreamRef} = quicer:start_stream(Conn, #{}), {ok, _} = quicer:send(ContinueStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "Hello">>), @@ -340,7 +340,7 @@ wt_drain_session_continue_server(Config) -> } = do_webtransport_connect(Config), %% Create a bidi stream, send a special instruction to make it initiate the close. {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), - {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:initiate_close">>), + {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:drain_session">>), %% Receive the WT_DRAIN_SESSION capsule on the CONNECT stream. DrainWTSessionCapsule = cow_capsule:wt_drain_session(), {nofin, DrainWTSessionCapsule} = do_receive_data(ConnectStreamRef), @@ -465,7 +465,7 @@ wt_close_session_server(Config) -> } = do_webtransport_connect(Config), %% Create a bidi stream, send a special instruction to make it initiate the close. {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), - {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:close">>), + {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, "TEST:close_session">>), %% Receive the WT_CLOSE_SESSION capsule on the CONNECT stream. CloseWTSessionCapsule = cow_capsule:wt_close_session(0, <<>>), {fin, CloseWTSessionCapsule} = do_receive_data(ConnectStreamRef), @@ -540,7 +540,7 @@ wt_session_gone_server(Config) -> {nofin, <<"Hello">>} = do_receive_data(RemoteBidiStreamRef), %% Send a special instruction to make the server initiate the close. - {ok, _} = quicer:send(LocalBidiStreamRef, <<"TEST:close">>), + {ok, _} = quicer:send(LocalBidiStreamRef, <<"TEST:close_session">>), %% Receive the WT_CLOSE_SESSION capsule on the CONNECT stream. CloseWTSessionCapsule = cow_capsule:wt_close_session(0, <<>>), {fin, CloseWTSessionCapsule} = do_receive_data(ConnectStreamRef), @@ -575,7 +575,7 @@ wt_close_session_app_code_msg_client(Config) -> %% @todo Stop reading from the CONNECt stream too. (STOP_SENDING) %% Receive the terminate event from the WT handler. receive - {'$wt_echo_h', terminate, {closed, 17, <<"seventeen">>}, _, _} -> + {'$wt_echo_h', terminate, {close_session, 17, <<"seventeen">>}, _, _} -> ok after 1000 -> error({timeout, waiting_for_terminate_event}) @@ -593,7 +593,7 @@ wt_close_session_app_code_server(Config) -> %% Create a bidi stream, send a special instruction to make it initiate the close. {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, - "TEST:close_app_code">>), + "TEST:close_session_app_code">>), %% Receive the WT_CLOSE_SESSION capsule on the CONNECT stream. CloseWTSessionCapsule = cow_capsule:wt_close_session(1234567890, <<>>), {fin, CloseWTSessionCapsule} = do_receive_data(ConnectStreamRef), @@ -611,7 +611,7 @@ wt_close_session_app_code_msg_server(Config) -> %% Create a bidi stream, send a special instruction to make it initiate the close. {ok, LocalStreamRef} = quicer:start_stream(Conn, #{}), {ok, _} = quicer:send(LocalStreamRef, <<1:2, 16#41:14, 0:2, SessionID:6, - "TEST:close_app_code_msg">>), + "TEST:close_session_app_code_msg">>), %% Receive the WT_CLOSE_SESSION capsule on the CONNECT stream. CloseWTSessionCapsule = iolist_to_binary(cow_capsule:wt_close_session(1234567890, <<"onetwothreefourfivesixseveneightnineten">>)), @@ -645,7 +645,7 @@ connect_stream_closed_cleanly_fin(Config) -> {ok, _} = quicer:send(ConnectStreamRef, <<>>, ?QUIC_SEND_FLAG_FIN), %% Receive the terminate event from the WT handler. receive - {'$wt_echo_h', terminate, {closed, 0, <<>>}, _, _} -> + {'$wt_echo_h', terminate, {close_session, 0, <<>>}, _, _} -> ok after 1000 -> error({timeout, waiting_for_terminate_event}) @@ -671,7 +671,7 @@ connect_stream_closed_cleanly_shutdown(Config) -> _ = quicer:shutdown_stream(ConnectStreamRef), %% Receive the terminate event from the WT handler. receive - {'$wt_echo_h', terminate, {closed, 0, <<>>}, _, _} -> + {'$wt_echo_h', terminate, {close_session, 0, <<>>}, _, _} -> ok after 1000 -> error({timeout, waiting_for_terminate_event}) @@ -699,7 +699,7 @@ connect_stream_closed_abruptly(Config) -> receive %% @todo It would be good to forward a stream error as well %% so that a WT error can be sent, but I have been unsuccessful. - {'$wt_echo_h', terminate, closed_abruptly, _, _} -> + {'$wt_echo_h', terminate, {session_error, _, _}, _, _} -> ok after 1000 -> error({timeout, waiting_for_terminate_event}) diff --git a/test/handlers/wt_echo_h.erl b/test/handlers/wt_echo_h.erl index 5198565..c9ce8f5 100644 --- a/test/handlers/wt_echo_h.erl +++ b/test/handlers/wt_echo_h.erl @@ -26,10 +26,10 @@ init(Req0, _) -> end, {cowboy_webtransport, Req, #{}}. -webtransport_handle(Event = {stream_open, StreamID, bidi}, Streams) -> +webtransport_handle(Event = {stream_opened, StreamID, bidi}, Streams) -> ?LOG("WT handle ~p~n", [Event]), {[], Streams#{StreamID => bidi}}; -webtransport_handle(Event = {stream_open, StreamID, unidi}, Streams) -> +webtransport_handle(Event = {stream_opened, StreamID, unidi}, Streams) -> ?LOG("WT handle ~p~n", [Event]), OpenStreamRef = make_ref(), {[{open_stream, OpenStreamRef, unidi, <<>>}], Streams#{ @@ -56,14 +56,14 @@ webtransport_handle(Event = {stream_data, StreamID, _IsFin, <<"TEST:", Test/bits OpenStreamRef = make_ref(), {[{open_stream, OpenStreamRef, bidi, <<>>}], Streams#{OpenStreamRef => bidi}}; - <<"initiate_close">> -> - {[initiate_close], Streams}; - <<"close">> -> + <<"drain_session">> -> + {[drain_session], Streams}; + <<"close_session">> -> {[close], Streams}; - <<"close_app_code">> -> - {[{close, 1234567890}], Streams}; - <<"close_app_code_msg">> -> - {[{close, 1234567890, <<"onetwothreefourfivesixseveneightnineten">>}], Streams}; + <<"close_session_app_code">> -> + {[{close_session, 1234567890}], Streams}; + <<"close_session_app_code_msg">> -> + {[{close_session, 1234567890, <<"onetwothreefourfivesixseveneightnineten">>}], Streams}; <<"event_pid:", EventPidBin/bits>> -> {[{send, StreamID, nofin, <<"event_pid_received">>}], Streams#{event_pid => binary_to_term(EventPidBin)}} @@ -82,10 +82,10 @@ webtransport_handle(Event = {stream_data, StreamID, IsFin, Data}, Streams) -> end; webtransport_handle(Event = {datagram, Data}, Streams) -> ?LOG("WT handle ~p~n", [Event]), - {[{send, datagram, Data}], Streams}; -webtransport_handle(Event = close_initiated, Streams) -> + {[{send_datagram, Data}], Streams}; +webtransport_handle(Event = drain_session, Streams) -> ?LOG("WT handle ~p~n", [Event]), - {[{send, datagram, <<"TEST:close_initiated">>}], Streams}; + {[{send_datagram, <<"TEST:drain_session">>}], Streams}; webtransport_handle(Event, Streams) -> ?LOG("WT handle ignore ~p~n", [Event]), {[], Streams}. |
