From dbc982944cd13543eaa810c6eb0b78a7c2524d86 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 17 Jun 2014 13:40:54 +0000
Subject: [PATCH] OPENDJ-1449 : File-based changelog should handle partially written record left over from a previous failure CR-3768

---
 opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java |   73 ++++++++++++++++++++++++++++--------
 1 files changed, 56 insertions(+), 17 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
index 5aedc91..e5cf15e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -36,6 +36,7 @@
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.types.ByteString;
 import org.opends.server.types.ByteStringBuilder;
+import org.opends.server.util.StaticUtils;
 
 import com.forgerock.opendj.util.Pair;
 
@@ -47,7 +48,7 @@
  * reach the beginning of previous record (or next record if offset equals 0).
  * <p>
  * The reader provides both sequential access, using the {@code readRecord()} method,
- * and reasonably fast random access, using the {@code seek(K, boolean)} method.
+ * and reasonably fast random access, using the {@code seekToRecord(K, boolean)} method.
  *
  * @param <K>
  *          Type of the key of a record, which must be comparable.
@@ -237,32 +238,30 @@
   {
     try
     {
-      final ByteString recordData = blockStartPosition == -1 ?
-          readNextRecord() : readRecordFromBlockStartPosition(blockStartPosition);
+      if (blockStartPosition != -1)
+      {
+        positionToRecordFromBlockStart(blockStartPosition);
+      }
+      final ByteString recordData = readNextRecord();
       return recordData != null ? parser.decodeRecord(recordData) : null;
     }
-    catch (IOException io)
+    catch (Exception io)
     {
       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
     }
-    catch (DecodingException e)
-    {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), e);
-    }
   }
 
   /**
-   * Reads the first record found after the provided file position of block
+   * Position to the record given by the offset read from provided block
    * start.
    *
    * @param blockStartPosition
    *          Position of read pointer in the file, expected to be the start of
    *          a block where a record offset is written.
-   * @return the record read
    * @throws IOException
    *           If an error occurs during read.
    */
-  private ByteString readRecordFromBlockStartPosition(final long blockStartPosition) throws IOException
+  private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException
   {
     reader.seek(blockStartPosition);
     if (blockStartPosition > 0)
@@ -275,7 +274,6 @@
         reader.seek(blockStartPosition - offsetToRecord);
       } // if offset is zero, reader is already well positioned
     }
-    return readNextRecord();
   }
 
   /**
@@ -321,7 +319,7 @@
       }
       return recordBytes.toByteString();
     }
-    catch(EOFException e)
+    catch (EOFException e)
     {
       // end of stream, no record or uncomplete record
       return null;
@@ -353,7 +351,7 @@
   /** Read the length of a record. */
   private int readRecordLength(final int distanceToBlockStart) throws IOException
   {
-    final ByteStringBuilder lengthBytes = new ByteStringBuilder();
+    final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE);
     if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
     {
       lengthBytes.append(reader, distanceToBlockStart);
@@ -388,7 +386,7 @@
    */
   long searchClosestBlockStartToKey(K key) throws ChangelogException
   {
-    final long maxPos = getLastPositionInFile();
+    final long maxPos = getFileLength() - 1;
     long lowPos = 0L;
     long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
 
@@ -428,11 +426,11 @@
     return -1;
   }
 
-  private long getLastPositionInFile() throws ChangelogException
+  private long getFileLength() throws ChangelogException
   {
     try
     {
-      return reader.length() - 1;
+      return reader.length();
     }
     catch (IOException e)
     {
@@ -533,4 +531,45 @@
     return distance == 0 ? 0 : blockSize - distance;
   }
 
+  /**
+   * Check if the log file is valid, by reading the latest records at the end of
+   * the log file.
+   * <p>
+   * The intent of this method is to allow self-recovery in case a partial
+   * record as been written at the end of file (eg, after a server crash).
+   * <p>
+   * Any unexpected exception is considered as a severe error where
+   * self-recovery is not appropriate and thus will lead to a
+   * ChangelogException.
+   *
+   * @return -1 if log is valid, or a positive number if log is invalid, where
+   *         the number represents the last valid position in the log file.
+   * @throws ChangelogException
+   *           if an error occurs while checking the log
+   */
+ long checkLogIsValid() throws ChangelogException
+ {
+   try
+   {
+     final long fileSize = getFileLength();
+     final long lastBlockStart = getClosestBlockStartBeforeOrAtPosition(fileSize);
+     positionToRecordFromBlockStart(lastBlockStart);
+
+     long lastValidPosition = lastBlockStart;
+     for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) {
+       parser.decodeRecord(recordData);
+       lastValidPosition = reader.getFilePointer();
+     }
+
+     final boolean isFileValid = lastValidPosition == fileSize;
+     return isFileValid ? -1 : lastValidPosition;
+   }
+   catch (Exception e)
+   {
+     throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
+         file.getPath(),
+         StaticUtils.stackTraceToSingleLineString(e)));
+   }
+ }
+
 }

--
Gitblit v1.10.0