From 84cecd3711ddbbd60132cdd80957e387f23cf63e Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 21 May 2014 15:56:41 +0000
Subject: [PATCH] OPENDJ-1389 – Add support for replication changelog DB rotation
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java | 55 +++++++++++++++++++++++++++++++++++++++----------------
1 files changed, 39 insertions(+), 16 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
index 1526830..0525996 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -43,7 +43,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.ByteString;
@@ -92,7 +92,7 @@
private final AtomicBoolean shutdown = new AtomicBoolean(false);
/** The log in which records are persisted. */
- private final LogFile<CSN, UpdateMsg> logFile;
+ private final Log<CSN, UpdateMsg> log;
/**
* Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -133,7 +133,7 @@
this.baseDN = baseDN;
this.replicationServer = replicationServer;
this.replicationEnv = replicationEnv;
- this.logFile = createLogFile(replicationEnv);
+ this.log = createLog(replicationEnv);
this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN());
DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -142,17 +142,17 @@
private CSN readOldestCSN() throws ChangelogException
{
- final Record<CSN, UpdateMsg> record = logFile.getOldestRecord();
+ final Record<CSN, UpdateMsg> record = log.getOldestRecord();
return record == null ? null : record.getKey();
}
private CSN readNewestCSN() throws ChangelogException
{
- final Record<CSN, UpdateMsg> record = logFile.getNewestRecord();
+ final Record<CSN, UpdateMsg> record = log.getNewestRecord();
return record == null ? null : record.getKey();
}
- private LogFile<CSN, UpdateMsg> createLogFile(final ReplicationEnvironment replicationEnv) throws ChangelogException
+ private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException
{
final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true);
return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId());
@@ -174,7 +174,7 @@
ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
.toString(), String.valueOf(baseDN), String.valueOf(serverId)));
}
- logFile.addRecord(Record.from(updateMsg.getCSN(), updateMsg));
+ log.append(Record.from(updateMsg.getCSN(), updateMsg));
final CSNLimits limits = csnLimits;
final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
final boolean updateOld = limits.oldestCSN == null;
@@ -239,7 +239,7 @@
*/
DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException
{
- LogCursor<CSN, UpdateMsg> cursor = logFile.getNearestCursor(startAfterCSN);
+ RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN);
return new FileReplicaDBCursor(cursor, startAfterCSN);
}
@@ -250,7 +250,7 @@
{
if (shutdown.compareAndSet(false, true))
{
- logFile.close();
+ log.close();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
}
@@ -270,10 +270,11 @@
{
return;
}
-
- // TODO : no purge implemented yet, as we have a single-file log.
- // The purge must be implemented once we handle a log with multiple files.
- // The purge will only delete whole files.
+ final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN);
+ if (oldestRecord != null)
+ {
+ csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN);
+ }
}
/**
@@ -346,7 +347,7 @@
void clear() throws ChangelogException
{
// Remove all persisted data and reset generationId to default value
- logFile.clear();
+ log.clear();
replicationEnv.resetGenerationId(baseDN);
csnLimits = new CSNLimits(null, null);
@@ -363,7 +364,7 @@
*/
long getNumberRecords() throws ChangelogException
{
- return logFile.getNumberOfRecords();
+ return log.getNumberOfRecords();
}
/** Parser of records persisted in the ReplicaDB log. */
@@ -372,8 +373,9 @@
private static final DebugTracer TRACER = getTracer();
@Override
- public ByteString encodeRecord(CSN key, UpdateMsg message)
+ public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
{
+ final UpdateMsg message = record.getValue();
try
{
return ByteString.wrap(message.getBytes());
@@ -400,6 +402,27 @@
throw new DecodingException(e);
}
}
+
+ /** {@inheritDoc} */
+ @Override
+ public CSN decodeKeyFromString(String key) throws ChangelogException
+ {
+ return new CSN(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String encodeKeyToString(CSN key)
+ {
+ return key.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CSN getMaxKey()
+ {
+ return CSN.MAX_CSN_VALUE;
+ }
}
}
--
Gitblit v1.10.0