From 0107a1af4a4325c502fe2632c7d0f4dcf3da98ba Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Tue, 16 Dec 2008 17:03:47 +0000
Subject: [PATCH] Assured Replication: - all unit tests for safe data mode - assured replication code corrections (thanks to safe data unit tests) => Still every unit tests for safe read mode to do...

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  172 +++++++++++++++++++++++++++++++++++---------------------
 1 files changed, 107 insertions(+), 65 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index cf68944..9408c24 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -276,10 +276,10 @@
         // According to assured sub-mode, prepare structures to keep track of
         // the acks we are interested in.
         AssuredMode assuredMode = update.getAssuredMode();
-        if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+        if (assuredMode == AssuredMode.SAFE_DATA_MODE)
         {
           preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
-        } else if (assuredMode != AssuredMode.SAFE_READ_MODE)
+        } else if (assuredMode == AssuredMode.SAFE_READ_MODE)
         {
           preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
         } else
@@ -525,7 +525,15 @@
         for (ServerHandler handler : replicationServers.values())
         {
           if (handler.getGroupId() == groupId)
-            expectedServers.add(handler.getServerId());
+            // No ack expected from a RS with different group id
+          {
+            if ((generationId > 0) &&
+              (generationId == handler.getGenerationId()))
+              // No ack expected from a RS with bad gen id
+            {
+              expectedServers.add(handler.getServerId());
+            }
+          }
         }
       }
 
@@ -538,13 +546,29 @@
           continue;
         }
         if (handler.getGroupId() == groupId)
+          // No ack expected from a DS with different group id
         {
-          if (handler.getStatus() == ServerStatus.NORMAL_STATUS)
+          ServerStatus serverStatus = handler.getStatus();
+          if (serverStatus == ServerStatus.NORMAL_STATUS)
           {
             expectedServers.add(handler.getServerId());
           } else
+            // No ack expected from a DS with wrong status
           {
-            wrongStatusServers.add(handler.getServerId());
+            if (serverStatus == ServerStatus.DEGRADED_STATUS)
+            {
+              wrongStatusServers.add(handler.getServerId());
+            } else
+            {
+              /*
+               * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
+               * We do not want this to be reported as an error to the update
+               * maker -> no pollution or potential missunderstanding when
+               * reading logs or monitoring and it was just administration (for
+               * instance new server is being configured in topo: it goes in bad
+               * gen then then full full update).
+               */
+            }
           }
         }
       }
@@ -589,7 +613,7 @@
     UpdateMsg update, ServerHandler sourceHandler) throws IOException
   {
     ChangeNumber cn = update.getChangeNumber();
-    boolean interestedInAcks = true;
+    boolean interestedInAcks = false;
     byte safeDataLevel = update.getSafeDataLevel();
     byte groupId = replicationServer.getGroupId();
     byte sourceGroupId = sourceHandler.getGroupId();
@@ -600,47 +624,55 @@
         Short.toString(replicationServer.getServerId()),
         Byte.toString(safeDataLevel), baseDn, update.toString());
       logError(errorMsg);
-      interestedInAcks = false;
     } else if (sourceGroupId != groupId)
     {
       // Assured feature does not cross different group ids
-      interestedInAcks = false;
     } else
     {
-      if (sourceHandler.isLDAPserver())
+      if ((generationId > 0) &&
+        (generationId == sourceHandler.getGenerationId()))
+        // Ignore assured updates from wrong generationid servers
       {
-        if (safeDataLevel == (byte) 1)
+        if (sourceHandler.isLDAPserver())
         {
-          // Immediatly return the ack for an assured message in safe data mode
-          // with safe data level 1, coming from a DS. No need to wait for more
-          // acks
-          AckMsg ack = new AckMsg(cn);
-          sourceHandler.sendAck(ack);
-          interestedInAcks = false; // No further acks to obtain
+          if (safeDataLevel == (byte) 1)
+          {
+            // Immediatly return the ack for an assured message in safe data
+            // mode with safe data level 1, coming from a DS. No need to wait
+            // for more acks
+            AckMsg ack = new AckMsg(cn);
+            sourceHandler.sendAck(ack);
+          } else
+          {
+            if (safeDataLevel != (byte) 0)
+            {
+              // level > 1 : We need further acks
+              // The message will be posted in assured mode to elligible
+              // servers. The embedded safe data level is not changed, and his
+              // value will be used by a remote RS to determine if he must send
+              // an ack (level > 1) or not (level = 1)
+              interestedInAcks = true;
+            } else
+            {
+              // Should never happen
+            }
+          }
         } else
-        {
-          // level > 1 : We need further acks
-          // The message will be posted in assured mode to elligible servers.
-          // The embedded safe data level is not changed, and his value will be
-          // used by a remote RS to determine if he must send an ack (level > 1)
-          // or not (level = 1)
-        }
-      } else
-      { // A RS sent us the safe data message, for sure no futher acks to wait
-        interestedInAcks = false;
-        if (safeDataLevel == (byte) 1)
-        {
-          // The original level was 1 so the RS that sent us this message should
-          // have already sent his ack to the sender DS. Level 1 has already
-          // been reached so no further acks to wait
-          // This should not happen in theory as the sender RS server should
-          // have sent us a matching not assured message so we should not come
-          // to here.
-        } else
-        {
-          // level > 1, so Ack this message to originator RS
-          AckMsg ack = new AckMsg(cn);
-          sourceHandler.sendAck(ack);
+        { // A RS sent us the safe data message, for sure no futher acks to wait
+          if (safeDataLevel == (byte) 1)
+          {
+            // The original level was 1 so the RS that sent us this message
+            // should have already sent his ack to the sender DS. Level 1 has
+            // already been reached so no further acks to wait.
+            // This should not happen in theory as the sender RS server should
+            // have sent us a matching not assured message so we should not come
+            // to here.
+          } else
+          {
+            // level > 1, so Ack this message to originator RS
+            AckMsg ack = new AckMsg(cn);
+            sourceHandler.sendAck(ack);
+          }
         }
       }
     }
@@ -654,39 +686,46 @@
         for (ServerHandler handler : replicationServers.values())
         {
           if (handler.getGroupId() == groupId)
-            expectedServers.add(handler.getServerId());
+            // No ack expected from a RS with different group id
+          {
+            if ((generationId > 0) &&
+              (generationId == handler.getGenerationId()))
+              // No ack expected from a RS with bad gen id
+            {
+              expectedServers.add(handler.getServerId());
+            }
+          }
         }
       }
-
-      // Look for DS elligible for assured
-      for (ServerHandler handler : directoryServers.values())
-      {
-        // Don't forward the change to the server that just sent it
-        if (handler == sourceHandler)
-        {
-          continue;
-        }
-        if (handler.getGroupId() == groupId)
-          expectedServers.add(handler.getServerId());
-      }
     }
 
     // Return computed structures
     PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
-    if (interestedInAcks && (expectedServers.size() > 0))
+    int nExpectedServers = expectedServers.size();
+    if (interestedInAcks) // interestedInAcks so level > 1
     {
-      // Some other acks to wait for
-      preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
-        sourceHandler, update.getSafeDataLevel());
-      preparedAssuredInfo.expectedServers = expectedServers;
-    }
-
-    if (interestedInAcks && (preparedAssuredInfo.expectedServers == null))
-    {
-      // level > 1 and source is a DS but no elligible servers found, send the
-      // ack immediatly
-      AckMsg ack = new AckMsg(cn);
-      sourceHandler.sendAck(ack);
+      if (nExpectedServers > 0)
+      {
+        // Some other acks to wait for
+        int sdl = update.getSafeDataLevel();
+        int neededAdditionalServers = sdl - 1;
+        // Change the number of expected acks if not enough available elligible
+        // servers: the level is a best effort thing, we do not want to timeout
+        // at every assured SD update for instance if a RS has had his gen id
+        // resetted
+        byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ?
+          (byte)sdl : // Keep level as it was
+          (byte)(nExpectedServers+1)); // Change level to match what's available
+        preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
+          sourceHandler, finalSdl, expectedServers);
+        preparedAssuredInfo.expectedServers = expectedServers;
+      } else
+      {
+        // level > 1 and source is a DS but no elligible servers found, send the
+        // ack immediatly
+        AckMsg ack = new AckMsg(cn);
+        sourceHandler.sendAck(ack);
+      }
     }
 
     return preparedAssuredInfo;
@@ -1436,6 +1475,9 @@
    */
   public void shutdown()
   {
+    // Terminate the assured timer
+    assuredTimeoutTimer.cancel();
+
     // Close session with other changelogs
     for (ServerHandler serverHandler : replicationServers.values())
     {

--
Gitblit v1.10.0