aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/src/manual/gun.asciidoc1
-rw-r--r--doc/src/manual/gun.cancel.asciidoc3
-rw-r--r--doc/src/manual/gun.stream_info.asciidoc63
-rw-r--r--src/gun.erl12
-rw-r--r--src/gun_http.erl16
-rw-r--r--src/gun_http2.erl13
-rw-r--r--test/gun_SUITE.erl60
7 files changed, 166 insertions, 2 deletions
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc
index 2726564..95ca2ad 100644
--- a/doc/src/manual/gun.asciidoc
+++ b/doc/src/manual/gun.asciidoc
@@ -47,6 +47,7 @@ Messages:
Streams:
* link:man:gun:cancel(3)[gun:cancel(3)] - Cancel the given stream
+* link:man:gun:stream_info(3)[gun:stream_info(3)] - Obtain information about a stream
Websocket:
diff --git a/doc/src/manual/gun.cancel.asciidoc b/doc/src/manual/gun.cancel.asciidoc
index bc2bab9..fd86eba 100644
--- a/doc/src/manual/gun.cancel.asciidoc
+++ b/doc/src/manual/gun.cancel.asciidoc
@@ -65,4 +65,5 @@ link:man:gun:patch(3)[gun:patch(3)],
link:man:gun:post(3)[gun:post(3)],
link:man:gun:put(3)[gun:put(3)],
link:man:gun:delete(3)[gun:delete(3)],
-link:man:gun:request(3)[gun:request(3)]
+link:man:gun:request(3)[gun:request(3)],
+link:man:gun:stream_info(3)[gun:stream_info(3)]
diff --git a/doc/src/manual/gun.stream_info.asciidoc b/doc/src/manual/gun.stream_info.asciidoc
new file mode 100644
index 0000000..8f38020
--- /dev/null
+++ b/doc/src/manual/gun.stream_info.asciidoc
@@ -0,0 +1,63 @@
+= gun:stream_info(3)
+
+== Name
+
+gun:stream_info - Obtain information about a stream
+
+== Description
+
+[source,erlang]
+----
+stream_info(ConnPid, StreamRef) -> {ok, undefined | Info} | {error, not_connected}
+
+ConnPid :: pid()
+StreamRef :: reference()
+Info :: #{
+ ref => reference(),
+ reply_to => pid(),
+ state => running | stopping
+}
+----
+
+Obtain information about a stream.
+
+== Arguments
+
+ConnPid::
+
+The pid of the Gun connection process.
+
+StreamRef::
+
+Identifier of the stream for the original request.
+
+== Return value
+
+A map is returned containing various informations about
+the stream.
+
+== Changelog
+
+* *2.0*: Function introduced.
+
+== Examples
+
+.Obtain information about a stream
+[source,erlang]
+----
+Info = gun:stream_info(ConnPid, StreamRef).
+----
+
+== See also
+
+link:man:gun(3)[gun(3)],
+link:man:gun:get(3)[gun:get(3)],
+link:man:gun:head(3)[gun:head(3)],
+link:man:gun:options(3)[gun:options(3)],
+link:man:gun:patch(3)[gun:patch(3)],
+link:man:gun:post(3)[gun:post(3)],
+link:man:gun:put(3)[gun:put(3)],
+link:man:gun:delete(3)[gun:delete(3)],
+link:man:gun:headers(3)[gun:headers(3)],
+link:man:gun:request(3)[gun:request(3)],
+link:man:gun:cancel(3)[gun:cancel(3)]
diff --git a/src/gun.erl b/src/gun.erl
index 75092f1..1fd7e37 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -78,8 +78,9 @@
%% Flushing gun messages.
-export([flush/1]).
-%% Cancelling a stream.
+%% Streams.
-export([cancel/2]).
+-export([stream_info/2]).
%% Websocket.
-export([ws_upgrade/2]).
@@ -643,6 +644,10 @@ flush_ref(StreamRef) ->
cancel(ServerPid, StreamRef) ->
gen_statem:cast(ServerPid, {cancel, self(), StreamRef}).
+-spec stream_info(pid(), reference()) -> {ok, map() | undefined} | {error, not_connected}.
+stream_info(ServerPid, StreamRef) ->
+ gen_statem:call(ServerPid, {stream_info, StreamRef}).
+
%% @todo Allow upgrading an HTTP/1.1 connection to HTTP/2.
%% http2_upgrade
@@ -725,6 +730,8 @@ not_connected(_, {retries, Retries},
{keep_state, State,
{state_timeout, Timeout, {retries, Retries - 1}}}
end;
+not_connected({call, From}, {stream_info, _}, _) ->
+ {keep_state_and_data, {reply, From, {error, not_connected}}};
not_connected(Type, Event, State) ->
handle_common(Type, Event, ?FUNCTION_NAME, State).
@@ -843,6 +850,9 @@ connected(cast, {ws_send, ReplyTo, _}, _) ->
"Connection needs to be upgraded to Websocket "
"before the gun:ws_send/1 function can be used."}},
keep_state_and_data;
+connected({call, From}, {stream_info, StreamRef},
+ #state{protocol=Protocol, protocol_state=ProtoState}) ->
+ {keep_state_and_data, {reply, From, Protocol:stream_info(ProtoState, StreamRef)}};
connected(Type, Event, State) ->
handle_common(Type, Event, ?FUNCTION_NAME, State).
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 376f431..719307c 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -25,6 +25,7 @@
-export([data/5]).
-export([connect/5]).
-export([cancel/3]).
+-export([stream_info/2]).
-export([down/1]).
-export([ws_upgrade/7]).
@@ -473,6 +474,21 @@ cancel(State, StreamRef, ReplyTo) ->
error_stream_not_found(State, StreamRef, ReplyTo)
end.
+stream_info(#http_state{streams=Streams}, StreamRef) ->
+ case lists:keyfind(StreamRef, #stream.ref, Streams) of
+ #stream{reply_to=ReplyTo, is_alive=IsAlive} ->
+ {ok, #{
+ ref => StreamRef,
+ reply_to => ReplyTo,
+ state => case IsAlive of
+ true -> running;
+ false -> stopping
+ end
+ }};
+ false ->
+ {ok, undefined}
+ end.
+
%% HTTP does not provide any way to figure out what streams are unprocessed.
down(#http_state{streams=Streams}) ->
KilledStreams = [case Ref of
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 9159d78..8072a00 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -24,6 +24,7 @@
-export([request/9]).
-export([data/5]).
-export([cancel/3]).
+-export([stream_info/2]).
-export([down/1]).
-record(stream, {
@@ -371,6 +372,18 @@ cancel(State=#http2_state{socket=Socket, transport=Transport,
error_stream_not_found(State, StreamRef, ReplyTo)
end.
+stream_info(State, StreamRef) ->
+ case get_stream_by_ref(State, StreamRef) of
+ #stream{reply_to=ReplyTo} ->
+ {ok, #{
+ ref => StreamRef,
+ reply_to => ReplyTo,
+ state => running
+ }};
+ false ->
+ {ok, undefined}
+ end.
+
%% @todo Add unprocessed streams when GOAWAY handling is done.
down(#http2_state{streams=Streams}) ->
KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl
index dd7b083..acaee64 100644
--- a/test/gun_SUITE.erl
+++ b/test/gun_SUITE.erl
@@ -52,6 +52,66 @@ atom_hostname(_) ->
[<<"host: localhost:", _/bits>>] = [L || <<"host: ", _/bits>> = L <- Lines],
gun:close(Pid).
+stream_info_http(_) ->
+ doc("Ensure the function gun:stream_info/2 works as expected for HTTP/1.1."),
+ {ok, _, OriginPort} = init_origin(tcp, http,
+ fun(_, ClientSocket, ClientTransport) ->
+ %% Give some time to detect the cancel.
+ timer:sleep(100),
+ %% Then terminate the stream.
+ ClientTransport:send(ClientSocket,
+ "HTTP/1.1 200 OK\r\n"
+ "content-length: 0\r\n"
+ "\r\n"
+ ),
+ timer:sleep(200)
+ end),
+ {ok, Pid} = gun:open("localhost", OriginPort),
+ {ok, http} = gun:await_up(Pid),
+ {ok, undefined} = gun:stream_info(Pid, make_ref()),
+ StreamRef = gun:get(Pid, "/"),
+ Self = self(),
+ {ok, #{
+ ref := StreamRef,
+ reply_to := Self,
+ state := running
+ }} = gun:stream_info(Pid, StreamRef),
+ gun:cancel(Pid, StreamRef),
+ {ok, #{
+ ref := StreamRef,
+ reply_to := Self,
+ state := stopping
+ }} = gun:stream_info(Pid, StreamRef),
+ %% Wait a little for the stream to terminate.
+ timer:sleep(200),
+ {ok, undefined} = gun:stream_info(Pid, StreamRef),
+ %% Wait a little more for the connection to terminate.
+ timer:sleep(200),
+ {error, not_connected} = gun:stream_info(Pid, StreamRef),
+ gun:close(Pid).
+
+stream_info_http2(_) ->
+ doc("Ensure the function gun:stream_info/2 works as expected for HTTP/2."),
+ {ok, _, OriginPort} = init_origin(tcp, http2,
+ fun(_, _, _) -> timer:sleep(100) end),
+ {ok, Pid} = gun:open("localhost", OriginPort, #{
+ protocols => [http2]
+ }),
+ {ok, http2} = gun:await_up(Pid),
+ {ok, undefined} = gun:stream_info(Pid, make_ref()),
+ StreamRef = gun:get(Pid, "/"),
+ Self = self(),
+ {ok, #{
+ ref := StreamRef,
+ reply_to := Self,
+ state := running
+ }} = gun:stream_info(Pid, StreamRef),
+ gun:cancel(Pid, StreamRef),
+ %% Wait a little for the connection to terminate.
+ timer:sleep(200),
+ {error, not_connected} = gun:stream_info(Pid, StreamRef),
+ gun:close(Pid).
+
connect_timeout(_) ->
doc("Ensure an integer value for connect_timeout is accepted."),
{ok, Pid} = gun:open("localhost", 12345, #{connect_timeout => 1000, retry => 0}),