From 819f74758a1c464bbf578e70ca8592cc8d101d75 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Apr 2014 09:51:11 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3304) Re-implement changelog purging logic

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java |  327 ++++++++++++++++--------------------------------------
 1 files changed, 98 insertions(+), 229 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index 5f1b63e..a106a98 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -28,29 +28,21 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.DirectoryThread;
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.api.*;
 import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
 import org.opends.server.types.*;
 
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
 
 /**
  * This class is used for managing the replicationServer database for each
@@ -62,7 +54,7 @@
  * This class publishes some monitoring information below <code>
  * cn=monitor</code>.
  */
-public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable
+public class JEChangeNumberIndexDB implements ChangeNumberIndexDB
 {
   /**
    * The tracer object for the debug logger.
@@ -74,7 +66,7 @@
   /** FIXME What is this field used for? */
   private volatile long oldestChangeNumber = NO_KEY;
   /**
-   * The newest changenumber stored in the DB. It is used to avoid trimming the
+   * The newest changenumber stored in the DB. It is used to avoid purging the
    * record with the newest changenumber. The newest record in the changenumber
    * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
    * then retrieved on server startup.
@@ -86,48 +78,25 @@
    * condition between:
    * <ol>
    * <li>this atomic long being incremented for a new record ('recordB')</li>
-   * <li>the current newest record ('recordA') being trimmed from the DB</li>
+   * <li>the current newest record ('recordA') being purged from the DB</li>
    * <li>'recordB' failing to be inserted in the DB</li>
    * </ol>
    */
   private final AtomicLong lastGeneratedChangeNumber;
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
   private final AtomicBoolean shutdown = new AtomicBoolean(false);
-  /**
-   * A dedicated thread loops trim().
-   * <p>
-   * trim() : deletes from the DB a number of changes that are older than a
-   * certain date.
-   */
-  private DirectoryThread trimmingThread;
-  /**
-   * The trim age in milliseconds. Changes record in the change DB that are
-   * older than this age are removed.
-   * <p>
-   * FIXME it never gets updated even when the replication server purge delay is
-   * updated
-   */
-  private volatile long trimAge;
-
-  private ReplicationServer replicationServer;
 
 
   /**
    * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
    *
-   * @param replicationServer The ReplicationServer that creates this instance.
-   * @param dbenv the Database Env to use to create the ReplicationServer DB.
+   * @param dbEnv the Database Env to use to create the ReplicationServer DB.
    * server for this domain.
    * @throws ChangelogException If a database problem happened
    */
-  public JEChangeNumberIndexDB(ReplicationServer replicationServer,
-      ReplicationDbEnv dbenv) throws ChangelogException
+  public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
   {
-    this.replicationServer = replicationServer;
-    this.trimAge = replicationServer.getTrimAge();
-
-    // DB initialization
-    db = new DraftCNDB(dbenv);
+    db = new DraftCNDB(dbEnv);
     final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
     final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
     oldestChangeNumber = getChangeNumber(oldestRecord);
@@ -142,16 +111,6 @@
     DirectoryServer.registerMonitorProvider(dbMonitor);
   }
 
-  /**
-   * Creates and starts the thread trimming the CNIndexDB.
-   */
-  public void startTrimmingThread()
-  {
-    trimmingThread =
-        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
-    trimmingThread.start();
-  }
-
   private long getChangeNumber(ChangeNumberIndexRecord record)
       throws ChangelogException
   {
@@ -251,77 +210,82 @@
       notifyAll();
     }
 
-    if (trimmingThread != null)
-    {
-      try
-      {
-        trimmingThread.join();
-      }
-      catch (InterruptedException ignored)
-      {
-        // Nothing can be done about it, just proceed
-      }
-    }
-
     db.shutdown();
     DirectoryServer.deregisterMonitorProvider(dbMonitor);
   }
 
   /**
-   * Run method for this class.
-   * Periodically Flushes the ReplicationServerDomain cache from memory to the
-   * stable storage and trims the old updates.
+   * Synchronously purges the change number index DB up to and excluding the
+   * provided timestamp.
+   *
+   * @param purgeTimestamp
+   *          the timestamp up to which purging must happen
+   * @return the {@link MultiDomainServerState} object that drives purging the
+   *         replicaDBs.
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
-  @Override
-  public void run()
+  public MultiDomainServerState purgeUpTo(long purgeTimestamp)
+      throws ChangelogException
   {
-    while (!shutdown.get())
+    if (isEmpty())
     {
-      try {
-        trim(shutdown);
+      return null;
+    }
 
-        synchronized (this)
-        {
-          if (!shutdown.get())
-          {
-            try
-            {
-              wait(1000);
-            }
-            catch (InterruptedException e)
-            {
-              Thread.currentThread().interrupt();
-            }
-          }
-        }
-      }
-      catch (Exception end)
+    final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+
+    final DraftCNDBCursor cursor = db.openDeleteCursor();
+    try
+    {
+      while (!mustShutdown(shutdown) && cursor.next())
       {
-        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
-            .get(stackTraceToSingleLineString(end)));
-        if (replicationServer != null)
+        final ChangeNumberIndexRecord record = cursor.currentRecord();
+        if (record.getChangeNumber() != oldestChangeNumber)
         {
-          replicationServer.shutdown();
+          oldestChangeNumber = record.getChangeNumber();
         }
-        break;
+        if (record.getChangeNumber() == newestChangeNumber)
+        {
+          // do not purge the newest record to avoid having the last generated
+          // changenumber dropping back to 0 if the server restarts
+          return getPurgeCookie(record);
+        }
+
+        if (record.getCSN().isOlderThan(purgeCSN))
+        {
+          cursor.delete();
+        }
+        else
+        {
+          // Current record is not old enough to purge.
+          return getPurgeCookie(record);
+        }
       }
+
+      return null;
+    }
+    catch (ChangelogException e)
+    {
+      cursor.abort();
+      throw e;
+    }
+    catch (Exception e)
+    {
+      cursor.abort();
+      throw new ChangelogException(e);
+    }
+    finally
+    {
+      cursor.close();
     }
   }
 
-  /**
-   * Trim old changes from this database.
-   *
-   * @param shutdown
-   *          AtomicBoolean telling whether the current run must be stopped
-   * @throws ChangelogException
-   *           In case of database problem.
-   */
-  public void trim(AtomicBoolean shutdown) throws ChangelogException
+  private MultiDomainServerState getPurgeCookie(
+      final ChangeNumberIndexRecord record) throws DirectoryException
   {
-    if (trimAge == 0)
-      return;
-
-    clear(null, shutdown);
+    // Do not include the record's CSN to avoid having it purged
+    return new MultiDomainServerState(record.getPreviousCookie());
   }
 
   /**
@@ -334,127 +298,49 @@
    * @throws ChangelogException
    *           if a database problem occurs.
    */
-  public void clear(DN baseDNToClear) throws ChangelogException
-  {
-    clear(baseDNToClear, null);
-  }
-
-  private void clear(DN baseDNToClear, AtomicBoolean shutdown)
-      throws ChangelogException
+  public void removeDomain(DN baseDNToClear) throws ChangelogException
   {
     if (isEmpty())
     {
       return;
     }
 
-    for (int i = 0; i < 100; i++)
+    final DraftCNDBCursor cursor = db.openDeleteCursor();
+    try
     {
-      if (mustShutdown(shutdown))
+      boolean isOldestRecord = true;
+      while (!mustShutdown(shutdown) && cursor.next())
       {
-        return;
-      }
-      final DraftCNDBCursor cursor = db.openDeleteCursor();
-      try
-      {
-        for (int j = 0; j < 50; j++)
+        final ChangeNumberIndexRecord record = cursor.currentRecord();
+        if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
         {
-          // let's traverse the CNIndexDB
-          if (mustShutdown(shutdown) || !cursor.next())
-          {
-            cursor.close();
-            return;
-          }
-
-          final ChangeNumberIndexRecord record = cursor.currentRecord();
-          if (record.getChangeNumber() != oldestChangeNumber)
-          {
-            oldestChangeNumber = record.getChangeNumber();
-          }
-          if (record.getChangeNumber() == newestChangeNumber)
-          {
-            // do not trim the newest record to avoid having the last generated
-            // changenumber dropping back to 0 if the server restarts
-            cursor.close();
-            return;
-          }
-
-          if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN()))
-          {
-            cursor.delete();
-            continue;
-          }
-
-          final ReplicationServerDomain domain =
-              replicationServer.getReplicationServerDomain(record.getBaseDN());
-          if (domain == null)
-          {
-            // the domain has been removed since the record was written in the
-            // CNIndexDB, thus it makes no sense to keep this record in the DB.
-            cursor.delete();
-            continue;
-          }
-
-          // FIXME there is an opportunity for a phantom record in the CNIndexDB
-          // if the replicaDB gets purged after call to domain.getOldestState().
-          final CSN csn = record.getCSN();
-          final ServerState oldestState = domain.getOldestState();
-          final CSN fcsn = oldestState.getCSN(csn.getServerId());
-          if (csn.isOlderThan(fcsn))
-          {
-            // This change which has already been purged from the corresponding
-            // replicaDB => purge it from CNIndexDB
-            cursor.delete();
-            continue;
-          }
-
-          ServerState csnVector;
-          try
-          {
-            Map<DN, ServerState> csnStartStates =
-                MultiDomainServerState.splitGenStateToServerStates(
-                        record.getPreviousCookie());
-            csnVector = csnStartStates.get(record.getBaseDN());
-
-            if (debugEnabled())
-              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
-                  + csnVector + " -- StartState:" + oldestState);
-          }
-          catch(Exception e)
-          {
-            // We could not parse the MultiDomainServerState from the record
-            // FIXME this is quite an aggressive delete()
-            cursor.delete();
-            continue;
-          }
-
-          if (csnVector == null
-              || (csnVector.getCSN(csn.getServerId()) != null
-                    && !csnVector.cover(oldestState)))
-          {
-            cursor.delete();
-            if (debugEnabled())
-              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn
-                  + "Not covering startState");
-            continue;
-          }
-
           oldestChangeNumber = record.getChangeNumber();
-          cursor.close();
+        }
+        if (record.getChangeNumber() == newestChangeNumber)
+        {
+          // do not purge the newest record to avoid having the last generated
+          // changenumber dropping back to 0 if the server restarts
           return;
         }
 
-        cursor.close();
+        if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear))
+        {
+          cursor.delete();
+        }
+        else
+        {
+          isOldestRecord = false;
+        }
       }
-      catch (ChangelogException e)
-      {
-        cursor.abort();
-        throw e;
-      }
-      catch (Exception e)
-      {
-        cursor.abort();
-        throw new ChangelogException(e);
-      }
+    }
+    catch (ChangelogException e)
+    {
+      cursor.abort();
+      throw e;
+    }
+    finally
+    {
+      cursor.close();
     }
   }
 
@@ -469,9 +355,7 @@
    */
   private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
   {
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override
     public List<Attribute> getMonitorData()
     {
@@ -509,18 +393,14 @@
       return 0;
     }
 
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override
     public String getMonitorInstanceName()
     {
       return "ChangeNumber Index Database";
     }
 
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override
     public void initializeMonitorProvider(MonitorProviderCfg configuration)
                             throws ConfigException,InitializationException
@@ -529,9 +409,7 @@
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public String toString()
   {
@@ -540,15 +418,6 @@
   }
 
   /**
-   * Set the Purge delay for this db Handler.
-   * @param delay The purge delay in Milliseconds.
-   */
-  public void setPurgeDelay(long delay)
-  {
-    trimAge = delay;
-  }
-
-  /**
    * Clear the changes from this DB (from both memory cache and DB storage).
    *
    * @throws ChangelogException

--
Gitblit v1.10.0