diff options
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | rebar.config | 2 | ||||
-rw-r--r-- | src/gun.erl | 100 | ||||
-rw-r--r-- | src/gun_http.erl | 112 | ||||
-rw-r--r-- | src/gun_http2.erl | 2 | ||||
-rw-r--r-- | src/gun_tls.erl | 6 | ||||
-rw-r--r-- | src/gun_ws.erl | 2 | ||||
-rw-r--r-- | test/rfc7231_SUITE.erl | 386 |
8 files changed, 576 insertions, 36 deletions
@@ -13,7 +13,7 @@ CT_OPTS += -pa test -ct_hooks gun_ct_hook [] # -boot start_sasl LOCAL_DEPS = ssl DEPS = cowlib -dep_cowlib = git https://github.com/ninenines/cowlib 2.5.1 +dep_cowlib = git https://github.com/ninenines/cowlib 2.6.0 DOC_DEPS = asciideck diff --git a/rebar.config b/rebar.config index 4614141..347417f 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ -{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.5.1"}} +{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.6.0"}} ]}. {erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard]}. diff --git a/src/gun.erl b/src/gun.erl index 115f603..af576fb 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -55,6 +55,11 @@ %% Streaming data. -export([data/4]). +%% Tunneling. +-export([connect/2]). +-export([connect/3]). +-export([connect/4]). + %% Awaiting gun messages. -export([await/2]). -export([await/3]). @@ -110,6 +115,22 @@ -export_type([opts/0]). %% @todo Add an option to disable/enable the notowner behavior. +-type connect_destination() :: #{ + host := inet:hostname() | inet:ip_address(), + port := inet:port_number(), + username => iodata(), + password => iodata(), + protocol => http | http2, + transport => tcp | tls, + tls_opts => [ssl:connect_option()], + tls_handshake_timeout => timeout() +}. +-export_type([connect_destination/0]). + +%% @todo When/if HTTP/2 CONNECT gets implemented, we will want an option here +%% to indicate that the request must be sent on an existing CONNECT stream. +%% This is of course not required for HTTP/1.1 since the CONNECT takes over +%% the entire connection. -type req_opts() :: #{ reply_to => pid() }. @@ -137,8 +158,10 @@ parent :: pid(), owner :: pid(), owner_ref :: reference(), - host :: inet:hostname(), + host :: inet:hostname() | inet:ip_address(), port :: inet:port_number(), + origin_host :: inet:hostname() | inet:ip_address(), + origin_port :: inet:port_number(), opts :: opts(), keepalive_ref :: undefined | reference(), socket :: undefined | inet:socket() | ssl:sslsocket(), @@ -358,6 +381,7 @@ request(ServerPid, Method, Path, Headers) -> request(ServerPid, Method, Path, Headers, Body) -> request(ServerPid, Method, Path, Headers, Body, #{}). +%% @todo Accept header names as maps. -spec request(pid(), iodata(), iodata(), headers(), iodata(), req_opts()) -> reference(). request(ServerPid, Method, Path, Headers, Body, ReqOpts) -> StreamRef = make_ref(), @@ -372,6 +396,23 @@ data(ServerPid, StreamRef, IsFin, Data) -> _ = ServerPid ! {data, self(), StreamRef, IsFin, Data}, ok. +%% Tunneling. + +-spec connect(pid(), connect_destination()) -> reference(). +connect(ServerPid, Destination) -> + connect(ServerPid, Destination, [], #{}). + +-spec connect(pid(), connect_destination(), headers()) -> reference(). +connect(ServerPid, Destination, Headers) -> + connect(ServerPid, Destination, Headers, #{}). + +-spec connect(pid(), connect_destination(), headers(), req_opts()) -> reference(). +connect(ServerPid, Destination, Headers, ReqOpts) -> + StreamRef = make_ref(), + ReplyTo = maps:get(reply_to, ReqOpts, self()), + _ = ServerPid ! {connect, ReplyTo, StreamRef, Destination, Headers}, + StreamRef. + %% Awaiting gun messages. %% @todo spec await await_body @@ -565,6 +606,8 @@ ws_upgrade(ServerPid, Path, Headers, Opts) -> _ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts}, StreamRef. +%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef. +%% But it can be kept for the time being since it can still work for HTTP/1.1. -spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok. ws_send(ServerPid, Frames) -> _ = ServerPid ! {ws_send, self(), Frames}, @@ -601,13 +644,14 @@ init(Parent, Owner, Host, Port, Opts) -> tls -> gun_tls end, OwnerRef = monitor(process, Owner), - connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, - host=Host, port=Port, opts=Opts, transport=Transport}, Retry). + transport_connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, + host=Host, port=Port, origin_host=Host, origin_port=Port, + opts=Opts, transport=Transport}, Retry). default_transport(443) -> tls; default_transport(_) -> tcp. -connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) -> +transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) -> Protocols = [case P of http -> <<"http/1.1">>; http2 -> <<"h2">> @@ -626,7 +670,7 @@ connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tl {error, Reason} -> retry(State#state{last_error=Reason}, Retries) end; -connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) -> +transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) -> TransportOpts = [binary, {active, false} |maps:get(transport_opts, Opts, [])], case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of @@ -670,7 +714,7 @@ retry_loop(State=#state{parent=Parent, opts=Opts}, Retries) -> _ = erlang:send_after(maps:get(retry_timeout, Opts, 5000), self(), retry), receive retry -> - connect(State, Retries); + transport_connect(State, Retries); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {retry_loop, State, Retries}) @@ -690,20 +734,18 @@ before_loop(State=#state{opts=Opts, protocol=Protocol}) -> end, loop(State#state{keepalive_ref=KeepaliveRef}). -loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, port=Port, opts=Opts, - socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> +loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, + origin_host=Host, origin_port=Port, opts=Opts, socket=Socket, + transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> {OK, Closed, Error} = Transport:messages(), Transport:setopts(Socket, [{active, once}]), receive {OK, Socket, Data} -> case Protocol:handle(Data, ProtoState) of - close -> - Transport:close(Socket), - down(State, normal); - {upgrade, Protocol2, ProtoState2} -> - ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2}); - ProtoState2 -> - loop(State#state{protocol_state=ProtoState2}) + Commands when is_list(Commands) -> + commands(Commands, State); + Command -> + commands([Command], State) end; {Closed, Socket} -> Protocol:close(ProtoState), @@ -736,6 +778,9 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por ProtoState2 = Protocol:data(ProtoState, StreamRef, ReplyTo, IsFin, Data), loop(State#state{protocol_state=ProtoState2}); + {connect, ReplyTo, StreamRef, Destination, Headers} -> + ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers), + loop(State#state{protocol_state=ProtoState2}); {cancel, ReplyTo, StreamRef} -> ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo), loop(State#state{protocol_state=ProtoState2}); @@ -786,6 +831,31 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por loop(State) end. +commands([], State) -> + loop(State); +commands([close|_], State=#state{socket=Socket, transport=Transport}) -> + Transport:close(Socket), + down(State, normal); +commands([Error={error, _}|_], State=#state{socket=Socket, transport=Transport}) -> + Transport:close(Socket), + down(State, Error); +commands([{state, ProtoState}|Tail], State) -> + commands(Tail, State#state{protocol_state=ProtoState}); +%% @todo The scheme should probably not be ignored. +commands([{origin, _Scheme, Host, Port}|Tail], State) -> + commands(Tail, State#state{origin_host=Host, origin_port=Port}); +commands([{switch_transport, Transport, Socket}|Tail], State) -> + commands(Tail, State#state{socket=Socket, transport=Transport}); +%% @todo The two loops should be reunified and this clause generalized. +commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) -> + ws_loop(State#state{protocol=Protocol, protocol_state=ProtoState}); +%% @todo And this state should probably not be ignored. +commands([{switch_protocol, Protocol, _ProtoState0}|Tail], + State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) -> + ProtoOpts = maps:get(http2_opts, Opts, #{}), + ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), + commands(Tail, State#state{protocol=Protocol, protocol_state=ProtoState}). + ws_loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> {OK, Closed, Error} = Transport:messages(), diff --git a/src/gun_http.erl b/src/gun_http.erl index d07502e..ee9d04f 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -23,6 +23,7 @@ -export([request/8]). -export([request/9]). -export([data/5]). +-export([connect/5]). -export([cancel/3]). -export([down/1]). -export([ws_upgrade/7]). @@ -30,10 +31,13 @@ -type io() :: head | {body, non_neg_integer()} | body_close | body_chunked | body_trailer. %% @todo Make that a record. +-type connect_info() :: {connect, reference(), gun:connect_destination()}. + +%% @todo Make that a record. -type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options -record(stream, { - ref :: reference() | websocket_info(), + ref :: reference() | connect_info() | websocket_info(), reply_to :: pid(), method :: binary(), is_alive :: boolean(), @@ -87,7 +91,7 @@ init(Owner, Socket, Transport, Opts) -> %% Stop looping when we got no more data. handle(<<>>, State) -> - State; + {state, State}; %% Close when server responds and we don't have any open streams. handle(_, #http_state{streams=[]}) -> close; @@ -95,33 +99,33 @@ handle(_, #http_state{streams=[]}) -> handle(Data, State=#http_state{in=head, buffer=Buffer}) -> Data2 = << Buffer/binary, Data/binary >>, case binary:match(Data2, <<"\r\n\r\n">>) of - nomatch -> State#http_state{buffer=Data2}; + nomatch -> {state, State#http_state{buffer=Data2}}; {_, _} -> handle_head(Data2, State#http_state{buffer= <<>>}) end; %% Everything sent to the socket until it closes is part of the response body. handle(Data, State=#http_state{in=body_close}) -> - send_data_if_alive(Data, State, nofin); + {state, send_data_if_alive(Data, State, nofin)}; %% Chunked transfer-encoding may contain both data and trailers. handle(Data, State=#http_state{in=body_chunked, in_state=InState, buffer=Buffer, connection=Conn}) -> Buffer2 = << Buffer/binary, Data/binary >>, case cow_http_te:stream_chunked(Buffer2, InState) of more -> - State#http_state{buffer=Buffer2}; + {state, State#http_state{buffer=Buffer2}}; {more, Data2, InState2} -> - send_data_if_alive(Data2, + {state, send_data_if_alive(Data2, State#http_state{buffer= <<>>, in_state=InState2}, - nofin); + nofin)}; {more, Data2, Length, InState2} when is_integer(Length) -> %% @todo See if we can recv faster than one message at a time. - send_data_if_alive(Data2, + {state, send_data_if_alive(Data2, State#http_state{buffer= <<>>, in_state=InState2}, - nofin); + nofin)}; {more, Data2, Rest, InState2} -> %% @todo See if we can recv faster than one message at a time. - send_data_if_alive(Data2, + {state, send_data_if_alive(Data2, State#http_state{buffer=Rest, in_state=InState2}, - nofin); + nofin)}; {done, HasTrailers, Rest} -> IsFin = case HasTrailers of trailers -> nofin; @@ -156,7 +160,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}) -> Data2 = << Buffer/binary, Data/binary >>, case binary:match(Data2, <<"\r\n\r\n">>) of - nomatch -> State#http_state{buffer=Data2}; + nomatch -> {state, State#http_state{buffer=Data2}}; {_, _} -> {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? @@ -174,14 +178,14 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> if %% More data coming. DataSize < Length -> - send_data_if_alive(Data, + {state, send_data_if_alive(Data, State#http_state{in={body, Length - DataSize}}, - nofin); + nofin)}; %% Stream finished, no rest. DataSize =:= Length -> State1 = send_data_if_alive(Data, State, fin), case Conn of - keepalive -> end_stream(State1); + keepalive -> {state, end_stream(State1)}; close -> close end; %% Stream finished, rest. @@ -194,7 +198,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> end end. -handle_head(Data, State=#http_state{version=ClientVersion, +handle_head(Data, State=#http_state{socket=Socket, version=ClientVersion, content_handlers=Handlers0, connection=Conn, streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo, method=Method, is_alive=IsAlive}|Tail]}) -> @@ -203,6 +207,44 @@ handle_head(Data, State=#http_state{version=ClientVersion, case {Status, StreamRef} of {101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} -> ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts); + {_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 -> + case IsAlive of + false -> + ok; + true -> + ReplyTo ! {gun_response, self(), RealStreamRef, + fin, Status, Headers}, + ok + end, + %% We expect there to be no additional data after the CONNECT response. + <<>> = Rest2, + State2 = end_stream(State#http_state{streams=[Stream|Tail]}), + NewHost = maps:get(host, Destination), + NewPort = maps:get(port, Destination), + DestProtocol = maps:get(protocol, Destination, http), + case Destination of + #{transport := tls} -> + TLSOpts = maps:get(tls_opts, Destination, []), + TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity), + case gun_tls:connect(Socket, TLSOpts, TLSTimeout) of + {ok, TLSSocket} when DestProtocol =:= http2 -> + [{switch_transport, gun_tls, TLSSocket}, + {switch_protocol, gun_http2, State2}, + {origin, <<"https">>, NewHost, NewPort}]; + {ok, TLSSocket} -> + [{state, State2#http_state{socket=TLSSocket, transport=gun_tls}}, + {switch_transport, gun_tls, TLSSocket}, + {origin, <<"https">>, NewHost, NewPort}]; + Error -> + Error + end; + _ when DestProtocol =:= http2 -> + [{switch_protocol, gun_http2, State2}, + {origin, <<"http">>, NewHost, NewPort}]; + _ -> + [{state, State2}, + {origin, <<"http">>, NewHost, NewPort}] + end; {_, _} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, handle(Rest2, State); @@ -211,7 +253,7 @@ handle_head(Data, State=#http_state{version=ClientVersion, IsFin = case In of head -> fin; _ -> nofin end, Handlers = case IsAlive of false -> - ok; + undefined; true -> ReplyTo ! {gun_response, self(), stream_ref(StreamRef), IsFin, Status, Headers}, @@ -243,6 +285,7 @@ handle_head(Data, State=#http_state{version=ClientVersion, end end. +stream_ref({connect, StreamRef, _}) -> StreamRef; stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef; stream_ref(StreamRef) -> StreamRef. @@ -372,6 +415,40 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, error_stream_not_found(State, StreamRef, ReplyTo) end. +connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, + "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, + State; +connect(State=#http_state{socket=Socket, transport=Transport, version=Version}, + StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0) -> + Host = case Host0 of + Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); + _ -> Host0 + end, + Port = maps:get(port, Destination, 1080), + Authority = [Host, $:, integer_to_binary(Port)], + Headers1 = lists:keydelete(<<"content-length">>, 1, + lists:keydelete(<<"transfer-encoding">>, 1, Headers0)), + Headers2 = case lists:keymember(<<"host">>, 1, Headers1) of + false -> [{<<"host">>, Authority}|Headers1]; + true -> Headers1 + end, + HasProxyAuthorization = lists:keymember(<<"proxy-authorization">>, 1, Headers2), + Headers3 = case {HasProxyAuthorization, Destination} of + {false, #{username := UserID, password := Password}} -> + [{<<"proxy-authorization">>, [ + <<"Basic ">>, + base64:encode(iolist_to_binary([UserID, $:, Password]))]} + |Headers2]; + _ -> + Headers2 + end, + Headers = transform_header_names(State, Headers3), + Transport:send(Socket, [ + cow_http:request(<<"CONNECT">>, Authority, Version, Headers) + ]), + new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>). + %% We can't cancel anything, we can just stop forwarding messages to the owner. cancel(State, StreamRef, ReplyTo) -> case is_stream(State, StreamRef) of @@ -384,6 +461,7 @@ cancel(State, StreamRef, ReplyTo) -> %% HTTP does not provide any way to figure out what streams are unprocessed. down(#http_state{streams=Streams}) -> KilledStreams = [case Ref of + {connect, Ref2, _} -> Ref2; {websocket, Ref2, _, _, _} -> Ref2; _ -> Ref end || #stream{ref=Ref} <- Streams], diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 89deea4..edbc7c0 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -124,7 +124,7 @@ parse(Data0, State0=#http2_state{buffer=Buffer}) -> Error = {connection_error, _, _} -> terminate(State0, Error); more -> - State0#http2_state{buffer=Data} + {state, State0#http2_state{buffer=Data}} end. %% DATA frame. diff --git a/src/gun_tls.erl b/src/gun_tls.erl index 6d749aa..f58620f 100644 --- a/src/gun_tls.erl +++ b/src/gun_tls.erl @@ -15,6 +15,7 @@ -module(gun_tls). -export([messages/0]). +-export([connect/3]). -export([connect/4]). -export([send/2]). -export([setopts/2]). @@ -23,6 +24,11 @@ messages() -> {ssl, ssl_closed, ssl_error}. +-spec connect(inet:socket(), any(), timeout()) + -> {ok, ssl:sslsocket()} | {error, atom()}. +connect(Socket, Opts, Timeout) -> + ssl:connect(Socket, Opts, Timeout). + -spec connect(inet:ip_address() | inet:hostname(), inet:port_number(), any(), timeout()) -> {ok, ssl:sslsocket()} | {error, atom()}. diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 5b6962b..b89840e 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -69,7 +69,7 @@ name() -> ws. init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) -> Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, HandlerState = Handler:init(Owner, StreamRef, Headers, Opts), - {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, + {switch_protocol, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, extensions=Extensions, handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. diff --git a/test/rfc7231_SUITE.erl b/test/rfc7231_SUITE.erl new file mode 100644 index 0000000..8d2749b --- /dev/null +++ b/test/rfc7231_SUITE.erl @@ -0,0 +1,386 @@ +%% Copyright (c) 2018, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(rfc7231_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-ifdef(OTP_RELEASE). +-compile({nowarn_deprecated_function, [{ssl, ssl_accept, 2}]}). +-endif. + +-import(ct_helper, [doc/1]). + +all() -> + ct_helper:all(?MODULE). + +%% Proxy helpers. + +do_proxy_start() -> + do_proxy_start(200, []). + +do_proxy_start(Status) -> + do_proxy_start(Status, []). + +do_proxy_start(Status, ConnectRespHeaders) -> + Self = self(), + Pid = spawn_link(fun() -> do_proxy_init(Self, Status, ConnectRespHeaders) end), + Port = do_receive(Pid), + {ok, Pid, Port}. + +do_proxy_init(Parent, Status, ConnectRespHeaders) -> + {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, {_, Port}} = inet:sockname(ListenSocket), + Parent ! {self(), Port}, + {ok, ClientSocket} = gen_tcp:accept(ListenSocket, 1000), + {ok, Data} = gen_tcp:recv(ClientSocket, 0, 1000), + {Method= <<"CONNECT">>, Authority, Version, Rest} = cow_http:parse_request_line(Data), + {Headers, <<>>} = cow_http:parse_headers(Rest), + Parent ! {self(), {request, Method, Authority, Version, Headers}}, + {OriginHost, OriginPort} = cow_http_hd:parse_host(Authority), + ok = gen_tcp:send(ClientSocket, [ + <<"HTTP/1.1 ">>, + integer_to_binary(Status), + <<" Reason phrase\r\n">>, + cow_http:headers(ConnectRespHeaders), + <<"\r\n">> + ]), + if + Status >= 200, Status < 300 -> + {ok, OriginSocket} = gen_tcp:connect( + binary_to_list(OriginHost), OriginPort, + [binary, {active, false}]), + inet:setopts(ClientSocket, [{active, true}]), + inet:setopts(OriginSocket, [{active, true}]), + do_proxy_loop(ClientSocket, OriginSocket); + true -> + %% We send a 501 to the subsequent request. + {ok, _} = gen_tcp:recv(ClientSocket, 0, 1000), + ok = gen_tcp:send(ClientSocket, << + "HTTP/1.1 501 Not Implemented\r\n" + "content-length: 0\r\n\r\n">>), + timer:sleep(2000) + end. + +do_proxy_loop(ClientSocket, OriginSocket) -> + receive + {tcp, ClientSocket, Data} -> + ok = gen_tcp:send(OriginSocket, Data), + do_proxy_loop(ClientSocket, OriginSocket); + {tcp, OriginSocket, Data} -> + ok = gen_tcp:send(ClientSocket, Data), + do_proxy_loop(ClientSocket, OriginSocket); + {tcp_closed, _} -> + ok; + Msg -> + error(Msg) + end. + +do_origin_start(Transport) -> + Self = self(), + Pid = spawn_link(fun() -> + case Transport of + tcp -> + do_origin_init_tcp(Self); + tls -> + do_origin_init_tls(Self) + end + end), + Port = do_receive(Pid), + {ok, Pid, Port}. + +do_origin_init_tcp(Parent) -> + {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]), + {ok, {_, Port}} = inet:sockname(ListenSocket), + Parent ! {self(), Port}, + {ok, ClientSocket} = gen_tcp:accept(ListenSocket, 1000), + do_origin_loop(Parent, ClientSocket, gen_tcp). + +do_origin_init_tls(Parent) -> + Opts = ct_helper:get_certs_from_ets(), + {ok, ListenSocket} = ssl:listen(0, [binary, {active, false}|Opts]), + {ok, {_, Port}} = ssl:sockname(ListenSocket), + Parent ! {self(), Port}, + {ok, ClientSocket} = ssl:transport_accept(ListenSocket, 1000), + ok = ssl:ssl_accept(ClientSocket, 1000), + do_origin_loop(Parent, ClientSocket, ssl). + +do_origin_loop(Parent, ClientSocket, ClientTransport) -> + case ClientTransport:recv(ClientSocket, 0, 1000) of + {ok, Data} -> + Parent ! {self(), Data}, + do_origin_loop(Parent, ClientSocket, ClientTransport); + {error, closed} -> + ok + end. + +do_receive(Pid) -> + receive + {Pid, Msg} -> + Msg + after 1000 -> + error(timeout) + end. + +%% Tests. + +connect_http(_) -> + doc("CONNECT can be used to establish a TCP connection " + "to an HTTP/1.1 server via an HTTP proxy. (RFC7231 4.3.6)"), + do_connect_http(tcp). + +connect_https(_) -> + doc("CONNECT can be used to establish a TLS connection " + "to an HTTP/1.1 server via an HTTP proxy. (RFC7231 4.3.6)"), + do_connect_http(tls). + +do_connect_http(Transport) -> + {ok, OriginPid, OriginPort} = do_origin_start(Transport), + {ok, ProxyPid, ProxyPort} = do_proxy_start(), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + transport => Transport + }), + {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid), + {response, fin, 200, _} = gun:await(ConnPid, StreamRef), + _ = gun:get(ConnPid, "/proxied"), + Len = byte_size(Authority), + <<"GET /proxied HTTP/1.1\r\nhost: ", Authority:Len/binary, "\r\n", _/bits>> + = do_receive(OriginPid), + gun:close(ConnPid). + +connect_h2c(_) -> + doc("CONNECT can be used to establish a TCP connection " + "to an HTTP/2 server via an HTTP proxy. (RFC7231 4.3.6)"), + do_connect_h2(tcp). + +connect_h2(_) -> + doc("CONNECT can be used to establish a TLS connection " + "to an HTTP/2 server via an HTTP proxy. (RFC7231 4.3.6)"), + do_connect_h2(tls). + +do_connect_h2(Transport) -> + {ok, OriginPid, OriginPort} = do_origin_start(Transport), + {ok, ProxyPid, ProxyPort} = do_proxy_start(), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + transport => Transport, + protocol => http2 + }), + {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid), + {response, fin, 200, _} = gun:await(ConnPid, StreamRef), + _ = gun:get(ConnPid, "/proxied"), + <<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", + Len:24, 4:8, 0:40, %% SETTINGS + Rest/bits>> = do_receive(OriginPid), + <<_:Len/binary>> = Rest, + <<_:24, 1:8, _/bits>> = do_receive(OriginPid), + gun:close(ConnPid). + +connect_through_multiple_proxies(_) -> + doc("CONNECT can be used to establish a TCP connection " + "to an HTTP/1.1 server via a tunnel going through " + "two separate HTTP proxies. (RFC7231 4.3.6)"), + {ok, OriginPid, OriginPort} = do_origin_start(tcp), + {ok, Proxy1Pid, Proxy1Port} = do_proxy_start(), + {ok, Proxy2Pid, Proxy2Port} = do_proxy_start(), + {ok, ConnPid} = gun:open("localhost", Proxy1Port), + {ok, http} = gun:await_up(ConnPid), + Authority1 = iolist_to_binary(["localhost:", integer_to_binary(Proxy2Port)]), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => Proxy2Port + }), + {request, <<"CONNECT">>, Authority1, 'HTTP/1.1', _} = do_receive(Proxy1Pid), + {response, fin, 200, _} = gun:await(ConnPid, StreamRef1), + Authority2 = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + StreamRef2 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort + }), + {request, <<"CONNECT">>, Authority2, 'HTTP/1.1', _} = do_receive(Proxy2Pid), + {response, fin, 200, _} = gun:await(ConnPid, StreamRef2), + _ = gun:get(ConnPid, "/proxied"), + Len = byte_size(Authority2), + <<"GET /proxied HTTP/1.1\r\nhost: ", Authority2:Len/binary, "\r\n", _/bits>> + = do_receive(OriginPid), + gun:close(ConnPid). + +connect_response_201(_) -> + doc("2xx responses to CONNECT requests indicate " + "the tunnel was set up successfully. (RFC7231 4.3.6)"), + {ok, OriginPid, OriginPort} = do_origin_start(tcp), + {ok, ProxyPid, ProxyPort} = do_proxy_start(201), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort + }), + {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid), + {response, fin, 201, _} = gun:await(ConnPid, StreamRef), + _ = gun:get(ConnPid, "/proxied"), + Len = byte_size(Authority), + <<"GET /proxied HTTP/1.1\r\nhost: ", Authority:Len/binary, "\r\n", _/bits>> + = do_receive(OriginPid), + gun:close(ConnPid). + +connect_response_302(_) -> + doc("3xx responses to CONNECT requests indicate " + "the tunnel was not set up. (RFC7231 4.3.6)"), + do_connect_failure(302). + +connect_response_403(_) -> + doc("4xx responses to CONNECT requests indicate " + "the tunnel was not set up. (RFC7231 4.3.6)"), + do_connect_failure(403). + +connect_response_500(_) -> + doc("5xx responses to CONNECT requests indicate " + "the tunnel was not set up. (RFC7231 4.3.6)"), + do_connect_failure(500). + +do_connect_failure(Status) -> + OriginPort = 33333, %% Doesn't matter because we won't try to connect. + Headers = [{<<"content-length">>, <<"0">>}], + {ok, ProxyPid, ProxyPort} = do_proxy_start(Status, Headers), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort + }), + {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid), + {response, fin, Status, Headers} = gun:await(ConnPid, StreamRef), + FailedStreamRef = gun:get(ConnPid, "/proxied"), + {response, fin, 501, _} = gun:await(ConnPid, FailedStreamRef), + gun:close(ConnPid). + +connect_authority_form(_) -> + doc("CONNECT requests must use the authority-form. (RFC7231 4.3.6)"), + {ok, _OriginPid, OriginPort} = do_origin_start(tcp), + {ok, ProxyPid, ProxyPort} = do_proxy_start(), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + _StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort + }), + {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid), + {<<"localhost">>, OriginPort} = cow_http_hd:parse_host(Authority), + gun:close(ConnPid). + +connect_proxy_authorization(_) -> + doc("CONNECT requests may include a proxy-authorization header. (RFC7231 4.3.6)"), + {ok, _OriginPid, OriginPort} = do_origin_start(tcp), + {ok, ProxyPid, ProxyPort} = do_proxy_start(), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + _StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + username => "essen", + password => "myrealpasswordis" + }), + {request, <<"CONNECT">>, Authority, 'HTTP/1.1', Headers} = do_receive(ProxyPid), + {_, ProxyAuthorization} = lists:keyfind(<<"proxy-authorization">>, 1, Headers), + {basic, <<"essen">>, <<"myrealpasswordis">>} + = cow_http_hd:parse_proxy_authorization(ProxyAuthorization), + gun:close(ConnPid). + +connect_request_no_transfer_encoding(_) -> + doc("The payload for CONNECT requests has no defined semantics. " + "The transfer-encoding header should not be sent. (RFC7231 4.3.6)"), + {ok, _OriginPid, OriginPort} = do_origin_start(tcp), + {ok, ProxyPid, ProxyPort} = do_proxy_start(), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + _StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort + }), + {request, <<"CONNECT">>, Authority, 'HTTP/1.1', Headers} = do_receive(ProxyPid), + false = lists:keyfind(<<"transfer-encoding">>, 1, Headers), + gun:close(ConnPid). + +connect_request_no_content_length(_) -> + doc("The payload for CONNECT requests has no defined semantics. " + "The content-length header should not be sent. (RFC7231 4.3.6)"), + {ok, _OriginPid, OriginPort} = do_origin_start(tcp), + {ok, ProxyPid, ProxyPort} = do_proxy_start(), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + _StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort + }), + {request, <<"CONNECT">>, Authority, 'HTTP/1.1', Headers} = do_receive(ProxyPid), + false = lists:keyfind(<<"content-length">>, 1, Headers), + gun:close(ConnPid). + +connect_response_ignore_transfer_encoding(_) -> + doc("Clients must ignore transfer-encoding headers in responses " + "to CONNECT requests. (RFC7231 4.3.6)"), + {ok, OriginPid, OriginPort} = do_origin_start(tcp), + Headers = [{<<"transfer-encoding">>, <<"chunked">>}], + {ok, ProxyPid, ProxyPort} = do_proxy_start(200, Headers), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort + }), + {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid), + {response, fin, 200, Headers} = gun:await(ConnPid, StreamRef), + _ = gun:get(ConnPid, "/proxied"), + Len = byte_size(Authority), + <<"GET /proxied HTTP/1.1\r\nhost: ", Authority:Len/binary, "\r\n", _/bits>> + = do_receive(OriginPid), + gun:close(ConnPid). + +connect_response_ignore_content_length(_) -> + doc("Clients must ignore content-length headers in responses " + "to CONNECT requests. (RFC7231 4.3.6)"), + {ok, OriginPid, OriginPort} = do_origin_start(tcp), + Headers = [{<<"content-length">>, <<"1000">>}], + {ok, ProxyPid, ProxyPort} = do_proxy_start(200, Headers), + Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort + }), + {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid), + {response, fin, 200, Headers} = gun:await(ConnPid, StreamRef), + _ = gun:get(ConnPid, "/proxied"), + Len = byte_size(Authority), + <<"GET /proxied HTTP/1.1\r\nhost: ", Authority:Len/binary, "\r\n", _/bits>> + = do_receive(OriginPid), + gun:close(ConnPid). |