From 8fdf1768757fba933e7ce63ac6381eacec41f0c6 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 09 Dec 2015 17:41:58 +0000
Subject: [PATCH] OPENDJ-2476: Purge of file-based changelog is very slow and the changelog size is growing.
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java | 110 ++++++++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 92 insertions(+), 18 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
index 16921a1..cfe86b5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -33,6 +33,9 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -52,9 +55,7 @@
* </ul>
* <p>
* A log file is NOT intended to be used directly, but only has part of a
- * {@link Log}. In particular, there is no concurrency management and no checks
- * to ensure that log is not closed when performing any operation on it. Those
- * are managed at the {@code Log} level.
+ * {@link Log}.
*
* @param <K>
* Type of the key of a record, which must be comparable.
@@ -80,6 +81,12 @@
/** Indicates if log is enabled for write. */
private final boolean isWriteEnabled;
+ /** Lock used to ensure write atomicity. */
+ private final Lock exclusiveLock;
+
+ /** Lock used to ensure that log file is in a consistent state when reading it. */
+ private final Lock sharedLock;
+
private Record<K, V> newestRecord;
/**
@@ -113,6 +120,10 @@
writer = null;
}
readerPool = new LogReaderPool<>(logfile, parser);
+
+ final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ exclusiveLock = rwLock.writeLock();
+ sharedLock = rwLock.readLock();
}
/**
@@ -204,8 +215,8 @@
*/
private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
{
- try(final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
- final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
+ try (final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
+ final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
{
final long lastValidPosition = reader.checkLogIsValid();
if (lastValidPosition != -1)
@@ -238,8 +249,16 @@
void append(final Record<K, V> record) throws ChangelogException
{
checkLogIsEnabledForWrite();
- writer.write(record);
- newestRecord = record;
+ exclusiveLock.lock();
+ try
+ {
+ writer.write(record);
+ newestRecord = record;
+ }
+ finally
+ {
+ exclusiveLock.unlock();
+ }
}
/**
@@ -252,8 +271,8 @@
*/
void dumpAsTextFile(File dumpFile) throws ChangelogException
{
- try(final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
- final DBCursor<Record<K, V>> cursor = getCursor())
+ try (final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
+ final DBCursor<Record<K, V>> cursor = getCursor())
{
while (cursor.getRecord() != null)
{
@@ -287,6 +306,7 @@
void syncToFileSystem() throws ChangelogException
{
checkLogIsEnabledForWrite();
+ sharedLock.lock();
try
{
writer.sync();
@@ -295,6 +315,10 @@
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
}
+ finally
+ {
+ sharedLock.unlock();
+ }
}
/**
@@ -355,7 +379,15 @@
{
try (BlockLogReader<K, V> reader = getReader())
{
- newestRecord = reader.getNewestRecord();
+ sharedLock.lock();
+ try
+ {
+ newestRecord = reader.getNewestRecord();
+ }
+ finally
+ {
+ sharedLock.unlock();
+ }
}
catch (IOException ioe)
{
@@ -375,7 +407,7 @@
long getNumberOfRecords() throws ChangelogException
{
// TODO : need a more efficient way to retrieve it
- try(final DBCursor<Record<K, V>> cursor = getCursor())
+ try (final DBCursor<Record<K, V>> cursor = getCursor())
{
long counter = 0L;
while (cursor.next())
@@ -414,9 +446,18 @@
*/
void delete() throws ChangelogException
{
- if (!logfile.delete())
+ exclusiveLock.lock();
+ try
{
- throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
+ final boolean isDeleted = logfile.delete();
+ if (!isDeleted)
+ {
+ throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
+ }
+ }
+ finally
+ {
+ exclusiveLock.unlock();
}
}
@@ -517,14 +558,22 @@
* pointing to the provided file position.
* <p>
* Note: there is no check to ensure that provided record and file position are
- * consistent. It is the responsability of the caller of this method.
+ * consistent. It is the responsibility of the caller of this method.
*/
private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
{
this.logFile = logFile;
this.reader = logFile.getReader();
this.currentRecord = record;
- reader.seekToPosition(filePosition);
+ logFile.sharedLock.lock();
+ try
+ {
+ reader.seekToPosition(filePosition);
+ }
+ finally
+ {
+ logFile.sharedLock.unlock();
+ }
}
/** {@inheritDoc} */
@@ -538,7 +587,15 @@
initialRecord = null;
return true;
}
- currentRecord = reader.readRecord();
+ logFile.sharedLock.lock();
+ try
+ {
+ currentRecord = reader.readRecord();
+ }
+ finally
+ {
+ logFile.sharedLock.unlock();
+ }
return currentRecord != null;
}
@@ -553,7 +610,16 @@
@Override
public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
throws ChangelogException {
- final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
+ final Pair<Boolean, Record<K, V>> result;
+ logFile.sharedLock.lock();
+ try
+ {
+ result = reader.seekToRecord(key, match, pos);
+ }
+ finally
+ {
+ logFile.sharedLock.unlock();
+ }
final boolean found = result.getFirst();
initialRecord = found ? result.getSecond() : null;
return found;
@@ -575,7 +641,15 @@
*/
long getFilePosition() throws ChangelogException
{
- return reader.getFilePosition();
+ logFile.sharedLock.lock();
+ try
+ {
+ return reader.getFilePosition();
+ }
+ finally
+ {
+ logFile.sharedLock.unlock();
+ }
}
/** {@inheritDoc} */
--
Gitblit v1.10.0