From e7b286c95531595daa26b09edffbf2f081c5455a Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 13 Mar 2016 00:31:40 +0100 Subject: Make throttling callbacks on message reception The callback is now applied to the atom 'false' when asking if another message should be received on the socket, and to a received binary message after reception. Throttling on received messages makes it possible to distinguish between requests and answers. There is no callback on outgoing messages since these don't have to go through the transport process, even if they currently do. --- lib/diameter/src/transport/diameter_tcp.erl | 56 +++++++++++++++++++---------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 51969a09c0..91e8d26d36 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -115,7 +115,8 @@ tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled = false :: boolean()}). %% stopped receiving? + throttled :: boolean() | binary()}). %% stopped receiving? + %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -219,7 +220,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) socket = Sock, ssl = SslOpts, timeout = Tmo, - throttle_cb = Throttle}); + throttle_cb = Throttle, + throttled = false /= Throttle}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -543,7 +545,7 @@ t(T,S) -> %% transition/2 %% Initial incoming message when we might need to upgrade to TLS: -%% don't request another message until we know. +%% don't receive another message until we know. transition({tcp, Sock, Bin}, #transport{socket = Sock, parent = Pid, frag = Head, @@ -569,9 +571,9 @@ transition({P, Sock, Bin}, #transport{socket = Sock, false = T, %% assert recv(Bin, S); -%% Check whether or not to read more after a throttle_cb timeout. +%% Make a new throttling callback after a timeout. transition(throttle, #transport{throttled = B} = S) -> - true = B, %% assert + true = false /= B, %% assert throttle(S); %% Capabilties exchange has decided on whether or not to run over TLS. @@ -673,17 +675,21 @@ tls(accept, Sock, Opts) -> %% Receive packets until a full message is received, then check %% whether to keep receiving. -recv(Bin, #transport{parent = Pid, frag = Head, throttled = false} = S) -> +recv(Bin, #transport{frag = Head, throttled = false} = S) -> case rcv(Head, Bin) of {Msg, B} -> - diameter_peer:recv(Pid, Msg), - throttle(S#transport{frag = B}); + throttle(S#transport{frag = B, throttled = Msg}); Frag -> setopts(S), start_fragment_timer(S#transport{frag = Frag, flush = false}) end. +%% recv/1 + +recv(S) -> + recv(<<>>, S). + %% rcv/2 %% No previous fragment. @@ -770,7 +776,7 @@ bin(Bin) %% No fragment to flush or not receiving messages. flush(#transport{frag = Frag, throttled = B} = S) when Frag == <<>>; - B -> + B /= false -> S; %% Messages have been received since last timer expiry. @@ -844,26 +850,38 @@ setopts(M, Sock) -> %% throttle/1 -throttle(#transport{throttle_cb = false} = S) -> - recv(<<>>, S); +throttle(#transport{throttled = false} = S) -> + recv(S); + +throttle(#transport{throttle_cb = F, throttled = B} = S) -> + throttle(cb(F, B), S). -throttle(#transport{throttle_cb = F} = S) -> - throttle(diameter_lib:eval(F), S). +%% cb/2 + +cb(false, _) -> + ok; + +cb(F, B) -> + diameter_lib:eval([F, true /= B andalso B]). %% throttle/2 -%% Don't ask for more packets as long as there are previously received -%% messages to extract. -throttle(ok, S) -> - recv(<<>>, S#transport{throttled = false}); +%% Callback says to receive another message. +throttle(ok, #transport{throttled = true} = S) -> + recv(S#transport{throttled = false}); + +%% Callback says to accept a received message. +throttle(ok, #transport{parent = Pid, throttled = Msg} = S) -> + diameter_peer:recv(Pid, Msg), + throttle(S#transport{throttled = true}); throttle({ok = T, F}, S) -> throttle(T, S#transport{throttle_cb = F}); -%% Ask again after the specified number of milliseconds. +%% Callback says to ask again in the specified number of milliseconds. throttle({timeout, Tmo}, #transport{} = S) -> erlang:send_after(Tmo, self(), throttle), - S#transport{throttled = true}; + S; throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); -- cgit v1.2.3