aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base')
-rw-r--r--lib/diameter/src/base/diameter_service.erl103
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl5
2 files changed, 70 insertions, 38 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index ef660a10b9..72b886bcdd 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -119,15 +119,15 @@
{id = now(),
service_name, %% as passed to start_service/2, key in ?STATE_TABLE
service :: #diameter_service{},
- peerT = ets_new(peers) :: ets:tid(), %% #peer{} at start_fsm
- connT = ets_new(conns) :: ets:tid(), %% #conn{} at connection_up
+ peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm
+ connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen
shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
monitor = false :: false | pid(), %% process to die with
- options :: [{atom(), term()}]}).
-% :: [{sequence, diameter:sequence()} %% sequence mask
-% | {share_peers, boolean()} %% broadcast peers to remote nodes?
-% | {use_shared_peers, boolean()}]}).%% use broadcasted peers?
+ options
+ :: [{sequence, diameter:sequence()} %% sequence mask
+ | {share_peers, boolean()} %% broadcast peers to remote nodes?
+ | {use_shared_peers, boolean()}]}).%% use broadcasted peers?
%% shared_peers reflects the peers broadcast from remote nodes. Note
%% that the state term itself doesn't change, which is relevant for
%% the stateless application callbacks since the state is retrieved
@@ -145,7 +145,7 @@
:: match(op_state() | {op_state(), wd_state()}),
started = now(), %% at process start
conn = false :: match(boolean() | pid())}).
- %% true at accept, pid() at connection_up (connT key)
+ %% true at accepted, pid() at connection_up or reopen
%% Record representing a peer_fsm process.
-record(conn,
@@ -524,8 +524,6 @@ handle_info(T, #state{} = S) ->
case transition(T,S) of
ok ->
{noreply, S};
- #state{} = NS ->
- {noreply, NS};
{stop, Reason} ->
{stop, {shutdown, Reason}, S}
end;
@@ -542,15 +540,26 @@ transition({accepted, Pid, TPid}, S) ->
%% Peer process has a new open connection.
transition({connection_up, Pid, T}, S) ->
- connection_up(Pid, T, S);
+ connection_up(Pid, T, S),
+ ok;
+
+%% Peer process has a new connection that will be opened after
+%% watchdog exchange. This message was added long after connection_up,
+%% to communicate the information as soon as it's available. Leave
+%% connection_up as is it for now, duplicated information and all.
+transition({reopen, Pid, T}, S) ->
+ reopen(Pid, T, S),
+ ok;
%% Peer process has left state open.
transition({connection_down, Pid}, S) ->
- connection_down(Pid, S);
+ connection_down(Pid, S),
+ ok;
%% Peer process has returned to state open.
transition({connection_up, Pid}, S) ->
- connection_up(Pid, S);
+ connection_up(Pid, S),
+ ok;
%% Accepting transport has lost connectivity.
transition({close, Pid}, S) ->
@@ -599,7 +608,8 @@ transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
%% Local peer process has died.
transition({'DOWN', _, process, Pid, Reason}, S)
when node(Pid) == node() ->
- peer_down(Pid, Reason, S);
+ peer_down(Pid, Reason, S),
+ ok;
%% Remote service wants to know about shared transports.
transition({service, Pid}, S) ->
@@ -775,16 +785,20 @@ shutdown(#state{peerT = PeerT}) ->
wait(Fun, T) ->
diameter_lib:wait(ets:foldl(Fun, [], T)).
-st(#peer{conn = B}, Acc)
- when is_boolean(B) ->
- Acc;
-st(#peer{conn = Pid}, Acc) ->
+st(#peer{op_state = {OS,_}} = P, Acc) ->
+ st(P#peer{op_state = OS}, Acc);
+st(#peer{op_state = ?STATE_UP, conn = Pid}, Acc) ->
Pid ! shutdown,
- [Pid | Acc].
+ [Pid | Acc];
+st(#peer{}, Acc) ->
+ Acc.
-sw(#peer{pid = Pid}, Acc) ->
+sw(#peer{pid = Pid}, Acc)
+ when is_pid(Pid) ->
exit(Pid, shutdown),
- [Pid | Acc].
+ [Pid | Acc];
+sw(#peer{}, Acc) ->
+ Acc.
%%% ---------------------------------------------------------------------------
%%% # call_service/2
@@ -999,8 +1013,6 @@ fetch(Tid, Key) ->
%%% ---------------------------------------------------------------------------
%%% # connection_up/3
-%%%
-%%% Output: #state{}
%%% ---------------------------------------------------------------------------
%% Peer process has reached the open state.
@@ -1018,9 +1030,29 @@ connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT,
connection_up([Pkt], P#peer{conn = TPid}, C, S).
%%% ---------------------------------------------------------------------------
+%%% # reopen/3
+%%% ---------------------------------------------------------------------------
+
+%% Note that this connection_up/3 rewrites the same #conn{} now
+%% written here. Both do so in case reopen has not happened in old
+%% code.
+
+reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT,
+ connT = ConnT}) ->
+ P = fetch(PeerT, Pid),
+ C = #conn{pid = TPid,
+ apps = SApps,
+ caps = Caps,
+ peer = Pid},
+
+ insert(ConnT, C),
+ #peer{op_state = {?STATE_DOWN, _}}
+ = P,
+ insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN},
+ conn = TPid}).
+
+%%% ---------------------------------------------------------------------------
%%% # connection_up/2
-%%%
-%%% Output: #state{}
%%% ---------------------------------------------------------------------------
%% Peer process has transitioned back into the open state. Note that there
@@ -1050,8 +1082,7 @@ connection_up(T, P, C, #state{peerT = PeerT,
request_peer_up(TPid),
insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- report_status(up, P, C, S, T),
- S.
+ report_status(up, P, C, S, T).
insert_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
@@ -1093,8 +1124,6 @@ peer_cb(MFA, Alias) ->
%%% ---------------------------------------------------------------------------
%%% # connection_down/2
-%%%
-%%% Output: #state{}
%%% ---------------------------------------------------------------------------
%% Peer process has transitioned out of the open state.
@@ -1113,8 +1142,8 @@ connection_down(Pid, #state{peerT = PeerT,
%% connection_down/3
-connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, S) ->
- S;
+connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) ->
+ ok;
connection_down(#peer{conn = TPid,
op_state = {?STATE_UP, _}}
@@ -1128,8 +1157,7 @@ connection_down(#peer{conn = TPid,
= S) ->
report_status(down, P, C, S, []),
remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
- request_peer_down(TPid, S),
- S.
+ request_peer_down(TPid, S).
remove_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
@@ -1148,8 +1176,6 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) ->
%%% ---------------------------------------------------------------------------
%%% # peer_down/3
-%%%
-%%% Output: #state{}
%%% ---------------------------------------------------------------------------
%% Peer process has died.
@@ -1173,11 +1199,11 @@ closed(_, _, _) ->
ok.
%% The peer has never come up ...
-peer_down(#peer{conn = B}, S)
+peer_down(#peer{conn = B}, _)
when is_boolean(B) ->
- S;
+ ok;
-%% ... or it has.
+%% ... or maybe it has.
peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
#conn{} = C = fetch(ConnT, TPid),
ets:delete_object(ConnT, C),
@@ -3167,6 +3193,9 @@ peer_acc(ConnT, Acc, #peer{pid = Pid,
| info_conn(ConnT, TPid, WS /= ?WD_DOWN)],
Acc).
+info_conn(ConnT, [TPid], B) ->
+ info_conn(ConnT, TPid, B);
+
info_conn(ConnT, TPid, true)
when is_pid(TPid) ->
try ets:lookup(ConnT, TPid) of
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 1045d9ad9f..b37a1a10e9 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -272,12 +272,15 @@ transition({open, TPid, Hosts, T} = Open,
transition({open = P, TPid, _Hosts, T},
#watchdog{transport = TPid,
+ parent = Pid,
status = down}
= S) ->
%% Store the info we need to notify the parent to reopen the
%% connection after the requisite DWA's are received, at which
- %% time we eraser(open).
+ %% time we eraser(open). The reopen message is a later addition,
+ %% to communicate the new capabilities as soon as they're known.
putr(P, {TPid, T}),
+ Pid ! {reopen, self(), {TPid, T}},
set_watchdog(send_watchdog(S#watchdog{status = reopen,
num_dwa = 0}));