mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
17.40.2014 dbc982944cd13543eaa810c6eb0b78a7c2524d86
OPENDJ-1449 : File-based changelog should handle partially written record left over from a previous failure
CR-3768

* LogFile.java
** Add a check of file validity in constructor when log file is write-enabled
** Recover the file if it is corrupted (partially written record) by truncating it

* BlockLogReader.java
** Add method checkLogIsValid() to check validity of file

* replication.properties
** Add new messages related to check and recovery

* LogFileTest.java
** Add tests for recovery after log file corruption

* Minor changes in other files
7 files modified
330 ■■■■ changed files
opends/src/messages/messages/replication.properties 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java 73 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java 25 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java 21 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java 50 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java 148 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java 5 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -614,4 +614,10 @@
SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
 offline state file '%s' for domain %s and server id %d
SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH_282=Could not retrieve file length of \
 file '%s'
 log file '%s'
SEVERE_ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE_283=An error occurred while recovering the \
 replication change log file '%s'. The recovery has been aborted and this replication server \
 will be removed from the replication topology. The change log file system may be read-only, \
 full, or corrupt and must be fixed before this replication server can be used. The underlying error was: %s
INFO_CHANGELOG_LOG_FILE_RECOVERED_284=Log file '%s' was successfully \
 recovered by removing a partially written record
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -36,6 +36,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
@@ -47,7 +48,7 @@
 * reach the beginning of previous record (or next record if offset equals 0).
 * <p>
 * The reader provides both sequential access, using the {@code readRecord()} method,
 * and reasonably fast random access, using the {@code seek(K, boolean)} method.
 * and reasonably fast random access, using the {@code seekToRecord(K, boolean)} method.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
@@ -237,32 +238,30 @@
  {
    try
    {
      final ByteString recordData = blockStartPosition == -1 ?
          readNextRecord() : readRecordFromBlockStartPosition(blockStartPosition);
      if (blockStartPosition != -1)
      {
        positionToRecordFromBlockStart(blockStartPosition);
      }
      final ByteString recordData = readNextRecord();
      return recordData != null ? parser.decodeRecord(recordData) : null;
    }
    catch (IOException io)
    catch (Exception io)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
    }
    catch (DecodingException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), e);
    }
  }
  /**
   * Reads the first record found after the provided file position of block
   * Position to the record given by the offset read from provided block
   * start.
   *
   * @param blockStartPosition
   *          Position of read pointer in the file, expected to be the start of
   *          a block where a record offset is written.
   * @return the record read
   * @throws IOException
   *           If an error occurs during read.
   */
  private ByteString readRecordFromBlockStartPosition(final long blockStartPosition) throws IOException
  private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException
  {
    reader.seek(blockStartPosition);
    if (blockStartPosition > 0)
@@ -275,7 +274,6 @@
        reader.seek(blockStartPosition - offsetToRecord);
      } // if offset is zero, reader is already well positioned
    }
    return readNextRecord();
  }
  /**
@@ -321,7 +319,7 @@
      }
      return recordBytes.toByteString();
    }
    catch(EOFException e)
    catch (EOFException e)
    {
      // end of stream, no record or uncomplete record
      return null;
@@ -353,7 +351,7 @@
  /** Read the length of a record. */
  private int readRecordLength(final int distanceToBlockStart) throws IOException
  {
    final ByteStringBuilder lengthBytes = new ByteStringBuilder();
    final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE);
    if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
    {
      lengthBytes.append(reader, distanceToBlockStart);
@@ -388,7 +386,7 @@
   */
  long searchClosestBlockStartToKey(K key) throws ChangelogException
  {
    final long maxPos = getLastPositionInFile();
    final long maxPos = getFileLength() - 1;
    long lowPos = 0L;
    long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
@@ -428,11 +426,11 @@
    return -1;
  }
  private long getLastPositionInFile() throws ChangelogException
  private long getFileLength() throws ChangelogException
  {
    try
    {
      return reader.length() - 1;
      return reader.length();
    }
    catch (IOException e)
    {
@@ -533,4 +531,45 @@
    return distance == 0 ? 0 : blockSize - distance;
  }
  /**
   * Check if the log file is valid, by reading the latest records at the end of
   * the log file.
   * <p>
   * The intent of this method is to allow self-recovery in case a partial
   * record as been written at the end of file (eg, after a server crash).
   * <p>
   * Any unexpected exception is considered as a severe error where
   * self-recovery is not appropriate and thus will lead to a
   * ChangelogException.
   *
   * @return -1 if log is valid, or a positive number if log is invalid, where
   *         the number represents the last valid position in the log file.
   * @throws ChangelogException
   *           if an error occurs while checking the log
   */
 long checkLogIsValid() throws ChangelogException
 {
   try
   {
     final long fileSize = getFileLength();
     final long lastBlockStart = getClosestBlockStartBeforeOrAtPosition(fileSize);
     positionToRecordFromBlockStart(lastBlockStart);
     long lastValidPosition = lastBlockStart;
     for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) {
       parser.decodeRecord(recordData);
       lastValidPosition = reader.getFilePointer();
     }
     final boolean isFileValid = lastValidPosition == fileSize;
     return isFileValid ? -1 : lastValidPosition;
   }
   catch (Exception e)
   {
     throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
         file.getPath(),
         StaticUtils.stackTraceToSingleLineString(e)));
   }
 }
}
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -203,21 +203,6 @@
  }
  /**
   * Get the number of changes.
   *
   * @return Returns the number of changes.
   */
  long getChangesCount()
  {
    final CSNLimits limits = csnLimits;
    if (limits.newestCSN != null && limits.oldestCSN != null)
    {
      return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1;
    }
    return 0;
  }
  /**
   * Returns a cursor that allows to retrieve the update messages from this DB,
   * starting at the position defined by the smallest CSN that is strictly
   * higher than the provided CSN.
@@ -363,6 +348,16 @@
    return log.getNumberOfRecords();
  }
  /**
   * Dump this DB as text files, intended for debugging purpose only.
   *
   * @throws ChangelogException
   *           If an error occurs during dump
   */
  void dumpAsTextFiles() throws ChangelogException {
    log.dumpAsTextFile(log.getPath());
  }
  /** Parser of records persisted in the ReplicaDB log. */
  private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg>
  {
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -26,7 +26,6 @@
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
@@ -51,7 +50,6 @@
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.loggers.ErrorLogger;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
@@ -105,8 +103,6 @@
 */
final class Log<K extends Comparable<K>, V> implements Closeable
{
  private static final DebugTracer TRACER = getTracer();
  private static final String LOG_FILE_SUFFIX = ".log";
  static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX;
@@ -702,6 +698,23 @@
    }
  }
  /**
   * Dump this log as a text files, intended for debugging purpose only.
   *
   * @param dumpDirectory
   *          Directory that will contains log files with text format
   *          and ".txt" extensions
   * @throws ChangelogException
   *           If an error occurs during dump
   */
  void dumpAsTextFile(File dumpDirectory) throws ChangelogException
  {
    for (LogFile<K, V> logFile : logFiles.values())
    {
      logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
    }
  }
  /** {@inheritDoc} */
  @Override
  public void close()
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -26,6 +26,7 @@
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.io.BufferedWriter;
@@ -33,6 +34,7 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.forgerock.util.Reject;
import org.opends.messages.Message;
@@ -103,7 +105,15 @@
    this.isWriteEnabled = isWriteEnabled;
    createLogFileIfNotExists();
    writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null;
    if (isWriteEnabled)
    {
      ensureLogFileIsValid(parser);
      writer = BlockLogWriter.newWriter(new LogWriter(logfile), parser);
    }
    else
    {
      writer = null;
    }
    readerPool = new LogReaderPool<K, V>(logfile, parser);
  }
@@ -184,6 +194,44 @@
  }
  /**
   * Ensure that log file is not corrupted, by checking it is valid and cleaning
   * the end of file if necessary, to remove a partially written record.
   * <p>
   * If log file is cleaned to remove a partially written record, then a message
   * is logged for information.
   *
   * @throws ChangelogException
   *           If an error occurs or if log file is corrupted and can't be
   *           cleaned
   */
  private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
  {
    BlockLogReader<K, V> reader = null;
    try
    {
      final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
      reader = BlockLogReader.newReader(logfile, readerWriter, parser) ;
      final long lastValidPosition = reader.checkLogIsValid();
      if (lastValidPosition != -1)
      {
          // truncate the file to point where last valid record has been read
          readerWriter.setLength(lastValidPosition);
          logError(INFO_CHANGELOG_LOG_FILE_RECOVERED.get(logfile.getPath()));
      }
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
          logfile.getPath(),
          StaticUtils.stackTraceToSingleLineString(e)));
    }
    finally
    {
      StaticUtils.close(reader);
    }
  }
  /**
   * Add the provided record at the end of this log.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -26,9 +26,11 @@
package org.opends.server.replication.server.changelog.file;
import java.io.File;
import java.io.RandomAccessFile;
import org.opends.messages.Message;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.ByteSequenceReader;
@@ -38,6 +40,7 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
@@ -46,65 +49,51 @@
@Test(sequential=true)
public class LogFileTest extends DirectoryServerTestCase
{
  private static final String TEST_DIRECTORY_CHANGELOG = "test-output" + File.separator + "changelog";
  private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
  private static final File TEST_LOG_FILE = new File(TEST_DIRECTORY, Log.HEAD_LOG_FILE_NAME);
  static final StringRecordParser RECORD_PARSER = new StringRecordParser();
  static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() {
      @Override
      public Record<String, String> decodeRecord(ByteString data) throws DecodingException
      {
        throw new DecodingException(Message.raw("Error when parsing record"));
      }
  };
  static final RecordParser<String,String> CORRUPTING_RECORD_PARSER = new StringRecordParser() {
    @Override
    public ByteString encodeRecord(Record<String, String> record)
    {
      // write the key only, to corrupt the log file
      return new ByteStringBuilder().append(record.getKey()).toByteString();
    }
  };
  @BeforeClass
  public void createTestDirectory()
  {
    File logDir = new File(TEST_DIRECTORY_CHANGELOG);
    logDir.mkdirs();
    TEST_DIRECTORY.mkdirs();
  }
  @BeforeMethod
  /** Create a new log file with ten records starting from (key1, value1) until (key10, value10). */
  public void initialize() throws Exception
  {
    File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME);
    if (theLogFile.exists())
    if (TEST_LOG_FILE.exists())
    {
      theLogFile.delete();
      TEST_LOG_FILE.delete();
    }
    LogFile<String, String> logFile = getLogFile(RECORD_PARSER);
    LogFile<String, String> logFile =  null;
    try
    {
      logFile = getLogFile(RECORD_PARSER);
    for (int i = 1; i <= 10; i++)
    {
      logFile.append(Record.from(String.format("key%02d", i), "value"+i));
      for (int i = 1; i <= 10; i++)
      {
        logFile.append(Record.from(String.format("key%02d", i), "value"+i));
      }
    }
    logFile.close();
    finally
    {
      StaticUtils.close(logFile);
    }
  }
  @AfterClass
  public void cleanTestChangelogDirectory()
  {
    final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
    if (rootPath.exists())
    {
      StaticUtils.recursiveDelete(rootPath);
    }
    StaticUtils.recursiveDelete(TEST_DIRECTORY);
  }
  private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException
  {
    return LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME), parser);
    return LogFile.newAppendableLogFile(TEST_LOG_FILE, parser);
  }
  @Test
@@ -227,7 +216,9 @@
  @Test(expectedExceptions=ChangelogException.class)
  public void testCursorWhenParserFailsToRead() throws Exception
  {
    LogFile<String, String> changelog = getLogFile(RECORD_PARSER_FAILING_TO_READ);
    FailingStringRecordParser parser = new FailingStringRecordParser();
    LogFile<String, String> changelog = getLogFile(parser);
    parser.setFailToRead(true);
    try {
      changelog.getCursor("key");
    }
@@ -266,6 +257,72 @@
    }
  }
  @DataProvider(name = "corruptedRecordData")
  Object[][] corruptedRecordData()
  {
    return new Object[][]
    {
      // write partial record size (should be 4 bytes)
      { 1, new ByteStringBuilder().append((byte) 0) },
      // write partial record size (should be 4 bytes)
      { 2, new ByteStringBuilder().append((byte) 0).append((byte) 0).append((byte) 0) },
      // write size only
      { 3, new ByteStringBuilder().append(10) },
      // write size + key
      { 4, new ByteStringBuilder().append(100).append("key") },
      // write size + key + separator
      { 5, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR) },
      // write size + key + separator + partial value
      { 6, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR).append("v") },
    };
  }
  @Test(dataProvider="corruptedRecordData")
  public void testRecoveryOnCorruptedLogFile(
      @SuppressWarnings("unused") int unusedId,
      ByteStringBuilder corruptedRecordData) throws Exception
  {
    LogFile<String, String> logFile = null;
    DBCursor<Record<String, String>> cursor = null;
    try
    {
      corruptTestLogFile(corruptedRecordData);
      // open the log file: the file should be repaired at this point
      logFile = getLogFile(RECORD_PARSER);
      // write a new valid record
      logFile.append(Record.from(String.format("key%02d", 11), "value"+ 11));
      // ensure log can be fully read including the new record
      cursor = logFile.getCursor("key05");
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5"));
      assertThatCursorCanBeFullyRead(cursor, 6, 11);
    }
    finally
    {
      StaticUtils.close(logFile);
    }
  }
  /**
   * Append some raw data to the TEST_LOG_FILE. Intended to corrupt the log
   * file.
   */
  private void corruptTestLogFile(ByteStringBuilder corruptedRecordData) throws Exception
  {
    RandomAccessFile output = null;
    try {
      output = new RandomAccessFile(TEST_LOG_FILE, "rwd");
      output.seek(output.length());
      output.write(corruptedRecordData.toByteArray());
    }
    finally
    {
      StaticUtils.close(output);
    }
  }
  @Test
  /**
   * Test that changes are visible immediately to a reader after a write.
@@ -372,4 +429,25 @@
    }
  }
  /** A parser that can be set to fail when reading. */
  static class FailingStringRecordParser extends StringRecordParser
  {
    private boolean failToRead = false;
    @Override
    public Record<String, String> decodeRecord(ByteString data) throws DecodingException
    {
      if (failToRead)
      {
        throw new DecodingException(Message.raw("Error when parsing record"));
      }
      return super.decodeRecord(data);
    }
    void setFailToRead(boolean shouldFail)
    {
      failToRead = shouldFail;
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -35,6 +35,7 @@
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -228,7 +229,9 @@
  @Test(expectedExceptions=ChangelogException.class)
  public void testCursorWhenParserFailsToRead() throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER_FAILING_TO_READ);
    FailingStringRecordParser parser = new FailingStringRecordParser();
    Log<String, String> log = openLog(parser);
    parser.setFailToRead(true);
    try {
      log.getCursor("key");
    }