aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http2.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2020-10-16 11:33:31 +0200
committerLoïc Hoguin <[email protected]>2020-10-16 11:33:31 +0200
commit356bf47edeb5b78765200e78d9b7a48aa98b97f5 (patch)
tree83c35cbb5e7120bd1d1e0a5693571f8b18c088d7 /src/gun_http2.erl
parentf2e8d103dd7827251fa726c42e307e42cef8a3dc (diff)
downloadgun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.tar.gz
gun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.tar.bz2
gun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.zip
Add or fix events inside or related to CONNECT tunnels
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r--src/gun_http2.erl179
1 files changed, 133 insertions, 46 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 1072aca..8312954 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -30,12 +30,13 @@
-export([headers/11]).
-export([request/12]).
-export([data/7]).
--export([connect/7]).
+-export([connect/9]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
-export([down/1]).
-%-export([ws_upgrade/10]).
+-export([ws_upgrade/10]).
+-export([ws_send/6]).
-record(tunnel, {
%% The tunnel can either go requested->established
@@ -274,7 +275,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
push_promise -> push_promise_start
end,
EvHandler:EvCallback(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0);
%% Trailers or invalid header frame.
@@ -383,7 +384,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State1, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{State1, EvHandlerState1};
@@ -399,7 +400,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
{update_window(State1, StreamID), EvHandlerState0};
fin ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State1, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{update_window(State1), EvHandlerState1}
@@ -421,11 +422,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
tunnel=Tunnel
} = Stream,
State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]},
+ RealStreamRef = stream_ref(State, StreamRef),
if
Status >= 100, Status =< 199 ->
- ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
+ ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers},
EvHandlerState = EvHandler:response_inform(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -438,16 +440,23 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
origin_host => DestHost,
origin_port => DestPort
},
- %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo.
- %% We therefore do not need to call stream_ref/2.
- RealStreamRef = stream_ref(State, StreamRef),
ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
- EvHandlerState = EvHandler:response_headers(#{
+ EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
}, EvHandlerState0),
+ EvHandlerState2 = EvHandler:origin_changed(#{
+ stream_ref => RealStreamRef,
+ type => connect,
+ origin_scheme => case Destination of
+ #{transport := tls} -> <<"https">>;
+ _ -> <<"http">>
+ end,
+ origin_host => DestHost,
+ origin_port => DestPort
+ }, EvHandlerState1),
ContinueStreamRef = continue_stream_ref(State, StreamRef),
OriginSocket = #{
gun_pid => self(),
@@ -505,14 +514,15 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
}
}
end,
- {tunnel, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
+ {tunnel, ProtoState, EvHandlerState} = Proto:init(
+ ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2),
{store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{
info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}),
EvHandlerState};
true ->
- ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers},
+ ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -520,12 +530,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
{Handlers, EvHandlerState} = case IsFin of
fin ->
EvHandlerState2 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
}, EvHandlerState1),
{undefined, EvHandlerState2};
nofin ->
- {gun_content_handler:init(ReplyTo, StreamRef,
+ {gun_content_handler:init(ReplyTo, RealStreamRef,
Status, Headers, Handlers0), EvHandlerState1}
end,
%% @todo Disable the tunnel if any.
@@ -537,9 +547,10 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts,
trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
%% @todo We probably want to pass this to gun_content_handler?
- ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers},
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers},
ResponseEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0),
@@ -552,7 +563,7 @@ rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) ->
ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef),
{stream_error, Reason, 'Stream reset by server.'}},
EvHandlerState = EvHandler:cancel(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
endpoint => remote,
reason => Reason
@@ -571,9 +582,10 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID),
PromisedStreamRef = make_ref(),
+ RealPromisedStreamRef = stream_ref(State, PromisedStreamRef),
URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]),
PushPromiseEvent0 = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
method => Method,
uri => URI,
@@ -581,8 +593,9 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
},
PushPromiseEvent = case Status of
connected ->
- ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), PromisedStreamRef, Method, URI, Headers},
- PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef};
+ ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef),
+ RealPromisedStreamRef, Method, URI, Headers},
+ PushPromiseEvent0#{promised_stream_ref => RealPromisedStreamRef};
_ ->
PushPromiseEvent0
end,
@@ -744,16 +757,16 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerSt
Transport:send(Socket, cow_http2:ping(0)),
{State, EvHandlerState}.
-%% @todo tunnel
headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
- Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) ->
+ Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0)
+ when is_reference(StreamRef) ->
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0),
Authority = maps:get(authority, PseudoHeaders),
RequestEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
function => ?FUNCTION_NAME,
method => Method,
@@ -769,7 +782,26 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
authority=Authority, path=Path},
- {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}.
+ {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState};
+%% Tunneled request.
+headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
+ Path, Headers, InitialFlow, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ %% @todo We should send an error to the user if the stream isn't ready.
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
+ origin_host := OriginHost, origin_port := OriginPort}}} ->
+ {ProtoState, EvHandlerState} = Proto:headers(ProtoState0, RealStreamRef,
+ ReplyTo, Method, OriginHost, OriginPort, Path, Headers,
+ InitialFlow, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
+ #stream{tunnel=undefined} ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}},
+ {State, EvHandlerState0};
+ error ->
+ {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
+ end.
request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
@@ -781,8 +813,9 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State0, Method, Host, Port, Path, Headers1),
Authority = maps:get(authority, PseudoHeaders),
+ RealStreamRef = stream_ref(State0, StreamRef),
RequestEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
function => ?FUNCTION_NAME,
method => Method,
@@ -806,7 +839,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
case IsFin of
fin ->
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
{State, EvHandler:request_end(RequestEndEvent, EvHandlerState)};
@@ -814,13 +847,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
maybe_send_data(State, StreamID, fin, Body, EvHandler, EvHandlerState)
end;
%% Tunneled request.
-%%
-%% We call Proto:request in a loop until we get to a non-CONNECT stream.
-%% When the transport is gun_tls_proxy we receive the TLS data
-%% as a 'data' cast; when gun_tcp_proxy we receive the 'data' cast
-%% directly. The 'data' cast contains the tunnel for the StreamRef.
-%% The tunnel is given as the socket and the gun_tls_proxy out_socket
-%% is always a gun_tcp_proxy that sends a 'data' cast.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
@@ -937,7 +963,7 @@ send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) ->
fin ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandler:request_end(RequestEndEvent, EvHandlerState0)
@@ -978,7 +1004,8 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo,
- Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0)
+ Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0,
+ EvHandler, EvHandlerState0)
when is_reference(StreamRef) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
@@ -1004,37 +1031,59 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
Headers1
end,
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(<<"CONNECT">>, HTTP2Machine0),
+ RealStreamRef = stream_ref(State, StreamRef),
+ RequestEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ function => connect,
+ method => <<"CONNECT">>,
+ authority => Authority,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
{ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
+ EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}},
- create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream);
+ {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream),
+ EvHandlerState};
%% Tunneled request.
-connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow) ->
+connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
+ EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo Should we send an error to the user if the stream isn't ready.
Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
- ProtoState = Proto:connect(ProtoState0, RealStreamRef,
- ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow),
- store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}});
+ {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, RealStreamRef,
+ ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
+ EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
- State;
+ {State, EvHandlerState0};
error ->
- error_stream_not_found(State, StreamRef, ReplyTo)
+ {error_stream_not_found(State, StreamRef, ReplyTo),
+ EvHandlerState0}
end.
cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
- StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
+ StreamRef, ReplyTo, EvHandler, EvHandlerState0)
+ when is_reference(StreamRef) ->
case get_stream_by_ref(State, StreamRef) of
#stream{id=StreamID} ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)),
EvHandlerState = EvHandler:cancel(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
endpoint => local,
reason => cancel
@@ -1044,6 +1093,22 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP
error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
EvHandlerState0}
+ end;
+%% Tunneled request.
+cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, RealStreamRef,
+ ReplyTo, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
+ EvHandlerState};
+ #stream{tunnel=undefined} ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}},
+ {State, EvHandlerState0};
+ error ->
+ {error_stream_not_found(State, StreamRef, ReplyTo),
+ EvHandlerState0}
end.
timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, undefined, Name}, TRef) ->
@@ -1123,6 +1188,28 @@ stream_info(State, RealStreamRef=[StreamRef|_]) ->
down(#http2_state{stream_refs=Refs}) ->
maps:keys(Refs).
+%% Websocket upgrades are currently only accepted when tunneled.
+ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, RealStreamRef, ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0),
+ {store_stream(State, Stream#stream{
+ tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), EvHandlerState}
+ %% @todo Error conditions?
+ end.
+
+ws_send(Frames, State0, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State0, StreamRef) of
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState}} ->
+ {Commands, EvHandlerState1} = Proto:ws_send(Frames, ProtoState,
+ RealStreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {State, EvHandlerState} = tunnel_commands(Commands, Stream, State0, EvHandler, EvHandlerState1),
+ {{state, State}, EvHandlerState}
+ %% @todo Error conditions?
+ end.
+
connection_error(#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine, streams=Streams},
Error={connection_error, Reason, HumanReadable}) ->