aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2020-10-22 18:48:06 +0200
committerLoïc Hoguin <[email protected]>2020-11-02 17:16:57 +0100
commitd5f1a47e9ab758a51b23440eb72a0251527f3f7b (patch)
treee7c27355da10d89483394314a25cd5307ccc7da6
parent465d072abf4a76104d4562ed15345b27fe9a0cff (diff)
downloadgun-http2-websocket.tar.gz
gun-http2-websocket.tar.bz2
gun-http2-websocket.zip
Initial implementation of Websocket over HTTP/2http2-websocket
-rw-r--r--src/gun.erl14
-rw-r--r--src/gun_http.erl53
-rw-r--r--src/gun_http2.erl415
-rw-r--r--src/gun_tunnel.erl10
-rw-r--r--src/gun_ws.erl49
-rw-r--r--test/event_SUITE.erl183
-rw-r--r--test/ws_SUITE.erl139
7 files changed, 590 insertions, 273 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 69dbb6b..e441e52 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -229,6 +229,7 @@
cookie_ignore_informational => boolean(),
flow => pos_integer(),
keepalive => timeout(),
+ notify_settings_changed => boolean(),
%% Options copied from cow_http2_machine.
connection_window_margin_size => 0..16#7fffffff,
@@ -708,6 +709,8 @@ connect(ServerPid, Destination, Headers, ReqOpts) ->
| {push, stream_ref(), binary(), binary(), resp_headers()}
| {upgrade, [binary()], resp_headers()}
| {ws, ws_frame()}
+ | {up, http | http2 | raw | socks}
+ | {notify, settings_changed, map()}
| {error, {stream_error | connection_error | down, any()} | timeout}.
-spec await(pid(), stream_ref()) -> await_result().
@@ -747,6 +750,8 @@ await(ServerPid, StreamRef, Timeout, MRef) ->
{ws, Frame};
{gun_tunnel_up, ServerPid, StreamRef, Protocol} ->
{up, Protocol};
+ {gun_notify, ServerPid, Type, Info} ->
+ {notify, Type, Info};
{gun_error, ServerPid, StreamRef, Reason} ->
{error, {stream_error, Reason}};
{gun_error, ServerPid, Reason} ->
@@ -1223,7 +1228,8 @@ connected_ws_only(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
protocol=Protocol=gun_ws, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Commands, EvHandlerState} = Protocol:ws_send(Frames,
- ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ ProtoState, dereference_stream_ref(StreamRef, State),
+ ReplyTo, EvHandler, EvHandlerState0),
commands(Commands, State#state{event_handler_state=EvHandlerState});
connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{
protocol=Protocol=gun_ws, protocol_state=ProtoState,
@@ -1312,10 +1318,10 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts},
%% @todo Maybe better standardize the protocol callbacks argument orders.
connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0})
- when is_list(StreamRef) ->
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Commands, EvHandlerState} = Protocol:ws_send(Frames,
- ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ ProtoState, dereference_stream_ref(StreamRef, State),
+ ReplyTo, EvHandler, EvHandlerState0),
commands(Commands, State#state{event_handler_state=EvHandlerState});
%% Catch-all for the StreamRef-free variant.
connected(cast, {ws_send, ReplyTo, _}, _) ->
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 8b716a5..0eeca05 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -955,56 +955,23 @@ ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) ->
{_, Accept} ->
case cow_ws:encode_key(Key) of
Accept ->
- ws_handshake_extensions(Buffer, State, Ws, Headers);
+ ws_handshake_extensions_and_protocol(Buffer, State, Ws, Headers);
_ ->
close
end
end.
-ws_handshake_extensions(Buffer, State, Ws=#websocket{extensions=Extensions0, opts=Opts}, Headers) ->
- case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
- false ->
- ws_handshake_protocols(Buffer, State, Ws, Headers, #{});
- {_, ExtHd} ->
- ParsedExtHd = cow_http_hd:parse_sec_websocket_extensions(ExtHd),
- case ws_validate_extensions(ParsedExtHd, Extensions0, #{}, Opts) of
+ws_handshake_extensions_and_protocol(Buffer, State,
+ Ws=#websocket{extensions=Extensions0, opts=WsOpts}, Headers) ->
+ case gun_ws:select_extensions(Headers, Extensions0, WsOpts) of
+ close ->
+ close;
+ Extensions ->
+ case gun_ws:select_protocol(Headers, WsOpts) of
close ->
close;
- Extensions ->
- ws_handshake_protocols(Buffer, State, Ws, Headers, Extensions)
- end
- end.
-
-ws_validate_extensions([], _, Acc, _) ->
- Acc;
-ws_validate_extensions([{Name = <<"permessage-deflate">>, Params}|Tail], GunExts, Acc, Opts) ->
- case lists:member(Name, GunExts) of
- true ->
- case cow_ws:validate_permessage_deflate(Params, Acc, Opts) of
- {ok, Acc2} -> ws_validate_extensions(Tail, GunExts, Acc2, Opts);
- error -> close
- end;
- %% Fail the connection if extension was not requested.
- false ->
- close
- end;
-%% Fail the connection on unknown extension.
-ws_validate_extensions(_, _, _, _) ->
- close.
-
-%% @todo Validate protocols.
-ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensions) ->
- case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of
- false ->
- ws_handshake_end(Buffer, State, Ws, Headers, Extensions,
- maps:get(default_protocol, Opts, gun_ws_h));
- {_, Proto} ->
- ProtoOpt = maps:get(protocols, Opts, []),
- case lists:keyfind(Proto, 1, ProtoOpt) of
- {_, Handler} ->
- ws_handshake_end(Buffer, State, Ws, Headers, Extensions, Handler);
- false ->
- close
+ Handler ->
+ ws_handshake_end(Buffer, State, Ws, Headers, Extensions, Handler)
end
end.
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index cb10029..037c193 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -38,17 +38,19 @@
-export([ws_upgrade/11]).
-export([ws_send/6]).
+-record(websocket_info, {
+ extensions :: [binary()],
+ opts :: gun:ws_opts()
+}).
+
-record(tunnel, {
- %% The tunnel can either go requested->established
- %% or requested->tls_handshake->established, or get
- %% canceled.
state = requested :: requested | established,
%% Destination information.
- destination = undefined :: gun:connect_destination(),
+ destination = undefined :: undefined | gun:connect_destination(),
%% Tunnel information.
- info = undefined :: gun:tunnel_info(),
+ info = undefined :: gun:tunnel_info() | #websocket_info{},
%% Protocol module and state of the outer layer. Only initialized
%% after the TLS handshake has completed when TLS is involved.
@@ -81,6 +83,7 @@
}).
-record(http2_state, {
+ reply_to :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
opts = #{} :: gun:http2_opts(),
@@ -136,6 +139,8 @@ do_check_options([{keepalive, infinity}|Opts]) ->
do_check_options(Opts);
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
do_check_options(Opts);
+do_check_options([{notify_settings_changed, B}|Opts]) when is_boolean(B) ->
+ do_check_options(Opts);
do_check_options([Opt={Name, _}|Opts]) ->
%% We blindly accept all cow_http2_machine options.
HTTP2MachineOpts = [
@@ -166,7 +171,7 @@ opts_name() -> http2_opts.
has_keepalive() -> true.
default_keepalive() -> infinity.
-init(_ReplyTo, Socket, Transport, Opts0) ->
+init(ReplyTo, Socket, Transport, Opts0) ->
%% We have different defaults than the protocol in order
%% to optimize for performance when receiving responses.
Opts = Opts0#{
@@ -178,8 +183,8 @@ init(_ReplyTo, Socket, Transport, Opts0) ->
TunnelTransport = maps:get(tunnel_transport, Opts, undefined),
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts#{message_tag => BaseStreamRef}),
%% @todo Better validate the preface being received.
- State = #http2_state{socket=Socket, transport=Transport, opts=Opts,
- base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport,
+ State = #http2_state{reply_to=ReplyTo, socket=Socket, transport=Transport,
+ opts=Opts, base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport,
content_handlers=Handlers, http2_machine=HTTP2Machine},
Transport:send(Socket, Preface),
{connected, State}.
@@ -283,7 +288,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan
{update_window(State#http2_state{http2_machine=HTTP2Machine}),
CookieStore, EvHandlerState};
{ok, HTTP2Machine} ->
- {maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame),
+ {maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame),
CookieStore, EvHandlerState};
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data,
@@ -313,7 +318,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan
CookieStore, EvHandlerState};
{send, SendData, HTTP2Machine} ->
{StateRet, EvHandlerStateRet} = send_data(
- maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame),
+ maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame),
SendData, EvHandler, EvHandlerState),
{StateRet, CookieStore, EvHandlerStateRet};
{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
@@ -325,11 +330,23 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan
CookieStore, EvHandlerState}
end.
-maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) ->
+maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket,
+ transport=Transport, opts=Opts, http2_machine=HTTP2Machine}, Frame) ->
case Frame of
- {settings, _} -> Transport:send(Socket, cow_http2:settings_ack());
- {ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque));
- _ -> ok
+ {settings, _} ->
+ %% We notify remote settings changes only if the user requested it.
+ _ = case Opts of
+ #{notify_settings_changed := true} ->
+ ReplyTo ! {gun_notify, self(), settings_changed,
+ cow_http2_machine:get_remote_settings(HTTP2Machine)};
+ _ ->
+ ok
+ end,
+ Transport:send(Socket, cow_http2:settings_ack());
+ {ping, Opaque} ->
+ Transport:send(Socket, cow_http2:ping_ack(Opaque));
+ _ ->
+ ok
end,
State.
@@ -363,7 +380,13 @@ tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel},
State, EvHandler, EvHandlerState);
tunnel_commands([{error, _Reason}|_], #stream{id=StreamID},
State, _EvHandler, EvHandlerState) ->
- {delete_stream(State, StreamID), EvHandlerState}.
+ {delete_stream(State, StreamID), EvHandlerState};
+%% @todo Set a timeout for closing the Websocket stream.
+tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
+ tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
+%% @todo Maybe we should stop increasing the window when not in active mode. (HTTP/2 Websocket only.)
+tunnel_commands([{active, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
+ tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState).
continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) ->
case ContinueStreamRef of
@@ -409,133 +432,212 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
end,
{maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}.
-%% @todo Make separate functions for inform/connect/normal.
-headers_frame(State=#http2_state{transport=Transport, opts=Opts,
- tunnel_transport=TunnelTransport, content_handlers=Handlers0},
+headers_frame(State0=#http2_state{opts=Opts},
StreamID, IsFin, Headers, #{status := Status}, _BodyLen,
CookieStore0, EvHandler, EvHandlerState0) ->
- Stream = get_stream_by_id(State, StreamID),
+ Stream = get_stream_by_id(State0, StreamID),
#stream{
- ref=StreamRef,
- reply_to=ReplyTo,
authority=Authority,
path=Path,
tunnel=Tunnel
} = Stream,
- CookieStore = gun_cookies:set_cookie_header(scheme(State),
+ CookieStore = gun_cookies:set_cookie_header(scheme(State0),
Authority, Path, Status, Headers, CookieStore0, Opts),
- RealStreamRef = stream_ref(State, StreamRef),
- if
+ {State, EvHandlerState} = if
Status >= 100, Status =< 199 ->
- ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers},
- EvHandlerState = EvHandler:response_inform(#{
+ headers_frame_inform(State0, Stream, Status, Headers, EvHandler, EvHandlerState0);
+ Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested, IsFin =:= nofin ->
+ headers_frame_connect(State0, Stream, Status, Headers, EvHandler, EvHandlerState0);
+ true ->
+ headers_frame_response(State0, Stream, IsFin, Status, Headers, EvHandler, EvHandlerState0)
+ end,
+ {State, CookieStore, EvHandlerState}.
+
+headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo},
+ Status, Headers, EvHandler, EvHandlerState0) ->
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers},
+ EvHandlerState = EvHandler:response_inform(#{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
+ {State, EvHandlerState}.
+
+headers_frame_connect(State0=#http2_state{http2_machine=HTTP2Machine0},
+ Stream=#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, tunnel=#tunnel{
+ info=#websocket_info{extensions=Extensions0, opts=WsOpts}}},
+ Status, Headers, EvHandler, EvHandlerState0) ->
+ RealStreamRef = stream_ref(State0, StreamRef),
+ EvHandlerState1 = EvHandler:response_headers(#{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
+ %% Websocket CONNECT response headers terminate the response but not the stream.
+ EvHandlerState = EvHandler:response_end(#{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState1),
+ case gun_ws:select_extensions(Headers, Extensions0, WsOpts) of
+ close ->
+ {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
+ State1 = State0#http2_state{http2_machine=HTTP2Machine},
+ State = reset_stream(State1, StreamID, {stream_error, cancel,
+ 'The sec-websocket-extensions header is invalid. (RFC6455 9.1, RFC7692 7)'}),
+ {State, EvHandlerState};
+ Extensions ->
+ case gun_ws:select_protocol(Headers, WsOpts) of
+ close ->
+ {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
+ State1 = State0#http2_state{http2_machine=HTTP2Machine},
+ State = reset_stream(State1, StreamID, {stream_error, cancel,
+ 'The sec-websocket-protocol header is invalid. (RFC6455 4.1)'}),
+ {State, EvHandlerState};
+ Handler ->
+ headers_frame_connect_websocket(State0, Stream, Headers,
+ EvHandler, EvHandlerState, Extensions, Handler)
+ end
+ end;
+headers_frame_connect(State=#http2_state{transport=Transport, opts=Opts, tunnel_transport=TunnelTransport},
+ Stream=#stream{ref=StreamRef, reply_to=ReplyTo, tunnel=Tunnel=#tunnel{
+ destination=Destination=#{host := DestHost, port := DestPort}, info=TunnelInfo0}},
+ Status, Headers, EvHandler, EvHandlerState0) ->
+ RealStreamRef = stream_ref(State, StreamRef),
+ TunnelInfo = TunnelInfo0#{
+ origin_host => DestHost,
+ origin_port => DestPort
+ },
+ ReplyTo ! {gun_response, self(), RealStreamRef, nofin, Status, Headers},
+ EvHandlerState1 = EvHandler:response_headers(#{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
+ EvHandlerState2 = EvHandler:origin_changed(#{
+ stream_ref => RealStreamRef,
+ type => connect,
+ origin_scheme => case Destination of
+ #{transport := tls} -> <<"https">>;
+ _ -> <<"http">>
+ end,
+ origin_host => DestHost,
+ origin_port => DestPort
+ }, EvHandlerState1),
+ ContinueStreamRef = continue_stream_ref(State, StreamRef),
+ OriginSocket = #{
+ gun_pid => self(),
+ reply_to => ReplyTo,
+ stream_ref => RealStreamRef,
+ handle_continue_stream_ref => ContinueStreamRef
+ },
+ Proto = gun_tunnel,
+ ProtoOpts = case Destination of
+ #{transport := tls} ->
+ Protocols = maps:get(protocols, Destination, [http2, http]),
+ TLSOpts = gun:ensure_alpn_sni(Protocols, maps:get(tls_opts, Destination, []), DestHost),
+ HandshakeEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
- status => Status,
- headers => Headers
- }, EvHandlerState0),
- {State, CookieStore, EvHandlerState};
- Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested ->
- #tunnel{destination=Destination, info=TunnelInfo0} = Tunnel,
- #{host := DestHost, port := DestPort} = Destination,
- TunnelInfo = TunnelInfo0#{
- origin_host => DestHost,
- origin_port => DestPort
+ tls_opts => TLSOpts,
+ timeout => maps:get(tls_handshake_timeout, Destination, infinity)
},
- ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
- EvHandlerState1 = EvHandler:response_headers(#{
+ Opts#{
stream_ref => RealStreamRef,
- reply_to => ReplyTo,
- status => Status,
- headers => Headers
- }, EvHandlerState0),
- EvHandlerState2 = EvHandler:origin_changed(#{
- stream_ref => RealStreamRef,
- type => connect,
- origin_scheme => case Destination of
- #{transport := tls} -> <<"https">>;
- _ -> <<"http">>
- end,
- origin_host => DestHost,
- origin_port => DestPort
- }, EvHandlerState1),
- ContinueStreamRef = continue_stream_ref(State, StreamRef),
- OriginSocket = #{
- gun_pid => self(),
- reply_to => ReplyTo,
+ tunnel => #{
+ type => connect,
+ transport_name => case TunnelTransport of
+ undefined -> Transport:name();
+ _ -> TunnelTransport
+ end,
+ protocol_name => http2,
+ info => TunnelInfo,
+ handshake_event => HandshakeEvent,
+ protocols => Protocols
+ }
+ };
+ _ ->
+ [NewProtocol] = maps:get(protocols, Destination, [http]),
+ Opts#{
stream_ref => RealStreamRef,
- handle_continue_stream_ref => ContinueStreamRef
- },
- Proto = gun_tunnel,
- ProtoOpts = case Destination of
- #{transport := tls} ->
- Protocols = maps:get(protocols, Destination, [http2, http]),
- TLSOpts = gun:ensure_alpn_sni(Protocols, maps:get(tls_opts, Destination, []), DestHost),
- HandshakeEvent = #{
- stream_ref => RealStreamRef,
- reply_to => ReplyTo,
- tls_opts => TLSOpts,
- timeout => maps:get(tls_handshake_timeout, Destination, infinity)
- },
- Opts#{
- stream_ref => RealStreamRef,
- tunnel => #{
- type => connect,
- transport_name => case TunnelTransport of
- undefined -> Transport:name();
- _ -> TunnelTransport
- end,
- protocol_name => http2,
- info => TunnelInfo,
- handshake_event => HandshakeEvent,
- protocols => Protocols
- }
- };
- _ ->
- [NewProtocol] = maps:get(protocols, Destination, [http]),
- Opts#{
- stream_ref => RealStreamRef,
- tunnel => #{
- type => connect,
- transport_name => case TunnelTransport of
- undefined -> Transport:name();
- _ -> TunnelTransport
- end,
- protocol_name => http2,
- info => TunnelInfo,
- new_protocol => NewProtocol
- }
- }
- end,
- {tunnel, ProtoState, EvHandlerState} = Proto:init(
- ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2),
- {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{
- info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}),
- CookieStore, EvHandlerState};
- true ->
- ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
- EvHandlerState1 = EvHandler:response_headers(#{
+ tunnel => #{
+ type => connect,
+ transport_name => case TunnelTransport of
+ undefined -> Transport:name();
+ _ -> TunnelTransport
+ end,
+ protocol_name => http2,
+ info => TunnelInfo,
+ new_protocol => NewProtocol
+ }
+ }
+ end,
+ {tunnel, ProtoState, EvHandlerState} = Proto:init(
+ ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established,
+ info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}),
+ EvHandlerState}.
+
+headers_frame_connect_websocket(State, Stream=#stream{ref=StreamRef, reply_to=ReplyTo,
+ tunnel=Tunnel=#tunnel{info=#websocket_info{opts=WsOpts}}},
+ Headers, EvHandler, EvHandlerState0, Extensions, Handler) ->
+ RealStreamRef = stream_ref(State, StreamRef),
+ ContinueStreamRef = continue_stream_ref(State, StreamRef),
+ OriginSocket = #{
+ gun_pid => self(),
+ reply_to => ReplyTo,
+ stream_ref => RealStreamRef,
+ handle_continue_stream_ref => ContinueStreamRef
+ },
+ ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers},
+ Proto = gun_ws,
+ EvHandlerState = EvHandler:protocol_changed(#{
+ stream_ref => RealStreamRef,
+ protocol => Proto:name()
+ }, EvHandlerState0),
+ ProtoOpts = #{
+ stream_ref => RealStreamRef,
+ headers => Headers,
+ extensions => Extensions,
+ flow => maps:get(flow, WsOpts, infinity),
+ handler => Handler,
+ opts => WsOpts
+ },
+ {connected_ws_only, ProtoState} = Proto:init(
+ ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts),
+ {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established,
+ protocol=Proto, protocol_state=ProtoState}}),
+ EvHandlerState}.
+
+headers_frame_response(State=#http2_state{content_handlers=Handlers0},
+ Stream=#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
+ IsFin, Status, Headers, EvHandler, EvHandlerState0) ->
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
+ EvHandlerState1 = EvHandler:response_headers(#{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
+ {Handlers, EvHandlerState} = case IsFin of
+ fin ->
+ EvHandlerState2 = EvHandler:response_end(#{
stream_ref => RealStreamRef,
- reply_to => ReplyTo,
- status => Status,
- headers => Headers
- }, EvHandlerState0),
- {Handlers, EvHandlerState} = case IsFin of
- fin ->
- EvHandlerState2 = EvHandler:response_end(#{
- stream_ref => RealStreamRef,
- reply_to => ReplyTo
- }, EvHandlerState1),
- {undefined, EvHandlerState2};
- nofin ->
- {gun_content_handler:init(ReplyTo, RealStreamRef,
- Status, Headers, Handlers0), EvHandlerState1}
- end,
- %% @todo Disable the tunnel if any.
- {maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}),
- StreamID, remote, IsFin),
- CookieStore, EvHandlerState}
- end.
+ reply_to => ReplyTo
+ }, EvHandlerState1),
+ {undefined, EvHandlerState2};
+ nofin ->
+ {gun_content_handler:init(ReplyTo, RealStreamRef,
+ Status, Headers, Handlers0), EvHandlerState1}
+ end,
+ %% We disable the tunnel, if any, when receiving any non 2xx response.
+ {maybe_delete_stream(store_stream(State,
+ Stream#stream{handler_state=Handlers, tunnel=undefined}),
+ StreamID, remote, IsFin), EvHandlerState}.
trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
@@ -1195,7 +1297,58 @@ stream_info(State, RealStreamRef=[StreamRef|_]) ->
down(#http2_state{stream_refs=Refs}) ->
maps:keys(Refs).
-%% Websocket upgrades are currently only accepted when tunneled.
+ws_upgrade(State=#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine0}, StreamRef, ReplyTo,
+ Host, Port, Path, Headers0, WsOpts,
+ CookieStore0, EvHandler, EvHandlerState0)
+ when is_reference(StreamRef) ->
+ {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
+ <<"CONNECT">>, HTTP2Machine0),
+ {ok, PseudoHeaders, Headers1, CookieStore} = prepare_headers(State,
+ <<"CONNECT">>, Host, Port, Path, Headers0, CookieStore0),
+ {Headers2, GunExtensions} = case maps:get(compress, WsOpts, false) of
+ true ->
+ {[{<<"sec-websocket-extensions">>,
+ <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>}
+ |Headers1], [<<"permessage-deflate">>]};
+ false ->
+ {Headers1, []}
+ end,
+ Headers3 = case maps:get(protocols, WsOpts, []) of
+ [] ->
+ Headers2;
+ ProtoOpt ->
+ << _, _, Proto/bits >> = iolist_to_binary([[<<", ">>, P] || {P, _} <- ProtoOpt]),
+ [{<<"sec-websocket-protocol">>, Proto}|Headers2]
+ end,
+ Headers = [{<<"sec-websocket-version">>, <<"13">>}|Headers3],
+ Authority = maps:get(authority, PseudoHeaders),
+ RealStreamRef = stream_ref(State, StreamRef),
+ RequestEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ function => ?FUNCTION_NAME,
+ method => <<"CONNECT">>,
+ authority => Authority,
+ path => Path,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
+ {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
+ StreamID, HTTP2Machine1, nofin, PseudoHeaders#{protocol => <<"websocket">>}, Headers),
+ Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
+ EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
+ InitialFlow = maps:get(flow, WsOpts, infinity),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
+ authority=Authority, path=Path, tunnel=#tunnel{info=#websocket_info{
+ extensions=GunExtensions, opts=WsOpts}}},
+ {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream),
+ CookieStore, EvHandlerState};
ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,
Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
@@ -1210,7 +1363,11 @@ ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,
%% @todo Error conditions?
end.
-ws_send(Frames, State0, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) ->
+ws_send(Frames, State0, RealStreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
+ StreamRef = case RealStreamRef of
+ [SR|_] -> SR;
+ _ -> RealStreamRef
+ end,
case get_stream_by_ref(State0, StreamRef) of
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState}} ->
{Commands, EvHandlerState1} = Proto:ws_send(Frames, ProtoState,
diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl
index 7c29684..2594d24 100644
--- a/src/gun_tunnel.erl
+++ b/src/gun_tunnel.erl
@@ -340,12 +340,12 @@ connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port},
EvHandler, EvHandlerState0),
{State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.
-cancel(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
+cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
StreamRef0, ReplyTo, EvHandler, EvHandlerState0) ->
- StreamRef = maybe_dereference(State0, StreamRef0),
- {Commands, EvHandlerState1} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
- {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
- {{state, State}, EvHandlerState}.
+ StreamRef = maybe_dereference(State, StreamRef0),
+ {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, StreamRef,
+ ReplyTo, EvHandler, EvHandlerState0),
+ {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.
timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) ->
case Proto:timeout(ProtoState0, Msg, TRef) of
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index f413f94..a1fdfae 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -15,12 +15,15 @@
-module(gun_ws).
-export([check_options/1]).
+-export([select_extensions/3]).
+-export([select_protocol/2]).
-export([name/0]).
-export([opts_name/0]).
-export([has_keepalive/0]).
-export([default_keepalive/0]).
-export([init/4]).
-export([handle/5]).
+-export([handle_continue/6]).
-export([update_flow/4]).
-export([closing/4]).
-export([close/4]).
@@ -89,6 +92,47 @@ do_check_options([{user_opts, _}|Opts]) ->
do_check_options([Opt|_]) ->
{error, {options, {ws, Opt}}}.
+select_extensions(Headers, Extensions0, Opts) ->
+ case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
+ false ->
+ #{};
+ {_, ExtHd} ->
+ ParsedExtHd = cow_http_hd:parse_sec_websocket_extensions(ExtHd),
+ validate_extensions(ParsedExtHd, Extensions0, Opts, #{})
+ end.
+
+validate_extensions([], _, _, Acc) ->
+ Acc;
+validate_extensions([{Name = <<"permessage-deflate">>, Params}|Tail], Extensions, Opts, Acc0) ->
+ case lists:member(Name, Extensions) of
+ true ->
+ case cow_ws:validate_permessage_deflate(Params, Acc0, Opts) of
+ {ok, Acc} -> validate_extensions(Tail, Extensions, Opts, Acc);
+ error -> close
+ end;
+ %% Fail the connection if extension was not requested.
+ false ->
+ close
+ end;
+%% Fail the connection on unknown extension.
+validate_extensions(_, _, _, _) ->
+ close.
+
+%% @todo Validate protocols.
+select_protocol(Headers, Opts) ->
+ case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of
+ false ->
+ maps:get(default_protocol, Opts, gun_ws_h);
+ {_, Proto} ->
+ ProtoOpt = maps:get(protocols, Opts, []),
+ case lists:keyfind(Proto, 1, ProtoOpt) of
+ {_, Handler} ->
+ Handler;
+ false ->
+ close
+ end
+ end.
+
name() -> ws.
opts_name() -> ws_opts.
has_keepalive() -> true.
@@ -176,6 +220,11 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke
closing(Error, State, EvHandler, EvHandlerState)
end.
+handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data},
+ #ws_state{}, CookieStore, _EvHandler, EvHandlerState)
+ when is_reference(ContinueStreamRef) ->
+ {{send, IsFin, Data}, CookieStore, EvHandlerState}.
+
maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) ->
{[
{state, State},
diff --git a/test/event_SUITE.erl b/test/event_SUITE.erl
index d883dc5..8be10b1 100644
--- a/test/event_SUITE.erl
+++ b/test/event_SUITE.erl
@@ -34,25 +34,25 @@ groups() ->
HTTP1Tests = [T || T <- Tests, lists:sublist(atom_to_list(T), 6) =:= "http1_"],
%% Push is not possible over HTTP/1.1.
PushTests = [T || T <- Tests, lists:sublist(atom_to_list(T), 5) =:= "push_"],
- %% We currently do not support Websocket over HTTP/2.
- WsTests = [T || T <- Tests, lists:sublist(atom_to_list(T), 3) =:= "ws_"],
[
{http, [parallel], Tests -- [cancel_remote, cancel_remote_connect|PushTests]},
- {http2, [parallel], (Tests -- WsTests) -- HTTP1Tests}
+ {http2, [parallel], Tests -- HTTP1Tests}
].
init_per_suite(Config) ->
- ProtoOpts = #{env => #{
- dispatch => cowboy_router:compile([{'_', [
- {"/", hello_h, []},
- {"/empty", empty_h, []},
- {"/inform", inform_h, []},
- {"/push", push_h, []},
- {"/stream", stream_h, []},
- {"/trailers", trailers_h, []},
- {"/ws", ws_echo_h, []}
- ]}])
- }},
+ Routes = [
+ {"/", hello_h, []},
+ {"/empty", empty_h, []},
+ {"/inform", inform_h, []},
+ {"/push", push_h, []},
+ {"/stream", stream_h, []},
+ {"/trailers", trailers_h, []},
+ {"/ws", ws_echo_h, []}
+ ],
+ ProtoOpts = #{
+ enable_connect_protocol => true,
+ env => #{dispatch => cowboy_router:compile([{'_', Routes}])}
+ },
{ok, _} = cowboy:start_clear({?MODULE, tcp}, [], ProtoOpts),
TCPOriginPort = ranch:get_port({?MODULE, tcp}),
{ok, _} = cowboy:start_tls({?MODULE, tls}, ct_helper:get_certs_from_ets(), ProtoOpts),
@@ -1227,8 +1227,10 @@ http1_response_end_body_close(Config) ->
ws_upgrade(Config) ->
doc("Confirm that the ws_upgrade event callback is called."),
+ Protocol = config(name, config(tc_group_properties, Config)),
{ok, Pid, _} = do_gun_open(Config),
- {ok, _} = gun:await_up(Pid),
+ {ok, Protocol} = gun:await_up(Pid),
+ ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),
StreamRef = gun:ws_upgrade(Pid, "/ws"),
ReplyTo = self(),
#{
@@ -1258,11 +1260,15 @@ do_ws_upgrade_connect(Config, ProxyProtocol) ->
StreamRef1 = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
- protocols => [OriginProtocol]
+ protocols => [case OriginProtocol of
+ http -> http;
+ http2 -> {http2, #{notify_settings_changed => true}}
+ end]
}, []),
%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
{up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),
StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
#{
stream_ref := StreamRef2,
@@ -1273,8 +1279,10 @@ do_ws_upgrade_connect(Config, ProxyProtocol) ->
ws_upgrade_all_events(Config) ->
doc("Confirm that a Websocket upgrade triggers all relevant events."),
+ Protocol = config(name, config(tc_group_properties, Config)),
{ok, Pid, OriginPort} = do_gun_open(Config),
- {ok, _} = gun:await_up(Pid),
+ {ok, Protocol} = gun:await_up(Pid),
+ ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),
StreamRef = gun:ws_upgrade(Pid, "/ws"),
ReplyTo = self(),
#{
@@ -1283,11 +1291,15 @@ ws_upgrade_all_events(Config) ->
opts := #{}
} = do_receive_event(ws_upgrade),
Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]),
+ Method = case Protocol of
+ http -> <<"GET">>;
+ http2 -> <<"CONNECT">>
+ end,
#{
stream_ref := StreamRef,
reply_to := ReplyTo,
function := ws_upgrade,
- method := <<"GET">>,
+ method := Method,
authority := EventAuthority1,
path := "/ws",
headers := [_|_]
@@ -1297,7 +1309,7 @@ ws_upgrade_all_events(Config) ->
stream_ref := StreamRef,
reply_to := ReplyTo,
function := ws_upgrade,
- method := <<"GET">>,
+ method := Method,
authority := EventAuthority2,
path := "/ws",
headers := [_|_]
@@ -1311,12 +1323,26 @@ ws_upgrade_all_events(Config) ->
stream_ref := StreamRef,
reply_to := ReplyTo
} = do_receive_event(response_start),
- #{
- stream_ref := StreamRef,
- reply_to := ReplyTo,
- status := 101,
- headers := [_|_]
- } = do_receive_event(response_inform),
+ _ = case Protocol of
+ http ->
+ #{
+ stream_ref := StreamRef,
+ reply_to := ReplyTo,
+ status := 101,
+ headers := [_|_]
+ } = do_receive_event(response_inform);
+ http2 ->
+ #{
+ stream_ref := StreamRef,
+ reply_to := ReplyTo,
+ status := 200,
+ headers := [_|_]
+ } = do_receive_event(response_headers),
+ #{
+ stream_ref := StreamRef,
+ reply_to := ReplyTo
+ } = do_receive_event(response_end)
+ end,
#{
stream_ref := StreamRef,
protocol := ws
@@ -1343,16 +1369,27 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->
StreamRef1 = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
- protocols => [OriginProtocol]
+ protocols => [case OriginProtocol of
+ http -> http;
+ http2 -> {http2, #{notify_settings_changed => true}}
+ end]
}, []),
%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
{up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),
%% Skip all CONNECT-related events that may conflict.
_ = do_receive_event(request_start),
_ = do_receive_event(request_headers),
_ = do_receive_event(request_end),
_ = do_receive_event(response_start),
+ case OriginProtocol of
+ http -> ok;
+ http2 ->
+ _ = do_receive_event(response_headers),
+% _ = do_receive_event(response_end), @todo Probably should response_end CONNECT responses for both protocols.
+ ok
+ end,
_ = do_receive_event(protocol_changed),
%% Check the Websocket events.
StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
@@ -1362,11 +1399,15 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->
opts := #{}
} = do_receive_event(ws_upgrade),
Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]),
+ Method = case OriginProtocol of
+ http -> <<"GET">>;
+ http2 -> <<"CONNECT">>
+ end,
#{
stream_ref := StreamRef2,
reply_to := ReplyTo,
function := ws_upgrade,
- method := <<"GET">>,
+ method := Method,
authority := EventAuthority1,
path := "/ws",
headers := [_|_]
@@ -1376,7 +1417,7 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->
stream_ref := StreamRef2,
reply_to := ReplyTo,
function := ws_upgrade,
- method := <<"GET">>,
+ method := Method,
authority := EventAuthority2,
path := "/ws",
headers := [_|_]
@@ -1390,12 +1431,26 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->
stream_ref := StreamRef2,
reply_to := ReplyTo
} = do_receive_event(response_start),
- #{
- stream_ref := StreamRef2,
- reply_to := ReplyTo,
- status := 101,
- headers := [_|_]
- } = do_receive_event(response_inform),
+ _ = case OriginProtocol of
+ http ->
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ status := 101,
+ headers := [_|_]
+ } = do_receive_event(response_inform);
+ http2 ->
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo,
+ status := 200,
+ headers := [_|_]
+ } = do_receive_event(response_headers),
+ #{
+ stream_ref := StreamRef2,
+ reply_to := ReplyTo
+ } = do_receive_event(response_end)
+ end,
#{
stream_ref := StreamRef2,
protocol := ws
@@ -1406,11 +1461,13 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) ->
ws_recv_frame_start(Config) ->
doc("Confirm that the ws_recv_frame_start event callback is called."),
+ Protocol = config(name, config(tc_group_properties, Config)),
{ok, Pid, _} = do_gun_open(Config),
- {ok, _} = gun:await_up(Pid),
+ {ok, Protocol} = gun:await_up(Pid),
+ ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),
StreamRef = gun:ws_upgrade(Pid, "/ws"),
{upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef),
- gun:ws_send(Pid, {text, <<"Hello!">>}),
+ gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}),
ReplyTo = self(),
#{
stream_ref := StreamRef,
@@ -1440,11 +1497,15 @@ do_ws_recv_frame_start_connect(Config, ProxyProtocol) ->
StreamRef1 = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
- protocols => [OriginProtocol]
+ protocols => [case OriginProtocol of
+ http -> http;
+ http2 -> {http2, #{notify_settings_changed => true}}
+ end]
}, []),
%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
{up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),
StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),
gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}),
@@ -1458,11 +1519,13 @@ do_ws_recv_frame_start_connect(Config, ProxyProtocol) ->
ws_recv_frame_header(Config) ->
doc("Confirm that the ws_recv_frame_header event callback is called."),
+ Protocol = config(name, config(tc_group_properties, Config)),
{ok, Pid, _} = do_gun_open(Config),
- {ok, _} = gun:await_up(Pid),
+ {ok, Protocol} = gun:await_up(Pid),
+ ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),
StreamRef = gun:ws_upgrade(Pid, "/ws"),
{upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef),
- gun:ws_send(Pid, {text, <<"Hello!">>}),
+ gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}),
ReplyTo = self(),
#{
stream_ref := StreamRef,
@@ -1496,11 +1559,15 @@ do_ws_recv_frame_header_connect(Config, ProxyProtocol) ->
StreamRef1 = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
- protocols => [OriginProtocol]
+ protocols => [case OriginProtocol of
+ http -> http;
+ http2 -> {http2, #{notify_settings_changed => true}}
+ end]
}, []),
%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
{up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),
StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),
gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}),
@@ -1518,11 +1585,13 @@ do_ws_recv_frame_header_connect(Config, ProxyProtocol) ->
ws_recv_frame_end(Config) ->
doc("Confirm that the ws_recv_frame_end event callback is called."),
+ Protocol = config(name, config(tc_group_properties, Config)),
{ok, Pid, _} = do_gun_open(Config),
- {ok, _} = gun:await_up(Pid),
+ {ok, Protocol} = gun:await_up(Pid),
+ ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),
StreamRef = gun:ws_upgrade(Pid, "/ws"),
{upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef),
- gun:ws_send(Pid, {text, <<"Hello!">>}),
+ gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}),
ReplyTo = self(),
#{
stream_ref := StreamRef,
@@ -1553,11 +1622,15 @@ do_ws_recv_frame_end_connect(Config, ProxyProtocol) ->
StreamRef1 = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
- protocols => [OriginProtocol]
+ protocols => [case OriginProtocol of
+ http -> http;
+ http2 -> {http2, #{notify_settings_changed => true}}
+ end]
}, []),
%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
{up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),
StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),
gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}),
@@ -1581,11 +1654,13 @@ ws_send_frame_end(Config) ->
do_ws_send_frame(Config, ?FUNCTION_NAME).
do_ws_send_frame(Config, EventName) ->
+ Protocol = config(name, config(tc_group_properties, Config)),
{ok, Pid, _} = do_gun_open(Config),
- {ok, _} = gun:await_up(Pid),
+ {ok, Protocol} = gun:await_up(Pid),
+ ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),
StreamRef = gun:ws_upgrade(Pid, "/ws"),
{upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef),
- gun:ws_send(Pid, {text, <<"Hello!">>}),
+ gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}),
ReplyTo = self(),
#{
stream_ref := StreamRef,
@@ -1621,11 +1696,15 @@ do_ws_send_frame_connect(Config, ProxyProtocol, EventName) ->
StreamRef1 = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
- protocols => [OriginProtocol]
+ protocols => [case OriginProtocol of
+ http -> http;
+ http2 -> {http2, #{notify_settings_changed => true}}
+ end]
}, []),
%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
{up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),
StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}),
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2),
gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}),
@@ -1641,8 +1720,10 @@ do_ws_send_frame_connect(Config, ProxyProtocol, EventName) ->
ws_protocol_changed(Config) ->
doc("Confirm that the protocol_changed event callback is called on Websocket upgrade success."),
+ Protocol = config(name, config(tc_group_properties, Config)),
{ok, Pid, _} = do_gun_open(Config),
- {ok, _} = gun:await_up(Pid),
+ {ok, Protocol} = gun:await_up(Pid),
+ ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid),
_ = gun:ws_upgrade(Pid, "/ws"),
#{
protocol := ws
@@ -1668,11 +1749,15 @@ do_ws_protocol_changed_connect(Config, ProxyProtocol) ->
StreamRef1 = gun:connect(ConnPid, #{
host => "localhost",
port => OriginPort,
- protocols => [OriginProtocol]
+ protocols => [case OriginProtocol of
+ http -> http;
+ http2 -> {http2, #{notify_settings_changed => true}}
+ end]
}, []),
%% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2...
{response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1),
{up, OriginProtocol} = gun:await(ConnPid, StreamRef1),
+ ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid),
#{
stream_ref := StreamRef1,
protocol := OriginProtocol
@@ -1951,6 +2036,7 @@ do_gun_open(Config) ->
do_gun_open(OriginPort, Config) ->
Opts = #{
event_handler => {?MODULE, self()},
+ http2_opts => #{notify_settings_changed => true},
protocols => [config(name, config(tc_group_properties, Config))]
},
{ok, Pid} = gun:open("localhost", OriginPort, Opts),
@@ -1960,6 +2046,7 @@ do_gun_open_tls(Config) ->
OriginPort = config(tls_origin_port, Config),
Opts = #{
event_handler => {?MODULE, self()},
+ http2_opts => #{notify_settings_changed => true},
protocols => [config(name, config(tc_group_properties, Config))],
transport => tls
},
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index c04acc2..201403e 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -22,19 +22,30 @@
%% ct.
all() ->
- [{group, ws}].
+ [{group, http}, {group, http2}].
groups() ->
- [{ws, [], ct_helper:all(?MODULE)}].
+ Tests = ct_helper:all(?MODULE),
+ HTTP1Tests = [
+ http10_upgrade_error,
+ http11_request_error,
+ http11_keepalive,
+ http11_keepalive_default_silence_pings
+ ],
+ [
+ {http, [], Tests},
+ {http2, [], Tests -- HTTP1Tests}
+ ].
init_per_suite(Config) ->
Routes = [
{"/", ws_echo_h, []},
{"/reject", ws_reject_h, []}
],
- {ok, _} = cowboy:start_clear(ws, [], #{env => #{
- dispatch => cowboy_router:compile([{'_', Routes}])
- }}),
+ {ok, _} = cowboy:start_clear(ws, [], #{
+ enable_connect_protocol => true,
+ env => #{dispatch => cowboy_router:compile([{'_', Routes}])}
+ }),
Port = ranch:get_port(ws),
[{port, Port}|Config].
@@ -45,16 +56,37 @@ end_per_suite(_) ->
await(Config) ->
doc("Ensure gun:await/2 can be used to receive Websocket frames."),
- {ok, ConnPid} = gun:open("localhost", config(port, Config)),
- {ok, _} = gun:await_up(ConnPid),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config), #{
+ protocols => [Protocol],
+ http2_opts => #{notify_settings_changed => true}
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ do_await_enable_connect_protocol(Protocol, ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/", []),
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
Frame = {text, <<"Hello!">>},
- gun:ws_send(ConnPid, Frame),
+ gun:ws_send(ConnPid, StreamRef, Frame),
{ws, Frame} = gun:await(ConnPid, StreamRef),
gun:close(ConnPid).
-error_http10_upgrade(Config) ->
+headers_normalized_upgrade(Config) ->
+ doc("Headers passed to ws_upgrade are normalized before being used."),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config), #{
+ protocols => [Protocol],
+ http2_opts => #{notify_settings_changed => true}
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ do_await_enable_connect_protocol(Protocol, ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", #{
+ atom_header_name => <<"value">>,
+ "string_header_name" => <<"value">>
+ }),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid).
+
+http10_upgrade_error(Config) ->
doc("Attempting to upgrade HTTP/1.0 to Websocket produces an error."),
{ok, ConnPid} = gun:open("localhost", config(port, Config), #{
http_opts => #{version => 'HTTP/1.0'}
@@ -70,28 +102,7 @@ error_http10_upgrade(Config) ->
error(timeout)
end.
-headers_normalized_upgrade(Config) ->
- doc("Headers passed to ws_upgrade are normalized before being used."),
- {ok, ConnPid} = gun:open("localhost", config(port, Config)),
- {ok, _} = gun:await_up(ConnPid),
- StreamRef = gun:ws_upgrade(ConnPid, "/", #{
- atom_header_name => <<"value">>,
- "string_header_name" => <<"value">>
- }),
- {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
- gun:close(ConnPid).
-
-error_http_request(Config) ->
- doc("Ensure that requests are rejected while using Websocket."),
- {ok, ConnPid} = gun:open("localhost", config(port, Config)),
- {ok, _} = gun:await_up(ConnPid),
- StreamRef1 = gun:ws_upgrade(ConnPid, "/", []),
- {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef1),
- StreamRef2 = gun:get(ConnPid, "/"),
- {error, {connection_error, {badstate, _}}} = gun:await(ConnPid, StreamRef2),
- gun:close(ConnPid).
-
-keepalive(Config) ->
+http11_keepalive(Config) ->
doc("Ensure that Gun automatically sends ping frames."),
{ok, ConnPid} = gun:open("localhost", config(port, Config), #{
ws_opts => #{
@@ -106,7 +117,7 @@ keepalive(Config) ->
{ws, pong} = gun:await(ConnPid, StreamRef),
gun:close(ConnPid).
-keepalive_default_silence_pings(Config) ->
+http11_keepalive_default_silence_pings(Config) ->
doc("Ensure that Gun does not forward ping/pong by default."),
{ok, ConnPid} = gun:open("localhost", config(port, Config), #{
ws_opts => #{keepalive => 100}
@@ -118,10 +129,25 @@ keepalive_default_silence_pings(Config) ->
{error, timeout} = gun:await(ConnPid, StreamRef, 1000),
gun:close(ConnPid).
-reject_upgrade(Config) ->
- doc("Ensure Websocket connections can be rejected."),
+http11_request_error(Config) ->
+ doc("Ensure that HTTP/1.1 requests are rejected while using Websocket."),
{ok, ConnPid} = gun:open("localhost", config(port, Config)),
{ok, _} = gun:await_up(ConnPid),
+ StreamRef1 = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef1),
+ StreamRef2 = gun:get(ConnPid, "/"),
+ {error, {connection_error, {badstate, _}}} = gun:await(ConnPid, StreamRef2),
+ gun:close(ConnPid).
+
+reject_upgrade(Config) ->
+ doc("Ensure Websocket connections can be rejected."),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config), #{
+ protocols => [Protocol],
+ http2_opts => #{notify_settings_changed => true}
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ do_await_enable_connect_protocol(Protocol, ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/reject", []),
receive
{gun_response, ConnPid, StreamRef, nofin, 400, _} ->
@@ -134,7 +160,7 @@ reject_upgrade(Config) ->
end.
reply_to(Config) ->
- doc("Ensure we can send a list of frames in one gun:ws_send call."),
+ doc("Ensure the reply_to request option is respected."),
Self = self(),
Frame = {text, <<"Hello!">>},
ReplyTo = spawn(fun() ->
@@ -144,36 +170,61 @@ reply_to(Config) ->
{ws, Frame} = gun:await(ConnPid, StreamRef),
Self ! {self(), ok}
end),
- {ok, ConnPid} = gun:open("localhost", config(port, Config)),
- {ok, _} = gun:await_up(ConnPid),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config), #{
+ protocols => [Protocol],
+ http2_opts => #{notify_settings_changed => true}
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ do_await_enable_connect_protocol(Protocol, ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/", [], #{reply_to => ReplyTo}),
ReplyTo ! {ConnPid, StreamRef},
- receive {ReplyTo, ready} -> gun:ws_send(ConnPid, Frame) after 1000 -> error(timeout) end,
+ receive {ReplyTo, ready} -> gun:ws_send(ConnPid, StreamRef, Frame) after 1000 -> error(timeout) end,
receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end.
send_many(Config) ->
doc("Ensure we can send a list of frames in one gun:ws_send call."),
- {ok, ConnPid} = gun:open("localhost", config(port, Config)),
- {ok, _} = gun:await_up(ConnPid),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config), #{
+ protocols => [Protocol],
+ http2_opts => #{notify_settings_changed => true}
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ do_await_enable_connect_protocol(Protocol, ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/", []),
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
Frame1 = {text, <<"Hello!">>},
Frame2 = {binary, <<"World!">>},
- gun:ws_send(ConnPid, [Frame1, Frame2]),
+ gun:ws_send(ConnPid, StreamRef, [Frame1, Frame2]),
{ws, Frame1} = gun:await(ConnPid, StreamRef),
{ws, Frame2} = gun:await(ConnPid, StreamRef),
gun:close(ConnPid).
send_many_close(Config) ->
doc("Ensure we can send a list of frames in one gun:ws_send call, including a close frame."),
- {ok, ConnPid} = gun:open("localhost", config(port, Config)),
- {ok, _} = gun:await_up(ConnPid),
+ Protocol = config(name, config(tc_group_properties, Config)),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config), #{
+ protocols => [Protocol],
+ http2_opts => #{notify_settings_changed => true}
+ }),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ do_await_enable_connect_protocol(Protocol, ConnPid),
StreamRef = gun:ws_upgrade(ConnPid, "/", []),
{upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
Frame1 = {text, <<"Hello!">>},
Frame2 = {binary, <<"World!">>},
- gun:ws_send(ConnPid, [Frame1, Frame2, close]),
+ gun:ws_send(ConnPid, StreamRef, [Frame1, Frame2, close]),
{ws, Frame1} = gun:await(ConnPid, StreamRef),
{ws, Frame2} = gun:await(ConnPid, StreamRef),
{ws, close} = gun:await(ConnPid, StreamRef),
gun:close(ConnPid).
+
+%% Internal.
+
+do_await_enable_connect_protocol(http, _) ->
+ ok;
+%% We cannot do a CONNECT :protocol request until the server tells us we can.
+do_await_enable_connect_protocol(http2, ConnPid) ->
+ {notify, settings_changed, #{enable_connect_protocol := true}}
+ = gun:await(ConnPid, undefined), %% @todo Maybe have a gun:await/1?
+ ok.