diff options
author | Anders Svensson <[email protected]> | 2017-02-09 17:18:21 +0100 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2017-06-11 16:30:34 +0200 |
commit | 9ff8491996381cb2297671b94b7282a7ffb2136f (patch) | |
tree | 7a0504de92ca395357cd18645b9bf2c853ab7c34 /lib/diameter/src/transport/diameter_sctp.erl | |
parent | 69b0c1878a95bdfcfe9043fbccf8a0f7b4545bdc (diff) | |
download | otp-9ff8491996381cb2297671b94b7282a7ffb2136f.tar.gz otp-9ff8491996381cb2297671b94b7282a7ffb2136f.tar.bz2 otp-9ff8491996381cb2297671b94b7282a7ffb2136f.zip |
Don't send from receiving transport processes
Both diameter_tcp and diameter_sctp are susceptible to deadlock since a
peer that blocks send also prevents additional messages from being
received. Send from a process that's paired with the transport process
to avoid this. Use the existing monitor process in the TCP case, add one
in the SCTP case.
This has been the reason for many sporadic testcase failures, mostly in
diameter_traffic_SUITE.
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 76 |
1 files changed, 64 insertions, 12 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f48e4347ee..f9feb68874 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -52,6 +52,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -92,7 +93,14 @@ | undefined, streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, - os = 0 :: uint()}). %% next output stream + os = 0 :: uint(), %% next output stream + monitor :: pid() | undefined}). %% sending process + +%% Monitor process state. +-record(monitor, + {transport :: pid(), + socket :: gen_sctp:sctp_socket(), + assoc_id :: gen_sctp:assoc_id()}). %% next output stream %% Listener process state. -record(listener, @@ -216,6 +224,12 @@ init(T) -> %% i/1 +i(#monitor{transport = TPid} = S) -> + monitor(process, TPid), + putr(?TRANSPORT_KEY, TPid), + proc_lib:init_ack({ok, self()}), + S; + %% A process owning a listening socket. i({listen, Ref, {Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence @@ -382,6 +396,10 @@ handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> S end); +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -400,7 +418,11 @@ handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; handle_info(T, #listener{} = S) -> - {noreply, #listener{} = l(T,S)}. + {noreply, #listener{} = l(T,S)}; + +handle_info(T, #monitor{} = S) -> + m(T,S), + {noreply, S}. %% Prior to the possiblity of setting pool_size on in transport %% configuration, a new accepting transport was only started following @@ -422,6 +444,9 @@ code_change(_, State, _) -> %% # terminate/2 %% --------------------------------------------------------------------------- +terminate(_, #monitor{}) -> + ok; + terminate(_, #transport{assoc_id = undefined}) -> ok; @@ -522,8 +547,17 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> stop; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + monitor = MPid}) -> + undefined == MPid + orelse ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> stop; %% Timeout after transport process has been started. @@ -536,6 +570,14 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) Pid ! inet:port(Sock), ok. +%% m/2 + +m({Bin, StreamId}, #monitor{} = S) -> + send(StreamId, Bin, S); + +m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> + x(T). + %% Crash on anything unexpected. ok({ok, T}) -> @@ -578,6 +620,17 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 +%% Start monitor process on first send. +send(Msg, #transport{monitor = undefined, + socket = Sock, + assoc_id = AId} + = S) -> + {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), + socket = Sock, + assoc_id = AId}), + monitor(process, MPid), + send(Msg, S#transport{monitor = MPid}); + %% Outbound Diameter message on a specified stream ... send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, #transport{streams = {_, OS}} @@ -597,14 +650,13 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -send(StreamId, Bin, #transport{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). - -%% send/4 +send(StreamId, Bin, #transport{monitor = MPid}) -> + MPid ! {Bin, StreamId}, + MPid; -send(Sock, AssocId, Stream, Bin) -> - case gen_sctp:send(Sock, AssocId, Stream, Bin) of +send(StreamId, Bin, #monitor{socket = Sock, + assoc_id = AId}) -> + case gen_sctp:send(Sock, AId, StreamId, Bin) of ok -> ok; {error, Reason} -> |