mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.51.2014 819f74758a1c464bbf578e70ca8592cc8d101d75
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -722,7 +722,7 @@
   * @return  The time after which changes must be deleted from the
   *          persistent storage (in milliseconds).
   */
  public long getTrimAge()
  public long getPurgeDelay()
  {
    return this.config.getReplicationPurgeDelay() * 1000;
  }
@@ -780,7 +780,7 @@
    final long newPurgeDelay = config.getReplicationPurgeDelay();
    if (newPurgeDelay != oldConfig.getReplicationPurgeDelay())
    {
      this.changelogDB.setPurgeDelay(getTrimAge());
      this.changelogDB.setPurgeDelay(getPurgeDelay());
    }
    final boolean computeCN = config.isComputeChangeNumber();
    if (computeCN != oldConfig.isComputeChangeNumber())
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -487,13 +487,6 @@
          // OK, the oldest change is older than the medium consistency point
          // let's publish it to the CNIndexDB.
          // Next if statement is ugly but ensures the first change will not be
          // immediately trimmed from the CNIndexDB. Yuck!
          if (mediumConsistencyRUV.isEmpty())
          {
            mediumConsistencyRUV.replace(baseDN, new ServerState());
          }
          final String previousCookie = mediumConsistencyRUV.toString();
          final ChangeNumberIndexRecord record =
              new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
@@ -620,20 +613,21 @@
  }
  /**
   * Asks the current thread to clear its state.
   * Asks the current thread to clear its state and blocks until state is
   * cleared.
   * <p>
   * This method is only useful for unit tests.
   */
  public void clear()
  {
    doClear.set(true);
    synchronized (this)
    while (doClear.get() && !State.TERMINATED.equals(getState()))
    {
      notify();
    }
    while (doClear.get())
    {
      // wait until clear() has been done by thread
      // wait until clear() has been done by thread, always waking it up
      synchronized (this)
      {
        notify();
      }
      // ensures unit tests wait that this thread's state is cleaned up
      Thread.yield();
    }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -28,29 +28,21 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class is used for managing the replicationServer database for each
@@ -62,7 +54,7 @@
 * This class publishes some monitoring information below <code>
 * cn=monitor</code>.
 */
public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable
public class JEChangeNumberIndexDB implements ChangeNumberIndexDB
{
  /**
   * The tracer object for the debug logger.
@@ -74,7 +66,7 @@
  /** FIXME What is this field used for? */
  private volatile long oldestChangeNumber = NO_KEY;
  /**
   * The newest changenumber stored in the DB. It is used to avoid trimming the
   * The newest changenumber stored in the DB. It is used to avoid purging the
   * record with the newest changenumber. The newest record in the changenumber
   * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
   * then retrieved on server startup.
@@ -86,48 +78,25 @@
   * condition between:
   * <ol>
   * <li>this atomic long being incremented for a new record ('recordB')</li>
   * <li>the current newest record ('recordA') being trimmed from the DB</li>
   * <li>the current newest record ('recordA') being purged from the DB</li>
   * <li>'recordB' failing to be inserted in the DB</li>
   * </ol>
   */
  private final AtomicLong lastGeneratedChangeNumber;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  /**
   * A dedicated thread loops trim().
   * <p>
   * trim() : deletes from the DB a number of changes that are older than a
   * certain date.
   */
  private DirectoryThread trimmingThread;
  /**
   * The trim age in milliseconds. Changes record in the change DB that are
   * older than this age are removed.
   * <p>
   * FIXME it never gets updated even when the replication server purge delay is
   * updated
   */
  private volatile long trimAge;
  private ReplicationServer replicationServer;
  /**
   * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
   *
   * @param replicationServer The ReplicationServer that creates this instance.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * @param dbEnv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @throws ChangelogException If a database problem happened
   */
  public JEChangeNumberIndexDB(ReplicationServer replicationServer,
      ReplicationDbEnv dbenv) throws ChangelogException
  public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
  {
    this.replicationServer = replicationServer;
    this.trimAge = replicationServer.getTrimAge();
    // DB initialization
    db = new DraftCNDB(dbenv);
    db = new DraftCNDB(dbEnv);
    final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
    final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
    oldestChangeNumber = getChangeNumber(oldestRecord);
@@ -142,16 +111,6 @@
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }
  /**
   * Creates and starts the thread trimming the CNIndexDB.
   */
  public void startTrimmingThread()
  {
    trimmingThread =
        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
    trimmingThread.start();
  }
  private long getChangeNumber(ChangeNumberIndexRecord record)
      throws ChangelogException
  {
@@ -251,77 +210,82 @@
      notifyAll();
    }
    if (trimmingThread != null)
    {
      try
      {
        trimmingThread.join();
      }
      catch (InterruptedException ignored)
      {
        // Nothing can be done about it, just proceed
      }
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
  }
  /**
   * Run method for this class.
   * Periodically Flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   * Synchronously purges the change number index DB up to and excluding the
   * provided timestamp.
   *
   * @param purgeTimestamp
   *          the timestamp up to which purging must happen
   * @return the {@link MultiDomainServerState} object that drives purging the
   *         replicaDBs.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  @Override
  public void run()
  public MultiDomainServerState purgeUpTo(long purgeTimestamp)
      throws ChangelogException
  {
    while (!shutdown.get())
    if (isEmpty())
    {
      try {
        trim(shutdown);
      return null;
    }
        synchronized (this)
        {
          if (!shutdown.get())
          {
            try
            {
              wait(1000);
            }
            catch (InterruptedException e)
            {
              Thread.currentThread().interrupt();
            }
          }
        }
      }
      catch (Exception end)
    final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try
    {
      while (!mustShutdown(shutdown) && cursor.next())
      {
        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
            .get(stackTraceToSingleLineString(end)));
        if (replicationServer != null)
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (record.getChangeNumber() != oldestChangeNumber)
        {
          replicationServer.shutdown();
          oldestChangeNumber = record.getChangeNumber();
        }
        break;
        if (record.getChangeNumber() == newestChangeNumber)
        {
          // do not purge the newest record to avoid having the last generated
          // changenumber dropping back to 0 if the server restarts
          return getPurgeCookie(record);
        }
        if (record.getCSN().isOlderThan(purgeCSN))
        {
          cursor.delete();
        }
        else
        {
          // Current record is not old enough to purge.
          return getPurgeCookie(record);
        }
      }
      return null;
    }
    catch (ChangelogException e)
    {
      cursor.abort();
      throw e;
    }
    catch (Exception e)
    {
      cursor.abort();
      throw new ChangelogException(e);
    }
    finally
    {
      cursor.close();
    }
  }
  /**
   * Trim old changes from this database.
   *
   * @param shutdown
   *          AtomicBoolean telling whether the current run must be stopped
   * @throws ChangelogException
   *           In case of database problem.
   */
  public void trim(AtomicBoolean shutdown) throws ChangelogException
  private MultiDomainServerState getPurgeCookie(
      final ChangeNumberIndexRecord record) throws DirectoryException
  {
    if (trimAge == 0)
      return;
    clear(null, shutdown);
    // Do not include the record's CSN to avoid having it purged
    return new MultiDomainServerState(record.getPreviousCookie());
  }
  /**
@@ -334,127 +298,49 @@
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public void clear(DN baseDNToClear) throws ChangelogException
  {
    clear(baseDNToClear, null);
  }
  private void clear(DN baseDNToClear, AtomicBoolean shutdown)
      throws ChangelogException
  public void removeDomain(DN baseDNToClear) throws ChangelogException
  {
    if (isEmpty())
    {
      return;
    }
    for (int i = 0; i < 100; i++)
    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try
    {
      if (mustShutdown(shutdown))
      boolean isOldestRecord = true;
      while (!mustShutdown(shutdown) && cursor.next())
      {
        return;
      }
      final DraftCNDBCursor cursor = db.openDeleteCursor();
      try
      {
        for (int j = 0; j < 50; j++)
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
        {
          // let's traverse the CNIndexDB
          if (mustShutdown(shutdown) || !cursor.next())
          {
            cursor.close();
            return;
          }
          final ChangeNumberIndexRecord record = cursor.currentRecord();
          if (record.getChangeNumber() != oldestChangeNumber)
          {
            oldestChangeNumber = record.getChangeNumber();
          }
          if (record.getChangeNumber() == newestChangeNumber)
          {
            // do not trim the newest record to avoid having the last generated
            // changenumber dropping back to 0 if the server restarts
            cursor.close();
            return;
          }
          if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN()))
          {
            cursor.delete();
            continue;
          }
          final ReplicationServerDomain domain =
              replicationServer.getReplicationServerDomain(record.getBaseDN());
          if (domain == null)
          {
            // the domain has been removed since the record was written in the
            // CNIndexDB, thus it makes no sense to keep this record in the DB.
            cursor.delete();
            continue;
          }
          // FIXME there is an opportunity for a phantom record in the CNIndexDB
          // if the replicaDB gets purged after call to domain.getOldestState().
          final CSN csn = record.getCSN();
          final ServerState oldestState = domain.getOldestState();
          final CSN fcsn = oldestState.getCSN(csn.getServerId());
          if (csn.isOlderThan(fcsn))
          {
            // This change which has already been purged from the corresponding
            // replicaDB => purge it from CNIndexDB
            cursor.delete();
            continue;
          }
          ServerState csnVector;
          try
          {
            Map<DN, ServerState> csnStartStates =
                MultiDomainServerState.splitGenStateToServerStates(
                        record.getPreviousCookie());
            csnVector = csnStartStates.get(record.getBaseDN());
            if (debugEnabled())
              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
                  + csnVector + " -- StartState:" + oldestState);
          }
          catch(Exception e)
          {
            // We could not parse the MultiDomainServerState from the record
            // FIXME this is quite an aggressive delete()
            cursor.delete();
            continue;
          }
          if (csnVector == null
              || (csnVector.getCSN(csn.getServerId()) != null
                    && !csnVector.cover(oldestState)))
          {
            cursor.delete();
            if (debugEnabled())
              TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn
                  + "Not covering startState");
            continue;
          }
          oldestChangeNumber = record.getChangeNumber();
          cursor.close();
        }
        if (record.getChangeNumber() == newestChangeNumber)
        {
          // do not purge the newest record to avoid having the last generated
          // changenumber dropping back to 0 if the server restarts
          return;
        }
        cursor.close();
        if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear))
        {
          cursor.delete();
        }
        else
        {
          isOldestRecord = false;
        }
      }
      catch (ChangelogException e)
      {
        cursor.abort();
        throw e;
      }
      catch (Exception e)
      {
        cursor.abort();
        throw new ChangelogException(e);
      }
    }
    catch (ChangelogException e)
    {
      cursor.abort();
      throw e;
    }
    finally
    {
      cursor.close();
    }
  }
@@ -469,9 +355,7 @@
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
    /**
     * {@inheritDoc}
     */
    /** {@inheritDoc} */
    @Override
    public List<Attribute> getMonitorData()
    {
@@ -509,18 +393,14 @@
      return 0;
    }
    /**
     * {@inheritDoc}
     */
    /** {@inheritDoc} */
    @Override
    public String getMonitorInstanceName()
    {
      return "ChangeNumber Index Database";
    }
    /**
     * {@inheritDoc}
     */
    /** {@inheritDoc} */
    @Override
    public void initializeMonitorProvider(MonitorProviderCfg configuration)
                            throws ConfigException,InitializationException
@@ -529,9 +409,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
@@ -540,15 +418,6 @@
  }
  /**
   * Set the Purge delay for this db Handler.
   * @param delay The purge delay in Milliseconds.
   */
  public void setPurgeDelay(long delay)
  {
    trimAge = delay;
  }
  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   *
   * @throws ChangelogException
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -21,12 +21,13 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.io.File;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,9 +36,11 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -46,6 +49,7 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import com.forgerock.opendj.util.Pair;
@@ -87,7 +91,7 @@
   * The handler of the changelog database, the database stores the relation
   * between a change number and the associated cookie.
   * <p>
   * Guarded by cnIndexDBLock
   * @GuardedBy("cnIndexDBLock")
   */
  private JEChangeNumberIndexDB cnIndexDB;
  private final AtomicReference<ChangeNumberIndexer> cnIndexer =
@@ -96,6 +100,15 @@
  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();
  /**
   * The purge delay (in milliseconds). Records in the changelog DB that are
   * older than this delay might be removed.
   */
  private long purgeDelayInMillis;
  private final AtomicReference<ChangelogDBPurger> cnPurger =
      new AtomicReference<ChangelogDBPurger>();
  private volatile long latestPurgeDate;
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private AtomicBoolean shutdown = new AtomicBoolean();
@@ -312,13 +325,9 @@
      initializeChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
        final ChangeNumberIndexer indexer =
            new ChangeNumberIndexer(this, changelogState);
        if (cnIndexer.compareAndSet(null, indexer))
        {
          indexer.start();
        }
        startIndexer(changelogState);
      }
      setPurgeDelay(replicationServer.getPurgeDelay());
    }
    catch (ChangelogException e)
    {
@@ -374,12 +383,17 @@
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.get();
    final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
    if (indexer != null)
    {
      indexer.initiateShutdown();
      cnIndexer.compareAndSet(indexer, null);
    }
    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
    if (purger != null)
    {
      purger.initiateShutdown();
    }
    try
    {
      shutdownCNIndexDB();
@@ -581,7 +595,7 @@
      {
        try
        {
          cnIndexDB.clear(baseDN);
          cnIndexDB.removeDomain(baseDN);
        }
        catch (ChangelogException e)
        {
@@ -607,7 +621,9 @@
        firstException = e;
      }
      else if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    if (firstException != null)
@@ -618,18 +634,24 @@
  /** {@inheritDoc} */
  @Override
  public void setPurgeDelay(long delay)
  public void setPurgeDelay(long purgeDelayInMillis)
  {
    final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
    if (cnIndexDB != null)
    this.purgeDelayInMillis = purgeDelayInMillis;
    final ChangelogDBPurger purger;
    if (purgeDelayInMillis > 0)
    {
      cnIndexDB.setPurgeDelay(delay);
    }
    for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
    {
      for (JEReplicaDB replicaDB : domainMap.values())
      purger = new ChangelogDBPurger();
      if (cnPurger.compareAndSet(null, purger))
      {
        replicaDB.setPurgeDelay(delay);
        purger.start();
      } // otherwise a purger was already running
    }
    else
    {
      purger = cnPurger.getAndSet(null);
      if (purger != null)
      {
        purger.initiateShutdown();
      }
    }
  }
@@ -639,19 +661,13 @@
  public void setComputeChangeNumber(boolean computeChangeNumber)
      throws ChangelogException
  {
    final ChangeNumberIndexer indexer;
    if (computeChangeNumber)
    {
      final ChangelogState changelogState = dbEnv.readChangelogState();
      indexer = new ChangeNumberIndexer(this, changelogState);
      if (cnIndexer.compareAndSet(null, indexer))
      {
        indexer.start();
      }
      startIndexer(dbEnv.readChangelogState());
    }
    else
    {
      indexer = cnIndexer.getAndSet(null);
      final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
      if (indexer != null)
      {
        indexer.initiateShutdown();
@@ -659,48 +675,34 @@
    }
  }
  private void startIndexer(final ChangelogState changelogState)
  {
    final ChangeNumberIndexer indexer =
        new ChangeNumberIndexer(this, changelogState);
    if (cnIndexer.compareAndSet(null, indexer))
    {
      indexer.start();
    }
  }
  /** {@inheritDoc} */
  @Override
  public long getDomainLatestTrimDate(DN baseDN)
  {
    long latest = 0;
    for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      if (latest == 0 || latest < replicaDB.getLatestTrimDate())
      {
        latest = replicaDB.getLatestTrimDate();
      }
    }
    return latest;
    return latestPurgeDate;
  }
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexDB getChangeNumberIndexDB()
  {
    return getChangeNumberIndexDB(true);
  }
  /**
   * Returns the {@link ChangeNumberIndexDB} object.
   *
   * @param startTrimmingThread
   *          whether the trimming thread should be started
   * @return the {@link ChangeNumberIndexDB} object
   */
  ChangeNumberIndexDB getChangeNumberIndexDB(boolean startTrimmingThread)
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB == null)
      {
        try
        {
          cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
          if (startTrimmingThread)
          {
            cnIndexDB.startTrimmingThread();
          }
          cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
        }
        catch (Exception e)
        {
@@ -830,4 +832,83 @@
    }
    // TODO save this state in the changelogStateDB?
  }
  /**
   * The thread purging the changelogDB on a regular interval. Records are
   * purged from the changelogDB is they are older than a delay specified in
   * seconds. The purge process works in two steps:
   * <ol>
   * <li>first purge the changeNumberIndexDB and retrieve information to drive
   * replicaDBs purging</li>
   * <li>proceed to purge each replicaDBs based on the information collected
   * when purging the changeNumberIndexDB</li>
   * </ol>
   */
  private final class ChangelogDBPurger extends DirectoryThread
  {
    protected ChangelogDBPurger()
    {
      super("changelog DB purger");
    }
    /** {@inheritDoc} */
    @Override
    public void run()
    {
      // initialize CNIndexDB
      getChangeNumberIndexDB();
      while (!isShutdownInitiated())
      {
        try
        {
          final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
          if (localCNIndexDB == null)
          { // shutdown has been called
            return;
          }
          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
          final MultiDomainServerState purgeUpToCookie =
              localCNIndexDB.purgeUpTo(purgeTimestamp);
          if (purgeUpToCookie == null)
          { // this can happen when the change number index DB is empty
            continue;
          }
          /*
           * Drive purge of the replica DBs by the oldest non purged cookie in
           * the change number index DB.
           */
          for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
              : domainToReplicaDBs.entrySet())
          {
            final DN baseDN = entry1.getKey();
            final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
            for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
            {
              final Integer serverId = entry2.getKey();
              final JEReplicaDB replicaDB = entry2.getValue();
              replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
            }
          }
          latestPurgeDate = purgeTimestamp;
          // purge delay is specified in seconds so it should not be a problem
          // to sleep for 500 millis
          sleep(500);
        }
        catch (Exception e)
        {
          logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
              .get(stackTraceToSingleLineString(e)));
          if (replicationServer != null)
          {
            replicationServer.shutdown();
          }
        }
      }
    }
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -50,7 +50,6 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -95,16 +94,8 @@
   * <p>
   * This blocking queue is only used as a temporary placeholder so that the
   * write in the stable storage can be grouped for efficiency reason. Adding an
   * update synchronously add the update to this list. A dedicated thread loops
   * on {@link #flush()} and {@link #trim()}.
   * <dl>
   * <dt>flush()</dt>
   * <dd>get a number of changes from the in memory list by block and write them
   * to the db.</dd>
   * <dt>trim()</dt>
   * <dd>deletes from the DB a number of changes that are older than a certain
   * date.</dd>
   * </dl>
   * update synchronously add the update to this list. A dedicated thread
   * flushes this blocking queue.
   * <p>
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication server or to LDAP server
@@ -133,22 +124,12 @@
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private DirectoryThread thread;
  /**
   * Used to prevent race conditions between threads calling {@link #clear()}
   * {@link #flush()} or {@link #trim()}. This can happen with the thread
   * flushing the queue, on shutdown or on cursor opening, a thread calling
   * clear(), etc.
   * Used to prevent race conditions between threads calling {@link #flush()}.
   * This can happen with the thread flushing the queue, or else on shutdown.
   */
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  private long latestTrimDate = 0;
  /**
   * The trim age in milliseconds. Changes record in the change DB that
   * are older than this age are removed.
   */
  private long trimAge;
  /**
   * Creates a new ReplicaDB associated to a given LDAP server.
   *
@@ -166,15 +147,14 @@
    this.replicationServer = replicationServer;
    this.serverId = serverId;
    this.baseDN = baseDN;
    trimAge = replicationServer.getTrimAge();
    queueMaxBytes = replicationServer.getQueueSize() * 200;
    queueSizeBytes = new Semaphore(queueMaxBytes);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    thread = new DirectoryThread(this, "Replication server RS("
        + replicationServer.getServerId()
        + ") changelog checkpointer for Replica DS(" + serverId
        + ") for domain \"" + baseDN + "\"");
            + replicationServer.getServerId()
            + ") flusher thread for Replica DS(" + serverId
            + ") for domain \"" + baseDN + "\"");
    thread.start();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -334,9 +314,7 @@
  }
  /**
   * Run method for this class.
   * Periodically Flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   * Flushes the replicaDB queue from memory to stable storage.
   */
  @Override
  public void run()
@@ -350,7 +328,6 @@
        try
        {
          flush();
          trim();
        }
        catch (ChangelogException end)
        {
@@ -390,55 +367,26 @@
  }
  /**
   * Retrieves the latest trim date.
   * @return the latest trim date.
   * Synchronously purge changes older than purgeCSN from this replicaDB.
   *
   * @param purgeCSN
   *          The CSN up to which changes can be purged. No purging happens when
   *          it is null.
   * @throws ChangelogException
   *           In case of database problem.
   */
  public long getLatestTrimDate()
  void purgeUpTo(final CSN purgeCSN) throws ChangelogException
  {
    return latestTrimDate;
  }
  /**
   * Trim old changes from this replicationServer database.
   * @throws ChangelogException In case of database problem.
   */
  private void trim() throws ChangelogException
  {
    if (trimAge == 0)
    if (purgeCSN == null)
    {
      return;
    }
    latestTrimDate = TimeThread.getTime() - trimAge;
    CSN trimDate = new CSN(latestTrimDate, 0, 0);
    // Find the last CSN before the trimDate, in the Database.
    CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate);
    if (lastBeforeTrimDate != null)
    {
      // If we found it, we want to stop trimming when reaching it.
      trimDate = lastBeforeTrimDate;
    }
    for (int i = 0; i < 100; i++)
    {
      /*
       * Perform at least some trimming regardless of the flush backlog. Then
       * continue trim iterations while the flush backlog is low (below the
       * lowmark). Once the flush backlog increases, stop trimming and start
       * flushing more eagerly.
       */
      if (i > 20 && isQueueAboveLowMark())
      {
        break;
      }
      /*
       * the trim is done by group in order to save some CPU, IO bandwidth and
       * DB caches: start the transaction then do a bunch of remove then
       * commit.
       * the purge is done by group in order to save some CPU, IO bandwidth and
       * DB caches: start the transaction then do a bunch of remove then commit.
       */
      /*
       * Matt wrote: The record removal is done as a DB transaction and the
@@ -464,7 +412,7 @@
            return;
          }
          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN))
          {
            cursor.delete();
          }
@@ -490,37 +438,31 @@
    }
  }
  private boolean isQueueAboveLowMark()
  {
    final int lowMarkBytes = queueMaxBytes / 5;
    final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
    return bytesUsed > lowMarkBytes;
  }
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * <p>
   * Flush is done by chunk sized to 500 messages, starting from the beginning
   * of the list.
   *
   * <p>
   * @GuardedBy("flushLock")
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void flush() throws ChangelogException
  private void flush() throws ChangelogException
  {
    try
    {
      synchronized (flushLock)
      {
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
        final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
        if (change == null)
        {
          // nothing to persist, move on to the trim phase
          // nothing to persist, check if shutdown was invoked
          return;
        }
        // Try to see if there are more changes and persist them all.
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        changes.add(change);
        msgQueue.drainTo(changes);
@@ -604,15 +546,6 @@
  }
  /**
   * Set the Purge delay for this db Handler.
   * @param delay The purge delay in Milliseconds.
   */
  public void setPurgeDelay(long delay)
  {
    trimAge = delay;
  }
  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   * @throws ChangelogException When an exception occurs while removing the
   * changes from the DB.
@@ -636,13 +569,15 @@
  }
  /**
   * Return the size of the msgQueue (the memory cache of the ReplicaDB).
   * Return the number of records of this replicaDB.
   * <p>
   * For test purpose.
   * @return The memory queue size.
   *
   * @return The number of records of this replicaDB.
   */
  int getQueueSize()
  long getNumberRecords()
  {
    return this.msgQueue.size();
    return db.getNumberRecords();
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -73,15 +73,6 @@
      // we didn't find it in the db
      cursor = null;
    }
    if (cursor == null)
    {
      // flush the queue into the db
      replicaDB.flush();
      // look again in the db
      cursor = db.openReadCursor(startAfterCSN);
    }
  }
  /** {@inheritDoc} */
@@ -96,15 +87,7 @@
  public boolean next() throws ChangelogException
  {
    final ReplServerDBCursor localCursor = cursor;
    if (localCursor != null)
    {
      currentChange = localCursor.next();
    }
    else
    {
      currentChange = null;
    }
    currentChange = localCursor != null ? localCursor.next() : null;
    if (currentChange != null)
    {
@@ -114,12 +97,8 @@
    {
      synchronized (this)
      {
        if (cursor != null)
        {
          cursor.close();
          cursor = null;
        }
        replicaDB.flush();
        closeCursor();
        // previously exhausted cursor must be able to reinitialize themselves
        cursor = db.openReadCursor(lastNonNullCurrentCSN);
        currentChange = cursor.next();
        if (currentChange != null)
@@ -137,13 +116,17 @@
  {
    synchronized (this)
    {
      if (cursor != null)
      {
        cursor.close();
        cursor = null;
      }
      closeCursor();
      this.replicaDB = null;
      this.db = null;
    }
  }
  private void closeCursor()
  {
    if (cursor != null)
    {
      cursor.close();
      cursor = null;
    }
  }
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -943,4 +943,13 @@
    return db == null || !db.getEnvironment().isValid();
  }
  /**
   * Returns the number of records in this DB.
   *
   * @return the number of records in this DB.
   */
  long getNumberRecords()
  {
    return db.count();
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -67,7 +67,10 @@
import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
import org.testng.annotations.*;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -168,7 +171,7 @@
  @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerPreTest"})
  public void ECLReplicationServerTest() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // let's enable ECl manually now that we tested that ECl is not available
    ECLWorkflowElement wfe =
        (ECLWorkflowElement) DirectoryServer
@@ -189,7 +192,7 @@
  @Test(enabled=false, dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerTest1() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Test with a mix of domains, a mix of DSes
    ECLTwoDomains();
  }
@@ -204,7 +207,7 @@
  @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerTest3() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Write changes and read ECL from start
    ECLCompatWriteReadAllOps(1);
@@ -263,7 +266,7 @@
  @Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerFullTest3() throws Exception
  {
    getCNIndexDB().setPurgeDelay(0);
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Test all types of ops.
    ECLAllOps(); // Do not clean the db for the next test
@@ -347,8 +350,7 @@
  @Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
  public void ECLReplicationServerFullTest15() throws Exception
  {
    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    cnIndexDB.setPurgeDelay(0);
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Write 4 changes and read ECL from start
    ECLCompatWriteReadAllOps(1);
@@ -369,8 +371,9 @@
    ECLCompatTestLimitsAndAdd(1, 8, 4);
    // Test CNIndexDB is purged when replication change log is purged
    cnIndexDB.setPurgeDelay(1);
    cnIndexDB.trim(null);
    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    cnIndexDB.purgeUpTo(Long.MAX_VALUE);
    assertTrue(cnIndexDB.isEmpty());
    ECLPurgeCNIndexDBAfterChangelogClear();
    // Test first and last are updated
@@ -896,7 +899,7 @@
          null);
      cnt++;
    }
    while (cnt < 100 // wait at most 1s
    while (cnt < 300 // wait at most 3s
        && op.getSearchEntries().size() != expectedNbEntries);
    final List<SearchResultEntry> entries = op.getSearchEntries();
    assertThat(entries).hasSize(expectedNbEntries);
@@ -1951,16 +1954,6 @@
    clearChangelogDB(replicationServer);
  }
  @AfterTest
  public void setPurgeDelayToInitialValue() throws Exception
  {
    JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    if (cnIndexDB != null)
    {
      cnIndexDB.setPurgeDelay(1);
    }
  }
  /**
   * After the tests stop the replicationServer.
   */
@@ -2461,10 +2454,9 @@
    String tn = "ECLPurgeCNIndexDBAfterChangelogClear";
    debugInfo(tn, "Starting test\n\n");
    JEChangeNumberIndexDB cnIndexDB =
        (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    assertEquals(cnIndexDB.count(), 8);
    cnIndexDB.setPurgeDelay(1000);
    replicationServer.getChangelogDB().setPurgeDelay(1000);
    clearChangelogDB(replicationServer);
@@ -2620,11 +2612,7 @@
  private JEChangeNumberIndexDB getCNIndexDB()
  {
    if (replicationServer != null)
    {
      return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
    }
    return null;
    return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -447,11 +447,6 @@
    {
      final ReplicatedUpdateMsg msg = msgs[i];
      final ChangeNumberIndexRecord record = allValues.get(i);
      if (previousCookie.isEmpty())
      {
        // ugly hack to go round strange legacy code @see OPENDJ-67
        previousCookie.replace(record.getBaseDN(), new ServerState());
      }
      // check content in order
      String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">";
      assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -26,16 +26,22 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.ArrayList;
import java.util.List;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.opends.server.replication.server.changelog.je.JEReplicaDBTest.*;
@@ -48,9 +54,16 @@
@SuppressWarnings("javadoc")
public class JEChangeNumberIndexDBTest extends ReplicationTestCase
{
  private static final String value1 = "value1";
  private static final String value2 = "value2";
  private static final String value3 = "value3";
  private final MultiDomainServerState previousCookie =
      new MultiDomainServerState();
  private final List<String> cookies = new ArrayList<String>();
  @BeforeMethod
  public void clearCookie()
  {
    previousCookie.clear();
    cookies.clear();
  }
  /**
   * This test makes basic operations of a JEChangeNumberIndexDB:
@@ -67,12 +80,11 @@
  void testTrim() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEChangeNumberIndexDB cnIndexDB = null;
    try
    {
      replicationServer = newReplicationServer();
      cnIndexDB = getCNIndexDBNoTrimming(replicationServer);
      cnIndexDB.setPurgeDelay(0);
      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
      changelogDB.setPurgeDelay(0); // disable purging
      // Prepare data to be stored in the db
      DN baseDN1 = DN.decode("o=baseDN1");
@@ -82,9 +94,10 @@
      CSN[] csns = newCSNs(1, 0, 3);
      // Add records
      long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]);
                 addRecord(cnIndexDB, value2, baseDN2, csns[1]);
      long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]);
      final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
      long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
                 addRecord(cnIndexDB, baseDN2, csns[1]);
      long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
      // The ChangeNumber should not get purged
      final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
@@ -94,11 +107,11 @@
      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
      try
      {
        assertEqualTo(cursor.getRecord(), csns[0], baseDN1, value1);
        assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0));
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[1], baseDN2, value2);
        assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1));
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[2], baseDN3, value3);
        assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2));
        assertFalse(cursor.next());
      }
      finally
@@ -106,14 +119,13 @@
        StaticUtils.close(cursor);
      }
      // Now test that the trimming thread does its job => start it
      cnIndexDB.setPurgeDelay(100);
      cnIndexDB.startTrimmingThread();
      // Check the db is cleared.
      while (cnIndexDB.count() > 1)
      // Now test that purging removes all changes bar the last one
      changelogDB.setPurgeDelay(1);
      int count = 0;
      while (cnIndexDB.count() > 1 && count < 100)
      {
        Thread.yield();
        Thread.sleep(10);
        count++;
      }
      assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
    }
@@ -123,10 +135,14 @@
    }
  }
  private long addRecord(JEChangeNumberIndexDB cnIndexDB, String cookie, DN baseDN, CSN csn)
      throws ChangelogException
  private long addRecord(JEChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
  {
    return cnIndexDB.addRecord(new ChangeNumberIndexRecord(cookie, baseDN, csn));
    final String cookie = previousCookie.toString();
    cookies.add(cookie);
    final long changeNumber = cnIndexDB.addRecord(
        new ChangeNumberIndexRecord(cookie, baseDN, csn));
    previousCookie.update(baseDN, csn);
    return changeNumber;
  }
  private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie)
@@ -136,11 +152,11 @@
    assertEquals(record.getPreviousCookie(), cookie);
  }
  private JEChangeNumberIndexDB getCNIndexDBNoTrimming(ReplicationServer rs) throws ChangelogException
  private JEChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
  {
    final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
    final JEChangeNumberIndexDB cnIndexDB =
        (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB(false);
        (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB();
    assertTrue(cnIndexDB.isEmpty());
    return cnIndexDB;
  }
@@ -160,12 +176,11 @@
  void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEChangeNumberIndexDB cnIndexDB = null;
    try
    {
      replicationServer = newReplicationServer();
      cnIndexDB = getCNIndexDBNoTrimming(replicationServer);
      cnIndexDB.setPurgeDelay(0);
      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
      changelogDB.setPurgeDelay(0);
      // Prepare data to be stored in the db
@@ -176,9 +191,10 @@
      CSN[] csns = newCSNs(1, 0, 3);
      // Add records
      long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]);
      long cn2 = addRecord(cnIndexDB, value2, baseDN2, csns[1]);
      long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]);
      final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
      long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
      long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]);
      long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
      // Checks
      assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1);
@@ -187,9 +203,9 @@
      assertEquals(cnIndexDB.count(), 3, "Db count");
      assertFalse(cnIndexDB.isEmpty());
      assertEquals(getPreviousCookie(cnIndexDB, cn1), value1);
      assertEquals(getPreviousCookie(cnIndexDB, cn2), value2);
      assertEquals(getPreviousCookie(cnIndexDB, cn3), value3);
      assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0));
      assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1));
      assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2));
      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
      assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
@@ -200,7 +216,7 @@
      cursor = cnIndexDB.getCursorFrom(cn3);
      assertCursorReadsInOrder(cursor, cn3);
      cnIndexDB.clear(null);
      cnIndexDB.removeDomain(null);
      assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
      // Check the db is cleared.
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -97,7 +97,7 @@
      //--
      // Iterator tests with changes persisted
      waitChangesArePersisted(replicaDB);
      waitChangesArePersisted(replicaDB, 3);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4]);
@@ -108,7 +108,7 @@
      //--
      // Cursor tests with changes persisted
      replicaDB.add(update4);
      waitChangesArePersisted(replicaDB);
      waitChangesArePersisted(replicaDB, 4);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
      // Test cursor from existing CSN
@@ -116,7 +116,7 @@
      assertFoundInOrder(replicaDB, csns[3]);
      assertNotFound(replicaDB, csns[4]);
      replicaDB.setPurgeDelay(1);
      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
      int count = 0;
      boolean purgeSucceeded = false;
@@ -141,16 +141,27 @@
    }
  }
  private void waitChangesArePersisted(JEReplicaDB replicaDB) throws Exception
  private void waitChangesArePersisted(JEReplicaDB replicaDB,
      int nbRecordsInserted) throws Exception
  {
    final int expected = 0;
    waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
  }
  private void waitChangesArePersisted(JEReplicaDB replicaDB,
      int nbRecordsInserted, int counterWindow) throws Exception
  {
    // one counter record is inserted every time "counterWindow"
    // records have been inserted
    int expectedNbRecords =
        nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
    int count = 0;
    while (replicaDB.getQueueSize() != expected && count < 100)
    while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
    {
      Thread.sleep(10);
      count++;
    }
    assertEquals(replicaDB.getQueueSize(), expected);
    assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
  }
  static CSN[] newCSNs(int serverId, long timestamp, int number)
@@ -204,8 +215,9 @@
      assertNull(cursor.getRecord());
      for (int i = 1; i < csns.length; i++)
      {
        assertTrue(cursor.next());
        assertEquals(cursor.getRecord().getCSN(), csns[i]);
        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
        assertTrue(cursor.next(), msg);
        assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
      }
      assertFalse(cursor.next());
      assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
@@ -274,11 +286,12 @@
  {
    ReplicationServer replicationServer = null;
    DBCursor<UpdateMsg> cursor = null;
    JEReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6);
      for (int i = 0; i < 5; i++)
@@ -288,7 +301,7 @@
          replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        }
      }
      replicaDB.flush();
      waitChangesArePersisted(replicaDB, 4);
      cursor = replicaDB.generateCursorFrom(csns[0]);
      assertTrue(cursor.next());
@@ -307,6 +320,8 @@
    finally
    {
      StaticUtils.close(cursor);
      if (replicaDB != null)
        replicaDB.shutdown();
      remove(replicationServer);
    }
  }
@@ -334,7 +349,7 @@
    testGetOldestNewestCSNs(4000, 1000);
  }
  private void testGetOldestNewestCSNs(int max, int counterWindow) throws Exception
  private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception
  {
    String tn = "testDBCount("+max+","+counterWindow+")";
    debugInfo(tn, "Starting test");
@@ -363,7 +378,7 @@
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      replicaDB.flush();
      waitChangesArePersisted(replicaDB, max, counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
@@ -387,15 +402,13 @@
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      replicaDB.flush();
      waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
      //
      replicaDB.setPurgeDelay(100);
      sleep(1000);
      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
      String testcase = "AFTER PURGE (oldest, newest)=";
      debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());