mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
09.59.2013 be36718d412e99be93dbb0a8775a6a57ab10eba9
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -27,6 +27,9 @@
 */
package org.opends.server.replication.server;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.StaticUtils.*;
@@ -180,38 +183,34 @@
   */
  public void addEntries(List<UpdateMsg> changes)
  {
    dbCloseLock.readLock().lock();
    try
    {
      dbCloseLock.readLock().lock();
      try
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        // If the DB has been closed then return immediately.
        if (isDBClosed())
        {
          return;
        }
        for (UpdateMsg change : changes)
        {
          final DatabaseEntry key =
              createReplicationKey(change.getChangeNumber());
          final DatabaseEntry data = new ReplicationData(change);
          insertCounterRecordIfNeeded(change.getChangeNumber());
          db.put(null, key, data);
          counterCurrValue++;
        }
        return;
      }
      finally
      for (UpdateMsg change : changes)
      {
        dbCloseLock.readLock().unlock();
        final DatabaseEntry key =
            createReplicationKey(change.getChangeNumber());
        final DatabaseEntry data = new ReplicationData(change);
        insertCounterRecordIfNeeded(change.getChangeNumber());
        db.put(null, key, data);
        counterCurrValue++;
      }
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
    }
    finally
    {
      dbCloseLock.readLock().unlock();
    }
  }
  private void insertCounterRecordIfNeeded(ChangeNumber changeNumber)
@@ -253,18 +252,11 @@
   */
  public void shutdown()
  {
    dbCloseLock.writeLock().lock();
    try
    {
      dbCloseLock.writeLock().lock();
      try
      {
        db.close();
        db = null;
      }
      finally
      {
        dbCloseLock.writeLock().unlock();
      }
      db.close();
      db = null;
    }
    catch (DatabaseException e)
    {
@@ -273,6 +265,10 @@
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
    }
    finally
    {
      dbCloseLock.writeLock().unlock();
    }
  }
  /**
@@ -327,55 +323,52 @@
   */
  public ChangeNumber readFirstChange()
  {
    dbCloseLock.readLock().lock();
    Cursor cursor = null;
    try
    {
      dbCloseLock.readLock().lock();
      try
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        // If the DB has been closed then return immediately.
        if (isDBClosed())
        {
          return null;
        }
        cursor = db.openCursor(null, null);
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode defaultMode = LockMode.DEFAULT;
        if (cursor.getFirst(key, data, defaultMode) != OperationStatus.SUCCESS)
        {
          // database is empty
          return null;
        }
        final ChangeNumber cn = toChangeNumber(key.getData());
        if (!isACounterRecord(cn))
        {
          return cn;
        }
        // First record is a counter record .. go next
        if (cursor.getNext(key, data, defaultMode) != OperationStatus.SUCCESS)
        {
          // DB contains only a counter record
          return null;
        }
        // There cannot be 2 counter record next to each other,
        // it is safe to return this record
        return toChangeNumber(key.getData());
        return null;
      }
      finally
      cursor = db.openCursor(null, null);
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      if (cursor.getFirst(key, data, LockMode.DEFAULT) != SUCCESS)
      {
        closeAndReleaseReadLock(cursor);
        // database is empty
        return null;
      }
      final ChangeNumber cn = toChangeNumber(key.getData());
      if (!isACounterRecord(cn))
      {
        return cn;
      }
      // First record is a counter record .. go next
      if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
      {
        // DB contains only a counter record
        return null;
      }
      // There cannot be 2 counter record next to each other,
      // it is safe to return this record
      return toChangeNumber(key.getData());
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      return null;
    }
    finally
    {
      closeAndReleaseReadLock(cursor);
    }
  }
@@ -387,57 +380,54 @@
   */
  public ChangeNumber readLastChange()
  {
    dbCloseLock.readLock().lock();
    Cursor cursor = null;
    try
    {
      dbCloseLock.readLock().lock();
      try
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        // If the DB has been closed then return immediately.
        if (isDBClosed())
        {
          return null;
        }
        cursor = db.openCursor(null, null);
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode defaultMode = LockMode.DEFAULT;
        if (cursor.getLast(key, data, defaultMode) != OperationStatus.SUCCESS)
        {
          // database is empty
          return null;
        }
        final ChangeNumber cn = toChangeNumber(key.getData());
        if (!isACounterRecord(cn))
        {
          return cn;
        }
        if (cursor.getPrev(key, data, defaultMode) != OperationStatus.SUCCESS)
        {
          /*
           * database only contain a counter record - don't know how much it can
           * be possible but ...
           */
          return null;
        }
        // There cannot be 2 counter record next to each other,
        // it is safe to return this record
        return toChangeNumber(key.getData());
        return null;
      }
      finally
      cursor = db.openCursor(null, null);
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      if (cursor.getLast(key, data, LockMode.DEFAULT) != SUCCESS)
      {
        closeAndReleaseReadLock(cursor);
        // database is empty
        return null;
      }
      final ChangeNumber cn = toChangeNumber(key.getData());
      if (!isACounterRecord(cn))
      {
        return cn;
      }
      if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
      {
        /*
         * database only contain a counter record - don't know how much it can
         * be possible but ...
         */
        return null;
      }
      // There cannot be 2 counter record next to each other,
      // it is safe to return this record
      return toChangeNumber(key.getData());
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
      return null;
    }
    finally
    {
      closeAndReleaseReadLock(cursor);
    }
  }
  /**
@@ -456,53 +446,48 @@
      return null;
    }
    dbCloseLock.readLock().lock();
    Cursor cursor = null;
    try
    {
      dbCloseLock.readLock().lock();
      try
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        // If the DB has been closed then return immediately.
        if (isDBClosed())
        {
          return null;
        }
        DatabaseEntry key = createReplicationKey(changeNumber);
        DatabaseEntry data = new DatabaseEntry();
        cursor = db.openCursor(null, null);
        if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT)
            == OperationStatus.SUCCESS)
        {
          // We can move close to the changeNumber.
          // Let's move to the previous change.
          if (cursor.getPrev(key, data, LockMode.DEFAULT)
              == OperationStatus.SUCCESS)
          {
            return getRegularRecord(cursor, key, data);
          }
          // else, there was no change previous to our changeNumber.
        }
        else
        {
          // We could not move the cursor past to the changeNumber
          // Check if the last change is older than changeNumber
          if (cursor.getLast(key, data, LockMode.DEFAULT)
              == OperationStatus.SUCCESS)
          {
            return getRegularRecord(cursor, key, data);
          }
        }
        return null;
      }
      finally
      DatabaseEntry key = createReplicationKey(changeNumber);
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == SUCCESS)
      {
        closeAndReleaseReadLock(cursor);
        // We can move close to the changeNumber.
        // Let's move to the previous change.
        if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
        {
          return getRegularRecord(cursor, key, data);
        }
        // else, there was no change previous to our changeNumber.
      }
      else
      {
        // We could not move the cursor past to the changeNumber
        // Check if the last change is older than changeNumber
        if (cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS)
        {
          return getRegularRecord(cursor, key, data);
        }
      }
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
    }
    finally
    {
      closeAndReleaseReadLock(cursor);
    }
    return null;
  }
@@ -517,7 +502,7 @@
    // There cannot be 2 counter record next to each other,
    // it is safe to return previous record which must exist
    if (cursor.getPrev(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS)
    if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
    {
      return toChangeNumber(key.getData());
    }
@@ -600,28 +585,23 @@
        localCursor = db.openCursor(txn, null);
        if (startingChangeNumber != null)
        {
          if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) !=
            OperationStatus.SUCCESS)
          if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) != SUCCESS)
          {
            // We could not move the cursor to the expected startingChangeNumber
            if (localCursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
              throw new Exception("ChangeNumber not available");
            }
            else
            // We can move close to the startingChangeNumber.
            // Let's create a cursor from that point.
            DatabaseEntry aKey = new DatabaseEntry();
            DatabaseEntry aData = new DatabaseEntry();
            if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS)
            {
              // We can move close to the startingChangeNumber.
              // Let's create a cursor from that point.
              DatabaseEntry aKey = new DatabaseEntry();
              DatabaseEntry aData = new DatabaseEntry();
              if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) !=
                OperationStatus.SUCCESS)
              {
                localCursor.close();
                localCursor = db.openCursor(txn, null);
              }
              localCursor.close();
              localCursor = db.openCursor(txn, null);
            }
          }
        }
@@ -760,8 +740,7 @@
        return null;
      }
      OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
      {
        return null;
      }
@@ -785,8 +764,7 @@
      {
        try
        {
          OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT);
          if (status != OperationStatus.SUCCESS)
          if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
          {
            return null;
          }
@@ -898,149 +876,154 @@
   */
  public long count(ChangeNumber start, ChangeNumber stop)
  {
    dbCloseLock.readLock().lock();
    try
    {
      dbCloseLock.readLock().lock();
      Cursor cursor = null;
      try
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        // If the DB has been closed then return immediately.
        if (isDBClosed())
        {
          return 0;
        }
        if (start == null && stop == null)
        {
          return db.count();
        }
        int[] counterValues = new int[2];
        int[] distanceToCounterRecords = new int[2];
        // Step 1 : from the start point, traverse db to the next counter record
        // or to the stop point.
        cursor = db.openCursor(null, null);
        findFirstCounterRecordAfterStartPoint(start, stop, cursor,
            counterValues, distanceToCounterRecords);
        cursor.close();
        // cases
        if (counterValues[START] == 0)
          return distanceToCounterRecords[START];
        // Step 2 : from the stop point, traverse db to the next counter record
        // or to the start point.
        cursor = db.openCursor(null, null);
        if (!findFirstCounterRecordBeforeStopPoint(start, stop, cursor,
            counterValues, distanceToCounterRecords))
        {
          // database is empty
          return 0;
        }
        cursor.close();
        // Step 3 : Now consolidates the result
        return computeDistance(counterValues, distanceToCounterRecords);
        return 0;
      }
      finally
      if (start == null && stop == null)
      {
        closeAndReleaseReadLock(cursor);
        return db.count();
      }
      int[] counterValues = new int[2];
      int[] distanceToCounterRecords = new int[2];
      // Step 1 : from the start point, traverse db to the next counter record
      // or to the stop point.
      findFirstCounterRecordAfterStartPoint(start, stop, counterValues,
          distanceToCounterRecords);
      // cases
      if (counterValues[START] == 0)
        return distanceToCounterRecords[START];
      // Step 2 : from the stop point, traverse db to the next counter record
      // or to the start point.
      if (!findFirstCounterRecordBeforeStopPoint(start, stop, counterValues,
          distanceToCounterRecords))
      {
        // database is empty
        return 0;
      }
      // Step 3 : Now consolidates the result
      return computeDistance(counterValues, distanceToCounterRecords);
    }
    catch (DatabaseException e)
    {
      replicationServer.handleUnexpectedDatabaseException(e);
    }
    finally
    {
      dbCloseLock.readLock().unlock();
    }
    return 0;
  }
  private void findFirstCounterRecordAfterStartPoint(ChangeNumber start,
      ChangeNumber stop, Cursor cursor, int[] counterValues,
      int[] distanceToCounterRecords)
      ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords)
  {
    OperationStatus status;
    DatabaseEntry key;
    DatabaseEntry data = new DatabaseEntry();
    if (start != null)
    Cursor cursor = db.openCursor(null, null);
    try
    {
      key = createReplicationKey(start);
      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status == OperationStatus.NOTFOUND)
        status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
    }
    else
    {
      key = new DatabaseEntry();
      status = cursor.getNext(key, data, LockMode.DEFAULT);
    }
    while (status == OperationStatus.SUCCESS)
    {
      // test whether the record is a regular change or a counter
      final ChangeNumber cn = toChangeNumber(key.getData());
      if (isACounterRecord(cn))
      OperationStatus status;
      DatabaseEntry key;
      DatabaseEntry data = new DatabaseEntry();
      if (start != null)
      {
        // we have found the counter record
        counterValues[START] = decodeCounterValue(data.getData());
        break;
      }
      // reached a regular change record
      // test whether we reached the 'stop' target
      if (!cn.newer(stop))
      {
        // let's loop
        distanceToCounterRecords[START]++;
        status = cursor.getNext(key, data, LockMode.DEFAULT);
        key = createReplicationKey(start);
        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
        if (status == OperationStatus.NOTFOUND)
          status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
      }
      else
      {
        // reached the end
        break;
        key = new DatabaseEntry();
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      while (status == OperationStatus.SUCCESS)
      {
        // test whether the record is a regular change or a counter
        final ChangeNumber cn = toChangeNumber(key.getData());
        if (isACounterRecord(cn))
        {
          // we have found the counter record
          counterValues[START] = decodeCounterValue(data.getData());
          break;
        }
        // reached a regular change record
        // test whether we reached the 'stop' target
        if (!cn.newer(stop))
        {
          // let's loop
          distanceToCounterRecords[START]++;
          status = cursor.getNext(key, data, LockMode.DEFAULT);
        }
        else
        {
          // reached the end
          break;
        }
      }
    }
    finally
    {
      close(cursor);
    }
  }
  private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start,
      ChangeNumber stop, Cursor cursor, int[] counterValues,
      int[] distanceToCounterRecords)
      ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords)
  {
    DatabaseEntry key = createReplicationKey(stop);
    DatabaseEntry data = new DatabaseEntry();
    OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
    if (status != OperationStatus.SUCCESS)
    Cursor cursor = db.openCursor(null, null);
    try
    {
      key = new DatabaseEntry();
      data = new DatabaseEntry();
      status = cursor.getLast(key, data, LockMode.DEFAULT);
      DatabaseEntry key = createReplicationKey(stop);
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
      if (status != OperationStatus.SUCCESS)
      {
        return false;
        key = new DatabaseEntry();
        data = new DatabaseEntry();
        status = cursor.getLast(key, data, LockMode.DEFAULT);
        if (status != OperationStatus.SUCCESS)
        {
          return false;
        }
      }
    }
    while (status == OperationStatus.SUCCESS)
      while (status == OperationStatus.SUCCESS)
      {
        final ChangeNumber cn = toChangeNumber(key.getData());
        if (isACounterRecord(cn))
        {
          // we have found the counter record
          counterValues[STOP] = decodeCounterValue(data.getData());
          break;
        }
        // it is a regular change record
        if (!cn.older(start))
        {
          distanceToCounterRecords[STOP]++;
          status = cursor.getPrev(key, data, LockMode.DEFAULT);
        }
        else
          break;
      }
      return true;
    }
    finally
    {
      final ChangeNumber cn = toChangeNumber(key.getData());
      if (isACounterRecord(cn))
      {
        // we have found the counter record
        counterValues[STOP] = decodeCounterValue(data.getData());
        break;
      }
      // it is a regular change record
      if (!cn.older(start))
      {
        distanceToCounterRecords[STOP]++;
        status = cursor.getPrev(key, data, LockMode.DEFAULT);
      }
      else
        break;
      close(cursor);
    }
    return true;
  }
  /**