From bef7c528fe33395b154145e033b717210ba63060 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 23 Dec 2014 11:39:41 +0000
Subject: [PATCH] OPENDJ-1690 CR-5724 Fix file-based changelog purging

---
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java |   67 +----
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java  |   67 +++++
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java                                              |   12 +
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java                              |  200 ++++++++++++++++-
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java                             |   20 +
 opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties                                                                            |    8 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java                                                 |  174 ++++++++++++++-
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java                                     |    9 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java           |    3 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java                     |   68 +++++
 10 files changed, 526 insertions(+), 102 deletions(-)

diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
index 97fa2f5..28a7c0d 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
@@ -626,5 +626,9 @@
  perform a search request on cn=changelog
 ERR_CHANGELOG_BACKEND_SEARCH_286 =An error occurred when \
  searching base DN '%s' with filter '%s' in changelog backend : %s
-ERR_CHANGELOG_BACKEND_ATTRIBUTE_287 =An error occurred when \
- retrieving attribute value for attribute '%s' for entry DN '%s' in changelog backend : %s
\ No newline at end of file
+ERR_CHANGELOG_BACKEND_ATTRIBUTE_287=An error occurred when \
+ retrieving attribute value for attribute '%s' for entry DN '%s' in changelog backend : %s
+ERR_CHANGELOG_UNABLE_TO_CREATE_LAST_LOG_ROTATION_TIME_FILE_288=Could not create \
+ file '%s' to store last log rotation time %d
+ERR_CHANGELOG_UNABLE_TO_DELETE_LAST_LOG_ROTATION_TIME_FILE_289=Could not delete \
+ file '%s' that stored the previous last log rotation time
\ No newline at end of file
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
index 84b1d98..aa5543b 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -59,6 +59,16 @@
   /** The parser of records stored in this ChangeNumberIndexDB. */
   static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
 
+  static final Record.Mapper<ChangeNumberIndexRecord, CSN> MAPPER_TO_CSN =
+      new Record.Mapper<ChangeNumberIndexRecord, CSN>()
+      {
+        @Override
+        public CSN map(ChangeNumberIndexRecord value)
+        {
+          return value.getCSN();
+        }
+      };
+
   /** The log in which records are persisted. */
   private final Log<Long, ChangeNumberIndexRecord> log;
 
@@ -231,8 +241,14 @@
     {
       return null;
     }
-    final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
-    return record != null ? record.getValue().getCSN() : null;
+    // Retrieve the oldest change number that must not be purged
+    final Long purgeChangeNumber = log.findBoundaryKeyFromRecord(MAPPER_TO_CSN, purgeCSN);
+    if (purgeChangeNumber != null)
+    {
+      final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeChangeNumber);
+      return record != null ? record.getValue().getCSN() : null;
+    }
+    return null;
   }
 
   /**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 9544a75..41970d4 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -41,6 +41,7 @@
 import org.forgerock.i18n.LocalizableMessageBuilder;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.util.time.TimeService;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.backends.ChangelogBackend;
@@ -296,7 +297,7 @@
     try
     {
       final File dbDir = getFileForPath(config.getReplicationDBDirectory());
-      replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer);
+      replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer, TimeService.SYSTEM);
       final ChangelogState changelogState = replicationEnv.getChangelogState();
       initializeToChangelogState(changelogState);
       if (config.isComputeChangeNumber())
@@ -574,6 +575,12 @@
   public void setPurgeDelay(final long purgeDelayInMillis)
   {
     this.purgeDelayInMillis = purgeDelayInMillis;
+
+    // Rotation time interval for CN Index DB log file
+    // needs to be a fraction of the purge delay
+    // to ensure there is at least one file to purge
+    replicationEnv.setCNIndexDBRotationInterval(purgeDelayInMillis / 2);
+
     if (purgeDelayInMillis > 0)
     {
       final ChangelogDBPurger newPurger = new ChangelogDBPurger();
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 3ffc90f..693cab2 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -51,6 +51,7 @@
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.forgerock.util.Reject;
 import org.forgerock.util.Utils;
+import org.forgerock.util.time.TimeService;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -74,9 +75,9 @@
  * the string representation of lowest and highest key present in the log file.</li>
  * </ul>
  * A read-only log file is created each time the head log file has reached the
- * maximum size limit. The head log file is then rotated to the read-only file and a
- * new empty head log file is opened. There is no limit on the number of read-only
- * files, but they can be purged.
+ * maximum size limit or the time limit. The head log file is then rotated to the
+ * read-only file and a new empty head log file is opened. There is no limit on the
+ * number of read-only files, but they can be purged.
  * <p>
  * A log is obtained using the {@code Log.openLog()} method and must always be
  * released using the {@code close()} method.
@@ -170,14 +171,23 @@
   private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
 
   /**
-   * A log file is rotated once it has exceeded this size limit. The log file can have
+   * A log file can be rotated once it has exceeded this size limit. The log file can have
    * a size much larger than this limit if the last record written has a huge size.
    *
-   * TODO : to be replaced later by a (or a list of) configurable Rotation policy
+   * TODO : to be replaced later by a list of configurable Rotation policy
    * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
    */
   private final long sizeLimitPerLogFileInBytes;
 
+  /** The time service used for timing. It is package private so it can be modified by test case. */
+  TimeService timeService = TimeService.SYSTEM;
+
+  /** A log file can be rotated once it has exceeded a given time interval. No rotation happens if equals to zero. */
+  private long rotationIntervalInMillis;
+
+  /** The last time a log file was rotated. */
+  private long lastRotationTime;
+
   /**
    * The exclusive lock used for writes and lifecycle operations on this log:
    * initialize, clear, sync and close.
@@ -190,6 +200,12 @@
   private final Lock sharedLock;
 
   /**
+   * The replication environment used to create this log. The log is notifying it for any change
+   * that must be persisted.
+   */
+  private final ReplicationEnvironment replicationEnv;
+
+  /**
    * Open a log with the provided log path, record parser and maximum size per
    * log file.
    * <p>
@@ -199,25 +215,28 @@
    *          Type of the key of a record, which must be comparable.
    * @param <V>
    *          Type of the value of a record.
+   * @param replicationEnv
+   *          The replication environment used to create this log.
    * @param logPath
    *          Path of the log.
    * @param parser
    *          Parser for encoding/decoding of records.
-   * @param sizeLimitPerFileInBytes
-   *          Limit in bytes before rotating the head log file of the log.
+   * @param rotationParameters
+   *          Parameters for the log files rotation.
    * @return a log
    * @throws ChangelogException
    *           If a problem occurs during initialization.
    */
-  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
-      final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
+  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final ReplicationEnvironment replicationEnv,
+      final File logPath, final RecordParser<K, V> parser, final LogRotationParameters rotationParameters)
+      throws ChangelogException
   {
     Reject.ifNull(logPath, parser);
     @SuppressWarnings("unchecked")
     Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
     if (log == null)
     {
-      log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
+      log = new Log<K, V>(replicationEnv, logPath, parser, rotationParameters);
       logsCache.put(logPath, log);
     }
     else
@@ -229,6 +248,43 @@
     return log;
   }
 
+  /** Holds the parameters for log files rotation. */
+  static class LogRotationParameters {
+
+    private final long sizeLimitPerFileInBytes;
+    private final long rotationInterval;
+    private final long lastRotationTime;
+
+    /**
+     * Creates rotation parameters.
+     *
+     * @param sizeLimitPerFileInBytes
+     *           Size limit before rotating a log file.
+     * @param rotationInterval
+     *           Time interval before rotating a log file.
+     * @param lastRotationTime
+     *           Last time a log file was rotated.
+     */
+    LogRotationParameters(long sizeLimitPerFileInBytes, long rotationInterval, long lastRotationTime)
+    {
+      this.sizeLimitPerFileInBytes = sizeLimitPerFileInBytes;
+      this.rotationInterval = rotationInterval;
+      this.lastRotationTime = lastRotationTime;
+    }
+
+  }
+
+  /**
+   * Set the time interval for rotation of log file.
+   *
+   * @param rotationIntervalInMillis
+   *           time interval before rotation of log file
+   */
+  void setRotationInterval(long rotationIntervalInMillis)
+  {
+    this.rotationIntervalInMillis = rotationIntervalInMillis;
+  }
+
   /**
    * Release a reference to the log corresponding to provided path. The log is
    * closed if this is the last reference.
@@ -256,21 +312,27 @@
   /**
    * Creates a new log.
    *
+   * @param replicationEnv
+   *            The replication environment used to create this log.
    * @param logPath
    *            The directory path of the log.
    * @param parser
    *          Parser of records.
-   * @param sizeLimitPerFile
-   *            Limit in bytes before rotating a log file.
+   * @param rotationParams
+   *          Parameters for log-file rotation.
+   *
    * @throws ChangelogException
    *            If a problem occurs during initialization.
    */
-  private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
-      throws ChangelogException
+  private Log(final ReplicationEnvironment replicationEnv, final File logPath, final RecordParser<K, V> parser,
+      final LogRotationParameters rotationParams) throws ChangelogException
   {
+    this.replicationEnv = replicationEnv;
     this.logPath = logPath;
     this.recordParser = parser;
-    this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
+    this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
+    this.rotationIntervalInMillis = rotationParams.rotationInterval;
+    this.lastRotationTime = rotationParams.lastRotationTime;
     this.referenceCount = 1;
 
     final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -376,9 +438,9 @@
         return;
       }
       LogFile<K, V> headLogFile = getHeadLogFile();
-      if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+      if (mustRotate(headLogFile))
       {
-        logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+        logger.info(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
 
         rotateHeadLogFile();
         headLogFile = getHeadLogFile();
@@ -392,6 +454,27 @@
     }
   }
 
+  private boolean mustRotate(LogFile<K, V> headLogFile)
+  {
+    if (lastAppendedKey == null)
+    {
+      // never rotate an empty file
+      return false;
+    }
+    if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+    {
+      // rotate because file size exceeded threshold
+      return true;
+    }
+    if (rotationIntervalInMillis > 0)
+    {
+      // rotate if time limit is reached
+      final long timeElapsed = timeService.since(lastRotationTime);
+      return timeElapsed > rotationIntervalInMillis;
+    }
+    return false;
+  }
+
   /**
    * Indicates if the provided record has a key that would break the key
    * ordering in the log.
@@ -711,6 +794,60 @@
     releaseLog(logPath);
   }
 
+  /**
+   * Find the highest key that corresponds to a record that is the oldest (or
+   * first) of a read-only log file and where value mapped from the record is
+   * lower or equals to provided limit value.
+   * <p>
+   * Example<br>
+   * Given a log with 3 log files, with Record<Int, String> and Mapper<String,
+   * Long> mapping a string to its long value
+   * <ul>
+   * <li>1_10.log where oldest record is (key=1, value="50")</li>
+   * <li>11_20.log where oldest record is (key=11, value="150")</li>
+   * <li>head.log where oldest record is (key=25, value="250")</li>
+   * </ul>
+   * Then
+   * <ul>
+   * <li>findBoundaryKeyFromRecord(mapper, 20) => null</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 50) => 1</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 100) => 1</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 150) => 11</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 200) => 11</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 250) => 25</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 300) => 25</li>
+   * </ul>
+   *
+   * @param <V2>
+   *          Type of the value extracted from the record
+   * @param mapper
+   *          The mapper to extract a value from a record. It is expected that
+   *          extracted values are ordered according to an order consistent with
+   *          this log ordering, i.e. for two records, if key(R1) > key(R2) then
+   *          extractedValue(R1) > extractedValue(R2).
+   * @param limitValue
+   *          The limit value to search for
+   * @return the key or {@code null} if no key corresponds
+   * @throws ChangelogException
+   *           If a problem occurs
+   */
+  <V2 extends Comparable<V2>> K findBoundaryKeyFromRecord(Record.Mapper<V, V2> mapper, V2 limitValue)
+      throws ChangelogException
+  {
+    K key = null;
+    for (LogFile<K, V> logFile : logFiles.values())
+    {
+      final Record<K, V> record = logFile.getOldestRecord();
+      final V2 oldestValue = mapper.map(record.getValue());
+      if (oldestValue.compareTo(limitValue) > 0)
+      {
+        return key;
+      }
+      key = record.getKey();
+    }
+    return key;
+  }
+
   /** Effectively close this log. */
   private void doClose()
   {
@@ -766,6 +903,9 @@
 
     // Re-enable cursors previously opened on head, with the saved state
     updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
+
+    // Notify even if time-based rotation is not enabled, as it could be enabled at any time
+    replicationEnv.notifyLogFileRotation(this);
   }
 
   private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
index 24b25d2..a53c0e0 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
@@ -35,6 +35,18 @@
  */
 class Record<K, V>
 {
+  /** Map the record value to another value. */
+  static interface Mapper<V, V2> {
+      /**
+       * Map a record value to another value.
+       *
+       * @param value
+       *          The value to map
+       * @return the new value
+       */
+      V2 map(V value);
+  }
+
   private final K key;
   private final V value;
 
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index 624cda1..066893e 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -46,13 +46,16 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.util.time.TimeService;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.util.StaticUtils;
@@ -67,7 +70,8 @@
  * created :
  * <ul>
  * <li>A "changenumberindex" directory containing the log files for
- * ChangeNumberIndexDB</li>
+ * ChangeNumberIndexDB, and a file named "rotationtime[millis].last" where [millis] is
+ * the time of the last log file rotation in milliseconds</li>
  * <li>A "domains.state" file containing a mapping of each domain DN to an id. The
  * id is used to name the corresponding domain directory.</li>
  * <li>One directory per domain, named after "[id].domain" where [id] is the id
@@ -76,7 +80,7 @@
  * <p>
  * Each domain directory contains the following directories and files :
  * <ul>
- * <li>A "generation_[id].id" file, where [id] is the generation id</li>
+ * <li>A "generation[id].id" file, where [id] is the generation id</li>
  * <li>One directory per server id, named after "[id].server" where [id] is the
  * id of the server.</li>
  * </ul>
@@ -100,6 +104,7 @@
  * |   \---changenumberindex
  * |      \--- head.log [contains last records written]
  * |      \--- 1_50.log [contains records with keys in interval [1, 50]]
+ * |      \--- rtime198745512.last
  * |   \---1.domain
  * |       \---generation1.id
  * |       \---22.server
@@ -119,7 +124,9 @@
 {
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
-  private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024*1024;
+  private static final long CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES = 1024 * 1024;
+
+  private static final long REPLICA_DB_MAX_LOG_FILE_SIZE_IN_BYTES = 10 * CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES;
 
   private static final int NO_GENERATION_ID = -1;
 
@@ -129,9 +136,13 @@
 
   static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
 
+  static final String LAST_ROTATION_TIME_FILE_PREFIX = "rotationtime";
+
+  static final String LAST_ROTATION_TIME_FILE_SUFFIX = ".ms";
+
   private static final String DOMAIN_STATE_SEPARATOR = ":";
 
-  private static final String DOMAIN_SUFFIX = ".domain";
+  private static final String DOMAIN_SUFFIX = ".dom";
 
   private static final String SERVER_ID_SUFFIX = ".server";
 
@@ -170,6 +181,17 @@
     }
   };
 
+  private static final FileFilter LAST_ROTATION_TIME_FILE_FILTER = new FileFilter()
+  {
+    @Override
+    public boolean accept(File file)
+    {
+      return file.isFile()
+          && file.getName().startsWith(LAST_ROTATION_TIME_FILE_PREFIX)
+          && file.getName().endsWith(LAST_ROTATION_TIME_FILE_SUFFIX);
+    }
+  };
+
   /** Root path where the replication log is stored. */
   private final String replicationRootPath;
   /**
@@ -181,8 +203,16 @@
    */
   private final ChangelogState changelogState;
 
-  /** The list of logs that are in use. */
-  private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
+  /** The list of logs that are in use for Replica DBs. */
+  private final List<Log<CSN, UpdateMsg>> logsReplicaDB = new CopyOnWriteArrayList<Log<CSN, UpdateMsg>>();
+
+  /**
+   * The list of logs that are in use for the CN Index DB.
+   * There is a single CN Index DB for a ReplicationServer, but there can be multiple references opened on it.
+   * This is the responsability of Log class to handle properly these multiple references.
+   */
+  private List<Log<Long, ChangeNumberIndexRecord>> logsCNIndexDB =
+      new CopyOnWriteArrayList<Log<Long, ChangeNumberIndexRecord>>();;
 
   /**
    * Maps each domain DN to a domain id that is used to name directory in file system.
@@ -205,6 +235,22 @@
 
   private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
 
+  /** The time service used for timing. */
+  private final TimeService timeService;
+
+  /**
+   * For CN Index DB, a log file can be rotated once it has exceeded a given time interval.
+   * <p>
+   * It is disabled if the interval is equals to zero.
+   * The interval can be modified at any time.
+   */
+  private long cnIndexDBRotationInterval;
+
+  /**
+   * For CN Index DB, the last time a log file was rotated.
+   * It is persisted to file each time it changes and read at server start. */
+  private long cnIndexDBLastRotationTime;
+
   /**
    * Creates the replication environment.
    *
@@ -212,15 +258,34 @@
    *          Root path where replication log is stored.
    * @param replicationServer
    *          The underlying replication server.
+   * @param timeService
+   *          Time service to use for timing.
    * @throws ChangelogException
    *           If an error occurs during initialization.
    */
   ReplicationEnvironment(final String rootPath,
-      final ReplicationServer replicationServer) throws ChangelogException
+      final ReplicationServer replicationServer, final TimeService timeService) throws ChangelogException
   {
     this.replicationRootPath = rootPath;
     this.replicationServer = replicationServer;
+    this.timeService = timeService;
     this.changelogState = readOnDiskChangelogState();
+    this.cnIndexDBLastRotationTime = readOnDiskLastRotationTime();
+  }
+
+  /**
+   * Sets the rotation time interval of a log file for the CN Index DB.
+   *
+   * @param timeInterval
+   *          time interval for rotation of a log file.
+   */
+  void setCNIndexDBRotationInterval(long timeInterval)
+  {
+    cnIndexDBRotationInterval = timeInterval;
+    for (Log<Long, ChangeNumberIndexRecord> log : logsCNIndexDB)
+    {
+      log.setRotationInterval(cnIndexDBRotationInterval);
+    }
   }
 
   /**
@@ -257,6 +322,16 @@
   }
 
   /**
+   * Return the last rotation time for CN Index DB log files.
+   *
+   * @return the last rotation time in millis
+   */
+  long getCnIndexDBLastRotationTime()
+  {
+    return cnIndexDBLastRotationTime;
+  }
+
+  /**
    * Finds or creates the log used to store changes from the replication server
    * with the given serverId and the given baseDN.
    *
@@ -299,7 +374,8 @@
         ensureGenerationIdFileExists(generationIdPath);
         changelogState.setDomainGenerationId(domainDN, generationId);
 
-        return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
+        return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER,
+            new LogRotationParameters(REPLICA_DB_MAX_LOG_FILE_SIZE_IN_BYTES, 0, 0), logsReplicaDB);
       }
     }
     catch (Exception e)
@@ -325,7 +401,9 @@
     final File path = getCNIndexDBPath();
     try
     {
-      return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER);
+      final LogRotationParameters rotationParams = new LogRotationParameters(CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES,
+          cnIndexDBRotationInterval, cnIndexDBLastRotationTime);
+      return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER, rotationParams, logsCNIndexDB);
     }
     catch (Exception e)
     {
@@ -344,7 +422,8 @@
   {
     if (isShuttingDown.compareAndSet(false, true))
     {
-      logs.clear();
+      logsReplicaDB.clear();
+      logsCNIndexDB.clear();
     }
   }
 
@@ -409,6 +488,25 @@
   }
 
   /**
+   * Notify that log file has been rotated for provided log.
+   *
+   * The last rotation time is persisted to a file and read at startup time.
+   *
+   * @param log
+   *          the log that has a file rotated.
+   * @throws ChangelogException
+   *            If a problem occurs
+   */
+  void notifyLogFileRotation(Log<?, ?> log) throws ChangelogException
+  {
+    // only CN Index DB log rotation time is persisted
+    if (logsCNIndexDB.contains(log))
+    {
+      updateCNIndexDBLastRotationTime(timeService.now());
+    }
+  }
+
+  /**
    * Notify that the replica corresponding to provided domain and provided CSN
    * is offline.
    *
@@ -654,16 +752,17 @@
   }
 
   /** Open a log from the provided path and record parser. */
-  private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
-      throws ChangelogException
+  private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser,
+      LogRotationParameters rotationParams, List<Log<K, V>> logsCache) throws ChangelogException
   {
     checkShutDownBeforeOpening(serverIdPath);
 
-    final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES);
+    final Log<K, V> log = Log.openLog(this, serverIdPath, parser, rotationParams);
 
     checkShutDownAfterOpening(serverIdPath, log);
 
-    logs.add(log);
+    logsCache.add(log);
+
     return log;
   }
 
@@ -718,6 +817,46 @@
     return (generationIds != null && generationIds.length > 0) ? generationIds[0] : null;
   }
 
+  /**
+   * Retrieve the last rotation time from the disk.
+   *
+   * @return the last rotation time in millis (which is the current time if no
+   *         rotation file is found or if a problem occurs).
+   */
+  private long readOnDiskLastRotationTime()
+  {
+    try
+    {
+      final File file = retrieveLastRotationTimeFile();
+      if (file != null)
+      {
+        final String filename = file.getName();
+        final String value = filename.substring(LAST_ROTATION_TIME_FILE_PREFIX.length(),
+            filename.length() - LAST_ROTATION_TIME_FILE_SUFFIX.length());
+        return Long.valueOf(value);
+      }
+    }
+    catch (Exception e)
+    {
+      logger.trace(LocalizableMessage.raw("Error when retrieving last log file rotation time from file"), e);
+    }
+    // Default to current time
+    return timeService.now();
+  }
+
+  /**
+   * Retrieve the file named after the last rotation time from the provided
+   * directory.
+   *
+   * @return the last rotation time file or {@code null} if the corresponding file
+   *         can't be found
+   */
+  private File retrieveLastRotationTimeFile()
+  {
+    File[] files = getCNIndexDBPath().listFiles(LAST_ROTATION_TIME_FILE_FILTER);
+    return (files != null && files.length > 0) ? files[0] : null;
+  }
+
   private File getDomainPath(final String domainId)
   {
     return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
@@ -748,9 +887,16 @@
     return new File(replicationRootPath, CN_INDEX_DB_DIRNAME);
   }
 
+  private File getLastRotationTimePath(long lastRotationTime)
+  {
+    return new File(getCNIndexDBPath(),
+        LAST_ROTATION_TIME_FILE_PREFIX + lastRotationTime + LAST_ROTATION_TIME_FILE_SUFFIX);
+  }
+
   private void closeLog(final Log<?, ?> log)
   {
-    logs.remove(log);
+    logsReplicaDB.remove(log);
+    logsCNIndexDB.remove(log);
     log.close();
   }
 
@@ -789,6 +935,30 @@
     }
   }
 
+  private void updateCNIndexDBLastRotationTime(final long lastRotationTime) throws ChangelogException {
+    final File previousRotationFile = retrieveLastRotationTimeFile();
+    final File newRotationFile = getLastRotationTimePath(lastRotationTime);
+    try
+    {
+      newRotationFile.createNewFile();
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LAST_LOG_ROTATION_TIME_FILE.get(
+          newRotationFile.getPath(), lastRotationTime), e);
+    }
+    if (previousRotationFile != null)
+    {
+      final boolean isDeleted = previousRotationFile.delete();
+      if (!isDeleted)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LAST_LOG_ROTATION_TIME_FILE.get(
+            previousRotationFile.getPath()));
+      }
+    }
+    cnIndexDBLastRotationTime = lastRotationTime;
+  }
+
   private void ensureGenerationIdFileExists(final File generationIdPath)
       throws ChangelogException
   {
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
index 948a221..2eabd59 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -37,7 +37,6 @@
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.types.DN;
-import org.opends.server.util.StaticUtils;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -137,23 +136,12 @@
     }
   }
 
-
   /**
-   * This test makes basic operations of a ChangeNumberIndexDB:
-   * <ol>
-   * <li>create the db</li>
-   * <li>add records</li>
-   * <li>read them with a cursor</li>
-   * <li>set a very short trim period</li>
-   * <li>wait for the db to be trimmed / here since the changes are not stored
-   * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
-   * </ol>
+   * This test verifies purge is working by relying on a very short purge delay.
+   * The purge can be done only if there is at least one read-only log file, so
+   * this test ensures the rotation happens, using the rotation based on time.
    */
-  // TODO : this works only if we ensure that there is a rotation of ahead log file
-  // at the right place. First two records are 37 and 76 bytes long,
-  // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file
-  // Re-enable this test when max file size is customizable for log
-  @Test(enabled=false)
+  @Test
   public void testPurge() throws Exception
   {
     ReplicationServer replicationServer = null;
@@ -163,49 +151,32 @@
       final ChangelogDB changelogDB = replicationServer.getChangelogDB();
       changelogDB.setPurgeDelay(0); // disable purging
 
-      // Prepare data to be stored in the db
-      DN baseDN1 = DN.valueOf("o=test1");
-      DN baseDN2 = DN.valueOf("o=test2");
-      DN baseDN3 = DN.valueOf("o=test3");
-
-      CSN[] csns = generateCSNs(1, 0, 3);
-
       // Add records
+      DN[] baseDNs = { DN.valueOf("o=test1"), DN.valueOf("o=test2"), DN.valueOf("o=test3"), DN.valueOf("o=test4") };
+      CSN[] csns = generateCSNs(1, 0, 4);
       final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
-      long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
-                 addRecord(cnIndexDB, baseDN2, csns[1]);
-      long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
+      long cn0 = addRecord(cnIndexDB, baseDNs[0], csns[0]);
+                 addRecord(cnIndexDB, baseDNs[1], csns[1]);
+      long cn2 = addRecord(cnIndexDB, baseDNs[2], csns[2]);
 
-      // The ChangeNumber should not get purged
-      final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
-      assertEquals(oldestCN, cn1);
-      assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
+      // The CN DB should not be purged at this point
+      assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn0);
+      assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn2);
 
-      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
-      try
-      {
-        assertTrue(cursor.next());
-        assertEqualTo(cursor.getRecord(), csns[0], baseDN1);
-        assertTrue(cursor.next());
-        assertEqualTo(cursor.getRecord(), csns[1], baseDN2);
-        assertTrue(cursor.next());
-        assertEqualTo(cursor.getRecord(), csns[2], baseDN3);
-        assertFalse(cursor.next());
-      }
-      finally
-      {
-        StaticUtils.close(cursor);
-      }
+      // change the purge delay to a very short time
+      changelogDB.setPurgeDelay(5);
+      Thread.sleep(50);
+      // add a new record to force the rotation of the log
+      addRecord(cnIndexDB, baseDNs[3], csns[3]);
 
-      // Now test that purging removes all changes but the last one
-      changelogDB.setPurgeDelay(1);
+      // Now all changes should have been purged but the last one
       int count = 0;
       while (cnIndexDB.count() > 1 && count < 100)
       {
         Thread.sleep(10);
         count++;
       }
-      assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
+      assertOnlyNewestRecordIsLeft(cnIndexDB, 4);
     }
     finally
     {
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
index 96640ca..15f24e6 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -34,6 +34,7 @@
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.util.time.TimeService;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
@@ -423,7 +424,7 @@
       replicationServer = configureReplicationServer(100000, 10);
 
       testRoot = createCleanDir();
-      dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer);
+      dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer, TimeService.SYSTEM);
       replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
 
       // Populate the db with 'max' msg
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
index b810314..b3dab49 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -26,6 +26,7 @@
 package org.opends.server.replication.server.changelog.file;
 
 import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
 import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
 import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
 import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
@@ -39,7 +40,9 @@
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
 import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters;
 import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser;
+import org.opends.server.replication.server.changelog.file.Record.Mapper;
 import org.opends.server.util.StaticUtils;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -52,6 +55,8 @@
   // Use a directory dedicated to this test class
   private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
 
+  private static final long NO_TIME_BASED_LOG_ROTATION = 0;
+
   @BeforeMethod
   public void initialize() throws Exception
   {
@@ -79,9 +84,12 @@
     // This allow to ensure rotation mechanism is thoroughly tested
     // Some tests rely on having 2 records per log file (especially the purge tests), so take care
     // if this value has to be changed
-    int sizeLimitPerFileInBytes = 30;
+    final int sizeLimitPerFileInBytes = 30;
+    final LogRotationParameters rotationParams = new LogRotationParameters(sizeLimitPerFileInBytes,
+        NO_TIME_BASED_LOG_ROTATION, NO_TIME_BASED_LOG_ROTATION);
+    final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class);
 
-    return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes);
+    return Log.openLog(replicationEnv, LOG_DIRECTORY, parser, rotationParams);
   }
 
   @Test
@@ -366,8 +374,11 @@
     Log<String, String> writeLog = null;
     try
     {
-      long sizeOf1MB = 1024*1024;
-      writeLog = Log.openLog(LOG_DIRECTORY, LogFileTest.RECORD_PARSER, sizeOf1MB);
+      long sizeOf10MB = 10*1024*1024;
+      final LogRotationParameters rotationParams = new LogRotationParameters(sizeOf10MB, NO_TIME_BASED_LOG_ROTATION,
+          NO_TIME_BASED_LOG_ROTATION);
+      final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class);
+      writeLog = Log.openLog(replicationEnv, LOG_DIRECTORY, LogFileTest.RECORD_PARSER, rotationParams);
 
       for (int i = 1; i < 1000000; i++)
       {
@@ -514,12 +525,10 @@
   public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
       int cursorStartIndex, int cursorEndIndex) throws Exception
   {
-    Log<String, String> log = null;
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
     DBCursor<Record<String, String>> cursor = null;
     try
     {
-      log = openLog(LogFileTest.RECORD_PARSER);
-
       log.purgeUpTo(purgeKey);
 
       cursor = log.getCursor();
@@ -533,6 +542,51 @@
     }
   }
 
+  static final Mapper<String, Integer> MAPPER = new Record.Mapper<String, Integer>()
+      {
+        @Override
+        public Integer map(String value)
+        {
+          // extract numeric value, e.g. from "value10" return 10
+          return Integer.valueOf(value.substring("value".length()));
+        }
+      };
+
+  @DataProvider
+  Object[][] findBoundaryKeyData()
+  {
+    return new Object[][] {
+       // limit value, expected key
+       { 0, null },
+       { 1, "key001" },
+       { 2, "key001" },
+       { 3, "key003" },
+       { 4, "key003" },
+       { 5, "key005" },
+       { 6, "key005" },
+       { 7, "key007" },
+       { 8, "key007" },
+       { 9, "key009" },
+       { 10, "key009" },
+       { 11, "key009" },
+       { 12, "key009" },
+    };
+  }
+
+  @Test(dataProvider = "findBoundaryKeyData")
+  public void testFindBoundaryKeyFromRecord(int limitValue, String expectedKey) throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    try
+    {
+      assertThat(log.findBoundaryKeyFromRecord(MAPPER, limitValue)).isEqualTo(expectedKey);
+    }
+    finally
+    {
+      StaticUtils.close(log);
+    }
+  }
+
   private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
       throws Exception
   {
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
index 3eeca60..57025f7 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -31,12 +31,14 @@
 import java.util.List;
 
 import org.assertj.core.data.MapEntry;
+import org.forgerock.util.time.TimeService;
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.CSNGenerator;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.types.DN;
@@ -48,6 +50,7 @@
 import org.testng.annotations.Test;
 
 import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
 import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
 
 @SuppressWarnings("javadoc")
@@ -94,7 +97,7 @@
     {
       final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
       final DN domainDN = DN.valueOf(DN1_AS_STRING);
-      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      ReplicationEnvironment environment = createReplicationEnv(rootPath);
       cnDB = environment.getOrCreateCNIndexDB();
       replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
       replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
@@ -122,7 +125,7 @@
     {
       File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
       List<DN> domainDNs = Arrays.asList(DN.valueOf(DN1_AS_STRING), DN.valueOf(DN2_AS_STRING), DN.valueOf(DN3_AS_STRING));
-      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      ReplicationEnvironment environment = createReplicationEnv(rootPath);
       cnDB = environment.getOrCreateCNIndexDB();
       for (int i = 0; i <= 2 ; i++)
       {
@@ -163,7 +166,7 @@
     {
       final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
       final DN domainDN = DN.valueOf(DN1_AS_STRING);
-      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      ReplicationEnvironment environment = createReplicationEnv(rootPath);
       cnDB = environment.getOrCreateCNIndexDB();
       replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
 
@@ -193,7 +196,7 @@
     {
       final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
       final DN domainDN = DN.valueOf(DN1_AS_STRING);
-      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      ReplicationEnvironment environment = createReplicationEnv(rootPath);
       cnDB = environment.getOrCreateCNIndexDB();
       replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
 
@@ -218,7 +221,7 @@
       final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
       final DN domainDN = DN.valueOf(DN1_AS_STRING);
 
-      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      ReplicationEnvironment environment = createReplicationEnv(rootPath);
       cnDB = environment.getOrCreateCNIndexDB();
       replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
 
@@ -249,7 +252,7 @@
       final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
       final DN domainDN = DN.valueOf(DN1_AS_STRING);
 
-      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      ReplicationEnvironment environment = createReplicationEnv(rootPath);
       cnDB = environment.getOrCreateCNIndexDB();
       replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
 
@@ -278,7 +281,7 @@
       final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
       final DN domainDN = DN.valueOf(DN1_AS_STRING);
 
-      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      ReplicationEnvironment environment = createReplicationEnv(rootPath);
       cnDB = environment.getOrCreateCNIndexDB();
       replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
       CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
@@ -309,13 +312,13 @@
     {
       File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
       DN domainDN = DN.valueOf(DN1_AS_STRING);
-      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      ReplicationEnvironment environment = createReplicationEnv(rootPath);
       replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
       replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
 
       // delete the domain directory created for the 2 replica DBs to break the
       // consistency with domain state file
-      StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
+      StaticUtils.recursiveDelete(new File(rootPath, "1.dom"));
 
       environment.readOnDiskChangelogState();
     }
@@ -324,4 +327,50 @@
       StaticUtils.close(cnDB, replicaDB, replicaDB2);
     }
   }
+
+  private ReplicationEnvironment createReplicationEnv(File rootPath) throws ChangelogException
+  {
+    ReplicationServer unusedReplicationServer = null;
+    return new ReplicationEnvironment(rootPath.getAbsolutePath(), unusedReplicationServer, TimeService.SYSTEM);
+  }
+
+  @Test
+  public void testLastRotationTimeRetrievalWithNoRotationFile() throws Exception
+  {
+    final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+    TimeService time = mock(TimeService.class);
+    when(time.now()).thenReturn(100L);
+    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null, time);
+
+    assertThat(environment.getCnIndexDBLastRotationTime()).isEqualTo(100L);
+  }
+
+  @Test
+  public void testLastRotationTimeRetrievalWithRotationFile() throws Exception
+  {
+    final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+    final TimeService time = mock(TimeService.class);
+    when(time.now()).thenReturn(100L, 200L);
+    ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null, time);
+    Log<Long,ChangeNumberIndexRecord> cnIndexDB = environment.getOrCreateCNIndexDB();
+
+    try {
+      environment.notifyLogFileRotation(cnIndexDB);
+
+      // check runtime change of last rotation time is effective
+      // this should also persist the time in a file, but this is checked later in the test
+      assertThat(environment.getCnIndexDBLastRotationTime()).isEqualTo(200L);
+    }
+    finally
+    {
+      cnIndexDB.close();
+      environment.shutdown();
+    }
+
+    // now check last rotation time is correctly read from persisted file when re-creating environment
+    when(time.now()).thenReturn(0L);
+    environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null, time);
+    assertThat(environment.getCnIndexDBLastRotationTime()).isEqualTo(200L);
+  }
+
 }

--
Gitblit v1.10.0