From 8fdf1768757fba933e7ce63ac6381eacec41f0c6 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 09 Dec 2015 17:41:58 +0000
Subject: [PATCH] OPENDJ-2476: Purge of file-based changelog is very slow and the changelog size is growing.
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java | 110 +++++++++++++--
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java | 255 ++++++++++++++++++++----------------
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java | 44 -----
opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java | 4
4 files changed, 243 insertions(+), 170 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java b/opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java
index 803284b..b8e95f5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2014 ForgeRock AS
+ * Portions Copyright 2014-2015 ForgeRock AS
*/
package org.opends.server.loggers;
@@ -37,7 +37,7 @@
public final class MeteredStream extends OutputStream
{
OutputStream out;
- long written;
+ volatile long written;
/**
* Create the stream wrapped around the specified output
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
index 52e2785..b072dfc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -187,14 +187,12 @@
private long lastRotationTime;
/**
- * The exclusive lock used for writes and lifecycle operations on this log:
+ * The exclusive lock used for log rotation and lifecycle operations on this log:
* initialize, clear, sync and close.
*/
private final Lock exclusiveLock;
- /**
- * The shared lock used for reads and cursor operations on this log.
- */
+ /** The shared lock used for write operations and accessing {@link #logFiles} map. */
private final Lock sharedLock;
/**
@@ -416,8 +414,10 @@
* appended and the method returns immediately.
* <p>
* In order to ensure that record is written out of buffers and persisted
- * to file system, it is necessary to explicitely call the
+ * to file system, it is necessary to explicitly call the
* {@code syncToFileSystem()} method.
+ * <p>
+ * This method is not thread-safe.
*
* @param record
* The record to add.
@@ -426,22 +426,42 @@
*/
public void append(final Record<K, V> record) throws ChangelogException
{
- // If this exclusive lock happens to be a bottleneck :
- // 1. use a shared lock for appending the record first
- // 2. switch to an exclusive lock only if rotation is needed
- // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
- exclusiveLock.lock();
+ // This check is ok outside of any locking because only the append thread updates lastAppendedKey.
+ if (recordIsBreakingKeyOrdering(record))
+ {
+ logger.info(LocalizableMessage.raw(
+ "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
+ lastAppendedKey != null ? lastAppendedKey : "null"));
+ return;
+ }
+
+ // Fast-path - assume that no rotation is needed and use shared lock.
+ sharedLock.lock();
try
{
if (isClosed)
{
return;
}
- if (recordIsBreakingKeyOrdering(record))
+ LogFile<K, V> headLogFile = getHeadLogFile();
+ if (!mustRotate(headLogFile))
{
- logger.info(LocalizableMessage.raw(
- "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
- lastAppendedKey != null ? lastAppendedKey : "null"));
+ headLogFile.append(record);
+ lastAppendedKey = record.getKey();
+ return;
+ }
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
+
+ // Slow-path - rotation is needed so use exclusive lock.
+ exclusiveLock.lock();
+ try
+ {
+ if (isClosed)
+ {
return;
}
LogFile<K, V> headLogFile = getHeadLogFile();
@@ -837,9 +857,17 @@
*/
void dumpAsTextFile(File dumpDirectory) throws ChangelogException
{
- for (LogFile<K, V> logFile : logFiles.values())
+ sharedLock.lock();
+ try
{
- logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+ for (LogFile<K, V> logFile : logFiles.values())
+ {
+ logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+ }
+ }
+ finally
+ {
+ sharedLock.unlock();
}
}
@@ -892,19 +920,19 @@
sharedLock.lock();
try
{
- K key = null;
- for (LogFile<K, V> logFile : logFiles.values())
- {
- final Record<K, V> record = logFile.getOldestRecord();
- final V2 oldestValue = mapper.map(record.getValue());
- if (oldestValue.compareTo(limitValue) > 0)
+ K key = null;
+ for (LogFile<K, V> logFile : logFiles.values())
{
- return key;
+ final Record<K, V> record = logFile.getOldestRecord();
+ final V2 oldestValue = mapper.map(record.getValue());
+ if (oldestValue.compareTo(limitValue) > 0)
+ {
+ return key;
+ }
+ key = record.getKey();
}
- key = record.getKey();
+ return key;
}
- return key;
- }
finally
{
sharedLock.unlock();
@@ -950,6 +978,7 @@
* <p>
* All cursors opened on this log are temporarily disabled (closing underlying resources)
* and then re-open with their previous state.
+ * @GuardedBy("exclusiveLock")
*/
private void rotateHeadLogFile() throws ChangelogException
{
@@ -1028,7 +1057,10 @@
+ recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
}
- /** Update the cursors that were pointing to head after a rotation of the head log file. */
+ /**
+ * Update the cursors that were pointing to head after a rotation of the head log file.
+ * @GuardedBy("exclusiveLock")
+ */
private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
throws ChangelogException
{
@@ -1040,7 +1072,7 @@
if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
{
final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
- final LogFile<K, V> logFile = findLogFileFor(previousKey);
+ final LogFile<K, V> logFile = findLogFileFor(previousKey, KeyMatchingStrategy.EQUAL_TO_KEY);
final AbortableLogCursor<K, V> cursor = pair.getFirst();
cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
}
@@ -1058,6 +1090,7 @@
/**
* Disable the cursors opened on the head log file log, by closing their underlying cursor.
* Returns the state of each cursor just before the close operation.
+ * @GuardedBy("exclusiveLock")
*
* @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
* @throws ChangelogException
@@ -1111,12 +1144,20 @@
*/
private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
{
- if (isHeadLogFile(currentLogFile))
+ sharedLock.lock();
+ try
{
- return null;
+ if (isHeadLogFile(currentLogFile))
+ {
+ return null;
+ }
+ final Pair<K, K> bounds = getKeyBounds(currentLogFile);
+ return logFiles.higherEntry(bounds.getSecond()).getValue();
}
- final Pair<K, K> bounds = getKeyBounds(currentLogFile);
- return logFiles.higherEntry(bounds.getSecond()).getValue();
+ finally
+ {
+ sharedLock.unlock();
+ }
}
private boolean isHeadLogFile(final LogFile<K, V> logFile)
@@ -1124,14 +1165,23 @@
return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
}
- /** Returns the log file that should contain the provided key. */
- private LogFile<K, V> findLogFileFor(final K key)
+ /** @GuardedBy("sharedLock") */
+ private LogFile<K, V> findLogFileFor(final K key, KeyMatchingStrategy keyMatchingStrategy) throws ChangelogException
{
if (key == null || logFiles.lowerKey(key) == null)
{
return getOldestLogFile();
}
- return logFiles.ceilingEntry(key).getValue();
+
+ final LogFile<K, V> candidate = logFiles.ceilingEntry(key).getValue();
+ if (KeyMatchingStrategy.LESS_THAN_OR_EQUAL_TO_KEY.equals(keyMatchingStrategy)
+ && candidate.getOldestRecord().getKey().compareTo(key) > 0)
+ {
+ // This handle the special case where the first key of the candidate is actually greater than the expected one.
+ // We have to return the previous logfile in order to match the LESS_THAN_OR_EQUAL_TO_KEY matching strategy.
+ return logFiles.floorEntry(key).getValue();
+ }
+ return candidate;
}
/**
@@ -1217,18 +1267,28 @@
@Override
public boolean next() throws ChangelogException
{
- final boolean hasNext = currentCursor.next();
- if (hasNext)
+ // Lock is needed here to ensure that log rotation is performed atomically.
+ // This ensures that currentCursor will not be aborted concurrently.
+ log.sharedLock.lock();
+ try
{
- return true;
+ final boolean hasNext = currentCursor.next();
+ if (hasNext)
+ {
+ return true;
+ }
+ final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+ if (logFile != null)
+ {
+ switchToLogFile(logFile);
+ return currentCursor.next();
+ }
+ return false;
}
- final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
- if (logFile != null)
+ finally
{
- switchToLogFile(logFile);
- return currentCursor.next();
+ log.sharedLock.unlock();
}
- return false;
}
@Override
@@ -1244,19 +1304,38 @@
final PositionStrategy positionStrategy)
throws ChangelogException
{
- final LogFile<K, V> logFile = log.findLogFileFor(key);
- if (logFile != currentLogFile)
+ // Lock is needed here to ensure that log rotation is performed atomically.
+ // This ensures that currentLogFile will not be closed concurrently.
+ log.sharedLock.lock();
+ try
{
- switchToLogFile(logFile);
+ final LogFile<K, V> logFile = log.findLogFileFor(key, matchStrategy);
+ if (logFile != currentLogFile)
+ {
+ switchToLogFile(logFile);
+ }
+ return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
}
- return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
+ finally
+ {
+ log.sharedLock.unlock();
+ }
}
- /** Returns the state of this cursor. */
@Override
CursorState<K, V> getState() throws ChangelogException
{
- return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+ // Lock is needed here to ensure that log rotation is performed atomically.
+ // This ensures that currentCursor will not be aborted concurrently.
+ log.sharedLock.lock();
+ try
+ {
+ return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+ }
+ finally
+ {
+ log.sharedLock.unlock();
+ }
}
@Override
@@ -1442,65 +1521,33 @@
@Override
public Record<K, V> getRecord()
{
- log.sharedLock.lock();
- try
- {
- return delegate.getRecord();
- }
- finally
- {
- log.sharedLock.unlock();
- }
+ return delegate.getRecord();
}
@Override
- public boolean next() throws ChangelogException
+ public synchronized boolean next() throws ChangelogException
{
- log.sharedLock.lock();
- try
+ if (mustAbort)
{
- if (mustAbort)
- {
- delegate.close();
- delegate = new AbortedLogCursor<>(log.getPath());
- mustAbort = false;
- }
- return delegate.next();
+ delegate.close();
+ delegate = new AbortedLogCursor<>(log.getPath());
+ mustAbort = false;
}
- finally
- {
- log.sharedLock.unlock();
- }
+ return delegate.next();
}
@Override
public void close()
{
- log.sharedLock.lock();
- try
- {
- delegate.close();
- log.unregisterCursor(this);
- }
- finally
- {
- log.sharedLock.unlock();
- }
+ delegate.close();
+ log.unregisterCursor(this);
}
@Override
public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
throws ChangelogException
{
- log.sharedLock.lock();
- try
- {
- return delegate.positionTo(key, matchStrategy, positionStrategy);
- }
- finally
- {
- log.sharedLock.unlock();
- }
+ return delegate.positionTo(key, matchStrategy, positionStrategy);
}
/**
@@ -1509,49 +1556,33 @@
* <p>
* This method is called only when log.exclusiveLock has been acquired.
*/
- void abort()
+ synchronized void abort()
{
mustAbort = true;
}
- /**
- * {@inheritDoc}
- * <p>
- * This method is called only when log.exclusiveLock has been acquired.
- */
+ /** @GuardedBy("exclusiveLock") */
@Override
CursorState<K, V> getState() throws ChangelogException
{
return delegate.getState();
}
- /**
- * {@inheritDoc}
- * <p>
- * This method is called only when log.exclusiveLock has been acquired.
- */
+ /** @GuardedBy("exclusiveLock") */
@Override
void closeUnderlyingCursor()
{
delegate.closeUnderlyingCursor();
}
- /**
- * {@inheritDoc}
- * <p>
- * This method is called only when log.exclusiveLock has been acquired.
- */
+ /** @GuardedBy("exclusiveLock") */
@Override
void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
{
delegate.reinitializeTo(cursorState);
}
- /**
- * {@inheritDoc}
- * <p>
- * This method is called only when log.exclusiveLock has been acquired.
- */
+ /** @GuardedBy("exclusiveLock") */
@Override
boolean isAccessingLogFile(LogFile<K, V> logFile)
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
index 16921a1..cfe86b5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -33,6 +33,9 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -52,9 +55,7 @@
* </ul>
* <p>
* A log file is NOT intended to be used directly, but only has part of a
- * {@link Log}. In particular, there is no concurrency management and no checks
- * to ensure that log is not closed when performing any operation on it. Those
- * are managed at the {@code Log} level.
+ * {@link Log}.
*
* @param <K>
* Type of the key of a record, which must be comparable.
@@ -80,6 +81,12 @@
/** Indicates if log is enabled for write. */
private final boolean isWriteEnabled;
+ /** Lock used to ensure write atomicity. */
+ private final Lock exclusiveLock;
+
+ /** Lock used to ensure that log file is in a consistent state when reading it. */
+ private final Lock sharedLock;
+
private Record<K, V> newestRecord;
/**
@@ -113,6 +120,10 @@
writer = null;
}
readerPool = new LogReaderPool<>(logfile, parser);
+
+ final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ exclusiveLock = rwLock.writeLock();
+ sharedLock = rwLock.readLock();
}
/**
@@ -204,8 +215,8 @@
*/
private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
{
- try(final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
- final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
+ try (final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
+ final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
{
final long lastValidPosition = reader.checkLogIsValid();
if (lastValidPosition != -1)
@@ -238,8 +249,16 @@
void append(final Record<K, V> record) throws ChangelogException
{
checkLogIsEnabledForWrite();
- writer.write(record);
- newestRecord = record;
+ exclusiveLock.lock();
+ try
+ {
+ writer.write(record);
+ newestRecord = record;
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
}
/**
@@ -252,8 +271,8 @@
*/
void dumpAsTextFile(File dumpFile) throws ChangelogException
{
- try(final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
- final DBCursor<Record<K, V>> cursor = getCursor())
+ try (final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
+ final DBCursor<Record<K, V>> cursor = getCursor())
{
while (cursor.getRecord() != null)
{
@@ -287,6 +306,7 @@
void syncToFileSystem() throws ChangelogException
{
checkLogIsEnabledForWrite();
+ sharedLock.lock();
try
{
writer.sync();
@@ -295,6 +315,10 @@
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
}
+ finally
+ {
+ sharedLock.unlock();
+ }
}
/**
@@ -355,7 +379,15 @@
{
try (BlockLogReader<K, V> reader = getReader())
{
- newestRecord = reader.getNewestRecord();
+ sharedLock.lock();
+ try
+ {
+ newestRecord = reader.getNewestRecord();
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
}
catch (IOException ioe)
{
@@ -375,7 +407,7 @@
long getNumberOfRecords() throws ChangelogException
{
// TODO : need a more efficient way to retrieve it
- try(final DBCursor<Record<K, V>> cursor = getCursor())
+ try (final DBCursor<Record<K, V>> cursor = getCursor())
{
long counter = 0L;
while (cursor.next())
@@ -414,9 +446,18 @@
*/
void delete() throws ChangelogException
{
- if (!logfile.delete())
+ exclusiveLock.lock();
+ try
{
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
+ final boolean isDeleted = logfile.delete();
+ if (!isDeleted)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
+ }
+ }
+ finally
+ {
+ exclusiveLock.unlock();
}
}
@@ -517,14 +558,22 @@
* pointing to the provided file position.
* <p>
* Note: there is no check to ensure that provided record and file position are
- * consistent. It is the responsability of the caller of this method.
+ * consistent. It is the responsibility of the caller of this method.
*/
private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
{
this.logFile = logFile;
this.reader = logFile.getReader();
this.currentRecord = record;
- reader.seekToPosition(filePosition);
+ logFile.sharedLock.lock();
+ try
+ {
+ reader.seekToPosition(filePosition);
+ }
+ finally
+ {
+ logFile.sharedLock.unlock();
+ }
}
/** {@inheritDoc} */
@@ -538,7 +587,15 @@
initialRecord = null;
return true;
}
- currentRecord = reader.readRecord();
+ logFile.sharedLock.lock();
+ try
+ {
+ currentRecord = reader.readRecord();
+ }
+ finally
+ {
+ logFile.sharedLock.unlock();
+ }
return currentRecord != null;
}
@@ -553,7 +610,16 @@
@Override
public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
throws ChangelogException {
- final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
+ final Pair<Boolean, Record<K, V>> result;
+ logFile.sharedLock.lock();
+ try
+ {
+ result = reader.seekToRecord(key, match, pos);
+ }
+ finally
+ {
+ logFile.sharedLock.unlock();
+ }
final boolean found = result.getFirst();
initialRecord = found ? result.getSecond() : null;
return found;
@@ -575,7 +641,15 @@
*/
long getFilePosition() throws ChangelogException
{
- return reader.getFilePosition();
+ logFile.sharedLock.lock();
+ try
+ {
+ return reader.getFilePosition();
+ }
+ finally
+ {
+ logFile.sharedLock.unlock();
+ }
}
/** {@inheritDoc} */
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
index 89de646..4a9d34f 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -187,13 +187,13 @@
{ "key010", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
{ "key011", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
- { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 10 },
- { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
- { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
+ { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 10 },
+ { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
+ { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
{ "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
- { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
- { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
- { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
+ { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
};
}
@@ -277,38 +277,6 @@
}
}
- @Test
- public void testTwoConcurrentWrite() throws Exception
- {
- try (Log<String, String> writeLog1 = openLog(LogFileTest.RECORD_PARSER);
- Log<String, String> writeLog2 = openLog(LogFileTest.RECORD_PARSER))
- {
- writeLog1.append(Record.from("key020", "starting record"));
- AtomicReference<ChangelogException> exceptionRef = new AtomicReference<>();
- Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef);
- Thread write2 = getWriteLogThread(writeLog2, "b", exceptionRef);
- write1.run();
- write2.run();
-
- write1.join();
- write2.join();
- if (exceptionRef.get() != null)
- {
- throw exceptionRef.get();
- }
- writeLog1.syncToFileSystem();
-
- try (DBCursor<Record<String, String>> cursor = writeLog1.getCursor("key020"))
- {
- for (int i = 1; i <= 61; i++)
- {
- assertThat(cursor.next()).isTrue();
- }
- assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
- }
- }
- }
-
/**
* This test should be disabled.
* Enable it locally when you need to have an rough idea of write performance.
--
Gitblit v1.10.0