OPENDJ-1690 CR-5724 Fix file-based changelog purging
- Log file rotation for CN Index DB is based on both size and time
- Time interval before log file rotation is a fraction of replication purge delay
- Last log file rotation time for CN index DB is stored in a file
| | |
| | | searching base DN '%s' with filter '%s' in changelog backend : %s |
| | | ERR_CHANGELOG_BACKEND_ATTRIBUTE_287 =An error occurred when \ |
| | | retrieving attribute value for attribute '%s' for entry DN '%s' in changelog backend : %s |
| | | ERR_CHANGELOG_UNABLE_TO_CREATE_LAST_LOG_ROTATION_TIME_FILE_288=Could not create \ |
| | | file '%s' to store last log rotation time %d |
| | | ERR_CHANGELOG_UNABLE_TO_DELETE_LAST_LOG_ROTATION_TIME_FILE_289=Could not delete \ |
| | | file '%s' that stored the previous last log rotation time |
| | |
| | | /** The parser of records stored in this ChangeNumberIndexDB. */ |
| | | static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser(); |
| | | |
| | | /** Maps a change number index record to the CSN it holds. */ |
| | | static final Record.Mapper<ChangeNumberIndexRecord, CSN> MAPPER_TO_CSN = |
| | | new Record.Mapper<ChangeNumberIndexRecord, CSN>() |
| | | { |
| | | @Override |
| | | public CSN map(final ChangeNumberIndexRecord value) |
| | | { |
| | | final CSN csn = value.getCSN(); |
| | | return csn; |
| | | } |
| | | }; |
| | | |
| | | /** The log in which records are persisted. */ |
| | | private final Log<Long, ChangeNumberIndexRecord> log; |
| | | |
| | |
| | | { |
| | | return null; |
| | | } |
| | | final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime()); |
| | | // Retrieve the oldest change number that must not be purged |
| | | final Long purgeChangeNumber = log.findBoundaryKeyFromRecord(MAPPER_TO_CSN, purgeCSN); |
| | | if (purgeChangeNumber != null) |
| | | { |
| | | final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeChangeNumber); |
| | | return record != null ? record.getValue().getCSN() : null; |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Implements the Monitoring capabilities of the FileChangeNumberIndexDB. |
| | |
| | | import org.forgerock.i18n.LocalizableMessageBuilder; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.util.time.TimeService; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.backends.ChangelogBackend; |
| | |
| | | try |
| | | { |
| | | final File dbDir = getFileForPath(config.getReplicationDBDirectory()); |
| | | replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer); |
| | | replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer, TimeService.SYSTEM); |
| | | final ChangelogState changelogState = replicationEnv.getChangelogState(); |
| | | initializeToChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | |
| | | public void setPurgeDelay(final long purgeDelayInMillis) |
| | | { |
| | | this.purgeDelayInMillis = purgeDelayInMillis; |
| | | |
| | | // Rotation time interval for CN Index DB log file |
| | | // needs to be a fraction of the purge delay |
| | | // to ensure there is at least one file to purge |
| | | replicationEnv.setCNIndexDBRotationInterval(purgeDelayInMillis / 2); |
| | | |
| | | if (purgeDelayInMillis > 0) |
| | | { |
| | | final ChangelogDBPurger newPurger = new ChangelogDBPurger(); |
| | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.util.Reject; |
| | | import org.forgerock.util.Utils; |
| | | import org.forgerock.util.time.TimeService; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | |
| | | * the string representation of lowest and highest key present in the log file.</li> |
| | | * </ul> |
| | | * A read-only log file is created each time the head log file has reached the |
| | | * maximum size limit. The head log file is then rotated to the read-only file and a |
| | | * new empty head log file is opened. There is no limit on the number of read-only |
| | | * files, but they can be purged. |
| | | * maximum size limit or the time limit. The head log file is then rotated to the |
| | | * read-only file and a new empty head log file is opened. There is no limit on the |
| | | * number of read-only files, but they can be purged. |
| | | * <p> |
| | | * A log is obtained using the {@code Log.openLog()} method and must always be |
| | | * released using the {@code close()} method. |
| | |
| | | private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>(); |
| | | |
| | | /** |
| | | * A log file is rotated once it has exceeded this size limit. The log file can have |
| | | * A log file can be rotated once it has exceeded this size limit. The log file can have |
| | | * a size much larger than this limit if the last record written has a huge size. |
| | | * |
| | | * TODO : to be replaced later by a (or a list of) configurable Rotation policy |
| | | * TODO : to be replaced later by a list of configurable Rotation policy |
| | | * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>(); |
| | | */ |
| | | private final long sizeLimitPerLogFileInBytes; |
| | | |
| | | /** The time service used for timing. It is package private so it can be modified by test case. */ |
| | | TimeService timeService = TimeService.SYSTEM; |
| | | |
| | | /** A log file can be rotated once it has exceeded a given time interval. No rotation happens if the interval is equal to zero. */ |
| | | private long rotationIntervalInMillis; |
| | | |
| | | /** The last time a log file was rotated. */ |
| | | private long lastRotationTime; |
| | | |
| | | /** |
| | | * The exclusive lock used for writes and lifecycle operations on this log: |
| | | * initialize, clear, sync and close. |
| | |
| | | private final Lock sharedLock; |
| | | |
| | | /** |
| | | * The replication environment used to create this log. The log is notifying it for any change |
| | | * that must be persisted. |
| | | */ |
| | | private final ReplicationEnvironment replicationEnv; |
| | | |
| | | /** |
| | | * Open a log with the provided log path, record parser and maximum size per |
| | | * log file. |
| | | * <p> |
| | |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param replicationEnv |
| | | * The replication environment used to create this log. |
| | | * @param logPath |
| | | * Path of the log. |
| | | * @param parser |
| | | * Parser for encoding/decoding of records. |
| | | * @param sizeLimitPerFileInBytes |
| | | * Limit in bytes before rotating the head log file of the log. |
| | | * @param rotationParameters |
| | | * Parameters for the log files rotation. |
| | | * @return a log |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath, |
| | | final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException |
| | | static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final ReplicationEnvironment replicationEnv, |
| | | final File logPath, final RecordParser<K, V> parser, final LogRotationParameters rotationParameters) |
| | | throws ChangelogException |
| | | { |
| | | Reject.ifNull(logPath, parser); |
| | | @SuppressWarnings("unchecked") |
| | | Log<K, V> log = (Log<K, V>) logsCache.get(logPath); |
| | | if (log == null) |
| | | { |
| | | log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes); |
| | | log = new Log<K, V>(replicationEnv, logPath, parser, rotationParameters); |
| | | logsCache.put(logPath, log); |
| | | } |
| | | else |
| | |
| | | return log; |
| | | } |
| | | |
| | | /** Holds the parameters for log files rotation. */ |
| | | /** |
| | | * Immutable holder grouping the settings that drive log file rotation: |
| | | * a size limit, a time interval and the time of the last rotation. |
| | | */ |
| | | static class LogRotationParameters { |
| | | |
| | | /** Size limit before rotating a log file. */ |
| | | private final long sizeLimitPerFileInBytes; |
| | | /** Time interval before rotating a log file. */ |
| | | private final long rotationInterval; |
| | | /** Last time a log file was rotated. */ |
| | | private final long lastRotationTime; |
| | | |
| | | /** |
| | | * Builds the rotation parameters from the three provided values. |
| | | * |
| | | * @param sizeLimitPerFileInBytes |
| | | * Size limit before rotating a log file. |
| | | * @param rotationInterval |
| | | * Time interval before rotating a log file. |
| | | * @param lastRotationTime |
| | | * Last time a log file was rotated. |
| | | */ |
| | | LogRotationParameters(final long sizeLimitPerFileInBytes, final long rotationInterval, |
| | | final long lastRotationTime) |
| | | { |
| | | this.lastRotationTime = lastRotationTime; |
| | | this.rotationInterval = rotationInterval; |
| | | this.sizeLimitPerFileInBytes = sizeLimitPerFileInBytes; |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Set the time interval for rotation of log file. |
| | | * |
| | | * @param rotationIntervalInMillis |
| | | * time interval before rotation of log file |
| | | */ |
| | | void setRotationInterval(final long rotationIntervalInMillis) |
| | | { |
| | | // Value consulted by mustRotate(): time-based rotation only applies when it is greater than zero |
| | | this.rotationIntervalInMillis = rotationIntervalInMillis; |
| | | } |
| | | |
| | | /** |
| | | * Release a reference to the log corresponding to provided path. The log is |
| | | * closed if this is the last reference. |
| | |
| | | /** |
| | | * Creates a new log. |
| | | * |
| | | * @param replicationEnv |
| | | * The replication environment used to create this log. |
| | | * @param logPath |
| | | * The directory path of the log. |
| | | * @param parser |
| | | * Parser of records. |
| | | * @param sizeLimitPerFile |
| | | * Limit in bytes before rotating a log file. |
| | | * @param rotationParams |
| | | * Parameters for log-file rotation. |
| | | * |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile) |
| | | throws ChangelogException |
| | | private Log(final ReplicationEnvironment replicationEnv, final File logPath, final RecordParser<K, V> parser, |
| | | final LogRotationParameters rotationParams) throws ChangelogException |
| | | { |
| | | this.replicationEnv = replicationEnv; |
| | | this.logPath = logPath; |
| | | this.recordParser = parser; |
| | | this.sizeLimitPerLogFileInBytes = sizeLimitPerFile; |
| | | this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes; |
| | | this.rotationIntervalInMillis = rotationParams.rotationInterval; |
| | | this.lastRotationTime = rotationParams.lastRotationTime; |
| | | this.referenceCount = 1; |
| | | |
| | | final ReadWriteLock lock = new ReentrantReadWriteLock(false); |
| | |
| | | return; |
| | | } |
| | | LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes) |
| | | if (mustRotate(headLogFile)) |
| | | { |
| | | logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes())); |
| | | logger.info(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes())); |
| | | |
| | | rotateHeadLogFile(); |
| | | headLogFile = getHeadLogFile(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Decides whether the head log file has to be rotated, either because it |
| | | * exceeds the size limit or because the rotation time interval has elapsed. |
| | | * |
| | | * @param headLogFile |
| | | * the current head log file |
| | | * @return {@code true} if the head log file must be rotated |
| | | */ |
| | | private boolean mustRotate(LogFile<K, V> headLogFile) |
| | | { |
| | | final boolean hasAppendedRecord = lastAppendedKey != null; |
| | | if (!hasAppendedRecord) |
| | | { |
| | | // an empty file is never rotated |
| | | return false; |
| | | } |
| | | final boolean sizeLimitExceeded = headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes; |
| | | if (sizeLimitExceeded) |
| | | { |
| | | // size threshold takes precedence over the time criterion |
| | | return true; |
| | | } |
| | | final boolean timeBasedRotationEnabled = rotationIntervalInMillis > 0; |
| | | if (!timeBasedRotationEnabled) |
| | | { |
| | | return false; |
| | | } |
| | | // rotate only once more time than the interval has passed since the last rotation |
| | | return timeService.since(lastRotationTime) > rotationIntervalInMillis; |
| | | } |
| | | |
| | | /** |
| | | * Indicates if the provided record has a key that would break the key |
| | | * ordering in the log. |
| | |
| | | releaseLog(logPath); |
| | | } |
| | | |
| | | /** |
| | | * Find the highest key that corresponds to a record that is the oldest (or |
| | | * first) of a read-only log file and where value mapped from the record is |
| | | * lower or equals to provided limit value. |
| | | * <p> |
| | | * Example<br> |
| | | * Given a log with 3 log files, with Record<Int, String> and Mapper<String, |
| | | * Long> mapping a string to its long value |
| | | * <ul> |
| | | * <li>1_10.log where oldest record is (key=1, value="50")</li> |
| | | * <li>11_20.log where oldest record is (key=11, value="150")</li> |
| | | * <li>head.log where oldest record is (key=25, value="250")</li> |
| | | * </ul> |
| | | * Then |
| | | * <ul> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 20) => null</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 50) => 1</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 100) => 1</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 150) => 11</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 200) => 11</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 250) => 25</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 300) => 25</li> |
| | | * </ul> |
| | | * |
| | | * @param <V2> |
| | | * Type of the value extracted from the record |
| | | * @param mapper |
| | | * The mapper to extract a value from a record. It is expected that |
| | | * extracted values are ordered according to an order consistent with |
| | | * this log ordering, i.e. for two records, if key(R1) > key(R2) then |
| | | * extractedValue(R1) > extractedValue(R2). |
| | | * @param limitValue |
| | | * The limit value to search for |
| | | * @return the key or {@code null} if no key corresponds |
| | | * @throws ChangelogException |
| | | * If a problem occurs |
| | | */ |
| | | <V2 extends Comparable<V2>> K findBoundaryKeyFromRecord(Record.Mapper<V, V2> mapper, V2 limitValue) |
| | | throws ChangelogException |
| | | { |
| | | // Key of the oldest record of the last visited log file whose mapped value is within the limit |
| | | K key = null; |
| | | for (LogFile<K, V> logFile : logFiles.values()) |
| | | { |
| | | final Record<K, V> record = logFile.getOldestRecord(); |
| | | if (record == null) |
| | | { |
| | | // NOTE(review): assumes getOldestRecord() returns null for a log file holding no record |
| | | // (e.g. a head log file right after a rotation) - confirm. Such a file offers no boundary. |
| | | continue; |
| | | } |
| | | final V2 oldestValue = mapper.map(record.getValue()); |
| | | if (oldestValue.compareTo(limitValue) > 0) |
| | | { |
| | | // This file starts after the limit: the boundary is the one found so far (possibly null) |
| | | return key; |
| | | } |
| | | key = record.getKey(); |
| | | } |
| | | return key; |
| | | } |
| | | |
| | | /** Effectively close this log. */ |
| | | private void doClose() |
| | | { |
| | |
| | | |
| | | // Re-enable cursors previously opened on head, with the saved state |
| | | updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead); |
| | | |
| | | // Notify even if time-based rotation is not enabled, as it could be enabled at any time |
| | | replicationEnv.notifyLogFileRotation(this); |
| | | } |
| | | |
| | | private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException |
| | |
| | | */ |
| | | class Record<K, V> |
| | | { |
| | | /** Map the record value to another value. */ |
| | | /** |
| | | * Converts the value of a record into a value of another type. |
| | | * |
| | | * @param <V> |
| | | * type of the record value |
| | | * @param <V2> |
| | | * type of the value produced by the mapping |
| | | */ |
| | | interface Mapper<V, V2> { |
| | | /** |
| | | * Map a record value to another value. |
| | | * |
| | | * @param value |
| | | * The value to map |
| | | * @return the new value |
| | | */ |
| | | V2 map(V value); |
| | | } |
| | | |
| | | private final K key; |
| | | private final V value; |
| | | |
| | |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.util.time.TimeService; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.util.StaticUtils; |
| | |
| | | * created : |
| | | * <ul> |
| | | * <li>A "changenumberindex" directory containing the log files for |
| | | * ChangeNumberIndexDB</li> |
| | | * ChangeNumberIndexDB, and a file named "rotationtime[millis].ms" where [millis] is |
| | | * the time of the last log file rotation in milliseconds</li> |
| | | * <li>A "domains.state" file containing a mapping of each domain DN to an id. The |
| | | * id is used to name the corresponding domain directory.</li> |
| | | * <li>One directory per domain, named after "[id].domain" where [id] is the id |
| | |
| | | * <p> |
| | | * Each domain directory contains the following directories and files : |
| | | * <ul> |
| | | * <li>A "generation_[id].id" file, where [id] is the generation id</li> |
| | | * <li>A "generation[id].id" file, where [id] is the generation id</li> |
| | | * <li>One directory per server id, named after "[id].server" where [id] is the |
| | | * id of the server.</li> |
| | | * </ul> |
| | |
| | | * | \---changenumberindex |
| | | * | \--- head.log [contains last records written] |
| | | * | \--- 1_50.log [contains records with keys in interval [1, 50]] |
| | | * | \--- rotationtime198745512.ms |
| | | * | \---1.domain |
| | | * | \---generation1.id |
| | | * | \---22.server |
| | |
| | | { |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024*1024; |
| | | private static final long CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES = 1024 * 1024; |
| | | |
| | | private static final long REPLICA_DB_MAX_LOG_FILE_SIZE_IN_BYTES = 10 * CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES; |
| | | |
| | | private static final int NO_GENERATION_ID = -1; |
| | | |
| | |
| | | |
| | | static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state"; |
| | | |
| | | static final String LAST_ROTATION_TIME_FILE_PREFIX = "rotationtime"; |
| | | |
| | | static final String LAST_ROTATION_TIME_FILE_SUFFIX = ".ms"; |
| | | |
| | | private static final String DOMAIN_STATE_SEPARATOR = ":"; |
| | | |
| | | private static final String DOMAIN_SUFFIX = ".domain"; |
| | | private static final String DOMAIN_SUFFIX = ".dom"; |
| | | |
| | | private static final String SERVER_ID_SUFFIX = ".server"; |
| | | |
| | |
| | | } |
| | | }; |
| | | |
| | | /** Accepts regular files whose name carries the last rotation time prefix and suffix. */ |
| | | private static final FileFilter LAST_ROTATION_TIME_FILE_FILTER = new FileFilter() |
| | | { |
| | | @Override |
| | | public boolean accept(File file) |
| | | { |
| | | if (!file.isFile()) |
| | | { |
| | | return false; |
| | | } |
| | | final String name = file.getName(); |
| | | return name.startsWith(LAST_ROTATION_TIME_FILE_PREFIX) |
| | | && name.endsWith(LAST_ROTATION_TIME_FILE_SUFFIX); |
| | | } |
| | | }; |
| | | |
| | | /** Root path where the replication log is stored. */ |
| | | private final String replicationRootPath; |
| | | /** |
| | |
| | | */ |
| | | private final ChangelogState changelogState; |
| | | |
| | | /** The list of logs that are in use. */ |
| | | private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>(); |
| | | /** The list of logs that are in use for Replica DBs. */ |
| | | private final List<Log<CSN, UpdateMsg>> logsReplicaDB = new CopyOnWriteArrayList<Log<CSN, UpdateMsg>>(); |
| | | |
| | | /** |
| | | * The list of logs that are in use for the CN Index DB. |
| | | * There is a single CN Index DB for a ReplicationServer, but there can be multiple references opened on it. |
| | | * This is the responsability of Log class to handle properly these multiple references. |
| | | */ |
| | | private List<Log<Long, ChangeNumberIndexRecord>> logsCNIndexDB = |
| | | new CopyOnWriteArrayList<Log<Long, ChangeNumberIndexRecord>>();; |
| | | |
| | | /** |
| | | * Maps each domain DN to a domain id that is used to name directory in file system. |
| | |
| | | |
| | | private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); |
| | | |
| | | /** The time service used for timing. */ |
| | | private final TimeService timeService; |
| | | |
| | | /** |
| | | * For CN Index DB, a log file can be rotated once it has exceeded a given time interval. |
| | | * <p> |
| | | * It is disabled if the interval is equal to zero. |
| | | * The interval can be modified at any time. |
| | | */ |
| | | private long cnIndexDBRotationInterval; |
| | | |
| | | /** |
| | | * For CN Index DB, the last time a log file was rotated. |
| | | * It is persisted to file each time it changes and read at server start. */ |
| | | private long cnIndexDBLastRotationTime; |
| | | |
| | | /** |
| | | * Creates the replication environment. |
| | | * |
| | |
| | | * Root path where replication log is stored. |
| | | * @param replicationServer |
| | | * The underlying replication server. |
| | | * @param timeService |
| | | * Time service to use for timing. |
| | | * @throws ChangelogException |
| | | * If an error occurs during initialization. |
| | | */ |
| | | ReplicationEnvironment(final String rootPath, |
| | | final ReplicationServer replicationServer) throws ChangelogException |
| | | final ReplicationServer replicationServer, final TimeService timeService) throws ChangelogException |
| | | { |
| | | this.replicationRootPath = rootPath; |
| | | this.replicationServer = replicationServer; |
| | | this.timeService = timeService; |
| | | this.changelogState = readOnDiskChangelogState(); |
| | | this.cnIndexDBLastRotationTime = readOnDiskLastRotationTime(); |
| | | } |
| | | |
| | | /** |
| | | * Sets the rotation time interval of a log file for the CN Index DB. |
| | | * |
| | | * @param timeInterval |
| | | * time interval for rotation of a log file. |
| | | */ |
| | | void setCNIndexDBRotationInterval(long timeInterval) |
| | | { |
| | | // Keep the value: it is handed to CN Index DB logs when they are opened |
| | | this.cnIndexDBRotationInterval = timeInterval; |
| | | // Push the new interval to every CN Index DB log currently registered |
| | | for (final Log<Long, ChangeNumberIndexRecord> cnIndexLog : logsCNIndexDB) |
| | | { |
| | | cnIndexLog.setRotationInterval(this.cnIndexDBRotationInterval); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return the last rotation time for CN Index DB log files. |
| | | * |
| | | * @return the last rotation time in millis |
| | | */ |
| | | long getCnIndexDBLastRotationTime() |
| | | { |
| | | // In-memory copy of the value read from disk at construction and refreshed on each rotation |
| | | return this.cnIndexDBLastRotationTime; |
| | | } |
| | | |
| | | /** |
| | | * Finds or creates the log used to store changes from the replication server |
| | | * with the given serverId and the given baseDN. |
| | | * |
| | |
| | | ensureGenerationIdFileExists(generationIdPath); |
| | | changelogState.setDomainGenerationId(domainDN, generationId); |
| | | |
| | | return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER); |
| | | return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER, |
| | | new LogRotationParameters(REPLICA_DB_MAX_LOG_FILE_SIZE_IN_BYTES, 0, 0), logsReplicaDB); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | |
| | | final File path = getCNIndexDBPath(); |
| | | try |
| | | { |
| | | return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER); |
| | | final LogRotationParameters rotationParams = new LogRotationParameters(CN_INDEX_DB_MAX_LOG_FILE_SIZE_IN_BYTES, |
| | | cnIndexDBRotationInterval, cnIndexDBLastRotationTime); |
| | | return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER, rotationParams, logsCNIndexDB); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | { |
| | | if (isShuttingDown.compareAndSet(false, true)) |
| | | { |
| | | logs.clear(); |
| | | logsReplicaDB.clear(); |
| | | logsCNIndexDB.clear(); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Notify that log file has been rotated for provided log. |
| | | * |
| | | * The last rotation time is persisted to a file and read at startup time. |
| | | * |
| | | * @param log |
| | | * the log that has a file rotated. |
| | | * @throws ChangelogException |
| | | * If a problem occurs |
| | | */ |
| | | void notifyLogFileRotation(Log<?, ?> log) throws ChangelogException |
| | | { |
| | | // only CN Index DB log rotation time is persisted |
| | | if (logsCNIndexDB.contains(log)) |
| | | { |
| | | updateCNIndexDBLastRotationTime(timeService.now()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Notify that the replica corresponding to provided domain and provided CSN |
| | | * is offline. |
| | | * |
| | |
| | | } |
| | | |
| | | /** Open a log from the provided path and record parser. */ |
| | | private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser) |
| | | throws ChangelogException |
| | | private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser, |
| | | LogRotationParameters rotationParams, List<Log<K, V>> logsCache) throws ChangelogException |
| | | { |
| | | checkShutDownBeforeOpening(serverIdPath); |
| | | |
| | | final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES); |
| | | final Log<K, V> log = Log.openLog(this, serverIdPath, parser, rotationParams); |
| | | |
| | | checkShutDownAfterOpening(serverIdPath, log); |
| | | |
| | | logs.add(log); |
| | | logsCache.add(log); |
| | | |
| | | return log; |
| | | } |
| | | |
| | |
| | | return (generationIds != null && generationIds.length > 0) ? generationIds[0] : null; |
| | | } |
| | | |
| | | /** |
| | | * Retrieve the last rotation time from the disk. |
| | | * |
| | | * @return the last rotation time in millis (which is the current time if no |
| | | * rotation file is found or if a problem occurs). |
| | | */ |
| | | private long readOnDiskLastRotationTime() |
| | | { |
| | | try |
| | | { |
| | | final File rotationTimeFile = retrieveLastRotationTimeFile(); |
| | | if (rotationTimeFile != null) |
| | | { |
| | | // The time is encoded in the file name, between the fixed prefix and suffix |
| | | final String name = rotationTimeFile.getName(); |
| | | final int start = LAST_ROTATION_TIME_FILE_PREFIX.length(); |
| | | final int end = name.length() - LAST_ROTATION_TIME_FILE_SUFFIX.length(); |
| | | return Long.parseLong(name.substring(start, end)); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Best effort: any failure falls back to the current time below |
| | | logger.trace(LocalizableMessage.raw("Error when retrieving last log file rotation time from file"), e); |
| | | } |
| | | // No usable file: use the current time |
| | | return timeService.now(); |
| | | } |
| | | |
| | | /** |
| | | * Retrieve the file named after the last rotation time from the CN Index DB |
| | | * directory. |
| | | * |
| | | * @return the last rotation time file or {@code null} if the corresponding file |
| | | * can't be found |
| | | */ |
| | | private File retrieveLastRotationTimeFile() |
| | | { |
| | | File[] files = getCNIndexDBPath().listFiles(LAST_ROTATION_TIME_FILE_FILTER); |
| | | return (files != null && files.length > 0) ? files[0] : null; |
| | | } |
| | | |
| | | private File getDomainPath(final String domainId) |
| | | { |
| | | return new File(replicationRootPath, domainId + DOMAIN_SUFFIX); |
| | |
| | | return new File(replicationRootPath, CN_INDEX_DB_DIRNAME); |
| | | } |
| | | |
| | | private File getLastRotationTimePath(long lastRotationTime) |
| | | { |
| | | return new File(getCNIndexDBPath(), |
| | | LAST_ROTATION_TIME_FILE_PREFIX + lastRotationTime + LAST_ROTATION_TIME_FILE_SUFFIX); |
| | | } |
| | | |
| | | private void closeLog(final Log<?, ?> log) |
| | | { |
| | | logs.remove(log); |
| | | logsReplicaDB.remove(log); |
| | | logsCNIndexDB.remove(log); |
| | | log.close(); |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Persists the provided last rotation time of the CN Index DB by creating a |
| | | * file named after it, removes the file holding the previous value and |
| | | * updates the in-memory value. |
| | | * |
| | | * @param lastRotationTime |
| | | * the new last rotation time |
| | | * @throws ChangelogException |
| | | * if the new file can't be created or the previous file can't be deleted |
| | | */ |
| | | private void updateCNIndexDBLastRotationTime(final long lastRotationTime) throws ChangelogException { |
| | | final File previousRotationFile = retrieveLastRotationTimeFile(); |
| | | final File newRotationFile = getLastRotationTimePath(lastRotationTime); |
| | | try |
| | | { |
| | | // Returns false when the file already exists, which is fine: the time is already persisted |
| | | newRotationFile.createNewFile(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LAST_LOG_ROTATION_TIME_FILE.get( |
| | | newRotationFile.getPath(), lastRotationTime), e); |
| | | } |
| | | // When the rotation time is unchanged, the previous file IS the new file: |
| | | // deleting it would leave no rotation time persisted on disk |
| | | if (previousRotationFile != null && !previousRotationFile.equals(newRotationFile)) |
| | | { |
| | | final boolean isDeleted = previousRotationFile.delete(); |
| | | if (!isDeleted) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LAST_LOG_ROTATION_TIME_FILE.get( |
| | | previousRotationFile.getPath())); |
| | | } |
| | | } |
| | | cnIndexDBLastRotationTime = lastRotationTime; |
| | | } |
| | | |
| | | private void ensureGenerationIdFileExists(final File generationIdPath) |
| | | throws ChangelogException |
| | | { |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * This test makes basic operations of a ChangeNumberIndexDB: |
| | | * <ol> |
| | | * <li>create the db</li> |
| | | * <li>add records</li> |
| | | * <li>read them with a cursor</li> |
| | | * <li>set a very short trim period</li> |
| | | * <li>wait for the db to be trimmed / here since the changes are not stored |
| | | * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li> |
| | | * </ol> |
| | | * This test verifies purge is working by relying on a very short purge delay. |
| | | * The purge can be done only if there is at least one read-only log file, so |
| | | * this test ensures the rotation happens, using the rotation based on time. |
| | | */ |
| | | // TODO : this works only if we ensure that there is a rotation of ahead log file |
| | | // at the right place. First two records are 37 and 76 bytes long, |
| | | // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file |
| | | // Re-enable this test when max file size is customizable for log |
| | | @Test(enabled=false) |
| | | @Test |
| | | public void testPurge() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | |
| | | final ChangelogDB changelogDB = replicationServer.getChangelogDB(); |
| | | changelogDB.setPurgeDelay(0); // disable purging |
| | | |
| | | // Prepare data to be stored in the db |
| | | DN baseDN1 = DN.valueOf("o=test1"); |
| | | DN baseDN2 = DN.valueOf("o=test2"); |
| | | DN baseDN3 = DN.valueOf("o=test3"); |
| | | |
| | | CSN[] csns = generateCSNs(1, 0, 3); |
| | | |
| | | // Add records |
| | | DN[] baseDNs = { DN.valueOf("o=test1"), DN.valueOf("o=test2"), DN.valueOf("o=test3"), DN.valueOf("o=test4") }; |
| | | CSN[] csns = generateCSNs(1, 0, 4); |
| | | final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer); |
| | | long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]); |
| | | addRecord(cnIndexDB, baseDN2, csns[1]); |
| | | long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]); |
| | | long cn0 = addRecord(cnIndexDB, baseDNs[0], csns[0]); |
| | | addRecord(cnIndexDB, baseDNs[1], csns[1]); |
| | | long cn2 = addRecord(cnIndexDB, baseDNs[2], csns[2]); |
| | | |
| | | // The ChangeNumber should not get purged |
| | | final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber(); |
| | | assertEquals(oldestCN, cn1); |
| | | assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3); |
| | | // The CN DB should not be purged at this point |
| | | assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn0); |
| | | assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn2); |
| | | |
| | | DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN); |
| | | try |
| | | { |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[0], baseDN1); |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[1], baseDN2); |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[2], baseDN3); |
| | | assertFalse(cursor.next()); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor); |
| | | } |
| | | // change the purge delay to a very short time |
| | | changelogDB.setPurgeDelay(5); |
| | | Thread.sleep(50); |
| | | // add a new record to force the rotation of the log |
| | | addRecord(cnIndexDB, baseDNs[3], csns[3]); |
| | | |
| | | // Now test that purging removes all changes but the last one |
| | | changelogDB.setPurgeDelay(1); |
| | | // Now all changes should have been purged but the last one |
| | | int count = 0; |
| | | while (cnIndexDB.count() > 1 && count < 100) |
| | | { |
| | | Thread.sleep(10); |
| | | count++; |
| | | } |
| | | assertOnlyNewestRecordIsLeft(cnIndexDB, 3); |
| | | assertOnlyNewestRecordIsLeft(cnIndexDB, 4); |
| | | } |
| | | finally |
| | | { |
| | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.util.time.TimeService; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | |
| | | testRoot = createCleanDir(); |
| | | dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer); |
| | | dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer, TimeService.SYSTEM); |
| | | replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv); |
| | | |
| | | // Populate the db with 'max' msg |
| | |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.mockito.Mockito.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.file.LogFileTest.*; |
| | |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters; |
| | | import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser; |
| | | import org.opends.server.replication.server.changelog.file.Record.Mapper; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.DataProvider; |
| | |
| | | // Use a directory dedicated to this test class |
| | | private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit"); |
| | | |
| | | private static final long NO_TIME_BASED_LOG_ROTATION = 0; |
| | | |
| | | @BeforeMethod |
| | | public void initialize() throws Exception |
| | | { |
| | |
| | | // This ensures that the rotation mechanism is thoroughly tested
| | | // Some tests rely on having 2 records per log file (especially the purge tests), so take care |
| | | // if this value has to be changed |
| | | int sizeLimitPerFileInBytes = 30; |
| | | final int sizeLimitPerFileInBytes = 30; |
| | | final LogRotationParameters rotationParams = new LogRotationParameters(sizeLimitPerFileInBytes, |
| | | NO_TIME_BASED_LOG_ROTATION, NO_TIME_BASED_LOG_ROTATION); |
| | | final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class); |
| | | |
| | | return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes); |
| | | return Log.openLog(replicationEnv, LOG_DIRECTORY, parser, rotationParams); |
| | | } |
| | | |
| | | @Test |
| | |
| | | Log<String, String> writeLog = null; |
| | | try |
| | | { |
| | | long sizeOf1MB = 1024*1024; |
| | | writeLog = Log.openLog(LOG_DIRECTORY, LogFileTest.RECORD_PARSER, sizeOf1MB); |
| | | long sizeOf10MB = 10*1024*1024; |
| | | final LogRotationParameters rotationParams = new LogRotationParameters(sizeOf10MB, NO_TIME_BASED_LOG_ROTATION, |
| | | NO_TIME_BASED_LOG_ROTATION); |
| | | final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class); |
| | | writeLog = Log.openLog(replicationEnv, LOG_DIRECTORY, LogFileTest.RECORD_PARSER, rotationParams); |
| | | |
| | | for (int i = 1; i < 1000000; i++) |
| | | { |
| | |
| | | public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge, |
| | | int cursorStartIndex, int cursorEndIndex) throws Exception |
| | | { |
| | | Log<String, String> log = null; |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try |
| | | { |
| | | log = openLog(LogFileTest.RECORD_PARSER); |
| | | |
| | | log.purgeUpTo(purgeKey); |
| | | |
| | | cursor = log.getCursor(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** Maps a record value of the form "valueN" to the integer N that follows the "value" prefix. */
| | | static final Mapper<String, Integer> MAPPER = new Mapper<String, Integer>()
| | | {
| | |   @Override
| | |   public Integer map(String value)
| | |   {
| | |     // drop the leading "value" prefix and parse the rest, e.g. "value10" gives 10
| | |     final String numericSuffix = value.substring("value".length());
| | |     return Integer.valueOf(numericSuffix);
| | |   }
| | | };
| | | |
| | | @DataProvider |
| | | Object[][] findBoundaryKeyData() |
| | | { |
| | | return new Object[][] { |
| | | // limit value, expected key |
| | | { 0, null }, |
| | | { 1, "key001" }, |
| | | { 2, "key001" }, |
| | | { 3, "key003" }, |
| | | { 4, "key003" }, |
| | | { 5, "key005" }, |
| | | { 6, "key005" }, |
| | | { 7, "key007" }, |
| | | { 8, "key007" }, |
| | | { 9, "key009" }, |
| | | { 10, "key009" }, |
| | | { 11, "key009" }, |
| | | { 12, "key009" }, |
| | | }; |
| | | } |
| | | |
| | | /**
| | |  * Checks that the boundary key computed by the log for the provided limit value, using
| | |  * {@code MAPPER} to extract the compared value from each record, is the expected one.
| | |  *
| | |  * @param limitValue  the limit value passed to the boundary key search
| | |  * @param expectedKey the key expected as a result, possibly {@code null}
| | |  */
| | | @Test(dataProvider = "findBoundaryKeyData")
| | | public void testFindBoundaryKeyFromRecord(int limitValue, String expectedKey) throws Exception
| | | {
| | |   final Log<String, String> testedLog = openLog(LogFileTest.RECORD_PARSER);
| | |   try
| | |   {
| | |     final String boundaryKey = testedLog.findBoundaryKeyFromRecord(MAPPER, limitValue);
| | |     assertThat(boundaryKey).isEqualTo(expectedKey);
| | |   }
| | |   finally
| | |   {
| | |     // always release the log, even when the assertion fails
| | |     StaticUtils.close(testedLog);
| | |   }
| | | }
| | | |
| | | private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex) |
| | | throws Exception |
| | | { |
| | |
| | | import java.util.List; |
| | | |
| | | import org.assertj.core.data.MapEntry; |
| | | import org.forgerock.util.time.TimeService; |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DN; |
| | |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.mockito.Mockito.*; |
| | | import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | |
| | | { |
| | | final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING); |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | ReplicationEnvironment environment = createReplicationEnv(rootPath); |
| | | cnDB = environment.getOrCreateCNIndexDB(); |
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1); |
| | |
| | | { |
| | | File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | List<DN> domainDNs = Arrays.asList(DN.valueOf(DN1_AS_STRING), DN.valueOf(DN2_AS_STRING), DN.valueOf(DN3_AS_STRING)); |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | ReplicationEnvironment environment = createReplicationEnv(rootPath); |
| | | cnDB = environment.getOrCreateCNIndexDB(); |
| | | for (int i = 0; i <= 2 ; i++) |
| | | { |
| | |
| | | { |
| | | final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING); |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | ReplicationEnvironment environment = createReplicationEnv(rootPath); |
| | | cnDB = environment.getOrCreateCNIndexDB(); |
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | |
| | |
| | | { |
| | | final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING); |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | ReplicationEnvironment environment = createReplicationEnv(rootPath); |
| | | cnDB = environment.getOrCreateCNIndexDB(); |
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | |
| | |
| | | final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING); |
| | | |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | ReplicationEnvironment environment = createReplicationEnv(rootPath); |
| | | cnDB = environment.getOrCreateCNIndexDB(); |
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | |
| | |
| | | final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING); |
| | | |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | ReplicationEnvironment environment = createReplicationEnv(rootPath); |
| | | cnDB = environment.getOrCreateCNIndexDB(); |
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | |
| | |
| | | final File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING); |
| | | |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | ReplicationEnvironment environment = createReplicationEnv(rootPath); |
| | | cnDB = environment.getOrCreateCNIndexDB(); |
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1); |
| | |
| | | { |
| | | File rootPath = new File(TEST_DIRECTORY_CHANGELOG); |
| | | DN domainDN = DN.valueOf(DN1_AS_STRING); |
| | | ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null); |
| | | ReplicationEnvironment environment = createReplicationEnv(rootPath); |
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1); |
| | | replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1); |
| | | |
| | | // delete the domain directory created for the 2 replica DBs to break the |
| | | // consistency with domain state file |
| | | StaticUtils.recursiveDelete(new File(rootPath, "1.domain")); |
| | | StaticUtils.recursiveDelete(new File(rootPath, "1.dom")); |
| | | |
| | | environment.readOnDiskChangelogState(); |
| | | } |
| | |
| | | StaticUtils.close(cnDB, replicaDB, replicaDB2); |
| | | } |
| | | } |
| | | |
| | | private ReplicationEnvironment createReplicationEnv(File rootPath) throws ChangelogException |
| | | { |
| | | ReplicationServer unusedReplicationServer = null; |
| | | return new ReplicationEnvironment(rootPath.getAbsolutePath(), unusedReplicationServer, TimeService.SYSTEM); |
| | | } |
| | | |
| | | /**
| | |  * Checks that a newly created environment reports, as last rotation time of the CN index DB,
| | |  * the value given by the time service (stubbed to 100 here).
| | |  * NOTE(review): as the test name indicates, this presumably relies on no last rotation time
| | |  * file being present in the test directory - confirm against the class setup/cleanup.
| | |  */
| | | @Test
| | | public void testLastRotationTimeRetrievalWithNoRotationFile() throws Exception
| | | {
| | |   final String changelogPath = new File(TEST_DIRECTORY_CHANGELOG).getAbsolutePath();
| | |   final TimeService fixedClock = mock(TimeService.class);
| | |   when(fixedClock.now()).thenReturn(100L);
| | |
| | |   final ReplicationEnvironment environment = new ReplicationEnvironment(changelogPath, null, fixedClock);
| | |
| | |   assertThat(environment.getCnIndexDBLastRotationTime()).isEqualTo(100L);
| | | }
| | | |
| | | /**
| | |  * Checks that the last rotation time of the CN index DB is updated when a log file rotation
| | |  * is notified to the environment, and that the same value is reported by a new environment
| | |  * created afterwards on the same root directory.
| | |  */
| | | @Test
| | | public void testLastRotationTimeRetrievalWithRotationFile() throws Exception
| | | {
| | |   final String changelogPath = new File(TEST_DIRECTORY_CHANGELOG).getAbsolutePath();
| | |   final TimeService clock = mock(TimeService.class);
| | |   // successive calls to now() yield 100, then 200
| | |   when(clock.now()).thenReturn(100L, 200L);
| | |   final ReplicationEnvironment firstEnvironment = new ReplicationEnvironment(changelogPath, null, clock);
| | |   final Log<Long, ChangeNumberIndexRecord> cnIndexDB = firstEnvironment.getOrCreateCNIndexDB();
| | |
| | |   try
| | |   {
| | |     firstEnvironment.notifyLogFileRotation(cnIndexDB);
| | |
| | |     // the change of last rotation time must be visible at runtime;
| | |     // it should also be persisted in a file, which is verified further down
| | |     assertThat(firstEnvironment.getCnIndexDBLastRotationTime()).isEqualTo(200L);
| | |   }
| | |   finally
| | |   {
| | |     cnIndexDB.close();
| | |     firstEnvironment.shutdown();
| | |   }
| | |
| | |   // re-create the environment with the clock now returning 0, so that the expected 200
| | |   // is meant to come from the persisted file rather than from the clock
| | |   when(clock.now()).thenReturn(0L);
| | |   final ReplicationEnvironment reloadedEnvironment = new ReplicationEnvironment(changelogPath, null, clock);
| | |   assertThat(reloadedEnvironment.getCnIndexDBLastRotationTime()).isEqualTo(200L);
| | | }
| | | |
| | | } |