aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_tcp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl97
1 files changed, 56 insertions, 41 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 8966a79c79..080eff2055 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -52,7 +52,10 @@
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
-define(LISTENER_TIMEOUT, 30000).
--define(FRAGMENT_TIMEOUT, 1000).
+-define(DEFAULT_FRAGMENT_TIMEOUT, 1000).
+
+-define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)).
+-define(IS_TIMEOUT(N), (infinity == N orelse ?IS_UINT32(N))).
%% cb_info passed to ssl.
-define(TCP_CB(Mod), {Mod, tcp, tcp_closed, tcp_error}).
@@ -72,7 +75,6 @@
{parent :: pid(),
transport = self() :: pid()}).
--type tref() :: reference(). %% timer reference
-type length() :: 0..16#FFFFFF. %% message length from Diameter header
-type size() :: non_neg_integer(). %% accumulated binary size
-type frag() :: {length(), size(), binary(), list(binary())}
@@ -83,8 +85,10 @@
{socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
- frag = <<>> :: binary() | {tref(), frag()}, %% message fragment
- ssl :: boolean() | [term()]}). %% ssl options
+ frag = <<>> :: frag(), %% message fragment
+ ssl :: boolean() | [term()], %% ssl options
+ timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
+ flush = false :: boolean()}). %% flush fragment at timeout
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
%% (for simplicity) transport option. The transport_module diameter_etcp
@@ -161,16 +165,23 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
%% that does nothing but kill us with the parent until call
%% returns.
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
- {SslOpts, Rest} = ssl(Opts),
+ {SslOpts, Rest0} = ssl(Opts),
+ {OwnOpts, Rest} = own(Rest0),
+ Tmo = proplists:get_value(fragment_timer,
+ OwnOpts,
+ ?DEFAULT_FRAGMENT_TIMEOUT),
+ ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
Sock = i(T, Ref, Mod, Pid, SslOpts, Rest, Addrs),
MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
setopts(M, Sock),
putr(?REF_KEY, Ref),
+ infinity == Tmo orelse erlang:start_timer(Tmo, self(), flush),
#transport{parent = Pid,
module = M,
socket = Sock,
- ssl = SslOpts};
+ ssl = SslOpts,
+ timeout = Tmo};
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -196,6 +207,10 @@ i({listen, LRef, APid, {Mod, Opts, Addrs}}) ->
erlang:monitor(process, APid),
start_timer(#listener{socket = LSock}).
+own(Opts) ->
+ {Own, Rest} = proplists:split(Opts, [fragment_timer]),
+ {lists:append(Own), Rest}.
+
ssl(Opts) ->
{[SslOpts], Rest} = proplists:split(Opts, [ssl_options]),
{ssl_opts(SslOpts), Rest}.
@@ -457,7 +472,7 @@ transition({tcp, Sock, Bin}, #transport{socket = Sock,
ssl = Opts}
= S)
when is_list(Opts) ->
- case recv1(Head, Bin) of
+ case rcv(Head, Bin) of
{Msg, B} when is_binary(Msg) ->
diameter_peer:recv(Pid, Msg),
S#transport{frag = B};
@@ -518,8 +533,9 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid,
stop;
%% Timeout for reception of outstanding packets.
-transition({timeout, TRef, flush}, S) ->
- flush(TRef, S);
+transition({timeout, _TRef, flush}, #transport{timeout = Tmo} = S) ->
+ erlang:start_timer(Tmo, self(), flush),
+ flush(S);
%% Request for the local port number.
transition({resolve_port, Pid}, #transport{socket = Sock,
@@ -572,30 +588,25 @@ tls(accept, Sock, Opts) ->
%% recv/2
%%
-%% Reassemble fragmented messages and extract multple message sent
+%% Reassemble fragmented messages and extract multiple message sent
%% using Nagle.
recv(Bin, #transport{parent = Pid, frag = Head} = S) ->
- case recv1(Head, Bin) of
+ case rcv(Head, Bin) of
{Msg, B} when is_binary(Msg) ->
diameter_peer:recv(Pid, Msg),
recv(B, S#transport{frag = <<>>});
Frag ->
- S#transport{frag = Frag}
+ S#transport{frag = Frag,
+ flush = false}
end.
-%% recv1/2
+%% rcv/2
%% No previous fragment.
-recv1(<<>>, Bin) ->
+rcv(<<>>, Bin) ->
rcv(Bin);
-recv1({TRef, Head}, Bin) ->
- erlang:cancel_timer(TRef),
- rcv(Head, Bin).
-
-%% rcv/2
-
%% Not even the first four bytes of the header.
rcv(Head, Bin)
when is_binary(Head) ->
@@ -610,22 +621,22 @@ rcv({Len, N, Head, Acc}, Bin) ->
%% Extract a message for which we have all bytes.
rcv(Len, N, Head, Acc)
when Len =< N ->
- rcv1(Len, bin(Head, Acc));
+ recv1(Len, bin(Head, Acc));
%% Wait for more packets.
rcv(Len, N, Head, Acc) ->
- {start_timer(), {Len, N, Head, Acc}}.
+ {Len, N, Head, Acc}.
-%% rcv/2
+%% rcv/1
%% Nothing left.
rcv(<<>> = Bin) ->
Bin;
-%% Well, this isn't good. Chances are things will go south from here
-%% but if we're lucky then the bytes we have extend to an intended
-%% message boundary and we can recover by simply discarding them,
-%% which is the result of receiving them.
+%% The Message Length isn't even sufficient for a header. Chances are
+%% things will go south from here but if we're lucky then the bytes we
+%% have extend to an intended message boundary and we can recover by
+%% simply receiving them. Make it so.
rcv(<<_:1/binary, Len:24, _/binary>> = Bin)
when Len < 20 ->
{Bin, <<>>};
@@ -633,23 +644,23 @@ rcv(<<_:1/binary, Len:24, _/binary>> = Bin)
%% Enough bytes to extract a message.
rcv(<<_:1/binary, Len:24, _/binary>> = Bin)
when Len =< size(Bin) ->
- rcv1(Len, Bin);
+ recv1(Len, Bin);
%% Or not: wait for more packets.
rcv(<<_:1/binary, Len:24, _/binary>> = Head) ->
- {start_timer(), {Len, size(Head), Head, []}};
+ {Len, size(Head), Head, []};
%% Not even 4 bytes yet.
rcv(Head) ->
- {start_timer(), Head}.
+ Head.
-%% rcv1/2
+%% recv1/2
-rcv1(Len, Bin) ->
+recv1(Len, Bin) ->
<<Msg:Len/binary, Rest/binary>> = Bin,
{Msg, Rest}.
-%% bin/[12]
+%% bin/1-2
bin(Head, Acc) ->
list_to_binary([Head | lists:reverse(Acc)]).
@@ -660,7 +671,7 @@ bin(Bin)
when is_binary(Bin) ->
Bin.
-%% start_timer/0
+%% flush/1
%% An erroneously large message length may leave us with a fragment
%% that lingers if the peer doesn't have anything more to send. Start
@@ -673,14 +684,18 @@ bin(Bin)
%% since all messages with length problems are discarded this should
%% also eventually lead to watchdog failover.
-start_timer() ->
- erlang:start_timer(?FRAGMENT_TIMEOUT, self(), flush).
+%% Messages have been received since last timer expiry.
+flush(#transport{flush = false} = S) ->
+ S#transport{flush = true};
-flush(TRef, #transport{parent = Pid, frag = {TRef, Head}} = S) ->
- diameter_peer:recv(Pid, bin(Head)),
- S#transport{frag = <<>>};
-flush(_, S) ->
- S.
+%% No fragment to flush.
+flush(#transport{frag = <<>>}) ->
+ ok;
+
+%% No messages since last expiry.
+flush(#transport{frag = Frag, parent = Pid} = S) ->
+ diameter_peer:recv(Pid, bin(Frag)),
+ S#transport{frag = <<>>}.
%% accept/2