aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-09-26 13:16:56 +0200
committerLoïc Hoguin <[email protected]>2019-09-26 13:20:28 +0200
commit00cc1f385f94823a0684deee001b643091e235b0 (patch)
tree6e5406fb62b71b17b29f1de42ccbe01c8c191547 /src/gun_http.erl
parentd86d55c1f90b37d991e20ad0f1ac37b1e38b36e1 (diff)
downloadgun-00cc1f385f94823a0684deee001b643091e235b0.tar.gz
gun-00cc1f385f94823a0684deee001b643091e235b0.tar.bz2
gun-00cc1f385f94823a0684deee001b643091e235b0.zip
Add reply_to option to ws_upgrade; remove notowner entirely
The reply_to option is also propagated when we switch protocols.
Diffstat (limited to 'src/gun_http.erl')
-rw-r--r--src/gun_http.erl77
1 files changed, 40 insertions, 37 deletions
diff --git a/src/gun_http.erl b/src/gun_http.erl
index f27563e..80c83bb 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -33,7 +33,7 @@
-export([cancel/5]).
-export([stream_info/2]).
-export([down/1]).
--export([ws_upgrade/9]).
+-export([ws_upgrade/10]).
%% Functions shared with gun_http2.
-export([host_header/3]).
@@ -43,11 +43,16 @@
%% @todo Make that a record.
-type connect_info() :: {connect, reference(), gun:connect_destination()}.
-%% @todo Make that a record.
--type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options
+-record(websocket, {
+ ref :: reference(),
+ reply_to :: pid(),
+ key :: binary(),
+ extensions :: [binary()],
+ opts :: gun:ws_opts()
+}).
-record(stream, {
- ref :: reference() | connect_info() | websocket_info(),
+ ref :: reference() | connect_info() | #websocket{},
reply_to :: pid(),
flow :: integer() | infinity,
method :: binary(),
@@ -56,7 +61,6 @@
}).
-record(http_state, {
- owner :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
opts = #{} :: gun:http_opts(),
@@ -101,9 +105,9 @@ opts_name() -> http_opts.
has_keepalive() -> true.
default_keepalive() -> infinity.
-init(Owner, Socket, Transport, Opts) ->
+init(_ReplyTo, Socket, Transport, Opts) ->
Version = maps:get(version, Opts, 'HTTP/1.1'),
- {connected, #http_state{owner=Owner, socket=Socket, transport=Transport,
+ {connected, #http_state{socket=Socket, transport=Transport,
opts=Opts, version=Version}}.
switch_transport(Transport, Socket, State) ->
@@ -274,16 +278,15 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts,
}, EvHandlerState0),
%% @todo We might want to switch to the HTTP/2 protocol or to the TLS transport as well.
case StreamRef of
- {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts} ->
- {ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts),
- EvHandlerState};
+ #websocket{} ->
+ {ws_handshake(Rest2, State, StreamRef, Headers), EvHandlerState};
%% Any other 101 response results in us switching to the raw protocol.
%% @todo We should check that we asked for an upgrade before accepting it.
_ ->
{_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers),
Upgrade = cow_http_hd:parse_upgrade(Upgrade0),
ReplyTo ! {gun_upgrade, self(), StreamRef, Upgrade, Headers},
- {{switch_protocol, raw}, EvHandlerState0}
+ {{switch_protocol, raw, ReplyTo}, EvHandlerState0}
end;
%% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup.
{_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 ->
@@ -317,11 +320,11 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts,
},
Protocols = maps:get(protocols, Destination, [http2, http]),
{[{origin, <<"https">>, NewHost, NewPort, connect},
- {tls_handshake, HandshakeEvent, Protocols}], EvHandlerState1};
+ {tls_handshake, HandshakeEvent, Protocols, ReplyTo}], EvHandlerState1};
_ ->
[Protocol] = maps:get(protocols, Destination, [http]),
{[{origin, <<"http">>, NewHost, NewPort, connect},
- {switch_protocol, Protocol}], EvHandlerState1}
+ {switch_protocol, Protocol, ReplyTo}], EvHandlerState1}
end;
{_, _} when Status >= 100, Status =< 199 ->
ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
@@ -389,7 +392,7 @@ handle_head(Data, State=#http_state{version=ClientVersion, opts=Opts,
end.
stream_ref({connect, StreamRef, _}) -> StreamRef;
-stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef;
+stream_ref(#websocket{ref=StreamRef}) -> StreamRef;
stream_ref(StreamRef) -> StreamRef.
%% The state must be first in order to retrieve it when the stream ended.
@@ -702,7 +705,7 @@ stream_info(#http_state{streams=Streams}, StreamRef) ->
down(#http_state{streams=Streams}) ->
KilledStreams = [case Ref of
{connect, Ref2, _} -> Ref2;
- {websocket, Ref2, _, _, _} -> Ref2;
+ #websocket{ref=Ref2} -> Ref2;
_ -> Ref
end || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
@@ -788,13 +791,13 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
%% Websocket upgrade.
-ws_upgrade(#http_state{owner=ReplyTo, version='HTTP/1.0'}, StreamRef, _, _, _, _, _, _, EvHandlerState) ->
+ws_upgrade(#http_state{version='HTTP/1.0'},
+ StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) ->
ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"Websocket cannot be used over an HTTP/1.0 connection."}},
{[], EvHandlerState};
-ws_upgrade(State=#http_state{owner=ReplyTo, out=head},
- StreamRef, Host, Port, Path, Headers0, WsOpts,
- EvHandler, EvHandlerState0) ->
+ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
+ Host, Port, Path, Headers0, WsOpts, EvHandler, EvHandlerState0) ->
{Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of
true -> {[{<<"sec-websocket-extensions">>,
<<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>}
@@ -821,10 +824,10 @@ ws_upgrade(State=#http_state{owner=ReplyTo, out=head},
EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = maps:get(flow, WsOpts, infinity),
{new_stream(State#http_state{connection=Conn, out=Out},
- {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>, InitialFlow),
- EvHandlerState}.
+ #websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts},
+ ReplyTo, <<"GET">>, InitialFlow), EvHandlerState}.
-ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) ->
+ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) ->
%% @todo check upgrade, connection
case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of
false ->
@@ -832,23 +835,23 @@ ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) ->
{_, Accept} ->
case cow_ws:encode_key(Key) of
Accept ->
- ws_handshake_extensions(Buffer, State, StreamRef,
- Headers, GunExtensions, Opts);
+ ws_handshake_extensions(Buffer, State, Ws, Headers);
_ ->
close
end
end.
-ws_handshake_extensions(Buffer, State, StreamRef, Headers, GunExtensions, Opts) ->
+ws_handshake_extensions(Buffer, State, Ws=#websocket{extensions=Extensions0, opts=Opts}, Headers) ->
case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
false ->
- ws_handshake_protocols(Buffer, State, StreamRef, Headers, #{}, Opts);
+ ws_handshake_protocols(Buffer, State, Ws, Headers, #{});
{_, ExtHd} ->
- case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of
+ ParsedExtHd = cow_http_hd:parse_sec_websocket_extensions(ExtHd),
+ case ws_validate_extensions(ParsedExtHd, Extensions0, #{}, Opts) of
close ->
close;
Extensions ->
- ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts)
+ ws_handshake_protocols(Buffer, State, Ws, Headers, Extensions)
end
end.
@@ -870,25 +873,25 @@ ws_validate_extensions(_, _, _, _) ->
close.
%% @todo Validate protocols.
-ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) ->
+ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensions) ->
case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of
false ->
- ws_handshake_end(Buffer, State, StreamRef, Headers, Extensions,
- maps:get(default_protocol, Opts, gun_ws_h), Opts);
+ ws_handshake_end(Buffer, State, Ws, Headers, Extensions,
+ maps:get(default_protocol, Opts, gun_ws_h));
{_, Proto} ->
ProtoOpt = maps:get(protocols, Opts, []),
case lists:keyfind(Proto, 1, ProtoOpt) of
{_, Handler} ->
- ws_handshake_end(Buffer, State, StreamRef,
- Headers, Extensions, Handler, Opts);
+ ws_handshake_end(Buffer, State, Ws, Headers, Extensions, Handler);
false ->
close
end
end.
%% We know that the most recent stream is the Websocket one.
-ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport,
- streams=[#stream{flow=InitialFlow}|_]}, StreamRef, Headers, Extensions, Handler, Opts) ->
+ws_handshake_end(Buffer,
+ #http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]},
+ #websocket{ref=StreamRef, reply_to=ReplyTo, opts=Opts}, Headers, Extensions, Handler) ->
%% Send ourselves the remaining buffer, if any.
_ = case Buffer of
<<>> ->
@@ -898,7 +901,7 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans
self() ! {OK, Socket, Buffer}
end,
%% Inform the user that the upgrade was successful and switch the protocol.
- Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
+ ReplyTo ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
{switch_protocol, {ws, #{
stream_ref => StreamRef,
headers => Headers,
@@ -906,4 +909,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans
flow => InitialFlow,
handler => Handler,
opts => Opts
- }}}.
+ }}, ReplyTo}.