aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/cowboy.erl55
-rw-r--r--src/cowboy_app.erl2
-rw-r--r--src/cowboy_bstr.erl2
-rw-r--r--src/cowboy_children.erl2
-rw-r--r--src/cowboy_clear.erl6
-rw-r--r--src/cowboy_clock.erl4
-rw-r--r--src/cowboy_compress_h.erl2
-rw-r--r--src/cowboy_constraints.erl2
-rw-r--r--src/cowboy_decompress_h.erl29
-rw-r--r--src/cowboy_dynamic_buffer.hrl80
-rw-r--r--src/cowboy_handler.erl2
-rw-r--r--src/cowboy_http.erl181
-rw-r--r--src/cowboy_http2.erl85
-rw-r--r--src/cowboy_http3.erl306
-rw-r--r--src/cowboy_loop.erl2
-rw-r--r--src/cowboy_metrics_h.erl2
-rw-r--r--src/cowboy_middleware.erl2
-rw-r--r--src/cowboy_quicer.erl64
-rw-r--r--src/cowboy_req.erl24
-rw-r--r--src/cowboy_rest.erl69
-rw-r--r--src/cowboy_router.erl2
-rw-r--r--src/cowboy_static.erl14
-rw-r--r--src/cowboy_stream.erl3
-rw-r--r--src/cowboy_stream_h.erl9
-rw-r--r--src/cowboy_sub_protocol.erl4
-rw-r--r--src/cowboy_sup.erl2
-rw-r--r--src/cowboy_tls.erl8
-rw-r--r--src/cowboy_tracer_h.erl2
-rw-r--r--src/cowboy_websocket.erl60
-rw-r--r--src/cowboy_webtransport.erl292
30 files changed, 1072 insertions, 245 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl
index e5ed831..6a5634e 100644
--- a/src/cowboy.erl
+++ b/src/cowboy.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -51,8 +51,12 @@
start_clear(Ref, TransOpts0, ProtoOpts0) ->
TransOpts1 = ranch:normalize_opts(TransOpts0),
- {TransOpts, ConnectionType} = ensure_connection_type(TransOpts1),
- ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
+ {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0),
+ {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2),
+ ProtoOpts = ProtoOpts0#{
+ connection_type => ConnectionType,
+ dynamic_buffer => DynamicBuffer
+ },
ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).
-spec start_tls(ranch:ref(), ranch:opts(), opts())
@@ -60,12 +64,13 @@ start_clear(Ref, TransOpts0, ProtoOpts0) ->
start_tls(Ref, TransOpts0, ProtoOpts0) ->
TransOpts1 = ranch:normalize_opts(TransOpts0),
- SocketOpts = maps:get(socket_opts, TransOpts1, []),
- TransOpts2 = TransOpts1#{socket_opts => [
- {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
- |SocketOpts]},
- {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2),
- ProtoOpts = ProtoOpts0#{connection_type => ConnectionType},
+ {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0),
+ TransOpts3 = ensure_alpn(TransOpts2),
+ {TransOpts, ConnectionType} = ensure_connection_type(TransOpts3),
+ ProtoOpts = ProtoOpts0#{
+ connection_type => ConnectionType,
+ dynamic_buffer => DynamicBuffer
+ },
ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
%% @todo Experimental function to start a barebone QUIC listener.
@@ -77,6 +82,7 @@ start_tls(Ref, TransOpts0, ProtoOpts0) ->
-spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts())
-> {ok, pid()}.
+%% @todo Implement dynamic_buffer for HTTP/3 if/when it applies.
start_quic(Ref, TransOpts, ProtoOpts) ->
{ok, _} = application:ensure_all_started(quicer),
Parent = self(),
@@ -89,8 +95,14 @@ start_quic(Ref, TransOpts, ProtoOpts) ->
end,
SocketOpts = [
{alpn, ["h3"]}, %% @todo Why not binary?
- {peer_unidi_stream_count, 3}, %% We only need control and QPACK enc/dec.
- {peer_bidi_stream_count, 100}
+ %% We only need 3 for control and QPACK enc/dec,
+ %% but we need more for WebTransport. %% @todo Use 3 if WT is disabled.
+ {peer_unidi_stream_count, 100},
+ {peer_bidi_stream_count, 100},
+ %% For WebTransport.
+ %% @todo We probably don't want it enabled if WT isn't used.
+ {datagram_send_enabled, 1},
+ {datagram_receive_enabled, 1}
|SocketOpts2],
_ListenerPid = spawn(fun() ->
{ok, Listener} = quicer:listen(Port, SocketOpts),
@@ -139,11 +151,32 @@ port_0() ->
end,
Port.
+ensure_alpn(TransOpts) ->
+ SocketOpts = maps:get(socket_opts, TransOpts, []),
+ TransOpts#{socket_opts => [
+ {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]}
+ |SocketOpts]}.
+
ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) ->
{TransOpts, ConnectionType};
ensure_connection_type(TransOpts) ->
{TransOpts#{connection_type => supervisor}, supervisor}.
+%% Dynamic buffer was set; accept transport options as-is.
+%% Note that initial 'buffer' size may be lower than dynamic buffer allows.
+ensure_dynamic_buffer(TransOpts, #{dynamic_buffer := DynamicBuffer}) ->
+ {TransOpts, DynamicBuffer};
+%% Dynamic buffer was not set; define default dynamic buffer
+%% only if 'buffer' size was not configured. In that case we
+%% set the 'buffer' size to the lowest value.
+ensure_dynamic_buffer(TransOpts=#{socket_opts := SocketOpts}, _) ->
+ case proplists:get_value(buffer, SocketOpts, undefined) of
+ undefined ->
+ {TransOpts#{socket_opts => [{buffer, 1024}|SocketOpts]}, {1024, 131072}};
+ _ ->
+ {TransOpts, false}
+ end.
+
-spec stop_listener(ranch:ref()) -> ok | {error, not_found}.
stop_listener(Ref) ->
diff --git a/src/cowboy_app.erl b/src/cowboy_app.erl
index 95ae564..e58e1f6 100644
--- a/src/cowboy_app.erl
+++ b/src/cowboy_app.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_bstr.erl b/src/cowboy_bstr.erl
index f23167d..d0e7301 100644
--- a/src/cowboy_bstr.erl
+++ b/src/cowboy_bstr.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_children.erl b/src/cowboy_children.erl
index 305c989..2e00c37 100644
--- a/src/cowboy_children.erl
+++ b/src/cowboy_children.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_clear.erl b/src/cowboy_clear.erl
index eaeab74..845fdc1 100644
--- a/src/cowboy_clear.erl
+++ b/src/cowboy_clear.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2016-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -36,10 +36,6 @@ connection_process(Parent, Ref, Transport, Opts) ->
ProxyInfo = get_proxy_info(Ref, Opts),
{ok, Socket} = ranch:handshake(Ref),
%% Use cowboy_http2 directly only when 'http' is missing.
- %% Otherwise switch to cowboy_http2 from cowboy_http.
- %%
- %% @todo Extend this option to cowboy_tls and allow disabling
- %% the switch to cowboy_http2 in cowboy_http. Also document it.
Protocol = case maps:get(protocols, Opts, [http2, http]) of
[http2] -> cowboy_http2;
[_|_] -> cowboy_http
diff --git a/src/cowboy_clock.erl b/src/cowboy_clock.erl
index e2cdf62..b6e39f4 100644
--- a/src/cowboy_clock.erl
+++ b/src/cowboy_clock.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -93,7 +93,7 @@ handle_cast(_Msg, State) ->
-spec handle_info(any(), State) -> {noreply, State} when State::#state{}.
handle_info(update, #state{universaltime=Prev, rfc1123=B1, tref=TRef0}) ->
%% Cancel the timer in case an external process sent an update message.
- _ = erlang:cancel_timer(TRef0),
+ _ = erlang:cancel_timer(TRef0, [{async, true}, {info, false}]),
T = erlang:universaltime(),
B2 = update_rfc1123(B1, Prev, T),
ets:insert(?MODULE, {rfc1123, B2}),
diff --git a/src/cowboy_compress_h.erl b/src/cowboy_compress_h.erl
index 338ea9f..785eb0d 100644
--- a/src/cowboy_compress_h.erl
+++ b/src/cowboy_compress_h.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_constraints.erl b/src/cowboy_constraints.erl
index 33f0111..84ff249 100644
--- a/src/cowboy_constraints.erl
+++ b/src/cowboy_constraints.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2014-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_decompress_h.erl b/src/cowboy_decompress_h.erl
index 4e86e23..84283e5 100644
--- a/src/cowboy_decompress_h.erl
+++ b/src/cowboy_decompress_h.erl
@@ -1,5 +1,5 @@
-%% Copyright (c) 2024, jdamanalo <[email protected]>
-%% Copyright (c) 2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) jdamanalo <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -66,9 +66,11 @@ data(StreamID, IsFin, Data, State=#state{next=Next0, enabled=false, read_body_bu
{Commands, Next} = cowboy_stream:data(StreamID, IsFin,
buffer_to_binary([Data|Buffer]), Next0),
fold(Commands, State#state{next=Next, read_body_is_fin=IsFin});
-data(StreamID, IsFin, Data, State0=#state{next=Next0, ratio_limit=RatioLimit,
+data(StreamID, IsFin, Data0, State0=#state{next=Next0, ratio_limit=RatioLimit,
inflate=Z, is_reading=true, read_body_buffer=Buffer}) ->
- case inflate(Z, RatioLimit, buffer_to_iovec([Data|Buffer])) of
+ Data = buffer_to_iovec([Data0|Buffer]),
+ Limit = iolist_size(Data) * RatioLimit,
+ case cow_deflate:inflate(Z, Data, Limit) of
{error, ErrorType} ->
zlib:close(Z),
Status = case ErrorType of
@@ -236,22 +238,3 @@ do_build_accept_encoding([{ContentCoding, Q}|Tail], Acc0) ->
do_build_accept_encoding(Tail, Acc);
do_build_accept_encoding([], Acc) ->
Acc.
-
-inflate(Z, RatioLimit, Data) ->
- try
- {Status, Output} = zlib:safeInflate(Z, Data),
- Size = iolist_size(Output),
- do_inflate(Z, Size, iolist_size(Data) * RatioLimit, Status, [Output])
- catch
- error:data_error ->
- {error, data_error}
- end.
-
-do_inflate(_, Size, Limit, _, _) when Size > Limit ->
- {error, size_error};
-do_inflate(Z, Size0, Limit, continue, Acc) ->
- {Status, Output} = zlib:safeInflate(Z, []),
- Size = Size0 + iolist_size(Output),
- do_inflate(Z, Size, Limit, Status, [Output | Acc]);
-do_inflate(_, _, _, finished, Acc) ->
- {ok, iolist_to_binary(lists:reverse(Acc))}.
diff --git a/src/cowboy_dynamic_buffer.hrl b/src/cowboy_dynamic_buffer.hrl
new file mode 100644
index 0000000..4d05e50
--- /dev/null
+++ b/src/cowboy_dynamic_buffer.hrl
@@ -0,0 +1,80 @@
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% These functions are common to cowboy_http, cowboy_http2 and
+%% cowboy_websocket. It requires the options and the state
+%% to use the same field names.
+
+%% Experiments have shown that the size of the 'buffer' can greatly
+%% impact performance: a buffer too small leads to more messages
+%% being handled and typically more binary appends; and a buffer
+%% too large results in inefficient use of memory which in turn
+%% reduces the throughput, presumably because large binary appends
+%% are not as efficient as smaller ones, and because while the
+%% buffer gets allocated only when there is data, the allocated
+%% size remains until the binary is GC and so under-use hurts.
+%%
+%% The performance of a given 'buffer' size will also depend on
+%% how the client is sending data, and on the protocol. For example,
+%% HTTP/1.1 doesn't need a very large 'buffer' size for reading
+%% request headers, but it does need one for reading large request
+%% bodies. At the same time, HTTP/2 performs best reading large
+%% request bodies when the 'buffer' size is about half that of
+%% HTTP/1.1.
+%%
+%% It therefore becomes important to resize the buffer dynamically
+%% depending on what is currently going on. We do this based on
+%% the size of data packets we received from the transport. We
+%% maintain a moving average and when that moving average is
+%% 90% of the current 'buffer' size, we double the 'buffer' size.
+%% When things slow down and the moving average falls below
+%% 40% of the current 'buffer' size, we halve the 'buffer' size.
+%%
+%% To calculate the moving average we do (MovAvg + DataLen) div 2.
+%% This means that the moving average will change very quickly when
+%% DataLen increases or decreases rapidly. That's OK, we want to
+%% be reactive, but also setting the buffer size is a pretty fast
+%% operation. The formula could be changed to the following if it
+%% became a problem: (MovAvg * N + DataLen) div (N + 1).
+%%
+%% Note that this works best when active,N uses low values of N.
+%% We don't want to accumulate too much data because we resize
+%% the buffer.
+
+init_dynamic_buffer_size(#{dynamic_buffer_initial_size := DynamicBuffer}) ->
+ DynamicBuffer;
+init_dynamic_buffer_size(#{dynamic_buffer := {LowDynamicBuffer, _}}) ->
+ LowDynamicBuffer;
+init_dynamic_buffer_size(_) ->
+ false.
+
+maybe_resize_buffer(State=#state{dynamic_buffer_size=false}, _) ->
+ State;
+maybe_resize_buffer(State=#state{transport=Transport, socket=Socket,
+ opts=#{dynamic_buffer := {LowDynamicBuffer, HighDynamicBuffer}},
+ dynamic_buffer_size=BufferSize0, dynamic_buffer_moving_average=MovingAvg0}, Data) ->
+ DataLen = byte_size(Data),
+ MovingAvg = (MovingAvg0 + DataLen) div 2,
+ if
+ BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 ->
+ BufferSize = min(BufferSize0 * 2, HighDynamicBuffer),
+ ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])),
+ State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize};
+ BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 ->
+ BufferSize = max(BufferSize0 div 2, LowDynamicBuffer),
+ ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])),
+ State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize};
+ true ->
+ State#state{dynamic_buffer_moving_average=MovingAvg}
+ end.
diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl
index 5048168..1989512 100644
--- a/src/cowboy_handler.erl
+++ b/src/cowboy_handler.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 9c92ec5..10eb519 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2016-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -17,6 +17,7 @@
-module(cowboy_http).
-export([init/6]).
+-export([loop/1]).
-export([system_continue/3]).
-export([system_terminate/4]).
@@ -24,11 +25,16 @@
-type opts() :: #{
active_n => pos_integer(),
+ alpn_default_protocol => http | http2,
chunked => boolean(),
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
env => cowboy_middleware:env(),
+ hibernate => boolean(),
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
@@ -47,6 +53,7 @@
metrics_req_filter => fun((cowboy_req:req()) -> map()),
metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
middlewares => [module()],
+ protocols => [http | http2],
proxy_header => boolean(),
request_timeout => timeout(),
reset_idle_timeout_on_send => boolean(),
@@ -137,6 +144,10 @@
%% Flow requested for the current stream.
flow = infinity :: non_neg_integer() | infinity,
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size :: pos_integer() | false,
+ dynamic_buffer_moving_average :: non_neg_integer(),
+
%% Identifier for the stream currently being written.
%% Note that out_streamid =< in_streamid.
out_streamid = 1 :: pos_integer(),
@@ -181,12 +192,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=init_dynamic_buffer_size(Opts),
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
last_streamid=maps:get(max_keepalive, Opts, 1000)},
safe_setopts_active(State),
- loop(set_timeout(State, request_timeout)).
+ before_loop(set_timeout(State, request_timeout)).
+
+-include("cowboy_dynamic_buffer.hrl").
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
safe_setopts_active(State) ->
@@ -212,6 +227,13 @@ flush_passive(Socket, Messages) ->
ok
end.
+before_loop(State=#state{opts=#{hibernate := true}}) ->
+ proc_lib:hibernate(?MODULE, loop, [State]);
+before_loop(State) ->
+ loop(State).
+
+-spec loop(#state{}) -> ok.
+
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
last_streamid=LastStreamID}) ->
@@ -220,11 +242,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
receive
%% Discard data coming in after the last request
%% we want to process was received fully.
- {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
- loop(State);
+ {OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
+ State1 = maybe_resize_buffer(State, Data),
+ before_loop(State1);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(<< Buffer/binary, Data/binary >>, State);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(<< Buffer/binary, Data/binary >>, State1);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -233,37 +257,37 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
%% Hardcoded for compatibility with Ranch 1.x.
Passive =:= tcp_passive; Passive =:= ssl_passive ->
safe_setopts_active(State),
- loop(State);
+ before_loop(State);
%% Timeouts.
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
- loop(State);
+ before_loop(State);
{timeout, TimerRef, Reason} ->
timeout(State, Reason);
{timeout, _, _} ->
- loop(State);
+ before_loop(State);
%% System messages.
{'EXIT', Parent, shutdown} ->
Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
- loop(initiate_closing(State, Reason));
+ before_loop(initiate_closing(State, Reason));
{'EXIT', Parent, Reason} ->
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
- loop(info(State, StreamID, Msg));
+ before_loop(info(State, StreamID, Msg));
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
- loop(down(State, Pid, Msg));
+ before_loop(down(State, Pid, Msg));
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
- loop(State);
+ before_loop(State);
%% Unknown messages.
Msg ->
cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts),
- loop(State)
+ before_loop(State)
after InactivityTimeout ->
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
@@ -295,6 +319,7 @@ set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
when element(1, InState) =/= ps_body ->
State;
%% Otherwise we can set the timeout.
+%% @todo Don't do this so often, use a strategy similar to Websocket/H2 if possible.
set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
State = cancel_timeout(State0),
Default = case Name of
@@ -327,7 +352,7 @@ cancel_timeout(State=#state{timer=TimerRef}) ->
_ ->
%% Do a synchronous cancel and remove the message if any
%% to avoid receiving stray messages.
- _ = erlang:cancel_timer(TimerRef),
+ _ = erlang:cancel_timer(TimerRef, [{async, false}, {info, false}]),
receive
{timeout, TimerRef, _} -> ok
after 0 ->
@@ -348,12 +373,12 @@ timeout(State, idle_timeout) ->
'Connection idle longer than configuration allows.'}).
parse(<<>>, State) ->
- loop(State#state{buffer= <<>>});
+ before_loop(State#state{buffer= <<>>});
%% Do not process requests that come in after the last request
%% and discard the buffer if any to save memory.
parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
- loop(State#state{buffer= <<>>});
+ before_loop(State#state{buffer= <<>>});
parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
after_parse(parse_request(Buffer, State, EmptyLines));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
@@ -422,13 +447,13 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer
end;
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
-after_parse({data, _, IsFin, _, State}) ->
- loop(set_timeout(State, case IsFin of
+after_parse({data, _, IsFin, _, State=#state{buffer=Buffer}}) ->
+ parse(Buffer, set_timeout(State, case IsFin of
fin -> request_timeout;
nofin -> idle_timeout
end));
after_parse({more, State}) ->
- loop(set_timeout(State, idle_timeout)).
+ before_loop(set_timeout(State, idle_timeout)).
update_flow(fin, _, State) ->
%% This function is only called after parsing, therefore we
@@ -488,8 +513,13 @@ parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLine
'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
%% Accept direct HTTP/2 only at the beginning of the connection.
<< "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
- %% @todo Might be worth throwing to get a clean stacktrace.
- http2_upgrade(State, Buffer);
+ case lists:member(http2, maps:get(protocols, Opts, [http2, http])) of
+ true ->
+ http2_upgrade(State, Buffer);
+ false ->
+ error_terminate(501, State, {connection_error, no_error,
+ 'Prior knowledge upgrade to HTTP/2 is disabled by configuration.'})
+ end;
_ ->
parse_method(Buffer, State, <<>>,
maps:get(max_method_length, Opts, 32))
@@ -777,7 +807,7 @@ default_port(_) -> 80.
%% End of request parsing.
request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert,
- proxy_header=ProxyHeader, in_streamid=StreamID, in_state=
+ opts=Opts, proxy_header=ProxyHeader, in_streamid=StreamID, in_state=
PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
Headers, Host, Port) ->
Scheme = case Transport:secure() of
@@ -841,7 +871,7 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock
undefined -> Req0;
_ -> Req0#{proxy_header => ProxyHeader}
end,
- case is_http2_upgrade(Headers, Version) of
+ case is_http2_upgrade(Headers, Version, Opts) of
false ->
State = case HasBody of
true ->
@@ -863,12 +893,13 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock
%% HTTP/2 upgrade.
-%% @todo We must not upgrade to h2c over a TLS connection.
is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
- <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
+ <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1', Opts) ->
Conns = cow_http_hd:parse_connection(Conn),
- case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
- {true, true} ->
+ case lists:member(<<"upgrade">>, Conns)
+ andalso lists:member(<<"http2-settings">>, Conns)
+ andalso lists:member(http2, maps:get(protocols, Opts, [http2, http])) of
+ true ->
Protocols = cow_http_hd:parse_upgrade(Upgrade),
case lists:member(<<"h2c">>, Protocols) of
true ->
@@ -879,17 +910,17 @@ is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
_ ->
false
end;
-is_http2_upgrade(_, _) ->
+is_http2_upgrade(_, _, _) ->
false.
%% Prior knowledge upgrade, without an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
+ proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
case Transport:secure() of
false ->
_ = cancel_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport,
- ProxyHeader, Opts, Peer, Sock, Cert, Buffer);
+ cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
+ opts_for_upgrade(State), Peer, Sock, Cert, Buffer);
true ->
error_terminate(400, State, {connection_error, protocol_error,
'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
@@ -897,22 +928,37 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
%% Upgrade via an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert},
+ proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert},
Buffer, HTTP2Settings, Req) ->
- %% @todo
- %% However if the client sent a body, we need to read the body in full
- %% and if we can't do that, return a 413 response. Some options are in order.
- %% Always half-closed stream coming from this side.
- try cow_http_hd:parse_http2_settings(HTTP2Settings) of
- Settings ->
- _ = cancel_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport,
- ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req)
- catch _:_ ->
- error_terminate(400, State, {connection_error, protocol_error,
- 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
+ case Transport:secure() of
+ false ->
+ %% @todo
+ %% However if the client sent a body, we need to read the body in full
+ %% and if we can't do that, return a 413 response. Some options are in order.
+ %% Always half-closed stream coming from this side.
+ try cow_http_hd:parse_http2_settings(HTTP2Settings) of
+ Settings ->
+ _ = cancel_timeout(State),
+ cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
+ opts_for_upgrade(State), Peer, Sock, Cert, Buffer, Settings, Req)
+ catch _:_ ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
+ end;
+ true ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
end.
+opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=false}) ->
+ Opts;
+opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=Size,
+ dynamic_buffer_moving_average=MovingAvg}) ->
+ Opts#{
+ dynamic_buffer_initial_average => MovingAvg,
+ dynamic_buffer_initial_size => Size
+ }.
+
%% Request body parsing.
parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
@@ -1209,7 +1255,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_
commands(State, StreamID, Tail);
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
- out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
+ out_state=OutState, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
%% @todo If there's streams opened after this one, fail instead of 101.
State1 = cancel_timeout(State0),
@@ -1233,23 +1279,19 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
%% Turn off the trap_exit process flag
%% since this process will no longer be a supervisor.
process_flag(trap_exit, false),
- Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState);
+ Protocol:takeover(Parent, Ref, Socket, Transport,
+ opts_for_upgrade(State), Buffer, InitialState);
%% Set options dynamically.
-commands(State0=#state{overriden_opts=Opts},
- StreamID, [{set_options, SetOpts}|Tail]) ->
- State1 = case SetOpts of
- #{idle_timeout := IdleTimeout} ->
- set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
+commands(State0, StreamID, [{set_options, SetOpts}|Tail]) ->
+ State = maps:fold(fun
+ (chunked, Chunked, StateF=#state{overriden_opts=Opts}) ->
+ StateF#state{overriden_opts=Opts#{chunked => Chunked}};
+ (idle_timeout, IdleTimeout, StateF=#state{overriden_opts=Opts}) ->
+ set_timeout(StateF#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
idle_timeout);
- _ ->
- State0
- end,
- State = case SetOpts of
- #{chunked := Chunked} ->
- State1#state{overriden_opts=Opts#{chunked => Chunked}};
- _ ->
- State1
- end,
+ (_, _, StateF) ->
+ StateF
+ end, State0, SetOpts),
commands(State, StreamID, Tail);
%% Stream shutdown.
commands(State, StreamID, [stop|Tail]) ->
@@ -1368,23 +1410,24 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
end.
stream_next(State0=#state{opts=Opts, active=Active, out_streamid=OutStreamID, streams=Streams}) ->
+ %% Enable active mode again if it was disabled.
+ State1 = case Active of
+ true -> State0;
+ false -> active(State0)
+ end,
NextOutStreamID = OutStreamID + 1,
case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
false ->
- State = State0#state{out_streamid=NextOutStreamID, out_state=wait},
+ State = State1#state{out_streamid=NextOutStreamID, out_state=wait},
%% There are no streams remaining. We therefore can
%% and want to switch back to the request_timeout.
set_timeout(State, request_timeout);
#stream{queue=Commands} ->
- State = case Active of
- true -> State0;
- false -> active(State0)
- end,
%% @todo Remove queue from the stream.
%% We set the flow to the initial flow size even though
%% we might have sent some data through already due to pipelining.
Flow = maps:get(initial_stream_flow_size, Opts, 65535),
- commands(State#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait},
+ commands(State1#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait},
NextOutStreamID, Commands)
end.
@@ -1597,12 +1640,12 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
-spec system_continue(_, _, #state{}) -> ok.
system_continue(_, _, State) ->
- loop(State).
+ before_loop(State).
-spec system_terminate(any(), _, _, #state{}) -> no_return().
system_terminate(Reason0, _, _, State) ->
Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
- loop(initiate_closing(State, Reason)).
+ before_loop(initiate_closing(State, Reason)).
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
system_code_change(Misc, _, _, _) ->
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 0e110cd..0d22fa1 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -17,6 +17,7 @@
-export([init/6]).
-export([init/10]).
-export([init/12]).
+-export([loop/2]).
-export([system_continue/3]).
-export([system_terminate/4]).
@@ -24,15 +25,20 @@
-type opts() :: #{
active_n => pos_integer(),
+ alpn_default_protocol => http | http2,
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
connection_window_margin_size => 0..16#7fffffff,
connection_window_update_threshold => 0..16#7fffffff,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
goaway_initial_timeout => timeout(),
goaway_complete_timeout => timeout(),
+ hibernate => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
initial_connection_window_size => 65535..16#7fffffff,
@@ -57,6 +63,7 @@
metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
middlewares => [module()],
preface_timeout => timeout(),
+ protocols => [http | http2],
proxy_header => boolean(),
reset_idle_timeout_on_send => boolean(),
sendfile => boolean(),
@@ -133,6 +140,10 @@
%% Flow requested for all streams.
flow = 0 :: non_neg_integer(),
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size :: pos_integer() | false,
+ dynamic_buffer_moving_average :: non_neg_integer(),
+
%% Currently active HTTP/2 streams. Streams may be initiated either
%% by the client or by the server through PUSH_PROMISE frames.
streams = #{} :: #{cow_http2:streamid() => #stream{}},
@@ -143,7 +154,8 @@
}).
-spec init(pid(), ranch:ref(), inet:socket(), module(),
- ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok.
+ ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> no_return().
+
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
{ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket),
'A socket error occurred when retrieving the peer name.'),
@@ -167,18 +179,22 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
-spec init(pid(), ranch:ref(), inet:socket(), module(),
ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
- binary() | undefined, binary()) -> ok.
+ binary() | undefined, binary()) -> no_return().
+
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) ->
+ DynamicBuffer = init_dynamic_buffer_size(Opts),
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts),
%% Send the preface before doing all the init in case we get a socket error.
ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)),
State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader,
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=DynamicBuffer,
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
http2_status=sequence, http2_machine=HTTP2Machine}), 0),
safe_setopts_active(State),
case Buffer of
- <<>> -> loop(State, Buffer);
+ <<>> -> before_loop(State, Buffer);
_ -> parse(State, Buffer)
end.
@@ -213,15 +229,19 @@ add_period(Time, Period) -> Time + Period.
-spec init(pid(), ranch:ref(), inet:socket(), module(),
ranch_proxy_header:proxy_info() | undefined, cowboy:opts(),
{inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()},
- binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok.
+ binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> no_return().
+
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer,
_Settings, Req=#{method := Method}) ->
+ DynamicBuffer = init_dynamic_buffer_size(Opts),
{ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts),
{ok, StreamID, HTTP2Machine}
= cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0),
State0 = #state{parent=Parent, ref=Ref, socket=Socket,
transport=Transport, proxy_header=ProxyHeader,
opts=Opts, peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=DynamicBuffer,
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
http2_status=upgrade, http2_machine=HTTP2Machine},
State1 = headers_frame(State0#state{
http2_machine=HTTP2Machine}, StreamID, Req),
@@ -237,20 +257,30 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer
ok = maybe_socket_error(State, Transport:send(Socket, Preface)),
safe_setopts_active(State),
case Buffer of
- <<>> -> loop(State, Buffer);
+ <<>> -> before_loop(State, Buffer);
_ -> parse(State, Buffer)
end.
+-include("cowboy_dynamic_buffer.hrl").
+
%% Because HTTP/2 has flow control and Cowboy has other rate limiting
%% mechanisms implemented, a very large active_n value should be fine,
%% as long as the stream handlers do their work in a timely manner.
+%% However large active_n values reduce the impact of dynamic_buffer.
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
safe_setopts_active(State) ->
ok = maybe_socket_error(State, setopts_active(State)).
+before_loop(State=#state{opts=#{hibernate := true}}, Buffer) ->
+ proc_lib:hibernate(?MODULE, loop, [State, Buffer]);
+before_loop(State, Buffer) ->
+ loop(State, Buffer).
+
+-spec loop(#state{}, binary()) -> no_return().
+
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
opts=Opts, timer=TimerRef, children=Children}, Buffer) ->
Messages = Transport:messages(),
@@ -258,7 +288,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
receive
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(State#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>);
{Closed, Socket} when Closed =:= element(2, Messages) ->
Reason = case State#state.http2_status of
closing -> {stop, closed, 'The client is going away.'};
@@ -271,11 +302,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
%% Hardcoded for compatibility with Ranch 1.x.
Passive =:= tcp_passive; Passive =:= ssl_passive ->
safe_setopts_active(State),
- loop(State, Buffer);
+ before_loop(State, Buffer);
%% System messages.
{'EXIT', Parent, shutdown} ->
Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
- loop(initiate_closing(State, Reason), Buffer);
+ before_loop(initiate_closing(State, Reason), Buffer);
{'EXIT', Parent, Reason} ->
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
{system, From, Request} ->
@@ -285,27 +316,27 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
tick_idle_timeout(State, Buffer);
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
- loop(State, Buffer);
+ before_loop(State, Buffer);
{timeout, TRef, {cow_http2_machine, Name}} ->
- loop(timeout(State, Name, TRef), Buffer);
+ before_loop(timeout(State, Name, TRef), Buffer);
{timeout, TimerRef, {goaway_initial_timeout, Reason}} ->
- loop(closing(State, Reason), Buffer);
+ before_loop(closing(State, Reason), Buffer);
{timeout, TimerRef, {goaway_complete_timeout, Reason}} ->
terminate(State, {stop, stop_reason(Reason),
'Graceful shutdown timed out.'});
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
- loop(info(State, StreamID, Msg), Buffer);
+ before_loop(info(State, StreamID, Msg), Buffer);
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
- loop(down(State, Pid, Msg), Buffer);
+ before_loop(down(State, Pid, Msg), Buffer);
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
- loop(State, Buffer);
+ before_loop(State, Buffer);
Msg ->
cowboy:log(warning, "Received stray message ~p.", [Msg], Opts),
- loop(State, Buffer)
+ before_loop(State, Buffer)
after InactivityTimeout ->
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
@@ -314,7 +345,7 @@ tick_idle_timeout(State=#state{idle_timeout_num=?IDLE_TIMEOUT_TICKS}, _) ->
terminate(State, {stop, timeout,
'Connection idle longer than configuration allows.'});
tick_idle_timeout(State=#state{idle_timeout_num=TimeoutNum}, Buffer) ->
- loop(set_idle_timeout(State, TimeoutNum + 1), Buffer).
+ before_loop(set_idle_timeout(State, TimeoutNum + 1), Buffer).
set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}, _)
when Status =:= closing_initiated orelse Status =:= closing,
@@ -355,7 +386,7 @@ parse(State=#state{http2_status=sequence}, Data) ->
{ok, Rest} ->
parse(State#state{http2_status=settings}, Rest);
more ->
- loop(State, Data);
+ before_loop(State, Data);
Error = {connection_error, _, _} ->
terminate(State, Error)
end;
@@ -374,7 +405,7 @@ parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Stre
more when Status =:= closing, Streams =:= #{} ->
terminate(State, {stop, normal, 'The connection is going away.'});
more ->
- loop(State, Data)
+ before_loop(State, Data)
end.
%% Frame rate flood protection.
@@ -1106,7 +1137,9 @@ goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) ->
%% in-flight stream creation (at least one round-trip time), the server can send
%% another GOAWAY frame with an updated last stream identifier. This ensures
%% that a connection can be cleanly shut down without losing requests.
+
-spec initiate_closing(#state{}, _) -> #state{}.
+
initiate_closing(State=#state{http2_status=connected, socket=Socket,
transport=Transport, opts=Opts}, Reason) ->
ok = maybe_socket_error(State, Transport:send(Socket,
@@ -1123,7 +1156,9 @@ initiate_closing(State, Reason) ->
terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}).
%% Switch to 'closing' state and stop accepting new streams.
+
-spec closing(#state{}, Reason :: term()) -> #state{}.
+
closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} ->
terminate(State, Reason);
closing(State0=#state{http2_status=closing_initiated,
@@ -1160,6 +1195,7 @@ maybe_socket_error(State, {error, Reason}, Human) ->
terminate(State, {socket_error, Reason, Human}).
-spec terminate(#state{} | undefined, _) -> no_return().
+
terminate(undefined, Reason) ->
exit({shutdown, Reason});
terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status,
@@ -1360,15 +1396,18 @@ terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) ->
%% System callbacks.
--spec system_continue(_, _, {#state{}, binary()}) -> ok.
+-spec system_continue(_, _, {#state{}, binary()}) -> no_return().
+
system_continue(_, _, {State, Buffer}) ->
- loop(State, Buffer).
+ before_loop(State, Buffer).
-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
+
system_terminate(Reason0, _, _, {State, Buffer}) ->
Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
- loop(initiate_closing(State, Reason), Buffer).
+ before_loop(initiate_closing(State, Reason), Buffer).
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
+
system_code_change(Misc, _, _, _) ->
{ok, Misc}.
diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl
index ef3e3f6..9aa6be5 100644
--- a/src/cowboy_http3.erl
+++ b/src/cowboy_http3.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2023-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -32,10 +32,10 @@
enable_connect_protocol => boolean(),
env => cowboy_middleware:env(),
logger => module(),
- max_decode_blocked_streams => 0..16#3fffffffffffffff,
- max_decode_table_size => 0..16#3fffffffffffffff,
- max_encode_blocked_streams => 0..16#3fffffffffffffff,
- max_encode_table_size => 0..16#3fffffffffffffff,
+ max_decode_blocked_streams => 0..16#3fffffffffffffff,
+ max_decode_table_size => 0..16#3fffffffffffffff,
+ max_encode_blocked_streams => 0..16#3fffffffffffffff,
+ max_encode_table_size => 0..16#3fffffffffffffff,
max_ignored_frame_size_received => non_neg_integer() | infinity,
metrics_callback => cowboy_metrics_h:metrics_callback(),
metrics_req_filter => fun((cowboy_req:req()) -> map()),
@@ -51,18 +51,30 @@
}.
-export_type([opts/0]).
+%% HTTP/3 or WebTransport stream.
+%%
+%% WebTransport sessions involve one bidirectional CONNECT stream
+%% that must stay open (and can be used for signaling using the
+%% Capsule Protocol) and an application-defined number of
+%% unidirectional and bidirectional streams, as well as datagrams.
+%%
+%% WebTransport sessions run in the CONNECT request process and
+%% all events related to the session is sent there as a message.
+%% The pid of the process is kept in the state.
-record(stream, {
id :: cow_http3:stream_id(),
%% Whether the stream is currently in a special state.
status :: header | {unidi, control | encoder | decoder}
- | normal | {data | ignore, non_neg_integer()} | stopping,
+ | normal | {data | ignore, non_neg_integer()} | stopping
+ | {webtransport_session, normal | {ignore, non_neg_integer()}}
+ | {webtransport_stream, cow_http3:stream_id()},
%% Stream buffer.
buffer = <<>> :: binary(),
%% Stream state.
- state = undefined :: undefined | {module, any()}
+ state = undefined :: undefined | {module(), any()}
}).
-record(state, {
@@ -152,6 +164,9 @@ loop(State0=#state{opts=Opts, children=Children}) ->
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State0, StreamID, Msg));
+ %% WebTransport commands.
+ {'$webtransport_commands', SessionID, Commands} ->
+ loop(webtransport_commands(State0, SessionID, Commands));
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
loop(down(State0, Pid, Msg));
@@ -164,12 +179,17 @@ handle_quic_msg(State0=#state{opts=Opts}, Msg) ->
case cowboy_quicer:handle(Msg) of
{data, StreamID, IsFin, Data} ->
parse(State0, StreamID, Data, IsFin);
+ {datagram, Data} ->
+ parse_datagram(State0, Data);
{stream_started, StreamID, StreamType} ->
State = stream_new_remote(State0, StreamID, StreamType),
loop(State);
{stream_closed, StreamID, ErrorCode} ->
State = stream_closed(State0, StreamID, ErrorCode),
loop(State);
+ {peer_send_shutdown, StreamID} ->
+ State = stream_peer_send_shutdown(State0, StreamID),
+ loop(State);
closed ->
%% @todo Different error reason if graceful?
Reason = {socket_error, closed, 'The socket has been closed.'},
@@ -216,6 +236,56 @@ parse1(State=#state{http3_machine=HTTP3Machine0},
{error, Error={connection_error, _, _}, HTTP3Machine} ->
terminate(State#state{http3_machine=HTTP3Machine}, Error)
end;
+%% @todo Handle when IsFin = fin which must terminate the WT session.
+parse1(State=#state{conn=Conn}, Stream=#stream{id=SessionID, status=
+ {webtransport_session, normal}}, Data, IsFin) ->
+ case cow_capsule:parse(Data) of
+ {ok, wt_drain_session, Rest} ->
+ webtransport_event(State, SessionID, close_initiated),
+ parse1(State, Stream, Rest, IsFin);
+ {ok, {wt_close_session, AppCode, AppMsg}, Rest} ->
+ %% This event will be handled specially and lead
+ %% to the termination of the session process.
+ webtransport_event(State, SessionID, {closed, AppCode, AppMsg}),
+ %% Shutdown the CONNECT stream immediately.
+ cowboy_quicer:shutdown_stream(Conn, SessionID),
+ %% @todo Will we receive a {stream_closed,...} after that?
+ %% If any data is received past that point this is an error.
+ %% @todo Don't crash, error out properly.
+ <<>> = Rest,
+ loop(webtransport_terminate_session(State, Stream));
+ more ->
+ loop(stream_store(State, Stream#stream{buffer=Data}));
+ %% Ignore unhandled/unknown capsules.
+ %% @todo Do this when cow_capsule includes some.
+% {ok, _, Rest} ->
+% parse1(State, Stream, Rest, IsFin);
+% {ok, Rest} ->
+% parse1(State, Stream, Rest, IsFin);
+ %% @todo Make the max length configurable?
+ {skip, Len} when Len =< 8192 ->
+ loop(stream_store(State, Stream#stream{
+ status={webtransport_session, {ignore, Len}}}));
+ {skip, Len} ->
+ %% @todo What should be done on capsule error?
+ error({todo, capsule_too_long, Len});
+ error ->
+ %% @todo What should be done on capsule error?
+ error({todo, capsule_error, Data})
+ end;
+parse1(State, Stream=#stream{status=
+ {webtransport_session, {ignore, Len}}}, Data, IsFin) ->
+ case Data of
+ <<_:Len/unit:8, Rest/bits>> ->
+ parse1(State, Stream#stream{status={webtransport_session, normal}}, Rest, IsFin);
+ _ ->
+ loop(stream_store(State, Stream#stream{
+ status={webtransport_session, {ignore, Len - byte_size(Data)}}}))
+ end;
+parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID}}, Data, IsFin) ->
+ webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}),
+ %% No need to store the stream again, WT streams don't get changed here.
+ loop(State);
parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) ->
DataLen = byte_size(Data),
if
@@ -246,6 +316,9 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) ->
{ok, Frame, Rest} ->
FrameIsFin = is_fin(IsFin, Rest),
parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin);
+ %% The WebTransport stream header is not a real frame.
+ {webtransport_stream_header, SessionID, Rest} ->
+ become_webtransport_stream(State, Stream, bidi, SessionID, Rest, IsFin);
{more, Frame = {data, _}, Len} ->
%% We're at the end of the data so FrameIsFin is equivalent to IsFin.
case IsFin of
@@ -317,13 +390,24 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0},
{error, Error={connection_error, _, _}, HTTP3Machine} ->
terminate(State0#state{http3_machine=HTTP3Machine}, Error)
end;
+ %% @todo Perhaps do this in cow_http3_machine directly.
{ok, push, _} ->
terminate(State0, {connection_error, h3_stream_creation_error,
'Only servers can push. (RFC9114 6.2.2)'});
+ {ok, {webtransport, SessionID}, Rest} ->
+ become_webtransport_stream(State0, Stream0, unidi, SessionID, Rest, IsFin);
%% Unknown stream types must be ignored. We choose to abort the
%% stream instead of reading and discarding the incoming data.
{undefined, _} ->
- loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error))
+ loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error));
+ %% Very unlikely to happen but WebTransport headers may be fragmented
+ %% as they are more than one byte. The fin flag in this case is an error,
+ %% but because it happens in WebTransport application data (the Session ID)
+ %% we only reset the impacted stream and not the entire connection.
+ more when IsFin =:= fin ->
+ loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error));
+ more ->
+ loop(stream_store(State0, Stream0#stream{buffer=Data}))
end.
frame(State=#state{http3_machine=HTTP3Machine0},
@@ -449,6 +533,8 @@ headers_to_map([{Name, Value}|Tail], Acc0) ->
end,
headers_to_map(Tail, Acc).
+%% @todo WebTransport CONNECT requests must have extra checks on settings.
+%% @todo We may also need to defer them if we didn't get settings.
headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
@@ -488,6 +574,18 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer},
send_headers(State0, Stream, fin, StatusCode0, RespHeaders0)
end.
+%% Datagrams.
+
+parse_datagram(State, Data0) ->
+ {SessionID, Data} = cow_http3:parse_datagram(Data0),
+ case stream_get(State, SessionID) of
+ #stream{status={webtransport_session, _}} ->
+ webtransport_event(State, SessionID, {datagram, Data}),
+ loop(State);
+ _ ->
+ error(todo) %% @todo Might be a future WT session or an error.
+ end.
+
%% Erlang messages.
down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) ->
@@ -654,6 +752,22 @@ commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) ->
%% Use a different protocol within the stream (CONNECT :protocol).
%% @todo Make sure we error out when the feature is disabled.
commands(State0, Stream0=#stream{id=StreamID},
+ [{switch_protocol, Headers, cowboy_webtransport, WTState=#{}}|Tail]) ->
+ State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
+ #state{http3_machine=HTTP3Machine0} = State,
+ Stream1 = #stream{state=StreamState} = stream_get(State, StreamID),
+ %% The stream becomes a WT session at that point. It is the
+ %% parent stream of all streams in this WT session. The
+ %% cowboy_stream state is kept because it will be needed
+ %% to terminate the stream properly.
+ HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0),
+ Stream = Stream1#stream{
+ status={webtransport_session, normal},
+ state={cowboy_webtransport, WTState#{stream_state => StreamState}}
+ },
+ %% @todo We must propagate the buffer to capsule handling if any.
+ commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail);
+commands(State0, Stream0=#stream{id=StreamID},
[{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}),
Stream = stream_get(State, StreamID),
@@ -758,6 +872,157 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID},
cowboy_quicer:send(Conn, EncoderID, EncData)),
State.
+%% We mark the stream as being a WebTransport stream
+%% and then continue parsing the data as a WebTransport
+%% stream. This function is common for incoming unidi
+%% and bidi streams.
+become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0},
+ Stream0=#stream{id=StreamID}, StreamType, SessionID, Rest, IsFin) ->
+ case cow_http3_machine:become_webtransport_stream(StreamID, SessionID, HTTP3Machine0) of
+ {ok, HTTP3Machine} ->
+ State = State0#state{http3_machine=HTTP3Machine},
+ Stream = Stream0#stream{status={webtransport_stream, SessionID}},
+ webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}),
+ %% We don't need to parse the remaining data if there isn't any.
+ case {Rest, IsFin} of
+ {<<>>, nofin} -> loop(stream_store(State, Stream));
+ _ -> parse(stream_store(State, Stream), StreamID, Rest, IsFin)
+ end
+ %% @todo Error conditions.
+ end.
+
+webtransport_event(State, SessionID, Event) ->
+ #stream{
+ status={webtransport_session, _},
+ state={cowboy_webtransport, #{session_pid := SessionPid}}
+ } = stream_get(State, SessionID),
+ SessionPid ! {'$webtransport_event', SessionID, Event},
+ ok.
+
+webtransport_commands(State, SessionID, Commands) ->
+ case stream_get(State, SessionID) of
+ Session = #stream{status={webtransport_session, _}} ->
+ wt_commands(State, Session, Commands);
+ %% The stream has been terminated, ignore pending commands.
+ error ->
+ State
+ end.
+
+wt_commands(State, _, []) ->
+ State;
+wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID},
+ [{open_stream, OpenStreamRef, StreamType, InitialData}|Tail]) ->
+ %% Because opening the stream involves sending a short header
+ %% we necessarily write data. The InitialData variable allows
+ %% providing additional data to be sent in the same packet.
+ StartF = case StreamType of
+ bidi -> start_bidi_stream;
+ unidi -> start_unidi_stream
+ end,
+ Header = cow_http3:webtransport_stream_header(SessionID, StreamType),
+ case cowboy_quicer:StartF(Conn, [Header, InitialData]) of
+ {ok, StreamID} ->
+ %% @todo Pass Session directly?
+ webtransport_event(State0, SessionID,
+ {opened_stream_id, OpenStreamRef, StreamID}),
+ State = stream_new_local(State0, StreamID, StreamType,
+ {webtransport_stream, SessionID}),
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) ->
+ %% @todo Check that StreamID belongs to Session.
+ error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]});
+wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID},
+ [{send, datagram, Data}|Tail]) ->
+ case cowboy_quicer:send_datagram(Conn, cow_http3:datagram(SessionID, Data)) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) ->
+ %% @todo Check that StreamID belongs to Session.
+ case cowboy_quicer:send(Conn, StreamID, Data, nofin) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, IsFin, Data}|Tail]) ->
+ %% @todo Check that StreamID belongs to Session.
+ case cowboy_quicer:send(Conn, StreamID, Data, IsFin) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) ->
+ %% We must send a WT_DRAIN_SESSION capsule on the CONNECT stream.
+ Capsule = cow_capsule:wt_drain_session(),
+ case cowboy_quicer:send(Conn, SessionID, Capsule, nofin) of
+ ok ->
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end;
+wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail])
+ when Cmd =:= close; element(1, Cmd) =:= close ->
+ %% We must send a WT_CLOSE_SESSION capsule on the CONNECT stream.
+ {AppCode, AppMsg} = case Cmd of
+ close -> {0, <<>>};
+ {close, AppCode0} -> {AppCode0, <<>>};
+ {close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0}
+ end,
+ Capsule = cow_capsule:wt_close_session(AppCode, AppMsg),
+ case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of
+ ok ->
+ State = webtransport_terminate_session(State0, Session),
+ %% @todo Because the handler is in a separate process
+ %% we must wait for it to stop and eventually
+ %% kill the process if it takes too long.
+ %% @todo We may need to fully close the CONNECT stream (if remote doesn't reset it).
+ wt_commands(State, Session, Tail)
+ %% @todo Handle errors.
+ end.
+
+webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machine0,
+ streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) ->
+ %% Reset/abort the WT streams.
+ Streams = maps:filtermap(fun
+ (_, #stream{id=StreamID, status={webtransport_session, _}})
+ when StreamID =:= SessionID ->
+ %% We remove the session stream but do the shutdown outside this function.
+ false;
+ (StreamID, #stream{status={webtransport_stream, StreamSessionID}})
+ when StreamSessionID =:= SessionID ->
+ cowboy_quicer:shutdown_stream(Conn, StreamID,
+ both, cow_http3:error_to_code(wt_session_gone)),
+ false;
+ (_, _) ->
+ true
+ end, Streams0),
+ %% Keep the streams in lingering state.
+ %% We only keep up to 100 streams in this state. @todo Make it configurable?
+ Terminated = maps:keys(Streams0) -- maps:keys(Streams),
+ Lingering = lists:sublist(Terminated ++ Lingering0, 100),
+ %% Update the HTTP3 state machine.
+ HTTP3Machine = cow_http3_machine:close_webtransport_session(SessionID, HTTP3Machine0),
+ State#state{
+ http3_machine=HTTP3Machine,
+ streams=Streams,
+ lingering_streams=Lingering
+ }.
+
+stream_peer_send_shutdown(State=#state{conn=Conn}, StreamID) ->
+ case stream_get(State, StreamID) of
+ %% Cleanly terminating the CONNECT stream is equivalent
+ %% to an application error code of 0 and empty message.
+ Stream = #stream{status={webtransport_session, _}} ->
+ webtransport_event(State, StreamID, {closed, 0, <<>>}),
+ %% Shutdown the CONNECT stream fully.
+ cowboy_quicer:shutdown_stream(Conn, StreamID),
+ webtransport_terminate_session(State, Stream);
+ _ ->
+ State
+ end.
+
reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0},
Stream=#stream{id=StreamID}, Error) ->
Reason = case Error of
@@ -903,15 +1168,25 @@ terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reas
stream_get(#state{streams=Streams}, StreamID) ->
maps:get(StreamID, Streams, error).
-stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
- StreamID, StreamType) ->
+stream_new_local(State, StreamID, StreamType, Status) ->
+ stream_new(State, StreamID, StreamType, unidi_local, Status).
+
+stream_new_remote(State, StreamID, StreamType) ->
+ Status = case StreamType of
+ unidi -> header;
+ bidi -> normal
+ end,
+ stream_new(State, StreamID, StreamType, unidi_remote, Status).
+
+stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams},
+ StreamID, StreamType, UnidiType, Status) ->
{HTTP3Machine, Status} = case StreamType of
unidi ->
- {cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0),
- header};
+ {cow_http3_machine:init_unidi_stream(StreamID, UnidiType, HTTP3Machine0),
+ Status};
bidi ->
{cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0),
- normal}
+ Status}
end,
Stream = #stream{id=StreamID, status=Status},
State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}.
@@ -926,6 +1201,11 @@ stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) ->
stream_closed(State=#state{opts=Opts,
streams=Streams0, children=Children0}, StreamID, ErrorCode) ->
case maps:take(StreamID, Streams0) of
+ %% In the WT session's case, streams will be
+ %% removed in webtransport_terminate_session.
+ {Stream=#stream{status={webtransport_session, _}}, _} ->
+ webtransport_event(State, StreamID, closed_abruptly),
+ webtransport_terminate_session(State, Stream);
{#stream{state=undefined}, Streams} ->
%% Unidi stream has no handler/children.
stream_closed1(State#state{streams=Streams}, StreamID);
diff --git a/src/cowboy_loop.erl b/src/cowboy_loop.erl
index 6859c82..629d06e 100644
--- a/src/cowboy_loop.erl
+++ b/src/cowboy_loop.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_metrics_h.erl b/src/cowboy_metrics_h.erl
index 27f14d4..67bf1a6 100644
--- a/src/cowboy_metrics_h.erl
+++ b/src/cowboy_metrics_h.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_middleware.erl b/src/cowboy_middleware.erl
index efeef4f..97c1498 100644
--- a/src/cowboy_middleware.erl
+++ b/src/cowboy_middleware.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2013-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl
index d9bbe1f..aa52fae 100644
--- a/src/cowboy_quicer.erl
+++ b/src/cowboy_quicer.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2023, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -23,9 +23,12 @@
-export([shutdown/2]).
%% Streams.
+-export([start_bidi_stream/2]).
-export([start_unidi_stream/2]).
-export([send/3]).
-export([send/4]).
+-export([send_datagram/2]).
+-export([shutdown_stream/2]).
-export([shutdown_stream/4]).
%% Messages.
@@ -45,6 +48,9 @@ peercert(_) -> no_quicer().
-spec shutdown(_, _) -> no_return().
shutdown(_, _) -> no_quicer().
+-spec start_bidi_stream(_, _) -> no_return().
+start_bidi_stream(_, _) -> no_quicer().
+
-spec start_unidi_stream(_, _) -> no_return().
start_unidi_stream(_, _) -> no_quicer().
@@ -54,6 +60,12 @@ send(_, _, _) -> no_quicer().
-spec send(_, _, _, _) -> no_return().
send(_, _, _, _) -> no_quicer().
+-spec send_datagram(_, _) -> no_return().
+send_datagram(_, _) -> no_quicer().
+
+-spec shutdown_stream(_, _) -> no_return().
+shutdown_stream(_, _) -> no_quicer().
+
-spec shutdown_stream(_, _, _, _) -> no_return().
shutdown_stream(_, _, _, _) -> no_quicer().
@@ -109,16 +121,26 @@ shutdown(Conn, ErrorCode) ->
%% Streams.
+-spec start_bidi_stream(quicer_connection_handle(), iodata())
+ -> {ok, cow_http3:stream_id()}
+ | {error, any()}.
+
+start_bidi_stream(Conn, InitialData) ->
+ start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_NONE).
+
-spec start_unidi_stream(quicer_connection_handle(), iodata())
-> {ok, cow_http3:stream_id()}
| {error, any()}.
-start_unidi_stream(Conn, HeaderData) ->
+start_unidi_stream(Conn, InitialData) ->
+ start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL).
+
+start_stream(Conn, InitialData, OpenFlag) ->
case quicer:start_stream(Conn, #{
active => true,
- open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}) of
+ open_flag => OpenFlag}) of
{ok, StreamRef} ->
- case quicer:send(StreamRef, HeaderData) of
+ case quicer:send(StreamRef, InitialData) of
{ok, _} ->
{ok, StreamID} = quicer:get_stream_id(StreamRef),
put({quicer_stream, StreamID}, StreamRef),
@@ -156,6 +178,29 @@ send(_Conn, StreamID, Data, IsFin) ->
send_flag(nofin) -> ?QUIC_SEND_FLAG_NONE;
send_flag(fin) -> ?QUIC_SEND_FLAG_FIN.
+-spec send_datagram(quicer_connection_handle(), iodata())
+ -> ok | {error, any()}.
+
+send_datagram(Conn, Data) ->
+ %% @todo Fix/ignore the Dialyzer error instead of doing this.
+ DataBin = iolist_to_binary(Data),
+ Size = byte_size(DataBin),
+ case quicer:send_dgram(Conn, DataBin) of
+ {ok, Size} ->
+ ok;
+ %% @todo Handle error cases.
+ Error ->
+ Error
+ end.
+
+-spec shutdown_stream(quicer_connection_handle(), cow_http3:stream_id())
+ -> ok.
+
+shutdown_stream(_Conn, StreamID) ->
+ StreamRef = get({quicer_stream, StreamID}),
+ _ = quicer:shutdown_stream(StreamRef),
+ ok.
+
-spec shutdown_stream(quicer_connection_handle(),
cow_http3:stream_id(), both | receiving, quicer_app_errno())
-> ok.
@@ -165,6 +210,7 @@ shutdown_stream(_Conn, StreamID, Dir, ErrorCode) ->
_ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity),
ok.
+%% @todo Are these flags correct for what we want?
shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT;
shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE.
@@ -173,9 +219,11 @@ shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE.
%% @todo Probably should have the Conn given as argument too?
-spec handle({quic, _, _, _})
-> {data, cow_http3:stream_id(), cow_http:fin(), binary()}
+ | {datagram, binary()}
| {stream_started, cow_http3:stream_id(), unidi | bidi}
| {stream_closed, cow_http3:stream_id(), quicer_app_errno()}
| closed
+ | {peer_send_shutdown, cow_http3:stream_id()}
| ok
| unknown
| {socket_error, any()}.
@@ -187,6 +235,9 @@ handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) ->
_ -> nofin
end,
{data, StreamID, IsFin, Data};
+%% @todo Match on Conn.
+handle({quic, Data, _Conn, Flags}) when is_binary(Data), is_integer(Flags) ->
+ {datagram, Data};
%% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED.
handle({quic, new_stream, StreamRef, #{flags := Flags}}) ->
case quicer:setopt(StreamRef, active, true) of
@@ -219,8 +270,9 @@ handle({quic, dgram_state_changed, _Conn, _Props}) ->
%% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT
handle({quic, transport_shutdown, _Conn, _Flags}) ->
ok;
-handle({quic, peer_send_shutdown, _StreamRef, undefined}) ->
- ok;
+handle({quic, peer_send_shutdown, StreamRef, undefined}) ->
+ {ok, StreamID} = quicer:get_stream_id(StreamRef),
+ {peer_send_shutdown, StreamID};
handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) ->
ok;
handle({quic, shutdown, _Conn, success}) ->
diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl
index 3f87677..550054e 100644
--- a/src/cowboy_req.erl
+++ b/src/cowboy_req.erl
@@ -1,5 +1,5 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
-%% Copyright (c) 2011, Anthony Ramine <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%% Copyright (c) Anthony Ramine <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -445,6 +445,7 @@ parse_header_fun(<<"sec-websocket-protocol">>) -> fun cow_http_hd:parse_sec_webs
parse_header_fun(<<"sec-websocket-version">>) -> fun cow_http_hd:parse_sec_websocket_version_req/1;
parse_header_fun(<<"trailer">>) -> fun cow_http_hd:parse_trailer/1;
parse_header_fun(<<"upgrade">>) -> fun cow_http_hd:parse_upgrade/1;
+parse_header_fun(<<"wt-available-protocols">>) -> fun cow_http_hd:parse_wt_available_protocols/1;
parse_header_fun(<<"x-forwarded-for">>) -> fun cow_http_hd:parse_x_forwarded_for/1.
parse_header(Name, Req, Default, ParseFun) ->
@@ -462,7 +463,7 @@ filter_cookies(Names0, Req=#{headers := Headers}) ->
case header(<<"cookie">>, Req) of
undefined -> Req;
Value0 ->
- Cookies0 = binary:split(Value0, <<$;>>),
+ Cookies0 = binary:split(Value0, <<$;>>, [global]),
Cookies = lists:filter(fun(Cookie) ->
lists:member(cookie_name(Cookie), Names)
end, Cookies0),
@@ -726,8 +727,10 @@ set_resp_header(Name, Value, Req=#{resp_headers := RespHeaders}) ->
set_resp_header(Name,Value, Req) ->
Req#{resp_headers => #{Name => Value}}.
--spec set_resp_headers(cowboy:http_headers(), Req)
+-spec set_resp_headers(cowboy:http_headers() | [{binary(), iodata()}], Req)
-> Req when Req::req().
+set_resp_headers(Headers, Req) when is_list(Headers) ->
+ set_resp_headers_list(Headers, Req, #{});
set_resp_headers(#{<<"set-cookie">> := _}, _) ->
exit({response_error, invalid_header,
'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'});
@@ -736,6 +739,19 @@ set_resp_headers(Headers, Req=#{resp_headers := RespHeaders}) ->
set_resp_headers(Headers, Req) ->
Req#{resp_headers => Headers}.
+set_resp_headers_list([], Req, Acc) ->
+ set_resp_headers(Acc, Req);
+set_resp_headers_list([{<<"set-cookie">>, _}|_], _, _) ->
+ exit({response_error, invalid_header,
+ 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'});
+set_resp_headers_list([{Name, Value}|Tail], Req, Acc) ->
+ case Acc of
+ #{Name := ValueAcc} ->
+ set_resp_headers_list(Tail, Req, Acc#{Name => [ValueAcc, <<", ">>, Value]});
+ _ ->
+ set_resp_headers_list(Tail, Req, Acc#{Name => Value})
+ end.
+
-spec resp_header(binary(), req()) -> binary() | undefined.
resp_header(Name, Req) ->
resp_header(Name, Req, undefined).
diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl
index fcea71c..9f30fcf 100644
--- a/src/cowboy_rest.erl
+++ b/src/cowboy_rest.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -246,9 +246,6 @@
handler :: atom(),
handler_state :: any(),
- %% Allowed methods. Only used for OPTIONS requests.
- allowed_methods :: [binary()] | undefined,
-
%% Media type.
content_types_p = [] ::
[{binary() | {binary(), binary(), [{binary(), binary()}] | '*'},
@@ -307,17 +304,17 @@ known_methods(Req, State=#state{method=Method}) ->
Method =:= <<"POST">>; Method =:= <<"PUT">>;
Method =:= <<"PATCH">>; Method =:= <<"DELETE">>;
Method =:= <<"OPTIONS">> ->
- next(Req, State, fun uri_too_long/2);
+ uri_too_long(Req, State);
no_call ->
- next(Req, State, 501);
+ respond(Req, State, 501);
{stop, Req2, State2} ->
terminate(Req2, State2);
{Switch, Req2, State2} when element(1, Switch) =:= switch_handler ->
switch_handler(Switch, Req2, State2);
{List, Req2, State2} ->
case lists:member(Method, List) of
- true -> next(Req2, State2, fun uri_too_long/2);
- false -> next(Req2, State2, 501)
+ true -> uri_too_long(Req2, State2);
+ false -> respond(Req2, State2, 501)
end
end.
@@ -327,39 +324,26 @@ uri_too_long(Req, State) ->
%% allowed_methods/2 should return a list of binary methods.
allowed_methods(Req, State=#state{method=Method}) ->
case call(Req, State, allowed_methods) of
- no_call when Method =:= <<"HEAD">>; Method =:= <<"GET">> ->
- next(Req, State, fun malformed_request/2);
- no_call when Method =:= <<"OPTIONS">> ->
- next(Req, State#state{allowed_methods=
- [<<"HEAD">>, <<"GET">>, <<"OPTIONS">>]},
- fun malformed_request/2);
+ no_call when Method =:= <<"HEAD">>; Method =:= <<"GET">>; Method =:= <<"OPTIONS">> ->
+ Req2 = cowboy_req:set_resp_header(<<"allow">>, <<"HEAD, GET, OPTIONS">>, Req),
+ malformed_request(Req2, State);
no_call ->
- method_not_allowed(Req, State,
- [<<"HEAD">>, <<"GET">>, <<"OPTIONS">>]);
+ Req2 = cowboy_req:set_resp_header(<<"allow">>, <<"HEAD, GET, OPTIONS">>, Req),
+ respond(Req2, State, 405);
{stop, Req2, State2} ->
terminate(Req2, State2);
{Switch, Req2, State2} when element(1, Switch) =:= switch_handler ->
switch_handler(Switch, Req2, State2);
{List, Req2, State2} ->
+ Req3 = cowboy_req:set_resp_header(<<"allow">>, cow_http_hd:allow(List), Req2),
case lists:member(Method, List) of
- true when Method =:= <<"OPTIONS">> ->
- next(Req2, State2#state{allowed_methods=List},
- fun malformed_request/2);
true ->
- next(Req2, State2, fun malformed_request/2);
+ malformed_request(Req3, State2);
false ->
- method_not_allowed(Req2, State2, List)
+ respond(Req3, State2, 405)
end
end.
-method_not_allowed(Req, State, []) ->
- Req2 = cowboy_req:set_resp_header(<<"allow">>, <<>>, Req),
- respond(Req2, State, 405);
-method_not_allowed(Req, State, Methods) ->
- << ", ", Allow/binary >> = << << ", ", M/binary >> || M <- Methods >>,
- Req2 = cowboy_req:set_resp_header(<<"allow">>, Allow, Req),
- respond(Req2, State, 405).
-
malformed_request(Req, State) ->
expect(Req, State, malformed_request, false, fun is_authorized/2, 400).
@@ -413,16 +397,10 @@ valid_entity_length(Req, State) ->
%% If you need to add additional headers to the response at this point,
%% you should do it directly in the options/2 call using set_resp_headers.
-options(Req, State=#state{allowed_methods=Methods, method= <<"OPTIONS">>}) ->
+options(Req, State=#state{method= <<"OPTIONS">>}) ->
case call(Req, State, options) of
- no_call when Methods =:= [] ->
- Req2 = cowboy_req:set_resp_header(<<"allow">>, <<>>, Req),
- respond(Req2, State, 200);
no_call ->
- << ", ", Allow/binary >>
- = << << ", ", M/binary >> || M <- Methods >>,
- Req2 = cowboy_req:set_resp_header(<<"allow">>, Allow, Req),
- respond(Req2, State, 200);
+ respond(Req, State, 200);
{stop, Req2, State2} ->
terminate(Req2, State2);
{Switch, Req2, State2} when element(1, Switch) =:= switch_handler ->
@@ -471,7 +449,7 @@ content_types_provided(Req, State) ->
{[], Req2, State2} ->
not_acceptable(Req2, State2);
{CTP, Req2, State2} ->
- CTP2 = [normalize_content_types(P) || P <- CTP],
+ CTP2 = [normalize_content_types(P, provide) || P <- CTP],
State3 = State2#state{content_types_p=CTP2},
try cowboy_req:parse_header(<<"accept">>, Req2) of
undefined ->
@@ -491,10 +469,14 @@ content_types_provided(Req, State) ->
end
end.
-normalize_content_types({ContentType, Callback})
+normalize_content_types({ContentType, Callback}, _)
when is_binary(ContentType) ->
{cow_http_hd:parse_content_type(ContentType), Callback};
-normalize_content_types(Normalized) ->
+normalize_content_types(Normalized = {{Type, SubType, _}, _}, _)
+ when is_binary(Type), is_binary(SubType) ->
+ Normalized;
+%% Wildcard for content_types_accepted.
+normalize_content_types(Normalized = {'*', _}, accept) ->
Normalized.
prioritize_accept(Accept) ->
@@ -1059,7 +1041,7 @@ accept_resource(Req, State) ->
{Switch, Req2, State2} when element(1, Switch) =:= switch_handler ->
switch_handler(Switch, Req2, State2);
{CTA, Req2, State2} ->
- CTA2 = [normalize_content_types(P) || P <- CTA],
+ CTA2 = [normalize_content_types(P, accept) || P <- CTA],
try cowboy_req:parse_header(<<"content-type">>, Req2) of
%% We do not match against the boundary parameter for multipart.
{Type = <<"multipart">>, SubType, Params} ->
@@ -1099,9 +1081,9 @@ process_content_type(Req, State=#state{method=Method, exists=Exists}, Fun) ->
{Switch, Req2, State2} when element(1, Switch) =:= switch_handler ->
switch_handler(Switch, Req2, State2);
{true, Req2, State2} when Exists ->
- next(Req2, State2, fun has_resp_body/2);
+ has_resp_body(Req2, State2);
{true, Req2, State2} ->
- next(Req2, State2, fun maybe_created/2);
+ maybe_created(Req2, State2);
{false, Req2, State2} ->
respond(Req2, State2, 400);
{{created, ResURL}, Req2, State2} when Method =:= <<"POST">> ->
@@ -1640,5 +1622,6 @@ error_terminate(Req, #state{handler=Handler, handler_state=HandlerState}, Class,
erlang:raise(Class, Reason, Stacktrace).
terminate(Req, #state{handler=Handler, handler_state=HandlerState}) ->
+ %% @todo I don't think the result is used anywhere?
Result = cowboy_handler:terminate(normal, Req, HandlerState, Handler),
{ok, Req, Result}.
diff --git a/src/cowboy_router.erl b/src/cowboy_router.erl
index 61c9012..393d82d 100644
--- a/src/cowboy_router.erl
+++ b/src/cowboy_router.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_static.erl b/src/cowboy_static.erl
index a185ef1..ce34b01 100644
--- a/src/cowboy_static.erl
+++ b/src/cowboy_static.erl
@@ -1,5 +1,5 @@
-%% Copyright (c) 2013-2024, Loïc Hoguin <[email protected]>
-%% Copyright (c) 2011, Magnus Klaar <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%% Copyright (c) Magnus Klaar <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -29,7 +29,7 @@
-type extra_charset() :: {charset, module(), function()} | {charset, binary()}.
-type extra_etag() :: {etag, module(), function()} | {etag, false}.
-type extra_mimetypes() :: {mimetypes, module(), function()}
- | {mimetypes, binary() | {binary(), binary(), [{binary(), binary()}]}}.
+ | {mimetypes, binary() | {binary(), binary(), '*' | [{binary(), binary()}]}}.
-type extra() :: [extra_charset() | extra_etag() | extra_mimetypes()].
-type opts() :: {file | dir, string() | binary()}
| {file | dir, string() | binary(), extra()}
@@ -332,7 +332,7 @@ forbidden(Req, State) ->
%% Detect the mimetype of the file.
-spec content_types_provided(Req, State)
- -> {[{binary(), get_file}], Req, State}
+ -> {[{binary() | {binary(), binary(), '*' | [{binary(), binary()}]}, get_file}], Req, State}
when State::state().
content_types_provided(Req, State={Path, _, Extra}) when is_list(Extra) ->
case lists:keyfind(mimetypes, 1, Extra) of
@@ -347,7 +347,7 @@ content_types_provided(Req, State={Path, _, Extra}) when is_list(Extra) ->
%% Detect the charset of the file.
-spec charsets_provided(Req, State)
- -> {[binary()], Req, State}
+ -> {[binary()], Req, State} | no_call
when State::state().
charsets_provided(Req, State={Path, _, Extra}) ->
case lists:keyfind(charset, 1, Extra) of
@@ -381,7 +381,7 @@ resource_exists(Req, State) ->
%% Generate an etag for the file.
-spec generate_etag(Req, State)
- -> {{strong | weak, binary()}, Req, State}
+ -> {{strong | weak, binary() | undefined}, Req, State}
when State::state().
generate_etag(Req, State={Path, {_, #file_info{size=Size, mtime=Mtime}},
Extra}) ->
@@ -408,7 +408,7 @@ last_modified(Req, State={_, {_, #file_info{mtime=Modified}}, _}) ->
%% Stream the file.
-spec get_file(Req, State)
- -> {{sendfile, 0, non_neg_integer(), binary()}, Req, State}
+ -> {{sendfile, 0, non_neg_integer(), binary()} | binary(), Req, State}
when State::state().
get_file(Req, State={Path, {direct, #file_info{size=Size}}, _}) ->
{{sendfile, 0, Size, Path}, Req, State};
diff --git a/src/cowboy_stream.erl b/src/cowboy_stream.erl
index 6ceb5ba..6680bdc 100644
--- a/src/cowboy_stream.erl
+++ b/src/cowboy_stream.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -49,6 +49,7 @@
-type reason() :: normal | switch_protocol
| {internal_error, timeout | {error | exit | throw, any()}, human_reason()}
| {socket_error, closed | atom(), human_reason()}
+ %% @todo Or cow_http3:error().
| {stream_error, cow_http2:error(), human_reason()}
| {connection_error, cow_http2:error(), human_reason()}
| {stop, cow_http2:frame() | {exit, any()}, human_reason()}.
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
index b373344..3c3c084 100644
--- a/src/cowboy_stream_h.erl
+++ b/src/cowboy_stream_h.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2016-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -138,7 +138,7 @@ info(StreamID, Info={'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}},
{error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>},
stop
], State);
-info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) ->
+info(StreamID, Exit={'EXIT', Pid, Reason}, State=#state{ref=Ref, pid=Pid}) ->
Commands0 = [{internal_error, Exit, 'Stream process crashed.'}],
Commands = case Reason of
normal -> Commands0;
@@ -146,9 +146,8 @@ info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, p
{shutdown, _} -> Commands0;
_ -> [{log, error,
"Ranch listener ~p, connection process ~p, stream ~p "
- "had its request process ~p exit with reason "
- "~999999p and stacktrace ~999999p~n",
- [Ref, self(), StreamID, Pid, Reason, Stacktrace]}
+ "had its request process ~p exit with reason ~0p~n",
+ [Ref, self(), StreamID, Pid, Reason]}
|Commands0]
end,
%% @todo We are trying to send a 500 response before resetting
diff --git a/src/cowboy_sub_protocol.erl b/src/cowboy_sub_protocol.erl
index 062fd38..1f24d00 100644
--- a/src/cowboy_sub_protocol.erl
+++ b/src/cowboy_sub_protocol.erl
@@ -1,5 +1,5 @@
-%% Copyright (c) 2013-2024, Loïc Hoguin <[email protected]>
-%% Copyright (c) 2013, James Fish <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%% Copyright (c) James Fish <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_sup.erl b/src/cowboy_sup.erl
index e37f4cf..224ef7d 100644
--- a/src/cowboy_sup.erl
+++ b/src/cowboy_sup.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_tls.erl b/src/cowboy_tls.erl
index 60ab2ed..6d0dcd3 100644
--- a/src/cowboy_tls.erl
+++ b/src/cowboy_tls.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -39,7 +39,11 @@ connection_process(Parent, Ref, Transport, Opts) ->
{ok, <<"h2">>} ->
init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, cowboy_http2);
_ -> %% http/1.1 or no protocol negotiated.
- init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, cowboy_http)
+ Protocol = case maps:get(alpn_default_protocol, Opts, http) of
+ http -> cowboy_http;
+ http2 -> cowboy_http2
+ end,
+ init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol)
end.
init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol) ->
diff --git a/src/cowboy_tracer_h.erl b/src/cowboy_tracer_h.erl
index b1196fe..91a431b 100644
--- a/src/cowboy_tracer_h.erl
+++ b/src/cowboy_tracer_h.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index 577de47..cb30c3f 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -69,6 +69,9 @@
active_n => pos_integer(),
compress => boolean(),
deflate_opts => cow_ws:deflate_opts(),
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
idle_timeout => timeout(),
max_frame_size => non_neg_integer() | infinity,
req_filter => fun((cowboy_req:req()) -> map()),
@@ -97,6 +100,11 @@
timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS,
messages = undefined :: undefined | {atom(), atom(), atom()}
| {atom(), atom(), atom(), atom()},
+
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size = false :: pos_integer() | false,
+ dynamic_buffer_moving_average = 0 :: non_neg_integer(),
+
hibernate = false :: boolean(),
frag_state = undefined :: cow_ws:frag_state(),
frag_buffer = <<>> :: binary(),
@@ -270,7 +278,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
%% @todo We don't want date and server headers.
Headers = cowboy_req:response_headers(#{}, Req),
Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
- takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>,
+ takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>,
{State, HandlerState}).
%% Connection process.
@@ -295,8 +303,8 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
module() | undefined, any(), binary(),
{#state{}, any()}) -> no_return().
-takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
- {State0=#state{handler=Handler, req=Req}, HandlerState}) ->
+takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
+ {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) ->
case Req of
#{version := 'HTTP/3'} -> ok;
%% @todo We should have an option to disable this behavior.
@@ -308,7 +316,11 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
end,
State = set_idle_timeout(State0#state{parent=Parent,
ref=Ref, socket=Socket, transport=Transport,
- key=undefined, messages=Messages}, 0),
+ opts=WsOpts#{dynamic_buffer => maps:get(dynamic_buffer, Opts, false)},
+ key=undefined, messages=Messages,
+ %% Dynamic buffer only applies to HTTP/1.1 Websocket.
+ dynamic_buffer_size=init_dynamic_buffer_size(Opts),
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0),
%% We call parse_header/3 immediately because there might be
%% some data in the buffer that was sent along with the handshake.
%% While it is not allowed by the protocol to send frames immediately,
@@ -319,6 +331,12 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer,
false -> after_init(State, HandlerState, #ps_header{buffer=Buffer})
end.
+-include("cowboy_dynamic_buffer.hrl").
+
+%% @todo Implement early socket error detection.
+maybe_socket_error(_, _) ->
+ ok.
+
after_init(State=#state{active=true}, HandlerState, ParseState) ->
%% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2.
%% We must do this only after calling websocket_init/1 (if any)
@@ -340,7 +358,7 @@ after_init(State, HandlerState, ParseState) ->
setopts_active(#state{transport=undefined}) ->
ok;
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
@@ -384,6 +402,7 @@ before_loop(State, HandlerState, ParseState) ->
-spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}.
+%% @todo Do we really need this for HTTP/2?
set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) ->
%% Most of the time we don't need to cancel the timer since it
%% will have triggered already. But this call is harmless so
@@ -391,7 +410,7 @@ set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) ->
%% options are changed dynamically.
_ = case PrevRef of
undefined -> ignore;
- PrevRef -> erlang:cancel_timer(PrevRef)
+ PrevRef -> erlang:cancel_timer(PrevRef, [{async, true}, {info, false}])
end,
case maps:get(idle_timeout, Opts, 60000) of
infinity ->
@@ -414,7 +433,8 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
receive
%% Socket messages. (HTTP/1.1)
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(?reset_idle_timeout(State1), HandlerState, ParseState, Data);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, HandlerState, {error, closed});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -480,12 +500,16 @@ parse_header(State=#state{opts=Opts, frag_state=FragState, extensions=Extensions
websocket_close(State, HandlerState, {error, badframe})
end.
-parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensions=Extensions},
+parse_payload(State=#state{opts=Opts, frag_state=FragState, utf8_state=Incomplete, extensions=Extensions},
HandlerState, ParseState=#ps_payload{
type=Type, len=Len, mask_key=MaskKey, rsv=Rsv,
unmasked=Unmasked, unmasked_len=UnmaskedLen}, Data) ->
+ MaxFrameSize = case maps:get(max_frame_size, Opts, infinity) of
+ infinity -> infinity;
+ MaxFrameSize0 -> MaxFrameSize0 - UnmaskedLen
+ end,
case cow_ws:parse_payload(Data, MaskKey, Incomplete, UnmaskedLen,
- Type, Len, FragState, Extensions, Rsv) of
+ Type, Len, FragState, Extensions#{max_inflate_size => MaxFrameSize}, Rsv) of
{ok, CloseCode, Payload, Utf8State, Rest} ->
dispatch_frame(State#state{utf8_state=Utf8State}, HandlerState,
ParseState#ps_payload{unmasked= <<Unmasked/binary, Payload/binary>>,
@@ -615,14 +639,16 @@ commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_b
commands(Tail, State#state{active=Active}, Data);
commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) ->
commands(Tail, State#state{deflate=Deflate}, Data);
-commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) ->
- State = case SetOpts of
- #{idle_timeout := IdleTimeout} ->
+commands([{set_options, SetOpts}|Tail], State0, Data) ->
+ State = maps:fold(fun
+ (idle_timeout, IdleTimeout, StateF=#state{opts=Opts}) ->
%% We reset the number of ticks when changing the idle_timeout option.
- set_idle_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0);
- _ ->
- State0
- end,
+ set_idle_timeout(StateF#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0);
+ (max_frame_size, MaxFrameSize, StateF=#state{opts=Opts}) ->
+ StateF#state{opts=Opts#{max_frame_size => MaxFrameSize}};
+ (_, _, StateF) ->
+ StateF
+ end, State0, SetOpts),
commands(Tail, State, Data);
commands([{shutdown_reason, ShutdownReason}|Tail], State, Data) ->
commands(Tail, State#state{shutdown_reason=ShutdownReason}, Data);
diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl
new file mode 100644
index 0000000..8c8ca39
--- /dev/null
+++ b/src/cowboy_webtransport.erl
@@ -0,0 +1,292 @@
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% @todo To enable WebTransport the following options need to be set:
+%%
+%% QUIC:
+%% - max_datagram_frame_size > 0
+%%
+%% HTTP/3:
+%% - SETTINGS_H3_DATAGRAM = 1
+%% - SETTINGS_ENABLE_CONNECT_PROTOCOL = 1
+%% - SETTINGS_WT_MAX_SESSIONS >= 1
+
+%% Cowboy supports versions 07 through 13 of the WebTransport drafts.
+%% Cowboy also has some compatibility with version 02.
+%%
+%% WebTransport CONNECT requests go through cowboy_stream as normal
+%% and then an upgrade/switch_protocol is issued (just like Websocket).
+%% After that point none of the events go through cowboy_stream except
+%% the final terminate event. The request process becomes the process
+%% handling all events in the WebTransport session.
+%%
+%% WebTransport sessions can be ended via a command, via a crash or
+%% exit, via the closing of the connection (client or server inititated),
+%% via the client ending the session (mirroring the command) or via
+%% the client terminating the CONNECT stream.
+-module(cowboy_webtransport).
+
+-export([upgrade/4]).
+-export([upgrade/5]).
+
+%% cowboy_stream.
+-export([info/3]).
+-export([terminate/3]).
+
+-type stream_type() :: unidi | bidi.
+-type open_stream_ref() :: any().
+
+-type event() ::
+ {stream_open, cow_http3:stream_id(), stream_type()} |
+ {opened_stream_id, open_stream_ref(), cow_http3:stream_id()} |
+ {stream_data, cow_http3:stream_id(), cow_http:fin(), binary()} |
+ {datagram, binary()} |
+ close_initiated.
+
+-type commands() :: [
+ {open_stream, open_stream_ref(), stream_type(), iodata()} |
+ {close_stream, cow_http3:stream_id(), cow_http3:wt_app_error_code()} |
+ {send, cow_http3:stream_id() | datagram, iodata()} |
+ initiate_close |
+ close |
+ {close, cow_http3:wt_app_error_code()} |
+ {close, cow_http3:wt_app_error_code(), iodata()}
+].
+-export_type([commands/0]).
+
+-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}.
+
+-callback init(Req, any())
+ -> {ok | module(), Req, any()}
+ | {module(), Req, any(), any()}
+ when Req::cowboy_req:req().
+
+-callback webtransport_init(State)
+ -> call_result(State) when State::any().
+-optional_callbacks([webtransport_init/1]).
+
+-callback webtransport_handle(event(), State)
+ -> call_result(State) when State::any().
+-optional_callbacks([webtransport_handle/2]).
+
+-callback webtransport_info(any(), State)
+ -> call_result(State) when State::any().
+-optional_callbacks([webtransport_info/2]).
+
+-callback terminate(any(), cowboy_req:req(), any()) -> ok.
+-optional_callbacks([terminate/3]).
+
+-type opts() :: #{
+ req_filter => fun((cowboy_req:req()) -> map())
+}.
+-export_type([opts/0]).
+
+-record(state, {
+ id :: cow_http3:stream_id(),
+ parent :: pid(),
+ opts = #{} :: opts(),
+ handler :: module(),
+ hibernate = false :: boolean(),
+ req = #{} :: map()
+}).
+
+%% This function mirrors a similar function for Websocket.
+
+-spec is_upgrade_request(cowboy_req:req()) -> boolean().
+
+is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol})
+ when Version =:= 'HTTP/3' ->
+ %% @todo scheme MUST BE "https"
+ <<"webtransport">> =:= cowboy_bstr:to_lower(Protocol);
+
+is_upgrade_request(_) ->
+ false.
+
+%% Stream process.
+
+-spec upgrade(Req, Env, module(), any())
+ -> {ok, Req, Env}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+
+upgrade(Req, Env, Handler, HandlerState) ->
+ upgrade(Req, Env, Handler, HandlerState, #{}).
+
+-spec upgrade(Req, Env, module(), any(), opts())
+ -> {ok, Req, Env}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+
+%% @todo Immediately crash if a response has already been sent.
+upgrade(Req=#{version := 'HTTP/3', pid := Pid, streamid := StreamID}, Env, Handler, HandlerState, Opts) ->
+ FilteredReq = case maps:get(req_filter, Opts, undefined) of
+ undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req);
+ FilterFun -> FilterFun(Req)
+ end,
+ State = #state{id=StreamID, parent=Pid, opts=Opts, handler=Handler, req=FilteredReq},
+ %% @todo Must ensure the relevant settings are enabled (QUIC and H3).
+ %% Either we check them BEFORE, or we check them when the handler
+ %% is OK to initiate a webtransport session. Probably need to
+ %% check them BEFORE as we need to become (takeover) the webtransport process
+ %% after we are done with the upgrade. Maybe in cow_http3_machine but
+ %% it doesn't have QUIC settings currently (max_datagram_size).
+ case is_upgrade_request(Req) of
+ true ->
+ Headers = cowboy_req:response_headers(#{}, Req),
+ Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE,
+ #{session_pid => self()}}},
+ webtransport_init(State, HandlerState);
+ %% Use 501 Not Implemented to mirror the recommendation in
+ %% by RFC9220 3 (WebSockets Upgrade over HTTP/3).
+ false ->
+ %% @todo I don't think terminate will be called.
+ {ok, cowboy_req:reply(501, Req), Env}
+ end.
+
+webtransport_init(State=#state{handler=Handler}, HandlerState) ->
+ case erlang:function_exported(Handler, webtransport_init, 1) of
+ true -> handler_call(State, HandlerState, webtransport_init, undefined);
+ false -> before_loop(State, HandlerState)
+ end.
+
+before_loop(State=#state{hibernate=true}, HandlerState) ->
+ proc_lib:hibernate(?MODULE, loop, [State#state{hibernate=false}, HandlerState]);
+before_loop(State, HandlerState) ->
+ loop(State, HandlerState).
+
+-spec loop(#state{}, any()) -> no_return().
+
+loop(State=#state{id=SessionID, parent=Parent}, HandlerState) ->
+ receive
+ {'$webtransport_event', SessionID, Event={closed, _, _}} ->
+ terminate_proc(State, HandlerState, Event);
+ {'$webtransport_event', SessionID, Event=closed_abruptly} ->
+ terminate_proc(State, HandlerState, Event);
+ {'$webtransport_event', SessionID, Event} ->
+ handler_call(State, HandlerState, webtransport_handle, Event);
+ %% Timeouts.
+%% @todo idle_timeout
+% {timeout, TRef, ?MODULE} ->
+% tick_idle_timeout(State, HandlerState, ParseState);
+% {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
+% before_loop(State, HandlerState, ParseState);
+ %% System messages.
+ {'EXIT', Parent, Reason} ->
+ %% @todo We should exit gracefully.
+ exit(Reason);
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
+ {State, HandlerState});
+ %% Calls from supervisor module.
+ {'$gen_call', From, Call} ->
+ cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
+ before_loop(State, HandlerState);
+ Message ->
+ handler_call(State, HandlerState, webtransport_info, Message)
+ end.
+
+handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) ->
+ try case Callback of
+ webtransport_init -> Handler:webtransport_init(HandlerState);
+ _ -> Handler:Callback(Message, HandlerState)
+ end of
+ {Commands, HandlerState2} when is_list(Commands) ->
+ handler_call_result(State, HandlerState2, Commands);
+ {Commands, HandlerState2, hibernate} when is_list(Commands) ->
+ handler_call_result(State#state{hibernate=true}, HandlerState2, Commands)
+ catch Class:Reason:Stacktrace ->
+ %% @todo Do we need to send a close? Let cowboy_http3 detect and handle it?
+ handler_terminate(State, HandlerState, {crash, Class, Reason}),
+ erlang:raise(Class, Reason, Stacktrace)
+ end.
+
+handler_call_result(State0, HandlerState, Commands) ->
+ case commands(Commands, State0, ok, []) of
+ {ok, State} ->
+ before_loop(State, HandlerState);
+ {stop, State} ->
+ terminate_proc(State, HandlerState, stop)
+ end.
+
+%% We accumulate the commands that must be sent to the connection process
+%% because we want to send everything into one message. Other commands are
+%% processed immediately.
+
+commands([], State, Res, []) ->
+ {Res, State};
+commands([], State=#state{id=SessionID, parent=Pid}, Res, Commands) ->
+ Pid ! {'$webtransport_commands', SessionID, lists:reverse(Commands)},
+ {Res, State};
+%% {open_stream, OpenStreamRef, StreamType, InitialData}.
+commands([Command={open_stream, _, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% {close_stream, StreamID, Code}.
+commands([Command={close_stream, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% @todo We must reject send to a remote unidi stream.
+%% {send, StreamID | datagram, Data}.
+commands([Command={send, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% {send, StreamID, IsFin, Data}.
+commands([Command={send, _, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% initiate_close - DRAIN_WT_SESSION
+commands([Command=initiate_close|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% close | {close, Code} | {close, Code, Msg} - CLOSE_WT_SESSION
+%% @todo At this point the handler must not issue stream or send commands.
+commands([Command=close|Tail], State, _, Acc) ->
+ commands(Tail, State, stop, [Command|Acc]);
+commands([Command={close, _}|Tail], State, _, Acc) ->
+ commands(Tail, State, stop, [Command|Acc]);
+commands([Command={close, _, _}|Tail], State, _, Acc) ->
+ commands(Tail, State, stop, [Command|Acc]).
+%% @todo A set_options command could be useful to increase the number of allowed streams
+%% or other forms of flow control. Alternatively a flow command. Or both.
+%% @todo A shutdown_reason command could be useful for the same reasons as Websocekt.
+
+-spec terminate_proc(_, _, _) -> no_return().
+
+terminate_proc(State, HandlerState, Reason) ->
+ handler_terminate(State, HandlerState, Reason),
+ %% @todo This is what should be done if shutdown_reason gets implemented.
+% case Shutdown of
+% normal -> exit(normal);
+% _ -> exit({shutdown, Shutdown})
+% end.
+ exit(normal).
+
+handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
+ cowboy_handler:terminate(Reason, Req, HandlerState, Handler).
+
+%% cowboy_stream callbacks.
+%%
+%% We shortcut stream handlers but still need to process some events
+%% such as process exiting or termination. We implement the relevant
+%% callbacks here. Note that as far as WebTransport is concerned,
+%% receiving stream data here would be an error therefore the data
+%% callback is not implemented.
+%%
+%% @todo Better type than map() for the cowboy_stream state.
+
+-spec info(cowboy_stream:streamid(), any(), State)
+ -> {cowboy_stream:commands(), State} when State::map().
+
+info(StreamID, Msg, WTState=#{stream_state := StreamState0}) ->
+ {Commands, StreamState} = cowboy_stream:info(StreamID, Msg, StreamState0),
+ {Commands, WTState#{stream_state => StreamState}}.
+
+-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), map())
+ -> any().
+
+terminate(StreamID, Reason, #{stream_state := StreamState}) ->
+ cowboy_stream:terminate(StreamID, Reason, StreamState).