diff options
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 76 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp_sup.erl | 3 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 119 | ||||
-rw-r--r-- | lib/diameter/test/diameter_watchdog_SUITE.erl | 14 |
4 files changed, 160 insertions, 52 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f48e4347ee..f9feb68874 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -52,6 +52,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -92,7 +93,14 @@ | undefined, streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, - os = 0 :: uint()}). %% next output stream + os = 0 :: uint(), %% next output stream + monitor :: pid() | undefined}). %% sending process + +%% Monitor process state. +-record(monitor, + {transport :: pid(), + socket :: gen_sctp:sctp_socket(), + assoc_id :: gen_sctp:assoc_id()}). %% next output stream %% Listener process state. -record(listener, @@ -216,6 +224,12 @@ init(T) -> %% i/1 +i(#monitor{transport = TPid} = S) -> + monitor(process, TPid), + putr(?TRANSPORT_KEY, TPid), + proc_lib:init_ack({ok, self()}), + S; + %% A process owning a listening socket. i({listen, Ref, {Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence @@ -382,6 +396,10 @@ handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> S end); +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -400,7 +418,11 @@ handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; handle_info(T, #listener{} = S) -> - {noreply, #listener{} = l(T,S)}. + {noreply, #listener{} = l(T,S)}; + +handle_info(T, #monitor{} = S) -> + m(T,S), + {noreply, S}. %% Prior to the possiblity of setting pool_size on in transport %% configuration, a new accepting transport was only started following @@ -422,6 +444,9 @@ code_change(_, State, _) -> %% # terminate/2 %% --------------------------------------------------------------------------- +terminate(_, #monitor{}) -> + ok; + terminate(_, #transport{assoc_id = undefined}) -> ok; @@ -522,8 +547,17 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> stop; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + monitor = MPid}) -> + undefined == MPid + orelse ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> stop; %% Timeout after transport process has been started. @@ -536,6 +570,14 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) Pid ! inet:port(Sock), ok. +%% m/2 + +m({Bin, StreamId}, #monitor{} = S) -> + send(StreamId, Bin, S); + +m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> + x(T). + %% Crash on anything unexpected. ok({ok, T}) -> @@ -578,6 +620,17 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 +%% Start monitor process on first send. +send(Msg, #transport{monitor = undefined, + socket = Sock, + assoc_id = AId} + = S) -> + {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), + socket = Sock, + assoc_id = AId}), + monitor(process, MPid), + send(Msg, S#transport{monitor = MPid}); + %% Outbound Diameter message on a specified stream ... send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, #transport{streams = {_, OS}} @@ -597,14 +650,13 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -send(StreamId, Bin, #transport{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). - -%% send/4 +send(StreamId, Bin, #transport{monitor = MPid}) -> + MPid ! {Bin, StreamId}, + MPid; -send(Sock, AssocId, Stream, Bin) -> - case gen_sctp:send(Sock, AssocId, Stream, Bin) of +send(StreamId, Bin, #monitor{socket = Sock, + assoc_id = AId}) -> + case gen_sctp:send(Sock, AId, StreamId, Bin) of ok -> ok; {error, Reason} -> diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl index 36050aaf28..e8e26ec7c5 100644 --- a/lib/diameter/src/transport/diameter_sctp_sup.erl +++ b/lib/diameter/src/transport/diameter_sctp_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ start() -> start_child(T) -> SupRef = case element(1,T) of + monitor -> ?TRANSPORT_SUP; connect -> ?TRANSPORT_SUP; accept -> ?TRANSPORT_SUP; listen -> ?LISTENER_SUP diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 8e400bc6ee..2a580753a0 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -53,6 +53,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -68,17 +69,22 @@ %% The same gen_server implementation supports three different kinds %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and -%% a process owning the listening port. +%% a process owning the listening port. The monitor process +%% historically died after connection establishment, but now lives on +%% as the sender of outgoing messages, so that a blocking send doesn't +%% prevent messages from being received. %% Listener process state. -record(listener, {socket :: inet:socket(), module :: module(), service = false :: false | pid()}). %% service process -%% Monitor process state. +%% Monitor process state. The name monitor predates its role as sender. -record(monitor, - {parent :: pid(), - transport = self() :: pid()}). + {parent :: reference() | false, + transport = self() :: pid(), + socket :: inet:socket() | ssl:sslsocket() | undefined, + module :: module() | undefined}). -type length() :: 0..16#FFFFFF. %% message length from Diameter header -type size() :: non_neg_integer(). %% accumulated binary size @@ -116,7 +122,8 @@ tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled :: boolean() | binary()}). %% stopped receiving? + throttled :: boolean() | binary(), %% stopped receiving? + monitor :: pid()}). %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -203,8 +210,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) T == connect -> monitor(process, Pid), %% Since accept/connect might block indefinitely, spawn a process - %% that does nothing but kill us with the parent until call - %% returns. + %% that kills us with the parent until call returns, and then + %% sends outgoing messages. {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, fragment_timer, @@ -217,8 +224,9 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), Throttle = proplists:get_value(throttle_cb, OwnOpts, false), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), - MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, + monitor(process, MPid), + MPid ! {start, self(), Sock, M}, %% prepare monitor for sending putr(?REF_KEY, Ref), throttle(#transport{parent = Pid, module = M, @@ -226,21 +234,22 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) ssl = SslOpts, timeout = Tmo, throttle_cb = Throttle, - throttled = false /= Throttle}); + throttled = false /= Throttle, + monitor = MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> + putr(?TRANSPORT_KEY, TPid), proc_lib:init_ack({ok, self()}), - monitor(process, Pid), monitor(process, TPid), - S; + S#monitor{parent = monitor(process, Pid)}; %% In principle a link between the transport and killer processes %% could do the same thing: have the accepting/connecting process be %% killed when the killer process dies as a consequence of parent %% death. However, a link can be unlinked and this is exactly what -%% gen_tcp seems to so. Links should be left to supervisors. +%% gen_tcp seems to do. Links should be left to supervisors. i({listen, Ref, {Mod, Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence @@ -452,7 +461,11 @@ handle_call({accept, SPid}, _From, #listener{service = P} = S) -> true -> S end}; - + +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -474,8 +487,7 @@ handle_info(T, #listener{} = S) -> {noreply, #listener{} = l(T,S)}; handle_info(T, #monitor{} = S) -> - m(T,S), - x(T). + {noreply, #monitor{} = m(T,S)}. %% --------------------------------------------------------------------------- %% # code_change/3 @@ -491,6 +503,7 @@ code_change(_, State, _) -> terminate(_, _) -> ok. + %% --------------------------------------------------------------------------- putr(Key, Val) -> @@ -503,18 +516,38 @@ getr(Key) -> %% %% Transition monitor state. -%% Transport is telling us to die. -m({stop, TPid}, #monitor{transport = TPid}) -> - ok; +%% Outgoing message. +m(Bin, #monitor{} = S) + when is_binary(Bin) -> + send(Bin, S), + S; -%% Transport has died. -m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) -> - ok; +%% Transport is telling us to be ready to send. Stop monitoring on the +%% parent so as not to die before a send from the transport. +m({start, TPid, Sock, Mod}, #monitor{parent = MRef, + transport = TPid} + = S) -> + demonitor(MRef, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod}; -%% Transport parent has died. -m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, - transport = TPid}) -> - exit(TPid, {shutdown, parent}). +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); + +%% Transport is telling us that TLS has been negotiated after +%% capabilities exchange. +m({tls, SSock}, #monitor{} = S) -> + S#monitor{socket = SSock, + module = ssl}; + +%% Transport or parent has died. +m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef, + transport = TPid}) + when M == MRef; + P == TPid -> + x(T). %% l/2 %% @@ -589,8 +622,9 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, S) -> - send(Bin, S); +transition({diameter, {send, Bin}}, #transport{} = S) -> + send(Bin, S), + ok; %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -610,8 +644,16 @@ transition({resolve_port, Pid}, #transport{socket = Sock, Pid ! portnr(M, Sock), ok; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + monitor = MPid}) -> + ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> stop. %% Crash on anything unexpected. @@ -635,11 +677,13 @@ tls_handshake(_, true, #transport{ssl = false}) -> %% Capabilities exchange negotiated TLS: upgrade the connection. tls_handshake(Type, true, #transport{socket = Sock, module = M, - ssl = Opts} + ssl = Opts, + monitor = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), + MPid ! {tls, SSock}, %% tell the monitor process S#transport{socket = SSock, module = ssl}; @@ -805,14 +849,20 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #transport{socket = Sock, - module = M}) -> +send(Bin, #monitor{socket = Sock, + module = M}) -> case send(M, Sock, Bin) of ok -> ok; {error, Reason} -> x({send, Reason}) - end. + end; + +%% Send from the monitor process to avoid deadlock if both the +%% receiver and the peer were to block in send. +send(Bin, #transport{monitor = MPid}) -> + MPid ! Bin, + MPid. %% send/3 @@ -909,7 +959,6 @@ throttle({NPid, F}, #transport{throttled = Msg} = S) throttle(discard, #transport{throttled = Msg} = S) when is_binary(Msg) -> S; - throttle({discard = T, F}, #transport{throttled = Msg} = S) when is_binary(Msg) -> throttle(T, S#transport{throttle_cb = F}); @@ -920,7 +969,6 @@ throttle(Bin, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> send(Bin, S), S; - throttle({Bin, F}, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> throttle(Bin, S#transport{throttle_cb = F}); @@ -929,7 +977,6 @@ throttle({Bin, F}, #transport{throttled = Msg} = S) throttle({timeout, Tmo}, S) -> erlang:send_after(Tmo, self(), throttle), throw(S); - throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); diff --git a/lib/diameter/test/diameter_watchdog_SUITE.erl b/lib/diameter/test/diameter_watchdog_SUITE.erl index 6d22ddcc18..5ae951f7c2 100644 --- a/lib/diameter/test/diameter_watchdog_SUITE.erl +++ b/lib/diameter/test/diameter_watchdog_SUITE.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -569,12 +569,12 @@ send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) -> {'Origin-Host', "XXX"}, {'Origin-Realm', ?REALM}]}, #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), - self() ! {tcp, Sock, Bin}, + tpid(Sock) ! {tcp, Sock, Bin}, ok; %% First outgoing DWA. send(init, Sock, Bin) -> - [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, self(), '_'}), + [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, tpid(Sock), '_'}), putr(config, T), send(Sock, Bin); @@ -607,6 +607,14 @@ send(N, Sock, <<_:32, 0:1, _:7, 280:24, _/binary>> = Bin) -> putr(config, N-1), gen_tcp:send(Sock, Bin). +%% tpid/1 + +tpid(Sock) -> + {connected, Pid} = erlang:port_info(Sock, connected), + Pid. + +%%failback/5 + failback(Tmo, Msg, Sock, Bin, Origin) -> timer:sleep(Tmo), ok = gen_tcp:send(Sock, msg(Msg, Bin, Origin)). |