From 334014cfe2047b73bb9770035d2ea4e5e089cfdb Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 04 Jun 2014 10:19:55 +0000
Subject: [PATCH] OPENDJ-1483 : On Windows, problem with changelog's file rotation when running replication topology with file-based changelog OPENDJ-1487 : File based changelog : cursors opened when clearing the log
---
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java | 126 +++++++++++++++++++++++++++++++----------
1 files changed, 94 insertions(+), 32 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 350b70e..20522bd 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -663,13 +663,9 @@
}
if (!openCursors.isEmpty())
{
- // TODO: throwing an exception makes the replication/totalupdate.txt robot functional test fail because
- // there is one open cursor when clearing.
- // Switch to logging until this issue is solved
- // throw new ChangelogException(Message.raw("Can't clean log '%s' because there are %d cursor(s) opened on it",
- // logPath.getPath(), openCursors.size()));
- ErrorLogger.logError(
- ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG.get(logPath.getPath(), openCursors.size()));
+ // Allow opened cursors at this point, but turn them into empty cursors.
+ // This behavior is needed by the change number indexer thread.
+ switchCursorsOpenedIntoEmptyCursors();
}
// delete all log files
@@ -747,9 +743,18 @@
return logFiles.firstEntry().getValue();
}
- /** Rotate the head log file to a read-only log file, and open a new empty head log file to write in. */
+ /**
+ * Rotate the head log file to a read-only log file, and open a new empty head
+ * log file to write in.
+ * <p>
+ * All cursors opened on this log are temporarily disabled (closing underlying resources)
+ * and then re-open with their previous state.
+ */
private void rotateHeadLogFile() throws ChangelogException
{
+ // Temporarily disable cursors opened on head, saving their state
+ final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
+
final LogFile<K, V> headLogFile = getHeadLogFile();
final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
headLogFile.close();
@@ -757,7 +762,9 @@
openHeadLogFile();
openReadOnlyLogFile(readOnlyLogFile);
- updateCursorsOpenedOnHead();
+
+ // Re-enable cursors previously opened on head, with the saved state
+ updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
}
private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
@@ -816,31 +823,55 @@
+ recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
}
- /** Update the open cursors after a rotation of the head log file. */
- private void updateCursorsOpenedOnHead() throws ChangelogException
+ /** Update the cursors that were pointing to head after a rotation of the head log file. */
+ private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
+ throws ChangelogException
{
- for (LogCursor<K, V> cursor : openCursors)
+ for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
{
- final CursorState<K, V> state = cursor.getState();
+ final CursorState<K, V> cursorState = pair.getSecond();
+
// Need to update the cursor only if it is pointing to the head log file
- if (isHeadLogFile(state.logFile))
+ if (isHeadLogFile(cursorState.logFile))
{
- updateOpenCursor(cursor, state);
+ final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
+ final LogFile<K, V> logFile = findLogFileFor(previousKey);
+ final LogCursor<K, V> cursor = pair.getFirst();
+ cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
}
}
}
- /**
- * Update the provided open cursor with the provided state.
- * <p>
- * The cursor must report the previous state on the head log file to the same
- * state (position in file, current record) in the read-only log file just created.
- */
- private void updateOpenCursor(final LogCursor<K,V> cursor, final CursorState<K, V> state) throws ChangelogException
+ private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
{
- final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
- final LogFile<K, V> logFile = findLogFileFor(previousKey);
- cursor.reinitializeTo(new CursorState<K, V>(logFile, state.filePosition, state.record));
+ for (LogCursor<K, V> cursor : openCursors)
+ {
+ cursor.actAsEmptyCursor();
+ }
+ openCursors.clear();
+ }
+
+ /**
+ * Disable the cursors opened on the head log file log, by closing their underlying cursor.
+ * Returns the state of each cursor just before the close operation.
+ *
+ * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
+ {
+ final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
+ new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
+ for (LogCursor<K, V> cursor : openCursors)
+ {
+ if (isHeadLogFile(cursor.currentLogFile))
+ {
+ openCursorsStates.add(Pair.of(cursor, cursor.getState()));
+ cursor.closeUnderlyingCursor();
+ }
+ }
+ return openCursorsStates;
}
private void openHeadLogFile() throws ChangelogException
@@ -930,6 +961,9 @@
* {@code cursor.next()} method.
* <p>
* The cursor uses the log shared lock to ensure reads are not done during a rotation.
+ * <p>
+ * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
+ * method.
*/
private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
{
@@ -937,6 +971,7 @@
private LogFile<K, V> currentLogFile;
private LogFileCursor<K, V> currentCursor;
+ private boolean actAsEmptyCursor;
/**
* Creates a cursor on the provided log.
@@ -949,19 +984,24 @@
private LogCursor(final Log<K, V> log) throws ChangelogException
{
this.log = log;
+ this.actAsEmptyCursor = false;
}
/** {@inheritDoc} */
@Override
public Record<K, V> getRecord()
{
- return currentCursor.getRecord();
+ return currentCursor != null ? currentCursor.getRecord() : null;
}
/** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
{
+ if (actAsEmptyCursor)
+ {
+ return false;
+ }
log.sharedLock.lock();
try
{
@@ -1004,6 +1044,10 @@
@Override
public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException
{
+ if (actAsEmptyCursor)
+ {
+ return false;
+ }
log.sharedLock.lock();
try
{
@@ -1033,15 +1077,31 @@
/** Returns the state of this cursor. */
private CursorState<K, V> getState() throws ChangelogException
{
- return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+ return !actAsEmptyCursor ?
+ new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
+ }
+
+ private void closeUnderlyingCursor()
+ {
+ StaticUtils.close(currentCursor);
}
/** Reinitialize this cursor to the provided state. */
private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
{
- StaticUtils.close(currentCursor);
- currentLogFile = cursorState.logFile;
- currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
+ if (!actAsEmptyCursor)
+ {
+ currentLogFile = cursorState.logFile;
+ currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
+ }
+ }
+
+ /** Turn this cursor into an empty cursor, with no actual resource used. */
+ private void actAsEmptyCursor()
+ {
+ currentLogFile = null;
+ currentCursor = null;
+ actAsEmptyCursor = true;
}
/** Switch the cursor to the provided log file. */
@@ -1055,8 +1115,10 @@
/** {@inheritDoc} */
public String toString()
{
- return String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
- log.logPath, currentLogFile.getFile().getName(), currentCursor);
+ return actAsEmptyCursor ?
+ String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
+ String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
+ log.logPath, currentLogFile.getFile().getName(), currentCursor);
}
}
--
Gitblit v1.10.0