mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.57.2013 157717b205d4c1f957cf810e04e06f11530c619c
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -35,7 +35,7 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
@@ -61,8 +61,8 @@
  private static final int START = 0;
  private static final int STOP = 1;
  private Database db = null;
  private ReplicationDbEnv dbenv = null;
  private Database db;
  private ReplicationDbEnv dbenv;
  private ReplicationServer replicationServer;
  private int serverId;
  private String baseDn;
@@ -76,7 +76,7 @@
  // Change counter management
  // The Db itself does not allow to count records between a start and an end
  // change. And we cannot rely on the replication seqnum that is part of the
  // changenumber, since there can be holes (when an operation is canceled).
  // CSN, since there can be holes (when an operation is canceled).
  // And traversing all the records from the start one to the end one works
  // fine but can be very long (ECL:lastChangeNumber).
  //
@@ -87,9 +87,9 @@
  // - a counter value : count of changes since previous counter record.
  //
  // A counter record has to follow the order of the db, so it needs to have
  // a changenumber key that follows the order.
  // A counter record must have its own changenumber key since the Db does not
  // support duplicate keys (it is a compatibility breaker character of the DB).
  // a CSN key that follows the order.
  // A counter record must have its own CSN key since the Db does not support
  // duplicate keys (it is a compatibility breaker character of the DB).
  //
  // We define 2 conditions to store a counter record :
  // 1/- at least 'counterWindowSize' changes have been stored in the Db
@@ -156,11 +156,11 @@
      OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        ChangeNumber cn = toChangeNumber(key.getData());
        if (isACounterRecord(cn))
        CSN csn = toCSN(key.getData());
        if (isACounterRecord(csn))
        {
          counterCurrValue = decodeCounterValue(data.getData()) + 1;
          counterTsLimit = cn.getTime();
          counterTsLimit = csn.getTime();
          break;
        }
@@ -179,9 +179,9 @@
    }
  }
  private static ChangeNumber toChangeNumber(byte[] data)
  private static CSN toCSN(byte[] data)
  {
    return new ChangeNumber(decodeUTF8(data));
    return new CSN(decodeUTF8(data));
  }
@@ -204,10 +204,10 @@
      for (UpdateMsg change : changes)
      {
        final DatabaseEntry key =
            createReplicationKey(change.getChangeNumber());
            createReplicationKey(change.getCSN());
        final DatabaseEntry data = new ReplicationData(change);
        insertCounterRecordIfNeeded(change.getChangeNumber());
        insertCounterRecordIfNeeded(change.getCSN());
        db.put(null, key, data);
        counterCurrValue++;
      }
@@ -222,19 +222,18 @@
    }
  }
  private void insertCounterRecordIfNeeded(ChangeNumber changeNumber)
      throws DatabaseException
  private void insertCounterRecordIfNeeded(CSN csn) throws DatabaseException
  {
    if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0))
    {
      // enough changes to generate a counter record
      // wait for the next change of time
      counterTsLimit = changeNumber.getTime();
      counterTsLimit = csn.getTime();
    }
    if (counterTsLimit != 0 && changeNumber.getTime() != counterTsLimit)
    if (counterTsLimit != 0 && csn.getTime() != counterTsLimit)
    {
      // Write the counter record
      final ChangeNumber counterRecord = newCounterRecord(changeNumber);
      final CSN counterRecord = newCounterRecord(csn);
      DatabaseEntry counterKey = createReplicationKey(counterRecord);
      DatabaseEntry counterValue = encodeCounterValue(counterCurrValue - 1);
      db.put(null, counterKey, counterValue);
@@ -242,12 +241,12 @@
    }
  }
  private DatabaseEntry createReplicationKey(ChangeNumber changeNumber)
  private DatabaseEntry createReplicationKey(CSN csn)
  {
    DatabaseEntry key = new DatabaseEntry();
    try
    {
      key.setData(changeNumber.toString().getBytes("UTF-8"));
      key.setData(csn.toString().getBytes("UTF-8"));
    }
    catch (UnsupportedEncodingException e)
    {
@@ -285,15 +284,16 @@
   * Create a cursor that can be used to search or iterate on this
   * ReplicationServer DB.
   *
   * @param changeNumber The ChangeNumber from which the cursor must start.
   * @param startCSN
   *          The CSN from which the cursor must start.
   * @throws ChangelogException
   *           When a problem occurs or the startingChangeNumber does not exist.
   *           When a problem occurs or the startCSN does not exist.
   * @return The ReplServerDBCursor.
   */
  public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber)
  public ReplServerDBCursor openReadCursor(CSN startCSN)
      throws ChangelogException
  {
    return new ReplServerDBCursor(changeNumber);
    return new ReplServerDBCursor(startCSN);
  }
  /**
@@ -326,9 +326,10 @@
  /**
   * Read the first Change from the database.
   * @return the first ChangeNumber.
   *
   * @return the first CSN.
   */
  public ChangeNumber readFirstChange()
  public CSN readFirstChange()
  {
    dbCloseLock.readLock().lock();
@@ -351,10 +352,10 @@
        return null;
      }
      final ChangeNumber cn = toChangeNumber(key.getData());
      if (!isACounterRecord(cn))
      final CSN csn = toCSN(key.getData());
      if (!isACounterRecord(csn))
      {
        return cn;
        return csn;
      }
      // First record is a counter record .. go next
@@ -365,7 +366,7 @@
      }
      // There cannot be 2 counter record next to each other,
      // it is safe to return this record
      return toChangeNumber(key.getData());
      return toCSN(key.getData());
    }
    catch (DatabaseException e)
    {
@@ -383,9 +384,9 @@
  /**
   * Read the last Change from the database.
   *
   * @return the last ChangeNumber.
   * @return the last CSN.
   */
  public ChangeNumber readLastChange()
  public CSN readLastChange()
  {
    dbCloseLock.readLock().lock();
@@ -408,10 +409,10 @@
        return null;
      }
      final ChangeNumber cn = toChangeNumber(key.getData());
      if (!isACounterRecord(cn))
      final CSN csn = toCSN(key.getData());
      if (!isACounterRecord(csn))
      {
        return cn;
        return csn;
      }
      if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
@@ -424,7 +425,7 @@
      }
      // There cannot be 2 counter record next to each other,
      // it is safe to return this record
      return toChangeNumber(key.getData());
      return toCSN(key.getData());
    }
    catch (DatabaseException e)
    {
@@ -438,17 +439,16 @@
  }
  /**
   * Try to find in the DB, the change number right before the one
   * passed as a parameter.
   * Try to find in the DB, the CSN right before the one passed as a parameter.
   *
   * @param changeNumber
   *          The changeNumber from which we start searching.
   * @return the changeNumber right before the one passed as a parameter.
   *         Can return null if there is none.
   * @param csn
   *          The CSN from which we start searching.
   * @return the CSN right before the one passed as a parameter. Can return null
   *         if there is none.
   */
  public ChangeNumber getPreviousChangeNumber(ChangeNumber changeNumber)
  public CSN getPreviousCSN(CSN csn)
  {
    if (changeNumber == null)
    if (csn == null)
    {
      return null;
    }
@@ -464,23 +464,23 @@
        return null;
      }
      DatabaseEntry key = createReplicationKey(changeNumber);
      DatabaseEntry key = createReplicationKey(csn);
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == SUCCESS)
      {
        // We can move close to the changeNumber.
        // We can move close to the CSN.
        // Let's move to the previous change.
        if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
        {
          return getRegularRecord(cursor, key, data);
        }
        // else, there was no change previous to our changeNumber.
        // else, there was no change previous to our CSN.
      }
      else
      {
        // We could not move the cursor past to the changeNumber
        // Check if the last change is older than changeNumber
        // We could not move the cursor past to the CSN
        // Check if the last change is older than CSN
        if (cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS)
        {
          return getRegularRecord(cursor, key, data);
@@ -498,24 +498,24 @@
    return null;
  }
  private ChangeNumber getRegularRecord(Cursor cursor, DatabaseEntry key,
  private CSN getRegularRecord(Cursor cursor, DatabaseEntry key,
      DatabaseEntry data) throws DatabaseException
  {
    final ChangeNumber cn = toChangeNumber(key.getData());
    if (!isACounterRecord(cn))
    final CSN csn = toCSN(key.getData());
    if (!isACounterRecord(csn))
    {
      return cn;
      return csn;
    }
    // There cannot be 2 counter record next to each other,
    // it is safe to return previous record which must exist
    if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
    {
      return toChangeNumber(key.getData());
      return toCSN(key.getData());
    }
    // database only contain a counter record, which should not be possible
    // let's just say no changeNumber
    // let's just say no CSN
    return null;
  }
@@ -553,17 +553,16 @@
     * Creates a ReplServerDBCursor that can be used for browsing a
     * replicationServer db.
     *
     * @param startingChangeNumber
     *          The ChangeNumber from which the cursor must start.
     * @param startCSN
     *          The CSN from which the cursor must start.
     * @throws ChangelogException
     *           When the startingChangeNumber does not exist.
     *           When the startCSN does not exist.
     */
    private ReplServerDBCursor(ChangeNumber startingChangeNumber)
        throws ChangelogException
    private ReplServerDBCursor(CSN startCSN) throws ChangelogException
    {
      if (startingChangeNumber != null)
      if (startCSN != null)
      {
        key = createReplicationKey(startingChangeNumber);
        key = createReplicationKey(startCSN);
      }
      else
      {
@@ -590,19 +589,18 @@
        }
        localCursor = db.openCursor(txn, null);
        if (startingChangeNumber != null)
        if (startCSN != null)
        {
          if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) != SUCCESS)
          {
            // We could not move the cursor to the expected startingChangeNumber
            // We could not move the cursor to the expected startCSN
            if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
              throw new ChangelogException(
                  Message.raw("ChangeNumber not available"));
              // We could not even move the cursor close to it => failure
              throw new ChangelogException(Message.raw("CSN not available"));
            }
            // We can move close to the startingChangeNumber.
            // We can move close to the startCSN.
            // Let's create a cursor from that point.
            DatabaseEntry aKey = new DatabaseEntry();
            DatabaseEntry aData = new DatabaseEntry();
@@ -752,12 +750,13 @@
    }
    /**
     * Get the next ChangeNumber in the database from this Cursor.
     * Get the next CSN in the database from this Cursor.
     *
     * @return The next ChangeNumber in the database from this cursor.
     * @throws ChangelogException In case of underlying database problem.
     * @return The next CSN in the database from this cursor.
     * @throws ChangelogException
     *           In case of underlying database problem.
     */
    public ChangeNumber nextChangeNumber() throws ChangelogException
    public CSN nextCSN() throws ChangelogException
    {
      if (isClosed)
      {
@@ -770,7 +769,7 @@
        {
          return null;
        }
        return toChangeNumber(key.getData());
        return toCSN(key.getData());
      }
      catch (DatabaseException e)
      {
@@ -804,11 +803,11 @@
          return null;
        }
        ChangeNumber cn = null;
        CSN csn = null;
        try
        {
          cn = toChangeNumber(key.getData());
          if (isACounterRecord(cn))
          csn = toCSN(key.getData());
          if (isACounterRecord(csn))
          {
            continue;
          }
@@ -827,7 +826,7 @@
           */
          Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD
              .get(replicationServer.getServerId(),
                  (cn == null ? "" : cn.toString()),
                  (csn == null ? "" : csn.toString()),
                  e.getMessage());
          logError(message);
        }
@@ -908,10 +907,10 @@
   * Count the number of changes between 2 changes numbers (inclusive).
   * @param start The lower limit of the count.
   * @param stop The higher limit of the count.
   * @return The number of changes between provided start and stop changeNumber.
   * @return The number of changes between provided start and stop CSN.
   * Returns 0 when an error occurs.
   */
  public long count(ChangeNumber start, ChangeNumber stop)
  public long count(CSN start, CSN stop)
  {
    dbCloseLock.readLock().lock();
    try
@@ -962,8 +961,8 @@
  }
  private void findFirstCounterRecordAfterStartPoint(ChangeNumber start,
      ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords)
  private void findFirstCounterRecordAfterStartPoint(CSN start,
      CSN stop, int[] counterValues, int[] distanceToCounterRecords)
      throws DatabaseException
  {
    Cursor cursor = db.openCursor(null, null);
@@ -988,8 +987,8 @@
      while (status == OperationStatus.SUCCESS)
      {
        // test whether the record is a regular change or a counter
        final ChangeNumber cn = toChangeNumber(key.getData());
        if (isACounterRecord(cn))
        final CSN csn = toCSN(key.getData());
        if (isACounterRecord(csn))
        {
          // we have found the counter record
          counterValues[START] = decodeCounterValue(data.getData());
@@ -998,7 +997,7 @@
        // reached a regular change record
        // test whether we reached the 'stop' target
        if (!cn.newer(stop))
        if (!csn.newer(stop))
        {
          // let's loop
          distanceToCounterRecords[START]++;
@@ -1017,8 +1016,8 @@
    }
  }
  private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start,
      ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords)
  private boolean findFirstCounterRecordBeforeStopPoint(CSN start,
      CSN stop, int[] counterValues, int[] distanceToCounterRecords)
      throws DatabaseException
  {
    Cursor cursor = db.openCursor(null, null);
@@ -1040,8 +1039,8 @@
      while (status == OperationStatus.SUCCESS)
      {
        final ChangeNumber cn = toChangeNumber(key.getData());
        if (isACounterRecord(cn))
        final CSN csn = toCSN(key.getData());
        if (isACounterRecord(csn))
        {
          // we have found the counter record
          counterValues[STOP] = decodeCounterValue(data.getData());
@@ -1049,7 +1048,7 @@
        }
        // it is a regular change record
        if (!cn.older(start))
        if (!csn.older(start))
        {
          distanceToCounterRecords[STOP]++;
          status = cursor.getPrev(key, data, LockMode.DEFAULT);
@@ -1067,7 +1066,7 @@
  /**
   * The diagram below shows a visual description of how the distance between
   * two change numbers in the database is computed.
   * two CSNs in the database is computed.
   *
   * <pre>
   *     +--------+                        +--------+
@@ -1102,9 +1101,9 @@
   * Explanation of the terms used:
   * <dl>
   * <dt>START</dt>
   * <dd>Start change number for the count</dd>
   * <dd>Start CSN for the count</dd>
   * <dt>STOP</dt>
   * <dd>Stop change number for the count</dd>
   * <dd>Stop CSN for the count</dd>
   * <dt>dist</dt>
   * <dd>Distance from START (or STOP) to the counter record</dd>
   * <dt>CSN</dt>
@@ -1113,9 +1112,9 @@
   * database is ordered.</dd>
   * <dt>CR</dt>
   * <dd>Stands for "Counter Record". Counter Records are inserted in the
   * database along with real change numbers, but they are not real changes.
   * They are only used to speed up calculating the distance between 2 change
   * numbers without the need to scan the whole database in between.</dd>
   * database along with real CSNs, but they are not real changes. They are only
   * used to speed up calculating the distance between 2 CSNs without the need
   * to scan the whole database in between.</dd>
   * </dl>
   */
  private long computeDistance(int[] counterValues,
@@ -1137,21 +1136,21 @@
  }
  /**
   * Whether a provided changeNumber represents a counter record. A counter
   * record is used to store TODO.
   * Whether a provided CSN represents a counter record. A counter record is
   * used to store the time.
   *
   * @param cn
   *          The changeNumber to test
   * @return true if the provided changenumber is a counter, false otherwise
   * @param csn
   *          The CSN to test
   * @return true if the provided CSN is a counter, false otherwise
   */
  private static boolean isACounterRecord(ChangeNumber cn)
  private static boolean isACounterRecord(CSN csn)
  {
    return cn.getServerId() == 0 && cn.getSeqnum() == 0;
    return csn.getServerId() == 0 && csn.getSeqnum() == 0;
  }
  private static ChangeNumber newCounterRecord(ChangeNumber changeNumber)
  private static CSN newCounterRecord(CSN csn)
  {
    return new ChangeNumber(changeNumber.getTime(), 0, 0);
    return new CSN(csn.getTime(), 0, 0);
  }
  /**