From d5f1a47e9ab758a51b23440eb72a0251527f3f7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 22 Oct 2020 18:48:06 +0200 Subject: Initial implementation of Websocket over HTTP/2 --- src/gun.erl | 14 +- src/gun_http.erl | 53 ++----- src/gun_http2.erl | 415 +++++++++++++++++++++++++++++++++++---------------- src/gun_tunnel.erl | 10 +- src/gun_ws.erl | 49 ++++++ test/event_SUITE.erl | 183 +++++++++++++++++------ test/ws_SUITE.erl | 139 +++++++++++------ 7 files changed, 590 insertions(+), 273 deletions(-) diff --git a/src/gun.erl b/src/gun.erl index 69dbb6b..e441e52 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -229,6 +229,7 @@ cookie_ignore_informational => boolean(), flow => pos_integer(), keepalive => timeout(), + notify_settings_changed => boolean(), %% Options copied from cow_http2_machine. connection_window_margin_size => 0..16#7fffffff, @@ -708,6 +709,8 @@ connect(ServerPid, Destination, Headers, ReqOpts) -> | {push, stream_ref(), binary(), binary(), resp_headers()} | {upgrade, [binary()], resp_headers()} | {ws, ws_frame()} + | {up, http | http2 | raw | socks} + | {notify, settings_changed, map()} | {error, {stream_error | connection_error | down, any()} | timeout}. -spec await(pid(), stream_ref()) -> await_result(). @@ -747,6 +750,8 @@ await(ServerPid, StreamRef, Timeout, MRef) -> {ws, Frame}; {gun_tunnel_up, ServerPid, StreamRef, Protocol} -> {up, Protocol}; + {gun_notify, ServerPid, Type, Info} -> + {notify, Type, Info}; {gun_error, ServerPid, StreamRef, Reason} -> {error, {stream_error, Reason}}; {gun_error, ServerPid, Reason} -> @@ -1223,7 +1228,8 @@ connected_ws_only(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{ protocol=Protocol=gun_ws, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {Commands, EvHandlerState} = Protocol:ws_send(Frames, - ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + ProtoState, dereference_stream_ref(StreamRef, State), + ReplyTo, EvHandler, EvHandlerState0), commands(Commands, State#state{event_handler_state=EvHandlerState}); connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{ protocol=Protocol=gun_ws, protocol_state=ProtoState, @@ -1312,10 +1318,10 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts}, %% @todo Maybe better standardize the protocol callbacks argument orders. connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{ protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) - when is_list(StreamRef) -> + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {Commands, EvHandlerState} = Protocol:ws_send(Frames, - ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + ProtoState, dereference_stream_ref(StreamRef, State), + ReplyTo, EvHandler, EvHandlerState0), commands(Commands, State#state{event_handler_state=EvHandlerState}); %% Catch-all for the StreamRef-free variant. connected(cast, {ws_send, ReplyTo, _}, _) -> diff --git a/src/gun_http.erl b/src/gun_http.erl index 8b716a5..0eeca05 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -955,56 +955,23 @@ ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) -> {_, Accept} -> case cow_ws:encode_key(Key) of Accept -> - ws_handshake_extensions(Buffer, State, Ws, Headers); + ws_handshake_extensions_and_protocol(Buffer, State, Ws, Headers); _ -> close end end. -ws_handshake_extensions(Buffer, State, Ws=#websocket{extensions=Extensions0, opts=Opts}, Headers) -> - case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of - false -> - ws_handshake_protocols(Buffer, State, Ws, Headers, #{}); - {_, ExtHd} -> - ParsedExtHd = cow_http_hd:parse_sec_websocket_extensions(ExtHd), - case ws_validate_extensions(ParsedExtHd, Extensions0, #{}, Opts) of +ws_handshake_extensions_and_protocol(Buffer, State, + Ws=#websocket{extensions=Extensions0, opts=WsOpts}, Headers) -> + case gun_ws:select_extensions(Headers, Extensions0, WsOpts) of + close -> + close; + Extensions -> + case gun_ws:select_protocol(Headers, WsOpts) of close -> close; - Extensions -> - ws_handshake_protocols(Buffer, State, Ws, Headers, Extensions) - end - end. - -ws_validate_extensions([], _, Acc, _) -> - Acc; -ws_validate_extensions([{Name = <<"permessage-deflate">>, Params}|Tail], GunExts, Acc, Opts) -> - case lists:member(Name, GunExts) of - true -> - case cow_ws:validate_permessage_deflate(Params, Acc, Opts) of - {ok, Acc2} -> ws_validate_extensions(Tail, GunExts, Acc2, Opts); - error -> close - end; - %% Fail the connection if extension was not requested. - false -> - close - end; -%% Fail the connection on unknown extension. -ws_validate_extensions(_, _, _, _) -> - close. - -%% @todo Validate protocols. -ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensions) -> - case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of - false -> - ws_handshake_end(Buffer, State, Ws, Headers, Extensions, - maps:get(default_protocol, Opts, gun_ws_h)); - {_, Proto} -> - ProtoOpt = maps:get(protocols, Opts, []), - case lists:keyfind(Proto, 1, ProtoOpt) of - {_, Handler} -> - ws_handshake_end(Buffer, State, Ws, Headers, Extensions, Handler); - false -> - close + Handler -> + ws_handshake_end(Buffer, State, Ws, Headers, Extensions, Handler) end end. diff --git a/src/gun_http2.erl b/src/gun_http2.erl index cb10029..037c193 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -38,17 +38,19 @@ -export([ws_upgrade/11]). -export([ws_send/6]). +-record(websocket_info, { + extensions :: [binary()], + opts :: gun:ws_opts() +}). + -record(tunnel, { - %% The tunnel can either go requested->established - %% or requested->tls_handshake->established, or get - %% canceled. state = requested :: requested | established, %% Destination information. - destination = undefined :: gun:connect_destination(), + destination = undefined :: undefined | gun:connect_destination(), %% Tunnel information. - info = undefined :: gun:tunnel_info(), + info = undefined :: gun:tunnel_info() | #websocket_info{}, %% Protocol module and state of the outer layer. Only initialized %% after the TLS handshake has completed when TLS is involved. @@ -81,6 +83,7 @@ }). -record(http2_state, { + reply_to :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), opts = #{} :: gun:http2_opts(), @@ -136,6 +139,8 @@ do_check_options([{keepalive, infinity}|Opts]) -> do_check_options(Opts); do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); +do_check_options([{notify_settings_changed, B}|Opts]) when is_boolean(B) -> + do_check_options(Opts); do_check_options([Opt={Name, _}|Opts]) -> %% We blindly accept all cow_http2_machine options. HTTP2MachineOpts = [ @@ -166,7 +171,7 @@ opts_name() -> http2_opts. has_keepalive() -> true. default_keepalive() -> infinity. -init(_ReplyTo, Socket, Transport, Opts0) -> +init(ReplyTo, Socket, Transport, Opts0) -> %% We have different defaults than the protocol in order %% to optimize for performance when receiving responses. Opts = Opts0#{ @@ -178,8 +183,8 @@ init(_ReplyTo, Socket, Transport, Opts0) -> TunnelTransport = maps:get(tunnel_transport, Opts, undefined), {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts#{message_tag => BaseStreamRef}), %% @todo Better validate the preface being received. - State = #http2_state{socket=Socket, transport=Transport, opts=Opts, - base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport, + State = #http2_state{reply_to=ReplyTo, socket=Socket, transport=Transport, + opts=Opts, base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport, content_handlers=Handlers, http2_machine=HTTP2Machine}, Transport:send(Socket, Preface), {connected, State}. @@ -283,7 +288,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan {update_window(State#http2_state{http2_machine=HTTP2Machine}), CookieStore, EvHandlerState}; {ok, HTTP2Machine} -> - {maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), + {maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame), CookieStore, EvHandlerState}; {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data, @@ -313,7 +318,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan CookieStore, EvHandlerState}; {send, SendData, HTTP2Machine} -> {StateRet, EvHandlerStateRet} = send_data( - maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), + maybe_ack_or_notify(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData, EvHandler, EvHandlerState), {StateRet, CookieStore, EvHandlerStateRet}; {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> @@ -325,11 +330,23 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, CookieStore, EvHan CookieStore, EvHandlerState} end. -maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> +maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket, + transport=Transport, opts=Opts, http2_machine=HTTP2Machine}, Frame) -> case Frame of - {settings, _} -> Transport:send(Socket, cow_http2:settings_ack()); - {ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque)); - _ -> ok + {settings, _} -> + %% We notify remote settings changes only if the user requested it. + _ = case Opts of + #{notify_settings_changed := true} -> + ReplyTo ! {gun_notify, self(), settings_changed, + cow_http2_machine:get_remote_settings(HTTP2Machine)}; + _ -> + ok + end, + Transport:send(Socket, cow_http2:settings_ack()); + {ping, Opaque} -> + Transport:send(Socket, cow_http2:ping_ack(Opaque)); + _ -> + ok end, State. @@ -363,7 +380,13 @@ tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel}, State, EvHandler, EvHandlerState); tunnel_commands([{error, _Reason}|_], #stream{id=StreamID}, State, _EvHandler, EvHandlerState) -> - {delete_stream(State, StreamID), EvHandlerState}. + {delete_stream(State, StreamID), EvHandlerState}; +%% @todo Set a timeout for closing the Websocket stream. +tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) -> + tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState); +%% @todo Maybe we should stop increasing the window when not in active mode. (HTTP/2 Websocket only.) +tunnel_commands([{active, _}|Tail], Stream, State, EvHandler, EvHandlerState) -> + tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState). continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) -> case ContinueStreamRef of @@ -409,133 +432,212 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0, end, {maybe_delete_stream(State, StreamID, remote, IsFin), EvHandlerState}. -%% @todo Make separate functions for inform/connect/normal. -headers_frame(State=#http2_state{transport=Transport, opts=Opts, - tunnel_transport=TunnelTransport, content_handlers=Handlers0}, +headers_frame(State0=#http2_state{opts=Opts}, StreamID, IsFin, Headers, #{status := Status}, _BodyLen, CookieStore0, EvHandler, EvHandlerState0) -> - Stream = get_stream_by_id(State, StreamID), + Stream = get_stream_by_id(State0, StreamID), #stream{ - ref=StreamRef, - reply_to=ReplyTo, authority=Authority, path=Path, tunnel=Tunnel } = Stream, - CookieStore = gun_cookies:set_cookie_header(scheme(State), + CookieStore = gun_cookies:set_cookie_header(scheme(State0), Authority, Path, Status, Headers, CookieStore0, Opts), - RealStreamRef = stream_ref(State, StreamRef), - if + {State, EvHandlerState} = if Status >= 100, Status =< 199 -> - ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, - EvHandlerState = EvHandler:response_inform(#{ + headers_frame_inform(State0, Stream, Status, Headers, EvHandler, EvHandlerState0); + Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested, IsFin =:= nofin -> + headers_frame_connect(State0, Stream, Status, Headers, EvHandler, EvHandlerState0); + true -> + headers_frame_response(State0, Stream, IsFin, Status, Headers, EvHandler, EvHandlerState0) + end, + {State, CookieStore, EvHandlerState}. + +headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo}, + Status, Headers, EvHandler, EvHandlerState0) -> + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, + EvHandlerState = EvHandler:response_inform(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + {State, EvHandlerState}. + +headers_frame_connect(State0=#http2_state{http2_machine=HTTP2Machine0}, + Stream=#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, tunnel=#tunnel{ + info=#websocket_info{extensions=Extensions0, opts=WsOpts}}}, + Status, Headers, EvHandler, EvHandlerState0) -> + RealStreamRef = stream_ref(State0, StreamRef), + EvHandlerState1 = EvHandler:response_headers(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + %% Websocket CONNECT response headers terminate the response but not the stream. + EvHandlerState = EvHandler:response_end(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, EvHandlerState1), + case gun_ws:select_extensions(Headers, Extensions0, WsOpts) of + close -> + {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), + State1 = State0#http2_state{http2_machine=HTTP2Machine}, + State = reset_stream(State1, StreamID, {stream_error, cancel, + 'The sec-websocket-extensions header is invalid. (RFC6455 9.1, RFC7692 7)'}), + {State, EvHandlerState}; + Extensions -> + case gun_ws:select_protocol(Headers, WsOpts) of + close -> + {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), + State1 = State0#http2_state{http2_machine=HTTP2Machine}, + State = reset_stream(State1, StreamID, {stream_error, cancel, + 'The sec-websocket-protocol header is invalid. (RFC6455 4.1)'}), + {State, EvHandlerState}; + Handler -> + headers_frame_connect_websocket(State0, Stream, Headers, + EvHandler, EvHandlerState, Extensions, Handler) + end + end; +headers_frame_connect(State=#http2_state{transport=Transport, opts=Opts, tunnel_transport=TunnelTransport}, + Stream=#stream{ref=StreamRef, reply_to=ReplyTo, tunnel=Tunnel=#tunnel{ + destination=Destination=#{host := DestHost, port := DestPort}, info=TunnelInfo0}}, + Status, Headers, EvHandler, EvHandlerState0) -> + RealStreamRef = stream_ref(State, StreamRef), + TunnelInfo = TunnelInfo0#{ + origin_host => DestHost, + origin_port => DestPort + }, + ReplyTo ! {gun_response, self(), RealStreamRef, nofin, Status, Headers}, + EvHandlerState1 = EvHandler:response_headers(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + EvHandlerState2 = EvHandler:origin_changed(#{ + stream_ref => RealStreamRef, + type => connect, + origin_scheme => case Destination of + #{transport := tls} -> <<"https">>; + _ -> <<"http">> + end, + origin_host => DestHost, + origin_port => DestPort + }, EvHandlerState1), + ContinueStreamRef = continue_stream_ref(State, StreamRef), + OriginSocket = #{ + gun_pid => self(), + reply_to => ReplyTo, + stream_ref => RealStreamRef, + handle_continue_stream_ref => ContinueStreamRef + }, + Proto = gun_tunnel, + ProtoOpts = case Destination of + #{transport := tls} -> + Protocols = maps:get(protocols, Destination, [http2, http]), + TLSOpts = gun:ensure_alpn_sni(Protocols, maps:get(tls_opts, Destination, []), DestHost), + HandshakeEvent = #{ stream_ref => RealStreamRef, reply_to => ReplyTo, - status => Status, - headers => Headers - }, EvHandlerState0), - {State, CookieStore, EvHandlerState}; - Status >= 200, Status =< 299, element(#tunnel.state, Tunnel) =:= requested -> - #tunnel{destination=Destination, info=TunnelInfo0} = Tunnel, - #{host := DestHost, port := DestPort} = Destination, - TunnelInfo = TunnelInfo0#{ - origin_host => DestHost, - origin_port => DestPort + tls_opts => TLSOpts, + timeout => maps:get(tls_handshake_timeout, Destination, infinity) }, - ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, - EvHandlerState1 = EvHandler:response_headers(#{ + Opts#{ stream_ref => RealStreamRef, - reply_to => ReplyTo, - status => Status, - headers => Headers - }, EvHandlerState0), - EvHandlerState2 = EvHandler:origin_changed(#{ - stream_ref => RealStreamRef, - type => connect, - origin_scheme => case Destination of - #{transport := tls} -> <<"https">>; - _ -> <<"http">> - end, - origin_host => DestHost, - origin_port => DestPort - }, EvHandlerState1), - ContinueStreamRef = continue_stream_ref(State, StreamRef), - OriginSocket = #{ - gun_pid => self(), - reply_to => ReplyTo, + tunnel => #{ + type => connect, + transport_name => case TunnelTransport of + undefined -> Transport:name(); + _ -> TunnelTransport + end, + protocol_name => http2, + info => TunnelInfo, + handshake_event => HandshakeEvent, + protocols => Protocols + } + }; + _ -> + [NewProtocol] = maps:get(protocols, Destination, [http]), + Opts#{ stream_ref => RealStreamRef, - handle_continue_stream_ref => ContinueStreamRef - }, - Proto = gun_tunnel, - ProtoOpts = case Destination of - #{transport := tls} -> - Protocols = maps:get(protocols, Destination, [http2, http]), - TLSOpts = gun:ensure_alpn_sni(Protocols, maps:get(tls_opts, Destination, []), DestHost), - HandshakeEvent = #{ - stream_ref => RealStreamRef, - reply_to => ReplyTo, - tls_opts => TLSOpts, - timeout => maps:get(tls_handshake_timeout, Destination, infinity) - }, - Opts#{ - stream_ref => RealStreamRef, - tunnel => #{ - type => connect, - transport_name => case TunnelTransport of - undefined -> Transport:name(); - _ -> TunnelTransport - end, - protocol_name => http2, - info => TunnelInfo, - handshake_event => HandshakeEvent, - protocols => Protocols - } - }; - _ -> - [NewProtocol] = maps:get(protocols, Destination, [http]), - Opts#{ - stream_ref => RealStreamRef, - tunnel => #{ - type => connect, - transport_name => case TunnelTransport of - undefined -> Transport:name(); - _ -> TunnelTransport - end, - protocol_name => http2, - info => TunnelInfo, - new_protocol => NewProtocol - } - } - end, - {tunnel, ProtoState, EvHandlerState} = Proto:init( - ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2), - {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{ - info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}), - CookieStore, EvHandlerState}; - true -> - ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, - EvHandlerState1 = EvHandler:response_headers(#{ + tunnel => #{ + type => connect, + transport_name => case TunnelTransport of + undefined -> Transport:name(); + _ -> TunnelTransport + end, + protocol_name => http2, + info => TunnelInfo, + new_protocol => NewProtocol + } + } + end, + {tunnel, ProtoState, EvHandlerState} = Proto:init( + ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState2), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, + info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}}), + EvHandlerState}. + +headers_frame_connect_websocket(State, Stream=#stream{ref=StreamRef, reply_to=ReplyTo, + tunnel=Tunnel=#tunnel{info=#websocket_info{opts=WsOpts}}}, + Headers, EvHandler, EvHandlerState0, Extensions, Handler) -> + RealStreamRef = stream_ref(State, StreamRef), + ContinueStreamRef = continue_stream_ref(State, StreamRef), + OriginSocket = #{ + gun_pid => self(), + reply_to => ReplyTo, + stream_ref => RealStreamRef, + handle_continue_stream_ref => ContinueStreamRef + }, + ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}, + Proto = gun_ws, + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => RealStreamRef, + protocol => Proto:name() + }, EvHandlerState0), + ProtoOpts = #{ + stream_ref => RealStreamRef, + headers => Headers, + extensions => Extensions, + flow => maps:get(flow, WsOpts, infinity), + handler => Handler, + opts => WsOpts + }, + {connected_ws_only, ProtoState} = Proto:init( + ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts), + {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, + protocol=Proto, protocol_state=ProtoState}}), + EvHandlerState}. + +headers_frame_response(State=#http2_state{content_handlers=Handlers0}, + Stream=#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, + IsFin, Status, Headers, EvHandler, EvHandlerState0) -> + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, + EvHandlerState1 = EvHandler:response_headers(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + {Handlers, EvHandlerState} = case IsFin of + fin -> + EvHandlerState2 = EvHandler:response_end(#{ stream_ref => RealStreamRef, - reply_to => ReplyTo, - status => Status, - headers => Headers - }, EvHandlerState0), - {Handlers, EvHandlerState} = case IsFin of - fin -> - EvHandlerState2 = EvHandler:response_end(#{ - stream_ref => RealStreamRef, - reply_to => ReplyTo - }, EvHandlerState1), - {undefined, EvHandlerState2}; - nofin -> - {gun_content_handler:init(ReplyTo, RealStreamRef, - Status, Headers, Handlers0), EvHandlerState1} - end, - %% @todo Disable the tunnel if any. - {maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), - StreamID, remote, IsFin), - CookieStore, EvHandlerState} - end. + reply_to => ReplyTo + }, EvHandlerState1), + {undefined, EvHandlerState2}; + nofin -> + {gun_content_handler:init(ReplyTo, RealStreamRef, + Status, Headers, Handlers0), EvHandlerState1} + end, + %% We disable the tunnel, if any, when receiving any non 2xx response. + {maybe_delete_stream(store_stream(State, + Stream#stream{handler_state=Handlers, tunnel=undefined}), + StreamID, remote, IsFin), EvHandlerState}. trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), @@ -1195,7 +1297,58 @@ stream_info(State, RealStreamRef=[StreamRef|_]) -> down(#http2_state{stream_refs=Refs}) -> maps:keys(Refs). -%% Websocket upgrades are currently only accepted when tunneled. +ws_upgrade(State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, + Host, Port, Path, Headers0, WsOpts, + CookieStore0, EvHandler, EvHandlerState0) + when is_reference(StreamRef) -> + {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( + <<"CONNECT">>, HTTP2Machine0), + {ok, PseudoHeaders, Headers1, CookieStore} = prepare_headers(State, + <<"CONNECT">>, Host, Port, Path, Headers0, CookieStore0), + {Headers2, GunExtensions} = case maps:get(compress, WsOpts, false) of + true -> + {[{<<"sec-websocket-extensions">>, + <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>} + |Headers1], [<<"permessage-deflate">>]}; + false -> + {Headers1, []} + end, + Headers3 = case maps:get(protocols, WsOpts, []) of + [] -> + Headers2; + ProtoOpt -> + << _, _, Proto/bits >> = iolist_to_binary([[<<", ">>, P] || {P, _} <- ProtoOpt]), + [{<<"sec-websocket-protocol">>, Proto}|Headers2] + end, + Headers = [{<<"sec-websocket-version">>, <<"13">>}|Headers3], + Authority = maps:get(authority, PseudoHeaders), + RealStreamRef = stream_ref(State, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => <<"CONNECT">>, + authority => Authority, + path => Path, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), + {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( + StreamID, HTTP2Machine1, nofin, PseudoHeaders#{protocol => <<"websocket">>}, Headers), + Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), + InitialFlow = maps:get(flow, WsOpts, infinity), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, + authority=Authority, path=Path, tunnel=#tunnel{info=#websocket_info{ + extensions=GunExtensions, opts=WsOpts}}}, + {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), + CookieStore, EvHandlerState}; ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo, Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of @@ -1210,7 +1363,11 @@ ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo, %% @todo Error conditions? end. -ws_send(Frames, State0, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) -> +ws_send(Frames, State0, RealStreamRef, ReplyTo, EvHandler, EvHandlerState0) -> + StreamRef = case RealStreamRef of + [SR|_] -> SR; + _ -> RealStreamRef + end, case get_stream_by_ref(State0, StreamRef) of Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState}} -> {Commands, EvHandlerState1} = Proto:ws_send(Frames, ProtoState, diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl index 7c29684..2594d24 100644 --- a/src/gun_tunnel.erl +++ b/src/gun_tunnel.erl @@ -340,12 +340,12 @@ connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port}, EvHandler, EvHandlerState0), {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. -cancel(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, +cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> - StreamRef = maybe_dereference(State0, StreamRef0), - {Commands, EvHandlerState1} = Proto:cancel(ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), - {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), - {{state, State}, EvHandlerState}. + StreamRef = maybe_dereference(State, StreamRef0), + {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, StreamRef, + ReplyTo, EvHandler, EvHandlerState0), + {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) -> case Proto:timeout(ProtoState0, Msg, TRef) of diff --git a/src/gun_ws.erl b/src/gun_ws.erl index f413f94..a1fdfae 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -15,12 +15,15 @@ -module(gun_ws). -export([check_options/1]). +-export([select_extensions/3]). +-export([select_protocol/2]). -export([name/0]). -export([opts_name/0]). -export([has_keepalive/0]). -export([default_keepalive/0]). -export([init/4]). -export([handle/5]). +-export([handle_continue/6]). -export([update_flow/4]). -export([closing/4]). -export([close/4]). @@ -89,6 +92,47 @@ do_check_options([{user_opts, _}|Opts]) -> do_check_options([Opt|_]) -> {error, {options, {ws, Opt}}}. +select_extensions(Headers, Extensions0, Opts) -> + case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of + false -> + #{}; + {_, ExtHd} -> + ParsedExtHd = cow_http_hd:parse_sec_websocket_extensions(ExtHd), + validate_extensions(ParsedExtHd, Extensions0, Opts, #{}) + end. + +validate_extensions([], _, _, Acc) -> + Acc; +validate_extensions([{Name = <<"permessage-deflate">>, Params}|Tail], Extensions, Opts, Acc0) -> + case lists:member(Name, Extensions) of + true -> + case cow_ws:validate_permessage_deflate(Params, Acc0, Opts) of + {ok, Acc} -> validate_extensions(Tail, Extensions, Opts, Acc); + error -> close + end; + %% Fail the connection if extension was not requested. + false -> + close + end; +%% Fail the connection on unknown extension. +validate_extensions(_, _, _, _) -> + close. + +%% @todo Validate protocols. +select_protocol(Headers, Opts) -> + case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of + false -> + maps:get(default_protocol, Opts, gun_ws_h); + {_, Proto} -> + ProtoOpt = maps:get(protocols, Opts, []), + case lists:keyfind(Proto, 1, ProtoOpt) of + {_, Handler} -> + Handler; + false -> + close + end + end. + name() -> ws. opts_name() -> ws_opts. has_keepalive() -> true. @@ -176,6 +220,11 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke closing(Error, State, EvHandler, EvHandlerState) end. +handle_continue(ContinueStreamRef, {data, _ReplyTo, _StreamRef, IsFin, Data}, + #ws_state{}, CookieStore, _EvHandler, EvHandlerState) + when is_reference(ContinueStreamRef) -> + {{send, IsFin, Data}, CookieStore, EvHandlerState}. + maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) -> {[ {state, State}, diff --git a/test/event_SUITE.erl b/test/event_SUITE.erl index d883dc5..8be10b1 100644 --- a/test/event_SUITE.erl +++ b/test/event_SUITE.erl @@ -34,25 +34,25 @@ groups() -> HTTP1Tests = [T || T <- Tests, lists:sublist(atom_to_list(T), 6) =:= "http1_"], %% Push is not possible over HTTP/1.1. PushTests = [T || T <- Tests, lists:sublist(atom_to_list(T), 5) =:= "push_"], - %% We currently do not support Websocket over HTTP/2. - WsTests = [T || T <- Tests, lists:sublist(atom_to_list(T), 3) =:= "ws_"], [ {http, [parallel], Tests -- [cancel_remote, cancel_remote_connect|PushTests]}, - {http2, [parallel], (Tests -- WsTests) -- HTTP1Tests} + {http2, [parallel], Tests -- HTTP1Tests} ]. init_per_suite(Config) -> - ProtoOpts = #{env => #{ - dispatch => cowboy_router:compile([{'_', [ - {"/", hello_h, []}, - {"/empty", empty_h, []}, - {"/inform", inform_h, []}, - {"/push", push_h, []}, - {"/stream", stream_h, []}, - {"/trailers", trailers_h, []}, - {"/ws", ws_echo_h, []} - ]}]) - }}, + Routes = [ + {"/", hello_h, []}, + {"/empty", empty_h, []}, + {"/inform", inform_h, []}, + {"/push", push_h, []}, + {"/stream", stream_h, []}, + {"/trailers", trailers_h, []}, + {"/ws", ws_echo_h, []} + ], + ProtoOpts = #{ + enable_connect_protocol => true, + env => #{dispatch => cowboy_router:compile([{'_', Routes}])} + }, {ok, _} = cowboy:start_clear({?MODULE, tcp}, [], ProtoOpts), TCPOriginPort = ranch:get_port({?MODULE, tcp}), {ok, _} = cowboy:start_tls({?MODULE, tls}, ct_helper:get_certs_from_ets(), ProtoOpts), @@ -1227,8 +1227,10 @@ http1_response_end_body_close(Config) -> ws_upgrade(Config) -> doc("Confirm that the ws_upgrade event callback is called."), + Protocol = config(name, config(tc_group_properties, Config)), {ok, Pid, _} = do_gun_open(Config), - {ok, _} = gun:await_up(Pid), + {ok, Protocol} = gun:await_up(Pid), + ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid), StreamRef = gun:ws_upgrade(Pid, "/ws"), ReplyTo = self(), #{ @@ -1258,11 +1260,15 @@ do_ws_upgrade_connect(Config, ProxyProtocol) -> StreamRef1 = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, - protocols => [OriginProtocol] + protocols => [case OriginProtocol of + http -> http; + http2 -> {http2, #{notify_settings_changed => true}} + end] }, []), %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid), StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), #{ stream_ref := StreamRef2, @@ -1273,8 +1279,10 @@ do_ws_upgrade_connect(Config, ProxyProtocol) -> ws_upgrade_all_events(Config) -> doc("Confirm that a Websocket upgrade triggers all relevant events."), + Protocol = config(name, config(tc_group_properties, Config)), {ok, Pid, OriginPort} = do_gun_open(Config), - {ok, _} = gun:await_up(Pid), + {ok, Protocol} = gun:await_up(Pid), + ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid), StreamRef = gun:ws_upgrade(Pid, "/ws"), ReplyTo = self(), #{ @@ -1283,11 +1291,15 @@ ws_upgrade_all_events(Config) -> opts := #{} } = do_receive_event(ws_upgrade), Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]), + Method = case Protocol of + http -> <<"GET">>; + http2 -> <<"CONNECT">> + end, #{ stream_ref := StreamRef, reply_to := ReplyTo, function := ws_upgrade, - method := <<"GET">>, + method := Method, authority := EventAuthority1, path := "/ws", headers := [_|_] @@ -1297,7 +1309,7 @@ ws_upgrade_all_events(Config) -> stream_ref := StreamRef, reply_to := ReplyTo, function := ws_upgrade, - method := <<"GET">>, + method := Method, authority := EventAuthority2, path := "/ws", headers := [_|_] @@ -1311,12 +1323,26 @@ ws_upgrade_all_events(Config) -> stream_ref := StreamRef, reply_to := ReplyTo } = do_receive_event(response_start), - #{ - stream_ref := StreamRef, - reply_to := ReplyTo, - status := 101, - headers := [_|_] - } = do_receive_event(response_inform), + _ = case Protocol of + http -> + #{ + stream_ref := StreamRef, + reply_to := ReplyTo, + status := 101, + headers := [_|_] + } = do_receive_event(response_inform); + http2 -> + #{ + stream_ref := StreamRef, + reply_to := ReplyTo, + status := 200, + headers := [_|_] + } = do_receive_event(response_headers), + #{ + stream_ref := StreamRef, + reply_to := ReplyTo + } = do_receive_event(response_end) + end, #{ stream_ref := StreamRef, protocol := ws @@ -1343,16 +1369,27 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) -> StreamRef1 = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, - protocols => [OriginProtocol] + protocols => [case OriginProtocol of + http -> http; + http2 -> {http2, #{notify_settings_changed => true}} + end] }, []), %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid), %% Skip all CONNECT-related events that may conflict. _ = do_receive_event(request_start), _ = do_receive_event(request_headers), _ = do_receive_event(request_end), _ = do_receive_event(response_start), + case OriginProtocol of + http -> ok; + http2 -> + _ = do_receive_event(response_headers), +% _ = do_receive_event(response_end), @todo Probably should response_end CONNECT responses for both protocols. + ok + end, _ = do_receive_event(protocol_changed), %% Check the Websocket events. StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), @@ -1362,11 +1399,15 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) -> opts := #{} } = do_receive_event(ws_upgrade), Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]), + Method = case OriginProtocol of + http -> <<"GET">>; + http2 -> <<"CONNECT">> + end, #{ stream_ref := StreamRef2, reply_to := ReplyTo, function := ws_upgrade, - method := <<"GET">>, + method := Method, authority := EventAuthority1, path := "/ws", headers := [_|_] @@ -1376,7 +1417,7 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) -> stream_ref := StreamRef2, reply_to := ReplyTo, function := ws_upgrade, - method := <<"GET">>, + method := Method, authority := EventAuthority2, path := "/ws", headers := [_|_] @@ -1390,12 +1431,26 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) -> stream_ref := StreamRef2, reply_to := ReplyTo } = do_receive_event(response_start), - #{ - stream_ref := StreamRef2, - reply_to := ReplyTo, - status := 101, - headers := [_|_] - } = do_receive_event(response_inform), + _ = case OriginProtocol of + http -> + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + status := 101, + headers := [_|_] + } = do_receive_event(response_inform); + http2 -> + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo, + status := 200, + headers := [_|_] + } = do_receive_event(response_headers), + #{ + stream_ref := StreamRef2, + reply_to := ReplyTo + } = do_receive_event(response_end) + end, #{ stream_ref := StreamRef2, protocol := ws @@ -1406,11 +1461,13 @@ do_ws_upgrade_all_events_connect(Config, ProxyProtocol) -> ws_recv_frame_start(Config) -> doc("Confirm that the ws_recv_frame_start event callback is called."), + Protocol = config(name, config(tc_group_properties, Config)), {ok, Pid, _} = do_gun_open(Config), - {ok, _} = gun:await_up(Pid), + {ok, Protocol} = gun:await_up(Pid), + ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid), StreamRef = gun:ws_upgrade(Pid, "/ws"), {upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef), - gun:ws_send(Pid, {text, <<"Hello!">>}), + gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}), ReplyTo = self(), #{ stream_ref := StreamRef, @@ -1440,11 +1497,15 @@ do_ws_recv_frame_start_connect(Config, ProxyProtocol) -> StreamRef1 = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, - protocols => [OriginProtocol] + protocols => [case OriginProtocol of + http -> http; + http2 -> {http2, #{notify_settings_changed => true}} + end] }, []), %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid), StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2), gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), @@ -1458,11 +1519,13 @@ do_ws_recv_frame_start_connect(Config, ProxyProtocol) -> ws_recv_frame_header(Config) -> doc("Confirm that the ws_recv_frame_header event callback is called."), + Protocol = config(name, config(tc_group_properties, Config)), {ok, Pid, _} = do_gun_open(Config), - {ok, _} = gun:await_up(Pid), + {ok, Protocol} = gun:await_up(Pid), + ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid), StreamRef = gun:ws_upgrade(Pid, "/ws"), {upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef), - gun:ws_send(Pid, {text, <<"Hello!">>}), + gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}), ReplyTo = self(), #{ stream_ref := StreamRef, @@ -1496,11 +1559,15 @@ do_ws_recv_frame_header_connect(Config, ProxyProtocol) -> StreamRef1 = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, - protocols => [OriginProtocol] + protocols => [case OriginProtocol of + http -> http; + http2 -> {http2, #{notify_settings_changed => true}} + end] }, []), %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid), StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2), gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), @@ -1518,11 +1585,13 @@ do_ws_recv_frame_header_connect(Config, ProxyProtocol) -> ws_recv_frame_end(Config) -> doc("Confirm that the ws_recv_frame_end event callback is called."), + Protocol = config(name, config(tc_group_properties, Config)), {ok, Pid, _} = do_gun_open(Config), - {ok, _} = gun:await_up(Pid), + {ok, Protocol} = gun:await_up(Pid), + ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid), StreamRef = gun:ws_upgrade(Pid, "/ws"), {upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef), - gun:ws_send(Pid, {text, <<"Hello!">>}), + gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}), ReplyTo = self(), #{ stream_ref := StreamRef, @@ -1553,11 +1622,15 @@ do_ws_recv_frame_end_connect(Config, ProxyProtocol) -> StreamRef1 = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, - protocols => [OriginProtocol] + protocols => [case OriginProtocol of + http -> http; + http2 -> {http2, #{notify_settings_changed => true}} + end] }, []), %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid), StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2), gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), @@ -1581,11 +1654,13 @@ ws_send_frame_end(Config) -> do_ws_send_frame(Config, ?FUNCTION_NAME). do_ws_send_frame(Config, EventName) -> + Protocol = config(name, config(tc_group_properties, Config)), {ok, Pid, _} = do_gun_open(Config), - {ok, _} = gun:await_up(Pid), + {ok, Protocol} = gun:await_up(Pid), + ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid), StreamRef = gun:ws_upgrade(Pid, "/ws"), {upgrade, [<<"websocket">>], _} = gun:await(Pid, StreamRef), - gun:ws_send(Pid, {text, <<"Hello!">>}), + gun:ws_send(Pid, StreamRef, {text, <<"Hello!">>}), ReplyTo = self(), #{ stream_ref := StreamRef, @@ -1621,11 +1696,15 @@ do_ws_send_frame_connect(Config, ProxyProtocol, EventName) -> StreamRef1 = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, - protocols => [OriginProtocol] + protocols => [case OriginProtocol of + http -> http; + http2 -> {http2, #{notify_settings_changed => true}} + end] }, []), %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid), StreamRef2 = gun:ws_upgrade(ConnPid, "/ws", [], #{tunnel => StreamRef1}), {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef2), gun:ws_send(ConnPid, StreamRef2, {text, <<"Hello!">>}), @@ -1641,8 +1720,10 @@ do_ws_send_frame_connect(Config, ProxyProtocol, EventName) -> ws_protocol_changed(Config) -> doc("Confirm that the protocol_changed event callback is called on Websocket upgrade success."), + Protocol = config(name, config(tc_group_properties, Config)), {ok, Pid, _} = do_gun_open(Config), - {ok, _} = gun:await_up(Pid), + {ok, Protocol} = gun:await_up(Pid), + ws_SUITE:do_await_enable_connect_protocol(Protocol, Pid), _ = gun:ws_upgrade(Pid, "/ws"), #{ protocol := ws @@ -1668,11 +1749,15 @@ do_ws_protocol_changed_connect(Config, ProxyProtocol) -> StreamRef1 = gun:connect(ConnPid, #{ host => "localhost", port => OriginPort, - protocols => [OriginProtocol] + protocols => [case OriginProtocol of + http -> http; + http2 -> {http2, #{notify_settings_changed => true}} + end] }, []), %% @todo _IsFin is 'fin' for HTTP and 'nofin' for HTTP/2... {response, _IsFin, 200, _} = gun:await(ConnPid, StreamRef1), {up, OriginProtocol} = gun:await(ConnPid, StreamRef1), + ws_SUITE:do_await_enable_connect_protocol(OriginProtocol, ConnPid), #{ stream_ref := StreamRef1, protocol := OriginProtocol @@ -1951,6 +2036,7 @@ do_gun_open(Config) -> do_gun_open(OriginPort, Config) -> Opts = #{ event_handler => {?MODULE, self()}, + http2_opts => #{notify_settings_changed => true}, protocols => [config(name, config(tc_group_properties, Config))] }, {ok, Pid} = gun:open("localhost", OriginPort, Opts), @@ -1960,6 +2046,7 @@ do_gun_open_tls(Config) -> OriginPort = config(tls_origin_port, Config), Opts = #{ event_handler => {?MODULE, self()}, + http2_opts => #{notify_settings_changed => true}, protocols => [config(name, config(tc_group_properties, Config))], transport => tls }, diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index c04acc2..201403e 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -22,19 +22,30 @@ %% ct. all() -> - [{group, ws}]. + [{group, http}, {group, http2}]. groups() -> - [{ws, [], ct_helper:all(?MODULE)}]. + Tests = ct_helper:all(?MODULE), + HTTP1Tests = [ + http10_upgrade_error, + http11_request_error, + http11_keepalive, + http11_keepalive_default_silence_pings + ], + [ + {http, [], Tests}, + {http2, [], Tests -- HTTP1Tests} + ]. init_per_suite(Config) -> Routes = [ {"/", ws_echo_h, []}, {"/reject", ws_reject_h, []} ], - {ok, _} = cowboy:start_clear(ws, [], #{env => #{ - dispatch => cowboy_router:compile([{'_', Routes}]) - }}), + {ok, _} = cowboy:start_clear(ws, [], #{ + enable_connect_protocol => true, + env => #{dispatch => cowboy_router:compile([{'_', Routes}])} + }), Port = ranch:get_port(ws), [{port, Port}|Config]. @@ -45,16 +56,37 @@ end_per_suite(_) -> await(Config) -> doc("Ensure gun:await/2 can be used to receive Websocket frames."), - {ok, ConnPid} = gun:open("localhost", config(port, Config)), - {ok, _} = gun:await_up(ConnPid), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ + protocols => [Protocol], + http2_opts => #{notify_settings_changed => true} + }), + {ok, Protocol} = gun:await_up(ConnPid), + do_await_enable_connect_protocol(Protocol, ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/", []), {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), Frame = {text, <<"Hello!">>}, - gun:ws_send(ConnPid, Frame), + gun:ws_send(ConnPid, StreamRef, Frame), {ws, Frame} = gun:await(ConnPid, StreamRef), gun:close(ConnPid). -error_http10_upgrade(Config) -> +headers_normalized_upgrade(Config) -> + doc("Headers passed to ws_upgrade are normalized before being used."), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ + protocols => [Protocol], + http2_opts => #{notify_settings_changed => true} + }), + {ok, Protocol} = gun:await_up(ConnPid), + do_await_enable_connect_protocol(Protocol, ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/", #{ + atom_header_name => <<"value">>, + "string_header_name" => <<"value">> + }), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + gun:close(ConnPid). + +http10_upgrade_error(Config) -> doc("Attempting to upgrade HTTP/1.0 to Websocket produces an error."), {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ http_opts => #{version => 'HTTP/1.0'} @@ -70,28 +102,7 @@ error_http10_upgrade(Config) -> error(timeout) end. -headers_normalized_upgrade(Config) -> - doc("Headers passed to ws_upgrade are normalized before being used."), - {ok, ConnPid} = gun:open("localhost", config(port, Config)), - {ok, _} = gun:await_up(ConnPid), - StreamRef = gun:ws_upgrade(ConnPid, "/", #{ - atom_header_name => <<"value">>, - "string_header_name" => <<"value">> - }), - {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), - gun:close(ConnPid). - -error_http_request(Config) -> - doc("Ensure that requests are rejected while using Websocket."), - {ok, ConnPid} = gun:open("localhost", config(port, Config)), - {ok, _} = gun:await_up(ConnPid), - StreamRef1 = gun:ws_upgrade(ConnPid, "/", []), - {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef1), - StreamRef2 = gun:get(ConnPid, "/"), - {error, {connection_error, {badstate, _}}} = gun:await(ConnPid, StreamRef2), - gun:close(ConnPid). - -keepalive(Config) -> +http11_keepalive(Config) -> doc("Ensure that Gun automatically sends ping frames."), {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ ws_opts => #{ @@ -106,7 +117,7 @@ keepalive(Config) -> {ws, pong} = gun:await(ConnPid, StreamRef), gun:close(ConnPid). -keepalive_default_silence_pings(Config) -> +http11_keepalive_default_silence_pings(Config) -> doc("Ensure that Gun does not forward ping/pong by default."), {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ ws_opts => #{keepalive => 100} @@ -118,10 +129,25 @@ keepalive_default_silence_pings(Config) -> {error, timeout} = gun:await(ConnPid, StreamRef, 1000), gun:close(ConnPid). -reject_upgrade(Config) -> - doc("Ensure Websocket connections can be rejected."), +http11_request_error(Config) -> + doc("Ensure that HTTP/1.1 requests are rejected while using Websocket."), {ok, ConnPid} = gun:open("localhost", config(port, Config)), {ok, _} = gun:await_up(ConnPid), + StreamRef1 = gun:ws_upgrade(ConnPid, "/", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef1), + StreamRef2 = gun:get(ConnPid, "/"), + {error, {connection_error, {badstate, _}}} = gun:await(ConnPid, StreamRef2), + gun:close(ConnPid). + +reject_upgrade(Config) -> + doc("Ensure Websocket connections can be rejected."), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ + protocols => [Protocol], + http2_opts => #{notify_settings_changed => true} + }), + {ok, Protocol} = gun:await_up(ConnPid), + do_await_enable_connect_protocol(Protocol, ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/reject", []), receive {gun_response, ConnPid, StreamRef, nofin, 400, _} -> @@ -134,7 +160,7 @@ reject_upgrade(Config) -> end. reply_to(Config) -> - doc("Ensure we can send a list of frames in one gun:ws_send call."), + doc("Ensure the reply_to request option is respected."), Self = self(), Frame = {text, <<"Hello!">>}, ReplyTo = spawn(fun() -> @@ -144,36 +170,61 @@ reply_to(Config) -> {ws, Frame} = gun:await(ConnPid, StreamRef), Self ! {self(), ok} end), - {ok, ConnPid} = gun:open("localhost", config(port, Config)), - {ok, _} = gun:await_up(ConnPid), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ + protocols => [Protocol], + http2_opts => #{notify_settings_changed => true} + }), + {ok, Protocol} = gun:await_up(ConnPid), + do_await_enable_connect_protocol(Protocol, ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/", [], #{reply_to => ReplyTo}), ReplyTo ! {ConnPid, StreamRef}, - receive {ReplyTo, ready} -> gun:ws_send(ConnPid, Frame) after 1000 -> error(timeout) end, + receive {ReplyTo, ready} -> gun:ws_send(ConnPid, StreamRef, Frame) after 1000 -> error(timeout) end, receive {ReplyTo, ok} -> gun:close(ConnPid) after 1000 -> error(timeout) end. send_many(Config) -> doc("Ensure we can send a list of frames in one gun:ws_send call."), - {ok, ConnPid} = gun:open("localhost", config(port, Config)), - {ok, _} = gun:await_up(ConnPid), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ + protocols => [Protocol], + http2_opts => #{notify_settings_changed => true} + }), + {ok, Protocol} = gun:await_up(ConnPid), + do_await_enable_connect_protocol(Protocol, ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/", []), {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), Frame1 = {text, <<"Hello!">>}, Frame2 = {binary, <<"World!">>}, - gun:ws_send(ConnPid, [Frame1, Frame2]), + gun:ws_send(ConnPid, StreamRef, [Frame1, Frame2]), {ws, Frame1} = gun:await(ConnPid, StreamRef), {ws, Frame2} = gun:await(ConnPid, StreamRef), gun:close(ConnPid). send_many_close(Config) -> doc("Ensure we can send a list of frames in one gun:ws_send call, including a close frame."), - {ok, ConnPid} = gun:open("localhost", config(port, Config)), - {ok, _} = gun:await_up(ConnPid), + Protocol = config(name, config(tc_group_properties, Config)), + {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ + protocols => [Protocol], + http2_opts => #{notify_settings_changed => true} + }), + {ok, Protocol} = gun:await_up(ConnPid), + do_await_enable_connect_protocol(Protocol, ConnPid), StreamRef = gun:ws_upgrade(ConnPid, "/", []), {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), Frame1 = {text, <<"Hello!">>}, Frame2 = {binary, <<"World!">>}, - gun:ws_send(ConnPid, [Frame1, Frame2, close]), + gun:ws_send(ConnPid, StreamRef, [Frame1, Frame2, close]), {ws, Frame1} = gun:await(ConnPid, StreamRef), {ws, Frame2} = gun:await(ConnPid, StreamRef), {ws, close} = gun:await(ConnPid, StreamRef), gun:close(ConnPid). + +%% Internal. + +do_await_enable_connect_protocol(http, _) -> + ok; +%% We cannot do a CONNECT :protocol request until the server tells us we can. +do_await_enable_connect_protocol(http2, ConnPid) -> + {notify, settings_changed, #{enable_connect_protocol := true}} + = gun:await(ConnPid, undefined), %% @todo Maybe have a gun:await/1? + ok. -- cgit v1.2.3