aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cowboy.erl134
-rw-r--r--src/cowboy_app.erl2
-rw-r--r--src/cowboy_bstr.erl2
-rw-r--r--src/cowboy_children.erl2
-rw-r--r--src/cowboy_clear.erl6
-rw-r--r--src/cowboy_clock.erl4
-rw-r--r--src/cowboy_compress_h.erl2
-rw-r--r--src/cowboy_constraints.erl2
-rw-r--r--src/cowboy_decompress_h.erl29
-rw-r--r--src/cowboy_dynamic_buffer.hrl80
-rw-r--r--src/cowboy_handler.erl2
-rw-r--r--src/cowboy_http.erl185
-rw-r--r--src/cowboy_http2.erl121
-rw-r--r--src/cowboy_http3.erl1253
-rw-r--r--src/cowboy_loop.erl2
-rw-r--r--src/cowboy_metrics_h.erl2
-rw-r--r--src/cowboy_middleware.erl2
-rw-r--r--src/cowboy_quicer.erl283
-rw-r--r--src/cowboy_req.erl24
-rw-r--r--src/cowboy_rest.erl69
-rw-r--r--src/cowboy_router.erl2
-rw-r--r--src/cowboy_static.erl14
-rw-r--r--src/cowboy_stream.erl3
-rw-r--r--src/cowboy_stream_h.erl14
-rw-r--r--src/cowboy_sub_protocol.erl4
-rw-r--r--src/cowboy_sup.erl2
-rw-r--r--src/cowboy_tls.erl8
-rw-r--r--src/cowboy_tracer_h.erl2
-rw-r--r--src/cowboy_websocket.erl119
-rw-r--r--src/cowboy_webtransport.erl292
30 files changed, 2415 insertions, 251 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl
index ad45919..6a5634e 100644
--- a/src/cowboy.erl
+++ b/src/cowboy.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -16,6 +16,7 @@
-export([start_clear/3]).
-export([start_tls/3]).
+-export([start_quic/3]).
-export([stop_listener/1]).
-export([get_env/2]).
-export([get_env/3]).
@@ -25,6 +26,9 @@
-export([log/2]).
-export([log/4]).
+%% Don't warn about the bad quicer specs.
+-dialyzer([{nowarn_function, start_quic/3}]).
+
-type opts() :: cowboy_http:opts() | cowboy_http2:opts().
-export_type([opts/0]).
@@ -44,46 +48,156 @@
-spec start_clear(ranch:ref(), ranch:opts(), opts())
-> {ok, pid()} | {error, any()}.
+
start_clear(Ref, TransOpts0, ProtoOpts0) ->
TransOpts1 = ranch:normalize_opts(TransOpts0),
- {TransOpts, ConnectionType} = ensure_connection_type(TransOpts1),
- ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
+ {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0),
+ {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2),
+ ProtoOpts = ProtoOpts0#{
+ connection_type => ConnectionType,
+ dynamic_buffer => DynamicBuffer
+ },
ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).
-spec start_tls(ranch:ref(), ranch:opts(), opts())
-> {ok, pid()} | {error, any()}.
+
start_tls(Ref, TransOpts0, ProtoOpts0) ->
TransOpts1 = ranch:normalize_opts(TransOpts0),
- SocketOpts = maps:get(socket_opts, TransOpts1, []),
- TransOpts2 = TransOpts1#{socket_opts => [
- {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
- |SocketOpts]},
- {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2),
- ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
+ {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0),
+ TransOpts3 = ensure_alpn(TransOpts2),
+ {TransOpts, ConnectionType} = ensure_connection_type(TransOpts3),
+ ProtoOpts = ProtoOpts0#{
+ connection_type => ConnectionType,
+ dynamic_buffer => DynamicBuffer
+ },
ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
+%% @todo Experimental function to start a barebone QUIC listener.
+%% This will need to be reworked to be closer to Ranch
+%% listeners and provide equivalent features.
+%%
+%% @todo Better type for transport options. Might require fixing quicer types.
+
+-spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts())
+ -> {ok, pid()}.
+
+%% @todo Implement dynamic_buffer for HTTP/3 if/when it applies.
+start_quic(Ref, TransOpts, ProtoOpts) ->
+ {ok, _} = application:ensure_all_started(quicer),
+ Parent = self(),
+ SocketOpts0 = maps:get(socket_opts, TransOpts, []),
+ {Port, SocketOpts2} = case lists:keytake(port, 1, SocketOpts0) of
+ {value, {port, Port0}, SocketOpts1} ->
+ {Port0, SocketOpts1};
+ false ->
+ {port_0(), SocketOpts0}
+ end,
+ SocketOpts = [
+ {alpn, ["h3"]}, %% @todo Why not binary?
+ %% We only need 3 for control and QPACK enc/dec,
+ %% but we need more for WebTransport. %% @todo Use 3 if WT is disabled.
+ {peer_unidi_stream_count, 100},
+ {peer_bidi_stream_count, 100},
+ %% For WebTransport.
+ %% @todo We probably don't want it enabled if WT isn't used.
+ {datagram_send_enabled, 1},
+ {datagram_receive_enabled, 1}
+ |SocketOpts2],
+ _ListenerPid = spawn(fun() ->
+ {ok, Listener} = quicer:listen(Port, SocketOpts),
+ Parent ! {ok, Listener},
+ _AcceptorPid = [spawn(fun AcceptLoop() ->
+ {ok, Conn} = quicer:accept(Listener, []),
+ Pid = spawn(fun() ->
+ receive go -> ok end,
+ %% We have to do the handshake after handing control of
+ %% the connection otherwise streams may come in before
+ %% the controlling process is changed and messages will
+ %% not be sent to the correct process.
+ {ok, Conn} = quicer:handshake(Conn),
+ process_flag(trap_exit, true), %% @todo Only if supervisor though.
+ try cowboy_http3:init(Parent, Ref, Conn, ProtoOpts)
+ catch
+ exit:{shutdown,_} -> ok;
+ C:E:S -> log(error, "CRASH ~p:~p:~p", [C,E,S], ProtoOpts)
+ end
+ end),
+ ok = quicer:controlling_process(Conn, Pid),
+ Pid ! go,
+ AcceptLoop()
+ end) || _ <- lists:seq(1, 20)],
+ %% Listener process must not terminate.
+ receive after infinity -> ok end
+ end),
+ receive
+ {ok, Listener} ->
+ {ok, Listener}
+ end.
+
+%% Select a random UDP port using gen_udp because quicer
+%% does not provide equivalent functionality. Taken from
+%% quicer test suites.
+port_0() ->
+ {ok, Socket} = gen_udp:open(0, [{reuseaddr, true}]),
+ {ok, {_, Port}} = inet:sockname(Socket),
+ gen_udp:close(Socket),
+ case os:type() of
+ {unix, darwin} ->
+ %% Apparently macOS doesn't free the port immediately.
+ timer:sleep(500);
+ _ ->
+ ok
+ end,
+ Port.
+
+ensure_alpn(TransOpts) ->
+ SocketOpts = maps:get(socket_opts, TransOpts, []),
+ TransOpts#{socket_opts => [
+ {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
+ |SocketOpts]}.
+
ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) ->
{TransOpts, ConnectionType};
ensure_connection_type(TransOpts) ->
{TransOpts#{connection_type => supervisor}, supervisor}.
+%% Dynamic buffer was set; accept transport options as-is.
+%% Note that initial 'buffer' size may be lower than dynamic buffer allows.
+ensure_dynamic_buffer(TransOpts, #{dynamic_buffer := DynamicBuffer}) ->
+ {TransOpts, DynamicBuffer};
+%% Dynamic buffer was not set; define default dynamic buffer
+%% only if 'buffer' size was not configured. In that case we
+%% set the 'buffer' size to the lowest value.
+ensure_dynamic_buffer(TransOpts=#{socket_opts := SocketOpts}, _) ->
+ case proplists:get_value(buffer, SocketOpts, undefined) of
+ undefined ->
+ {TransOpts#{socket_opts => [{buffer, 1024}|SocketOpts]}, {1024, 131072}};
+ _ ->
+ {TransOpts, false}
+ end.
+
-spec stop_listener(ranch:ref()) -> ok | {error, not_found}.
+
stop_listener(Ref) ->
ranch:stop_listener(Ref).
-spec get_env(ranch:ref(), atom()) -> ok.
+
get_env(Ref, Name) ->
Opts = ranch:get_protocol_options(Ref),
Env = maps:get(env, Opts, #{}),
maps:get(Name, Env).
-spec get_env(ranch:ref(), atom(), any()) -> ok.
+
get_env(Ref, Name, Default) ->
Opts = ranch:get_protocol_options(Ref),
Env = maps:get(env, Opts, #{}),
maps:get(Name, Env, Default).
-spec set_env(ranch:ref(), atom(), any()) -> ok.
+
set_env(Ref, Name, Value) ->
Opts = ranch:get_protocol_options(Ref),
Env = maps:get(env, Opts, #{}),
@@ -93,10 +207,12 @@ set_env(Ref, Name, Value) ->
%% Internal.
-spec log({log, logger:level(), io:format(), list()}, opts()) -> ok.
+
log({log, Level, Format, Args}, Opts) ->
log(Level, Format, Args, Opts).
-spec log(logger:level(), io:format(), list(), opts()) -> ok.
+
log(Level, Format, Args, #{logger := Logger})
when Logger =/= error_logger ->
_ = Logger:Level(Format, Args),
diff --git a/src/cowboy_app.erl b/src/cowboy_app.erl
index 95ae564..e58e1f6 100644
--- a/src/cowboy_app.erl
+++ b/src/cowboy_app.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_bstr.erl b/src/cowboy_bstr.erl
index f23167d..d0e7301 100644
--- a/src/cowboy_bstr.erl
+++ b/src/cowboy_bstr.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_children.erl b/src/cowboy_children.erl
index 305c989..2e00c37 100644
--- a/src/cowboy_children.erl
+++ b/src/cowboy_children.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_clear.erl b/src/cowboy_clear.erl
index eaeab74..845fdc1 100644
--- a/src/cowboy_clear.erl
+++ b/src/cowboy_clear.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2016-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -36,10 +36,6 @@ connection_process(Parent, Ref, Transport, Opts) ->
ProxyInfo = get_proxy_info(Ref, Opts),
{ok, Socket} = ranch:handshake(Ref),
%% Use cowboy_http2 directly only when 'http' is missing.
- %% Otherwise switch to cowboy_http2 from cowboy_http.
- %%
- %% @todo Extend this option to cowboy_tls and allow disabling
- %% the switch to cowboy_http2 in cowboy_http. Also document it.
Protocol = case maps:get(protocols, Opts, [http2, http]) of
[http2] -> cowboy_http2;
[_|_] -> cowboy_http
diff --git a/src/cowboy_clock.erl b/src/cowboy_clock.erl
index e2cdf62..b6e39f4 100644
--- a/src/cowboy_clock.erl
+++ b/src/cowboy_clock.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -93,7 +93,7 @@ handle_cast(_Msg, State) ->
-spec handle_info(any(), State) -> {noreply, State} when State::#state{}.
handle_info(update, #state{universaltime=Prev, rfc1123=B1, tref=TRef0}) ->
%% Cancel the timer in case an external process sent an update message.
- _ = erlang:cancel_timer(TRef0),
+ _ = erlang:cancel_timer(TRef0, [{async, true}, {info, false}]),
T = erlang:universaltime(),
B2 = update_rfc1123(B1, Prev, T),
ets:insert(?MODULE, {rfc1123, B2}),
diff --git a/src/cowboy_compress_h.erl b/src/cowboy_compress_h.erl
index 338ea9f..785eb0d 100644
--- a/src/cowboy_compress_h.erl
+++ b/src/cowboy_compress_h.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_constraints.erl b/src/cowboy_constraints.erl
index 33f0111..84ff249 100644
--- a/src/cowboy_constraints.erl
+++ b/src/cowboy_constraints.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2014-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_decompress_h.erl b/src/cowboy_decompress_h.erl
index 4e86e23..84283e5 100644
--- a/src/cowboy_decompress_h.erl
+++ b/src/cowboy_decompress_h.erl
@@ -1,5 +1,5 @@
-%% Copyright (c) 2024, jdamanalo <[email protected]>
-%% Copyright (c) 2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) jdamanalo <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -66,9 +66,11 @@ data(StreamID, IsFin, Data, State=#state{next=Next0, enabled=false, read_body_bu
{Commands, Next} = cowboy_stream:data(StreamID, IsFin,
buffer_to_binary([Data|Buffer]), Next0),
fold(Commands, State#state{next=Next, read_body_is_fin=IsFin});
-data(StreamID, IsFin, Data, State0=#state{next=Next0, ratio_limit=RatioLimit,
+data(StreamID, IsFin, Data0, State0=#state{next=Next0, ratio_limit=RatioLimit,
inflate=Z, is_reading=true, read_body_buffer=Buffer}) ->
- case inflate(Z, RatioLimit, buffer_to_iovec([Data|Buffer])) of
+ Data = buffer_to_iovec([Data0|Buffer]),
+ Limit = iolist_size(Data) * RatioLimit,
+ case cow_deflate:inflate(Z, Data, Limit) of
{error, ErrorType} ->
zlib:close(Z),
Status = case ErrorType of
@@ -236,22 +238,3 @@ do_build_accept_encoding([{ContentCoding, Q}|Tail], Acc0) ->
do_build_accept_encoding(Tail, Acc);
do_build_accept_encoding([], Acc) ->
Acc.
-
-inflate(Z, RatioLimit, Data) ->
- try
- {Status, Output} = zlib:safeInflate(Z, Data),
- Size = iolist_size(Output),
- do_inflate(Z, Size, iolist_size(Data) * RatioLimit, Status, [Output])
- catch
- error:data_error ->
- {error, data_error}
- end.
-
-do_inflate(_, Size, Limit, _, _) when Size > Limit ->
- {error, size_error};
-do_inflate(Z, Size0, Limit, continue, Acc) ->
- {Status, Output} = zlib:safeInflate(Z, []),
- Size = Size0 + iolist_size(Output),
- do_inflate(Z, Size, Limit, Status, [Output | Acc]);
-do_inflate(_, _, _, finished, Acc) ->
- {ok, iolist_to_binary(lists:reverse(Acc))}.
diff --git a/src/cowboy_dynamic_buffer.hrl b/src/cowboy_dynamic_buffer.hrl
new file mode 100644
index 0000000..4d05e50
--- /dev/null
+++ b/src/cowboy_dynamic_buffer.hrl
@@ -0,0 +1,80 @@
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% These functions are common to cowboy_http, cowboy_http2 and
+%% cowboy_websocket. It requires the options and the state
+%% to use the same field names.
+
+%% Experiments have shown that the size of the 'buffer' can greatly
+%% impact performance: a buffer too small leads to more messages
+%% being handled and typically more binary appends; and a buffer
+%% too large results in inefficient use of memory which in turn
+%% reduces the throughput, presumably because large binary appends
+%% are not as efficient as smaller ones, and because while the
+%% buffer gets allocated only when there is data, the allocated
+%% size remains until the binary is GC and so under-use hurts.
+%%
+%% The performance of a given 'buffer' size will also depend on
+%% how the client is sending data, and on the protocol. For example,
+%% HTTP/1.1 doesn't need a very large 'buffer' size for reading
+%% request headers, but it does need one for reading large request
+%% bodies. At the same time, HTTP/2 performs best reading large
+%% request bodies when the 'buffer' size is about half that of
+%% HTTP/1.1.
+%%
+%% It therefore becomes important to resize the buffer dynamically
+%% depending on what is currently going on. We do this based on
+%% the size of data packets we received from the transport. We
+%% maintain a moving average and when that moving average is
+%% 90% of the current 'buffer' size, we double the 'buffer' size.
+%% When things slow down and the moving average falls below
+%% 40% of the current 'buffer' size, we halve the 'buffer' size.
+%%
+%% To calculate the moving average we do (MovAvg + DataLen) div 2.
+%% This means that the moving average will change very quickly when
+%% DataLen increases or decreases rapidly. That's OK, we want to
+%% be reactive, but also setting the buffer size is a pretty fast
+%% operation. The formula could be changed to the following if it
+%% became a problem: (MovAvg * N + DataLen) div (N + 1).
+%%
+%% Note that this works best when active,N uses low values of N.
+%% We don't want to accumulate too much data because we resize
+%% the buffer.
+
+init_dynamic_buffer_size(#{dynamic_buffer_initial_size := DynamicBuffer}) ->
+ DynamicBuffer;
+init_dynamic_buffer_size(#{dynamic_buffer := {LowDynamicBuffer, _}}) ->
+ LowDynamicBuffer;
+init_dynamic_buffer_size(_) ->
+ false.
+
+maybe_resize_buffer(State=#state{dynamic_buffer_size=false}, _) ->
+ State;
+maybe_resize_buffer(State=#state{transport=Transport, socket=Socket,
+ opts=#{dynamic_buffer := {LowDynamicBuffer, HighDynamicBuffer}},
+ dynamic_buffer_size=BufferSize0, dynamic_buffer_moving_average=MovingAvg0}, Data) ->
+ DataLen = byte_size(Data),
+ MovingAvg = (MovingAvg0 + DataLen) div 2,
+ if
+ BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
+ BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
+ ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])),
+ State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize};
+ BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
+ BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
+ ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])),
+ State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize};
+ true ->
+ State#state{dynamic_buffer_moving_average=MovingAvg}
+ end.
diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl
index 5048168..1989512 100644
--- a/src/cowboy_handler.erl
+++ b/src/cowboy_handler.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index ee1e725..10eb519 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2016-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -12,9 +12,12 @@
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+%% @todo Worth renaming to cowboy_http1.
+%% @todo Change use of cow_http to cow_http1 where appropriate.
-module(cowboy_http).
-export([init/6]).
+-export([loop/1]).
-export([system_continue/3]).
-export([system_terminate/4]).
@@ -22,11 +25,16 @@
-type opts() :: #{
active_n => pos_integer(),
+ alpn_default_protocol => http | http2,
chunked => boolean(),
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
env => cowboy_middleware:env(),
+ hibernate => boolean(),
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
@@ -45,6 +53,7 @@
metrics_req_filter => fun((cowboy_req:req()) -> map()),
metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
middlewares => [module()],
+ protocols => [http | http2],
proxy_header => boolean(),
request_timeout => timeout(),
reset_idle_timeout_on_send => boolean(),
@@ -135,6 +144,10 @@
%% Flow requested for the current stream.
flow = infinity :: non_neg_integer() | infinity,
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size :: pos_integer() | false,
+ dynamic_buffer_moving_average :: non_neg_integer(),
+
%% Identifier for the stream currently being written.
%% Note that out_streamid =< in_streamid.
out_streamid = 1 :: pos_integer(),
@@ -179,12 +192,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=init_dynamic_buffer_size(Opts),
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
last_streamid=maps:get(max_keepalive, Opts, 1000)},
safe_setopts_active(State),
- loop(set_timeout(State, request_timeout)).
+ before_loop(set_timeout(State, request_timeout)).
+
+-include("cowboy_dynamic_buffer.hrl").
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
safe_setopts_active(State) ->
@@ -210,6 +227,13 @@ flush_passive(Socket, Messages) ->
ok
end.
+before_loop(State=#state{opts=#{hibernate := true}}) ->
+ proc_lib:hibernate(?MODULE, loop, [State]);
+before_loop(State) ->
+ loop(State).
+
+-spec loop(#state{}) -> ok.
+
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
last_streamid=LastStreamID}) ->
@@ -218,11 +242,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
receive
%% Discard data coming in after the last request
%% we want to process was received fully.
- {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
- loop(State);
+ {OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
+ State1 = maybe_resize_buffer(State, Data),
+ before_loop(State1);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(<< Buffer/binary, Data/binary >>, State);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(<< Buffer/binary, Data/binary >>, State1);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -231,37 +257,37 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
%% Hardcoded for compatibility with Ranch 1.x.
Passive =:= tcp_passive; Passive =:= ssl_passive ->
safe_setopts_active(State),
- loop(State);
+ before_loop(State);
%% Timeouts.
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
- loop(State);
+ before_loop(State);
{timeout, TimerRef, Reason} ->
timeout(State, Reason);
{timeout, _, _} ->
- loop(State);
+ before_loop(State);
%% System messages.
{'EXIT', Parent, shutdown} ->
Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
- loop(initiate_closing(State, Reason));
+ before_loop(initiate_closing(State, Reason));
{'EXIT', Parent, Reason} ->
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
- loop(info(State, StreamID, Msg));
+ before_loop(info(State, StreamID, Msg));
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
- loop(down(State, Pid, Msg));
+ before_loop(down(State, Pid, Msg));
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
- loop(State);
+ before_loop(State);
%% Unknown messages.
Msg ->
cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts),
- loop(State)
+ before_loop(State)
after InactivityTimeout ->
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
@@ -293,6 +319,7 @@ set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
when element(1, InState) =/= ps_body ->
State;
%% Otherwise we can set the timeout.
+%% @todo Don't do this so often, use a strategy similar to Websocket/H2 if possible.
set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
State = cancel_timeout(State0),
Default = case Name of
@@ -325,7 +352,7 @@ cancel_timeout(State=#state{timer=TimerRef}) ->
_ ->
%% Do a synchronous cancel and remove the message if any
%% to avoid receiving stray messages.
- _ = erlang:cancel_timer(TimerRef),
+ _ = erlang:cancel_timer(TimerRef, [{async, false}, {info, false}]),
receive
{timeout, TimerRef, _} -> ok
after 0 ->
@@ -346,12 +373,12 @@ timeout(State, idle_timeout) ->
'Connection idle longer than configuration allows.'}).
parse(<<>>, State) ->
- loop(State#state{buffer= <<>>});
+ before_loop(State#state{buffer= <<>>});
%% Do not process requests that come in after the last request
%% and discard the buffer if any to save memory.
parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
- loop(State#state{buffer= <<>>});
+ before_loop(State#state{buffer= <<>>});
parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
after_parse(parse_request(Buffer, State, EmptyLines));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
@@ -420,13 +447,13 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer
end;
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
-after_parse({data, _, IsFin, _, State}) ->
- loop(set_timeout(State, case IsFin of
+after_parse({data, _, IsFin, _, State=#state{buffer=Buffer}}) ->
+ parse(Buffer, set_timeout(State, case IsFin of
fin -> request_timeout;
nofin -> idle_timeout
end));
after_parse({more, State}) ->
- loop(set_timeout(State, idle_timeout)).
+ before_loop(set_timeout(State, idle_timeout)).
update_flow(fin, _, State) ->
%% This function is only called after parsing, therefore we
@@ -486,8 +513,13 @@ parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLine
'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
%% Accept direct HTTP/2 only at the beginning of the connection.
<< "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
- %% @todo Might be worth throwing to get a clean stacktrace.
- http2_upgrade(State, Buffer);
+ case lists:member(http2, maps:get(protocols, Opts, [http2, http])) of
+ true ->
+ http2_upgrade(State, Buffer);
+ false ->
+ error_terminate(501, State, {connection_error, no_error,
+ 'Prior knowledge upgrade to HTTP/2 is disabled by configuration.'})
+ end;
_ ->
parse_method(Buffer, State, <<>>,
maps:get(max_method_length, Opts, 32))
@@ -775,7 +807,7 @@ default_port(_) -> 80.
%% End of request parsing.
request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert,
- proxy_header=ProxyHeader, in_streamid=StreamID, in_state=
+ opts=Opts, proxy_header=ProxyHeader, in_streamid=StreamID, in_state=
PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
Headers, Host, Port) ->
Scheme = case Transport:secure() of
@@ -839,7 +871,7 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock
undefined -> Req0;
_ -> Req0#{proxy_header => ProxyHeader}
end,
- case is_http2_upgrade(Headers, Version) of
+ case is_http2_upgrade(Headers, Version, Opts) of
false ->
State = case HasBody of
true ->
@@ -861,12 +893,13 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock
%% HTTP/2 upgrade.
-%% @todo We must not upgrade to h2c over a TLS connection.
is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
- <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
+ <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1', Opts) ->
Conns = cow_http_hd:parse_connection(Conn),
- case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
- {true, true} ->
+ case lists:member(<<"upgrade">>, Conns)
+ andalso lists:member(<<"http2-settings">>, Conns)
+ andalso lists:member(http2, maps:get(protocols, Opts, [http2, http])) of
+ true ->
Protocols = cow_http_hd:parse_upgrade(Upgrade),
case lists:member(<<"h2c">>, Protocols) of
true ->
@@ -877,17 +910,17 @@ is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
_ ->
false
end;
-is_http2_upgrade(_, _) ->
+is_http2_upgrade(_, _, _) ->
false.
%% Prior knowledge upgrade, without an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
+ proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
case Transport:secure() of
false ->
_ = cancel_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport,
- ProxyHeader, Opts, Peer, Sock, Cert, Buffer);
+ cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
+ opts_for_upgrade(State), Peer, Sock, Cert, Buffer);
true ->
error_terminate(400, State, {connection_error, protocol_error,
'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
@@ -895,22 +928,37 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
%% Upgrade via an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert},
+ proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert},
Buffer, HTTP2Settings, Req) ->
- %% @todo
- %% However if the client sent a body, we need to read the body in full
- %% and if we can't do that, return a 413 response. Some options are in order.
- %% Always half-closed stream coming from this side.
- try cow_http_hd:parse_http2_settings(HTTP2Settings) of
- Settings ->
- _ = cancel_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport,
- ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req)
- catch _:_ ->
- error_terminate(400, State, {connection_error, protocol_error,
- 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
+ case Transport:secure() of
+ false ->
+ %% @todo
+ %% However if the client sent a body, we need to read the body in full
+ %% and if we can't do that, return a 413 response. Some options are in order.
+ %% Always half-closed stream coming from this side.
+ try cow_http_hd:parse_http2_settings(HTTP2Settings) of
+ Settings ->
+ _ = cancel_timeout(State),
+ cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
+ opts_for_upgrade(State), Peer, Sock, Cert, Buffer, Settings, Req)
+ catch _:_ ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
+ end;
+ true ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
end.
+opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=false}) ->
+ Opts;
+opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=Size,
+ dynamic_buffer_moving_average=MovingAvg}) ->
+ Opts#{
+ dynamic_buffer_initial_average => MovingAvg,
+ dynamic_buffer_initial_size => Size
+ }.
+
%% Request body parsing.
parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
@@ -1207,7 +1255,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_
commands(State, StreamID, Tail);
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
- out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
+ out_state=OutState, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
%% @todo If there's streams opened after this one, fail instead of 101.
State1 = cancel_timeout(State0),
@@ -1231,23 +1279,19 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
%% Turn off the trap_exit process flag
%% since this process will no longer be a supervisor.
process_flag(trap_exit, false),
- Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState);
+ Protocol:takeover(Parent, Ref, Socket, Transport,
+ opts_for_upgrade(State), Buffer, InitialState);
%% Set options dynamically.
-commands(State0=#state{overriden_opts=Opts},
- StreamID, [{set_options, SetOpts}|Tail]) ->
- State1 = case SetOpts of
- #{idle_timeout := IdleTimeout} ->
- set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
+commands(State0, StreamID, [{set_options, SetOpts}|Tail]) ->
+ State = maps:fold(fun
+ (chunked, Chunked, StateF=#state{overriden_opts=Opts}) ->
+ StateF#state{overriden_opts=Opts#{chunked => Chunked}};
+ (idle_timeout, IdleTimeout, StateF=#state{overriden_opts=Opts}) ->
+ set_timeout(StateF#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
idle_timeout);
- _ ->
- State0
- end,
- State = case SetOpts of
- #{chunked := Chunked} ->
- State1#state{overriden_opts=Opts#{chunked => Chunked}};
- _ ->
- State1
- end,
+ (_, _, StateF) ->
+ StateF
+ end, State0, SetOpts),
commands(State, StreamID, Tail);
%% Stream shutdown.
commands(State, StreamID, [stop|Tail]) ->
@@ -1366,23 +1410,24 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
end.
stream_next(State0=#state{opts=Opts, active=Active, out_streamid=OutStreamID, streams=Streams}) ->
+ %% Enable active mode again if it was disabled.
+ State1 = case Active of
+ true -> State0;
+ false -> active(State0)
+ end,
NextOutStreamID = OutStreamID + 1,
case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
false ->
- State = State0#state{out_streamid=NextOutStreamID, out_state=wait},
+ State = State1#state{out_streamid=NextOutStreamID, out_state=wait},
%% There are no streams remaining. We therefore can
%% and want to switch back to the request_timeout.
set_timeout(State, request_timeout);
#stream{queue=Commands} ->
- State = case Active of
- true -> State0;
- false -> active(State0)
- end,
%% @todo Remove queue from the stream.
%% We set the flow to the initial flow size even though
%% we might have sent some data through already due to pipelining.
Flow = maps:get(initial_stream_flow_size, Opts, 65535),
- commands(State#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait},
+ commands(State1#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait},
NextOutStreamID, Commands)
end.
@@ -1531,7 +1576,7 @@ maybe_socket_error(_, Result = {ok, _}, _) ->
maybe_socket_error(State, {error, Reason}, Human) ->
terminate(State, {socket_error, Reason, Human}).
--spec terminate(_, _) -> no_return().
+-spec terminate(#state{} | undefined, _) -> no_return().
terminate(undefined, Reason) ->
exit({shutdown, Reason});
terminate(State=#state{streams=Streams, children=Children}, Reason) ->
@@ -1595,12 +1640,12 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
-spec system_continue(_, _, #state{}) -> ok.
system_continue(_, _, State) ->
- loop(State).
+ before_loop(State).
-spec system_terminate(any(), _, _, #state{}) -> no_return().
system_terminate(Reason0, _, _, State) ->
Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
- loop(initiate_closing(State, Reason)).
+ before_loop(initiate_closing(State, Reason)).
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
system_code_change(Misc, _, _, _) ->
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 5b1f1e1..0d22fa1 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -17,6 +17,7 @@
-export([init/6]).
-export([init/10]).
-export([init/12]).
+-export([loop/2]).
-export([system_continue/3]).
-export([system_terminate/4]).
@@ -24,15 +25,20 @@
-type opts() :: #{
active_n => pos_integer(),
+ alpn_default_protocol => http | http2,
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
connection_window_margin_size => 0..16#7fffffff,
connection_window_update_threshold => 0..16#7fffffff,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
goaway_initial_timeout => timeout(),
goaway_complete_timeout => timeout(),
+ hibernate => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
initial_connection_window_size => 65535..16#7fffffff,
@@ -57,6 +63,7 @@
metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
middlewares => [module()],
preface_timeout => timeout(),
+ protocols => [http | http2],
proxy_header => boolean(),
reset_idle_timeout_on_send => boolean(),
sendfile => boolean(),
@@ -85,6 +92,14 @@
state :: {module, any()}
}).
+%% We don't want to reset the idle timeout too often,
+%% so we don't reset it on data. Instead we reset the
+%% number of ticks we have observed. We divide the
+%% timeout value by a value and that value becomes
+%% the number of ticks at which point we can drop
+%% the connection. This value is the number of ticks.
+-define(IDLE_TIMEOUT_TICKS, 10).
+
-record(state, {
parent = undefined :: pid(),
ref :: ranch:ref(),
@@ -95,6 +110,7 @@
%% Timer for idle_timeout; also used for goaway timers.
timer = undefined :: undefined | reference(),
+ idle_timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS,
%% Remote address and port for the connection.
peer = undefined :: {inet:ip_address(), inet:port_number()},
@@ -124,6 +140,10 @@
%% Flow requested for all streams.
flow = 0 :: non_neg_integer(),
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size :: pos_integer() | false,
+ dynamic_buffer_moving_average :: non_neg_integer(),
+
%% Currently active HTTP/2 streams. Streams may be initiated either
%% by the client or by the server through PUSH_PROMISE frames.
streams = #{} :: #{cow_http2:streamid() => #stream{}},
@@ -134,7 +154,8 @@
}).
-spec init(pid(), ranch:ref(), inet:socket(), module(),
- ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok.
+ ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> no_return().
+
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
{ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket),
'A socket error occurred when retrieving the peer name.'),
@@ -158,18 +179,22 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
-spec init(pid(), ranch:ref(), inet:socket(), module(),
ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
- binary() | undefined, binary()) -> ok.
+ binary() | undefined, binary()) -> no_return().
+
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
+ DynamicBuffer = init_dynamic_buffer_size(Opts),
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
%% Send the preface before doing all the init in case we get a socket error.
ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)),
State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader,
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
- http2_status=sequence, http2_machine=HTTP2Machine})),
+ dynamic_buffer_size=DynamicBuffer,
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
+ http2_status=sequence, http2_machine=HTTP2Machine}), 0),
safe_setopts_active(State),
case Buffer of
- <<>> -> loop(State, Buffer);
+ <<>> -> before_loop(State, Buffer);
_ -> parse(State, Buffer)
end.
@@ -204,15 +229,19 @@ add_period(Time, Period) -> Time + Period.
-spec init(pid(), ranch:ref(), inet:socket(), module(),
ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
- binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok.
+ binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> no_return().
+
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer,
_Settings, Req=#{method := Method}) ->
+ DynamicBuffer = init_dynamic_buffer_size(Opts),
{ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts),
{ok, StreamID, HTTP2Machine}
= cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader,
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=DynamicBuffer,
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
http2_status=upgrade, http2_machine=HTTP2Machine},
State1 = headers_frame(State0#state{
http2_machine=HTTP2Machine}, StreamID, Req),
@@ -222,26 +251,36 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
<<"connection">> => <<"Upgrade">>,
<<"upgrade">> => <<"h2c">>
}, ?MODULE, undefined}), %% @todo undefined or #{}?
- State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence})),
+ State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence}), 0),
%% In the case of HTTP/1.1 Upgrade we cannot send the Preface
%% until we send the 101 response.
ok = maybe_socket_error(State, Transport:send(Socket, Preface)),
safe_setopts_active(State),
case Buffer of
- <<>> -> loop(State, Buffer);
+ <<>> -> before_loop(State, Buffer);
_ -> parse(State, Buffer)
end.
+-include("cowboy_dynamic_buffer.hrl").
+
%% Because HTTP/2 has flow control and Cowboy has other rate limiting
%% mechanisms implemented, a very large active_n value should be fine,
%% as long as the stream handlers do their work in a timely manner.
+%% However large active_n values reduce the impact of dynamic_buffer.
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
safe_setopts_active(State) ->
ok = maybe_socket_error(State, setopts_active(State)).
+before_loop(State=#state{opts=#{hibernate := true}}, Buffer) ->
+ proc_lib:hibernate(?MODULE, loop, [State, Buffer]);
+before_loop(State, Buffer) ->
+ loop(State, Buffer).
+
+-spec loop(#state{}, binary()) -> no_return().
+
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
opts=Opts, timer=TimerRef, children=Children}, Buffer) ->
Messages = Transport:messages(),
@@ -249,7 +288,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
receive
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(set_idle_timeout(State), << Buffer/binary, Data/binary >>);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>);
{Closed, Socket} when Closed =:= element(2, Messages) ->
Reason = case State#state.http2_status of
closing -> {stop, closed, 'The client is going away.'};
@@ -262,52 +302,63 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
%% Hardcoded for compatibility with Ranch 1.x.
Passive =:= tcp_passive; Passive =:= ssl_passive ->
safe_setopts_active(State),
- loop(State, Buffer);
+ before_loop(State, Buffer);
%% System messages.
{'EXIT', Parent, shutdown} ->
Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
- loop(initiate_closing(State, Reason), Buffer);
+ before_loop(initiate_closing(State, Reason), Buffer);
{'EXIT', Parent, Reason} ->
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
%% Timeouts.
{timeout, TimerRef, idle_timeout} ->
- terminate(State, {stop, timeout,
- 'Connection idle longer than configuration allows.'});
+ tick_idle_timeout(State, Buffer);
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
- loop(State, Buffer);
+ before_loop(State, Buffer);
{timeout, TRef, {cow_http2_machine, Name}} ->
- loop(timeout(State, Name, TRef), Buffer);
+ before_loop(timeout(State, Name, TRef), Buffer);
{timeout, TimerRef, {goaway_initial_timeout, Reason}} ->
- loop(closing(State, Reason), Buffer);
+ before_loop(closing(State, Reason), Buffer);
{timeout, TimerRef, {goaway_complete_timeout, Reason}} ->
terminate(State, {stop, stop_reason(Reason),
'Graceful shutdown timed out.'});
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
- loop(info(State, StreamID, Msg), Buffer);
+ before_loop(info(State, StreamID, Msg), Buffer);
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
- loop(down(State, Pid, Msg), Buffer);
+ before_loop(down(State, Pid, Msg), Buffer);
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
- loop(State, Buffer);
+ before_loop(State, Buffer);
Msg ->
cowboy:log(warning, "Received stray message ~p.", [Msg], Opts),
- loop(State, Buffer)
+ before_loop(State, Buffer)
after InactivityTimeout ->
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
-set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef})
+tick_idle_timeout(State=#state{idle_timeout_num=?IDLE_TIMEOUT_TICKS}, _) ->
+ terminate(State, {stop, timeout,
+ 'Connection idle longer than configuration allows.'});
+tick_idle_timeout(State=#state{idle_timeout_num=TimeoutNum}, Buffer) ->
+ before_loop(set_idle_timeout(State, TimeoutNum + 1), Buffer).
+
+set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}, _)
when Status =:= closing_initiated orelse Status =:= closing,
TimerRef =/= undefined ->
State;
-set_idle_timeout(State=#state{opts=Opts}) ->
- set_timeout(State, maps:get(idle_timeout, Opts, 60000), idle_timeout).
+set_idle_timeout(State=#state{opts=Opts}, TimeoutNum) ->
+ case maps:get(idle_timeout, Opts, 60000) of
+ infinity ->
+ State#state{timer=undefined};
+ Timeout ->
+ set_timeout(State#state{idle_timeout_num=TimeoutNum},
+ Timeout div ?IDLE_TIMEOUT_TICKS, idle_timeout)
+ end.
set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) ->
ok = case TimerRef0 of
@@ -323,7 +374,7 @@ set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) ->
maybe_reset_idle_timeout(State=#state{opts=Opts}) ->
case maps:get(reset_idle_timeout_on_send, Opts, false) of
true ->
- set_idle_timeout(State);
+ State#state{idle_timeout_num=0};
false ->
State
end.
@@ -335,7 +386,7 @@ parse(State=#state{http2_status=sequence}, Data) ->
{ok, Rest} ->
parse(State#state{http2_status=settings}, Rest);
more ->
- loop(State, Data);
+ before_loop(State, Data);
Error = {connection_error, _, _} ->
terminate(State, Error)
end;
@@ -354,7 +405,7 @@ parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Stre
more when Status =:= closing, Streams =:= #{} ->
terminate(State, {stop, normal, 'The connection is going away.'});
more ->
- loop(State, Data)
+ before_loop(State, Data)
end.
%% Frame rate flood protection.
@@ -1086,7 +1137,9 @@ goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) ->
%% in-flight stream creation (at least one round-trip time), the server can send
%% another GOAWAY frame with an updated last stream identifier. This ensures
%% that a connection can be cleanly shut down without losing requests.
+
-spec initiate_closing(#state{}, _) -> #state{}.
+
initiate_closing(State=#state{http2_status=connected, socket=Socket,
transport=Transport, opts=Opts}, Reason) ->
ok = maybe_socket_error(State, Transport:send(Socket,
@@ -1103,7 +1156,9 @@ initiate_closing(State, Reason) ->
terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}).
%% Switch to 'closing' state and stop accepting new streams.
+
-spec closing(#state{}, Reason :: term()) -> #state{}.
+
closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} ->
terminate(State, Reason);
closing(State0=#state{http2_status=closing_initiated,
@@ -1139,7 +1194,8 @@ maybe_socket_error(_, Result = {ok, _}, _) ->
maybe_socket_error(State, {error, Reason}, Human) ->
terminate(State, {socket_error, Reason, Human}).
--spec terminate(#state{}, _) -> no_return().
+-spec terminate(#state{} | undefined, _) -> no_return().
+
terminate(undefined, Reason) ->
exit({shutdown, Reason});
terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status,
@@ -1340,15 +1396,18 @@ terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
%% System callbacks.
--spec system_continue(_, _, {#state{}, binary()}) -> ok.
+-spec system_continue(_, _, {#state{}, binary()}) -> no_return().
+
system_continue(_, _, {State, Buffer}) ->
- loop(State, Buffer).
+ before_loop(State, Buffer).
-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
+
system_terminate(Reason0, _, _, {State, Buffer}) ->
Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
- loop(initiate_closing(State, Reason), Buffer).
+ before_loop(initiate_closing(State, Reason), Buffer).
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
+
system_code_change(Misc, _, _, _) ->
{ok, Misc}.
diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl
new file mode 100644
index 0000000..9aa6be5
--- /dev/null
+++ b/src/cowboy_http3.erl
@@ -0,0 +1,1253 @@
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% A key difference between cowboy_http2 and cowboy_http3
+%% is that HTTP/3 streams are QUIC streams and therefore
+%% much of the connection state is handled outside of
+%% Cowboy.
+
+-module(cowboy_http3).
+
+-export([init/4]).
+
+%% Temporary callback to do sendfile over QUIC.
+-export([send/2]).
+
+%% @todo Graceful shutdown? Linger? Timeouts? Frame rates? PROXY header?
+-type opts() :: #{
+ compress_buffering => boolean(),
+ compress_threshold => non_neg_integer(),
+ connection_type => worker | supervisor,
+ enable_connect_protocol => boolean(),
+ env => cowboy_middleware:env(),
+ logger => module(),
+ max_decode_blocked_streams => 0..16#3fffffffffffffff,
+ max_decode_table_size => 0..16#3fffffffffffffff,
+ max_encode_blocked_streams => 0..16#3fffffffffffffff,
+ max_encode_table_size => 0..16#3fffffffffffffff,
+ max_ignored_frame_size_received => non_neg_integer() | infinity,
+ metrics_callback => cowboy_metrics_h:metrics_callback(),
+ metrics_req_filter => fun((cowboy_req:req()) -> map()),
+ metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
+ middlewares => [module()],
+ shutdown_timeout => timeout(),
+ stream_handlers => [module()],
+ tracer_callback => cowboy_tracer_h:tracer_callback(),
+ tracer_flags => [atom()],
+ tracer_match_specs => cowboy_tracer_h:tracer_match_specs(),
+ %% Open ended because configured stream handlers might add options.
+ _ => _
+}.
+-export_type([opts/0]).
+
+%% HTTP/3 or WebTransport stream.
+%%
+%% WebTransport sessions involve one bidirectional CONNECT stream
+%% that must stay open (and can be used for signaling using the
+%% Capsule Protocol) and an application-defined number of
+%% unidirectional and bidirectional streams, as well as datagrams.
+%%
+%% WebTransport sessions run in the CONNECT request process and
+%% all events related to the session is sent there as a message.
+%% The pid of the process is kept in the state.
+-record(stream, {
+ id :: cow_http3:stream_id(),
+
+ %% Whether the stream is currently in a special state.
+ status :: header | {unidi, control | encoder | decoder}
+ | normal | {data | ignore, non_neg_integer()} | stopping
+ | {webtransport_session, normal | {ignore, non_neg_integer()}}
+ | {webtransport_stream, cow_http3:stream_id()},
+
+ %% Stream buffer.
+ buffer = <<>> :: binary(),
+
+ %% Stream state.
+ state = undefined :: undefined | {module(), any()}
+}).
+
+-record(state, {
+ parent :: pid(),
+ ref :: ranch:ref(),
+ conn :: cowboy_quicer:quicer_connection_handle(),
+ opts = #{} :: opts(),
+
+ %% Remote address and port for the connection.
+ peer = undefined :: {inet:ip_address(), inet:port_number()},
+
+ %% Local address and port for the connection.
+ sock = undefined :: {inet:ip_address(), inet:port_number()},
+
+ %% Client certificate.
+ cert :: undefined | binary(),
+
+ %% HTTP/3 state machine.
+ http3_machine :: cow_http3_machine:http3_machine(),
+
+ %% Specially handled local unidi streams.
+ local_control_id = undefined :: undefined | cow_http3:stream_id(),
+ local_encoder_id = undefined :: undefined | cow_http3:stream_id(),
+ local_decoder_id = undefined :: undefined | cow_http3:stream_id(),
+
+ %% Bidirectional streams used for requests and responses,
+ %% as well as unidirectional streams initiated by the client.
+ streams = #{} :: #{cow_http3:stream_id() => #stream{}},
+
+ %% Lingering streams that were recently reset. We may receive
+ %% pending data or messages for these streams a short while
+ %% after they have been reset.
+ lingering_streams = [] :: [non_neg_integer()],
+
+ %% Streams can spawn zero or more children which are then managed
+ %% by this module if operating as a supervisor.
+ children = cowboy_children:init() :: cowboy_children:children()
+}).
+
+-spec init(pid(), ranch:ref(), cowboy_quicer:quicer_connection_handle(), opts())
+ -> no_return().
+
+init(Parent, Ref, Conn, Opts) ->
+ {ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(server, Opts),
+ %% Immediately open a control, encoder and decoder stream.
+ %% @todo An endpoint MAY avoid creating an encoder stream if it will not be used (for example, if its encoder does not wish to use the dynamic table or if the maximum size of the dynamic table permitted by the peer is zero).
+ %% @todo An endpoint MAY avoid creating a decoder stream if its decoder sets the maximum capacity of the dynamic table to zero.
+ {ok, ControlID} = maybe_socket_error(undefined,
+ cowboy_quicer:start_unidi_stream(Conn, [<<0>>, SettingsBin]),
+ 'A socket error occurred when opening the control stream.'),
+ {ok, EncoderID} = maybe_socket_error(undefined,
+ cowboy_quicer:start_unidi_stream(Conn, <<2>>),
+ 'A socket error occurred when opening the encoder stream.'),
+ {ok, DecoderID} = maybe_socket_error(undefined,
+ cowboy_quicer:start_unidi_stream(Conn, <<3>>),
+ 'A socket error occurred when opening the encoder stream.'),
+ %% Set the control, encoder and decoder streams in the machine.
+ HTTP3Machine = cow_http3_machine:init_unidi_local_streams(
+ ControlID, EncoderID, DecoderID, HTTP3Machine0),
+ %% Get the peername/sockname/cert.
+ {ok, Peer} = maybe_socket_error(undefined, cowboy_quicer:peername(Conn),
+ 'A socket error occurred when retrieving the peer name.'),
+ {ok, Sock} = maybe_socket_error(undefined, cowboy_quicer:sockname(Conn),
+ 'A socket error occurred when retrieving the sock name.'),
+ CertResult = case cowboy_quicer:peercert(Conn) of
+ {error, no_peercert} ->
+ {ok, undefined};
+ Cert0 ->
+ Cert0
+ end,
+ {ok, Cert} = maybe_socket_error(undefined, CertResult,
+ 'A socket error occurred when retrieving the client TLS certificate.'),
+ %% Quick! Let's go!
+ loop(#state{parent=Parent, ref=Ref, conn=Conn,
+ opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ http3_machine=HTTP3Machine, local_control_id=ControlID,
+ local_encoder_id=EncoderID, local_decoder_id=DecoderID}).
+
+loop(State0=#state{opts=Opts, children=Children}) ->
+ receive
+ Msg when element(1, Msg) =:= quic ->
+ handle_quic_msg(State0, Msg);
+ %% Timeouts.
+ {timeout, Ref, {shutdown, Pid}} ->
+ cowboy_children:shutdown_timeout(Children, Ref, Pid),
+ loop(State0);
+ %% Messages pertaining to a stream.
+ {{Pid, StreamID}, Msg} when Pid =:= self() ->
+ loop(info(State0, StreamID, Msg));
+ %% WebTransport commands.
+ {'$webtransport_commands', SessionID, Commands} ->
+ loop(webtransport_commands(State0, SessionID, Commands));
+ %% Exit signal from children.
+ Msg = {'EXIT', Pid, _} ->
+ loop(down(State0, Pid, Msg));
+ Msg ->
+ cowboy:log(warning, "Received stray message ~p.", [Msg], Opts),
+ loop(State0)
+ end.
+
+handle_quic_msg(State0=#state{opts=Opts}, Msg) ->
+ case cowboy_quicer:handle(Msg) of
+ {data, StreamID, IsFin, Data} ->
+ parse(State0, StreamID, Data, IsFin);
+ {datagram, Data} ->
+ parse_datagram(State0, Data);
+ {stream_started, StreamID, StreamType} ->
+ State = stream_new_remote(State0, StreamID, StreamType),
+ loop(State);
+ {stream_closed, StreamID, ErrorCode} ->
+ State = stream_closed(State0, StreamID, ErrorCode),
+ loop(State);
+ {peer_send_shutdown, StreamID} ->
+ State = stream_peer_send_shutdown(State0, StreamID),
+ loop(State);
+ closed ->
+ %% @todo Different error reason if graceful?
+ Reason = {socket_error, closed, 'The socket has been closed.'},
+ terminate(State0, Reason);
+ ok ->
+ loop(State0);
+ unknown ->
+ cowboy:log(warning, "Received unknown QUIC message ~p.", [Msg], Opts),
+ loop(State0);
+ {socket_error, Reason} ->
+ terminate(State0, {socket_error, Reason,
+ 'An error has occurred on the socket.'})
+ end.
+
+parse(State=#state{opts=Opts}, StreamID, Data, IsFin) ->
+ case stream_get(State, StreamID) of
+ Stream=#stream{buffer= <<>>} ->
+ parse1(State, Stream, Data, IsFin);
+ Stream=#stream{buffer=Buffer} ->
+ Stream1 = Stream#stream{buffer= <<>>},
+ parse1(stream_store(State, Stream1),
+ Stream1, <<Buffer/binary, Data/binary>>, IsFin);
+ %% Pending data for a stream that has been reset. Ignore.
+ error ->
+ case is_lingering_stream(State, StreamID) of
+ true ->
+ ok;
+ false ->
+ %% We avoid logging the data as it could be quite large.
+ cowboy:log(warning, "Received data for unknown stream ~p.",
+ [StreamID], Opts)
+ end,
+ loop(State)
+ end.
+
+parse1(State, Stream=#stream{status=header}, Data, IsFin) ->
+ parse_unidirectional_stream_header(State, Stream, Data, IsFin);
+parse1(State=#state{http3_machine=HTTP3Machine0},
+ #stream{status={unidi, Type}, id=StreamID}, Data, IsFin)
+ when Type =:= encoder; Type =:= decoder ->
+ case cow_http3_machine:unidi_data(Data, IsFin, StreamID, HTTP3Machine0) of
+ {ok, Instrs, HTTP3Machine} ->
+ loop(send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs));
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ terminate(State#state{http3_machine=HTTP3Machine}, Error)
+ end;
+%% @todo Handle when IsFin = fin which must terminate the WT session.
+parse1(State=#state{conn=Conn}, Stream=#stream{id=SessionID, status=
+ {webtransport_session, normal}}, Data, IsFin) ->
+ case cow_capsule:parse(Data) of
+ {ok, wt_drain_session, Rest} ->
+ webtransport_event(State, SessionID, close_initiated),
+ parse1(State, Stream, Rest, IsFin);
+ {ok, {wt_close_session, AppCode, AppMsg}, Rest} ->
+ %% This event will be handled specially and lead
+ %% to the termination of the session process.
+ webtransport_event(State, SessionID, {closed, AppCode, AppMsg}),
+ %% Shutdown the CONNECT stream immediately.
+ cowboy_quicer:shutdown_stream(Conn, SessionID),
+ %% @todo Will we receive a {stream_closed,...} after that?
+ %% If any data is received past that point this is an error.
+ %% @todo Don't crash, error out properly.
+ <<>> = Rest,
+ loop(webtransport_terminate_session(State, Stream));
+ more ->
+ loop(stream_store(State, Stream#stream{buffer=Data}));
+ %% Ignore unhandled/unknown capsules.
+ %% @todo Do this when cow_capsule includes some.
+% {ok, _, Rest} ->
+% parse1(State, Stream, Rest, IsFin);
+% {ok, Rest} ->
+% parse1(State, Stream, Rest, IsFin);
+ %% @todo Make the max length configurable?
+ {skip, Len} when Len =< 8192 ->
+ loop(stream_store(State, Stream#stream{
+ status={webtransport_session, {ignore, Len}}}));
+ {skip, Len} ->
+ %% @todo What should be done on capsule error?
+ error({todo, capsule_too_long, Len});
+ error ->
+ %% @todo What should be done on capsule error?
+ error({todo, capsule_error, Data})
+ end;
+parse1(State, Stream=#stream{status=
+ {webtransport_session, {ignore, Len}}}, Data, IsFin) ->
+ case Data of
+ <<_:Len/unit:8, Rest/bits>> ->
+ parse1(State, Stream#stream{status={webtransport_session, normal}}, Rest, IsFin);
+ _ ->
+ loop(stream_store(State, Stream#stream{
+ status={webtransport_session, {ignore, Len - byte_size(Data)}}}))
+ end;
+parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID}}, Data, IsFin) ->
+ webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}),
+ %% No need to store the stream again, WT streams don't get changed here.
+ loop(State);
+parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
+ DataLen = byte_size(Data),
+ if
+ DataLen < Len ->
+ %% We don't have the full frame but this is the end of the
+ %% data we have. So FrameIsFin is equivalent to IsFin here.
+ loop(frame(State, Stream#stream{status={data, Len - DataLen}}, {data, Data}, IsFin));
+ true ->
+ <<Data1:Len/binary, Rest/bits>> = Data,
+ FrameIsFin = is_fin(IsFin, Rest),
+ parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin),
+ StreamID, Rest, IsFin)
+ end;
+parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) ->
+ DataLen = byte_size(Data),
+ if
+ DataLen < Len ->
+ loop(stream_store(State, Stream#stream{status={ignore, Len - DataLen}}));
+ true ->
+ <<_:Len/binary, Rest/bits>> = Data,
+ parse(stream_store(State, Stream#stream{status=normal}),
+ StreamID, Rest, IsFin)
+ end;
+%% @todo Clause that discards receiving data for stopping streams.
+%% We may receive a few more frames after we abort receiving.
+parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
+ case cow_http3:parse(Data) of
+ {ok, Frame, Rest} ->
+ FrameIsFin = is_fin(IsFin, Rest),
+ parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin);
+ %% The WebTransport stream header is not a real frame.
+ {webtransport_stream_header, SessionID, Rest} ->
+ become_webtransport_stream(State, Stream, bidi, SessionID, Rest, IsFin);
+ {more, Frame = {data, _}, Len} ->
+ %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
+ case IsFin of
+ nofin ->
+ %% The stream will be stored at the end of processing commands.
+ loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin));
+ fin ->
+ terminate(State, {connection_error, h3_frame_error,
+ 'Last frame on stream was truncated. (RFC9114 7.1)'})
+ end;
+ {more, ignore, Len} ->
+ %% @todo This setting should be tested.
+ %%
+ %% While the default value doesn't warrant doing a streaming ignore
+ %% (and could work just fine with the 'more' clause), this value
+ %% is configurable and users may want to set it large.
+ MaxIgnoredLen = maps:get(max_ignored_frame_size_received, Opts, 16384),
+ %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
+ case IsFin of
+ nofin when Len < MaxIgnoredLen ->
+ %% We are not processing commands so we must store the stream.
+ %% We also call ignored_frame here; we will not need to call
+ %% it again when ignoring the rest of the data.
+ Stream1 = Stream#stream{status={ignore, Len}},
+ State1 = ignored_frame(State, Stream1),
+ loop(stream_store(State1, Stream1));
+ nofin ->
+ terminate(State, {connection_error, h3_excessive_load,
+ 'Ignored frame larger than limit. (RFC9114 10.5)'});
+ fin ->
+ terminate(State, {connection_error, h3_frame_error,
+ 'Last frame on stream was truncated. (RFC9114 7.1)'})
+ end;
+ {ignore, Rest} ->
+ parse(ignored_frame(State, Stream), StreamID, Rest, IsFin);
+ Error = {connection_error, _, _} ->
+ terminate(State, Error);
+ more when Data =:= <<>> ->
+ %% The buffer was already reset to <<>>.
+ loop(stream_store(State, Stream));
+ more ->
+ %% We're at the end of the data so FrameIsFin is equivalent to IsFin.
+ case IsFin of
+ nofin ->
+ loop(stream_store(State, Stream#stream{buffer=Data}));
+ fin ->
+ terminate(State, {connection_error, h3_frame_error,
+ 'Last frame on stream was truncated. (RFC9114 7.1)'})
+ end
+ end.
+
+%% We may receive multiple frames in a single QUIC packet.
+%% The FIN flag applies to the QUIC packet, not to the frame.
+%% We must therefore only consider the frame to have a FIN
+%% flag if there's no data remaining to be read.
+is_fin(fin, <<>>) -> fin;
+is_fin(_, _) -> nofin.
+
+parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
+ Stream0=#stream{id=StreamID}, Data, IsFin) ->
+ case cow_http3:parse_unidi_stream_header(Data) of
+ {ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder ->
+ case cow_http3_machine:set_unidi_remote_stream_type(
+ StreamID, Type, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State = State0#state{http3_machine=HTTP3Machine},
+ Stream = Stream0#stream{status={unidi, Type}},
+ parse(stream_store(State, Stream), StreamID, Rest, IsFin);
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ terminate(State0#state{http3_machine=HTTP3Machine}, Error)
+ end;
+ %% @todo Perhaps do this in cow_http3_machine directly.
+ {ok, push, _} ->
+ terminate(State0, {connection_error, h3_stream_creation_error,
+ 'Only servers can push. (RFC9114 6.2.2)'});
+ {ok, {webtransport, SessionID}, Rest} ->
+ become_webtransport_stream(State0, Stream0, unidi, SessionID, Rest, IsFin);
+ %% Unknown stream types must be ignored. We choose to abort the
+ %% stream instead of reading and discarding the incoming data.
+ {undefined, _} ->
+ loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error));
+ %% Very unlikely to happen but WebTransport headers may be fragmented
+ %% as they are more than one byte. The fin flag in this case is an error,
+ %% but because it happens in WebTransport application data (the Session ID)
+ %% we only reset the impacted stream and not the entire connection.
+ more when IsFin =:= fin ->
+ loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error));
+ more ->
+ loop(stream_store(State0, Stream0#stream{buffer=Data}))
+ end.
+
+frame(State=#state{http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, Frame, IsFin) ->
+ case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State#state{http3_machine=HTTP3Machine};
+ {ok, {data, Data}, HTTP3Machine} ->
+ data_frame(State#state{http3_machine=HTTP3Machine}, Stream, IsFin, Data);
+ {ok, {headers, Headers, PseudoHeaders, BodyLen}, Instrs, HTTP3Machine} ->
+ headers_frame(send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs),
+ Stream, IsFin, Headers, PseudoHeaders, BodyLen);
+ {ok, {trailers, _Trailers}, Instrs, HTTP3Machine} ->
+ %% @todo Propagate trailers.
+ send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs);
+ {ok, GoAway={goaway, _}, HTTP3Machine} ->
+ goaway(State#state{http3_machine=HTTP3Machine}, GoAway);
+ {error, Error={stream_error, _Reason, _Human}, Instrs, HTTP3Machine} ->
+ State1 = send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs),
+ reset_stream(State1, Stream, Error);
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ terminate(State#state{http3_machine=HTTP3Machine}, Error)
+ end.
+
+data_frame(State=#state{opts=Opts},
+ Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) ->
+ try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
+ {Commands, StreamState} ->
+ commands(State, Stream#stream{state=StreamState}, Commands)
+ catch Class:Exception:Stacktrace ->
+ cowboy:log(cowboy_stream:make_error_log(data,
+ [StreamID, IsFin, Data, StreamState0],
+ Class, Exception, Stacktrace), Opts),
+ reset_stream(State, Stream, {internal_error, {Class, Exception},
+ 'Unhandled exception in cowboy_stream:data/4.'})
+ end.
+
+headers_frame(State, Stream, IsFin, Headers,
+ PseudoHeaders=#{method := <<"CONNECT">>}, _)
+ when map_size(PseudoHeaders) =:= 2 ->
+ early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501,
+ 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)');
+headers_frame(State, Stream, IsFin, Headers,
+ PseudoHeaders=#{method := <<"TRACE">>}, _) ->
+ early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501,
+ 'The TRACE method is currently not implemented. (RFC9114 4.4, RFC7231 4.3.8)');
+headers_frame(State, Stream, IsFin, Headers, PseudoHeaders=#{authority := Authority}, BodyLen) ->
+ headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
+headers_frame(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen) ->
+ case lists:keyfind(<<"host">>, 1, Headers) of
+ {_, Authority} ->
+ headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority);
+ _ ->
+ reset_stream(State, Stream, {stream_error, h3_message_error,
+ 'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'})
+ end.
+
+headers_frame_parse_host(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert},
+ Stream=#stream{id=StreamID}, IsFin, Headers,
+ PseudoHeaders=#{method := Method, scheme := Scheme, path := PathWithQs},
+ BodyLen, Authority) ->
+ try cow_http_hd:parse_host(Authority) of
+ {Host, Port0} ->
+ Port = ensure_port(Scheme, Port0),
+ try cow_http:parse_fullpath(PathWithQs) of
+ {<<>>, _} ->
+ reset_stream(State, Stream, {stream_error, h3_message_error,
+ 'The path component must not be empty. (RFC7540 8.1.2.3)'});
+ {Path, Qs} ->
+ Req0 = #{
+ ref => Ref,
+ pid => self(),
+ streamid => StreamID,
+ peer => Peer,
+ sock => Sock,
+ cert => Cert,
+ method => Method,
+ scheme => Scheme,
+ host => Host,
+ port => Port,
+ path => Path,
+ qs => Qs,
+ version => 'HTTP/3',
+ headers => headers_to_map(Headers, #{}),
+ has_body => IsFin =:= nofin,
+ body_length => BodyLen
+ },
+ %% We add the protocol information for extended CONNECTs.
+ Req = case PseudoHeaders of
+ #{protocol := Protocol} -> Req0#{protocol => Protocol};
+ _ -> Req0
+ end,
+ headers_frame(State, Stream, Req)
+ catch _:_ ->
+ reset_stream(State, Stream, {stream_error, h3_message_error,
+ 'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'})
+ end
+ catch _:_ ->
+ reset_stream(State, Stream, {stream_error, h3_message_error,
+ 'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'})
+ end.
+
+%% @todo Copied from cowboy_http2.
+%% @todo How to handle "http"?
+ensure_port(<<"http">>, undefined) -> 80;
+ensure_port(<<"https">>, undefined) -> 443;
+ensure_port(_, Port) -> Port.
+
+%% @todo Copied from cowboy_http2.
+%% This function is necessary to properly handle duplicate headers
+%% and the special-case cookie header.
+headers_to_map([], Acc) ->
+ Acc;
+headers_to_map([{Name, Value}|Tail], Acc0) ->
+ Acc = case Acc0 of
+ %% The cookie header does not use proper HTTP header lists.
+ #{Name := Value0} when Name =:= <<"cookie">> ->
+ Acc0#{Name => << Value0/binary, "; ", Value/binary >>};
+ #{Name := Value0} ->
+ Acc0#{Name => << Value0/binary, ", ", Value/binary >>};
+ _ ->
+ Acc0#{Name => Value}
+ end,
+ headers_to_map(Tail, Acc).
+
+%% @todo WebTransport CONNECT requests must have extra checks on settings.
+%% @todo We may also need to defer them if we didn't get settings.
+headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
+ try cowboy_stream:init(StreamID, Req, Opts) of
+ {Commands, StreamState} ->
+ commands(State, Stream#stream{state=StreamState}, Commands)
+ catch Class:Exception:Stacktrace ->
+ cowboy:log(cowboy_stream:make_error_log(init,
+ [StreamID, Req, Opts],
+ Class, Exception, Stacktrace), Opts),
+ reset_stream(State, Stream, {internal_error, {Class, Exception},
+ 'Unhandled exception in cowboy_stream:init/3.'})
+ end.
+
+early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
+ Stream=#stream{id=StreamID}, _IsFin, Headers, #{method := Method},
+ StatusCode0, HumanReadable) ->
+ %% We automatically terminate the stream but it is not an error
+ %% per se (at least not in the first implementation).
+ Reason = {stream_error, h3_no_error, HumanReadable},
+ %% The partial Req is minimal for now. We only have one case
+ %% where it can be called (when a method is completely disabled).
+ PartialReq = #{
+ ref => Ref,
+ peer => Peer,
+ method => Method,
+ headers => headers_to_map(Headers, #{})
+ },
+ Resp = {response, StatusCode0, RespHeaders0=#{<<"content-length">> => <<"0">>}, <<>>},
+ try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
+ {response, StatusCode, RespHeaders, RespBody} ->
+ send_response(State0, Stream, StatusCode, RespHeaders, RespBody)
+ catch Class:Exception:Stacktrace ->
+ cowboy:log(cowboy_stream:make_error_log(early_error,
+ [StreamID, Reason, PartialReq, Resp, Opts],
+ Class, Exception, Stacktrace), Opts),
+ %% We still need to send an error response, so send what we initially
+ %% wanted to send. It's better than nothing.
+ send_headers(State0, Stream, fin, StatusCode0, RespHeaders0)
+ end.
+
+%% Datagrams.
+
+parse_datagram(State, Data0) ->
+ {SessionID, Data} = cow_http3:parse_datagram(Data0),
+ case stream_get(State, SessionID) of
+ #stream{status={webtransport_session, _}} ->
+ webtransport_event(State, SessionID, {datagram, Data}),
+ loop(State);
+ _ ->
+ error(todo) %% @todo Might be a future WT session or an error.
+ end.
+
+%% Erlang messages.
+
+down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
+ State = case cowboy_children:down(Children0, Pid) of
+ %% The stream was terminated already.
+ {ok, undefined, Children} ->
+ State0#state{children=Children};
+ %% The stream is still running.
+ {ok, StreamID, Children} ->
+ info(State0#state{children=Children}, StreamID, Msg);
+ %% The process was unknown.
+ error ->
+ cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n",
+ [Msg, Pid], Opts),
+ State0
+ end,
+ if
+%% @todo
+% State#state.http2_status =:= closing, State#state.streams =:= #{} ->
+% terminate(State, {stop, normal, 'The connection is going away.'});
+ true ->
+ State
+ end.
+
+info(State=#state{opts=Opts, http3_machine=_HTTP3Machine}, StreamID, Msg) ->
+ case stream_get(State, StreamID) of
+ Stream=#stream{state=StreamState0} ->
+ try cowboy_stream:info(StreamID, Msg, StreamState0) of
+ {Commands, StreamState} ->
+ commands(State, Stream#stream{state=StreamState}, Commands)
+ catch Class:Exception:Stacktrace ->
+ cowboy:log(cowboy_stream:make_error_log(info,
+ [StreamID, Msg, StreamState0],
+ Class, Exception, Stacktrace), Opts),
+ reset_stream(State, Stream, {internal_error, {Class, Exception},
+ 'Unhandled exception in cowboy_stream:info/3.'})
+ end;
+ error ->
+ case is_lingering_stream(State, StreamID) of
+ true ->
+ ok;
+ false ->
+ cowboy:log(warning, "Received message ~p for unknown stream ~p.",
+ [Msg, StreamID], Opts)
+ end,
+ State
+ end.
+
+%% Stream handler commands.
+
+commands(State, Stream, []) ->
+ stream_store(State, Stream);
+%% Error responses are sent only if a response wasn't sent already.
+commands(State=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID},
+ [{error_response, StatusCode, Headers, Body}|Tail]) ->
+ case cow_http3_machine:get_bidi_stream_local_state(StreamID, HTTP3Machine) of
+ {ok, idle} ->
+ commands(State, Stream, [{response, StatusCode, Headers, Body}|Tail]);
+ _ ->
+ commands(State, Stream, Tail)
+ end;
+%% Send an informational response.
+commands(State0, Stream, [{inform, StatusCode, Headers}|Tail]) ->
+ State = send_headers(State0, Stream, idle, StatusCode, Headers),
+ commands(State, Stream, Tail);
+%% Send response headers.
+commands(State0, Stream, [{response, StatusCode, Headers, Body}|Tail]) ->
+ State = send_response(State0, Stream, StatusCode, Headers, Body),
+ commands(State, Stream, Tail);
+%% Send response headers.
+commands(State0, Stream, [{headers, StatusCode, Headers}|Tail]) ->
+ State = send_headers(State0, Stream, nofin, StatusCode, Headers),
+ commands(State, Stream, Tail);
+%%% Send a response body chunk.
+commands(State0=#state{conn=Conn}, Stream=#stream{id=StreamID}, [{data, IsFin, Data}|Tail]) ->
+ _ = case Data of
+ {sendfile, Offset, Bytes, Path} ->
+ %% Temporary solution to do sendfile over QUIC.
+ {ok, _} = ranch_transport:sendfile(?MODULE, {Conn, StreamID},
+ Path, Offset, Bytes, []),
+ ok = maybe_socket_error(State0,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), IsFin));
+ _ ->
+ ok = maybe_socket_error(State0,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:data(Data), IsFin))
+ end,
+ State = maybe_send_is_fin(State0, Stream, IsFin),
+ commands(State, Stream, Tail);
+%%% Send trailers.
+commands(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, [{trailers, Trailers}|Tail]) ->
+ State = case cow_http3_machine:prepare_trailers(
+ StreamID, HTTP3Machine0, maps:to_list(Trailers)) of
+ {trailers, HeaderBlock, Instrs, HTTP3Machine} ->
+ State1 = send_instructions(State0#state{http3_machine=HTTP3Machine}, Instrs),
+ ok = maybe_socket_error(State1,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), fin)),
+ State1;
+ {no_trailers, HTTP3Machine} ->
+ ok = maybe_socket_error(State0,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), fin)),
+ State0#state{http3_machine=HTTP3Machine}
+ end,
+ commands(State, Stream, Tail);
+%% Send a push promise.
+%%
+%% @todo Responses sent as a result of a push_promise request
+%% must not send push_promise frames themselves.
+%%
+%% @todo We should not send push_promise frames when we are
+%% in the closing http2_status.
+%commands(State0=#state{socket=Socket, transport=Transport, http3_machine=HTTP3Machine0},
+% Stream, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) ->
+% Authority = case {Scheme, Port} of
+% {<<"http">>, 80} -> Host;
+% {<<"https">>, 443} -> Host;
+% _ -> iolist_to_binary([Host, $:, integer_to_binary(Port)])
+% end,
+% PathWithQs = iolist_to_binary(case Qs of
+% <<>> -> Path;
+% _ -> [Path, $?, Qs]
+% end),
+% PseudoHeaders = #{
+% method => Method,
+% scheme => Scheme,
+% authority => Authority,
+% path => PathWithQs
+% },
+% %% We need to make sure the header value is binary before we can
+% %% create the Req object, as it expects them to be flat.
+% Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)),
+% %% @todo
+% State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP3Machine0,
+% PseudoHeaders, Headers) of
+% {ok, PromisedStreamID, HeaderBlock, HTTP3Machine} ->
+% Transport:send(Socket, cow_http2:push_promise(
+% StreamID, PromisedStreamID, HeaderBlock)),
+% headers_frame(State0#state{http3_machine=HTTP2Machine},
+% PromisedStreamID, fin, Headers, PseudoHeaders, 0);
+% {error, no_push} ->
+% State0
+% end,
+% commands(State, Stream, Tail);
+%%% Read the request body.
+%commands(State0=#state{flow=Flow, streams=Streams}, Stream, [{flow, Size}|Tail]) ->
+commands(State, Stream, [{flow, _Size}|Tail]) ->
+ %% @todo We should tell the QUIC stream to increase its window size.
+% #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams,
+% State = update_window(State0#state{flow=Flow + Size,
+% streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}},
+% StreamID),
+ commands(State, Stream, Tail);
+%% Supervise a child process.
+commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
+ [{spawn, Pid, Shutdown}|Tail]) ->
+ commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
+ Stream, Tail);
+%% Error handling.
+commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) ->
+ %% @todo Do we want to run the commands after an internal_error?
+ %% @todo Do we even allow commands after?
+ %% @todo Only reset when the stream still exists.
+ reset_stream(State, Stream, Error);
+%% Use a different protocol within the stream (CONNECT :protocol).
+%% @todo Make sure we error out when the feature is disabled.
+commands(State0, Stream0=#stream{id=StreamID},
+ [{switch_protocol, Headers, cowboy_webtransport, WTState=#{}}|Tail]) ->
+ State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
+ #state{http3_machine=HTTP3Machine0} = State,
+ Stream1 = #stream{state=StreamState} = stream_get(State, StreamID),
+ %% The stream becomes a WT session at that point. It is the
+ %% parent stream of all streams in this WT session. The
+ %% cowboy_stream state is kept because it will be needed
+ %% to terminate the stream properly.
+ HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0),
+ Stream = Stream1#stream{
+ status={webtransport_session, normal},
+ state={cowboy_webtransport, WTState#{stream_state => StreamState}}
+ },
+ %% @todo We must propagate the buffer to capsule handling if any.
+ commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
+commands(State0, Stream0=#stream{id=StreamID},
+ [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
+ State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
+ Stream = stream_get(State, StreamID),
+ commands(State, Stream, Tail);
+%% Set options dynamically.
+commands(State, Stream, [{set_options, _Opts}|Tail]) ->
+ commands(State, Stream, Tail);
+commands(State, Stream, [stop|_Tail]) ->
+ %% @todo Do we want to run the commands after a stop?
+ %% @todo Do we even allow commands after?
+ stop_stream(State, Stream);
+%% Log event.
+commands(State=#state{opts=Opts}, Stream, [Log={log, _, _, _}|Tail]) ->
+ cowboy:log(Log, Opts),
+ commands(State, Stream, Tail).
+
+send_response(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, StatusCode, Headers, Body) ->
+ Size = case Body of
+ {sendfile, _, Bytes0, _} -> Bytes0;
+ _ -> iolist_size(Body)
+ end,
+ case Size of
+ 0 ->
+ State = send_headers(State0, Stream, fin, StatusCode, Headers),
+ maybe_send_is_fin(State, Stream, fin);
+ _ ->
+ %% @todo Add a test for HEAD to make sure we don't send the body when
+ %% returning {response...} from a stream handler (or {headers...} then {data...}).
+ {ok, _IsFin, HeaderBlock, Instrs, HTTP3Machine}
+ = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, nofin,
+ #{status => cow_http:status_to_integer(StatusCode)},
+ headers_to_list(Headers)),
+ State = send_instructions(State0#state{http3_machine=HTTP3Machine}, Instrs),
+ %% @todo It might be better to do async sends.
+ _ = case Body of
+ {sendfile, Offset, Bytes, Path} ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock))),
+ %% Temporary solution to do sendfile over QUIC.
+ {ok, _} = maybe_socket_error(State,
+ ranch_transport:sendfile(?MODULE, {Conn, StreamID},
+ Path, Offset, Bytes, [])),
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), fin));
+ _ ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, StreamID, [
+ cow_http3:headers(HeaderBlock),
+ cow_http3:data(Body)
+ ], fin))
+ end,
+ maybe_send_is_fin(State, Stream, fin)
+ end.
+
+maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, fin) ->
+ HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamID, HTTP3Machine0),
+ maybe_terminate_stream(State#state{http3_machine=HTTP3Machine}, Stream);
+maybe_send_is_fin(State, _, _) ->
+ State.
+
+%% Temporary callback to do sendfile over QUIC.
+-spec send({cowboy_quicer:quicer_connection_handle(), cow_http3:stream_id()},
+ iodata()) -> ok | {error, any()}.
+
+send({Conn, StreamID}, IoData) ->
+ cowboy_quicer:send(Conn, StreamID, cow_http3:data(IoData)).
+
+send_headers(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
+ #stream{id=StreamID}, IsFin0, StatusCode, Headers) ->
+ {ok, IsFin, HeaderBlock, Instrs, HTTP3Machine}
+ = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, IsFin0,
+ #{status => cow_http:status_to_integer(StatusCode)},
+ headers_to_list(Headers)),
+ State = send_instructions(State0#state{http3_machine=HTTP3Machine}, Instrs),
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), IsFin)),
+ State.
+
+%% The set-cookie header is special; we can only send one cookie per header.
+headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
+ Headers = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)),
+ Headers ++ [{<<"set-cookie">>, Value} || Value <- SetCookies];
+headers_to_list(Headers) ->
+ maps:to_list(Headers).
+
+%% @todo We would open unidi streams here if we only open on-demand.
+%% No instructions.
+send_instructions(State, undefined) ->
+ State;
+%% Decoder instructions.
+send_instructions(State=#state{conn=Conn, local_decoder_id=DecoderID},
+ {decoder_instructions, DecData}) ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, DecoderID, DecData)),
+ State;
+%% Encoder instructions.
+send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID},
+ {encoder_instructions, EncData}) ->
+ ok = maybe_socket_error(State,
+ cowboy_quicer:send(Conn, EncoderID, EncData)),
+ State.
+
+%% We mark the stream as being a WebTransport stream
+%% and then continue parsing the data as a WebTransport
+%% stream. This function is common for incoming unidi
+%% and bidi streams.
+become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0},
+ Stream0=#stream{id=StreamID}, StreamType, SessionID, Rest, IsFin) ->
+ case cow_http3_machine:become_webtransport_stream(StreamID, SessionID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State = State0#state{http3_machine=HTTP3Machine},
+ Stream = Stream0#stream{status={webtransport_stream, SessionID}},
+ webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}),
+ %% We don't need to parse the remaining data if there isn't any.
+ case {Rest, IsFin} of
+ {<<>>, nofin} -> loop(stream_store(State, Stream));
+ _ -> parse(stream_store(State, Stream), StreamID, Rest, IsFin)
+ end
+ %% @todo Error conditions.
+ end.
+
+webtransport_event(State, SessionID, Event) ->
+ #stream{
+ status={webtransport_session, _},
+ state={cowboy_webtransport, #{session_pid := SessionPid}}
+ } = stream_get(State, SessionID),
+ SessionPid ! {'$webtransport_event', SessionID, Event},
+ ok.
+
+webtransport_commands(State, SessionID, Commands) ->
+ case stream_get(State, SessionID) of
+ Session = #stream{status={webtransport_session, _}} ->
+ wt_commands(State, Session, Commands);
+ %% The stream has been terminated, ignore pending commands.
+ error ->
+ State
+ end.
+
+wt_commands(State, _, []) ->
+ State;
+wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID},
+ [{open_stream, OpenStreamRef, StreamType, InitialData}|Tail]) ->
+ %% Because opening the stream involves sending a short header
+ %% we necessarily write data. The InitialData variable allows
+ %% providing additional data to be sent in the same packet.
+ StartF = case StreamType of
+ bidi -> start_bidi_stream;
+ unidi -> start_unidi_stream
+ end,
+ Header = cow_http3:webtransport_stream_header(SessionID, StreamType),
+ case cowboy_quicer:StartF(Conn, [Header, InitialData]) of
+ {ok, StreamID} ->
+ %% @todo Pass Session directly?
+ webtransport_event(State0, SessionID,
+ {opened_stream_id, OpenStreamRef, StreamID}),
+ State = stream_new_local(State0, StreamID, StreamType,
+ {webtransport_stream, SessionID}),
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) ->
+ %% @todo Check that StreamID belongs to Session.
+ error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]});
+wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID},
+ [{send, datagram, Data}|Tail]) ->
+ case cowboy_quicer:send_datagram(Conn, cow_http3:datagram(SessionID, Data)) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) ->
+ %% @todo Check that StreamID belongs to Session.
+ case cowboy_quicer:send(Conn, StreamID, Data, nofin) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, IsFin, Data}|Tail]) ->
+ %% @todo Check that StreamID belongs to Session.
+ case cowboy_quicer:send(Conn, StreamID, Data, IsFin) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) ->
+ %% We must send a WT_DRAIN_SESSION capsule on the CONNECT stream.
+ Capsule = cow_capsule:wt_drain_session(),
+ case cowboy_quicer:send(Conn, SessionID, Capsule, nofin) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
+ when Cmd =:= close; element(1, Cmd) =:= close ->
+ %% We must send a WT_CLOSE_SESSION capsule on the CONNECT stream.
+ {AppCode, AppMsg} = case Cmd of
+ close -> {0, <<>>};
+ {close, AppCode0} -> {AppCode0, <<>>};
+ {close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0}
+ end,
+ Capsule = cow_capsule:wt_close_session(AppCode, AppMsg),
+ case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of
+ ok ->
+ State = webtransport_terminate_session(State0, Session),
+ %% @todo Because the handler is in a separate process
+ %% we must wait for it to stop and eventually
+ %% kill the process if it takes too long.
+ %% @todo We may need to fully close the CONNECT stream (if remote doesn't reset it).
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end.
+
+webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machine0,
+ streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) ->
+ %% Reset/abort the WT streams.
+ Streams = maps:filtermap(fun
+ (_, #stream{id=StreamID, status={webtransport_session, _}})
+ when StreamID =:= SessionID ->
+ %% We remove the session stream but do the shutdown outside this function.
+ false;
+ (StreamID, #stream{status={webtransport_stream, StreamSessionID}})
+ when StreamSessionID =:= SessionID ->
+ cowboy_quicer:shutdown_stream(Conn, StreamID,
+ both, cow_http3:error_to_code(wt_session_gone)),
+ false;
+ (_, _) ->
+ true
+ end, Streams0),
+ %% Keep the streams in lingering state.
+ %% We only keep up to 100 streams in this state. @todo Make it configurable?
+ Terminated = maps:keys(Streams0) -- maps:keys(Streams),
+ Lingering = lists:sublist(Terminated ++ Lingering0, 100),
+ %% Update the HTTP3 state machine.
+ HTTP3Machine = cow_http3_machine:close_webtransport_session(SessionID, HTTP3Machine0),
+ State#state{
+ http3_machine=HTTP3Machine,
+ streams=Streams,
+ lingering_streams=Lingering
+ }.
+
+stream_peer_send_shutdown(State=#state{conn=Conn}, StreamID) ->
+ case stream_get(State, StreamID) of
+ %% Cleanly terminating the CONNECT stream is equivalent
+ %% to an application error code of 0 and empty message.
+ Stream = #stream{status={webtransport_session, _}} ->
+ webtransport_event(State, StreamID, {closed, 0, <<>>}),
+ %% Shutdown the CONNECT stream fully.
+ cowboy_quicer:shutdown_stream(Conn, StreamID),
+ webtransport_terminate_session(State, Stream);
+ _ ->
+ State
+ end.
+
+reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
+ Stream=#stream{id=StreamID}, Error) ->
+ Reason = case Error of
+ {internal_error, _, _} -> h3_internal_error;
+ {stream_error, Reason0, _} -> Reason0
+ end,
+ %% @todo Do we want to close both sides?
+ %% @todo Should we close the send side if the receive side was already closed?
+ cowboy_quicer:shutdown_stream(Conn, StreamID,
+ both, cow_http3:error_to_code(Reason)),
+ State1 = case cow_http3_machine:reset_stream(StreamID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ terminate_stream(State0#state{http3_machine=HTTP3Machine}, Stream, Error);
+ {error, not_found} ->
+ terminate_stream(State0, Stream, Error)
+ end,
+%% @todo
+% case reset_rate(State1) of
+% {ok, State} ->
+% State;
+% error ->
+% terminate(State1, {connection_error, enhance_your_calm,
+% 'Stream reset rate larger than configuration allows. Flood? (CVE-2019-9514)'})
+% end.
+ State1.
+
+stop_stream(State0=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID}) ->
+ %% We abort reading when stopping the stream but only
+ %% if the client was not finished sending data.
+ %% We mark the stream as 'stopping' either way.
+ State = case cow_http3_machine:get_bidi_stream_remote_state(StreamID, HTTP3Machine) of
+ {ok, fin} ->
+ stream_store(State0, Stream#stream{status=stopping});
+ {error, not_found} ->
+ stream_store(State0, Stream#stream{status=stopping});
+ _ ->
+ stream_abort_receive(State0, Stream, h3_no_error)
+ end,
+ %% Then we may need to send a response or terminate it
+ %% if the stream handler did not do so already.
+ case cow_http3_machine:get_bidi_stream_local_state(StreamID, HTTP3Machine) of
+ %% When the stream terminates normally (without resetting the stream)
+ %% and no response was sent, we need to send a proper response back to the client.
+ {ok, idle} ->
+ info(State, StreamID, {response, 204, #{}, <<>>});
+ %% When a response was sent but not terminated, we need to close the stream.
+ %% We send a final DATA frame to complete the stream.
+ {ok, nofin} ->
+ info(State, StreamID, {data, fin, <<>>});
+ %% When a response was sent fully we can terminate the stream,
+ %% regardless of the stream being in half-closed or closed state.
+ _ ->
+ terminate_stream(State, Stream, normal)
+ end.
+
+maybe_terminate_stream(State, Stream=#stream{status=stopping}) ->
+ terminate_stream(State, Stream, normal);
+%% The Stream will be stored in the State at the end of commands processing.
+maybe_terminate_stream(State, _) ->
+ State.
+
+terminate_stream(State=#state{streams=Streams0, children=Children0},
+ #stream{id=StreamID, state=StreamState}, Reason) ->
+ Streams = maps:remove(StreamID, Streams0),
+ terminate_stream_handler(State, StreamID, Reason, StreamState),
+ Children = cowboy_children:shutdown(Children0, StreamID),
+ stream_linger(State#state{streams=Streams, children=Children}, StreamID).
+
+terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
+ try
+ cowboy_stream:terminate(StreamID, Reason, StreamState)
+ catch Class:Exception:Stacktrace ->
+ cowboy:log(cowboy_stream:make_error_log(terminate,
+ [StreamID, Reason, StreamState],
+ Class, Exception, Stacktrace), Opts)
+ end.
+
+ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{id=StreamID}) ->
+ case cow_http3_machine:ignored_frame(StreamID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State#state{http3_machine=HTTP3Machine};
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ terminate(State#state{http3_machine=HTTP3Machine}, Error)
+ end.
+
+stream_abort_receive(State=#state{conn=Conn}, Stream=#stream{id=StreamID}, Reason) ->
+ cowboy_quicer:shutdown_stream(Conn, StreamID,
+ receiving, cow_http3:error_to_code(Reason)),
+ stream_store(State, Stream#stream{status=stopping}).
+
+%% @todo Graceful connection shutdown.
+%% We terminate the connection immediately if it hasn't fully been initialized.
+-spec goaway(#state{}, {goaway, _}) -> no_return().
+goaway(State, {goaway, _}) ->
+ terminate(State, {stop, goaway, 'The connection is going away.'}).
+
+%% Function copied from cowboy_http.
+maybe_socket_error(State, {error, closed}) ->
+ terminate(State, {socket_error, closed, 'The socket has been closed.'});
+maybe_socket_error(State, Reason) ->
+ maybe_socket_error(State, Reason, 'An error has occurred on the socket.').
+
+maybe_socket_error(_, Result = ok, _) ->
+ Result;
+maybe_socket_error(_, Result = {ok, _}, _) ->
+ Result;
+maybe_socket_error(State, {error, Reason}, Human) ->
+ terminate(State, {socket_error, Reason, Human}).
+
+-spec terminate(#state{} | undefined, _) -> no_return().
+terminate(undefined, Reason) ->
+ exit({shutdown, Reason});
+terminate(State=#state{conn=Conn, %http3_status=Status,
+ %http3_machine=HTTP3Machine,
+ streams=Streams, children=Children}, Reason) ->
+% if
+% Status =:= connected; Status =:= closing_initiated ->
+%% @todo
+% %% We are terminating so it's OK if we can't send the GOAWAY anymore.
+% _ = cowboy_quicer:send(Conn, ControlID, cow_http3:goaway(
+% cow_http3_machine:get_last_streamid(HTTP3Machine))),
+ %% We already sent the GOAWAY frame.
+% Status =:= closing ->
+% ok
+% end,
+ terminate_all_streams(State, maps:to_list(Streams), Reason),
+ cowboy_children:terminate(Children),
+% terminate_linger(State),
+ _ = cowboy_quicer:shutdown(Conn, cow_http3:error_to_code(terminate_reason(Reason))),
+ exit({shutdown, Reason}).
+
+terminate_reason({connection_error, Reason, _}) -> Reason;
+terminate_reason({stop, _, _}) -> h3_no_error;
+terminate_reason({socket_error, _, _}) -> h3_internal_error.
+%terminate_reason({internal_error, _, _}) -> internal_error.
+
+terminate_all_streams(_, [], _) ->
+ ok;
+terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) ->
+ terminate_stream_handler(State, StreamID, Reason, StreamState),
+ terminate_all_streams(State, Tail, Reason).
+
+stream_get(#state{streams=Streams}, StreamID) ->
+ maps:get(StreamID, Streams, error).
+
+stream_new_local(State, StreamID, StreamType, Status) ->
+ stream_new(State, StreamID, StreamType, unidi_local, Status).
+
+stream_new_remote(State, StreamID, StreamType) ->
+ Status = case StreamType of
+ unidi -> header;
+ bidi -> normal
+ end,
+ stream_new(State, StreamID, StreamType, unidi_remote, Status).
+
+stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
+ StreamID, StreamType, UnidiType, Status) ->
+ {HTTP3Machine, Status} = case StreamType of
+ unidi ->
+ {cow_http3_machine:init_unidi_stream(StreamID, UnidiType, HTTP3Machine0),
+ Status};
+ bidi ->
+ {cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0),
+ Status}
+ end,
+ Stream = #stream{id=StreamID, status=Status},
+ State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
+
+%% Stream closed message for a local (write-only) unidi stream.
+stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) ->
+ stream_closed1(State, StreamID);
+stream_closed(State=#state{local_encoder_id=StreamID}, StreamID, _) ->
+ stream_closed1(State, StreamID);
+stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) ->
+ stream_closed1(State, StreamID);
+stream_closed(State=#state{opts=Opts,
+ streams=Streams0, children=Children0}, StreamID, ErrorCode) ->
+ case maps:take(StreamID, Streams0) of
+ %% In the WT session's case, streams will be
+ %% removed in webtransport_terminate_session.
+ {Stream=#stream{status={webtransport_session, _}}, _} ->
+ webtransport_event(State, StreamID, closed_abruptly),
+ webtransport_terminate_session(State, Stream);
+ {#stream{state=undefined}, Streams} ->
+ %% Unidi stream has no handler/children.
+ stream_closed1(State#state{streams=Streams}, StreamID);
+ %% We only stop bidi streams if the stream was closed with an error
+ %% or the stream was already in the process of stopping.
+ {#stream{status=Status, state=StreamState}, Streams}
+ when Status =:= stopping; ErrorCode =/= 0 ->
+ terminate_stream_handler(State, StreamID, closed, StreamState),
+ Children = cowboy_children:shutdown(Children0, StreamID),
+ stream_closed1(State#state{streams=Streams, children=Children}, StreamID);
+ %% Don't remove a stream that terminated properly but
+ %% has chosen to remain up (custom stream handlers).
+ {_, _} ->
+ stream_closed1(State, StreamID);
+ %% Stream closed message for a stream that has been reset. Ignore.
+ error ->
+ case is_lingering_stream(State, StreamID) of
+ true ->
+ ok;
+ false ->
+ %% We avoid logging the data as it could be quite large.
+ cowboy:log(warning, "Received stream_closed for unknown stream ~p. ~p ~p",
+ [StreamID, self(), Streams0], Opts)
+ end,
+ State
+ end.
+
+stream_closed1(State=#state{http3_machine=HTTP3Machine0}, StreamID) ->
+ case cow_http3_machine:close_stream(StreamID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State#state{http3_machine=HTTP3Machine};
+ {error, Error={connection_error, _, _}, HTTP3Machine} ->
+ terminate(State#state{http3_machine=HTTP3Machine}, Error)
+ end.
+
+stream_store(State=#state{streams=Streams}, Stream=#stream{id=StreamID}) ->
+ State#state{streams=Streams#{StreamID => Stream}}.
+
+stream_linger(State=#state{lingering_streams=Lingering0}, StreamID) ->
+ %% We only keep up to 100 streams in this state. @todo Make it configurable?
+ Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)],
+ State#state{lingering_streams=Lingering}.
+
+is_lingering_stream(#state{lingering_streams=Lingering}, StreamID) ->
+ lists:member(StreamID, Lingering).
diff --git a/src/cowboy_loop.erl b/src/cowboy_loop.erl
index 6859c82..629d06e 100644
--- a/src/cowboy_loop.erl
+++ b/src/cowboy_loop.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_metrics_h.erl b/src/cowboy_metrics_h.erl
index 27f14d4..67bf1a6 100644
--- a/src/cowboy_metrics_h.erl
+++ b/src/cowboy_metrics_h.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_middleware.erl b/src/cowboy_middleware.erl
index efeef4f..97c1498 100644
--- a/src/cowboy_middleware.erl
+++ b/src/cowboy_middleware.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2013-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl
new file mode 100644
index 0000000..aa52fae
--- /dev/null
+++ b/src/cowboy_quicer.erl
@@ -0,0 +1,283 @@
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% QUIC transport using the emqx/quicer NIF.
+
+-module(cowboy_quicer).
+
+%% Connection.
+-export([peername/1]).
+-export([sockname/1]).
+-export([peercert/1]).
+-export([shutdown/2]).
+
+%% Streams.
+-export([start_bidi_stream/2]).
+-export([start_unidi_stream/2]).
+-export([send/3]).
+-export([send/4]).
+-export([send_datagram/2]).
+-export([shutdown_stream/2]).
+-export([shutdown_stream/4]).
+
+%% Messages.
+-export([handle/1]).
+
+-ifndef(COWBOY_QUICER).
+
+-spec peername(_) -> no_return().
+peername(_) -> no_quicer().
+
+-spec sockname(_) -> no_return().
+sockname(_) -> no_quicer().
+
+-spec peercert(_) -> no_return().
+peercert(_) -> no_quicer().
+
+-spec shutdown(_, _) -> no_return().
+shutdown(_, _) -> no_quicer().
+
+-spec start_bidi_stream(_, _) -> no_return().
+start_bidi_stream(_, _) -> no_quicer().
+
+-spec start_unidi_stream(_, _) -> no_return().
+start_unidi_stream(_, _) -> no_quicer().
+
+-spec send(_, _, _) -> no_return().
+send(_, _, _) -> no_quicer().
+
+-spec send(_, _, _, _) -> no_return().
+send(_, _, _, _) -> no_quicer().
+
+-spec send_datagram(_, _) -> no_return().
+send_datagram(_, _) -> no_quicer().
+
+-spec shutdown_stream(_, _) -> no_return().
+shutdown_stream(_, _) -> no_quicer().
+
+-spec shutdown_stream(_, _, _, _) -> no_return().
+shutdown_stream(_, _, _, _) -> no_quicer().
+
+-spec handle(_) -> no_return().
+handle(_) -> no_quicer().
+
+no_quicer() ->
+ error({no_quicer,
+ "Cowboy must be compiled with environment variable COWBOY_QUICER=1 "
+ "or with compilation flag -D COWBOY_QUICER=1 in order to enable "
+ "QUIC support using the emqx/quic NIF"}).
+
+-else.
+
+%% @todo Make quicer export these types.
+-type quicer_connection_handle() :: reference().
+-export_type([quicer_connection_handle/0]).
+
+-type quicer_app_errno() :: non_neg_integer().
+
+-include_lib("quicer/include/quicer.hrl").
+
+%% Connection.
+
+-spec peername(quicer_connection_handle())
+ -> {ok, {inet:ip_address(), inet:port_number()}}
+ | {error, any()}.
+
+peername(Conn) ->
+ quicer:peername(Conn).
+
+-spec sockname(quicer_connection_handle())
+ -> {ok, {inet:ip_address(), inet:port_number()}}
+ | {error, any()}.
+
+sockname(Conn) ->
+ quicer:sockname(Conn).
+
+-spec peercert(quicer_connection_handle())
+ -> {ok, public_key:der_encoded()}
+ | {error, any()}.
+
+peercert(Conn) ->
+ quicer_nif:peercert(Conn).
+
+-spec shutdown(quicer_connection_handle(), quicer_app_errno())
+ -> ok | {error, any()}.
+
+shutdown(Conn, ErrorCode) ->
+ quicer:shutdown_connection(Conn,
+ ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
+ ErrorCode).
+
+%% Streams.
+
+-spec start_bidi_stream(quicer_connection_handle(), iodata())
+ -> {ok, cow_http3:stream_id()}
+ | {error, any()}.
+
+start_bidi_stream(Conn, InitialData) ->
+ start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_NONE).
+
+-spec start_unidi_stream(quicer_connection_handle(), iodata())
+ -> {ok, cow_http3:stream_id()}
+ | {error, any()}.
+
+start_unidi_stream(Conn, InitialData) ->
+ start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL).
+
+start_stream(Conn, InitialData, OpenFlag) ->
+ case quicer:start_stream(Conn, #{
+ active => true,
+ open_flag => OpenFlag}) of
+ {ok, StreamRef} ->
+ case quicer:send(StreamRef, InitialData) of
+ {ok, _} ->
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
+ put({quicer_stream, StreamID}, StreamRef),
+ {ok, StreamID};
+ Error ->
+ Error
+ end;
+ {error, Reason1, Reason2} ->
+ {error, {Reason1, Reason2}};
+ Error ->
+ Error
+ end.
+
+-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata())
+ -> ok | {error, any()}.
+
+send(Conn, StreamID, Data) ->
+ send(Conn, StreamID, Data, nofin).
+
+-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata(), cow_http:fin())
+ -> ok | {error, any()}.
+
+send(_Conn, StreamID, Data, IsFin) ->
+ StreamRef = get({quicer_stream, StreamID}),
+ Size = iolist_size(Data),
+ case quicer:send(StreamRef, Data, send_flag(IsFin)) of
+ {ok, Size} ->
+ ok;
+ {error, Reason1, Reason2} ->
+ {error, {Reason1, Reason2}};
+ Error ->
+ Error
+ end.
+
+send_flag(nofin) -> ?QUIC_SEND_FLAG_NONE;
+send_flag(fin) -> ?QUIC_SEND_FLAG_FIN.
+
+-spec send_datagram(quicer_connection_handle(), iodata())
+ -> ok | {error, any()}.
+
+send_datagram(Conn, Data) ->
+ %% @todo Fix/ignore the Dialyzer error instead of doing this.
+ DataBin = iolist_to_binary(Data),
+ Size = byte_size(DataBin),
+ case quicer:send_dgram(Conn, DataBin) of
+ {ok, Size} ->
+ ok;
+ %% @todo Handle error cases.
+ Error ->
+ Error
+ end.
+
+-spec shutdown_stream(quicer_connection_handle(), cow_http3:stream_id())
+ -> ok.
+
+shutdown_stream(_Conn, StreamID) ->
+ StreamRef = get({quicer_stream, StreamID}),
+ _ = quicer:shutdown_stream(StreamRef),
+ ok.
+
+-spec shutdown_stream(quicer_connection_handle(),
+ cow_http3:stream_id(), both | receiving, quicer_app_errno())
+ -> ok.
+
+shutdown_stream(_Conn, StreamID, Dir, ErrorCode) ->
+ StreamRef = get({quicer_stream, StreamID}),
+ _ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity),
+ ok.
+
+%% @todo Are these flags correct for what we want?
+shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT;
+shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE.
+
+%% Messages.
+
+%% @todo Probably should have the Conn given as argument too?
+-spec handle({quic, _, _, _})
+ -> {data, cow_http3:stream_id(), cow_http:fin(), binary()}
+ | {datagram, binary()}
+ | {stream_started, cow_http3:stream_id(), unidi | bidi}
+ | {stream_closed, cow_http3:stream_id(), quicer_app_errno()}
+ | closed
+ | {peer_send_shutdown, cow_http3:stream_id()}
+ | ok
+ | unknown
+ | {socket_error, any()}.
+
+handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) ->
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
+ IsFin = case Flags band ?QUIC_RECEIVE_FLAG_FIN of
+ ?QUIC_RECEIVE_FLAG_FIN -> fin;
+ _ -> nofin
+ end,
+ {data, StreamID, IsFin, Data};
+%% @todo Match on Conn.
+handle({quic, Data, _Conn, Flags}) when is_binary(Data), is_integer(Flags) ->
+ {datagram, Data};
+%% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED.
+handle({quic, new_stream, StreamRef, #{flags := Flags}}) ->
+ case quicer:setopt(StreamRef, active, true) of
+ ok ->
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
+ put({quicer_stream, StreamID}, StreamRef),
+ StreamType = case quicer:is_unidirectional(Flags) of
+ true -> unidi;
+ false -> bidi
+ end,
+ {stream_started, StreamID, StreamType};
+ {error, Reason} ->
+ {socket_error, Reason}
+ end;
+%% QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE.
+handle({quic, stream_closed, StreamRef, #{error := ErrorCode}}) ->
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
+ {stream_closed, StreamID, ErrorCode};
+%% QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE.
+handle({quic, closed, Conn, _Flags}) ->
+ _ = quicer:close_connection(Conn),
+ closed;
+%% The following events are currently ignored either because
+%% I do not know what they do or because we do not need to
+%% take action.
+handle({quic, streams_available, _Conn, _Props}) ->
+ ok;
+handle({quic, dgram_state_changed, _Conn, _Props}) ->
+ ok;
+%% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT
+handle({quic, transport_shutdown, _Conn, _Flags}) ->
+ ok;
+handle({quic, peer_send_shutdown, StreamRef, undefined}) ->
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
+ {peer_send_shutdown, StreamID};
+handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) ->
+ ok;
+handle({quic, shutdown, _Conn, success}) ->
+ ok;
+handle(_Msg) ->
+ unknown.
+
+-endif.
diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl
index 3f87677..550054e 100644
--- a/src/cowboy_req.erl
+++ b/src/cowboy_req.erl
@@ -1,5 +1,5 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
-%% Copyright (c) 2011, Anthony Ramine <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%% Copyright (c) Anthony Ramine <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -445,6 +445,7 @@ parse_header_fun(<<"sec-websocket-protocol">>) -> fun cow_http_hd:parse_sec_webs
parse_header_fun(<<"sec-websocket-version">>) -> fun cow_http_hd:parse_sec_websocket_version_req/1;
parse_header_fun(<<"trailer">>) -> fun cow_http_hd:parse_trailer/1;
parse_header_fun(<<"upgrade">>) -> fun cow_http_hd:parse_upgrade/1;
+parse_header_fun(<<"wt-available-protocols">>) -> fun cow_http_hd:parse_wt_available_protocols/1;
parse_header_fun(<<"x-forwarded-for">>) -> fun cow_http_hd:parse_x_forwarded_for/1.
parse_header(Name, Req, Default, ParseFun) ->
@@ -462,7 +463,7 @@ filter_cookies(Names0, Req=#{headers := Headers}) ->
case header(<<"cookie">>, Req) of
undefined -> Req;
Value0 ->
- Cookies0 = binary:split(Value0, <<$;>>),
+ Cookies0 = binary:split(Value0, <<$;>>, [global]),
Cookies = lists:filter(fun(Cookie) ->
lists:member(cookie_name(Cookie), Names)
end, Cookies0),
@@ -726,8 +727,10 @@ set_resp_header(Name, Value, Req=#{resp_headers := RespHeaders}) ->
set_resp_header(Name,Value, Req) ->
Req#{resp_headers => #{Name => Value}}.
--spec set_resp_headers(cowboy:http_headers(), Req)
+-spec set_resp_headers(cowboy:http_headers() | [{binary(), iodata()}], Req)
-> Req when Req::req().
+set_resp_headers(Headers, Req) when is_list(Headers) ->
+ set_resp_headers_list(Headers, Req, #{});
set_resp_headers(#{<<"set-cookie">> := _}, _) ->
exit({response_error, invalid_header,
'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'});
@@ -736,6 +739,19 @@ set_resp_headers(Headers, Req=#{resp_headers := RespHeaders}) ->
set_resp_headers(Headers, Req) ->
Req#{resp_headers => Headers}.
+set_resp_headers_list([], Req, Acc) ->
+ set_resp_headers(Acc, Req);
+set_resp_headers_list([{<<"set-cookie">>, _}|_], _, _) ->
+ exit({response_error, invalid_header,
+ 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'});
+set_resp_headers_list([{Name, Value}|Tail], Req, Acc) ->
+ case Acc of
+ #{Name := ValueAcc} ->
+ set_resp_headers_list(Tail, Req, Acc#{Name => [ValueAcc, <<", ">>, Value]});
+ _ ->
+ set_resp_headers_list(Tail, Req, Acc#{Name => Value})
+ end.
+
-spec resp_header(binary(), req()) -> binary() | undefined.
resp_header(Name, Req) ->
resp_header(Name, Req, undefined).
diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl
index fcea71c..9f30fcf 100644
--- a/src/cowboy_rest.erl
+++ b/src/cowboy_rest.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -246,9 +246,6 @@
handler :: atom(),
handler_state :: any(),
- %% Allowed methods. Only used for OPTIONS requests.
- allowed_methods :: [binary()] | undefined,
-
%% Media type.
content_types_p = [] ::
[{binary() | {binary(), binary(), [{binary(), binary()}] | '*'},
@@ -307,17 +304,17 @@ known_methods(Req, State=#state{method=Method}) ->
Method =:= <<"POST">>; Method =:= <<"PUT">>;
Method =:= <<"PATCH">>; Method =:= <<"DELETE">>;
Method =:= <<"OPTIONS">> ->
- next(Req, State, fun uri_too_long/2);
+ uri_too_long(Req, State);
no_call ->
- next(Req, State, 501);
+ respond(Req, State, 501);
{stop, Req2, State2} ->
terminate(Req2, State2);
{Switch, Req2, State2} when element(1, Switch) =:= switch_handler ->
switch_handler(Switch, Req2, State2);
{List, Req2, State2} ->
case lists:member(Method, List) of
- true -> next(Req2, State2, fun uri_too_long/2);
- false -> next(Req2, State2, 501)
+ true -> uri_too_long(Req2, State2);
+ false -> respond(Req2, State2, 501)
end
end.
@@ -327,39 +324,26 @@ uri_too_long(Req, State) ->
%% allowed_methods/2 should return a list of binary methods.
allowed_methods(Req, State=#state{method=Method}) ->
case call(Req, State, allowed_methods) of
- no_call when Method =:= <<"HEAD">>; Method =:= <<"GET">> ->
- next(Req, State, fun malformed_request/2);
- no_call when Method =:= <<"OPTIONS">> ->
- next(Req, State#state{allowed_methods=
- [<<"HEAD">>, <<"GET">>, <<"OPTIONS">>]},
- fun malformed_request/2);
+ no_call when Method =:= <<"HEAD">>; Method =:= <<"GET">>; Method =:= <<"OPTIONS">> ->
+ Req2 = cowboy_req:set_resp_header(<<"allow">>, <<"HEAD, GET, OPTIONS">>, Req),
+ malformed_request(Req2, State);
no_call ->
- method_not_allowed(Req, State,
- [<<"HEAD">>, <<"GET">>, <<"OPTIONS">>]);
+ Req2 = cowboy_req:set_resp_header(<<"allow">>, <<"HEAD, GET, OPTIONS">>, Req),
+ respond(Req2, State, 405);
{stop, Req2, State2} ->
terminate(Req2, State2);
{Switch, Req2, State2} when element(1, Switch) =:= switch_handler ->
switch_handler(Switch, Req2, State2);
{List, Req2, State2} ->
+ Req3 = cowboy_req:set_resp_header(<<"allow">>, cow_http_hd:allow(List), Req2),
case lists:member(Method, List) of
- true when Method =:= <<"OPTIONS">> ->
- next(Req2, State2#state{allowed_methods=List},
- fun malformed_request/2);
true ->
- next(Req2, State2, fun malformed_request/2);
+ malformed_request(Req3, State2);
false ->
- method_not_allowed(Req2, State2, List)
+ respond(Req3, State2, 405)
end
end.
-method_not_allowed(Req, State, []) ->
- Req2 = cowboy_req:set_resp_header(<<"allow">>, <<>>, Req),
- respond(Req2, State, 405);
-method_not_allowed(Req, State, Methods) ->
- << ", ", Allow/binary >> = << << ", ", M/binary >> || M <- Methods >>,
- Req2 = cowboy_req:set_resp_header(<<"allow">>, Allow, Req),
- respond(Req2, State, 405).
-
malformed_request(Req, State) ->
expect(Req, State, malformed_request, false, fun is_authorized/2, 400).
@@ -413,16 +397,10 @@ valid_entity_length(Req, State) ->
%% If you need to add additional headers to the response at this point,
%% you should do it directly in the options/2 call using set_resp_headers.
-options(Req, State=#state{allowed_methods=Methods, method= <<"OPTIONS">>}) ->
+options(Req, State=#state{method= <<"OPTIONS">>}) ->
case call(Req, State, options) of
- no_call when Methods =:= [] ->
- Req2 = cowboy_req:set_resp_header(<<"allow">>, <<>>, Req),
- respond(Req2, State, 200);
no_call ->
- << ", ", Allow/binary >>
- = << << ", ", M/binary >> || M <- Methods >>,
- Req2 = cowboy_req:set_resp_header(<<"allow">>, Allow, Req),
- respond(Req2, State, 200);
+ respond(Req, State, 200);
{stop, Req2, State2} ->
terminate(Req2, State2);
{Switch, Req2, State2} when element(1, Switch) =:= switch_handler ->
@@ -471,7 +449,7 @@ content_types_provided(Req, State) ->
{[], Req2, State2} ->
not_acceptable(Req2, State2);
{CTP, Req2, State2} ->
- CTP2 = [normalize_content_types(P) || P <- CTP],
+ CTP2 = [normalize_content_types(P, provide) || P <- CTP],
State3 = State2#state{content_types_p=CTP2},
try cowboy_req:parse_header(<<"accept">>, Req2) of
undefined ->
@@ -491,10 +469,14 @@ content_types_provided(Req, State) ->
end
end.
-normalize_content_types({ContentType, Callback})
+normalize_content_types({ContentType, Callback}, _)
when is_binary(ContentType) ->
{cow_http_hd:parse_content_type(ContentType), Callback};
-normalize_content_types(Normalized) ->
+normalize_content_types(Normalized = {{Type, SubType, _}, _}, _)
+ when is_binary(Type), is_binary(SubType) ->
+ Normalized;
+%% Wildcard for content_types_accepted.
+normalize_content_types(Normalized = {'*', _}, accept) ->
Normalized.
prioritize_accept(Accept) ->
@@ -1059,7 +1041,7 @@ accept_resource(Req, State) ->
{Switch, Req2, State2} when element(1, Switch) =:= switch_handler ->
switch_handler(Switch, Req2, State2);
{CTA, Req2, State2} ->
- CTA2 = [normalize_content_types(P) || P <- CTA],
+ CTA2 = [normalize_content_types(P, accept) || P <- CTA],
try cowboy_req:parse_header(<<"content-type">>, Req2) of
%% We do not match against the boundary parameter for multipart.
{Type = <<"multipart">>, SubType, Params} ->
@@ -1099,9 +1081,9 @@ process_content_type(Req, State=#state{method=Method, exists=Exists}, Fun) ->
{Switch, Req2, State2} when element(1, Switch) =:= switch_handler ->
switch_handler(Switch, Req2, State2);
{true, Req2, State2} when Exists ->
- next(Req2, State2, fun has_resp_body/2);
+ has_resp_body(Req2, State2);
{true, Req2, State2} ->
- next(Req2, State2, fun maybe_created/2);
+ maybe_created(Req2, State2);
{false, Req2, State2} ->
respond(Req2, State2, 400);
{{created, ResURL}, Req2, State2} when Method =:= <<"POST">> ->
@@ -1640,5 +1622,6 @@ error_terminate(Req, #state{handler=Handler, handler_state=HandlerState}, Class,
erlang:raise(Class, Reason, Stacktrace).
terminate(Req, #state{handler=Handler, handler_state=HandlerState}) ->
+ %% @todo I don't think the result is used anywhere?
Result = cowboy_handler:terminate(normal, Req, HandlerState, Handler),
{ok, Req, Result}.
diff --git a/src/cowboy_router.erl b/src/cowboy_router.erl
index 61c9012..393d82d 100644
--- a/src/cowboy_router.erl
+++ b/src/cowboy_router.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_static.erl b/src/cowboy_static.erl
index a185ef1..ce34b01 100644
--- a/src/cowboy_static.erl
+++ b/src/cowboy_static.erl
@@ -1,5 +1,5 @@
-%% Copyright (c) 2013-2024, Loïc Hoguin <[email protected]>
-%% Copyright (c) 2011, Magnus Klaar <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%% Copyright (c) Magnus Klaar <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -29,7 +29,7 @@
-type extra_charset() :: {charset, module(), function()} | {charset, binary()}.
-type extra_etag() :: {etag, module(), function()} | {etag, false}.
-type extra_mimetypes() :: {mimetypes, module(), function()}
- | {mimetypes, binary() | {binary(), binary(), [{binary(), binary()}]}}.
+ | {mimetypes, binary() | {binary(), binary(), '*' | [{binary(), binary()}]}}.
-type extra() :: [extra_charset() | extra_etag() | extra_mimetypes()].
-type opts() :: {file | dir, string() | binary()}
| {file | dir, string() | binary(), extra()}
@@ -332,7 +332,7 @@ forbidden(Req, State) ->
%% Detect the mimetype of the file.
-spec content_types_provided(Req, State)
- -> {[{binary(), get_file}], Req, State}
+ -> {[{binary() | {binary(), binary(), '*' | [{binary(), binary()}]}, get_file}], Req, State}
when State::state().
content_types_provided(Req, State={Path, _, Extra}) when is_list(Extra) ->
case lists:keyfind(mimetypes, 1, Extra) of
@@ -347,7 +347,7 @@ content_types_provided(Req, State={Path, _, Extra}) when is_list(Extra) ->
%% Detect the charset of the file.
-spec charsets_provided(Req, State)
- -> {[binary()], Req, State}
+ -> {[binary()], Req, State} | no_call
when State::state().
charsets_provided(Req, State={Path, _, Extra}) ->
case lists:keyfind(charset, 1, Extra) of
@@ -381,7 +381,7 @@ resource_exists(Req, State) ->
%% Generate an etag for the file.
-spec generate_etag(Req, State)
- -> {{strong | weak, binary()}, Req, State}
+ -> {{strong | weak, binary() | undefined}, Req, State}
when State::state().
generate_etag(Req, State={Path, {_, #file_info{size=Size, mtime=Mtime}},
Extra}) ->
@@ -408,7 +408,7 @@ last_modified(Req, State={_, {_, #file_info{mtime=Modified}}, _}) ->
%% Stream the file.
-spec get_file(Req, State)
- -> {{sendfile, 0, non_neg_integer(), binary()}, Req, State}
+ -> {{sendfile, 0, non_neg_integer(), binary()} | binary(), Req, State}
when State::state().
get_file(Req, State={Path, {direct, #file_info{size=Size}}, _}) ->
{{sendfile, 0, Size, Path}, Req, State};
diff --git a/src/cowboy_stream.erl b/src/cowboy_stream.erl
index 6ceb5ba..6680bdc 100644
--- a/src/cowboy_stream.erl
+++ b/src/cowboy_stream.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -49,6 +49,7 @@
-type reason() :: normal | switch_protocol
| {internal_error, timeout | {error | exit | throw, any()}, human_reason()}
| {socket_error, closed | atom(), human_reason()}
+ %% @todo Or cow_http3:error().
| {stream_error, cow_http2:error(), human_reason()}
| {connection_error, cow_http2:error(), human_reason()}
| {stop, cow_http2:frame() | {exit, any()}, human_reason()}.
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
index 842bd8d..3c3c084 100644
--- a/src/cowboy_stream_h.erl
+++ b/src/cowboy_stream_h.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2016-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -138,7 +138,7 @@ info(StreamID, Info={'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}},
{error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>},
stop
], State);
-info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) ->
+info(StreamID, Exit={'EXIT', Pid, Reason}, State=#state{ref=Ref, pid=Pid}) ->
Commands0 = [{internal_error, Exit, 'Stream process crashed.'}],
Commands = case Reason of
normal -> Commands0;
@@ -146,11 +146,15 @@ info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, p
{shutdown, _} -> Commands0;
_ -> [{log, error,
"Ranch listener ~p, connection process ~p, stream ~p "
- "had its request process ~p exit with reason "
- "~999999p and stacktrace ~999999p~n",
- [Ref, self(), StreamID, Pid, Reason, Stacktrace]}
+ "had its request process ~p exit with reason ~0p~n",
+ [Ref, self(), StreamID, Pid, Reason]}
|Commands0]
end,
+ %% @todo We are trying to send a 500 response before resetting
+ %% the stream. But due to the way the RESET_STREAM frame
+ %% works in QUIC the data may be lost. The problem is
+ %% known and a draft RFC exists at
+ %% https://www.ietf.org/id/draft-ietf-quic-reliable-stream-reset-03.html
do_info(StreamID, Exit, [
{error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}
|Commands], State);
diff --git a/src/cowboy_sub_protocol.erl b/src/cowboy_sub_protocol.erl
index 062fd38..1f24d00 100644
--- a/src/cowboy_sub_protocol.erl
+++ b/src/cowboy_sub_protocol.erl
@@ -1,5 +1,5 @@
-%% Copyright (c) 2013-2024, Loïc Hoguin <[email protected]>
-%% Copyright (c) 2013, James Fish <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%% Copyright (c) James Fish <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_sup.erl b/src/cowboy_sup.erl
index e37f4cf..224ef7d 100644
--- a/src/cowboy_sup.erl
+++ b/src/cowboy_sup.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_tls.erl b/src/cowboy_tls.erl
index 60ab2ed..6d0dcd3 100644
--- a/src/cowboy_tls.erl
+++ b/src/cowboy_tls.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -39,7 +39,11 @@ connection_process(Parent, Ref, Transport, Opts) ->
{ok, <<"h2">>} ->
init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, cowboy_http2);
_ -> %% http/1.1 or no protocol negotiated.
- init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, cowboy_http)
+ Protocol = case maps:get(alpn_default_protocol, Opts, http) of
+ http -> cowboy_http;
+ http2 -> cowboy_http2
+ end,
+ init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol)
end.
init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol) ->
diff --git a/src/cowboy_tracer_h.erl b/src/cowboy_tracer_h.erl
index b1196fe..91a431b 100644
--- a/src/cowboy_tracer_h.erl
+++ b/src/cowboy_tracer_h.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index 5b98b43..cb30c3f 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -69,6 +69,9 @@
active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts(),
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
idle_timeout => timeout(),
max_frame_size => non_neg_integer() | infinity,
req_filter => fun((cowboy_req:req()) -> map()),
@@ -76,6 +79,14 @@
}.
-export_type([opts/0]).
+%% We don't want to reset the idle timeout too often,
+%% so we don't reset it on data. Instead we reset the
+%% number of ticks we have observed. We divide the
+%% timeout value by a value and that value becomes
+%% the number of ticks at which point we can drop
+%% the connection. This value is the number of ticks.
+-define(IDLE_TIMEOUT_TICKS, 10).
+
-record(state, {
parent :: undefined | pid(),
ref :: ranch:ref(),
@@ -86,8 +97,14 @@
handler :: module(),
key = undefined :: undefined | binary(),
timeout_ref = undefined :: undefined | reference(),
+ timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS,
messages = undefined :: undefined | {atom(), atom(), atom()}
| {atom(), atom(), atom(), atom()},
+
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size = false :: pos_integer() | false,
+ dynamic_buffer_moving_average = 0 :: non_neg_integer(),
+
hibernate = false :: boolean(),
frag_state = undefined :: cow_ws:frag_state(),
frag_buffer = <<>> :: binary(),
@@ -103,7 +120,8 @@
%% is trying to upgrade to the Websocket protocol.
-spec is_upgrade_request(cowboy_req:req()) -> boolean().
-is_upgrade_request(#{version := 'HTTP/2', method := <<"CONNECT">>, protocol := Protocol}) ->
+is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol})
+ when Version =:= 'HTTP/2'; Version =:= 'HTTP/3' ->
<<"websocket">> =:= cowboy_bstr:to_lower(Protocol);
is_upgrade_request(Req=#{version := 'HTTP/1.1', method := <<"GET">>}) ->
ConnTokens = cowboy_req:parse_header(<<"connection">>, Req, []),
@@ -148,13 +166,13 @@ upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) ->
<<"connection">> => <<"upgrade">>,
<<"upgrade">> => <<"websocket">>
}, Req0), Env};
- %% Use a generic 400 error for HTTP/2.
+ %% Use 501 Not Implemented for HTTP/2 and HTTP/3 as recommended
+ %% by RFC9220 3 (WebSockets Upgrade over HTTP/3).
{error, upgrade_required} ->
- {ok, cowboy_req:reply(400, Req0), Env}
+ {ok, cowboy_req:reply(501, Req0), Env}
catch _:_ ->
%% @todo Probably log something here?
%% @todo Test that we can have 2 /ws 400 status code in a row on the same connection.
- %% @todo Does this even work?
{ok, cowboy_req:reply(400, Req0), Env}
end.
@@ -260,7 +278,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
%% @todo We don't want date and server headers.
Headers = cowboy_req:response_headers(#{}, Req),
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
- takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>,
+ takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>,
{State, HandlerState}).
%% Connection process.
@@ -285,17 +303,24 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
module() | undefined, any(), binary(),
{#state{}, any()}) -> no_return().
-takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
- {State0=#state{handler=Handler}, HandlerState}) ->
- %% @todo We should have an option to disable this behavior.
- ranch:remove_connection(Ref),
+takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
+ {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) ->
+ case Req of
+ #{version := 'HTTP/3'} -> ok;
+ %% @todo We should have an option to disable this behavior.
+ _ -> ranch:remove_connection(Ref)
+ end,
Messages = case Transport of
undefined -> undefined;
_ -> Transport:messages()
end,
- State = loop_timeout(State0#state{parent=Parent,
+ State = set_idle_timeout(State0#state{parent=Parent,
ref=Ref, socket=Socket, transport=Transport,
- key=undefined, messages=Messages}),
+ opts=WsOpts#{dynamic_buffer => maps:get(dynamic_buffer, Opts, false)},
+ key=undefined, messages=Messages,
+ %% Dynamic buffer only applies to HTTP/1.1 Websocket.
+ dynamic_buffer_size=init_dynamic_buffer_size(Opts),
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0),
%% We call parse_header/3 immediately because there might be
%% some data in the buffer that was sent along with the handshake.
%% While it is not allowed by the protocol to send frames immediately,
@@ -306,6 +331,12 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
end.
+-include("cowboy_dynamic_buffer.hrl").
+
+%% @todo Implement early socket error detection.
+maybe_socket_error(_, _) ->
+ ok.
+
after_init(State=#state{active=true}, HandlerState, ParseState) ->
%% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
%% We must do this only after calling websocket_init/1 (if any)
@@ -327,7 +358,7 @@ after_init(State, HandlerState, ParseState) ->
setopts_active(#state{transport=undefined}) ->
ok;
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
@@ -369,28 +400,41 @@ before_loop(State=#state{hibernate=true}, HandlerState, ParseState) ->
before_loop(State, HandlerState, ParseState) ->
loop(State, HandlerState, ParseState).
--spec loop_timeout(#state{}) -> #state{}.
-loop_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}) ->
+-spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}.
+
+%% @todo Do we really need this for HTTP/2?
+set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) ->
+ %% Most of the time we don't need to cancel the timer since it
+ %% will have triggered already. But this call is harmless so
+ %% it is kept to simplify the code as we do need to cancel when
+ %% options are changed dynamically.
_ = case PrevRef of
undefined -> ignore;
- PrevRef -> erlang:cancel_timer(PrevRef)
+ PrevRef -> erlang:cancel_timer(PrevRef, [{async, true}, {info, false}])
end,
case maps:get(idle_timeout, Opts, 60000) of
infinity ->
- State#state{timeout_ref=undefined};
+ State#state{timeout_ref=undefined, timeout_num=TimeoutNum};
Timeout ->
- TRef = erlang:start_timer(Timeout, self(), ?MODULE),
- State#state{timeout_ref=TRef}
+ TRef = erlang:start_timer(Timeout div ?IDLE_TIMEOUT_TICKS, self(), ?MODULE),
+ State#state{timeout_ref=TRef, timeout_num=TimeoutNum}
end.
+-define(reset_idle_timeout(State), State#state{timeout_num=0}).
+
+tick_idle_timeout(State=#state{timeout_num=?IDLE_TIMEOUT_TICKS}, HandlerState, _) ->
+ websocket_close(State, HandlerState, timeout);
+tick_idle_timeout(State=#state{timeout_num=TimeoutNum}, HandlerState, ParseState) ->
+ before_loop(set_idle_timeout(State, TimeoutNum + 1), HandlerState, ParseState).
+
-spec loop(#state{}, any(), parse_state()) -> no_return().
loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
timeout_ref=TRef}, HandlerState, ParseState) ->
receive
%% Socket messages. (HTTP/1.1)
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- State2 = loop_timeout(State),
- parse(State2, HandlerState, ParseState, Data);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(?reset_idle_timeout(State1), HandlerState, ParseState, Data);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, HandlerState, {error, closed});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -403,18 +447,16 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
%% Body reading messages. (HTTP/2)
{request_body, _Ref, nofin, Data} ->
maybe_read_body(State),
- State2 = loop_timeout(State),
- parse(State2, HandlerState, ParseState, Data);
+ parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
%% @todo We need to handle this case as if it was an {error, closed}
%% but not before we finish processing frames. We probably should have
%% a check in before_loop to let us stop looping if a flag is set.
{request_body, _Ref, fin, _, Data} ->
maybe_read_body(State),
- State2 = loop_timeout(State),
- parse(State2, HandlerState, ParseState, Data);
+ parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
%% Timeouts.
{timeout, TRef, ?MODULE} ->
- websocket_close(State, HandlerState, timeout);
+ tick_idle_timeout(State, HandlerState, ParseState);
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
before_loop(State, HandlerState, ParseState);
%% System messages.
@@ -458,12 +500,16 @@ parse_header(State=#state{opts=Opts, frag_state=FragState, extensions=Extensions
websocket_close(State, HandlerState, {error, badframe})
end.
-parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensions=Extensions},
+parse_payload(State=#state{opts=Opts, frag_state=FragState, utf8_state=Incomplete, extensions=Extensions},
HandlerState, ParseState=#ps_payload{
type=Type, len=Len, mask_key=MaskKey, rsv=Rsv,
unmasked=Unmasked, unmasked_len=UnmaskedLen}, Data) ->
+ MaxFrameSize = case maps:get(max_frame_size, Opts, infinity) of
+ infinity -> infinity;
+ MaxFrameSize0 -> MaxFrameSize0 - UnmaskedLen
+ end,
case cow_ws:parse_payload(Data, MaskKey, Incomplete, UnmaskedLen,
- Type, Len, FragState, Extensions, Rsv) of
+ Type, Len, FragState, Extensions#{max_inflate_size => MaxFrameSize}, Rsv) of
{ok, CloseCode, Payload, Utf8State, Rest} ->
dispatch_frame(State#state{utf8_state=Utf8State}, HandlerState,
ParseState#ps_payload{unmasked= <<Unmasked/binary, Payload/binary>>,
@@ -593,13 +639,16 @@ commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_b
commands(Tail, State#state{active=Active}, Data);
commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
commands(Tail, State#state{deflate=Deflate}, Data);
-commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) ->
- State = case SetOpts of
- #{idle_timeout := IdleTimeout} ->
- loop_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}});
- _ ->
- State0
- end,
+commands([{set_options, SetOpts}|Tail], State0, Data) ->
+ State = maps:fold(fun
+ (idle_timeout, IdleTimeout, StateF=#state{opts=Opts}) ->
+ %% We reset the number of ticks when changing the idle_timeout option.
+ set_idle_timeout(StateF#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0);
+ (max_frame_size, MaxFrameSize, StateF=#state{opts=Opts}) ->
+ StateF#state{opts=Opts#{max_frame_size => MaxFrameSize}};
+ (_, _, StateF) ->
+ StateF
+ end, State0, SetOpts),
commands(Tail, State, Data);
commands([{shutdown_reason, ShutdownReason}|Tail], State, Data) ->
commands(Tail, State#state{shutdown_reason=ShutdownReason}, Data);
diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl
new file mode 100644
index 0000000..8c8ca39
--- /dev/null
+++ b/src/cowboy_webtransport.erl
@@ -0,0 +1,292 @@
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% @todo To enable WebTransport the following options need to be set:
+%%
+%% QUIC:
+%% - max_datagram_frame_size > 0
+%%
+%% HTTP/3:
+%% - SETTINGS_H3_DATAGRAM = 1
+%% - SETTINGS_ENABLE_CONNECT_PROTOCOL = 1
+%% - SETTINGS_WT_MAX_SESSIONS >= 1
+
+%% Cowboy supports versions 07 through 13 of the WebTransport drafts.
+%% Cowboy also has some compatibility with version 02.
+%%
+%% WebTransport CONNECT requests go through cowboy_stream as normal
+%% and then an upgrade/switch_protocol is issued (just like Websocket).
+%% After that point none of the events go through cowboy_stream except
+%% the final terminate event. The request process becomes the process
+%% handling all events in the WebTransport session.
+%%
+%% WebTransport sessions can be ended via a command, via a crash or
+%% exit, via the closing of the connection (client or server inititated),
+%% via the client ending the session (mirroring the command) or via
+%% the client terminating the CONNECT stream.
+-module(cowboy_webtransport).
+
+-export([upgrade/4]).
+-export([upgrade/5]).
+
+%% cowboy_stream.
+-export([info/3]).
+-export([terminate/3]).
+
+-type stream_type() :: unidi | bidi.
+-type open_stream_ref() :: any().
+
+-type event() ::
+ {stream_open, cow_http3:stream_id(), stream_type()} |
+ {opened_stream_id, open_stream_ref(), cow_http3:stream_id()} |
+ {stream_data, cow_http3:stream_id(), cow_http:fin(), binary()} |
+ {datagram, binary()} |
+ close_initiated.
+
+-type commands() :: [
+ {open_stream, open_stream_ref(), stream_type(), iodata()} |
+ {close_stream, cow_http3:stream_id(), cow_http3:wt_app_error_code()} |
+ {send, cow_http3:stream_id() | datagram, iodata()} |
+ initiate_close |
+ close |
+ {close, cow_http3:wt_app_error_code()} |
+ {close, cow_http3:wt_app_error_code(), iodata()}
+].
+-export_type([commands/0]).
+
+-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}.
+
+-callback init(Req, any())
+ -> {ok | module(), Req, any()}
+ | {module(), Req, any(), any()}
+ when Req::cowboy_req:req().
+
+-callback webtransport_init(State)
+ -> call_result(State) when State::any().
+-optional_callbacks([webtransport_init/1]).
+
+-callback webtransport_handle(event(), State)
+ -> call_result(State) when State::any().
+-optional_callbacks([webtransport_handle/2]).
+
+-callback webtransport_info(any(), State)
+ -> call_result(State) when State::any().
+-optional_callbacks([webtransport_info/2]).
+
+-callback terminate(any(), cowboy_req:req(), any()) -> ok.
+-optional_callbacks([terminate/3]).
+
+-type opts() :: #{
+ req_filter => fun((cowboy_req:req()) -> map())
+}.
+-export_type([opts/0]).
+
+-record(state, {
+ id :: cow_http3:stream_id(),
+ parent :: pid(),
+ opts = #{} :: opts(),
+ handler :: module(),
+ hibernate = false :: boolean(),
+ req = #{} :: map()
+}).
+
+%% This function mirrors a similar function for Websocket.
+
+-spec is_upgrade_request(cowboy_req:req()) -> boolean().
+
+is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol})
+ when Version =:= 'HTTP/3' ->
+ %% @todo scheme MUST BE "https"
+ <<"webtransport">> =:= cowboy_bstr:to_lower(Protocol);
+
+is_upgrade_request(_) ->
+ false.
+
+%% Stream process.
+
+-spec upgrade(Req, Env, module(), any())
+ -> {ok, Req, Env}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+
+upgrade(Req, Env, Handler, HandlerState) ->
+ upgrade(Req, Env, Handler, HandlerState, #{}).
+
+-spec upgrade(Req, Env, module(), any(), opts())
+ -> {ok, Req, Env}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+
+%% @todo Immediately crash if a response has already been sent.
+upgrade(Req=#{version := 'HTTP/3', pid := Pid, streamid := StreamID}, Env, Handler, HandlerState, Opts) ->
+ FilteredReq = case maps:get(req_filter, Opts, undefined) of
+ undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req);
+ FilterFun -> FilterFun(Req)
+ end,
+ State = #state{id=StreamID, parent=Pid, opts=Opts, handler=Handler, req=FilteredReq},
+ %% @todo Must ensure the relevant settings are enabled (QUIC and H3).
+ %% Either we check them BEFORE, or we check them when the handler
+ %% is OK to initiate a webtransport session. Probably need to
+ %% check them BEFORE as we need to become (takeover) the webtransport process
+ %% after we are done with the upgrade. Maybe in cow_http3_machine but
+ %% it doesn't have QUIC settings currently (max_datagram_size).
+ case is_upgrade_request(Req) of
+ true ->
+ Headers = cowboy_req:response_headers(#{}, Req),
+ Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE,
+ #{session_pid => self()}}},
+ webtransport_init(State, HandlerState);
+ %% Use 501 Not Implemented to mirror the recommendation in
+ %% by RFC9220 3 (WebSockets Upgrade over HTTP/3).
+ false ->
+ %% @todo I don't think terminate will be called.
+ {ok, cowboy_req:reply(501, Req), Env}
+ end.
+
+webtransport_init(State=#state{handler=Handler}, HandlerState) ->
+ case erlang:function_exported(Handler, webtransport_init, 1) of
+ true -> handler_call(State, HandlerState, webtransport_init, undefined);
+ false -> before_loop(State, HandlerState)
+ end.
+
+before_loop(State=#state{hibernate=true}, HandlerState) ->
+ proc_lib:hibernate(?MODULE, loop, [State#state{hibernate=false}, HandlerState]);
+before_loop(State, HandlerState) ->
+ loop(State, HandlerState).
+
+-spec loop(#state{}, any()) -> no_return().
+
+loop(State=#state{id=SessionID, parent=Parent}, HandlerState) ->
+ receive
+ {'$webtransport_event', SessionID, Event={closed, _, _}} ->
+ terminate_proc(State, HandlerState, Event);
+ {'$webtransport_event', SessionID, Event=closed_abruptly} ->
+ terminate_proc(State, HandlerState, Event);
+ {'$webtransport_event', SessionID, Event} ->
+ handler_call(State, HandlerState, webtransport_handle, Event);
+ %% Timeouts.
+%% @todo idle_timeout
+% {timeout, TRef, ?MODULE} ->
+% tick_idle_timeout(State, HandlerState, ParseState);
+% {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
+% before_loop(State, HandlerState, ParseState);
+ %% System messages.
+ {'EXIT', Parent, Reason} ->
+ %% @todo We should exit gracefully.
+ exit(Reason);
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
+ {State, HandlerState});
+ %% Calls from supervisor module.
+ {'$gen_call', From, Call} ->
+ cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
+ before_loop(State, HandlerState);
+ Message ->
+ handler_call(State, HandlerState, webtransport_info, Message)
+ end.
+
+handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) ->
+ try case Callback of
+ webtransport_init -> Handler:webtransport_init(HandlerState);
+ _ -> Handler:Callback(Message, HandlerState)
+ end of
+ {Commands, HandlerState2} when is_list(Commands) ->
+ handler_call_result(State, HandlerState2, Commands);
+ {Commands, HandlerState2, hibernate} when is_list(Commands) ->
+ handler_call_result(State#state{hibernate=true}, HandlerState2, Commands)
+ catch Class:Reason:Stacktrace ->
+ %% @todo Do we need to send a close? Let cowboy_http3 detect and handle it?
+ handler_terminate(State, HandlerState, {crash, Class, Reason}),
+ erlang:raise(Class, Reason, Stacktrace)
+ end.
+
+handler_call_result(State0, HandlerState, Commands) ->
+ case commands(Commands, State0, ok, []) of
+ {ok, State} ->
+ before_loop(State, HandlerState);
+ {stop, State} ->
+ terminate_proc(State, HandlerState, stop)
+ end.
+
+%% We accumulate the commands that must be sent to the connection process
+%% because we want to send everything into one message. Other commands are
+%% processed immediately.
+
+commands([], State, Res, []) ->
+ {Res, State};
+commands([], State=#state{id=SessionID, parent=Pid}, Res, Commands) ->
+ Pid ! {'$webtransport_commands', SessionID, lists:reverse(Commands)},
+ {Res, State};
+%% {open_stream, OpenStreamRef, StreamType, InitialData}.
+commands([Command={open_stream, _, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% {close_stream, StreamID, Code}.
+commands([Command={close_stream, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% @todo We must reject send to a remote unidi stream.
+%% {send, StreamID | datagram, Data}.
+commands([Command={send, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% {send, StreamID, IsFin, Data}.
+commands([Command={send, _, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% initiate_close - DRAIN_WT_SESSION
+commands([Command=initiate_close|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% close | {close, Code} | {close, Code, Msg} - CLOSE_WT_SESSION
+%% @todo At this point the handler must not issue stream or send commands.
+commands([Command=close|Tail], State, _, Acc) ->
+ commands(Tail, State, stop, [Command|Acc]);
+commands([Command={close, _}|Tail], State, _, Acc) ->
+ commands(Tail, State, stop, [Command|Acc]);
+commands([Command={close, _, _}|Tail], State, _, Acc) ->
+ commands(Tail, State, stop, [Command|Acc]).
+%% @todo A set_options command could be useful to increase the number of allowed streams
+%% or other forms of flow control. Alternatively a flow command. Or both.
+%% @todo A shutdown_reason command could be useful for the same reasons as Websocekt.
+
+-spec terminate_proc(_, _, _) -> no_return().
+
+terminate_proc(State, HandlerState, Reason) ->
+ handler_terminate(State, HandlerState, Reason),
+ %% @todo This is what should be done if shutdown_reason gets implemented.
+% case Shutdown of
+% normal -> exit(normal);
+% _ -> exit({shutdown, Shutdown})
+% end.
+ exit(normal).
+
+handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
+ cowboy_handler:terminate(Reason, Req, HandlerState, Handler).
+
+%% cowboy_stream callbacks.
+%%
+%% We shortcut stream handlers but still need to process some events
+%% such as process exiting or termination. We implement the relevant
+%% callbacks here. Note that as far as WebTransport is concerned,
+%% receiving stream data here would be an error therefore the data
+%% callback is not implemented.
+%%
+%% @todo Better type than map() for the cowboy_stream state.
+
+-spec info(cowboy_stream:streamid(), any(), State)
+ -> {cowboy_stream:commands(), State} when State::map().
+
+info(StreamID, Msg, WTState=#{stream_state := StreamState0}) ->
+ {Commands, StreamState} = cowboy_stream:info(StreamID, Msg, StreamState0),
+ {Commands, WTState#{stream_state => StreamState}}.
+
+-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), map())
+ -> any().
+
+terminate(StreamID, Reason, #{stream_state := StreamState}) ->
+ cowboy_stream:terminate(StreamID, Reason, StreamState).