mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
23.39.2014 bef7c528fe33395b154145e033b717210ba63060
OPENDJ-1690 CR-5724 Fix file-based changelog purging

- Log file rotation for CN Index DB is based on both size and time
- Time interval before log file rotation is a fraction of replication purge delay
- Last log file rotation time for CN index DB is stored in a file
10 files modified
622 ■■■■ changed files
opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties 4 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java 18 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 9 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java 174 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java 12 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java 200 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java 67 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java 3 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java 68 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java 67 ●●●● patch | view | raw | blame | history
opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
@@ -628,3 +628,7 @@
 searching base DN '%s' with filter '%s' in changelog backend : %s
ERR_CHANGELOG_BACKEND_ATTRIBUTE_287 =An error occurred when \
 retrieving attribute value for attribute '%s' for entry DN '%s' in changelog backend : %s
ERR_CHANGELOG_UNABLE_TO_CREATE_LAST_LOG_ROTATION_TIME_FILE_288=Could not create \
 file '%s' to store last log rotation time %d
ERR_CHANGELOG_UNABLE_TO_DELETE_LAST_LOG_ROTATION_TIME_FILE_289=Could not delete \
 file '%s' that stored the previous last log rotation time
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -59,6 +59,16 @@
  /** The parser of records stored in this ChangeNumberIndexDB. */
  static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
  static final Record.Mapper<ChangeNumberIndexRecord, CSN> MAPPER_TO_CSN =
      new Record.Mapper<ChangeNumberIndexRecord, CSN>()
      {
        @Override
        public CSN map(ChangeNumberIndexRecord value)
        {
          return value.getCSN();
        }
      };
  /** The log in which records are persisted. */
  private final Log<Long, ChangeNumberIndexRecord> log;
@@ -231,9 +241,15 @@
    {
      return null;
    }
    final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
    // Retrieve the oldest change number that must not be purged
    final Long purgeChangeNumber = log.findBoundaryKeyFromRecord(MAPPER_TO_CSN, purgeCSN);
    if (purgeChangeNumber != null)
    {
      final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeChangeNumber);
    return record != null ? record.getValue().getCSN() : null;
  }
    return null;
  }
  /**
   * Implements the Monitoring capabilities of the FileChangeNumberIndexDB.
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -41,6 +41,7 @@
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.util.time.TimeService;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.ChangelogBackend;
@@ -296,7 +297,7 @@
    try
    {
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer);
      replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer, TimeService.SYSTEM);
      final ChangelogState changelogState = replicationEnv.getChangelogState();
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
@@ -574,6 +575,12 @@
  public void setPurgeDelay(final long purgeDelayInMillis)
  {
    this.purgeDelayInMillis = purgeDelayInMillis;
    // Rotation time interval for CN Index DB log file
    // needs to be a fraction of the purge delay
    // to ensure there is at least one file to purge
    replicationEnv.setCNIndexDBRotationInterval(purgeDelayInMillis / 2);
    if (purgeDelayInMillis > 0)
    {
      final ChangelogDBPurger newPurger = new ChangelogDBPurger();
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -51,6 +51,7 @@
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.forgerock.util.time.TimeService;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -74,9 +75,9 @@
 * the string representation of lowest and highest key present in the log file.</li>
 * </ul>
 * A read-only log file is created each time the head log file has reached the
 * maximum size limit. The head log file is then rotated to the read-only file and a
 * new empty head log file is opened. There is no limit on the number of read-only
 * files, but they can be purged.
 * maximum size limit or the time limit. The head log file is then rotated to the
 * read-only file and a new empty head log file is opened. There is no limit on the
 * number of read-only files, but they can be purged.
 * <p>
 * A log is obtained using the {@code Log.openLog()} method and must always be
 * released using the {@code close()} method.
@@ -170,14 +171,23 @@
  private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
  /**
   * A log file is rotated once it has exceeded this size limit. The log file can have
   * A log file can be rotated once it has exceeded this size limit. The log file can have
   * a size much larger than this limit if the last record written has a huge size.
   *
   * TODO : to be replaced later by a (or a list of) configurable Rotation policy
   * TODO : to be replaced later by a list of configurable Rotation policy
   * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
   */
  private final long sizeLimitPerLogFileInBytes;
  /** The time service used for timing. It is package private so it can be modified by test case. */
  TimeService timeService = TimeService.SYSTEM;
  /** A log file can be rotated once it has exceeded a given time interval. No rotation happens if equals to zero. */
  private long rotationIntervalInMillis;
  /** The last time a log file was rotated. */
  private long lastRotationTime;
  /**
   * The exclusive lock used for writes and lifecycle operations on this log:
   * initialize, clear, sync and close.
@@ -190,6 +200,12 @@
  private final Lock sharedLock;
  /**
   * The replication environment used to create this log. The log is notifying it for any change
   * that must be persisted.
   */
  private final ReplicationEnvironment replicationEnv;
  /**
   * Open a log with the provided log path, record parser and maximum size per
   * log file.
   * <p>
@@ -199,25 +215,28 @@
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param replicationEnv
   *          The replication environment used to create this log.
   * @param logPath
   *          Path of the log.
   * @param parser
   *          Parser for encoding/decoding of records.
   * @param sizeLimitPerFileInBytes
   *          Limit in bytes before rotating the head log file of the log.
   * @param rotationParameters
   *          Parameters for the log files rotation.
   * @return a log
   * @throws ChangelogException
   *           If a problem occurs during initialization.
   */
  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
      final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final ReplicationEnvironment replicationEnv,
      final File logPath, final RecordParser<K, V> parser, final LogRotationParameters rotationParameters)
      throws ChangelogException
  {
    Reject.ifNull(logPath, parser);
    @SuppressWarnings("unchecked")
    Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
    if (log == null)
    {
      log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
      log = new Log<K, V>(replicationEnv, logPath, parser, rotationParameters);
      logsCache.put(logPath, log);
    }
    else
@@ -229,6 +248,43 @@
    return log;
  }
  /** Holds the parameters for log files rotation. */
  static class LogRotationParameters {
    private final long sizeLimitPerFileInBytes;
    private final long rotationInterval;
    private final long lastRotationTime;
    /**
     * Creates rotation parameters.
     *
     * @param sizeLimitPerFileInBytes
     *           Size limit before rotating a log file.
     * @param rotationInterval
     *           Time interval before rotating a log file.
     * @param lastRotationTime
     *           Last time a log file was rotated.
     */
    LogRotationParameters(long sizeLimitPerFileInBytes, long rotationInterval, long lastRotationTime)
    {
      this.sizeLimitPerFileInBytes = sizeLimitPerFileInBytes;
      this.rotationInterval = rotationInterval;
      this.lastRotationTime = lastRotationTime;
    }
  }
  /**
   * Set the time interval for rotation of log file.
   *
   * @param rotationIntervalInMillis
   *           time interval before rotation of log file
   */
  void setRotationInterval(long rotationIntervalInMillis)
  {
    this.rotationIntervalInMillis = rotationIntervalInMillis;
  }
  /**
   * Release a reference to the log corresponding to provided path. The log is
   * closed if this is the last reference.
@@ -256,21 +312,27 @@
  /**
   * Creates a new log.
   *
   * @param replicationEnv
   *            The replication environment used to create this log.
   * @param logPath
   *            The directory path of the log.
   * @param parser
   *          Parser of records.
   * @param sizeLimitPerFile
   *            Limit in bytes before rotating a log file.
   * @param rotationParams
   *          Parameters for log-file rotation.
   *
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
      throws ChangelogException
  private Log(final ReplicationEnvironment replicationEnv, final File logPath, final RecordParser<K, V> parser,
      final LogRotationParameters rotationParams) throws ChangelogException
  {
    this.replicationEnv = replicationEnv;
    this.logPath = logPath;
    this.recordParser = parser;
    this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
    this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
    this.rotationIntervalInMillis = rotationParams.rotationInterval;
    this.lastRotationTime = rotationParams.lastRotationTime;
    this.referenceCount = 1;
    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -376,9 +438,9 @@
        return;
      }
      LogFile<K, V> headLogFile = getHeadLogFile();
      if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
      if (mustRotate(headLogFile))
      {
        logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
        logger.info(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
        rotateHeadLogFile();
        headLogFile = getHeadLogFile();
@@ -392,6 +454,27 @@
    }
  }
  private boolean mustRotate(LogFile<K, V> headLogFile)
  {
    if (lastAppendedKey == null)
    {
      // never rotate an empty file
      return false;
    }
    if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
    {
      // rotate because file size exceeded threshold
      return true;
    }
    if (rotationIntervalInMillis > 0)
    {
      // rotate if time limit is reached
      final long timeElapsed = timeService.since(lastRotationTime);
      return timeElapsed > rotationIntervalInMillis;
    }
    return false;
  }
  /**
   * Indicates if the provided record has a key that would break the key
   * ordering in the log.
@@ -711,6 +794,60 @@
    releaseLog(logPath);
  }
  /**
   * Find the highest key that corresponds to a record that is the oldest (or
   * first) of a read-only log file and where value mapped from the record is
   * lower or equals to provided limit value.
   * <p>
   * Example<br>
   * Given a log with 3 log files, with Record<Int, String> and Mapper<String,
   * Long> mapping a string to its long value
   * <ul>
   * <li>1_10.log where oldest record is (key=1, value="50")</li>
   * <li>11_20.log where oldest record is (key=11, value="150")</li>
   * <li>head.log where oldest record is (key=25, value="250")</li>
   * </ul>
   * Then
   * <ul>
   * <li>findBoundaryKeyFromRecord(mapper, 20) => null</li>
   * <li>findBoundaryKeyFromRecord(mapper, 50) => 1</li>
   * <li>findBoundaryKeyFromRecord(mapper, 100) => 1</li>
   * <li>findBoundaryKeyFromRecord(mapper, 150) => 11</li>
   * <li>findBoundaryKeyFromRecord(mapper, 200) => 11</li>
   * <li>findBoundaryKeyFromRecord(mapper, 250) => 25</li>
   * <li>findBoundaryKeyFromRecord(mapper, 300) => 25</li>
   * </ul>
   *
   * @param <V2>
   *          Type of the value extracted from the record
   * @param mapper
   *          The mapper to extract a value from a record. It is expected that
   *          extracted values are ordered according to an order consistent with
   *          this log ordering, i.e. for two records, if key(R1) > key(R2) then
   *          extractedValue(R1) > extractedValue(R2).
   * @param limitValue
   *          The limit value to search for
   * @return the key or {@code null} if no key corresponds
   * @throws ChangelogException
   *           If a problem occurs
   */
  <V2 extends Comparable<V2>> K findBoundaryKeyFromRecord(Record.Mapper<V, V2> mapper, V2 limitValue)
      throws ChangelogException
  {
    K key = null;
    for (LogFile<K, V> logFile : logFiles.values())
    {
      final Record<K, V> record = logFile.getOldestRecord();
      final V2 oldestValue = mapper.map(record.getValue());
      if (oldestValue.compareTo(limitValue) > 0)
      {
        return key;
      }
      key = record.getKey();
    }
    return key;
  }
  /** Effectively close this log. */
  private void doClose()
  {
@@ -766,6 +903,9 @@
    // Re-enable cursors previously opened on head, with the saved state
    updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
    // Notify even if time-based rotation is not enabled, as it could be enabled at any time
    replicationEnv.notifyLogFileRotation(this);
  }
  private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
@@ -35,6 +35,18 @@
 */
class Record<K, V>
{
  /** Map the record value to another value. */
  static interface Mapper<V, V2> {
      /**
       * Map a record value to another value.
       *
       * @param value
       *          The value to map
       * @return the new value
       */
      V2 map(V value);
  }
  private final K key;
  private final V value;
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -46,13 +46,16 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.time.TimeService;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
@@ -67,7 +70,8 @@
 * created :
 * <ul>
 * <li>A "changenumberindex" directory containing the log files for
 * ChangeNumberIndexDB</li>
 * ChangeNumberIndexDB, and a file named "rotationtime[millis].last" where [millis] is
 * the time of the last log file rotation in milliseconds</li>
 * <li>A "domains.state" file containing a mapping of each domain DN to an id. The
 * id is used to name the corresponding domain directory.</li>
 * <li>One directory per domain, named after "[id].domain" where [id] is the id
@@ -76,7 +80,7 @@
 * <p>
 * Each domain directory contains the following directories and files :
 * <ul>
 * <li>A "generation_[id].id" file, where [id] is the generation id</li>
 * <li>A "generation[id].id" file, where [id] is the generation id</li>
 * <li>One directory per server id, named after "[id].server" where [id] is the
 * id of the server.</li>
 * </ul>
@@ -100,6 +104,7 @@
 * |   \---changenumberindex
 * |      \--- head.log [contains last records written]
 * |      \--- 1_50.log [contains records with keys in interval [1, 50]]
 * |      \--- rtime198745512.last
 * |   \---1.domain
 * |       \---generation1.id
 * |       \---22.server
@@ -119,7 +124,9 @@
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024*1024;
  private static final long CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES = 1024 * 1024;
  private static final long REPLICA_DB_MAX_LOG_FILE_SIZE_IN_BYTES = 10 * CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES;
  private static final int NO_GENERATION_ID = -1;
@@ -129,9 +136,13 @@
  static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
  static final String LAST_ROTATION_TIME_FILE_PREFIX = "rotationtime";
  static final String LAST_ROTATION_TIME_FILE_SUFFIX = ".ms";
  private static final String DOMAIN_STATE_SEPARATOR = ":";
  private static final String DOMAIN_SUFFIX = ".domain";
  private static final String DOMAIN_SUFFIX = ".dom";
  private static final String SERVER_ID_SUFFIX = ".server";
@@ -170,6 +181,17 @@
    }
  };
  private static final FileFilter LAST_ROTATION_TIME_FILE_FILTER = new FileFilter()
  {
    @Override
    public boolean accept(File file)
    {
      return file.isFile()
          && file.getName().startsWith(LAST_ROTATION_TIME_FILE_PREFIX)
          && file.getName().endsWith(LAST_ROTATION_TIME_FILE_SUFFIX);
    }
  };
  /** Root path where the replication log is stored. */
  private final String replicationRootPath;
  /**
@@ -181,8 +203,16 @@
   */
  private final ChangelogState changelogState;
  /** The list of logs that are in use. */
  private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
  /** The list of logs that are in use for Replica DBs. */
  private final List<Log<CSN, UpdateMsg>> logsReplicaDB = new CopyOnWriteArrayList<Log<CSN, UpdateMsg>>();
  /**
   * The list of logs that are in use for the CN Index DB.
   * There is a single CN Index DB for a ReplicationServer, but there can be multiple references opened on it.
   * This is the responsability of Log class to handle properly these multiple references.
   */
  private List<Log<Long, ChangeNumberIndexRecord>> logsCNIndexDB =
      new CopyOnWriteArrayList<Log<Long, ChangeNumberIndexRecord>>();;
  /**
   * Maps each domain DN to a domain id that is used to name directory in file system.
@@ -205,6 +235,22 @@
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
  /** The time service used for timing. */
  private final TimeService timeService;
  /**
   * For CN Index DB, a log file can be rotated once it has exceeded a given time interval.
   * <p>
   * It is disabled if the interval is equals to zero.
   * The interval can be modified at any time.
   */
  private long cnIndexDBRotationInterval;
  /**
   * For CN Index DB, the last time a log file was rotated.
   * It is persisted to file each time it changes and read at server start. */
  private long cnIndexDBLastRotationTime;
  /**
   * Creates the replication environment.
   *
@@ -212,15 +258,34 @@
   *          Root path where replication log is stored.
   * @param replicationServer
   *          The underlying replication server.
   * @param timeService
   *          Time service to use for timing.
   * @throws ChangelogException
   *           If an error occurs during initialization.
   */
  ReplicationEnvironment(final String rootPath,
      final ReplicationServer replicationServer) throws ChangelogException
      final ReplicationServer replicationServer, final TimeService timeService) throws ChangelogException
  {
    this.replicationRootPath = rootPath;
    this.replicationServer = replicationServer;
    this.timeService = timeService;
    this.changelogState = readOnDiskChangelogState();
    this.cnIndexDBLastRotationTime = readOnDiskLastRotationTime();
  }
  /**
   * Sets the rotation time interval of a log file for the CN Index DB.
   *
   * @param timeInterval
   *          time interval for rotation of a log file.
   */
  void setCNIndexDBRotationInterval(long timeInterval)
  {
    cnIndexDBRotationInterval = timeInterval;
    for (Log<Long, ChangeNumberIndexRecord> log : logsCNIndexDB)
    {
      log.setRotationInterval(cnIndexDBRotationInterval);
    }
  }
  /**
@@ -257,6 +322,16 @@
  }
  /**
   * Return the last rotation time for CN Index DB log files.
   *
   * @return the last rotation time in millis
   */
  long getCnIndexDBLastRotationTime()
  {
    return cnIndexDBLastRotationTime;
  }
  /**
   * Finds or creates the log used to store changes from the replication server
   * with the given serverId and the given baseDN.
   *
@@ -299,7 +374,8 @@
        ensureGenerationIdFileExists(generationIdPath);
        changelogState.setDomainGenerationId(domainDN, generationId);
        return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
        return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER,
            new LogRotationParameters(REPLICA_DB_MAX_LOG_FILE_SIZE_IN_BYTES, 0, 0), logsReplicaDB);
      }
    }
    catch (Exception e)
@@ -325,7 +401,9 @@
    final File path = getCNIndexDBPath();
    try
    {
      return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER);
      final LogRotationParameters rotationParams = new LogRotationParameters(CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES,
          cnIndexDBRotationInterval, cnIndexDBLastRotationTime);
      return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER, rotationParams, logsCNIndexDB);
    }
    catch (Exception e)
    {
@@ -344,7 +422,8 @@
  {
    if (isShuttingDown.compareAndSet(false, true))
    {
      logs.clear();
      logsReplicaDB.clear();
      logsCNIndexDB.clear();
    }
  }
@@ -409,6 +488,25 @@
  }
  /**
   * Notify that log file has been rotated for provided log.
   *
   * The last rotation time is persisted to a file and read at startup time.
   *
   * @param log
   *          the log that has a file rotated.
   * @throws ChangelogException
   *            If a problem occurs
   */
  void notifyLogFileRotation(Log<?, ?> log) throws ChangelogException
  {
    // only CN Index DB log rotation time is persisted
    if (logsCNIndexDB.contains(log))
    {
      updateCNIndexDBLastRotationTime(timeService.now());
    }
  }
  /**
   * Notify that the replica corresponding to provided domain and provided CSN
   * is offline.
   *
@@ -654,16 +752,17 @@
  }
  /** Open a log from the provided path and record parser. */
  private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
      throws ChangelogException
  private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser,
      LogRotationParameters rotationParams, List<Log<K, V>> logsCache) throws ChangelogException
  {
    checkShutDownBeforeOpening(serverIdPath);
    final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES);
    final Log<K, V> log = Log.openLog(this, serverIdPath, parser, rotationParams);
    checkShutDownAfterOpening(serverIdPath, log);
    logs.add(log);
    logsCache.add(log);
    return log;
  }
@@ -718,6 +817,46 @@
    return (generationIds != null && generationIds.length > 0) ? generationIds[0] : null;
  }
  /**
   * Retrieve the last rotation time from the disk.
   *
   * @return the last rotation time in millis (which is the current time if no
   *         rotation file is found or if a problem occurs).
   */
  private long readOnDiskLastRotationTime()
  {
    try
    {
      final File file = retrieveLastRotationTimeFile();
      if (file != null)
      {
        final String filename = file.getName();
        final String value = filename.substring(LAST_ROTATION_TIME_FILE_PREFIX.length(),
            filename.length() - LAST_ROTATION_TIME_FILE_SUFFIX.length());
        return Long.valueOf(value);
      }
    }
    catch (Exception e)
    {
      logger.trace(LocalizableMessage.raw("Error when retrieving last log file rotation time from file"), e);
    }
    // Default to current time
    return timeService.now();
  }
  /**
   * Retrieve the file named after the last rotation time from the provided
   * directory.
   *
   * @return the last rotation time file or {@code null} if the corresponding file
   *         can't be found
   */
  private File retrieveLastRotationTimeFile()
  {
    File[] files = getCNIndexDBPath().listFiles(LAST_ROTATION_TIME_FILE_FILTER);
    return (files != null && files.length > 0) ? files[0] : null;
  }
  private File getDomainPath(final String domainId)
  {
    return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
@@ -748,9 +887,16 @@
    return new File(replicationRootPath, CN_INDEX_DB_DIRNAME);
  }
  private File getLastRotationTimePath(long lastRotationTime)
  {
    return new File(getCNIndexDBPath(),
        LAST_ROTATION_TIME_FILE_PREFIX + lastRotationTime + LAST_ROTATION_TIME_FILE_SUFFIX);
  }
  private void closeLog(final Log<?, ?> log)
  {
    logs.remove(log);
    logsReplicaDB.remove(log);
    logsCNIndexDB.remove(log);
    log.close();
  }
@@ -789,6 +935,30 @@
    }
  }
  private void updateCNIndexDBLastRotationTime(final long lastRotationTime) throws ChangelogException {
    final File previousRotationFile = retrieveLastRotationTimeFile();
    final File newRotationFile = getLastRotationTimePath(lastRotationTime);
    try
    {
      newRotationFile.createNewFile();
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LAST_LOG_ROTATION_TIME_FILE.get(
          newRotationFile.getPath(), lastRotationTime), e);
    }
    if (previousRotationFile != null)
    {
      final boolean isDeleted = previousRotationFile.delete();
      if (!isDeleted)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LAST_LOG_ROTATION_TIME_FILE.get(
            previousRotationFile.getPath()));
      }
    }
    cnIndexDBLastRotationTime = lastRotationTime;
  }
  private void ensureGenerationIdFileExists(final File generationIdPath)
      throws ChangelogException
  {
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -37,7 +37,6 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -137,23 +136,12 @@
    }
  }
  /**
   * This test makes basic operations of a ChangeNumberIndexDB:
   * <ol>
   * <li>create the db</li>
   * <li>add records</li>
   * <li>read them with a cursor</li>
   * <li>set a very short trim period</li>
   * <li>wait for the db to be trimmed / here since the changes are not stored
   * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
   * </ol>
   * This test verifies purge is working by relying on a very short purge delay.
   * The purge can be done only if there is at least one read-only log file, so
   * this test ensures the rotation happens, using the rotation based on time.
   */
  // TODO : this works only if we ensure that there is a rotation of ahead log file
  // at the right place. First two records are 37 and 76 bytes long,
  // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file
  // Re-enable this test when max file size is customizable for log
  @Test(enabled=false)
  @Test
  public void testPurge() throws Exception
  {
    ReplicationServer replicationServer = null;
@@ -163,49 +151,32 @@
      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
      changelogDB.setPurgeDelay(0); // disable purging
      // Prepare data to be stored in the db
      DN baseDN1 = DN.valueOf("o=test1");
      DN baseDN2 = DN.valueOf("o=test2");
      DN baseDN3 = DN.valueOf("o=test3");
      CSN[] csns = generateCSNs(1, 0, 3);
      // Add records
      DN[] baseDNs = { DN.valueOf("o=test1"), DN.valueOf("o=test2"), DN.valueOf("o=test3"), DN.valueOf("o=test4") };
      CSN[] csns = generateCSNs(1, 0, 4);
      final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
      long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
                 addRecord(cnIndexDB, baseDN2, csns[1]);
      long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
      long cn0 = addRecord(cnIndexDB, baseDNs[0], csns[0]);
                 addRecord(cnIndexDB, baseDNs[1], csns[1]);
      long cn2 = addRecord(cnIndexDB, baseDNs[2], csns[2]);
      // The ChangeNumber should not get purged
      final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
      assertEquals(oldestCN, cn1);
      assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
      // The CN DB should not be purged at this point
      assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn0);
      assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn2);
      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
      try
      {
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[0], baseDN1);
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[1], baseDN2);
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[2], baseDN3);
        assertFalse(cursor.next());
      }
      finally
      {
        StaticUtils.close(cursor);
      }
      // change the purge delay to a very short time
      changelogDB.setPurgeDelay(5);
      Thread.sleep(50);
      // add a new record to force the rotation of the log
      addRecord(cnIndexDB, baseDNs[3], csns[3]);
      // Now test that purging removes all changes but the last one
      changelogDB.setPurgeDelay(1);
      // Now all changes should have been purged but the last one
      int count = 0;
      while (cnIndexDB.count() > 1 && count < 100)
      {
        Thread.sleep(10);
        count++;
      }
      assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
      assertOnlyNewestRecordIsLeft(cnIndexDB, 4);
    }
    finally
    {
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -34,6 +34,7 @@
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.util.time.TimeService;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationServerCfg;
@@ -423,7 +424,7 @@
      replicationServer = configureReplicationServer(100000, 10);
      testRoot = createCleanDir();
      dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer);
      dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer, TimeService.SYSTEM);
      replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
      // Populate the db with 'max' msg
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -26,6 +26,7 @@
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
@@ -39,7 +40,9 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters;
import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser;
import org.opends.server.replication.server.changelog.file.Record.Mapper;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -52,6 +55,8 @@
  // Use a directory dedicated to this test class
  private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
  private static final long NO_TIME_BASED_LOG_ROTATION = 0;
  @BeforeMethod
  public void initialize() throws Exception
  {
@@ -79,9 +84,12 @@
    // This allow to ensure rotation mechanism is thoroughly tested
    // Some tests rely on having 2 records per log file (especially the purge tests), so take care
    // if this value has to be changed
    int sizeLimitPerFileInBytes = 30;
    final int sizeLimitPerFileInBytes = 30;
    final LogRotationParameters rotationParams = new LogRotationParameters(sizeLimitPerFileInBytes,
        NO_TIME_BASED_LOG_ROTATION, NO_TIME_BASED_LOG_ROTATION);
    final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class);
    return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes);
    return Log.openLog(replicationEnv, LOG_DIRECTORY, parser, rotationParams);
  }
  @Test
@@ -366,8 +374,11 @@
    Log<String, String> writeLog = null;
    try
    {
      long sizeOf1MB = 1024*1024;
      writeLog = Log.openLog(LOG_DIRECTORY, LogFileTest.RECORD_PARSER, sizeOf1MB);
      long sizeOf10MB = 10*1024*1024;
      final LogRotationParameters rotationParams = new LogRotationParameters(sizeOf10MB, NO_TIME_BASED_LOG_ROTATION,
          NO_TIME_BASED_LOG_ROTATION);
      final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class);
      writeLog = Log.openLog(replicationEnv, LOG_DIRECTORY, LogFileTest.RECORD_PARSER, rotationParams);
      for (int i = 1; i < 1000000; i++)
      {
@@ -514,12 +525,10 @@
  public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
      int cursorStartIndex, int cursorEndIndex) throws Exception
  {
    Log<String, String> log = null;
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try
    {
      log = openLog(LogFileTest.RECORD_PARSER);
      log.purgeUpTo(purgeKey);
      cursor = log.getCursor();
@@ -533,6 +542,51 @@
    }
  }
  static final Mapper<String, Integer> MAPPER = new Record.Mapper<String, Integer>()
      {
        @Override
        public Integer map(String value)
        {
          // extract numeric value, e.g. from "value10" return 10
          return Integer.valueOf(value.substring("value".length()));
        }
      };
  @DataProvider
  Object[][] findBoundaryKeyData()
  {
    return new Object[][] {
       // limit value, expected key
       { 0, null },
       { 1, "key001" },
       { 2, "key001" },
       { 3, "key003" },
       { 4, "key003" },
       { 5, "key005" },
       { 6, "key005" },
       { 7, "key007" },
       { 8, "key007" },
       { 9, "key009" },
       { 10, "key009" },
       { 11, "key009" },
       { 12, "key009" },
    };
  }
  @Test(dataProvider = "findBoundaryKeyData")
  public void testFindBoundaryKeyFromRecord(int limitValue, String expectedKey) throws Exception
  {
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    try
    {
      assertThat(log.findBoundaryKeyFromRecord(MAPPER, limitValue)).isEqualTo(expectedKey);
    }
    finally
    {
      StaticUtils.close(log);
    }
  }
  private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
      throws Exception
  {
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -31,12 +31,14 @@
import java.util.List;
import org.assertj.core.data.MapEntry;
import org.forgerock.util.time.TimeService;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
@@ -48,6 +50,7 @@
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
@SuppressWarnings("javadoc")
@@ -94,7 +97,7 @@
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      ReplicationEnvironment environment = createReplicationEnv(rootPath);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
@@ -122,7 +125,7 @@
    {
      File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      List<DN> domainDNs = Arrays.asList(DN.valueOf(DN1_AS_STRING), DN.valueOf(DN2_AS_STRING), DN.valueOf(DN3_AS_STRING));
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      ReplicationEnvironment environment = createReplicationEnv(rootPath);
      cnDB = environment.getOrCreateCNIndexDB();
      for (int i = 0; i <= 2 ; i++)
      {
@@ -163,7 +166,7 @@
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      ReplicationEnvironment environment = createReplicationEnv(rootPath);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
@@ -193,7 +196,7 @@
    {
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      ReplicationEnvironment environment = createReplicationEnv(rootPath);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
@@ -218,7 +221,7 @@
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      ReplicationEnvironment environment = createReplicationEnv(rootPath);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
@@ -249,7 +252,7 @@
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      ReplicationEnvironment environment = createReplicationEnv(rootPath);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
@@ -278,7 +281,7 @@
      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      final DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      ReplicationEnvironment environment = createReplicationEnv(rootPath);
      cnDB = environment.getOrCreateCNIndexDB();
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
@@ -309,13 +312,13 @@
    {
      File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
      DN domainDN = DN.valueOf(DN1_AS_STRING);
      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
      ReplicationEnvironment environment = createReplicationEnv(rootPath);
      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
      // delete the domain directory created for the 2 replica DBs to break the
      // consistency with domain state file
      StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
      StaticUtils.recursiveDelete(new File(rootPath, "1.dom"));
      environment.readOnDiskChangelogState();
    }
@@ -324,4 +327,50 @@
      StaticUtils.close(cnDB, replicaDB, replicaDB2);
    }
  }
  private ReplicationEnvironment createReplicationEnv(File rootPath) throws ChangelogException
  {
    ReplicationServer unusedReplicationServer = null;
    return new ReplicationEnvironment(rootPath.getAbsolutePath(), unusedReplicationServer, TimeService.SYSTEM);
  }
  @Test
  public void testLastRotationTimeRetrievalWithNoRotationFile() throws Exception
  {
    final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
    TimeService time = mock(TimeService.class);
    when(time.now()).thenReturn(100L);
    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null, time);
    assertThat(environment.getCnIndexDBLastRotationTime()).isEqualTo(100L);
  }
  @Test
  public void testLastRotationTimeRetrievalWithRotationFile() throws Exception
  {
    final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
    final TimeService time = mock(TimeService.class);
    when(time.now()).thenReturn(100L, 200L);
    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null, time);
    Log<Long,ChangeNumberIndexRecord> cnIndexDB = environment.getOrCreateCNIndexDB();
    try {
      environment.notifyLogFileRotation(cnIndexDB);
      // check runtime change of last rotation time is effective
      // this should also persist the time in a file, but this is checked later in the test
      assertThat(environment.getCnIndexDBLastRotationTime()).isEqualTo(200L);
    }
    finally
    {
      cnIndexDB.close();
      environment.shutdown();
    }
    // now check last rotation time is correctly read from persisted file when re-creating environment
    when(time.now()).thenReturn(0L);
    environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null, time);
    assertThat(environment.getCnIndexDBLastRotationTime()).isEqualTo(200L);
  }
}