From 6dd58a4ff9deedeeb6029827b936c2e81866cd54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 22 Sep 2023 16:28:00 +0200 Subject: Initial HTTP/3 implementation Since quicer, which provides the QUIC implementation, is a NIF, Gun cannot depend directly on it. In order to enable QUIC and HTTP/3, users have to set the GUN_QUICER environment variable: export GUN_QUICER=1 Gun is now tested using GitHub Actions. As a result OTP-24+ is now required. In addition, the number of OTP releases tested has been reduced; only the latest of each major version is now tested. This also updates Erlang.mk. --- .github/workflows/ci.yaml | 36 +++ Makefile | 25 +- ebin/gun.app | 2 +- erlang.mk | 4 +- rebar.config | 2 +- src/gun.erl | 84 +++++- src/gun_http.erl | 2 +- src/gun_http2.erl | 4 +- src/gun_http3.erl | 744 ++++++++++++++++++++++++++++++++++++++++++++++ src/gun_protocols.erl | 2 + src/gun_quicer.erl | 283 ++++++++++++++++++ 11 files changed, 1164 insertions(+), 24 deletions(-) create mode 100644 .github/workflows/ci.yaml create mode 100644 src/gun_http3.erl create mode 100644 src/gun_quicer.erl diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml new file mode 100644 index 0000000..67bfd42 --- /dev/null +++ b/.github/workflows/ci.yaml @@ -0,0 +1,36 @@ +## Use workflows from ninenines/ci.erlang.mk to test Gun. + +name: Check Gun + +on: + push: + branches: + - master + pull_request: + schedule: + ## Every Monday at 2am. + - cron: 0 2 * * 1 + +env: + CI_ERLANG_MK: 1 + +jobs: + cleanup-master: + name: Cleanup master build + runs-on: ubuntu-latest + steps: + + - name: Cleanup master build if necessary + if: ${{ github.event_name == 'schedule' }} + run: | + gh extension install actions/gh-actions-cache + gh actions-cache delete Linux-X64-Erlang-master -R $REPO --confirm || true + gh actions-cache delete macOS-X64-Erlang-master -R $REPO --confirm || true + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + REPO: ${{ github.repository }} + + check: + name: Gun + needs: cleanup-master + uses: ninenines/ci.erlang.mk/.github/workflows/ci.yaml@master diff --git a/Makefile b/Makefile index 1609a37..a6e7f1b 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,12 @@ CT_OPTS += -ct_hooks gun_ct_hook [] # -boot start_sasl LOCAL_DEPS = public_key ssl DEPS = cowlib -dep_cowlib = git https://github.com/ninenines/cowlib 2.13.0 +dep_cowlib = git https://github.com/ninenines/cowlib master + +ifeq ($(GUN_QUICER),1) +DEPS += quicer +dep_quicer = git https://github.com/emqx/quic main +endif DOC_DEPS = asciideck @@ -29,10 +34,8 @@ dep_ranch_commit = 2.0.0 dep_ci.erlang.mk = git https://github.com/ninenines/ci.erlang.mk master DEP_EARLY_PLUGINS = ci.erlang.mk -AUTO_CI_OTP ?= OTP-22+ -#AUTO_CI_HIPE ?= OTP-LATEST -# AUTO_CI_ERLLVM ?= OTP-LATEST -AUTO_CI_WINDOWS ?= OTP-22+ +AUTO_CI_OTP ?= OTP-LATEST-24+ +AUTO_CI_WINDOWS ?= OTP-LATEST-24+ # Hex configuration. @@ -58,14 +61,24 @@ ifndef FULL CT_SUITES := $(filter-out ws_autobahn,$(CT_SUITES)) endif -# Enable eunit. +# Compile options. TEST_ERLC_OPTS += +'{parse_transform, eunit_autoexport}' +ifeq ($(GUN_QUICER),1) +ERLC_OPTS += -D GUN_QUICER=1 +TEST_ERLC_OPTS += -D GUN_QUICER=1 +endif + # Generate rebar.config on build. app:: rebar.config +# Fix quicer compilation for HTTP/3. + +autopatch-quicer:: + $(verbose) printf "%s\n" "all: ;" > $(DEPS_DIR)/quicer/c_src/Makefile.erlang.mk + # h2specd setup. GOPATH := $(ERLANG_MK_TMP)/gopath diff --git a/ebin/gun.app b/ebin/gun.app index e156075..39c1e84 100644 --- a/ebin/gun.app +++ b/ebin/gun.app @@ -1,7 +1,7 @@ {application, 'gun', [ {description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."}, {vsn, "2.1.0"}, - {modules, ['gun','gun_app','gun_conns_sup','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_pool','gun_pool_events_h','gun_pools_sup','gun_protocols','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_tls_proxy_http2_connect','gun_tunnel','gun_ws','gun_ws_h','gun_ws_protocol']}, + {modules, ['gun','gun_app','gun_conns_sup','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_http3','gun_pool','gun_pool_events_h','gun_pools_sup','gun_protocols','gun_public_suffix','gun_quicer','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_tls_proxy_http2_connect','gun_tunnel','gun_ws','gun_ws_h','gun_ws_protocol']}, {registered, [gun_sup]}, {applications, [kernel,stdlib,public_key,ssl,cowlib]}, {optional_applications, []}, diff --git a/erlang.mk b/erlang.mk index 518a1d2..6c58ea8 100644 --- a/erlang.mk +++ b/erlang.mk @@ -17,7 +17,7 @@ ERLANG_MK_FILENAME := $(realpath $(lastword $(MAKEFILE_LIST))) export ERLANG_MK_FILENAME -ERLANG_MK_VERSION = 61f58ff +ERLANG_MK_VERSION = 16d60fa ERLANG_MK_WITHOUT = # Make 3.81 and 3.82 are deprecated. @@ -3565,7 +3565,7 @@ REBAR_DEPS_DIR = $(DEPS_DIR) export REBAR_DEPS_DIR REBAR3_GIT ?= https://github.com/erlang/rebar3 -REBAR3_COMMIT ?= 3f563feaf1091a1980241adefa83a32dd2eebf7c # 3.20.0 +REBAR3_COMMIT ?= 06aaecd51b0ce828b66bb65a74d3c1fd7833a4ba # 3.22.1 + OTP-27 fixes CACHE_DEPS ?= 0 diff --git a/rebar.config b/rebar.config index e242d75..348f53e 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.13.0"}} +{cowlib,".*",{git,"https://github.com/ninenines/cowlib","master"}} ]}. {erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard]}. diff --git a/src/gun.erl b/src/gun.erl index b4c1686..1f72555 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -124,8 +124,9 @@ | {close, ws_close_code(), iodata()}. -export_type([ws_frame/0]). --type protocol() :: http | http2 | raw | socks - | {http, http_opts()} | {http2, http2_opts()} | {raw, raw_opts()} | {socks, socks_opts()}. +-type protocol() :: http | http2 | http3 | raw | socks + | {http, http_opts()} | {http2, http2_opts()} | {http3, http3_opts()} + | {raw, raw_opts()} | {socks, socks_opts()}. -export_type([protocol/0]). -type protocols() :: [protocol()]. @@ -141,6 +142,7 @@ event_handler => {module(), any()}, http_opts => http_opts(), http2_opts => http2_opts(), + http3_opts => http3_opts(), protocols => protocols(), raw_opts => raw_opts(), retry => non_neg_integer(), @@ -153,7 +155,7 @@ tls_handshake_timeout => timeout(), tls_opts => [ssl:tls_client_option()], trace => boolean(), - transport => tcp | tls | ssl, + transport => tcp | tls | ssl | quic, ws_opts => ws_opts() }. -export_type([opts/0]). @@ -252,6 +254,11 @@ }. -export_type([http2_opts/0]). +%% @todo +-type http3_opts() :: #{ +}. +-export_type([http3_opts/0]). + -type socks_opts() :: #{ version => 5, auth => [{username_password, binary(), binary()} | none], @@ -391,6 +398,11 @@ check_options([{http2_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) -> Error -> Error end; +check_options([{http3_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) -> + case gun_http3:check_options(ProtoOpts) of + ok -> + check_options(Opts) + end; check_options([Opt = {protocols, L}|Opts]) when is_list(L) -> case check_protocols_opt(L) of ok -> check_options(Opts); @@ -428,7 +440,7 @@ check_options([{tls_opts, L}|Opts]) when is_list(L) -> check_options(Opts); check_options([{trace, B}|Opts]) when is_boolean(B) -> check_options(Opts); -check_options([{transport, T}|Opts]) when T =:= tcp; T =:= tls -> +check_options([{transport, T}|Opts]) when T =:= tcp; T =:= tls; T =:= quic -> check_options(Opts); check_options([{ws_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) -> case gun_ws:check_options(ProtoOpts) of @@ -442,9 +454,9 @@ check_options([Opt|_]) -> check_protocols_opt(Protocols) -> %% Protocols must not appear more than once, and they - %% must be one of http, http2 or socks. + %% must be one of http, http2, http3, raw or socks. ProtoNames0 = lists:usort([case P0 of {P, _} -> P; P -> P end || P0 <- Protocols]), - ProtoNames = [P || P <- ProtoNames0, lists:member(P, [http, http2, raw, socks])], + ProtoNames = [P || P <- ProtoNames0, lists:member(P, [http, http2, http3, raw, socks])], case length(Protocols) =:= length(ProtoNames) of false -> error; true -> @@ -453,6 +465,7 @@ check_protocols_opt(Protocols) -> TupleCheck = [case P of {http, Opts} -> gun_http:check_options(Opts); {http2, Opts} -> gun_http2:check_options(Opts); + {http3, Opts} -> gun_http3:check_options(Opts); {raw, Opts} -> gun_raw:check_options(Opts); {socks, Opts} -> gun_socks:check_options(Opts) end || P <- Protocols, is_tuple(P)], @@ -468,6 +481,7 @@ consider_tracing(ServerPid, #{trace := true}) -> _ = dbg:tpl(gun, [{'_', [], [{return_trace}]}]), _ = dbg:tpl(gun_http, [{'_', [], [{return_trace}]}]), _ = dbg:tpl(gun_http2, [{'_', [], [{return_trace}]}]), + _ = dbg:tpl(gun_http3, [{'_', [], [{return_trace}]}]), _ = dbg:tpl(gun_raw, [{'_', [], [{return_trace}]}]), _ = dbg:tpl(gun_socks, [{'_', [], [{return_trace}]}]), _ = dbg:tpl(gun_ws, [{'_', [], [{return_trace}]}]), @@ -495,6 +509,7 @@ info(ServerPid) -> Info0 = #{ owner => Owner, socket => Socket, + %% @todo This is no longer correct for https because of QUIC. transport => case OriginScheme of <<"http">> -> tcp; <<"https">> -> tls @@ -818,7 +833,7 @@ await_body(ServerPid, StreamRef, Timeout, MRef, Acc) -> end. -spec await_up(pid()) - -> {ok, http | http2 | raw | socks} + -> {ok, http | http2 | http3 | raw | socks} | {error, {down, any()} | timeout}. await_up(ServerPid) -> MRef = monitor(process, ServerPid), @@ -827,7 +842,7 @@ await_up(ServerPid) -> Res. -spec await_up(pid(), reference() | timeout()) - -> {ok, http | http2 | raw | socks} + -> {ok, http | http2 | http3 | raw | socks} | {error, {down, any()} | timeout}. await_up(ServerPid, MRef) when is_reference(MRef) -> await_up(ServerPid, 5000, MRef); @@ -838,7 +853,7 @@ await_up(ServerPid, Timeout) -> Res. -spec await_up(pid(), timeout(), reference()) - -> {ok, http | http2 | raw | socks} + -> {ok, http | http2 | http3 | raw | socks} | {error, {down, any()} | timeout}. await_up(ServerPid, Timeout, MRef) -> receive @@ -974,7 +989,8 @@ init({Owner, Host, Port, Opts}) -> %% This is corrected in the gun:info/1 and gun:stream_info/2 functions where applicable. {OriginScheme, Transport} = case OriginTransport of tcp -> {<<"http">>, gun_tcp}; - tls -> {<<"https">>, gun_tls} + tls -> {<<"https">>, gun_tls}; + quic -> {<<"https">>, gun_quicer} end, OwnerRef = monitor(process, Owner), {EvHandler, EvHandlerState0} = maps:get(event_handler, Opts, @@ -1061,6 +1077,38 @@ domain_lookup({call, From}, {stream_info, _}, _) -> domain_lookup(Type, Event, State) -> handle_common(Type, Event, ?FUNCTION_NAME, State). +connecting(_, {retries, Retries, LookupInfo}, State=#state{opts=Opts, + transport=gun_quicer, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + %% @todo We are doing the TLS handshake at the same time, + %% we cannot separate it from the connection. Fire events. + ConnectTimeout = maps:get(connect_timeout, Opts, infinity), + ConnectEvent = #{ + lookup_info => LookupInfo, + timeout => ConnectTimeout + }, + EvHandlerState1 = EvHandler:connect_start(ConnectEvent, EvHandlerState0), + case gun_quicer:connect(LookupInfo, ConnectTimeout) of + {ok, Socket} -> + %% @todo We should double check the ALPN result. + [Protocol] = maps:get(protocols, Opts, [http3]), + ProtocolName = case Protocol of + {P, _} -> P; + P -> P + end, + EvHandlerState = EvHandler:connect_end(ConnectEvent#{ + socket => Socket, + protocol => ProtocolName + }, EvHandlerState1), + {next_state, connected_protocol_init, + State#state{event_handler_state=EvHandlerState}, + {next_event, internal, {connected, Retries, Socket, Protocol}}}; + {error, Reason} -> + EvHandlerState = EvHandler:connect_end(ConnectEvent#{ + error => Reason + }, EvHandlerState1), + {next_state, not_connected, State#state{event_handler_state=EvHandlerState}, + {next_event, internal, {retries, Retries, Reason}}} + end; connecting(_, {retries, Retries, LookupInfo}, State=#state{opts=Opts, transport=Transport, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> ConnectTimeout = maps:get(connect_timeout, Opts, infinity), @@ -1100,6 +1148,7 @@ connecting(_, {retries, Retries, LookupInfo}, State=#state{opts=Opts, initial_tls_handshake(_, {retries, Retries, Socket}, State0=#state{opts=Opts, origin_host=OriginHost}) -> Protocols = maps:get(protocols, Opts, [http2, http]), HandshakeEvent = #{ + %% @todo This results in ensure_tls_opts being called twice. tls_opts => ensure_tls_opts(Protocols, maps:get(tls_opts, Opts, []), OriginHost), timeout => maps:get(tls_handshake_timeout, Opts, infinity) }, @@ -1453,13 +1502,22 @@ handle_common_connected(Type, Event, StateName, StateData) -> handle_common_connected_no_input(Type, Event, StateName, StateData). %% Socket events. +handle_common_connected_no_input(info, Msg, _, State=#state{ + protocol=Protocol=gun_http3, protocol_state=ProtoState, cookie_store=CookieStore0, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) + when element(1, Msg) =:= quic -> +% ct:pal("~p", [Msg]), + {Commands, CookieStore, EvHandlerState} = Protocol:handle(Msg, + ProtoState, CookieStore0, EvHandler, EvHandlerState0), + maybe_active(commands(Commands, State#state{cookie_store=CookieStore, + event_handler_state=EvHandlerState})); handle_common_connected_no_input(info, {OK, Socket, Data}, _, - State0=#state{socket=Socket, messages={OK, _, _}, + State=#state{socket=Socket, messages={OK, _, _}, protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {Commands, CookieStore, EvHandlerState} = Protocol:handle(Data, ProtoState, CookieStore0, EvHandler, EvHandlerState0), - maybe_active(commands(Commands, State0#state{cookie_store=CookieStore, + maybe_active(commands(Commands, State#state{cookie_store=CookieStore, event_handler_state=EvHandlerState})); handle_common_connected_no_input(info, {Closed, Socket}, _, State=#state{socket=Socket, messages={_, Closed, _}}) -> @@ -1575,6 +1633,8 @@ maybe_active(Other) -> active(State=#state{active=false}) -> {ok, State}; +active(State=#state{transport=gun_quicer}) -> + {ok, State}; active(State=#state{socket=Socket, transport=Transport}) -> case Transport:setopts(Socket, [{active, once}]) of ok -> diff --git a/src/gun_http.erl b/src/gun_http.erl index 58f4ed6..14edfeb 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -69,7 +69,7 @@ socket :: inet:socket() | ssl:sslsocket(), transport :: module(), opts = #{} :: gun:http_opts(), - version = 'HTTP/1.1' :: cow_http:version(), + version = 'HTTP/1.1' :: cow_http1:version(), connection = keepalive :: keepalive | close, buffer = <<>> :: binary(), diff --git a/src/gun_http2.erl b/src/gun_http2.erl index bfd2d31..a1ccef6 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -110,7 +110,7 @@ %% by the client or by the server through PUSH_PROMISE frames. %% %% Streams can be found by ID or by Ref. The most common should be - %% the idea, that's why the main map has the ID as key. Then we also + %% the ID, that's why the main map has the ID as key. Then we also %% have a Ref->ID index for faster lookup when we only have the Ref. streams = #{} :: #{cow_http2:streamid() => #stream{}}, stream_refs = #{} :: #{reference() => cow_http2:streamid()}, @@ -1074,6 +1074,8 @@ prepare_headers(State=#http2_state{transport=Transport}, end, %% @todo We also must remove any header found in the connection header. %% @todo Much of this is duplicated in cow_http2_machine; sort things out. + %% I think we want to do this before triggering events, not when + %% building HeaderBlock. Headers1 = lists:keydelete(<<"host">>, 1, lists:keydelete(<<"connection">>, 1, diff --git a/src/gun_http3.erl b/src/gun_http3.erl new file mode 100644 index 0000000..9994186 --- /dev/null +++ b/src/gun_http3.erl @@ -0,0 +1,744 @@ +%% Copyright (c) 2023, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @todo Tunneling has not been implemented for HTTP/3. +-module(gun_http3). + +-export([check_options/1]). +-export([name/0]). +-export([opts_name/0]). +-export([has_keepalive/0]). +-export([default_keepalive/0]). +-export([init/4]). +-export([switch_transport/3]). +-export([handle/5]). +-export([handle_continue/6]). +-export([update_flow/4]). +-export([closing/4]). +-export([close/4]). +-export([keepalive/3]). +-export([headers/12]). +-export([request/13]). +-export([data/7]). +-export([connect/9]). +-export([cancel/5]). +-export([timeout/3]). +-export([stream_info/2]). +-export([down/1]). +-export([ws_upgrade/11]). +-export([ws_send/6]). + +-record(stream, { + %% Stream ID. + id :: non_neg_integer(), + + %% Reference used by the user of Gun to refer to this stream. + %% This may be only a part of a stream_ref() for tunneled streams. + %% For unidirectional streams this is a dummy that is never sent to the user. + ref :: reference(), + + %% Process to send messages to. + reply_to :: undefined | pid(), + + %% Whether the stream is currently in a special state. + status :: header | {unidi, control | encoder | decoder} + | normal | {data, non_neg_integer()} | discard, + + %% Stream buffer. + buffer = <<>> :: binary(), + + %% Request target URI (request stream only). + authority :: undefined | iodata(), + path :: undefined | iodata(), + + %% Content handlers state. + handler_state :: undefined | gun_content_handler:state() +}). + +-record(http3_state, { + reply_to :: pid(), + conn :: gun_quicer:quicer_connection_handle(), + transport :: module(), + opts = #{} :: gun:http2_opts(), + content_handlers :: gun_content_handler:opt(), + + %% HTTP/3 state machine. + http3_machine :: cow_http3_machine:http3_machine(), + + %% Specially handled local unidi streams. + %% @todo Maybe move the control stream to streams map. + local_control_id = undefined :: undefined | cow_http3:stream_id(), + local_encoder_id = undefined :: undefined | cow_http3:stream_id(), + local_decoder_id = undefined :: undefined | cow_http3:stream_id(), + + %% Bidirectional streams used for requests and responses, + %% as well as unidirectional streams initiated by the server. + %% + %% Streams can be found by Ref or by StreamID. The most + %% common should be the StreamID so we use it as key. We also have + %% a Ref->StreamID index for faster lookup when we only have the Ref. + streams = #{} :: #{reference() => #stream{}}, + stream_refs = #{} :: #{reference() => reference()} +}). + +check_options(_) -> + ok. %% @todo + +name() -> http3. +opts_name() -> http3_opts. +has_keepalive() -> true. +default_keepalive() -> infinity. + +init(ReplyTo, Conn, Transport, Opts) -> + Handlers = maps:get(content_handlers, Opts, [gun_data_h]), + {ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(client, Opts), + {ok, ControlID} = Transport:start_unidi_stream(Conn, [<<0>>, SettingsBin]), + {ok, EncoderID} = Transport:start_unidi_stream(Conn, [<<2>>]), + {ok, DecoderID} = Transport:start_unidi_stream(Conn, [<<3>>]), + %% Set the control, encoder and decoder streams in the machine. + HTTP3Machine = cow_http3_machine:init_unidi_local_streams( + ControlID, EncoderID, DecoderID, HTTP3Machine0), + {ok, connected, #http3_state{reply_to=ReplyTo, conn=Conn, transport=Transport, + opts=Opts, content_handlers=Handlers, http3_machine=HTTP3Machine, + local_control_id=ControlID, local_encoder_id=EncoderID, + local_decoder_id=DecoderID}}. + +-spec switch_transport(_, _, _) -> no_return(). + +switch_transport(_Transport, _Conn, _State) -> + error(unimplemented). + +handle(Msg, State0=#http3_state{transport=Transport}, + CookieStore, EvHandler, EvHandlerState) -> + case Transport:handle(Msg) of + {data, StreamID, IsFin, Data} -> + parse(Data, State0, StreamID, IsFin, + CookieStore, EvHandler, EvHandlerState); + {stream_started, StreamID, StreamType} -> + State = stream_new_remote(State0, StreamID, StreamType), + {{state, State}, CookieStore, EvHandlerState}; + {stream_closed, _StreamID, _ErrorCode} -> + %% @todo Clean up the stream. + {{state, State0}, CookieStore, EvHandlerState}; + {stream_peer_send_aborted, StreamID, ErrorCode} -> + Reason = cow_http3:code_to_error(ErrorCode), + {StateOrError, EvHandlerStateRet} = stream_aborted( + State0, StreamID, Reason, EvHandler, EvHandlerState), + {StateOrError, CookieStore, EvHandlerStateRet}; + closed -> + %% @todo Terminate the connection. + {{state, State0}, CookieStore, EvHandlerState}; + ok -> + {{state, State0}, CookieStore, EvHandlerState}; + unknown -> + %% @todo Log a warning. + {{state, State0}, CookieStore, EvHandlerState} + end. + +parse(Data, State, StreamID, IsFin, CookieStore, EvHandler, EvHandlerState) -> + case stream_get(State, StreamID) of + Stream=#stream{buffer= <<>>} -> + parse1(Data, State, Stream, IsFin, + CookieStore, EvHandler, EvHandlerState); + Stream=#stream{buffer=Buffer} -> + %% @todo OK we should only keep the StreamRef forward + %% and update the stream in the state here. + Stream1 = Stream#stream{buffer= <<>>}, + parse1(<>, + stream_update(State, Stream1), Stream1, IsFin, + CookieStore, EvHandler, EvHandlerState); + %% Pending data for a stream that has been reset. Ignore. + error -> + {{state, State}, CookieStore, EvHandlerState} + end. + +parse1(Data, State, Stream=#stream{status=header}, IsFin, + CookieStore, EvHandler, EvHandlerState) -> + parse_unidirectional_stream_header(Data, State, Stream, IsFin, + CookieStore, EvHandler, EvHandlerState); +parse1(Data, State0=#http3_state{http3_machine=HTTP3Machine0}, + #stream{status={unidi, Type}, id=StreamID}, IsFin, + CookieStore, _EvHandler, EvHandlerState) + when Type =:= encoder; Type =:= decoder -> + case cow_http3_machine:unidi_data(Data, IsFin, StreamID, HTTP3Machine0) of + {ok, Instrs, HTTP3Machine} -> + State = send_instructions(State0#http3_state{http3_machine=HTTP3Machine}, Instrs), + {{state, State}, CookieStore, EvHandlerState}; + {error, Error={connection_error, _, _}, HTTP3Machine} -> + {connection_error(State0#http3_state{http3_machine=HTTP3Machine}, Error), + CookieStore, EvHandlerState} + end; +parse1(Data, State, Stream=#stream{status={data, Len}, id=StreamID}, IsFin, + CookieStore, EvHandler, EvHandlerState) -> + DataLen = byte_size(Data), + if + DataLen < Len -> + %% We don't have the full frame but this is the end of the + %% data we have. So FrameIsFin is equivalent to IsFin here. + frame(State, Stream#stream{status={data, Len - DataLen}}, + {data, Data}, IsFin, CookieStore, EvHandler, EvHandlerState); + true -> + <> = Data, + FrameIsFin = is_fin(IsFin, Rest), + case frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin, + CookieStore, EvHandler, EvHandlerState) of + %% @todo {error, _}. + {{state, State1}, CookieStore1, EvHandlerState1} -> + parse(Rest, State1, StreamID, IsFin, + CookieStore1, EvHandler, EvHandlerState1) + end + end; +%% @todo Clause that discards receiving data for aborted streams. +parse1(Data, State, Stream=#stream{id=StreamID}, IsFin, + CookieStore, EvHandler, EvHandlerState) -> + case cow_http3:parse(Data) of + {ok, Frame, Rest} -> + FrameIsFin = is_fin(IsFin, Rest), + case frame(State, Stream, Frame, FrameIsFin, + CookieStore, EvHandler, EvHandlerState) of + %% @todo {error, _}. + {{state, State1}, CookieStore1, EvHandlerState1} -> + parse(Rest, State1, StreamID, IsFin, + CookieStore1, EvHandler, EvHandlerState1) + end; + {more, Frame = {data, _}, Len} -> + %% We're at the end of the data so FrameIsFin is equivalent to IsFin. + case IsFin of + nofin -> + frame(State, Stream#stream{status={data, Len}}, Frame, nofin, + CookieStore, EvHandler, EvHandlerState); + fin -> + {connection_error(State, {connection_error, h3_frame_error, + 'Last frame on stream was truncated. (RFC9114 7.1)'}), + CookieStore, EvHandlerState} + end; + %% @todo {more, ignore, Len} + {ignore, Rest} -> + case ignored_frame(State, Stream) of + {state, State1} -> + parse(Rest, State1, StreamID, IsFin, + CookieStore, EvHandler, EvHandlerState) + end; + Error = {connection_error, _, _} -> + {connection_error(State, Error), CookieStore, EvHandlerState}; + more when Data =:= <<>> -> + {{state, stream_update(State, Stream#stream{buffer=Data})}, + CookieStore, EvHandlerState}; + more -> + %% We're at the end of the data so FrameIsFin is equivalent to IsFin. + case IsFin of + nofin -> + {{state, stream_update(State, Stream#stream{buffer=Data})}, + CookieStore, EvHandlerState}; + fin -> + {connection_error(State, {connection_error, h3_frame_error, + 'Last frame on stream was truncated. (RFC9114 7.1)'}), + CookieStore, EvHandlerState} + end + end. + +%% We may receive multiple frames in a single QUIC packet. +%% The FIN flag applies to the QUIC packet, not to the frame. +%% We must therefore only consider the frame to have a FIN +%% flag if there's no data remaining to be read. +is_fin(fin, <<>>) -> fin; +is_fin(_, _) -> nofin. + +parse_unidirectional_stream_header(Data, State0=#http3_state{http3_machine=HTTP3Machine0}, + Stream0=#stream{id=StreamID}, IsFin, CookieStore, EvHandler, EvHandlerState) -> + case cow_http3:parse_unidi_stream_header(Data) of + {ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder -> + case cow_http3_machine:set_unidi_remote_stream_type( + StreamID, Type, HTTP3Machine0) of + {ok, HTTP3Machine} -> + State = State0#http3_state{http3_machine=HTTP3Machine}, + Stream = Stream0#stream{status={unidi, Type}}, + parse(Rest, stream_update(State, Stream), StreamID, IsFin, + CookieStore, EvHandler, EvHandlerState); + {error, Error={connection_error, _, _}, HTTP3Machine} -> + {connection_error(State0#http3_state{http3_machine=HTTP3Machine}, Error), + CookieStore, EvHandlerState} + end; +%% @todo Implement server push. +%% Note that we shouldn't receive this frame until we set MAX_PUSH_ID. +% {ok, push, _} -> +% {connection_error(State0, {connection_error, h3_stream_creation_error, +% 'Only servers can push. (RFC9114 6.2.2)'}), +% CookieStore, EvHandlerState}; + %% Unknown stream types must be ignored. We choose to abort the + %% stream instead of reading and discarding the incoming data. + {undefined, _} -> + {{state, (stream_abort_receive(State0, Stream0, h3_stream_creation_error))}, + CookieStore, EvHandlerState} + end. + +%% @todo Cookie/events. +frame(State=#http3_state{http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID}, Frame, IsFin, + CookieStore, EvHandler, EvHandlerState) -> + case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of + {ok, HTTP3Machine} -> + {{state, State#http3_state{http3_machine=HTTP3Machine}}, + CookieStore, EvHandlerState}; + {ok, {data, Data}, HTTP3Machine} -> + data_frame(State#http3_state{http3_machine=HTTP3Machine}, Stream, IsFin, Data, + CookieStore, EvHandler, EvHandlerState); + {ok, {headers, Headers, PseudoHeaders, BodyLen}, Instrs, HTTP3Machine} -> + headers_frame( + send_instructions(State#http3_state{http3_machine=HTTP3Machine}, Instrs), + Stream, IsFin, Headers, PseudoHeaders, BodyLen, + CookieStore, EvHandler, EvHandlerState); + {ok, {trailers, Trailers}, Instrs, HTTP3Machine} -> + {StateOrError, EvHandlerStateRet} = trailers_frame( + send_instructions(State#http3_state{http3_machine=HTTP3Machine}, Instrs), + Stream, Trailers, EvHandler, EvHandlerState), + {StateOrError, CookieStore, EvHandlerStateRet}; + {ok, GoAway={goaway, _}, HTTP3Machine} -> + goaway(State#http3_state{http3_machine=HTTP3Machine}, GoAway); + {error, Error={stream_error, _Reason, _Human}, Instrs, HTTP3Machine} -> + State1 = send_instructions(State#http3_state{http3_machine=HTTP3Machine}, Instrs), + reset_stream(State1, StreamID, Error); + {error, Error={connection_error, _, _}, HTTP3Machine} -> + {connection_error(State#http3_state{http3_machine=HTTP3Machine}, Error), + CookieStore, EvHandlerState} + end. + +data_frame(State0, Stream, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) -> + case Stream of + #stream{} -> %tunnel=undefined} -> + {StateOrError, EvHandlerState} = data_frame1(State0, + Stream, IsFin, Data, EvHandler, EvHandlerState0), + {StateOrError, CookieStore0, EvHandlerState}%; +% Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> +%% %% @todo What about IsFin? +% {Commands, CookieStore, EvHandlerState1} = Proto:handle(Data, +% ProtoState0, CookieStore0, EvHandler, EvHandlerState0), +% %% The frame/parse functions only handle state or error commands. +% {ResCommands, EvHandlerState} = tunnel_commands(Commands, +% Stream, State0, EvHandler, EvHandlerState1), +% {ResCommands, CookieStore, EvHandlerState} + end. + +data_frame1(State0, Stream=#stream{ref=StreamRef, reply_to=ReplyTo, + %flow=Flow0, + handler_state=Handlers0}, IsFin, Data, EvHandler, EvHandlerState0) -> + {ok, _Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), +% Flow = case Flow0 of +% infinity -> infinity; +% _ -> Flow0 - Dec +% end, + State1 = stream_update(State0, Stream#stream{%flow=Flow, + handler_state=Handlers}), + {StateOrError, EvHandlerState} = case IsFin of + fin -> + EvHandlerState1 = EvHandler:response_end(#{ + stream_ref => StreamRef, %% @todo stream_ref(State1, StreamRef), + reply_to => ReplyTo + }, EvHandlerState0), + {{state, State1}, EvHandlerState1}; + nofin -> + {{state, State1}, EvHandlerState0} + end, + case StateOrError of + {state, State} -> + %% We do not remove the stream immediately. We will only when + %% the QUIC stream gets closed. + {{state, State}, EvHandlerState}%; +% Error={error, _} -> +% %% @todo Delete stream and return new state and error commands. +% {Error, EvHandlerState} + end. + +headers_frame(State0=#http3_state{opts=Opts}, Stream, IsFin, Headers, + #{status := Status}, _BodyLen, CookieStore0, EvHandler, EvHandlerState0) -> + #stream{ + authority=Authority, + path=Path%, +% tunnel=Tunnel + } = Stream, + CookieStore = gun_cookies:set_cookie_header(<<"https">>, + Authority, Path, Status, Headers, CookieStore0, Opts), + {StateOrError, EvHandlerState} = if + Status >= 100, Status =< 199 -> + headers_frame_inform(State0, Stream, Status, Headers, EvHandler, EvHandlerState0); +% Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested, IsFin =:= nofin -> +% headers_frame_connect(State0, Stream, Status, Headers, EvHandler, EvHandlerState0); + true -> + headers_frame_response(State0, Stream, IsFin, Status, Headers, EvHandler, EvHandlerState0) + end, + {StateOrError, CookieStore, EvHandlerState}. + +headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo}, + Status, Headers, EvHandler, EvHandlerState0) -> + RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef), + ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, + EvHandlerState = EvHandler:response_inform(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + {{state, State}, EvHandlerState}. + +headers_frame_response(State=#http3_state{content_handlers=Handlers0}, + Stream=#stream{ref=StreamRef, reply_to=ReplyTo}, + IsFin, Status, Headers, EvHandler, EvHandlerState0) -> + RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef), + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, + EvHandlerState1 = EvHandler:response_headers(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + {Handlers, EvHandlerState} = case IsFin of + fin -> + EvHandlerState2 = EvHandler:response_end(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, EvHandlerState1), + {undefined, EvHandlerState2}; + nofin -> + {gun_content_handler:init(ReplyTo, RealStreamRef, + Status, Headers, Handlers0), EvHandlerState1} + end, + %% @todo Uncomment when tunnel is added. + %% We disable the tunnel, if any, when receiving any non 2xx response. + %% + %% We do not remove the stream immediately. We will only when + %% the QUIC stream gets closed. + {{state, stream_update(State, + Stream#stream{handler_state=Handlers})},%, tunnel=undefined})}, + EvHandlerState}. + +trailers_frame(State, #stream{ref=StreamRef, reply_to=ReplyTo}, + Trailers, EvHandler, EvHandlerState0) -> + %% @todo We probably want to pass this to gun_content_handler? + RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef), + ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, + ResponseEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0), + EvHandlerState = EvHandler:response_end(ResponseEvent, EvHandlerState1), + %% We do not remove the stream immediately. We will only when + %% the QUIC stream gets closed. + {{state, State}, EvHandlerState}. + +-spec goaway(_, _) -> no_return(). + +goaway(_State, _GoAway) -> + error(todo). + +-spec reset_stream(_, _, _) -> no_return(). + +reset_stream(_State, _StreamID, _Error) -> + error(todo). + +-spec ignored_frame(_, _) -> no_return(). + +%% @todo Cookie/events. +ignored_frame(_State, _Stream) -> + error(todo). + +stream_abort_receive(State=#http3_state{conn=Conn, transport=Transport}, + Stream=#stream{id=StreamID}, Reason) -> + Transport:shutdown_stream(Conn, StreamID, receiving, cow_http3:error_to_code(Reason)), + stream_update(State, Stream#stream{status=discard}). + +-spec connection_error(_, _) -> no_return(). + +connection_error(_State, _Error) -> + error(todo). + +-spec handle_continue(_, _, _, _, _, _) -> no_return(). + +handle_continue(_ContinueStreamRef, _Msg, _State, _CookieStore, _EvHandler, _EvHandlerState) -> + error(unimplemented). + +-spec update_flow(_, _, _, _) -> no_return(). + +update_flow(_State, _ReplyTo, _StreamRef, _Inc) -> + error(unimplemented). + +closing(_Reason, _State, _, EvHandlerState) -> + %% @todo Implement graceful shutdown. + {[], EvHandlerState}. + +close(_Reason, _State, _, EvHandlerState) -> + %% @todo Implement. + EvHandlerState. + +-spec keepalive(_, _, _) -> no_return(). + +keepalive(_State, _, _EvHandlerState) -> + error(todo). + +headers(State0=#http3_state{conn=Conn, transport=Transport, + http3_machine=HTTP3Machine0}, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers0, _InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> + {ok, StreamID} = Transport:start_bidi_stream(Conn), + HTTP3Machine1 = cow_http3_machine:init_bidi_stream(StreamID, + iolist_to_binary(Method), HTTP3Machine0), + {ok, PseudoHeaders, Headers, CookieStore} = prepare_headers( + Method, Host, Port, Path, Headers0, CookieStore0), + Authority = maps:get(authority, PseudoHeaders), + RealStreamRef = StreamRef, %% @todo stream_ref(State0, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => Method, + authority => Authority, + path => Path, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), + {ok, nofin, HeaderBlock, Instrs, HTTP3Machine} = cow_http3_machine:prepare_headers( + StreamID, HTTP3Machine1, nofin, PseudoHeaders, Headers), + State1 = send_instructions(State0#http3_state{http3_machine=HTTP3Machine}, Instrs), + ok = Transport:send(Conn, StreamID, cow_http3:headers(HeaderBlock)), + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), + %% @todo InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + status=normal, authority=Authority, path=Path}, + State = stream_new_local(State1, Stream), + {{state, State}, CookieStore, EvHandlerState}. + +%% @todo We need to configure the initial flow control for the stream. +request(State0=#http3_state{conn=Conn, transport=Transport, + http3_machine=HTTP3Machine0}, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers0, Body, _InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> + Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, + {<<"content-length">>, integer_to_binary(iolist_size(Body))}), + %% @todo InitialFlow = initial_flow(InitialFlow0, Opts), + {ok, StreamID} = Transport:start_bidi_stream(Conn), + HTTP3Machine1 = cow_http3_machine:init_bidi_stream(StreamID, + iolist_to_binary(Method), HTTP3Machine0), + {ok, PseudoHeaders, Headers, CookieStore} = prepare_headers( + Method, Host, Port, Path, Headers1, CookieStore0), + Authority = maps:get(authority, PseudoHeaders), + RealStreamRef = StreamRef, %% @todo stream_ref(State0, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => Method, + authority => Authority, + path => Path, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), + {ok, fin, HeaderBlock, Instrs, HTTP3Machine} = cow_http3_machine:prepare_headers( + StreamID, HTTP3Machine1, fin, PseudoHeaders, Headers), + State1 = send_instructions(State0#http3_state{http3_machine=HTTP3Machine}, Instrs), + %% @todo Handle send errors. + ok = Transport:send(Conn, StreamID, [ + cow_http3:headers(HeaderBlock), + %% Only send a DATA frame if we have a body. + case iolist_size(Body) of + 0 -> <<>>; + _ -> cow_http3:data(Body) + end + ], fin), + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + status=normal, authority=Authority, path=Path}, + State = stream_new_local(State1, Stream), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)}. + +prepare_headers(Method, Host0, Port, Path, Headers0, CookieStore0) -> + Scheme = <<"https">>, + Authority = case lists:keyfind(<<"host">>, 1, Headers0) of + {_, Host} -> Host; + _ -> gun_http:host_header(tls, Host0, Port) + end, + %% @todo We also must remove any header found in the connection header. + %% @todo Much of this is duplicated in cow_http2, cow_http2_machine + %% and cow_http3_machine; sort things out. + Headers1 = + lists:keydelete(<<"host">>, 1, + lists:keydelete(<<"connection">>, 1, + lists:keydelete(<<"keep-alive">>, 1, + lists:keydelete(<<"proxy-connection">>, 1, + lists:keydelete(<<"transfer-encoding">>, 1, + lists:keydelete(<<"upgrade">>, 1, Headers0)))))), + {Headers, CookieStore} = gun_cookies:add_cookie_header( + Scheme, Authority, Path, Headers1, CookieStore0), + PseudoHeaders = #{ + method => Method, + scheme => Scheme, + authority => Authority, + path => Path + }, + {ok, PseudoHeaders, Headers, CookieStore}. + +%% @todo We would open unidi streams here if we only open on-demand. +%% No instructions. +send_instructions(State, undefined) -> + State; +%% Decoder instructions. +send_instructions(State=#http3_state{conn=Conn, transport=Transport, + local_decoder_id=DecoderID}, {decoder_instructions, DecData}) -> + %% @todo Handle send errors. + ok = Transport:send(Conn, DecoderID, DecData), + State; +%% Encoder instructions. +send_instructions(State=#http3_state{conn=Conn, transport=Transport, + local_encoder_id=EncoderID}, {encoder_instructions, EncData}) -> + %% @todo Handle send errors. + ok = Transport:send(Conn, EncoderID, EncData), + State. + +data(State=#http3_state{conn=Conn, transport=Transport}, StreamRef, _ReplyTo, IsFin, Data, + _EvHandler, EvHandlerState) when is_reference(StreamRef) -> + case stream_get_by_ref(State, StreamRef) of + #stream{id=StreamID} -> %, tunnel=Tunnel} -> + ok = Transport:send(Conn, StreamID, cow_http3:data(Data), IsFin), + {[], EvHandlerState} %% @todo Probably wrong, need to update/keep states? +%% @todo +% case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of +% {ok, fin, _} -> +% error_stream_closed(State, StreamRef, ReplyTo), +% {[], EvHandlerState}; +% {ok, _, fin} -> +% error_stream_closed(State, StreamRef, ReplyTo), +% {[], EvHandlerState}; +% {ok, _, _} when Tunnel =:= undefined -> +% maybe_send_data(State, +% StreamID, IsFin, Data, EvHandler, EvHandlerState); +% {ok, _, _} -> +% #tunnel{protocol=Proto, protocol_state=ProtoState0} = Tunnel, +% {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef, +% ReplyTo, IsFin, Data, EvHandler, EvHandlerState), +% tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1) +% end%; +%% @todo +% error -> +% error_stream_not_found(State, StreamRef, ReplyTo), +% {[], EvHandlerState} + end. + +-spec connect(_, _, _, _, _, _, _, _, _) -> no_return(). + +connect(_State, StreamRef, _ReplyTo, _Destination, _TunnelInfo, _Headers0, + _InitialFlow0, _EvHandler, _EvHandlerState0) when is_reference(StreamRef) -> + error(unimplemented). + +-spec cancel(_, _, _, _, _) -> no_return(). + +cancel(_State, StreamRef, _ReplyTo, _EvHandler, _EvHandlerState0) + when is_reference(StreamRef) -> + error(unimplemented). + +-spec timeout(_, _, _) -> no_return(). + +timeout(_State, _Timeout, _TRef) -> + error(todo). + +-spec stream_info(_, _) -> no_return(). + +stream_info(_State, StreamRef) when is_reference(StreamRef) -> + error(unimplemented). + +-spec down(_) -> no_return(). + +down(_State) -> + error(todo). + +-spec ws_upgrade(_, _, _, _, _, _, _, _, _, _, _) -> no_return(). + +ws_upgrade(_State, StreamRef, _ReplyTo, + _Host, _Port, _Path, _Headers0, _WsOpts, + _CookieStore0, _EvHandler, _EvHandlerState0) + when is_reference(StreamRef) -> + error(todo). + +-spec ws_send(_, _, _, _, _, _) -> no_return(). + +ws_send(_Frames, _State, _RealStreamRef, _ReplyTo, _EvHandler, _EvHandlerState0) -> + error(todo). + +%% Streams. + +stream_get(#http3_state{streams=Streams}, StreamID) -> + maps:get(StreamID, Streams, error). + +stream_get_by_ref(State=#http3_state{stream_refs=Refs}, StreamRef) -> + case maps:get(StreamRef, Refs, error) of + error -> error; + StreamID -> stream_get(State, StreamID) + end. + +stream_new_remote(State=#http3_state{http3_machine=HTTP3Machine0, + streams=Streams, stream_refs=Refs}, StreamID, StreamType) -> + %% All remote streams to the client are expected to be unidirectional. + %% @todo Handle errors instead of crashing. + unidi = StreamType, + HTTP3Machine = cow_http3_machine:init_unidi_stream(StreamID, + unidi_remote, HTTP3Machine0), + StreamRef = make_ref(), + Stream = #stream{id=StreamID, ref=StreamRef, status=header}, + State#http3_state{ + http3_machine=HTTP3Machine, + streams=Streams#{StreamID => Stream}, + stream_refs=Refs#{StreamRef => StreamID} + }. + +stream_new_local(State=#http3_state{streams=Streams, stream_refs=Refs}, + Stream=#stream{id=StreamID, ref=StreamRef}) -> + State#http3_state{ + streams=Streams#{StreamID => Stream}, + stream_refs=Refs#{StreamRef => StreamID} + }. + +stream_update(State=#http3_state{streams=Streams}, + Stream=#stream{id=StreamID}) -> + State#http3_state{ + streams=Streams#{StreamID => Stream} + }. + +stream_aborted(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> + case stream_take(State0, StreamID) of + {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> + ReplyTo ! {gun_error, self(), StreamRef, %% @todo stream_ref(State0, StreamRef), + {stream_error, Reason, 'Stream reset by server.'}}, + EvHandlerState = EvHandler:cancel(#{ + stream_ref => StreamRef, %% @todo stream_ref(State, StreamRef), + reply_to => ReplyTo, + endpoint => remote, + reason => Reason + }, EvHandlerState0), + {{state, State}, EvHandlerState}; + error -> + {{state, State0}, EvHandlerState0} + end. + +stream_take(State=#http3_state{streams=Streams0, stream_refs=Refs}, StreamID) -> + case maps:take(StreamID, Streams0) of + {Stream=#stream{ref=StreamRef}, Streams} -> + {Stream, State#http3_state{ + streams=Streams, + stream_refs=maps:remove(StreamRef, Refs) + }}; + error -> + error + end. diff --git a/src/gun_protocols.erl b/src/gun_protocols.erl index 4232e2f..e7e0a8d 100644 --- a/src/gun_protocols.erl +++ b/src/gun_protocols.erl @@ -34,6 +34,8 @@ handler(http) -> gun_http; handler({http, _}) -> gun_http; handler(http2) -> gun_http2; handler({http2, _}) -> gun_http2; +handler(http3) -> gun_http3; +handler({http3, _}) -> gun_http3; handler(raw) -> gun_raw; handler({raw, _}) -> gun_raw; handler(socks) -> gun_socks; diff --git a/src/gun_quicer.erl b/src/gun_quicer.erl new file mode 100644 index 0000000..f12d135 --- /dev/null +++ b/src/gun_quicer.erl @@ -0,0 +1,283 @@ +%% Copyright (c) 2023, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_quicer). + +-export([name/0]). +-export([messages/0]). +-export([connect/2]). +-export([sockname/1]). +-export([close/1]). + +-export([start_bidi_stream/1]). +-export([start_unidi_stream/2]). +-export([send/3]). +-export([send/4]). +-export([shutdown_stream/4]). +-export([handle/1]). + +%% @todo Make quicer export this type. +-type quicer_connection_handle() :: reference(). +-export_type([quicer_connection_handle/0]). + +-ifndef(GUN_QUICER). + +-spec name() -> no_return(). +name() -> no_quicer(). + +-spec messages() -> no_return(). +messages() -> no_quicer(). + +-spec connect(_, _) -> no_return(). +connect(_, _) -> no_quicer(). + +-spec sockname(_) -> no_return(). +sockname(_) -> no_quicer(). + +-spec close(_) -> no_return(). +close(_) -> no_quicer(). + +-spec start_bidi_stream(_) -> no_return(). +start_bidi_stream(_) -> no_quicer(). + +-spec start_unidi_stream(_, _) -> no_return(). +start_unidi_stream(_, _) -> no_quicer(). + +-spec send(_, _, _) -> no_return(). +send(_, _, _) -> no_quicer(). + +-spec send(_, _, _, _) -> no_return(). +send(_, _, _, _) -> no_quicer(). + +-spec shutdown_stream(_, _, _, _) -> no_return(). +shutdown_stream(_, _, _, _) -> no_quicer(). + +-spec handle(_) -> no_return(). +handle(_) -> no_quicer(). + +-spec no_quicer() -> no_return(). +no_quicer() -> + error({no_quicer, + "Cowboy must be compiled with environment variable COWBOY_QUICER=1 " + "or with compilation flag -D COWBOY_QUICER=1 in order to enable " + "QUIC support using the emqx/quic NIF"}). + +-else. + +%% @todo Make quicer export this type. +-type quicer_app_errno() :: non_neg_integer(). + +-include_lib("quicer/include/quicer.hrl"). + +-spec name() -> quic. + +name() -> quic. + +-spec messages() -> {quic, quic, quic}. + +%% Quicer messages aren't compatible with gen_tcp/ssl. +messages() -> {quic, quic, quic}. + +connect(#{ip_addresses := IPs, port := Port, tcp_opts := _Opts}, Timeout) -> + Timer = inet:start_timer(Timeout), + %% @todo We must not disable security by default. + QuicOpts = #{ + alpn => ["h3"], + peer_unidi_stream_count => 3, + verify => none + }, %% @todo We need quic_opts not tcp_opts. + Res = try + try_connect(IPs, Port, QuicOpts, Timer, {error, einval}) + after + _ = inet:stop_timer(Timer) + end, + case Res of + {ok, Conn} -> {ok, Conn}; + Error -> maybe_exit(Error) + end. + +-dialyzer({nowarn_function, try_connect/5}). + +try_connect([IP|IPs], Port, Opts, Timer, _) -> + Timeout = inet:timeout(Timer), + case quicer:connect(IP, Port, Opts, Timeout) of + {ok, Conn} -> + {ok, Conn}; + {error, Reason} -> + {error, Reason}; + {error, transport_down, #{error := 2, status := connection_refused}} -> + timer:sleep(1), + try_connect([IP|IPs], Port, Opts, Timer, {error, einval}); + {error, Reason, Flags} -> + try_connect(IPs, Port, Opts, Timer, {error, {Reason, Flags}}) + end; +try_connect([], _, _, _, Error) -> + Error. + +-dialyzer({nowarn_function, maybe_exit/1}). + +maybe_exit({error, einval}) -> exit(badarg); +maybe_exit({error, eaddrnotavail}) -> exit(badarg); +maybe_exit(Error) -> Error. + +-spec sockname(quicer_connection_handle()) + -> {ok, {inet:ip_address(), inet:port_number()}} + | {error, any()}. + +sockname(Conn) -> + quicer:sockname(Conn). + +-spec close(quicer_connection_handle()) -> ok. + +close(Conn) -> + quicer:close_connection(Conn). + +-spec start_bidi_stream(quicer_connection_handle()) + -> {ok, cow_http3:stream_id()} + | {error, any()}. + +%% We cannot send data immediately because we need the +%% StreamID in order to compress the headers. +start_bidi_stream(Conn) -> + case quicer:start_stream(Conn, #{active => true}) of + {ok, StreamRef} -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + put({quicer_stream, StreamID}, StreamRef), + {ok, StreamID}; + {error, Reason1, Reason2} -> + {error, {Reason1, Reason2}}; + Error -> + Error + end. + +-spec start_unidi_stream(quicer_connection_handle(), iodata()) + -> {ok, cow_http3:stream_id()} + | {error, any()}. + +%% Function copied from cowboy_quicer. +start_unidi_stream(Conn, HeaderData) -> + case quicer:start_stream(Conn, #{ + active => true, + open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}) of + {ok, StreamRef} -> + case quicer:send(StreamRef, HeaderData) of + {ok, _} -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + put({quicer_stream, StreamID}, StreamRef), + {ok, StreamID}; + Error -> + Error + end; + {error, Reason1, Reason2} -> + {error, {Reason1, Reason2}}; + Error -> + Error + end. + +-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata()) + -> ok | {error, any()}. + +send(Conn, StreamID, Data) -> + send(Conn, StreamID, Data, nofin). + +-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata(), cow_http:fin()) + -> ok | {error, any()}. + +send(_Conn, StreamID, Data, nofin) -> + Len = iolist_size(Data), + StreamRef = get({quicer_stream, StreamID}), + {ok, Len} = quicer:send(StreamRef, Data), + ok; +send(_Conn, StreamID, Data, fin) -> + Len = iolist_size(Data), + StreamRef = get({quicer_stream, StreamID}), + {ok, Len} = quicer:send(StreamRef, Data, ?QUIC_SEND_FLAG_FIN), + ok. + +-spec shutdown_stream(quicer_connection_handle(), + cow_http3:stream_id(), both | receiving, quicer_app_errno()) + -> ok. + +%% Function copied from cowboy_quicer. +shutdown_stream(_Conn, StreamID, Dir, ErrorCode) -> + StreamRef = get({quicer_stream, StreamID}), + _ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity), + ok. + +shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT; +shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE. + +%% @todo Probably should have the Conn given as argument too? +-spec handle({quic, _, _, _}) + -> {data, cow_http3:stream_id(), cow_http:fin(), binary()} + | {stream_started, cow_http3:stream_id(), unidi | bidi} + | {stream_closed, cow_http3:stream_id(), quicer_app_errno()} + | closed + | ok + | unknown + | {socket_error, any()}. + +handle({quic, peer_send_aborted, QStreamRef, ErrorCode}) -> + {ok, StreamID} = quicer:get_stream_id(QStreamRef), + {stream_peer_send_aborted, StreamID, ErrorCode}; +%% Clauses past this point copied from cowboy_quicer. +handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + IsFin = case Flags band ?QUIC_RECEIVE_FLAG_FIN of + ?QUIC_RECEIVE_FLAG_FIN -> fin; + _ -> nofin + end, + {data, StreamID, IsFin, Data}; +%% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED. +handle({quic, new_stream, StreamRef, #{flags := Flags}}) -> + case quicer:setopt(StreamRef, active, true) of + ok -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + put({quicer_stream, StreamID}, StreamRef), + StreamType = case quicer:is_unidirectional(Flags) of + true -> unidi; + false -> bidi + end, + {stream_started, StreamID, StreamType}; + {error, Reason} -> + {socket_error, Reason} + end; +%% QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE. +handle({quic, stream_closed, StreamRef, #{error := ErrorCode}}) -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + {stream_closed, StreamID, ErrorCode}; +%% QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE. +handle({quic, closed, Conn, _Flags}) -> + _ = quicer:close_connection(Conn), + closed; +%% The following events are currently ignored either because +%% I do not know what they do or because we do not need to +%% take action. +handle({quic, streams_available, _Conn, _Props}) -> + ok; +handle({quic, dgram_state_changed, _Conn, _Props}) -> + ok; +%% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT +handle({quic, transport_shutdown, _Conn, _Flags}) -> + ok; +handle({quic, peer_send_shutdown, _StreamRef, undefined}) -> + ok; +handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) -> + ok; +handle({quic, shutdown, _Conn, success}) -> + ok; +handle(_Msg) -> + unknown. + +-endif. -- cgit v1.2.3