From a093bf88e1740e4f89937d84cd4d5b26cb5b4e80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 16 Jul 2020 14:56:45 +0200 Subject: Initial HTTP/2 CONNECT implementation --- ebin/gun.app | 2 +- src/gun.erl | 27 +++++-- src/gun_http2.erl | 184 ++++++++++++++++++++++++++++++++++++++++++++-- src/gun_tcp_proxy.erl | 57 +++++++++++++++ test/raw_SUITE.erl | 48 +++++++++++- test/rfc7540_SUITE.erl | 193 +++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 496 insertions(+), 15 deletions(-) create mode 100644 src/gun_tcp_proxy.erl diff --git a/ebin/gun.app b/ebin/gun.app index affc6fd..c0f21d1 100644 --- a/ebin/gun.app +++ b/ebin/gun.app @@ -1,7 +1,7 @@ {application, 'gun', [ {description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."}, {vsn, "2.0.0-pre.2"}, - {modules, ['gun','gun_app','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_ws','gun_ws_h']}, + {modules, ['gun','gun_app','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_ws','gun_ws_h']}, {registered, [gun_sup]}, {applications, [kernel,stdlib,ssl,cowlib]}, {mod, {gun_app, []}}, diff --git a/src/gun.erl b/src/gun.erl index f4f1fca..0a0560d 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -108,6 +108,7 @@ -export([connected_no_input/3]). -export([connected_ws_only/3]). -export([closing/3]). +-export([protocol_handler/1]). -export([terminate/3]). -type req_headers() :: [{binary() | string() | atom(), iodata()}] @@ -178,7 +179,8 @@ %% the entire connection. -type req_opts() :: #{ flow => pos_integer(), - reply_to => pid() + reply_to => pid(), + tunnel => reference() | [reference()] }. -export_type([req_opts/0]). @@ -580,12 +582,13 @@ headers(ServerPid, Method, Path, Headers) -> headers(ServerPid, Method, Path, Headers, #{}). -spec headers(pid(), iodata(), iodata(), req_headers(), req_opts()) -> reference(). -headers(ServerPid, Method, Path, Headers, ReqOpts) -> - StreamRef = make_ref(), +headers(ServerPid, Method, Path, Headers0, ReqOpts) -> + Tunnel = get_tunnel(ReqOpts), + StreamRef = make_stream_ref(Tunnel), InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), gen_statem:cast(ServerPid, {headers, ReplyTo, StreamRef, - Method, Path, normalize_headers(Headers), InitialFlow}), + Method, Path, normalize_headers(Headers0), InitialFlow}), StreamRef. -spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> reference(). @@ -594,13 +597,24 @@ request(ServerPid, Method, Path, Headers, Body) -> -spec request(pid(), iodata(), iodata(), req_headers(), iodata(), req_opts()) -> reference(). request(ServerPid, Method, Path, Headers, Body, ReqOpts) -> - StreamRef = make_ref(), + Tunnel = get_tunnel(ReqOpts), + StreamRef = make_stream_ref(Tunnel), InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef, Method, Path, normalize_headers(Headers), Body, InitialFlow}), StreamRef. +get_tunnel(#{tunnel := Tunnel}) when is_reference(Tunnel) -> + [Tunnel]; +get_tunnel(#{tunnel := Tunnel}) -> + Tunnel; +get_tunnel(_) -> + undefined. + +make_stream_ref(undefined) -> make_ref(); +make_stream_ref(Tunnel) -> Tunnel ++ [make_ref()]. + normalize_headers([]) -> []; normalize_headers([{Name, Value}|Tail]) when is_binary(Name) -> @@ -638,6 +652,7 @@ connect(ServerPid, Destination, Headers, ReqOpts) -> StreamRef = make_ref(), InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), + %% @todo tunnel gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}), StreamRef. @@ -876,6 +891,7 @@ ws_upgrade(ServerPid, Path, Headers, Opts) -> ok = gun_ws:check_options(Opts), StreamRef = make_ref(), ReplyTo = maps:get(reply_to, Opts, self()), + %% @todo Also accept tunnel option. gen_statem:cast(ServerPid, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, Opts}), StreamRef. @@ -1197,6 +1213,7 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers0, Body, Init {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> + %% @todo Not events are currently handled for the request? ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow), {keep_state, State#state{protocol_state=ProtoState2}}; %% Public Websocket interface. diff --git a/src/gun_http2.erl b/src/gun_http2.erl index fef1096..c4668d7 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -29,10 +29,12 @@ -export([headers/11]). -export([request/12]). -export([data/7]). +-export([connect/6]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). -export([down/1]). +%-export([ws_upgrade/10]). -record(stream, { id = undefined :: cow_http2:streamid(), @@ -51,7 +53,10 @@ path :: iodata(), %% Content handlers state. - handler_state :: undefined | gun_content_handler:state() + handler_state :: undefined | gun_content_handler:state(), + + %% CONNECT tunnel. + tunnel :: {module(), any(), gun:connect_destination()} | {setup, gun:connect_destination()} | undefined }). -record(http2_state, { @@ -295,9 +300,20 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> end, State. -data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> - Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, - handler_state=Handlers0} = get_stream_by_id(State0, StreamID), +%% @todo CONNECT streams may need to pass data through TLS socket. +data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> + case get_stream_by_id(State, StreamID) of + Stream=#stream{tunnel=undefined} -> + data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream); + Stream=#stream{tunnel={Protocol, ProtoState0, Destination}} -> + {ProtoState, EvHandlerState} = Protocol:handle(Data, ProtoState0, + EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel={Protocol, ProtoState, Destination}}), + EvHandlerState} + end. + +data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, + Stream=#stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, handler_state=Handlers0}) -> {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), Flow = case Flow0 of infinity -> infinity; @@ -340,9 +356,11 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com ref=StreamRef, reply_to=ReplyTo, authority=Authority, - path=Path + path=Path, + tunnel=Tunnel } = Stream, State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]}, + %% @todo CONNECT response handling if Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers}, @@ -353,6 +371,36 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com headers => Headers }, EvHandlerState0), {State, EvHandlerState}; + Status >= 200, Status =< 299, element(1, Tunnel) =:= setup -> + ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, + EvHandlerState = EvHandler:response_headers(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + %% @todo Handle TLS over TCP and TLS over TLS. + {setup, Destination} = Tunnel, + tcp = maps:get(transport, Destination, tcp), + [Protocol0] = maps:get(protocols, Destination, [http]), + %% Options are either passed directly or #{} is used. Since the + %% protocol only applies to a stream we cannot use connection-wide options. + {Protocol, ProtoOpts} = case Protocol0 of + {P, PO} -> {gun:protocol_handler(P), PO}; + P -> {gun:protocol_handler(P), #{}} + end, + %% @todo What about gun_socks_up? + %% @todo What about the StateName returned? + OriginSocket = #{ + reply_to => ReplyTo, + stream_ref => StreamRef + }, + OriginTransport = gun_tcp_proxy, + {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts), + %% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + %% @todo What about keepalive? + {store_stream(State, Stream#stream{tunnel={Protocol, ProtoState, Destination}}), + EvHandlerState}; true -> ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ @@ -570,6 +618,7 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerSt Transport:send(Socket, cow_http2:ping(0)), {State, EvHandlerState}. +%% @todo tunnel headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) -> @@ -598,7 +647,8 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, - Path, Headers0, Body, InitialFlow0, EvHandler, EvHandlerState0) -> + Path, Headers0, Body, InitialFlow0, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( @@ -636,8 +686,44 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, {State, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; nofin -> maybe_send_data(State, StreamID, fin, Body, EvHandler, EvHandlerState) + end; +%% Tunneled request. +%% +%% We call Proto:request in a loop until we get to a non-CONNECT stream. +%% When the transport is gun_tls_proxy we receive the TLS data +%% as a 'data' cast; when gun_tcp_proxy we receive the 'data' cast +%% directly. The 'data' cast contains the tunnel for the StreamRef. +%% The tunnel is given as the socket and the gun_tls_proxy out_socket +%% is always a gun_tcp_proxy that sends a 'data' cast. +request(State, [StreamRef|Tail], ReplyTo, Method, _Host, _Port, + Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + Stream=#stream{tunnel={Proto, ProtoState0, Destination=#{host := OriginHost, port := OriginPort}}} -> + %% @todo So the event is probably not giving the right StreamRef? + {ProtoState, EvHandlerState} = Proto:request(ProtoState0, normalize_stream_ref(Tail), + ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, + InitialFlow, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, Destination}}), EvHandlerState}; + #stream{tunnel=undefined} -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, + "The stream is not a tunnel."}}, + {State, EvHandlerState0}; + error -> + {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. + %% get the ultimate stream by querying the #stream{} until we get the last one + %% call Proto:request in that stream + %% receive a {data, ...} back with the Tunnel for the StreamRef + %% if gun_tls_proxy then we get the wrapped TLS data + %% otherwise we get the data directly + %% handle the data in the same way as normal; data follows the same scenario + %% until we get a {data, ...} for the top-level stream + + %% What about data we receive from the socket? + %% + %% we get DATA with a StreamID for the CONNECT, we see it's CONNECT so we forward to Proto:data + initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; initial_flow(InitialFlow, _) -> InitialFlow. @@ -647,6 +733,7 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He _ -> gun_http:host_header(Transport, Host0, Port) end, %% @todo We also must remove any header found in the connection header. + %% @todo Much of this is duplicated in cow_http2_machine; sort things out. Headers = lists:keydelete(<<"host">>, 1, lists:keydelete(<<"connection">>, 1, @@ -666,8 +753,11 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He }, {ok, PseudoHeaders, Headers}. +normalize_stream_ref([StreamRef]) -> StreamRef; +normalize_stream_ref(StreamRef) -> StreamRef. + data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data, - EvHandler, EvHandlerState) -> + EvHandler, EvHandlerState) when is_reference(StreamRef) -> case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of @@ -680,6 +770,20 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, end; error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState} + end; +%% Tunneled data. +data(State, [StreamRef|Tail], ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + Stream=#stream{tunnel={Proto, ProtoState0, Destination}} -> + {ProtoState, EvHandlerState} = Proto:data(ProtoState0, normalize_stream_ref(Tail), + ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, Destination}}), EvHandlerState}; + #stream{tunnel=undefined} -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, + "The stream is not a tunnel."}}, + {State, EvHandlerState0}; + error -> + {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0, @@ -749,6 +853,41 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, State0 end. +connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, + http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Destination=#{host := Host0}, + Headers0, InitialFlow0) -> + Host = case Host0 of + Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); + _ -> Host0 + end, + Port = maps:get(port, Destination, 1080), + Authority = [Host, $:, integer_to_binary(Port)], + PseudoHeaders = #{ + method => <<"CONNECT">>, + authority => Authority + }, + Headers1 = + lists:keydelete(<<"host">>, 1, + lists:keydelete(<<"content-length">>, 1, Headers0)), + HasProxyAuthorization = lists:keymember(<<"proxy-authorization">>, 1, Headers1), + Headers = case {HasProxyAuthorization, Destination} of + {false, #{username := UserID, password := Password}} -> + [{<<"proxy-authorization">>, [ + <<"Basic ">>, + base64:encode(iolist_to_binary([UserID, $:, Password]))]} + |Headers1]; + _ -> + Headers1 + end, + {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(<<"CONNECT">>, HTTP2Machine0), + {ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( + StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), + Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, + authority=Authority, path= <<>>, tunnel={setup, Destination}}, + create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream). + cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of @@ -776,8 +915,21 @@ timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, Nam connection_error(State, Error) end. -stream_info(State, StreamRef) -> +stream_info(State, StreamRef) when is_reference(StreamRef) -> case get_stream_by_ref(State, StreamRef) of + #stream{reply_to=ReplyTo, tunnel={Protocol, _, #{host := OriginHost, port := OriginPort}}} -> + {ok, #{ + ref => StreamRef, + reply_to => ReplyTo, + state => running, + tunnel => #{ + transport => tcp, %% @todo + protocol => Protocol:name(), + origin_scheme => <<"http">>, %% @todo + origin_host => OriginHost, + origin_port => OriginPort + } + }}; #stream{reply_to=ReplyTo} -> {ok, #{ ref => StreamRef, @@ -786,6 +938,22 @@ stream_info(State, StreamRef) -> }}; error -> {ok, undefined} + end; +%% Tunneled streams. +stream_info(State, StreamRefList=[StreamRef|Tail]) -> + case get_stream_by_ref(State, StreamRef) of + #stream{tunnel={Protocol, ProtoState, _}} -> + %% We must return the real StreamRef as seen by the user. + %% We therefore set it on return, with the outer layer "winning". + %% @todo Would be well worth returning intermediaries as well. + case Protocol:stream_info(ProtoState, normalize_stream_ref(Tail)) of + {ok, undefined} -> + {ok, undefined}; + {ok, Info} -> + {ok, Info#{ref => StreamRefList}} + end; + error -> + {ok, undefined} end. down(#http2_state{stream_refs=Refs}) -> diff --git a/src/gun_tcp_proxy.erl b/src/gun_tcp_proxy.erl new file mode 100644 index 0000000..b4236f4 --- /dev/null +++ b/src/gun_tcp_proxy.erl @@ -0,0 +1,57 @@ +%% Copyright (c) 2020, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_tcp_proxy). + +-export([name/0]). +-export([messages/0]). +-export([connect/3]). +-export([connect/4]). +-export([send/2]). +-export([setopts/2]). +-export([sockname/1]). +-export([close/1]). + +-type socket() :: #{ + reply_to := pid(), + stream_ref := reference() | [reference()] +}. + +name() -> tcp_proxy. + +messages() -> {tcp_proxy, tcp_proxy_closed, tcp_proxy_error}. + +-spec connect(_, _, _) -> no_return(). +connect(_, _, _) -> + error(not_implemented). + +-spec connect(_, _, _, _) -> no_return(). +connect(_, _, _, _) -> + error(not_implemented). + +-spec send(socket(), iodata()) -> ok. +send(#{reply_to := ReplyTo, stream_ref := StreamRef}, Data) -> + gen_statem:cast(self(), {data, ReplyTo, StreamRef, nofin, Data}). + +-spec setopts(_, _) -> no_return(). +setopts(_, _) -> + error(not_implemented). + +-spec sockname(_) -> no_return(). +sockname(_) -> + error(not_implemented). + +-spec close(socket()) -> ok. +close(_) -> + ok. diff --git a/test/raw_SUITE.erl b/test/raw_SUITE.erl index 18ab3b5..9b836c0 100644 --- a/test/raw_SUITE.erl +++ b/test/raw_SUITE.erl @@ -139,7 +139,7 @@ do_connect_raw(OriginTransport, ProxyTransport) -> protocols => [raw] }), {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = receive_from(ProxyPid), - {response, fin, 200, _} = gun:await(ConnPid, StreamRef), + {response, fin, 200, _} = gun:await(ConnPid, StreamRef), %% @todo Why fin? handshake_completed = receive_from(OriginPid), %% When we take over the entire connection there is no stream reference. gun:data(ConnPid, undefined, nofin, <<"Hello world!">>), @@ -186,6 +186,52 @@ connect_raw_reply_to(_) -> gun:data(ConnPid, undefined, nofin, <<"Hello world!">>), receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end. +h2_connect_tcp_raw_tcp(_) -> + doc("Use HTTP/2 CONNECT over TCP to connect to a remote endpoint using the raw protocol over TCP."), + do_h2_connect_raw(tcp, tcp). + +do_h2_connect_raw(OriginTransport, ProxyTransport) -> + {ok, OriginPid, OriginPort} = init_origin(OriginTransport, raw, fun do_echo/3), + {ok, ProxyPid, ProxyPort} = rfc7540_SUITE:do_proxy_start(ProxyTransport, [ + {proxy_stream, 1, 200, [], 0, undefined} + ]), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + transport => ProxyTransport, + protocols => [http2] + }), + {ok, http2} = gun:await_up(ConnPid), + handshake_completed = receive_from(ProxyPid), + StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + transport => OriginTransport, + protocols => [raw] + }), + {request, #{ + <<":method">> := <<"CONNECT">>, + <<":authority">> := Authority + }} = receive_from(ProxyPid), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + handshake_completed = receive_from(OriginPid), + gun:data(ConnPid, StreamRef, nofin, <<"Hello world!">>), + {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined), +%% @todo +% #{ +% transport := OriginTransport, +% protocol := raw, +% origin_scheme := _, %% @todo This should be 'undefined'. +% origin_host := "localhost", +% origin_port := OriginPort, +% intermediaries := [#{ +% type := connect, +% host := "localhost", +% port := ProxyPort, +% transport := ProxyTransport, +% protocol := http +% }]} = gun:info(ConnPid), + gun:close(ConnPid). + http11_upgrade_raw_tcp(_) -> doc("Use the HTTP Upgrade mechanism to switch to the raw protocol over TCP."), do_http11_upgrade_raw(tcp). diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl index 37e6903..09a0923 100644 --- a/test/rfc7540_SUITE.erl +++ b/test/rfc7540_SUITE.erl @@ -17,12 +17,146 @@ -compile(nowarn_export_all). -import(ct_helper, [doc/1]). +-import(gun_test, [init_origin/2]). -import(gun_test, [init_origin/3]). -import(gun_test, [receive_from/1]). all() -> ct_helper:all(?MODULE). +%% Proxy helpers. + +-record(proxy_stream, { + id, + status, + resp_headers = [], + delay = 0, + origin_socket +}). + +-record(proxy, { + parent, + socket, + transport, + streams = [], + decode_state = cow_hpack:init(), + encode_state = cow_hpack:init() +}). + +do_proxy_start(Transport) -> + do_proxy_start(Transport, [#proxy_stream{id=1, status=200, resp_headers=[], delay=0}]). + +do_proxy_start(Transport0, Streams) -> + Transport = case Transport0 of + tcp -> gun_tcp; + tls -> gun_tls + end, + Proxy = #proxy{parent=self(), transport=Transport, streams=Streams}, + Pid = spawn_link(fun() -> do_proxy_init(Proxy) end), + Port = receive_from(Pid), + {ok, Pid, Port}. + +do_proxy_init(Proxy=#proxy{parent=Parent, transport=Transport}) -> + {ok, ListenSocket} = case Transport of + gun_tcp -> + gen_tcp:listen(0, [binary, {active, false}]); + gun_tls -> + Opts = ct_helper:get_certs_from_ets(), + ssl:listen(0, [binary, {active, false}|Opts]) + end, + {ok, {_, Port}} = Transport:sockname(ListenSocket), + Parent ! {self(), Port}, + {ok, Socket} = case Transport of + gun_tcp -> + gen_tcp:accept(ListenSocket, 5000); + gun_tls -> + {ok, Socket0} = ssl:transport_accept(ListenSocket, 5000), + ssl:handshake(Socket0, 5000), + {ok, <<"h2">>} = ssl:negotiated_protocol(Socket0), + {ok, Socket0} + end, + gun_test:http2_handshake(Socket, case Transport of + gun_tcp -> gen_tcp; + gun_tls -> ssl + end), + Parent ! {self(), handshake_completed}, + Transport:setopts(Socket, [{active, true}]), + do_proxy_receive(<<>>, Proxy#proxy{socket=Socket}). + +do_proxy_receive(Buffer, Proxy=#proxy{socket=Socket, transport=Transport}) -> + {OK, _, _} = Transport:messages(), + receive + {OK, Socket, Data0} -> + do_proxy_parse(<>, Proxy); + {tcp, OriginSocket, OriginData} -> + do_proxy_forward(Buffer, Proxy, OriginSocket, OriginData); + {tcp_closed, _} -> + ok; + {ssl_closed, _} -> + ok; + Msg -> + error(Msg) + end. + +%% We only expect to receive data on a CONNECT stream. +do_proxy_parse(<>, + Proxy=#proxy{streams=Streams}) -> + #proxy_stream{origin_socket=OriginSocket} + = lists:keyfind(StreamID, #proxy_stream.id, Streams), + case gen_tcp:send(OriginSocket, Payload) of + ok -> + do_proxy_parse(Rest, Proxy); + {error, _} -> + ok + end; +do_proxy_parse(<>, + Proxy=#proxy{parent=Parent, socket=Socket, transport=Transport, + streams=Streams0, decode_state=DecodeState0, encode_state=EncodeState0}) -> + #proxy_stream{status=Status, resp_headers=RespHeaders, delay=Delay} + = Stream = lists:keyfind(StreamID, #proxy_stream.id, Streams0), + {ReqHeaders0, DecodeState} = cow_hpack:decode(ReqHeadersBlock, DecodeState0), + ReqHeaders = maps:from_list(ReqHeaders0), + timer:sleep(Delay), + Parent ! {self(), {request, ReqHeaders}}, + {IsFin, OriginSocket} = case ReqHeaders of + #{<<":method">> := <<"CONNECT">>, <<":authority">> := Authority} + when Status >= 200, Status < 300 -> + {OriginHost, OriginPort} = cow_http_hd:parse_host(Authority), + {ok, OriginSocket0} = gen_tcp:connect( + binary_to_list(OriginHost), OriginPort, + [binary, {active, true}]), + {nofin, OriginSocket0}; + #{} -> + {fin, undefined} + end, + {RespHeadersBlock, EncodeState} = cow_hpack:encode([ + {<<":status">>, integer_to_binary(Status)} + |RespHeaders], EncodeState0), + ok = Transport:send(Socket, [ + cow_http2:headers(StreamID, IsFin, RespHeadersBlock) + ]), + Streams = lists:keystore(StreamID, #proxy_stream.id, Streams0, + Stream#proxy_stream{origin_socket=OriginSocket}), + do_proxy_parse(Rest, Proxy#proxy{streams=Streams, + decode_state=DecodeState, encode_state=EncodeState}); +do_proxy_parse(<>, Proxy) -> + ct:pal("Ignoring packet header ~0p~npayload ~p", [Header, Payload]), + do_proxy_parse(Rest, Proxy); +do_proxy_parse(Rest, Proxy) -> + do_proxy_receive(Rest, Proxy). + +do_proxy_forward(Buffer, Proxy=#proxy{socket=Socket, transport=Transport, streams=Streams}, + OriginSocket, OriginData) -> + #proxy_stream{id=StreamID} = lists:keyfind(OriginSocket, #proxy_stream.origin_socket, Streams), + Len = byte_size(OriginData), + Data = [<>, OriginData], + case Transport:send(Socket, Data) of + ok -> + do_proxy_receive(Buffer, Proxy); + {error, _} -> + ok + end. + %% Tests. authority_default_port_http(_) -> @@ -295,3 +429,62 @@ settings_ack_timeout(_) -> {ok, http2} = gun:await_up(ConnPid), timer:sleep(6000), gun:close(ConnPid). + +connect_http(_) -> + doc("CONNECT can be used to establish a TCP connection " + "to an HTTP/1.1 server via a TCP HTTP/2 proxy. (RFC7540 8.3)"), + do_connect_http(<<"http">>, tcp, <<"http">>, tcp). + +do_connect_http(OriginScheme, OriginTransport, ProxyScheme, ProxyTransport) -> + {ok, OriginPid, OriginPort} = init_origin(OriginTransport, http), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyTransport, [ + #proxy_stream{id=1, status=200} + ]), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + transport => ProxyTransport, + protocols => [http2] + }), + {ok, http2} = gun:await_up(ConnPid), + handshake_completed = receive_from(ProxyPid), + StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + transport => OriginTransport + }), + {request, #{ + <<":method">> := <<"CONNECT">>, + <<":authority">> := Authority + }} = receive_from(ProxyPid), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + handshake_completed = receive_from(OriginPid), + ProxiedStreamRef = gun:get(ConnPid, "/proxied", #{}, #{tunnel => StreamRef}), + Data = receive_from(OriginPid), + Lines = binary:split(Data, <<"\r\n">>, [global]), + [<<"host: ", Authority/bits>>] = [L || <<"host: ", _/bits>> = L <- Lines], + #{ + transport := ProxyTransport, + protocol := http2, + origin_scheme := ProxyScheme, + origin_host := "localhost", + origin_port := ProxyPort, + intermediaries := [] %% Intermediaries are specific to the CONNECT stream. + } = gun:info(ConnPid), + {ok, #{ + ref := StreamRef, + reply_to := Self, + state := running, + tunnel := #{ + transport := OriginTransport, + protocol := http, + origin_scheme := OriginScheme, + origin_host := "localhost", + origin_port := OriginPort + } + }} = gun:stream_info(ConnPid, StreamRef), + {ok, #{ + ref := ProxiedStreamRef, + reply_to := Self, + state := running + }} = gun:stream_info(ConnPid, ProxiedStreamRef), + gun:close(ConnPid). -- cgit v1.2.3