From 8170a25a086fb89f90a2b2b8613e7b7d82a47208 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 08 Aug 2013 13:37:11 +0000
Subject: [PATCH] Big refactoring of ReplicationDB to make it readable. Changed count() return type from int to long. Removed a useless parameter from ReplicationIterator ctor. Various code cleanups in other classes.

---
 /dev/null                                                                                         |   59 ---
 opends/src/server/org/opends/server/replication/server/ReplicationIterator.java                   |   15 
 opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java                    |  106 ++---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java |  300 +++++----------
 opends/src/server/org/opends/server/replication/server/ReplicationDB.java                         |  625 +++++++++++++++++---------------
 opends/src/server/org/opends/server/replication/server/DbHandler.java                             |    4 
 opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java                      |   15 
 7 files changed, 512 insertions(+), 612 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 8231012..219204c 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -282,7 +282,7 @@
     {
       flush();
     }
-    return new ReplicationIterator(serverId, db, changeNumber, this);
+    return new ReplicationIterator(db, changeNumber, this);
   }
 
   /**
@@ -657,7 +657,7 @@
    * @param to   The upper (newer) change number.
    * @return The computed number of changes.
    */
-  public int getCount(ChangeNumber from, ChangeNumber to)
+  public long getCount(ChangeNumber from, ChangeNumber to)
   {
     // Now that we always keep the last ChangeNumber in the DB to avoid
     // expiring cookies too quickly, we need to check if the "to"
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index fb0155b..5ea35d9 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -32,6 +32,7 @@
 import static org.opends.server.util.StaticUtils.*;
 
 import java.io.Closeable;
+import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -50,6 +51,9 @@
  */
 public class ReplicationDB
 {
+  private static final int START = 0;
+  private static final int STOP = 1;
+
   private Database db = null;
   private ReplicationDbEnv dbenv = null;
   private ReplicationServer replicationServer;
@@ -60,7 +64,8 @@
    * The lock used to provide exclusive access to the thread that close the db
    * (shutdown or clear).
    */
-  private ReentrantReadWriteLock dbCloseLock;
+  private final ReentrantReadWriteLock dbCloseLock =
+      new ReentrantReadWriteLock(true);
 
   // Change counter management
   // The Db itself does not allow to count records between a start and an end
@@ -76,9 +81,9 @@
   // - a counter value : count of changes since previous counter record.
   //
   // A counter record has to follow the order of the db, so it needs to have
-  // a changenumber key that follow the order.
+  // a changenumber key that follows the order.
   // A counter record must have its own changenumber key since the Db does not
-  // support duplicate key (it is a compatibility breaker character of the DB).
+  // support duplicate keys (it is a compatibility breaker character of the DB).
   //
   // We define 2 conditions to store a counter record :
   // 1/- at least 'counterWindowSize' changes have been stored in the Db
@@ -88,7 +93,7 @@
 
 
   /** Current value of the counter. */
-  private int  counterCurrValue = 1;
+  private int counterCurrValue = 1;
 
   /**
    * When not null, the next change with a ts different from
@@ -100,7 +105,7 @@
    * The counter record will never be written to the db more often than each
    * counterWindowSize changes.
    */
-  private int  counterWindowSize = 1000;
+  private int counterWindowSize = 1000;
 
  /**
    * Creates a new database or open existing database that will be used
@@ -122,40 +127,37 @@
     this.replicationServer = replicationServer;
 
     // Get or create the associated ReplicationServerDomain and Db.
-    db = dbenv.getOrAddDb(serverId, baseDn,
-        replicationServer.getReplicationServerDomain(baseDn,
-        true).getGenerationId());
+    final ReplicationServerDomain domain =
+        replicationServer.getReplicationServerDomain(baseDn, true);
+    db = dbenv.getOrAddDb(serverId, baseDn, domain.getGenerationId());
 
-    dbCloseLock = new ReentrantReadWriteLock(true);
 
-    Cursor cursor;
-    Transaction txn = null;
-    DatabaseEntry key = new DatabaseEntry();
-    DatabaseEntry data = new DatabaseEntry();
-    OperationStatus status;
-    int distBackToCounterRecord = 0;
+    intializeCounters();
+  }
 
-    // Initialize counter
+  private void intializeCounters()
+  {
     this.counterCurrValue = 1;
-    cursor = db.openCursor(txn, null);
+
+    Cursor cursor = db.openCursor(null, null);
     try
     {
-      status = cursor.getLast(key, data, LockMode.DEFAULT);
+      int distBackToCounterRecord = 0;
+      DatabaseEntry key = new DatabaseEntry();
+      DatabaseEntry data = new DatabaseEntry();
+      OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
       while (status == OperationStatus.SUCCESS)
       {
-        ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData()));
-        if (!ReplicationDB.isaCounter(cn))
+        ChangeNumber cn = toChangeNumber(key.getData());
+        if (isACounterRecord(cn))
         {
-          status = cursor.getPrev(key, data, LockMode.DEFAULT);
-          distBackToCounterRecord++;
-        }
-        else
-        {
-          // counter record
           counterCurrValue = decodeCounterValue(data.getData()) + 1;
           counterTsLimit = cn.getTime();
           break;
         }
+
+        status = cursor.getPrev(key, data, LockMode.DEFAULT);
+        distBackToCounterRecord++;
       }
       counterCurrValue += distBackToCounterRecord;
     }
@@ -165,6 +167,10 @@
     }
   }
 
+  private static ChangeNumber toChangeNumber(byte[] data)
+  {
+    return new ChangeNumber(decodeUTF8(data));
+  }
 
 
   /**
@@ -188,29 +194,11 @@
 
         for (UpdateMsg change : changes)
         {
-          DatabaseEntry key = new ReplicationKey(
-              change.getChangeNumber());
-          DatabaseEntry data = new ReplicationData(change);
+          final DatabaseEntry key =
+              createReplicationKey(change.getChangeNumber());
+          final DatabaseEntry data = new ReplicationData(change);
 
-          if ((counterCurrValue != 0)
-              && (counterCurrValue % counterWindowSize == 0))
-          {
-            // enough changes to generate a counter record - wait for the next
-            // change of time
-            counterTsLimit = change.getChangeNumber().getTime();
-          }
-          if ((counterTsLimit != 0)
-              && (change.getChangeNumber().getTime() != counterTsLimit))
-          {
-            // Write the counter record
-            DatabaseEntry counterKey = new ReplicationKey(
-                new ChangeNumber(change.getChangeNumber().getTime(),
-                    0, 0));
-            DatabaseEntry counterValue =
-              encodeCounterValue(counterCurrValue - 1);
-            db.put(null, counterKey, counterValue);
-            counterTsLimit = 0;
-          }
+          insertCounterRecordIfNeeded(change.getChangeNumber());
           db.put(null, key, data);
           counterCurrValue++;
         }
@@ -226,6 +214,39 @@
     }
   }
 
+  private void insertCounterRecordIfNeeded(ChangeNumber changeNumber)
+  {
+    if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0))
+    {
+      // enough changes to generate a counter record
+      // wait for the next change of time
+      counterTsLimit = changeNumber.getTime();
+    }
+    if (counterTsLimit != 0 && changeNumber.getTime() != counterTsLimit)
+    {
+      // Write the counter record
+      final ChangeNumber counterRecord = newCounterRecord(changeNumber);
+      DatabaseEntry counterKey = createReplicationKey(counterRecord);
+      DatabaseEntry counterValue = encodeCounterValue(counterCurrValue - 1);
+      db.put(null, counterKey, counterValue);
+      counterTsLimit = 0;
+    }
+  }
+
+  private DatabaseEntry createReplicationKey(ChangeNumber changeNumber)
+  {
+    DatabaseEntry key = new DatabaseEntry();
+    try
+    {
+      key.setData(changeNumber.toString().getBytes("UTF-8"));
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      // Should never happens, UTF-8 is always supported
+      // TODO : add better logging
+    }
+    return key;
+  }
 
   /**
    * Shutdown the database.
@@ -288,8 +309,7 @@
 
 
 
-  private void closeLockedCursor(Cursor cursor)
-      throws DatabaseException
+  private void closeAndReleaseReadLock(Cursor cursor) throws DatabaseException
   {
     try
     {
@@ -308,8 +328,6 @@
   public ChangeNumber readFirstChange()
   {
     Cursor cursor = null;
-    ChangeNumber cn = null;
-
     try
     {
       dbCloseLock.readLock().lock();
@@ -321,49 +339,43 @@
           return null;
         }
 
-        DatabaseEntry key = new DatabaseEntry();
-        DatabaseEntry data = new DatabaseEntry();
-
         cursor = db.openCursor(null, null);
 
-        OperationStatus status = cursor.getFirst(key, data,
-            LockMode.DEFAULT);
-
-        if (status != OperationStatus.SUCCESS)
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry data = new DatabaseEntry();
+        LockMode defaultMode = LockMode.DEFAULT;
+        if (cursor.getFirst(key, data, defaultMode) != OperationStatus.SUCCESS)
         {
-          /* database is empty */
+          // database is empty
           return null;
         }
 
-        String str = decodeUTF8(key.getData());
-        cn = new ChangeNumber(str);
-        if (ReplicationDB.isaCounter(cn))
+        final ChangeNumber cn = toChangeNumber(key.getData());
+        if (!isACounterRecord(cn))
         {
-          // First record is a counter record .. go next
-          status = cursor.getNext(key, data, LockMode.DEFAULT);
-          if (status != OperationStatus.SUCCESS)
-          {
-            // DB contains only a counter record
-            return null;
-          }
-          else
-          {
-            cn = new ChangeNumber(decodeUTF8(key.getData()));
-          }
+          return cn;
         }
+
+        // First record is a counter record .. go next
+        if (cursor.getNext(key, data, defaultMode) != OperationStatus.SUCCESS)
+        {
+          // DB contains only a counter record
+          return null;
+        }
+        // There cannot be 2 counter record next to each other,
+        // it is safe to return this record
+        return toChangeNumber(key.getData());
       }
       finally
       {
-        closeLockedCursor(cursor);
+        closeAndReleaseReadLock(cursor);
       }
     }
     catch (DatabaseException e)
     {
-      /* database is faulty */
       replicationServer.handleUnexpectedDatabaseException(e);
-      cn = null;
+      return null;
     }
-    return cn;
   }
 
 
@@ -376,8 +388,6 @@
   public ChangeNumber readLastChange()
   {
     Cursor cursor = null;
-    ChangeNumber cn = null;
-
     try
     {
       dbCloseLock.readLock().lock();
@@ -389,53 +399,45 @@
           return null;
         }
 
-        DatabaseEntry key = new DatabaseEntry();
-        DatabaseEntry data = new DatabaseEntry();
-
         cursor = db.openCursor(null, null);
 
-        OperationStatus status = cursor.getLast(key, data,
-            LockMode.DEFAULT);
-
-        if (status != OperationStatus.SUCCESS)
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry data = new DatabaseEntry();
+        LockMode defaultMode = LockMode.DEFAULT;
+        if (cursor.getLast(key, data, defaultMode) != OperationStatus.SUCCESS)
         {
-          /* database is empty */
+          // database is empty
           return null;
         }
 
-        String str = decodeUTF8(key.getData());
-        cn = new ChangeNumber(str);
-        if (ReplicationDB.isaCounter(cn))
+        final ChangeNumber cn = toChangeNumber(key.getData());
+        if (!isACounterRecord(cn))
         {
-          if (cursor.getPrev(key, data, LockMode.DEFAULT)
-              != OperationStatus.SUCCESS)
-          {
-            /*
-             * database only contain a counter record - don't know how much it
-             * can be possible but ...
-             */
-            cn = null;
-          }
-          else
-          {
-            str = decodeUTF8(key.getData());
-            cn = new ChangeNumber(str);
-            // There can't be 2 counter record next to each other
-          }
+          return cn;
         }
+
+        if (cursor.getPrev(key, data, defaultMode) != OperationStatus.SUCCESS)
+        {
+          /*
+           * database only contain a counter record - don't know how much it can
+           * be possible but ...
+           */
+          return null;
+        }
+        // There cannot be 2 counter record next to each other,
+        // it is safe to return this record
+        return toChangeNumber(key.getData());
       }
       finally
       {
-        closeLockedCursor(cursor);
+        closeAndReleaseReadLock(cursor);
       }
     }
     catch (DatabaseException e)
     {
       replicationServer.handleUnexpectedDatabaseException(e);
-      cn = null;
+      return null;
     }
-
-    return cn;
   }
 
   /**
@@ -455,11 +457,6 @@
     }
 
     Cursor cursor = null;
-    ChangeNumber cn = null;
-
-    DatabaseEntry key = new ReplicationKey(changeNumber);
-    DatabaseEntry data = new DatabaseEntry();
-
     try
     {
       dbCloseLock.readLock().lock();
@@ -471,6 +468,8 @@
           return null;
         }
 
+        DatabaseEntry key = createReplicationKey(changeNumber);
+        DatabaseEntry data = new DatabaseEntry();
         cursor = db.openCursor(null, null);
         if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT)
             == OperationStatus.SUCCESS)
@@ -480,23 +479,7 @@
           if (cursor.getPrev(key, data, LockMode.DEFAULT)
               == OperationStatus.SUCCESS)
           {
-            String str = decodeUTF8(key.getData());
-            cn = new ChangeNumber(str);
-            if (ReplicationDB.isaCounter(cn))
-            {
-              if (cursor.getPrev(key, data, LockMode.DEFAULT)
-                  != OperationStatus.SUCCESS)
-              {
-                // database starts with a counter record.
-                cn = null;
-              }
-              else
-              {
-                str = decodeUTF8(key.getData());
-                cn = new ChangeNumber(str);
-                // There can't be 2 counter record next to each other
-              }
-            }
+            return getRegularRecord(cursor, key, data);
           }
           // else, there was no change previous to our changeNumber.
         }
@@ -507,40 +490,41 @@
           if (cursor.getLast(key, data, LockMode.DEFAULT)
               == OperationStatus.SUCCESS)
           {
-            String str = decodeUTF8(key.getData());
-            cn = new ChangeNumber(str);
-            if (ReplicationDB.isaCounter(cn))
-            {
-              if (cursor.getPrev(key, data, LockMode.DEFAULT)
-                  != OperationStatus.SUCCESS)
-              {
-                /*
-                 * database only contain a counter record, should not be
-                 * possible, but Ok, let's just say no change Number
-                 */
-                cn = null;
-              }
-              else
-              {
-                str = decodeUTF8(key.getData());
-                cn = new ChangeNumber(str);
-                // There can't be 2 counter record next to each other
-              }
-            }
+            return getRegularRecord(cursor, key, data);
           }
         }
       }
       finally
       {
-        closeLockedCursor(cursor);
+        closeAndReleaseReadLock(cursor);
       }
     }
     catch (DatabaseException e)
     {
       replicationServer.handleUnexpectedDatabaseException(e);
-      cn = null;
     }
-    return cn;
+    return null;
+  }
+
+  private ChangeNumber getRegularRecord(Cursor cursor, DatabaseEntry key,
+      DatabaseEntry data)
+  {
+    final ChangeNumber cn = toChangeNumber(key.getData());
+    if (!isACounterRecord(cn))
+    {
+      return cn;
+    }
+
+    // There cannot be 2 counter record next to each other,
+    // it is safe to return previous record which must exist
+    if (cursor.getPrev(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+    {
+      return toChangeNumber(key.getData());
+    }
+
+    // database only contain a counter record, which should not be possible
+    // let's just say no changeNumber
+    return null;
   }
 
 
@@ -587,7 +571,7 @@
     {
       if (startingChangeNumber != null)
       {
-        key = new ReplicationKey(startingChangeNumber);
+        key = createReplicationKey(startingChangeNumber);
       }
       else
       {
@@ -646,7 +630,7 @@
       catch (Exception e)
       {
         // Unlocking is required before throwing any exception
-        closeLockedCursor(localCursor);
+        closeAndReleaseReadLock(localCursor);
         throw e;
       }
     }
@@ -682,7 +666,7 @@
       }
       catch (Exception e)
       {
-        closeLockedCursor(localCursor);
+        closeAndReleaseReadLock(localCursor);
 
         if (localTxn != null)
         {
@@ -714,7 +698,8 @@
         isClosed = true;
       }
 
-      closeLockedCursor(cursor);
+      closeAndReleaseReadLock(cursor);
+
       if (txn != null)
       {
         try
@@ -747,7 +732,7 @@
         isClosed = true;
       }
 
-      closeLockedCursor(cursor);
+      closeAndReleaseReadLock(cursor);
 
       if (txn != null)
       {
@@ -776,13 +761,11 @@
       }
 
       OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT);
-
       if (status != OperationStatus.SUCCESS)
       {
         return null;
       }
-      String csnString = decodeUTF8(key.getData());
-      return new ChangeNumber(csnString);
+      return toChangeNumber(key.getData());
     }
 
     /**
@@ -815,15 +798,12 @@
         ChangeNumber cn = null;
         try
         {
-          cn = new ChangeNumber(
-              decodeUTF8(key.getData()));
-          if (ReplicationDB.isaCounter(cn))
+          cn = toChangeNumber(key.getData());
+          if (isACounterRecord(cn))
           {
-            // counter record
             continue;
           }
-          currentChange = ReplicationData.generateChange(data
-              .getData());
+          currentChange = ReplicationData.generateChange(data.getData());
         }
         catch (Exception e)
         {
@@ -893,7 +873,7 @@
       dbenv.clearDb(dbName);
 
       // RE-create the db
-      db = dbenv.getOrAddDb(serverId, baseDn, (long)-1);
+      db = dbenv.getOrAddDb(serverId, baseDn, -1);
     }
     catch(Exception e)
     {
@@ -916,15 +896,8 @@
    * @return The number of changes between provided start and stop changeNumber.
    * Returns 0 when an error occurs.
    */
-  public int count(ChangeNumber start, ChangeNumber stop)
+  public long count(ChangeNumber start, ChangeNumber stop)
   {
-    int counterRecord1 = 0;
-    int counterRecord2 = 0;
-    int distToCounterRecord1 = 0;
-    int distBackToCounterRecord2 = 0;
-    int count=0;
-    OperationStatus status;
-
     try
     {
       dbCloseLock.readLock().lock();
@@ -937,144 +910,226 @@
         {
           return 0;
         }
+        if (start == null && stop == null)
+        {
+          return db.count();
+        }
 
-        ChangeNumber cn ;
-
-        if ((start==null)&&(stop==null))
-          return (int)db.count();
+        int[] counterValues = new int[2];
+        int[] distanceToCounterRecords = new int[2];
 
         // Step 1 : from the start point, traverse db to the next counter record
         // or to the stop point.
-        DatabaseEntry key = new DatabaseEntry();
-        DatabaseEntry data = new DatabaseEntry();
         cursor = db.openCursor(null, null);
-        if (start != null)
-        {
-          key = new ReplicationKey(start);
-          status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
-          if (status == OperationStatus.NOTFOUND)
-            status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
-        }
-        else
-        {
-          status = cursor.getNext(key, data, LockMode.DEFAULT);
-        }
-
-        while (status == OperationStatus.SUCCESS)
-        {
-          // test whether the record is a regular change or a counter
-          String csnString = decodeUTF8(key.getData());
-          cn = new ChangeNumber(csnString);
-          if (cn.getServerId() != 0)
-          {
-            // reached a regular change record
-            // test whether we reached the 'stop' target
-            if (!cn.newer(stop))
-            {
-              // let's loop
-              distToCounterRecord1++;
-              status = cursor.getNext(key, data, LockMode.DEFAULT);
-            }
-            else
-            {
-              // reached the end
-              break;
-            }
-          }
-          else
-          {
-            // counter record
-            counterRecord1 = decodeCounterValue(data.getData());
-            break;
-          }
-        }
+        findFirstCounterRecordAfterStartPoint(start, stop, cursor,
+            counterValues, distanceToCounterRecords);
         cursor.close();
 
         // cases
-        //
-        if (counterRecord1==0)
-          return distToCounterRecord1;
+        if (counterValues[START] == 0)
+          return distanceToCounterRecords[START];
 
         // Step 2 : from the stop point, traverse db to the next counter record
         // or to the start point.
-        data = new DatabaseEntry();
-        key = new ReplicationKey(stop);
         cursor = db.openCursor(null, null);
-        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
-        if (status == OperationStatus.SUCCESS)
+        if (!findFirstCounterRecordBeforeStopPoint(start, stop, cursor,
+            counterValues, distanceToCounterRecords))
         {
-          cn = new ChangeNumber(decodeUTF8(key.getData()));
-        }
-        else
-        {
-          key = new DatabaseEntry();
-          data = new DatabaseEntry();
-          status = cursor.getLast(key, data, LockMode.DEFAULT);
-          if (status != OperationStatus.SUCCESS)
-          {
-            /* database is empty */
-            return 0;
-          }
-        }
-        while (status == OperationStatus.SUCCESS)
-        {
-          cn = new ChangeNumber(decodeUTF8(key.getData()));
-          if (!ReplicationDB.isaCounter(cn))
-          {
-            // regular change record
-            if (!cn.older(start))
-            {
-              distBackToCounterRecord2++;
-              status = cursor.getPrev(key, data, LockMode.DEFAULT);
-            }
-            else
-              break;
-          }
-          else
-          {
-            // counter record
-            counterRecord2 = decodeCounterValue(data.getData());
-            break;
-          }
+          // database is empty
+          return 0;
         }
         cursor.close();
 
         // Step 3 : Now consolidates the result
-        if (counterRecord1!=0)
-        {
-          if (counterRecord1 == counterRecord2)
-          {
-            // only one cp between from and to - no need to use it
-            count = distToCounterRecord1 + distBackToCounterRecord2;
-          }
-          else
-          {
-            // 2 cp between from and to
-            count = distToCounterRecord1 + (counterRecord2-counterRecord1)
-            + distBackToCounterRecord2;
-          }
-        }
+        return computeDistance(counterValues, distanceToCounterRecords);
       }
       finally
       {
-        closeLockedCursor(cursor);
+        closeAndReleaseReadLock(cursor);
       }
     }
     catch (DatabaseException e)
     {
       replicationServer.handleUnexpectedDatabaseException(e);
     }
-    return count;
+    return 0;
+  }
+
+
+  private void findFirstCounterRecordAfterStartPoint(ChangeNumber start,
+      ChangeNumber stop, Cursor cursor, int[] counterValues,
+      int[] distanceToCounterRecords)
+  {
+    OperationStatus status;
+    DatabaseEntry key;
+    DatabaseEntry data = new DatabaseEntry();
+    if (start != null)
+    {
+      key = createReplicationKey(start);
+      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+      if (status == OperationStatus.NOTFOUND)
+        status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
+    }
+    else
+    {
+      key = new DatabaseEntry();
+      status = cursor.getNext(key, data, LockMode.DEFAULT);
+    }
+
+    while (status == OperationStatus.SUCCESS)
+    {
+      // test whether the record is a regular change or a counter
+      final ChangeNumber cn = toChangeNumber(key.getData());
+      if (isACounterRecord(cn))
+      {
+        // we have found the counter record
+        counterValues[START] = decodeCounterValue(data.getData());
+        break;
+      }
+
+      // reached a regular change record
+      // test whether we reached the 'stop' target
+      if (!cn.newer(stop))
+      {
+        // let's loop
+        distanceToCounterRecords[START]++;
+        status = cursor.getNext(key, data, LockMode.DEFAULT);
+      }
+      else
+      {
+        // reached the end
+        break;
+      }
+    }
+  }
+
+  private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start,
+      ChangeNumber stop, Cursor cursor, int[] counterValues,
+      int[] distanceToCounterRecords)
+  {
+    DatabaseEntry key = createReplicationKey(stop);
+    DatabaseEntry data = new DatabaseEntry();
+    OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+    if (status != OperationStatus.SUCCESS)
+    {
+      key = new DatabaseEntry();
+      data = new DatabaseEntry();
+      status = cursor.getLast(key, data, LockMode.DEFAULT);
+      if (status != OperationStatus.SUCCESS)
+      {
+        return false;
+      }
+    }
+
+    while (status == OperationStatus.SUCCESS)
+    {
+      final ChangeNumber cn = toChangeNumber(key.getData());
+      if (isACounterRecord(cn))
+      {
+        // we have found the counter record
+        counterValues[STOP] = decodeCounterValue(data.getData());
+        break;
+      }
+
+      // it is a regular change record
+      if (!cn.older(start))
+      {
+        distanceToCounterRecords[STOP]++;
+        status = cursor.getPrev(key, data, LockMode.DEFAULT);
+      }
+      else
+        break;
+    }
+    return true;
   }
 
   /**
-   * Test if a provided changeNumber represents a counter record.
-   * @param cn The provided changeNumber.
-   * @return True if the provided changenumber is a counter.
+   * The diagram below shows a visual description of how the distance between
+   * two change numbers in the database is computed.
+   *
+   * <pre>
+   *     +--------+                        +--------+
+   *     | CASE 1 |                        | CASE 2 |
+   *     +--------+                        +--------+
+   *
+   *             CSN                               CSN
+   *             -----                             -----
+   *   START  => -----                   START  => -----
+   *     ^       -----                     ^       -----
+   *     |       -----                     |       -----
+   *   dist 1    -----                   dist 1    -----
+   *     |       -----                     |       -----
+   *     v       -----                     v       -----
+   *   CR 1&2 => [1000]                   CR 1  => [1000]
+   *     ^       -----                             -----
+   *     |       -----                             -----
+   *   dist 2    -----                             -----
+   *     |       -----                             -----
+   *     v       -----                             -----
+   *   STOP   => -----                             -----
+   *             -----                             -----
+   *     CR   => [2000]                   CR 2  => [2000]
+   *             -----                     ^       -----
+   *                                       |       -----
+   *                                     dist 2    -----
+   *                                       |       -----
+   *                                       v       -----
+   *                                     STOP   => -----
+   * </pre>
+   *
+   * Explanation of the terms used:
+   * <dl>
+   * <dt>START</dt>
+   * <dd>Start change number for the count</dd>
+   * <dt>STOP</dt>
+   * <dd>Stop change number for the count</dd>
+   * <dt>dist</dt>
+   * <dd>Distance from START (or STOP) to the counter record</dd>
+   * <dt>CSN</dt>
+   * <dd>Stands for "Change Sequence Number". Below it, the database is
+   * symbolized, where each record is represented by using dashes "-----". The
+   * database is ordered.</dd>
+   * <dt>CR</dt>
+   * <dd>Stands for "Counter Record". Counter Records are inserted in the
+   * database along with real change numbers, but they are not real changes.
+   * They are only used to speed up calculating the distance between 2 change
+   * numbers without the need to scan the whole database in between.</dd>
+   * </dl>
    */
-  static private boolean isaCounter(ChangeNumber cn)
+  private long computeDistance(int[] counterValues,
+      int[] distanceToCounterRecords)
   {
-    return ((cn.getServerId()== 0) && (cn.getSeqnum()==0));
+    if (counterValues[START] != 0)
+    {
+      if (counterValues[START] == counterValues[STOP])
+      {
+        // only one counter record between from and to - no need to use it
+        return distanceToCounterRecords[START] + distanceToCounterRecords[STOP];
+      }
+      // at least 2 counter records between from and to
+      return distanceToCounterRecords[START]
+          + (counterValues[STOP] - counterValues[START])
+          + distanceToCounterRecords[STOP];
+    }
+    return 0;
+  }
+
+  /**
+   * Whether a provided changeNumber represents a counter record. A counter
+   * record is used to store TODO.
+   *
+   * @param cn
+   *          The changeNumber to test
+   * @return true if the provided changenumber is a counter, false otherwise
+   */
+  private static boolean isACounterRecord(ChangeNumber cn)
+  {
+    return cn.getServerId() == 0 && cn.getSeqnum() == 0;
+  }
+
+  private static ChangeNumber newCounterRecord(ChangeNumber changeNumber)
+  {
+    return new ChangeNumber(changeNumber.getTime(), 0, 0);
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index dc33cc8..3758a13 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -315,25 +315,26 @@
           serverId + " " + baseDn + " " + generationId);
       try
       {
-        String key = serverId + FIELD_SEPARATOR + baseDn;
+        final String serverIdKey = serverId + FIELD_SEPARATOR + baseDn;
 
         // Opens the database for the changes received from this server
         // on this domain. Create it if it does not already exist.
         DatabaseConfig dbConfig = new DatabaseConfig();
         dbConfig.setAllowCreate(true);
         dbConfig.setTransactional(true);
-        Database db = dbEnvironment.openDatabase(null, key, dbConfig);
+        Database db = dbEnvironment.openDatabase(null, serverIdKey, dbConfig);
 
         // Creates the record serverId/domain base Dn in the stateDb
         // if it does not already exist.
-        putInStateDBIfNotExist(key, key);
+        putInStateDBIfNotExist(serverIdKey, serverIdKey);
 
         // Creates the record domain base Dn/ generationId in the stateDb
         // if it does not already exist.
-        key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
-        String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
-                + FIELD_SEPARATOR + baseDn;
-        putInStateDBIfNotExist(key, data);
+        final String genIdKey = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
+        final String genIdData = GENERATION_ID_TAG
+            + FIELD_SEPARATOR + generationId
+            + FIELD_SEPARATOR + baseDn;
+        putInStateDBIfNotExist(genIdKey, genIdData);
         return db;
       }
       catch (UnsupportedEncodingException e)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
index 29c086b..4e7a898 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -43,27 +43,25 @@
   private ReplServerDBCursor cursor = null;
   private DbHandler dbh;
   private ReplicationDB db;
-  ChangeNumber lastNonNullCurrentCN;
+  private ChangeNumber lastNonNullCurrentCN;
 
   /**
    * Creates a new ReplicationIterator.
    * All created iterator must be released by the caller using the
    * releaseCursor() method.
    *
-   * @param id the Identifier of the server on which the iterator applies.
    * @param db The db where the iterator must be created.
    * @param changeNumber The ChangeNumber after which the iterator must start.
-   * @param dbh The associated DbHandler.
+   * @param dbHandler The associated DbHandler.
    * @throws Exception If there is no other change to push after change
    *         with changeNumber number.
    * @throws DatabaseException if a database problem happened.
    */
-  public ReplicationIterator(
-          int id, ReplicationDB db, ChangeNumber changeNumber, DbHandler dbh)
-          throws Exception, DatabaseException
+  public ReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
+      DbHandler dbHandler) throws Exception, DatabaseException
   {
     this.db = db;
-    this.dbh = dbh;
+    this.dbh = dbHandler;
     this.lastNonNullCurrentCN = changeNumber;
 
     try
@@ -79,7 +77,7 @@
     if (cursor == null)
     {
       // flush the queue into the db
-      dbh.flush();
+      dbHandler.flush();
 
       // look again in the db
       cursor = db.openReadCursor(changeNumber);
@@ -173,6 +171,7 @@
    * Release the cursor in case the iterator was badly used and releaseCursor
    * was never called.
    */
+  @Override
   protected void finalize()
   {
     releaseCursor();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationKey.java b/opends/src/server/org/opends/server/replication/server/ReplicationKey.java
deleted file mode 100644
index 2fdbdcb..0000000
--- a/opends/src/server/org/opends/server/replication/server/ReplicationKey.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License").  You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- *      Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
- *      Portions Copyright 2010 ForgeRock AS.
- */
-package org.opends.server.replication.server;
-
-import java.io.UnsupportedEncodingException;
-
-import com.sleepycat.je.DatabaseEntry;
-
-import org.opends.server.replication.common.ChangeNumber;
-
-/**
- * Superclass of DatabaseEntry.
- * Useful to create ReplicationServer keys from ChangeNumbers.
- */
-public class ReplicationKey extends DatabaseEntry
-{
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * Creates a new ReplicationKey from the given ChangeNumber.
-   * @param changeNumber The changeNumber to use.
-   */
-  public ReplicationKey(ChangeNumber changeNumber)
-  {
-    try
-    {
-      this.setData(changeNumber.toString().getBytes("UTF-8"));
-    } catch (UnsupportedEncodingException e)
-    {
-      // Should never happens, UTF-8 is always supported
-      // TODO : add better logging
-    }
-  }
-}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index 6d3561d..a554143 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -32,34 +32,9 @@
 import static org.opends.server.util.StaticUtils.*;
 import static org.testng.Assert.*;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import java.io.*;
+import java.net.*;
+import java.util.*;
 import java.util.logging.ConsoleHandler;
 import java.util.logging.Handler;
 import java.util.logging.LogManager;
@@ -74,22 +49,13 @@
 import org.opends.server.api.Backend;
 import org.opends.server.api.WorkQueue;
 import org.opends.server.backends.MemoryBackend;
-import org.opends.server.backends.jeb.BackendImpl;
-import org.opends.server.backends.jeb.DatabaseContainer;
-import org.opends.server.backends.jeb.EntryContainer;
-import org.opends.server.backends.jeb.Index;
-import org.opends.server.backends.jeb.RootContainer;
+import org.opends.server.backends.jeb.*;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.extensions.ConfigFileHandler;
-import org.opends.server.loggers.AccessLogger;
-import org.opends.server.loggers.ErrorLogger;
-import org.opends.server.loggers.HTTPAccessLogger;
-import org.opends.server.loggers.TextAccessLogPublisher;
-import org.opends.server.loggers.TextErrorLogPublisher;
-import org.opends.server.loggers.TextHTTPAccessLogPublisher;
+import org.opends.server.loggers.*;
 import org.opends.server.loggers.debug.DebugLogger;
 import org.opends.server.loggers.debug.TextDebugLogPublisher;
 import org.opends.server.plugins.InvocationCounterPlugin;
@@ -103,19 +69,8 @@
 import org.opends.server.protocols.ldap.LDAPReader;
 import org.opends.server.tools.LDAPModify;
 import org.opends.server.tools.dsconfig.DSConfig;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeTypeConstants;
-import org.opends.server.types.ByteString;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryEnvironmentConfig;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
+import org.opends.server.types.*;
 import org.opends.server.types.FilePermission;
-import org.opends.server.types.InitializationException;
-import org.opends.server.types.LDIFImportConfig;
-import org.opends.server.types.OperatingSystem;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.Schema;
 import org.opends.server.util.BuildVersion;
 import org.opends.server.util.EmbeddedUtils;
 import org.opends.server.util.LDIFReader;
@@ -790,8 +745,7 @@
   private static ServerSocket bindPort(int port)
           throws IOException
   {
-    ServerSocket serverLdapSocket;
-    serverLdapSocket = new ServerSocket();
+    ServerSocket serverLdapSocket = new ServerSocket();
     serverLdapSocket.setReuseAddress(true);
     serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", port));
     return serverLdapSocket;
@@ -806,10 +760,48 @@
    */
   public static ServerSocket bindFreePort() throws IOException
   {
-    ServerSocket serverLdapSocket = new ServerSocket();
-    serverLdapSocket.setReuseAddress(true);
-    serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", 0));
-    return serverLdapSocket;
+    return bindPort(0);
+  }
+
+  /**
+   * Find a free port on the local host.
+   * 
+   * @throws IOException
+   *           in case of underlying exception.
+   * @return the free port number found
+   */
+  public static int findFreePort() throws IOException
+  {
+    return findFreePorts(1)[0];
+  }
+
+  /**
+   * Find nb free ports on the local host.
+   *
+   * @param nb
+   *          the number of free ports to find
+   * @throws IOException
+   *           in case of underlying exception.
+   * @return an array with the free port numbers found
+   */
+  public static int[] findFreePorts(int nb) throws IOException
+  {
+    final ServerSocket[] sockets = new ServerSocket[nb];
+    try
+    {
+      final int[] ports = new int[nb];
+      for (int i = 0; i < nb; i++)
+      {
+        final ServerSocket socket = bindFreePort();
+        sockets[i] = socket;
+        ports[i] = socket.getLocalPort();
+      }
+      return ports;
+    }
+    finally
+    {
+      close(sockets);
+    }
   }
 
   /**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index 39e17c1..5da4f99 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -27,10 +27,16 @@
  */
 package org.opends.server.replication.server;
 
+import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.testng.Assert.*;
+
 import java.io.File;
-import java.net.ServerSocket;
+import java.io.IOException;
 
 import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.config.ConfigException;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.common.ChangeNumber;
@@ -38,18 +44,15 @@
 import org.opends.server.replication.protocol.DeleteMsg;
 import org.testng.annotations.Test;
 
-import static org.opends.server.TestCaseUtils.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.testng.Assert.*;
-
 /**
  * Test the dbHandler class
  */
 @SuppressWarnings("javadoc")
 public class DbHandlerTest extends ReplicationTestCase
 {
-  // The tracer object for the debug logger
+  /** The tracer object for the debug logger */
   private static final DebugTracer TRACER = getTracer();
+
   /**
    * Utility - log debug message - highlight it is from the test and not
    * from the server code. Makes easier to observe the test steps.
@@ -69,33 +72,15 @@
     ReplicationServer replicationServer = null;
     ReplicationDbEnv dbEnv = null;
     DbHandler handler = null;
-    ReplicationIterator it = null;
     try
     {
       TestCaseUtils.startServer();
 
-      //  find  a free port for the replicationServer
-      ServerSocket socket = TestCaseUtils.bindFreePort();
-      int changelogPort = socket.getLocalPort();
-      socket.close();
-
-      // configure a ReplicationServer.
-      ReplServerFakeConfiguration conf =
-        new ReplServerFakeConfiguration(changelogPort, null, 0,
-        2, 0, 100, null);
-      replicationServer = new ReplicationServer(conf);
+      replicationServer = configureReplicationServer(100);
 
       // create or clean a directory for the dbHandler
-      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
-      String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
-              buildRoot + File.separator + "build");
-      path = path + File.separator + "unit-tests" + File.separator + "dbHandler";
-      testRoot = new File(path);
-      if (testRoot.exists())
-      {
-        TestCaseUtils.deleteDirectory(testRoot);
-      }
-      testRoot.mkdirs();
+      String path = getReplicationDbPath();
+      testRoot = createDirectory(path);
 
       dbEnv = new ReplicationDbEnv(path, replicationServer);
 
@@ -109,18 +94,10 @@
       ChangeNumber changeNumber4 = gen.newChangeNumber();
       ChangeNumber changeNumber5 = gen.newChangeNumber();
 
-      DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1,
-        "uid");
-      DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2,
-        "uid");
-      DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3,
-      "uid");
-      DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4,
-      "uid");
-
-      handler.add(update1);
-      handler.add(update2);
-      handler.add(update3);
+      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid"));
+      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid"));
+      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid"));
+      DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4, "uid");
 
       //--
       // Iterator tests with memory queue only populated
@@ -128,31 +105,8 @@
       // verify that memory queue is populated
       assertEquals(handler.getQueueSize(),3);
 
-      // Iterator from existing CN
-      it = handler.generateIterator(changeNumber1);
-      assertTrue(it.next());
-      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
-          " Actual change number=" + it.getChange().getChangeNumber() +
-          " Expect change number=" + changeNumber2);
-      assertTrue(it.next());
-      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
-          " Actual change number=" + it.getChange().getChangeNumber());
-      assertFalse(it.next());
-      it.releaseCursor();
-      it=null;
-
-      // Iterator from NON existing CN
-      Exception ec = null;
-      try
-      {
-        it = handler.generateIterator(changeNumber5);
-      }
-      catch(Exception e)
-      {
-        ec = e;
-      }
-      assertNotNull(ec);
-      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+      assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3);
+      assertNotFound(handler, changeNumber5);
 
       //--
       // Iterator tests with db only populated
@@ -161,30 +115,8 @@
       // verify that memory queue is empty (all changes flushed in the db)
       assertEquals(handler.getQueueSize(),0);
 
-      // Test iterator from existing CN
-      it = handler.generateIterator(changeNumber1);
-      assertTrue(it.next());
-      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
-          " Actual change number=" + it.getChange().getChangeNumber());
-      assertTrue(it.next());
-      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
-          " Actual change number=" + it.getChange().getChangeNumber());
-      assertFalse(it.next());
-      it.releaseCursor();
-      it=null;
-
-      // Iterator from NON existing CN
-      ec = null;
-      try
-      {
-        it = handler.generateIterator(changeNumber5);
-      }
-      catch(Exception e)
-      {
-        ec = e;
-      }
-      assertNotNull(ec);
-      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+      assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3);
+      assertNotFound(handler, changeNumber5);
 
       // Test first and last
       assertEquals(changeNumber1, handler.getFirstChange());
@@ -198,52 +130,11 @@
       // verify memory queue contains this one
       assertEquals(handler.getQueueSize(),1);
 
-      // Test iterator from existing CN
-      it = handler.generateIterator(changeNumber1);
-      assertTrue(it.next());
-      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
-          " Actual change number=" + it.getChange().getChangeNumber());
-      assertTrue(it.next());
-      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
-          " Actual change number=" + it.getChange().getChangeNumber());
-      assertTrue(it.next());
-      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
-          " Actual change number=" + it.getChange().getChangeNumber());
-      assertFalse(it.next());
-      assertTrue(it.getChange()==null);
-      it.releaseCursor();
-      it=null;
-
+      assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3, changeNumber4);
       // Test iterator from existing CN at the limit between queue and db
-      it = handler.generateIterator(changeNumber3);
-      assertTrue(it.next());
-      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
-          " Actual change number=" + it.getChange().getChangeNumber());
-      assertFalse(it.next());
-      assertTrue(it.getChange()==null);
-      it.releaseCursor();
-      it=null;
-
-      // Test iterator from existing CN at the limit between queue and db
-      it = handler.generateIterator(changeNumber4);
-      assertFalse(it.next());
-      assertTrue(it.getChange()==null,
-          " Actual change number=" + it.getChange());
-      it.releaseCursor();
-      it=null;
-
-      // Test iterator from NON existing CN
-      ec = null;
-      try
-      {
-        it = handler.generateIterator(changeNumber5);
-      }
-      catch(Exception e)
-      {
-        ec = e;
-      }
-      assertNotNull(ec);
-      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+      assertFoundInOrder(handler, changeNumber3, changeNumber4);
+      assertFoundInOrder(handler, changeNumber4);
+      assertNotFound(handler, changeNumber5);
 
       handler.setPurgeDelay(1);
 
@@ -262,13 +153,9 @@
           purged = true;
         }
       }
+      // FIXME should add an assert here
     } finally
     {
-      if (it != null)
-      {
-        it.releaseCursor();
-        it=null;
-      }
       if (handler != null)
         handler.shutdown();
       if (dbEnv != null)
@@ -280,6 +167,74 @@
     }
   }
 
+  private ReplicationServer configureReplicationServer(int windowSize)
+      throws IOException, ConfigException
+  {
+    final int changelogPort = findFreePort();
+    final ReplicationServerCfg conf =
+        new ReplServerFakeConfiguration(changelogPort, null, 0, 2, 0, windowSize, null);
+    return new ReplicationServer(conf);
+  }
+
+  private String getReplicationDbPath()
+  {
+    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+    String path =
+        System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
+            + File.separator + "build");
+    return path + File.separator + "unit-tests" + File.separator + "dbHandler";
+  }
+
+  private File createDirectory(String path) throws IOException
+  {
+    File testRoot = new File(path);
+    if (testRoot.exists())
+    {
+      TestCaseUtils.deleteDirectory(testRoot);
+    }
+    testRoot.mkdirs();
+    return testRoot;
+  }
+
+  private ReplicationIterator assertFoundInOrder(DbHandler handler,
+      ChangeNumber... changeNumbers) throws Exception
+  {
+    if (changeNumbers.length == 0)
+    {
+      return null;
+    }
+
+    ReplicationIterator it = handler.generateIterator(changeNumbers[0]);
+    for (int i = 1; i < changeNumbers.length; i++)
+    {
+      assertTrue(it.next());
+      final ChangeNumber cn = it.getChange().getChangeNumber();
+      final boolean equals = cn.compareTo(changeNumbers[i]) == 0;
+      assertTrue(equals, "Actual change number=" + cn
+          + ", Expected change number=" + changeNumbers[i]);
+    }
+    assertFalse(it.next());
+    assertNull(it.getChange(), "Actual change number=" + it.getChange()
+        + ", Expected null");
+
+    it.releaseCursor();
+    return it;
+  }
+
+  private void assertNotFound(DbHandler handler, ChangeNumber changeNumber)
+  {
+    try
+    {
+      ReplicationIterator iter = handler.generateIterator(changeNumber);
+      iter.releaseCursor();
+      fail("Expected exception");
+    }
+    catch (Exception e)
+    {
+      assertEquals(e.getLocalizedMessage(), "ChangeNumber not available");
+    }
+  }
+
   /**
    * Test the feature of clearing a dbHandler used by a replication server.
    * The clear feature is used when a replication server receives a request
@@ -296,28 +251,11 @@
     {
       TestCaseUtils.startServer();
 
-      //  find  a free port for the replicationServer
-      ServerSocket socket = TestCaseUtils.bindFreePort();
-      int changelogPort = socket.getLocalPort();
-      socket.close();
-
-      // configure a ReplicationServer.
-      ReplServerFakeConfiguration conf =
-        new ReplServerFakeConfiguration(changelogPort, null, 0,
-        2, 0, 100, null);
-      replicationServer = new ReplicationServer(conf);
+      replicationServer = configureReplicationServer(100);
 
       // create or clean a directory for the dbHandler
-      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
-      String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
-              buildRoot + File.separator + "build");
-      path = path + File.separator + "unit-tests" + File.separator + "dbHandler";
-      testRoot = new File(path);
-      if (testRoot.exists())
-      {
-        TestCaseUtils.deleteDirectory(testRoot);
-      }
-      testRoot.mkdirs();
+      String path = getReplicationDbPath();
+      testRoot = createDirectory(path);
 
       dbEnv = new ReplicationDbEnv(path, replicationServer);
 
@@ -331,17 +269,10 @@
       ChangeNumber changeNumber2 = gen.newChangeNumber();
       ChangeNumber changeNumber3 = gen.newChangeNumber();
 
-      DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1,
-        "uid");
-      DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2,
-        "uid");
-      DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3,
-        "uid");
-
       // Add the changes
-      handler.add(update1);
-      handler.add(update2);
-      handler.add(update3);
+      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid"));
+      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid"));
+      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid"));
 
       // Check they are here
       assertEquals(changeNumber1, handler.getFirstChange());
@@ -366,6 +297,7 @@
         TestCaseUtils.deleteDirectory(testRoot);
     }
   }
+
   /**
    * Test the logic that manages counter records in the DbHandler in order to
    * optimize the counting of record in the replication changelog db.
@@ -416,35 +348,17 @@
     ReplicationServer replicationServer = null;
     ReplicationDbEnv dbEnv = null;
     DbHandler handler = null;
-    ReplicationIterator ri = null;
-    int actualCnt = 0;
+    long actualCnt = 0;
     String testcase;
     try
     {
       TestCaseUtils.startServer();
 
-      //  find  a free port for the replicationServer
-      ServerSocket socket = TestCaseUtils.bindFreePort();
-      int changelogPort = socket.getLocalPort();
-      socket.close();
-
-      // configure a ReplicationServer.
-      ReplServerFakeConfiguration conf =
-        new ReplServerFakeConfiguration(changelogPort, null, 0,
-            2, 0, 100000, null);
-      replicationServer = new ReplicationServer(conf);
+      replicationServer = configureReplicationServer(100000);
 
       // create or clean a directory for the dbHandler
-      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
-      String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
-              buildRoot + File.separator + "build");
-      path = path + File.separator + "unit-tests" + File.separator + "dbHandler";
-      testRoot = new File(path);
-      if (testRoot.exists())
-      {
-        TestCaseUtils.deleteDirectory(testRoot);
-      }
-      testRoot.mkdirs();
+      String path = getReplicationDbPath();
+      testRoot = createDirectory(path);
 
       dbEnv = new ReplicationDbEnv(path, replicationServer);
 
@@ -589,7 +503,7 @@
 
       handler.setPurgeDelay(100);
       sleep(4000);
-      int totalCount = handler.getCount(null, null);
+      long totalCount = handler.getCount(null, null);
       debugInfo(tn,testcase + " After purge, total count=" + totalCount);
 
       testcase="AFTER PURGE (first, last)=";
@@ -619,12 +533,9 @@
       assertEquals(null, handler.getFirstChange());
       assertEquals(null, handler.getLastChange());
       debugInfo(tn,"Success");
-
     }
     finally
     {
-      if (ri!=null)
-        ri.releaseCursor();
       if (handler != null)
         handler.shutdown();
       if (dbEnv != null)
@@ -635,4 +546,5 @@
         TestCaseUtils.deleteDirectory(testRoot);
     }
   }
+
 }

--
Gitblit v1.10.0