aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2012-10-12 11:19:49 +0200
committerAnders Svensson <[email protected]>2012-11-15 16:07:49 +0100
commit380a3f0aece61c649efaed45041c8679038891f1 (patch)
treec385a5842761ff94741af142a9c86ef28b8c71ea /lib
parentf472ddbedec007b37fff5b40a379b97656f15bce (diff)
downloadotp-380a3f0aece61c649efaed45041c8679038891f1.tar.gz
otp-380a3f0aece61c649efaed45041c8679038891f1.tar.bz2
otp-380a3f0aece61c649efaed45041c8679038891f1.zip
Implement transport_opt() disconnect_cb
Diffstat (limited to 'lib')
-rw-r--r--lib/diameter/src/base/diameter.erl1
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl187
-rw-r--r--lib/diameter/src/base/diameter_service.erl64
3 files changed, 179 insertions, 73 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index 3e3a6be0ef..95702f03d4 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -330,6 +330,7 @@ call(SvcName, App, Message) ->
| {applications, [app_alias()]}
| {capabilities, [capability()]}
| {capabilities_cb, evaluable()}
+ | {disconnect_cb, evaluable()}
| {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}}
| {reconnect_timer, 'Unsigned32'()}
| {private, any()}.
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 3f4945f7a6..ecdb09d34a 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -48,15 +48,19 @@
-include("diameter_internal.hrl").
-include("diameter_gen_base_rfc3588.hrl").
+%% Values of Disconnect-Cause in DPR.
-define(GOAWAY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_DO_NOT_WANT_TO_TALK_TO_YOU').
-define(REBOOT, ?'DIAMETER_BASE_DISCONNECT-CAUSE_REBOOTING').
+-define(BUSY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_BUSY').
-define(NO_INBAND_SECURITY, 0).
-define(TLS, 1).
%% Keys in process dictionary.
-define(CB_KEY, cb). %% capabilities callback
+-define(DPR_KEY, dpr). %% disconnect callback
-define(DWA_KEY, dwa). %% outgoing DWA
+-define(REF_KEY, ref). %% transport_ref()
-define(Q_KEY, q). %% transport start queue
-define(START_KEY, start). %% start of connected transport
-define(SEQUENCE_KEY, mask). %% mask for sequence numbers
@@ -68,6 +72,13 @@
%% A 2xxx series Result-Code. Not necessarily 2001.
-define(IS_SUCCESS(N), 2 == (N) div 1000).
+%% Guards.
+-define(IS_UINT32(N), (0 =< N andalso 0 == N bsr 32)).
+-define(IS_TIMEOUT(N), ?IS_UINT32(N)).
+-define(IS_CAUSE(N), N == ?REBOOT; N == rebooting;
+ N == ?GOAWAY; N == goaway;
+ N == ?BUSY; N == busy).
+
%% RFC 3588:
%%
%% Timeout An application-defined timer has expired while waiting
@@ -75,18 +86,16 @@
%%
-define(EVENT_TIMEOUT, 10000).
-%% How long to wait for a DPA in response to DPR before simply
-%% aborting. Used to distinguish between shutdown and not but there's
-%% not really any need. Stopping a service will require a timeout if
-%% the peer doesn't answer DPR so the value should be short-ish.
+%% Default timeout for DPA in response to DPR. A bit short but the
+%% timeout used to be hardcoded. (So it could be worse.)
-define(DPA_TIMEOUT, 1000).
-record(state,
{state = 'Wait-Conn-Ack' %% state of RFC 3588 Peer State Machine
:: 'Wait-Conn-Ack' | recv_CER | 'Wait-CEA' | 'Open',
mode :: accept | connect | {connect, reference()},
- parent :: pid(),
- transport :: pid(),
+ parent :: pid(), %% watchdog process
+ transport :: pid(), %% transport process
service :: #diameter_service{},
dpr = false :: false | {diameter:'Unsigned32'(),
diameter:'Unsigned32'()}}).
@@ -163,14 +172,16 @@ i({WPid, Type, Opts, #diameter_service{} = Svc}) -> %% from old code
i({WPid, Type, Opts, {?NOMASK, [node() | nodes()], Svc}});
i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
- capabilities = Caps}
+ capabilities = LCaps}
= Svc}}) ->
[] /= Apps orelse ?ERROR({no_apps, T, Opts}),
- putr(?DWA_KEY, dwa(Caps)),
+ putr(?DWA_KEY, dwa(LCaps)),
{M, Ref} = T,
diameter_stats:reg(Ref),
- {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]),
- putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}),
+ {[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]),
+ putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}),
+ putr(?DPR_KEY, [F || {_, F} <- Ds]),
+ putr(?REF_KEY, Ref),
putr(?SEQUENCE_KEY, Mask),
putr(?RESTRICT_KEY, Nodes),
erlang:monitor(process, WPid),
@@ -188,8 +199,8 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
%% watchdog start (start/2) succeeds regardless so as not to crash the
%% service.
-start_transport(T, Opts, #diameter_service{capabilities = Caps} = Svc) ->
- Addrs0 = Caps#diameter_caps.host_ip_address,
+start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) ->
+ Addrs0 = LCaps#diameter_caps.host_ip_address,
start_transport(Addrs0, {T, Opts, Svc}).
start_transport(Addrs0, T) ->
@@ -212,9 +223,9 @@ svc(Svc, []) ->
svc(Svc, Addrs) ->
readdr(Svc, Addrs).
-readdr(#diameter_service{capabilities = Caps0} = Svc, Addrs) ->
- Caps = Caps0#diameter_caps{host_ip_address = Addrs},
- Svc#diameter_service{capabilities = Caps}.
+readdr(#diameter_service{capabilities = LCaps0} = Svc, Addrs) ->
+ LCaps = LCaps0#diameter_caps{host_ip_address = Addrs},
+ Svc#diameter_service{capabilities = LCaps}.
%% The 4-tuple Data returned from diameter_peer:start/1 identifies the
%% transport module/config use to start the transport process in
@@ -375,17 +386,17 @@ transition({send, Msg}, #state{transport = TPid}) ->
send(TPid, Msg),
ok;
-%% Request for graceful shutdown.
-transition({shutdown, Pid}, #state{parent = Pid, dpr = false} = S) ->
- dpr(?GOAWAY, S);
-transition({shutdown, Pid}, #state{parent = Pid}) ->
- ok;
-
-%% Application shutdown.
-transition(shutdown, #state{dpr = false} = S) ->
- dpr(?REBOOT, S);
-transition(shutdown, _) -> %% DPR already send: ensure expected timeout
- dpa_timer(),
+%% Messages from old (diameter_service) code.
+transition(shutdown = T, #state{parent = Pid} = S) ->
+ transition({T, Pid, service}, S); %% Reason irrelevant: old code has no cb
+
+%% Request for graceful shutdown at remove_transport, stop_service of
+%% application shutdown.
+transition({shutdown = T, Pid}, S) ->
+ transition({T, Pid, transport}, S);
+transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) ->
+ dpr(Reason, S);
+transition({shutdown, Pid, _}, #state{parent = Pid}) ->
ok;
%% Request to close the transport connection.
@@ -441,13 +452,13 @@ start_next(#state{service = Svc0} = S) ->
%% send_CER/1
send_CER(#state{mode = {connect, Remote},
- service = #diameter_service{capabilities = Caps},
+ service = #diameter_service{capabilities = LCaps},
transport = TPid}
= S) ->
- OH = Caps#diameter_caps.origin_host,
+ OH = LCaps#diameter_caps.origin_host,
req_send_CER(OH, Remote)
orelse
- close({already_connected, Remote, Caps}, S),
+ close({already_connected, Remote, LCaps}, S),
CER = build_CER(S),
?LOG(send, 'CER'),
send(TPid, encode(CER)),
@@ -471,8 +482,8 @@ start_timer(#state{state = PS} = S) ->
%% build_CER/1
-build_CER(#state{service = #diameter_service{capabilities = Caps}}) ->
- {ok, CER} = diameter_capx:build_CER(Caps),
+build_CER(#state{service = #diameter_service{capabilities = LCaps}}) ->
+ {ok, CER} = diameter_capx:build_CER(LCaps),
CER.
%% encode/1
@@ -800,8 +811,8 @@ a('CER', #diameter_caps{vendor_id = Vid,
{'Product-Name', Name},
{'Origin-State-Id', OSI}];
-a('DPR', #diameter_caps{origin_host = Host,
- origin_realm = Realm}) ->
+a('DPR', #diameter_caps{origin_host = {Host, _},
+ origin_realm = {Realm, _}}) ->
['DPA', {'Origin-Host', Host},
{'Origin-Realm', Realm}].
@@ -909,7 +920,9 @@ rejected(N)
%% open/5
-open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) ->
+open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid,
+ service = Svc}
+ = S) ->
#diameter_caps{origin_host = {_,_} = H,
inband_security_id = {LS,_}}
= Caps,
@@ -917,7 +930,9 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) ->
tls_ack(lists:member(?TLS, LS), Caps, Type, IS, S),
Pid ! {open, self(), H, {Caps, SupportedApps, Pkt}},
- S#state{state = 'Open'}.
+ %% Replace capabilities record with local/remote pairs.
+ S#state{state = 'Open',
+ service = Svc#diameter_service{capabilities = Caps}}.
%% We've advertised TLS support: tell the transport the result
%% and expect a reply when the handshake is complete.
@@ -970,24 +985,110 @@ dwa(#diameter_caps{origin_host = OH,
{'Origin-State-Id', OSI}].
%% dpr/2
+%%
+%% The RFC isn't clear on whether DPR should be send in a non-Open
+%% state. The Peer State Machine transitions it documents aren't
+%% exhaustive (no Stop in Wait-I-CEA for example) so assume it's up to
+%% the implementation and transition to Closed (ie. die) if we haven't
+%% yet reached Open.
+
+%% Connection is open, DPR has not been sent.
+dpr(Reason, #state{state = 'Open',
+ dpr = false,
+ service = #diameter_service{capabilities = Caps}}
+ = S) ->
+ case getr(?DPR_KEY) of
+ CBs when is_list(CBs) ->
+ Ref = getr(?REF_KEY),
+ Peer = {self(), Caps},
+ dpr(CBs, [Reason, Ref, Peer], S);
+ undefined -> %% started in old code
+ send_dpr(Reason, [], S)
+ end;
-dpr(Cause, #state{transport = TPid,
- service = #diameter_service{capabilities = Caps}}
- = S) ->
- #diameter_caps{origin_host = OH,
- origin_realm = OR}
+%% Connection is open, DPR already sent.
+dpr(_, #state{state = 'Open'}) ->
+ ok;
+
+%% Connection not open.
+dpr(_Reason, _S) ->
+ stop.
+
+%% dpr/3
+%%
+%% Note that an implementation that wants to do something
+%% transport_module-specific can lookup the pid of the transport
+%% process and contact it. (eg. diameter:service_info/2)
+
+dpr([CB|Rest], [Reason | _] = Args, S) ->
+ try diameter_lib:eval([CB | Args]) of
+ {dpr, Opts} when is_list(Opts) ->
+ send_dpr(Reason, Opts, S);
+ dpr ->
+ send_dpr(Reason, [], S);
+ close = T ->
+ {stop, {disconnect_cb, T}};
+ ignore ->
+ dpr(Rest, Args, S);
+ T ->
+ No = {disconnect_cb, T},
+ diameter_lib:error_report(invalid, No),
+ {stop, No}
+ catch
+ E:R ->
+ No = {disconnect_cb, E, R, ?STACK},
+ diameter_lib:error_report(failure, No),
+ {stop, No}
+ end;
+
+dpr([], [Reason | _], S) ->
+ send_dpr(Reason, [], S).
+
+-record(opts, {cause, timeout = ?DPA_TIMEOUT}).
+
+send_dpr(Reason, Opts, #state{transport = TPid,
+ service = #diameter_service{capabilities = Caps}}
+ = S) ->
+ #opts{cause = Cause, timeout = Tmo}
+ = lists:foldl(fun opt/2,
+ #opts{cause = case Reason of
+ transport -> ?GOAWAY;
+ _ -> ?REBOOT
+ end,
+ timeout = ?DPA_TIMEOUT},
+ Opts),
+ #diameter_caps{origin_host = {OH, _},
+ origin_realm = {OR, _}}
= Caps,
Bin = encode(['DPR', {'Origin-Host', OH},
{'Origin-Realm', OR},
{'Disconnect-Cause', Cause}]),
send(TPid, Bin),
- dpa_timer(),
+ dpa_timer(Tmo),
?LOG(send, 'DPR'),
S#state{dpr = diameter_codec:sequence_numbers(Bin)}.
-dpa_timer() ->
- erlang:send_after(?DPA_TIMEOUT, self(), dpa_timeout).
+opt({timeout, Tmo}, Rec)
+ when ?IS_TIMEOUT(Tmo) ->
+ Rec#opts{timeout = Tmo};
+opt({cause, Cause}, Rec)
+ when ?IS_CAUSE(Cause) ->
+ Rec#opts{cause = cause(Cause)};
+opt(T, _) ->
+ ?ERROR({invalid_option, T}).
+
+cause(rebooting) -> ?REBOOT;
+cause(goaway) -> ?GOAWAY;
+cause(busy) -> ?BUSY;
+cause(N)
+ when ?IS_CAUSE(N) ->
+ N;
+cause(N) ->
+ ?ERROR({invalid_cause, N}).
+
+dpa_timer(Tmo) ->
+ erlang:send_after(Tmo, self(), dpa_timeout).
%% register_everywhere/1
%%
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index cffba4fc94..0a0fd6ded1 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -494,7 +494,7 @@ handle_call({info, Item}, _From, S) ->
{reply, service_info(Item, S), S};
handle_call(stop, _From, S) ->
- shutdown(S),
+ shutdown(service, S),
{stop, normal, ok, S};
%% The server currently isn't guaranteed to be dead when the caller
%% gets the reply. We deal with this in the call to the server,
@@ -683,7 +683,7 @@ upgrade_insert(#state{service = #diameter_service{pid = Pid}} = S) ->
terminate(Reason, #state{service_name = Name} = S) ->
ets:delete(?STATE_TABLE, Name),
shutdown == Reason %% application shutdown
- andalso shutdown(S).
+ andalso shutdown(application, S).
%%% ---------------------------------------------------------------------------
%%% # code_change(FromVsn, State, Extra)
@@ -766,44 +766,48 @@ mod_state(Alias, ModS) ->
%%% # shutdown/2
%%% ---------------------------------------------------------------------------
-shutdown(Refs, #state{peerT = PeerT}) ->
- ets:foldl(fun(P,ok) -> s(P, Refs), ok end, ok, PeerT).
+%% remove_transport: ask watchdogs to terminate their transport.
+shutdown(Refs, #state{peerT = PeerT})
+ when is_list(Refs) ->
+ ets:foldl(fun(P,ok) -> sp(P, Refs), ok end, ok, PeerT);
-s(#peer{ref = Ref, pid = Pid}, Refs) ->
- s(lists:member(Ref, Refs), Pid);
-
-s(true, Pid) ->
- Pid ! {shutdown, self()}; %% 'DOWN' will cleanup as usual
-s(false, _) ->
- ok.
-
-%%% ---------------------------------------------------------------------------
-%%% # shutdown/1
-%%% ---------------------------------------------------------------------------
-
-shutdown(#state{peerT = PeerT}) ->
+%% application/service shutdown: ask transports to terminate themselves.
+shutdown(Reason, #state{peerT = PeerT}) ->
%% A transport might not be alive to receive the shutdown request
%% but give those that are a chance to shutdown gracefully.
- wait(fun st/2, PeerT),
+ shutdown(conn, Reason, PeerT),
%% Kill the watchdogs explicitly in case there was no transport.
- wait(fun sw/2, PeerT).
+ shutdown(peer, Reason, PeerT).
-wait(Fun, T) ->
- diameter_lib:wait(ets:foldl(Fun, [], T)).
+%% sp/2
-st(#peer{op_state = {OS,_}} = P, Acc) ->
- st(P#peer{op_state = OS}, Acc);
-st(#peer{op_state = ?STATE_UP, conn = Pid}, Acc) ->
- Pid ! shutdown,
- [Pid | Acc];
-st(#peer{}, Acc) ->
- Acc.
+sp(#peer{ref = Ref, pid = Pid}, Refs) ->
+ lists:member(Ref, Refs)
+ andalso (Pid ! {shutdown, self()}). %% 'DOWN' cleans up
+
+%% shutdown/3
+
+shutdown(Who, Reason, T) ->
+ diameter_lib:wait(ets:foldl(fun(X,A) -> shutdown(Who, X, Reason, A) end,
+ [],
+ T)).
-sw(#peer{pid = Pid}, Acc)
+shutdown(conn = Who, #peer{op_state = {OS,_}} = P, Reason, Acc) ->
+ shutdown(Who, P#peer{op_state = OS}, Reason, Acc);
+
+shutdown(conn,
+ #peer{pid = Pid, op_state = ?STATE_UP, conn = TPid},
+ Reason,
+ Acc) ->
+ TPid ! {shutdown, Pid, Reason},
+ [TPid | Acc];
+
+shutdown(peer, #peer{pid = Pid}, _Reason, Acc)
when is_pid(Pid) ->
exit(Pid, shutdown),
[Pid | Acc];
-sw(#peer{}, Acc) ->
+
+shutdown(_, #peer{}, _, Acc) ->
Acc.
%%% ---------------------------------------------------------------------------