aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2015-04-09 23:13:57 +0300
committerLoïc Hoguin <[email protected]>2015-04-09 23:13:57 +0300
commita738db07c2040461f0fd431f04ecf90b157ead5b (patch)
tree3435489b45067c80fc7b7620cf02622676811c24 /src/gun.erl
parent2badb594bdedfd1283306fe2075c2c51abdd600d (diff)
downloadgun-a738db07c2040461f0fd431f04ecf90b157ead5b.tar.gz
gun-a738db07c2040461f0fd431f04ecf90b157ead5b.tar.bz2
gun-a738db07c2040461f0fd431f04ecf90b157ead5b.zip
Add gun_up and gun_down messages
The flush(Pid) function was enhanced to also discard Websocket messages and the new up/down messages.
Diffstat (limited to 'src/gun.erl')
-rw-r--r--src/gun.erl50
1 files changed, 30 insertions, 20 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 4bb2806..b3623eb 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -341,6 +341,10 @@ flush(StreamRef) ->
flush_pid(ServerPid) ->
receive
+ {gun_up, ServerPid, _} ->
+ flush_pid(ServerPid);
+ {gun_down, ServerPid, _, _, _, _} ->
+ flush_pid(ServerPid);
{gun_response, ServerPid, _, _, _, _} ->
flush_pid(ServerPid);
{gun_data, ServerPid, _, _, _} ->
@@ -351,6 +355,10 @@ flush_pid(ServerPid) ->
flush_pid(ServerPid);
{gun_error, ServerPid, _} ->
flush_pid(ServerPid);
+ {gun_ws_upgrade, ServerPid, _} ->
+ flush_pid(ServerPid);
+ {gun_ws, ServerPid, _} ->
+ flush_pid(ServerPid);
{'DOWN', _, process, ServerPid, _} ->
flush_pid(ServerPid)
after 0 ->
@@ -424,7 +432,7 @@ init(Parent, Owner, Host, Port, Opts) ->
default_transport(443) -> ssl;
default_transport(_) -> tcp.
-connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Transport=ranch_ssl}, Retries) ->
+connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=ranch_ssl}, Retries) ->
Protocols = lists:flatten([case P of
http -> <<"http/1.1">>;
spdy -> [<<"spdy/3.1">>, <<"spdy/3">>]
@@ -438,14 +446,11 @@ connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Tra
{ok, <<"spdy/3", _/bits>>} -> {gun_spdy, spdy_opts};
_ -> {gun_http, http_opts}
end,
- ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
- ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
- before_loop(State#state{socket=Socket, transport=Transport,
- protocol=Protocol, protocol_state=ProtoState});
+ up(State, Socket, Protocol, ProtoOptsKey);
{error, _} ->
retry(State, Retries - 1)
end;
-connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
+connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
TransportOpts = [binary, {active, false}
|maps:get(transport_opts, Opts, [])],
case Transport:connect(Host, Port, TransportOpts) of
@@ -454,15 +459,20 @@ connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Tra
[http] -> {gun_http, http_opts};
[spdy] -> {gun_spdy, spdy_opts}
end,
- ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
- ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
- before_loop(State#state{socket=Socket, transport=Transport,
- protocol=Protocol, protocol_state=ProtoState});
+ up(State, Socket, Protocol, ProtoOptsKey);
{error, _} ->
retry(State, Retries - 1)
end.
-retry(State=#state{opts=Opts}) ->
+up(State=#state{owner=Owner, opts=Opts, transport=Transport}, Socket, Protocol, ProtoOptsKey) ->
+ ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
+ ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
+ Owner ! {gun_up, self(), Protocol:name()},
+ before_loop(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}).
+
+down(State=#state{owner=Owner, opts=Opts, protocol=Protocol, protocol_state=ProtoState}, Reason) ->
+ {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
+ Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
retry(State#state{socket=undefined, protocol=undefined, protocol_state=undefined},
maps:get(retry, Opts, 5)).
@@ -513,7 +523,7 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port,
case Protocol:handle(Data, ProtoState) of
close ->
Transport:close(Socket),
- retry(State);
+ down(State, normal);
{upgrade, Protocol2, ProtoState2} ->
ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2});
ProtoState2 ->
@@ -522,11 +532,11 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port,
{Closed, Socket} ->
Protocol:close(ProtoState),
Transport:close(Socket),
- retry(State);
- {Error, Socket, _} ->
+ down(State, closed);
+ {Error, Socket, Reason} ->
Protocol:close(ProtoState),
Transport:close(Socket),
- retry(State);
+ down(State, {error, Reason});
{OK, _PreviousSocket, _Data} ->
loop(State);
{Closed, _PreviousSocket} ->
@@ -591,16 +601,16 @@ ws_loop(State=#state{parent=Parent, owner=Owner, socket=Socket,
case Protocol:handle(Data, ProtoState) of
close ->
Transport:close(Socket),
- retry(State);
+ down(State, normal);
ProtoState2 ->
ws_loop(State#state{protocol_state=ProtoState2})
end;
{Closed, Socket} ->
Transport:close(Socket),
- retry(State);
- {Error, Socket, _} ->
+ down(State, closed);
+ {Error, Socket, Reason} ->
Transport:close(Socket),
- retry(State);
+ down(State, {error, Reason});
%% Ignore any previous HTTP keep-alive.
keepalive ->
ws_loop(State);
@@ -610,7 +620,7 @@ ws_loop(State=#state{parent=Parent, owner=Owner, socket=Socket,
case Protocol:send(Frame, ProtoState) of
close ->
Transport:close(Socket),
- retry(State);
+ down(State, normal);
ProtoState2 ->
ws_loop(State#state{protocol_state=ProtoState2})
end;