aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2020-10-16 11:33:31 +0200
committerLoïc Hoguin <[email protected]>2020-10-16 11:33:31 +0200
commit356bf47edeb5b78765200e78d9b7a48aa98b97f5 (patch)
tree83c35cbb5e7120bd1d1e0a5693571f8b18c088d7
parentf2e8d103dd7827251fa726c42e307e42cef8a3dc (diff)
downloadgun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.tar.gz
gun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.tar.bz2
gun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.zip
Add or fix events inside or related to CONNECT tunnels
-rw-r--r--src/gun.erl64
-rw-r--r--src/gun_event.erl52
-rw-r--r--src/gun_http.erl81
-rw-r--r--src/gun_http2.erl179
-rw-r--r--src/gun_tunnel.erl196
-rw-r--r--src/gun_ws.erl4
-rw-r--r--test/event_SUITE.erl1142
-rw-r--r--test/rfc7231_SUITE.erl5
-rw-r--r--test/rfc7540_SUITE.erl5
9 files changed, 1478 insertions, 250 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 217133f..62abd6f 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -93,6 +93,7 @@
-export([ws_upgrade/3]).
-export([ws_upgrade/4]).
-export([ws_send/2]).
+-export([ws_send/3]).
%% Internals.
-export([start_link/4]).
@@ -274,7 +275,8 @@
keepalive => timeout(),
protocols => [{binary(), module()}],
reply_to => pid(),
- silence_pings => boolean()
+ silence_pings => boolean(),
+ tunnel => stream_ref()
}.
-export_type([ws_opts/0]).
@@ -926,20 +928,26 @@ ws_upgrade(ServerPid, Path, Headers) ->
StreamRef.
-spec ws_upgrade(pid(), iodata(), req_headers(), ws_opts()) -> stream_ref().
-ws_upgrade(ServerPid, Path, Headers, Opts) ->
+ws_upgrade(ServerPid, Path, Headers, Opts0) ->
+ Tunnel = get_tunnel(Opts0),
+ Opts = maps:without([tunnel], Opts0),
ok = gun_ws:check_options(Opts),
- StreamRef = make_ref(),
+ StreamRef = make_stream_ref(Tunnel),
ReplyTo = maps:get(reply_to, Opts, self()),
%% @todo Also accept tunnel option.
gen_statem:cast(ServerPid, {ws_upgrade, ReplyTo, StreamRef, Path, normalize_headers(Headers), Opts}),
StreamRef.
%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef.
-%% But it can be kept for the time being since it can still work for HTTP/1.1.
+%% But it can be kept for the time being since it can still work for HTTP/1.1 (connected_ws_only).
-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
gen_statem:cast(ServerPid, {ws_send, self(), Frames}).
+-spec ws_send(pid(), stream_ref(), ws_frame() | [ws_frame()]) -> ok.
+ws_send(ServerPid, StreamRef, Frames) ->
+ gen_statem:cast(ServerPid, {ws_send, self(), StreamRef, Frames}).
+
%% Internals.
callback_mode() -> state_functions.
@@ -1208,6 +1216,12 @@ connected_data_only(cast, Msg, _)
connected_data_only(Type, Event, State) ->
handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
+connected_ws_only(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
+ protocol=Protocol=gun_ws, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {Commands, EvHandlerState} = Protocol:ws_send(Frames,
+ ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{event_handler_state=EvHandlerState});
connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{
protocol=Protocol=gun_ws, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
@@ -1259,13 +1273,13 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers0, Body, Init
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
- protocol=Protocol, protocol_state=ProtoState}) ->
- %% @todo No events are currently handled for the CONNECT request?
- ProtoState2 = Protocol:connect(ProtoState,
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:connect(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Destination, #{host => Host, port => Port},
- Headers, InitialFlow),
- {keep_state, State#state{protocol_state=ProtoState2}};
+ Headers, InitialFlow, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
%% Public Websocket interface.
%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
%% An interface would also make sure that HTTP/1.0 can't upgrade.
@@ -1275,8 +1289,7 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{op
connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts},
State0=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0})
- when Protocol =:= gun_http ->
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
EvHandlerState1 = EvHandler:ws_upgrade(#{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -1285,14 +1298,19 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts},
%% @todo Can fail if HTTP/1.0.
{Headers, State} = add_cookie_header(Path, Headers0, State0),
{ProtoState2, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
- StreamRef, ReplyTo, Host, Port, Path, Headers, WsOpts,
- EvHandler, EvHandlerState1),
+ dereference_stream_ref(StreamRef, State), ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState1),
{keep_state, State#state{protocol_state=ProtoState2,
event_handler_state=EvHandlerState}};
-connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
- "Websocket is only supported over HTTP/1.1."}},
- keep_state_and_data;
+%% @todo Maybe better standardize the protocol callbacks argument orders.
+connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0})
+ when is_list(StreamRef) ->
+ {Commands, EvHandlerState} = Protocol:ws_send(Frames,
+ ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{event_handler_state=EvHandlerState});
+%% Catch-all for the StreamRef-free variant.
connected(cast, {ws_send, ReplyTo, _}, _) ->
ReplyTo ! {gun_error, self(), {badstate,
"Connection needs to be upgraded to Websocket "
@@ -1472,7 +1490,7 @@ handle_common_connected_no_input(cast, {cancel, ReplyTo, StreamRef}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
- StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ dereference_stream_ref(StreamRef, State), ReplyTo, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
handle_common_connected_no_input({call, From}, {stream_info, StreamRef}, _,
State=#state{intermediaries=Intermediaries0, protocol=Protocol, protocol_state=ProtoState}) ->
@@ -1721,7 +1739,13 @@ commands([{switch_protocol, NewProtocol, ReplyTo}], State0=#state{
_ -> ProtoOpts0#{tunnel_transport => tcp}
end,
{StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts),
- EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
+ ProtocolChangedEvent = case ProtoOpts of
+ #{stream_ref := StreamRef} ->
+ #{stream_ref => StreamRef, protocol => Protocol:name()};
+ _ ->
+ #{protocol => Protocol:name()}
+ end,
+ EvHandlerState = EvHandler:protocol_changed(ProtocolChangedEvent, EvHandlerState0),
%% We cancel the existing keepalive and, depending on the protocol,
%% we enable keepalive again, effectively resetting the timer.
State = keepalive_cancel(active(State0#state{protocol=Protocol, protocol_state=ProtoState,
@@ -1755,6 +1779,8 @@ disconnect(State0=#state{owner=Owner, status=Status, opts=Opts,
KilledStreams = Protocol:down(ProtoState),
Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams},
Retry = maps:get(retry, Opts, 5),
+ %% @todo We need to reset the origin_scheme/host/port and the transport
+ %% as well as remove the intermediaries.
{next_state, not_connected,
keepalive_cancel(State#state{socket=undefined,
protocol=undefined, protocol_state=undefined}),
diff --git a/src/gun_event.erl b/src/gun_event.erl
index b2a71db..513fa9f 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -62,10 +62,10 @@
%% These events occur when connecting to a TLS server or when
%% upgrading the connection or stream to use TLS, for example
%% using CONNECT. The stream_ref/reply_to values are only
-%% present when the TLS handshake occurs as a result of a request.
+%% present when the TLS handshake occurs in the scope of a request.
-type tls_handshake_event() :: #{
- stream_ref => reference(),
+ stream_ref => gun:stream_ref(),
reply_to => pid(),
socket := inet:socket() | ssl:sslsocket() | pid(), %% The socket before/after will be different.
tls_opts := [ssl:tls_client_option()],
@@ -81,13 +81,13 @@
%% request_start/request_headers.
-type request_start_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
function := headers | request | ws_upgrade,
method := iodata(),
scheme => binary(),
authority := iodata(),
- path := iodata(),
+ path => iodata(),
headers := [{binary(), iodata()}]
}.
-export_type([request_start_event/0]).
@@ -98,7 +98,7 @@
%% request_end.
-type request_end_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid()
}.
-export_type([request_end_event/0]).
@@ -108,7 +108,7 @@
%% push_promise_start.
-type push_promise_start_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid()
}.
-export_type([push_promise_start_event/0]).
@@ -118,12 +118,12 @@
%% push_promise_end.
-type push_promise_end_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
%% No stream is created if we receive the push_promise while
%% in the process of gracefully shutting down the connection.
%% The promised stream is canceled immediately.
- promised_stream_ref => reference(),
+ promised_stream_ref => gun:stream_ref(),
method := binary(),
uri := binary(),
headers := [{binary(), iodata()}]
@@ -135,7 +135,7 @@
%% response_start.
-type response_start_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid()
}.
-export_type([response_start_event/0]).
@@ -145,7 +145,7 @@
%% response_inform/response_headers.
-type response_headers_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
status := non_neg_integer(),
headers := [{binary(), binary()}]
@@ -158,7 +158,7 @@
%% response_trailers.
-type response_trailers_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
headers := [{binary(), binary()}]
}.
@@ -169,7 +169,7 @@
%% response_end.
-type response_end_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid()
}.
-export_type([response_end_event/0]).
@@ -186,7 +186,7 @@
%% response.
-type ws_upgrade_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
opts := gun:ws_opts()
}.
@@ -197,7 +197,7 @@
%% ws_recv_frame_start.
-type ws_recv_frame_start_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
frag_state := cow_ws:frag_state(),
extensions := cow_ws:extensions()
@@ -209,7 +209,7 @@
%% ws_recv_frame_header.
-type ws_recv_frame_header_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
frag_state := cow_ws:frag_state(),
extensions := cow_ws:extensions(),
@@ -225,7 +225,7 @@
%% ws_recv_frame_end.
-type ws_recv_frame_end_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
extensions := cow_ws:extensions(),
close_code := undefined | cow_ws:close_code(),
@@ -238,7 +238,7 @@
%% ws_send_frame_start/ws_send_frame_end.
-type ws_send_frame_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
extensions := cow_ws:extensions(),
frame := gun:ws_frame()
@@ -251,13 +251,10 @@
%% protocol_changed.
%%
%% This event can occur either following a successful ws_upgrade
-%% event or following a successful CONNECT request.
-%%
-%% @todo Currently there is only a connection-wide variant of this
-%% event. In the future there will be a stream-wide variant to
-%% support CONNECT and Websocket over HTTP/2.
+%% event, following a successful CONNECT request or a SOCKS tunnel.
-type protocol_changed_event() :: #{
+ stream_ref := gun:stream_ref(),
protocol := http | http2 | socks | ws
}.
-export_type([protocol_changed_event/0]).
@@ -268,9 +265,11 @@
%%
%% This event can occur following a successful CONNECT request.
%%
-%% @todo Currently there is only a connection-wide variant of this
-%% event. In the future there will be a stream-wide variant to
-%% support CONNECT through TLS proxies over HTTP/2.
+%% @todo I think this event should be removed. We already know
+%% about the transport being TLS via the tls_handshake events.
+%% Perhaps we should provide the socket in tls_handshake_end.
+%% We already do!! Therefore what's the point of this event?
+%% Remove it!!
-type transport_changed_event() :: #{
socket := ssl:sslsocket() | pid(),
@@ -283,6 +282,7 @@
%% origin_changed.
-type origin_changed_event() :: #{
+ stream_ref := gun:stream_ref(),
type := connect, %% @todo socks?
origin_scheme := binary(),
origin_host := inet:hostname() | inet:ip_address(),
@@ -303,7 +303,7 @@
%% Events may still occur for a short time after the cancel.
-type cancel_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
endpoint := local | remote,
reason := atom()
diff --git a/src/gun_http.erl b/src/gun_http.erl
index e2c0f1d..d877068 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -29,7 +29,7 @@
-export([headers/11]).
-export([request/12]).
-export([data/7]).
--export([connect/7]).
+-export([connect/9]).
-export([cancel/5]).
-export([stream_info/2]).
-export([down/1]).
@@ -195,7 +195,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
{nofin, EvHandlerState0};
no_trailers ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{fin, EvHandlerState1}
@@ -218,7 +218,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
{nofin, EvHandlerState0};
no_trailers ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{fin, EvHandlerState1}
@@ -243,9 +243,10 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn,
{_, _} ->
{Trailers, Rest} = cow_http:parse_headers(Data2),
%% @todo We probably want to pass this to gun_content_handler?
- ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers},
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers},
ResponseEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0),
@@ -272,7 +273,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn,
%% We ignore the active command because the stream ended.
[{state, State1}|_] = send_data(Data, State, fin),
EvHandlerState = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
case Conn of
@@ -287,7 +288,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn,
%% We ignore the active command because the stream ended.
[{state, State1}|_] = send_data(Body, State, fin),
EvHandlerState = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State1, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
case Conn of
@@ -396,15 +397,15 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
EvHandler, EvHandlerState0, Version, Status, Headers) ->
In = response_io_from_headers(Method, Version, Status, Headers),
IsFin = case In of head -> fin; _ -> nofin end,
+ RealStreamRef = stream_ref(State, StreamRef),
%% @todo Figure out whether the event should trigger if the stream was cancelled.
{Handlers, EvHandlerState2} = case IsAlive of
false ->
{undefined, EvHandlerState0};
true ->
- ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef),
- IsFin, Status, Headers},
+ ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -413,7 +414,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
fin -> {undefined, EvHandlerState1};
nofin ->
Handlers0 = maps:get(content_handlers, Opts, [gun_data_h]),
- {gun_content_handler:init(ReplyTo, stream_ref(State, StreamRef),
+ {gun_content_handler:init(ReplyTo, RealStreamRef,
Status, Headers, Handlers0), EvHandlerState1}
end
end,
@@ -422,7 +423,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
EvHandlerState2;
fin ->
EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
}, EvHandlerState2)
end,
@@ -509,7 +510,7 @@ close(Reason, State=#http_state{in=body_close,
close_streams(State, Tail, close_reason(Reason)),
_ = send_data(<<>>, State, fin),
EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState);
close(Reason, State=#http_state{streams=Streams}, _, EvHandlerState) ->
@@ -601,8 +602,9 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
{undefined, _} -> Headers4;
_ -> [{<<"content-length">>, integer_to_binary(iolist_size(Body))}|Headers4]
end,
+ RealStreamRef = stream_ref(State, StreamRef),
RequestEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
function => Function,
method => Method,
@@ -618,7 +620,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
EvHandlerState = case Out of
head ->
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandler:request_end(RequestEndEvent, EvHandlerState2);
@@ -672,7 +674,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
])
end,
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
@@ -686,7 +688,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
if
Length2 =:= 0, IsFin =:= fin ->
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
@@ -702,17 +704,19 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
-connect(State, StreamRef, ReplyTo, _, _, _, _)
+connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
- State;
-connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] ->
+ {State, EvHandlerState};
+connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
+ when Streams =/= [] ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
- State;
+ {State, EvHandlerState};
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
- StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0) ->
+ StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
+ EvHandler, EvHandlerState0) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
_ -> Host0
@@ -736,12 +740,29 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
Headers2
end,
Headers = transform_header_names(State, Headers3),
+ RealStreamRef = stream_ref(State, StreamRef),
+ RequestEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ function => connect,
+ method => <<"CONNECT">>,
+ authority => Authority,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
Transport:send(Socket, [
cow_http:request(<<"CONNECT">>, Authority, Version, Headers)
]),
+ EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
- new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
- <<"CONNECT">>, Authority, <<>>, InitialFlow).
+ {new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
+ <<"CONNECT">>, Authority, <<>>, InitialFlow),
+ EvHandlerState}.
%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
@@ -749,7 +770,7 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
true ->
State = cancel_stream(State0, StreamRef),
EvHandlerState = EvHandler:cancel(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
endpoint => local,
reason => cancel
@@ -876,6 +897,11 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
%% Websocket upgrade.
+ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState)
+ when is_list(StreamRef) ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
+ {badstate, "The stream is not a tunnel."}},
+ {State, EvHandlerState};
ws_upgrade(State=#http_state{version='HTTP/1.0'},
StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
@@ -986,9 +1012,10 @@ ws_handshake_end(Buffer,
self() ! {OK, Socket, Buffer}
end,
%% Inform the user that the upgrade was successful and switch the protocol.
- ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), [<<"websocket">>], Headers},
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers},
{switch_protocol, {ws, #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
headers => Headers,
extensions => Extensions,
flow => InitialFlow,
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 1072aca..8312954 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -30,12 +30,13 @@
-export([headers/11]).
-export([request/12]).
-export([data/7]).
--export([connect/7]).
+-export([connect/9]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
-export([down/1]).
-%-export([ws_upgrade/10]).
+-export([ws_upgrade/10]).
+-export([ws_send/6]).
-record(tunnel, {
%% The tunnel can either go requested->established
@@ -274,7 +275,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
push_promise -> push_promise_start
end,
EvHandler:EvCallback(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0);
%% Trailers or invalid header frame.
@@ -383,7 +384,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State1, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{State1, EvHandlerState1};
@@ -399,7 +400,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
{update_window(State1, StreamID), EvHandlerState0};
fin ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State1, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{update_window(State1), EvHandlerState1}
@@ -421,11 +422,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
tunnel=Tunnel
} = Stream,
State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]},
+ RealStreamRef = stream_ref(State, StreamRef),
if
Status >= 100, Status =< 199 ->
- ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
+ ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers},
EvHandlerState = EvHandler:response_inform(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -438,16 +440,23 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
origin_host => DestHost,
origin_port => DestPort
},
- %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo.
- %% We therefore do not need to call stream_ref/2.
- RealStreamRef = stream_ref(State, StreamRef),
ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
- EvHandlerState = EvHandler:response_headers(#{
+ EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
}, EvHandlerState0),
+ EvHandlerState2 = EvHandler:origin_changed(#{
+ stream_ref => RealStreamRef,
+ type => connect,
+ origin_scheme => case Destination of
+ #{transport := tls} -> <<"https">>;
+ _ -> <<"http">>
+ end,
+ origin_host => DestHost,
+ origin_port => DestPort
+ }, EvHandlerState1),
ContinueStreamRef = continue_stream_ref(State, StreamRef),
OriginSocket = #{
gun_pid => self(),
@@ -505,14 +514,15 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
}
}
end,
- {tunnel, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
+ {tunnel, ProtoState, EvHandlerState} = Proto:init(
+ ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2),
{store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{
info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}),
EvHandlerState};
true ->
- ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers},
+ ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -520,12 +530,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
{Handlers, EvHandlerState} = case IsFin of
fin ->
EvHandlerState2 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
}, EvHandlerState1),
{undefined, EvHandlerState2};
nofin ->
- {gun_content_handler:init(ReplyTo, StreamRef,
+ {gun_content_handler:init(ReplyTo, RealStreamRef,
Status, Headers, Handlers0), EvHandlerState1}
end,
%% @todo Disable the tunnel if any.
@@ -537,9 +547,10 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
%% @todo We probably want to pass this to gun_content_handler?
- ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers},
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers},
ResponseEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0),
@@ -552,7 +563,7 @@ rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) ->
ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef),
{stream_error, Reason, 'Stream reset by server.'}},
EvHandlerState = EvHandler:cancel(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
endpoint => remote,
reason => Reason
@@ -571,9 +582,10 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID),
PromisedStreamRef = make_ref(),
+ RealPromisedStreamRef = stream_ref(State, PromisedStreamRef),
URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]),
PushPromiseEvent0 = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
method => Method,
uri => URI,
@@ -581,8 +593,9 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
},
PushPromiseEvent = case Status of
connected ->
- ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), PromisedStreamRef, Method, URI, Headers},
- PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef};
+ ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef),
+ RealPromisedStreamRef, Method, URI, Headers},
+ PushPromiseEvent0#{promised_stream_ref => RealPromisedStreamRef};
_ ->
PushPromiseEvent0
end,
@@ -744,16 +757,16 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerSt
Transport:send(Socket, cow_http2:ping(0)),
{State, EvHandlerState}.
-%% @todo tunnel
headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
- Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) ->
+ Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0)
+ when is_reference(StreamRef) ->
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0),
Authority = maps:get(authority, PseudoHeaders),
RequestEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
function => ?FUNCTION_NAME,
method => Method,
@@ -769,7 +782,26 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
authority=Authority, path=Path},
- {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}.
+ {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState};
+%% Tunneled request.
+headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
+ Path, Headers, InitialFlow, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ %% @todo We should send an error to the user if the stream isn't ready.
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
+ origin_host := OriginHost, origin_port := OriginPort}}} ->
+ {ProtoState, EvHandlerState} = Proto:headers(ProtoState0, RealStreamRef,
+ ReplyTo, Method, OriginHost, OriginPort, Path, Headers,
+ InitialFlow, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
+ #stream{tunnel=undefined} ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}},
+ {State, EvHandlerState0};
+ error ->
+ {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
+ end.
request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
@@ -781,8 +813,9 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State0, Method, Host, Port, Path, Headers1),
Authority = maps:get(authority, PseudoHeaders),
+ RealStreamRef = stream_ref(State0, StreamRef),
RequestEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
function => ?FUNCTION_NAME,
method => Method,
@@ -806,7 +839,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
case IsFin of
fin ->
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
{State, EvHandler:request_end(RequestEndEvent, EvHandlerState)};
@@ -814,13 +847,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
maybe_send_data(State, StreamID, fin, Body, EvHandler, EvHandlerState)
end;
%% Tunneled request.
-%%
-%% We call Proto:request in a loop until we get to a non-CONNECT stream.
-%% When the transport is gun_tls_proxy we receive the TLS data
-%% as a 'data' cast; when gun_tcp_proxy we receive the 'data' cast
-%% directly. The 'data' cast contains the tunnel for the StreamRef.
-%% The tunnel is given as the socket and the gun_tls_proxy out_socket
-%% is always a gun_tcp_proxy that sends a 'data' cast.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
@@ -937,7 +963,7 @@ send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) ->
fin ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandler:request_end(RequestEndEvent, EvHandlerState0)
@@ -978,7 +1004,8 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo,
- Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0)
+ Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0,
+ EvHandler, EvHandlerState0)
when is_reference(StreamRef) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
@@ -1004,37 +1031,59 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
Headers1
end,
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(<<"CONNECT">>, HTTP2Machine0),
+ RealStreamRef = stream_ref(State, StreamRef),
+ RequestEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ function => connect,
+ method => <<"CONNECT">>,
+ authority => Authority,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
{ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
+ EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}},
- create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream);
+ {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream),
+ EvHandlerState};
%% Tunneled request.
-connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow) ->
+connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
+ EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo Should we send an error to the user if the stream isn't ready.
Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
- ProtoState = Proto:connect(ProtoState0, RealStreamRef,
- ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow),
- store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}});
+ {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, RealStreamRef,
+ ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
+ EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
- State;
+ {State, EvHandlerState0};
error ->
- error_stream_not_found(State, StreamRef, ReplyTo)
+ {error_stream_not_found(State, StreamRef, ReplyTo),
+ EvHandlerState0}
end.
cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
- StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
+ StreamRef, ReplyTo, EvHandler, EvHandlerState0)
+ when is_reference(StreamRef) ->
case get_stream_by_ref(State, StreamRef) of
#stream{id=StreamID} ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)),
EvHandlerState = EvHandler:cancel(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
endpoint => local,
reason => cancel
@@ -1044,6 +1093,22 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP
error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
EvHandlerState0}
+ end;
+%% Tunneled request.
+cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, RealStreamRef,
+ ReplyTo, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
+ #stream{tunnel=undefined} ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}},
+ {State, EvHandlerState0};
+ error ->
+ {error_stream_not_found(State, StreamRef, ReplyTo),
+ EvHandlerState0}
end.
timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, undefined, Name}, TRef) ->
@@ -1123,6 +1188,28 @@ stream_info(State, RealStreamRef=[StreamRef|_]) ->
down(#http2_state{stream_refs=Refs}) ->
maps:keys(Refs).
+%% Websocket upgrades are currently only accepted when tunneled.
+ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, RealStreamRef, ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{
+ tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), EvHandlerState}
+ %% @todo Error conditions?
+ end.
+
+ws_send(Frames, State0, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State0, StreamRef) of
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState}} ->
+ {Commands, EvHandlerState1} = Proto:ws_send(Frames, ProtoState,
+ RealStreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = tunnel_commands(Commands, Stream, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState}
+ %% @todo Error conditions?
+ end.
+
connection_error(#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine, streams=Streams},
Error={connection_error, Reason, HumanReadable}) ->
diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl
index ca9d3aa..cc58351 100644
--- a/src/gun_tunnel.erl
+++ b/src/gun_tunnel.erl
@@ -17,7 +17,7 @@
%% by the tunnel layer.
-module(gun_tunnel).
--export([init/4]).
+-export([init/6]).
-export([handle/4]).
-export([handle_continue/5]).
-export([update_flow/4]).
@@ -27,13 +27,14 @@
-export([headers/11]).
-export([request/12]).
-export([data/7]).
--export([connect/7]).
+-export([connect/9]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
-export([tunneled_name/2]).
-export([down/1]).
-%-export([ws_upgrade/10]).
+-export([ws_upgrade/10]).
+-export([ws_send/6]).
-record(tunnel_state, {
%% Fake socket and transport.
@@ -97,7 +98,8 @@
%% with some extra information added for the tunnel.
%%
%% @todo Mark the tunnel options as reserved.
-init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel}) ->
+init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel},
+ EvHandler, EvHandlerState0) ->
#{
type := TunnelType,
transport_name := TunnelTransport,
@@ -113,7 +115,10 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun
{Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
{_, ProtoState} = Proto:init(ReplyTo, OriginSocket, OriginTransport,
ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tcp}),
-%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
+ EvHandlerState = EvHandler:protocol_changed(#{
+ stream_ref => StreamRef,
+ protocol => Proto:name()
+ }, EvHandlerState0),
%% When the tunnel protocol is HTTP/1.1 or SOCKS
%% the gun_tunnel_up message was already sent.
%%
@@ -124,34 +129,36 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun
_ -> ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}
end,
{tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport,
- protocol=Proto, protocol_state=ProtoState}};
+ protocol=Proto, protocol_state=ProtoState},
+ EvHandlerState};
%% We can't initialize the protocol until the TLS handshake has completed.
- #{handshake_event := HandshakeEvent, protocols := Protocols} ->
+ #{handshake_event := HandshakeEvent0, protocols := Protocols} ->
#{handle_continue_stream_ref := ContinueStreamRef} = OriginSocket,
#{
origin_host := DestHost,
origin_port := DestPort
} = TunnelInfo,
-%% @todo OK so Protocol:init/4 will need to have EvHandler/EvHandlerState!
-%% Otherwise we can't do the TLS events.
#{
tls_opts := TLSOpts,
timeout := TLSTimeout
- } = HandshakeEvent,
+ } = HandshakeEvent0,
+ HandshakeEvent = HandshakeEvent0#{socket => OriginSocket},
+ EvHandlerState = EvHandler:tls_handshake_start(HandshakeEvent, EvHandlerState0),
{ok, ProxyPid} = gun_tls_proxy:start_link(DestHost, DestPort,
TLSOpts, TLSTimeout, OriginSocket, gun_tls_proxy_http2_connect,
{handle_continue, ContinueStreamRef, HandshakeEvent, Protocols}),
{tunnel, State#tunnel_state{socket=ProxyPid, transport=gun_tls_proxy,
- tls_origin_socket=OriginSocket}}
+ tls_origin_socket=OriginSocket}, EvHandlerState}
end.
%% When we receive data we pass it forward directly for TCP;
%% or we decrypt it and pass it via handle_continue for TLS.
-handle(Data, State=#tunnel_state{transport=gun_tcp_proxy,
+handle(Data, State0=#tunnel_state{transport=gun_tcp_proxy,
protocol=Proto, protocol_state=ProtoState0},
EvHandler, EvHandlerState0) ->
- {Commands, EvHandlerState} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0),
- {{state, commands(Commands, State)}, EvHandlerState};
+ {Commands, EvHandlerState1} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState};
handle(Data, State=#tunnel_state{transport=gun_tls_proxy,
socket=ProxyPid, tls_origin_socket=OriginSocket},
_EvHandler, EvHandlerState) ->
@@ -170,15 +177,19 @@ handle(Data, State=#tunnel_state{transport=gun_tls_proxy,
handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated},
{handle_continue, _, HandshakeEvent, Protocols}},
State=#tunnel_state{socket=ProxyPid, stream_ref=StreamRef, opts=Opts},
- _EvHandler, EvHandlerState0)
+ EvHandler, EvHandlerState0)
when is_reference(ContinueStreamRef) ->
#{reply_to := ReplyTo} = HandshakeEvent,
NewProtocol = gun_protocols:negotiated(Negotiated, Protocols),
{Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
-% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
-% socket => Socket,
-% protocol => NewProtocol
-% }, EvHandlerState0),
+ EvHandlerState1 = EvHandler:tls_handshake_end(HandshakeEvent#{
+ socket => ProxyPid,
+ protocol => NewProtocol
+ }, EvHandlerState0),
+ EvHandlerState = EvHandler:protocol_changed(#{
+ stream_ref => StreamRef,
+ protocol => NewProtocol
+ }, EvHandlerState1),
%% @todo Terminate the current protocol or something?
OriginSocket = #{
gun_pid => self(),
@@ -188,15 +199,15 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated},
{_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy,
ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tls}),
ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()},
- {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState0};
+ {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState};
handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason},
- {handle_continue, _, _HandshakeEvent, _}},
- #tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0)
+ {handle_continue, _, HandshakeEvent, _}},
+ #tunnel_state{socket=ProxyPid}, EvHandler, EvHandlerState0)
when is_reference(ContinueStreamRef) ->
-%% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
-%% error => Reason
-%% }, EvHandlerState0),
-%%% @todo
+ EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
+ error => Reason
+ }, EvHandlerState0),
+%% @todo
%% The TCP connection can be closed by either peer. The END_STREAM flag
%% on a DATA frame is treated as being equivalent to the TCP FIN bit. A
%% client is expected to send a DATA frame with the END_STREAM flag set
@@ -206,18 +217,19 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason},
%% receives a TCP segment with the FIN bit set sends a DATA frame with
%% the END_STREAM flag set. Note that the final TCP segment or DATA
%% frame could be empty.
- {{error, Reason}, EvHandlerState0};
+ {{error, Reason}, EvHandlerState};
%% Send the data. This causes TLS to encrypt the data and send it to the inner layer.
handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data},
#tunnel_state{}, _EvHandler, EvHandlerState)
when is_reference(ContinueStreamRef) ->
{{send, IsFin, Data}, EvHandlerState};
handle_continue(ContinueStreamRef, {tls_proxy, ProxyPid, Data},
- State=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState},
+ State0=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState},
EvHandler, EvHandlerState0)
when is_reference(ContinueStreamRef) ->
- {Commands, EvHandlerState} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0),
- {{state, commands(Commands, State)}, EvHandlerState};
+ {Commands, EvHandlerState1} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState};
handle_continue(ContinueStreamRef, {tls_proxy_closed, ProxyPid},
#tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0)
when is_reference(ContinueStreamRef) ->
@@ -233,21 +245,24 @@ handle_continue(ContinueStreamRef, {tls_proxy_error, ProxyPid, Reason},
%%
%% @todo Assert StreamRef to be our reference().
handle_continue([_StreamRef|ContinueStreamRef0], Msg,
- State=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
EvHandler, EvHandlerState0) ->
ContinueStreamRef = case ContinueStreamRef0 of
[CSR] -> CSR;
_ -> ContinueStreamRef0
end,
- {Commands, EvHandlerState} = Proto:handle_continue(ContinueStreamRef,
+ {Commands, EvHandlerState1} = Proto:handle_continue(ContinueStreamRef,
Msg, ProtoState, EvHandler, EvHandlerState0),
- {{state, commands(Commands, State)}, EvHandlerState}.
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState}.
-update_flow(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+%% @todo This function will need EvHandler/EvHandlerState?
+update_flow(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
ReplyTo, StreamRef0, Inc) ->
- StreamRef = maybe_dereference(State, StreamRef0),
+ StreamRef = maybe_dereference(State0, StreamRef0),
Commands = Proto:update_flow(ProtoState, ReplyTo, StreamRef, Inc),
- {state, commands(Commands, State)}.
+ {State, undefined} = commands(Commands, State0, undefined, undefined),
+ {state, State}.
closing(_Reason, _State, _EvHandler, EvHandlerState) ->
%% @todo Graceful shutdown must be propagated to tunnels.
@@ -312,17 +327,20 @@ data(State=#tunnel_state{socket=Socket, transport=Transport,
%% We pass the CONNECT request forward and optionally dereference StreamRef.
connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port},
protocol=Proto, protocol_state=ProtoState0},
- StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow) ->
+ StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow,
+ EvHandler, EvHandlerState0) ->
StreamRef = maybe_dereference(State, StreamRef0),
- ProtoState = Proto:connect(ProtoState0, StreamRef,
- ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow),
- State#tunnel_state{protocol_state=ProtoState}.
+ {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, StreamRef,
+ ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow,
+ EvHandler, EvHandlerState0),
+ {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.
-cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+cancel(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
StreamRef0, ReplyTo, EvHandler, EvHandlerState0) ->
- StreamRef = maybe_dereference(State, StreamRef0),
- {Commands, EvHandlerState} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
- {{state, commands(Commands, State)}, EvHandlerState}.
+ StreamRef = maybe_dereference(State0, StreamRef0),
+ {Commands, EvHandlerState1} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState}.
timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) ->
case Proto:timeout(ProtoState0, Msg, TRef) of
@@ -395,27 +413,71 @@ down(_State) ->
%% @todo Tunnels must be included in the gun_down message.
[].
+ws_upgrade(State=#tunnel_state{info=TunnelInfo, protocol=Proto, protocol_state=ProtoState0},
+ StreamRef0, ReplyTo, _, _, Path, Headers, WsOpts, EvHandler, EvHandlerState0) ->
+ StreamRef = maybe_dereference(State, StreamRef0),
+ #{
+ origin_host := Host,
+ origin_port := Port
+ } = TunnelInfo,
+ {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0),
+ {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.
+
+ws_send(Frames, State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ StreamRef0, ReplyTo, EvHandler, EvHandlerState0) ->
+ StreamRef = maybe_dereference(State0, StreamRef0),
+ {Commands, EvHandlerState1} = Proto:ws_send(Frames,
+ ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState}.
+
%% Internal.
-commands(Command, State) when not is_list(Command) ->
- commands([Command], State);
-commands([], State) ->
- State;
-commands([{state, ProtoState}|Tail], State) ->
- commands(Tail, State#tunnel_state{protocol_state=ProtoState});
+commands(Command, State, EvHandler, EvHandlerState) when not is_list(Command) ->
+ commands([Command], State, EvHandler, EvHandlerState);
+commands([], State, _, EvHandlerState) ->
+ {State, EvHandlerState};
+commands([{state, ProtoState}|Tail], State, EvHandler, EvHandlerState) ->
+ commands(Tail, State#tunnel_state{protocol_state=ProtoState}, EvHandler, EvHandlerState);
%% @todo We must pass down the set_cookie commands. Have a commands_queue.
-commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}) ->
- commands(Tail, State);
+commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}, EvHandler, EvHandlerState) ->
+ commands(Tail, State, EvHandler, EvHandlerState);
%% @todo What to do about IsFin?
-commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}) ->
+commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport},
+ EvHandler, EvHandlerState) ->
Transport:send(Socket, Data),
- commands(Tail, State);
-commands([Origin={origin, _Scheme, _NewHost, _NewPort, _Type}|Tail], State) ->
- commands(Tail, State#tunnel_state{protocol_origin=Origin});
+ commands(Tail, State, EvHandler, EvHandlerState);
+commands([Origin={origin, Scheme, Host, Port, Type}|Tail],
+ State=#tunnel_state{stream_ref=StreamRef},
+ EvHandler, EvHandlerState0) ->
+ EvHandlerState = EvHandler:origin_changed(#{
+ stream_ref => StreamRef,
+ type => Type,
+ origin_scheme => Scheme,
+ origin_host => Host,
+ origin_port => Port
+ }, EvHandlerState0),
+ commands(Tail, State#tunnel_state{protocol_origin=Origin}, EvHandler, EvHandlerState);
+commands([{switch_protocol, NewProtocol, ReplyTo}|Tail],
+ State=#tunnel_state{socket=Socket, transport=Transport, opts=Opts,
+ protocol_origin=undefined},
+ EvHandler, EvHandlerState0) ->
+ {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
+ %% This should only apply to Websocket for the time being.
+ {connected_ws_only, ProtoState} = Proto:init(ReplyTo, Socket, Transport, ProtoOpts),
+ #{stream_ref := StreamRef} = ProtoOpts,
+ EvHandlerState = EvHandler:protocol_changed(#{
+ stream_ref => StreamRef,
+ protocol => Proto:name()
+ }, EvHandlerState0),
+ commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ EvHandler, EvHandlerState);
commands([{switch_protocol, NewProtocol, ReplyTo}|Tail],
State=#tunnel_state{transport=Transport, stream_ref=TunnelStreamRef,
info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto,
- protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}) ->
+ protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}},
+ EvHandler, EvHandlerState0) ->
StreamRef = case Type of
socks5 -> TunnelStreamRef;
connect -> gun_protocols:stream_ref(NewProtocol)
@@ -450,13 +512,15 @@ commands([{switch_protocol, NewProtocol, ReplyTo}|Tail],
}
},
Proto = gun_tunnel,
- {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
-%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
- commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState});
+ {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo,
+ OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0),
+ commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ EvHandler, EvHandlerState);
commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail],
State=#tunnel_state{transport=Transport,
info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto,
- protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}) ->
+ protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}},
+ EvHandler, EvHandlerState0) ->
#{
stream_ref := StreamRef,
tls_opts := TLSOpts0
@@ -496,10 +560,12 @@ commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail],
}
},
Proto = gun_tunnel,
- {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
- commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState});
-commands([{active, true}|Tail], State) ->
- commands(Tail, State).
+ {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo,
+ OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0),
+ commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ EvHandler, EvHandlerState);
+commands([{active, true}|Tail], State, EvHandler, EvHandlerState) ->
+ commands(Tail, State, EvHandler, EvHandlerState).
continue_stream_ref(#tunnel_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}) ->
if
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index f59c19c..cd81d65 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -26,6 +26,7 @@
-export([close/4]).
-export([keepalive/3]).
-export([ws_send/5]).
+-export([ws_send/6]).
-export([down/1]).
-record(payload, {
@@ -289,6 +290,9 @@ ws_send([Frame|Tail], State, ReplyTo, EvHandler, EvHandlerState0) ->
Other
end.
+ws_send(Frames, State, _StreamRef, ReplyTo, EvHandler, EvHandlerState) ->
+ ws_send(Frames, State, ReplyTo, EvHandler, EvHandlerState).
+
%% Websocket has no concept of streams.
down(_) ->
[].
diff --git a/test/event_SUITE.erl b/test/event_SUITE.erl
index 5510e8c..6f2f1fa 100644
--- a/test/event_SUITE.erl
+++ b/test/event_SUITE.erl
@@ -37,7 +37,7 @@ groups() ->
%% We currently do not support Websocket over HTTP/2.
WsTests = [T || T <- Tests, lists:sublist(atom_to_list(T), 3) =:= "ws_"],
[
- {http, [parallel], Tests -- [cancel_remote|PushTests]},
+ {http, [parallel], Tests -- [cancel_remote, cancel_remote_connect|PushTests]},
{http2, [parallel], (Tests -- WsTests) -- HTTP1Tests}
].
@@ -193,6 +193,8 @@ connect_end_ok_tls(Config) ->
false = maps:is_key(protocol, Event),
gun:close(Pid).
+%% tls_handshake_start/tls_handshake_end.
+
tls_handshake_start(Config) ->
doc("Confirm that the tls_handshake_start event callback is called."),
{ok, Pid, _} = do_gun_open_tls(Config),
@@ -236,17 +238,19 @@ tls_handshake_end_ok(Config) ->
true = is_tuple(Socket),
gun:close(Pid).
-http1_tls_handshake_start_connect(Config) ->
+tls_handshake_start_tcp_connect_tls(Config) ->
doc("Confirm that the tls_handshake_start event callback is called "
"when using CONNECT to a TLS server via a TCP proxy."),
OriginPort = config(tls_origin_port, Config),
- {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
{ok, ConnPid} = gun:open("localhost", ProxyPort, #{
event_handler => {?MODULE, self()},
- protocols => [config(name, config(tc_group_properties, Config))],
+ protocols => [Protocol],
transport => tcp
}),
- {ok, http} = gun:await_up(ConnPid),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
StreamRef = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
@@ -260,21 +264,26 @@ http1_tls_handshake_start_connect(Config) ->
tls_opts := _,
timeout := _
} = do_receive_event(tls_handshake_start),
- true = is_port(Socket),
+ true = case Protocol of
+ http -> is_port(Socket);
+ http2 -> is_map(Socket)
+ end,
gun:close(ConnPid).
-http1_tls_handshake_end_error_connect(Config) ->
+tls_handshake_end_error_tcp_connect_tls(Config) ->
doc("Confirm that the tls_handshake_end event callback is called on TLS handshake error "
"when using CONNECT to a TLS server via a TCP proxy."),
%% We use the wrong port on purpose to trigger a handshake error.
OriginPort = config(tcp_origin_port, Config),
- {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
{ok, ConnPid} = gun:open("localhost", ProxyPort, #{
event_handler => {?MODULE, self()},
- protocols => [config(name, config(tc_group_properties, Config))],
+ protocols => [Protocol],
transport => tcp
}),
- {ok, http} = gun:await_up(ConnPid),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
StreamRef = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
@@ -289,20 +298,25 @@ http1_tls_handshake_end_error_connect(Config) ->
timeout := _,
error := {tls_alert, _}
} = do_receive_event(tls_handshake_end),
- true = is_port(Socket),
+ true = case Protocol of
+ http -> is_port(Socket);
+ http2 -> is_map(Socket)
+ end,
gun:close(ConnPid).
-http1_tls_handshake_end_ok_connect(Config) ->
+tls_handshake_end_ok_tcp_connect_tls(Config) ->
doc("Confirm that the tls_handshake_end event callback is called on TLS handshake success "
"when using CONNECT to a TLS server via a TCP proxy."),
OriginPort = config(tls_origin_port, Config),
- {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
{ok, ConnPid} = gun:open("localhost", ProxyPort, #{
event_handler => {?MODULE, self()},
- protocols => [config(name, config(tc_group_properties, Config))],
+ protocols => [Protocol],
transport => tcp
}),
- {ok, http} = gun:await_up(ConnPid),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
StreamRef = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
@@ -317,20 +331,25 @@ http1_tls_handshake_end_ok_connect(Config) ->
timeout := _,
protocol := http2
} = do_receive_event(tls_handshake_end),
- true = is_tuple(Socket),
+ true = case Protocol of
+ http -> is_tuple(Socket);
+ http2 -> is_pid(Socket)
+ end,
gun:close(ConnPid).
-http1_tls_handshake_start_connect_over_https_proxy(Config) ->
+tls_handshake_start_tls_connect_tls(Config) ->
doc("Confirm that the tls_handshake_start event callback is called "
"when using CONNECT to a TLS server via a TLS proxy."),
OriginPort = config(tls_origin_port, Config),
- {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tls),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tls),
{ok, ConnPid} = gun:open("localhost", ProxyPort, #{
event_handler => {?MODULE, self()},
- protocols => [config(name, config(tc_group_properties, Config))],
+ protocols => [Protocol],
transport => tls
}),
- {ok, http} = gun:await_up(ConnPid),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
%% We skip the TLS handshake event to the TLS proxy.
_ = do_receive_event(tls_handshake_start),
StreamRef = gun:connect(ConnPid, #{
@@ -346,21 +365,26 @@ http1_tls_handshake_start_connect_over_https_proxy(Config) ->
tls_opts := _,
timeout := _
} = do_receive_event(tls_handshake_start),
- true = is_tuple(Socket),
+ true = case Protocol of
+ http -> is_tuple(Socket);
+ http2 -> is_map(Socket)
+ end,
gun:close(ConnPid).
-http1_tls_handshake_end_error_connect_over_https_proxy(Config) ->
+tls_handshake_end_error_tls_connect_tls(Config) ->
doc("Confirm that the tls_handshake_end event callback is called on TLS handshake error "
"when using CONNECT to a TLS server via a TLS proxy."),
%% We use the wrong port on purpose to trigger a handshake error.
OriginPort = config(tcp_origin_port, Config),
- {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tls),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tls),
{ok, ConnPid} = gun:open("localhost", ProxyPort, #{
event_handler => {?MODULE, self()},
- protocols => [config(name, config(tc_group_properties, Config))],
+ protocols => [Protocol],
transport => tls
}),
- {ok, http} = gun:await_up(ConnPid),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
%% We skip the TLS handshake event to the TLS proxy.
_ = do_receive_event(tls_handshake_end),
StreamRef = gun:connect(ConnPid, #{
@@ -377,20 +401,25 @@ http1_tls_handshake_end_error_connect_over_https_proxy(Config) ->
timeout := _,
error := {tls_alert, _}
} = do_receive_event(tls_handshake_end),
- true = is_tuple(Socket),
+ true = case Protocol of
+ http -> is_tuple(Socket);
+ http2 -> is_map(Socket)
+ end,
gun:close(ConnPid).
-http1_tls_handshake_end_ok_connect_over_https_proxy(Config) ->
+tls_handshake_end_ok_tls_connect_tls(Config) ->
doc("Confirm that the tls_handshake_end event callback is called on TLS handshake success "
"when using CONNECT to a TLS server via a TLS proxy."),
OriginPort = config(tls_origin_port, Config),
- {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tls),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tls),
{ok, ConnPid} = gun:open("localhost", ProxyPort, #{
event_handler => {?MODULE, self()},
- protocols => [config(name, config(tc_group_properties, Config))],
+ protocols => [Protocol],
transport => tls
}),
- {ok, http} = gun:await_up(ConnPid),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
%% We skip the TLS handshake event to the TLS proxy.
_ = do_receive_event(tls_handshake_end),
StreamRef = gun:connect(ConnPid, #{
@@ -410,6 +439,8 @@ http1_tls_handshake_end_ok_connect_over_https_proxy(Config) ->
true = is_pid(Socket),
gun:close(ConnPid).
+%% request_start/request_headers/request_end.
+
request_start(Config) ->
doc("Confirm that the request_start event callback is called."),
do_request_event(Config, ?FUNCTION_NAME),
@@ -458,6 +489,110 @@ do_request_event_headers(Config, EventName) ->
Authority = iolist_to_binary(EventAuthority),
gun:close(Pid).
+request_start_connect(Config) ->
+ doc("Confirm that the request_start event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_request_event_connect(Config, request_start),
+ do_request_event_headers_connect(Config, request_start).
+
+request_headers_connect(Config) ->
+ doc("Confirm that the request_headers event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_request_event_connect(Config, request_headers),
+ do_request_event_headers_connect(Config, request_headers).
+
+do_request_event_connect(Config, EventName) ->
+ OriginPort = config(tcp_origin_port, Config),
+ Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ #{
+ stream_ref := StreamRef1,
+ reply_to := ReplyTo,
+ function := connect,
+ method := <<"CONNECT">>,
+ authority := EventAuthority1,
+ headers := Headers1
+ } = do_receive_event(EventName),
+ Authority = iolist_to_binary(EventAuthority1),
+ %% Gun doesn't send headers with an HTTP/2 CONNECT request
+ %% so we only check that the headers are given as a list.
+ true = is_list(Headers1),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:get(ConnPid, "/", [], #{tunnel => StreamRef1}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ function := request,
+ method := <<"GET">>,
+ authority := EventAuthority2,
+ path := "/",
+ headers := [_|_]
+ } = do_receive_event(EventName),
+ Authority = iolist_to_binary(EventAuthority2),
+ gun:close(ConnPid).
+
+do_request_event_headers_connect(Config, EventName) ->
+ OriginPort = config(tcp_origin_port, Config),
+ Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ #{
+ stream_ref := StreamRef1,
+ reply_to := ReplyTo,
+ function := connect,
+ method := <<"CONNECT">>,
+ authority := EventAuthority1,
+ headers := Headers1
+ } = do_receive_event(EventName),
+ Authority = iolist_to_binary(EventAuthority1),
+ %% Gun doesn't send headers with an HTTP/2 CONNECT request
+ %% so we only check that the headers are given as a list.
+ true = is_list(Headers1),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:put(ConnPid, "/", [
+ {<<"content-type">>, <<"text/plain">>}
+ ], #{tunnel => StreamRef1}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ function := headers,
+ method := <<"PUT">>,
+ authority := EventAuthority2,
+ path := "/",
+ headers := [_|_]
+ } = do_receive_event(EventName),
+ Authority = iolist_to_binary(EventAuthority2),
+ gun:close(ConnPid).
+
request_end(Config) ->
doc("Confirm that the request_end event callback is called."),
do_request_end(Config, ?FUNCTION_NAME),
@@ -522,6 +657,149 @@ do_request_end_headers_content_length_0(Config, EventName) ->
} = do_receive_event(EventName),
gun:close(Pid).
+request_end_connect(Config) ->
+ doc("Confirm that the request_end event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_request_end_connect(Config, request_end),
+ do_request_end_headers_connect(Config, request_end),
+ do_request_end_headers_content_length_connect(Config, request_end),
+ do_request_end_headers_content_length_0_connect(Config, request_end).
+
+do_request_end_connect(Config, EventName) ->
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ #{
+ stream_ref := StreamRef1,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:get(ConnPid, "/", [], #{tunnel => StreamRef1}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ gun:close(ConnPid).
+
+do_request_end_headers_connect(Config, EventName) ->
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ #{
+ stream_ref := StreamRef1,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:put(ConnPid, "/", [
+ {<<"content-type">>, <<"text/plain">>}
+ ], #{tunnel => StreamRef1}),
+ gun:data(ConnPid, StreamRef2, nofin, <<"Hello ">>),
+ gun:data(ConnPid, StreamRef2, fin, <<"world!">>),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ gun:close(ConnPid).
+
+do_request_end_headers_content_length_connect(Config, EventName) ->
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ #{
+ stream_ref := StreamRef1,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:put(ConnPid, "/", [
+ {<<"content-type">>, <<"text/plain">>},
+ {<<"content-length">>, <<"12">>}
+ ], #{tunnel => StreamRef1}),
+ gun:data(ConnPid, StreamRef2, nofin, <<"Hello ">>),
+ gun:data(ConnPid, StreamRef2, fin, <<"world!">>),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ gun:close(ConnPid).
+
+do_request_end_headers_content_length_0_connect(Config, EventName) ->
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ #{
+ stream_ref := StreamRef1,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:put(ConnPid, "/", [
+ {<<"content-type">>, <<"text/plain">>},
+ {<<"content-length">>, <<"0">>}
+ ], #{tunnel => StreamRef1}),
+ gun:data(ConnPid, StreamRef2, fin, <<>>),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ gun:close(ConnPid).
+
+%% push_promise_start/push_promise_end.
+
push_promise_start(Config) ->
doc("Confirm that the push_promise_start event callback is called."),
{ok, Pid, _} = do_gun_open(Config),
@@ -538,6 +816,41 @@ push_promise_start(Config) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
+push_promise_start_connect(Config) ->
+ doc("Confirm that the push_promise_start event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_push_promise_start_connect(Config, http),
+ do_push_promise_start_connect(Config, http2).
+
+do_push_promise_start_connect(Config, ProxyProtocol) ->
+ OriginPort = config(tcp_origin_port, Config),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [ProxyProtocol]
+ }),
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [http2]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, http2} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:get(ConnPid, "/push", [], #{tunnel => StreamRef1}),
+ ReplyTo = self(),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(push_promise_start),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(push_promise_start),
+ gun:close(ConnPid).
+
push_promise_end(Config) ->
doc("Confirm that the push_promise_end event callback is called."),
{ok, Pid, _} = do_gun_open(Config),
@@ -562,6 +875,49 @@ push_promise_end(Config) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
+push_promise_end_connect(Config) ->
+ doc("Confirm that the push_promise_end event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_push_promise_end_connect(Config, http),
+ do_push_promise_end_connect(Config, http2).
+
+do_push_promise_end_connect(Config, ProxyProtocol) ->
+ OriginPort = config(tcp_origin_port, Config),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [ProxyProtocol]
+ }),
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [http2]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, http2} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:get(ConnPid, "/push", [], #{tunnel => StreamRef1}),
+ ReplyTo = self(),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ promised_stream_ref := [StreamRef1|_],
+ method := <<"GET">>,
+ uri := <<"http://",_/bits>>,
+ headers := [_|_]
+ } = do_receive_event(push_promise_end),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ promised_stream_ref := [StreamRef1|_],
+ method := <<"GET">>,
+ uri := <<"http://",_/bits>>,
+ headers := [_|_]
+ } = do_receive_event(push_promise_end),
+ gun:close(ConnPid).
+
push_promise_followed_by_response(Config) ->
doc("Confirm that the push_promise_end event callbacks are followed by response_start."),
{ok, Pid, _} = do_gun_open(Config),
@@ -574,8 +930,10 @@ push_promise_followed_by_response(Config) ->
true = lists:member(PromisedStreamRef, [StreamRef1, StreamRef2, StreamRef3]),
gun:close(Pid).
+%% response_start/response_inform/response_headers/response_trailers/response_end.
+
response_start(Config) ->
- doc("Confirm that the request_start event callback is called."),
+ doc("Confirm that the response_start event callback is called."),
{ok, Pid, _} = do_gun_open(Config),
{ok, _} = gun:await_up(Pid),
StreamRef = gun:get(Pid, "/"),
@@ -586,8 +944,40 @@ response_start(Config) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
+response_start_connect(Config) ->
+ doc("Confirm that the response_start event callback is called "
+ "for requests going through a CONNECT proxy."),
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ #{
+ stream_ref := StreamRef1,
+ reply_to := ReplyTo
+ } = do_receive_event(response_start),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:get(ConnPid, "/", [], #{tunnel => StreamRef1}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(response_start),
+ gun:close(ConnPid).
+
response_inform(Config) ->
- doc("Confirm that the request_inform event callback is called."),
+ doc("Confirm that the response_inform event callback is called."),
{ok, Pid, _} = do_gun_open(Config),
{ok, _} = gun:await_up(Pid),
StreamRef = gun:get(Pid, "/inform"),
@@ -606,8 +996,44 @@ response_inform(Config) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
+response_inform_connect(Config) ->
+ doc("Confirm that the response_inform event callback is called "
+ "for requests going through a CONNECT proxy."),
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:get(ConnPid, "/inform", [], #{tunnel => StreamRef1}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ status := 103,
+ headers := [_|_]
+ } = do_receive_event(response_inform),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ status := 103,
+ headers := [_|_]
+ } = do_receive_event(response_inform),
+ gun:close(ConnPid).
+
response_headers(Config) ->
- doc("Confirm that the request_headers event callback is called."),
+ doc("Confirm that the response_headers event callback is called."),
{ok, Pid, _} = do_gun_open(Config),
{ok, _} = gun:await_up(Pid),
StreamRef = gun:get(Pid, "/"),
@@ -620,21 +1046,74 @@ response_headers(Config) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
+response_headers_connect(Config) ->
+ doc("Confirm that the response_headers event callback is called "
+ "for requests going through a CONNECT proxy."),
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ #{
+ stream_ref := StreamRef1,
+ reply_to := ReplyTo,
+ status := 200,
+ headers := Headers1
+ } = do_receive_event(response_headers),
+ true = is_list(Headers1),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:get(ConnPid, "/", [], #{tunnel => StreamRef1}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ status := 200,
+ headers := [_|_]
+ } = do_receive_event(response_headers),
+ gun:close(ConnPid).
+
response_trailers(Config) ->
- doc("Confirm that the request_trailers event callback is called."),
- {ok, Pid, _} = do_gun_open(Config),
- {ok, _} = gun:await_up(Pid),
- StreamRef = gun:get(Pid, "/trailers", [{<<"te">>, <<"trailers">>}]),
+ doc("Confirm that the response_trailers event callback is called "
+ "for requests going through a CONNECT proxy."),
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:get(ConnPid, "/trailers", [{<<"te">>, <<"trailers">>}], #{tunnel => StreamRef1}),
#{
- stream_ref := StreamRef,
+ stream_ref := StreamRef2,
reply_to := ReplyTo,
headers := [_|_]
- } = do_receive_event(?FUNCTION_NAME),
- gun:close(Pid).
+ } = do_receive_event(response_trailers),
+ gun:close(ConnPid).
response_end(Config) ->
- doc("Confirm that the request_headers event callback is called."),
+ doc("Confirm that the response_end event callback is called."),
do_response_end(Config, ?FUNCTION_NAME, "/"),
do_response_end(Config, ?FUNCTION_NAME, "/empty"),
do_response_end(Config, ?FUNCTION_NAME, "/stream"),
@@ -651,8 +1130,47 @@ do_response_end(Config, EventName, Path) ->
} = do_receive_event(EventName),
gun:close(Pid).
+response_end_connect(Config) ->
+ doc("Confirm that the response_end event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_response_end_connect(Config, response_end, "/"),
+ do_response_end_connect(Config, response_end, "/empty"),
+ do_response_end_connect(Config, response_end, "/stream"),
+ do_response_end_connect(Config, response_end, "/trailers").
+
+do_response_end_connect(Config, EventName, Path) ->
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol]
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }, []),
+ %% @todo Figure out whether the response should end when the tunnel is established.
+% #{
+% stream_ref := StreamRef1,
+% reply_to := ReplyTo
+% } = do_receive_event(EventName),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:get(ConnPid, Path, [{<<"te">>, <<"trailers">>}], #{tunnel => StreamRef1}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ gun:close(ConnPid).
+
http1_response_end_body_close(Config) ->
- doc("Confirm that the request_headers event callback is called "
+ doc("Confirm that the response_end event callback is called "
"when using HTTP/1.0 and the content-length header is not set."),
OriginPort = config(tcp_origin_port, Config),
Opts = #{
@@ -670,6 +1188,43 @@ http1_response_end_body_close(Config) ->
} = do_receive_event(response_end),
gun:close(Pid).
+%% @todo Figure out how to test both this and TLS handshake errors. Maybe a proxy option?
+%response_end_body_close_connect(Config) ->
+% doc("Confirm that the response_end event callback is called "
+% "when using HTTP/1.0 and the content-length header is not set "
+% "for requests going through a CONNECT proxy."),
+% OriginPort = config(tcp_origin_port, Config),
+% Protocol = config(name, config(tc_group_properties, Config)),
+% ReplyTo = self(),
+% {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+% {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+% event_handler => {?MODULE, self()},
+% protocols => [Protocol]
+% }),
+% {ok, Protocol} = gun:await_up(ConnPid),
+% tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+% StreamRef1 = gun:connect(ConnPid, #{
+% host => "localhost",
+% port => OriginPort,
+% protocols => [{http, #{version => 'HTTP/1.0'}}]
+% }, []),
+% %% @todo Figure out whether the response should end when the tunnel is established.
+%% #{
+%% stream_ref := StreamRef1,
+%% reply_to := ReplyTo
+%% } = do_receive_event(EventName),
+% %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+% {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+% {up, http} = gun:await(ConnPid, StreamRef1),
+% StreamRef2 = gun:get(ConnPid, "/stream", [], #{tunnel => StreamRef1}),
+% #{
+% stream_ref := StreamRef2,
+% reply_to := ReplyTo
+% } = do_receive_event(response_end),
+% gun:close(ConnPid).
+
+%% ws_upgrade.
+
ws_upgrade(Config) ->
doc("Confirm that the ws_upgrade event callback is called."),
{ok, Pid, _} = do_gun_open(Config),
@@ -683,6 +1238,39 @@ ws_upgrade(Config) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
+ws_upgrade_connect(Config) ->
+ doc("Confirm that the ws_upgrade event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_ws_upgrade_connect(Config, http),
+ do_ws_upgrade_connect(Config, http2).
+
+do_ws_upgrade_connect(Config, ProxyProtocol) ->
+ OriginPort = config(tcp_origin_port, Config),
+ OriginProtocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [ProxyProtocol]
+ }),
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [OriginProtocol]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ opts := #{}
+ } = do_receive_event(ws_upgrade),
+ gun:close(ConnPid).
+
ws_upgrade_all_events(Config) ->
doc("Confirm that a Websocket upgrade triggers all relevant events."),
{ok, Pid, OriginPort} = do_gun_open(Config),
@@ -730,10 +1318,92 @@ ws_upgrade_all_events(Config) ->
headers := [_|_]
} = do_receive_event(response_inform),
#{
+ stream_ref := StreamRef,
protocol := ws
} = do_receive_event(protocol_changed),
gun:close(Pid).
+ws_upgrade_all_events_connect(Config) ->
+ doc("Confirm that a Websocket upgrade triggers all relevant events "
+ "for requests going through a CONNECT proxy."),
+ do_ws_upgrade_all_events_connect(Config, http),
+ do_ws_upgrade_all_events_connect(Config, http2).
+
+do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->
+ OriginPort = config(tcp_origin_port, Config),
+ OriginProtocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [ProxyProtocol]
+ }),
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [OriginProtocol]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ %% Skip all CONNECT-related events that may conflict.
+ _ = do_receive_event(request_start),
+ _ = do_receive_event(request_headers),
+ _ = do_receive_event(request_end),
+ _ = do_receive_event(response_start),
+ _ = do_receive_event(protocol_changed),
+ %% Check the Websocket events.
+ StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ opts := #{}
+ } = do_receive_event(ws_upgrade),
+ Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ function := ws_upgrade,
+ method := <<"GET">>,
+ authority := EventAuthority1,
+ path := "/ws",
+ headers := [_|_]
+ } = do_receive_event(request_start),
+ Authority = iolist_to_binary(EventAuthority1),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ function := ws_upgrade,
+ method := <<"GET">>,
+ authority := EventAuthority2,
+ path := "/ws",
+ headers := [_|_]
+ } = do_receive_event(request_headers),
+ Authority = iolist_to_binary(EventAuthority2),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(request_end),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(response_start),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ status := 101,
+ headers := [_|_]
+ } = do_receive_event(response_inform),
+ #{
+ stream_ref := StreamRef2,
+ protocol := ws
+ } = do_receive_event(protocol_changed),
+ gun:close(ConnPid).
+
+%% ws_recv_frame_start/ws_recv_frame_header/ws_recv_frame_end.
+
ws_recv_frame_start(Config) ->
doc("Confirm that the ws_recv_frame_start event callback is called."),
{ok, Pid, _} = do_gun_open(Config),
@@ -750,6 +1420,42 @@ ws_recv_frame_start(Config) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
+ws_recv_frame_start_connect(Config) ->
+ doc("Confirm that the ws_recv_frame_start event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_ws_recv_frame_start_connect(Config, http),
+ do_ws_recv_frame_start_connect(Config, http2).
+
+do_ws_recv_frame_start_connect(Config, ProxyProtocol) ->
+ OriginPort = config(tcp_origin_port, Config),
+ OriginProtocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [ProxyProtocol]
+ }),
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [OriginProtocol]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),
+ gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ frag_state := undefined,
+ extensions := #{}
+ } = do_receive_event(ws_recv_frame_start),
+ gun:close(ConnPid).
+
ws_recv_frame_header(Config) ->
doc("Confirm that the ws_recv_frame_header event callback is called."),
{ok, Pid, _} = do_gun_open(Config),
@@ -770,6 +1476,46 @@ ws_recv_frame_header(Config) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
+ws_recv_frame_header_connect(Config) ->
+ doc("Confirm that the ws_recv_frame_header event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_ws_recv_frame_header_connect(Config, http),
+ do_ws_recv_frame_header_connect(Config, http2).
+
+do_ws_recv_frame_header_connect(Config, ProxyProtocol) ->
+ OriginPort = config(tcp_origin_port, Config),
+ OriginProtocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [ProxyProtocol]
+ }),
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [OriginProtocol]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),
+ gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ frag_state := undefined,
+ extensions := #{},
+ type := text,
+ rsv := <<0:3>>,
+ len := 6,
+ mask_key := _
+ } = do_receive_event(ws_recv_frame_header),
+ gun:close(ConnPid).
+
ws_recv_frame_end(Config) ->
doc("Confirm that the ws_recv_frame_end event callback is called."),
{ok, Pid, _} = do_gun_open(Config),
@@ -787,6 +1533,45 @@ ws_recv_frame_end(Config) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
+ws_recv_frame_end_connect(Config) ->
+ doc("Confirm that the ws_recv_frame_end event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_ws_recv_frame_end_connect(Config, http),
+ do_ws_recv_frame_end_connect(Config, http2).
+
+do_ws_recv_frame_end_connect(Config, ProxyProtocol) ->
+ OriginPort = config(tcp_origin_port, Config),
+ OriginProtocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [ProxyProtocol]
+ }),
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [OriginProtocol]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),
+ gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ extensions := #{},
+ close_code := undefined,
+ payload := <<"Hello!">>
+ } = do_receive_event(ws_recv_frame_end),
+ gun:close(ConnPid).
+
+%% ws_send_frame_start/ws_send_frame_end.
+
ws_send_frame_start(Config) ->
doc("Confirm that the ws_send_frame_start event callback is called."),
do_ws_send_frame(Config, ?FUNCTION_NAME).
@@ -810,6 +1595,50 @@ do_ws_send_frame(Config, EventName) ->
} = do_receive_event(EventName),
gun:close(Pid).
+ws_send_frame_start_connect(Config) ->
+ doc("Confirm that the ws_send_frame_start event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_ws_send_frame_connect(Config, http, ws_send_frame_start),
+ do_ws_send_frame_connect(Config, http2, ws_send_frame_start).
+
+ws_send_frame_end_connect(Config) ->
+ doc("Confirm that the ws_send_frame_end event callback is called "
+ "for requests going through a CONNECT proxy."),
+ do_ws_send_frame_connect(Config, http, ws_send_frame_end),
+ do_ws_send_frame_connect(Config, http2, ws_send_frame_end).
+
+do_ws_send_frame_connect(Config, ProxyProtocol, EventName) ->
+ OriginPort = config(tcp_origin_port, Config),
+ OriginProtocol = config(name, config(tc_group_properties, Config)),
+ ReplyTo = self(),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [ProxyProtocol]
+ }),
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [OriginProtocol]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),
+ gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ extensions := #{},
+ frame := {text, <<"Hello!">>}
+ } = do_receive_event(EventName),
+ gun:close(ConnPid).
+
+%% protocol_changed.
+
ws_protocol_changed(Config) ->
doc("Confirm that the protocol_changed event callback is called on Websocket upgrade success."),
{ok, Pid, _} = do_gun_open(Config),
@@ -820,45 +1649,100 @@ ws_protocol_changed(Config) ->
} = do_receive_event(protocol_changed),
gun:close(Pid).
-http1_protocol_changed_connect(Config) ->
+ws_protocol_changed_connect(Config) ->
+ doc("Confirm that the protocol_changed event callback is called on Websocket upgrade success "
+ "for requests going through a CONNECT proxy."),
+ do_ws_protocol_changed_connect(Config, http),
+ do_ws_protocol_changed_connect(Config, http2).
+
+do_ws_protocol_changed_connect(Config, ProxyProtocol) ->
+ OriginPort = config(tcp_origin_port, Config),
+ OriginProtocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [ProxyProtocol]
+ }),
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [OriginProtocol]
+ }, []),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ #{
+ stream_ref := StreamRef1,
+ protocol := OriginProtocol
+ } = do_receive_event(protocol_changed),
+ StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
+ #{
+ stream_ref := StreamRef2,
+ protocol := ws
+ } = do_receive_event(protocol_changed),
+ gun:close(ConnPid).
+
+protocol_changed_connect(Config) ->
doc("Confirm that the protocol_changed event callback is called on CONNECT success "
"when connecting through a TCP server via a TCP proxy."),
+ do_protocol_changed_connect(Config, http),
+ do_protocol_changed_connect(Config, http2).
+
+do_protocol_changed_connect(Config, OriginProtocol) ->
OriginPort = config(tcp_origin_port, Config),
- {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp),
+ ProxyProtocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
{ok, ConnPid} = gun:open("localhost", ProxyPort, #{
event_handler => {?MODULE, self()},
- protocols => [config(name, config(tc_group_properties, Config))],
+ protocols => [ProxyProtocol],
transport => tcp
}),
- {ok, http} = gun:await_up(ConnPid),
- _ = gun:connect(ConnPid, #{
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
- protocols => [http2]
+ protocols => [OriginProtocol]
}),
- #{protocol := http2} = do_receive_event(protocol_changed),
+ #{
+ stream_ref := StreamRef,
+ protocol := OriginProtocol
+ } = do_receive_event(protocol_changed),
gun:close(ConnPid).
-http1_protocol_changed_connect_over_https_proxy(Config) ->
+protocol_changed_tls_connect(Config) ->
doc("Confirm that the protocol_changed event callback is called on CONNECT success "
- "when connecting through a TLS server via a TLS proxy."),
+ "when connecting to a TLS server via a TLS proxy."),
+ do_protocol_changed_tls_connect(Config, http),
+ do_protocol_changed_tls_connect(Config, http2).
+
+do_protocol_changed_tls_connect(Config, OriginProtocol) ->
OriginPort = config(tls_origin_port, Config),
- {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tls),
+ ProxyProtocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tls),
{ok, ConnPid} = gun:open("localhost", ProxyPort, #{
event_handler => {?MODULE, self()},
- protocols => [config(name, config(tc_group_properties, Config))],
+ protocols => [ProxyProtocol],
transport => tls
}),
- {ok, http} = gun:await_up(ConnPid),
- _ = gun:connect(ConnPid, #{
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
transport => tls,
- protocols => [http2]
+ protocols => [OriginProtocol]
}),
- #{protocol := http2} = do_receive_event(protocol_changed),
+ #{
+ stream_ref := StreamRef,
+ protocol := OriginProtocol
+ } = do_receive_event(protocol_changed),
gun:close(ConnPid).
+%% transport_changed.
+
http1_transport_changed_connect(Config) ->
doc("Confirm that the transport_changed event callback is called on CONNECT success "
"when connecting through a TLS server via a TCP proxy."),
@@ -905,28 +1789,87 @@ http1_transport_changed_connect_over_https_proxy(Config) ->
true = is_pid(Socket),
gun:close(ConnPid).
-http1_origin_changed_connect(Config) ->
+%% origin_changed.
+
+origin_changed_connect(Config) ->
doc("Confirm that the origin_changed event callback is called on CONNECT success."),
OriginPort = config(tcp_origin_port, Config),
- {ok, _, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp),
+ ProxyProtocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(ProxyProtocol, tcp),
{ok, ConnPid} = gun:open("localhost", ProxyPort, #{
event_handler => {?MODULE, self()},
- protocols => [config(name, config(tc_group_properties, Config))],
+ protocols => [ProxyProtocol],
transport => tcp
}),
- {ok, http} = gun:await_up(ConnPid),
- _ = gun:connect(ConnPid, #{
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, ProxyPid),
+ StreamRef = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort
}),
- #{
+ Event = #{
type := connect,
origin_scheme := <<"http">>,
origin_host := "localhost",
origin_port := OriginPort
} = do_receive_event(origin_changed),
+ case ProxyProtocol of
+ http -> ok;
+ http2 ->
+ #{stream_ref := StreamRef} = Event
+ end,
gun:close(ConnPid).
+origin_changed_connect_connect(Config) ->
+ doc("Confirm that the origin_changed event callback is called on CONNECT success "
+ "when performed inside another CONNECT tunnel."),
+ OriginPort = config(tcp_origin_port, Config),
+ ProxyProtocol = config(name, config(tc_group_properties, Config)),
+ {ok, Proxy1Pid, Proxy1Port} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, Proxy2Pid, Proxy2Port} = do_proxy_start(ProxyProtocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", Proxy1Port, #{
+ event_handler => {?MODULE, self()},
+ protocols => [ProxyProtocol],
+ transport => tcp
+ }),
+ {ok, ProxyProtocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, Proxy1Pid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => Proxy2Port,
+ protocols => [ProxyProtocol]
+ }),
+ Event1 = #{
+ type := connect,
+ origin_scheme := <<"http">>,
+ origin_host := "localhost",
+ origin_port := Proxy2Port
+ } = do_receive_event(origin_changed),
+ case ProxyProtocol of
+ http -> ok;
+ http2 ->
+ #{stream_ref := StreamRef1} = Event1
+ end,
+ tunnel_SUITE:do_handshake_completed(ProxyProtocol, Proxy2Pid),
+ StreamRef2 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort
+ }, [], #{tunnel => StreamRef1}),
+ Event2 = #{
+ type := connect,
+ origin_scheme := <<"http">>,
+ origin_host := "localhost",
+ origin_port := OriginPort
+ } = do_receive_event(origin_changed),
+ case ProxyProtocol of
+ http -> ok;
+ http2 ->
+ #{stream_ref := StreamRef2} = Event2
+ end,
+ gun:close(ConnPid).
+
+%% cancel.
+
cancel(Config) ->
doc("Confirm that the cancel event callback is called when we cancel a stream."),
{ok, Pid, _} = do_gun_open(Config),
@@ -957,6 +1900,71 @@ cancel_remote(Config) ->
} = do_receive_event(cancel),
gun:close(Pid).
+cancel_connect(Config) ->
+ doc("Confirm that the cancel event callback is called when we "
+ "cancel a stream running inside a CONNECT proxy."),
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol],
+ transport => tcp
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:post(ConnPid, "/stream", [], #{tunnel => StreamRef1}),
+ gun:cancel(ConnPid, StreamRef2),
+ ReplyTo = self(),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ endpoint := local,
+ reason := cancel
+ } = do_receive_event(cancel),
+ gun:close(ConnPid).
+
+cancel_remote_connect(Config) ->
+ doc("Confirm that the cancel event callback is called when the "
+ "remote endpoint cancels a stream running inside a CONNECT proxy."),
+ OriginPort = config(tcp_origin_port, Config),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ProxyPid, ProxyPort} = do_proxy_start(Protocol, tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort, #{
+ event_handler => {?MODULE, self()},
+ protocols => [Protocol],
+ transport => tcp
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ tunnel_SUITE:do_handshake_completed(Protocol, ProxyPid),
+ StreamRef1 = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [Protocol]
+ }),
+ %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
+ {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {up, Protocol} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:post(ConnPid, "/stream", [], #{tunnel => StreamRef1}),
+ ReplyTo = self(),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ endpoint := remote,
+ reason := _
+ } = do_receive_event(cancel),
+ gun:close(ConnPid).
+
+%% disconnect.
+
disconnect(Config) ->
doc("Confirm that the disconnect event callback is called on disconnect."),
{ok, OriginPid, OriginPort} = init_origin(tcp),
@@ -970,6 +1978,8 @@ disconnect(Config) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
+%% terminate.
+
terminate(Config) ->
doc("Confirm that the terminate event callback is called on terminate."),
{ok, Pid, _} = do_gun_open(12345, Config),
@@ -1012,6 +2022,12 @@ do_receive_event(Event) ->
error(timeout)
end.
+do_proxy_start(Protocol, Transport) ->
+ case Protocol of
+ http -> rfc7231_SUITE:do_proxy_start(Transport);
+ http2 -> rfc7540_SUITE:do_proxy_start(Transport)
+ end.
+
%% gun_event callbacks.
init(EventData, Pid) ->
diff --git a/test/rfc7231_SUITE.erl b/test/rfc7231_SUITE.erl
index b1941c6..d6bcafa 100644
--- a/test/rfc7231_SUITE.erl
+++ b/test/rfc7231_SUITE.erl
@@ -115,10 +115,11 @@ do_proxy_loop(Transport, ClientSocket, OriginSocket) ->
{error, _} ->
ok
end;
+ %% Wait forever when a connection gets closed. We will exit with the test process.
{tcp_closed, _} ->
- ok;
+ timer:sleep(infinity);
{ssl_closed, _} ->
- ok;
+ timer:sleep(infinity);
Msg ->
error(Msg)
end.
diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl
index a66c5ae..8680031 100644
--- a/test/rfc7540_SUITE.erl
+++ b/test/rfc7540_SUITE.erl
@@ -90,10 +90,11 @@ do_proxy_receive(Buffer, Proxy=#proxy{socket=Socket, transport=Transport}) ->
do_proxy_parse(<<Buffer/binary, Data0/bits>>, Proxy);
{tcp, OriginSocket, OriginData} ->
do_proxy_forward(Buffer, Proxy, OriginSocket, OriginData);
+ %% Wait forever when a connection gets closed. We will exit with the test process.
{tcp_closed, _} ->
- ok;
+ timer:sleep(infinity);
{ssl_closed, _} ->
- ok;
+ timer:sleep(infinity);
Msg ->
error(Msg)
end.