aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl42
1 files changed, 33 insertions, 9 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index d22b0ff320..e317922e7e 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -859,7 +859,17 @@ throttle(#transport{throttled = false} = S) ->
recv(S);
throttle(#transport{throttle_cb = F, throttled = B} = S) ->
- throttle(cb(F, B), S).
+ Res = cb(F, B),
+
+ try throttle(Res, S) of
+ #transport{} = NS ->
+ throttle(defrag(NS))
+ catch
+ #transport{throttled = false} = NS ->
+ recv(NS);
+ #transport{} = NS ->
+ NS
+ end.
%% cb/2
@@ -873,12 +883,13 @@ cb(F, B) ->
%% Callback says to receive another message.
throttle(ok, #transport{throttled = true} = S) ->
- recv(S#transport{throttled = false});
+ throw(S#transport{throttled = false});
%% Callback says to accept a received message.
-throttle(ok, #transport{parent = Pid, throttled = Msg} = S) ->
+throttle(ok, #transport{parent = Pid, throttled = Msg} = S)
+ when is_binary(Msg) ->
diameter_peer:recv(Pid, Msg),
- throttle(S#transport{throttled = true});
+ S;
throttle({ok = T, F}, S) ->
throttle(T, S#transport{throttle_cb = F});
@@ -891,7 +902,7 @@ throttle({ok = T, F}, S) ->
throttle(NPid, #transport{parent = Pid, throttled = Msg} = S)
when is_pid(NPid), is_binary(Msg) ->
diameter_peer:recv(Pid, {Msg, NPid}),
- throttle(S#transport{throttled = true});
+ S;
throttle({NPid, F}, #transport{throttled = Msg} = S)
when is_pid(NPid), is_binary(Msg) ->
@@ -900,7 +911,7 @@ throttle({NPid, F}, #transport{throttled = Msg} = S)
%% Callback to accept a received message says to discard it.
throttle(discard, #transport{throttled = Msg} = S)
when is_binary(Msg) ->
- throttle(S#transport{throttled = true});
+ S;
throttle({discard = T, F}, #transport{throttled = Msg} = S)
when is_binary(Msg) ->
@@ -911,16 +922,16 @@ throttle({discard = T, F}, #transport{throttled = Msg} = S)
throttle(Bin, #transport{throttled = Msg} = S)
when is_binary(Bin), is_binary(Msg) ->
send(Bin, S),
- throttle(S#transport{throttled = true});
+ S;
throttle({Bin, F}, #transport{throttled = Msg} = S)
when is_binary(Bin), is_binary(Msg) ->
throttle(Bin, S#transport{throttle_cb = F});
%% Callback says to ask again in the specified number of milliseconds.
-throttle({timeout, Tmo}, #transport{} = S) ->
+throttle({timeout, Tmo}, S) ->
erlang:send_after(Tmo, self(), throttle),
- S;
+ throw(S);
throttle({timeout = T, Tmo, F}, S) ->
throttle({T, Tmo}, S#transport{throttle_cb = F});
@@ -928,6 +939,19 @@ throttle({timeout = T, Tmo, F}, S) ->
throttle(T, #transport{throttle_cb = F}) ->
?ERROR({invalid_return, T, F}).
+%% defrag/1
+%%
+%% Try to extract another message from packets already read before
+%% another throttling callback.
+
+defrag(#transport{frag = Head} = S) ->
+ case rcv(Head, <<>>) of
+ {Msg, B} ->
+ S#transport{throttled = Msg, frag = B};
+ _ ->
+ S#transport{throttled = true}
+ end.
+
%% portnr/2
portnr(gen_tcp, Sock) ->