From b877a7554a1fa1c47a2982541972efe780dfad9a Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 11 Jun 2015 13:53:40 +0000
Subject: [PATCH] OPENDJ-1705 File based changelog: handle concurrency between purge and cursors

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java |  521 +++++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 392 insertions(+), 129 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
index 28b2e86..d503cb3 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -39,7 +39,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -53,6 +52,7 @@
 import org.forgerock.util.Reject;
 import org.forgerock.util.Utils;
 import org.forgerock.util.time.TimeService;
+import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -166,7 +166,7 @@
    * The list of non-empty cursors opened on this log. Opened cursors may have
    * to be updated when rotating the head log file.
    */
-  private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
+  private final List<AbortableLogCursor<K, V>> openCursors = new CopyOnWriteArrayList<AbortableLogCursor<K, V>>();
 
   /**
    * A log file can be rotated once it has exceeded this size limit. The log file can have
@@ -246,6 +246,17 @@
     return log;
   }
 
+  /**
+   * Returns an empty cursor.
+   *
+   * @param <K> the type of keys.
+   * @param <V> the type of values.
+   * @return an empty cursor
+   */
+  static <K extends Comparable<K>, V> RepositionableCursor<K, V> getEmptyCursor() {
+    return new Log.EmptyCursor<K, V>();
+  }
+
   /** Holds the parameters for log files rotation. */
   static class LogRotationParameters {
 
@@ -331,6 +342,7 @@
     this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
     this.rotationIntervalInMillis = rotationParams.rotationInterval;
     this.lastRotationTime = rotationParams.lastRotationTime;
+
     this.referenceCount = 1;
 
     final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -435,7 +447,7 @@
       LogFile<K, V> headLogFile = getHeadLogFile();
       if (mustRotate(headLogFile))
       {
-        logger.debug(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+        logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
 
         rotateHeadLogFile();
         headLogFile = getHeadLogFile();
@@ -459,13 +471,20 @@
     if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
     {
       // rotate because file size exceeded threshold
+      logger.trace("Rotate log %s due to size: %s", logPath.getPath(), headLogFile.getSizeInBytes());
       return true;
     }
     if (rotationIntervalInMillis > 0)
     {
       // rotate if time limit is reached
       final long timeElapsed = timeService.since(lastRotationTime);
-      return timeElapsed > rotationIntervalInMillis;
+      boolean shouldRotate = timeElapsed > rotationIntervalInMillis;
+      if (shouldRotate)
+      {
+        logger.trace("Rotate log %s due to time: time elapsed %s, rotation interval: %s",
+            logPath.getPath(), timeElapsed, rotationIntervalInMillis);
+      }
+      return shouldRotate;
     }
     return false;
   }
@@ -512,15 +531,15 @@
    */
   public RepositionableCursor<K, V> getCursor() throws ChangelogException
   {
-    LogCursor<K, V> cursor = null;
+    AbortableLogCursor<K, V> cursor = null;
     sharedLock.lock();
     try
     {
       if (isClosed)
       {
-        return new EmptyLogCursor<K, V>();
+        return new EmptyCursor<K, V>();
       }
-      cursor = new LogCursor<K, V>(this);
+      cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this));
       cursor.positionTo(null, null, null);
       registerCursor(cursor);
       return cursor;
@@ -575,15 +594,15 @@
     {
       return getCursor();
     }
-    LogCursor<K, V> cursor = null;
+    AbortableLogCursor<K, V> cursor = null;
     sharedLock.lock();
     try
     {
       if (isClosed)
       {
-        return new EmptyLogCursor<K, V>();
+        return new EmptyCursor<K, V>();
       }
-      cursor = new LogCursor<K, V>(this);
+      cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this));
       final boolean isSuccessfullyPositioned = cursor.positionTo(key, matchingStrategy, positionStrategy);
       // Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy
       if (isSuccessfullyPositioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
@@ -594,7 +613,7 @@
       else
       {
         StaticUtils.close(cursor);
-        return new EmptyLogCursor<K, V>();
+        return new EmptyCursor<K, V>();
       }
     }
     catch (ChangelogException e)
@@ -701,12 +720,13 @@
         return null;
       }
       final List<String> undeletableFiles = new ArrayList<String>();
-      final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
+      final Iterator<LogFile<K, V>> entriesToPurge = logFilesToPurge.values().iterator();
       while (entriesToPurge.hasNext())
       {
-        final LogFile<K, V> logFile = entriesToPurge.next().getValue();
+        final LogFile<K, V> logFile = entriesToPurge.next();
         try
         {
+          abortCursorsOpenOnLogFile(logFile);
           logFile.close();
           logFile.delete();
           entriesToPurge.remove();
@@ -733,7 +753,23 @@
   }
 
   /**
+   * Abort all cursors opened on the provided log file.
+   */
+  private void abortCursorsOpenOnLogFile(LogFile<K, V> logFile)
+  {
+    for (AbortableLogCursor<K, V> cursor : openCursors)
+    {
+      if (cursor.isAccessingLogFile(logFile))
+      {
+        cursor.abort();
+      }
+    }
+  }
+
+  /**
    * Empties the log, discarding all records it contains.
+   * <p>
+   * All cursors open on the log are aborted.
    *
    * @throws ChangelogException
    *           If cursors are opened on this log, or if a problem occurs during
@@ -750,9 +786,10 @@
       }
       if (!openCursors.isEmpty())
       {
-        // Allow opened cursors at this point, but turn them into empty cursors.
-        // This behavior is needed by the change number indexer thread.
-        switchCursorsOpenedIntoEmptyCursors();
+        // All open cursors are aborted, which means the change number indexer thread
+        // should manage AbortedChangelogCursorException specifically to avoid being
+        // stopped
+        abortAllOpenCursors();
       }
 
       // delete all log files
@@ -806,7 +843,6 @@
     }
   }
 
-  /** {@inheritDoc} */
   @Override
   public void close()
   {
@@ -856,19 +892,19 @@
     sharedLock.lock();
     try
     {
-      K key = null;
-      for (LogFile<K, V> logFile : logFiles.values())
+    K key = null;
+    for (LogFile<K, V> logFile : logFiles.values())
+    {
+      final Record<K, V> record = logFile.getOldestRecord();
+      final V2 oldestValue = mapper.map(record.getValue());
+      if (oldestValue.compareTo(limitValue) > 0)
       {
-        final Record<K, V> record = logFile.getOldestRecord();
-        final V2 oldestValue = mapper.map(record.getValue());
-        if (oldestValue.compareTo(limitValue) > 0)
-        {
-          return key;
-        }
-        key = record.getKey();
+        return key;
       }
-      return key;
+      key = record.getKey();
     }
+    return key;
+  }
     finally
     {
       sharedLock.unlock();
@@ -918,7 +954,7 @@
   private void rotateHeadLogFile() throws ChangelogException
   {
     // Temporarily disable cursors opened on head, saving their state
-    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
+    final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
 
     final LogFile<K, V> headLogFile = getHeadLogFile();
     final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
@@ -993,31 +1029,30 @@
   }
 
   /** Update the cursors that were pointing to head after a rotation of the head log file. */
-  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
+  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
       throws ChangelogException
   {
-    for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
+    for (Pair<AbortableLogCursor<K, V>, CursorState<K, V>> pair : cursors)
     {
       final CursorState<K, V> cursorState = pair.getSecond();
 
       // Need to update the cursor only if it is pointing to the head log file
-      if (isHeadLogFile(cursorState.logFile))
+      if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
       {
         final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
         final LogFile<K, V> logFile = findLogFileFor(previousKey);
-        final LogCursor<K, V> cursor = pair.getFirst();
+        final AbortableLogCursor<K, V> cursor = pair.getFirst();
         cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
       }
     }
   }
 
-  private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
+  private void abortAllOpenCursors() throws ChangelogException
   {
-    for (LogCursor<K, V> cursor : openCursors)
+    for (AbortableLogCursor<K, V> cursor : openCursors)
     {
-      cursor.actAsEmptyCursor();
+      cursor.abort();
     }
-    openCursors.clear();
   }
 
   /**
@@ -1028,13 +1063,14 @@
    * @throws ChangelogException
    *           If an error occurs.
    */
-  private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
+  private List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead()
+      throws ChangelogException
   {
-    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
-        new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
-    for (LogCursor<K, V> cursor : openCursors)
+    final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> openCursorsStates = new ArrayList<>();
+    final LogFile<K, V> headLogFile = getHeadLogFile();
+    for (AbortableLogCursor<K, V> cursor : openCursors)
     {
-      if (isHeadLogFile(cursor.currentLogFile))
+      if (cursor.isAccessingLogFile(headLogFile))
       {
         openCursorsStates.add(Pair.of(cursor, cursor.getState()));
         cursor.closeUnderlyingCursor();
@@ -1058,7 +1094,7 @@
     logFiles.put(bounds.getSecond(), logFile);
   }
 
-  private void registerCursor(final LogCursor<K, V> cursor)
+  private void registerCursor(final AbortableLogCursor<K, V> cursor)
   {
     openCursors.add(cursor);
   }
@@ -1126,20 +1162,38 @@
   }
 
   /**
-   * Implements a cursor on the log.
+   * Represents an internal view of a cursor on the log, with extended operations.
    * <p>
-   * The cursor uses the log shared lock to ensure reads are not done during a rotation.
-   * <p>
-   * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
-   * method.
+   * This is an abstract class rather than an interface to allow reduced visibility of the methods.
    */
-  private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
+  private abstract static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
+  {
+
+    /** Closes the underlying cursor. */
+    abstract void closeUnderlyingCursor();
+
+    /** Returns the state of this cursor. */
+    abstract CursorState<K, V> getState() throws ChangelogException;
+
+    /** Reinitialize this cursor to the provided state. */
+    abstract void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException;
+
+    /** Returns true if cursor is pointing on provided log file. */
+    abstract boolean isAccessingLogFile(LogFile<K, V> logFile);
+
+  }
+
+  /**
+   * Implements an internal cursor on the log.
+   * <p>
+   * This cursor is intended to be used <b>only<b> inside an {@link AbortableLogCursor},
+   * because it is relying on AbortableLogCursor for locking.
+   */
+  private static class InternalLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
   {
     private final Log<K, V> log;
-
     private LogFile<K, V> currentLogFile;
     private LogFileCursor<K, V> currentCursor;
-    private boolean actAsEmptyCursor;
 
     /**
      * Creates a cursor on the provided log.
@@ -1149,66 +1203,40 @@
      * @throws ChangelogException
      *           If an error occurs when creating the cursor.
      */
-    private LogCursor(final Log<K, V> log) throws ChangelogException
+    private InternalLogCursor(final Log<K, V> log) throws ChangelogException
     {
       this.log = log;
-      this.actAsEmptyCursor = false;
     }
 
-    /** {@inheritDoc} */
     @Override
     public Record<K, V> getRecord()
     {
       return currentCursor != null ? currentCursor.getRecord() : null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean next() throws ChangelogException
     {
-      if (actAsEmptyCursor)
+      final boolean hasNext = currentCursor.next();
+      if (hasNext)
       {
-        return false;
+        return true;
       }
-      log.sharedLock.lock();
-      try
+      final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+      if (logFile != null)
       {
-        final boolean hasNext = currentCursor.next();
-        if (hasNext)
-        {
-          return true;
-        }
-        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
-        if (logFile != null)
-        {
-          switchToLogFile(logFile);
-          return currentCursor.next();
-        }
-        return false;
+        switchToLogFile(logFile);
+        return currentCursor.next();
       }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      return false;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close()
     {
-      log.sharedLock.lock();
-      try
-      {
-        StaticUtils.close(currentCursor);
-        log.unregisterCursor(this);
-      }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      StaticUtils.close(currentCursor);
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean positionTo(
         final K key,
@@ -1216,54 +1244,39 @@
         final PositionStrategy positionStrategy)
             throws ChangelogException
     {
-      if (actAsEmptyCursor)
+      final LogFile<K, V> logFile = log.findLogFileFor(key);
+      if (logFile != currentLogFile)
       {
-        return false;
+        switchToLogFile(logFile);
       }
-      log.sharedLock.lock();
-      try
-      {
-        final LogFile<K, V> logFile = log.findLogFileFor(key);
-        if (logFile != currentLogFile)
-        {
-          switchToLogFile(logFile);
-        }
-        return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
-      }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
     }
 
     /** Returns the state of this cursor. */
-    private CursorState<K, V> getState() throws ChangelogException
+    @Override
+    CursorState<K, V> getState() throws ChangelogException
     {
-      return !actAsEmptyCursor ?
-          new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
+      return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
     }
 
-    private void closeUnderlyingCursor()
+    @Override
+    void closeUnderlyingCursor()
     {
       StaticUtils.close(currentCursor);
     }
 
     /** Reinitialize this cursor to the provided state. */
-    private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
+    @Override
+    void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
     {
-      if (!actAsEmptyCursor)
-      {
-        currentLogFile = cursorState.logFile;
-        currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
-      }
+      currentLogFile = cursorState.logFile;
+      currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
     }
 
-    /** Turn this cursor into an empty cursor, with no actual resource used. */
-    private void actAsEmptyCursor()
+    @Override
+    boolean isAccessingLogFile(LogFile<K, V> logFile)
     {
-      currentLogFile = null;
-      currentCursor = null;
-      actAsEmptyCursor = true;
+      return currentLogFile != null && currentLogFile.equals(logFile);
     }
 
     /** Switch the cursor to the provided log file. */
@@ -1274,53 +1287,281 @@
       currentCursor = currentLogFile.getCursor();
     }
 
-    /** {@inheritDoc} */
     @Override
     public String toString()
     {
-      return actAsEmptyCursor ?
-          String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
-          String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
+      return  String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
               log.logPath, currentLogFile.getFile().getName(), currentCursor);
     }
+
   }
 
-  /** An empty cursor, that always return null records and false to {@code next()} method. */
-  static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
+  /**
+   * An empty cursor, that always return null records and false to {@link #next()} method.
+   * <p>
+   * This class is thread-safe.
+   */
+  private static final class EmptyCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
   {
-    /** {@inheritDoc} */
     @Override
     public Record<K,V> getRecord()
     {
       return null;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean next()
     {
       return false;
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
     {
       return false;
     }
 
-    /** {@inheritDoc} */
     @Override
     public void close()
     {
       // nothing to do
     }
 
-    /** {@inheritDoc} */
     @Override
     public String toString()
     {
-      return "EmptyLogCursor";
+      return getClass().getSimpleName();
+    }
+  }
+
+  /**
+   * An aborted cursor, that throws AbortedChangelogCursorException on methods that can
+   * throw a ChangelogException and returns a default value on other methods.
+   * <p>
+   * Although this cursor is thread-safe, it is intended to be used inside an
+   * AbortableLogCursor which manages locking.
+   */
+  private static final class AbortedLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
+  {
+    /** Records the path of the log the aborted cursor was positioned on. */
+    private final File logPath;
+
+    AbortedLogCursor(File logPath)
+    {
+      this.logPath = logPath;
+    }
+
+    @Override
+    public Record<K,V> getRecord()
+    {
+      throw new IllegalStateException("this cursor is aborted");
+    }
+
+    @Override
+    public boolean next() throws ChangelogException
+    {
+      throw abortedCursorException();
+    }
+
+    private AbortedChangelogCursorException abortedCursorException()
+    {
+      return new AbortedChangelogCursorException(ERR_CHANGELOG_CURSOR_ABORTED.get(logPath));
+    }
+
+    @Override
+    public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
+    {
+      throw abortedCursorException();
+    }
+
+    @Override
+    public void close()
+    {
+      // nothing to do
+    }
+
+    @Override
+    CursorState<K, V> getState() throws ChangelogException
+    {
+      throw abortedCursorException();
+    }
+
+    @Override
+    void closeUnderlyingCursor()
+    {
+      // nothing to do
+    }
+
+    @Override
+    void reinitializeTo(CursorState<K, V> cursorState) throws ChangelogException
+    {
+      throw abortedCursorException();
+    }
+
+    @Override
+    boolean isAccessingLogFile(LogFile<K, V> logFile)
+    {
+      return false;
+    }
+
+    @Override
+    public String toString()
+    {
+      return getClass().getSimpleName();
+    }
+  }
+
+  /**
+   * A cursor on the log that can be aborted.
+   * <p>
+   * The cursor uses the log sharedLock to ensure no read can occur during a
+   * rotation, a clear or a purge.
+   * <p>
+   * Note that only public methods use the sharedLock. Protected methods are intended to be used only
+   * internally in the Log class when the log exclusiveLock is on.
+   * <p>
+   * The cursor can be be aborted by calling the {@link #abort()} method.
+   */
+  private static class AbortableLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
+  {
+    /** The log on which this cursor is created. */
+    private final Log<K, V> log;
+
+    /** The actual cursor on which methods are delegated. */
+    private LogCursor<K, V> delegate;
+
+    /** Indicates if the cursor must be aborted. */
+    private boolean mustAbort;
+
+    private AbortableLogCursor(Log<K,V> log, LogCursor<K, V> delegate)
+    {
+      this.log = log;
+      this.delegate = delegate;
+    }
+
+    @Override
+    public Record<K, V> getRecord()
+    {
+      log.sharedLock.lock();
+      try
+      {
+        return delegate.getRecord();
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
+    }
+
+    @Override
+    public boolean next() throws ChangelogException
+    {
+      log.sharedLock.lock();
+      try
+      {
+        if (mustAbort)
+        {
+          delegate.close();
+          delegate = new AbortedLogCursor<K, V>(log.getPath());
+          mustAbort = false;
+        }
+        return delegate.next();
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
+    }
+
+    @Override
+    public void close()
+    {
+      log.sharedLock.lock();
+      try
+      {
+        delegate.close();
+        log.unregisterCursor(this);
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
+    }
+
+    @Override
+    public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
+        throws ChangelogException
+    {
+      log.sharedLock.lock();
+      try
+      {
+        return delegate.positionTo(key, matchStrategy, positionStrategy);
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
+    }
+
+    /**
+     * Aborts this cursor. Once aborted, a cursor throws an
+     * AbortedChangelogCursorException if it is used.
+     * <p>
+     * This method is called only when log.exclusiveLock has been acquired.
+     */
+    void abort()
+    {
+      mustAbort = true;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This method is called only when log.exclusiveLock has been acquired.
+     */
+    @Override
+    CursorState<K, V> getState() throws ChangelogException
+    {
+      return delegate.getState();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This method is called only when log.exclusiveLock has been acquired.
+     */
+    @Override
+    void closeUnderlyingCursor()
+    {
+      delegate.closeUnderlyingCursor();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This method is called only when log.exclusiveLock has been acquired.
+     */
+    @Override
+    void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
+    {
+      delegate.reinitializeTo(cursorState);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This method is called only when log.exclusiveLock has been acquired.
+     */
+    @Override
+    boolean isAccessingLogFile(LogFile<K, V> logFile)
+    {
+      return delegate.isAccessingLogFile(logFile);
+    }
+
+    @Override
+    public String toString()
+    {
+      return delegate.toString();
     }
   }
 
@@ -1345,11 +1586,33 @@
     /** The record the cursor is pointing to. */
     private final Record<K,V> record;
 
+    private final boolean isValid;
+
+    /** Creates a non-valid state. */
+    private CursorState() {
+      logFile = null;
+      filePosition = 0;
+      record = null;
+      isValid = false;
+    }
+
+    /** Creates a valid state. */
     private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
     {
       this.logFile = logFile;
       this.filePosition = filePosition;
       this.record = record;
+      isValid = true;
+    }
+
+    /**
+     * Indicates if this state is valid, i.e if it has non-null values.
+     *
+     * @return {@code true iff state is valid}
+     */
+    public boolean isValid()
+    {
+      return isValid;
     }
   }
 

--
Gitblit v1.10.0