From dbc982944cd13543eaa810c6eb0b78a7c2524d86 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 17 Jun 2014 13:40:54 +0000
Subject: [PATCH] OPENDJ-1449 : File-based changelog should handle partially written record left over from a previous failure CR-3768
---
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java | 73 ++++++++++++++++++++++++++++--------
1 files changed, 56 insertions(+), 17 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
index 5aedc91..e5cf15e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -36,6 +36,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
+import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
@@ -47,7 +48,7 @@
* reach the beginning of previous record (or next record if offset equals 0).
* <p>
* The reader provides both sequential access, using the {@code readRecord()} method,
- * and reasonably fast random access, using the {@code seek(K, boolean)} method.
+ * and reasonably fast random access, using the {@code seekToRecord(K, boolean)} method.
*
* @param <K>
* Type of the key of a record, which must be comparable.
@@ -237,32 +238,30 @@
{
try
{
- final ByteString recordData = blockStartPosition == -1 ?
- readNextRecord() : readRecordFromBlockStartPosition(blockStartPosition);
+ if (blockStartPosition != -1)
+ {
+ positionToRecordFromBlockStart(blockStartPosition);
+ }
+ final ByteString recordData = readNextRecord();
return recordData != null ? parser.decodeRecord(recordData) : null;
}
- catch (IOException io)
+ catch (Exception io)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
}
- catch (DecodingException e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), e);
- }
}
/**
- * Reads the first record found after the provided file position of block
+ * Position to the record given by the offset read from provided block
* start.
*
* @param blockStartPosition
* Position of read pointer in the file, expected to be the start of
* a block where a record offset is written.
- * @return the record read
* @throws IOException
* If an error occurs during read.
*/
- private ByteString readRecordFromBlockStartPosition(final long blockStartPosition) throws IOException
+ private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException
{
reader.seek(blockStartPosition);
if (blockStartPosition > 0)
@@ -275,7 +274,6 @@
reader.seek(blockStartPosition - offsetToRecord);
} // if offset is zero, reader is already well positioned
}
- return readNextRecord();
}
/**
@@ -321,7 +319,7 @@
}
return recordBytes.toByteString();
}
- catch(EOFException e)
+ catch (EOFException e)
{
// end of stream, no record or uncomplete record
return null;
@@ -353,7 +351,7 @@
/** Read the length of a record. */
private int readRecordLength(final int distanceToBlockStart) throws IOException
{
- final ByteStringBuilder lengthBytes = new ByteStringBuilder();
+ final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE);
if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
{
lengthBytes.append(reader, distanceToBlockStart);
@@ -388,7 +386,7 @@
*/
long searchClosestBlockStartToKey(K key) throws ChangelogException
{
- final long maxPos = getLastPositionInFile();
+ final long maxPos = getFileLength() - 1;
long lowPos = 0L;
long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
@@ -428,11 +426,11 @@
return -1;
}
- private long getLastPositionInFile() throws ChangelogException
+ private long getFileLength() throws ChangelogException
{
try
{
- return reader.length() - 1;
+ return reader.length();
}
catch (IOException e)
{
@@ -533,4 +531,45 @@
return distance == 0 ? 0 : blockSize - distance;
}
+ /**
+ * Check if the log file is valid, by reading the latest records at the end of
+ * the log file.
+ * <p>
+ * The intent of this method is to allow self-recovery in case a partial
+ * record as been written at the end of file (eg, after a server crash).
+ * <p>
+ * Any unexpected exception is considered as a severe error where
+ * self-recovery is not appropriate and thus will lead to a
+ * ChangelogException.
+ *
+ * @return -1 if log is valid, or a positive number if log is invalid, where
+ * the number represents the last valid position in the log file.
+ * @throws ChangelogException
+ * if an error occurs while checking the log
+ */
+ long checkLogIsValid() throws ChangelogException
+ {
+ try
+ {
+ final long fileSize = getFileLength();
+ final long lastBlockStart = getClosestBlockStartBeforeOrAtPosition(fileSize);
+ positionToRecordFromBlockStart(lastBlockStart);
+
+ long lastValidPosition = lastBlockStart;
+ for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) {
+ parser.decodeRecord(recordData);
+ lastValidPosition = reader.getFilePointer();
+ }
+
+ final boolean isFileValid = lastValidPosition == fileSize;
+ return isFileValid ? -1 : lastValidPosition;
+ }
+ catch (Exception e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
+ file.getPath(),
+ StaticUtils.stackTraceToSingleLineString(e)));
+ }
+ }
+
}
--
Gitblit v1.10.0