aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_tcp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl119
1 files changed, 83 insertions, 36 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 8e400bc6ee..2a580753a0 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -53,6 +53,7 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
@@ -68,17 +69,22 @@
%% The same gen_server implementation supports three different kinds
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
-%% a process owning the listening port.
+%% a process owning the listening port. The monitor process
+%% historically died after connection establishment, but now lives on
+%% as the sender of outgoing messages, so that a blocking send doesn't
+%% prevent messages from being received.
%% Listener process state.
-record(listener, {socket :: inet:socket(),
module :: module(),
service = false :: false | pid()}). %% service process
-%% Monitor process state.
+%% Monitor process state. The name monitor predates its role as sender.
-record(monitor,
- {parent :: pid(),
- transport = self() :: pid()}).
+ {parent :: reference() | false,
+ transport = self() :: pid(),
+ socket :: inet:socket() | ssl:sslsocket() | undefined,
+ module :: module() | undefined}).
-type length() :: 0..16#FFFFFF. %% message length from Diameter header
-type size() :: non_neg_integer(). %% accumulated binary size
@@ -116,7 +122,8 @@
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
throttle_cb :: false | diameter:evaluable(), %% ask to receive
- throttled :: boolean() | binary()}). %% stopped receiving?
+ throttled :: boolean() | binary(), %% stopped receiving?
+ monitor :: pid()}).
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -203,8 +210,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
T == connect ->
monitor(process, Pid),
%% Since accept/connect might block indefinitely, spawn a process
- %% that does nothing but kill us with the parent until call
- %% returns.
+ %% that kills us with the parent until call returns, and then
+ %% sends outgoing messages.
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
fragment_timer,
@@ -217,8 +224,9 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid),
- MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
+ monitor(process, MPid),
+ MPid ! {start, self(), Sock, M}, %% prepare monitor for sending
putr(?REF_KEY, Ref),
throttle(#transport{parent = Pid,
module = M,
@@ -226,21 +234,22 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
ssl = SslOpts,
timeout = Tmo,
throttle_cb = Throttle,
- throttled = false /= Throttle});
+ throttled = false /= Throttle,
+ monitor = MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
%% A monitor process to kill the transport if the parent dies.
i(#monitor{parent = Pid, transport = TPid} = S) ->
+ putr(?TRANSPORT_KEY, TPid),
proc_lib:init_ack({ok, self()}),
- monitor(process, Pid),
monitor(process, TPid),
- S;
+ S#monitor{parent = monitor(process, Pid)};
%% In principle a link between the transport and killer processes
%% could do the same thing: have the accepting/connecting process be
%% killed when the killer process dies as a consequence of parent
%% death. However, a link can be unlinked and this is exactly what
-%% gen_tcp seems to so. Links should be left to supervisors.
+%% gen_tcp seems to do. Links should be left to supervisors.
i({listen, Ref, {Mod, Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
@@ -452,7 +461,11 @@ handle_call({accept, SPid}, _From, #listener{service = P} = S) ->
true ->
S
end};
-
+
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -474,8 +487,7 @@ handle_info(T, #listener{} = S) ->
{noreply, #listener{} = l(T,S)};
handle_info(T, #monitor{} = S) ->
- m(T,S),
- x(T).
+ {noreply, #monitor{} = m(T,S)}.
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -491,6 +503,7 @@ code_change(_, State, _) ->
terminate(_, _) ->
ok.
+
%% ---------------------------------------------------------------------------
putr(Key, Val) ->
@@ -503,18 +516,38 @@ getr(Key) ->
%%
%% Transition monitor state.
-%% Transport is telling us to die.
-m({stop, TPid}, #monitor{transport = TPid}) ->
- ok;
+%% Outgoing message.
+m(Bin, #monitor{} = S)
+ when is_binary(Bin) ->
+ send(Bin, S),
+ S;
-%% Transport has died.
-m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) ->
- ok;
+%% Transport is telling us to be ready to send. Stop monitoring on the
+%% parent so as not to die before a send from the transport.
+m({start, TPid, Sock, Mod}, #monitor{parent = MRef,
+ transport = TPid}
+ = S) ->
+ demonitor(MRef, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod};
-%% Transport parent has died.
-m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
- transport = TPid}) ->
- exit(TPid, {shutdown, parent}).
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
+
+%% Transport is telling us that TLS has been negotiated after
+%% capabilities exchange.
+m({tls, SSock}, #monitor{} = S) ->
+ S#monitor{socket = SSock,
+ module = ssl};
+
+%% Transport or parent has died.
+m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef,
+ transport = TPid})
+ when M == MRef;
+ P == TPid ->
+ x(T).
%% l/2
%%
@@ -589,8 +622,9 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, S) ->
- send(Bin, S);
+transition({diameter, {send, Bin}}, #transport{} = S) ->
+ send(Bin, S),
+ ok;
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -610,8 +644,16 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
Pid ! portnr(M, Sock),
ok;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ monitor = MPid}) ->
+ ok == (catch gen_server:call(MPid, {stop, Pid}))
+ orelse exit(MPid, kill),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
stop.
%% Crash on anything unexpected.
@@ -635,11 +677,13 @@ tls_handshake(_, true, #transport{ssl = false}) ->
%% Capabilities exchange negotiated TLS: upgrade the connection.
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
- ssl = Opts}
+ ssl = Opts,
+ monitor = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
+ MPid ! {tls, SSock}, %% tell the monitor process
S#transport{socket = SSock,
module = ssl};
@@ -805,14 +849,20 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
-send(Bin, #transport{socket = Sock,
- module = M}) ->
+send(Bin, #monitor{socket = Sock,
+ module = M}) ->
case send(M, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
x({send, Reason})
- end.
+ end;
+
+%% Send from the monitor process to avoid deadlock if both the
+%% receiver and the peer were to block in send.
+send(Bin, #transport{monitor = MPid}) ->
+ MPid ! Bin,
+ MPid.
%% send/3
@@ -909,7 +959,6 @@ throttle({NPid, F}, #transport{throttled = Msg} = S)
throttle(discard, #transport{throttled = Msg} = S)
when is_binary(Msg) ->
S;
-
throttle({discard = T, F}, #transport{throttled = Msg} = S)
when is_binary(Msg) ->
throttle(T, S#transport{throttle_cb = F});
@@ -920,7 +969,6 @@ throttle(Bin, #transport{throttled = Msg} = S)
when is_binary(Bin), is_binary(Msg) ->
send(Bin, S),
S;
-
throttle({Bin, F}, #transport{throttled = Msg} = S)
when is_binary(Bin), is_binary(Msg) ->
throttle(Bin, S#transport{throttle_cb = F});
@@ -929,7 +977,6 @@ throttle({Bin, F}, #transport{throttled = Msg} = S)
throttle({timeout, Tmo}, S) ->
erlang:send_after(Tmo, self(), throttle),
throw(S);
-
throttle({timeout = T, Tmo, F}, S) ->
throttle({T, Tmo}, S#transport{throttle_cb = F});