From 137d1b4ba1992acdd880b61b1a03dc31f0cc1839 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 23 Apr 2014 14:19:01 +0000
Subject: [PATCH] OPENDJ-1448 Remove JReplicaDB flushing thread and msg queue 

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java |   13 -
 opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java                                       |   16 +-
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                              |   11 -
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                     |   19 ++
 opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java                                       |  213 +-----------------------------
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java                             |   44 +-----
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java           |   29 ----
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java                                     |   34 ++--
 8 files changed, 65 insertions(+), 314 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index f702e8b..51abace 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
- *      Portions copyright 2013 ForgeRock AS.
+ *      Portions copyright 2013-2014 ForgeRock AS.
  */
 package org.opends.server.replication.protocol;
 
@@ -66,8 +66,6 @@
     return csn;
   }
 
-
-
   /**
    * Creates a message from a provided byte array.
    *
@@ -113,11 +111,7 @@
     }
   }
 
-
-
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public byte[] getBytes(short protocolVersion)
   {
@@ -141,4 +135,10 @@
     }
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + ", csn=" + csn.toStringUI();
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 6ff04ad..a765e8c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -947,17 +947,6 @@
   }
 
   /**
-   * Get the queueSize for this replication server.
-   *
-   * @return The maximum size of the queues for this Replication Server
-   *
-   */
-  public int getQueueSize()
-  {
-    return this.config.getQueueSize();
-  }
-
-  /**
    * Creates the backend associated to this replication server.
    */
   private void createBackend() throws ConfigException
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index 6c22e5c..fcf0ad4 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -62,8 +62,6 @@
   private static int NO_KEY = 0;
 
   private DraftCNDB db;
-  /** FIXME What is this field used for? */
-  private volatile long oldestChangeNumber = NO_KEY;
   /**
    * The newest changenumber stored in the DB. It is used to avoid purging the
    * record with the newest changenumber. The newest record in the changenumber
@@ -96,14 +94,11 @@
   public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
   {
     db = new DraftCNDB(dbEnv);
-    final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
     final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
-    oldestChangeNumber = getChangeNumber(oldestRecord);
-    final long newestCN = getChangeNumber(newestRecord);
-    newestChangeNumber = newestCN;
+    newestChangeNumber = getChangeNumber(newestRecord);
     // initialization of the lastGeneratedChangeNumber from the DB content
     // if DB is empty => last record does not exist => default to 0
-    lastGeneratedChangeNumber = new AtomicLong(newestCN);
+    lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber);
 
     // Monitoring registration
     DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -117,7 +112,7 @@
     {
       return record.getChangeNumber();
     }
-    return 0;
+    return NO_KEY;
   }
 
   /** {@inheritDoc} */
@@ -198,19 +193,11 @@
    */
   public void shutdown()
   {
-    if (shutdown.get())
+    if (shutdown.compareAndSet(false, true))
     {
-      return;
+      db.shutdown();
+      DirectoryServer.deregisterMonitorProvider(dbMonitor);
     }
-
-    shutdown.set(true);
-    synchronized (this)
-    {
-      notifyAll();
-    }
-
-    db.shutdown();
-    DirectoryServer.deregisterMonitorProvider(dbMonitor);
   }
 
   /**
@@ -236,11 +223,6 @@
       while (!mustShutdown(shutdown) && cursor.next())
       {
         final ChangeNumberIndexRecord record = cursor.currentRecord();
-        if (record.getChangeNumber() != oldestChangeNumber)
-        {
-          oldestChangeNumber = record.getChangeNumber();
-        }
-
         if (record.getChangeNumber() != newestChangeNumber
             && record.getCSN().isOlderThan(purgeCSN))
         {
@@ -293,14 +275,9 @@
     final DraftCNDBCursor cursor = db.openDeleteCursor();
     try
     {
-      boolean isOldestRecord = true;
       while (!mustShutdown(shutdown) && cursor.next())
       {
         final ChangeNumberIndexRecord record = cursor.currentRecord();
-        if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
-        {
-          oldestChangeNumber = record.getChangeNumber();
-        }
         if (record.getChangeNumber() == newestChangeNumber)
         {
           // do not purge the newest record to avoid having the last generated
@@ -312,10 +289,6 @@
         {
           cursor.delete();
         }
-        else
-        {
-          isOldestRecord = false;
-        }
       }
     }
     catch (ChangelogException e)
@@ -398,7 +371,7 @@
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + ": " + oldestChangeNumber + " "
+    return getClass().getSimpleName() + ", newestChangeNumber="
         + newestChangeNumber;
   }
 
@@ -411,8 +384,7 @@
   public void clear() throws ChangelogException
   {
     db.clear();
-    oldestChangeNumber = getChangeNumber(db.readFirstRecord());
-    newestChangeNumber = getChangeNumber(db.readLastRecord());
+    newestChangeNumber = NO_KEY;
   }
 
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index b1e2d63..dd337e5 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -385,11 +385,13 @@
     if (indexer != null)
     {
       indexer.initiateShutdown();
+      indexer.interrupt();
     }
     final ChangelogDBPurger purger = cnPurger.getAndSet(null);
     if (purger != null)
     {
       purger.initiateShutdown();
+      purger.interrupt();
     }
 
     try
@@ -417,6 +419,23 @@
 
     if (dbEnv != null)
     {
+      // wait for shutdown of the threads holding cursors
+      try
+      {
+        if (indexer != null)
+        {
+          indexer.join();
+        }
+        if (purger != null)
+        {
+          purger.join();
+        }
+      }
+      catch (InterruptedException e)
+      {
+        // do nothing: we are already shutting down
+      }
+
       dbEnv.shutdown();
     }
 
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index bbde1d1..19406bd 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -28,14 +28,10 @@
 
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.DirectoryThread;
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
@@ -52,8 +48,6 @@
 import org.opends.server.types.InitializationException;
 
 import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.util.StaticUtils.*;
 
 /**
  * This class is used for managing the replicationServer database for each
@@ -67,7 +61,7 @@
  * <p>
  * This class publish some monitoring information below cn=monitor.
  */
-public class JEReplicaDB implements Runnable
+public class JEReplicaDB
 {
 
   /**
@@ -89,29 +83,7 @@
 
   }
 
-  /**
-   * The msgQueue holds all the updates not yet saved to stable storage.
-   * <p>
-   * This blocking queue is only used as a temporary placeholder so that the
-   * write in the stable storage can be grouped for efficiency reason. Adding an
-   * update synchronously add the update to this list. A dedicated thread
-   * flushes this blocking queue.
-   * <p>
-   * Changes are not read back by replicationServer threads that are responsible
-   * for pushing the changes to other replication server or to LDAP server
-   */
-  private final LinkedBlockingQueue<UpdateMsg> msgQueue =
-    new LinkedBlockingQueue<UpdateMsg>();
-
-  /**
-   * Semaphore used to limit the number of bytes used in memory by the queue.
-   * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
-   * size of msgQueue becomes larger than the available permits and will resume
-   * only when the number of available permits allow it.
-   */
-  private final Semaphore queueSizeBytes;
-  private final int queueMaxBytes;
-
+  private final AtomicBoolean shutdown = new AtomicBoolean(false);
   private ReplicationDB db;
   /**
    * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -122,12 +94,6 @@
   private int serverId;
   private DN baseDN;
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
-  private DirectoryThread thread;
-  /**
-   * Used to prevent race conditions between threads calling {@link #flush()}.
-   * This can happen with the thread flushing the queue, or else on shutdown.
-   */
-  private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
 
   /**
@@ -147,15 +113,8 @@
     this.replicationServer = replicationServer;
     this.serverId = serverId;
     this.baseDN = baseDN;
-    queueMaxBytes = replicationServer.getQueueSize() * 200;
-    queueSizeBytes = new Semaphore(queueMaxBytes);
     db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
     csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
-    thread = new DirectoryThread(this, "Replication server RS("
-            + replicationServer.getServerId()
-            + ") flusher thread for Replica DS(" + serverId
-            + ") for domain \"" + baseDN + "\"");
-    thread.start();
 
     DirectoryServer.deregisterMonitorProvider(dbMonitor);
     DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -174,34 +133,14 @@
    */
   public void add(UpdateMsg updateMsg) throws ChangelogException
   {
-    if (thread.isShutdownInitiated())
+    if (shutdown.get())
     {
       throw new ChangelogException(
           ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
               .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
     }
 
-    final int msgSize = updateMsg.size();
-    if (msgSize < queueMaxBytes)
-    {
-      try
-      {
-        queueSizeBytes.acquire(msgSize);
-      }
-      catch (InterruptedException e)
-      {
-        throw new ChangelogException(
-            ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
-                .toString(), String.valueOf(baseDN), String.valueOf(serverId),
-                stackTraceToSingleLineString(e)));
-      }
-    }
-    else
-    {
-      // edge case with a very large message
-      collectAllPermits();
-    }
-    msgQueue.add(updateMsg);
+    db.addEntry(updateMsg);
 
     final CSNLimits limits = csnLimits;
     final boolean updateNew = limits.newestCSN == null
@@ -215,22 +154,6 @@
     }
   }
 
-  /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
-  private void collectAllPermits()
-  {
-    int collectedPermits = queueSizeBytes.drainPermits();
-    while (collectedPermits != queueMaxBytes)
-    {
-      // Do not use Thread.sleep() because:
-      // 1) it is expected the permits will be released very soon
-      // 2) we want to collect all the permits, so do not leave a chance to
-      // other threads to steal them from us.
-      // 3) we want to keep low latency
-      Thread.yield();
-      collectedPermits += queueSizeBytes.drainPermits();
-    }
-  }
-
   /**
    * Get the oldest CSN that has not been purged yet.
    *
@@ -289,80 +212,10 @@
    */
   public void shutdown()
   {
-    if (thread.isShutdownInitiated())
+    if (shutdown.compareAndSet(false, true))
     {
-      return;
-    }
-
-    thread.initiateShutdown();
-
-    while (msgQueue.size() != 0)
-    {
-      try
-      {
-        flush();
-      }
-      catch (ChangelogException e)
-      {
-        // We are already shutting down
-        logError(e.getMessageObject());
-      }
-    }
-
-    db.shutdown();
-    DirectoryServer.deregisterMonitorProvider(dbMonitor);
-  }
-
-  /**
-   * Flushes the replicaDB queue from memory to stable storage.
-   */
-  @Override
-  public void run()
-  {
-    thread.startWork();
-
-    try
-    {
-      while (!thread.isShutdownInitiated())
-      {
-        try
-        {
-          flush();
-        }
-        catch (ChangelogException end)
-        {
-          stop(end);
-          break;
-        }
-      }
-
-      try
-      {
-        // call flush a last time before exiting to make sure that
-        // no change was forgotten in the msgQueue
-        flush();
-      }
-      catch (ChangelogException e)
-      {
-        stop(e);
-      }
-    }
-    finally
-    {
-      thread.stopWork();
-    }
-  }
-
-  private void stop(Exception e)
-  {
-    logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
-        .get(stackTraceToSingleLineString(e)));
-
-    thread.initiateShutdown();
-
-    if (replicationServer != null)
-    {
-      replicationServer.shutdown();
+      db.shutdown();
+      DirectoryServer.deregisterMonitorProvider(dbMonitor);
     }
   }
 
@@ -401,7 +254,7 @@
       {
         for (int j = 0; j < 50; j++)
         {
-          if (thread.isShutdownInitiated())
+          if (shutdown.get())
           {
             return;
           }
@@ -428,7 +281,7 @@
         // mark shutdown for this db so that we don't try again to
         // stop it from cursor.close() or methods called by cursor.close()
         cursor.abort();
-        thread.initiateShutdown();
+        shutdown.set(true);
         throw e;
       }
       finally
@@ -439,47 +292,6 @@
   }
 
   /**
-   * Flush a number of updates from the memory list to the stable storage.
-   * <p>
-   * Flush is done by chunk sized to 500 messages, starting from the beginning
-   * of the list.
-   * <p>
-   * @GuardedBy("flushLock")
-   * @throws ChangelogException
-   *           If a database problem happened
-   */
-  private void flush() throws ChangelogException
-  {
-    try
-    {
-      synchronized (flushLock)
-      {
-        final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
-        if (change == null)
-        {
-          // nothing to persist, check if shutdown was invoked
-          return;
-        }
-
-        // Try to see if there are more changes and persist them all.
-        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
-        changes.add(change);
-        msgQueue.drainTo(changes);
-
-        int totalSize = db.addEntries(changes);
-        // do not release more than queue max size permits
-        // (be careful of the edge case with the very large message)
-        queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
-      }
-    }
-    catch (InterruptedException e)
-    {
-      throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
-          .get(stackTraceToSingleLineString(e)));
-    }
-  }
-
-  /**
    * This internal class is used to implement the Monitoring capabilities of the
    * ReplicaDB.
    */
@@ -501,9 +313,6 @@
       {
         create(attributes, "last-change", encode(limits.newestCSN));
       }
-      create(attributes, "queue-size", String.valueOf(msgQueue.size()));
-      create(attributes, "queue-size-bytes",
-          String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
       return attributes;
     }
 
@@ -552,8 +361,6 @@
    */
   public void clear() throws ChangelogException
   {
-    collectAllPermits();
-    msgQueue.clear(); // this call should not do anything at all
     db.clear();
     csnLimits = new CSNLimits(null, null);
   }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index 818fba5..a1dd6ec 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -28,7 +28,6 @@
 
 import java.io.Closeable;
 import java.io.UnsupportedEncodingException;
-import java.util.List;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -182,15 +181,14 @@
   }
 
   /**
-   * add a list of changes to the underlying db.
+   * add one change to the underlying db.
    *
-   * @param changes
-   *          The list of changes to add to the underlying db.
-   * @return the total size of all the changes
+   * @param change
+   *          The change to add to the underlying db.
    * @throws ChangelogException
    *           If a database problem happened
    */
-  public int addEntries(List<UpdateMsg> changes) throws ChangelogException
+  public void addEntry(UpdateMsg change) throws ChangelogException
   {
     dbCloseLock.readLock().lock();
     try
@@ -198,26 +196,22 @@
       // If the DB has been closed then return immediately.
       if (isDBClosed())
       {
-        return 0;
+        return;
       }
 
-      int totalSize = 0;
-      for (UpdateMsg change : changes)
-      {
-        final DatabaseEntry key = createReplicationKey(change.getCSN());
-        final DatabaseEntry data = new ReplicationData(change);
+      final DatabaseEntry key = createReplicationKey(change.getCSN());
+      final DatabaseEntry data = new ReplicationData(change);
 
-        insertCounterRecordIfNeeded(change.getCSN());
-        db.put(null, key, data);
-        counterCurrValue++;
-
-        totalSize += change.size();
-      }
-      return totalSize;
+      insertCounterRecordIfNeeded(change.getCSN());
+      db.put(null, key, data);
+      counterCurrValue++;
     }
     catch (DatabaseException e)
     {
-      throw new ChangelogException(e);
+      throw new ChangelogException(
+          ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(
+              change.toString(), String.valueOf(baseDN),
+              String.valueOf(serverId), stackTraceToSingleLineString(e)));
     }
     finally
     {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 10aad5f..788087b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -48,8 +48,7 @@
 import static org.testng.Assert.*;
 
 /**
- * Test the JEChangeNumberIndexDB class with 2 kinds of cleaning of the db : -
- * periodic trim - call to clear method()
+ * Test the JEChangeNumberIndexDB class.
  */
 @SuppressWarnings("javadoc")
 public class JEChangeNumberIndexDBTest extends ReplicationTestCase
@@ -71,13 +70,13 @@
    * <li>create the db</li>
    * <li>add records</li>
    * <li>read them with a cursor</li>
-   * <li>set a very short trim period</li>
-   * <li>wait for the db to be trimmed / here since the changes are not stored
+   * <li>set a very short purge period</li>
+   * <li>wait for the db to be purged / here since the changes are not stored
    * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
    * </ol>
    */
   @Test
-  void testPurge() throws Exception
+  public void testPurge() throws Exception
   {
     ReplicationServer replicationServer = null;
     try
@@ -172,8 +171,8 @@
    * <li>clear the db</li>
    * </ol>
    */
-  @Test()
-  void testClear() throws Exception
+  @Test
+  public void testClear() throws Exception
   {
     ReplicationServer replicationServer = null;
     try
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index 851afa2..e431d97 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -97,8 +97,6 @@
 
       //--
       // Iterator tests with changes persisted
-      waitChangesArePersisted(replicaDB, 3);
-
       assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
       assertNotFound(replicaDB, csns[4]);
 
@@ -108,7 +106,6 @@
       //--
       // Cursor tests with changes persisted
       replicaDB.add(update4);
-      waitChangesArePersisted(replicaDB, 4);
 
       assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
       // Test cursor from existing CSN
@@ -141,29 +138,6 @@
     }
   }
 
-  private void waitChangesArePersisted(JEReplicaDB replicaDB,
-      int nbRecordsInserted) throws Exception
-  {
-    waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
-  }
-
-  private void waitChangesArePersisted(JEReplicaDB replicaDB,
-      int nbRecordsInserted, int counterWindow) throws Exception
-  {
-    // one counter record is inserted every time "counterWindow"
-    // records have been inserted
-    int expectedNbRecords =
-        nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
-
-    int count = 0;
-    while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
-    {
-      Thread.sleep(10);
-      count++;
-    }
-    assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
-  }
-
   static CSN[] newCSNs(int serverId, long timestamp, int number)
   {
     CSNGenerator gen = new CSNGenerator(serverId, timestamp);
@@ -301,7 +275,6 @@
           replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
         }
       }
-      waitChangesArePersisted(replicaDB, 4);
 
       cursor = replicaDB.generateCursorFrom(csns[0]);
       assertTrue(cursor.next());
@@ -378,7 +351,6 @@
         replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
         mySeqnum+=2;
       }
-      waitChangesArePersisted(replicaDB, max, counterWindow);
 
       assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
       assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
@@ -402,7 +374,6 @@
         replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
         mySeqnum+=2;
       }
-      waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
 
       assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
       assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");

--
Gitblit v1.10.0