mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
01.45.2015 8fdf1768757fba933e7ce63ac6381eacec41f0c6
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -187,14 +187,12 @@
  private long lastRotationTime;
  /**
   * The exclusive lock used for writes and lifecycle operations on this log:
   * The exclusive lock used for log rotation and lifecycle operations on this log:
   * initialize, clear, sync and close.
   */
  private final Lock exclusiveLock;
  /**
   * The shared lock used for reads and cursor operations on this log.
   */
  /** The shared lock used for write operations and accessing {@link #logFiles} map. */
  private final Lock sharedLock;
  /**
@@ -416,8 +414,10 @@
   * appended and the method returns immediately.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitely call the
   * to file system, it is necessary to explicitly call the
   * {@code syncToFileSystem()} method.
   * <p>
   * This method is not thread-safe.
   *
   * @param record
   *          The record to add.
@@ -426,22 +426,42 @@
   */
  public void append(final Record<K, V> record) throws ChangelogException
  {
    // If this exclusive lock happens to be a bottleneck :
    // 1. use a shared lock for appending the record first
    // 2. switch to an exclusive lock only if rotation is needed
    // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
    exclusiveLock.lock();
    // This check is ok outside of any locking because only the append thread updates lastAppendedKey.
    if (recordIsBreakingKeyOrdering(record))
    {
      logger.info(LocalizableMessage.raw(
              "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
              lastAppendedKey != null ? lastAppendedKey : "null"));
      return;
    }
    // Fast-path - assume that no rotation is needed and use shared lock.
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      if (recordIsBreakingKeyOrdering(record))
      LogFile<K, V> headLogFile = getHeadLogFile();
      if (!mustRotate(headLogFile))
      {
        logger.info(LocalizableMessage.raw(
            "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
            lastAppendedKey != null ? lastAppendedKey : "null"));
        headLogFile.append(record);
        lastAppendedKey = record.getKey();
        return;
      }
    }
    finally
    {
      sharedLock.unlock();
    }
    // Slow-path - rotation is needed so use exclusive lock.
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      LogFile<K, V> headLogFile = getHeadLogFile();
@@ -837,9 +857,17 @@
   */
  void dumpAsTextFile(File dumpDirectory) throws ChangelogException
  {
    for (LogFile<K, V> logFile : logFiles.values())
    sharedLock.lock();
    try
    {
      logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
      for (LogFile<K, V> logFile : logFiles.values())
      {
        logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
      }
    }
    finally
    {
      sharedLock.unlock();
    }
  }
@@ -892,19 +920,19 @@
    sharedLock.lock();
    try
    {
    K key = null;
    for (LogFile<K, V> logFile : logFiles.values())
    {
      final Record<K, V> record = logFile.getOldestRecord();
      final V2 oldestValue = mapper.map(record.getValue());
      if (oldestValue.compareTo(limitValue) > 0)
      K key = null;
      for (LogFile<K, V> logFile : logFiles.values())
      {
        return key;
        final Record<K, V> record = logFile.getOldestRecord();
        final V2 oldestValue = mapper.map(record.getValue());
        if (oldestValue.compareTo(limitValue) > 0)
        {
          return key;
        }
        key = record.getKey();
      }
      key = record.getKey();
      return key;
    }
    return key;
  }
    finally
    {
      sharedLock.unlock();
@@ -950,6 +978,7 @@
   * <p>
   * All cursors opened on this log are temporarily disabled (closing underlying resources)
   * and then re-open with their previous state.
   * @GuardedBy("exclusiveLock")
   */
  private void rotateHeadLogFile() throws ChangelogException
  {
@@ -1028,7 +1057,10 @@
       + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
  }
  /** Update the cursors that were pointing to head after a rotation of the head log file. */
  /**
   * Update the cursors that were pointing to head after a rotation of the head log file.
   * @GuardedBy("exclusiveLock")
   */
  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
      throws ChangelogException
  {
@@ -1040,7 +1072,7 @@
      if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
      {
        final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
        final LogFile<K, V> logFile = findLogFileFor(previousKey);
        final LogFile<K, V> logFile = findLogFileFor(previousKey, KeyMatchingStrategy.EQUAL_TO_KEY);
        final AbortableLogCursor<K, V> cursor = pair.getFirst();
        cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
      }
@@ -1058,6 +1090,7 @@
  /**
   * Disable the cursors opened on the head log file log, by closing their underlying cursor.
   * Returns the state of each cursor just before the close operation.
   * @GuardedBy("exclusiveLock")
   *
   * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
   * @throws ChangelogException
@@ -1111,12 +1144,20 @@
   */
  private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
  {
    if (isHeadLogFile(currentLogFile))
    sharedLock.lock();
    try
    {
      return null;
      if (isHeadLogFile(currentLogFile))
      {
        return null;
      }
      final Pair<K, K> bounds = getKeyBounds(currentLogFile);
      return logFiles.higherEntry(bounds.getSecond()).getValue();
    }
    final Pair<K, K> bounds = getKeyBounds(currentLogFile);
    return logFiles.higherEntry(bounds.getSecond()).getValue();
    finally
    {
      sharedLock.unlock();
    }
  }
  private boolean isHeadLogFile(final LogFile<K, V> logFile)
@@ -1124,14 +1165,23 @@
    return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
  }
  /** Returns the log file that should contain the provided key. */
  private LogFile<K, V> findLogFileFor(final K key)
  /** @GuardedBy("sharedLock") */
  private LogFile<K, V> findLogFileFor(final K key, KeyMatchingStrategy keyMatchingStrategy) throws ChangelogException
  {
    if (key == null || logFiles.lowerKey(key) == null)
    {
      return getOldestLogFile();
    }
    return logFiles.ceilingEntry(key).getValue();
    final LogFile<K, V> candidate = logFiles.ceilingEntry(key).getValue();
    if (KeyMatchingStrategy.LESS_THAN_OR_EQUAL_TO_KEY.equals(keyMatchingStrategy)
        && candidate.getOldestRecord().getKey().compareTo(key) > 0)
    {
      // This handle the special case where the first key of the candidate is actually greater than the expected one.
      // We have to return the previous logfile in order to match the LESS_THAN_OR_EQUAL_TO_KEY matching strategy.
      return logFiles.floorEntry(key).getValue();
    }
    return candidate;
  }
  /**
@@ -1217,18 +1267,28 @@
    @Override
    public boolean next() throws ChangelogException
    {
      final boolean hasNext = currentCursor.next();
      if (hasNext)
      // Lock is needed here to ensure that log rotation is performed atomically.
      // This ensures that currentCursor will not be aborted concurrently.
      log.sharedLock.lock();
      try
      {
        return true;
        final boolean hasNext = currentCursor.next();
        if (hasNext)
        {
          return true;
        }
        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
        if (logFile != null)
        {
          switchToLogFile(logFile);
          return currentCursor.next();
        }
        return false;
      }
      final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
      if (logFile != null)
      finally
      {
        switchToLogFile(logFile);
        return currentCursor.next();
        log.sharedLock.unlock();
      }
      return false;
    }
    @Override
@@ -1244,19 +1304,38 @@
        final PositionStrategy positionStrategy)
            throws ChangelogException
    {
      final LogFile<K, V> logFile = log.findLogFileFor(key);
      if (logFile != currentLogFile)
      // Lock is needed here to ensure that log rotation is performed atomically.
      // This ensures that currentLogFile will not be closed concurrently.
      log.sharedLock.lock();
      try
      {
        switchToLogFile(logFile);
        final LogFile<K, V> logFile = log.findLogFileFor(key, matchStrategy);
        if (logFile != currentLogFile)
        {
          switchToLogFile(logFile);
        }
        return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
      }
      return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
      finally
      {
        log.sharedLock.unlock();
      }
    }
    /** Returns the state of this cursor. */
    @Override
    CursorState<K, V> getState() throws ChangelogException
    {
      return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
      // Lock is needed here to ensure that log rotation is performed atomically.
      // This ensures that currentCursor will not be aborted concurrently.
      log.sharedLock.lock();
      try
      {
        return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    @Override
@@ -1442,65 +1521,33 @@
    @Override
    public Record<K, V> getRecord()
    {
      log.sharedLock.lock();
      try
      {
        return delegate.getRecord();
      }
      finally
      {
        log.sharedLock.unlock();
      }
      return delegate.getRecord();
    }
    @Override
    public boolean next() throws ChangelogException
    public synchronized boolean next() throws ChangelogException
    {
      log.sharedLock.lock();
      try
      if (mustAbort)
      {
        if (mustAbort)
        {
          delegate.close();
          delegate = new AbortedLogCursor<>(log.getPath());
          mustAbort = false;
        }
        return delegate.next();
        delegate.close();
        delegate = new AbortedLogCursor<>(log.getPath());
        mustAbort = false;
      }
      finally
      {
        log.sharedLock.unlock();
      }
      return delegate.next();
    }
    @Override
    public void close()
    {
      log.sharedLock.lock();
      try
      {
        delegate.close();
        log.unregisterCursor(this);
      }
      finally
      {
        log.sharedLock.unlock();
      }
      delegate.close();
      log.unregisterCursor(this);
    }
    @Override
    public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
        throws ChangelogException
    {
      log.sharedLock.lock();
      try
      {
        return delegate.positionTo(key, matchStrategy, positionStrategy);
      }
      finally
      {
        log.sharedLock.unlock();
      }
      return delegate.positionTo(key, matchStrategy, positionStrategy);
    }
    /**
@@ -1509,49 +1556,33 @@
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    void abort()
    synchronized void abort()
    {
      mustAbort = true;
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    /** @GuardedBy("exclusiveLock") */
    @Override
    CursorState<K, V> getState() throws ChangelogException
    {
      return delegate.getState();
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    /** @GuardedBy("exclusiveLock") */
    @Override
    void closeUnderlyingCursor()
    {
      delegate.closeUnderlyingCursor();
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    /** @GuardedBy("exclusiveLock") */
    @Override
    void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    {
      delegate.reinitializeTo(cursorState);
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    /** @GuardedBy("exclusiveLock") */
    @Override
    boolean isAccessingLogFile(LogFile<K, V> logFile)
    {