From 819f74758a1c464bbf578e70ca8592cc8d101d75 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Apr 2014 09:51:11 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3304) Re-implement changelog purging logic

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java |   84 +++--
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                              |    6 
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                     |  189 ++++++++---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java                                 |   45 -
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                               |   22 
 opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java                                       |  129 ++------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java   |    5 
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java                             |  327 ++++++--------------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java           |   47 +-
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java                                     |    9 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java                  |   44 +-
 11 files changed, 395 insertions(+), 512 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 97cb8e0..bf00104 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -722,7 +722,7 @@
    * @return  The time after which changes must be deleted from the
    *          persistent storage (in milliseconds).
    */
-  public long getTrimAge()
+  public long getPurgeDelay()
   {
     return this.config.getReplicationPurgeDelay() * 1000;
   }
@@ -780,7 +780,7 @@
     final long newPurgeDelay = config.getReplicationPurgeDelay();
     if (newPurgeDelay != oldConfig.getReplicationPurgeDelay())
     {
-      this.changelogDB.setPurgeDelay(getTrimAge());
+      this.changelogDB.setPurgeDelay(getPurgeDelay());
     }
     final boolean computeCN = config.isComputeChangeNumber();
     if (computeCN != oldConfig.isComputeChangeNumber())
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 33eaa30..0489065 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -487,13 +487,6 @@
 
           // OK, the oldest change is older than the medium consistency point
           // let's publish it to the CNIndexDB.
-
-          // Next if statement is ugly but ensures the first change will not be
-          // immediately trimmed from the CNIndexDB. Yuck!
-          if (mediumConsistencyRUV.isEmpty())
-          {
-            mediumConsistencyRUV.replace(baseDN, new ServerState());
-          }
           final String previousCookie = mediumConsistencyRUV.toString();
           final ChangeNumberIndexRecord record =
               new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
@@ -620,20 +613,21 @@
   }
 
   /**
-   * Asks the current thread to clear its state.
+   * Asks the current thread to clear its state and blocks until state is
+   * cleared.
    * <p>
    * This method is only useful for unit tests.
    */
   public void clear()
   {
     doClear.set(true);
-    synchronized (this)
+    while (doClear.get() && !State.TERMINATED.equals(getState()))
     {
-      notify();
-    }
-    while (doClear.get())
-    {
-      // wait until clear() has been done by thread
+      // wait until clear() has been done by thread, always waking it up
+      synchronized (this)
+      {
+        notify();
+      }
       // ensures unit tests wait that this thread's state is cleaned up
       Thread.yield();
     }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index 5f1b63e..a106a98 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -28,29 +28,21 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.DirectoryThread;
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.api.*;
 import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
 import org.opends.server.types.*;
 
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
 
 /**
  * This class is used for managing the replicationServer database for each
@@ -62,7 +54,7 @@
  * This class publishes some monitoring information below <code>
  * cn=monitor</code>.
  */
-public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable
+public class JEChangeNumberIndexDB implements ChangeNumberIndexDB
 {
   /**
    * The tracer object for the debug logger.
@@ -74,7 +66,7 @@
   /** FIXME What is this field used for? */
   private volatile long oldestChangeNumber = NO_KEY;
   /**
-   * The newest changenumber stored in the DB. It is used to avoid trimming the
+   * The newest changenumber stored in the DB. It is used to avoid purging the
    * record with the newest changenumber. The newest record in the changenumber
    * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
    * then retrieved on server startup.
@@ -86,48 +78,25 @@
    * condition between:
    * <ol>
    * <li>this atomic long being incremented for a new record ('recordB')</li>
-   * <li>the current newest record ('recordA') being trimmed from the DB</li>
+   * <li>the current newest record ('recordA') being purged from the DB</li>
    * <li>'recordB' failing to be inserted in the DB</li>
    * </ol>
    */
   private final AtomicLong lastGeneratedChangeNumber;
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
   private final AtomicBoolean shutdown = new AtomicBoolean(false);
-  /**
-   * A dedicated thread loops trim().
-   * <p>
-   * trim() : deletes from the DB a number of changes that are older than a
-   * certain date.
-   */
-  private DirectoryThread trimmingThread;
-  /**
-   * The trim age in milliseconds. Changes record in the change DB that are
-   * older than this age are removed.
-   * <p>
-   * FIXME it never gets updated even when the replication server purge delay is
-   * updated
-   */
-  private volatile long trimAge;
-
-  private ReplicationServer replicationServer;
 
 
   /**
    * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
    *
-   * @param replicationServer The ReplicationServer that creates this instance.
-   * @param dbenv the Database Env to use to create the ReplicationServer DB.
+   * @param dbEnv the Database Env to use to create the ReplicationServer DB.
    * server for this domain.
    * @throws ChangelogException If a database problem happened
    */
-  public JEChangeNumberIndexDB(ReplicationServer replicationServer,
-      ReplicationDbEnv dbenv) throws ChangelogException
+  public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
   {
-    this.replicationServer = replicationServer;
-    this.trimAge = replicationServer.getTrimAge();
-
-    // DB initialization
-    db = new DraftCNDB(dbenv);
+    db = new DraftCNDB(dbEnv);
     final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
     final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
     oldestChangeNumber = getChangeNumber(oldestRecord);
@@ -142,16 +111,6 @@
     DirectoryServer.registerMonitorProvider(dbMonitor);
   }
 
-  /**
-   * Creates and starts the thread trimming the CNIndexDB.
-   */
-  public void startTrimmingThread()
-  {
-    trimmingThread =
-        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
-    trimmingThread.start();
-  }
-
   private long getChangeNumber(ChangeNumberIndexRecord record)
       throws ChangelogException
   {
@@ -251,77 +210,82 @@
       notifyAll();
     }
 
-    if (trimmingThread != null)
-    {
-      try
-      {
-        trimmingThread.join();
-      }
-      catch (InterruptedException ignored)
-      {
-        // Nothing can be done about it, just proceed
-      }
-    }
-
     db.shutdown();
     DirectoryServer.deregisterMonitorProvider(dbMonitor);
   }
 
   /**
-   * Run method for this class.
-   * Periodically Flushes the ReplicationServerDomain cache from memory to the
-   * stable storage and trims the old updates.
+   * Synchronously purges the change number index DB up to and excluding the
+   * provided timestamp.
+   *
+   * @param purgeTimestamp
+   *          the timestamp up to which purging must happen
+   * @return the {@link MultiDomainServerState} object that drives purging the
+   *         replicaDBs.
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
-  @Override
-  public void run()
+  public MultiDomainServerState purgeUpTo(long purgeTimestamp)
+      throws ChangelogException
   {
-    while (!shutdown.get())
+    if (isEmpty())
     {
-      try {
-        trim(shutdown);
+      return null;
+    }
 
-        synchronized (this)
-        {
-          if (!shutdown.get())
-          {
-            try
-            {
-              wait(1000);
-            }
-            catch (InterruptedException e)
-            {
-              Thread.currentThread().interrupt();
-            }
-          }
-        }
-      }
-      catch (Exception end)
+    final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+
+    final DraftCNDBCursor cursor = db.openDeleteCursor();
+    try
+    {
+      while (!mustShutdown(shutdown) && cursor.next())
       {
-        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
-            .get(stackTraceToSingleLineString(end)));
-        if (replicationServer != null)
+        final ChangeNumberIndexRecord record = cursor.currentRecord();
+        if (record.getChangeNumber() != oldestChangeNumber)
         {
-          replicationServer.shutdown();
+          oldestChangeNumber = record.getChangeNumber();
         }
-        break;
+        if (record.getChangeNumber() == newestChangeNumber)
+        {
+          // do not purge the newest record to avoid having the last generated
+          // changenumber dropping back to 0 if the server restarts
+          return getPurgeCookie(record);
+        }
+
+        if (record.getCSN().isOlderThan(purgeCSN))
+        {
+          cursor.delete();
+        }
+        else
+        {
+          // Current record is not old enough to purge.
+          return getPurgeCookie(record);
+        }
       }
+
+      return null;
+    }
+    catch (ChangelogException e)
+    {
+      cursor.abort();
+      throw e;
+    }
+    catch (Exception e)
+    {
+      cursor.abort();
+      throw new ChangelogException(e);
+    }
+    finally
+    {
+      cursor.close();
     }
   }
 
-  /**
-   * Trim old changes from this database.
-   *
-   * @param shutdown
-   *          AtomicBoolean telling whether the current run must be stopped
-   * @throws ChangelogException
-   *           In case of database problem.
-   */
-  public void trim(AtomicBoolean shutdown) throws ChangelogException
+  private MultiDomainServerState getPurgeCookie(
+      final ChangeNumberIndexRecord record) throws DirectoryException
   {
-    if (trimAge == 0)
-      return;
-
-    clear(null, shutdown);
+    // Do not include the record's CSN to avoid having it purged
+    return new MultiDomainServerState(record.getPreviousCookie());
   }
 
   /**
@@ -334,127 +298,49 @@
    * @throws ChangelogException
    *           if a database problem occurs.
    */
-  public void clear(DN baseDNToClear) throws ChangelogException
-  {
-    clear(baseDNToClear, null);
-  }
-
-  private void clear(DN baseDNToClear, AtomicBoolean shutdown)
-      throws ChangelogException
+  public void removeDomain(DN baseDNToClear) throws ChangelogException
   {
     if (isEmpty())
     {
       return;
     }
 
-    for (int i = 0; i < 100; i++)
+    final DraftCNDBCursor cursor = db.openDeleteCursor();
+    try
     {
-      if (mustShutdown(shutdown))
+      boolean isOldestRecord = true;
+      while (!mustShutdown(shutdown) && cursor.next())
       {
-        return;
-      }
-      final DraftCNDBCursor cursor = db.openDeleteCursor();
-      try
-      {
-        for (int j = 0; j < 50; j++)
+        final ChangeNumberIndexRecord record = cursor.currentRecord();
+        if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
         {
-          // let's traverse the CNIndexDB
-          if (mustShutdown(shutdown) || !cursor.next())
-          {
-            cursor.close();
-            return;
-          }
-
-          final ChangeNumberIndexRecord record = cursor.currentRecord();
-          if (record.getChangeNumber() != oldestChangeNumber)
-          {
-            oldestChangeNumber = record.getChangeNumber();
-          }
-          if (record.getChangeNumber() == newestChangeNumber)
-          {
-            // do not trim the newest record to avoid having the last generated
-            // changenumber dropping back to 0 if the server restarts
-            cursor.close();
-            return;
-          }
-
-          if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN()))
-          {
-            cursor.delete();
-            continue;
-          }
-
-          final ReplicationServerDomain domain =
-              replicationServer.getReplicationServerDomain(record.getBaseDN());
-          if (domain == null)
-          {
-            // the domain has been removed since the record was written in the
-            // CNIndexDB, thus it makes no sense to keep this record in the DB.
-            cursor.delete();
-            continue;
-          }
-
-          // FIXME there is an opportunity for a phantom record in the CNIndexDB
-          // if the replicaDB gets purged after call to domain.getOldestState().
-          final CSN csn = record.getCSN();
-          final ServerState oldestState = domain.getOldestState();
-          final CSN fcsn = oldestState.getCSN(csn.getServerId());
-          if (csn.isOlderThan(fcsn))
-          {
-            // This change which has already been purged from the corresponding
-            // replicaDB => purge it from CNIndexDB
-            cursor.delete();
-            continue;
-          }
-
-          ServerState csnVector;
-          try
-          {
-            Map<DN, ServerState> csnStartStates =
-                MultiDomainServerState.splitGenStateToServerStates(
-                        record.getPreviousCookie());
-            csnVector = csnStartStates.get(record.getBaseDN());
-
-            if (debugEnabled())
-              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
-                  + csnVector + " -- StartState:" + oldestState);
-          }
-          catch(Exception e)
-          {
-            // We could not parse the MultiDomainServerState from the record
-            // FIXME this is quite an aggressive delete()
-            cursor.delete();
-            continue;
-          }
-
-          if (csnVector == null
-              || (csnVector.getCSN(csn.getServerId()) != null
-                    && !csnVector.cover(oldestState)))
-          {
-            cursor.delete();
-            if (debugEnabled())
-              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn
-                  + "Not covering startState");
-            continue;
-          }
-
           oldestChangeNumber = record.getChangeNumber();
-          cursor.close();
+        }
+        if (record.getChangeNumber() == newestChangeNumber)
+        {
+          // do not purge the newest record to avoid having the last generated
+          // changenumber dropping back to 0 if the server restarts
           return;
         }
 
-        cursor.close();
+        if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear))
+        {
+          cursor.delete();
+        }
+        else
+        {
+          isOldestRecord = false;
+        }
       }
-      catch (ChangelogException e)
-      {
-        cursor.abort();
-        throw e;
-      }
-      catch (Exception e)
-      {
-        cursor.abort();
-        throw new ChangelogException(e);
-      }
+    }
+    catch (ChangelogException e)
+    {
+      cursor.abort();
+      throw e;
+    }
+    finally
+    {
+      cursor.close();
     }
   }
 
@@ -469,9 +355,7 @@
    */
   private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
   {
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override
     public List<Attribute> getMonitorData()
     {
@@ -509,18 +393,14 @@
       return 0;
     }
 
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override
     public String getMonitorInstanceName()
     {
       return "ChangeNumber Index Database";
     }
 
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override
     public void initializeMonitorProvider(MonitorProviderCfg configuration)
                             throws ConfigException,InitializationException
@@ -529,9 +409,7 @@
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public String toString()
   {
@@ -540,15 +418,6 @@
   }
 
   /**
-   * Set the Purge delay for this db Handler.
-   * @param delay The purge delay in Milliseconds.
-   */
-  public void setPurgeDelay(long delay)
-  {
-    trimAge = delay;
-  }
-
-  /**
    * Clear the changes from this DB (from both memory cache and DB storage).
    *
    * @throws ChangelogException
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index fbd19e8..f18bd1e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -21,12 +21,13 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.server.changelog.je;
 
 import java.io.File;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,9 +36,11 @@
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.api.DirectoryThread;
 import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
@@ -46,6 +49,7 @@
 import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
 
 import com.forgerock.opendj.util.Pair;
 
@@ -87,7 +91,7 @@
    * The handler of the changelog database, the database stores the relation
    * between a change number and the associated cookie.
    * <p>
-   * Guarded by cnIndexDBLock
+   * @GuardedBy("cnIndexDBLock")
    */
   private JEChangeNumberIndexDB cnIndexDB;
   private final AtomicReference<ChangeNumberIndexer> cnIndexer =
@@ -96,6 +100,15 @@
   /** Used for protecting {@link ChangeNumberIndexDB} related state. */
   private final Object cnIndexDBLock = new Object();
 
+  /**
+   * The purge delay (in milliseconds). Records in the changelog DB that are
+   * older than this delay might be removed.
+   */
+  private long purgeDelayInMillis;
+  private final AtomicReference<ChangelogDBPurger> cnPurger =
+      new AtomicReference<ChangelogDBPurger>();
+  private volatile long latestPurgeDate;
+
   /** The local replication server. */
   private final ReplicationServer replicationServer;
   private AtomicBoolean shutdown = new AtomicBoolean();
@@ -312,13 +325,9 @@
       initializeChangelogState(changelogState);
       if (config.isComputeChangeNumber())
       {
-        final ChangeNumberIndexer indexer =
-            new ChangeNumberIndexer(this, changelogState);
-        if (cnIndexer.compareAndSet(null, indexer))
-        {
-          indexer.start();
-        }
+        startIndexer(changelogState);
       }
+      setPurgeDelay(replicationServer.getPurgeDelay());
     }
     catch (ChangelogException e)
     {
@@ -374,12 +383,17 @@
     // - then throw the first encountered exception
     ChangelogException firstException = null;
 
-    final ChangeNumberIndexer indexer = cnIndexer.get();
+    final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
     if (indexer != null)
     {
       indexer.initiateShutdown();
-      cnIndexer.compareAndSet(indexer, null);
     }
+    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
+    if (purger != null)
+    {
+      purger.initiateShutdown();
+    }
+
     try
     {
       shutdownCNIndexDB();
@@ -581,7 +595,7 @@
       {
         try
         {
-          cnIndexDB.clear(baseDN);
+          cnIndexDB.removeDomain(baseDN);
         }
         catch (ChangelogException e)
         {
@@ -607,7 +621,9 @@
         firstException = e;
       }
       else if (debugEnabled())
+      {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
     }
 
     if (firstException != null)
@@ -618,18 +634,24 @@
 
   /** {@inheritDoc} */
   @Override
-  public void setPurgeDelay(long delay)
+  public void setPurgeDelay(long purgeDelayInMillis)
   {
-    final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
-    if (cnIndexDB != null)
+    this.purgeDelayInMillis = purgeDelayInMillis;
+    final ChangelogDBPurger purger;
+    if (purgeDelayInMillis > 0)
     {
-      cnIndexDB.setPurgeDelay(delay);
-    }
-    for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
-    {
-      for (JEReplicaDB replicaDB : domainMap.values())
+      purger = new ChangelogDBPurger();
+      if (cnPurger.compareAndSet(null, purger))
       {
-        replicaDB.setPurgeDelay(delay);
+        purger.start();
+      } // otherwise a purger was already running
+    }
+    else
+    {
+      purger = cnPurger.getAndSet(null);
+      if (purger != null)
+      {
+        purger.initiateShutdown();
       }
     }
   }
@@ -639,19 +661,13 @@
   public void setComputeChangeNumber(boolean computeChangeNumber)
       throws ChangelogException
   {
-    final ChangeNumberIndexer indexer;
     if (computeChangeNumber)
     {
-      final ChangelogState changelogState = dbEnv.readChangelogState();
-      indexer = new ChangeNumberIndexer(this, changelogState);
-      if (cnIndexer.compareAndSet(null, indexer))
-      {
-        indexer.start();
-      }
+      startIndexer(dbEnv.readChangelogState());
     }
     else
     {
-      indexer = cnIndexer.getAndSet(null);
+      final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
       if (indexer != null)
       {
         indexer.initiateShutdown();
@@ -659,48 +675,34 @@
     }
   }
 
+  private void startIndexer(final ChangelogState changelogState)
+  {
+    final ChangeNumberIndexer indexer =
+        new ChangeNumberIndexer(this, changelogState);
+    if (cnIndexer.compareAndSet(null, indexer))
+    {
+      indexer.start();
+    }
+  }
+
   /** {@inheritDoc} */
   @Override
   public long getDomainLatestTrimDate(DN baseDN)
   {
-    long latest = 0;
-    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
-    {
-      if (latest == 0 || latest < replicaDB.getLatestTrimDate())
-      {
-        latest = replicaDB.getLatestTrimDate();
-      }
-    }
-    return latest;
+    return latestPurgeDate;
   }
 
   /** {@inheritDoc} */
   @Override
   public ChangeNumberIndexDB getChangeNumberIndexDB()
   {
-    return getChangeNumberIndexDB(true);
-  }
-
-  /**
-   * Returns the {@link ChangeNumberIndexDB} object.
-   *
-   * @param startTrimmingThread
-   *          whether the trimming thread should be started
-   * @return the {@link ChangeNumberIndexDB} object
-   */
-  ChangeNumberIndexDB getChangeNumberIndexDB(boolean startTrimmingThread)
-  {
     synchronized (cnIndexDBLock)
     {
       if (cnIndexDB == null)
       {
         try
         {
-          cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
-          if (startTrimmingThread)
-          {
-            cnIndexDB.startTrimmingThread();
-          }
+          cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
         }
         catch (Exception e)
         {
@@ -830,4 +832,83 @@
     }
     // TODO save this state in the changelogStateDB?
   }
+
+  /**
+   * The thread purging the changelogDB on a regular interval. Records are
+   * purged from the changelogDB is they are older than a delay specified in
+   * seconds. The purge process works in two steps:
+   * <ol>
+   * <li>first purge the changeNumberIndexDB and retrieve information to drive
+   * replicaDBs purging</li>
+   * <li>proceed to purge each replicaDBs based on the information collected
+   * when purging the changeNumberIndexDB</li>
+   * </ol>
+   */
+  private final class ChangelogDBPurger extends DirectoryThread
+  {
+
+    protected ChangelogDBPurger()
+    {
+      super("changelog DB purger");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void run()
+    {
+      // initialize CNIndexDB
+      getChangeNumberIndexDB();
+      while (!isShutdownInitiated())
+      {
+        try
+        {
+          final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+          if (localCNIndexDB == null)
+          { // shutdown has been called
+            return;
+          }
+
+          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
+          final MultiDomainServerState purgeUpToCookie =
+              localCNIndexDB.purgeUpTo(purgeTimestamp);
+          if (purgeUpToCookie == null)
+          { // this can happen when the change number index DB is empty
+            continue;
+          }
+
+          /*
+           * Drive purge of the replica DBs by the oldest non purged cookie in
+           * the change number index DB.
+           */
+          for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
+              : domainToReplicaDBs.entrySet())
+          {
+            final DN baseDN = entry1.getKey();
+            final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
+            for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
+            {
+              final Integer serverId = entry2.getKey();
+              final JEReplicaDB replicaDB = entry2.getValue();
+              replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
+            }
+          }
+
+          latestPurgeDate = purgeTimestamp;
+
+          // purge delay is specified in seconds so it should not be a problem
+          // to sleep for 500 millis
+          sleep(500);
+        }
+        catch (Exception e)
+        {
+          logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+              .get(stackTraceToSingleLineString(e)));
+          if (replicationServer != null)
+          {
+            replicationServer.shutdown();
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index e7309b0..bbde1d1 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -50,7 +50,6 @@
 import org.opends.server.types.Attributes;
 import org.opends.server.types.DN;
 import org.opends.server.types.InitializationException;
-import org.opends.server.util.TimeThread;
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
@@ -95,16 +94,8 @@
    * <p>
    * This blocking queue is only used as a temporary placeholder so that the
    * write in the stable storage can be grouped for efficiency reason. Adding an
-   * update synchronously add the update to this list. A dedicated thread loops
-   * on {@link #flush()} and {@link #trim()}.
-   * <dl>
-   * <dt>flush()</dt>
-   * <dd>get a number of changes from the in memory list by block and write them
-   * to the db.</dd>
-   * <dt>trim()</dt>
-   * <dd>deletes from the DB a number of changes that are older than a certain
-   * date.</dd>
-   * </dl>
+   * update synchronously add the update to this list. A dedicated thread
+   * flushes this blocking queue.
    * <p>
    * Changes are not read back by replicationServer threads that are responsible
    * for pushing the changes to other replication server or to LDAP server
@@ -133,22 +124,12 @@
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
   private DirectoryThread thread;
   /**
-   * Used to prevent race conditions between threads calling {@link #clear()}
-   * {@link #flush()} or {@link #trim()}. This can happen with the thread
-   * flushing the queue, on shutdown or on cursor opening, a thread calling
-   * clear(), etc.
+   * Used to prevent race conditions between threads calling {@link #flush()}.
+   * This can happen with the thread flushing the queue, or else on shutdown.
    */
   private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
 
-  private long latestTrimDate = 0;
-
-  /**
-   * The trim age in milliseconds. Changes record in the change DB that
-   * are older than this age are removed.
-   */
-  private long trimAge;
-
   /**
    * Creates a new ReplicaDB associated to a given LDAP server.
    *
@@ -166,15 +147,14 @@
     this.replicationServer = replicationServer;
     this.serverId = serverId;
     this.baseDN = baseDN;
-    trimAge = replicationServer.getTrimAge();
     queueMaxBytes = replicationServer.getQueueSize() * 200;
     queueSizeBytes = new Semaphore(queueMaxBytes);
     db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
     csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
     thread = new DirectoryThread(this, "Replication server RS("
-        + replicationServer.getServerId()
-        + ") changelog checkpointer for Replica DS(" + serverId
-        + ") for domain \"" + baseDN + "\"");
+            + replicationServer.getServerId()
+            + ") flusher thread for Replica DS(" + serverId
+            + ") for domain \"" + baseDN + "\"");
     thread.start();
 
     DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -334,9 +314,7 @@
   }
 
   /**
-   * Run method for this class.
-   * Periodically Flushes the ReplicationServerDomain cache from memory to the
-   * stable storage and trims the old updates.
+   * Flushes the replicaDB queue from memory to stable storage.
    */
   @Override
   public void run()
@@ -350,7 +328,6 @@
         try
         {
           flush();
-          trim();
         }
         catch (ChangelogException end)
         {
@@ -390,55 +367,26 @@
   }
 
   /**
-   * Retrieves the latest trim date.
-   * @return the latest trim date.
+   * Synchronously purge changes older than purgeCSN from this replicaDB.
+   *
+   * @param purgeCSN
+   *          The CSN up to which changes can be purged. No purging happens when
+   *          it is null.
+   * @throws ChangelogException
+   *           In case of database problem.
    */
-  public long getLatestTrimDate()
+  void purgeUpTo(final CSN purgeCSN) throws ChangelogException
   {
-    return latestTrimDate;
-  }
-
-
-  /**
-   * Trim old changes from this replicationServer database.
-   * @throws ChangelogException In case of database problem.
-   */
-  private void trim() throws ChangelogException
-  {
-    if (trimAge == 0)
+    if (purgeCSN == null)
     {
       return;
     }
 
-    latestTrimDate = TimeThread.getTime() - trimAge;
-
-    CSN trimDate = new CSN(latestTrimDate, 0, 0);
-
-    // Find the last CSN before the trimDate, in the Database.
-    CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate);
-    if (lastBeforeTrimDate != null)
-    {
-      // If we found it, we want to stop trimming when reaching it.
-      trimDate = lastBeforeTrimDate;
-    }
-
     for (int i = 0; i < 100; i++)
     {
       /*
-       * Perform at least some trimming regardless of the flush backlog. Then
-       * continue trim iterations while the flush backlog is low (below the
-       * lowmark). Once the flush backlog increases, stop trimming and start
-       * flushing more eagerly.
-       */
-      if (i > 20 && isQueueAboveLowMark())
-      {
-        break;
-      }
-
-      /*
-       * the trim is done by group in order to save some CPU, IO bandwidth and
-       * DB caches: start the transaction then do a bunch of remove then
-       * commit.
+       * the purge is done by group in order to save some CPU, IO bandwidth and
+       * DB caches: start the transaction then do a bunch of remove then commit.
        */
       /*
        * Matt wrote: The record removal is done as a DB transaction and the
@@ -464,7 +412,7 @@
             return;
           }
 
-          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
+          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN))
           {
             cursor.delete();
           }
@@ -490,37 +438,31 @@
     }
   }
 
-  private boolean isQueueAboveLowMark()
-  {
-    final int lowMarkBytes = queueMaxBytes / 5;
-    final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
-    return bytesUsed > lowMarkBytes;
-  }
-
   /**
    * Flush a number of updates from the memory list to the stable storage.
    * <p>
    * Flush is done by chunk sized to 500 messages, starting from the beginning
    * of the list.
-   *
+   * <p>
+   * @GuardedBy("flushLock")
    * @throws ChangelogException
    *           If a database problem happened
    */
-  public void flush() throws ChangelogException
+  private void flush() throws ChangelogException
   {
     try
     {
       synchronized (flushLock)
       {
-        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
-        final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
+        final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
         if (change == null)
         {
-          // nothing to persist, move on to the trim phase
+          // nothing to persist, check if shutdown was invoked
           return;
         }
 
         // Try to see if there are more changes and persist them all.
+        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
         changes.add(change);
         msgQueue.drainTo(changes);
 
@@ -604,15 +546,6 @@
   }
 
   /**
-   * Set the Purge delay for this db Handler.
-   * @param delay The purge delay in Milliseconds.
-   */
-  public void setPurgeDelay(long delay)
-  {
-    trimAge = delay;
-  }
-
-  /**
    * Clear the changes from this DB (from both memory cache and DB storage).
    * @throws ChangelogException When an exception occurs while removing the
    * changes from the DB.
@@ -636,13 +569,15 @@
   }
 
   /**
-   * Return the size of the msgQueue (the memory cache of the ReplicaDB).
+   * Return the number of records of this replicaDB.
+   * <p>
    * For test purpose.
-   * @return The memory queue size.
+   *
+   * @return The number of records of this replicaDB.
    */
-  int getQueueSize()
+  long getNumberRecords()
   {
-    return this.msgQueue.size();
+    return db.getNumberRecords();
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index 919ad34..577fc39 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.server.changelog.je;
 
@@ -73,15 +73,6 @@
       // we didn't find it in the db
       cursor = null;
     }
-
-    if (cursor == null)
-    {
-      // flush the queue into the db
-      replicaDB.flush();
-
-      // look again in the db
-      cursor = db.openReadCursor(startAfterCSN);
-    }
   }
 
   /** {@inheritDoc} */
@@ -96,15 +87,7 @@
   public boolean next() throws ChangelogException
   {
     final ReplServerDBCursor localCursor = cursor;
-    if (localCursor != null)
-    {
-      currentChange = localCursor.next();
-    }
-    else
-    {
-      currentChange = null;
-    }
-
+    currentChange = localCursor != null ? localCursor.next() : null;
 
     if (currentChange != null)
     {
@@ -114,12 +97,8 @@
     {
       synchronized (this)
       {
-        if (cursor != null)
-        {
-          cursor.close();
-          cursor = null;
-        }
-        replicaDB.flush();
+        closeCursor();
+        // previously exhausted cursor must be able to reinitialize themselves
         cursor = db.openReadCursor(lastNonNullCurrentCSN);
         currentChange = cursor.next();
         if (currentChange != null)
@@ -137,13 +116,17 @@
   {
     synchronized (this)
     {
-      if (cursor != null)
-      {
-        cursor.close();
-        cursor = null;
-      }
+      closeCursor();
       this.replicaDB = null;
-      this.db = null;
+    }
+  }
+
+  private void closeCursor()
+  {
+    if (cursor != null)
+    {
+      cursor.close();
+      cursor = null;
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index f755ec8..818fba5 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -943,4 +943,13 @@
     return db == null || !db.getEnvironment().isValid();
   }
 
+  /**
+   * Returns the number of records in this DB.
+   *
+   * @return the number of records in this DB.
+   */
+  long getNumberRecords()
+  {
+    return db.count();
+  }
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index 17e37c4..05e27d7 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -67,7 +67,10 @@
 import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
-import org.testng.annotations.*;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
 import static org.assertj.core.api.Assertions.*;
 import static org.opends.messages.ReplicationMessages.*;
@@ -168,7 +171,7 @@
   @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerPreTest"})
   public void ECLReplicationServerTest() throws Exception
   {
-    getCNIndexDB().setPurgeDelay(0);
+    replicationServer.getChangelogDB().setPurgeDelay(0);
     // let's enable ECl manually now that we tested that ECl is not available
     ECLWorkflowElement wfe =
         (ECLWorkflowElement) DirectoryServer
@@ -189,7 +192,7 @@
   @Test(enabled=false, dependsOnMethods = { "ECLReplicationServerTest"})
   public void ECLReplicationServerTest1() throws Exception
   {
-    getCNIndexDB().setPurgeDelay(0);
+    replicationServer.getChangelogDB().setPurgeDelay(0);
     // Test with a mix of domains, a mix of DSes
     ECLTwoDomains();
   }
@@ -204,7 +207,7 @@
   @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerTest"})
   public void ECLReplicationServerTest3() throws Exception
   {
-    getCNIndexDB().setPurgeDelay(0);
+    replicationServer.getChangelogDB().setPurgeDelay(0);
     // Write changes and read ECL from start
     ECLCompatWriteReadAllOps(1);
 
@@ -263,7 +266,7 @@
   @Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
   public void ECLReplicationServerFullTest3() throws Exception
   {
-    getCNIndexDB().setPurgeDelay(0);
+    replicationServer.getChangelogDB().setPurgeDelay(0);
     // Test all types of ops.
     ECLAllOps(); // Do not clean the db for the next test
 
@@ -347,8 +350,7 @@
   @Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
   public void ECLReplicationServerFullTest15() throws Exception
   {
-    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
-    cnIndexDB.setPurgeDelay(0);
+    replicationServer.getChangelogDB().setPurgeDelay(0);
     // Write 4 changes and read ECL from start
     ECLCompatWriteReadAllOps(1);
 
@@ -369,8 +371,9 @@
     ECLCompatTestLimitsAndAdd(1, 8, 4);
 
     // Test CNIndexDB is purged when replication change log is purged
-    cnIndexDB.setPurgeDelay(1);
-    cnIndexDB.trim(null);
+    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
+    cnIndexDB.purgeUpTo(Long.MAX_VALUE);
+    assertTrue(cnIndexDB.isEmpty());
     ECLPurgeCNIndexDBAfterChangelogClear();
 
     // Test first and last are updated
@@ -896,7 +899,7 @@
           null);
       cnt++;
     }
-    while (cnt < 100 // wait at most 1s
+    while (cnt < 300 // wait at most 3s
         && op.getSearchEntries().size() != expectedNbEntries);
     final List<SearchResultEntry> entries = op.getSearchEntries();
     assertThat(entries).hasSize(expectedNbEntries);
@@ -1951,16 +1954,6 @@
     clearChangelogDB(replicationServer);
   }
 
-  @AfterTest
-  public void setPurgeDelayToInitialValue() throws Exception
-  {
-    JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
-    if (cnIndexDB != null)
-    {
-      cnIndexDB.setPurgeDelay(1);
-    }
-  }
-
   /**
    * After the tests stop the replicationServer.
    */
@@ -2461,10 +2454,9 @@
     String tn = "ECLPurgeCNIndexDBAfterChangelogClear";
     debugInfo(tn, "Starting test\n\n");
 
-    JEChangeNumberIndexDB cnIndexDB =
-        (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
+    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
     assertEquals(cnIndexDB.count(), 8);
-    cnIndexDB.setPurgeDelay(1000);
+    replicationServer.getChangelogDB().setPurgeDelay(1000);
 
     clearChangelogDB(replicationServer);
 
@@ -2620,11 +2612,7 @@
 
   private JEChangeNumberIndexDB getCNIndexDB()
   {
-    if (replicationServer != null)
-    {
-      return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
-    }
-    return null;
+    return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
   }
 
   /**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 83c1180..8cb6ed4 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -447,11 +447,6 @@
     {
       final ReplicatedUpdateMsg msg = msgs[i];
       final ChangeNumberIndexRecord record = allValues.get(i);
-      if (previousCookie.isEmpty())
-      {
-        // ugly hack to go round strange legacy code @see OPENDJ-67
-        previousCookie.replace(record.getBaseDN(), new ServerState());
-      }
       // check content in order
       String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">";
       assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 8f7717c..5fb4500 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -26,16 +26,22 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.opends.server.TestCaseUtils;
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.server.ReplServerFakeConfiguration;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.types.DN;
 import org.opends.server.util.StaticUtils;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.opends.server.replication.server.changelog.je.JEReplicaDBTest.*;
@@ -48,9 +54,16 @@
 @SuppressWarnings("javadoc")
 public class JEChangeNumberIndexDBTest extends ReplicationTestCase
 {
-  private static final String value1 = "value1";
-  private static final String value2 = "value2";
-  private static final String value3 = "value3";
+  private final MultiDomainServerState previousCookie =
+      new MultiDomainServerState();
+  private final List<String> cookies = new ArrayList<String>();
+
+  @BeforeMethod
+  public void clearCookie()
+  {
+    previousCookie.clear();
+    cookies.clear();
+  }
 
   /**
    * This test makes basic operations of a JEChangeNumberIndexDB:
@@ -67,12 +80,11 @@
   void testTrim() throws Exception
   {
     ReplicationServer replicationServer = null;
-    JEChangeNumberIndexDB cnIndexDB = null;
     try
     {
       replicationServer = newReplicationServer();
-      cnIndexDB = getCNIndexDBNoTrimming(replicationServer);
-      cnIndexDB.setPurgeDelay(0);
+      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+      changelogDB.setPurgeDelay(0); // disable purging
 
       // Prepare data to be stored in the db
       DN baseDN1 = DN.decode("o=baseDN1");
@@ -82,9 +94,10 @@
       CSN[] csns = newCSNs(1, 0, 3);
 
       // Add records
-      long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]);
-                 addRecord(cnIndexDB, value2, baseDN2, csns[1]);
-      long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]);
+      final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+      long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
+                 addRecord(cnIndexDB, baseDN2, csns[1]);
+      long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
 
       // The ChangeNumber should not get purged
       final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
@@ -94,11 +107,11 @@
       DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
       try
       {
-        assertEqualTo(cursor.getRecord(), csns[0], baseDN1, value1);
+        assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0));
         assertTrue(cursor.next());
-        assertEqualTo(cursor.getRecord(), csns[1], baseDN2, value2);
+        assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1));
         assertTrue(cursor.next());
-        assertEqualTo(cursor.getRecord(), csns[2], baseDN3, value3);
+        assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2));
         assertFalse(cursor.next());
       }
       finally
@@ -106,14 +119,13 @@
         StaticUtils.close(cursor);
       }
 
-      // Now test that the trimming thread does its job => start it
-      cnIndexDB.setPurgeDelay(100);
-      cnIndexDB.startTrimmingThread();
-
-      // Check the db is cleared.
-      while (cnIndexDB.count() > 1)
+      // Now test that purging removes all changes bar the last one
+      changelogDB.setPurgeDelay(1);
+      int count = 0;
+      while (cnIndexDB.count() > 1 && count < 100)
       {
-        Thread.yield();
+        Thread.sleep(10);
+        count++;
       }
       assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
     }
@@ -123,10 +135,14 @@
     }
   }
 
-  private long addRecord(JEChangeNumberIndexDB cnIndexDB, String cookie, DN baseDN, CSN csn)
-      throws ChangelogException
+  private long addRecord(JEChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
   {
-    return cnIndexDB.addRecord(new ChangeNumberIndexRecord(cookie, baseDN, csn));
+    final String cookie = previousCookie.toString();
+    cookies.add(cookie);
+    final long changeNumber = cnIndexDB.addRecord(
+        new ChangeNumberIndexRecord(cookie, baseDN, csn));
+    previousCookie.update(baseDN, csn);
+    return changeNumber;
   }
 
   private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie)
@@ -136,11 +152,11 @@
     assertEquals(record.getPreviousCookie(), cookie);
   }
 
-  private JEChangeNumberIndexDB getCNIndexDBNoTrimming(ReplicationServer rs) throws ChangelogException
+  private JEChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
   {
     final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
     final JEChangeNumberIndexDB cnIndexDB =
-        (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB(false);
+        (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB();
     assertTrue(cnIndexDB.isEmpty());
     return cnIndexDB;
   }
@@ -160,12 +176,11 @@
   void testClear() throws Exception
   {
     ReplicationServer replicationServer = null;
-    JEChangeNumberIndexDB cnIndexDB = null;
     try
     {
       replicationServer = newReplicationServer();
-      cnIndexDB = getCNIndexDBNoTrimming(replicationServer);
-      cnIndexDB.setPurgeDelay(0);
+      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+      changelogDB.setPurgeDelay(0);
 
       // Prepare data to be stored in the db
 
@@ -176,9 +191,10 @@
       CSN[] csns = newCSNs(1, 0, 3);
 
       // Add records
-      long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]);
-      long cn2 = addRecord(cnIndexDB, value2, baseDN2, csns[1]);
-      long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]);
+      final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+      long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
+      long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]);
+      long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
 
       // Checks
       assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1);
@@ -187,9 +203,9 @@
       assertEquals(cnIndexDB.count(), 3, "Db count");
       assertFalse(cnIndexDB.isEmpty());
 
-      assertEquals(getPreviousCookie(cnIndexDB, cn1), value1);
-      assertEquals(getPreviousCookie(cnIndexDB, cn2), value2);
-      assertEquals(getPreviousCookie(cnIndexDB, cn3), value3);
+      assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0));
+      assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1));
+      assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2));
 
       DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
       assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
@@ -200,7 +216,7 @@
       cursor = cnIndexDB.getCursorFrom(cn3);
       assertCursorReadsInOrder(cursor, cn3);
 
-      cnIndexDB.clear(null);
+      cnIndexDB.removeDomain(null);
       assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
 
       // Check the db is cleared.
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index e17878a..851afa2 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -97,7 +97,7 @@
 
       //--
       // Iterator tests with changes persisted
-      waitChangesArePersisted(replicaDB);
+      waitChangesArePersisted(replicaDB, 3);
 
       assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
       assertNotFound(replicaDB, csns[4]);
@@ -108,7 +108,7 @@
       //--
       // Cursor tests with changes persisted
       replicaDB.add(update4);
-      waitChangesArePersisted(replicaDB);
+      waitChangesArePersisted(replicaDB, 4);
 
       assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
       // Test cursor from existing CSN
@@ -116,7 +116,7 @@
       assertFoundInOrder(replicaDB, csns[3]);
       assertNotFound(replicaDB, csns[4]);
 
-      replicaDB.setPurgeDelay(1);
+      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
 
       int count = 0;
       boolean purgeSucceeded = false;
@@ -141,16 +141,27 @@
     }
   }
 
-  private void waitChangesArePersisted(JEReplicaDB replicaDB) throws Exception
+  private void waitChangesArePersisted(JEReplicaDB replicaDB,
+      int nbRecordsInserted) throws Exception
   {
-    final int expected = 0;
+    waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
+  }
+
+  private void waitChangesArePersisted(JEReplicaDB replicaDB,
+      int nbRecordsInserted, int counterWindow) throws Exception
+  {
+    // one counter record is inserted every time "counterWindow"
+    // records have been inserted
+    int expectedNbRecords =
+        nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
+
     int count = 0;
-    while (replicaDB.getQueueSize() != expected && count < 100)
+    while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
     {
       Thread.sleep(10);
       count++;
     }
-    assertEquals(replicaDB.getQueueSize(), expected);
+    assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
   }
 
   static CSN[] newCSNs(int serverId, long timestamp, int number)
@@ -204,8 +215,9 @@
       assertNull(cursor.getRecord());
       for (int i = 1; i < csns.length; i++)
       {
-        assertTrue(cursor.next());
-        assertEquals(cursor.getRecord().getCSN(), csns[i]);
+        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
+        assertTrue(cursor.next(), msg);
+        assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
       }
       assertFalse(cursor.next());
       assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
@@ -274,11 +286,12 @@
   {
     ReplicationServer replicationServer = null;
     DBCursor<UpdateMsg> cursor = null;
+    JEReplicaDB replicaDB = null;
     try
     {
       TestCaseUtils.startServer();
       replicationServer = configureReplicationServer(100000, 10);
-      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
+      replicaDB = newReplicaDB(replicationServer);
 
       CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6);
       for (int i = 0; i < 5; i++)
@@ -288,7 +301,7 @@
           replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
         }
       }
-      replicaDB.flush();
+      waitChangesArePersisted(replicaDB, 4);
 
       cursor = replicaDB.generateCursorFrom(csns[0]);
       assertTrue(cursor.next());
@@ -307,6 +320,8 @@
     finally
     {
       StaticUtils.close(cursor);
+      if (replicaDB != null)
+        replicaDB.shutdown();
       remove(replicationServer);
     }
   }
@@ -334,7 +349,7 @@
     testGetOldestNewestCSNs(4000, 1000);
   }
 
-  private void testGetOldestNewestCSNs(int max, int counterWindow) throws Exception
+  private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception
   {
     String tn = "testDBCount("+max+","+counterWindow+")";
     debugInfo(tn, "Starting test");
@@ -363,7 +378,7 @@
         replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
         mySeqnum+=2;
       }
-      replicaDB.flush();
+      waitChangesArePersisted(replicaDB, max, counterWindow);
 
       assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
       assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
@@ -387,15 +402,13 @@
         replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
         mySeqnum+=2;
       }
-      replicaDB.flush();
+      waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
 
       assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
       assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
 
       //
-
-      replicaDB.setPurgeDelay(100);
-      sleep(1000);
+      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
 
       String testcase = "AFTER PURGE (oldest, newest)=";
       debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());

--
Gitblit v1.10.0