diff options
author | Loïc Hoguin <[email protected]> | 2014-03-21 10:50:36 +0100 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2014-03-21 10:50:36 +0100 |
commit | a81d8cbbbd79b3883bb5531c67f964ac613a4cfd (patch) | |
tree | f9ac847aee186e65b64b0f45f34d9e6696ed1fec | |
parent | fb6a0ae19a274a4f8cff8237aa86815e1c7dd5b8 (diff) | |
download | gun-a81d8cbbbd79b3883bb5531c67f964ac613a4cfd.tar.gz gun-a81d8cbbbd79b3883bb5531c67f964ac613a4cfd.tar.bz2 gun-a81d8cbbbd79b3883bb5531c67f964ac613a4cfd.zip |
Add an interface to asynchronously wait for responses
The basic idea is that we do an async call and then optionally wait.
If we already have a monitor for this connection, then we can reuse
it across all await* calls, otherwise one is created automatically.
-rw-r--r-- | src/gun.erl | 77 |
1 files changed, 77 insertions, 0 deletions
diff --git a/src/gun.erl b/src/gun.erl index cea8006..7121324 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -41,6 +41,14 @@ %% Streaming data. -export([data/4]). +%% Awaiting gun messages. +-export([await/2]). +-export([await/3]). +-export([await/4]). +-export([await_body/2]). +-export([await_body/3]). +-export([await_body/4]). + %% Cancelling a stream. -export([cancel/2]). @@ -206,6 +214,75 @@ data(ServerPid, StreamRef, IsFin, Data) -> _ = ServerPid ! {data, self(), StreamRef, IsFin, Data}, ok. +%% Awaiting gun messages. + +await(ServerPid, StreamRef) -> + MRef = monitor(process, ServerPid), + Res = await(ServerPid, StreamRef, 5000, MRef), + demonitor(MRef, [flush]), + Res. + +await(ServerPid, StreamRef, MRef) when is_reference(MRef) -> + await(ServerPid, StreamRef, 5000, MRef); +await(ServerPid, StreamRef, Timeout) -> + MRef = monitor(process, ServerPid), + Res = await(ServerPid, StreamRef, Timeout, MRef), + demonitor(MRef, [flush]), + Res. + +await(ServerPid, StreamRef, Timeout, MRef) -> + receive + {gun_response, ServerPid, StreamRef, IsFin, Status, Headers} -> + {response, IsFin, Status, Headers}; + {gun_data, ServerPid, StreamRef, IsFin, Data} -> + {data, IsFin, Data}; + {gun_push, ServerPid, StreamRef, AssocToStreamRef, + Method, Host, Path, Headers} -> + {push, AssocToStreamRef, Method, Host, Path, Headers}; + {gun_error, ServerPid, StreamRef, Reason} -> + {error, Reason}; + {gun_error, ServerPid, Reason} -> + {error, Reason}; + {'DOWN', MRef, process, ServerPid, Reason} -> + {error, Reason} + after Timeout -> + {error, timeout} + end. + +await_body(ServerPid, StreamRef) -> + MRef = monitor(process, ServerPid), + Res = await_body(ServerPid, StreamRef, 5000, MRef, <<>>), + demonitor(MRef, [flush]), + Res. + +await_body(ServerPid, StreamRef, MRef) when is_reference(MRef) -> + await_body(ServerPid, StreamRef, 5000, MRef, <<>>); +await_body(ServerPid, StreamRef, Timeout) -> + MRef = monitor(process, ServerPid), + Res = await_body(ServerPid, StreamRef, Timeout, MRef, <<>>), + demonitor(MRef, [flush]), + Res. + +await_body(ServerPid, StreamRef, Timeout, MRef) -> + await_body(ServerPid, StreamRef, Timeout, MRef, <<>>). + +await_body(ServerPid, StreamRef, Timeout, MRef, Acc) -> + receive + {gun_data, ServerPid, StreamRef, nofin, Data} -> + await_body(ServerPid, StreamRef, Timeout, MRef, + << Acc/binary, Data/binary >>); + {gun_data, ServerPid, StreamRef, fin, Data} -> + {ok, << Acc/binary, Data/binary >>}; + {gun_error, ServerPid, StreamRef, Reason} -> + {error, Reason}; + {gun_error, ServerPid, Reason} -> + {error, Reason}; + {'DOWN', MRef, process, ServerPid, Reason} -> + {error, Reason} + after Timeout -> + {error, timeout} + end. + %% Cancelling a stream. -spec cancel(pid(), reference()) -> ok. |