diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 56 |
1 files changed, 37 insertions, 19 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 51969a09c0..91e8d26d36 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -115,7 +115,8 @@ tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled = false :: boolean()}). %% stopped receiving? + throttled :: boolean() | binary()}). %% stopped receiving? + %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -219,7 +220,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) socket = Sock, ssl = SslOpts, timeout = Tmo, - throttle_cb = Throttle}); + throttle_cb = Throttle, + throttled = false /= Throttle}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -543,7 +545,7 @@ t(T,S) -> %% transition/2 %% Initial incoming message when we might need to upgrade to TLS: -%% don't request another message until we know. +%% don't receive another message until we know. transition({tcp, Sock, Bin}, #transport{socket = Sock, parent = Pid, frag = Head, @@ -569,9 +571,9 @@ transition({P, Sock, Bin}, #transport{socket = Sock, false = T, %% assert recv(Bin, S); -%% Check whether or not to read more after a throttle_cb timeout. +%% Make a new throttling callback after a timeout. transition(throttle, #transport{throttled = B} = S) -> - true = B, %% assert + true = false /= B, %% assert throttle(S); %% Capabilties exchange has decided on whether or not to run over TLS. @@ -673,17 +675,21 @@ tls(accept, Sock, Opts) -> %% Receive packets until a full message is received, then check %% whether to keep receiving. -recv(Bin, #transport{parent = Pid, frag = Head, throttled = false} = S) -> +recv(Bin, #transport{frag = Head, throttled = false} = S) -> case rcv(Head, Bin) of {Msg, B} -> - diameter_peer:recv(Pid, Msg), - throttle(S#transport{frag = B}); + throttle(S#transport{frag = B, throttled = Msg}); Frag -> setopts(S), start_fragment_timer(S#transport{frag = Frag, flush = false}) end. +%% recv/1 + +recv(S) -> + recv(<<>>, S). + %% rcv/2 %% No previous fragment. @@ -770,7 +776,7 @@ bin(Bin) %% No fragment to flush or not receiving messages. flush(#transport{frag = Frag, throttled = B} = S) when Frag == <<>>; - B -> + B /= false -> S; %% Messages have been received since last timer expiry. @@ -844,26 +850,38 @@ setopts(M, Sock) -> %% throttle/1 -throttle(#transport{throttle_cb = false} = S) -> - recv(<<>>, S); +throttle(#transport{throttled = false} = S) -> + recv(S); + +throttle(#transport{throttle_cb = F, throttled = B} = S) -> + throttle(cb(F, B), S). -throttle(#transport{throttle_cb = F} = S) -> - throttle(diameter_lib:eval(F), S). +%% cb/2 + +cb(false, _) -> + ok; + +cb(F, B) -> + diameter_lib:eval([F, true /= B andalso B]). %% throttle/2 -%% Don't ask for more packets as long as there are previously received -%% messages to extract. -throttle(ok, S) -> - recv(<<>>, S#transport{throttled = false}); +%% Callback says to receive another message. +throttle(ok, #transport{throttled = true} = S) -> + recv(S#transport{throttled = false}); + +%% Callback says to accept a received message. +throttle(ok, #transport{parent = Pid, throttled = Msg} = S) -> + diameter_peer:recv(Pid, Msg), + throttle(S#transport{throttled = true}); throttle({ok = T, F}, S) -> throttle(T, S#transport{throttle_cb = F}); -%% Ask again after the specified number of milliseconds. +%% Callback says to ask again in the specified number of milliseconds. throttle({timeout, Tmo}, #transport{} = S) -> erlang:send_after(Tmo, self(), throttle), - S#transport{throttled = true}; + S; throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); |