From 8fdf1768757fba933e7ce63ac6381eacec41f0c6 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 09 Dec 2015 17:41:58 +0000
Subject: [PATCH] OPENDJ-2476: Purge of file-based changelog is very slow and the changelog size is growing.
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java | 255 ++++++++++++++++++++++++++++----------------------
1 files changed, 143 insertions(+), 112 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
index 52e2785..b072dfc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -187,14 +187,12 @@
private long lastRotationTime;
/**
- * The exclusive lock used for writes and lifecycle operations on this log:
+ * The exclusive lock used for log rotation and lifecycle operations on this log:
* initialize, clear, sync and close.
*/
private final Lock exclusiveLock;
- /**
- * The shared lock used for reads and cursor operations on this log.
- */
+ /** The shared lock used for write operations and accessing {@link #logFiles} map. */
private final Lock sharedLock;
/**
@@ -416,8 +414,10 @@
* appended and the method returns immediately.
* <p>
* In order to ensure that record is written out of buffers and persisted
- * to file system, it is necessary to explicitely call the
+ * to file system, it is necessary to explicitly call the
* {@code syncToFileSystem()} method.
+ * <p>
+ * This method is not thread-safe.
*
* @param record
* The record to add.
@@ -426,22 +426,42 @@
*/
public void append(final Record<K, V> record) throws ChangelogException
{
- // If this exclusive lock happens to be a bottleneck :
- // 1. use a shared lock for appending the record first
- // 2. switch to an exclusive lock only if rotation is needed
- // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
- exclusiveLock.lock();
+ // This check is ok outside of any locking because only the append thread updates lastAppendedKey.
+ if (recordIsBreakingKeyOrdering(record))
+ {
+ logger.info(LocalizableMessage.raw(
+ "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
+ lastAppendedKey != null ? lastAppendedKey : "null"));
+ return;
+ }
+
+ // Fast-path - assume that no rotation is needed and use shared lock.
+ sharedLock.lock();
try
{
if (isClosed)
{
return;
}
- if (recordIsBreakingKeyOrdering(record))
+ LogFile<K, V> headLogFile = getHeadLogFile();
+ if (!mustRotate(headLogFile))
{
- logger.info(LocalizableMessage.raw(
- "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
- lastAppendedKey != null ? lastAppendedKey : "null"));
+ headLogFile.append(record);
+ lastAppendedKey = record.getKey();
+ return;
+ }
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+
+ // Slow-path - rotation is needed so use exclusive lock.
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
return;
}
LogFile<K, V> headLogFile = getHeadLogFile();
@@ -837,9 +857,17 @@
*/
void dumpAsTextFile(File dumpDirectory) throws ChangelogException
{
- for (LogFile<K, V> logFile : logFiles.values())
+ sharedLock.lock();
+ try
{
- logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+ for (LogFile<K, V> logFile : logFiles.values())
+ {
+ logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+ }
+ }
+ finally
+ {
+ sharedLock.unlock();
}
}
@@ -892,19 +920,19 @@
sharedLock.lock();
try
{
- K key = null;
- for (LogFile<K, V> logFile : logFiles.values())
- {
- final Record<K, V> record = logFile.getOldestRecord();
- final V2 oldestValue = mapper.map(record.getValue());
- if (oldestValue.compareTo(limitValue) > 0)
+ K key = null;
+ for (LogFile<K, V> logFile : logFiles.values())
{
- return key;
+ final Record<K, V> record = logFile.getOldestRecord();
+ final V2 oldestValue = mapper.map(record.getValue());
+ if (oldestValue.compareTo(limitValue) > 0)
+ {
+ return key;
+ }
+ key = record.getKey();
}
- key = record.getKey();
+ return key;
}
- return key;
- }
finally
{
sharedLock.unlock();
@@ -950,6 +978,7 @@
* <p>
* All cursors opened on this log are temporarily disabled (closing underlying resources)
* and then re-open with their previous state.
+ * @GuardedBy("exclusiveLock")
*/
private void rotateHeadLogFile() throws ChangelogException
{
@@ -1028,7 +1057,10 @@
+ recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
}
- /** Update the cursors that were pointing to head after a rotation of the head log file. */
+ /**
+ * Update the cursors that were pointing to head after a rotation of the head log file.
+ * @GuardedBy("exclusiveLock")
+ */
private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
throws ChangelogException
{
@@ -1040,7 +1072,7 @@
if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
{
final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
- final LogFile<K, V> logFile = findLogFileFor(previousKey);
+ final LogFile<K, V> logFile = findLogFileFor(previousKey, KeyMatchingStrategy.EQUAL_TO_KEY);
final AbortableLogCursor<K, V> cursor = pair.getFirst();
cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
}
@@ -1058,6 +1090,7 @@
/**
* Disable the cursors opened on the head log file log, by closing their underlying cursor.
* Returns the state of each cursor just before the close operation.
+ * @GuardedBy("exclusiveLock")
*
* @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
* @throws ChangelogException
@@ -1111,12 +1144,20 @@
*/
private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
{
- if (isHeadLogFile(currentLogFile))
+ sharedLock.lock();
+ try
{
- return null;
+ if (isHeadLogFile(currentLogFile))
+ {
+ return null;
+ }
+ final Pair<K, K> bounds = getKeyBounds(currentLogFile);
+ return logFiles.higherEntry(bounds.getSecond()).getValue();
}
- final Pair<K, K> bounds = getKeyBounds(currentLogFile);
- return logFiles.higherEntry(bounds.getSecond()).getValue();
+ finally
+ {
+ sharedLock.unlock();
+ }
}
private boolean isHeadLogFile(final LogFile<K, V> logFile)
@@ -1124,14 +1165,23 @@
return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
}
- /** Returns the log file that should contain the provided key. */
- private LogFile<K, V> findLogFileFor(final K key)
+ /** @GuardedBy("sharedLock") */
+ private LogFile<K, V> findLogFileFor(final K key, KeyMatchingStrategy keyMatchingStrategy) throws ChangelogException
{
if (key == null || logFiles.lowerKey(key) == null)
{
return getOldestLogFile();
}
- return logFiles.ceilingEntry(key).getValue();
+
+ final LogFile<K, V> candidate = logFiles.ceilingEntry(key).getValue();
+ if (KeyMatchingStrategy.LESS_THAN_OR_EQUAL_TO_KEY.equals(keyMatchingStrategy)
+ && candidate.getOldestRecord().getKey().compareTo(key) > 0)
+ {
+ // This handle the special case where the first key of the candidate is actually greater than the expected one.
+ // We have to return the previous logfile in order to match the LESS_THAN_OR_EQUAL_TO_KEY matching strategy.
+ return logFiles.floorEntry(key).getValue();
+ }
+ return candidate;
}
/**
@@ -1217,18 +1267,28 @@
@Override
public boolean next() throws ChangelogException
{
- final boolean hasNext = currentCursor.next();
- if (hasNext)
+ // Lock is needed here to ensure that log rotation is performed atomically.
+ // This ensures that currentCursor will not be aborted concurrently.
+ log.sharedLock.lock();
+ try
{
- return true;
+ final boolean hasNext = currentCursor.next();
+ if (hasNext)
+ {
+ return true;
+ }
+ final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+ if (logFile != null)
+ {
+ switchToLogFile(logFile);
+ return currentCursor.next();
+ }
+ return false;
}
- final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
- if (logFile != null)
+ finally
{
- switchToLogFile(logFile);
- return currentCursor.next();
+ log.sharedLock.unlock();
}
- return false;
}
@Override
@@ -1244,19 +1304,38 @@
final PositionStrategy positionStrategy)
throws ChangelogException
{
- final LogFile<K, V> logFile = log.findLogFileFor(key);
- if (logFile != currentLogFile)
+ // Lock is needed here to ensure that log rotation is performed atomically.
+ // This ensures that currentLogFile will not be closed concurrently.
+ log.sharedLock.lock();
+ try
{
- switchToLogFile(logFile);
+ final LogFile<K, V> logFile = log.findLogFileFor(key, matchStrategy);
+ if (logFile != currentLogFile)
+ {
+ switchToLogFile(logFile);
+ }
+ return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
}
- return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
+ finally
+ {
+ log.sharedLock.unlock();
+ }
}
- /** Returns the state of this cursor. */
@Override
CursorState<K, V> getState() throws ChangelogException
{
- return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+ // Lock is needed here to ensure that log rotation is performed atomically.
+ // This ensures that currentCursor will not be aborted concurrently.
+ log.sharedLock.lock();
+ try
+ {
+ return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
}
@Override
@@ -1442,65 +1521,33 @@
@Override
public Record<K, V> getRecord()
{
- log.sharedLock.lock();
- try
- {
- return delegate.getRecord();
- }
- finally
- {
- log.sharedLock.unlock();
- }
+ return delegate.getRecord();
}
@Override
- public boolean next() throws ChangelogException
+ public synchronized boolean next() throws ChangelogException
{
- log.sharedLock.lock();
- try
+ if (mustAbort)
{
- if (mustAbort)
- {
- delegate.close();
- delegate = new AbortedLogCursor<>(log.getPath());
- mustAbort = false;
- }
- return delegate.next();
+ delegate.close();
+ delegate = new AbortedLogCursor<>(log.getPath());
+ mustAbort = false;
}
- finally
- {
- log.sharedLock.unlock();
- }
+ return delegate.next();
}
@Override
public void close()
{
- log.sharedLock.lock();
- try
- {
- delegate.close();
- log.unregisterCursor(this);
- }
- finally
- {
- log.sharedLock.unlock();
- }
+ delegate.close();
+ log.unregisterCursor(this);
}
@Override
public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
throws ChangelogException
{
- log.sharedLock.lock();
- try
- {
- return delegate.positionTo(key, matchStrategy, positionStrategy);
- }
- finally
- {
- log.sharedLock.unlock();
- }
+ return delegate.positionTo(key, matchStrategy, positionStrategy);
}
/**
@@ -1509,49 +1556,33 @@
* <p>
* This method is called only when log.exclusiveLock has been acquired.
*/
- void abort()
+ synchronized void abort()
{
mustAbort = true;
}
- /**
- * {@inheritDoc}
- * <p>
- * This method is called only when log.exclusiveLock has been acquired.
- */
+ /** @GuardedBy("exclusiveLock") */
@Override
CursorState<K, V> getState() throws ChangelogException
{
return delegate.getState();
}
- /**
- * {@inheritDoc}
- * <p>
- * This method is called only when log.exclusiveLock has been acquired.
- */
+ /** @GuardedBy("exclusiveLock") */
@Override
void closeUnderlyingCursor()
{
delegate.closeUnderlyingCursor();
}
- /**
- * {@inheritDoc}
- * <p>
- * This method is called only when log.exclusiveLock has been acquired.
- */
+ /** @GuardedBy("exclusiveLock") */
@Override
void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
{
delegate.reinitializeTo(cursorState);
}
- /**
- * {@inheritDoc}
- * <p>
- * This method is called only when log.exclusiveLock has been acquired.
- */
+ /** @GuardedBy("exclusiveLock") */
@Override
boolean isAccessingLogFile(LogFile<K, V> logFile)
{
--
Gitblit v1.10.0