mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.51.2014 819f74758a1c464bbf578e70ca8592cc8d101d75
OPENDJ-1177 (CR-3304) Re-implement changelog purging logic


After OPENDJ-1174, change number index DB is now populated eagerly (was populated lazily).
This means we can reap the benefits by changing how purging is done for changelogDB.
Previous purge process was driven by the replicaDBs: first purge replicaDBs then purge the change number index DB and it was causing lots of problems like stale CNIndexDB records for example.
New purge process is driven by the change number index DB: first purge change number index DB then purge the replicaDBs based on the oldest valid referenced record in the change number index DB.

Moved JEChangeNumberIndexDB purge thread to JEChangelogDB + made it responsible for purging the ChangeNumberIndexDB and all the ReplicaDBs.
In JEReplicaDB, thread is now only responsible for flushing, previously it was responsible for trimming and flushing which complicated the design and made it less efficient for both operations.



JEChangelogDB.java:
Added inner class ChangelogDBPurger.
Added fields purgeDelay, cnPurger and latestTrimDate.
Extracted method startIndexer()
Removed getChangeNumberIndexDB(boolean) + associated code.
Reimplemented getDomainLatestTrimDate() and setPurgeDelay().

JEChangeNumberIndexDB.java:
No longer implements Runnable + removed run().
Removed fields trimmingThread, trimAge and replicationServer.
In ctor, removed ReplicationServer parameter.
Removed startTrimmingThread(), setPurgeDelay() clear(DN, AtomicBoolean).
Renamed clear(DN) to removeDomain(DN).
Renamed trim(AtomicBoolean) to purgeUpTo(long).
Added purgeUpToCookie(ChangeNumberIndexRecord).

JEReplicaDB.java:
Removed fields latestTrimDate and trimAge.
In run(), no longer call trim().
Removed getLatestTrimDate(), isQueueAboveLowMark(), setPurgeDelay() and getQueueSize().
Renamed trim() to purgeUpTo(CSN).
Made flush() private + reduced wait time on polling the msgQueue to speed up shutdown.
Added getNumberRecords() for unit tests.

JEReplicaDBCursor.java:
Since flushing is now eager, removed all calls to JEReplicaDB.flush().
Extracted methods closeCursor().

ReplicationServer.java:
Renamed getTrimAge() to getPurgeDelay() for consistency.

ReplicationDB.java:
Added getNumberRecords().

ChangeNumberIndexer.java, ChangeNumberIndexerTest.java:
Removed hacks due to old purging code.

ExternalChangeLogTest.java:
Called ChangelogDB.setPurgeDelay() instead of ChangeNumberIndexDB.setPurgeDelay(0).
Consequence of the changes to JEReplicaDB.
Removed method setPurgeDelayToInitialValue() that was not doing anything (JEChangeNumberIndexDB was always null).
In getCNIndexDB(), removed useless null check.

JEChangeNumberIndexDBTest.java:
Removed constants value1, value2, value3 which are invalid cookies.
Replaced them with fields previousCookie and cookies.
Added clearCookie() method.
In addRecord(), removed cookie parameter and build the cookie from the new fields.
Consequence of the changes to JEChangeNumberIndexDB.

JEReplicaDBTest.java:
In waitChangesArePersisted(), used JEReplicaDB.getNumberRecords() instead of JEReplicaDB.getQueueSize() + added parameters describing the number of expected records + the counter record window.
Replaced all calls to JEReplicaDB.flush() with calls to waitChangesArePersisted().
Consequence of the changes to JEReplicaDB.
11 files modified
907 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 22 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java 327 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 189 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 129 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java 45 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 9 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 44 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 5 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java 84 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java 47 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -722,7 +722,7 @@
   * @return  The time after which changes must be deleted from the
   *          persistent storage (in milliseconds).
   */
  public long getTrimAge()
  public long getPurgeDelay()
  {
    return this.config.getReplicationPurgeDelay() * 1000;
  }
@@ -780,7 +780,7 @@
    final long newPurgeDelay = config.getReplicationPurgeDelay();
    if (newPurgeDelay != oldConfig.getReplicationPurgeDelay())
    {
      this.changelogDB.setPurgeDelay(getTrimAge());
      this.changelogDB.setPurgeDelay(getPurgeDelay());
    }
    final boolean computeCN = config.isComputeChangeNumber();
    if (computeCN != oldConfig.isComputeChangeNumber())
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -487,13 +487,6 @@
          // OK, the oldest change is older than the medium consistency point
          // let's publish it to the CNIndexDB.
          // Next if statement is ugly but ensures the first change will not be
          // immediately trimmed from the CNIndexDB. Yuck!
          if (mediumConsistencyRUV.isEmpty())
          {
            mediumConsistencyRUV.replace(baseDN, new ServerState());
          }
          final String previousCookie = mediumConsistencyRUV.toString();
          final ChangeNumberIndexRecord record =
              new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
@@ -620,20 +613,21 @@
  }
  /**
   * Asks the current thread to clear its state.
   * Asks the current thread to clear its state and blocks until state is
   * cleared.
   * <p>
   * This method is only useful for unit tests.
   */
  public void clear()
  {
    doClear.set(true);
    synchronized (this)
    while (doClear.get() && !State.TERMINATED.equals(getState()))
    {
      notify();
    }
    while (doClear.get())
    {
      // wait until clear() has been done by thread
      // wait until clear() has been done by thread, always waking it up
      synchronized (this)
      {
        notify();
      }
      // ensures unit tests wait that this thread's state is cleaned up
      Thread.yield();
    }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -28,29 +28,21 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class is used for managing the replicationServer database for each
@@ -62,7 +54,7 @@
 * This class publishes some monitoring information below <code>
 * cn=monitor</code>.
 */
public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable
public class JEChangeNumberIndexDB implements ChangeNumberIndexDB
{
  /**
   * The tracer object for the debug logger.
@@ -74,7 +66,7 @@
  /** FIXME What is this field used for? */
  private volatile long oldestChangeNumber = NO_KEY;
  /**
   * The newest changenumber stored in the DB. It is used to avoid trimming the
   * The newest changenumber stored in the DB. It is used to avoid purging the
   * record with the newest changenumber. The newest record in the changenumber
   * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
   * then retrieved on server startup.
@@ -86,48 +78,25 @@
   * condition between:
   * <ol>
   * <li>this atomic long being incremented for a new record ('recordB')</li>
   * <li>the current newest record ('recordA') being trimmed from the DB</li>
   * <li>the current newest record ('recordA') being purged from the DB</li>
   * <li>'recordB' failing to be inserted in the DB</li>
   * </ol>
   */
  private final AtomicLong lastGeneratedChangeNumber;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  /**
   * A dedicated thread loops trim().
   * <p>
   * trim() : deletes from the DB a number of changes that are older than a
   * certain date.
   */
  private DirectoryThread trimmingThread;
  /**
   * The trim age in milliseconds. Changes record in the change DB that are
   * older than this age are removed.
   * <p>
   * FIXME it never gets updated even when the replication server purge delay is
   * updated
   */
  private volatile long trimAge;
  private ReplicationServer replicationServer;
  /**
   * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
   *
   * @param replicationServer The ReplicationServer that creates this instance.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * @param dbEnv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @throws ChangelogException If a database problem happened
   */
  public JEChangeNumberIndexDB(ReplicationServer replicationServer,
      ReplicationDbEnv dbenv) throws ChangelogException
  public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
  {
    this.replicationServer = replicationServer;
    this.trimAge = replicationServer.getTrimAge();
    // DB initialization
    db = new DraftCNDB(dbenv);
    db = new DraftCNDB(dbEnv);
    final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
    final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
    oldestChangeNumber = getChangeNumber(oldestRecord);
@@ -142,16 +111,6 @@
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }
  /**
   * Creates and starts the thread trimming the CNIndexDB.
   */
  public void startTrimmingThread()
  {
    trimmingThread =
        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
    trimmingThread.start();
  }
  private long getChangeNumber(ChangeNumberIndexRecord record)
      throws ChangelogException
  {
@@ -251,77 +210,82 @@
      notifyAll();
    }
    if (trimmingThread != null)
    {
      try
      {
        trimmingThread.join();
      }
      catch (InterruptedException ignored)
      {
        // Nothing can be done about it, just proceed
      }
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
  }
  /**
   * Run method for this class.
   * Periodically Flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   * Synchronously purges the change number index DB up to and excluding the
   * provided timestamp.
   *
   * @param purgeTimestamp
   *          the timestamp up to which purging must happen
   * @return the {@link MultiDomainServerState} object that drives purging the
   *         replicaDBs.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  @Override
  public void run()
  public MultiDomainServerState purgeUpTo(long purgeTimestamp)
      throws ChangelogException
  {
    while (!shutdown.get())
    if (isEmpty())
    {
      try {
        trim(shutdown);
      return null;
    }
        synchronized (this)
        {
          if (!shutdown.get())
          {
            try
            {
              wait(1000);
            }
            catch (InterruptedException e)
            {
              Thread.currentThread().interrupt();
            }
          }
        }
      }
      catch (Exception end)
    final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try
    {
      while (!mustShutdown(shutdown) && cursor.next())
      {
        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
            .get(stackTraceToSingleLineString(end)));
        if (replicationServer != null)
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (record.getChangeNumber() != oldestChangeNumber)
        {
          replicationServer.shutdown();
          oldestChangeNumber = record.getChangeNumber();
        }
        break;
        if (record.getChangeNumber() == newestChangeNumber)
        {
          // do not purge the newest record to avoid having the last generated
          // changenumber dropping back to 0 if the server restarts
          return getPurgeCookie(record);
        }
        if (record.getCSN().isOlderThan(purgeCSN))
        {
          cursor.delete();
        }
        else
        {
          // Current record is not old enough to purge.
          return getPurgeCookie(record);
        }
      }
      return null;
    }
    catch (ChangelogException e)
    {
      cursor.abort();
      throw e;
    }
    catch (Exception e)
    {
      cursor.abort();
      throw new ChangelogException(e);
    }
    finally
    {
      cursor.close();
    }
  }
  /**
   * Trim old changes from this database.
   *
   * @param shutdown
   *          AtomicBoolean telling whether the current run must be stopped
   * @throws ChangelogException
   *           In case of database problem.
   */
  public void trim(AtomicBoolean shutdown) throws ChangelogException
  private MultiDomainServerState getPurgeCookie(
      final ChangeNumberIndexRecord record) throws DirectoryException
  {
    if (trimAge == 0)
      return;
    clear(null, shutdown);
    // Do not include the record's CSN to avoid having it purged
    return new MultiDomainServerState(record.getPreviousCookie());
  }
  /**
@@ -334,127 +298,49 @@
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public void clear(DN baseDNToClear) throws ChangelogException
  {
    clear(baseDNToClear, null);
  }
  private void clear(DN baseDNToClear, AtomicBoolean shutdown)
      throws ChangelogException
  public void removeDomain(DN baseDNToClear) throws ChangelogException
  {
    if (isEmpty())
    {
      return;
    }
    for (int i = 0; i < 100; i++)
    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try
    {
      if (mustShutdown(shutdown))
      boolean isOldestRecord = true;
      while (!mustShutdown(shutdown) && cursor.next())
      {
        return;
      }
      final DraftCNDBCursor cursor = db.openDeleteCursor();
      try
      {
        for (int j = 0; j < 50; j++)
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
        {
          // let's traverse the CNIndexDB
          if (mustShutdown(shutdown) || !cursor.next())
          {
            cursor.close();
            return;
          }
          final ChangeNumberIndexRecord record = cursor.currentRecord();
          if (record.getChangeNumber() != oldestChangeNumber)
          {
            oldestChangeNumber = record.getChangeNumber();
          }
          if (record.getChangeNumber() == newestChangeNumber)
          {
            // do not trim the newest record to avoid having the last generated
            // changenumber dropping back to 0 if the server restarts
            cursor.close();
            return;
          }
          if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN()))
          {
            cursor.delete();
            continue;
          }
          final ReplicationServerDomain domain =
              replicationServer.getReplicationServerDomain(record.getBaseDN());
          if (domain == null)
          {
            // the domain has been removed since the record was written in the
            // CNIndexDB, thus it makes no sense to keep this record in the DB.
            cursor.delete();
            continue;
          }
          // FIXME there is an opportunity for a phantom record in the CNIndexDB
          // if the replicaDB gets purged after call to domain.getOldestState().
          final CSN csn = record.getCSN();
          final ServerState oldestState = domain.getOldestState();
          final CSN fcsn = oldestState.getCSN(csn.getServerId());
          if (csn.isOlderThan(fcsn))
          {
            // This change which has already been purged from the corresponding
            // replicaDB => purge it from CNIndexDB
            cursor.delete();
            continue;
          }
          ServerState csnVector;
          try
          {
            Map<DN, ServerState> csnStartStates =
                MultiDomainServerState.splitGenStateToServerStates(
                        record.getPreviousCookie());
            csnVector = csnStartStates.get(record.getBaseDN());
            if (debugEnabled())
              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
                  + csnVector + " -- StartState:" + oldestState);
          }
          catch(Exception e)
          {
            // We could not parse the MultiDomainServerState from the record
            // FIXME this is quite an aggressive delete()
            cursor.delete();
            continue;
          }
          if (csnVector == null
              || (csnVector.getCSN(csn.getServerId()) != null
                    && !csnVector.cover(oldestState)))
          {
            cursor.delete();
            if (debugEnabled())
              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn
                  + "Not covering startState");
            continue;
          }
          oldestChangeNumber = record.getChangeNumber();
          cursor.close();
        }
        if (record.getChangeNumber() == newestChangeNumber)
        {
          // do not purge the newest record to avoid having the last generated
          // changenumber dropping back to 0 if the server restarts
          return;
        }
        cursor.close();
        if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear))
        {
          cursor.delete();
        }
        else
        {
          isOldestRecord = false;
        }
      }
      catch (ChangelogException e)
      {
        cursor.abort();
        throw e;
      }
      catch (Exception e)
      {
        cursor.abort();
        throw new ChangelogException(e);
      }
    }
    catch (ChangelogException e)
    {
      cursor.abort();
      throw e;
    }
    finally
    {
      cursor.close();
    }
  }
@@ -469,9 +355,7 @@
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
    /**
     * {@inheritDoc}
     */
    /** {@inheritDoc} */
    @Override
    public List<Attribute> getMonitorData()
    {
@@ -509,18 +393,14 @@
      return 0;
    }
    /**
     * {@inheritDoc}
     */
    /** {@inheritDoc} */
    @Override
    public String getMonitorInstanceName()
    {
      return "ChangeNumber Index Database";
    }
    /**
     * {@inheritDoc}
     */
    /** {@inheritDoc} */
    @Override
    public void initializeMonitorProvider(MonitorProviderCfg configuration)
                            throws ConfigException,InitializationException
@@ -529,9 +409,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
@@ -540,15 +418,6 @@
  }
  /**
   * Set the Purge delay for this db Handler.
   * @param delay The purge delay in Milliseconds.
   */
  public void setPurgeDelay(long delay)
  {
    trimAge = delay;
  }
  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   *
   * @throws ChangelogException
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -21,12 +21,13 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.io.File;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,9 +36,11 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -46,6 +49,7 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import com.forgerock.opendj.util.Pair;
@@ -87,7 +91,7 @@
   * The handler of the changelog database, the database stores the relation
   * between a change number and the associated cookie.
   * <p>
   * Guarded by cnIndexDBLock
   * @GuardedBy("cnIndexDBLock")
   */
  private JEChangeNumberIndexDB cnIndexDB;
  private final AtomicReference<ChangeNumberIndexer> cnIndexer =
@@ -96,6 +100,15 @@
  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();
  /**
   * The purge delay (in milliseconds). Records in the changelog DB that are
   * older than this delay might be removed.
   */
  private long purgeDelayInMillis;
  private final AtomicReference<ChangelogDBPurger> cnPurger =
      new AtomicReference<ChangelogDBPurger>();
  private volatile long latestPurgeDate;
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private AtomicBoolean shutdown = new AtomicBoolean();
@@ -312,13 +325,9 @@
      initializeChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
        final ChangeNumberIndexer indexer =
            new ChangeNumberIndexer(this, changelogState);
        if (cnIndexer.compareAndSet(null, indexer))
        {
          indexer.start();
        }
        startIndexer(changelogState);
      }
      setPurgeDelay(replicationServer.getPurgeDelay());
    }
    catch (ChangelogException e)
    {
@@ -374,12 +383,17 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.get();
    final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
    if (indexer != null)
    {
      indexer.initiateShutdown();
      cnIndexer.compareAndSet(indexer, null);
    }
    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
    if (purger != null)
    {
      purger.initiateShutdown();
    }
    try
    {
      shutdownCNIndexDB();
@@ -581,7 +595,7 @@
      {
        try
        {
          cnIndexDB.clear(baseDN);
          cnIndexDB.removeDomain(baseDN);
        }
        catch (ChangelogException e)
        {
@@ -607,7 +621,9 @@
        firstException = e;
      }
      else if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    if (firstException != null)
@@ -618,18 +634,24 @@
  /** {@inheritDoc} */
  @Override
  public void setPurgeDelay(long delay)
  public void setPurgeDelay(long purgeDelayInMillis)
  {
    final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
    if (cnIndexDB != null)
    this.purgeDelayInMillis = purgeDelayInMillis;
    final ChangelogDBPurger purger;
    if (purgeDelayInMillis > 0)
    {
      cnIndexDB.setPurgeDelay(delay);
    }
    for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
    {
      for (JEReplicaDB replicaDB : domainMap.values())
      purger = new ChangelogDBPurger();
      if (cnPurger.compareAndSet(null, purger))
      {
        replicaDB.setPurgeDelay(delay);
        purger.start();
      } // otherwise a purger was already running
    }
    else
    {
      purger = cnPurger.getAndSet(null);
      if (purger != null)
      {
        purger.initiateShutdown();
      }
    }
  }
@@ -639,19 +661,13 @@
  public void setComputeChangeNumber(boolean computeChangeNumber)
      throws ChangelogException
  {
    final ChangeNumberIndexer indexer;
    if (computeChangeNumber)
    {
      final ChangelogState changelogState = dbEnv.readChangelogState();
      indexer = new ChangeNumberIndexer(this, changelogState);
      if (cnIndexer.compareAndSet(null, indexer))
      {
        indexer.start();
      }
      startIndexer(dbEnv.readChangelogState());
    }
    else
    {
      indexer = cnIndexer.getAndSet(null);
      final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
      if (indexer != null)
      {
        indexer.initiateShutdown();
@@ -659,48 +675,34 @@
    }
  }
  private void startIndexer(final ChangelogState changelogState)
  {
    final ChangeNumberIndexer indexer =
        new ChangeNumberIndexer(this, changelogState);
    if (cnIndexer.compareAndSet(null, indexer))
    {
      indexer.start();
    }
  }
  /** {@inheritDoc} */
  @Override
  public long getDomainLatestTrimDate(DN baseDN)
  {
    long latest = 0;
    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      if (latest == 0 || latest < replicaDB.getLatestTrimDate())
      {
        latest = replicaDB.getLatestTrimDate();
      }
    }
    return latest;
    return latestPurgeDate;
  }
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexDB getChangeNumberIndexDB()
  {
    return getChangeNumberIndexDB(true);
  }
  /**
   * Returns the {@link ChangeNumberIndexDB} object.
   *
   * @param startTrimmingThread
   *          whether the trimming thread should be started
   * @return the {@link ChangeNumberIndexDB} object
   */
  ChangeNumberIndexDB getChangeNumberIndexDB(boolean startTrimmingThread)
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB == null)
      {
        try
        {
          cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
          if (startTrimmingThread)
          {
            cnIndexDB.startTrimmingThread();
          }
          cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
        }
        catch (Exception e)
        {
@@ -830,4 +832,83 @@
    }
    // TODO save this state in the changelogStateDB?
  }
  /**
   * The thread purging the changelogDB on a regular interval. Records are
   * purged from the changelogDB is they are older than a delay specified in
   * seconds. The purge process works in two steps:
   * <ol>
   * <li>first purge the changeNumberIndexDB and retrieve information to drive
   * replicaDBs purging</li>
   * <li>proceed to purge each replicaDBs based on the information collected
   * when purging the changeNumberIndexDB</li>
   * </ol>
   */
  private final class ChangelogDBPurger extends DirectoryThread
  {
    protected ChangelogDBPurger()
    {
      super("changelog DB purger");
    }
    /** {@inheritDoc} */
    @Override
    public void run()
    {
      // initialize CNIndexDB
      getChangeNumberIndexDB();
      while (!isShutdownInitiated())
      {
        try
        {
          final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
          if (localCNIndexDB == null)
          { // shutdown has been called
            return;
          }
          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
          final MultiDomainServerState purgeUpToCookie =
              localCNIndexDB.purgeUpTo(purgeTimestamp);
          if (purgeUpToCookie == null)
          { // this can happen when the change number index DB is empty
            continue;
          }
          /*
           * Drive purge of the replica DBs by the oldest non purged cookie in
           * the change number index DB.
           */
          for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
              : domainToReplicaDBs.entrySet())
          {
            final DN baseDN = entry1.getKey();
            final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
            for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
            {
              final Integer serverId = entry2.getKey();
              final JEReplicaDB replicaDB = entry2.getValue();
              replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
            }
          }
          latestPurgeDate = purgeTimestamp;
          // purge delay is specified in seconds so it should not be a problem
          // to sleep for 500 millis
          sleep(500);
        }
        catch (Exception e)
        {
          logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
              .get(stackTraceToSingleLineString(e)));
          if (replicationServer != null)
          {
            replicationServer.shutdown();
          }
        }
      }
    }
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -50,7 +50,6 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -95,16 +94,8 @@
   * <p>
   * This blocking queue is only used as a temporary placeholder so that the
   * write in the stable storage can be grouped for efficiency reason. Adding an
   * update synchronously add the update to this list. A dedicated thread loops
   * on {@link #flush()} and {@link #trim()}.
   * <dl>
   * <dt>flush()</dt>
   * <dd>get a number of changes from the in memory list by block and write them
   * to the db.</dd>
   * <dt>trim()</dt>
   * <dd>deletes from the DB a number of changes that are older than a certain
   * date.</dd>
   * </dl>
   * update synchronously add the update to this list. A dedicated thread
   * flushes this blocking queue.
   * <p>
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication server or to LDAP server
@@ -133,22 +124,12 @@
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private DirectoryThread thread;
  /**
   * Used to prevent race conditions between threads calling {@link #clear()}
   * {@link #flush()} or {@link #trim()}. This can happen with the thread
   * flushing the queue, on shutdown or on cursor opening, a thread calling
   * clear(), etc.
   * Used to prevent race conditions between threads calling {@link #flush()}.
   * This can happen with the thread flushing the queue, or else on shutdown.
   */
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  private long latestTrimDate = 0;
  /**
   * The trim age in milliseconds. Changes record in the change DB that
   * are older than this age are removed.
   */
  private long trimAge;
  /**
   * Creates a new ReplicaDB associated to a given LDAP server.
   *
@@ -166,15 +147,14 @@
    this.replicationServer = replicationServer;
    this.serverId = serverId;
    this.baseDN = baseDN;
    trimAge = replicationServer.getTrimAge();
    queueMaxBytes = replicationServer.getQueueSize() * 200;
    queueSizeBytes = new Semaphore(queueMaxBytes);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    thread = new DirectoryThread(this, "Replication server RS("
        + replicationServer.getServerId()
        + ") changelog checkpointer for Replica DS(" + serverId
        + ") for domain \"" + baseDN + "\"");
            + replicationServer.getServerId()
            + ") flusher thread for Replica DS(" + serverId
            + ") for domain \"" + baseDN + "\"");
    thread.start();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -334,9 +314,7 @@
  }
  /**
   * Run method for this class.
   * Periodically Flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   * Flushes the replicaDB queue from memory to stable storage.
   */
  @Override
  public void run()
@@ -350,7 +328,6 @@
        try
        {
          flush();
          trim();
        }
        catch (ChangelogException end)
        {
@@ -390,55 +367,26 @@
  }
  /**
   * Retrieves the latest trim date.
   * @return the latest trim date.
   * Synchronously purge changes older than purgeCSN from this replicaDB.
   *
   * @param purgeCSN
   *          The CSN up to which changes can be purged. No purging happens when
   *          it is null.
   * @throws ChangelogException
   *           In case of database problem.
   */
  public long getLatestTrimDate()
  void purgeUpTo(final CSN purgeCSN) throws ChangelogException
  {
    return latestTrimDate;
  }
  /**
   * Trim old changes from this replicationServer database.
   * @throws ChangelogException In case of database problem.
   */
  private void trim() throws ChangelogException
  {
    if (trimAge == 0)
    if (purgeCSN == null)
    {
      return;
    }
    latestTrimDate = TimeThread.getTime() - trimAge;
    CSN trimDate = new CSN(latestTrimDate, 0, 0);
    // Find the last CSN before the trimDate, in the Database.
    CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate);
    if (lastBeforeTrimDate != null)
    {
      // If we found it, we want to stop trimming when reaching it.
      trimDate = lastBeforeTrimDate;
    }
    for (int i = 0; i < 100; i++)
    {
      /*
       * Perform at least some trimming regardless of the flush backlog. Then
       * continue trim iterations while the flush backlog is low (below the
       * lowmark). Once the flush backlog increases, stop trimming and start
       * flushing more eagerly.
       */
      if (i > 20 && isQueueAboveLowMark())
      {
        break;
      }
      /*
       * the trim is done by group in order to save some CPU, IO bandwidth and
       * DB caches: start the transaction then do a bunch of remove then
       * commit.
       * the purge is done by group in order to save some CPU, IO bandwidth and
       * DB caches: start the transaction then do a bunch of remove then commit.
       */
      /*
       * Matt wrote: The record removal is done as a DB transaction and the
@@ -464,7 +412,7 @@
            return;
          }
          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN))
          {
            cursor.delete();
          }
@@ -490,37 +438,31 @@
    }
  }
  private boolean isQueueAboveLowMark()
  {
    final int lowMarkBytes = queueMaxBytes / 5;
    final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
    return bytesUsed > lowMarkBytes;
  }
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * <p>
   * Flush is done by chunk sized to 500 messages, starting from the beginning
   * of the list.
   *
   * <p>
   * @GuardedBy("flushLock")
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void flush() throws ChangelogException
  private void flush() throws ChangelogException
  {
    try
    {
      synchronized (flushLock)
      {
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
        final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
        if (change == null)
        {
          // nothing to persist, move on to the trim phase
          // nothing to persist, check if shutdown was invoked
          return;
        }
        // Try to see if there are more changes and persist them all.
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        changes.add(change);
        msgQueue.drainTo(changes);
@@ -604,15 +546,6 @@
  }
  /**
   * Set the Purge delay for this db Handler.
   * @param delay The purge delay in Milliseconds.
   */
  public void setPurgeDelay(long delay)
  {
    trimAge = delay;
  }
  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   * @throws ChangelogException When an exception occurs while removing the
   * changes from the DB.
@@ -636,13 +569,15 @@
  }
  /**
   * Return the size of the msgQueue (the memory cache of the ReplicaDB).
   * Return the number of records of this replicaDB.
   * <p>
   * For test purpose.
   * @return The memory queue size.
   *
   * @return The number of records of this replicaDB.
   */
  int getQueueSize()
  long getNumberRecords()
  {
    return this.msgQueue.size();
    return db.getNumberRecords();
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -73,15 +73,6 @@
      // we didn't find it in the db
      cursor = null;
    }
    if (cursor == null)
    {
      // flush the queue into the db
      replicaDB.flush();
      // look again in the db
      cursor = db.openReadCursor(startAfterCSN);
    }
  }
  /** {@inheritDoc} */
@@ -96,15 +87,7 @@
  public boolean next() throws ChangelogException
  {
    final ReplServerDBCursor localCursor = cursor;
    if (localCursor != null)
    {
      currentChange = localCursor.next();
    }
    else
    {
      currentChange = null;
    }
    currentChange = localCursor != null ? localCursor.next() : null;
    if (currentChange != null)
    {
@@ -114,12 +97,8 @@
    {
      synchronized (this)
      {
        if (cursor != null)
        {
          cursor.close();
          cursor = null;
        }
        replicaDB.flush();
        closeCursor();
        // previously exhausted cursor must be able to reinitialize themselves
        cursor = db.openReadCursor(lastNonNullCurrentCSN);
        currentChange = cursor.next();
        if (currentChange != null)
@@ -137,13 +116,17 @@
  {
    synchronized (this)
    {
      if (cursor != null)
      {
        cursor.close();
        cursor = null;
      }
      closeCursor();
      this.replicaDB = null;
      this.db = null;
    }
  }
  private void closeCursor()
  {
    if (cursor != null)
    {
      cursor.close();
      cursor = null;
    }
  }
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -943,4 +943,13 @@
    return db == null || !db.getEnvironment().isValid();
  }
  /**
   * Returns the number of records in this DB.
   *
   * @return the number of records in this DB.
   */
  long getNumberRecords()
  {
    return db.count();
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -67,7 +67,10 @@
import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
import org.testng.annotations.*;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -168,7 +171,7 @@
  @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerPreTest"})
  public void ECLReplicationServerTest() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // let's enable ECl manually now that we tested that ECl is not available
    ECLWorkflowElement wfe =
        (ECLWorkflowElement) DirectoryServer
@@ -189,7 +192,7 @@
  @Test(enabled=false, dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerTest1() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Test with a mix of domains, a mix of DSes
    ECLTwoDomains();
  }
@@ -204,7 +207,7 @@
  @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerTest3() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Write changes and read ECL from start
    ECLCompatWriteReadAllOps(1);
@@ -263,7 +266,7 @@
  @Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerFullTest3() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Test all types of ops.
    ECLAllOps(); // Do not clean the db for the next test
@@ -347,8 +350,7 @@
  @Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerFullTest15() throws Exception
  {
    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    cnIndexDB.setPurgeDelay(0);
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Write 4 changes and read ECL from start
    ECLCompatWriteReadAllOps(1);
@@ -369,8 +371,9 @@
    ECLCompatTestLimitsAndAdd(1, 8, 4);
    // Test CNIndexDB is purged when replication change log is purged
    cnIndexDB.setPurgeDelay(1);
    cnIndexDB.trim(null);
    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    cnIndexDB.purgeUpTo(Long.MAX_VALUE);
    assertTrue(cnIndexDB.isEmpty());
    ECLPurgeCNIndexDBAfterChangelogClear();
    // Test first and last are updated
@@ -896,7 +899,7 @@
          null);
      cnt++;
    }
    while (cnt < 100 // wait at most 1s
    while (cnt < 300 // wait at most 3s
        && op.getSearchEntries().size() != expectedNbEntries);
    final List<SearchResultEntry> entries = op.getSearchEntries();
    assertThat(entries).hasSize(expectedNbEntries);
@@ -1951,16 +1954,6 @@
    clearChangelogDB(replicationServer);
  }
  @AfterTest
  public void setPurgeDelayToInitialValue() throws Exception
  {
    JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    if (cnIndexDB != null)
    {
      cnIndexDB.setPurgeDelay(1);
    }
  }
  /**
   * After the tests stop the replicationServer.
   */
@@ -2461,10 +2454,9 @@
    String tn = "ECLPurgeCNIndexDBAfterChangelogClear";
    debugInfo(tn, "Starting test\n\n");
    JEChangeNumberIndexDB cnIndexDB =
        (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    assertEquals(cnIndexDB.count(), 8);
    cnIndexDB.setPurgeDelay(1000);
    replicationServer.getChangelogDB().setPurgeDelay(1000);
    clearChangelogDB(replicationServer);
@@ -2620,11 +2612,7 @@
  private JEChangeNumberIndexDB getCNIndexDB()
  {
    if (replicationServer != null)
    {
      return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
    }
    return null;
    return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -447,11 +447,6 @@
    {
      final ReplicatedUpdateMsg msg = msgs[i];
      final ChangeNumberIndexRecord record = allValues.get(i);
      if (previousCookie.isEmpty())
      {
        // ugly hack to go round strange legacy code @see OPENDJ-67
        previousCookie.replace(record.getBaseDN(), new ServerState());
      }
      // check content in order
      String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">";
      assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -26,16 +26,22 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.ArrayList;
import java.util.List;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.opends.server.replication.server.changelog.je.JEReplicaDBTest.*;
@@ -48,9 +54,16 @@
@SuppressWarnings("javadoc")
public class JEChangeNumberIndexDBTest extends ReplicationTestCase
{
  private static final String value1 = "value1";
  private static final String value2 = "value2";
  private static final String value3 = "value3";
  private final MultiDomainServerState previousCookie =
      new MultiDomainServerState();
  private final List<String> cookies = new ArrayList<String>();
  @BeforeMethod
  public void clearCookie()
  {
    previousCookie.clear();
    cookies.clear();
  }
  /**
   * This test makes basic operations of a JEChangeNumberIndexDB:
@@ -67,12 +80,11 @@
  void testTrim() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEChangeNumberIndexDB cnIndexDB = null;
    try
    {
      replicationServer = newReplicationServer();
      cnIndexDB = getCNIndexDBNoTrimming(replicationServer);
      cnIndexDB.setPurgeDelay(0);
      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
      changelogDB.setPurgeDelay(0); // disable purging
      // Prepare data to be stored in the db
      DN baseDN1 = DN.decode("o=baseDN1");
@@ -82,9 +94,10 @@
      CSN[] csns = newCSNs(1, 0, 3);
      // Add records
      long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]);
                 addRecord(cnIndexDB, value2, baseDN2, csns[1]);
      long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]);
      final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
      long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
                 addRecord(cnIndexDB, baseDN2, csns[1]);
      long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
      // The ChangeNumber should not get purged
      final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
@@ -94,11 +107,11 @@
      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
      try
      {
        assertEqualTo(cursor.getRecord(), csns[0], baseDN1, value1);
        assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0));
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[1], baseDN2, value2);
        assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1));
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[2], baseDN3, value3);
        assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2));
        assertFalse(cursor.next());
      }
      finally
@@ -106,14 +119,13 @@
        StaticUtils.close(cursor);
      }
      // Now test that the trimming thread does its job => start it
      cnIndexDB.setPurgeDelay(100);
      cnIndexDB.startTrimmingThread();
      // Check the db is cleared.
      while (cnIndexDB.count() > 1)
      // Now test that purging removes all changes bar the last one
      changelogDB.setPurgeDelay(1);
      int count = 0;
      while (cnIndexDB.count() > 1 && count < 100)
      {
        Thread.yield();
        Thread.sleep(10);
        count++;
      }
      assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
    }
@@ -123,10 +135,14 @@
    }
  }
  private long addRecord(JEChangeNumberIndexDB cnIndexDB, String cookie, DN baseDN, CSN csn)
      throws ChangelogException
  private long addRecord(JEChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
  {
    return cnIndexDB.addRecord(new ChangeNumberIndexRecord(cookie, baseDN, csn));
    final String cookie = previousCookie.toString();
    cookies.add(cookie);
    final long changeNumber = cnIndexDB.addRecord(
        new ChangeNumberIndexRecord(cookie, baseDN, csn));
    previousCookie.update(baseDN, csn);
    return changeNumber;
  }
  private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie)
@@ -136,11 +152,11 @@
    assertEquals(record.getPreviousCookie(), cookie);
  }
  private JEChangeNumberIndexDB getCNIndexDBNoTrimming(ReplicationServer rs) throws ChangelogException
  private JEChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
  {
    final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
    final JEChangeNumberIndexDB cnIndexDB =
        (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB(false);
        (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB();
    assertTrue(cnIndexDB.isEmpty());
    return cnIndexDB;
  }
@@ -160,12 +176,11 @@
  void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEChangeNumberIndexDB cnIndexDB = null;
    try
    {
      replicationServer = newReplicationServer();
      cnIndexDB = getCNIndexDBNoTrimming(replicationServer);
      cnIndexDB.setPurgeDelay(0);
      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
      changelogDB.setPurgeDelay(0);
      // Prepare data to be stored in the db
@@ -176,9 +191,10 @@
      CSN[] csns = newCSNs(1, 0, 3);
      // Add records
      long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]);
      long cn2 = addRecord(cnIndexDB, value2, baseDN2, csns[1]);
      long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]);
      final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
      long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
      long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]);
      long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
      // Checks
      assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1);
@@ -187,9 +203,9 @@
      assertEquals(cnIndexDB.count(), 3, "Db count");
      assertFalse(cnIndexDB.isEmpty());
      assertEquals(getPreviousCookie(cnIndexDB, cn1), value1);
      assertEquals(getPreviousCookie(cnIndexDB, cn2), value2);
      assertEquals(getPreviousCookie(cnIndexDB, cn3), value3);
      assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0));
      assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1));
      assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2));
      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
      assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
@@ -200,7 +216,7 @@
      cursor = cnIndexDB.getCursorFrom(cn3);
      assertCursorReadsInOrder(cursor, cn3);
      cnIndexDB.clear(null);
      cnIndexDB.removeDomain(null);
      assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
      // Check the db is cleared.
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -97,7 +97,7 @@
      //--
      // Iterator tests with changes persisted
      waitChangesArePersisted(replicaDB);
      waitChangesArePersisted(replicaDB, 3);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4]);
@@ -108,7 +108,7 @@
      //--
      // Cursor tests with changes persisted
      replicaDB.add(update4);
      waitChangesArePersisted(replicaDB);
      waitChangesArePersisted(replicaDB, 4);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
      // Test cursor from existing CSN
@@ -116,7 +116,7 @@
      assertFoundInOrder(replicaDB, csns[3]);
      assertNotFound(replicaDB, csns[4]);
      replicaDB.setPurgeDelay(1);
      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
      int count = 0;
      boolean purgeSucceeded = false;
@@ -141,16 +141,27 @@
    }
  }
  private void waitChangesArePersisted(JEReplicaDB replicaDB) throws Exception
  private void waitChangesArePersisted(JEReplicaDB replicaDB,
      int nbRecordsInserted) throws Exception
  {
    final int expected = 0;
    waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
  }
  private void waitChangesArePersisted(JEReplicaDB replicaDB,
      int nbRecordsInserted, int counterWindow) throws Exception
  {
    // one counter record is inserted every time "counterWindow"
    // records have been inserted
    int expectedNbRecords =
        nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
    int count = 0;
    while (replicaDB.getQueueSize() != expected && count < 100)
    while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
    {
      Thread.sleep(10);
      count++;
    }
    assertEquals(replicaDB.getQueueSize(), expected);
    assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
  }
  static CSN[] newCSNs(int serverId, long timestamp, int number)
@@ -204,8 +215,9 @@
      assertNull(cursor.getRecord());
      for (int i = 1; i < csns.length; i++)
      {
        assertTrue(cursor.next());
        assertEquals(cursor.getRecord().getCSN(), csns[i]);
        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
        assertTrue(cursor.next(), msg);
        assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
      }
      assertFalse(cursor.next());
      assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
@@ -274,11 +286,12 @@
  {
    ReplicationServer replicationServer = null;
    DBCursor<UpdateMsg> cursor = null;
    JEReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6);
      for (int i = 0; i < 5; i++)
@@ -288,7 +301,7 @@
          replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        }
      }
      replicaDB.flush();
      waitChangesArePersisted(replicaDB, 4);
      cursor = replicaDB.generateCursorFrom(csns[0]);
      assertTrue(cursor.next());
@@ -307,6 +320,8 @@
    finally
    {
      StaticUtils.close(cursor);
      if (replicaDB != null)
        replicaDB.shutdown();
      remove(replicationServer);
    }
  }
@@ -334,7 +349,7 @@
    testGetOldestNewestCSNs(4000, 1000);
  }
  private void testGetOldestNewestCSNs(int max, int counterWindow) throws Exception
  private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception
  {
    String tn = "testDBCount("+max+","+counterWindow+")";
    debugInfo(tn, "Starting test");
@@ -363,7 +378,7 @@
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      replicaDB.flush();
      waitChangesArePersisted(replicaDB, max, counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
@@ -387,15 +402,13 @@
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      replicaDB.flush();
      waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
      //
      replicaDB.setPurgeDelay(100);
      sleep(1000);
      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
      String testcase = "AFTER PURGE (oldest, newest)=";
      debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());