opends/src/messages/messages/replication.properties
@@ -614,4 +614,10 @@ SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \ offline state file '%s' for domain %s and server id %d SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH_282=Could not retrieve file length of \ file '%s' log file '%s' SEVERE_ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE_283=An error occurred while recovering the \ replication change log file '%s'. The recovery has been aborted and this replication server \ will be removed from the replication topology. The change log file system may be read-only, \ full, or corrupt and must be fixed before this replication server can be used. The underlying error was: %s INFO_CHANGELOG_LOG_FILE_RECOVERED_284=Log file '%s' was successfully \ recovered by removing a partially written record opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -36,6 +36,7 @@ import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.types.ByteString; import org.opends.server.types.ByteStringBuilder; import org.opends.server.util.StaticUtils; import com.forgerock.opendj.util.Pair; @@ -47,7 +48,7 @@ * reach the beginning of previous record (or next record if offset equals 0). * <p> * The reader provides both sequential access, using the {@code readRecord()} method, * and reasonably fast random access, using the {@code seek(K, boolean)} method. * and reasonably fast random access, using the {@code seekToRecord(K, boolean)} method. * * @param <K> * Type of the key of a record, which must be comparable. @@ -237,32 +238,30 @@ { try { final ByteString recordData = blockStartPosition == -1 ? readNextRecord() : readRecordFromBlockStartPosition(blockStartPosition); if (blockStartPosition != -1) { positionToRecordFromBlockStart(blockStartPosition); } final ByteString recordData = readNextRecord(); return recordData != null ? parser.decodeRecord(recordData) : null; } catch (IOException io) catch (Exception io) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io); } catch (DecodingException e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), e); } } /** * Reads the first record found after the provided file position of block * Position to the record given by the offset read from provided block * start. * * @param blockStartPosition * Position of read pointer in the file, expected to be the start of * a block where a record offset is written. * @return the record read * @throws IOException * If an error occurs during read. 
 */ private ByteString readRecordFromBlockStartPosition(final long blockStartPosition) throws IOException private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException { reader.seek(blockStartPosition); if (blockStartPosition > 0) @@ -275,7 +274,6 @@ reader.seek(blockStartPosition - offsetToRecord); } // if offset is zero, reader is already well positioned } return readNextRecord(); } /** @@ -321,7 +319,7 @@ } return recordBytes.toByteString(); } catch(EOFException e) catch (EOFException e) { // end of stream, no record or uncomplete record return null; @@ -353,7 +351,7 @@ /** Read the length of a record. */ private int readRecordLength(final int distanceToBlockStart) throws IOException { final ByteStringBuilder lengthBytes = new ByteStringBuilder(); final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE); if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE) { lengthBytes.append(reader, distanceToBlockStart); @@ -388,7 +386,7 @@ */ long searchClosestBlockStartToKey(K key) throws ChangelogException { final long maxPos = getLastPositionInFile(); final long maxPos = getFileLength() - 1; long lowPos = 0L; long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos); @@ -428,11 +426,11 @@ return -1; } private long getLastPositionInFile() throws ChangelogException private long getFileLength() throws ChangelogException { try { return reader.length() - 1; return reader.length(); } catch (IOException e) { @@ -533,4 +531,45 @@ return distance == 0 ? 0 : blockSize - distance; } /** * Check if the log file is valid, by reading the latest records at the end of * the log file. * <p> * The intent of this method is to allow self-recovery in case a partial * record has been written at the end of file (e.g., after a server crash). * <p> * Any unexpected exception is considered as a severe error where * self-recovery is not appropriate and thus will lead to a * ChangelogException.
 * * @return -1 if log is valid, or a non-negative number if log is invalid, where * the number represents the last valid position in the log file, i.e. the * length the file should be truncated to in order to remove the partially * written record. * @throws ChangelogException * if an error occurs while checking the log */ long checkLogIsValid() throws ChangelogException { try { final long fileSize = getFileLength(); if (fileSize == 0) { /* an empty log file is valid */ return -1; } /* Look for the block containing the last byte of the file (as searchClosestBlockStartToKey() does with getFileLength() - 1): when the file ends exactly on a block boundary, there is no block header to read at position fileSize. */ final long lastBlockStart = getClosestBlockStartBeforeOrAtPosition(fileSize - 1); positionToRecordFromBlockStart(lastBlockStart); /* The first record checked may start in the previous block (the reader is then positioned before lastBlockStart): if this record is partially written, the file must be truncated at its start, not at the block start. */ long lastValidPosition = Math.min(lastBlockStart, reader.getFilePointer()); for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) { parser.decodeRecord(recordData); lastValidPosition = reader.getFilePointer(); } final boolean isFileValid = lastValidPosition == fileSize; return isFileValid ? -1 : lastValidPosition; } catch (Exception e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get( file.getPath(), StaticUtils.stackTraceToSingleLineString(e)), e); } } } opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -203,21 +203,6 @@ } /** * Get the number of changes. * * @return Returns the number of changes. */ long getChangesCount() { final CSNLimits limits = csnLimits; if (limits.newestCSN != null && limits.oldestCSN != null) { return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1; } return 0; } /** * Returns a cursor that allows to retrieve the update messages from this DB, * starting at the position defined by the smallest CSN that is strictly * higher than the provided CSN. @@ -363,6 +348,16 @@ return log.getNumberOfRecords(); } /** * Dumps this DB as text files, one per log file, written in the directory * returned by {@code log.getPath()}. Intended for debugging purposes only. * * @throws ChangelogException * If an error occurs during dump */ void dumpAsTextFiles() throws ChangelogException { log.dumpAsTextFile(log.getPath()); } /** Parser of records persisted in the ReplicaDB log. */ private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg> { opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -26,7 +26,6 @@ package org.opends.server.replication.server.changelog.file; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.*; import java.io.Closeable; @@ -51,7 +50,6 @@ import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.loggers.ErrorLogger; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor; @@ -105,8 +103,6 @@ */ final class Log<K extends Comparable<K>, V> implements Closeable { private static final DebugTracer TRACER = getTracer(); private static final String LOG_FILE_SUFFIX = ".log"; static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX; @@ -702,6 +698,23 @@ } } /** * Dumps each log file of this log as a text file, intended for debugging * purposes only. * * @param dumpDirectory * Directory that will contain the text dumps: one file per log file, * named after the log file with an additional ".txt" extension * @throws ChangelogException * If an error occurs during dump */ void dumpAsTextFile(File dumpDirectory) throws ChangelogException { for (LogFile<K, V> logFile : logFiles.values()) { logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt")); } } /** {@inheritDoc} */ @Override public void close() opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -26,6 +26,7 @@ package org.opends.server.replication.server.changelog.file; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import java.io.BufferedWriter; @@ -33,6 +34,7 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.RandomAccessFile; import org.forgerock.util.Reject; import org.opends.messages.Message; @@ -103,7 +105,15 @@ this.isWriteEnabled = isWriteEnabled; createLogFileIfNotExists(); writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null; if (isWriteEnabled) { ensureLogFileIsValid(parser); writer = BlockLogWriter.newWriter(new LogWriter(logfile), parser); } else { writer = null; } readerPool = new LogReaderPool<K, V>(logfile, parser); } @@ -184,6 +194,44 @@ } /** * Ensure that log file is not corrupted, by checking it is valid and cleaning * the end of file if necessary, to remove a partially written record. * <p> * If log file is cleaned to remove a partially written record, then a message * is logged for information.
 * * @param parser * the parser used to decode the records read from the log file * @throws ChangelogException * If an error occurs or if log file is corrupted and can't be * cleaned */ private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException { RandomAccessFile readerWriter = null; BlockLogReader<K, V> reader = null; try { readerWriter = new RandomAccessFile(logfile, "rws"); reader = BlockLogReader.newReader(logfile, readerWriter, parser) ; final long lastValidPosition = reader.checkLogIsValid(); if (lastValidPosition != -1) { // truncate the file to point where last valid record has been read readerWriter.setLength(lastValidPosition); logError(INFO_CHANGELOG_LOG_FILE_RECOVERED.get(logfile.getPath())); } } catch (IOException e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get( logfile.getPath(), StaticUtils.stackTraceToSingleLineString(e)), e); } finally { /* close the file explicitly as well: it would otherwise leak if the reader could not be created (closing a RandomAccessFile twice is harmless) */ StaticUtils.close(reader); StaticUtils.close(readerWriter); } } /** * Add the provided record at the end of this log. * <p> * In order to ensure that record is written out of buffers and persisted opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -26,9 +26,11 @@ package org.opends.server.replication.server.changelog.file; import java.io.File; import java.io.RandomAccessFile; import org.opends.messages.Message; import org.opends.server.DirectoryServerTestCase; import org.opends.server.TestCaseUtils; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.types.ByteSequenceReader; @@ -38,6 +40,7 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.assertj.core.api.Assertions.*; @@ -46,65 +49,51 @@ @Test(sequential=true) public class LogFileTest extends DirectoryServerTestCase { private static final String TEST_DIRECTORY_CHANGELOG = "test-output" + File.separator + "changelog"; private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit"); private static final File TEST_LOG_FILE = new File(TEST_DIRECTORY, Log.HEAD_LOG_FILE_NAME); static final StringRecordParser RECORD_PARSER = new StringRecordParser(); static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() { @Override public Record<String, String> decodeRecord(ByteString data) throws DecodingException { throw new DecodingException(Message.raw("Error when parsing record")); } }; static final RecordParser<String,String> CORRUPTING_RECORD_PARSER = new StringRecordParser() { @Override public ByteString encodeRecord(Record<String, String> record) { // write the key only, to corrupt the log file return new ByteStringBuilder().append(record.getKey()).toByteString(); } }; @BeforeClass public void createTestDirectory() { File logDir = new File(TEST_DIRECTORY_CHANGELOG); logDir.mkdirs(); TEST_DIRECTORY.mkdirs(); } @BeforeMethod /** Create a new log file with ten records starting from (key1, 
value1) until (key10, value10). */ public void initialize() throws Exception { File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME); if (theLogFile.exists()) if (TEST_LOG_FILE.exists()) { theLogFile.delete(); TEST_LOG_FILE.delete(); } LogFile<String, String> logFile = getLogFile(RECORD_PARSER); LogFile<String, String> logFile = null; try { logFile = getLogFile(RECORD_PARSER); for (int i = 1; i <= 10; i++) { logFile.append(Record.from(String.format("key%02d", i), "value"+i)); for (int i = 1; i <= 10; i++) { logFile.append(Record.from(String.format("key%02d", i), "value"+i)); } } logFile.close(); finally { StaticUtils.close(logFile); } } @AfterClass public void cleanTestChangelogDirectory() { final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); if (rootPath.exists()) { StaticUtils.recursiveDelete(rootPath); } StaticUtils.recursiveDelete(TEST_DIRECTORY); } private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException { return LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME), parser); return LogFile.newAppendableLogFile(TEST_LOG_FILE, parser); } @Test @@ -227,7 +216,9 @@ @Test(expectedExceptions=ChangelogException.class) public void testCursorWhenParserFailsToRead() throws Exception { LogFile<String, String> changelog = getLogFile(RECORD_PARSER_FAILING_TO_READ); FailingStringRecordParser parser = new FailingStringRecordParser(); LogFile<String, String> changelog = getLogFile(parser); /* only fail once the log file is open: opening it for append decodes its last records to check their validity */ parser.setFailToRead(true); try { changelog.getCursor("key"); } @@ -266,6 +257,72 @@ } } @DataProvider(name = "corruptedRecordData") Object[][] corruptedRecordData() { return new Object[][] { // write partial record size (should be 4 bytes) { 1, new ByteStringBuilder().append((byte) 0) }, // write partial record size (should be 4 bytes) { 2, new ByteStringBuilder().append((byte) 0).append((byte) 0).append((byte) 0) }, // write size only { 3, new ByteStringBuilder().append(10) },
 // write size + key { 4, new ByteStringBuilder().append(100).append("key") }, // write size + key + separator { 5, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR) }, // write size + key + separator + partial value { 6, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR).append("v") }, }; } @Test(dataProvider="corruptedRecordData") public void testRecoveryOnCorruptedLogFile( @SuppressWarnings("unused") int unusedId, ByteStringBuilder corruptedRecordData) throws Exception { LogFile<String, String> logFile = null; DBCursor<Record<String, String>> cursor = null; try { corruptTestLogFile(corruptedRecordData); // open the log file: the file should be repaired at this point logFile = getLogFile(RECORD_PARSER); // write a new valid record logFile.append(Record.from(String.format("key%02d", 11), "value"+ 11)); // ensure log can be fully read including the new record cursor = logFile.getCursor("key05"); assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5")); assertThatCursorCanBeFullyRead(cursor, 6, 11); } finally { /* release the cursor before the log file it reads from */ StaticUtils.close(cursor); StaticUtils.close(logFile); } } /** * Append some raw data to the TEST_LOG_FILE. Intended to corrupt the log * file. */ private void corruptTestLogFile(ByteStringBuilder corruptedRecordData) throws Exception { RandomAccessFile output = null; try { output = new RandomAccessFile(TEST_LOG_FILE, "rwd"); output.seek(output.length()); output.write(corruptedRecordData.toByteArray()); } finally { StaticUtils.close(output); } } @Test /** * Test that changes are visible immediately to a reader after a write. @@ -372,4 +429,25 @@ } } /** A parser that can be set to fail when reading.
 */ static class FailingStringRecordParser extends StringRecordParser { private boolean failToRead = false; @Override public Record<String, String> decodeRecord(ByteString data) throws DecodingException { if (failToRead) { throw new DecodingException(Message.raw("Error when parsing record")); } return super.decodeRecord(data); } void setFailToRead(boolean shouldFail) { failToRead = shouldFail; } } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -35,6 +35,7 @@ import org.opends.server.TestCaseUtils; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser; import org.opends.server.util.StaticUtils; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; @@ -228,7 +229,9 @@ @Test(expectedExceptions=ChangelogException.class) public void testCursorWhenParserFailsToRead() throws Exception { Log<String, String> log = openLog(LogFileTest.RECORD_PARSER_FAILING_TO_READ); FailingStringRecordParser parser = new FailingStringRecordParser(); Log<String, String> log = openLog(parser); /* NOTE(review): presumably openLog() opens the log files for writing, which decodes their last records to check validity, so the parser must only start failing once the log is open - TODO confirm */ parser.setFailToRead(true); try { log.getCursor("key"); }