aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_sctp.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-02-09 17:18:21 +0100
committerAnders Svensson <[email protected]>2017-06-11 16:30:34 +0200
commit9ff8491996381cb2297671b94b7282a7ffb2136f (patch)
tree7a0504de92ca395357cd18645b9bf2c853ab7c34 /lib/diameter/src/transport/diameter_sctp.erl
parent69b0c1878a95bdfcfe9043fbccf8a0f7b4545bdc (diff)
downloadotp-9ff8491996381cb2297671b94b7282a7ffb2136f.tar.gz
otp-9ff8491996381cb2297671b94b7282a7ffb2136f.tar.bz2
otp-9ff8491996381cb2297671b94b7282a7ffb2136f.zip
Don't send from receiving transport processes
Both diameter_tcp and diameter_sctp are susceptible to deadlock since a peer that blocks send also prevents additional messages from being received. Send from a process that's paired with the transport process to avoid this. Use the existing monitor process in the TCP case, add one in the SCTP case. This has been the reason for many sporadic testcase failures, mostly in diameter_traffic_SUITE.
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl76
1 files changed, 64 insertions, 12 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index f48e4347ee..f9feb68874 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -52,6 +52,7 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
@@ -92,7 +93,14 @@
| undefined,
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
- os = 0 :: uint()}). %% next output stream
+ os = 0 :: uint(), %% next output stream
+ monitor :: pid() | undefined}). %% sending process
+
+%% Monitor process state.
+-record(monitor,
+ {transport :: pid(),
+ socket :: gen_sctp:sctp_socket(),
+ assoc_id :: gen_sctp:assoc_id()}). %% next output stream
%% Listener process state.
-record(listener,
@@ -216,6 +224,12 @@ init(T) ->
%% i/1
+i(#monitor{transport = TPid} = S) ->
+ monitor(process, TPid),
+ putr(?TRANSPORT_KEY, TPid),
+ proc_lib:init_ack({ok, self()}),
+ S;
+
%% A process owning a listening socket.
i({listen, Ref, {Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
@@ -382,6 +396,10 @@ handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) ->
S
end);
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -400,7 +418,11 @@ handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
handle_info(T, #listener{} = S) ->
- {noreply, #listener{} = l(T,S)}.
+ {noreply, #listener{} = l(T,S)};
+
+handle_info(T, #monitor{} = S) ->
+ m(T,S),
+ {noreply, S}.
%% Prior to the possiblity of setting pool_size on in transport
%% configuration, a new accepting transport was only started following
@@ -422,6 +444,9 @@ code_change(_, State, _) ->
%% # terminate/2
%% ---------------------------------------------------------------------------
+terminate(_, #monitor{}) ->
+ ok;
+
terminate(_, #transport{assoc_id = undefined}) ->
ok;
@@ -522,8 +547,17 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ monitor = MPid}) ->
+ undefined == MPid
+ orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
+ orelse exit(MPid, kill),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
stop;
%% Timeout after transport process has been started.
@@ -536,6 +570,14 @@ transition({resolve_port, Pid}, #transport{socket = Sock})
Pid ! inet:port(Sock),
ok.
+%% m/2
+
+m({Bin, StreamId}, #monitor{} = S) ->
+ send(StreamId, Bin, S);
+
+m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) ->
+ x(T).
+
%% Crash on anything unexpected.
ok({ok, T}) ->
@@ -578,6 +620,17 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
+%% Start monitor process on first send.
+send(Msg, #transport{monitor = undefined,
+ socket = Sock,
+ assoc_id = AId}
+ = S) ->
+ {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(),
+ socket = Sock,
+ assoc_id = AId}),
+ monitor(process, MPid),
+ send(Msg, S#transport{monitor = MPid});
+
%% Outbound Diameter message on a specified stream ...
send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
#transport{streams = {_, OS}}
@@ -597,14 +650,13 @@ send(Bin, #transport{streams = {_, OS},
%% send/3
-send(StreamId, Bin, #transport{socket = Sock,
- assoc_id = AId}) ->
- send(Sock, AId, StreamId, Bin).
-
-%% send/4
+send(StreamId, Bin, #transport{monitor = MPid}) ->
+ MPid ! {Bin, StreamId},
+ MPid;
-send(Sock, AssocId, Stream, Bin) ->
- case gen_sctp:send(Sock, AssocId, Stream, Bin) of
+send(StreamId, Bin, #monitor{socket = Sock,
+ assoc_id = AId}) ->
+ case gen_sctp:send(Sock, AId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->