From 00cc1f385f94823a0684deee001b643091e235b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 26 Sep 2019 13:16:56 +0200 Subject: Add reply_to option to ws_upgrade; remove notowner entirely The reply_to option is also propagated when we switch protocols. --- src/gun_http.erl | 77 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 40 insertions(+), 37 deletions(-) (limited to 'src/gun_http.erl') diff --git a/src/gun_http.erl b/src/gun_http.erl index f27563e..80c83bb 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -33,7 +33,7 @@ -export([cancel/5]). -export([stream_info/2]). -export([down/1]). --export([ws_upgrade/9]). +-export([ws_upgrade/10]). %% Functions shared with gun_http2. -export([host_header/3]). @@ -43,11 +43,16 @@ %% @todo Make that a record. -type connect_info() :: {connect, reference(), gun:connect_destination()}. -%% @todo Make that a record. --type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options +-record(websocket, { + ref :: reference(), + reply_to :: pid(), + key :: binary(), + extensions :: [binary()], + opts :: gun:ws_opts() +}). -record(stream, { - ref :: reference() | connect_info() | websocket_info(), + ref :: reference() | connect_info() | #websocket{}, reply_to :: pid(), flow :: integer() | infinity, method :: binary(), @@ -56,7 +61,6 @@ }). -record(http_state, { - owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), opts = #{} :: gun:http_opts(), @@ -101,9 +105,9 @@ opts_name() -> http_opts. has_keepalive() -> true. default_keepalive() -> infinity. -init(Owner, Socket, Transport, Opts) -> +init(_ReplyTo, Socket, Transport, Opts) -> Version = maps:get(version, Opts, 'HTTP/1.1'), - {connected, #http_state{owner=Owner, socket=Socket, transport=Transport, + {connected, #http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}}. switch_transport(Transport, Socket, State) -> @@ -274,16 +278,15 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts, }, EvHandlerState0), %% @todo We might want to switch to the HTTP/2 protocol or to the TLS transport as well. case StreamRef of - {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts} -> - {ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts), - EvHandlerState}; + #websocket{} -> + {ws_handshake(Rest2, State, StreamRef, Headers), EvHandlerState}; %% Any other 101 response results in us switching to the raw protocol. %% @todo We should check that we asked for an upgrade before accepting it. _ -> {_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers), Upgrade = cow_http_hd:parse_upgrade(Upgrade0), ReplyTo ! {gun_upgrade, self(), StreamRef, Upgrade, Headers}, - {{switch_protocol, raw}, EvHandlerState0} + {{switch_protocol, raw, ReplyTo}, EvHandlerState0} end; %% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup. {_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 -> @@ -317,11 +320,11 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts, }, Protocols = maps:get(protocols, Destination, [http2, http]), {[{origin, <<"https">>, NewHost, NewPort, connect}, - {tls_handshake, HandshakeEvent, Protocols}], EvHandlerState1}; + {tls_handshake, HandshakeEvent, Protocols, ReplyTo}], EvHandlerState1}; _ -> [Protocol] = maps:get(protocols, Destination, [http]), {[{origin, <<"http">>, NewHost, NewPort, connect}, - {switch_protocol, Protocol}], EvHandlerState1} + {switch_protocol, Protocol, ReplyTo}], EvHandlerState1} end; {_, _} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, @@ -389,7 +392,7 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts, end. stream_ref({connect, StreamRef, _}) -> StreamRef; -stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef; +stream_ref(#websocket{ref=StreamRef}) -> StreamRef; stream_ref(StreamRef) -> StreamRef. %% The state must be first in order to retrieve it when the stream ended. @@ -702,7 +705,7 @@ stream_info(#http_state{streams=Streams}, StreamRef) -> down(#http_state{streams=Streams}) -> KilledStreams = [case Ref of {connect, Ref2, _} -> Ref2; - {websocket, Ref2, _, _, _} -> Ref2; + #websocket{ref=Ref2} -> Ref2; _ -> Ref end || #stream{ref=Ref} <- Streams], {KilledStreams, []}. @@ -788,13 +791,13 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> %% Websocket upgrade. -ws_upgrade(#http_state{owner=ReplyTo, version='HTTP/1.0'}, StreamRef, _, _, _, _, _, _, EvHandlerState) -> +ws_upgrade(#http_state{version='HTTP/1.0'}, + StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "Websocket cannot be used over an HTTP/1.0 connection."}}, {[], EvHandlerState}; -ws_upgrade(State=#http_state{owner=ReplyTo, out=head}, - StreamRef, Host, Port, Path, Headers0, WsOpts, - EvHandler, EvHandlerState0) -> +ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo, + Host, Port, Path, Headers0, WsOpts, EvHandler, EvHandlerState0) -> {Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of true -> {[{<<"sec-websocket-extensions">>, <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>} @@ -821,10 +824,10 @@ ws_upgrade(State=#http_state{owner=ReplyTo, out=head}, EvHandler, EvHandlerState0, ?FUNCTION_NAME), InitialFlow = maps:get(flow, WsOpts, infinity), {new_stream(State#http_state{connection=Conn, out=Out}, - {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>, InitialFlow), - EvHandlerState}. + #websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts}, + ReplyTo, <<"GET">>, InitialFlow), EvHandlerState}. -ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) -> +ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) -> %% @todo check upgrade, connection case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of false -> @@ -832,23 +835,23 @@ ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) -> {_, Accept} -> case cow_ws:encode_key(Key) of Accept -> - ws_handshake_extensions(Buffer, State, StreamRef, - Headers, GunExtensions, Opts); + ws_handshake_extensions(Buffer, State, Ws, Headers); _ -> close end end. -ws_handshake_extensions(Buffer, State, StreamRef, Headers, GunExtensions, Opts) -> +ws_handshake_extensions(Buffer, State, Ws=#websocket{extensions=Extensions0, opts=Opts}, Headers) -> case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of false -> - ws_handshake_protocols(Buffer, State, StreamRef, Headers, #{}, Opts); + ws_handshake_protocols(Buffer, State, Ws, Headers, #{}); {_, ExtHd} -> - case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of + ParsedExtHd = cow_http_hd:parse_sec_websocket_extensions(ExtHd), + case ws_validate_extensions(ParsedExtHd, Extensions0, #{}, Opts) of close -> close; Extensions -> - ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) + ws_handshake_protocols(Buffer, State, Ws, Headers, Extensions) end end. @@ -870,25 +873,25 @@ ws_validate_extensions(_, _, _, _) -> close. %% @todo Validate protocols. -ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) -> +ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensions) -> case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of false -> - ws_handshake_end(Buffer, State, StreamRef, Headers, Extensions, - maps:get(default_protocol, Opts, gun_ws_h), Opts); + ws_handshake_end(Buffer, State, Ws, Headers, Extensions, + maps:get(default_protocol, Opts, gun_ws_h)); {_, Proto} -> ProtoOpt = maps:get(protocols, Opts, []), case lists:keyfind(Proto, 1, ProtoOpt) of {_, Handler} -> - ws_handshake_end(Buffer, State, StreamRef, - Headers, Extensions, Handler, Opts); + ws_handshake_end(Buffer, State, Ws, Headers, Extensions, Handler); false -> close end end. %% We know that the most recent stream is the Websocket one. -ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport, - streams=[#stream{flow=InitialFlow}|_]}, StreamRef, Headers, Extensions, Handler, Opts) -> +ws_handshake_end(Buffer, + #http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]}, + #websocket{ref=StreamRef, reply_to=ReplyTo, opts=Opts}, Headers, Extensions, Handler) -> %% Send ourselves the remaining buffer, if any. _ = case Buffer of <<>> -> @@ -898,7 +901,7 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans self() ! {OK, Socket, Buffer} end, %% Inform the user that the upgrade was successful and switch the protocol. - Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, + ReplyTo ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, {switch_protocol, {ws, #{ stream_ref => StreamRef, headers => Headers, @@ -906,4 +909,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans flow => InitialFlow, handler => Handler, opts => Opts - }}}. + }}, ReplyTo}. -- cgit v1.2.3