From 01eb7d07467b57c61868c73e9a94bff1d0b2dcd1 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 21 May 2014 15:56:41 +0000
Subject: [PATCH] OPENDJ-1389 – Add support for replication changelog DB rotation
---
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java | 69 ++++++++++++++++++++++++----------
1 files changed, 49 insertions(+), 20 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
index 53ee20e..9a91ff9 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
@@ -38,10 +39,11 @@
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.*;
+import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
- * Logfile-based implementation of a ChangeNumberIndexDB.
+ * Implementation of a ChangeNumberIndexDB with a log.
* <p>
* This class publishes some monitoring information below <code>
* cn=monitor</code>.
@@ -57,7 +59,7 @@
static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
/** The log in which records are persisted. */
- private final LogFile<Long, ChangeNumberIndexRecord> logFile;
+ private final Log<Long, ChangeNumberIndexRecord> log;
/**
* The newest changenumber stored in the DB. It is used to avoid purging the
@@ -92,7 +94,7 @@
*/
FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException
{
- logFile = replicationEnv.getOrCreateCNIndexDB();
+ log = replicationEnv.getOrCreateCNIndexDB();
final ChangeNumberIndexRecord newestRecord = readLastRecord();
newestChangeNumber = getChangeNumber(newestRecord);
// initialization of the lastGeneratedChangeNumber from the DB content
@@ -106,13 +108,13 @@
private ChangeNumberIndexRecord readLastRecord() throws ChangelogException
{
- final Record<Long, ChangeNumberIndexRecord> record = logFile.getNewestRecord();
+ final Record<Long, ChangeNumberIndexRecord> record = log.getNewestRecord();
return record == null ? null : record.getValue();
}
private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
{
- final Record<Long, ChangeNumberIndexRecord> record = logFile.getOldestRecord();
+ final Record<Long, ChangeNumberIndexRecord> record = log.getOldestRecord();
return record == null ? null : record.getValue();
}
@@ -132,7 +134,7 @@
final long changeNumber = nextChangeNumber();
final ChangeNumberIndexRecord newRecord =
new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(), record.getBaseDN(), record.getCSN());
- logFile.addRecord(newRecord.getChangeNumber(), newRecord);
+ log.append(Record.from(newRecord.getChangeNumber(), newRecord));
newestChangeNumber = changeNumber;
if (debugEnabled())
@@ -177,7 +179,7 @@
*/
long count() throws ChangelogException
{
- return logFile.getNumberOfRecords();
+ return log.getNumberOfRecords();
}
/**
@@ -197,7 +199,7 @@
@Override
public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException
{
- return new FileChangeNumberIndexDBCursor(logFile.getCursor(startChangeNumber));
+ return new FileChangeNumberIndexDBCursor(log.getCursor(startChangeNumber));
}
/**
@@ -207,7 +209,7 @@
{
if (shutdown.compareAndSet(false, true))
{
- logFile.close();
+ log.close();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
}
@@ -228,11 +230,8 @@
{
return null;
}
-
- // TODO : no purge implemented yet as implementation is based on a single-file log.
- // The purge must be implemented once we handle a log with multiple files.
- // The purge will only delete whole files.
- return null;
+ final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
+ return record != null ? record.getValue().getCSN() : null;
}
/**
@@ -321,7 +320,7 @@
*/
public void clear() throws ChangelogException
{
- logFile.clear();
+ log.clear();
newestChangeNumber = NO_KEY;
}
@@ -331,15 +330,16 @@
private static final byte STRING_SEPARATOR = 0;
@Override
- public ByteString encodeRecord(final Long changeNumber, final ChangeNumberIndexRecord record)
+ public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record)
{
+ final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
return new ByteStringBuilder()
- .append(changeNumber)
- .append(record.getPreviousCookie())
+ .append(record.getKey())
+ .append(cnIndexRecord.getPreviousCookie())
.append(STRING_SEPARATOR)
- .append(record.getBaseDN().toString())
+ .append(cnIndexRecord.getBaseDN().toString())
.append(STRING_SEPARATOR)
- .append(record.getCSN().toByteString()).toByteString();
+ .append(cnIndexRecord.getCSN().toByteString()).toByteString();
}
@Override
@@ -378,6 +378,35 @@
}
return length;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public Long decodeKeyFromString(String key) throws ChangelogException
+ {
+ try
+ {
+ return Long.valueOf(key);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ChangelogException(
+ ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String encodeKeyToString(Long key)
+ {
+ return key.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Long getMaxKey()
+ {
+ return Long.MAX_VALUE;
+ }
}
}
--
Gitblit v1.10.0