From 0fbd274fb694c50ed0b599dfbda44560070b8f8a Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 23 Dec 2014 11:39:41 +0000
Subject: [PATCH] OPENDJ-1690 CR-5724 Fix file-based changelog purging

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java |  174 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 157 insertions(+), 17 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 3ffc90f..693cab2 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -51,6 +51,7 @@
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.forgerock.util.Reject;
 import org.forgerock.util.Utils;
+import org.forgerock.util.time.TimeService;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -74,9 +75,9 @@
  * the string representation of lowest and highest key present in the log file.</li>
  * </ul>
  * A read-only log file is created each time the head log file has reached the
- * maximum size limit. The head log file is then rotated to the read-only file and a
- * new empty head log file is opened. There is no limit on the number of read-only
- * files, but they can be purged.
+ * maximum size limit or the time limit. The head log file is then rotated to the
+ * read-only file and a new empty head log file is opened. There is no limit on the
+ * number of read-only files, but they can be purged.
  * <p>
  * A log is obtained using the {@code Log.openLog()} method and must always be
  * released using the {@code close()} method.
@@ -170,14 +171,23 @@
   private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
 
   /**
-   * A log file is rotated once it has exceeded this size limit. The log file can have
+   * A log file can be rotated once it has exceeded this size limit. The log file can have
    * a size much larger than this limit if the last record written has a huge size.
    *
-   * TODO : to be replaced later by a (or a list of) configurable Rotation policy
+   * TODO : to be replaced later by a list of configurable Rotation policy
    * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
    */
   private final long sizeLimitPerLogFileInBytes;
 
+  /** The time service used for timing. It is package private so it can be modified by test case. */
+  TimeService timeService = TimeService.SYSTEM;
+
+  /** A log file can be rotated once it has exceeded a given time interval. No rotation happens if equals to zero. */
+  private long rotationIntervalInMillis;
+
+  /** The last time a log file was rotated. */
+  private long lastRotationTime;
+
   /**
    * The exclusive lock used for writes and lifecycle operations on this log:
    * initialize, clear, sync and close.
@@ -190,6 +200,12 @@
   private final Lock sharedLock;
 
   /**
+   * The replication environment used to create this log. The log is notifying it for any change
+   * that must be persisted.
+   */
+  private final ReplicationEnvironment replicationEnv;
+
+  /**
    * Open a log with the provided log path, record parser and maximum size per
    * log file.
    * <p>
@@ -199,25 +215,28 @@
    *          Type of the key of a record, which must be comparable.
    * @param <V>
    *          Type of the value of a record.
+   * @param replicationEnv
+   *          The replication environment used to create this log.
    * @param logPath
    *          Path of the log.
    * @param parser
    *          Parser for encoding/decoding of records.
-   * @param sizeLimitPerFileInBytes
-   *          Limit in bytes before rotating the head log file of the log.
+   * @param rotationParameters
+   *          Parameters for the log files rotation.
    * @return a log
    * @throws ChangelogException
    *           If a problem occurs during initialization.
    */
-  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
-      final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
+  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final ReplicationEnvironment replicationEnv,
+      final File logPath, final RecordParser<K, V> parser, final LogRotationParameters rotationParameters)
+      throws ChangelogException
   {
     Reject.ifNull(logPath, parser);
     @SuppressWarnings("unchecked")
     Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
     if (log == null)
     {
-      log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
+      log = new Log<K, V>(replicationEnv, logPath, parser, rotationParameters);
       logsCache.put(logPath, log);
     }
     else
@@ -229,6 +248,43 @@
     return log;
   }
 
+  /** Holds the parameters for log files rotation. */
+  static class LogRotationParameters {
+
+    private final long sizeLimitPerFileInBytes;
+    private final long rotationInterval;
+    private final long lastRotationTime;
+
+    /**
+     * Creates rotation parameters.
+     *
+     * @param sizeLimitPerFileInBytes
+     *           Size limit before rotating a log file.
+     * @param rotationInterval
+     *           Time interval before rotating a log file.
+     * @param lastRotationTime
+     *           Last time a log file was rotated.
+     */
+    LogRotationParameters(long sizeLimitPerFileInBytes, long rotationInterval, long lastRotationTime)
+    {
+      this.sizeLimitPerFileInBytes = sizeLimitPerFileInBytes;
+      this.rotationInterval = rotationInterval;
+      this.lastRotationTime = lastRotationTime;
+    }
+
+  }
+
+  /**
+   * Set the time interval for rotation of log file.
+   *
+   * @param rotationIntervalInMillis
+   *           time interval before rotation of log file
+   */
+  void setRotationInterval(long rotationIntervalInMillis)
+  {
+    this.rotationIntervalInMillis = rotationIntervalInMillis;
+  }
+
   /**
    * Release a reference to the log corresponding to provided path. The log is
    * closed if this is the last reference.
@@ -256,21 +312,27 @@
   /**
    * Creates a new log.
    *
+   * @param replicationEnv
+   *            The replication environment used to create this log.
    * @param logPath
    *            The directory path of the log.
    * @param parser
    *          Parser of records.
-   * @param sizeLimitPerFile
-   *            Limit in bytes before rotating a log file.
+   * @param rotationParams
+   *          Parameters for log-file rotation.
+   *
    * @throws ChangelogException
    *            If a problem occurs during initialization.
    */
-  private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
-      throws ChangelogException
+  private Log(final ReplicationEnvironment replicationEnv, final File logPath, final RecordParser<K, V> parser,
+      final LogRotationParameters rotationParams) throws ChangelogException
   {
+    this.replicationEnv = replicationEnv;
     this.logPath = logPath;
     this.recordParser = parser;
-    this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
+    this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
+    this.rotationIntervalInMillis = rotationParams.rotationInterval;
+    this.lastRotationTime = rotationParams.lastRotationTime;
     this.referenceCount = 1;
 
     final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -376,9 +438,9 @@
         return;
       }
       LogFile<K, V> headLogFile = getHeadLogFile();
-      if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+      if (mustRotate(headLogFile))
       {
-        logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+        logger.info(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
 
         rotateHeadLogFile();
         headLogFile = getHeadLogFile();
@@ -392,6 +454,27 @@
     }
   }
 
+  private boolean mustRotate(LogFile<K, V> headLogFile)
+  {
+    if (lastAppendedKey == null)
+    {
+      // never rotate an empty file
+      return false;
+    }
+    if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+    {
+      // rotate because file size exceeded threshold
+      return true;
+    }
+    if (rotationIntervalInMillis > 0)
+    {
+      // rotate if time limit is reached
+      final long timeElapsed = timeService.since(lastRotationTime);
+      return timeElapsed > rotationIntervalInMillis;
+    }
+    return false;
+  }
+
   /**
    * Indicates if the provided record has a key that would break the key
    * ordering in the log.
@@ -711,6 +794,60 @@
     releaseLog(logPath);
   }
 
+  /**
+   * Find the highest key that corresponds to a record that is the oldest (or
+   * first) of a read-only log file and where value mapped from the record is
+   * lower or equals to provided limit value.
+   * <p>
+   * Example<br>
+   * Given a log with 3 log files, with Record<Int, String> and Mapper<String,
+   * Long> mapping a string to its long value
+   * <ul>
+   * <li>1_10.log where oldest record is (key=1, value="50")</li>
+   * <li>11_20.log where oldest record is (key=11, value="150")</li>
+   * <li>head.log where oldest record is (key=25, value="250")</li>
+   * </ul>
+   * Then
+   * <ul>
+   * <li>findBoundaryKeyFromRecord(mapper, 20) => null</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 50) => 1</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 100) => 1</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 150) => 11</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 200) => 11</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 250) => 25</li>
+   * <li>findBoundaryKeyFromRecord(mapper, 300) => 25</li>
+   * </ul>
+   *
+   * @param <V2>
+   *          Type of the value extracted from the record
+   * @param mapper
+   *          The mapper to extract a value from a record. It is expected that
+   *          extracted values are ordered according to an order consistent with
+   *          this log ordering, i.e. for two records, if key(R1) > key(R2) then
+   *          extractedValue(R1) > extractedValue(R2).
+   * @param limitValue
+   *          The limit value to search for
+   * @return the key or {@code null} if no key corresponds
+   * @throws ChangelogException
+   *           If a problem occurs
+   */
+  <V2 extends Comparable<V2>> K findBoundaryKeyFromRecord(Record.Mapper<V, V2> mapper, V2 limitValue)
+      throws ChangelogException
+  {
+    K key = null;
+    for (LogFile<K, V> logFile : logFiles.values())
+    {
+      final Record<K, V> record = logFile.getOldestRecord();
+      final V2 oldestValue = mapper.map(record.getValue());
+      if (oldestValue.compareTo(limitValue) > 0)
+      {
+        return key;
+      }
+      key = record.getKey();
+    }
+    return key;
+  }
+
   /** Effectively close this log. */
   private void doClose()
   {
@@ -766,6 +903,9 @@
 
     // Re-enable cursors previously opened on head, with the saved state
     updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
+
+    // Notify even if time-based rotation is not enabled, as it could be enabled at any time
+    replicationEnv.notifyLogFileRotation(this);
   }
 
   private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException

--
Gitblit v1.10.0