From 8eedc18067d6c2919972ff41a5bccc6d3d72b0ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 17 Sep 2018 11:08:29 +0200 Subject: Add HTTP/1.1 CONNECT support Gun can now be used to connect through TCP HTTP/1.1 proxies using all supported protocols. It is also possible to create a tunnel through multiple proxies. Also updates Cowlib to 2.6.0. --- src/gun_http.erl | 112 ++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 95 insertions(+), 17 deletions(-) (limited to 'src/gun_http.erl') diff --git a/src/gun_http.erl b/src/gun_http.erl index d07502e..ee9d04f 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -23,17 +23,21 @@ -export([request/8]). -export([request/9]). -export([data/5]). +-export([connect/5]). -export([cancel/3]). -export([down/1]). -export([ws_upgrade/7]). -type io() :: head | {body, non_neg_integer()} | body_close | body_chunked | body_trailer. +%% @todo Make that a record. +-type connect_info() :: {connect, reference(), gun:connect_destination()}. + %% @todo Make that a record. -type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options -record(stream, { - ref :: reference() | websocket_info(), + ref :: reference() | connect_info() | websocket_info(), reply_to :: pid(), method :: binary(), is_alive :: boolean(), @@ -87,7 +91,7 @@ init(Owner, Socket, Transport, Opts) -> %% Stop looping when we got no more data. handle(<<>>, State) -> - State; + {state, State}; %% Close when server responds and we don't have any open streams. handle(_, #http_state{streams=[]}) -> close; @@ -95,33 +99,33 @@ handle(_, #http_state{streams=[]}) -> handle(Data, State=#http_state{in=head, buffer=Buffer}) -> Data2 = << Buffer/binary, Data/binary >>, case binary:match(Data2, <<"\r\n\r\n">>) of - nomatch -> State#http_state{buffer=Data2}; + nomatch -> {state, State#http_state{buffer=Data2}}; {_, _} -> handle_head(Data2, State#http_state{buffer= <<>>}) end; %% Everything sent to the socket until it closes is part of the response body. handle(Data, State=#http_state{in=body_close}) -> - send_data_if_alive(Data, State, nofin); + {state, send_data_if_alive(Data, State, nofin)}; %% Chunked transfer-encoding may contain both data and trailers. handle(Data, State=#http_state{in=body_chunked, in_state=InState, buffer=Buffer, connection=Conn}) -> Buffer2 = << Buffer/binary, Data/binary >>, case cow_http_te:stream_chunked(Buffer2, InState) of more -> - State#http_state{buffer=Buffer2}; + {state, State#http_state{buffer=Buffer2}}; {more, Data2, InState2} -> - send_data_if_alive(Data2, + {state, send_data_if_alive(Data2, State#http_state{buffer= <<>>, in_state=InState2}, - nofin); + nofin)}; {more, Data2, Length, InState2} when is_integer(Length) -> %% @todo See if we can recv faster than one message at a time. - send_data_if_alive(Data2, + {state, send_data_if_alive(Data2, State#http_state{buffer= <<>>, in_state=InState2}, - nofin); + nofin)}; {more, Data2, Rest, InState2} -> %% @todo See if we can recv faster than one message at a time. - send_data_if_alive(Data2, + {state, send_data_if_alive(Data2, State#http_state{buffer=Rest, in_state=InState2}, - nofin); + nofin)}; {done, HasTrailers, Rest} -> IsFin = case HasTrailers of trailers -> nofin; @@ -156,7 +160,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}) -> Data2 = << Buffer/binary, Data/binary >>, case binary:match(Data2, <<"\r\n\r\n">>) of - nomatch -> State#http_state{buffer=Data2}; + nomatch -> {state, State#http_state{buffer=Data2}}; {_, _} -> {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? @@ -174,14 +178,14 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> if %% More data coming. DataSize < Length -> - send_data_if_alive(Data, + {state, send_data_if_alive(Data, State#http_state{in={body, Length - DataSize}}, - nofin); + nofin)}; %% Stream finished, no rest. DataSize =:= Length -> State1 = send_data_if_alive(Data, State, fin), case Conn of - keepalive -> end_stream(State1); + keepalive -> {state, end_stream(State1)}; close -> close end; %% Stream finished, rest. @@ -194,7 +198,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> end end. -handle_head(Data, State=#http_state{version=ClientVersion, +handle_head(Data, State=#http_state{socket=Socket, version=ClientVersion, content_handlers=Handlers0, connection=Conn, streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo, method=Method, is_alive=IsAlive}|Tail]}) -> @@ -203,6 +207,44 @@ handle_head(Data, State=#http_state{version=ClientVersion, case {Status, StreamRef} of {101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} -> ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts); + {_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 -> + case IsAlive of + false -> + ok; + true -> + ReplyTo ! {gun_response, self(), RealStreamRef, + fin, Status, Headers}, + ok + end, + %% We expect there to be no additional data after the CONNECT response. + <<>> = Rest2, + State2 = end_stream(State#http_state{streams=[Stream|Tail]}), + NewHost = maps:get(host, Destination), + NewPort = maps:get(port, Destination), + DestProtocol = maps:get(protocol, Destination, http), + case Destination of + #{transport := tls} -> + TLSOpts = maps:get(tls_opts, Destination, []), + TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity), + case gun_tls:connect(Socket, TLSOpts, TLSTimeout) of + {ok, TLSSocket} when DestProtocol =:= http2 -> + [{switch_transport, gun_tls, TLSSocket}, + {switch_protocol, gun_http2, State2}, + {origin, <<"https">>, NewHost, NewPort}]; + {ok, TLSSocket} -> + [{state, State2#http_state{socket=TLSSocket, transport=gun_tls}}, + {switch_transport, gun_tls, TLSSocket}, + {origin, <<"https">>, NewHost, NewPort}]; + Error -> + Error + end; + _ when DestProtocol =:= http2 -> + [{switch_protocol, gun_http2, State2}, + {origin, <<"http">>, NewHost, NewPort}]; + _ -> + [{state, State2}, + {origin, <<"http">>, NewHost, NewPort}] + end; {_, _} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, handle(Rest2, State); @@ -211,7 +253,7 @@ handle_head(Data, State=#http_state{version=ClientVersion, IsFin = case In of head -> fin; _ -> nofin end, Handlers = case IsAlive of false -> - ok; + undefined; true -> ReplyTo ! {gun_response, self(), stream_ref(StreamRef), IsFin, Status, Headers}, @@ -243,6 +285,7 @@ handle_head(Data, State=#http_state{version=ClientVersion, end end. +stream_ref({connect, StreamRef, _}) -> StreamRef; stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef; stream_ref(StreamRef) -> StreamRef. @@ -372,6 +415,40 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, error_stream_not_found(State, StreamRef, ReplyTo) end. +connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, + "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, + State; +connect(State=#http_state{socket=Socket, transport=Transport, version=Version}, + StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0) -> + Host = case Host0 of + Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); + _ -> Host0 + end, + Port = maps:get(port, Destination, 1080), + Authority = [Host, $:, integer_to_binary(Port)], + Headers1 = lists:keydelete(<<"content-length">>, 1, + lists:keydelete(<<"transfer-encoding">>, 1, Headers0)), + Headers2 = case lists:keymember(<<"host">>, 1, Headers1) of + false -> [{<<"host">>, Authority}|Headers1]; + true -> Headers1 + end, + HasProxyAuthorization = lists:keymember(<<"proxy-authorization">>, 1, Headers2), + Headers3 = case {HasProxyAuthorization, Destination} of + {false, #{username := UserID, password := Password}} -> + [{<<"proxy-authorization">>, [ + <<"Basic ">>, + base64:encode(iolist_to_binary([UserID, $:, Password]))]} + |Headers2]; + _ -> + Headers2 + end, + Headers = transform_header_names(State, Headers3), + Transport:send(Socket, [ + cow_http:request(<<"CONNECT">>, Authority, Version, Headers) + ]), + new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>). + %% We can't cancel anything, we can just stop forwarding messages to the owner. cancel(State, StreamRef, ReplyTo) -> case is_stream(State, StreamRef) of @@ -384,6 +461,7 @@ cancel(State, StreamRef, ReplyTo) -> %% HTTP does not provide any way to figure out what streams are unprocessed. down(#http_state{streams=Streams}) -> KilledStreams = [case Ref of + {connect, Ref2, _} -> Ref2; {websocket, Ref2, _, _, _} -> Ref2; _ -> Ref end || #stream{ref=Ref} <- Streams], -- cgit v1.2.3