From b877a7554a1fa1c47a2982541972efe780dfad9a Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 11 Jun 2015 13:53:40 +0000
Subject: [PATCH] OPENDJ-1705 File based changelog: handle concurrency between purge and cursors
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java | 25 +
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 4
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java | 6
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java | 521 ++++++++++++++++++++++++++++++++----------
opendj-server-legacy/src/messages/org/opends/messages/replication.properties | 4
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java | 52 ++++
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/AbortedChangelogCursorException.java | 78 ++++++
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java | 5
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java | 10
9 files changed, 565 insertions(+), 140 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
index a13585c..a5e96ec 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -239,8 +239,10 @@
* @param sendToServerId
* serverId of the replica where changes will be sent
* @return The next update that must be sent to the consumer, or {@code null} when queue is empty
+ * @throws ChangelogException
+ * If a problem occurs when reading the changelog
*/
- protected UpdateMsg getNextMessage(int sendToServerId)
+ protected UpdateMsg getNextMessage(int sendToServerId) throws ChangelogException
{
while (activeConsumer)
{
@@ -371,7 +373,7 @@
* Fills the late queue with the most recent changes, accepting only the
* messages from provided replica ids.
*/
- private void fillLateQueue(int sendToServerId)
+ private void fillLateQueue(int sendToServerId) throws ChangelogException
{
try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
{
@@ -384,10 +386,6 @@
}
}
}
- catch (ChangelogException e)
- {
- logger.traceException(e);
- }
}
private boolean isLateQueueBelowThreshold()
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
index 6656852..3a76cfa 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
@@ -46,6 +46,7 @@
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
@@ -925,8 +926,10 @@
*
* @return the next update that must be sent to the server managed by this
* ServerHandler.
+ * @throws ChangelogException
+ * If a problem occurs when reading the changelog
*/
- public UpdateMsg take()
+ public UpdateMsg take() throws ChangelogException
{
final UpdateMsg msg = getNextMessage(serverId);
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/AbortedChangelogCursorException.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/AbortedChangelogCursorException.java
new file mode 100644
index 0000000..f0ada51
--- /dev/null
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/AbortedChangelogCursorException.java
@@ -0,0 +1,78 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2015 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.api;
+
+import javax.annotation.Generated;
+
+import org.forgerock.i18n.LocalizableMessage;
+
+/**
+ * This exception is thrown when a cursor that has been aborted is used.
+ * <p>
+ * A cursor can be aborted when it is open on a log file that
+ * must be purged or cleared.
+ */
+public class AbortedChangelogCursorException extends ChangelogException
+{
+
+ @Generated("Eclipse")
+ private static final long serialVersionUID = -2123770048083474999L;
+
+ /**
+ * Creates a new exception with the provided message.
+ *
+ * @param message
+ * The message that explains the problem that occurred.
+ */
+ public AbortedChangelogCursorException(LocalizableMessage message)
+ {
+ super(message);
+ }
+
+ /**
+ * Creates a new exception with the provided cause.
+ *
+ * @param cause
+ * The underlying cause that triggered this exception.
+ */
+ public AbortedChangelogCursorException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ /**
+ * Creates a new exception with the provided message and cause.
+ *
+ * @param message
+ * The message that explains the problem that occurred.
+ * @param cause
+ * The underlying cause that triggered this exception.
+ */
+ public AbortedChangelogCursorException(LocalizableMessage message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
index 4d11069..48dbbbf 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
@@ -38,6 +38,7 @@
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -436,12 +437,12 @@
wait();
}
// check whether new changes have been added to the ReplicaDBs
- nextChangeForInsertDBCursor.next();
+ moveToNextChange();
continue;
}
else if (msg instanceof ReplicaOfflineMsg)
{
- nextChangeForInsertDBCursor.next();
+ moveToNextChange();
continue;
}
@@ -504,6 +505,26 @@
}
}
+ private void moveToNextChange() throws ChangelogException
+ {
+ try
+ {
+ nextChangeForInsertDBCursor.next();
+ }
+ catch (AbortedChangelogCursorException e) {
+ if (domainsToClear.size() == 0)
+ {
+ // There is no domain to clear, thus it is
+ // not expected that a cursor is aborted
+ throw e;
+ }
+ // else assumes the aborted cursor is part of a domain
+ // that will be removed on the next iteration
+ logger.trace("Cursor was aborted: %s, but continuing because domainsToClear has size %s",
+ e, domainsToClear.size());
+ }
+ }
+
/**
* Notifies the {@link ChangelogBackend} that a new entry has been added.
*
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 37f0d08..e4692fc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -64,6 +64,7 @@
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicaId;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
@@ -125,8 +126,9 @@
private final ReplicationServer replicationServer;
private final AtomicBoolean shutdown = new AtomicBoolean();
+ private static final RepositionableCursor<CSN, UpdateMsg> EMPTY_CURSOR = Log.getEmptyCursor();
private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
- new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY);
+ new FileReplicaDBCursor(EMPTY_CURSOR, null, AFTER_MATCHING_KEY);
/**
* Creates a new changelog DB.
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
index 28b2e86..d503cb3 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -39,7 +39,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -53,6 +52,7 @@
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.forgerock.util.time.TimeService;
+import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -166,7 +166,7 @@
* The list of non-empty cursors opened on this log. Opened cursors may have
* to be updated when rotating the head log file.
*/
- private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
+ private final List<AbortableLogCursor<K, V>> openCursors = new CopyOnWriteArrayList<AbortableLogCursor<K, V>>();
/**
* A log file can be rotated once it has exceeded this size limit. The log file can have
@@ -246,6 +246,17 @@
return log;
}
+ /**
+ * Returns an empty cursor.
+ *
+ * @param <K> the type of keys.
+ * @param <V> the type of values.
+ * @return an empty cursor
+ */
+ static <K extends Comparable<K>, V> RepositionableCursor<K, V> getEmptyCursor() {
+ return new Log.EmptyCursor<K, V>();
+ }
+
/** Holds the parameters for log files rotation. */
static class LogRotationParameters {
@@ -331,6 +342,7 @@
this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
this.rotationIntervalInMillis = rotationParams.rotationInterval;
this.lastRotationTime = rotationParams.lastRotationTime;
+
this.referenceCount = 1;
final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -435,7 +447,7 @@
LogFile<K, V> headLogFile = getHeadLogFile();
if (mustRotate(headLogFile))
{
- logger.debug(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+ logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
rotateHeadLogFile();
headLogFile = getHeadLogFile();
@@ -459,13 +471,20 @@
if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
{
// rotate because file size exceeded threshold
+ logger.trace("Rotate log %s due to size: %s", logPath.getPath(), headLogFile.getSizeInBytes());
return true;
}
if (rotationIntervalInMillis > 0)
{
// rotate if time limit is reached
final long timeElapsed = timeService.since(lastRotationTime);
- return timeElapsed > rotationIntervalInMillis;
+ boolean shouldRotate = timeElapsed > rotationIntervalInMillis;
+ if (shouldRotate)
+ {
+ logger.trace("Rotate log %s due to time: time elapsed %s, rotation interval: %s",
+ logPath.getPath(), timeElapsed, rotationIntervalInMillis);
+ }
+ return shouldRotate;
}
return false;
}
@@ -512,15 +531,15 @@
*/
public RepositionableCursor<K, V> getCursor() throws ChangelogException
{
- LogCursor<K, V> cursor = null;
+ AbortableLogCursor<K, V> cursor = null;
sharedLock.lock();
try
{
if (isClosed)
{
- return new EmptyLogCursor<K, V>();
+ return new EmptyCursor<K, V>();
}
- cursor = new LogCursor<K, V>(this);
+ cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this));
cursor.positionTo(null, null, null);
registerCursor(cursor);
return cursor;
@@ -575,15 +594,15 @@
{
return getCursor();
}
- LogCursor<K, V> cursor = null;
+ AbortableLogCursor<K, V> cursor = null;
sharedLock.lock();
try
{
if (isClosed)
{
- return new EmptyLogCursor<K, V>();
+ return new EmptyCursor<K, V>();
}
- cursor = new LogCursor<K, V>(this);
+ cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this));
final boolean isSuccessfullyPositioned = cursor.positionTo(key, matchingStrategy, positionStrategy);
// Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy
if (isSuccessfullyPositioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
@@ -594,7 +613,7 @@
else
{
StaticUtils.close(cursor);
- return new EmptyLogCursor<K, V>();
+ return new EmptyCursor<K, V>();
}
}
catch (ChangelogException e)
@@ -701,12 +720,13 @@
return null;
}
final List<String> undeletableFiles = new ArrayList<String>();
- final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
+ final Iterator<LogFile<K, V>> entriesToPurge = logFilesToPurge.values().iterator();
while (entriesToPurge.hasNext())
{
- final LogFile<K, V> logFile = entriesToPurge.next().getValue();
+ final LogFile<K, V> logFile = entriesToPurge.next();
try
{
+ abortCursorsOpenOnLogFile(logFile);
logFile.close();
logFile.delete();
entriesToPurge.remove();
@@ -733,7 +753,23 @@
}
/**
+ * Abort all cursors opened on the provided log file.
+ */
+ private void abortCursorsOpenOnLogFile(LogFile<K, V> logFile)
+ {
+ for (AbortableLogCursor<K, V> cursor : openCursors)
+ {
+ if (cursor.isAccessingLogFile(logFile))
+ {
+ cursor.abort();
+ }
+ }
+ }
+
+ /**
* Empties the log, discarding all records it contains.
+ * <p>
+ * All cursors open on the log are aborted.
*
* @throws ChangelogException
* If cursors are opened on this log, or if a problem occurs during
@@ -750,9 +786,10 @@
}
if (!openCursors.isEmpty())
{
- // Allow opened cursors at this point, but turn them into empty cursors.
- // This behavior is needed by the change number indexer thread.
- switchCursorsOpenedIntoEmptyCursors();
+ // All open cursors are aborted, which means the change number indexer thread
+ // should manage AbortedChangelogCursorException specifically to avoid being
+ // stopped
+ abortAllOpenCursors();
}
// delete all log files
@@ -806,7 +843,6 @@
}
}
- /** {@inheritDoc} */
@Override
public void close()
{
@@ -856,19 +892,19 @@
sharedLock.lock();
try
{
- K key = null;
- for (LogFile<K, V> logFile : logFiles.values())
+ K key = null;
+ for (LogFile<K, V> logFile : logFiles.values())
+ {
+ final Record<K, V> record = logFile.getOldestRecord();
+ final V2 oldestValue = mapper.map(record.getValue());
+ if (oldestValue.compareTo(limitValue) > 0)
{
- final Record<K, V> record = logFile.getOldestRecord();
- final V2 oldestValue = mapper.map(record.getValue());
- if (oldestValue.compareTo(limitValue) > 0)
- {
- return key;
- }
- key = record.getKey();
+ return key;
}
- return key;
+ key = record.getKey();
}
+ return key;
+ }
finally
{
sharedLock.unlock();
@@ -918,7 +954,7 @@
private void rotateHeadLogFile() throws ChangelogException
{
// Temporarily disable cursors opened on head, saving their state
- final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
+ final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
final LogFile<K, V> headLogFile = getHeadLogFile();
final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
@@ -993,31 +1029,30 @@
}
/** Update the cursors that were pointing to head after a rotation of the head log file. */
- private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
+ private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
throws ChangelogException
{
- for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
+ for (Pair<AbortableLogCursor<K, V>, CursorState<K, V>> pair : cursors)
{
final CursorState<K, V> cursorState = pair.getSecond();
// Need to update the cursor only if it is pointing to the head log file
- if (isHeadLogFile(cursorState.logFile))
+ if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
{
final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
final LogFile<K, V> logFile = findLogFileFor(previousKey);
- final LogCursor<K, V> cursor = pair.getFirst();
+ final AbortableLogCursor<K, V> cursor = pair.getFirst();
cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
}
}
}
- private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
+ private void abortAllOpenCursors() throws ChangelogException
{
- for (LogCursor<K, V> cursor : openCursors)
+ for (AbortableLogCursor<K, V> cursor : openCursors)
{
- cursor.actAsEmptyCursor();
+ cursor.abort();
}
- openCursors.clear();
}
/**
@@ -1028,13 +1063,14 @@
* @throws ChangelogException
* If an error occurs.
*/
- private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
+ private List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead()
+ throws ChangelogException
{
- final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
- new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
- for (LogCursor<K, V> cursor : openCursors)
+ final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> openCursorsStates = new ArrayList<>();
+ final LogFile<K, V> headLogFile = getHeadLogFile();
+ for (AbortableLogCursor<K, V> cursor : openCursors)
{
- if (isHeadLogFile(cursor.currentLogFile))
+ if (cursor.isAccessingLogFile(headLogFile))
{
openCursorsStates.add(Pair.of(cursor, cursor.getState()));
cursor.closeUnderlyingCursor();
@@ -1058,7 +1094,7 @@
logFiles.put(bounds.getSecond(), logFile);
}
- private void registerCursor(final LogCursor<K, V> cursor)
+ private void registerCursor(final AbortableLogCursor<K, V> cursor)
{
openCursors.add(cursor);
}
@@ -1126,20 +1162,38 @@
}
/**
- * Implements a cursor on the log.
+ * Represents an internal view of a cursor on the log, with extended operations.
* <p>
- * The cursor uses the log shared lock to ensure reads are not done during a rotation.
- * <p>
- * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
- * method.
+ * This is an abstract class rather than an interface to allow reduced visibility of the methods.
*/
- private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
+ private abstract static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
+ {
+
+ /** Closes the underlying cursor. */
+ abstract void closeUnderlyingCursor();
+
+ /** Returns the state of this cursor. */
+ abstract CursorState<K, V> getState() throws ChangelogException;
+
+ /** Reinitialize this cursor to the provided state. */
+ abstract void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException;
+
+ /** Returns true if cursor is pointing on provided log file. */
+ abstract boolean isAccessingLogFile(LogFile<K, V> logFile);
+
+ }
+
+ /**
+ * Implements an internal cursor on the log.
+ * <p>
+ * This cursor is intended to be used <b>only<b> inside an {@link AbortableLogCursor},
+ * because it is relying on AbortableLogCursor for locking.
+ */
+ private static class InternalLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
{
private final Log<K, V> log;
-
private LogFile<K, V> currentLogFile;
private LogFileCursor<K, V> currentCursor;
- private boolean actAsEmptyCursor;
/**
* Creates a cursor on the provided log.
@@ -1149,66 +1203,40 @@
* @throws ChangelogException
* If an error occurs when creating the cursor.
*/
- private LogCursor(final Log<K, V> log) throws ChangelogException
+ private InternalLogCursor(final Log<K, V> log) throws ChangelogException
{
this.log = log;
- this.actAsEmptyCursor = false;
}
- /** {@inheritDoc} */
@Override
public Record<K, V> getRecord()
{
return currentCursor != null ? currentCursor.getRecord() : null;
}
- /** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
{
- if (actAsEmptyCursor)
+ final boolean hasNext = currentCursor.next();
+ if (hasNext)
{
- return false;
+ return true;
}
- log.sharedLock.lock();
- try
+ final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+ if (logFile != null)
{
- final boolean hasNext = currentCursor.next();
- if (hasNext)
- {
- return true;
- }
- final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
- if (logFile != null)
- {
- switchToLogFile(logFile);
- return currentCursor.next();
- }
- return false;
+ switchToLogFile(logFile);
+ return currentCursor.next();
}
- finally
- {
- log.sharedLock.unlock();
- }
+ return false;
}
- /** {@inheritDoc} */
@Override
public void close()
{
- log.sharedLock.lock();
- try
- {
- StaticUtils.close(currentCursor);
- log.unregisterCursor(this);
- }
- finally
- {
- log.sharedLock.unlock();
- }
+ StaticUtils.close(currentCursor);
}
- /** {@inheritDoc} */
@Override
public boolean positionTo(
final K key,
@@ -1216,54 +1244,39 @@
final PositionStrategy positionStrategy)
throws ChangelogException
{
- if (actAsEmptyCursor)
+ final LogFile<K, V> logFile = log.findLogFileFor(key);
+ if (logFile != currentLogFile)
{
- return false;
+ switchToLogFile(logFile);
}
- log.sharedLock.lock();
- try
- {
- final LogFile<K, V> logFile = log.findLogFileFor(key);
- if (logFile != currentLogFile)
- {
- switchToLogFile(logFile);
- }
- return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
- }
- finally
- {
- log.sharedLock.unlock();
- }
+ return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
}
/** Returns the state of this cursor. */
- private CursorState<K, V> getState() throws ChangelogException
+ @Override
+ CursorState<K, V> getState() throws ChangelogException
{
- return !actAsEmptyCursor ?
- new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
+ return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
}
- private void closeUnderlyingCursor()
+ @Override
+ void closeUnderlyingCursor()
{
StaticUtils.close(currentCursor);
}
/** Reinitialize this cursor to the provided state. */
- private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
+ @Override
+ void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
{
- if (!actAsEmptyCursor)
- {
- currentLogFile = cursorState.logFile;
- currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
- }
+ currentLogFile = cursorState.logFile;
+ currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
}
- /** Turn this cursor into an empty cursor, with no actual resource used. */
- private void actAsEmptyCursor()
+ @Override
+ boolean isAccessingLogFile(LogFile<K, V> logFile)
{
- currentLogFile = null;
- currentCursor = null;
- actAsEmptyCursor = true;
+ return currentLogFile != null && currentLogFile.equals(logFile);
}
/** Switch the cursor to the provided log file. */
@@ -1274,53 +1287,281 @@
currentCursor = currentLogFile.getCursor();
}
- /** {@inheritDoc} */
@Override
public String toString()
{
- return actAsEmptyCursor ?
- String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
- String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
+ return String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
log.logPath, currentLogFile.getFile().getName(), currentCursor);
}
+
}
- /** An empty cursor, that always return null records and false to {@code next()} method. */
- static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
+ /**
+ * An empty cursor, that always return null records and false to {@link #next()} method.
+ * <p>
+ * This class is thread-safe.
+ */
+ private static final class EmptyCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
{
- /** {@inheritDoc} */
@Override
public Record<K,V> getRecord()
{
return null;
}
- /** {@inheritDoc} */
@Override
public boolean next()
{
return false;
}
- /** {@inheritDoc} */
@Override
public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
{
return false;
}
- /** {@inheritDoc} */
@Override
public void close()
{
// nothing to do
}
- /** {@inheritDoc} */
@Override
public String toString()
{
- return "EmptyLogCursor";
+ return getClass().getSimpleName();
+ }
+ }
+
+ /**
+ * An aborted cursor, that throws AbortedChangelogCursorException on methods that can
+ * throw a ChangelogException and returns a default value on other methods.
+ * <p>
+ * Although this cursor is thread-safe, it is intended to be used inside an
+ * AbortableLogCursor which manages locking.
+ */
+ private static final class AbortedLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
+ {
+ /** Records the path of the log the aborted cursor was positioned on. */
+ private final File logPath;
+
+ AbortedLogCursor(File logPath)
+ {
+ this.logPath = logPath;
+ }
+
+ @Override
+ public Record<K,V> getRecord()
+ {
+ throw new IllegalStateException("this cursor is aborted");
+ }
+
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ throw abortedCursorException();
+ }
+
+ private AbortedChangelogCursorException abortedCursorException()
+ {
+ return new AbortedChangelogCursorException(ERR_CHANGELOG_CURSOR_ABORTED.get(logPath));
+ }
+
+ @Override
+ public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
+ {
+ throw abortedCursorException();
+ }
+
+ @Override
+ public void close()
+ {
+ // nothing to do
+ }
+
+ @Override
+ CursorState<K, V> getState() throws ChangelogException
+ {
+ throw abortedCursorException();
+ }
+
+ @Override
+ void closeUnderlyingCursor()
+ {
+ // nothing to do
+ }
+
+ @Override
+ void reinitializeTo(CursorState<K, V> cursorState) throws ChangelogException
+ {
+ throw abortedCursorException();
+ }
+
+ @Override
+ boolean isAccessingLogFile(LogFile<K, V> logFile)
+ {
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName();
+ }
+ }
+
+ /**
+ * A cursor on the log that can be aborted.
+ * <p>
+ * The cursor uses the log sharedLock to ensure no read can occur during a
+ * rotation, a clear or a purge.
+ * <p>
+ * Note that only public methods use the sharedLock. Protected methods are intended to be used only
+ * internally in the Log class when the log exclusiveLock is on.
+ * <p>
+ * The cursor can be be aborted by calling the {@link #abort()} method.
+ */
+ private static class AbortableLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
+ {
+ /** The log on which this cursor is created. */
+ private final Log<K, V> log;
+
+ /** The actual cursor on which methods are delegated. */
+ private LogCursor<K, V> delegate;
+
+ /** Indicates if the cursor must be aborted. */
+ private boolean mustAbort;
+
+ private AbortableLogCursor(Log<K,V> log, LogCursor<K, V> delegate)
+ {
+ this.log = log;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Record<K, V> getRecord()
+ {
+ log.sharedLock.lock();
+ try
+ {
+ return delegate.getRecord();
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ log.sharedLock.lock();
+ try
+ {
+ if (mustAbort)
+ {
+ delegate.close();
+ delegate = new AbortedLogCursor<K, V>(log.getPath());
+ mustAbort = false;
+ }
+ return delegate.next();
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ log.sharedLock.lock();
+ try
+ {
+ delegate.close();
+ log.unregisterCursor(this);
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
+ throws ChangelogException
+ {
+ log.sharedLock.lock();
+ try
+ {
+ return delegate.positionTo(key, matchStrategy, positionStrategy);
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
+ }
+
+ /**
+ * Aborts this cursor. Once aborted, a cursor throws an
+ * AbortedChangelogCursorException if it is used.
+ * <p>
+ * This method is called only when log.exclusiveLock has been acquired.
+ */
+ void abort()
+ {
+ mustAbort = true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method is called only when log.exclusiveLock has been acquired.
+ */
+ @Override
+ CursorState<K, V> getState() throws ChangelogException
+ {
+ return delegate.getState();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method is called only when log.exclusiveLock has been acquired.
+ */
+ @Override
+ void closeUnderlyingCursor()
+ {
+ delegate.closeUnderlyingCursor();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method is called only when log.exclusiveLock has been acquired.
+ */
+ @Override
+ void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
+ {
+ delegate.reinitializeTo(cursorState);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method is called only when log.exclusiveLock has been acquired.
+ */
+ @Override
+ boolean isAccessingLogFile(LogFile<K, V> logFile)
+ {
+ return delegate.isAccessingLogFile(logFile);
+ }
+
+ @Override
+ public String toString()
+ {
+ return delegate.toString();
}
}
@@ -1345,11 +1586,33 @@
/** The record the cursor is pointing to. */
private final Record<K,V> record;
+ private final boolean isValid;
+
+ /** Creates a non-valid state. */
+ private CursorState() {
+ logFile = null;
+ filePosition = 0;
+ record = null;
+ isValid = false;
+ }
+
+ /** Creates a valid state. */
private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
{
this.logFile = logFile;
this.filePosition = filePosition;
this.record = record;
+ isValid = true;
+ }
+
+ /**
+ * Indicates if this state is valid, i.e if it has non-null values.
+ *
+ * @return {@code true iff state is valid}
+ */
+ public boolean isValid()
+ {
+ return isValid;
}
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
index b4ab43c..cedee46 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -500,6 +500,12 @@
return logfile.equals(other.logfile);
}
+ @Override
+ public String toString()
+ {
+ return "LogFile [logfile=" + logfile + ", isWriteEnabled=" + isWriteEnabled + "]";
+ }
+
/** Implements a repositionable cursor on the log file. */
static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
{
diff --git a/opendj-server-legacy/src/messages/org/opends/messages/replication.properties b/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
index 9ae2894..2a9a5f1 100644
--- a/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
+++ b/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
@@ -631,4 +631,6 @@
ERR_CHANGELOG_UNABLE_TO_CREATE_LAST_LOG_ROTATION_TIME_FILE_288=Could not create \
file '%s' to store last log rotation time %d
ERR_CHANGELOG_UNABLE_TO_DELETE_LAST_LOG_ROTATION_TIME_FILE_289=Could not delete \
- file '%s' that stored the previous last log rotation time
\ No newline at end of file
+ file '%s' that stored the previous last log rotation time
+ERR_CHANGELOG_CURSOR_ABORTED_290=Cursor on log '%s' has been aborted after \
+ a purge or a clear
\ No newline at end of file
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
index 530f1e5..7b5f4a6 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -36,6 +36,7 @@
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -542,6 +543,56 @@
}
}
+ /**
+ * Similar to testPurge() test but with a concurrent cursor opened before starting the purge.
+ * <p>
+ * For all keys but "key000" the concurrent cursor should be aborted because the corresponding log file
+ * has been purged.
+ */
+ @Test(dataProvider="purgeKeys")
+ public void testPurgeWithConcurrentCursorOpened(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
+ int cursorStartIndex, int cursorEndIndex) throws Exception
+ {
+ DBCursor<Record<String, String>> cursor = null;
+ try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ DBCursor<Record<String, String>> concurrentCursor = log.getCursor())
+ {
+ concurrentCursor.next();
+ assertThat(concurrentCursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+
+ log.purgeUpTo(purgeKey);
+
+ cursor = log.getCursor();
+ assertThat(cursor.next()).isTrue();
+ assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
+ assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
+
+ // concurrent cursor is expected to be aborted on the next() call for all cases but one
+ assertThat(concurrentCursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+ if (purgeKey.equals("key000"))
+ {
+ // in that case no purge has been done, so cursor should not be aborted
+ assertThatCursorCanBeFullyRead(concurrentCursor, cursorStartIndex, cursorEndIndex);
+ }
+ else
+ {
+ // in other cases cursor should be aborted
+ try
+ {
+ concurrentCursor.next();
+ fail("Expected an AbortedChangelogCursorException");
+ }
+ catch (AbortedChangelogCursorException e) {
+ // nothing to do
+ }
+ }
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
static final Mapper<String, Integer> MAPPER = new Record.Mapper<String, Integer>()
{
@Override
@@ -629,6 +680,7 @@
final AtomicReference<ChangelogException> exceptionRef)
{
return new Thread() {
+ @Override
public void run()
{
for (int i = 1; i <= 30; i++)
--
Gitblit v1.10.0