aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_peer_fsm.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base/diameter_peer_fsm.erl')
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl364
1 files changed, 271 insertions, 93 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 302540e76b..858870566f 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -48,43 +48,64 @@
-include("diameter_internal.hrl").
-include("diameter_gen_base_rfc3588.hrl").
+%% Values of Disconnect-Cause in DPR.
-define(GOAWAY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_DO_NOT_WANT_TO_TALK_TO_YOU').
-define(REBOOT, ?'DIAMETER_BASE_DISCONNECT-CAUSE_REBOOTING').
+-define(BUSY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_BUSY').
-define(NO_INBAND_SECURITY, 0).
-define(TLS, 1).
%% Keys in process dictionary.
--define(CB_KEY, cb). %% capabilities callback
--define(DWA_KEY, dwa). %% outgoing DWA
--define(Q_KEY, q). %% transport start queue
--define(START_KEY, start). %% start of connected transport
+-define(CB_KEY, cb). %% capabilities callback
+-define(DPR_KEY, dpr). %% disconnect callback
+-define(DWA_KEY, dwa). %% outgoing DWA
+-define(REF_KEY, ref). %% transport_ref()
+-define(Q_KEY, q). %% transport start queue
+-define(START_KEY, start). %% start of connected transport
+-define(SEQUENCE_KEY, mask). %% mask for sequence numbers
+-define(RESTRICT_KEY, restrict). %% nodes for connection check
+
+%% The default sequence mask.
+-define(NOMASK, {0,32}).
%% A 2xxx series Result-Code. Not necessarily 2001.
-define(IS_SUCCESS(N), 2 == (N) div 1000).
+%% Guards.
+-define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)).
+-define(IS_TIMEOUT(N), ?IS_UINT32(N)).
+-define(IS_CAUSE(N), N == ?REBOOT; N == rebooting;
+ N == ?GOAWAY; N == goaway;
+ N == ?BUSY; N == busy).
+
%% RFC 3588:
%%
%% Timeout An application-defined timer has expired while waiting
%% for some event.
%%
-define(EVENT_TIMEOUT, 10000).
+%% Default timeout for reception of CER/CEA.
-%% How long to wait for a DPA in response to DPR before simply
-%% aborting. Used to distinguish between shutdown and not but there's
-%% not really any need. Stopping a service will require a timeout if
-%% the peer doesn't answer DPR so the value should be short-ish.
+%% Default timeout for DPA in response to DPR. A bit short but the
+%% timeout used to be hardcoded. (So it could be worse.)
-define(DPA_TIMEOUT, 1000).
+-type uint32() :: diameter:'Unsigned32'().
+
-record(state,
- {state = 'Wait-Conn-Ack' %% state of RFC 3588 Peer State Machine
- :: 'Wait-Conn-Ack' | recv_CER | 'Wait-CEA' | 'Open',
+ {state %% of RFC 3588 Peer State Machine
+ :: 'Wait-Conn-Ack' %% old code
+ | {'Wait-Conn-Ack', uint32()}
+ | recv_CER
+ | 'Wait-CEA' %% old code
+ | {'Wait-CEA', uint32(), uint32()}
+ | 'Open',
mode :: accept | connect | {connect, reference()},
- parent :: pid(),
- transport :: pid(),
+ parent :: pid(), %% watchdog process
+ transport :: pid(), %% transport process
service :: #diameter_service{},
- dpr = false :: false | {diameter:'Unsigned32'(),
- diameter:'Unsigned32'()}}).
+ dpr = false :: false | {uint32(), uint32()}}).
%% | hop by hop and end to end identifiers
%% There are non-3588 states possible as a consequence of 5.6.1 of the
@@ -121,7 +142,10 @@
%%% Output: Pid
%%% ---------------------------------------------------------------------------
--spec start(T, [Opt], #diameter_service{})
+-spec start(T, [Opt], #diameter_service{} %% from old code
+ | {diameter:sequence(),
+ diameter:restriction(),
+ #diameter_service{}})
-> pid()
when T :: {connect|accept, diameter:transport_ref()},
Opt :: diameter:transport_opt().
@@ -131,10 +155,8 @@
%% specified on the transport in question. Check here that the list is
%% still non-empty.
-start({_,_} = Type, Opts, #diameter_service{applications = Apps} = Svc) ->
- [] /= Apps orelse ?ERROR({no_apps, Type, Opts}),
- T = {self(), Type, Opts, Svc},
- {ok, Pid} = diameter_peer_fsm_sup:start_child(T),
+start({_,_} = Type, Opts, MS) ->
+ {ok, Pid} = diameter_peer_fsm_sup:start_child({self(), Type, Opts, MS}),
Pid.
start_link(T) ->
@@ -153,15 +175,28 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc}) ->
- putr(?DWA_KEY, dwa(Caps)),
+i({WPid, Type, Opts, #diameter_service{} = Svc}) -> %% from old code
+ i({WPid, Type, Opts, {?NOMASK, [node() | nodes()], Svc}});
+
+i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
+ capabilities = LCaps}
+ = Svc}}) ->
+ [] /= Apps orelse ?ERROR({no_apps, T, Opts}),
+ putr(?DWA_KEY, dwa(LCaps)),
{M, Ref} = T,
diameter_stats:reg(Ref),
- {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]),
- putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}),
+ {[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]),
+ putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}),
+ putr(?DPR_KEY, [F || {_, F} <- Ds]),
+ putr(?REF_KEY, Ref),
+ putr(?SEQUENCE_KEY, Mask),
+ putr(?RESTRICT_KEY, Nodes),
erlang:monitor(process, WPid),
{TPid, Addrs} = start_transport(T, Rest, Svc),
- #state{parent = WPid,
+ Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT),
+ ?IS_TIMEOUT(Tmo) orelse ?ERROR({invalid, {capx_timeout, Tmo}}),
+ #state{state = {'Wait-Conn-Ack', Tmo},
+ parent = WPid,
transport = TPid,
mode = M,
service = svc(Svc, Addrs)}.
@@ -174,8 +209,8 @@ i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc}) ->
%% watchdog start (start/2) succeeds regardless so as not to crash the
%% service.
-start_transport(T, Opts, #diameter_service{capabilities = Caps} = Svc) ->
- Addrs0 = Caps#diameter_caps.host_ip_address,
+start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) ->
+ Addrs0 = LCaps#diameter_caps.host_ip_address,
start_transport(Addrs0, {T, Opts, Svc}).
start_transport(Addrs0, T) ->
@@ -198,9 +233,9 @@ svc(Svc, []) ->
svc(Svc, Addrs) ->
readdr(Svc, Addrs).
-readdr(#diameter_service{capabilities = Caps0} = Svc, Addrs) ->
- Caps = Caps0#diameter_caps{host_ip_address = Addrs},
- Svc#diameter_service{capabilities = Caps}.
+readdr(#diameter_service{capabilities = LCaps0} = Svc, Addrs) ->
+ LCaps = LCaps0#diameter_caps{host_ip_address = Addrs},
+ Svc#diameter_service{capabilities = LCaps}.
%% The 4-tuple Data returned from diameter_peer:start/1 identifies the
%% transport module/config use to start the transport process in
@@ -299,13 +334,17 @@ eraser(Key) ->
%% transition/2
+%% Started in old code.
+transition(T, #state{state = 'Wait-Conn-Ack' = PS} = S) ->
+ transition(T, S#state{state = {PS, ?EVENT_TIMEOUT}});
+
%% Connection to peer.
transition({diameter, {TPid, connected, Remote}},
#state{transport = TPid,
state = PS,
mode = M}
= S) ->
- 'Wait-Conn-Ack' = PS, %% assert
+ {'Wait-Conn-Ack', _} = PS, %% assert
connect = M, %%
keep_transport(TPid),
send_CER(S#state{mode = {M, Remote}});
@@ -317,11 +356,11 @@ transition({diameter, {TPid, connected}},
mode = M,
parent = Pid}
= S) ->
- 'Wait-Conn-Ack' = PS, %% assert
+ {'Wait-Conn-Ack', Tmo} = PS, %% assert
accept = M, %%
keep_transport(TPid),
Pid ! {accepted, self()},
- start_timer(S#state{state = recv_CER});
+ start_timer(Tmo, S#state{state = recv_CER});
%% Connection established after receiving a connection_timeout
%% message. This may be followed by an incoming message which arrived
@@ -335,7 +374,7 @@ transition({diameter, {_, connected, _}}, _) ->
%% Connection has timed out: start an alternate.
transition({connection_timeout = T, TPid},
#state{transport = TPid,
- state = 'Wait-Conn-Ack'}
+ state = {'Wait-Conn-Ack', _}}
= S) ->
exit(TPid, {shutdown, T}),
start_next(S);
@@ -349,7 +388,8 @@ transition({diameter, {recv, Pkt}}, S) ->
recv(Pkt, S);
%% Timeout when still in the same state ...
-transition({timeout, PS}, #state{state = PS}) ->
+transition({timeout = T, PS}, #state{state = PS} = S) ->
+ close({capx(PS), T}, S),
stop;
%% ... or not.
@@ -361,25 +401,19 @@ transition({send, Msg}, #state{transport = TPid}) ->
send(TPid, Msg),
ok;
-%% Request for graceful shutdown.
-transition({shutdown, Pid}, #state{parent = Pid, dpr = false} = S) ->
- dpr(?GOAWAY, S);
-transition({shutdown, Pid}, #state{parent = Pid}) ->
+%% Messages from old (diameter_service) code.
+transition(shutdown = T, #state{parent = Pid} = S) ->
+ transition({T, Pid, service}, S); %% Reason irrelevant: old code has no cb
+
+%% Request for graceful shutdown at remove_transport, stop_service of
+%% application shutdown.
+transition({shutdown = T, Pid}, S) ->
+ transition({T, Pid, transport}, S);
+transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) ->
+ dpr(Reason, S);
+transition({shutdown, Pid, _}, #state{parent = Pid}) ->
ok;
-%% Application shutdown.
-transition(shutdown, #state{dpr = false} = S) ->
- dpr(?REBOOT, S);
-transition(shutdown, _) -> %% DPR already send: ensure expected timeout
- dpa_timer(),
- ok;
-
-%% Request to close the transport connection.
-transition({close = T, Pid}, #state{parent = Pid,
- transport = TPid}) ->
- diameter_peer:close(TPid),
- {stop, T};
-
%% DPA reception has timed out.
transition(dpa_timeout, _) ->
stop;
@@ -411,6 +445,11 @@ transition({state, Pid}, #state{state = S, transport = TPid}) ->
%% Crash on anything unexpected.
+capx(recv_CER) ->
+ 'CER';
+capx({'Wait-CEA', _, _}) ->
+ 'CEA'.
+
%% start_next/1
start_next(#state{service = Svc0} = S) ->
@@ -426,18 +465,23 @@ start_next(#state{service = Svc0} = S) ->
%% send_CER/1
-send_CER(#state{mode = {connect, Remote},
- service = #diameter_service{capabilities = Caps},
+send_CER(#state{state = {'Wait-Conn-Ack', Tmo},
+ mode = {connect, Remote},
+ service = #diameter_service{capabilities = LCaps},
transport = TPid}
= S) ->
- OH = Caps#diameter_caps.origin_host,
+ OH = LCaps#diameter_caps.origin_host,
req_send_CER(OH, Remote)
orelse
- close({already_connected, Remote, Caps}, S),
+ close({already_connected, Remote, LCaps}, S),
CER = build_CER(S),
?LOG(send, 'CER'),
- send(TPid, encode(CER)),
- start_timer(S#state{state = 'Wait-CEA'}).
+ #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
+ hop_by_hop_id = Hid}}
+ = Pkt
+ = encode(CER),
+ send(TPid, Pkt),
+ start_timer(Tmo, S#state{state = {'Wait-CEA', Hid, Eid}}).
%% Register ourselves as connecting to the remote endpoint in
%% question. This isn't strictly necessary since a peer implementing
@@ -449,23 +493,36 @@ send_CER(#state{mode = {connect, Remote},
req_send_CER(OriginHost, Remote) ->
register_everywhere({?MODULE, connection, OriginHost, {remote, Remote}}).
-%% start_timer/1
+%% start_timer/2
-start_timer(#state{state = PS} = S) ->
- erlang:send_after(?EVENT_TIMEOUT, self(), {timeout, PS}),
+start_timer(Tmo, #state{state = PS} = S) ->
+ erlang:send_after(Tmo, self(), {timeout, PS}),
S.
%% build_CER/1
-build_CER(#state{service = #diameter_service{capabilities = Caps}}) ->
- {ok, CER} = diameter_capx:build_CER(Caps),
+build_CER(#state{service = #diameter_service{capabilities = LCaps}}) ->
+ {ok, CER} = diameter_capx:build_CER(LCaps),
CER.
%% encode/1
encode(Rec) ->
- #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Rec),
- Bin.
+ Seq = diameter_session:sequence(sequence()),
+ Hdr = #diameter_header{version = ?DIAMETER_VERSION,
+ end_to_end_id = Seq,
+ hop_by_hop_id = Seq},
+ diameter_codec:encode(?BASE, #diameter_packet{header = Hdr,
+ msg = Rec}).
+
+sequence() ->
+ case getr(?SEQUENCE_KEY) of
+ {_,_} = Mask ->
+ Mask;
+ undefined -> %% started in old code
+ putr(?SEQUENCE_KEY, ?NOMASK),
+ ?NOMASK
+ end.
%% recv/2
@@ -524,7 +581,14 @@ discard(Reason, F, A) ->
%% rcv/3
%% Incoming CEA.
-rcv('CEA', Pkt, #state{state = 'Wait-CEA'} = S) ->
+rcv('CEA',
+ #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
+ hop_by_hop_id = Hid}}
+ = Pkt,
+ #state{state = {'Wait-CEA' = T, Hid, Eid}}
+ = S) ->
+ handle_CEA(Pkt, S#state{state = T});
+rcv('CEA', Pkt, #state{state = 'Wait-CEA'} = S) -> %% old code
handle_CEA(Pkt, S);
%% Incoming CER
@@ -544,16 +608,16 @@ rcv(N, Pkt, S)
N == 'DPR' ->
handle_request(N, Pkt, S);
-%% DPA even though we haven't sent DPR: ignore.
-rcv('DPA', _Pkt, #state{dpr = false}) ->
- ok;
-
-%% DPA in response to DPR. We could check the sequence numbers but
-%% don't bother, just close.
-rcv('DPA' = N, _Pkt, #state{transport = TPid}) ->
+%% DPA in response to DPR and with the expected identifiers.
+rcv('DPA' = N,
+ #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
+ hop_by_hop_id = Hid}},
+ #state{transport = TPid,
+ dpr = {Hid, Eid}}) ->
diameter_peer:close(TPid),
{stop, N};
+%% Ignore anything else, an unsolicited DPA in particular.
rcv(_, _, _) ->
ok.
@@ -771,8 +835,8 @@ a('CER', #diameter_caps{vendor_id = Vid,
{'Product-Name', Name},
{'Origin-State-Id', OSI}];
-a('DPR', #diameter_caps{origin_host = Host,
- origin_realm = Realm}) ->
+a('DPR', #diameter_caps{origin_host = {Host, _},
+ origin_realm = {Realm, _}}) ->
['DPA', {'Origin-Host', Host},
{'Origin-Realm', Realm}].
@@ -880,7 +944,9 @@ rejected(N)
%% open/5
-open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) ->
+open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid,
+ service = Svc}
+ = S) ->
#diameter_caps{origin_host = {_,_} = H,
inband_security_id = {LS,_}}
= Caps,
@@ -888,7 +954,9 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) ->
tls_ack(lists:member(?TLS, LS), Caps, Type, IS, S),
Pid ! {open, self(), H, {Caps, SupportedApps, Pkt}},
- S#state{state = 'Open'}.
+ %% Replace capabilities record with local/remote pairs.
+ S#state{state = 'Open',
+ service = Svc#diameter_service{capabilities = Caps}}.
%% We've advertised TLS support: tell the transport the result
%% and expect a reply when the handshake is complete.
@@ -941,38 +1009,148 @@ dwa(#diameter_caps{origin_host = OH,
{'Origin-State-Id', OSI}].
%% dpr/2
+%%
+%% The RFC isn't clear on whether DPR should be send in a non-Open
+%% state. The Peer State Machine transitions it documents aren't
+%% exhaustive (no Stop in Wait-I-CEA for example) so assume it's up to
+%% the implementation and transition to Closed (ie. die) if we haven't
+%% yet reached Open.
+
+%% Connection is open, DPR has not been sent.
+dpr(Reason, #state{state = 'Open',
+ dpr = false,
+ service = #diameter_service{capabilities = Caps}}
+ = S) ->
+ case getr(?DPR_KEY) of
+ CBs when is_list(CBs) ->
+ Ref = getr(?REF_KEY),
+ Peer = {self(), Caps},
+ dpr(CBs, [Reason, Ref, Peer], S);
+ undefined -> %% started in old code
+ send_dpr(Reason, [], S)
+ end;
-dpr(Cause, #state{transport = TPid,
- service = #diameter_service{capabilities = Caps}}
- = S) ->
- #diameter_caps{origin_host = OH,
- origin_realm = OR}
+%% Connection is open, DPR already sent.
+dpr(_, #state{state = 'Open'}) ->
+ ok;
+
+%% Connection not open.
+dpr(_Reason, _S) ->
+ stop.
+
+%% dpr/3
+%%
+%% Note that an implementation that wants to do something
+%% transport_module-specific can lookup the pid of the transport
+%% process and contact it. (eg. diameter:service_info/2)
+
+dpr([CB|Rest], [Reason | _] = Args, S) ->
+ try diameter_lib:eval([CB | Args]) of
+ {dpr, Opts} when is_list(Opts) ->
+ send_dpr(Reason, Opts, S);
+ dpr ->
+ send_dpr(Reason, [], S);
+ close = T ->
+ {stop, {disconnect_cb, T}};
+ ignore ->
+ dpr(Rest, Args, S);
+ T ->
+ No = {disconnect_cb, T},
+ diameter_lib:error_report(invalid, No),
+ {stop, No}
+ catch
+ E:R ->
+ No = {disconnect_cb, E, R, ?STACK},
+ diameter_lib:error_report(failure, No),
+ {stop, No}
+ end;
+
+dpr([], [Reason | _], S) ->
+ send_dpr(Reason, [], S).
+
+-record(opts, {cause, timeout = ?DPA_TIMEOUT}).
+
+send_dpr(Reason, Opts, #state{transport = TPid,
+ service = #diameter_service{capabilities = Caps}}
+ = S) ->
+ #opts{cause = Cause, timeout = Tmo}
+ = lists:foldl(fun opt/2,
+ #opts{cause = case Reason of
+ transport -> ?GOAWAY;
+ _ -> ?REBOOT
+ end,
+ timeout = ?DPA_TIMEOUT},
+ Opts),
+ #diameter_caps{origin_host = {OH, _},
+ origin_realm = {OR, _}}
= Caps,
- Bin = encode(['DPR', {'Origin-Host', OH},
+ #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
+ hop_by_hop_id = Hid}}
+ = Pkt
+ = encode(['DPR', {'Origin-Host', OH},
{'Origin-Realm', OR},
{'Disconnect-Cause', Cause}]),
- send(TPid, Bin),
- dpa_timer(),
+ send(TPid, Pkt),
+ dpa_timer(Tmo),
?LOG(send, 'DPR'),
- S#state{dpr = diameter_codec:sequence_numbers(Bin)}.
-
-dpa_timer() ->
- erlang:send_after(?DPA_TIMEOUT, self(), dpa_timeout).
+ S#state{dpr = {Hid, Eid}}.
+
+opt({timeout, Tmo}, Rec)
+ when ?IS_TIMEOUT(Tmo) ->
+ Rec#opts{timeout = Tmo};
+opt({cause, Cause}, Rec)
+ when ?IS_CAUSE(Cause) ->
+ Rec#opts{cause = cause(Cause)};
+opt(T, _) ->
+ ?ERROR({invalid_option, T}).
+
+cause(rebooting) -> ?REBOOT;
+cause(goaway) -> ?GOAWAY;
+cause(busy) -> ?BUSY;
+cause(N)
+ when ?IS_CAUSE(N) ->
+ N;
+cause(N) ->
+ ?ERROR({invalid_cause, N}).
+
+dpa_timer(Tmo) ->
+ erlang:send_after(Tmo, self(), dpa_timeout).
%% register_everywhere/1
%%
%% Register a term and ensure it's not registered elsewhere. Note that
%% two process that simultaneously register the same term may well
%% both fail to do so this isn't foolproof.
+%%
+%% Everywhere is no longer everywhere, it's where a
+%% restrict_connections service_opt() specifies.
register_everywhere(T) ->
- diameter_reg:add_new(T)
- andalso unregistered(T).
+ reg(getr(?RESTRICT_KEY), T).
+
+reg(Nodes, T) ->
+ add(lists:member(node(), Nodes), T) andalso unregistered(Nodes, T).
+
+add(true, T) ->
+ diameter_reg:add_new(T);
+add(false, T) ->
+ diameter_reg:add(T).
+
+%% unregistered
+%%
+%% Ensure that the term in question isn't registered on other nodes.
+
+unregistered(Nodes, T) ->
+ {ResL, _} = rpc:multicall(Nodes, ?MODULE, match, [{node(), T}]),
+ lists:all(fun nomatch/1, ResL).
+
+nomatch({badrpc, {'EXIT', {undef, _}}}) -> %% no diameter on remote node
+ true;
+nomatch(L) ->
+ [] == L.
-unregistered(T) ->
- {ResL, _} = rpc:multicall(?MODULE, match, [{node(), T}]),
- lists:all(fun(L) -> [] == L end, ResL).
+%% match/1
match({Node, _})
when Node == node() ->