From 00cc1f385f94823a0684deee001b643091e235b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 26 Sep 2019 13:16:56 +0200 Subject: Add reply_to option to ws_upgrade; remove notowner entirely The reply_to option is also propagated when we switch protocols. --- doc/src/guide/connect.asciidoc | 4 +- doc/src/guide/http.asciidoc | 4 +- doc/src/manual/gun_socks_up.asciidoc | 2 +- src/gun.erl | 60 +++++++++++++--------------- src/gun_http.erl | 77 +++++++++++++++++++----------------- src/gun_http2.erl | 5 +-- src/gun_raw.erl | 10 ++--- src/gun_socks.erl | 12 +++--- src/gun_ws.erl | 45 +++++++++++---------- test/raw_SUITE.erl | 61 ++++++++++++++++++++++++++++ test/ws_SUITE.erl | 18 +++++++++ 11 files changed, 190 insertions(+), 108 deletions(-) diff --git a/doc/src/guide/connect.asciidoc b/doc/src/guide/connect.asciidoc index a2c0a28..08f8db2 100644 --- a/doc/src/guide/connect.asciidoc +++ b/doc/src/guide/connect.asciidoc @@ -16,10 +16,10 @@ a remote endpoint. This Gun connection is owned by a user process that is called the _owner_ of the connection, and is managed by the supervision tree of the `gun` application. -The owner process communicates with the Gun connection +Any process can communicate with the Gun connection by calling functions from the module `gun`. All functions perform their respective operations asynchronously. The Gun -connection will send Erlang messages to the owner process +connection will send Erlang messages to the calling process whenever needed. When the remote endpoint closes the connection, Gun attempts diff --git a/doc/src/guide/http.asciidoc b/doc/src/guide/http.asciidoc index 382b245..51cb994 100644 --- a/doc/src/guide/http.asciidoc +++ b/doc/src/guide/http.asciidoc @@ -15,7 +15,7 @@ Stream references use the Erlang _reference_ data type and are therefore unique. Streams can be canceled at any time. This will stop any further -messages from being sent to the owner process. Depending on +messages from being sent to the calling process. Depending on its capabilities, the server will also be instructed to cancel the request. @@ -233,7 +233,7 @@ gun:request(ConnPid, "TRACE", "/", [ === Processing responses -All data received from the server is sent to the owner +All data received from the server is sent to the calling process as a message. First a `gun_response` message is sent, followed by zero or more `gun_data` messages. If something goes wrong, a `gun_error` message is sent instead. diff --git a/doc/src/manual/gun_socks_up.asciidoc b/doc/src/manual/gun_socks_up.asciidoc index e74f1a9..e65cbff 100644 --- a/doc/src/manual/gun_socks_up.asciidoc +++ b/doc/src/manual/gun_socks_up.asciidoc @@ -16,7 +16,7 @@ Protocol :: http | http2 | socks The Socks connection is up. -This message informs the owner process that the connection +This message informs the owner/calling process that the connection completed through the configured Socks proxy. If Gun is configured to connect to another Socks server, then the diff --git a/src/gun.erl b/src/gun.erl index 3154b9b..ddd38c8 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -142,7 +142,6 @@ ws_opts => ws_opts() }. -export_type([opts/0]). -%% @todo Add an option to disable/enable the notowner behavior. -type connect_destination() :: #{ host := inet:hostname() | inet:ip_address(), @@ -229,6 +228,7 @@ flow => pos_integer(), keepalive => timeout(), protocols => [{binary(), module()}], + reply_to => pid(), silence_pings => boolean() }. -export_type([ws_opts/0]). @@ -447,7 +447,7 @@ close(ServerPid) -> -spec shutdown(pid()) -> ok. shutdown(ServerPid) -> - gen_statem:cast(ServerPid, {shutdown, self()}). + gen_statem:cast(ServerPid, shutdown). %% Requests. @@ -843,7 +843,8 @@ ws_upgrade(ServerPid, Path, Headers) -> ws_upgrade(ServerPid, Path, Headers, Opts) -> ok = gun_ws:check_options(Opts), StreamRef = make_ref(), - gen_statem:cast(ServerPid, {ws_upgrade, self(), StreamRef, Path, Headers, Opts}), + ReplyTo = maps:get(reply_to, Opts, self()), + gen_statem:cast(ServerPid, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, Opts}), StreamRef. %% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef. @@ -1011,20 +1012,20 @@ ensure_alpn_sni(Protocols0, TransOpts0, #state{origin_host=OriginHost}) -> end. %% Normal TLS handshake. -tls_handshake(internal, {tls_handshake, HandshakeEvent, Protocols}, +tls_handshake(internal, {tls_handshake, HandshakeEvent, Protocols, ReplyTo}, State0=#state{socket=Socket, transport=gun_tcp}) -> case normal_tls_handshake(Socket, State0, HandshakeEvent, Protocols) of {ok, TLSSocket, NewProtocol, State} -> commands([ {switch_transport, gun_tls, TLSSocket}, - {switch_protocol, NewProtocol} + {switch_protocol, NewProtocol, ReplyTo} ], State); {error, Reason, State} -> commands({error, Reason}, State) end; %% TLS over TLS. tls_handshake(internal, {tls_handshake, - HandshakeEvent0=#{tls_opts := TLSOpts0, timeout := TLSTimeout}, Protocols}, + HandshakeEvent0=#{tls_opts := TLSOpts0, timeout := TLSTimeout}, Protocols, ReplyTo}, State=#state{socket=Socket, transport=Transport, origin_host=OriginHost, origin_port=OriginPort, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> TLSOpts = ensure_alpn_sni(Protocols, TLSOpts0, State), @@ -1034,20 +1035,20 @@ tls_handshake(internal, {tls_handshake, }, EvHandlerState = EvHandler:tls_handshake_start(HandshakeEvent, EvHandlerState0), {ok, ProxyPid} = gun_tls_proxy:start_link(OriginHost, OriginPort, - TLSOpts, TLSTimeout, Socket, Transport, {HandshakeEvent, Protocols}), + TLSOpts, TLSTimeout, Socket, Transport, {HandshakeEvent, Protocols, ReplyTo}), commands([{switch_transport, gun_tls_proxy, ProxyPid}], State#state{ socket=ProxyPid, transport=gun_tls_proxy, event_handler_state=EvHandlerState}); %% When using gun_tls_proxy we need a separate message to know whether %% the handshake succeeded and whether we need to switch to a different protocol. -tls_handshake(info, {gun_tls_proxy, Socket, {ok, Negotiated}, {HandshakeEvent, Protocols}}, +tls_handshake(info, {gun_tls_proxy, Socket, {ok, Negotiated}, {HandshakeEvent, Protocols, ReplyTo}}, State0=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> NewProtocol = protocol_negotiated(Negotiated, Protocols), EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ socket => Socket, protocol => NewProtocol }, EvHandlerState0), - commands([{switch_protocol, NewProtocol}], State0#state{event_handler_state=EvHandlerState}); -tls_handshake(info, {gun_tls_proxy, Socket, Error = {error, Reason}, {HandshakeEvent, _}}, + commands([{switch_protocol, NewProtocol, ReplyTo}], State0#state{event_handler_state=EvHandlerState}); +tls_handshake(info, {gun_tls_proxy, Socket, Error = {error, Reason}, {HandshakeEvent, _, _}}, State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ error => Reason @@ -1099,10 +1100,10 @@ connected_data_only(cast, Msg, _) connected_data_only(Type, Event, State) -> handle_common_connected(Type, Event, ?FUNCTION_NAME, State). -connected_ws_only(cast, {ws_send, Owner, Frames}, State=#state{ - owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState, +connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{ + protocol=Protocol=gun_ws, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {Commands, EvHandlerState} = Protocol:send(Frames, ProtoState, EvHandler, EvHandlerState0), + {Commands, EvHandlerState} = Protocol:ws_send(Frames, ProtoState, ReplyTo, EvHandler, EvHandlerState0), commands(Commands, State#state{event_handler_state=EvHandlerState}); connected_ws_only(cast, Msg, _) when element(1, Msg) =:= headers; element(1, Msg) =:= request; element(1, Msg) =:= data; @@ -1155,22 +1156,22 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow} %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. -connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers}, State=#state{opts=Opts}) -> +connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) -> WsOpts = maps:get(ws_opts, Opts, #{}), - connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts}, State); -connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts}, - State=#state{owner=Owner, origin_host=Host, origin_port=Port, + connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts}, State); +connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts}, + State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) when Protocol =:= gun_http -> EvHandlerState1 = EvHandler:ws_upgrade(#{ stream_ref => StreamRef, - reply_to => Owner, %% Only the owner can upgrade the connection at this time. + reply_to => ReplyTo, opts => WsOpts }, EvHandlerState0), %% @todo Can fail if HTTP/1.0. {ProtoState2, EvHandlerState} = Protocol:ws_upgrade(ProtoState, - StreamRef, Host, Port, Path, Headers, WsOpts, + StreamRef, ReplyTo, Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState1), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; @@ -1272,6 +1273,7 @@ handle_common_connected_no_input(Type, Event, StateName, State) -> %% Common events. handle_common(cast, {set_owner, CurrentOwner, NewOwner}, _, State=#state{owner=CurrentOwner, status={up, CurrentOwnerRef}}) -> + %% @todo This should probably trigger an event. demonitor(CurrentOwnerRef, [flush]), NewOwnerRef = monitor(process, NewOwner), {keep_state, State#state{owner=NewOwner, status={up, NewOwnerRef}}}; @@ -1280,8 +1282,8 @@ handle_common(cast, {set_owner, CurrentOwner, _}, _, #state{owner=CurrentOwner}) CurrentOwner ! {gun_error, self(), {badstate, "The owner of the connection cannot be changed when the connection is shutting down."}}, keep_state_and_state; -handle_common(cast, {shutdown, Owner}, StateName, State=#state{ - owner=Owner, status=Status, socket=Socket, transport=Transport, protocol=Protocol}) -> +handle_common(cast, shutdown, StateName, State=#state{ + status=Status, socket=Socket, transport=Transport, protocol=Protocol}) -> case {Socket, Protocol} of {undefined, _} -> {stop, shutdown}; @@ -1318,12 +1320,6 @@ handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, StateName, State end; handle_common({call, From}, _, _, _) -> {keep_state_and_data, {reply, From, {error, bad_call}}}; -%% @todo The ReplyTo patch disabled the notowner behavior. -%% We need to add an option to enforce this behavior if needed. -handle_common(cast, Any, _, #state{owner=Owner}) when element(2, Any) =/= Owner -> - element(2, Any) ! {gun_error, self(), {notowner, - "Operations are restricted to the owner of the connection."}}, - keep_state_and_data; %% We postpone all HTTP/Websocket operations until we are connected. handle_common(cast, _, StateName, _) when StateName =/= connected -> {keep_state_and_data, postpone}; @@ -1381,8 +1377,8 @@ commands([{switch_transport, Transport, Socket}|Tail], State=#state{ commands(Tail, active(State#state{socket=Socket, transport=Transport, messages=Transport:messages(), protocol_state=ProtoState, event_handler_state=EvHandlerState})); -commands([{switch_protocol, Protocol0}], State0=#state{ - owner=Owner, opts=Opts, socket=Socket, transport=Transport, protocol=CurrentProtocol, +commands([{switch_protocol, Protocol0, ReplyTo}], State0=#state{ + opts=Opts, socket=Socket, transport=Transport, protocol=CurrentProtocol, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {Protocol, ProtoOpts} = case Protocol0 of {P, PO} -> {protocol_handler(P), PO}; @@ -1392,10 +1388,10 @@ commands([{switch_protocol, Protocol0}], State0=#state{ end, %% When we switch_protocol from socks we must send a gun_socks_up message. _ = case CurrentProtocol of - gun_socks -> Owner ! {gun_socks_up, self(), Protocol:name()}; + gun_socks -> ReplyTo ! {gun_socks_up, self(), Protocol:name()}; _ -> ok end, - {StateName, ProtoState} = Protocol:init(Owner, Socket, Transport, ProtoOpts), + {StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts), EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), %% We cancel the existing keepalive and, depending on the protocol, %% we enable keepalive again, effectively resetting the timer. @@ -1406,7 +1402,7 @@ commands([{switch_protocol, Protocol0}], State0=#state{ false -> {next_state, StateName, State} end; %% Perform a TLS handshake. -commands([TLSHandshake={tls_handshake, _, _}], State) -> +commands([TLSHandshake={tls_handshake, _, _, _}], State) -> {next_state, tls_handshake, State, {next_event, internal, TLSHandshake}}. diff --git a/src/gun_http.erl b/src/gun_http.erl index f27563e..80c83bb 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -33,7 +33,7 @@ -export([cancel/5]). -export([stream_info/2]). -export([down/1]). --export([ws_upgrade/9]). +-export([ws_upgrade/10]). %% Functions shared with gun_http2. -export([host_header/3]). @@ -43,11 +43,16 @@ %% @todo Make that a record. -type connect_info() :: {connect, reference(), gun:connect_destination()}. -%% @todo Make that a record. --type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options +-record(websocket, { + ref :: reference(), + reply_to :: pid(), + key :: binary(), + extensions :: [binary()], + opts :: gun:ws_opts() +}). -record(stream, { - ref :: reference() | connect_info() | websocket_info(), + ref :: reference() | connect_info() | #websocket{}, reply_to :: pid(), flow :: integer() | infinity, method :: binary(), @@ -56,7 +61,6 @@ }). -record(http_state, { - owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), opts = #{} :: gun:http_opts(), @@ -101,9 +105,9 @@ opts_name() -> http_opts. has_keepalive() -> true. default_keepalive() -> infinity. -init(Owner, Socket, Transport, Opts) -> +init(_ReplyTo, Socket, Transport, Opts) -> Version = maps:get(version, Opts, 'HTTP/1.1'), - {connected, #http_state{owner=Owner, socket=Socket, transport=Transport, + {connected, #http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}}. switch_transport(Transport, Socket, State) -> @@ -274,16 +278,15 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts, }, EvHandlerState0), %% @todo We might want to switch to the HTTP/2 protocol or to the TLS transport as well. case StreamRef of - {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts} -> - {ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts), - EvHandlerState}; + #websocket{} -> + {ws_handshake(Rest2, State, StreamRef, Headers), EvHandlerState}; %% Any other 101 response results in us switching to the raw protocol. %% @todo We should check that we asked for an upgrade before accepting it. _ -> {_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers), Upgrade = cow_http_hd:parse_upgrade(Upgrade0), ReplyTo ! {gun_upgrade, self(), StreamRef, Upgrade, Headers}, - {{switch_protocol, raw}, EvHandlerState0} + {{switch_protocol, raw, ReplyTo}, EvHandlerState0} end; %% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup. {_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 -> @@ -317,11 +320,11 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts, }, Protocols = maps:get(protocols, Destination, [http2, http]), {[{origin, <<"https">>, NewHost, NewPort, connect}, - {tls_handshake, HandshakeEvent, Protocols}], EvHandlerState1}; + {tls_handshake, HandshakeEvent, Protocols, ReplyTo}], EvHandlerState1}; _ -> [Protocol] = maps:get(protocols, Destination, [http]), {[{origin, <<"http">>, NewHost, NewPort, connect}, - {switch_protocol, Protocol}], EvHandlerState1} + {switch_protocol, Protocol, ReplyTo}], EvHandlerState1} end; {_, _} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, @@ -389,7 +392,7 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts, end. stream_ref({connect, StreamRef, _}) -> StreamRef; -stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef; +stream_ref(#websocket{ref=StreamRef}) -> StreamRef; stream_ref(StreamRef) -> StreamRef. %% The state must be first in order to retrieve it when the stream ended. @@ -702,7 +705,7 @@ stream_info(#http_state{streams=Streams}, StreamRef) -> down(#http_state{streams=Streams}) -> KilledStreams = [case Ref of {connect, Ref2, _} -> Ref2; - {websocket, Ref2, _, _, _} -> Ref2; + #websocket{ref=Ref2} -> Ref2; _ -> Ref end || #stream{ref=Ref} <- Streams], {KilledStreams, []}. @@ -788,13 +791,13 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> %% Websocket upgrade. -ws_upgrade(#http_state{owner=ReplyTo, version='HTTP/1.0'}, StreamRef, _, _, _, _, _, _, EvHandlerState) -> +ws_upgrade(#http_state{version='HTTP/1.0'}, + StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "Websocket cannot be used over an HTTP/1.0 connection."}}, {[], EvHandlerState}; -ws_upgrade(State=#http_state{owner=ReplyTo, out=head}, - StreamRef, Host, Port, Path, Headers0, WsOpts, - EvHandler, EvHandlerState0) -> +ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo, + Host, Port, Path, Headers0, WsOpts, EvHandler, EvHandlerState0) -> {Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of true -> {[{<<"sec-websocket-extensions">>, <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>} @@ -821,10 +824,10 @@ ws_upgrade(State=#http_state{owner=ReplyTo, out=head}, EvHandler, EvHandlerState0, ?FUNCTION_NAME), InitialFlow = maps:get(flow, WsOpts, infinity), {new_stream(State#http_state{connection=Conn, out=Out}, - {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>, InitialFlow), - EvHandlerState}. + #websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts}, + ReplyTo, <<"GET">>, InitialFlow), EvHandlerState}. -ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) -> +ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) -> %% @todo check upgrade, connection case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of false -> @@ -832,23 +835,23 @@ ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) -> {_, Accept} -> case cow_ws:encode_key(Key) of Accept -> - ws_handshake_extensions(Buffer, State, StreamRef, - Headers, GunExtensions, Opts); + ws_handshake_extensions(Buffer, State, Ws, Headers); _ -> close end end. -ws_handshake_extensions(Buffer, State, StreamRef, Headers, GunExtensions, Opts) -> +ws_handshake_extensions(Buffer, State, Ws=#websocket{extensions=Extensions0, opts=Opts}, Headers) -> case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of false -> - ws_handshake_protocols(Buffer, State, StreamRef, Headers, #{}, Opts); + ws_handshake_protocols(Buffer, State, Ws, Headers, #{}); {_, ExtHd} -> - case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of + ParsedExtHd = cow_http_hd:parse_sec_websocket_extensions(ExtHd), + case ws_validate_extensions(ParsedExtHd, Extensions0, #{}, Opts) of close -> close; Extensions -> - ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) + ws_handshake_protocols(Buffer, State, Ws, Headers, Extensions) end end. @@ -870,25 +873,25 @@ ws_validate_extensions(_, _, _, _) -> close. %% @todo Validate protocols. -ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) -> +ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensions) -> case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of false -> - ws_handshake_end(Buffer, State, StreamRef, Headers, Extensions, - maps:get(default_protocol, Opts, gun_ws_h), Opts); + ws_handshake_end(Buffer, State, Ws, Headers, Extensions, + maps:get(default_protocol, Opts, gun_ws_h)); {_, Proto} -> ProtoOpt = maps:get(protocols, Opts, []), case lists:keyfind(Proto, 1, ProtoOpt) of {_, Handler} -> - ws_handshake_end(Buffer, State, StreamRef, - Headers, Extensions, Handler, Opts); + ws_handshake_end(Buffer, State, Ws, Headers, Extensions, Handler); false -> close end end. %% We know that the most recent stream is the Websocket one. -ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport, - streams=[#stream{flow=InitialFlow}|_]}, StreamRef, Headers, Extensions, Handler, Opts) -> +ws_handshake_end(Buffer, + #http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]}, + #websocket{ref=StreamRef, reply_to=ReplyTo, opts=Opts}, Headers, Extensions, Handler) -> %% Send ourselves the remaining buffer, if any. _ = case Buffer of <<>> -> @@ -898,7 +901,7 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans self() ! {OK, Socket, Buffer} end, %% Inform the user that the upgrade was successful and switch the protocol. - Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, + ReplyTo ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, {switch_protocol, {ws, #{ stream_ref => StreamRef, headers => Headers, @@ -906,4 +909,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans flow => InitialFlow, handler => Handler, opts => Opts - }}}. + }}, ReplyTo}. diff --git a/src/gun_http2.erl b/src/gun_http2.erl index e6f09ea..bbc76ab 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -50,7 +50,6 @@ }). -record(http2_state, { - owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), opts = #{} :: gun:http2_opts(), @@ -119,7 +118,7 @@ opts_name() -> http2_opts. has_keepalive() -> true. default_keepalive() -> 5000. -init(Owner, Socket, Transport, Opts0) -> +init(_ReplyTo, Socket, Transport, Opts0) -> %% We have different defaults than the protocol in order %% to optimize for performance when receiving responses. Opts = Opts0#{ @@ -129,7 +128,7 @@ init(Owner, Socket, Transport, Opts0) -> {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts), Handlers = maps:get(content_handlers, Opts, [gun_data_h]), %% @todo Better validate the preface being received. - State = #http2_state{owner=Owner, socket=Socket, + State = #http2_state{socket=Socket, transport=Transport, opts=Opts, content_handlers=Handlers, http2_machine=HTTP2Machine}, Transport:send(Socket, Preface), diff --git a/src/gun_raw.erl b/src/gun_raw.erl index da71cd6..bef2c1c 100644 --- a/src/gun_raw.erl +++ b/src/gun_raw.erl @@ -26,7 +26,7 @@ %% @todo down -record(raw_state, { - owner :: pid(), + reply_to :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module() }). @@ -39,12 +39,12 @@ name() -> raw. opts_name() -> raw_opts. has_keepalive() -> false. -init(Owner, Socket, Transport, _Opts) -> - {connected_data_only, #raw_state{owner=Owner, socket=Socket, transport=Transport}}. +init(ReplyTo, Socket, Transport, _Opts) -> + {connected_data_only, #raw_state{reply_to=ReplyTo, socket=Socket, transport=Transport}}. -handle(Data, State=#raw_state{owner=Owner}, _, EvHandlerState) -> +handle(Data, State=#raw_state{reply_to=ReplyTo}, _, EvHandlerState) -> %% When we take over the entire connection there is no stream reference. - Owner ! {gun_data, self(), undefined, nofin, Data}, + ReplyTo ! {gun_data, self(), undefined, nofin, Data}, {{state, State}, EvHandlerState}. %% We can always close immediately. diff --git a/src/gun_socks.erl b/src/gun_socks.erl index 487e7c4..5a42e3f 100644 --- a/src/gun_socks.erl +++ b/src/gun_socks.erl @@ -26,7 +26,7 @@ %% @todo down -record(socks_state, { - owner :: pid(), + reply_to :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), opts = #{} :: gun:socks_opts(), @@ -83,7 +83,7 @@ name() -> socks. opts_name() -> socks_opts. has_keepalive() -> false. -init(Owner, Socket, Transport, Opts) -> +init(ReplyTo, Socket, Transport, Opts) -> 5 = Version = maps:get(version, Opts, 5), Auth = maps:get(auth, Opts, [none]), Methods = < none -> <<0>> end || A <- Auth>>, Transport:send(Socket, [<<5, (length(Auth))>>, Methods]), - {connected_no_input, #socks_state{owner=Owner, socket=Socket, transport=Transport, + {connected_no_input, #socks_state{reply_to=ReplyTo, socket=Socket, transport=Transport, opts=Opts, version=Version, status=auth_method_select}}. switch_transport(Transport, Socket, State) -> @@ -120,7 +120,7 @@ handle(<<1, 0>>, State=#socks_state{version=5, status=auth_username_password}) - handle(<<1, _>>, #socks_state{version=5, status=auth_username_password}) -> {error, {socks5, username_password_auth_failure}}; %% Connect reply. -handle(<<5, 0, 0, Rest0/bits>>, #socks_state{opts=Opts, version=5, status=connect}) -> +handle(<<5, 0, 0, Rest0/bits>>, #socks_state{reply_to=ReplyTo, opts=Opts, version=5, status=connect}) -> %% @todo What to do with BoundAddr and BoundPort? Add as metadata to origin info? {_BoundAddr, _BoundPort} = case Rest0 of %% @todo Seen a server with <<1, 0:48>>. @@ -142,11 +142,11 @@ handle(<<5, 0, 0, Rest0/bits>>, #socks_state{opts=Opts, version=5, status=connec timeout => maps:get(tls_handshake_timeout, Opts, infinity) }, [{origin, <<"https">>, NewHost, NewPort, socks5}, - {tls_handshake, HandshakeEvent, maps:get(protocols, Opts, [http2, http])}]; + {tls_handshake, HandshakeEvent, maps:get(protocols, Opts, [http2, http]), ReplyTo}]; _ -> [Protocol] = maps:get(protocols, Opts, [http]), [{origin, <<"http">>, NewHost, NewPort, socks5}, - {switch_protocol, Protocol}] + {switch_protocol, Protocol, ReplyTo}] end; handle(<<5, Error, _/bits>>, #socks_state{version=5, status=connect}) -> Reason = case Error of diff --git a/src/gun_ws.erl b/src/gun_ws.erl index ba61577..15c4a81 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -25,7 +25,7 @@ -export([closing/4]). -export([close/4]). -export([keepalive/3]). --export([send/4]). +-export([ws_send/5]). -export([down/1]). -record(payload, { @@ -39,7 +39,7 @@ }). -record(ws_state, { - owner :: pid(), + reply_to :: pid(), stream_ref :: reference(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), @@ -79,6 +79,8 @@ do_check_options([Opt={protocols, L}|Opts]) when is_list(L) -> [true] -> do_check_options(Opts); _ -> {error, {options, {ws, Opt}}} end; +do_check_options([{reply_to, P}|Opts]) when is_pid(P) -> + do_check_options(Opts); do_check_options([{silence_pings, B}|Opts]) when B =:= true; B =:= false -> do_check_options(Opts); do_check_options([{user_opts, _}|Opts]) -> @@ -91,10 +93,10 @@ opts_name() -> ws_opts. has_keepalive() -> true. default_keepalive() -> 5000. -init(Owner, Socket, Transport, #{stream_ref := StreamRef, headers := Headers, +init(ReplyTo, Socket, Transport, #{stream_ref := StreamRef, headers := Headers, extensions := Extensions, flow := InitialFlow, handler := Handler, opts := Opts}) -> - {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts), - {connected_ws_only, #ws_state{owner=Owner, stream_ref=StreamRef, + {ok, HandlerState} = Handler:init(ReplyTo, StreamRef, Headers, Opts), + {connected_ws_only, #ws_state{reply_to=ReplyTo, stream_ref=StreamRef, socket=Socket, transport=Transport, opts=Opts, extensions=Extensions, flow=InitialFlow, handler=Handler, handler_state=HandlerState}}. @@ -107,7 +109,7 @@ handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) -> %% Shortcut for common case when Data is empty after processing a frame. handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> maybe_active(State, EvHandlerState); -handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, +handle(Data, State=#ws_state{reply_to=ReplyTo, stream_ref=StreamRef, buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}, EvHandler, EvHandlerState0) -> %% Send the event only if there was no data in the buffer. @@ -175,7 +177,7 @@ maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) -> {active, Flow > 0} ], EvHandlerState}. -dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, +dispatch(Rest, State0=#ws_state{reply_to=ReplyTo, stream_ref=StreamRef, frag_state=FragState, extensions=Extensions, flow=Flow0, handler=Handler, handler_state=HandlerState0}, Type, Payload, CloseCode, EvHandler, EvHandlerState0) -> @@ -196,10 +198,10 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, State1 = State0#ws_state{flow=Flow, handler_state=HandlerState}, {State, EvHandlerState} = case Frame of ping -> - {[], EvHandlerState2} = send(pong, State1, EvHandler, EvHandlerState1), + {[], EvHandlerState2} = send(pong, State1, ReplyTo, EvHandler, EvHandlerState1), {State1, EvHandlerState2}; {ping, Payload} -> - {[], EvHandlerState2} = send({pong, Payload}, State1, EvHandler, EvHandlerState1), + {[], EvHandlerState2} = send({pong, Payload}, State1, ReplyTo, EvHandler, EvHandlerState1), {State1, EvHandlerState2}; close -> {State1#ws_state{in=close}, EvHandlerState1}; @@ -226,7 +228,7 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> %% The user already sent the close frame; do nothing. closing(_, State=#ws_state{out=close}, _, EvHandlerState) -> {closing(State), EvHandlerState}; -closing(Reason, State, EvHandler, EvHandlerState) -> +closing(Reason, State=#ws_state{reply_to=ReplyTo}, EvHandler, EvHandlerState) -> Code = case Reason of normal -> 1000; owner_down -> 1001; @@ -234,7 +236,7 @@ closing(Reason, State, EvHandler, EvHandlerState) -> {error, badframe} -> 1002; {error, badencoding} -> 1007 end, - send({close, Code, <<>>}, State, EvHandler, EvHandlerState). + send({close, Code, <<>>}, State, ReplyTo, EvHandler, EvHandlerState). closing(#ws_state{opts=Opts}) -> Timeout = maps:get(closing_timeout, Opts, 15000), @@ -243,14 +245,14 @@ closing(#ws_state{opts=Opts}) -> close(_, _, _, EvHandlerState) -> EvHandlerState. -keepalive(State, EvHandler, EvHandlerState0) -> - {[], EvHandlerState} = send(ping, State, EvHandler, EvHandlerState0), +keepalive(State=#ws_state{reply_to=ReplyTo}, EvHandler, EvHandlerState0) -> + {[], EvHandlerState} = send(ping, State, ReplyTo, EvHandler, EvHandlerState0), {State, EvHandlerState}. %% Send one frame. -send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, +send(Frame, State=#ws_state{stream_ref=StreamRef, socket=Socket, transport=Transport, in=In, extensions=Extensions}, - EvHandler, EvHandlerState0) when not is_list(Frame) -> + ReplyTo, EvHandler, EvHandlerState0) -> WsSendFrameEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -272,14 +274,17 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, ], EvHandlerState}; true -> {[], EvHandlerState} - end; + end. + %% Send many frames. -send([], _, _, EvHandlerState) -> +ws_send(Frame, State, ReplyTo, EvHandler, EvHandlerState) when not is_list(Frame) -> + send(Frame, State, ReplyTo, EvHandler, EvHandlerState); +ws_send([], _, _, _, EvHandlerState) -> {[], EvHandlerState}; -send([Frame|Tail], State, EvHandler, EvHandlerState0) -> - case send(Frame, State, EvHandler, EvHandlerState0) of +ws_send([Frame|Tail], State, ReplyTo, EvHandler, EvHandlerState0) -> + case send(Frame, State, ReplyTo, EvHandler, EvHandlerState0) of {[], EvHandlerState} -> - send(Tail, State, EvHandler, EvHandlerState); + ws_send(Tail, State, ReplyTo, EvHandler, EvHandlerState); Other -> Other end. diff --git a/test/raw_SUITE.erl b/test/raw_SUITE.erl index 6a843ea..18ab3b5 100644 --- a/test/raw_SUITE.erl +++ b/test/raw_SUITE.erl @@ -159,6 +159,33 @@ do_connect_raw(OriginTransport, ProxyTransport) -> }]} = gun:info(ConnPid), gun:close(ConnPid). +connect_raw_reply_to(_) -> + doc("When using CONNECT to establish a connection with the reply_to option set, " + "Gun must honor this option in the raw protocol."), + Self = self(), + ReplyTo = spawn(fun() -> + {ConnPid, StreamRef} = receive Msg -> Msg after 1000 -> error(timeout) end, + {response, fin, 200, _} = gun:await(ConnPid, StreamRef), + Self ! {self(), ready}, + {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined), + Self ! {self(), ok} + end), + {ok, OriginPid, OriginPort} = init_origin(tcp, raw, fun do_echo/3), + {ok, ProxyPid, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp), + {ok, ConnPid} = gun:open("localhost", ProxyPort), + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:connect(ConnPid, #{ + host => "localhost", + port => OriginPort, + protocols => [raw] + }, [], #{reply_to => ReplyTo}), + ReplyTo ! {ConnPid, StreamRef}, + {request, <<"CONNECT">>, _, 'HTTP/1.1', _} = receive_from(ProxyPid), + handshake_completed = receive_from(OriginPid), + receive {ReplyTo, ready} -> ok after 1000 -> error(timeout) end, + gun:data(ConnPid, undefined, nofin, <<"Hello world!">>), + receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end. + http11_upgrade_raw_tcp(_) -> doc("Use the HTTP Upgrade mechanism to switch to the raw protocol over TCP."), do_http11_upgrade_raw(tcp). @@ -202,6 +229,40 @@ do_http11_upgrade_raw(OriginTransport) -> } = gun:info(ConnPid), gun:close(ConnPid). +http11_upgrade_raw_reply_to(_) -> + doc("When upgrading an HTTP/1.1 connection with the reply_to option set, " + "Gun must honor this option in the raw protocol."), + Self = self(), + ReplyTo = spawn(fun() -> + {ConnPid, StreamRef} = receive Msg -> Msg after 1000 -> error(timeout) end, + {upgrade, [<<"custom/1.0">>], _} = gun:await(ConnPid, StreamRef), + Self ! {self(), ready}, + {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined), + Self ! {self(), ok} + end), + {ok, OriginPid, OriginPort} = init_origin(tcp, raw, + fun (Parent, ClientSocket, ClientTransport) -> + %% We skip the request and send a 101 response unconditionally. + {ok, _} = ClientTransport:recv(ClientSocket, 0, 5000), + ClientTransport:send(ClientSocket, + "HTTP/1.1 101 Switching Protocols\r\n" + "Connection: upgrade\r\n" + "Upgrade: custom/1.0\r\n" + "\r\n"), + do_echo(Parent, ClientSocket, ClientTransport) + end), + {ok, ConnPid} = gun:open("localhost", OriginPort), + {ok, http} = gun:await_up(ConnPid), + handshake_completed = receive_from(OriginPid), + StreamRef = gun:get(ConnPid, "/", #{ + <<"connection">> => <<"upgrade">>, + <<"upgrade">> => <<"custom/1.0">> + }, #{reply_to => ReplyTo}), + ReplyTo ! {ConnPid, StreamRef}, + receive {ReplyTo, ready} -> ok after 1000 -> error(timeout) end, + gun:data(ConnPid, undefined, nofin, <<"Hello world!">>), + receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end. + %% The origin server will echo everything back. do_echo(Parent, ClientSocket, ClientTransport) -> diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index 55cdfba..d4413fb 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -122,6 +122,24 @@ reject_upgrade(Config) -> error(timeout) end. +reply_to(Config) -> + doc("Ensure we can send a list of frames in one gun:ws_send call."), + Self = self(), + Frame = {text, <<"Hello!">>}, + ReplyTo = spawn(fun() -> + {ConnPid, StreamRef} = receive Msg -> Msg after 1000 -> error(timeout) end, + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + Self ! {self(), ready}, + {ws, Frame} = gun:await(ConnPid, StreamRef), + Self ! {self(), ok} + end), + {ok, ConnPid} = gun:open("localhost", config(port, Config)), + {ok, _} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/", [], #{reply_to => ReplyTo}), + ReplyTo ! {ConnPid, StreamRef}, + receive {ReplyTo, ready} -> gun:ws_send(ConnPid, Frame) after 1000 -> error(timeout) end, + receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end. + send_many(Config) -> doc("Ensure we can send a list of frames in one gun:ws_send call."), {ok, ConnPid} = gun:open("localhost", config(port, Config)), -- cgit v1.2.3