mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
22.58.2016 e84de8ea992525eb6ac672ddc764e3a4825e36d9
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -38,7 +38,6 @@
import net.jcip.annotations.GuardedBy;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Pair;
import org.forgerock.util.Reject;
@@ -148,13 +147,6 @@
  private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<>();
  /**
   * The last key appended to the log. In order to keep the ordering of the keys
   * in the log, any attempt to append a record with a key lower or equal to
   * this key is rejected (no error but an event is logged).
   */
  private K lastAppendedKey;
  /**
   * The list of non-empty cursors opened on this log. Opened cursors may have
   * to be updated when rotating the head log file.
   */
@@ -411,13 +403,11 @@
   * <p>
   * The record must have a key strictly higher than the key
   * of the last record added. If it is not the case, the record is not
   * appended and the method returns immediately.
   * appended.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitly call the
   * {@code syncToFileSystem()} method.
   * <p>
   * This method is not thread-safe.
   *
   * @param record
   *          The record to add.
@@ -426,15 +416,6 @@
   */
  public void append(final Record<K, V> record) throws ChangelogException
  {
    // This check is ok outside of any locking because only the append thread updates lastAppendedKey.
    if (recordIsBreakingKeyOrdering(record))
    {
      logger.info(LocalizableMessage.raw(
              "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
              lastAppendedKey != null ? lastAppendedKey : "null"));
      return;
    }
    // Fast-path - assume that no rotation is needed and use shared lock.
    sharedLock.lock();
    try
@@ -447,7 +428,6 @@
      if (!mustRotate(headLogFile))
      {
        headLogFile.append(record);
        lastAppendedKey = record.getKey();
        return;
      }
    }
@@ -465,6 +445,11 @@
        return;
      }
      LogFile<K, V> headLogFile = getHeadLogFile();
      if (headLogFile.appendWouldBreakKeyOrdering(record))
      {
        // abort rotation
        return;
      }
      if (mustRotate(headLogFile))
      {
        logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
@@ -473,7 +458,6 @@
        headLogFile = getHeadLogFile();
      }
      headLogFile.append(record);
      lastAppendedKey = record.getKey();
    }
    finally
    {
@@ -483,7 +467,7 @@
  private boolean mustRotate(LogFile<K, V> headLogFile)
  {
    if (lastAppendedKey == null)
    if (headLogFile.getNewestRecord() == null)
    {
      // never rotate an empty file
      return false;
@@ -509,12 +493,6 @@
    return false;
  }
  /** Indicates if the provided record has a key that would break the key ordering in the log. */
  private boolean recordIsBreakingKeyOrdering(final Record<K, V> record)
  {
    return lastAppendedKey != null && record.getKey().compareTo(lastAppendedKey) <= 0;
  }
  /**
   * Synchronize all records added with the file system, ensuring that records
   * are effectively persisted.
@@ -1110,8 +1088,6 @@
  private void openHeadLogFile() throws ChangelogException
  {
    final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser);
    final Record<K,V> newestRecord = head.getNewestRecord();
    lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
    logFiles.put(recordParser.getMaxKey(), head);
  }
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -11,7 +11,7 @@
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2014-2015 ForgeRock AS.
 * Copyright 2014-2016 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
@@ -77,6 +77,11 @@
  /** Lock used to ensure that log file is in a consistent state when reading it. */
  private final Lock sharedLock;
  /**
   * The newest (last) record appended to this log file. In order to keep the ordering of the keys
   * in the log file, any attempt to append a record with a key lower or equal to this key is
   * rejected (no error but an event is logged).
   */
  private Record<K, V> newestRecord;
  /**
@@ -114,6 +119,7 @@
    final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    exclusiveLock = rwLock.writeLock();
    sharedLock = rwLock.readLock();
    initializeNewestRecord();
  }
  /**
@@ -227,9 +233,11 @@
  /**
   * Add the provided record at the end of this log.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitely call the
   * {@code syncToFileSystem()} method.
   * The record must have a key strictly higher than the key of the last record added.
   * If it is not the case, the record is not appended.
   * <p>
   * In order to ensure that record is written out of buffers and persisted to file system, it is
   * necessary to explicitly call the {@link #syncToFileSystem()} method.
   *
   * @param record
   *          The record to add.
@@ -242,6 +250,10 @@
    exclusiveLock.lock();
    try
    {
      if (appendWouldBreakKeyOrdering(record))
      {
        return;
      }
      writer.write(record);
      newestRecord = record;
    }
@@ -251,6 +263,18 @@
    }
  }
  /** Indicates if the provided record has a key that would break the key ordering if appended in this file log. */
  boolean appendWouldBreakKeyOrdering(final Record<K, V> record)
  {
    boolean wouldBreakOrder = newestRecord != null && record.getKey().compareTo(newestRecord.getKey()) <= 0;
    if (wouldBreakOrder)
    {
      logger.debug(
          INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER.get(logfile.getPath(), record, newestRecord.getKey()));
    }
    return wouldBreakOrder;
  }
  /**
   * Dump this log file as a text file, intended for debugging purpose only.
   *
@@ -363,28 +387,29 @@
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  Record<K, V> getNewestRecord() throws ChangelogException
  Record<K, V> getNewestRecord()
  {
    if (newestRecord == null)
    return newestRecord;
  }
  private void initializeNewestRecord() throws ChangelogException
  {
    try (BlockLogReader<K, V> reader = getReader())
    {
      try (BlockLogReader<K, V> reader = getReader())
      sharedLock.lock();
      try
      {
        sharedLock.lock();
        try
        {
          newestRecord = reader.getNewestRecord();
        }
        finally
        {
          sharedLock.unlock();
        }
        newestRecord = reader.getNewestRecord();
      }
      catch (IOException ioe)
      finally
      {
        throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe);
        sharedLock.unlock();
      }
    }
    return newestRecord;
    catch (IOException ioe)
    {
      throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe);
    }
  }
  /**
opendj-server-legacy/src/messages/org/opends/messages/replication.properties
@@ -589,3 +589,5 @@
ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD_294=The change number could not be reset to %d because the associated \
  change with CSN '%s' has already been purged from the change log. Try resetting to a more recent change
ERR_REPLICATION_CHANGE_NUMBER_DISABLED_295=Change number indexing is disabled for replication domain '%s'
INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER_296=Filtering out from log file '%s' the record '%s'\
 because it would break ordering. Last key appended is '%s'.
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -11,7 +11,7 @@
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2014-2015 ForgeRock AS.
 * Copyright 2014-2016 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
@@ -267,9 +267,9 @@
  {
    try (LogFile<String, String> writeLog = getLogFile(RECORD_PARSER))
    {
      for (int i = 1; i <= 100; i++)
      for (int i = 1; i <= 90; i++)
      {
        Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
        Record<String, String> record = Record.from(String.format("newkey%02d", i), "newvalue" + i);
        writeLog.append(record);
        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));