From 8fdf1768757fba933e7ce63ac6381eacec41f0c6 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 09 Dec 2015 17:41:58 +0000
Subject: [PATCH] OPENDJ-2476: Purge of file-based changelog is very slow and the changelog size is growing.

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java |  255 ++++++++++++++++++++++++++++----------------------
 1 files changed, 143 insertions(+), 112 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
index 52e2785..b072dfc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -187,14 +187,12 @@
   private long lastRotationTime;
 
   /**
-   * The exclusive lock used for writes and lifecycle operations on this log:
+   * The exclusive lock used for log rotation and lifecycle operations on this log:
    * initialize, clear, sync and close.
    */
   private final Lock exclusiveLock;
 
-  /**
-   * The shared lock used for reads and cursor operations on this log.
-   */
+  /** The shared lock used for write operations and accessing {@link #logFiles} map. */
   private final Lock sharedLock;
 
   /**
@@ -416,8 +414,10 @@
    * appended and the method returns immediately.
    * <p>
    * In order to ensure that record is written out of buffers and persisted
-   * to file system, it is necessary to explicitely call the
+   * to file system, it is necessary to explicitly call the
    * {@code syncToFileSystem()} method.
+   * <p>
+   * This method is not thread-safe.
    *
    * @param record
    *          The record to add.
@@ -426,22 +426,42 @@
    */
   public void append(final Record<K, V> record) throws ChangelogException
   {
-    // If this exclusive lock happens to be a bottleneck :
-    // 1. use a shared lock for appending the record first
-    // 2. switch to an exclusive lock only if rotation is needed
-    // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
-    exclusiveLock.lock();
+    // This check is ok outside of any locking because only the append thread updates lastAppendedKey.
+    if (recordIsBreakingKeyOrdering(record))
+    {
+      logger.info(LocalizableMessage.raw(
+              "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
+              lastAppendedKey != null ? lastAppendedKey : "null"));
+      return;
+    }
+
+    // Fast-path - assume that no rotation is needed and use shared lock.
+    sharedLock.lock();
     try
     {
       if (isClosed)
       {
         return;
       }
-      if (recordIsBreakingKeyOrdering(record))
+      LogFile<K, V> headLogFile = getHeadLogFile();
+      if (!mustRotate(headLogFile))
       {
-        logger.info(LocalizableMessage.raw(
-            "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
-            lastAppendedKey != null ? lastAppendedKey : "null"));
+        headLogFile.append(record);
+        lastAppendedKey = record.getKey();
+        return;
+      }
+    }
+    finally
+    {
+      sharedLock.unlock();
+    }
+
+    // Slow-path - rotation is needed so use exclusive lock.
+    exclusiveLock.lock();
+    try
+    {
+      if (isClosed)
+      {
         return;
       }
       LogFile<K, V> headLogFile = getHeadLogFile();
@@ -837,9 +857,17 @@
    */
   void dumpAsTextFile(File dumpDirectory) throws ChangelogException
   {
-    for (LogFile<K, V> logFile : logFiles.values())
+    sharedLock.lock();
+    try
     {
-      logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+      for (LogFile<K, V> logFile : logFiles.values())
+      {
+        logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+      }
+    }
+    finally
+    {
+      sharedLock.unlock();
     }
   }
 
@@ -892,19 +920,19 @@
     sharedLock.lock();
     try
     {
-    K key = null;
-    for (LogFile<K, V> logFile : logFiles.values())
-    {
-      final Record<K, V> record = logFile.getOldestRecord();
-      final V2 oldestValue = mapper.map(record.getValue());
-      if (oldestValue.compareTo(limitValue) > 0)
+      K key = null;
+      for (LogFile<K, V> logFile : logFiles.values())
       {
-        return key;
+        final Record<K, V> record = logFile.getOldestRecord();
+        final V2 oldestValue = mapper.map(record.getValue());
+        if (oldestValue.compareTo(limitValue) > 0)
+        {
+          return key;
+        }
+        key = record.getKey();
       }
-      key = record.getKey();
+      return key;
     }
-    return key;
-  }
     finally
     {
       sharedLock.unlock();
@@ -950,6 +978,7 @@
    * <p>
    * All cursors opened on this log are temporarily disabled (closing underlying resources)
    * and then re-open with their previous state.
+   * @GuardedBy("exclusiveLock")
    */
   private void rotateHeadLogFile() throws ChangelogException
   {
@@ -1028,7 +1057,10 @@
        + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
   }
 
-  /** Update the cursors that were pointing to head after a rotation of the head log file. */
+  /**
+   * Update the cursors that were pointing to head after a rotation of the head log file.
+   * @GuardedBy("exclusiveLock")
+   */
   private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
       throws ChangelogException
   {
@@ -1040,7 +1072,7 @@
       if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
       {
         final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
-        final LogFile<K, V> logFile = findLogFileFor(previousKey);
+        final LogFile<K, V> logFile = findLogFileFor(previousKey, KeyMatchingStrategy.EQUAL_TO_KEY);
         final AbortableLogCursor<K, V> cursor = pair.getFirst();
         cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
       }
@@ -1058,6 +1090,7 @@
   /**
    * Disable the cursors opened on the head log file log, by closing their underlying cursor.
    * Returns the state of each cursor just before the close operation.
+   * @GuardedBy("exclusiveLock")
    *
    * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
    * @throws ChangelogException
@@ -1111,12 +1144,20 @@
    */
   private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
   {
-    if (isHeadLogFile(currentLogFile))
+    sharedLock.lock();
+    try
     {
-      return null;
+      if (isHeadLogFile(currentLogFile))
+      {
+        return null;
+      }
+      final Pair<K, K> bounds = getKeyBounds(currentLogFile);
+      return logFiles.higherEntry(bounds.getSecond()).getValue();
     }
-    final Pair<K, K> bounds = getKeyBounds(currentLogFile);
-    return logFiles.higherEntry(bounds.getSecond()).getValue();
+    finally
+    {
+      sharedLock.unlock();
+    }
   }
 
   private boolean isHeadLogFile(final LogFile<K, V> logFile)
@@ -1124,14 +1165,23 @@
     return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
   }
 
-  /** Returns the log file that should contain the provided key. */
-  private LogFile<K, V> findLogFileFor(final K key)
+  /** @GuardedBy("sharedLock") */
+  private LogFile<K, V> findLogFileFor(final K key, KeyMatchingStrategy keyMatchingStrategy) throws ChangelogException
   {
     if (key == null || logFiles.lowerKey(key) == null)
     {
       return getOldestLogFile();
     }
-    return logFiles.ceilingEntry(key).getValue();
+
+    final LogFile<K, V> candidate = logFiles.ceilingEntry(key).getValue();
+    if (KeyMatchingStrategy.LESS_THAN_OR_EQUAL_TO_KEY.equals(keyMatchingStrategy)
+        && candidate.getOldestRecord().getKey().compareTo(key) > 0)
+    {
+      // This handle the special case where the first key of the candidate is actually greater than the expected one.
+      // We have to return the previous logfile in order to match the LESS_THAN_OR_EQUAL_TO_KEY matching strategy.
+      return logFiles.floorEntry(key).getValue();
+    }
+    return candidate;
   }
 
   /**
@@ -1217,18 +1267,28 @@
     @Override
     public boolean next() throws ChangelogException
     {
-      final boolean hasNext = currentCursor.next();
-      if (hasNext)
+      // Lock is needed here to ensure that log rotation is performed atomically.
+      // This ensures that currentCursor will not be aborted concurrently.
+      log.sharedLock.lock();
+      try
       {
-        return true;
+        final boolean hasNext = currentCursor.next();
+        if (hasNext)
+        {
+          return true;
+        }
+        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+        if (logFile != null)
+        {
+          switchToLogFile(logFile);
+          return currentCursor.next();
+        }
+        return false;
       }
-      final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
-      if (logFile != null)
+      finally
       {
-        switchToLogFile(logFile);
-        return currentCursor.next();
+        log.sharedLock.unlock();
       }
-      return false;
     }
 
     @Override
@@ -1244,19 +1304,38 @@
         final PositionStrategy positionStrategy)
             throws ChangelogException
     {
-      final LogFile<K, V> logFile = log.findLogFileFor(key);
-      if (logFile != currentLogFile)
+      // Lock is needed here to ensure that log rotation is performed atomically.
+      // This ensures that currentLogFile will not be closed concurrently.
+      log.sharedLock.lock();
+      try
       {
-        switchToLogFile(logFile);
+        final LogFile<K, V> logFile = log.findLogFileFor(key, matchStrategy);
+        if (logFile != currentLogFile)
+        {
+          switchToLogFile(logFile);
+        }
+        return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
       }
-      return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
+      finally
+      {
+        log.sharedLock.unlock();
+      }
     }
 
-    /** Returns the state of this cursor. */
     @Override
     CursorState<K, V> getState() throws ChangelogException
     {
-      return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+      // Lock is needed here to ensure that log rotation is performed atomically.
+      // This ensures that currentCursor will not be aborted concurrently.
+      log.sharedLock.lock();
+      try
+      {
+        return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
     }
 
     @Override
@@ -1442,65 +1521,33 @@
     @Override
     public Record<K, V> getRecord()
     {
-      log.sharedLock.lock();
-      try
-      {
-        return delegate.getRecord();
-      }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      return delegate.getRecord();
     }
 
     @Override
-    public boolean next() throws ChangelogException
+    public synchronized boolean next() throws ChangelogException
     {
-      log.sharedLock.lock();
-      try
+      if (mustAbort)
       {
-        if (mustAbort)
-        {
-          delegate.close();
-          delegate = new AbortedLogCursor<>(log.getPath());
-          mustAbort = false;
-        }
-        return delegate.next();
+        delegate.close();
+        delegate = new AbortedLogCursor<>(log.getPath());
+        mustAbort = false;
       }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      return delegate.next();
     }
 
     @Override
     public void close()
     {
-      log.sharedLock.lock();
-      try
-      {
-        delegate.close();
-        log.unregisterCursor(this);
-      }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      delegate.close();
+      log.unregisterCursor(this);
     }
 
     @Override
     public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
         throws ChangelogException
     {
-      log.sharedLock.lock();
-      try
-      {
-        return delegate.positionTo(key, matchStrategy, positionStrategy);
-      }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      return delegate.positionTo(key, matchStrategy, positionStrategy);
     }
 
     /**
@@ -1509,49 +1556,33 @@
      * <p>
      * This method is called only when log.exclusiveLock has been acquired.
      */
-    void abort()
+    synchronized void abort()
     {
       mustAbort = true;
     }
 
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This method is called only when log.exclusiveLock has been acquired.
-     */
+    /** @GuardedBy("exclusiveLock") */
     @Override
     CursorState<K, V> getState() throws ChangelogException
     {
       return delegate.getState();
     }
 
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This method is called only when log.exclusiveLock has been acquired.
-     */
+    /** @GuardedBy("exclusiveLock") */
     @Override
     void closeUnderlyingCursor()
     {
       delegate.closeUnderlyingCursor();
     }
 
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This method is called only when log.exclusiveLock has been acquired.
-     */
+    /** @GuardedBy("exclusiveLock") */
     @Override
     void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
     {
       delegate.reinitializeTo(cursorState);
     }
 
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This method is called only when log.exclusiveLock has been acquired.
-     */
+    /** @GuardedBy("exclusiveLock") */
     @Override
     boolean isAccessingLogFile(LogFile<K, V> logFile)
     {

--
Gitblit v1.10.0