aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-02-09 17:18:21 +0100
committerAnders Svensson <[email protected]>2017-06-11 16:30:34 +0200
commit9ff8491996381cb2297671b94b7282a7ffb2136f (patch)
tree7a0504de92ca395357cd18645b9bf2c853ab7c34
parent69b0c1878a95bdfcfe9043fbccf8a0f7b4545bdc (diff)
downloadotp-9ff8491996381cb2297671b94b7282a7ffb2136f.tar.gz
otp-9ff8491996381cb2297671b94b7282a7ffb2136f.tar.bz2
otp-9ff8491996381cb2297671b94b7282a7ffb2136f.zip
Don't send from receiving transport processes
Both diameter_tcp and diameter_sctp are susceptible to deadlock since a peer that blocks send also prevents additional messages from being received. Send from a process that's paired with the transport process to avoid this. Use the existing monitor process in the TCP case, add one in the SCTP case. This has been the reason for many sporadic testcase failures, mostly in diameter_traffic_SUITE.
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl76
-rw-r--r--lib/diameter/src/transport/diameter_sctp_sup.erl3
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl119
-rw-r--r--lib/diameter/test/diameter_watchdog_SUITE.erl14
4 files changed, 160 insertions, 52 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index f48e4347ee..f9feb68874 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -52,6 +52,7 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
@@ -92,7 +93,14 @@
| undefined,
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
- os = 0 :: uint()}). %% next output stream
+ os = 0 :: uint(), %% next output stream
+ monitor :: pid() | undefined}). %% sending process
+
+%% Monitor process state.
+-record(monitor,
+ {transport :: pid(),
+ socket :: gen_sctp:sctp_socket(),
+ assoc_id :: gen_sctp:assoc_id()}). %% next output stream
%% Listener process state.
-record(listener,
@@ -216,6 +224,12 @@ init(T) ->
%% i/1
+i(#monitor{transport = TPid} = S) ->
+ monitor(process, TPid),
+ putr(?TRANSPORT_KEY, TPid),
+ proc_lib:init_ack({ok, self()}),
+ S;
+
%% A process owning a listening socket.
i({listen, Ref, {Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
@@ -382,6 +396,10 @@ handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) ->
S
end);
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -400,7 +418,11 @@ handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
handle_info(T, #listener{} = S) ->
- {noreply, #listener{} = l(T,S)}.
+ {noreply, #listener{} = l(T,S)};
+
+handle_info(T, #monitor{} = S) ->
+ m(T,S),
+ {noreply, S}.
%% Prior to the possiblity of setting pool_size on in transport
%% configuration, a new accepting transport was only started following
@@ -422,6 +444,9 @@ code_change(_, State, _) ->
%% # terminate/2
%% ---------------------------------------------------------------------------
+terminate(_, #monitor{}) ->
+ ok;
+
terminate(_, #transport{assoc_id = undefined}) ->
ok;
@@ -522,8 +547,17 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ monitor = MPid}) ->
+ undefined == MPid
+ orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
+ orelse exit(MPid, kill),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
stop;
%% Timeout after transport process has been started.
@@ -536,6 +570,14 @@ transition({resolve_port, Pid}, #transport{socket = Sock})
Pid ! inet:port(Sock),
ok.
+%% m/2
+
+m({Bin, StreamId}, #monitor{} = S) ->
+ send(StreamId, Bin, S);
+
+m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) ->
+ x(T).
+
%% Crash on anything unexpected.
ok({ok, T}) ->
@@ -578,6 +620,17 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
+%% Start monitor process on first send.
+send(Msg, #transport{monitor = undefined,
+ socket = Sock,
+ assoc_id = AId}
+ = S) ->
+ {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(),
+ socket = Sock,
+ assoc_id = AId}),
+ monitor(process, MPid),
+ send(Msg, S#transport{monitor = MPid});
+
%% Outbound Diameter message on a specified stream ...
send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
#transport{streams = {_, OS}}
@@ -597,14 +650,13 @@ send(Bin, #transport{streams = {_, OS},
%% send/3
-send(StreamId, Bin, #transport{socket = Sock,
- assoc_id = AId}) ->
- send(Sock, AId, StreamId, Bin).
-
-%% send/4
+send(StreamId, Bin, #transport{monitor = MPid}) ->
+ MPid ! {Bin, StreamId},
+ MPid;
-send(Sock, AssocId, Stream, Bin) ->
- case gen_sctp:send(Sock, AssocId, Stream, Bin) of
+send(StreamId, Bin, #monitor{socket = Sock,
+ assoc_id = AId}) ->
+ case gen_sctp:send(Sock, AId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->
diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl
index 36050aaf28..e8e26ec7c5 100644
--- a/lib/diameter/src/transport/diameter_sctp_sup.erl
+++ b/lib/diameter/src/transport/diameter_sctp_sup.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@ start() ->
start_child(T) ->
SupRef = case element(1,T) of
+ monitor -> ?TRANSPORT_SUP;
connect -> ?TRANSPORT_SUP;
accept -> ?TRANSPORT_SUP;
listen -> ?LISTENER_SUP
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 8e400bc6ee..2a580753a0 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -53,6 +53,7 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
@@ -68,17 +69,22 @@
%% The same gen_server implementation supports three different kinds
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
-%% a process owning the listening port.
+%% a process owning the listening port. The monitor process
+%% historically died after connection establishment, but now lives on
+%% as the sender of outgoing messages, so that a blocking send doesn't
+%% prevent messages from being received.
%% Listener process state.
-record(listener, {socket :: inet:socket(),
module :: module(),
service = false :: false | pid()}). %% service process
-%% Monitor process state.
+%% Monitor process state. The name monitor predates its role as sender.
-record(monitor,
- {parent :: pid(),
- transport = self() :: pid()}).
+ {parent :: reference() | false,
+ transport = self() :: pid(),
+ socket :: inet:socket() | ssl:sslsocket() | undefined,
+ module :: module() | undefined}).
-type length() :: 0..16#FFFFFF. %% message length from Diameter header
-type size() :: non_neg_integer(). %% accumulated binary size
@@ -116,7 +122,8 @@
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
throttle_cb :: false | diameter:evaluable(), %% ask to receive
- throttled :: boolean() | binary()}). %% stopped receiving?
+ throttled :: boolean() | binary(), %% stopped receiving?
+ monitor :: pid()}).
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -203,8 +210,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
T == connect ->
monitor(process, Pid),
%% Since accept/connect might block indefinitely, spawn a process
- %% that does nothing but kill us with the parent until call
- %% returns.
+ %% that kills us with the parent until call returns, and then
+ %% sends outgoing messages.
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
fragment_timer,
@@ -217,8 +224,9 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid),
- MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
+ monitor(process, MPid),
+ MPid ! {start, self(), Sock, M}, %% prepare monitor for sending
putr(?REF_KEY, Ref),
throttle(#transport{parent = Pid,
module = M,
@@ -226,21 +234,22 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
ssl = SslOpts,
timeout = Tmo,
throttle_cb = Throttle,
- throttled = false /= Throttle});
+ throttled = false /= Throttle,
+ monitor = MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
%% A monitor process to kill the transport if the parent dies.
i(#monitor{parent = Pid, transport = TPid} = S) ->
+ putr(?TRANSPORT_KEY, TPid),
proc_lib:init_ack({ok, self()}),
- monitor(process, Pid),
monitor(process, TPid),
- S;
+ S#monitor{parent = monitor(process, Pid)};
%% In principle a link between the transport and killer processes
%% could do the same thing: have the accepting/connecting process be
%% killed when the killer process dies as a consequence of parent
%% death. However, a link can be unlinked and this is exactly what
-%% gen_tcp seems to so. Links should be left to supervisors.
+%% gen_tcp seems to do. Links should be left to supervisors.
i({listen, Ref, {Mod, Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
@@ -452,7 +461,11 @@ handle_call({accept, SPid}, _From, #listener{service = P} = S) ->
true ->
S
end};
-
+
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -474,8 +487,7 @@ handle_info(T, #listener{} = S) ->
{noreply, #listener{} = l(T,S)};
handle_info(T, #monitor{} = S) ->
- m(T,S),
- x(T).
+ {noreply, #monitor{} = m(T,S)}.
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -491,6 +503,7 @@ code_change(_, State, _) ->
terminate(_, _) ->
ok.
+
%% ---------------------------------------------------------------------------
putr(Key, Val) ->
@@ -503,18 +516,38 @@ getr(Key) ->
%%
%% Transition monitor state.
-%% Transport is telling us to die.
-m({stop, TPid}, #monitor{transport = TPid}) ->
- ok;
+%% Outgoing message.
+m(Bin, #monitor{} = S)
+ when is_binary(Bin) ->
+ send(Bin, S),
+ S;
-%% Transport has died.
-m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) ->
- ok;
+%% Transport is telling us to be ready to send. Stop monitoring on the
+%% parent so as not to die before a send from the transport.
+m({start, TPid, Sock, Mod}, #monitor{parent = MRef,
+ transport = TPid}
+ = S) ->
+ demonitor(MRef, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod};
-%% Transport parent has died.
-m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
- transport = TPid}) ->
- exit(TPid, {shutdown, parent}).
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
+
+%% Transport is telling us that TLS has been negotiated after
+%% capabilities exchange.
+m({tls, SSock}, #monitor{} = S) ->
+ S#monitor{socket = SSock,
+ module = ssl};
+
+%% Transport or parent has died.
+m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef,
+ transport = TPid})
+ when M == MRef;
+ P == TPid ->
+ x(T).
%% l/2
%%
@@ -589,8 +622,9 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, S) ->
- send(Bin, S);
+transition({diameter, {send, Bin}}, #transport{} = S) ->
+ send(Bin, S),
+ ok;
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -610,8 +644,16 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
Pid ! portnr(M, Sock),
ok;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ monitor = MPid}) ->
+ ok == (catch gen_server:call(MPid, {stop, Pid}))
+ orelse exit(MPid, kill),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
stop.
%% Crash on anything unexpected.
@@ -635,11 +677,13 @@ tls_handshake(_, true, #transport{ssl = false}) ->
%% Capabilities exchange negotiated TLS: upgrade the connection.
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
- ssl = Opts}
+ ssl = Opts,
+ monitor = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
+ MPid ! {tls, SSock}, %% tell the monitor process
S#transport{socket = SSock,
module = ssl};
@@ -805,14 +849,20 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
-send(Bin, #transport{socket = Sock,
- module = M}) ->
+send(Bin, #monitor{socket = Sock,
+ module = M}) ->
case send(M, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
x({send, Reason})
- end.
+ end;
+
+%% Send from the monitor process to avoid deadlock if both the
+%% receiver and the peer were to block in send.
+send(Bin, #transport{monitor = MPid}) ->
+ MPid ! Bin,
+ MPid.
%% send/3
@@ -909,7 +959,6 @@ throttle({NPid, F}, #transport{throttled = Msg} = S)
throttle(discard, #transport{throttled = Msg} = S)
when is_binary(Msg) ->
S;
-
throttle({discard = T, F}, #transport{throttled = Msg} = S)
when is_binary(Msg) ->
throttle(T, S#transport{throttle_cb = F});
@@ -920,7 +969,6 @@ throttle(Bin, #transport{throttled = Msg} = S)
when is_binary(Bin), is_binary(Msg) ->
send(Bin, S),
S;
-
throttle({Bin, F}, #transport{throttled = Msg} = S)
when is_binary(Bin), is_binary(Msg) ->
throttle(Bin, S#transport{throttle_cb = F});
@@ -929,7 +977,6 @@ throttle({Bin, F}, #transport{throttled = Msg} = S)
throttle({timeout, Tmo}, S) ->
erlang:send_after(Tmo, self(), throttle),
throw(S);
-
throttle({timeout = T, Tmo, F}, S) ->
throttle({T, Tmo}, S#transport{throttle_cb = F});
diff --git a/lib/diameter/test/diameter_watchdog_SUITE.erl b/lib/diameter/test/diameter_watchdog_SUITE.erl
index 6d22ddcc18..5ae951f7c2 100644
--- a/lib/diameter/test/diameter_watchdog_SUITE.erl
+++ b/lib/diameter/test/diameter_watchdog_SUITE.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -569,12 +569,12 @@ send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) ->
{'Origin-Host', "XXX"},
{'Origin-Realm', ?REALM}]},
#diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt),
- self() ! {tcp, Sock, Bin},
+ tpid(Sock) ! {tcp, Sock, Bin},
ok;
%% First outgoing DWA.
send(init, Sock, Bin) ->
- [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, self(), '_'}),
+ [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, tpid(Sock), '_'}),
putr(config, T),
send(Sock, Bin);
@@ -607,6 +607,14 @@ send(N, Sock, <<_:32, 0:1, _:7, 280:24, _/binary>> = Bin) ->
putr(config, N-1),
gen_tcp:send(Sock, Bin).
+%% tpid/1
+
+tpid(Sock) ->
+ {connected, Pid} = erlang:port_info(Sock, connected),
+ Pid.
+
+%%failback/5
+
failback(Tmo, Msg, Sock, Bin, Origin) ->
timer:sleep(Tmo),
ok = gen_tcp:send(Sock, msg(Msg, Bin, Origin)).