mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
01.45.2015 8fdf1768757fba933e7ce63ac6381eacec41f0c6
OPENDJ-2476: Purge of file-based changelog is very slow and the
changelog size is growing.

Replaced the exclusive-lock in Log.append() by a shared-lock so that
cursor can iterate inside logfiles without being contended by append().

Pushed down lock into LogFile to ensure write atomicity.
4 files modified
413 ■■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java 4 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java 255 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java 110 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java 44 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2014 ForgeRock AS
 *      Portions Copyright 2014-2015 ForgeRock AS
 */
package org.opends.server.loggers;
@@ -37,7 +37,7 @@
public final class MeteredStream extends OutputStream
{
  OutputStream out;
  long written;
  volatile long written;
  /**
   * Create the stream wrapped around the specified output
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -187,14 +187,12 @@
  private long lastRotationTime;
  /**
   * The exclusive lock used for writes and lifecycle operations on this log:
   * The exclusive lock used for log rotation and lifecycle operations on this log:
   * initialize, clear, sync and close.
   */
  private final Lock exclusiveLock;
  /**
   * The shared lock used for reads and cursor operations on this log.
   */
  /** The shared lock used for write operations and accessing {@link #logFiles} map. */
  private final Lock sharedLock;
  /**
@@ -416,8 +414,10 @@
   * appended and the method returns immediately.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitely call the
   * to file system, it is necessary to explicitly call the
   * {@code syncToFileSystem()} method.
   * <p>
   * This method is not thread-safe.
   *
   * @param record
   *          The record to add.
@@ -426,22 +426,42 @@
   */
  public void append(final Record<K, V> record) throws ChangelogException
  {
    // If this exclusive lock happens to be a bottleneck :
    // 1. use a shared lock for appending the record first
    // 2. switch to an exclusive lock only if rotation is needed
    // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
    exclusiveLock.lock();
    // This check is ok outside of any locking because only the append thread updates lastAppendedKey.
    if (recordIsBreakingKeyOrdering(record))
    {
      logger.info(LocalizableMessage.raw(
              "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
              lastAppendedKey != null ? lastAppendedKey : "null"));
      return;
    }
    // Fast-path - assume that no rotation is needed and use shared lock.
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      if (recordIsBreakingKeyOrdering(record))
      LogFile<K, V> headLogFile = getHeadLogFile();
      if (!mustRotate(headLogFile))
      {
        logger.info(LocalizableMessage.raw(
            "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
            lastAppendedKey != null ? lastAppendedKey : "null"));
        headLogFile.append(record);
        lastAppendedKey = record.getKey();
        return;
      }
    }
    finally
    {
      sharedLock.unlock();
    }
    // Slow-path - rotation is needed so use exclusive lock.
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      LogFile<K, V> headLogFile = getHeadLogFile();
@@ -837,9 +857,17 @@
   */
  void dumpAsTextFile(File dumpDirectory) throws ChangelogException
  {
    for (LogFile<K, V> logFile : logFiles.values())
    sharedLock.lock();
    try
    {
      logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
      for (LogFile<K, V> logFile : logFiles.values())
      {
        logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
      }
    }
    finally
    {
      sharedLock.unlock();
    }
  }
@@ -892,19 +920,19 @@
    sharedLock.lock();
    try
    {
    K key = null;
    for (LogFile<K, V> logFile : logFiles.values())
    {
      final Record<K, V> record = logFile.getOldestRecord();
      final V2 oldestValue = mapper.map(record.getValue());
      if (oldestValue.compareTo(limitValue) > 0)
      K key = null;
      for (LogFile<K, V> logFile : logFiles.values())
      {
        return key;
        final Record<K, V> record = logFile.getOldestRecord();
        final V2 oldestValue = mapper.map(record.getValue());
        if (oldestValue.compareTo(limitValue) > 0)
        {
          return key;
        }
        key = record.getKey();
      }
      key = record.getKey();
      return key;
    }
    return key;
  }
    finally
    {
      sharedLock.unlock();
@@ -950,6 +978,7 @@
   * <p>
   * All cursors opened on this log are temporarily disabled (closing underlying resources)
   * and then re-open with their previous state.
   * @GuardedBy("exclusiveLock")
   */
  private void rotateHeadLogFile() throws ChangelogException
  {
@@ -1028,7 +1057,10 @@
       + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
  }
  /** Update the cursors that were pointing to head after a rotation of the head log file. */
  /**
   * Update the cursors that were pointing to head after a rotation of the head log file.
   * @GuardedBy("exclusiveLock")
   */
  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
      throws ChangelogException
  {
@@ -1040,7 +1072,7 @@
      if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
      {
        final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
        final LogFile<K, V> logFile = findLogFileFor(previousKey);
        final LogFile<K, V> logFile = findLogFileFor(previousKey, KeyMatchingStrategy.EQUAL_TO_KEY);
        final AbortableLogCursor<K, V> cursor = pair.getFirst();
        cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
      }
@@ -1058,6 +1090,7 @@
  /**
   * Disable the cursors opened on the head log file log, by closing their underlying cursor.
   * Returns the state of each cursor just before the close operation.
   * @GuardedBy("exclusiveLock")
   *
   * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
   * @throws ChangelogException
@@ -1111,12 +1144,20 @@
   */
  private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
  {
    if (isHeadLogFile(currentLogFile))
    sharedLock.lock();
    try
    {
      return null;
      if (isHeadLogFile(currentLogFile))
      {
        return null;
      }
      final Pair<K, K> bounds = getKeyBounds(currentLogFile);
      return logFiles.higherEntry(bounds.getSecond()).getValue();
    }
    final Pair<K, K> bounds = getKeyBounds(currentLogFile);
    return logFiles.higherEntry(bounds.getSecond()).getValue();
    finally
    {
      sharedLock.unlock();
    }
  }
  private boolean isHeadLogFile(final LogFile<K, V> logFile)
@@ -1124,14 +1165,23 @@
    return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
  }
  /** Returns the log file that should contain the provided key. */
  private LogFile<K, V> findLogFileFor(final K key)
  /** @GuardedBy("sharedLock") */
  private LogFile<K, V> findLogFileFor(final K key, KeyMatchingStrategy keyMatchingStrategy) throws ChangelogException
  {
    if (key == null || logFiles.lowerKey(key) == null)
    {
      return getOldestLogFile();
    }
    return logFiles.ceilingEntry(key).getValue();
    final LogFile<K, V> candidate = logFiles.ceilingEntry(key).getValue();
    if (KeyMatchingStrategy.LESS_THAN_OR_EQUAL_TO_KEY.equals(keyMatchingStrategy)
        && candidate.getOldestRecord().getKey().compareTo(key) > 0)
    {
      // This handle the special case where the first key of the candidate is actually greater than the expected one.
      // We have to return the previous logfile in order to match the LESS_THAN_OR_EQUAL_TO_KEY matching strategy.
      return logFiles.floorEntry(key).getValue();
    }
    return candidate;
  }
  /**
@@ -1217,18 +1267,28 @@
    @Override
    public boolean next() throws ChangelogException
    {
      final boolean hasNext = currentCursor.next();
      if (hasNext)
      // Lock is needed here to ensure that log rotation is performed atomically.
      // This ensures that currentCursor will not be aborted concurrently.
      log.sharedLock.lock();
      try
      {
        return true;
        final boolean hasNext = currentCursor.next();
        if (hasNext)
        {
          return true;
        }
        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
        if (logFile != null)
        {
          switchToLogFile(logFile);
          return currentCursor.next();
        }
        return false;
      }
      final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
      if (logFile != null)
      finally
      {
        switchToLogFile(logFile);
        return currentCursor.next();
        log.sharedLock.unlock();
      }
      return false;
    }
    @Override
@@ -1244,19 +1304,38 @@
        final PositionStrategy positionStrategy)
            throws ChangelogException
    {
      final LogFile<K, V> logFile = log.findLogFileFor(key);
      if (logFile != currentLogFile)
      // Lock is needed here to ensure that log rotation is performed atomically.
      // This ensures that currentLogFile will not be closed concurrently.
      log.sharedLock.lock();
      try
      {
        switchToLogFile(logFile);
        final LogFile<K, V> logFile = log.findLogFileFor(key, matchStrategy);
        if (logFile != currentLogFile)
        {
          switchToLogFile(logFile);
        }
        return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
      }
      return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
      finally
      {
        log.sharedLock.unlock();
      }
    }
    /** Returns the state of this cursor. */
    @Override
    CursorState<K, V> getState() throws ChangelogException
    {
      return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
      // Lock is needed here to ensure that log rotation is performed atomically.
      // This ensures that currentCursor will not be aborted concurrently.
      log.sharedLock.lock();
      try
      {
        return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    @Override
@@ -1442,65 +1521,33 @@
    @Override
    public Record<K, V> getRecord()
    {
      log.sharedLock.lock();
      try
      {
        return delegate.getRecord();
      }
      finally
      {
        log.sharedLock.unlock();
      }
      return delegate.getRecord();
    }
    @Override
    public boolean next() throws ChangelogException
    public synchronized boolean next() throws ChangelogException
    {
      log.sharedLock.lock();
      try
      if (mustAbort)
      {
        if (mustAbort)
        {
          delegate.close();
          delegate = new AbortedLogCursor<>(log.getPath());
          mustAbort = false;
        }
        return delegate.next();
        delegate.close();
        delegate = new AbortedLogCursor<>(log.getPath());
        mustAbort = false;
      }
      finally
      {
        log.sharedLock.unlock();
      }
      return delegate.next();
    }
    @Override
    public void close()
    {
      log.sharedLock.lock();
      try
      {
        delegate.close();
        log.unregisterCursor(this);
      }
      finally
      {
        log.sharedLock.unlock();
      }
      delegate.close();
      log.unregisterCursor(this);
    }
    @Override
    public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
        throws ChangelogException
    {
      log.sharedLock.lock();
      try
      {
        return delegate.positionTo(key, matchStrategy, positionStrategy);
      }
      finally
      {
        log.sharedLock.unlock();
      }
      return delegate.positionTo(key, matchStrategy, positionStrategy);
    }
    /**
@@ -1509,49 +1556,33 @@
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    void abort()
    synchronized void abort()
    {
      mustAbort = true;
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    /** @GuardedBy("exclusiveLock") */
    @Override
    CursorState<K, V> getState() throws ChangelogException
    {
      return delegate.getState();
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    /** @GuardedBy("exclusiveLock") */
    @Override
    void closeUnderlyingCursor()
    {
      delegate.closeUnderlyingCursor();
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    /** @GuardedBy("exclusiveLock") */
    @Override
    void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    {
      delegate.reinitializeTo(cursorState);
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    /** @GuardedBy("exclusiveLock") */
    @Override
    boolean isAccessingLogFile(LogFile<K, V> logFile)
    {
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -33,6 +33,9 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -52,9 +55,7 @@
 * </ul>
 * <p>
 * A log file is NOT intended to be used directly, but only has part of a
 * {@link Log}. In particular, there is no concurrency management and no checks
 * to ensure that log is not closed when performing any operation on it. Those
 * are managed at the {@code Log} level.
 * {@link Log}.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
@@ -80,6 +81,12 @@
  /** Indicates if log is enabled for write. */
  private final boolean isWriteEnabled;
  /** Lock used to ensure write atomicity. */
  private final Lock exclusiveLock;
  /** Lock used to ensure that log file is in a consistent state when reading it. */
  private final Lock sharedLock;
  private Record<K, V> newestRecord;
  /**
@@ -113,6 +120,10 @@
      writer = null;
    }
    readerPool = new LogReaderPool<>(logfile, parser);
    final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    exclusiveLock = rwLock.writeLock();
    sharedLock = rwLock.readLock();
  }
  /**
@@ -204,8 +215,8 @@
   */
  private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
  {
    try(final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
        final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
    try (final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
         final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
    {
      final long lastValidPosition = reader.checkLogIsValid();
      if (lastValidPosition != -1)
@@ -238,8 +249,16 @@
  void append(final Record<K, V> record) throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    writer.write(record);
    newestRecord = record;
    exclusiveLock.lock();
    try
    {
      writer.write(record);
      newestRecord = record;
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  /**
@@ -252,8 +271,8 @@
   */
  void dumpAsTextFile(File dumpFile) throws ChangelogException
  {
    try(final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
        final DBCursor<Record<K, V>> cursor = getCursor())
    try (final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
         final DBCursor<Record<K, V>> cursor = getCursor())
    {
      while (cursor.getRecord() != null)
      {
@@ -287,6 +306,7 @@
  void syncToFileSystem() throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    sharedLock.lock();
    try
    {
      writer.sync();
@@ -295,6 +315,10 @@
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  /**
@@ -355,7 +379,15 @@
    {
      try (BlockLogReader<K, V> reader = getReader())
      {
        newestRecord = reader.getNewestRecord();
        sharedLock.lock();
        try
        {
          newestRecord = reader.getNewestRecord();
        }
        finally
        {
          sharedLock.unlock();
        }
      }
      catch (IOException ioe)
      {
@@ -375,7 +407,7 @@
  long getNumberOfRecords() throws ChangelogException
  {
    // TODO  : need a more efficient way to retrieve it
    try(final DBCursor<Record<K, V>> cursor = getCursor())
    try (final DBCursor<Record<K, V>> cursor = getCursor())
    {
      long counter = 0L;
      while (cursor.next())
@@ -414,9 +446,18 @@
   */
  void delete() throws ChangelogException
  {
    if (!logfile.delete())
    exclusiveLock.lock();
    try
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
      final boolean isDeleted = logfile.delete();
      if (!isDeleted)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
      }
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
@@ -517,14 +558,22 @@
     * pointing to the provided file position.
     * <p>
     * Note: there is no check to ensure that provided record and file position are
     * consistent. It is the responsability of the caller of this method.
     * consistent. It is the responsibility of the caller of this method.
     */
    private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
    {
      this.logFile = logFile;
      this.reader = logFile.getReader();
      this.currentRecord = record;
      reader.seekToPosition(filePosition);
      logFile.sharedLock.lock();
      try
      {
        reader.seekToPosition(filePosition);
      }
      finally
      {
        logFile.sharedLock.unlock();
      }
    }
    /** {@inheritDoc} */
@@ -538,7 +587,15 @@
        initialRecord = null;
        return true;
      }
      currentRecord = reader.readRecord();
      logFile.sharedLock.lock();
      try
      {
        currentRecord = reader.readRecord();
      }
      finally
      {
        logFile.sharedLock.unlock();
      }
      return currentRecord != null;
    }
@@ -553,7 +610,16 @@
    @Override
    public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
        throws ChangelogException {
      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
      final Pair<Boolean, Record<K, V>> result;
      logFile.sharedLock.lock();
      try
      {
        result = reader.seekToRecord(key, match, pos);
      }
      finally
      {
        logFile.sharedLock.unlock();
      }
      final boolean found = result.getFirst();
      initialRecord = found ? result.getSecond() : null;
      return found;
@@ -575,7 +641,15 @@
     */
    long getFilePosition() throws ChangelogException
    {
      return reader.getFilePosition();
      logFile.sharedLock.lock();
      try
      {
        return reader.getFilePosition();
      }
      finally
      {
        logFile.sharedLock.unlock();
      }
    }
    /** {@inheritDoc} */
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -187,13 +187,13 @@
      { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
      { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 10 },
      { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
      { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
      { "key000", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, 1, 10 },
      { "key001", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, 2, 10 },
      { "key004", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, 5, 10 },
      { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
      { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
      { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { "key009", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, 10, 10 },
      { "key010", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, -1, -1 },
      { "key011", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, -1, -1 },
    };
  }
@@ -277,38 +277,6 @@
    }
  }
  @Test
  public void testTwoConcurrentWrite() throws Exception
  {
    try (Log<String, String> writeLog1 = openLog(LogFileTest.RECORD_PARSER);
        Log<String, String> writeLog2 = openLog(LogFileTest.RECORD_PARSER))
    {
      writeLog1.append(Record.from("key020", "starting record"));
      AtomicReference<ChangelogException> exceptionRef = new AtomicReference<>();
      Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef);
      Thread write2 = getWriteLogThread(writeLog2, "b", exceptionRef);
      write1.run();
      write2.run();
      write1.join();
      write2.join();
      if (exceptionRef.get() != null)
      {
        throw exceptionRef.get();
      }
      writeLog1.syncToFileSystem();
      try (DBCursor<Record<String, String>> cursor = writeLog1.getCursor("key020"))
      {
        for (int i = 1; i <= 61; i++)
        {
          assertThat(cursor.next()).isTrue();
        }
        assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
      }
    }
  }
  /**
   *  This test should be disabled.
   *  Enable it locally when you need to have an rough idea of write performance.