mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
01.08.2015 25637a6fd6234f6dc3306e21cce81ef10ce931c5
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -96,64 +96,44 @@
  @Test
  public void testCursor() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor();
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
        DBCursor<Record<String, String>> cursor = log.getCursor())
    {
      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorWhenGivenAnExistingKey() throws Exception
  {
    Log<String, String> log = openLog(RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor("key005");
    try (Log<String, String> log = openLog(RECORD_PARSER);
        DBCursor<Record<String, String>> cursor = log.getCursor("key005"))
    {
      assertThatCursorCanBeFullyReadFromStart(cursor, 5, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorWhenGivenAnUnexistingKey() throws Exception
  {
    Log<String, String> log = openLog(RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      // key is between key005 and key006
      cursor = log.getCursor("key005000");
    try (Log<String, String> log = openLog(RECORD_PARSER);
        // key is between key005 and key006
        DBCursor<Record<String, String>> cursor = log.getCursor("key005000"))
    {
      assertThat(cursor).isNotNull();
      assertThat(cursor.getRecord()).isNull();
      assertThat(cursor.next()).isFalse();
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorWhenGivenANullKey() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor(null);
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
        DBCursor<Record<String, String>> cursor = log.getCursor(null))
    {
      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @DataProvider
@@ -221,11 +201,9 @@
  public void testCursorWithStrategies(String key, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor(key, matchingStrategy, positionStrategy);
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
        DBCursor<Record<String, String>> cursor = log.getCursor(key, matchingStrategy, positionStrategy))
    {
      if (cursorShouldStartAt != -1)
      {
        assertThatCursorCanBeFullyReadFromStart(cursor, cursorShouldStartAt, cursorShouldEndAt);
@@ -235,83 +213,58 @@
        assertThatCursorIsExhausted(cursor);
      }
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testCursorMatchingAnyPositioningAnyWhenGivenANullKey() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getCursor(null, null, null);
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
        DBCursor<Record<String, String>> cursor = log.getCursor(null, null, null))
    {
      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
    }
    finally {
      StaticUtils.close(cursor, log);
    }
  }
  @Test(expectedExceptions=ChangelogException.class)
  public void testCursorWhenParserFailsToRead() throws Exception
  {
    FailingStringRecordParser parser = new FailingStringRecordParser();
    Log<String, String> log = openLog(parser);
    parser.setFailToRead(true);
    try {
    try (Log<String, String> log = openLog(parser))
    {
      parser.setFailToRead(true);
      log.getCursor("key");
    }
    finally {
      StaticUtils.close(log);
    }
  }
  @Test
  public void testGetOldestRecord() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    try
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER))
    {
      Record<String, String> record = log.getOldestRecord();
      assertThat(record).isEqualTo(Record.from("key001", "value1"));
    }
    finally {
      StaticUtils.close(log);
    }
  }
  @Test
  public void testGetNewestRecord() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    try
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER))
    {
      Record<String, String> record = log.getNewestRecord();
      assertThat(record).isEqualTo(Record.from("key010", "value10"));
    }
    finally {
      StaticUtils.close(log);
    }
  }
  /**
   * Test that changes are visible immediately to a reader after a write.
   */
  /** Test that changes are visible immediately to a reader after a write. */
  @Test
  public void testWriteAndReadOnSameLog() throws Exception
  {
    Log<String, String> writeLog = null;
    Log<String, String> readLog = null;
    try
    try (Log<String, String> writeLog = openLog(LogFileTest.RECORD_PARSER);
        Log<String, String> readLog = openLog(LogFileTest.RECORD_PARSER))
    {
      writeLog = openLog(LogFileTest.RECORD_PARSER);
      readLog = openLog(LogFileTest.RECORD_PARSER);
      for (int i = 1; i <= 10; i++)
      {
        Record<String, String> record = Record.from(String.format("nkey%03d", i), "nvalue" + i);
@@ -322,22 +275,14 @@
        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key001", "value1"));
      }
    }
    finally
    {
      StaticUtils.close(writeLog, readLog);
    }
  }
  @Test
  public void testTwoConcurrentWrite() throws Exception
  {
    Log<String, String> writeLog1 = null;
    Log<String, String> writeLog2 = null;
    DBCursor<Record<String, String>> cursor = null;
    try
    try (Log<String, String> writeLog1 = openLog(LogFileTest.RECORD_PARSER);
        Log<String, String> writeLog2 = openLog(LogFileTest.RECORD_PARSER))
    {
      writeLog1 = openLog(LogFileTest.RECORD_PARSER);
      writeLog2 = openLog(LogFileTest.RECORD_PARSER);
      writeLog1.append(Record.from("key020", "starting record"));
      AtomicReference<ChangelogException> exceptionRef = new AtomicReference<ChangelogException>();
      Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef);
@@ -352,16 +297,15 @@
        throw exceptionRef.get();
      }
      writeLog1.syncToFileSystem();
      cursor = writeLog1.getCursor("key020");
      for (int i = 1; i <= 61; i++)
      try (DBCursor<Record<String, String>> cursor = writeLog1.getCursor("key020"))
      {
         assertThat(cursor.next()).isTrue();
        for (int i = 1; i <= 61; i++)
        {
          assertThat(cursor.next()).isTrue();
        }
        assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
      }
      assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
    }
    finally
    {
      StaticUtils.close(cursor, writeLog1, writeLog2);
    }
  }
@@ -372,35 +316,27 @@
  @Test(enabled=false)
  public void logWriteSpeed() throws Exception
  {
    Log<String, String> writeLog = null;
    try
    {
      long sizeOf10MB = 10*1024*1024;
      final LogRotationParameters rotationParams = new LogRotationParameters(sizeOf10MB, NO_TIME_BASED_LOG_ROTATION,
          NO_TIME_BASED_LOG_ROTATION);
      final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class);
      writeLog = Log.openLog(replicationEnv, LOG_DIRECTORY, LogFileTest.RECORD_PARSER, rotationParams);
    long sizeOf10MB = 10 * 1024 * 1024;
    final LogRotationParameters rotationParams = new LogRotationParameters(
        sizeOf10MB, NO_TIME_BASED_LOG_ROTATION, NO_TIME_BASED_LOG_ROTATION);
    final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class);
    try (Log<String, String> writeLog =
        Log.openLog(replicationEnv, LOG_DIRECTORY, LogFileTest.RECORD_PARSER, rotationParams))
    {
      for (int i = 1; i < 1000000; i++)
      {
        writeLog.append(Record.from(String.format("key%010d", i), "value" + i));
      }
    }
    finally
    {
      StaticUtils.close(writeLog);
    }
  }
  @Test
  public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception
  {
    DBCursor<Record<String, String>> cursor = null;
    Log<String, String> log = null;
    try
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
        DBCursor<Record<String, String>> cursor = log.getCursor())
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor = log.getCursor();
      // advance cursor to last record to ensure it is pointing to ahead log file
      advanceCursorUpTo(cursor, 1, 10);
@@ -413,27 +349,20 @@
      // check that cursor can fully read the new records
      assertThatCursorCanBeFullyRead(cursor, 11, 20);
    }
    finally
    {
      StaticUtils.close(cursor, log);
    }
  }
  @Test
  public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception
  {
    DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null, cursor4 = null;
    Log<String, String> log = null;
    try
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
        DBCursor<Record<String, String>> cursor1 = log.getCursor();
        DBCursor<Record<String, String>> cursor2 = log.getCursor();
        DBCursor<Record<String, String>> cursor3 = log.getCursor();
        DBCursor<Record<String, String>> cursor4 = log.getCursor())
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor1 = log.getCursor();
      advanceCursorUpTo(cursor1, 1, 1);
      cursor2 = log.getCursor();
      advanceCursorUpTo(cursor2, 1, 4);
      cursor3 = log.getCursor();
      advanceCursorUpTo(cursor3, 1, 9);
      cursor4 = log.getCursor();
      advanceCursorUpTo(cursor4, 1, 10);
      // add new records to ensure the ahead log file is rotated
@@ -448,28 +377,19 @@
      assertThatCursorCanBeFullyRead(cursor3, 10, 20);
      assertThatCursorCanBeFullyRead(cursor4, 11, 20);
    }
    finally
    {
      StaticUtils.close(cursor1, cursor2, cursor3, cursor4, log);
    }
  }
  @Test
  public void testClear() throws Exception
  {
    DBCursor<Record<String, String>> cursor = null;
    Log<String, String> log = null;
    try
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER))
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      log.clear();
      cursor = log.getCursor();
      assertThatCursorIsExhausted(cursor);
    }
    finally
    {
      StaticUtils.close(cursor, log);
      try (DBCursor<Record<String, String>> cursor = log.getCursor())
      {
        assertThatCursorIsExhausted(cursor);
      }
    }
  }
@@ -477,18 +397,11 @@
  @Test(enabled=false, expectedExceptions=ChangelogException.class)
  public void testClearWhenCursorIsOpened() throws Exception
  {
    DBCursor<Record<String, String>> cursor = null;
    Log<String, String> log = null;
    try
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
        DBCursor<Record<String, String>> cursor = log.getCursor())
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      cursor = log.getCursor();
      log.clear();
    }
    finally
    {
      StaticUtils.close(cursor, log);
    }
  }
  @DataProvider(name = "purgeKeys")
@@ -526,20 +439,16 @@
  public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
      int cursorStartIndex, int cursorEndIndex) throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER))
    {
      log.purgeUpTo(purgeKey);
      cursor = log.getCursor();
      assertThat(cursor.next()).isTrue();
      assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
      assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
    }
    finally
    {
      StaticUtils.close(cursor, log);
      try (DBCursor<Record<String, String>> cursor = log.getCursor())
      {
        assertThat(cursor.next()).isTrue();
        assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
        assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
      }
    }
  }
@@ -627,15 +536,10 @@
  @Test(dataProvider = "findBoundaryKeyData")
  public void testFindBoundaryKeyFromRecord(int limitValue, String expectedKey) throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    try
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER))
    {
      assertThat(log.findBoundaryKeyFromRecord(MAPPER, limitValue)).isEqualTo(expectedKey);
    }
    finally
    {
      StaticUtils.close(log);
    }
  }
  private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
@@ -659,9 +563,7 @@
    assertThatCursorIsExhausted(cursor);
  }
  /**
   * Read the cursor until exhaustion, beginning at start of cursor.
   */
  /** Read the cursor until exhaustion, beginning at start of cursor. */
  private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex,
      int endIndex) throws Exception
  {
@@ -700,5 +602,4 @@
      }
    };
  }
}