mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
22.55.2014 f28eee1ba07554be73881e1df417494b3968ea85
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -39,6 +39,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -49,6 +50,8 @@
import static com.sleepycat.je.OperationStatus.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -289,15 +292,18 @@
   * @param startCSN
   *          The CSN from which the cursor must start.If null, start from the
   *          oldest CSN
   * @param matchingStrategy
   *          Cursor key matching strategy
   * @param positionStrategy
   *          indicates at which exact position the cursor must start
   *          Cursor position strategy
   * @return The ReplServerDBCursor.
   * @throws ChangelogException
   *           If a database problem happened
   */
  ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
  ReplServerDBCursor openReadCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy) throws ChangelogException
  {
    return new ReplServerDBCursor(startCSN, positionStrategy);
    return new ReplServerDBCursor(startCSN, matchingStrategy, positionStrategy);
  }
  /**
@@ -447,6 +453,31 @@
    return serverId + " " + baseDN.toNormalizedString();
  }
  /** Hold a cursor and an indicator of wether the cursor should be considered as empty. */
  private static class CursorWithEmptyIndicator
  {
    private Cursor cursor;
    private boolean isEmpty;
    private CursorWithEmptyIndicator(Cursor localCursor, boolean isEmpty)
    {
      this.cursor = localCursor;
      this.isEmpty = isEmpty;
    }
    /** Creates cursor considered as empty. */
    static CursorWithEmptyIndicator createEmpty(Cursor cursor)
    {
      return new CursorWithEmptyIndicator(cursor, true);
    }
    /** Creates cursor considered as non-empty. */
    static CursorWithEmptyIndicator createNonEmpty(Cursor cursor)
    {
      return new CursorWithEmptyIndicator(cursor, false);
    }
  }
  /**
   * This Class implements a cursor that can be used to browse a
   * replicationServer database.
@@ -460,7 +491,7 @@
     * <p>
     * Will be set non null for a write cursor
     */
    private final Cursor cursor;
    private Cursor cursor;
    private final DatabaseEntry key;
    private final DatabaseEntry data;
    /** \@Null for read cursors, \@NotNull for deleting cursors. */
@@ -475,12 +506,16 @@
     *
     * @param startCSN
     *          The CSN from which the cursor must start.
     * @param matchingStrategy
     *          Cursor key matching strategy, which allow to indicates how key
     *          is matched
     * @param positionStrategy
     *          indicates at which exact position the cursor must start
     * @throws ChangelogException
     *           When the startCSN does not exist.
     */
    private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
    private ReplServerDBCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
        throws ChangelogException
    {
      key = createReplicationKey(startCSN);
      data = new DatabaseEntry();
@@ -491,8 +526,7 @@
      // unlock it when throwing an exception.
      dbCloseLock.readLock().lock();
      boolean cursorHeld = false;
      Cursor localCursor = null;
      CursorWithEmptyIndicator maybeEmptyCursor = null;
      try
      {
        // If the DB has been closed then create empty cursor.
@@ -503,35 +537,15 @@
          return;
        }
        localCursor = db.openCursor(txn, null);
        if (startCSN != null
            && localCursor.getSearchKey(key, data, LockMode.DEFAULT) != SUCCESS)
        maybeEmptyCursor = generateCursor(startCSN, matchingStrategy, positionStrategy);
        if (maybeEmptyCursor.isEmpty)
        {
          // We could not move the cursor to the expected startCSN
          if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS)
          {
            // We could not even move the cursor close to it
            // => return empty cursor
            isClosed = true;
            cursor = null;
            return;
          }
          if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
          {
            // We can move close to the startCSN.
            // Let's create a cursor from that point.
            key.setData(null);
            if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
            {
              localCursor.close();
              localCursor = db.openCursor(txn, null);
            }
          }
          isClosed = true;
          cursor = null;
          return;
        }
        cursor = localCursor;
        cursorHeld = cursor != null;
        cursor = maybeEmptyCursor.cursor;
        if (key.getData() != null)
        {
          computeCurrentRecord();
@@ -543,13 +557,72 @@
      }
      finally
      {
        if (!cursorHeld)
        if (maybeEmptyCursor != null && maybeEmptyCursor.isEmpty)
        {
          closeAndReleaseReadLock(localCursor);
          closeAndReleaseReadLock(maybeEmptyCursor.cursor);
        }
      }
    }
    /** Generate a possibly empty cursor with the provided start CSN and strategies. */
    private CursorWithEmptyIndicator generateCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy,
        PositionStrategy positionStrategy)
    {
      Cursor cursor = db.openCursor(txn, null);
      boolean isCsnFound = startCSN == null || cursor.getSearchKey(key, data, LockMode.DEFAULT) == SUCCESS;
      if (!isCsnFound)
      {
        if (matchingStrategy == EQUAL_TO_KEY)
        {
          return CursorWithEmptyIndicator.createEmpty(cursor);
        }
        boolean isGreaterCsnFound = cursor.getSearchKeyRange(key, data, DEFAULT) == SUCCESS;
        if (isGreaterCsnFound)
        {
          if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY && positionStrategy == AFTER_MATCHING_KEY)
          {
            // Move backward so that the first call to next() points to this greater csn
            key.setData(null);
            if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
            {
              // Edge case: we're at the beginning of the database
              cursor.close();
              cursor = db.openCursor(txn, null);
            }
          }
          else if (matchingStrategy == LESS_THAN_OR_EQUAL_TO_KEY)
          {
            // Move backward to point on the lower csn
            key.setData(null);
            if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
            {
              // Edge case: we're at the beginning of the log, there is no lower csn
              return CursorWithEmptyIndicator.createEmpty(cursor);
            }
          }
        }
        else
        {
          if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
          {
            // There is no greater csn
            return CursorWithEmptyIndicator.createEmpty(cursor);
          }
          // LESS_THAN_OR_EQUAL_TO_KEY case : the lower csn is the highest csn available
          key.setData(null);
          boolean isLastKeyFound = cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS;
          if (!isLastKeyFound)
          {
            // Edge case: empty database
            cursor.close();
            cursor = db.openCursor(txn, null);
          }
        }
      }
      return CursorWithEmptyIndicator.createNonEmpty(cursor);
    }
    private ReplServerDBCursor() throws ChangelogException
    {
      key = new DatabaseEntry();