diff options
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 42 |
1 files changed, 33 insertions, 9 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index d22b0ff320..e317922e7e 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -859,7 +859,17 @@ throttle(#transport{throttled = false} = S) -> recv(S); throttle(#transport{throttle_cb = F, throttled = B} = S) -> - throttle(cb(F, B), S). + Res = cb(F, B), + + try throttle(Res, S) of + #transport{} = NS -> + throttle(defrag(NS)) + catch + #transport{throttled = false} = NS -> + recv(NS); + #transport{} = NS -> + NS + end. %% cb/2 @@ -873,12 +883,13 @@ cb(F, B) -> %% Callback says to receive another message. throttle(ok, #transport{throttled = true} = S) -> - recv(S#transport{throttled = false}); + throw(S#transport{throttled = false}); %% Callback says to accept a received message. -throttle(ok, #transport{parent = Pid, throttled = Msg} = S) -> +throttle(ok, #transport{parent = Pid, throttled = Msg} = S) + when is_binary(Msg) -> diameter_peer:recv(Pid, Msg), - throttle(S#transport{throttled = true}); + S; throttle({ok = T, F}, S) -> throttle(T, S#transport{throttle_cb = F}); @@ -891,7 +902,7 @@ throttle({ok = T, F}, S) -> throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) when is_pid(NPid), is_binary(Msg) -> diameter_peer:recv(Pid, {Msg, NPid}), - throttle(S#transport{throttled = true}); + S; throttle({NPid, F}, #transport{throttled = Msg} = S) when is_pid(NPid), is_binary(Msg) -> @@ -900,7 +911,7 @@ throttle({NPid, F}, #transport{throttled = Msg} = S) %% Callback to accept a received message says to discard it. throttle(discard, #transport{throttled = Msg} = S) when is_binary(Msg) -> - throttle(S#transport{throttled = true}); + S; throttle({discard = T, F}, #transport{throttled = Msg} = S) when is_binary(Msg) -> @@ -911,16 +922,16 @@ throttle({discard = T, F}, #transport{throttled = Msg} = S) throttle(Bin, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> send(Bin, S), - throttle(S#transport{throttled = true}); + S; throttle({Bin, F}, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> throttle(Bin, S#transport{throttle_cb = F}); %% Callback says to ask again in the specified number of milliseconds. -throttle({timeout, Tmo}, #transport{} = S) -> +throttle({timeout, Tmo}, S) -> erlang:send_after(Tmo, self(), throttle), - S; + throw(S); throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); @@ -928,6 +939,19 @@ throttle({timeout = T, Tmo, F}, S) -> throttle(T, #transport{throttle_cb = F}) -> ?ERROR({invalid_return, T, F}). +%% defrag/1 +%% +%% Try to extract another message from packets already read before +%% another throttling callback. + +defrag(#transport{frag = Head} = S) -> + case rcv(Head, <<>>) of + {Msg, B} -> + S#transport{throttled = Msg, frag = B}; + _ -> + S#transport{throttled = true} + end. + %% portnr/2 portnr(gen_tcp, Sock) -> |