aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/gun.erl9
-rw-r--r--src/gun_raw.erl48
-rw-r--r--test/raw_SUITE.erl60
3 files changed, 103 insertions, 14 deletions
diff --git a/src/gun.erl b/src/gun.erl
index f5fad75..bd19b7c 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -141,6 +141,7 @@
http_opts => http_opts(),
http2_opts => http2_opts(),
protocols => protocols(),
+ raw_opts => raw_opts(),
retry => non_neg_integer(),
retry_fun => fun((non_neg_integer(), opts())
-> #{retries => non_neg_integer(), timeout => pos_integer()}),
@@ -188,6 +189,7 @@
-export_type([tunnel_info/0]).
-type raw_opts() :: #{
+ flow => pos_integer(),
%% Internal.
tunnel_transport => tcp | tls
}.
@@ -369,6 +371,13 @@ check_options([Opt = {protocols, L}|Opts]) when is_list(L) ->
ok -> check_options(Opts);
error -> {error, {options, Opt}}
end;
+check_options([{raw_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) ->
+ case gun_raw:check_options(ProtoOpts) of
+ ok ->
+ check_options(Opts);
+ Error ->
+ Error
+ end;
check_options([{retry, R}|Opts]) when is_integer(R), R >= 0 ->
check_options(Opts);
check_options([{retry_fun, F}|Opts]) when is_function(F, 2) ->
diff --git a/src/gun_raw.erl b/src/gun_raw.erl
index 480d6bc..c5fbf45 100644
--- a/src/gun_raw.erl
+++ b/src/gun_raw.erl
@@ -20,21 +20,29 @@
-export([has_keepalive/0]).
-export([init/4]).
-export([handle/5]).
+-export([update_flow/4]).
-export([closing/4]).
-export([close/4]).
-export([data/7]).
-%% @todo down
+-export([down/1]).
-record(raw_state, {
ref :: undefined | gun:stream_ref(),
reply_to :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
- transport :: module()
+ transport :: module(),
+ flow :: integer() | infinity
}).
-%% @todo Reject ALL options.
-check_options(_) ->
- ok.
+check_options(Opts) ->
+ do_check_options(maps:to_list(Opts)).
+
+do_check_options([]) ->
+ ok;
+do_check_options([{flow, Flow}|Opts]) when is_integer(Flow); Flow == infinity ->
+ do_check_options(Opts);
+do_check_options([Opt|_]) ->
+ {error, {options, {raw, Opt}}}.
name() -> raw.
opts_name() -> raw_opts.
@@ -42,12 +50,32 @@ has_keepalive() -> false.
init(ReplyTo, Socket, Transport, Opts) ->
StreamRef = maps:get(stream_ref, Opts, undefined),
- {ok, connected_data_only, #raw_state{ref=StreamRef, reply_to=ReplyTo, socket=Socket, transport=Transport}}.
+ InitialFlow = maps:get(flow, Opts, infinity),
+ {ok, connected_data_only, #raw_state{ref=StreamRef, reply_to=ReplyTo,
+ socket=Socket, transport=Transport, flow=InitialFlow}}.
-handle(Data, #raw_state{ref=StreamRef, reply_to=ReplyTo}, CookieStore, _, EvHandlerState) ->
+handle(Data, State=#raw_state{ref=StreamRef, reply_to=ReplyTo, flow=Flow0},
+ CookieStore, _, EvHandlerState) ->
%% When we take over the entire connection there is no stream reference.
ReplyTo ! {gun_data, self(), StreamRef, nofin, Data},
- {[], CookieStore, EvHandlerState}.
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 - 1
+ end,
+ {[
+ {state, State#raw_state{flow=Flow}},
+ {active, Flow > 0}
+ ], CookieStore, EvHandlerState}.
+
+update_flow(State=#raw_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) ->
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 + Inc
+ end,
+ [
+ {state, State#raw_state{flow=Flow}},
+ {active, Flow > 0}
+ ].
%% We can always close immediately.
closing(_, _, _, EvHandlerState) ->
@@ -63,3 +91,7 @@ data(#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef,
ok -> {[], EvHandlerState};
Error={error, _} -> {Error, EvHandlerState}
end.
+
+%% raw has no concept of streams.
+down(_) ->
+ [].
diff --git a/test/raw_SUITE.erl b/test/raw_SUITE.erl
index 67fa542..0d6d270 100644
--- a/test/raw_SUITE.erl
+++ b/test/raw_SUITE.erl
@@ -30,24 +30,47 @@ groups() ->
direct_raw_tcp(_) ->
doc("Directly connect to a remote endpoint using the raw protocol over TCP."),
- do_direct_raw(tcp).
+ do_direct_raw(tcp, flow_control_disabled, client_side_close).
direct_raw_tls(_) ->
doc("Directly connect to a remote endpoint using the raw protocol over TLS."),
- do_direct_raw(tls).
+ do_direct_raw(tls, flow_control_disabled, client_side_close).
-do_direct_raw(OriginTransport) ->
+direct_raw_tcp_with_flow_control(_) ->
+ doc("Directly connect to a remote endpoint using the raw protocol over TCP "
+ "with flow control enabled."),
+ do_direct_raw(tcp, flow_control_enabled, client_side_close).
+
+direct_raw_tls_with_flow_control(_) ->
+ doc("Directly connect to a remote endpoint using the raw protocol over TLS "
+ "with flow control enabled."),
+ do_direct_raw(tls, flow_control_enabled, client_side_close).
+
+direct_raw_tcp_with_server_side_close(_) ->
+ doc("Directly connect to a remote endpoint using the raw protocol over TCP "
+ "with server-side close."),
+ do_direct_raw(tcp, flow_control_disabled, server_side_close).
+
+direct_raw_tls_with_server_side_close(_) ->
+ doc("Directly connect to a remote endpoint using the raw protocol over TLS "
+ "with server-side close."),
+ do_direct_raw(tls, flow_control_disabled, server_side_close).
+
+do_direct_raw(OriginTransport, FlowControl, CloseSide) ->
{ok, OriginPid, OriginPort} = init_origin(OriginTransport, raw, fun do_echo/3),
- {ok, ConnPid} = gun:open("localhost", OriginPort, #{
+ Opts0 = #{
transport => OriginTransport,
tls_opts => [{verify, verify_none}, {versions, ['tlsv1.2']}],
protocols => [raw]
- }),
+ },
+ Opts = do_maybe_add_flow(FlowControl, Opts0),
+ {ok, ConnPid} = gun:open("localhost", OriginPort, Opts),
{ok, raw} = gun:await_up(ConnPid),
handshake_completed = receive_from(OriginPid),
%% When we take over the entire connection there is no stream reference.
gun:data(ConnPid, undefined, nofin, <<"Hello world!">>),
{data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined),
+ do_flow_control(FlowControl, ConnPid),
#{
transport := OriginTransport,
protocol := raw,
@@ -56,7 +79,30 @@ do_direct_raw(OriginTransport) ->
origin_port := OriginPort,
intermediaries := []
} = gun:info(ConnPid),
- gun:close(ConnPid).
+ do_close(CloseSide, ConnPid).
+
+do_maybe_add_flow(flow_control_enabled, Opts) ->
+ Opts#{raw_opts => #{flow => 1}};
+do_maybe_add_flow(flow_control_disabled, Opts) ->
+ Opts.
+
+do_flow_control(flow_control_enabled, ConnPid) ->
+ gun:data(ConnPid, undefined, nofin, <<"Hello world!">>),
+ {error, timeout} = gun:await(ConnPid, undefined, 1000),
+ ok = gun:update_flow(ConnPid, undefined, 1),
+ {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined);
+do_flow_control(flow_control_disabled, _ConnPid) ->
+ ok.
+
+do_close(client_side_close, ConnPid) ->
+ gun:close(ConnPid);
+do_close(server_side_close, ConnPid) ->
+ gun:data(ConnPid, undefined, nofin, <<"close">>),
+ receive
+ {gun_down, ConnPid, raw, closed, []} -> ok
+ after
+ 1000 -> error(timeout)
+ end.
socks5_tcp_raw_tcp(_) ->
doc("Use Socks5 over TCP to connect to a remote endpoint using the raw protocol over TCP."),
@@ -335,6 +381,8 @@ do_http2_connect_raw(OriginTransport, ProxyScheme, ProxyTransport) ->
do_echo(Parent, ClientSocket, ClientTransport) ->
case ClientTransport:recv(ClientSocket, 0, 5000) of
+ {ok, <<"close">>} ->
+ ok = ClientTransport:close(ClientSocket);
{ok, Data} ->
ClientTransport:send(ClientSocket, Data),
do_echo(Parent, ClientSocket, ClientTransport);