aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2020-07-23 17:22:21 +0200
committerLoïc Hoguin <[email protected]>2020-09-21 15:51:56 +0200
commit323bd167fd33f322ab8747e398e54a8a36f5b753 (patch)
tree8105c23a63f76eb204ea32f4b29e933a18976b5e /src
parentf8272a1e8d5fbf3b8021479d142a2002846fe062 (diff)
downloadgun-323bd167fd33f322ab8747e398e54a8a36f5b753.tar.gz
gun-323bd167fd33f322ab8747e398e54a8a36f5b753.tar.bz2
gun-323bd167fd33f322ab8747e398e54a8a36f5b753.zip
Add the base_stream_ref to gun_http/gun_http2
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl17
-rw-r--r--src/gun_event.erl6
-rw-r--r--src/gun_http.erl76
-rw-r--r--src/gun_http2.erl114
4 files changed, 135 insertions, 78 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 8556b92..00e2d82 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -115,6 +115,19 @@
| #{binary() | string() | atom() => iodata()}.
-export_type([req_headers/0]).
+-type tunnel_info() :: #{
+ stream_ref := reference() | [reference()],
+
+ %% Tunnel.
+ host := inet:hostname() | inet:ip_address(),
+ port := inet:port_number(),
+
+ %% Origin.
+ origin_host => inet:hostname() | inet:ip_address(),
+ origin_port => inet:port_number()
+}.
+-export_type([tunnel_info/0]).
+
-type ws_close_code() :: 1000..4999.
-type ws_frame() :: close | ping | pong
@@ -1214,9 +1227,9 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers0, Body, Init
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState}) ->
- %% @todo Not events are currently handled for the request?
+ %% @todo No events are currently handled for the CONNECT request?
ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo,
- Destination, #{host => Host, port => Port},
+ Destination, #{stream_ref => StreamRef, host => Host, port => Port},
Headers, InitialFlow),
{keep_state, State#state{protocol_state=ProtoState2}};
%% Public Websocket interface.
diff --git a/src/gun_event.erl b/src/gun_event.erl
index 0c1326d..a553ddb 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -57,9 +57,9 @@
%% tls_handshake_start/tls_handshake_end.
%%
%% These events occur when connecting to a TLS server or when
-%% upgrading the connection to use TLS, for example using CONNECT.
-%% The stream_ref/reply_to values are only present when the TLS
-%% handshake occurs as a result of a request.
+%% upgrading the connection or stream to use TLS, for example
+%% using CONNECT. The stream_ref/reply_to values are only
+%% present when the TLS handshake occurs as a result of a request.
-type tls_handshake_event() :: #{
stream_ref => reference(),
diff --git a/src/gun_http.erl b/src/gun_http.erl
index aae47cf..982df82 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -72,6 +72,11 @@
version = 'HTTP/1.1' :: cow_http:version(),
connection = keepalive :: keepalive | close,
buffer = <<>> :: binary(),
+
+ %% Base stream ref, defined when the protocol runs
+ %% inside an HTTP/2 CONNECT stream.
+ base_stream_ref = undefined :: undefined | reference() | [reference()],
+
streams = [] :: [#stream{}],
in = head :: io(),
in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()},
@@ -114,9 +119,10 @@ has_keepalive() -> true.
default_keepalive() -> infinity.
init(_ReplyTo, Socket, Transport, Opts) ->
+ BaseStreamRef = maps:get(stream_ref, Opts, undefined),
Version = maps:get(version, Opts, 'HTTP/1.1'),
{connected, #http_state{socket=Socket, transport=Transport,
- opts=Opts, version=Version}}.
+ opts=Opts, version=Version, base_stream_ref=BaseStreamRef}}.
switch_transport(Transport, Socket, State) ->
State#http_state{socket=Socket, transport=Transport}.
@@ -149,7 +155,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer,
EvHandlerState = case Buffer of
<<>> ->
EvHandler:response_start(#{
- stream_ref => stream_ref(StreamRef),
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0);
_ ->
@@ -237,7 +243,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn,
{_, _} ->
{Trailers, Rest} = cow_http:parse_headers(Data2),
%% @todo We probably want to pass this to gun_content_handler?
- ReplyTo ! {gun_trailers, self(), stream_ref(StreamRef), Trailers},
+ ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers},
ResponseEvent = #{
stream_ref => StreamRef,
reply_to => ReplyTo
@@ -310,7 +316,7 @@ handle_connect(Rest, State=#http_state{
%% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup.
_ = case Stream of
#stream{is_alive=false} -> ok;
- _ -> ReplyTo ! {gun_response, self(), StreamRef, fin, Status, Headers}
+ _ -> ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), fin, Status, Headers}
end,
%% @todo Figure out whether the event should trigger if the stream was cancelled.
EvHandlerState1 = EvHandler:response_headers(#{
@@ -351,7 +357,7 @@ handle_inform(Rest, State=#http_state{
streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]},
EvHandler, EvHandlerState0, Version, Status, Headers) ->
EvHandlerState = EvHandler:response_inform(#{
- stream_ref => stream_ref(StreamRef),
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -367,16 +373,16 @@ handle_inform(Rest, State=#http_state{
%% @todo We shouldn't ignore Rest.
{_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers),
Upgrade = cow_http_hd:parse_upgrade(Upgrade0),
- ReplyTo ! {gun_upgrade, self(), StreamRef, Upgrade, Headers},
+ ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), Upgrade, Headers},
{handle_ret({switch_protocol, raw, ReplyTo}, State), EvHandlerState0}
catch _:_ ->
%% When the Upgrade header is missing or invalid we treat
%% the response as any other informational response.
- ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
+ ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
handle(Rest, State, EvHandler, EvHandlerState)
end;
_ ->
- ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
+ ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
handle(Rest, State, EvHandler, EvHandlerState)
end.
@@ -390,7 +396,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
false ->
{undefined, EvHandlerState0};
true ->
- ReplyTo ! {gun_response, self(), stream_ref(StreamRef),
+ ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef),
IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => StreamRef,
@@ -402,7 +408,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
fin -> {undefined, EvHandlerState1};
nofin ->
Handlers0 = maps:get(content_handlers, Opts, [gun_data_h]),
- {gun_content_handler:init(ReplyTo, stream_ref(StreamRef),
+ {gun_content_handler:init(ReplyTo, stream_ref(State, StreamRef),
Status, Headers, Handlers0), EvHandlerState1}
end
end,
@@ -437,10 +443,6 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
EvHandler, EvHandlerState)
end.
-stream_ref({connect, StreamRef, _}) -> StreamRef;
-stream_ref(#websocket{ref=StreamRef}) -> StreamRef;
-stream_ref(StreamRef) -> StreamRef.
-
%% The state must be first in order to retrieve it when the stream ended.
send_data(<<>>, State, nofin) ->
[{state, State}, {active, true}];
@@ -485,7 +487,7 @@ closing(_, #http_state{streams=[]}, _, EvHandlerState) ->
%% Otherwise we set connection: close (even if the header was not sent)
%% and close any pipelined streams, only keeping the active stream.
closing(Reason, State=#http_state{streams=[LastStream|Tail]}, _, EvHandlerState) ->
- close_streams(Tail, {closing, Reason}),
+ close_streams(State, Tail, {closing, Reason}),
{[
{state, State#http_state{connection=close, streams=[LastStream]}},
closing(State)
@@ -499,27 +501,27 @@ close(Reason, State=#http_state{in=body_close,
streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]},
EvHandler, EvHandlerState) ->
%% We may have more than one stream in case we somehow close abruptly.
- close_streams(Tail, close_reason(Reason)),
+ close_streams(State, Tail, close_reason(Reason)),
_ = send_data(<<>>, State, fin),
EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
}, EvHandlerState);
-close(Reason, #http_state{streams=Streams}, _, EvHandlerState) ->
- close_streams(Streams, close_reason(Reason)),
+close(Reason, State=#http_state{streams=Streams}, _, EvHandlerState) ->
+ close_streams(State, Streams, close_reason(Reason)),
EvHandlerState.
close_reason(closed) -> closed;
close_reason(Reason) -> {closed, Reason}.
%% @todo Do we want an event for this?
-close_streams([], _) ->
+close_streams(_, [], _) ->
ok;
-close_streams([#stream{is_alive=false}|Tail], Reason) ->
- close_streams(Tail, Reason);
-close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
- ReplyTo ! {gun_error, self(), StreamRef, Reason},
- close_streams(Tail, Reason).
+close_streams(State, [#stream{is_alive=false}|Tail], Reason) ->
+ close_streams(State, Tail, Reason);
+close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason},
+ close_streams(State, Tail, Reason).
%% We don't send a keep-alive when a CONNECT request was initiated.
keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) ->
@@ -684,7 +686,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
end.
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
State;
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
@@ -758,12 +760,12 @@ down(#http_state{streams=Streams}) ->
end || #stream{ref=Ref} <- Streams].
error_stream_closed(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
State.
error_stream_not_found(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream cannot be found."}},
State.
@@ -816,6 +818,18 @@ response_io_from_headers(_, Version, _Status, Headers) ->
%% Streams.
+stream_ref(#http_state{base_stream_ref=undefined}, StreamRef) ->
+ stream_ref(StreamRef);
+stream_ref(#http_state{base_stream_ref=BaseStreamRef}, StreamRef)
+ when is_reference(BaseStreamRef) ->
+ [BaseStreamRef, stream_ref(StreamRef)];
+stream_ref(#http_state{base_stream_ref=BaseStreamRef}, StreamRef) ->
+ BaseStreamRef ++ [stream_ref(StreamRef)].
+
+stream_ref({connect, StreamRef, _}) -> StreamRef;
+stream_ref(#websocket{ref=StreamRef}) -> StreamRef;
+stream_ref(StreamRef) -> StreamRef.
+
new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo,
Method, Authority, Path, InitialFlow) ->
State#http_state{streams=Streams
@@ -840,9 +854,9 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
%% Websocket upgrade.
-ws_upgrade(#http_state{version='HTTP/1.0'},
+ws_upgrade(State=#http_state{version='HTTP/1.0'},
StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"Websocket cannot be used over an HTTP/1.0 connection."}},
{[], EvHandlerState};
ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
@@ -939,7 +953,7 @@ ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensi
%% We know that the most recent stream is the Websocket one.
ws_handshake_end(Buffer,
- #http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]},
+ State=#http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]},
#websocket{ref=StreamRef, reply_to=ReplyTo, opts=Opts}, Headers, Extensions, Handler) ->
%% Send ourselves the remaining buffer, if any.
_ = case Buffer of
@@ -950,7 +964,7 @@ ws_handshake_end(Buffer,
self() ! {OK, Socket, Buffer}
end,
%% Inform the user that the upgrade was successful and switch the protocol.
- ReplyTo ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
+ ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), [<<"websocket">>], Headers},
{switch_protocol, {ws, #{
stream_ref => StreamRef,
headers => Headers,
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 14eb10b..7b6d1eb 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -36,16 +36,6 @@
-export([down/1]).
%-export([ws_upgrade/10]).
--type tunnel_info() :: #{
- %% Tunnel.
- host := inet:hostname() | inet:ip_address(),
- port := inet:port_number(),
-
- %% Origin.
- origin_host => inet:hostname() | inet:ip_address(),
- origin_port => inet:port_number()
-}.
-
-record(stream, {
id = undefined :: cow_http2:streamid(),
@@ -66,8 +56,8 @@
handler_state :: undefined | gun_content_handler:state(),
%% CONNECT tunnel.
- tunnel :: {module(), any(), tunnel_info()}
- | {setup, gun:connect_destination(), tunnel_info()}
+ tunnel :: {module(), any(), gun:tunnel_info()}
+ | {setup, gun:connect_destination(), gun:tunnel_info()}
| undefined
}).
@@ -78,6 +68,10 @@
content_handlers :: gun_content_handler:opt(),
buffer = <<>> :: binary(),
+ %% Base stream ref, defined when the protocol runs
+ %% inside an HTTP/2 CONNECT stream.
+ base_stream_ref = undefined :: undefined | reference() | [reference()],
+
%% Current status of the connection. We use this to ensure we are
%% not sending the GOAWAY frame more than once, and to validate
%% the server connection preface.
@@ -158,9 +152,10 @@ init(_ReplyTo, Socket, Transport, Opts0) ->
},
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts),
Handlers = maps:get(content_handlers, Opts, [gun_data_h]),
+ BaseStreamRef = maps:get(stream_ref, Opts, undefined),
%% @todo Better validate the preface being received.
- State = #http2_state{socket=Socket,
- transport=Transport, opts=Opts, content_handlers=Handlers,
+ State = #http2_state{socket=Socket, transport=Transport, opts=Opts,
+ base_stream_ref=BaseStreamRef, content_handlers=Handlers,
http2_machine=HTTP2Machine},
Transport:send(Socket, Preface),
{connected, State}.
@@ -317,6 +312,24 @@ data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0) ->
Stream=#stream{tunnel=undefined} ->
data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream);
Stream=#stream{tunnel={Protocol, ProtoState0, TunnelInfo}} ->
+ %% @todo Can't call Protocol:handle directly, may need to unwrap TLS first...
+
+ %% in this case we know Transport is either gun_tcp_proxy or gun_tls_proxy
+ %% if gun_tcp_proxy we can dispatch to Protocol:handle directly;
+ %% otherwise we must pass the data to gun_tls_proxy
+ %% -> send {ssl, Socket, Data}
+ %% -> eventually Gun process receives {Tag, Socket, Data}
+ %% -> somehow it needs to call this stream to resume processing and call Protocol:handle
+
+ %% maybe {Tag, Socket, Data, Info} instead and Info is used to dispatch
+ %% maybe {stream_Tag, StreamRef, Data}
+ %% -> StreamRef to know which stream is the connect stream (potentially recursive)
+ %% -> Protocol:resume_handle(Data, StreamRef, State, EvHandler, EvHandlerState)
+ %% -> if reference() then we do Protocol:handle/4
+ %% -> otherwise we pass to the next stream onward
+
+ %% This means that #stream{} must contain both the user-facing StreamRef and the reference.
+
%% @todo Commands.
{{state, ProtoState}, EvHandlerState} = Protocol:handle(Data, ProtoState0,
EvHandler, EvHandlerState0),
@@ -372,10 +385,9 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
tunnel=Tunnel
} = Stream,
State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]},
- %% @todo CONNECT response handling
if
Status >= 100, Status =< 199 ->
- ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers},
+ ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
EvHandlerState = EvHandler:response_inform(#{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -384,15 +396,19 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
}, EvHandlerState0),
{State, EvHandlerState};
Status >= 200, Status =< 299, element(1, Tunnel) =:= setup ->
- ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers},
+ {setup, Destination=#{host := DestHost, port := DestPort}, TunnelInfo} = Tunnel,
+ %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo.
+ %% We therefore do not need to call stream_ref/2.
+ %% @todo Maybe we don't need it in TunnelInfo anymore?
+ #{stream_ref := RealStreamRef} = TunnelInfo,
+ ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
EvHandlerState = EvHandler:response_headers(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
}, EvHandlerState0),
%% @todo Handle TLS over TCP and TLS over TLS.
- {setup, Destination=#{host := DestHost, port := DestPort}, TunnelInfo} = Tunnel,
tcp = maps:get(transport, Destination, tcp),
[Protocol0] = maps:get(protocols, Destination, [http]),
%% Options are either passed directly or #{} is used. Since the
@@ -405,19 +421,24 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
%% @todo What about the StateName returned?
OriginSocket = #{
reply_to => ReplyTo,
- stream_ref => StreamRef
+ stream_ref => RealStreamRef
},
OriginTransport = gun_tcp_proxy,
+ %% @todo Depending on protocol:
+ %% - HTTP/1.1 will need to add the stream_ref in Opts to its StreamRef in messages.
+ %% - HTTP/2 as well
+ %% - raw already uses it
+ %% - ws already uses it (but it's passed slightly differently)
+ %% - socks might not need it? what about gun_socks_up?
{_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport,
- %% @todo We are giving the wrong StreamRef we need to give the list (if any).
- ProtoOpts#{stream_ref => StreamRef}),
+ ProtoOpts#{stream_ref => RealStreamRef}),
%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
%% @todo What about keepalive?
{store_stream(State, Stream#stream{tunnel={Protocol, ProtoState,
TunnelInfo#{origin_host => DestHost, origin_port => DestPort}}}),
EvHandlerState};
true ->
- ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers},
+ ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -435,6 +456,7 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
{gun_content_handler:init(ReplyTo, StreamRef,
Status, Headers, Handlers0), EvHandlerState1}
end,
+ %% @todo Disable the tunnel if any.
{maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}),
StreamID, remote, IsFin),
EvHandlerState}
@@ -443,7 +465,7 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
%% @todo We probably want to pass this to gun_content_handler?
- ReplyTo ! {gun_trailers, self(), StreamRef, Trailers},
+ ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers},
ResponseEvent = #{
stream_ref => StreamRef,
reply_to => ReplyTo
@@ -455,7 +477,7 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) ->
case take_stream(State0, StreamID) of
{#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
- ReplyTo ! {gun_error, self(), StreamRef,
+ ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef),
{stream_error, Reason, 'Stream reset by server.'}},
EvHandlerState = EvHandler:cancel(#{
stream_ref => StreamRef,
@@ -487,7 +509,7 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
},
PushPromiseEvent = case Status of
connected ->
- ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
+ ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), PromisedStreamRef, Method, URI, Headers},
PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef};
_ ->
PushPromiseEvent0
@@ -566,7 +588,7 @@ update_window(State=#http2_state{socket=Socket, transport=Transport,
%% the one previously received.
goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine,
status=Status, streams=Streams0, stream_refs=Refs}, {goaway, LastStreamID, Reason, _}) ->
- {Streams, RemovedRefs} = goaway_streams(maps:to_list(Streams0), LastStreamID,
+ {Streams, RemovedRefs} = goaway_streams(State0, maps:to_list(Streams0), LastStreamID,
{goaway, Reason, 'The connection is going away.'}, [], []),
State = State0#http2_state{
streams=maps:from_list(Streams),
@@ -583,14 +605,14 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT
end.
%% Cancel server-initiated streams that are above LastStreamID.
-goaway_streams([], _, _, Acc, RefsAcc) ->
+goaway_streams(_, [], _, _, Acc, RefsAcc) ->
{Acc, RefsAcc};
-goaway_streams([{StreamID, Stream=#stream{ref=StreamRef}}|Tail], LastStreamID, Reason, Acc, RefsAcc)
+goaway_streams(State, [{StreamID, Stream=#stream{ref=StreamRef}}|Tail], LastStreamID, Reason, Acc, RefsAcc)
when StreamID > LastStreamID, (StreamID rem 2) =:= 1 ->
- close_stream(Stream, Reason),
- goaway_streams(Tail, LastStreamID, Reason, Acc, [StreamRef|RefsAcc]);
-goaway_streams([StreamWithID|Tail], LastStreamID, Reason, Acc, RefsAcc) ->
- goaway_streams(Tail, LastStreamID, Reason, [StreamWithID|Acc], RefsAcc).
+ close_stream(State, Stream, Reason),
+ goaway_streams(State, Tail, LastStreamID, Reason, Acc, [StreamRef|RefsAcc]);
+goaway_streams(State, [StreamWithID|Tail], LastStreamID, Reason, Acc, RefsAcc) ->
+ goaway_streams(State, Tail, LastStreamID, Reason, [StreamWithID|Acc], RefsAcc).
%% We are already closing, do nothing.
closing(_, #http2_state{status=closing}, _, EvHandlerState) ->
@@ -614,10 +636,10 @@ closing(#http2_state{opts=Opts}) ->
Timeout = maps:get(closing_timeout, Opts, 15000),
{closing, Timeout}.
-close(Reason0, #http2_state{streams=Streams}, _, EvHandlerState) ->
+close(Reason0, State=#http2_state{streams=Streams}, _, EvHandlerState) ->
Reason = close_reason(Reason0),
_ = maps:fold(fun(_, Stream, _) ->
- close_stream(Stream, Reason)
+ close_stream(State, Stream, Reason)
end, [], Streams),
EvHandlerState.
@@ -625,8 +647,8 @@ close_reason(closed) -> closed;
close_reason(Reason) -> {closed, Reason}.
%% @todo Do we want an event for this?
-close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
- ReplyTo ! {gun_error, self(), StreamRef, Reason},
+close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason},
ok.
keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) ->
@@ -721,7 +743,7 @@ request(State, [StreamRef|Tail], ReplyTo, Method, _Host, _Port,
InitialFlow, EvHandler, EvHandlerState0),
{store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState};
#stream{tunnel=undefined} ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
{State, EvHandlerState0};
error ->
@@ -796,7 +818,7 @@ data(State, [StreamRef|Tail], ReplyTo, IsFin, Data, EvHandler, EvHandlerState0)
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
{store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState};
#stream{tunnel=undefined} ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
{State, EvHandlerState0};
error ->
@@ -864,7 +886,7 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
case take_stream(State0, StreamID) of
{#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
- ReplyTo ! {gun_error, self(), StreamRef, StreamError},
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError},
State;
error ->
State0
@@ -1009,17 +1031,25 @@ connection_error(#http2_state{socket=Socket, transport=Transport,
%% Stream functions.
error_stream_closed(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
State.
error_stream_not_found(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream cannot be found."}},
State.
%% Streams.
+stream_ref(#http2_state{base_stream_ref=undefined}, StreamRef) ->
+ StreamRef;
+stream_ref(#http2_state{base_stream_ref=BaseStreamRef}, StreamRef)
+ when is_reference(BaseStreamRef) ->
+ [BaseStreamRef, StreamRef];
+stream_ref(#http2_state{base_stream_ref=BaseStreamRef}, StreamRef) ->
+ BaseStreamRef ++ [StreamRef].
+
get_stream_by_id(#http2_state{streams=Streams}, StreamID) ->
maps:get(StreamID, Streams).