diff options
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 119 |
1 files changed, 72 insertions, 47 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 596e582ab0..132088b514 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -52,7 +52,10 @@ -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 -define(LISTENER_TIMEOUT, 30000). --define(FRAGMENT_TIMEOUT, 1000). +-define(DEFAULT_FRAGMENT_TIMEOUT, 1000). + +-define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)). +-define(IS_TIMEOUT(N), (infinity == N orelse ?IS_UINT32(N))). %% cb_info passed to ssl. -define(TCP_CB(Mod), {Mod, tcp, tcp_closed, tcp_error}). @@ -72,7 +75,6 @@ {parent :: pid(), transport = self() :: pid()}). --type tref() :: reference(). %% timer reference -type length() :: 0..16#FFFFFF. %% message length from Diameter header -type size() :: non_neg_integer(). %% accumulated binary size -type frag() :: {length(), size(), binary(), list(binary())} @@ -83,8 +85,11 @@ {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module - frag = <<>> :: binary() | {tref(), frag()}, %% message fragment - ssl :: boolean() | [term()]}). %% ssl options + frag = <<>> :: frag(), %% message fragment + ssl :: boolean() | [term()], %% ssl options + timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout + tref = false :: false | reference(), %% fragment timer reference + flush = false :: boolean()}). %% flush fragment at timeout? %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -161,7 +166,12 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) %% that does nothing but kill us with the parent until call %% returns. {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), - {SslOpts, Rest} = ssl(Opts), + {SslOpts, Rest0} = ssl(Opts), + {OwnOpts, Rest} = own(Rest0), + Tmo = proplists:get_value(fragment_timer, + OwnOpts, + ?DEFAULT_FRAGMENT_TIMEOUT), + ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), Sock = i(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, @@ -170,7 +180,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) #transport{parent = Pid, module = M, socket = Sock, - ssl = SslOpts}; + ssl = SslOpts, + timeout = Tmo}; %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -196,6 +207,10 @@ i({listen, LRef, APid, {Mod, Opts, Addrs}}) -> erlang:monitor(process, APid), start_timer(#listener{socket = LSock}). +own(Opts) -> + {Own, Rest} = proplists:split(Opts, [fragment_timer]), + {lists:append(Own), Rest}. + ssl(Opts) -> {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]), {ssl_opts(SslOpts), Rest}. @@ -450,6 +465,7 @@ t(T,S) -> %% Initial incoming message when we might need to upgrade to TLS: %% don't request another message until we know. + transition({tcp, Sock, Bin}, #transport{socket = Sock, parent = Pid, frag = Head, @@ -457,13 +473,13 @@ transition({tcp, Sock, Bin}, #transport{socket = Sock, ssl = Opts} = S) when is_list(Opts) -> - case recv1(Head, Bin) of + case rcv(Head, Bin) of {Msg, B} when is_binary(Msg) -> diameter_peer:recv(Pid, Msg), S#transport{frag = B}; Frag -> setopts(M, Sock), - S#transport{frag = Frag} + start_fragment_timer(S#transport{frag = Frag}) end; %% Incoming message. @@ -474,7 +490,7 @@ transition({P, Sock, Bin}, #transport{socket = Sock, when P == tcp, not B; P == ssl, B -> setopts(M, Sock), - recv(Bin, S); + start_fragment_timer(recv(Bin, S)); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} @@ -485,7 +501,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, setopts(M, Sock), - NS#transport{ssl = B}; + start_fragment_timer(NS#transport{ssl = B}); transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -518,8 +534,8 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid, stop; %% Timeout for reception of outstanding packets. -transition({timeout, TRef, flush}, S) -> - flush(TRef, S); +transition({timeout, TRef, flush}, #transport{tref = TRef} = S) -> + flush(S#transport{tref = false}); %% Request for the local port number. transition({resolve_port, Pid}, #transport{socket = Sock, @@ -557,9 +573,7 @@ tls_handshake(Type, true, #transport{socket = Sock, = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), - is_reference(Ref) %% started in new code - andalso - (true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}})), + true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), S#transport{socket = SSock, module = ssl}; @@ -574,30 +588,25 @@ tls(accept, Sock, Opts) -> %% recv/2 %% -%% Reassemble fragmented messages and extract multple message sent +%% Reassemble fragmented messages and extract multiple message sent %% using Nagle. recv(Bin, #transport{parent = Pid, frag = Head} = S) -> - case recv1(Head, Bin) of + case rcv(Head, Bin) of {Msg, B} when is_binary(Msg) -> diameter_peer:recv(Pid, Msg), recv(B, S#transport{frag = <<>>}); Frag -> - S#transport{frag = Frag} + S#transport{frag = Frag, + flush = false} end. -%% recv1/2 +%% rcv/2 %% No previous fragment. -recv1(<<>>, Bin) -> +rcv(<<>>, Bin) -> rcv(Bin); -recv1({TRef, Head}, Bin) -> - erlang:cancel_timer(TRef), - rcv(Head, Bin). - -%% rcv/2 - %% Not even the first four bytes of the header. rcv(Head, Bin) when is_binary(Head) -> @@ -612,22 +621,22 @@ rcv({Len, N, Head, Acc}, Bin) -> %% Extract a message for which we have all bytes. rcv(Len, N, Head, Acc) when Len =< N -> - rcv1(Len, bin(Head, Acc)); + recv1(Len, bin(Head, Acc)); %% Wait for more packets. rcv(Len, N, Head, Acc) -> - {start_timer(), {Len, N, Head, Acc}}. + {Len, N, Head, Acc}. -%% rcv/2 +%% rcv/1 %% Nothing left. rcv(<<>> = Bin) -> Bin; -%% Well, this isn't good. Chances are things will go south from here -%% but if we're lucky then the bytes we have extend to an intended -%% message boundary and we can recover by simply discarding them, -%% which is the result of receiving them. +%% The Message Length isn't even sufficient for a header. Chances are +%% things will go south from here but if we're lucky then the bytes we +%% have extend to an intended message boundary and we can recover by +%% simply receiving them. Make it so. rcv(<<_:1/binary, Len:24, _/binary>> = Bin) when Len < 20 -> {Bin, <<>>}; @@ -635,23 +644,23 @@ rcv(<<_:1/binary, Len:24, _/binary>> = Bin) %% Enough bytes to extract a message. rcv(<<_:1/binary, Len:24, _/binary>> = Bin) when Len =< size(Bin) -> - rcv1(Len, Bin); + recv1(Len, Bin); %% Or not: wait for more packets. rcv(<<_:1/binary, Len:24, _/binary>> = Head) -> - {start_timer(), {Len, size(Head), Head, []}}; + {Len, size(Head), Head, []}; %% Not even 4 bytes yet. rcv(Head) -> - {start_timer(), Head}. + Head. -%% rcv1/2 +%% recv1/2 -rcv1(Len, Bin) -> +recv1(Len, Bin) -> <<Msg:Len/binary, Rest/binary>> = Bin, {Msg, Rest}. -%% bin/[12] +%% bin/1-2 bin(Head, Acc) -> list_to_binary([Head | lists:reverse(Acc)]). @@ -662,7 +671,7 @@ bin(Bin) when is_binary(Bin) -> Bin. -%% start_timer/0 +%% flush/1 %% An erroneously large message length may leave us with a fragment %% that lingers if the peer doesn't have anything more to send. Start @@ -675,14 +684,30 @@ bin(Bin) %% since all messages with length problems are discarded this should %% also eventually lead to watchdog failover. -start_timer() -> - erlang:start_timer(?FRAGMENT_TIMEOUT, self(), flush). +%% No fragment to flush. +flush(#transport{frag = <<>>} = S) -> + S; -flush(TRef, #transport{parent = Pid, frag = {TRef, Head}} = S) -> - diameter_peer:recv(Pid, bin(Head)), - S#transport{frag = <<>>}; -flush(_, S) -> - S. +%% Messages have been received since last timer expiry. +flush(#transport{flush = false} = S) -> + start_fragment_timer(S#transport{flush = true}); + +%% No messages since last expiry. +flush(#transport{frag = Frag, parent = Pid} = S) -> + diameter_peer:recv(Pid, bin(Frag)), + S#transport{frag = <<>>}. + +%% start_fragment_timer/1 +%% +%% Start a timer only if there's none running and a message to flush. + +start_fragment_timer(#transport{frag = B, tref = TRef} = S) + when B == <<>>; + TRef /= false -> + S; + +start_fragment_timer(#transport{timeout = Tmo} = S) -> + S#transport{tref = erlang:start_timer(Tmo, self(), flush)}. %% accept/2 |