From b877a7554a1fa1c47a2982541972efe780dfad9a Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 11 Jun 2015 13:53:40 +0000
Subject: [PATCH] OPENDJ-1705 File based changelog: handle concurrency between purge and cursors
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java | 521 +++++++++++++++++++++++++++++++++++++++++++--------------
1 files changed, 392 insertions(+), 129 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
index 28b2e86..d503cb3 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -39,7 +39,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -53,6 +52,7 @@
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.forgerock.util.time.TimeService;
+import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -166,7 +166,7 @@
* The list of non-empty cursors opened on this log. Opened cursors may have
* to be updated when rotating the head log file.
*/
- private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
+ private final List<AbortableLogCursor<K, V>> openCursors = new CopyOnWriteArrayList<AbortableLogCursor<K, V>>();
/**
* A log file can be rotated once it has exceeded this size limit. The log file can have
@@ -246,6 +246,17 @@
return log;
}
+ /**
+ * Returns an empty cursor.
+ *
+ * @param <K> the type of keys.
+ * @param <V> the type of values.
+ * @return an empty cursor
+ */
+ static <K extends Comparable<K>, V> RepositionableCursor<K, V> getEmptyCursor() {
+ return new Log.EmptyCursor<K, V>();
+ }
+
/** Holds the parameters for log files rotation. */
static class LogRotationParameters {
@@ -331,6 +342,7 @@
this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
this.rotationIntervalInMillis = rotationParams.rotationInterval;
this.lastRotationTime = rotationParams.lastRotationTime;
+
this.referenceCount = 1;
final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -435,7 +447,7 @@
LogFile<K, V> headLogFile = getHeadLogFile();
if (mustRotate(headLogFile))
{
- logger.debug(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+ logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
rotateHeadLogFile();
headLogFile = getHeadLogFile();
@@ -459,13 +471,20 @@
if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
{
// rotate because file size exceeded threshold
+ logger.trace("Rotate log %s due to size: %s", logPath.getPath(), headLogFile.getSizeInBytes());
return true;
}
if (rotationIntervalInMillis > 0)
{
// rotate if time limit is reached
final long timeElapsed = timeService.since(lastRotationTime);
- return timeElapsed > rotationIntervalInMillis;
+ boolean shouldRotate = timeElapsed > rotationIntervalInMillis;
+ if (shouldRotate)
+ {
+ logger.trace("Rotate log %s due to time: time elapsed %s, rotation interval: %s",
+ logPath.getPath(), timeElapsed, rotationIntervalInMillis);
+ }
+ return shouldRotate;
}
return false;
}
@@ -512,15 +531,15 @@
*/
public RepositionableCursor<K, V> getCursor() throws ChangelogException
{
- LogCursor<K, V> cursor = null;
+ AbortableLogCursor<K, V> cursor = null;
sharedLock.lock();
try
{
if (isClosed)
{
- return new EmptyLogCursor<K, V>();
+ return new EmptyCursor<K, V>();
}
- cursor = new LogCursor<K, V>(this);
+ cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this));
cursor.positionTo(null, null, null);
registerCursor(cursor);
return cursor;
@@ -575,15 +594,15 @@
{
return getCursor();
}
- LogCursor<K, V> cursor = null;
+ AbortableLogCursor<K, V> cursor = null;
sharedLock.lock();
try
{
if (isClosed)
{
- return new EmptyLogCursor<K, V>();
+ return new EmptyCursor<K, V>();
}
- cursor = new LogCursor<K, V>(this);
+ cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this));
final boolean isSuccessfullyPositioned = cursor.positionTo(key, matchingStrategy, positionStrategy);
// Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy
if (isSuccessfullyPositioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
@@ -594,7 +613,7 @@
else
{
StaticUtils.close(cursor);
- return new EmptyLogCursor<K, V>();
+ return new EmptyCursor<K, V>();
}
}
catch (ChangelogException e)
@@ -701,12 +720,13 @@
return null;
}
final List<String> undeletableFiles = new ArrayList<String>();
- final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
+ final Iterator<LogFile<K, V>> entriesToPurge = logFilesToPurge.values().iterator();
while (entriesToPurge.hasNext())
{
- final LogFile<K, V> logFile = entriesToPurge.next().getValue();
+ final LogFile<K, V> logFile = entriesToPurge.next();
try
{
+ abortCursorsOpenOnLogFile(logFile);
logFile.close();
logFile.delete();
entriesToPurge.remove();
@@ -733,7 +753,23 @@
}
/**
+ * Abort all cursors opened on the provided log file.
+ */
+ private void abortCursorsOpenOnLogFile(LogFile<K, V> logFile)
+ {
+ for (AbortableLogCursor<K, V> cursor : openCursors)
+ {
+ if (cursor.isAccessingLogFile(logFile))
+ {
+ cursor.abort();
+ }
+ }
+ }
+
+ /**
* Empties the log, discarding all records it contains.
+ * <p>
+ * All cursors open on the log are aborted.
*
* @throws ChangelogException
* If cursors are opened on this log, or if a problem occurs during
@@ -750,9 +786,10 @@
}
if (!openCursors.isEmpty())
{
- // Allow opened cursors at this point, but turn them into empty cursors.
- // This behavior is needed by the change number indexer thread.
- switchCursorsOpenedIntoEmptyCursors();
+ // All open cursors are aborted, which means the change number indexer thread
+ // should manage AbortedChangelogCursorException specifically to avoid being
+ // stopped
+ abortAllOpenCursors();
}
// delete all log files
@@ -806,7 +843,6 @@
}
}
- /** {@inheritDoc} */
@Override
public void close()
{
@@ -856,19 +892,19 @@
sharedLock.lock();
try
{
- K key = null;
- for (LogFile<K, V> logFile : logFiles.values())
+ K key = null;
+ for (LogFile<K, V> logFile : logFiles.values())
+ {
+ final Record<K, V> record = logFile.getOldestRecord();
+ final V2 oldestValue = mapper.map(record.getValue());
+ if (oldestValue.compareTo(limitValue) > 0)
{
- final Record<K, V> record = logFile.getOldestRecord();
- final V2 oldestValue = mapper.map(record.getValue());
- if (oldestValue.compareTo(limitValue) > 0)
- {
- return key;
- }
- key = record.getKey();
+ return key;
}
- return key;
+ key = record.getKey();
}
+ return key;
+ }
finally
{
sharedLock.unlock();
@@ -918,7 +954,7 @@
private void rotateHeadLogFile() throws ChangelogException
{
// Temporarily disable cursors opened on head, saving their state
- final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
+ final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
final LogFile<K, V> headLogFile = getHeadLogFile();
final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
@@ -993,31 +1029,30 @@
}
/** Update the cursors that were pointing to head after a rotation of the head log file. */
- private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
+ private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
throws ChangelogException
{
- for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
+ for (Pair<AbortableLogCursor<K, V>, CursorState<K, V>> pair : cursors)
{
final CursorState<K, V> cursorState = pair.getSecond();
// Need to update the cursor only if it is pointing to the head log file
- if (isHeadLogFile(cursorState.logFile))
+ if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
{
final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
final LogFile<K, V> logFile = findLogFileFor(previousKey);
- final LogCursor<K, V> cursor = pair.getFirst();
+ final AbortableLogCursor<K, V> cursor = pair.getFirst();
cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
}
}
}
- private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
+ private void abortAllOpenCursors() throws ChangelogException
{
- for (LogCursor<K, V> cursor : openCursors)
+ for (AbortableLogCursor<K, V> cursor : openCursors)
{
- cursor.actAsEmptyCursor();
+ cursor.abort();
}
- openCursors.clear();
}
/**
@@ -1028,13 +1063,14 @@
* @throws ChangelogException
* If an error occurs.
*/
- private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
+ private List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead()
+ throws ChangelogException
{
- final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
- new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
- for (LogCursor<K, V> cursor : openCursors)
+ final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> openCursorsStates = new ArrayList<>();
+ final LogFile<K, V> headLogFile = getHeadLogFile();
+ for (AbortableLogCursor<K, V> cursor : openCursors)
{
- if (isHeadLogFile(cursor.currentLogFile))
+ if (cursor.isAccessingLogFile(headLogFile))
{
openCursorsStates.add(Pair.of(cursor, cursor.getState()));
cursor.closeUnderlyingCursor();
@@ -1058,7 +1094,7 @@
logFiles.put(bounds.getSecond(), logFile);
}
- private void registerCursor(final LogCursor<K, V> cursor)
+ private void registerCursor(final AbortableLogCursor<K, V> cursor)
{
openCursors.add(cursor);
}
@@ -1126,20 +1162,38 @@
}
/**
- * Implements a cursor on the log.
+ * Represents an internal view of a cursor on the log, with extended operations.
* <p>
- * The cursor uses the log shared lock to ensure reads are not done during a rotation.
- * <p>
- * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
- * method.
+ * This is an abstract class rather than an interface to allow reduced visibility of the methods.
*/
- private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
+ private abstract static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
+ {
+
+ /** Closes the underlying cursor. */
+ abstract void closeUnderlyingCursor();
+
+ /** Returns the state of this cursor. */
+ abstract CursorState<K, V> getState() throws ChangelogException;
+
+ /** Reinitialize this cursor to the provided state. */
+ abstract void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException;
+
+ /** Returns true if cursor is pointing on provided log file. */
+ abstract boolean isAccessingLogFile(LogFile<K, V> logFile);
+
+ }
+
+ /**
+ * Implements an internal cursor on the log.
+ * <p>
+ * This cursor is intended to be used <b>only<b> inside an {@link AbortableLogCursor},
+ * because it is relying on AbortableLogCursor for locking.
+ */
+ private static class InternalLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
{
private final Log<K, V> log;
-
private LogFile<K, V> currentLogFile;
private LogFileCursor<K, V> currentCursor;
- private boolean actAsEmptyCursor;
/**
* Creates a cursor on the provided log.
@@ -1149,66 +1203,40 @@
* @throws ChangelogException
* If an error occurs when creating the cursor.
*/
- private LogCursor(final Log<K, V> log) throws ChangelogException
+ private InternalLogCursor(final Log<K, V> log) throws ChangelogException
{
this.log = log;
- this.actAsEmptyCursor = false;
}
- /** {@inheritDoc} */
@Override
public Record<K, V> getRecord()
{
return currentCursor != null ? currentCursor.getRecord() : null;
}
- /** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
{
- if (actAsEmptyCursor)
+ final boolean hasNext = currentCursor.next();
+ if (hasNext)
{
- return false;
+ return true;
}
- log.sharedLock.lock();
- try
+ final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+ if (logFile != null)
{
- final boolean hasNext = currentCursor.next();
- if (hasNext)
- {
- return true;
- }
- final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
- if (logFile != null)
- {
- switchToLogFile(logFile);
- return currentCursor.next();
- }
- return false;
+ switchToLogFile(logFile);
+ return currentCursor.next();
}
- finally
- {
- log.sharedLock.unlock();
- }
+ return false;
}
- /** {@inheritDoc} */
@Override
public void close()
{
- log.sharedLock.lock();
- try
- {
- StaticUtils.close(currentCursor);
- log.unregisterCursor(this);
- }
- finally
- {
- log.sharedLock.unlock();
- }
+ StaticUtils.close(currentCursor);
}
- /** {@inheritDoc} */
@Override
public boolean positionTo(
final K key,
@@ -1216,54 +1244,39 @@
final PositionStrategy positionStrategy)
throws ChangelogException
{
- if (actAsEmptyCursor)
+ final LogFile<K, V> logFile = log.findLogFileFor(key);
+ if (logFile != currentLogFile)
{
- return false;
+ switchToLogFile(logFile);
}
- log.sharedLock.lock();
- try
- {
- final LogFile<K, V> logFile = log.findLogFileFor(key);
- if (logFile != currentLogFile)
- {
- switchToLogFile(logFile);
- }
- return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
- }
- finally
- {
- log.sharedLock.unlock();
- }
+ return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
}
/** Returns the state of this cursor. */
- private CursorState<K, V> getState() throws ChangelogException
+ @Override
+ CursorState<K, V> getState() throws ChangelogException
{
- return !actAsEmptyCursor ?
- new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
+ return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
}
- private void closeUnderlyingCursor()
+ @Override
+ void closeUnderlyingCursor()
{
StaticUtils.close(currentCursor);
}
/** Reinitialize this cursor to the provided state. */
- private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
+ @Override
+ void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
{
- if (!actAsEmptyCursor)
- {
- currentLogFile = cursorState.logFile;
- currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
- }
+ currentLogFile = cursorState.logFile;
+ currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
}
- /** Turn this cursor into an empty cursor, with no actual resource used. */
- private void actAsEmptyCursor()
+ @Override
+ boolean isAccessingLogFile(LogFile<K, V> logFile)
{
- currentLogFile = null;
- currentCursor = null;
- actAsEmptyCursor = true;
+ return currentLogFile != null && currentLogFile.equals(logFile);
}
/** Switch the cursor to the provided log file. */
@@ -1274,53 +1287,281 @@
currentCursor = currentLogFile.getCursor();
}
- /** {@inheritDoc} */
@Override
public String toString()
{
- return actAsEmptyCursor ?
- String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
- String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
+ return String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
log.logPath, currentLogFile.getFile().getName(), currentCursor);
}
+
}
- /** An empty cursor, that always return null records and false to {@code next()} method. */
- static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
+ /**
+ * An empty cursor, that always return null records and false to {@link #next()} method.
+ * <p>
+ * This class is thread-safe.
+ */
+ private static final class EmptyCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
{
- /** {@inheritDoc} */
@Override
public Record<K,V> getRecord()
{
return null;
}
- /** {@inheritDoc} */
@Override
public boolean next()
{
return false;
}
- /** {@inheritDoc} */
@Override
public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
{
return false;
}
- /** {@inheritDoc} */
@Override
public void close()
{
// nothing to do
}
- /** {@inheritDoc} */
@Override
public String toString()
{
- return "EmptyLogCursor";
+ return getClass().getSimpleName();
+ }
+ }
+
+ /**
+ * An aborted cursor, that throws AbortedChangelogCursorException on methods that can
+ * throw a ChangelogException and returns a default value on other methods.
+ * <p>
+ * Although this cursor is thread-safe, it is intended to be used inside an
+ * AbortableLogCursor which manages locking.
+ */
+ private static final class AbortedLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
+ {
+ /** Records the path of the log the aborted cursor was positioned on. */
+ private final File logPath;
+
+ AbortedLogCursor(File logPath)
+ {
+ this.logPath = logPath;
+ }
+
+ @Override
+ public Record<K,V> getRecord()
+ {
+ throw new IllegalStateException("this cursor is aborted");
+ }
+
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ throw abortedCursorException();
+ }
+
+ private AbortedChangelogCursorException abortedCursorException()
+ {
+ return new AbortedChangelogCursorException(ERR_CHANGELOG_CURSOR_ABORTED.get(logPath));
+ }
+
+ @Override
+ public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
+ {
+ throw abortedCursorException();
+ }
+
+ @Override
+ public void close()
+ {
+ // nothing to do
+ }
+
+ @Override
+ CursorState<K, V> getState() throws ChangelogException
+ {
+ throw abortedCursorException();
+ }
+
+ @Override
+ void closeUnderlyingCursor()
+ {
+ // nothing to do
+ }
+
+ @Override
+ void reinitializeTo(CursorState<K, V> cursorState) throws ChangelogException
+ {
+ throw abortedCursorException();
+ }
+
+ @Override
+ boolean isAccessingLogFile(LogFile<K, V> logFile)
+ {
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName();
+ }
+ }
+
+ /**
+ * A cursor on the log that can be aborted.
+ * <p>
+ * The cursor uses the log sharedLock to ensure no read can occur during a
+ * rotation, a clear or a purge.
+ * <p>
+ * Note that only public methods use the sharedLock. Protected methods are intended to be used only
+ * internally in the Log class when the log exclusiveLock is on.
+ * <p>
+ * The cursor can be be aborted by calling the {@link #abort()} method.
+ */
+ private static class AbortableLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
+ {
+ /** The log on which this cursor is created. */
+ private final Log<K, V> log;
+
+ /** The actual cursor on which methods are delegated. */
+ private LogCursor<K, V> delegate;
+
+ /** Indicates if the cursor must be aborted. */
+ private boolean mustAbort;
+
+ private AbortableLogCursor(Log<K,V> log, LogCursor<K, V> delegate)
+ {
+ this.log = log;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Record<K, V> getRecord()
+ {
+ log.sharedLock.lock();
+ try
+ {
+ return delegate.getRecord();
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ log.sharedLock.lock();
+ try
+ {
+ if (mustAbort)
+ {
+ delegate.close();
+ delegate = new AbortedLogCursor<K, V>(log.getPath());
+ mustAbort = false;
+ }
+ return delegate.next();
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ log.sharedLock.lock();
+ try
+ {
+ delegate.close();
+ log.unregisterCursor(this);
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
+ throws ChangelogException
+ {
+ log.sharedLock.lock();
+ try
+ {
+ return delegate.positionTo(key, matchStrategy, positionStrategy);
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ /**
+ * Aborts this cursor. Once aborted, a cursor throws an
+ * AbortedChangelogCursorException if it is used.
+ * <p>
+ * This method is called only when log.exclusiveLock has been acquired.
+ */
+ void abort()
+ {
+ mustAbort = true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method is called only when log.exclusiveLock has been acquired.
+ */
+ @Override
+ CursorState<K, V> getState() throws ChangelogException
+ {
+ return delegate.getState();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method is called only when log.exclusiveLock has been acquired.
+ */
+ @Override
+ void closeUnderlyingCursor()
+ {
+ delegate.closeUnderlyingCursor();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method is called only when log.exclusiveLock has been acquired.
+ */
+ @Override
+ void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
+ {
+ delegate.reinitializeTo(cursorState);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method is called only when log.exclusiveLock has been acquired.
+ */
+ @Override
+ boolean isAccessingLogFile(LogFile<K, V> logFile)
+ {
+ return delegate.isAccessingLogFile(logFile);
+ }
+
+ @Override
+ public String toString()
+ {
+ return delegate.toString();
}
}
@@ -1345,11 +1586,33 @@
/** The record the cursor is pointing to. */
private final Record<K,V> record;
+ private final boolean isValid;
+
+ /** Creates a non-valid state. */
+ private CursorState() {
+ logFile = null;
+ filePosition = 0;
+ record = null;
+ isValid = false;
+ }
+
+ /** Creates a valid state. */
private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
{
this.logFile = logFile;
this.filePosition = filePosition;
this.record = record;
+ isValid = true;
+ }
+
+ /**
+ * Indicates if this state is valid, i.e if it has non-null values.
+ *
+ * @return {@code true iff state is valid}
+ */
+ public boolean isValid()
+ {
+ return isValid;
}
}
--
Gitblit v1.10.0