diff options
author | Anders Svensson <[email protected]> | 2017-03-11 21:11:48 +0100 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2017-06-11 16:30:38 +0200 |
commit | ea339754d579b7e917dab0afa9a4694689f1f990 (patch) | |
tree | 3f26802ca88d4a8a009c5d660c68193c42e00641 /lib | |
parent | 9a90f20f365a90612ee5e5f7db14fd71b4875b84 (diff) | |
download | otp-ea339754d579b7e917dab0afa9a4694689f1f990.tar.gz otp-ea339754d579b7e917dab0afa9a4694689f1f990.tar.bz2 otp-ea339754d579b7e917dab0afa9a4694689f1f990.zip |
Strip throttling callbacks from diameter_tcp
Commits starting at 472a080c added a throttle_cb option to diameter_tcp
to let a callback apply backpressure when it decides that additional
requests should not be read. It didn't provide a hook for knowing that
an answer was sent however, which is needed when sends no longer take
place in the receiver process, and is more complicated than it should
be. Strip it all away, in preparation for a simpler incarnation.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 221 |
1 files changed, 51 insertions, 170 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index f745e2a6f7..c81701b624 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -19,7 +19,6 @@ %% -module(diameter_tcp). --dialyzer({no_fail_call, throttle/2}). -behaviour(gen_server). @@ -109,22 +108,19 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} - | {fragment_timer, 0..16#FFFFFFFF} - | {throttle_cb, diameter:evaluable()}. + | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module - frag = <<>> :: frag(), %% message fragment ssl :: [term()] | boolean(), %% ssl options, ssl or not + frag = <<>> :: frag(), %% message fragment timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled :: boolean() | binary(), %% stopped receiving? - monitor :: pid()}). + monitor :: pid()}). %% monitor/sender process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -213,30 +209,26 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) %% Since accept/connect might block indefinitely, spawn a process %% that kills us with the parent until call returns, and then %% sends outgoing messages. - {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, - fragment_timer, - throttle_cb]), + fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), - Throttle = proplists:get_value(throttle_cb, OwnOpts, false), + {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), M = if SslOpts -> ssl; true -> Mod end, monitor(process, MPid), MPid ! {start, self(), Sock, M}, %% prepare monitor for sending putr(?REF_KEY, Ref), - throttle(#transport{parent = Pid, - module = M, - socket = Sock, - ssl = SslOpts, - timeout = Tmo, - throttle_cb = Throttle, - throttled = false /= Throttle, - monitor = MPid}); + setopts(#transport{parent = Pid, + module = M, + socket = Sock, + ssl = SslOpts, + timeout = Tmo, + monitor = MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -518,7 +510,7 @@ getr(Key) -> %% Transition monitor state. %% Outgoing message. -m(Bin, #monitor{} = S) +m(Bin, S) when is_binary(Bin) -> send(Bin, S), S; @@ -539,7 +531,7 @@ m({stop, TPid} = T, #monitor{transport = TPid}) -> %% Transport is telling us that TLS has been negotiated after %% capabilities exchange. -m({tls, SSock}, #monitor{} = S) -> +m({tls, SSock}, S) -> S#monitor{socket = SSock, module = ssl}; @@ -583,22 +575,14 @@ t(T,S) -> %% transition/2 -%% Incoming message. +%% Incoming packets. transition({P, Sock, Bin}, #transport{socket = Sock, - ssl = B, - throttled = T} + ssl = B} = S) when P == ssl, true == B; P == tcp -> - false = T, %% assert recv(Bin, S); -%% Make a new throttling callback after a timeout. -transition(throttle, #transport{throttled = false}) -> - ok; -transition(throttle, S) -> - throttle(S); - %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = S) -> @@ -607,7 +591,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = NS = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, - throttle(NS#transport{ssl = B}); + NS#transport{ssl = B}; transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -623,8 +607,8 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, #transport{} = S) -> - send(Bin, S), +transition({diameter, {send, Msg}}, #transport{} = S) -> + send(Msg, S), ok; %% Request to close the transport connection. @@ -703,24 +687,16 @@ tls(accept, Sock, Opts) -> %% using Nagle. %% Receive packets until a full message is received, -recv(Bin, #transport{frag = Head, throttled = false} = S) -> +recv(Bin, #transport{parent = Pid, frag = Head} = S) -> case rcv(Head, Bin) of - {Msg, B} -> - throttle(S#transport{frag = B, throttled = Msg}); - Frag -> - setopts(S), - start_fragment_timer(S#transport{frag = Frag, - flush = false}) + {Msg, B} -> %% have a complete message ... + diameter_peer:recv(Pid, Msg), + recv(<<>>, S#transport{frag = B}); + Frag -> %% read more on the socket + start_fragment_timer(setopts(S#transport{frag = Frag, + flush = false})) end. -%% recv/1 - -recv(#transport{throttled = false} = S) -> - recv(<<>>, S); - -recv(#transport{} = S) -> - S. - %% rcv/2 %% No previous fragment. @@ -780,13 +756,16 @@ recv1(Len, Bin) -> <<Msg:Len/binary, Rest/binary>> = Bin, {Msg, Rest}. -%% bin/1-2 +%% bin/2 bin(Head, Acc) -> list_to_binary([Head | lists:reverse(Acc)]). +%% bin/1 + bin({_, _, Head, Acc}) -> bin(Head, Acc); + bin(Bin) when is_binary(Bin) -> Bin. @@ -805,9 +784,7 @@ bin(Bin) %% also eventually lead to watchdog failover. %% No fragment to flush or not receiving messages. -flush(#transport{frag = Frag, throttled = B} = S) - when Frag == <<>>; - B /= false -> +flush(#transport{frag = <<>>} = S) -> S; %% Messages have been received since last timer expiry. @@ -850,14 +827,11 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #monitor{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of - ok -> - ok; - {error, Reason} -> - x({send, Reason}) - end; +send(#diameter_packet{bin = Bin}, S) -> + send(Bin, S); + +send(Bin, #monitor{} = S) -> + send1(Bin, S); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. @@ -865,6 +839,17 @@ send(Bin, #transport{monitor = MPid}) -> MPid ! Bin, MPid. +%% send1/2 + +send1(Bin, #monitor{socket = Sock, + module = M}) -> + case send(M, Sock, Bin) of + ok -> + ok; + {error, Reason} -> + x({send, Reason}) + end. + %% send/3 send(gen_tcp, Sock, Bin) -> @@ -885,116 +870,12 @@ setopts(M, Sock, Opts) -> %% setopts/1 -setopts(#transport{socket = Sock, module = M}) -> - setopts(M, Sock). - -%% setopts/2 - -setopts(M, Sock) -> +setopts(#transport{socket = Sock, + module = M} + = S)-> case setopts(M, Sock, [{active, once}]) of - ok -> ok; - X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect - end. - -%% throttle/1 - -%% Still collecting packets for a complete message: keep receiving. -throttle(#transport{throttled = false} = S) -> - recv(S); - -%% Decide whether to receive another, or whether to accept a message -%% that's been received. -throttle(#transport{throttle_cb = F, throttled = T} = S) -> - Res = cb(F, T), - - try throttle(Res, S) of - #transport{ssl = SB} = NS when is_boolean(SB) -> - throttle(defrag(NS)); - #transport{throttled = Msg} = NS when is_binary(Msg) -> - %% Initial incoming message when we might need to upgrade - %% to TLS: wait for reception of a tls tuple. - defrag(NS) - catch - #transport{} = NS -> - recv(NS) - end. - -%% cb/2 - -cb(false, _) -> - ok; - -cb(F, B) -> - diameter_lib:eval([F, true /= B andalso B]). - -%% throttle/2 - -%% Callback says to receive another message. -throttle(ok, #transport{throttled = true} = S) -> - throw(S#transport{throttled = false}); - -%% Callback says to accept a received message. -throttle(ok, #transport{parent = Pid, throttled = Msg} = S) - when is_binary(Msg) -> - diameter_peer:recv(Pid, Msg), - S; - -throttle({ok = T, F}, S) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback says to accept a received message and acknowledged the -%% returned pid with a {request, Pid} message if a request pid is -%% spawned, a discard message otherwise. The latter does not mean that -%% the message was necessarily discarded: it could have been an -%% answer. -throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - diameter_peer:recv(Pid, {Msg, NPid}), - S; - -throttle({NPid, F}, #transport{throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - throttle(NPid, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to discard it. -throttle(discard, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - S; -throttle({discard = T, F}, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to answer it with the -%% supplied binary. -throttle(Bin, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - send(Bin, S), - S; -throttle({Bin, F}, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - throttle(Bin, S#transport{throttle_cb = F}); - -%% Callback says to ask again in the specified number of milliseconds. -throttle({timeout, Tmo}, S) -> - erlang:send_after(Tmo, self(), throttle), - throw(S); -throttle({timeout = T, Tmo, F}, S) -> - throttle({T, Tmo}, S#transport{throttle_cb = F}); - -throttle(T, #transport{throttle_cb = F}) -> - ?ERROR({invalid_return, T, F}). - -%% defrag/1 -%% -%% Try to extract another message from packets already read before -%% another throttling callback. - -defrag(#transport{frag = Head} = S) -> - case rcv(Head, <<>>) of - {Msg, B} -> - S#transport{throttled = Msg, frag = B}; - _ -> - S#transport{throttled = true} + ok -> S; + X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect end. %% portnr/2 |