From 356bf47edeb5b78765200e78d9b7a48aa98b97f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 16 Oct 2020 11:33:31 +0200 Subject: Add or fix events inside or related to CONNECT tunnels --- src/gun.erl | 64 ++- src/gun_event.erl | 52 +-- src/gun_http.erl | 81 ++-- src/gun_http2.erl | 179 ++++++-- src/gun_tunnel.erl | 196 ++++++--- src/gun_ws.erl | 4 + test/event_SUITE.erl | 1142 +++++++++++++++++++++++++++++++++++++++++++++--- test/rfc7231_SUITE.erl | 5 +- test/rfc7540_SUITE.erl | 5 +- 9 files changed, 1478 insertions(+), 250 deletions(-) diff --git a/src/gun.erl b/src/gun.erl index 217133f..62abd6f 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -93,6 +93,7 @@ -export([ws_upgrade/3]). -export([ws_upgrade/4]). -export([ws_send/2]). +-export([ws_send/3]). %% Internals. -export([start_link/4]). @@ -274,7 +275,8 @@ keepalive => timeout(), protocols => [{binary(), module()}], reply_to => pid(), - silence_pings => boolean() + silence_pings => boolean(), + tunnel => stream_ref() }. -export_type([ws_opts/0]). @@ -926,20 +928,26 @@ ws_upgrade(ServerPid, Path, Headers) -> StreamRef. -spec ws_upgrade(pid(), iodata(), req_headers(), ws_opts()) -> stream_ref(). -ws_upgrade(ServerPid, Path, Headers, Opts) -> +ws_upgrade(ServerPid, Path, Headers, Opts0) -> + Tunnel = get_tunnel(Opts0), + Opts = maps:without([tunnel], Opts0), ok = gun_ws:check_options(Opts), - StreamRef = make_ref(), + StreamRef = make_stream_ref(Tunnel), ReplyTo = maps:get(reply_to, Opts, self()), %% @todo Also accept tunnel option. gen_statem:cast(ServerPid, {ws_upgrade, ReplyTo, StreamRef, Path, normalize_headers(Headers), Opts}), StreamRef. %% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef. -%% But it can be kept for the time being since it can still work for HTTP/1.1. +%% But it can be kept for the time being since it can still work for HTTP/1.1 (connected_ws_only). -spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok. ws_send(ServerPid, Frames) -> gen_statem:cast(ServerPid, {ws_send, self(), Frames}). +-spec ws_send(pid(), stream_ref(), ws_frame() | [ws_frame()]) -> ok. +ws_send(ServerPid, StreamRef, Frames) -> + gen_statem:cast(ServerPid, {ws_send, self(), StreamRef, Frames}). + %% Internals. callback_mode() -> state_functions. @@ -1208,6 +1216,12 @@ connected_data_only(cast, Msg, _) connected_data_only(Type, Event, State) -> handle_common_connected(Type, Event, ?FUNCTION_NAME, State). +connected_ws_only(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{ + protocol=Protocol=gun_ws, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {Commands, EvHandlerState} = Protocol:ws_send(Frames, + ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}); connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{ protocol=Protocol=gun_ws, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> @@ -1259,13 +1273,13 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers0, Body, Init {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, - protocol=Protocol, protocol_state=ProtoState}) -> - %% @todo No events are currently handled for the CONNECT request? - ProtoState2 = Protocol:connect(ProtoState, + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:connect(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Destination, #{host => Host, port => Port}, - Headers, InitialFlow), - {keep_state, State#state{protocol_state=ProtoState2}}; + Headers, InitialFlow, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. @@ -1275,8 +1289,7 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{op connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts}, State0=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) - when Protocol =:= gun_http -> + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> EvHandlerState1 = EvHandler:ws_upgrade(#{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -1285,14 +1298,19 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts}, %% @todo Can fail if HTTP/1.0. {Headers, State} = add_cookie_header(Path, Headers0, State0), {ProtoState2, EvHandlerState} = Protocol:ws_upgrade(ProtoState, - StreamRef, ReplyTo, Host, Port, Path, Headers, WsOpts, - EvHandler, EvHandlerState1), + dereference_stream_ref(StreamRef, State), ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState1), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, - "Websocket is only supported over HTTP/1.1."}}, - keep_state_and_data; +%% @todo Maybe better standardize the protocol callbacks argument orders. +connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{ + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) + when is_list(StreamRef) -> + {Commands, EvHandlerState} = Protocol:ws_send(Frames, + ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}); +%% Catch-all for the StreamRef-free variant. connected(cast, {ws_send, ReplyTo, _}, _) -> ReplyTo ! {gun_error, self(), {badstate, "Connection needs to be upgraded to Websocket " @@ -1472,7 +1490,7 @@ handle_common_connected_no_input(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, - StreamRef, ReplyTo, EvHandler, EvHandlerState0), + dereference_stream_ref(StreamRef, State), ReplyTo, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; handle_common_connected_no_input({call, From}, {stream_info, StreamRef}, _, State=#state{intermediaries=Intermediaries0, protocol=Protocol, protocol_state=ProtoState}) -> @@ -1721,7 +1739,13 @@ commands([{switch_protocol, NewProtocol, ReplyTo}], State0=#state{ _ -> ProtoOpts0#{tunnel_transport => tcp} end, {StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts), - EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + ProtocolChangedEvent = case ProtoOpts of + #{stream_ref := StreamRef} -> + #{stream_ref => StreamRef, protocol => Protocol:name()}; + _ -> + #{protocol => Protocol:name()} + end, + EvHandlerState = EvHandler:protocol_changed(ProtocolChangedEvent, EvHandlerState0), %% We cancel the existing keepalive and, depending on the protocol, %% we enable keepalive again, effectively resetting the timer. State = keepalive_cancel(active(State0#state{protocol=Protocol, protocol_state=ProtoState, @@ -1755,6 +1779,8 @@ disconnect(State0=#state{owner=Owner, status=Status, opts=Opts, KilledStreams = Protocol:down(ProtoState), Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams}, Retry = maps:get(retry, Opts, 5), + %% @todo We need to reset the origin_scheme/host/port and the transport + %% as well as remove the intermediaries. {next_state, not_connected, keepalive_cancel(State#state{socket=undefined, protocol=undefined, protocol_state=undefined}), diff --git a/src/gun_event.erl b/src/gun_event.erl index b2a71db..513fa9f 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -62,10 +62,10 @@ %% These events occur when connecting to a TLS server or when %% upgrading the connection or stream to use TLS, for example %% using CONNECT. The stream_ref/reply_to values are only -%% present when the TLS handshake occurs as a result of a request. +%% present when the TLS handshake occurs in the scope of a request. -type tls_handshake_event() :: #{ - stream_ref => reference(), + stream_ref => gun:stream_ref(), reply_to => pid(), socket := inet:socket() | ssl:sslsocket() | pid(), %% The socket before/after will be different. tls_opts := [ssl:tls_client_option()], @@ -81,13 +81,13 @@ %% request_start/request_headers. -type request_start_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), function := headers | request | ws_upgrade, method := iodata(), scheme => binary(), authority := iodata(), - path := iodata(), + path => iodata(), headers := [{binary(), iodata()}] }. -export_type([request_start_event/0]). @@ -98,7 +98,7 @@ %% request_end. -type request_end_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid() }. -export_type([request_end_event/0]). @@ -108,7 +108,7 @@ %% push_promise_start. -type push_promise_start_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid() }. -export_type([push_promise_start_event/0]). @@ -118,12 +118,12 @@ %% push_promise_end. -type push_promise_end_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), %% No stream is created if we receive the push_promise while %% in the process of gracefully shutting down the connection. %% The promised stream is canceled immediately. - promised_stream_ref => reference(), + promised_stream_ref => gun:stream_ref(), method := binary(), uri := binary(), headers := [{binary(), iodata()}] @@ -135,7 +135,7 @@ %% response_start. -type response_start_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid() }. -export_type([response_start_event/0]). @@ -145,7 +145,7 @@ %% response_inform/response_headers. -type response_headers_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), status := non_neg_integer(), headers := [{binary(), binary()}] @@ -158,7 +158,7 @@ %% response_trailers. -type response_trailers_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), headers := [{binary(), binary()}] }. @@ -169,7 +169,7 @@ %% response_end. -type response_end_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid() }. -export_type([response_end_event/0]). @@ -186,7 +186,7 @@ %% response. -type ws_upgrade_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), opts := gun:ws_opts() }. @@ -197,7 +197,7 @@ %% ws_recv_frame_start. -type ws_recv_frame_start_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), frag_state := cow_ws:frag_state(), extensions := cow_ws:extensions() @@ -209,7 +209,7 @@ %% ws_recv_frame_header. -type ws_recv_frame_header_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), frag_state := cow_ws:frag_state(), extensions := cow_ws:extensions(), @@ -225,7 +225,7 @@ %% ws_recv_frame_end. -type ws_recv_frame_end_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), extensions := cow_ws:extensions(), close_code := undefined | cow_ws:close_code(), @@ -238,7 +238,7 @@ %% ws_send_frame_start/ws_send_frame_end. -type ws_send_frame_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), extensions := cow_ws:extensions(), frame := gun:ws_frame() @@ -251,13 +251,10 @@ %% protocol_changed. %% %% This event can occur either following a successful ws_upgrade -%% event or following a successful CONNECT request. -%% -%% @todo Currently there is only a connection-wide variant of this -%% event. In the future there will be a stream-wide variant to -%% support CONNECT and Websocket over HTTP/2. +%% event, following a successful CONNECT request or a SOCKS tunnel. -type protocol_changed_event() :: #{ + stream_ref := gun:stream_ref(), protocol := http | http2 | socks | ws }. -export_type([protocol_changed_event/0]). @@ -268,9 +265,11 @@ %% %% This event can occur following a successful CONNECT request. %% -%% @todo Currently there is only a connection-wide variant of this -%% event. In the future there will be a stream-wide variant to -%% support CONNECT through TLS proxies over HTTP/2. +%% @todo I think this event should be removed. We already know +%% about the transport being TLS via the tls_handshake events. +%% Perhaps we should provide the socket in tls_handshake_end. +%% We already do!! Therefore what's the point of this event? +%% Remove it!! -type transport_changed_event() :: #{ socket := ssl:sslsocket() | pid(), @@ -283,6 +282,7 @@ %% origin_changed. -type origin_changed_event() :: #{ + stream_ref := gun:stream_ref(), type := connect, %% @todo socks? origin_scheme := binary(), origin_host := inet:hostname() | inet:ip_address(), @@ -303,7 +303,7 @@ %% Events may still occur for a short time after the cancel. -type cancel_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), endpoint := local | remote, reason := atom() diff --git a/src/gun_http.erl b/src/gun_http.erl index e2c0f1d..d877068 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -29,7 +29,7 @@ -export([headers/11]). -export([request/12]). -export([data/7]). --export([connect/7]). +-export([connect/9]). -export([cancel/5]). -export([stream_info/2]). -export([down/1]). @@ -195,7 +195,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, {nofin, EvHandlerState0}; no_trailers -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {fin, EvHandlerState1} @@ -218,7 +218,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, {nofin, EvHandlerState0}; no_trailers -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {fin, EvHandlerState1} @@ -243,9 +243,10 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, {_, _} -> {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers}, + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, ResponseEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0), @@ -272,7 +273,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn, %% We ignore the active command because the stream ended. [{state, State1}|_] = send_data(Data, State, fin), EvHandlerState = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0), case Conn of @@ -287,7 +288,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn, %% We ignore the active command because the stream ended. [{state, State1}|_] = send_data(Body, State, fin), EvHandlerState = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State1, StreamRef), reply_to => ReplyTo }, EvHandlerState0), case Conn of @@ -396,15 +397,15 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec EvHandler, EvHandlerState0, Version, Status, Headers) -> In = response_io_from_headers(Method, Version, Status, Headers), IsFin = case In of head -> fin; _ -> nofin end, + RealStreamRef = stream_ref(State, StreamRef), %% @todo Figure out whether the event should trigger if the stream was cancelled. {Handlers, EvHandlerState2} = case IsAlive of false -> {undefined, EvHandlerState0}; true -> - ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), - IsFin, Status, Headers}, + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers @@ -413,7 +414,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec fin -> {undefined, EvHandlerState1}; nofin -> Handlers0 = maps:get(content_handlers, Opts, [gun_data_h]), - {gun_content_handler:init(ReplyTo, stream_ref(State, StreamRef), + {gun_content_handler:init(ReplyTo, RealStreamRef, Status, Headers, Handlers0), EvHandlerState1} end end, @@ -422,7 +423,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec EvHandlerState2; fin -> EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState2) end, @@ -509,7 +510,7 @@ close(Reason, State=#http_state{in=body_close, close_streams(State, Tail, close_reason(Reason)), _ = send_data(<<>>, State, fin), EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState); close(Reason, State=#http_state{streams=Streams}, _, EvHandlerState) -> @@ -601,8 +602,9 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi {undefined, _} -> Headers4; _ -> [{<<"content-length">>, integer_to_binary(iolist_size(Body))}|Headers4] end, + RealStreamRef = stream_ref(State, StreamRef), RequestEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, function => Function, method => Method, @@ -618,7 +620,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi EvHandlerState = case Out of head -> RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandler:request_end(RequestEndEvent, EvHandlerState2); @@ -672,7 +674,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, ]) end, RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), @@ -686,7 +688,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, if Length2 =:= 0, IsFin =:= fin -> RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), @@ -702,17 +704,19 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. -connect(State, StreamRef, ReplyTo, _, _, _, _) +connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) when is_list(StreamRef) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - State; -connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] -> + {State, EvHandlerState}; +connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) + when Streams =/= [] -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, - State; + {State, EvHandlerState}; connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, - StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0) -> + StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0, + EvHandler, EvHandlerState0) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); _ -> Host0 @@ -736,12 +740,29 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version Headers2 end, Headers = transform_header_names(State, Headers3), + RealStreamRef = stream_ref(State, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => connect, + method => <<"CONNECT">>, + authority => Authority, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), Transport:send(Socket, [ cow_http:request(<<"CONNECT">>, Authority, Version, Headers) ]), + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), InitialFlow = initial_flow(InitialFlow0, Opts), - new_stream(State, {connect, StreamRef, Destination}, ReplyTo, - <<"CONNECT">>, Authority, <<>>, InitialFlow). + {new_stream(State, {connect, StreamRef, Destination}, ReplyTo, + <<"CONNECT">>, Authority, <<>>, InitialFlow), + EvHandlerState}. %% We can't cancel anything, we can just stop forwarding messages to the owner. cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> @@ -749,7 +770,7 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> true -> State = cancel_stream(State0, StreamRef), EvHandlerState = EvHandler:cancel(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, endpoint => local, reason => cancel @@ -876,6 +897,11 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> %% Websocket upgrade. +ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) + when is_list(StreamRef) -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), + {badstate, "The stream is not a tunnel."}}, + {State, EvHandlerState}; ws_upgrade(State=#http_state{version='HTTP/1.0'}, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, @@ -986,9 +1012,10 @@ ws_handshake_end(Buffer, self() ! {OK, Socket, Buffer} end, %% Inform the user that the upgrade was successful and switch the protocol. - ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), [<<"websocket">>], Headers}, + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}, {switch_protocol, {ws, #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, headers => Headers, extensions => Extensions, flow => InitialFlow, diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 1072aca..8312954 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -30,12 +30,13 @@ -export([headers/11]). -export([request/12]). -export([data/7]). --export([connect/7]). +-export([connect/9]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). -export([down/1]). -%-export([ws_upgrade/10]). +-export([ws_upgrade/10]). +-export([ws_send/6]). -record(tunnel, { %% The tunnel can either go requested->established @@ -274,7 +275,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl push_promise -> push_promise_start end, EvHandler:EvCallback(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0); %% Trailers or invalid header frame. @@ -383,7 +384,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State1, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {State1, EvHandlerState1}; @@ -399,7 +400,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, {update_window(State1, StreamID), EvHandlerState0}; fin -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State1, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {update_window(State1), EvHandlerState1} @@ -421,11 +422,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, tunnel=Tunnel } = Stream, State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]}, + RealStreamRef = stream_ref(State, StreamRef), if Status >= 100, Status =< 199 -> - ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, + ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, EvHandlerState = EvHandler:response_inform(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers @@ -438,16 +440,23 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, origin_host => DestHost, origin_port => DestPort }, - %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo. - %% We therefore do not need to call stream_ref/2. - RealStreamRef = stream_ref(State, StreamRef), ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, - EvHandlerState = EvHandler:response_headers(#{ + EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers }, EvHandlerState0), + EvHandlerState2 = EvHandler:origin_changed(#{ + stream_ref => RealStreamRef, + type => connect, + origin_scheme => case Destination of + #{transport := tls} -> <<"https">>; + _ -> <<"http">> + end, + origin_host => DestHost, + origin_port => DestPort + }, EvHandlerState1), ContinueStreamRef = continue_stream_ref(State, StreamRef), OriginSocket = #{ gun_pid => self(), @@ -505,14 +514,15 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, } } end, - {tunnel, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), + {tunnel, ProtoState, EvHandlerState} = Proto:init( + ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2), {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{ info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}), EvHandlerState}; true -> - ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers}, + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers @@ -520,12 +530,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, {Handlers, EvHandlerState} = case IsFin of fin -> EvHandlerState2 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState1), {undefined, EvHandlerState2}; nofin -> - {gun_content_handler:init(ReplyTo, StreamRef, + {gun_content_handler:init(ReplyTo, RealStreamRef, Status, Headers, Handlers0), EvHandlerState1} end, %% @todo Disable the tunnel if any. @@ -537,9 +547,10 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers}, + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, ResponseEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0), @@ -552,7 +563,7 @@ rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef), {stream_error, Reason, 'Stream reset by server.'}}, EvHandlerState = EvHandler:cancel(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, endpoint => remote, reason => Reason @@ -571,9 +582,10 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID), PromisedStreamRef = make_ref(), + RealPromisedStreamRef = stream_ref(State, PromisedStreamRef), URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]), PushPromiseEvent0 = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, method => Method, uri => URI, @@ -581,8 +593,9 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, }, PushPromiseEvent = case Status of connected -> - ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), PromisedStreamRef, Method, URI, Headers}, - PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef}; + ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), + RealPromisedStreamRef, Method, URI, Headers}, + PushPromiseEvent0#{promised_stream_ref => RealPromisedStreamRef}; _ -> PushPromiseEvent0 end, @@ -744,16 +757,16 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerSt Transport:send(Socket, cow_http2:ping(0)), {State, EvHandlerState}. -%% @todo tunnel headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, - Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) -> + Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0), Authority = maps:get(authority, PseudoHeaders), RequestEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, function => ?FUNCTION_NAME, method => Method, @@ -769,7 +782,26 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path=Path}, - {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}. + {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}; +%% Tunneled request. +headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, + Path, Headers, InitialFlow, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + %% @todo We should send an error to the user if the stream isn't ready. + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ + origin_host := OriginHost, origin_port := OriginPort}}} -> + {ProtoState, EvHandlerState} = Proto:headers(ProtoState0, RealStreamRef, + ReplyTo, Method, OriginHost, OriginPort, Path, Headers, + InitialFlow, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; + #stream{tunnel=undefined} -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}, + {State, EvHandlerState0}; + error -> + {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} + end. request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, @@ -781,8 +813,9 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State0, Method, Host, Port, Path, Headers1), Authority = maps:get(authority, PseudoHeaders), + RealStreamRef = stream_ref(State0, StreamRef), RequestEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, function => ?FUNCTION_NAME, method => Method, @@ -806,7 +839,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, case IsFin of fin -> RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, {State, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; @@ -814,13 +847,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, maybe_send_data(State, StreamID, fin, Body, EvHandler, EvHandlerState) end; %% Tunneled request. -%% -%% We call Proto:request in a loop until we get to a non-CONNECT stream. -%% When the transport is gun_tls_proxy we receive the TLS data -%% as a 'data' cast; when gun_tcp_proxy we receive the 'data' cast -%% directly. The 'data' cast contains the tunnel for the StreamRef. -%% The tunnel is given as the socket and the gun_tls_proxy out_socket -%% is always a gun_tcp_proxy that sends a 'data' cast. request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of @@ -937,7 +963,7 @@ send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) -> fin -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandler:request_end(RequestEndEvent, EvHandlerState0) @@ -978,7 +1004,8 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, - Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0) + Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0, + EvHandler, EvHandlerState0) when is_reference(StreamRef) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); @@ -1004,37 +1031,59 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, Headers1 end, {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(<<"CONNECT">>, HTTP2Machine0), + RealStreamRef = stream_ref(State, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => connect, + method => <<"CONNECT">>, + authority => Authority, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, - create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream); + {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), + EvHandlerState}; %% Tunneled request. -connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow) -> +connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, + EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of %% @todo Should we send an error to the user if the stream isn't ready. Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> - ProtoState = Proto:connect(ProtoState0, RealStreamRef, - ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow), - store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}); + {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, RealStreamRef, + ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, + EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - State; + {State, EvHandlerState0}; error -> - error_stream_not_found(State, StreamRef, ReplyTo) + {error_stream_not_found(State, StreamRef, ReplyTo), + EvHandlerState0} end. cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, - StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> + StreamRef, ReplyTo, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), EvHandlerState = EvHandler:cancel(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, endpoint => local, reason => cancel @@ -1044,6 +1093,22 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} + end; +%% Tunneled request. +cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, RealStreamRef, + ReplyTo, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; + #stream{tunnel=undefined} -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}, + {State, EvHandlerState0}; + error -> + {error_stream_not_found(State, StreamRef, ReplyTo), + EvHandlerState0} end. timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, undefined, Name}, TRef) -> @@ -1123,6 +1188,28 @@ stream_info(State, RealStreamRef=[StreamRef|_]) -> down(#http2_state{stream_refs=Refs}) -> maps:keys(Refs). +%% Websocket upgrades are currently only accepted when tunneled. +ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, RealStreamRef, ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{ + tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), EvHandlerState} + %% @todo Error conditions? + end. + +ws_send(Frames, State0, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State0, StreamRef) of + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState}} -> + {Commands, EvHandlerState1} = Proto:ws_send(Frames, ProtoState, + RealStreamRef, ReplyTo, EvHandler, EvHandlerState0), + {State, EvHandlerState} = tunnel_commands(Commands, Stream, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState} + %% @todo Error conditions? + end. + connection_error(#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, streams=Streams}, Error={connection_error, Reason, HumanReadable}) -> diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl index ca9d3aa..cc58351 100644 --- a/src/gun_tunnel.erl +++ b/src/gun_tunnel.erl @@ -17,7 +17,7 @@ %% by the tunnel layer. -module(gun_tunnel). --export([init/4]). +-export([init/6]). -export([handle/4]). -export([handle_continue/5]). -export([update_flow/4]). @@ -27,13 +27,14 @@ -export([headers/11]). -export([request/12]). -export([data/7]). --export([connect/7]). +-export([connect/9]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). -export([tunneled_name/2]). -export([down/1]). -%-export([ws_upgrade/10]). +-export([ws_upgrade/10]). +-export([ws_send/6]). -record(tunnel_state, { %% Fake socket and transport. @@ -97,7 +98,8 @@ %% with some extra information added for the tunnel. %% %% @todo Mark the tunnel options as reserved. -init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel}) -> +init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel}, + EvHandler, EvHandlerState0) -> #{ type := TunnelType, transport_name := TunnelTransport, @@ -113,7 +115,10 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tcp}), -%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => Proto:name() + }, EvHandlerState0), %% When the tunnel protocol is HTTP/1.1 or SOCKS %% the gun_tunnel_up message was already sent. %% @@ -124,34 +129,36 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun _ -> ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()} end, {tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport, - protocol=Proto, protocol_state=ProtoState}}; + protocol=Proto, protocol_state=ProtoState}, + EvHandlerState}; %% We can't initialize the protocol until the TLS handshake has completed. - #{handshake_event := HandshakeEvent, protocols := Protocols} -> + #{handshake_event := HandshakeEvent0, protocols := Protocols} -> #{handle_continue_stream_ref := ContinueStreamRef} = OriginSocket, #{ origin_host := DestHost, origin_port := DestPort } = TunnelInfo, -%% @todo OK so Protocol:init/4 will need to have EvHandler/EvHandlerState! -%% Otherwise we can't do the TLS events. #{ tls_opts := TLSOpts, timeout := TLSTimeout - } = HandshakeEvent, + } = HandshakeEvent0, + HandshakeEvent = HandshakeEvent0#{socket => OriginSocket}, + EvHandlerState = EvHandler:tls_handshake_start(HandshakeEvent, EvHandlerState0), {ok, ProxyPid} = gun_tls_proxy:start_link(DestHost, DestPort, TLSOpts, TLSTimeout, OriginSocket, gun_tls_proxy_http2_connect, {handle_continue, ContinueStreamRef, HandshakeEvent, Protocols}), {tunnel, State#tunnel_state{socket=ProxyPid, transport=gun_tls_proxy, - tls_origin_socket=OriginSocket}} + tls_origin_socket=OriginSocket}, EvHandlerState} end. %% When we receive data we pass it forward directly for TCP; %% or we decrypt it and pass it via handle_continue for TLS. -handle(Data, State=#tunnel_state{transport=gun_tcp_proxy, +handle(Data, State0=#tunnel_state{transport=gun_tcp_proxy, protocol=Proto, protocol_state=ProtoState0}, EvHandler, EvHandlerState0) -> - {Commands, EvHandlerState} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}; + {Commands, EvHandlerState1} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}; handle(Data, State=#tunnel_state{transport=gun_tls_proxy, socket=ProxyPid, tls_origin_socket=OriginSocket}, _EvHandler, EvHandlerState) -> @@ -170,15 +177,19 @@ handle(Data, State=#tunnel_state{transport=gun_tls_proxy, handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated}, {handle_continue, _, HandshakeEvent, Protocols}}, State=#tunnel_state{socket=ProxyPid, stream_ref=StreamRef, opts=Opts}, - _EvHandler, EvHandlerState0) + EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> #{reply_to := ReplyTo} = HandshakeEvent, NewProtocol = gun_protocols:negotiated(Negotiated, Protocols), {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), -% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ -% socket => Socket, -% protocol => NewProtocol -% }, EvHandlerState0), + EvHandlerState1 = EvHandler:tls_handshake_end(HandshakeEvent#{ + socket => ProxyPid, + protocol => NewProtocol + }, EvHandlerState0), + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => NewProtocol + }, EvHandlerState1), %% @todo Terminate the current protocol or something? OriginSocket = #{ gun_pid => self(), @@ -188,15 +199,15 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated}, {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tls}), ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}, - {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState0}; + {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState}; handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason}, - {handle_continue, _, _HandshakeEvent, _}}, - #tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0) + {handle_continue, _, HandshakeEvent, _}}, + #tunnel_state{socket=ProxyPid}, EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> -%% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ -%% error => Reason -%% }, EvHandlerState0), -%%% @todo + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + error => Reason + }, EvHandlerState0), +%% @todo %% The TCP connection can be closed by either peer. The END_STREAM flag %% on a DATA frame is treated as being equivalent to the TCP FIN bit. A %% client is expected to send a DATA frame with the END_STREAM flag set @@ -206,18 +217,19 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason}, %% receives a TCP segment with the FIN bit set sends a DATA frame with %% the END_STREAM flag set. Note that the final TCP segment or DATA %% frame could be empty. - {{error, Reason}, EvHandlerState0}; + {{error, Reason}, EvHandlerState}; %% Send the data. This causes TLS to encrypt the data and send it to the inner layer. handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data}, #tunnel_state{}, _EvHandler, EvHandlerState) when is_reference(ContinueStreamRef) -> {{send, IsFin, Data}, EvHandlerState}; handle_continue(ContinueStreamRef, {tls_proxy, ProxyPid, Data}, - State=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState}, + State0=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState}, EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> - {Commands, EvHandlerState} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}; + {Commands, EvHandlerState1} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}; handle_continue(ContinueStreamRef, {tls_proxy_closed, ProxyPid}, #tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> @@ -233,21 +245,24 @@ handle_continue(ContinueStreamRef, {tls_proxy_error, ProxyPid, Reason}, %% %% @todo Assert StreamRef to be our reference(). handle_continue([_StreamRef|ContinueStreamRef0], Msg, - State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, EvHandler, EvHandlerState0) -> ContinueStreamRef = case ContinueStreamRef0 of [CSR] -> CSR; _ -> ContinueStreamRef0 end, - {Commands, EvHandlerState} = Proto:handle_continue(ContinueStreamRef, + {Commands, EvHandlerState1} = Proto:handle_continue(ContinueStreamRef, Msg, ProtoState, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}. + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}. -update_flow(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, +%% @todo This function will need EvHandler/EvHandlerState? +update_flow(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, ReplyTo, StreamRef0, Inc) -> - StreamRef = maybe_dereference(State, StreamRef0), + StreamRef = maybe_dereference(State0, StreamRef0), Commands = Proto:update_flow(ProtoState, ReplyTo, StreamRef, Inc), - {state, commands(Commands, State)}. + {State, undefined} = commands(Commands, State0, undefined, undefined), + {state, State}. closing(_Reason, _State, _EvHandler, EvHandlerState) -> %% @todo Graceful shutdown must be propagated to tunnels. @@ -312,17 +327,20 @@ data(State=#tunnel_state{socket=Socket, transport=Transport, %% We pass the CONNECT request forward and optionally dereference StreamRef. connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port}, protocol=Proto, protocol_state=ProtoState0}, - StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow) -> + StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow, + EvHandler, EvHandlerState0) -> StreamRef = maybe_dereference(State, StreamRef0), - ProtoState = Proto:connect(ProtoState0, StreamRef, - ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow), - State#tunnel_state{protocol_state=ProtoState}. + {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, StreamRef, + ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow, + EvHandler, EvHandlerState0), + {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. -cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, +cancel(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> - StreamRef = maybe_dereference(State, StreamRef0), - {Commands, EvHandlerState} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}. + StreamRef = maybe_dereference(State0, StreamRef0), + {Commands, EvHandlerState1} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}. timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) -> case Proto:timeout(ProtoState0, Msg, TRef) of @@ -395,27 +413,71 @@ down(_State) -> %% @todo Tunnels must be included in the gun_down message. []. +ws_upgrade(State=#tunnel_state{info=TunnelInfo, protocol=Proto, protocol_state=ProtoState0}, + StreamRef0, ReplyTo, _, _, Path, Headers, WsOpts, EvHandler, EvHandlerState0) -> + StreamRef = maybe_dereference(State, StreamRef0), + #{ + origin_host := Host, + origin_port := Port + } = TunnelInfo, + {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0), + {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. + +ws_send(Frames, State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> + StreamRef = maybe_dereference(State0, StreamRef0), + {Commands, EvHandlerState1} = Proto:ws_send(Frames, + ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}. + %% Internal. -commands(Command, State) when not is_list(Command) -> - commands([Command], State); -commands([], State) -> - State; -commands([{state, ProtoState}|Tail], State) -> - commands(Tail, State#tunnel_state{protocol_state=ProtoState}); +commands(Command, State, EvHandler, EvHandlerState) when not is_list(Command) -> + commands([Command], State, EvHandler, EvHandlerState); +commands([], State, _, EvHandlerState) -> + {State, EvHandlerState}; +commands([{state, ProtoState}|Tail], State, EvHandler, EvHandlerState) -> + commands(Tail, State#tunnel_state{protocol_state=ProtoState}, EvHandler, EvHandlerState); %% @todo We must pass down the set_cookie commands. Have a commands_queue. -commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}) -> - commands(Tail, State); +commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}, EvHandler, EvHandlerState) -> + commands(Tail, State, EvHandler, EvHandlerState); %% @todo What to do about IsFin? -commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}) -> +commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}, + EvHandler, EvHandlerState) -> Transport:send(Socket, Data), - commands(Tail, State); -commands([Origin={origin, _Scheme, _NewHost, _NewPort, _Type}|Tail], State) -> - commands(Tail, State#tunnel_state{protocol_origin=Origin}); + commands(Tail, State, EvHandler, EvHandlerState); +commands([Origin={origin, Scheme, Host, Port, Type}|Tail], + State=#tunnel_state{stream_ref=StreamRef}, + EvHandler, EvHandlerState0) -> + EvHandlerState = EvHandler:origin_changed(#{ + stream_ref => StreamRef, + type => Type, + origin_scheme => Scheme, + origin_host => Host, + origin_port => Port + }, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol_origin=Origin}, EvHandler, EvHandlerState); +commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], + State=#tunnel_state{socket=Socket, transport=Transport, opts=Opts, + protocol_origin=undefined}, + EvHandler, EvHandlerState0) -> + {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), + %% This should only apply to Websocket for the time being. + {connected_ws_only, ProtoState} = Proto:init(ReplyTo, Socket, Transport, ProtoOpts), + #{stream_ref := StreamRef} = ProtoOpts, + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => Proto:name() + }, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], State=#tunnel_state{transport=Transport, stream_ref=TunnelStreamRef, info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto, - protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}) -> + protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}, + EvHandler, EvHandlerState0) -> StreamRef = case Type of socks5 -> TunnelStreamRef; connect -> gun_protocols:stream_ref(NewProtocol) @@ -450,13 +512,15 @@ commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], } }, Proto = gun_tunnel, - {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), -%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), - commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}); + {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo, + OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail], State=#tunnel_state{transport=Transport, info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto, - protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}) -> + protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}, + EvHandler, EvHandlerState0) -> #{ stream_ref := StreamRef, tls_opts := TLSOpts0 @@ -496,10 +560,12 @@ commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail], } }, Proto = gun_tunnel, - {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), - commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}); -commands([{active, true}|Tail], State) -> - commands(Tail, State). + {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo, + OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); +commands([{active, true}|Tail], State, EvHandler, EvHandlerState) -> + commands(Tail, State, EvHandler, EvHandlerState). continue_stream_ref(#tunnel_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}) -> if diff --git a/src/gun_ws.erl b/src/gun_ws.erl index f59c19c..cd81d65 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -26,6 +26,7 @@ -export([close/4]). -export([keepalive/3]). -export([ws_send/5]). +-export([ws_send/6]). -export([down/1]). -record(payload, { @@ -289,6 +290,9 @@ ws_send([Frame|Tail], State, ReplyTo, EvHandler, EvHandlerState0) -> Other end. +ws_send(Frames, State, _StreamRef, ReplyTo, EvHandler, EvHandlerState) -> + ws_send(Frames, State, ReplyTo, EvHandler, EvHandlerState). + %% Websocket has no concept of streams. down(_) -> []. diff --git a/test/event_SUITE.erl b/test/event_SUITE.erl index 5510e8c..6f2f1fa 100644 --- a/test/event_SUITE.erl +++ b/test/event_SUITE.erl @@ -37,7 +37,7 @@ groups() -> %% We currently do not support Websocket over HTTP/2. WsTests = [T || T <- Tests, lists:sublist(atom_to_list(T), 3) =:= "ws_"], [ - {http, [parallel], Tests -- [cancel_remote|PushTests]}, + {http, [parallel], Tests -- [cancel_remote, cancel_remote_connect|PushTests]}, {http2, [parallel], (Tests -- WsTests) -- HTTP1Tests} ]. @@ -193,6 +193,8 @@ connect_end_ok_tls(Config) -> false = maps:is_key(protocol, Event), gun:close(Pid). +%% tls_handshake_start/tls_handshake_end. + tls_handshake_start(Config) -> doc("Confirm that the tls_handshake_start event callback is called."), {ok, Pid, _} = do_gun_open_tls(Config), @@ -236,17 +238,19 @@ tls_handshake_end_ok(Config) -> true = is_tuple(Socket), gun:close(Pid). -http1_tls_handshake_start_connect(Config) -> +tls_handshake_start_tcp_connect_tls(Config) -> doc("Confirm that the tls_handshake_start event callback is called " "when using CONNECT to a TLS server via a TCP proxy."), OriginPort = config(tls_origin_port, Config), - {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ event_handler => {?MODULE, self()}, - protocols => [config(name, config(tc_group_properties, Config))], + protocols => [Protocol], transport => tcp }), - {ok, http} = gun:await_up(ConnPid), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), StreamRef = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, @@ -260,21 +264,26 @@ http1_tls_handshake_start_connect(Config) -> tls_opts := _, timeout := _ } = do_receive_event(tls_handshake_start), - true = is_port(Socket), + true = case Protocol of + http -> is_port(Socket); + http2 -> is_map(Socket) + end, gun:close(ConnPid). -http1_tls_handshake_end_error_connect(Config) -> +tls_handshake_end_error_tcp_connect_tls(Config) -> doc("Confirm that the tls_handshake_end event callback is called on TLS handshake error " "when using CONNECT to a TLS server via a TCP proxy."), %% We use the wrong port on purpose to trigger a handshake error. OriginPort = config(tcp_origin_port, Config), - {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ event_handler => {?MODULE, self()}, - protocols => [config(name, config(tc_group_properties, Config))], + protocols => [Protocol], transport => tcp }), - {ok, http} = gun:await_up(ConnPid), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), StreamRef = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, @@ -289,20 +298,25 @@ http1_tls_handshake_end_error_connect(Config) -> timeout := _, error := {tls_alert, _} } = do_receive_event(tls_handshake_end), - true = is_port(Socket), + true = case Protocol of + http -> is_port(Socket); + http2 -> is_map(Socket) + end, gun:close(ConnPid). -http1_tls_handshake_end_ok_connect(Config) -> +tls_handshake_end_ok_tcp_connect_tls(Config) -> doc("Confirm that the tls_handshake_end event callback is called on TLS handshake success " "when using CONNECT to a TLS server via a TCP proxy."), OriginPort = config(tls_origin_port, Config), - {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ event_handler => {?MODULE, self()}, - protocols => [config(name, config(tc_group_properties, Config))], + protocols => [Protocol], transport => tcp }), - {ok, http} = gun:await_up(ConnPid), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), StreamRef = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, @@ -317,20 +331,25 @@ http1_tls_handshake_end_ok_connect(Config) -> timeout := _, protocol := http2 } = do_receive_event(tls_handshake_end), - true = is_tuple(Socket), + true = case Protocol of + http -> is_tuple(Socket); + http2 -> is_pid(Socket) + end, gun:close(ConnPid). -http1_tls_handshake_start_connect_over_https_proxy(Config) -> +tls_handshake_start_tls_connect_tls(Config) -> doc("Confirm that the tls_handshake_start event callback is called " "when using CONNECT to a TLS server via a TLS proxy."), OriginPort = config(tls_origin_port, Config), - {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tls), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tls), {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ event_handler => {?MODULE, self()}, - protocols => [config(name, config(tc_group_properties, Config))], + protocols => [Protocol], transport => tls }), - {ok, http} = gun:await_up(ConnPid), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), %% We skip the TLS handshake event to the TLS proxy. _ = do_receive_event(tls_handshake_start), StreamRef = gun:connect(ConnPid, #{ @@ -346,21 +365,26 @@ http1_tls_handshake_start_connect_over_https_proxy(Config) -> tls_opts := _, timeout := _ } = do_receive_event(tls_handshake_start), - true = is_tuple(Socket), + true = case Protocol of + http -> is_tuple(Socket); + http2 -> is_map(Socket) + end, gun:close(ConnPid). -http1_tls_handshake_end_error_connect_over_https_proxy(Config) -> +tls_handshake_end_error_tls_connect_tls(Config) -> doc("Confirm that the tls_handshake_end event callback is called on TLS handshake error " "when using CONNECT to a TLS server via a TLS proxy."), %% We use the wrong port on purpose to trigger a handshake error. OriginPort = config(tcp_origin_port, Config), - {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tls), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tls), {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ event_handler => {?MODULE, self()}, - protocols => [config(name, config(tc_group_properties, Config))], + protocols => [Protocol], transport => tls }), - {ok, http} = gun:await_up(ConnPid), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), %% We skip the TLS handshake event to the TLS proxy. _ = do_receive_event(tls_handshake_end), StreamRef = gun:connect(ConnPid, #{ @@ -377,20 +401,25 @@ http1_tls_handshake_end_error_connect_over_https_proxy(Config) -> timeout := _, error := {tls_alert, _} } = do_receive_event(tls_handshake_end), - true = is_tuple(Socket), + true = case Protocol of + http -> is_tuple(Socket); + http2 -> is_map(Socket) + end, gun:close(ConnPid). -http1_tls_handshake_end_ok_connect_over_https_proxy(Config) -> +tls_handshake_end_ok_tls_connect_tls(Config) -> doc("Confirm that the tls_handshake_end event callback is called on TLS handshake success " "when using CONNECT to a TLS server via a TLS proxy."), OriginPort = config(tls_origin_port, Config), - {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tls), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tls), {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ event_handler => {?MODULE, self()}, - protocols => [config(name, config(tc_group_properties, Config))], + protocols => [Protocol], transport => tls }), - {ok, http} = gun:await_up(ConnPid), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), %% We skip the TLS handshake event to the TLS proxy. _ = do_receive_event(tls_handshake_end), StreamRef = gun:connect(ConnPid, #{ @@ -410,6 +439,8 @@ http1_tls_handshake_end_ok_connect_over_https_proxy(Config) -> true = is_pid(Socket), gun:close(ConnPid). +%% request_start/request_headers/request_end. + request_start(Config) -> doc("Confirm that the request_start event callback is called."), do_request_event(Config, ?FUNCTION_NAME), @@ -458,6 +489,110 @@ do_request_event_headers(Config, EventName) -> Authority = iolist_to_binary(EventAuthority), gun:close(Pid). +request_start_connect(Config) -> + doc("Confirm that the request_start event callback is called " + "for requests going through a CONNECT proxy."), + do_request_event_connect(Config, request_start), + do_request_event_headers_connect(Config, request_start). + +request_headers_connect(Config) -> + doc("Confirm that the request_headers event callback is called " + "for requests going through a CONNECT proxy."), + do_request_event_connect(Config, request_headers), + do_request_event_headers_connect(Config, request_headers). + +do_request_event_connect(Config, EventName) -> + OriginPort = config(tcp_origin_port, Config), + Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]), + Protocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + #{ + stream_ref := StreamRef1, + reply_to := ReplyTo, + function := connect, + method := <<"CONNECT">>, + authority := EventAuthority1, + headers := Headers1 + } = do_receive_event(EventName), + Authority = iolist_to_binary(EventAuthority1), + %% Gun doesn't send headers with an HTTP/2 CONNECT request + %% so we only check that the headers are given as a list. + true = is_list(Headers1), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:get(ConnPid, "/", [], #{tunnel => StreamRef1}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + function := request, + method := <<"GET">>, + authority := EventAuthority2, + path := "/", + headers := [_|_] + } = do_receive_event(EventName), + Authority = iolist_to_binary(EventAuthority2), + gun:close(ConnPid). + +do_request_event_headers_connect(Config, EventName) -> + OriginPort = config(tcp_origin_port, Config), + Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]), + Protocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + #{ + stream_ref := StreamRef1, + reply_to := ReplyTo, + function := connect, + method := <<"CONNECT">>, + authority := EventAuthority1, + headers := Headers1 + } = do_receive_event(EventName), + Authority = iolist_to_binary(EventAuthority1), + %% Gun doesn't send headers with an HTTP/2 CONNECT request + %% so we only check that the headers are given as a list. + true = is_list(Headers1), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:put(ConnPid, "/", [ + {<<"content-type">>, <<"text/plain">>} + ], #{tunnel => StreamRef1}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + function := headers, + method := <<"PUT">>, + authority := EventAuthority2, + path := "/", + headers := [_|_] + } = do_receive_event(EventName), + Authority = iolist_to_binary(EventAuthority2), + gun:close(ConnPid). + request_end(Config) -> doc("Confirm that the request_end event callback is called."), do_request_end(Config, ?FUNCTION_NAME), @@ -522,6 +657,149 @@ do_request_end_headers_content_length_0(Config, EventName) -> } = do_receive_event(EventName), gun:close(Pid). +request_end_connect(Config) -> + doc("Confirm that the request_end event callback is called " + "for requests going through a CONNECT proxy."), + do_request_end_connect(Config, request_end), + do_request_end_headers_connect(Config, request_end), + do_request_end_headers_content_length_connect(Config, request_end), + do_request_end_headers_content_length_0_connect(Config, request_end). + +do_request_end_connect(Config, EventName) -> + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + #{ + stream_ref := StreamRef1, + reply_to := ReplyTo + } = do_receive_event(EventName), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:get(ConnPid, "/", [], #{tunnel => StreamRef1}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(EventName), + gun:close(ConnPid). + +do_request_end_headers_connect(Config, EventName) -> + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + #{ + stream_ref := StreamRef1, + reply_to := ReplyTo + } = do_receive_event(EventName), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:put(ConnPid, "/", [ + {<<"content-type">>, <<"text/plain">>} + ], #{tunnel => StreamRef1}), + gun:data(ConnPid, StreamRef2, nofin, <<"Hello ">>), + gun:data(ConnPid, StreamRef2, fin, <<"world!">>), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(EventName), + gun:close(ConnPid). + +do_request_end_headers_content_length_connect(Config, EventName) -> + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + #{ + stream_ref := StreamRef1, + reply_to := ReplyTo + } = do_receive_event(EventName), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:put(ConnPid, "/", [ + {<<"content-type">>, <<"text/plain">>}, + {<<"content-length">>, <<"12">>} + ], #{tunnel => StreamRef1}), + gun:data(ConnPid, StreamRef2, nofin, <<"Hello ">>), + gun:data(ConnPid, StreamRef2, fin, <<"world!">>), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(EventName), + gun:close(ConnPid). + +do_request_end_headers_content_length_0_connect(Config, EventName) -> + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + #{ + stream_ref := StreamRef1, + reply_to := ReplyTo + } = do_receive_event(EventName), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:put(ConnPid, "/", [ + {<<"content-type">>, <<"text/plain">>}, + {<<"content-length">>, <<"0">>} + ], #{tunnel => StreamRef1}), + gun:data(ConnPid, StreamRef2, fin, <<>>), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(EventName), + gun:close(ConnPid). + +%% push_promise_start/push_promise_end. + push_promise_start(Config) -> doc("Confirm that the push_promise_start event callback is called."), {ok, Pid, _} = do_gun_open(Config), @@ -538,6 +816,41 @@ push_promise_start(Config) -> } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). +push_promise_start_connect(Config) -> + doc("Confirm that the push_promise_start event callback is called " + "for requests going through a CONNECT proxy."), + do_push_promise_start_connect(Config, http), + do_push_promise_start_connect(Config, http2). + +do_push_promise_start_connect(Config, ProxyProtocol) -> + OriginPort = config(tcp_origin_port, Config), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [ProxyProtocol] + }), + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [http2] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, http2} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:get(ConnPid, "/push", [], #{tunnel => StreamRef1}), + ReplyTo = self(), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(push_promise_start), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(push_promise_start), + gun:close(ConnPid). + push_promise_end(Config) -> doc("Confirm that the push_promise_end event callback is called."), {ok, Pid, _} = do_gun_open(Config), @@ -562,6 +875,49 @@ push_promise_end(Config) -> } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). +push_promise_end_connect(Config) -> + doc("Confirm that the push_promise_end event callback is called " + "for requests going through a CONNECT proxy."), + do_push_promise_end_connect(Config, http), + do_push_promise_end_connect(Config, http2). + +do_push_promise_end_connect(Config, ProxyProtocol) -> + OriginPort = config(tcp_origin_port, Config), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [ProxyProtocol] + }), + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [http2] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, http2} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:get(ConnPid, "/push", [], #{tunnel => StreamRef1}), + ReplyTo = self(), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + promised_stream_ref := [StreamRef1|_], + method := <<"GET">>, + uri := <<"http://",_/bits>>, + headers := [_|_] + } = do_receive_event(push_promise_end), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + promised_stream_ref := [StreamRef1|_], + method := <<"GET">>, + uri := <<"http://",_/bits>>, + headers := [_|_] + } = do_receive_event(push_promise_end), + gun:close(ConnPid). + push_promise_followed_by_response(Config) -> doc("Confirm that the push_promise_end event callbacks are followed by response_start."), {ok, Pid, _} = do_gun_open(Config), @@ -574,8 +930,10 @@ push_promise_followed_by_response(Config) -> true = lists:member(PromisedStreamRef, [StreamRef1, StreamRef2, StreamRef3]), gun:close(Pid). +%% response_start/response_inform/response_headers/response_trailers/response_end. + response_start(Config) -> - doc("Confirm that the request_start event callback is called."), + doc("Confirm that the response_start event callback is called."), {ok, Pid, _} = do_gun_open(Config), {ok, _} = gun:await_up(Pid), StreamRef = gun:get(Pid, "/"), @@ -586,8 +944,40 @@ response_start(Config) -> } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). +response_start_connect(Config) -> + doc("Confirm that the response_start event callback is called " + "for requests going through a CONNECT proxy."), + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + #{ + stream_ref := StreamRef1, + reply_to := ReplyTo + } = do_receive_event(response_start), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:get(ConnPid, "/", [], #{tunnel => StreamRef1}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(response_start), + gun:close(ConnPid). + response_inform(Config) -> - doc("Confirm that the request_inform event callback is called."), + doc("Confirm that the response_inform event callback is called."), {ok, Pid, _} = do_gun_open(Config), {ok, _} = gun:await_up(Pid), StreamRef = gun:get(Pid, "/inform"), @@ -606,8 +996,44 @@ response_inform(Config) -> } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). +response_inform_connect(Config) -> + doc("Confirm that the response_inform event callback is called " + "for requests going through a CONNECT proxy."), + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:get(ConnPid, "/inform", [], #{tunnel => StreamRef1}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + status := 103, + headers := [_|_] + } = do_receive_event(response_inform), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + status := 103, + headers := [_|_] + } = do_receive_event(response_inform), + gun:close(ConnPid). + response_headers(Config) -> - doc("Confirm that the request_headers event callback is called."), + doc("Confirm that the response_headers event callback is called."), {ok, Pid, _} = do_gun_open(Config), {ok, _} = gun:await_up(Pid), StreamRef = gun:get(Pid, "/"), @@ -620,21 +1046,74 @@ response_headers(Config) -> } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). +response_headers_connect(Config) -> + doc("Confirm that the response_headers event callback is called " + "for requests going through a CONNECT proxy."), + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + #{ + stream_ref := StreamRef1, + reply_to := ReplyTo, + status := 200, + headers := Headers1 + } = do_receive_event(response_headers), + true = is_list(Headers1), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:get(ConnPid, "/", [], #{tunnel => StreamRef1}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + status := 200, + headers := [_|_] + } = do_receive_event(response_headers), + gun:close(ConnPid). + response_trailers(Config) -> - doc("Confirm that the request_trailers event callback is called."), - {ok, Pid, _} = do_gun_open(Config), - {ok, _} = gun:await_up(Pid), - StreamRef = gun:get(Pid, "/trailers", [{<<"te">>, <<"trailers">>}]), + doc("Confirm that the response_trailers event callback is called " + "for requests going through a CONNECT proxy."), + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:get(ConnPid, "/trailers", [{<<"te">>, <<"trailers">>}], #{tunnel => StreamRef1}), #{ - stream_ref := StreamRef, + stream_ref := StreamRef2, reply_to := ReplyTo, headers := [_|_] - } = do_receive_event(?FUNCTION_NAME), - gun:close(Pid). + } = do_receive_event(response_trailers), + gun:close(ConnPid). response_end(Config) -> - doc("Confirm that the request_headers event callback is called."), + doc("Confirm that the response_end event callback is called."), do_response_end(Config, ?FUNCTION_NAME, "/"), do_response_end(Config, ?FUNCTION_NAME, "/empty"), do_response_end(Config, ?FUNCTION_NAME, "/stream"), @@ -651,8 +1130,47 @@ do_response_end(Config, EventName, Path) -> } = do_receive_event(EventName), gun:close(Pid). +response_end_connect(Config) -> + doc("Confirm that the response_end event callback is called " + "for requests going through a CONNECT proxy."), + do_response_end_connect(Config, response_end, "/"), + do_response_end_connect(Config, response_end, "/empty"), + do_response_end_connect(Config, response_end, "/stream"), + do_response_end_connect(Config, response_end, "/trailers"). + +do_response_end_connect(Config, EventName, Path) -> + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }, []), + %% @todo Figure out whether the response should end when the tunnel is established. +% #{ +% stream_ref := StreamRef1, +% reply_to := ReplyTo +% } = do_receive_event(EventName), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:get(ConnPid, Path, [{<<"te">>, <<"trailers">>}], #{tunnel => StreamRef1}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(EventName), + gun:close(ConnPid). + http1_response_end_body_close(Config) -> - doc("Confirm that the request_headers event callback is called " + doc("Confirm that the response_end event callback is called " "when using HTTP/1.0 and the content-length header is not set."), OriginPort = config(tcp_origin_port, Config), Opts = #{ @@ -670,6 +1188,43 @@ http1_response_end_body_close(Config) -> } = do_receive_event(response_end), gun:close(Pid). +%% @todo Figure out how to test both this and TLS handshake errors. Maybe a proxy option? +%response_end_body_close_connect(Config) -> +% doc("Confirm that the response_end event callback is called " +% "when using HTTP/1.0 and the content-length header is not set " +% "for requests going through a CONNECT proxy."), +% OriginPort = config(tcp_origin_port, Config), +% Protocol = config(name, config(tc_group_properties, Config)), +% ReplyTo = self(), +% {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), +% {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ +% event_handler => {?MODULE, self()}, +% protocols => [Protocol] +% }), +% {ok, Protocol} = gun:await_up(ConnPid), +% tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), +% StreamRef1 = gun:connect(ConnPid, #{ +% host => "localhost", +% port => OriginPort, +% protocols => [{http, #{version => 'HTTP/1.0'}}] +% }, []), +% %% @todo Figure out whether the response should end when the tunnel is established. +%% #{ +%% stream_ref := StreamRef1, +%% reply_to := ReplyTo +%% } = do_receive_event(EventName), +% %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... +% {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), +% {up, http} = gun:await(ConnPid, StreamRef1), +% StreamRef2 = gun:get(ConnPid, "/stream", [], #{tunnel => StreamRef1}), +% #{ +% stream_ref := StreamRef2, +% reply_to := ReplyTo +% } = do_receive_event(response_end), +% gun:close(ConnPid). + +%% ws_upgrade. + ws_upgrade(Config) -> doc("Confirm that the ws_upgrade event callback is called."), {ok, Pid, _} = do_gun_open(Config), @@ -683,6 +1238,39 @@ ws_upgrade(Config) -> } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). +ws_upgrade_connect(Config) -> + doc("Confirm that the ws_upgrade event callback is called " + "for requests going through a CONNECT proxy."), + do_ws_upgrade_connect(Config, http), + do_ws_upgrade_connect(Config, http2). + +do_ws_upgrade_connect(Config, ProxyProtocol) -> + OriginPort = config(tcp_origin_port, Config), + OriginProtocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [ProxyProtocol] + }), + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [OriginProtocol] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + opts := #{} + } = do_receive_event(ws_upgrade), + gun:close(ConnPid). + ws_upgrade_all_events(Config) -> doc("Confirm that a Websocket upgrade triggers all relevant events."), {ok, Pid, OriginPort} = do_gun_open(Config), @@ -730,10 +1318,92 @@ ws_upgrade_all_events(Config) -> headers := [_|_] } = do_receive_event(response_inform), #{ + stream_ref := StreamRef, protocol := ws } = do_receive_event(protocol_changed), gun:close(Pid). +ws_upgrade_all_events_connect(Config) -> + doc("Confirm that a Websocket upgrade triggers all relevant events " + "for requests going through a CONNECT proxy."), + do_ws_upgrade_all_events_connect(Config, http), + do_ws_upgrade_all_events_connect(Config, http2). + +do_ws_upgrade_all_events_connect(Config, ProxyProtocol) -> + OriginPort = config(tcp_origin_port, Config), + OriginProtocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [ProxyProtocol] + }), + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [OriginProtocol] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + %% Skip all CONNECT-related events that may conflict. + _ = do_receive_event(request_start), + _ = do_receive_event(request_headers), + _ = do_receive_event(request_end), + _ = do_receive_event(response_start), + _ = do_receive_event(protocol_changed), + %% Check the Websocket events. + StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + opts := #{} + } = do_receive_event(ws_upgrade), + Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + function := ws_upgrade, + method := <<"GET">>, + authority := EventAuthority1, + path := "/ws", + headers := [_|_] + } = do_receive_event(request_start), + Authority = iolist_to_binary(EventAuthority1), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + function := ws_upgrade, + method := <<"GET">>, + authority := EventAuthority2, + path := "/ws", + headers := [_|_] + } = do_receive_event(request_headers), + Authority = iolist_to_binary(EventAuthority2), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(request_end), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(response_start), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + status := 101, + headers := [_|_] + } = do_receive_event(response_inform), + #{ + stream_ref := StreamRef2, + protocol := ws + } = do_receive_event(protocol_changed), + gun:close(ConnPid). + +%% ws_recv_frame_start/ws_recv_frame_header/ws_recv_frame_end. + ws_recv_frame_start(Config) -> doc("Confirm that the ws_recv_frame_start event callback is called."), {ok, Pid, _} = do_gun_open(Config), @@ -750,6 +1420,42 @@ ws_recv_frame_start(Config) -> } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). +ws_recv_frame_start_connect(Config) -> + doc("Confirm that the ws_recv_frame_start event callback is called " + "for requests going through a CONNECT proxy."), + do_ws_recv_frame_start_connect(Config, http), + do_ws_recv_frame_start_connect(Config, http2). + +do_ws_recv_frame_start_connect(Config, ProxyProtocol) -> + OriginPort = config(tcp_origin_port, Config), + OriginProtocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [ProxyProtocol] + }), + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [OriginProtocol] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2), + gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + frag_state := undefined, + extensions := #{} + } = do_receive_event(ws_recv_frame_start), + gun:close(ConnPid). + ws_recv_frame_header(Config) -> doc("Confirm that the ws_recv_frame_header event callback is called."), {ok, Pid, _} = do_gun_open(Config), @@ -770,6 +1476,46 @@ ws_recv_frame_header(Config) -> } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). +ws_recv_frame_header_connect(Config) -> + doc("Confirm that the ws_recv_frame_header event callback is called " + "for requests going through a CONNECT proxy."), + do_ws_recv_frame_header_connect(Config, http), + do_ws_recv_frame_header_connect(Config, http2). + +do_ws_recv_frame_header_connect(Config, ProxyProtocol) -> + OriginPort = config(tcp_origin_port, Config), + OriginProtocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [ProxyProtocol] + }), + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [OriginProtocol] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2), + gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + frag_state := undefined, + extensions := #{}, + type := text, + rsv := <<0:3>>, + len := 6, + mask_key := _ + } = do_receive_event(ws_recv_frame_header), + gun:close(ConnPid). + ws_recv_frame_end(Config) -> doc("Confirm that the ws_recv_frame_end event callback is called."), {ok, Pid, _} = do_gun_open(Config), @@ -787,6 +1533,45 @@ ws_recv_frame_end(Config) -> } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). +ws_recv_frame_end_connect(Config) -> + doc("Confirm that the ws_recv_frame_end event callback is called " + "for requests going through a CONNECT proxy."), + do_ws_recv_frame_end_connect(Config, http), + do_ws_recv_frame_end_connect(Config, http2). + +do_ws_recv_frame_end_connect(Config, ProxyProtocol) -> + OriginPort = config(tcp_origin_port, Config), + OriginProtocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [ProxyProtocol] + }), + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [OriginProtocol] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2), + gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + extensions := #{}, + close_code := undefined, + payload := <<"Hello!">> + } = do_receive_event(ws_recv_frame_end), + gun:close(ConnPid). + +%% ws_send_frame_start/ws_send_frame_end. + ws_send_frame_start(Config) -> doc("Confirm that the ws_send_frame_start event callback is called."), do_ws_send_frame(Config, ?FUNCTION_NAME). @@ -810,6 +1595,50 @@ do_ws_send_frame(Config, EventName) -> } = do_receive_event(EventName), gun:close(Pid). +ws_send_frame_start_connect(Config) -> + doc("Confirm that the ws_send_frame_start event callback is called " + "for requests going through a CONNECT proxy."), + do_ws_send_frame_connect(Config, http, ws_send_frame_start), + do_ws_send_frame_connect(Config, http2, ws_send_frame_start). + +ws_send_frame_end_connect(Config) -> + doc("Confirm that the ws_send_frame_end event callback is called " + "for requests going through a CONNECT proxy."), + do_ws_send_frame_connect(Config, http, ws_send_frame_end), + do_ws_send_frame_connect(Config, http2, ws_send_frame_end). + +do_ws_send_frame_connect(Config, ProxyProtocol, EventName) -> + OriginPort = config(tcp_origin_port, Config), + OriginProtocol = config(name, config(tc_group_properties, Config)), + ReplyTo = self(), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [ProxyProtocol] + }), + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [OriginProtocol] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2), + gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + extensions := #{}, + frame := {text, <<"Hello!">>} + } = do_receive_event(EventName), + gun:close(ConnPid). + +%% protocol_changed. + ws_protocol_changed(Config) -> doc("Confirm that the protocol_changed event callback is called on Websocket upgrade success."), {ok, Pid, _} = do_gun_open(Config), @@ -820,45 +1649,100 @@ ws_protocol_changed(Config) -> } = do_receive_event(protocol_changed), gun:close(Pid). -http1_protocol_changed_connect(Config) -> +ws_protocol_changed_connect(Config) -> + doc("Confirm that the protocol_changed event callback is called on Websocket upgrade success " + "for requests going through a CONNECT proxy."), + do_ws_protocol_changed_connect(Config, http), + do_ws_protocol_changed_connect(Config, http2). + +do_ws_protocol_changed_connect(Config, ProxyProtocol) -> + OriginPort = config(tcp_origin_port, Config), + OriginProtocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [ProxyProtocol] + }), + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [OriginProtocol] + }, []), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + #{ + stream_ref := StreamRef1, + protocol := OriginProtocol + } = do_receive_event(protocol_changed), + StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), + #{ + stream_ref := StreamRef2, + protocol := ws + } = do_receive_event(protocol_changed), + gun:close(ConnPid). + +protocol_changed_connect(Config) -> doc("Confirm that the protocol_changed event callback is called on CONNECT success " "when connecting through a TCP server via a TCP proxy."), + do_protocol_changed_connect(Config, http), + do_protocol_changed_connect(Config, http2). + +do_protocol_changed_connect(Config, OriginProtocol) -> OriginPort = config(tcp_origin_port, Config), - {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp), + ProxyProtocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ event_handler => {?MODULE, self()}, - protocols => [config(name, config(tc_group_properties, Config))], + protocols => [ProxyProtocol], transport => tcp }), - {ok, http} = gun:await_up(ConnPid), - _ = gun:connect(ConnPid, #{ + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, - protocols => [http2] + protocols => [OriginProtocol] }), - #{protocol := http2} = do_receive_event(protocol_changed), + #{ + stream_ref := StreamRef, + protocol := OriginProtocol + } = do_receive_event(protocol_changed), gun:close(ConnPid). -http1_protocol_changed_connect_over_https_proxy(Config) -> +protocol_changed_tls_connect(Config) -> doc("Confirm that the protocol_changed event callback is called on CONNECT success " - "when connecting through a TLS server via a TLS proxy."), + "when connecting to a TLS server via a TLS proxy."), + do_protocol_changed_tls_connect(Config, http), + do_protocol_changed_tls_connect(Config, http2). + +do_protocol_changed_tls_connect(Config, OriginProtocol) -> OriginPort = config(tls_origin_port, Config), - {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tls), + ProxyProtocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tls), {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ event_handler => {?MODULE, self()}, - protocols => [config(name, config(tc_group_properties, Config))], + protocols => [ProxyProtocol], transport => tls }), - {ok, http} = gun:await_up(ConnPid), - _ = gun:connect(ConnPid, #{ + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, transport => tls, - protocols => [http2] + protocols => [OriginProtocol] }), - #{protocol := http2} = do_receive_event(protocol_changed), + #{ + stream_ref := StreamRef, + protocol := OriginProtocol + } = do_receive_event(protocol_changed), gun:close(ConnPid). +%% transport_changed. + http1_transport_changed_connect(Config) -> doc("Confirm that the transport_changed event callback is called on CONNECT success " "when connecting through a TLS server via a TCP proxy."), @@ -905,28 +1789,87 @@ http1_transport_changed_connect_over_https_proxy(Config) -> true = is_pid(Socket), gun:close(ConnPid). -http1_origin_changed_connect(Config) -> +%% origin_changed. + +origin_changed_connect(Config) -> doc("Confirm that the origin_changed event callback is called on CONNECT success."), OriginPort = config(tcp_origin_port, Config), - {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp), + ProxyProtocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp), {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ event_handler => {?MODULE, self()}, - protocols => [config(name, config(tc_group_properties, Config))], + protocols => [ProxyProtocol], transport => tcp }), - {ok, http} = gun:await_up(ConnPid), - _ = gun:connect(ConnPid, #{ + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid), + StreamRef = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort }), - #{ + Event = #{ type := connect, origin_scheme := <<"http">>, origin_host := "localhost", origin_port := OriginPort } = do_receive_event(origin_changed), + case ProxyProtocol of + http -> ok; + http2 -> + #{stream_ref := StreamRef} = Event + end, gun:close(ConnPid). +origin_changed_connect_connect(Config) -> + doc("Confirm that the origin_changed event callback is called on CONNECT success " + "when performed inside another CONNECT tunnel."), + OriginPort = config(tcp_origin_port, Config), + ProxyProtocol = config(name, config(tc_group_properties, Config)), + {ok, Proxy1Pid, Proxy1Port} = do_proxy_start(ProxyProtocol, tcp), + {ok, Proxy2Pid, Proxy2Port} = do_proxy_start(ProxyProtocol, tcp), + {ok, ConnPid} = gun:open("localhost", Proxy1Port, #{ + event_handler => {?MODULE, self()}, + protocols => [ProxyProtocol], + transport => tcp + }), + {ok, ProxyProtocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(ProxyProtocol, Proxy1Pid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => Proxy2Port, + protocols => [ProxyProtocol] + }), + Event1 = #{ + type := connect, + origin_scheme := <<"http">>, + origin_host := "localhost", + origin_port := Proxy2Port + } = do_receive_event(origin_changed), + case ProxyProtocol of + http -> ok; + http2 -> + #{stream_ref := StreamRef1} = Event1 + end, + tunnel_SUITE:do_handshake_completed(ProxyProtocol, Proxy2Pid), + StreamRef2 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort + }, [], #{tunnel => StreamRef1}), + Event2 = #{ + type := connect, + origin_scheme := <<"http">>, + origin_host := "localhost", + origin_port := OriginPort + } = do_receive_event(origin_changed), + case ProxyProtocol of + http -> ok; + http2 -> + #{stream_ref := StreamRef2} = Event2 + end, + gun:close(ConnPid). + +%% cancel. + cancel(Config) -> doc("Confirm that the cancel event callback is called when we cancel a stream."), {ok, Pid, _} = do_gun_open(Config), @@ -957,6 +1900,71 @@ cancel_remote(Config) -> } = do_receive_event(cancel), gun:close(Pid). +cancel_connect(Config) -> + doc("Confirm that the cancel event callback is called when we " + "cancel a stream running inside a CONNECT proxy."), + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol], + transport => tcp + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:post(ConnPid, "/stream", [], #{tunnel => StreamRef1}), + gun:cancel(ConnPid, StreamRef2), + ReplyTo = self(), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + endpoint := local, + reason := cancel + } = do_receive_event(cancel), + gun:close(ConnPid). + +cancel_remote_connect(Config) -> + doc("Confirm that the cancel event callback is called when the " + "remote endpoint cancels a stream running inside a CONNECT proxy."), + OriginPort = config(tcp_origin_port, Config), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort, #{ + event_handler => {?MODULE, self()}, + protocols => [Protocol], + transport => tcp + }), + {ok, Protocol} = gun:await_up(ConnPid), + tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid), + StreamRef1 = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [Protocol] + }), + %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... + {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), + {up, Protocol} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:post(ConnPid, "/stream", [], #{tunnel => StreamRef1}), + ReplyTo = self(), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + endpoint := remote, + reason := _ + } = do_receive_event(cancel), + gun:close(ConnPid). + +%% disconnect. + disconnect(Config) -> doc("Confirm that the disconnect event callback is called on disconnect."), {ok, OriginPid, OriginPort} = init_origin(tcp), @@ -970,6 +1978,8 @@ disconnect(Config) -> } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). +%% terminate. + terminate(Config) -> doc("Confirm that the terminate event callback is called on terminate."), {ok, Pid, _} = do_gun_open(12345, Config), @@ -1012,6 +2022,12 @@ do_receive_event(Event) -> error(timeout) end. +do_proxy_start(Protocol, Transport) -> + case Protocol of + http -> rfc7231_SUITE:do_proxy_start(Transport); + http2 -> rfc7540_SUITE:do_proxy_start(Transport) + end. + %% gun_event callbacks. init(EventData, Pid) -> diff --git a/test/rfc7231_SUITE.erl b/test/rfc7231_SUITE.erl index b1941c6..d6bcafa 100644 --- a/test/rfc7231_SUITE.erl +++ b/test/rfc7231_SUITE.erl @@ -115,10 +115,11 @@ do_proxy_loop(Transport, ClientSocket, OriginSocket) -> {error, _} -> ok end; + %% Wait forever when a connection gets closed. We will exit with the test process. {tcp_closed, _} -> - ok; + timer:sleep(infinity); {ssl_closed, _} -> - ok; + timer:sleep(infinity); Msg -> error(Msg) end. diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl index a66c5ae..8680031 100644 --- a/test/rfc7540_SUITE.erl +++ b/test/rfc7540_SUITE.erl @@ -90,10 +90,11 @@ do_proxy_receive(Buffer, Proxy=#proxy{socket=Socket, transport=Transport}) -> do_proxy_parse(<>, Proxy); {tcp, OriginSocket, OriginData} -> do_proxy_forward(Buffer, Proxy, OriginSocket, OriginData); + %% Wait forever when a connection gets closed. We will exit with the test process. {tcp_closed, _} -> - ok; + timer:sleep(infinity); {ssl_closed, _} -> - ok; + timer:sleep(infinity); Msg -> error(Msg) end. -- cgit v1.2.3