From 8fdf1768757fba933e7ce63ac6381eacec41f0c6 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 09 Dec 2015 17:41:58 +0000
Subject: [PATCH] OPENDJ-2476: Purge of file-based changelog is very slow and the changelog size is growing.

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java |  110 ++++++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 92 insertions(+), 18 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
index 16921a1..cfe86b5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -33,6 +33,9 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -52,9 +55,7 @@
  * </ul>
  * <p>
  * A log file is NOT intended to be used directly, but only has part of a
- * {@link Log}. In particular, there is no concurrency management and no checks
- * to ensure that log is not closed when performing any operation on it. Those
- * are managed at the {@code Log} level.
+ * {@link Log}.
  *
  * @param <K>
  *          Type of the key of a record, which must be comparable.
@@ -80,6 +81,12 @@
   /** Indicates if log is enabled for write. */
   private final boolean isWriteEnabled;
 
+  /** Lock used to ensure write atomicity. */
+  private final Lock exclusiveLock;
+
+  /** Lock used to ensure that log file is in a consistent state when reading it. */
+  private final Lock sharedLock;
+
   private Record<K, V> newestRecord;
 
   /**
@@ -113,6 +120,10 @@
       writer = null;
     }
     readerPool = new LogReaderPool<>(logfile, parser);
+
+    final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    exclusiveLock = rwLock.writeLock();
+    sharedLock = rwLock.readLock();
   }
 
   /**
@@ -204,8 +215,8 @@
    */
   private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
   {
-    try(final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
-        final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
+    try (final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
+         final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
     {
       final long lastValidPosition = reader.checkLogIsValid();
       if (lastValidPosition != -1)
@@ -238,8 +249,16 @@
   void append(final Record<K, V> record) throws ChangelogException
   {
     checkLogIsEnabledForWrite();
-    writer.write(record);
-    newestRecord = record;
+    exclusiveLock.lock();
+    try
+    {
+      writer.write(record);
+      newestRecord = record;
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
   }
 
   /**
@@ -252,8 +271,8 @@
    */
   void dumpAsTextFile(File dumpFile) throws ChangelogException
   {
-    try(final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
-        final DBCursor<Record<K, V>> cursor = getCursor())
+    try (final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
+         final DBCursor<Record<K, V>> cursor = getCursor())
     {
       while (cursor.getRecord() != null)
       {
@@ -287,6 +306,7 @@
   void syncToFileSystem() throws ChangelogException
   {
     checkLogIsEnabledForWrite();
+    sharedLock.lock();
     try
     {
       writer.sync();
@@ -295,6 +315,10 @@
     {
       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
     }
+    finally
+    {
+      sharedLock.unlock();
+    }
   }
 
   /**
@@ -355,7 +379,15 @@
     {
       try (BlockLogReader<K, V> reader = getReader())
       {
-        newestRecord = reader.getNewestRecord();
+        sharedLock.lock();
+        try
+        {
+          newestRecord = reader.getNewestRecord();
+        }
+        finally
+        {
+          sharedLock.unlock();
+        }
       }
       catch (IOException ioe)
       {
@@ -375,7 +407,7 @@
   long getNumberOfRecords() throws ChangelogException
   {
     // TODO  : need a more efficient way to retrieve it
-    try(final DBCursor<Record<K, V>> cursor = getCursor())
+    try (final DBCursor<Record<K, V>> cursor = getCursor())
     {
       long counter = 0L;
       while (cursor.next())
@@ -414,9 +446,18 @@
    */
   void delete() throws ChangelogException
   {
-    if (!logfile.delete())
+    exclusiveLock.lock();
+    try
     {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
+      final boolean isDeleted = logfile.delete();
+      if (!isDeleted)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
+      }
+    }
+    finally
+    {
+      exclusiveLock.unlock();
     }
   }
 
@@ -517,14 +558,22 @@
      * pointing to the provided file position.
      * <p>
      * Note: there is no check to ensure that provided record and file position are
-     * consistent. It is the responsability of the caller of this method.
+     * consistent. It is the responsibility of the caller of this method.
      */
     private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
     {
       this.logFile = logFile;
       this.reader = logFile.getReader();
       this.currentRecord = record;
-      reader.seekToPosition(filePosition);
+      logFile.sharedLock.lock();
+      try
+      {
+        reader.seekToPosition(filePosition);
+      }
+      finally
+      {
+        logFile.sharedLock.unlock();
+      }
     }
 
     /** {@inheritDoc} */
@@ -538,7 +587,15 @@
         initialRecord = null;
         return true;
       }
-      currentRecord = reader.readRecord();
+      logFile.sharedLock.lock();
+      try
+      {
+        currentRecord = reader.readRecord();
+      }
+      finally
+      {
+        logFile.sharedLock.unlock();
+      }
       return currentRecord != null;
     }
 
@@ -553,7 +610,16 @@
     @Override
     public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
         throws ChangelogException {
-      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
+      final Pair<Boolean, Record<K, V>> result;
+      logFile.sharedLock.lock();
+      try
+      {
+        result = reader.seekToRecord(key, match, pos);
+      }
+      finally
+      {
+        logFile.sharedLock.unlock();
+      }
       final boolean found = result.getFirst();
       initialRecord = found ? result.getSecond() : null;
       return found;
@@ -575,7 +641,15 @@
      */
     long getFilePosition() throws ChangelogException
     {
-      return reader.getFilePosition();
+      logFile.sharedLock.lock();
+      try
+      {
+        return reader.getFilePosition();
+      }
+      finally
+      {
+        logFile.sharedLock.unlock();
+      }
     }
 
     /** {@inheritDoc} */

--
Gitblit v1.10.0