diff options
-rw-r--r-- | src/gun.erl | 17 | ||||
-rw-r--r-- | src/gun_event.erl | 6 | ||||
-rw-r--r-- | src/gun_http.erl | 76 | ||||
-rw-r--r-- | src/gun_http2.erl | 114 | ||||
-rw-r--r-- | test/rfc7540_SUITE.erl | 3 |
5 files changed, 138 insertions, 78 deletions
diff --git a/src/gun.erl b/src/gun.erl index 8556b92..00e2d82 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -115,6 +115,19 @@ | #{binary() | string() | atom() => iodata()}. -export_type([req_headers/0]). +-type tunnel_info() :: #{ + stream_ref := reference() | [reference()], + + %% Tunnel. + host := inet:hostname() | inet:ip_address(), + port := inet:port_number(), + + %% Origin. + origin_host => inet:hostname() | inet:ip_address(), + origin_port => inet:port_number() +}. +-export_type([tunnel_info/0]). + -type ws_close_code() :: 1000..4999. -type ws_frame() :: close | ping | pong @@ -1214,9 +1227,9 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers0, Body, Init connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState}) -> - %% @todo Not events are currently handled for the request? + %% @todo No events are currently handled for the CONNECT request? ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, - Destination, #{host => Host, port => Port}, + Destination, #{stream_ref => StreamRef, host => Host, port => Port}, Headers, InitialFlow), {keep_state, State#state{protocol_state=ProtoState2}}; %% Public Websocket interface. diff --git a/src/gun_event.erl b/src/gun_event.erl index 0c1326d..a553ddb 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -57,9 +57,9 @@ %% tls_handshake_start/tls_handshake_end. %% %% These events occur when connecting to a TLS server or when -%% upgrading the connection to use TLS, for example using CONNECT. -%% The stream_ref/reply_to values are only present when the TLS -%% handshake occurs as a result of a request. +%% upgrading the connection or stream to use TLS, for example +%% using CONNECT. The stream_ref/reply_to values are only +%% present when the TLS handshake occurs as a result of a request. -type tls_handshake_event() :: #{ stream_ref => reference(), diff --git a/src/gun_http.erl b/src/gun_http.erl index aae47cf..982df82 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -72,6 +72,11 @@ version = 'HTTP/1.1' :: cow_http:version(), connection = keepalive :: keepalive | close, buffer = <<>> :: binary(), + + %% Base stream ref, defined when the protocol runs + %% inside an HTTP/2 CONNECT stream. + base_stream_ref = undefined :: undefined | reference() | [reference()], + streams = [] :: [#stream{}], in = head :: io(), in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()}, @@ -114,9 +119,10 @@ has_keepalive() -> true. default_keepalive() -> infinity. init(_ReplyTo, Socket, Transport, Opts) -> + BaseStreamRef = maps:get(stream_ref, Opts, undefined), Version = maps:get(version, Opts, 'HTTP/1.1'), {connected, #http_state{socket=Socket, transport=Transport, - opts=Opts, version=Version}}. + opts=Opts, version=Version, base_stream_ref=BaseStreamRef}}. switch_transport(Transport, Socket, State) -> State#http_state{socket=Socket, transport=Transport}. @@ -149,7 +155,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer, EvHandlerState = case Buffer of <<>> -> EvHandler:response_start(#{ - stream_ref => stream_ref(StreamRef), + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0); _ -> @@ -237,7 +243,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, {_, _} -> {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), stream_ref(StreamRef), Trailers}, + ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers}, ResponseEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo @@ -310,7 +316,7 @@ handle_connect(Rest, State=#http_state{ %% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup. _ = case Stream of #stream{is_alive=false} -> ok; - _ -> ReplyTo ! {gun_response, self(), StreamRef, fin, Status, Headers} + _ -> ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), fin, Status, Headers} end, %% @todo Figure out whether the event should trigger if the stream was cancelled. EvHandlerState1 = EvHandler:response_headers(#{ @@ -351,7 +357,7 @@ handle_inform(Rest, State=#http_state{ streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}, EvHandler, EvHandlerState0, Version, Status, Headers) -> EvHandlerState = EvHandler:response_inform(#{ - stream_ref => stream_ref(StreamRef), + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, status => Status, headers => Headers @@ -367,16 +373,16 @@ handle_inform(Rest, State=#http_state{ %% @todo We shouldn't ignore Rest. {_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers), Upgrade = cow_http_hd:parse_upgrade(Upgrade0), - ReplyTo ! {gun_upgrade, self(), StreamRef, Upgrade, Headers}, + ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), Upgrade, Headers}, {handle_ret({switch_protocol, raw, ReplyTo}, State), EvHandlerState0} catch _:_ -> %% When the Upgrade header is missing or invalid we treat %% the response as any other informational response. - ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, + ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, handle(Rest, State, EvHandler, EvHandlerState) end; _ -> - ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, + ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, handle(Rest, State, EvHandler, EvHandlerState) end. @@ -390,7 +396,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec false -> {undefined, EvHandlerState0}; true -> - ReplyTo ! {gun_response, self(), stream_ref(StreamRef), + ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => StreamRef, @@ -402,7 +408,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec fin -> {undefined, EvHandlerState1}; nofin -> Handlers0 = maps:get(content_handlers, Opts, [gun_data_h]), - {gun_content_handler:init(ReplyTo, stream_ref(StreamRef), + {gun_content_handler:init(ReplyTo, stream_ref(State, StreamRef), Status, Headers, Handlers0), EvHandlerState1} end end, @@ -437,10 +443,6 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec EvHandler, EvHandlerState) end. -stream_ref({connect, StreamRef, _}) -> StreamRef; -stream_ref(#websocket{ref=StreamRef}) -> StreamRef; -stream_ref(StreamRef) -> StreamRef. - %% The state must be first in order to retrieve it when the stream ended. send_data(<<>>, State, nofin) -> [{state, State}, {active, true}]; @@ -485,7 +487,7 @@ closing(_, #http_state{streams=[]}, _, EvHandlerState) -> %% Otherwise we set connection: close (even if the header was not sent) %% and close any pipelined streams, only keeping the active stream. closing(Reason, State=#http_state{streams=[LastStream|Tail]}, _, EvHandlerState) -> - close_streams(Tail, {closing, Reason}), + close_streams(State, Tail, {closing, Reason}), {[ {state, State#http_state{connection=close, streams=[LastStream]}}, closing(State) @@ -499,27 +501,27 @@ close(Reason, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]}, EvHandler, EvHandlerState) -> %% We may have more than one stream in case we somehow close abruptly. - close_streams(Tail, close_reason(Reason)), + close_streams(State, Tail, close_reason(Reason)), _ = send_data(<<>>, State, fin), EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState); -close(Reason, #http_state{streams=Streams}, _, EvHandlerState) -> - close_streams(Streams, close_reason(Reason)), +close(Reason, State=#http_state{streams=Streams}, _, EvHandlerState) -> + close_streams(State, Streams, close_reason(Reason)), EvHandlerState. close_reason(closed) -> closed; close_reason(Reason) -> {closed, Reason}. %% @todo Do we want an event for this? -close_streams([], _) -> +close_streams(_, [], _) -> ok; -close_streams([#stream{is_alive=false}|Tail], Reason) -> - close_streams(Tail, Reason); -close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) -> - ReplyTo ! {gun_error, self(), StreamRef, Reason}, - close_streams(Tail, Reason). +close_streams(State, [#stream{is_alive=false}|Tail], Reason) -> + close_streams(State, Tail, Reason); +close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason}, + close_streams(State, Tail, Reason). %% We don't send a keep-alive when a CONNECT request was initiated. keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) -> @@ -684,7 +686,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, end. connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, State; connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, @@ -758,12 +760,12 @@ down(#http_state{streams=Streams}) -> end || #stream{ref=Ref} <- Streams]. error_stream_closed(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream has already been closed."}}, State. error_stream_not_found(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream cannot be found."}}, State. @@ -816,6 +818,18 @@ response_io_from_headers(_, Version, _Status, Headers) -> %% Streams. +stream_ref(#http_state{base_stream_ref=undefined}, StreamRef) -> + stream_ref(StreamRef); +stream_ref(#http_state{base_stream_ref=BaseStreamRef}, StreamRef) + when is_reference(BaseStreamRef) -> + [BaseStreamRef, stream_ref(StreamRef)]; +stream_ref(#http_state{base_stream_ref=BaseStreamRef}, StreamRef) -> + BaseStreamRef ++ [stream_ref(StreamRef)]. + +stream_ref({connect, StreamRef, _}) -> StreamRef; +stream_ref(#websocket{ref=StreamRef}) -> StreamRef; +stream_ref(StreamRef) -> StreamRef. + new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method, Authority, Path, InitialFlow) -> State#http_state{streams=Streams @@ -840,9 +854,9 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> %% Websocket upgrade. -ws_upgrade(#http_state{version='HTTP/1.0'}, +ws_upgrade(State=#http_state{version='HTTP/1.0'}, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "Websocket cannot be used over an HTTP/1.0 connection."}}, {[], EvHandlerState}; ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo, @@ -939,7 +953,7 @@ ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensi %% We know that the most recent stream is the Websocket one. ws_handshake_end(Buffer, - #http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]}, + State=#http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]}, #websocket{ref=StreamRef, reply_to=ReplyTo, opts=Opts}, Headers, Extensions, Handler) -> %% Send ourselves the remaining buffer, if any. _ = case Buffer of @@ -950,7 +964,7 @@ ws_handshake_end(Buffer, self() ! {OK, Socket, Buffer} end, %% Inform the user that the upgrade was successful and switch the protocol. - ReplyTo ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, + ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), [<<"websocket">>], Headers}, {switch_protocol, {ws, #{ stream_ref => StreamRef, headers => Headers, diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 14eb10b..7b6d1eb 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -36,16 +36,6 @@ -export([down/1]). %-export([ws_upgrade/10]). --type tunnel_info() :: #{ - %% Tunnel. - host := inet:hostname() | inet:ip_address(), - port := inet:port_number(), - - %% Origin. - origin_host => inet:hostname() | inet:ip_address(), - origin_port => inet:port_number() -}. - -record(stream, { id = undefined :: cow_http2:streamid(), @@ -66,8 +56,8 @@ handler_state :: undefined | gun_content_handler:state(), %% CONNECT tunnel. - tunnel :: {module(), any(), tunnel_info()} - | {setup, gun:connect_destination(), tunnel_info()} + tunnel :: {module(), any(), gun:tunnel_info()} + | {setup, gun:connect_destination(), gun:tunnel_info()} | undefined }). @@ -78,6 +68,10 @@ content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), + %% Base stream ref, defined when the protocol runs + %% inside an HTTP/2 CONNECT stream. + base_stream_ref = undefined :: undefined | reference() | [reference()], + %% Current status of the connection. We use this to ensure we are %% not sending the GOAWAY frame more than once, and to validate %% the server connection preface. @@ -158,9 +152,10 @@ init(_ReplyTo, Socket, Transport, Opts0) -> }, {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts), Handlers = maps:get(content_handlers, Opts, [gun_data_h]), + BaseStreamRef = maps:get(stream_ref, Opts, undefined), %% @todo Better validate the preface being received. - State = #http2_state{socket=Socket, - transport=Transport, opts=Opts, content_handlers=Handlers, + State = #http2_state{socket=Socket, transport=Transport, opts=Opts, + base_stream_ref=BaseStreamRef, content_handlers=Handlers, http2_machine=HTTP2Machine}, Transport:send(Socket, Preface), {connected, State}. @@ -317,6 +312,24 @@ data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> Stream=#stream{tunnel=undefined} -> data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream); Stream=#stream{tunnel={Protocol, ProtoState0, TunnelInfo}} -> + %% @todo Can't call Protocol:handle directly, may need to unwrap TLS first... + + %% in this case we know Transport is either gun_tcp_proxy or gun_tls_proxy + %% if gun_tcp_proxy we can dispatch to Protocol:handle directly; + %% otherwise we must pass the data to gun_tls_proxy + %% -> send {ssl, Socket, Data} + %% -> eventually Gun process receives {Tag, Socket, Data} + %% -> somehow it needs to call this stream to resume processing and call Protocol:handle + + %% maybe {Tag, Socket, Data, Info} instead and Info is used to dispatch + %% maybe {stream_Tag, StreamRef, Data} + %% -> StreamRef to know which stream is the connect stream (potentially recursive) + %% -> Protocol:resume_handle(Data, StreamRef, State, EvHandler, EvHandlerState) + %% -> if reference() then we do Protocol:handle/4 + %% -> otherwise we pass to the next stream onward + + %% This means that #stream{} must contain both the user-facing StreamRef and the reference. + %% @todo Commands. {{state, ProtoState}, EvHandlerState} = Protocol:handle(Data, ProtoState0, EvHandler, EvHandlerState0), @@ -372,10 +385,9 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com tunnel=Tunnel } = Stream, State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]}, - %% @todo CONNECT response handling if Status >= 100, Status =< 199 -> - ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers}, + ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, EvHandlerState = EvHandler:response_inform(#{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -384,15 +396,19 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com }, EvHandlerState0), {State, EvHandlerState}; Status >= 200, Status =< 299, element(1, Tunnel) =:= setup -> - ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, + {setup, Destination=#{host := DestHost, port := DestPort}, TunnelInfo} = Tunnel, + %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo. + %% We therefore do not need to call stream_ref/2. + %% @todo Maybe we don't need it in TunnelInfo anymore? + #{stream_ref := RealStreamRef} = TunnelInfo, + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, EvHandlerState = EvHandler:response_headers(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers }, EvHandlerState0), %% @todo Handle TLS over TCP and TLS over TLS. - {setup, Destination=#{host := DestHost, port := DestPort}, TunnelInfo} = Tunnel, tcp = maps:get(transport, Destination, tcp), [Protocol0] = maps:get(protocols, Destination, [http]), %% Options are either passed directly or #{} is used. Since the @@ -405,19 +421,24 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com %% @todo What about the StateName returned? OriginSocket = #{ reply_to => ReplyTo, - stream_ref => StreamRef + stream_ref => RealStreamRef }, OriginTransport = gun_tcp_proxy, + %% @todo Depending on protocol: + %% - HTTP/1.1 will need to add the stream_ref in Opts to its StreamRef in messages. + %% - HTTP/2 as well + %% - raw already uses it + %% - ws already uses it (but it's passed slightly differently) + %% - socks might not need it? what about gun_socks_up? {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport, - %% @todo We are giving the wrong StreamRef we need to give the list (if any). - ProtoOpts#{stream_ref => StreamRef}), + ProtoOpts#{stream_ref => RealStreamRef}), %% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), %% @todo What about keepalive? {store_stream(State, Stream#stream{tunnel={Protocol, ProtoState, TunnelInfo#{origin_host => DestHost, origin_port => DestPort}}}), EvHandlerState}; true -> - ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, + ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -435,6 +456,7 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com {gun_content_handler:init(ReplyTo, StreamRef, Status, Headers, Handlers0), EvHandlerState1} end, + %% @todo Disable the tunnel if any. {maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin), EvHandlerState} @@ -443,7 +465,7 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), StreamRef, Trailers}, + ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers}, ResponseEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo @@ -455,7 +477,7 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> case take_stream(State0, StreamID) of {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> - ReplyTo ! {gun_error, self(), StreamRef, + ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef), {stream_error, Reason, 'Stream reset by server.'}}, EvHandlerState = EvHandler:cancel(#{ stream_ref => StreamRef, @@ -487,7 +509,7 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, }, PushPromiseEvent = case Status of connected -> - ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, + ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), PromisedStreamRef, Method, URI, Headers}, PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef}; _ -> PushPromiseEvent0 @@ -566,7 +588,7 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, %% the one previously received. goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, status=Status, streams=Streams0, stream_refs=Refs}, {goaway, LastStreamID, Reason, _}) -> - {Streams, RemovedRefs} = goaway_streams(maps:to_list(Streams0), LastStreamID, + {Streams, RemovedRefs} = goaway_streams(State0, maps:to_list(Streams0), LastStreamID, {goaway, Reason, 'The connection is going away.'}, [], []), State = State0#http2_state{ streams=maps:from_list(Streams), @@ -583,14 +605,14 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT end. %% Cancel server-initiated streams that are above LastStreamID. -goaway_streams([], _, _, Acc, RefsAcc) -> +goaway_streams(_, [], _, _, Acc, RefsAcc) -> {Acc, RefsAcc}; -goaway_streams([{StreamID, Stream=#stream{ref=StreamRef}}|Tail], LastStreamID, Reason, Acc, RefsAcc) +goaway_streams(State, [{StreamID, Stream=#stream{ref=StreamRef}}|Tail], LastStreamID, Reason, Acc, RefsAcc) when StreamID > LastStreamID, (StreamID rem 2) =:= 1 -> - close_stream(Stream, Reason), - goaway_streams(Tail, LastStreamID, Reason, Acc, [StreamRef|RefsAcc]); -goaway_streams([StreamWithID|Tail], LastStreamID, Reason, Acc, RefsAcc) -> - goaway_streams(Tail, LastStreamID, Reason, [StreamWithID|Acc], RefsAcc). + close_stream(State, Stream, Reason), + goaway_streams(State, Tail, LastStreamID, Reason, Acc, [StreamRef|RefsAcc]); +goaway_streams(State, [StreamWithID|Tail], LastStreamID, Reason, Acc, RefsAcc) -> + goaway_streams(State, Tail, LastStreamID, Reason, [StreamWithID|Acc], RefsAcc). %% We are already closing, do nothing. closing(_, #http2_state{status=closing}, _, EvHandlerState) -> @@ -614,10 +636,10 @@ closing(#http2_state{opts=Opts}) -> Timeout = maps:get(closing_timeout, Opts, 15000), {closing, Timeout}. -close(Reason0, #http2_state{streams=Streams}, _, EvHandlerState) -> +close(Reason0, State=#http2_state{streams=Streams}, _, EvHandlerState) -> Reason = close_reason(Reason0), _ = maps:fold(fun(_, Stream, _) -> - close_stream(Stream, Reason) + close_stream(State, Stream, Reason) end, [], Streams), EvHandlerState. @@ -625,8 +647,8 @@ close_reason(closed) -> closed; close_reason(Reason) -> {closed, Reason}. %% @todo Do we want an event for this? -close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> - ReplyTo ! {gun_error, self(), StreamRef, Reason}, +close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason}, ok. keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) -> @@ -721,7 +743,7 @@ request(State, [StreamRef|Tail], ReplyTo, Method, _Host, _Port, InitialFlow, EvHandler, EvHandlerState0), {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState}; #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, {State, EvHandlerState0}; error -> @@ -796,7 +818,7 @@ data(State, [StreamRef|Tail], ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState}; #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, {State, EvHandlerState0}; error -> @@ -864,7 +886,7 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), case take_stream(State0, StreamID) of {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> - ReplyTo ! {gun_error, self(), StreamRef, StreamError}, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError}, State; error -> State0 @@ -1009,17 +1031,25 @@ connection_error(#http2_state{socket=Socket, transport=Transport, %% Stream functions. error_stream_closed(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream has already been closed."}}, State. error_stream_not_found(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream cannot be found."}}, State. %% Streams. +stream_ref(#http2_state{base_stream_ref=undefined}, StreamRef) -> + StreamRef; +stream_ref(#http2_state{base_stream_ref=BaseStreamRef}, StreamRef) + when is_reference(BaseStreamRef) -> + [BaseStreamRef, StreamRef]; +stream_ref(#http2_state{base_stream_ref=BaseStreamRef}, StreamRef) -> + BaseStreamRef ++ [StreamRef]. + get_stream_by_id(#http2_state{streams=Streams}, StreamID) -> maps:get(StreamID, Streams). diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl index b171fa2..0991b1b 100644 --- a/test/rfc7540_SUITE.erl +++ b/test/rfc7540_SUITE.erl @@ -534,3 +534,6 @@ do_connect_http(OriginScheme, OriginTransport, OriginProtocol, ProxyScheme, Prox }] }} = gun:stream_info(ConnPid, ProxiedStreamRef), gun:close(ConnPid). + +%% @todo Have a test with a Cowboy origin that confirms that tunneled requests +%% work as intended. |