From 0fbd274fb694c50ed0b599dfbda44560070b8f8a Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 23 Dec 2014 11:39:41 +0000
Subject: [PATCH] OPENDJ-1690 CR-5724 Fix file-based changelog purging
---
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java | 174 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 files changed, 157 insertions(+), 17 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 3ffc90f..693cab2 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -51,6 +51,7 @@
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
+import org.forgerock.util.time.TimeService;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -74,9 +75,9 @@
* the string representation of lowest and highest key present in the log file.</li>
* </ul>
* A read-only log file is created each time the head log file has reached the
- * maximum size limit. The head log file is then rotated to the read-only file and a
- * new empty head log file is opened. There is no limit on the number of read-only
- * files, but they can be purged.
+ * maximum size limit or the time limit. The head log file is then rotated to the
+ * read-only file and a new empty head log file is opened. There is no limit on the
+ * number of read-only files, but they can be purged.
* <p>
* A log is obtained using the {@code Log.openLog()} method and must always be
* released using the {@code close()} method.
@@ -170,14 +171,23 @@
private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
/**
- * A log file is rotated once it has exceeded this size limit. The log file can have
+ * A log file can be rotated once it has exceeded this size limit. The log file can have
* a size much larger than this limit if the last record written has a huge size.
*
- * TODO : to be replaced later by a (or a list of) configurable Rotation policy
+ * TODO : to be replaced later by a list of configurable Rotation policy
* eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
*/
private final long sizeLimitPerLogFileInBytes;
+ /** The time service used for timing. It is package private so it can be modified by test case. */
+ TimeService timeService = TimeService.SYSTEM;
+
+ /** A log file can be rotated once it has exceeded a given time interval. No rotation happens if equals to zero. */
+ private long rotationIntervalInMillis;
+
+ /** The last time a log file was rotated. */
+ private long lastRotationTime;
+
/**
* The exclusive lock used for writes and lifecycle operations on this log:
* initialize, clear, sync and close.
@@ -190,6 +200,12 @@
private final Lock sharedLock;
/**
+ * The replication environment used to create this log. The log is notifying it for any change
+ * that must be persisted.
+ */
+ private final ReplicationEnvironment replicationEnv;
+
+ /**
* Open a log with the provided log path, record parser and maximum size per
* log file.
* <p>
@@ -199,25 +215,28 @@
* Type of the key of a record, which must be comparable.
* @param <V>
* Type of the value of a record.
+ * @param replicationEnv
+ * The replication environment used to create this log.
* @param logPath
* Path of the log.
* @param parser
* Parser for encoding/decoding of records.
- * @param sizeLimitPerFileInBytes
- * Limit in bytes before rotating the head log file of the log.
+ * @param rotationParameters
+ * Parameters for the log files rotation.
* @return a log
* @throws ChangelogException
* If a problem occurs during initialization.
*/
- static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
- final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
+ static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final ReplicationEnvironment replicationEnv,
+ final File logPath, final RecordParser<K, V> parser, final LogRotationParameters rotationParameters)
+ throws ChangelogException
{
Reject.ifNull(logPath, parser);
@SuppressWarnings("unchecked")
Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
if (log == null)
{
- log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
+ log = new Log<K, V>(replicationEnv, logPath, parser, rotationParameters);
logsCache.put(logPath, log);
}
else
@@ -229,6 +248,43 @@
return log;
}
+ /** Holds the parameters for log files rotation. */
+ static class LogRotationParameters {
+
+ private final long sizeLimitPerFileInBytes;
+ private final long rotationInterval;
+ private final long lastRotationTime;
+
+ /**
+ * Creates rotation parameters.
+ *
+ * @param sizeLimitPerFileInBytes
+ * Size limit before rotating a log file.
+ * @param rotationInterval
+ * Time interval before rotating a log file.
+ * @param lastRotationTime
+ * Last time a log file was rotated.
+ */
+ LogRotationParameters(long sizeLimitPerFileInBytes, long rotationInterval, long lastRotationTime)
+ {
+ this.sizeLimitPerFileInBytes = sizeLimitPerFileInBytes;
+ this.rotationInterval = rotationInterval;
+ this.lastRotationTime = lastRotationTime;
+ }
+
+ }
+
+ /**
+ * Set the time interval for rotation of log file.
+ *
+ * @param rotationIntervalInMillis
+ * time interval before rotation of log file
+ */
+ void setRotationInterval(long rotationIntervalInMillis)
+ {
+ this.rotationIntervalInMillis = rotationIntervalInMillis;
+ }
+
/**
* Release a reference to the log corresponding to provided path. The log is
* closed if this is the last reference.
@@ -256,21 +312,27 @@
/**
* Creates a new log.
*
+ * @param replicationEnv
+ * The replication environment used to create this log.
* @param logPath
* The directory path of the log.
* @param parser
* Parser of records.
- * @param sizeLimitPerFile
- * Limit in bytes before rotating a log file.
+ * @param rotationParams
+ * Parameters for log-file rotation.
+ *
* @throws ChangelogException
* If a problem occurs during initialization.
*/
- private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
- throws ChangelogException
+ private Log(final ReplicationEnvironment replicationEnv, final File logPath, final RecordParser<K, V> parser,
+ final LogRotationParameters rotationParams) throws ChangelogException
{
+ this.replicationEnv = replicationEnv;
this.logPath = logPath;
this.recordParser = parser;
- this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
+ this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
+ this.rotationIntervalInMillis = rotationParams.rotationInterval;
+ this.lastRotationTime = rotationParams.lastRotationTime;
this.referenceCount = 1;
final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -376,9 +438,9 @@
return;
}
LogFile<K, V> headLogFile = getHeadLogFile();
- if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+ if (mustRotate(headLogFile))
{
- logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+ logger.info(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
rotateHeadLogFile();
headLogFile = getHeadLogFile();
@@ -392,6 +454,27 @@
}
}
+ private boolean mustRotate(LogFile<K, V> headLogFile)
+ {
+ if (lastAppendedKey == null)
+ {
+ // never rotate an empty file
+ return false;
+ }
+ if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+ {
+ // rotate because file size exceeded threshold
+ return true;
+ }
+ if (rotationIntervalInMillis > 0)
+ {
+ // rotate if time limit is reached
+ final long timeElapsed = timeService.since(lastRotationTime);
+ return timeElapsed > rotationIntervalInMillis;
+ }
+ return false;
+ }
+
/**
* Indicates if the provided record has a key that would break the key
* ordering in the log.
@@ -711,6 +794,60 @@
releaseLog(logPath);
}
+ /**
+ * Find the highest key that corresponds to a record that is the oldest (or
+ * first) of a read-only log file and where value mapped from the record is
+ * lower or equals to provided limit value.
+ * <p>
+ * Example<br>
+ * Given a log with 3 log files, with Record<Int, String> and Mapper<String,
+ * Long> mapping a string to its long value
+ * <ul>
+ * <li>1_10.log where oldest record is (key=1, value="50")</li>
+ * <li>11_20.log where oldest record is (key=11, value="150")</li>
+ * <li>head.log where oldest record is (key=25, value="250")</li>
+ * </ul>
+ * Then
+ * <ul>
+ * <li>findBoundaryKeyFromRecord(mapper, 20) => null</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 50) => 1</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 100) => 1</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 150) => 11</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 200) => 11</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 250) => 25</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 300) => 25</li>
+ * </ul>
+ *
+ * @param <V2>
+ * Type of the value extracted from the record
+ * @param mapper
+ * The mapper to extract a value from a record. It is expected that
+ * extracted values are ordered according to an order consistent with
+ * this log ordering, i.e. for two records, if key(R1) > key(R2) then
+ * extractedValue(R1) > extractedValue(R2).
+ * @param limitValue
+ * The limit value to search for
+ * @return the key or {@code null} if no key corresponds
+ * @throws ChangelogException
+ * If a problem occurs
+ */
+ <V2 extends Comparable<V2>> K findBoundaryKeyFromRecord(Record.Mapper<V, V2> mapper, V2 limitValue)
+ throws ChangelogException
+ {
+ K key = null;
+ for (LogFile<K, V> logFile : logFiles.values())
+ {
+ final Record<K, V> record = logFile.getOldestRecord();
+ final V2 oldestValue = mapper.map(record.getValue());
+ if (oldestValue.compareTo(limitValue) > 0)
+ {
+ return key;
+ }
+ key = record.getKey();
+ }
+ return key;
+ }
+
/** Effectively close this log. */
private void doClose()
{
@@ -766,6 +903,9 @@
// Re-enable cursors previously opened on head, with the saved state
updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
+
+ // Notify even if time-based rotation is not enabled, as it could be enabled at any time
+ replicationEnv.notifyLogFileRotation(this);
}
private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
--
Gitblit v1.10.0