aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2018-09-17 11:08:29 +0200
committerLoïc Hoguin <[email protected]>2018-09-17 11:46:12 +0200
commit8eedc18067d6c2919972ff41a5bccc6d3d72b0ac (patch)
tree99943a49f16763edf9b0ad3363d8ab5bb646e8d6 /src
parent0dd581dbf801306f27f00bc73f04da003cd65039 (diff)
downloadgun-8eedc18067d6c2919972ff41a5bccc6d3d72b0ac.tar.gz
gun-8eedc18067d6c2919972ff41a5bccc6d3d72b0ac.tar.bz2
gun-8eedc18067d6c2919972ff41a5bccc6d3d72b0ac.zip
Add HTTP/1.1 CONNECT support
Gun can now be used to connect through TCP HTTP/1.1 proxies using all supported protocols. It is also possible to create a tunnel through multiple proxies. Also updates Cowlib to 2.6.0.
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl100
-rw-r--r--src/gun_http.erl112
-rw-r--r--src/gun_http2.erl2
-rw-r--r--src/gun_tls.erl6
-rw-r--r--src/gun_ws.erl2
5 files changed, 188 insertions, 34 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 115f603..af576fb 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -55,6 +55,11 @@
%% Streaming data.
-export([data/4]).
+%% Tunneling.
+-export([connect/2]).
+-export([connect/3]).
+-export([connect/4]).
+
%% Awaiting gun messages.
-export([await/2]).
-export([await/3]).
@@ -110,6 +115,22 @@
-export_type([opts/0]).
%% @todo Add an option to disable/enable the notowner behavior.
+-type connect_destination() :: #{
+ host := inet:hostname() | inet:ip_address(),
+ port := inet:port_number(),
+ username => iodata(),
+ password => iodata(),
+ protocol => http | http2,
+ transport => tcp | tls,
+ tls_opts => [ssl:connect_option()],
+ tls_handshake_timeout => timeout()
+}.
+-export_type([connect_destination/0]).
+
+%% @todo When/if HTTP/2 CONNECT gets implemented, we will want an option here
+%% to indicate that the request must be sent on an existing CONNECT stream.
+%% This is of course not required for HTTP/1.1 since the CONNECT takes over
+%% the entire connection.
-type req_opts() :: #{
reply_to => pid()
}.
@@ -137,8 +158,10 @@
parent :: pid(),
owner :: pid(),
owner_ref :: reference(),
- host :: inet:hostname(),
+ host :: inet:hostname() | inet:ip_address(),
port :: inet:port_number(),
+ origin_host :: inet:hostname() | inet:ip_address(),
+ origin_port :: inet:port_number(),
opts :: opts(),
keepalive_ref :: undefined | reference(),
socket :: undefined | inet:socket() | ssl:sslsocket(),
@@ -358,6 +381,7 @@ request(ServerPid, Method, Path, Headers) ->
request(ServerPid, Method, Path, Headers, Body) ->
request(ServerPid, Method, Path, Headers, Body, #{}).
+%% @todo Accept header names as maps.
-spec request(pid(), iodata(), iodata(), headers(), iodata(), req_opts()) -> reference().
request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
StreamRef = make_ref(),
@@ -372,6 +396,23 @@ data(ServerPid, StreamRef, IsFin, Data) ->
_ = ServerPid ! {data, self(), StreamRef, IsFin, Data},
ok.
+%% Tunneling.
+
+-spec connect(pid(), connect_destination()) -> reference().
+connect(ServerPid, Destination) ->
+ connect(ServerPid, Destination, [], #{}).
+
+-spec connect(pid(), connect_destination(), headers()) -> reference().
+connect(ServerPid, Destination, Headers) ->
+ connect(ServerPid, Destination, Headers, #{}).
+
+-spec connect(pid(), connect_destination(), headers(), req_opts()) -> reference().
+connect(ServerPid, Destination, Headers, ReqOpts) ->
+ StreamRef = make_ref(),
+ ReplyTo = maps:get(reply_to, ReqOpts, self()),
+ _ = ServerPid ! {connect, ReplyTo, StreamRef, Destination, Headers},
+ StreamRef.
+
%% Awaiting gun messages.
%% @todo spec await await_body
@@ -565,6 +606,8 @@ ws_upgrade(ServerPid, Path, Headers, Opts) ->
_ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts},
StreamRef.
+%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef.
+%% But it can be kept for the time being since it can still work for HTTP/1.1.
-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
_ = ServerPid ! {ws_send, self(), Frames},
@@ -601,13 +644,14 @@ init(Parent, Owner, Host, Port, Opts) ->
tls -> gun_tls
end,
OwnerRef = monitor(process, Owner),
- connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
- host=Host, port=Port, opts=Opts, transport=Transport}, Retry).
+ transport_connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
+ host=Host, port=Port, origin_host=Host, origin_port=Port,
+ opts=Opts, transport=Transport}, Retry).
default_transport(443) -> tls;
default_transport(_) -> tcp.
-connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) ->
+transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) ->
Protocols = [case P of
http -> <<"http/1.1">>;
http2 -> <<"h2">>
@@ -626,7 +670,7 @@ connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tl
{error, Reason} ->
retry(State#state{last_error=Reason}, Retries)
end;
-connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
+transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
TransportOpts = [binary, {active, false}
|maps:get(transport_opts, Opts, [])],
case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of
@@ -670,7 +714,7 @@ retry_loop(State=#state{parent=Parent, opts=Opts}, Retries) ->
_ = erlang:send_after(maps:get(retry_timeout, Opts, 5000), self(), retry),
receive
retry ->
- connect(State, Retries);
+ transport_connect(State, Retries);
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
{retry_loop, State, Retries})
@@ -690,20 +734,18 @@ before_loop(State=#state{opts=Opts, protocol=Protocol}) ->
end,
loop(State#state{keepalive_ref=KeepaliveRef}).
-loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, port=Port, opts=Opts,
- socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
+loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
+ origin_host=Host, origin_port=Port, opts=Opts, socket=Socket,
+ transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
{OK, Closed, Error} = Transport:messages(),
Transport:setopts(Socket, [{active, once}]),
receive
{OK, Socket, Data} ->
case Protocol:handle(Data, ProtoState) of
- close ->
- Transport:close(Socket),
- down(State, normal);
- {upgrade, Protocol2, ProtoState2} ->
- ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2});
- ProtoState2 ->
- loop(State#state{protocol_state=ProtoState2})
+ Commands when is_list(Commands) ->
+ commands(Commands, State);
+ Command ->
+ commands([Command], State)
end;
{Closed, Socket} ->
Protocol:close(ProtoState),
@@ -736,6 +778,9 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por
ProtoState2 = Protocol:data(ProtoState,
StreamRef, ReplyTo, IsFin, Data),
loop(State#state{protocol_state=ProtoState2});
+ {connect, ReplyTo, StreamRef, Destination, Headers} ->
+ ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers),
+ loop(State#state{protocol_state=ProtoState2});
{cancel, ReplyTo, StreamRef} ->
ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo),
loop(State#state{protocol_state=ProtoState2});
@@ -786,6 +831,31 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por
loop(State)
end.
+commands([], State) ->
+ loop(State);
+commands([close|_], State=#state{socket=Socket, transport=Transport}) ->
+ Transport:close(Socket),
+ down(State, normal);
+commands([Error={error, _}|_], State=#state{socket=Socket, transport=Transport}) ->
+ Transport:close(Socket),
+ down(State, Error);
+commands([{state, ProtoState}|Tail], State) ->
+ commands(Tail, State#state{protocol_state=ProtoState});
+%% @todo The scheme should probably not be ignored.
+commands([{origin, _Scheme, Host, Port}|Tail], State) ->
+ commands(Tail, State#state{origin_host=Host, origin_port=Port});
+commands([{switch_transport, Transport, Socket}|Tail], State) ->
+ commands(Tail, State#state{socket=Socket, transport=Transport});
+%% @todo The two loops should be reunified and this clause generalized.
+commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) ->
+ ws_loop(State#state{protocol=Protocol, protocol_state=ProtoState});
+%% @todo And this state should probably not be ignored.
+commands([{switch_protocol, Protocol, _ProtoState0}|Tail],
+ State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) ->
+ ProtoOpts = maps:get(http2_opts, Opts, #{}),
+ ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
+ commands(Tail, State#state{protocol=Protocol, protocol_state=ProtoState}).
+
ws_loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, socket=Socket,
transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
{OK, Closed, Error} = Transport:messages(),
diff --git a/src/gun_http.erl b/src/gun_http.erl
index d07502e..ee9d04f 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -23,6 +23,7 @@
-export([request/8]).
-export([request/9]).
-export([data/5]).
+-export([connect/5]).
-export([cancel/3]).
-export([down/1]).
-export([ws_upgrade/7]).
@@ -30,10 +31,13 @@
-type io() :: head | {body, non_neg_integer()} | body_close | body_chunked | body_trailer.
%% @todo Make that a record.
+-type connect_info() :: {connect, reference(), gun:connect_destination()}.
+
+%% @todo Make that a record.
-type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options
-record(stream, {
- ref :: reference() | websocket_info(),
+ ref :: reference() | connect_info() | websocket_info(),
reply_to :: pid(),
method :: binary(),
is_alive :: boolean(),
@@ -87,7 +91,7 @@ init(Owner, Socket, Transport, Opts) ->
%% Stop looping when we got no more data.
handle(<<>>, State) ->
- State;
+ {state, State};
%% Close when server responds and we don't have any open streams.
handle(_, #http_state{streams=[]}) ->
close;
@@ -95,33 +99,33 @@ handle(_, #http_state{streams=[]}) ->
handle(Data, State=#http_state{in=head, buffer=Buffer}) ->
Data2 = << Buffer/binary, Data/binary >>,
case binary:match(Data2, <<"\r\n\r\n">>) of
- nomatch -> State#http_state{buffer=Data2};
+ nomatch -> {state, State#http_state{buffer=Data2}};
{_, _} -> handle_head(Data2, State#http_state{buffer= <<>>})
end;
%% Everything sent to the socket until it closes is part of the response body.
handle(Data, State=#http_state{in=body_close}) ->
- send_data_if_alive(Data, State, nofin);
+ {state, send_data_if_alive(Data, State, nofin)};
%% Chunked transfer-encoding may contain both data and trailers.
handle(Data, State=#http_state{in=body_chunked, in_state=InState,
buffer=Buffer, connection=Conn}) ->
Buffer2 = << Buffer/binary, Data/binary >>,
case cow_http_te:stream_chunked(Buffer2, InState) of
more ->
- State#http_state{buffer=Buffer2};
+ {state, State#http_state{buffer=Buffer2}};
{more, Data2, InState2} ->
- send_data_if_alive(Data2,
+ {state, send_data_if_alive(Data2,
State#http_state{buffer= <<>>, in_state=InState2},
- nofin);
+ nofin)};
{more, Data2, Length, InState2} when is_integer(Length) ->
%% @todo See if we can recv faster than one message at a time.
- send_data_if_alive(Data2,
+ {state, send_data_if_alive(Data2,
State#http_state{buffer= <<>>, in_state=InState2},
- nofin);
+ nofin)};
{more, Data2, Rest, InState2} ->
%% @todo See if we can recv faster than one message at a time.
- send_data_if_alive(Data2,
+ {state, send_data_if_alive(Data2,
State#http_state{buffer=Rest, in_state=InState2},
- nofin);
+ nofin)};
{done, HasTrailers, Rest} ->
IsFin = case HasTrailers of
trailers -> nofin;
@@ -156,7 +160,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn,
streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}) ->
Data2 = << Buffer/binary, Data/binary >>,
case binary:match(Data2, <<"\r\n\r\n">>) of
- nomatch -> State#http_state{buffer=Data2};
+ nomatch -> {state, State#http_state{buffer=Data2}};
{_, _} ->
{Trailers, Rest} = cow_http:parse_headers(Data2),
%% @todo We probably want to pass this to gun_content_handler?
@@ -174,14 +178,14 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
if
%% More data coming.
DataSize < Length ->
- send_data_if_alive(Data,
+ {state, send_data_if_alive(Data,
State#http_state{in={body, Length - DataSize}},
- nofin);
+ nofin)};
%% Stream finished, no rest.
DataSize =:= Length ->
State1 = send_data_if_alive(Data, State, fin),
case Conn of
- keepalive -> end_stream(State1);
+ keepalive -> {state, end_stream(State1)};
close -> close
end;
%% Stream finished, rest.
@@ -194,7 +198,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
end
end.
-handle_head(Data, State=#http_state{version=ClientVersion,
+handle_head(Data, State=#http_state{socket=Socket, version=ClientVersion,
content_handlers=Handlers0, connection=Conn,
streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo,
method=Method, is_alive=IsAlive}|Tail]}) ->
@@ -203,6 +207,44 @@ handle_head(Data, State=#http_state{version=ClientVersion,
case {Status, StreamRef} of
{101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} ->
ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts);
+ {_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 ->
+ case IsAlive of
+ false ->
+ ok;
+ true ->
+ ReplyTo ! {gun_response, self(), RealStreamRef,
+ fin, Status, Headers},
+ ok
+ end,
+ %% We expect there to be no additional data after the CONNECT response.
+ <<>> = Rest2,
+ State2 = end_stream(State#http_state{streams=[Stream|Tail]}),
+ NewHost = maps:get(host, Destination),
+ NewPort = maps:get(port, Destination),
+ DestProtocol = maps:get(protocol, Destination, http),
+ case Destination of
+ #{transport := tls} ->
+ TLSOpts = maps:get(tls_opts, Destination, []),
+ TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity),
+ case gun_tls:connect(Socket, TLSOpts, TLSTimeout) of
+ {ok, TLSSocket} when DestProtocol =:= http2 ->
+ [{switch_transport, gun_tls, TLSSocket},
+ {switch_protocol, gun_http2, State2},
+ {origin, <<"https">>, NewHost, NewPort}];
+ {ok, TLSSocket} ->
+ [{state, State2#http_state{socket=TLSSocket, transport=gun_tls}},
+ {switch_transport, gun_tls, TLSSocket},
+ {origin, <<"https">>, NewHost, NewPort}];
+ Error ->
+ Error
+ end;
+ _ when DestProtocol =:= http2 ->
+ [{switch_protocol, gun_http2, State2},
+ {origin, <<"http">>, NewHost, NewPort}];
+ _ ->
+ [{state, State2},
+ {origin, <<"http">>, NewHost, NewPort}]
+ end;
{_, _} when Status >= 100, Status =< 199 ->
ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
handle(Rest2, State);
@@ -211,7 +253,7 @@ handle_head(Data, State=#http_state{version=ClientVersion,
IsFin = case In of head -> fin; _ -> nofin end,
Handlers = case IsAlive of
false ->
- ok;
+ undefined;
true ->
ReplyTo ! {gun_response, self(), stream_ref(StreamRef),
IsFin, Status, Headers},
@@ -243,6 +285,7 @@ handle_head(Data, State=#http_state{version=ClientVersion,
end
end.
+stream_ref({connect, StreamRef, _}) -> StreamRef;
stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef;
stream_ref(StreamRef) -> StreamRef.
@@ -372,6 +415,40 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
error_stream_not_found(State, StreamRef, ReplyTo)
end.
+connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ "CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
+ State;
+connect(State=#http_state{socket=Socket, transport=Transport, version=Version},
+ StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0) ->
+ Host = case Host0 of
+ Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
+ _ -> Host0
+ end,
+ Port = maps:get(port, Destination, 1080),
+ Authority = [Host, $:, integer_to_binary(Port)],
+ Headers1 = lists:keydelete(<<"content-length">>, 1,
+ lists:keydelete(<<"transfer-encoding">>, 1, Headers0)),
+ Headers2 = case lists:keymember(<<"host">>, 1, Headers1) of
+ false -> [{<<"host">>, Authority}|Headers1];
+ true -> Headers1
+ end,
+ HasProxyAuthorization = lists:keymember(<<"proxy-authorization">>, 1, Headers2),
+ Headers3 = case {HasProxyAuthorization, Destination} of
+ {false, #{username := UserID, password := Password}} ->
+ [{<<"proxy-authorization">>, [
+ <<"Basic ">>,
+ base64:encode(iolist_to_binary([UserID, $:, Password]))]}
+ |Headers2];
+ _ ->
+ Headers2
+ end,
+ Headers = transform_header_names(State, Headers3),
+ Transport:send(Socket, [
+ cow_http:request(<<"CONNECT">>, Authority, Version, Headers)
+ ]),
+ new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>).
+
%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State, StreamRef, ReplyTo) ->
case is_stream(State, StreamRef) of
@@ -384,6 +461,7 @@ cancel(State, StreamRef, ReplyTo) ->
%% HTTP does not provide any way to figure out what streams are unprocessed.
down(#http_state{streams=Streams}) ->
KilledStreams = [case Ref of
+ {connect, Ref2, _} -> Ref2;
{websocket, Ref2, _, _, _} -> Ref2;
_ -> Ref
end || #stream{ref=Ref} <- Streams],
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 89deea4..edbc7c0 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -124,7 +124,7 @@ parse(Data0, State0=#http2_state{buffer=Buffer}) ->
Error = {connection_error, _, _} ->
terminate(State0, Error);
more ->
- State0#http2_state{buffer=Data}
+ {state, State0#http2_state{buffer=Data}}
end.
%% DATA frame.
diff --git a/src/gun_tls.erl b/src/gun_tls.erl
index 6d749aa..f58620f 100644
--- a/src/gun_tls.erl
+++ b/src/gun_tls.erl
@@ -15,6 +15,7 @@
-module(gun_tls).
-export([messages/0]).
+-export([connect/3]).
-export([connect/4]).
-export([send/2]).
-export([setopts/2]).
@@ -23,6 +24,11 @@
messages() -> {ssl, ssl_closed, ssl_error}.
+-spec connect(inet:socket(), any(), timeout())
+ -> {ok, ssl:sslsocket()} | {error, atom()}.
+connect(Socket, Opts, Timeout) ->
+ ssl:connect(Socket, Opts, Timeout).
+
-spec connect(inet:ip_address() | inet:hostname(),
inet:port_number(), any(), timeout())
-> {ok, ssl:sslsocket()} | {error, atom()}.
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 5b6962b..b89840e 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -69,7 +69,7 @@ name() -> ws.
init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) ->
Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
HandlerState = Handler:init(Owner, StreamRef, Headers, Opts),
- {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport,
+ {switch_protocol, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport,
extensions=Extensions, handler=Handler, handler_state=HandlerState}}.
%% Do not handle anything if we received a close frame.