aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_tcp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl282
1 files changed, 193 insertions, 89 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 005b2442c0..546c2cfa5e 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
%%
-module(diameter_tcp).
+-dialyzer({no_fail_call, throttle/2}).
-behaviour(gen_server).
@@ -56,7 +57,6 @@
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
--define(LISTENER_TIMEOUT, 30000).
-define(DEFAULT_FRAGMENT_TIMEOUT, 1000).
-define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)).
@@ -72,8 +72,10 @@
%% Listener process state.
-record(listener, {socket :: inet:socket(),
- count = 1 :: non_neg_integer(),
- tref :: reference()}).
+ count = 1 :: non_neg_integer()}). %% accepting processes
+%% The count of accepting processes was previously used to terminate
+%% the listening process, but diameter_reg:subscribe/2 is now used for
+%% this. Leave the the count for trace purposes.
%% Monitor process state.
-record(monitor,
@@ -102,7 +104,8 @@
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
- | {fragment_timer, 0..16#FFFFFFFF}.
+ | {fragment_timer, 0..16#FFFFFFFF}
+ | {throttle_cb, diameter:evaluable()}.
%% Accepting/connecting transport process state.
-record(transport,
@@ -110,10 +113,13 @@
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
frag = <<>> :: frag(), %% message fragment
- ssl :: boolean() | [term()], %% ssl options
+ ssl :: [term()] | boolean(), %% ssl options, ssl or not
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
- flush = false :: boolean()}). %% flush fragment at timeout?
+ flush = false :: boolean(), %% flush fragment at timeout?
+ throttle_cb :: false | diameter:evaluable(), %% ask to receive
+ throttled :: boolean() | binary()}). %% stopped receiving?
+
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
%% (for simplicity) transport option. The transport_module diameter_etcp
@@ -198,22 +204,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
%% that does nothing but kill us with the parent until call
%% returns.
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
- {SslOpts, Rest0} = ssl(Opts),
- {OwnOpts, Rest} = own(Rest0),
+ {[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
+ fragment_timer,
+ throttle_cb]),
+ SslOpts = ssl_opts(SO),
+ OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
+ Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs),
MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
- setopts(M, Sock),
putr(?REF_KEY, Ref),
- #transport{parent = Pid,
- module = M,
- socket = Sock,
- ssl = SslOpts,
- timeout = Tmo};
+ throttle(#transport{parent = Pid,
+ module = M,
+ socket = Sock,
+ ssl = SslOpts,
+ timeout = Tmo,
+ throttle_cb = Throttle,
+ throttled = false /= Throttle});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -230,6 +241,7 @@ i(#monitor{parent = Pid, transport = TPid} = S) ->
%% gen_tcp seems to so. Links should be left to supervisors.
i({listen, LRef, APid, {Mod, Opts, Addrs}}) ->
+ [_] = diameter_config:subscribe(LRef, transport), %% assert existence
{[LA, LP], Rest} = proplists:split(Opts, [ip, port]),
LAddrOpt = get_addr(LA, Addrs),
LPort = get_port(LP),
@@ -238,7 +250,7 @@ i({listen, LRef, APid, {Mod, Opts, Addrs}}) ->
true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}),
proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
monitor(process, APid),
- start_timer(#listener{socket = LSock}).
+ #listener{socket = LSock}.
laddr([], Mod, Sock) ->
{ok, {Addr, _Port}} = sockname(Mod, Sock),
@@ -246,14 +258,6 @@ laddr([], Mod, Sock) ->
laddr([{ip, Addr}], _, _) ->
Addr.
-own(Opts) ->
- {[Own], Rest} = proplists:split(Opts, [fragment_timer]),
- {Own, Rest}.
-
-ssl(Opts) ->
- {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]),
- {ssl_opts(SslOpts), Rest}.
-
ssl_opts([]) ->
false;
ssl_opts([{ssl_options, true}]) ->
@@ -261,8 +265,8 @@ ssl_opts([{ssl_options, true}]) ->
ssl_opts([{ssl_options, Opts}])
when is_list(Opts) ->
Opts;
-ssl_opts(L) ->
- ?ERROR({ssl_options, L}).
+ssl_opts(T) ->
+ ?ERROR({ssl_options, T}).
%% init/7
@@ -393,7 +397,7 @@ get_port(Ps) ->
gen_opts(LAddrOpt, Opts) ->
{L,_} = proplists:split(Opts, [binary, packet, active]),
[[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
- [binary, {packet, 0}, {active, once}] ++ LAddrOpt ++ Opts.
+ [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts.
%% ---------------------------------------------------------------------------
%% # ports/1
@@ -482,13 +486,6 @@ putr(Key, Val) ->
getr(Key) ->
get({?MODULE, Key}).
-%% start_timer/1
-
-start_timer(#listener{count = 0} = S) ->
- S#listener{tref = erlang:start_timer(?LISTENER_TIMEOUT, self(), close)};
-start_timer(S) ->
- S.
-
%% m/2
%%
%% Transition monitor state.
@@ -510,21 +507,19 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
%%
%% Transition listener state.
-%% Another accept transport is attaching.
+%% An accepting transport is attaching.
l({accept, TPid}, #listener{count = N} = S) ->
monitor(process, TPid),
S#listener{count = N+1};
%% Accepting process has died.
l({'DOWN', _, process, _, _}, #listener{count = N} = S) ->
- start_timer(S#listener{count = N-1});
+ S#listener{count = N-1};
-%% Timeout after the last accepting process has died.
-l({timeout, TRef, close = T}, #listener{tref = TRef,
- count = 0}) ->
- x(T);
-l({timeout, _, close}, #listener{} = S) ->
- S.
+%% Transport has been removed.
+l({transport, remove, _} = T, #listener{socket = Sock}) ->
+ gen_tcp:close(Sock),
+ x(T).
%% t/2
%%
@@ -536,53 +531,37 @@ t(T,S) ->
S;
#transport{} = NS ->
NS;
- {stop, Reason} ->
- x(Reason);
stop ->
x(T)
end.
%% transition/2
-%% Initial incoming message when we might need to upgrade to TLS:
-%% don't request another message until we know.
-
-transition({tcp, Sock, Bin}, #transport{socket = Sock,
- parent = Pid,
- frag = Head,
- module = M,
- ssl = Opts}
- = S)
- when is_list(Opts) ->
- case rcv(Head, Bin) of
- {Msg, B} when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- S#transport{frag = B};
- Frag ->
- setopts(M, Sock),
- start_fragment_timer(S#transport{frag = Frag})
- end;
-
%% Incoming message.
transition({P, Sock, Bin}, #transport{socket = Sock,
- module = M,
- ssl = B}
+ ssl = B,
+ throttled = T}
= S)
- when P == tcp, not B;
- P == ssl, B ->
- setopts(M, Sock),
- start_fragment_timer(recv(Bin, S));
+ when P == ssl, true == B;
+ P == tcp ->
+ false = T, %% assert
+ recv(Bin, S);
+
+%% Make a new throttling callback after a timeout.
+transition(throttle, #transport{throttled = false}) ->
+ ok;
+transition(throttle, S) ->
+ throttle(S);
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= S) ->
- #transport{socket = Sock,
- module = M}
+ true = is_boolean(B), %% assert
+ #transport{}
= NS
= tls_handshake(Type, B, S),
Pid ! {diameter, {tls, Ref}},
- setopts(M, Sock),
- start_fragment_timer(NS#transport{ssl = B});
+ throttle(NS#transport{ssl = B});
transition({C, Sock}, #transport{socket = Sock,
ssl = B})
@@ -598,14 +577,8 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, #transport{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
- ok ->
- ok;
- {error, Reason} ->
- {stop, {send, Reason}}
- end;
+transition({diameter, {send, Bin}}, S) ->
+ send(Bin, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -672,16 +645,25 @@ tls(accept, Sock, Opts) ->
%% Reassemble fragmented messages and extract multiple message sent
%% using Nagle.
-recv(Bin, #transport{parent = Pid, frag = Head} = S) ->
+%% Receive packets until a full message is received,
+recv(Bin, #transport{frag = Head, throttled = false} = S) ->
case rcv(Head, Bin) of
- {Msg, B} when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- recv(B, S#transport{frag = <<>>});
+ {Msg, B} ->
+ throttle(S#transport{frag = B, throttled = Msg});
Frag ->
- S#transport{frag = Frag,
- flush = false}
+ setopts(S),
+ start_fragment_timer(S#transport{frag = Frag,
+ flush = false})
end.
+%% recv/1
+
+recv(#transport{throttled = false} = S) ->
+ recv(<<>>, S);
+
+recv(#transport{} = S) ->
+ S.
+
%% rcv/2
%% No previous fragment.
@@ -765,8 +747,10 @@ bin(Bin)
%% since all messages with length problems are discarded this should
%% also eventually lead to watchdog failover.
-%% No fragment to flush.
-flush(#transport{frag = <<>>} = S) ->
+%% No fragment to flush or not receiving messages.
+flush(#transport{frag = Frag, throttled = B} = S)
+ when Frag == <<>>;
+ B /= false ->
S;
%% Messages have been received since last timer expiry.
@@ -807,6 +791,17 @@ accept(Mod, LSock) ->
connect(Mod, Host, Port, Opts) ->
Mod:connect(Host, Port, Opts).
+%% send/2
+
+send(Bin, #transport{socket = Sock,
+ module = M}) ->
+ case send(M, Sock, Bin) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ x({send, Reason})
+ end.
+
%% send/3
send(gen_tcp, Sock, Bin) ->
@@ -825,6 +820,11 @@ setopts(ssl, Sock, Opts) ->
setopts(M, Sock, Opts) ->
M:setopts(Sock, Opts).
+%% setopts/1
+
+setopts(#transport{socket = Sock, module = M}) ->
+ setopts(M, Sock).
+
%% setopts/2
setopts(M, Sock) ->
@@ -833,6 +833,110 @@ setopts(M, Sock) ->
X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect
end.
+%% throttle/1
+
+%% Still collecting packets for a complete message: keep receiving.
+throttle(#transport{throttled = false} = S) ->
+ recv(S);
+
+%% Decide whether to receive another, or whether to accept a message
+%% that's been received.
+throttle(#transport{throttle_cb = F, throttled = T} = S) ->
+ Res = cb(F, T),
+
+ try throttle(Res, S) of
+ #transport{ssl = SB} = NS when is_boolean(SB) ->
+ throttle(defrag(NS));
+ #transport{throttled = Msg} = NS when is_binary(Msg) ->
+ %% Initial incoming message when we might need to upgrade
+ %% to TLS: wait for reception of a tls tuple.
+ defrag(NS)
+ catch
+ #transport{} = NS ->
+ recv(NS)
+ end.
+
+%% cb/2
+
+cb(false, _) ->
+ ok;
+
+cb(F, B) ->
+ diameter_lib:eval([F, true /= B andalso B]).
+
+%% throttle/2
+
+%% Callback says to receive another message.
+throttle(ok, #transport{throttled = true} = S) ->
+ throw(S#transport{throttled = false});
+
+%% Callback says to accept a received message.
+throttle(ok, #transport{parent = Pid, throttled = Msg} = S)
+ when is_binary(Msg) ->
+ diameter_peer:recv(Pid, Msg),
+ S;
+
+throttle({ok = T, F}, S) ->
+ throttle(T, S#transport{throttle_cb = F});
+
+%% Callback says to accept a received message and acknowledged the
+%% returned pid with a {request, Pid} message if a request pid is
+%% spawned, a discard message otherwise. The latter does not mean that
+%% the message was necessarily discarded: it could have been an
+%% answer.
+throttle(NPid, #transport{parent = Pid, throttled = Msg} = S)
+ when is_pid(NPid), is_binary(Msg) ->
+ diameter_peer:recv(Pid, {Msg, NPid}),
+ S;
+
+throttle({NPid, F}, #transport{throttled = Msg} = S)
+ when is_pid(NPid), is_binary(Msg) ->
+ throttle(NPid, S#transport{throttle_cb = F});
+
+%% Callback to accept a received message says to discard it.
+throttle(discard, #transport{throttled = Msg} = S)
+ when is_binary(Msg) ->
+ S;
+
+throttle({discard = T, F}, #transport{throttled = Msg} = S)
+ when is_binary(Msg) ->
+ throttle(T, S#transport{throttle_cb = F});
+
+%% Callback to accept a received message says to answer it with the
+%% supplied binary.
+throttle(Bin, #transport{throttled = Msg} = S)
+ when is_binary(Bin), is_binary(Msg) ->
+ send(Bin, S),
+ S;
+
+throttle({Bin, F}, #transport{throttled = Msg} = S)
+ when is_binary(Bin), is_binary(Msg) ->
+ throttle(Bin, S#transport{throttle_cb = F});
+
+%% Callback says to ask again in the specified number of milliseconds.
+throttle({timeout, Tmo}, S) ->
+ erlang:send_after(Tmo, self(), throttle),
+ throw(S);
+
+throttle({timeout = T, Tmo, F}, S) ->
+ throttle({T, Tmo}, S#transport{throttle_cb = F});
+
+throttle(T, #transport{throttle_cb = F}) ->
+ ?ERROR({invalid_return, T, F}).
+
+%% defrag/1
+%%
+%% Try to extract another message from packets already read before
+%% another throttling callback.
+
+defrag(#transport{frag = Head} = S) ->
+ case rcv(Head, <<>>) of
+ {Msg, B} ->
+ S#transport{throttled = Msg, frag = B};
+ _ ->
+ S#transport{throttled = true}
+ end.
+
%% portnr/2
portnr(gen_tcp, Sock) ->