diff options
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 38 |
1 files changed, 25 insertions, 13 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 080eff2055..132088b514 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -88,7 +88,8 @@ frag = <<>> :: frag(), %% message fragment ssl :: boolean() | [term()], %% ssl options timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout - flush = false :: boolean()}). %% flush fragment at timeout + tref = false :: false | reference(), %% fragment timer reference + flush = false :: boolean()}). %% flush fragment at timeout? %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -176,7 +177,6 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) M = if SslOpts -> ssl; true -> Mod end, setopts(M, Sock), putr(?REF_KEY, Ref), - infinity == Tmo orelse erlang:start_timer(Tmo, self(), flush), #transport{parent = Pid, module = M, socket = Sock, @@ -465,6 +465,7 @@ t(T,S) -> %% Initial incoming message when we might need to upgrade to TLS: %% don't request another message until we know. + transition({tcp, Sock, Bin}, #transport{socket = Sock, parent = Pid, frag = Head, @@ -478,7 +479,7 @@ transition({tcp, Sock, Bin}, #transport{socket = Sock, S#transport{frag = B}; Frag -> setopts(M, Sock), - S#transport{frag = Frag} + start_fragment_timer(S#transport{frag = Frag}) end; %% Incoming message. @@ -489,7 +490,7 @@ transition({P, Sock, Bin}, #transport{socket = Sock, when P == tcp, not B; P == ssl, B -> setopts(M, Sock), - recv(Bin, S); + start_fragment_timer(recv(Bin, S)); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} @@ -500,7 +501,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, setopts(M, Sock), - NS#transport{ssl = B}; + start_fragment_timer(NS#transport{ssl = B}); transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -533,9 +534,8 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid, stop; %% Timeout for reception of outstanding packets. -transition({timeout, _TRef, flush}, #transport{timeout = Tmo} = S) -> - erlang:start_timer(Tmo, self(), flush), - flush(S); +transition({timeout, TRef, flush}, #transport{tref = TRef} = S) -> + flush(S#transport{tref = false}); %% Request for the local port number. transition({resolve_port, Pid}, #transport{socket = Sock, @@ -684,19 +684,31 @@ bin(Bin) %% since all messages with length problems are discarded this should %% also eventually lead to watchdog failover. +%% No fragment to flush. +flush(#transport{frag = <<>>} = S) -> + S; + %% Messages have been received since last timer expiry. flush(#transport{flush = false} = S) -> - S#transport{flush = true}; - -%% No fragment to flush. -flush(#transport{frag = <<>>}) -> - ok; + start_fragment_timer(S#transport{flush = true}); %% No messages since last expiry. flush(#transport{frag = Frag, parent = Pid} = S) -> diameter_peer:recv(Pid, bin(Frag)), S#transport{frag = <<>>}. +%% start_fragment_timer/1 +%% +%% Start a timer only if there's none running and a message to flush. + +start_fragment_timer(#transport{frag = B, tref = TRef} = S) + when B == <<>>; + TRef /= false -> + S; + +start_fragment_timer(#transport{timeout = Tmo} = S) -> + S#transport{tref = erlang:start_timer(Tmo, self(), flush)}. + %% accept/2 accept(ssl, LSock) -> |