mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
04.19.2014 334014cfe2047b73bb9770035d2ea4e5e089cfdb
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -663,13 +663,9 @@
      }
      if (!openCursors.isEmpty())
      {
        // TODO: throwing an exception makes the replication/totalupdate.txt robot functional test fail because
        // there is one open cursor when clearing.
        // Switch to logging until this issue is solved
        // throw new ChangelogException(Message.raw("Can't clean log '%s' because there are %d cursor(s) opened on it",
        //    logPath.getPath(), openCursors.size()));
        ErrorLogger.logError(
            ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG.get(logPath.getPath(), openCursors.size()));
        // Allow opened cursors at this point, but turn them into empty cursors.
        // This behavior is needed by the change number indexer thread.
        switchCursorsOpenedIntoEmptyCursors();
      }
      // delete all log files
@@ -747,9 +743,18 @@
    return logFiles.firstEntry().getValue();
  }
  /** Rotate the head log file to a read-only log file, and open a new empty head log file to write in. */
  /**
   * Rotate the head log file to a read-only log file, and open a new empty head
   * log file to write in.
   * <p>
   * All cursors opened on this log are temporarily disabled (closing underlying resources)
   * and then re-open with their previous state.
   */
  private void rotateHeadLogFile() throws ChangelogException
  {
    // Temporarily disable cursors opened on head, saving their state
    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
    final LogFile<K, V> headLogFile = getHeadLogFile();
    final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
    headLogFile.close();
@@ -757,7 +762,9 @@
    openHeadLogFile();
    openReadOnlyLogFile(readOnlyLogFile);
    updateCursorsOpenedOnHead();
    // Re-enable cursors previously opened on head, with the saved state
    updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
  }
  private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
@@ -816,31 +823,55 @@
       + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
  }
  /** Update the open cursors after a rotation of the head log file. */
  private void updateCursorsOpenedOnHead() throws ChangelogException
  /** Update the cursors that were pointing to head after a rotation of the head log file. */
  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
      throws ChangelogException
  {
    for (LogCursor<K, V> cursor : openCursors)
    for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
    {
      final CursorState<K, V> state = cursor.getState();
      final CursorState<K, V> cursorState = pair.getSecond();
      // Need to update the cursor only if it is pointing to the head log file
      if (isHeadLogFile(state.logFile))
      if (isHeadLogFile(cursorState.logFile))
      {
        updateOpenCursor(cursor, state);
        final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
        final LogFile<K, V> logFile = findLogFileFor(previousKey);
        final LogCursor<K, V> cursor = pair.getFirst();
        cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
      }
    }
  }
  /**
   * Update the provided open cursor with the provided state.
   * <p>
   * The cursor must report the previous state on the head log file to the same
   * state (position in file, current record) in the read-only log file just created.
   */
  private void updateOpenCursor(final LogCursor<K,V> cursor, final CursorState<K, V> state) throws ChangelogException
  private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
  {
    final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
    final LogFile<K, V> logFile = findLogFileFor(previousKey);
    cursor.reinitializeTo(new CursorState<K, V>(logFile, state.filePosition, state.record));
    for (LogCursor<K, V> cursor : openCursors)
    {
      cursor.actAsEmptyCursor();
    }
    openCursors.clear();
  }
  /**
   * Disable the cursors opened on the head log file log, by closing their underlying cursor.
   * Returns the state of each cursor just before the close operation.
   *
   * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
   * @throws ChangelogException
   *           If an error occurs.
   */
  private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
  {
    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
        new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
    for (LogCursor<K, V> cursor : openCursors)
    {
      if (isHeadLogFile(cursor.currentLogFile))
      {
        openCursorsStates.add(Pair.of(cursor, cursor.getState()));
        cursor.closeUnderlyingCursor();
      }
    }
    return openCursorsStates;
  }
  private void openHeadLogFile() throws ChangelogException
@@ -930,6 +961,9 @@
   * {@code cursor.next()} method.
   * <p>
   * The cursor uses the log shared lock to ensure reads are not done during a rotation.
   * <p>
   * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
   * method.
   */
  private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
  {
@@ -937,6 +971,7 @@
    private LogFile<K, V> currentLogFile;
    private LogFileCursor<K, V> currentCursor;
    private boolean actAsEmptyCursor;
    /**
     * Creates a cursor on the provided log.
@@ -949,19 +984,24 @@
    private LogCursor(final Log<K, V> log) throws ChangelogException
    {
      this.log = log;
      this.actAsEmptyCursor = false;
    }
    /** {@inheritDoc} */
    @Override
    public Record<K, V> getRecord()
    {
      return currentCursor.getRecord();
      return currentCursor != null ? currentCursor.getRecord() : null;
    }
    /** {@inheritDoc} */
    @Override
    public boolean next() throws ChangelogException
    {
      if (actAsEmptyCursor)
      {
        return false;
      }
      log.sharedLock.lock();
      try
      {
@@ -1004,6 +1044,10 @@
    @Override
    public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException
    {
      if (actAsEmptyCursor)
      {
        return false;
      }
      log.sharedLock.lock();
      try
      {
@@ -1033,15 +1077,31 @@
    /** Returns the state of this cursor. */
    private CursorState<K, V> getState() throws ChangelogException
    {
      return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
      return !actAsEmptyCursor ?
          new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
    }
    private void closeUnderlyingCursor()
    {
      StaticUtils.close(currentCursor);
    }
    /** Reinitialize this cursor to the provided state. */
    private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    {
      StaticUtils.close(currentCursor);
      currentLogFile = cursorState.logFile;
      currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
      if (!actAsEmptyCursor)
      {
        currentLogFile = cursorState.logFile;
        currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
      }
    }
    /** Turn this cursor into an empty cursor, with no actual resource used. */
    private void actAsEmptyCursor()
    {
      currentLogFile = null;
      currentCursor = null;
      actAsEmptyCursor = true;
    }
    /** Switch the cursor to the provided log file. */
@@ -1055,8 +1115,10 @@
    /** {@inheritDoc} */
    public String toString()
    {
      return String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
          log.logPath, currentLogFile.getFile().getName(), currentCursor);
      return actAsEmptyCursor ?
          String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
          String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
              log.logPath, currentLogFile.getFile().getName(), currentCursor);
    }
  }