diff options
author | Loïc Hoguin <[email protected]> | 2025-08-21 18:09:42 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2025-09-15 13:09:23 +0200 |
commit | 8da6ca11e8ea4e93def78bd0299decd6f409bc43 (patch) | |
tree | e897bf997175a113e02acb2b7f4c179c7528aeb4 /src/cowboy_http2.erl | |
parent | a8c717718a3f4dd7b4bc67fe7bebe3a4e7a7ed74 (diff) | |
download | cowboy-direct-data_delivery-for-h2-websocket.tar.gz cowboy-direct-data_delivery-for-h2-websocket.tar.bz2 cowboy-direct-data_delivery-for-h2-websocket.zip |
New data delivery mechanism for HTTP/2+ Websocketdirect-data_delivery-for-h2-websocket
A new data_delivery mechanism called 'relay' has been added.
It bypasses stream handlers (and the buffering in cowboy_stream_h)
and sends the data directly to the process implementing
Websocket (and should work for other similar protocols
like HTTP/2 WebTransport).
Flow control in HTTP/2 is maintained in a simpler way,
via a configured flow value that is used to maintain
the window to a reasonable value when data is received.
The 'relay' data_delivery has been implemented for both
HTTP/2 and HTTP/3. It has not been implemented for HTTP/1.1
since switching protocol there overrides the connection process.
HTTP/2 Websocket is now better tested.
A bug was fixed with the 'stream_handlers' data_delivery
where active mode would not be reenabled if it was disabled
at some point.
The Websocket performance suite has been updated to
include tests that do not use Gun. Websocket modules
used by the performance suite use the 'relay' data_delivery
now. Performance is improved significantly with 'relay',
between 10% and 20% faster. HTTP/2 Websocket performance
is not on par with HTTP/1.1 still, but the remaining
difference is thought to be from the HTTP/2 overhead and
flow control.
Diffstat (limited to 'src/cowboy_http2.erl')
-rw-r--r-- | src/cowboy_http2.erl | 59 |
1 files changed, 57 insertions, 2 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 0d22fa1..5cb0585 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -82,8 +82,13 @@ -export_type([opts/0]). -record(stream, { - %% Whether the stream is currently stopping. - status = running :: running | stopping, + %% Whether the stream is currently in a special state. + %% + %% - The running state is the normal state of a stream. + %% - The relaying state is used by extended CONNECT protocols to + %% use a 'relay' data_delivery method. + %% - The stopping state indicates the stream used the 'stop' command. + status = running :: running | {relaying, non_neg_integer(), pid()} | stopping, %% Flow requested for this stream. flow = 0 :: non_neg_integer(), @@ -327,6 +332,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> before_loop(info(State, StreamID, Msg), Buffer); + {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() -> + before_loop(relay_command(State, StreamID, RelayCommand), Buffer); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> before_loop(down(State, Pid, Msg), Buffer); @@ -520,6 +527,14 @@ data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFi reset_stream(State0, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:data/4.'}) end; + %% Stream handlers are not used for the data when relaying. + #{StreamID := #stream{status={relaying, _, RelayPid}}} -> + RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data}, + %% We keep a steady flow using the configured flow value. + %% Because we do not change the 'flow' value the update_window/2 + %% function will always maintain this value (of course with + %% thresholds applying). + update_window(State0, StreamID); %% We ignore DATA frames for streams that are stopping. #{} -> State0 @@ -866,6 +881,26 @@ commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade}, commands(State, StreamID, Tail); %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. +%% There are two data_delivery: stream_handlers and relay. +%% The former just has the data go through stream handlers +%% like normal requests. The latter relays data directly. +%% +%% @todo When relaying there might be some data that is +%% in stream handlers and that need to be received, +%% depending on whether the protocol sends data +%% before processing the response. +commands(State0=#state{flow=Flow, streams=Streams}, StreamID, + [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) -> + State1 = info(State0, StreamID, {headers, 200, Headers}), + #{StreamID := Stream} = Streams, + #{data_delivery_pid := RelayPid} = ModState, + %% WINDOW_UPDATE frames updating the window will be sent after + %% the first DATA frame has been received. + RelayFlow = maps:get(data_delivery_flow, ModState, 131072), + State = State1#state{flow=Flow + RelayFlow, streams=Streams#{StreamID => Stream#stream{ + status={relaying, RelayFlow, RelayPid}, + flow=RelayFlow}}}, + commands(State, StreamID, Tail); commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> State = info(State0, StreamID, {headers, 200, Headers}), commands(State, StreamID, Tail); @@ -881,6 +916,26 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> cowboy:log(Log, Opts), commands(State, StreamID, Tail). +%% Relay data delivery commands. + +relay_command(State, StreamID, DataCmd = {data, _, _}) -> + commands(State, StreamID, [DataCmd]); +%% When going active mode again we set the RelayFlow again +%% and update the window if necessary. +relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, active) -> + #{StreamID := Stream} = Streams, + #stream{status={relaying, RelayFlow, _}} = Stream, + update_window(State#state{flow=Flow + RelayFlow, + streams=Streams#{StreamID => Stream#stream{flow=RelayFlow}}}, + StreamID); +%% When going passive mode we don't update the window +%% since we have not incremented it. +relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, passive) -> + #{StreamID := Stream} = Streams, + #stream{flow=StreamFlow} = Stream, + State#state{flow=Flow - StreamFlow, + streams=Streams#{StreamID => Stream#stream{flow=0}}}. + %% Tentatively update the window after the flow was updated. update_window(State0=#state{socket=Socket, transport=Transport, |