From 84cecd3711ddbbd60132cdd80957e387f23cf63e Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 21 May 2014 15:56:41 +0000
Subject: [PATCH] OPENDJ-1389 – Add support for replication changelog DB rotation

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java |   55 +++++++++++++++++++++++++++++++++++++++----------------
 1 files changed, 39 insertions(+), 16 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
index 1526830..0525996 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -43,7 +43,7 @@
 import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.ByteString;
@@ -92,7 +92,7 @@
   private final AtomicBoolean shutdown = new AtomicBoolean(false);
 
   /** The log in which records are persisted. */
-  private final LogFile<CSN, UpdateMsg> logFile;
+  private final Log<CSN, UpdateMsg> log;
 
   /**
    * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -133,7 +133,7 @@
     this.baseDN = baseDN;
     this.replicationServer = replicationServer;
     this.replicationEnv = replicationEnv;
-    this.logFile = createLogFile(replicationEnv);
+    this.log = createLog(replicationEnv);
     this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN());
 
     DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -142,17 +142,17 @@
 
   private CSN readOldestCSN() throws ChangelogException
   {
-    final Record<CSN, UpdateMsg> record = logFile.getOldestRecord();
+    final Record<CSN, UpdateMsg> record = log.getOldestRecord();
     return record == null ? null : record.getKey();
   }
 
   private CSN readNewestCSN() throws ChangelogException
   {
-    final Record<CSN, UpdateMsg> record = logFile.getNewestRecord();
+    final Record<CSN, UpdateMsg> record = log.getNewestRecord();
     return record == null ? null : record.getKey();
   }
 
-  private LogFile<CSN, UpdateMsg> createLogFile(final ReplicationEnvironment replicationEnv) throws ChangelogException
+  private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException
   {
     final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true);
     return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId());
@@ -174,7 +174,7 @@
           ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
               .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
     }
-    logFile.addRecord(Record.from(updateMsg.getCSN(), updateMsg));
+    log.append(Record.from(updateMsg.getCSN(), updateMsg));
     final CSNLimits limits = csnLimits;
     final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
     final boolean updateOld = limits.oldestCSN == null;
@@ -239,7 +239,7 @@
    */
   DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException
   {
-    LogCursor<CSN, UpdateMsg> cursor = logFile.getNearestCursor(startAfterCSN);
+    RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN);
     return new FileReplicaDBCursor(cursor, startAfterCSN);
   }
 
@@ -250,7 +250,7 @@
   {
     if (shutdown.compareAndSet(false, true))
     {
-      logFile.close();
+      log.close();
       DirectoryServer.deregisterMonitorProvider(dbMonitor);
     }
   }
@@ -270,10 +270,11 @@
     {
       return;
     }
-
-    // TODO : no purge implemented yet, as we have a single-file log.
-    // The purge must be implemented once we handle a log with multiple files.
-    // The purge will only delete whole files.
+    final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN);
+    if (oldestRecord != null)
+    {
+      csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN);
+    }
   }
 
   /**
@@ -346,7 +347,7 @@
   void clear() throws ChangelogException
   {
     // Remove all persisted data and reset generationId to default value
-    logFile.clear();
+    log.clear();
     replicationEnv.resetGenerationId(baseDN);
 
     csnLimits = new CSNLimits(null, null);
@@ -363,7 +364,7 @@
    */
   long getNumberRecords() throws ChangelogException
   {
-    return logFile.getNumberOfRecords();
+    return log.getNumberOfRecords();
   }
 
   /** Parser of records persisted in the ReplicaDB log. */
@@ -372,8 +373,9 @@
     private static final DebugTracer TRACER = getTracer();
 
     @Override
-    public ByteString encodeRecord(CSN key, UpdateMsg message)
+    public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
     {
+      final UpdateMsg message = record.getValue();
       try
       {
         return ByteString.wrap(message.getBytes());
@@ -400,6 +402,27 @@
         throw new DecodingException(e);
       }
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public CSN decodeKeyFromString(String key) throws ChangelogException
+    {
+      return new CSN(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String encodeKeyToString(CSN key)
+    {
+      return key.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CSN getMaxKey()
+    {
+      return CSN.MAX_CSN_VALUE;
+    }
   }
 
 }

--
Gitblit v1.10.0