From 01eb7d07467b57c61868c73e9a94bff1d0b2dcd1 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 21 May 2014 15:56:41 +0000
Subject: [PATCH] OPENDJ-1389 – Add support for replication changelog DB rotation

---
 opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java |   69 ++++++++++++++++++++++++----------
 1 files changed, 49 insertions(+), 20 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
index 53ee20e..9a91ff9 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.opends.server.admin.std.server.MonitorProviderCfg;
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigException;
@@ -38,10 +39,11 @@
 import org.opends.server.replication.server.changelog.api.*;
 import org.opends.server.types.*;
 
+import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
 /**
- * Logfile-based implementation of a ChangeNumberIndexDB.
+ * Implementation of a ChangeNumberIndexDB with a log.
  * <p>
  * This class publishes some monitoring information below <code>
  * cn=monitor</code>.
@@ -57,7 +59,7 @@
   static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
 
   /** The log in which records are persisted. */
-  private final LogFile<Long, ChangeNumberIndexRecord> logFile;
+  private final Log<Long, ChangeNumberIndexRecord> log;
 
   /**
    * The newest changenumber stored in the DB. It is used to avoid purging the
@@ -92,7 +94,7 @@
    */
   FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException
   {
-    logFile = replicationEnv.getOrCreateCNIndexDB();
+    log = replicationEnv.getOrCreateCNIndexDB();
     final ChangeNumberIndexRecord newestRecord = readLastRecord();
     newestChangeNumber = getChangeNumber(newestRecord);
     // initialization of the lastGeneratedChangeNumber from the DB content
@@ -106,13 +108,13 @@
 
   private ChangeNumberIndexRecord readLastRecord() throws ChangelogException
   {
-    final Record<Long, ChangeNumberIndexRecord> record = logFile.getNewestRecord();
+    final Record<Long, ChangeNumberIndexRecord> record = log.getNewestRecord();
     return record == null ? null : record.getValue();
   }
 
   private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
   {
-    final Record<Long, ChangeNumberIndexRecord> record = logFile.getOldestRecord();
+    final Record<Long, ChangeNumberIndexRecord> record = log.getOldestRecord();
     return record == null ? null : record.getValue();
   }
 
@@ -132,7 +134,7 @@
     final long changeNumber = nextChangeNumber();
     final ChangeNumberIndexRecord newRecord =
         new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(), record.getBaseDN(), record.getCSN());
-    logFile.addRecord(newRecord.getChangeNumber(), newRecord);
+    log.append(Record.from(newRecord.getChangeNumber(), newRecord));
     newestChangeNumber = changeNumber;
 
     if (debugEnabled())
@@ -177,7 +179,7 @@
    */
   long count() throws ChangelogException
   {
-    return logFile.getNumberOfRecords();
+    return log.getNumberOfRecords();
   }
 
   /**
@@ -197,7 +199,7 @@
   @Override
   public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException
   {
-    return new FileChangeNumberIndexDBCursor(logFile.getCursor(startChangeNumber));
+    return new FileChangeNumberIndexDBCursor(log.getCursor(startChangeNumber));
   }
 
   /**
@@ -207,7 +209,7 @@
   {
     if (shutdown.compareAndSet(false, true))
     {
-      logFile.close();
+      log.close();
       DirectoryServer.deregisterMonitorProvider(dbMonitor);
     }
   }
@@ -228,11 +230,8 @@
     {
       return null;
     }
-
-    // TODO : no purge implemented yet as implementation is based on a single-file log.
-    // The purge must be implemented once we handle a log with multiple files.
-    // The purge will only delete whole files.
-    return null;
+    final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
+    return record != null ? record.getValue().getCSN() : null;
   }
 
   /**
@@ -321,7 +320,7 @@
    */
   public void clear() throws ChangelogException
   {
-    logFile.clear();
+    log.clear();
     newestChangeNumber = NO_KEY;
   }
 
@@ -331,15 +330,16 @@
     private static final byte STRING_SEPARATOR = 0;
 
     @Override
-    public ByteString encodeRecord(final Long changeNumber, final ChangeNumberIndexRecord record)
+    public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record)
     {
+      final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
       return new ByteStringBuilder()
-        .append(changeNumber)
-        .append(record.getPreviousCookie())
+        .append(record.getKey())
+        .append(cnIndexRecord.getPreviousCookie())
         .append(STRING_SEPARATOR)
-        .append(record.getBaseDN().toString())
+        .append(cnIndexRecord.getBaseDN().toString())
         .append(STRING_SEPARATOR)
-        .append(record.getCSN().toByteString()).toByteString();
+        .append(cnIndexRecord.getCSN().toByteString()).toByteString();
     }
 
     @Override
@@ -378,6 +378,35 @@
       }
       return length;
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public Long decodeKeyFromString(String key) throws ChangelogException
+    {
+      try
+      {
+        return Long.valueOf(key);
+      }
+      catch (NumberFormatException e)
+      {
+        throw new ChangelogException(
+            ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e);
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String encodeKeyToString(Long key)
+    {
+      return key.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Long getMaxKey()
+    {
+      return Long.MAX_VALUE;
+    }
   }
 
 }

--
Gitblit v1.10.0