mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
25.43.2014 933e60b44df8950bb3df2e8908d991bcb6edfc1a
Checkpoint commit for OPENDJ-1471 File based changelog : improve cursor behavior

First step : improve cursor behavior for file-based implementation only

* Log.java, LogFile.java, FileReplicaDBCursor.java : change cursors to behave
like java.sql.ResultSet, ie cursor is positionned before the first record

* FileReplicaDBCursor.java : fix behavior for case when record with
start key is not available when cursor is created

* FileChangeNumberIndexDBCursor.java : update to adapt to new behavior
of underlying cursor, but still behave differently than java.sql.ResultSet
(to be changed in the next step)

* LogTest.java, LogFileTest.java : adapt tests to new behavior

* FileReplicaDBTest.java : update test related to exhausted cursor
for better coverage of edge cases
7 files modified
301 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java 68 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java 35 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java 63 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java 41 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java 26 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java 60 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
@@ -47,9 +47,15 @@
   *
   * @param cursor
   *          The underlying cursor to read log.
   * @throws ChangelogException
   *            If an error occurs.
   */
  FileChangeNumberIndexDBCursor(final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor) {
  FileChangeNumberIndexDBCursor(final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor)
      throws ChangelogException
  {
    this.cursor = cursor;
    // cursor is positioned to first record at start
    next();
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -32,18 +32,28 @@
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
/**
 * A cursor on ReplicaDB.
 * A cursor on ReplicaDB, which can re-initialize itself after exhaustion.
 * <p>
 * This cursor behaves specially in two ways :
 * <ul>
 *  <li>The cursor initially points to a {@code null} value: the
 *      {@code getRecord()} method return {@code null} if called before any call to
 *      {@code next()} method.</li>
 *  <li>The cursor automatically re-initializes itself if it is exhausted: when
 *      exhausted, the cursor re-position itself to the last non null CSN previously
 *      read.
 *  <li>
 * </ul>
 * The cursor provides a java.sql.ResultSet like API :
 * <pre>
 * {@code
 *  FileReplicaDBCursor cursor = ...;
 *  try {
 *    while (cursor.next()) {
 *      Record record = cursor.getRecord();
 *      // ... can call cursor.getRecord() again: it will return the same result
 *    }
 *  }
 *  finally {
 *    close(cursor);
 *  }
 * }
 * </pre>
 * <p>
 * The cursor automatically re-initializes itself if it is exhausted: if a
 * record is newly available, a subsequent call to the {@code next()} method will
 * return {@code true} and the record will be available by calling {@code getRecord()}
 * method.
 */
class FileReplicaDBCursor implements DBCursor<UpdateMsg>
{
@@ -54,7 +64,7 @@
  /** The next record to return. */
  private Record<CSN, UpdateMsg> nextRecord;
  /** The CSN to re-start with in case the cursor is exhausted. */
  /**  The CSN to re-start with in case the cursor is exhausted. */
  private CSN lastNonNullCurrentCSN;
  /**
@@ -82,25 +92,33 @@
  @Override
  public boolean next() throws ChangelogException
  {
    nextRecord = cursor.getRecord();
    if (nextRecord != null)
    if (cursor.next())
    {
      nextRecord = cursor.getRecord();
      if (nextRecord.getKey().compareTo(lastNonNullCurrentCSN) > 0)
      {
        lastNonNullCurrentCSN = nextRecord.getKey();
        return true;
      }
    }
    return nextWhenCursorIsExhaustedOrNotCorrectlyPositionned();
  }
  /** Re-initialize the cursor after the last non null CSN. */
  private boolean nextWhenCursorIsExhaustedOrNotCorrectlyPositionned() throws ChangelogException
  {
    final boolean found = cursor.positionTo(lastNonNullCurrentCSN, true);
    if (found && cursor.next())
    {
      nextRecord = cursor.getRecord();
      lastNonNullCurrentCSN = nextRecord.getKey();
      return true;
    }
    else
    {
      // Exhausted cursor must be able to reinitialize itself
      cursor.positionTo(lastNonNullCurrentCSN, true);
      nextRecord = cursor.getRecord();
      if (nextRecord != null)
      {
        lastNonNullCurrentCSN = nextRecord.getKey();
      }
      nextRecord = null;
      return false;
    }
    // the underlying cursor is one record in advance
    cursor.next();
    return nextRecord != null;
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -424,10 +424,9 @@
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the first position.
   * <p>
   * The returned cursor initially points to record corresponding to the first
   * key, that is {@code cursor.getRecord()} is equals to the record
   * corresponding to the first key before any call to {@code cursor.next()}
   * method.
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method.
   *
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
@@ -463,9 +462,9 @@
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the position defined by the provided key.
   * <p>
   * The returned cursor initially points to record corresponding to the key,
   * that is {@code cursor.getRecord()} is equals to the record corresponding to
   * the key before any call to {@code cursor.next()} method.
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
@@ -484,10 +483,10 @@
   * starting at the position defined by the smallest key that is higher than
   * the provided key.
   * <p>
   * The returned cursor initially points to record corresponding to the key
   * found, that is {@code cursor.getRecord()} is equals to the record
   * corresponding to the key found before any call to {@code cursor.next()}
   * method.
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method. After the first call to {@code cursor.next()}
   * the cursor points to the record corresponding to the key found.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
@@ -1027,7 +1026,7 @@
        if (logFile != null)
        {
          switchToLogFile(logFile);
          return true;
          return currentCursor.next();
        }
        return false;
      }
@@ -1069,17 +1068,7 @@
        {
          switchToLogFile(logFile);
        }
        if (key != null)
        {
          boolean isFound = currentCursor.positionTo(key, findNearest);
          if (isFound && getRecord() == null && !log.isHeadLogFile(currentLogFile))
          {
            // The key to position is probably in the next file, force the switch
            isFound = next();
          }
          return isFound;
        }
        return true;
        return (key == null) ? true : currentCursor.positionTo(key, findNearest);
      }
      finally
      {
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -313,10 +313,9 @@
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the first position.
   * <p>
   * The returned cursor initially points to record corresponding to the first
   * key, that is {@code cursor.getRecord()} is equals to the record
   * corresponding to the first key before any call to {@code cursor.next()}
   * method.
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals {@code null} before any call to
   * {@code cursor.next()} method.
   *
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
@@ -331,9 +330,10 @@
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the position defined by the provided key.
   * <p>
   * The returned cursor initially points to record corresponding to the key,
   * that is {@code cursor.getRecord()} is equals to the record corresponding to
   * the key before any call to {@code cursor.next()} method.
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method. After the first call to {@code cursor.next()}
   * the cursor points to the record corresponding to the key.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
@@ -352,10 +352,10 @@
   * starting at the position defined by the smallest key that is higher than
   * the provided key.
   * <p>
   * The returned cursor initially points to record corresponding to the key
   * found, that is {@code cursor.getRecord()} is equals to the record
   * corresponding to the key found before any call to {@code cursor.next()}
   * method.
   * The returned cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method. After the first call to {@code cursor.next()}
   * the cursor points to the record corresponding to the key found.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
@@ -421,7 +421,7 @@
    try
    {
      cursor = getCursor();
      return cursor.getRecord();
      return cursor.next() ? cursor.getRecord() : null;
    }
    finally
    {
@@ -443,7 +443,7 @@
    try
    {
      cursor = getCursor();
      Record<K, V> record = cursor.getRecord();
      Record<K, V> record = null;
      while (cursor.next())
      {
        record = cursor.getRecord();
@@ -470,15 +470,9 @@
    try
    {
      cursor = getCursor();
      Record<K, V> record = cursor.getRecord();
      if (record == null)
      {
        return 0L;
      }
      long counter = 1L;
      long counter = 0L;
      while (cursor.next())
      {
        record = cursor.getRecord();
        counter++;
      }
      return counter;
@@ -580,8 +574,8 @@
  /**
   * Implements a repositionable cursor on the log file.
   * <p>
   * The cursor initially points to a record, that is {@code cursor.getRecord()}
   * is equals to the first record available from the cursor before any call to
   * The cursor initially points to no record, that is
   * {@code cursor.getRecord()} is equals to {@code null} before any call to
   * {@code cursor.next()} method.
   */
  static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
@@ -596,6 +590,12 @@
    private Record<K,V> currentRecord;
    /**
     * The initial record when starting from a given key. It must be
     * stored because it is read in advance.
     */
    private Record<K,V> initialRecord;
    /**
     * Creates a cursor on the provided log.
     *
     * @param logFile
@@ -607,16 +607,6 @@
    {
      this.logFile = logFile;
      this.reader = logFile.getReader();
      try
      {
        // position to the first record.
        next();
      }
      catch (ChangelogException e)
      {
        close();
        throw e;
      }
    }
    /**
@@ -638,6 +628,13 @@
    @Override
    public boolean next() throws ChangelogException
    {
      if (initialRecord != null)
      {
        // initial record is used only once
        currentRecord = initialRecord;
        initialRecord = null;
        return true;
      }
      currentRecord = reader.readRecord();
      return currentRecord != null;
    }
@@ -654,7 +651,7 @@
    public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest);
      final boolean found = result.getFirst();
      currentRecord = found ? result.getSecond() : null;
      initialRecord = found ? result.getSecond() : null;
      return found;
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -217,8 +217,19 @@
    }
  }
  @Test
  public void testGenerateCursorFromWithCursorReinitialization() throws Exception
  @DataProvider
  Object[][] dataForTestsWithCursorReinitialization()
  {
    return new Object[][] {
      // the index to use in CSN array for the start key of the cursor
      { 0 },
      { 1 },
      { 4 },
    };
  }
  @Test(dataProvider="dataForTestsWithCursorReinitialization")
  public void testGenerateCursorFromWithCursorReinitialization(int csnIndexForStartKey) throws Exception
  {
    ReplicationServer replicationServer = null;
    DBCursor<UpdateMsg> cursor = null;
@@ -231,27 +242,25 @@
      CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6);
      cursor = replicaDB.generateCursorFrom(csns[0]);
      cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey]);
      assertFalse(cursor.next());
      for (int i = 0; i < 5; i++)
      int[] indicesToAdd = new int[] { 0, 1, 2, 4 };
      for (int i : indicesToAdd)
      {
        if (i != 3)
        {
          replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        }
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
      }
      waitChangesArePersisted(replicaDB, 4);
      assertTrue(cursor.next());
      assertEquals(cursor.getRecord().getCSN(), csns[1]);
      assertTrue(cursor.next());
      assertEquals(cursor.getRecord().getCSN(), csns[2]);
      assertTrue(cursor.next());
      assertEquals(cursor.getRecord().getCSN(), csns[4]);
      for (int i = csnIndexForStartKey+1; i < 5; i++)
      {
        if (i != 3)
        {
          assertTrue(cursor.next());
          assertEquals(cursor.getRecord().getCSN(), csns[i], "index i=" + i);
        }
      }
      assertFalse(cursor.next());
      StaticUtils.close(cursor);
    }
    finally
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -104,8 +104,7 @@
    try {
      cursor = changelog.getCursor();
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
      assertThatCursorCanBeFullyRead(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, changelog);
@@ -120,8 +119,7 @@
    try {
      cursor = changelog.getCursor("key05");
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5"));
      assertThatCursorCanBeFullyRead(cursor, 6, 10);
      assertThatCursorCanBeFullyRead(cursor, 5, 10);
    }
    finally {
      StaticUtils.close(cursor, changelog);
@@ -153,9 +151,7 @@
    try {
      cursor = changelog.getCursor(null);
      // should start from start
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
      assertThatCursorCanBeFullyRead(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, changelog);
@@ -170,9 +166,7 @@
    try {
      cursor = changelog.getNearestCursor("key01");
      // lowest higher key is key2
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key02", "value2"));
      assertThatCursorCanBeFullyRead(cursor, 3, 10);
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
    }
    finally {
      StaticUtils.close(cursor, changelog);
@@ -187,9 +181,7 @@
    try {
      cursor = changelog.getNearestCursor("key00");
      // lowest higher key is key1
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
      assertThatCursorCanBeFullyRead(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, changelog);
@@ -204,9 +196,7 @@
    try {
      cursor = changelog.getNearestCursor(null);
      // should start from start
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
      assertThatCursorCanBeFullyRead(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, changelog);
@@ -296,8 +286,7 @@
      // ensure log can be fully read including the new record
      cursor = logFile.getCursor("key05");
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5"));
      assertThatCursorCanBeFullyRead(cursor, 6, 11);
      assertThatCursorCanBeFullyRead(cursor, 5, 11);
    }
    finally
    {
@@ -359,6 +348,7 @@
  private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
      throws Exception
  {
    assertThat(cursor.getRecord()).isNull();
    for (int i = fromIndex; i <= endIndex; i++)
    {
      assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -88,8 +88,7 @@
    try {
      cursor = log.getCursor();
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
@@ -104,8 +103,7 @@
    try {
      cursor = log.getCursor("key005");
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key005", "value5"));
      assertThatCursorCanBeFullyRead(cursor, 6, 10);
      assertThatCursorCanBeFullyReadFromStart(cursor, 5, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
@@ -138,9 +136,7 @@
    try {
      cursor = log.getCursor(null);
      // should start from first record
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
@@ -155,20 +151,14 @@
    try {
      // this key is the first key of the log file "key1_key2.log"
      cursor1 = log.getNearestCursor("key001");
      // lowest higher key is key2
      assertThat(cursor1.getRecord()).isEqualTo(Record.from("key002", "value2"));
      assertThatCursorCanBeFullyRead(cursor1, 3, 10);
      assertThatCursorCanBeFullyReadFromStart(cursor1, 2, 10);
      // this key is the last key of the log file "key3_key4.log"
      cursor2 = log.getNearestCursor("key004");
      // lowest higher key is key5
      assertThat(cursor2.getRecord()).isEqualTo(Record.from("key005", "value5"));
      assertThatCursorCanBeFullyRead(cursor2, 6, 10);
      assertThatCursorCanBeFullyReadFromStart(cursor2, 5, 10);
      cursor3 = log.getNearestCursor("key009");
      // lowest higher key is key10
      assertThat(cursor3.getRecord()).isEqualTo(Record.from("key010", "value10"));
      assertThatCursorIsExhausted(cursor3);
      assertThatCursorCanBeFullyReadFromStart(cursor3, 10, 10);
    }
    finally {
      StaticUtils.close(cursor1, cursor2, cursor3, log);
@@ -200,9 +190,7 @@
      // key is between key005 and key006
      cursor = log.getNearestCursor("key005000");
      // lowest higher key is key006
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key006", "value6"));
      assertThatCursorCanBeFullyRead(cursor, 7, 10);
      assertThatCursorCanBeFullyReadFromStart(cursor, 6, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
@@ -217,9 +205,7 @@
    try {
      cursor = log.getNearestCursor(null);
      // should start from start
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
@@ -324,7 +310,7 @@
      }
      writeLog1.syncToFileSystem();
      cursor = writeLog1.getCursor("key020");
      for (int i = 1; i <= 60; i++)
      for (int i = 1; i <= 61; i++)
      {
         assertThat(cursor.next()).isTrue();
      }
@@ -370,7 +356,7 @@
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor = log.getCursor();
      // advance cursor to last record to ensure it is pointing to ahead log file
      advanceCursorFromFirstRecordTo(cursor, 10);
      advanceCursorUpTo(cursor, 1, 10);
      // add new records to ensure the ahead log file is rotated
      for (int i = 11; i <= 20; i++)
@@ -396,13 +382,13 @@
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor1 = log.getCursor();
      advanceCursorFromFirstRecordTo(cursor1, 1);
      advanceCursorUpTo(cursor1, 1, 1);
      cursor2 = log.getCursor();
      advanceCursorFromFirstRecordTo(cursor2, 4);
      advanceCursorUpTo(cursor2, 1, 4);
      cursor3 = log.getCursor();
      advanceCursorFromFirstRecordTo(cursor3, 9);
      advanceCursorUpTo(cursor3, 1, 9);
      cursor4 = log.getCursor();
      advanceCursorFromFirstRecordTo(cursor4, 10);
      advanceCursorUpTo(cursor4, 1, 10);
      // add new records to ensure the ahead log file is rotated
      for (int i = 11; i <= 20; i++)
@@ -503,6 +489,7 @@
      log.purgeUpTo(purgeKey);
      cursor = log.getCursor();
      assertThat(cursor.next()).isTrue();
      assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
      assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
    }
@@ -512,13 +499,6 @@
    }
  }
  private void advanceCursorFromFirstRecordTo(DBCursor<Record<String, String>> cursor, int endIndex)
      throws Exception
  {
    assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
    advanceCursorUpTo(cursor, 2, endIndex);
  }
  private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
      throws Exception
  {
@@ -540,6 +520,16 @@
    assertThatCursorIsExhausted(cursor);
  }
  /**
   * Read the cursor until exhaustion, beginning at start of cursor.
   */
  private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
      throws Exception
      {
    assertThat(cursor.getRecord()).isNull();
    assertThatCursorCanBeFullyRead(cursor, fromIndex, endIndex);
  }
  private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
  {
    assertThat(cursor.next()).isFalse();