aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2016-03-11 06:07:19 +0100
committerAnders Svensson <[email protected]>2016-03-13 07:10:11 +0100
commiteae5e8163f3b0784848ff0310ba3c8adca0bcb87 (patch)
tree36e445b4ba417e15fe96ae5e772a141850bde7d0
parent9298872b8771dff87e732237e36fddc81bfbcbde (diff)
downloadotp-eae5e8163f3b0784848ff0310ba3c8adca0bcb87.tar.gz
otp-eae5e8163f3b0784848ff0310ba3c8adca0bcb87.tar.bz2
otp-eae5e8163f3b0784848ff0310ba3c8adca0bcb87.zip
Don't ask throttling callback to receive more unless needed
TCP packets can contain more than one message, so only ask to receive another message if it hasn't already been received.
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl42
1 files changed, 33 insertions, 9 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index d22b0ff320..e317922e7e 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -859,7 +859,17 @@ throttle(#transport{throttled = false} = S) ->
recv(S);
throttle(#transport{throttle_cb = F, throttled = B} = S) ->
- throttle(cb(F, B), S).
+ Res = cb(F, B),
+
+ try throttle(Res, S) of
+ #transport{} = NS ->
+ throttle(defrag(NS))
+ catch
+ #transport{throttled = false} = NS ->
+ recv(NS);
+ #transport{} = NS ->
+ NS
+ end.
%% cb/2
@@ -873,12 +883,13 @@ cb(F, B) ->
%% Callback says to receive another message.
throttle(ok, #transport{throttled = true} = S) ->
- recv(S#transport{throttled = false});
+ throw(S#transport{throttled = false});
%% Callback says to accept a received message.
-throttle(ok, #transport{parent = Pid, throttled = Msg} = S) ->
+throttle(ok, #transport{parent = Pid, throttled = Msg} = S)
+ when is_binary(Msg) ->
diameter_peer:recv(Pid, Msg),
- throttle(S#transport{throttled = true});
+ S;
throttle({ok = T, F}, S) ->
throttle(T, S#transport{throttle_cb = F});
@@ -891,7 +902,7 @@ throttle({ok = T, F}, S) ->
throttle(NPid, #transport{parent = Pid, throttled = Msg} = S)
when is_pid(NPid), is_binary(Msg) ->
diameter_peer:recv(Pid, {Msg, NPid}),
- throttle(S#transport{throttled = true});
+ S;
throttle({NPid, F}, #transport{throttled = Msg} = S)
when is_pid(NPid), is_binary(Msg) ->
@@ -900,7 +911,7 @@ throttle({NPid, F}, #transport{throttled = Msg} = S)
%% Callback to accept a received message says to discard it.
throttle(discard, #transport{throttled = Msg} = S)
when is_binary(Msg) ->
- throttle(S#transport{throttled = true});
+ S;
throttle({discard = T, F}, #transport{throttled = Msg} = S)
when is_binary(Msg) ->
@@ -911,16 +922,16 @@ throttle({discard = T, F}, #transport{throttled = Msg} = S)
throttle(Bin, #transport{throttled = Msg} = S)
when is_binary(Bin), is_binary(Msg) ->
send(Bin, S),
- throttle(S#transport{throttled = true});
+ S;
throttle({Bin, F}, #transport{throttled = Msg} = S)
when is_binary(Bin), is_binary(Msg) ->
throttle(Bin, S#transport{throttle_cb = F});
%% Callback says to ask again in the specified number of milliseconds.
-throttle({timeout, Tmo}, #transport{} = S) ->
+throttle({timeout, Tmo}, S) ->
erlang:send_after(Tmo, self(), throttle),
- S;
+ throw(S);
throttle({timeout = T, Tmo, F}, S) ->
throttle({T, Tmo}, S#transport{throttle_cb = F});
@@ -928,6 +939,19 @@ throttle({timeout = T, Tmo, F}, S) ->
throttle(T, #transport{throttle_cb = F}) ->
?ERROR({invalid_return, T, F}).
+%% defrag/1
+%%
+%% Try to extract another message from packets already read before
+%% another throttling callback.
+
+defrag(#transport{frag = Head} = S) ->
+ case rcv(Head, <<>>) of
+ {Msg, B} ->
+ S#transport{throttled = Msg, frag = B};
+ _ ->
+ S#transport{throttled = true}
+ end.
+
%% portnr/2
portnr(gen_tcp, Sock) ->