diff options
author | Loïc Hoguin <[email protected]> | 2020-10-16 11:33:31 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2020-10-16 11:33:31 +0200 |
commit | 356bf47edeb5b78765200e78d9b7a48aa98b97f5 (patch) | |
tree | 83c35cbb5e7120bd1d1e0a5693571f8b18c088d7 /src/gun_tunnel.erl | |
parent | f2e8d103dd7827251fa726c42e307e42cef8a3dc (diff) | |
download | gun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.tar.gz gun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.tar.bz2 gun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.zip |
Add or fix events inside or related to CONNECT tunnels
Diffstat (limited to 'src/gun_tunnel.erl')
-rw-r--r-- | src/gun_tunnel.erl | 196 |
1 files changed, 131 insertions, 65 deletions
diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl index ca9d3aa..cc58351 100644 --- a/src/gun_tunnel.erl +++ b/src/gun_tunnel.erl @@ -17,7 +17,7 @@ %% by the tunnel layer. -module(gun_tunnel). --export([init/4]). +-export([init/6]). -export([handle/4]). -export([handle_continue/5]). -export([update_flow/4]). @@ -27,13 +27,14 @@ -export([headers/11]). -export([request/12]). -export([data/7]). --export([connect/7]). +-export([connect/9]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). -export([tunneled_name/2]). -export([down/1]). -%-export([ws_upgrade/10]). +-export([ws_upgrade/10]). +-export([ws_send/6]). -record(tunnel_state, { %% Fake socket and transport. @@ -97,7 +98,8 @@ %% with some extra information added for the tunnel. %% %% @todo Mark the tunnel options as reserved. -init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel}) -> +init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel}, + EvHandler, EvHandlerState0) -> #{ type := TunnelType, transport_name := TunnelTransport, @@ -113,7 +115,10 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tcp}), -%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => Proto:name() + }, EvHandlerState0), %% When the tunnel protocol is HTTP/1.1 or SOCKS %% the gun_tunnel_up message was already sent. %% @@ -124,34 +129,36 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun _ -> ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()} end, {tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport, - protocol=Proto, protocol_state=ProtoState}}; + protocol=Proto, protocol_state=ProtoState}, + EvHandlerState}; %% We can't initialize the protocol until the TLS handshake has completed. - #{handshake_event := HandshakeEvent, protocols := Protocols} -> + #{handshake_event := HandshakeEvent0, protocols := Protocols} -> #{handle_continue_stream_ref := ContinueStreamRef} = OriginSocket, #{ origin_host := DestHost, origin_port := DestPort } = TunnelInfo, -%% @todo OK so Protocol:init/4 will need to have EvHandler/EvHandlerState! -%% Otherwise we can't do the TLS events. #{ tls_opts := TLSOpts, timeout := TLSTimeout - } = HandshakeEvent, + } = HandshakeEvent0, + HandshakeEvent = HandshakeEvent0#{socket => OriginSocket}, + EvHandlerState = EvHandler:tls_handshake_start(HandshakeEvent, EvHandlerState0), {ok, ProxyPid} = gun_tls_proxy:start_link(DestHost, DestPort, TLSOpts, TLSTimeout, OriginSocket, gun_tls_proxy_http2_connect, {handle_continue, ContinueStreamRef, HandshakeEvent, Protocols}), {tunnel, State#tunnel_state{socket=ProxyPid, transport=gun_tls_proxy, - tls_origin_socket=OriginSocket}} + tls_origin_socket=OriginSocket}, EvHandlerState} end. %% When we receive data we pass it forward directly for TCP; %% or we decrypt it and pass it via handle_continue for TLS. -handle(Data, State=#tunnel_state{transport=gun_tcp_proxy, +handle(Data, State0=#tunnel_state{transport=gun_tcp_proxy, protocol=Proto, protocol_state=ProtoState0}, EvHandler, EvHandlerState0) -> - {Commands, EvHandlerState} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}; + {Commands, EvHandlerState1} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}; handle(Data, State=#tunnel_state{transport=gun_tls_proxy, socket=ProxyPid, tls_origin_socket=OriginSocket}, _EvHandler, EvHandlerState) -> @@ -170,15 +177,19 @@ handle(Data, State=#tunnel_state{transport=gun_tls_proxy, handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated}, {handle_continue, _, HandshakeEvent, Protocols}}, State=#tunnel_state{socket=ProxyPid, stream_ref=StreamRef, opts=Opts}, - _EvHandler, EvHandlerState0) + EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> #{reply_to := ReplyTo} = HandshakeEvent, NewProtocol = gun_protocols:negotiated(Negotiated, Protocols), {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), -% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ -% socket => Socket, -% protocol => NewProtocol -% }, EvHandlerState0), + EvHandlerState1 = EvHandler:tls_handshake_end(HandshakeEvent#{ + socket => ProxyPid, + protocol => NewProtocol + }, EvHandlerState0), + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => NewProtocol + }, EvHandlerState1), %% @todo Terminate the current protocol or something? OriginSocket = #{ gun_pid => self(), @@ -188,15 +199,15 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated}, {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tls}), ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}, - {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState0}; + {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState}; handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason}, - {handle_continue, _, _HandshakeEvent, _}}, - #tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0) + {handle_continue, _, HandshakeEvent, _}}, + #tunnel_state{socket=ProxyPid}, EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> -%% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ -%% error => Reason -%% }, EvHandlerState0), -%%% @todo + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + error => Reason + }, EvHandlerState0), +%% @todo %% The TCP connection can be closed by either peer. The END_STREAM flag %% on a DATA frame is treated as being equivalent to the TCP FIN bit. A %% client is expected to send a DATA frame with the END_STREAM flag set @@ -206,18 +217,19 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason}, %% receives a TCP segment with the FIN bit set sends a DATA frame with %% the END_STREAM flag set. Note that the final TCP segment or DATA %% frame could be empty. - {{error, Reason}, EvHandlerState0}; + {{error, Reason}, EvHandlerState}; %% Send the data. This causes TLS to encrypt the data and send it to the inner layer. handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data}, #tunnel_state{}, _EvHandler, EvHandlerState) when is_reference(ContinueStreamRef) -> {{send, IsFin, Data}, EvHandlerState}; handle_continue(ContinueStreamRef, {tls_proxy, ProxyPid, Data}, - State=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState}, + State0=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState}, EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> - {Commands, EvHandlerState} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}; + {Commands, EvHandlerState1} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}; handle_continue(ContinueStreamRef, {tls_proxy_closed, ProxyPid}, #tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> @@ -233,21 +245,24 @@ handle_continue(ContinueStreamRef, {tls_proxy_error, ProxyPid, Reason}, %% %% @todo Assert StreamRef to be our reference(). handle_continue([_StreamRef|ContinueStreamRef0], Msg, - State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, EvHandler, EvHandlerState0) -> ContinueStreamRef = case ContinueStreamRef0 of [CSR] -> CSR; _ -> ContinueStreamRef0 end, - {Commands, EvHandlerState} = Proto:handle_continue(ContinueStreamRef, + {Commands, EvHandlerState1} = Proto:handle_continue(ContinueStreamRef, Msg, ProtoState, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}. + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}. -update_flow(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, +%% @todo This function will need EvHandler/EvHandlerState? +update_flow(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, ReplyTo, StreamRef0, Inc) -> - StreamRef = maybe_dereference(State, StreamRef0), + StreamRef = maybe_dereference(State0, StreamRef0), Commands = Proto:update_flow(ProtoState, ReplyTo, StreamRef, Inc), - {state, commands(Commands, State)}. + {State, undefined} = commands(Commands, State0, undefined, undefined), + {state, State}. closing(_Reason, _State, _EvHandler, EvHandlerState) -> %% @todo Graceful shutdown must be propagated to tunnels. @@ -312,17 +327,20 @@ data(State=#tunnel_state{socket=Socket, transport=Transport, %% We pass the CONNECT request forward and optionally dereference StreamRef. connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port}, protocol=Proto, protocol_state=ProtoState0}, - StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow) -> + StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow, + EvHandler, EvHandlerState0) -> StreamRef = maybe_dereference(State, StreamRef0), - ProtoState = Proto:connect(ProtoState0, StreamRef, - ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow), - State#tunnel_state{protocol_state=ProtoState}. + {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, StreamRef, + ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow, + EvHandler, EvHandlerState0), + {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. -cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, +cancel(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> - StreamRef = maybe_dereference(State, StreamRef0), - {Commands, EvHandlerState} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}. + StreamRef = maybe_dereference(State0, StreamRef0), + {Commands, EvHandlerState1} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}. timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) -> case Proto:timeout(ProtoState0, Msg, TRef) of @@ -395,27 +413,71 @@ down(_State) -> %% @todo Tunnels must be included in the gun_down message. []. +ws_upgrade(State=#tunnel_state{info=TunnelInfo, protocol=Proto, protocol_state=ProtoState0}, + StreamRef0, ReplyTo, _, _, Path, Headers, WsOpts, EvHandler, EvHandlerState0) -> + StreamRef = maybe_dereference(State, StreamRef0), + #{ + origin_host := Host, + origin_port := Port + } = TunnelInfo, + {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0), + {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. + +ws_send(Frames, State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> + StreamRef = maybe_dereference(State0, StreamRef0), + {Commands, EvHandlerState1} = Proto:ws_send(Frames, + ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}. + %% Internal. -commands(Command, State) when not is_list(Command) -> - commands([Command], State); -commands([], State) -> - State; -commands([{state, ProtoState}|Tail], State) -> - commands(Tail, State#tunnel_state{protocol_state=ProtoState}); +commands(Command, State, EvHandler, EvHandlerState) when not is_list(Command) -> + commands([Command], State, EvHandler, EvHandlerState); +commands([], State, _, EvHandlerState) -> + {State, EvHandlerState}; +commands([{state, ProtoState}|Tail], State, EvHandler, EvHandlerState) -> + commands(Tail, State#tunnel_state{protocol_state=ProtoState}, EvHandler, EvHandlerState); %% @todo We must pass down the set_cookie commands. Have a commands_queue. -commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}) -> - commands(Tail, State); +commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}, EvHandler, EvHandlerState) -> + commands(Tail, State, EvHandler, EvHandlerState); %% @todo What to do about IsFin? -commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}) -> +commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}, + EvHandler, EvHandlerState) -> Transport:send(Socket, Data), - commands(Tail, State); -commands([Origin={origin, _Scheme, _NewHost, _NewPort, _Type}|Tail], State) -> - commands(Tail, State#tunnel_state{protocol_origin=Origin}); + commands(Tail, State, EvHandler, EvHandlerState); +commands([Origin={origin, Scheme, Host, Port, Type}|Tail], + State=#tunnel_state{stream_ref=StreamRef}, + EvHandler, EvHandlerState0) -> + EvHandlerState = EvHandler:origin_changed(#{ + stream_ref => StreamRef, + type => Type, + origin_scheme => Scheme, + origin_host => Host, + origin_port => Port + }, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol_origin=Origin}, EvHandler, EvHandlerState); +commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], + State=#tunnel_state{socket=Socket, transport=Transport, opts=Opts, + protocol_origin=undefined}, + EvHandler, EvHandlerState0) -> + {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), + %% This should only apply to Websocket for the time being. + {connected_ws_only, ProtoState} = Proto:init(ReplyTo, Socket, Transport, ProtoOpts), + #{stream_ref := StreamRef} = ProtoOpts, + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => Proto:name() + }, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], State=#tunnel_state{transport=Transport, stream_ref=TunnelStreamRef, info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto, - protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}) -> + protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}, + EvHandler, EvHandlerState0) -> StreamRef = case Type of socks5 -> TunnelStreamRef; connect -> gun_protocols:stream_ref(NewProtocol) @@ -450,13 +512,15 @@ commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], } }, Proto = gun_tunnel, - {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), -%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), - commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}); + {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo, + OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail], State=#tunnel_state{transport=Transport, info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto, - protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}) -> + protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}, + EvHandler, EvHandlerState0) -> #{ stream_ref := StreamRef, tls_opts := TLSOpts0 @@ -496,10 +560,12 @@ commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail], } }, Proto = gun_tunnel, - {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), - commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}); -commands([{active, true}|Tail], State) -> - commands(Tail, State). + {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo, + OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); +commands([{active, true}|Tail], State, EvHandler, EvHandlerState) -> + commands(Tail, State, EvHandler, EvHandlerState). continue_stream_ref(#tunnel_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}) -> if |