From bef7c528fe33395b154145e033b717210ba63060 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 23 Dec 2014 11:39:41 +0000
Subject: [PATCH] OPENDJ-1690 CR-5724 Fix file-based changelog purging
---
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java | 67 +----
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java | 67 +++++
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java | 12 +
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java | 200 ++++++++++++++++-
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java | 20 +
opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties | 8
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java | 174 ++++++++++++++-
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 9
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java | 3
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java | 68 +++++
10 files changed, 526 insertions(+), 102 deletions(-)
diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
index 97fa2f5..28a7c0d 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
@@ -626,5 +626,9 @@
perform a search request on cn=changelog
ERR_CHANGELOG_BACKEND_SEARCH_286 =An error occurred when \
searching base DN '%s' with filter '%s' in changelog backend : %s
-ERR_CHANGELOG_BACKEND_ATTRIBUTE_287 =An error occurred when \
- retrieving attribute value for attribute '%s' for entry DN '%s' in changelog backend : %s
\ No newline at end of file
+ERR_CHANGELOG_BACKEND_ATTRIBUTE_287=An error occurred when \
+ retrieving attribute value for attribute '%s' for entry DN '%s' in changelog backend : %s
+ERR_CHANGELOG_UNABLE_TO_CREATE_LAST_LOG_ROTATION_TIME_FILE_288=Could not create \
+ file '%s' to store last log rotation time %d
+ERR_CHANGELOG_UNABLE_TO_DELETE_LAST_LOG_ROTATION_TIME_FILE_289=Could not delete \
+ file '%s' that stored the previous last log rotation time
\ No newline at end of file
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
index 84b1d98..aa5543b 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -59,6 +59,16 @@
/** The parser of records stored in this ChangeNumberIndexDB. */
static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
+ static final Record.Mapper<ChangeNumberIndexRecord, CSN> MAPPER_TO_CSN =
+ new Record.Mapper<ChangeNumberIndexRecord, CSN>()
+ {
+ @Override
+ public CSN map(ChangeNumberIndexRecord value)
+ {
+ return value.getCSN();
+ }
+ };
+
/** The log in which records are persisted. */
private final Log<Long, ChangeNumberIndexRecord> log;
@@ -231,8 +241,14 @@
{
return null;
}
- final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
- return record != null ? record.getValue().getCSN() : null;
+ // Retrieve the oldest change number that must not be purged
+ final Long purgeChangeNumber = log.findBoundaryKeyFromRecord(MAPPER_TO_CSN, purgeCSN);
+ if (purgeChangeNumber != null)
+ {
+ final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeChangeNumber);
+ return record != null ? record.getValue().getCSN() : null;
+ }
+ return null;
}
/**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 9544a75..41970d4 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -41,6 +41,7 @@
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.util.time.TimeService;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.ChangelogBackend;
@@ -296,7 +297,7 @@
try
{
final File dbDir = getFileForPath(config.getReplicationDBDirectory());
- replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer);
+ replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer, TimeService.SYSTEM);
final ChangelogState changelogState = replicationEnv.getChangelogState();
initializeToChangelogState(changelogState);
if (config.isComputeChangeNumber())
@@ -574,6 +575,12 @@
public void setPurgeDelay(final long purgeDelayInMillis)
{
this.purgeDelayInMillis = purgeDelayInMillis;
+
+ // Rotation time interval for CN Index DB log file
+ // needs to be a fraction of the purge delay
+ // to ensure there is at least one file to purge
+ replicationEnv.setCNIndexDBRotationInterval(purgeDelayInMillis / 2);
+
if (purgeDelayInMillis > 0)
{
final ChangelogDBPurger newPurger = new ChangelogDBPurger();
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 3ffc90f..693cab2 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -51,6 +51,7 @@
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
+import org.forgerock.util.time.TimeService;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -74,9 +75,9 @@
* the string representation of lowest and highest key present in the log file.</li>
* </ul>
* A read-only log file is created each time the head log file has reached the
- * maximum size limit. The head log file is then rotated to the read-only file and a
- * new empty head log file is opened. There is no limit on the number of read-only
- * files, but they can be purged.
+ * maximum size limit or the time limit. The head log file is then rotated to the
+ * read-only file and a new empty head log file is opened. There is no limit on the
+ * number of read-only files, but they can be purged.
* <p>
* A log is obtained using the {@code Log.openLog()} method and must always be
* released using the {@code close()} method.
@@ -170,14 +171,23 @@
private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
/**
- * A log file is rotated once it has exceeded this size limit. The log file can have
+ * A log file can be rotated once it has exceeded this size limit. The log file can have
* a size much larger than this limit if the last record written has a huge size.
*
- * TODO : to be replaced later by a (or a list of) configurable Rotation policy
+ * TODO : to be replaced later by a list of configurable Rotation policy
* eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
*/
private final long sizeLimitPerLogFileInBytes;
+ /** The time service used for timing. It is package private so it can be modified by test case. */
+ TimeService timeService = TimeService.SYSTEM;
+
+ /** A log file can be rotated once it has exceeded a given time interval. No rotation happens if equals to zero. */
+ private long rotationIntervalInMillis;
+
+ /** The last time a log file was rotated. */
+ private long lastRotationTime;
+
/**
* The exclusive lock used for writes and lifecycle operations on this log:
* initialize, clear, sync and close.
@@ -190,6 +200,12 @@
private final Lock sharedLock;
/**
+ * The replication environment used to create this log. The log is notifying it for any change
+ * that must be persisted.
+ */
+ private final ReplicationEnvironment replicationEnv;
+
+ /**
* Open a log with the provided log path, record parser and maximum size per
* log file.
* <p>
@@ -199,25 +215,28 @@
* Type of the key of a record, which must be comparable.
* @param <V>
* Type of the value of a record.
+ * @param replicationEnv
+ * The replication environment used to create this log.
* @param logPath
* Path of the log.
* @param parser
* Parser for encoding/decoding of records.
- * @param sizeLimitPerFileInBytes
- * Limit in bytes before rotating the head log file of the log.
+ * @param rotationParameters
+ * Parameters for the log files rotation.
* @return a log
* @throws ChangelogException
* If a problem occurs during initialization.
*/
- static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
- final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
+ static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final ReplicationEnvironment replicationEnv,
+ final File logPath, final RecordParser<K, V> parser, final LogRotationParameters rotationParameters)
+ throws ChangelogException
{
Reject.ifNull(logPath, parser);
@SuppressWarnings("unchecked")
Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
if (log == null)
{
- log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
+ log = new Log<K, V>(replicationEnv, logPath, parser, rotationParameters);
logsCache.put(logPath, log);
}
else
@@ -229,6 +248,43 @@
return log;
}
+ /** Holds the parameters for log files rotation. */
+ static class LogRotationParameters {
+
+ private final long sizeLimitPerFileInBytes;
+ private final long rotationInterval;
+ private final long lastRotationTime;
+
+ /**
+ * Creates rotation parameters.
+ *
+ * @param sizeLimitPerFileInBytes
+ * Size limit before rotating a log file.
+ * @param rotationInterval
+ * Time interval before rotating a log file.
+ * @param lastRotationTime
+ * Last time a log file was rotated.
+ */
+ LogRotationParameters(long sizeLimitPerFileInBytes, long rotationInterval, long lastRotationTime)
+ {
+ this.sizeLimitPerFileInBytes = sizeLimitPerFileInBytes;
+ this.rotationInterval = rotationInterval;
+ this.lastRotationTime = lastRotationTime;
+ }
+
+ }
+
+ /**
+ * Set the time interval for rotation of log file.
+ *
+ * @param rotationIntervalInMillis
+ * time interval before rotation of log file
+ */
+ void setRotationInterval(long rotationIntervalInMillis)
+ {
+ this.rotationIntervalInMillis = rotationIntervalInMillis;
+ }
+
/**
* Release a reference to the log corresponding to provided path. The log is
* closed if this is the last reference.
@@ -256,21 +312,27 @@
/**
* Creates a new log.
*
+ * @param replicationEnv
+ * The replication environment used to create this log.
* @param logPath
* The directory path of the log.
* @param parser
* Parser of records.
- * @param sizeLimitPerFile
- * Limit in bytes before rotating a log file.
+ * @param rotationParams
+ * Parameters for log-file rotation.
+ *
* @throws ChangelogException
* If a problem occurs during initialization.
*/
- private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
- throws ChangelogException
+ private Log(final ReplicationEnvironment replicationEnv, final File logPath, final RecordParser<K, V> parser,
+ final LogRotationParameters rotationParams) throws ChangelogException
{
+ this.replicationEnv = replicationEnv;
this.logPath = logPath;
this.recordParser = parser;
- this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
+ this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
+ this.rotationIntervalInMillis = rotationParams.rotationInterval;
+ this.lastRotationTime = rotationParams.lastRotationTime;
this.referenceCount = 1;
final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -376,9 +438,9 @@
return;
}
LogFile<K, V> headLogFile = getHeadLogFile();
- if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+ if (mustRotate(headLogFile))
{
- logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+ logger.info(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
rotateHeadLogFile();
headLogFile = getHeadLogFile();
@@ -392,6 +454,27 @@
}
}
+ private boolean mustRotate(LogFile<K, V> headLogFile)
+ {
+ if (lastAppendedKey == null)
+ {
+ // never rotate an empty file
+ return false;
+ }
+ if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+ {
+ // rotate because file size exceeded threshold
+ return true;
+ }
+ if (rotationIntervalInMillis > 0)
+ {
+ // rotate if time limit is reached
+ final long timeElapsed = timeService.since(lastRotationTime);
+ return timeElapsed > rotationIntervalInMillis;
+ }
+ return false;
+ }
+
/**
* Indicates if the provided record has a key that would break the key
* ordering in the log.
@@ -711,6 +794,60 @@
releaseLog(logPath);
}
+ /**
+ * Find the highest key that corresponds to a record that is the oldest (or
+ * first) of a read-only log file and where value mapped from the record is
+ * lower or equals to provided limit value.
+ * <p>
+ * Example<br>
+ * Given a log with 3 log files, with Record<Int, String> and Mapper<String,
+ * Long> mapping a string to its long value
+ * <ul>
+ * <li>1_10.log where oldest record is (key=1, value="50")</li>
+ * <li>11_20.log where oldest record is (key=11, value="150")</li>
+ * <li>head.log where oldest record is (key=25, value="250")</li>
+ * </ul>
+ * Then
+ * <ul>
+ * <li>findBoundaryKeyFromRecord(mapper, 20) => null</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 50) => 1</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 100) => 1</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 150) => 11</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 200) => 11</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 250) => 25</li>
+ * <li>findBoundaryKeyFromRecord(mapper, 300) => 25</li>
+ * </ul>
+ *
+ * @param <V2>
+ * Type of the value extracted from the record
+ * @param mapper
+ * The mapper to extract a value from a record. It is expected that
+ * extracted values are ordered according to an order consistent with
+ * this log ordering, i.e. for two records, if key(R1) > key(R2) then
+ * extractedValue(R1) > extractedValue(R2).
+ * @param limitValue
+ * The limit value to search for
+ * @return the key or {@code null} if no key corresponds
+ * @throws ChangelogException
+ * If a problem occurs
+ */
+ <V2 extends Comparable<V2>> K findBoundaryKeyFromRecord(Record.Mapper<V, V2> mapper, V2 limitValue)
+ throws ChangelogException
+ {
+ K key = null;
+ for (LogFile<K, V> logFile : logFiles.values())
+ {
+ final Record<K, V> record = logFile.getOldestRecord();
+ final V2 oldestValue = mapper.map(record.getValue());
+ if (oldestValue.compareTo(limitValue) > 0)
+ {
+ return key;
+ }
+ key = record.getKey();
+ }
+ return key;
+ }
+
/** Effectively close this log. */
private void doClose()
{
@@ -766,6 +903,9 @@
// Re-enable cursors previously opened on head, with the saved state
updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
+
+ // Notify even if time-based rotation is not enabled, as it could be enabled at any time
+ replicationEnv.notifyLogFileRotation(this);
}
private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
index 24b25d2..a53c0e0 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
@@ -35,6 +35,18 @@
*/
class Record<K, V>
{
+ /** Map the record value to another value. */
+ static interface Mapper<V, V2> {
+ /**
+ * Map a record value to another value.
+ *
+ * @param value
+ * The value to map
+ * @return the new value
+ */
+ V2 map(V value);
+ }
+
private final K key;
private final V value;
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index 624cda1..066893e 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -46,13 +46,16 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.util.time.TimeService;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
@@ -67,7 +70,8 @@
* created :
* <ul>
* <li>A "changenumberindex" directory containing the log files for
- * ChangeNumberIndexDB</li>
+ * ChangeNumberIndexDB, and a file named "rotationtime[millis].last" where [millis] is
+ * the time of the last log file rotation in milliseconds</li>
* <li>A "domains.state" file containing a mapping of each domain DN to an id. The
* id is used to name the corresponding domain directory.</li>
* <li>One directory per domain, named after "[id].domain" where [id] is the id
@@ -76,7 +80,7 @@
* <p>
* Each domain directory contains the following directories and files :
* <ul>
- * <li>A "generation_[id].id" file, where [id] is the generation id</li>
+ * <li>A "generation[id].id" file, where [id] is the generation id</li>
* <li>One directory per server id, named after "[id].server" where [id] is the
* id of the server.</li>
* </ul>
@@ -100,6 +104,7 @@
* | \---changenumberindex
* | \--- head.log [contains last records written]
* | \--- 1_50.log [contains records with keys in interval [1, 50]]
+ * | \--- rtime198745512.last
* | \---1.domain
* | \---generation1.id
* | \---22.server
@@ -119,7 +124,9 @@
{
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
- private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024*1024;
+ private static final long CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES = 1024 * 1024;
+
+ private static final long REPLICA_DB_MAX_LOG_FILE_SIZE_IN_BYTES = 10 * CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES;
private static final int NO_GENERATION_ID = -1;
@@ -129,9 +136,13 @@
static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
+ static final String LAST_ROTATION_TIME_FILE_PREFIX = "rotationtime";
+
+ static final String LAST_ROTATION_TIME_FILE_SUFFIX = ".ms";
+
private static final String DOMAIN_STATE_SEPARATOR = ":";
- private static final String DOMAIN_SUFFIX = ".domain";
+ private static final String DOMAIN_SUFFIX = ".dom";
private static final String SERVER_ID_SUFFIX = ".server";
@@ -170,6 +181,17 @@
}
};
+ private static final FileFilter LAST_ROTATION_TIME_FILE_FILTER = new FileFilter()
+ {
+ @Override
+ public boolean accept(File file)
+ {
+ return file.isFile()
+ && file.getName().startsWith(LAST_ROTATION_TIME_FILE_PREFIX)
+ && file.getName().endsWith(LAST_ROTATION_TIME_FILE_SUFFIX);
+ }
+ };
+
/** Root path where the replication log is stored. */
private final String replicationRootPath;
/**
@@ -181,8 +203,16 @@
*/
private final ChangelogState changelogState;
- /** The list of logs that are in use. */
- private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
+ /** The list of logs that are in use for Replica DBs. */
+ private final List<Log<CSN, UpdateMsg>> logsReplicaDB = new CopyOnWriteArrayList<Log<CSN, UpdateMsg>>();
+
+ /**
+ * The list of logs that are in use for the CN Index DB.
+ * There is a single CN Index DB for a ReplicationServer, but there can be multiple references opened on it.
+ * This is the responsability of Log class to handle properly these multiple references.
+ */
+ private List<Log<Long, ChangeNumberIndexRecord>> logsCNIndexDB =
+ new CopyOnWriteArrayList<Log<Long, ChangeNumberIndexRecord>>();;
/**
* Maps each domain DN to a domain id that is used to name directory in file system.
@@ -205,6 +235,22 @@
private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+ /** The time service used for timing. */
+ private final TimeService timeService;
+
+ /**
+ * For CN Index DB, a log file can be rotated once it has exceeded a given time interval.
+ * <p>
+ * It is disabled if the interval is equals to zero.
+ * The interval can be modified at any time.
+ */
+ private long cnIndexDBRotationInterval;
+
+ /**
+ * For CN Index DB, the last time a log file was rotated.
+ * It is persisted to file each time it changes and read at server start. */
+ private long cnIndexDBLastRotationTime;
+
/**
* Creates the replication environment.
*
@@ -212,15 +258,34 @@
* Root path where replication log is stored.
* @param replicationServer
* The underlying replication server.
+ * @param timeService
+ * Time service to use for timing.
* @throws ChangelogException
* If an error occurs during initialization.
*/
ReplicationEnvironment(final String rootPath,
- final ReplicationServer replicationServer) throws ChangelogException
+ final ReplicationServer replicationServer, final TimeService timeService) throws ChangelogException
{
this.replicationRootPath = rootPath;
this.replicationServer = replicationServer;
+ this.timeService = timeService;
this.changelogState = readOnDiskChangelogState();
+ this.cnIndexDBLastRotationTime = readOnDiskLastRotationTime();
+ }
+
+ /**
+ * Sets the rotation time interval of a log file for the CN Index DB.
+ *
+ * @param timeInterval
+ * time interval for rotation of a log file.
+ */
+ void setCNIndexDBRotationInterval(long timeInterval)
+ {
+ cnIndexDBRotationInterval = timeInterval;
+ for (Log<Long, ChangeNumberIndexRecord> log : logsCNIndexDB)
+ {
+ log.setRotationInterval(cnIndexDBRotationInterval);
+ }
}
/**
@@ -257,6 +322,16 @@
}
/**
+ * Return the last rotation time for CN Index DB log files.
+ *
+ * @return the last rotation time in millis
+ */
+ long getCnIndexDBLastRotationTime()
+ {
+ return cnIndexDBLastRotationTime;
+ }
+
+ /**
* Finds or creates the log used to store changes from the replication server
* with the given serverId and the given baseDN.
*
@@ -299,7 +374,8 @@
ensureGenerationIdFileExists(generationIdPath);
changelogState.setDomainGenerationId(domainDN, generationId);
- return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
+ return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER,
+ new LogRotationParameters(REPLICA_DB_MAX_LOG_FILE_SIZE_IN_BYTES, 0, 0), logsReplicaDB);
}
}
catch (Exception e)
@@ -325,7 +401,9 @@
final File path = getCNIndexDBPath();
try
{
- return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER);
+ final LogRotationParameters rotationParams = new LogRotationParameters(CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES,
+ cnIndexDBRotationInterval, cnIndexDBLastRotationTime);
+ return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER, rotationParams, logsCNIndexDB);
}
catch (Exception e)
{
@@ -344,7 +422,8 @@
{
if (isShuttingDown.compareAndSet(false, true))
{
- logs.clear();
+ logsReplicaDB.clear();
+ logsCNIndexDB.clear();
}
}
@@ -409,6 +488,25 @@
}
/**
+ * Notify that log file has been rotated for provided log.
+ *
+ * The last rotation time is persisted to a file and read at startup time.
+ *
+ * @param log
+ * the log that has a file rotated.
+ * @throws ChangelogException
+ * If a problem occurs
+ */
+ void notifyLogFileRotation(Log<?, ?> log) throws ChangelogException
+ {
+ // only CN Index DB log rotation time is persisted
+ if (logsCNIndexDB.contains(log))
+ {
+ updateCNIndexDBLastRotationTime(timeService.now());
+ }
+ }
+
+ /**
* Notify that the replica corresponding to provided domain and provided CSN
* is offline.
*
@@ -654,16 +752,17 @@
}
/** Open a log from the provided path and record parser. */
- private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
- throws ChangelogException
+ private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser,
+ LogRotationParameters rotationParams, List<Log<K, V>> logsCache) throws ChangelogException
{
checkShutDownBeforeOpening(serverIdPath);
- final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES);
+ final Log<K, V> log = Log.openLog(this, serverIdPath, parser, rotationParams);
checkShutDownAfterOpening(serverIdPath, log);
- logs.add(log);
+ logsCache.add(log);
+
return log;
}
@@ -718,6 +817,46 @@
return (generationIds != null && generationIds.length > 0) ? generationIds[0] : null;
}
+ /**
+ * Retrieve the last rotation time from the disk.
+ *
+ * @return the last rotation time in millis (which is the current time if no
+ * rotation file is found or if a problem occurs).
+ */
+ private long readOnDiskLastRotationTime()
+ {
+ try
+ {
+ final File file = retrieveLastRotationTimeFile();
+ if (file != null)
+ {
+ final String filename = file.getName();
+ final String value = filename.substring(LAST_ROTATION_TIME_FILE_PREFIX.length(),
+ filename.length() - LAST_ROTATION_TIME_FILE_SUFFIX.length());
+ return Long.valueOf(value);
+ }
+ }
+ catch (Exception e)
+ {
+ logger.trace(LocalizableMessage.raw("Error when retrieving last log file rotation time from file"), e);
+ }
+ // Default to current time
+ return timeService.now();
+ }
+
+ /**
+ * Retrieve the file named after the last rotation time from the provided
+ * directory.
+ *
+ * @return the last rotation time file or {@code null} if the corresponding file
+ * can't be found
+ */
+ private File retrieveLastRotationTimeFile()
+ {
+ File[] files = getCNIndexDBPath().listFiles(LAST_ROTATION_TIME_FILE_FILTER);
+ return (files != null && files.length > 0) ? files[0] : null;
+ }
+
private File getDomainPath(final String domainId)
{
return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
@@ -748,9 +887,16 @@
return new File(replicationRootPath, CN_INDEX_DB_DIRNAME);
}
+ private File getLastRotationTimePath(long lastRotationTime)
+ {
+ return new File(getCNIndexDBPath(),
+ LAST_ROTATION_TIME_FILE_PREFIX + lastRotationTime + LAST_ROTATION_TIME_FILE_SUFFIX);
+ }
+
private void closeLog(final Log<?, ?> log)
{
- logs.remove(log);
+ logsReplicaDB.remove(log);
+ logsCNIndexDB.remove(log);
log.close();
}
@@ -789,6 +935,30 @@
}
}
+ private void updateCNIndexDBLastRotationTime(final long lastRotationTime) throws ChangelogException {
+ final File previousRotationFile = retrieveLastRotationTimeFile();
+ final File newRotationFile = getLastRotationTimePath(lastRotationTime);
+ try
+ {
+ newRotationFile.createNewFile();
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LAST_LOG_ROTATION_TIME_FILE.get(
+ newRotationFile.getPath(), lastRotationTime), e);
+ }
+ if (previousRotationFile != null)
+ {
+ final boolean isDeleted = previousRotationFile.delete();
+ if (!isDeleted)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LAST_LOG_ROTATION_TIME_FILE.get(
+ previousRotationFile.getPath()));
+ }
+ }
+ cnIndexDBLastRotationTime = lastRotationTime;
+ }
+
private void ensureGenerationIdFileExists(final File generationIdPath)
throws ChangelogException
{
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
index 948a221..2eabd59 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -37,7 +37,6 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
-import org.opends.server.util.StaticUtils;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -137,23 +136,12 @@
}
}
-
/**
- * This test makes basic operations of a ChangeNumberIndexDB:
- * <ol>
- * <li>create the db</li>
- * <li>add records</li>
- * <li>read them with a cursor</li>
- * <li>set a very short trim period</li>
- * <li>wait for the db to be trimmed / here since the changes are not stored
- * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
- * </ol>
+ * This test verifies purge is working by relying on a very short purge delay.
+ * The purge can be done only if there is at least one read-only log file, so
+ * this test ensures the rotation happens, using the rotation based on time.
*/
- // TODO : this works only if we ensure that there is a rotation of ahead log file
- // at the right place. First two records are 37 and 76 bytes long,
- // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file
- // Re-enable this test when max file size is customizable for log
- @Test(enabled=false)
+ @Test
public void testPurge() throws Exception
{
ReplicationServer replicationServer = null;
@@ -163,49 +151,32 @@
final ChangelogDB changelogDB = replicationServer.getChangelogDB();
changelogDB.setPurgeDelay(0); // disable purging
- // Prepare data to be stored in the db
- DN baseDN1 = DN.valueOf("o=test1");
- DN baseDN2 = DN.valueOf("o=test2");
- DN baseDN3 = DN.valueOf("o=test3");
-
- CSN[] csns = generateCSNs(1, 0, 3);
-
// Add records
+ DN[] baseDNs = { DN.valueOf("o=test1"), DN.valueOf("o=test2"), DN.valueOf("o=test3"), DN.valueOf("o=test4") };
+ CSN[] csns = generateCSNs(1, 0, 4);
final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
- long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
- addRecord(cnIndexDB, baseDN2, csns[1]);
- long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
+ long cn0 = addRecord(cnIndexDB, baseDNs[0], csns[0]);
+ addRecord(cnIndexDB, baseDNs[1], csns[1]);
+ long cn2 = addRecord(cnIndexDB, baseDNs[2], csns[2]);
- // The ChangeNumber should not get purged
- final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
- assertEquals(oldestCN, cn1);
- assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
+ // The CN DB should not be purged at this point
+ assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn0);
+ assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn2);
- DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
- try
- {
- assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[0], baseDN1);
- assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[1], baseDN2);
- assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[2], baseDN3);
- assertFalse(cursor.next());
- }
- finally
- {
- StaticUtils.close(cursor);
- }
+ // change the purge delay to a very short time
+ changelogDB.setPurgeDelay(5);
+ Thread.sleep(50);
+ // add a new record to force the rotation of the log
+ addRecord(cnIndexDB, baseDNs[3], csns[3]);
- // Now test that purging removes all changes but the last one
- changelogDB.setPurgeDelay(1);
+ // Now all changes should have been purged but the last one
int count = 0;
while (cnIndexDB.count() > 1 && count < 100)
{
Thread.sleep(10);
count++;
}
- assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
+ assertOnlyNewestRecordIsLeft(cnIndexDB, 4);
}
finally
{
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
index 96640ca..15f24e6 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -34,6 +34,7 @@
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.util.time.TimeService;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationServerCfg;
@@ -423,7 +424,7 @@
replicationServer = configureReplicationServer(100000, 10);
testRoot = createCleanDir();
- dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer);
+ dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer, TimeService.SYSTEM);
replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
// Populate the db with 'max' msg
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
index b810314..b3dab49 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -26,6 +26,7 @@
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
@@ -39,7 +40,9 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters;
import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser;
+import org.opends.server.replication.server.changelog.file.Record.Mapper;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
@@ -52,6 +55,8 @@
// Use a directory dedicated to this test class
private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+ private static final long NO_TIME_BASED_LOG_ROTATION = 0;
+
@BeforeMethod
public void initialize() throws Exception
{
@@ -79,9 +84,12 @@
// This allow to ensure rotation mechanism is thoroughly tested
// Some tests rely on having 2 records per log file (especially the purge tests), so take care
// if this value has to be changed
- int sizeLimitPerFileInBytes = 30;
+ final int sizeLimitPerFileInBytes = 30;
+ final LogRotationParameters rotationParams = new LogRotationParameters(sizeLimitPerFileInBytes,
+ NO_TIME_BASED_LOG_ROTATION, NO_TIME_BASED_LOG_ROTATION);
+ final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class);
- return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes);
+ return Log.openLog(replicationEnv, LOG_DIRECTORY, parser, rotationParams);
}
@Test
@@ -366,8 +374,11 @@
Log<String, String> writeLog = null;
try
{
- long sizeOf1MB = 1024*1024;
- writeLog = Log.openLog(LOG_DIRECTORY, LogFileTest.RECORD_PARSER, sizeOf1MB);
+ long sizeOf10MB = 10*1024*1024;
+ final LogRotationParameters rotationParams = new LogRotationParameters(sizeOf10MB, NO_TIME_BASED_LOG_ROTATION,
+ NO_TIME_BASED_LOG_ROTATION);
+ final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class);
+ writeLog = Log.openLog(replicationEnv, LOG_DIRECTORY, LogFileTest.RECORD_PARSER, rotationParams);
for (int i = 1; i < 1000000; i++)
{
@@ -514,12 +525,10 @@
public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
int cursorStartIndex, int cursorEndIndex) throws Exception
{
- Log<String, String> log = null;
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
DBCursor<Record<String, String>> cursor = null;
try
{
- log = openLog(LogFileTest.RECORD_PARSER);
-
log.purgeUpTo(purgeKey);
cursor = log.getCursor();
@@ -533,6 +542,51 @@
}
}
+ static final Mapper<String, Integer> MAPPER = new Record.Mapper<String, Integer>()
+ {
+ @Override
+ public Integer map(String value)
+ {
+ // extract numeric value, e.g. from "value10" return 10
+ return Integer.valueOf(value.substring("value".length()));
+ }
+ };
+
+ @DataProvider
+ Object[][] findBoundaryKeyData()
+ {
+ return new Object[][] {
+ // limit value, expected key
+ { 0, null },
+ { 1, "key001" },
+ { 2, "key001" },
+ { 3, "key003" },
+ { 4, "key003" },
+ { 5, "key005" },
+ { 6, "key005" },
+ { 7, "key007" },
+ { 8, "key007" },
+ { 9, "key009" },
+ { 10, "key009" },
+ { 11, "key009" },
+ { 12, "key009" },
+ };
+ }
+
+ @Test(dataProvider = "findBoundaryKeyData")
+ public void testFindBoundaryKeyFromRecord(int limitValue, String expectedKey) throws Exception
+ {
+ Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ try
+ {
+ assertThat(log.findBoundaryKeyFromRecord(MAPPER, limitValue)).isEqualTo(expectedKey);
+ }
+ finally
+ {
+ StaticUtils.close(log);
+ }
+ }
+
private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
throws Exception
{
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
index 3eeca60..57025f7 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -31,12 +31,14 @@
import java.util.List;
import org.assertj.core.data.MapEntry;
+import org.forgerock.util.time.TimeService;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
@@ -48,6 +50,7 @@
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
@SuppressWarnings("javadoc")
@@ -94,7 +97,7 @@
{
final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
final DN domainDN = DN.valueOf(DN1_AS_STRING);
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ ReplicationEnvironment environment = createReplicationEnv(rootPath);
cnDB = environment.getOrCreateCNIndexDB();
replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
@@ -122,7 +125,7 @@
{
File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
List<DN> domainDNs = Arrays.asList(DN.valueOf(DN1_AS_STRING), DN.valueOf(DN2_AS_STRING), DN.valueOf(DN3_AS_STRING));
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ ReplicationEnvironment environment = createReplicationEnv(rootPath);
cnDB = environment.getOrCreateCNIndexDB();
for (int i = 0; i <= 2 ; i++)
{
@@ -163,7 +166,7 @@
{
final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
final DN domainDN = DN.valueOf(DN1_AS_STRING);
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ ReplicationEnvironment environment = createReplicationEnv(rootPath);
cnDB = environment.getOrCreateCNIndexDB();
replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
@@ -193,7 +196,7 @@
{
final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
final DN domainDN = DN.valueOf(DN1_AS_STRING);
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ ReplicationEnvironment environment = createReplicationEnv(rootPath);
cnDB = environment.getOrCreateCNIndexDB();
replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
@@ -218,7 +221,7 @@
final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
final DN domainDN = DN.valueOf(DN1_AS_STRING);
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ ReplicationEnvironment environment = createReplicationEnv(rootPath);
cnDB = environment.getOrCreateCNIndexDB();
replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
@@ -249,7 +252,7 @@
final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
final DN domainDN = DN.valueOf(DN1_AS_STRING);
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ ReplicationEnvironment environment = createReplicationEnv(rootPath);
cnDB = environment.getOrCreateCNIndexDB();
replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
@@ -278,7 +281,7 @@
final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
final DN domainDN = DN.valueOf(DN1_AS_STRING);
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ ReplicationEnvironment environment = createReplicationEnv(rootPath);
cnDB = environment.getOrCreateCNIndexDB();
replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
@@ -309,13 +312,13 @@
{
File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
DN domainDN = DN.valueOf(DN1_AS_STRING);
- ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ ReplicationEnvironment environment = createReplicationEnv(rootPath);
replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
// delete the domain directory created for the 2 replica DBs to break the
// consistency with domain state file
- StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
+ StaticUtils.recursiveDelete(new File(rootPath, "1.dom"));
environment.readOnDiskChangelogState();
}
@@ -324,4 +327,50 @@
StaticUtils.close(cnDB, replicaDB, replicaDB2);
}
}
+
+ private ReplicationEnvironment createReplicationEnv(File rootPath) throws ChangelogException
+ {
+ ReplicationServer unusedReplicationServer = null;
+ return new ReplicationEnvironment(rootPath.getAbsolutePath(), unusedReplicationServer, TimeService.SYSTEM);
+ }
+
+ @Test
+ public void testLastRotationTimeRetrievalWithNoRotationFile() throws Exception
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ TimeService time = mock(TimeService.class);
+ when(time.now()).thenReturn(100L);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null, time);
+
+ assertThat(environment.getCnIndexDBLastRotationTime()).isEqualTo(100L);
+ }
+
+ @Test
+ public void testLastRotationTimeRetrievalWithRotationFile() throws Exception
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final TimeService time = mock(TimeService.class);
+ when(time.now()).thenReturn(100L, 200L);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null, time);
+ Log<Long,ChangeNumberIndexRecord> cnIndexDB = environment.getOrCreateCNIndexDB();
+
+ try {
+ environment.notifyLogFileRotation(cnIndexDB);
+
+ // check runtime change of last rotation time is effective
+ // this should also persist the time in a file, but this is checked later in the test
+ assertThat(environment.getCnIndexDBLastRotationTime()).isEqualTo(200L);
+ }
+ finally
+ {
+ cnIndexDB.close();
+ environment.shutdown();
+ }
+
+ // now check last rotation time is correctly read from persisted file when re-creating environment
+ when(time.now()).thenReturn(0L);
+ environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null, time);
+ assertThat(environment.getCnIndexDBLastRotationTime()).isEqualTo(200L);
+ }
+
}
--
Gitblit v1.10.0