diff options
Diffstat (limited to 'lib/diameter/src/base/diameter_service.erl')
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 331 |
1 files changed, 163 insertions, 168 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index a0af92c2a2..efce8c8f10 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -114,8 +114,10 @@ {id = now(), service_name, %% as passed to start_service/2, key in ?STATE_TABLE service :: #diameter_service{}, - peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm - connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen + watchdogT = ets_new(watchdogs) %% #watchdog{} at start + :: ets:tid(), + peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen + :: ets:tid(), shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] monitor = false :: false | pid(), %% process to die with @@ -131,35 +133,26 @@ %% service record is used to determine whether or not we need to call %% the process for a pick_peer callback. -%% Record representing a watchdog process as implemented by -%% diameter_watchdog. The term "peer" here is historical, made -%% especially confusing by the fact that a peer_ref() in the -%% documentation is the key of a #conn{} record, not a #peer{} record. -%% The name is also unfortunate given the meaning of peer in the -%% Diameter sense. --record(peer, +%% Record representing an RFC 3539 watchdog process implemented by +%% diameter_watchdog. +-record(watchdog, {pid :: match(pid()), type :: match(connect | accept), ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport - wd_state = ?WD_INITIAL :: match(wd_state()), + state = ?WD_INITIAL :: match(wd_state()), started = now(), %% at process start - conn = false :: match(boolean() | pid())}). + peer = false :: match(boolean() | pid())}). %% true at accepted, pid() at okay/reopen -%% Record representing a peer process as implemented by -%% diameter_peer_fsm. The term "conn" is historical. Despite the name -%% here, comments refer to watchdog and peer processes, that are keys -%% in #peer{} and #conn{} records respectively. To add to the -%% confusion, a #request.transport is a peer process = key in a -%% #conn{} record. The actual transport process (that the peer process -%% knows about and that has a transport connection) isn't seen here. --record(conn, +%% Record representing an Peer State Machine processes implemented by +%% diameter_peer_fsm. +-record(peer, {pid :: pid(), apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias} caps :: #diameter_caps{}, started = now(), %% at process start - peer :: pid()}). %% key into peerT + watchdog :: pid()}). %% key into watchdogT %% Record stored in diameter_request for each outgoing request. -record(request, @@ -519,17 +512,17 @@ transition({reconnect, Pid}, S) -> %% Watchdog is sending notification of a state transition. transition({watchdog, Pid, {[TPid | Data], From, To}}, #state{service_name = SvcName, - peerT = PeerT} + watchdogT = WatchdogT} = S) -> - #peer{ref = Ref, type = T, options = Opts} - = P - = fetch(PeerT, Pid), - watchdog(TPid, Data, From, To, P, S), + #watchdog{ref = Ref, type = T, options = Opts} + = Wd + = fetch(WatchdogT, Pid), + watchdog(TPid, Data, From, To, Wd, S), send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}), ok; -%% Death of a watchdog process (#peer.pid) results in the removal of +%% Death of a watchdog process (#watchdog.pid) results in the removal of %% it's peer and any associated conn record when 'DOWN' is received. -%% Death of a peer process process (#conn.pid, #peer.conn) results in +%% Death of a peer process process (#peer.pid, #watchdog.peer) results in %% ?WD_DOWN. %% Monitor process has died. Just die with a reason that tells @@ -667,27 +660,27 @@ mod_state(Alias, ModS) -> %%% --------------------------------------------------------------------------- %% remove_transport -shutdown(Refs, #state{peerT = PeerT}) +shutdown(Refs, #state{watchdogT = WatchdogT}) when is_list(Refs) -> - ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, PeerT); + ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT); %% application/service shutdown -shutdown(Reason, #state{peerT = PeerT}) +shutdown(Reason, #state{watchdogT = WatchdogT}) when Reason == application; Reason == service -> diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end, [], - PeerT)). + WatchdogT)). %% st/2 -st(#peer{ref = Ref, pid = Pid}, Refs) -> +st(#watchdog{ref = Ref, pid = Pid}, Refs) -> lists:member(Ref, Refs) andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up %% st/3 -st(#peer{pid = Pid}, Reason, Acc) -> +st(#watchdog{pid = Pid}, Reason, Acc) -> Pid ! {shutdown, self(), Reason}, [Pid | Acc]. @@ -822,22 +815,22 @@ type(connect = T) -> T. %% start/4 -start(Ref, Type, Opts, #state{peerT = PeerT, - connT = ConnT, +start(Ref, Type, Opts, #state{watchdogT = WatchdogT, + peerT = PeerT, options = SvcOpts, service_name = SvcName, service = Svc}) when Type == connect; Type == accept -> - Pid = s(Type, Ref, {ConnT, + Pid = s(Type, Ref, {PeerT, Opts, SvcName, SvcOpts, merge_service(Opts, Svc)}), - insert(PeerT, #peer{pid = Pid, - type = Type, - ref = Ref, - options = Opts}), + insert(WatchdogT, #watchdog{pid = Pid, + type = Type, + ref = Ref, + options = Opts}), Pid. %% Note that the service record passed into the watchdog is the merged @@ -884,12 +877,12 @@ ms(_, Svc) -> %%% # accepted/3 %%% --------------------------------------------------------------------------- -accepted(Pid, _TPid, #state{peerT = PeerT} = S) -> - #peer{ref = Ref, type = accept = T, conn = false, options = Opts} - = P - = fetch(PeerT, Pid), - insert(PeerT, P#peer{conn = true}), %% mark replacement as started - start(Ref, T, Opts, S). %% start new watchdog +accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) -> + #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts} + = Wd + = fetch(WatchdogT, Pid), + insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started + start(Ref, T, Opts, S). %% start new watchdog fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), @@ -900,29 +893,29 @@ fetch(Tid, Key) -> %%% --------------------------------------------------------------------------- %% Watchdog has a new open connection. -watchdog(TPid, [T], _, ?WD_OKAY, Peer, State) -> - connection_up({TPid, T}, Peer, State); +watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) -> + connection_up({TPid, T}, Wd, State); %% Watchdog has a new connection that will be opened after DW[RA] %% exchange. -watchdog(TPid, [T], _, ?WD_REOPEN, Peer, State) -> - reopen({TPid, T}, Peer, State); +watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) -> + reopen({TPid, T}, Wd, State); %% Watchdog has recovered a suspect connection. -watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Peer, State) -> - #peer{conn = TPid} = Peer, %% assert - connection_up(Peer, State); +watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) -> + #watchdog{peer = TPid} = Wd, %% assert + connection_up(Wd, State); %% Watchdog has an unresponsive connection. -watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Peer, State) -> - #peer{conn = TPid} = Peer, %% assert - connection_down(Peer, To, State); +watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) -> + #watchdog{peer = TPid} = Wd, %% assert + connection_down(Wd, To, State); %% Watchdog has lost its connection. -watchdog(TPid, [], _, ?WD_DOWN = To, Peer, #state{connT = ConnT} = S) -> - close(Peer, S), - connection_down(Peer, To, S), - ets:delete(ConnT, TPid); +watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) -> + close(Wd, S), + connection_down(Wd, To, S), + ets:delete(PeerT, TPid); watchdog(_, [], _, _, _, _) -> ok. @@ -934,32 +927,32 @@ watchdog(_, [], _, _, _, _) -> %% Watchdog process has reached state OKAY. connection_up({TPid, {Caps, SApps, Pkt}}, - #peer{pid = Pid} - = P, - #state{connT = ConnT} + #watchdog{pid = Pid} + = Wd, + #state{peerT = PeerT} = S) -> - C = #conn{pid = TPid, - apps = SApps, - caps = Caps, - peer = Pid}, - insert(ConnT, C), - connection_up([Pkt], P#peer{conn = TPid}, C, S). + Pr = #peer{pid = TPid, + apps = SApps, + caps = Caps, + watchdog = Pid}, + insert(PeerT, Pr), + connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S). %%% --------------------------------------------------------------------------- %%% # reopen/3 %%% --------------------------------------------------------------------------- reopen({TPid, {Caps, SApps, _Pkt}}, - #peer{pid = Pid} - = P, - #state{peerT = PeerT, - connT = ConnT}) -> - insert(ConnT, #conn{pid = TPid, + #watchdog{pid = Pid} + = Wd, + #state{watchdogT = WatchdogT, + peerT = PeerT}) -> + insert(PeerT, #peer{pid = TPid, apps = SApps, caps = Caps, - peer = Pid}), - insert(PeerT, P#peer{wd_state = ?WD_REOPEN, - conn = TPid}). + watchdog = Pid}), + insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN, + peer = TPid}). %%% --------------------------------------------------------------------------- %%% # connection_up/2 @@ -968,26 +961,26 @@ reopen({TPid, {Caps, SApps, _Pkt}}, %% Watchdog has recovered as suspect connection. Note that there has %% been no new capabilties exchange in this case. -connection_up(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> - connection_up([], P, fetch(ConnT, TPid), S). +connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> + connection_up([], Wd, fetch(PeerT, TPid), S). %% connection_up/4 connection_up(Extra, - #peer{conn = TPid} - = P, - #conn{apps = SApps, caps = Caps} - = C, - #state{peerT = PeerT, + #watchdog{peer = TPid} + = Wd, + #peer{apps = SApps, caps = Caps} + = Pr, + #state{watchdogT = WatchdogT, local_peers = LDict, service_name = SvcName, service = #diameter_service{applications = Apps}} = S) -> - insert(PeerT, P#peer{wd_state = ?WD_OKAY}), + insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}), request_peer_up(TPid), insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - report_status(up, P, C, S, Extra). + report_status(up, Wd, Pr, S, Extra). insert_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). @@ -1026,35 +1019,35 @@ peer_cb(MFA, Alias) -> %%% # connection_down/3 %%% --------------------------------------------------------------------------- -connection_down(#peer{wd_state = ?WD_OKAY, - conn = TPid} - = P, - #conn{caps = Caps, +connection_down(#watchdog{state = ?WD_OKAY, + peer = TPid} + = Wd, + #peer{caps = Caps, apps = SApps} - = C, + = Pr, #state{service_name = SvcName, service = #diameter_service{applications = Apps}, local_peers = LDict} = S) -> - report_status(down, P, C, S, []), + report_status(down, Wd, Pr, S, []), remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), request_peer_down(TPid, S); -connection_down(#peer{}, #conn{}, _) -> +connection_down(#watchdog{}, #peer{}, _) -> ok; -connection_down(#peer{wd_state = WS, - conn = TPid} - = P, +connection_down(#watchdog{state = WS, + peer = TPid} + = Wd, To, - #state{peerT = PeerT, - connT = ConnT} + #state{watchdogT = WatchdogT, + peerT = PeerT} = S) when is_atom(To) -> - insert(PeerT, P#peer{wd_state = To}), + insert(WatchdogT, Wd#watchdog{state = To}), ?WD_OKAY == WS andalso - connection_down(P, fetch(ConnT, TPid), S). + connection_down(Wd, fetch(PeerT, TPid), S). remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1077,19 +1070,19 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) -> %% Watchdog process has died. -peer_down(Pid, Reason, #state{peerT = PeerT} = S) -> - P = fetch(PeerT, Pid), - ets:delete_object(PeerT, P), +peer_down(Pid, Reason, #state{watchdogT = WatchdogT} = S) -> + P = fetch(WatchdogT, Pid), + ets:delete_object(WatchdogT, P), closed(Reason, P, S), restart(P,S), peer_down(P,S). %% Send an event at connection establishment failure. closed({shutdown, {close, _TPid, Reason}}, - #peer{wd_state = WS, - ref = Ref, - type = Type, - options = Opts}, + #watchdog{state = WS, + ref = Ref, + type = Type, + options = Opts}, #state{service_name = SvcName}) when WS /= ?WD_OKAY -> send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}); @@ -1097,14 +1090,14 @@ closed(_, _, _) -> ok. %% The watchdog has never reached OKAY ... -peer_down(#peer{conn = B}, _) +peer_down(#watchdog{peer = B}, _) when is_boolean(B) -> ok; %% ... or maybe it has. -peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> - connection_down(P, ?WD_DOWN, S), - ets:delete(ConnT, TPid). +peer_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> + connection_down(Wd, ?WD_DOWN, S), + ets:delete(PeerT, TPid). %% restart/2 @@ -1114,22 +1107,22 @@ restart(P,S) -> %% restart/1 %% Always try to reconnect. -restart(#peer{ref = Ref, - type = connect = T, - options = Opts, - started = Time}) -> +restart(#watchdog{ref = Ref, + type = connect = T, + options = Opts, + started = Time}) -> {Time, {Ref, T, Opts}}; %% Transport connection hasn't yet been accepted ... -restart(#peer{ref = Ref, - type = accept = T, - options = Opts, - conn = false, - started = Time}) -> +restart(#watchdog{ref = Ref, + type = accept = T, + options = Opts, + peer = false, + started = Time}) -> {Time, {Ref, T, Opts}}; %% ... or it has: a replacement has already been spawned. -restart(#peer{type = accept}) -> +restart(#watchdog{type = accept}) -> false. %% q_restart/2 @@ -1190,12 +1183,12 @@ tc(false = No, _, _) -> %% removed %% the accepting watchdog upon reception of a CER from the previously %% connected peer, or us after reconnect_timer timeout. -close(#peer{type = connect}, _) -> +close(#watchdog{type = connect}, _) -> ok; -close(#peer{type = accept, - pid = Pid, - ref = Ref, - options = Opts}, +close(#watchdog{type = accept, + pid = Pid, + ref = Ref, + options = Opts}, #state{service_name = SvcName}) -> c(Pid, diameter_config:have_transport(SvcName, Ref), Opts). @@ -1219,11 +1212,11 @@ c(Pid, false, _Opts) -> %%% --------------------------------------------------------------------------- reconnect(Pid, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{ref = Ref, - type = connect, - options = Opts} - = fetch(PeerT, Pid), + watchdogT = WatchdogT}) -> + #watchdog{ref = Ref, + type = connect, + options = Opts} + = fetch(WatchdogT, Pid), send_event(SvcName, {reconnect, Ref, Opts}). %%% --------------------------------------------------------------------------- @@ -1703,8 +1696,8 @@ request_peer_down(TPid, S) -> %%% recv_request/3 %%% --------------------------------------------------------------------------- -recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) -> - try ets:lookup(ConnT, TPid) of +recv_request(TPid, Pkt, {PeerT, SvcName, Apps, Mask}) -> + try ets:lookup(PeerT, TPid) of [C] -> recv_request(C, TPid, Pkt, SvcName, Apps, Mask); [] -> %% transport has gone down @@ -1716,7 +1709,7 @@ recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) -> %% recv_request/5 -recv_request(#conn{apps = SApps, caps = Caps}, +recv_request(#peer{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, @@ -2536,11 +2529,11 @@ rt(#request{packet = #diameter_packet{msg = Msg}, %%% --------------------------------------------------------------------------- report_status(Status, - #peer{ref = Ref, - conn = TPid, - type = Type, - options = Opts}, - #conn{apps = [_|_] = As, + #watchdog{ref = Ref, + peer = TPid, + type = Type, + options = Opts}, + #peer{apps = [_|_] = As, caps = Caps}, #state{service_name = SvcName} = S, @@ -2909,8 +2902,8 @@ eq(Any, Id, PeerId) -> %% transports/1 -transports(#state{peerT = PeerT}) -> - ets:select(PeerT, [{#peer{conn = '$1', _ = '_'}, +transports(#state{watchdogT = WatchdogT}) -> + ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'}, [{'is_pid', '$1'}], ['$1']}]). @@ -2964,11 +2957,12 @@ tagged_info(Item, S) undefined end; -tagged_info(TPid, #state{peerT = PT, connT = CT}) +tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT}) when is_pid(TPid) -> try - [#conn{peer = Pid}] = ets:lookup(CT, TPid), - [#peer{ref = Ref, type = Type, options = Opts}] = ets:lookup(PT, Pid), + [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid), + [#watchdog{ref = Ref, type = Type, options = Opts}] + = ets:lookup(WatchdogT, Pid), [{ref, Ref}, {type, Type}, {options, Opts}] @@ -3051,11 +3045,11 @@ complete(Pre) -> %% info_stats/1 -info_stats(#state{peerT = PeerT}) -> - MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'}, +info_stats(#state{watchdogT = WatchdogT}) -> + MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'}, [{'is_pid', '$2'}], [['$1', '$2']]}], - try ets:select(PeerT, MatchSpec) of + try ets:select(WatchdogT, MatchSpec) of L -> diameter_stats:read(lists:append(L)) catch @@ -3065,7 +3059,8 @@ info_stats(#state{peerT = PeerT}) -> %% info_transport/1 %% %% One entry per configured transport. Statistics for each entry are -%% the accumulated values for the ref and associated peer pids. +%% the accumulated values for the ref and associated watchdog/peer +%% pids. info_transport(S) -> PeerD = peer_dict(S, config_dict(S)), @@ -3098,42 +3093,42 @@ transport([[_,_] | L]) -> %% Possibly many peer entries for a listening transport. Note that all %% have the same options by construction, which is not terribly space -%% efficient. (TODO: all entries for the same Ref should share options.) +%% efficient. transport([[{type, accept}, {options, Opts} | _] | _] = Ls) -> [{type, listen}, {options, Opts}, {accept, [lists:nthtail(2,L) || L <- Ls]}]. -peer_dict(#state{peerT = PeerT, connT = ConnT}, Dict0) -> - try ets:tab2list(PeerT) of +peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) -> + try ets:tab2list(WatchdogT) of L -> - lists:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, Dict0, L) + lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) catch error: badarg -> Dict0 %% service has gone down end. -peer_acc(ConnT, Acc, #peer{pid = Pid, - type = Type, - ref = Ref, - options = Opts, - wd_state = WS, - started = T, - conn = TPid}) -> +peer_acc(PeerT, Acc, #watchdog{pid = Pid, + type = Type, + ref = Ref, + options = Opts, + state = WS, + started = At, + peer = TPid}) -> dict:append(Ref, [{type, Type}, {options, Opts}, - {watchdog, {Pid, T, WS}} - | info_conn(ConnT, TPid, WS /= ?WD_DOWN)], + {watchdog, {Pid, At, WS}} + | info_peer(PeerT, TPid, WS)], Acc). -info_conn(ConnT, TPid, true) - when is_pid(TPid) -> - try ets:lookup(ConnT, TPid) of - T -> info_conn(T) +info_peer(PeerT, TPid, WS) + when is_pid(TPid), WS /= ?WD_DOWN -> + try ets:lookup(PeerT, TPid) of + T -> info_peer(T) catch error: badarg -> [] %% service has gone down end; -info_conn(_, _, _) -> +info_peer(_, _, _) -> []. %% The point of extracting the config here is so that 'transport' info @@ -3152,12 +3147,12 @@ config_acc({Ref, T, Opts}, Dict) config_acc(_, Dict) -> Dict. -info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> +info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> [{peer, {Pid, T}}, {apps, SApps}, {caps, info_caps(Caps)} | try [{port, info_port(Pid)}] catch _:_ -> [] end]; -info_conn([] = No) -> +info_peer([] = No) -> No. %% Extract information that the processes involved are expected to |