diff options
Diffstat (limited to 'src/gun_http3.erl')
-rw-r--r-- | src/gun_http3.erl | 744 |
1 files changed, 744 insertions, 0 deletions
diff --git a/src/gun_http3.erl b/src/gun_http3.erl new file mode 100644 index 0000000..9994186 --- /dev/null +++ b/src/gun_http3.erl @@ -0,0 +1,744 @@ +%% Copyright (c) 2023, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @todo Tunneling has not been implemented for HTTP/3. +-module(gun_http3). + +-export([check_options/1]). +-export([name/0]). +-export([opts_name/0]). +-export([has_keepalive/0]). +-export([default_keepalive/0]). +-export([init/4]). +-export([switch_transport/3]). +-export([handle/5]). +-export([handle_continue/6]). +-export([update_flow/4]). +-export([closing/4]). +-export([close/4]). +-export([keepalive/3]). +-export([headers/12]). +-export([request/13]). +-export([data/7]). +-export([connect/9]). +-export([cancel/5]). +-export([timeout/3]). +-export([stream_info/2]). +-export([down/1]). +-export([ws_upgrade/11]). +-export([ws_send/6]). + +-record(stream, { + %% Stream ID. + id :: non_neg_integer(), + + %% Reference used by the user of Gun to refer to this stream. + %% This may be only a part of a stream_ref() for tunneled streams. + %% For unidirectional streams this is a dummy that is never sent to the user. + ref :: reference(), + + %% Process to send messages to. + reply_to :: undefined | pid(), + + %% Whether the stream is currently in a special state. + status :: header | {unidi, control | encoder | decoder} + | normal | {data, non_neg_integer()} | discard, + + %% Stream buffer. + buffer = <<>> :: binary(), + + %% Request target URI (request stream only). + authority :: undefined | iodata(), + path :: undefined | iodata(), + + %% Content handlers state. + handler_state :: undefined | gun_content_handler:state() +}). + +-record(http3_state, { + reply_to :: pid(), + conn :: gun_quicer:quicer_connection_handle(), + transport :: module(), + opts = #{} :: gun:http2_opts(), + content_handlers :: gun_content_handler:opt(), + + %% HTTP/3 state machine. + http3_machine :: cow_http3_machine:http3_machine(), + + %% Specially handled local unidi streams. + %% @todo Maybe move the control stream to streams map. + local_control_id = undefined :: undefined | cow_http3:stream_id(), + local_encoder_id = undefined :: undefined | cow_http3:stream_id(), + local_decoder_id = undefined :: undefined | cow_http3:stream_id(), + + %% Bidirectional streams used for requests and responses, + %% as well as unidirectional streams initiated by the server. + %% + %% Streams can be found by Ref or by StreamID. The most + %% common should be the StreamID so we use it as key. We also have + %% a Ref->StreamID index for faster lookup when we only have the Ref. + streams = #{} :: #{reference() => #stream{}}, + stream_refs = #{} :: #{reference() => reference()} +}). + +check_options(_) -> + ok. %% @todo + +name() -> http3. +opts_name() -> http3_opts. +has_keepalive() -> true. +default_keepalive() -> infinity. + +init(ReplyTo, Conn, Transport, Opts) -> + Handlers = maps:get(content_handlers, Opts, [gun_data_h]), + {ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(client, Opts), + {ok, ControlID} = Transport:start_unidi_stream(Conn, [<<0>>, SettingsBin]), + {ok, EncoderID} = Transport:start_unidi_stream(Conn, [<<2>>]), + {ok, DecoderID} = Transport:start_unidi_stream(Conn, [<<3>>]), + %% Set the control, encoder and decoder streams in the machine. + HTTP3Machine = cow_http3_machine:init_unidi_local_streams( + ControlID, EncoderID, DecoderID, HTTP3Machine0), + {ok, connected, #http3_state{reply_to=ReplyTo, conn=Conn, transport=Transport, + opts=Opts, content_handlers=Handlers, http3_machine=HTTP3Machine, + local_control_id=ControlID, local_encoder_id=EncoderID, + local_decoder_id=DecoderID}}. + +-spec switch_transport(_, _, _) -> no_return(). + +switch_transport(_Transport, _Conn, _State) -> + error(unimplemented). + +handle(Msg, State0=#http3_state{transport=Transport}, + CookieStore, EvHandler, EvHandlerState) -> + case Transport:handle(Msg) of + {data, StreamID, IsFin, Data} -> + parse(Data, State0, StreamID, IsFin, + CookieStore, EvHandler, EvHandlerState); + {stream_started, StreamID, StreamType} -> + State = stream_new_remote(State0, StreamID, StreamType), + {{state, State}, CookieStore, EvHandlerState}; + {stream_closed, _StreamID, _ErrorCode} -> + %% @todo Clean up the stream. + {{state, State0}, CookieStore, EvHandlerState}; + {stream_peer_send_aborted, StreamID, ErrorCode} -> + Reason = cow_http3:code_to_error(ErrorCode), + {StateOrError, EvHandlerStateRet} = stream_aborted( + State0, StreamID, Reason, EvHandler, EvHandlerState), + {StateOrError, CookieStore, EvHandlerStateRet}; + closed -> + %% @todo Terminate the connection. + {{state, State0}, CookieStore, EvHandlerState}; + ok -> + {{state, State0}, CookieStore, EvHandlerState}; + unknown -> + %% @todo Log a warning. + {{state, State0}, CookieStore, EvHandlerState} + end. + +parse(Data, State, StreamID, IsFin, CookieStore, EvHandler, EvHandlerState) -> + case stream_get(State, StreamID) of + Stream=#stream{buffer= <<>>} -> + parse1(Data, State, Stream, IsFin, + CookieStore, EvHandler, EvHandlerState); + Stream=#stream{buffer=Buffer} -> + %% @todo OK we should only keep the StreamRef forward + %% and update the stream in the state here. + Stream1 = Stream#stream{buffer= <<>>}, + parse1(<<Buffer/binary, Data/binary>>, + stream_update(State, Stream1), Stream1, IsFin, + CookieStore, EvHandler, EvHandlerState); + %% Pending data for a stream that has been reset. Ignore. + error -> + {{state, State}, CookieStore, EvHandlerState} + end. + +parse1(Data, State, Stream=#stream{status=header}, IsFin, + CookieStore, EvHandler, EvHandlerState) -> + parse_unidirectional_stream_header(Data, State, Stream, IsFin, + CookieStore, EvHandler, EvHandlerState); +parse1(Data, State0=#http3_state{http3_machine=HTTP3Machine0}, + #stream{status={unidi, Type}, id=StreamID}, IsFin, + CookieStore, _EvHandler, EvHandlerState) + when Type =:= encoder; Type =:= decoder -> + case cow_http3_machine:unidi_data(Data, IsFin, StreamID, HTTP3Machine0) of + {ok, Instrs, HTTP3Machine} -> + State = send_instructions(State0#http3_state{http3_machine=HTTP3Machine}, Instrs), + {{state, State}, CookieStore, EvHandlerState}; + {error, Error={connection_error, _, _}, HTTP3Machine} -> + {connection_error(State0#http3_state{http3_machine=HTTP3Machine}, Error), + CookieStore, EvHandlerState} + end; +parse1(Data, State, Stream=#stream{status={data, Len}, id=StreamID}, IsFin, + CookieStore, EvHandler, EvHandlerState) -> + DataLen = byte_size(Data), + if + DataLen < Len -> + %% We don't have the full frame but this is the end of the + %% data we have. So FrameIsFin is equivalent to IsFin here. + frame(State, Stream#stream{status={data, Len - DataLen}}, + {data, Data}, IsFin, CookieStore, EvHandler, EvHandlerState); + true -> + <<Data1:Len/binary, Rest/bits>> = Data, + FrameIsFin = is_fin(IsFin, Rest), + case frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin, + CookieStore, EvHandler, EvHandlerState) of + %% @todo {error, _}. + {{state, State1}, CookieStore1, EvHandlerState1} -> + parse(Rest, State1, StreamID, IsFin, + CookieStore1, EvHandler, EvHandlerState1) + end + end; +%% @todo Clause that discards receiving data for aborted streams. +parse1(Data, State, Stream=#stream{id=StreamID}, IsFin, + CookieStore, EvHandler, EvHandlerState) -> + case cow_http3:parse(Data) of + {ok, Frame, Rest} -> + FrameIsFin = is_fin(IsFin, Rest), + case frame(State, Stream, Frame, FrameIsFin, + CookieStore, EvHandler, EvHandlerState) of + %% @todo {error, _}. + {{state, State1}, CookieStore1, EvHandlerState1} -> + parse(Rest, State1, StreamID, IsFin, + CookieStore1, EvHandler, EvHandlerState1) + end; + {more, Frame = {data, _}, Len} -> + %% We're at the end of the data so FrameIsFin is equivalent to IsFin. + case IsFin of + nofin -> + frame(State, Stream#stream{status={data, Len}}, Frame, nofin, + CookieStore, EvHandler, EvHandlerState); + fin -> + {connection_error(State, {connection_error, h3_frame_error, + 'Last frame on stream was truncated. (RFC9114 7.1)'}), + CookieStore, EvHandlerState} + end; + %% @todo {more, ignore, Len} + {ignore, Rest} -> + case ignored_frame(State, Stream) of + {state, State1} -> + parse(Rest, State1, StreamID, IsFin, + CookieStore, EvHandler, EvHandlerState) + end; + Error = {connection_error, _, _} -> + {connection_error(State, Error), CookieStore, EvHandlerState}; + more when Data =:= <<>> -> + {{state, stream_update(State, Stream#stream{buffer=Data})}, + CookieStore, EvHandlerState}; + more -> + %% We're at the end of the data so FrameIsFin is equivalent to IsFin. + case IsFin of + nofin -> + {{state, stream_update(State, Stream#stream{buffer=Data})}, + CookieStore, EvHandlerState}; + fin -> + {connection_error(State, {connection_error, h3_frame_error, + 'Last frame on stream was truncated. (RFC9114 7.1)'}), + CookieStore, EvHandlerState} + end + end. + +%% We may receive multiple frames in a single QUIC packet. +%% The FIN flag applies to the QUIC packet, not to the frame. +%% We must therefore only consider the frame to have a FIN +%% flag if there's no data remaining to be read. +is_fin(fin, <<>>) -> fin; +is_fin(_, _) -> nofin. + +parse_unidirectional_stream_header(Data, State0=#http3_state{http3_machine=HTTP3Machine0}, + Stream0=#stream{id=StreamID}, IsFin, CookieStore, EvHandler, EvHandlerState) -> + case cow_http3:parse_unidi_stream_header(Data) of + {ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder -> + case cow_http3_machine:set_unidi_remote_stream_type( + StreamID, Type, HTTP3Machine0) of + {ok, HTTP3Machine} -> + State = State0#http3_state{http3_machine=HTTP3Machine}, + Stream = Stream0#stream{status={unidi, Type}}, + parse(Rest, stream_update(State, Stream), StreamID, IsFin, + CookieStore, EvHandler, EvHandlerState); + {error, Error={connection_error, _, _}, HTTP3Machine} -> + {connection_error(State0#http3_state{http3_machine=HTTP3Machine}, Error), + CookieStore, EvHandlerState} + end; +%% @todo Implement server push. +%% Note that we shouldn't receive this frame until we set MAX_PUSH_ID. +% {ok, push, _} -> +% {connection_error(State0, {connection_error, h3_stream_creation_error, +% 'Only servers can push. (RFC9114 6.2.2)'}), +% CookieStore, EvHandlerState}; + %% Unknown stream types must be ignored. We choose to abort the + %% stream instead of reading and discarding the incoming data. + {undefined, _} -> + {{state, (stream_abort_receive(State0, Stream0, h3_stream_creation_error))}, + CookieStore, EvHandlerState} + end. + +%% @todo Cookie/events. +frame(State=#http3_state{http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID}, Frame, IsFin, + CookieStore, EvHandler, EvHandlerState) -> + case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of + {ok, HTTP3Machine} -> + {{state, State#http3_state{http3_machine=HTTP3Machine}}, + CookieStore, EvHandlerState}; + {ok, {data, Data}, HTTP3Machine} -> + data_frame(State#http3_state{http3_machine=HTTP3Machine}, Stream, IsFin, Data, + CookieStore, EvHandler, EvHandlerState); + {ok, {headers, Headers, PseudoHeaders, BodyLen}, Instrs, HTTP3Machine} -> + headers_frame( + send_instructions(State#http3_state{http3_machine=HTTP3Machine}, Instrs), + Stream, IsFin, Headers, PseudoHeaders, BodyLen, + CookieStore, EvHandler, EvHandlerState); + {ok, {trailers, Trailers}, Instrs, HTTP3Machine} -> + {StateOrError, EvHandlerStateRet} = trailers_frame( + send_instructions(State#http3_state{http3_machine=HTTP3Machine}, Instrs), + Stream, Trailers, EvHandler, EvHandlerState), + {StateOrError, CookieStore, EvHandlerStateRet}; + {ok, GoAway={goaway, _}, HTTP3Machine} -> + goaway(State#http3_state{http3_machine=HTTP3Machine}, GoAway); + {error, Error={stream_error, _Reason, _Human}, Instrs, HTTP3Machine} -> + State1 = send_instructions(State#http3_state{http3_machine=HTTP3Machine}, Instrs), + reset_stream(State1, StreamID, Error); + {error, Error={connection_error, _, _}, HTTP3Machine} -> + {connection_error(State#http3_state{http3_machine=HTTP3Machine}, Error), + CookieStore, EvHandlerState} + end. + +data_frame(State0, Stream, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) -> + case Stream of + #stream{} -> %tunnel=undefined} -> + {StateOrError, EvHandlerState} = data_frame1(State0, + Stream, IsFin, Data, EvHandler, EvHandlerState0), + {StateOrError, CookieStore0, EvHandlerState}%; +% Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> +%% %% @todo What about IsFin? +% {Commands, CookieStore, EvHandlerState1} = Proto:handle(Data, +% ProtoState0, CookieStore0, EvHandler, EvHandlerState0), +% %% The frame/parse functions only handle state or error commands. +% {ResCommands, EvHandlerState} = tunnel_commands(Commands, +% Stream, State0, EvHandler, EvHandlerState1), +% {ResCommands, CookieStore, EvHandlerState} + end. + +data_frame1(State0, Stream=#stream{ref=StreamRef, reply_to=ReplyTo, + %flow=Flow0, + handler_state=Handlers0}, IsFin, Data, EvHandler, EvHandlerState0) -> + {ok, _Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), +% Flow = case Flow0 of +% infinity -> infinity; +% _ -> Flow0 - Dec +% end, + State1 = stream_update(State0, Stream#stream{%flow=Flow, + handler_state=Handlers}), + {StateOrError, EvHandlerState} = case IsFin of + fin -> + EvHandlerState1 = EvHandler:response_end(#{ + stream_ref => StreamRef, %% @todo stream_ref(State1, StreamRef), + reply_to => ReplyTo + }, EvHandlerState0), + {{state, State1}, EvHandlerState1}; + nofin -> + {{state, State1}, EvHandlerState0} + end, + case StateOrError of + {state, State} -> + %% We do not remove the stream immediately. We will only when + %% the QUIC stream gets closed. + {{state, State}, EvHandlerState}%; +% Error={error, _} -> +% %% @todo Delete stream and return new state and error commands. +% {Error, EvHandlerState} + end. + +headers_frame(State0=#http3_state{opts=Opts}, Stream, IsFin, Headers, + #{status := Status}, _BodyLen, CookieStore0, EvHandler, EvHandlerState0) -> + #stream{ + authority=Authority, + path=Path%, +% tunnel=Tunnel + } = Stream, + CookieStore = gun_cookies:set_cookie_header(<<"https">>, + Authority, Path, Status, Headers, CookieStore0, Opts), + {StateOrError, EvHandlerState} = if + Status >= 100, Status =< 199 -> + headers_frame_inform(State0, Stream, Status, Headers, EvHandler, EvHandlerState0); +% Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested, IsFin =:= nofin -> +% headers_frame_connect(State0, Stream, Status, Headers, EvHandler, EvHandlerState0); + true -> + headers_frame_response(State0, Stream, IsFin, Status, Headers, EvHandler, EvHandlerState0) + end, + {StateOrError, CookieStore, EvHandlerState}. + +headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo}, + Status, Headers, EvHandler, EvHandlerState0) -> + RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef), + ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, + EvHandlerState = EvHandler:response_inform(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + {{state, State}, EvHandlerState}. + +headers_frame_response(State=#http3_state{content_handlers=Handlers0}, + Stream=#stream{ref=StreamRef, reply_to=ReplyTo}, + IsFin, Status, Headers, EvHandler, EvHandlerState0) -> + RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef), + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, + EvHandlerState1 = EvHandler:response_headers(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + {Handlers, EvHandlerState} = case IsFin of + fin -> + EvHandlerState2 = EvHandler:response_end(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, EvHandlerState1), + {undefined, EvHandlerState2}; + nofin -> + {gun_content_handler:init(ReplyTo, RealStreamRef, + Status, Headers, Handlers0), EvHandlerState1} + end, + %% @todo Uncomment when tunnel is added. + %% We disable the tunnel, if any, when receiving any non 2xx response. + %% + %% We do not remove the stream immediately. We will only when + %% the QUIC stream gets closed. + {{state, stream_update(State, + Stream#stream{handler_state=Handlers})},%, tunnel=undefined})}, + EvHandlerState}. + +trailers_frame(State, #stream{ref=StreamRef, reply_to=ReplyTo}, + Trailers, EvHandler, EvHandlerState0) -> + %% @todo We probably want to pass this to gun_content_handler? + RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef), + ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, + ResponseEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0), + EvHandlerState = EvHandler:response_end(ResponseEvent, EvHandlerState1), + %% We do not remove the stream immediately. We will only when + %% the QUIC stream gets closed. + {{state, State}, EvHandlerState}. + +-spec goaway(_, _) -> no_return(). + +goaway(_State, _GoAway) -> + error(todo). + +-spec reset_stream(_, _, _) -> no_return(). + +reset_stream(_State, _StreamID, _Error) -> + error(todo). + +-spec ignored_frame(_, _) -> no_return(). + +%% @todo Cookie/events. +ignored_frame(_State, _Stream) -> + error(todo). + +stream_abort_receive(State=#http3_state{conn=Conn, transport=Transport}, + Stream=#stream{id=StreamID}, Reason) -> + Transport:shutdown_stream(Conn, StreamID, receiving, cow_http3:error_to_code(Reason)), + stream_update(State, Stream#stream{status=discard}). + +-spec connection_error(_, _) -> no_return(). + +connection_error(_State, _Error) -> + error(todo). + +-spec handle_continue(_, _, _, _, _, _) -> no_return(). + +handle_continue(_ContinueStreamRef, _Msg, _State, _CookieStore, _EvHandler, _EvHandlerState) -> + error(unimplemented). + +-spec update_flow(_, _, _, _) -> no_return(). + +update_flow(_State, _ReplyTo, _StreamRef, _Inc) -> + error(unimplemented). + +closing(_Reason, _State, _, EvHandlerState) -> + %% @todo Implement graceful shutdown. + {[], EvHandlerState}. + +close(_Reason, _State, _, EvHandlerState) -> + %% @todo Implement. + EvHandlerState. + +-spec keepalive(_, _, _) -> no_return(). + +keepalive(_State, _, _EvHandlerState) -> + error(todo). + +headers(State0=#http3_state{conn=Conn, transport=Transport, + http3_machine=HTTP3Machine0}, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers0, _InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> + {ok, StreamID} = Transport:start_bidi_stream(Conn), + HTTP3Machine1 = cow_http3_machine:init_bidi_stream(StreamID, + iolist_to_binary(Method), HTTP3Machine0), + {ok, PseudoHeaders, Headers, CookieStore} = prepare_headers( + Method, Host, Port, Path, Headers0, CookieStore0), + Authority = maps:get(authority, PseudoHeaders), + RealStreamRef = StreamRef, %% @todo stream_ref(State0, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => Method, + authority => Authority, + path => Path, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), + {ok, nofin, HeaderBlock, Instrs, HTTP3Machine} = cow_http3_machine:prepare_headers( + StreamID, HTTP3Machine1, nofin, PseudoHeaders, Headers), + State1 = send_instructions(State0#http3_state{http3_machine=HTTP3Machine}, Instrs), + ok = Transport:send(Conn, StreamID, cow_http3:headers(HeaderBlock)), + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), + %% @todo InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + status=normal, authority=Authority, path=Path}, + State = stream_new_local(State1, Stream), + {{state, State}, CookieStore, EvHandlerState}. + +%% @todo We need to configure the initial flow control for the stream. +request(State0=#http3_state{conn=Conn, transport=Transport, + http3_machine=HTTP3Machine0}, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers0, Body, _InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> + Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, + {<<"content-length">>, integer_to_binary(iolist_size(Body))}), + %% @todo InitialFlow = initial_flow(InitialFlow0, Opts), + {ok, StreamID} = Transport:start_bidi_stream(Conn), + HTTP3Machine1 = cow_http3_machine:init_bidi_stream(StreamID, + iolist_to_binary(Method), HTTP3Machine0), + {ok, PseudoHeaders, Headers, CookieStore} = prepare_headers( + Method, Host, Port, Path, Headers1, CookieStore0), + Authority = maps:get(authority, PseudoHeaders), + RealStreamRef = StreamRef, %% @todo stream_ref(State0, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => Method, + authority => Authority, + path => Path, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), + {ok, fin, HeaderBlock, Instrs, HTTP3Machine} = cow_http3_machine:prepare_headers( + StreamID, HTTP3Machine1, fin, PseudoHeaders, Headers), + State1 = send_instructions(State0#http3_state{http3_machine=HTTP3Machine}, Instrs), + %% @todo Handle send errors. + ok = Transport:send(Conn, StreamID, [ + cow_http3:headers(HeaderBlock), + %% Only send a DATA frame if we have a body. + case iolist_size(Body) of + 0 -> <<>>; + _ -> cow_http3:data(Body) + end + ], fin), + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + status=normal, authority=Authority, path=Path}, + State = stream_new_local(State1, Stream), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)}. + +prepare_headers(Method, Host0, Port, Path, Headers0, CookieStore0) -> + Scheme = <<"https">>, + Authority = case lists:keyfind(<<"host">>, 1, Headers0) of + {_, Host} -> Host; + _ -> gun_http:host_header(tls, Host0, Port) + end, + %% @todo We also must remove any header found in the connection header. + %% @todo Much of this is duplicated in cow_http2, cow_http2_machine + %% and cow_http3_machine; sort things out. + Headers1 = + lists:keydelete(<<"host">>, 1, + lists:keydelete(<<"connection">>, 1, + lists:keydelete(<<"keep-alive">>, 1, + lists:keydelete(<<"proxy-connection">>, 1, + lists:keydelete(<<"transfer-encoding">>, 1, + lists:keydelete(<<"upgrade">>, 1, Headers0)))))), + {Headers, CookieStore} = gun_cookies:add_cookie_header( + Scheme, Authority, Path, Headers1, CookieStore0), + PseudoHeaders = #{ + method => Method, + scheme => Scheme, + authority => Authority, + path => Path + }, + {ok, PseudoHeaders, Headers, CookieStore}. + +%% @todo We would open unidi streams here if we only open on-demand. +%% No instructions. +send_instructions(State, undefined) -> + State; +%% Decoder instructions. +send_instructions(State=#http3_state{conn=Conn, transport=Transport, + local_decoder_id=DecoderID}, {decoder_instructions, DecData}) -> + %% @todo Handle send errors. + ok = Transport:send(Conn, DecoderID, DecData), + State; +%% Encoder instructions. +send_instructions(State=#http3_state{conn=Conn, transport=Transport, + local_encoder_id=EncoderID}, {encoder_instructions, EncData}) -> + %% @todo Handle send errors. + ok = Transport:send(Conn, EncoderID, EncData), + State. + +data(State=#http3_state{conn=Conn, transport=Transport}, StreamRef, _ReplyTo, IsFin, Data, + _EvHandler, EvHandlerState) when is_reference(StreamRef) -> + case stream_get_by_ref(State, StreamRef) of + #stream{id=StreamID} -> %, tunnel=Tunnel} -> + ok = Transport:send(Conn, StreamID, cow_http3:data(Data), IsFin), + {[], EvHandlerState} %% @todo Probably wrong, need to update/keep states? +%% @todo +% case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of +% {ok, fin, _} -> +% error_stream_closed(State, StreamRef, ReplyTo), +% {[], EvHandlerState}; +% {ok, _, fin} -> +% error_stream_closed(State, StreamRef, ReplyTo), +% {[], EvHandlerState}; +% {ok, _, _} when Tunnel =:= undefined -> +% maybe_send_data(State, +% StreamID, IsFin, Data, EvHandler, EvHandlerState); +% {ok, _, _} -> +% #tunnel{protocol=Proto, protocol_state=ProtoState0} = Tunnel, +% {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef, +% ReplyTo, IsFin, Data, EvHandler, EvHandlerState), +% tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1) +% end%; +%% @todo +% error -> +% error_stream_not_found(State, StreamRef, ReplyTo), +% {[], EvHandlerState} + end. + +-spec connect(_, _, _, _, _, _, _, _, _) -> no_return(). + +connect(_State, StreamRef, _ReplyTo, _Destination, _TunnelInfo, _Headers0, + _InitialFlow0, _EvHandler, _EvHandlerState0) when is_reference(StreamRef) -> + error(unimplemented). + +-spec cancel(_, _, _, _, _) -> no_return(). + +cancel(_State, StreamRef, _ReplyTo, _EvHandler, _EvHandlerState0) + when is_reference(StreamRef) -> + error(unimplemented). + +-spec timeout(_, _, _) -> no_return(). + +timeout(_State, _Timeout, _TRef) -> + error(todo). + +-spec stream_info(_, _) -> no_return(). + +stream_info(_State, StreamRef) when is_reference(StreamRef) -> + error(unimplemented). + +-spec down(_) -> no_return(). + +down(_State) -> + error(todo). + +-spec ws_upgrade(_, _, _, _, _, _, _, _, _, _, _) -> no_return(). + +ws_upgrade(_State, StreamRef, _ReplyTo, + _Host, _Port, _Path, _Headers0, _WsOpts, + _CookieStore0, _EvHandler, _EvHandlerState0) + when is_reference(StreamRef) -> + error(todo). + +-spec ws_send(_, _, _, _, _, _) -> no_return(). + +ws_send(_Frames, _State, _RealStreamRef, _ReplyTo, _EvHandler, _EvHandlerState0) -> + error(todo). + +%% Streams. + +stream_get(#http3_state{streams=Streams}, StreamID) -> + maps:get(StreamID, Streams, error). + +stream_get_by_ref(State=#http3_state{stream_refs=Refs}, StreamRef) -> + case maps:get(StreamRef, Refs, error) of + error -> error; + StreamID -> stream_get(State, StreamID) + end. + +stream_new_remote(State=#http3_state{http3_machine=HTTP3Machine0, + streams=Streams, stream_refs=Refs}, StreamID, StreamType) -> + %% All remote streams to the client are expected to be unidirectional. + %% @todo Handle errors instead of crashing. + unidi = StreamType, + HTTP3Machine = cow_http3_machine:init_unidi_stream(StreamID, + unidi_remote, HTTP3Machine0), + StreamRef = make_ref(), + Stream = #stream{id=StreamID, ref=StreamRef, status=header}, + State#http3_state{ + http3_machine=HTTP3Machine, + streams=Streams#{StreamID => Stream}, + stream_refs=Refs#{StreamRef => StreamID} + }. + +stream_new_local(State=#http3_state{streams=Streams, stream_refs=Refs}, + Stream=#stream{id=StreamID, ref=StreamRef}) -> + State#http3_state{ + streams=Streams#{StreamID => Stream}, + stream_refs=Refs#{StreamRef => StreamID} + }. + +stream_update(State=#http3_state{streams=Streams}, + Stream=#stream{id=StreamID}) -> + State#http3_state{ + streams=Streams#{StreamID => Stream} + }. + +stream_aborted(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> + case stream_take(State0, StreamID) of + {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> + ReplyTo ! {gun_error, self(), StreamRef, %% @todo stream_ref(State0, StreamRef), + {stream_error, Reason, 'Stream reset by server.'}}, + EvHandlerState = EvHandler:cancel(#{ + stream_ref => StreamRef, %% @todo stream_ref(State, StreamRef), + reply_to => ReplyTo, + endpoint => remote, + reason => Reason + }, EvHandlerState0), + {{state, State}, EvHandlerState}; + error -> + {{state, State0}, EvHandlerState0} + end. + +stream_take(State=#http3_state{streams=Streams0, stream_refs=Refs}, StreamID) -> + case maps:take(StreamID, Streams0) of + {Stream=#stream{ref=StreamRef}, Streams} -> + {Stream, State#http3_state{ + streams=Streams, + stream_refs=maps:remove(StreamRef, Refs) + }}; + error -> + error + end. |