diff options
author | Loïc Hoguin <[email protected]> | 2015-04-09 23:13:57 +0300 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2015-04-09 23:13:57 +0300 |
commit | a738db07c2040461f0fd431f04ecf90b157ead5b (patch) | |
tree | 3435489b45067c80fc7b7620cf02622676811c24 /src/gun.erl | |
parent | 2badb594bdedfd1283306fe2075c2c51abdd600d (diff) | |
download | gun-a738db07c2040461f0fd431f04ecf90b157ead5b.tar.gz gun-a738db07c2040461f0fd431f04ecf90b157ead5b.tar.bz2 gun-a738db07c2040461f0fd431f04ecf90b157ead5b.zip |
Add gun_up and gun_down messages
The flush(Pid) function was enhanced to also discard Websocket
messages and the new up/down messages.
Diffstat (limited to 'src/gun.erl')
-rw-r--r-- | src/gun.erl | 50 |
1 files changed, 30 insertions, 20 deletions
diff --git a/src/gun.erl b/src/gun.erl index 4bb2806..b3623eb 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -341,6 +341,10 @@ flush(StreamRef) -> flush_pid(ServerPid) -> receive + {gun_up, ServerPid, _} -> + flush_pid(ServerPid); + {gun_down, ServerPid, _, _, _, _} -> + flush_pid(ServerPid); {gun_response, ServerPid, _, _, _, _} -> flush_pid(ServerPid); {gun_data, ServerPid, _, _, _} -> @@ -351,6 +355,10 @@ flush_pid(ServerPid) -> flush_pid(ServerPid); {gun_error, ServerPid, _} -> flush_pid(ServerPid); + {gun_ws_upgrade, ServerPid, _} -> + flush_pid(ServerPid); + {gun_ws, ServerPid, _} -> + flush_pid(ServerPid); {'DOWN', _, process, ServerPid, _} -> flush_pid(ServerPid) after 0 -> @@ -424,7 +432,7 @@ init(Parent, Owner, Host, Port, Opts) -> default_transport(443) -> ssl; default_transport(_) -> tcp. -connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Transport=ranch_ssl}, Retries) -> +connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=ranch_ssl}, Retries) -> Protocols = lists:flatten([case P of http -> <<"http/1.1">>; spdy -> [<<"spdy/3.1">>, <<"spdy/3">>] @@ -438,14 +446,11 @@ connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Tra {ok, <<"spdy/3", _/bits>>} -> {gun_spdy, spdy_opts}; _ -> {gun_http, http_opts} end, - ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}), - ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), - before_loop(State#state{socket=Socket, transport=Transport, - protocol=Protocol, protocol_state=ProtoState}); + up(State, Socket, Protocol, ProtoOptsKey); {error, _} -> retry(State, Retries - 1) end; -connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Transport}, Retries) -> +connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) -> TransportOpts = [binary, {active, false} |maps:get(transport_opts, Opts, [])], case Transport:connect(Host, Port, TransportOpts) of @@ -454,15 +459,20 @@ connect(State=#state{owner=Owner, host=Host, port=Port, opts=Opts, transport=Tra [http] -> {gun_http, http_opts}; [spdy] -> {gun_spdy, spdy_opts} end, - ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}), - ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), - before_loop(State#state{socket=Socket, transport=Transport, - protocol=Protocol, protocol_state=ProtoState}); + up(State, Socket, Protocol, ProtoOptsKey); {error, _} -> retry(State, Retries - 1) end. -retry(State=#state{opts=Opts}) -> +up(State=#state{owner=Owner, opts=Opts, transport=Transport}, Socket, Protocol, ProtoOptsKey) -> + ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}), + ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), + Owner ! {gun_up, self(), Protocol:name()}, + before_loop(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}). + +down(State=#state{owner=Owner, opts=Opts, protocol=Protocol, protocol_state=ProtoState}, Reason) -> + {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), + Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, retry(State#state{socket=undefined, protocol=undefined, protocol_state=undefined}, maps:get(retry, Opts, 5)). @@ -513,7 +523,7 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port, case Protocol:handle(Data, ProtoState) of close -> Transport:close(Socket), - retry(State); + down(State, normal); {upgrade, Protocol2, ProtoState2} -> ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2}); ProtoState2 -> @@ -522,11 +532,11 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port, {Closed, Socket} -> Protocol:close(ProtoState), Transport:close(Socket), - retry(State); - {Error, Socket, _} -> + down(State, closed); + {Error, Socket, Reason} -> Protocol:close(ProtoState), Transport:close(Socket), - retry(State); + down(State, {error, Reason}); {OK, _PreviousSocket, _Data} -> loop(State); {Closed, _PreviousSocket} -> @@ -591,16 +601,16 @@ ws_loop(State=#state{parent=Parent, owner=Owner, socket=Socket, case Protocol:handle(Data, ProtoState) of close -> Transport:close(Socket), - retry(State); + down(State, normal); ProtoState2 -> ws_loop(State#state{protocol_state=ProtoState2}) end; {Closed, Socket} -> Transport:close(Socket), - retry(State); - {Error, Socket, _} -> + down(State, closed); + {Error, Socket, Reason} -> Transport:close(Socket), - retry(State); + down(State, {error, Reason}); %% Ignore any previous HTTP keep-alive. keepalive -> ws_loop(State); @@ -610,7 +620,7 @@ ws_loop(State=#state{parent=Parent, owner=Owner, socket=Socket, case Protocol:send(Frame, ProtoState) of close -> Transport:close(Socket), - retry(State); + down(State, normal); ProtoState2 -> ws_loop(State#state{protocol_state=ProtoState2}) end; |