opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -38,7 +38,6 @@ import net.jcip.annotations.GuardedBy; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.util.Pair; import org.forgerock.util.Reject; @@ -148,13 +147,6 @@ private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<>(); /** * The last key appended to the log. In order to keep the ordering of the keys * in the log, any attempt to append a record with a key lower or equal to * this key is rejected (no error but an event is logged). */ private K lastAppendedKey; /** * The list of non-empty cursors opened on this log. Opened cursors may have * to be updated when rotating the head log file. */ @@ -411,13 +403,11 @@ * <p> * The record must have a key strictly higher than the key * of the last record added. If it is not the case, the record is not * appended and the method returns immediately. * appended. * <p> * In order to ensure that record is written out of buffers and persisted * to file system, it is necessary to explicitly call the * {@code syncToFileSystem()} method. * <p> * This method is not thread-safe. * * @param record * The record to add. @@ -426,15 +416,6 @@ */ public void append(final Record<K, V> record) throws ChangelogException { // This check is ok outside of any locking because only the append thread updates lastAppendedKey. if (recordIsBreakingKeyOrdering(record)) { logger.info(LocalizableMessage.raw( "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record, lastAppendedKey != null ? lastAppendedKey : "null")); return; } // Fast-path - assume that no rotation is needed and use shared lock. sharedLock.lock(); try @@ -447,7 +428,6 @@ if (!mustRotate(headLogFile)) { headLogFile.append(record); lastAppendedKey = record.getKey(); return; } } @@ -465,6 +445,11 @@ return; } LogFile<K, V> headLogFile = getHeadLogFile(); if (headLogFile.appendWouldBreakKeyOrdering(record)) { // abort rotation return; } if (mustRotate(headLogFile)) { logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes())); @@ -473,7 +458,6 @@ headLogFile = getHeadLogFile(); } headLogFile.append(record); lastAppendedKey = record.getKey(); } finally { @@ -483,7 +467,7 @@ private boolean mustRotate(LogFile<K, V> headLogFile) { if (lastAppendedKey == null) if (headLogFile.getNewestRecord() == null) { // never rotate an empty file return false; @@ -509,12 +493,6 @@ return false; } /** Indicates if the provided record has a key that would break the key ordering in the log. */ private boolean recordIsBreakingKeyOrdering(final Record<K, V> record) { return lastAppendedKey != null && record.getKey().compareTo(lastAppendedKey) <= 0; } /** * Synchronize all records added with the file system, ensuring that records * are effectively persisted. @@ -1110,8 +1088,6 @@ private void openHeadLogFile() throws ChangelogException { final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser); final Record<K,V> newestRecord = head.getNewestRecord(); lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null; logFiles.put(recordParser.getMaxKey(), head); } opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -11,7 +11,7 @@ * Header, with the fields enclosed by brackets [] replaced by your own identifying * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2014-2015 ForgeRock AS. * Copyright 2014-2016 ForgeRock AS. */ package org.opends.server.replication.server.changelog.file; @@ -77,6 +77,11 @@ /** Lock used to ensure that log file is in a consistent state when reading it. */ private final Lock sharedLock; /** * The newest (last) record appended to this log file. In order to keep the ordering of the keys * in the log file, any attempt to append a record with a key lower or equal to this key is * rejected (no error but an event is logged). */ private Record<K, V> newestRecord; /** @@ -114,6 +119,7 @@ final ReadWriteLock rwLock = new ReentrantReadWriteLock(); exclusiveLock = rwLock.writeLock(); sharedLock = rwLock.readLock(); initializeNewestRecord(); } /** @@ -227,9 +233,11 @@ /** * Add the provided record at the end of this log. * <p> * In order to ensure that record is written out of buffers and persisted * to file system, it is necessary to explicitely call the * {@code syncToFileSystem()} method. * The record must have a key strictly higher than the key of the last record added. * If it is not the case, the record is not appended. * <p> * In order to ensure that record is written out of buffers and persisted to file system, it is * necessary to explicitly call the {@link #syncToFileSystem()} method. * * @param record * The record to add. @@ -242,6 +250,10 @@ exclusiveLock.lock(); try { if (appendWouldBreakKeyOrdering(record)) { return; } writer.write(record); newestRecord = record; } @@ -251,6 +263,18 @@ } } /** Indicates if the provided record has a key that would break the key ordering if appended in this file log. */ boolean appendWouldBreakKeyOrdering(final Record<K, V> record) { boolean wouldBreakOrder = newestRecord != null && record.getKey().compareTo(newestRecord.getKey()) <= 0; if (wouldBreakOrder) { logger.debug( INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER.get(logfile.getPath(), record, newestRecord.getKey())); } return wouldBreakOrder; } /** * Dump this log file as a text file, intended for debugging purpose only. * @@ -363,28 +387,29 @@ * @throws ChangelogException * If an error occurs while retrieving the record. */ Record<K, V> getNewestRecord() throws ChangelogException Record<K, V> getNewestRecord() { if (newestRecord == null) return newestRecord; } private void initializeNewestRecord() throws ChangelogException { try (BlockLogReader<K, V> reader = getReader()) { try (BlockLogReader<K, V> reader = getReader()) sharedLock.lock(); try { sharedLock.lock(); try { newestRecord = reader.getNewestRecord(); } finally { sharedLock.unlock(); } newestRecord = reader.getNewestRecord(); } catch (IOException ioe) finally { throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe); sharedLock.unlock(); } } return newestRecord; catch (IOException ioe) { throw new ChangelogException(ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD.get(logfile.getAbsolutePath()), ioe); } } /** opendj-server-legacy/src/messages/org/opends/messages/replication.properties
@@ -589,3 +589,5 @@ ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD_294=The change number could not be reset to %d because the associated \ change with CSN '%s' has already been purged from the change log. Try resetting to a more recent change ERR_REPLICATION_CHANGE_NUMBER_DISABLED_295=Change number indexing is disabled for replication domain '%s' INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER_296=Filtering out from log file '%s' the record '%s'\ because it would break ordering. Last key appended is '%s'. opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -11,7 +11,7 @@ * Header, with the fields enclosed by brackets [] replaced by your own identifying * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2014-2015 ForgeRock AS. * Copyright 2014-2016 ForgeRock AS. */ package org.opends.server.replication.server.changelog.file; @@ -267,9 +267,9 @@ { try (LogFile<String, String> writeLog = getLogFile(RECORD_PARSER)) { for (int i = 1; i <= 100; i++) for (int i = 1; i <= 90; i++) { Record<String, String> record = Record.from("newkey" + i, "newvalue" + i); Record<String, String> record = Record.from(String.format("newkey%02d", i), "newvalue" + i); writeLog.append(record); assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record); assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));