From e84de8ea992525eb6ac672ddc764e3a4825e36d9 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 25 Apr 2016 09:38:43 +0000
Subject: [PATCH] OPENDJ-2794 Move check of key ordering from Log to LogFile class when adding a changelog record

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java     |   63 ++++++++++++++++++++++---------
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java         |   38 +++---------------
 opendj-server-legacy/src/messages/org/opends/messages/replication.properties                            |    2 +
 opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java |    6 +-
 4 files changed, 56 insertions(+), 53 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
index e218e71..0839b7c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -38,7 +38,6 @@
 
 import net.jcip.annotations.GuardedBy;
 
-import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.forgerock.util.Pair;
 import org.forgerock.util.Reject;
@@ -148,13 +147,6 @@
   private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<>();
 
   /**
-   * The last key appended to the log. In order to keep the ordering of the keys
-   * in the log, any attempt to append a record with a key lower or equal to
-   * this key is rejected (no error but an event is logged).
-   */
-  private K lastAppendedKey;
-
-  /**
    * The list of non-empty cursors opened on this log. Opened cursors may have
    * to be updated when rotating the head log file.
    */
@@ -411,13 +403,11 @@
    * <p>
    * The record must have a key strictly higher than the key
    * of the last record added. If it is not the case, the record is not
-   * appended and the method returns immediately.
+   * appended.
    * <p>
    * In order to ensure that record is written out of buffers and persisted
    * to file system, it is necessary to explicitly call the
    * {@code syncToFileSystem()} method.
-   * <p>
-   * This method is not thread-safe.
    *
    * @param record
    *          The record to add.
@@ -426,15 +416,6 @@
    */
   public void append(final Record<K, V> record) throws ChangelogException
   {
-    // This check is ok outside of any locking because only the append thread updates lastAppendedKey.
-    if (recordIsBreakingKeyOrdering(record))
-    {
-      logger.info(LocalizableMessage.raw(
-              "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
-              lastAppendedKey != null ? lastAppendedKey : "null"));
-      return;
-    }
-
     // Fast-path - assume that no rotation is needed and use shared lock.
     sharedLock.lock();
     try
@@ -447,7 +428,6 @@
       if (!mustRotate(headLogFile))
       {
         headLogFile.append(record);
-        lastAppendedKey = record.getKey();
         return;
       }
     }
@@ -465,6 +445,11 @@
         return;
       }
       LogFile<K, V> headLogFile = getHeadLogFile();
+      if (headLogFile.appendWouldBreakKeyOrdering(record))
+      {
+        // abort rotation
+        return;
+      }
       if (mustRotate(headLogFile))
       {
         logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
@@ -473,7 +458,6 @@
         headLogFile = getHeadLogFile();
       }
       headLogFile.append(record);
-      lastAppendedKey = record.getKey();
     }
     finally
     {
@@ -483,7 +467,7 @@
 
   private boolean mustRotate(LogFile<K, V> headLogFile)
   {
-    if (lastAppendedKey == null)
+    if (headLogFile.getNewestRecord() == null)
     {
       // never rotate an empty file
       return false;
@@ -509,12 +493,6 @@
     return false;
   }
 
-  /** Indicates if the provided record has a key that would break the key ordering in the log. */
-  private boolean recordIsBreakingKeyOrdering(final Record<K, V> record)
-  {
-    return lastAppendedKey != null && record.getKey().compareTo(lastAppendedKey) <= 0;
-  }
-
   /**
    * Synchronize all records added with the file system, ensuring that records
    * are effectively persisted.
@@ -1110,8 +1088,6 @@
   private void openHeadLogFile() throws ChangelogException
   {
     final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser);
-    final Record<K,V> newestRecord = head.getNewestRecord();
-    lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
     logFiles.put(recordParser.getMaxKey(), head);
   }
 
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
index f088a15..09b95ad 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -11,7 +11,7 @@
  * Header, with the fields enclosed by brackets [] replaced by your own identifying
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
- * Copyright 2014-2015 ForgeRock AS.
+ * Copyright 2014-2016 ForgeRock AS.
  */
 package org.opends.server.replication.server.changelog.file;
 
@@ -77,6 +77,11 @@
   /** Lock used to ensure that log file is in a consistent state when reading it. */
   private final Lock sharedLock;
 
+  /**
+   * The newest (last) record appended to this log file. In order to keep the ordering of the keys
+   * in the log file, any attempt to append a record with a key lower or equal to this key is
+   * rejected (no error but an event is logged).
+   */
   private Record<K, V> newestRecord;
 
   /**
@@ -114,6 +119,7 @@
     final ReadWriteLock rwLock = new ReentrantReadWriteLock();
     exclusiveLock = rwLock.writeLock();
     sharedLock = rwLock.readLock();
+    initializeNewestRecord();
   }
 
   /**
@@ -227,9 +233,11 @@
   /**
    * Add the provided record at the end of this log.
    * <p>
-   * In order to ensure that record is written out of buffers and persisted
-   * to file system, it is necessary to explicitely call the
-   * {@code syncToFileSystem()} method.
+   * The record must have a key strictly higher than the key of the last record added.
+   * If it is not the case, the record is not appended.
+   * <p>
+   * In order to ensure that record is written out of buffers and persisted to file system, it is
+   * necessary to explicitly call the {@link #syncToFileSystem()} method.
    *
    * @param record
    *          The record to add.
@@ -242,6 +250,10 @@
     exclusiveLock.lock();
     try
     {
+      if (appendWouldBreakKeyOrdering(record))
+      {
+        return;
+      }
       writer.write(record);
       newestRecord = record;
     }
@@ -251,6 +263,18 @@
     }
   }
 
+  /** Indicates if the provided record has a key that would break the key ordering if appended in this file log. */
+  boolean appendWouldBreakKeyOrdering(final Record<K, V> record)
+  {
+    boolean wouldBreakOrder = newestRecord != null && record.getKey().compareTo(newestRecord.getKey()) <= 0;
+    if (wouldBreakOrder)
+    {
+      logger.debug(
+          INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER.get(logfile.getPath(), record, newestRecord.getKey()));
+    }
+    return wouldBreakOrder;
+  }
+
   /**
    * Dump this log file as a text file, intended for debugging purpose only.
    *
@@ -363,28 +387,29 @@
    * @throws ChangelogException
    *           If an error occurs while retrieving the record.
    */
-  Record<K, V> getNewestRecord() throws ChangelogException
+  Record<K, V> getNewestRecord()
   {
-    if (newestRecord == null)
+    return newestRecord;
+  }
+
+  private void initializeNewestRecord() throws ChangelogException
+  {
+    try (BlockLogReader<K, V> reader = getReader())
     {
-      try (BlockLogReader<K, V> reader = getReader())
+      sharedLock.lock();
+      try
       {
-        sharedLock.lock();
-        try
-        {
-          newestRecord = reader.getNewestRecord();
-        }
-        finally
-        {
-          sharedLock.unlock();
-        }
+        newestRecord = reader.getNewestRecord();
       }
-      catch (IOException ioe)
+      finally
       {
-        throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe);
+        sharedLock.unlock();
       }
     }
-    return newestRecord;
+    catch (IOException ioe)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe);
+    }
   }
 
   /**
diff --git a/opendj-server-legacy/src/messages/org/opends/messages/replication.properties b/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
index 5bc6891..6aae643 100644
--- a/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
+++ b/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
@@ -589,3 +589,5 @@
 ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD_294=The change number could not be reset to %d because the associated \
   change with CSN '%s' has already been purged from the change log. Try resetting to a more recent change
 ERR_REPLICATION_CHANGE_NUMBER_DISABLED_295=Change number indexing is disabled for replication domain '%s'
+INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER_296=Filtering out from log file '%s' the record '%s'\
+ because it would break ordering. Last key appended is '%s'.
\ No newline at end of file
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java
index a34c543..2247dcf 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -11,7 +11,7 @@
  * Header, with the fields enclosed by brackets [] replaced by your own identifying
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
- * Copyright 2014-2015 ForgeRock AS.
+ * Copyright 2014-2016 ForgeRock AS.
  */
 package org.opends.server.replication.server.changelog.file;
 
@@ -267,9 +267,9 @@
   {
     try (LogFile<String, String> writeLog = getLogFile(RECORD_PARSER))
     {
-      for (int i = 1; i <= 100; i++)
+      for (int i = 1; i <= 90; i++)
       {
-        Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
+        Record<String, String> record = Record.from(String.format("newkey%02d", i), "newvalue" + i);
         writeLog.append(record);
         assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
         assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));

--
Gitblit v1.10.0