From 356bf47edeb5b78765200e78d9b7a48aa98b97f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 16 Oct 2020 11:33:31 +0200 Subject: Add or fix events inside or related to CONNECT tunnels --- src/gun_http2.erl | 179 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 133 insertions(+), 46 deletions(-) (limited to 'src/gun_http2.erl') diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 1072aca..8312954 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -30,12 +30,13 @@ -export([headers/11]). -export([request/12]). -export([data/7]). --export([connect/7]). +-export([connect/9]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). -export([down/1]). -%-export([ws_upgrade/10]). +-export([ws_upgrade/10]). +-export([ws_send/6]). -record(tunnel, { %% The tunnel can either go requested->established @@ -274,7 +275,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl push_promise -> push_promise_start end, EvHandler:EvCallback(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0); %% Trailers or invalid header frame. @@ -383,7 +384,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State1, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {State1, EvHandlerState1}; @@ -399,7 +400,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, {update_window(State1, StreamID), EvHandlerState0}; fin -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State1, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {update_window(State1), EvHandlerState1} @@ -421,11 +422,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, tunnel=Tunnel } = Stream, State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]}, + RealStreamRef = stream_ref(State, StreamRef), if Status >= 100, Status =< 199 -> - ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, + ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, EvHandlerState = EvHandler:response_inform(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers @@ -438,16 +440,23 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, origin_host => DestHost, origin_port => DestPort }, - %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo. - %% We therefore do not need to call stream_ref/2. - RealStreamRef = stream_ref(State, StreamRef), ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, - EvHandlerState = EvHandler:response_headers(#{ + EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers }, EvHandlerState0), + EvHandlerState2 = EvHandler:origin_changed(#{ + stream_ref => RealStreamRef, + type => connect, + origin_scheme => case Destination of + #{transport := tls} -> <<"https">>; + _ -> <<"http">> + end, + origin_host => DestHost, + origin_port => DestPort + }, EvHandlerState1), ContinueStreamRef = continue_stream_ref(State, StreamRef), OriginSocket = #{ gun_pid => self(), @@ -505,14 +514,15 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, } } end, - {tunnel, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), + {tunnel, ProtoState, EvHandlerState} = Proto:init( + ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2), {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{ info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}), EvHandlerState}; true -> - ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers}, + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers @@ -520,12 +530,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, {Handlers, EvHandlerState} = case IsFin of fin -> EvHandlerState2 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState1), {undefined, EvHandlerState2}; nofin -> - {gun_content_handler:init(ReplyTo, StreamRef, + {gun_content_handler:init(ReplyTo, RealStreamRef, Status, Headers, Handlers0), EvHandlerState1} end, %% @todo Disable the tunnel if any. @@ -537,9 +547,10 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers}, + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, ResponseEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0), @@ -552,7 +563,7 @@ rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef), {stream_error, Reason, 'Stream reset by server.'}}, EvHandlerState = EvHandler:cancel(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, endpoint => remote, reason => Reason @@ -571,9 +582,10 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID), PromisedStreamRef = make_ref(), + RealPromisedStreamRef = stream_ref(State, PromisedStreamRef), URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]), PushPromiseEvent0 = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, method => Method, uri => URI, @@ -581,8 +593,9 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, }, PushPromiseEvent = case Status of connected -> - ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), PromisedStreamRef, Method, URI, Headers}, - PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef}; + ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), + RealPromisedStreamRef, Method, URI, Headers}, + PushPromiseEvent0#{promised_stream_ref => RealPromisedStreamRef}; _ -> PushPromiseEvent0 end, @@ -744,16 +757,16 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerSt Transport:send(Socket, cow_http2:ping(0)), {State, EvHandlerState}. -%% @todo tunnel headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, - Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) -> + Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0), Authority = maps:get(authority, PseudoHeaders), RequestEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, function => ?FUNCTION_NAME, method => Method, @@ -769,7 +782,26 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path=Path}, - {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}. + {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}; +%% Tunneled request. +headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, + Path, Headers, InitialFlow, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + %% @todo We should send an error to the user if the stream isn't ready. + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ + origin_host := OriginHost, origin_port := OriginPort}}} -> + {ProtoState, EvHandlerState} = Proto:headers(ProtoState0, RealStreamRef, + ReplyTo, Method, OriginHost, OriginPort, Path, Headers, + InitialFlow, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; + #stream{tunnel=undefined} -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}, + {State, EvHandlerState0}; + error -> + {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} + end. request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, @@ -781,8 +813,9 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State0, Method, Host, Port, Path, Headers1), Authority = maps:get(authority, PseudoHeaders), + RealStreamRef = stream_ref(State0, StreamRef), RequestEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, function => ?FUNCTION_NAME, method => Method, @@ -806,7 +839,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, case IsFin of fin -> RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, {State, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; @@ -814,13 +847,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, maybe_send_data(State, StreamID, fin, Body, EvHandler, EvHandlerState) end; %% Tunneled request. -%% -%% We call Proto:request in a loop until we get to a non-CONNECT stream. -%% When the transport is gun_tls_proxy we receive the TLS data -%% as a 'data' cast; when gun_tcp_proxy we receive the 'data' cast -%% directly. The 'data' cast contains the tunnel for the StreamRef. -%% The tunnel is given as the socket and the gun_tls_proxy out_socket -%% is always a gun_tcp_proxy that sends a 'data' cast. request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of @@ -937,7 +963,7 @@ send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) -> fin -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandler:request_end(RequestEndEvent, EvHandlerState0) @@ -978,7 +1004,8 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, - Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0) + Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0, + EvHandler, EvHandlerState0) when is_reference(StreamRef) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); @@ -1004,37 +1031,59 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, Headers1 end, {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(<<"CONNECT">>, HTTP2Machine0), + RealStreamRef = stream_ref(State, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => connect, + method => <<"CONNECT">>, + authority => Authority, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, - create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream); + {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), + EvHandlerState}; %% Tunneled request. -connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow) -> +connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, + EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of %% @todo Should we send an error to the user if the stream isn't ready. Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> - ProtoState = Proto:connect(ProtoState0, RealStreamRef, - ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow), - store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}); + {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, RealStreamRef, + ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, + EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - State; + {State, EvHandlerState0}; error -> - error_stream_not_found(State, StreamRef, ReplyTo) + {error_stream_not_found(State, StreamRef, ReplyTo), + EvHandlerState0} end. cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, - StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> + StreamRef, ReplyTo, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), EvHandlerState = EvHandler:cancel(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, endpoint => local, reason => cancel @@ -1044,6 +1093,22 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} + end; +%% Tunneled request. +cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, RealStreamRef, + ReplyTo, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; + #stream{tunnel=undefined} -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}, + {State, EvHandlerState0}; + error -> + {error_stream_not_found(State, StreamRef, ReplyTo), + EvHandlerState0} end. timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, undefined, Name}, TRef) -> @@ -1123,6 +1188,28 @@ stream_info(State, RealStreamRef=[StreamRef|_]) -> down(#http2_state{stream_refs=Refs}) -> maps:keys(Refs). +%% Websocket upgrades are currently only accepted when tunneled. +ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, RealStreamRef, ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{ + tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), EvHandlerState} + %% @todo Error conditions? + end. + +ws_send(Frames, State0, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State0, StreamRef) of + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState}} -> + {Commands, EvHandlerState1} = Proto:ws_send(Frames, ProtoState, + RealStreamRef, ReplyTo, EvHandler, EvHandlerState0), + {State, EvHandlerState} = tunnel_commands(Commands, Stream, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState} + %% @todo Error conditions? + end. + connection_error(#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, streams=Streams}, Error={connection_error, Reason, HumanReadable}) -> -- cgit v1.2.3