diff options
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r-- | src/gun_http2.erl | 114 |
1 files changed, 72 insertions, 42 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 14eb10b..7b6d1eb 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -36,16 +36,6 @@ -export([down/1]). %-export([ws_upgrade/10]). --type tunnel_info() :: #{ - %% Tunnel. - host := inet:hostname() | inet:ip_address(), - port := inet:port_number(), - - %% Origin. - origin_host => inet:hostname() | inet:ip_address(), - origin_port => inet:port_number() -}. - -record(stream, { id = undefined :: cow_http2:streamid(), @@ -66,8 +56,8 @@ handler_state :: undefined | gun_content_handler:state(), %% CONNECT tunnel. - tunnel :: {module(), any(), tunnel_info()} - | {setup, gun:connect_destination(), tunnel_info()} + tunnel :: {module(), any(), gun:tunnel_info()} + | {setup, gun:connect_destination(), gun:tunnel_info()} | undefined }). @@ -78,6 +68,10 @@ content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), + %% Base stream ref, defined when the protocol runs + %% inside an HTTP/2 CONNECT stream. + base_stream_ref = undefined :: undefined | reference() | [reference()], + %% Current status of the connection. We use this to ensure we are %% not sending the GOAWAY frame more than once, and to validate %% the server connection preface. @@ -158,9 +152,10 @@ init(_ReplyTo, Socket, Transport, Opts0) -> }, {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts), Handlers = maps:get(content_handlers, Opts, [gun_data_h]), + BaseStreamRef = maps:get(stream_ref, Opts, undefined), %% @todo Better validate the preface being received. - State = #http2_state{socket=Socket, - transport=Transport, opts=Opts, content_handlers=Handlers, + State = #http2_state{socket=Socket, transport=Transport, opts=Opts, + base_stream_ref=BaseStreamRef, content_handlers=Handlers, http2_machine=HTTP2Machine}, Transport:send(Socket, Preface), {connected, State}. @@ -317,6 +312,24 @@ data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> Stream=#stream{tunnel=undefined} -> data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream); Stream=#stream{tunnel={Protocol, ProtoState0, TunnelInfo}} -> + %% @todo Can't call Protocol:handle directly, may need to unwrap TLS first... + + %% in this case we know Transport is either gun_tcp_proxy or gun_tls_proxy + %% if gun_tcp_proxy we can dispatch to Protocol:handle directly; + %% otherwise we must pass the data to gun_tls_proxy + %% -> send {ssl, Socket, Data} + %% -> eventually Gun process receives {Tag, Socket, Data} + %% -> somehow it needs to call this stream to resume processing and call Protocol:handle + + %% maybe {Tag, Socket, Data, Info} instead and Info is used to dispatch + %% maybe {stream_Tag, StreamRef, Data} + %% -> StreamRef to know which stream is the connect stream (potentially recursive) + %% -> Protocol:resume_handle(Data, StreamRef, State, EvHandler, EvHandlerState) + %% -> if reference() then we do Protocol:handle/4 + %% -> otherwise we pass to the next stream onward + + %% This means that #stream{} must contain both the user-facing StreamRef and the reference. + %% @todo Commands. {{state, ProtoState}, EvHandlerState} = Protocol:handle(Data, ProtoState0, EvHandler, EvHandlerState0), @@ -372,10 +385,9 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com tunnel=Tunnel } = Stream, State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]}, - %% @todo CONNECT response handling if Status >= 100, Status =< 199 -> - ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers}, + ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, EvHandlerState = EvHandler:response_inform(#{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -384,15 +396,19 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com }, EvHandlerState0), {State, EvHandlerState}; Status >= 200, Status =< 299, element(1, Tunnel) =:= setup -> - ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, + {setup, Destination=#{host := DestHost, port := DestPort}, TunnelInfo} = Tunnel, + %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo. + %% We therefore do not need to call stream_ref/2. + %% @todo Maybe we don't need it in TunnelInfo anymore? + #{stream_ref := RealStreamRef} = TunnelInfo, + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, EvHandlerState = EvHandler:response_headers(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers }, EvHandlerState0), %% @todo Handle TLS over TCP and TLS over TLS. - {setup, Destination=#{host := DestHost, port := DestPort}, TunnelInfo} = Tunnel, tcp = maps:get(transport, Destination, tcp), [Protocol0] = maps:get(protocols, Destination, [http]), %% Options are either passed directly or #{} is used. Since the @@ -405,19 +421,24 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com %% @todo What about the StateName returned? OriginSocket = #{ reply_to => ReplyTo, - stream_ref => StreamRef + stream_ref => RealStreamRef }, OriginTransport = gun_tcp_proxy, + %% @todo Depending on protocol: + %% - HTTP/1.1 will need to add the stream_ref in Opts to its StreamRef in messages. + %% - HTTP/2 as well + %% - raw already uses it + %% - ws already uses it (but it's passed slightly differently) + %% - socks might not need it? what about gun_socks_up? {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport, - %% @todo We are giving the wrong StreamRef we need to give the list (if any). - ProtoOpts#{stream_ref => StreamRef}), + ProtoOpts#{stream_ref => RealStreamRef}), %% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), %% @todo What about keepalive? {store_stream(State, Stream#stream{tunnel={Protocol, ProtoState, TunnelInfo#{origin_host => DestHost, origin_port => DestPort}}}), EvHandlerState}; true -> - ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, + ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -435,6 +456,7 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com {gun_content_handler:init(ReplyTo, StreamRef, Status, Headers, Handlers0), EvHandlerState1} end, + %% @todo Disable the tunnel if any. {maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin), EvHandlerState} @@ -443,7 +465,7 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), StreamRef, Trailers}, + ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers}, ResponseEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo @@ -455,7 +477,7 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> case take_stream(State0, StreamID) of {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> - ReplyTo ! {gun_error, self(), StreamRef, + ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef), {stream_error, Reason, 'Stream reset by server.'}}, EvHandlerState = EvHandler:cancel(#{ stream_ref => StreamRef, @@ -487,7 +509,7 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, }, PushPromiseEvent = case Status of connected -> - ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, + ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), PromisedStreamRef, Method, URI, Headers}, PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef}; _ -> PushPromiseEvent0 @@ -566,7 +588,7 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, %% the one previously received. goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, status=Status, streams=Streams0, stream_refs=Refs}, {goaway, LastStreamID, Reason, _}) -> - {Streams, RemovedRefs} = goaway_streams(maps:to_list(Streams0), LastStreamID, + {Streams, RemovedRefs} = goaway_streams(State0, maps:to_list(Streams0), LastStreamID, {goaway, Reason, 'The connection is going away.'}, [], []), State = State0#http2_state{ streams=maps:from_list(Streams), @@ -583,14 +605,14 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT end. %% Cancel server-initiated streams that are above LastStreamID. -goaway_streams([], _, _, Acc, RefsAcc) -> +goaway_streams(_, [], _, _, Acc, RefsAcc) -> {Acc, RefsAcc}; -goaway_streams([{StreamID, Stream=#stream{ref=StreamRef}}|Tail], LastStreamID, Reason, Acc, RefsAcc) +goaway_streams(State, [{StreamID, Stream=#stream{ref=StreamRef}}|Tail], LastStreamID, Reason, Acc, RefsAcc) when StreamID > LastStreamID, (StreamID rem 2) =:= 1 -> - close_stream(Stream, Reason), - goaway_streams(Tail, LastStreamID, Reason, Acc, [StreamRef|RefsAcc]); -goaway_streams([StreamWithID|Tail], LastStreamID, Reason, Acc, RefsAcc) -> - goaway_streams(Tail, LastStreamID, Reason, [StreamWithID|Acc], RefsAcc). + close_stream(State, Stream, Reason), + goaway_streams(State, Tail, LastStreamID, Reason, Acc, [StreamRef|RefsAcc]); +goaway_streams(State, [StreamWithID|Tail], LastStreamID, Reason, Acc, RefsAcc) -> + goaway_streams(State, Tail, LastStreamID, Reason, [StreamWithID|Acc], RefsAcc). %% We are already closing, do nothing. closing(_, #http2_state{status=closing}, _, EvHandlerState) -> @@ -614,10 +636,10 @@ closing(#http2_state{opts=Opts}) -> Timeout = maps:get(closing_timeout, Opts, 15000), {closing, Timeout}. -close(Reason0, #http2_state{streams=Streams}, _, EvHandlerState) -> +close(Reason0, State=#http2_state{streams=Streams}, _, EvHandlerState) -> Reason = close_reason(Reason0), _ = maps:fold(fun(_, Stream, _) -> - close_stream(Stream, Reason) + close_stream(State, Stream, Reason) end, [], Streams), EvHandlerState. @@ -625,8 +647,8 @@ close_reason(closed) -> closed; close_reason(Reason) -> {closed, Reason}. %% @todo Do we want an event for this? -close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> - ReplyTo ! {gun_error, self(), StreamRef, Reason}, +close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason}, ok. keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) -> @@ -721,7 +743,7 @@ request(State, [StreamRef|Tail], ReplyTo, Method, _Host, _Port, InitialFlow, EvHandler, EvHandlerState0), {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState}; #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, {State, EvHandlerState0}; error -> @@ -796,7 +818,7 @@ data(State, [StreamRef|Tail], ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState}; #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, {State, EvHandlerState0}; error -> @@ -864,7 +886,7 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), case take_stream(State0, StreamID) of {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> - ReplyTo ! {gun_error, self(), StreamRef, StreamError}, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError}, State; error -> State0 @@ -1009,17 +1031,25 @@ connection_error(#http2_state{socket=Socket, transport=Transport, %% Stream functions. error_stream_closed(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream has already been closed."}}, State. error_stream_not_found(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream cannot be found."}}, State. %% Streams. +stream_ref(#http2_state{base_stream_ref=undefined}, StreamRef) -> + StreamRef; +stream_ref(#http2_state{base_stream_ref=BaseStreamRef}, StreamRef) + when is_reference(BaseStreamRef) -> + [BaseStreamRef, StreamRef]; +stream_ref(#http2_state{base_stream_ref=BaseStreamRef}, StreamRef) -> + BaseStreamRef ++ [StreamRef]. + get_stream_by_id(#http2_state{streams=Streams}, StreamID) -> maps:get(StreamID, Streams). |