From d9a970be90d0105af215531d74809878f9c21338 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 24 Sep 2019 19:18:35 +0200 Subject: Add auto-ping to Websocket and a silence_pings option The auto-ping will at regular interval send a ping frame. The silence_pings option defaults to true. It can be set to false when the user needs to receive ping/pong frames. --- doc/src/guide/websocket.asciidoc | 8 ------- doc/src/manual/gun.asciidoc | 15 ++++++++++++- doc/src/manual/gun_ws.asciidoc | 5 ++++- src/gun.erl | 15 ++++++++----- src/gun_http.erl | 14 ++++++------ src/gun_http2.erl | 6 ++--- src/gun_ws.erl | 47 +++++++++++++++++++++++++--------------- src/gun_ws_h.erl | 11 +++++++--- test/ws_SUITE.erl | 27 +++++++++++++++++++++++ 9 files changed, 102 insertions(+), 46 deletions(-) diff --git a/doc/src/guide/websocket.asciidoc b/doc/src/guide/websocket.asciidoc index 662b9ea..287b3f7 100644 --- a/doc/src/guide/websocket.asciidoc +++ b/doc/src/guide/websocket.asciidoc @@ -122,11 +122,3 @@ receive handle_frame(ConnPid, StreamRef, Frame) end. ---- - -// @todo auto ping has not been implemented yet -// -//Gun will automatically send ping messages to the server to keep -//the connection alive, however if the connection dies and Gun has -//to reconnect it will not upgrade to Websocket automatically, you -//need to perform the operation when you receive the `gun_error` -//message. diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc index 7b54666..478d40b 100644 --- a/doc/src/manual/gun.asciidoc +++ b/doc/src/manual/gun.asciidoc @@ -459,7 +459,9 @@ ws_opts() :: #{ closing_timeout => timeout(), compress => boolean(), flow => pos_integer(), - protocols => [{binary(), module()}] + keepalive => timeout(), + protocols => [{binary(), module()}], + silence_pings => boolean() } ---- @@ -484,6 +486,10 @@ flow - see below:: The initial flow control value for the Websocket connection. By default flow control is disabled. +keepalive (5000):: + +Time between pings in milliseconds. + protocols ([]):: A non-empty list enables Websocket protocol negotiation. The @@ -491,6 +497,12 @@ list of protocols will be sent in the sec-websocket-protocol request header. The handler module interface is currently undocumented and must be set to `gun_ws_h`. +silence_pings (true):: + +Whether the ping and pong frames should be sent to the user. +In all cases Gun will automatically send a pong frame back +when receiving a ping. + // @todo Document default_protocol and user_opts. == Changelog @@ -517,6 +529,7 @@ undocumented and must be set to `gun_ws_h`. * *2.0*: Function `gun:headers/4,5` introduced. * *2.0*: The `keepalive` option is now set to `infinity` by default for the HTTP/1.1 protocol, disabling it. +* *2.0*: Websocket options `keepalive` and `silence_pings` introduced. * *1.3*: Add the CONNECT destination's `protocols` option and deprecate the previously introduced `protocol` option. * *1.2*: Introduce the type `connect_destination()`. diff --git a/doc/src/manual/gun_ws.asciidoc b/doc/src/manual/gun_ws.asciidoc index 127f2a2..374b0b3 100644 --- a/doc/src/manual/gun_ws.asciidoc +++ b/doc/src/manual/gun_ws.asciidoc @@ -12,9 +12,10 @@ gun_ws - Websocket frame ConnPid :: pid() StreamRef :: reference() -Frame :: close +Frame :: close | ping | pong | {text | binary | close, binary()} | {close, non_neg_integer(), binary()} + | {ping | pong, binary()} ---- Websocket frame. @@ -41,6 +42,8 @@ The Websocket frame in question. == Changelog +* *2.0*: Depending on the option `silence_pings`, ping and + pong frames may be sent as well. * *1.0*: Message introduced. == Examples diff --git a/src/gun.erl b/src/gun.erl index ab26dbf..12f4319 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -222,12 +222,13 @@ }. -export_type([socks_opts/0]). -%% @todo keepalive -type ws_opts() :: #{ closing_timeout => timeout(), compress => boolean(), flow => pos_integer(), - protocols => [{binary(), module()}] + keepalive => timeout(), + protocols => [{binary(), module()}], + silence_pings => boolean() }. -export_type([ws_opts/0]). @@ -602,7 +603,7 @@ connect(ServerPid, Destination, Headers, ReqOpts) -> | {trailers, resp_headers()} | {push, reference(), binary(), binary(), resp_headers()} | {upgrade, [binary()], resp_headers()} - | {ws, ws_frame()} %% @todo Excluding ping/pong, for now. + | {ws, ws_frame()} | {error, {stream_error | connection_error | down, any()} | timeout}. -spec await(pid(), reference()) -> await_result(). @@ -1225,9 +1226,11 @@ handle_common_connected_no_input(info, {Error, Socket, Reason}, _, %% We should have a timeout function in protocols that deal with %% received timeouts. Currently the timeout messages are ignored. handle_common_connected_no_input(info, keepalive, _, - State=#state{protocol=Protocol, protocol_state=ProtoState}) -> - ProtoState2 = Protocol:keepalive(ProtoState), - {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; + State=#state{protocol=Protocol, protocol_state=ProtoState0, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState, EvHandlerState} = Protocol:keepalive(ProtoState0, EvHandler, EvHandlerState0), + {keep_state, keepalive_timeout(State#state{ + protocol_state=ProtoState, event_handler_state=EvHandlerState})}; handle_common_connected_no_input(cast, {update_flow, ReplyTo, StreamRef, Flow}, _, State0=#state{protocol=Protocol, protocol_state=ProtoState}) -> Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), diff --git a/src/gun_http.erl b/src/gun_http.erl index 87b50c8..f27563e 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -25,7 +25,7 @@ -export([update_flow/4]). -export([closing/4]). -export([close/4]). --export([keepalive/1]). +-export([keepalive/3]). -export([headers/11]). -export([request/12]). -export([data/7]). @@ -473,14 +473,14 @@ close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) -> close_streams(Tail, Reason). %% We don't send a keep-alive when a CONNECT request was initiated. -keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}) -> - State; +keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) -> + {State, EvHandlerState}; %% We can only keep-alive by sending an empty line in-between streams. -keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) -> +keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) -> Transport:send(Socket, <<"\r\n">>), - State; -keepalive(State) -> - State. + {State, EvHandlerState}; +keepalive(State, _, EvHandlerState) -> + {State, EvHandlerState}. headers(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 7041ad9..e6f09ea 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -25,7 +25,7 @@ -export([update_flow/4]). -export([closing/4]). -export([close/4]). --export([keepalive/1]). +-export([keepalive/3]). -export([headers/11]). -export([request/12]). -export([data/7]). @@ -512,9 +512,9 @@ close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> ReplyTo ! {gun_error, self(), StreamRef, Reason}, ok. -keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> +keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) -> Transport:send(Socket, cow_http2:ping(0)), - State. + {State, EvHandlerState}. headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0, streams=Streams}, diff --git a/src/gun_ws.erl b/src/gun_ws.erl index c4eefaf..ba61577 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -18,11 +18,13 @@ -export([name/0]). -export([opts_name/0]). -export([has_keepalive/0]). +-export([default_keepalive/0]). -export([init/4]). -export([handle/4]). -export([update_flow/4]). -export([closing/4]). -export([close/4]). +-export([keepalive/3]). -export([send/4]). -export([down/1]). @@ -68,11 +70,17 @@ do_check_options([{default_protocol, M}|Opts]) when is_atom(M) -> do_check_options(Opts); do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 -> do_check_options(Opts); +do_check_options([{keepalive, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> + do_check_options(Opts); do_check_options([Opt={protocols, L}|Opts]) when is_list(L) -> case lists:usort(lists:flatten([[is_binary(B), is_atom(M)] || {B, M} <- L])) of [true] -> do_check_options(Opts); _ -> {error, {options, {ws, Opt}}} end; +do_check_options([{silence_pings, B}|Opts]) when B =:= true; B =:= false -> + do_check_options(Opts); do_check_options([{user_opts, _}|Opts]) -> do_check_options(Opts); do_check_options([Opt|_]) -> @@ -80,7 +88,8 @@ do_check_options([Opt|_]) -> name() -> ws. opts_name() -> ws_opts. -has_keepalive() -> false. +has_keepalive() -> true. +default_keepalive() -> 5000. init(Owner, Socket, Transport, #{stream_ref := StreamRef, headers := Headers, extensions := Extensions, flow := InitialFlow, handler := Handler, opts := Opts}) -> @@ -178,16 +187,6 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, payload => Payload }, EvHandlerState0), case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of - ping -> - {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), - handle(Rest, State0, EvHandler, EvHandlerState); - {ping, Payload} -> - {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), - handle(Rest, State0, EvHandler, EvHandlerState); - pong -> - handle(Rest, State0, EvHandler, EvHandlerState1); - {pong, _} -> - handle(Rest, State0, EvHandler, EvHandlerState1); Frame -> {ok, Dec, HandlerState} = Handler:handle(Frame, HandlerState0), Flow = case Flow0 of @@ -195,13 +194,23 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, _ -> Flow0 - Dec end, State1 = State0#ws_state{flow=Flow, handler_state=HandlerState}, - State = case Frame of - close -> State1#ws_state{in=close}; - {close, _, _} -> State1#ws_state{in=close}; - {fragment, fin, _, _} -> State1#ws_state{frag_state=undefined}; - _ -> State1 + {State, EvHandlerState} = case Frame of + ping -> + {[], EvHandlerState2} = send(pong, State1, EvHandler, EvHandlerState1), + {State1, EvHandlerState2}; + {ping, Payload} -> + {[], EvHandlerState2} = send({pong, Payload}, State1, EvHandler, EvHandlerState1), + {State1, EvHandlerState2}; + close -> + {State1#ws_state{in=close}, EvHandlerState1}; + {close, _, _} -> + {State1#ws_state{in=close}, EvHandlerState1}; + {fragment, fin, _, _} -> + {State1#ws_state{frag_state=undefined}, EvHandlerState1}; + _ -> + {State1, EvHandlerState1} end, - handle(Rest, State, EvHandler, EvHandlerState1) + handle(Rest, State, EvHandler, EvHandlerState) end. update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> @@ -234,6 +243,10 @@ closing(#ws_state{opts=Opts}) -> close(_, _, _, EvHandlerState) -> EvHandlerState. +keepalive(State, EvHandler, EvHandlerState0) -> + {[], EvHandlerState} = send(ping, State, EvHandler, EvHandlerState0), + {State, EvHandlerState}. + %% Send one frame. send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, socket=Socket, transport=Transport, in=In, extensions=Extensions}, diff --git a/src/gun_ws_h.erl b/src/gun_ws_h.erl index 4859532..88f923f 100644 --- a/src/gun_ws_h.erl +++ b/src/gun_ws_h.erl @@ -20,11 +20,13 @@ -record(state, { reply_to :: pid(), stream_ref :: reference(), - frag_buffer = <<>> :: binary() + frag_buffer = <<>> :: binary(), + silence_pings :: boolean() }). -init(ReplyTo, StreamRef, _, _) -> - {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef}}. +init(ReplyTo, StreamRef, _, Opts) -> + {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef, + silence_pings=maps:get(silence_pings, Opts, true)}}. handle({fragment, nofin, _, Payload}, State=#state{frag_buffer=SoFar}) -> @@ -33,6 +35,9 @@ handle({fragment, fin, Type, Payload}, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, frag_buffer=SoFar}) -> ReplyTo ! {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}}, {ok, 1, State#state{frag_buffer= <<>>}}; +handle(Frame, State=#state{silence_pings=true}) when Frame =:= ping; Frame =:= pong; + element(1, Frame) =:= ping; element(1, Frame) =:= pong -> + {ok, 0, State}; handle(Frame, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) -> ReplyTo ! {gun_ws, self(), StreamRef, Frame}, {ok, 1, State}. diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index 4d2387b..55cdfba 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -80,6 +80,33 @@ error_http_request(Config) -> {error, {connection_error, {badstate, _}}} = gun:await(ConnPid, StreamRef2), gun:close(ConnPid). +keepalive(Config) -> + doc("Ensure that Gun automatically sends ping frames."), + {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ + ws_opts => #{ + keepalive => 100, + silence_pings => false + } + }), + {ok, _} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + %% Gun sent a ping automatically, we therefore receive a pong. + {ws, pong} = gun:await(ConnPid, StreamRef), + gun:close(ConnPid). + +keepalive_default_silence_pings(Config) -> + doc("Ensure that Gun does not forward ping/pong by default."), + {ok, ConnPid} = gun:open("localhost", config(port, Config), #{ + ws_opts => #{keepalive => 100} + }), + {ok, _} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + %% Gun sent a ping automatically, but we silence ping/pong by default. + {error, timeout} = gun:await(ConnPid, StreamRef, 1000), + gun:close(ConnPid). + reject_upgrade(Config) -> doc("Ensure Websocket connections can be rejected."), {ok, ConnPid} = gun:open("localhost", config(port, Config)), -- cgit v1.2.3