aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-06-10 23:40:53 +0200
committerAnders Svensson <[email protected]>2017-06-11 16:30:39 +0200
commiteadf4efc7e264fe8dd30befb42a42a02cdef58f1 (patch)
tree6ac266589946ff35f843cbf89d6913113cbb1629
parent034089ed1ba3f78c732edcfc84d85a6ed4a4854e (diff)
downloadotp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.tar.gz
otp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.tar.bz2
otp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.zip
Make diameter_{tcp,sctp} sender configurable
With sends still from the receiving process by default, since changing the default behaviour may well have negative effects. A separate sender probably implies a greater need for some form of load regulation for one, since a blocking send would no longer imply that incoming messages are no longer recevied. Dealing with this could result in the same deadlock that the sending process intends to avoid, but the user should be in control over how/when incoming traffic is regulated.
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl59
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl81
2 files changed, 90 insertions, 50 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index d23e56b413..3919596cb1 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -68,6 +68,7 @@
-type connect_option() :: {raddr, inet:ip_address()}
| {rport, inet:port_number()}
+ | option()
| term(). %% gen_sctp:open_option().
-type match() :: inet:ip_address()
@@ -75,8 +76,12 @@
| [match()].
-type listen_option() :: {accept, match()}
+ | option()
| term(). %% gen_sctp:open_option().
+-type option() :: {sender, boolean()}
+ | sender.
+
-type uint() :: non_neg_integer().
%% Accepting/connecting transport process state.
@@ -96,7 +101,7 @@
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
os = 0 :: uint(), %% next output stream
- monitor :: pid() | undefined}). %% sending process
+ send = false :: pid() | boolean()}). %% sending process
%% Monitor process state.
-record(monitor,
@@ -110,7 +115,8 @@
socket :: gen_sctp:sctp_socket(),
service :: pid(), %% service process
pending = {0, queue:new()},
- accept :: [match()]}).
+ accept :: [match()],
+ sender :: boolean()}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
@@ -236,7 +242,8 @@ i(#monitor{transport = TPid} = S) ->
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {[Matches], Rest} = proplists:split(Opts, [accept]),
+ {Split, Rest} = proplists:split(Opts, [accept, sender]),
+ OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
@@ -244,12 +251,14 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
#listener{ref = Ref,
service = SvcPid,
socket = Sock,
- accept = [[M] || {accept, M} <- Matches]};
+ accept = [[M] || {accept, M} <- OwnOpts],
+ sender = proplists:get_value(sender, OwnOpts, false)};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
- RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
+ {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]),
+ OwnOpts = lists:append(Split),
+ RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
putr(?REF_KEY, Ref),
@@ -257,7 +266,8 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
monitor(process, Pid),
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
- socket = Sock};
+ socket = Sock,
+ send = proplists:get_value(sender, OwnOpts, false)};
%% An accepting transport spawned by diameter, not yet owning an
%% association.
@@ -291,11 +301,12 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
{Ref, Pid} when K == parent -> %% transport process started
S#transport{parent = Pid};
- {K, T, Matches} when K == peeloff -> %% association
+ {K, T, Matches, Bool} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
- t(T, S#transport{socket = Sock});
+ t(T, S#transport{socket = Sock,
+ send = Bool});
accept_timeout = T ->
x(T);
{'DOWN', _, process, _, _} = T ->
@@ -466,11 +477,12 @@ getr(Key) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
- accept = Matches}
+ accept = Matches,
+ sender = Sender}
= S) ->
Id = assoc_id(Data),
{TPid, NewS} = accept(S),
- TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender},
setopts(Sock),
NewS;
@@ -546,14 +558,15 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
%% Parent process has died: call the monitor to not close the socket
%% during an ongoing send, but don't let it take forever.
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
- monitor = MPid}) ->
- undefined == MPid
+ send = MPid}) ->
+ is_boolean(MPid)
orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
orelse exit(MPid, kill),
stop;
%% Monitor process has died.
-transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop;
%% Timeout after transport process has been started.
@@ -617,7 +630,7 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
%% Start monitor process on first send.
-send(Msg, #transport{monitor = undefined,
+send(Msg, #transport{send = true,
socket = Sock,
assoc_id = AId}
= S) ->
@@ -625,7 +638,7 @@ send(Msg, #transport{monitor = undefined,
socket = Sock,
assoc_id = AId}),
monitor(process, MPid),
- send(Msg, S#transport{monitor = MPid});
+ send(Msg, S#transport{send = MPid});
%% Outbound Diameter message on a specified stream ...
send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
@@ -646,13 +659,23 @@ send(Bin, #transport{streams = {_, OS},
%% send/3
-send(StreamId, Bin, #transport{monitor = MPid}) ->
+send(StreamId, Bin, #transport{send = false,
+ socket = Sock,
+ assoc_id = AId}) ->
+ send(Sock, AId, StreamId, Bin);
+
+send(StreamId, Bin, #transport{send = MPid}) ->
MPid ! {Bin, StreamId},
MPid;
send(StreamId, Bin, #monitor{socket = Sock,
assoc_id = AId}) ->
- case gen_sctp:send(Sock, AId, StreamId, Bin) of
+ send(Sock, AId, StreamId, Bin).
+
+%% send/4
+
+send(Sock, AssocId, StreamId, Bin) ->
+ case gen_sctp:send(Sock, AssocId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 427b2395b9..edbbec1709 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -69,16 +69,16 @@
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
%% a process owning the listening port. The monitor process
-%% historically died after connection establishment, but now lives on
-%% as the sender of outgoing messages, so that a blocking send doesn't
-%% prevent messages from being received.
+%% historically died after connection establishment, but can now live
+%% on as the sender of outgoing messages, so that a blocking send
+%% doesn't prevent messages from being received.
%% Listener process state.
-record(listener, {socket :: inet:socket(),
module :: module(),
service = false :: false | pid()}). %% service process
-%% Monitor process state. The name monitor predates its role as sender.
+%% Monitor process state.
-record(monitor,
{parent :: reference() | false | pid(),
transport = self() :: pid(),
@@ -108,6 +108,7 @@
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
+ | {sender, boolean()}
| {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
@@ -120,7 +121,7 @@
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- monitor :: pid()}). %% monitor/sender process
+ send :: pid() | false}). %% sending process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -210,25 +211,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
%% that kills us with the parent until call returns, and then
%% sends outgoing messages.
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
+ sender,
fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
+ Sender = proplists:get_value(sender, OwnOpts, false),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
M = if SslOpts -> ssl; true -> Mod end,
- monitor(process, MPid),
- MPid ! {start, self(), Sock, M}, %% prepare monitor for sending
+ Sender andalso monitor(process, MPid),
+ MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending
putr(?REF_KEY, Ref),
setopts(#transport{parent = Pid,
module = M,
socket = Sock,
ssl = SslOpts,
timeout = Tmo,
- monitor = MPid});
+ send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -515,15 +518,22 @@ m(Bin, S)
send(Bin, S),
S;
-%% Transport is telling us to be ready to send. Stop monitoring on the
+%% Transport has established a connection. Stop monitoring on the
%% parent so as not to die before a send from the transport.
-m({start, TPid, Sock, Mod}, #monitor{parent = MRef,
- transport = TPid}
- = S) ->
- demonitor(MRef, [flush]),
- S#monitor{parent = false,
- socket = Sock,
- module = Mod};
+m({start, TPid, T} = M, #monitor{transport = TPid} = S) ->
+ case T of
+ {Sock, Mod} ->
+ demonitor(S#monitor.parent, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod};
+ false -> %% monitor not sending
+ x(M)
+ end;
+
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
%% Transport is telling us to die.
m({stop, TPid} = T, #monitor{transport = TPid}) ->
@@ -632,13 +642,15 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
%% Parent process has died: call the monitor to not close the socket
%% during an ongoing send, but don't let it take forever.
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
- monitor = MPid}) ->
- ok == (catch gen_server:call(MPid, {stop, Pid}))
- orelse exit(MPid, kill),
+ send = MPid}) ->
+ false == MPid
+ orelse (ok == gen_server:call(MPid, {stop, self()}, 1000))
+ orelse exit(MPid, {shutdown, parent}),
stop;
%% Monitor process has died.
-transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop.
%% Crash on anything unexpected.
@@ -663,12 +675,12 @@ tls_handshake(_, true, #transport{ssl = false}) ->
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
ssl = Opts,
- monitor = MPid}
+ send = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
- MPid ! {tls, SSock}, %% tell the monitor process
+ false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process
S#transport{socket = SSock,
module = ssl};
@@ -827,29 +839,34 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
+send(false, #transport{}) -> %% ack
+ ok;
+
send(#diameter_packet{bin = Bin}, S) ->
send(Bin, S);
-send(Bin, #monitor{} = S) ->
- send1(Bin, S);
+send(Bin, #transport{socket = Sock, module = M, send = false}) ->
+ send1(M, Sock, Bin);
+
+send(Bin, #monitor{socket = Sock, module = M}) ->
+ send1(M, Sock, Bin);
%% Send from the monitor process to avoid deadlock if both the
%% receiver and the peer were to block in send.
-send(Bin, #transport{monitor = MPid}) ->
- MPid ! Bin,
- MPid.
+send(Bin, #transport{send = Pid}) ->
+ Pid ! Bin,
+ ok.
-%% send1/2
+%% send1/3
-send1(Bin, #monitor{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
+send1(Mod, Sock, Bin) ->
+ case send(Mod, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
x({send, Reason})
end.
-
+
%% send/3
send(gen_tcp, Sock, Bin) ->