mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
11.53.2015 b877a7554a1fa1c47a2982541972efe780dfad9a
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -239,8 +239,10 @@
   * @param sendToServerId
   *          serverId of the replica where changes will be sent
   * @return The next update that must be sent to the consumer, or {@code null} when queue is empty
   * @throws ChangelogException
   *            If a problem occurs when reading the changelog
   */
  protected UpdateMsg getNextMessage(int sendToServerId)
  protected UpdateMsg getNextMessage(int sendToServerId) throws ChangelogException
  {
    while (activeConsumer)
    {
@@ -371,7 +373,7 @@
   * Fills the late queue with the most recent changes, accepting only the
   * messages from provided replica ids.
   */
  private void fillLateQueue(int sendToServerId)
  private void fillLateQueue(int sendToServerId) throws ChangelogException
  {
    try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
    {
@@ -384,10 +386,6 @@
        }
      }
    }
    catch (ChangelogException e)
    {
      logger.traceException(e);
    }
  }
  private boolean isLateQueueBelowThreshold()
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
@@ -46,6 +46,7 @@
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
@@ -925,8 +926,10 @@
   *
   * @return the next update that must be sent to the server managed by this
   *         ServerHandler.
   * @throws ChangelogException
   *            If a problem occurs when reading the changelog
   */
  public UpdateMsg take()
  public UpdateMsg take() throws ChangelogException
  {
    final UpdateMsg msg = getNextMessage(serverId);
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/AbortedChangelogCursorException.java
New file
@@ -0,0 +1,78 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2015 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;
import javax.annotation.Generated;
import org.forgerock.i18n.LocalizableMessage;
/**
 * This exception is thrown when a cursor that has been aborted is used.
 * <p>
 * A cursor can be aborted when it is open on a log file that
 * must be purged or cleared.
 */
public class AbortedChangelogCursorException extends ChangelogException
{
  @Generated("Eclipse")
  private static final long serialVersionUID = -2123770048083474999L;
  /**
   * Creates a new exception with the provided message.
   *
   * @param message
   *          The message that explains the problem that occurred.
   */
  public AbortedChangelogCursorException(LocalizableMessage message)
  {
    super(message);
  }
  /**
   * Creates a new exception with the provided cause.
   *
   * @param cause
   *          The underlying cause that triggered this exception.
   */
  public AbortedChangelogCursorException(Throwable cause)
  {
    super(cause);
  }
  /**
   * Creates a new exception with the provided message and cause.
   *
   * @param message
   *          The message that explains the problem that occurred.
   * @param cause
   *          The underlying cause that triggered this exception.
   */
  public AbortedChangelogCursorException(LocalizableMessage message, Throwable cause)
  {
    super(message, cause);
  }
}
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
@@ -38,6 +38,7 @@
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -436,12 +437,12 @@
              wait();
            }
            // check whether new changes have been added to the ReplicaDBs
            nextChangeForInsertDBCursor.next();
            moveToNextChange();
            continue;
          }
          else if (msg instanceof ReplicaOfflineMsg)
          {
            nextChangeForInsertDBCursor.next();
            moveToNextChange();
            continue;
          }
@@ -504,6 +505,26 @@
    }
  }
  private void moveToNextChange() throws ChangelogException
  {
    try
    {
      nextChangeForInsertDBCursor.next();
    }
    catch (AbortedChangelogCursorException e) {
      if (domainsToClear.size() == 0)
      {
        // There is no domain to clear, thus it is
        // not expected that a cursor is aborted
        throw e;
      }
      // else assumes the aborted cursor is part of a domain
      // that will be removed on the next iteration
      logger.trace("Cursor was aborted: %s, but continuing because domainsToClear has size %s",
          e, domainsToClear.size());
    }
  }
  /**
   * Notifies the {@link ChangelogBackend} that a new entry has been added.
   *
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -64,6 +64,7 @@
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicaId;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
@@ -125,8 +126,9 @@
  private final ReplicationServer replicationServer;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private static final RepositionableCursor<CSN, UpdateMsg> EMPTY_CURSOR = Log.getEmptyCursor();
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY);
      new FileReplicaDBCursor(EMPTY_CURSOR, null, AFTER_MATCHING_KEY);
  /**
   * Creates a new changelog DB.
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -39,7 +39,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -53,6 +52,7 @@
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.forgerock.util.time.TimeService;
import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -166,7 +166,7 @@
   * The list of non-empty cursors opened on this log. Opened cursors may have
   * to be updated when rotating the head log file.
   */
  private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
  private final List<AbortableLogCursor<K, V>> openCursors = new CopyOnWriteArrayList<AbortableLogCursor<K, V>>();
  /**
   * A log file can be rotated once it has exceeded this size limit. The log file can have
@@ -246,6 +246,17 @@
    return log;
  }
  /**
   * Returns an empty cursor.
   *
   * @param <K> the type of keys.
   * @param <V> the type of values.
   * @return an empty cursor
   */
  static <K extends Comparable<K>, V> RepositionableCursor<K, V> getEmptyCursor() {
    return new Log.EmptyCursor<K, V>();
  }
  /** Holds the parameters for log files rotation. */
  static class LogRotationParameters {
@@ -331,6 +342,7 @@
    this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
    this.rotationIntervalInMillis = rotationParams.rotationInterval;
    this.lastRotationTime = rotationParams.lastRotationTime;
    this.referenceCount = 1;
    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -435,7 +447,7 @@
      LogFile<K, V> headLogFile = getHeadLogFile();
      if (mustRotate(headLogFile))
      {
        logger.debug(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
        logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
        rotateHeadLogFile();
        headLogFile = getHeadLogFile();
@@ -459,13 +471,20 @@
    if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
    {
      // rotate because file size exceeded threshold
      logger.trace("Rotate log %s due to size: %s", logPath.getPath(), headLogFile.getSizeInBytes());
      return true;
    }
    if (rotationIntervalInMillis > 0)
    {
      // rotate if time limit is reached
      final long timeElapsed = timeService.since(lastRotationTime);
      return timeElapsed > rotationIntervalInMillis;
      boolean shouldRotate = timeElapsed > rotationIntervalInMillis;
      if (shouldRotate)
      {
        logger.trace("Rotate log %s due to time: time elapsed %s, rotation interval: %s",
            logPath.getPath(), timeElapsed, rotationIntervalInMillis);
      }
      return shouldRotate;
    }
    return false;
  }
@@ -512,15 +531,15 @@
   */
  public RepositionableCursor<K, V> getCursor() throws ChangelogException
  {
    LogCursor<K, V> cursor = null;
    AbortableLogCursor<K, V> cursor = null;
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
        return new EmptyCursor<K, V>();
      }
      cursor = new LogCursor<K, V>(this);
      cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this));
      cursor.positionTo(null, null, null);
      registerCursor(cursor);
      return cursor;
@@ -575,15 +594,15 @@
    {
      return getCursor();
    }
    LogCursor<K, V> cursor = null;
    AbortableLogCursor<K, V> cursor = null;
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
        return new EmptyCursor<K, V>();
      }
      cursor = new LogCursor<K, V>(this);
      cursor = new AbortableLogCursor<K, V>(this, new InternalLogCursor<K, V>(this));
      final boolean isSuccessfullyPositioned = cursor.positionTo(key, matchingStrategy, positionStrategy);
      // Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy
      if (isSuccessfullyPositioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
@@ -594,7 +613,7 @@
      else
      {
        StaticUtils.close(cursor);
        return new EmptyLogCursor<K, V>();
        return new EmptyCursor<K, V>();
      }
    }
    catch (ChangelogException e)
@@ -701,12 +720,13 @@
        return null;
      }
      final List<String> undeletableFiles = new ArrayList<String>();
      final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
      final Iterator<LogFile<K, V>> entriesToPurge = logFilesToPurge.values().iterator();
      while (entriesToPurge.hasNext())
      {
        final LogFile<K, V> logFile = entriesToPurge.next().getValue();
        final LogFile<K, V> logFile = entriesToPurge.next();
        try
        {
          abortCursorsOpenOnLogFile(logFile);
          logFile.close();
          logFile.delete();
          entriesToPurge.remove();
@@ -733,7 +753,23 @@
  }
  /**
   * Abort all cursors opened on the provided log file.
   */
  private void abortCursorsOpenOnLogFile(LogFile<K, V> logFile)
  {
    for (AbortableLogCursor<K, V> cursor : openCursors)
    {
      if (cursor.isAccessingLogFile(logFile))
      {
        cursor.abort();
      }
    }
  }
  /**
   * Empties the log, discarding all records it contains.
   * <p>
   * All cursors open on the log are aborted.
   *
   * @throws ChangelogException
   *           If cursors are opened on this log, or if a problem occurs during
@@ -750,9 +786,10 @@
      }
      if (!openCursors.isEmpty())
      {
        // Allow opened cursors at this point, but turn them into empty cursors.
        // This behavior is needed by the change number indexer thread.
        switchCursorsOpenedIntoEmptyCursors();
        // All open cursors are aborted, which means the change number indexer thread
        // should manage AbortedChangelogCursorException specifically to avoid being
        // stopped
        abortAllOpenCursors();
      }
      // delete all log files
@@ -806,7 +843,6 @@
    }
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
@@ -856,19 +892,19 @@
    sharedLock.lock();
    try
    {
      K key = null;
      for (LogFile<K, V> logFile : logFiles.values())
    K key = null;
    for (LogFile<K, V> logFile : logFiles.values())
    {
      final Record<K, V> record = logFile.getOldestRecord();
      final V2 oldestValue = mapper.map(record.getValue());
      if (oldestValue.compareTo(limitValue) > 0)
      {
        final Record<K, V> record = logFile.getOldestRecord();
        final V2 oldestValue = mapper.map(record.getValue());
        if (oldestValue.compareTo(limitValue) > 0)
        {
          return key;
        }
        key = record.getKey();
        return key;
      }
      return key;
      key = record.getKey();
    }
    return key;
  }
    finally
    {
      sharedLock.unlock();
@@ -918,7 +954,7 @@
  private void rotateHeadLogFile() throws ChangelogException
  {
    // Temporarily disable cursors opened on head, saving their state
    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
    final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
    final LogFile<K, V> headLogFile = getHeadLogFile();
    final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
@@ -993,31 +1029,30 @@
  }
  /** Update the cursors that were pointing to head after a rotation of the head log file. */
  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
      throws ChangelogException
  {
    for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
    for (Pair<AbortableLogCursor<K, V>, CursorState<K, V>> pair : cursors)
    {
      final CursorState<K, V> cursorState = pair.getSecond();
      // Need to update the cursor only if it is pointing to the head log file
      if (isHeadLogFile(cursorState.logFile))
      if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
      {
        final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
        final LogFile<K, V> logFile = findLogFileFor(previousKey);
        final LogCursor<K, V> cursor = pair.getFirst();
        final AbortableLogCursor<K, V> cursor = pair.getFirst();
        cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
      }
    }
  }
  private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
  private void abortAllOpenCursors() throws ChangelogException
  {
    for (LogCursor<K, V> cursor : openCursors)
    for (AbortableLogCursor<K, V> cursor : openCursors)
    {
      cursor.actAsEmptyCursor();
      cursor.abort();
    }
    openCursors.clear();
  }
  /**
@@ -1028,13 +1063,14 @@
   * @throws ChangelogException
   *           If an error occurs.
   */
  private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
  private List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead()
      throws ChangelogException
  {
    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
        new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
    for (LogCursor<K, V> cursor : openCursors)
    final List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> openCursorsStates = new ArrayList<>();
    final LogFile<K, V> headLogFile = getHeadLogFile();
    for (AbortableLogCursor<K, V> cursor : openCursors)
    {
      if (isHeadLogFile(cursor.currentLogFile))
      if (cursor.isAccessingLogFile(headLogFile))
      {
        openCursorsStates.add(Pair.of(cursor, cursor.getState()));
        cursor.closeUnderlyingCursor();
@@ -1058,7 +1094,7 @@
    logFiles.put(bounds.getSecond(), logFile);
  }
  private void registerCursor(final LogCursor<K, V> cursor)
  private void registerCursor(final AbortableLogCursor<K, V> cursor)
  {
    openCursors.add(cursor);
  }
@@ -1126,20 +1162,38 @@
  }
  /**
   * Implements a cursor on the log.
   * Represents an internal view of a cursor on the log, with extended operations.
   * <p>
   * The cursor uses the log shared lock to ensure reads are not done during a rotation.
   * <p>
   * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
   * method.
   * This is an abstract class rather than an interface to allow reduced visibility of the methods.
   */
  private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
  private abstract static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
  {
    /** Closes the underlying cursor. */
    abstract void closeUnderlyingCursor();
    /** Returns the state of this cursor. */
    abstract CursorState<K, V> getState() throws ChangelogException;
    /** Reinitialize this cursor to the provided state. */
    abstract void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException;
    /** Returns true if cursor is pointing on provided log file. */
    abstract boolean isAccessingLogFile(LogFile<K, V> logFile);
  }
  /**
   * Implements an internal cursor on the log.
   * <p>
   * This cursor is intended to be used <b>only<b> inside an {@link AbortableLogCursor},
   * because it is relying on AbortableLogCursor for locking.
   */
  private static class InternalLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
  {
    private final Log<K, V> log;
    private LogFile<K, V> currentLogFile;
    private LogFileCursor<K, V> currentCursor;
    private boolean actAsEmptyCursor;
    /**
     * Creates a cursor on the provided log.
@@ -1149,66 +1203,40 @@
     * @throws ChangelogException
     *           If an error occurs when creating the cursor.
     */
    private LogCursor(final Log<K, V> log) throws ChangelogException
    private InternalLogCursor(final Log<K, V> log) throws ChangelogException
    {
      this.log = log;
      this.actAsEmptyCursor = false;
    }
    /** {@inheritDoc} */
    @Override
    public Record<K, V> getRecord()
    {
      return currentCursor != null ? currentCursor.getRecord() : null;
    }
    /** {@inheritDoc} */
    @Override
    public boolean next() throws ChangelogException
    {
      if (actAsEmptyCursor)
      final boolean hasNext = currentCursor.next();
      if (hasNext)
      {
        return false;
        return true;
      }
      log.sharedLock.lock();
      try
      final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
      if (logFile != null)
      {
        final boolean hasNext = currentCursor.next();
        if (hasNext)
        {
          return true;
        }
        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
        if (logFile != null)
        {
          switchToLogFile(logFile);
          return currentCursor.next();
        }
        return false;
        switchToLogFile(logFile);
        return currentCursor.next();
      }
      finally
      {
        log.sharedLock.unlock();
      }
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      log.sharedLock.lock();
      try
      {
        StaticUtils.close(currentCursor);
        log.unregisterCursor(this);
      }
      finally
      {
        log.sharedLock.unlock();
      }
      StaticUtils.close(currentCursor);
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(
        final K key,
@@ -1216,54 +1244,39 @@
        final PositionStrategy positionStrategy)
            throws ChangelogException
    {
      if (actAsEmptyCursor)
      final LogFile<K, V> logFile = log.findLogFileFor(key);
      if (logFile != currentLogFile)
      {
        return false;
        switchToLogFile(logFile);
      }
      log.sharedLock.lock();
      try
      {
        final LogFile<K, V> logFile = log.findLogFileFor(key);
        if (logFile != currentLogFile)
        {
          switchToLogFile(logFile);
        }
        return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
      }
      finally
      {
        log.sharedLock.unlock();
      }
      return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
    }
    /** Returns the state of this cursor. */
    private CursorState<K, V> getState() throws ChangelogException
    @Override
    CursorState<K, V> getState() throws ChangelogException
    {
      return !actAsEmptyCursor ?
          new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
      return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
    }
    private void closeUnderlyingCursor()
    @Override
    void closeUnderlyingCursor()
    {
      StaticUtils.close(currentCursor);
    }
    /** Reinitialize this cursor to the provided state. */
    private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    @Override
    void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    {
      if (!actAsEmptyCursor)
      {
        currentLogFile = cursorState.logFile;
        currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
      }
      currentLogFile = cursorState.logFile;
      currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
    }
    /** Turn this cursor into an empty cursor, with no actual resource used. */
    private void actAsEmptyCursor()
    @Override
    boolean isAccessingLogFile(LogFile<K, V> logFile)
    {
      currentLogFile = null;
      currentCursor = null;
      actAsEmptyCursor = true;
      return currentLogFile != null && currentLogFile.equals(logFile);
    }
    /** Switch the cursor to the provided log file. */
@@ -1274,53 +1287,281 @@
      currentCursor = currentLogFile.getCursor();
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return actAsEmptyCursor ?
          String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
          String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
      return  String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
              log.logPath, currentLogFile.getFile().getName(), currentCursor);
    }
  }
  /** An empty cursor, that always return null records and false to {@code next()} method. */
  static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
  /**
   * An empty cursor, that always return null records and false to {@link #next()} method.
   * <p>
   * This class is thread-safe.
   */
  private static final class EmptyCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
  {
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    {
      return null;
    }
    /** {@inheritDoc} */
    @Override
    public boolean next()
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      // nothing to do
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return "EmptyLogCursor";
      return getClass().getSimpleName();
    }
  }
  /**
   * An aborted cursor, that throws AbortedChangelogCursorException on methods that can
   * throw a ChangelogException and returns a default value on other methods.
   * <p>
   * Although this cursor is thread-safe, it is intended to be used inside an
   * AbortableLogCursor which manages locking.
   */
  private static final class AbortedLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
  {
    /** Records the path of the log the aborted cursor was positioned on. */
    private final File logPath;
    AbortedLogCursor(File logPath)
    {
      this.logPath = logPath;
    }
    @Override
    public Record<K,V> getRecord()
    {
      throw new IllegalStateException("this cursor is aborted");
    }
    @Override
    public boolean next() throws ChangelogException
    {
      throw abortedCursorException();
    }
    private AbortedChangelogCursorException abortedCursorException()
    {
      return new AbortedChangelogCursorException(ERR_CHANGELOG_CURSOR_ABORTED.get(logPath));
    }
    @Override
    public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
    {
      throw abortedCursorException();
    }
    @Override
    public void close()
    {
      // nothing to do
    }
    @Override
    CursorState<K, V> getState() throws ChangelogException
    {
      throw abortedCursorException();
    }
    @Override
    void closeUnderlyingCursor()
    {
      // nothing to do
    }
    @Override
    void reinitializeTo(CursorState<K, V> cursorState) throws ChangelogException
    {
      throw abortedCursorException();
    }
    @Override
    boolean isAccessingLogFile(LogFile<K, V> logFile)
    {
      return false;
    }
    @Override
    public String toString()
    {
      return getClass().getSimpleName();
    }
  }
  /**
   * A cursor on the log that can be aborted.
   * <p>
   * The cursor uses the log sharedLock to ensure no read can occur during a
   * rotation, a clear or a purge.
   * <p>
   * Note that only public methods use the sharedLock. Protected methods are intended to be used only
   * internally in the Log class when the log exclusiveLock is on.
   * <p>
   * The cursor can be be aborted by calling the {@link #abort()} method.
   */
  private static class AbortableLogCursor<K extends Comparable<K>, V> extends LogCursor<K, V>
  {
    /** The log on which this cursor is created. */
    private final Log<K, V> log;
    /** The actual cursor on which methods are delegated. */
    private LogCursor<K, V> delegate;
    /** Indicates if the cursor must be aborted. */
    private boolean mustAbort;
    private AbortableLogCursor(Log<K,V> log, LogCursor<K, V> delegate)
    {
      this.log = log;
      this.delegate = delegate;
    }
    @Override
    public Record<K, V> getRecord()
    {
      log.sharedLock.lock();
      try
      {
        return delegate.getRecord();
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    @Override
    public boolean next() throws ChangelogException
    {
      log.sharedLock.lock();
      try
      {
        if (mustAbort)
        {
          delegate.close();
          delegate = new AbortedLogCursor<K, V>(log.getPath());
          mustAbort = false;
        }
        return delegate.next();
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    @Override
    public void close()
    {
      log.sharedLock.lock();
      try
      {
        delegate.close();
        log.unregisterCursor(this);
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    @Override
    public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
        throws ChangelogException
    {
      log.sharedLock.lock();
      try
      {
        return delegate.positionTo(key, matchStrategy, positionStrategy);
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    /**
     * Aborts this cursor. Once aborted, a cursor throws an
     * AbortedChangelogCursorException if it is used.
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    void abort()
    {
      mustAbort = true;
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    @Override
    CursorState<K, V> getState() throws ChangelogException
    {
      return delegate.getState();
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    @Override
    void closeUnderlyingCursor()
    {
      delegate.closeUnderlyingCursor();
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    @Override
    void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    {
      delegate.reinitializeTo(cursorState);
    }
    /**
     * {@inheritDoc}
     * <p>
     * This method is called only when log.exclusiveLock has been acquired.
     */
    @Override
    boolean isAccessingLogFile(LogFile<K, V> logFile)
    {
      return delegate.isAccessingLogFile(logFile);
    }
    @Override
    public String toString()
    {
      return delegate.toString();
    }
  }
@@ -1345,11 +1586,33 @@
    /** The record the cursor is pointing to. */
    private final Record<K,V> record;
    private final boolean isValid;
    /** Creates a non-valid state. */
    private CursorState() {
      logFile = null;
      filePosition = 0;
      record = null;
      isValid = false;
    }
    /** Creates a valid state. */
    private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
    {
      this.logFile = logFile;
      this.filePosition = filePosition;
      this.record = record;
      isValid = true;
    }
    /**
     * Indicates if this state is valid, i.e if it has non-null values.
     *
     * @return {@code true iff state is valid}
     */
    public boolean isValid()
    {
      return isValid;
    }
  }
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -500,6 +500,12 @@
    return logfile.equals(other.logfile);
  }
  @Override
  public String toString()
  {
    return "LogFile [logfile=" + logfile + ", isWriteEnabled=" + isWriteEnabled + "]";
  }
  /** Implements a repositionable cursor on the log file. */
  static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
  {
opendj-server-legacy/src/messages/org/opends/messages/replication.properties
@@ -631,4 +631,6 @@
ERR_CHANGELOG_UNABLE_TO_CREATE_LAST_LOG_ROTATION_TIME_FILE_288=Could not create \
 file '%s' to store last log rotation time %d
ERR_CHANGELOG_UNABLE_TO_DELETE_LAST_LOG_ROTATION_TIME_FILE_289=Could not delete \
 file '%s' that stored the previous last log rotation time
 file '%s' that stored the previous last log rotation time
ERR_CHANGELOG_CURSOR_ABORTED_290=Cursor on log '%s' has been aborted after \
 a purge or a clear
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -36,6 +36,7 @@
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -542,6 +543,56 @@
    }
  }
  /**
   * Similar to testPurge() test but with a concurrent cursor opened before starting the purge.
   * <p>
   * For all keys but "key000" the concurrent cursor should be aborted because the corresponding log file
   * has been purged.
   */
  @Test(dataProvider="purgeKeys")
  public void testPurgeWithConcurrentCursorOpened(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
      int cursorStartIndex, int cursorEndIndex) throws Exception
  {
    DBCursor<Record<String, String>> cursor = null;
    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
        DBCursor<Record<String, String>> concurrentCursor = log.getCursor())
    {
      concurrentCursor.next();
      assertThat(concurrentCursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
      log.purgeUpTo(purgeKey);
      cursor = log.getCursor();
      assertThat(cursor.next()).isTrue();
      assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
      assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
      // concurrent cursor is expected to be aborted on the next() call for all cases but one
      assertThat(concurrentCursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
      if (purgeKey.equals("key000"))
      {
        // in that case no purge has been done, so cursor should not be aborted
        assertThatCursorCanBeFullyRead(concurrentCursor, cursorStartIndex, cursorEndIndex);
      }
      else
      {
        // in other cases cursor should be aborted
        try
        {
          concurrentCursor.next();
          fail("Expected an AbortedChangelogCursorException");
        }
        catch (AbortedChangelogCursorException e) {
          // nothing to do
        }
      }
    }
    finally
    {
      StaticUtils.close(cursor);
    }
  }
  static final Mapper<String, Integer> MAPPER = new Record.Mapper<String, Integer>()
      {
        @Override
@@ -629,6 +680,7 @@
      final AtomicReference<ChangelogException> exceptionRef)
  {
    return new Thread() {
      @Override
      public void run()
      {
        for (int i = 1; i <= 30; i++)