aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2020-08-24 17:06:23 +0200
committerLoïc Hoguin <[email protected]>2020-09-21 15:51:57 +0200
commitd056e5fb2a1fbb54e108c5c61384573acf21b4cf (patch)
treeccf9131d63fbace32e2d99941fb6b1d788f6ec7f /src
parent2c8db0879109dd90443d7b276e5ca2daf83920bc (diff)
downloadgun-d056e5fb2a1fbb54e108c5c61384573acf21b4cf.tar.gz
gun-d056e5fb2a1fbb54e108c5c61384573acf21b4cf.tar.bz2
gun-d056e5fb2a1fbb54e108c5c61384573acf21b4cf.zip
Replace gun_tunnel_up/3 message with /4 variant
Also fixes all the tests. Lots of work remain around protocols (how best to pass the base stream_ref to them? maybe the current solution, maybe a new argument to Protocol:init) and strengthen the concept of stream_ref, at least with its own type.
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl37
-rw-r--r--src/gun_http.erl53
-rw-r--r--src/gun_http2.erl5
-rw-r--r--src/gun_raw.erl2
-rw-r--r--src/gun_socks.erl25
5 files changed, 94 insertions, 28 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 2f92b95..38c479e 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -668,10 +668,10 @@ connect(ServerPid, Destination, Headers) ->
-spec connect(pid(), connect_destination(), req_headers(), req_opts()) -> reference().
connect(ServerPid, Destination, Headers, ReqOpts) ->
- StreamRef = make_ref(),
+ Tunnel = get_tunnel(ReqOpts),
+ StreamRef = make_stream_ref(Tunnel),
InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
- %% @todo tunnel
gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef,
Destination, Headers, InitialFlow}),
StreamRef.
@@ -808,8 +808,6 @@ await_up(ServerPid, Timeout, MRef) ->
receive
{gun_up, ServerPid, Protocol} ->
{ok, Protocol};
- {gun_tunnel_up, ServerPid, Protocol} ->
- {ok, Protocol};
{'DOWN', MRef, process, ServerPid, Reason} ->
{error, {down, Reason}}
after Timeout ->
@@ -1092,8 +1090,17 @@ ensure_alpn_sni(Protocols0, TransOpts0, OriginHost) ->
%% Normal TLS handshake.
tls_handshake(internal, {tls_handshake, HandshakeEvent, Protocols, ReplyTo},
State0=#state{socket=Socket, transport=gun_tcp}) ->
+ StreamRef = maps:get(stream_ref, HandshakeEvent, undefined),
case normal_tls_handshake(Socket, State0, HandshakeEvent, Protocols) of
- {ok, TLSSocket, NewProtocol, State} ->
+ {ok, TLSSocket, NewProtocol0, State} ->
+ NewProtocol = {Protocol0, _} = case {StreamRef, NewProtocol0} of
+ {undefined, {_, _}} -> NewProtocol0;
+ {undefined, P} -> {P, #{}};
+ {_, {P, POpts}} -> {P, POpts#{stream_ref => StreamRef}};
+ {_, P} -> {P, #{stream_ref => StreamRef}}
+ end,
+ Protocol = gun:protocol_handler(Protocol0),
+ ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()},
commands([
{switch_transport, gun_tls, TLSSocket},
{switch_protocol, NewProtocol, ReplyTo}
@@ -1120,10 +1127,19 @@ tls_handshake(internal, {tls_handshake,
%% the handshake succeeded and whether we need to switch to a different protocol.
tls_handshake(info, {gun_tls_proxy, Socket, {ok, Negotiated}, {HandshakeEvent, Protocols, ReplyTo}},
State0=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- NewProtocol = protocol_negotiated(Negotiated, Protocols),
+ NewProtocol0 = protocol_negotiated(Negotiated, Protocols),
+ StreamRef = maps:get(stream_ref, HandshakeEvent, undefined),
+ NewProtocol = {Protocol0, _} = case {StreamRef, NewProtocol0} of
+ {undefined, {_, _}} -> NewProtocol0;
+ {undefined, P} -> {P, #{}};
+ {_, {P, POpts}} -> {P, POpts#{stream_ref => StreamRef}};
+ {_, P} -> {P, #{stream_ref => StreamRef}}
+ end,
+ Protocol = gun:protocol_handler(Protocol0),
+ ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()},
EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
socket => Socket,
- protocol => NewProtocol
+ protocol => Protocol:name()
}, EvHandlerState0),
commands([{switch_protocol, NewProtocol, ReplyTo}], State0#state{event_handler_state=EvHandlerState});
tls_handshake(info, {gun_tls_proxy, Socket, Error = {error, Reason}, {HandshakeEvent, _, _}},
@@ -1578,7 +1594,7 @@ commands([{switch_transport, Transport, Socket}|Tail], State=#state{
messages=Transport:messages(), protocol_state=ProtoState,
event_handler_state=EvHandlerState}));
commands([{switch_protocol, Protocol0, ReplyTo}], State0=#state{
- opts=Opts, socket=Socket, transport=Transport, protocol=CurrentProtocol,
+ opts=Opts, socket=Socket, transport=Transport,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Protocol, ProtoOpts} = case Protocol0 of
{P, PO} -> {protocol_handler(P), PO};
@@ -1586,11 +1602,6 @@ commands([{switch_protocol, Protocol0, ReplyTo}], State0=#state{
Protocol1 = protocol_handler(P),
{Protocol1, maps:get(Protocol1:opts_name(), Opts, #{})}
end,
- %% When we switch_protocol from socks we must send a gun_tunnel_up message.
- _ = case CurrentProtocol of
- gun_socks -> ReplyTo ! {gun_tunnel_up, self(), Protocol:name()};
- _ -> ok
- end,
{StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts),
EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
%% We cancel the existing keepalive and, depending on the protocol,
diff --git a/src/gun_http.erl b/src/gun_http.erl
index bced64d..950bda1 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -313,14 +313,15 @@ handle_head(Data, State0=#http_state{streams=[#stream{ref=StreamRef, authority=A
handle_connect(Rest, State=#http_state{
streams=[Stream=#stream{ref={_, StreamRef, Destination}, reply_to=ReplyTo}|Tail]},
EvHandler, EvHandlerState0, 'HTTP/1.1', Status, Headers) ->
+ RealStreamRef = stream_ref(State, StreamRef),
%% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup.
_ = case Stream of
#stream{is_alive=false} -> ok;
- _ -> ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), fin, Status, Headers}
+ _ -> ReplyTo ! {gun_response, self(), RealStreamRef, fin, Status, Headers}
end,
%% @todo Figure out whether the event should trigger if the stream was cancelled.
EvHandlerState1 = EvHandler:response_headers(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -334,23 +335,27 @@ handle_connect(Rest, State=#http_state{
case Destination of
#{transport := tls} ->
HandshakeEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
tls_opts => maps:get(tls_opts, Destination, []),
timeout => maps:get(tls_handshake_timeout, Destination, infinity)
},
Protocols = maps:get(protocols, Destination, [http2, http]),
-%% @todo gun_tunnel_up when the protocol switch is complete
{handle_ret([
{origin, <<"https">>, NewHost, NewPort, connect},
{tls_handshake, HandshakeEvent, Protocols, ReplyTo}
], State), EvHandlerState1};
_ ->
- [Protocol] = maps:get(protocols, Destination, [http]),
-%% @todo gun_tunnel_up
+ [NewProtocol0] = maps:get(protocols, Destination, [http]),
+ NewProtocol = {Protocol0, _} = case NewProtocol0 of
+ {P, POpts} -> {P, POpts#{stream_ref => RealStreamRef}};
+ P -> {P, #{stream_ref => RealStreamRef}}
+ end,
+ Protocol = gun:protocol_handler(Protocol0),
+ ReplyTo ! {gun_tunnel_up, self(), RealStreamRef, Protocol:name()},
{handle_ret([
{origin, <<"http">>, NewHost, NewPort, connect},
- {switch_protocol, Protocol, ReplyTo}
+ {switch_protocol, NewProtocol, ReplyTo}
], State), EvHandlerState1}
end.
@@ -537,6 +542,13 @@ keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}, _, Ev
keepalive(State, _, EvHandlerState) ->
{State, EvHandlerState}.
+headers(State, StreamRef, ReplyTo, Method, Host, Port,
+ Path, Headers, InitialFlow, EvHandler, EvHandlerState)
+ when is_list(StreamRef) ->
+ %% Because we switch protocol we may receive a StreamRef as a list.
+ %% But we are always the final StreamRef as HTTP/1.1.
+ headers(State, lists:last(StreamRef), ReplyTo, Method, Host, Port,
+ Path, Headers, InitialFlow, EvHandler, EvHandlerState);
headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow0, EvHandler, EvHandlerState0) ->
@@ -547,6 +559,13 @@ headers(State=#http_state{opts=Opts, out=head},
{new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo,
Method, Authority, Path, InitialFlow), EvHandlerState}.
+request(State, StreamRef, ReplyTo, Method, Host, Port,
+ Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState)
+ when is_list(StreamRef) ->
+ %% Because we switch protocol we may receive a StreamRef as a list.
+ %% But we are always the final StreamRef as HTTP/1.1.
+ request(State, lists:last(StreamRef), ReplyTo, Method, Host, Port,
+ Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState);
request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow0, EvHandler, EvHandlerState0) ->
@@ -634,6 +653,11 @@ transform_header_names(#http_state{opts=Opts}, Headers) ->
Fun -> lists:keymap(Fun, 1, Headers)
end.
+data(State, StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState)
+ when is_list(StreamRef) ->
+ %% Because we switch protocol we may receive a StreamRef as a list.
+ %% But we are always the final StreamRef as HTTP/1.1.
+ data(State, lists:last(StreamRef), ReplyTo, IsFin, Data, EvHandler, EvHandlerState);
%% We are expecting a new stream.
data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _, _, EvHandlerState) ->
{error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState};
@@ -689,6 +713,11 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
+connect(State, StreamRef, ReplyTo, Destination, TunnelInfo, Headers, InitialFlow)
+ when is_list(StreamRef) ->
+ %% Because we switch protocol we may receive a StreamRef as a list.
+ %% But we are always the final StreamRef as HTTP/1.1.
+ connect(State, lists:last(StreamRef), ReplyTo, Destination, TunnelInfo, Headers, InitialFlow);
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
@@ -725,6 +754,11 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
<<"CONNECT">>, Authority, <<>>, InitialFlow).
+cancel(State, StreamRef, ReplyTo, EvHandler, EvHandlerState)
+ when is_list(StreamRef) ->
+ %% Because we switch protocol we may receive a StreamRef as a list.
+ %% But we are always the final StreamRef as HTTP/1.1.
+ cancel(State, lists:last(StreamRef), ReplyTo, EvHandler, EvHandlerState);
%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
case is_stream(State0, StreamRef) of
@@ -741,6 +775,11 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
{error_stream_not_found(State0, StreamRef, ReplyTo), EvHandlerState0}
end.
+stream_info(State, StreamRef)
+ when is_list(StreamRef) ->
+ %% Because we switch protocol we may receive a StreamRef as a list.
+ %% But we are always the final StreamRef as HTTP/1.1.
+ stream_info(State, lists:last(StreamRef));
stream_info(#http_state{streams=Streams}, StreamRef) ->
case lists:keyfind(StreamRef, #stream.ref, Streams) of
#stream{reply_to=ReplyTo, is_alive=IsAlive} ->
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 5b8d229..ad52555 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -371,13 +371,14 @@ tunnel_commands([{switch_protocol, Protocol0, ReplyTo}|Tail], Stream=#stream{ref
gun_socks -> ReplyTo ! {gun_tunnel_up, self(), stream_ref(State, StreamRef), Protocol:name()};
_ -> ok
end,
+ RealStreamRef = stream_ref(State, StreamRef),
OriginSocket = #{
gun_pid => self(),
reply_to => ReplyTo,
- stream_ref => StreamRef
+ stream_ref => RealStreamRef
},
OriginTransport = gun_tcp_proxy,
- {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts),
+ {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts#{stream_ref => RealStreamRef}),
%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
tunnel_commands([{state, ProtoState}|Tail], Stream, Protocol, TunnelInfo, State);
tunnel_commands([{active, true}|Tail], Stream, Protocol, TunnelInfo, State) ->
diff --git a/src/gun_raw.erl b/src/gun_raw.erl
index 369353e..840774a 100644
--- a/src/gun_raw.erl
+++ b/src/gun_raw.erl
@@ -57,7 +57,7 @@ close(_, _, _, EvHandlerState) ->
EvHandlerState.
%% @todo Initiate closing on IsFin=fin.
-data(State=#raw_state{socket=Socket, transport=Transport}, undefined,
+data(State=#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef,
_ReplyTo, _IsFin, Data, _EvHandler, EvHandlerState) ->
Transport:send(Socket, Data),
{State, EvHandlerState}.
diff --git a/src/gun_socks.erl b/src/gun_socks.erl
index 5a42e3f..b7a08b2 100644
--- a/src/gun_socks.erl
+++ b/src/gun_socks.erl
@@ -26,6 +26,7 @@
%% @todo down
-record(socks_state, {
+ ref :: undefined | reference(), %% @todo Need a proper stream_ref type.
reply_to :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
@@ -84,6 +85,7 @@ opts_name() -> socks_opts.
has_keepalive() -> false.
init(ReplyTo, Socket, Transport, Opts) ->
+ StreamRef = maps:get(stream_ref, Opts, undefined),
5 = Version = maps:get(version, Opts, 5),
Auth = maps:get(auth, Opts, [none]),
Methods = <<case A of
@@ -91,7 +93,7 @@ init(ReplyTo, Socket, Transport, Opts) ->
none -> <<0>>
end || A <- Auth>>,
Transport:send(Socket, [<<5, (length(Auth))>>, Methods]),
- {connected_no_input, #socks_state{reply_to=ReplyTo, socket=Socket, transport=Transport,
+ {connected_no_input, #socks_state{ref=StreamRef, reply_to=ReplyTo, socket=Socket, transport=Transport,
opts=Opts, version=Version, status=auth_method_select}}.
switch_transport(Transport, Socket, State) ->
@@ -120,7 +122,8 @@ handle(<<1, 0>>, State=#socks_state{version=5, status=auth_username_password}) -
handle(<<1, _>>, #socks_state{version=5, status=auth_username_password}) ->
{error, {socks5, username_password_auth_failure}};
%% Connect reply.
-handle(<<5, 0, 0, Rest0/bits>>, #socks_state{reply_to=ReplyTo, opts=Opts, version=5, status=connect}) ->
+handle(<<5, 0, 0, Rest0/bits>>, #socks_state{ref=StreamRef, reply_to=ReplyTo, opts=Opts,
+ version=5, status=connect}) ->
%% @todo What to do with BoundAddr and BoundPort? Add as metadata to origin info?
{_BoundAddr, _BoundPort} = case Rest0 of
%% @todo Seen a server with <<1, 0:48>>.
@@ -137,16 +140,28 @@ handle(<<5, 0, 0, Rest0/bits>>, #socks_state{reply_to=ReplyTo, opts=Opts, versio
%% @todo The origin scheme is wrong when the next protocol is not HTTP.
case Opts of
#{transport := tls} ->
- HandshakeEvent = #{
+ HandshakeEvent0 = #{
tls_opts => maps:get(tls_opts, Opts, []),
timeout => maps:get(tls_handshake_timeout, Opts, infinity)
},
+ HandshakeEvent = case StreamRef of
+ undefined -> HandshakeEvent0;
+ _ -> HandshakeEvent0#{stream_ref => StreamRef}
+ end,
[{origin, <<"https">>, NewHost, NewPort, socks5},
{tls_handshake, HandshakeEvent, maps:get(protocols, Opts, [http2, http]), ReplyTo}];
_ ->
- [Protocol] = maps:get(protocols, Opts, [http]),
+ [NewProtocol0] = maps:get(protocols, Opts, [http]),
+ NewProtocol = {Protocol0, _} = case {StreamRef, NewProtocol0} of
+ {undefined, {_, _}} -> NewProtocol0;
+ {undefined, P} -> {P, #{}};
+ {_, {P, POpts}} -> {P, POpts#{stream_ref => StreamRef}};
+ {_, P} -> {P, #{stream_ref => StreamRef}}
+ end,
+ Protocol = gun:protocol_handler(Protocol0),
+ ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()},
[{origin, <<"http">>, NewHost, NewPort, socks5},
- {switch_protocol, Protocol, ReplyTo}]
+ {switch_protocol, NewProtocol, ReplyTo}]
end;
handle(<<5, Error, _/bits>>, #socks_state{version=5, status=connect}) ->
Reason = case Error of