mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
11.53.2015 b877a7554a1fa1c47a2982541972efe780dfad9a
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -39,7 +39,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -53,6 +52,7 @@
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.forgerock.util.time.TimeService;
import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -166,7 +166,7 @@
   * The list of non-empty cursors opened on this log. Opened cursors may have
   * to be updated when rotating the head log file.
   */
  private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
  private final List<AbortableLogCursor<K, V>> openCursors = new CopyOnWriteArrayList<AbortableLogCursor<K, V>>();
  /**
   * A log file can be rotated once it has exceeded this size limit. The log file can have
@@ -246,6 +246,17 @@
    return log;
  }
  /**
   * Returns an empty cursor.
   *
   * @param <K> the type of keys.
   * @param <V> the type of values.
   * @return an empty cursor
   */
  static <K extends Comparable<K>, V> RepositionableCursor<K, V> getEmptyCursor() {
    return new Log.EmptyCursor<K, V>();
  }
  /** Holds the parameters for log files rotation. */
  static class LogRotationParameters {
@@ -331,6 +342,7 @@
    this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
    this.rotationIntervalInMillis = rotationParams.rotationInterval;
    this.lastRotationTime = rotationParams.lastRotationTime;
    this.referenceCount = 1;
    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -435,7 +447,7 @@
      LogFile<K, V> headLogFile = getHeadLogFile();
      if (mustRotate(headLogFile))
      {
        logger.debug(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
        logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
        rotateHeadLogFile();
        headLogFile = getHeadLogFile();
@@ -459,13 +471,20 @@
    if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
    {
      // rotate because file size exceeded threshold
      logger.trace("Rotate log %s due to size: %s", logPath.getPath(), headLogFile.getSizeInBytes());
      return true;
    }
    if (rotationIntervalInMillis > 0)
    {
      // rotate if time limit is reached
      final long timeElapsed = timeService.since(lastRotationTime);
      return timeElapsed > rotationIntervalInMillis;
      boolean shouldRotate = timeElapsed > rotationIntervalInMillis;
      if (shouldRotate)
      {
        logger.trace("Rotate log %s due to time: time elapsed %s, rotation interval: %s",
            logPath.getPath(), timeElapsed, rotationIntervalInMillis);
      }
      return shouldRotate;
    }
    return false;
  }
@@ -512,15 +531,15 @@
   */
  public RepositionableCursor<K, V> getCursor() throws ChangelogException
  {
    LogCursor<K, V> cursor = null;
    AbortableLogCursor<K, V> cursor = null;
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
        return new EmptyCursor<K, V>();
      }
      cursor = new LogCursor<K, V>(this);
      cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this));
      cursor.positionTo(null, null, null);
      registerCursor(cursor);
      return cursor;
@@ -575,15 +594,15 @@
    {
      return getCursor();
    }
    LogCursor<K, V> cursor = null;
    AbortableLogCursor<K, V> cursor = null;
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
        return new EmptyCursor<K, V>();
      }
      cursor = new LogCursor<K, V>(this);
      cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this));
      final boolean isSuccessfullyPositioned = cursor.positionTo(key, matchingStrategy, positionStrategy);
      // Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy
      if (isSuccessfullyPositioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
@@ -594,7 +613,7 @@
      else
      {
        StaticUtils.close(cursor);
        return new EmptyLogCursor<K, V>();
        return new EmptyCursor<K, V>();
      }
    }
    catch (ChangelogException e)
@@ -701,12 +720,13 @@
        return null;
      }
      final List<String> undeletableFiles = new ArrayList<String>();
      final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
      final Iterator<LogFile<K, V>> entriesToPurge = logFilesToPurge.values().iterator();
      while (entriesToPurge.hasNext())
      {
        final LogFile<K, V> logFile = entriesToPurge.next().getValue();
        final LogFile<K, V> logFile = entriesToPurge.next();
        try
        {
          abortCursorsOpenOnLogFile(logFile);
          logFile.close();
          logFile.delete();
          entriesToPurge.remove();
@@ -733,7 +753,23 @@
  }
  /**
   * Abort all cursors opened on the provided log file.
   */
  private void abortCursorsOpenOnLogFile(LogFile<K, V> logFile)
  {
    for (AbortableLogCursor<K, V> cursor : openCursors)
    {
      if (cursor.isAccessingLogFile(logFile))
      {
        cursor.abort();
      }
    }
  }
  /**
   * Empties the log, discarding all records it contains.
   * <p>
   * All cursors open on the log are aborted.
   *
   * @throws ChangelogException
   *           If cursors are opened on this log, or if a problem occurs during
@@ -750,9 +786,10 @@
      }
      if (!openCursors.isEmpty())
      {
        // Allow opened cursors at this point, but turn them into empty cursors.
        // This behavior is needed by the change number indexer thread.
        switchCursorsOpenedIntoEmptyCursors();
        // All open cursors are aborted, which means the change number indexer thread
        // should manage AbortedChangelogCursorException specifically to avoid being
        // stopped
        abortAllOpenCursors();
      }
      // delete all log files
@@ -806,7 +843,6 @@
    }
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
@@ -856,19 +892,19 @@
    sharedLock.lock();
    try
    {
      K key = null;
      for (LogFile<K, V> logFile : logFiles.values())
    K key = null;
    for (LogFile<K, V> logFile : logFiles.values())
    {
      final Record<K, V> record = logFile.getOldestRecord();
      final V2 oldestValue = mapper.map(record.getValue());
      if (oldestValue.compareTo(limitValue) > 0)
      {
        final Record<K, V> record = logFile.getOldestRecord();
        final V2 oldestValue = mapper.map(record.getValue());
        if (oldestValue.compareTo(limitValue) > 0)
        {
          return key;
        }
        key = record.getKey();
        return key;
      }
      return key;
      key = record.getKey();
    }
    return key;
  }
    finally
    {
      sharedLock.unlock();
@@ -918,7 +954,7 @@
  private void rotateHeadLogFile() throws ChangelogException
  {
    // Temporarily disable cursors opened on head, saving their state
    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
    final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
    final LogFile<K, V> headLogFile = getHeadLogFile();
    final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
@@ -993,31 +1029,30 @@
  }
  /** Update the cursors that were pointing to head after a rotation of the head log file. */
  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
      throws ChangelogException
  {
    for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
    for (Pair<AbortableLogCursor<K, V>, CursorState<K, V>> pair : cursors)
    {
      final CursorState<K, V> cursorState = pair.getSecond();
      // Need to update the cursor only if it is pointing to the head log file
      if (isHeadLogFile(cursorState.logFile))
      if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
      {
        final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
        final LogFile<K, V> logFile = findLogFileFor(previousKey);
        final LogCursor<K, V> cursor = pair.getFirst();
        final AbortableLogCursor<K, V> cursor = pair.getFirst();
        cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
      }
    }
  }
  private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
  private void abortAllOpenCursors() throws ChangelogException
  {
    for (LogCursor<K, V> cursor : openCursors)
    for (AbortableLogCursor<K, V> cursor : openCursors)
    {
      cursor.actAsEmptyCursor();
      cursor.abort();
    }
    openCursors.clear();
  }
  /**
@@ -1028,13 +1063,14 @@
   * @throws ChangelogException
   *           If an error occurs.
   */
  private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
  private List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead()
      throws ChangelogException
  {
    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
        new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
    for (LogCursor<K, V> cursor : openCursors)
    final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> openCursorsStates = new ArrayList<>();
    final LogFile<K, V> headLogFile = getHeadLogFile();
    for (AbortableLogCursor<K, V> cursor : openCursors)
    {
      if (isHeadLogFile(cursor.currentLogFile))
      if (cursor.isAccessingLogFile(headLogFile))
      {
        openCursorsStates.add(Pair.of(cursor, cursor.getState()));
        cursor.closeUnderlyingCursor();
@@ -1058,7 +1094,7 @@
    logFiles.put(bounds.getSecond(), logFile);
  }
  private void registerCursor(final LogCursor<K, V> cursor)
  private void registerCursor(final AbortableLogCursor<K, V> cursor)
  {
    openCursors.add(cursor);
  }
@@ -1126,20 +1162,38 @@
  }
  /**
   * Implements a cursor on the log.
   * Represents an internal view of a cursor on the log, with extended operations.
   * <p>
   * The cursor uses the log shared lock to ensure reads are not done during a rotation.
   * <p>
   * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
   * method.
   * This is an abstract class rather than an interface to allow reduced visibility of the methods.
   */
  private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
  private abstract static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
  {
    /** Closes the underlying cursor. */
    abstract void closeUnderlyingCursor();
    /** Returns the state of this cursor. */
    abstract CursorState<K, V> getState() throws ChangelogException;
    /** Reinitialize this cursor to the provided state. */
    abstract void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException;
    /** Returns true if cursor is pointing on provided log file. */
    abstract boolean isAccessingLogFile(LogFile<K, V> logFile);
  }
  /**
   * Implements an internal cursor on the log.
   * <p>
   * This cursor is intended to be used <b>only<b> inside an {@link AbortableLogCursor},
   * because it is relying on AbortableLogCursor for locking.
   */
  private static class InternalLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
  {
    private final Log<K, V> log;
    private LogFile<K, V> currentLogFile;
    private LogFileCursor<K, V> currentCursor;
    private boolean actAsEmptyCursor;
    /**
     * Creates a cursor on the provided log.
@@ -1149,66 +1203,40 @@
     * @throws ChangelogException
     *           If an error occurs when creating the cursor.
     */
    private LogCursor(final Log<K, V> log) throws ChangelogException
    private InternalLogCursor(final Log<K, V> log) throws ChangelogException
    {
      this.log = log;
      this.actAsEmptyCursor = false;
    }
    /** {@inheritDoc} */
    @Override
    public Record<K, V> getRecord()
    {
      return currentCursor != null ? currentCursor.getRecord() : null;
    }
    /** {@inheritDoc} */
    @Override
    public boolean next() throws ChangelogException
    {
      if (actAsEmptyCursor)
      final boolean hasNext = currentCursor.next();
      if (hasNext)
      {
        return false;
        return true;
      }
      log.sharedLock.lock();
      try
      final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
      if (logFile != null)
      {
        final boolean hasNext = currentCursor.next();
        if (hasNext)
        {
          return true;
        }
        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
        if (logFile != null)
        {
          switchToLogFile(logFile);
          return currentCursor.next();
        }
        return false;
        switchToLogFile(logFile);
        return currentCursor.next();
      }
      finally
      {
        log.sharedLock.unlock();
      }
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      log.sharedLock.lock();
      try
      {
        StaticUtils.close(currentCursor);
        log.unregisterCursor(this);
      }
      finally
      {
        log.sharedLock.unlock();
      }
      StaticUtils.close(currentCursor);
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(
        final K key,
@@ -1216,54 +1244,39 @@
        final PositionStrategy positionStrategy)
            throws ChangelogException
    {
      if (actAsEmptyCursor)
      final LogFile<K, V> logFile = log.findLogFileFor(key);
      if (logFile != currentLogFile)
      {
        return false;
        switchToLogFile(logFile);
      }
      log.sharedLock.lock();
      try
      {
        final LogFile<K, V> logFile = log.findLogFileFor(key);
        if (logFile != currentLogFile)
        {
          switchToLogFile(logFile);
        }
        return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
      }
      finally
      {
        log.sharedLock.unlock();
      }
      return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
    }
    /** Returns the state of this cursor. */
    private CursorState<K, V> getState() throws ChangelogException
    @Override
    CursorState<K, V> getState() throws ChangelogException
    {
      return !actAsEmptyCursor ?
          new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
      return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
    }
    private void closeUnderlyingCursor()
    @Override
    void closeUnderlyingCursor()
    {
      StaticUtils.close(currentCursor);
    }
    /** Reinitialize this cursor to the provided state. */
    private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    @Override
    void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    {
      if (!actAsEmptyCursor)
      {
        currentLogFile = cursorState.logFile;
        currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
      }
      currentLogFile = cursorState.logFile;
      currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
    }
    /** Turn this cursor into an empty cursor, with no actual resource used. */
    private void actAsEmptyCursor()
    @Override
    boolean isAccessingLogFile(LogFile<K, V> logFile)
    {
      currentLogFile = null;
      currentCursor = null;
      actAsEmptyCursor = true;
      return currentLogFile != null && currentLogFile.equals(logFile);
    }
    /** Switch the cursor to the provided log file. */
@@ -1274,53 +1287,281 @@
      currentCursor = currentLogFile.getCursor();
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return actAsEmptyCursor ?
          String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
          String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
      return  String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
              log.logPath, currentLogFile.getFile().getName(), currentCursor);
    }
  }
  /** An empty cursor, that always return null records and false to {@code next()} method. */
  static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
  /**
   * An empty cursor, that always return null records and false to {@link #next()} method.
   * <p>
   * This class is thread-safe.
   */
  private static final class EmptyCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
  {
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    {
      return null;
    }
    /** {@inheritDoc} */
    @Override
    public boolean next()
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      // nothing to do
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return "EmptyLogCursor";
      return getClass().getSimpleName();
    }
  }
  /**
   * An aborted cursor, that throws AbortedChangelogCursorException on methods that can
   * throw a ChangelogException and returns a default value on other methods.
   * <p>
   * Although this cursor is thread-safe, it is intended to be used inside an
   * AbortableLogCursor which manages locking.
   */
  private static final class AbortedLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
  {
    /** Records the path of the log the aborted cursor was positioned on. */
    private final File logPath;
    AbortedLogCursor(File logPath)
    {
      this.logPath = logPath;
    }
    @Override
    public Record<K,V> getRecord()
    {
      throw new IllegalStateException("this cursor is aborted");
    }
    @Override
    public boolean next() throws ChangelogException
    {
      throw abortedCursorException();
    }
    private AbortedChangelogCursorException abortedCursorException()
    {
      return new AbortedChangelogCursorException(ERR_CHANGELOG_CURSOR_ABORTED.get(logPath));
    }
    @Override
    public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
    {
      throw abortedCursorException();
    }
    @Override
    public void close()
    {
      // nothing to do
    }
    @Override
    CursorState<K, V> getState() throws ChangelogException
    {
      throw abortedCursorException();
    }
    @Override
    void closeUnderlyingCursor()
    {
      // nothing to do
    }
    @Override
    void reinitializeTo(CursorState<K, V> cursorState) throws ChangelogException
    {
      throw abortedCursorException();
    }
    @Override
    boolean isAccessingLogFile(LogFile<K, V> logFile)
    {
      return false;
    }
    @Override
    public String toString()
    {
      return getClass().getSimpleName();
    }
  }
  /**
   * A cursor on the log that can be aborted.
   * <p>
   * The cursor uses the log sharedLock to ensure no read can occur during a
   * rotation, a clear or a purge.
   * <p>
   * Note that only public methods use the sharedLock. Protected methods are intended to be used only
   * internally in the Log class when the log exclusiveLock is on.
   * <p>
   * The cursor can be be aborted by calling the {@link #abort()} method.
   */
  private static class AbortableLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
  {
    /** The log on which this cursor is created. */
    private final Log<K, V> log;
    /** The actual cursor on which methods are delegated. */
    private LogCursor<K, V> delegate;
    /** Indicates if the cursor must be aborted. */
    private boolean mustAbort;
    private AbortableLogCursor(Log<K,V> log, LogCursor<K, V> delegate)
    {
      this.log = log;
      this.delegate = delegate;
    }
    @Override
    public Record<K, V> getRecord()
    {
      log.sharedLock.lock();
      try
      {
        return delegate.getRecord();
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    @Override
    public boolean next() throws ChangelogException
    {
      log.sharedLock.lock();
      try
      {
        if (mustAbort)
        {
          delegate.close();
          delegate = new AbortedLogCursor<K, V>(log.getPath());
          mustAbort = false;
        }
        return delegate.next();
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    @Override
    public void close()
    {
      log.sharedLock.lock();
      try
      {
        delegate.close();
        log.unregisterCursor(this);
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    @Override
    public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
        throws ChangelogException
    {
      log.sharedLock.lock();
      try
      {
        return delegate.positionTo(key, matchStrategy, positionStrategy);
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    /**
     * Aborts this cursor. Once aborted, a cursor throws an
     * AbortedChangelogCursorException if it is used.
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    void abort()
    {
      mustAbort = true;
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    @Override
    CursorState<K, V> getState() throws ChangelogException
    {
      return delegate.getState();
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    @Override
    void closeUnderlyingCursor()
    {
      delegate.closeUnderlyingCursor();
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    @Override
    void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    {
      delegate.reinitializeTo(cursorState);
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    @Override
    boolean isAccessingLogFile(LogFile<K, V> logFile)
    {
      return delegate.isAccessingLogFile(logFile);
    }
    @Override
    public String toString()
    {
      return delegate.toString();
    }
  }
@@ -1345,11 +1586,33 @@
    /** The record the cursor is pointing to. */
    private final Record<K,V> record;
    private final boolean isValid;
    /** Creates a non-valid state. */
    private CursorState() {
      logFile = null;
      filePosition = 0;
      record = null;
      isValid = false;
    }
    /** Creates a valid state. */
    private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
    {
      this.logFile = logFile;
      this.filePosition = filePosition;
      this.record = record;
      isValid = true;
    }
    /**
     * Indicates if this state is valid, i.e if it has non-null values.
     *
     * @return {@code true iff state is valid}
     */
    public boolean isValid()
    {
      return isValid;
    }
  }