aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/base/diameter_service.erl331
1 files changed, 163 insertions, 168 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index a0af92c2a2..efce8c8f10 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -114,8 +114,10 @@
{id = now(),
service_name, %% as passed to start_service/2, key in ?STATE_TABLE
service :: #diameter_service{},
- peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm
- connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen
+ watchdogT = ets_new(watchdogs) %% #watchdog{} at start
+ :: ets:tid(),
+ peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen
+ :: ets:tid(),
shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
monitor = false :: false | pid(), %% process to die with
@@ -131,35 +133,26 @@
%% service record is used to determine whether or not we need to call
%% the process for a pick_peer callback.
-%% Record representing a watchdog process as implemented by
-%% diameter_watchdog. The term "peer" here is historical, made
-%% especially confusing by the fact that a peer_ref() in the
-%% documentation is the key of a #conn{} record, not a #peer{} record.
-%% The name is also unfortunate given the meaning of peer in the
-%% Diameter sense.
--record(peer,
+%% Record representing an RFC 3539 watchdog process implemented by
+%% diameter_watchdog.
+-record(watchdog,
{pid :: match(pid()),
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
- wd_state = ?WD_INITIAL :: match(wd_state()),
+ state = ?WD_INITIAL :: match(wd_state()),
started = now(), %% at process start
- conn = false :: match(boolean() | pid())}).
+ peer = false :: match(boolean() | pid())}).
%% true at accepted, pid() at okay/reopen
-%% Record representing a peer process as implemented by
-%% diameter_peer_fsm. The term "conn" is historical. Despite the name
-%% here, comments refer to watchdog and peer processes, that are keys
-%% in #peer{} and #conn{} records respectively. To add to the
-%% confusion, a #request.transport is a peer process = key in a
-%% #conn{} record. The actual transport process (that the peer process
-%% knows about and that has a transport connection) isn't seen here.
--record(conn,
+%% Record representing an Peer State Machine processes implemented by
+%% diameter_peer_fsm.
+-record(peer,
{pid :: pid(),
apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias}
caps :: #diameter_caps{},
started = now(), %% at process start
- peer :: pid()}). %% key into peerT
+ watchdog :: pid()}). %% key into watchdogT
%% Record stored in diameter_request for each outgoing request.
-record(request,
@@ -519,17 +512,17 @@ transition({reconnect, Pid}, S) ->
%% Watchdog is sending notification of a state transition.
transition({watchdog, Pid, {[TPid | Data], From, To}},
#state{service_name = SvcName,
- peerT = PeerT}
+ watchdogT = WatchdogT}
= S) ->
- #peer{ref = Ref, type = T, options = Opts}
- = P
- = fetch(PeerT, Pid),
- watchdog(TPid, Data, From, To, P, S),
+ #watchdog{ref = Ref, type = T, options = Opts}
+ = Wd
+ = fetch(WatchdogT, Pid),
+ watchdog(TPid, Data, From, To, Wd, S),
send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
ok;
-%% Death of a watchdog process (#peer.pid) results in the removal of
+%% Death of a watchdog process (#watchdog.pid) results in the removal of
%% it's peer and any associated conn record when 'DOWN' is received.
-%% Death of a peer process process (#conn.pid, #peer.conn) results in
+%% Death of a peer process process (#peer.pid, #watchdog.peer) results in
%% ?WD_DOWN.
%% Monitor process has died. Just die with a reason that tells
@@ -667,27 +660,27 @@ mod_state(Alias, ModS) ->
%%% ---------------------------------------------------------------------------
%% remove_transport
-shutdown(Refs, #state{peerT = PeerT})
+shutdown(Refs, #state{watchdogT = WatchdogT})
when is_list(Refs) ->
- ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, PeerT);
+ ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT);
%% application/service shutdown
-shutdown(Reason, #state{peerT = PeerT})
+shutdown(Reason, #state{watchdogT = WatchdogT})
when Reason == application;
Reason == service ->
diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end,
[],
- PeerT)).
+ WatchdogT)).
%% st/2
-st(#peer{ref = Ref, pid = Pid}, Refs) ->
+st(#watchdog{ref = Ref, pid = Pid}, Refs) ->
lists:member(Ref, Refs)
andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up
%% st/3
-st(#peer{pid = Pid}, Reason, Acc) ->
+st(#watchdog{pid = Pid}, Reason, Acc) ->
Pid ! {shutdown, self(), Reason},
[Pid | Acc].
@@ -822,22 +815,22 @@ type(connect = T) -> T.
%% start/4
-start(Ref, Type, Opts, #state{peerT = PeerT,
- connT = ConnT,
+start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
+ peerT = PeerT,
options = SvcOpts,
service_name = SvcName,
service = Svc})
when Type == connect;
Type == accept ->
- Pid = s(Type, Ref, {ConnT,
+ Pid = s(Type, Ref, {PeerT,
Opts,
SvcName,
SvcOpts,
merge_service(Opts, Svc)}),
- insert(PeerT, #peer{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts}),
+ insert(WatchdogT, #watchdog{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts}),
Pid.
%% Note that the service record passed into the watchdog is the merged
@@ -884,12 +877,12 @@ ms(_, Svc) ->
%%% # accepted/3
%%% ---------------------------------------------------------------------------
-accepted(Pid, _TPid, #state{peerT = PeerT} = S) ->
- #peer{ref = Ref, type = accept = T, conn = false, options = Opts}
- = P
- = fetch(PeerT, Pid),
- insert(PeerT, P#peer{conn = true}), %% mark replacement as started
- start(Ref, T, Opts, S). %% start new watchdog
+accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) ->
+ #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts}
+ = Wd
+ = fetch(WatchdogT, Pid),
+ insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started
+ start(Ref, T, Opts, S). %% start new watchdog
fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
@@ -900,29 +893,29 @@ fetch(Tid, Key) ->
%%% ---------------------------------------------------------------------------
%% Watchdog has a new open connection.
-watchdog(TPid, [T], _, ?WD_OKAY, Peer, State) ->
- connection_up({TPid, T}, Peer, State);
+watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) ->
+ connection_up({TPid, T}, Wd, State);
%% Watchdog has a new connection that will be opened after DW[RA]
%% exchange.
-watchdog(TPid, [T], _, ?WD_REOPEN, Peer, State) ->
- reopen({TPid, T}, Peer, State);
+watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) ->
+ reopen({TPid, T}, Wd, State);
%% Watchdog has recovered a suspect connection.
-watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Peer, State) ->
- #peer{conn = TPid} = Peer, %% assert
- connection_up(Peer, State);
+watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) ->
+ #watchdog{peer = TPid} = Wd, %% assert
+ connection_up(Wd, State);
%% Watchdog has an unresponsive connection.
-watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Peer, State) ->
- #peer{conn = TPid} = Peer, %% assert
- connection_down(Peer, To, State);
+watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) ->
+ #watchdog{peer = TPid} = Wd, %% assert
+ connection_down(Wd, To, State);
%% Watchdog has lost its connection.
-watchdog(TPid, [], _, ?WD_DOWN = To, Peer, #state{connT = ConnT} = S) ->
- close(Peer, S),
- connection_down(Peer, To, S),
- ets:delete(ConnT, TPid);
+watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) ->
+ close(Wd, S),
+ connection_down(Wd, To, S),
+ ets:delete(PeerT, TPid);
watchdog(_, [], _, _, _, _) ->
ok.
@@ -934,32 +927,32 @@ watchdog(_, [], _, _, _, _) ->
%% Watchdog process has reached state OKAY.
connection_up({TPid, {Caps, SApps, Pkt}},
- #peer{pid = Pid}
- = P,
- #state{connT = ConnT}
+ #watchdog{pid = Pid}
+ = Wd,
+ #state{peerT = PeerT}
= S) ->
- C = #conn{pid = TPid,
- apps = SApps,
- caps = Caps,
- peer = Pid},
- insert(ConnT, C),
- connection_up([Pkt], P#peer{conn = TPid}, C, S).
+ Pr = #peer{pid = TPid,
+ apps = SApps,
+ caps = Caps,
+ watchdog = Pid},
+ insert(PeerT, Pr),
+ connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S).
%%% ---------------------------------------------------------------------------
%%% # reopen/3
%%% ---------------------------------------------------------------------------
reopen({TPid, {Caps, SApps, _Pkt}},
- #peer{pid = Pid}
- = P,
- #state{peerT = PeerT,
- connT = ConnT}) ->
- insert(ConnT, #conn{pid = TPid,
+ #watchdog{pid = Pid}
+ = Wd,
+ #state{watchdogT = WatchdogT,
+ peerT = PeerT}) ->
+ insert(PeerT, #peer{pid = TPid,
apps = SApps,
caps = Caps,
- peer = Pid}),
- insert(PeerT, P#peer{wd_state = ?WD_REOPEN,
- conn = TPid}).
+ watchdog = Pid}),
+ insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
+ peer = TPid}).
%%% ---------------------------------------------------------------------------
%%% # connection_up/2
@@ -968,26 +961,26 @@ reopen({TPid, {Caps, SApps, _Pkt}},
%% Watchdog has recovered as suspect connection. Note that there has
%% been no new capabilties exchange in this case.
-connection_up(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
- connection_up([], P, fetch(ConnT, TPid), S).
+connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+ connection_up([], Wd, fetch(PeerT, TPid), S).
%% connection_up/4
connection_up(Extra,
- #peer{conn = TPid}
- = P,
- #conn{apps = SApps, caps = Caps}
- = C,
- #state{peerT = PeerT,
+ #watchdog{peer = TPid}
+ = Wd,
+ #peer{apps = SApps, caps = Caps}
+ = Pr,
+ #state{watchdogT = WatchdogT,
local_peers = LDict,
service_name = SvcName,
service
= #diameter_service{applications = Apps}}
= S) ->
- insert(PeerT, P#peer{wd_state = ?WD_OKAY}),
+ insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
request_peer_up(TPid),
insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- report_status(up, P, C, S, Extra).
+ report_status(up, Wd, Pr, S, Extra).
insert_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
@@ -1026,35 +1019,35 @@ peer_cb(MFA, Alias) ->
%%% # connection_down/3
%%% ---------------------------------------------------------------------------
-connection_down(#peer{wd_state = ?WD_OKAY,
- conn = TPid}
- = P,
- #conn{caps = Caps,
+connection_down(#watchdog{state = ?WD_OKAY,
+ peer = TPid}
+ = Wd,
+ #peer{caps = Caps,
apps = SApps}
- = C,
+ = Pr,
#state{service_name = SvcName,
service = #diameter_service{applications = Apps},
local_peers = LDict}
= S) ->
- report_status(down, P, C, S, []),
+ report_status(down, Wd, Pr, S, []),
remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
request_peer_down(TPid, S);
-connection_down(#peer{}, #conn{}, _) ->
+connection_down(#watchdog{}, #peer{}, _) ->
ok;
-connection_down(#peer{wd_state = WS,
- conn = TPid}
- = P,
+connection_down(#watchdog{state = WS,
+ peer = TPid}
+ = Wd,
To,
- #state{peerT = PeerT,
- connT = ConnT}
+ #state{watchdogT = WatchdogT,
+ peerT = PeerT}
= S)
when is_atom(To) ->
- insert(PeerT, P#peer{wd_state = To}),
+ insert(WatchdogT, Wd#watchdog{state = To}),
?WD_OKAY == WS
andalso
- connection_down(P, fetch(ConnT, TPid), S).
+ connection_down(Wd, fetch(PeerT, TPid), S).
remove_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
@@ -1077,19 +1070,19 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) ->
%% Watchdog process has died.
-peer_down(Pid, Reason, #state{peerT = PeerT} = S) ->
- P = fetch(PeerT, Pid),
- ets:delete_object(PeerT, P),
+peer_down(Pid, Reason, #state{watchdogT = WatchdogT} = S) ->
+ P = fetch(WatchdogT, Pid),
+ ets:delete_object(WatchdogT, P),
closed(Reason, P, S),
restart(P,S),
peer_down(P,S).
%% Send an event at connection establishment failure.
closed({shutdown, {close, _TPid, Reason}},
- #peer{wd_state = WS,
- ref = Ref,
- type = Type,
- options = Opts},
+ #watchdog{state = WS,
+ ref = Ref,
+ type = Type,
+ options = Opts},
#state{service_name = SvcName})
when WS /= ?WD_OKAY ->
send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}});
@@ -1097,14 +1090,14 @@ closed(_, _, _) ->
ok.
%% The watchdog has never reached OKAY ...
-peer_down(#peer{conn = B}, _)
+peer_down(#watchdog{peer = B}, _)
when is_boolean(B) ->
ok;
%% ... or maybe it has.
-peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
- connection_down(P, ?WD_DOWN, S),
- ets:delete(ConnT, TPid).
+peer_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+ connection_down(Wd, ?WD_DOWN, S),
+ ets:delete(PeerT, TPid).
%% restart/2
@@ -1114,22 +1107,22 @@ restart(P,S) ->
%% restart/1
%% Always try to reconnect.
-restart(#peer{ref = Ref,
- type = connect = T,
- options = Opts,
- started = Time}) ->
+restart(#watchdog{ref = Ref,
+ type = connect = T,
+ options = Opts,
+ started = Time}) ->
{Time, {Ref, T, Opts}};
%% Transport connection hasn't yet been accepted ...
-restart(#peer{ref = Ref,
- type = accept = T,
- options = Opts,
- conn = false,
- started = Time}) ->
+restart(#watchdog{ref = Ref,
+ type = accept = T,
+ options = Opts,
+ peer = false,
+ started = Time}) ->
{Time, {Ref, T, Opts}};
%% ... or it has: a replacement has already been spawned.
-restart(#peer{type = accept}) ->
+restart(#watchdog{type = accept}) ->
false.
%% q_restart/2
@@ -1190,12 +1183,12 @@ tc(false = No, _, _) -> %% removed
%% the accepting watchdog upon reception of a CER from the previously
%% connected peer, or us after reconnect_timer timeout.
-close(#peer{type = connect}, _) ->
+close(#watchdog{type = connect}, _) ->
ok;
-close(#peer{type = accept,
- pid = Pid,
- ref = Ref,
- options = Opts},
+close(#watchdog{type = accept,
+ pid = Pid,
+ ref = Ref,
+ options = Opts},
#state{service_name = SvcName}) ->
c(Pid, diameter_config:have_transport(SvcName, Ref), Opts).
@@ -1219,11 +1212,11 @@ c(Pid, false, _Opts) ->
%%% ---------------------------------------------------------------------------
reconnect(Pid, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{ref = Ref,
- type = connect,
- options = Opts}
- = fetch(PeerT, Pid),
+ watchdogT = WatchdogT}) ->
+ #watchdog{ref = Ref,
+ type = connect,
+ options = Opts}
+ = fetch(WatchdogT, Pid),
send_event(SvcName, {reconnect, Ref, Opts}).
%%% ---------------------------------------------------------------------------
@@ -1703,8 +1696,8 @@ request_peer_down(TPid, S) ->
%%% recv_request/3
%%% ---------------------------------------------------------------------------
-recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) ->
- try ets:lookup(ConnT, TPid) of
+recv_request(TPid, Pkt, {PeerT, SvcName, Apps, Mask}) ->
+ try ets:lookup(PeerT, TPid) of
[C] ->
recv_request(C, TPid, Pkt, SvcName, Apps, Mask);
[] -> %% transport has gone down
@@ -1716,7 +1709,7 @@ recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) ->
%% recv_request/5
-recv_request(#conn{apps = SApps, caps = Caps},
+recv_request(#peer{apps = SApps, caps = Caps},
TPid,
Pkt,
SvcName,
@@ -2536,11 +2529,11 @@ rt(#request{packet = #diameter_packet{msg = Msg},
%%% ---------------------------------------------------------------------------
report_status(Status,
- #peer{ref = Ref,
- conn = TPid,
- type = Type,
- options = Opts},
- #conn{apps = [_|_] = As,
+ #watchdog{ref = Ref,
+ peer = TPid,
+ type = Type,
+ options = Opts},
+ #peer{apps = [_|_] = As,
caps = Caps},
#state{service_name = SvcName}
= S,
@@ -2909,8 +2902,8 @@ eq(Any, Id, PeerId) ->
%% transports/1
-transports(#state{peerT = PeerT}) ->
- ets:select(PeerT, [{#peer{conn = '$1', _ = '_'},
+transports(#state{watchdogT = WatchdogT}) ->
+ ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'},
[{'is_pid', '$1'}],
['$1']}]).
@@ -2964,11 +2957,12 @@ tagged_info(Item, S)
undefined
end;
-tagged_info(TPid, #state{peerT = PT, connT = CT})
+tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT})
when is_pid(TPid) ->
try
- [#conn{peer = Pid}] = ets:lookup(CT, TPid),
- [#peer{ref = Ref, type = Type, options = Opts}] = ets:lookup(PT, Pid),
+ [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid),
+ [#watchdog{ref = Ref, type = Type, options = Opts}]
+ = ets:lookup(WatchdogT, Pid),
[{ref, Ref},
{type, Type},
{options, Opts}]
@@ -3051,11 +3045,11 @@ complete(Pre) ->
%% info_stats/1
-info_stats(#state{peerT = PeerT}) ->
- MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'},
+info_stats(#state{watchdogT = WatchdogT}) ->
+ MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'},
[{'is_pid', '$2'}],
[['$1', '$2']]}],
- try ets:select(PeerT, MatchSpec) of
+ try ets:select(WatchdogT, MatchSpec) of
L ->
diameter_stats:read(lists:append(L))
catch
@@ -3065,7 +3059,8 @@ info_stats(#state{peerT = PeerT}) ->
%% info_transport/1
%%
%% One entry per configured transport. Statistics for each entry are
-%% the accumulated values for the ref and associated peer pids.
+%% the accumulated values for the ref and associated watchdog/peer
+%% pids.
info_transport(S) ->
PeerD = peer_dict(S, config_dict(S)),
@@ -3098,42 +3093,42 @@ transport([[_,_] | L]) ->
%% Possibly many peer entries for a listening transport. Note that all
%% have the same options by construction, which is not terribly space
-%% efficient. (TODO: all entries for the same Ref should share options.)
+%% efficient.
transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
[{type, listen},
{options, Opts},
{accept, [lists:nthtail(2,L) || L <- Ls]}].
-peer_dict(#state{peerT = PeerT, connT = ConnT}, Dict0) ->
- try ets:tab2list(PeerT) of
+peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) ->
+ try ets:tab2list(WatchdogT) of
L ->
- lists:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, Dict0, L)
+ lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
catch
error: badarg -> Dict0 %% service has gone down
end.
-peer_acc(ConnT, Acc, #peer{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts,
- wd_state = WS,
- started = T,
- conn = TPid}) ->
+peer_acc(PeerT, Acc, #watchdog{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts,
+ state = WS,
+ started = At,
+ peer = TPid}) ->
dict:append(Ref,
[{type, Type},
{options, Opts},
- {watchdog, {Pid, T, WS}}
- | info_conn(ConnT, TPid, WS /= ?WD_DOWN)],
+ {watchdog, {Pid, At, WS}}
+ | info_peer(PeerT, TPid, WS)],
Acc).
-info_conn(ConnT, TPid, true)
- when is_pid(TPid) ->
- try ets:lookup(ConnT, TPid) of
- T -> info_conn(T)
+info_peer(PeerT, TPid, WS)
+ when is_pid(TPid), WS /= ?WD_DOWN ->
+ try ets:lookup(PeerT, TPid) of
+ T -> info_peer(T)
catch
error: badarg -> [] %% service has gone down
end;
-info_conn(_, _, _) ->
+info_peer(_, _, _) ->
[].
%% The point of extracting the config here is so that 'transport' info
@@ -3152,12 +3147,12 @@ config_acc({Ref, T, Opts}, Dict)
config_acc(_, Dict) ->
Dict.
-info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
+info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
[{peer, {Pid, T}},
{apps, SApps},
{caps, info_caps(Caps)}
| try [{port, info_port(Pid)}] catch _:_ -> [] end];
-info_conn([] = No) ->
+info_peer([] = No) ->
No.
%% Extract information that the processes involved are expected to