aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_tcp.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-06-10 23:40:53 +0200
committerAnders Svensson <[email protected]>2017-06-11 16:30:39 +0200
commiteadf4efc7e264fe8dd30befb42a42a02cdef58f1 (patch)
tree6ac266589946ff35f843cbf89d6913113cbb1629 /lib/diameter/src/transport/diameter_tcp.erl
parent034089ed1ba3f78c732edcfc84d85a6ed4a4854e (diff)
downloadotp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.tar.gz
otp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.tar.bz2
otp-eadf4efc7e264fe8dd30befb42a42a02cdef58f1.zip
Make diameter_{tcp,sctp} sender configurable
With sends still from the receiving process by default, since changing the default behaviour may well have negative effects. A separate sender probably implies a greater need for some form of load regulation for one, since a blocking send would no longer imply that incoming messages are no longer recevied. Dealing with this could result in the same deadlock that the sending process intends to avoid, but the user should be in control over how/when incoming traffic is regulated.
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl81
1 files changed, 49 insertions, 32 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 427b2395b9..edbbec1709 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -69,16 +69,16 @@
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
%% a process owning the listening port. The monitor process
-%% historically died after connection establishment, but now lives on
-%% as the sender of outgoing messages, so that a blocking send doesn't
-%% prevent messages from being received.
+%% historically died after connection establishment, but can now live
+%% on as the sender of outgoing messages, so that a blocking send
+%% doesn't prevent messages from being received.
%% Listener process state.
-record(listener, {socket :: inet:socket(),
module :: module(),
service = false :: false | pid()}). %% service process
-%% Monitor process state. The name monitor predates its role as sender.
+%% Monitor process state.
-record(monitor,
{parent :: reference() | false | pid(),
transport = self() :: pid(),
@@ -108,6 +108,7 @@
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
+ | {sender, boolean()}
| {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
@@ -120,7 +121,7 @@
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- monitor :: pid()}). %% monitor/sender process
+ send :: pid() | false}). %% sending process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -210,25 +211,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
%% that kills us with the parent until call returns, and then
%% sends outgoing messages.
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
+ sender,
fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
+ Sender = proplists:get_value(sender, OwnOpts, false),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
M = if SslOpts -> ssl; true -> Mod end,
- monitor(process, MPid),
- MPid ! {start, self(), Sock, M}, %% prepare monitor for sending
+ Sender andalso monitor(process, MPid),
+ MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending
putr(?REF_KEY, Ref),
setopts(#transport{parent = Pid,
module = M,
socket = Sock,
ssl = SslOpts,
timeout = Tmo,
- monitor = MPid});
+ send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -515,15 +518,22 @@ m(Bin, S)
send(Bin, S),
S;
-%% Transport is telling us to be ready to send. Stop monitoring on the
+%% Transport has established a connection. Stop monitoring on the
%% parent so as not to die before a send from the transport.
-m({start, TPid, Sock, Mod}, #monitor{parent = MRef,
- transport = TPid}
- = S) ->
- demonitor(MRef, [flush]),
- S#monitor{parent = false,
- socket = Sock,
- module = Mod};
+m({start, TPid, T} = M, #monitor{transport = TPid} = S) ->
+ case T of
+ {Sock, Mod} ->
+ demonitor(S#monitor.parent, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod};
+ false -> %% monitor not sending
+ x(M)
+ end;
+
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
%% Transport is telling us to die.
m({stop, TPid} = T, #monitor{transport = TPid}) ->
@@ -632,13 +642,15 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
%% Parent process has died: call the monitor to not close the socket
%% during an ongoing send, but don't let it take forever.
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
- monitor = MPid}) ->
- ok == (catch gen_server:call(MPid, {stop, Pid}))
- orelse exit(MPid, kill),
+ send = MPid}) ->
+ false == MPid
+ orelse (ok == gen_server:call(MPid, {stop, self()}, 1000))
+ orelse exit(MPid, {shutdown, parent}),
stop;
%% Monitor process has died.
-transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop.
%% Crash on anything unexpected.
@@ -663,12 +675,12 @@ tls_handshake(_, true, #transport{ssl = false}) ->
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
ssl = Opts,
- monitor = MPid}
+ send = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
- MPid ! {tls, SSock}, %% tell the monitor process
+ false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process
S#transport{socket = SSock,
module = ssl};
@@ -827,29 +839,34 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
+send(false, #transport{}) -> %% ack
+ ok;
+
send(#diameter_packet{bin = Bin}, S) ->
send(Bin, S);
-send(Bin, #monitor{} = S) ->
- send1(Bin, S);
+send(Bin, #transport{socket = Sock, module = M, send = false}) ->
+ send1(M, Sock, Bin);
+
+send(Bin, #monitor{socket = Sock, module = M}) ->
+ send1(M, Sock, Bin);
%% Send from the monitor process to avoid deadlock if both the
%% receiver and the peer were to block in send.
-send(Bin, #transport{monitor = MPid}) ->
- MPid ! Bin,
- MPid.
+send(Bin, #transport{send = Pid}) ->
+ Pid ! Bin,
+ ok.
-%% send1/2
+%% send1/3
-send1(Bin, #monitor{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
+send1(Mod, Sock, Bin) ->
+ case send(Mod, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
x({send, Reason})
end.
-
+
%% send/3
send(gen_tcp, Sock, Bin) ->