mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.50.2014 08908175782573c536cb485092e473c7f1729281
OPENDJ-1177 (CR-3278) Re-implement changelog purging logic

In JEReplicaDB, simplified the logic that handled the internal queue that is used before actually persisting UpdateMsg changes to the underlying Berkeley JE DB.
Simplified the publisher/consumer model (msgQueue.add() / msgQueue.remove()) by relying on a LinkedBlockingQueue and a semaphore, instead of many synchronized blocks and fields that cluttered this code.



JEReplicaDB.java:
Changed msgQueue from LinkedList to LinkedBlockingQueue.
Removed fields queueMaxSize, queueLowmark, queueHimark, queueLowmarkBytes, queueHimarkBytes, queueByteSize and replaced them all with queueSizeBytes Semaphore.
Removed clearQueue() and getChanges().
Added collectAllPermits().
Added immutable CSNLimits class to remove the need for synchronizing on oldest and newest CSNs.

ReplicationDB.java:
In addEntries(), now return the total size of the persisted messages (return type was void).

JEReplicaDBTest.java:
In testTrim(), allowed the test to finish + made the code clearer.

replication.properties:
Added an error message for adding a change to the JEReplicaDB.
4 files modified
452 ■■■■ changed files
opends/src/messages/messages/replication.properties 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 354 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 11 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java 81 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -535,4 +535,8 @@
 expected the newest change number index record CSN '%s' to be equal to \
 the CSN read from the replica DBs '%s'
NOTICE_ECL_LOOKTHROUGH_LIMIT_EXCEEDED_238=This search operation has checked the \
 maximum of %d entries for matches
 maximum of %d entries for matches
SEVERE_ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB_239=Could not add \
 change %s to replicaDB %s %s because: %s
SEVERE_ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB_240=Could not add \
 change %s to replicaDB %s %s because flushing thread is shutting down
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -30,6 +30,9 @@
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
@@ -67,13 +70,33 @@
 */
public class JEReplicaDB implements Runnable
{
  /**
   * Class that allows atomically setting oldest and newest CSNs without
   * synchronization.
   *
   * @Immutable
   */
  private static final class CSNLimits
  {
    private final CSN oldestCSN;
    private final CSN newestCSN;
    public CSNLimits(CSN oldestCSN, CSN newestCSN)
    {
      this.oldestCSN = oldestCSN;
      this.newestCSN = newestCSN;
    }
  }
  /**
   * The msgQueue holds all the updates not yet saved to stable storage.
   * <p>
   * This list is only used as a temporary placeholder so that the write in the
   * stable storage can be grouped for efficiency reason. Adding an update
   * synchronously add the update to this list. A dedicated thread loops on
   * flush() and trim().
   * This blocking queue is only used as a temporary placeholder so that the
   * write in the stable storage can be grouped for efficiency reason. Adding an
   * update synchronously add the update to this list. A dedicated thread loops
   * on {@link #flush()} and {@link #trim()}.
   * <dl>
   * <dt>flush()</dt>
   * <dd>get a number of changes from the in memory list by block and write them
@@ -86,37 +109,35 @@
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication server or to LDAP server
   */
  private final LinkedList<UpdateMsg> msgQueue =
    new LinkedList<UpdateMsg>();
  private final LinkedBlockingQueue<UpdateMsg> msgQueue =
    new LinkedBlockingQueue<UpdateMsg>();
  /**
   * The High and low water mark for the max size of the msgQueue. The threads
   * calling add() method will be blocked if the size of msgQueue becomes larger
   * than the queueHimark and will resume only when the size of the msgQueue
   * goes below queueLowmark.
   * Semaphore used to limit the number of bytes used in memory by the queue.
   * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
   * size of msgQueue becomes larger than the available permits and will resume
   * only when the number of available permits allow it.
   */
  private int queueMaxSize = 5000;
  private int queueLowmark = 1000;
  private int queueHimark = 4000;
  /**
   * The queue himark and lowmark in bytes, this is set to 100 times the himark
   * and lowmark in number of updates.
   */
  private int queueMaxBytes = 100 * queueMaxSize;
  private int queueLowmarkBytes = 100 * queueLowmark;
  private int queueHimarkBytes = 100 * queueHimark;
  /** The number of bytes currently in the queue. */
  private int queueByteSize = 0;
  private final Semaphore queueSizeBytes;
  private final int queueMaxBytes;
  private ReplicationDB db;
  private CSN oldestCSN;
  private CSN newestCSN;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
   *
   * @NonNull
   */
  private volatile CSNLimits csnLimits;
  private int serverId;
  private DN baseDN;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private DirectoryThread thread;
  /**
   * Used to prevent race conditions between threads calling {@link #clear()}
   * {@link #flush()} or {@link #trim()}. This can happen with the thread
   * flushing the queue, on shutdown or on cursor opening, a thread calling
   * clear(), etc.
   */
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
@@ -146,16 +167,10 @@
    this.serverId = serverId;
    this.baseDN = baseDN;
    trimAge = replicationServer.getTrimAge();
    final int queueSize = replicationServer.getQueueSize();
    queueMaxSize = queueSize;
    queueLowmark = queueSize / 5;
    queueHimark = queueSize * 4 / 5;
    queueMaxBytes = 200 * queueMaxSize;
    queueLowmarkBytes = 200 * queueLowmark;
    queueHimarkBytes = 200 * queueLowmark;
    queueMaxBytes = replicationServer.getQueueSize() * 200;
    queueSizeBytes = new Semaphore(queueMaxBytes);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    oldestCSN = db.readOldestCSN();
    newestCSN = db.readNewestCSN();
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    thread = new DirectoryThread(this, "Replication server RS("
        + replicationServer.getServerId()
        + ") changelog checkpointer for Replica DS(" + serverId
@@ -167,61 +182,72 @@
  }
  /**
   * Add an update to the list of messages that must be saved to the db
   * managed by this db handler.
   * This method is blocking if the size of the list of message is larger
   * than its maximum.
   * Add an update to the list of messages that must be saved to the db managed
   * by this db handler. This method is blocking if the size of the list of
   * message is larger than its maximum.
   *
   * @param update The update that must be saved to the db managed by this db
   *               handler.
   * @param updateMsg
   *          The update message that must be saved to the db managed by this db
   *          handler.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void add(UpdateMsg update)
  public void add(UpdateMsg updateMsg) throws ChangelogException
  {
    synchronized (msgQueue)
    if (thread.isShutdownInitiated())
    {
      int size = msgQueue.size();
      if (size > queueHimark || queueByteSize > queueHimarkBytes)
      {
        msgQueue.notify();
      }
      throw new ChangelogException(
          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
    }
      while (size > queueMaxSize || queueByteSize > queueMaxBytes)
    final int msgSize = updateMsg.size();
    if (msgSize < queueMaxBytes)
    {
      try
      {
        try
        {
          msgQueue.wait(500);
        } catch (InterruptedException e)
        {
          // simply loop to try again.
        }
        size = msgQueue.size();
        queueSizeBytes.acquire(msgSize);
      }
      catch (InterruptedException e)
      {
        throw new ChangelogException(
            ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
                .toString(), String.valueOf(baseDN), String.valueOf(serverId),
                stackTraceToSingleLineString(e)));
      }
    }
    else
    {
      // edge case with a very large message
      collectAllPermits();
    }
    msgQueue.add(updateMsg);
      queueByteSize += update.size();
      msgQueue.add(update);
      if (newestCSN == null || newestCSN.isOlderThan(update.getCSN()))
      {
        newestCSN = update.getCSN();
      }
      if (oldestCSN == null)
      {
        oldestCSN = update.getCSN();
      }
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null
        || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateOld = limits.oldestCSN == null;
    if (updateOld || updateNew)
    {
      csnLimits = new CSNLimits(
          updateOld ? updateMsg.getCSN() : limits.oldestCSN,
          updateNew ? updateMsg.getCSN() : limits.newestCSN);
    }
  }
  /**
   * Get some changes out of the message queue of the LDAP server.
   * (from the beginning of the queue)
   * @param number the maximum number of messages to extract.
   * @return a List containing number changes extracted from the queue.
   */
  private List<UpdateMsg> getChanges(int number)
  /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
  private void collectAllPermits()
  {
    synchronized (msgQueue)
    int collectedPermits = queueSizeBytes.drainPermits();
    while (collectedPermits != queueMaxBytes)
    {
      final int minAvailableNb = Math.min(number, msgQueue.size());
      return new LinkedList<UpdateMsg>(msgQueue.subList(0, minAvailableNb));
      // Do not use Thread.sleep() because:
      // 1) it is expected the permits will be released very soon
      // 2) we want to collect all the permits, so do not leave a chance to
      // other threads to steal them from us.
      // 3) we want to keep low latency
      Thread.yield();
      collectedPermits += queueSizeBytes.drainPermits();
    }
  }
@@ -232,7 +258,7 @@
   */
  public CSN getOldestCSN()
  {
    return oldestCSN;
    return csnLimits.oldestCSN;
  }
  /**
@@ -242,7 +268,7 @@
   */
  public CSN getNewestCSN()
  {
    return newestCSN;
    return csnLimits.newestCSN;
  }
  /**
@@ -252,9 +278,10 @@
   */
  public long getChangesCount()
  {
    if (newestCSN != null && oldestCSN != null)
    final CSNLimits limits = csnLimits;
    if (limits.newestCSN != null && limits.oldestCSN != null)
    {
      return newestCSN.getSeqnum() - oldestCSN.getSeqnum() + 1;
      return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1;
    }
    return 0;
  }
@@ -276,36 +303,13 @@
  {
    if (startAfterCSN == null)
    {
      // flush any potential changes before opening the cursor
      flush();
    }
    return new JEReplicaDBCursor(db, startAfterCSN, this);
  }
  /**
   * Removes the provided number of messages from the beginning of the msgQueue.
   *
   * @param number the number of changes to be removed.
   */
  private void clearQueue(int number)
  {
    synchronized (msgQueue)
    {
      int current = 0;
      while (current < number && !msgQueue.isEmpty())
      {
        UpdateMsg msg = msgQueue.remove(); // remove first
        queueByteSize -= msg.size();
        current++;
      }
      if (msgQueue.size() < queueLowmark
          && queueByteSize < queueLowmarkBytes)
      {
        msgQueue.notifyAll();
      }
    }
  }
  /**
   * Shutdown this ReplicaDB.
   */
  public void shutdown()
@@ -317,11 +321,6 @@
    thread.initiateShutdown();
    synchronized (msgQueue)
    {
      msgQueue.notifyAll();
    }
    while (msgQueue.size() != 0)
    {
      try
@@ -331,8 +330,7 @@
      catch (ChangelogException e)
      {
        // We are already shutting down
        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
            .get(stackTraceToSingleLineString(e)));
        logError(e.getMessageObject());
      }
    }
@@ -358,27 +356,8 @@
        {
          flush();
          trim();
          synchronized (msgQueue)
          {
            if (msgQueue.size() < queueLowmark
                && queueByteSize < queueLowmarkBytes)
            {
              try
              {
                msgQueue.wait(1000);
              }
              catch (InterruptedException e)
              {
                // Do not reset the interrupt flag here,
                // because otherwise JE will barf next time flush() is called:
                // JE 5.0.97 refuses to persist changes to the DB when invoked
                // from a Thread with the interrupt flag set to true.
              }
            }
          }
        }
        catch (Exception end)
        catch (ChangelogException end)
        {
          stop(end);
          break;
@@ -400,11 +379,6 @@
    {
      thread.stopWork();
    }
    synchronized (this)
    {
      notifyAll();
    }
  }
  private void stop(Exception e)
@@ -453,13 +427,34 @@
      trimDate = lastBeforeTrimDate;
    }
    final int queueLowMarkBytes = queueMaxBytes / 5;
    for (int i = 0; i < 100; i++)
    {
      /*
       * Perform at least some trimming regardless of the flush backlog. Then
       * continue trim iterations while the flush backlog is low (below the
       * lowmark). Once the flush backlog increases, stop trimming and start
       * flushing more eagerly.
       */
      if (i > 20 && msgQueue.size() < queueLowMarkBytes)
      {
        break;
      }
      synchronized (flushLock)
      {
        /*
         * the trim is done by group in order to save some CPU and IO bandwidth
         * start the transaction then do a bunch of remove then commit
         * the trim is done by group in order to save some CPU, IO bandwidth and
         * DB caches: start the transaction then do a bunch of remove then
         * commit.
         */
        /*
         * Matt wrote: The record removal is done as a DB transaction and the
         * deleted records are only "deleted" on commit. While the txn/cursor is
         * open the records to be deleted will, I think, be pinned in the DB
         * cache. In other words, the larger the transaction (the more records
         * deleted during a single batch) the more DB cache will be used to
         * process the transaction.
         */
        final ReplServerDBCursor cursor = db.openDeleteCursor();
        try
@@ -477,13 +472,13 @@
              return;
            }
            if (!csn.equals(newestCSN) && csn.isOlderThan(trimDate))
            if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
            {
              cursor.delete();
            }
            else
            {
              oldestCSN = csn;
              csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
              return;
            }
          }
@@ -515,32 +510,33 @@
   */
  public void flush() throws ChangelogException
  {
    int size;
    int chunksize = Math.min(queueMaxSize, 500);
    do
    try
    {
      synchronized(flushLock)
      synchronized (flushLock)
      {
        // get N (or less) messages from the queue to save to the DB
        // (from the beginning of the queue)
        List<UpdateMsg> changes = getChanges(chunksize);
        // if no more changes to save exit immediately.
        if (changes == null || (size = changes.size()) == 0)
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
        if (change == null)
        {
          // nothing to persist, move on to the trim phase
          return;
        }
        // save the change to the stable storage.
        db.addEntries(changes);
        // Try to see if there are more changes and persist them all.
        changes.add(change);
        msgQueue.drainTo(changes);
        // remove the changes from the list of changes to be saved
        // (remove from the beginning of the queue)
        clearQueue(changes.size());
        int totalSize = db.addEntries(changes);
        // do not release more than queue max size permits
        // (be careful of the edge case with the very large message)
        queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
      }
      // loop while there are more changes in the queue
    } while (size == chunksize);
    }
    catch (InterruptedException e)
    {
      throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
          .get(stackTraceToSingleLineString(e)));
    }
  }
  /**
@@ -556,25 +552,28 @@
    public List<Attribute> getMonitorData()
    {
      List<Attribute> attributes = new ArrayList<Attribute>();
      attributes.add(Attributes.create("replicationServer-database",
          String.valueOf(serverId)));
      attributes.add(Attributes.create("domain-name",
          baseDN.toNormalizedString()));
      if (oldestCSN != null)
      create(attributes, "replicationServer-database", String.valueOf(serverId));
      create(attributes, "domain-name", baseDN.toNormalizedString());
      final CSNLimits limits = csnLimits;
      if (limits.oldestCSN != null)
      {
        attributes.add(Attributes.create("first-change", encode(oldestCSN)));
        create(attributes, "first-change", encode(limits.oldestCSN));
      }
      if (newestCSN != null)
      if (limits.newestCSN != null)
      {
        attributes.add(Attributes.create("last-change", encode(newestCSN)));
        create(attributes, "last-change", encode(limits.newestCSN));
      }
      attributes.add(
          Attributes.create("queue-size", String.valueOf(msgQueue.size())));
      attributes.add(
          Attributes.create("queue-size-bytes", String.valueOf(queueByteSize)));
      create(attributes, "queue-size", String.valueOf(msgQueue.size()));
      create(attributes, "queue-size-bytes",
          String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
      return attributes;
    }
    private void create(List<Attribute> attributes, String name, String value)
    {
      attributes.add(Attributes.create(name, value));
    }
    private String encode(CSN csn)
    {
      return csn + " " + new Date(csn.getTime());
@@ -609,8 +608,9 @@
  @Override
  public String toString()
  {
    final CSNLimits limits = csnLimits;
    return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
        + oldestCSN + " " + newestCSN;
        + limits.oldestCSN + " " + limits.newestCSN;
  }
  /**
@@ -631,12 +631,10 @@
  {
    synchronized(flushLock)
    {
      collectAllPermits();
      msgQueue.clear();
      queueByteSize = 0;
      db.clear();
      oldestCSN = db.readOldestCSN();
      newestCSN = db.readNewestCSN();
      csnLimits = new CSNLimits(null, null);
    }
  }
@@ -655,7 +653,7 @@
   * For test purpose.
   * @return The memory queue size.
   */
  public int getQueueSize()
  int getQueueSize()
  {
    return this.msgQueue.size();
  }
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -186,10 +186,11 @@
   *
   * @param changes
   *          The list of changes to add to the underlying db.
   * @return the total size of all the changes
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void addEntries(List<UpdateMsg> changes) throws ChangelogException
  public int addEntries(List<UpdateMsg> changes) throws ChangelogException
  {
    dbCloseLock.readLock().lock();
    try
@@ -197,9 +198,10 @@
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        return;
        return 0;
      }
      int totalSize = 0;
      for (UpdateMsg change : changes)
      {
        final DatabaseEntry key = createReplicationKey(change.getCSN());
@@ -208,7 +210,10 @@
        insertCounterRecordIfNeeded(change.getCSN());
        db.put(null, key, data);
        counterCurrValue++;
        totalSize += change.size();
      }
      return totalSize;
    }
    catch (DatabaseException e)
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -86,7 +86,7 @@
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      final JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, 0, 5);
@@ -96,20 +96,8 @@
      DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN, csns[3], "uid");
      //--
      // Iterator tests with memory queue only populated
      // verify that memory queue is populated
      assertEquals(replicaDB.getQueueSize(), 3);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4]);
      //--
      // Iterator tests with db only populated
      Thread.sleep(1000); // let the time for flush to happen
      // verify that memory queue is empty (all changes flushed in the db)
      assertEquals(replicaDB.getQueueSize(), 0);
      // Iterator tests with changes persisted
      waitChangesArePersisted(replicaDB);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4]);
@@ -118,36 +106,34 @@
      assertEquals(replicaDB.getNewestCSN(), csns[2]);
      //--
      // Cursor tests with db and memory queue populated
      // all changes in the db - add one in the memory queue
      // Cursor tests with changes persisted
      replicaDB.add(update4);
      // verify memory queue contains this one
      assertEquals(replicaDB.getQueueSize(), 1);
      waitChangesArePersisted(replicaDB);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
      // Test cursor from existing CSN at the limit between queue and db
      // Test cursor from existing CSN
      assertFoundInOrder(replicaDB, csns[2], csns[3]);
      assertFoundInOrder(replicaDB, csns[3]);
      assertNotFound(replicaDB, csns[4]);
      replicaDB.setPurgeDelay(1);
      boolean purged = false;
      int count = 300;  // wait at most 60 seconds
      while (!purged && (count > 0))
      int count = 0;
      boolean purgeSucceeded = false;
      final CSN expectedNewestCSN = csns[3];
      do
      {
        CSN oldestCSN = replicaDB.getOldestCSN();
        CSN newestCSN = replicaDB.getNewestCSN();
        if (!oldestCSN.equals(csns[3]) || !newestCSN.equals(csns[3]))
        {
          TestCaseUtils.sleep(100);
        } else
        {
          purged = true;
        }
        Thread.sleep(10);
        final CSN oldestCSN = replicaDB.getOldestCSN();
        final CSN newestCSN = replicaDB.getNewestCSN();
        purgeSucceeded =
            oldestCSN.equals(expectedNewestCSN)
                && newestCSN.equals(expectedNewestCSN);
        count++;
      }
      // FIXME should add an assert here
      while (!purgeSucceeded && count < 100);
      assertTrue(purgeSucceeded);
    }
    finally
    {
@@ -155,6 +141,18 @@
    }
  }
  private void waitChangesArePersisted(JEReplicaDB replicaDB) throws Exception
  {
    final int expected = 0;
    int count = 0;
    while (replicaDB.getQueueSize() != expected && count < 100)
    {
      Thread.sleep(10);
      count++;
    }
    assertEquals(replicaDB.getQueueSize(), expected);
  }
  static CSN[] newCSNs(int serverId, long timestamp, int number)
  {
    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
@@ -175,10 +173,10 @@
    return new ReplicationServer(conf);
  }
  private JEReplicaDB newReplicaDB(ReplicationServer replicationServer) throws Exception
  private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
  {
    JEChangelogDB changelogDB = (JEChangelogDB) replicationServer.getChangelogDB();
    return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, replicationServer).getFirst();
    final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
    return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
  }
  private File createCleanDir() throws IOException
@@ -251,22 +249,19 @@
      CSN[] csns = newCSNs(1, 0, 3);
      // Add the changes
      // Add the changes and check they are here
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
      // Check they are here
      assertEquals(csns[0], replicaDB.getOldestCSN());
      assertEquals(csns[2], replicaDB.getNewestCSN());
      // Clear ...
      // Clear DB and check it is cleared.
      replicaDB.clear();
      // Check the db is cleared.
      assertEquals(null, replicaDB.getOldestCSN());
      assertEquals(null, replicaDB.getNewestCSN());
    }
    finally
    {