diff options
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 81 |
1 files changed, 49 insertions, 32 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 427b2395b9..edbbec1709 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -69,16 +69,16 @@ %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and %% a process owning the listening port. The monitor process -%% historically died after connection establishment, but now lives on -%% as the sender of outgoing messages, so that a blocking send doesn't -%% prevent messages from being received. +%% historically died after connection establishment, but can now live +%% on as the sender of outgoing messages, so that a blocking send +%% doesn't prevent messages from being received. %% Listener process state. -record(listener, {socket :: inet:socket(), module :: module(), service = false :: false | pid()}). %% service process -%% Monitor process state. The name monitor predates its role as sender. +%% Monitor process state. -record(monitor, {parent :: reference() | false | pid(), transport = self() :: pid(), @@ -108,6 +108,7 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} + | {sender, boolean()} | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. @@ -120,7 +121,7 @@ timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - monitor :: pid()}). %% monitor/sender process + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -210,25 +211,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) %% that kills us with the parent until call returns, and then %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, + sender, fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), + Sender = proplists:get_value(sender, OwnOpts, false), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, - monitor(process, MPid), - MPid ! {start, self(), Sock, M}, %% prepare monitor for sending + Sender andalso monitor(process, MPid), + MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending putr(?REF_KEY, Ref), setopts(#transport{parent = Pid, module = M, socket = Sock, ssl = SslOpts, timeout = Tmo, - monitor = MPid}); + send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -515,15 +518,22 @@ m(Bin, S) send(Bin, S), S; -%% Transport is telling us to be ready to send. Stop monitoring on the +%% Transport has established a connection. Stop monitoring on the %% parent so as not to die before a send from the transport. -m({start, TPid, Sock, Mod}, #monitor{parent = MRef, - transport = TPid} - = S) -> - demonitor(MRef, [flush]), - S#monitor{parent = false, - socket = Sock, - module = Mod}; +m({start, TPid, T} = M, #monitor{transport = TPid} = S) -> + case T of + {Sock, Mod} -> + demonitor(S#monitor.parent, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod}; + false -> %% monitor not sending + x(M) + end; + +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); %% Transport is telling us to die. m({stop, TPid} = T, #monitor{transport = TPid}) -> @@ -632,13 +642,15 @@ transition({resolve_port, Pid}, #transport{socket = Sock, %% Parent process has died: call the monitor to not close the socket %% during an ongoing send, but don't let it take forever. transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, - monitor = MPid}) -> - ok == (catch gen_server:call(MPid, {stop, Pid})) - orelse exit(MPid, kill), + send = MPid}) -> + false == MPid + orelse (ok == gen_server:call(MPid, {stop, self()}, 1000)) + orelse exit(MPid, {shutdown, parent}), stop; %% Monitor process has died. -transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop. %% Crash on anything unexpected. @@ -663,12 +675,12 @@ tls_handshake(_, true, #transport{ssl = false}) -> tls_handshake(Type, true, #transport{socket = Sock, module = M, ssl = Opts, - monitor = MPid} + send = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), - MPid ! {tls, SSock}, %% tell the monitor process + false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process S#transport{socket = SSock, module = ssl}; @@ -827,29 +839,34 @@ connect(Mod, Host, Port, Opts) -> %% send/2 +send(false, #transport{}) -> %% ack + ok; + send(#diameter_packet{bin = Bin}, S) -> send(Bin, S); -send(Bin, #monitor{} = S) -> - send1(Bin, S); +send(Bin, #transport{socket = Sock, module = M, send = false}) -> + send1(M, Sock, Bin); + +send(Bin, #monitor{socket = Sock, module = M}) -> + send1(M, Sock, Bin); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. -send(Bin, #transport{monitor = MPid}) -> - MPid ! Bin, - MPid. +send(Bin, #transport{send = Pid}) -> + Pid ! Bin, + ok. -%% send1/2 +%% send1/3 -send1(Bin, #monitor{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of +send1(Mod, Sock, Bin) -> + case send(Mod, Sock, Bin) of ok -> ok; {error, Reason} -> x({send, Reason}) end. - + %% send/3 send(gen_tcp, Sock, Bin) -> |