From 9f27fd95ce75d56dbc011c4e02f8298bbabe6690 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 17 Jun 2014 13:40:54 +0000
Subject: [PATCH] OPENDJ-1449 : File-based changelog should handle partially written record left over from a previous failure CR-3768
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java | 73 +++++++++--
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java | 25 +--
opendj-sdk/opends/src/messages/messages/replication.properties | 8 +
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java | 21 ++
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java | 148 ++++++++++++++++++-----
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java | 50 ++++++++
7 files changed, 256 insertions(+), 74 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 844c5c1..f7cf980 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -614,4 +614,10 @@
SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
offline state file '%s' for domain %s and server id %d
SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH_282=Could not retrieve file length of \
- file '%s'
\ No newline at end of file
+ log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE_283=An error occurred while recovering the \
+ replication change log file '%s'. The recovery has been aborted and this replication server \
+ will be removed from the replication topology. The change log file system may be read-only, \
+ full, or corrupt and must be fixed before this replication server can be used. The underlying error was: %s
+INFO_CHANGELOG_LOG_FILE_RECOVERED_284=Log file '%s' was successfully \
+ recovered by removing a partially written record
\ No newline at end of file
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
index 5aedc91..e5cf15e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -36,6 +36,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
+import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
@@ -47,7 +48,7 @@
* reach the beginning of previous record (or next record if offset equals 0).
* <p>
* The reader provides both sequential access, using the {@code readRecord()} method,
- * and reasonably fast random access, using the {@code seek(K, boolean)} method.
+ * and reasonably fast random access, using the {@code seekToRecord(K, boolean)} method.
*
* @param <K>
* Type of the key of a record, which must be comparable.
@@ -237,32 +238,30 @@
{
try
{
- final ByteString recordData = blockStartPosition == -1 ?
- readNextRecord() : readRecordFromBlockStartPosition(blockStartPosition);
+ if (blockStartPosition != -1)
+ {
+ positionToRecordFromBlockStart(blockStartPosition);
+ }
+ final ByteString recordData = readNextRecord();
return recordData != null ? parser.decodeRecord(recordData) : null;
}
- catch (IOException io)
+ catch (Exception io)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
}
- catch (DecodingException e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), e);
- }
}
/**
- * Reads the first record found after the provided file position of block
+ * Positions the reader at the record designated by the offset read at the given block
* start.
*
* @param blockStartPosition
* Position of read pointer in the file, expected to be the start of
* a block where a record offset is written.
- * @return the record read
* @throws IOException
* If an error occurs during read.
*/
- private ByteString readRecordFromBlockStartPosition(final long blockStartPosition) throws IOException
+ private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException
{
reader.seek(blockStartPosition);
if (blockStartPosition > 0)
@@ -275,7 +274,6 @@
reader.seek(blockStartPosition - offsetToRecord);
} // if offset is zero, reader is already well positioned
}
- return readNextRecord();
}
/**
@@ -321,7 +319,7 @@
}
return recordBytes.toByteString();
}
- catch(EOFException e)
+ catch (EOFException e)
{
// end of stream, no record or uncomplete record
return null;
@@ -353,7 +351,7 @@
/** Read the length of a record. */
private int readRecordLength(final int distanceToBlockStart) throws IOException
{
- final ByteStringBuilder lengthBytes = new ByteStringBuilder();
+ final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE);
if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
{
lengthBytes.append(reader, distanceToBlockStart);
@@ -388,7 +386,7 @@
*/
long searchClosestBlockStartToKey(K key) throws ChangelogException
{
- final long maxPos = getLastPositionInFile();
+ final long maxPos = getFileLength() - 1;
long lowPos = 0L;
long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
@@ -428,11 +426,11 @@
return -1;
}
- private long getLastPositionInFile() throws ChangelogException
+ private long getFileLength() throws ChangelogException
{
try
{
- return reader.length() - 1;
+ return reader.length();
}
catch (IOException e)
{
@@ -533,4 +531,45 @@
return distance == 0 ? 0 : blockSize - distance;
}
+ /**
+ * Check if the log file is valid, by reading the latest records at the end of
+ * the log file.
+ * <p>
+ * The intent of this method is to allow self-recovery in case a partial
+ * record has been written at the end of the file (e.g., after a server crash).
+ * <p>
+ * Any unexpected exception is considered as a severe error where
+ * self-recovery is not appropriate and thus will lead to a
+ * ChangelogException.
+ *
+ * @return -1 if the log is valid, or a non-negative number (possibly 0) if the log is
+ * invalid, where the number represents the last valid position in the log file.
+ * @throws ChangelogException
+ * if an error occurs while checking the log
+ */
+ long checkLogIsValid() throws ChangelogException
+ {
+ try
+ {
+ final long fileSize = getFileLength();
+ final long lastBlockStart = getClosestBlockStartBeforeOrAtPosition(fileSize);
+ positionToRecordFromBlockStart(lastBlockStart);
+
+ long lastValidPosition = lastBlockStart;
+ for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) {
+ parser.decodeRecord(recordData);
+ lastValidPosition = reader.getFilePointer();
+ }
+
+ final boolean isFileValid = lastValidPosition == fileSize;
+ return isFileValid ? -1 : lastValidPosition;
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
+ file.getPath(),
+ StaticUtils.stackTraceToSingleLineString(e)));
+ }
+ }
+
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
index 988e65c..a4acf52 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -203,21 +203,6 @@
}
/**
- * Get the number of changes.
- *
- * @return Returns the number of changes.
- */
- long getChangesCount()
- {
- final CSNLimits limits = csnLimits;
- if (limits.newestCSN != null && limits.oldestCSN != null)
- {
- return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1;
- }
- return 0;
- }
-
- /**
* Returns a cursor that allows to retrieve the update messages from this DB,
* starting at the position defined by the smallest CSN that is strictly
* higher than the provided CSN.
@@ -363,6 +348,16 @@
return log.getNumberOfRecords();
}
+ /**
+ * Dump this DB as text files, intended for debugging purposes only.
+ *
+ * @throws ChangelogException
+ * If an error occurs during dump
+ */
+ void dumpAsTextFiles() throws ChangelogException {
+ log.dumpAsTextFile(log.getPath());
+ }
+
/** Parser of records persisted in the ReplicaDB log. */
private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg>
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 28f0366..a987233 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -26,7 +26,6 @@
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
@@ -51,7 +50,6 @@
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.loggers.ErrorLogger;
-import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
@@ -105,8 +103,6 @@
*/
final class Log<K extends Comparable<K>, V> implements Closeable
{
- private static final DebugTracer TRACER = getTracer();
-
private static final String LOG_FILE_SUFFIX = ".log";
static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX;
@@ -702,6 +698,23 @@
}
}
+ /**
+ * Dump this log as text files, intended for debugging purposes only.
+ *
+ * @param dumpDirectory
+ * Directory that will contain the log files in text format,
+ * each with a ".txt" extension
+ * @throws ChangelogException
+ * If an error occurs during dump
+ */
+ void dumpAsTextFile(File dumpDirectory) throws ChangelogException
+ {
+ for (LogFile<K, V> logFile : logFiles.values())
+ {
+ logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+ }
+ }
+
/** {@inheritDoc} */
@Override
public void close()
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
index b27c78e..3b91bfa 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -26,6 +26,7 @@
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.io.BufferedWriter;
@@ -33,6 +34,7 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.RandomAccessFile;
import org.forgerock.util.Reject;
import org.opends.messages.Message;
@@ -103,7 +105,15 @@
this.isWriteEnabled = isWriteEnabled;
createLogFileIfNotExists();
- writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null;
+ if (isWriteEnabled)
+ {
+ ensureLogFileIsValid(parser);
+ writer = BlockLogWriter.newWriter(new LogWriter(logfile), parser);
+ }
+ else
+ {
+ writer = null;
+ }
readerPool = new LogReaderPool<K, V>(logfile, parser);
}
@@ -184,6 +194,44 @@
}
/**
+ * Ensure that log file is not corrupted, by checking it is valid and cleaning
+ * the end of file if necessary, to remove a partially written record.
+ * <p>
+ * If log file is cleaned to remove a partially written record, then a message
+ * is logged for information.
+ *
+ * @throws ChangelogException
+ * If an error occurs or if log file is corrupted and can't be
+ * cleaned
+ */
+ private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
+ {
+ BlockLogReader<K, V> reader = null;
+ try
+ {
+ final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
+ reader = BlockLogReader.newReader(logfile, readerWriter, parser) ;
+ final long lastValidPosition = reader.checkLogIsValid();
+ if (lastValidPosition != -1)
+ {
+ // truncate the file to point where last valid record has been read
+ readerWriter.setLength(lastValidPosition);
+ logError(INFO_CHANGELOG_LOG_FILE_RECOVERED.get(logfile.getPath()));
+ }
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
+ logfile.getPath(),
+ StaticUtils.stackTraceToSingleLineString(e)));
+ }
+ finally
+ {
+ StaticUtils.close(reader);
+ }
+ }
+
+ /**
* Add the provided record at the end of this log.
* <p>
* In order to ensure that record is written out of buffers and persisted
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
index 2af0e8c..83292b4 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -26,9 +26,11 @@
package org.opends.server.replication.server.changelog.file;
import java.io.File;
+import java.io.RandomAccessFile;
import org.opends.messages.Message;
import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.ByteSequenceReader;
@@ -38,6 +40,7 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
@@ -46,65 +49,51 @@
@Test(sequential=true)
public class LogFileTest extends DirectoryServerTestCase
{
- private static final String TEST_DIRECTORY_CHANGELOG = "test-output" + File.separator + "changelog";
+ private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+ private static final File TEST_LOG_FILE = new File(TEST_DIRECTORY, Log.HEAD_LOG_FILE_NAME);
static final StringRecordParser RECORD_PARSER = new StringRecordParser();
- static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() {
- @Override
- public Record<String, String> decodeRecord(ByteString data) throws DecodingException
- {
- throw new DecodingException(Message.raw("Error when parsing record"));
- }
- };
-
- static final RecordParser<String,String> CORRUPTING_RECORD_PARSER = new StringRecordParser() {
- @Override
- public ByteString encodeRecord(Record<String, String> record)
- {
- // write the key only, to corrupt the log file
- return new ByteStringBuilder().append(record.getKey()).toByteString();
- }
- };
-
@BeforeClass
public void createTestDirectory()
{
- File logDir = new File(TEST_DIRECTORY_CHANGELOG);
- logDir.mkdirs();
+ TEST_DIRECTORY.mkdirs();
}
@BeforeMethod
/** Create a new log file with ten records starting from (key1, value1) until (key10, value10). */
public void initialize() throws Exception
{
- File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME);
- if (theLogFile.exists())
+ if (TEST_LOG_FILE.exists())
{
- theLogFile.delete();
+ TEST_LOG_FILE.delete();
}
- LogFile<String, String> logFile = getLogFile(RECORD_PARSER);
+ LogFile<String, String> logFile = null;
+ try
+ {
+ logFile = getLogFile(RECORD_PARSER);
- for (int i = 1; i <= 10; i++)
- {
- logFile.append(Record.from(String.format("key%02d", i), "value"+i));
+ for (int i = 1; i <= 10; i++)
+ {
+ logFile.append(Record.from(String.format("key%02d", i), "value"+i));
+ }
}
- logFile.close();
+ finally
+ {
+ StaticUtils.close(logFile);
+ }
}
@AfterClass
public void cleanTestChangelogDirectory()
{
- final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
- if (rootPath.exists())
- {
- StaticUtils.recursiveDelete(rootPath);
- }
+ StaticUtils.recursiveDelete(TEST_DIRECTORY);
}
private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException
{
- return LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME), parser);
+ return LogFile.newAppendableLogFile(TEST_LOG_FILE, parser);
}
@Test
@@ -227,7 +216,9 @@
@Test(expectedExceptions=ChangelogException.class)
public void testCursorWhenParserFailsToRead() throws Exception
{
- LogFile<String, String> changelog = getLogFile(RECORD_PARSER_FAILING_TO_READ);
+ FailingStringRecordParser parser = new FailingStringRecordParser();
+ LogFile<String, String> changelog = getLogFile(parser);
+ parser.setFailToRead(true);
try {
changelog.getCursor("key");
}
@@ -266,6 +257,72 @@
}
}
+ @DataProvider(name = "corruptedRecordData")
+ Object[][] corruptedRecordData()
+ {
+ return new Object[][]
+ {
+ // write partial record size: 1 byte only (should be 4 bytes)
+ { 1, new ByteStringBuilder().append((byte) 0) },
+ // write partial record size: 3 bytes only (should be 4 bytes)
+ { 2, new ByteStringBuilder().append((byte) 0).append((byte) 0).append((byte) 0) },
+ // write size only
+ { 3, new ByteStringBuilder().append(10) },
+ // write size + key
+ { 4, new ByteStringBuilder().append(100).append("key") },
+ // write size + key + separator
+ { 5, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR) },
+ // write size + key + separator + partial value
+ { 6, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR).append("v") },
+ };
+ }
+
+ @Test(dataProvider="corruptedRecordData")
+ public void testRecoveryOnCorruptedLogFile(
+ @SuppressWarnings("unused") int unusedId,
+ ByteStringBuilder corruptedRecordData) throws Exception
+ {
+ LogFile<String, String> logFile = null;
+ DBCursor<Record<String, String>> cursor = null;
+ try
+ {
+ corruptTestLogFile(corruptedRecordData);
+
+ // opening the log file in append mode runs the validity check, which should repair the file
+ logFile = getLogFile(RECORD_PARSER);
+
+ // write a new valid record right after the ten records created by initialize()
+ logFile.append(Record.from(String.format("key%02d", 11), "value"+ 11));
+
+ // ensure log can be fully read including the new record
+ cursor = logFile.getCursor("key05");
+ assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5"));
+ assertThatCursorCanBeFullyRead(cursor, 6, 11);
+ }
+ finally
+ {
+ StaticUtils.close(cursor, logFile); // release the cursor as well, not only the log file
+ }
+ }
+
+ /**
+ * Append some raw data to the TEST_LOG_FILE. Intended to corrupt the log
+ * file.
+ */
+ private void corruptTestLogFile(ByteStringBuilder corruptedRecordData) throws Exception
+ {
+ RandomAccessFile output = null;
+ try {
+ output = new RandomAccessFile(TEST_LOG_FILE, "rwd");
+ output.seek(output.length());
+ output.write(corruptedRecordData.toByteArray());
+ }
+ finally
+ {
+ StaticUtils.close(output);
+ }
+ }
+
@Test
/**
* Test that changes are visible immediately to a reader after a write.
@@ -372,4 +429,25 @@
}
}
+ /** A parser that can be set to fail when reading. */
+ static class FailingStringRecordParser extends StringRecordParser
+ {
+ private boolean failToRead = false;
+
+ @Override
+ public Record<String, String> decodeRecord(ByteString data) throws DecodingException
+ {
+ if (failToRead)
+ {
+ throw new DecodingException(Message.raw("Error when parsing record"));
+ }
+ return super.decodeRecord(data);
+ }
+
+ void setFailToRead(boolean shouldFail)
+ {
+ failToRead = shouldFail;
+ }
+ }
+
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
index 321d256..72bf1b6 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -35,6 +35,7 @@
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -228,7 +229,9 @@
@Test(expectedExceptions=ChangelogException.class)
public void testCursorWhenParserFailsToRead() throws Exception
{
- Log<String, String> log = openLog(LogFileTest.RECORD_PARSER_FAILING_TO_READ);
+ FailingStringRecordParser parser = new FailingStringRecordParser();
+ Log<String, String> log = openLog(parser);
+ parser.setFailToRead(true);
try {
log.getCursor("key");
}
--
Gitblit v1.10.0