aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl64
-rw-r--r--src/gun_event.erl52
-rw-r--r--src/gun_http.erl81
-rw-r--r--src/gun_http2.erl179
-rw-r--r--src/gun_tunnel.erl196
-rw-r--r--src/gun_ws.erl4
6 files changed, 393 insertions, 183 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 217133f..62abd6f 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -93,6 +93,7 @@
-export([ws_upgrade/3]).
-export([ws_upgrade/4]).
-export([ws_send/2]).
+-export([ws_send/3]).
%% Internals.
-export([start_link/4]).
@@ -274,7 +275,8 @@
keepalive => timeout(),
protocols => [{binary(), module()}],
reply_to => pid(),
- silence_pings => boolean()
+ silence_pings => boolean(),
+ tunnel => stream_ref()
}.
-export_type([ws_opts/0]).
@@ -926,20 +928,26 @@ ws_upgrade(ServerPid, Path, Headers) ->
StreamRef.
-spec ws_upgrade(pid(), iodata(), req_headers(), ws_opts()) -> stream_ref().
-ws_upgrade(ServerPid, Path, Headers, Opts) ->
+ws_upgrade(ServerPid, Path, Headers, Opts0) ->
+ Tunnel = get_tunnel(Opts0),
+ Opts = maps:without([tunnel], Opts0),
ok = gun_ws:check_options(Opts),
- StreamRef = make_ref(),
+ StreamRef = make_stream_ref(Tunnel),
ReplyTo = maps:get(reply_to, Opts, self()),
%% @todo Also accept tunnel option.
gen_statem:cast(ServerPid, {ws_upgrade, ReplyTo, StreamRef, Path, normalize_headers(Headers), Opts}),
StreamRef.
%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef.
-%% But it can be kept for the time being since it can still work for HTTP/1.1.
+%% But it can be kept for the time being since it can still work for HTTP/1.1 (connected_ws_only).
-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
gen_statem:cast(ServerPid, {ws_send, self(), Frames}).
+-spec ws_send(pid(), stream_ref(), ws_frame() | [ws_frame()]) -> ok.
+ws_send(ServerPid, StreamRef, Frames) ->
+ gen_statem:cast(ServerPid, {ws_send, self(), StreamRef, Frames}).
+
%% Internals.
callback_mode() -> state_functions.
@@ -1208,6 +1216,12 @@ connected_data_only(cast, Msg, _)
connected_data_only(Type, Event, State) ->
handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
+connected_ws_only(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
+ protocol=Protocol=gun_ws, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {Commands, EvHandlerState} = Protocol:ws_send(Frames,
+ ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{event_handler_state=EvHandlerState});
connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{
protocol=Protocol=gun_ws, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
@@ -1259,13 +1273,13 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers0, Body, Init
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
- protocol=Protocol, protocol_state=ProtoState}) ->
- %% @todo No events are currently handled for the CONNECT request?
- ProtoState2 = Protocol:connect(ProtoState,
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:connect(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Destination, #{host => Host, port => Port},
- Headers, InitialFlow),
- {keep_state, State#state{protocol_state=ProtoState2}};
+ Headers, InitialFlow, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
%% Public Websocket interface.
%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
%% An interface would also make sure that HTTP/1.0 can't upgrade.
@@ -1275,8 +1289,7 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{op
connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts},
State0=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0})
- when Protocol =:= gun_http ->
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
EvHandlerState1 = EvHandler:ws_upgrade(#{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -1285,14 +1298,19 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts},
%% @todo Can fail if HTTP/1.0.
{Headers, State} = add_cookie_header(Path, Headers0, State0),
{ProtoState2, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
- StreamRef, ReplyTo, Host, Port, Path, Headers, WsOpts,
- EvHandler, EvHandlerState1),
+ dereference_stream_ref(StreamRef, State), ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState1),
{keep_state, State#state{protocol_state=ProtoState2,
event_handler_state=EvHandlerState}};
-connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
- "Websocket is only supported over HTTP/1.1."}},
- keep_state_and_data;
+%% @todo Maybe better standardize the protocol callbacks argument orders.
+connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0})
+ when is_list(StreamRef) ->
+ {Commands, EvHandlerState} = Protocol:ws_send(Frames,
+ ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{event_handler_state=EvHandlerState});
+%% Catch-all for the StreamRef-free variant.
connected(cast, {ws_send, ReplyTo, _}, _) ->
ReplyTo ! {gun_error, self(), {badstate,
"Connection needs to be upgraded to Websocket "
@@ -1472,7 +1490,7 @@ handle_common_connected_no_input(cast, {cancel, ReplyTo, StreamRef}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
- StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ dereference_stream_ref(StreamRef, State), ReplyTo, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
handle_common_connected_no_input({call, From}, {stream_info, StreamRef}, _,
State=#state{intermediaries=Intermediaries0, protocol=Protocol, protocol_state=ProtoState}) ->
@@ -1721,7 +1739,13 @@ commands([{switch_protocol, NewProtocol, ReplyTo}], State0=#state{
_ -> ProtoOpts0#{tunnel_transport => tcp}
end,
{StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts),
- EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
+ ProtocolChangedEvent = case ProtoOpts of
+ #{stream_ref := StreamRef} ->
+ #{stream_ref => StreamRef, protocol => Protocol:name()};
+ _ ->
+ #{protocol => Protocol:name()}
+ end,
+ EvHandlerState = EvHandler:protocol_changed(ProtocolChangedEvent, EvHandlerState0),
%% We cancel the existing keepalive and, depending on the protocol,
%% we enable keepalive again, effectively resetting the timer.
State = keepalive_cancel(active(State0#state{protocol=Protocol, protocol_state=ProtoState,
@@ -1755,6 +1779,8 @@ disconnect(State0=#state{owner=Owner, status=Status, opts=Opts,
KilledStreams = Protocol:down(ProtoState),
Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams},
Retry = maps:get(retry, Opts, 5),
+ %% @todo We need to reset the origin_scheme/host/port and the transport
+ %% as well as remove the intermediaries.
{next_state, not_connected,
keepalive_cancel(State#state{socket=undefined,
protocol=undefined, protocol_state=undefined}),
diff --git a/src/gun_event.erl b/src/gun_event.erl
index b2a71db..513fa9f 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -62,10 +62,10 @@
%% These events occur when connecting to a TLS server or when
%% upgrading the connection or stream to use TLS, for example
%% using CONNECT. The stream_ref/reply_to values are only
-%% present when the TLS handshake occurs as a result of a request.
+%% present when the TLS handshake occurs in the scope of a request.
-type tls_handshake_event() :: #{
- stream_ref => reference(),
+ stream_ref => gun:stream_ref(),
reply_to => pid(),
socket := inet:socket() | ssl:sslsocket() | pid(), %% The socket before/after will be different.
tls_opts := [ssl:tls_client_option()],
@@ -81,13 +81,13 @@
%% request_start/request_headers.
-type request_start_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
function := headers | request | ws_upgrade,
method := iodata(),
scheme => binary(),
authority := iodata(),
- path := iodata(),
+ path => iodata(),
headers := [{binary(), iodata()}]
}.
-export_type([request_start_event/0]).
@@ -98,7 +98,7 @@
%% request_end.
-type request_end_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid()
}.
-export_type([request_end_event/0]).
@@ -108,7 +108,7 @@
%% push_promise_start.
-type push_promise_start_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid()
}.
-export_type([push_promise_start_event/0]).
@@ -118,12 +118,12 @@
%% push_promise_end.
-type push_promise_end_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
%% No stream is created if we receive the push_promise while
%% in the process of gracefully shutting down the connection.
%% The promised stream is canceled immediately.
- promised_stream_ref => reference(),
+ promised_stream_ref => gun:stream_ref(),
method := binary(),
uri := binary(),
headers := [{binary(), iodata()}]
@@ -135,7 +135,7 @@
%% response_start.
-type response_start_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid()
}.
-export_type([response_start_event/0]).
@@ -145,7 +145,7 @@
%% response_inform/response_headers.
-type response_headers_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
status := non_neg_integer(),
headers := [{binary(), binary()}]
@@ -158,7 +158,7 @@
%% response_trailers.
-type response_trailers_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
headers := [{binary(), binary()}]
}.
@@ -169,7 +169,7 @@
%% response_end.
-type response_end_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid()
}.
-export_type([response_end_event/0]).
@@ -186,7 +186,7 @@
%% response.
-type ws_upgrade_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
opts := gun:ws_opts()
}.
@@ -197,7 +197,7 @@
%% ws_recv_frame_start.
-type ws_recv_frame_start_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
frag_state := cow_ws:frag_state(),
extensions := cow_ws:extensions()
@@ -209,7 +209,7 @@
%% ws_recv_frame_header.
-type ws_recv_frame_header_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
frag_state := cow_ws:frag_state(),
extensions := cow_ws:extensions(),
@@ -225,7 +225,7 @@
%% ws_recv_frame_end.
-type ws_recv_frame_end_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
extensions := cow_ws:extensions(),
close_code := undefined | cow_ws:close_code(),
@@ -238,7 +238,7 @@
%% ws_send_frame_start/ws_send_frame_end.
-type ws_send_frame_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
extensions := cow_ws:extensions(),
frame := gun:ws_frame()
@@ -251,13 +251,10 @@
%% protocol_changed.
%%
%% This event can occur either following a successful ws_upgrade
-%% event or following a successful CONNECT request.
-%%
-%% @todo Currently there is only a connection-wide variant of this
-%% event. In the future there will be a stream-wide variant to
-%% support CONNECT and Websocket over HTTP/2.
+%% event, following a successful CONNECT request or a SOCKS tunnel.
-type protocol_changed_event() :: #{
+ stream_ref := gun:stream_ref(),
protocol := http | http2 | socks | ws
}.
-export_type([protocol_changed_event/0]).
@@ -268,9 +265,11 @@
%%
%% This event can occur following a successful CONNECT request.
%%
-%% @todo Currently there is only a connection-wide variant of this
-%% event. In the future there will be a stream-wide variant to
-%% support CONNECT through TLS proxies over HTTP/2.
+%% @todo I think this event should be removed. We already know
+%% about the transport being TLS via the tls_handshake events.
+%% Perhaps we should provide the socket in tls_handshake_end.
+%% We already do!! Therefore what's the point of this event?
+%% Remove it!!
-type transport_changed_event() :: #{
socket := ssl:sslsocket() | pid(),
@@ -283,6 +282,7 @@
%% origin_changed.
-type origin_changed_event() :: #{
+ stream_ref := gun:stream_ref(),
type := connect, %% @todo socks?
origin_scheme := binary(),
origin_host := inet:hostname() | inet:ip_address(),
@@ -303,7 +303,7 @@
%% Events may still occur for a short time after the cancel.
-type cancel_event() :: #{
- stream_ref := reference(),
+ stream_ref := gun:stream_ref(),
reply_to := pid(),
endpoint := local | remote,
reason := atom()
diff --git a/src/gun_http.erl b/src/gun_http.erl
index e2c0f1d..d877068 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -29,7 +29,7 @@
-export([headers/11]).
-export([request/12]).
-export([data/7]).
--export([connect/7]).
+-export([connect/9]).
-export([cancel/5]).
-export([stream_info/2]).
-export([down/1]).
@@ -195,7 +195,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
{nofin, EvHandlerState0};
no_trailers ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{fin, EvHandlerState1}
@@ -218,7 +218,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
{nofin, EvHandlerState0};
no_trailers ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{fin, EvHandlerState1}
@@ -243,9 +243,10 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn,
{_, _} ->
{Trailers, Rest} = cow_http:parse_headers(Data2),
%% @todo We probably want to pass this to gun_content_handler?
- ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers},
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers},
ResponseEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0),
@@ -272,7 +273,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn,
%% We ignore the active command because the stream ended.
[{state, State1}|_] = send_data(Data, State, fin),
EvHandlerState = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
case Conn of
@@ -287,7 +288,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn,
%% We ignore the active command because the stream ended.
[{state, State1}|_] = send_data(Body, State, fin),
EvHandlerState = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State1, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
case Conn of
@@ -396,15 +397,15 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
EvHandler, EvHandlerState0, Version, Status, Headers) ->
In = response_io_from_headers(Method, Version, Status, Headers),
IsFin = case In of head -> fin; _ -> nofin end,
+ RealStreamRef = stream_ref(State, StreamRef),
%% @todo Figure out whether the event should trigger if the stream was cancelled.
{Handlers, EvHandlerState2} = case IsAlive of
false ->
{undefined, EvHandlerState0};
true ->
- ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef),
- IsFin, Status, Headers},
+ ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -413,7 +414,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
fin -> {undefined, EvHandlerState1};
nofin ->
Handlers0 = maps:get(content_handlers, Opts, [gun_data_h]),
- {gun_content_handler:init(ReplyTo, stream_ref(State, StreamRef),
+ {gun_content_handler:init(ReplyTo, RealStreamRef,
Status, Headers, Handlers0), EvHandlerState1}
end
end,
@@ -422,7 +423,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
EvHandlerState2;
fin ->
EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
}, EvHandlerState2)
end,
@@ -509,7 +510,7 @@ close(Reason, State=#http_state{in=body_close,
close_streams(State, Tail, close_reason(Reason)),
_ = send_data(<<>>, State, fin),
EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState);
close(Reason, State=#http_state{streams=Streams}, _, EvHandlerState) ->
@@ -601,8 +602,9 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
{undefined, _} -> Headers4;
_ -> [{<<"content-length">>, integer_to_binary(iolist_size(Body))}|Headers4]
end,
+ RealStreamRef = stream_ref(State, StreamRef),
RequestEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
function => Function,
method => Method,
@@ -618,7 +620,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
EvHandlerState = case Out of
head ->
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandler:request_end(RequestEndEvent, EvHandlerState2);
@@ -672,7 +674,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
])
end,
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
@@ -686,7 +688,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
if
Length2 =:= 0, IsFin =:= fin ->
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
@@ -702,17 +704,19 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
-connect(State, StreamRef, ReplyTo, _, _, _, _)
+connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
- State;
-connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] ->
+ {State, EvHandlerState};
+connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
+ when Streams =/= [] ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
- State;
+ {State, EvHandlerState};
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
- StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0) ->
+ StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
+ EvHandler, EvHandlerState0) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
_ -> Host0
@@ -736,12 +740,29 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
Headers2
end,
Headers = transform_header_names(State, Headers3),
+ RealStreamRef = stream_ref(State, StreamRef),
+ RequestEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ function => connect,
+ method => <<"CONNECT">>,
+ authority => Authority,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
Transport:send(Socket, [
cow_http:request(<<"CONNECT">>, Authority, Version, Headers)
]),
+ EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
- new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
- <<"CONNECT">>, Authority, <<>>, InitialFlow).
+ {new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
+ <<"CONNECT">>, Authority, <<>>, InitialFlow),
+ EvHandlerState}.
%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
@@ -749,7 +770,7 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
true ->
State = cancel_stream(State0, StreamRef),
EvHandlerState = EvHandler:cancel(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
endpoint => local,
reason => cancel
@@ -876,6 +897,11 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
%% Websocket upgrade.
+ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState)
+ when is_list(StreamRef) ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
+ {badstate, "The stream is not a tunnel."}},
+ {State, EvHandlerState};
ws_upgrade(State=#http_state{version='HTTP/1.0'},
StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
@@ -986,9 +1012,10 @@ ws_handshake_end(Buffer,
self() ! {OK, Socket, Buffer}
end,
%% Inform the user that the upgrade was successful and switch the protocol.
- ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), [<<"websocket">>], Headers},
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers},
{switch_protocol, {ws, #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
headers => Headers,
extensions => Extensions,
flow => InitialFlow,
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 1072aca..8312954 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -30,12 +30,13 @@
-export([headers/11]).
-export([request/12]).
-export([data/7]).
--export([connect/7]).
+-export([connect/9]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
-export([down/1]).
-%-export([ws_upgrade/10]).
+-export([ws_upgrade/10]).
+-export([ws_send/6]).
-record(tunnel, {
%% The tunnel can either go requested->established
@@ -274,7 +275,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
push_promise -> push_promise_start
end,
EvHandler:EvCallback(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0);
%% Trailers or invalid header frame.
@@ -383,7 +384,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State1, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{State1, EvHandlerState1};
@@ -399,7 +400,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
{update_window(State1, StreamID), EvHandlerState0};
fin ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State1, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{update_window(State1), EvHandlerState1}
@@ -421,11 +422,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
tunnel=Tunnel
} = Stream,
State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]},
+ RealStreamRef = stream_ref(State, StreamRef),
if
Status >= 100, Status =< 199 ->
- ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
+ ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers},
EvHandlerState = EvHandler:response_inform(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -438,16 +440,23 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
origin_host => DestHost,
origin_port => DestPort
},
- %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo.
- %% We therefore do not need to call stream_ref/2.
- RealStreamRef = stream_ref(State, StreamRef),
ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
- EvHandlerState = EvHandler:response_headers(#{
+ EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
}, EvHandlerState0),
+ EvHandlerState2 = EvHandler:origin_changed(#{
+ stream_ref => RealStreamRef,
+ type => connect,
+ origin_scheme => case Destination of
+ #{transport := tls} -> <<"https">>;
+ _ -> <<"http">>
+ end,
+ origin_host => DestHost,
+ origin_port => DestPort
+ }, EvHandlerState1),
ContinueStreamRef = continue_stream_ref(State, StreamRef),
OriginSocket = #{
gun_pid => self(),
@@ -505,14 +514,15 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
}
}
end,
- {tunnel, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
+ {tunnel, ProtoState, EvHandlerState} = Proto:init(
+ ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2),
{store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{
info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}),
EvHandlerState};
true ->
- ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers},
+ ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -520,12 +530,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
{Handlers, EvHandlerState} = case IsFin of
fin ->
EvHandlerState2 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
}, EvHandlerState1),
{undefined, EvHandlerState2};
nofin ->
- {gun_content_handler:init(ReplyTo, StreamRef,
+ {gun_content_handler:init(ReplyTo, RealStreamRef,
Status, Headers, Handlers0), EvHandlerState1}
end,
%% @todo Disable the tunnel if any.
@@ -537,9 +547,10 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
%% @todo We probably want to pass this to gun_content_handler?
- ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers},
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers},
ResponseEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0),
@@ -552,7 +563,7 @@ rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) ->
ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef),
{stream_error, Reason, 'Stream reset by server.'}},
EvHandlerState = EvHandler:cancel(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
endpoint => remote,
reason => Reason
@@ -571,9 +582,10 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID),
PromisedStreamRef = make_ref(),
+ RealPromisedStreamRef = stream_ref(State, PromisedStreamRef),
URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]),
PushPromiseEvent0 = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
method => Method,
uri => URI,
@@ -581,8 +593,9 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
},
PushPromiseEvent = case Status of
connected ->
- ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), PromisedStreamRef, Method, URI, Headers},
- PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef};
+ ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef),
+ RealPromisedStreamRef, Method, URI, Headers},
+ PushPromiseEvent0#{promised_stream_ref => RealPromisedStreamRef};
_ ->
PushPromiseEvent0
end,
@@ -744,16 +757,16 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerSt
Transport:send(Socket, cow_http2:ping(0)),
{State, EvHandlerState}.
-%% @todo tunnel
headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
- Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) ->
+ Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0)
+ when is_reference(StreamRef) ->
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0),
Authority = maps:get(authority, PseudoHeaders),
RequestEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
function => ?FUNCTION_NAME,
method => Method,
@@ -769,7 +782,26 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
authority=Authority, path=Path},
- {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}.
+ {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState};
+%% Tunneled request.
+headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
+ Path, Headers, InitialFlow, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ %% @todo We should send an error to the user if the stream isn't ready.
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
+ origin_host := OriginHost, origin_port := OriginPort}}} ->
+ {ProtoState, EvHandlerState} = Proto:headers(ProtoState0, RealStreamRef,
+ ReplyTo, Method, OriginHost, OriginPort, Path, Headers,
+ InitialFlow, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
+ #stream{tunnel=undefined} ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}},
+ {State, EvHandlerState0};
+ error ->
+ {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
+ end.
request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
@@ -781,8 +813,9 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State0, Method, Host, Port, Path, Headers1),
Authority = maps:get(authority, PseudoHeaders),
+ RealStreamRef = stream_ref(State0, StreamRef),
RequestEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
function => ?FUNCTION_NAME,
method => Method,
@@ -806,7 +839,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
case IsFin of
fin ->
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
{State, EvHandler:request_end(RequestEndEvent, EvHandlerState)};
@@ -814,13 +847,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
maybe_send_data(State, StreamID, fin, Body, EvHandler, EvHandlerState)
end;
%% Tunneled request.
-%%
-%% We call Proto:request in a loop until we get to a non-CONNECT stream.
-%% When the transport is gun_tls_proxy we receive the TLS data
-%% as a 'data' cast; when gun_tcp_proxy we receive the 'data' cast
-%% directly. The 'data' cast contains the tunnel for the StreamRef.
-%% The tunnel is given as the socket and the gun_tls_proxy out_socket
-%% is always a gun_tcp_proxy that sends a 'data' cast.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
@@ -937,7 +963,7 @@ send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) ->
fin ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandler:request_end(RequestEndEvent, EvHandlerState0)
@@ -978,7 +1004,8 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo,
- Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0)
+ Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0,
+ EvHandler, EvHandlerState0)
when is_reference(StreamRef) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
@@ -1004,37 +1031,59 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
Headers1
end,
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(<<"CONNECT">>, HTTP2Machine0),
+ RealStreamRef = stream_ref(State, StreamRef),
+ RequestEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ function => connect,
+ method => <<"CONNECT">>,
+ authority => Authority,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
{ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
+ EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}},
- create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream);
+ {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream),
+ EvHandlerState};
%% Tunneled request.
-connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow) ->
+connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
+ EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo Should we send an error to the user if the stream isn't ready.
Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
- ProtoState = Proto:connect(ProtoState0, RealStreamRef,
- ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow),
- store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}});
+ {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, RealStreamRef,
+ ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
+ EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
- State;
+ {State, EvHandlerState0};
error ->
- error_stream_not_found(State, StreamRef, ReplyTo)
+ {error_stream_not_found(State, StreamRef, ReplyTo),
+ EvHandlerState0}
end.
cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
- StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
+ StreamRef, ReplyTo, EvHandler, EvHandlerState0)
+ when is_reference(StreamRef) ->
case get_stream_by_ref(State, StreamRef) of
#stream{id=StreamID} ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)),
EvHandlerState = EvHandler:cancel(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
endpoint => local,
reason => cancel
@@ -1044,6 +1093,22 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP
error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
EvHandlerState0}
+ end;
+%% Tunneled request.
+cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, RealStreamRef,
+ ReplyTo, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
+ #stream{tunnel=undefined} ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}},
+ {State, EvHandlerState0};
+ error ->
+ {error_stream_not_found(State, StreamRef, ReplyTo),
+ EvHandlerState0}
end.
timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, undefined, Name}, TRef) ->
@@ -1123,6 +1188,28 @@ stream_info(State, RealStreamRef=[StreamRef|_]) ->
down(#http2_state{stream_refs=Refs}) ->
maps:keys(Refs).
+%% Websocket upgrades are currently only accepted when tunneled.
+ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, RealStreamRef, ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{
+ tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), EvHandlerState}
+ %% @todo Error conditions?
+ end.
+
+ws_send(Frames, State0, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State0, StreamRef) of
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState}} ->
+ {Commands, EvHandlerState1} = Proto:ws_send(Frames, ProtoState,
+ RealStreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = tunnel_commands(Commands, Stream, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState}
+ %% @todo Error conditions?
+ end.
+
connection_error(#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine, streams=Streams},
Error={connection_error, Reason, HumanReadable}) ->
diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl
index ca9d3aa..cc58351 100644
--- a/src/gun_tunnel.erl
+++ b/src/gun_tunnel.erl
@@ -17,7 +17,7 @@
%% by the tunnel layer.
-module(gun_tunnel).
--export([init/4]).
+-export([init/6]).
-export([handle/4]).
-export([handle_continue/5]).
-export([update_flow/4]).
@@ -27,13 +27,14 @@
-export([headers/11]).
-export([request/12]).
-export([data/7]).
--export([connect/7]).
+-export([connect/9]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
-export([tunneled_name/2]).
-export([down/1]).
-%-export([ws_upgrade/10]).
+-export([ws_upgrade/10]).
+-export([ws_send/6]).
-record(tunnel_state, {
%% Fake socket and transport.
@@ -97,7 +98,8 @@
%% with some extra information added for the tunnel.
%%
%% @todo Mark the tunnel options as reserved.
-init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel}) ->
+init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel},
+ EvHandler, EvHandlerState0) ->
#{
type := TunnelType,
transport_name := TunnelTransport,
@@ -113,7 +115,10 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun
{Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
{_, ProtoState} = Proto:init(ReplyTo, OriginSocket, OriginTransport,
ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tcp}),
-%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
+ EvHandlerState = EvHandler:protocol_changed(#{
+ stream_ref => StreamRef,
+ protocol => Proto:name()
+ }, EvHandlerState0),
%% When the tunnel protocol is HTTP/1.1 or SOCKS
%% the gun_tunnel_up message was already sent.
%%
@@ -124,34 +129,36 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun
_ -> ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}
end,
{tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport,
- protocol=Proto, protocol_state=ProtoState}};
+ protocol=Proto, protocol_state=ProtoState},
+ EvHandlerState};
%% We can't initialize the protocol until the TLS handshake has completed.
- #{handshake_event := HandshakeEvent, protocols := Protocols} ->
+ #{handshake_event := HandshakeEvent0, protocols := Protocols} ->
#{handle_continue_stream_ref := ContinueStreamRef} = OriginSocket,
#{
origin_host := DestHost,
origin_port := DestPort
} = TunnelInfo,
-%% @todo OK so Protocol:init/4 will need to have EvHandler/EvHandlerState!
-%% Otherwise we can't do the TLS events.
#{
tls_opts := TLSOpts,
timeout := TLSTimeout
- } = HandshakeEvent,
+ } = HandshakeEvent0,
+ HandshakeEvent = HandshakeEvent0#{socket => OriginSocket},
+ EvHandlerState = EvHandler:tls_handshake_start(HandshakeEvent, EvHandlerState0),
{ok, ProxyPid} = gun_tls_proxy:start_link(DestHost, DestPort,
TLSOpts, TLSTimeout, OriginSocket, gun_tls_proxy_http2_connect,
{handle_continue, ContinueStreamRef, HandshakeEvent, Protocols}),
{tunnel, State#tunnel_state{socket=ProxyPid, transport=gun_tls_proxy,
- tls_origin_socket=OriginSocket}}
+ tls_origin_socket=OriginSocket}, EvHandlerState}
end.
%% When we receive data we pass it forward directly for TCP;
%% or we decrypt it and pass it via handle_continue for TLS.
-handle(Data, State=#tunnel_state{transport=gun_tcp_proxy,
+handle(Data, State0=#tunnel_state{transport=gun_tcp_proxy,
protocol=Proto, protocol_state=ProtoState0},
EvHandler, EvHandlerState0) ->
- {Commands, EvHandlerState} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0),
- {{state, commands(Commands, State)}, EvHandlerState};
+ {Commands, EvHandlerState1} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState};
handle(Data, State=#tunnel_state{transport=gun_tls_proxy,
socket=ProxyPid, tls_origin_socket=OriginSocket},
_EvHandler, EvHandlerState) ->
@@ -170,15 +177,19 @@ handle(Data, State=#tunnel_state{transport=gun_tls_proxy,
handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated},
{handle_continue, _, HandshakeEvent, Protocols}},
State=#tunnel_state{socket=ProxyPid, stream_ref=StreamRef, opts=Opts},
- _EvHandler, EvHandlerState0)
+ EvHandler, EvHandlerState0)
when is_reference(ContinueStreamRef) ->
#{reply_to := ReplyTo} = HandshakeEvent,
NewProtocol = gun_protocols:negotiated(Negotiated, Protocols),
{Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
-% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
-% socket => Socket,
-% protocol => NewProtocol
-% }, EvHandlerState0),
+ EvHandlerState1 = EvHandler:tls_handshake_end(HandshakeEvent#{
+ socket => ProxyPid,
+ protocol => NewProtocol
+ }, EvHandlerState0),
+ EvHandlerState = EvHandler:protocol_changed(#{
+ stream_ref => StreamRef,
+ protocol => NewProtocol
+ }, EvHandlerState1),
%% @todo Terminate the current protocol or something?
OriginSocket = #{
gun_pid => self(),
@@ -188,15 +199,15 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated},
{_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy,
ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tls}),
ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()},
- {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState0};
+ {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState};
handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason},
- {handle_continue, _, _HandshakeEvent, _}},
- #tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0)
+ {handle_continue, _, HandshakeEvent, _}},
+ #tunnel_state{socket=ProxyPid}, EvHandler, EvHandlerState0)
when is_reference(ContinueStreamRef) ->
-%% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
-%% error => Reason
-%% }, EvHandlerState0),
-%%% @todo
+ EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
+ error => Reason
+ }, EvHandlerState0),
+%% @todo
%% The TCP connection can be closed by either peer. The END_STREAM flag
%% on a DATA frame is treated as being equivalent to the TCP FIN bit. A
%% client is expected to send a DATA frame with the END_STREAM flag set
@@ -206,18 +217,19 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason},
%% receives a TCP segment with the FIN bit set sends a DATA frame with
%% the END_STREAM flag set. Note that the final TCP segment or DATA
%% frame could be empty.
- {{error, Reason}, EvHandlerState0};
+ {{error, Reason}, EvHandlerState};
%% Send the data. This causes TLS to encrypt the data and send it to the inner layer.
handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data},
#tunnel_state{}, _EvHandler, EvHandlerState)
when is_reference(ContinueStreamRef) ->
{{send, IsFin, Data}, EvHandlerState};
handle_continue(ContinueStreamRef, {tls_proxy, ProxyPid, Data},
- State=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState},
+ State0=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState},
EvHandler, EvHandlerState0)
when is_reference(ContinueStreamRef) ->
- {Commands, EvHandlerState} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0),
- {{state, commands(Commands, State)}, EvHandlerState};
+ {Commands, EvHandlerState1} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState};
handle_continue(ContinueStreamRef, {tls_proxy_closed, ProxyPid},
#tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0)
when is_reference(ContinueStreamRef) ->
@@ -233,21 +245,24 @@ handle_continue(ContinueStreamRef, {tls_proxy_error, ProxyPid, Reason},
%%
%% @todo Assert StreamRef to be our reference().
handle_continue([_StreamRef|ContinueStreamRef0], Msg,
- State=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
EvHandler, EvHandlerState0) ->
ContinueStreamRef = case ContinueStreamRef0 of
[CSR] -> CSR;
_ -> ContinueStreamRef0
end,
- {Commands, EvHandlerState} = Proto:handle_continue(ContinueStreamRef,
+ {Commands, EvHandlerState1} = Proto:handle_continue(ContinueStreamRef,
Msg, ProtoState, EvHandler, EvHandlerState0),
- {{state, commands(Commands, State)}, EvHandlerState}.
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState}.
-update_flow(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+%% @todo This function will need EvHandler/EvHandlerState?
+update_flow(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
ReplyTo, StreamRef0, Inc) ->
- StreamRef = maybe_dereference(State, StreamRef0),
+ StreamRef = maybe_dereference(State0, StreamRef0),
Commands = Proto:update_flow(ProtoState, ReplyTo, StreamRef, Inc),
- {state, commands(Commands, State)}.
+ {State, undefined} = commands(Commands, State0, undefined, undefined),
+ {state, State}.
closing(_Reason, _State, _EvHandler, EvHandlerState) ->
%% @todo Graceful shutdown must be propagated to tunnels.
@@ -312,17 +327,20 @@ data(State=#tunnel_state{socket=Socket, transport=Transport,
%% We pass the CONNECT request forward and optionally dereference StreamRef.
connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port},
protocol=Proto, protocol_state=ProtoState0},
- StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow) ->
+ StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow,
+ EvHandler, EvHandlerState0) ->
StreamRef = maybe_dereference(State, StreamRef0),
- ProtoState = Proto:connect(ProtoState0, StreamRef,
- ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow),
- State#tunnel_state{protocol_state=ProtoState}.
+ {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, StreamRef,
+ ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow,
+ EvHandler, EvHandlerState0),
+ {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.
-cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+cancel(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
StreamRef0, ReplyTo, EvHandler, EvHandlerState0) ->
- StreamRef = maybe_dereference(State, StreamRef0),
- {Commands, EvHandlerState} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
- {{state, commands(Commands, State)}, EvHandlerState}.
+ StreamRef = maybe_dereference(State0, StreamRef0),
+ {Commands, EvHandlerState1} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState}.
timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) ->
case Proto:timeout(ProtoState0, Msg, TRef) of
@@ -395,27 +413,71 @@ down(_State) ->
%% @todo Tunnels must be included in the gun_down message.
[].
+ws_upgrade(State=#tunnel_state{info=TunnelInfo, protocol=Proto, protocol_state=ProtoState0},
+ StreamRef0, ReplyTo, _, _, Path, Headers, WsOpts, EvHandler, EvHandlerState0) ->
+ StreamRef = maybe_dereference(State, StreamRef0),
+ #{
+ origin_host := Host,
+ origin_port := Port
+ } = TunnelInfo,
+ {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0),
+ {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.
+
+ws_send(Frames, State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ StreamRef0, ReplyTo, EvHandler, EvHandlerState0) ->
+ StreamRef = maybe_dereference(State0, StreamRef0),
+ {Commands, EvHandlerState1} = Proto:ws_send(Frames,
+ ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState}.
+
%% Internal.
-commands(Command, State) when not is_list(Command) ->
- commands([Command], State);
-commands([], State) ->
- State;
-commands([{state, ProtoState}|Tail], State) ->
- commands(Tail, State#tunnel_state{protocol_state=ProtoState});
+commands(Command, State, EvHandler, EvHandlerState) when not is_list(Command) ->
+ commands([Command], State, EvHandler, EvHandlerState);
+commands([], State, _, EvHandlerState) ->
+ {State, EvHandlerState};
+commands([{state, ProtoState}|Tail], State, EvHandler, EvHandlerState) ->
+ commands(Tail, State#tunnel_state{protocol_state=ProtoState}, EvHandler, EvHandlerState);
%% @todo We must pass down the set_cookie commands. Have a commands_queue.
-commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}) ->
- commands(Tail, State);
+commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}, EvHandler, EvHandlerState) ->
+ commands(Tail, State, EvHandler, EvHandlerState);
%% @todo What to do about IsFin?
-commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}) ->
+commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport},
+ EvHandler, EvHandlerState) ->
Transport:send(Socket, Data),
- commands(Tail, State);
-commands([Origin={origin, _Scheme, _NewHost, _NewPort, _Type}|Tail], State) ->
- commands(Tail, State#tunnel_state{protocol_origin=Origin});
+ commands(Tail, State, EvHandler, EvHandlerState);
+commands([Origin={origin, Scheme, Host, Port, Type}|Tail],
+ State=#tunnel_state{stream_ref=StreamRef},
+ EvHandler, EvHandlerState0) ->
+ EvHandlerState = EvHandler:origin_changed(#{
+ stream_ref => StreamRef,
+ type => Type,
+ origin_scheme => Scheme,
+ origin_host => Host,
+ origin_port => Port
+ }, EvHandlerState0),
+ commands(Tail, State#tunnel_state{protocol_origin=Origin}, EvHandler, EvHandlerState);
+commands([{switch_protocol, NewProtocol, ReplyTo}|Tail],
+ State=#tunnel_state{socket=Socket, transport=Transport, opts=Opts,
+ protocol_origin=undefined},
+ EvHandler, EvHandlerState0) ->
+ {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts),
+ %% This should only apply to Websocket for the time being.
+ {connected_ws_only, ProtoState} = Proto:init(ReplyTo, Socket, Transport, ProtoOpts),
+ #{stream_ref := StreamRef} = ProtoOpts,
+ EvHandlerState = EvHandler:protocol_changed(#{
+ stream_ref => StreamRef,
+ protocol => Proto:name()
+ }, EvHandlerState0),
+ commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ EvHandler, EvHandlerState);
commands([{switch_protocol, NewProtocol, ReplyTo}|Tail],
State=#tunnel_state{transport=Transport, stream_ref=TunnelStreamRef,
info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto,
- protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}) ->
+ protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}},
+ EvHandler, EvHandlerState0) ->
StreamRef = case Type of
socks5 -> TunnelStreamRef;
connect -> gun_protocols:stream_ref(NewProtocol)
@@ -450,13 +512,15 @@ commands([{switch_protocol, NewProtocol, ReplyTo}|Tail],
}
},
Proto = gun_tunnel,
- {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
-%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
- commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState});
+ {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo,
+ OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0),
+ commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ EvHandler, EvHandlerState);
commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail],
State=#tunnel_state{transport=Transport,
info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto,
- protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}) ->
+ protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}},
+ EvHandler, EvHandlerState0) ->
#{
stream_ref := StreamRef,
tls_opts := TLSOpts0
@@ -496,10 +560,12 @@ commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail],
}
},
Proto = gun_tunnel,
- {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
- commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState});
-commands([{active, true}|Tail], State) ->
- commands(Tail, State).
+ {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo,
+ OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0),
+ commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+ EvHandler, EvHandlerState);
+commands([{active, true}|Tail], State, EvHandler, EvHandlerState) ->
+ commands(Tail, State, EvHandler, EvHandlerState).
continue_stream_ref(#tunnel_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}) ->
if
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index f59c19c..cd81d65 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -26,6 +26,7 @@
-export([close/4]).
-export([keepalive/3]).
-export([ws_send/5]).
+-export([ws_send/6]).
-export([down/1]).
-record(payload, {
@@ -289,6 +290,9 @@ ws_send([Frame|Tail], State, ReplyTo, EvHandler, EvHandlerState0) ->
Other
end.
+ws_send(Frames, State, _StreamRef, ReplyTo, EvHandler, EvHandlerState) ->
+ ws_send(Frames, State, ReplyTo, EvHandler, EvHandlerState).
+
%% Websocket has no concept of streams.
down(_) ->
[].