aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http3.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/gun_http3.erl')
-rw-r--r--src/gun_http3.erl744
1 files changed, 744 insertions, 0 deletions
diff --git a/src/gun_http3.erl b/src/gun_http3.erl
new file mode 100644
index 0000000..9994186
--- /dev/null
+++ b/src/gun_http3.erl
@@ -0,0 +1,744 @@
+%% Copyright (c) 2023, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% @todo Tunneling has not been implemented for HTTP/3.
+-module(gun_http3).
+
+-export([check_options/1]).
+-export([name/0]).
+-export([opts_name/0]).
+-export([has_keepalive/0]).
+-export([default_keepalive/0]).
+-export([init/4]).
+-export([switch_transport/3]).
+-export([handle/5]).
+-export([handle_continue/6]).
+-export([update_flow/4]).
+-export([closing/4]).
+-export([close/4]).
+-export([keepalive/3]).
+-export([headers/12]).
+-export([request/13]).
+-export([data/7]).
+-export([connect/9]).
+-export([cancel/5]).
+-export([timeout/3]).
+-export([stream_info/2]).
+-export([down/1]).
+-export([ws_upgrade/11]).
+-export([ws_send/6]).
+
+-record(stream, {
+ %% Stream ID.
+ id :: non_neg_integer(),
+
+ %% Reference used by the user of Gun to refer to this stream.
+ %% This may be only a part of a stream_ref() for tunneled streams.
+ %% For unidirectional streams this is a dummy that is never sent to the user.
+ ref :: reference(),
+
+ %% Process to send messages to.
+ reply_to :: undefined | pid(),
+
+ %% Whether the stream is currently in a special state.
+ status :: header | {unidi, control | encoder | decoder}
+ | normal | {data, non_neg_integer()} | discard,
+
+ %% Stream buffer.
+ buffer = <<>> :: binary(),
+
+ %% Request target URI (request stream only).
+ authority :: undefined | iodata(),
+ path :: undefined | iodata(),
+
+ %% Content handlers state.
+ handler_state :: undefined | gun_content_handler:state()
+}).
+
+-record(http3_state, {
+ reply_to :: pid(),
+ conn :: gun_quicer:quicer_connection_handle(),
+ transport :: module(),
+ opts = #{} :: gun:http2_opts(),
+ content_handlers :: gun_content_handler:opt(),
+
+ %% HTTP/3 state machine.
+ http3_machine :: cow_http3_machine:http3_machine(),
+
+ %% Specially handled local unidi streams.
+ %% @todo Maybe move the control stream to streams map.
+ local_control_id = undefined :: undefined | cow_http3:stream_id(),
+ local_encoder_id = undefined :: undefined | cow_http3:stream_id(),
+ local_decoder_id = undefined :: undefined | cow_http3:stream_id(),
+
+ %% Bidirectional streams used for requests and responses,
+ %% as well as unidirectional streams initiated by the server.
+ %%
+ %% Streams can be found by Ref or by StreamID. The most
+ %% common should be the StreamID so we use it as key. We also have
+ %% a Ref->StreamID index for faster lookup when we only have the Ref.
+ streams = #{} :: #{reference() => #stream{}},
+ stream_refs = #{} :: #{reference() => reference()}
+}).
+
+check_options(_) ->
+ ok. %% @todo
+
+name() -> http3.
+opts_name() -> http3_opts.
+has_keepalive() -> true.
+default_keepalive() -> infinity.
+
+init(ReplyTo, Conn, Transport, Opts) ->
+ Handlers = maps:get(content_handlers, Opts, [gun_data_h]),
+ {ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(client, Opts),
+ {ok, ControlID} = Transport:start_unidi_stream(Conn, [<<0>>, SettingsBin]),
+ {ok, EncoderID} = Transport:start_unidi_stream(Conn, [<<2>>]),
+ {ok, DecoderID} = Transport:start_unidi_stream(Conn, [<<3>>]),
+ %% Set the control, encoder and decoder streams in the machine.
+ HTTP3Machine = cow_http3_machine:init_unidi_local_streams(
+ ControlID, EncoderID, DecoderID, HTTP3Machine0),
+ {ok, connected, #http3_state{reply_to=ReplyTo, conn=Conn, transport=Transport,
+ opts=Opts, content_handlers=Handlers, http3_machine=HTTP3Machine,
+ local_control_id=ControlID, local_encoder_id=EncoderID,
+ local_decoder_id=DecoderID}}.
+
+-spec switch_transport(_, _, _) -> no_return().
+
+switch_transport(_Transport, _Conn, _State) ->
+ error(unimplemented).
+
+handle(Msg, State0=#http3_state{transport=Transport},
+ CookieStore, EvHandler, EvHandlerState) ->
+ case Transport:handle(Msg) of
+ {data, StreamID, IsFin, Data} ->
+ parse(Data, State0, StreamID, IsFin,
+ CookieStore, EvHandler, EvHandlerState);
+ {stream_started, StreamID, StreamType} ->
+ State = stream_new_remote(State0, StreamID, StreamType),
+ {{state, State}, CookieStore, EvHandlerState};
+ {stream_closed, _StreamID, _ErrorCode} ->
+ %% @todo Clean up the stream.
+ {{state, State0}, CookieStore, EvHandlerState};
+ {stream_peer_send_aborted, StreamID, ErrorCode} ->
+ Reason = cow_http3:code_to_error(ErrorCode),
+ {StateOrError, EvHandlerStateRet} = stream_aborted(
+ State0, StreamID, Reason, EvHandler, EvHandlerState),
+ {StateOrError, CookieStore, EvHandlerStateRet};
+ closed ->
+ %% @todo Terminate the connection.
+ {{state, State0}, CookieStore, EvHandlerState};
+ ok ->
+ {{state, State0}, CookieStore, EvHandlerState};
+ unknown ->
+ %% @todo Log a warning.
+ {{state, State0}, CookieStore, EvHandlerState}
+ end.
+
+parse(Data, State, StreamID, IsFin, CookieStore, EvHandler, EvHandlerState) ->
+ case stream_get(State, StreamID) of
+ Stream=#stream{buffer= <<>>} ->
+ parse1(Data, State, Stream, IsFin,
+ CookieStore, EvHandler, EvHandlerState);
+ Stream=#stream{buffer=Buffer} ->
+ %% @todo OK we should only keep the StreamRef forward
+ %% and update the stream in the state here.
+ Stream1 = Stream#stream{buffer= <<>>},
+ parse1(<<Buffer/binary, Data/binary>>,
+ stream_update(State, Stream1), Stream1, IsFin,
+ CookieStore, EvHandler, EvHandlerState);
+ %% Pending data for a stream that has been reset. Ignore.
+ error ->
+ {{state, State}, CookieStore, EvHandlerState}
+ end.
+
+parse1(Data, State, Stream=#stream{status=header}, IsFin,
+ CookieStore, EvHandler, EvHandlerState) ->
+ parse_unidirectional_stream_header(Data, State, Stream, IsFin,
+ CookieStore, EvHandler, EvHandlerState);
+parse1(Data, State0=#http3_state{http3_machine=HTTP3Machine0},
+ #stream{status={unidi, Type}, id=StreamID}, IsFin,
+ CookieStore, _EvHandler, EvHandlerState)
+ when Type =:= encoder; Type =:= decoder ->
+ case cow_http3_machine:unidi_data(Data, IsFin, StreamID, HTTP3Machine0) of
+ {ok, Instrs, HTTP3Machine} ->
+ State = send_instructions(State0#http3_state{http3_machine=HTTP3Machine}, Instrs),
+ {{state, State}, CookieStore, EvHandlerState};
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ {connection_error(State0#http3_state{http3_machine=HTTP3Machine}, Error),
+ CookieStore, EvHandlerState}
+ end;
+parse1(Data, State, Stream=#stream{status={data, Len}, id=StreamID}, IsFin,
+ CookieStore, EvHandler, EvHandlerState) ->
+ DataLen = byte_size(Data),
+ if
+ DataLen < Len ->
+ %% We don't have the full frame but this is the end of the
+ %% data we have. So FrameIsFin is equivalent to IsFin here.
+ frame(State, Stream#stream{status={data, Len - DataLen}},
+ {data, Data}, IsFin, CookieStore, EvHandler, EvHandlerState);
+ true ->
+ <<Data1:Len/binary, Rest/bits>> = Data,
+ FrameIsFin = is_fin(IsFin, Rest),
+ case frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin,
+ CookieStore, EvHandler, EvHandlerState) of
+ %% @todo {error, _}.
+ {{state, State1}, CookieStore1, EvHandlerState1} ->
+ parse(Rest, State1, StreamID, IsFin,
+ CookieStore1, EvHandler, EvHandlerState1)
+ end
+ end;
+%% @todo Clause that discards receiving data for aborted streams.
+parse1(Data, State, Stream=#stream{id=StreamID}, IsFin,
+ CookieStore, EvHandler, EvHandlerState) ->
+ case cow_http3:parse(Data) of
+ {ok, Frame, Rest} ->
+ FrameIsFin = is_fin(IsFin, Rest),
+ case frame(State, Stream, Frame, FrameIsFin,
+ CookieStore, EvHandler, EvHandlerState) of
+ %% @todo {error, _}.
+ {{state, State1}, CookieStore1, EvHandlerState1} ->
+ parse(Rest, State1, StreamID, IsFin,
+ CookieStore1, EvHandler, EvHandlerState1)
+ end;
+ {more, Frame = {data, _}, Len} ->
+ %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
+ case IsFin of
+ nofin ->
+ frame(State, Stream#stream{status={data, Len}}, Frame, nofin,
+ CookieStore, EvHandler, EvHandlerState);
+ fin ->
+ {connection_error(State, {connection_error, h3_frame_error,
+ 'Last frame on stream was truncated. (RFC9114 7.1)'}),
+ CookieStore, EvHandlerState}
+ end;
+ %% @todo {more, ignore, Len}
+ {ignore, Rest} ->
+ case ignored_frame(State, Stream) of
+ {state, State1} ->
+ parse(Rest, State1, StreamID, IsFin,
+ CookieStore, EvHandler, EvHandlerState)
+ end;
+ Error = {connection_error, _, _} ->
+ {connection_error(State, Error), CookieStore, EvHandlerState};
+ more when Data =:= <<>> ->
+ {{state, stream_update(State, Stream#stream{buffer=Data})},
+ CookieStore, EvHandlerState};
+ more ->
+ %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
+ case IsFin of
+ nofin ->
+ {{state, stream_update(State, Stream#stream{buffer=Data})},
+ CookieStore, EvHandlerState};
+ fin ->
+ {connection_error(State, {connection_error, h3_frame_error,
+ 'Last frame on stream was truncated. (RFC9114 7.1)'}),
+ CookieStore, EvHandlerState}
+ end
+ end.
+
+%% We may receive multiple frames in a single QUIC packet.
+%% The FIN flag applies to the QUIC packet, not to the frame.
+%% We must therefore only consider the frame to have a FIN
+%% flag if there's no data remaining to be read.
+is_fin(fin, <<>>) -> fin;
+is_fin(_, _) -> nofin.
+
+parse_unidirectional_stream_header(Data, State0=#http3_state{http3_machine=HTTP3Machine0},
+ Stream0=#stream{id=StreamID}, IsFin, CookieStore, EvHandler, EvHandlerState) ->
+ case cow_http3:parse_unidi_stream_header(Data) of
+ {ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder ->
+ case cow_http3_machine:set_unidi_remote_stream_type(
+ StreamID, Type, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State = State0#http3_state{http3_machine=HTTP3Machine},
+ Stream = Stream0#stream{status={unidi, Type}},
+ parse(Rest, stream_update(State, Stream), StreamID, IsFin,
+ CookieStore, EvHandler, EvHandlerState);
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ {connection_error(State0#http3_state{http3_machine=HTTP3Machine}, Error),
+ CookieStore, EvHandlerState}
+ end;
+%% @todo Implement server push.
+%% Note that we shouldn't receive this frame until we set MAX_PUSH_ID.
+% {ok, push, _} ->
+% {connection_error(State0, {connection_error, h3_stream_creation_error,
+% 'Only servers can push. (RFC9114 6.2.2)'}),
+% CookieStore, EvHandlerState};
+ %% Unknown stream types must be ignored. We choose to abort the
+ %% stream instead of reading and discarding the incoming data.
+ {undefined, _} ->
+ {{state, (stream_abort_receive(State0, Stream0, h3_stream_creation_error))},
+ CookieStore, EvHandlerState}
+ end.
+
+%% @todo Cookie/events.
+frame(State=#http3_state{http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, Frame, IsFin,
+ CookieStore, EvHandler, EvHandlerState) ->
+ case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ {{state, State#http3_state{http3_machine=HTTP3Machine}},
+ CookieStore, EvHandlerState};
+ {ok, {data, Data}, HTTP3Machine} ->
+ data_frame(State#http3_state{http3_machine=HTTP3Machine}, Stream, IsFin, Data,
+ CookieStore, EvHandler, EvHandlerState);
+ {ok, {headers, Headers, PseudoHeaders, BodyLen}, Instrs, HTTP3Machine} ->
+ headers_frame(
+ send_instructions(State#http3_state{http3_machine=HTTP3Machine}, Instrs),
+ Stream, IsFin, Headers, PseudoHeaders, BodyLen,
+ CookieStore, EvHandler, EvHandlerState);
+ {ok, {trailers, Trailers}, Instrs, HTTP3Machine} ->
+ {StateOrError, EvHandlerStateRet} = trailers_frame(
+ send_instructions(State#http3_state{http3_machine=HTTP3Machine}, Instrs),
+ Stream, Trailers, EvHandler, EvHandlerState),
+ {StateOrError, CookieStore, EvHandlerStateRet};
+ {ok, GoAway={goaway, _}, HTTP3Machine} ->
+ goaway(State#http3_state{http3_machine=HTTP3Machine}, GoAway);
+ {error, Error={stream_error, _Reason, _Human}, Instrs, HTTP3Machine} ->
+ State1 = send_instructions(State#http3_state{http3_machine=HTTP3Machine}, Instrs),
+ reset_stream(State1, StreamID, Error);
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ {connection_error(State#http3_state{http3_machine=HTTP3Machine}, Error),
+ CookieStore, EvHandlerState}
+ end.
+
+data_frame(State0, Stream, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) ->
+ case Stream of
+ #stream{} -> %tunnel=undefined} ->
+ {StateOrError, EvHandlerState} = data_frame1(State0,
+ Stream, IsFin, Data, EvHandler, EvHandlerState0),
+ {StateOrError, CookieStore0, EvHandlerState}%;
+% Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+%% %% @todo What about IsFin?
+% {Commands, CookieStore, EvHandlerState1} = Proto:handle(Data,
+% ProtoState0, CookieStore0, EvHandler, EvHandlerState0),
+% %% The frame/parse functions only handle state or error commands.
+% {ResCommands, EvHandlerState} = tunnel_commands(Commands,
+% Stream, State0, EvHandler, EvHandlerState1),
+% {ResCommands, CookieStore, EvHandlerState}
+ end.
+
+data_frame1(State0, Stream=#stream{ref=StreamRef, reply_to=ReplyTo,
+ %flow=Flow0,
+ handler_state=Handlers0}, IsFin, Data, EvHandler, EvHandlerState0) ->
+ {ok, _Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
+% Flow = case Flow0 of
+% infinity -> infinity;
+% _ -> Flow0 - Dec
+% end,
+ State1 = stream_update(State0, Stream#stream{%flow=Flow,
+ handler_state=Handlers}),
+ {StateOrError, EvHandlerState} = case IsFin of
+ fin ->
+ EvHandlerState1 = EvHandler:response_end(#{
+ stream_ref => StreamRef, %% @todo stream_ref(State1, StreamRef),
+ reply_to => ReplyTo
+ }, EvHandlerState0),
+ {{state, State1}, EvHandlerState1};
+ nofin ->
+ {{state, State1}, EvHandlerState0}
+ end,
+ case StateOrError of
+ {state, State} ->
+ %% We do not remove the stream immediately. We will only when
+ %% the QUIC stream gets closed.
+ {{state, State}, EvHandlerState}%;
+% Error={error, _} ->
+% %% @todo Delete stream and return new state and error commands.
+% {Error, EvHandlerState}
+ end.
+
+headers_frame(State0=#http3_state{opts=Opts}, Stream, IsFin, Headers,
+ #{status := Status}, _BodyLen, CookieStore0, EvHandler, EvHandlerState0) ->
+ #stream{
+ authority=Authority,
+ path=Path%,
+% tunnel=Tunnel
+ } = Stream,
+ CookieStore = gun_cookies:set_cookie_header(<<"https">>,
+ Authority, Path, Status, Headers, CookieStore0, Opts),
+ {StateOrError, EvHandlerState} = if
+ Status >= 100, Status =< 199 ->
+ headers_frame_inform(State0, Stream, Status, Headers, EvHandler, EvHandlerState0);
+% Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested, IsFin =:= nofin ->
+% headers_frame_connect(State0, Stream, Status, Headers, EvHandler, EvHandlerState0);
+ true ->
+ headers_frame_response(State0, Stream, IsFin, Status, Headers, EvHandler, EvHandlerState0)
+ end,
+ {StateOrError, CookieStore, EvHandlerState}.
+
+headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo},
+ Status, Headers, EvHandler, EvHandlerState0) ->
+ RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef),
+ ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers},
+ EvHandlerState = EvHandler:response_inform(#{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
+ {{state, State}, EvHandlerState}.
+
+headers_frame_response(State=#http3_state{content_handlers=Handlers0},
+ Stream=#stream{ref=StreamRef, reply_to=ReplyTo},
+ IsFin, Status, Headers, EvHandler, EvHandlerState0) ->
+ RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef),
+ ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
+ EvHandlerState1 = EvHandler:response_headers(#{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
+ {Handlers, EvHandlerState} = case IsFin of
+ fin ->
+ EvHandlerState2 = EvHandler:response_end(#{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState1),
+ {undefined, EvHandlerState2};
+ nofin ->
+ {gun_content_handler:init(ReplyTo, RealStreamRef,
+ Status, Headers, Handlers0), EvHandlerState1}
+ end,
+ %% @todo Uncomment when tunnel is added.
+ %% We disable the tunnel, if any, when receiving any non 2xx response.
+ %%
+ %% We do not remove the stream immediately. We will only when
+ %% the QUIC stream gets closed.
+ {{state, stream_update(State,
+ Stream#stream{handler_state=Handlers})},%, tunnel=undefined})},
+ EvHandlerState}.
+
+trailers_frame(State, #stream{ref=StreamRef, reply_to=ReplyTo},
+ Trailers, EvHandler, EvHandlerState0) ->
+ %% @todo We probably want to pass this to gun_content_handler?
+ RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef),
+ ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers},
+ ResponseEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0),
+ EvHandlerState = EvHandler:response_end(ResponseEvent, EvHandlerState1),
+ %% We do not remove the stream immediately. We will only when
+ %% the QUIC stream gets closed.
+ {{state, State}, EvHandlerState}.
+
+-spec goaway(_, _) -> no_return().
+
+goaway(_State, _GoAway) ->
+ error(todo).
+
+-spec reset_stream(_, _, _) -> no_return().
+
+reset_stream(_State, _StreamID, _Error) ->
+ error(todo).
+
+-spec ignored_frame(_, _) -> no_return().
+
+%% @todo Cookie/events.
+ignored_frame(_State, _Stream) ->
+ error(todo).
+
+stream_abort_receive(State=#http3_state{conn=Conn, transport=Transport},
+ Stream=#stream{id=StreamID}, Reason) ->
+ Transport:shutdown_stream(Conn, StreamID, receiving, cow_http3:error_to_code(Reason)),
+ stream_update(State, Stream#stream{status=discard}).
+
+-spec connection_error(_, _) -> no_return().
+
+connection_error(_State, _Error) ->
+ error(todo).
+
+-spec handle_continue(_, _, _, _, _, _) -> no_return().
+
+handle_continue(_ContinueStreamRef, _Msg, _State, _CookieStore, _EvHandler, _EvHandlerState) ->
+ error(unimplemented).
+
+-spec update_flow(_, _, _, _) -> no_return().
+
+update_flow(_State, _ReplyTo, _StreamRef, _Inc) ->
+ error(unimplemented).
+
+closing(_Reason, _State, _, EvHandlerState) ->
+ %% @todo Implement graceful shutdown.
+ {[], EvHandlerState}.
+
+close(_Reason, _State, _, EvHandlerState) ->
+ %% @todo Implement.
+ EvHandlerState.
+
+-spec keepalive(_, _, _) -> no_return().
+
+keepalive(_State, _, _EvHandlerState) ->
+ error(todo).
+
+headers(State0=#http3_state{conn=Conn, transport=Transport,
+ http3_machine=HTTP3Machine0}, StreamRef, ReplyTo, Method, Host, Port,
+ Path, Headers0, _InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
+ when is_reference(StreamRef) ->
+ {ok, StreamID} = Transport:start_bidi_stream(Conn),
+ HTTP3Machine1 = cow_http3_machine:init_bidi_stream(StreamID,
+ iolist_to_binary(Method), HTTP3Machine0),
+ {ok, PseudoHeaders, Headers, CookieStore} = prepare_headers(
+ Method, Host, Port, Path, Headers0, CookieStore0),
+ Authority = maps:get(authority, PseudoHeaders),
+ RealStreamRef = StreamRef, %% @todo stream_ref(State0, StreamRef),
+ RequestEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ function => ?FUNCTION_NAME,
+ method => Method,
+ authority => Authority,
+ path => Path,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
+ {ok, nofin, HeaderBlock, Instrs, HTTP3Machine} = cow_http3_machine:prepare_headers(
+ StreamID, HTTP3Machine1, nofin, PseudoHeaders, Headers),
+ State1 = send_instructions(State0#http3_state{http3_machine=HTTP3Machine}, Instrs),
+ ok = Transport:send(Conn, StreamID, cow_http3:headers(HeaderBlock)),
+ EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ %% @todo InitialFlow = initial_flow(InitialFlow0, Opts),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
+ status=normal, authority=Authority, path=Path},
+ State = stream_new_local(State1, Stream),
+ {{state, State}, CookieStore, EvHandlerState}.
+
+%% @todo We need to configure the initial flow control for the stream.
+request(State0=#http3_state{conn=Conn, transport=Transport,
+ http3_machine=HTTP3Machine0}, StreamRef, ReplyTo, Method, Host, Port,
+ Path, Headers0, Body, _InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
+ when is_reference(StreamRef) ->
+ Headers1 = lists:keystore(<<"content-length">>, 1, Headers0,
+ {<<"content-length">>, integer_to_binary(iolist_size(Body))}),
+ %% @todo InitialFlow = initial_flow(InitialFlow0, Opts),
+ {ok, StreamID} = Transport:start_bidi_stream(Conn),
+ HTTP3Machine1 = cow_http3_machine:init_bidi_stream(StreamID,
+ iolist_to_binary(Method), HTTP3Machine0),
+ {ok, PseudoHeaders, Headers, CookieStore} = prepare_headers(
+ Method, Host, Port, Path, Headers1, CookieStore0),
+ Authority = maps:get(authority, PseudoHeaders),
+ RealStreamRef = StreamRef, %% @todo stream_ref(State0, StreamRef),
+ RequestEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ function => ?FUNCTION_NAME,
+ method => Method,
+ authority => Authority,
+ path => Path,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
+ {ok, fin, HeaderBlock, Instrs, HTTP3Machine} = cow_http3_machine:prepare_headers(
+ StreamID, HTTP3Machine1, fin, PseudoHeaders, Headers),
+ State1 = send_instructions(State0#http3_state{http3_machine=HTTP3Machine}, Instrs),
+ %% @todo Handle send errors.
+ ok = Transport:send(Conn, StreamID, [
+ cow_http3:headers(HeaderBlock),
+ %% Only send a DATA frame if we have a body.
+ case iolist_size(Body) of
+ 0 -> <<>>;
+ _ -> cow_http3:data(Body)
+ end
+ ], fin),
+ EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
+ status=normal, authority=Authority, path=Path},
+ State = stream_new_local(State1, Stream),
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)}.
+
+prepare_headers(Method, Host0, Port, Path, Headers0, CookieStore0) ->
+ Scheme = <<"https">>,
+ Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
+ {_, Host} -> Host;
+ _ -> gun_http:host_header(tls, Host0, Port)
+ end,
+ %% @todo We also must remove any header found in the connection header.
+ %% @todo Much of this is duplicated in cow_http2, cow_http2_machine
+ %% and cow_http3_machine; sort things out.
+ Headers1 =
+ lists:keydelete(<<"host">>, 1,
+ lists:keydelete(<<"connection">>, 1,
+ lists:keydelete(<<"keep-alive">>, 1,
+ lists:keydelete(<<"proxy-connection">>, 1,
+ lists:keydelete(<<"transfer-encoding">>, 1,
+ lists:keydelete(<<"upgrade">>, 1, Headers0)))))),
+ {Headers, CookieStore} = gun_cookies:add_cookie_header(
+ Scheme, Authority, Path, Headers1, CookieStore0),
+ PseudoHeaders = #{
+ method => Method,
+ scheme => Scheme,
+ authority => Authority,
+ path => Path
+ },
+ {ok, PseudoHeaders, Headers, CookieStore}.
+
+%% @todo We would open unidi streams here if we only open on-demand.
+%% No instructions.
+send_instructions(State, undefined) ->
+ State;
+%% Decoder instructions.
+send_instructions(State=#http3_state{conn=Conn, transport=Transport,
+ local_decoder_id=DecoderID}, {decoder_instructions, DecData}) ->
+ %% @todo Handle send errors.
+ ok = Transport:send(Conn, DecoderID, DecData),
+ State;
+%% Encoder instructions.
+send_instructions(State=#http3_state{conn=Conn, transport=Transport,
+ local_encoder_id=EncoderID}, {encoder_instructions, EncData}) ->
+ %% @todo Handle send errors.
+ ok = Transport:send(Conn, EncoderID, EncData),
+ State.
+
+data(State=#http3_state{conn=Conn, transport=Transport}, StreamRef, _ReplyTo, IsFin, Data,
+ _EvHandler, EvHandlerState) when is_reference(StreamRef) ->
+ case stream_get_by_ref(State, StreamRef) of
+ #stream{id=StreamID} -> %, tunnel=Tunnel} ->
+ ok = Transport:send(Conn, StreamID, cow_http3:data(Data), IsFin),
+ {[], EvHandlerState} %% @todo Probably wrong, need to update/keep states?
+%% @todo
+% case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
+% {ok, fin, _} ->
+% error_stream_closed(State, StreamRef, ReplyTo),
+% {[], EvHandlerState};
+% {ok, _, fin} ->
+% error_stream_closed(State, StreamRef, ReplyTo),
+% {[], EvHandlerState};
+% {ok, _, _} when Tunnel =:= undefined ->
+% maybe_send_data(State,
+% StreamID, IsFin, Data, EvHandler, EvHandlerState);
+% {ok, _, _} ->
+% #tunnel{protocol=Proto, protocol_state=ProtoState0} = Tunnel,
+% {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef,
+% ReplyTo, IsFin, Data, EvHandler, EvHandlerState),
+% tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1)
+% end%;
+%% @todo
+% error ->
+% error_stream_not_found(State, StreamRef, ReplyTo),
+% {[], EvHandlerState}
+ end.
+
+-spec connect(_, _, _, _, _, _, _, _, _) -> no_return().
+
+connect(_State, StreamRef, _ReplyTo, _Destination, _TunnelInfo, _Headers0,
+ _InitialFlow0, _EvHandler, _EvHandlerState0) when is_reference(StreamRef) ->
+ error(unimplemented).
+
+-spec cancel(_, _, _, _, _) -> no_return().
+
+cancel(_State, StreamRef, _ReplyTo, _EvHandler, _EvHandlerState0)
+ when is_reference(StreamRef) ->
+ error(unimplemented).
+
+-spec timeout(_, _, _) -> no_return().
+
+timeout(_State, _Timeout, _TRef) ->
+ error(todo).
+
+-spec stream_info(_, _) -> no_return().
+
+stream_info(_State, StreamRef) when is_reference(StreamRef) ->
+ error(unimplemented).
+
+-spec down(_) -> no_return().
+
+down(_State) ->
+ error(todo).
+
+-spec ws_upgrade(_, _, _, _, _, _, _, _, _, _, _) -> no_return().
+
+ws_upgrade(_State, StreamRef, _ReplyTo,
+ _Host, _Port, _Path, _Headers0, _WsOpts,
+ _CookieStore0, _EvHandler, _EvHandlerState0)
+ when is_reference(StreamRef) ->
+ error(todo).
+
+-spec ws_send(_, _, _, _, _, _) -> no_return().
+
+ws_send(_Frames, _State, _RealStreamRef, _ReplyTo, _EvHandler, _EvHandlerState0) ->
+ error(todo).
+
+%% Streams.
+
+stream_get(#http3_state{streams=Streams}, StreamID) ->
+ maps:get(StreamID, Streams, error).
+
+stream_get_by_ref(State=#http3_state{stream_refs=Refs}, StreamRef) ->
+ case maps:get(StreamRef, Refs, error) of
+ error -> error;
+ StreamID -> stream_get(State, StreamID)
+ end.
+
+stream_new_remote(State=#http3_state{http3_machine=HTTP3Machine0,
+ streams=Streams, stream_refs=Refs}, StreamID, StreamType) ->
+ %% All remote streams to the client are expected to be unidirectional.
+ %% @todo Handle errors instead of crashing.
+ unidi = StreamType,
+ HTTP3Machine = cow_http3_machine:init_unidi_stream(StreamID,
+ unidi_remote, HTTP3Machine0),
+ StreamRef = make_ref(),
+ Stream = #stream{id=StreamID, ref=StreamRef, status=header},
+ State#http3_state{
+ http3_machine=HTTP3Machine,
+ streams=Streams#{StreamID => Stream},
+ stream_refs=Refs#{StreamRef => StreamID}
+ }.
+
+stream_new_local(State=#http3_state{streams=Streams, stream_refs=Refs},
+ Stream=#stream{id=StreamID, ref=StreamRef}) ->
+ State#http3_state{
+ streams=Streams#{StreamID => Stream},
+ stream_refs=Refs#{StreamRef => StreamID}
+ }.
+
+stream_update(State=#http3_state{streams=Streams},
+ Stream=#stream{id=StreamID}) ->
+ State#http3_state{
+ streams=Streams#{StreamID => Stream}
+ }.
+
+stream_aborted(State0, StreamID, Reason, EvHandler, EvHandlerState0) ->
+ case stream_take(State0, StreamID) of
+ {#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
+ ReplyTo ! {gun_error, self(), StreamRef, %% @todo stream_ref(State0, StreamRef),
+ {stream_error, Reason, 'Stream reset by server.'}},
+ EvHandlerState = EvHandler:cancel(#{
+ stream_ref => StreamRef, %% @todo stream_ref(State, StreamRef),
+ reply_to => ReplyTo,
+ endpoint => remote,
+ reason => Reason
+ }, EvHandlerState0),
+ {{state, State}, EvHandlerState};
+ error ->
+ {{state, State0}, EvHandlerState0}
+ end.
+
+stream_take(State=#http3_state{streams=Streams0, stream_refs=Refs}, StreamID) ->
+ case maps:take(StreamID, Streams0) of
+ {Stream=#stream{ref=StreamRef}, Streams} ->
+ {Stream, State#http3_state{
+ streams=Streams,
+ stream_refs=maps:remove(StreamRef, Refs)
+ }};
+ error ->
+ error
+ end.