From 3c931b0f1ba72ce655f1fe03295aff77b4bfcf38 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 12 Jun 2014 15:08:37 +0000
Subject: [PATCH] OPENDJ-1472 : File based changelog : optimize random seek in each log file CR-3727

---
 opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java |  155 ++++++---------------------------------------------
 1 files changed, 20 insertions(+), 135 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
index f0bb7e9..b27c78e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -30,11 +30,9 @@
 
 import java.io.BufferedWriter;
 import java.io.Closeable;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 
 import org.forgerock.util.Reject;
 import org.opends.messages.Message;
@@ -42,11 +40,11 @@
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
-import org.opends.server.types.ByteString;
-import org.opends.server.types.ByteStringBuilder;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
 
+import com.forgerock.opendj.util.Pair;
+
 /**
  * A log file, containing part of a {@code Log}. The log file may be:
  * <ul>
@@ -72,17 +70,14 @@
   /** The file containing the records. */
   private final File logfile;
 
-  /** The parser of records, to convert bytes to record and record to bytes. */
-  private final RecordParser<K, V> parser;
-
   /** The pool to obtain a reader on the log. */
-  private LogReaderPool readerPool;
+  private final LogReaderPool<K, V> readerPool;
 
   /**
    * The writer on the log file, which may be {@code null} if log file is not
-   * write-enabled
+   * write-enabled.
    */
-  private LogWriter writer;
+  private final BlockLogWriter<K, V> writer;
 
   /** Indicates if log is enabled for write. */
   private final boolean isWriteEnabled;
@@ -105,10 +100,11 @@
   {
     Reject.ifNull(logFilePath, parser);
     this.logfile = logFilePath;
-    this.parser = parser;
     this.isWriteEnabled = isWriteEnabled;
 
-    initialize();
+    createLogFileIfNotExists();
+    writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null;
+    readerPool = new LogReaderPool<K, V>(logfile, parser);
   }
 
   /**
@@ -155,25 +151,6 @@
   }
 
   /**
-   * Initialize this log.
-   * <p>
-   * Create directories and file if necessary, and create a writer
-   * and pool of readers.
-   *
-   * @throws ChangelogException
-   *            If an errors occurs during initialization.
-   */
-  private void initialize() throws ChangelogException
-  {
-    createLogFileIfNotExists();
-    if (isWriteEnabled)
-    {
-      writer = new LogWriter(logfile);
-    }
-    readerPool = new LogReaderPool(logfile);
-  }
-
-  /**
    * Returns the file containing the records.
    *
    * @return the file
@@ -221,23 +198,7 @@
   void append(final Record<K, V> record) throws ChangelogException
   {
     checkLogIsEnabledForWrite();
-    try
-    {
-      writer.write(encodeRecord(record));
-    }
-    catch (IOException e)
-    {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
-    }
-  }
-
-  private ByteString encodeRecord(final Record<K, V> record)
-  {
-    final ByteString data = parser.encodeRecord(record);
-    return new ByteStringBuilder()
-      .append(data.length())
-      .append(data)
-      .toByteString();
+    writer.write(record);
   }
 
   /**
@@ -530,73 +491,21 @@
     return logfile.getPath();
   }
 
-  /** Read a record from the provided reader. */
-  private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
-  {
-    try
-    {
-      final ByteString recordData = readEncodedRecord(reader);
-      return recordData != null ? parser.decodeRecord(recordData) : null;
-    }
-    catch(DecodingException e)
-    {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
-    }
-  }
-
-  private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
-  {
-    try
-    {
-      final byte[] lengthData = new byte[4];
-      reader.readFully(lengthData);
-      int recordLength = ByteString.wrap(lengthData).toInt();
-
-      final byte[] recordData = new byte[recordLength];
-      reader.readFully(recordData);
-      return ByteString.wrap(recordData);
-    }
-    catch(EOFException e)
-    {
-      // end of stream, no record or uncomplete record
-      return null;
-    }
-    catch (IOException e)
-    {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
-    }
-  }
-
-  /** Seek to given position on the provided reader. */
-  private void seek(RandomAccessFile reader, long position) throws ChangelogException
-  {
-    try
-    {
-      reader.seek(position);
-    }
-    catch (IOException e)
-    {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
-    }
-  }
-
   /**
-   * Returns a random access file to read this log.
+   * Returns a reader for this log.
    * <p>
    * Assumes that calling methods ensure that log is not closed.
    */
-  private RandomAccessFile getReader() throws ChangelogException
+  private BlockLogReader<K, V> getReader() throws ChangelogException
   {
     return readerPool.get();
   }
 
   /** Release the provided reader. */
-  private void releaseReader(RandomAccessFile reader) {
+  private void releaseReader(BlockLogReader<K, V> reader) {
     readerPool.release(reader);
   }
 
-
-
   /** {@inheritDoc} */
   @Override
   public int hashCode()
@@ -633,7 +542,7 @@
     private final LogFile<K, V> logFile;
 
     /** To read the records. */
-    private final RandomAccessFile reader;
+    private final BlockLogReader<K, V> reader;
 
     /** The current available record, may be {@code null}. */
     private Record<K,V> currentRecord;
@@ -674,14 +583,14 @@
       this.logFile = logFile;
       this.reader = logFile.getReader();
       this.currentRecord = record;
-      logFile.seek(reader, filePosition);
+      reader.seekToPosition(filePosition);
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean next() throws ChangelogException
     {
-      currentRecord = logFile.readRecord(reader);
+      currentRecord = reader.readRecord();
       return currentRecord != null;
     }
 
@@ -695,26 +604,10 @@
     /** {@inheritDoc} */
     @Override
     public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
-      do
-      {
-        if (currentRecord != null)
-        {
-          final boolean matches = findNearest ?
-              currentRecord.getKey().compareTo(key) >= 0 : currentRecord.getKey().equals(key);
-          if (matches)
-          {
-            if (findNearest && currentRecord.getKey().equals(key))
-            {
-              // skip key in order to position on lowest higher key
-              next();
-            }
-            return true;
-          }
-        }
-        next();
-      }
-      while (currentRecord != null);
-      return false;
+      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest);
+      final boolean found = result.getFirst();
+      currentRecord = found ? result.getSecond() : null;
+      return found;
     }
 
     /** {@inheritDoc} */
@@ -733,15 +626,7 @@
      */
     long getFilePosition() throws ChangelogException
     {
-      try
-      {
-        return reader.getFilePointer();
-      }
-      catch (IOException e)
-      {
-        throw new ChangelogException(
-            ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
-      }
+      return reader.getFilePosition();
     }
 
     /** {@inheritDoc} */

--
Gitblit v1.10.0