From 819f74758a1c464bbf578e70ca8592cc8d101d75 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Apr 2014 09:51:11 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3304) Re-implement changelog purging logic

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |  189 +++++++++++++++++++++++++++++++++-------------
 1 files changed, 135 insertions(+), 54 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index fbd19e8..f18bd1e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -21,12 +21,13 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.server.changelog.je;
 
 import java.io.File;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,9 +36,11 @@
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.api.DirectoryThread;
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
@@ -46,6 +49,7 @@
 import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
 
 import com.forgerock.opendj.util.Pair;
 
@@ -87,7 +91,7 @@
    * The handler of the changelog database, the database stores the relation
    * between a change number and the associated cookie.
    * <p>
-   * Guarded by cnIndexDBLock
+   * @GuardedBy("cnIndexDBLock")
    */
   private JEChangeNumberIndexDB cnIndexDB;
   private final AtomicReference<ChangeNumberIndexer> cnIndexer =
@@ -96,6 +100,15 @@
   /** Used for protecting {@link ChangeNumberIndexDB} related state. */
   private final Object cnIndexDBLock = new Object();
 
+  /**
+   * The purge delay (in milliseconds). Records in the changelog DB that are
+   * older than this delay might be removed. Read by the purger thread.
+   */
+  private volatile long purgeDelayInMillis;
+  private final AtomicReference<ChangelogDBPurger> cnPurger =
+      new AtomicReference<ChangelogDBPurger>();
+  private volatile long latestPurgeDate;
+
   /** The local replication server. */
   private final ReplicationServer replicationServer;
   private AtomicBoolean shutdown = new AtomicBoolean();
@@ -312,13 +325,9 @@
       initializeChangelogState(changelogState);
       if (config.isComputeChangeNumber())
       {
-        final ChangeNumberIndexer indexer =
-            new ChangeNumberIndexer(this, changelogState);
-        if (cnIndexer.compareAndSet(null, indexer))
-        {
-          indexer.start();
-        }
+        startIndexer(changelogState);
       }
+      setPurgeDelay(replicationServer.getPurgeDelay());
     }
     catch (ChangelogException e)
     {
@@ -374,12 +383,17 @@
     // - then throw the first encountered exception
     ChangelogException firstException = null;
 
-    final ChangeNumberIndexer indexer = cnIndexer.get();
+    final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
     if (indexer != null)
     {
       indexer.initiateShutdown();
-      cnIndexer.compareAndSet(indexer, null);
     }
+    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
+    if (purger != null)
+    {
+      purger.initiateShutdown();
+    }
+
     try
     {
       shutdownCNIndexDB();
@@ -581,7 +595,7 @@
       {
         try
         {
-          cnIndexDB.clear(baseDN);
+          cnIndexDB.removeDomain(baseDN);
         }
         catch (ChangelogException e)
         {
@@ -607,7 +621,9 @@
         firstException = e;
       }
       else if (debugEnabled())
+      {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
     }
 
     if (firstException != null)
@@ -618,18 +634,24 @@
 
   /** {@inheritDoc} */
   @Override
-  public void setPurgeDelay(long delay)
+  public void setPurgeDelay(long purgeDelayInMillis)
   {
-    final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
-    if (cnIndexDB != null)
+    this.purgeDelayInMillis = purgeDelayInMillis; // read by the purger thread
+    final ChangelogDBPurger purger;
+    if (purgeDelayInMillis > 0) // a positive delay enables purging
     {
-      cnIndexDB.setPurgeDelay(delay);
-    }
-    for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
-    {
-      for (JEReplicaDB replicaDB : domainMap.values())
+      purger = new ChangelogDBPurger();
+      if (cnPurger.compareAndSet(null, purger))
       {
-        replicaDB.setPurgeDelay(delay);
+        purger.start();
+      } // otherwise a purger was already running
+    }
+    else
+    {
+      purger = cnPurger.getAndSet(null); // unregister the running purger
+      if (purger != null)
+      {
+        purger.initiateShutdown();
       }
     }
   }
@@ -639,19 +661,13 @@
   public void setComputeChangeNumber(boolean computeChangeNumber)
       throws ChangelogException
   {
-    final ChangeNumberIndexer indexer;
     if (computeChangeNumber)
     {
-      final ChangelogState changelogState = dbEnv.readChangelogState();
-      indexer = new ChangeNumberIndexer(this, changelogState);
-      if (cnIndexer.compareAndSet(null, indexer))
-      {
-        indexer.start();
-      }
+      startIndexer(dbEnv.readChangelogState());
     }
     else
     {
-      indexer = cnIndexer.getAndSet(null);
+      final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
       if (indexer != null)
       {
         indexer.initiateShutdown();
@@ -659,48 +675,34 @@
     }
   }
 
+  private void startIndexer(final ChangelogState changelogState)
+  { // builds a new indexer and starts it only if none was registered yet
+    final ChangeNumberIndexer newIndexer =
+        new ChangeNumberIndexer(this, changelogState);
+    if (cnIndexer.compareAndSet(null, newIndexer))
+    {
+      newIndexer.start();
+    }
+  }
+
   /** {@inheritDoc} */
   @Override
   public long getDomainLatestTrimDate(DN baseDN)
   {
-    long latest = 0;
-    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
-    {
-      if (latest == 0 || latest < replicaDB.getLatestTrimDate())
-      {
-        latest = replicaDB.getLatestTrimDate();
-      }
-    }
-    return latest;
+    return latestPurgeDate; // set by the purger for all domains; baseDN unused
   }
 
   /** {@inheritDoc} */
   @Override
   public ChangeNumberIndexDB getChangeNumberIndexDB()
   {
-    return getChangeNumberIndexDB(true);
-  }
-
-  /**
-   * Returns the {@link ChangeNumberIndexDB} object.
-   *
-   * @param startTrimmingThread
-   *          whether the trimming thread should be started
-   * @return the {@link ChangeNumberIndexDB} object
-   */
-  ChangeNumberIndexDB getChangeNumberIndexDB(boolean startTrimmingThread)
-  {
     synchronized (cnIndexDBLock)
     {
       if (cnIndexDB == null)
       {
         try
         {
-          cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
-          if (startTrimmingThread)
-          {
-            cnIndexDB.startTrimmingThread();
-          }
+          cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
         }
         catch (Exception e)
         {
@@ -830,4 +832,83 @@
     }
     // TODO save this state in the changelogStateDB?
   }
+
+  /**
+   * The thread purging the changelogDB on a regular interval. Records are
+   * purged from the changelogDB if they are older than the purge delay held
+   * in {@code purgeDelayInMillis}. The purge process works in two steps:
+   * <ol>
+   * <li>first purge the changeNumberIndexDB and retrieve information to drive
+   * replicaDBs purging</li>
+   * <li>proceed to purge each replicaDBs based on the information collected
+   * when purging the changeNumberIndexDB</li>
+   * </ol>
+   */
+  private final class ChangelogDBPurger extends DirectoryThread
+  {
+
+    protected ChangelogDBPurger()
+    {
+      super("changelog DB purger");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void run()
+    {
+      // initialize CNIndexDB
+      getChangeNumberIndexDB();
+      while (!isShutdownInitiated())
+      {
+        try
+        {
+          final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+          if (localCNIndexDB == null)
+          { // shutdown has been called
+            return;
+          }
+
+          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
+          final MultiDomainServerState purgeUpToCookie =
+              localCNIndexDB.purgeUpTo(purgeTimestamp);
+          if (purgeUpToCookie != null)
+          { // null can happen when the change number index DB is empty
+            // Drive purge of the replica DBs by the oldest non purged cookie
+            // in the change number index DB.
+            for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
+                : domainToReplicaDBs.entrySet())
+            {
+              final DN baseDN = entry1.getKey();
+              final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
+              for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
+              {
+                final Integer serverId = entry2.getKey();
+                final JEReplicaDB replicaDB = entry2.getValue();
+                replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
+              }
+            }
+            latestPurgeDate = purgeTimestamp;
+          }
+
+          // Always pause between two rounds, including when the change number
+          // index DB was empty, otherwise this loop would spin without rest.
+          sleep(500);
+        }
+        catch (InterruptedException e)
+        { // asked to stop: restore the interrupt status and exit quietly
+          Thread.currentThread().interrupt();
+          return;
+        }
+        catch (Exception e)
+        {
+          logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+              .get(stackTraceToSingleLineString(e)));
+          if (replicationServer != null)
+          {
+            replicationServer.shutdown();
+          }
+        }
+      }
+    }
+  }
 }

--
Gitblit v1.10.0