aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cowboy.erl83
-rw-r--r--src/cowboy_http.erl4
-rw-r--r--src/cowboy_http2.erl2
-rw-r--r--src/cowboy_http3.erl973
-rw-r--r--src/cowboy_quicer.erl231
-rw-r--r--src/cowboy_stream_h.erl5
-rw-r--r--src/cowboy_websocket.erl18
7 files changed, 1307 insertions, 9 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl
index ad45919..e5ed831 100644
--- a/src/cowboy.erl
+++ b/src/cowboy.erl
@@ -16,6 +16,7 @@
-export([start_clear/3]).
-export([start_tls/3]).
+-export([start_quic/3]).
-export([stop_listener/1]).
-export([get_env/2]).
-export([get_env/3]).
@@ -25,6 +26,9 @@
-export([log/2]).
-export([log/4]).
+%% Don't warn about the bad quicer specs.
+-dialyzer([{nowarn_function, start_quic/3}]).
+
-type opts() :: cowboy_http:opts() | cowboy_http2:opts().
-export_type([opts/0]).
@@ -44,6 +48,7 @@
-spec start_clear(ranch:ref(), ranch:opts(), opts())
-> {ok, pid()} | {error, any()}.
+
start_clear(Ref, TransOpts0, ProtoOpts0) ->
TransOpts1 = ranch:normalize_opts(TransOpts0),
{TransOpts, ConnectionType} = ensure_connection_type(TransOpts1),
@@ -52,6 +57,7 @@ start_clear(Ref, TransOpts0, ProtoOpts0) ->
-spec start_tls(ranch:ref(), ranch:opts(), opts())
-> {ok, pid()} | {error, any()}.
+
start_tls(Ref, TransOpts0, ProtoOpts0) ->
TransOpts1 = ranch:normalize_opts(TransOpts0),
SocketOpts = maps:get(socket_opts, TransOpts1, []),
@@ -62,28 +68,103 @@ start_tls(Ref, TransOpts0, ProtoOpts0) ->
ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
+%% @todo Experimental function to start a barebone QUIC listener.
+%% This will need to be reworked to be closer to Ranch
+%% listeners and provide equivalent features.
+%%
+%% @todo Better type for transport options. Might require fixing quicer types.
+
+-spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts())
+ -> {ok, pid()}.
+
+start_quic(Ref, TransOpts, ProtoOpts) ->
+ {ok, _} = application:ensure_all_started(quicer),
+ Parent = self(),
+ SocketOpts0 = maps:get(socket_opts, TransOpts, []),
+ {Port, SocketOpts2} = case lists:keytake(port, 1, SocketOpts0) of
+ {value, {port, Port0}, SocketOpts1} ->
+ {Port0, SocketOpts1};
+ false ->
+ {port_0(), SocketOpts0}
+ end,
+ SocketOpts = [
+ {alpn, ["h3"]}, %% @todo Why not binary?
+ {peer_unidi_stream_count, 3}, %% We only need control and QPACK enc/dec.
+ {peer_bidi_stream_count, 100}
+ |SocketOpts2],
+ _ListenerPid = spawn(fun() ->
+ {ok, Listener} = quicer:listen(Port, SocketOpts),
+ Parent ! {ok, Listener},
+ _AcceptorPid = [spawn(fun AcceptLoop() ->
+ {ok, Conn} = quicer:accept(Listener, []),
+ Pid = spawn(fun() ->
+ receive go -> ok end,
+ %% We have to do the handshake after handing control of
+ %% the connection otherwise streams may come in before
+ %% the controlling process is changed and messages will
+ %% not be sent to the correct process.
+ {ok, Conn} = quicer:handshake(Conn),
+ process_flag(trap_exit, true), %% @todo Only if supervisor though.
+ try cowboy_http3:init(Parent, Ref, Conn, ProtoOpts)
+ catch
+ exit:{shutdown,_} -> ok;
+ C:E:S -> log(error, "CRASH ~p:~p:~p", [C,E,S], ProtoOpts)
+ end
+ end),
+ ok = quicer:controlling_process(Conn, Pid),
+ Pid ! go,
+ AcceptLoop()
+ end) || _ <- lists:seq(1, 20)],
+ %% Listener process must not terminate.
+ receive after infinity -> ok end
+ end),
+ receive
+ {ok, Listener} ->
+ {ok, Listener}
+ end.
+
+%% Select a random UDP port using gen_udp because quicer
+%% does not provide equivalent functionality. Taken from
+%% quicer test suites.
+port_0() ->
+ {ok, Socket} = gen_udp:open(0, [{reuseaddr, true}]),
+ {ok, {_, Port}} = inet:sockname(Socket),
+ gen_udp:close(Socket),
+ case os:type() of
+ {unix, darwin} ->
+ %% Apparently macOS doesn't free the port immediately.
+ timer:sleep(500);
+ _ ->
+ ok
+ end,
+ Port.
+
ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) ->
{TransOpts, ConnectionType};
ensure_connection_type(TransOpts) ->
{TransOpts#{connection_type => supervisor}, supervisor}.
-spec stop_listener(ranch:ref()) -> ok | {error, not_found}.
+
stop_listener(Ref) ->
ranch:stop_listener(Ref).
-spec get_env(ranch:ref(), atom()) -> ok.
+
get_env(Ref, Name) ->
Opts = ranch:get_protocol_options(Ref),
Env = maps:get(env, Opts, #{}),
maps:get(Name, Env).
-spec get_env(ranch:ref(), atom(), any()) -> ok.
+
get_env(Ref, Name, Default) ->
Opts = ranch:get_protocol_options(Ref),
Env = maps:get(env, Opts, #{}),
maps:get(Name, Env, Default).
-spec set_env(ranch:ref(), atom(), any()) -> ok.
+
set_env(Ref, Name, Value) ->
Opts = ranch:get_protocol_options(Ref),
Env = maps:get(env, Opts, #{}),
@@ -93,10 +174,12 @@ set_env(Ref, Name, Value) ->
%% Internal.
-spec log({log, logger:level(), io:format(), list()}, opts()) -> ok.
+
log({log, Level, Format, Args}, Opts) ->
log(Level, Format, Args, Opts).
-spec log(logger:level(), io:format(), list(), opts()) -> ok.
+
log(Level, Format, Args, #{logger := Logger})
when Logger =/= error_logger ->
_ = Logger:Level(Format, Args),
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index ee1e725..9c92ec5 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -12,6 +12,8 @@
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+%% @todo Worth renaming to cowboy_http1.
+%% @todo Change use of cow_http to cow_http1 where appropriate.
-module(cowboy_http).
-export([init/6]).
@@ -1531,7 +1533,7 @@ maybe_socket_error(_, Result = {ok, _}, _) ->
maybe_socket_error(State, {error, Reason}, Human) ->
terminate(State, {socket_error, Reason, Human}).
--spec terminate(_, _) -> no_return().
+-spec terminate(#state{} | undefined, _) -> no_return().
terminate(undefined, Reason) ->
exit({shutdown, Reason});
terminate(State=#state{streams=Streams, children=Children}, Reason) ->
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 5b1f1e1..2e73d5f 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -1139,7 +1139,7 @@ maybe_socket_error(_, Result = {ok, _}, _) ->
maybe_socket_error(State, {error, Reason}, Human) ->
terminate(State, {socket_error, Reason, Human}).
--spec terminate(#state{}, _) -> no_return().
+-spec terminate(#state{} | undefined, _) -> no_return().
terminate(undefined, Reason) ->
exit({shutdown, Reason});
terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status,
diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl
new file mode 100644
index 0000000..ef3e3f6
--- /dev/null
+++ b/src/cowboy_http3.erl
@@ -0,0 +1,973 @@
+%% Copyright (c) 2023-2024, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% A key difference between cowboy_http2 and cowboy_http3
+%% is that HTTP/3 streams are QUIC streams and therefore
+%% much of the connection state is handled outside of
+%% Cowboy.
+
+-module(cowboy_http3).
+
+-export([init/4]).
+
+%% Temporary callback to do sendfile over QUIC.
+-export([send/2]).
+
+%% @todo Graceful shutdown? Linger? Timeouts? Frame rates? PROXY header?
+-type opts() :: #{
+ compress_buffering => boolean(),
+ compress_threshold => non_neg_integer(),
+ connection_type => worker | supervisor,
+ enable_connect_protocol => boolean(),
+ env => cowboy_middleware:env(),
+ logger => module(),
+ max_decode_blocked_streams => 0..16#3fffffffffffffff,
+ max_decode_table_size => 0..16#3fffffffffffffff,
+ max_encode_blocked_streams => 0..16#3fffffffffffffff,
+ max_encode_table_size => 0..16#3fffffffffffffff,
+ max_ignored_frame_size_received => non_neg_integer() | infinity,
+ metrics_callback => cowboy_metrics_h:metrics_callback(),
+ metrics_req_filter => fun((cowboy_req:req()) -> map()),
+ metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
+ middlewares => [module()],
+ shutdown_timeout => timeout(),
+ stream_handlers => [module()],
+ tracer_callback => cowboy_tracer_h:tracer_callback(),
+ tracer_flags => [atom()],
+ tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
+ %% Open ended because configured stream handlers might add options.
+ _ => _
+}.
+-export_type([opts/0]).
+
+-record(stream, {
+ id :: cow_http3:stream_id(),
+
+ %% Whether the stream is currently in a special state.
+ status :: header | {unidi, control | encoder | decoder}
+ | normal | {data | ignore, non_neg_integer()} | stopping,
+
+ %% Stream buffer.
+ buffer = <<>> :: binary(),
+
+ %% Stream state.
+ state = undefined :: undefined | {module, any()}
+}).
+
+-record(state, {
+ parent :: pid(),
+ ref :: ranch:ref(),
+ conn :: cowboy_quicer:quicer_connection_handle(),
+ opts = #{} :: opts(),
+
+ %% Remote address and port for the connection.
+ peer = undefined :: {inet:ip_address(), inet:port_number()},
+
+ %% Local address and port for the connection.
+ sock = undefined :: {inet:ip_address(), inet:port_number()},
+
+ %% Client certificate.
+ cert :: undefined | binary(),
+
+ %% HTTP/3 state machine.
+ http3_machine :: cow_http3_machine:http3_machine(),
+
+ %% Specially handled local unidi streams.
+ local_control_id = undefined :: undefined | cow_http3:stream_id(),
+ local_encoder_id = undefined :: undefined | cow_http3:stream_id(),
+ local_decoder_id = undefined :: undefined | cow_http3:stream_id(),
+
+ %% Bidirectional streams used for requests and responses,
+ %% as well as unidirectional streams initiated by the client.
+ streams = #{} :: #{cow_http3:stream_id() => #stream{}},
+
+ %% Lingering streams that were recently reset. We may receive
+ %% pending data or messages for these streams a short while
+ %% after they have been reset.
+ lingering_streams = [] :: [non_neg_integer()],
+
+ %% Streams can spawn zero or more children which are then managed
+ %% by this module if operating as a supervisor.
+ children = cowboy_children:init() :: cowboy_children:children()
+}).
+
+-spec init(pid(), ranch:ref(), cowboy_quicer:quicer_connection_handle(), opts())
+ -> no_return().
+
+init(Parent, Ref, Conn, Opts) ->
+ {ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(server, Opts),
+ %% Immediately open a control, encoder and decoder stream.
+ %% @todo An endpoint MAY avoid creating an encoder stream if it will not be used (for example, if its encoder does not wish to use the dynamic table or if the maximum size of the dynamic table permitted by the peer is zero).
+ %% @todo An endpoint MAY avoid creating a decoder stream if its decoder sets the maximum capacity of the dynamic table to zero.
+ {ok, ControlID} = maybe_socket_error(undefined,
+ cowboy_quicer:start_unidi_stream(Conn, [<<0>>, SettingsBin]),
+ 'A socket error occurred when opening the control stream.'),
+ {ok, EncoderID} = maybe_socket_error(undefined,
+ cowboy_quicer:start_unidi_stream(Conn, <<2>>),
+ 'A socket error occurred when opening the encoder stream.'),
+ {ok, DecoderID} = maybe_socket_error(undefined,
+ cowboy_quicer:start_unidi_stream(Conn, <<3>>),
+ 'A socket error occurred when opening the encoder stream.'),
+ %% Set the control, encoder and decoder streams in the machine.
+ HTTP3Machine = cow_http3_machine:init_unidi_local_streams(
+ ControlID, EncoderID, DecoderID, HTTP3Machine0),
+ %% Get the peername/sockname/cert.
+ {ok, Peer} = maybe_socket_error(undefined, cowboy_quicer:peername(Conn),
+ 'A socket error occurred when retrieving the peer name.'),
+ {ok, Sock} = maybe_socket_error(undefined, cowboy_quicer:sockname(Conn),
+ 'A socket error occurred when retrieving the sock name.'),
+ CertResult = case cowboy_quicer:peercert(Conn) of
+ {error, no_peercert} ->
+ {ok, undefined};
+ Cert0 ->
+ Cert0
+ end,
+ {ok, Cert} = maybe_socket_error(undefined, CertResult,
+ 'A socket error occurred when retrieving the client TLS certificate.'),
+ %% Quick! Let's go!
+ loop(#state{parent=Parent, ref=Ref, conn=Conn,
+ opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ http3_machine=HTTP3Machine, local_control_id=ControlID,
+ local_encoder_id=EncoderID, local_decoder_id=DecoderID}).
+
+loop(State0=#state{opts=Opts, children=Children}) ->
+ receive
+ Msg when element(1, Msg) =:= quic ->
+ handle_quic_msg(State0, Msg);
+ %% Timeouts.
+ {timeout, Ref, {shutdown, Pid}} ->
+ cowboy_children:shutdown_timeout(Children, Ref, Pid),
+ loop(State0);
+ %% Messages pertaining to a stream.
+ {{Pid, StreamID}, Msg} when Pid =:= self() ->
+ loop(info(State0, StreamID, Msg));
+ %% Exit signal from children.
+ Msg = {'EXIT', Pid, _} ->
+ loop(down(State0, Pid, Msg));
+ Msg ->
+ cowboy:log(warning, "Received stray message ~p.", [Msg], Opts),
+ loop(State0)
+ end.
+
+handle_quic_msg(State0=#state{opts=Opts}, Msg) ->
+ case cowboy_quicer:handle(Msg) of
+ {data, StreamID, IsFin, Data} ->
+ parse(State0, StreamID, Data, IsFin);
+ {stream_started, StreamID, StreamType} ->
+ State = stream_new_remote(State0, StreamID, StreamType),
+ loop(State);
+ {stream_closed, StreamID, ErrorCode} ->
+ State = stream_closed(State0, StreamID, ErrorCode),
+ loop(State);
+ closed ->
+ %% @todo Different error reason if graceful?
+ Reason = {socket_error, closed, 'The socket has been closed.'},
+ terminate(State0, Reason);
+ ok ->
+ loop(State0);
+ unknown ->
+ cowboy:log(warning, "Received unknown QUIC message ~p.", [Msg], Opts),
+ loop(State0);
+ {socket_error, Reason} ->
+ terminate(State0, {socket_error, Reason,
+ 'An error has occurred on the socket.'})
+ end.
+
+parse(State=#state{opts=Opts}, StreamID, Data, IsFin) ->
+ case stream_get(State, StreamID) of
+ Stream=#stream{buffer= <<>>} ->
+ parse1(State, Stream, Data, IsFin);
+ Stream=#stream{buffer=Buffer} ->
+ Stream1 = Stream#stream{buffer= <<>>},
+ parse1(stream_store(State, Stream1),
+ Stream1, <<Buffer/binary, Data/binary>>, IsFin);
+ %% Pending data for a stream that has been reset. Ignore.
+ error ->
+ case is_lingering_stream(State, StreamID) of
+ true ->
+ ok;
+ false ->
+ %% We avoid logging the data as it could be quite large.
+ cowboy:log(warning, "Received data for unknown stream ~p.",
+ [StreamID], Opts)
+ end,
+ loop(State)
+ end.
+
+parse1(State, Stream=#stream{status=header}, Data, IsFin) ->
+ parse_unidirectional_stream_header(State, Stream, Data, IsFin);
+parse1(State=#state{http3_machine=HTTP3Machine0},
+ #stream{status={unidi, Type}, id=StreamID}, Data, IsFin)
+ when Type =:= encoder; Type =:= decoder ->
+ case cow_http3_machine:unidi_data(Data, IsFin, StreamID, HTTP3Machine0) of
+ {ok, Instrs, HTTP3Machine} ->
+ loop(send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs));
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ terminate(State#state{http3_machine=HTTP3Machine}, Error)
+ end;
+parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
+ DataLen = byte_size(Data),
+ if
+ DataLen < Len ->
+ %% We don't have the full frame but this is the end of the
+ %% data we have. So FrameIsFin is equivalent to IsFin here.
+ loop(frame(State, Stream#stream{status={data, Len - DataLen}}, {data, Data}, IsFin));
+ true ->
+ <<Data1:Len/binary, Rest/bits>> = Data,
+ FrameIsFin = is_fin(IsFin, Rest),
+ parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin),
+ StreamID, Rest, IsFin)
+ end;
+parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) ->
+ DataLen = byte_size(Data),
+ if
+ DataLen < Len ->
+ loop(stream_store(State, Stream#stream{status={ignore, Len - DataLen}}));
+ true ->
+ <<_:Len/binary, Rest/bits>> = Data,
+ parse(stream_store(State, Stream#stream{status=normal}),
+ StreamID, Rest, IsFin)
+ end;
+%% @todo Clause that discards receiving data for stopping streams.
+%% We may receive a few more frames after we abort receiving.
+parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
+ case cow_http3:parse(Data) of
+ {ok, Frame, Rest} ->
+ FrameIsFin = is_fin(IsFin, Rest),
+ parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin);
+ {more, Frame = {data, _}, Len} ->
+ %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
+ case IsFin of
+ nofin ->
+ %% The stream will be stored at the end of processing commands.
+ loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin));
+ fin ->
+ terminate(State, {connection_error, h3_frame_error,
+ 'Last frame on stream was truncated. (RFC9114 7.1)'})
+ end;
+ {more, ignore, Len} ->
+ %% @todo This setting should be tested.
+ %%
+ %% While the default value doesn't warrant doing a streaming ignore
+ %% (and could work just fine with the 'more' clause), this value
+ %% is configurable and users may want to set it large.
+ MaxIgnoredLen = maps:get(max_ignored_frame_size_received, Opts, 16384),
+ %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
+ case IsFin of
+ nofin when Len < MaxIgnoredLen ->
+ %% We are not processing commands so we must store the stream.
+ %% We also call ignored_frame here; we will not need to call
+ %% it again when ignoring the rest of the data.
+ Stream1 = Stream#stream{status={ignore, Len}},
+ State1 = ignored_frame(State, Stream1),
+ loop(stream_store(State1, Stream1));
+ nofin ->
+ terminate(State, {connection_error, h3_excessive_load,
+ 'Ignored frame larger than limit. (RFC9114 10.5)'});
+ fin ->
+ terminate(State, {connection_error, h3_frame_error,
+ 'Last frame on stream was truncated. (RFC9114 7.1)'})
+ end;
+ {ignore, Rest} ->
+ parse(ignored_frame(State, Stream), StreamID, Rest, IsFin);
+ Error = {connection_error, _, _} ->
+ terminate(State, Error);
+ more when Data =:= <<>> ->
+ %% The buffer was already reset to <<>>.
+ loop(stream_store(State, Stream));
+ more ->
+ %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
+ case IsFin of
+ nofin ->
+ loop(stream_store(State, Stream#stream{buffer=Data}));
+ fin ->
+ terminate(State, {connection_error, h3_frame_error,
+ 'Last frame on stream was truncated. (RFC9114 7.1)'})
+ end
+ end.
+
+%% We may receive multiple frames in a single QUIC packet.
+%% The FIN flag applies to the QUIC packet, not to the frame.
+%% We must therefore only consider the frame to have a FIN
+%% flag if there's no data remaining to be read.
+is_fin(fin, <<>>) -> fin;
+is_fin(_, _) -> nofin.
+
+parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
+ Stream0=#stream{id=StreamID}, Data, IsFin) ->
+ case cow_http3:parse_unidi_stream_header(Data) of
+ {ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder ->
+ case cow_http3_machine:set_unidi_remote_stream_type(
+ StreamID, Type, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State = State0#state{http3_machine=HTTP3Machine},
+ Stream = Stream0#stream{status={unidi, Type}},
+ parse(stream_store(State, Stream), StreamID, Rest, IsFin);
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ terminate(State0#state{http3_machine=HTTP3Machine}, Error)
+ end;
+ {ok, push, _} ->
+ terminate(State0, {connection_error, h3_stream_creation_error,
+ 'Only servers can push. (RFC9114 6.2.2)'});
+ %% Unknown stream types must be ignored. We choose to abort the
+ %% stream instead of reading and discarding the incoming data.
+ {undefined, _} ->
+ loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error))
+ end.
+
+frame(State=#state{http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, Frame, IsFin) ->
+ case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State#state{http3_machine=HTTP3Machine};
+ {ok, {data, Data}, HTTP3Machine} ->
+ data_frame(State#state{http3_machine=HTTP3Machine}, Stream, IsFin, Data);
+ {ok, {headers, Headers, PseudoHeaders, BodyLen}, Instrs, HTTP3Machine} ->
+ headers_frame(send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs),
+ Stream, IsFin, Headers, PseudoHeaders, BodyLen);
+ {ok, {trailers, _Trailers}, Instrs, HTTP3Machine} ->
+ %% @todo Propagate trailers.
+ send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs);
+ {ok, GoAway={goaway, _}, HTTP3Machine} ->
+ goaway(State#state{http3_machine=HTTP3Machine}, GoAway);
+ {error, Error={stream_error, _Reason, _Human}, Instrs, HTTP3Machine} ->
+ State1 = send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs),
+ reset_stream(State1, Stream, Error);
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ terminate(State#state{http3_machine=HTTP3Machine}, Error)
+ end.
+
+data_frame(State=#state{opts=Opts},
+ Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) ->
+ try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
+ {Commands, StreamState} ->
+ commands(State, Stream#stream{state=StreamState}, Commands)
+ catch Class:Exception:Stacktrace ->
+ cowboy:log(cowboy_stream:make_error_log(data,
+ [StreamID, IsFin, Data, StreamState0],
+ Class, Exception, Stacktrace), Opts),
+ reset_stream(State, Stream, {internal_error, {Class, Exception},
+ 'Unhandled exception in cowboy_stream:data/4.'})
+ end.
+
+headers_frame(State, Stream, IsFin, Headers,
+ PseudoHeaders=#{method := <<"CONNECT">>}, _)
+ when map_size(PseudoHeaders) =:= 2 ->
+ early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501,
+ 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)');
+headers_frame(State, Stream, IsFin, Headers,
+ PseudoHeaders=#{method := <<"TRACE">>}, _) ->
+ early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501,
+ 'The TRACE method is currently not implemented. (RFC9114 4.4, RFC7231 4.3.8)');
+headers_frame(State, Stream, IsFin, Headers, PseudoHeaders=#{authority := Authority}, BodyLen) ->
+ headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
+headers_frame(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen) ->
+ case lists:keyfind(<<"host">>, 1, Headers) of
+ {_, Authority} ->
+ headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
+ _ ->
+ reset_stream(State, Stream, {stream_error, h3_message_error,
+ 'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'})
+ end.
+
+headers_frame_parse_host(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert},
+ Stream=#stream{id=StreamID}, IsFin, Headers,
+ PseudoHeaders=#{method := Method, scheme := Scheme, path := PathWithQs},
+ BodyLen, Authority) ->
+ try cow_http_hd:parse_host(Authority) of
+ {Host, Port0} ->
+ Port = ensure_port(Scheme, Port0),
+ try cow_http:parse_fullpath(PathWithQs) of
+ {<<>>, _} ->
+ reset_stream(State, Stream, {stream_error, h3_message_error,
+ 'The path component must not be empty. (RFC7540 8.1.2.3)'});
+ {Path, Qs} ->
+ Req0 = #{
+ ref => Ref,
+ pid => self(),
+ streamid => StreamID,
+ peer => Peer,
+ sock => Sock,
+ cert => Cert,
+ method => Method,
+ scheme => Scheme,
+ host => Host,
+ port => Port,
+ path => Path,
+ qs => Qs,
+ version => 'HTTP/3',
+ headers => headers_to_map(Headers, #{}),
+ has_body => IsFin =:= nofin,
+ body_length => BodyLen
+ },
+ %% We add the protocol information for extended CONNECTs.
+ Req = case PseudoHeaders of
+ #{protocol := Protocol} -> Req0#{protocol => Protocol};
+ _ -> Req0
+ end,
+ headers_frame(State, Stream, Req)
+ catch _:_ ->
+ reset_stream(State, Stream, {stream_error, h3_message_error,
+ 'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'})
+ end
+ catch _:_ ->
+ reset_stream(State, Stream, {stream_error, h3_message_error,
+ 'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'})
+ end.
+
+%% @todo Copied from cowboy_http2.
+%% @todo How to handle "http"?
+ensure_port(<<"http">>, undefined) -> 80;
+ensure_port(<<"https">>, undefined) -> 443;
+ensure_port(_, Port) -> Port.
+
+%% @todo Copied from cowboy_http2.
+%% This function is necessary to properly handle duplicate headers
+%% and the special-case cookie header.
+headers_to_map([], Acc) ->
+ Acc;
+headers_to_map([{Name, Value}|Tail], Acc0) ->
+ Acc = case Acc0 of
+ %% The cookie header does not use proper HTTP header lists.
+ #{Name := Value0} when Name =:= <<"cookie">> ->
+ Acc0#{Name => << Value0/binary, "; ", Value/binary >>};
+ #{Name := Value0} ->
+ Acc0#{Name => << Value0/binary, ", ", Value/binary >>};
+ _ ->
+ Acc0#{Name => Value}
+ end,
+ headers_to_map(Tail, Acc).
+
+headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
+ try cowboy_stream:init(StreamID, Req, Opts) of
+ {Commands, StreamState} ->
+ commands(State, Stream#stream{state=StreamState}, Commands)
+ catch Class:Exception:Stacktrace ->
+ cowboy:log(cowboy_stream:make_error_log(init,
+ [StreamID, Req, Opts],
+ Class, Exception, Stacktrace), Opts),
+ reset_stream(State, Stream, {internal_error, {Class, Exception},
+ 'Unhandled exception in cowboy_stream:init/3.'})
+ end.
+
+early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
+ Stream=#stream{id=StreamID}, _IsFin, Headers, #{method := Method},
+ StatusCode0, HumanReadable) ->
+ %% We automatically terminate the stream but it is not an error
+ %% per se (at least not in the first implementation).
+ Reason = {stream_error, h3_no_error, HumanReadable},
+ %% The partial Req is minimal for now. We only have one case
+ %% where it can be called (when a method is completely disabled).
+ PartialReq = #{
+ ref => Ref,
+ peer => Peer,
+ method => Method,
+ headers => headers_to_map(Headers, #{})
+ },
+ Resp = {response, StatusCode0, RespHeaders0=#{<<"content-length">> => <<"0">>}, <<>>},
+ try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
+ {response, StatusCode, RespHeaders, RespBody} ->
+ send_response(State0, Stream, StatusCode, RespHeaders, RespBody)
+ catch Class:Exception:Stacktrace ->
+ cowboy:log(cowboy_stream:make_error_log(early_error,
+ [StreamID, Reason, PartialReq, Resp, Opts],
+ Class, Exception, Stacktrace), Opts),
+ %% We still need to send an error response, so send what we initially
+ %% wanted to send. It's better than nothing.
+ send_headers(State0, Stream, fin, StatusCode0, RespHeaders0)
+ end.
+
+%% Erlang messages.
+
+down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
+ State = case cowboy_children:down(Children0, Pid) of
+ %% The stream was terminated already.
+ {ok, undefined, Children} ->
+ State0#state{children=Children};
+ %% The stream is still running.
+ {ok, StreamID, Children} ->
+ info(State0#state{children=Children}, StreamID, Msg);
+ %% The process was unknown.
+ error ->
+ cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
+ [Msg, Pid], Opts),
+ State0
+ end,
+ if
+%% @todo
+% State#state.http2_status =:= closing, State#state.streams =:= #{} ->
+% terminate(State, {stop, normal, 'The connection is going away.'});
+ true ->
+ State
+ end.
+
+info(State=#state{opts=Opts, http3_machine=_HTTP3Machine}, StreamID, Msg) ->
+ case stream_get(State, StreamID) of
+ Stream=#stream{state=StreamState0} ->
+ try cowboy_stream:info(StreamID, Msg, StreamState0) of
+ {Commands, StreamState} ->
+ commands(State, Stream#stream{state=StreamState}, Commands)
+ catch Class:Exception:Stacktrace ->
+ cowboy:log(cowboy_stream:make_error_log(info,
+ [StreamID, Msg, StreamState0],
+ Class, Exception, Stacktrace), Opts),
+ reset_stream(State, Stream, {internal_error, {Class, Exception},
+ 'Unhandled exception in cowboy_stream:info/3.'})
+ end;
+ error ->
+ case is_lingering_stream(State, StreamID) of
+ true ->
+ ok;
+ false ->
+ cowboy:log(warning, "Received message ~p for unknown stream ~p.",
+ [Msg, StreamID], Opts)
+ end,
+ State
+ end.
+
+%% Stream handler commands.
+
+commands(State, Stream, []) ->
+ stream_store(State, Stream);
+%% Error responses are sent only if a response wasn't sent already.
+commands(State=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID},
+ [{error_response, StatusCode, Headers, Body}|Tail]) ->
+ case cow_http3_machine:get_bidi_stream_local_state(StreamID, HTTP3Machine) of
+ {ok, idle} ->
+ commands(State, Stream, [{response, StatusCode, Headers, Body}|Tail]);
+ _ ->
+ commands(State, Stream, Tail)
+ end;
+%% Send an informational response.
+commands(State0, Stream, [{inform, StatusCode, Headers}|Tail]) ->
+ State = send_headers(State0, Stream, idle, StatusCode, Headers),
+ commands(State, Stream, Tail);
+%% Send response headers.
+commands(State0, Stream, [{response, StatusCode, Headers, Body}|Tail]) ->
+ State = send_response(State0, Stream, StatusCode, Headers, Body),
+ commands(State, Stream, Tail);
+%% Send response headers.
+commands(State0, Stream, [{headers, StatusCode, Headers}|Tail]) ->
+ State = send_headers(State0, Stream, nofin, StatusCode, Headers),
+ commands(State, Stream, Tail);
+%%% Send a response body chunk.
+commands(State0=#state{conn=Conn}, Stream=#stream{id=StreamID}, [{data, IsFin, Data}|Tail]) ->
+ _ = case Data of
+ {sendfile, Offset, Bytes, Path} ->
+ %% Temporary solution to do sendfile over QUIC.
+ {ok, _} = ranch_transport:sendfile(?MODULE, {Conn, StreamID},
+ Path, Offset, Bytes, []),
+ ok = maybe_socket_error(State0,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), IsFin));
+ _ ->
+ ok = maybe_socket_error(State0,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:data(Data), IsFin))
+ end,
+ State = maybe_send_is_fin(State0, Stream, IsFin),
+ commands(State, Stream, Tail);
+%%% Send trailers.
+commands(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, [{trailers, Trailers}|Tail]) ->
+ State = case cow_http3_machine:prepare_trailers(
+ StreamID, HTTP3Machine0, maps:to_list(Trailers)) of
+ {trailers, HeaderBlock, Instrs, HTTP3Machine} ->
+ State1 = send_instructions(State0#state{http3_machine=HTTP3Machine}, Instrs),
+ ok = maybe_socket_error(State1,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), fin)),
+ State1;
+ {no_trailers, HTTP3Machine} ->
+ ok = maybe_socket_error(State0,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), fin)),
+ State0#state{http3_machine=HTTP3Machine}
+ end,
+ commands(State, Stream, Tail);
+%% Send a push promise.
+%%
+%% @todo Responses sent as a result of a push_promise request
+%% must not send push_promise frames themselves.
+%%
+%% @todo We should not send push_promise frames when we are
+%% in the closing http2_status.
+%commands(State0=#state{socket=Socket, transport=Transport, http3_machine=HTTP3Machine0},
+% Stream, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
+% Authority = case {Scheme, Port} of
+% {<<"http">>, 80} -> Host;
+% {<<"https">>, 443} -> Host;
+% _ -> iolist_to_binary([Host, $:, integer_to_binary(Port)])
+% end,
+% PathWithQs = iolist_to_binary(case Qs of
+% <<>> -> Path;
+% _ -> [Path, $?, Qs]
+% end),
+% PseudoHeaders = #{
+% method => Method,
+% scheme => Scheme,
+% authority => Authority,
+% path => PathWithQs
+% },
+% %% We need to make sure the header value is binary before we can
+% %% create the Req object, as it expects them to be flat.
+% Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)),
+% %% @todo
+% State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP3Machine0,
+% PseudoHeaders, Headers) of
+% {ok, PromisedStreamID, HeaderBlock, HTTP3Machine} ->
+% Transport:send(Socket, cow_http2:push_promise(
+% StreamID, PromisedStreamID, HeaderBlock)),
+% headers_frame(State0#state{http3_machine=HTTP2Machine},
+% PromisedStreamID, fin, Headers, PseudoHeaders, 0);
+% {error, no_push} ->
+% State0
+% end,
+% commands(State, Stream, Tail);
+%%% Read the request body.
+%commands(State0=#state{flow=Flow, streams=Streams}, Stream, [{flow, Size}|Tail]) ->
+commands(State, Stream, [{flow, _Size}|Tail]) ->
+ %% @todo We should tell the QUIC stream to increase its window size.
+% #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
+% State = update_window(State0#state{flow=Flow + Size,
+% streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
+% StreamID),
+ commands(State, Stream, Tail);
+%% Supervise a child process.
+commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
+ [{spawn, Pid, Shutdown}|Tail]) ->
+ commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
+ Stream, Tail);
+%% Error handling.
+commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) ->
+ %% @todo Do we want to run the commands after an internal_error?
+ %% @todo Do we even allow commands after?
+ %% @todo Only reset when the stream still exists.
+ reset_stream(State, Stream, Error);
+%% Use a different protocol within the stream (CONNECT :protocol).
+%% @todo Make sure we error out when the feature is disabled.
+commands(State0, Stream0=#stream{id=StreamID},
+ [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
+ State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
+ Stream = stream_get(State, StreamID),
+ commands(State, Stream, Tail);
+%% Set options dynamically.
+commands(State, Stream, [{set_options, _Opts}|Tail]) ->
+ commands(State, Stream, Tail);
+commands(State, Stream, [stop|_Tail]) ->
+ %% @todo Do we want to run the commands after a stop?
+ %% @todo Do we even allow commands after?
+ stop_stream(State, Stream);
+%% Log event.
+commands(State=#state{opts=Opts}, Stream, [Log={log, _, _, _}|Tail]) ->
+ cowboy:log(Log, Opts),
+ commands(State, Stream, Tail).
+
+send_response(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, StatusCode, Headers, Body) ->
+ Size = case Body of
+ {sendfile, _, Bytes0, _} -> Bytes0;
+ _ -> iolist_size(Body)
+ end,
+ case Size of
+ 0 ->
+ State = send_headers(State0, Stream, fin, StatusCode, Headers),
+ maybe_send_is_fin(State, Stream, fin);
+ _ ->
+ %% @todo Add a test for HEAD to make sure we don't send the body when
+ %% returning {response...} from a stream handler (or {headers...} then {data...}).
+ {ok, _IsFin, HeaderBlock, Instrs, HTTP3Machine}
+ = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, nofin,
+ #{status => cow_http:status_to_integer(StatusCode)},
+ headers_to_list(Headers)),
+ State = send_instructions(State0#state{http3_machine=HTTP3Machine}, Instrs),
+ %% @todo It might be better to do async sends.
+ _ = case Body of
+ {sendfile, Offset, Bytes, Path} ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock))),
+ %% Temporary solution to do sendfile over QUIC.
+ {ok, _} = maybe_socket_error(State,
+ ranch_transport:sendfile(?MODULE, {Conn, StreamID},
+ Path, Offset, Bytes, [])),
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), fin));
+ _ ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, StreamID, [
+ cow_http3:headers(HeaderBlock),
+ cow_http3:data(Body)
+ ], fin))
+ end,
+ maybe_send_is_fin(State, Stream, fin)
+ end.
+
+maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, fin) ->
+ HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamID, HTTP3Machine0),
+ maybe_terminate_stream(State#state{http3_machine=HTTP3Machine}, Stream);
+maybe_send_is_fin(State, _, _) ->
+ State.
+
+%% Temporary callback to do sendfile over QUIC.
+-spec send({cowboy_quicer:quicer_connection_handle(), cow_http3:stream_id()},
+ iodata()) -> ok | {error, any()}.
+
+send({Conn, StreamID}, IoData) ->
+ cowboy_quicer:send(Conn, StreamID, cow_http3:data(IoData)).
+
+send_headers(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
+ #stream{id=StreamID}, IsFin0, StatusCode, Headers) ->
+ {ok, IsFin, HeaderBlock, Instrs, HTTP3Machine}
+ = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, IsFin0,
+ #{status => cow_http:status_to_integer(StatusCode)},
+ headers_to_list(Headers)),
+ State = send_instructions(State0#state{http3_machine=HTTP3Machine}, Instrs),
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), IsFin)),
+ State.
+
+%% The set-cookie header is special; we can only send one cookie per header.
+headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
+ Headers = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
+ Headers ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
+headers_to_list(Headers) ->
+ maps:to_list(Headers).
+
+%% @todo We would open unidi streams here if we only open on-demand.
+%% No instructions.
+send_instructions(State, undefined) ->
+ State;
+%% Decoder instructions.
+send_instructions(State=#state{conn=Conn, local_decoder_id=DecoderID},
+ {decoder_instructions, DecData}) ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, DecoderID, DecData)),
+ State;
+%% Encoder instructions.
+send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID},
+ {encoder_instructions, EncData}) ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, EncoderID, EncData)),
+ State.
+
+reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, Error) ->
+ Reason = case Error of
+ {internal_error, _, _} -> h3_internal_error;
+ {stream_error, Reason0, _} -> Reason0
+ end,
+ %% @todo Do we want to close both sides?
+ %% @todo Should we close the send side if the receive side was already closed?
+ cowboy_quicer:shutdown_stream(Conn, StreamID,
+ both, cow_http3:error_to_code(Reason)),
+ State1 = case cow_http3_machine:reset_stream(StreamID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ terminate_stream(State0#state{http3_machine=HTTP3Machine}, Stream, Error);
+ {error, not_found} ->
+ terminate_stream(State0, Stream, Error)
+ end,
+%% @todo
+% case reset_rate(State1) of
+% {ok, State} ->
+% State;
+% error ->
+% terminate(State1, {connection_error, enhance_your_calm,
+% 'Stream reset rate larger than configuration allows. Flood? (CVE-2019-9514)'})
+% end.
+ State1.
+
+stop_stream(State0=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID}) ->
+ %% We abort reading when stopping the stream but only
+ %% if the client was not finished sending data.
+ %% We mark the stream as 'stopping' either way.
+ State = case cow_http3_machine:get_bidi_stream_remote_state(StreamID, HTTP3Machine) of
+ {ok, fin} ->
+ stream_store(State0, Stream#stream{status=stopping});
+ {error, not_found} ->
+ stream_store(State0, Stream#stream{status=stopping});
+ _ ->
+ stream_abort_receive(State0, Stream, h3_no_error)
+ end,
+ %% Then we may need to send a response or terminate it
+ %% if the stream handler did not do so already.
+ case cow_http3_machine:get_bidi_stream_local_state(StreamID, HTTP3Machine) of
+ %% When the stream terminates normally (without resetting the stream)
+ %% and no response was sent, we need to send a proper response back to the client.
+ {ok, idle} ->
+ info(State, StreamID, {response, 204, #{}, <<>>});
+ %% When a response was sent but not terminated, we need to close the stream.
+ %% We send a final DATA frame to complete the stream.
+ {ok, nofin} ->
+ info(State, StreamID, {data, fin, <<>>});
+ %% When a response was sent fully we can terminate the stream,
+ %% regardless of the stream being in half-closed or closed state.
+ _ ->
+ terminate_stream(State, Stream, normal)
+ end.
+
+maybe_terminate_stream(State, Stream=#stream{status=stopping}) ->
+ terminate_stream(State, Stream, normal);
+%% The Stream will be stored in the State at the end of commands processing.
+maybe_terminate_stream(State, _) ->
+ State.
+
+terminate_stream(State=#state{streams=Streams0, children=Children0},
+ #stream{id=StreamID, state=StreamState}, Reason) ->
+ Streams = maps:remove(StreamID, Streams0),
+ terminate_stream_handler(State, StreamID, Reason, StreamState),
+ Children = cowboy_children:shutdown(Children0, StreamID),
+ stream_linger(State#state{streams=Streams, children=Children}, StreamID).
+
+terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
+ try
+ cowboy_stream:terminate(StreamID, Reason, StreamState)
+ catch Class:Exception:Stacktrace ->
+ cowboy:log(cowboy_stream:make_error_log(terminate,
+ [StreamID, Reason, StreamState],
+ Class, Exception, Stacktrace), Opts)
+ end.
+
+ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{id=StreamID}) ->
+ case cow_http3_machine:ignored_frame(StreamID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State#state{http3_machine=HTTP3Machine};
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ terminate(State#state{http3_machine=HTTP3Machine}, Error)
+ end.
+
+stream_abort_receive(State=#state{conn=Conn}, Stream=#stream{id=StreamID}, Reason) ->
+ cowboy_quicer:shutdown_stream(Conn, StreamID,
+ receiving, cow_http3:error_to_code(Reason)),
+ stream_store(State, Stream#stream{status=stopping}).
+
+%% @todo Graceful connection shutdown.
+%% We terminate the connection immediately if it hasn't fully been initialized.
+-spec goaway(#state{}, {goaway, _}) -> no_return().
+goaway(State, {goaway, _}) ->
+ terminate(State, {stop, goaway, 'The connection is going away.'}).
+
+%% Function copied from cowboy_http.
+maybe_socket_error(State, {error, closed}) ->
+ terminate(State, {socket_error, closed, 'The socket has been closed.'});
+maybe_socket_error(State, Reason) ->
+ maybe_socket_error(State, Reason, 'An error has occurred on the socket.').
+
+maybe_socket_error(_, Result = ok, _) ->
+ Result;
+maybe_socket_error(_, Result = {ok, _}, _) ->
+ Result;
+maybe_socket_error(State, {error, Reason}, Human) ->
+ terminate(State, {socket_error, Reason, Human}).
+
+-spec terminate(#state{} | undefined, _) -> no_return().
+terminate(undefined, Reason) ->
+ exit({shutdown, Reason});
+terminate(State=#state{conn=Conn, %http3_status=Status,
+ %http3_machine=HTTP3Machine,
+ streams=Streams, children=Children}, Reason) ->
+% if
+% Status =:= connected; Status =:= closing_initiated ->
+%% @todo
+% %% We are terminating so it's OK if we can't send the GOAWAY anymore.
+% _ = cowboy_quicer:send(Conn, ControlID, cow_http3:goaway(
+% cow_http3_machine:get_last_streamid(HTTP3Machine))),
+ %% We already sent the GOAWAY frame.
+% Status =:= closing ->
+% ok
+% end,
+ terminate_all_streams(State, maps:to_list(Streams), Reason),
+ cowboy_children:terminate(Children),
+% terminate_linger(State),
+ _ = cowboy_quicer:shutdown(Conn, cow_http3:error_to_code(terminate_reason(Reason))),
+ exit({shutdown, Reason}).
+
+terminate_reason({connection_error, Reason, _}) -> Reason;
+terminate_reason({stop, _, _}) -> h3_no_error;
+terminate_reason({socket_error, _, _}) -> h3_internal_error.
+%terminate_reason({internal_error, _, _}) -> internal_error.
+
+terminate_all_streams(_, [], _) ->
+ ok;
+terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) ->
+ terminate_stream_handler(State, StreamID, Reason, StreamState),
+ terminate_all_streams(State, Tail, Reason).
+
+stream_get(#state{streams=Streams}, StreamID) ->
+ maps:get(StreamID, Streams, error).
+
+stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
+ StreamID, StreamType) ->
+ {HTTP3Machine, Status} = case StreamType of
+ unidi ->
+ {cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0),
+ header};
+ bidi ->
+ {cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0),
+ normal}
+ end,
+ Stream = #stream{id=StreamID, status=Status},
+ State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
+
+%% Stream closed message for a local (write-only) unidi stream.
+stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) ->
+ stream_closed1(State, StreamID);
+stream_closed(State=#state{local_encoder_id=StreamID}, StreamID, _) ->
+ stream_closed1(State, StreamID);
+stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) ->
+ stream_closed1(State, StreamID);
+stream_closed(State=#state{opts=Opts,
+ streams=Streams0, children=Children0}, StreamID, ErrorCode) ->
+ case maps:take(StreamID, Streams0) of
+ {#stream{state=undefined}, Streams} ->
+ %% Unidi stream has no handler/children.
+ stream_closed1(State#state{streams=Streams}, StreamID);
+ %% We only stop bidi streams if the stream was closed with an error
+ %% or the stream was already in the process of stopping.
+ {#stream{status=Status, state=StreamState}, Streams}
+ when Status =:= stopping; ErrorCode =/= 0 ->
+ terminate_stream_handler(State, StreamID, closed, StreamState),
+ Children = cowboy_children:shutdown(Children0, StreamID),
+ stream_closed1(State#state{streams=Streams, children=Children}, StreamID);
+ %% Don't remove a stream that terminated properly but
+ %% has chosen to remain up (custom stream handlers).
+ {_, _} ->
+ stream_closed1(State, StreamID);
+ %% Stream closed message for a stream that has been reset. Ignore.
+ error ->
+ case is_lingering_stream(State, StreamID) of
+ true ->
+ ok;
+ false ->
+ %% We avoid logging the data as it could be quite large.
+ cowboy:log(warning, "Received stream_closed for unknown stream ~p. ~p ~p",
+ [StreamID, self(), Streams0], Opts)
+ end,
+ State
+ end.
+
+stream_closed1(State=#state{http3_machine=HTTP3Machine0}, StreamID) ->
+ case cow_http3_machine:close_stream(StreamID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State#state{http3_machine=HTTP3Machine};
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ terminate(State#state{http3_machine=HTTP3Machine}, Error)
+ end.
+
+stream_store(State=#state{streams=Streams}, Stream=#stream{id=StreamID}) ->
+ State#state{streams=Streams#{StreamID => Stream}}.
+
+stream_linger(State=#state{lingering_streams=Lingering0}, StreamID) ->
+ %% We only keep up to 100 streams in this state. @todo Make it configurable?
+ Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)],
+ State#state{lingering_streams=Lingering}.
+
+is_lingering_stream(#state{lingering_streams=Lingering}, StreamID) ->
+ lists:member(StreamID, Lingering).
diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl
new file mode 100644
index 0000000..d9bbe1f
--- /dev/null
+++ b/src/cowboy_quicer.erl
@@ -0,0 +1,231 @@
+%% Copyright (c) 2023, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% QUIC transport using the emqx/quicer NIF.
+
+-module(cowboy_quicer).
+
+%% Connection.
+-export([peername/1]).
+-export([sockname/1]).
+-export([peercert/1]).
+-export([shutdown/2]).
+
+%% Streams.
+-export([start_unidi_stream/2]).
+-export([send/3]).
+-export([send/4]).
+-export([shutdown_stream/4]).
+
+%% Messages.
+-export([handle/1]).
+
+-ifndef(COWBOY_QUICER).
+
+-spec peername(_) -> no_return().
+peername(_) -> no_quicer().
+
+-spec sockname(_) -> no_return().
+sockname(_) -> no_quicer().
+
+-spec peercert(_) -> no_return().
+peercert(_) -> no_quicer().
+
+-spec shutdown(_, _) -> no_return().
+shutdown(_, _) -> no_quicer().
+
+-spec start_unidi_stream(_, _) -> no_return().
+start_unidi_stream(_, _) -> no_quicer().
+
+-spec send(_, _, _) -> no_return().
+send(_, _, _) -> no_quicer().
+
+-spec send(_, _, _, _) -> no_return().
+send(_, _, _, _) -> no_quicer().
+
+-spec shutdown_stream(_, _, _, _) -> no_return().
+shutdown_stream(_, _, _, _) -> no_quicer().
+
+-spec handle(_) -> no_return().
+handle(_) -> no_quicer().
+
+no_quicer() ->
+ error({no_quicer,
+ "Cowboy must be compiled with environment variable COWBOY_QUICER=1 "
+ "or with compilation flag -D COWBOY_QUICER=1 in order to enable "
+ "QUIC support using the emqx/quic NIF"}).
+
+-else.
+
+%% @todo Make quicer export these types.
+-type quicer_connection_handle() :: reference().
+-export_type([quicer_connection_handle/0]).
+
+-type quicer_app_errno() :: non_neg_integer().
+
+-include_lib("quicer/include/quicer.hrl").
+
+%% Connection.
+
+-spec peername(quicer_connection_handle())
+ -> {ok, {inet:ip_address(), inet:port_number()}}
+ | {error, any()}.
+
+peername(Conn) ->
+ quicer:peername(Conn).
+
+-spec sockname(quicer_connection_handle())
+ -> {ok, {inet:ip_address(), inet:port_number()}}
+ | {error, any()}.
+
+sockname(Conn) ->
+ quicer:sockname(Conn).
+
+-spec peercert(quicer_connection_handle())
+ -> {ok, public_key:der_encoded()}
+ | {error, any()}.
+
+peercert(Conn) ->
+ quicer_nif:peercert(Conn).
+
+-spec shutdown(quicer_connection_handle(), quicer_app_errno())
+ -> ok | {error, any()}.
+
+shutdown(Conn, ErrorCode) ->
+ quicer:shutdown_connection(Conn,
+ ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
+ ErrorCode).
+
+%% Streams.
+
+-spec start_unidi_stream(quicer_connection_handle(), iodata())
+ -> {ok, cow_http3:stream_id()}
+ | {error, any()}.
+
+start_unidi_stream(Conn, HeaderData) ->
+ case quicer:start_stream(Conn, #{
+ active => true,
+ open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}) of
+ {ok, StreamRef} ->
+ case quicer:send(StreamRef, HeaderData) of
+ {ok, _} ->
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
+ put({quicer_stream, StreamID}, StreamRef),
+ {ok, StreamID};
+ Error ->
+ Error
+ end;
+ {error, Reason1, Reason2} ->
+ {error, {Reason1, Reason2}};
+ Error ->
+ Error
+ end.
+
+-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata())
+ -> ok | {error, any()}.
+
+send(Conn, StreamID, Data) ->
+ send(Conn, StreamID, Data, nofin).
+
+-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata(), cow_http:fin())
+ -> ok | {error, any()}.
+
+send(_Conn, StreamID, Data, IsFin) ->
+ StreamRef = get({quicer_stream, StreamID}),
+ Size = iolist_size(Data),
+ case quicer:send(StreamRef, Data, send_flag(IsFin)) of
+ {ok, Size} ->
+ ok;
+ {error, Reason1, Reason2} ->
+ {error, {Reason1, Reason2}};
+ Error ->
+ Error
+ end.
+
+send_flag(nofin) -> ?QUIC_SEND_FLAG_NONE;
+send_flag(fin) -> ?QUIC_SEND_FLAG_FIN.
+
+-spec shutdown_stream(quicer_connection_handle(),
+ cow_http3:stream_id(), both | receiving, quicer_app_errno())
+ -> ok.
+
+shutdown_stream(_Conn, StreamID, Dir, ErrorCode) ->
+ StreamRef = get({quicer_stream, StreamID}),
+ _ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity),
+ ok.
+
+shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT;
+shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE.
+
+%% Messages.
+
+%% @todo Probably should have the Conn given as argument too?
+-spec handle({quic, _, _, _})
+ -> {data, cow_http3:stream_id(), cow_http:fin(), binary()}
+ | {stream_started, cow_http3:stream_id(), unidi | bidi}
+ | {stream_closed, cow_http3:stream_id(), quicer_app_errno()}
+ | closed
+ | ok
+ | unknown
+ | {socket_error, any()}.
+
+handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) ->
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
+ IsFin = case Flags band ?QUIC_RECEIVE_FLAG_FIN of
+ ?QUIC_RECEIVE_FLAG_FIN -> fin;
+ _ -> nofin
+ end,
+ {data, StreamID, IsFin, Data};
+%% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED.
+handle({quic, new_stream, StreamRef, #{flags := Flags}}) ->
+ case quicer:setopt(StreamRef, active, true) of
+ ok ->
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
+ put({quicer_stream, StreamID}, StreamRef),
+ StreamType = case quicer:is_unidirectional(Flags) of
+ true -> unidi;
+ false -> bidi
+ end,
+ {stream_started, StreamID, StreamType};
+ {error, Reason} ->
+ {socket_error, Reason}
+ end;
+%% QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE.
+handle({quic, stream_closed, StreamRef, #{error := ErrorCode}}) ->
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
+ {stream_closed, StreamID, ErrorCode};
+%% QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE.
+handle({quic, closed, Conn, _Flags}) ->
+ _ = quicer:close_connection(Conn),
+ closed;
+%% The following events are currently ignored either because
+%% I do not know what they do or because we do not need to
+%% take action.
+handle({quic, streams_available, _Conn, _Props}) ->
+ ok;
+handle({quic, dgram_state_changed, _Conn, _Props}) ->
+ ok;
+%% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT
+handle({quic, transport_shutdown, _Conn, _Flags}) ->
+ ok;
+handle({quic, peer_send_shutdown, _StreamRef, undefined}) ->
+ ok;
+handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) ->
+ ok;
+handle({quic, shutdown, _Conn, success}) ->
+ ok;
+handle(_Msg) ->
+ unknown.
+
+-endif.
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
index 842bd8d..b373344 100644
--- a/src/cowboy_stream_h.erl
+++ b/src/cowboy_stream_h.erl
@@ -151,6 +151,11 @@ info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, p
[Ref, self(), StreamID, Pid, Reason, Stacktrace]}
|Commands0]
end,
+ %% @todo We are trying to send a 500 response before resetting
+ %% the stream. But due to the way the RESET_STREAM frame
+ %% works in QUIC the data may be lost. The problem is
+ %% known and a draft RFC exists at
+ %% https://www.ietf.org/id/draft-ietf-quic-reliable-stream-reset-03.html
do_info(StreamID, Exit, [
{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}
|Commands], State);
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index 5b98b43..3cc4d30 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -103,7 +103,8 @@
%% is trying to upgrade to the Websocket protocol.
-spec is_upgrade_request(cowboy_req:req()) -> boolean().
-is_upgrade_request(#{version := 'HTTP/2', method := <<"CONNECT">>, protocol := Protocol}) ->
+is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol})
+ when Version =:= 'HTTP/2'; Version =:= 'HTTP/3' ->
<<"websocket">> =:= cowboy_bstr:to_lower(Protocol);
is_upgrade_request(Req=#{version := 'HTTP/1.1', method := <<"GET">>}) ->
ConnTokens = cowboy_req:parse_header(<<"connection">>, Req, []),
@@ -148,13 +149,13 @@ upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) ->
<<"connection">> => <<"upgrade">>,
<<"upgrade">> => <<"websocket">>
}, Req0), Env};
- %% Use a generic 400 error for HTTP/2.
+ %% Use 501 Not Implemented for HTTP/2 and HTTP/3 as recommended
+ %% by RFC9220 3 (WebSockets Upgrade over HTTP/3).
{error, upgrade_required} ->
- {ok, cowboy_req:reply(400, Req0), Env}
+ {ok, cowboy_req:reply(501, Req0), Env}
catch _:_ ->
%% @todo Probably log something here?
%% @todo Test that we can have 2 /ws 400 status code in a row on the same connection.
- %% @todo Does this even work?
{ok, cowboy_req:reply(400, Req0), Env}
end.
@@ -286,9 +287,12 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
module() | undefined, any(), binary(),
{#state{}, any()}) -> no_return().
takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
- {State0=#state{handler=Handler}, HandlerState}) ->
- %% @todo We should have an option to disable this behavior.
- ranch:remove_connection(Ref),
+ {State0=#state{handler=Handler, req=Req}, HandlerState}) ->
+ case Req of
+ #{version := 'HTTP/3'} -> ok;
+ %% @todo We should have an option to disable this behavior.
+ _ -> ranch:remove_connection(Ref)
+ end,
Messages = case Transport of
undefined -> undefined;
_ -> Transport:messages()