diff options
Diffstat (limited to 'src')
30 files changed, 1072 insertions, 245 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl index e5ed831..6a5634e 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -51,8 +51,12 @@ start_clear(Ref, TransOpts0, ProtoOpts0) -> TransOpts1 = ranch:normalize_opts(TransOpts0), - {TransOpts, ConnectionType} = ensure_connection_type(TransOpts1), - ProtoOpts = ProtoOpts0#{connection_type => ConnectionType}, + {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0), + {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2), + ProtoOpts = ProtoOpts0#{ + connection_type => ConnectionType, + dynamic_buffer => DynamicBuffer + }, ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts). -spec start_tls(ranch:ref(), ranch:opts(), opts()) @@ -60,12 +64,13 @@ start_clear(Ref, TransOpts0, ProtoOpts0) -> start_tls(Ref, TransOpts0, ProtoOpts0) -> TransOpts1 = ranch:normalize_opts(TransOpts0), - SocketOpts = maps:get(socket_opts, TransOpts1, []), - TransOpts2 = TransOpts1#{socket_opts => [ - {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} - |SocketOpts]}, - {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2), - ProtoOpts = ProtoOpts0#{connection_type => ConnectionType}, + {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0), + TransOpts3 = ensure_alpn(TransOpts2), + {TransOpts, ConnectionType} = ensure_connection_type(TransOpts3), + ProtoOpts = ProtoOpts0#{ + connection_type => ConnectionType, + dynamic_buffer => DynamicBuffer + }, ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts). %% @todo Experimental function to start a barebone QUIC listener. @@ -77,6 +82,7 @@ start_tls(Ref, TransOpts0, ProtoOpts0) -> -spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts()) -> {ok, pid()}. +%% @todo Implement dynamic_buffer for HTTP/3 if/when it applies. start_quic(Ref, TransOpts, ProtoOpts) -> {ok, _} = application:ensure_all_started(quicer), Parent = self(), @@ -89,8 +95,14 @@ start_quic(Ref, TransOpts, ProtoOpts) -> end, SocketOpts = [ {alpn, ["h3"]}, %% @todo Why not binary? - {peer_unidi_stream_count, 3}, %% We only need control and QPACK enc/dec. - {peer_bidi_stream_count, 100} + %% We only need 3 for control and QPACK enc/dec, + %% but we need more for WebTransport. %% @todo Use 3 if WT is disabled. + {peer_unidi_stream_count, 100}, + {peer_bidi_stream_count, 100}, + %% For WebTransport. + %% @todo We probably don't want it enabled if WT isn't used. + {datagram_send_enabled, 1}, + {datagram_receive_enabled, 1} |SocketOpts2], _ListenerPid = spawn(fun() -> {ok, Listener} = quicer:listen(Port, SocketOpts), @@ -139,11 +151,32 @@ port_0() -> end, Port. +ensure_alpn(TransOpts) -> + SocketOpts = maps:get(socket_opts, TransOpts, []), + TransOpts#{socket_opts => [ + {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} + |SocketOpts]}. + ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) -> {TransOpts, ConnectionType}; ensure_connection_type(TransOpts) -> {TransOpts#{connection_type => supervisor}, supervisor}. +%% Dynamic buffer was set; accept transport options as-is. +%% Note that initial 'buffer' size may be lower than dynamic buffer allows. +ensure_dynamic_buffer(TransOpts, #{dynamic_buffer := DynamicBuffer}) -> + {TransOpts, DynamicBuffer}; +%% Dynamic buffer was not set; define default dynamic buffer +%% only if 'buffer' size was not configured. In that case we +%% set the 'buffer' size to the lowest value. +ensure_dynamic_buffer(TransOpts=#{socket_opts := SocketOpts}, _) -> + case proplists:get_value(buffer, SocketOpts, undefined) of + undefined -> + {TransOpts#{socket_opts => [{buffer, 1024}|SocketOpts]}, {1024, 131072}}; + _ -> + {TransOpts, false} + end. + -spec stop_listener(ranch:ref()) -> ok | {error, not_found}. stop_listener(Ref) -> diff --git a/src/cowboy_app.erl b/src/cowboy_app.erl index 95ae564..e58e1f6 100644 --- a/src/cowboy_app.erl +++ b/src/cowboy_app.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_bstr.erl b/src/cowboy_bstr.erl index f23167d..d0e7301 100644 --- a/src/cowboy_bstr.erl +++ b/src/cowboy_bstr.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_children.erl b/src/cowboy_children.erl index 305c989..2e00c37 100644 --- a/src/cowboy_children.erl +++ b/src/cowboy_children.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_clear.erl b/src/cowboy_clear.erl index eaeab74..845fdc1 100644 --- a/src/cowboy_clear.erl +++ b/src/cowboy_clear.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2016-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -36,10 +36,6 @@ connection_process(Parent, Ref, Transport, Opts) -> ProxyInfo = get_proxy_info(Ref, Opts), {ok, Socket} = ranch:handshake(Ref), %% Use cowboy_http2 directly only when 'http' is missing. - %% Otherwise switch to cowboy_http2 from cowboy_http. - %% - %% @todo Extend this option to cowboy_tls and allow disabling - %% the switch to cowboy_http2 in cowboy_http. Also document it. Protocol = case maps:get(protocols, Opts, [http2, http]) of [http2] -> cowboy_http2; [_|_] -> cowboy_http diff --git a/src/cowboy_clock.erl b/src/cowboy_clock.erl index e2cdf62..b6e39f4 100644 --- a/src/cowboy_clock.erl +++ b/src/cowboy_clock.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -93,7 +93,7 @@ handle_cast(_Msg, State) -> -spec handle_info(any(), State) -> {noreply, State} when State::#state{}. handle_info(update, #state{universaltime=Prev, rfc1123=B1, tref=TRef0}) -> %% Cancel the timer in case an external process sent an update message. - _ = erlang:cancel_timer(TRef0), + _ = erlang:cancel_timer(TRef0, [{async, true}, {info, false}]), T = erlang:universaltime(), B2 = update_rfc1123(B1, Prev, T), ets:insert(?MODULE, {rfc1123, B2}), diff --git a/src/cowboy_compress_h.erl b/src/cowboy_compress_h.erl index 338ea9f..785eb0d 100644 --- a/src/cowboy_compress_h.erl +++ b/src/cowboy_compress_h.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_constraints.erl b/src/cowboy_constraints.erl index 33f0111..84ff249 100644 --- a/src/cowboy_constraints.erl +++ b/src/cowboy_constraints.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2014-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_decompress_h.erl b/src/cowboy_decompress_h.erl index 4e86e23..84283e5 100644 --- a/src/cowboy_decompress_h.erl +++ b/src/cowboy_decompress_h.erl @@ -1,5 +1,5 @@ -%% Copyright (c) 2024, jdamanalo <[email protected]> -%% Copyright (c) 2024, Loïc Hoguin <[email protected]> +%% Copyright (c) jdamanalo <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -66,9 +66,11 @@ data(StreamID, IsFin, Data, State=#state{next=Next0, enabled=false, read_body_bu {Commands, Next} = cowboy_stream:data(StreamID, IsFin, buffer_to_binary([Data|Buffer]), Next0), fold(Commands, State#state{next=Next, read_body_is_fin=IsFin}); -data(StreamID, IsFin, Data, State0=#state{next=Next0, ratio_limit=RatioLimit, +data(StreamID, IsFin, Data0, State0=#state{next=Next0, ratio_limit=RatioLimit, inflate=Z, is_reading=true, read_body_buffer=Buffer}) -> - case inflate(Z, RatioLimit, buffer_to_iovec([Data|Buffer])) of + Data = buffer_to_iovec([Data0|Buffer]), + Limit = iolist_size(Data) * RatioLimit, + case cow_deflate:inflate(Z, Data, Limit) of {error, ErrorType} -> zlib:close(Z), Status = case ErrorType of @@ -236,22 +238,3 @@ do_build_accept_encoding([{ContentCoding, Q}|Tail], Acc0) -> do_build_accept_encoding(Tail, Acc); do_build_accept_encoding([], Acc) -> Acc. - -inflate(Z, RatioLimit, Data) -> - try - {Status, Output} = zlib:safeInflate(Z, Data), - Size = iolist_size(Output), - do_inflate(Z, Size, iolist_size(Data) * RatioLimit, Status, [Output]) - catch - error:data_error -> - {error, data_error} - end. - -do_inflate(_, Size, Limit, _, _) when Size > Limit -> - {error, size_error}; -do_inflate(Z, Size0, Limit, continue, Acc) -> - {Status, Output} = zlib:safeInflate(Z, []), - Size = Size0 + iolist_size(Output), - do_inflate(Z, Size, Limit, Status, [Output | Acc]); -do_inflate(_, _, _, finished, Acc) -> - {ok, iolist_to_binary(lists:reverse(Acc))}. diff --git a/src/cowboy_dynamic_buffer.hrl b/src/cowboy_dynamic_buffer.hrl new file mode 100644 index 0000000..4d05e50 --- /dev/null +++ b/src/cowboy_dynamic_buffer.hrl @@ -0,0 +1,80 @@ +%% Copyright (c) Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% These functions are common to cowboy_http, cowboy_http2 and +%% cowboy_websocket. It requires the options and the state +%% to use the same field names. + +%% Experiments have shown that the size of the 'buffer' can greatly +%% impact performance: a buffer too small leads to more messages +%% being handled and typically more binary appends; and a buffer +%% too large results in inefficient use of memory which in turn +%% reduces the throughput, presumably because large binary appends +%% are not as efficient as smaller ones, and because while the +%% buffer gets allocated only when there is data, the allocated +%% size remains until the binary is GC and so under-use hurts. +%% +%% The performance of a given 'buffer' size will also depend on +%% how the client is sending data, and on the protocol. For example, +%% HTTP/1.1 doesn't need a very large 'buffer' size for reading +%% request headers, but it does need one for reading large request +%% bodies. At the same time, HTTP/2 performs best reading large +%% request bodies when the 'buffer' size is about half that of +%% HTTP/1.1. +%% +%% It therefore becomes important to resize the buffer dynamically +%% depending on what is currently going on. We do this based on +%% the size of data packets we received from the transport. We +%% maintain a moving average and when that moving average is +%% 90% of the current 'buffer' size, we double the 'buffer' size. +%% When things slow down and the moving average falls below +%% 40% of the current 'buffer' size, we halve the 'buffer' size. +%% +%% To calculate the moving average we do (MovAvg + DataLen) div 2. +%% This means that the moving average will change very quickly when +%% DataLen increases or decreases rapidly. That's OK, we want to +%% be reactive, but also setting the buffer size is a pretty fast +%% operation. The formula could be changed to the following if it +%% became a problem: (MovAvg * N + DataLen) div (N + 1). +%% +%% Note that this works best when active,N uses low values of N. +%% We don't want to accumulate too much data because we resize +%% the buffer. + +init_dynamic_buffer_size(#{dynamic_buffer_initial_size := DynamicBuffer}) -> + DynamicBuffer; +init_dynamic_buffer_size(#{dynamic_buffer := {LowDynamicBuffer, _}}) -> + LowDynamicBuffer; +init_dynamic_buffer_size(_) -> + false. + +maybe_resize_buffer(State=#state{dynamic_buffer_size=false}, _) -> + State; +maybe_resize_buffer(State=#state{transport=Transport, socket=Socket, + opts=#{dynamic_buffer := {LowDynamicBuffer, HighDynamicBuffer}}, + dynamic_buffer_size=BufferSize0, dynamic_buffer_moving_average=MovingAvg0}, Data) -> + DataLen = byte_size(Data), + MovingAvg = (MovingAvg0 + DataLen) div 2, + if + BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 -> + BufferSize = min(BufferSize0 * 2, HighDynamicBuffer), + ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])), + State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize}; + BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 -> + BufferSize = max(BufferSize0 div 2, LowDynamicBuffer), + ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])), + State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize}; + true -> + State#state{dynamic_buffer_moving_average=MovingAvg} + end. diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl index 5048168..1989512 100644 --- a/src/cowboy_handler.erl +++ b/src/cowboy_handler.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 9c92ec5..10eb519 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2016-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -17,6 +17,7 @@ -module(cowboy_http). -export([init/6]). +-export([loop/1]). -export([system_continue/3]). -export([system_terminate/4]). @@ -24,11 +25,16 @@ -type opts() :: #{ active_n => pos_integer(), + alpn_default_protocol => http | http2, chunked => boolean(), compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), env => cowboy_middleware:env(), + hibernate => boolean(), http10_keepalive => boolean(), idle_timeout => timeout(), inactivity_timeout => timeout(), @@ -47,6 +53,7 @@ metrics_req_filter => fun((cowboy_req:req()) -> map()), metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()), middlewares => [module()], + protocols => [http | http2], proxy_header => boolean(), request_timeout => timeout(), reset_idle_timeout_on_send => boolean(), @@ -137,6 +144,10 @@ %% Flow requested for the current stream. flow = infinity :: non_neg_integer() | infinity, + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size :: pos_integer() | false, + dynamic_buffer_moving_average :: non_neg_integer(), + %% Identifier for the stream currently being written. %% Note that out_streamid =< in_streamid. out_streamid = 1 :: pos_integer(), @@ -181,12 +192,16 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=init_dynamic_buffer_size(Opts), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), last_streamid=maps:get(max_keepalive, Opts, 1000)}, safe_setopts_active(State), - loop(set_timeout(State, request_timeout)). + before_loop(set_timeout(State, request_timeout)). + +-include("cowboy_dynamic_buffer.hrl"). setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). safe_setopts_active(State) -> @@ -212,6 +227,13 @@ flush_passive(Socket, Messages) -> ok end. +before_loop(State=#state{opts=#{hibernate := true}}) -> + proc_lib:hibernate(?MODULE, loop, [State]); +before_loop(State) -> + loop(State). + +-spec loop(#state{}) -> ok. + loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID, last_streamid=LastStreamID}) -> @@ -220,11 +242,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, receive %% Discard data coming in after the last request %% we want to process was received fully. - {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID -> - loop(State); + {OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID -> + State1 = maybe_resize_buffer(State, Data), + before_loop(State1); %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(<< Buffer/binary, Data/binary >>, State); + State1 = maybe_resize_buffer(State, Data), + parse(<< Buffer/binary, Data/binary >>, State1); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> @@ -233,37 +257,37 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, %% Hardcoded for compatibility with Ranch 1.x. Passive =:= tcp_passive; Passive =:= ssl_passive -> safe_setopts_active(State), - loop(State); + before_loop(State); %% Timeouts. {timeout, Ref, {shutdown, Pid}} -> cowboy_children:shutdown_timeout(Children, Ref, Pid), - loop(State); + before_loop(State); {timeout, TimerRef, Reason} -> timeout(State, Reason); {timeout, _, _} -> - loop(State); + before_loop(State); %% System messages. {'EXIT', Parent, shutdown} -> Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'}, - loop(initiate_closing(State, Reason)); + before_loop(initiate_closing(State, Reason)); {'EXIT', Parent, Reason} -> terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State); %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> - loop(info(State, StreamID, Msg)); + before_loop(info(State, StreamID, Msg)); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> - loop(down(State, Pid, Msg)); + before_loop(down(State, Pid, Msg)); %% Calls from supervisor module. {'$gen_call', From, Call} -> cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE), - loop(State); + before_loop(State); %% Unknown messages. Msg -> cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts), - loop(State) + before_loop(State) after InactivityTimeout -> terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) end. @@ -295,6 +319,7 @@ set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout) when element(1, InState) =/= ps_body -> State; %% Otherwise we can set the timeout. +%% @todo Don't do this so often, use a strategy similar to Websocket/H2 if possible. set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) -> State = cancel_timeout(State0), Default = case Name of @@ -327,7 +352,7 @@ cancel_timeout(State=#state{timer=TimerRef}) -> _ -> %% Do a synchronous cancel and remove the message if any %% to avoid receiving stray messages. - _ = erlang:cancel_timer(TimerRef), + _ = erlang:cancel_timer(TimerRef, [{async, false}, {info, false}]), receive {timeout, TimerRef, _} -> ok after 0 -> @@ -348,12 +373,12 @@ timeout(State, idle_timeout) -> 'Connection idle longer than configuration allows.'}). parse(<<>>, State) -> - loop(State#state{buffer= <<>>}); + before_loop(State#state{buffer= <<>>}); %% Do not process requests that come in after the last request %% and discard the buffer if any to save memory. parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{}, last_streamid=LastStreamID}) when InStreamID > LastStreamID -> - loop(State#state{buffer= <<>>}); + before_loop(State#state{buffer= <<>>}); parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) -> after_parse(parse_request(Buffer, State, EmptyLines)); parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) -> @@ -422,13 +447,13 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer end; %% No corresponding stream. We must skip the body of the previous request %% in order to process the next one. -after_parse({data, _, IsFin, _, State}) -> - loop(set_timeout(State, case IsFin of +after_parse({data, _, IsFin, _, State=#state{buffer=Buffer}}) -> + parse(Buffer, set_timeout(State, case IsFin of fin -> request_timeout; nofin -> idle_timeout end)); after_parse({more, State}) -> - loop(set_timeout(State, idle_timeout)). + before_loop(set_timeout(State, idle_timeout)). update_flow(fin, _, State) -> %% This function is only called after parsing, therefore we @@ -488,8 +513,13 @@ parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLine 'The TRACE method is currently not implemented. (RFC7231 4.3.8)'}); %% Accept direct HTTP/2 only at the beginning of the connection. << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 -> - %% @todo Might be worth throwing to get a clean stacktrace. - http2_upgrade(State, Buffer); + case lists:member(http2, maps:get(protocols, Opts, [http2, http])) of + true -> + http2_upgrade(State, Buffer); + false -> + error_terminate(501, State, {connection_error, no_error, + 'Prior knowledge upgrade to HTTP/2 is disabled by configuration.'}) + end; _ -> parse_method(Buffer, State, <<>>, maps:get(max_method_length, Opts, 32)) @@ -777,7 +807,7 @@ default_port(_) -> 80. %% End of request parsing. request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert, - proxy_header=ProxyHeader, in_streamid=StreamID, in_state= + opts=Opts, proxy_header=ProxyHeader, in_streamid=StreamID, in_state= PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}}, Headers, Host, Port) -> Scheme = case Transport:secure() of @@ -841,7 +871,7 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock undefined -> Req0; _ -> Req0#{proxy_header => ProxyHeader} end, - case is_http2_upgrade(Headers, Version) of + case is_http2_upgrade(Headers, Version, Opts) of false -> State = case HasBody of true -> @@ -863,12 +893,13 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock %% HTTP/2 upgrade. -%% @todo We must not upgrade to h2c over a TLS connection. is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade, - <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') -> + <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1', Opts) -> Conns = cow_http_hd:parse_connection(Conn), - case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of - {true, true} -> + case lists:member(<<"upgrade">>, Conns) + andalso lists:member(<<"http2-settings">>, Conns) + andalso lists:member(http2, maps:get(protocols, Opts, [http2, http])) of + true -> Protocols = cow_http_hd:parse_upgrade(Upgrade), case lists:member(<<"h2c">>, Protocols) of true -> @@ -879,17 +910,17 @@ is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade, _ -> false end; -is_http2_upgrade(_, _) -> +is_http2_upgrade(_, _, _) -> false. %% Prior knowledge upgrade, without an HTTP/1.1 request. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) -> + proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer) -> case Transport:secure() of false -> _ = cancel_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, - ProxyHeader, Opts, Peer, Sock, Cert, Buffer); + cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader, + opts_for_upgrade(State), Peer, Sock, Cert, Buffer); true -> error_terminate(400, State, {connection_error, protocol_error, 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'}) @@ -897,22 +928,37 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran %% Upgrade via an HTTP/1.1 request. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, + proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer, HTTP2Settings, Req) -> - %% @todo - %% However if the client sent a body, we need to read the body in full - %% and if we can't do that, return a 413 response. Some options are in order. - %% Always half-closed stream coming from this side. - try cow_http_hd:parse_http2_settings(HTTP2Settings) of - Settings -> - _ = cancel_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, - ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req) - catch _:_ -> - error_terminate(400, State, {connection_error, protocol_error, - 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'}) + case Transport:secure() of + false -> + %% @todo + %% However if the client sent a body, we need to read the body in full + %% and if we can't do that, return a 413 response. Some options are in order. + %% Always half-closed stream coming from this side. + try cow_http_hd:parse_http2_settings(HTTP2Settings) of + Settings -> + _ = cancel_timeout(State), + cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader, + opts_for_upgrade(State), Peer, Sock, Cert, Buffer, Settings, Req) + catch _:_ -> + error_terminate(400, State, {connection_error, protocol_error, + 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'}) + end; + true -> + error_terminate(400, State, {connection_error, protocol_error, + 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'}) end. +opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=false}) -> + Opts; +opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=Size, + dynamic_buffer_moving_average=MovingAvg}) -> + Opts#{ + dynamic_buffer_initial_average => MovingAvg, + dynamic_buffer_initial_size => Size + }. + %% Request body parsing. parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= @@ -1209,7 +1255,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_ commands(State, StreamID, Tail); %% Protocol takeover. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, - out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, + out_state=OutState, buffer=Buffer, children=Children}, StreamID, [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> %% @todo If there's streams opened after this one, fail instead of 101. State1 = cancel_timeout(State0), @@ -1233,23 +1279,19 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor %% Turn off the trap_exit process flag %% since this process will no longer be a supervisor. process_flag(trap_exit, false), - Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState); + Protocol:takeover(Parent, Ref, Socket, Transport, + opts_for_upgrade(State), Buffer, InitialState); %% Set options dynamically. -commands(State0=#state{overriden_opts=Opts}, - StreamID, [{set_options, SetOpts}|Tail]) -> - State1 = case SetOpts of - #{idle_timeout := IdleTimeout} -> - set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}}, +commands(State0, StreamID, [{set_options, SetOpts}|Tail]) -> + State = maps:fold(fun + (chunked, Chunked, StateF=#state{overriden_opts=Opts}) -> + StateF#state{overriden_opts=Opts#{chunked => Chunked}}; + (idle_timeout, IdleTimeout, StateF=#state{overriden_opts=Opts}) -> + set_timeout(StateF#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}}, idle_timeout); - _ -> - State0 - end, - State = case SetOpts of - #{chunked := Chunked} -> - State1#state{overriden_opts=Opts#{chunked => Chunked}}; - _ -> - State1 - end, + (_, _, StateF) -> + StateF + end, State0, SetOpts), commands(State, StreamID, Tail); %% Stream shutdown. commands(State, StreamID, [stop|Tail]) -> @@ -1368,23 +1410,24 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta end. stream_next(State0=#state{opts=Opts, active=Active, out_streamid=OutStreamID, streams=Streams}) -> + %% Enable active mode again if it was disabled. + State1 = case Active of + true -> State0; + false -> active(State0) + end, NextOutStreamID = OutStreamID + 1, case lists:keyfind(NextOutStreamID, #stream.id, Streams) of false -> - State = State0#state{out_streamid=NextOutStreamID, out_state=wait}, + State = State1#state{out_streamid=NextOutStreamID, out_state=wait}, %% There are no streams remaining. We therefore can %% and want to switch back to the request_timeout. set_timeout(State, request_timeout); #stream{queue=Commands} -> - State = case Active of - true -> State0; - false -> active(State0) - end, %% @todo Remove queue from the stream. %% We set the flow to the initial flow size even though %% we might have sent some data through already due to pipelining. Flow = maps:get(initial_stream_flow_size, Opts, 65535), - commands(State#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait}, + commands(State1#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait}, NextOutStreamID, Commands) end. @@ -1597,12 +1640,12 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) -> -spec system_continue(_, _, #state{}) -> ok. system_continue(_, _, State) -> - loop(State). + before_loop(State). -spec system_terminate(any(), _, _, #state{}) -> no_return(). system_terminate(Reason0, _, _, State) -> Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}, - loop(initiate_closing(State, Reason)). + before_loop(initiate_closing(State, Reason)). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. system_code_change(Misc, _, _, _) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 0e110cd..0d22fa1 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -17,6 +17,7 @@ -export([init/6]). -export([init/10]). -export([init/12]). +-export([loop/2]). -export([system_continue/3]). -export([system_terminate/4]). @@ -24,15 +25,20 @@ -type opts() :: #{ active_n => pos_integer(), + alpn_default_protocol => http | http2, compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, connection_window_margin_size => 0..16#7fffffff, connection_window_update_threshold => 0..16#7fffffff, + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), enable_connect_protocol => boolean(), env => cowboy_middleware:env(), goaway_initial_timeout => timeout(), goaway_complete_timeout => timeout(), + hibernate => boolean(), idle_timeout => timeout(), inactivity_timeout => timeout(), initial_connection_window_size => 65535..16#7fffffff, @@ -57,6 +63,7 @@ metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()), middlewares => [module()], preface_timeout => timeout(), + protocols => [http | http2], proxy_header => boolean(), reset_idle_timeout_on_send => boolean(), sendfile => boolean(), @@ -133,6 +140,10 @@ %% Flow requested for all streams. flow = 0 :: non_neg_integer(), + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size :: pos_integer() | false, + dynamic_buffer_moving_average :: non_neg_integer(), + %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. streams = #{} :: #{cow_http2:streamid() => #stream{}}, @@ -143,7 +154,8 @@ }). -spec init(pid(), ranch:ref(), inet:socket(), module(), - ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok. + ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> {ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket), 'A socket error occurred when retrieving the peer name.'), @@ -167,18 +179,22 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, - binary() | undefined, binary()) -> ok. + binary() | undefined, binary()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts), %% Send the preface before doing all the init in case we get a socket error. ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)), State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), http2_status=sequence, http2_machine=HTTP2Machine}), 0), safe_setopts_active(State), case Buffer of - <<>> -> loop(State, Buffer); + <<>> -> before_loop(State, Buffer); _ -> parse(State, Buffer) end. @@ -213,15 +229,19 @@ add_period(Time, Period) -> Time + Period. -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, - binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok. + binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer, _Settings, Req=#{method := Method}) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts), {ok, StreamID, HTTP2Machine} = cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0), State0 = #state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), http2_status=upgrade, http2_machine=HTTP2Machine}, State1 = headers_frame(State0#state{ http2_machine=HTTP2Machine}, StreamID, Req), @@ -237,20 +257,30 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer ok = maybe_socket_error(State, Transport:send(Socket, Preface)), safe_setopts_active(State), case Buffer of - <<>> -> loop(State, Buffer); + <<>> -> before_loop(State, Buffer); _ -> parse(State, Buffer) end. +-include("cowboy_dynamic_buffer.hrl"). + %% Because HTTP/2 has flow control and Cowboy has other rate limiting %% mechanisms implemented, a very large active_n value should be fine, %% as long as the stream handlers do their work in a timely manner. +%% However large active_n values reduce the impact of dynamic_buffer. setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). safe_setopts_active(State) -> ok = maybe_socket_error(State, setopts_active(State)). +before_loop(State=#state{opts=#{hibernate := true}}, Buffer) -> + proc_lib:hibernate(?MODULE, loop, [State, Buffer]); +before_loop(State, Buffer) -> + loop(State, Buffer). + +-spec loop(#state{}, binary()) -> no_return(). + loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, timer=TimerRef, children=Children}, Buffer) -> Messages = Transport:messages(), @@ -258,7 +288,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, receive %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(State#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>); + State1 = maybe_resize_buffer(State, Data), + parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>); {Closed, Socket} when Closed =:= element(2, Messages) -> Reason = case State#state.http2_status of closing -> {stop, closed, 'The client is going away.'}; @@ -271,11 +302,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, %% Hardcoded for compatibility with Ranch 1.x. Passive =:= tcp_passive; Passive =:= ssl_passive -> safe_setopts_active(State), - loop(State, Buffer); + before_loop(State, Buffer); %% System messages. {'EXIT', Parent, shutdown} -> Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'}, - loop(initiate_closing(State, Reason), Buffer); + before_loop(initiate_closing(State, Reason), Buffer); {'EXIT', Parent, Reason} -> terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'}); {system, From, Request} -> @@ -285,27 +316,27 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, tick_idle_timeout(State, Buffer); {timeout, Ref, {shutdown, Pid}} -> cowboy_children:shutdown_timeout(Children, Ref, Pid), - loop(State, Buffer); + before_loop(State, Buffer); {timeout, TRef, {cow_http2_machine, Name}} -> - loop(timeout(State, Name, TRef), Buffer); + before_loop(timeout(State, Name, TRef), Buffer); {timeout, TimerRef, {goaway_initial_timeout, Reason}} -> - loop(closing(State, Reason), Buffer); + before_loop(closing(State, Reason), Buffer); {timeout, TimerRef, {goaway_complete_timeout, Reason}} -> terminate(State, {stop, stop_reason(Reason), 'Graceful shutdown timed out.'}); %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> - loop(info(State, StreamID, Msg), Buffer); + before_loop(info(State, StreamID, Msg), Buffer); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> - loop(down(State, Pid, Msg), Buffer); + before_loop(down(State, Pid, Msg), Buffer); %% Calls from supervisor module. {'$gen_call', From, Call} -> cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE), - loop(State, Buffer); + before_loop(State, Buffer); Msg -> cowboy:log(warning, "Received stray message ~p.", [Msg], Opts), - loop(State, Buffer) + before_loop(State, Buffer) after InactivityTimeout -> terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) end. @@ -314,7 +345,7 @@ tick_idle_timeout(State=#state{idle_timeout_num=?IDLE_TIMEOUT_TICKS}, _) -> terminate(State, {stop, timeout, 'Connection idle longer than configuration allows.'}); tick_idle_timeout(State=#state{idle_timeout_num=TimeoutNum}, Buffer) -> - loop(set_idle_timeout(State, TimeoutNum + 1), Buffer). + before_loop(set_idle_timeout(State, TimeoutNum + 1), Buffer). set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}, _) when Status =:= closing_initiated orelse Status =:= closing, @@ -355,7 +386,7 @@ parse(State=#state{http2_status=sequence}, Data) -> {ok, Rest} -> parse(State#state{http2_status=settings}, Rest); more -> - loop(State, Data); + before_loop(State, Data); Error = {connection_error, _, _} -> terminate(State, Error) end; @@ -374,7 +405,7 @@ parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Stre more when Status =:= closing, Streams =:= #{} -> terminate(State, {stop, normal, 'The connection is going away.'}); more -> - loop(State, Data) + before_loop(State, Data) end. %% Frame rate flood protection. @@ -1106,7 +1137,9 @@ goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) -> %% in-flight stream creation (at least one round-trip time), the server can send %% another GOAWAY frame with an updated last stream identifier. This ensures %% that a connection can be cleanly shut down without losing requests. + -spec initiate_closing(#state{}, _) -> #state{}. + initiate_closing(State=#state{http2_status=connected, socket=Socket, transport=Transport, opts=Opts}, Reason) -> ok = maybe_socket_error(State, Transport:send(Socket, @@ -1123,7 +1156,9 @@ initiate_closing(State, Reason) -> terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}). %% Switch to 'closing' state and stop accepting new streams. + -spec closing(#state{}, Reason :: term()) -> #state{}. + closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} -> terminate(State, Reason); closing(State0=#state{http2_status=closing_initiated, @@ -1160,6 +1195,7 @@ maybe_socket_error(State, {error, Reason}, Human) -> terminate(State, {socket_error, Reason, Human}). -spec terminate(#state{} | undefined, _) -> no_return(). + terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, @@ -1360,15 +1396,18 @@ terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) -> %% System callbacks. --spec system_continue(_, _, {#state{}, binary()}) -> ok. +-spec system_continue(_, _, {#state{}, binary()}) -> no_return(). + system_continue(_, _, {State, Buffer}) -> - loop(State, Buffer). + before_loop(State, Buffer). -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). + system_terminate(Reason0, _, _, {State, Buffer}) -> Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}, - loop(initiate_closing(State, Reason), Buffer). + before_loop(initiate_closing(State, Reason), Buffer). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. + system_code_change(Misc, _, _, _) -> {ok, Misc}. diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index ef3e3f6..9aa6be5 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2023-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -32,10 +32,10 @@ enable_connect_protocol => boolean(), env => cowboy_middleware:env(), logger => module(), - max_decode_blocked_streams => 0..16#3fffffffffffffff, - max_decode_table_size => 0..16#3fffffffffffffff, - max_encode_blocked_streams => 0..16#3fffffffffffffff, - max_encode_table_size => 0..16#3fffffffffffffff, + max_decode_blocked_streams => 0..16#3fffffffffffffff, + max_decode_table_size => 0..16#3fffffffffffffff, + max_encode_blocked_streams => 0..16#3fffffffffffffff, + max_encode_table_size => 0..16#3fffffffffffffff, max_ignored_frame_size_received => non_neg_integer() | infinity, metrics_callback => cowboy_metrics_h:metrics_callback(), metrics_req_filter => fun((cowboy_req:req()) -> map()), @@ -51,18 +51,30 @@ }. -export_type([opts/0]). +%% HTTP/3 or WebTransport stream. +%% +%% WebTransport sessions involve one bidirectional CONNECT stream +%% that must stay open (and can be used for signaling using the +%% Capsule Protocol) and an application-defined number of +%% unidirectional and bidirectional streams, as well as datagrams. +%% +%% WebTransport sessions run in the CONNECT request process and +%% all events related to the session is sent there as a message. +%% The pid of the process is kept in the state. -record(stream, { id :: cow_http3:stream_id(), %% Whether the stream is currently in a special state. status :: header | {unidi, control | encoder | decoder} - | normal | {data | ignore, non_neg_integer()} | stopping, + | normal | {data | ignore, non_neg_integer()} | stopping + | {webtransport_session, normal | {ignore, non_neg_integer()}} + | {webtransport_stream, cow_http3:stream_id()}, %% Stream buffer. buffer = <<>> :: binary(), %% Stream state. - state = undefined :: undefined | {module, any()} + state = undefined :: undefined | {module(), any()} }). -record(state, { @@ -152,6 +164,9 @@ loop(State0=#state{opts=Opts, children=Children}) -> %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> loop(info(State0, StreamID, Msg)); + %% WebTransport commands. + {'$webtransport_commands', SessionID, Commands} -> + loop(webtransport_commands(State0, SessionID, Commands)); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> loop(down(State0, Pid, Msg)); @@ -164,12 +179,17 @@ handle_quic_msg(State0=#state{opts=Opts}, Msg) -> case cowboy_quicer:handle(Msg) of {data, StreamID, IsFin, Data} -> parse(State0, StreamID, Data, IsFin); + {datagram, Data} -> + parse_datagram(State0, Data); {stream_started, StreamID, StreamType} -> State = stream_new_remote(State0, StreamID, StreamType), loop(State); {stream_closed, StreamID, ErrorCode} -> State = stream_closed(State0, StreamID, ErrorCode), loop(State); + {peer_send_shutdown, StreamID} -> + State = stream_peer_send_shutdown(State0, StreamID), + loop(State); closed -> %% @todo Different error reason if graceful? Reason = {socket_error, closed, 'The socket has been closed.'}, @@ -216,6 +236,56 @@ parse1(State=#state{http3_machine=HTTP3Machine0}, {error, Error={connection_error, _, _}, HTTP3Machine} -> terminate(State#state{http3_machine=HTTP3Machine}, Error) end; +%% @todo Handle when IsFin = fin which must terminate the WT session. +parse1(State=#state{conn=Conn}, Stream=#stream{id=SessionID, status= + {webtransport_session, normal}}, Data, IsFin) -> + case cow_capsule:parse(Data) of + {ok, wt_drain_session, Rest} -> + webtransport_event(State, SessionID, close_initiated), + parse1(State, Stream, Rest, IsFin); + {ok, {wt_close_session, AppCode, AppMsg}, Rest} -> + %% This event will be handled specially and lead + %% to the termination of the session process. + webtransport_event(State, SessionID, {closed, AppCode, AppMsg}), + %% Shutdown the CONNECT stream immediately. + cowboy_quicer:shutdown_stream(Conn, SessionID), + %% @todo Will we receive a {stream_closed,...} after that? + %% If any data is received past that point this is an error. + %% @todo Don't crash, error out properly. + <<>> = Rest, + loop(webtransport_terminate_session(State, Stream)); + more -> + loop(stream_store(State, Stream#stream{buffer=Data})); + %% Ignore unhandled/unknown capsules. + %% @todo Do this when cow_capsule includes some. +% {ok, _, Rest} -> +% parse1(State, Stream, Rest, IsFin); +% {ok, Rest} -> +% parse1(State, Stream, Rest, IsFin); + %% @todo Make the max length configurable? + {skip, Len} when Len =< 8192 -> + loop(stream_store(State, Stream#stream{ + status={webtransport_session, {ignore, Len}}})); + {skip, Len} -> + %% @todo What should be done on capsule error? + error({todo, capsule_too_long, Len}); + error -> + %% @todo What should be done on capsule error? + error({todo, capsule_error, Data}) + end; +parse1(State, Stream=#stream{status= + {webtransport_session, {ignore, Len}}}, Data, IsFin) -> + case Data of + <<_:Len/unit:8, Rest/bits>> -> + parse1(State, Stream#stream{status={webtransport_session, normal}}, Rest, IsFin); + _ -> + loop(stream_store(State, Stream#stream{ + status={webtransport_session, {ignore, Len - byte_size(Data)}}})) + end; +parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID}}, Data, IsFin) -> + webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}), + %% No need to store the stream again, WT streams don't get changed here. + loop(State); parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) -> DataLen = byte_size(Data), if @@ -246,6 +316,9 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) -> {ok, Frame, Rest} -> FrameIsFin = is_fin(IsFin, Rest), parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin); + %% The WebTransport stream header is not a real frame. + {webtransport_stream_header, SessionID, Rest} -> + become_webtransport_stream(State, Stream, bidi, SessionID, Rest, IsFin); {more, Frame = {data, _}, Len} -> %% We're at the end of the data so FrameIsFin is equivalent to IsFin. case IsFin of @@ -317,13 +390,24 @@ parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0}, {error, Error={connection_error, _, _}, HTTP3Machine} -> terminate(State0#state{http3_machine=HTTP3Machine}, Error) end; + %% @todo Perhaps do this in cow_http3_machine directly. {ok, push, _} -> terminate(State0, {connection_error, h3_stream_creation_error, 'Only servers can push. (RFC9114 6.2.2)'}); + {ok, {webtransport, SessionID}, Rest} -> + become_webtransport_stream(State0, Stream0, unidi, SessionID, Rest, IsFin); %% Unknown stream types must be ignored. We choose to abort the %% stream instead of reading and discarding the incoming data. {undefined, _} -> - loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)) + loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)); + %% Very unlikely to happen but WebTransport headers may be fragmented + %% as they are more than one byte. The fin flag in this case is an error, + %% but because it happens in WebTransport application data (the Session ID) + %% we only reset the impacted stream and not the entire connection. + more when IsFin =:= fin -> + loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)); + more -> + loop(stream_store(State0, Stream0#stream{buffer=Data})) end. frame(State=#state{http3_machine=HTTP3Machine0}, @@ -449,6 +533,8 @@ headers_to_map([{Name, Value}|Tail], Acc0) -> end, headers_to_map(Tail, Acc). +%% @todo WebTransport CONNECT requests must have extra checks on settings. +%% @todo We may also need to defer them if we didn't get settings. headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> @@ -488,6 +574,18 @@ early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer}, send_headers(State0, Stream, fin, StatusCode0, RespHeaders0) end. +%% Datagrams. + +parse_datagram(State, Data0) -> + {SessionID, Data} = cow_http3:parse_datagram(Data0), + case stream_get(State, SessionID) of + #stream{status={webtransport_session, _}} -> + webtransport_event(State, SessionID, {datagram, Data}), + loop(State); + _ -> + error(todo) %% @todo Might be a future WT session or an error. + end. + %% Erlang messages. down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> @@ -654,6 +752,22 @@ commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) -> %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. commands(State0, Stream0=#stream{id=StreamID}, + [{switch_protocol, Headers, cowboy_webtransport, WTState=#{}}|Tail]) -> + State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), + #state{http3_machine=HTTP3Machine0} = State, + Stream1 = #stream{state=StreamState} = stream_get(State, StreamID), + %% The stream becomes a WT session at that point. It is the + %% parent stream of all streams in this WT session. The + %% cowboy_stream state is kept because it will be needed + %% to terminate the stream properly. + HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0), + Stream = Stream1#stream{ + status={webtransport_session, normal}, + state={cowboy_webtransport, WTState#{stream_state => StreamState}} + }, + %% @todo We must propagate the buffer to capsule handling if any. + commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail); +commands(State0, Stream0=#stream{id=StreamID}, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), Stream = stream_get(State, StreamID), @@ -758,6 +872,157 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID}, cowboy_quicer:send(Conn, EncoderID, EncData)), State. +%% We mark the stream as being a WebTransport stream +%% and then continue parsing the data as a WebTransport +%% stream. This function is common for incoming unidi +%% and bidi streams. +become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0}, + Stream0=#stream{id=StreamID}, StreamType, SessionID, Rest, IsFin) -> + case cow_http3_machine:become_webtransport_stream(StreamID, SessionID, HTTP3Machine0) of + {ok, HTTP3Machine} -> + State = State0#state{http3_machine=HTTP3Machine}, + Stream = Stream0#stream{status={webtransport_stream, SessionID}}, + webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}), + %% We don't need to parse the remaining data if there isn't any. + case {Rest, IsFin} of + {<<>>, nofin} -> loop(stream_store(State, Stream)); + _ -> parse(stream_store(State, Stream), StreamID, Rest, IsFin) + end + %% @todo Error conditions. + end. + +webtransport_event(State, SessionID, Event) -> + #stream{ + status={webtransport_session, _}, + state={cowboy_webtransport, #{session_pid := SessionPid}} + } = stream_get(State, SessionID), + SessionPid ! {'$webtransport_event', SessionID, Event}, + ok. + +webtransport_commands(State, SessionID, Commands) -> + case stream_get(State, SessionID) of + Session = #stream{status={webtransport_session, _}} -> + wt_commands(State, Session, Commands); + %% The stream has been terminated, ignore pending commands. + error -> + State + end. + +wt_commands(State, _, []) -> + State; +wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, + [{open_stream, OpenStreamRef, StreamType, InitialData}|Tail]) -> + %% Because opening the stream involves sending a short header + %% we necessarily write data. The InitialData variable allows + %% providing additional data to be sent in the same packet. + StartF = case StreamType of + bidi -> start_bidi_stream; + unidi -> start_unidi_stream + end, + Header = cow_http3:webtransport_stream_header(SessionID, StreamType), + case cowboy_quicer:StartF(Conn, [Header, InitialData]) of + {ok, StreamID} -> + %% @todo Pass Session directly? + webtransport_event(State0, SessionID, + {opened_stream_id, OpenStreamRef, StreamID}), + State = stream_new_local(State0, StreamID, StreamType, + {webtransport_stream, SessionID}), + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]}); +wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, + [{send, datagram, Data}|Tail]) -> + case cowboy_quicer:send_datagram(Conn, cow_http3:datagram(SessionID, Data)) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + case cowboy_quicer:send(Conn, StreamID, Data, nofin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, IsFin, Data}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + case cowboy_quicer:send(Conn, StreamID, Data, IsFin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) -> + %% We must send a WT_DRAIN_SESSION capsule on the CONNECT stream. + Capsule = cow_capsule:wt_drain_session(), + case cowboy_quicer:send(Conn, SessionID, Capsule, nofin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail]) + when Cmd =:= close; element(1, Cmd) =:= close -> + %% We must send a WT_CLOSE_SESSION capsule on the CONNECT stream. + {AppCode, AppMsg} = case Cmd of + close -> {0, <<>>}; + {close, AppCode0} -> {AppCode0, <<>>}; + {close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0} + end, + Capsule = cow_capsule:wt_close_session(AppCode, AppMsg), + case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of + ok -> + State = webtransport_terminate_session(State0, Session), + %% @todo Because the handler is in a separate process + %% we must wait for it to stop and eventually + %% kill the process if it takes too long. + %% @todo We may need to fully close the CONNECT stream (if remote doesn't reset it). + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end. + +webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machine0, + streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) -> + %% Reset/abort the WT streams. + Streams = maps:filtermap(fun + (_, #stream{id=StreamID, status={webtransport_session, _}}) + when StreamID =:= SessionID -> + %% We remove the session stream but do the shutdown outside this function. + false; + (StreamID, #stream{status={webtransport_stream, StreamSessionID}}) + when StreamSessionID =:= SessionID -> + cowboy_quicer:shutdown_stream(Conn, StreamID, + both, cow_http3:error_to_code(wt_session_gone)), + false; + (_, _) -> + true + end, Streams0), + %% Keep the streams in lingering state. + %% We only keep up to 100 streams in this state. @todo Make it configurable? + Terminated = maps:keys(Streams0) -- maps:keys(Streams), + Lingering = lists:sublist(Terminated ++ Lingering0, 100), + %% Update the HTTP3 state machine. + HTTP3Machine = cow_http3_machine:close_webtransport_session(SessionID, HTTP3Machine0), + State#state{ + http3_machine=HTTP3Machine, + streams=Streams, + lingering_streams=Lingering + }. + +stream_peer_send_shutdown(State=#state{conn=Conn}, StreamID) -> + case stream_get(State, StreamID) of + %% Cleanly terminating the CONNECT stream is equivalent + %% to an application error code of 0 and empty message. + Stream = #stream{status={webtransport_session, _}} -> + webtransport_event(State, StreamID, {closed, 0, <<>>}), + %% Shutdown the CONNECT stream fully. + cowboy_quicer:shutdown_stream(Conn, StreamID), + webtransport_terminate_session(State, Stream); + _ -> + State + end. + reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, Stream=#stream{id=StreamID}, Error) -> Reason = case Error of @@ -903,15 +1168,25 @@ terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reas stream_get(#state{streams=Streams}, StreamID) -> maps:get(StreamID, Streams, error). -stream_new_remote(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, - StreamID, StreamType) -> +stream_new_local(State, StreamID, StreamType, Status) -> + stream_new(State, StreamID, StreamType, unidi_local, Status). + +stream_new_remote(State, StreamID, StreamType) -> + Status = case StreamType of + unidi -> header; + bidi -> normal + end, + stream_new(State, StreamID, StreamType, unidi_remote, Status). + +stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, + StreamID, StreamType, UnidiType, Status) -> {HTTP3Machine, Status} = case StreamType of unidi -> - {cow_http3_machine:init_unidi_stream(StreamID, unidi_remote, HTTP3Machine0), - header}; + {cow_http3_machine:init_unidi_stream(StreamID, UnidiType, HTTP3Machine0), + Status}; bidi -> {cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0), - normal} + Status} end, Stream = #stream{id=StreamID, status=Status}, State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}. @@ -926,6 +1201,11 @@ stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) -> stream_closed(State=#state{opts=Opts, streams=Streams0, children=Children0}, StreamID, ErrorCode) -> case maps:take(StreamID, Streams0) of + %% In the WT session's case, streams will be + %% removed in webtransport_terminate_session. + {Stream=#stream{status={webtransport_session, _}}, _} -> + webtransport_event(State, StreamID, closed_abruptly), + webtransport_terminate_session(State, Stream); {#stream{state=undefined}, Streams} -> %% Unidi stream has no handler/children. stream_closed1(State#state{streams=Streams}, StreamID); diff --git a/src/cowboy_loop.erl b/src/cowboy_loop.erl index 6859c82..629d06e 100644 --- a/src/cowboy_loop.erl +++ b/src/cowboy_loop.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_metrics_h.erl b/src/cowboy_metrics_h.erl index 27f14d4..67bf1a6 100644 --- a/src/cowboy_metrics_h.erl +++ b/src/cowboy_metrics_h.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_middleware.erl b/src/cowboy_middleware.erl index efeef4f..97c1498 100644 --- a/src/cowboy_middleware.erl +++ b/src/cowboy_middleware.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2013-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl index d9bbe1f..aa52fae 100644 --- a/src/cowboy_quicer.erl +++ b/src/cowboy_quicer.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2023, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -23,9 +23,12 @@ -export([shutdown/2]). %% Streams. +-export([start_bidi_stream/2]). -export([start_unidi_stream/2]). -export([send/3]). -export([send/4]). +-export([send_datagram/2]). +-export([shutdown_stream/2]). -export([shutdown_stream/4]). %% Messages. @@ -45,6 +48,9 @@ peercert(_) -> no_quicer(). -spec shutdown(_, _) -> no_return(). shutdown(_, _) -> no_quicer(). +-spec start_bidi_stream(_, _) -> no_return(). +start_bidi_stream(_, _) -> no_quicer(). + -spec start_unidi_stream(_, _) -> no_return(). start_unidi_stream(_, _) -> no_quicer(). @@ -54,6 +60,12 @@ send(_, _, _) -> no_quicer(). -spec send(_, _, _, _) -> no_return(). send(_, _, _, _) -> no_quicer(). +-spec send_datagram(_, _) -> no_return(). +send_datagram(_, _) -> no_quicer(). + +-spec shutdown_stream(_, _) -> no_return(). +shutdown_stream(_, _) -> no_quicer(). + -spec shutdown_stream(_, _, _, _) -> no_return(). shutdown_stream(_, _, _, _) -> no_quicer(). @@ -109,16 +121,26 @@ shutdown(Conn, ErrorCode) -> %% Streams. +-spec start_bidi_stream(quicer_connection_handle(), iodata()) + -> {ok, cow_http3:stream_id()} + | {error, any()}. + +start_bidi_stream(Conn, InitialData) -> + start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_NONE). + -spec start_unidi_stream(quicer_connection_handle(), iodata()) -> {ok, cow_http3:stream_id()} | {error, any()}. -start_unidi_stream(Conn, HeaderData) -> +start_unidi_stream(Conn, InitialData) -> + start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL). + +start_stream(Conn, InitialData, OpenFlag) -> case quicer:start_stream(Conn, #{ active => true, - open_flag => ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL}) of + open_flag => OpenFlag}) of {ok, StreamRef} -> - case quicer:send(StreamRef, HeaderData) of + case quicer:send(StreamRef, InitialData) of {ok, _} -> {ok, StreamID} = quicer:get_stream_id(StreamRef), put({quicer_stream, StreamID}, StreamRef), @@ -156,6 +178,29 @@ send(_Conn, StreamID, Data, IsFin) -> send_flag(nofin) -> ?QUIC_SEND_FLAG_NONE; send_flag(fin) -> ?QUIC_SEND_FLAG_FIN. +-spec send_datagram(quicer_connection_handle(), iodata()) + -> ok | {error, any()}. + +send_datagram(Conn, Data) -> + %% @todo Fix/ignore the Dialyzer error instead of doing this. + DataBin = iolist_to_binary(Data), + Size = byte_size(DataBin), + case quicer:send_dgram(Conn, DataBin) of + {ok, Size} -> + ok; + %% @todo Handle error cases. + Error -> + Error + end. + +-spec shutdown_stream(quicer_connection_handle(), cow_http3:stream_id()) + -> ok. + +shutdown_stream(_Conn, StreamID) -> + StreamRef = get({quicer_stream, StreamID}), + _ = quicer:shutdown_stream(StreamRef), + ok. + -spec shutdown_stream(quicer_connection_handle(), cow_http3:stream_id(), both | receiving, quicer_app_errno()) -> ok. @@ -165,6 +210,7 @@ shutdown_stream(_Conn, StreamID, Dir, ErrorCode) -> _ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity), ok. +%% @todo Are these flags correct for what we want? shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT; shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE. @@ -173,9 +219,11 @@ shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE. %% @todo Probably should have the Conn given as argument too? -spec handle({quic, _, _, _}) -> {data, cow_http3:stream_id(), cow_http:fin(), binary()} + | {datagram, binary()} | {stream_started, cow_http3:stream_id(), unidi | bidi} | {stream_closed, cow_http3:stream_id(), quicer_app_errno()} | closed + | {peer_send_shutdown, cow_http3:stream_id()} | ok | unknown | {socket_error, any()}. @@ -187,6 +235,9 @@ handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) -> _ -> nofin end, {data, StreamID, IsFin, Data}; +%% @todo Match on Conn. +handle({quic, Data, _Conn, Flags}) when is_binary(Data), is_integer(Flags) -> + {datagram, Data}; %% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED. handle({quic, new_stream, StreamRef, #{flags := Flags}}) -> case quicer:setopt(StreamRef, active, true) of @@ -219,8 +270,9 @@ handle({quic, dgram_state_changed, _Conn, _Props}) -> %% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT handle({quic, transport_shutdown, _Conn, _Flags}) -> ok; -handle({quic, peer_send_shutdown, _StreamRef, undefined}) -> - ok; +handle({quic, peer_send_shutdown, StreamRef, undefined}) -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + {peer_send_shutdown, StreamID}; handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) -> ok; handle({quic, shutdown, _Conn, success}) -> diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 3f87677..550054e 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -1,5 +1,5 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> -%% Copyright (c) 2011, Anthony Ramine <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> +%% Copyright (c) Anthony Ramine <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -445,6 +445,7 @@ parse_header_fun(<<"sec-websocket-protocol">>) -> fun cow_http_hd:parse_sec_webs parse_header_fun(<<"sec-websocket-version">>) -> fun cow_http_hd:parse_sec_websocket_version_req/1; parse_header_fun(<<"trailer">>) -> fun cow_http_hd:parse_trailer/1; parse_header_fun(<<"upgrade">>) -> fun cow_http_hd:parse_upgrade/1; +parse_header_fun(<<"wt-available-protocols">>) -> fun cow_http_hd:parse_wt_available_protocols/1; parse_header_fun(<<"x-forwarded-for">>) -> fun cow_http_hd:parse_x_forwarded_for/1. parse_header(Name, Req, Default, ParseFun) -> @@ -462,7 +463,7 @@ filter_cookies(Names0, Req=#{headers := Headers}) -> case header(<<"cookie">>, Req) of undefined -> Req; Value0 -> - Cookies0 = binary:split(Value0, <<$;>>), + Cookies0 = binary:split(Value0, <<$;>>, [global]), Cookies = lists:filter(fun(Cookie) -> lists:member(cookie_name(Cookie), Names) end, Cookies0), @@ -726,8 +727,10 @@ set_resp_header(Name, Value, Req=#{resp_headers := RespHeaders}) -> set_resp_header(Name,Value, Req) -> Req#{resp_headers => #{Name => Value}}. --spec set_resp_headers(cowboy:http_headers(), Req) +-spec set_resp_headers(cowboy:http_headers() | [{binary(), iodata()}], Req) -> Req when Req::req(). +set_resp_headers(Headers, Req) when is_list(Headers) -> + set_resp_headers_list(Headers, Req, #{}); set_resp_headers(#{<<"set-cookie">> := _}, _) -> exit({response_error, invalid_header, 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'}); @@ -736,6 +739,19 @@ set_resp_headers(Headers, Req=#{resp_headers := RespHeaders}) -> set_resp_headers(Headers, Req) -> Req#{resp_headers => Headers}. +set_resp_headers_list([], Req, Acc) -> + set_resp_headers(Acc, Req); +set_resp_headers_list([{<<"set-cookie">>, _}|_], _, _) -> + exit({response_error, invalid_header, + 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'}); +set_resp_headers_list([{Name, Value}|Tail], Req, Acc) -> + case Acc of + #{Name := ValueAcc} -> + set_resp_headers_list(Tail, Req, Acc#{Name => [ValueAcc, <<", ">>, Value]}); + _ -> + set_resp_headers_list(Tail, Req, Acc#{Name => Value}) + end. + -spec resp_header(binary(), req()) -> binary() | undefined. resp_header(Name, Req) -> resp_header(Name, Req, undefined). diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl index fcea71c..9f30fcf 100644 --- a/src/cowboy_rest.erl +++ b/src/cowboy_rest.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -246,9 +246,6 @@ handler :: atom(), handler_state :: any(), - %% Allowed methods. Only used for OPTIONS requests. - allowed_methods :: [binary()] | undefined, - %% Media type. content_types_p = [] :: [{binary() | {binary(), binary(), [{binary(), binary()}] | '*'}, @@ -307,17 +304,17 @@ known_methods(Req, State=#state{method=Method}) -> Method =:= <<"POST">>; Method =:= <<"PUT">>; Method =:= <<"PATCH">>; Method =:= <<"DELETE">>; Method =:= <<"OPTIONS">> -> - next(Req, State, fun uri_too_long/2); + uri_too_long(Req, State); no_call -> - next(Req, State, 501); + respond(Req, State, 501); {stop, Req2, State2} -> terminate(Req2, State2); {Switch, Req2, State2} when element(1, Switch) =:= switch_handler -> switch_handler(Switch, Req2, State2); {List, Req2, State2} -> case lists:member(Method, List) of - true -> next(Req2, State2, fun uri_too_long/2); - false -> next(Req2, State2, 501) + true -> uri_too_long(Req2, State2); + false -> respond(Req2, State2, 501) end end. @@ -327,39 +324,26 @@ uri_too_long(Req, State) -> %% allowed_methods/2 should return a list of binary methods. allowed_methods(Req, State=#state{method=Method}) -> case call(Req, State, allowed_methods) of - no_call when Method =:= <<"HEAD">>; Method =:= <<"GET">> -> - next(Req, State, fun malformed_request/2); - no_call when Method =:= <<"OPTIONS">> -> - next(Req, State#state{allowed_methods= - [<<"HEAD">>, <<"GET">>, <<"OPTIONS">>]}, - fun malformed_request/2); + no_call when Method =:= <<"HEAD">>; Method =:= <<"GET">>; Method =:= <<"OPTIONS">> -> + Req2 = cowboy_req:set_resp_header(<<"allow">>, <<"HEAD, GET, OPTIONS">>, Req), + malformed_request(Req2, State); no_call -> - method_not_allowed(Req, State, - [<<"HEAD">>, <<"GET">>, <<"OPTIONS">>]); + Req2 = cowboy_req:set_resp_header(<<"allow">>, <<"HEAD, GET, OPTIONS">>, Req), + respond(Req2, State, 405); {stop, Req2, State2} -> terminate(Req2, State2); {Switch, Req2, State2} when element(1, Switch) =:= switch_handler -> switch_handler(Switch, Req2, State2); {List, Req2, State2} -> + Req3 = cowboy_req:set_resp_header(<<"allow">>, cow_http_hd:allow(List), Req2), case lists:member(Method, List) of - true when Method =:= <<"OPTIONS">> -> - next(Req2, State2#state{allowed_methods=List}, - fun malformed_request/2); true -> - next(Req2, State2, fun malformed_request/2); + malformed_request(Req3, State2); false -> - method_not_allowed(Req2, State2, List) + respond(Req3, State2, 405) end end. -method_not_allowed(Req, State, []) -> - Req2 = cowboy_req:set_resp_header(<<"allow">>, <<>>, Req), - respond(Req2, State, 405); -method_not_allowed(Req, State, Methods) -> - << ", ", Allow/binary >> = << << ", ", M/binary >> || M <- Methods >>, - Req2 = cowboy_req:set_resp_header(<<"allow">>, Allow, Req), - respond(Req2, State, 405). - malformed_request(Req, State) -> expect(Req, State, malformed_request, false, fun is_authorized/2, 400). @@ -413,16 +397,10 @@ valid_entity_length(Req, State) -> %% If you need to add additional headers to the response at this point, %% you should do it directly in the options/2 call using set_resp_headers. -options(Req, State=#state{allowed_methods=Methods, method= <<"OPTIONS">>}) -> +options(Req, State=#state{method= <<"OPTIONS">>}) -> case call(Req, State, options) of - no_call when Methods =:= [] -> - Req2 = cowboy_req:set_resp_header(<<"allow">>, <<>>, Req), - respond(Req2, State, 200); no_call -> - << ", ", Allow/binary >> - = << << ", ", M/binary >> || M <- Methods >>, - Req2 = cowboy_req:set_resp_header(<<"allow">>, Allow, Req), - respond(Req2, State, 200); + respond(Req, State, 200); {stop, Req2, State2} -> terminate(Req2, State2); {Switch, Req2, State2} when element(1, Switch) =:= switch_handler -> @@ -471,7 +449,7 @@ content_types_provided(Req, State) -> {[], Req2, State2} -> not_acceptable(Req2, State2); {CTP, Req2, State2} -> - CTP2 = [normalize_content_types(P) || P <- CTP], + CTP2 = [normalize_content_types(P, provide) || P <- CTP], State3 = State2#state{content_types_p=CTP2}, try cowboy_req:parse_header(<<"accept">>, Req2) of undefined -> @@ -491,10 +469,14 @@ content_types_provided(Req, State) -> end end. -normalize_content_types({ContentType, Callback}) +normalize_content_types({ContentType, Callback}, _) when is_binary(ContentType) -> {cow_http_hd:parse_content_type(ContentType), Callback}; -normalize_content_types(Normalized) -> +normalize_content_types(Normalized = {{Type, SubType, _}, _}, _) + when is_binary(Type), is_binary(SubType) -> + Normalized; +%% Wildcard for content_types_accepted. +normalize_content_types(Normalized = {'*', _}, accept) -> Normalized. prioritize_accept(Accept) -> @@ -1059,7 +1041,7 @@ accept_resource(Req, State) -> {Switch, Req2, State2} when element(1, Switch) =:= switch_handler -> switch_handler(Switch, Req2, State2); {CTA, Req2, State2} -> - CTA2 = [normalize_content_types(P) || P <- CTA], + CTA2 = [normalize_content_types(P, accept) || P <- CTA], try cowboy_req:parse_header(<<"content-type">>, Req2) of %% We do not match against the boundary parameter for multipart. {Type = <<"multipart">>, SubType, Params} -> @@ -1099,9 +1081,9 @@ process_content_type(Req, State=#state{method=Method, exists=Exists}, Fun) -> {Switch, Req2, State2} when element(1, Switch) =:= switch_handler -> switch_handler(Switch, Req2, State2); {true, Req2, State2} when Exists -> - next(Req2, State2, fun has_resp_body/2); + has_resp_body(Req2, State2); {true, Req2, State2} -> - next(Req2, State2, fun maybe_created/2); + maybe_created(Req2, State2); {false, Req2, State2} -> respond(Req2, State2, 400); {{created, ResURL}, Req2, State2} when Method =:= <<"POST">> -> @@ -1640,5 +1622,6 @@ error_terminate(Req, #state{handler=Handler, handler_state=HandlerState}, Class, erlang:raise(Class, Reason, Stacktrace). terminate(Req, #state{handler=Handler, handler_state=HandlerState}) -> + %% @todo I don't think the result is used anywhere? Result = cowboy_handler:terminate(normal, Req, HandlerState, Handler), {ok, Req, Result}. diff --git a/src/cowboy_router.erl b/src/cowboy_router.erl index 61c9012..393d82d 100644 --- a/src/cowboy_router.erl +++ b/src/cowboy_router.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_static.erl b/src/cowboy_static.erl index a185ef1..ce34b01 100644 --- a/src/cowboy_static.erl +++ b/src/cowboy_static.erl @@ -1,5 +1,5 @@ -%% Copyright (c) 2013-2024, Loïc Hoguin <[email protected]> -%% Copyright (c) 2011, Magnus Klaar <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> +%% Copyright (c) Magnus Klaar <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -29,7 +29,7 @@ -type extra_charset() :: {charset, module(), function()} | {charset, binary()}. -type extra_etag() :: {etag, module(), function()} | {etag, false}. -type extra_mimetypes() :: {mimetypes, module(), function()} - | {mimetypes, binary() | {binary(), binary(), [{binary(), binary()}]}}. + | {mimetypes, binary() | {binary(), binary(), '*' | [{binary(), binary()}]}}. -type extra() :: [extra_charset() | extra_etag() | extra_mimetypes()]. -type opts() :: {file | dir, string() | binary()} | {file | dir, string() | binary(), extra()} @@ -332,7 +332,7 @@ forbidden(Req, State) -> %% Detect the mimetype of the file. -spec content_types_provided(Req, State) - -> {[{binary(), get_file}], Req, State} + -> {[{binary() | {binary(), binary(), '*' | [{binary(), binary()}]}, get_file}], Req, State} when State::state(). content_types_provided(Req, State={Path, _, Extra}) when is_list(Extra) -> case lists:keyfind(mimetypes, 1, Extra) of @@ -347,7 +347,7 @@ content_types_provided(Req, State={Path, _, Extra}) when is_list(Extra) -> %% Detect the charset of the file. -spec charsets_provided(Req, State) - -> {[binary()], Req, State} + -> {[binary()], Req, State} | no_call when State::state(). charsets_provided(Req, State={Path, _, Extra}) -> case lists:keyfind(charset, 1, Extra) of @@ -381,7 +381,7 @@ resource_exists(Req, State) -> %% Generate an etag for the file. -spec generate_etag(Req, State) - -> {{strong | weak, binary()}, Req, State} + -> {{strong | weak, binary() | undefined}, Req, State} when State::state(). generate_etag(Req, State={Path, {_, #file_info{size=Size, mtime=Mtime}}, Extra}) -> @@ -408,7 +408,7 @@ last_modified(Req, State={_, {_, #file_info{mtime=Modified}}, _}) -> %% Stream the file. -spec get_file(Req, State) - -> {{sendfile, 0, non_neg_integer(), binary()}, Req, State} + -> {{sendfile, 0, non_neg_integer(), binary()} | binary(), Req, State} when State::state(). get_file(Req, State={Path, {direct, #file_info{size=Size}}, _}) -> {{sendfile, 0, Size, Path}, Req, State}; diff --git a/src/cowboy_stream.erl b/src/cowboy_stream.erl index 6ceb5ba..6680bdc 100644 --- a/src/cowboy_stream.erl +++ b/src/cowboy_stream.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -49,6 +49,7 @@ -type reason() :: normal | switch_protocol | {internal_error, timeout | {error | exit | throw, any()}, human_reason()} | {socket_error, closed | atom(), human_reason()} + %% @todo Or cow_http3:error(). | {stream_error, cow_http2:error(), human_reason()} | {connection_error, cow_http2:error(), human_reason()} | {stop, cow_http2:frame() | {exit, any()}, human_reason()}. diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index b373344..3c3c084 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2016-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -138,7 +138,7 @@ info(StreamID, Info={'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}}, {error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>}, stop ], State); -info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) -> +info(StreamID, Exit={'EXIT', Pid, Reason}, State=#state{ref=Ref, pid=Pid}) -> Commands0 = [{internal_error, Exit, 'Stream process crashed.'}], Commands = case Reason of normal -> Commands0; @@ -146,9 +146,8 @@ info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, p {shutdown, _} -> Commands0; _ -> [{log, error, "Ranch listener ~p, connection process ~p, stream ~p " - "had its request process ~p exit with reason " - "~999999p and stacktrace ~999999p~n", - [Ref, self(), StreamID, Pid, Reason, Stacktrace]} + "had its request process ~p exit with reason ~0p~n", + [Ref, self(), StreamID, Pid, Reason]} |Commands0] end, %% @todo We are trying to send a 500 response before resetting diff --git a/src/cowboy_sub_protocol.erl b/src/cowboy_sub_protocol.erl index 062fd38..1f24d00 100644 --- a/src/cowboy_sub_protocol.erl +++ b/src/cowboy_sub_protocol.erl @@ -1,5 +1,5 @@ -%% Copyright (c) 2013-2024, Loïc Hoguin <[email protected]> -%% Copyright (c) 2013, James Fish <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> +%% Copyright (c) James Fish <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_sup.erl b/src/cowboy_sup.erl index e37f4cf..224ef7d 100644 --- a/src/cowboy_sup.erl +++ b/src/cowboy_sup.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_tls.erl b/src/cowboy_tls.erl index 60ab2ed..6d0dcd3 100644 --- a/src/cowboy_tls.erl +++ b/src/cowboy_tls.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -39,7 +39,11 @@ connection_process(Parent, Ref, Transport, Opts) -> {ok, <<"h2">>} -> init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, cowboy_http2); _ -> %% http/1.1 or no protocol negotiated. - init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, cowboy_http) + Protocol = case maps:get(alpn_default_protocol, Opts, http) of + http -> cowboy_http; + http2 -> cowboy_http2 + end, + init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol) end. init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol) -> diff --git a/src/cowboy_tracer_h.erl b/src/cowboy_tracer_h.erl index b1196fe..91a431b 100644 --- a/src/cowboy_tracer_h.erl +++ b/src/cowboy_tracer_h.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2017-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 577de47..cb30c3f 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -69,6 +69,9 @@ active_n => pos_integer(), compress => boolean(), deflate_opts => cow_ws:deflate_opts(), + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), idle_timeout => timeout(), max_frame_size => non_neg_integer() | infinity, req_filter => fun((cowboy_req:req()) -> map()), @@ -97,6 +100,11 @@ timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS, messages = undefined :: undefined | {atom(), atom(), atom()} | {atom(), atom(), atom(), atom()}, + + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size = false :: pos_integer() | false, + dynamic_buffer_moving_average = 0 :: non_neg_integer(), + hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), frag_buffer = <<>> :: binary(), @@ -270,7 +278,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, %% @todo We don't want date and server headers. Headers = cowboy_req:response_headers(#{}, Req), Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}}, - takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>, + takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>, {State, HandlerState}). %% Connection process. @@ -295,8 +303,8 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, -spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()}, module() | undefined, any(), binary(), {#state{}, any()}) -> no_return(). -takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, - {State0=#state{handler=Handler, req=Req}, HandlerState}) -> +takeover(Parent, Ref, Socket, Transport, Opts, Buffer, + {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) -> case Req of #{version := 'HTTP/3'} -> ok; %% @todo We should have an option to disable this behavior. @@ -308,7 +316,11 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, end, State = set_idle_timeout(State0#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - key=undefined, messages=Messages}, 0), + opts=WsOpts#{dynamic_buffer => maps:get(dynamic_buffer, Opts, false)}, + key=undefined, messages=Messages, + %% Dynamic buffer only applies to HTTP/1.1 Websocket. + dynamic_buffer_size=init_dynamic_buffer_size(Opts), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0), %% We call parse_header/3 immediately because there might be %% some data in the buffer that was sent along with the handshake. %% While it is not allowed by the protocol to send frames immediately, @@ -319,6 +331,12 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, false -> after_init(State, HandlerState, #ps_header{buffer=Buffer}) end. +-include("cowboy_dynamic_buffer.hrl"). + +%% @todo Implement early socket error detection. +maybe_socket_error(_, _) -> + ok. + after_init(State=#state{active=true}, HandlerState, ParseState) -> %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2. %% We must do this only after calling websocket_init/1 (if any) @@ -340,7 +358,7 @@ after_init(State, HandlerState, ParseState) -> setopts_active(#state{transport=undefined}) -> ok; setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) -> @@ -384,6 +402,7 @@ before_loop(State, HandlerState, ParseState) -> -spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}. +%% @todo Do we really need this for HTTP/2? set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) -> %% Most of the time we don't need to cancel the timer since it %% will have triggered already. But this call is harmless so @@ -391,7 +410,7 @@ set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) -> %% options are changed dynamically. _ = case PrevRef of undefined -> ignore; - PrevRef -> erlang:cancel_timer(PrevRef) + PrevRef -> erlang:cancel_timer(PrevRef, [{async, true}, {info, false}]) end, case maps:get(idle_timeout, Opts, 60000) of infinity -> @@ -414,7 +433,8 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, receive %% Socket messages. (HTTP/1.1) {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); + State1 = maybe_resize_buffer(State, Data), + parse(?reset_idle_timeout(State1), HandlerState, ParseState, Data); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, HandlerState, {error, closed}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> @@ -480,12 +500,16 @@ parse_header(State=#state{opts=Opts, frag_state=FragState, extensions=Extensions websocket_close(State, HandlerState, {error, badframe}) end. -parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensions=Extensions}, +parse_payload(State=#state{opts=Opts, frag_state=FragState, utf8_state=Incomplete, extensions=Extensions}, HandlerState, ParseState=#ps_payload{ type=Type, len=Len, mask_key=MaskKey, rsv=Rsv, unmasked=Unmasked, unmasked_len=UnmaskedLen}, Data) -> + MaxFrameSize = case maps:get(max_frame_size, Opts, infinity) of + infinity -> infinity; + MaxFrameSize0 -> MaxFrameSize0 - UnmaskedLen + end, case cow_ws:parse_payload(Data, MaskKey, Incomplete, UnmaskedLen, - Type, Len, FragState, Extensions, Rsv) of + Type, Len, FragState, Extensions#{max_inflate_size => MaxFrameSize}, Rsv) of {ok, CloseCode, Payload, Utf8State, Rest} -> dispatch_frame(State#state{utf8_state=Utf8State}, HandlerState, ParseState#ps_payload{unmasked= <<Unmasked/binary, Payload/binary>>, @@ -615,14 +639,16 @@ commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_b commands(Tail, State#state{active=Active}, Data); commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) -> commands(Tail, State#state{deflate=Deflate}, Data); -commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) -> - State = case SetOpts of - #{idle_timeout := IdleTimeout} -> +commands([{set_options, SetOpts}|Tail], State0, Data) -> + State = maps:fold(fun + (idle_timeout, IdleTimeout, StateF=#state{opts=Opts}) -> %% We reset the number of ticks when changing the idle_timeout option. - set_idle_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0); - _ -> - State0 - end, + set_idle_timeout(StateF#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0); + (max_frame_size, MaxFrameSize, StateF=#state{opts=Opts}) -> + StateF#state{opts=Opts#{max_frame_size => MaxFrameSize}}; + (_, _, StateF) -> + StateF + end, State0, SetOpts), commands(Tail, State, Data); commands([{shutdown_reason, ShutdownReason}|Tail], State, Data) -> commands(Tail, State#state{shutdown_reason=ShutdownReason}, Data); diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl new file mode 100644 index 0000000..8c8ca39 --- /dev/null +++ b/src/cowboy_webtransport.erl @@ -0,0 +1,292 @@ +%% Copyright (c) Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @todo To enable WebTransport the following options need to be set: +%% +%% QUIC: +%% - max_datagram_frame_size > 0 +%% +%% HTTP/3: +%% - SETTINGS_H3_DATAGRAM = 1 +%% - SETTINGS_ENABLE_CONNECT_PROTOCOL = 1 +%% - SETTINGS_WT_MAX_SESSIONS >= 1 + +%% Cowboy supports versions 07 through 13 of the WebTransport drafts. +%% Cowboy also has some compatibility with version 02. +%% +%% WebTransport CONNECT requests go through cowboy_stream as normal +%% and then an upgrade/switch_protocol is issued (just like Websocket). +%% After that point none of the events go through cowboy_stream except +%% the final terminate event. The request process becomes the process +%% handling all events in the WebTransport session. +%% +%% WebTransport sessions can be ended via a command, via a crash or +%% exit, via the closing of the connection (client or server inititated), +%% via the client ending the session (mirroring the command) or via +%% the client terminating the CONNECT stream. +-module(cowboy_webtransport). + +-export([upgrade/4]). +-export([upgrade/5]). + +%% cowboy_stream. +-export([info/3]). +-export([terminate/3]). + +-type stream_type() :: unidi | bidi. +-type open_stream_ref() :: any(). + +-type event() :: + {stream_open, cow_http3:stream_id(), stream_type()} | + {opened_stream_id, open_stream_ref(), cow_http3:stream_id()} | + {stream_data, cow_http3:stream_id(), cow_http:fin(), binary()} | + {datagram, binary()} | + close_initiated. + +-type commands() :: [ + {open_stream, open_stream_ref(), stream_type(), iodata()} | + {close_stream, cow_http3:stream_id(), cow_http3:wt_app_error_code()} | + {send, cow_http3:stream_id() | datagram, iodata()} | + initiate_close | + close | + {close, cow_http3:wt_app_error_code()} | + {close, cow_http3:wt_app_error_code(), iodata()} +]. +-export_type([commands/0]). + +-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}. + +-callback init(Req, any()) + -> {ok | module(), Req, any()} + | {module(), Req, any(), any()} + when Req::cowboy_req:req(). + +-callback webtransport_init(State) + -> call_result(State) when State::any(). +-optional_callbacks([webtransport_init/1]). + +-callback webtransport_handle(event(), State) + -> call_result(State) when State::any(). +-optional_callbacks([webtransport_handle/2]). + +-callback webtransport_info(any(), State) + -> call_result(State) when State::any(). +-optional_callbacks([webtransport_info/2]). + +-callback terminate(any(), cowboy_req:req(), any()) -> ok. +-optional_callbacks([terminate/3]). + +-type opts() :: #{ + req_filter => fun((cowboy_req:req()) -> map()) +}. +-export_type([opts/0]). + +-record(state, { + id :: cow_http3:stream_id(), + parent :: pid(), + opts = #{} :: opts(), + handler :: module(), + hibernate = false :: boolean(), + req = #{} :: map() +}). + +%% This function mirrors a similar function for Websocket. + +-spec is_upgrade_request(cowboy_req:req()) -> boolean(). + +is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol}) + when Version =:= 'HTTP/3' -> + %% @todo scheme MUST BE "https" + <<"webtransport">> =:= cowboy_bstr:to_lower(Protocol); + +is_upgrade_request(_) -> + false. + +%% Stream process. + +-spec upgrade(Req, Env, module(), any()) + -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). + +upgrade(Req, Env, Handler, HandlerState) -> + upgrade(Req, Env, Handler, HandlerState, #{}). + +-spec upgrade(Req, Env, module(), any(), opts()) + -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). + +%% @todo Immediately crash if a response has already been sent. +upgrade(Req=#{version := 'HTTP/3', pid := Pid, streamid := StreamID}, Env, Handler, HandlerState, Opts) -> + FilteredReq = case maps:get(req_filter, Opts, undefined) of + undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req); + FilterFun -> FilterFun(Req) + end, + State = #state{id=StreamID, parent=Pid, opts=Opts, handler=Handler, req=FilteredReq}, + %% @todo Must ensure the relevant settings are enabled (QUIC and H3). + %% Either we check them BEFORE, or we check them when the handler + %% is OK to initiate a webtransport session. Probably need to + %% check them BEFORE as we need to become (takeover) the webtransport process + %% after we are done with the upgrade. Maybe in cow_http3_machine but + %% it doesn't have QUIC settings currently (max_datagram_size). + case is_upgrade_request(Req) of + true -> + Headers = cowboy_req:response_headers(#{}, Req), + Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, + #{session_pid => self()}}}, + webtransport_init(State, HandlerState); + %% Use 501 Not Implemented to mirror the recommendation in + %% by RFC9220 3 (WebSockets Upgrade over HTTP/3). + false -> + %% @todo I don't think terminate will be called. + {ok, cowboy_req:reply(501, Req), Env} + end. + +webtransport_init(State=#state{handler=Handler}, HandlerState) -> + case erlang:function_exported(Handler, webtransport_init, 1) of + true -> handler_call(State, HandlerState, webtransport_init, undefined); + false -> before_loop(State, HandlerState) + end. + +before_loop(State=#state{hibernate=true}, HandlerState) -> + proc_lib:hibernate(?MODULE, loop, [State#state{hibernate=false}, HandlerState]); +before_loop(State, HandlerState) -> + loop(State, HandlerState). + +-spec loop(#state{}, any()) -> no_return(). + +loop(State=#state{id=SessionID, parent=Parent}, HandlerState) -> + receive + {'$webtransport_event', SessionID, Event={closed, _, _}} -> + terminate_proc(State, HandlerState, Event); + {'$webtransport_event', SessionID, Event=closed_abruptly} -> + terminate_proc(State, HandlerState, Event); + {'$webtransport_event', SessionID, Event} -> + handler_call(State, HandlerState, webtransport_handle, Event); + %% Timeouts. +%% @todo idle_timeout +% {timeout, TRef, ?MODULE} -> +% tick_idle_timeout(State, HandlerState, ParseState); +% {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> +% before_loop(State, HandlerState, ParseState); + %% System messages. + {'EXIT', Parent, Reason} -> + %% @todo We should exit gracefully. + exit(Reason); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {State, HandlerState}); + %% Calls from supervisor module. + {'$gen_call', From, Call} -> + cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE), + before_loop(State, HandlerState); + Message -> + handler_call(State, HandlerState, webtransport_info, Message) + end. + +handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) -> + try case Callback of + webtransport_init -> Handler:webtransport_init(HandlerState); + _ -> Handler:Callback(Message, HandlerState) + end of + {Commands, HandlerState2} when is_list(Commands) -> + handler_call_result(State, HandlerState2, Commands); + {Commands, HandlerState2, hibernate} when is_list(Commands) -> + handler_call_result(State#state{hibernate=true}, HandlerState2, Commands) + catch Class:Reason:Stacktrace -> + %% @todo Do we need to send a close? Let cowboy_http3 detect and handle it? + handler_terminate(State, HandlerState, {crash, Class, Reason}), + erlang:raise(Class, Reason, Stacktrace) + end. + +handler_call_result(State0, HandlerState, Commands) -> + case commands(Commands, State0, ok, []) of + {ok, State} -> + before_loop(State, HandlerState); + {stop, State} -> + terminate_proc(State, HandlerState, stop) + end. + +%% We accumulate the commands that must be sent to the connection process +%% because we want to send everything into one message. Other commands are +%% processed immediately. + +commands([], State, Res, []) -> + {Res, State}; +commands([], State=#state{id=SessionID, parent=Pid}, Res, Commands) -> + Pid ! {'$webtransport_commands', SessionID, lists:reverse(Commands)}, + {Res, State}; +%% {open_stream, OpenStreamRef, StreamType, InitialData}. +commands([Command={open_stream, _, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% {close_stream, StreamID, Code}. +commands([Command={close_stream, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% @todo We must reject send to a remote unidi stream. +%% {send, StreamID | datagram, Data}. +commands([Command={send, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% {send, StreamID, IsFin, Data}. +commands([Command={send, _, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% initiate_close - DRAIN_WT_SESSION +commands([Command=initiate_close|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% close | {close, Code} | {close, Code, Msg} - CLOSE_WT_SESSION +%% @todo At this point the handler must not issue stream or send commands. +commands([Command=close|Tail], State, _, Acc) -> + commands(Tail, State, stop, [Command|Acc]); +commands([Command={close, _}|Tail], State, _, Acc) -> + commands(Tail, State, stop, [Command|Acc]); +commands([Command={close, _, _}|Tail], State, _, Acc) -> + commands(Tail, State, stop, [Command|Acc]). +%% @todo A set_options command could be useful to increase the number of allowed streams +%% or other forms of flow control. Alternatively a flow command. Or both. +%% @todo A shutdown_reason command could be useful for the same reasons as Websocekt. + +-spec terminate_proc(_, _, _) -> no_return(). + +terminate_proc(State, HandlerState, Reason) -> + handler_terminate(State, HandlerState, Reason), + %% @todo This is what should be done if shutdown_reason gets implemented. +% case Shutdown of +% normal -> exit(normal); +% _ -> exit({shutdown, Shutdown}) +% end. + exit(normal). + +handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) -> + cowboy_handler:terminate(Reason, Req, HandlerState, Handler). + +%% cowboy_stream callbacks. +%% +%% We shortcut stream handlers but still need to process some events +%% such as process exiting or termination. We implement the relevant +%% callbacks here. Note that as far as WebTransport is concerned, +%% receiving stream data here would be an error therefore the data +%% callback is not implemented. +%% +%% @todo Better type than map() for the cowboy_stream state. + +-spec info(cowboy_stream:streamid(), any(), State) + -> {cowboy_stream:commands(), State} when State::map(). + +info(StreamID, Msg, WTState=#{stream_state := StreamState0}) -> + {Commands, StreamState} = cowboy_stream:info(StreamID, Msg, StreamState0), + {Commands, WTState#{stream_state => StreamState}}. + +-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), map()) + -> any(). + +terminate(StreamID, Reason, #{stream_state := StreamState}) -> + cowboy_stream:terminate(StreamID, Reason, StreamState). |