mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
23.39.2014 0fbd274fb694c50ed0b599dfbda44560070b8f8a
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -51,6 +51,7 @@
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.forgerock.util.time.TimeService;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -74,9 +75,9 @@
 * the string representation of lowest and highest key present in the log file.</li>
 * </ul>
 * A read-only log file is created each time the head log file has reached the
 * maximum size limit. The head log file is then rotated to the read-only file and a
 * new empty head log file is opened. There is no limit on the number of read-only
 * files, but they can be purged.
 * maximum size limit or the time limit. The head log file is then rotated to the
 * read-only file and a new empty head log file is opened. There is no limit on the
 * number of read-only files, but they can be purged.
 * <p>
 * A log is obtained using the {@code Log.openLog()} method and must always be
 * released using the {@code close()} method.
@@ -170,14 +171,23 @@
  private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
  /**
   * A log file is rotated once it has exceeded this size limit. The log file can have
   * A log file can be rotated once it has exceeded this size limit. The log file can have
   * a size much larger than this limit if the last record written has a huge size.
   *
   * TODO : to be replaced later by a (or a list of) configurable Rotation policy
   * TODO : to be replaced later by a list of configurable Rotation policy
   * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
   */
  private final long sizeLimitPerLogFileInBytes;
  /** The time service used for timing. It is package private so it can be modified by test case. */
  TimeService timeService = TimeService.SYSTEM;
  /** A log file can be rotated once it has exceeded a given time interval. No rotation happens if equals to zero. */
  private long rotationIntervalInMillis;
  /** The last time a log file was rotated. */
  private long lastRotationTime;
  /**
   * The exclusive lock used for writes and lifecycle operations on this log:
   * initialize, clear, sync and close.
@@ -190,6 +200,12 @@
  private final Lock sharedLock;
  /**
   * The replication environment used to create this log. The log is notifying it for any change
   * that must be persisted.
   */
  private final ReplicationEnvironment replicationEnv;
  /**
   * Open a log with the provided log path, record parser and maximum size per
   * log file.
   * <p>
@@ -199,25 +215,28 @@
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param replicationEnv
   *          The replication environment used to create this log.
   * @param logPath
   *          Path of the log.
   * @param parser
   *          Parser for encoding/decoding of records.
   * @param sizeLimitPerFileInBytes
   *          Limit in bytes before rotating the head log file of the log.
   * @param rotationParameters
   *          Parameters for the log files rotation.
   * @return a log
   * @throws ChangelogException
   *           If a problem occurs during initialization.
   */
  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
      final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final ReplicationEnvironment replicationEnv,
      final File logPath, final RecordParser<K, V> parser, final LogRotationParameters rotationParameters)
      throws ChangelogException
  {
    Reject.ifNull(logPath, parser);
    @SuppressWarnings("unchecked")
    Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
    if (log == null)
    {
      log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
      log = new Log<K, V>(replicationEnv, logPath, parser, rotationParameters);
      logsCache.put(logPath, log);
    }
    else
@@ -229,6 +248,43 @@
    return log;
  }
  /** Holds the parameters for log files rotation. */
  static class LogRotationParameters {
    private final long sizeLimitPerFileInBytes;
    private final long rotationInterval;
    private final long lastRotationTime;
    /**
     * Creates rotation parameters.
     *
     * @param sizeLimitPerFileInBytes
     *           Size limit before rotating a log file.
     * @param rotationInterval
     *           Time interval before rotating a log file.
     * @param lastRotationTime
     *           Last time a log file was rotated.
     */
    LogRotationParameters(long sizeLimitPerFileInBytes, long rotationInterval, long lastRotationTime)
    {
      this.sizeLimitPerFileInBytes = sizeLimitPerFileInBytes;
      this.rotationInterval = rotationInterval;
      this.lastRotationTime = lastRotationTime;
    }
  }
  /**
   * Set the time interval for rotation of log file.
   *
   * @param rotationIntervalInMillis
   *           time interval before rotation of log file
   */
  void setRotationInterval(long rotationIntervalInMillis)
  {
    this.rotationIntervalInMillis = rotationIntervalInMillis;
  }
  /**
   * Release a reference to the log corresponding to provided path. The log is
   * closed if this is the last reference.
@@ -256,21 +312,27 @@
  /**
   * Creates a new log.
   *
   * @param replicationEnv
   *            The replication environment used to create this log.
   * @param logPath
   *            The directory path of the log.
   * @param parser
   *          Parser of records.
   * @param sizeLimitPerFile
   *            Limit in bytes before rotating a log file.
   * @param rotationParams
   *          Parameters for log-file rotation.
   *
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
      throws ChangelogException
  private Log(final ReplicationEnvironment replicationEnv, final File logPath, final RecordParser<K, V> parser,
      final LogRotationParameters rotationParams) throws ChangelogException
  {
    this.replicationEnv = replicationEnv;
    this.logPath = logPath;
    this.recordParser = parser;
    this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
    this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
    this.rotationIntervalInMillis = rotationParams.rotationInterval;
    this.lastRotationTime = rotationParams.lastRotationTime;
    this.referenceCount = 1;
    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
@@ -376,9 +438,9 @@
        return;
      }
      LogFile<K, V> headLogFile = getHeadLogFile();
      if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
      if (mustRotate(headLogFile))
      {
        logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
        logger.info(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
        rotateHeadLogFile();
        headLogFile = getHeadLogFile();
@@ -392,6 +454,27 @@
    }
  }
  private boolean mustRotate(LogFile<K, V> headLogFile)
  {
    if (lastAppendedKey == null)
    {
      // never rotate an empty file
      return false;
    }
    if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
    {
      // rotate because file size exceeded threshold
      return true;
    }
    if (rotationIntervalInMillis > 0)
    {
      // rotate if time limit is reached
      final long timeElapsed = timeService.since(lastRotationTime);
      return timeElapsed > rotationIntervalInMillis;
    }
    return false;
  }
  /**
   * Indicates if the provided record has a key that would break the key
   * ordering in the log.
@@ -711,6 +794,60 @@
    releaseLog(logPath);
  }
  /**
   * Find the highest key that corresponds to a record that is the oldest (or
   * first) of a read-only log file and where value mapped from the record is
   * lower or equals to provided limit value.
   * <p>
   * Example<br>
   * Given a log with 3 log files, with Record<Int, String> and Mapper<String,
   * Long> mapping a string to its long value
   * <ul>
   * <li>1_10.log where oldest record is (key=1, value="50")</li>
   * <li>11_20.log where oldest record is (key=11, value="150")</li>
   * <li>head.log where oldest record is (key=25, value="250")</li>
   * </ul>
   * Then
   * <ul>
   * <li>findBoundaryKeyFromRecord(mapper, 20) => null</li>
   * <li>findBoundaryKeyFromRecord(mapper, 50) => 1</li>
   * <li>findBoundaryKeyFromRecord(mapper, 100) => 1</li>
   * <li>findBoundaryKeyFromRecord(mapper, 150) => 11</li>
   * <li>findBoundaryKeyFromRecord(mapper, 200) => 11</li>
   * <li>findBoundaryKeyFromRecord(mapper, 250) => 25</li>
   * <li>findBoundaryKeyFromRecord(mapper, 300) => 25</li>
   * </ul>
   *
   * @param <V2>
   *          Type of the value extracted from the record
   * @param mapper
   *          The mapper to extract a value from a record. It is expected that
   *          extracted values are ordered according to an order consistent with
   *          this log ordering, i.e. for two records, if key(R1) > key(R2) then
   *          extractedValue(R1) > extractedValue(R2).
   * @param limitValue
   *          The limit value to search for
   * @return the key or {@code null} if no key corresponds
   * @throws ChangelogException
   *           If a problem occurs
   */
  <V2 extends Comparable<V2>> K findBoundaryKeyFromRecord(Record.Mapper<V, V2> mapper, V2 limitValue)
      throws ChangelogException
  {
    K key = null;
    for (LogFile<K, V> logFile : logFiles.values())
    {
      final Record<K, V> record = logFile.getOldestRecord();
      final V2 oldestValue = mapper.map(record.getValue());
      if (oldestValue.compareTo(limitValue) > 0)
      {
        return key;
      }
      key = record.getKey();
    }
    return key;
  }
  /** Effectively close this log. */
  private void doClose()
  {
@@ -766,6 +903,9 @@
    // Re-enable cursors previously opened on head, with the saved state
    updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
    // Notify even if time-based rotation is not enabled, as it could be enabled at any time
    replicationEnv.notifyLogFileRotation(this);
  }
  private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException