diff options
author | Anders Svensson <[email protected]> | 2016-03-11 06:07:19 +0100 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2016-03-13 07:10:11 +0100 |
commit | eae5e8163f3b0784848ff0310ba3c8adca0bcb87 (patch) | |
tree | 36e445b4ba417e15fe96ae5e772a141850bde7d0 /lib/diameter/src | |
parent | 9298872b8771dff87e732237e36fddc81bfbcbde (diff) | |
download | otp-eae5e8163f3b0784848ff0310ba3c8adca0bcb87.tar.gz otp-eae5e8163f3b0784848ff0310ba3c8adca0bcb87.tar.bz2 otp-eae5e8163f3b0784848ff0310ba3c8adca0bcb87.zip |
Don't ask throttling callback to receive more unless needed
TCP packets can contain more than one message, so only ask to receive
another message if it hasn't already been received.
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 42 |
1 files changed, 33 insertions, 9 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index d22b0ff320..e317922e7e 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -859,7 +859,17 @@ throttle(#transport{throttled = false} = S) -> recv(S); throttle(#transport{throttle_cb = F, throttled = B} = S) -> - throttle(cb(F, B), S). + Res = cb(F, B), + + try throttle(Res, S) of + #transport{} = NS -> + throttle(defrag(NS)) + catch + #transport{throttled = false} = NS -> + recv(NS); + #transport{} = NS -> + NS + end. %% cb/2 @@ -873,12 +883,13 @@ cb(F, B) -> %% Callback says to receive another message. throttle(ok, #transport{throttled = true} = S) -> - recv(S#transport{throttled = false}); + throw(S#transport{throttled = false}); %% Callback says to accept a received message. -throttle(ok, #transport{parent = Pid, throttled = Msg} = S) -> +throttle(ok, #transport{parent = Pid, throttled = Msg} = S) + when is_binary(Msg) -> diameter_peer:recv(Pid, Msg), - throttle(S#transport{throttled = true}); + S; throttle({ok = T, F}, S) -> throttle(T, S#transport{throttle_cb = F}); @@ -891,7 +902,7 @@ throttle({ok = T, F}, S) -> throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) when is_pid(NPid), is_binary(Msg) -> diameter_peer:recv(Pid, {Msg, NPid}), - throttle(S#transport{throttled = true}); + S; throttle({NPid, F}, #transport{throttled = Msg} = S) when is_pid(NPid), is_binary(Msg) -> @@ -900,7 +911,7 @@ throttle({NPid, F}, #transport{throttled = Msg} = S) %% Callback to accept a received message says to discard it. throttle(discard, #transport{throttled = Msg} = S) when is_binary(Msg) -> - throttle(S#transport{throttled = true}); + S; throttle({discard = T, F}, #transport{throttled = Msg} = S) when is_binary(Msg) -> @@ -911,16 +922,16 @@ throttle({discard = T, F}, #transport{throttled = Msg} = S) throttle(Bin, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> send(Bin, S), - throttle(S#transport{throttled = true}); + S; throttle({Bin, F}, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> throttle(Bin, S#transport{throttle_cb = F}); %% Callback says to ask again in the specified number of milliseconds. -throttle({timeout, Tmo}, #transport{} = S) -> +throttle({timeout, Tmo}, S) -> erlang:send_after(Tmo, self(), throttle), - S; + throw(S); throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); @@ -928,6 +939,19 @@ throttle({timeout = T, Tmo, F}, S) -> throttle(T, #transport{throttle_cb = F}) -> ?ERROR({invalid_return, T, F}). +%% defrag/1 +%% +%% Try to extract another message from packets already read before +%% another throttling callback. + +defrag(#transport{frag = Head} = S) -> + case rcv(Head, <<>>) of + {Msg, B} -> + S#transport{throttled = Msg, frag = B}; + _ -> + S#transport{throttled = true} + end. + %% portnr/2 portnr(gen_tcp, Sock) -> |