aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl121
1 files changed, 80 insertions, 41 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 0b26f429fb..51969a09c0 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -101,7 +101,8 @@
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
- | {fragment_timer, 0..16#FFFFFFFF}.
+ | {fragment_timer, 0..16#FFFFFFFF}
+ | {throttle_cb, diameter:evaluable()}.
%% Accepting/connecting transport process state.
-record(transport,
@@ -112,7 +113,9 @@
ssl :: boolean() | [term()], %% ssl options
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
- flush = false :: boolean()}). %% flush fragment at timeout?
+ flush = false :: boolean(), %% flush fragment at timeout?
+ throttle_cb :: false | diameter:evaluable(), %% ask to receive
+ throttled = false :: boolean()}). %% stopped receiving?
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
%% (for simplicity) transport option. The transport_module diameter_etcp
@@ -197,22 +200,26 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
%% that does nothing but kill us with the parent until call
%% returns.
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
- {SslOpts, Rest0} = ssl(Opts),
- {OwnOpts, Rest} = own(Rest0),
+ {[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
+ fragment_timer,
+ throttle_cb]),
+ SslOpts = ssl_opts(SO),
+ OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
+ Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs),
MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
- setopts(M, Sock),
putr(?REF_KEY, Ref),
- #transport{parent = Pid,
- module = M,
- socket = Sock,
- ssl = SslOpts,
- timeout = Tmo};
+ throttle(#transport{parent = Pid,
+ module = M,
+ socket = Sock,
+ ssl = SslOpts,
+ timeout = Tmo,
+ throttle_cb = Throttle});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -245,14 +252,6 @@ laddr([], Mod, Sock) ->
laddr([{ip, Addr}], _, _) ->
Addr.
-own(Opts) ->
- {[Own], Rest} = proplists:split(Opts, [fragment_timer]),
- {Own, Rest}.
-
-ssl(Opts) ->
- {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]),
- {ssl_opts(SslOpts), Rest}.
-
ssl_opts([]) ->
false;
ssl_opts([{ssl_options, true}]) ->
@@ -260,8 +259,8 @@ ssl_opts([{ssl_options, true}]) ->
ssl_opts([{ssl_options, Opts}])
when is_list(Opts) ->
Opts;
-ssl_opts(L) ->
- ?ERROR({ssl_options, L}).
+ssl_opts(T) ->
+ ?ERROR({ssl_options, T}).
%% init/7
@@ -392,7 +391,7 @@ get_port(Ps) ->
gen_opts(LAddrOpt, Opts) ->
{L,_} = proplists:split(Opts, [binary, packet, active]),
[[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
- [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts.
+ [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts.
%% ---------------------------------------------------------------------------
%% # ports/1
@@ -545,43 +544,44 @@ t(T,S) ->
%% Initial incoming message when we might need to upgrade to TLS:
%% don't request another message until we know.
-
transition({tcp, Sock, Bin}, #transport{socket = Sock,
parent = Pid,
frag = Head,
- module = M,
ssl = Opts}
= S)
when is_list(Opts) ->
case rcv(Head, Bin) of
- {Msg, B} when is_binary(Msg) ->
+ {Msg, B} ->
diameter_peer:recv(Pid, Msg),
S#transport{frag = B};
Frag ->
- setopts(M, Sock),
+ setopts(S),
start_fragment_timer(S#transport{frag = Frag})
end;
%% Incoming message.
transition({P, Sock, Bin}, #transport{socket = Sock,
- module = M,
- ssl = B}
+ ssl = B,
+ throttled = T}
= S)
when P == tcp, not B;
P == ssl, B ->
- setopts(M, Sock),
- start_fragment_timer(recv(Bin, S));
+ false = T, %% assert
+ recv(Bin, S);
+
+%% Check whether or not to read more after a throttle_cb timeout.
+transition(throttle, #transport{throttled = B} = S) ->
+ true = B, %% assert
+ throttle(S);
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= S) ->
- #transport{socket = Sock,
- module = M}
+ #transport{}
= NS
= tls_handshake(Type, B, S),
Pid ! {diameter, {tls, Ref}},
- setopts(M, Sock),
- start_fragment_timer(NS#transport{ssl = B});
+ throttle(NS#transport{ssl = B});
transition({C, Sock}, #transport{socket = Sock,
ssl = B})
@@ -671,14 +671,17 @@ tls(accept, Sock, Opts) ->
%% Reassemble fragmented messages and extract multiple message sent
%% using Nagle.
-recv(Bin, #transport{parent = Pid, frag = Head} = S) ->
+%% Receive packets until a full message is received, then check
+%% whether to keep receiving.
+recv(Bin, #transport{parent = Pid, frag = Head, throttled = false} = S) ->
case rcv(Head, Bin) of
- {Msg, B} when is_binary(Msg) ->
+ {Msg, B} ->
diameter_peer:recv(Pid, Msg),
- recv(B, S#transport{frag = <<>>});
+ throttle(S#transport{frag = B});
Frag ->
- S#transport{frag = Frag,
- flush = false}
+ setopts(S),
+ start_fragment_timer(S#transport{frag = Frag,
+ flush = false})
end.
%% rcv/2
@@ -764,8 +767,10 @@ bin(Bin)
%% since all messages with length problems are discarded this should
%% also eventually lead to watchdog failover.
-%% No fragment to flush.
-flush(#transport{frag = <<>>} = S) ->
+%% No fragment to flush or not receiving messages.
+flush(#transport{frag = Frag, throttled = B} = S)
+ when Frag == <<>>;
+ B ->
S;
%% Messages have been received since last timer expiry.
@@ -824,6 +829,11 @@ setopts(ssl, Sock, Opts) ->
setopts(M, Sock, Opts) ->
M:setopts(Sock, Opts).
+%% setopts/1
+
+setopts(#transport{socket = Sock, module = M}) ->
+ setopts(M, Sock).
+
%% setopts/2
setopts(M, Sock) ->
@@ -832,6 +842,35 @@ setopts(M, Sock) ->
X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect
end.
+%% throttle/1
+
+throttle(#transport{throttle_cb = false} = S) ->
+ recv(<<>>, S);
+
+throttle(#transport{throttle_cb = F} = S) ->
+ throttle(diameter_lib:eval(F), S).
+
+%% throttle/2
+
+%% Don't ask for more packets as long as there are previously received
+%% messages to extract.
+throttle(ok, S) ->
+ recv(<<>>, S#transport{throttled = false});
+
+throttle({ok = T, F}, S) ->
+ throttle(T, S#transport{throttle_cb = F});
+
+%% Ask again after the specified number of milliseconds.
+throttle({timeout, Tmo}, #transport{} = S) ->
+ erlang:send_after(Tmo, self(), throttle),
+ S#transport{throttled = true};
+
+throttle({timeout = T, Tmo, F}, S) ->
+ throttle({T, Tmo}, S#transport{throttle_cb = F});
+
+throttle(T, #transport{throttle_cb = F}) ->
+ ?ERROR({invalid_return, T, F}).
+
%% portnr/2
portnr(gen_tcp, Sock) ->