From e84de8ea992525eb6ac672ddc764e3a4825e36d9 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 25 Apr 2016 09:38:43 +0000
Subject: [PATCH] OPENDJ-2794 Move check of key ordering from Log to LogFile class when adding a changelog record
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java | 63 ++++++++++++++++++++++---------
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java | 38 +++---------------
opendj-server-legacy/src/messages/org/opends/messages/replication.properties | 2 +
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java | 6 +-
4 files changed, 56 insertions(+), 53 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
index e218e71..0839b7c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -38,7 +38,6 @@
import net.jcip.annotations.GuardedBy;
-import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Pair;
import org.forgerock.util.Reject;
@@ -148,13 +147,6 @@
private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<>();
/**
- * The last key appended to the log. In order to keep the ordering of the keys
- * in the log, any attempt to append a record with a key lower or equal to
- * this key is rejected (no error but an event is logged).
- */
- private K lastAppendedKey;
-
- /**
* The list of non-empty cursors opened on this log. Opened cursors may have
* to be updated when rotating the head log file.
*/
@@ -411,13 +403,11 @@
* <p>
* The record must have a key strictly higher than the key
* of the last record added. If it is not the case, the record is not
- * appended and the method returns immediately.
+ * appended.
* <p>
* In order to ensure that record is written out of buffers and persisted
* to file system, it is necessary to explicitly call the
* {@code syncToFileSystem()} method.
- * <p>
- * This method is not thread-safe.
*
* @param record
* The record to add.
@@ -426,15 +416,6 @@
*/
public void append(final Record<K, V> record) throws ChangelogException
{
- // This check is ok outside of any locking because only the append thread updates lastAppendedKey.
- if (recordIsBreakingKeyOrdering(record))
- {
- logger.info(LocalizableMessage.raw(
- "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
- lastAppendedKey != null ? lastAppendedKey : "null"));
- return;
- }
-
// Fast-path - assume that no rotation is needed and use shared lock.
sharedLock.lock();
try
@@ -447,7 +428,6 @@
if (!mustRotate(headLogFile))
{
headLogFile.append(record);
- lastAppendedKey = record.getKey();
return;
}
}
@@ -465,6 +445,11 @@
return;
}
LogFile<K, V> headLogFile = getHeadLogFile();
+ if (headLogFile.appendWouldBreakKeyOrdering(record))
+ {
+ // abort rotation
+ return;
+ }
if (mustRotate(headLogFile))
{
logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
@@ -473,7 +458,6 @@
headLogFile = getHeadLogFile();
}
headLogFile.append(record);
- lastAppendedKey = record.getKey();
}
finally
{
@@ -483,7 +467,7 @@
private boolean mustRotate(LogFile<K, V> headLogFile)
{
- if (lastAppendedKey == null)
+ if (headLogFile.getNewestRecord() == null)
{
// never rotate an empty file
return false;
@@ -509,12 +493,6 @@
return false;
}
- /** Indicates if the provided record has a key that would break the key ordering in the log. */
- private boolean recordIsBreakingKeyOrdering(final Record<K, V> record)
- {
- return lastAppendedKey != null && record.getKey().compareTo(lastAppendedKey) <= 0;
- }
-
/**
* Synchronize all records added with the file system, ensuring that records
* are effectively persisted.
@@ -1110,8 +1088,6 @@
private void openHeadLogFile() throws ChangelogException
{
final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser);
- final Record<K,V> newestRecord = head.getNewestRecord();
- lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
logFiles.put(recordParser.getMaxKey(), head);
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
index f088a15..09b95ad 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -11,7 +11,7 @@
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions Copyright [year] [name of copyright owner]".
*
- * Copyright 2014-2015 ForgeRock AS.
+ * Copyright 2014-2016 ForgeRock AS.
*/
package org.opends.server.replication.server.changelog.file;
@@ -77,6 +77,11 @@
/** Lock used to ensure that log file is in a consistent state when reading it. */
private final Lock sharedLock;
+ /**
+ * The newest (last) record appended to this log file. In order to keep the ordering of the keys
+ * in the log file, any attempt to append a record with a key lower or equal to this key is
+ * rejected (no error but an event is logged).
+ */
private Record<K, V> newestRecord;
/**
@@ -114,6 +119,7 @@
final ReadWriteLock rwLock = new ReentrantReadWriteLock();
exclusiveLock = rwLock.writeLock();
sharedLock = rwLock.readLock();
+ initializeNewestRecord();
}
/**
@@ -227,9 +233,11 @@
/**
* Add the provided record at the end of this log.
* <p>
- * In order to ensure that record is written out of buffers and persisted
- * to file system, it is necessary to explicitely call the
- * {@code syncToFileSystem()} method.
+ * The record must have a key strictly higher than the key of the last record added.
+ * If it is not the case, the record is not appended.
+ * <p>
+ * In order to ensure that record is written out of buffers and persisted to file system, it is
+ * necessary to explicitly call the {@link #syncToFileSystem()} method.
*
* @param record
* The record to add.
@@ -242,6 +250,10 @@
exclusiveLock.lock();
try
{
+ if (appendWouldBreakKeyOrdering(record))
+ {
+ return;
+ }
writer.write(record);
newestRecord = record;
}
@@ -251,6 +263,18 @@
}
}
+ /** Indicates if the provided record has a key that would break the key ordering if appended in this file log. */
+ boolean appendWouldBreakKeyOrdering(final Record<K, V> record)
+ {
+ boolean wouldBreakOrder = newestRecord != null && record.getKey().compareTo(newestRecord.getKey()) <= 0;
+ if (wouldBreakOrder)
+ {
+ logger.debug(
+ INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER.get(logfile.getPath(), record, newestRecord.getKey()));
+ }
+ return wouldBreakOrder;
+ }
+
/**
* Dump this log file as a text file, intended for debugging purpose only.
*
@@ -363,28 +387,29 @@
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
- Record<K, V> getNewestRecord() throws ChangelogException
+ Record<K, V> getNewestRecord()
{
- if (newestRecord == null)
+ return newestRecord;
+ }
+
+ private void initializeNewestRecord() throws ChangelogException
+ {
+ try (BlockLogReader<K, V> reader = getReader())
{
- try (BlockLogReader<K, V> reader = getReader())
+ sharedLock.lock();
+ try
{
- sharedLock.lock();
- try
- {
- newestRecord = reader.getNewestRecord();
- }
- finally
- {
- sharedLock.unlock();
- }
+ newestRecord = reader.getNewestRecord();
}
- catch (IOException ioe)
+ finally
{
- throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe);
+ sharedLock.unlock();
}
}
- return newestRecord;
+ catch (IOException ioe)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe);
+ }
}
/**
diff --git a/opendj-server-legacy/src/messages/org/opends/messages/replication.properties b/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
index 5bc6891..6aae643 100644
--- a/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
+++ b/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
@@ -589,3 +589,5 @@
ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD_294=The change number could not be reset to %d because the associated \
change with CSN '%s' has already been purged from the change log. Try resetting to a more recent change
ERR_REPLICATION_CHANGE_NUMBER_DISABLED_295=Change number indexing is disabled for replication domain '%s'
+INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER_296=Filtering out from log file '%s' the record '%s'\
+ because it would break ordering. Last key appended is '%s'.
\ No newline at end of file
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java
index a34c543..2247dcf 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -11,7 +11,7 @@
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions Copyright [year] [name of copyright owner]".
*
- * Copyright 2014-2015 ForgeRock AS.
+ * Copyright 2014-2016 ForgeRock AS.
*/
package org.opends.server.replication.server.changelog.file;
@@ -267,9 +267,9 @@
{
try (LogFile<String, String> writeLog = getLogFile(RECORD_PARSER))
{
- for (int i = 1; i <= 100; i++)
+ for (int i = 1; i <= 90; i++)
{
- Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
+ Record<String, String> record = Record.from(String.format("newkey%02d", i), "newvalue" + i);
writeLog.append(record);
assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));
--
Gitblit v1.10.0