mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
22.58.2016 e84de8ea992525eb6ac672ddc764e3a4825e36d9
OPENDJ-2794 Move check of key ordering from Log to LogFile class when adding a changelog record

The assumption that appending a log record in Log class is done by only one
thread is false, as two remote replication servers can send an update to the
same RS with the same target changelog.
Check that was done in Log class was not thread-safe, due to optimizations.
4 files modified
109 ■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java 38 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java 63 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/messages/org/opends/messages/replication.properties 2 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java 6 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -38,7 +38,6 @@
import net.jcip.annotations.GuardedBy;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Pair;
import org.forgerock.util.Reject;
@@ -148,13 +147,6 @@
  private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<>();
  /**
   * The last key appended to the log. In order to keep the ordering of the keys
   * in the log, any attempt to append a record with a key lower or equal to
   * this key is rejected (no error but an event is logged).
   */
  private K lastAppendedKey;
  /**
   * The list of non-empty cursors opened on this log. Opened cursors may have
   * to be updated when rotating the head log file.
   */
@@ -411,13 +403,11 @@
   * <p>
   * The record must have a key strictly higher than the key
   * of the last record added. If it is not the case, the record is not
   * appended and the method returns immediately.
   * appended.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitly call the
   * {@code syncToFileSystem()} method.
   * <p>
   * This method is not thread-safe.
   *
   * @param record
   *          The record to add.
@@ -426,15 +416,6 @@
   */
  public void append(final Record<K, V> record) throws ChangelogException
  {
    // This check is ok outside of any locking because only the append thread updates lastAppendedKey.
    if (recordIsBreakingKeyOrdering(record))
    {
      logger.info(LocalizableMessage.raw(
              "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
              lastAppendedKey != null ? lastAppendedKey : "null"));
      return;
    }
    // Fast-path - assume that no rotation is needed and use shared lock.
    sharedLock.lock();
    try
@@ -447,7 +428,6 @@
      if (!mustRotate(headLogFile))
      {
        headLogFile.append(record);
        lastAppendedKey = record.getKey();
        return;
      }
    }
@@ -465,6 +445,11 @@
        return;
      }
      LogFile<K, V> headLogFile = getHeadLogFile();
      if (headLogFile.appendWouldBreakKeyOrdering(record))
      {
        // abort rotation
        return;
      }
      if (mustRotate(headLogFile))
      {
        logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
@@ -473,7 +458,6 @@
        headLogFile = getHeadLogFile();
      }
      headLogFile.append(record);
      lastAppendedKey = record.getKey();
    }
    finally
    {
@@ -483,7 +467,7 @@
  private boolean mustRotate(LogFile<K, V> headLogFile)
  {
    if (lastAppendedKey == null)
    if (headLogFile.getNewestRecord() == null)
    {
      // never rotate an empty file
      return false;
@@ -509,12 +493,6 @@
    return false;
  }
  /** Indicates if the provided record has a key that would break the key ordering in the log. */
  private boolean recordIsBreakingKeyOrdering(final Record<K, V> record)
  {
    return lastAppendedKey != null && record.getKey().compareTo(lastAppendedKey) <= 0;
  }
  /**
   * Synchronize all records added with the file system, ensuring that records
   * are effectively persisted.
@@ -1110,8 +1088,6 @@
  private void openHeadLogFile() throws ChangelogException
  {
    final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser);
    final Record<K,V> newestRecord = head.getNewestRecord();
    lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
    logFiles.put(recordParser.getMaxKey(), head);
  }
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -11,7 +11,7 @@
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2014-2015 ForgeRock AS.
 * Copyright 2014-2016 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
@@ -77,6 +77,11 @@
  /** Lock used to ensure that log file is in a consistent state when reading it. */
  private final Lock sharedLock;
  /**
   * The newest (last) record appended to this log file. In order to keep the ordering of the keys
   * in the log file, any attempt to append a record with a key lower or equal to this key is
   * rejected (no error but an event is logged).
   */
  private Record<K, V> newestRecord;
  /**
@@ -114,6 +119,7 @@
    final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    exclusiveLock = rwLock.writeLock();
    sharedLock = rwLock.readLock();
    initializeNewestRecord();
  }
  /**
@@ -227,9 +233,11 @@
  /**
   * Add the provided record at the end of this log.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitely call the
   * {@code syncToFileSystem()} method.
   * The record must have a key strictly higher than the key of the last record added.
   * If it is not the case, the record is not appended.
   * <p>
   * In order to ensure that record is written out of buffers and persisted to file system, it is
   * necessary to explicitly call the {@link #syncToFileSystem()} method.
   *
   * @param record
   *          The record to add.
@@ -242,6 +250,10 @@
    exclusiveLock.lock();
    try
    {
      if (appendWouldBreakKeyOrdering(record))
      {
        return;
      }
      writer.write(record);
      newestRecord = record;
    }
@@ -251,6 +263,18 @@
    }
  }
  /** Indicates if the provided record has a key that would break the key ordering if appended in this file log. */
  boolean appendWouldBreakKeyOrdering(final Record<K, V> record)
  {
    boolean wouldBreakOrder = newestRecord != null && record.getKey().compareTo(newestRecord.getKey()) <= 0;
    if (wouldBreakOrder)
    {
      logger.debug(
          INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER.get(logfile.getPath(), record, newestRecord.getKey()));
    }
    return wouldBreakOrder;
  }
  /**
   * Dump this log file as a text file, intended for debugging purpose only.
   *
@@ -363,28 +387,29 @@
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  Record<K, V> getNewestRecord() throws ChangelogException
  Record<K, V> getNewestRecord()
  {
    if (newestRecord == null)
    return newestRecord;
  }
  private void initializeNewestRecord() throws ChangelogException
  {
    try (BlockLogReader<K, V> reader = getReader())
    {
      try (BlockLogReader<K, V> reader = getReader())
      sharedLock.lock();
      try
      {
        sharedLock.lock();
        try
        {
          newestRecord = reader.getNewestRecord();
        }
        finally
        {
          sharedLock.unlock();
        }
        newestRecord = reader.getNewestRecord();
      }
      catch (IOException ioe)
      finally
      {
        throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe);
        sharedLock.unlock();
      }
    }
    return newestRecord;
    catch (IOException ioe)
    {
      throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe);
    }
  }
  /**
opendj-server-legacy/src/messages/org/opends/messages/replication.properties
@@ -589,3 +589,5 @@
ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD_294=The change number could not be reset to %d because the associated \
  change with CSN '%s' has already been purged from the change log. Try resetting to a more recent change
ERR_REPLICATION_CHANGE_NUMBER_DISABLED_295=Change number indexing is disabled for replication domain '%s'
INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER_296=Filtering out from log file '%s' the record '%s'\
 because it would break ordering. Last key appended is '%s'.
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -11,7 +11,7 @@
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2014-2015 ForgeRock AS.
 * Copyright 2014-2016 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
@@ -267,9 +267,9 @@
  {
    try (LogFile<String, String> writeLog = getLogFile(RECORD_PARSER))
    {
      for (int i = 1; i <= 100; i++)
      for (int i = 1; i <= 90; i++)
      {
        Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
        Record<String, String> record = Record.from(String.format("newkey%02d", i), "newvalue" + i);
        writeLog.append(record);
        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));