From 3c931b0f1ba72ce655f1fe03295aff77b4bfcf38 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 12 Jun 2014 15:08:37 +0000
Subject: [PATCH] OPENDJ-1472 : File based changelog : optimize random seek in each log file CR-3727
---
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java | 155 ++++++---------------------------------------------
1 files changed, 20 insertions(+), 135 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
index f0bb7e9..b27c78e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -30,11 +30,9 @@
import java.io.BufferedWriter;
import java.io.Closeable;
-import java.io.EOFException;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.io.RandomAccessFile;
import org.forgerock.util.Reject;
import org.opends.messages.Message;
@@ -42,11 +40,11 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
-import org.opends.server.types.ByteString;
-import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
+import com.forgerock.opendj.util.Pair;
+
/**
* A log file, containing part of a {@code Log}. The log file may be:
* <ul>
@@ -72,17 +70,14 @@
/** The file containing the records. */
private final File logfile;
- /** The parser of records, to convert bytes to record and record to bytes. */
- private final RecordParser<K, V> parser;
-
/** The pool to obtain a reader on the log. */
- private LogReaderPool readerPool;
+ private final LogReaderPool<K, V> readerPool;
/**
* The writer on the log file, which may be {@code null} if log file is not
- * write-enabled
+ * write-enabled.
*/
- private LogWriter writer;
+ private final BlockLogWriter<K, V> writer;
/** Indicates if log is enabled for write. */
private final boolean isWriteEnabled;
@@ -105,10 +100,11 @@
{
Reject.ifNull(logFilePath, parser);
this.logfile = logFilePath;
- this.parser = parser;
this.isWriteEnabled = isWriteEnabled;
- initialize();
+ createLogFileIfNotExists();
+ writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null;
+ readerPool = new LogReaderPool<K, V>(logfile, parser);
}
/**
@@ -155,25 +151,6 @@
}
/**
- * Initialize this log.
- * <p>
- * Create directories and file if necessary, and create a writer
- * and pool of readers.
- *
- * @throws ChangelogException
- * If an errors occurs during initialization.
- */
- private void initialize() throws ChangelogException
- {
- createLogFileIfNotExists();
- if (isWriteEnabled)
- {
- writer = new LogWriter(logfile);
- }
- readerPool = new LogReaderPool(logfile);
- }
-
- /**
* Returns the file containing the records.
*
* @return the file
@@ -221,23 +198,7 @@
void append(final Record<K, V> record) throws ChangelogException
{
checkLogIsEnabledForWrite();
- try
- {
- writer.write(encodeRecord(record));
- }
- catch (IOException e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
- }
- }
-
- private ByteString encodeRecord(final Record<K, V> record)
- {
- final ByteString data = parser.encodeRecord(record);
- return new ByteStringBuilder()
- .append(data.length())
- .append(data)
- .toByteString();
+ writer.write(record);
}
/**
@@ -530,73 +491,21 @@
return logfile.getPath();
}
- /** Read a record from the provided reader. */
- private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
- {
- try
- {
- final ByteString recordData = readEncodedRecord(reader);
- return recordData != null ? parser.decodeRecord(recordData) : null;
- }
- catch(DecodingException e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
- }
- }
-
- private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
- {
- try
- {
- final byte[] lengthData = new byte[4];
- reader.readFully(lengthData);
- int recordLength = ByteString.wrap(lengthData).toInt();
-
- final byte[] recordData = new byte[recordLength];
- reader.readFully(recordData);
- return ByteString.wrap(recordData);
- }
- catch(EOFException e)
- {
- // end of stream, no record or uncomplete record
- return null;
- }
- catch (IOException e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
- }
- }
-
- /** Seek to given position on the provided reader. */
- private void seek(RandomAccessFile reader, long position) throws ChangelogException
- {
- try
- {
- reader.seek(position);
- }
- catch (IOException e)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
- }
- }
-
/**
- * Returns a random access file to read this log.
+ * Returns a reader for this log.
* <p>
* Assumes that calling methods ensure that log is not closed.
*/
- private RandomAccessFile getReader() throws ChangelogException
+ private BlockLogReader<K, V> getReader() throws ChangelogException
{
return readerPool.get();
}
/** Release the provided reader. */
- private void releaseReader(RandomAccessFile reader) {
+ private void releaseReader(BlockLogReader<K, V> reader) {
readerPool.release(reader);
}
-
-
/** {@inheritDoc} */
@Override
public int hashCode()
@@ -633,7 +542,7 @@
private final LogFile<K, V> logFile;
/** To read the records. */
- private final RandomAccessFile reader;
+ private final BlockLogReader<K, V> reader;
/** The current available record, may be {@code null}. */
private Record<K,V> currentRecord;
@@ -674,14 +583,14 @@
this.logFile = logFile;
this.reader = logFile.getReader();
this.currentRecord = record;
- logFile.seek(reader, filePosition);
+ reader.seekToPosition(filePosition);
}
/** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
{
- currentRecord = logFile.readRecord(reader);
+ currentRecord = reader.readRecord();
return currentRecord != null;
}
@@ -695,26 +604,10 @@
/** {@inheritDoc} */
@Override
public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
- do
- {
- if (currentRecord != null)
- {
- final boolean matches = findNearest ?
- currentRecord.getKey().compareTo(key) >= 0 : currentRecord.getKey().equals(key);
- if (matches)
- {
- if (findNearest && currentRecord.getKey().equals(key))
- {
- // skip key in order to position on lowest higher key
- next();
- }
- return true;
- }
- }
- next();
- }
- while (currentRecord != null);
- return false;
+ final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest);
+ final boolean found = result.getFirst();
+ currentRecord = found ? result.getSecond() : null;
+ return found;
}
/** {@inheritDoc} */
@@ -733,15 +626,7 @@
*/
long getFilePosition() throws ChangelogException
{
- try
- {
- return reader.getFilePointer();
- }
- catch (IOException e)
- {
- throw new ChangelogException(
- ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
- }
+ return reader.getFilePosition();
}
/** {@inheritDoc} */
--
Gitblit v1.10.0