From 8d51ab4e04168223b660f353b033eb3cffcee696 Mon Sep 17 00:00:00 2001 From: Denys Knertser Date: Mon, 19 Jul 2021 17:00:38 +0200 Subject: Implement gun_raw:down/1, gun_raw:update_flow/4 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Loïc: I have reworded a couple things and reordered the tests. It would be great to also test these things over proxies. --- src/gun.erl | 9 +++++++++ src/gun_raw.erl | 48 ++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 49 insertions(+), 8 deletions(-) (limited to 'src') diff --git a/src/gun.erl b/src/gun.erl index f5fad75..bd19b7c 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -141,6 +141,7 @@ http_opts => http_opts(), http2_opts => http2_opts(), protocols => protocols(), + raw_opts => raw_opts(), retry => non_neg_integer(), retry_fun => fun((non_neg_integer(), opts()) -> #{retries => non_neg_integer(), timeout => pos_integer()}), @@ -188,6 +189,7 @@ -export_type([tunnel_info/0]). -type raw_opts() :: #{ + flow => pos_integer(), %% Internal. tunnel_transport => tcp | tls }. @@ -369,6 +371,13 @@ check_options([Opt = {protocols, L}|Opts]) when is_list(L) -> ok -> check_options(Opts); error -> {error, {options, Opt}} end; +check_options([{raw_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) -> + case gun_raw:check_options(ProtoOpts) of + ok -> + check_options(Opts); + Error -> + Error + end; check_options([{retry, R}|Opts]) when is_integer(R), R >= 0 -> check_options(Opts); check_options([{retry_fun, F}|Opts]) when is_function(F, 2) -> diff --git a/src/gun_raw.erl b/src/gun_raw.erl index 480d6bc..c5fbf45 100644 --- a/src/gun_raw.erl +++ b/src/gun_raw.erl @@ -20,21 +20,29 @@ -export([has_keepalive/0]). -export([init/4]). -export([handle/5]). +-export([update_flow/4]). -export([closing/4]). -export([close/4]). -export([data/7]). -%% @todo down +-export([down/1]). -record(raw_state, { ref :: undefined | gun:stream_ref(), reply_to :: pid(), socket :: inet:socket() | ssl:sslsocket(), - transport :: module() + transport :: module(), + flow :: integer() | infinity }). -%% @todo Reject ALL options. -check_options(_) -> - ok. +check_options(Opts) -> + do_check_options(maps:to_list(Opts)). + +do_check_options([]) -> + ok; +do_check_options([{flow, Flow}|Opts]) when is_integer(Flow); Flow == infinity -> + do_check_options(Opts); +do_check_options([Opt|_]) -> + {error, {options, {raw, Opt}}}. name() -> raw. opts_name() -> raw_opts. @@ -42,12 +50,32 @@ has_keepalive() -> false. init(ReplyTo, Socket, Transport, Opts) -> StreamRef = maps:get(stream_ref, Opts, undefined), - {ok, connected_data_only, #raw_state{ref=StreamRef, reply_to=ReplyTo, socket=Socket, transport=Transport}}. + InitialFlow = maps:get(flow, Opts, infinity), + {ok, connected_data_only, #raw_state{ref=StreamRef, reply_to=ReplyTo, + socket=Socket, transport=Transport, flow=InitialFlow}}. -handle(Data, #raw_state{ref=StreamRef, reply_to=ReplyTo}, CookieStore, _, EvHandlerState) -> +handle(Data, State=#raw_state{ref=StreamRef, reply_to=ReplyTo, flow=Flow0}, + CookieStore, _, EvHandlerState) -> %% When we take over the entire connection there is no stream reference. ReplyTo ! {gun_data, self(), StreamRef, nofin, Data}, - {[], CookieStore, EvHandlerState}. + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 - 1 + end, + {[ + {state, State#raw_state{flow=Flow}}, + {active, Flow > 0} + ], CookieStore, EvHandlerState}. + +update_flow(State=#raw_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 + Inc + end, + [ + {state, State#raw_state{flow=Flow}}, + {active, Flow > 0} + ]. %% We can always close immediately. closing(_, _, _, EvHandlerState) -> @@ -63,3 +91,7 @@ data(#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef, ok -> {[], EvHandlerState}; Error={error, _} -> {Error, EvHandlerState} end. + +%% raw has no concept of streams. +down(_) -> + []. -- cgit v1.2.3