diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/gun.erl | 64 | ||||
-rw-r--r-- | src/gun_event.erl | 52 | ||||
-rw-r--r-- | src/gun_http.erl | 81 | ||||
-rw-r--r-- | src/gun_http2.erl | 179 | ||||
-rw-r--r-- | src/gun_tunnel.erl | 196 | ||||
-rw-r--r-- | src/gun_ws.erl | 4 |
6 files changed, 393 insertions, 183 deletions
diff --git a/src/gun.erl b/src/gun.erl index 217133f..62abd6f 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -93,6 +93,7 @@ -export([ws_upgrade/3]). -export([ws_upgrade/4]). -export([ws_send/2]). +-export([ws_send/3]). %% Internals. -export([start_link/4]). @@ -274,7 +275,8 @@ keepalive => timeout(), protocols => [{binary(), module()}], reply_to => pid(), - silence_pings => boolean() + silence_pings => boolean(), + tunnel => stream_ref() }. -export_type([ws_opts/0]). @@ -926,20 +928,26 @@ ws_upgrade(ServerPid, Path, Headers) -> StreamRef. -spec ws_upgrade(pid(), iodata(), req_headers(), ws_opts()) -> stream_ref(). -ws_upgrade(ServerPid, Path, Headers, Opts) -> +ws_upgrade(ServerPid, Path, Headers, Opts0) -> + Tunnel = get_tunnel(Opts0), + Opts = maps:without([tunnel], Opts0), ok = gun_ws:check_options(Opts), - StreamRef = make_ref(), + StreamRef = make_stream_ref(Tunnel), ReplyTo = maps:get(reply_to, Opts, self()), %% @todo Also accept tunnel option. gen_statem:cast(ServerPid, {ws_upgrade, ReplyTo, StreamRef, Path, normalize_headers(Headers), Opts}), StreamRef. %% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef. -%% But it can be kept for the time being since it can still work for HTTP/1.1. +%% But it can be kept for the time being since it can still work for HTTP/1.1 (connected_ws_only). -spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok. ws_send(ServerPid, Frames) -> gen_statem:cast(ServerPid, {ws_send, self(), Frames}). +-spec ws_send(pid(), stream_ref(), ws_frame() | [ws_frame()]) -> ok. +ws_send(ServerPid, StreamRef, Frames) -> + gen_statem:cast(ServerPid, {ws_send, self(), StreamRef, Frames}). + %% Internals. callback_mode() -> state_functions. @@ -1208,6 +1216,12 @@ connected_data_only(cast, Msg, _) connected_data_only(Type, Event, State) -> handle_common_connected(Type, Event, ?FUNCTION_NAME, State). +connected_ws_only(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{ + protocol=Protocol=gun_ws, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {Commands, EvHandlerState} = Protocol:ws_send(Frames, + ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}); connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{ protocol=Protocol=gun_ws, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> @@ -1259,13 +1273,13 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers0, Body, Init {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, - protocol=Protocol, protocol_state=ProtoState}) -> - %% @todo No events are currently handled for the CONNECT request? - ProtoState2 = Protocol:connect(ProtoState, + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:connect(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Destination, #{host => Host, port => Port}, - Headers, InitialFlow), - {keep_state, State#state{protocol_state=ProtoState2}}; + Headers, InitialFlow, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. @@ -1275,8 +1289,7 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{op connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts}, State0=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) - when Protocol =:= gun_http -> + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> EvHandlerState1 = EvHandler:ws_upgrade(#{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -1285,14 +1298,19 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts}, %% @todo Can fail if HTTP/1.0. {Headers, State} = add_cookie_header(Path, Headers0, State0), {ProtoState2, EvHandlerState} = Protocol:ws_upgrade(ProtoState, - StreamRef, ReplyTo, Host, Port, Path, Headers, WsOpts, - EvHandler, EvHandlerState1), + dereference_stream_ref(StreamRef, State), ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState1), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, - "Websocket is only supported over HTTP/1.1."}}, - keep_state_and_data; +%% @todo Maybe better standardize the protocol callbacks argument orders. +connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{ + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) + when is_list(StreamRef) -> + {Commands, EvHandlerState} = Protocol:ws_send(Frames, + ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}); +%% Catch-all for the StreamRef-free variant. connected(cast, {ws_send, ReplyTo, _}, _) -> ReplyTo ! {gun_error, self(), {badstate, "Connection needs to be upgraded to Websocket " @@ -1472,7 +1490,7 @@ handle_common_connected_no_input(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, - StreamRef, ReplyTo, EvHandler, EvHandlerState0), + dereference_stream_ref(StreamRef, State), ReplyTo, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; handle_common_connected_no_input({call, From}, {stream_info, StreamRef}, _, State=#state{intermediaries=Intermediaries0, protocol=Protocol, protocol_state=ProtoState}) -> @@ -1721,7 +1739,13 @@ commands([{switch_protocol, NewProtocol, ReplyTo}], State0=#state{ _ -> ProtoOpts0#{tunnel_transport => tcp} end, {StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts), - EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + ProtocolChangedEvent = case ProtoOpts of + #{stream_ref := StreamRef} -> + #{stream_ref => StreamRef, protocol => Protocol:name()}; + _ -> + #{protocol => Protocol:name()} + end, + EvHandlerState = EvHandler:protocol_changed(ProtocolChangedEvent, EvHandlerState0), %% We cancel the existing keepalive and, depending on the protocol, %% we enable keepalive again, effectively resetting the timer. State = keepalive_cancel(active(State0#state{protocol=Protocol, protocol_state=ProtoState, @@ -1755,6 +1779,8 @@ disconnect(State0=#state{owner=Owner, status=Status, opts=Opts, KilledStreams = Protocol:down(ProtoState), Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams}, Retry = maps:get(retry, Opts, 5), + %% @todo We need to reset the origin_scheme/host/port and the transport + %% as well as remove the intermediaries. {next_state, not_connected, keepalive_cancel(State#state{socket=undefined, protocol=undefined, protocol_state=undefined}), diff --git a/src/gun_event.erl b/src/gun_event.erl index b2a71db..513fa9f 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -62,10 +62,10 @@ %% These events occur when connecting to a TLS server or when %% upgrading the connection or stream to use TLS, for example %% using CONNECT. The stream_ref/reply_to values are only -%% present when the TLS handshake occurs as a result of a request. +%% present when the TLS handshake occurs in the scope of a request. -type tls_handshake_event() :: #{ - stream_ref => reference(), + stream_ref => gun:stream_ref(), reply_to => pid(), socket := inet:socket() | ssl:sslsocket() | pid(), %% The socket before/after will be different. tls_opts := [ssl:tls_client_option()], @@ -81,13 +81,13 @@ %% request_start/request_headers. -type request_start_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), function := headers | request | ws_upgrade, method := iodata(), scheme => binary(), authority := iodata(), - path := iodata(), + path => iodata(), headers := [{binary(), iodata()}] }. -export_type([request_start_event/0]). @@ -98,7 +98,7 @@ %% request_end. -type request_end_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid() }. -export_type([request_end_event/0]). @@ -108,7 +108,7 @@ %% push_promise_start. -type push_promise_start_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid() }. -export_type([push_promise_start_event/0]). @@ -118,12 +118,12 @@ %% push_promise_end. -type push_promise_end_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), %% No stream is created if we receive the push_promise while %% in the process of gracefully shutting down the connection. %% The promised stream is canceled immediately. - promised_stream_ref => reference(), + promised_stream_ref => gun:stream_ref(), method := binary(), uri := binary(), headers := [{binary(), iodata()}] @@ -135,7 +135,7 @@ %% response_start. -type response_start_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid() }. -export_type([response_start_event/0]). @@ -145,7 +145,7 @@ %% response_inform/response_headers. -type response_headers_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), status := non_neg_integer(), headers := [{binary(), binary()}] @@ -158,7 +158,7 @@ %% response_trailers. -type response_trailers_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), headers := [{binary(), binary()}] }. @@ -169,7 +169,7 @@ %% response_end. -type response_end_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid() }. -export_type([response_end_event/0]). @@ -186,7 +186,7 @@ %% response. -type ws_upgrade_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), opts := gun:ws_opts() }. @@ -197,7 +197,7 @@ %% ws_recv_frame_start. -type ws_recv_frame_start_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), frag_state := cow_ws:frag_state(), extensions := cow_ws:extensions() @@ -209,7 +209,7 @@ %% ws_recv_frame_header. -type ws_recv_frame_header_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), frag_state := cow_ws:frag_state(), extensions := cow_ws:extensions(), @@ -225,7 +225,7 @@ %% ws_recv_frame_end. -type ws_recv_frame_end_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), extensions := cow_ws:extensions(), close_code := undefined | cow_ws:close_code(), @@ -238,7 +238,7 @@ %% ws_send_frame_start/ws_send_frame_end. -type ws_send_frame_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), extensions := cow_ws:extensions(), frame := gun:ws_frame() @@ -251,13 +251,10 @@ %% protocol_changed. %% %% This event can occur either following a successful ws_upgrade -%% event or following a successful CONNECT request. -%% -%% @todo Currently there is only a connection-wide variant of this -%% event. In the future there will be a stream-wide variant to -%% support CONNECT and Websocket over HTTP/2. +%% event, following a successful CONNECT request or a SOCKS tunnel. -type protocol_changed_event() :: #{ + stream_ref := gun:stream_ref(), protocol := http | http2 | socks | ws }. -export_type([protocol_changed_event/0]). @@ -268,9 +265,11 @@ %% %% This event can occur following a successful CONNECT request. %% -%% @todo Currently there is only a connection-wide variant of this -%% event. In the future there will be a stream-wide variant to -%% support CONNECT through TLS proxies over HTTP/2. +%% @todo I think this event should be removed. We already know +%% about the transport being TLS via the tls_handshake events. +%% Perhaps we should provide the socket in tls_handshake_end. +%% We already do!! Therefore what's the point of this event? +%% Remove it!! -type transport_changed_event() :: #{ socket := ssl:sslsocket() | pid(), @@ -283,6 +282,7 @@ %% origin_changed. -type origin_changed_event() :: #{ + stream_ref := gun:stream_ref(), type := connect, %% @todo socks? origin_scheme := binary(), origin_host := inet:hostname() | inet:ip_address(), @@ -303,7 +303,7 @@ %% Events may still occur for a short time after the cancel. -type cancel_event() :: #{ - stream_ref := reference(), + stream_ref := gun:stream_ref(), reply_to := pid(), endpoint := local | remote, reason := atom() diff --git a/src/gun_http.erl b/src/gun_http.erl index e2c0f1d..d877068 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -29,7 +29,7 @@ -export([headers/11]). -export([request/12]). -export([data/7]). --export([connect/7]). +-export([connect/9]). -export([cancel/5]). -export([stream_info/2]). -export([down/1]). @@ -195,7 +195,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, {nofin, EvHandlerState0}; no_trailers -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {fin, EvHandlerState1} @@ -218,7 +218,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, {nofin, EvHandlerState0}; no_trailers -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {fin, EvHandlerState1} @@ -243,9 +243,10 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, {_, _} -> {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers}, + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, ResponseEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0), @@ -272,7 +273,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn, %% We ignore the active command because the stream ended. [{state, State1}|_] = send_data(Data, State, fin), EvHandlerState = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0), case Conn of @@ -287,7 +288,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn, %% We ignore the active command because the stream ended. [{state, State1}|_] = send_data(Body, State, fin), EvHandlerState = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State1, StreamRef), reply_to => ReplyTo }, EvHandlerState0), case Conn of @@ -396,15 +397,15 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec EvHandler, EvHandlerState0, Version, Status, Headers) -> In = response_io_from_headers(Method, Version, Status, Headers), IsFin = case In of head -> fin; _ -> nofin end, + RealStreamRef = stream_ref(State, StreamRef), %% @todo Figure out whether the event should trigger if the stream was cancelled. {Handlers, EvHandlerState2} = case IsAlive of false -> {undefined, EvHandlerState0}; true -> - ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), - IsFin, Status, Headers}, + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers @@ -413,7 +414,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec fin -> {undefined, EvHandlerState1}; nofin -> Handlers0 = maps:get(content_handlers, Opts, [gun_data_h]), - {gun_content_handler:init(ReplyTo, stream_ref(State, StreamRef), + {gun_content_handler:init(ReplyTo, RealStreamRef, Status, Headers, Handlers0), EvHandlerState1} end end, @@ -422,7 +423,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec EvHandlerState2; fin -> EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState2) end, @@ -509,7 +510,7 @@ close(Reason, State=#http_state{in=body_close, close_streams(State, Tail, close_reason(Reason)), _ = send_data(<<>>, State, fin), EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState); close(Reason, State=#http_state{streams=Streams}, _, EvHandlerState) -> @@ -601,8 +602,9 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi {undefined, _} -> Headers4; _ -> [{<<"content-length">>, integer_to_binary(iolist_size(Body))}|Headers4] end, + RealStreamRef = stream_ref(State, StreamRef), RequestEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, function => Function, method => Method, @@ -618,7 +620,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi EvHandlerState = case Out of head -> RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandler:request_end(RequestEndEvent, EvHandlerState2); @@ -672,7 +674,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, ]) end, RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), @@ -686,7 +688,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, if Length2 =:= 0, IsFin =:= fin -> RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), @@ -702,17 +704,19 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. -connect(State, StreamRef, ReplyTo, _, _, _, _) +connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) when is_list(StreamRef) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - State; -connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] -> + {State, EvHandlerState}; +connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) + when Streams =/= [] -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, - State; + {State, EvHandlerState}; connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, - StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0) -> + StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0, + EvHandler, EvHandlerState0) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); _ -> Host0 @@ -736,12 +740,29 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version Headers2 end, Headers = transform_header_names(State, Headers3), + RealStreamRef = stream_ref(State, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => connect, + method => <<"CONNECT">>, + authority => Authority, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), Transport:send(Socket, [ cow_http:request(<<"CONNECT">>, Authority, Version, Headers) ]), + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), InitialFlow = initial_flow(InitialFlow0, Opts), - new_stream(State, {connect, StreamRef, Destination}, ReplyTo, - <<"CONNECT">>, Authority, <<>>, InitialFlow). + {new_stream(State, {connect, StreamRef, Destination}, ReplyTo, + <<"CONNECT">>, Authority, <<>>, InitialFlow), + EvHandlerState}. %% We can't cancel anything, we can just stop forwarding messages to the owner. cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> @@ -749,7 +770,7 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> true -> State = cancel_stream(State0, StreamRef), EvHandlerState = EvHandler:cancel(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, endpoint => local, reason => cancel @@ -876,6 +897,11 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> %% Websocket upgrade. +ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) + when is_list(StreamRef) -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), + {badstate, "The stream is not a tunnel."}}, + {State, EvHandlerState}; ws_upgrade(State=#http_state{version='HTTP/1.0'}, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, @@ -986,9 +1012,10 @@ ws_handshake_end(Buffer, self() ! {OK, Socket, Buffer} end, %% Inform the user that the upgrade was successful and switch the protocol. - ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), [<<"websocket">>], Headers}, + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}, {switch_protocol, {ws, #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, headers => Headers, extensions => Extensions, flow => InitialFlow, diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 1072aca..8312954 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -30,12 +30,13 @@ -export([headers/11]). -export([request/12]). -export([data/7]). --export([connect/7]). +-export([connect/9]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). -export([down/1]). -%-export([ws_upgrade/10]). +-export([ws_upgrade/10]). +-export([ws_send/6]). -record(tunnel, { %% The tunnel can either go requested->established @@ -274,7 +275,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl push_promise -> push_promise_start end, EvHandler:EvCallback(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0); %% Trailers or invalid header frame. @@ -383,7 +384,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State1, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {State1, EvHandlerState1}; @@ -399,7 +400,7 @@ data_frame(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, {update_window(State1, StreamID), EvHandlerState0}; fin -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State1, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {update_window(State1), EvHandlerState1} @@ -421,11 +422,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, tunnel=Tunnel } = Stream, State = State0#http2_state{commands_queue=[{set_cookie, Authority, Path, Status, Headers}|Commands]}, + RealStreamRef = stream_ref(State, StreamRef), if Status >= 100, Status =< 199 -> - ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, + ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, EvHandlerState = EvHandler:response_inform(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers @@ -438,16 +440,23 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, origin_host => DestHost, origin_port => DestPort }, - %% In the case of CONNECT responses the RealStreamRef is found in TunnelInfo. - %% We therefore do not need to call stream_ref/2. - RealStreamRef = stream_ref(State, StreamRef), ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, - EvHandlerState = EvHandler:response_headers(#{ + EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers }, EvHandlerState0), + EvHandlerState2 = EvHandler:origin_changed(#{ + stream_ref => RealStreamRef, + type => connect, + origin_scheme => case Destination of + #{transport := tls} -> <<"https">>; + _ -> <<"http">> + end, + origin_host => DestHost, + origin_port => DestPort + }, EvHandlerState1), ContinueStreamRef = continue_stream_ref(State, StreamRef), OriginSocket = #{ gun_pid => self(), @@ -505,14 +514,15 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, } } end, - {tunnel, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), + {tunnel, ProtoState, EvHandlerState} = Proto:init( + ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2), {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{ info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}), EvHandlerState}; true -> - ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers}, + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers @@ -520,12 +530,12 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, {Handlers, EvHandlerState} = case IsFin of fin -> EvHandlerState2 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState1), {undefined, EvHandlerState2}; nofin -> - {gun_content_handler:init(ReplyTo, StreamRef, + {gun_content_handler:init(ReplyTo, RealStreamRef, Status, Headers, Handlers0), EvHandlerState1} end, %% @todo Disable the tunnel if any. @@ -537,9 +547,10 @@ headers_frame(State0=#http2_state{transport=Transport, opts=Opts, trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers}, + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, ResponseEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0), @@ -552,7 +563,7 @@ rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef), {stream_error, Reason, 'Stream reset by server.'}}, EvHandlerState = EvHandler:cancel(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, endpoint => remote, reason => Reason @@ -571,9 +582,10 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID), PromisedStreamRef = make_ref(), + RealPromisedStreamRef = stream_ref(State, PromisedStreamRef), URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]), PushPromiseEvent0 = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, method => Method, uri => URI, @@ -581,8 +593,9 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, }, PushPromiseEvent = case Status of connected -> - ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), PromisedStreamRef, Method, URI, Headers}, - PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef}; + ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), + RealPromisedStreamRef, Method, URI, Headers}, + PushPromiseEvent0#{promised_stream_ref => RealPromisedStreamRef}; _ -> PushPromiseEvent0 end, @@ -744,16 +757,16 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerSt Transport:send(Socket, cow_http2:ping(0)), {State, EvHandlerState}. -%% @todo tunnel headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, - Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) -> + Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0), Authority = maps:get(authority, PseudoHeaders), RequestEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, function => ?FUNCTION_NAME, method => Method, @@ -769,7 +782,26 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path=Path}, - {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}. + {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}; +%% Tunneled request. +headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, + Path, Headers, InitialFlow, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + %% @todo We should send an error to the user if the stream isn't ready. + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ + origin_host := OriginHost, origin_port := OriginPort}}} -> + {ProtoState, EvHandlerState} = Proto:headers(ProtoState0, RealStreamRef, + ReplyTo, Method, OriginHost, OriginPort, Path, Headers, + InitialFlow, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; + #stream{tunnel=undefined} -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}, + {State, EvHandlerState0}; + error -> + {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} + end. request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, @@ -781,8 +813,9 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State0, Method, Host, Port, Path, Headers1), Authority = maps:get(authority, PseudoHeaders), + RealStreamRef = stream_ref(State0, StreamRef), RequestEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, function => ?FUNCTION_NAME, method => Method, @@ -806,7 +839,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, case IsFin of fin -> RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, {State, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; @@ -814,13 +847,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, maybe_send_data(State, StreamID, fin, Body, EvHandler, EvHandlerState) end; %% Tunneled request. -%% -%% We call Proto:request in a loop until we get to a non-CONNECT stream. -%% When the transport is gun_tls_proxy we receive the TLS data -%% as a 'data' cast; when gun_tcp_proxy we receive the 'data' cast -%% directly. The 'data' cast contains the tunnel for the StreamRef. -%% The tunnel is given as the socket and the gun_tls_proxy out_socket -%% is always a gun_tcp_proxy that sends a 'data' cast. request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of @@ -937,7 +963,7 @@ send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) -> fin -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandler:request_end(RequestEndEvent, EvHandlerState0) @@ -978,7 +1004,8 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, - Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0) + Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0, + EvHandler, EvHandlerState0) when is_reference(StreamRef) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); @@ -1004,37 +1031,59 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, Headers1 end, {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(<<"CONNECT">>, HTTP2Machine0), + RealStreamRef = stream_ref(State, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => connect, + method => <<"CONNECT">>, + authority => Authority, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, - create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream); + {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), + EvHandlerState}; %% Tunneled request. -connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow) -> +connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, + EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of %% @todo Should we send an error to the user if the stream isn't ready. Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> - ProtoState = Proto:connect(ProtoState0, RealStreamRef, - ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow), - store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}); + {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, RealStreamRef, + ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, + EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - State; + {State, EvHandlerState0}; error -> - error_stream_not_found(State, StreamRef, ReplyTo) + {error_stream_not_found(State, StreamRef, ReplyTo), + EvHandlerState0} end. cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, - StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> + StreamRef, ReplyTo, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), EvHandlerState = EvHandler:cancel(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, endpoint => local, reason => cancel @@ -1044,6 +1093,22 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} + end; +%% Tunneled request. +cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, RealStreamRef, + ReplyTo, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), + EvHandlerState}; + #stream{tunnel=undefined} -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}, + {State, EvHandlerState0}; + error -> + {error_stream_not_found(State, StreamRef, ReplyTo), + EvHandlerState0} end. timeout(State=#http2_state{http2_machine=HTTP2Machine0}, {cow_http2_machine, undefined, Name}, TRef) -> @@ -1123,6 +1188,28 @@ stream_info(State, RealStreamRef=[StreamRef|_]) -> down(#http2_state{stream_refs=Refs}) -> maps:keys(Refs). +%% Websocket upgrades are currently only accepted when tunneled. +ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State, StreamRef) of + Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, RealStreamRef, ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0), + {store_stream(State, Stream#stream{ + tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), EvHandlerState} + %% @todo Error conditions? + end. + +ws_send(Frames, State0, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) -> + case get_stream_by_ref(State0, StreamRef) of + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState}} -> + {Commands, EvHandlerState1} = Proto:ws_send(Frames, ProtoState, + RealStreamRef, ReplyTo, EvHandler, EvHandlerState0), + {State, EvHandlerState} = tunnel_commands(Commands, Stream, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState} + %% @todo Error conditions? + end. + connection_error(#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, streams=Streams}, Error={connection_error, Reason, HumanReadable}) -> diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl index ca9d3aa..cc58351 100644 --- a/src/gun_tunnel.erl +++ b/src/gun_tunnel.erl @@ -17,7 +17,7 @@ %% by the tunnel layer. -module(gun_tunnel). --export([init/4]). +-export([init/6]). -export([handle/4]). -export([handle_continue/5]). -export([update_flow/4]). @@ -27,13 +27,14 @@ -export([headers/11]). -export([request/12]). -export([data/7]). --export([connect/7]). +-export([connect/9]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). -export([tunneled_name/2]). -export([down/1]). -%-export([ws_upgrade/10]). +-export([ws_upgrade/10]). +-export([ws_send/6]). -record(tunnel_state, { %% Fake socket and transport. @@ -97,7 +98,8 @@ %% with some extra information added for the tunnel. %% %% @todo Mark the tunnel options as reserved. -init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel}) -> +init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tunnel := Tunnel}, + EvHandler, EvHandlerState0) -> #{ type := TunnelType, transport_name := TunnelTransport, @@ -113,7 +115,10 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, OriginTransport, ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tcp}), -%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => Proto:name() + }, EvHandlerState0), %% When the tunnel protocol is HTTP/1.1 or SOCKS %% the gun_tunnel_up message was already sent. %% @@ -124,34 +129,36 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun _ -> ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()} end, {tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport, - protocol=Proto, protocol_state=ProtoState}}; + protocol=Proto, protocol_state=ProtoState}, + EvHandlerState}; %% We can't initialize the protocol until the TLS handshake has completed. - #{handshake_event := HandshakeEvent, protocols := Protocols} -> + #{handshake_event := HandshakeEvent0, protocols := Protocols} -> #{handle_continue_stream_ref := ContinueStreamRef} = OriginSocket, #{ origin_host := DestHost, origin_port := DestPort } = TunnelInfo, -%% @todo OK so Protocol:init/4 will need to have EvHandler/EvHandlerState! -%% Otherwise we can't do the TLS events. #{ tls_opts := TLSOpts, timeout := TLSTimeout - } = HandshakeEvent, + } = HandshakeEvent0, + HandshakeEvent = HandshakeEvent0#{socket => OriginSocket}, + EvHandlerState = EvHandler:tls_handshake_start(HandshakeEvent, EvHandlerState0), {ok, ProxyPid} = gun_tls_proxy:start_link(DestHost, DestPort, TLSOpts, TLSTimeout, OriginSocket, gun_tls_proxy_http2_connect, {handle_continue, ContinueStreamRef, HandshakeEvent, Protocols}), {tunnel, State#tunnel_state{socket=ProxyPid, transport=gun_tls_proxy, - tls_origin_socket=OriginSocket}} + tls_origin_socket=OriginSocket}, EvHandlerState} end. %% When we receive data we pass it forward directly for TCP; %% or we decrypt it and pass it via handle_continue for TLS. -handle(Data, State=#tunnel_state{transport=gun_tcp_proxy, +handle(Data, State0=#tunnel_state{transport=gun_tcp_proxy, protocol=Proto, protocol_state=ProtoState0}, EvHandler, EvHandlerState0) -> - {Commands, EvHandlerState} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}; + {Commands, EvHandlerState1} = Proto:handle(Data, ProtoState0, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}; handle(Data, State=#tunnel_state{transport=gun_tls_proxy, socket=ProxyPid, tls_origin_socket=OriginSocket}, _EvHandler, EvHandlerState) -> @@ -170,15 +177,19 @@ handle(Data, State=#tunnel_state{transport=gun_tls_proxy, handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated}, {handle_continue, _, HandshakeEvent, Protocols}}, State=#tunnel_state{socket=ProxyPid, stream_ref=StreamRef, opts=Opts}, - _EvHandler, EvHandlerState0) + EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> #{reply_to := ReplyTo} = HandshakeEvent, NewProtocol = gun_protocols:negotiated(Negotiated, Protocols), {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), -% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ -% socket => Socket, -% protocol => NewProtocol -% }, EvHandlerState0), + EvHandlerState1 = EvHandler:tls_handshake_end(HandshakeEvent#{ + socket => ProxyPid, + protocol => NewProtocol + }, EvHandlerState0), + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => NewProtocol + }, EvHandlerState1), %% @todo Terminate the current protocol or something? OriginSocket = #{ gun_pid => self(), @@ -188,15 +199,15 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated}, {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tls}), ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}, - {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState0}; + {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, EvHandlerState}; handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason}, - {handle_continue, _, _HandshakeEvent, _}}, - #tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0) + {handle_continue, _, HandshakeEvent, _}}, + #tunnel_state{socket=ProxyPid}, EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> -%% EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ -%% error => Reason -%% }, EvHandlerState0), -%%% @todo + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + error => Reason + }, EvHandlerState0), +%% @todo %% The TCP connection can be closed by either peer. The END_STREAM flag %% on a DATA frame is treated as being equivalent to the TCP FIN bit. A %% client is expected to send a DATA frame with the END_STREAM flag set @@ -206,18 +217,19 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason}, %% receives a TCP segment with the FIN bit set sends a DATA frame with %% the END_STREAM flag set. Note that the final TCP segment or DATA %% frame could be empty. - {{error, Reason}, EvHandlerState0}; + {{error, Reason}, EvHandlerState}; %% Send the data. This causes TLS to encrypt the data and send it to the inner layer. handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data}, #tunnel_state{}, _EvHandler, EvHandlerState) when is_reference(ContinueStreamRef) -> {{send, IsFin, Data}, EvHandlerState}; handle_continue(ContinueStreamRef, {tls_proxy, ProxyPid, Data}, - State=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState}, + State0=#tunnel_state{socket=ProxyPid, protocol=Proto, protocol_state=ProtoState}, EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> - {Commands, EvHandlerState} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}; + {Commands, EvHandlerState1} = Proto:handle(Data, ProtoState, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}; handle_continue(ContinueStreamRef, {tls_proxy_closed, ProxyPid}, #tunnel_state{socket=ProxyPid}, _EvHandler, EvHandlerState0) when is_reference(ContinueStreamRef) -> @@ -233,21 +245,24 @@ handle_continue(ContinueStreamRef, {tls_proxy_error, ProxyPid, Reason}, %% %% @todo Assert StreamRef to be our reference(). handle_continue([_StreamRef|ContinueStreamRef0], Msg, - State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, EvHandler, EvHandlerState0) -> ContinueStreamRef = case ContinueStreamRef0 of [CSR] -> CSR; _ -> ContinueStreamRef0 end, - {Commands, EvHandlerState} = Proto:handle_continue(ContinueStreamRef, + {Commands, EvHandlerState1} = Proto:handle_continue(ContinueStreamRef, Msg, ProtoState, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}. + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}. -update_flow(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, +%% @todo This function will need EvHandler/EvHandlerState? +update_flow(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, ReplyTo, StreamRef0, Inc) -> - StreamRef = maybe_dereference(State, StreamRef0), + StreamRef = maybe_dereference(State0, StreamRef0), Commands = Proto:update_flow(ProtoState, ReplyTo, StreamRef, Inc), - {state, commands(Commands, State)}. + {State, undefined} = commands(Commands, State0, undefined, undefined), + {state, State}. closing(_Reason, _State, _EvHandler, EvHandlerState) -> %% @todo Graceful shutdown must be propagated to tunnels. @@ -312,17 +327,20 @@ data(State=#tunnel_state{socket=Socket, transport=Transport, %% We pass the CONNECT request forward and optionally dereference StreamRef. connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port}, protocol=Proto, protocol_state=ProtoState0}, - StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow) -> + StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow, + EvHandler, EvHandlerState0) -> StreamRef = maybe_dereference(State, StreamRef0), - ProtoState = Proto:connect(ProtoState0, StreamRef, - ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow), - State#tunnel_state{protocol_state=ProtoState}. + {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, StreamRef, + ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow, + EvHandler, EvHandlerState0), + {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. -cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, +cancel(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> - StreamRef = maybe_dereference(State, StreamRef0), - {Commands, EvHandlerState} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), - {{state, commands(Commands, State)}, EvHandlerState}. + StreamRef = maybe_dereference(State0, StreamRef0), + {Commands, EvHandlerState1} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}. timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) -> case Proto:timeout(ProtoState0, Msg, TRef) of @@ -395,27 +413,71 @@ down(_State) -> %% @todo Tunnels must be included in the gun_down message. []. +ws_upgrade(State=#tunnel_state{info=TunnelInfo, protocol=Proto, protocol_state=ProtoState0}, + StreamRef0, ReplyTo, _, _, Path, Headers, WsOpts, EvHandler, EvHandlerState0) -> + StreamRef = maybe_dereference(State, StreamRef0), + #{ + origin_host := Host, + origin_port := Port + } = TunnelInfo, + {ProtoState, EvHandlerState} = Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState0), + {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. + +ws_send(Frames, State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> + StreamRef = maybe_dereference(State0, StreamRef0), + {Commands, EvHandlerState1} = Proto:ws_send(Frames, + ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, EvHandlerState}. + %% Internal. -commands(Command, State) when not is_list(Command) -> - commands([Command], State); -commands([], State) -> - State; -commands([{state, ProtoState}|Tail], State) -> - commands(Tail, State#tunnel_state{protocol_state=ProtoState}); +commands(Command, State, EvHandler, EvHandlerState) when not is_list(Command) -> + commands([Command], State, EvHandler, EvHandlerState); +commands([], State, _, EvHandlerState) -> + {State, EvHandlerState}; +commands([{state, ProtoState}|Tail], State, EvHandler, EvHandlerState) -> + commands(Tail, State#tunnel_state{protocol_state=ProtoState}, EvHandler, EvHandlerState); %% @todo We must pass down the set_cookie commands. Have a commands_queue. -commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}) -> - commands(Tail, State); +commands([_SetCookie={set_cookie, _, _, _, _}|Tail], State=#tunnel_state{}, EvHandler, EvHandlerState) -> + commands(Tail, State, EvHandler, EvHandlerState); %% @todo What to do about IsFin? -commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}) -> +commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}, + EvHandler, EvHandlerState) -> Transport:send(Socket, Data), - commands(Tail, State); -commands([Origin={origin, _Scheme, _NewHost, _NewPort, _Type}|Tail], State) -> - commands(Tail, State#tunnel_state{protocol_origin=Origin}); + commands(Tail, State, EvHandler, EvHandlerState); +commands([Origin={origin, Scheme, Host, Port, Type}|Tail], + State=#tunnel_state{stream_ref=StreamRef}, + EvHandler, EvHandlerState0) -> + EvHandlerState = EvHandler:origin_changed(#{ + stream_ref => StreamRef, + type => Type, + origin_scheme => Scheme, + origin_host => Host, + origin_port => Port + }, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol_origin=Origin}, EvHandler, EvHandlerState); +commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], + State=#tunnel_state{socket=Socket, transport=Transport, opts=Opts, + protocol_origin=undefined}, + EvHandler, EvHandlerState0) -> + {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), + %% This should only apply to Websocket for the time being. + {connected_ws_only, ProtoState} = Proto:init(ReplyTo, Socket, Transport, ProtoOpts), + #{stream_ref := StreamRef} = ProtoOpts, + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => Proto:name() + }, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], State=#tunnel_state{transport=Transport, stream_ref=TunnelStreamRef, info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto, - protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}) -> + protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}, + EvHandler, EvHandlerState0) -> StreamRef = case Type of socks5 -> TunnelStreamRef; connect -> gun_protocols:stream_ref(NewProtocol) @@ -450,13 +512,15 @@ commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], } }, Proto = gun_tunnel, - {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), -%% @todo EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), - commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}); + {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo, + OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail], State=#tunnel_state{transport=Transport, info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto, - protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}) -> + protocol_origin={origin, _Scheme, OriginHost, OriginPort, Type}}, + EvHandler, EvHandlerState0) -> #{ stream_ref := StreamRef, tls_opts := TLSOpts0 @@ -496,10 +560,12 @@ commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail], } }, Proto = gun_tunnel, - {_, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), - commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}); -commands([{active, true}|Tail], State) -> - commands(Tail, State). + {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo, + OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); +commands([{active, true}|Tail], State, EvHandler, EvHandlerState) -> + commands(Tail, State, EvHandler, EvHandlerState). continue_stream_ref(#tunnel_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}) -> if diff --git a/src/gun_ws.erl b/src/gun_ws.erl index f59c19c..cd81d65 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -26,6 +26,7 @@ -export([close/4]). -export([keepalive/3]). -export([ws_send/5]). +-export([ws_send/6]). -export([down/1]). -record(payload, { @@ -289,6 +290,9 @@ ws_send([Frame|Tail], State, ReplyTo, EvHandler, EvHandlerState0) -> Other end. +ws_send(Frames, State, _StreamRef, ReplyTo, EvHandler, EvHandlerState) -> + ws_send(Frames, State, ReplyTo, EvHandler, EvHandlerState). + %% Websocket has no concept of streams. down(_) -> []. |