diff options
36 files changed, 606 insertions, 102 deletions
@@ -2,7 +2,7 @@ PROJECT = cowboy PROJECT_DESCRIPTION = Small, fast, modern HTTP server. -PROJECT_VERSION = 2.13.0 +PROJECT_VERSION = 2.14.0 PROJECT_REGISTERED = cowboy_clock # Options. @@ -16,7 +16,7 @@ CT_OPTS += -ct_hooks cowboy_ct_hook [] # -boot start_sasl LOCAL_DEPS = crypto DEPS = cowlib ranch -dep_cowlib = git https://github.com/ninenines/cowlib master +dep_cowlib = git https://github.com/ninenines/cowlib 2.16.0 dep_ranch = git https://github.com/ninenines/ranch 1.8.1 ifeq ($(COWBOY_QUICER),1) @@ -44,8 +44,8 @@ define HEX_TARBALL_EXTRA_METADATA #{ licenses => [<<"ISC">>], links => #{ - <<"User guide">> => <<"https://ninenines.eu/docs/en/cowboy/2.13/guide/">>, - <<"Function reference">> => <<"https://ninenines.eu/docs/en/cowboy/2.13/manual/">>, + <<"User guide">> => <<"https://ninenines.eu/docs/en/cowboy/2.14/guide/">>, + <<"Function reference">> => <<"https://ninenines.eu/docs/en/cowboy/2.14/manual/">>, <<"GitHub">> => <<"https://github.com/ninenines/cowboy">>, <<"Sponsor">> => <<"https://github.com/sponsors/essen">> } @@ -53,7 +53,7 @@ define HEX_TARBALL_EXTRA_METADATA endef hex_req_ranch = >= 1.8.0 and < 3.0.0 -hex_req_cowlib = >= 2.14.0 and < 3.0.0 +hex_req_cowlib = >= 2.16.0 and < 3.0.0 # Standard targets. diff --git a/README.asciidoc b/README.asciidoc index 5721a7b..fc4fb71 100644 --- a/README.asciidoc +++ b/README.asciidoc @@ -18,8 +18,8 @@ Cowboy is *clean* and *well tested* Erlang code. == Online documentation -* https://ninenines.eu/docs/en/cowboy/2.13/guide[User guide] -* https://ninenines.eu/docs/en/cowboy/2.13/manual[Function reference] +* https://ninenines.eu/docs/en/cowboy/2.14/guide[User guide] +* https://ninenines.eu/docs/en/cowboy/2.14/manual[Function reference] == Offline documentation diff --git a/doc/src/guide/book.asciidoc b/doc/src/guide/book.asciidoc index 58eda34..2679f99 100644 --- a/doc/src/guide/book.asciidoc +++ b/doc/src/guide/book.asciidoc @@ -75,6 +75,8 @@ include::performance.asciidoc[Performance] = Additional information +include::migrating_from_2.13.asciidoc[Migrating from Cowboy 2.13 to 2.14] + include::migrating_from_2.12.asciidoc[Migrating from Cowboy 2.12 to 2.13] include::migrating_from_2.11.asciidoc[Migrating from Cowboy 2.11 to 2.12] diff --git a/doc/src/guide/getting_started.asciidoc b/doc/src/guide/getting_started.asciidoc index 06677ee..749b1d1 100644 --- a/doc/src/guide/getting_started.asciidoc +++ b/doc/src/guide/getting_started.asciidoc @@ -69,7 +69,7 @@ fetch and compile Cowboy, and that we will use releases: PROJECT = hello_erlang DEPS = cowboy -dep_cowboy_commit = 2.13.0 +dep_cowboy_commit = 2.14.0 REL_DEPS = relx diff --git a/doc/src/guide/migrating_from_2.13.asciidoc b/doc/src/guide/migrating_from_2.13.asciidoc new file mode 100644 index 0000000..8333eb9 --- /dev/null +++ b/doc/src/guide/migrating_from_2.13.asciidoc @@ -0,0 +1,63 @@ +[appendix] +== Migrating from Cowboy 2.13 to 2.14 + +Cowboy 2.14 adds experimental support for HTTP/3 +WebTransport based on the most recent draft. It +also has a new data delivery mechanism for HTTP/2 +and HTTP/3 Websocket, providing better performance. + +Cowboy 2.14 requires Erlang/OTP 24.0 or greater. + +=== Features added + +* The `relay` data delivery mechanism has been + added to HTTP/2 and HTTP/3 protocols. Using + this mechanism lets the Websocket protocol + bypass stream handlers to forward data from + the connection process to the Websocket + session process, as well as better manage + HTTP/2's flow control. This results in a + noticeable performance improvement. This + new mechanism can be used by all sub-protocols + built on top of HTTP/2 or HTTP/3 such as + Websocket or the upcoming HTTP/2 WebTransport. + +* The `last_modified` callback of REST handlers + now accepts `undefined` as a return value to + allow conditionally providing a timestamp. + +=== Experimental features added + +* Experimental support for HTTP/3 WebTransport + has been added, based on the most recent RFC + drafts. The implementation should also be + compatible with earlier drafts that are + currently in use by some browsers. Both + HTTP/3 and HTTP/3 WebTransport are disabled + by default; to enable, the environment + variable COWBOY_QUICER must be set at + compile-time, and a number of options must + be provided at run time, including + `enable_connect_protocol`, `h3_datagram`, + `wt_max_sessions` and for earlier drafts + `enable_webtransport`. The test suite is + the best place to get started at this time. + +=== Optimisation-related changes + +* The `dynamic_buffer` option introduced in + the previous release has been tweaked to + start at 512 bytes and have its value + changed less abruptly. This is based on + additional work done implementing the same + feature in RabbitMQ. + +* The static file handler will now use `raw` + mode to read file information to avoid a + bottleneck when querying the file server. + +=== Bugs fixed + +* It was possible for Websocket to fail to + enable active mode again after it had been + disabled. This has been fixed. diff --git a/doc/src/manual/cowboy_rest.asciidoc b/doc/src/manual/cowboy_rest.asciidoc index fcef799..4fbffd2 100644 --- a/doc/src/manual/cowboy_rest.asciidoc +++ b/doc/src/manual/cowboy_rest.asciidoc @@ -483,7 +483,7 @@ req() :: #{ ---- last_modified(Req, State) -> {Result, Req, State} -Result :: calendar:datetime() +Result :: calendar:datetime() | undefined Default - no last modified value ---- @@ -493,6 +493,10 @@ This date will be used to test against the if-modified-since and if-unmodified-since headers, and sent as the last-modified header in the response to GET and HEAD requests. +When `undefined` is returned, no last-modified header is +added to response. Can be useful if you save timestamp on store +action in memory and lose it after restart. + === malformed_request [source,erlang] @@ -856,6 +860,9 @@ listed here, like the authorization header. == Changelog +* *2.14*: The `last_modified` callback is now type correct + when returning `undefined` to avoid responding + a last-modified header. * *2.11*: The `ranges_provided`, `range_satisfiable` and the `RangeCallback` callbacks have been added. * *2.11*: The `generate_etag` callback can now return diff --git a/doc/src/manual/cowboy_websocket.asciidoc b/doc/src/manual/cowboy_websocket.asciidoc index 319dbae..f30ffca 100644 --- a/doc/src/manual/cowboy_websocket.asciidoc +++ b/doc/src/manual/cowboy_websocket.asciidoc @@ -200,14 +200,16 @@ Cowboy does it automatically for you. [source,erlang] ---- opts() :: #{ - active_n => pos_integer(), - compress => boolean(), - deflate_opts => cow_ws:deflate_opts() - dynamic_buffer => false | {pos_integer(), pos_integer()}, - idle_timeout => timeout(), - max_frame_size => non_neg_integer() | infinity, - req_filter => fun((cowboy_req:req()) -> map()), - validate_utf8 => boolean() + active_n => pos_integer(), + compress => boolean(), + data_delivery => stream_handlers | relay, + data_delivery_flow => pos_integer(), + deflate_opts => cow_ws:deflate_opts(), + dynamic_buffer => false | {pos_integer(), pos_integer()}, + idle_timeout => timeout(), + max_frame_size => non_neg_integer() | infinity, + req_filter => fun((cowboy_req:req()) -> map()), + validate_utf8 => boolean() } ---- @@ -241,6 +243,22 @@ Whether to enable the Websocket frame compression extension. Frames will only be compressed for the clients that support this extension. +data_delivery (stream_handlers):: + +HTTP/2+ only. Determines how data will be delivered +to the Websocket session process. `stream_handlers` +is the default and makes data go through stream +handlers. `relay` is a faster method introduced in +Cowboy 2.14 and sends data directly. `relay` is +intended to become the default in Cowboy 3.0. + +data_delivery_flow (pos_integer()):: + +When the `relay` data delivery method is used, +this value may be used to decide how much the +flow control window should be for the Websocket +stream. Currently only applies to HTTP/2. + deflate_opts (#{}):: Configuration for the permessage-deflate Websocket @@ -299,6 +317,10 @@ normal circumstances if necessary. == Changelog +* *2.14*: The `data_delivery` and `data_delivery_flow` options + were added. The `relay` data delivery mechanism + provides a better way of forwarding data to + HTTP/2+ Websocket session processes. * *2.13*: The `active_n` default value was changed to `1`. * *2.13*: The `dynamic_buffer` option was added. * *2.13*: The `max_frame_size` option can now be set dynamically. diff --git a/ebin/cowboy.app b/ebin/cowboy.app index 39be200..b797517 100644 --- a/ebin/cowboy.app +++ b/ebin/cowboy.app @@ -1,6 +1,6 @@ {application, 'cowboy', [ {description, "Small, fast, modern HTTP server."}, - {vsn, "2.13.0"}, + {vsn, "2.14.0"}, {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_decompress_h','cowboy_handler','cowboy_http','cowboy_http2','cowboy_http3','cowboy_loop','cowboy_metrics_h','cowboy_middleware','cowboy_quicer','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_tracer_h','cowboy_websocket','cowboy_webtransport']}, {registered, [cowboy_sup,cowboy_clock]}, {applications, [kernel,stdlib,crypto,cowlib,ranch]}, diff --git a/rebar.config b/rebar.config index 146f88f..22040c9 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{cowlib,".*",{git,"https://github.com/ninenines/cowlib",{branch,"master"}}},{ranch,".*",{git,"https://github.com/ninenines/ranch",{tag,"1.8.1"}}} +{cowlib,".*",{git,"https://github.com/ninenines/cowlib",{tag,"2.16.0"}}},{ranch,".*",{git,"https://github.com/ninenines/ranch",{tag,"1.8.1"}}} ]}. {erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard,warn_missing_spec,warn_untyped_record]}. diff --git a/src/cowboy.erl b/src/cowboy.erl index 6a5634e..03580c8 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -172,7 +172,7 @@ ensure_dynamic_buffer(TransOpts, #{dynamic_buffer := DynamicBuffer}) -> ensure_dynamic_buffer(TransOpts=#{socket_opts := SocketOpts}, _) -> case proplists:get_value(buffer, SocketOpts, undefined) of undefined -> - {TransOpts#{socket_opts => [{buffer, 1024}|SocketOpts]}, {1024, 131072}}; + {TransOpts#{socket_opts => [{buffer, 512}|SocketOpts]}, {512, 131072}}; _ -> {TransOpts, false} end. diff --git a/src/cowboy_dynamic_buffer.hrl b/src/cowboy_dynamic_buffer.hrl index 4d05e50..f6a86f0 100644 --- a/src/cowboy_dynamic_buffer.hrl +++ b/src/cowboy_dynamic_buffer.hrl @@ -65,7 +65,7 @@ maybe_resize_buffer(State=#state{transport=Transport, socket=Socket, opts=#{dynamic_buffer := {LowDynamicBuffer, HighDynamicBuffer}}, dynamic_buffer_size=BufferSize0, dynamic_buffer_moving_average=MovingAvg0}, Data) -> DataLen = byte_size(Data), - MovingAvg = (MovingAvg0 + DataLen) div 2, + MovingAvg = (MovingAvg0 * 7 + DataLen) / 8, if BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 -> BufferSize = min(BufferSize0 * 2, HighDynamicBuffer), diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 10eb519..a2cf6c9 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -146,7 +146,7 @@ %% Dynamic buffer moving average and current buffer size. dynamic_buffer_size :: pos_integer() | false, - dynamic_buffer_moving_average :: non_neg_integer(), + dynamic_buffer_moving_average :: float(), %% Identifier for the stream currently being written. %% Note that out_streamid =< in_streamid. @@ -193,7 +193,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, dynamic_buffer_size=init_dynamic_buffer_size(Opts), - dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0.0), last_streamid=maps:get(max_keepalive, Opts, 1000)}, safe_setopts_active(State), before_loop(set_timeout(State, request_timeout)). diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 0d22fa1..ef4bb98 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -82,8 +82,13 @@ -export_type([opts/0]). -record(stream, { - %% Whether the stream is currently stopping. - status = running :: running | stopping, + %% Whether the stream is currently in a special state. + %% + %% - The running state is the normal state of a stream. + %% - The relaying state is used by extended CONNECT protocols to + %% use a 'relay' data_delivery method. + %% - The stopping state indicates the stream used the 'stop' command. + status = running :: running | {relaying, non_neg_integer(), pid()} | stopping, %% Flow requested for this stream. flow = 0 :: non_neg_integer(), @@ -142,7 +147,7 @@ %% Dynamic buffer moving average and current buffer size. dynamic_buffer_size :: pos_integer() | false, - dynamic_buffer_moving_average :: non_neg_integer(), + dynamic_buffer_moving_average :: float(), %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. @@ -190,7 +195,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, dynamic_buffer_size=DynamicBuffer, - dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0.0), http2_status=sequence, http2_machine=HTTP2Machine}), 0), safe_setopts_active(State), case Buffer of @@ -241,7 +246,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, dynamic_buffer_size=DynamicBuffer, - dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0.0), http2_status=upgrade, http2_machine=HTTP2Machine}, State1 = headers_frame(State0#state{ http2_machine=HTTP2Machine}, StreamID, Req), @@ -327,6 +332,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> before_loop(info(State, StreamID, Msg), Buffer); + {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() -> + before_loop(relay_command(State, StreamID, RelayCommand), Buffer); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> before_loop(down(State, Pid, Msg), Buffer); @@ -520,6 +527,14 @@ data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFi reset_stream(State0, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:data/4.'}) end; + %% Stream handlers are not used for the data when relaying. + #{StreamID := #stream{status={relaying, _, RelayPid}}} -> + RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data}, + %% We keep a steady flow using the configured flow value. + %% Because we do not change the 'flow' value the update_window/2 + %% function will always maintain this value (of course with + %% thresholds applying). + update_window(State0, StreamID); %% We ignore DATA frames for streams that are stopping. #{} -> State0 @@ -866,6 +881,26 @@ commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade}, commands(State, StreamID, Tail); %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. +%% There are two data_delivery: stream_handlers and relay. +%% The former just has the data go through stream handlers +%% like normal requests. The latter relays data directly. +%% +%% @todo When relaying there might be some data that is +%% in stream handlers and that need to be received, +%% depending on whether the protocol sends data +%% before processing the response. +commands(State0=#state{flow=Flow, streams=Streams}, StreamID, + [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) -> + State1 = info(State0, StreamID, {headers, 200, Headers}), + #{StreamID := Stream} = Streams, + #{data_delivery_pid := RelayPid} = ModState, + %% WINDOW_UPDATE frames updating the window will be sent after + %% the first DATA frame has been received. + RelayFlow = maps:get(data_delivery_flow, ModState, 131072), + State = State1#state{flow=Flow + RelayFlow, streams=Streams#{StreamID => Stream#stream{ + status={relaying, RelayFlow, RelayPid}, + flow=RelayFlow}}}, + commands(State, StreamID, Tail); commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> State = info(State0, StreamID, {headers, 200, Headers}), commands(State, StreamID, Tail); @@ -881,6 +916,26 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> cowboy:log(Log, Opts), commands(State, StreamID, Tail). +%% Relay data delivery commands. + +relay_command(State, StreamID, DataCmd = {data, _, _}) -> + commands(State, StreamID, [DataCmd]); +%% When going active mode again we set the RelayFlow again +%% and update the window if necessary. +relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, active) -> + #{StreamID := Stream} = Streams, + #stream{status={relaying, RelayFlow, _}} = Stream, + update_window(State#state{flow=Flow + RelayFlow, + streams=Streams#{StreamID => Stream#stream{flow=RelayFlow}}}, + StreamID); +%% When going passive mode we don't update the window +%% since we have not incremented it. +relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, passive) -> + #{StreamID := Stream} = Streams, + #stream{flow=StreamFlow} = Stream, + State#state{flow=Flow - StreamFlow, + streams=Streams#{StreamID => Stream#stream{flow=0}}}. + %% Tentatively update the window after the flow was updated. update_window(State0=#state{socket=Socket, transport=Transport, diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl index 9aa6be5..740b9e3 100644 --- a/src/cowboy_http3.erl +++ b/src/cowboy_http3.erl @@ -67,6 +67,7 @@ %% Whether the stream is currently in a special state. status :: header | {unidi, control | encoder | decoder} | normal | {data | ignore, non_neg_integer()} | stopping + | {relaying, normal | {data, non_neg_integer()}, pid()} | {webtransport_session, normal | {ignore, non_neg_integer()}} | {webtransport_stream, cow_http3:stream_id()}, @@ -164,6 +165,8 @@ loop(State0=#state{opts=Opts, children=Children}) -> %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> loop(info(State0, StreamID, Msg)); + {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() -> + loop(relay_command(State0, StreamID, RelayCommand)); %% WebTransport commands. {'$webtransport_commands', SessionID, Commands} -> loop(webtransport_commands(State0, SessionID, Commands)); @@ -299,6 +302,22 @@ parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) -> parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin), StreamID, Rest, IsFin) end; +%% This clause mirrors the {data, Len} clause. +parse1(State, Stream=#stream{status={relaying, {data, Len}, RelayPid}, id=StreamID}, + Data, IsFin) -> + DataLen = byte_size(Data), + if + DataLen < Len -> + %% We don't have the full frame but this is the end of the + %% data we have. So FrameIsFin is equivalent to IsFin here. + loop(frame(State, Stream#stream{status={relaying, {data, Len - DataLen}, RelayPid}}, + {data, Data}, IsFin)); + true -> + <<Data1:Len/binary, Rest/bits>> = Data, + FrameIsFin = is_fin(IsFin, Rest), + parse(frame(State, Stream#stream{status={relaying, normal, RelayPid}}, + {data, Data1}, FrameIsFin), StreamID, Rest, IsFin) + end; parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) -> DataLen = byte_size(Data), if @@ -311,7 +330,7 @@ parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) -> end; %% @todo Clause that discards receiving data for stopping streams. %% We may receive a few more frames after we abort receiving. -parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) -> +parse1(State=#state{opts=Opts}, Stream=#stream{status=Status0, id=StreamID}, Data, IsFin) -> case cow_http3:parse(Data) of {ok, Frame, Rest} -> FrameIsFin = is_fin(IsFin, Rest), @@ -322,6 +341,10 @@ parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) -> {more, Frame = {data, _}, Len} -> %% We're at the end of the data so FrameIsFin is equivalent to IsFin. case IsFin of + nofin when element(1, Status0) =:= relaying -> + %% The stream will be stored at the end of processing commands. + Status = setelement(2, Status0, {data, Len}), + loop(frame(State, Stream#stream{status=Status}, Frame, nofin)); nofin -> %% The stream will be stored at the end of processing commands. loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin)); @@ -432,6 +455,9 @@ frame(State=#state{http3_machine=HTTP3Machine0}, terminate(State#state{http3_machine=HTTP3Machine}, Error) end. +data_frame(State, Stream=#stream{status={relaying, _, RelayPid}, id=StreamID}, IsFin, Data) -> + RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data}, + stream_store(State, Stream); data_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of @@ -767,6 +793,18 @@ commands(State0, Stream0=#stream{id=StreamID}, }, %% @todo We must propagate the buffer to capsule handling if any. commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail); +%% There are two data_delivery: stream_handlers and relay. +%% The former just has the data go through stream handlers +%% like normal requests. The latter relays data directly. +commands(State0, Stream0=#stream{id=StreamID}, + [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) -> + State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), + Stream1 = #stream{status=normal} = stream_get(State, StreamID), + #{data_delivery_pid := RelayPid} = ModState, + %% We do not set data_delivery_flow because it is managed by quicer + %% and we do not have an easy way to modify it. + Stream = Stream1#stream{status={relaying, normal, RelayPid}}, + commands(State, Stream, Tail); commands(State0, Stream0=#stream{id=StreamID}, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), @@ -872,6 +910,20 @@ send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID}, cowboy_quicer:send(Conn, EncoderID, EncData)), State. +%% Relay data delivery commands. + +relay_command(State, StreamID, DataCmd = {data, _, _}) -> + Stream = stream_get(State, StreamID), + commands(State, Stream, [DataCmd]); +relay_command(State=#state{conn=Conn}, StreamID, active) -> + ok = maybe_socket_error(State, + cowboy_quicer:setopt(Conn, StreamID, active, true)), + State; +relay_command(State=#state{conn=Conn}, StreamID, passive) -> + ok = maybe_socket_error(State, + cowboy_quicer:setopt(Conn, StreamID, active, false)), + State. + %% We mark the stream as being a WebTransport stream %% and then continue parsing the data as a WebTransport %% stream. This function is common for incoming unidi diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl index aa52fae..d2f5433 100644 --- a/src/cowboy_quicer.erl +++ b/src/cowboy_quicer.erl @@ -25,6 +25,7 @@ %% Streams. -export([start_bidi_stream/2]). -export([start_unidi_stream/2]). +-export([setopt/4]). -export([send/3]). -export([send/4]). -export([send_datagram/2]). @@ -54,6 +55,9 @@ start_bidi_stream(_, _) -> no_quicer(). -spec start_unidi_stream(_, _) -> no_return(). start_unidi_stream(_, _) -> no_quicer(). +-spec setopt(_, _, _, _) -> no_return(). +setopt(_, _, _, _) -> no_quicer(). + -spec send(_, _, _) -> no_return(). send(_, _, _) -> no_quicer(). @@ -109,7 +113,7 @@ sockname(Conn) -> | {error, any()}. peercert(Conn) -> - quicer_nif:peercert(Conn). + quicer_nif:peercert(Conn). -spec shutdown(quicer_connection_handle(), quicer_app_errno()) -> ok | {error, any()}. @@ -154,6 +158,13 @@ start_stream(Conn, InitialData, OpenFlag) -> Error end. +-spec setopt(quicer_connection_handle(), cow_http3:stream_id(), active, boolean()) + -> ok | {error, any()}. + +setopt(_Conn, StreamID, active, Value) -> + StreamRef = get({quicer_stream, StreamID}), + quicer:setopt(StreamRef, active, Value). + -spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata()) -> ok | {error, any()}. diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl index 9f30fcf..7629b3e 100644 --- a/src/cowboy_rest.erl +++ b/src/cowboy_rest.erl @@ -130,7 +130,7 @@ -optional_callbacks([languages_provided/2]). -callback last_modified(Req, State) - -> {calendar:datetime(), Req, State} + -> {calendar:datetime() | undefined, Req, State} when Req::cowboy_req:req(), State::any(). -optional_callbacks([last_modified/2]). @@ -1531,6 +1531,11 @@ last_modified(Req, State=#state{last_modified=undefined}) -> case unsafe_call(Req, State, last_modified) of no_call -> {undefined, Req, State#state{last_modified=no_call}}; + %% We allow the callback to return 'undefined', + %% in which case the generated header would be missing + %% as if the callback was not called. + {undefined, Req2, State2} -> + {undefined, Req2, State2#state{last_modified=no_call}}; {LastModified, Req2, State2} -> {LastModified, Req2, State2#state{last_modified=LastModified}} end; diff --git a/src/cowboy_static.erl b/src/cowboy_static.erl index ce34b01..be3906b 100644 --- a/src/cowboy_static.erl +++ b/src/cowboy_static.erl @@ -183,12 +183,12 @@ init_info(Req, Path, HowToAccess, Extra) -> {cowboy_rest, Req, {Path, Info, Extra}}. read_file_info(Path, direct) -> - case file:read_file_info(Path, [{time, universal}]) of + case file:read_file_info(Path, [raw, {time, universal}]) of {ok, Info} -> {direct, Info}; Error -> Error end; read_file_info(Path, {archive, Archive}) -> - case file:read_file_info(Archive, [{time, universal}]) of + case file:read_file_info(Archive, [raw, {time, universal}]) of {ok, ArchiveInfo} -> %% The Erlang application archive is fine. %% Now check if the requested file is in that diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index cb30c3f..b66a414 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -68,6 +68,8 @@ -type opts() :: #{ active_n => pos_integer(), compress => boolean(), + data_delivery => stream_handlers | relay, + data_delivery_flow => pos_integer(), deflate_opts => cow_ws:deflate_opts(), dynamic_buffer => false | {pos_integer(), pos_integer()}, dynamic_buffer_initial_average => non_neg_integer(), @@ -91,7 +93,7 @@ parent :: undefined | pid(), ref :: ranch:ref(), socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined, - transport = undefined :: module() | undefined, + transport :: module() | {data_delivery, stream_handlers | relay}, opts = #{} :: opts(), active = true :: boolean(), handler :: module(), @@ -103,7 +105,7 @@ %% Dynamic buffer moving average and current buffer size. dynamic_buffer_size = false :: pos_integer() | false, - dynamic_buffer_moving_average = 0 :: non_neg_integer(), + dynamic_buffer_moving_average = 0.0 :: float(), hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), @@ -149,7 +151,7 @@ upgrade(Req, Env, Handler, HandlerState) -> %% @todo Immediately crash if a response has already been sent. upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) -> FilteredReq = case maps:get(req_filter, Opts, undefined) of - undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0); + undefined -> maps:with([method, version, scheme, host, port, path, qs, peer, streamid], Req0); FilterFun -> FilterFun(Req0) end, Utf8State = case maps:get(validate_utf8, Opts, true) of @@ -273,12 +275,27 @@ websocket_handshake(State=#state{key=Key}, %% For HTTP/2 we do not let the process die, we instead keep it %% for the Websocket stream. This is because in HTTP/2 we only %% have a stream, it doesn't take over the whole connection. -websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, +%% +%% There are two methods of delivering data to the Websocket session: +%% - 'stream_handlers' is the default and makes the data go +%% through stream handlers just like when reading a request body; +%% - 'relay' is a new method where data is sent as a message as +%% soon as it is received from the socket in a DATA frame. +websocket_handshake(State=#state{opts=Opts}, + Req=#{ref := Ref, pid := Pid, streamid := StreamID}, HandlerState, _Env) -> %% @todo We don't want date and server headers. Headers = cowboy_req:response_headers(#{}, Req), - Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}}, - takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>, + DataDelivery = maps:get(data_delivery, Opts, stream_handlers), + ModState = #{ + data_delivery => DataDelivery, + %% For relay data_delivery. The flow is a hint and may + %% not be used by the underlying protocol. + data_delivery_pid => self(), + data_delivery_flow => maps:get(data_delivery_flow, Opts, 131072) + }, + Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, ModState}}, + takeover(Pid, Ref, {Pid, StreamID}, {data_delivery, DataDelivery}, #{}, <<>>, {State, HandlerState}). %% Connection process. @@ -301,7 +318,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, -type parse_state() :: #ps_header{} | #ps_payload{}. -spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()}, - module() | undefined, any(), binary(), + module() | {data_delivery, stream_handlers | relay}, any(), binary(), {#state{}, any()}) -> no_return(). takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) -> @@ -311,7 +328,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, _ -> ranch:remove_connection(Ref) end, Messages = case Transport of - undefined -> undefined; + {data_delivery, _} -> undefined; _ -> Transport:messages() end, State = set_idle_timeout(State0#state{parent=Parent, @@ -320,7 +337,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, key=undefined, messages=Messages, %% Dynamic buffer only applies to HTTP/1.1 Websocket. dynamic_buffer_size=init_dynamic_buffer_size(Opts), - dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0.0)}, 0), %% We call parse_header/3 immediately because there might be %% some data in the buffer that was sent along with the handshake. %% While it is not allowed by the protocol to send frames immediately, @@ -355,13 +372,14 @@ after_init(State, HandlerState, ParseState) -> %% immediately but there might still be data to be processed in %% the message queue. -setopts_active(#state{transport=undefined}) -> +setopts_active(#state{transport={data_delivery, _}}) -> ok; setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). -maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) -> +maybe_read_body(#state{transport={data_delivery, stream_handlers}, + socket=Stream={Pid, _}, active=true}) -> %% @todo Keep Ref around. ReadBodyRef = make_ref(), Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}}, @@ -369,16 +387,25 @@ maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true} maybe_read_body(_) -> ok. -active(State) -> +active(State=#state{transport={data_delivery, relay}, + socket=Stream={Pid, _}}) -> + Pid ! {'$cowboy_relay_command', Stream, active}, + State#state{active=true}; +active(State0) -> + State = State0#state{active=true}, setopts_active(State), maybe_read_body(State), - State#state{active=true}. + State. -passive(State=#state{transport=undefined}) -> +passive(State=#state{transport={data_delivery, stream_handlers}}) -> %% Unfortunately we cannot currently cancel read_body. %% But that's OK, we will just stop reading the body %% after the next message. State#state{active=false}; +passive(State=#state{transport={data_delivery, relay}, + socket=Stream={Pid, _}}) -> + Pid ! {'$cowboy_relay_command', Stream, passive}, + State#state{active=false}; passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) -> Transport:setopts(Socket, [{active, false}]), flush_passive(Socket, Messages), @@ -454,6 +481,10 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, {request_body, _Ref, fin, _, Data} -> maybe_read_body(State), parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); + %% @todo It would be better to check StreamID. + %% @todo We must ensure that IsFin=fin is handled like a socket close? + {'$cowboy_relay_data', {Pid, _StreamID}, _IsFin, Data} when Pid =:= Parent -> + parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); %% Timeouts. {timeout, TRef, ?MODULE} -> tick_idle_timeout(State, HandlerState, ParseState); @@ -662,9 +693,14 @@ commands([Frame|Tail], State, Data0) -> commands(Tail, State, Data) end. -transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) -> +transport_send(#state{transport={data_delivery, stream_handlers}, + socket=Stream={Pid, _}}, IsFin, Data) -> Pid ! {Stream, {data, IsFin, Data}}, ok; +transport_send(#state{transport={data_delivery, relay}, + socket=Stream={Pid, _}}, IsFin, Data) -> + Pid ! {'$cowboy_relay_command', Stream, {data, IsFin, Data}}, + ok; transport_send(#state{socket=Socket, transport=Transport}, _, Data) -> Transport:send(Socket, Data). diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl index 8c8ca39..7bcac55 100644 --- a/src/cowboy_webtransport.erl +++ b/src/cowboy_webtransport.erl @@ -277,6 +277,7 @@ handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) -> %% callback is not implemented. %% %% @todo Better type than map() for the cowboy_stream state. +%% @todo Is this really useful? -spec info(cowboy_stream:streamid(), any(), State) -> {cowboy_stream:commands(), State} when State::map(). diff --git a/test/handlers/last_modified_h.erl b/test/handlers/last_modified_h.erl index 82893b3..1b109e3 100644 --- a/test/handlers/last_modified_h.erl +++ b/test/handlers/last_modified_h.erl @@ -19,6 +19,8 @@ get_text_plain(Req, State) -> last_modified(Req=#{qs := <<"tuple">>}, State) -> {{{2012, 9, 21}, {22, 36, 14}}, Req, State}; +last_modified(Req=#{qs := <<"undefined">>}, State) -> + {undefined, Req, State}; %% Simulate the callback being missing in other cases. last_modified(#{qs := <<"missing">>}, _) -> no_call. diff --git a/test/handlers/ws_active_commands_h.erl b/test/handlers/ws_active_commands_h.erl index 1c615e3..bbf8907 100644 --- a/test/handlers/ws_active_commands_h.erl +++ b/test/handlers/ws_active_commands_h.erl @@ -9,8 +9,8 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> - {cowboy_websocket, Req, RunOrHibernate}. +init(Req, Opts) -> + {cowboy_websocket, Req, maps:get(run_or_hibernate, Opts), Opts}. websocket_init(State=run) -> erlang:send_after(1500, self(), active_true), diff --git a/test/handlers/ws_deflate_commands_h.erl b/test/handlers/ws_deflate_commands_h.erl index 14236bc..be3e16e 100644 --- a/test/handlers/ws_deflate_commands_h.erl +++ b/test/handlers/ws_deflate_commands_h.erl @@ -8,10 +8,11 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> +init(Req, Opts) -> + DataDelivery = maps:get(data_delivery, Opts, stream_handlers), {cowboy_websocket, Req, - #{deflate => true, hibernate => RunOrHibernate}, - #{compress => true}}. + #{deflate => true, hibernate => maps:get(run_or_hibernate, Opts)}, + #{compress => true, data_delivery => DataDelivery}}. websocket_handle(Frame, State=#{deflate := Deflate0, hibernate := run}) -> Deflate = not Deflate0, diff --git a/test/handlers/ws_handle_commands_h.erl b/test/handlers/ws_handle_commands_h.erl index da3ffad..0a8513b 100644 --- a/test/handlers/ws_handle_commands_h.erl +++ b/test/handlers/ws_handle_commands_h.erl @@ -9,14 +9,20 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req=#{pid := Pid}, RunOrHibernate) -> +init(Req, Opts) -> Commands0 = cowboy_req:header(<<"x-commands">>, Req), Commands = binary_to_term(base64:decode(Commands0)), case Commands of - bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); - _ -> ok + bad -> + Pid = case Req of + #{version := 'HTTP/2'} -> self(); + #{pid := Pid0} -> Pid0 + end, + ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> + ok end, - {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State) -> {[], State}. diff --git a/test/handlers/ws_ignore.erl b/test/handlers/ws_ignore.erl index 9fe3322..37595aa 100644 --- a/test/handlers/ws_ignore.erl +++ b/test/handlers/ws_ignore.erl @@ -8,6 +8,7 @@ init(Req, _) -> {cowboy_websocket, Req, undefined, #{ + data_delivery => relay, compress => true }}. diff --git a/test/handlers/ws_info_commands_h.erl b/test/handlers/ws_info_commands_h.erl index d596473..2115727 100644 --- a/test/handlers/ws_info_commands_h.erl +++ b/test/handlers/ws_info_commands_h.erl @@ -10,14 +10,20 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req=#{pid := Pid}, RunOrHibernate) -> +init(Req, Opts) -> Commands0 = cowboy_req:header(<<"x-commands">>, Req), Commands = binary_to_term(base64:decode(Commands0)), case Commands of - bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); - _ -> ok + bad -> + Pid = case Req of + #{version := 'HTTP/2'} -> self(); + #{pid := Pid0} -> Pid0 + end, + ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> + ok end, - {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State) -> self() ! shoot, diff --git a/test/handlers/ws_init_commands_h.erl b/test/handlers/ws_init_commands_h.erl index 8bae352..d821b18 100644 --- a/test/handlers/ws_init_commands_h.erl +++ b/test/handlers/ws_init_commands_h.erl @@ -9,14 +9,20 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req=#{pid := Pid}, RunOrHibernate) -> +init(Req, Opts) -> Commands0 = cowboy_req:header(<<"x-commands">>, Req), Commands = binary_to_term(base64:decode(Commands0)), case Commands of - bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); - _ -> ok + bad -> + Pid = case Req of + #{version := 'HTTP/2'} -> self(); + #{pid := Pid0} -> Pid0 + end, + ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> + ok end, - {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + {cowboy_websocket, Req, {Commands, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State={Commands, run}) -> {Commands, State}; diff --git a/test/handlers/ws_init_h.erl b/test/handlers/ws_init_h.erl index bbe9ef9..c44986f 100644 --- a/test/handlers/ws_init_h.erl +++ b/test/handlers/ws_init_h.erl @@ -8,9 +8,9 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, _) -> +init(Req, Opts) -> State = binary_to_atom(cowboy_req:qs(Req), latin1), - {cowboy_websocket, Req, State}. + {cowboy_websocket, Req, State, Opts}. %% Sleep to make sure the HTTP response was sent. websocket_init(State) -> diff --git a/test/handlers/ws_set_options_commands_h.erl b/test/handlers/ws_set_options_commands_h.erl index 1ab0af4..af768d6 100644 --- a/test/handlers/ws_set_options_commands_h.erl +++ b/test/handlers/ws_set_options_commands_h.erl @@ -7,9 +7,10 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> - {cowboy_websocket, Req, RunOrHibernate, - #{idle_timeout => infinity}}. +init(Req, Opts) -> + DataDelivery = maps:get(data_delivery, Opts, stream_handlers), + {cowboy_websocket, Req, maps:get(run_or_hibernate, Opts), + #{idle_timeout => infinity, data_delivery => DataDelivery}}. %% Set the idle_timeout option dynamically. websocket_handle({text, <<"idle_timeout_short">>}, State=run) -> diff --git a/test/handlers/ws_shutdown_reason_commands_h.erl b/test/handlers/ws_shutdown_reason_commands_h.erl index 90b435c..878b70a 100644 --- a/test/handlers/ws_shutdown_reason_commands_h.erl +++ b/test/handlers/ws_shutdown_reason_commands_h.erl @@ -10,9 +10,9 @@ -export([websocket_handle/2]). -export([websocket_info/2]). -init(Req, RunOrHibernate) -> +init(Req, Opts) -> TestPid = list_to_pid(binary_to_list(cowboy_req:header(<<"x-test-pid">>, Req))), - {cowboy_websocket, Req, {TestPid, RunOrHibernate}}. + {cowboy_websocket, Req, {TestPid, maps:get(run_or_hibernate, Opts)}, Opts}. websocket_init(State={TestPid, RunOrHibernate}) -> TestPid ! {ws_pid, self()}, diff --git a/test/rest_handler_SUITE.erl b/test/rest_handler_SUITE.erl index a3d9533..7a9566f 100644 --- a/test/rest_handler_SUITE.erl +++ b/test/rest_handler_SUITE.erl @@ -796,6 +796,16 @@ last_modified_missing(Config) -> false = lists:keyfind(<<"last-modified">>, 1, Headers), ok. +last_modified_undefined(Config) -> + doc("The last-modified header must not be sent when the callback returns undefined."), + ConnPid = gun_open(Config), + Ref = gun:get(ConnPid, "/last_modified?undefined", [ + {<<"accept-encoding">>, <<"gzip">>} + ]), + {response, _, 200, Headers} = gun:await(ConnPid, Ref), + false = lists:keyfind(<<"last-modified">>, 1, Headers), + ok. + options_missing(Config) -> doc("A successful OPTIONS request to a simple handler results in " "a 200 OK response with the allow header set. (RFC7231 4.3.7)"), diff --git a/test/rfc7231_SUITE.erl b/test/rfc7231_SUITE.erl index 183fa0f..a42ca3f 100644 --- a/test/rfc7231_SUITE.erl +++ b/test/rfc7231_SUITE.erl @@ -44,7 +44,7 @@ init_dispatch(_) -> {"/echo/:key", echo_h, []}, {"/delay/echo/:key", echo_h, []}, {"/resp/:key[/:arg]", resp_h, []}, - {"/ws", ws_init_h, []} + {"/ws", ws_init_h, #{}} ]}]). %% @todo The documentation should list what methods, headers and status codes diff --git a/test/rfc8441_SUITE.erl b/test/rfc8441_SUITE.erl index b788f9f..efa753c 100644 --- a/test/rfc8441_SUITE.erl +++ b/test/rfc8441_SUITE.erl @@ -396,6 +396,9 @@ accept_handshake_when_enabled(Config) -> MaskedHello = ws_SUITE:do_mask(<<"Hello">>, Mask, <<>>), ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedHello/binary>>)), + %% Ignore expected WINDOW_UPDATE frames. + {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000), + {ok, <<4:24, 8:8, _:72>>} = gen_tcp:recv(Socket, 13, 1000), {ok, <<Len2:24, _:8, _:8, _:32>>} = gen_tcp:recv(Socket, 9, 1000), {ok, <<1:1, 0:3, 1:4, 0:1, 5:7, "Hello">>} = gen_tcp:recv(Socket, Len2, 1000), ok. diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index 6fa4e61..2f1d3e2 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -42,7 +42,7 @@ init_dispatch() -> {"localhost", [ {"/ws_echo", ws_echo, []}, {"/ws_echo_timer", ws_echo_timer, []}, - {"/ws_init", ws_init_h, []}, + {"/ws_init", ws_init_h, #{}}, {"/ws_init_shutdown", ws_init_shutdown, []}, {"/ws_send_many", ws_send_many, [ {sequence, [ diff --git a/test/ws_SUITE_data/ws_echo.erl b/test/ws_SUITE_data/ws_echo.erl index efdc204..1f3a4be 100644 --- a/test/ws_SUITE_data/ws_echo.erl +++ b/test/ws_SUITE_data/ws_echo.erl @@ -8,6 +8,7 @@ init(Req, _) -> {cowboy_websocket, Req, undefined, #{ + data_delivery => relay, compress => true }}. diff --git a/test/ws_handler_SUITE.erl b/test/ws_handler_SUITE.erl index ab9dbe2..00c584d 100644 --- a/test/ws_handler_SUITE.erl +++ b/test/ws_handler_SUITE.erl @@ -19,21 +19,40 @@ -import(ct_helper, [config/2]). -import(ct_helper, [doc/1]). -import(cowboy_test, [gun_open/1]). +-import(cowboy_test, [gun_open/2]). -import(cowboy_test, [gun_down/1]). %% ct. all() -> - [{group, ws}, {group, ws_hibernate}]. + [ + {group, h1}, + {group, h1_hibernate}, + {group, h2}, + {group, h2_relay} + ]. %% @todo Test against HTTP/2 too. groups() -> AllTests = ct_helper:all(?MODULE), - [{ws, [parallel], AllTests}, {ws_hibernate, [parallel], AllTests}]. - -init_per_group(Name, Config) -> + [ + {h1, [parallel], AllTests}, + {h1_hibernate, [parallel], AllTests}, + %% The websocket_deflate_false test isn't compatible with HTTP/2. + {h2, [parallel], AllTests -- [websocket_deflate_false]}, + {h2_relay, [parallel], AllTests -- [websocket_deflate_false]} + ]. + +init_per_group(Name, Config) + when Name =:= h1; Name =:= h1_hibernate -> cowboy_test:init_http(Name, #{ env => #{dispatch => init_dispatch(Name)} + }, Config); +init_per_group(Name, Config) + when Name =:= h2; Name =:= h2_relay -> + cowboy_test:init_http2(Name, #{ + enable_connect_protocol => true, + env => #{dispatch => init_dispatch(Name)} }, Config). end_per_group(Name, _) -> @@ -42,25 +61,27 @@ end_per_group(Name, _) -> %% Dispatch configuration. init_dispatch(Name) -> - RunOrHibernate = case Name of - ws -> run; - ws_hibernate -> hibernate + InitialState = case Name of + h1_hibernate -> #{run_or_hibernate => hibernate}; + h2_relay -> #{run_or_hibernate => run, data_delivery => relay}; + _ -> #{run_or_hibernate => run} end, cowboy_router:compile([{'_', [ - {"/init", ws_init_commands_h, RunOrHibernate}, - {"/handle", ws_handle_commands_h, RunOrHibernate}, - {"/info", ws_info_commands_h, RunOrHibernate}, - {"/trap_exit", ws_init_h, RunOrHibernate}, - {"/active", ws_active_commands_h, RunOrHibernate}, - {"/deflate", ws_deflate_commands_h, RunOrHibernate}, - {"/set_options", ws_set_options_commands_h, RunOrHibernate}, - {"/shutdown_reason", ws_shutdown_reason_commands_h, RunOrHibernate} + {"/init", ws_init_commands_h, InitialState}, + {"/handle", ws_handle_commands_h, InitialState}, + {"/info", ws_info_commands_h, InitialState}, + {"/trap_exit", ws_init_h, InitialState}, + {"/active", ws_active_commands_h, InitialState}, + {"/deflate", ws_deflate_commands_h, InitialState}, + {"/set_options", ws_set_options_commands_h, InitialState}, + {"/shutdown_reason", ws_shutdown_reason_commands_h, InitialState} ]}]). %% Support functions for testing using Gun. gun_open_ws(Config, Path, Commands) -> - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, Path, [ {<<"x-commands">>, base64:encode(term_to_binary(Commands))} ]), @@ -75,6 +96,13 @@ gun_open_ws(Config, Path, Commands) -> error(timeout) end. +do_await_enable_connect_protocol(http, _) -> + ok; +do_await_enable_connect_protocol(http2, ConnPid) -> + {notify, settings_changed, #{enable_connect_protocol := true}} + = gun:await(ConnPid, undefined), %% @todo Maybe have a gun:await/1? + ok. + receive_ws(ConnPid, StreamRef) -> receive {gun_ws, ConnPid, StreamRef, Frame} -> @@ -123,7 +151,14 @@ websocket_info_invalid(Config) -> do_invalid(Config, Path) -> {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, bad), ensure_handle_is_called(ConnPid, StreamRef, Path), - gun_down(ConnPid). + case config(protocol, Config) of + %% HTTP/1.1 closes the connection. + http -> gun_down(ConnPid); + %% HTTP/2 terminates the stream. + http2 -> + receive {gun_error, ConnPid, StreamRef, {stream_error, internal_error, _}} -> ok + after 500 -> error(timeout) end + end. websocket_init_one_frame(Config) -> doc("A single frame is received when websocket_init/1 returns it as a command."), @@ -223,8 +258,12 @@ websocket_active_false(Config) -> doc("The {active, false} command stops receiving data from the socket. " "The {active, true} command reenables it."), {ok, ConnPid, StreamRef} = gun_open_ws(Config, "/active", []), + %% We must exhaust the HTTP/2 flow control window + %% otherwise the frame will be received even if active mode is disabled. + gun:ws_send(ConnPid, StreamRef, {binary, <<0:100000/unit:8>>}), gun:ws_send(ConnPid, StreamRef, {text, <<"Not received until the handler enables active again.">>}), {error, timeout} = receive_ws(ConnPid, StreamRef), + {ok, {binary, _}} = receive_ws(ConnPid, StreamRef), {ok, {text, <<"Not received until the handler enables active again.">>}} = receive_ws(ConnPid, StreamRef), ok. @@ -271,7 +310,8 @@ websocket_deflate_ignore_if_not_negotiated(Config) -> websocket_set_options_idle_timeout(Config) -> doc("The idle_timeout option can be modified using the " "command {set_options, Opts} at runtime."), - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/set_options"), receive {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} -> @@ -299,7 +339,8 @@ websocket_set_options_idle_timeout(Config) -> websocket_set_options_max_frame_size(Config) -> doc("The max_frame_size option can be modified using the " "command {set_options, Opts} at runtime."), - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/set_options"), receive {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} -> @@ -334,7 +375,8 @@ websocket_set_options_max_frame_size(Config) -> websocket_shutdown_reason(Config) -> doc("The command {shutdown_reason, any()} can be used to " "change the shutdown reason of a Websocket connection."), - ConnPid = gun_open(Config), + ConnPid = gun_open(Config, #{http2_opts => #{notify_settings_changed => true}}), + do_await_enable_connect_protocol(config(protocol, Config), ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/shutdown_reason", [ {<<"x-test-pid">>, pid_to_list(self())} ]), diff --git a/test/ws_perf_SUITE.erl b/test/ws_perf_SUITE.erl index ff88554..98b4e18 100644 --- a/test/ws_perf_SUITE.erl +++ b/test/ws_perf_SUITE.erl @@ -55,14 +55,15 @@ init_per_group(Name, Config) when Name =:= h2c; Name =:= h2c_compress -> h2c_compress -> {compress, #{stream_handlers => [cowboy_compress_h, cowboy_stream_h]}} end, Config1 = cowboy_test:init_http(Name, Opts#{ - connection_window_margin_size => 64*1024, + %% The margin sizes must be larger than the larger test message for plain sockets. + connection_window_margin_size => 128*1024, enable_connect_protocol => true, env => #{dispatch => init_dispatch(Config)}, max_frame_size_sent => 64*1024, max_frame_size_received => 16384 * 1024 - 1, max_received_frame_rate => {10_000_000, 1}, stream_window_data_threshold => 1024, - stream_window_margin_size => 64*1024 + stream_window_margin_size => 128*1024 }, [{flavor, Flavor}|Config]), lists:keyreplace(protocol, 1, Config1, {protocol, http2}); init_per_group(ascii, Config) -> @@ -265,13 +266,14 @@ send_N_16384B(Config) -> do_send(Config, What, Num, FrameSize) -> {ok, ConnPid, StreamRef} = do_gun_open_ws("/ws_ignore", Config), + %% Prepare the frame data. FrameType = config(frame_type, Config), FrameData = case FrameType of text -> do_text_data(Config, FrameSize); binary -> rand:bytes(FrameSize) end, %% Heat up the processes before doing the real run. -% do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData), +% do_send_loop(Socket, Num, FrameType, Mask, MaskedFrameData), {Time, _} = timer:tc(?MODULE, do_send_loop, [ConnPid, StreamRef, Num, FrameType, FrameData]), do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]), gun:ws_send(ConnPid, StreamRef, close), @@ -279,12 +281,181 @@ do_send(Config, What, Num, FrameSize) -> gun_down(ConnPid). do_send_loop(ConnPid, StreamRef, 0, _, _) -> - gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}), - {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef), - ok; + gun:ws_send(ConnPid, StreamRef, {text, <<"CHECK">>}), + {ok, {text, <<"CHECK">>}} = receive_ws(ConnPid, StreamRef), + ok; do_send_loop(ConnPid, StreamRef, Num, FrameType, FrameData) -> - gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}), - do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData). + gun:ws_send(ConnPid, StreamRef, {FrameType, FrameData}), + do_send_loop(ConnPid, StreamRef, Num - 1, FrameType, FrameData). + +tcp_send_N_00000B(Config) -> + doc("Send a 0B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 0). + +tcp_send_N_00256B(Config) -> + doc("Send a 256B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 256). + +tcp_send_N_01024B(Config) -> + doc("Send a 1024B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 1024). + +tcp_send_N_04096B(Config) -> + doc("Send a 4096B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 4096). + +tcp_send_N_16384B(Config) -> + doc("Send a 16384B frame 10000 times."), + do_tcp_send(Config, tcps_N, 10000, 16384). + +do_tcp_send(Config, What, Num, FrameSize) -> + {ok, Socket} = do_tcp_handshake(Config, #{}), + %% Prepare the frame data. + FrameType = config(frame_type, Config), + FrameData = case FrameType of + text -> do_text_data(Config, FrameSize); + binary -> rand:bytes(FrameSize) + end, + %% Mask the data outside the benchmark to avoid influencing the results. + Mask = 16#37fa213d, + MaskedFrameData = ws_SUITE:do_mask(FrameData, Mask, <<>>), + FrameSizeWithHeader = FrameSize + case FrameSize of + N when N =< 125 -> 6; + N when N =< 16#ffff -> 8; + N when N =< 16#7fffffffffffffff -> 14 + end, + %% Run the benchmark; different function for h1 and h2. + {Time, _} = case config(protocol, Config) of + http -> timer:tc(?MODULE, do_tcp_send_loop_h1, + [Socket, Num, FrameType, Mask, MaskedFrameData]); + http2 -> timer:tc(?MODULE, do_tcp_send_loop_h2, + [Socket, 65535, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader]) + end, + do_log("~-6s ~-6s ~6s: ~8bµs", [What, FrameType, do_format_size(FrameSize), Time]), + gen_tcp:close(Socket). + +%% Do a prior knowledge handshake. +do_tcp_handshake(Config, LocalSettings) -> + Protocol = config(protocol, Config), + Socket1 = case Protocol of + http -> + {ok, Socket, _} = ws_SUITE:do_handshake(<<"/ws_ignore">>, Config), + Socket; + http2 -> + {ok, Socket} = gen_tcp:connect("localhost", config(port, Config), [binary, {active, false}]), + %% Send a valid preface. + ok = gen_tcp:send(Socket, [ + "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", + cow_http2:settings(LocalSettings) + ]), + %% Receive the server preface. + {ok, << Len:24 >>} = gen_tcp:recv(Socket, 3, 1000), + {ok, << 4:8, 0:40, SettingsPayload:Len/binary >>} = gen_tcp:recv(Socket, 6 + Len, 1000), + RemoteSettings = cow_http2:parse_settings_payload(SettingsPayload), + #{enable_connect_protocol := true} = RemoteSettings, + %% Send the SETTINGS ack. + ok = gen_tcp:send(Socket, cow_http2:settings_ack()), + %% Receive the SETTINGS ack. + {ok, << 0:24, 4:8, 1:8, 0:32 >>} = gen_tcp:recv(Socket, 9, 1000), + %% Send a CONNECT :protocol request to upgrade the stream to Websocket. + {ReqHeadersBlock, _} = cow_hpack:encode([ + {<<":method">>, <<"CONNECT">>}, + {<<":protocol">>, <<"websocket">>}, + {<<":scheme">>, <<"http">>}, + {<<":path">>, <<"/ws_ignore">>}, + {<<":authority">>, <<"localhost">>}, %% @todo Correct port number. + {<<"sec-websocket-version">>, <<"13">>}, + {<<"origin">>, <<"http://localhost">>} + ]), + ok = gen_tcp:send(Socket, cow_http2:headers(1, nofin, ReqHeadersBlock)), + %% Receive a 200 response. + {ok, << Len1:24, 1:8, _:8, 1:32 >>} = gen_tcp:recv(Socket, 9, 1000), + {ok, RespHeadersBlock} = gen_tcp:recv(Socket, Len1, 1000), + {RespHeaders, _} = cow_hpack:decode(RespHeadersBlock), + {_, <<"200">>} = lists:keyfind(<<":status">>, 1, RespHeaders), + Socket + end, + %% Enable active mode to avoid delays in receiving data at the end of benchmark. + ok = inet:setopts(Socket1, [{active, true}]), + %% Stream number 1 is ready. + {ok, Socket1}. + +do_tcp_send_loop_h1(Socket, 0, _, Mask, _) -> + MaskedFrameData = ws_SUITE:do_mask(<<"CHECK">>, Mask, <<>>), + ok = gen_tcp:send(Socket, + <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedFrameData/binary>>), + do_tcp_wait_for_check(Socket); +do_tcp_send_loop_h1(Socket, Num, FrameType, Mask, MaskedFrameData) -> + Len = byte_size(MaskedFrameData), + LenBits = case Len of + N when N =< 125 -> << N:7 >>; + N when N =< 16#ffff -> << 126:7, N:16 >>; + N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >> + end, + FrameHeader = <<1:1, 0:3, 2:4, 1:1, LenBits/bits, Mask:32>>, + ok = gen_tcp:send(Socket, [ + FrameHeader, + MaskedFrameData + ]), + do_tcp_send_loop_h1(Socket, Num - 1, FrameType, Mask, MaskedFrameData). + +do_tcp_send_loop_h2(Socket, _, 0, _, Mask, _, _) -> + MaskedFrameData = ws_SUITE:do_mask(<<"CHECK">>, Mask, <<>>), + ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, + <<1:1, 0:3, 1:4, 1:1, 5:7, Mask:32, MaskedFrameData/binary>>)), + do_tcp_wait_for_check(Socket); +do_tcp_send_loop_h2(Socket, Window0, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader) + when Window0 < FrameSizeWithHeader -> + %% The remaining window isn't large enough so + %% we wait for WINDOW_UPDATE frames. + Window = do_tcp_wait_window_updates(Socket, Window0), + do_tcp_send_loop_h2(Socket, Window, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader); +do_tcp_send_loop_h2(Socket, Window, Num, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader) -> + Len = byte_size(MaskedFrameData), + LenBits = case Len of + N when N =< 125 -> << N:7 >>; + N when N =< 16#ffff -> << 126:7, N:16 >>; + N when N =< 16#7fffffffffffffff -> << 127:7, N:64 >> + end, + FrameHeader = <<1:1, 0:3, 2:4, 1:1, LenBits/bits, Mask:32>>, + ok = gen_tcp:send(Socket, cow_http2:data(1, nofin, [ + FrameHeader, + MaskedFrameData + ])), + do_tcp_send_loop_h2(Socket, Window - FrameSizeWithHeader, + Num - 1, FrameType, Mask, MaskedFrameData, FrameSizeWithHeader). + +do_tcp_wait_window_updates(Socket, Window) -> + receive + {tcp, Socket, Data} -> + do_tcp_wait_window_updates_parse(Socket, Window, Data) + after 0 -> + Window + end. + +do_tcp_wait_window_updates_parse(Socket, Window, Data) -> + case Data of + %% Ignore the connection-wide WINDOW_UPDATE. + <<4:24, 8:8, 0:8, 0:32, 0:1, _:31, Rest/bits>> -> + do_tcp_wait_window_updates_parse(Socket, Window, Rest); + %% Use the stream-specific WINDOW_UPDATE to increment our window. + <<4:24, 8:8, 0:8, 1:32, 0:1, Inc:31, Rest/bits>> -> + do_tcp_wait_window_updates_parse(Socket, Window + Inc, Rest); + %% Other frames are not expected. + <<>> -> + do_tcp_wait_window_updates(Socket, Window) + end. + +do_tcp_wait_for_check(Socket) -> + receive + {tcp, Socket, Data} -> + case binary:match(Data, <<"CHECK">>) of + nomatch -> do_tcp_wait_for_check(Socket); + _ -> ok + end + after 5000 -> + error(timeout) + end. %% Internal. |