aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-09-24 19:18:35 +0200
committerLoïc Hoguin <[email protected]>2019-09-24 19:28:48 +0200
commitd9a970be90d0105af215531d74809878f9c21338 (patch)
treeeb891e8c9373dee9f7353a9920aaf25f6b2f2570
parenta18ca0ae8ff76594c7b629f4340adab0a30954c4 (diff)
downloadgun-d9a970be90d0105af215531d74809878f9c21338.tar.gz
gun-d9a970be90d0105af215531d74809878f9c21338.tar.bz2
gun-d9a970be90d0105af215531d74809878f9c21338.zip
Add auto-ping to Websocket and a silence_pings option
The auto-ping will at regular interval send a ping frame. The silence_pings option defaults to true. It can be set to false when the user needs to receive ping/pong frames.
-rw-r--r--doc/src/guide/websocket.asciidoc8
-rw-r--r--doc/src/manual/gun.asciidoc15
-rw-r--r--doc/src/manual/gun_ws.asciidoc5
-rw-r--r--src/gun.erl15
-rw-r--r--src/gun_http.erl14
-rw-r--r--src/gun_http2.erl6
-rw-r--r--src/gun_ws.erl47
-rw-r--r--src/gun_ws_h.erl11
-rw-r--r--test/ws_SUITE.erl27
9 files changed, 102 insertions, 46 deletions
diff --git a/doc/src/guide/websocket.asciidoc b/doc/src/guide/websocket.asciidoc
index 662b9ea..287b3f7 100644
--- a/doc/src/guide/websocket.asciidoc
+++ b/doc/src/guide/websocket.asciidoc
@@ -122,11 +122,3 @@ receive
handle_frame(ConnPid, StreamRef, Frame)
end.
----
-
-// @todo auto ping has not been implemented yet
-//
-//Gun will automatically send ping messages to the server to keep
-//the connection alive, however if the connection dies and Gun has
-//to reconnect it will not upgrade to Websocket automatically, you
-//need to perform the operation when you receive the `gun_error`
-//message.
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc
index 7b54666..478d40b 100644
--- a/doc/src/manual/gun.asciidoc
+++ b/doc/src/manual/gun.asciidoc
@@ -459,7 +459,9 @@ ws_opts() :: #{
closing_timeout => timeout(),
compress => boolean(),
flow => pos_integer(),
- protocols => [{binary(), module()}]
+ keepalive => timeout(),
+ protocols => [{binary(), module()}],
+ silence_pings => boolean()
}
----
@@ -484,6 +486,10 @@ flow - see below::
The initial flow control value for the Websocket connection.
By default flow control is disabled.
+keepalive (5000)::
+
+Time between pings in milliseconds.
+
protocols ([])::
A non-empty list enables Websocket protocol negotiation. The
@@ -491,6 +497,12 @@ list of protocols will be sent in the sec-websocket-protocol
request header. The handler module interface is currently
undocumented and must be set to `gun_ws_h`.
+silence_pings (true)::
+
+Whether the ping and pong frames should be sent to the user.
+In all cases Gun will automatically send a pong frame back
+when receiving a ping.
+
// @todo Document default_protocol and user_opts.
== Changelog
@@ -517,6 +529,7 @@ undocumented and must be set to `gun_ws_h`.
* *2.0*: Function `gun:headers/4,5` introduced.
* *2.0*: The `keepalive` option is now set to `infinity` by
default for the HTTP/1.1 protocol, disabling it.
+* *2.0*: Websocket options `keepalive` and `silence_pings` introduced.
* *1.3*: Add the CONNECT destination's `protocols` option and
deprecate the previously introduced `protocol` option.
* *1.2*: Introduce the type `connect_destination()`.
diff --git a/doc/src/manual/gun_ws.asciidoc b/doc/src/manual/gun_ws.asciidoc
index 127f2a2..374b0b3 100644
--- a/doc/src/manual/gun_ws.asciidoc
+++ b/doc/src/manual/gun_ws.asciidoc
@@ -12,9 +12,10 @@ gun_ws - Websocket frame
ConnPid :: pid()
StreamRef :: reference()
-Frame :: close
+Frame :: close | ping | pong
| {text | binary | close, binary()}
| {close, non_neg_integer(), binary()}
+ | {ping | pong, binary()}
----
Websocket frame.
@@ -41,6 +42,8 @@ The Websocket frame in question.
== Changelog
+* *2.0*: Depending on the option `silence_pings`, ping and
+ pong frames may be sent as well.
* *1.0*: Message introduced.
== Examples
diff --git a/src/gun.erl b/src/gun.erl
index ab26dbf..12f4319 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -222,12 +222,13 @@
}.
-export_type([socks_opts/0]).
-%% @todo keepalive
-type ws_opts() :: #{
closing_timeout => timeout(),
compress => boolean(),
flow => pos_integer(),
- protocols => [{binary(), module()}]
+ keepalive => timeout(),
+ protocols => [{binary(), module()}],
+ silence_pings => boolean()
}.
-export_type([ws_opts/0]).
@@ -602,7 +603,7 @@ connect(ServerPid, Destination, Headers, ReqOpts) ->
| {trailers, resp_headers()}
| {push, reference(), binary(), binary(), resp_headers()}
| {upgrade, [binary()], resp_headers()}
- | {ws, ws_frame()} %% @todo Excluding ping/pong, for now.
+ | {ws, ws_frame()}
| {error, {stream_error | connection_error | down, any()} | timeout}.
-spec await(pid(), reference()) -> await_result().
@@ -1225,9 +1226,11 @@ handle_common_connected_no_input(info, {Error, Socket, Reason}, _,
%% We should have a timeout function in protocols that deal with
%% received timeouts. Currently the timeout messages are ignored.
handle_common_connected_no_input(info, keepalive, _,
- State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
- ProtoState2 = Protocol:keepalive(ProtoState),
- {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})};
+ State=#state{protocol=Protocol, protocol_state=ProtoState0,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState, EvHandlerState} = Protocol:keepalive(ProtoState0, EvHandler, EvHandlerState0),
+ {keep_state, keepalive_timeout(State#state{
+ protocol_state=ProtoState, event_handler_state=EvHandlerState})};
handle_common_connected_no_input(cast, {update_flow, ReplyTo, StreamRef, Flow}, _,
State0=#state{protocol=Protocol, protocol_state=ProtoState}) ->
Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 87b50c8..f27563e 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -25,7 +25,7 @@
-export([update_flow/4]).
-export([closing/4]).
-export([close/4]).
--export([keepalive/1]).
+-export([keepalive/3]).
-export([headers/11]).
-export([request/12]).
-export([data/7]).
@@ -473,14 +473,14 @@ close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
close_streams(Tail, Reason).
%% We don't send a keep-alive when a CONNECT request was initiated.
-keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}) ->
- State;
+keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) ->
+ {State, EvHandlerState};
%% We can only keep-alive by sending an empty line in-between streams.
-keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) ->
+keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) ->
Transport:send(Socket, <<"\r\n">>),
- State;
-keepalive(State) ->
- State.
+ {State, EvHandlerState};
+keepalive(State, _, EvHandlerState) ->
+ {State, EvHandlerState}.
headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 7041ad9..e6f09ea 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -25,7 +25,7 @@
-export([update_flow/4]).
-export([closing/4]).
-export([close/4]).
--export([keepalive/1]).
+-export([keepalive/3]).
-export([headers/11]).
-export([request/12]).
-export([data/7]).
@@ -512,9 +512,9 @@ close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
ReplyTo ! {gun_error, self(), StreamRef, Reason},
ok.
-keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
+keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) ->
Transport:send(Socket, cow_http2:ping(0)),
- State.
+ {State, EvHandlerState}.
headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0, streams=Streams},
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index c4eefaf..ba61577 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -18,11 +18,13 @@
-export([name/0]).
-export([opts_name/0]).
-export([has_keepalive/0]).
+-export([default_keepalive/0]).
-export([init/4]).
-export([handle/4]).
-export([update_flow/4]).
-export([closing/4]).
-export([close/4]).
+-export([keepalive/3]).
-export([send/4]).
-export([down/1]).
@@ -68,11 +70,17 @@ do_check_options([{default_protocol, M}|Opts]) when is_atom(M) ->
do_check_options(Opts);
do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 ->
do_check_options(Opts);
+do_check_options([{keepalive, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
+ do_check_options(Opts);
do_check_options([Opt={protocols, L}|Opts]) when is_list(L) ->
case lists:usort(lists:flatten([[is_binary(B), is_atom(M)] || {B, M} <- L])) of
[true] -> do_check_options(Opts);
_ -> {error, {options, {ws, Opt}}}
end;
+do_check_options([{silence_pings, B}|Opts]) when B =:= true; B =:= false ->
+ do_check_options(Opts);
do_check_options([{user_opts, _}|Opts]) ->
do_check_options(Opts);
do_check_options([Opt|_]) ->
@@ -80,7 +88,8 @@ do_check_options([Opt|_]) ->
name() -> ws.
opts_name() -> ws_opts.
-has_keepalive() -> false.
+has_keepalive() -> true.
+default_keepalive() -> 5000.
init(Owner, Socket, Transport, #{stream_ref := StreamRef, headers := Headers,
extensions := Extensions, flow := InitialFlow, handler := Handler, opts := Opts}) ->
@@ -178,16 +187,6 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
payload => Payload
}, EvHandlerState0),
case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of
- ping ->
- {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1),
- handle(Rest, State0, EvHandler, EvHandlerState);
- {ping, Payload} ->
- {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1),
- handle(Rest, State0, EvHandler, EvHandlerState);
- pong ->
- handle(Rest, State0, EvHandler, EvHandlerState1);
- {pong, _} ->
- handle(Rest, State0, EvHandler, EvHandlerState1);
Frame ->
{ok, Dec, HandlerState} = Handler:handle(Frame, HandlerState0),
Flow = case Flow0 of
@@ -195,13 +194,23 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
_ -> Flow0 - Dec
end,
State1 = State0#ws_state{flow=Flow, handler_state=HandlerState},
- State = case Frame of
- close -> State1#ws_state{in=close};
- {close, _, _} -> State1#ws_state{in=close};
- {fragment, fin, _, _} -> State1#ws_state{frag_state=undefined};
- _ -> State1
+ {State, EvHandlerState} = case Frame of
+ ping ->
+ {[], EvHandlerState2} = send(pong, State1, EvHandler, EvHandlerState1),
+ {State1, EvHandlerState2};
+ {ping, Payload} ->
+ {[], EvHandlerState2} = send({pong, Payload}, State1, EvHandler, EvHandlerState1),
+ {State1, EvHandlerState2};
+ close ->
+ {State1#ws_state{in=close}, EvHandlerState1};
+ {close, _, _} ->
+ {State1#ws_state{in=close}, EvHandlerState1};
+ {fragment, fin, _, _} ->
+ {State1#ws_state{frag_state=undefined}, EvHandlerState1};
+ _ ->
+ {State1, EvHandlerState1}
end,
- handle(Rest, State, EvHandler, EvHandlerState1)
+ handle(Rest, State, EvHandler, EvHandlerState)
end.
update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) ->
@@ -234,6 +243,10 @@ closing(#ws_state{opts=Opts}) ->
close(_, _, _, EvHandlerState) ->
EvHandlerState.
+keepalive(State, EvHandler, EvHandlerState0) ->
+ {[], EvHandlerState} = send(ping, State, EvHandler, EvHandlerState0),
+ {State, EvHandlerState}.
+
%% Send one frame.
send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
socket=Socket, transport=Transport, in=In, extensions=Extensions},
diff --git a/src/gun_ws_h.erl b/src/gun_ws_h.erl
index 4859532..88f923f 100644
--- a/src/gun_ws_h.erl
+++ b/src/gun_ws_h.erl
@@ -20,11 +20,13 @@
-record(state, {
reply_to :: pid(),
stream_ref :: reference(),
- frag_buffer = <<>> :: binary()
+ frag_buffer = <<>> :: binary(),
+ silence_pings :: boolean()
}).
-init(ReplyTo, StreamRef, _, _) ->
- {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef}}.
+init(ReplyTo, StreamRef, _, Opts) ->
+ {ok, #state{reply_to=ReplyTo, stream_ref=StreamRef,
+ silence_pings=maps:get(silence_pings, Opts, true)}}.
handle({fragment, nofin, _, Payload},
State=#state{frag_buffer=SoFar}) ->
@@ -33,6 +35,9 @@ handle({fragment, fin, Type, Payload},
State=#state{reply_to=ReplyTo, stream_ref=StreamRef, frag_buffer=SoFar}) ->
ReplyTo ! {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}},
{ok, 1, State#state{frag_buffer= <<>>}};
+handle(Frame, State=#state{silence_pings=true}) when Frame =:= ping; Frame =:= pong;
+ element(1, Frame) =:= ping; element(1, Frame) =:= pong ->
+ {ok, 0, State};
handle(Frame, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) ->
ReplyTo ! {gun_ws, self(), StreamRef, Frame},
{ok, 1, State}.
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index 4d2387b..55cdfba 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -80,6 +80,33 @@ error_http_request(Config) ->
{error, {connection_error, {badstate, _}}} = gun:await(ConnPid, StreamRef2),
gun:close(ConnPid).
+keepalive(Config) ->
+ doc("Ensure that Gun automatically sends ping frames."),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config), #{
+ ws_opts => #{
+ keepalive => 100,
+ silence_pings => false
+ }
+ }),
+ {ok, _} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% Gun sent a ping automatically, we therefore receive a pong.
+ {ws, pong} = gun:await(ConnPid, StreamRef),
+ gun:close(ConnPid).
+
+keepalive_default_silence_pings(Config) ->
+ doc("Ensure that Gun does not forward ping/pong by default."),
+ {ok, ConnPid} = gun:open("localhost", config(port, Config), #{
+ ws_opts => #{keepalive => 100}
+ }),
+ {ok, _} = gun:await_up(ConnPid),
+ StreamRef = gun:ws_upgrade(ConnPid, "/", []),
+ {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef),
+ %% Gun sent a ping automatically, but we silence ping/pong by default.
+ {error, timeout} = gun:await(ConnPid, StreamRef, 1000),
+ gun:close(ConnPid).
+
reject_upgrade(Config) ->
doc("Ensure Websocket connections can be rejected."),
{ok, ConnPid} = gun:open("localhost", config(port, Config)),