From c14ef2dbd4e0e259fc401acce2ee43dc9fd4f4f9 Mon Sep 17 00:00:00 2001
From: Anders Svensson <anders@erlang.org>
Date: Tue, 9 Oct 2012 11:03:25 +0200
Subject: Add reopen message from watchdog

This makes capabilities available to service_info as soon as
capabilities exchange has been completed. In particular, before state
OKAY is reached.
---
 lib/diameter/src/base/diameter_service.erl  | 103 ++++++++++++++++++----------
 lib/diameter/src/base/diameter_watchdog.erl |   5 +-
 2 files changed, 70 insertions(+), 38 deletions(-)

diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index ef660a10b9..72b886bcdd 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -119,15 +119,15 @@
         {id = now(),
          service_name,  %% as passed to start_service/2, key in ?STATE_TABLE
          service :: #diameter_service{},
-         peerT = ets_new(peers) :: ets:tid(), %% #peer{} at start_fsm
-         connT = ets_new(conns) :: ets:tid(), %% #conn{} at connection_up
+         peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm
+         connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen
          shared_peers = ?Dict:new(),         %% Alias -> [{TPid, Caps}, ...]
          local_peers = ?Dict:new(),          %% Alias -> [{TPid, Caps}, ...]
          monitor = false :: false | pid(),   %% process to die with
-         options :: [{atom(), term()}]}).
-%         :: [{sequence, diameter:sequence()}  %% sequence mask
-%             | {share_peers, boolean()}  %% broadcast peers to remote nodes?
-%             | {use_shared_peers, boolean()}]}).%% use broadcasted peers?
+         options
+         :: [{sequence, diameter:sequence()}  %% sequence mask
+             | {share_peers, boolean()}  %% broadcast peers to remote nodes?
+             | {use_shared_peers, boolean()}]}).%% use broadcasted peers?
 %% shared_peers reflects the peers broadcast from remote nodes. Note
 %% that the state term itself doesn't change, which is relevant for
 %% the stateless application callbacks since the state is retrieved
@@ -145,7 +145,7 @@
                  :: match(op_state() | {op_state(), wd_state()}),
          started = now(),      %% at process start
          conn = false :: match(boolean() | pid())}).
-                      %% true at accept, pid() at connection_up (connT key)
+                      %% true at accepted, pid() at connection_up or reopen
 
 %% Record representing a peer_fsm process.
 -record(conn,
@@ -524,8 +524,6 @@ handle_info(T, #state{} = S) ->
     case transition(T,S) of
         ok ->
             {noreply, S};
-        #state{} = NS ->
-            {noreply, NS};
         {stop, Reason} ->
             {stop, {shutdown, Reason}, S}
     end;
@@ -542,15 +540,26 @@ transition({accepted, Pid, TPid}, S) ->
 
 %% Peer process has a new open connection.
 transition({connection_up, Pid, T}, S) ->
-    connection_up(Pid, T, S);
+    connection_up(Pid, T, S),
+    ok;
+
+%% Peer process has a new connection that will be opened after
+%% watchdog exchange. This message was added long after connection_up,
+%% to communicate the information as soon as it's available. Leave
+%% connection_up as is it for now, duplicated information and all.
+transition({reopen, Pid, T}, S) ->
+    reopen(Pid, T, S),
+    ok;
 
 %% Peer process has left state open.
 transition({connection_down, Pid}, S) ->
-    connection_down(Pid, S);
+    connection_down(Pid, S),
+    ok;
 
 %% Peer process has returned to state open.
 transition({connection_up, Pid}, S) ->
-    connection_up(Pid, S);
+    connection_up(Pid, S),
+    ok;
 
 %% Accepting transport has lost connectivity.
 transition({close, Pid}, S) ->
@@ -599,7 +608,8 @@ transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
 %% Local peer process has died.
 transition({'DOWN', _, process, Pid, Reason}, S)
   when node(Pid) == node() ->
-    peer_down(Pid, Reason, S);
+    peer_down(Pid, Reason, S),
+    ok;
 
 %% Remote service wants to know about shared transports.
 transition({service, Pid}, S) ->
@@ -775,16 +785,20 @@ shutdown(#state{peerT = PeerT}) ->
 wait(Fun, T) ->
     diameter_lib:wait(ets:foldl(Fun, [], T)).
 
-st(#peer{conn = B}, Acc)
-  when is_boolean(B) ->
-    Acc;
-st(#peer{conn = Pid}, Acc) ->
+st(#peer{op_state = {OS,_}} = P, Acc) ->
+    st(P#peer{op_state = OS}, Acc);
+st(#peer{op_state = ?STATE_UP, conn = Pid}, Acc) ->
     Pid ! shutdown,
-    [Pid | Acc].
+    [Pid | Acc];
+st(#peer{}, Acc) ->
+    Acc.
 
-sw(#peer{pid = Pid}, Acc) ->
+sw(#peer{pid = Pid}, Acc)
+  when is_pid(Pid) ->
     exit(Pid, shutdown),
-    [Pid | Acc].
+    [Pid | Acc];
+sw(#peer{}, Acc) ->
+    Acc.
 
 %%% ---------------------------------------------------------------------------
 %%% # call_service/2
@@ -999,8 +1013,6 @@ fetch(Tid, Key) ->
 
 %%% ---------------------------------------------------------------------------
 %%% # connection_up/3
-%%%
-%%% Output: #state{}
 %%% ---------------------------------------------------------------------------
 
 %% Peer process has reached the open state.
@@ -1017,10 +1029,30 @@ connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT,
     insert(ConnT, C),
     connection_up([Pkt], P#peer{conn = TPid}, C, S).
 
+%%% ---------------------------------------------------------------------------
+%%% # reopen/3
+%%% ---------------------------------------------------------------------------
+
+%% Note that this connection_up/3 rewrites the same #conn{} now
+%% written here. Both do so in case reopen has not happened in old
+%% code.
+
+reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT,
+                                                connT = ConnT}) ->
+    P = fetch(PeerT, Pid),
+    C = #conn{pid = TPid,
+              apps = SApps,
+              caps = Caps,
+              peer = Pid},
+
+    insert(ConnT, C),
+    #peer{op_state = {?STATE_DOWN, _}}
+        = P,
+    insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN},
+                         conn = TPid}).
+
 %%% ---------------------------------------------------------------------------
 %%% # connection_up/2
-%%%
-%%% Output: #state{}
 %%% ---------------------------------------------------------------------------
 
 %% Peer process has transitioned back into the open state. Note that there
@@ -1050,8 +1082,7 @@ connection_up(T, P, C, #state{peerT = PeerT,
 
     request_peer_up(TPid),
     insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
-    report_status(up, P, C, S, T),
-    S.
+    report_status(up, P, C, S, T).
 
 insert_local_peer(SApps, T, LDict) ->
     lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
@@ -1093,8 +1124,6 @@ peer_cb(MFA, Alias) ->
 
 %%% ---------------------------------------------------------------------------
 %%% # connection_down/2
-%%%
-%%% Output: #state{}
 %%% ---------------------------------------------------------------------------
 
 %% Peer process has transitioned out of the open state.
@@ -1113,8 +1142,8 @@ connection_down(Pid, #state{peerT = PeerT,
 
 %% connection_down/3
 
-connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, S) ->
-    S;
+connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) ->
+    ok;
 
 connection_down(#peer{conn = TPid,
                       op_state = {?STATE_UP, _}}
@@ -1128,8 +1157,7 @@ connection_down(#peer{conn = TPid,
                 = S) ->
     report_status(down, P, C, S, []),
     remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
-    request_peer_down(TPid, S),
-    S.
+    request_peer_down(TPid, S).
 
 remove_local_peer(SApps, T, LDict) ->
     lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
@@ -1148,8 +1176,6 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) ->
 
 %%% ---------------------------------------------------------------------------
 %%% # peer_down/3
-%%%
-%%% Output: #state{}
 %%% ---------------------------------------------------------------------------
 
 %% Peer process has died.
@@ -1173,11 +1199,11 @@ closed(_, _, _) ->
     ok.
 
 %% The peer has never come up ...
-peer_down(#peer{conn = B}, S)
+peer_down(#peer{conn = B}, _)
   when is_boolean(B) ->
-    S;
+    ok;
 
-%% ... or it has.
+%% ... or maybe it has.
 peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
     #conn{} = C = fetch(ConnT, TPid),
     ets:delete_object(ConnT, C),
@@ -3167,6 +3193,9 @@ peer_acc(ConnT, Acc, #peer{pid = Pid,
                  | info_conn(ConnT, TPid, WS /= ?WD_DOWN)],
                 Acc).
 
+info_conn(ConnT, [TPid], B) ->
+    info_conn(ConnT, TPid, B);
+
 info_conn(ConnT, TPid, true)
   when is_pid(TPid) ->
     try ets:lookup(ConnT, TPid) of
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 1045d9ad9f..b37a1a10e9 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -272,12 +272,15 @@ transition({open, TPid, Hosts, T} = Open,
 
 transition({open = P, TPid, _Hosts, T},
            #watchdog{transport = TPid,
+                     parent = Pid,
                      status = down}
            = S) ->
     %% Store the info we need to notify the parent to reopen the
     %% connection after the requisite DWA's are received, at which
-    %% time we eraser(open).
+    %% time we eraser(open). The reopen message is a later addition,
+    %% to communicate the new capabilities as soon as they're known.
     putr(P, {TPid, T}),
+    Pid ! {reopen, self(), {TPid, T}},
     set_watchdog(send_watchdog(S#watchdog{status = reopen,
                                           num_dwa = 0}));
 
-- 
cgit v1.2.3