aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl56
1 files changed, 37 insertions, 19 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 51969a09c0..91e8d26d36 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -115,7 +115,8 @@
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
throttle_cb :: false | diameter:evaluable(), %% ask to receive
- throttled = false :: boolean()}). %% stopped receiving?
+ throttled :: boolean() | binary()}). %% stopped receiving?
+
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
%% (for simplicity) transport option. The transport_module diameter_etcp
@@ -219,7 +220,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
socket = Sock,
ssl = SslOpts,
timeout = Tmo,
- throttle_cb = Throttle});
+ throttle_cb = Throttle,
+ throttled = false /= Throttle});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -543,7 +545,7 @@ t(T,S) ->
%% transition/2
%% Initial incoming message when we might need to upgrade to TLS:
-%% don't request another message until we know.
+%% don't receive another message until we know.
transition({tcp, Sock, Bin}, #transport{socket = Sock,
parent = Pid,
frag = Head,
@@ -569,9 +571,9 @@ transition({P, Sock, Bin}, #transport{socket = Sock,
false = T, %% assert
recv(Bin, S);
-%% Check whether or not to read more after a throttle_cb timeout.
+%% Make a new throttling callback after a timeout.
transition(throttle, #transport{throttled = B} = S) ->
- true = B, %% assert
+ true = false /= B, %% assert
throttle(S);
%% Capabilties exchange has decided on whether or not to run over TLS.
@@ -673,17 +675,21 @@ tls(accept, Sock, Opts) ->
%% Receive packets until a full message is received, then check
%% whether to keep receiving.
-recv(Bin, #transport{parent = Pid, frag = Head, throttled = false} = S) ->
+recv(Bin, #transport{frag = Head, throttled = false} = S) ->
case rcv(Head, Bin) of
{Msg, B} ->
- diameter_peer:recv(Pid, Msg),
- throttle(S#transport{frag = B});
+ throttle(S#transport{frag = B, throttled = Msg});
Frag ->
setopts(S),
start_fragment_timer(S#transport{frag = Frag,
flush = false})
end.
+%% recv/1
+
+recv(S) ->
+ recv(<<>>, S).
+
%% rcv/2
%% No previous fragment.
@@ -770,7 +776,7 @@ bin(Bin)
%% No fragment to flush or not receiving messages.
flush(#transport{frag = Frag, throttled = B} = S)
when Frag == <<>>;
- B ->
+ B /= false ->
S;
%% Messages have been received since last timer expiry.
@@ -844,26 +850,38 @@ setopts(M, Sock) ->
%% throttle/1
-throttle(#transport{throttle_cb = false} = S) ->
- recv(<<>>, S);
+throttle(#transport{throttled = false} = S) ->
+ recv(S);
+
+throttle(#transport{throttle_cb = F, throttled = B} = S) ->
+ throttle(cb(F, B), S).
-throttle(#transport{throttle_cb = F} = S) ->
- throttle(diameter_lib:eval(F), S).
+%% cb/2
+
+cb(false, _) ->
+ ok;
+
+cb(F, B) ->
+ diameter_lib:eval([F, true /= B andalso B]).
%% throttle/2
-%% Don't ask for more packets as long as there are previously received
-%% messages to extract.
-throttle(ok, S) ->
- recv(<<>>, S#transport{throttled = false});
+%% Callback says to receive another message.
+throttle(ok, #transport{throttled = true} = S) ->
+ recv(S#transport{throttled = false});
+
+%% Callback says to accept a received message.
+throttle(ok, #transport{parent = Pid, throttled = Msg} = S) ->
+ diameter_peer:recv(Pid, Msg),
+ throttle(S#transport{throttled = true});
throttle({ok = T, F}, S) ->
throttle(T, S#transport{throttle_cb = F});
-%% Ask again after the specified number of milliseconds.
+%% Callback says to ask again in the specified number of milliseconds.
throttle({timeout, Tmo}, #transport{} = S) ->
erlang:send_after(Tmo, self(), throttle),
- S#transport{throttled = true};
+ S;
throttle({timeout = T, Tmo, F}, S) ->
throttle({T, Tmo}, S#transport{throttle_cb = F});