diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 103 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 5 |
2 files changed, 70 insertions, 38 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index ef660a10b9..72b886bcdd 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -119,15 +119,15 @@ {id = now(), service_name, %% as passed to start_service/2, key in ?STATE_TABLE service :: #diameter_service{}, - peerT = ets_new(peers) :: ets:tid(), %% #peer{} at start_fsm - connT = ets_new(conns) :: ets:tid(), %% #conn{} at connection_up + peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm + connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] monitor = false :: false | pid(), %% process to die with - options :: [{atom(), term()}]}). -% :: [{sequence, diameter:sequence()} %% sequence mask -% | {share_peers, boolean()} %% broadcast peers to remote nodes? -% | {use_shared_peers, boolean()}]}).%% use broadcasted peers? + options + :: [{sequence, diameter:sequence()} %% sequence mask + | {share_peers, boolean()} %% broadcast peers to remote nodes? + | {use_shared_peers, boolean()}]}).%% use broadcasted peers? %% shared_peers reflects the peers broadcast from remote nodes. Note %% that the state term itself doesn't change, which is relevant for %% the stateless application callbacks since the state is retrieved @@ -145,7 +145,7 @@ :: match(op_state() | {op_state(), wd_state()}), started = now(), %% at process start conn = false :: match(boolean() | pid())}). - %% true at accept, pid() at connection_up (connT key) + %% true at accepted, pid() at connection_up or reopen %% Record representing a peer_fsm process. -record(conn, @@ -524,8 +524,6 @@ handle_info(T, #state{} = S) -> case transition(T,S) of ok -> {noreply, S}; - #state{} = NS -> - {noreply, NS}; {stop, Reason} -> {stop, {shutdown, Reason}, S} end; @@ -542,15 +540,26 @@ transition({accepted, Pid, TPid}, S) -> %% Peer process has a new open connection. transition({connection_up, Pid, T}, S) -> - connection_up(Pid, T, S); + connection_up(Pid, T, S), + ok; + +%% Peer process has a new connection that will be opened after +%% watchdog exchange. This message was added long after connection_up, +%% to communicate the information as soon as it's available. Leave +%% connection_up as is it for now, duplicated information and all. +transition({reopen, Pid, T}, S) -> + reopen(Pid, T, S), + ok; %% Peer process has left state open. transition({connection_down, Pid}, S) -> - connection_down(Pid, S); + connection_down(Pid, S), + ok; %% Peer process has returned to state open. transition({connection_up, Pid}, S) -> - connection_up(Pid, S); + connection_up(Pid, S), + ok; %% Accepting transport has lost connectivity. transition({close, Pid}, S) -> @@ -599,7 +608,8 @@ transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) -> %% Local peer process has died. transition({'DOWN', _, process, Pid, Reason}, S) when node(Pid) == node() -> - peer_down(Pid, Reason, S); + peer_down(Pid, Reason, S), + ok; %% Remote service wants to know about shared transports. transition({service, Pid}, S) -> @@ -775,16 +785,20 @@ shutdown(#state{peerT = PeerT}) -> wait(Fun, T) -> diameter_lib:wait(ets:foldl(Fun, [], T)). -st(#peer{conn = B}, Acc) - when is_boolean(B) -> - Acc; -st(#peer{conn = Pid}, Acc) -> +st(#peer{op_state = {OS,_}} = P, Acc) -> + st(P#peer{op_state = OS}, Acc); +st(#peer{op_state = ?STATE_UP, conn = Pid}, Acc) -> Pid ! shutdown, - [Pid | Acc]. + [Pid | Acc]; +st(#peer{}, Acc) -> + Acc. -sw(#peer{pid = Pid}, Acc) -> +sw(#peer{pid = Pid}, Acc) + when is_pid(Pid) -> exit(Pid, shutdown), - [Pid | Acc]. + [Pid | Acc]; +sw(#peer{}, Acc) -> + Acc. %%% --------------------------------------------------------------------------- %%% # call_service/2 @@ -999,8 +1013,6 @@ fetch(Tid, Key) -> %%% --------------------------------------------------------------------------- %%% # connection_up/3 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- %% Peer process has reached the open state. @@ -1018,9 +1030,29 @@ connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, connection_up([Pkt], P#peer{conn = TPid}, C, S). %%% --------------------------------------------------------------------------- +%%% # reopen/3 +%%% --------------------------------------------------------------------------- + +%% Note that this connection_up/3 rewrites the same #conn{} now +%% written here. Both do so in case reopen has not happened in old +%% code. + +reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT, + connT = ConnT}) -> + P = fetch(PeerT, Pid), + C = #conn{pid = TPid, + apps = SApps, + caps = Caps, + peer = Pid}, + + insert(ConnT, C), + #peer{op_state = {?STATE_DOWN, _}} + = P, + insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN}, + conn = TPid}). + +%%% --------------------------------------------------------------------------- %%% # connection_up/2 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- %% Peer process has transitioned back into the open state. Note that there @@ -1050,8 +1082,7 @@ connection_up(T, P, C, #state{peerT = PeerT, request_peer_up(TPid), insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - report_status(up, P, C, S, T), - S. + report_status(up, P, C, S, T). insert_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). @@ -1093,8 +1124,6 @@ peer_cb(MFA, Alias) -> %%% --------------------------------------------------------------------------- %%% # connection_down/2 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- %% Peer process has transitioned out of the open state. @@ -1113,8 +1142,8 @@ connection_down(Pid, #state{peerT = PeerT, %% connection_down/3 -connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, S) -> - S; +connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) -> + ok; connection_down(#peer{conn = TPid, op_state = {?STATE_UP, _}} @@ -1128,8 +1157,7 @@ connection_down(#peer{conn = TPid, = S) -> report_status(down, P, C, S, []), remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - request_peer_down(TPid, S), - S. + request_peer_down(TPid, S). remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1148,8 +1176,6 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) -> %%% --------------------------------------------------------------------------- %%% # peer_down/3 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- %% Peer process has died. @@ -1173,11 +1199,11 @@ closed(_, _, _) -> ok. %% The peer has never come up ... -peer_down(#peer{conn = B}, S) +peer_down(#peer{conn = B}, _) when is_boolean(B) -> - S; + ok; -%% ... or it has. +%% ... or maybe it has. peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> #conn{} = C = fetch(ConnT, TPid), ets:delete_object(ConnT, C), @@ -3167,6 +3193,9 @@ peer_acc(ConnT, Acc, #peer{pid = Pid, | info_conn(ConnT, TPid, WS /= ?WD_DOWN)], Acc). +info_conn(ConnT, [TPid], B) -> + info_conn(ConnT, TPid, B); + info_conn(ConnT, TPid, true) when is_pid(TPid) -> try ets:lookup(ConnT, TPid) of diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 1045d9ad9f..b37a1a10e9 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -272,12 +272,15 @@ transition({open, TPid, Hosts, T} = Open, transition({open = P, TPid, _Hosts, T}, #watchdog{transport = TPid, + parent = Pid, status = down} = S) -> %% Store the info we need to notify the parent to reopen the %% connection after the requisite DWA's are received, at which - %% time we eraser(open). + %% time we eraser(open). The reopen message is a later addition, + %% to communicate the new capabilities as soon as they're known. putr(P, {TPid, T}), + Pid ! {reopen, self(), {TPid, T}}, set_watchdog(send_watchdog(S#watchdog{status = reopen, num_dwa = 0})); |