From ae41fb531bbbd1bc8f9f6a82eb41c4eeb2da63c4 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Mon, 30 May 2011 15:20:19 +0000
Subject: [PATCH] Resolve several issues with the External Changelog with regards to Cookies and changes with updates and purging. More specifically these changes are resolving the following issues : OPENDJ-57 - ECL: lastChangeNumber and firstChangeNumber reset to zero when the changelog is purged to empty OPENDJ-172 - External ChangeLog Cookie varies when searching with an empty cookie. Cookie should be reproducible. OPENDJ-173 - External ChangeLog cookies content is altered by Change purging and prevents from continuing search with a previous returned cookie.

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java       |    2 
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java        |   21 +++--
 opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java        |    8 +
 opends/src/server/org/opends/server/replication/server/ReplicationDB.java           |  110 +++++++++++++++++++++++++++
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |   23 -----
 opends/src/server/org/opends/server/replication/common/ServerState.java             |   27 ++++++
 opends/src/server/org/opends/server/replication/server/DbHandler.java               |    7 +
 opends/src/server/org/opends/server/replication/server/DraftCNDB.java               |   28 +++++++
 8 files changed, 193 insertions(+), 33 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index e8fea1c..ff19e57 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -518,4 +518,31 @@
   {
     return saved;
   }
+
+  /**
+   * Build a copy of the ServerState with only ChangeNumbers older than
+   * a specific ChangeNumber. This is used when building the initial
+   * Cookie in the External Changelog, to cope with purged changes.
+   * @param cn The ChangeNumber to compare the ServerState with
+   * @return a copy of the ServerState which only contains the ChangeNumbers
+   *         older than cn.
+   */
+  public ServerState duplicateOnlyOlderThan(ChangeNumber cn)
+  {
+    ServerState newState = new ServerState();
+    synchronized (list)
+    {
+      for (Integer key  : list.keySet())
+      {
+        ChangeNumber change = list.get(key);
+        Integer id =  change.getServerId();
+        if (change.older(cn))
+        {
+          newState.list.put(id,change);
+        }
+      }
+    }
+    return newState;
+  }
+
 }
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 43cdff7..cfb83e4 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -466,6 +466,13 @@
 
     ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0);
 
+    // Find the last changeNumber before the trimDate, in the Database.
+    ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate);
+    if (lastBeforeTrimDate != null)
+    {
+      // If we found it, we want to stop trimming when reaching it.
+      trimDate = lastBeforeTrimDate;
+    }
     // In case of deadlock detection by the Database, this thread can
     // by aborted by a DeadlockException. This is a transient error and
     // the transaction should be attempted again.
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index f42a956..85af56e 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -668,6 +668,34 @@
     }
 
     /**
+     * Getter for the integer value of the current curson, representing
+     * the current DraftChangeNumber being processed.
+     *
+     * @return the current DraftCN as an integer.
+     */
+    public int currentKey()
+    {
+       try
+      {
+        OperationStatus status =
+          cursor.getCurrent(key, entry, LockMode.DEFAULT);
+
+        if (status != OperationStatus.SUCCESS)
+        {
+          return -1;
+        }
+        String str = decodeUTF8(key.getData());
+        int draftCN = new Integer(str);
+        return draftCN;
+      }
+      catch(Exception e)
+      {
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
+      return -1;
+    }
+
+    /**
      * Returns the replication changeNumber associated with the current key.
      * @return the replication changeNumber
      */
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index dcf15e6..ef4df7c 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -389,18 +389,22 @@
 
               // We don't use the returned endState but it's updating CN as
               // reading
-              domain.getEligibleState(crossDomainEligibleCN, false);
+              domain.getEligibleState(crossDomainEligibleCN);
 
               ChangeNumber fcn = startState.getMaxChangeNumber(
                   cn.getServerId());
 
-              if (cn.older(fcn))
+              int currentKey = cursor.currentKey();
+              // Do not delete the lastKey. This should allow us to
+              // preserve last change number over time.
+              if ((currentKey != lastkey) && (cn.older(fcn)))
               {
                 size++;
                 cursor.delete();
               }
               else
               {
+                firstkey = currentKey;
                 finished = true;
               }
             }
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 624c058..3ba19c3 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -182,8 +182,8 @@
               + ")" +
               "] [nextNonEligibleMsg="      + nextNonEligibleMsg +
               "] [startState=" + startState +
-              "] [stopState= " + stopState +
-              "] [currentState= " + currentState + "]]");
+              "] [stopState=" + stopState +
+              "] [currentState=" + currentState + "]]");
     }
 
     /**
@@ -749,7 +749,7 @@
           if (isPersistent ==
             StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
           {
-            newDomainCtxt.startState = rsd.getEligibleState(eligibleCN, true);
+            newDomainCtxt.startState = rsd.getEligibleState(eligibleCN);
             startStatesFromProvidedCookie.remove(rsd.getBaseDn());
           }
           else
@@ -766,7 +766,12 @@
               // let's start traversing this domain from the beginning of
               // what we have in the replication changelog
               if (newDomainCtxt.startState == null)
-                newDomainCtxt.startState = new ServerState();
+              {
+                ChangeNumber latestTrimCN =
+                    new ChangeNumber(newDomainCtxt.domainLatestTrimDate, 0,0);
+                newDomainCtxt.startState = rsd.getStartState()
+                        .duplicateOnlyOlderThan(latestTrimCN);
+              }
             }
             else
             {
@@ -807,7 +812,7 @@
             }
 
             // Set the stop state for the domain from the eligibleCN
-            newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN, true);
+            newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
           }
           newDomainCtxt.currentState = new ServerState();
 
@@ -822,9 +827,8 @@
           rsd.registerHandler(mh);
           newDomainCtxt.mh = mh;
 
-          previousCookie.update(
-              newDomainCtxt.rsd.getBaseDn(),
-              newDomainCtxt.startState);
+          previousCookie.update(newDomainCtxt.rsd.getBaseDn(),
+                                newDomainCtxt.startState);
 
           // store the new context
           tmpSet.add(newDomainCtxt);
@@ -1052,6 +1056,7 @@
           ResultCode.UNWILLING_TO_PERFORM,
           ERR_INVALID_COOKIE_SYNTAX.get());
     }
+
     excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs();
     replicationServer.disableEligibility(excludedServiceIDs);
     eligibleCN = replicationServer.getEligibleCN();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index b512b7e..2a6c676 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -450,6 +450,12 @@
            */
           cn = null;
         }
+        else
+        {
+          str = decodeUTF8(key.getData());
+          cn= new ChangeNumber(str);
+          // There can't be 2 counter record next to each other
+        }
       }
     }
     catch (DatabaseException e)
@@ -470,6 +476,110 @@
   }
 
   /**
+   * Try to find in the DB, the change number right before the one
+   * passed as a parameter.
+   *
+   * @param changeNumber
+   *          The changeNumber from which we start searching.
+   * @return the changeNumber right before the one passed as a parameter.
+   *         Can return null if there is none.
+   */
+  public ChangeNumber getPreviousChangeNumber(ChangeNumber changeNumber)
+  {
+
+    if (changeNumber == null)
+      return null;
+
+    Cursor cursor = null;
+    ChangeNumber cn = null;
+
+    DatabaseEntry key = new ReplicationKey(changeNumber);
+    DatabaseEntry data = new DatabaseEntry();
+
+    Transaction txn = null;
+
+    dbCloseLock.readLock().lock();
+    try
+    {
+      cursor = db.openCursor(txn, null);
+      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) ==
+              OperationStatus.SUCCESS)
+      {
+        // We can move close to the changeNumber.
+        // Let's move to the previous change.
+        if (cursor.getPrev(key, data, LockMode.DEFAULT) ==
+                OperationStatus.SUCCESS)
+        {
+          String str = decodeUTF8(key.getData());
+          cn = new ChangeNumber(str);
+          if (ReplicationDB.isaCounter(cn))
+          {
+            if (cursor.getPrev(key, data,
+                 LockMode.DEFAULT) != OperationStatus.SUCCESS)
+            {
+              // database starts with a counter record.
+              cn = null;
+            }
+            else
+            {
+              str = decodeUTF8(key.getData());
+              cn= new ChangeNumber(str);
+              // There can't be 2 counter record next to each other
+            }
+          }
+        }
+        // else, there was no change previous to our changeNumber.
+      }
+      else
+      {
+        // We could not move the cursor past to the changeNumber
+        // Check if the last change is older than changeNumber
+        if (cursor.getLast(key, data, LockMode.DEFAULT) ==
+                OperationStatus.SUCCESS)
+        {
+          String str = decodeUTF8(key.getData());
+          cn = new ChangeNumber(str);
+          if (ReplicationDB.isaCounter(cn))
+          {
+            if (cursor.getPrev(key, data,
+              LockMode.DEFAULT) != OperationStatus.SUCCESS)
+            {
+              /*
+               * database only contain a counter record, should not be
+               * possible, but Ok, let's just say no change Number
+               */
+              cn = null;
+            }
+            else
+            {
+              str = decodeUTF8(key.getData());
+              cn= new ChangeNumber(str);
+              // There can't be 2 counter record next to each other
+            }
+          }
+        }
+      }
+    }
+    catch (DatabaseException e)
+    {
+      /* database is faulty */
+      MessageBuilder mb = new MessageBuilder();
+      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+      mb.append(stackTraceToSingleLineString(e));
+      logError(mb.toMessage());
+      // TODO: Verify if shutting down replication is the right thing to do
+      replicationServer.shutdown();
+      cn = null;
+    }
+    finally
+    {
+      closeLockedCursor(cursor);
+    }
+    return cn;
+  }
+
+
+  /**
    * {@inheritDoc}
    */
   @Override
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index c1a33a4..a21b51b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1977,7 +1977,7 @@
           continue;
 
         result.update(rsd.getBaseDn(), rsd.getEligibleState(
-            getEligibleCN(),false));
+            getEligibleCN()));
       }
     }
     return result;
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index f7182f6..64cc575 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -3153,12 +3153,9 @@
    * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
    *
    * @param eligibleCN              The provided eligibleCN.
-   * @param allowOlderThanPurgeDate When true, the returned state can be older
-   *                                than the purge date of the domain.
    * @return The computed eligible server state.
    */
-  public ServerState getEligibleState(ChangeNumber eligibleCN,
-      boolean allowOlderThanPurgeDate)
+  public ServerState getEligibleState(ChangeNumber eligibleCN)
   {
     ServerState result = new ServerState();
 
@@ -3225,24 +3222,6 @@
       }
     }
 
-    if (allowOlderThanPurgeDate == false)
-    {
-      boolean domainPurged = true;
-      long latestDomainTrimDate = getLatestDomainTrimDate();
-      Iterator<Integer> it = result.iterator();
-      while (it.hasNext())
-      {
-        int sid = it.next();
-        ChangeNumber cn = result.getMaxChangeNumber(sid);
-        if ((cn.getTime()>0) && (cn.getTime()<latestDomainTrimDate))
-          result.update(new ChangeNumber(0,0,sid));
-        else
-          domainPurged = false;
-      }
-      if (domainPurged == true)
-        result.clear();
-    }
-
     if (debugEnabled())
       TRACER.debugInfo("In " + this
         + " getEligibleState() result is " + result);

--
Gitblit v1.10.0