From a81d8cbbbd79b3883bb5531c67f964ac613a4cfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 21 Mar 2014 10:50:36 +0100 Subject: Add an interface to asynchronously wait for responses The basic idea is that we do an async call and then optionally wait. If we already have a monitor for this connection, then we can reuse it across all await* calls, otherwise one is created automatically. --- src/gun.erl | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) (limited to 'src/gun.erl') diff --git a/src/gun.erl b/src/gun.erl index cea8006..7121324 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -41,6 +41,14 @@ %% Streaming data. -export([data/4]). +%% Awaiting gun messages. +-export([await/2]). +-export([await/3]). +-export([await/4]). +-export([await_body/2]). +-export([await_body/3]). +-export([await_body/4]). + %% Cancelling a stream. -export([cancel/2]). @@ -206,6 +214,75 @@ data(ServerPid, StreamRef, IsFin, Data) -> _ = ServerPid ! {data, self(), StreamRef, IsFin, Data}, ok. +%% Awaiting gun messages. + +await(ServerPid, StreamRef) -> + MRef = monitor(process, ServerPid), + Res = await(ServerPid, StreamRef, 5000, MRef), + demonitor(MRef, [flush]), + Res. + +await(ServerPid, StreamRef, MRef) when is_reference(MRef) -> + await(ServerPid, StreamRef, 5000, MRef); +await(ServerPid, StreamRef, Timeout) -> + MRef = monitor(process, ServerPid), + Res = await(ServerPid, StreamRef, Timeout, MRef), + demonitor(MRef, [flush]), + Res. + +await(ServerPid, StreamRef, Timeout, MRef) -> + receive + {gun_response, ServerPid, StreamRef, IsFin, Status, Headers} -> + {response, IsFin, Status, Headers}; + {gun_data, ServerPid, StreamRef, IsFin, Data} -> + {data, IsFin, Data}; + {gun_push, ServerPid, StreamRef, AssocToStreamRef, + Method, Host, Path, Headers} -> + {push, AssocToStreamRef, Method, Host, Path, Headers}; + {gun_error, ServerPid, StreamRef, Reason} -> + {error, Reason}; + {gun_error, ServerPid, Reason} -> + {error, Reason}; + {'DOWN', MRef, process, ServerPid, Reason} -> + {error, Reason} + after Timeout -> + {error, timeout} + end. + +await_body(ServerPid, StreamRef) -> + MRef = monitor(process, ServerPid), + Res = await_body(ServerPid, StreamRef, 5000, MRef, <<>>), + demonitor(MRef, [flush]), + Res. + +await_body(ServerPid, StreamRef, MRef) when is_reference(MRef) -> + await_body(ServerPid, StreamRef, 5000, MRef, <<>>); +await_body(ServerPid, StreamRef, Timeout) -> + MRef = monitor(process, ServerPid), + Res = await_body(ServerPid, StreamRef, Timeout, MRef, <<>>), + demonitor(MRef, [flush]), + Res. + +await_body(ServerPid, StreamRef, Timeout, MRef) -> + await_body(ServerPid, StreamRef, Timeout, MRef, <<>>). + +await_body(ServerPid, StreamRef, Timeout, MRef, Acc) -> + receive + {gun_data, ServerPid, StreamRef, nofin, Data} -> + await_body(ServerPid, StreamRef, Timeout, MRef, + << Acc/binary, Data/binary >>); + {gun_data, ServerPid, StreamRef, fin, Data} -> + {ok, << Acc/binary, Data/binary >>}; + {gun_error, ServerPid, StreamRef, Reason} -> + {error, Reason}; + {gun_error, ServerPid, Reason} -> + {error, Reason}; + {'DOWN', MRef, process, ServerPid, Reason} -> + {error, Reason} + after Timeout -> + {error, timeout} + end. + %% Cancelling a stream. -spec cancel(pid(), reference()) -> ok. -- cgit v1.2.3