From 8fdf1768757fba933e7ce63ac6381eacec41f0c6 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 09 Dec 2015 17:41:58 +0000
Subject: [PATCH] OPENDJ-2476: Purge of file-based changelog is very slow and the changelog size is growing.

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java |  110 +++++++++++++--
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java     |  255 ++++++++++++++++++++----------------
 opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java |   44 -----
 opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java                     |    4 
 4 files changed, 243 insertions(+), 170 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java b/opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java
index 803284b..b8e95f5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/loggers/MeteredStream.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
- *      Portions Copyright 2014 ForgeRock AS
+ *      Portions Copyright 2014-2015 ForgeRock AS
  */
 package org.opends.server.loggers;
 
@@ -37,7 +37,7 @@
 public final class MeteredStream extends OutputStream
 {
   OutputStream out;
-  long written;
+  volatile long written;
 
   /**
    * Create the stream wrapped around the specified output
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
index 52e2785..b072dfc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/Log.java
@@ -187,14 +187,12 @@
   private long lastRotationTime;
 
   /**
-   * The exclusive lock used for writes and lifecycle operations on this log:
+   * The exclusive lock used for log rotation and lifecycle operations on this log:
    * initialize, clear, sync and close.
    */
   private final Lock exclusiveLock;
 
-  /**
-   * The shared lock used for reads and cursor operations on this log.
-   */
+  /** The shared lock used for write operations and accessing {@link #logFiles} map. */
   private final Lock sharedLock;
 
   /**
@@ -416,8 +414,10 @@
    * appended and the method returns immediately.
    * <p>
    * In order to ensure that record is written out of buffers and persisted
-   * to file system, it is necessary to explicitely call the
+   * to file system, it is necessary to explicitly call the
    * {@code syncToFileSystem()} method.
+   * <p>
+   * This method is not thread-safe.
    *
    * @param record
    *          The record to add.
@@ -426,22 +426,42 @@
    */
   public void append(final Record<K, V> record) throws ChangelogException
   {
-    // If this exclusive lock happens to be a bottleneck :
-    // 1. use a shared lock for appending the record first
-    // 2. switch to an exclusive lock only if rotation is needed
-    // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
-    exclusiveLock.lock();
+    // This check is ok outside of any locking because only the append thread updates lastAppendedKey.
+    if (recordIsBreakingKeyOrdering(record))
+    {
+      logger.info(LocalizableMessage.raw(
+              "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
+              lastAppendedKey != null ? lastAppendedKey : "null"));
+      return;
+    }
+
+    // Fast-path - assume that no rotation is needed and use shared lock.
+    sharedLock.lock();
     try
     {
       if (isClosed)
       {
         return;
       }
-      if (recordIsBreakingKeyOrdering(record))
+      LogFile<K, V> headLogFile = getHeadLogFile();
+      if (!mustRotate(headLogFile))
       {
-        logger.info(LocalizableMessage.raw(
-            "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
-            lastAppendedKey != null ? lastAppendedKey : "null"));
+        headLogFile.append(record);
+        lastAppendedKey = record.getKey();
+        return;
+      }
+    }
+    finally
+    {
+      sharedLock.unlock();
+    }
+
+    // Slow-path - rotation is needed so use exclusive lock.
+    exclusiveLock.lock();
+    try
+    {
+      if (isClosed)
+      {
         return;
       }
       LogFile<K, V> headLogFile = getHeadLogFile();
@@ -837,9 +857,17 @@
    */
   void dumpAsTextFile(File dumpDirectory) throws ChangelogException
   {
-    for (LogFile<K, V> logFile : logFiles.values())
+    sharedLock.lock();
+    try
     {
-      logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+      for (LogFile<K, V> logFile : logFiles.values())
+      {
+        logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+      }
+    }
+    finally
+    {
+      sharedLock.unlock();
     }
   }
 
@@ -892,19 +920,19 @@
     sharedLock.lock();
     try
     {
-    K key = null;
-    for (LogFile<K, V> logFile : logFiles.values())
-    {
-      final Record<K, V> record = logFile.getOldestRecord();
-      final V2 oldestValue = mapper.map(record.getValue());
-      if (oldestValue.compareTo(limitValue) > 0)
+      K key = null;
+      for (LogFile<K, V> logFile : logFiles.values())
       {
-        return key;
+        final Record<K, V> record = logFile.getOldestRecord();
+        final V2 oldestValue = mapper.map(record.getValue());
+        if (oldestValue.compareTo(limitValue) > 0)
+        {
+          return key;
+        }
+        key = record.getKey();
       }
-      key = record.getKey();
+      return key;
     }
-    return key;
-  }
     finally
     {
       sharedLock.unlock();
@@ -950,6 +978,7 @@
    * <p>
    * All cursors opened on this log are temporarily disabled (closing underlying resources)
    * and then re-open with their previous state.
+   * @GuardedBy("exclusiveLock")
    */
   private void rotateHeadLogFile() throws ChangelogException
   {
@@ -1028,7 +1057,10 @@
        + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
   }
 
-  /** Update the cursors that were pointing to head after a rotation of the head log file. */
+  /**
+   * Update the cursors that were pointing to head after a rotation of the head log file.
+   * @GuardedBy("exclusiveLock")
+   */
   private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors)
       throws ChangelogException
   {
@@ -1040,7 +1072,7 @@
       if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
       {
         final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
-        final LogFile<K, V> logFile = findLogFileFor(previousKey);
+        final LogFile<K, V> logFile = findLogFileFor(previousKey, KeyMatchingStrategy.EQUAL_TO_KEY);
         final AbortableLogCursor<K, V> cursor = pair.getFirst();
         cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
       }
@@ -1058,6 +1090,7 @@
   /**
    * Disable the cursors opened on the head log file log, by closing their underlying cursor.
    * Returns the state of each cursor just before the close operation.
+   * @GuardedBy("exclusiveLock")
    *
    * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
    * @throws ChangelogException
@@ -1111,12 +1144,20 @@
    */
   private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
   {
-    if (isHeadLogFile(currentLogFile))
+    sharedLock.lock();
+    try
     {
-      return null;
+      if (isHeadLogFile(currentLogFile))
+      {
+        return null;
+      }
+      final Pair<K, K> bounds = getKeyBounds(currentLogFile);
+      return logFiles.higherEntry(bounds.getSecond()).getValue();
     }
-    final Pair<K, K> bounds = getKeyBounds(currentLogFile);
-    return logFiles.higherEntry(bounds.getSecond()).getValue();
+    finally
+    {
+      sharedLock.unlock();
+    }
   }
 
   private boolean isHeadLogFile(final LogFile<K, V> logFile)
@@ -1124,14 +1165,23 @@
     return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
   }
 
-  /** Returns the log file that should contain the provided key. */
-  private LogFile<K, V> findLogFileFor(final K key)
+  /** @GuardedBy("sharedLock") */
+  private LogFile<K, V> findLogFileFor(final K key, KeyMatchingStrategy keyMatchingStrategy) throws ChangelogException
   {
     if (key == null || logFiles.lowerKey(key) == null)
     {
       return getOldestLogFile();
     }
-    return logFiles.ceilingEntry(key).getValue();
+
+    final LogFile<K, V> candidate = logFiles.ceilingEntry(key).getValue();
+    if (KeyMatchingStrategy.LESS_THAN_OR_EQUAL_TO_KEY.equals(keyMatchingStrategy)
+        && candidate.getOldestRecord().getKey().compareTo(key) > 0)
+    {
+      // This handle the special case where the first key of the candidate is actually greater than the expected one.
+      // We have to return the previous logfile in order to match the LESS_THAN_OR_EQUAL_TO_KEY matching strategy.
+      return logFiles.floorEntry(key).getValue();
+    }
+    return candidate;
   }
 
   /**
@@ -1217,18 +1267,28 @@
     @Override
     public boolean next() throws ChangelogException
     {
-      final boolean hasNext = currentCursor.next();
-      if (hasNext)
+      // Lock is needed here to ensure that log rotation is performed atomically.
+      // This ensures that currentCursor will not be aborted concurrently.
+      log.sharedLock.lock();
+      try
       {
-        return true;
+        final boolean hasNext = currentCursor.next();
+        if (hasNext)
+        {
+          return true;
+        }
+        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+        if (logFile != null)
+        {
+          switchToLogFile(logFile);
+          return currentCursor.next();
+        }
+        return false;
       }
-      final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
-      if (logFile != null)
+      finally
       {
-        switchToLogFile(logFile);
-        return currentCursor.next();
+        log.sharedLock.unlock();
       }
-      return false;
     }
 
     @Override
@@ -1244,19 +1304,38 @@
         final PositionStrategy positionStrategy)
             throws ChangelogException
     {
-      final LogFile<K, V> logFile = log.findLogFileFor(key);
-      if (logFile != currentLogFile)
+      // Lock is needed here to ensure that log rotation is performed atomically.
+      // This ensures that currentLogFile will not be closed concurrently.
+      log.sharedLock.lock();
+      try
       {
-        switchToLogFile(logFile);
+        final LogFile<K, V> logFile = log.findLogFileFor(key, matchStrategy);
+        if (logFile != currentLogFile)
+        {
+          switchToLogFile(logFile);
+        }
+        return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
       }
-      return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
+      finally
+      {
+        log.sharedLock.unlock();
+      }
     }
 
-    /** Returns the state of this cursor. */
     @Override
     CursorState<K, V> getState() throws ChangelogException
     {
-      return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+      // Lock is needed here to ensure that log rotation is performed atomically.
+      // This ensures that currentCursor will not be aborted concurrently.
+      log.sharedLock.lock();
+      try
+      {
+        return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
     }
 
     @Override
@@ -1442,65 +1521,33 @@
     @Override
     public Record<K, V> getRecord()
     {
-      log.sharedLock.lock();
-      try
-      {
-        return delegate.getRecord();
-      }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      return delegate.getRecord();
     }
 
     @Override
-    public boolean next() throws ChangelogException
+    public synchronized boolean next() throws ChangelogException
     {
-      log.sharedLock.lock();
-      try
+      if (mustAbort)
       {
-        if (mustAbort)
-        {
-          delegate.close();
-          delegate = new AbortedLogCursor<>(log.getPath());
-          mustAbort = false;
-        }
-        return delegate.next();
+        delegate.close();
+        delegate = new AbortedLogCursor<>(log.getPath());
+        mustAbort = false;
       }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      return delegate.next();
     }
 
     @Override
     public void close()
     {
-      log.sharedLock.lock();
-      try
-      {
-        delegate.close();
-        log.unregisterCursor(this);
-      }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      delegate.close();
+      log.unregisterCursor(this);
     }
 
     @Override
     public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
         throws ChangelogException
     {
-      log.sharedLock.lock();
-      try
-      {
-        return delegate.positionTo(key, matchStrategy, positionStrategy);
-      }
-      finally
-      {
-        log.sharedLock.unlock();
-      }
+      return delegate.positionTo(key, matchStrategy, positionStrategy);
     }
 
     /**
@@ -1509,49 +1556,33 @@
      * <p>
      * This method is called only when log.exclusiveLock has been acquired.
      */
-    void abort()
+    synchronized void abort()
     {
       mustAbort = true;
     }
 
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This method is called only when log.exclusiveLock has been acquired.
-     */
+    /** @GuardedBy("exclusiveLock") */
     @Override
     CursorState<K, V> getState() throws ChangelogException
     {
       return delegate.getState();
     }
 
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This method is called only when log.exclusiveLock has been acquired.
-     */
+    /** @GuardedBy("exclusiveLock") */
     @Override
     void closeUnderlyingCursor()
     {
       delegate.closeUnderlyingCursor();
     }
 
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This method is called only when log.exclusiveLock has been acquired.
-     */
+    /** @GuardedBy("exclusiveLock") */
     @Override
     void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
     {
       delegate.reinitializeTo(cursorState);
     }
 
-    /**
-     * {@inheritDoc}
-     * <p>
-     * This method is called only when log.exclusiveLock has been acquired.
-     */
+    /** @GuardedBy("exclusiveLock") */
     @Override
     boolean isAccessingLogFile(LogFile<K, V> logFile)
     {
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
index 16921a1..cfe86b5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -33,6 +33,9 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -52,9 +55,7 @@
  * </ul>
  * <p>
  * A log file is NOT intended to be used directly, but only has part of a
- * {@link Log}. In particular, there is no concurrency management and no checks
- * to ensure that log is not closed when performing any operation on it. Those
- * are managed at the {@code Log} level.
+ * {@link Log}.
  *
  * @param <K>
  *          Type of the key of a record, which must be comparable.
@@ -80,6 +81,12 @@
   /** Indicates if log is enabled for write. */
   private final boolean isWriteEnabled;
 
+  /** Lock used to ensure write atomicity. */
+  private final Lock exclusiveLock;
+
+  /** Lock used to ensure that log file is in a consistent state when reading it. */
+  private final Lock sharedLock;
+
   private Record<K, V> newestRecord;
 
   /**
@@ -113,6 +120,10 @@
       writer = null;
     }
     readerPool = new LogReaderPool<>(logfile, parser);
+
+    final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    exclusiveLock = rwLock.writeLock();
+    sharedLock = rwLock.readLock();
   }
 
   /**
@@ -204,8 +215,8 @@
    */
   private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
   {
-    try(final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
-        final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
+    try (final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
+         final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
     {
       final long lastValidPosition = reader.checkLogIsValid();
       if (lastValidPosition != -1)
@@ -238,8 +249,16 @@
   void append(final Record<K, V> record) throws ChangelogException
   {
     checkLogIsEnabledForWrite();
-    writer.write(record);
-    newestRecord = record;
+    exclusiveLock.lock();
+    try
+    {
+      writer.write(record);
+      newestRecord = record;
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
   }
 
   /**
@@ -252,8 +271,8 @@
    */
   void dumpAsTextFile(File dumpFile) throws ChangelogException
   {
-    try(final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
-        final DBCursor<Record<K, V>> cursor = getCursor())
+    try (final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
+         final DBCursor<Record<K, V>> cursor = getCursor())
     {
       while (cursor.getRecord() != null)
       {
@@ -287,6 +306,7 @@
   void syncToFileSystem() throws ChangelogException
   {
     checkLogIsEnabledForWrite();
+    sharedLock.lock();
     try
     {
       writer.sync();
@@ -295,6 +315,10 @@
     {
       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
     }
+    finally
+    {
+      sharedLock.unlock();
+    }
   }
 
   /**
@@ -355,7 +379,15 @@
     {
       try (BlockLogReader<K, V> reader = getReader())
       {
-        newestRecord = reader.getNewestRecord();
+        sharedLock.lock();
+        try
+        {
+          newestRecord = reader.getNewestRecord();
+        }
+        finally
+        {
+          sharedLock.unlock();
+        }
       }
       catch (IOException ioe)
       {
@@ -375,7 +407,7 @@
   long getNumberOfRecords() throws ChangelogException
   {
     // TODO  : need a more efficient way to retrieve it
-    try(final DBCursor<Record<K, V>> cursor = getCursor())
+    try (final DBCursor<Record<K, V>> cursor = getCursor())
     {
       long counter = 0L;
       while (cursor.next())
@@ -414,9 +446,18 @@
    */
   void delete() throws ChangelogException
   {
-    if (!logfile.delete())
+    exclusiveLock.lock();
+    try
     {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
+      final boolean isDeleted = logfile.delete();
+      if (!isDeleted)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
+      }
+    }
+    finally
+    {
+      exclusiveLock.unlock();
     }
   }
 
@@ -517,14 +558,22 @@
      * pointing to the provided file position.
      * <p>
      * Note: there is no check to ensure that provided record and file position are
-     * consistent. It is the responsability of the caller of this method.
+     * consistent. It is the responsibility of the caller of this method.
      */
     private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
     {
       this.logFile = logFile;
       this.reader = logFile.getReader();
       this.currentRecord = record;
-      reader.seekToPosition(filePosition);
+      logFile.sharedLock.lock();
+      try
+      {
+        reader.seekToPosition(filePosition);
+      }
+      finally
+      {
+        logFile.sharedLock.unlock();
+      }
     }
 
     /** {@inheritDoc} */
@@ -538,7 +587,15 @@
         initialRecord = null;
         return true;
       }
-      currentRecord = reader.readRecord();
+      logFile.sharedLock.lock();
+      try
+      {
+        currentRecord = reader.readRecord();
+      }
+      finally
+      {
+        logFile.sharedLock.unlock();
+      }
       return currentRecord != null;
     }
 
@@ -553,7 +610,16 @@
     @Override
     public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
         throws ChangelogException {
-      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
+      final Pair<Boolean, Record<K, V>> result;
+      logFile.sharedLock.lock();
+      try
+      {
+        result = reader.seekToRecord(key, match, pos);
+      }
+      finally
+      {
+        logFile.sharedLock.unlock();
+      }
       final boolean found = result.getFirst();
       initialRecord = found ? result.getSecond() : null;
       return found;
@@ -575,7 +641,15 @@
      */
     long getFilePosition() throws ChangelogException
     {
-      return reader.getFilePosition();
+      logFile.sharedLock.lock();
+      try
+      {
+        return reader.getFilePosition();
+      }
+      finally
+      {
+        logFile.sharedLock.unlock();
+      }
     }
 
     /** {@inheritDoc} */
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
index 89de646..4a9d34f 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -187,13 +187,13 @@
       { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
       { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
 
-      { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 10 },
-      { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
-      { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
+      { "key000", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, 1, 10 },
+      { "key001", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, 2, 10 },
+      { "key004", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, 5, 10 },
       { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
-      { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
-      { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
-      { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+      { "key009", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, 10, 10 },
+      { "key010", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, -1, -1 },
+      { "key011", GREATER_THAN_OR_EQUAL_TO_KEY,  AFTER_MATCHING_KEY, -1, -1 },
     };
   }
 
@@ -277,38 +277,6 @@
     }
   }
 
-  @Test
-  public void testTwoConcurrentWrite() throws Exception
-  {
-    try (Log<String, String> writeLog1 = openLog(LogFileTest.RECORD_PARSER);
-        Log<String, String> writeLog2 = openLog(LogFileTest.RECORD_PARSER))
-    {
-      writeLog1.append(Record.from("key020", "starting record"));
-      AtomicReference<ChangelogException> exceptionRef = new AtomicReference<>();
-      Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef);
-      Thread write2 = getWriteLogThread(writeLog2, "b", exceptionRef);
-      write1.run();
-      write2.run();
-
-      write1.join();
-      write2.join();
-      if (exceptionRef.get() != null)
-      {
-        throw exceptionRef.get();
-      }
-      writeLog1.syncToFileSystem();
-
-      try (DBCursor<Record<String, String>> cursor = writeLog1.getCursor("key020"))
-      {
-        for (int i = 1; i <= 61; i++)
-        {
-          assertThat(cursor.next()).isTrue();
-        }
-        assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
-      }
-    }
-  }
-
   /**
    *  This test should be disabled.
    *  Enable it locally when you need to have an rough idea of write performance.

--
Gitblit v1.10.0