mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
08.37.2013 0667279a02d2cc651ebcac518e50b773a4aa9412
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -282,7 +282,7 @@
    {
      flush();
    }
    return new ReplicationIterator(serverId, db, changeNumber, this);
    return new ReplicationIterator(db, changeNumber, this);
  }
  /**
@@ -657,7 +657,7 @@
   * @param to   The upper (newer) change number.
   * @return The computed number of changes.
   */
  public int getCount(ChangeNumber from, ChangeNumber to)
  public long getCount(ChangeNumber from, ChangeNumber to)
  {
    // Now that we always keep the last ChangeNumber in the DB to avoid
    // expiring cookies too quickly, we need to check if the "to"
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -32,6 +32,7 @@
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -50,6 +51,9 @@
 */
public class ReplicationDB
{
  private static final int START = 0;
  private static final int STOP = 1;
  private Database db = null;
  private ReplicationDbEnv dbenv = null;
  private ReplicationServer replicationServer;
@@ -60,7 +64,8 @@
   * The lock used to provide exclusive access to the thread that close the db
   * (shutdown or clear).
   */
  private ReentrantReadWriteLock dbCloseLock;
  private final ReentrantReadWriteLock dbCloseLock =
      new ReentrantReadWriteLock(true);
  // Change counter management
  // The Db itself does not allow to count records between a start and an end
@@ -76,9 +81,9 @@
  // - a counter value : count of changes since previous counter record.
  //
  // A counter record has to follow the order of the db, so it needs to have
  // a changenumber key that follow the order.
  // a changenumber key that follows the order.
  // A counter record must have its own changenumber key since the Db does not
  // support duplicate key (it is a compatibility breaker character of the DB).
  // support duplicate keys (it is a compatibility breaker character of the DB).
  //
  // We define 2 conditions to store a counter record :
  // 1/- at least 'counterWindowSize' changes have been stored in the Db
@@ -88,7 +93,7 @@
  /** Current value of the counter. */
  private int  counterCurrValue = 1;
  private int counterCurrValue = 1;
  /**
   * When not null, the next change with a ts different from
@@ -100,7 +105,7 @@
   * The counter record will never be written to the db more often than each
   * counterWindowSize changes.
   */
  private int  counterWindowSize = 1000;
  private int counterWindowSize = 1000;
 /**
   * Creates a new database or open existing database that will be used
@@ -122,40 +127,37 @@
    this.replicationServer = replicationServer;
    // Get or create the associated ReplicationServerDomain and Db.
    db = dbenv.getOrAddDb(serverId, baseDn,
        replicationServer.getReplicationServerDomain(baseDn,
        true).getGenerationId());
    final ReplicationServerDomain domain =
        replicationServer.getReplicationServerDomain(baseDn, true);
    db = dbenv.getOrAddDb(serverId, baseDn, domain.getGenerationId());
    dbCloseLock = new ReentrantReadWriteLock(true);
    Cursor cursor;
    Transaction txn = null;
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    OperationStatus status;
    int distBackToCounterRecord = 0;
    intializeCounters();
  }
    // Initialize counter
  private void intializeCounters()
  {
    this.counterCurrValue = 1;
    cursor = db.openCursor(txn, null);
    Cursor cursor = db.openCursor(null, null);
    try
    {
      status = cursor.getLast(key, data, LockMode.DEFAULT);
      int distBackToCounterRecord = 0;
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData()));
        if (!ReplicationDB.isaCounter(cn))
        ChangeNumber cn = toChangeNumber(key.getData());
        if (isACounterRecord(cn))
        {
          status = cursor.getPrev(key, data, LockMode.DEFAULT);
          distBackToCounterRecord++;
        }
        else
        {
          // counter record
          counterCurrValue = decodeCounterValue(data.getData()) + 1;
          counterTsLimit = cn.getTime();
          break;
        }
        status = cursor.getPrev(key, data, LockMode.DEFAULT);
        distBackToCounterRecord++;
      }
      counterCurrValue += distBackToCounterRecord;
    }
@@ -165,6 +167,10 @@
    }
  }
  private static ChangeNumber toChangeNumber(byte[] data)
  {
    return new ChangeNumber(decodeUTF8(data));
  }
  /**
@@ -188,29 +194,11 @@
        for (UpdateMsg change : changes)
        {
          DatabaseEntry key = new ReplicationKey(
              change.getChangeNumber());
          DatabaseEntry data = new ReplicationData(change);
          final DatabaseEntry key =
              createReplicationKey(change.getChangeNumber());
          final DatabaseEntry data = new ReplicationData(change);
          if ((counterCurrValue != 0)
              && (counterCurrValue % counterWindowSize == 0))
          {
            // enough changes to generate a counter record - wait for the next
            // change of time
            counterTsLimit = change.getChangeNumber().getTime();
          }
          if ((counterTsLimit != 0)
              && (change.getChangeNumber().getTime() != counterTsLimit))
          {
            // Write the counter record
            DatabaseEntry counterKey = new ReplicationKey(
                new ChangeNumber(change.getChangeNumber().getTime(),
                    0, 0));
            DatabaseEntry counterValue =
              encodeCounterValue(counterCurrValue - 1);
            db.put(null, counterKey, counterValue);
            counterTsLimit = 0;
          }
          insertCounterRecordIfNeeded(change.getChangeNumber());
          db.put(null, key, data);
          counterCurrValue++;
        }
@@ -226,6 +214,39 @@
    }
  }
  private void insertCounterRecordIfNeeded(ChangeNumber changeNumber)
  {
    if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0))
    {
      // enough changes to generate a counter record
      // wait for the next change of time
      counterTsLimit = changeNumber.getTime();
    }
    if (counterTsLimit != 0 && changeNumber.getTime() != counterTsLimit)
    {
      // Write the counter record
      final ChangeNumber counterRecord = newCounterRecord(changeNumber);
      DatabaseEntry counterKey = createReplicationKey(counterRecord);
      DatabaseEntry counterValue = encodeCounterValue(counterCurrValue - 1);
      db.put(null, counterKey, counterValue);
      counterTsLimit = 0;
    }
  }
  private DatabaseEntry createReplicationKey(ChangeNumber changeNumber)
  {
    DatabaseEntry key = new DatabaseEntry();
    try
    {
      key.setData(changeNumber.toString().getBytes("UTF-8"));
    }
    catch (UnsupportedEncodingException e)
    {
      // Should never happens, UTF-8 is always supported
      // TODO : add better logging
    }
    return key;
  }
  /**
   * Shutdown the database.
@@ -288,8 +309,7 @@
  private void closeLockedCursor(Cursor cursor)
      throws DatabaseException
  private void closeAndReleaseReadLock(Cursor cursor) throws DatabaseException
  {
    try
    {
@@ -308,8 +328,6 @@
  public ChangeNumber readFirstChange()
  {
    Cursor cursor = null;
    ChangeNumber cn = null;
    try
    {
      dbCloseLock.readLock().lock();
@@ -321,49 +339,43 @@
          return null;
        }
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        cursor = db.openCursor(null, null);
        OperationStatus status = cursor.getFirst(key, data,
            LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode defaultMode = LockMode.DEFAULT;
        if (cursor.getFirst(key, data, defaultMode) != OperationStatus.SUCCESS)
        {
          /* database is empty */
          // database is empty
          return null;
        }
        String str = decodeUTF8(key.getData());
        cn = new ChangeNumber(str);
        if (ReplicationDB.isaCounter(cn))
        final ChangeNumber cn = toChangeNumber(key.getData());
        if (!isACounterRecord(cn))
        {
          // First record is a counter record .. go next
          status = cursor.getNext(key, data, LockMode.DEFAULT);
          if (status != OperationStatus.SUCCESS)
          {
            // DB contains only a counter record
            return null;
          }
          else
          {
            cn = new ChangeNumber(decodeUTF8(key.getData()));
          }
          return cn;
        }
        // First record is a counter record .. go next
        if (cursor.getNext(key, data, defaultMode) != OperationStatus.SUCCESS)
        {
          // DB contains only a counter record
          return null;
        }
        // There cannot be 2 counter record next to each other,
        // it is safe to return this record
        return toChangeNumber(key.getData());
      }
      finally
      {
        closeLockedCursor(cursor);
        closeAndReleaseReadLock(cursor);
      }
    }
    catch (DatabaseException e)
    {
      /* database is faulty */
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
      return null;
    }
    return cn;
  }
@@ -376,8 +388,6 @@
  public ChangeNumber readLastChange()
  {
    Cursor cursor = null;
    ChangeNumber cn = null;
    try
    {
      dbCloseLock.readLock().lock();
@@ -389,53 +399,45 @@
          return null;
        }
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        cursor = db.openCursor(null, null);
        OperationStatus status = cursor.getLast(key, data,
            LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode defaultMode = LockMode.DEFAULT;
        if (cursor.getLast(key, data, defaultMode) != OperationStatus.SUCCESS)
        {
          /* database is empty */
          // database is empty
          return null;
        }
        String str = decodeUTF8(key.getData());
        cn = new ChangeNumber(str);
        if (ReplicationDB.isaCounter(cn))
        final ChangeNumber cn = toChangeNumber(key.getData());
        if (!isACounterRecord(cn))
        {
          if (cursor.getPrev(key, data, LockMode.DEFAULT)
              != OperationStatus.SUCCESS)
          {
            /*
             * database only contain a counter record - don't know how much it
             * can be possible but ...
             */
            cn = null;
          }
          else
          {
            str = decodeUTF8(key.getData());
            cn = new ChangeNumber(str);
            // There can't be 2 counter record next to each other
          }
          return cn;
        }
        if (cursor.getPrev(key, data, defaultMode) != OperationStatus.SUCCESS)
        {
          /*
           * database only contain a counter record - don't know how much it can
           * be possible but ...
           */
          return null;
        }
        // There cannot be 2 counter record next to each other,
        // it is safe to return this record
        return toChangeNumber(key.getData());
      }
      finally
      {
        closeLockedCursor(cursor);
        closeAndReleaseReadLock(cursor);
      }
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
      return null;
    }
    return cn;
  }
  /**
@@ -455,11 +457,6 @@
    }
    Cursor cursor = null;
    ChangeNumber cn = null;
    DatabaseEntry key = new ReplicationKey(changeNumber);
    DatabaseEntry data = new DatabaseEntry();
    try
    {
      dbCloseLock.readLock().lock();
@@ -471,6 +468,8 @@
          return null;
        }
        DatabaseEntry key = createReplicationKey(changeNumber);
        DatabaseEntry data = new DatabaseEntry();
        cursor = db.openCursor(null, null);
        if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT)
            == OperationStatus.SUCCESS)
@@ -480,23 +479,7 @@
          if (cursor.getPrev(key, data, LockMode.DEFAULT)
              == OperationStatus.SUCCESS)
          {
            String str = decodeUTF8(key.getData());
            cn = new ChangeNumber(str);
            if (ReplicationDB.isaCounter(cn))
            {
              if (cursor.getPrev(key, data, LockMode.DEFAULT)
                  != OperationStatus.SUCCESS)
              {
                // database starts with a counter record.
                cn = null;
              }
              else
              {
                str = decodeUTF8(key.getData());
                cn = new ChangeNumber(str);
                // There can't be 2 counter record next to each other
              }
            }
            return getRegularRecord(cursor, key, data);
          }
          // else, there was no change previous to our changeNumber.
        }
@@ -507,40 +490,41 @@
          if (cursor.getLast(key, data, LockMode.DEFAULT)
              == OperationStatus.SUCCESS)
          {
            String str = decodeUTF8(key.getData());
            cn = new ChangeNumber(str);
            if (ReplicationDB.isaCounter(cn))
            {
              if (cursor.getPrev(key, data, LockMode.DEFAULT)
                  != OperationStatus.SUCCESS)
              {
                /*
                 * database only contain a counter record, should not be
                 * possible, but Ok, let's just say no change Number
                 */
                cn = null;
              }
              else
              {
                str = decodeUTF8(key.getData());
                cn = new ChangeNumber(str);
                // There can't be 2 counter record next to each other
              }
            }
            return getRegularRecord(cursor, key, data);
          }
        }
      }
      finally
      {
        closeLockedCursor(cursor);
        closeAndReleaseReadLock(cursor);
      }
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      cn = null;
    }
    return cn;
    return null;
  }
  private ChangeNumber getRegularRecord(Cursor cursor, DatabaseEntry key,
      DatabaseEntry data)
  {
    final ChangeNumber cn = toChangeNumber(key.getData());
    if (!isACounterRecord(cn))
    {
      return cn;
    }
    // There cannot be 2 counter record next to each other,
    // it is safe to return previous record which must exist
    if (cursor.getPrev(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS)
    {
      return toChangeNumber(key.getData());
    }
    // database only contain a counter record, which should not be possible
    // let's just say no changeNumber
    return null;
  }
@@ -587,7 +571,7 @@
    {
      if (startingChangeNumber != null)
      {
        key = new ReplicationKey(startingChangeNumber);
        key = createReplicationKey(startingChangeNumber);
      }
      else
      {
@@ -646,7 +630,7 @@
      catch (Exception e)
      {
        // Unlocking is required before throwing any exception
        closeLockedCursor(localCursor);
        closeAndReleaseReadLock(localCursor);
        throw e;
      }
    }
@@ -682,7 +666,7 @@
      }
      catch (Exception e)
      {
        closeLockedCursor(localCursor);
        closeAndReleaseReadLock(localCursor);
        if (localTxn != null)
        {
@@ -714,7 +698,8 @@
        isClosed = true;
      }
      closeLockedCursor(cursor);
      closeAndReleaseReadLock(cursor);
      if (txn != null)
      {
        try
@@ -747,7 +732,7 @@
        isClosed = true;
      }
      closeLockedCursor(cursor);
      closeAndReleaseReadLock(cursor);
      if (txn != null)
      {
@@ -776,13 +761,11 @@
      }
      OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        return null;
      }
      String csnString = decodeUTF8(key.getData());
      return new ChangeNumber(csnString);
      return toChangeNumber(key.getData());
    }
    /**
@@ -815,15 +798,12 @@
        ChangeNumber cn = null;
        try
        {
          cn = new ChangeNumber(
              decodeUTF8(key.getData()));
          if (ReplicationDB.isaCounter(cn))
          cn = toChangeNumber(key.getData());
          if (isACounterRecord(cn))
          {
            // counter record
            continue;
          }
          currentChange = ReplicationData.generateChange(data
              .getData());
          currentChange = ReplicationData.generateChange(data.getData());
        }
        catch (Exception e)
        {
@@ -893,7 +873,7 @@
      dbenv.clearDb(dbName);
      // RE-create the db
      db = dbenv.getOrAddDb(serverId, baseDn, (long)-1);
      db = dbenv.getOrAddDb(serverId, baseDn, -1);
    }
    catch(Exception e)
    {
@@ -916,15 +896,8 @@
   * @return The number of changes between provided start and stop changeNumber.
   * Returns 0 when an error occurs.
   */
  public int count(ChangeNumber start, ChangeNumber stop)
  public long count(ChangeNumber start, ChangeNumber stop)
  {
    int counterRecord1 = 0;
    int counterRecord2 = 0;
    int distToCounterRecord1 = 0;
    int distBackToCounterRecord2 = 0;
    int count=0;
    OperationStatus status;
    try
    {
      dbCloseLock.readLock().lock();
@@ -937,144 +910,226 @@
        {
          return 0;
        }
        if (start == null && stop == null)
        {
          return db.count();
        }
        ChangeNumber cn ;
        if ((start==null)&&(stop==null))
          return (int)db.count();
        int[] counterValues = new int[2];
        int[] distanceToCounterRecords = new int[2];
        // Step 1 : from the start point, traverse db to the next counter record
        // or to the stop point.
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        cursor = db.openCursor(null, null);
        if (start != null)
        {
          key = new ReplicationKey(start);
          status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
          if (status == OperationStatus.NOTFOUND)
            status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
        }
        else
        {
          status = cursor.getNext(key, data, LockMode.DEFAULT);
        }
        while (status == OperationStatus.SUCCESS)
        {
          // test whether the record is a regular change or a counter
          String csnString = decodeUTF8(key.getData());
          cn = new ChangeNumber(csnString);
          if (cn.getServerId() != 0)
          {
            // reached a regular change record
            // test whether we reached the 'stop' target
            if (!cn.newer(stop))
            {
              // let's loop
              distToCounterRecord1++;
              status = cursor.getNext(key, data, LockMode.DEFAULT);
            }
            else
            {
              // reached the end
              break;
            }
          }
          else
          {
            // counter record
            counterRecord1 = decodeCounterValue(data.getData());
            break;
          }
        }
        findFirstCounterRecordAfterStartPoint(start, stop, cursor,
            counterValues, distanceToCounterRecords);
        cursor.close();
        // cases
        //
        if (counterRecord1==0)
          return distToCounterRecord1;
        if (counterValues[START] == 0)
          return distanceToCounterRecords[START];
        // Step 2 : from the stop point, traverse db to the next counter record
        // or to the start point.
        data = new DatabaseEntry();
        key = new ReplicationKey(stop);
        cursor = db.openCursor(null, null);
        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
        if (status == OperationStatus.SUCCESS)
        if (!findFirstCounterRecordBeforeStopPoint(start, stop, cursor,
            counterValues, distanceToCounterRecords))
        {
          cn = new ChangeNumber(decodeUTF8(key.getData()));
        }
        else
        {
          key = new DatabaseEntry();
          data = new DatabaseEntry();
          status = cursor.getLast(key, data, LockMode.DEFAULT);
          if (status != OperationStatus.SUCCESS)
          {
            /* database is empty */
            return 0;
          }
        }
        while (status == OperationStatus.SUCCESS)
        {
          cn = new ChangeNumber(decodeUTF8(key.getData()));
          if (!ReplicationDB.isaCounter(cn))
          {
            // regular change record
            if (!cn.older(start))
            {
              distBackToCounterRecord2++;
              status = cursor.getPrev(key, data, LockMode.DEFAULT);
            }
            else
              break;
          }
          else
          {
            // counter record
            counterRecord2 = decodeCounterValue(data.getData());
            break;
          }
          // database is empty
          return 0;
        }
        cursor.close();
        // Step 3 : Now consolidates the result
        if (counterRecord1!=0)
        {
          if (counterRecord1 == counterRecord2)
          {
            // only one cp between from and to - no need to use it
            count = distToCounterRecord1 + distBackToCounterRecord2;
          }
          else
          {
            // 2 cp between from and to
            count = distToCounterRecord1 + (counterRecord2-counterRecord1)
            + distBackToCounterRecord2;
          }
        }
        return computeDistance(counterValues, distanceToCounterRecords);
      }
      finally
      {
        closeLockedCursor(cursor);
        closeAndReleaseReadLock(cursor);
      }
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
    }
    return count;
    return 0;
  }
  private void findFirstCounterRecordAfterStartPoint(ChangeNumber start,
      ChangeNumber stop, Cursor cursor, int[] counterValues,
      int[] distanceToCounterRecords)
  {
    OperationStatus status;
    DatabaseEntry key;
    DatabaseEntry data = new DatabaseEntry();
    if (start != null)
    {
      key = createReplicationKey(start);
      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status == OperationStatus.NOTFOUND)
        status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
    }
    else
    {
      key = new DatabaseEntry();
      status = cursor.getNext(key, data, LockMode.DEFAULT);
    }
    while (status == OperationStatus.SUCCESS)
    {
      // test whether the record is a regular change or a counter
      final ChangeNumber cn = toChangeNumber(key.getData());
      if (isACounterRecord(cn))
      {
        // we have found the counter record
        counterValues[START] = decodeCounterValue(data.getData());
        break;
      }
      // reached a regular change record
      // test whether we reached the 'stop' target
      if (!cn.newer(stop))
      {
        // let's loop
        distanceToCounterRecords[START]++;
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      else
      {
        // reached the end
        break;
      }
    }
  }
  private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start,
      ChangeNumber stop, Cursor cursor, int[] counterValues,
      int[] distanceToCounterRecords)
  {
    DatabaseEntry key = createReplicationKey(stop);
    DatabaseEntry data = new DatabaseEntry();
    OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
    if (status != OperationStatus.SUCCESS)
    {
      key = new DatabaseEntry();
      data = new DatabaseEntry();
      status = cursor.getLast(key, data, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        return false;
      }
    }
    while (status == OperationStatus.SUCCESS)
    {
      final ChangeNumber cn = toChangeNumber(key.getData());
      if (isACounterRecord(cn))
      {
        // we have found the counter record
        counterValues[STOP] = decodeCounterValue(data.getData());
        break;
      }
      // it is a regular change record
      if (!cn.older(start))
      {
        distanceToCounterRecords[STOP]++;
        status = cursor.getPrev(key, data, LockMode.DEFAULT);
      }
      else
        break;
    }
    return true;
  }
  /**
   * Test if a provided changeNumber represents a counter record.
   * @param cn The provided changeNumber.
   * @return True if the provided changenumber is a counter.
   * The diagram below shows a visual description of how the distance between
   * two change numbers in the database is computed.
   *
   * <pre>
   *     +--------+                        +--------+
   *     | CASE 1 |                        | CASE 2 |
   *     +--------+                        +--------+
   *
   *             CSN                               CSN
   *             -----                             -----
   *   START  => -----                   START  => -----
   *     ^       -----                     ^       -----
   *     |       -----                     |       -----
   *   dist 1    -----                   dist 1    -----
   *     |       -----                     |       -----
   *     v       -----                     v       -----
   *   CR 1&2 => [1000]                   CR 1  => [1000]
   *     ^       -----                             -----
   *     |       -----                             -----
   *   dist 2    -----                             -----
   *     |       -----                             -----
   *     v       -----                             -----
   *   STOP   => -----                             -----
   *             -----                             -----
   *     CR   => [2000]                   CR 2  => [2000]
   *             -----                     ^       -----
   *                                       |       -----
   *                                     dist 2    -----
   *                                       |       -----
   *                                       v       -----
   *                                     STOP   => -----
   * </pre>
   *
   * Explanation of the terms used:
   * <dl>
   * <dt>START</dt>
   * <dd>Start change number for the count</dd>
   * <dt>STOP</dt>
   * <dd>Stop change number for the count</dd>
   * <dt>dist</dt>
   * <dd>Distance from START (or STOP) to the counter record</dd>
   * <dt>CSN</dt>
   * <dd>Stands for "Change Sequence Number". Below it, the database is
   * symbolized, where each record is represented by using dashes "-----". The
   * database is ordered.</dd>
   * <dt>CR</dt>
   * <dd>Stands for "Counter Record". Counter Records are inserted in the
   * database along with real change numbers, but they are not real changes.
   * They are only used to speed up calculating the distance between 2 change
   * numbers without the need to scan the whole database in between.</dd>
   * </dl>
   */
  static private boolean isaCounter(ChangeNumber cn)
  private long computeDistance(int[] counterValues,
      int[] distanceToCounterRecords)
  {
    return ((cn.getServerId()== 0) && (cn.getSeqnum()==0));
    if (counterValues[START] != 0)
    {
      if (counterValues[START] == counterValues[STOP])
      {
        // only one counter record between from and to - no need to use it
        return distanceToCounterRecords[START] + distanceToCounterRecords[STOP];
      }
      // at least 2 counter records between from and to
      return distanceToCounterRecords[START]
          + (counterValues[STOP] - counterValues[START])
          + distanceToCounterRecords[STOP];
    }
    return 0;
  }
  /**
   * Whether a provided changeNumber represents a counter record. A counter
   * record is used to store TODO.
   *
   * @param cn
   *          The changeNumber to test
   * @return true if the provided changenumber is a counter, false otherwise
   */
  private static boolean isACounterRecord(ChangeNumber cn)
  {
    return cn.getServerId() == 0 && cn.getSeqnum() == 0;
  }
  private static ChangeNumber newCounterRecord(ChangeNumber changeNumber)
  {
    return new ChangeNumber(changeNumber.getTime(), 0, 0);
  }
  /**
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -315,25 +315,26 @@
          serverId + " " + baseDn + " " + generationId);
      try
      {
        String key = serverId + FIELD_SEPARATOR + baseDn;
        final String serverIdKey = serverId + FIELD_SEPARATOR + baseDn;
        // Opens the database for the changes received from this server
        // on this domain. Create it if it does not already exist.
        DatabaseConfig dbConfig = new DatabaseConfig();
        dbConfig.setAllowCreate(true);
        dbConfig.setTransactional(true);
        Database db = dbEnvironment.openDatabase(null, key, dbConfig);
        Database db = dbEnvironment.openDatabase(null, serverIdKey, dbConfig);
        // Creates the record serverId/domain base Dn in the stateDb
        // if it does not already exist.
        putInStateDBIfNotExist(key, key);
        putInStateDBIfNotExist(serverIdKey, serverIdKey);
        // Creates the record domain base Dn/ generationId in the stateDb
        // if it does not already exist.
        key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
        String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
                + FIELD_SEPARATOR + baseDn;
        putInStateDBIfNotExist(key, data);
        final String genIdKey = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
        final String genIdData = GENERATION_ID_TAG
            + FIELD_SEPARATOR + generationId
            + FIELD_SEPARATOR + baseDn;
        putInStateDBIfNotExist(genIdKey, genIdData);
        return db;
      }
      catch (UnsupportedEncodingException e)
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -43,27 +43,25 @@
  private ReplServerDBCursor cursor = null;
  private DbHandler dbh;
  private ReplicationDB db;
  ChangeNumber lastNonNullCurrentCN;
  private ChangeNumber lastNonNullCurrentCN;
  /**
   * Creates a new ReplicationIterator.
   * All created iterator must be released by the caller using the
   * releaseCursor() method.
   *
   * @param id the Identifier of the server on which the iterator applies.
   * @param db The db where the iterator must be created.
   * @param changeNumber The ChangeNumber after which the iterator must start.
   * @param dbh The associated DbHandler.
   * @param dbHandler The associated DbHandler.
   * @throws Exception If there is no other change to push after change
   *         with changeNumber number.
   * @throws DatabaseException if a database problem happened.
   */
  public ReplicationIterator(
          int id, ReplicationDB db, ChangeNumber changeNumber, DbHandler dbh)
          throws Exception, DatabaseException
  public ReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
      DbHandler dbHandler) throws Exception, DatabaseException
  {
    this.db = db;
    this.dbh = dbh;
    this.dbh = dbHandler;
    this.lastNonNullCurrentCN = changeNumber;
    try
@@ -79,7 +77,7 @@
    if (cursor == null)
    {
      // flush the queue into the db
      dbh.flush();
      dbHandler.flush();
      // look again in the db
      cursor = db.openReadCursor(changeNumber);
@@ -173,6 +171,7 @@
   * Release the cursor in case the iterator was badly used and releaseCursor
   * was never called.
   */
  @Override
  protected void finalize()
  {
    releaseCursor();
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationKey.java
File was deleted
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -32,34 +32,9 @@
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.StringReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.LogManager;
@@ -74,22 +49,13 @@
import org.opends.server.api.Backend;
import org.opends.server.api.WorkQueue;
import org.opends.server.backends.MemoryBackend;
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.jeb.DatabaseContainer;
import org.opends.server.backends.jeb.EntryContainer;
import org.opends.server.backends.jeb.Index;
import org.opends.server.backends.jeb.RootContainer;
import org.opends.server.backends.jeb.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.extensions.ConfigFileHandler;
import org.opends.server.loggers.AccessLogger;
import org.opends.server.loggers.ErrorLogger;
import org.opends.server.loggers.HTTPAccessLogger;
import org.opends.server.loggers.TextAccessLogPublisher;
import org.opends.server.loggers.TextErrorLogPublisher;
import org.opends.server.loggers.TextHTTPAccessLogPublisher;
import org.opends.server.loggers.*;
import org.opends.server.loggers.debug.DebugLogger;
import org.opends.server.loggers.debug.TextDebugLogPublisher;
import org.opends.server.plugins.InvocationCounterPlugin;
@@ -103,19 +69,8 @@
import org.opends.server.protocols.ldap.LDAPReader;
import org.opends.server.tools.LDAPModify;
import org.opends.server.tools.dsconfig.DSConfig;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeTypeConstants;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryEnvironmentConfig;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.*;
import org.opends.server.types.FilePermission;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.OperatingSystem;
import org.opends.server.types.ResultCode;
import org.opends.server.types.Schema;
import org.opends.server.util.BuildVersion;
import org.opends.server.util.EmbeddedUtils;
import org.opends.server.util.LDIFReader;
@@ -790,8 +745,7 @@
  private static ServerSocket bindPort(int port)
          throws IOException
  {
    ServerSocket serverLdapSocket;
    serverLdapSocket = new ServerSocket();
    ServerSocket serverLdapSocket = new ServerSocket();
    serverLdapSocket.setReuseAddress(true);
    serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", port));
    return serverLdapSocket;
@@ -806,10 +760,48 @@
   */
  public static ServerSocket bindFreePort() throws IOException
  {
    ServerSocket serverLdapSocket = new ServerSocket();
    serverLdapSocket.setReuseAddress(true);
    serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", 0));
    return serverLdapSocket;
    return bindPort(0);
  }
  /**
   * Find a free port on the local host.
   *
   * @throws IOException
   *           in case of underlying exception.
   * @return the free port number found
   */
  public static int findFreePort() throws IOException
  {
    return findFreePorts(1)[0];
  }
  /**
   * Find nb free ports on the local host.
   *
   * @param nb
   *          the number of free ports to find
   * @throws IOException
   *           in case of underlying exception.
   * @return an array with the free port numbers found
   */
  public static int[] findFreePorts(int nb) throws IOException
  {
    final ServerSocket[] sockets = new ServerSocket[nb];
    try
    {
      final int[] ports = new int[nb];
      for (int i = 0; i < nb; i++)
      {
        final ServerSocket socket = bindFreePort();
        sockets[i] = socket;
        ports[i] = socket.getLocalPort();
      }
      return ports;
    }
    finally
    {
      close(sockets);
    }
  }
  /**
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -27,10 +27,16 @@
 */
package org.opends.server.replication.server;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.testng.Assert.*;
import java.io.File;
import java.net.ServerSocket;
import java.io.IOException;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
@@ -38,18 +44,15 @@
import org.opends.server.replication.protocol.DeleteMsg;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.testng.Assert.*;
/**
 * Test the dbHandler class
 */
@SuppressWarnings("javadoc")
public class DbHandlerTest extends ReplicationTestCase
{
  // The tracer object for the debug logger
  /** The tracer object for the debug logger */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Utility - log debug message - highlight it is from the test and not
   * from the server code. Makes easier to observe the test steps.
@@ -69,33 +72,15 @@
    ReplicationServer replicationServer = null;
    ReplicationDbEnv dbEnv = null;
    DbHandler handler = null;
    ReplicationIterator it = null;
    try
    {
      TestCaseUtils.startServer();
      //  find  a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int changelogPort = socket.getLocalPort();
      socket.close();
      // configure a ReplicationServer.
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(changelogPort, null, 0,
        2, 0, 100, null);
      replicationServer = new ReplicationServer(conf);
      replicationServer = configureReplicationServer(100);
      // create or clean a directory for the dbHandler
      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
      String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
              buildRoot + File.separator + "build");
      path = path + File.separator + "unit-tests" + File.separator + "dbHandler";
      testRoot = new File(path);
      if (testRoot.exists())
      {
        TestCaseUtils.deleteDirectory(testRoot);
      }
      testRoot.mkdirs();
      String path = getReplicationDbPath();
      testRoot = createDirectory(path);
      dbEnv = new ReplicationDbEnv(path, replicationServer);
@@ -109,18 +94,10 @@
      ChangeNumber changeNumber4 = gen.newChangeNumber();
      ChangeNumber changeNumber5 = gen.newChangeNumber();
      DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1,
        "uid");
      DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2,
        "uid");
      DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3,
      "uid");
      DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4,
      "uid");
      handler.add(update1);
      handler.add(update2);
      handler.add(update3);
      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid"));
      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid"));
      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid"));
      DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4, "uid");
      //--
      // Iterator tests with memory queue only populated
@@ -128,31 +105,8 @@
      // verify that memory queue is populated
      assertEquals(handler.getQueueSize(),3);
      // Iterator from existing CN
      it = handler.generateIterator(changeNumber1);
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
          " Actual change number=" + it.getChange().getChangeNumber() +
          " Expect change number=" + changeNumber2);
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertFalse(it.next());
      it.releaseCursor();
      it=null;
      // Iterator from NON existing CN
      Exception ec = null;
      try
      {
        it = handler.generateIterator(changeNumber5);
      }
      catch(Exception e)
      {
        ec = e;
      }
      assertNotNull(ec);
      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
      assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3);
      assertNotFound(handler, changeNumber5);
      //--
      // Iterator tests with db only populated
@@ -161,30 +115,8 @@
      // verify that memory queue is empty (all changes flushed in the db)
      assertEquals(handler.getQueueSize(),0);
      // Test iterator from existing CN
      it = handler.generateIterator(changeNumber1);
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertFalse(it.next());
      it.releaseCursor();
      it=null;
      // Iterator from NON existing CN
      ec = null;
      try
      {
        it = handler.generateIterator(changeNumber5);
      }
      catch(Exception e)
      {
        ec = e;
      }
      assertNotNull(ec);
      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
      assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3);
      assertNotFound(handler, changeNumber5);
      // Test first and last
      assertEquals(changeNumber1, handler.getFirstChange());
@@ -198,52 +130,11 @@
      // verify memory queue contains this one
      assertEquals(handler.getQueueSize(),1);
      // Test iterator from existing CN
      it = handler.generateIterator(changeNumber1);
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertFalse(it.next());
      assertTrue(it.getChange()==null);
      it.releaseCursor();
      it=null;
      assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3, changeNumber4);
      // Test iterator from existing CN at the limit between queue and db
      it = handler.generateIterator(changeNumber3);
      assertTrue(it.next());
      assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
          " Actual change number=" + it.getChange().getChangeNumber());
      assertFalse(it.next());
      assertTrue(it.getChange()==null);
      it.releaseCursor();
      it=null;
      // Test iterator from existing CN at the limit between queue and db
      it = handler.generateIterator(changeNumber4);
      assertFalse(it.next());
      assertTrue(it.getChange()==null,
          " Actual change number=" + it.getChange());
      it.releaseCursor();
      it=null;
      // Test iterator from NON existing CN
      ec = null;
      try
      {
        it = handler.generateIterator(changeNumber5);
      }
      catch(Exception e)
      {
        ec = e;
      }
      assertNotNull(ec);
      assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
      assertFoundInOrder(handler, changeNumber3, changeNumber4);
      assertFoundInOrder(handler, changeNumber4);
      assertNotFound(handler, changeNumber5);
      handler.setPurgeDelay(1);
@@ -262,13 +153,9 @@
          purged = true;
        }
      }
      // FIXME should add an assert here
    } finally
    {
      if (it != null)
      {
        it.releaseCursor();
        it=null;
      }
      if (handler != null)
        handler.shutdown();
      if (dbEnv != null)
@@ -280,6 +167,74 @@
    }
  }
  private ReplicationServer configureReplicationServer(int windowSize)
      throws IOException, ConfigException
  {
    final int changelogPort = findFreePort();
    final ReplicationServerCfg conf =
        new ReplServerFakeConfiguration(changelogPort, null, 0, 2, 0, windowSize, null);
    return new ReplicationServer(conf);
  }
  private String getReplicationDbPath()
  {
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path =
        System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
            + File.separator + "build");
    return path + File.separator + "unit-tests" + File.separator + "dbHandler";
  }
  private File createDirectory(String path) throws IOException
  {
    File testRoot = new File(path);
    if (testRoot.exists())
    {
      TestCaseUtils.deleteDirectory(testRoot);
    }
    testRoot.mkdirs();
    return testRoot;
  }
  private ReplicationIterator assertFoundInOrder(DbHandler handler,
      ChangeNumber... changeNumbers) throws Exception
  {
    if (changeNumbers.length == 0)
    {
      return null;
    }
    ReplicationIterator it = handler.generateIterator(changeNumbers[0]);
    for (int i = 1; i < changeNumbers.length; i++)
    {
      assertTrue(it.next());
      final ChangeNumber cn = it.getChange().getChangeNumber();
      final boolean equals = cn.compareTo(changeNumbers[i]) == 0;
      assertTrue(equals, "Actual change number=" + cn
          + ", Expected change number=" + changeNumbers[i]);
    }
    assertFalse(it.next());
    assertNull(it.getChange(), "Actual change number=" + it.getChange()
        + ", Expected null");
    it.releaseCursor();
    return it;
  }
  private void assertNotFound(DbHandler handler, ChangeNumber changeNumber)
  {
    try
    {
      ReplicationIterator iter = handler.generateIterator(changeNumber);
      iter.releaseCursor();
      fail("Expected exception");
    }
    catch (Exception e)
    {
      assertEquals(e.getLocalizedMessage(), "ChangeNumber not available");
    }
  }
  /**
   * Test the feature of clearing a dbHandler used by a replication server.
   * The clear feature is used when a replication server receives a request
@@ -296,28 +251,11 @@
    {
      TestCaseUtils.startServer();
      //  find  a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int changelogPort = socket.getLocalPort();
      socket.close();
      // configure a ReplicationServer.
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(changelogPort, null, 0,
        2, 0, 100, null);
      replicationServer = new ReplicationServer(conf);
      replicationServer = configureReplicationServer(100);
      // create or clean a directory for the dbHandler
      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
      String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
              buildRoot + File.separator + "build");
      path = path + File.separator + "unit-tests" + File.separator + "dbHandler";
      testRoot = new File(path);
      if (testRoot.exists())
      {
        TestCaseUtils.deleteDirectory(testRoot);
      }
      testRoot.mkdirs();
      String path = getReplicationDbPath();
      testRoot = createDirectory(path);
      dbEnv = new ReplicationDbEnv(path, replicationServer);
@@ -331,17 +269,10 @@
      ChangeNumber changeNumber2 = gen.newChangeNumber();
      ChangeNumber changeNumber3 = gen.newChangeNumber();
      DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1,
        "uid");
      DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2,
        "uid");
      DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3,
        "uid");
      // Add the changes
      handler.add(update1);
      handler.add(update2);
      handler.add(update3);
      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid"));
      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid"));
      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid"));
      // Check they are here
      assertEquals(changeNumber1, handler.getFirstChange());
@@ -366,6 +297,7 @@
        TestCaseUtils.deleteDirectory(testRoot);
    }
  }
  /**
   * Test the logic that manages counter records in the DbHandler in order to
   * optimize the counting of record in the replication changelog db.
@@ -416,35 +348,17 @@
    ReplicationServer replicationServer = null;
    ReplicationDbEnv dbEnv = null;
    DbHandler handler = null;
    ReplicationIterator ri = null;
    int actualCnt = 0;
    long actualCnt = 0;
    String testcase;
    try
    {
      TestCaseUtils.startServer();
      //  find  a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int changelogPort = socket.getLocalPort();
      socket.close();
      // configure a ReplicationServer.
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(changelogPort, null, 0,
            2, 0, 100000, null);
      replicationServer = new ReplicationServer(conf);
      replicationServer = configureReplicationServer(100000);
      // create or clean a directory for the dbHandler
      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
      String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
              buildRoot + File.separator + "build");
      path = path + File.separator + "unit-tests" + File.separator + "dbHandler";
      testRoot = new File(path);
      if (testRoot.exists())
      {
        TestCaseUtils.deleteDirectory(testRoot);
      }
      testRoot.mkdirs();
      String path = getReplicationDbPath();
      testRoot = createDirectory(path);
      dbEnv = new ReplicationDbEnv(path, replicationServer);
@@ -589,7 +503,7 @@
      handler.setPurgeDelay(100);
      sleep(4000);
      int totalCount = handler.getCount(null, null);
      long totalCount = handler.getCount(null, null);
      debugInfo(tn,testcase + " After purge, total count=" + totalCount);
      testcase="AFTER PURGE (first, last)=";
@@ -619,12 +533,9 @@
      assertEquals(null, handler.getFirstChange());
      assertEquals(null, handler.getLastChange());
      debugInfo(tn,"Success");
    }
    finally
    {
      if (ri!=null)
        ri.releaseCursor();
      if (handler != null)
        handler.shutdown();
      if (dbEnv != null)
@@ -635,4 +546,5 @@
        TestCaseUtils.deleteDirectory(testRoot);
    }
  }
}