From aacb8bbf0a764ce8eb205e0f6376c055b3e1baa8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 03 Apr 2014 13:30:34 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3314) Re-implement changelog purging logic

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java |   10 +-
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                     |   86 ++++++++++++++-------
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java                             |   35 ++------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java                  |   91 ++++++++++++++--------
 4 files changed, 131 insertions(+), 91 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index a106a98..6c22e5c 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -37,7 +37,6 @@
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.server.changelog.api.*;
 import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
 import org.opends.server.types.*;
@@ -218,23 +217,19 @@
    * Synchronously purges the change number index DB up to and excluding the
    * provided timestamp.
    *
-   * @param purgeTimestamp
+   * @param purgeCSN
    *          the timestamp up to which purging must happen
-   * @return the {@link MultiDomainServerState} object that drives purging the
-   *         replicaDBs.
+   * @return the oldest non purged CSN.
    * @throws ChangelogException
    *           if a database problem occurs.
    */
-  public MultiDomainServerState purgeUpTo(long purgeTimestamp)
-      throws ChangelogException
+  public CSN purgeUpTo(CSN purgeCSN) throws ChangelogException
   {
-    if (isEmpty())
+    if (isEmpty() || purgeCSN == null)
     {
       return null;
     }
 
-    final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
-
     final DraftCNDBCursor cursor = db.openDeleteCursor();
     try
     {
@@ -245,21 +240,18 @@
         {
           oldestChangeNumber = record.getChangeNumber();
         }
-        if (record.getChangeNumber() == newestChangeNumber)
-        {
-          // do not purge the newest record to avoid having the last generated
-          // changenumber dropping back to 0 if the server restarts
-          return getPurgeCookie(record);
-        }
 
-        if (record.getCSN().isOlderThan(purgeCSN))
+        if (record.getChangeNumber() != newestChangeNumber
+            && record.getCSN().isOlderThan(purgeCSN))
         {
           cursor.delete();
         }
         else
         {
-          // Current record is not old enough to purge.
-          return getPurgeCookie(record);
+          // 1- Current record is not old enough to purge.
+          // 2- Do not purge the newest record to avoid having the last
+          // generated changenumber dropping back to 0 when the server restarts
+          return record.getCSN();
         }
       }
 
@@ -281,13 +273,6 @@
     }
   }
 
-  private MultiDomainServerState getPurgeCookie(
-      final ChangeNumberIndexRecord record) throws DirectoryException
-  {
-    // Do not include the record's CSN to avoid having it purged
-    return new MultiDomainServerState(record.getPreviousCookie());
-  }
-
   /**
    * Clear the changes from this DB (from both memory cache and DB storage) for
    * the provided baseDN.
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index f18bd1e..4241fce 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,7 +27,6 @@
 
 import java.io.File;
 import java.util.*;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,7 +39,6 @@
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
@@ -64,7 +62,7 @@
 public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB
 {
   /** The tracer object for the debug logger. */
-  protected static final DebugTracer TRACER = getTracer();
+  private static final DebugTracer TRACER = getTracer();
 
   /**
    * This map contains the List of updates received from each LDAP server.
@@ -846,6 +844,7 @@
    */
   private final class ChangelogDBPurger extends DirectoryThread
   {
+    private static final int DEFAULT_SLEEP = 500;
 
     protected ChangelogDBPurger()
     {
@@ -862,42 +861,58 @@
       {
         try
         {
-          final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
-          if (localCNIndexDB == null)
-          { // shutdown has been called
-            return;
-          }
-
           final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
-          final MultiDomainServerState purgeUpToCookie =
-              localCNIndexDB.purgeUpTo(purgeTimestamp);
-          if (purgeUpToCookie == null)
-          { // this can happen when the change number index DB is empty
-            continue;
+          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+          final CSN oldestNotPurgedCSN;
+
+          // next code assumes that the compute-change-number config
+          // never changes during the life time of an RS
+          if (!config.isComputeChangeNumber())
+          {
+            oldestNotPurgedCSN = purgeCSN;
+          }
+          else
+          {
+            final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+            if (localCNIndexDB == null)
+            { // shutdown has been initiated
+              return;
+            }
+
+            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
+            if (oldestNotPurgedCSN == null)
+            { // shutdown may have been initiated...
+              if (!isShutdownInitiated())
+              {
+                // ... or the change number index DB is empty,
+                // wait for new changes to come in.
+
+                // Note we cannot sleep for as long as the purge delay
+                // (3 days default), because we might receive late updates
+                // that will have to be purged before the purge delay elapses.
+                // This can particularly happen in case of network partitions.
+                sleep(DEFAULT_SLEEP);
+              }
+              continue;
+            }
           }
 
-          /*
-           * Drive purge of the replica DBs by the oldest non purged cookie in
-           * the change number index DB.
-           */
-          for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
-              : domainToReplicaDBs.entrySet())
+          for (final Map<Integer, JEReplicaDB> domainMap
+              : domainToReplicaDBs.values())
           {
-            final DN baseDN = entry1.getKey();
-            final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
-            for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
+            for (final JEReplicaDB replicaDB : domainMap.values())
             {
-              final Integer serverId = entry2.getKey();
-              final JEReplicaDB replicaDB = entry2.getValue();
-              replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
+              replicaDB.purgeUpTo(oldestNotPurgedCSN);
             }
           }
 
           latestPurgeDate = purgeTimestamp;
 
-          // purge delay is specified in seconds so it should not be a problem
-          // to sleep for 500 millis
-          sleep(500);
+          sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+        }
+        catch (InterruptedException e)
+        {
+          // shutdown initiated?
         }
         catch (Exception e)
         {
@@ -910,5 +925,18 @@
         }
       }
     }
+
+    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
+    {
+      final long nextPurgeTime = notPurgedCSN.getTime();
+      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
+      if (currentPurgeTime <= nextPurgeTime)
+      {
+        // sleep till the next CSN to purge,
+        return nextPurgeTime - currentPurgeTime;
+      }
+      // wait a bit before purging more
+      return DEFAULT_SLEEP;
+    }
   }
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index 05e27d7..51e7a8e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -372,7 +372,7 @@
 
     // Test CNIndexDB is purged when replication change log is purged
     final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
-    cnIndexDB.purgeUpTo(Long.MAX_VALUE);
+    cnIndexDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
     assertTrue(cnIndexDB.isEmpty());
     ECLPurgeCNIndexDBAfterChangelogClear();
 
@@ -2514,44 +2514,69 @@
   {
     String tn = "ECLCompatTestLimits";
     debugInfo(tn, "Starting test\n\n");
-
-    LDIFWriter ldifWriter = getLDIFWriter();
-
-    // search on 'cn=changelog'
-    Set<String> attributes = new LinkedHashSet<String>();
-    if (expectedFirst > 0)
-      attributes.add("firstchangenumber");
-    attributes.add("lastchangenumber");
-    attributes.add("changelog");
-    attributes.add("lastExternalChangelogCookie");
-
     debugInfo(tn, " Search: rootDSE");
-    final InternalSearchOperation searchOp = searchOnRootDSE(attributes);
-    final List<SearchResultEntry> entries = searchOp.getSearchEntries();
-    assertThat(entries).hasSize(1);
-    debugAndWriteEntries(ldifWriter, entries, tn);
 
-    final SearchResultEntry resultEntry = entries.get(0);
-    if (eclEnabled)
-    {
-      if (expectedFirst > 0)
-        checkValue(resultEntry, "firstchangenumber", String.valueOf(expectedFirst));
-      checkValue(resultEntry, "lastchangenumber", String.valueOf(expectedLast));
-      checkValue(resultEntry, "changelog", String.valueOf("cn=changelog"));
-      assertNotNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
-    }
-    else
-    {
-      if (expectedFirst > 0)
-        assertNull(getAttributeValue(resultEntry, "firstchangenumber"));
-      assertNull(getAttributeValue(resultEntry, "lastchangenumber"));
-      assertNull(getAttributeValue(resultEntry, "changelog"));
-      assertNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
-    }
+    final List<SearchResultEntry> entries =
+        assertECLLimits(eclEnabled, expectedFirst, expectedLast);
 
+    debugAndWriteEntries(getLDIFWriter(), entries, tn);
     debugInfo(tn, "Ending test with success");
   }
 
+  private List<SearchResultEntry> assertECLLimits(
+      boolean eclEnabled, int expectedFirst, int expectedLast) throws Exception
+  {
+    AssertionError e = null;
+
+    int count = 0;
+    while (count < 30)
+    {
+      count++;
+
+      try
+      {
+        final Set<String> attributes = new LinkedHashSet<String>();
+        if (expectedFirst > 0)
+          attributes.add("firstchangenumber");
+        attributes.add("lastchangenumber");
+        attributes.add("changelog");
+        attributes.add("lastExternalChangelogCookie");
+
+        final InternalSearchOperation searchOp = searchOnRootDSE(attributes);
+        final List<SearchResultEntry> entries = searchOp.getSearchEntries();
+        assertThat(entries).hasSize(1);
+
+        final SearchResultEntry resultEntry = entries.get(0);
+        if (eclEnabled)
+        {
+          if (expectedFirst > 0)
+            checkValue(resultEntry, "firstchangenumber", String.valueOf(expectedFirst));
+          checkValue(resultEntry, "lastchangenumber", String.valueOf(expectedLast));
+          checkValue(resultEntry, "changelog", String.valueOf("cn=changelog"));
+          assertNotNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
+        }
+        else
+        {
+          if (expectedFirst > 0)
+            assertNull(getAttributeValue(resultEntry, "firstchangenumber"));
+          assertNull(getAttributeValue(resultEntry, "lastchangenumber"));
+          assertNull(getAttributeValue(resultEntry, "changelog"));
+          assertNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
+        }
+        return entries;
+      }
+      catch (AssertionError ae)
+      {
+        // try again to see if changes have been persisted
+        e = ae;
+      }
+
+      Thread.sleep(100);
+    }
+    assertNotNull(e);
+    throw e;
+  }
+
   private InternalSearchOperation searchOnRootDSE(Set<String> attributes)
       throws Exception
   {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 5fb4500..10aad5f 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -76,8 +76,8 @@
    * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
    * </ol>
    */
-  @Test()
-  void testTrim() throws Exception
+  @Test
+  void testPurge() throws Exception
   {
     ReplicationServer replicationServer = null;
     try
@@ -254,8 +254,10 @@
   {
     TestCaseUtils.startServer();
     final int port = TestCaseUtils.findFreePort();
-    return new ReplicationServer(
-        new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null));
+    final ReplServerFakeConfiguration cfg =
+        new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null);
+    cfg.setComputeChangeNumber(true);
+    return new ReplicationServer(cfg);
   }
 
   private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,

--
Gitblit v1.10.0