aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/src/guide/connect.asciidoc14
-rw-r--r--doc/src/manual/gun.asciidoc51
-rw-r--r--src/gun.erl50
-rw-r--r--src/gun_http.erl12
-rw-r--r--src/gun_spdy.erl9
-rw-r--r--src/gun_ws.erl8
-rw-r--r--test/ws_SUITE.erl22
7 files changed, 133 insertions, 33 deletions
diff --git a/doc/src/guide/connect.asciidoc b/doc/src/guide/connect.asciidoc
index e2bcaa7..8de8184 100644
--- a/doc/src/guide/connect.asciidoc
+++ b/doc/src/guide/connect.asciidoc
@@ -52,6 +52,20 @@ form of a map.
[source,erlang]
{ok, ConnPid} = gun:open("example.org", 8443, #{transport=>ssl}).
+=== Connection up and down messages
+
+When Gun successfully connects to the server, it sends a
+`gun_up` message with the protocol that has been selected
+for the connection.
+
+When the connection is lost, Gun will send a `gun_down`
+message indicating the current protocol, the reason the
+connection was lost and two list of stream references.
+
+The first list indicates open streams that _may_ have been
+processed by the server. The second list indicates open
+streams that the server did not process.
+
=== Monitoring the connection process
@todo Gun should detect the owner process being killed
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc
index 9d592a8..b487eee 100644
--- a/doc/src/manual/gun.asciidoc
+++ b/doc/src/manual/gun.asciidoc
@@ -76,6 +76,57 @@ keepalive => pos_integer()::
Calling functions from this module may result in the following
messages being sent.
+=== {gun_up, ConnPid, Protocol}
+
+ConnPid = pid():: The pid of the Gun connection process.
+Protocol = http | spdy:: The protocol selected for this connection.
+
+The connection is up.
+
+This message informs the owner process that the connection or
+reconnection completed.
+
+The protocol selected during the connection is sent in this
+message. It can be used to determine the capabilities of the
+server.
+
+Gun will now start processing the messages it received while
+waiting for the connection to be up. If this is a reconnection,
+then this may not be desirable for all requests. Those requests
+should be cancelled when the connection goes down, and any
+subsequent messages ignored.
+
+=== {gun_down, ConnPid, Protocol, Reason, KilledStreams, UnprocessedStreams}
+
+ConnPid = pid():: The pid of the Gun connection process.
+Protocol = http | spdy | ws:: The protocol in use when the connection was lost.
+Reason = normal | closed | {error, atom()}:: The reason for the loss of the connection.
+KilledStreams = [reference()]:: List of streams that have been brutally terminated.
+UnprocessedStreams = [reference()]:: List of streams that have not been processed by the server.
+
+The connection is down.
+
+This message informs the owner process that the connection is
+currently down. Gun will automatically attempt to reconnect
+depending on the `retry` and `retry_timeout` options.
+
+The reason of the termination is there for debugging purposes
+only. You should not rely on this value to know what streams
+were processed or completed.
+
+The _killed streams_ are the active streams that did not complete
+before the closing of the connection. Whether they can be retried
+safely depends on the protocol used and the idempotence property
+of the requests.
+
+The _unprocessed streams_ are streams that the server did not
+start processing yet. They may be retried safely depending on
+what streams were killed before.
+
+When the connection goes back up, Gun will not attempt to retry
+requests. It will also not upgrade to Websocket automatically
+if that was the protocol in use when the connection was lost.
+
=== {gun_push, ConnPid, StreamRef, NewStreamRef, URI, Headers}
ConnPid = pid():: The pid of the Gun connection process.
diff --git a/src/gun.erl b/src/gun.erl
index 4bb2806..b3623eb 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -341,6 +341,10 @@ flush(StreamRef) ->
flush_pid(ServerPid) ->
receive
+ {gun_up, ServerPid, _} ->
+ flush_pid(ServerPid);
+ {gun_down, ServerPid, _, _, _, _} ->
+ flush_pid(ServerPid);
{gun_response, ServerPid, _, _, _, _} ->
flush_pid(ServerPid);
{gun_data, ServerPid, _, _, _} ->
@@ -351,6 +355,10 @@ flush_pid(ServerPid) ->
flush_pid(ServerPid);
{gun_error, ServerPid, _} ->
flush_pid(ServerPid);
+ {gun_ws_upgrade, ServerPid, _} ->
+ flush_pid(ServerPid);
+ {gun_ws, ServerPid, _} ->
+ flush_pid(ServerPid);
{'DOWN', _, process, ServerPid, _} ->
flush_pid(ServerPid)
after 0 ->
@@ -424,7 +432,7 @@ init(Parent, Owner, Host, Port, Opts) ->
default_transport(443) -> ssl;
default_transport(_) -> tcp.
-connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Transport=ranch_ssl}, Retries) ->
+connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=ranch_ssl}, Retries) ->
Protocols = lists:flatten([case P of
http -> <<"http/1.1">>;
spdy -> [<<"spdy/3.1">>, <<"spdy/3">>]
@@ -438,14 +446,11 @@ connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Tra
{ok, <<"spdy/3", _/bits>>} -> {gun_spdy, spdy_opts};
_ -> {gun_http, http_opts}
end,
- ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
- ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
- before_loop(State#state{socket=Socket, transport=Transport,
- protocol=Protocol, protocol_state=ProtoState});
+ up(State, Socket, Protocol, ProtoOptsKey);
{error, _} ->
retry(State, Retries - 1)
end;
-connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
+connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
TransportOpts = [binary, {active, false}
|maps:get(transport_opts, Opts, [])],
case Transport:connect(Host, Port, TransportOpts) of
@@ -454,15 +459,20 @@ connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Tra
[http] -> {gun_http, http_opts};
[spdy] -> {gun_spdy, spdy_opts}
end,
- ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
- ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
- before_loop(State#state{socket=Socket, transport=Transport,
- protocol=Protocol, protocol_state=ProtoState});
+ up(State, Socket, Protocol, ProtoOptsKey);
{error, _} ->
retry(State, Retries - 1)
end.
-retry(State=#state{opts=Opts}) ->
+up(State=#state{owner=Owner, opts=Opts, transport=Transport}, Socket, Protocol, ProtoOptsKey) ->
+ ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
+ ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
+ Owner ! {gun_up, self(), Protocol:name()},
+ before_loop(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}).
+
+down(State=#state{owner=Owner, opts=Opts, protocol=Protocol, protocol_state=ProtoState}, Reason) ->
+ {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
+ Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
retry(State#state{socket=undefined, protocol=undefined, protocol_state=undefined},
maps:get(retry, Opts, 5)).
@@ -513,7 +523,7 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port,
case Protocol:handle(Data, ProtoState) of
close ->
Transport:close(Socket),
- retry(State);
+ down(State, normal);
{upgrade, Protocol2, ProtoState2} ->
ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2});
ProtoState2 ->
@@ -522,11 +532,11 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port,
{Closed, Socket} ->
Protocol:close(ProtoState),
Transport:close(Socket),
- retry(State);
- {Error, Socket, _} ->
+ down(State, closed);
+ {Error, Socket, Reason} ->
Protocol:close(ProtoState),
Transport:close(Socket),
- retry(State);
+ down(State, {error, Reason});
{OK, _PreviousSocket, _Data} ->
loop(State);
{Closed, _PreviousSocket} ->
@@ -591,16 +601,16 @@ ws_loop(State=#state{parent=Parent, owner=Owner, socket=Socket,
case Protocol:handle(Data, ProtoState) of
close ->
Transport:close(Socket),
- retry(State);
+ down(State, normal);
ProtoState2 ->
ws_loop(State#state{protocol_state=ProtoState2})
end;
{Closed, Socket} ->
Transport:close(Socket),
- retry(State);
- {Error, Socket, _} ->
+ down(State, closed);
+ {Error, Socket, Reason} ->
Transport:close(Socket),
- retry(State);
+ down(State, {error, Reason});
%% Ignore any previous HTTP keep-alive.
keepalive ->
ws_loop(State);
@@ -610,7 +620,7 @@ ws_loop(State=#state{parent=Parent, owner=Owner, socket=Socket,
case Protocol:send(Frame, ProtoState) of
close ->
Transport:close(Socket),
- retry(State);
+ down(State, normal);
ProtoState2 ->
ws_loop(State#state{protocol_state=ProtoState2})
end;
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 8b176fe..d6a69d1 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -15,6 +15,7 @@
-module(gun_http).
-export([check_options/1]).
+-export([name/0]).
-export([init/4]).
-export([handle/2]).
-export([close/1]).
@@ -23,6 +24,7 @@
-export([request/8]).
-export([data/4]).
-export([cancel/2]).
+-export([down/1]).
-export([ws_upgrade/7]).
-type io() :: head | {body, non_neg_integer()} | body_close | body_chunked.
@@ -52,6 +54,8 @@ do_check_options([{version, V}|Opts]) when V =:= 'HTTP/1.1'; V =:= 'HTTP/1.0' ->
do_check_options([Opt|_]) ->
{error, {options, {http, Opt}}}.
+name() -> http.
+
init(Owner, Socket, Transport, Opts) ->
Version = maps:get(version, Opts, 'HTTP/1.1'),
#http_state{owner=Owner, socket=Socket, transport=Transport, version=Version}.
@@ -286,6 +290,14 @@ cancel(State, StreamRef) ->
error_stream_not_found(State)
end.
+%% HTTP does not provide any way to figure out what streams are unprocessed.
+down(#http_state{streams=Streams}) ->
+ KilledStreams = [case Ref of
+ {websocket, Ref2, _, _, _} -> Ref2;
+ _ -> Ref
+ end || {Ref, _} <- Streams],
+ {KilledStreams, []}.
+
error_stream_closed(State=#http_state{owner=Owner}) ->
Owner ! {gun_error, self(), {badstate,
"The stream has already been closed."}},
diff --git a/src/gun_spdy.erl b/src/gun_spdy.erl
index 6aeb517..c417e78 100644
--- a/src/gun_spdy.erl
+++ b/src/gun_spdy.erl
@@ -15,6 +15,7 @@
-module(gun_spdy).
-export([check_options/1]).
+-export([name/0]).
-export([init/4]).
-export([handle/2]).
-export([close/1]).
@@ -23,6 +24,7 @@
-export([request/8]).
-export([data/4]).
-export([cancel/2]).
+-export([down/1]).
-record(stream, {
id :: non_neg_integer(),
@@ -52,6 +54,8 @@ do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
do_check_options([Opt|_]) ->
{error, {options, {spdy, Opt}}}.
+name() -> spdy.
+
init(Owner, Socket, Transport, _Opts) ->
#spdy_state{owner=Owner, socket=Socket, transport=Transport,
zdef=cow_spdy:deflate_init(), zinf=cow_spdy:inflate_init()}.
@@ -263,6 +267,11 @@ cancel(State=#spdy_state{socket=Socket, transport=Transport},
error_stream_not_found(State)
end.
+%% @todo Add unprocessed streams when GOAWAY handling is done.
+down(#spdy_state{streams=Streams}) ->
+ KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
+ {KilledStreams, []}.
+
error_stream_closed(State=#spdy_state{owner=Owner}) ->
Owner ! {gun_error, self(), {badstate,
"The stream has already been closed."}},
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 5379362..246cc57 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -14,9 +14,11 @@
-module(gun_ws).
+-export([name/0]).
-export([init/5]).
-export([handle/2]).
-export([send/2]).
+-export([down/1]).
-record(payload, {
type = undefined :: cow_ws:frame_type(),
@@ -40,6 +42,8 @@
extensions = #{} :: cow_ws:extensions()
}).
+name() -> ws.
+
%% @todo Protocols
init(Owner, Socket, Transport, Extensions, _Protocols) ->
Owner ! {gun_ws_upgrade, self(), ok},
@@ -123,3 +127,7 @@ send(Frame, State=#ws_state{socket=Socket, transport=Transport, extensions=Exten
{close, _, _} -> close;
_ -> State
end.
+
+%% Websocket has no concept of streams.
+down(_) ->
+ {[], []}.
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index 2310b31..2df4329 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -120,6 +120,8 @@ loop(Pid, Ref) ->
{gun_ws, Pid, Frame} ->
gun:ws_send(Pid, Frame),
loop(Pid, Ref);
+ {gun_down, Pid, ws, _, _, _} ->
+ close(Pid, Ref);
{'DOWN', Ref, process, Pid, normal} ->
close(Pid, Ref);
Msg ->
@@ -141,6 +143,12 @@ log_output() ->
connect(Path) ->
{ok, Pid} = gun:open("127.0.0.1", 33080, #{retry=>0}),
+ receive
+ {gun_up, Pid, http} ->
+ ok
+ after 1000 ->
+ error(open_timeout)
+ end,
Ref = monitor(process, Pid),
gun:ws_upgrade(Pid, Path, [], #{compress => true}),
receive
@@ -156,19 +164,7 @@ connect(Path) ->
close(Pid, Ref) ->
demonitor(Ref),
gun:close(Pid),
- flush(Pid).
-
-flush(Pid) ->
- receive
- {gun_ws, Pid, _} ->
- flush(Pid);
- {gun_ws_upgrade, Pid, _} ->
- flush(Pid);
- {'DOWN', _, process, Pid, _} ->
- flush(Pid)
- after 0 ->
- ok
- end.
+ gun:flush(Pid).
terminate() ->
Res = os:cmd("killall wstest"),