diff options
author | Loïc Hoguin <[email protected]> | 2025-01-02 15:21:51 +0100 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2025-01-02 15:21:51 +0100 |
commit | 23f96b811a48e53f34799092b03afb424cfdc269 (patch) | |
tree | b5c45fa3876a7da6902040eacfc4281110b5e936 /src | |
parent | 8efcedd3a089e6ab5317e4310fed424a4ee130f8 (diff) | |
download | gun-23f96b811a48e53f34799092b03afb424cfdc269.tar.gz gun-23f96b811a48e53f34799092b03afb424cfdc269.tar.bz2 gun-23f96b811a48e53f34799092b03afb424cfdc269.zip |
HTTP/2: Fix tunneled streams bugs
* Tunneled streams can now close the stream.
* Data received on tunneled streams now result in
WINDOW_UPDATE frames being sent if necessary,
and flow control is handled.
This was detected as part of writing a new Cowboy test
suite for benchmarking Websocket, but should help other
uses too.
Diffstat (limited to 'src')
-rw-r--r-- | src/gun_http2.erl | 113 |
1 files changed, 63 insertions, 50 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl index a1ccef6..4903258 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -240,6 +240,8 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea case frame(State0, Frame, CookieStore0, EvHandler, EvHandlerState0) of {Error={error, _}, CookieStore, EvHandlerState} -> {Error, CookieStore, EvHandlerState}; + {[{state, State}, close], CookieStore, EvHandlerState} -> + {[{state, State}, close], CookieStore, EvHandlerState}; {{state, State}, CookieStore, EvHandlerState} -> parse(Rest, State, CookieStore, EvHandler, EvHandlerState) end; @@ -379,67 +381,37 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket, data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_id(State0, StreamID) of - Stream=#stream{tunnel=undefined} -> + Stream=#stream{tunnel=undefined, handler_state=Handlers0} -> + {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), {StateOrError, EvHandlerState} = data_frame1(State0, - StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream), + StreamID, IsFin, Data, EvHandler, EvHandlerState0, + Stream#stream{handler_state=Handlers}, Dec), {StateOrError, CookieStore0, EvHandlerState}; Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> % %% @todo What about IsFin? - {Commands, CookieStore, EvHandlerState1} = Proto:handle(Data, - ProtoState0, CookieStore0, EvHandler, EvHandlerState0), - %% The frame/parse functions only handle state or error commands. - {ResCommands, EvHandlerState} = tunnel_commands(Commands, - Stream, State0, EvHandler, EvHandlerState1), - {ResCommands, CookieStore, EvHandlerState} + {StateOrError, EvHandlerState1} = data_frame1(State0, + StreamID, IsFin, Data, EvHandler, EvHandlerState0, + Stream, 0), + case StateOrError of + {state, State} -> + {Commands, CookieStore, EvHandlerState2} = Proto:handle(Data, + ProtoState0, CookieStore0, EvHandler, EvHandlerState1), + %% The frame/parse functions only handle state or error commands. + {ResCommands, EvHandlerState} = tunnel_commands(Commands, + Stream, State, EvHandler, EvHandlerState2), + {ResCommands, CookieStore, EvHandlerState}; + Error = {error, _} -> + {Error, CookieStore0, EvHandlerState1} + end end. -%% Send errors are returned. Other errors cause the stream to be deleted. -tunnel_commands(Command, Stream, State, EvHandler, EvHandlerState) - when not is_list(Command) -> - tunnel_commands([Command], Stream, State, EvHandler, EvHandlerState); -tunnel_commands([], Stream, State, _EvHandler, EvHandlerState) -> - {{state, store_stream(State, Stream)}, EvHandlerState}; -tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID}, - State0, EvHandler, EvHandlerState0) -> - case maybe_send_data(State0, StreamID, - IsFin, Data, EvHandler, EvHandlerState0) of - {{state, State}, EvHandlerState} -> - tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState); - ErrorResult={{error, _Reason}, _EvHandlerState} -> - ErrorResult - end; -tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel}, - State, EvHandler, EvHandlerState) -> - tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}, - State, EvHandler, EvHandlerState); -tunnel_commands([{error, Reason}|_], #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, - State, _EvHandler, EvHandlerState) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), - {stream_error, Reason, 'Tunnel closed unexpectedly.'}}, - {{state, delete_stream(State, StreamID)}, EvHandlerState}; -%% @todo Set a timeout for closing the Websocket stream. -tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) -> - tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState); -%% @todo Maybe we should stop increasing the window when not in active mode. (HTTP/2 Websocket only.) -tunnel_commands([{active, _}|Tail], Stream, State, EvHandler, EvHandlerState) -> - tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState). - -continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) -> - case ContinueStreamRef of - [_|_] -> ContinueStreamRef ++ [StreamRef]; - _ -> [ContinueStreamRef, StreamRef] - end; -continue_stream_ref(State, StreamRef) -> - stream_ref(State, StreamRef). - data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, - Stream=#stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, handler_state=Handlers0}) -> - {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), + Stream=#stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0}, Dec) -> Flow = case Flow0 of infinity -> infinity; _ -> Flow0 - Dec end, - State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}), + State1 = store_stream(State0, Stream#stream{flow=Flow}), {StateOrError, EvHandlerState} = case byte_size(Data) of %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> @@ -475,6 +447,47 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, {Error, EvHandlerState} end. +%% Send errors are returned. Other errors cause the stream to be deleted. +tunnel_commands(Command, Stream, State, EvHandler, EvHandlerState) + when not is_list(Command) -> + tunnel_commands([Command], Stream, State, EvHandler, EvHandlerState); +tunnel_commands([], Stream, State, _EvHandler, EvHandlerState) -> + {{state, store_stream(State, Stream)}, EvHandlerState}; +tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID}, + State0, EvHandler, EvHandlerState0) -> + case maybe_send_data(State0, StreamID, + IsFin, Data, EvHandler, EvHandlerState0) of + {{state, State}, EvHandlerState} -> + tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState); + ErrorResult={{error, _Reason}, _EvHandlerState} -> + ErrorResult + end; +tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel}, + State, EvHandler, EvHandlerState) -> + tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}, + State, EvHandler, EvHandlerState); +tunnel_commands([{error, Reason}|_], #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, + State, _EvHandler, EvHandlerState) -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), + {stream_error, Reason, 'Tunnel closed unexpectedly.'}}, + {{state, delete_stream(State, StreamID)}, EvHandlerState}; +%% @todo Set a timeout for closing the Websocket stream. +tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) -> + tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState); +tunnel_commands([close|_Tail], #stream{id=StreamID}, State, _EvHandler, EvHandlerState) -> + {[{state, delete_stream(State, StreamID)}, close], EvHandlerState}; +%% @todo Maybe we should stop increasing the window when not in active mode. (HTTP/2 Websocket only.) +tunnel_commands([{active, _}|Tail], Stream, State, EvHandler, EvHandlerState) -> + tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState). + +continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) -> + case ContinueStreamRef of + [_|_] -> ContinueStreamRef ++ [StreamRef]; + _ -> [ContinueStreamRef, StreamRef] + end; +continue_stream_ref(State, StreamRef) -> + stream_ref(State, StreamRef). + headers_frame(State0=#http2_state{opts=Opts}, StreamID, IsFin, Headers, #{status := Status}, _BodyLen, CookieStore0, EvHandler, EvHandlerState0) -> |