aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2020-09-18 17:01:25 +0200
committerLoïc Hoguin <[email protected]>2020-09-21 15:52:26 +0200
commit8033850ab81ca0639489636bb8760d93900d4a80 (patch)
tree94c2df630a4c6fce97f6192a63a663a25f43266c
parente740356b5881c39a95715d6081689802edf469a0 (diff)
downloadgun-8033850ab81ca0639489636bb8760d93900d4a80.tar.gz
gun-8033850ab81ca0639489636bb8760d93900d4a80.tar.bz2
gun-8033850ab81ca0639489636bb8760d93900d4a80.zip
Initial success for h2 CONNECT -> https CONNECT -> https
-rw-r--r--ebin/gun.app2
-rw-r--r--src/gun.erl5
-rw-r--r--src/gun_http.erl3
-rw-r--r--src/gun_http2.erl341
-rw-r--r--src/gun_protocols.erl5
-rw-r--r--src/gun_tcp_proxy.erl15
-rw-r--r--src/gun_tls_proxy.erl12
-rw-r--r--src/gun_tls_proxy_http2_connect.erl10
-rw-r--r--src/gun_tunnel.erl414
-rw-r--r--test/rfc7540_SUITE.erl32
10 files changed, 609 insertions, 230 deletions
diff --git a/ebin/gun.app b/ebin/gun.app
index 9a5ab45..1f69574 100644
--- a/ebin/gun.app
+++ b/ebin/gun.app
@@ -1,7 +1,7 @@
{application, 'gun', [
{description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."},
{vsn, "2.0.0-pre.2"},
- {modules, ['gun','gun_app','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_protocols','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_tls_proxy_http2_connect','gun_ws','gun_ws_h']},
+ {modules, ['gun','gun_app','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_protocols','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_tls_proxy_http2_connect','gun_tunnel','gun_ws','gun_ws_h']},
{registered, [gun_sup]},
{applications, [kernel,stdlib,ssl,cowlib]},
{mod, {gun_app, []}},
diff --git a/src/gun.erl b/src/gun.erl
index 02b7302..4178c24 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -186,10 +186,7 @@
origin_port => inet:port_number(),
%% Non-stream intermediaries (for example SOCKS).
- intermediaries => [intermediary()],
-
- %% TLS proxy.
- tls_proxy_pid => pid()
+ intermediaries => [intermediary()]
}.
-export_type([tunnel_info/0]).
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 490f025..064bf04 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -378,6 +378,7 @@ handle_inform(Rest, State=#http_state{
{_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers),
Upgrade = cow_http_hd:parse_upgrade(Upgrade0),
ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), Upgrade, Headers},
+ %% @todo We probably need to add_stream_ref?
{handle_ret({switch_protocol, raw, ReplyTo}, State), EvHandlerState0}
catch _:_ ->
%% When the Upgrade header is missing or invalid we treat
@@ -781,7 +782,7 @@ stream_info(#http_state{streams=Streams}, StreamRef) ->
case lists:keyfind(StreamRef, #stream.ref, Streams) of
#stream{reply_to=ReplyTo, is_alive=IsAlive} ->
{ok, #{
- ref => StreamRef,
+ ref => StreamRef, %% @todo Wrong stream_ref? base_stream_ref it?
reply_to => ReplyTo,
state => case IsAlive of
true -> running;
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 252aaec..df76195 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -37,6 +37,24 @@
-export([down/1]).
%-export([ws_upgrade/10]).
+-record(tunnel, {
+ %% The tunnel can either go requested->established
+ %% or requested->tls_handshake->established, or get
+ %% canceled.
+ state = requested :: requested | established,
+
+ %% Destination information.
+ destination = undefined :: gun:connect_destination(),
+
+ %% Tunnel information.
+ info = undefined :: gun:tunnel_info(),
+
+ %% Protocol module and state of the outer layer. Only initialized
+ %% after the TLS handshake has completed when TLS is involved.
+ protocol = undefined :: module(),
+ protocol_state = undefined :: any()
+}).
+
-record(stream, {
id = undefined :: cow_http2:streamid(),
@@ -58,10 +76,7 @@
handler_state :: undefined | gun_content_handler:state(),
%% CONNECT tunnel.
- tunnel :: {module(), any(), gun:tunnel_info()}
- | {setup, gun:connect_destination(), gun:tunnel_info()}
- | {tls_handshake, gun:connect_destination(), gun:tunnel_info()}
- | undefined
+ tunnel :: undefined | #tunnel{}
}).
-record(http2_state, {
@@ -315,64 +330,29 @@ data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0) ->
case get_stream_by_id(State, StreamID) of
Stream=#stream{tunnel=undefined} ->
data_frame(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream);
- #stream{ref=StreamRef, reply_to=ReplyTo,
- tunnel={_Protocol, _ProtoState, #{tls_proxy_pid := ProxyPid}}} ->
- %% When we receive a DATA frame that contains TLS-encoded data,
- %% we must first forward it to the ProxyPid to be decoded. The
- %% Gun process will receive it back as a tls_proxy_http2_connect
- %% message and forward it to the right stream via the handle_continue
- %% callback.
- OriginSocket = #{
- gun_pid => self(),
- reply_to => ReplyTo,
- stream_ref => stream_ref(State, StreamRef)
- },
- ProxyPid ! {tls_proxy_http2_connect, OriginSocket, Data},
- %% @todo What about IsFin?
- {State, EvHandlerState0};
- Stream=#stream{tunnel={Protocol, ProtoState0, TunnelInfo}} ->
- {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState0, EvHandler, EvHandlerState0),
- {tunnel_commands(Commands, Stream, Protocol, TunnelInfo, State), EvHandlerState}
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+% %% @todo What about IsFin?
+ {Commands, EvHandlerState} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0),
+ {tunnel_commands(Commands, Stream, State), EvHandlerState}
end.
-tunnel_commands(Command, Stream, Protocol, TunnelInfo, State) when not is_list(Command) ->
- tunnel_commands([Command], Stream, Protocol, TunnelInfo, State);
-tunnel_commands([], Stream, _, _, State) ->
+tunnel_commands(Command, Stream, State) when not is_list(Command) ->
+ tunnel_commands([Command], Stream, State);
+tunnel_commands([], Stream, State) ->
store_stream(State, Stream);
-tunnel_commands([{state, ProtoState}|Tail], Stream, Protocol, TunnelInfo, State) ->
- tunnel_commands(Tail, Stream#stream{tunnel={Protocol, ProtoState, TunnelInfo}},
- Protocol, TunnelInfo, State);
-tunnel_commands([SetCookie={set_cookie, _, _, _, _}|Tail], Stream, Protocol, TunnelInfo,
- State=#http2_state{commands_queue=Queue}) ->
- tunnel_commands(Tail, Stream, Protocol, TunnelInfo,
- State#http2_state{commands_queue=[SetCookie|Queue]});
-tunnel_commands([{origin, _, NewHost, NewPort, Type}|Tail], Stream, Protocol, TunnelInfo, State) ->
-%% @todo Event?
- tunnel_commands(Tail, Stream, Protocol, TunnelInfo#{
- origin_host => NewHost,
- origin_port => NewPort,
- intermediaries => [#{
- type => Type,
- host => maps:get(origin_host, TunnelInfo),
- port => maps:get(origin_port, TunnelInfo),
- transport => tcp, %% @todo
- protocol => Protocol:name()
- }|maps:get(intermediaries, TunnelInfo, [])]
- }, State);
-tunnel_commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], Stream=#stream{ref=StreamRef},
- _CurrentProtocol, TunnelInfo, State=#http2_state{opts=Opts}) ->
- {Protocol, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
- OriginSocket = #{
- gun_pid => self(),
- reply_to => ReplyTo,
- stream_ref => stream_ref(State, StreamRef)
- },
- OriginTransport = gun_tcp_proxy,
- {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts),
-%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
- tunnel_commands([{state, ProtoState}|Tail], Stream, Protocol, TunnelInfo, State);
-tunnel_commands([{active, true}|Tail], Stream, Protocol, TunnelInfo, State) ->
- tunnel_commands(Tail, Stream, Protocol, TunnelInfo, State).
+tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID}, State0) ->
+ %% @todo EvHandler EvHandlerState
+ {State, _EvHandlerState} = maybe_send_data(State0, StreamID, IsFin, Data, todo, todo),
+ tunnel_commands(Tail, Stream, State);
+tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel}, State) ->
+ tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}, State);
+tunnel_commands([SetCookie={set_cookie, _, _, _, _}|Tail], Stream, State=#http2_state{commands_queue=Queue}) ->
+ tunnel_commands(Tail, Stream, State#http2_state{commands_queue=[SetCookie|Queue]}).
+
+continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) ->
+ ContinueStreamRef ++ [StreamRef];
+continue_stream_ref(State, StreamRef) ->
+ stream_ref(State, StreamRef).
data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
Stream=#stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, handler_state=Handlers0}) ->
@@ -410,6 +390,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
end,
{maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}.
+%% @todo Make separate functions for inform/connect/normal.
headers_frame(State0=#http2_state{opts=Opts, content_handlers=Handlers0, commands_queue=Commands},
StreamID, IsFin, Headers, #{status := Status}, _BodyLen,
EvHandler, EvHandlerState0) ->
@@ -432,8 +413,14 @@ headers_frame(State0=#http2_state{opts=Opts, content_handlers=Handlers0, command
headers => Headers
}, EvHandlerState0),
{State, EvHandlerState};
- Status >= 200, Status =< 299, element(1, Tunnel) =:= setup ->
- {setup, Destination=#{host := DestHost, port := DestPort}, TunnelInfo} = Tunnel,
+ Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested ->
+ #tunnel{destination=Destination, info=TunnelInfo0} = Tunnel,
+ #{host := DestHost, port := DestPort} = Destination,
+ TunnelInfo = TunnelInfo0#{
+ transport => maps:get(transport, Destination, tcp),
+ origin_host => DestHost,
+ origin_port => DestPort
+ },
%% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo.
%% We therefore do not need to call stream_ref/2.
RealStreamRef = stream_ref(State, StreamRef),
@@ -444,22 +431,17 @@ headers_frame(State0=#http2_state{opts=Opts, content_handlers=Handlers0, command
status => Status,
headers => Headers
}, EvHandlerState0),
+ ContinueStreamRef = continue_stream_ref(State, StreamRef),
OriginSocket = #{
gun_pid => self(),
reply_to => ReplyTo,
- stream_ref => RealStreamRef
+ stream_ref => RealStreamRef,
+ %% @todo That's wrong when we are already in a tunnel?
+ handle_continue_stream_ref => ContinueStreamRef
},
- case Destination of
+ Proto = gun_tunnel,
+ ProtoOpts = case Destination of
#{transport := tls} ->
- Protocols = maps:get(protocols, Destination, [http2, http]),
- TLSOpts = gun:ensure_alpn_sni(Protocols, maps:get(tls_opts, Destination, []), DestHost),
- TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity),
-% HandshakeEvent = #{
-% stream_ref => StreamRef,
-% reply_to => ReplyTo,
-% tls_opts => maps:get(tls_opts, Destination, []),
-% timeout => maps:get(tls_handshake_timeout, Destination, infinity)
-% },
%tls_handshake(internal, {tls_handshake,
% HandshakeEvent0=#{tls_opts := TLSOpts0, timeout := TLSTimeout}, Protocols, ReplyTo},
% State=#state{socket=Socket, transport=Transport, origin_host=OriginHost, origin_port=OriginPort,
@@ -469,32 +451,38 @@ headers_frame(State0=#http2_state{opts=Opts, content_handlers=Handlers0, command
% socket => Socket
% },
% EvHandlerState = EvHandler:tls_handshake_start(HandshakeEvent, EvHandlerState0),
- HandshakeEvent = undefined,
- {ok, ProxyPid} = gun_tls_proxy:start_link(DestHost, DestPort,
- TLSOpts, TLSTimeout, OriginSocket, gun_tls_proxy_http2_connect,
- %% @todo ?
-% {HandshakeEvent, Protocols, ReplyTo}),
- {handle_continue, RealStreamRef, HandshakeEvent, Protocols}),
-% commands([{switch_transport, gun_tls_proxy, ProxyPid}], State#state{
-% socket=ProxyPid, transport=gun_tls_proxy, event_handler_state=EvHandlerState});
- %% @todo What about keepalive?
- {store_stream(State, Stream#stream{tunnel={tls_handshake, Destination,
- TunnelInfo#{origin_host => DestHost, origin_port => DestPort,
- %% @todo Fine having it, but we want the socket pid to simulate active.
- tls_proxy_pid => ProxyPid}}}),
- EvHandlerState};
+ Protocols = maps:get(protocols, Destination, [http2, http]),
+ TLSOpts = gun:ensure_alpn_sni(Protocols, maps:get(tls_opts, Destination, []), DestHost),
+ HandshakeEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ tls_opts => TLSOpts,
+ timeout => maps:get(tls_handshake_timeout, Destination, infinity)
+ },
+ Opts#{
+ stream_ref => RealStreamRef,
+ tunnel => #{
+ type => connect,
+ info => TunnelInfo,
+ handshake_event => HandshakeEvent,
+ protocols => Protocols
+ }
+ };
_ ->
[NewProtocol] = maps:get(protocols, Destination, [http]),
- {Protocol, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
- %% @todo What about the StateName returned?
- {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts#{stream_ref => RealStreamRef}),
- %% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
- %% @todo What about keepalive?
- ReplyTo ! {gun_tunnel_up, self(), RealStreamRef, Protocol:name()},
- {store_stream(State, Stream#stream{tunnel={Protocol, ProtoState,
- TunnelInfo#{origin_host => DestHost, origin_port => DestPort}}}),
- EvHandlerState}
- end;
+ Opts#{
+ stream_ref => RealStreamRef,
+ tunnel => #{
+ type => connect,
+ info => TunnelInfo,
+ new_protocol => NewProtocol
+ }
+ }
+ end,
+ {tunnel, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{
+ info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}),
+ EvHandlerState};
true ->
ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
@@ -593,82 +581,18 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.
-%% Continue handling or sending the data.
-handle_continue(StreamRef, Msg, State=#http2_state{opts=Opts}, EvHandler, EvHandlerState0)
- when is_reference(StreamRef) ->
- case get_stream_by_ref(State, StreamRef) of
- Stream=#stream{id=StreamID, reply_to=ReplyTo,
- tunnel={tls_handshake, Destination, TunnelInfo=#{tls_proxy_pid := ProxyPid}}} ->
- case Msg of
- {gun_tls_proxy, ProxyPid, {ok, Negotiated},
- {handle_continue, _, _HandshakeEvent, Protocols}} ->
- #{host := DestHost, port := DestPort} = Destination,
- RealStreamRef = stream_ref(State, StreamRef),
- NewProtocol = gun_protocols:negotiated(Negotiated, Protocols),
-% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
-% socket => Socket,
-% protocol => NewProtocol
-% }, EvHandlerState0),
- OriginSocket = #{
- gun_pid => self(),
- reply_to => ReplyTo,
- stream_ref => RealStreamRef
- },
- {Protocol, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
- {_, ProtoState} = Protocol:init(ReplyTo, OriginSocket, gun_tcp_proxy,
- ProtoOpts#{stream_ref => RealStreamRef}),
- ReplyTo ! {gun_tunnel_up, self(), RealStreamRef, Protocol:name()},
- {{state, store_stream(State, Stream#stream{tunnel={Protocol, ProtoState,
- TunnelInfo#{origin_host => DestHost, origin_port => DestPort}}})},
- EvHandlerState0};
- {gun_tls_proxy, ProxyPid, {error, _Reason},
- {handle_continue, _, _HandshakeEvent, _}} ->
-% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
-% error => Reason
-% }, EvHandlerState0),
-%% @todo
-% The TCP connection can be closed by either peer. The END_STREAM flag
-% on a DATA frame is treated as being equivalent to the TCP FIN bit. A
-% client is expected to send a DATA frame with the END_STREAM flag set
-% after receiving a frame bearing the END_STREAM flag. A proxy that
-% receives a DATA frame with the END_STREAM flag set sends the attached
-% data with the FIN bit set on the last TCP segment. A proxy that
-% receives a TCP segment with the FIN bit set sends a DATA frame with
-% the END_STREAM flag set. Note that the final TCP segment or DATA
-% frame could be empty.
- {{state, State}, EvHandlerState0};
- %% Data that must be sent as a DATA frame.
- {data, ReplyTo, _, IsFin, Data} ->
- {State1, EvHandlerState} = maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0),
- {{state, State1}, EvHandlerState}
- end;
- Stream=#stream{id=StreamID, tunnel={Protocol, ProtoState0, TunnelInfo=#{tls_proxy_pid := ProxyPid}}} ->
- case Msg of
- %% Data that was received and decrypted.
- {tls_proxy, ProxyPid, Data} ->
- {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState0, EvHandler, EvHandlerState0),
- {{state, tunnel_commands(Commands, Stream, Protocol, TunnelInfo, State)}, EvHandlerState};
- %% @todo What to do about those?
- {tls_proxy_closed, ProxyPid} ->
- todo;
- {tls_proxy_error, ProxyPid, _Reason} ->
- todo;
- %% Data that must be sent as a DATA frame.
- {data, _, _, IsFin, Data} ->
- {State1, EvHandlerState} = maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState0),
- {{state, State1}, EvHandlerState}
- end
-%% @todo Is this possible?
-% error ->
-% {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
- end;
-%% Tunneled data.
-handle_continue([StreamRef|Tail], Msg, State, EvHandler, EvHandlerState0) ->
+%% We always pass handle_continue messages to the tunnel.
+handle_continue(ContinueStreamRef, Msg, State, EvHandler, EvHandlerState0) ->
+ StreamRef = case ContinueStreamRef of
+ [SR|_] -> SR;
+ _ -> ContinueStreamRef
+ end,
case get_stream_by_ref(State, StreamRef) of
- Stream=#stream{tunnel={Proto, ProtoState0, TunnelInfo}} ->
- {ProtoState, EvHandlerState} = Proto:handle_continue(normalize_stream_ref(Tail),
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {Commands, EvHandlerState} = Proto:handle_continue(ContinueStreamRef,
Msg, ProtoState0, EvHandler, EvHandlerState0),
- {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState}%;
+ {{state, tunnel_commands(Commands, Stream, State)},
+ EvHandlerState}%;
%% The stream may have ended while TLS was being decoded. @todo What should we do?
% error ->
% {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
@@ -875,13 +799,14 @@ request(State, [StreamRef|Tail], ReplyTo, Method, _Host, _Port,
Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo We should send an error to the user if the stream isn't ready.
- Stream=#stream{tunnel={Proto, ProtoState0, TunnelInfo=#{
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
origin_host := OriginHost, origin_port := OriginPort}}} ->
%% @todo So the event is probably not giving the right StreamRef?
{ProtoState, EvHandlerState} = Proto:request(ProtoState0, normalize_stream_ref(Tail),
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, EvHandler, EvHandlerState0),
- {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState};
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
@@ -890,18 +815,6 @@ request(State, [StreamRef|Tail], ReplyTo, Method, _Host, _Port,
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
- %% get the ultimate stream by querying the #stream{} until we get the last one
- %% call Proto:request in that stream
- %% receive a {data, ...} back with the Tunnel for the StreamRef
- %% if gun_tls_proxy then we get the wrapped TLS data
- %% otherwise we get the data directly
- %% handle the data in the same way as normal; data follows the same scenario
- %% until we get a {data, ...} for the top-level stream
-
- %% What about data we receive from the socket?
- %%
- %% we get DATA with a StreamID for the CONNECT, we see it's CONNECT so we forward to Proto:data
-
initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
initial_flow(InitialFlow, _) -> InitialFlow.
@@ -925,7 +838,8 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He
gun_tls -> <<"https">>;
gun_tls_proxy -> <<"https">>;
gun_tcp -> <<"http">>;
- gun_tcp_proxy -> <<"http">>
+ gun_tcp_proxy -> <<"http">>;
+ gun_tls_proxy_http2_connect -> <<"http">>
end,
authority => Authority,
path => Path
@@ -935,25 +849,24 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He
normalize_stream_ref([StreamRef]) -> StreamRef;
normalize_stream_ref(StreamRef) -> StreamRef.
+%% @todo Make all calls go through this clause.
data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data,
EvHandler, EvHandlerState) when is_reference(StreamRef) ->
case get_stream_by_ref(State, StreamRef) of
- #stream{id=StreamID, tunnel=Tunnel} ->
+ Stream=#stream{id=StreamID, tunnel=Tunnel} ->
case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
{ok, fin, _} ->
{error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState};
{ok, _, fin} ->
{error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState};
+ {ok, _, _} when Tunnel =:= undefined ->
+ maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState);
{ok, _, _} ->
- case Tunnel of
- %% We need to encrypt the data before we can send it. We send it
- %% directly to the gun_tls_proxy process and then
- {_, _, #{tls_proxy_pid := ProxyPid}} ->
- ok = gun_tls_proxy:send(ProxyPid, Data),
- {State, EvHandlerState};
- _ ->
- maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState)
- end
+ #tunnel{protocol=Proto, protocol_state=ProtoState0} = Tunnel,
+ {ProtoState, EvHandlerState1} = Proto:data(ProtoState0, StreamRef,
+ ReplyTo, IsFin, Data, EvHandler, EvHandlerState),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState1}
end;
error ->
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState}
@@ -961,10 +874,11 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin,
%% Tunneled data.
data(State, [StreamRef|Tail], ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
- Stream=#stream{tunnel={Proto, ProtoState0, TunnelInfo}} ->
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
{ProtoState, EvHandlerState} = Proto:data(ProtoState0, normalize_stream_ref(Tail),
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
- {store_stream(State, Stream#stream{tunnel={Proto, ProtoState, TunnelInfo}}), EvHandlerState};
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
@@ -1073,16 +987,16 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
- authority=Authority, path= <<>>, tunnel={setup, Destination, TunnelInfo}},
+ authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}},
create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream);
%% Tunneled request.
connect(State, [StreamRef|Tail], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow) ->
case get_stream_by_ref(State, StreamRef) of
- %% @todo We should send an error to the user if the stream isn't ready.
- Stream=#stream{tunnel={Proto, ProtoState0, ProtoTunnelInfo}} ->
+ %% @todo Should we send an error to the user if the stream isn't ready.
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
ProtoState = Proto:connect(ProtoState0, normalize_stream_ref(Tail),
ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow),
- store_stream(State, Stream#stream{tunnel={Proto, ProtoState, ProtoTunnelInfo}});
+ store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}});
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
@@ -1110,6 +1024,7 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP
EvHandlerState0}
end.
+%% @todo What about tunnels?
timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, Name}, TRef) ->
case cow_http2_machine:timeout(Name, TRef, HTTP2Machine0) of
{ok, HTTP2Machine} ->
@@ -1120,21 +1035,18 @@ timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, Nam
stream_info(State, StreamRef) when is_reference(StreamRef) ->
case get_stream_by_ref(State, StreamRef) of
- #stream{reply_to=ReplyTo, tunnel={Protocol, _, TunnelInfo=#{
- origin_host := OriginHost, origin_port := OriginPort}}} ->
+ #stream{reply_to=ReplyTo, tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState,
+ info=#{transport := Transport, origin_host := OriginHost, origin_port := OriginPort}}} ->
{ok, #{
ref => StreamRef,
reply_to => ReplyTo,
state => running,
tunnel => #{
- transport => case TunnelInfo of
- #{tls_proxy_pid := _} -> tls;
- _ -> tcp
- end,
- protocol => Protocol:name(),
- origin_scheme => case TunnelInfo of
- #{tls_proxy_pid := _} -> <<"https">>;
- _ -> <<"http">>
+ transport => Transport,
+ protocol => Proto:tunneled_name(ProtoState),
+ origin_scheme => case Transport of
+ tcp -> <<"http">>;
+ tls -> <<"https">>
end,
origin_host => OriginHost,
origin_port => OriginPort
@@ -1152,17 +1064,19 @@ stream_info(State, StreamRef) when is_reference(StreamRef) ->
%% Tunneled streams.
stream_info(State=#http2_state{transport=Transport}, StreamRefList=[StreamRef|Tail]) ->
case get_stream_by_ref(State, StreamRef) of
- #stream{tunnel={Protocol, ProtoState, TunnelInfo=#{host := TunnelHost, port := TunnelPort}}} ->
+ #stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState,
+ info=TunnelInfo=#{host := TunnelHost, port := TunnelPort}}} ->
%% We must return the real StreamRef as seen by the user.
%% We therefore set it on return, with the outer layer "winning".
%%
%% We also add intermediaries which are prepended to the list and
%% therefore are ultimately given from outer to inner layer just
%% like gun:info/1 intermediaries.
- case Protocol:stream_info(ProtoState, normalize_stream_ref(Tail)) of
+ case Proto:stream_info(ProtoState, normalize_stream_ref(Tail)) of
{ok, undefined} ->
{ok, undefined};
{ok, Info} ->
+ %% @todo Double check intermediaries.
Intermediaries1 = maps:get(intermediaries, TunnelInfo, []),
Intermediaries2 = maps:get(intermediaries, Info, []),
{ok, Info#{
@@ -1184,6 +1098,7 @@ stream_info(State=#http2_state{transport=Transport}, StreamRefList=[StreamRef|Ta
{ok, undefined}
end.
+%% @todo Tunnels.
down(#http2_state{stream_refs=Refs}) ->
maps:keys(Refs).
diff --git a/src/gun_protocols.erl b/src/gun_protocols.erl
index 4209641..65d5211 100644
--- a/src/gun_protocols.erl
+++ b/src/gun_protocols.erl
@@ -18,6 +18,7 @@
-export([handler/1]).
-export([handler_and_opts/2]).
-export([negotiated/2]).
+-export([stream_ref/1]).
-spec add_stream_ref(Protocol, undefined | gun:stream_ref())
-> Protocol when Protocol :: gun:protocol().
@@ -53,3 +54,7 @@ negotiated({ok, <<"h2">>}, _) -> http2;
negotiated({ok, <<"http/1.1">>}, _) -> http;
negotiated({error, protocol_not_negotiated}, [Protocol]) -> Protocol;
negotiated({error, protocol_not_negotiated}, _) -> http.
+
+-spec stream_ref(gun:protocol()) -> undefined | gun:stream_ref().
+stream_ref({_, ProtocolOpts}) -> maps:get(stream_ref, ProtocolOpts, undefined);
+stream_ref(_) -> undefined.
diff --git a/src/gun_tcp_proxy.erl b/src/gun_tcp_proxy.erl
index b4236f4..0107ac1 100644
--- a/src/gun_tcp_proxy.erl
+++ b/src/gun_tcp_proxy.erl
@@ -24,8 +24,14 @@
-export([close/1]).
-type socket() :: #{
+ %% The pid of the Gun connection.
+ gun_pid := pid(),
+
+ %% The pid of the process that gets replies for this tunnel.
reply_to := pid(),
- stream_ref := reference() | [reference()]
+
+ %% The full stream reference for this tunnel.
+ stream_ref := gun:stream_ref()
}.
name() -> tcp_proxy.
@@ -41,10 +47,17 @@ connect(_, _, _, _) ->
error(not_implemented).
-spec send(socket(), iodata()) -> ok.
+send(#{gun_pid := GunPid, reply_to := ReplyTo, stream_ref := StreamRef,
+ handle_continue_stream_ref := ContinueStreamRef}, Data) ->
+ GunPid ! {handle_continue, ContinueStreamRef, {data, ReplyTo, StreamRef, nofin, Data}},
+ ok;
send(#{reply_to := ReplyTo, stream_ref := StreamRef}, Data) ->
gen_statem:cast(self(), {data, ReplyTo, StreamRef, nofin, Data}).
-spec setopts(_, _) -> no_return().
+setopts(#{handle_continue_stream_ref := _}, _) ->
+ %% We send messages automatically regardless of active mode.
+ ok;
setopts(_, _) ->
error(not_implemented).
diff --git a/src/gun_tls_proxy.erl b/src/gun_tls_proxy.erl
index ab394bd..cba03de 100644
--- a/src/gun_tls_proxy.erl
+++ b/src/gun_tls_proxy.erl
@@ -95,6 +95,7 @@
extra :: any()
}).
+%-define(DEBUG_PROXY,1).
-ifdef(DEBUG_PROXY).
-define(DEBUG_LOG(Format, Args),
io:format(user, "(~p) ~p:~p/~p:" ++ Format ++ "~n",
@@ -269,17 +270,17 @@ connected({call, From}, Msg={send, Data}, State=#state{proxy_socket=Socket}) ->
%% of the data isn't yet complete. We wrap the message in a handle_continue
%% tuple and provide the StreamRef for further processing.
connected(info, Msg={ssl, Socket, Data}, State=#state{owner_pid=OwnerPid, proxy_socket=Socket,
- out_socket=#{stream_ref := StreamRef}}) ->
+ out_socket=#{handle_continue_stream_ref := StreamRef}}) ->
?DEBUG_LOG("msg ~0p state ~0p", [Msg, State]),
OwnerPid ! {handle_continue, StreamRef, {tls_proxy, self(), Data}},
keep_state_and_data;
connected(info, Msg={ssl_closed, Socket}, State=#state{owner_pid=OwnerPid, proxy_socket=Socket,
- out_socket=#{stream_ref := StreamRef}}) ->
+ out_socket=#{handle_continue_stream_ref := StreamRef}}) ->
?DEBUG_LOG("msg ~0p state ~0p", [Msg, State]),
OwnerPid ! {handle_continue, StreamRef, {tls_proxy_closed, self()}},
keep_state_and_data;
connected(info, Msg={ssl_error, Socket, Reason}, State=#state{owner_pid=OwnerPid, proxy_socket=Socket,
- out_socket=#{stream_ref := StreamRef}}) ->
+ out_socket=#{handle_continue_stream_ref := StreamRef}}) ->
?DEBUG_LOG("msg ~0p state ~0p", [Msg, State]),
OwnerPid ! {handle_continue, StreamRef, {tls_proxy_error, self(), Reason}},
keep_state_and_data;
@@ -340,8 +341,9 @@ handle_common(cast, Msg={send_result, From, Result}, State) ->
gen_statem:reply(From, Result),
keep_state_and_data;
%% Messages from the real socket.
-handle_common(info, Msg={OK, Socket, Data}, State=#state{proxy_pid=ProxyPid,
- out_socket=Socket, out_messages={OK, _, _}}) ->
+%% @todo Make _Socket and __Socket match again.
+handle_common(info, Msg={OK, _Socket, Data}, State=#state{proxy_pid=ProxyPid,
+ out_socket=__Socket, out_messages={OK, _, _}}) ->
?DEBUG_LOG("msg ~0p state ~0p", [Msg, State]),
ProxyPid ! {tls_proxy, self(), Data},
keep_state_and_data;
diff --git a/src/gun_tls_proxy_http2_connect.erl b/src/gun_tls_proxy_http2_connect.erl
index c423571..70b4824 100644
--- a/src/gun_tls_proxy_http2_connect.erl
+++ b/src/gun_tls_proxy_http2_connect.erl
@@ -31,7 +31,10 @@
reply_to := pid(),
%% The full stream reference for this tunnel.
- stream_ref := reference() | [reference()]
+ stream_ref := gun:stream_ref(),
+
+ %% The full stream reference for the responsible HTTP/2 stream.
+ handle_continue_stream_ref := gun:stream_ref()
}.
name() -> tls_proxy_http2_connect.
@@ -47,8 +50,9 @@ connect(_, _, _, _) ->
error(not_implemented).
-spec send(socket(), iodata()) -> ok.
-send(#{gun_pid := GunPid, reply_to := ReplyTo, stream_ref := StreamRef}, Data) ->
- GunPid ! {handle_continue, StreamRef, {data, ReplyTo, StreamRef, nofin, Data}},
+send(S=#{gun_pid := GunPid, reply_to := ReplyTo, stream_ref := DataStreamRef,
+ handle_continue_stream_ref := StreamRef}, Data) ->
+ GunPid ! {handle_continue, StreamRef, {data, ReplyTo, DataStreamRef, nofin, Data}},
ok.
-spec setopts(_, _) -> no_return().
diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl
new file mode 100644
index 0000000..8da4c5a
--- /dev/null
+++ b/src/gun_tunnel.erl
@@ -0,0 +1,414 @@
+%% Copyright (c) 2020, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% This module is used when a tunnel is established and either
+%% StreamRef dereference or a TLS proxy process must be handled
+%% by the tunnel layer.
+-module(gun_tunnel).
+
+-export([init/4]).
+-export([handle/4]).
+-export([handle_continue/5]).
+-export([update_flow/4]).
+-export([closing/4]).
+-export([close/4]).
+-export([keepalive/3]).
+-export([headers/11]).
+-export([request/12]).
+-export([data/7]).
+-export([connect/7]).
+-export([cancel/5]).
+-export([timeout/3]).
+-export([stream_info/2]).
+-export([tunneled_name/1]).
+-export([down/1]).
+%-export([ws_upgrade/10]).
+
+-record(tunnel_state, {
+ %% Fake socket and transport.
+ socket = undefined :: #{
+ gun_pid := pid(),
+ reply_to := pid(),
+ stream_ref := gun:stream_ref(),
+ handle_continue_stream_ref := gun:stream_ref()
+ } | pid(),
+ transport = undefined :: gun_tcp_proxy | gun_tls_proxy,
+
+ %% The stream_ref from which the stream was created. When
+ %% the tunnel exists as a result of HTTP/2 CONNECT -> HTTP/1.1 CONNECT
+ %% the stream_ref is the same as the HTTP/1.1 CONNECT one.
+ stream_ref = undefined :: gun:stream_ref(),
+
+ %% When the tunnel is a 'connect' tunnel we must dereference the
+ %% stream_ref. When it is 'socks' we must not as there was no
+ %% stream involved in creating the tunnel.
+ type = undefined :: connect | socks,
+
+ %% Tunnel information.
+ info = undefined :: gun:tunnel_info(),
+
+ %% The origin socket of the TLS proxy, if any. This is used to forward
+ %% messages to the proxy process in order to decrypt the data.
+ tls_origin_socket = undefined :: undefined | #{
+ gun_pid := pid(),
+ reply_to := pid(),
+ stream_ref := gun:stream_ref(),
+ handle_continue_stream_ref => gun:stream_ref()
+ },
+
+ opts = undefined :: undefined | any(), %% @todo Opts type.
+
+ %% Protocol module and state of the outer layer. Only initialized
+ %% after the TLS handshake has completed when TLS is involved.
+ protocol = undefined :: module(),
+ protocol_state = undefined :: any()
+}).
+
+%% Socket is the "origin socket" and Transport the "origin transport".
+%% When the Transport indicate a TLS handshake was requested, the socket
+%% and transport are given to the intermediary TLS proxy process.
+%%
+%% Opts is the options for the underlying HTTP/2 connection,
+%% with some extra information added for the tunnel.
+%%
+%% @todo Mark the tunnel options as reserved.
+init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel}) ->
+ #{
+ type := TunnelType,
+ info := TunnelInfo
+ } = Tunnel,
+ State = #tunnel_state{stream_ref=StreamRef, type=TunnelType, info=TunnelInfo,
+ opts=maps:without([stream_ref, tunnel], Opts)},
+ case Tunnel of
+ %% Initialize the protocol.
+ #{new_protocol := NewProtocol} ->
+ {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
+ {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, OriginTransport,
+ ProtoOpts#{stream_ref => StreamRef}),
+%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
+ ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()},
+ {tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport,
+ protocol=Proto, protocol_state=ProtoState}};
+ %% We can't initialize the protocol until the TLS handshake has completed.
+ #{handshake_event := HandshakeEvent, protocols := Protocols} ->
+ #{handle_continue_stream_ref := ContinueStreamRef} = OriginSocket,
+ %% @todo FIX THIS!!
+ % #{
+ % origin_host := DestHost,
+ % origin_port := DestPort
+ % } = TunnelInfo,
+%% @todo OK so Protocol:init/4 will need to have EvHandler/EvHandlerState!
+%% Otherwise we can't do the TLS events.
+ #{
+ tls_opts := TLSOpts,
+ timeout := TLSTimeout
+ } = HandshakeEvent,
+ {ok, ProxyPid} = gun_tls_proxy:start_link("fake", 12345,% @todo FIX THIS!! DestHost, DestPort,
+ TLSOpts, TLSTimeout, OriginSocket, gun_tls_proxy_http2_connect,
+ {handle_continue, ContinueStreamRef, HandshakeEvent, Protocols}),
+ {tunnel, State#tunnel_state{socket=ProxyPid, transport=gun_tls_proxy,
+ tls_origin_socket=OriginSocket}}
+ end.
+
+%% When we receive data we pass it forward directly for TCP;
+%% or we decrypt it and pass it via handle_continue for TLS.
+handle(Data, State=#tunnel_state{transport=gun_tcp_proxy,
+ protocol=Proto, protocol_state=ProtoState0},
+ EvHandler, EvHandlerState0) ->
+ {Commands, EvHandlerState} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0),
+ {{state, commands(Commands, State)}, EvHandlerState};
+handle(Data, State=#tunnel_state{transport=gun_tls_proxy,
+ socket=ProxyPid, tls_origin_socket=OriginSocket},
+ _EvHandler, EvHandlerState) ->
+ %% When we receive a DATA frame that contains TLS-encoded data,
+ %% we must first forward it to the ProxyPid to be decoded. The
+ %% Gun process will receive it back as a tls_proxy_http2_connect
+ %% message and forward it to the right stream via the handle_continue
+ %% callback.
+ ProxyPid ! {tls_proxy_http2_connect, OriginSocket, Data},
+ {{state, State}, EvHandlerState}.
+
+%% This callback will only be called for TLS.
+%%
+%% The StreamRef in this callback is special because it includes
+%% a reference() for Socks layers as well.
+handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated},
+ {handle_continue, _, HandshakeEvent, Protocols}},
+ State=#tunnel_state{socket=ProxyPid, stream_ref=StreamRef, opts=Opts},
+ _EvHandler, EvHandlerState0)
+ when is_reference(ContinueStreamRef) ->
+ #{reply_to := ReplyTo} = HandshakeEvent,
+ NewProtocol = gun_protocols:negotiated(Negotiated, Protocols),
+ {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
+% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
+% socket => Socket,
+% protocol => NewProtocol
+% }, EvHandlerState0),
+ %% @todo Terminate the current protocol or something?
+ OriginSocket = #{
+ gun_pid => self(),
+ reply_to => ReplyTo,
+ stream_ref => StreamRef%,
+% handle_continue_stream_ref => ContinueStreamRef
+ },
+ {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy,
+ ProtoOpts#{stream_ref => StreamRef}),
+ ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()},
+ {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState0};
+handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, _Reason},
+ {handle_continue, _, _HandshakeEvent, _}},
+ #tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0)
+ when is_reference(ContinueStreamRef) ->
+%% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
+%% error => Reason
+%% }, EvHandlerState0),
+%%% @todo
+%% The TCP connection can be closed by either peer. The END_STREAM flag
+%% on a DATA frame is treated as being equivalent to the TCP FIN bit. A
+%% client is expected to send a DATA frame with the END_STREAM flag set
+%% after receiving a frame bearing the END_STREAM flag. A proxy that
+%% receives a DATA frame with the END_STREAM flag set sends the attached
+%% data with the FIN bit set on the last TCP segment. A proxy that
+%% receives a TCP segment with the FIN bit set sends a DATA frame with
+%% the END_STREAM flag set. Note that the final TCP segment or DATA
+%% frame could be empty.
+ {[], EvHandlerState0};
+%% Send the data. This causes TLS to encrypt the data and send it to the inner layer.
+handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data},
+ #tunnel_state{socket=Socket, transport=Transport}, _EvHandler, EvHandlerState)
+ when is_reference(ContinueStreamRef) ->
+ {[{send, IsFin, Data}], EvHandlerState};
+handle_continue(ContinueStreamRef, {tls_proxy, ProxyPid, Data},
+ State=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState},
+ EvHandler, EvHandlerState0)
+ when is_reference(ContinueStreamRef) ->
+ {Commands, EvHandlerState} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0),
+ {{state, commands(Commands, State)}, EvHandlerState};
+%% @todo What to do about those? Does it matter which one closes/errors out?
+handle_continue(ContinueStreamRef, {tls_proxy_closed, ProxyPid},
+ #tunnel_state{socket=ProxyPid}, _EvHandler, _EvHandlerState0)
+ when is_reference(ContinueStreamRef) ->
+ todo;
+handle_continue(ContinueStreamRef, {tls_proxy_error, ProxyPid, _Reason},
+ #tunnel_state{socket=ProxyPid}, _EvHandler, _EvHandlerState0)
+ when is_reference(ContinueStreamRef) ->
+ todo;
+%% We always dereference the ContinueStreamRef because it includes a
+%% reference() for Socks layers too.
+%%
+%% @todo Assert StreamRef to be our reference().
+handle_continue([_StreamRef|ContinueStreamRef0], Msg,
+ State=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ EvHandler, EvHandlerState0) ->
+ ContinueStreamRef = case ContinueStreamRef0 of
+ [CSR] -> CSR;
+ _ -> ContinueStreamRef0
+ end,
+ {Commands, EvHandlerState} = Proto:handle_continue(ContinueStreamRef,
+ Msg, ProtoState, EvHandler, EvHandlerState0),
+ {{state, commands(Commands, State)}, EvHandlerState}.
+
+%% @todo Probably just pass it forward?
+update_flow(_State, _ReplyTo, _StreamRef, _Inc) ->
+ todo.
+
+%% @todo ?
+closing(_Reason, _State, _EvHandler, _EvHandlerState) ->
+ todo.
+
+%% @todo ?
+close(_Reason, _State, _EvHandler, _EvHandlerState) ->
+ todo.
+
+%% @todo ?
+keepalive(_State, _EvHandler, _EvHandlerState) ->
+ todo.
+
+%% We pass the headers forward and optionally dereference StreamRef.
+headers(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
+ StreamRef0, ReplyTo, Method, Host, Port, Path, Headers,
+ InitialFlow, EvHandler, EvHandlerState0) ->
+ StreamRef = maybe_dereference(State, StreamRef0),
+ {ProtoState, EvHandlerState} = Proto:headers(ProtoState0, StreamRef,
+ ReplyTo, Method, Host, Port, Path, Headers,
+ InitialFlow, EvHandler, EvHandlerState0),
+ {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.
+
+%% We pass the request forward and optionally dereference StreamRef.
+request(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
+ StreamRef0, ReplyTo, Method, Host, Port, Path, Headers, Body,
+ InitialFlow, EvHandler, EvHandlerState0) ->
+ StreamRef = maybe_dereference(State, StreamRef0),
+ {ProtoState, EvHandlerState} = Proto:request(ProtoState0, StreamRef,
+ ReplyTo, Method, Host, Port, Path, Headers, Body,
+ InitialFlow, EvHandler, EvHandlerState0),
+ {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.
+
+%% We pass the data forward and optionally dereference StreamRef.
+data(State=#tunnel_state{socket=Socket, transport=Transport,
+ stream_ref=TunnelStreamRef0, protocol=Proto, protocol_state=ProtoState0},
+ StreamRef0, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) ->
+ TunnelStreamRef = if
+ is_list(TunnelStreamRef0) -> lists:last(TunnelStreamRef0);
+ true -> TunnelStreamRef0
+ end,
+ case StreamRef0 of
+ TunnelStreamRef ->
+ ok = Transport:send(Socket, Data),
+ {State, EvHandlerState0};
+ _ ->
+ StreamRef = maybe_dereference(State, StreamRef0),
+ {ProtoState, EvHandlerState} = Proto:data(ProtoState0, StreamRef,
+ ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
+ {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}
+ end.
+
+%% We pass the CONNECT request forward and optionally dereference StreamRef.
+connect(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
+ StreamRef0, ReplyTo, Destination, TunnelInfo, Headers, InitialFlow) ->
+ StreamRef = maybe_dereference(State, StreamRef0),
+ ProtoState = Proto:connect(ProtoState0, StreamRef,
+ ReplyTo, Destination, TunnelInfo, Headers, InitialFlow),
+ State#tunnel_state{protocol_state=ProtoState}.
+
+%% @todo ?
+cancel(_State, _StreamRef, _ReplyTo, _EvHandler, _EvHandlerState) ->
+ todo.
+
+%% @todo ?
+%% ... we might have to do update Cowlib there...
+timeout(_State, {cow_http2_machine, _Name}, _TRef) ->
+ todo.
+
+stream_info(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, StreamRef0) ->
+ StreamRef = maybe_dereference(State, StreamRef0),
+ Proto:stream_info(ProtoState, StreamRef).
+
+tunneled_name(#tunnel_state{protocol=Proto}) ->
+ Proto:name().
+
+%% @todo ?
+down(_State) ->
+ todo.
+
+%% Internal.
+
+commands(Command, State) when not is_list(Command) ->
+ commands([Command], State);
+commands([], State) ->
+ State;
+commands([{state, ProtoState}|Tail], State) ->
+ commands(Tail, State#tunnel_state{protocol_state=ProtoState});
+%% @todo We must pass down the set_cookie commands. Have a commands_queue.
+commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}) ->
+ commands(Tail, State);
+commands([{send, IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}) ->
+ Transport:send(Socket, Data),
+ commands(Tail, State);
+%% @todo How to handle origin changes?
+commands([{origin, _, _NewHost, _NewPort, _Type}|Tail], State) ->
+ commands(Tail, State);
+commands([{switch_protocol, NewProtocol, ReplyTo}|Tail],
+ State=#tunnel_state{stream_ref=TunnelStreamRef, opts=Opts, protocol=CurrentProto}) ->
+ Type = case CurrentProto:name() of
+ socks -> socks;
+ _ -> connect
+ end,
+ StreamRef = case Type of
+ socks -> TunnelStreamRef;
+ connect -> gun_protocols:stream_ref(NewProtocol)
+ end,
+ ContinueStreamRef0 = continue_stream_ref(State),
+ ContinueStreamRef = case Type of
+ socks -> ContinueStreamRef0 ++ [make_ref()];
+ connect -> ContinueStreamRef0 ++ [if is_list(StreamRef) -> lists:last(StreamRef); true -> StreamRef end]
+ end,
+ OriginSocket = #{
+ gun_pid => self(),
+ reply_to => ReplyTo,
+ stream_ref => StreamRef,
+ handle_continue_stream_ref => ContinueStreamRef
+ },
+ ProtoOpts = Opts#{
+ stream_ref => StreamRef,
+ tunnel => #{
+ type => Type,
+ info => #{}, %% @todo
+ new_protocol => NewProtocol
+ }
+ },
+ Proto = gun_tunnel,
+ {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
+%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
+ commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState});
+commands([{tls_handshake, HandshakeEvent, Protocols, ReplyTo}|Tail],
+ State=#tunnel_state{opts=Opts, protocol=CurrentProto}) ->
+ Type = case CurrentProto:name() of
+ socks -> socks;
+ _ -> connect
+ end,
+ #{
+ stream_ref := StreamRef
+ } = HandshakeEvent,
+ ContinueStreamRef0 = continue_stream_ref(State),
+ ContinueStreamRef = case Type of
+ socks -> ContinueStreamRef0 ++ [make_ref()];
+ connect -> ContinueStreamRef0 ++ [lists:last(StreamRef)]
+ end,
+ OriginSocket = #{
+ gun_pid => self(),
+ reply_to => ReplyTo,
+ stream_ref => StreamRef,
+ handle_continue_stream_ref => ContinueStreamRef
+ },
+ ProtoOpts = Opts#{
+ stream_ref => StreamRef,
+ tunnel => #{
+ type => Type,
+ info => #{}, %% @todo
+ handshake_event => HandshakeEvent,
+ protocols => Protocols
+ }
+ },
+ Proto = gun_tunnel,
+ {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
+ commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState});
+commands([{active, true}|Tail], State) ->
+ commands(Tail, State).
+
+continue_stream_ref(#tunnel_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}) ->
+ if
+ is_list(ContinueStreamRef) -> ContinueStreamRef;
+ true -> [ContinueStreamRef]
+ end;
+continue_stream_ref(#tunnel_state{tls_origin_socket=#{handle_continue_stream_ref := ContinueStreamRef}}) ->
+ if
+ is_list(ContinueStreamRef) -> ContinueStreamRef;
+ true -> [ContinueStreamRef]
+ end.
+
+maybe_dereference(#tunnel_state{stream_ref=RealStreamRef,
+ type=connect, protocol=gun_tunnel}, [_StreamRef|Tail]) ->
+ %% @todo Assert that we got the right stream.
+% StreamRef = if is_list(RealStreamRef) -> lists:last(RealStreamRef); true -> RealStreamRef end,
+ case Tail of
+ [Ref] -> Ref;
+ _ -> Tail
+ end;
+%% We do not dereference when we are the target.
+%% For example when creating a new stream on the origin via tunnel(s).
+maybe_dereference(#tunnel_state{type=connect}, StreamRef) ->
+ StreamRef;
+maybe_dereference(#tunnel_state{type=socks}, StreamRef) ->
+ StreamRef.
diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl
index c8aff46..4116ef4 100644
--- a/test/rfc7540_SUITE.erl
+++ b/test/rfc7540_SUITE.erl
@@ -557,6 +557,15 @@ do_connect_http(OriginScheme, OriginTransport, OriginProtocol, ProxyScheme, Prox
gun:close(ConnPid).
connect_cowboy_http_via_h2c(_) ->
+
+%dbg:tracer(),
+%dbg:tpl(gun_http, []),
+%dbg:tpl(gun_http2, []),
+%dbg:tpl(gun_tunnel, []),
+%dbg:tpl(gun_tcp_proxy, []),
+%dbg:tpl(gun, []),
+%dbg:p(all, c),
+
doc("CONNECT can be used to establish a TCP connection "
"to an HTTP/1.1 server via a TCP HTTP/2 proxy. (RFC7540 8.3)"),
do_connect_cowboy(<<"http">>, tcp, http, <<"http">>, tcp).
@@ -623,6 +632,7 @@ do_connect_cowboy(_OriginScheme, OriginTransport, OriginProtocol, _ProxyScheme,
{response, nofin, 200, _} = gun:await(ConnPid, StreamRef),
{up, OriginProtocol} = gun:await(ConnPid, StreamRef),
ProxiedStreamRef = gun:get(ConnPid, "/proxied", #{}, #{tunnel => StreamRef}),
+ timer:sleep(1000),
{response, nofin, 200, _} = gun:await(ConnPid, ProxiedStreamRef),
%% We can create more requests on the proxy as well.
ProxyStreamRef = gun:get(ConnPid, "/"),
@@ -652,9 +662,23 @@ do_cowboy_origin(OriginTransport, OriginProtocol) ->
connect_http_via_http_via_h2c(_) ->
doc("CONNECT can be used to establish a TCP connection "
"to an HTTP/1.1 server via a tunnel going through both "
- "an HTTP/2 and an HTTP/1.1 proxy. (RFC7231 4.3.6)"),
+ "a TCP HTTP/2 and a TCP HTTP/1.1 proxy. (RFC7540 8.3)"),
do_connect_via_multiple_proxies(tcp, http, tcp, http, tcp).
+connect_https_via_https_via_h2(_) ->
+
+%dbg:tracer(),
+%dbg:tpl(?MODULE, []),
+%dbg:tpl(gun, []),
+%dbg:tpl(gun_http, []),
+%dbg:tpl(gun_http2, []),
+%dbg:p(all, c),
+
+ doc("CONNECT can be used to establish a TLS connection "
+ "to an HTTP/1.1 server via a tunnel going through both "
+ "a TLS HTTP/2 and a TLS HTTP/1.1 proxy. (RFC7540 8.3)"),
+ do_connect_via_multiple_proxies(tls, http, tls, http, tls).
+
do_connect_via_multiple_proxies(OriginTransport, OriginProtocol,
Proxy2Transport, Proxy2Protocol, Proxy1Transport) ->
{ok, Ref, OriginPort} = do_cowboy_origin(OriginTransport, OriginProtocol),
@@ -665,16 +689,18 @@ do_connect_via_multiple_proxies(OriginTransport, OriginProtocol,
{ok, Proxy2Pid, Proxy2Port} = rfc7231_SUITE:do_proxy_start(Proxy2Transport),
%% First proxy.
{ok, ConnPid} = gun:open("localhost", Proxy1Port, #{
+ transport => Proxy1Transport,
protocols => [http2]
}),
{ok, http2} = gun:await_up(ConnPid),
+ handshake_completed = receive_from(Proxy1Pid),
%% Second proxy.
StreamRef1 = gun:connect(ConnPid, #{
host => "localhost",
port => Proxy2Port,
+ transport => Proxy2Transport,
protocols => [Proxy2Protocol]
}, []),
- handshake_completed = receive_from(Proxy1Pid),
Authority1 = iolist_to_binary(["localhost:", integer_to_binary(Proxy2Port)]),
{request, #{
<<":method">> := <<"CONNECT">>,
@@ -686,6 +712,7 @@ do_connect_via_multiple_proxies(OriginTransport, OriginProtocol,
StreamRef2 = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
+ transport => OriginTransport,
protocols => [OriginProtocol]
}, [], #{tunnel => StreamRef1}),
Authority2 = iolist_to_binary(["localhost:", integer_to_binary(OriginPort)]),
@@ -697,6 +724,7 @@ do_connect_via_multiple_proxies(OriginTransport, OriginProtocol,
ProxiedStreamRef = gun:get(ConnPid, "/proxied", [], #{tunnel => StreamRef2}),
{response, nofin, 200, _} = gun:await(ConnPid, ProxiedStreamRef),
gun:close(ConnPid)
+ %% @todo Also test stream_info.
after
cowboy:stop_listener(Ref)
end.