aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2018-09-17 11:08:29 +0200
committerLoïc Hoguin <[email protected]>2018-09-17 11:46:12 +0200
commit8eedc18067d6c2919972ff41a5bccc6d3d72b0ac (patch)
tree99943a49f16763edf9b0ad3363d8ab5bb646e8d6
parent0dd581dbf801306f27f00bc73f04da003cd65039 (diff)
downloadgun-8eedc18067d6c2919972ff41a5bccc6d3d72b0ac.tar.gz
gun-8eedc18067d6c2919972ff41a5bccc6d3d72b0ac.tar.bz2
gun-8eedc18067d6c2919972ff41a5bccc6d3d72b0ac.zip
Add HTTP/1.1 CONNECT support
Gun can now be used to connect through TCP HTTP/1.1 proxies using all supported protocols. It is also possible to create a tunnel through multiple proxies. Also updates Cowlib to 2.6.0.
-rw-r--r--Makefile2
-rw-r--r--rebar.config2
-rw-r--r--src/gun.erl100
-rw-r--r--src/gun_http.erl112
-rw-r--r--src/gun_http2.erl2
-rw-r--r--src/gun_tls.erl6
-rw-r--r--src/gun_ws.erl2
-rw-r--r--test/rfc7231_SUITE.erl386
8 files changed, 576 insertions, 36 deletions
diff --git a/Makefile b/Makefile
index 157dce3..2aa1bde 100644
--- a/Makefile
+++ b/Makefile
@@ -13,7 +13,7 @@ CT_OPTS += -pa test -ct_hooks gun_ct_hook [] # -boot start_sasl
LOCAL_DEPS = ssl
DEPS = cowlib
-dep_cowlib = git https://github.com/ninenines/cowlib 2.5.1
+dep_cowlib = git https://github.com/ninenines/cowlib 2.6.0
DOC_DEPS = asciideck
diff --git a/rebar.config b/rebar.config
index 4614141..347417f 100644
--- a/rebar.config
+++ b/rebar.config
@@ -1,4 +1,4 @@
{deps, [
-{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.5.1"}}
+{cowlib,".*",{git,"https://github.com/ninenines/cowlib","2.6.0"}}
]}.
{erl_opts, [debug_info,warn_export_vars,warn_shadow_vars,warn_obsolete_guard]}.
diff --git a/src/gun.erl b/src/gun.erl
index 115f603..af576fb 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -55,6 +55,11 @@
%% Streaming data.
-export([data/4]).
+%% Tunneling.
+-export([connect/2]).
+-export([connect/3]).
+-export([connect/4]).
+
%% Awaiting gun messages.
-export([await/2]).
-export([await/3]).
@@ -110,6 +115,22 @@
-export_type([opts/0]).
%% @todo Add an option to disable/enable the notowner behavior.
+-type connect_destination() :: #{
+ host := inet:hostname() | inet:ip_address(),
+ port := inet:port_number(),
+ username => iodata(),
+ password => iodata(),
+ protocol => http | http2,
+ transport => tcp | tls,
+ tls_opts => [ssl:connect_option()],
+ tls_handshake_timeout => timeout()
+}.
+-export_type([connect_destination/0]).
+
+%% @todo When/if HTTP/2 CONNECT gets implemented, we will want an option here
+%% to indicate that the request must be sent on an existing CONNECT stream.
+%% This is of course not required for HTTP/1.1 since the CONNECT takes over
+%% the entire connection.
-type req_opts() :: #{
reply_to => pid()
}.
@@ -137,8 +158,10 @@
parent :: pid(),
owner :: pid(),
owner_ref :: reference(),
- host :: inet:hostname(),
+ host :: inet:hostname() | inet:ip_address(),
port :: inet:port_number(),
+ origin_host :: inet:hostname() | inet:ip_address(),
+ origin_port :: inet:port_number(),
opts :: opts(),
keepalive_ref :: undefined | reference(),
socket :: undefined | inet:socket() | ssl:sslsocket(),
@@ -358,6 +381,7 @@ request(ServerPid, Method, Path, Headers) ->
request(ServerPid, Method, Path, Headers, Body) ->
request(ServerPid, Method, Path, Headers, Body, #{}).
+%% @todo Accept header names as maps.
-spec request(pid(), iodata(), iodata(), headers(), iodata(), req_opts()) -> reference().
request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
StreamRef = make_ref(),
@@ -372,6 +396,23 @@ data(ServerPid, StreamRef, IsFin, Data) ->
_ = ServerPid ! {data, self(), StreamRef, IsFin, Data},
ok.
+%% Tunneling.
+
+-spec connect(pid(), connect_destination()) -> reference().
+connect(ServerPid, Destination) ->
+ connect(ServerPid, Destination, [], #{}).
+
+-spec connect(pid(), connect_destination(), headers()) -> reference().
+connect(ServerPid, Destination, Headers) ->
+ connect(ServerPid, Destination, Headers, #{}).
+
+-spec connect(pid(), connect_destination(), headers(), req_opts()) -> reference().
+connect(ServerPid, Destination, Headers, ReqOpts) ->
+ StreamRef = make_ref(),
+ ReplyTo = maps:get(reply_to, ReqOpts, self()),
+ _ = ServerPid ! {connect, ReplyTo, StreamRef, Destination, Headers},
+ StreamRef.
+
%% Awaiting gun messages.
%% @todo spec await await_body
@@ -565,6 +606,8 @@ ws_upgrade(ServerPid, Path, Headers, Opts) ->
_ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts},
StreamRef.
+%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef.
+%% But it can be kept for the time being since it can still work for HTTP/1.1.
-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
_ = ServerPid ! {ws_send, self(), Frames},
@@ -601,13 +644,14 @@ init(Parent, Owner, Host, Port, Opts) ->
tls -> gun_tls
end,
OwnerRef = monitor(process, Owner),
- connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
- host=Host, port=Port, opts=Opts, transport=Transport}, Retry).
+ transport_connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
+ host=Host, port=Port, origin_host=Host, origin_port=Port,
+ opts=Opts, transport=Transport}, Retry).
default_transport(443) -> tls;
default_transport(_) -> tcp.
-connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) ->
+transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) ->
Protocols = [case P of
http -> <<"http/1.1">>;
http2 -> <<"h2">>
@@ -626,7 +670,7 @@ connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tl
{error, Reason} ->
retry(State#state{last_error=Reason}, Retries)
end;
-connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
+transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
TransportOpts = [binary, {active, false}
|maps:get(transport_opts, Opts, [])],
case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of
@@ -670,7 +714,7 @@ retry_loop(State=#state{parent=Parent, opts=Opts}, Retries) ->
_ = erlang:send_after(maps:get(retry_timeout, Opts, 5000), self(), retry),
receive
retry ->
- connect(State, Retries);
+ transport_connect(State, Retries);
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
{retry_loop, State, Retries})
@@ -690,20 +734,18 @@ before_loop(State=#state{opts=Opts, protocol=Protocol}) ->
end,
loop(State#state{keepalive_ref=KeepaliveRef}).
-loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, port=Port, opts=Opts,
- socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
+loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
+ origin_host=Host, origin_port=Port, opts=Opts, socket=Socket,
+ transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
{OK, Closed, Error} = Transport:messages(),
Transport:setopts(Socket, [{active, once}]),
receive
{OK, Socket, Data} ->
case Protocol:handle(Data, ProtoState) of
- close ->
- Transport:close(Socket),
- down(State, normal);
- {upgrade, Protocol2, ProtoState2} ->
- ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2});
- ProtoState2 ->
- loop(State#state{protocol_state=ProtoState2})
+ Commands when is_list(Commands) ->
+ commands(Commands, State);
+ Command ->
+ commands([Command], State)
end;
{Closed, Socket} ->
Protocol:close(ProtoState),
@@ -736,6 +778,9 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por
ProtoState2 = Protocol:data(ProtoState,
StreamRef, ReplyTo, IsFin, Data),
loop(State#state{protocol_state=ProtoState2});
+ {connect, ReplyTo, StreamRef, Destination, Headers} ->
+ ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers),
+ loop(State#state{protocol_state=ProtoState2});
{cancel, ReplyTo, StreamRef} ->
ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo),
loop(State#state{protocol_state=ProtoState2});
@@ -786,6 +831,31 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por
loop(State)
end.
+commands([], State) ->
+ loop(State);
+commands([close|_], State=#state{socket=Socket, transport=Transport}) ->
+ Transport:close(Socket),
+ down(State, normal);
+commands([Error={error, _}|_], State=#state{socket=Socket, transport=Transport}) ->
+ Transport:close(Socket),
+ down(State, Error);
+commands([{state, ProtoState}|Tail], State) ->
+ commands(Tail, State#state{protocol_state=ProtoState});
+%% @todo The scheme should probably not be ignored.
+commands([{origin, _Scheme, Host, Port}|Tail], State) ->
+ commands(Tail, State#state{origin_host=Host, origin_port=Port});
+commands([{switch_transport, Transport, Socket}|Tail], State) ->
+ commands(Tail, State#state{socket=Socket, transport=Transport});
+%% @todo The two loops should be reunified and this clause generalized.
+commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) ->
+ ws_loop(State#state{protocol=Protocol, protocol_state=ProtoState});
+%% @todo And this state should probably not be ignored.
+commands([{switch_protocol, Protocol, _ProtoState0}|Tail],
+ State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) ->
+ ProtoOpts = maps:get(http2_opts, Opts, #{}),
+ ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
+ commands(Tail, State#state{protocol=Protocol, protocol_state=ProtoState}).
+
ws_loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, socket=Socket,
transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
{OK, Closed, Error} = Transport:messages(),
diff --git a/src/gun_http.erl b/src/gun_http.erl
index d07502e..ee9d04f 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -23,6 +23,7 @@
-export([request/8]).
-export([request/9]).
-export([data/5]).
+-export([connect/5]).
-export([cancel/3]).
-export([down/1]).
-export([ws_upgrade/7]).
@@ -30,10 +31,13 @@
-type io() :: head | {body, non_neg_integer()} | body_close | body_chunked | body_trailer.
%% @todo Make that a record.
+-type connect_info() :: {connect, reference(), gun:connect_destination()}.
+
+%% @todo Make that a record.
-type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options
-record(stream, {
- ref :: reference() | websocket_info(),
+ ref :: reference() | connect_info() | websocket_info(),
reply_to :: pid(),
method :: binary(),
is_alive :: boolean(),
@@ -87,7 +91,7 @@ init(Owner, Socket, Transport, Opts) ->
%% Stop looping when we got no more data.
handle(<<>>, State) ->
- State;
+ {state, State};
%% Close when server responds and we don't have any open streams.
handle(_, #http_state{streams=[]}) ->
close;
@@ -95,33 +99,33 @@ handle(_, #http_state{streams=[]}) ->
handle(Data, State=#http_state{in=head, buffer=Buffer}) ->
Data2 = << Buffer/binary, Data/binary >>,
case binary:match(Data2, <<"\r\n\r\n">>) of
- nomatch -> State#http_state{buffer=Data2};
+ nomatch -> {state, State#http_state{buffer=Data2}};
{_, _} -> handle_head(Data2, State#http_state{buffer= <<>>})
end;
%% Everything sent to the socket until it closes is part of the response body.
handle(Data, State=#http_state{in=body_close}) ->
- send_data_if_alive(Data, State, nofin);
+ {state, send_data_if_alive(Data, State, nofin)};
%% Chunked transfer-encoding may contain both data and trailers.
handle(Data, State=#http_state{in=body_chunked, in_state=InState,
buffer=Buffer, connection=Conn}) ->
Buffer2 = << Buffer/binary, Data/binary >>,
case cow_http_te:stream_chunked(Buffer2, InState) of
more ->
- State#http_state{buffer=Buffer2};
+ {state, State#http_state{buffer=Buffer2}};
{more, Data2, InState2} ->
- send_data_if_alive(Data2,
+ {state, send_data_if_alive(Data2,
State#http_state{buffer= <<>>, in_state=InState2},
- nofin);
+ nofin)};
{more, Data2, Length, InState2} when is_integer(Length) ->
%% @todo See if we can recv faster than one message at a time.
- send_data_if_alive(Data2,
+ {state, send_data_if_alive(Data2,
State#http_state{buffer= <<>>, in_state=InState2},
- nofin);
+ nofin)};
{more, Data2, Rest, InState2} ->
%% @todo See if we can recv faster than one message at a time.
- send_data_if_alive(Data2,
+ {state, send_data_if_alive(Data2,
State#http_state{buffer=Rest, in_state=InState2},
- nofin);
+ nofin)};
{done, HasTrailers, Rest} ->
IsFin = case HasTrailers of
trailers -> nofin;
@@ -156,7 +160,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn,
streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}) ->
Data2 = << Buffer/binary, Data/binary >>,
case binary:match(Data2, <<"\r\n\r\n">>) of
- nomatch -> State#http_state{buffer=Data2};
+ nomatch -> {state, State#http_state{buffer=Data2}};
{_, _} ->
{Trailers, Rest} = cow_http:parse_headers(Data2),
%% @todo We probably want to pass this to gun_content_handler?
@@ -174,14 +178,14 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
if
%% More data coming.
DataSize < Length ->
- send_data_if_alive(Data,
+ {state, send_data_if_alive(Data,
State#http_state{in={body, Length - DataSize}},
- nofin);
+ nofin)};
%% Stream finished, no rest.
DataSize =:= Length ->
State1 = send_data_if_alive(Data, State, fin),
case Conn of
- keepalive -> end_stream(State1);
+ keepalive -> {state, end_stream(State1)};
close -> close
end;
%% Stream finished, rest.
@@ -194,7 +198,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
end
end.
-handle_head(Data, State=#http_state{version=ClientVersion,
+handle_head(Data, State=#http_state{socket=Socket, version=ClientVersion,
content_handlers=Handlers0, connection=Conn,
streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo,
method=Method, is_alive=IsAlive}|Tail]}) ->
@@ -203,6 +207,44 @@ handle_head(Data, State=#http_state{version=ClientVersion,
case {Status, StreamRef} of
{101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} ->
ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts);
+ {_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 ->
+ case IsAlive of
+ false ->
+ ok;
+ true ->
+ ReplyTo ! {gun_response, self(), RealStreamRef,
+ fin, Status, Headers},
+ ok
+ end,
+ %% We expect there to be no additional data after the CONNECT response.
+ <<>> = Rest2,
+ State2 = end_stream(State#http_state{streams=[Stream|Tail]}),
+ NewHost = maps:get(host, Destination),
+ NewPort = maps:get(port, Destination),
+ DestProtocol = maps:get(protocol, Destination, http),
+ case Destination of
+ #{transport := tls} ->
+ TLSOpts = maps:get(tls_opts, Destination, []),
+ TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity),
+ case gun_tls:connect(Socket, TLSOpts, TLSTimeout) of
+ {ok, TLSSocket} when DestProtocol =:= http2 ->
+ [{switch_transport, gun_tls, TLSSocket},
+ {switch_protocol, gun_http2, State2},
+ {origin, <<"https">>, NewHost, NewPort}];
+ {ok, TLSSocket} ->
+ [{state, State2#http_state{socket=TLSSocket, transport=gun_tls}},
+ {switch_transport, gun_tls, TLSSocket},
+ {origin, <<"https">>, NewHost, NewPort}];
+ Error ->
+ Error
+ end;
+ _ when DestProtocol =:= http2 ->
+ [{switch_protocol, gun_http2, State2},
+ {origin, <<"http">>, NewHost, NewPort}];
+ _ ->
+ [{state, State2},
+ {origin, <<"http">>, NewHost, NewPort}]
+ end;
{_, _} when Status >= 100, Status =< 199 ->
ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
handle(Rest2, State);
@@ -211,7 +253,7 @@ handle_head(Data, State=#http_state{version=ClientVersion,
IsFin = case In of head -> fin; _ -> nofin end,
Handlers = case IsAlive of
false ->
- ok;
+ undefined;
true ->
ReplyTo ! {gun_response, self(), stream_ref(StreamRef),
IsFin, Status, Headers},
@@ -243,6 +285,7 @@ handle_head(Data, State=#http_state{version=ClientVersion,
end
end.
+stream_ref({connect, StreamRef, _}) -> StreamRef;
stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef;
stream_ref(StreamRef) -> StreamRef.
@@ -372,6 +415,40 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
error_stream_not_found(State, StreamRef, ReplyTo)
end.
+connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ "CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
+ State;
+connect(State=#http_state{socket=Socket, transport=Transport, version=Version},
+ StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0) ->
+ Host = case Host0 of
+ Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
+ _ -> Host0
+ end,
+ Port = maps:get(port, Destination, 1080),
+ Authority = [Host, $:, integer_to_binary(Port)],
+ Headers1 = lists:keydelete(<<"content-length">>, 1,
+ lists:keydelete(<<"transfer-encoding">>, 1, Headers0)),
+ Headers2 = case lists:keymember(<<"host">>, 1, Headers1) of
+ false -> [{<<"host">>, Authority}|Headers1];
+ true -> Headers1
+ end,
+ HasProxyAuthorization = lists:keymember(<<"proxy-authorization">>, 1, Headers2),
+ Headers3 = case {HasProxyAuthorization, Destination} of
+ {false, #{username := UserID, password := Password}} ->
+ [{<<"proxy-authorization">>, [
+ <<"Basic ">>,
+ base64:encode(iolist_to_binary([UserID, $:, Password]))]}
+ |Headers2];
+ _ ->
+ Headers2
+ end,
+ Headers = transform_header_names(State, Headers3),
+ Transport:send(Socket, [
+ cow_http:request(<<"CONNECT">>, Authority, Version, Headers)
+ ]),
+ new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>).
+
%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State, StreamRef, ReplyTo) ->
case is_stream(State, StreamRef) of
@@ -384,6 +461,7 @@ cancel(State, StreamRef, ReplyTo) ->
%% HTTP does not provide any way to figure out what streams are unprocessed.
down(#http_state{streams=Streams}) ->
KilledStreams = [case Ref of
+ {connect, Ref2, _} -> Ref2;
{websocket, Ref2, _, _, _} -> Ref2;
_ -> Ref
end || #stream{ref=Ref} <- Streams],
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 89deea4..edbc7c0 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -124,7 +124,7 @@ parse(Data0, State0=#http2_state{buffer=Buffer}) ->
Error = {connection_error, _, _} ->
terminate(State0, Error);
more ->
- State0#http2_state{buffer=Data}
+ {state, State0#http2_state{buffer=Data}}
end.
%% DATA frame.
diff --git a/src/gun_tls.erl b/src/gun_tls.erl
index 6d749aa..f58620f 100644
--- a/src/gun_tls.erl
+++ b/src/gun_tls.erl
@@ -15,6 +15,7 @@
-module(gun_tls).
-export([messages/0]).
+-export([connect/3]).
-export([connect/4]).
-export([send/2]).
-export([setopts/2]).
@@ -23,6 +24,11 @@
messages() -> {ssl, ssl_closed, ssl_error}.
+-spec connect(inet:socket(), any(), timeout())
+ -> {ok, ssl:sslsocket()} | {error, atom()}.
+connect(Socket, Opts, Timeout) ->
+ ssl:connect(Socket, Opts, Timeout).
+
-spec connect(inet:ip_address() | inet:hostname(),
inet:port_number(), any(), timeout())
-> {ok, ssl:sslsocket()} | {error, atom()}.
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 5b6962b..b89840e 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -69,7 +69,7 @@ name() -> ws.
init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) ->
Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
HandlerState = Handler:init(Owner, StreamRef, Headers, Opts),
- {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport,
+ {switch_protocol, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport,
extensions=Extensions, handler=Handler, handler_state=HandlerState}}.
%% Do not handle anything if we received a close frame.
diff --git a/test/rfc7231_SUITE.erl b/test/rfc7231_SUITE.erl
new file mode 100644
index 0000000..8d2749b
--- /dev/null
+++ b/test/rfc7231_SUITE.erl
@@ -0,0 +1,386 @@
+%% Copyright (c) 2018, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(rfc7231_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-ifdef(OTP_RELEASE).
+-compile({nowarn_deprecated_function, [{ssl, ssl_accept, 2}]}).
+-endif.
+
+-import(ct_helper, [doc/1]).
+
+all() ->
+ ct_helper:all(?MODULE).
+
+%% Proxy helpers.
+
+do_proxy_start() ->
+ do_proxy_start(200, []).
+
+do_proxy_start(Status) ->
+ do_proxy_start(Status, []).
+
+do_proxy_start(Status, ConnectRespHeaders) ->
+ Self = self(),
+ Pid = spawn_link(fun() -> do_proxy_init(Self, Status, ConnectRespHeaders) end),
+ Port = do_receive(Pid),
+ {ok, Pid, Port}.
+
+do_proxy_init(Parent, Status, ConnectRespHeaders) ->
+ {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
+ {ok, {_, Port}} = inet:sockname(ListenSocket),
+ Parent ! {self(), Port},
+ {ok, ClientSocket} = gen_tcp:accept(ListenSocket, 1000),
+ {ok, Data} = gen_tcp:recv(ClientSocket, 0, 1000),
+ {Method= <<"CONNECT">>, Authority, Version, Rest} = cow_http:parse_request_line(Data),
+ {Headers, <<>>} = cow_http:parse_headers(Rest),
+ Parent ! {self(), {request, Method, Authority, Version, Headers}},
+ {OriginHost, OriginPort} = cow_http_hd:parse_host(Authority),
+ ok = gen_tcp:send(ClientSocket, [
+ <<"HTTP/1.1 ">>,
+ integer_to_binary(Status),
+ <<" Reason phrase\r\n">>,
+ cow_http:headers(ConnectRespHeaders),
+ <<"\r\n">>
+ ]),
+ if
+ Status >= 200, Status < 300 ->
+ {ok, OriginSocket} = gen_tcp:connect(
+ binary_to_list(OriginHost), OriginPort,
+ [binary, {active, false}]),
+ inet:setopts(ClientSocket, [{active, true}]),
+ inet:setopts(OriginSocket, [{active, true}]),
+ do_proxy_loop(ClientSocket, OriginSocket);
+ true ->
+ %% We send a 501 to the subsequent request.
+ {ok, _} = gen_tcp:recv(ClientSocket, 0, 1000),
+ ok = gen_tcp:send(ClientSocket, <<
+ "HTTP/1.1 501 Not Implemented\r\n"
+ "content-length: 0\r\n\r\n">>),
+ timer:sleep(2000)
+ end.
+
+do_proxy_loop(ClientSocket, OriginSocket) ->
+ receive
+ {tcp, ClientSocket, Data} ->
+ ok = gen_tcp:send(OriginSocket, Data),
+ do_proxy_loop(ClientSocket, OriginSocket);
+ {tcp, OriginSocket, Data} ->
+ ok = gen_tcp:send(ClientSocket, Data),
+ do_proxy_loop(ClientSocket, OriginSocket);
+ {tcp_closed, _} ->
+ ok;
+ Msg ->
+ error(Msg)
+ end.
+
+do_origin_start(Transport) ->
+ Self = self(),
+ Pid = spawn_link(fun() ->
+ case Transport of
+ tcp ->
+ do_origin_init_tcp(Self);
+ tls ->
+ do_origin_init_tls(Self)
+ end
+ end),
+ Port = do_receive(Pid),
+ {ok, Pid, Port}.
+
+do_origin_init_tcp(Parent) ->
+ {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]),
+ {ok, {_, Port}} = inet:sockname(ListenSocket),
+ Parent ! {self(), Port},
+ {ok, ClientSocket} = gen_tcp:accept(ListenSocket, 1000),
+ do_origin_loop(Parent, ClientSocket, gen_tcp).
+
+do_origin_init_tls(Parent) ->
+ Opts = ct_helper:get_certs_from_ets(),
+ {ok, ListenSocket} = ssl:listen(0, [binary, {active, false}|Opts]),
+ {ok, {_, Port}} = ssl:sockname(ListenSocket),
+ Parent ! {self(), Port},
+ {ok, ClientSocket} = ssl:transport_accept(ListenSocket, 1000),
+ ok = ssl:ssl_accept(ClientSocket, 1000),
+ do_origin_loop(Parent, ClientSocket, ssl).
+
+do_origin_loop(Parent, ClientSocket, ClientTransport) ->
+ case ClientTransport:recv(ClientSocket, 0, 1000) of
+ {ok, Data} ->
+ Parent ! {self(), Data},
+ do_origin_loop(Parent, ClientSocket, ClientTransport);
+ {error, closed} ->
+ ok
+ end.
+
+do_receive(Pid) ->
+ receive
+ {Pid, Msg} ->
+ Msg
+ after 1000 ->
+ error(timeout)
+ end.
+
+%% Tests.
+
+connect_http(_) ->
+ doc("CONNECT can be used to establish a TCP connection "
+ "to an HTTP/1.1 server via an HTTP proxy. (RFC7231 4.3.6)"),
+ do_connect_http(tcp).
+
+connect_https(_) ->
+ doc("CONNECT can be used to establish a TLS connection "
+ "to an HTTP/1.1 server via an HTTP proxy. (RFC7231 4.3.6)"),
+ do_connect_http(tls).
+
+do_connect_http(Transport) ->
+ {ok, OriginPid, OriginPort} = do_origin_start(Transport),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ transport => Transport
+ }),
+ {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef),
+ _ = gun:get(ConnPid, "/proxied"),
+ Len = byte_size(Authority),
+ <<"GET /proxied HTTP/1.1\r\nhost: ", Authority:Len/binary, "\r\n", _/bits>>
+ = do_receive(OriginPid),
+ gun:close(ConnPid).
+
+connect_h2c(_) ->
+ doc("CONNECT can be used to establish a TCP connection "
+ "to an HTTP/2 server via an HTTP proxy. (RFC7231 4.3.6)"),
+ do_connect_h2(tcp).
+
+connect_h2(_) ->
+ doc("CONNECT can be used to establish a TLS connection "
+ "to an HTTP/2 server via an HTTP proxy. (RFC7231 4.3.6)"),
+ do_connect_h2(tls).
+
+do_connect_h2(Transport) ->
+ {ok, OriginPid, OriginPort} = do_origin_start(Transport),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ transport => Transport,
+ protocol => http2
+ }),
+ {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef),
+ _ = gun:get(ConnPid, "/proxied"),
+ <<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n",
+ Len:24, 4:8, 0:40, %% SETTINGS
+ Rest/bits>> = do_receive(OriginPid),
+ <<_:Len/binary>> = Rest,
+ <<_:24, 1:8, _/bits>> = do_receive(OriginPid),
+ gun:close(ConnPid).
+
+connect_through_multiple_proxies(_) ->
+ doc("CONNECT can be used to establish a TCP connection "
+ "to an HTTP/1.1 server via a tunnel going through "
+ "two separate HTTP proxies. (RFC7231 4.3.6)"),
+ {ok, OriginPid, OriginPort} = do_origin_start(tcp),
+ {ok, Proxy1Pid, Proxy1Port} = do_proxy_start(),
+ {ok, Proxy2Pid, Proxy2Port} = do_proxy_start(),
+ {ok, ConnPid} = gun:open("localhost", Proxy1Port),
+ {ok, http} = gun:await_up(ConnPid),
+ Authority1 = iolist_to_binary(["localhost:", integer_to_binary(Proxy2Port)]),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => Proxy2Port
+ }),
+ {request, <<"CONNECT">>, Authority1, 'HTTP/1.1', _} = do_receive(Proxy1Pid),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef1),
+ Authority2 = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ StreamRef2 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort
+ }),
+ {request, <<"CONNECT">>, Authority2, 'HTTP/1.1', _} = do_receive(Proxy2Pid),
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef2),
+ _ = gun:get(ConnPid, "/proxied"),
+ Len = byte_size(Authority2),
+ <<"GET /proxied HTTP/1.1\r\nhost: ", Authority2:Len/binary, "\r\n", _/bits>>
+ = do_receive(OriginPid),
+ gun:close(ConnPid).
+
+connect_response_201(_) ->
+ doc("2xx responses to CONNECT requests indicate "
+ "the tunnel was set up successfully. (RFC7231 4.3.6)"),
+ {ok, OriginPid, OriginPort} = do_origin_start(tcp),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(201),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort
+ }),
+ {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid),
+ {response, fin, 201, _} = gun:await(ConnPid, StreamRef),
+ _ = gun:get(ConnPid, "/proxied"),
+ Len = byte_size(Authority),
+ <<"GET /proxied HTTP/1.1\r\nhost: ", Authority:Len/binary, "\r\n", _/bits>>
+ = do_receive(OriginPid),
+ gun:close(ConnPid).
+
+connect_response_302(_) ->
+ doc("3xx responses to CONNECT requests indicate "
+ "the tunnel was not set up. (RFC7231 4.3.6)"),
+ do_connect_failure(302).
+
+connect_response_403(_) ->
+ doc("4xx responses to CONNECT requests indicate "
+ "the tunnel was not set up. (RFC7231 4.3.6)"),
+ do_connect_failure(403).
+
+connect_response_500(_) ->
+ doc("5xx responses to CONNECT requests indicate "
+ "the tunnel was not set up. (RFC7231 4.3.6)"),
+ do_connect_failure(500).
+
+do_connect_failure(Status) ->
+ OriginPort = 33333, %% Doesn't matter because we won't try to connect.
+ Headers = [{<<"content-length">>, <<"0">>}],
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Status, Headers),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort
+ }),
+ {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid),
+ {response, fin, Status, Headers} = gun:await(ConnPid, StreamRef),
+ FailedStreamRef = gun:get(ConnPid, "/proxied"),
+ {response, fin, 501, _} = gun:await(ConnPid, FailedStreamRef),
+ gun:close(ConnPid).
+
+connect_authority_form(_) ->
+ doc("CONNECT requests must use the authority-form. (RFC7231 4.3.6)"),
+ {ok, _OriginPid, OriginPort} = do_origin_start(tcp),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ _StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort
+ }),
+ {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid),
+ {<<"localhost">>, OriginPort} = cow_http_hd:parse_host(Authority),
+ gun:close(ConnPid).
+
+connect_proxy_authorization(_) ->
+ doc("CONNECT requests may include a proxy-authorization header. (RFC7231 4.3.6)"),
+ {ok, _OriginPid, OriginPort} = do_origin_start(tcp),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ _StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ username => "essen",
+ password => "myrealpasswordis"
+ }),
+ {request, <<"CONNECT">>, Authority, 'HTTP/1.1', Headers} = do_receive(ProxyPid),
+ {_, ProxyAuthorization} = lists:keyfind(<<"proxy-authorization">>, 1, Headers),
+ {basic, <<"essen">>, <<"myrealpasswordis">>}
+ = cow_http_hd:parse_proxy_authorization(ProxyAuthorization),
+ gun:close(ConnPid).
+
+connect_request_no_transfer_encoding(_) ->
+ doc("The payload for CONNECT requests has no defined semantics. "
+ "The transfer-encoding header should not be sent. (RFC7231 4.3.6)"),
+ {ok, _OriginPid, OriginPort} = do_origin_start(tcp),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ _StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort
+ }),
+ {request, <<"CONNECT">>, Authority, 'HTTP/1.1', Headers} = do_receive(ProxyPid),
+ false = lists:keyfind(<<"transfer-encoding">>, 1, Headers),
+ gun:close(ConnPid).
+
+connect_request_no_content_length(_) ->
+ doc("The payload for CONNECT requests has no defined semantics. "
+ "The content-length header should not be sent. (RFC7231 4.3.6)"),
+ {ok, _OriginPid, OriginPort} = do_origin_start(tcp),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ _StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort
+ }),
+ {request, <<"CONNECT">>, Authority, 'HTTP/1.1', Headers} = do_receive(ProxyPid),
+ false = lists:keyfind(<<"content-length">>, 1, Headers),
+ gun:close(ConnPid).
+
+connect_response_ignore_transfer_encoding(_) ->
+ doc("Clients must ignore transfer-encoding headers in responses "
+ "to CONNECT requests. (RFC7231 4.3.6)"),
+ {ok, OriginPid, OriginPort} = do_origin_start(tcp),
+ Headers = [{<<"transfer-encoding">>, <<"chunked">>}],
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(200, Headers),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort
+ }),
+ {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid),
+ {response, fin, 200, Headers} = gun:await(ConnPid, StreamRef),
+ _ = gun:get(ConnPid, "/proxied"),
+ Len = byte_size(Authority),
+ <<"GET /proxied HTTP/1.1\r\nhost: ", Authority:Len/binary, "\r\n", _/bits>>
+ = do_receive(OriginPid),
+ gun:close(ConnPid).
+
+connect_response_ignore_content_length(_) ->
+ doc("Clients must ignore content-length headers in responses "
+ "to CONNECT requests. (RFC7231 4.3.6)"),
+ {ok, OriginPid, OriginPort} = do_origin_start(tcp),
+ Headers = [{<<"content-length">>, <<"1000">>}],
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(200, Headers),
+ Authority = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort
+ }),
+ {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = do_receive(ProxyPid),
+ {response, fin, 200, Headers} = gun:await(ConnPid, StreamRef),
+ _ = gun:get(ConnPid, "/proxied"),
+ Len = byte_size(Authority),
+ <<"GET /proxied HTTP/1.1\r\nhost: ", Authority:Len/binary, "\r\n", _/bits>>
+ = do_receive(OriginPid),
+ gun:close(ConnPid).