From 84cecd3711ddbbd60132cdd80957e387f23cf63e Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 21 May 2014 15:56:41 +0000
Subject: [PATCH] OPENDJ-1389 – Add support for replication changelog DB rotation
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java | 510 +++++++++++++++++++-------------------------------------
1 files changed, 176 insertions(+), 334 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
index 005c8c7..f0bb7e9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -25,8 +25,8 @@
*/
package org.opends.server.replication.server.changelog.file;
+import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
import java.io.BufferedWriter;
import java.io.Closeable;
@@ -35,24 +35,30 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.forgerock.util.Reject;
import org.opends.messages.Message;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
-import static org.opends.messages.ReplicationMessages.*;
-
/**
- * A file-based log that allow to append key-value records and
- * read them using a {@code DBCursor}.
+ * A log file, containing part of a {@code Log}. The log file may be:
+ * <ul>
+ * <li>write-enabled : allowing to append key-value records and read records
+ * from cursors,</li>
+ * <li>read-only : allowing to read records from cursors.</li>
+ * </ul>
+ * <p>
+ * A log file is NOT intended to be used directly, but only has part of a
+ * {@code Log}. In particular, there is no concurrency management and no checks
+ * to ensure that log is not closed when performing any operation on it. Those
+ * are managed at the {@code Log} level.
*
* @param <K>
* Type of the key of a record, which must be comparable.
@@ -61,16 +67,9 @@
*/
final class LogFile<K extends Comparable<K>, V> implements Closeable
{
-
private static final DebugTracer TRACER = getTracer();
- // Non private for use in tests
- static final String LOG_FILE_NAME = "current.log";
-
- /** The path of directory that contains the log file. */
- private final File rootPath;
-
- /** The log file containing the records. */
+ /** The file containing the records. */
private final File logfile;
/** The parser of records, to convert bytes to record and record to bytes. */
@@ -79,30 +78,20 @@
/** The pool to obtain a reader on the log. */
private LogReaderPool readerPool;
- /** The writer on the log, which may be {@code null} if log is not write-enabled */
+ /**
+ * The writer on the log file, which may be {@code null} if log file is not
+ * write-enabled
+ */
private LogWriter writer;
/** Indicates if log is enabled for write. */
private final boolean isWriteEnabled;
- /** Indicates if the log is closed. */
- private volatile boolean isClosed;
-
- /** The exclusive lock used for wide changes on this log file : init, clear, sync and close. */
- private final Lock exclusiveLock;
-
- /**
- * The shared lock used for read, write and flush operations on this log file.
- * Write and flush operations can be shared because they're synchronized in the underlying writer.
- * Reads can be done safely when writing because partially written records are handled.
- */
- private final Lock sharedLock;
-
/**
* Creates a new log file.
*
- * @param rootPath
- * Path of root directory that contains the log file.
+ * @param logFilePath
+ * Path of the log file.
* @param parser
* Parser of records.
* @param isWriteEnabled
@@ -111,17 +100,13 @@
* @throws ChangelogException
* If a problem occurs during initialization.
*/
- private LogFile(final File rootPath, final RecordParser<K, V> parser, boolean isWriteEnabled)
+ private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled)
throws ChangelogException
{
- this.rootPath = rootPath;
+ Reject.ifNull(logFilePath, parser);
+ this.logfile = logFilePath;
this.parser = parser;
this.isWriteEnabled = isWriteEnabled;
- this.logfile = new File(rootPath, LOG_FILE_NAME);
-
- final ReadWriteLock lock = new ReentrantReadWriteLock(false);
- this.exclusiveLock = lock.writeLock();
- this.sharedLock = lock.readLock();
initialize();
}
@@ -133,18 +118,18 @@
* Type of the key of a record, which must be comparable.
* @param <V>
* Type of the value of a record.
- * @param rootPath
- * Path of root directory that contains the log file.
+ * @param logFilePath
+ * Path of the log file.
* @param parser
* Parser of records.
* @return a read-only log file
* @throws ChangelogException
* If a problem occurs during initialization.
*/
- public static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File rootPath,
+ static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath,
final RecordParser<K, V> parser) throws ChangelogException
{
- return new LogFile<K, V>(rootPath, parser, false);
+ return new LogFile<K, V>(logFilePath, parser, false);
}
/**
@@ -155,18 +140,18 @@
* Type of the key of a record, which must be comparable.
* @param <V>
* Type of the value of a record.
- * @param rootPath
- * Path of root directory that contains the log file.
+ * @param logFilePath
+ * Path of the log file.
* @param parser
* Parser of records.
* @return a write-enabled log file
* @throws ChangelogException
* If a problem occurs during initialization.
*/
- public static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File rootPath,
+ static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath,
final RecordParser<K, V> parser) throws ChangelogException
{
- return new LogFile<K, V>(rootPath, parser, true);
+ return new LogFile<K, V>(logFilePath, parser, true);
}
/**
@@ -180,89 +165,29 @@
*/
private void initialize() throws ChangelogException
{
- exclusiveLock.lock();
- try
+ createLogFileIfNotExists();
+ if (isWriteEnabled)
{
- createRootDirIfNotExists();
- createLogFileIfNotExists();
- isClosed = false;
- if (isWriteEnabled)
- {
- writer = LogWriter.acquireWriter(logfile);
- }
- readerPool = new LogReaderPool(logfile);
+ writer = new LogWriter(logfile);
}
- finally
- {
- exclusiveLock.unlock();
- }
+ readerPool = new LogReaderPool(logfile);
}
/**
- * Returns the name of this log.
+ * Returns the file containing the records.
*
- * @return the name, which corresponds to the directory containing the log
+ * @return the file
*/
- public String getName()
+ File getFile()
{
- return logfile.getParent().toString();
- }
-
- /**
- * Empties the log, discarding all records it contains.
- * <p>
- * This method should not be called with open cursors or
- * when multiple instances of the log are opened.
- *
- * @throws ChangelogException
- * If a problem occurs.
- */
- public void clear() throws ChangelogException
- {
- checkLogIsEnabledForWrite();
-
- exclusiveLock.lock();
- try
- {
- if (isClosed)
- {
- return;
- }
- close();
- final boolean isDeleted = logfile.delete();
- if (!isDeleted)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(logfile.getPath()));
- }
- initialize();
- }
- catch (Exception e)
- {
- throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(getName(), stackTraceToSingleLineString(e)));
- }
- finally
- {
- exclusiveLock.unlock();
- }
+ return logfile;
}
private void checkLogIsEnabledForWrite() throws ChangelogException
{
if (!isWriteEnabled)
{
- throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(rootPath.getPath()));
- }
- }
-
- private void createRootDirIfNotExists() throws ChangelogException
- {
- if (!rootPath.exists())
- {
- final boolean created = rootPath.mkdirs();
- if (!created)
- {
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(rootPath.getPath()));
- }
+ throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath()));
}
}
@@ -282,25 +207,6 @@
}
/**
- * Add a record at the end of this log from the provided key and value.
- * <p>
- * In order to ensure that record is written out of buffers and persisted
- * to file system, it is necessary to explicitely call the
- * {@code syncToFileSystem()} method.
- *
- * @param key
- * The key of the record.
- * @param value
- * The value of the record.
- * @throws ChangelogException
- * If the record can't be added to the log.
- */
- public void addRecord(final K key, final V value) throws ChangelogException
- {
- addRecord(Record.from(key, value));
- }
-
- /**
* Add the provided record at the end of this log.
* <p>
* In order to ensure that record is written out of buffers and persisted
@@ -312,33 +218,22 @@
* @throws ChangelogException
* If the record can't be added to the log.
*/
- public void addRecord(final Record<K, V> record) throws ChangelogException
+ void append(final Record<K, V> record) throws ChangelogException
{
checkLogIsEnabledForWrite();
-
- sharedLock.lock();
try
{
- if (isClosed)
- {
- return;
- }
writer.write(encodeRecord(record));
- writer.flush();
}
catch (IOException e)
{
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getName()), e);
- }
- finally
- {
- sharedLock.unlock();
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
}
}
private ByteString encodeRecord(final Record<K, V> record)
{
- final ByteString data = parser.encodeRecord(record.getKey(), record.getValue());
+ final ByteString data = parser.encodeRecord(record);
return new ByteStringBuilder()
.append(data.length())
.append(data)
@@ -346,14 +241,14 @@
}
/**
- * Dump this log as text file, intended for debugging purpose only.
+ * Dump this log file as a text file, intended for debugging purpose only.
*
* @param dumpFile
* File that will contains log records using a human-readable format
* @throws ChangelogException
* If an error occurs during dump
*/
- public void dumpAsTextFile(File dumpFile) throws ChangelogException
+ void dumpAsTextFile(File dumpFile) throws ChangelogException
{
DBCursor<Record<K, V>> cursor = getCursor();
BufferedWriter textWriter = null;
@@ -364,7 +259,7 @@
{
Record<K, V> record = cursor.getRecord();
textWriter.write("key=" + record.getKey());
- textWriter.write(" -- ");
+ textWriter.write(" | ");
textWriter.write("value=" + record.getValue());
textWriter.write('\n');
cursor.next();
@@ -374,7 +269,7 @@
{
// No I18N needed, used for debugging purpose only
throw new ChangelogException(
- Message.raw("Error when dumping content of log '%s' in target file : '%s'", getName(), dumpFile), e);
+ Message.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile), e);
}
finally
{
@@ -392,20 +287,16 @@
* @throws ChangelogException
* If the synchronization fails.
*/
- public void syncToFileSystem() throws ChangelogException
+ void syncToFileSystem() throws ChangelogException
{
- exclusiveLock.lock();
+ checkLogIsEnabledForWrite();
try
{
writer.sync();
}
catch (Exception e)
{
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getName()), e);
- }
- finally
- {
- exclusiveLock.unlock();
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
}
}
@@ -422,21 +313,9 @@
* @throws ChangelogException
* If the cursor can't be created.
*/
- public LogCursor<K, V> getCursor() throws ChangelogException
+ LogFileCursor<K, V> getCursor() throws ChangelogException
{
- sharedLock.lock();
- try
- {
- if (isClosed)
- {
- return new EmptyLogCursor<K, V>();
- }
- return new LogFileCursor<K, V>(this);
- }
- finally
- {
- sharedLock.unlock();
- }
+ return new LogFileCursor<K, V>(this);
}
/**
@@ -454,7 +333,7 @@
* @throws ChangelogException
* If the cursor can't be created.
*/
- public LogCursor<K, V> getCursor(final K key) throws ChangelogException
+ LogFileCursor<K, V> getCursor(final K key) throws ChangelogException
{
return getCursor(key, false);
}
@@ -476,13 +355,13 @@
* @throws ChangelogException
* If the cursor can't be created.
*/
- public LogCursor<K, V> getNearestCursor(final K key) throws ChangelogException
+ LogFileCursor<K, V> getNearestCursor(final K key) throws ChangelogException
{
return getCursor(key, true);
}
/** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */
- private LogCursor<K, V> getCursor(final K key, boolean findNearest)
+ private LogFileCursor<K, V> getCursor(final K key, boolean findNearest)
throws ChangelogException
{
if (key == null)
@@ -490,13 +369,8 @@
return getCursor();
}
LogFileCursor<K, V> cursor = null;
- sharedLock.lock();
try
{
- if (isClosed)
- {
- return new EmptyLogCursor<K, V>();
- }
cursor = new LogFileCursor<K, V>(this);
cursor.positionTo(key, findNearest);
// if target is not found, cursor is positioned at end of stream
@@ -506,10 +380,22 @@
StaticUtils.close(cursor);
throw e;
}
- finally
- {
- sharedLock.unlock();
- }
+ }
+
+ /**
+ * Returns a cursor initialised to the provided record and position in file.
+ *
+ * @param record
+ * The initial record this cursor points on
+ * @param position
+ * The file position this cursor points on
+ * @return the cursor
+ * @throws ChangelogException
+ * If a problem occurs while creating the cursor.
+ */
+ LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException
+ {
+ return new LogFileCursor<K, V>(this, record, position);
}
/**
@@ -520,7 +406,7 @@
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
- public Record<K, V> getOldestRecord() throws ChangelogException
+ Record<K, V> getOldestRecord() throws ChangelogException
{
DBCursor<Record<K, V>> cursor = null;
try
@@ -541,7 +427,7 @@
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
- public Record<K, V> getNewestRecord() throws ChangelogException
+ Record<K, V> getNewestRecord() throws ChangelogException
{
// TODO : need a more efficient way to retrieve it
DBCursor<Record<K, V>> cursor = null;
@@ -597,45 +483,58 @@
/** {@inheritDoc} */
public void close()
{
- exclusiveLock.lock();
- try
+ if (isWriteEnabled)
{
- if (isClosed)
+ try
{
- return;
+ syncToFileSystem();
}
+ catch (ChangelogException e)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ writer.close();
+ }
+ readerPool.shutdown();
+ }
- if (isWriteEnabled)
- {
- try
- {
- syncToFileSystem();
- }
- catch (ChangelogException e)
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- }
- writer.close();
- }
- readerPool.shutdown();
- isClosed = true;
- }
- finally
+ /**
+ * Delete this log file (file is physically removed). Should be called only
+ * when log file is closed.
+ *
+ * @throws ChangelogException
+ * If log file can't be deleted.
+ */
+ void delete() throws ChangelogException
+ {
+ final boolean isDeleted = logfile.delete();
+ if (!isDeleted)
{
- exclusiveLock.unlock();
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
}
}
+ /**
+ * Return the size of this log file in bytes.
+ *
+ * @return the size of log file
+ */
+ long getSizeInBytes()
+ {
+ return writer.getBytesWritten();
+ }
+
+ /** The path of this log file as a String. */
+ private String getPath()
+ {
+ return logfile.getPath();
+ }
+
/** Read a record from the provided reader. */
private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
{
- sharedLock.lock();
try
{
- if (isClosed)
- {
- return null;
- }
final ByteString recordData = readEncodedRecord(reader);
return recordData != null ? parser.decodeRecord(recordData) : null;
}
@@ -643,10 +542,6 @@
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
}
- finally
- {
- sharedLock.unlock();
- }
}
private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
@@ -672,26 +567,17 @@
}
}
- /** Seek to provided position on the provided reader. */
+ /** Seek to given position on the provided reader. */
private void seek(RandomAccessFile reader, long position) throws ChangelogException
{
- sharedLock.lock();
try
{
- if (isClosed)
- {
- return;
- }
reader.seek(position);
}
catch (IOException e)
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
}
- finally
- {
- sharedLock.unlock();
- }
}
/**
@@ -706,66 +592,42 @@
/** Release the provided reader. */
private void releaseReader(RandomAccessFile reader) {
- sharedLock.lock();
- try
- {
- if (isClosed)
- {
- return;
- }
- readerPool.release(reader);
- }
- finally
- {
- sharedLock.unlock();
- }
+ readerPool.release(reader);
}
- /**
- * A cursor on the log.
- */
- static interface LogCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
+
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode()
{
- /**
- * Position the cursor to the record corresponding to the provided key or to
- * the nearest key (the lowest key higher than the provided key).
- * <p>
- * The record is only searched forward. To search backward, it is first
- * necessary to call the {@code rewind()} method to start from beginning of
- * log file.
- *
- * @param key
- * Key to use as a start position for the cursor. If key is
- * {@code null}, use the key of the first record instead.
- * @param findNearest
- * If {@code true}, start position is the lowest key that is higher
- * than the provided key, otherwise start position is the provided
- * key.
- * @return {@code true} if cursor is successfully positionned to the key or
- * the the nearest key, {@code false} otherwise.
- * @throws ChangelogException
- * If an error occurs when positioning cursor.
- */
- boolean positionTo(K key, boolean findNearest) throws ChangelogException;
+ return logfile.hashCode();
+ }
- /**
- * Rewind the cursor, positioning it to the beginning of the log file,
- * pointing to no record initially.
- *
- * @throws ChangelogException
- * If an error occurs when rewinding to zero.
- */
- void rewind() throws ChangelogException;
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object that)
+ {
+ if (this == that)
+ {
+ return true;
+ }
+ if (!(that instanceof LogFile))
+ {
+ return false;
+ }
+ final LogFile<?, ?> other = (LogFile<?, ?>) that;
+ return logfile.equals(other.logfile);
}
/**
- * Implements a cursor on the log.
+ * Implements a repositionable cursor on the log file.
* <p>
* The cursor initially points to a record, that is {@code cursor.getRecord()}
* is equals to the first record available from the cursor before any call to
* {@code cursor.next()} method.
*/
- private static final class LogFileCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
+ static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
{
/** The underlying log on which entries are read. */
private final LogFile<K, V> logFile;
@@ -784,7 +646,7 @@
* @throws ChangelogException
* If an error occurs when creating the cursor.
*/
- LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
+ private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
{
this.logFile = logFile;
this.reader = logFile.getReader();
@@ -800,17 +662,19 @@
}
}
- /** {@inheritDoc} */
- public String toString()
+ /**
+ * Creates a cursor on the provided log, initialised to the provided record and
+ * pointing to the provided file position.
+ * <p>
+ * Note: there is no check to ensure that provided record and file position are
+ * consistent. It is the responsability of the caller of this method.
+ */
+ private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
{
- return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
- }
-
- /** {@inheritDoc} */
- @Override
- public Record<K,V> getRecord()
- {
- return currentRecord;
+ this.logFile = logFile;
+ this.reader = logFile.getReader();
+ this.currentRecord = record;
+ logFile.seek(reader, filePosition);
}
/** {@inheritDoc} */
@@ -823,6 +687,13 @@
/** {@inheritDoc} */
@Override
+ public Record<K,V> getRecord()
+ {
+ return currentRecord;
+ }
+
+ /** {@inheritDoc} */
+ @Override
public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
do
{
@@ -848,64 +719,35 @@
/** {@inheritDoc} */
@Override
- public void rewind() throws ChangelogException
- {
- logFile.seek(reader, 0);
- currentRecord = null;
- }
-
- /** {@inheritDoc} */
- @Override
public void close()
{
logFile.releaseReader(reader);
}
- }
- /** An empty cursor, that always return null records and false to {@code next()} method. */
- static final class EmptyLogCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
- {
- /** {@inheritDoc} */
- @Override
- public Record<K,V> getRecord()
+ /**
+ * Returns the file position this cursor is pointing at.
+ *
+ * @return the position of reader on the log file
+ * @throws ChangelogException
+ * If an error occurs.
+ */
+ long getFilePosition() throws ChangelogException
{
- return null;
+ try
+ {
+ return reader.getFilePointer();
+ }
+ catch (IOException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
+ }
}
/** {@inheritDoc} */
- @Override
- public boolean next()
- {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
- {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
- public void rewind() throws ChangelogException
- {
- // nothing to do
- }
-
- /** {@inheritDoc} */
- @Override
- public void close()
- {
- // nothing to do
- }
-
- /** {@inheritDoc} */
- @Override
public String toString()
{
- return "EmptyLogCursor";
+ return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
}
-
}
}
--
Gitblit v1.10.0