diff options
Diffstat (limited to 'lib/diameter/src/base/diameter_peer_fsm.erl')
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 82 |
1 files changed, 66 insertions, 16 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 302540e76b..3f4945f7a6 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -55,10 +55,15 @@ -define(TLS, 1). %% Keys in process dictionary. --define(CB_KEY, cb). %% capabilities callback --define(DWA_KEY, dwa). %% outgoing DWA --define(Q_KEY, q). %% transport start queue --define(START_KEY, start). %% start of connected transport +-define(CB_KEY, cb). %% capabilities callback +-define(DWA_KEY, dwa). %% outgoing DWA +-define(Q_KEY, q). %% transport start queue +-define(START_KEY, start). %% start of connected transport +-define(SEQUENCE_KEY, mask). %% mask for sequence numbers +-define(RESTRICT_KEY, restrict). %% nodes for connection check + +%% The default sequence mask. +-define(NOMASK, {0,32}). %% A 2xxx series Result-Code. Not necessarily 2001. -define(IS_SUCCESS(N), 2 == (N) div 1000). @@ -121,7 +126,10 @@ %%% Output: Pid %%% --------------------------------------------------------------------------- --spec start(T, [Opt], #diameter_service{}) +-spec start(T, [Opt], #diameter_service{} %% from old code + | {diameter:sequence(), + diameter:restriction(), + #diameter_service{}}) -> pid() when T :: {connect|accept, diameter:transport_ref()}, Opt :: diameter:transport_opt(). @@ -131,10 +139,8 @@ %% specified on the transport in question. Check here that the list is %% still non-empty. -start({_,_} = Type, Opts, #diameter_service{applications = Apps} = Svc) -> - [] /= Apps orelse ?ERROR({no_apps, Type, Opts}), - T = {self(), Type, Opts, Svc}, - {ok, Pid} = diameter_peer_fsm_sup:start_child(T), +start({_,_} = Type, Opts, MS) -> + {ok, Pid} = diameter_peer_fsm_sup:start_child({self(), Type, Opts, MS}), Pid. start_link(T) -> @@ -153,12 +159,20 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc}) -> +i({WPid, Type, Opts, #diameter_service{} = Svc}) -> %% from old code + i({WPid, Type, Opts, {?NOMASK, [node() | nodes()], Svc}}); + +i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, + capabilities = Caps} + = Svc}}) -> + [] /= Apps orelse ?ERROR({no_apps, T, Opts}), putr(?DWA_KEY, dwa(Caps)), {M, Ref} = T, diameter_stats:reg(Ref), {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]), putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}), + putr(?SEQUENCE_KEY, Mask), + putr(?RESTRICT_KEY, Nodes), erlang:monitor(process, WPid), {TPid, Addrs} = start_transport(T, Rest, Svc), #state{parent = WPid, @@ -464,9 +478,24 @@ build_CER(#state{service = #diameter_service{capabilities = Caps}}) -> %% encode/1 encode(Rec) -> - #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Rec), + Seq = diameter_session:sequence(sequence()), + Hdr = #diameter_header{version = ?DIAMETER_VERSION, + end_to_end_id = Seq, + hop_by_hop_id = Seq}, + Pkt = #diameter_packet{header = Hdr, + msg = Rec}, + #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), Bin. +sequence() -> + case getr(?SEQUENCE_KEY) of + {_,_} = Mask -> + Mask; + undefined -> %% started in old code + putr(?SEQUENCE_KEY, ?NOMASK), + ?NOMASK + end. + %% recv/2 %% RFC 3588 has result code 5015 for an invalid length but if a @@ -965,14 +994,35 @@ dpa_timer() -> %% Register a term and ensure it's not registered elsewhere. Note that %% two process that simultaneously register the same term may well %% both fail to do so this isn't foolproof. +%% +%% Everywhere is no longer everywhere, it's where a +%% restrict_connections service_opt() specifies. register_everywhere(T) -> - diameter_reg:add_new(T) - andalso unregistered(T). + reg(getr(?RESTRICT_KEY), T). + +reg(Nodes, T) -> + add(lists:member(node(), Nodes), T) andalso unregistered(Nodes, T). + +add(true, T) -> + diameter_reg:add_new(T); +add(false, T) -> + diameter_reg:add(T). + +%% unregistered +%% +%% Ensure that the term in question isn't registered on other nodes. + +unregistered(Nodes, T) -> + {ResL, _} = rpc:multicall(Nodes, ?MODULE, match, [{node(), T}]), + lists:all(fun nomatch/1, ResL). + +nomatch({badrpc, {'EXIT', {undef, _}}}) -> %% no diameter on remote node + true; +nomatch(L) -> + [] == L. -unregistered(T) -> - {ResL, _} = rpc:multicall(?MODULE, match, [{node(), T}]), - lists:all(fun(L) -> [] == L end, ResL). +%% match/1 match({Node, _}) when Node == node() -> |