From 46992e79c3b43e79c556f747cab12f93614aeae0 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 28 May 2014 12:06:46 +0000
Subject: [PATCH] OPENDJ-1482 : SEVERE_ERROR (cursor still opened on the log) when running replication topology with file-based changelog

---
 opends/src/server/org/opends/server/replication/server/changelog/file/Log.java |   34 +++++++++++++++++++++++++++++++---
 1 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 9667a56..37817c4 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -25,9 +25,9 @@
  */
 package org.opends.server.replication.server.changelog.file;
 
+import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.util.StaticUtils.*;
-import static org.opends.messages.ReplicationMessages.*;
 
 import java.io.Closeable;
 import java.io.File;
@@ -47,6 +47,9 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.forgerock.util.Reject;
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
 import org.opends.server.loggers.ErrorLogger;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -153,6 +156,13 @@
   private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<K, LogFile<K, V>>();
 
   /**
+   * The last key appended to the log. In order to keep the ordering of the keys
+   * in the log, any attempt to append a record with a key lower or equal to
+   * this key will silently fail.
+   */
+  private K lastAppendedKey;
+
+  /**
    * The list of non-empty cursors opened on this log. Opened cursors may have
    * to be updated when rotating the head log file.
    */
@@ -354,6 +364,12 @@
       {
         return;
       }
+      if (recordIsBreakingKeyOrdering(record))
+      {
+        ErrorLogger.logError(Message.raw(Category.SYNC, Severity.NOTICE,
+            "Rejecting append to log '%s' for record: [%s]", logPath.getPath(), record.toString()));
+        return;
+      }
       LogFile<K, V> headLogFile = getHeadLogFile();
       if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
       {
@@ -363,6 +379,7 @@
         headLogFile = getHeadLogFile();
       }
       headLogFile.append(record);
+      lastAppendedKey = record.getKey();
     }
     finally
     {
@@ -371,6 +388,15 @@
   }
 
   /**
+   * Indicates if the provided record has a key that would break the key
+   * ordering in the log.
+   */
+  private boolean recordIsBreakingKeyOrdering(final Record<K, V> record)
+  {
+    return lastAppendedKey != null && record.getKey().compareTo(lastAppendedKey) <= 0;
+  }
+
+  /**
    * Synchronize all records added with the file system, ensuring that records
    * are effectively persisted.
    * <p>
@@ -814,8 +840,10 @@
 
   private void openHeadLogFile() throws ChangelogException
   {
-    logFiles.put(recordParser.getMaxKey(),
-        LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser));
+    final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser);
+    Record<K,V> newestRecord = head.getNewestRecord();
+    lastAppendedKey = newestRecord == null ? null : newestRecord.getKey();
+    logFiles.put(recordParser.getMaxKey(), head);
   }
 
   private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException

--
Gitblit v1.10.0