aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-09-26 13:16:56 +0200
committerLoïc Hoguin <[email protected]>2019-09-26 13:20:28 +0200
commit00cc1f385f94823a0684deee001b643091e235b0 (patch)
tree6e5406fb62b71b17b29f1de42ccbe01c8c191547
parentd86d55c1f90b37d991e20ad0f1ac37b1e38b36e1 (diff)
downloadgun-00cc1f385f94823a0684deee001b643091e235b0.tar.gz
gun-00cc1f385f94823a0684deee001b643091e235b0.tar.bz2
gun-00cc1f385f94823a0684deee001b643091e235b0.zip
Add reply_to option to ws_upgrade; remove notowner entirely
The reply_to option is also propagated when we switch protocols.
-rw-r--r--doc/src/guide/connect.asciidoc4
-rw-r--r--doc/src/guide/http.asciidoc4
-rw-r--r--doc/src/manual/gun_socks_up.asciidoc2
-rw-r--r--src/gun.erl60
-rw-r--r--src/gun_http.erl77
-rw-r--r--src/gun_http2.erl5
-rw-r--r--src/gun_raw.erl10
-rw-r--r--src/gun_socks.erl12
-rw-r--r--src/gun_ws.erl45
-rw-r--r--test/raw_SUITE.erl61
-rw-r--r--test/ws_SUITE.erl18
11 files changed, 190 insertions, 108 deletions
diff --git a/doc/src/guide/connect.asciidoc b/doc/src/guide/connect.asciidoc
index a2c0a28..08f8db2 100644
--- a/doc/src/guide/connect.asciidoc
+++ b/doc/src/guide/connect.asciidoc
@@ -16,10 +16,10 @@ a remote endpoint. This Gun connection is owned by a user
process that is called the _owner_ of the connection, and is
managed by the supervision tree of the `gun` application.
-The owner process communicates with the Gun connection
+Any process can communicate with the Gun connection
by calling functions from the module `gun`. All functions
perform their respective operations asynchronously. The Gun
-connection will send Erlang messages to the owner process
+connection will send Erlang messages to the calling process
whenever needed.
When the remote endpoint closes the connection, Gun attempts
diff --git a/doc/src/guide/http.asciidoc b/doc/src/guide/http.asciidoc
index 382b245..51cb994 100644
--- a/doc/src/guide/http.asciidoc
+++ b/doc/src/guide/http.asciidoc
@@ -15,7 +15,7 @@ Stream references use the Erlang _reference_ data type and
are therefore unique.
Streams can be canceled at any time. This will stop any further
-messages from being sent to the owner process. Depending on
+messages from being sent to the calling process. Depending on
its capabilities, the server will also be instructed to cancel
the request.
@@ -233,7 +233,7 @@ gun:request(ConnPid, "TRACE", "/", [
=== Processing responses
-All data received from the server is sent to the owner
+All data received from the server is sent to the calling
process as a message. First a `gun_response` message is sent,
followed by zero or more `gun_data` messages. If something goes wrong,
a `gun_error` message is sent instead.
diff --git a/doc/src/manual/gun_socks_up.asciidoc b/doc/src/manual/gun_socks_up.asciidoc
index e74f1a9..e65cbff 100644
--- a/doc/src/manual/gun_socks_up.asciidoc
+++ b/doc/src/manual/gun_socks_up.asciidoc
@@ -16,7 +16,7 @@ Protocol :: http | http2 | socks
The Socks connection is up.
-This message informs the owner process that the connection
+This message informs the owner/calling process that the connection
completed through the configured Socks proxy.
If Gun is configured to connect to another Socks server, then the
diff --git a/src/gun.erl b/src/gun.erl
index 3154b9b..ddd38c8 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -142,7 +142,6 @@
ws_opts => ws_opts()
}.
-export_type([opts/0]).
-%% @todo Add an option to disable/enable the notowner behavior.
-type connect_destination() :: #{
host := inet:hostname() | inet:ip_address(),
@@ -229,6 +228,7 @@
flow => pos_integer(),
keepalive => timeout(),
protocols => [{binary(), module()}],
+ reply_to => pid(),
silence_pings => boolean()
}.
-export_type([ws_opts/0]).
@@ -447,7 +447,7 @@ close(ServerPid) ->
-spec shutdown(pid()) -> ok.
shutdown(ServerPid) ->
- gen_statem:cast(ServerPid, {shutdown, self()}).
+ gen_statem:cast(ServerPid, shutdown).
%% Requests.
@@ -843,7 +843,8 @@ ws_upgrade(ServerPid, Path, Headers) ->
ws_upgrade(ServerPid, Path, Headers, Opts) ->
ok = gun_ws:check_options(Opts),
StreamRef = make_ref(),
- gen_statem:cast(ServerPid, {ws_upgrade, self(), StreamRef, Path, Headers, Opts}),
+ ReplyTo = maps:get(reply_to, Opts, self()),
+ gen_statem:cast(ServerPid, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, Opts}),
StreamRef.
%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef.
@@ -1011,20 +1012,20 @@ ensure_alpn_sni(Protocols0, TransOpts0, #state{origin_host=OriginHost}) ->
end.
%% Normal TLS handshake.
-tls_handshake(internal, {tls_handshake, HandshakeEvent, Protocols},
+tls_handshake(internal, {tls_handshake, HandshakeEvent, Protocols, ReplyTo},
State0=#state{socket=Socket, transport=gun_tcp}) ->
case normal_tls_handshake(Socket, State0, HandshakeEvent, Protocols) of
{ok, TLSSocket, NewProtocol, State} ->
commands([
{switch_transport, gun_tls, TLSSocket},
- {switch_protocol, NewProtocol}
+ {switch_protocol, NewProtocol, ReplyTo}
], State);
{error, Reason, State} ->
commands({error, Reason}, State)
end;
%% TLS over TLS.
tls_handshake(internal, {tls_handshake,
- HandshakeEvent0=#{tls_opts := TLSOpts0, timeout := TLSTimeout}, Protocols},
+ HandshakeEvent0=#{tls_opts := TLSOpts0, timeout := TLSTimeout}, Protocols, ReplyTo},
State=#state{socket=Socket, transport=Transport, origin_host=OriginHost, origin_port=OriginPort,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
TLSOpts = ensure_alpn_sni(Protocols, TLSOpts0, State),
@@ -1034,20 +1035,20 @@ tls_handshake(internal, {tls_handshake,
},
EvHandlerState = EvHandler:tls_handshake_start(HandshakeEvent, EvHandlerState0),
{ok, ProxyPid} = gun_tls_proxy:start_link(OriginHost, OriginPort,
- TLSOpts, TLSTimeout, Socket, Transport, {HandshakeEvent, Protocols}),
+ TLSOpts, TLSTimeout, Socket, Transport, {HandshakeEvent, Protocols, ReplyTo}),
commands([{switch_transport, gun_tls_proxy, ProxyPid}], State#state{
socket=ProxyPid, transport=gun_tls_proxy, event_handler_state=EvHandlerState});
%% When using gun_tls_proxy we need a separate message to know whether
%% the handshake succeeded and whether we need to switch to a different protocol.
-tls_handshake(info, {gun_tls_proxy, Socket, {ok, Negotiated}, {HandshakeEvent, Protocols}},
+tls_handshake(info, {gun_tls_proxy, Socket, {ok, Negotiated}, {HandshakeEvent, Protocols, ReplyTo}},
State0=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
NewProtocol = protocol_negotiated(Negotiated, Protocols),
EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
socket => Socket,
protocol => NewProtocol
}, EvHandlerState0),
- commands([{switch_protocol, NewProtocol}], State0#state{event_handler_state=EvHandlerState});
-tls_handshake(info, {gun_tls_proxy, Socket, Error = {error, Reason}, {HandshakeEvent, _}},
+ commands([{switch_protocol, NewProtocol, ReplyTo}], State0#state{event_handler_state=EvHandlerState});
+tls_handshake(info, {gun_tls_proxy, Socket, Error = {error, Reason}, {HandshakeEvent, _, _}},
State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
error => Reason
@@ -1099,10 +1100,10 @@ connected_data_only(cast, Msg, _)
connected_data_only(Type, Event, State) ->
handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
-connected_ws_only(cast, {ws_send, Owner, Frames}, State=#state{
- owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState,
+connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{
+ protocol=Protocol=gun_ws, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {Commands, EvHandlerState} = Protocol:send(Frames, ProtoState, EvHandler, EvHandlerState0),
+ {Commands, EvHandlerState} = Protocol:ws_send(Frames, ProtoState, ReplyTo, EvHandler, EvHandlerState0),
commands(Commands, State#state{event_handler_state=EvHandlerState});
connected_ws_only(cast, Msg, _)
when element(1, Msg) =:= headers; element(1, Msg) =:= request; element(1, Msg) =:= data;
@@ -1155,22 +1156,22 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}
%% Public Websocket interface.
%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
%% An interface would also make sure that HTTP/1.0 can't upgrade.
-connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers}, State=#state{opts=Opts}) ->
+connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) ->
WsOpts = maps:get(ws_opts, Opts, #{}),
- connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts}, State);
-connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts},
- State=#state{owner=Owner, origin_host=Host, origin_port=Port,
+ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts}, State);
+connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts},
+ State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0})
when Protocol =:= gun_http ->
EvHandlerState1 = EvHandler:ws_upgrade(#{
stream_ref => StreamRef,
- reply_to => Owner, %% Only the owner can upgrade the connection at this time.
+ reply_to => ReplyTo,
opts => WsOpts
}, EvHandlerState0),
%% @todo Can fail if HTTP/1.0.
{ProtoState2, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
- StreamRef, Host, Port, Path, Headers, WsOpts,
+ StreamRef, ReplyTo, Host, Port, Path, Headers, WsOpts,
EvHandler, EvHandlerState1),
{keep_state, State#state{protocol_state=ProtoState2,
event_handler_state=EvHandlerState}};
@@ -1272,6 +1273,7 @@ handle_common_connected_no_input(Type, Event, StateName, State) ->
%% Common events.
handle_common(cast, {set_owner, CurrentOwner, NewOwner}, _,
State=#state{owner=CurrentOwner, status={up, CurrentOwnerRef}}) ->
+ %% @todo This should probably trigger an event.
demonitor(CurrentOwnerRef, [flush]),
NewOwnerRef = monitor(process, NewOwner),
{keep_state, State#state{owner=NewOwner, status={up, NewOwnerRef}}};
@@ -1280,8 +1282,8 @@ handle_common(cast, {set_owner, CurrentOwner, _}, _, #state{owner=CurrentOwner})
CurrentOwner ! {gun_error, self(), {badstate,
"The owner of the connection cannot be changed when the connection is shutting down."}},
keep_state_and_state;
-handle_common(cast, {shutdown, Owner}, StateName, State=#state{
- owner=Owner, status=Status, socket=Socket, transport=Transport, protocol=Protocol}) ->
+handle_common(cast, shutdown, StateName, State=#state{
+ status=Status, socket=Socket, transport=Transport, protocol=Protocol}) ->
case {Socket, Protocol} of
{undefined, _} ->
{stop, shutdown};
@@ -1318,12 +1320,6 @@ handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, StateName, State
end;
handle_common({call, From}, _, _, _) ->
{keep_state_and_data, {reply, From, {error, bad_call}}};
-%% @todo The ReplyTo patch disabled the notowner behavior.
-%% We need to add an option to enforce this behavior if needed.
-handle_common(cast, Any, _, #state{owner=Owner}) when element(2, Any) =/= Owner ->
- element(2, Any) ! {gun_error, self(), {notowner,
- "Operations are restricted to the owner of the connection."}},
- keep_state_and_data;
%% We postpone all HTTP/Websocket operations until we are connected.
handle_common(cast, _, StateName, _) when StateName =/= connected ->
{keep_state_and_data, postpone};
@@ -1381,8 +1377,8 @@ commands([{switch_transport, Transport, Socket}|Tail], State=#state{
commands(Tail, active(State#state{socket=Socket, transport=Transport,
messages=Transport:messages(), protocol_state=ProtoState,
event_handler_state=EvHandlerState}));
-commands([{switch_protocol, Protocol0}], State0=#state{
- owner=Owner, opts=Opts, socket=Socket, transport=Transport, protocol=CurrentProtocol,
+commands([{switch_protocol, Protocol0, ReplyTo}], State0=#state{
+ opts=Opts, socket=Socket, transport=Transport, protocol=CurrentProtocol,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Protocol, ProtoOpts} = case Protocol0 of
{P, PO} -> {protocol_handler(P), PO};
@@ -1392,10 +1388,10 @@ commands([{switch_protocol, Protocol0}], State0=#state{
end,
%% When we switch_protocol from socks we must send a gun_socks_up message.
_ = case CurrentProtocol of
- gun_socks -> Owner ! {gun_socks_up, self(), Protocol:name()};
+ gun_socks -> ReplyTo ! {gun_socks_up, self(), Protocol:name()};
_ -> ok
end,
- {StateName, ProtoState} = Protocol:init(Owner, Socket, Transport, ProtoOpts),
+ {StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts),
EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
%% We cancel the existing keepalive and, depending on the protocol,
%% we enable keepalive again, effectively resetting the timer.
@@ -1406,7 +1402,7 @@ commands([{switch_protocol, Protocol0}], State0=#state{
false -> {next_state, StateName, State}
end;
%% Perform a TLS handshake.
-commands([TLSHandshake={tls_handshake, _, _}], State) ->
+commands([TLSHandshake={tls_handshake, _, _, _}], State) ->
{next_state, tls_handshake, State,
{next_event, internal, TLSHandshake}}.
diff --git a/src/gun_http.erl b/src/gun_http.erl
index f27563e..80c83bb 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -33,7 +33,7 @@
-export([cancel/5]).
-export([stream_info/2]).
-export([down/1]).
--export([ws_upgrade/9]).
+-export([ws_upgrade/10]).
%% Functions shared with gun_http2.
-export([host_header/3]).
@@ -43,11 +43,16 @@
%% @todo Make that a record.
-type connect_info() :: {connect, reference(), gun:connect_destination()}.
-%% @todo Make that a record.
--type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options
+-record(websocket, {
+ ref :: reference(),
+ reply_to :: pid(),
+ key :: binary(),
+ extensions :: [binary()],
+ opts :: gun:ws_opts()
+}).
-record(stream, {
- ref :: reference() | connect_info() | websocket_info(),
+ ref :: reference() | connect_info() | #websocket{},
reply_to :: pid(),
flow :: integer() | infinity,
method :: binary(),
@@ -56,7 +61,6 @@
}).
-record(http_state, {
- owner :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
opts = #{} :: gun:http_opts(),
@@ -101,9 +105,9 @@ opts_name() -> http_opts.
has_keepalive() -> true.
default_keepalive() -> infinity.
-init(Owner, Socket, Transport, Opts) ->
+init(_ReplyTo, Socket, Transport, Opts) ->
Version = maps:get(version, Opts, 'HTTP/1.1'),
- {connected, #http_state{owner=Owner, socket=Socket, transport=Transport,
+ {connected, #http_state{socket=Socket, transport=Transport,
opts=Opts, version=Version}}.
switch_transport(Transport, Socket, State) ->
@@ -274,16 +278,15 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts,
}, EvHandlerState0),
%% @todo We might want to switch to the HTTP/2 protocol or to the TLS transport as well.
case StreamRef of
- {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts} ->
- {ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts),
- EvHandlerState};
+ #websocket{} ->
+ {ws_handshake(Rest2, State, StreamRef, Headers), EvHandlerState};
%% Any other 101 response results in us switching to the raw protocol.
%% @todo We should check that we asked for an upgrade before accepting it.
_ ->
{_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers),
Upgrade = cow_http_hd:parse_upgrade(Upgrade0),
ReplyTo ! {gun_upgrade, self(), StreamRef, Upgrade, Headers},
- {{switch_protocol, raw}, EvHandlerState0}
+ {{switch_protocol, raw, ReplyTo}, EvHandlerState0}
end;
%% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup.
{_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 ->
@@ -317,11 +320,11 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts,
},
Protocols = maps:get(protocols, Destination, [http2, http]),
{[{origin, <<"https">>, NewHost, NewPort, connect},
- {tls_handshake, HandshakeEvent, Protocols}], EvHandlerState1};
+ {tls_handshake, HandshakeEvent, Protocols, ReplyTo}], EvHandlerState1};
_ ->
[Protocol] = maps:get(protocols, Destination, [http]),
{[{origin, <<"http">>, NewHost, NewPort, connect},
- {switch_protocol, Protocol}], EvHandlerState1}
+ {switch_protocol, Protocol, ReplyTo}], EvHandlerState1}
end;
{_, _} when Status >= 100, Status =< 199 ->
ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
@@ -389,7 +392,7 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts,
end.
stream_ref({connect, StreamRef, _}) -> StreamRef;
-stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef;
+stream_ref(#websocket{ref=StreamRef}) -> StreamRef;
stream_ref(StreamRef) -> StreamRef.
%% The state must be first in order to retrieve it when the stream ended.
@@ -702,7 +705,7 @@ stream_info(#http_state{streams=Streams}, StreamRef) ->
down(#http_state{streams=Streams}) ->
KilledStreams = [case Ref of
{connect, Ref2, _} -> Ref2;
- {websocket, Ref2, _, _, _} -> Ref2;
+ #websocket{ref=Ref2} -> Ref2;
_ -> Ref
end || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
@@ -788,13 +791,13 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
%% Websocket upgrade.
-ws_upgrade(#http_state{owner=ReplyTo, version='HTTP/1.0'}, StreamRef, _, _, _, _, _, _, EvHandlerState) ->
+ws_upgrade(#http_state{version='HTTP/1.0'},
+ StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) ->
ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"Websocket cannot be used over an HTTP/1.0 connection."}},
{[], EvHandlerState};
-ws_upgrade(State=#http_state{owner=ReplyTo, out=head},
- StreamRef, Host, Port, Path, Headers0, WsOpts,
- EvHandler, EvHandlerState0) ->
+ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
+ Host, Port, Path, Headers0, WsOpts, EvHandler, EvHandlerState0) ->
{Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of
true -> {[{<<"sec-websocket-extensions">>,
<<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>}
@@ -821,10 +824,10 @@ ws_upgrade(State=#http_state{owner=ReplyTo, out=head},
EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = maps:get(flow, WsOpts, infinity),
{new_stream(State#http_state{connection=Conn, out=Out},
- {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>, InitialFlow),
- EvHandlerState}.
+ #websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts},
+ ReplyTo, <<"GET">>, InitialFlow), EvHandlerState}.
-ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) ->
+ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) ->
%% @todo check upgrade, connection
case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of
false ->
@@ -832,23 +835,23 @@ ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) ->
{_, Accept} ->
case cow_ws:encode_key(Key) of
Accept ->
- ws_handshake_extensions(Buffer, State, StreamRef,
- Headers, GunExtensions, Opts);
+ ws_handshake_extensions(Buffer, State, Ws, Headers);
_ ->
close
end
end.
-ws_handshake_extensions(Buffer, State, StreamRef, Headers, GunExtensions, Opts) ->
+ws_handshake_extensions(Buffer, State, Ws=#websocket{extensions=Extensions0, opts=Opts}, Headers) ->
case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
false ->
- ws_handshake_protocols(Buffer, State, StreamRef, Headers, #{}, Opts);
+ ws_handshake_protocols(Buffer, State, Ws, Headers, #{});
{_, ExtHd} ->
- case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of
+ ParsedExtHd = cow_http_hd:parse_sec_websocket_extensions(ExtHd),
+ case ws_validate_extensions(ParsedExtHd, Extensions0, #{}, Opts) of
close ->
close;
Extensions ->
- ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts)
+ ws_handshake_protocols(Buffer, State, Ws, Headers, Extensions)
end
end.
@@ -870,25 +873,25 @@ ws_validate_extensions(_, _, _, _) ->
close.
%% @todo Validate protocols.
-ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) ->
+ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensions) ->
case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of
false ->
- ws_handshake_end(Buffer, State, StreamRef, Headers, Extensions,
- maps:get(default_protocol, Opts, gun_ws_h), Opts);
+ ws_handshake_end(Buffer, State, Ws, Headers, Extensions,
+ maps:get(default_protocol, Opts, gun_ws_h));
{_, Proto} ->
ProtoOpt = maps:get(protocols, Opts, []),
case lists:keyfind(Proto, 1, ProtoOpt) of
{_, Handler} ->
- ws_handshake_end(Buffer, State, StreamRef,
- Headers, Extensions, Handler, Opts);
+ ws_handshake_end(Buffer, State, Ws, Headers, Extensions, Handler);
false ->
close
end
end.
%% We know that the most recent stream is the Websocket one.
-ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport,
- streams=[#stream{flow=InitialFlow}|_]}, StreamRef, Headers, Extensions, Handler, Opts) ->
+ws_handshake_end(Buffer,
+ #http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]},
+ #websocket{ref=StreamRef, reply_to=ReplyTo, opts=Opts}, Headers, Extensions, Handler) ->
%% Send ourselves the remaining buffer, if any.
_ = case Buffer of
<<>> ->
@@ -898,7 +901,7 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans
self() ! {OK, Socket, Buffer}
end,
%% Inform the user that the upgrade was successful and switch the protocol.
- Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
+ ReplyTo ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
{switch_protocol, {ws, #{
stream_ref => StreamRef,
headers => Headers,
@@ -906,4 +909,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans
flow => InitialFlow,
handler => Handler,
opts => Opts
- }}}.
+ }}, ReplyTo}.
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index e6f09ea..bbc76ab 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -50,7 +50,6 @@
}).
-record(http2_state, {
- owner :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
opts = #{} :: gun:http2_opts(),
@@ -119,7 +118,7 @@ opts_name() -> http2_opts.
has_keepalive() -> true.
default_keepalive() -> 5000.
-init(Owner, Socket, Transport, Opts0) ->
+init(_ReplyTo, Socket, Transport, Opts0) ->
%% We have different defaults than the protocol in order
%% to optimize for performance when receiving responses.
Opts = Opts0#{
@@ -129,7 +128,7 @@ init(Owner, Socket, Transport, Opts0) ->
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts),
Handlers = maps:get(content_handlers, Opts, [gun_data_h]),
%% @todo Better validate the preface being received.
- State = #http2_state{owner=Owner, socket=Socket,
+ State = #http2_state{socket=Socket,
transport=Transport, opts=Opts, content_handlers=Handlers,
http2_machine=HTTP2Machine},
Transport:send(Socket, Preface),
diff --git a/src/gun_raw.erl b/src/gun_raw.erl
index da71cd6..bef2c1c 100644
--- a/src/gun_raw.erl
+++ b/src/gun_raw.erl
@@ -26,7 +26,7 @@
%% @todo down
-record(raw_state, {
- owner :: pid(),
+ reply_to :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module()
}).
@@ -39,12 +39,12 @@ name() -> raw.
opts_name() -> raw_opts.
has_keepalive() -> false.
-init(Owner, Socket, Transport, _Opts) ->
- {connected_data_only, #raw_state{owner=Owner, socket=Socket, transport=Transport}}.
+init(ReplyTo, Socket, Transport, _Opts) ->
+ {connected_data_only, #raw_state{reply_to=ReplyTo, socket=Socket, transport=Transport}}.
-handle(Data, State=#raw_state{owner=Owner}, _, EvHandlerState) ->
+handle(Data, State=#raw_state{reply_to=ReplyTo}, _, EvHandlerState) ->
%% When we take over the entire connection there is no stream reference.
- Owner ! {gun_data, self(), undefined, nofin, Data},
+ ReplyTo ! {gun_data, self(), undefined, nofin, Data},
{{state, State}, EvHandlerState}.
%% We can always close immediately.
diff --git a/src/gun_socks.erl b/src/gun_socks.erl
index 487e7c4..5a42e3f 100644
--- a/src/gun_socks.erl
+++ b/src/gun_socks.erl
@@ -26,7 +26,7 @@
%% @todo down
-record(socks_state, {
- owner :: pid(),
+ reply_to :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
opts = #{} :: gun:socks_opts(),
@@ -83,7 +83,7 @@ name() -> socks.
opts_name() -> socks_opts.
has_keepalive() -> false.
-init(Owner, Socket, Transport, Opts) ->
+init(ReplyTo, Socket, Transport, Opts) ->
5 = Version = maps:get(version, Opts, 5),
Auth = maps:get(auth, Opts, [none]),
Methods = <<case A of
@@ -91,7 +91,7 @@ init(Owner, Socket, Transport, Opts) ->
none -> <<0>>
end || A <- Auth>>,
Transport:send(Socket, [<<5, (length(Auth))>>, Methods]),
- {connected_no_input, #socks_state{owner=Owner, socket=Socket, transport=Transport,
+ {connected_no_input, #socks_state{reply_to=ReplyTo, socket=Socket, transport=Transport,
opts=Opts, version=Version, status=auth_method_select}}.
switch_transport(Transport, Socket, State) ->
@@ -120,7 +120,7 @@ handle(<<1, 0>>, State=#socks_state{version=5, status=auth_username_password}) -
handle(<<1, _>>, #socks_state{version=5, status=auth_username_password}) ->
{error, {socks5, username_password_auth_failure}};
%% Connect reply.
-handle(<<5, 0, 0, Rest0/bits>>, #socks_state{opts=Opts, version=5, status=connect}) ->
+handle(<<5, 0, 0, Rest0/bits>>, #socks_state{reply_to=ReplyTo, opts=Opts, version=5, status=connect}) ->
%% @todo What to do with BoundAddr and BoundPort? Add as metadata to origin info?
{_BoundAddr, _BoundPort} = case Rest0 of
%% @todo Seen a server with <<1, 0:48>>.
@@ -142,11 +142,11 @@ handle(<<5, 0, 0, Rest0/bits>>, #socks_state{opts=Opts, version=5, status=connec
timeout => maps:get(tls_handshake_timeout, Opts, infinity)
},
[{origin, <<"https">>, NewHost, NewPort, socks5},
- {tls_handshake, HandshakeEvent, maps:get(protocols, Opts, [http2, http])}];
+ {tls_handshake, HandshakeEvent, maps:get(protocols, Opts, [http2, http]), ReplyTo}];
_ ->
[Protocol] = maps:get(protocols, Opts, [http]),
[{origin, <<"http">>, NewHost, NewPort, socks5},
- {switch_protocol, Protocol}]
+ {switch_protocol, Protocol, ReplyTo}]
end;
handle(<<5, Error, _/bits>>, #socks_state{version=5, status=connect}) ->
Reason = case Error of
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index ba61577..15c4a81 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -25,7 +25,7 @@
-export([closing/4]).
-export([close/4]).
-export([keepalive/3]).
--export([send/4]).
+-export([ws_send/5]).
-export([down/1]).
-record(payload, {
@@ -39,7 +39,7 @@
}).
-record(ws_state, {
- owner :: pid(),
+ reply_to :: pid(),
stream_ref :: reference(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
@@ -79,6 +79,8 @@ do_check_options([Opt={protocols, L}|Opts]) when is_list(L) ->
[true] -> do_check_options(Opts);
_ -> {error, {options, {ws, Opt}}}
end;
+do_check_options([{reply_to, P}|Opts]) when is_pid(P) ->
+ do_check_options(Opts);
do_check_options([{silence_pings, B}|Opts]) when B =:= true; B =:= false ->
do_check_options(Opts);
do_check_options([{user_opts, _}|Opts]) ->
@@ -91,10 +93,10 @@ opts_name() -> ws_opts.
has_keepalive() -> true.
default_keepalive() -> 5000.
-init(Owner, Socket, Transport, #{stream_ref := StreamRef, headers := Headers,
+init(ReplyTo, Socket, Transport, #{stream_ref := StreamRef, headers := Headers,
extensions := Extensions, flow := InitialFlow, handler := Handler, opts := Opts}) ->
- {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts),
- {connected_ws_only, #ws_state{owner=Owner, stream_ref=StreamRef,
+ {ok, HandlerState} = Handler:init(ReplyTo, StreamRef, Headers, Opts),
+ {connected_ws_only, #ws_state{reply_to=ReplyTo, stream_ref=StreamRef,
socket=Socket, transport=Transport, opts=Opts, extensions=Extensions,
flow=InitialFlow, handler=Handler, handler_state=HandlerState}}.
@@ -107,7 +109,7 @@ handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) ->
%% Shortcut for common case when Data is empty after processing a frame.
handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) ->
maybe_active(State, EvHandlerState);
-handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
+handle(Data, State=#ws_state{reply_to=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
in=head, frag_state=FragState, extensions=Extensions},
EvHandler, EvHandlerState0) ->
%% Send the event only if there was no data in the buffer.
@@ -175,7 +177,7 @@ maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) ->
{active, Flow > 0}
], EvHandlerState}.
-dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
+dispatch(Rest, State0=#ws_state{reply_to=ReplyTo, stream_ref=StreamRef,
frag_state=FragState, extensions=Extensions, flow=Flow0,
handler=Handler, handler_state=HandlerState0},
Type, Payload, CloseCode, EvHandler, EvHandlerState0) ->
@@ -196,10 +198,10 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
State1 = State0#ws_state{flow=Flow, handler_state=HandlerState},
{State, EvHandlerState} = case Frame of
ping ->
- {[], EvHandlerState2} = send(pong, State1, EvHandler, EvHandlerState1),
+ {[], EvHandlerState2} = send(pong, State1, ReplyTo, EvHandler, EvHandlerState1),
{State1, EvHandlerState2};
{ping, Payload} ->
- {[], EvHandlerState2} = send({pong, Payload}, State1, EvHandler, EvHandlerState1),
+ {[], EvHandlerState2} = send({pong, Payload}, State1, ReplyTo, EvHandler, EvHandlerState1),
{State1, EvHandlerState2};
close ->
{State1#ws_state{in=close}, EvHandlerState1};
@@ -226,7 +228,7 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) ->
%% The user already sent the close frame; do nothing.
closing(_, State=#ws_state{out=close}, _, EvHandlerState) ->
{closing(State), EvHandlerState};
-closing(Reason, State, EvHandler, EvHandlerState) ->
+closing(Reason, State=#ws_state{reply_to=ReplyTo}, EvHandler, EvHandlerState) ->
Code = case Reason of
normal -> 1000;
owner_down -> 1001;
@@ -234,7 +236,7 @@ closing(Reason, State, EvHandler, EvHandlerState) ->
{error, badframe} -> 1002;
{error, badencoding} -> 1007
end,
- send({close, Code, <<>>}, State, EvHandler, EvHandlerState).
+ send({close, Code, <<>>}, State, ReplyTo, EvHandler, EvHandlerState).
closing(#ws_state{opts=Opts}) ->
Timeout = maps:get(closing_timeout, Opts, 15000),
@@ -243,14 +245,14 @@ closing(#ws_state{opts=Opts}) ->
close(_, _, _, EvHandlerState) ->
EvHandlerState.
-keepalive(State, EvHandler, EvHandlerState0) ->
- {[], EvHandlerState} = send(ping, State, EvHandler, EvHandlerState0),
+keepalive(State=#ws_state{reply_to=ReplyTo}, EvHandler, EvHandlerState0) ->
+ {[], EvHandlerState} = send(ping, State, ReplyTo, EvHandler, EvHandlerState0),
{State, EvHandlerState}.
%% Send one frame.
-send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
+send(Frame, State=#ws_state{stream_ref=StreamRef,
socket=Socket, transport=Transport, in=In, extensions=Extensions},
- EvHandler, EvHandlerState0) when not is_list(Frame) ->
+ ReplyTo, EvHandler, EvHandlerState0) ->
WsSendFrameEvent = #{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -272,14 +274,17 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
], EvHandlerState};
true ->
{[], EvHandlerState}
- end;
+ end.
+
%% Send many frames.
-send([], _, _, EvHandlerState) ->
+ws_send(Frame, State, ReplyTo, EvHandler, EvHandlerState) when not is_list(Frame) ->
+ send(Frame, State, ReplyTo, EvHandler, EvHandlerState);
+ws_send([], _, _, _, EvHandlerState) ->
{[], EvHandlerState};
-send([Frame|Tail], State, EvHandler, EvHandlerState0) ->
- case send(Frame, State, EvHandler, EvHandlerState0) of
+ws_send([Frame|Tail], State, ReplyTo, EvHandler, EvHandlerState0) ->
+ case send(Frame, State, ReplyTo, EvHandler, EvHandlerState0) of
{[], EvHandlerState} ->
- send(Tail, State, EvHandler, EvHandlerState);
+ ws_send(Tail, State, ReplyTo, EvHandler, EvHandlerState);
Other ->
Other
end.
diff --git a/test/raw_SUITE.erl b/test/raw_SUITE.erl
index 6a843ea..18ab3b5 100644
--- a/test/raw_SUITE.erl
+++ b/test/raw_SUITE.erl
@@ -159,6 +159,33 @@ do_connect_raw(OriginTransport, ProxyTransport) ->
}]} = gun:info(ConnPid),
gun:close(ConnPid).
+connect_raw_reply_to(_) ->
+ doc("When using CONNECT to establish a connection with the reply_to option set, "
+ "Gun must honor this option in the raw protocol."),
+ Self = self(),
+ ReplyTo = spawn(fun() ->
+ {ConnPid, StreamRef} = receive Msg -> Msg after 1000 -> error(timeout) end,
+ {response, fin, 200, _} = gun:await(ConnPid, StreamRef),
+ Self ! {self(), ready},
+ {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined),
+ Self ! {self(), ok}
+ end),
+ {ok, OriginPid, OriginPort} = init_origin(tcp, raw, fun do_echo/3),
+ {ok, ProxyPid, ProxyPort} = rfc7231_SUITE:do_proxy_start(tcp),
+ {ok, ConnPid} = gun:open("localhost", ProxyPort),
+ {ok, http} = gun:await_up(ConnPid),
+ StreamRef = gun:connect(ConnPid, #{
+ host => "localhost",
+ port => OriginPort,
+ protocols => [raw]
+ }, [], #{reply_to => ReplyTo}),
+ ReplyTo ! {ConnPid, StreamRef},
+ {request, <<"CONNECT">>, _, 'HTTP/1.1', _} = receive_from(ProxyPid),
+ handshake_completed = receive_from(OriginPid),
+ receive {ReplyTo, ready} -> ok after 1000 -> error(timeout) end,
+ gun:data(ConnPid, undefined, nofin, <<"Hello world!">>),
+ receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end.
+
http11_upgrade_raw_tcp(_) ->
doc("Use the HTTP Upgrade mechanism to switch to the raw protocol over TCP."),
do_http11_upgrade_raw(tcp).
@@ -202,6 +229,40 @@ do_http11_upgrade_raw(OriginTransport) ->
} = gun:info(ConnPid),
gun:close(ConnPid).
+http11_upgrade_raw_reply_to(_) ->
+ doc("When upgrading an HTTP/1.1 connection with the reply_to option set, "
+ "Gun must honor this option in the raw protocol."),
+ Self = self(),
+ ReplyTo = spawn(fun() ->
+ {ConnPid, StreamRef} = receive Msg -> Msg after 1000 -> error(timeout) end,
+ {upgrade, [<<"custom/1.0">>], _} = gun:await(ConnPid, StreamRef),
+ Self ! {self(), ready},
+ {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined),
+ Self ! {self(), ok}
+ end),
+ {ok, OriginPid, OriginPort} = init_origin(tcp, raw,
+ fun (Parent, ClientSocket, ClientTransport) ->
+ %% We skip the request and send a 101 response unconditionally.
+ {ok, _} = ClientTransport:recv(ClientSocket, 0, 5000),
+ ClientTransport:send(ClientSocket,
+ "HTTP/1.1 101 Switching Protocols\r\n"
+ "Connection: upgrade\r\n"
+ "Upgrade: custom/1.0\r\n"
+ "\r\n"),
+ do_echo(Parent, ClientSocket, ClientTransport)
+ end),
+ {ok, ConnPid} = gun:open("localhost", OriginPort),
+ {ok, http} = gun:await_up(ConnPid),
+ handshake_completed = receive_from(OriginPid),
+ StreamRef = gun:get(ConnPid, "/", #{
+ <<"connection">> => <<"upgrade">>,
+ <<"upgrade">> => <<"custom/1.0">>
+ }, #{reply_to => ReplyTo}),
+ ReplyTo ! {ConnPid, StreamRef},
+ receive {ReplyTo, ready} -> ok after 1000 -> error(timeout) end,
+ gun:data(ConnPid, undefined, nofin, <<"Hello world!">>),
+ receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end.
+
%% The origin server will echo everything back.
do_echo(Parent, ClientSocket, ClientTransport) ->
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index 55cdfba..d4413fb 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -122,6 +122,24 @@ reject_upgrade(Config) ->
error(timeout)
end.
+reply_to(Config) ->
+ doc("Ensure we can send a list of frames in one gun:ws_send call."),
+ Self = self(),
+ Frame = {text, <<"Hello!">>},
+ ReplyTo = spawn(fun() ->
+ {ConnPid, StreamRef} = receive Msg -> Msg after 1000 -> error(timeout) end,
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ Self ! {self(), ready},
+ {ws, Frame} = gun:await(ConnPid, StreamRef),
+ Self ! {self(), ok}
+ end),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config)),
+ {ok, _} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", [], #{reply_to => ReplyTo}),
+ ReplyTo ! {ConnPid, StreamRef},
+ receive {ReplyTo, ready} -> gun:ws_send(ConnPid, Frame) after 1000 -> error(timeout) end,
+ receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end.
+
send_many(Config) ->
doc("Ensure we can send a list of frames in one gun:ws_send call."),
{ok, ConnPid} = gun:open("localhost", config(port, Config)),