From 8d51ab4e04168223b660f353b033eb3cffcee696 Mon Sep 17 00:00:00 2001 From: Denys Knertser Date: Mon, 19 Jul 2021 17:00:38 +0200 Subject: Implement gun_raw:down/1, gun_raw:update_flow/4 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Loïc: I have reworded a couple things and reordered the tests. It would be great to also test these things over proxies. --- src/gun.erl | 9 ++++++++ src/gun_raw.erl | 48 +++++++++++++++++++++++++++++++++++-------- test/raw_SUITE.erl | 60 ++++++++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 103 insertions(+), 14 deletions(-) diff --git a/src/gun.erl b/src/gun.erl index f5fad75..bd19b7c 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -141,6 +141,7 @@ http_opts => http_opts(), http2_opts => http2_opts(), protocols => protocols(), + raw_opts => raw_opts(), retry => non_neg_integer(), retry_fun => fun((non_neg_integer(), opts()) -> #{retries => non_neg_integer(), timeout => pos_integer()}), @@ -188,6 +189,7 @@ -export_type([tunnel_info/0]). -type raw_opts() :: #{ + flow => pos_integer(), %% Internal. tunnel_transport => tcp | tls }. @@ -369,6 +371,13 @@ check_options([Opt = {protocols, L}|Opts]) when is_list(L) -> ok -> check_options(Opts); error -> {error, {options, Opt}} end; +check_options([{raw_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) -> + case gun_raw:check_options(ProtoOpts) of + ok -> + check_options(Opts); + Error -> + Error + end; check_options([{retry, R}|Opts]) when is_integer(R), R >= 0 -> check_options(Opts); check_options([{retry_fun, F}|Opts]) when is_function(F, 2) -> diff --git a/src/gun_raw.erl b/src/gun_raw.erl index 480d6bc..c5fbf45 100644 --- a/src/gun_raw.erl +++ b/src/gun_raw.erl @@ -20,21 +20,29 @@ -export([has_keepalive/0]). -export([init/4]). -export([handle/5]). +-export([update_flow/4]). -export([closing/4]). -export([close/4]). -export([data/7]). -%% @todo down +-export([down/1]). -record(raw_state, { ref :: undefined | gun:stream_ref(), reply_to :: pid(), socket :: inet:socket() | ssl:sslsocket(), - transport :: module() + transport :: module(), + flow :: integer() | infinity }). -%% @todo Reject ALL options. -check_options(_) -> - ok. +check_options(Opts) -> + do_check_options(maps:to_list(Opts)). + +do_check_options([]) -> + ok; +do_check_options([{flow, Flow}|Opts]) when is_integer(Flow); Flow == infinity -> + do_check_options(Opts); +do_check_options([Opt|_]) -> + {error, {options, {raw, Opt}}}. name() -> raw. opts_name() -> raw_opts. @@ -42,12 +50,32 @@ has_keepalive() -> false. init(ReplyTo, Socket, Transport, Opts) -> StreamRef = maps:get(stream_ref, Opts, undefined), - {ok, connected_data_only, #raw_state{ref=StreamRef, reply_to=ReplyTo, socket=Socket, transport=Transport}}. + InitialFlow = maps:get(flow, Opts, infinity), + {ok, connected_data_only, #raw_state{ref=StreamRef, reply_to=ReplyTo, + socket=Socket, transport=Transport, flow=InitialFlow}}. -handle(Data, #raw_state{ref=StreamRef, reply_to=ReplyTo}, CookieStore, _, EvHandlerState) -> +handle(Data, State=#raw_state{ref=StreamRef, reply_to=ReplyTo, flow=Flow0}, + CookieStore, _, EvHandlerState) -> %% When we take over the entire connection there is no stream reference. ReplyTo ! {gun_data, self(), StreamRef, nofin, Data}, - {[], CookieStore, EvHandlerState}. + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 - 1 + end, + {[ + {state, State#raw_state{flow=Flow}}, + {active, Flow > 0} + ], CookieStore, EvHandlerState}. + +update_flow(State=#raw_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 + Inc + end, + [ + {state, State#raw_state{flow=Flow}}, + {active, Flow > 0} + ]. %% We can always close immediately. closing(_, _, _, EvHandlerState) -> @@ -63,3 +91,7 @@ data(#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef, ok -> {[], EvHandlerState}; Error={error, _} -> {Error, EvHandlerState} end. + +%% raw has no concept of streams. +down(_) -> + []. diff --git a/test/raw_SUITE.erl b/test/raw_SUITE.erl index 67fa542..0d6d270 100644 --- a/test/raw_SUITE.erl +++ b/test/raw_SUITE.erl @@ -30,24 +30,47 @@ groups() -> direct_raw_tcp(_) -> doc("Directly connect to a remote endpoint using the raw protocol over TCP."), - do_direct_raw(tcp). + do_direct_raw(tcp, flow_control_disabled, client_side_close). direct_raw_tls(_) -> doc("Directly connect to a remote endpoint using the raw protocol over TLS."), - do_direct_raw(tls). + do_direct_raw(tls, flow_control_disabled, client_side_close). -do_direct_raw(OriginTransport) -> +direct_raw_tcp_with_flow_control(_) -> + doc("Directly connect to a remote endpoint using the raw protocol over TCP " + "with flow control enabled."), + do_direct_raw(tcp, flow_control_enabled, client_side_close). + +direct_raw_tls_with_flow_control(_) -> + doc("Directly connect to a remote endpoint using the raw protocol over TLS " + "with flow control enabled."), + do_direct_raw(tls, flow_control_enabled, client_side_close). + +direct_raw_tcp_with_server_side_close(_) -> + doc("Directly connect to a remote endpoint using the raw protocol over TCP " + "with server-side close."), + do_direct_raw(tcp, flow_control_disabled, server_side_close). + +direct_raw_tls_with_server_side_close(_) -> + doc("Directly connect to a remote endpoint using the raw protocol over TLS " + "with server-side close."), + do_direct_raw(tls, flow_control_disabled, server_side_close). + +do_direct_raw(OriginTransport, FlowControl, CloseSide) -> {ok, OriginPid, OriginPort} = init_origin(OriginTransport, raw, fun do_echo/3), - {ok, ConnPid} = gun:open("localhost", OriginPort, #{ + Opts0 = #{ transport => OriginTransport, tls_opts => [{verify, verify_none}, {versions, ['tlsv1.2']}], protocols => [raw] - }), + }, + Opts = do_maybe_add_flow(FlowControl, Opts0), + {ok, ConnPid} = gun:open("localhost", OriginPort, Opts), {ok, raw} = gun:await_up(ConnPid), handshake_completed = receive_from(OriginPid), %% When we take over the entire connection there is no stream reference. gun:data(ConnPid, undefined, nofin, <<"Hello world!">>), {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined), + do_flow_control(FlowControl, ConnPid), #{ transport := OriginTransport, protocol := raw, @@ -56,7 +79,30 @@ do_direct_raw(OriginTransport) -> origin_port := OriginPort, intermediaries := [] } = gun:info(ConnPid), - gun:close(ConnPid). + do_close(CloseSide, ConnPid). + +do_maybe_add_flow(flow_control_enabled, Opts) -> + Opts#{raw_opts => #{flow => 1}}; +do_maybe_add_flow(flow_control_disabled, Opts) -> + Opts. + +do_flow_control(flow_control_enabled, ConnPid) -> + gun:data(ConnPid, undefined, nofin, <<"Hello world!">>), + {error, timeout} = gun:await(ConnPid, undefined, 1000), + ok = gun:update_flow(ConnPid, undefined, 1), + {data, nofin, <<"Hello world!">>} = gun:await(ConnPid, undefined); +do_flow_control(flow_control_disabled, _ConnPid) -> + ok. + +do_close(client_side_close, ConnPid) -> + gun:close(ConnPid); +do_close(server_side_close, ConnPid) -> + gun:data(ConnPid, undefined, nofin, <<"close">>), + receive + {gun_down, ConnPid, raw, closed, []} -> ok + after + 1000 -> error(timeout) + end. socks5_tcp_raw_tcp(_) -> doc("Use Socks5 over TCP to connect to a remote endpoint using the raw protocol over TCP."), @@ -335,6 +381,8 @@ do_http2_connect_raw(OriginTransport, ProxyScheme, ProxyTransport) -> do_echo(Parent, ClientSocket, ClientTransport) -> case ClientTransport:recv(ClientSocket, 0, 5000) of + {ok, <<"close">>} -> + ok = ClientTransport:close(ClientSocket); {ok, Data} -> ClientTransport:send(ClientSocket, Data), do_echo(Parent, ClientSocket, ClientTransport); -- cgit v1.2.3