aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2016-03-13 00:31:40 +0100
committerAnders Svensson <[email protected]>2016-03-13 07:10:11 +0100
commite7b286c95531595daa26b09edffbf2f081c5455a (patch)
treefeb42611018596986c1d2ee79f9d90c5b09a42c5
parent472a080ccf2f725e2f5277fa5feb76aaf9ce2e67 (diff)
downloadotp-e7b286c95531595daa26b09edffbf2f081c5455a.tar.gz
otp-e7b286c95531595daa26b09edffbf2f081c5455a.tar.bz2
otp-e7b286c95531595daa26b09edffbf2f081c5455a.zip
Make throttling callbacks on message reception
The callback is now applied to the atom 'false' when asking if another message should be received on the socket, and to a received binary message after reception. Throttling on received messages makes it possible to distinguish between requests and answers. There is no callback on outgoing messages since these don't have to go through the transport process, even if they currently do.
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl56
1 files changed, 37 insertions, 19 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 51969a09c0..91e8d26d36 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -115,7 +115,8 @@
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
throttle_cb :: false | diameter:evaluable(), %% ask to receive
- throttled = false :: boolean()}). %% stopped receiving?
+ throttled :: boolean() | binary()}). %% stopped receiving?
+
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
%% (for simplicity) transport option. The transport_module diameter_etcp
@@ -219,7 +220,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
socket = Sock,
ssl = SslOpts,
timeout = Tmo,
- throttle_cb = Throttle});
+ throttle_cb = Throttle,
+ throttled = false /= Throttle});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -543,7 +545,7 @@ t(T,S) ->
%% transition/2
%% Initial incoming message when we might need to upgrade to TLS:
-%% don't request another message until we know.
+%% don't receive another message until we know.
transition({tcp, Sock, Bin}, #transport{socket = Sock,
parent = Pid,
frag = Head,
@@ -569,9 +571,9 @@ transition({P, Sock, Bin}, #transport{socket = Sock,
false = T, %% assert
recv(Bin, S);
-%% Check whether or not to read more after a throttle_cb timeout.
+%% Make a new throttling callback after a timeout.
transition(throttle, #transport{throttled = B} = S) ->
- true = B, %% assert
+ true = false /= B, %% assert
throttle(S);
%% Capabilties exchange has decided on whether or not to run over TLS.
@@ -673,17 +675,21 @@ tls(accept, Sock, Opts) ->
%% Receive packets until a full message is received, then check
%% whether to keep receiving.
-recv(Bin, #transport{parent = Pid, frag = Head, throttled = false} = S) ->
+recv(Bin, #transport{frag = Head, throttled = false} = S) ->
case rcv(Head, Bin) of
{Msg, B} ->
- diameter_peer:recv(Pid, Msg),
- throttle(S#transport{frag = B});
+ throttle(S#transport{frag = B, throttled = Msg});
Frag ->
setopts(S),
start_fragment_timer(S#transport{frag = Frag,
flush = false})
end.
+%% recv/1
+
+recv(S) ->
+ recv(<<>>, S).
+
%% rcv/2
%% No previous fragment.
@@ -770,7 +776,7 @@ bin(Bin)
%% No fragment to flush or not receiving messages.
flush(#transport{frag = Frag, throttled = B} = S)
when Frag == <<>>;
- B ->
+ B /= false ->
S;
%% Messages have been received since last timer expiry.
@@ -844,26 +850,38 @@ setopts(M, Sock) ->
%% throttle/1
-throttle(#transport{throttle_cb = false} = S) ->
- recv(<<>>, S);
+throttle(#transport{throttled = false} = S) ->
+ recv(S);
+
+throttle(#transport{throttle_cb = F, throttled = B} = S) ->
+ throttle(cb(F, B), S).
-throttle(#transport{throttle_cb = F} = S) ->
- throttle(diameter_lib:eval(F), S).
+%% cb/2
+
+cb(false, _) ->
+ ok;
+
+cb(F, B) ->
+ diameter_lib:eval([F, true /= B andalso B]).
%% throttle/2
-%% Don't ask for more packets as long as there are previously received
-%% messages to extract.
-throttle(ok, S) ->
- recv(<<>>, S#transport{throttled = false});
+%% Callback says to receive another message.
+throttle(ok, #transport{throttled = true} = S) ->
+ recv(S#transport{throttled = false});
+
+%% Callback says to accept a received message.
+throttle(ok, #transport{parent = Pid, throttled = Msg} = S) ->
+ diameter_peer:recv(Pid, Msg),
+ throttle(S#transport{throttled = true});
throttle({ok = T, F}, S) ->
throttle(T, S#transport{throttle_cb = F});
-%% Ask again after the specified number of milliseconds.
+%% Callback says to ask again in the specified number of milliseconds.
throttle({timeout, Tmo}, #transport{} = S) ->
erlang:send_after(Tmo, self(), throttle),
- S#transport{throttled = true};
+ S;
throttle({timeout = T, Tmo, F}, S) ->
throttle({T, Tmo}, S#transport{throttle_cb = F});