aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2025-01-02 15:21:51 +0100
committerLoïc Hoguin <[email protected]>2025-01-02 15:21:51 +0100
commit23f96b811a48e53f34799092b03afb424cfdc269 (patch)
treeb5c45fa3876a7da6902040eacfc4281110b5e936 /src
parent8efcedd3a089e6ab5317e4310fed424a4ee130f8 (diff)
downloadgun-23f96b811a48e53f34799092b03afb424cfdc269.tar.gz
gun-23f96b811a48e53f34799092b03afb424cfdc269.tar.bz2
gun-23f96b811a48e53f34799092b03afb424cfdc269.zip
HTTP/2: Fix tunneled streams bugs
* Tunneled streams can now close the stream. * Data received on tunneled streams now result in WINDOW_UPDATE frames being sent if necessary, and flow control is handled. This was detected as part of writing a new Cowboy test suite for benchmarking Websocket, but should help other uses too.
Diffstat (limited to 'src')
-rw-r--r--src/gun_http2.erl113
1 files changed, 63 insertions, 50 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index a1ccef6..4903258 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -240,6 +240,8 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea
case frame(State0, Frame, CookieStore0, EvHandler, EvHandlerState0) of
{Error={error, _}, CookieStore, EvHandlerState} ->
{Error, CookieStore, EvHandlerState};
+ {[{state, State}, close], CookieStore, EvHandlerState} ->
+ {[{state, State}, close], CookieStore, EvHandlerState};
{{state, State}, CookieStore, EvHandlerState} ->
parse(Rest, State, CookieStore, EvHandler, EvHandlerState)
end;
@@ -379,67 +381,37 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket,
data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_id(State0, StreamID) of
- Stream=#stream{tunnel=undefined} ->
+ Stream=#stream{tunnel=undefined, handler_state=Handlers0} ->
+ {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
{StateOrError, EvHandlerState} = data_frame1(State0,
- StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream),
+ StreamID, IsFin, Data, EvHandler, EvHandlerState0,
+ Stream#stream{handler_state=Handlers}, Dec),
{StateOrError, CookieStore0, EvHandlerState};
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
% %% @todo What about IsFin?
- {Commands, CookieStore, EvHandlerState1} = Proto:handle(Data,
- ProtoState0, CookieStore0, EvHandler, EvHandlerState0),
- %% The frame/parse functions only handle state or error commands.
- {ResCommands, EvHandlerState} = tunnel_commands(Commands,
- Stream, State0, EvHandler, EvHandlerState1),
- {ResCommands, CookieStore, EvHandlerState}
+ {StateOrError, EvHandlerState1} = data_frame1(State0,
+ StreamID, IsFin, Data, EvHandler, EvHandlerState0,
+ Stream, 0),
+ case StateOrError of
+ {state, State} ->
+ {Commands, CookieStore, EvHandlerState2} = Proto:handle(Data,
+ ProtoState0, CookieStore0, EvHandler, EvHandlerState1),
+ %% The frame/parse functions only handle state or error commands.
+ {ResCommands, EvHandlerState} = tunnel_commands(Commands,
+ Stream, State, EvHandler, EvHandlerState2),
+ {ResCommands, CookieStore, EvHandlerState};
+ Error = {error, _} ->
+ {Error, CookieStore0, EvHandlerState1}
+ end
end.
-%% Send errors are returned. Other errors cause the stream to be deleted.
-tunnel_commands(Command, Stream, State, EvHandler, EvHandlerState)
- when not is_list(Command) ->
- tunnel_commands([Command], Stream, State, EvHandler, EvHandlerState);
-tunnel_commands([], Stream, State, _EvHandler, EvHandlerState) ->
- {{state, store_stream(State, Stream)}, EvHandlerState};
-tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID},
- State0, EvHandler, EvHandlerState0) ->
- case maybe_send_data(State0, StreamID,
- IsFin, Data, EvHandler, EvHandlerState0) of
- {{state, State}, EvHandlerState} ->
- tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
- ErrorResult={{error, _Reason}, _EvHandlerState} ->
- ErrorResult
- end;
-tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel},
- State, EvHandler, EvHandlerState) ->
- tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}},
- State, EvHandler, EvHandlerState);
-tunnel_commands([{error, Reason}|_], #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
- State, _EvHandler, EvHandlerState) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
- {stream_error, Reason, 'Tunnel closed unexpectedly.'}},
- {{state, delete_stream(State, StreamID)}, EvHandlerState};
-%% @todo Set a timeout for closing the Websocket stream.
-tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
- tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
-%% @todo Maybe we should stop increasing the window when not in active mode. (HTTP/2 Websocket only.)
-tunnel_commands([{active, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
- tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState).
-
-continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) ->
- case ContinueStreamRef of
- [_|_] -> ContinueStreamRef ++ [StreamRef];
- _ -> [ContinueStreamRef, StreamRef]
- end;
-continue_stream_ref(State, StreamRef) ->
- stream_ref(State, StreamRef).
-
data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
- Stream=#stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, handler_state=Handlers0}) ->
- {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
+ Stream=#stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0}, Dec) ->
Flow = case Flow0 of
infinity -> infinity;
_ -> Flow0 - Dec
end,
- State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}),
+ State1 = store_stream(State0, Stream#stream{flow=Flow}),
{StateOrError, EvHandlerState} = case byte_size(Data) of
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
@@ -475,6 +447,47 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
{Error, EvHandlerState}
end.
+%% Send errors are returned. Other errors cause the stream to be deleted.
+tunnel_commands(Command, Stream, State, EvHandler, EvHandlerState)
+ when not is_list(Command) ->
+ tunnel_commands([Command], Stream, State, EvHandler, EvHandlerState);
+tunnel_commands([], Stream, State, _EvHandler, EvHandlerState) ->
+ {{state, store_stream(State, Stream)}, EvHandlerState};
+tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID},
+ State0, EvHandler, EvHandlerState0) ->
+ case maybe_send_data(State0, StreamID,
+ IsFin, Data, EvHandler, EvHandlerState0) of
+ {{state, State}, EvHandlerState} ->
+ tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
+ ErrorResult={{error, _Reason}, _EvHandlerState} ->
+ ErrorResult
+ end;
+tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel},
+ State, EvHandler, EvHandlerState) ->
+ tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}},
+ State, EvHandler, EvHandlerState);
+tunnel_commands([{error, Reason}|_], #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
+ State, _EvHandler, EvHandlerState) ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
+ {stream_error, Reason, 'Tunnel closed unexpectedly.'}},
+ {{state, delete_stream(State, StreamID)}, EvHandlerState};
+%% @todo Set a timeout for closing the Websocket stream.
+tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
+ tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
+tunnel_commands([close|_Tail], #stream{id=StreamID}, State, _EvHandler, EvHandlerState) ->
+ {[{state, delete_stream(State, StreamID)}, close], EvHandlerState};
+%% @todo Maybe we should stop increasing the window when not in active mode. (HTTP/2 Websocket only.)
+tunnel_commands([{active, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
+ tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState).
+
+continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) ->
+ case ContinueStreamRef of
+ [_|_] -> ContinueStreamRef ++ [StreamRef];
+ _ -> [ContinueStreamRef, StreamRef]
+ end;
+continue_stream_ref(State, StreamRef) ->
+ stream_ref(State, StreamRef).
+
headers_frame(State0=#http2_state{opts=Opts},
StreamID, IsFin, Headers, #{status := Status}, _BodyLen,
CookieStore0, EvHandler, EvHandlerState0) ->