| | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.util.Reject; |
| | | import org.forgerock.util.Utils; |
| | | import org.forgerock.util.time.TimeService; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | |
| | | * the string representation of lowest and highest key present in the log file.</li> |
| | | * </ul> |
| | | * A read-only log file is created each time the head log file has reached the |
| | | * maximum size limit or the time limit. The head log file is then rotated to the |
| | | * read-only file and a new empty head log file is opened. There is no limit on the |
| | | * number of read-only files, but they can be purged. |
| | | * <p> |
| | | * A log is obtained using the {@code Log.openLog()} method and must always be |
| | | * released using the {@code close()} method. |
| | |
| | | private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>(); |
| | | |
| | | /** |
| | | * A log file can be rotated once it has exceeded this size limit. The log file can have |
| | | * a size much larger than this limit if the last record written has a huge size. |
| | | * |
| | | * TODO : to be replaced later by a list of configurable rotation policies, |
| | | * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>(); |
| | | */ |
| | | private final long sizeLimitPerLogFileInBytes; |
| | | |
| | | /** The time service used for timing. It is package private so it can be modified by test case. */ |
| | | TimeService timeService = TimeService.SYSTEM; |
| | | |
| | | /** A log file can be rotated once it has exceeded a given time interval. No rotation happens if equals to zero. */ |
| | | private long rotationIntervalInMillis; |
| | | |
| | | /** The last time a log file was rotated. */ |
| | | private long lastRotationTime; |
| | | |
| | | /** |
| | | * The exclusive lock used for writes and lifecycle operations on this log: |
| | | * initialize, clear, sync and close. |
| | |
| | | private final Lock sharedLock; |
| | | |
| | | /** |
| | | * The replication environment used to create this log. The log is notifying it for any change |
| | | * that must be persisted. |
| | | */ |
| | | private final ReplicationEnvironment replicationEnv; |
| | | |
| | | /** |
| | | * Open a log with the provided log path, record parser and maximum size per |
| | | * log file. |
| | | * <p> |
| | |
| | | * Type of the key of a record, which must be comparable. |
| | | * @param <V> |
| | | * Type of the value of a record. |
| | | * @param replicationEnv |
| | | * The replication environment used to create this log. |
| | | * @param logPath |
| | | * Path of the log. |
| | | * @param parser |
| | | * Parser for encoding/decoding of records. |
| | | * @param sizeLimitPerFileInBytes |
| | | * Limit in bytes before rotating the head log file of the log. |
| | | * @param rotationParameters |
| | | * Parameters for the log files rotation. |
| | | * @return a log |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath, |
| | | final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException |
| | | static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final ReplicationEnvironment replicationEnv, |
| | | final File logPath, final RecordParser<K, V> parser, final LogRotationParameters rotationParameters) |
| | | throws ChangelogException |
| | | { |
| | | Reject.ifNull(logPath, parser); |
| | | @SuppressWarnings("unchecked") |
| | | Log<K, V> log = (Log<K, V>) logsCache.get(logPath); |
| | | if (log == null) |
| | | { |
| | | log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes); |
| | | log = new Log<K, V>(replicationEnv, logPath, parser, rotationParameters); |
| | | logsCache.put(logPath, log); |
| | | } |
| | | else |
| | |
| | | return log; |
| | | } |
| | | |
| | | /** Holds the parameters for log files rotation. */ |
| | | static class LogRotationParameters { |
| | | |
| | | private final long sizeLimitPerFileInBytes; |
| | | private final long rotationInterval; |
| | | private final long lastRotationTime; |
| | | |
| | | /** |
| | | * Creates rotation parameters. |
| | | * |
| | | * @param sizeLimitPerFileInBytes |
| | | * Size limit before rotating a log file. |
| | | * @param rotationInterval |
| | | * Time interval before rotating a log file. |
| | | * @param lastRotationTime |
| | | * Last time a log file was rotated. |
| | | */ |
| | | LogRotationParameters(long sizeLimitPerFileInBytes, long rotationInterval, long lastRotationTime) |
| | | { |
| | | this.sizeLimitPerFileInBytes = sizeLimitPerFileInBytes; |
| | | this.rotationInterval = rotationInterval; |
| | | this.lastRotationTime = lastRotationTime; |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Set the time interval for rotation of log file. |
| | | * |
| | | * @param rotationIntervalInMillis |
| | | * time interval before rotation of log file |
| | | */ |
| | | void setRotationInterval(long rotationIntervalInMillis) |
| | | { |
| | | this.rotationIntervalInMillis = rotationIntervalInMillis; |
| | | } |
| | | |
| | | /** |
| | | * Release a reference to the log corresponding to provided path. The log is |
| | | * closed if this is the last reference. |
| | |
| | | /** |
| | | * Creates a new log. |
| | | * |
| | | * @param replicationEnv |
| | | * The replication environment used to create this log. |
| | | * @param logPath |
| | | * The directory path of the log. |
| | | * @param parser |
| | | * Parser of records. |
| | | * @param sizeLimitPerFile |
| | | * Limit in bytes before rotating a log file. |
| | | * @param rotationParams |
| | | * Parameters for log-file rotation. |
| | | * |
| | | * @throws ChangelogException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile) |
| | | throws ChangelogException |
| | | private Log(final ReplicationEnvironment replicationEnv, final File logPath, final RecordParser<K, V> parser, |
| | | final LogRotationParameters rotationParams) throws ChangelogException |
| | | { |
| | | this.replicationEnv = replicationEnv; |
| | | this.logPath = logPath; |
| | | this.recordParser = parser; |
| | | this.sizeLimitPerLogFileInBytes = sizeLimitPerFile; |
| | | this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes; |
| | | this.rotationIntervalInMillis = rotationParams.rotationInterval; |
| | | this.lastRotationTime = rotationParams.lastRotationTime; |
| | | this.referenceCount = 1; |
| | | |
| | | final ReadWriteLock lock = new ReentrantReadWriteLock(false); |
| | |
| | | return; |
| | | } |
| | | LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes) |
| | | if (mustRotate(headLogFile)) |
| | | { |
| | | logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes())); |
| | | logger.info(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes())); |
| | | |
| | | rotateHeadLogFile(); |
| | | headLogFile = getHeadLogFile(); |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean mustRotate(LogFile<K, V> headLogFile) |
| | | { |
| | | if (lastAppendedKey == null) |
| | | { |
| | | // never rotate an empty file |
| | | return false; |
| | | } |
| | | if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes) |
| | | { |
| | | // rotate because file size exceeded threshold |
| | | return true; |
| | | } |
| | | if (rotationIntervalInMillis > 0) |
| | | { |
| | | // rotate if time limit is reached |
| | | final long timeElapsed = timeService.since(lastRotationTime); |
| | | return timeElapsed > rotationIntervalInMillis; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Indicates if the provided record has a key that would break the key |
| | | * ordering in the log. |
| | |
| | | releaseLog(logPath); |
| | | } |
| | | |
| | | /** |
| | | * Find the highest key that corresponds to a record that is the oldest (or |
| | | * first) of a read-only log file and where value mapped from the record is |
| | | * lower or equals to provided limit value. |
| | | * <p> |
| | | * Example<br> |
| | | * Given a log with 3 log files, with Record<Int, String> and Mapper<String, |
| | | * Long> mapping a string to its long value |
| | | * <ul> |
| | | * <li>1_10.log where oldest record is (key=1, value="50")</li> |
| | | * <li>11_20.log where oldest record is (key=11, value="150")</li> |
| | | * <li>head.log where oldest record is (key=25, value="250")</li> |
| | | * </ul> |
| | | * Then |
| | | * <ul> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 20) => null</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 50) => 1</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 100) => 1</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 150) => 11</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 200) => 11</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 250) => 25</li> |
| | | * <li>findBoundaryKeyFromRecord(mapper, 300) => 25</li> |
| | | * </ul> |
| | | * |
| | | * @param <V2> |
| | | * Type of the value extracted from the record |
| | | * @param mapper |
| | | * The mapper to extract a value from a record. It is expected that |
| | | * extracted values are ordered according to an order consistent with |
| | | * this log ordering, i.e. for two records, if key(R1) > key(R2) then |
| | | * extractedValue(R1) > extractedValue(R2). |
| | | * @param limitValue |
| | | * The limit value to search for |
| | | * @return the key or {@code null} if no key corresponds |
| | | * @throws ChangelogException |
| | | * If a problem occurs |
| | | */ |
| | | <V2 extends Comparable<V2>> K findBoundaryKeyFromRecord(Record.Mapper<V, V2> mapper, V2 limitValue) |
| | | throws ChangelogException |
| | | { |
| | | K key = null; |
| | | for (LogFile<K, V> logFile : logFiles.values()) |
| | | { |
| | | final Record<K, V> record = logFile.getOldestRecord(); |
| | | final V2 oldestValue = mapper.map(record.getValue()); |
| | | if (oldestValue.compareTo(limitValue) > 0) |
| | | { |
| | | return key; |
| | | } |
| | | key = record.getKey(); |
| | | } |
| | | return key; |
| | | } |
| | | |
| | | /** Effectively close this log. */ |
| | | private void doClose() |
| | | { |
| | |
| | | |
| | | // Re-enable cursors previously opened on head, with the saved state |
| | | updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead); |
| | | |
| | | // Notify even if time-based rotation is not enabled, as it could be enabled at any time |
| | | replicationEnv.notifyLogFileRotation(this); |
| | | } |
| | | |
| | | private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException |