aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/doc/src/diameter.xml152
-rw-r--r--lib/diameter/doc/src/diameter_app.xml4
-rw-r--r--lib/diameter/doc/src/diameter_transport.xml6
-rw-r--r--lib/diameter/src/base/diameter.erl1
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl193
-rw-r--r--lib/diameter/src/base/diameter_service.erl64
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl22
-rw-r--r--lib/diameter/test/diameter_dpr_SUITE.erl196
-rw-r--r--lib/diameter/test/modules.mk5
9 files changed, 524 insertions, 119 deletions
diff --git a/lib/diameter/doc/src/diameter.xml b/lib/diameter/doc/src/diameter.xml
index a35fd5b3a8..15f398c5f8 100644
--- a/lib/diameter/doc/src/diameter.xml
+++ b/lib/diameter/doc/src/diameter.xml
@@ -833,6 +833,7 @@ request a connection with one peer over SCTP or another
To listen on both SCTP and TCP, define one transport for each.</p>
</item>
+<marker id="applications"/>
<tag><c>{applications, [<seealso marker="#application_alias">application_alias()</seealso>]}</c></tag>
<item>
<p>
@@ -842,6 +843,7 @@ Defaults to all applications configured on the service in question.
Applications not configured on the service in question are ignored.</p>
</item>
+<marker id="capabilities"/>
<tag><c>{capabilities, [<seealso marker="#capability">capability()</seealso>]}</c></tag>
<item>
<p>
@@ -856,33 +858,136 @@ TLS is desired over TCP as implemented by
<seealso marker="diameter_tcp">diameter_tcp(3)</seealso>.</p>
</item>
+<marker id="capabilities_cb"/>
<tag><c>{capabilities_cb, <seealso marker="#evaluable">evaluable()</seealso>}</c></tag>
<item>
<p>
A callback invoked upon reception of CER/CEA during capabilities
exchange in order to ask whether or not the connection should
be accepted.
-Applied to the relevant <c><seealso
-marker="#transport_ref">transport_ref()</seealso></c> and the
-<c>#diameter_caps{}</c> record of the connection.
-Returning <c>ok</c> accepts the connection.
-Returning <c>integer()</c> causes an incoming
-CER to be answered with the specified Result-Code.
-Returning <c>discard</c> causes an incoming CER to
-be discarded.
-Returning <c>unknown</c> is equivalent to returning <c>3010</c>,
-DIAMETER_UNKNOWN_PEER.
-Returning anything but <c>ok</c> or a 2xxx series result
-code causes the transport connection to be broken.</p>
+Applied to the <c><seealso
+marker="#transport_ref">transport_ref()</seealso></c> and
+<c>#diameter_caps{}</c> record of the connection.</p>
+
+<p>
+The return value can have one of the following types.</p>
+
+<taglist>
+<tag><c>ok</c></tag>
+<item>
+<p>
+Accept the connection.</p>
+</item>
+
+<tag><c>integer()</c></tag>
+<item>
+<p>
+Causes an incoming CER to be answered with the specified Result-Code.</p>
+</item>
+
+<tag><c>discard</c></tag>
+<item>
+<p>
+Causes an incoming CER to be discarded without CEA being sent.</p>
+</item>
+
+<tag><c>unknown</c></tag>
+<item>
+<p>
+Equivalent to returning <c>3010</c>, DIAMETER_UNKNOWN_PEER.</p>
+</item>
+</taglist>
<p>
+Returning anything but <c>ok</c> or a 2xxx series result
+code causes the transport connection to be broken.
Multiple <c>capabilities_cb</c> options can be specified, in which
case the corresponding callbacks are applied until either all return
<c>ok</c> or one does not.</p>
+</item>
-<marker id="watchdog_timer"/>
+<marker id="disconnect_cb"/>
+<tag><c>{disconnect_cb, <seealso marker="#evaluable">evaluable()</seealso>}</c></tag>
+
+<item>
+<p>
+A callback invoked prior to requesting shutdown of a transport process
+for a transport connection having watchdog state <c>OKAY</c>.
+Applied to <c>Reason=transport|service|application</c> and the
+<c><seealso marker="#transport_ref">transport_ref()</seealso></c> and
+<c><seealso marker="diameter_app#peer">diameter_app:peer()</seealso></c>
+in question, <c>Reason</c> indicating whether the the diameter
+application is being stopped, the service in question is being stopped
+at <seealso
+marker="diameter#stop_service">diameter:stop_service/1</seealso> or
+the transport in question is being removed at <seealso
+marker="diameter#remove_transport">diameter:remove_transport/2</seealso>,
+respectively.</p>
+
+<p>
+The return value can have one of the following types.</p>
+
+<taglist>
+<tag><c>{dpr, [option()]}</c></tag>
+<item>
+<p>
+Causes Disconnect-Peer-Request to be sent to the peer, transport
+process shutdown being requested after reception of
+Disconnect-Peer-Answer or timeout.
+An <c>option()</c> can be one of the following.</p>
+
+<taglist>
+<tag><c>{timeout, integer()}</c></tag>
+<item>
+<p>
+Transport process shutdown will be requested after this number of
+milliseconds if DPA is not received.
+Defaults to 1000.</p>
+</item>
+
+<tag><c>{cause, 0|rebooting|1|busy|2|goaway}</c></tag>
+<item>
+<p>
+The Disconnect-Cause to send, <c>REBOOTING</c>, <c>BUSY</c> and
+<c>DO_NOT_WANT_TO_TALK_TO_YOU</c> respectively.
+Defaults to <c>rebooting</c> for <c>Reason=service|application</c> and
+<c>goaway</c> for <c>Reason=transport</c>.</p>
+</item>
+</taglist>
+</item>
+
+<tag><c>dpr</c></tag>
+<item>
+<p>
+Equivalent to <c>{dpr, []}</c>.</p>
+</item>
+
+<tag><c>close</c></tag>
+<item>
+<p>
+Causes transport process shutdown to be requested without
+Disconnect-Peer-Request being sent to the peer.</p>
</item>
+<tag><c>ignore</c></tag>
+<item>
+<p>
+Equivalent to not having configured the callback.</p>
+</item>
+</taglist>
+
+<p>
+Multiple <c>disconnect_cb</c> options can be specified, in which
+case the corresponding callbacks are applied until one of them returns
+a value other than <c>ignore</c>.
+All callbacks returning <c>ignore</c> is equivalent to not having
+configured them.</p>
+
+<p>
+Defaults to a single callback returning <c>dpr</c>.</p>
+</item>
+
+<marker id="watchdog_timer"/>
<tag><c>{watchdog_timer, TwInit}</c></tag>
<item>
<code>
@@ -902,10 +1007,9 @@ the callback.</p>
<p>
An integer value must be at least 6000 as required by RFC 3539.
Defaults to 30000 if unspecified.</p>
-
-<marker id="reconnect_timer"/>
</item>
+<marker id="reconnect_timer"/>
<tag><c>{reconnect_timer, Tc}</c></tag>
<item>
<code>
@@ -1161,7 +1265,7 @@ at the time the diameter application was started.</p>
<!-- ===================================================================== -->
<func>
-<name>remove_transport(SvcName, Pred) -> ok</name>
+<name>remove_transport(SvcName, Pred) -> ok | {error, Reason}</name>
<fsummary>Remove previously added transports.</fsummary>
<type>
<v>SvcName = <seealso marker="#service_name">service_name()</seealso></v>
@@ -1171,6 +1275,7 @@ at the time the diameter application was started.</p>
<v>&nbsp;&nbsp;&nbsp; | fun((<seealso marker="#transport_ref">transport_ref()</seealso>, list()) -> boolean())</v>
<v>&nbsp;&nbsp;&nbsp; | fun((list()) -> boolean())</v>
<v>MFA = {atom(), atom(), list()}</v>
+<v>Reason = term()</v>
</type>
<desc>
<p>
@@ -1196,15 +1301,12 @@ Pred = {M,F,A}: fun(Ref, Type, Opts) -> apply(M, F, [Ref, Type, Opts | A]) end
</code>
<p>
-Removing a transport causes all associated transport connections to
-be broken.
-A DPR message with
-Disconnect-Cause <c>DO_NOT_WANT_TO_TALK_TO_YOU</c> will be sent
-to each connected peer before disassociating the transport configuration
-from the service and terminating the transport upon reception of
-DPA or timeout.</p>
-
-<!-- TODO: document the timeout value, possibly make configurable. -->
+Removing a transport causes the corresponding transport processes to
+be asked to terminate.
+Whether or not a DPR message is sent to a peer is
+controlled by
+value of <seealso marker="disconnect_cb">disconnect_cb</seealso>
+configured on the transport.</p>
<marker id="service_info"/>
</desc>
diff --git a/lib/diameter/doc/src/diameter_app.xml b/lib/diameter/doc/src/diameter_app.xml
index 9d8a6568eb..ac056c2d39 100644
--- a/lib/diameter/doc/src/diameter_app.xml
+++ b/lib/diameter/doc/src/diameter_app.xml
@@ -309,12 +309,12 @@ by either <seealso marker="#handle_answer">handle_answer/4</seealso>
or <seealso marker="#handle_error">handle_error/4</seealso> depending
on whether or not an answer message is received from the peer.
If the transport becomes unavailable after <seealso
-marker="prepare_request">prepare_request/3</seealso> then a new <seealso
+marker="#prepare_request">prepare_request/3</seealso> then a new <seealso
marker="#pick_peer">pick_peer/4</seealso> callback may take place to
failover to an alternate peer, after which <seealso
marker="#prepare_retransmit">prepare_retransmit/3</seealso> takes the
place of <seealso
-marker="prepare_request">prepare_request/3</seealso> in resending the
+marker="#prepare_request">prepare_request/3</seealso> in resending the
request.
There is no guarantee that a <seealso
marker="#pick_peer">pick_peer/4</seealso> callback to select
diff --git a/lib/diameter/doc/src/diameter_transport.xml b/lib/diameter/doc/src/diameter_transport.xml
index d9b36a1e09..0c8b41397a 100644
--- a/lib/diameter/doc/src/diameter_transport.xml
+++ b/lib/diameter/doc/src/diameter_transport.xml
@@ -149,9 +149,9 @@ contains the binary to send.</p>
<tag><c>{diameter, {close, Pid}}</c></tag>
<item>
<p>
-A request to close the transport connection.
-The transport process should terminate after closing the
-connection.
+A request to terminate the transport process after having received DPA
+in response to DPR.
+The transport process should exit.
<c>Pid</c> is the pid() of the parent process.</p>
</item>
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index 3e3a6be0ef..95702f03d4 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -330,6 +330,7 @@ call(SvcName, App, Message) ->
| {applications, [app_alias()]}
| {capabilities, [capability()]}
| {capabilities_cb, evaluable()}
+ | {disconnect_cb, evaluable()}
| {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}}
| {reconnect_timer, 'Unsigned32'()}
| {private, any()}.
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 3f4945f7a6..4acfd8313b 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -48,15 +48,19 @@
-include("diameter_internal.hrl").
-include("diameter_gen_base_rfc3588.hrl").
+%% Values of Disconnect-Cause in DPR.
-define(GOAWAY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_DO_NOT_WANT_TO_TALK_TO_YOU').
-define(REBOOT, ?'DIAMETER_BASE_DISCONNECT-CAUSE_REBOOTING').
+-define(BUSY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_BUSY').
-define(NO_INBAND_SECURITY, 0).
-define(TLS, 1).
%% Keys in process dictionary.
-define(CB_KEY, cb). %% capabilities callback
+-define(DPR_KEY, dpr). %% disconnect callback
-define(DWA_KEY, dwa). %% outgoing DWA
+-define(REF_KEY, ref). %% transport_ref()
-define(Q_KEY, q). %% transport start queue
-define(START_KEY, start). %% start of connected transport
-define(SEQUENCE_KEY, mask). %% mask for sequence numbers
@@ -68,6 +72,13 @@
%% A 2xxx series Result-Code. Not necessarily 2001.
-define(IS_SUCCESS(N), 2 == (N) div 1000).
+%% Guards.
+-define(IS_UINT32(N), (0 =< N andalso 0 == N bsr 32)).
+-define(IS_TIMEOUT(N), ?IS_UINT32(N)).
+-define(IS_CAUSE(N), N == ?REBOOT; N == rebooting;
+ N == ?GOAWAY; N == goaway;
+ N == ?BUSY; N == busy).
+
%% RFC 3588:
%%
%% Timeout An application-defined timer has expired while waiting
@@ -75,18 +86,16 @@
%%
-define(EVENT_TIMEOUT, 10000).
-%% How long to wait for a DPA in response to DPR before simply
-%% aborting. Used to distinguish between shutdown and not but there's
-%% not really any need. Stopping a service will require a timeout if
-%% the peer doesn't answer DPR so the value should be short-ish.
+%% Default timeout for DPA in response to DPR. A bit short but the
+%% timeout used to be hardcoded. (So it could be worse.)
-define(DPA_TIMEOUT, 1000).
-record(state,
{state = 'Wait-Conn-Ack' %% state of RFC 3588 Peer State Machine
:: 'Wait-Conn-Ack' | recv_CER | 'Wait-CEA' | 'Open',
mode :: accept | connect | {connect, reference()},
- parent :: pid(),
- transport :: pid(),
+ parent :: pid(), %% watchdog process
+ transport :: pid(), %% transport process
service :: #diameter_service{},
dpr = false :: false | {diameter:'Unsigned32'(),
diameter:'Unsigned32'()}}).
@@ -163,14 +172,16 @@ i({WPid, Type, Opts, #diameter_service{} = Svc}) -> %% from old code
i({WPid, Type, Opts, {?NOMASK, [node() | nodes()], Svc}});
i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
- capabilities = Caps}
+ capabilities = LCaps}
= Svc}}) ->
[] /= Apps orelse ?ERROR({no_apps, T, Opts}),
- putr(?DWA_KEY, dwa(Caps)),
+ putr(?DWA_KEY, dwa(LCaps)),
{M, Ref} = T,
diameter_stats:reg(Ref),
- {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]),
- putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}),
+ {[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]),
+ putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}),
+ putr(?DPR_KEY, [F || {_, F} <- Ds]),
+ putr(?REF_KEY, Ref),
putr(?SEQUENCE_KEY, Mask),
putr(?RESTRICT_KEY, Nodes),
erlang:monitor(process, WPid),
@@ -188,8 +199,8 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
%% watchdog start (start/2) succeeds regardless so as not to crash the
%% service.
-start_transport(T, Opts, #diameter_service{capabilities = Caps} = Svc) ->
- Addrs0 = Caps#diameter_caps.host_ip_address,
+start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) ->
+ Addrs0 = LCaps#diameter_caps.host_ip_address,
start_transport(Addrs0, {T, Opts, Svc}).
start_transport(Addrs0, T) ->
@@ -212,9 +223,9 @@ svc(Svc, []) ->
svc(Svc, Addrs) ->
readdr(Svc, Addrs).
-readdr(#diameter_service{capabilities = Caps0} = Svc, Addrs) ->
- Caps = Caps0#diameter_caps{host_ip_address = Addrs},
- Svc#diameter_service{capabilities = Caps}.
+readdr(#diameter_service{capabilities = LCaps0} = Svc, Addrs) ->
+ LCaps = LCaps0#diameter_caps{host_ip_address = Addrs},
+ Svc#diameter_service{capabilities = LCaps}.
%% The 4-tuple Data returned from diameter_peer:start/1 identifies the
%% transport module/config use to start the transport process in
@@ -375,25 +386,19 @@ transition({send, Msg}, #state{transport = TPid}) ->
send(TPid, Msg),
ok;
-%% Request for graceful shutdown.
-transition({shutdown, Pid}, #state{parent = Pid, dpr = false} = S) ->
- dpr(?GOAWAY, S);
-transition({shutdown, Pid}, #state{parent = Pid}) ->
- ok;
-
-%% Application shutdown.
-transition(shutdown, #state{dpr = false} = S) ->
- dpr(?REBOOT, S);
-transition(shutdown, _) -> %% DPR already send: ensure expected timeout
- dpa_timer(),
+%% Messages from old (diameter_service) code.
+transition(shutdown = T, #state{parent = Pid} = S) ->
+ transition({T, Pid, service}, S); %% Reason irrelevant: old code has no cb
+
+%% Request for graceful shutdown at remove_transport, stop_service of
+%% application shutdown.
+transition({shutdown = T, Pid}, S) ->
+ transition({T, Pid, transport}, S);
+transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) ->
+ dpr(Reason, S);
+transition({shutdown, Pid, _}, #state{parent = Pid}) ->
ok;
-%% Request to close the transport connection.
-transition({close = T, Pid}, #state{parent = Pid,
- transport = TPid}) ->
- diameter_peer:close(TPid),
- {stop, T};
-
%% DPA reception has timed out.
transition(dpa_timeout, _) ->
stop;
@@ -441,13 +446,13 @@ start_next(#state{service = Svc0} = S) ->
%% send_CER/1
send_CER(#state{mode = {connect, Remote},
- service = #diameter_service{capabilities = Caps},
+ service = #diameter_service{capabilities = LCaps},
transport = TPid}
= S) ->
- OH = Caps#diameter_caps.origin_host,
+ OH = LCaps#diameter_caps.origin_host,
req_send_CER(OH, Remote)
orelse
- close({already_connected, Remote, Caps}, S),
+ close({already_connected, Remote, LCaps}, S),
CER = build_CER(S),
?LOG(send, 'CER'),
send(TPid, encode(CER)),
@@ -471,8 +476,8 @@ start_timer(#state{state = PS} = S) ->
%% build_CER/1
-build_CER(#state{service = #diameter_service{capabilities = Caps}}) ->
- {ok, CER} = diameter_capx:build_CER(Caps),
+build_CER(#state{service = #diameter_service{capabilities = LCaps}}) ->
+ {ok, CER} = diameter_capx:build_CER(LCaps),
CER.
%% encode/1
@@ -800,8 +805,8 @@ a('CER', #diameter_caps{vendor_id = Vid,
{'Product-Name', Name},
{'Origin-State-Id', OSI}];
-a('DPR', #diameter_caps{origin_host = Host,
- origin_realm = Realm}) ->
+a('DPR', #diameter_caps{origin_host = {Host, _},
+ origin_realm = {Realm, _}}) ->
['DPA', {'Origin-Host', Host},
{'Origin-Realm', Realm}].
@@ -909,7 +914,9 @@ rejected(N)
%% open/5
-open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) ->
+open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid,
+ service = Svc}
+ = S) ->
#diameter_caps{origin_host = {_,_} = H,
inband_security_id = {LS,_}}
= Caps,
@@ -917,7 +924,9 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) ->
tls_ack(lists:member(?TLS, LS), Caps, Type, IS, S),
Pid ! {open, self(), H, {Caps, SupportedApps, Pkt}},
- S#state{state = 'Open'}.
+ %% Replace capabilities record with local/remote pairs.
+ S#state{state = 'Open',
+ service = Svc#diameter_service{capabilities = Caps}}.
%% We've advertised TLS support: tell the transport the result
%% and expect a reply when the handshake is complete.
@@ -970,24 +979,110 @@ dwa(#diameter_caps{origin_host = OH,
{'Origin-State-Id', OSI}].
%% dpr/2
+%%
+%% The RFC isn't clear on whether DPR should be send in a non-Open
+%% state. The Peer State Machine transitions it documents aren't
+%% exhaustive (no Stop in Wait-I-CEA for example) so assume it's up to
+%% the implementation and transition to Closed (ie. die) if we haven't
+%% yet reached Open.
+
+%% Connection is open, DPR has not been sent.
+dpr(Reason, #state{state = 'Open',
+ dpr = false,
+ service = #diameter_service{capabilities = Caps}}
+ = S) ->
+ case getr(?DPR_KEY) of
+ CBs when is_list(CBs) ->
+ Ref = getr(?REF_KEY),
+ Peer = {self(), Caps},
+ dpr(CBs, [Reason, Ref, Peer], S);
+ undefined -> %% started in old code
+ send_dpr(Reason, [], S)
+ end;
-dpr(Cause, #state{transport = TPid,
- service = #diameter_service{capabilities = Caps}}
- = S) ->
- #diameter_caps{origin_host = OH,
- origin_realm = OR}
+%% Connection is open, DPR already sent.
+dpr(_, #state{state = 'Open'}) ->
+ ok;
+
+%% Connection not open.
+dpr(_Reason, _S) ->
+ stop.
+
+%% dpr/3
+%%
+%% Note that an implementation that wants to do something
+%% transport_module-specific can lookup the pid of the transport
+%% process and contact it. (eg. diameter:service_info/2)
+
+dpr([CB|Rest], [Reason | _] = Args, S) ->
+ try diameter_lib:eval([CB | Args]) of
+ {dpr, Opts} when is_list(Opts) ->
+ send_dpr(Reason, Opts, S);
+ dpr ->
+ send_dpr(Reason, [], S);
+ close = T ->
+ {stop, {disconnect_cb, T}};
+ ignore ->
+ dpr(Rest, Args, S);
+ T ->
+ No = {disconnect_cb, T},
+ diameter_lib:error_report(invalid, No),
+ {stop, No}
+ catch
+ E:R ->
+ No = {disconnect_cb, E, R, ?STACK},
+ diameter_lib:error_report(failure, No),
+ {stop, No}
+ end;
+
+dpr([], [Reason | _], S) ->
+ send_dpr(Reason, [], S).
+
+-record(opts, {cause, timeout = ?DPA_TIMEOUT}).
+
+send_dpr(Reason, Opts, #state{transport = TPid,
+ service = #diameter_service{capabilities = Caps}}
+ = S) ->
+ #opts{cause = Cause, timeout = Tmo}
+ = lists:foldl(fun opt/2,
+ #opts{cause = case Reason of
+ transport -> ?GOAWAY;
+ _ -> ?REBOOT
+ end,
+ timeout = ?DPA_TIMEOUT},
+ Opts),
+ #diameter_caps{origin_host = {OH, _},
+ origin_realm = {OR, _}}
= Caps,
Bin = encode(['DPR', {'Origin-Host', OH},
{'Origin-Realm', OR},
{'Disconnect-Cause', Cause}]),
send(TPid, Bin),
- dpa_timer(),
+ dpa_timer(Tmo),
?LOG(send, 'DPR'),
S#state{dpr = diameter_codec:sequence_numbers(Bin)}.
-dpa_timer() ->
- erlang:send_after(?DPA_TIMEOUT, self(), dpa_timeout).
+opt({timeout, Tmo}, Rec)
+ when ?IS_TIMEOUT(Tmo) ->
+ Rec#opts{timeout = Tmo};
+opt({cause, Cause}, Rec)
+ when ?IS_CAUSE(Cause) ->
+ Rec#opts{cause = cause(Cause)};
+opt(T, _) ->
+ ?ERROR({invalid_option, T}).
+
+cause(rebooting) -> ?REBOOT;
+cause(goaway) -> ?GOAWAY;
+cause(busy) -> ?BUSY;
+cause(N)
+ when ?IS_CAUSE(N) ->
+ N;
+cause(N) ->
+ ?ERROR({invalid_cause, N}).
+
+dpa_timer(Tmo) ->
+ erlang:send_after(Tmo, self(), dpa_timeout).
%% register_everywhere/1
%%
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 36907a8d1c..395d0b77af 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -494,7 +494,7 @@ handle_call({info, Item}, _From, S) ->
{reply, service_info(Item, S), S};
handle_call(stop, _From, S) ->
- shutdown(S),
+ shutdown(service, S),
{stop, normal, ok, S};
%% The server currently isn't guaranteed to be dead when the caller
%% gets the reply. We deal with this in the call to the server,
@@ -684,7 +684,7 @@ terminate(Reason, #state{service_name = Name} = S) ->
send_event(Name, stop),
ets:delete(?STATE_TABLE, Name),
shutdown == Reason %% application shutdown
- andalso shutdown(S).
+ andalso shutdown(application, S).
%%% ---------------------------------------------------------------------------
%%% # code_change(FromVsn, State, Extra)
@@ -767,44 +767,48 @@ mod_state(Alias, ModS) ->
%%% # shutdown/2
%%% ---------------------------------------------------------------------------
-shutdown(Refs, #state{peerT = PeerT}) ->
- ets:foldl(fun(P,ok) -> s(P, Refs), ok end, ok, PeerT).
+%% remove_transport: ask watchdogs to terminate their transport.
+shutdown(Refs, #state{peerT = PeerT})
+ when is_list(Refs) ->
+ ets:foldl(fun(P,ok) -> sp(P, Refs), ok end, ok, PeerT);
-s(#peer{ref = Ref, pid = Pid}, Refs) ->
- s(lists:member(Ref, Refs), Pid);
-
-s(true, Pid) ->
- Pid ! {shutdown, self()}; %% 'DOWN' will cleanup as usual
-s(false, _) ->
- ok.
-
-%%% ---------------------------------------------------------------------------
-%%% # shutdown/1
-%%% ---------------------------------------------------------------------------
-
-shutdown(#state{peerT = PeerT}) ->
+%% application/service shutdown: ask transports to terminate themselves.
+shutdown(Reason, #state{peerT = PeerT}) ->
%% A transport might not be alive to receive the shutdown request
%% but give those that are a chance to shutdown gracefully.
- wait(fun st/2, PeerT),
+ shutdown(conn, Reason, PeerT),
%% Kill the watchdogs explicitly in case there was no transport.
- wait(fun sw/2, PeerT).
+ shutdown(peer, Reason, PeerT).
-wait(Fun, T) ->
- diameter_lib:wait(ets:foldl(Fun, [], T)).
+%% sp/2
-st(#peer{op_state = {OS,_}} = P, Acc) ->
- st(P#peer{op_state = OS}, Acc);
-st(#peer{op_state = ?STATE_UP, conn = Pid}, Acc) ->
- Pid ! shutdown,
- [Pid | Acc];
-st(#peer{}, Acc) ->
- Acc.
+sp(#peer{ref = Ref, pid = Pid}, Refs) ->
+ lists:member(Ref, Refs)
+ andalso (Pid ! {shutdown, self()}). %% 'DOWN' cleans up
+
+%% shutdown/3
+
+shutdown(Who, Reason, T) ->
+ diameter_lib:wait(ets:foldl(fun(X,A) -> shutdown(Who, X, Reason, A) end,
+ [],
+ T)).
-sw(#peer{pid = Pid}, Acc)
+shutdown(conn = Who, #peer{op_state = {OS,_}} = P, Reason, Acc) ->
+ shutdown(Who, P#peer{op_state = OS}, Reason, Acc);
+
+shutdown(conn,
+ #peer{pid = Pid, op_state = ?STATE_UP, conn = TPid},
+ Reason,
+ Acc) ->
+ TPid ! {shutdown, Pid, Reason},
+ [TPid | Acc];
+
+shutdown(peer, #peer{pid = Pid}, _Reason, Acc)
when is_pid(Pid) ->
exit(Pid, shutdown),
[Pid | Acc];
-sw(#peer{}, Acc) ->
+
+shutdown(_, #peer{}, _, Acc) ->
Acc.
%%% ---------------------------------------------------------------------------
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index d814f1afe2..d6a2d2833b 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -48,18 +48,19 @@
-record(watchdog,
{%% PCB - Peer Control Block; see RFC 3539, Appendix A
status = initial :: initial | okay | suspect | down | reopen,
- pending = false :: boolean(),
+ pending = false :: boolean(), %% DWA
tw :: 6000..16#FFFFFFFF | {module(), atom(), list()},
%% {M,F,A} -> integer() >= 0
num_dwa = 0 :: -1 | non_neg_integer(),
%% number of DWAs received during reopen
%% end PCB
- parent = self() :: pid(),
- transport :: pid() | undefined,
+ parent = self() :: pid(), %% service process
+ transport :: pid() | undefined, %% peer_fsm process
tref :: reference(), %% reference for current watchdog timer
message_data, %% term passed into diameter_service with message
sequence :: diameter:sequence(), %% mask
- restrict :: {diameter:restriction(), boolean()}}).
+ restrict :: {diameter:restriction(), boolean()},
+ shutdown = false :: boolean()}).
%% start/2
%%
@@ -168,7 +169,8 @@ handle_info(T, S) ->
handle_info(T, upgrade(S)).
upgrade(S) ->
- #watchdog{} = list_to_tuple(tuple_to_list(S) ++ [?NOMASK, {nodes, true}]).
+ #watchdog{} = list_to_tuple(tuple_to_list(S)
+ ++ [?NOMASK, {nodes, true}, false]).
event(#watchdog{status = T}, #watchdog{status = T}) ->
ok;
@@ -225,9 +227,10 @@ transition({shutdown, Pid}, #watchdog{parent = Pid,
down = S, %% sanity check
stop;
transition({shutdown = T, Pid}, #watchdog{parent = Pid,
- transport = TPid}) ->
+ transport = TPid}
+ = S) ->
TPid ! {T, self()},
- ok;
+ S#watchdog{shutdown = true};
%% Parent process has died,
transition({'DOWN', _, process, Pid, _Reason},
@@ -301,7 +304,10 @@ transition({open = P, TPid, _Hosts, T},
transition({'DOWN', _, process, TPid, _},
#watchdog{transport = TPid,
- status = initial}) ->
+ status = S,
+ shutdown = D})
+ when S == initial;
+ D ->
stop;
transition({'DOWN', _, process, TPid, _},
diff --git a/lib/diameter/test/diameter_dpr_SUITE.erl b/lib/diameter/test/diameter_dpr_SUITE.erl
new file mode 100644
index 0000000000..9252650bf7
--- /dev/null
+++ b/lib/diameter/test/diameter_dpr_SUITE.erl
@@ -0,0 +1,196 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2012. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% Tests of the disconnect_cb configuration.
+%%
+
+-module(diameter_dpr_SUITE).
+
+-export([suite/0,
+ all/0,
+ groups/0,
+ init_per_group/2,
+ end_per_group/2]).
+
+%% testcases
+-export([start/1,
+ connect/1,
+ remove_transport/1,
+ stop_service/1,
+ check/1,
+ stop/1]).
+
+%% disconnect_cb
+-export([disconnect/5]).
+
+-include("diameter.hrl").
+
+%% ===========================================================================
+
+-define(util, diameter_util).
+
+-define(ADDR, {127,0,0,1}).
+
+-define(CLIENT, "CLIENT").
+-define(SERVER, "SERVER").
+
+-define(DICT_COMMON, ?DIAMETER_DICT_COMMON).
+-define(APP_ID, ?DICT_COMMON:id()).
+
+%% Config for diameter:start_service/2.
+-define(SERVICE(Host),
+ [{'Origin-Host', Host},
+ {'Origin-Realm', "erlang.org"},
+ {'Host-IP-Address', [?ADDR]},
+ {'Vendor-Id', hd(Host)}, %% match this in disconnect/5
+ {'Product-Name', "OTP/diameter"},
+ {'Acct-Application-Id', [?APP_ID]},
+ {restrict_connections, false},
+ {application, [{dictionary, ?DICT_COMMON},
+ {module, #diameter_callback{_ = false}}]}]).
+
+%% Disconnect reasons that diameter passes as the first argument of a
+%% function configured as disconnect_cb.
+-define(REASONS, [transport, service, application]).
+
+%% Valid values for Disconnect-Cause.
+-define(CAUSES, [0, rebooting, 1, busy, 2, goaway]).
+
+%% Establish one client connection for element of this list,
+%% configured with disconnect/5 as disconnect_cb and returning the
+%% specified value.
+-define(RETURNS,
+ [[close, {dpr, [{cause, invalid}]}], [ignore, close], []]
+ ++ [[{dpr, [{timeout, 5000}, {cause, T}]}] || T <- ?CAUSES]).
+
+%% ===========================================================================
+
+suite() ->
+ [{timetrap, {seconds, 60}}].
+
+all() ->
+ [{group, R} || R <- ?REASONS].
+
+%% The group determines how transports are terminated: by remove_transport,
+%% stop_service or application stop.
+groups() ->
+ Ts = tc(),
+ [{R, [], Ts} || R <- ?REASONS].
+
+init_per_group(Name, Config) ->
+ [{group, Name} | Config].
+
+end_per_group(_, _) ->
+ ok.
+
+tc() ->
+ [start, connect, remove_transport, stop_service, check, stop].
+
+%% ===========================================================================
+%% start/stop testcases
+
+start(_Config) ->
+ ok = diameter:start(),
+ ok = diameter:start_service(?SERVER, ?SERVICE(?SERVER)),
+ ok = diameter:start_service(?CLIENT, ?SERVICE(?CLIENT)).
+
+connect(Config) ->
+ Pid = spawn(fun init/0), %% process for disconnect_cb to bang
+ Grp = group(Config),
+ LRef = ?util:listen(?SERVER, tcp),
+ Refs = [?util:connect(?CLIENT, tcp, LRef, opts(RCs, {Grp, Pid}))
+ || RCs <- ?RETURNS],
+ ?util:write_priv(Config, config, [Pid | Refs]).
+
+%% Remove all the client transports only in the transport group.
+remove_transport(Config) ->
+ transport == group(Config)
+ andalso (ok = diameter:remove_transport(?CLIENT, true)).
+
+%% Stop the service only in the service group.
+stop_service(Config) ->
+ service == group(Config)
+ andalso (ok = diameter:stop_service(?CLIENT)).
+
+%% Check for callbacks and stop the service. (Not the other way around
+%% for the timing reason explained below.)
+check(Config) ->
+ Grp = group(Config),
+ [Pid | Refs] = ?util:read_priv(Config, config),
+ Pid ! self(), %% ask for dictionary
+ Dict = receive {Pid, D} -> D end, %% get it
+ check(Refs, ?RETURNS, Grp, Dict). %% check for callbacks
+
+stop(_Config) ->
+ ok = diameter:stop().
+
+%% Whether or not there are callbacks after diameter:stop() depends on
+%% timing as long as the server runs on the same node: a server
+%% transport could close the connection before the client has chance
+%% to apply its callback. Therefore, just check that there haven't
+%% been any callbacks yet.
+check(_, _, application, Dict) ->
+ [] = dict:to_list(Dict);
+
+check([], [], _, _) ->
+ ok;
+
+check([Ref | Refs], CBs, Grp, Dict) ->
+ check1(Ref, hd(CBs), Grp, Dict),
+ check(Refs, tl(CBs), Grp, Dict).
+
+check1(Ref, [ignore | RCs], Reason, Dict) ->
+ check1(Ref, RCs, Reason, Dict);
+
+check1(Ref, [_|_], Reason, Dict) ->
+ {ok, Reason} = dict:find(Ref, Dict); %% callback with expected reason
+
+check1(Ref, [], _, Dict) ->
+ error = dict:find(Ref, Dict). %% no callback
+
+%% ----------------------------------------
+
+group(Config) ->
+ {group, Grp} = lists:keyfind(group, 1, Config),
+ Grp.
+
+%% Configure the callback with the group name (= disconnect reason) as
+%% extra argument.
+opts(RCs, T) ->
+ [{disconnect_cb, {?MODULE, disconnect, [T, RC]}} || RC <- RCs].
+
+%% Match the group name with the disconnect reason to ensure the
+%% callback is being called as expected.
+disconnect(Reason, Ref, Peer, {Reason, Pid}, RC) ->
+ io:format("disconnect: ~p ~p~n", [Ref, Reason]),
+ {_, #diameter_caps{vendor_id = {$C,$S}}} = Peer,
+ Pid ! {Reason, Ref},
+ RC.
+
+init() ->
+ exit(recv(dict:new())).
+
+recv(Dict) ->
+ receive
+ Pid when is_pid(Pid) ->
+ Pid ! {self(), Dict};
+ {Reason, Ref} ->
+ recv(dict:store(Ref, Reason, Dict))
+ end.
diff --git a/lib/diameter/test/modules.mk b/lib/diameter/test/modules.mk
index 7f163536fb..5898e125ae 100644
--- a/lib/diameter/test/modules.mk
+++ b/lib/diameter/test/modules.mk
@@ -2,7 +2,7 @@
# %CopyrightBegin%
#
-# Copyright Ericsson AB 2010-2011. All Rights Reserved.
+# Copyright Ericsson AB 2010-2012. All Rights Reserved.
#
# The contents of this file are subject to the Erlang Public License,
# Version 1.1, (the "License"); you may not use this file except in
@@ -39,7 +39,8 @@ MODULES = \
diameter_traffic_SUITE \
diameter_relay_SUITE \
diameter_tls_SUITE \
- diameter_failover_SUITE
+ diameter_failover_SUITE \
+ diameter_dpr_SUITE
HRL_FILES = \
diameter_ct.hrl