From 12db845ee284503024cd2ebd62e6549d5cc42b77 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 20 Aug 2014 10:57:29 +0000
Subject: [PATCH] OPENDJ-1206 (CR-4261) Create a new ReplicationBackend/ChangelogBackend to support cn=changelog

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java |  162 +++++++++++++++++++++++++++++++-----------------------
 1 files changed, 93 insertions(+), 69 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index 27ea1a6..abbe49c 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -26,7 +26,6 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
-import java.io.Closeable;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import java.io.UnsupportedEncodingException;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -39,6 +38,8 @@
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
 import org.opends.server.types.DN;
 import org.opends.server.util.StaticUtils;
 
@@ -244,15 +245,18 @@
 
   private DatabaseEntry createReplicationKey(CSN csn)
   {
-    DatabaseEntry key = new DatabaseEntry();
-    try
+    final DatabaseEntry key = new DatabaseEntry();
+    if (csn != null)
     {
-      key.setData(csn.toString().getBytes("UTF-8"));
-    }
-    catch (UnsupportedEncodingException e)
-    {
-      // Should never happens, UTF-8 is always supported
-      // TODO : add better logging
+      try
+      {
+        key.setData(csn.toString().getBytes("UTF-8"));
+      }
+      catch (UnsupportedEncodingException e)
+      {
+        // Should never happens, UTF-8 is always supported
+        // TODO : add better logging
+      }
     }
     return key;
   }
@@ -285,13 +289,15 @@
    * @param startCSN
    *          The CSN from which the cursor must start.If null, start from the
    *          oldest CSN
+   * @param positionStrategy
+   *          indicates at which exact position the cursor must start
    * @return The ReplServerDBCursor.
    * @throws ChangelogException
    *           If a database problem happened
    */
-  ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
+  ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
   {
-    return new ReplServerDBCursor(startCSN);
+    return new ReplServerDBCursor(startCSN, positionStrategy);
   }
 
   /**
@@ -445,7 +451,7 @@
    * This Class implements a cursor that can be used to browse a
    * replicationServer database.
    */
-  class ReplServerDBCursor implements Closeable
+  class ReplServerDBCursor implements DBCursor<UpdateMsg>
   {
     /**
      * The transaction that will protect the actions done with the cursor.
@@ -454,12 +460,14 @@
      * <p>
      * Will be set non null for a write cursor
      */
-    private final Transaction txn;
     private final Cursor cursor;
     private final DatabaseEntry key;
     private final DatabaseEntry data;
+    /** \@Null for read cursors, \@NotNull for deleting cursors. */
+    private final Transaction txn;
+    private UpdateMsg currentRecord;
 
-    private boolean isClosed = false;
+    private boolean isClosed;
 
     /**
      * Creates a ReplServerDBCursor that can be used for browsing a
@@ -467,21 +475,15 @@
      *
      * @param startCSN
      *          The CSN from which the cursor must start.
+     * @param positionStrategy
+     *          indicates at which exact position the cursor must start
      * @throws ChangelogException
      *           When the startCSN does not exist.
      */
-    private ReplServerDBCursor(CSN startCSN) throws ChangelogException
+    private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
     {
-      if (startCSN != null)
-      {
-        key = createReplicationKey(startCSN);
-      }
-      else
-      {
-        key = new DatabaseEntry();
-      }
+      key = createReplicationKey(startCSN);
       data = new DatabaseEntry();
-
       txn = null;
 
       // Take the lock. From now on, whatever error that happen in the life
@@ -515,18 +517,25 @@
             return;
           }
 
-          // We can move close to the startCSN.
-          // Let's create a cursor from that point.
-          DatabaseEntry aKey = new DatabaseEntry();
-          DatabaseEntry aData = new DatabaseEntry();
-          if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS)
+          if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
           {
-            localCursor.close();
-            localCursor = db.openCursor(txn, null);
+            // We can move close to the startCSN.
+            // Let's create a cursor from that point.
+            key.setData(null);
+            if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
+            {
+              localCursor.close();
+              localCursor = db.openCursor(txn, null);
+            }
           }
         }
         cursor = localCursor;
         cursorHeld = cursor != null;
+
+        if (key.getData() != null)
+        {
+          computeCurrentRecord();
+        }
       }
       catch (DatabaseException e)
       {
@@ -604,6 +613,7 @@
           return;
         }
         isClosed = true;
+        currentRecord = null;
       }
 
       closeAndReleaseReadLock(cursor);
@@ -658,6 +668,7 @@
         return null;
       }
 
+      currentRecord = null;
       try
       {
         if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
@@ -672,60 +683,73 @@
       }
     }
 
-    /**
-     * Get the next UpdateMsg from this cursor.
-     *
-     * @return the next UpdateMsg.
-     */
-    UpdateMsg next()
+    /** {@inheritDoc} */
+    @Override
+    public boolean next() throws ChangelogException
     {
       if (isClosed)
       {
-        return null;
+        return false;
       }
 
-      UpdateMsg currentChange = null;
-      while (currentChange == null)
+      currentRecord = null;
+      while (currentRecord == null)
       {
         try
         {
           if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
           {
-            return null;
+            return false;
           }
         }
         catch (DatabaseException e)
         {
-          return null;
+          throw new ChangelogException(e);
         }
-
-        CSN csn = null;
-        try
-        {
-          csn = toCSN(key.getData());
-          if (isACounterRecord(csn))
-          {
-            continue;
-          }
-          currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
-              data.getData(), ProtocolVersion.getCurrentVersion());
-        }
-        catch (Exception e)
-        {
-          /*
-           * An error happening trying to convert the data from the
-           * replicationServer database to an Update LocalizableMessage. This can only
-           * happen if the database is corrupted. There is not much more that we
-           * can do at this point except trying to continue with the next
-           * record. In such case, it is therefore possible that we miss some
-           * changes.
-           * TODO : This should be handled by the repair functionality.
-           */
-          logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
-                  csn, e.getMessage());
-        }
+        computeCurrentRecord();
       }
-      return currentChange;
+      return currentRecord != null;
+    }
+
+    private void computeCurrentRecord()
+    {
+      CSN csn = null;
+      try
+      {
+        csn = toCSN(key.getData());
+        if (isACounterRecord(csn))
+        {
+          return;
+        }
+        currentRecord = toRecord(data.getData());
+      }
+      catch (Exception e)
+      {
+        /*
+         * An error happening trying to convert the data from the
+         * replicationServer database to an Update Message. This can only
+         * happen if the database is corrupted. There is not much more that we
+         * can do at this point except trying to continue with the next
+         * record. In such case, it is therefore possible that we miss some
+         * changes.
+         * TODO : This should be handled by the repair functionality.
+         */
+        logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
+            csn, e.getMessage());
+      }
+    }
+
+    private UpdateMsg toRecord(final byte[] data) throws Exception
+    {
+      final short currentVersion = ProtocolVersion.getCurrentVersion();
+      return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UpdateMsg getRecord()
+    {
+      return currentRecord;
     }
 
     /**

--
Gitblit v1.10.0