aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2014-03-21 10:50:36 +0100
committerLoïc Hoguin <[email protected]>2014-03-21 10:50:36 +0100
commita81d8cbbbd79b3883bb5531c67f964ac613a4cfd (patch)
treef9ac847aee186e65b64b0f45f34d9e6696ed1fec /src/gun.erl
parentfb6a0ae19a274a4f8cff8237aa86815e1c7dd5b8 (diff)
downloadgun-a81d8cbbbd79b3883bb5531c67f964ac613a4cfd.tar.gz
gun-a81d8cbbbd79b3883bb5531c67f964ac613a4cfd.tar.bz2
gun-a81d8cbbbd79b3883bb5531c67f964ac613a4cfd.zip
Add an interface to asynchronously wait for responses
The basic idea is that we do an async call and then optionally wait. If we already have a monitor for this connection, then we can reuse it across all await* calls, otherwise one is created automatically.
Diffstat (limited to 'src/gun.erl')
-rw-r--r--src/gun.erl77
1 files changed, 77 insertions, 0 deletions
diff --git a/src/gun.erl b/src/gun.erl
index cea8006..7121324 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -41,6 +41,14 @@
%% Streaming data.
-export([data/4]).
+%% Awaiting gun messages.
+-export([await/2]).
+-export([await/3]).
+-export([await/4]).
+-export([await_body/2]).
+-export([await_body/3]).
+-export([await_body/4]).
+
%% Cancelling a stream.
-export([cancel/2]).
@@ -206,6 +214,75 @@ data(ServerPid, StreamRef, IsFin, Data) ->
_ = ServerPid ! {data, self(), StreamRef, IsFin, Data},
ok.
+%% Awaiting gun messages.
+
+await(ServerPid, StreamRef) ->
+ MRef = monitor(process, ServerPid),
+ Res = await(ServerPid, StreamRef, 5000, MRef),
+ demonitor(MRef, [flush]),
+ Res.
+
+await(ServerPid, StreamRef, MRef) when is_reference(MRef) ->
+ await(ServerPid, StreamRef, 5000, MRef);
+await(ServerPid, StreamRef, Timeout) ->
+ MRef = monitor(process, ServerPid),
+ Res = await(ServerPid, StreamRef, Timeout, MRef),
+ demonitor(MRef, [flush]),
+ Res.
+
+await(ServerPid, StreamRef, Timeout, MRef) ->
+ receive
+ {gun_response, ServerPid, StreamRef, IsFin, Status, Headers} ->
+ {response, IsFin, Status, Headers};
+ {gun_data, ServerPid, StreamRef, IsFin, Data} ->
+ {data, IsFin, Data};
+ {gun_push, ServerPid, StreamRef, AssocToStreamRef,
+ Method, Host, Path, Headers} ->
+ {push, AssocToStreamRef, Method, Host, Path, Headers};
+ {gun_error, ServerPid, StreamRef, Reason} ->
+ {error, Reason};
+ {gun_error, ServerPid, Reason} ->
+ {error, Reason};
+ {'DOWN', MRef, process, ServerPid, Reason} ->
+ {error, Reason}
+ after Timeout ->
+ {error, timeout}
+ end.
+
+await_body(ServerPid, StreamRef) ->
+ MRef = monitor(process, ServerPid),
+ Res = await_body(ServerPid, StreamRef, 5000, MRef, <<>>),
+ demonitor(MRef, [flush]),
+ Res.
+
+await_body(ServerPid, StreamRef, MRef) when is_reference(MRef) ->
+ await_body(ServerPid, StreamRef, 5000, MRef, <<>>);
+await_body(ServerPid, StreamRef, Timeout) ->
+ MRef = monitor(process, ServerPid),
+ Res = await_body(ServerPid, StreamRef, Timeout, MRef, <<>>),
+ demonitor(MRef, [flush]),
+ Res.
+
+await_body(ServerPid, StreamRef, Timeout, MRef) ->
+ await_body(ServerPid, StreamRef, Timeout, MRef, <<>>).
+
+await_body(ServerPid, StreamRef, Timeout, MRef, Acc) ->
+ receive
+ {gun_data, ServerPid, StreamRef, nofin, Data} ->
+ await_body(ServerPid, StreamRef, Timeout, MRef,
+ << Acc/binary, Data/binary >>);
+ {gun_data, ServerPid, StreamRef, fin, Data} ->
+ {ok, << Acc/binary, Data/binary >>};
+ {gun_error, ServerPid, StreamRef, Reason} ->
+ {error, Reason};
+ {gun_error, ServerPid, Reason} ->
+ {error, Reason};
+ {'DOWN', MRef, process, ServerPid, Reason} ->
+ {error, Reason}
+ after Timeout ->
+ {error, timeout}
+ end.
+
%% Cancelling a stream.
-spec cancel(pid(), reference()) -> ok.