| | |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | |
| | | import org.forgerock.util.Reject; |
| | | import org.forgerock.util.Utils; |
| | | import org.forgerock.util.time.TimeService; |
| | | import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | |
| | | * The list of non-empty cursors opened on this log. Opened cursors may have |
| | | * to be updated when rotating the head log file. |
| | | */ |
| | | private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>(); |
| | | private final List<AbortableLogCursor<K, V>> openCursors = new CopyOnWriteArrayList<AbortableLogCursor<K, V>>(); |
| | | |
| | | /** |
| | | * A log file can be rotated once it has exceeded this size limit. The log file can have |
| | |
| | | return log; |
| | | } |
| | | |
| | | /** |
| | | * Returns an empty cursor. |
| | | * |
| | | * @param <K> the type of keys. |
| | | * @param <V> the type of values. |
| | | * @return an empty cursor |
| | | */ |
| | | static <K extends Comparable<K>, V> RepositionableCursor<K, V> getEmptyCursor() { |
| | | return new Log.EmptyCursor<K, V>(); |
| | | } |
| | | |
| | | /** Holds the parameters for log files rotation. */ |
| | | static class LogRotationParameters { |
| | | |
| | |
| | | this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes; |
| | | this.rotationIntervalInMillis = rotationParams.rotationInterval; |
| | | this.lastRotationTime = rotationParams.lastRotationTime; |
| | | |
| | | this.referenceCount = 1; |
| | | |
| | | final ReadWriteLock lock = new ReentrantReadWriteLock(false); |
| | |
| | | LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | if (mustRotate(headLogFile)) |
| | | { |
| | | logger.debug(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes())); |
| | | logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes())); |
| | | |
| | | rotateHeadLogFile(); |
| | | headLogFile = getHeadLogFile(); |
| | |
| | | if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes) |
| | | { |
| | | // rotate because file size exceeded threshold |
| | | logger.trace("Rotate log %s due to size: %s", logPath.getPath(), headLogFile.getSizeInBytes()); |
| | | return true; |
| | | } |
| | | if (rotationIntervalInMillis > 0) |
| | | { |
| | | // rotate if time limit is reached |
| | | final long timeElapsed = timeService.since(lastRotationTime); |
| | | return timeElapsed > rotationIntervalInMillis; |
| | | boolean shouldRotate = timeElapsed > rotationIntervalInMillis; |
| | | if (shouldRotate) |
| | | { |
| | | logger.trace("Rotate log %s due to time: time elapsed %s, rotation interval: %s", |
| | | logPath.getPath(), timeElapsed, rotationIntervalInMillis); |
| | | } |
| | | return shouldRotate; |
| | | } |
| | | return false; |
| | | } |
| | |
| | | */ |
| | | public RepositionableCursor<K, V> getCursor() throws ChangelogException |
| | | { |
| | | LogCursor<K, V> cursor = null; |
| | | AbortableLogCursor<K, V> cursor = null; |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return new EmptyLogCursor<K, V>(); |
| | | return new EmptyCursor<K, V>(); |
| | | } |
| | | cursor = new LogCursor<K, V>(this); |
| | | cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this)); |
| | | cursor.positionTo(null, null, null); |
| | | registerCursor(cursor); |
| | | return cursor; |
| | |
| | | { |
| | | return getCursor(); |
| | | } |
| | | LogCursor<K, V> cursor = null; |
| | | AbortableLogCursor<K, V> cursor = null; |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return new EmptyLogCursor<K, V>(); |
| | | return new EmptyCursor<K, V>(); |
| | | } |
| | | cursor = new LogCursor<K, V>(this); |
| | | cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this)); |
| | | final boolean isSuccessfullyPositioned = cursor.positionTo(key, matchingStrategy, positionStrategy); |
| | | // Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy |
| | | if (isSuccessfullyPositioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY) |
| | |
| | | else |
| | | { |
| | | StaticUtils.close(cursor); |
| | | return new EmptyLogCursor<K, V>(); |
| | | return new EmptyCursor<K, V>(); |
| | | } |
| | | } |
| | | catch (ChangelogException e) |
| | |
| | | return null; |
| | | } |
| | | final List<String> undeletableFiles = new ArrayList<String>(); |
| | | final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator(); |
| | | final Iterator<LogFile<K, V>> entriesToPurge = logFilesToPurge.values().iterator(); |
| | | while (entriesToPurge.hasNext()) |
| | | { |
| | | final LogFile<K, V> logFile = entriesToPurge.next().getValue(); |
| | | final LogFile<K, V> logFile = entriesToPurge.next(); |
| | | try |
| | | { |
| | | abortCursorsOpenOnLogFile(logFile); |
| | | logFile.close(); |
| | | logFile.delete(); |
| | | entriesToPurge.remove(); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Flags for abortion every registered cursor that is currently accessing the given log file. |
| | | * |
| | | * @param logFile |
| | | * the log file whose cursors must be aborted |
| | | */ |
| | | private void abortCursorsOpenOnLogFile(LogFile<K, V> logFile) |
| | | { |
| | | for (AbortableLogCursor<K, V> openCursor : openCursors) |
| | | { |
| | | if (!openCursor.isAccessingLogFile(logFile)) |
| | | { |
| | | continue; |
| | | } |
| | | openCursor.abort(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Empties the log, discarding all records it contains. |
| | | * <p> |
| | | * All cursors open on the log are aborted. |
| | | * |
| | | * @throws ChangelogException |
| | | * If cursors are opened on this log, or if a problem occurs during |
| | |
| | | } |
| | | if (!openCursors.isEmpty()) |
| | | { |
| | | // Allow opened cursors at this point, but turn them into empty cursors. |
| | | // This behavior is needed by the change number indexer thread. |
| | | switchCursorsOpenedIntoEmptyCursors(); |
| | | // All open cursors are aborted, which means the change number indexer thread |
| | | // should manage AbortedChangelogCursorException specifically to avoid being |
| | | // stopped |
| | | abortAllOpenCursors(); |
| | | } |
| | | |
| | | // delete all log files |
| | |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | K key = null; |
| | | for (LogFile<K, V> logFile : logFiles.values()) |
| | | K key = null; |
| | | for (LogFile<K, V> logFile : logFiles.values()) |
| | | { |
| | | final Record<K, V> record = logFile.getOldestRecord(); |
| | | final V2 oldestValue = mapper.map(record.getValue()); |
| | | if (oldestValue.compareTo(limitValue) > 0) |
| | | { |
| | | final Record<K, V> record = logFile.getOldestRecord(); |
| | | final V2 oldestValue = mapper.map(record.getValue()); |
| | | if (oldestValue.compareTo(limitValue) > 0) |
| | | { |
| | | return key; |
| | | } |
| | | key = record.getKey(); |
| | | return key; |
| | | } |
| | | return key; |
| | | key = record.getKey(); |
| | | } |
| | | return key; |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | |
| | | private void rotateHeadLogFile() throws ChangelogException |
| | | { |
| | | // Temporarily disable cursors opened on head, saving their state |
| | | final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead(); |
| | | final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead(); |
| | | |
| | | final LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile)); |
| | |
| | | } |
| | | |
| | | /** Update the cursors that were pointing to head after a rotation of the head log file. */ |
| | | private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors) |
| | | private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors) |
| | | throws ChangelogException |
| | | { |
| | | for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors) |
| | | for (Pair<AbortableLogCursor<K, V>, CursorState<K, V>> pair : cursors) |
| | | { |
| | | final CursorState<K, V> cursorState = pair.getSecond(); |
| | | |
| | | // Need to update the cursor only if it is pointing to the head log file |
| | | if (isHeadLogFile(cursorState.logFile)) |
| | | if (cursorState.isValid() && isHeadLogFile(cursorState.logFile)) |
| | | { |
| | | final K previousKey = logFiles.lowerKey(recordParser.getMaxKey()); |
| | | final LogFile<K, V> logFile = findLogFileFor(previousKey); |
| | | final LogCursor<K, V> cursor = pair.getFirst(); |
| | | final AbortableLogCursor<K, V> cursor = pair.getFirst(); |
| | | cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record)); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException |
| | | private void abortAllOpenCursors() throws ChangelogException |
| | | { |
| | | for (LogCursor<K, V> cursor : openCursors) |
| | | for (AbortableLogCursor<K, V> cursor : openCursors) |
| | | { |
| | | cursor.actAsEmptyCursor(); |
| | | cursor.abort(); |
| | | } |
| | | openCursors.clear(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException |
| | | private List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() |
| | | throws ChangelogException |
| | | { |
| | | final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates = |
| | | new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>(); |
| | | for (LogCursor<K, V> cursor : openCursors) |
| | | final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> openCursorsStates = new ArrayList<>(); |
| | | final LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | for (AbortableLogCursor<K, V> cursor : openCursors) |
| | | { |
| | | if (isHeadLogFile(cursor.currentLogFile)) |
| | | if (cursor.isAccessingLogFile(headLogFile)) |
| | | { |
| | | openCursorsStates.add(Pair.of(cursor, cursor.getState())); |
| | | cursor.closeUnderlyingCursor(); |
| | |
| | | logFiles.put(bounds.getSecond(), logFile); |
| | | } |
| | | |
| | | private void registerCursor(final LogCursor<K, V> cursor) |
| | | private void registerCursor(final AbortableLogCursor<K, V> cursor) |
| | | { |
| | | openCursors.add(cursor); |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Implements a cursor on the log. |
| | | * Represents an internal view of a cursor on the log, with extended operations. |
| | | * <p> |
| | | * The cursor uses the log shared lock to ensure reads are not done during a rotation. |
| | | * <p> |
| | | * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()} |
| | | * method. |
| | | * This is an abstract class rather than an interface to allow reduced visibility of the methods. |
| | | */ |
| | | private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V> |
| | | private abstract static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V> |
| | | { |
| | | |
| | | /** Closes the underlying cursor. */ |
| | | abstract void closeUnderlyingCursor(); |
| | | |
| | | /** Returns the state of this cursor. */ |
| | | abstract CursorState<K, V> getState() throws ChangelogException; |
| | | |
| | | /** Reinitialize this cursor to the provided state. */ |
| | | abstract void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException; |
| | | |
| | | /** Returns true if cursor is pointing on provided log file. */ |
| | | abstract boolean isAccessingLogFile(LogFile<K, V> logFile); |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Implements an internal cursor on the log. |
| | | * <p> |
| | | * This cursor is intended to be used <b>only</b> inside an {@link AbortableLogCursor}, |
| | | * because it is relying on AbortableLogCursor for locking. |
| | | */ |
| | | private static class InternalLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V> |
| | | { |
| | | private final Log<K, V> log; |
| | | |
| | | private LogFile<K, V> currentLogFile; |
| | | private LogFileCursor<K, V> currentCursor; |
| | | private boolean actAsEmptyCursor; |
| | | |
| | | /** |
| | | * Creates a cursor on the provided log. |
| | |
| | | * @throws ChangelogException |
| | | * If an error occurs when creating the cursor. |
| | | */ |
| | | private LogCursor(final Log<K, V> log) throws ChangelogException |
| | | private InternalLogCursor(final Log<K, V> log) throws ChangelogException |
| | | { |
| | | this.log = log; |
| | | this.actAsEmptyCursor = false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<K, V> getRecord() |
| | | { |
| | | return currentCursor != null ? currentCursor.getRecord() : null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | if (actAsEmptyCursor) |
| | | final boolean hasNext = currentCursor.next(); |
| | | if (hasNext) |
| | | { |
| | | return false; |
| | | return true; |
| | | } |
| | | log.sharedLock.lock(); |
| | | try |
| | | final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile); |
| | | if (logFile != null) |
| | | { |
| | | final boolean hasNext = currentCursor.next(); |
| | | if (hasNext) |
| | | { |
| | | return true; |
| | | } |
| | | final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile); |
| | | if (logFile != null) |
| | | { |
| | | switchToLogFile(logFile); |
| | | return currentCursor.next(); |
| | | } |
| | | return false; |
| | | switchToLogFile(logFile); |
| | | return currentCursor.next(); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | StaticUtils.close(currentCursor); |
| | | log.unregisterCursor(this); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | StaticUtils.close(currentCursor); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo( |
| | | final K key, |
| | |
| | | final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | if (actAsEmptyCursor) |
| | | final LogFile<K, V> logFile = log.findLogFileFor(key); |
| | | if (logFile != currentLogFile) |
| | | { |
| | | return false; |
| | | switchToLogFile(logFile); |
| | | } |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | final LogFile<K, V> logFile = log.findLogFileFor(key); |
| | | if (logFile != currentLogFile) |
| | | { |
| | | switchToLogFile(logFile); |
| | | } |
| | | return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy); |
| | | } |
| | | |
| | | /** Returns the state of this cursor. */ |
| | | private CursorState<K, V> getState() throws ChangelogException |
| | | @Override |
| | | CursorState<K, V> getState() throws ChangelogException |
| | | { |
| | | return !actAsEmptyCursor ? |
| | | new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null; |
| | | return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()); |
| | | } |
| | | |
| | | private void closeUnderlyingCursor() |
| | | @Override |
| | | void closeUnderlyingCursor() |
| | | { |
| | | StaticUtils.close(currentCursor); |
| | | } |
| | | |
| | | /** Reinitialize this cursor to the provided state. */ |
| | | private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException |
| | | @Override |
| | | void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException |
| | | { |
| | | if (!actAsEmptyCursor) |
| | | { |
| | | currentLogFile = cursorState.logFile; |
| | | currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition); |
| | | } |
| | | currentLogFile = cursorState.logFile; |
| | | currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition); |
| | | } |
| | | |
| | | /** Turn this cursor into an empty cursor, with no actual resource used. */ |
| | | private void actAsEmptyCursor() |
| | | @Override |
| | | boolean isAccessingLogFile(LogFile<K, V> logFile) |
| | | { |
| | | currentLogFile = null; |
| | | currentCursor = null; |
| | | actAsEmptyCursor = true; |
| | | return currentLogFile != null && currentLogFile.equals(logFile); |
| | | } |
| | | |
| | | /** Switch the cursor to the provided log file. */ |
| | |
| | | currentCursor = currentLogFile.getCursor(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return actAsEmptyCursor ? |
| | | String.format("Cursor on log : %s, acting as empty cursor", log.logPath) : |
| | | String.format("Cursor on log : %s, current log file: %s, current cursor: %s", |
| | | return String.format("Cursor on log : %s, current log file: %s, current cursor: %s", |
| | | log.logPath, currentLogFile.getFile().getName(), currentCursor); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** An empty cursor, that always return null records and false to {@code next()} method. */ |
| | | static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V> |
| | | /** |
| | | * An empty cursor, that always returns null records and false to the {@link #next()} method. |
| | | * <p> |
| | | * This class is thread-safe. |
| | | */ |
| | | private static final class EmptyCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V> |
| | | { |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<K,V> getRecord() |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | // nothing to do |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "EmptyLogCursor"; |
| | | return getClass().getSimpleName(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * An aborted cursor, standing in for a cursor that can no longer be used. |
| | | * <p> |
| | | * Every method declared to throw a ChangelogException ({@code next()}, |
| | | * {@code positionTo()}, {@code getState()} and {@code reinitializeTo()}) throws an |
| | | * AbortedChangelogCursorException whose message is built from the log path. |
| | | * {@code getRecord()} throws an IllegalStateException rather than returning a |
| | | * default value. {@code close()} and {@code closeUnderlyingCursor()} do nothing, |
| | | * and {@code isAccessingLogFile()} always returns {@code false}. |
| | | * <p> |
| | | * Although this cursor is thread-safe, it is intended to be used inside an |
| | | * AbortableLogCursor which manages locking. |
| | | */ |
| | | private static final class AbortedLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V> |
| | | { |
| | | /** Records the path of the log the aborted cursor was positioned on; only used to build the exception message. */ |
| | | private final File logPath; |
| | | |
| | | AbortedLogCursor(File logPath) |
| | | { |
| | | this.logPath = logPath; |
| | | } |
| | | |
| | | @Override |
| | | public Record<K,V> getRecord() |
| | | { |
| | | throw new IllegalStateException("this cursor is aborted"); |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | throw abortedCursorException(); |
| | | } |
| | | |
| | | private AbortedChangelogCursorException abortedCursorException() |
| | | { |
| | | return new AbortedChangelogCursorException(ERR_CHANGELOG_CURSOR_ABORTED.get(logPath)); |
| | | } |
| | | |
| | | @Override |
| | | public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException |
| | | { |
| | | throw abortedCursorException(); |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | // nothing to do |
| | | } |
| | | |
| | | @Override |
| | | CursorState<K, V> getState() throws ChangelogException |
| | | { |
| | | throw abortedCursorException(); |
| | | } |
| | | |
| | | @Override |
| | | void closeUnderlyingCursor() |
| | | { |
| | | // nothing to do |
| | | } |
| | | |
| | | @Override |
| | | void reinitializeTo(CursorState<K, V> cursorState) throws ChangelogException |
| | | { |
| | | throw abortedCursorException(); |
| | | } |
| | | |
| | | @Override |
| | | boolean isAccessingLogFile(LogFile<K, V> logFile) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * A cursor on the log that can be aborted. |
| | | * <p> |
| | | * The cursor uses the log sharedLock to ensure no read can occur during a |
| | | * rotation, a clear or a purge. |
| | | * <p> |
| | | * Note that only public methods use the sharedLock. Package-private methods are intended |
| | | * to be used only internally in the Log class when the log exclusiveLock is on. |
| | | * <p> |
| | | * The cursor can be aborted by calling the {@link #abort()} method. The abort is applied |
| | | * lazily: {@code abort()} only raises a flag, and the next call to {@link #next()} closes |
| | | * the delegate and replaces it with an AbortedLogCursor. {@code getRecord()} and |
| | | * {@code positionTo()} delegate without checking that flag. |
| | | * <p> |
| | | * NOTE(review): a {@code positionTo()} call made between {@code abort()} and the following |
| | | * {@code next()} is therefore still served by the original delegate - confirm this is intended. |
| | | */ |
| | | private static class AbortableLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V> |
| | | { |
| | | /** The log on which this cursor is created; provides the sharedLock taken by the public methods. */ |
| | | private final Log<K, V> log; |
| | | |
| | | /** The actual cursor on which methods are delegated; swapped for an AbortedLogCursor by next() once aborted. */ |
| | | private LogCursor<K, V> delegate; |
| | | |
| | | /** Indicates if the cursor must be aborted; raised by abort() and reset by next() once the delegate is replaced. */ |
| | | private boolean mustAbort; |
| | | |
| | | private AbortableLogCursor(Log<K,V> log, LogCursor<K, V> delegate) |
| | | { |
| | | this.log = log; |
| | | this.delegate = delegate; |
| | | } |
| | | |
| | | @Override |
| | | public Record<K, V> getRecord() |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | return delegate.getRecord(); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | if (mustAbort) |
| | | { |
| | | delegate.close(); |
| | | delegate = new AbortedLogCursor<K, V>(log.getPath()); |
| | | mustAbort = false; |
| | | } |
| | | return delegate.next(); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | delegate.close(); |
| | | log.unregisterCursor(this); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | return delegate.positionTo(key, matchStrategy, positionStrategy); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Aborts this cursor. This only raises the abort flag: the cursor throws an |
| | | * AbortedChangelogCursorException from its next call to {@code next()} and, |
| | | * once that call has replaced the delegate, from {@code positionTo()} as well. |
| | | * <p> |
| | | * This method is called only when log.exclusiveLock has been acquired. |
| | | */ |
| | | void abort() |
| | | { |
| | | mustAbort = true; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * This method is called only when log.exclusiveLock has been acquired. |
| | | */ |
| | | @Override |
| | | CursorState<K, V> getState() throws ChangelogException |
| | | { |
| | | return delegate.getState(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * This method is called only when log.exclusiveLock has been acquired. |
| | | */ |
| | | @Override |
| | | void closeUnderlyingCursor() |
| | | { |
| | | delegate.closeUnderlyingCursor(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * This method is called only when log.exclusiveLock has been acquired. |
| | | */ |
| | | @Override |
| | | void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException |
| | | { |
| | | delegate.reinitializeTo(cursorState); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * This method is called only when log.exclusiveLock has been acquired. |
| | | */ |
| | | @Override |
| | | boolean isAccessingLogFile(LogFile<K, V> logFile) |
| | | { |
| | | return delegate.isAccessingLogFile(logFile); |
| | | } |
| | | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return delegate.toString(); |
| | | } |
| | | } |
| | | |
| | |
| | | /** The record the cursor is pointing to. */ |
| | | private final Record<K,V> record; |
| | | |
| | | private final boolean isValid; |
| | | |
| | | /** Builds a non-valid state: no log file, no record, a file position of zero. */ |
| | | private CursorState() { |
| | | this.isValid = false; |
| | | this.record = null; |
| | | this.filePosition = 0; |
| | | this.logFile = null; |
| | | } |
| | | |
| | | /** Builds a valid state from the provided log file, file position and record. */ |
| | | private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record) |
| | | { |
| | | this.isValid = true; |
| | | this.record = record; |
| | | this.filePosition = filePosition; |
| | | this.logFile = logFile; |
| | | } |
| | | |
| | | /** |
| | | * Indicates if this state is valid, i.e. if it was created with a log file, |
| | | * a file position and a record rather than with the no-argument constructor. |
| | | * |
| | | * @return {@code true} iff state is valid |
| | | */ |
| | | public boolean isValid() |
| | | { |
| | | return isValid; |
| | | } |
| | | } |
| | | |