aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter')
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl76
-rw-r--r--lib/diameter/src/transport/diameter_sctp_sup.erl3
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl119
-rw-r--r--lib/diameter/test/diameter_watchdog_SUITE.erl14
4 files changed, 160 insertions, 52 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index f48e4347ee..f9feb68874 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -52,6 +52,7 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
@@ -92,7 +93,14 @@
| undefined,
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
- os = 0 :: uint()}). %% next output stream
+ os = 0 :: uint(), %% next output stream
+ monitor :: pid() | undefined}). %% sending process
+
+%% Monitor process state.
+-record(monitor,
+ {transport :: pid(),
+ socket :: gen_sctp:sctp_socket(),
+ assoc_id :: gen_sctp:assoc_id()}). %% next output stream
%% Listener process state.
-record(listener,
@@ -216,6 +224,12 @@ init(T) ->
%% i/1
+i(#monitor{transport = TPid} = S) ->
+ monitor(process, TPid),
+ putr(?TRANSPORT_KEY, TPid),
+ proc_lib:init_ack({ok, self()}),
+ S;
+
%% A process owning a listening socket.
i({listen, Ref, {Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
@@ -382,6 +396,10 @@ handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) ->
S
end);
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -400,7 +418,11 @@ handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
handle_info(T, #listener{} = S) ->
- {noreply, #listener{} = l(T,S)}.
+ {noreply, #listener{} = l(T,S)};
+
+handle_info(T, #monitor{} = S) ->
+ m(T,S),
+ {noreply, S}.
%% Prior to the possiblity of setting pool_size on in transport
%% configuration, a new accepting transport was only started following
@@ -422,6 +444,9 @@ code_change(_, State, _) ->
%% # terminate/2
%% ---------------------------------------------------------------------------
+terminate(_, #monitor{}) ->
+ ok;
+
terminate(_, #transport{assoc_id = undefined}) ->
ok;
@@ -522,8 +547,17 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ monitor = MPid}) ->
+ undefined == MPid
+ orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
+ orelse exit(MPid, kill),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
stop;
%% Timeout after transport process has been started.
@@ -536,6 +570,14 @@ transition({resolve_port, Pid}, #transport{socket = Sock})
Pid ! inet:port(Sock),
ok.
+%% m/2
+
+m({Bin, StreamId}, #monitor{} = S) ->
+ send(StreamId, Bin, S);
+
+m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) ->
+ x(T).
+
%% Crash on anything unexpected.
ok({ok, T}) ->
@@ -578,6 +620,17 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
+%% Start monitor process on first send.
+send(Msg, #transport{monitor = undefined,
+ socket = Sock,
+ assoc_id = AId}
+ = S) ->
+ {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(),
+ socket = Sock,
+ assoc_id = AId}),
+ monitor(process, MPid),
+ send(Msg, S#transport{monitor = MPid});
+
%% Outbound Diameter message on a specified stream ...
send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
#transport{streams = {_, OS}}
@@ -597,14 +650,13 @@ send(Bin, #transport{streams = {_, OS},
%% send/3
-send(StreamId, Bin, #transport{socket = Sock,
- assoc_id = AId}) ->
- send(Sock, AId, StreamId, Bin).
-
-%% send/4
+send(StreamId, Bin, #transport{monitor = MPid}) ->
+ MPid ! {Bin, StreamId},
+ MPid;
-send(Sock, AssocId, Stream, Bin) ->
- case gen_sctp:send(Sock, AssocId, Stream, Bin) of
+send(StreamId, Bin, #monitor{socket = Sock,
+ assoc_id = AId}) ->
+ case gen_sctp:send(Sock, AId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->
diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl
index 36050aaf28..e8e26ec7c5 100644
--- a/lib/diameter/src/transport/diameter_sctp_sup.erl
+++ b/lib/diameter/src/transport/diameter_sctp_sup.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@ start() ->
start_child(T) ->
SupRef = case element(1,T) of
+ monitor -> ?TRANSPORT_SUP;
connect -> ?TRANSPORT_SUP;
accept -> ?TRANSPORT_SUP;
listen -> ?LISTENER_SUP
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 8e400bc6ee..2a580753a0 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -53,6 +53,7 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
@@ -68,17 +69,22 @@
%% The same gen_server implementation supports three different kinds
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
-%% a process owning the listening port.
+%% a process owning the listening port. The monitor process
+%% historically died after connection establishment, but now lives on
+%% as the sender of outgoing messages, so that a blocking send doesn't
+%% prevent messages from being received.
%% Listener process state.
-record(listener, {socket :: inet:socket(),
module :: module(),
service = false :: false | pid()}). %% service process
-%% Monitor process state.
+%% Monitor process state. The name monitor predates its role as sender.
-record(monitor,
- {parent :: pid(),
- transport = self() :: pid()}).
+ {parent :: reference() | false,
+ transport = self() :: pid(),
+ socket :: inet:socket() | ssl:sslsocket() | undefined,
+ module :: module() | undefined}).
-type length() :: 0..16#FFFFFF. %% message length from Diameter header
-type size() :: non_neg_integer(). %% accumulated binary size
@@ -116,7 +122,8 @@
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
throttle_cb :: false | diameter:evaluable(), %% ask to receive
- throttled :: boolean() | binary()}). %% stopped receiving?
+ throttled :: boolean() | binary(), %% stopped receiving?
+ monitor :: pid()}).
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -203,8 +210,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
T == connect ->
monitor(process, Pid),
%% Since accept/connect might block indefinitely, spawn a process
- %% that does nothing but kill us with the parent until call
- %% returns.
+ %% that kills us with the parent until call returns, and then
+ %% sends outgoing messages.
{ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
fragment_timer,
@@ -217,8 +224,9 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid),
- MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
+ monitor(process, MPid),
+ MPid ! {start, self(), Sock, M}, %% prepare monitor for sending
putr(?REF_KEY, Ref),
throttle(#transport{parent = Pid,
module = M,
@@ -226,21 +234,22 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
ssl = SslOpts,
timeout = Tmo,
throttle_cb = Throttle,
- throttled = false /= Throttle});
+ throttled = false /= Throttle,
+ monitor = MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
%% A monitor process to kill the transport if the parent dies.
i(#monitor{parent = Pid, transport = TPid} = S) ->
+ putr(?TRANSPORT_KEY, TPid),
proc_lib:init_ack({ok, self()}),
- monitor(process, Pid),
monitor(process, TPid),
- S;
+ S#monitor{parent = monitor(process, Pid)};
%% In principle a link between the transport and killer processes
%% could do the same thing: have the accepting/connecting process be
%% killed when the killer process dies as a consequence of parent
%% death. However, a link can be unlinked and this is exactly what
-%% gen_tcp seems to so. Links should be left to supervisors.
+%% gen_tcp seems to do. Links should be left to supervisors.
i({listen, Ref, {Mod, Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
@@ -452,7 +461,11 @@ handle_call({accept, SPid}, _From, #listener{service = P} = S) ->
true ->
S
end};
-
+
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -474,8 +487,7 @@ handle_info(T, #listener{} = S) ->
{noreply, #listener{} = l(T,S)};
handle_info(T, #monitor{} = S) ->
- m(T,S),
- x(T).
+ {noreply, #monitor{} = m(T,S)}.
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -491,6 +503,7 @@ code_change(_, State, _) ->
terminate(_, _) ->
ok.
+
%% ---------------------------------------------------------------------------
putr(Key, Val) ->
@@ -503,18 +516,38 @@ getr(Key) ->
%%
%% Transition monitor state.
-%% Transport is telling us to die.
-m({stop, TPid}, #monitor{transport = TPid}) ->
- ok;
+%% Outgoing message.
+m(Bin, #monitor{} = S)
+ when is_binary(Bin) ->
+ send(Bin, S),
+ S;
-%% Transport has died.
-m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) ->
- ok;
+%% Transport is telling us to be ready to send. Stop monitoring on the
+%% parent so as not to die before a send from the transport.
+m({start, TPid, Sock, Mod}, #monitor{parent = MRef,
+ transport = TPid}
+ = S) ->
+ demonitor(MRef, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod};
-%% Transport parent has died.
-m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
- transport = TPid}) ->
- exit(TPid, {shutdown, parent}).
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
+
+%% Transport is telling us that TLS has been negotiated after
+%% capabilities exchange.
+m({tls, SSock}, #monitor{} = S) ->
+ S#monitor{socket = SSock,
+ module = ssl};
+
+%% Transport or parent has died.
+m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef,
+ transport = TPid})
+ when M == MRef;
+ P == TPid ->
+ x(T).
%% l/2
%%
@@ -589,8 +622,9 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, S) ->
- send(Bin, S);
+transition({diameter, {send, Bin}}, #transport{} = S) ->
+ send(Bin, S),
+ ok;
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -610,8 +644,16 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
Pid ! portnr(M, Sock),
ok;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ monitor = MPid}) ->
+ ok == (catch gen_server:call(MPid, {stop, Pid}))
+ orelse exit(MPid, kill),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) ->
stop.
%% Crash on anything unexpected.
@@ -635,11 +677,13 @@ tls_handshake(_, true, #transport{ssl = false}) ->
%% Capabilities exchange negotiated TLS: upgrade the connection.
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
- ssl = Opts}
+ ssl = Opts,
+ monitor = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
+ MPid ! {tls, SSock}, %% tell the monitor process
S#transport{socket = SSock,
module = ssl};
@@ -805,14 +849,20 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
-send(Bin, #transport{socket = Sock,
- module = M}) ->
+send(Bin, #monitor{socket = Sock,
+ module = M}) ->
case send(M, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
x({send, Reason})
- end.
+ end;
+
+%% Send from the monitor process to avoid deadlock if both the
+%% receiver and the peer were to block in send.
+send(Bin, #transport{monitor = MPid}) ->
+ MPid ! Bin,
+ MPid.
%% send/3
@@ -909,7 +959,6 @@ throttle({NPid, F}, #transport{throttled = Msg} = S)
throttle(discard, #transport{throttled = Msg} = S)
when is_binary(Msg) ->
S;
-
throttle({discard = T, F}, #transport{throttled = Msg} = S)
when is_binary(Msg) ->
throttle(T, S#transport{throttle_cb = F});
@@ -920,7 +969,6 @@ throttle(Bin, #transport{throttled = Msg} = S)
when is_binary(Bin), is_binary(Msg) ->
send(Bin, S),
S;
-
throttle({Bin, F}, #transport{throttled = Msg} = S)
when is_binary(Bin), is_binary(Msg) ->
throttle(Bin, S#transport{throttle_cb = F});
@@ -929,7 +977,6 @@ throttle({Bin, F}, #transport{throttled = Msg} = S)
throttle({timeout, Tmo}, S) ->
erlang:send_after(Tmo, self(), throttle),
throw(S);
-
throttle({timeout = T, Tmo, F}, S) ->
throttle({T, Tmo}, S#transport{throttle_cb = F});
diff --git a/lib/diameter/test/diameter_watchdog_SUITE.erl b/lib/diameter/test/diameter_watchdog_SUITE.erl
index 6d22ddcc18..5ae951f7c2 100644
--- a/lib/diameter/test/diameter_watchdog_SUITE.erl
+++ b/lib/diameter/test/diameter_watchdog_SUITE.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -569,12 +569,12 @@ send(_, Sock, <<_:32, 1:1, _:7, 280:24, _:32, EId:32, HId:32, _/binary>>) ->
{'Origin-Host', "XXX"},
{'Origin-Realm', ?REALM}]},
#diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt),
- self() ! {tcp, Sock, Bin},
+ tpid(Sock) ! {tcp, Sock, Bin},
ok;
%% First outgoing DWA.
send(init, Sock, Bin) ->
- [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, self(), '_'}),
+ [{{?MODULE, _, T}, _}] = diameter_reg:wait({?MODULE, tpid(Sock), '_'}),
putr(config, T),
send(Sock, Bin);
@@ -607,6 +607,14 @@ send(N, Sock, <<_:32, 0:1, _:7, 280:24, _/binary>> = Bin) ->
putr(config, N-1),
gen_tcp:send(Sock, Bin).
+%% tpid/1
+
+tpid(Sock) ->
+ {connected, Pid} = erlang:port_info(Sock, connected),
+ Pid.
+
+%%failback/5
+
failback(Tmo, Msg, Sock, Bin, Origin) ->
timer:sleep(Tmo),
ok = gen_tcp:send(Sock, msg(Msg, Bin, Origin)).