From 35d9f47fbcff23395c256b7814ce6af1d85129a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 26 Aug 2020 17:19:29 +0200 Subject: Add a gun:stream_ref() type and fix Dialyzer --- src/gun.erl | 100 ++++++++++++++++++++++++++------------------------ src/gun_http.erl | 8 ++-- src/gun_http2.erl | 5 ++- src/gun_protocols.erl | 6 +++ src/gun_raw.erl | 2 +- src/gun_socks.erl | 2 +- 6 files changed, 68 insertions(+), 55 deletions(-) (limited to 'src') diff --git a/src/gun.erl b/src/gun.erl index 919d481..02b7302 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -122,10 +122,16 @@ | {close, ws_close_code(), iodata()}. -export_type([ws_frame/0]). --type protocols() :: [http | http2 | raw | socks - | {http, http_opts()} | {http2, http2_opts()} | {raw, raw_opts()} | {socks, socks_opts()}]. +-type protocol() :: http | http2 | raw | socks + | {http, http_opts()} | {http2, http2_opts()} | {raw, raw_opts()} | {socks, socks_opts()}. +-export_type([protocol/0]). + +-type protocols() :: [protocol()]. -export_type([protocols/0]). +-type stream_ref() :: reference() | [reference()]. +-export_type([stream_ref/0]). + -type opts() :: #{ connect_timeout => timeout(), cookie_ignore_informational => boolean(), @@ -166,7 +172,7 @@ type := connect | socks5, host := inet:hostname() | inet:ip_address(), port := inet:port_number(), - transport := tcp | tls, + transport := tcp | tls | tls_proxy, protocol := http | socks }. @@ -197,7 +203,7 @@ -type req_opts() :: #{ flow => pos_integer(), reply_to => pid(), - tunnel => reference() | [reference()] + tunnel => stream_ref() }. -export_type([req_opts/0]). @@ -501,93 +507,93 @@ shutdown(ServerPid) -> %% Requests. --spec delete(pid(), iodata()) -> reference(). +-spec delete(pid(), iodata()) -> stream_ref(). delete(ServerPid, Path) -> request(ServerPid, <<"DELETE">>, Path, [], <<>>). --spec delete(pid(), iodata(), req_headers()) -> reference(). +-spec delete(pid(), iodata(), req_headers()) -> stream_ref(). delete(ServerPid, Path, Headers) -> request(ServerPid, <<"DELETE">>, Path, Headers, <<>>). --spec delete(pid(), iodata(), req_headers(), req_opts()) -> reference(). +-spec delete(pid(), iodata(), req_headers(), req_opts()) -> stream_ref(). delete(ServerPid, Path, Headers, ReqOpts) -> request(ServerPid, <<"DELETE">>, Path, Headers, <<>>, ReqOpts). --spec get(pid(), iodata()) -> reference(). +-spec get(pid(), iodata()) -> stream_ref(). get(ServerPid, Path) -> request(ServerPid, <<"GET">>, Path, [], <<>>). --spec get(pid(), iodata(), req_headers()) -> reference(). +-spec get(pid(), iodata(), req_headers()) -> stream_ref(). get(ServerPid, Path, Headers) -> request(ServerPid, <<"GET">>, Path, Headers, <<>>). --spec get(pid(), iodata(), req_headers(), req_opts()) -> reference(). +-spec get(pid(), iodata(), req_headers(), req_opts()) -> stream_ref(). get(ServerPid, Path, Headers, ReqOpts) -> request(ServerPid, <<"GET">>, Path, Headers, <<>>, ReqOpts). --spec head(pid(), iodata()) -> reference(). +-spec head(pid(), iodata()) -> stream_ref(). head(ServerPid, Path) -> request(ServerPid, <<"HEAD">>, Path, [], <<>>). --spec head(pid(), iodata(), req_headers()) -> reference(). +-spec head(pid(), iodata(), req_headers()) -> stream_ref(). head(ServerPid, Path, Headers) -> request(ServerPid, <<"HEAD">>, Path, Headers, <<>>). --spec head(pid(), iodata(), req_headers(), req_opts()) -> reference(). +-spec head(pid(), iodata(), req_headers(), req_opts()) -> stream_ref(). head(ServerPid, Path, Headers, ReqOpts) -> request(ServerPid, <<"HEAD">>, Path, Headers, <<>>, ReqOpts). --spec options(pid(), iodata()) -> reference(). +-spec options(pid(), iodata()) -> stream_ref(). options(ServerPid, Path) -> request(ServerPid, <<"OPTIONS">>, Path, [], <<>>). --spec options(pid(), iodata(), req_headers()) -> reference(). +-spec options(pid(), iodata(), req_headers()) -> stream_ref(). options(ServerPid, Path, Headers) -> request(ServerPid, <<"OPTIONS">>, Path, Headers, <<>>). --spec options(pid(), iodata(), req_headers(), req_opts()) -> reference(). +-spec options(pid(), iodata(), req_headers(), req_opts()) -> stream_ref(). options(ServerPid, Path, Headers, ReqOpts) -> request(ServerPid, <<"OPTIONS">>, Path, Headers, <<>>, ReqOpts). --spec patch(pid(), iodata(), req_headers()) -> reference(). +-spec patch(pid(), iodata(), req_headers()) -> stream_ref(). patch(ServerPid, Path, Headers) -> headers(ServerPid, <<"PATCH">>, Path, Headers). --spec patch(pid(), iodata(), req_headers(), iodata() | req_opts()) -> reference(). +-spec patch(pid(), iodata(), req_headers(), iodata() | req_opts()) -> stream_ref(). patch(ServerPid, Path, Headers, ReqOpts) when is_map(ReqOpts) -> headers(ServerPid, <<"PATCH">>, Path, Headers, ReqOpts); patch(ServerPid, Path, Headers, Body) -> request(ServerPid, <<"PATCH">>, Path, Headers, Body). --spec patch(pid(), iodata(), req_headers(), iodata(), req_opts()) -> reference(). +-spec patch(pid(), iodata(), req_headers(), iodata(), req_opts()) -> stream_ref(). patch(ServerPid, Path, Headers, Body, ReqOpts) -> request(ServerPid, <<"PATCH">>, Path, Headers, Body, ReqOpts). --spec post(pid(), iodata(), req_headers()) -> reference(). +-spec post(pid(), iodata(), req_headers()) -> stream_ref(). post(ServerPid, Path, Headers) -> headers(ServerPid, <<"POST">>, Path, Headers). --spec post(pid(), iodata(), req_headers(), iodata() | req_opts()) -> reference(). +-spec post(pid(), iodata(), req_headers(), iodata() | req_opts()) -> stream_ref(). post(ServerPid, Path, Headers, ReqOpts) when is_map(ReqOpts) -> headers(ServerPid, <<"POST">>, Path, Headers, ReqOpts); post(ServerPid, Path, Headers, Body) -> request(ServerPid, <<"POST">>, Path, Headers, Body). --spec post(pid(), iodata(), req_headers(), iodata(), req_opts()) -> reference(). +-spec post(pid(), iodata(), req_headers(), iodata(), req_opts()) -> stream_ref(). post(ServerPid, Path, Headers, Body, ReqOpts) -> request(ServerPid, <<"POST">>, Path, Headers, Body, ReqOpts). --spec put(pid(), iodata(), req_headers()) -> reference(). +-spec put(pid(), iodata(), req_headers()) -> stream_ref(). put(ServerPid, Path, Headers) -> headers(ServerPid, <<"PUT">>, Path, Headers). --spec put(pid(), iodata(), req_headers(), iodata() | req_opts()) -> reference(). +-spec put(pid(), iodata(), req_headers(), iodata() | req_opts()) -> stream_ref(). put(ServerPid, Path, Headers, ReqOpts) when is_map(ReqOpts) -> headers(ServerPid, <<"PUT">>, Path, Headers, ReqOpts); put(ServerPid, Path, Headers, Body) -> request(ServerPid, <<"PUT">>, Path, Headers, Body). --spec put(pid(), iodata(), req_headers(), iodata(), req_opts()) -> reference(). +-spec put(pid(), iodata(), req_headers(), iodata(), req_opts()) -> stream_ref(). put(ServerPid, Path, Headers, Body, ReqOpts) -> request(ServerPid, <<"PUT">>, Path, Headers, Body, ReqOpts). @@ -595,11 +601,11 @@ put(ServerPid, Path, Headers, Body, ReqOpts) -> %% %% @todo Accept a TargetURI map as well as a normal Path. --spec headers(pid(), iodata(), iodata(), req_headers()) -> reference(). +-spec headers(pid(), iodata(), iodata(), req_headers()) -> stream_ref(). headers(ServerPid, Method, Path, Headers) -> headers(ServerPid, Method, Path, Headers, #{}). --spec headers(pid(), iodata(), iodata(), req_headers(), req_opts()) -> reference(). +-spec headers(pid(), iodata(), iodata(), req_headers(), req_opts()) -> stream_ref(). headers(ServerPid, Method, Path, Headers0, ReqOpts) -> Tunnel = get_tunnel(ReqOpts), StreamRef = make_stream_ref(Tunnel), @@ -609,11 +615,11 @@ headers(ServerPid, Method, Path, Headers0, ReqOpts) -> Method, Path, normalize_headers(Headers0), InitialFlow}), StreamRef. --spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> reference(). +-spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> stream_ref(). request(ServerPid, Method, Path, Headers, Body) -> request(ServerPid, Method, Path, Headers, Body, #{}). --spec request(pid(), iodata(), iodata(), req_headers(), iodata(), req_opts()) -> reference(). +-spec request(pid(), iodata(), iodata(), req_headers(), iodata(), req_opts()) -> stream_ref(). request(ServerPid, Method, Path, Headers, Body, ReqOpts) -> Tunnel = get_tunnel(ReqOpts), StreamRef = make_stream_ref(Tunnel), @@ -646,7 +652,7 @@ normalize_headers(Headers) when is_map(Headers) -> %% Streaming data. --spec data(pid(), reference(), fin | nofin, iodata()) -> ok. +-spec data(pid(), stream_ref(), fin | nofin, iodata()) -> ok. data(ServerPid, StreamRef, IsFin, Data) -> case iolist_size(Data) of 0 when IsFin =:= nofin -> @@ -657,15 +663,15 @@ data(ServerPid, StreamRef, IsFin, Data) -> %% Tunneling. --spec connect(pid(), connect_destination()) -> reference(). +-spec connect(pid(), connect_destination()) -> stream_ref(). connect(ServerPid, Destination) -> connect(ServerPid, Destination, [], #{}). --spec connect(pid(), connect_destination(), req_headers()) -> reference(). +-spec connect(pid(), connect_destination(), req_headers()) -> stream_ref(). connect(ServerPid, Destination, Headers) -> connect(ServerPid, Destination, Headers, #{}). --spec connect(pid(), connect_destination(), req_headers(), req_opts()) -> reference(). +-spec connect(pid(), connect_destination(), req_headers(), req_opts()) -> stream_ref(). connect(ServerPid, Destination, Headers, ReqOpts) -> Tunnel = get_tunnel(ReqOpts), StreamRef = make_stream_ref(Tunnel), @@ -683,19 +689,19 @@ connect(ServerPid, Destination, Headers, ReqOpts) -> | {data, fin | nofin, binary()} | {sse, cow_sse:event() | fin} | {trailers, resp_headers()} - | {push, reference(), binary(), binary(), resp_headers()} + | {push, stream_ref(), binary(), binary(), resp_headers()} | {upgrade, [binary()], resp_headers()} | {ws, ws_frame()} | {error, {stream_error | connection_error | down, any()} | timeout}. --spec await(pid(), reference()) -> await_result(). +-spec await(pid(), stream_ref()) -> await_result(). await(ServerPid, StreamRef) -> MRef = monitor(process, ServerPid), Res = await(ServerPid, StreamRef, 5000, MRef), demonitor(MRef, [flush]), Res. --spec await(pid(), reference(), timeout() | reference()) -> await_result(). +-spec await(pid(), stream_ref(), timeout() | reference()) -> await_result(). await(ServerPid, StreamRef, MRef) when is_reference(MRef) -> await(ServerPid, StreamRef, 5000, MRef); await(ServerPid, StreamRef, Timeout) -> @@ -704,7 +710,7 @@ await(ServerPid, StreamRef, Timeout) -> demonitor(MRef, [flush]), Res. --spec await(pid(), reference(), timeout(), reference()) -> await_result(). +-spec await(pid(), stream_ref(), timeout(), reference()) -> await_result(). await(ServerPid, StreamRef, Timeout, MRef) -> receive {gun_inform, ServerPid, StreamRef, Status, Headers} -> @@ -739,14 +745,14 @@ await(ServerPid, StreamRef, Timeout, MRef) -> | {ok, binary(), resp_headers()} | {error, {stream_error | connection_error | down, any()} | timeout}. --spec await_body(pid(), reference()) -> await_body_result(). +-spec await_body(pid(), stream_ref()) -> await_body_result(). await_body(ServerPid, StreamRef) -> MRef = monitor(process, ServerPid), Res = await_body(ServerPid, StreamRef, 5000, MRef, <<>>), demonitor(MRef, [flush]), Res. --spec await_body(pid(), reference(), timeout() | reference()) -> await_body_result(). +-spec await_body(pid(), stream_ref(), timeout() | reference()) -> await_body_result(). await_body(ServerPid, StreamRef, MRef) when is_reference(MRef) -> await_body(ServerPid, StreamRef, 5000, MRef, <<>>); await_body(ServerPid, StreamRef, Timeout) -> @@ -755,7 +761,7 @@ await_body(ServerPid, StreamRef, Timeout) -> demonitor(MRef, [flush]), Res. --spec await_body(pid(), reference(), timeout(), reference()) -> await_body_result(). +-spec await_body(pid(), stream_ref(), timeout(), reference()) -> await_body_result(). await_body(ServerPid, StreamRef, Timeout, MRef) -> await_body(ServerPid, StreamRef, Timeout, MRef, <<>>). @@ -813,7 +819,7 @@ await_up(ServerPid, Timeout, MRef) -> {error, timeout} end. --spec flush(pid() | reference()) -> ok. +-spec flush(pid() | stream_ref()) -> ok. flush(ServerPid) when is_pid(ServerPid) -> flush_pid(ServerPid); flush(StreamRef) -> @@ -873,19 +879,19 @@ flush_ref(StreamRef) -> %% Flow control. --spec update_flow(pid(), reference(), pos_integer()) -> ok. +-spec update_flow(pid(), stream_ref(), pos_integer()) -> ok. update_flow(ServerPid, StreamRef, Flow) -> gen_statem:cast(ServerPid, {update_flow, self(), StreamRef, Flow}). %% Cancelling a stream. --spec cancel(pid(), reference()) -> ok. +-spec cancel(pid(), stream_ref()) -> ok. cancel(ServerPid, StreamRef) -> gen_statem:cast(ServerPid, {cancel, self(), StreamRef}). %% Information about a stream. --spec stream_info(pid(), reference()) -> {ok, map() | undefined} | {error, not_connected}. +-spec stream_info(pid(), stream_ref()) -> {ok, map() | undefined} | {error, not_connected}. stream_info(ServerPid, StreamRef) -> gen_statem:call(ServerPid, {stream_info, StreamRef}). @@ -894,17 +900,17 @@ stream_info(ServerPid, StreamRef) -> %% Websocket. --spec ws_upgrade(pid(), iodata()) -> reference(). +-spec ws_upgrade(pid(), iodata()) -> stream_ref(). ws_upgrade(ServerPid, Path) -> ws_upgrade(ServerPid, Path, []). --spec ws_upgrade(pid(), iodata(), req_headers()) -> reference(). +-spec ws_upgrade(pid(), iodata(), req_headers()) -> stream_ref(). ws_upgrade(ServerPid, Path, Headers) -> StreamRef = make_ref(), gen_statem:cast(ServerPid, {ws_upgrade, self(), StreamRef, Path, Headers}), StreamRef. --spec ws_upgrade(pid(), iodata(), req_headers(), ws_opts()) -> reference(). +-spec ws_upgrade(pid(), iodata(), req_headers(), ws_opts()) -> stream_ref(). ws_upgrade(ServerPid, Path, Headers, Opts) -> ok = gun_ws:check_options(Opts), StreamRef = make_ref(), diff --git a/src/gun_http.erl b/src/gun_http.erl index 2536369..490f025 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -41,10 +41,10 @@ -type io() :: head | {body, non_neg_integer()} | body_close | body_chunked | body_trailer. %% @todo Make that a record. --type connect_info() :: {connect, reference(), gun:connect_destination()}. +-type connect_info() :: {connect, gun:stream_ref(), gun:connect_destination()}. -record(websocket, { - ref :: reference(), + ref :: gun:stream_ref(), reply_to :: pid(), key :: binary(), extensions :: [binary()], @@ -52,7 +52,7 @@ }). -record(stream, { - ref :: reference() | connect_info() | #websocket{}, + ref :: gun:stream_ref() | connect_info() | #websocket{}, reply_to :: pid(), flow :: integer() | infinity, method :: binary(), @@ -75,7 +75,7 @@ %% Base stream ref, defined when the protocol runs %% inside an HTTP/2 CONNECT stream. - base_stream_ref = undefined :: undefined | reference() | [reference()], + base_stream_ref = undefined :: undefined | gun:stream_ref(), streams = [] :: [#stream{}], in = head :: io(), diff --git a/src/gun_http2.erl b/src/gun_http2.erl index d8a84b4..6863b84 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -41,6 +41,7 @@ id = undefined :: cow_http2:streamid(), %% Reference used by the user of Gun to refer to this stream. + %% This may be only a part of a stream_ref() for tunneled streams. ref :: reference(), %% Process to send messages to. @@ -72,7 +73,7 @@ %% Base stream ref, defined when the protocol runs %% inside an HTTP/2 CONNECT stream. - base_stream_ref = undefined :: undefined | reference() | [reference()], + base_stream_ref = undefined :: undefined | gun:stream_ref(), %% Current status of the connection. We use this to ensure we are %% not sending the GOAWAY frame more than once, and to validate @@ -1050,7 +1051,7 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, Authority = [Host, $:, integer_to_binary(Port)], PseudoHeaders = #{ method => <<"CONNECT">>, - authority => Authority + authority => iolist_to_binary(Authority) }, Headers1 = lists:keydelete(<<"host">>, 1, diff --git a/src/gun_protocols.erl b/src/gun_protocols.erl index e5c3c93..4209641 100644 --- a/src/gun_protocols.erl +++ b/src/gun_protocols.erl @@ -19,6 +19,8 @@ -export([handler_and_opts/2]). -export([negotiated/2]). +-spec add_stream_ref(Protocol, undefined | gun:stream_ref()) + -> Protocol when Protocol :: gun:protocol(). add_stream_ref(Protocol, undefined) -> Protocol; add_stream_ref({ProtocolName, ProtocolOpts}, StreamRef) -> @@ -26,6 +28,7 @@ add_stream_ref({ProtocolName, ProtocolOpts}, StreamRef) -> add_stream_ref(ProtocolName, StreamRef) -> {ProtocolName, #{stream_ref => StreamRef}}. +-spec handler(gun:protocol()) -> module(). handler(http) -> gun_http; handler({http, _}) -> gun_http; handler(http2) -> gun_http2; @@ -37,12 +40,15 @@ handler({socks, _}) -> gun_socks; handler(ws) -> gun_ws; handler({ws, _}) -> gun_ws. +-spec handler_and_opts(gun:protocol(), map()) -> {module(), map()}. handler_and_opts({ProtocolName, ProtocolOpts}, _) -> {handler(ProtocolName), ProtocolOpts}; handler_and_opts(ProtocolName, Opts) -> Protocol = handler(ProtocolName), {Protocol, maps:get(Protocol:opts_name(), Opts, #{})}. +-spec negotiated({ok, binary()} | {error, protocol_not_negotiated}, gun:protocols()) + -> http | http2 | raw | socks. negotiated({ok, <<"h2">>}, _) -> http2; negotiated({ok, <<"http/1.1">>}, _) -> http; negotiated({error, protocol_not_negotiated}, [Protocol]) -> Protocol; diff --git a/src/gun_raw.erl b/src/gun_raw.erl index 840774a..8d0aa42 100644 --- a/src/gun_raw.erl +++ b/src/gun_raw.erl @@ -26,7 +26,7 @@ %% @todo down -record(raw_state, { - ref :: reference() | undefined, + ref :: undefined | gun:stream_ref(), reply_to :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module() diff --git a/src/gun_socks.erl b/src/gun_socks.erl index ce91f93..cf5c41c 100644 --- a/src/gun_socks.erl +++ b/src/gun_socks.erl @@ -26,7 +26,7 @@ %% @todo down -record(socks_state, { - ref :: undefined | reference(), %% @todo Need a proper stream_ref type. + ref :: undefined | gun:stream_ref(), reply_to :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), -- cgit v1.2.3