aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_tcp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl81
1 files changed, 49 insertions, 32 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 427b2395b9..edbbec1709 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -69,16 +69,16 @@
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
%% a process owning the listening port. The monitor process
-%% historically died after connection establishment, but now lives on
-%% as the sender of outgoing messages, so that a blocking send doesn't
-%% prevent messages from being received.
+%% historically died after connection establishment, but can now live
+%% on as the sender of outgoing messages, so that a blocking send
+%% doesn't prevent messages from being received.
%% Listener process state.
-record(listener, {socket :: inet:socket(),
module :: module(),
service = false :: false | pid()}). %% service process
-%% Monitor process state. The name monitor predates its role as sender.
+%% Monitor process state.
-record(monitor,
{parent :: reference() | false | pid(),
transport = self() :: pid(),
@@ -108,6 +108,7 @@
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
+ | {sender, boolean()}
| {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
@@ -120,7 +121,7 @@
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- monitor :: pid()}). %% monitor/sender process
+ send :: pid() | false}). %% sending process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -210,25 +211,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
%% that kills us with the parent until call returns, and then
%% sends outgoing messages.
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
+ sender,
fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
+ Sender = proplists:get_value(sender, OwnOpts, false),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
M = if SslOpts -> ssl; true -> Mod end,
- monitor(process, MPid),
- MPid ! {start, self(), Sock, M}, %% prepare monitor for sending
+ Sender andalso monitor(process, MPid),
+ MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending
putr(?REF_KEY, Ref),
setopts(#transport{parent = Pid,
module = M,
socket = Sock,
ssl = SslOpts,
timeout = Tmo,
- monitor = MPid});
+ send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
@@ -515,15 +518,22 @@ m(Bin, S)
send(Bin, S),
S;
-%% Transport is telling us to be ready to send. Stop monitoring on the
+%% Transport has established a connection. Stop monitoring on the
%% parent so as not to die before a send from the transport.
-m({start, TPid, Sock, Mod}, #monitor{parent = MRef,
- transport = TPid}
- = S) ->
- demonitor(MRef, [flush]),
- S#monitor{parent = false,
- socket = Sock,
- module = Mod};
+m({start, TPid, T} = M, #monitor{transport = TPid} = S) ->
+ case T of
+ {Sock, Mod} ->
+ demonitor(S#monitor.parent, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod};
+ false -> %% monitor not sending
+ x(M)
+ end;
+
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
%% Transport is telling us to die.
m({stop, TPid} = T, #monitor{transport = TPid}) ->
@@ -632,13 +642,15 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
%% Parent process has died: call the monitor to not close the socket
%% during an ongoing send, but don't let it take forever.
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
- monitor = MPid}) ->
- ok == (catch gen_server:call(MPid, {stop, Pid}))
- orelse exit(MPid, kill),
+ send = MPid}) ->
+ false == MPid
+ orelse (ok == gen_server:call(MPid, {stop, self()}, 1000))
+ orelse exit(MPid, {shutdown, parent}),
stop;
%% Monitor process has died.
-transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop.
%% Crash on anything unexpected.
@@ -663,12 +675,12 @@ tls_handshake(_, true, #transport{ssl = false}) ->
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
ssl = Opts,
- monitor = MPid}
+ send = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
- MPid ! {tls, SSock}, %% tell the monitor process
+ false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process
S#transport{socket = SSock,
module = ssl};
@@ -827,29 +839,34 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
+send(false, #transport{}) -> %% ack
+ ok;
+
send(#diameter_packet{bin = Bin}, S) ->
send(Bin, S);
-send(Bin, #monitor{} = S) ->
- send1(Bin, S);
+send(Bin, #transport{socket = Sock, module = M, send = false}) ->
+ send1(M, Sock, Bin);
+
+send(Bin, #monitor{socket = Sock, module = M}) ->
+ send1(M, Sock, Bin);
%% Send from the monitor process to avoid deadlock if both the
%% receiver and the peer were to block in send.
-send(Bin, #transport{monitor = MPid}) ->
- MPid ! Bin,
- MPid.
+send(Bin, #transport{send = Pid}) ->
+ Pid ! Bin,
+ ok.
-%% send1/2
+%% send1/3
-send1(Bin, #monitor{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
+send1(Mod, Sock, Bin) ->
+ case send(Mod, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
x({send, Reason})
end.
-
+
%% send/3
send(gen_tcp, Sock, Bin) ->