From d056e5fb2a1fbb54e108c5c61384573acf21b4cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 24 Aug 2020 17:06:23 +0200 Subject: Replace gun_tunnel_up/3 message with /4 variant Also fixes all the tests. Lots of work remain around protocols (how best to pass the base stream_ref to them? maybe the current solution, maybe a new argument to Protocol:init) and strengthen the concept of stream_ref, at least with its own type. --- src/gun.erl | 37 ++++++++++++++++++++++------------- src/gun_http.erl | 53 +++++++++++++++++++++++++++++++++++++++++++------- src/gun_http2.erl | 5 +++-- src/gun_raw.erl | 2 +- src/gun_socks.erl | 25 +++++++++++++++++++----- test/raw_SUITE.erl | 13 +++++++------ test/rfc7231_SUITE.erl | 11 ++++++++++- test/socks_SUITE.erl | 11 ++++++----- 8 files changed, 117 insertions(+), 40 deletions(-) diff --git a/src/gun.erl b/src/gun.erl index 2f92b95..38c479e 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -668,10 +668,10 @@ connect(ServerPid, Destination, Headers) -> -spec connect(pid(), connect_destination(), req_headers(), req_opts()) -> reference(). connect(ServerPid, Destination, Headers, ReqOpts) -> - StreamRef = make_ref(), + Tunnel = get_tunnel(ReqOpts), + StreamRef = make_stream_ref(Tunnel), InitialFlow = maps:get(flow, ReqOpts, infinity), ReplyTo = maps:get(reply_to, ReqOpts, self()), - %% @todo tunnel gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}), StreamRef. @@ -808,8 +808,6 @@ await_up(ServerPid, Timeout, MRef) -> receive {gun_up, ServerPid, Protocol} -> {ok, Protocol}; - {gun_tunnel_up, ServerPid, Protocol} -> - {ok, Protocol}; {'DOWN', MRef, process, ServerPid, Reason} -> {error, {down, Reason}} after Timeout -> @@ -1092,8 +1090,17 @@ ensure_alpn_sni(Protocols0, TransOpts0, OriginHost) -> %% Normal TLS handshake. tls_handshake(internal, {tls_handshake, HandshakeEvent, Protocols, ReplyTo}, State0=#state{socket=Socket, transport=gun_tcp}) -> + StreamRef = maps:get(stream_ref, HandshakeEvent, undefined), case normal_tls_handshake(Socket, State0, HandshakeEvent, Protocols) of - {ok, TLSSocket, NewProtocol, State} -> + {ok, TLSSocket, NewProtocol0, State} -> + NewProtocol = {Protocol0, _} = case {StreamRef, NewProtocol0} of + {undefined, {_, _}} -> NewProtocol0; + {undefined, P} -> {P, #{}}; + {_, {P, POpts}} -> {P, POpts#{stream_ref => StreamRef}}; + {_, P} -> {P, #{stream_ref => StreamRef}} + end, + Protocol = gun:protocol_handler(Protocol0), + ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()}, commands([ {switch_transport, gun_tls, TLSSocket}, {switch_protocol, NewProtocol, ReplyTo} @@ -1120,10 +1127,19 @@ tls_handshake(internal, {tls_handshake, %% the handshake succeeded and whether we need to switch to a different protocol. tls_handshake(info, {gun_tls_proxy, Socket, {ok, Negotiated}, {HandshakeEvent, Protocols, ReplyTo}}, State0=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - NewProtocol = protocol_negotiated(Negotiated, Protocols), + NewProtocol0 = protocol_negotiated(Negotiated, Protocols), + StreamRef = maps:get(stream_ref, HandshakeEvent, undefined), + NewProtocol = {Protocol0, _} = case {StreamRef, NewProtocol0} of + {undefined, {_, _}} -> NewProtocol0; + {undefined, P} -> {P, #{}}; + {_, {P, POpts}} -> {P, POpts#{stream_ref => StreamRef}}; + {_, P} -> {P, #{stream_ref => StreamRef}} + end, + Protocol = gun:protocol_handler(Protocol0), + ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()}, EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ socket => Socket, - protocol => NewProtocol + protocol => Protocol:name() }, EvHandlerState0), commands([{switch_protocol, NewProtocol, ReplyTo}], State0#state{event_handler_state=EvHandlerState}); tls_handshake(info, {gun_tls_proxy, Socket, Error = {error, Reason}, {HandshakeEvent, _, _}}, @@ -1578,7 +1594,7 @@ commands([{switch_transport, Transport, Socket}|Tail], State=#state{ messages=Transport:messages(), protocol_state=ProtoState, event_handler_state=EvHandlerState})); commands([{switch_protocol, Protocol0, ReplyTo}], State0=#state{ - opts=Opts, socket=Socket, transport=Transport, protocol=CurrentProtocol, + opts=Opts, socket=Socket, transport=Transport, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {Protocol, ProtoOpts} = case Protocol0 of {P, PO} -> {protocol_handler(P), PO}; @@ -1586,11 +1602,6 @@ commands([{switch_protocol, Protocol0, ReplyTo}], State0=#state{ Protocol1 = protocol_handler(P), {Protocol1, maps:get(Protocol1:opts_name(), Opts, #{})} end, - %% When we switch_protocol from socks we must send a gun_tunnel_up message. - _ = case CurrentProtocol of - gun_socks -> ReplyTo ! {gun_tunnel_up, self(), Protocol:name()}; - _ -> ok - end, {StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts), EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), %% We cancel the existing keepalive and, depending on the protocol, diff --git a/src/gun_http.erl b/src/gun_http.erl index bced64d..950bda1 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -313,14 +313,15 @@ handle_head(Data, State0=#http_state{streams=[#stream{ref=StreamRef, authority=A handle_connect(Rest, State=#http_state{ streams=[Stream=#stream{ref={_, StreamRef, Destination}, reply_to=ReplyTo}|Tail]}, EvHandler, EvHandlerState0, 'HTTP/1.1', Status, Headers) -> + RealStreamRef = stream_ref(State, StreamRef), %% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup. _ = case Stream of #stream{is_alive=false} -> ok; - _ -> ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), fin, Status, Headers} + _ -> ReplyTo ! {gun_response, self(), RealStreamRef, fin, Status, Headers} end, %% @todo Figure out whether the event should trigger if the stream was cancelled. EvHandlerState1 = EvHandler:response_headers(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers @@ -334,23 +335,27 @@ handle_connect(Rest, State=#http_state{ case Destination of #{transport := tls} -> HandshakeEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, tls_opts => maps:get(tls_opts, Destination, []), timeout => maps:get(tls_handshake_timeout, Destination, infinity) }, Protocols = maps:get(protocols, Destination, [http2, http]), -%% @todo gun_tunnel_up when the protocol switch is complete {handle_ret([ {origin, <<"https">>, NewHost, NewPort, connect}, {tls_handshake, HandshakeEvent, Protocols, ReplyTo} ], State), EvHandlerState1}; _ -> - [Protocol] = maps:get(protocols, Destination, [http]), -%% @todo gun_tunnel_up + [NewProtocol0] = maps:get(protocols, Destination, [http]), + NewProtocol = {Protocol0, _} = case NewProtocol0 of + {P, POpts} -> {P, POpts#{stream_ref => RealStreamRef}}; + P -> {P, #{stream_ref => RealStreamRef}} + end, + Protocol = gun:protocol_handler(Protocol0), + ReplyTo ! {gun_tunnel_up, self(), RealStreamRef, Protocol:name()}, {handle_ret([ {origin, <<"http">>, NewHost, NewPort, connect}, - {switch_protocol, Protocol, ReplyTo} + {switch_protocol, NewProtocol, ReplyTo} ], State), EvHandlerState1} end. @@ -537,6 +542,13 @@ keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}, _, Ev keepalive(State, _, EvHandlerState) -> {State, EvHandlerState}. +headers(State, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers, InitialFlow, EvHandler, EvHandlerState) + when is_list(StreamRef) -> + %% Because we switch protocol we may receive a StreamRef as a list. + %% But we are always the final StreamRef as HTTP/1.1. + headers(State, lists:last(StreamRef), ReplyTo, Method, Host, Port, + Path, Headers, InitialFlow, EvHandler, EvHandlerState); headers(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, InitialFlow0, EvHandler, EvHandlerState0) -> @@ -547,6 +559,13 @@ headers(State=#http_state{opts=Opts, out=head}, {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method, Authority, Path, InitialFlow), EvHandlerState}. +request(State, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState) + when is_list(StreamRef) -> + %% Because we switch protocol we may receive a StreamRef as a list. + %% But we are always the final StreamRef as HTTP/1.1. + request(State, lists:last(StreamRef), ReplyTo, Method, Host, Port, + Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState); request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, InitialFlow0, EvHandler, EvHandlerState0) -> @@ -634,6 +653,11 @@ transform_header_names(#http_state{opts=Opts}, Headers) -> Fun -> lists:keymap(Fun, 1, Headers) end. +data(State, StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState) + when is_list(StreamRef) -> + %% Because we switch protocol we may receive a StreamRef as a list. + %% But we are always the final StreamRef as HTTP/1.1. + data(State, lists:last(StreamRef), ReplyTo, IsFin, Data, EvHandler, EvHandlerState); %% We are expecting a new stream. data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _, _, EvHandlerState) -> {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; @@ -689,6 +713,11 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. +connect(State, StreamRef, ReplyTo, Destination, TunnelInfo, Headers, InitialFlow) + when is_list(StreamRef) -> + %% Because we switch protocol we may receive a StreamRef as a list. + %% But we are always the final StreamRef as HTTP/1.1. + connect(State, lists:last(StreamRef), ReplyTo, Destination, TunnelInfo, Headers, InitialFlow); connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, @@ -725,6 +754,11 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow). +cancel(State, StreamRef, ReplyTo, EvHandler, EvHandlerState) + when is_list(StreamRef) -> + %% Because we switch protocol we may receive a StreamRef as a list. + %% But we are always the final StreamRef as HTTP/1.1. + cancel(State, lists:last(StreamRef), ReplyTo, EvHandler, EvHandlerState); %% We can't cancel anything, we can just stop forwarding messages to the owner. cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> case is_stream(State0, StreamRef) of @@ -741,6 +775,11 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> {error_stream_not_found(State0, StreamRef, ReplyTo), EvHandlerState0} end. +stream_info(State, StreamRef) + when is_list(StreamRef) -> + %% Because we switch protocol we may receive a StreamRef as a list. + %% But we are always the final StreamRef as HTTP/1.1. + stream_info(State, lists:last(StreamRef)); stream_info(#http_state{streams=Streams}, StreamRef) -> case lists:keyfind(StreamRef, #stream.ref, Streams) of #stream{reply_to=ReplyTo, is_alive=IsAlive} -> diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 5b8d229..ad52555 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -371,13 +371,14 @@ tunnel_commands([{switch_protocol, Protocol0, ReplyTo}|Tail], Stream=#stream{ref gun_socks -> ReplyTo ! {gun_tunnel_up, self(), stream_ref(State, StreamRef), Protocol:name()}; _ -> ok end, + RealStreamRef = stream_ref(State, StreamRef), OriginSocket = #{ gun_pid => self(), reply_to => ReplyTo, - stream_ref => StreamRef + stream_ref => RealStreamRef }, OriginTransport = gun_tcp_proxy, - {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts), + {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts#{stream_ref => RealStreamRef}), %% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), tunnel_commands([{state, ProtoState}|Tail], Stream, Protocol, TunnelInfo, State); tunnel_commands([{active, true}|Tail], Stream, Protocol, TunnelInfo, State) -> diff --git a/src/gun_raw.erl b/src/gun_raw.erl index 369353e..840774a 100644 --- a/src/gun_raw.erl +++ b/src/gun_raw.erl @@ -57,7 +57,7 @@ close(_, _, _, EvHandlerState) -> EvHandlerState. %% @todo Initiate closing on IsFin=fin. -data(State=#raw_state{socket=Socket, transport=Transport}, undefined, +data(State=#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef, _ReplyTo, _IsFin, Data, _EvHandler, EvHandlerState) -> Transport:send(Socket, Data), {State, EvHandlerState}. diff --git a/src/gun_socks.erl b/src/gun_socks.erl index 5a42e3f..b7a08b2 100644 --- a/src/gun_socks.erl +++ b/src/gun_socks.erl @@ -26,6 +26,7 @@ %% @todo down -record(socks_state, { + ref :: undefined | reference(), %% @todo Need a proper stream_ref type. reply_to :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), @@ -84,6 +85,7 @@ opts_name() -> socks_opts. has_keepalive() -> false. init(ReplyTo, Socket, Transport, Opts) -> + StreamRef = maps:get(stream_ref, Opts, undefined), 5 = Version = maps:get(version, Opts, 5), Auth = maps:get(auth, Opts, [none]), Methods = < none -> <<0>> end || A <- Auth>>, Transport:send(Socket, [<<5, (length(Auth))>>, Methods]), - {connected_no_input, #socks_state{reply_to=ReplyTo, socket=Socket, transport=Transport, + {connected_no_input, #socks_state{ref=StreamRef, reply_to=ReplyTo, socket=Socket, transport=Transport, opts=Opts, version=Version, status=auth_method_select}}. switch_transport(Transport, Socket, State) -> @@ -120,7 +122,8 @@ handle(<<1, 0>>, State=#socks_state{version=5, status=auth_username_password}) - handle(<<1, _>>, #socks_state{version=5, status=auth_username_password}) -> {error, {socks5, username_password_auth_failure}}; %% Connect reply. -handle(<<5, 0, 0, Rest0/bits>>, #socks_state{reply_to=ReplyTo, opts=Opts, version=5, status=connect}) -> +handle(<<5, 0, 0, Rest0/bits>>, #socks_state{ref=StreamRef, reply_to=ReplyTo, opts=Opts, + version=5, status=connect}) -> %% @todo What to do with BoundAddr and BoundPort? Add as metadata to origin info? {_BoundAddr, _BoundPort} = case Rest0 of %% @todo Seen a server with <<1, 0:48>>. @@ -137,16 +140,28 @@ handle(<<5, 0, 0, Rest0/bits>>, #socks_state{reply_to=ReplyTo, opts=Opts, versio %% @todo The origin scheme is wrong when the next protocol is not HTTP. case Opts of #{transport := tls} -> - HandshakeEvent = #{ + HandshakeEvent0 = #{ tls_opts => maps:get(tls_opts, Opts, []), timeout => maps:get(tls_handshake_timeout, Opts, infinity) }, + HandshakeEvent = case StreamRef of + undefined -> HandshakeEvent0; + _ -> HandshakeEvent0#{stream_ref => StreamRef} + end, [{origin, <<"https">>, NewHost, NewPort, socks5}, {tls_handshake, HandshakeEvent, maps:get(protocols, Opts, [http2, http]), ReplyTo}]; _ -> - [Protocol] = maps:get(protocols, Opts, [http]), + [NewProtocol0] = maps:get(protocols, Opts, [http]), + NewProtocol = {Protocol0, _} = case {StreamRef, NewProtocol0} of + {undefined, {_, _}} -> NewProtocol0; + {undefined, P} -> {P, #{}}; + {_, {P, POpts}} -> {P, POpts#{stream_ref => StreamRef}}; + {_, P} -> {P, #{stream_ref => StreamRef}} + end, + Protocol = gun:protocol_handler(Protocol0), + ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()}, [{origin, <<"http">>, NewHost, NewPort, socks5}, - {switch_protocol, Protocol, ReplyTo}] + {switch_protocol, NewProtocol, ReplyTo}] end; handle(<<5, Error, _/bits>>, #socks_state{version=5, status=connect}) -> Reason = case Error of diff --git a/test/raw_SUITE.erl b/test/raw_SUITE.erl index bd34ebb..d909dce 100644 --- a/test/raw_SUITE.erl +++ b/test/raw_SUITE.erl @@ -87,7 +87,7 @@ do_socks5_raw(OriginTransport, ProxyTransport) -> }), %% We receive a gun_up and a gun_tunnel_up. {ok, socks} = gun:await_up(ConnPid), - {ok, raw} = gun:await_up(ConnPid), + {up, raw} = gun:await(ConnPid, undefined), %% The proxy received two packets. {auth_methods, 1, [none]} = receive_from(ProxyPid), {connect, <<"localhost">>, OriginPort} = receive_from(ProxyPid), @@ -141,9 +141,9 @@ do_connect_raw(OriginTransport, ProxyTransport) -> {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = receive_from(ProxyPid), {response, fin, 200, _} = gun:await(ConnPid, StreamRef), %% @todo Why fin? handshake_completed = receive_from(OriginPid), - %% When we take over the entire connection there is no stream reference. - gun:data(ConnPid, undefined, nofin, <<"Hello world!">>), - {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined), + {up, raw} = gun:await(ConnPid, StreamRef), + gun:data(ConnPid, StreamRef, nofin, <<"Hello world!">>), + {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, StreamRef), #{ transport := OriginTransport, protocol := raw, @@ -166,8 +166,9 @@ connect_raw_reply_to(_) -> ReplyTo = spawn(fun() -> {ConnPid, StreamRef} = receive Msg -> Msg after 1000 -> error(timeout) end, {response, fin, 200, _} = gun:await(ConnPid, StreamRef), + {up, raw} = gun:await(ConnPid, StreamRef), Self ! {self(), ready}, - {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined), + {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, StreamRef), Self ! {self(), ok} end), {ok, OriginPid, OriginPort} = init_origin(tcp, raw, fun do_echo/3), @@ -183,7 +184,7 @@ connect_raw_reply_to(_) -> {request, <<"CONNECT">>, _, 'HTTP/1.1', _} = receive_from(ProxyPid), handshake_completed = receive_from(OriginPid), receive {ReplyTo, ready} -> ok after 1000 -> error(timeout) end, - gun:data(ConnPid, undefined, nofin, <<"Hello world!">>), + gun:data(ConnPid, StreamRef, nofin, <<"Hello world!">>), receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end. http11_upgrade_raw_tcp(_) -> diff --git a/test/rfc7231_SUITE.erl b/test/rfc7231_SUITE.erl index c2f3da6..5ef37bd 100644 --- a/test/rfc7231_SUITE.erl +++ b/test/rfc7231_SUITE.erl @@ -168,7 +168,9 @@ do_connect_http(OriginScheme, OriginTransport, ProxyTransport) -> }), {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = receive_from(ProxyPid), {response, fin, 200, _} = gun:await(ConnPid, StreamRef), + %% @todo Do we still need these handshake_completed messages? handshake_completed = receive_from(OriginPid), + {up, http} = gun:await(ConnPid, StreamRef), _ = gun:get(ConnPid, "/proxied"), Data = receive_from(OriginPid), Lines = binary:split(Data, <<"\r\n">>, [global]), @@ -223,6 +225,7 @@ do_connect_h2(OriginScheme, OriginTransport, ProxyTransport) -> {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = receive_from(ProxyPid), {response, fin, 200, _} = gun:await(ConnPid, StreamRef), handshake_completed = receive_from(OriginPid), + {up, http2} = gun:await(ConnPid, StreamRef), _ = gun:get(ConnPid, "/proxied"), <<_:24, 1:8, _/bits>> = receive_from(OriginPid), #{ @@ -268,15 +271,17 @@ do_connect_through_multiple_proxies(OriginScheme, OriginTransport, ProxiesTransp }), {request, <<"CONNECT">>, Authority1, 'HTTP/1.1', _} = receive_from(Proxy1Pid), {response, fin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, http} = gun:await(ConnPid, StreamRef1), Authority2 = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]), StreamRef2 = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, transport => OriginTransport - }), + }, [], #{tunnel => StreamRef1}), {request, <<"CONNECT">>, Authority2, 'HTTP/1.1', _} = receive_from(Proxy2Pid), {response, fin, 200, _} = gun:await(ConnPid, StreamRef2), handshake_completed = receive_from(OriginPid), + {up, http} = gun:await(ConnPid, StreamRef2), _ = gun:get(ConnPid, "/proxied"), Data = receive_from(OriginPid), Lines = binary:split(Data, <<"\r\n">>, [global]), @@ -317,6 +322,7 @@ connect_delay(_) -> {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = receive_from(ProxyPid, 3000), {response, fin, 201, _} = gun:await(ConnPid, StreamRef), handshake_completed = receive_from(OriginPid), + {up, http} = gun:await(ConnPid, StreamRef), _ = gun:get(ConnPid, "/proxied"), Data = receive_from(OriginPid), Lines = binary:split(Data, <<"\r\n">>, [global]), @@ -351,6 +357,7 @@ connect_response_201(_) -> {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = receive_from(ProxyPid), {response, fin, 201, _} = gun:await(ConnPid, StreamRef), handshake_completed = receive_from(OriginPid), + {up, http} = gun:await(ConnPid, StreamRef), _ = gun:get(ConnPid, "/proxied"), Data = receive_from(OriginPid), Lines = binary:split(Data, <<"\r\n">>, [global]), @@ -492,6 +499,7 @@ connect_response_ignore_transfer_encoding(_) -> {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = receive_from(ProxyPid), {response, fin, 200, Headers} = gun:await(ConnPid, StreamRef), handshake_completed = receive_from(OriginPid), + {up, http} = gun:await(ConnPid, StreamRef), _ = gun:get(ConnPid, "/proxied"), Data = receive_from(OriginPid), Lines = binary:split(Data, <<"\r\n">>, [global]), @@ -514,6 +522,7 @@ connect_response_ignore_content_length(_) -> {request, <<"CONNECT">>, Authority, 'HTTP/1.1', _} = receive_from(ProxyPid), {response, fin, 200, Headers} = gun:await(ConnPid, StreamRef), handshake_completed = receive_from(OriginPid), + {up, http} = gun:await(ConnPid, StreamRef), _ = gun:get(ConnPid, "/proxied"), Data = receive_from(OriginPid), Lines = binary:split(Data, <<"\r\n">>, [global]), diff --git a/test/socks_SUITE.erl b/test/socks_SUITE.erl index 436ea3f..3b8c822 100644 --- a/test/socks_SUITE.erl +++ b/test/socks_SUITE.erl @@ -224,7 +224,7 @@ do_socks5(OriginScheme, OriginTransport, OriginProtocol, ProxyTransport, SocksAu }), %% We receive a gun_up and a gun_tunnel_up. {ok, socks} = gun:await_up(ConnPid), - {ok, OriginProtocol} = gun:await_up(ConnPid), + {up, OriginProtocol} = gun:await(ConnPid, undefined), %% The proxy received two packets. AuthMethod = do_auth_method(SocksAuth), {auth_methods, 1, [AuthMethod]} = receive_from(ProxyPid), @@ -302,8 +302,8 @@ do_socks5_through_multiple_proxies(OriginScheme, OriginTransport, ProxyTransport }), %% We receive a gun_up and two gun_tunnel_up. {ok, socks} = gun:await_up(ConnPid), - {ok, socks} = gun:await_up(ConnPid), - {ok, http} = gun:await_up(ConnPid), + {up, socks} = gun:await(ConnPid, undefined), + {up, http} = gun:await(ConnPid, undefined), %% The first proxy received two packets. {auth_methods, 1, [none]} = receive_from(Proxy1Pid), {connect, <<"localhost">>, Proxy2Port} = receive_from(Proxy1Pid), @@ -382,8 +382,9 @@ do_socks5_through_connect_proxy(OriginScheme, OriginTransport, ProxyTransport) - }), {request, <<"CONNECT">>, Authority1, 'HTTP/1.1', _} = receive_from(Proxy1Pid), {response, fin, 200, _} = gun:await(ConnPid, StreamRef), - %% We receive a gun_tunnel_up afterwards. This is the origin HTTP server. - {ok, http} = gun:await_up(ConnPid), + %% We receive two gun_tunnel_up messages. First the SOCKS server and then the origin HTTP server. + {up, socks} = gun:await(ConnPid, StreamRef), + {up, http} = gun:await(ConnPid, StreamRef), %% The second proxy receives a Socks5 auth/connect request. {auth_methods, 1, [none]} = receive_from(Proxy2Pid), {connect, <<"localhost">>, OriginPort} = receive_from(Proxy2Pid), -- cgit v1.2.3