diff options
author | Loïc Hoguin <[email protected]> | 2019-01-09 18:12:51 +0100 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-01-09 18:12:51 +0100 |
commit | 6612610964cabadfcf408e4223a702555a3570cb (patch) | |
tree | 98598c40aa031aba05ddbe79d5858fc573bb7b5a | |
parent | ab4878838fafbd453b41d031c9224b2ee8d2d956 (diff) | |
download | gun-6612610964cabadfcf408e4223a702555a3570cb.tar.gz gun-6612610964cabadfcf408e4223a702555a3570cb.tar.bz2 gun-6612610964cabadfcf408e4223a702555a3570cb.zip |
Add function gun:stream_info/2
-rw-r--r-- | doc/src/manual/gun.asciidoc | 1 | ||||
-rw-r--r-- | doc/src/manual/gun.cancel.asciidoc | 3 | ||||
-rw-r--r-- | doc/src/manual/gun.stream_info.asciidoc | 63 | ||||
-rw-r--r-- | src/gun.erl | 12 | ||||
-rw-r--r-- | src/gun_http.erl | 16 | ||||
-rw-r--r-- | src/gun_http2.erl | 13 | ||||
-rw-r--r-- | test/gun_SUITE.erl | 60 |
7 files changed, 166 insertions, 2 deletions
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc index 2726564..95ca2ad 100644 --- a/doc/src/manual/gun.asciidoc +++ b/doc/src/manual/gun.asciidoc @@ -47,6 +47,7 @@ Messages: Streams: * link:man:gun:cancel(3)[gun:cancel(3)] - Cancel the given stream +* link:man:gun:stream_info(3)[gun:stream_info(3)] - Obtain information about a stream Websocket: diff --git a/doc/src/manual/gun.cancel.asciidoc b/doc/src/manual/gun.cancel.asciidoc index bc2bab9..fd86eba 100644 --- a/doc/src/manual/gun.cancel.asciidoc +++ b/doc/src/manual/gun.cancel.asciidoc @@ -65,4 +65,5 @@ link:man:gun:patch(3)[gun:patch(3)], link:man:gun:post(3)[gun:post(3)], link:man:gun:put(3)[gun:put(3)], link:man:gun:delete(3)[gun:delete(3)], -link:man:gun:request(3)[gun:request(3)] +link:man:gun:request(3)[gun:request(3)], +link:man:gun:stream_info(3)[gun:stream_info(3)] diff --git a/doc/src/manual/gun.stream_info.asciidoc b/doc/src/manual/gun.stream_info.asciidoc new file mode 100644 index 0000000..8f38020 --- /dev/null +++ b/doc/src/manual/gun.stream_info.asciidoc @@ -0,0 +1,63 @@ += gun:stream_info(3) + +== Name + +gun:stream_info - Obtain information about a stream + +== Description + +[source,erlang] +---- +stream_info(ConnPid, StreamRef) -> {ok, undefined | Info} | {error, not_connected} + +ConnPid :: pid() +StreamRef :: reference() +Info :: #{ + ref => reference(), + reply_to => pid(), + state => running | stopping +} +---- + +Obtain information about a stream. + +== Arguments + +ConnPid:: + +The pid of the Gun connection process. + +StreamRef:: + +Identifier of the stream for the original request. + +== Return value + +A map is returned containing various informations about +the stream. + +== Changelog + +* *2.0*: Function introduced. + +== Examples + +.Obtain information about a stream +[source,erlang] +---- +Info = gun:stream_info(ConnPid, StreamRef). +---- + +== See also + +link:man:gun(3)[gun(3)], +link:man:gun:get(3)[gun:get(3)], +link:man:gun:head(3)[gun:head(3)], +link:man:gun:options(3)[gun:options(3)], +link:man:gun:patch(3)[gun:patch(3)], +link:man:gun:post(3)[gun:post(3)], +link:man:gun:put(3)[gun:put(3)], +link:man:gun:delete(3)[gun:delete(3)], +link:man:gun:headers(3)[gun:headers(3)], +link:man:gun:request(3)[gun:request(3)], +link:man:gun:cancel(3)[gun:cancel(3)] diff --git a/src/gun.erl b/src/gun.erl index 75092f1..1fd7e37 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -78,8 +78,9 @@ %% Flushing gun messages. -export([flush/1]). -%% Cancelling a stream. +%% Streams. -export([cancel/2]). +-export([stream_info/2]). %% Websocket. -export([ws_upgrade/2]). @@ -643,6 +644,10 @@ flush_ref(StreamRef) -> cancel(ServerPid, StreamRef) -> gen_statem:cast(ServerPid, {cancel, self(), StreamRef}). +-spec stream_info(pid(), reference()) -> {ok, map() | undefined} | {error, not_connected}. +stream_info(ServerPid, StreamRef) -> + gen_statem:call(ServerPid, {stream_info, StreamRef}). + %% @todo Allow upgrading an HTTP/1.1 connection to HTTP/2. %% http2_upgrade @@ -725,6 +730,8 @@ not_connected(_, {retries, Retries}, {keep_state, State, {state_timeout, Timeout, {retries, Retries - 1}}} end; +not_connected({call, From}, {stream_info, _}, _) -> + {keep_state_and_data, {reply, From, {error, not_connected}}}; not_connected(Type, Event, State) -> handle_common(Type, Event, ?FUNCTION_NAME, State). @@ -843,6 +850,9 @@ connected(cast, {ws_send, ReplyTo, _}, _) -> "Connection needs to be upgraded to Websocket " "before the gun:ws_send/1 function can be used."}}, keep_state_and_data; +connected({call, From}, {stream_info, StreamRef}, + #state{protocol=Protocol, protocol_state=ProtoState}) -> + {keep_state_and_data, {reply, From, Protocol:stream_info(ProtoState, StreamRef)}}; connected(Type, Event, State) -> handle_common(Type, Event, ?FUNCTION_NAME, State). diff --git a/src/gun_http.erl b/src/gun_http.erl index 376f431..719307c 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -25,6 +25,7 @@ -export([data/5]). -export([connect/5]). -export([cancel/3]). +-export([stream_info/2]). -export([down/1]). -export([ws_upgrade/7]). @@ -473,6 +474,21 @@ cancel(State, StreamRef, ReplyTo) -> error_stream_not_found(State, StreamRef, ReplyTo) end. +stream_info(#http_state{streams=Streams}, StreamRef) -> + case lists:keyfind(StreamRef, #stream.ref, Streams) of + #stream{reply_to=ReplyTo, is_alive=IsAlive} -> + {ok, #{ + ref => StreamRef, + reply_to => ReplyTo, + state => case IsAlive of + true -> running; + false -> stopping + end + }}; + false -> + {ok, undefined} + end. + %% HTTP does not provide any way to figure out what streams are unprocessed. down(#http_state{streams=Streams}) -> KilledStreams = [case Ref of diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 9159d78..8072a00 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -24,6 +24,7 @@ -export([request/9]). -export([data/5]). -export([cancel/3]). +-export([stream_info/2]). -export([down/1]). -record(stream, { @@ -371,6 +372,18 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, error_stream_not_found(State, StreamRef, ReplyTo) end. +stream_info(State, StreamRef) -> + case get_stream_by_ref(State, StreamRef) of + #stream{reply_to=ReplyTo} -> + {ok, #{ + ref => StreamRef, + reply_to => ReplyTo, + state => running + }}; + false -> + {ok, undefined} + end. + %% @todo Add unprocessed streams when GOAWAY handling is done. down(#http2_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl index dd7b083..acaee64 100644 --- a/test/gun_SUITE.erl +++ b/test/gun_SUITE.erl @@ -52,6 +52,66 @@ atom_hostname(_) -> [<<"host: localhost:", _/bits>>] = [L || <<"host: ", _/bits>> = L <- Lines], gun:close(Pid). +stream_info_http(_) -> + doc("Ensure the function gun:stream_info/2 works as expected for HTTP/1.1."), + {ok, _, OriginPort} = init_origin(tcp, http, + fun(_, ClientSocket, ClientTransport) -> + %% Give some time to detect the cancel. + timer:sleep(100), + %% Then terminate the stream. + ClientTransport:send(ClientSocket, + "HTTP/1.1 200 OK\r\n" + "content-length: 0\r\n" + "\r\n" + ), + timer:sleep(200) + end), + {ok, Pid} = gun:open("localhost", OriginPort), + {ok, http} = gun:await_up(Pid), + {ok, undefined} = gun:stream_info(Pid, make_ref()), + StreamRef = gun:get(Pid, "/"), + Self = self(), + {ok, #{ + ref := StreamRef, + reply_to := Self, + state := running + }} = gun:stream_info(Pid, StreamRef), + gun:cancel(Pid, StreamRef), + {ok, #{ + ref := StreamRef, + reply_to := Self, + state := stopping + }} = gun:stream_info(Pid, StreamRef), + %% Wait a little for the stream to terminate. + timer:sleep(200), + {ok, undefined} = gun:stream_info(Pid, StreamRef), + %% Wait a little more for the connection to terminate. + timer:sleep(200), + {error, not_connected} = gun:stream_info(Pid, StreamRef), + gun:close(Pid). + +stream_info_http2(_) -> + doc("Ensure the function gun:stream_info/2 works as expected for HTTP/2."), + {ok, _, OriginPort} = init_origin(tcp, http2, + fun(_, _, _) -> timer:sleep(100) end), + {ok, Pid} = gun:open("localhost", OriginPort, #{ + protocols => [http2] + }), + {ok, http2} = gun:await_up(Pid), + {ok, undefined} = gun:stream_info(Pid, make_ref()), + StreamRef = gun:get(Pid, "/"), + Self = self(), + {ok, #{ + ref := StreamRef, + reply_to := Self, + state := running + }} = gun:stream_info(Pid, StreamRef), + gun:cancel(Pid, StreamRef), + %% Wait a little for the connection to terminate. + timer:sleep(200), + {error, not_connected} = gun:stream_info(Pid, StreamRef), + gun:close(Pid). + connect_timeout(_) -> doc("Ensure an integer value for connect_timeout is accepted."), {ok, Pid} = gun:open("localhost", 12345, #{connect_timeout => 1000, retry => 0}), |