diff options
author | Loïc Hoguin <[email protected]> | 2015-04-09 23:13:57 +0300 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2015-04-09 23:13:57 +0300 |
commit | a738db07c2040461f0fd431f04ecf90b157ead5b (patch) | |
tree | 3435489b45067c80fc7b7620cf02622676811c24 | |
parent | 2badb594bdedfd1283306fe2075c2c51abdd600d (diff) | |
download | gun-a738db07c2040461f0fd431f04ecf90b157ead5b.tar.gz gun-a738db07c2040461f0fd431f04ecf90b157ead5b.tar.bz2 gun-a738db07c2040461f0fd431f04ecf90b157ead5b.zip |
Add gun_up and gun_down messages
The flush(Pid) function was enhanced to also discard Websocket
messages and the new up/down messages.
-rw-r--r-- | doc/src/guide/connect.asciidoc | 14 | ||||
-rw-r--r-- | doc/src/manual/gun.asciidoc | 51 | ||||
-rw-r--r-- | src/gun.erl | 50 | ||||
-rw-r--r-- | src/gun_http.erl | 12 | ||||
-rw-r--r-- | src/gun_spdy.erl | 9 | ||||
-rw-r--r-- | src/gun_ws.erl | 8 | ||||
-rw-r--r-- | test/ws_SUITE.erl | 22 |
7 files changed, 133 insertions, 33 deletions
diff --git a/doc/src/guide/connect.asciidoc b/doc/src/guide/connect.asciidoc index e2bcaa7..8de8184 100644 --- a/doc/src/guide/connect.asciidoc +++ b/doc/src/guide/connect.asciidoc @@ -52,6 +52,20 @@ form of a map. [source,erlang] {ok, ConnPid} = gun:open("example.org", 8443, #{transport=>ssl}). +=== Connection up and down messages + +When Gun successfully connects to the server, it sends a +`gun_up` message with the protocol that has been selected +for the connection. + +When the connection is lost, Gun will send a `gun_down` +message indicating the current protocol, the reason the +connection was lost and two list of stream references. + +The first list indicates open streams that _may_ have been +processed by the server. The second list indicates open +streams that the server did not process. + === Monitoring the connection process @todo Gun should detect the owner process being killed diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc index 9d592a8..b487eee 100644 --- a/doc/src/manual/gun.asciidoc +++ b/doc/src/manual/gun.asciidoc @@ -76,6 +76,57 @@ keepalive => pos_integer():: Calling functions from this module may result in the following messages being sent. +=== {gun_up, ConnPid, Protocol} + +ConnPid = pid():: The pid of the Gun connection process. +Protocol = http | spdy:: The protocol selected for this connection. + +The connection is up. + +This message informs the owner process that the connection or +reconnection completed. + +The protocol selected during the connection is sent in this +message. It can be used to determine the capabilities of the +server. + +Gun will now start processing the messages it received while +waiting for the connection to be up. If this is a reconnection, +then this may not be desirable for all requests. Those requests +should be cancelled when the connection goes down, and any +subsequent messages ignored. + +=== {gun_down, ConnPid, Protocol, Reason, KilledStreams, UnprocessedStreams} + +ConnPid = pid():: The pid of the Gun connection process. +Protocol = http | spdy | ws:: The protocol in use when the connection was lost. +Reason = normal | closed | {error, atom()}:: The reason for the loss of the connection. +KilledStreams = [reference()]:: List of streams that have been brutally terminated. +UnprocessedStreams = [reference()]:: List of streams that have not been processed by the server. + +The connection is down. + +This message informs the owner process that the connection is +currently down. Gun will automatically attempt to reconnect +depending on the `retry` and `retry_timeout` options. + +The reason of the termination is there for debugging purposes +only. You should not rely on this value to know what streams +were processed or completed. + +The _killed streams_ are the active streams that did not complete +before the closing of the connection. Whether they can be retried +safely depends on the protocol used and the idempotence property +of the requests. + +The _unprocessed streams_ are streams that the server did not +start processing yet. They may be retried safely depending on +what streams were killed before. + +When the connection goes back up, Gun will not attempt to retry +requests. It will also not upgrade to Websocket automatically +if that was the protocol in use when the connection was lost. + === {gun_push, ConnPid, StreamRef, NewStreamRef, URI, Headers} ConnPid = pid():: The pid of the Gun connection process. diff --git a/src/gun.erl b/src/gun.erl index 4bb2806..b3623eb 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -341,6 +341,10 @@ flush(StreamRef) -> flush_pid(ServerPid) -> receive + {gun_up, ServerPid, _} -> + flush_pid(ServerPid); + {gun_down, ServerPid, _, _, _, _} -> + flush_pid(ServerPid); {gun_response, ServerPid, _, _, _, _} -> flush_pid(ServerPid); {gun_data, ServerPid, _, _, _} -> @@ -351,6 +355,10 @@ flush_pid(ServerPid) -> flush_pid(ServerPid); {gun_error, ServerPid, _} -> flush_pid(ServerPid); + {gun_ws_upgrade, ServerPid, _} -> + flush_pid(ServerPid); + {gun_ws, ServerPid, _} -> + flush_pid(ServerPid); {'DOWN', _, process, ServerPid, _} -> flush_pid(ServerPid) after 0 -> @@ -424,7 +432,7 @@ init(Parent, Owner, Host, Port, Opts) -> default_transport(443) -> ssl; default_transport(_) -> tcp. -connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Transport=ranch_ssl}, Retries) -> +connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=ranch_ssl}, Retries) -> Protocols = lists:flatten([case P of http -> <<"http/1.1">>; spdy -> [<<"spdy/3.1">>, <<"spdy/3">>] @@ -438,14 +446,11 @@ connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Tra {ok, <<"spdy/3", _/bits>>} -> {gun_spdy, spdy_opts}; _ -> {gun_http, http_opts} end, - ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}), - ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), - before_loop(State#state{socket=Socket, transport=Transport, - protocol=Protocol, protocol_state=ProtoState}); + up(State, Socket, Protocol, ProtoOptsKey); {error, _} -> retry(State, Retries - 1) end; -connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Transport}, Retries) -> +connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) -> TransportOpts = [binary, {active, false} |maps:get(transport_opts, Opts, [])], case Transport:connect(Host, Port, TransportOpts) of @@ -454,15 +459,20 @@ connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Tra [http] -> {gun_http, http_opts}; [spdy] -> {gun_spdy, spdy_opts} end, - ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}), - ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), - before_loop(State#state{socket=Socket, transport=Transport, - protocol=Protocol, protocol_state=ProtoState}); + up(State, Socket, Protocol, ProtoOptsKey); {error, _} -> retry(State, Retries - 1) end. -retry(State=#state{opts=Opts}) -> +up(State=#state{owner=Owner, opts=Opts, transport=Transport}, Socket, Protocol, ProtoOptsKey) -> + ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}), + ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), + Owner ! {gun_up, self(), Protocol:name()}, + before_loop(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}). + +down(State=#state{owner=Owner, opts=Opts, protocol=Protocol, protocol_state=ProtoState}, Reason) -> + {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), + Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, retry(State#state{socket=undefined, protocol=undefined, protocol_state=undefined}, maps:get(retry, Opts, 5)). @@ -513,7 +523,7 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port, case Protocol:handle(Data, ProtoState) of close -> Transport:close(Socket), - retry(State); + down(State, normal); {upgrade, Protocol2, ProtoState2} -> ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2}); ProtoState2 -> @@ -522,11 +532,11 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port, {Closed, Socket} -> Protocol:close(ProtoState), Transport:close(Socket), - retry(State); - {Error, Socket, _} -> + down(State, closed); + {Error, Socket, Reason} -> Protocol:close(ProtoState), Transport:close(Socket), - retry(State); + down(State, {error, Reason}); {OK, _PreviousSocket, _Data} -> loop(State); {Closed, _PreviousSocket} -> @@ -591,16 +601,16 @@ ws_loop(State=#state{parent=Parent, owner=Owner, socket=Socket, case Protocol:handle(Data, ProtoState) of close -> Transport:close(Socket), - retry(State); + down(State, normal); ProtoState2 -> ws_loop(State#state{protocol_state=ProtoState2}) end; {Closed, Socket} -> Transport:close(Socket), - retry(State); - {Error, Socket, _} -> + down(State, closed); + {Error, Socket, Reason} -> Transport:close(Socket), - retry(State); + down(State, {error, Reason}); %% Ignore any previous HTTP keep-alive. keepalive -> ws_loop(State); @@ -610,7 +620,7 @@ ws_loop(State=#state{parent=Parent, owner=Owner, socket=Socket, case Protocol:send(Frame, ProtoState) of close -> Transport:close(Socket), - retry(State); + down(State, normal); ProtoState2 -> ws_loop(State#state{protocol_state=ProtoState2}) end; diff --git a/src/gun_http.erl b/src/gun_http.erl index 8b176fe..d6a69d1 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -15,6 +15,7 @@ -module(gun_http). -export([check_options/1]). +-export([name/0]). -export([init/4]). -export([handle/2]). -export([close/1]). @@ -23,6 +24,7 @@ -export([request/8]). -export([data/4]). -export([cancel/2]). +-export([down/1]). -export([ws_upgrade/7]). -type io() :: head | {body, non_neg_integer()} | body_close | body_chunked. @@ -52,6 +54,8 @@ do_check_options([{version, V}|Opts]) when V =:= 'HTTP/1.1'; V =:= 'HTTP/1.0' -> do_check_options([Opt|_]) -> {error, {options, {http, Opt}}}. +name() -> http. + init(Owner, Socket, Transport, Opts) -> Version = maps:get(version, Opts, 'HTTP/1.1'), #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version}. @@ -286,6 +290,14 @@ cancel(State, StreamRef) -> error_stream_not_found(State) end. +%% HTTP does not provide any way to figure out what streams are unprocessed. +down(#http_state{streams=Streams}) -> + KilledStreams = [case Ref of + {websocket, Ref2, _, _, _} -> Ref2; + _ -> Ref + end || {Ref, _} <- Streams], + {KilledStreams, []}. + error_stream_closed(State=#http_state{owner=Owner}) -> Owner ! {gun_error, self(), {badstate, "The stream has already been closed."}}, diff --git a/src/gun_spdy.erl b/src/gun_spdy.erl index 6aeb517..c417e78 100644 --- a/src/gun_spdy.erl +++ b/src/gun_spdy.erl @@ -15,6 +15,7 @@ -module(gun_spdy). -export([check_options/1]). +-export([name/0]). -export([init/4]). -export([handle/2]). -export([close/1]). @@ -23,6 +24,7 @@ -export([request/8]). -export([data/4]). -export([cancel/2]). +-export([down/1]). -record(stream, { id :: non_neg_integer(), @@ -52,6 +54,8 @@ do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options([Opt|_]) -> {error, {options, {spdy, Opt}}}. +name() -> spdy. + init(Owner, Socket, Transport, _Opts) -> #spdy_state{owner=Owner, socket=Socket, transport=Transport, zdef=cow_spdy:deflate_init(), zinf=cow_spdy:inflate_init()}. @@ -263,6 +267,11 @@ cancel(State=#spdy_state{socket=Socket, transport=Transport}, error_stream_not_found(State) end. +%% @todo Add unprocessed streams when GOAWAY handling is done. +down(#spdy_state{streams=Streams}) -> + KilledStreams = [Ref || #stream{ref=Ref} <- Streams], + {KilledStreams, []}. + error_stream_closed(State=#spdy_state{owner=Owner}) -> Owner ! {gun_error, self(), {badstate, "The stream has already been closed."}}, diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 5379362..246cc57 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -14,9 +14,11 @@ -module(gun_ws). +-export([name/0]). -export([init/5]). -export([handle/2]). -export([send/2]). +-export([down/1]). -record(payload, { type = undefined :: cow_ws:frame_type(), @@ -40,6 +42,8 @@ extensions = #{} :: cow_ws:extensions() }). +name() -> ws. + %% @todo Protocols init(Owner, Socket, Transport, Extensions, _Protocols) -> Owner ! {gun_ws_upgrade, self(), ok}, @@ -123,3 +127,7 @@ send(Frame, State=#ws_state{socket=Socket, transport=Transport, extensions=Exten {close, _, _} -> close; _ -> State end. + +%% Websocket has no concept of streams. +down(_) -> + {[], []}. diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index 2310b31..2df4329 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -120,6 +120,8 @@ loop(Pid, Ref) -> {gun_ws, Pid, Frame} -> gun:ws_send(Pid, Frame), loop(Pid, Ref); + {gun_down, Pid, ws, _, _, _} -> + close(Pid, Ref); {'DOWN', Ref, process, Pid, normal} -> close(Pid, Ref); Msg -> @@ -141,6 +143,12 @@ log_output() -> connect(Path) -> {ok, Pid} = gun:open("127.0.0.1", 33080, #{retry=>0}), + receive + {gun_up, Pid, http} -> + ok + after 1000 -> + error(open_timeout) + end, Ref = monitor(process, Pid), gun:ws_upgrade(Pid, Path, [], #{compress => true}), receive @@ -156,19 +164,7 @@ connect(Path) -> close(Pid, Ref) -> demonitor(Ref), gun:close(Pid), - flush(Pid). - -flush(Pid) -> - receive - {gun_ws, Pid, _} -> - flush(Pid); - {gun_ws_upgrade, Pid, _} -> - flush(Pid); - {'DOWN', _, process, Pid, _} -> - flush(Pid) - after 0 -> - ok - end. + gun:flush(Pid). terminate() -> Res = os:cmd("killall wstest"), |