From 8033850ab81ca0639489636bb8760d93900d4a80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 18 Sep 2020 17:01:25 +0200 Subject: Initial success for h2 CONNECT -> https CONNECT -> https --- ebin/gun.app | 2 +- src/gun.erl | 5 +- src/gun_http.erl | 3 +- src/gun_http2.erl | 341 +++++++++++------------------ src/gun_protocols.erl | 5 + src/gun_tcp_proxy.erl | 15 +- src/gun_tls_proxy.erl | 12 +- src/gun_tls_proxy_http2_connect.erl | 10 +- src/gun_tunnel.erl | 414 ++++++++++++++++++++++++++++++++++++ test/rfc7540_SUITE.erl | 32 ++- 10 files changed, 609 insertions(+), 230 deletions(-) create mode 100644 src/gun_tunnel.erl diff --git a/ebin/gun.app b/ebin/gun.app index 9a5ab45..1f69574 100644 --- a/ebin/gun.app +++ b/ebin/gun.app @@ -1,7 +1,7 @@ {application, 'gun', [ {description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."}, {vsn, "2.0.0-pre.2"}, - {modules, ['gun','gun_app','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_protocols','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_tls_proxy_http2_connect','gun_ws','gun_ws_h']}, + {modules, ['gun','gun_app','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_protocols','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_tls_proxy_http2_connect','gun_tunnel','gun_ws','gun_ws_h']}, {registered, [gun_sup]}, {applications, [kernel,stdlib,ssl,cowlib]}, {mod, {gun_app, []}}, diff --git a/src/gun.erl b/src/gun.erl index 02b7302..4178c24 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -186,10 +186,7 @@ origin_port => inet:port_number(), %% Non-stream intermediaries (for example SOCKS). - intermediaries => [intermediary()], - - %% TLS proxy. - tls_proxy_pid => pid() + intermediaries => [intermediary()] }. -export_type([tunnel_info/0]). diff --git a/src/gun_http.erl b/src/gun_http.erl index 490f025..064bf04 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -378,6 +378,7 @@ handle_inform(Rest, State=#http_state{ {_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers), Upgrade = cow_http_hd:parse_upgrade(Upgrade0), ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), Upgrade, Headers}, + %% @todo We probably need to add_stream_ref? {handle_ret({switch_protocol, raw, ReplyTo}, State), EvHandlerState0} catch _:_ -> %% When the Upgrade header is missing or invalid we treat @@ -781,7 +782,7 @@ stream_info(#http_state{streams=Streams}, StreamRef) -> case lists:keyfind(StreamRef, #stream.ref, Streams) of #stream{reply_to=ReplyTo, is_alive=IsAlive} -> {ok, #{ - ref => StreamRef, + ref => StreamRef, %% @todo Wrong stream_ref? base_stream_ref it? reply_to => ReplyTo, state => case IsAlive of true -> running; diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 252aaec..df76195 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -37,6 +37,24 @@ -export([down/1]). %-export([ws_upgrade/10]). +-record(tunnel, { + %% The tunnel can either go requested->established + %% or requested->tls_handshake->established, or get + %% canceled. + state = requested :: requested | established, + + %% Destination information. + destination = undefined :: gun:connect_destination(), + + %% Tunnel information. + info = undefined :: gun:tunnel_info(), + + %% Protocol module and state of the outer layer. Only initialized + %% after the TLS handshake has completed when TLS is involved. + protocol = undefined :: module(), + protocol_state = undefined :: any() +}). + -record(stream, { id = undefined :: cow_http2:streamid(), @@ -58,10 +76,7 @@ handler_state :: undefined | gun_content_handler:state(), %% CONNECT tunnel. - tunnel :: {module(), any(), gun:tunnel_info()} - | {setup, gun:connect_destination(), gun:tunnel_info()} - | {tls_handshake, gun:connect_destination(), gun:tunnel_info()} - | undefined + tunnel :: undefined | #tunnel{} }). -record(http2_state, { @@ -315,64 +330,29 @@ data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> case get_stream_by_id(State, StreamID) of Stream=#stream{tunnel=undefined} -> data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream); - #stream{ref=StreamRef, reply_to=ReplyTo, - tunnel={_Protocol, _ProtoState, #{tls_proxy_pid := ProxyPid}}} -> - %% When we receive a DATA frame that contains TLS-encoded data, - %% we must first forward it to the ProxyPid to be decoded. The - %% Gun process will receive it back as a tls_proxy_http2_connect - %% message and forward it to the right stream via the handle_continue - %% callback. - OriginSocket = #{ - gun_pid => self(), - reply_to => ReplyTo, - stream_ref => stream_ref(State, StreamRef) - }, - ProxyPid ! {tls_proxy_http2_connect, OriginSocket, Data}, - %% @todo What about IsFin? - {State, EvHandlerState0}; - Stream=#stream{tunnel={Protocol, ProtoState0, TunnelInfo}} -> - {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState0, EvHandler, EvHandlerState0), - {tunnel_commands(Commands, Stream, Protocol, TunnelInfo, State), EvHandlerState} + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> +% %% @todo What about IsFin? + {Commands, EvHandlerState} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0), + {tunnel_commands(Commands, Stream, State), EvHandlerState} end. -tunnel_commands(Command, Stream, Protocol, TunnelInfo, State) when not is_list(Command) -> - tunnel_commands([Command], Stream, Protocol, TunnelInfo, State); -tunnel_commands([], Stream, _, _, State) -> +tunnel_commands(Command, Stream, State) when not is_list(Command) -> + tunnel_commands([Command], Stream, State); +tunnel_commands([], Stream, State) -> store_stream(State, Stream); -tunnel_commands([{state, ProtoState}|Tail], Stream, Protocol, TunnelInfo, State) -> - tunnel_commands(Tail, Stream#stream{tunnel={Protocol, ProtoState, TunnelInfo}}, - Protocol, TunnelInfo, State); -tunnel_commands([SetCookie={set_cookie, _, _, _, _}|Tail], Stream, Protocol, TunnelInfo, - State=#http2_state{commands_queue=Queue}) -> - tunnel_commands(Tail, Stream, Protocol, TunnelInfo, - State#http2_state{commands_queue=[SetCookie|Queue]}); -tunnel_commands([{origin, _, NewHost, NewPort, Type}|Tail], Stream, Protocol, TunnelInfo, State) -> -%% @todo Event? - tunnel_commands(Tail, Stream, Protocol, TunnelInfo#{ - origin_host => NewHost, - origin_port => NewPort, - intermediaries => [#{ - type => Type, - host => maps:get(origin_host, TunnelInfo), - port => maps:get(origin_port, TunnelInfo), - transport => tcp, %% @todo - protocol => Protocol:name() - }|maps:get(intermediaries, TunnelInfo, [])] - }, State); -tunnel_commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], Stream=#stream{ref=StreamRef}, - _CurrentProtocol, TunnelInfo, State=#http2_state{opts=Opts}) -> - {Protocol, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), - OriginSocket = #{ - gun_pid => self(), - reply_to => ReplyTo, - stream_ref => stream_ref(State, StreamRef) - }, - OriginTransport = gun_tcp_proxy, - {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts), -%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), - tunnel_commands([{state, ProtoState}|Tail], Stream, Protocol, TunnelInfo, State); -tunnel_commands([{active, true}|Tail], Stream, Protocol, TunnelInfo, State) -> - tunnel_commands(Tail, Stream, Protocol, TunnelInfo, State). +tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID}, State0) -> + %% @todo EvHandler EvHandlerState + {State, _EvHandlerState} = maybe_send_data(State0, StreamID, IsFin, Data, todo, todo), + tunnel_commands(Tail, Stream, State); +tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel}, State) -> + tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}, State); +tunnel_commands([SetCookie={set_cookie, _, _, _, _}|Tail], Stream, State=#http2_state{commands_queue=Queue}) -> + tunnel_commands(Tail, Stream, State#http2_state{commands_queue=[SetCookie|Queue]}). + +continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) -> + ContinueStreamRef ++ [StreamRef]; +continue_stream_ref(State, StreamRef) -> + stream_ref(State, StreamRef). data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream=#stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, handler_state=Handlers0}) -> @@ -410,6 +390,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, end, {maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}. +%% @todo Make separate functions for inform/connect/normal. headers_frame(State0=#http2_state{opts=Opts, content_handlers=Handlers0, commands_queue=Commands}, StreamID, IsFin, Headers, #{status := Status}, _BodyLen, EvHandler, EvHandlerState0) -> @@ -432,8 +413,14 @@ headers_frame(State0=#http2_state{opts=Opts, content_handlers=Handlers0, command headers => Headers }, EvHandlerState0), {State, EvHandlerState}; - Status >= 200, Status =< 299, element(1, Tunnel) =:= setup -> - {setup, Destination=#{host := DestHost, port := DestPort}, TunnelInfo} = Tunnel, + Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested -> + #tunnel{destination=Destination, info=TunnelInfo0} = Tunnel, + #{host := DestHost, port := DestPort} = Destination, + TunnelInfo = TunnelInfo0#{ + transport => maps:get(transport, Destination, tcp), + origin_host => DestHost, + origin_port => DestPort + }, %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo. %% We therefore do not need to call stream_ref/2. RealStreamRef = stream_ref(State, StreamRef), @@ -444,22 +431,17 @@ headers_frame(State0=#http2_state{opts=Opts, content_handlers=Handlers0, command status => Status, headers => Headers }, EvHandlerState0), + ContinueStreamRef = continue_stream_ref(State, StreamRef), OriginSocket = #{ gun_pid => self(), reply_to => ReplyTo, - stream_ref => RealStreamRef + stream_ref => RealStreamRef, + %% @todo That's wrong when we are already in a tunnel? + handle_continue_stream_ref => ContinueStreamRef }, - case Destination of + Proto = gun_tunnel, + ProtoOpts = case Destination of #{transport := tls} -> - Protocols = maps:get(protocols, Destination, [http2, http]), - TLSOpts = gun:ensure_alpn_sni(Protocols, maps:get(tls_opts, Destination, []), DestHost), - TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity), -% HandshakeEvent = #{ -% stream_ref => StreamRef, -% reply_to => ReplyTo, -% tls_opts => maps:get(tls_opts, Destination, []), -% timeout => maps:get(tls_handshake_timeout, Destination, infinity) -% }, %tls_handshake(internal, {tls_handshake, % HandshakeEvent0=#{tls_opts := TLSOpts0, timeout := TLSTimeout}, Protocols, ReplyTo}, % State=#state{socket=Socket, transport=Transport, origin_host=OriginHost, origin_port=OriginPort, @@ -469,32 +451,38 @@ headers_frame(State0=#http2_state{opts=Opts, content_handlers=Handlers0, command % socket => Socket % }, % EvHandlerState = EvHandler:tls_handshake_start(HandshakeEvent, EvHandlerState0), - HandshakeEvent = undefined, - {ok, ProxyPid} = gun_tls_proxy:start_link(DestHost, DestPort, - TLSOpts, TLSTimeout, OriginSocket, gun_tls_proxy_http2_connect, - %% @todo ? -% {HandshakeEvent, Protocols, ReplyTo}), - {handle_continue, RealStreamRef, HandshakeEvent, Protocols}), -% commands([{switch_transport, gun_tls_proxy, ProxyPid}], State#state{ -% socket=ProxyPid, transport=gun_tls_proxy, event_handler_state=EvHandlerState}); - %% @todo What about keepalive? - {store_stream(State, Stream#stream{tunnel={tls_handshake, Destination, - TunnelInfo#{origin_host => DestHost, origin_port => DestPort, - %% @todo Fine having it, but we want the socket pid to simulate active. - tls_proxy_pid => ProxyPid}}}), - EvHandlerState}; + Protocols = maps:get(protocols, Destination, [http2, http]), + TLSOpts = gun:ensure_alpn_sni(Protocols, maps:get(tls_opts, Destination, []), DestHost), + HandshakeEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + tls_opts => TLSOpts, + timeout => maps:get(tls_handshake_timeout, Destination, infinity) + }, + Opts#{ + stream_ref => RealStreamRef, + tunnel => #{ + type => connect, + info => TunnelInfo, + handshake_event => HandshakeEvent, + protocols => Protocols + } + }; _ -> [NewProtocol] = maps:get(protocols, Destination, [http]), - {Protocol, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), - %% @todo What about the StateName returned? - {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts#{stream_ref => RealStreamRef}), - %% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), - %% @todo What about keepalive? - ReplyTo ! {gun_tunnel_up, self(), RealStreamRef, Protocol:name()}, - {store_stream(State, Stream#stream{tunnel={Protocol, ProtoState, - TunnelInfo#{origin_host => DestHost, origin_port => DestPort}}}), - EvHandlerState} - end; + Opts#{ + stream_ref => RealStreamRef, + tunnel => #{ + type => connect, + info => TunnelInfo, + new_protocol => NewProtocol + } + } + end, + {tunnel, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{ + info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}), + EvHandlerState}; true -> ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ @@ -593,82 +581,18 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error) end. -%% Continue handling or sending the data. -handle_continue(StreamRef, Msg, State=#http2_state{opts=Opts}, EvHandler, EvHandlerState0) - when is_reference(StreamRef) -> - case get_stream_by_ref(State, StreamRef) of - Stream=#stream{id=StreamID, reply_to=ReplyTo, - tunnel={tls_handshake, Destination, TunnelInfo=#{tls_proxy_pid := ProxyPid}}} -> - case Msg of - {gun_tls_proxy, ProxyPid, {ok, Negotiated}, - {handle_continue, _, _HandshakeEvent, Protocols}} -> - #{host := DestHost, port := DestPort} = Destination, - RealStreamRef = stream_ref(State, StreamRef), - NewProtocol = gun_protocols:negotiated(Negotiated, Protocols), -% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ -% socket => Socket, -% protocol => NewProtocol -% }, EvHandlerState0), - OriginSocket = #{ - gun_pid => self(), - reply_to => ReplyTo, - stream_ref => RealStreamRef - }, - {Protocol, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), - {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, gun_tcp_proxy, - ProtoOpts#{stream_ref => RealStreamRef}), - ReplyTo ! {gun_tunnel_up, self(), RealStreamRef, Protocol:name()}, - {{state, store_stream(State, Stream#stream{tunnel={Protocol, ProtoState, - TunnelInfo#{origin_host => DestHost, origin_port => DestPort}}})}, - EvHandlerState0}; - {gun_tls_proxy, ProxyPid, {error, _Reason}, - {handle_continue, _, _HandshakeEvent, _}} -> -% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ -% error => Reason -% }, EvHandlerState0), -%% @todo -% The TCP connection can be closed by either peer. The END_STREAM flag -% on a DATA frame is treated as being equivalent to the TCP FIN bit. A -% client is expected to send a DATA frame with the END_STREAM flag set -% after receiving a frame bearing the END_STREAM flag. A proxy that -% receives a DATA frame with the END_STREAM flag set sends the attached -% data with the FIN bit set on the last TCP segment. A proxy that -% receives a TCP segment with the FIN bit set sends a DATA frame with -% the END_STREAM flag set. Note that the final TCP segment or DATA -% frame could be empty. - {{state, State}, EvHandlerState0}; - %% Data that must be sent as a DATA frame. - {data, ReplyTo, _, IsFin, Data} -> - {State1, EvHandlerState} = maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0), - {{state, State1}, EvHandlerState} - end; - Stream=#stream{id=StreamID, tunnel={Protocol, ProtoState0, TunnelInfo=#{tls_proxy_pid := ProxyPid}}} -> - case Msg of - %% Data that was received and decrypted. - {tls_proxy, ProxyPid, Data} -> - {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState0, EvHandler, EvHandlerState0), - {{state, tunnel_commands(Commands, Stream, Protocol, TunnelInfo, State)}, EvHandlerState}; - %% @todo What to do about those? - {tls_proxy_closed, ProxyPid} -> - todo; - {tls_proxy_error, ProxyPid, _Reason} -> - todo; - %% Data that must be sent as a DATA frame. - {data, _, _, IsFin, Data} -> - {State1, EvHandlerState} = maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0), - {{state, State1}, EvHandlerState} - end -%% @todo Is this possible? -% error -> -% {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} - end; -%% Tunneled data. -handle_continue([StreamRef|Tail], Msg, State, EvHandler, EvHandlerState0) -> +%% We always pass handle_continue messages to the tunnel. +handle_continue(ContinueStreamRef, Msg, State, EvHandler, EvHandlerState0) -> + StreamRef = case ContinueStreamRef of + [SR|_] -> SR; + _ -> ContinueStreamRef + end, case get_stream_by_ref(State, StreamRef) of - Stream=#stream{tunnel={Proto, ProtoState0, TunnelInfo}} -> - {ProtoState, EvHandlerState} = Proto:handle_continue(normalize_stream_ref(Tail), + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {Commands, EvHandlerState} = Proto:handle_continue(ContinueStreamRef, Msg, ProtoState0, EvHandler, EvHandlerState0), - {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState}%; + {{state, tunnel_commands(Commands, Stream, State)}, + EvHandlerState}%; %% The stream may have ended while TLS was being decoded. @todo What should we do? % error -> % {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} @@ -875,13 +799,14 @@ request(State, [StreamRef|Tail], ReplyTo, Method, _Host, _Port, Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of %% @todo We should send an error to the user if the stream isn't ready. - Stream=#stream{tunnel={Proto, ProtoState0, TunnelInfo=#{ + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ origin_host := OriginHost, origin_port := OriginPort}}} -> %% @todo So the event is probably not giving the right StreamRef? {ProtoState, EvHandlerState} = Proto:request(ProtoState0, normalize_stream_ref(Tail), ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0), - {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState}; + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, @@ -890,18 +815,6 @@ request(State, [StreamRef|Tail], ReplyTo, Method, _Host, _Port, {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. - %% get the ultimate stream by querying the #stream{} until we get the last one - %% call Proto:request in that stream - %% receive a {data, ...} back with the Tunnel for the StreamRef - %% if gun_tls_proxy then we get the wrapped TLS data - %% otherwise we get the data directly - %% handle the data in the same way as normal; data follows the same scenario - %% until we get a {data, ...} for the top-level stream - - %% What about data we receive from the socket? - %% - %% we get DATA with a StreamID for the CONNECT, we see it's CONNECT so we forward to Proto:data - initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; initial_flow(InitialFlow, _) -> InitialFlow. @@ -925,7 +838,8 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He gun_tls -> <<"https">>; gun_tls_proxy -> <<"https">>; gun_tcp -> <<"http">>; - gun_tcp_proxy -> <<"http">> + gun_tcp_proxy -> <<"http">>; + gun_tls_proxy_http2_connect -> <<"http">> end, authority => Authority, path => Path @@ -935,25 +849,24 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He normalize_stream_ref([StreamRef]) -> StreamRef; normalize_stream_ref(StreamRef) -> StreamRef. +%% @todo Make all calls go through this clause. data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState) when is_reference(StreamRef) -> case get_stream_by_ref(State, StreamRef) of - #stream{id=StreamID, tunnel=Tunnel} -> + Stream=#stream{id=StreamID, tunnel=Tunnel} -> case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of {ok, fin, _} -> {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; {ok, _, fin} -> {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; + {ok, _, _} when Tunnel =:= undefined -> + maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState); {ok, _, _} -> - case Tunnel of - %% We need to encrypt the data before we can send it. We send it - %% directly to the gun_tls_proxy process and then - {_, _, #{tls_proxy_pid := ProxyPid}} -> - ok = gun_tls_proxy:send(ProxyPid, Data), - {State, EvHandlerState}; - _ -> - maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState) - end + #tunnel{protocol=Proto, protocol_state=ProtoState0} = Tunnel, + {ProtoState, EvHandlerState1} = Proto:data(ProtoState0, StreamRef, + ReplyTo, IsFin, Data, EvHandler, EvHandlerState), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState1} end; error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState} @@ -961,10 +874,11 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, %% Tunneled data. data(State, [StreamRef|Tail], ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of - Stream=#stream{tunnel={Proto, ProtoState0, TunnelInfo}} -> + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> {ProtoState, EvHandlerState} = Proto:data(ProtoState0, normalize_stream_ref(Tail), ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), - {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState}; + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, @@ -1073,16 +987,16 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, - authority=Authority, path= <<>>, tunnel={setup, Destination, TunnelInfo}}, + authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream); %% Tunneled request. connect(State, [StreamRef|Tail], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow) -> case get_stream_by_ref(State, StreamRef) of - %% @todo We should send an error to the user if the stream isn't ready. - Stream=#stream{tunnel={Proto, ProtoState0, ProtoTunnelInfo}} -> + %% @todo Should we send an error to the user if the stream isn't ready. + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> ProtoState = Proto:connect(ProtoState0, normalize_stream_ref(Tail), ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow), - store_stream(State, Stream#stream{tunnel={Proto, ProtoState, ProtoTunnelInfo}}); + store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}); #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, @@ -1110,6 +1024,7 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP EvHandlerState0} end. +%% @todo What about tunnels? timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, Name}, TRef) -> case cow_http2_machine:timeout(Name, TRef, HTTP2Machine0) of {ok, HTTP2Machine} -> @@ -1120,21 +1035,18 @@ timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, Nam stream_info(State, StreamRef) when is_reference(StreamRef) -> case get_stream_by_ref(State, StreamRef) of - #stream{reply_to=ReplyTo, tunnel={Protocol, _, TunnelInfo=#{ - origin_host := OriginHost, origin_port := OriginPort}}} -> + #stream{reply_to=ReplyTo, tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState, + info=#{transport := Transport, origin_host := OriginHost, origin_port := OriginPort}}} -> {ok, #{ ref => StreamRef, reply_to => ReplyTo, state => running, tunnel => #{ - transport => case TunnelInfo of - #{tls_proxy_pid := _} -> tls; - _ -> tcp - end, - protocol => Protocol:name(), - origin_scheme => case TunnelInfo of - #{tls_proxy_pid := _} -> <<"https">>; - _ -> <<"http">> + transport => Transport, + protocol => Proto:tunneled_name(ProtoState), + origin_scheme => case Transport of + tcp -> <<"http">>; + tls -> <<"https">> end, origin_host => OriginHost, origin_port => OriginPort @@ -1152,17 +1064,19 @@ stream_info(State, StreamRef) when is_reference(StreamRef) -> %% Tunneled streams. stream_info(State=#http2_state{transport=Transport}, StreamRefList=[StreamRef|Tail]) -> case get_stream_by_ref(State, StreamRef) of - #stream{tunnel={Protocol, ProtoState, TunnelInfo=#{host := TunnelHost, port := TunnelPort}}} -> + #stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState, + info=TunnelInfo=#{host := TunnelHost, port := TunnelPort}}} -> %% We must return the real StreamRef as seen by the user. %% We therefore set it on return, with the outer layer "winning". %% %% We also add intermediaries which are prepended to the list and %% therefore are ultimately given from outer to inner layer just %% like gun:info/1 intermediaries. - case Protocol:stream_info(ProtoState, normalize_stream_ref(Tail)) of + case Proto:stream_info(ProtoState, normalize_stream_ref(Tail)) of {ok, undefined} -> {ok, undefined}; {ok, Info} -> + %% @todo Double check intermediaries. Intermediaries1 = maps:get(intermediaries, TunnelInfo, []), Intermediaries2 = maps:get(intermediaries, Info, []), {ok, Info#{ @@ -1184,6 +1098,7 @@ stream_info(State=#http2_state{transport=Transport}, StreamRefList=[StreamRef|Ta {ok, undefined} end. +%% @todo Tunnels. down(#http2_state{stream_refs=Refs}) -> maps:keys(Refs). diff --git a/src/gun_protocols.erl b/src/gun_protocols.erl index 4209641..65d5211 100644 --- a/src/gun_protocols.erl +++ b/src/gun_protocols.erl @@ -18,6 +18,7 @@ -export([handler/1]). -export([handler_and_opts/2]). -export([negotiated/2]). +-export([stream_ref/1]). -spec add_stream_ref(Protocol, undefined | gun:stream_ref()) -> Protocol when Protocol :: gun:protocol(). @@ -53,3 +54,7 @@ negotiated({ok, <<"h2">>}, _) -> http2; negotiated({ok, <<"http/1.1">>}, _) -> http; negotiated({error, protocol_not_negotiated}, [Protocol]) -> Protocol; negotiated({error, protocol_not_negotiated}, _) -> http. + +-spec stream_ref(gun:protocol()) -> undefined | gun:stream_ref(). +stream_ref({_, ProtocolOpts}) -> maps:get(stream_ref, ProtocolOpts, undefined); +stream_ref(_) -> undefined. diff --git a/src/gun_tcp_proxy.erl b/src/gun_tcp_proxy.erl index b4236f4..0107ac1 100644 --- a/src/gun_tcp_proxy.erl +++ b/src/gun_tcp_proxy.erl @@ -24,8 +24,14 @@ -export([close/1]). -type socket() :: #{ + %% The pid of the Gun connection. + gun_pid := pid(), + + %% The pid of the process that gets replies for this tunnel. reply_to := pid(), - stream_ref := reference() | [reference()] + + %% The full stream reference for this tunnel. + stream_ref := gun:stream_ref() }. name() -> tcp_proxy. @@ -41,10 +47,17 @@ connect(_, _, _, _) -> error(not_implemented). -spec send(socket(), iodata()) -> ok. +send(#{gun_pid := GunPid, reply_to := ReplyTo, stream_ref := StreamRef, + handle_continue_stream_ref := ContinueStreamRef}, Data) -> + GunPid ! {handle_continue, ContinueStreamRef, {data, ReplyTo, StreamRef, nofin, Data}}, + ok; send(#{reply_to := ReplyTo, stream_ref := StreamRef}, Data) -> gen_statem:cast(self(), {data, ReplyTo, StreamRef, nofin, Data}). -spec setopts(_, _) -> no_return(). +setopts(#{handle_continue_stream_ref := _}, _) -> + %% We send messages automatically regardless of active mode. + ok; setopts(_, _) -> error(not_implemented). diff --git a/src/gun_tls_proxy.erl b/src/gun_tls_proxy.erl index ab394bd..cba03de 100644 --- a/src/gun_tls_proxy.erl +++ b/src/gun_tls_proxy.erl @@ -95,6 +95,7 @@ extra :: any() }). +%-define(DEBUG_PROXY,1). -ifdef(DEBUG_PROXY). -define(DEBUG_LOG(Format, Args), io:format(user, "(~p) ~p:~p/~p:" ++ Format ++ "~n", @@ -269,17 +270,17 @@ connected({call, From}, Msg={send, Data}, State=#state{proxy_socket=Socket}) -> %% of the data isn't yet complete. We wrap the message in a handle_continue %% tuple and provide the StreamRef for further processing. connected(info, Msg={ssl, Socket, Data}, State=#state{owner_pid=OwnerPid, proxy_socket=Socket, - out_socket=#{stream_ref := StreamRef}}) -> + out_socket=#{handle_continue_stream_ref := StreamRef}}) -> ?DEBUG_LOG("msg ~0p state ~0p", [Msg, State]), OwnerPid ! {handle_continue, StreamRef, {tls_proxy, self(), Data}}, keep_state_and_data; connected(info, Msg={ssl_closed, Socket}, State=#state{owner_pid=OwnerPid, proxy_socket=Socket, - out_socket=#{stream_ref := StreamRef}}) -> + out_socket=#{handle_continue_stream_ref := StreamRef}}) -> ?DEBUG_LOG("msg ~0p state ~0p", [Msg, State]), OwnerPid ! {handle_continue, StreamRef, {tls_proxy_closed, self()}}, keep_state_and_data; connected(info, Msg={ssl_error, Socket, Reason}, State=#state{owner_pid=OwnerPid, proxy_socket=Socket, - out_socket=#{stream_ref := StreamRef}}) -> + out_socket=#{handle_continue_stream_ref := StreamRef}}) -> ?DEBUG_LOG("msg ~0p state ~0p", [Msg, State]), OwnerPid ! {handle_continue, StreamRef, {tls_proxy_error, self(), Reason}}, keep_state_and_data; @@ -340,8 +341,9 @@ handle_common(cast, Msg={send_result, From, Result}, State) -> gen_statem:reply(From, Result), keep_state_and_data; %% Messages from the real socket. -handle_common(info, Msg={OK, Socket, Data}, State=#state{proxy_pid=ProxyPid, - out_socket=Socket, out_messages={OK, _, _}}) -> +%% @todo Make _Socket and __Socket match again. +handle_common(info, Msg={OK, _Socket, Data}, State=#state{proxy_pid=ProxyPid, + out_socket=__Socket, out_messages={OK, _, _}}) -> ?DEBUG_LOG("msg ~0p state ~0p", [Msg, State]), ProxyPid ! {tls_proxy, self(), Data}, keep_state_and_data; diff --git a/src/gun_tls_proxy_http2_connect.erl b/src/gun_tls_proxy_http2_connect.erl index c423571..70b4824 100644 --- a/src/gun_tls_proxy_http2_connect.erl +++ b/src/gun_tls_proxy_http2_connect.erl @@ -31,7 +31,10 @@ reply_to := pid(), %% The full stream reference for this tunnel. - stream_ref := reference() | [reference()] + stream_ref := gun:stream_ref(), + + %% The full stream reference for the responsible HTTP/2 stream. + handle_continue_stream_ref := gun:stream_ref() }. name() -> tls_proxy_http2_connect. @@ -47,8 +50,9 @@ connect(_, _, _, _) -> error(not_implemented). -spec send(socket(), iodata()) -> ok. -send(#{gun_pid := GunPid, reply_to := ReplyTo, stream_ref := StreamRef}, Data) -> - GunPid ! {handle_continue, StreamRef, {data, ReplyTo, StreamRef, nofin, Data}}, +send(S=#{gun_pid := GunPid, reply_to := ReplyTo, stream_ref := DataStreamRef, + handle_continue_stream_ref := StreamRef}, Data) -> + GunPid ! {handle_continue, StreamRef, {data, ReplyTo, DataStreamRef, nofin, Data}}, ok. -spec setopts(_, _) -> no_return(). diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl new file mode 100644 index 0000000..8da4c5a --- /dev/null +++ b/src/gun_tunnel.erl @@ -0,0 +1,414 @@ +%% Copyright (c) 2020, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% This module is used when a tunnel is established and either +%% StreamRef dereference or a TLS proxy process must be handled +%% by the tunnel layer. +-module(gun_tunnel). + +-export([init/4]). +-export([handle/4]). +-export([handle_continue/5]). +-export([update_flow/4]). +-export([closing/4]). +-export([close/4]). +-export([keepalive/3]). +-export([headers/11]). +-export([request/12]). +-export([data/7]). +-export([connect/7]). +-export([cancel/5]). +-export([timeout/3]). +-export([stream_info/2]). +-export([tunneled_name/1]). +-export([down/1]). +%-export([ws_upgrade/10]). + +-record(tunnel_state, { + %% Fake socket and transport. + socket = undefined :: #{ + gun_pid := pid(), + reply_to := pid(), + stream_ref := gun:stream_ref(), + handle_continue_stream_ref := gun:stream_ref() + } | pid(), + transport = undefined :: gun_tcp_proxy | gun_tls_proxy, + + %% The stream_ref from which the stream was created. When + %% the tunnel exists as a result of HTTP/2 CONNECT -> HTTP/1.1 CONNECT + %% the stream_ref is the same as the HTTP/1.1 CONNECT one. + stream_ref = undefined :: gun:stream_ref(), + + %% When the tunnel is a 'connect' tunnel we must dereference the + %% stream_ref. When it is 'socks' we must not as there was no + %% stream involved in creating the tunnel. + type = undefined :: connect | socks, + + %% Tunnel information. + info = undefined :: gun:tunnel_info(), + + %% The origin socket of the TLS proxy, if any. This is used to forward + %% messages to the proxy process in order to decrypt the data. + tls_origin_socket = undefined :: undefined | #{ + gun_pid := pid(), + reply_to := pid(), + stream_ref := gun:stream_ref(), + handle_continue_stream_ref => gun:stream_ref() + }, + + opts = undefined :: undefined | any(), %% @todo Opts type. + + %% Protocol module and state of the outer layer. Only initialized + %% after the TLS handshake has completed when TLS is involved. + protocol = undefined :: module(), + protocol_state = undefined :: any() +}). + +%% Socket is the "origin socket" and Transport the "origin transport". +%% When the Transport indicate a TLS handshake was requested, the socket +%% and transport are given to the intermediary TLS proxy process. +%% +%% Opts is the options for the underlying HTTP/2 connection, +%% with some extra information added for the tunnel. +%% +%% @todo Mark the tunnel options as reserved. +init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel}) -> + #{ + type := TunnelType, + info := TunnelInfo + } = Tunnel, + State = #tunnel_state{stream_ref=StreamRef, type=TunnelType, info=TunnelInfo, + opts=maps:without([stream_ref, tunnel], Opts)}, + case Tunnel of + %% Initialize the protocol. + #{new_protocol := NewProtocol} -> + {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), + {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, OriginTransport, + ProtoOpts#{stream_ref => StreamRef}), +%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}, + {tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport, + protocol=Proto, protocol_state=ProtoState}}; + %% We can't initialize the protocol until the TLS handshake has completed. + #{handshake_event := HandshakeEvent, protocols := Protocols} -> + #{handle_continue_stream_ref := ContinueStreamRef} = OriginSocket, + %% @todo FIX THIS!! + % #{ + % origin_host := DestHost, + % origin_port := DestPort + % } = TunnelInfo, +%% @todo OK so Protocol:init/4 will need to have EvHandler/EvHandlerState! +%% Otherwise we can't do the TLS events. + #{ + tls_opts := TLSOpts, + timeout := TLSTimeout + } = HandshakeEvent, + {ok, ProxyPid} = gun_tls_proxy:start_link("fake", 12345,% @todo FIX THIS!! DestHost, DestPort, + TLSOpts, TLSTimeout, OriginSocket, gun_tls_proxy_http2_connect, + {handle_continue, ContinueStreamRef, HandshakeEvent, Protocols}), + {tunnel, State#tunnel_state{socket=ProxyPid, transport=gun_tls_proxy, + tls_origin_socket=OriginSocket}} + end. + +%% When we receive data we pass it forward directly for TCP; +%% or we decrypt it and pass it via handle_continue for TLS. +handle(Data, State=#tunnel_state{transport=gun_tcp_proxy, + protocol=Proto, protocol_state=ProtoState0}, + EvHandler, EvHandlerState0) -> + {Commands, EvHandlerState} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0), + {{state, commands(Commands, State)}, EvHandlerState}; +handle(Data, State=#tunnel_state{transport=gun_tls_proxy, + socket=ProxyPid, tls_origin_socket=OriginSocket}, + _EvHandler, EvHandlerState) -> + %% When we receive a DATA frame that contains TLS-encoded data, + %% we must first forward it to the ProxyPid to be decoded. The + %% Gun process will receive it back as a tls_proxy_http2_connect + %% message and forward it to the right stream via the handle_continue + %% callback. + ProxyPid ! {tls_proxy_http2_connect, OriginSocket, Data}, + {{state, State}, EvHandlerState}. + +%% This callback will only be called for TLS. +%% +%% The StreamRef in this callback is special because it includes +%% a reference() for Socks layers as well. +handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated}, + {handle_continue, _, HandshakeEvent, Protocols}}, + State=#tunnel_state{socket=ProxyPid, stream_ref=StreamRef, opts=Opts}, + _EvHandler, EvHandlerState0) + when is_reference(ContinueStreamRef) -> + #{reply_to := ReplyTo} = HandshakeEvent, + NewProtocol = gun_protocols:negotiated(Negotiated, Protocols), + {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), +% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ +% socket => Socket, +% protocol => NewProtocol +% }, EvHandlerState0), + %% @todo Terminate the current protocol or something? + OriginSocket = #{ + gun_pid => self(), + reply_to => ReplyTo, + stream_ref => StreamRef%, +% handle_continue_stream_ref => ContinueStreamRef + }, + {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, + ProtoOpts#{stream_ref => StreamRef}), + ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}, + {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState0}; +handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, _Reason}, + {handle_continue, _, _HandshakeEvent, _}}, + #tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0) + when is_reference(ContinueStreamRef) -> +%% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ +%% error => Reason +%% }, EvHandlerState0), +%%% @todo +%% The TCP connection can be closed by either peer. The END_STREAM flag +%% on a DATA frame is treated as being equivalent to the TCP FIN bit. A +%% client is expected to send a DATA frame with the END_STREAM flag set +%% after receiving a frame bearing the END_STREAM flag. A proxy that +%% receives a DATA frame with the END_STREAM flag set sends the attached +%% data with the FIN bit set on the last TCP segment. A proxy that +%% receives a TCP segment with the FIN bit set sends a DATA frame with +%% the END_STREAM flag set. Note that the final TCP segment or DATA +%% frame could be empty. + {[], EvHandlerState0}; +%% Send the data. This causes TLS to encrypt the data and send it to the inner layer. +handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data}, + #tunnel_state{socket=Socket, transport=Transport}, _EvHandler, EvHandlerState) + when is_reference(ContinueStreamRef) -> + {[{send, IsFin, Data}], EvHandlerState}; +handle_continue(ContinueStreamRef, {tls_proxy, ProxyPid, Data}, + State=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState0) + when is_reference(ContinueStreamRef) -> + {Commands, EvHandlerState} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0), + {{state, commands(Commands, State)}, EvHandlerState}; +%% @todo What to do about those? Does it matter which one closes/errors out? +handle_continue(ContinueStreamRef, {tls_proxy_closed, ProxyPid}, + #tunnel_state{socket=ProxyPid}, _EvHandler, _EvHandlerState0) + when is_reference(ContinueStreamRef) -> + todo; +handle_continue(ContinueStreamRef, {tls_proxy_error, ProxyPid, _Reason}, + #tunnel_state{socket=ProxyPid}, _EvHandler, _EvHandlerState0) + when is_reference(ContinueStreamRef) -> + todo; +%% We always dereference the ContinueStreamRef because it includes a +%% reference() for Socks layers too. +%% +%% @todo Assert StreamRef to be our reference(). +handle_continue([_StreamRef|ContinueStreamRef0], Msg, + State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState0) -> + ContinueStreamRef = case ContinueStreamRef0 of + [CSR] -> CSR; + _ -> ContinueStreamRef0 + end, + {Commands, EvHandlerState} = Proto:handle_continue(ContinueStreamRef, + Msg, ProtoState, EvHandler, EvHandlerState0), + {{state, commands(Commands, State)}, EvHandlerState}. + +%% @todo Probably just pass it forward? +update_flow(_State, _ReplyTo, _StreamRef, _Inc) -> + todo. + +%% @todo ? +closing(_Reason, _State, _EvHandler, _EvHandlerState) -> + todo. + +%% @todo ? +close(_Reason, _State, _EvHandler, _EvHandlerState) -> + todo. + +%% @todo ? +keepalive(_State, _EvHandler, _EvHandlerState) -> + todo. + +%% We pass the headers forward and optionally dereference StreamRef. +headers(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, + StreamRef0, ReplyTo, Method, Host, Port, Path, Headers, + InitialFlow, EvHandler, EvHandlerState0) -> + StreamRef = maybe_dereference(State, StreamRef0), + {ProtoState, EvHandlerState} = Proto:headers(ProtoState0, StreamRef, + ReplyTo, Method, Host, Port, Path, Headers, + InitialFlow, EvHandler, EvHandlerState0), + {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. + +%% We pass the request forward and optionally dereference StreamRef. +request(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, + StreamRef0, ReplyTo, Method, Host, Port, Path, Headers, Body, + InitialFlow, EvHandler, EvHandlerState0) -> + StreamRef = maybe_dereference(State, StreamRef0), + {ProtoState, EvHandlerState} = Proto:request(ProtoState0, StreamRef, + ReplyTo, Method, Host, Port, Path, Headers, Body, + InitialFlow, EvHandler, EvHandlerState0), + {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. + +%% We pass the data forward and optionally dereference StreamRef. +data(State=#tunnel_state{socket=Socket, transport=Transport, + stream_ref=TunnelStreamRef0, protocol=Proto, protocol_state=ProtoState0}, + StreamRef0, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) -> + TunnelStreamRef = if + is_list(TunnelStreamRef0) -> lists:last(TunnelStreamRef0); + true -> TunnelStreamRef0 + end, + case StreamRef0 of + TunnelStreamRef -> + ok = Transport:send(Socket, Data), + {State, EvHandlerState0}; + _ -> + StreamRef = maybe_dereference(State, StreamRef0), + {ProtoState, EvHandlerState} = Proto:data(ProtoState0, StreamRef, + ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), + {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState} + end. + +%% We pass the CONNECT request forward and optionally dereference StreamRef. +connect(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, + StreamRef0, ReplyTo, Destination, TunnelInfo, Headers, InitialFlow) -> + StreamRef = maybe_dereference(State, StreamRef0), + ProtoState = Proto:connect(ProtoState0, StreamRef, + ReplyTo, Destination, TunnelInfo, Headers, InitialFlow), + State#tunnel_state{protocol_state=ProtoState}. + +%% @todo ? +cancel(_State, _StreamRef, _ReplyTo, _EvHandler, _EvHandlerState) -> + todo. + +%% @todo ? +%% ... we might have to do update Cowlib there... +timeout(_State, {cow_http2_machine, _Name}, _TRef) -> + todo. + +stream_info(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, StreamRef0) -> + StreamRef = maybe_dereference(State, StreamRef0), + Proto:stream_info(ProtoState, StreamRef). + +tunneled_name(#tunnel_state{protocol=Proto}) -> + Proto:name(). + +%% @todo ? +down(_State) -> + todo. + +%% Internal. + +commands(Command, State) when not is_list(Command) -> + commands([Command], State); +commands([], State) -> + State; +commands([{state, ProtoState}|Tail], State) -> + commands(Tail, State#tunnel_state{protocol_state=ProtoState}); +%% @todo We must pass down the set_cookie commands. Have a commands_queue. +commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}) -> + commands(Tail, State); +commands([{send, IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}) -> + Transport:send(Socket, Data), + commands(Tail, State); +%% @todo How to handle origin changes? +commands([{origin, _, _NewHost, _NewPort, _Type}|Tail], State) -> + commands(Tail, State); +commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], + State=#tunnel_state{stream_ref=TunnelStreamRef, opts=Opts, protocol=CurrentProto}) -> + Type = case CurrentProto:name() of + socks -> socks; + _ -> connect + end, + StreamRef = case Type of + socks -> TunnelStreamRef; + connect -> gun_protocols:stream_ref(NewProtocol) + end, + ContinueStreamRef0 = continue_stream_ref(State), + ContinueStreamRef = case Type of + socks -> ContinueStreamRef0 ++ [make_ref()]; + connect -> ContinueStreamRef0 ++ [if is_list(StreamRef) -> lists:last(StreamRef); true -> StreamRef end] + end, + OriginSocket = #{ + gun_pid => self(), + reply_to => ReplyTo, + stream_ref => StreamRef, + handle_continue_stream_ref => ContinueStreamRef + }, + ProtoOpts = Opts#{ + stream_ref => StreamRef, + tunnel => #{ + type => Type, + info => #{}, %% @todo + new_protocol => NewProtocol + } + }, + Proto = gun_tunnel, + {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), +%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}); +commands([{tls_handshake, HandshakeEvent, Protocols, ReplyTo}|Tail], + State=#tunnel_state{opts=Opts, protocol=CurrentProto}) -> + Type = case CurrentProto:name() of + socks -> socks; + _ -> connect + end, + #{ + stream_ref := StreamRef + } = HandshakeEvent, + ContinueStreamRef0 = continue_stream_ref(State), + ContinueStreamRef = case Type of + socks -> ContinueStreamRef0 ++ [make_ref()]; + connect -> ContinueStreamRef0 ++ [lists:last(StreamRef)] + end, + OriginSocket = #{ + gun_pid => self(), + reply_to => ReplyTo, + stream_ref => StreamRef, + handle_continue_stream_ref => ContinueStreamRef + }, + ProtoOpts = Opts#{ + stream_ref => StreamRef, + tunnel => #{ + type => Type, + info => #{}, %% @todo + handshake_event => HandshakeEvent, + protocols => Protocols + } + }, + Proto = gun_tunnel, + {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}); +commands([{active, true}|Tail], State) -> + commands(Tail, State). + +continue_stream_ref(#tunnel_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}) -> + if + is_list(ContinueStreamRef) -> ContinueStreamRef; + true -> [ContinueStreamRef] + end; +continue_stream_ref(#tunnel_state{tls_origin_socket=#{handle_continue_stream_ref := ContinueStreamRef}}) -> + if + is_list(ContinueStreamRef) -> ContinueStreamRef; + true -> [ContinueStreamRef] + end. + +maybe_dereference(#tunnel_state{stream_ref=RealStreamRef, + type=connect, protocol=gun_tunnel}, [_StreamRef|Tail]) -> + %% @todo Assert that we got the right stream. +% StreamRef = if is_list(RealStreamRef) -> lists:last(RealStreamRef); true -> RealStreamRef end, + case Tail of + [Ref] -> Ref; + _ -> Tail + end; +%% We do not dereference when we are the target. +%% For example when creating a new stream on the origin via tunnel(s). +maybe_dereference(#tunnel_state{type=connect}, StreamRef) -> + StreamRef; +maybe_dereference(#tunnel_state{type=socks}, StreamRef) -> + StreamRef. diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl index c8aff46..4116ef4 100644 --- a/test/rfc7540_SUITE.erl +++ b/test/rfc7540_SUITE.erl @@ -557,6 +557,15 @@ do_connect_http(OriginScheme, OriginTransport, OriginProtocol, ProxyScheme, Prox gun:close(ConnPid). connect_cowboy_http_via_h2c(_) -> + +%dbg:tracer(), +%dbg:tpl(gun_http, []), +%dbg:tpl(gun_http2, []), +%dbg:tpl(gun_tunnel, []), +%dbg:tpl(gun_tcp_proxy, []), +%dbg:tpl(gun, []), +%dbg:p(all, c), + doc("CONNECT can be used to establish a TCP connection " "to an HTTP/1.1 server via a TCP HTTP/2 proxy. (RFC7540 8.3)"), do_connect_cowboy(<<"http">>, tcp, http, <<"http">>, tcp). @@ -623,6 +632,7 @@ do_connect_cowboy(_OriginScheme, OriginTransport, OriginProtocol, _ProxyScheme, {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), {up, OriginProtocol} = gun:await(ConnPid, StreamRef), ProxiedStreamRef = gun:get(ConnPid, "/proxied", #{}, #{tunnel => StreamRef}), + timer:sleep(1000), {response, nofin, 200, _} = gun:await(ConnPid, ProxiedStreamRef), %% We can create more requests on the proxy as well. ProxyStreamRef = gun:get(ConnPid, "/"), @@ -652,9 +662,23 @@ do_cowboy_origin(OriginTransport, OriginProtocol) -> connect_http_via_http_via_h2c(_) -> doc("CONNECT can be used to establish a TCP connection " "to an HTTP/1.1 server via a tunnel going through both " - "an HTTP/2 and an HTTP/1.1 proxy. (RFC7231 4.3.6)"), + "a TCP HTTP/2 and a TCP HTTP/1.1 proxy. (RFC7540 8.3)"), do_connect_via_multiple_proxies(tcp, http, tcp, http, tcp). +connect_https_via_https_via_h2(_) -> + +%dbg:tracer(), +%dbg:tpl(?MODULE, []), +%dbg:tpl(gun, []), +%dbg:tpl(gun_http, []), +%dbg:tpl(gun_http2, []), +%dbg:p(all, c), + + doc("CONNECT can be used to establish a TLS connection " + "to an HTTP/1.1 server via a tunnel going through both " + "a TLS HTTP/2 and a TLS HTTP/1.1 proxy. (RFC7540 8.3)"), + do_connect_via_multiple_proxies(tls, http, tls, http, tls). + do_connect_via_multiple_proxies(OriginTransport, OriginProtocol, Proxy2Transport, Proxy2Protocol, Proxy1Transport) -> {ok, Ref, OriginPort} = do_cowboy_origin(OriginTransport, OriginProtocol), @@ -665,16 +689,18 @@ do_connect_via_multiple_proxies(OriginTransport, OriginProtocol, {ok, Proxy2Pid, Proxy2Port} = rfc7231_SUITE:do_proxy_start(Proxy2Transport), %% First proxy. {ok, ConnPid} = gun:open("localhost", Proxy1Port, #{ + transport => Proxy1Transport, protocols => [http2] }), {ok, http2} = gun:await_up(ConnPid), + handshake_completed = receive_from(Proxy1Pid), %% Second proxy. StreamRef1 = gun:connect(ConnPid, #{ host => "localhost", port => Proxy2Port, + transport => Proxy2Transport, protocols => [Proxy2Protocol] }, []), - handshake_completed = receive_from(Proxy1Pid), Authority1 = iolist_to_binary(["localhost:", integer_to_binary(Proxy2Port)]), {request, #{ <<":method">> := <<"CONNECT">>, @@ -686,6 +712,7 @@ do_connect_via_multiple_proxies(OriginTransport, OriginProtocol, StreamRef2 = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, + transport => OriginTransport, protocols => [OriginProtocol] }, [], #{tunnel => StreamRef1}), Authority2 = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), @@ -697,6 +724,7 @@ do_connect_via_multiple_proxies(OriginTransport, OriginProtocol, ProxiedStreamRef = gun:get(ConnPid, "/proxied", [], #{tunnel => StreamRef2}), {response, nofin, 200, _} = gun:await(ConnPid, ProxiedStreamRef), gun:close(ConnPid) + %% @todo Also test stream_info. after cowboy:stop_listener(Ref) end. -- cgit v1.2.3