diff options
author | Loïc Hoguin <[email protected]> | 2025-08-21 18:09:42 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2025-09-15 13:09:23 +0200 |
commit | 8da6ca11e8ea4e93def78bd0299decd6f409bc43 (patch) | |
tree | e897bf997175a113e02acb2b7f4c179c7528aeb4 /src/cowboy_websocket.erl | |
parent | a8c717718a3f4dd7b4bc67fe7bebe3a4e7a7ed74 (diff) | |
download | cowboy-direct-data_delivery-for-h2-websocket.tar.gz cowboy-direct-data_delivery-for-h2-websocket.tar.bz2 cowboy-direct-data_delivery-for-h2-websocket.zip |
New data delivery mechanism for HTTP/2+ Websocketdirect-data_delivery-for-h2-websocket
A new data_delivery mechanism called 'relay' has been added.
It bypasses stream handlers (and the buffering in cowboy_stream_h)
and sends the data directly to the process implementing
Websocket (and should work for other similar protocols
like HTTP/2 WebTransport).
Flow control in HTTP/2 is maintained in a simpler way,
via a configured flow value that is used to maintain
the window to a reasonable value when data is received.
The 'relay' data_delivery has been implemented for both
HTTP/2 and HTTP/3. It has not been implemented for HTTP/1.1
since switching protocol there overrides the connection process.
HTTP/2 Websocket is now better tested.
A bug was fixed with the 'stream_handlers' data_delivery
where active mode would not be reenabled if it was disabled
at some point.
The Websocket performance suite has been updated to
include tests that do not use Gun. Websocket modules
used by the performance suite use the 'relay' data_delivery
now. Performance is improved significantly with 'relay',
between 10% and 20% faster. HTTP/2 Websocket performance
is not on par with HTTP/1.1 still, but the remaining
difference is thought to be from the HTTP/2 overhead and
flow control.
Diffstat (limited to 'src/cowboy_websocket.erl')
-rw-r--r-- | src/cowboy_websocket.erl | 62 |
1 files changed, 49 insertions, 13 deletions
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index cb30c3f..65289cd 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -68,6 +68,8 @@ -type opts() :: #{ active_n => pos_integer(), compress => boolean(), + data_delivery => stream_handlers | relay, + data_delivery_flow => pos_integer(), deflate_opts => cow_ws:deflate_opts(), dynamic_buffer => false | {pos_integer(), pos_integer()}, dynamic_buffer_initial_average => non_neg_integer(), @@ -91,7 +93,7 @@ parent :: undefined | pid(), ref :: ranch:ref(), socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined, - transport = undefined :: module() | undefined, + transport :: module() | {data_delivery, stream_handlers | relay}, opts = #{} :: opts(), active = true :: boolean(), handler :: module(), @@ -149,7 +151,7 @@ upgrade(Req, Env, Handler, HandlerState) -> %% @todo Immediately crash if a response has already been sent. upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) -> FilteredReq = case maps:get(req_filter, Opts, undefined) of - undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0); + undefined -> maps:with([method, version, scheme, host, port, path, qs, peer, streamid], Req0); FilterFun -> FilterFun(Req0) end, Utf8State = case maps:get(validate_utf8, Opts, true) of @@ -273,12 +275,27 @@ websocket_handshake(State=#state{key=Key}, %% For HTTP/2 we do not let the process die, we instead keep it %% for the Websocket stream. This is because in HTTP/2 we only %% have a stream, it doesn't take over the whole connection. -websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, +%% +%% There are two methods of delivering data to the Websocket session: +%% - 'stream_handlers' is the default and makes the data go +%% through stream handlers just like when reading a request body; +%% - 'relay' is a new method where data is sent as a message as +%% soon as it is received from the socket in a DATA frame. +websocket_handshake(State=#state{opts=Opts}, + Req=#{ref := Ref, pid := Pid, streamid := StreamID}, HandlerState, _Env) -> %% @todo We don't want date and server headers. Headers = cowboy_req:response_headers(#{}, Req), - Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}}, - takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>, + DataDelivery = maps:get(data_delivery, Opts, stream_handlers), + ModState = #{ + data_delivery => DataDelivery, + %% For relay data_delivery. The flow is a hint and may + %% not be used by the underlying protocol. + data_delivery_pid => self(), + data_delivery_flow => maps:get(data_delivery_flow, Opts, 131072) + }, + Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, ModState}}, + takeover(Pid, Ref, {Pid, StreamID}, {data_delivery, DataDelivery}, #{}, <<>>, {State, HandlerState}). %% Connection process. @@ -301,7 +318,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, -type parse_state() :: #ps_header{} | #ps_payload{}. -spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()}, - module() | undefined, any(), binary(), + module() | {data_delivery, stream_handlers | relay}, any(), binary(), {#state{}, any()}) -> no_return(). takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) -> @@ -311,7 +328,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, _ -> ranch:remove_connection(Ref) end, Messages = case Transport of - undefined -> undefined; + {data_delivery, _} -> undefined; _ -> Transport:messages() end, State = set_idle_timeout(State0#state{parent=Parent, @@ -355,13 +372,14 @@ after_init(State, HandlerState, ParseState) -> %% immediately but there might still be data to be processed in %% the message queue. -setopts_active(#state{transport=undefined}) -> +setopts_active(#state{transport={data_delivery, _}}) -> ok; setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). -maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) -> +maybe_read_body(#state{transport={data_delivery, stream_handlers}, + socket=Stream={Pid, _}, active=true}) -> %% @todo Keep Ref around. ReadBodyRef = make_ref(), Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}}, @@ -369,16 +387,25 @@ maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true} maybe_read_body(_) -> ok. -active(State) -> +active(State=#state{transport={data_delivery, relay}, + socket=Stream={Pid, _}}) -> + Pid ! {'$cowboy_relay_command', Stream, active}, + State#state{active=true}; +active(State0) -> + State = State0#state{active=true}, setopts_active(State), maybe_read_body(State), - State#state{active=true}. + State. -passive(State=#state{transport=undefined}) -> +passive(State=#state{transport={data_delivery, stream_handlers}}) -> %% Unfortunately we cannot currently cancel read_body. %% But that's OK, we will just stop reading the body %% after the next message. State#state{active=false}; +passive(State=#state{transport={data_delivery, relay}, + socket=Stream={Pid, _}}) -> + Pid ! {'$cowboy_relay_command', Stream, passive}, + State#state{active=false}; passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) -> Transport:setopts(Socket, [{active, false}]), flush_passive(Socket, Messages), @@ -454,6 +481,10 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, {request_body, _Ref, fin, _, Data} -> maybe_read_body(State), parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); + %% @todo It would be better to check StreamID. + %% @todo We must ensure that IsFin=fin is handled like a socket close? + {'$cowboy_relay_data', {Pid, _StreamID}, _IsFin, Data} when Pid =:= Parent -> + parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); %% Timeouts. {timeout, TRef, ?MODULE} -> tick_idle_timeout(State, HandlerState, ParseState); @@ -662,9 +693,14 @@ commands([Frame|Tail], State, Data0) -> commands(Tail, State, Data) end. -transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) -> +transport_send(#state{transport={data_delivery, stream_handlers}, + socket=Stream={Pid, _}}, IsFin, Data) -> Pid ! {Stream, {data, IsFin, Data}}, ok; +transport_send(#state{transport={data_delivery, relay}, + socket=Stream={Pid, _}}, IsFin, Data) -> + Pid ! {'$cowboy_relay_command', Stream, {data, IsFin, Data}}, + ok; transport_send(#state{socket=Socket, transport=Transport}, _, Data) -> Transport:send(Socket, Data). |