aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http2.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2020-07-23 17:22:21 +0200
committerLoïc Hoguin <[email protected]>2020-09-21 15:51:56 +0200
commit323bd167fd33f322ab8747e398e54a8a36f5b753 (patch)
tree8105c23a63f76eb204ea32f4b29e933a18976b5e /src/gun_http2.erl
parentf8272a1e8d5fbf3b8021479d142a2002846fe062 (diff)
downloadgun-323bd167fd33f322ab8747e398e54a8a36f5b753.tar.gz
gun-323bd167fd33f322ab8747e398e54a8a36f5b753.tar.bz2
gun-323bd167fd33f322ab8747e398e54a8a36f5b753.zip
Add the base_stream_ref to gun_http/gun_http2
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r--src/gun_http2.erl114
1 files changed, 72 insertions, 42 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 14eb10b..7b6d1eb 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -36,16 +36,6 @@
-export([down/1]).
%-export([ws_upgrade/10]).
--type tunnel_info() :: #{
- %% Tunnel.
- host := inet:hostname() | inet:ip_address(),
- port := inet:port_number(),
-
- %% Origin.
- origin_host => inet:hostname() | inet:ip_address(),
- origin_port => inet:port_number()
-}.
-
-record(stream, {
id = undefined :: cow_http2:streamid(),
@@ -66,8 +56,8 @@
handler_state :: undefined | gun_content_handler:state(),
%% CONNECT tunnel.
- tunnel :: {module(), any(), tunnel_info()}
- | {setup, gun:connect_destination(), tunnel_info()}
+ tunnel :: {module(), any(), gun:tunnel_info()}
+ | {setup, gun:connect_destination(), gun:tunnel_info()}
| undefined
}).
@@ -78,6 +68,10 @@
content_handlers :: gun_content_handler:opt(),
buffer = <<>> :: binary(),
+ %% Base stream ref, defined when the protocol runs
+ %% inside an HTTP/2 CONNECT stream.
+ base_stream_ref = undefined :: undefined | reference() | [reference()],
+
%% Current status of the connection. We use this to ensure we are
%% not sending the GOAWAY frame more than once, and to validate
%% the server connection preface.
@@ -158,9 +152,10 @@ init(_ReplyTo, Socket, Transport, Opts0) ->
},
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts),
Handlers = maps:get(content_handlers, Opts, [gun_data_h]),
+ BaseStreamRef = maps:get(stream_ref, Opts, undefined),
%% @todo Better validate the preface being received.
- State = #http2_state{socket=Socket,
- transport=Transport, opts=Opts, content_handlers=Handlers,
+ State = #http2_state{socket=Socket, transport=Transport, opts=Opts,
+ base_stream_ref=BaseStreamRef, content_handlers=Handlers,
http2_machine=HTTP2Machine},
Transport:send(Socket, Preface),
{connected, State}.
@@ -317,6 +312,24 @@ data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0) ->
Stream=#stream{tunnel=undefined} ->
data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream);
Stream=#stream{tunnel={Protocol, ProtoState0, TunnelInfo}} ->
+ %% @todo Can't call Protocol:handle directly, may need to unwrap TLS first...
+
+ %% in this case we know Transport is either gun_tcp_proxy or gun_tls_proxy
+ %% if gun_tcp_proxy we can dispatch to Protocol:handle directly;
+ %% otherwise we must pass the data to gun_tls_proxy
+ %% -> send {ssl, Socket, Data}
+ %% -> eventually Gun process receives {Tag, Socket, Data}
+ %% -> somehow it needs to call this stream to resume processing and call Protocol:handle
+
+ %% maybe {Tag, Socket, Data, Info} instead and Info is used to dispatch
+ %% maybe {stream_Tag, StreamRef, Data}
+ %% -> StreamRef to know which stream is the connect stream (potentially recursive)
+ %% -> Protocol:resume_handle(Data, StreamRef, State, EvHandler, EvHandlerState)
+ %% -> if reference() then we do Protocol:handle/4
+ %% -> otherwise we pass to the next stream onward
+
+ %% This means that #stream{} must contain both the user-facing StreamRef and the reference.
+
%% @todo Commands.
{{state, ProtoState}, EvHandlerState} = Protocol:handle(Data, ProtoState0,
EvHandler, EvHandlerState0),
@@ -372,10 +385,9 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
tunnel=Tunnel
} = Stream,
State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]},
- %% @todo CONNECT response handling
if
Status >= 100, Status =< 199 ->
- ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers},
+ ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
EvHandlerState = EvHandler:response_inform(#{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -384,15 +396,19 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
}, EvHandlerState0),
{State, EvHandlerState};
Status >= 200, Status =< 299, element(1, Tunnel) =:= setup ->
- ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers},
+ {setup, Destination=#{host := DestHost, port := DestPort}, TunnelInfo} = Tunnel,
+ %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo.
+ %% We therefore do not need to call stream_ref/2.
+ %% @todo Maybe we don't need it in TunnelInfo anymore?
+ #{stream_ref := RealStreamRef} = TunnelInfo,
+ ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
EvHandlerState = EvHandler:response_headers(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
}, EvHandlerState0),
%% @todo Handle TLS over TCP and TLS over TLS.
- {setup, Destination=#{host := DestHost, port := DestPort}, TunnelInfo} = Tunnel,
tcp = maps:get(transport, Destination, tcp),
[Protocol0] = maps:get(protocols, Destination, [http]),
%% Options are either passed directly or #{} is used. Since the
@@ -405,19 +421,24 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
%% @todo What about the StateName returned?
OriginSocket = #{
reply_to => ReplyTo,
- stream_ref => StreamRef
+ stream_ref => RealStreamRef
},
OriginTransport = gun_tcp_proxy,
+ %% @todo Depending on protocol:
+ %% - HTTP/1.1 will need to add the stream_ref in Opts to its StreamRef in messages.
+ %% - HTTP/2 as well
+ %% - raw already uses it
+ %% - ws already uses it (but it's passed slightly differently)
+ %% - socks might not need it? what about gun_socks_up?
{_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport,
- %% @todo We are giving the wrong StreamRef we need to give the list (if any).
- ProtoOpts#{stream_ref => StreamRef}),
+ ProtoOpts#{stream_ref => RealStreamRef}),
%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
%% @todo What about keepalive?
{store_stream(State, Stream#stream{tunnel={Protocol, ProtoState,
TunnelInfo#{origin_host => DestHost, origin_port => DestPort}}}),
EvHandlerState};
true ->
- ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers},
+ ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -435,6 +456,7 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
{gun_content_handler:init(ReplyTo, StreamRef,
Status, Headers, Handlers0), EvHandlerState1}
end,
+ %% @todo Disable the tunnel if any.
{maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}),
StreamID, remote, IsFin),
EvHandlerState}
@@ -443,7 +465,7 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
%% @todo We probably want to pass this to gun_content_handler?
- ReplyTo ! {gun_trailers, self(), StreamRef, Trailers},
+ ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers},
ResponseEvent = #{
stream_ref => StreamRef,
reply_to => ReplyTo
@@ -455,7 +477,7 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) ->
case take_stream(State0, StreamID) of
{#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
- ReplyTo ! {gun_error, self(), StreamRef,
+ ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef),
{stream_error, Reason, 'Stream reset by server.'}},
EvHandlerState = EvHandler:cancel(#{
stream_ref => StreamRef,
@@ -487,7 +509,7 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
},
PushPromiseEvent = case Status of
connected ->
- ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
+ ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), PromisedStreamRef, Method, URI, Headers},
PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef};
_ ->
PushPromiseEvent0
@@ -566,7 +588,7 @@ update_window(State=#http2_state{socket=Socket, transport=Transport,
%% the one previously received.
goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine,
status=Status, streams=Streams0, stream_refs=Refs}, {goaway, LastStreamID, Reason, _}) ->
- {Streams, RemovedRefs} = goaway_streams(maps:to_list(Streams0), LastStreamID,
+ {Streams, RemovedRefs} = goaway_streams(State0, maps:to_list(Streams0), LastStreamID,
{goaway, Reason, 'The connection is going away.'}, [], []),
State = State0#http2_state{
streams=maps:from_list(Streams),
@@ -583,14 +605,14 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT
end.
%% Cancel server-initiated streams that are above LastStreamID.
-goaway_streams([], _, _, Acc, RefsAcc) ->
+goaway_streams(_, [], _, _, Acc, RefsAcc) ->
{Acc, RefsAcc};
-goaway_streams([{StreamID, Stream=#stream{ref=StreamRef}}|Tail], LastStreamID, Reason, Acc, RefsAcc)
+goaway_streams(State, [{StreamID, Stream=#stream{ref=StreamRef}}|Tail], LastStreamID, Reason, Acc, RefsAcc)
when StreamID > LastStreamID, (StreamID rem 2) =:= 1 ->
- close_stream(Stream, Reason),
- goaway_streams(Tail, LastStreamID, Reason, Acc, [StreamRef|RefsAcc]);
-goaway_streams([StreamWithID|Tail], LastStreamID, Reason, Acc, RefsAcc) ->
- goaway_streams(Tail, LastStreamID, Reason, [StreamWithID|Acc], RefsAcc).
+ close_stream(State, Stream, Reason),
+ goaway_streams(State, Tail, LastStreamID, Reason, Acc, [StreamRef|RefsAcc]);
+goaway_streams(State, [StreamWithID|Tail], LastStreamID, Reason, Acc, RefsAcc) ->
+ goaway_streams(State, Tail, LastStreamID, Reason, [StreamWithID|Acc], RefsAcc).
%% We are already closing, do nothing.
closing(_, #http2_state{status=closing}, _, EvHandlerState) ->
@@ -614,10 +636,10 @@ closing(#http2_state{opts=Opts}) ->
Timeout = maps:get(closing_timeout, Opts, 15000),
{closing, Timeout}.
-close(Reason0, #http2_state{streams=Streams}, _, EvHandlerState) ->
+close(Reason0, State=#http2_state{streams=Streams}, _, EvHandlerState) ->
Reason = close_reason(Reason0),
_ = maps:fold(fun(_, Stream, _) ->
- close_stream(Stream, Reason)
+ close_stream(State, Stream, Reason)
end, [], Streams),
EvHandlerState.
@@ -625,8 +647,8 @@ close_reason(closed) -> closed;
close_reason(Reason) -> {closed, Reason}.
%% @todo Do we want an event for this?
-close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
- ReplyTo ! {gun_error, self(), StreamRef, Reason},
+close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason},
ok.
keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) ->
@@ -721,7 +743,7 @@ request(State, [StreamRef|Tail], ReplyTo, Method, _Host, _Port,
InitialFlow, EvHandler, EvHandlerState0),
{store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState};
#stream{tunnel=undefined} ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
{State, EvHandlerState0};
error ->
@@ -796,7 +818,7 @@ data(State, [StreamRef|Tail], ReplyTo, IsFin, Data, EvHandler, EvHandlerState0)
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
{store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState};
#stream{tunnel=undefined} ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
{State, EvHandlerState0};
error ->
@@ -864,7 +886,7 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
case take_stream(State0, StreamID) of
{#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
- ReplyTo ! {gun_error, self(), StreamRef, StreamError},
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError},
State;
error ->
State0
@@ -1009,17 +1031,25 @@ connection_error(#http2_state{socket=Socket, transport=Transport,
%% Stream functions.
error_stream_closed(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
State.
error_stream_not_found(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream cannot be found."}},
State.
%% Streams.
+stream_ref(#http2_state{base_stream_ref=undefined}, StreamRef) ->
+ StreamRef;
+stream_ref(#http2_state{base_stream_ref=BaseStreamRef}, StreamRef)
+ when is_reference(BaseStreamRef) ->
+ [BaseStreamRef, StreamRef];
+stream_ref(#http2_state{base_stream_ref=BaseStreamRef}, StreamRef) ->
+ BaseStreamRef ++ [StreamRef].
+
get_stream_by_id(#http2_state{streams=Streams}, StreamID) ->
maps:get(StreamID, Streams).