diff options
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 119 |
1 files changed, 83 insertions, 36 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 8e400bc6ee..2a580753a0 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -53,6 +53,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -68,17 +69,22 @@ %% The same gen_server implementation supports three different kinds %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and -%% a process owning the listening port. +%% a process owning the listening port. The monitor process +%% historically died after connection establishment, but now lives on +%% as the sender of outgoing messages, so that a blocking send doesn't +%% prevent messages from being received. %% Listener process state. -record(listener, {socket :: inet:socket(), module :: module(), service = false :: false | pid()}). %% service process -%% Monitor process state. +%% Monitor process state. The name monitor predates its role as sender. -record(monitor, - {parent :: pid(), - transport = self() :: pid()}). + {parent :: reference() | false, + transport = self() :: pid(), + socket :: inet:socket() | ssl:sslsocket() | undefined, + module :: module() | undefined}). -type length() :: 0..16#FFFFFF. %% message length from Diameter header -type size() :: non_neg_integer(). %% accumulated binary size @@ -116,7 +122,8 @@ tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled :: boolean() | binary()}). %% stopped receiving? + throttled :: boolean() | binary(), %% stopped receiving? + monitor :: pid()}). %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -203,8 +210,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) T == connect -> monitor(process, Pid), %% Since accept/connect might block indefinitely, spawn a process - %% that does nothing but kill us with the parent until call - %% returns. + %% that kills us with the parent until call returns, and then + %% sends outgoing messages. {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, fragment_timer, @@ -217,8 +224,9 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), Throttle = proplists:get_value(throttle_cb, OwnOpts, false), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), - MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, + monitor(process, MPid), + MPid ! {start, self(), Sock, M}, %% prepare monitor for sending putr(?REF_KEY, Ref), throttle(#transport{parent = Pid, module = M, @@ -226,21 +234,22 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) ssl = SslOpts, timeout = Tmo, throttle_cb = Throttle, - throttled = false /= Throttle}); + throttled = false /= Throttle, + monitor = MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> + putr(?TRANSPORT_KEY, TPid), proc_lib:init_ack({ok, self()}), - monitor(process, Pid), monitor(process, TPid), - S; + S#monitor{parent = monitor(process, Pid)}; %% In principle a link between the transport and killer processes %% could do the same thing: have the accepting/connecting process be %% killed when the killer process dies as a consequence of parent %% death. However, a link can be unlinked and this is exactly what -%% gen_tcp seems to so. Links should be left to supervisors. +%% gen_tcp seems to do. Links should be left to supervisors. i({listen, Ref, {Mod, Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence @@ -452,7 +461,11 @@ handle_call({accept, SPid}, _From, #listener{service = P} = S) -> true -> S end}; - + +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -474,8 +487,7 @@ handle_info(T, #listener{} = S) -> {noreply, #listener{} = l(T,S)}; handle_info(T, #monitor{} = S) -> - m(T,S), - x(T). + {noreply, #monitor{} = m(T,S)}. %% --------------------------------------------------------------------------- %% # code_change/3 @@ -491,6 +503,7 @@ code_change(_, State, _) -> terminate(_, _) -> ok. + %% --------------------------------------------------------------------------- putr(Key, Val) -> @@ -503,18 +516,38 @@ getr(Key) -> %% %% Transition monitor state. -%% Transport is telling us to die. -m({stop, TPid}, #monitor{transport = TPid}) -> - ok; +%% Outgoing message. +m(Bin, #monitor{} = S) + when is_binary(Bin) -> + send(Bin, S), + S; -%% Transport has died. -m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) -> - ok; +%% Transport is telling us to be ready to send. Stop monitoring on the +%% parent so as not to die before a send from the transport. +m({start, TPid, Sock, Mod}, #monitor{parent = MRef, + transport = TPid} + = S) -> + demonitor(MRef, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod}; -%% Transport parent has died. -m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, - transport = TPid}) -> - exit(TPid, {shutdown, parent}). +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); + +%% Transport is telling us that TLS has been negotiated after +%% capabilities exchange. +m({tls, SSock}, #monitor{} = S) -> + S#monitor{socket = SSock, + module = ssl}; + +%% Transport or parent has died. +m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef, + transport = TPid}) + when M == MRef; + P == TPid -> + x(T). %% l/2 %% @@ -589,8 +622,9 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, S) -> - send(Bin, S); +transition({diameter, {send, Bin}}, #transport{} = S) -> + send(Bin, S), + ok; %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -610,8 +644,16 @@ transition({resolve_port, Pid}, #transport{socket = Sock, Pid ! portnr(M, Sock), ok; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + monitor = MPid}) -> + ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> stop. %% Crash on anything unexpected. @@ -635,11 +677,13 @@ tls_handshake(_, true, #transport{ssl = false}) -> %% Capabilities exchange negotiated TLS: upgrade the connection. tls_handshake(Type, true, #transport{socket = Sock, module = M, - ssl = Opts} + ssl = Opts, + monitor = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), + MPid ! {tls, SSock}, %% tell the monitor process S#transport{socket = SSock, module = ssl}; @@ -805,14 +849,20 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #transport{socket = Sock, - module = M}) -> +send(Bin, #monitor{socket = Sock, + module = M}) -> case send(M, Sock, Bin) of ok -> ok; {error, Reason} -> x({send, Reason}) - end. + end; + +%% Send from the monitor process to avoid deadlock if both the +%% receiver and the peer were to block in send. +send(Bin, #transport{monitor = MPid}) -> + MPid ! Bin, + MPid. %% send/3 @@ -909,7 +959,6 @@ throttle({NPid, F}, #transport{throttled = Msg} = S) throttle(discard, #transport{throttled = Msg} = S) when is_binary(Msg) -> S; - throttle({discard = T, F}, #transport{throttled = Msg} = S) when is_binary(Msg) -> throttle(T, S#transport{throttle_cb = F}); @@ -920,7 +969,6 @@ throttle(Bin, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> send(Bin, S), S; - throttle({Bin, F}, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> throttle(Bin, S#transport{throttle_cb = F}); @@ -929,7 +977,6 @@ throttle({Bin, F}, #transport{throttled = Msg} = S) throttle({timeout, Tmo}, S) -> erlang:send_after(Tmo, self(), throttle), throw(S); - throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); |