From aba87ac47f81475c93ab6f439f247176334b1ef0 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Thu, 09 Jun 2011 21:24:05 +0000
Subject: [PATCH] Fix issue OPENDJ-67: Investigate ECL change number consistency across replications servers which have been subjected to different purging policies.

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                     |   14 ++++
 opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java                     |   17 -----
 opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java                      |   39 ++++++++++++-
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java |    3 
 opends/src/server/org/opends/server/replication/server/DbHandler.java                             |   10 ++
 opends/src/server/org/opends/server/replication/server/DraftCNDB.java                             |   63 ++++++++------------
 6 files changed, 85 insertions(+), 61 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index b147246..1ab48e1 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -654,8 +654,14 @@
   public int getCount(ChangeNumber from, ChangeNumber to)
   {
     int c=0;
-    flush();
-    c = db.count(from, to);
+    // Now that we always keep the last ChangeNumber in the DB to avoid
+    // expiring cookies to quickly, we need to check if the "to"
+    // is older than the trim date.
+    if ((to == null) || !to.older(new ChangeNumber(latestTrimDate, 0, 0)))
+    {
+      flush();
+      c = db.count(from, to);
+    }
     return c;
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index d060a5b..c9f23e5 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -321,7 +321,7 @@
     private final Transaction txn;
     private final DatabaseEntry key;
     private final DatabaseEntry entry;
-
+    private DraftCNData seqnumData = null;
     private boolean isClosed = false;
 
 
@@ -367,18 +367,22 @@
             }
             else
             {
-              // We can move close to the startingChangeNumber.
-              // Let's create a cursor from that point.
-              DatabaseEntry key = new DatabaseEntry();
-              DatabaseEntry data = new DatabaseEntry();
-              if (localCursor.getPrev(
-                  key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS)
+              if (localCursor.getPrev(key, entry, LockMode.DEFAULT)
+                      != OperationStatus.SUCCESS)
               {
                 localCursor.close();
                 localCursor = db.openCursor(localTxn, null);
               }
+              else
+              {
+                 seqnumData =  new DraftCNData(entry.getData());
+              }
             }
           }
+          else
+          {
+            seqnumData = new DraftCNData(entry.getData());
+          }
         }
 
         this.txn = localTxn;
@@ -514,15 +518,10 @@
     {
       try
       {
-        OperationStatus status =
-          cursor.getCurrent(key, entry, LockMode.DEFAULT);
-
-        if (status != OperationStatus.SUCCESS)
+        if (seqnumData != null)
         {
-          return null;
+          return seqnumData.getValue();
         }
-        DraftCNData seqnumData = new DraftCNData(entry.getData());
-        return seqnumData.getValue();
       }
       catch(Exception e)
       {
@@ -539,15 +538,10 @@
     {
       try
       {
-        OperationStatus status =
-          cursor.getCurrent(key, entry, LockMode.DEFAULT);
-
-        if (status != OperationStatus.SUCCESS)
+        if (seqnumData != null)
         {
-          return null;
+          return seqnumData.getServiceID();
         }
-        DraftCNData seqnumData = new DraftCNData(entry.getData());
-        return seqnumData.getServiceID();
       }
       catch(Exception e)
       {
@@ -557,7 +551,7 @@
     }
 
     /**
-     * Getter for the integer value of the current curson, representing
+     * Getter for the integer value of the current cursor, representing
      * the current DraftChangeNumber being processed.
      *
      * @return the current DraftCN as an integer.
@@ -566,13 +560,6 @@
     {
        try
       {
-        OperationStatus status =
-          cursor.getCurrent(key, entry, LockMode.DEFAULT);
-
-        if (status != OperationStatus.SUCCESS)
-        {
-          return -1;
-        }
         String str = decodeUTF8(key.getData());
         int draftCN = new Integer(str);
         return draftCN;
@@ -592,16 +579,10 @@
     {
       try
       {
-        OperationStatus status =
-          cursor.getCurrent(key, entry, LockMode.DEFAULT);
-
-        if (status != OperationStatus.SUCCESS)
+        if (seqnumData != null)
         {
-          return null;
+          return seqnumData.getChangeNumber();
         }
-        DraftCNData seqnumData =
-          new DraftCNData(entry.getData());
-        return seqnumData.getChangeNumber();
       }
       catch(Exception e)
       {
@@ -620,8 +601,16 @@
       OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
       if (status != OperationStatus.SUCCESS)
       {
+        seqnumData = null;
         return false;
       }
+      try {
+        seqnumData = new DraftCNData(entry.getData());
+      }
+      catch(Exception e)
+      {
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
       return true;
     }
 
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index 9230497..0af06df 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -50,6 +50,8 @@
 import org.opends.server.types.InitializationException;
 
 import com.sleepycat.je.DatabaseException;
+import java.util.HashMap;
+import org.opends.server.replication.common.MultiDomainServerState;
 
 /**
  * This class is used for managing the replicationServer database for each
@@ -385,14 +387,45 @@
 
           int currentKey = cursor.currentKey();
 
-          // Do not delete the lastKey. This should allow us to
-          // preserve last change number over time.
-          if ((currentKey != lastkey) && (cn.older(fcn)))
+          if (cn.older(fcn))
           {
             cursor.delete();
             continue;
           }
 
+          ServerState cnVector = null;
+          try
+          {
+            HashMap<String,ServerState> cnStartStates =
+                MultiDomainServerState.splitGenStateToServerStates(
+                        cursor.currentValue());
+            cnVector = cnStartStates.get(serviceID);
+
+            if (debugEnabled())
+              TRACER.debugInfo("DraftCNDBHandler:clear() - ChangeVector:"+
+                      cnVector.toString()+
+                      " -- StartState:"+startState.toString());
+            // cnVector.update(cn);
+          }
+          catch(Exception e)
+          {
+            // We couldn't parse the mdss from the DraftCNData Value
+            assert(false);
+            cursor.delete();
+            continue;
+          }
+
+          if ((cnVector == null)
+                  || ((cnVector.getMaxChangeNumber(cn.getServerId()) != null)
+                      && !cnVector.cover(startState)))
+          {
+            cursor.delete();
+            if (debugEnabled())
+              TRACER.debugInfo("DraftCNDBHandler:clear() - deleted "+
+                      cn.toString()+"Not covering startState");
+            continue;
+          }
+
           firstkey = currentKey;
           cursor.close();
           return;
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java b/opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java
index cd858e7..2c6e2fd 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDbIterator.java
@@ -68,23 +68,6 @@
   }
 
   /**
-   * Getter for the value field (external changelog cookie).
-   * @return The value field (external changelog cookie).
-   */
-  public String getValue()
-  {
-    try
-    {
-      return this.draftCNDbCursor.currentValue();
-    }
-    catch(Exception e)
-    {
-      TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      return null;
-    }
-  }
-
-  /**
    * Getter for the serviceID field.
    * @return The service ID.
    */
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 5c96d23..437c634 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1860,6 +1860,7 @@
 
     int firstDraftCN;
     int lastDraftCN;
+    Boolean dbEmpty = false;
     Long newestDate = 0L;
     DraftCNDbHandler draftCNDbH = this.getDraftCNDbHandler();
 
@@ -1870,6 +1871,7 @@
     String domainForLastSeqnum = null;
     if (firstDraftCN < 1)
     {
+      dbEmpty = true;
       firstDraftCN = 0;
       lastDraftCN = 0;
     }
@@ -1912,8 +1914,11 @@
         if (domainsServerStateForLastSeqnum == null)
         {
           // Count changes of this domain from the beginning of the changelog
+          ChangeNumber trimCN =
+              new ChangeNumber(rsd.getLatestDomainTrimDate(), 0,0);
           ec = rsd.getEligibleCount(
-              new ServerState(), crossDomainEligibleCN);
+                    rsd.getStartState().duplicateOnlyOlderThan(trimCN),
+                    crossDomainEligibleCN);
         }
         else
         {
@@ -1947,6 +1952,13 @@
           firstDraftCN = 1;
       }
     }
+    if (dbEmpty)
+    {
+      // The database was empty, just keep increasing numbers since last time
+      // we generated one DraftCN.
+      firstDraftCN += lastGeneratedDraftCN;
+      lastDraftCN += lastGeneratedDraftCN;
+    }
     return new int[]{firstDraftCN, lastDraftCN};
   }
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index 41e93e4..0910f0d 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -587,7 +588,7 @@
       }
       else
       {
-        expectedCnt = 1;
+        expectedCnt = 0;
       }
       debugInfo(tn,testcase + " actualCnt=" + actualCnt);
       assertEquals(actualCnt, expectedCnt, testcase);

--
Gitblit v1.10.0