diff options
-rw-r--r-- | lib/diameter/doc/src/diameter.xml | 152 | ||||
-rw-r--r-- | lib/diameter/doc/src/diameter_app.xml | 4 | ||||
-rw-r--r-- | lib/diameter/doc/src/diameter_transport.xml | 6 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 1 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 193 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 64 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 22 | ||||
-rw-r--r-- | lib/diameter/test/diameter_dpr_SUITE.erl | 196 | ||||
-rw-r--r-- | lib/diameter/test/modules.mk | 5 |
9 files changed, 524 insertions, 119 deletions
diff --git a/lib/diameter/doc/src/diameter.xml b/lib/diameter/doc/src/diameter.xml index a35fd5b3a8..15f398c5f8 100644 --- a/lib/diameter/doc/src/diameter.xml +++ b/lib/diameter/doc/src/diameter.xml @@ -833,6 +833,7 @@ request a connection with one peer over SCTP or another To listen on both SCTP and TCP, define one transport for each.</p> </item> +<marker id="applications"/> <tag><c>{applications, [<seealso marker="#application_alias">application_alias()</seealso>]}</c></tag> <item> <p> @@ -842,6 +843,7 @@ Defaults to all applications configured on the service in question. Applications not configured on the service in question are ignored.</p> </item> +<marker id="capabilities"/> <tag><c>{capabilities, [<seealso marker="#capability">capability()</seealso>]}</c></tag> <item> <p> @@ -856,33 +858,136 @@ TLS is desired over TCP as implemented by <seealso marker="diameter_tcp">diameter_tcp(3)</seealso>.</p> </item> +<marker id="capabilities_cb"/> <tag><c>{capabilities_cb, <seealso marker="#evaluable">evaluable()</seealso>}</c></tag> <item> <p> A callback invoked upon reception of CER/CEA during capabilities exchange in order to ask whether or not the connection should be accepted. -Applied to the relevant <c><seealso -marker="#transport_ref">transport_ref()</seealso></c> and the -<c>#diameter_caps{}</c> record of the connection. -Returning <c>ok</c> accepts the connection. -Returning <c>integer()</c> causes an incoming -CER to be answered with the specified Result-Code. -Returning <c>discard</c> causes an incoming CER to -be discarded. -Returning <c>unknown</c> is equivalent to returning <c>3010</c>, -DIAMETER_UNKNOWN_PEER. -Returning anything but <c>ok</c> or a 2xxx series result -code causes the transport connection to be broken.</p> +Applied to the <c><seealso +marker="#transport_ref">transport_ref()</seealso></c> and +<c>#diameter_caps{}</c> record of the connection.</p> + +<p> +The return value can have one of the following types.</p> + +<taglist> +<tag><c>ok</c></tag> +<item> +<p> +Accept the connection.</p> +</item> + +<tag><c>integer()</c></tag> +<item> +<p> +Causes an incoming CER to be answered with the specified Result-Code.</p> +</item> + +<tag><c>discard</c></tag> +<item> +<p> +Causes an incoming CER to be discarded without CEA being sent.</p> +</item> + +<tag><c>unknown</c></tag> +<item> +<p> +Equivalent to returning <c>3010</c>, DIAMETER_UNKNOWN_PEER.</p> +</item> +</taglist> <p> +Returning anything but <c>ok</c> or a 2xxx series result +code causes the transport connection to be broken. Multiple <c>capabilities_cb</c> options can be specified, in which case the corresponding callbacks are applied until either all return <c>ok</c> or one does not.</p> +</item> -<marker id="watchdog_timer"/> +<marker id="disconnect_cb"/> +<tag><c>{disconnect_cb, <seealso marker="#evaluable">evaluable()</seealso>}</c></tag> + +<item> +<p> +A callback invoked prior to requesting shutdown of a transport process +for a transport connection having watchdog state <c>OKAY</c>. +Applied to <c>Reason=transport|service|application</c> and the +<c><seealso marker="#transport_ref">transport_ref()</seealso></c> and +<c><seealso marker="diameter_app#peer">diameter_app:peer()</seealso></c> +in question, <c>Reason</c> indicating whether the the diameter +application is being stopped, the service in question is being stopped +at <seealso +marker="diameter#stop_service">diameter:stop_service/1</seealso> or +the transport in question is being removed at <seealso +marker="diameter#remove_transport">diameter:remove_transport/2</seealso>, +respectively.</p> + +<p> +The return value can have one of the following types.</p> + +<taglist> +<tag><c>{dpr, [option()]}</c></tag> +<item> +<p> +Causes Disconnect-Peer-Request to be sent to the peer, transport +process shutdown being requested after reception of +Disconnect-Peer-Answer or timeout. +An <c>option()</c> can be one of the following.</p> + +<taglist> +<tag><c>{timeout, integer()}</c></tag> +<item> +<p> +Transport process shutdown will be requested after this number of +milliseconds if DPA is not received. +Defaults to 1000.</p> +</item> + +<tag><c>{cause, 0|rebooting|1|busy|2|goaway}</c></tag> +<item> +<p> +The Disconnect-Cause to send, <c>REBOOTING</c>, <c>BUSY</c> and +<c>DO_NOT_WANT_TO_TALK_TO_YOU</c> respectively. +Defaults to <c>rebooting</c> for <c>Reason=service|application</c> and +<c>goaway</c> for <c>Reason=transport</c>.</p> +</item> +</taglist> +</item> + +<tag><c>dpr</c></tag> +<item> +<p> +Equivalent to <c>{dpr, []}</c>.</p> +</item> + +<tag><c>close</c></tag> +<item> +<p> +Causes transport process shutdown to be requested without +Disconnect-Peer-Request being sent to the peer.</p> </item> +<tag><c>ignore</c></tag> +<item> +<p> +Equivalent to not having configured the callback.</p> +</item> +</taglist> + +<p> +Multiple <c>disconnect_cb</c> options can be specified, in which +case the corresponding callbacks are applied until one of them returns +a value other than <c>ignore</c>. +All callbacks returning <c>ignore</c> is equivalent to not having +configured them.</p> + +<p> +Defaults to a single callback returning <c>dpr</c>.</p> +</item> + +<marker id="watchdog_timer"/> <tag><c>{watchdog_timer, TwInit}</c></tag> <item> <code> @@ -902,10 +1007,9 @@ the callback.</p> <p> An integer value must be at least 6000 as required by RFC 3539. Defaults to 30000 if unspecified.</p> - -<marker id="reconnect_timer"/> </item> +<marker id="reconnect_timer"/> <tag><c>{reconnect_timer, Tc}</c></tag> <item> <code> @@ -1161,7 +1265,7 @@ at the time the diameter application was started.</p> <!-- ===================================================================== --> <func> -<name>remove_transport(SvcName, Pred) -> ok</name> +<name>remove_transport(SvcName, Pred) -> ok | {error, Reason}</name> <fsummary>Remove previously added transports.</fsummary> <type> <v>SvcName = <seealso marker="#service_name">service_name()</seealso></v> @@ -1171,6 +1275,7 @@ at the time the diameter application was started.</p> <v> | fun((<seealso marker="#transport_ref">transport_ref()</seealso>, list()) -> boolean())</v> <v> | fun((list()) -> boolean())</v> <v>MFA = {atom(), atom(), list()}</v> +<v>Reason = term()</v> </type> <desc> <p> @@ -1196,15 +1301,12 @@ Pred = {M,F,A}: fun(Ref, Type, Opts) -> apply(M, F, [Ref, Type, Opts | A]) end </code> <p> -Removing a transport causes all associated transport connections to -be broken. -A DPR message with -Disconnect-Cause <c>DO_NOT_WANT_TO_TALK_TO_YOU</c> will be sent -to each connected peer before disassociating the transport configuration -from the service and terminating the transport upon reception of -DPA or timeout.</p> - -<!-- TODO: document the timeout value, possibly make configurable. --> +Removing a transport causes the corresponding transport processes to +be asked to terminate. +Whether or not a DPR message is sent to a peer is +controlled by +value of <seealso marker="disconnect_cb">disconnect_cb</seealso> +configured on the transport.</p> <marker id="service_info"/> </desc> diff --git a/lib/diameter/doc/src/diameter_app.xml b/lib/diameter/doc/src/diameter_app.xml index 9d8a6568eb..ac056c2d39 100644 --- a/lib/diameter/doc/src/diameter_app.xml +++ b/lib/diameter/doc/src/diameter_app.xml @@ -309,12 +309,12 @@ by either <seealso marker="#handle_answer">handle_answer/4</seealso> or <seealso marker="#handle_error">handle_error/4</seealso> depending on whether or not an answer message is received from the peer. If the transport becomes unavailable after <seealso -marker="prepare_request">prepare_request/3</seealso> then a new <seealso +marker="#prepare_request">prepare_request/3</seealso> then a new <seealso marker="#pick_peer">pick_peer/4</seealso> callback may take place to failover to an alternate peer, after which <seealso marker="#prepare_retransmit">prepare_retransmit/3</seealso> takes the place of <seealso -marker="prepare_request">prepare_request/3</seealso> in resending the +marker="#prepare_request">prepare_request/3</seealso> in resending the request. There is no guarantee that a <seealso marker="#pick_peer">pick_peer/4</seealso> callback to select diff --git a/lib/diameter/doc/src/diameter_transport.xml b/lib/diameter/doc/src/diameter_transport.xml index d9b36a1e09..0c8b41397a 100644 --- a/lib/diameter/doc/src/diameter_transport.xml +++ b/lib/diameter/doc/src/diameter_transport.xml @@ -149,9 +149,9 @@ contains the binary to send.</p> <tag><c>{diameter, {close, Pid}}</c></tag> <item> <p> -A request to close the transport connection. -The transport process should terminate after closing the -connection. +A request to terminate the transport process after having received DPA +in response to DPR. +The transport process should exit. <c>Pid</c> is the pid() of the parent process.</p> </item> diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index 3e3a6be0ef..95702f03d4 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -330,6 +330,7 @@ call(SvcName, App, Message) -> | {applications, [app_alias()]} | {capabilities, [capability()]} | {capabilities_cb, evaluable()} + | {disconnect_cb, evaluable()} | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}} | {reconnect_timer, 'Unsigned32'()} | {private, any()}. diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 3f4945f7a6..4acfd8313b 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -48,15 +48,19 @@ -include("diameter_internal.hrl"). -include("diameter_gen_base_rfc3588.hrl"). +%% Values of Disconnect-Cause in DPR. -define(GOAWAY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_DO_NOT_WANT_TO_TALK_TO_YOU'). -define(REBOOT, ?'DIAMETER_BASE_DISCONNECT-CAUSE_REBOOTING'). +-define(BUSY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_BUSY'). -define(NO_INBAND_SECURITY, 0). -define(TLS, 1). %% Keys in process dictionary. -define(CB_KEY, cb). %% capabilities callback +-define(DPR_KEY, dpr). %% disconnect callback -define(DWA_KEY, dwa). %% outgoing DWA +-define(REF_KEY, ref). %% transport_ref() -define(Q_KEY, q). %% transport start queue -define(START_KEY, start). %% start of connected transport -define(SEQUENCE_KEY, mask). %% mask for sequence numbers @@ -68,6 +72,13 @@ %% A 2xxx series Result-Code. Not necessarily 2001. -define(IS_SUCCESS(N), 2 == (N) div 1000). +%% Guards. +-define(IS_UINT32(N), (0 =< N andalso 0 == N bsr 32)). +-define(IS_TIMEOUT(N), ?IS_UINT32(N)). +-define(IS_CAUSE(N), N == ?REBOOT; N == rebooting; + N == ?GOAWAY; N == goaway; + N == ?BUSY; N == busy). + %% RFC 3588: %% %% Timeout An application-defined timer has expired while waiting @@ -75,18 +86,16 @@ %% -define(EVENT_TIMEOUT, 10000). -%% How long to wait for a DPA in response to DPR before simply -%% aborting. Used to distinguish between shutdown and not but there's -%% not really any need. Stopping a service will require a timeout if -%% the peer doesn't answer DPR so the value should be short-ish. +%% Default timeout for DPA in response to DPR. A bit short but the +%% timeout used to be hardcoded. (So it could be worse.) -define(DPA_TIMEOUT, 1000). -record(state, {state = 'Wait-Conn-Ack' %% state of RFC 3588 Peer State Machine :: 'Wait-Conn-Ack' | recv_CER | 'Wait-CEA' | 'Open', mode :: accept | connect | {connect, reference()}, - parent :: pid(), - transport :: pid(), + parent :: pid(), %% watchdog process + transport :: pid(), %% transport process service :: #diameter_service{}, dpr = false :: false | {diameter:'Unsigned32'(), diameter:'Unsigned32'()}}). @@ -163,14 +172,16 @@ i({WPid, Type, Opts, #diameter_service{} = Svc}) -> %% from old code i({WPid, Type, Opts, {?NOMASK, [node() | nodes()], Svc}}); i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, - capabilities = Caps} + capabilities = LCaps} = Svc}}) -> [] /= Apps orelse ?ERROR({no_apps, T, Opts}), - putr(?DWA_KEY, dwa(Caps)), + putr(?DWA_KEY, dwa(LCaps)), {M, Ref} = T, diameter_stats:reg(Ref), - {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]), - putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}), + {[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]), + putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}), + putr(?DPR_KEY, [F || {_, F} <- Ds]), + putr(?REF_KEY, Ref), putr(?SEQUENCE_KEY, Mask), putr(?RESTRICT_KEY, Nodes), erlang:monitor(process, WPid), @@ -188,8 +199,8 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, %% watchdog start (start/2) succeeds regardless so as not to crash the %% service. -start_transport(T, Opts, #diameter_service{capabilities = Caps} = Svc) -> - Addrs0 = Caps#diameter_caps.host_ip_address, +start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) -> + Addrs0 = LCaps#diameter_caps.host_ip_address, start_transport(Addrs0, {T, Opts, Svc}). start_transport(Addrs0, T) -> @@ -212,9 +223,9 @@ svc(Svc, []) -> svc(Svc, Addrs) -> readdr(Svc, Addrs). -readdr(#diameter_service{capabilities = Caps0} = Svc, Addrs) -> - Caps = Caps0#diameter_caps{host_ip_address = Addrs}, - Svc#diameter_service{capabilities = Caps}. +readdr(#diameter_service{capabilities = LCaps0} = Svc, Addrs) -> + LCaps = LCaps0#diameter_caps{host_ip_address = Addrs}, + Svc#diameter_service{capabilities = LCaps}. %% The 4-tuple Data returned from diameter_peer:start/1 identifies the %% transport module/config use to start the transport process in @@ -375,25 +386,19 @@ transition({send, Msg}, #state{transport = TPid}) -> send(TPid, Msg), ok; -%% Request for graceful shutdown. -transition({shutdown, Pid}, #state{parent = Pid, dpr = false} = S) -> - dpr(?GOAWAY, S); -transition({shutdown, Pid}, #state{parent = Pid}) -> - ok; - -%% Application shutdown. -transition(shutdown, #state{dpr = false} = S) -> - dpr(?REBOOT, S); -transition(shutdown, _) -> %% DPR already send: ensure expected timeout - dpa_timer(), +%% Messages from old (diameter_service) code. +transition(shutdown = T, #state{parent = Pid} = S) -> + transition({T, Pid, service}, S); %% Reason irrelevant: old code has no cb + +%% Request for graceful shutdown at remove_transport, stop_service of +%% application shutdown. +transition({shutdown = T, Pid}, S) -> + transition({T, Pid, transport}, S); +transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) -> + dpr(Reason, S); +transition({shutdown, Pid, _}, #state{parent = Pid}) -> ok; -%% Request to close the transport connection. -transition({close = T, Pid}, #state{parent = Pid, - transport = TPid}) -> - diameter_peer:close(TPid), - {stop, T}; - %% DPA reception has timed out. transition(dpa_timeout, _) -> stop; @@ -441,13 +446,13 @@ start_next(#state{service = Svc0} = S) -> %% send_CER/1 send_CER(#state{mode = {connect, Remote}, - service = #diameter_service{capabilities = Caps}, + service = #diameter_service{capabilities = LCaps}, transport = TPid} = S) -> - OH = Caps#diameter_caps.origin_host, + OH = LCaps#diameter_caps.origin_host, req_send_CER(OH, Remote) orelse - close({already_connected, Remote, Caps}, S), + close({already_connected, Remote, LCaps}, S), CER = build_CER(S), ?LOG(send, 'CER'), send(TPid, encode(CER)), @@ -471,8 +476,8 @@ start_timer(#state{state = PS} = S) -> %% build_CER/1 -build_CER(#state{service = #diameter_service{capabilities = Caps}}) -> - {ok, CER} = diameter_capx:build_CER(Caps), +build_CER(#state{service = #diameter_service{capabilities = LCaps}}) -> + {ok, CER} = diameter_capx:build_CER(LCaps), CER. %% encode/1 @@ -800,8 +805,8 @@ a('CER', #diameter_caps{vendor_id = Vid, {'Product-Name', Name}, {'Origin-State-Id', OSI}]; -a('DPR', #diameter_caps{origin_host = Host, - origin_realm = Realm}) -> +a('DPR', #diameter_caps{origin_host = {Host, _}, + origin_realm = {Realm, _}}) -> ['DPA', {'Origin-Host', Host}, {'Origin-Realm', Realm}]. @@ -909,7 +914,9 @@ rejected(N) %% open/5 -open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) -> +open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid, + service = Svc} + = S) -> #diameter_caps{origin_host = {_,_} = H, inband_security_id = {LS,_}} = Caps, @@ -917,7 +924,9 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) -> tls_ack(lists:member(?TLS, LS), Caps, Type, IS, S), Pid ! {open, self(), H, {Caps, SupportedApps, Pkt}}, - S#state{state = 'Open'}. + %% Replace capabilities record with local/remote pairs. + S#state{state = 'Open', + service = Svc#diameter_service{capabilities = Caps}}. %% We've advertised TLS support: tell the transport the result %% and expect a reply when the handshake is complete. @@ -970,24 +979,110 @@ dwa(#diameter_caps{origin_host = OH, {'Origin-State-Id', OSI}]. %% dpr/2 +%% +%% The RFC isn't clear on whether DPR should be send in a non-Open +%% state. The Peer State Machine transitions it documents aren't +%% exhaustive (no Stop in Wait-I-CEA for example) so assume it's up to +%% the implementation and transition to Closed (ie. die) if we haven't +%% yet reached Open. + +%% Connection is open, DPR has not been sent. +dpr(Reason, #state{state = 'Open', + dpr = false, + service = #diameter_service{capabilities = Caps}} + = S) -> + case getr(?DPR_KEY) of + CBs when is_list(CBs) -> + Ref = getr(?REF_KEY), + Peer = {self(), Caps}, + dpr(CBs, [Reason, Ref, Peer], S); + undefined -> %% started in old code + send_dpr(Reason, [], S) + end; -dpr(Cause, #state{transport = TPid, - service = #diameter_service{capabilities = Caps}} - = S) -> - #diameter_caps{origin_host = OH, - origin_realm = OR} +%% Connection is open, DPR already sent. +dpr(_, #state{state = 'Open'}) -> + ok; + +%% Connection not open. +dpr(_Reason, _S) -> + stop. + +%% dpr/3 +%% +%% Note that an implementation that wants to do something +%% transport_module-specific can lookup the pid of the transport +%% process and contact it. (eg. diameter:service_info/2) + +dpr([CB|Rest], [Reason | _] = Args, S) -> + try diameter_lib:eval([CB | Args]) of + {dpr, Opts} when is_list(Opts) -> + send_dpr(Reason, Opts, S); + dpr -> + send_dpr(Reason, [], S); + close = T -> + {stop, {disconnect_cb, T}}; + ignore -> + dpr(Rest, Args, S); + T -> + No = {disconnect_cb, T}, + diameter_lib:error_report(invalid, No), + {stop, No} + catch + E:R -> + No = {disconnect_cb, E, R, ?STACK}, + diameter_lib:error_report(failure, No), + {stop, No} + end; + +dpr([], [Reason | _], S) -> + send_dpr(Reason, [], S). + +-record(opts, {cause, timeout = ?DPA_TIMEOUT}). + +send_dpr(Reason, Opts, #state{transport = TPid, + service = #diameter_service{capabilities = Caps}} + = S) -> + #opts{cause = Cause, timeout = Tmo} + = lists:foldl(fun opt/2, + #opts{cause = case Reason of + transport -> ?GOAWAY; + _ -> ?REBOOT + end, + timeout = ?DPA_TIMEOUT}, + Opts), + #diameter_caps{origin_host = {OH, _}, + origin_realm = {OR, _}} = Caps, Bin = encode(['DPR', {'Origin-Host', OH}, {'Origin-Realm', OR}, {'Disconnect-Cause', Cause}]), send(TPid, Bin), - dpa_timer(), + dpa_timer(Tmo), ?LOG(send, 'DPR'), S#state{dpr = diameter_codec:sequence_numbers(Bin)}. -dpa_timer() -> - erlang:send_after(?DPA_TIMEOUT, self(), dpa_timeout). +opt({timeout, Tmo}, Rec) + when ?IS_TIMEOUT(Tmo) -> + Rec#opts{timeout = Tmo}; +opt({cause, Cause}, Rec) + when ?IS_CAUSE(Cause) -> + Rec#opts{cause = cause(Cause)}; +opt(T, _) -> + ?ERROR({invalid_option, T}). + +cause(rebooting) -> ?REBOOT; +cause(goaway) -> ?GOAWAY; +cause(busy) -> ?BUSY; +cause(N) + when ?IS_CAUSE(N) -> + N; +cause(N) -> + ?ERROR({invalid_cause, N}). + +dpa_timer(Tmo) -> + erlang:send_after(Tmo, self(), dpa_timeout). %% register_everywhere/1 %% diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 36907a8d1c..395d0b77af 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -494,7 +494,7 @@ handle_call({info, Item}, _From, S) -> {reply, service_info(Item, S), S}; handle_call(stop, _From, S) -> - shutdown(S), + shutdown(service, S), {stop, normal, ok, S}; %% The server currently isn't guaranteed to be dead when the caller %% gets the reply. We deal with this in the call to the server, @@ -684,7 +684,7 @@ terminate(Reason, #state{service_name = Name} = S) -> send_event(Name, stop), ets:delete(?STATE_TABLE, Name), shutdown == Reason %% application shutdown - andalso shutdown(S). + andalso shutdown(application, S). %%% --------------------------------------------------------------------------- %%% # code_change(FromVsn, State, Extra) @@ -767,44 +767,48 @@ mod_state(Alias, ModS) -> %%% # shutdown/2 %%% --------------------------------------------------------------------------- -shutdown(Refs, #state{peerT = PeerT}) -> - ets:foldl(fun(P,ok) -> s(P, Refs), ok end, ok, PeerT). +%% remove_transport: ask watchdogs to terminate their transport. +shutdown(Refs, #state{peerT = PeerT}) + when is_list(Refs) -> + ets:foldl(fun(P,ok) -> sp(P, Refs), ok end, ok, PeerT); -s(#peer{ref = Ref, pid = Pid}, Refs) -> - s(lists:member(Ref, Refs), Pid); - -s(true, Pid) -> - Pid ! {shutdown, self()}; %% 'DOWN' will cleanup as usual -s(false, _) -> - ok. - -%%% --------------------------------------------------------------------------- -%%% # shutdown/1 -%%% --------------------------------------------------------------------------- - -shutdown(#state{peerT = PeerT}) -> +%% application/service shutdown: ask transports to terminate themselves. +shutdown(Reason, #state{peerT = PeerT}) -> %% A transport might not be alive to receive the shutdown request %% but give those that are a chance to shutdown gracefully. - wait(fun st/2, PeerT), + shutdown(conn, Reason, PeerT), %% Kill the watchdogs explicitly in case there was no transport. - wait(fun sw/2, PeerT). + shutdown(peer, Reason, PeerT). -wait(Fun, T) -> - diameter_lib:wait(ets:foldl(Fun, [], T)). +%% sp/2 -st(#peer{op_state = {OS,_}} = P, Acc) -> - st(P#peer{op_state = OS}, Acc); -st(#peer{op_state = ?STATE_UP, conn = Pid}, Acc) -> - Pid ! shutdown, - [Pid | Acc]; -st(#peer{}, Acc) -> - Acc. +sp(#peer{ref = Ref, pid = Pid}, Refs) -> + lists:member(Ref, Refs) + andalso (Pid ! {shutdown, self()}). %% 'DOWN' cleans up + +%% shutdown/3 + +shutdown(Who, Reason, T) -> + diameter_lib:wait(ets:foldl(fun(X,A) -> shutdown(Who, X, Reason, A) end, + [], + T)). -sw(#peer{pid = Pid}, Acc) +shutdown(conn = Who, #peer{op_state = {OS,_}} = P, Reason, Acc) -> + shutdown(Who, P#peer{op_state = OS}, Reason, Acc); + +shutdown(conn, + #peer{pid = Pid, op_state = ?STATE_UP, conn = TPid}, + Reason, + Acc) -> + TPid ! {shutdown, Pid, Reason}, + [TPid | Acc]; + +shutdown(peer, #peer{pid = Pid}, _Reason, Acc) when is_pid(Pid) -> exit(Pid, shutdown), [Pid | Acc]; -sw(#peer{}, Acc) -> + +shutdown(_, #peer{}, _, Acc) -> Acc. %%% --------------------------------------------------------------------------- diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index d814f1afe2..d6a2d2833b 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -48,18 +48,19 @@ -record(watchdog, {%% PCB - Peer Control Block; see RFC 3539, Appendix A status = initial :: initial | okay | suspect | down | reopen, - pending = false :: boolean(), + pending = false :: boolean(), %% DWA tw :: 6000..16#FFFFFFFF | {module(), atom(), list()}, %% {M,F,A} -> integer() >= 0 num_dwa = 0 :: -1 | non_neg_integer(), %% number of DWAs received during reopen %% end PCB - parent = self() :: pid(), - transport :: pid() | undefined, + parent = self() :: pid(), %% service process + transport :: pid() | undefined, %% peer_fsm process tref :: reference(), %% reference for current watchdog timer message_data, %% term passed into diameter_service with message sequence :: diameter:sequence(), %% mask - restrict :: {diameter:restriction(), boolean()}}). + restrict :: {diameter:restriction(), boolean()}, + shutdown = false :: boolean()}). %% start/2 %% @@ -168,7 +169,8 @@ handle_info(T, S) -> handle_info(T, upgrade(S)). upgrade(S) -> - #watchdog{} = list_to_tuple(tuple_to_list(S) ++ [?NOMASK, {nodes, true}]). + #watchdog{} = list_to_tuple(tuple_to_list(S) + ++ [?NOMASK, {nodes, true}, false]). event(#watchdog{status = T}, #watchdog{status = T}) -> ok; @@ -225,9 +227,10 @@ transition({shutdown, Pid}, #watchdog{parent = Pid, down = S, %% sanity check stop; transition({shutdown = T, Pid}, #watchdog{parent = Pid, - transport = TPid}) -> + transport = TPid} + = S) -> TPid ! {T, self()}, - ok; + S#watchdog{shutdown = true}; %% Parent process has died, transition({'DOWN', _, process, Pid, _Reason}, @@ -301,7 +304,10 @@ transition({open = P, TPid, _Hosts, T}, transition({'DOWN', _, process, TPid, _}, #watchdog{transport = TPid, - status = initial}) -> + status = S, + shutdown = D}) + when S == initial; + D -> stop; transition({'DOWN', _, process, TPid, _}, diff --git a/lib/diameter/test/diameter_dpr_SUITE.erl b/lib/diameter/test/diameter_dpr_SUITE.erl new file mode 100644 index 0000000000..9252650bf7 --- /dev/null +++ b/lib/diameter/test/diameter_dpr_SUITE.erl @@ -0,0 +1,196 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2012. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Tests of the disconnect_cb configuration. +%% + +-module(diameter_dpr_SUITE). + +-export([suite/0, + all/0, + groups/0, + init_per_group/2, + end_per_group/2]). + +%% testcases +-export([start/1, + connect/1, + remove_transport/1, + stop_service/1, + check/1, + stop/1]). + +%% disconnect_cb +-export([disconnect/5]). + +-include("diameter.hrl"). + +%% =========================================================================== + +-define(util, diameter_util). + +-define(ADDR, {127,0,0,1}). + +-define(CLIENT, "CLIENT"). +-define(SERVER, "SERVER"). + +-define(DICT_COMMON, ?DIAMETER_DICT_COMMON). +-define(APP_ID, ?DICT_COMMON:id()). + +%% Config for diameter:start_service/2. +-define(SERVICE(Host), + [{'Origin-Host', Host}, + {'Origin-Realm', "erlang.org"}, + {'Host-IP-Address', [?ADDR]}, + {'Vendor-Id', hd(Host)}, %% match this in disconnect/5 + {'Product-Name', "OTP/diameter"}, + {'Acct-Application-Id', [?APP_ID]}, + {restrict_connections, false}, + {application, [{dictionary, ?DICT_COMMON}, + {module, #diameter_callback{_ = false}}]}]). + +%% Disconnect reasons that diameter passes as the first argument of a +%% function configured as disconnect_cb. +-define(REASONS, [transport, service, application]). + +%% Valid values for Disconnect-Cause. +-define(CAUSES, [0, rebooting, 1, busy, 2, goaway]). + +%% Establish one client connection for element of this list, +%% configured with disconnect/5 as disconnect_cb and returning the +%% specified value. +-define(RETURNS, + [[close, {dpr, [{cause, invalid}]}], [ignore, close], []] + ++ [[{dpr, [{timeout, 5000}, {cause, T}]}] || T <- ?CAUSES]). + +%% =========================================================================== + +suite() -> + [{timetrap, {seconds, 60}}]. + +all() -> + [{group, R} || R <- ?REASONS]. + +%% The group determines how transports are terminated: by remove_transport, +%% stop_service or application stop. +groups() -> + Ts = tc(), + [{R, [], Ts} || R <- ?REASONS]. + +init_per_group(Name, Config) -> + [{group, Name} | Config]. + +end_per_group(_, _) -> + ok. + +tc() -> + [start, connect, remove_transport, stop_service, check, stop]. + +%% =========================================================================== +%% start/stop testcases + +start(_Config) -> + ok = diameter:start(), + ok = diameter:start_service(?SERVER, ?SERVICE(?SERVER)), + ok = diameter:start_service(?CLIENT, ?SERVICE(?CLIENT)). + +connect(Config) -> + Pid = spawn(fun init/0), %% process for disconnect_cb to bang + Grp = group(Config), + LRef = ?util:listen(?SERVER, tcp), + Refs = [?util:connect(?CLIENT, tcp, LRef, opts(RCs, {Grp, Pid})) + || RCs <- ?RETURNS], + ?util:write_priv(Config, config, [Pid | Refs]). + +%% Remove all the client transports only in the transport group. +remove_transport(Config) -> + transport == group(Config) + andalso (ok = diameter:remove_transport(?CLIENT, true)). + +%% Stop the service only in the service group. +stop_service(Config) -> + service == group(Config) + andalso (ok = diameter:stop_service(?CLIENT)). + +%% Check for callbacks and stop the service. (Not the other way around +%% for the timing reason explained below.) +check(Config) -> + Grp = group(Config), + [Pid | Refs] = ?util:read_priv(Config, config), + Pid ! self(), %% ask for dictionary + Dict = receive {Pid, D} -> D end, %% get it + check(Refs, ?RETURNS, Grp, Dict). %% check for callbacks + +stop(_Config) -> + ok = diameter:stop(). + +%% Whether or not there are callbacks after diameter:stop() depends on +%% timing as long as the server runs on the same node: a server +%% transport could close the connection before the client has chance +%% to apply its callback. Therefore, just check that there haven't +%% been any callbacks yet. +check(_, _, application, Dict) -> + [] = dict:to_list(Dict); + +check([], [], _, _) -> + ok; + +check([Ref | Refs], CBs, Grp, Dict) -> + check1(Ref, hd(CBs), Grp, Dict), + check(Refs, tl(CBs), Grp, Dict). + +check1(Ref, [ignore | RCs], Reason, Dict) -> + check1(Ref, RCs, Reason, Dict); + +check1(Ref, [_|_], Reason, Dict) -> + {ok, Reason} = dict:find(Ref, Dict); %% callback with expected reason + +check1(Ref, [], _, Dict) -> + error = dict:find(Ref, Dict). %% no callback + +%% ---------------------------------------- + +group(Config) -> + {group, Grp} = lists:keyfind(group, 1, Config), + Grp. + +%% Configure the callback with the group name (= disconnect reason) as +%% extra argument. +opts(RCs, T) -> + [{disconnect_cb, {?MODULE, disconnect, [T, RC]}} || RC <- RCs]. + +%% Match the group name with the disconnect reason to ensure the +%% callback is being called as expected. +disconnect(Reason, Ref, Peer, {Reason, Pid}, RC) -> + io:format("disconnect: ~p ~p~n", [Ref, Reason]), + {_, #diameter_caps{vendor_id = {$C,$S}}} = Peer, + Pid ! {Reason, Ref}, + RC. + +init() -> + exit(recv(dict:new())). + +recv(Dict) -> + receive + Pid when is_pid(Pid) -> + Pid ! {self(), Dict}; + {Reason, Ref} -> + recv(dict:store(Ref, Reason, Dict)) + end. diff --git a/lib/diameter/test/modules.mk b/lib/diameter/test/modules.mk index 7f163536fb..5898e125ae 100644 --- a/lib/diameter/test/modules.mk +++ b/lib/diameter/test/modules.mk @@ -2,7 +2,7 @@ # %CopyrightBegin% # -# Copyright Ericsson AB 2010-2011. All Rights Reserved. +# Copyright Ericsson AB 2010-2012. All Rights Reserved. # # The contents of this file are subject to the Erlang Public License, # Version 1.1, (the "License"); you may not use this file except in @@ -39,7 +39,8 @@ MODULES = \ diameter_traffic_SUITE \ diameter_relay_SUITE \ diameter_tls_SUITE \ - diameter_failover_SUITE + diameter_failover_SUITE \ + diameter_dpr_SUITE HRL_FILES = \ diameter_ct.hrl |