aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ebin/gun.app2
-rw-r--r--src/gun.erl27
-rw-r--r--src/gun_http2.erl184
-rw-r--r--src/gun_tcp_proxy.erl57
-rw-r--r--test/raw_SUITE.erl48
-rw-r--r--test/rfc7540_SUITE.erl193
6 files changed, 496 insertions, 15 deletions
diff --git a/ebin/gun.app b/ebin/gun.app
index affc6fd..c0f21d1 100644
--- a/ebin/gun.app
+++ b/ebin/gun.app
@@ -1,7 +1,7 @@
{application, 'gun', [
{description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."},
{vsn, "2.0.0-pre.2"},
- {modules, ['gun','gun_app','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_ws','gun_ws_h']},
+ {modules, ['gun','gun_app','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_ws','gun_ws_h']},
{registered, [gun_sup]},
{applications, [kernel,stdlib,ssl,cowlib]},
{mod, {gun_app, []}},
diff --git a/src/gun.erl b/src/gun.erl
index f4f1fca..0a0560d 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -108,6 +108,7 @@
-export([connected_no_input/3]).
-export([connected_ws_only/3]).
-export([closing/3]).
+-export([protocol_handler/1]).
-export([terminate/3]).
-type req_headers() :: [{binary() | string() | atom(), iodata()}]
@@ -178,7 +179,8 @@
%% the entire connection.
-type req_opts() :: #{
flow => pos_integer(),
- reply_to => pid()
+ reply_to => pid(),
+ tunnel => reference() | [reference()]
}.
-export_type([req_opts/0]).
@@ -580,12 +582,13 @@ headers(ServerPid, Method, Path, Headers) ->
headers(ServerPid, Method, Path, Headers, #{}).
-spec headers(pid(), iodata(), iodata(), req_headers(), req_opts()) -> reference().
-headers(ServerPid, Method, Path, Headers, ReqOpts) ->
- StreamRef = make_ref(),
+headers(ServerPid, Method, Path, Headers0, ReqOpts) ->
+ Tunnel = get_tunnel(ReqOpts),
+ StreamRef = make_stream_ref(Tunnel),
InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
gen_statem:cast(ServerPid, {headers, ReplyTo, StreamRef,
- Method, Path, normalize_headers(Headers), InitialFlow}),
+ Method, Path, normalize_headers(Headers0), InitialFlow}),
StreamRef.
-spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> reference().
@@ -594,13 +597,24 @@ request(ServerPid, Method, Path, Headers, Body) ->
-spec request(pid(), iodata(), iodata(), req_headers(), iodata(), req_opts()) -> reference().
request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
- StreamRef = make_ref(),
+ Tunnel = get_tunnel(ReqOpts),
+ StreamRef = make_stream_ref(Tunnel),
InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef,
Method, Path, normalize_headers(Headers), Body, InitialFlow}),
StreamRef.
+get_tunnel(#{tunnel := Tunnel}) when is_reference(Tunnel) ->
+ [Tunnel];
+get_tunnel(#{tunnel := Tunnel}) ->
+ Tunnel;
+get_tunnel(_) ->
+ undefined.
+
+make_stream_ref(undefined) -> make_ref();
+make_stream_ref(Tunnel) -> Tunnel ++ [make_ref()].
+
normalize_headers([]) ->
[];
normalize_headers([{Name, Value}|Tail]) when is_binary(Name) ->
@@ -638,6 +652,7 @@ connect(ServerPid, Destination, Headers, ReqOpts) ->
StreamRef = make_ref(),
InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
+ %% @todo tunnel
gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef,
Destination, Headers, InitialFlow}),
StreamRef.
@@ -876,6 +891,7 @@ ws_upgrade(ServerPid, Path, Headers, Opts) ->
ok = gun_ws:check_options(Opts),
StreamRef = make_ref(),
ReplyTo = maps:get(reply_to, Opts, self()),
+ %% @todo Also accept tunnel option.
gen_statem:cast(ServerPid, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, Opts}),
StreamRef.
@@ -1197,6 +1213,7 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers0, Body, Init
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
+ %% @todo Not events are currently handled for the request?
ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow),
{keep_state, State#state{protocol_state=ProtoState2}};
%% Public Websocket interface.
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index fef1096..c4668d7 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -29,10 +29,12 @@
-export([headers/11]).
-export([request/12]).
-export([data/7]).
+-export([connect/6]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
-export([down/1]).
+%-export([ws_upgrade/10]).
-record(stream, {
id = undefined :: cow_http2:streamid(),
@@ -51,7 +53,10 @@
path :: iodata(),
%% Content handlers state.
- handler_state :: undefined | gun_content_handler:state()
+ handler_state :: undefined | gun_content_handler:state(),
+
+ %% CONNECT tunnel.
+ tunnel :: {module(), any(), gun:connect_destination()} | {setup, gun:connect_destination()} | undefined
}).
-record(http2_state, {
@@ -295,9 +300,20 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) ->
end,
State.
-data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0) ->
- Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0,
- handler_state=Handlers0} = get_stream_by_id(State0, StreamID),
+%% @todo CONNECT streams may need to pass data through TLS socket.
+data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0) ->
+ case get_stream_by_id(State, StreamID) of
+ Stream=#stream{tunnel=undefined} ->
+ data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream);
+ Stream=#stream{tunnel={Protocol, ProtoState0, Destination}} ->
+ {ProtoState, EvHandlerState} = Protocol:handle(Data, ProtoState0,
+ EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel={Protocol, ProtoState, Destination}}),
+ EvHandlerState}
+ end.
+
+data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
+ Stream=#stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, handler_state=Handlers0}) ->
{ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
Flow = case Flow0 of
infinity -> infinity;
@@ -340,9 +356,11 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
ref=StreamRef,
reply_to=ReplyTo,
authority=Authority,
- path=Path
+ path=Path,
+ tunnel=Tunnel
} = Stream,
State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]},
+ %% @todo CONNECT response handling
if
Status >= 100, Status =< 199 ->
ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers},
@@ -353,6 +371,36 @@ headers_frame(State0=#http2_state{content_handlers=Handlers0, commands_queue=Com
headers => Headers
}, EvHandlerState0),
{State, EvHandlerState};
+ Status >= 200, Status =< 299, element(1, Tunnel) =:= setup ->
+ ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers},
+ EvHandlerState = EvHandler:response_headers(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
+ %% @todo Handle TLS over TCP and TLS over TLS.
+ {setup, Destination} = Tunnel,
+ tcp = maps:get(transport, Destination, tcp),
+ [Protocol0] = maps:get(protocols, Destination, [http]),
+ %% Options are either passed directly or #{} is used. Since the
+ %% protocol only applies to a stream we cannot use connection-wide options.
+ {Protocol, ProtoOpts} = case Protocol0 of
+ {P, PO} -> {gun:protocol_handler(P), PO};
+ P -> {gun:protocol_handler(P), #{}}
+ end,
+ %% @todo What about gun_socks_up?
+ %% @todo What about the StateName returned?
+ OriginSocket = #{
+ reply_to => ReplyTo,
+ stream_ref => StreamRef
+ },
+ OriginTransport = gun_tcp_proxy,
+ {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts),
+ %% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
+ %% @todo What about keepalive?
+ {store_stream(State, Stream#stream{tunnel={Protocol, ProtoState, Destination}}),
+ EvHandlerState};
true ->
ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
@@ -570,6 +618,7 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerSt
Transport:send(Socket, cow_http2:ping(0)),
{State, EvHandlerState}.
+%% @todo tunnel
headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) ->
@@ -598,7 +647,8 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
- Path, Headers0, Body, InitialFlow0, EvHandler, EvHandlerState0) ->
+ Path, Headers0, Body, InitialFlow0, EvHandler, EvHandlerState0)
+ when is_reference(StreamRef) ->
Headers1 = lists:keystore(<<"content-length">>, 1, Headers0,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
@@ -636,8 +686,44 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
{State, EvHandler:request_end(RequestEndEvent, EvHandlerState)};
nofin ->
maybe_send_data(State, StreamID, fin, Body, EvHandler, EvHandlerState)
+ end;
+%% Tunneled request.
+%%
+%% We call Proto:request in a loop until we get to a non-CONNECT stream.
+%% When the transport is gun_tls_proxy we receive the TLS data
+%% as a 'data' cast; when gun_tcp_proxy we receive the 'data' cast
+%% directly. The 'data' cast contains the tunnel for the StreamRef.
+%% The tunnel is given as the socket and the gun_tls_proxy out_socket
+%% is always a gun_tcp_proxy that sends a 'data' cast.
+request(State, [StreamRef|Tail], ReplyTo, Method, _Host, _Port,
+ Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ Stream=#stream{tunnel={Proto, ProtoState0, Destination=#{host := OriginHost, port := OriginPort}}} ->
+ %% @todo So the event is probably not giving the right StreamRef?
+ {ProtoState, EvHandlerState} = Proto:request(ProtoState0, normalize_stream_ref(Tail),
+ ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
+ InitialFlow, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, Destination}}), EvHandlerState};
+ #stream{tunnel=undefined} ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ "The stream is not a tunnel."}},
+ {State, EvHandlerState0};
+ error ->
+ {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
+ %% get the ultimate stream by querying the #stream{} until we get the last one
+ %% call Proto:request in that stream
+ %% receive a {data, ...} back with the Tunnel for the StreamRef
+ %% if gun_tls_proxy then we get the wrapped TLS data
+ %% otherwise we get the data directly
+ %% handle the data in the same way as normal; data follows the same scenario
+ %% until we get a {data, ...} for the top-level stream
+
+ %% What about data we receive from the socket?
+ %%
+ %% we get DATA with a StreamID for the CONNECT, we see it's CONNECT so we forward to Proto:data
+
initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
initial_flow(InitialFlow, _) -> InitialFlow.
@@ -647,6 +733,7 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He
_ -> gun_http:host_header(Transport, Host0, Port)
end,
%% @todo We also must remove any header found in the connection header.
+ %% @todo Much of this is duplicated in cow_http2_machine; sort things out.
Headers =
lists:keydelete(<<"host">>, 1,
lists:keydelete(<<"connection">>, 1,
@@ -666,8 +753,11 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He
},
{ok, PseudoHeaders, Headers}.
+normalize_stream_ref([StreamRef]) -> StreamRef;
+normalize_stream_ref(StreamRef) -> StreamRef.
+
data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data,
- EvHandler, EvHandlerState) ->
+ EvHandler, EvHandlerState) when is_reference(StreamRef) ->
case get_stream_by_ref(State, StreamRef) of
#stream{id=StreamID} ->
case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
@@ -680,6 +770,20 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin,
end;
error ->
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState}
+ end;
+%% Tunneled data.
+data(State, [StreamRef|Tail], ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ Stream=#stream{tunnel={Proto, ProtoState0, Destination}} ->
+ {ProtoState, EvHandlerState} = Proto:data(ProtoState0, normalize_stream_ref(Tail),
+ ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, Destination}}), EvHandlerState};
+ #stream{tunnel=undefined} ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ "The stream is not a tunnel."}},
+ {State, EvHandlerState0};
+ error ->
+ {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0,
@@ -749,6 +853,41 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
State0
end.
+connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
+ http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Destination=#{host := Host0},
+ Headers0, InitialFlow0) ->
+ Host = case Host0 of
+ Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
+ _ -> Host0
+ end,
+ Port = maps:get(port, Destination, 1080),
+ Authority = [Host, $:, integer_to_binary(Port)],
+ PseudoHeaders = #{
+ method => <<"CONNECT">>,
+ authority => Authority
+ },
+ Headers1 =
+ lists:keydelete(<<"host">>, 1,
+ lists:keydelete(<<"content-length">>, 1, Headers0)),
+ HasProxyAuthorization = lists:keymember(<<"proxy-authorization">>, 1, Headers1),
+ Headers = case {HasProxyAuthorization, Destination} of
+ {false, #{username := UserID, password := Password}} ->
+ [{<<"proxy-authorization">>, [
+ <<"Basic ">>,
+ base64:encode(iolist_to_binary([UserID, $:, Password]))]}
+ |Headers1];
+ _ ->
+ Headers1
+ end,
+ {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(<<"CONNECT">>, HTTP2Machine0),
+ {ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
+ StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
+ Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
+ authority=Authority, path= <<>>, tunnel={setup, Destination}},
+ create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream).
+
cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
@@ -776,8 +915,21 @@ timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, Nam
connection_error(State, Error)
end.
-stream_info(State, StreamRef) ->
+stream_info(State, StreamRef) when is_reference(StreamRef) ->
case get_stream_by_ref(State, StreamRef) of
+ #stream{reply_to=ReplyTo, tunnel={Protocol, _, #{host := OriginHost, port := OriginPort}}} ->
+ {ok, #{
+ ref => StreamRef,
+ reply_to => ReplyTo,
+ state => running,
+ tunnel => #{
+ transport => tcp, %% @todo
+ protocol => Protocol:name(),
+ origin_scheme => <<"http">>, %% @todo
+ origin_host => OriginHost,
+ origin_port => OriginPort
+ }
+ }};
#stream{reply_to=ReplyTo} ->
{ok, #{
ref => StreamRef,
@@ -786,6 +938,22 @@ stream_info(State, StreamRef) ->
}};
error ->
{ok, undefined}
+ end;
+%% Tunneled streams.
+stream_info(State, StreamRefList=[StreamRef|Tail]) ->
+ case get_stream_by_ref(State, StreamRef) of
+ #stream{tunnel={Protocol, ProtoState, _}} ->
+ %% We must return the real StreamRef as seen by the user.
+ %% We therefore set it on return, with the outer layer "winning".
+ %% @todo Would be well worth returning intermediaries as well.
+ case Protocol:stream_info(ProtoState, normalize_stream_ref(Tail)) of
+ {ok, undefined} ->
+ {ok, undefined};
+ {ok, Info} ->
+ {ok, Info#{ref => StreamRefList}}
+ end;
+ error ->
+ {ok, undefined}
end.
down(#http2_state{stream_refs=Refs}) ->
diff --git a/src/gun_tcp_proxy.erl b/src/gun_tcp_proxy.erl
new file mode 100644
index 0000000..b4236f4
--- /dev/null
+++ b/src/gun_tcp_proxy.erl
@@ -0,0 +1,57 @@
+%% Copyright (c) 2020, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_tcp_proxy).
+
+-export([name/0]).
+-export([messages/0]).
+-export([connect/3]).
+-export([connect/4]).
+-export([send/2]).
+-export([setopts/2]).
+-export([sockname/1]).
+-export([close/1]).
+
+-type socket() :: #{
+ reply_to := pid(),
+ stream_ref := reference() | [reference()]
+}.
+
+name() -> tcp_proxy.
+
+messages() -> {tcp_proxy, tcp_proxy_closed, tcp_proxy_error}.
+
+-spec connect(_, _, _) -> no_return().
+connect(_, _, _) ->
+ error(not_implemented).
+
+-spec connect(_, _, _, _) -> no_return().
+connect(_, _, _, _) ->
+ error(not_implemented).
+
+-spec send(socket(), iodata()) -> ok.
+send(#{reply_to := ReplyTo, stream_ref := StreamRef}, Data) ->
+ gen_statem:cast(self(), {data, ReplyTo, StreamRef, nofin, Data}).
+
+-spec setopts(_, _) -> no_return().
+setopts(_, _) ->
+ error(not_implemented).
+
+-spec sockname(_) -> no_return().
+sockname(_) ->
+ error(not_implemented).
+
+-spec close(socket()) -> ok.
+close(_) ->
+ ok.
diff --git a/test/raw_SUITE.erl b/test/raw_SUITE.erl
index 18ab3b5..9b836c0 100644
--- a/test/raw_SUITE.erl
+++ b/test/raw_SUITE.erl
@@ -139,7 +139,7 @@ do_connect_raw(OriginTransport, ProxyTransport) ->
protocols => [raw]
}),
{request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = receive_from(ProxyPid),
- {response, fin, 200, _} = gun:await(ConnPid, StreamRef),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef), %% @todo Why fin?
handshake_completed = receive_from(OriginPid),
%% When we take over the entire connection there is no stream reference.
gun:data(ConnPid, undefined, nofin, <<"Hello world!">>),
@@ -186,6 +186,52 @@ connect_raw_reply_to(_) ->
gun:data(ConnPid, undefined, nofin, <<"Hello world!">>),
receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end.
+h2_connect_tcp_raw_tcp(_) ->
+ doc("Use HTTP/2 CONNECT over TCP to connect to a remote endpoint using the raw protocol over TCP."),
+ do_h2_connect_raw(tcp, tcp).
+
+do_h2_connect_raw(OriginTransport, ProxyTransport) ->
+ {ok, OriginPid, OriginPort} = init_origin(OriginTransport, raw, fun do_echo/3),
+ {ok, ProxyPid, ProxyPort} = rfc7540_SUITE:do_proxy_start(ProxyTransport, [
+ {proxy_stream, 1, 200, [], 0, undefined}
+ ]),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ transport => ProxyTransport,
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ handshake_completed = receive_from(ProxyPid),
+ StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ transport => OriginTransport,
+ protocols => [raw]
+ }),
+ {request, #{
+ <<":method">> := <<"CONNECT">>,
+ <<":authority">> := Authority
+ }} = receive_from(ProxyPid),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ handshake_completed = receive_from(OriginPid),
+ gun:data(ConnPid, StreamRef, nofin, <<"Hello world!">>),
+ {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined),
+%% @todo
+% #{
+% transport := OriginTransport,
+% protocol := raw,
+% origin_scheme := _, %% @todo This should be 'undefined'.
+% origin_host := "localhost",
+% origin_port := OriginPort,
+% intermediaries := [#{
+% type := connect,
+% host := "localhost",
+% port := ProxyPort,
+% transport := ProxyTransport,
+% protocol := http
+% }]} = gun:info(ConnPid),
+ gun:close(ConnPid).
+
http11_upgrade_raw_tcp(_) ->
doc("Use the HTTP Upgrade mechanism to switch to the raw protocol over TCP."),
do_http11_upgrade_raw(tcp).
diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl
index 37e6903..09a0923 100644
--- a/test/rfc7540_SUITE.erl
+++ b/test/rfc7540_SUITE.erl
@@ -17,12 +17,146 @@
-compile(nowarn_export_all).
-import(ct_helper, [doc/1]).
+-import(gun_test, [init_origin/2]).
-import(gun_test, [init_origin/3]).
-import(gun_test, [receive_from/1]).
all() ->
ct_helper:all(?MODULE).
+%% Proxy helpers.
+
+-record(proxy_stream, {
+ id,
+ status,
+ resp_headers = [],
+ delay = 0,
+ origin_socket
+}).
+
+-record(proxy, {
+ parent,
+ socket,
+ transport,
+ streams = [],
+ decode_state = cow_hpack:init(),
+ encode_state = cow_hpack:init()
+}).
+
+do_proxy_start(Transport) ->
+ do_proxy_start(Transport, [#proxy_stream{id=1, status=200, resp_headers=[], delay=0}]).
+
+do_proxy_start(Transport0, Streams) ->
+ Transport = case Transport0 of
+ tcp -> gun_tcp;
+ tls -> gun_tls
+ end,
+ Proxy = #proxy{parent=self(), transport=Transport, streams=Streams},
+ Pid = spawn_link(fun() -> do_proxy_init(Proxy) end),
+ Port = receive_from(Pid),
+ {ok, Pid, Port}.
+
+do_proxy_init(Proxy=#proxy{parent=Parent, transport=Transport}) ->
+ {ok, ListenSocket} = case Transport of
+ gun_tcp ->
+ gen_tcp:listen(0, [binary, {active, false}]);
+ gun_tls ->
+ Opts = ct_helper:get_certs_from_ets(),
+ ssl:listen(0, [binary, {active, false}|Opts])
+ end,
+ {ok, {_, Port}} = Transport:sockname(ListenSocket),
+ Parent ! {self(), Port},
+ {ok, Socket} = case Transport of
+ gun_tcp ->
+ gen_tcp:accept(ListenSocket, 5000);
+ gun_tls ->
+ {ok, Socket0} = ssl:transport_accept(ListenSocket, 5000),
+ ssl:handshake(Socket0, 5000),
+ {ok, <<"h2">>} = ssl:negotiated_protocol(Socket0),
+ {ok, Socket0}
+ end,
+ gun_test:http2_handshake(Socket, case Transport of
+ gun_tcp -> gen_tcp;
+ gun_tls -> ssl
+ end),
+ Parent ! {self(), handshake_completed},
+ Transport:setopts(Socket, [{active, true}]),
+ do_proxy_receive(<<>>, Proxy#proxy{socket=Socket}).
+
+do_proxy_receive(Buffer, Proxy=#proxy{socket=Socket, transport=Transport}) ->
+ {OK, _, _} = Transport:messages(),
+ receive
+ {OK, Socket, Data0} ->
+ do_proxy_parse(<<Buffer/binary, Data0/bits>>, Proxy);
+ {tcp, OriginSocket, OriginData} ->
+ do_proxy_forward(Buffer, Proxy, OriginSocket, OriginData);
+ {tcp_closed, _} ->
+ ok;
+ {ssl_closed, _} ->
+ ok;
+ Msg ->
+ error(Msg)
+ end.
+
+%% We only expect to receive data on a CONNECT stream.
+do_proxy_parse(<<Len:24, 0:8, _:8, StreamID:32, Payload:Len/binary, Rest/bits>>,
+ Proxy=#proxy{streams=Streams}) ->
+ #proxy_stream{origin_socket=OriginSocket}
+ = lists:keyfind(StreamID, #proxy_stream.id, Streams),
+ case gen_tcp:send(OriginSocket, Payload) of
+ ok ->
+ do_proxy_parse(Rest, Proxy);
+ {error, _} ->
+ ok
+ end;
+do_proxy_parse(<<Len:24, 1:8, _:8, StreamID:32, ReqHeadersBlock:Len/binary, Rest/bits>>,
+ Proxy=#proxy{parent=Parent, socket=Socket, transport=Transport,
+ streams=Streams0, decode_state=DecodeState0, encode_state=EncodeState0}) ->
+ #proxy_stream{status=Status, resp_headers=RespHeaders, delay=Delay}
+ = Stream = lists:keyfind(StreamID, #proxy_stream.id, Streams0),
+ {ReqHeaders0, DecodeState} = cow_hpack:decode(ReqHeadersBlock, DecodeState0),
+ ReqHeaders = maps:from_list(ReqHeaders0),
+ timer:sleep(Delay),
+ Parent ! {self(), {request, ReqHeaders}},
+ {IsFin, OriginSocket} = case ReqHeaders of
+ #{<<":method">> := <<"CONNECT">>, <<":authority">> := Authority}
+ when Status >= 200, Status < 300 ->
+ {OriginHost, OriginPort} = cow_http_hd:parse_host(Authority),
+ {ok, OriginSocket0} = gen_tcp:connect(
+ binary_to_list(OriginHost), OriginPort,
+ [binary, {active, true}]),
+ {nofin, OriginSocket0};
+ #{} ->
+ {fin, undefined}
+ end,
+ {RespHeadersBlock, EncodeState} = cow_hpack:encode([
+ {<<":status">>, integer_to_binary(Status)}
+ |RespHeaders], EncodeState0),
+ ok = Transport:send(Socket, [
+ cow_http2:headers(StreamID, IsFin, RespHeadersBlock)
+ ]),
+ Streams = lists:keystore(StreamID, #proxy_stream.id, Streams0,
+ Stream#proxy_stream{origin_socket=OriginSocket}),
+ do_proxy_parse(Rest, Proxy#proxy{streams=Streams,
+ decode_state=DecodeState, encode_state=EncodeState});
+do_proxy_parse(<<Len:24, Header:6/binary, Payload:Len/binary, Rest/bits>>, Proxy) ->
+ ct:pal("Ignoring packet header ~0p~npayload ~p", [Header, Payload]),
+ do_proxy_parse(Rest, Proxy);
+do_proxy_parse(Rest, Proxy) ->
+ do_proxy_receive(Rest, Proxy).
+
+do_proxy_forward(Buffer, Proxy=#proxy{socket=Socket, transport=Transport, streams=Streams},
+ OriginSocket, OriginData) ->
+ #proxy_stream{id=StreamID} = lists:keyfind(OriginSocket, #proxy_stream.origin_socket, Streams),
+ Len = byte_size(OriginData),
+ Data = [<<Len:24, 0:8, 0:8, StreamID:32>>, OriginData],
+ case Transport:send(Socket, Data) of
+ ok ->
+ do_proxy_receive(Buffer, Proxy);
+ {error, _} ->
+ ok
+ end.
+
%% Tests.
authority_default_port_http(_) ->
@@ -295,3 +429,62 @@ settings_ack_timeout(_) ->
{ok, http2} = gun:await_up(ConnPid),
timer:sleep(6000),
gun:close(ConnPid).
+
+connect_http(_) ->
+ doc("CONNECT can be used to establish a TCP connection "
+ "to an HTTP/1.1 server via a TCP HTTP/2 proxy. (RFC7540 8.3)"),
+ do_connect_http(<<"http">>, tcp, <<"http">>, tcp).
+
+do_connect_http(OriginScheme, OriginTransport, ProxyScheme, ProxyTransport) ->
+ {ok, OriginPid, OriginPort} = init_origin(OriginTransport, http),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyTransport, [
+ #proxy_stream{id=1, status=200}
+ ]),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ transport => ProxyTransport,
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ handshake_completed = receive_from(ProxyPid),
+ StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ transport => OriginTransport
+ }),
+ {request, #{
+ <<":method">> := <<"CONNECT">>,
+ <<":authority">> := Authority
+ }} = receive_from(ProxyPid),
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
+ handshake_completed = receive_from(OriginPid),
+ ProxiedStreamRef = gun:get(ConnPid, "/proxied", #{}, #{tunnel => StreamRef}),
+ Data = receive_from(OriginPid),
+ Lines = binary:split(Data, <<"\r\n">>, [global]),
+ [<<"host: ", Authority/bits>>] = [L || <<"host: ", _/bits>> = L <- Lines],
+ #{
+ transport := ProxyTransport,
+ protocol := http2,
+ origin_scheme := ProxyScheme,
+ origin_host := "localhost",
+ origin_port := ProxyPort,
+ intermediaries := [] %% Intermediaries are specific to the CONNECT stream.
+ } = gun:info(ConnPid),
+ {ok, #{
+ ref := StreamRef,
+ reply_to := Self,
+ state := running,
+ tunnel := #{
+ transport := OriginTransport,
+ protocol := http,
+ origin_scheme := OriginScheme,
+ origin_host := "localhost",
+ origin_port := OriginPort
+ }
+ }} = gun:stream_info(ConnPid, StreamRef),
+ {ok, #{
+ ref := ProxiedStreamRef,
+ reply_to := Self,
+ state := running
+ }} = gun:stream_info(ConnPid, ProxiedStreamRef),
+ gun:close(ConnPid).