diff options
author | Anders Svensson <[email protected]> | 2016-03-13 00:31:40 +0100 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2016-03-13 07:10:11 +0100 |
commit | e7b286c95531595daa26b09edffbf2f081c5455a (patch) | |
tree | feb42611018596986c1d2ee79f9d90c5b09a42c5 /lib | |
parent | 472a080ccf2f725e2f5277fa5feb76aaf9ce2e67 (diff) | |
download | otp-e7b286c95531595daa26b09edffbf2f081c5455a.tar.gz otp-e7b286c95531595daa26b09edffbf2f081c5455a.tar.bz2 otp-e7b286c95531595daa26b09edffbf2f081c5455a.zip |
Make throttling callbacks on message reception
The callback is now applied to the atom 'false' when asking if another
message should be received on the socket, and to a received binary
message after reception. Throttling on received messages makes it
possible to distinguish between requests and answers.
There is no callback on outgoing messages since these don't have to go
through the transport process, even if they currently do.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 56 |
1 files changed, 37 insertions, 19 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 51969a09c0..91e8d26d36 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -115,7 +115,8 @@ tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled = false :: boolean()}). %% stopped receiving? + throttled :: boolean() | binary()}). %% stopped receiving? + %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -219,7 +220,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) socket = Sock, ssl = SslOpts, timeout = Tmo, - throttle_cb = Throttle}); + throttle_cb = Throttle, + throttled = false /= Throttle}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -543,7 +545,7 @@ t(T,S) -> %% transition/2 %% Initial incoming message when we might need to upgrade to TLS: -%% don't request another message until we know. +%% don't receive another message until we know. transition({tcp, Sock, Bin}, #transport{socket = Sock, parent = Pid, frag = Head, @@ -569,9 +571,9 @@ transition({P, Sock, Bin}, #transport{socket = Sock, false = T, %% assert recv(Bin, S); -%% Check whether or not to read more after a throttle_cb timeout. +%% Make a new throttling callback after a timeout. transition(throttle, #transport{throttled = B} = S) -> - true = B, %% assert + true = false /= B, %% assert throttle(S); %% Capabilties exchange has decided on whether or not to run over TLS. @@ -673,17 +675,21 @@ tls(accept, Sock, Opts) -> %% Receive packets until a full message is received, then check %% whether to keep receiving. -recv(Bin, #transport{parent = Pid, frag = Head, throttled = false} = S) -> +recv(Bin, #transport{frag = Head, throttled = false} = S) -> case rcv(Head, Bin) of {Msg, B} -> - diameter_peer:recv(Pid, Msg), - throttle(S#transport{frag = B}); + throttle(S#transport{frag = B, throttled = Msg}); Frag -> setopts(S), start_fragment_timer(S#transport{frag = Frag, flush = false}) end. +%% recv/1 + +recv(S) -> + recv(<<>>, S). + %% rcv/2 %% No previous fragment. @@ -770,7 +776,7 @@ bin(Bin) %% No fragment to flush or not receiving messages. flush(#transport{frag = Frag, throttled = B} = S) when Frag == <<>>; - B -> + B /= false -> S; %% Messages have been received since last timer expiry. @@ -844,26 +850,38 @@ setopts(M, Sock) -> %% throttle/1 -throttle(#transport{throttle_cb = false} = S) -> - recv(<<>>, S); +throttle(#transport{throttled = false} = S) -> + recv(S); + +throttle(#transport{throttle_cb = F, throttled = B} = S) -> + throttle(cb(F, B), S). -throttle(#transport{throttle_cb = F} = S) -> - throttle(diameter_lib:eval(F), S). +%% cb/2 + +cb(false, _) -> + ok; + +cb(F, B) -> + diameter_lib:eval([F, true /= B andalso B]). %% throttle/2 -%% Don't ask for more packets as long as there are previously received -%% messages to extract. -throttle(ok, S) -> - recv(<<>>, S#transport{throttled = false}); +%% Callback says to receive another message. +throttle(ok, #transport{throttled = true} = S) -> + recv(S#transport{throttled = false}); + +%% Callback says to accept a received message. +throttle(ok, #transport{parent = Pid, throttled = Msg} = S) -> + diameter_peer:recv(Pid, Msg), + throttle(S#transport{throttled = true}); throttle({ok = T, F}, S) -> throttle(T, S#transport{throttle_cb = F}); -%% Ask again after the specified number of milliseconds. +%% Callback says to ask again in the specified number of milliseconds. throttle({timeout, Tmo}, #transport{} = S) -> erlang:send_after(Tmo, self(), throttle), - S#transport{throttled = true}; + S; throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); |