diff options
-rw-r--r-- | src/gun.erl | 77 |
1 files changed, 77 insertions, 0 deletions
diff --git a/src/gun.erl b/src/gun.erl index cea8006..7121324 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -41,6 +41,14 @@ %% Streaming data. -export([data/4]). +%% Awaiting gun messages. +-export([await/2]). +-export([await/3]). +-export([await/4]). +-export([await_body/2]). +-export([await_body/3]). +-export([await_body/4]). + %% Cancelling a stream. -export([cancel/2]). @@ -206,6 +214,75 @@ data(ServerPid, StreamRef, IsFin, Data) -> _ = ServerPid ! {data, self(), StreamRef, IsFin, Data}, ok. +%% Awaiting gun messages. + +await(ServerPid, StreamRef) -> + MRef = monitor(process, ServerPid), + Res = await(ServerPid, StreamRef, 5000, MRef), + demonitor(MRef, [flush]), + Res. + +await(ServerPid, StreamRef, MRef) when is_reference(MRef) -> + await(ServerPid, StreamRef, 5000, MRef); +await(ServerPid, StreamRef, Timeout) -> + MRef = monitor(process, ServerPid), + Res = await(ServerPid, StreamRef, Timeout, MRef), + demonitor(MRef, [flush]), + Res. + +await(ServerPid, StreamRef, Timeout, MRef) -> + receive + {gun_response, ServerPid, StreamRef, IsFin, Status, Headers} -> + {response, IsFin, Status, Headers}; + {gun_data, ServerPid, StreamRef, IsFin, Data} -> + {data, IsFin, Data}; + {gun_push, ServerPid, StreamRef, AssocToStreamRef, + Method, Host, Path, Headers} -> + {push, AssocToStreamRef, Method, Host, Path, Headers}; + {gun_error, ServerPid, StreamRef, Reason} -> + {error, Reason}; + {gun_error, ServerPid, Reason} -> + {error, Reason}; + {'DOWN', MRef, process, ServerPid, Reason} -> + {error, Reason} + after Timeout -> + {error, timeout} + end. + +await_body(ServerPid, StreamRef) -> + MRef = monitor(process, ServerPid), + Res = await_body(ServerPid, StreamRef, 5000, MRef, <<>>), + demonitor(MRef, [flush]), + Res. + +await_body(ServerPid, StreamRef, MRef) when is_reference(MRef) -> + await_body(ServerPid, StreamRef, 5000, MRef, <<>>); +await_body(ServerPid, StreamRef, Timeout) -> + MRef = monitor(process, ServerPid), + Res = await_body(ServerPid, StreamRef, Timeout, MRef, <<>>), + demonitor(MRef, [flush]), + Res. + +await_body(ServerPid, StreamRef, Timeout, MRef) -> + await_body(ServerPid, StreamRef, Timeout, MRef, <<>>). + +await_body(ServerPid, StreamRef, Timeout, MRef, Acc) -> + receive + {gun_data, ServerPid, StreamRef, nofin, Data} -> + await_body(ServerPid, StreamRef, Timeout, MRef, + << Acc/binary, Data/binary >>); + {gun_data, ServerPid, StreamRef, fin, Data} -> + {ok, << Acc/binary, Data/binary >>}; + {gun_error, ServerPid, StreamRef, Reason} -> + {error, Reason}; + {gun_error, ServerPid, Reason} -> + {error, Reason}; + {'DOWN', MRef, process, ServerPid, Reason} -> + {error, Reason} + after Timeout -> + {error, timeout} + end. + %% Cancelling a stream. -spec cancel(pid(), reference()) -> ok. |