diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/diameter/doc/src/diameter_sctp.xml | 3 | ||||
-rw-r--r-- | lib/diameter/doc/src/diameter_tcp.xml | 17 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 97 |
3 files changed, 71 insertions, 46 deletions
diff --git a/lib/diameter/doc/src/diameter_sctp.xml b/lib/diameter/doc/src/diameter_sctp.xml index 5e3fd5eaf1..df140b16b9 100644 --- a/lib/diameter/doc/src/diameter_sctp.xml +++ b/lib/diameter/doc/src/diameter_sctp.xml @@ -15,7 +15,7 @@ <erlref> <header> <copyright> -<year>2011</year><year>2012</year> +<year>2011</year><year>2013</year> <holder>Ericsson AB. All Rights Reserved.</holder> </copyright> <legalnotice> @@ -81,7 +81,6 @@ and implements the behaviour documented in The start function required by &man_transport;.</p> <p> -The only diameter_sctp-specific argument is the options list. Options <c>raddr</c> and <c>rport</c> specify the remote address and port for a connecting transport and not valid for a listening transport: the former is required while latter defaults to 3868 if diff --git a/lib/diameter/doc/src/diameter_tcp.xml b/lib/diameter/doc/src/diameter_tcp.xml index fe2389d57d..01c781d553 100644 --- a/lib/diameter/doc/src/diameter_tcp.xml +++ b/lib/diameter/doc/src/diameter_tcp.xml @@ -93,7 +93,8 @@ before configuring TLS capability on diameter transports.</p> <v>Reason = term()</v> <v>OwnOpt = {raddr, &ip_address;} | {rport, integer()} - | {port, integer()}</v> + | {port, integer()} + | {fragment_timer, infinity | 0..16#FFFFFFFF}</v> <v>SslOpt = {ssl_options, true | list()}</v> <v>TcpOpt = term()</v> </type> @@ -103,7 +104,6 @@ before configuring TLS capability on diameter transports.</p> The start function required by &man_transport;.</p> <p> -The only diameter_tcp-specific argument is the options list. Options <c>raddr</c> and <c>rport</c> specify the remote address and port for a connecting transport and are not valid for a listening transport. @@ -112,7 +112,18 @@ that should support TLS: a value of <c>true</c> results in a TLS handshake immediately upon connection establishment while <c>list()</c> specifies options to be passed to &ssl_connect2; or &ssl_accept2; -after capabilities exchange if TLS is negotiated. +after capabilities exchange if TLS is negotiated.</p> + +<p> +Option <c>fragment_timer</c> specifies the timeout, in milliseconds, +of a timer used to flush messages from the incoming byte +stream even if the number of bytes indicated in the Message Length +field of its Diameter Header have not yet been accumulated: +such a message is received over the transport interface after +two successive timeouts without the reception of additional bytes. +Defaults to 1000.</p> + +<p> Remaining options are any accepted by &ssl_connect3; or &gen_tcp_connect3; for a connecting transport, or &ssl_listen2; or &gen_tcp_listen2; for diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 8966a79c79..080eff2055 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -52,7 +52,10 @@ -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 -define(LISTENER_TIMEOUT, 30000). --define(FRAGMENT_TIMEOUT, 1000). +-define(DEFAULT_FRAGMENT_TIMEOUT, 1000). + +-define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)). +-define(IS_TIMEOUT(N), (infinity == N orelse ?IS_UINT32(N))). %% cb_info passed to ssl. -define(TCP_CB(Mod), {Mod, tcp, tcp_closed, tcp_error}). @@ -72,7 +75,6 @@ {parent :: pid(), transport = self() :: pid()}). --type tref() :: reference(). %% timer reference -type length() :: 0..16#FFFFFF. %% message length from Diameter header -type size() :: non_neg_integer(). %% accumulated binary size -type frag() :: {length(), size(), binary(), list(binary())} @@ -83,8 +85,10 @@ {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module - frag = <<>> :: binary() | {tref(), frag()}, %% message fragment - ssl :: boolean() | [term()]}). %% ssl options + frag = <<>> :: frag(), %% message fragment + ssl :: boolean() | [term()], %% ssl options + timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout + flush = false :: boolean()}). %% flush fragment at timeout %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first %% (for simplicity) transport option. The transport_module diameter_etcp @@ -161,16 +165,23 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) %% that does nothing but kill us with the parent until call %% returns. {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), - {SslOpts, Rest} = ssl(Opts), + {SslOpts, Rest0} = ssl(Opts), + {OwnOpts, Rest} = own(Rest0), + Tmo = proplists:get_value(fragment_timer, + OwnOpts, + ?DEFAULT_FRAGMENT_TIMEOUT), + ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), Sock = i(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, setopts(M, Sock), putr(?REF_KEY, Ref), + infinity == Tmo orelse erlang:start_timer(Tmo, self(), flush), #transport{parent = Pid, module = M, socket = Sock, - ssl = SslOpts}; + ssl = SslOpts, + timeout = Tmo}; %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -196,6 +207,10 @@ i({listen, LRef, APid, {Mod, Opts, Addrs}}) -> erlang:monitor(process, APid), start_timer(#listener{socket = LSock}). +own(Opts) -> + {Own, Rest} = proplists:split(Opts, [fragment_timer]), + {lists:append(Own), Rest}. + ssl(Opts) -> {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]), {ssl_opts(SslOpts), Rest}. @@ -457,7 +472,7 @@ transition({tcp, Sock, Bin}, #transport{socket = Sock, ssl = Opts} = S) when is_list(Opts) -> - case recv1(Head, Bin) of + case rcv(Head, Bin) of {Msg, B} when is_binary(Msg) -> diameter_peer:recv(Pid, Msg), S#transport{frag = B}; @@ -518,8 +533,9 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid, stop; %% Timeout for reception of outstanding packets. -transition({timeout, TRef, flush}, S) -> - flush(TRef, S); +transition({timeout, _TRef, flush}, #transport{timeout = Tmo} = S) -> + erlang:start_timer(Tmo, self(), flush), + flush(S); %% Request for the local port number. transition({resolve_port, Pid}, #transport{socket = Sock, @@ -572,30 +588,25 @@ tls(accept, Sock, Opts) -> %% recv/2 %% -%% Reassemble fragmented messages and extract multple message sent +%% Reassemble fragmented messages and extract multiple message sent %% using Nagle. recv(Bin, #transport{parent = Pid, frag = Head} = S) -> - case recv1(Head, Bin) of + case rcv(Head, Bin) of {Msg, B} when is_binary(Msg) -> diameter_peer:recv(Pid, Msg), recv(B, S#transport{frag = <<>>}); Frag -> - S#transport{frag = Frag} + S#transport{frag = Frag, + flush = false} end. -%% recv1/2 +%% rcv/2 %% No previous fragment. -recv1(<<>>, Bin) -> +rcv(<<>>, Bin) -> rcv(Bin); -recv1({TRef, Head}, Bin) -> - erlang:cancel_timer(TRef), - rcv(Head, Bin). - -%% rcv/2 - %% Not even the first four bytes of the header. rcv(Head, Bin) when is_binary(Head) -> @@ -610,22 +621,22 @@ rcv({Len, N, Head, Acc}, Bin) -> %% Extract a message for which we have all bytes. rcv(Len, N, Head, Acc) when Len =< N -> - rcv1(Len, bin(Head, Acc)); + recv1(Len, bin(Head, Acc)); %% Wait for more packets. rcv(Len, N, Head, Acc) -> - {start_timer(), {Len, N, Head, Acc}}. + {Len, N, Head, Acc}. -%% rcv/2 +%% rcv/1 %% Nothing left. rcv(<<>> = Bin) -> Bin; -%% Well, this isn't good. Chances are things will go south from here -%% but if we're lucky then the bytes we have extend to an intended -%% message boundary and we can recover by simply discarding them, -%% which is the result of receiving them. +%% The Message Length isn't even sufficient for a header. Chances are +%% things will go south from here but if we're lucky then the bytes we +%% have extend to an intended message boundary and we can recover by +%% simply receiving them. Make it so. rcv(<<_:1/binary, Len:24, _/binary>> = Bin) when Len < 20 -> {Bin, <<>>}; @@ -633,23 +644,23 @@ rcv(<<_:1/binary, Len:24, _/binary>> = Bin) %% Enough bytes to extract a message. rcv(<<_:1/binary, Len:24, _/binary>> = Bin) when Len =< size(Bin) -> - rcv1(Len, Bin); + recv1(Len, Bin); %% Or not: wait for more packets. rcv(<<_:1/binary, Len:24, _/binary>> = Head) -> - {start_timer(), {Len, size(Head), Head, []}}; + {Len, size(Head), Head, []}; %% Not even 4 bytes yet. rcv(Head) -> - {start_timer(), Head}. + Head. -%% rcv1/2 +%% recv1/2 -rcv1(Len, Bin) -> +recv1(Len, Bin) -> <<Msg:Len/binary, Rest/binary>> = Bin, {Msg, Rest}. -%% bin/[12] +%% bin/1-2 bin(Head, Acc) -> list_to_binary([Head | lists:reverse(Acc)]). @@ -660,7 +671,7 @@ bin(Bin) when is_binary(Bin) -> Bin. -%% start_timer/0 +%% flush/1 %% An erroneously large message length may leave us with a fragment %% that lingers if the peer doesn't have anything more to send. Start @@ -673,14 +684,18 @@ bin(Bin) %% since all messages with length problems are discarded this should %% also eventually lead to watchdog failover. -start_timer() -> - erlang:start_timer(?FRAGMENT_TIMEOUT, self(), flush). +%% Messages have been received since last timer expiry. +flush(#transport{flush = false} = S) -> + S#transport{flush = true}; -flush(TRef, #transport{parent = Pid, frag = {TRef, Head}} = S) -> - diameter_peer:recv(Pid, bin(Head)), - S#transport{frag = <<>>}; -flush(_, S) -> - S. +%% No fragment to flush. +flush(#transport{frag = <<>>}) -> + ok; + +%% No messages since last expiry. +flush(#transport{frag = Frag, parent = Pid} = S) -> + diameter_peer:recv(Pid, bin(Frag)), + S#transport{frag = <<>>}. %% accept/2 |