From 8a0ef37a4b2c5d28b2bc9c91e90c4201768b97ea Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 19 Aug 2014 15:43:59 +0000
Subject: [PATCH] OPENDJ-1206 (CR-4261) Create a new ReplicationBackend/ChangelogBackend to support cn=changelog

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                           |   19 +-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java                             |   11 +
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java                       |   46 ++++--
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java |  151 ++++++++++++++-------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java                           |  168 +++++++++++++----------
 5 files changed, 241 insertions(+), 154 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 90091c9..5b3cca6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -34,7 +34,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.forgerock.util.Reject;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.api.DirectoryThread;
@@ -183,7 +182,9 @@
     catch (Exception e)
     {
       if (debugEnabled())
+      {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
 
       final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ")
           .append(String.valueOf(dbDirectory));
@@ -585,7 +586,9 @@
             firstException = e;
           }
           else if (debugEnabled())
+          {
             TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
         }
       }
     }
@@ -687,7 +690,9 @@
         catch (Exception e)
         {
           if (debugEnabled())
+          {
             TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
           logError(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
         }
       }
@@ -765,12 +770,10 @@
   public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
       final PositionStrategy positionStrategy) throws ChangelogException
   {
-    Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
-        + " is not supported for the JE implementation fo changelog");
     final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
-      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
+      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
       final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
       final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
       final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
@@ -860,7 +863,7 @@
     {
       replicationEnv.notifyReplicaOnline(baseDN, serverId);
     }
-    updateCursorsWithOfflineCSN(baseDN, null);
+    updateCursorsWithOfflineCSN(baseDN, serverId, null);
   }
 
   /** {@inheritDoc} */
@@ -873,12 +876,12 @@
     {
       indexer.replicaOffline(baseDN, offlineCSN);
     }
-    updateCursorsWithOfflineCSN(baseDN, offlineCSN);
+    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
   }
 
-  private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
+  private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN)
   {
-    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
+    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
     if (cursors != null && !cursors.isEmpty())
     {
       for (ReplicaCursor cursor : cursors)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index 6d93ca0..bb9a22f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -41,7 +41,8 @@
 import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.DN;
@@ -178,18 +179,20 @@
    * Generate a new {@link DBCursor} that allows to browse the db managed by
    * this ReplicaDB and starting at the position defined by a given CSN.
    *
-   * @param startAfterCSN
+   * @param startCSN
    *          The position where the cursor must start. If null, start from the
    *          oldest CSN
+   * @param positionStrategy
+   *          indicates at which exact position the cursor must start
    * @return a new {@link DBCursor} that allows to browse the db managed by this
    *         ReplicaDB and starting at the position defined by a given CSN.
    * @throws ChangelogException
    *           if a database problem happened
    */
-  public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
+  public DBCursor<UpdateMsg> generateCursorFrom(CSN startCSN, PositionStrategy positionStrategy)
       throws ChangelogException
   {
-    return new JEReplicaDBCursor(db, startAfterCSN, this);
+    return new JEReplicaDBCursor(db, startCSN, positionStrategy, this);
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index 1e9e8f7..e3dc03e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -32,18 +32,21 @@
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
 
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+
 /**
  * Berkeley DB JE implementation of {@link DBCursor}.
  *
  * \@NotThreadSafe
  */
-public class JEReplicaDBCursor implements DBCursor<UpdateMsg>
+class JEReplicaDBCursor implements DBCursor<UpdateMsg>
 {
-  private UpdateMsg currentChange;
-  private ReplServerDBCursor cursor;
+  private final ReplicationDB db;
+  private final PositionStrategy positionStrategy;
   private JEReplicaDB replicaDB;
-  private ReplicationDB db;
   private CSN lastNonNullCurrentCSN;
+  private ReplServerDBCursor cursor;
+  private UpdateMsg currentChange;
 
   /**
    * Creates a new {@link JEReplicaDBCursor}. All created cursor must be
@@ -51,20 +54,23 @@
    *
    * @param db
    *          The db where the cursor must be created.
-   * @param startAfterCSN
+   * @param startCSN
    *          The CSN after which the cursor must start.If null, start from the
    *          oldest CSN
+   * @param positionStrategy
+   *          indicates at which exact position the cursor must start
    * @param replicaDB
    *          The associated JEReplicaDB.
    * @throws ChangelogException
-   *           if a database problem happened.
+   *          if a database problem happened.
    */
-  public JEReplicaDBCursor(ReplicationDB db, CSN startAfterCSN,
+  public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, PositionStrategy positionStrategy,
       JEReplicaDB replicaDB) throws ChangelogException
   {
     this.db = db;
+    this.positionStrategy = positionStrategy;
     this.replicaDB = replicaDB;
-    this.lastNonNullCurrentCSN = startAfterCSN;
+    this.lastNonNullCurrentCSN = startCSN;
   }
 
   /** {@inheritDoc} */
@@ -78,9 +84,6 @@
   @Override
   public boolean next() throws ChangelogException
   {
-    final ReplServerDBCursor localCursor = cursor;
-    currentChange = localCursor != null ? localCursor.next() : null;
-
     if (currentChange == null)
     {
       synchronized (this)
@@ -91,10 +94,18 @@
         // if following code is called while the cursor is closed.
         // It is better to let the deadlock happen to help quickly identifying
         // and fixing such issue with unit tests.
-        cursor = db.openReadCursor(lastNonNullCurrentCSN);
-        currentChange = cursor.next();
+        cursor = db.openReadCursor(lastNonNullCurrentCSN, positionStrategy);
       }
     }
+
+    // For ON_MATCHING_KEY, do not call next() if the cursor has just been initialized.
+    if (positionStrategy == ON_MATCHING_KEY && currentChange != null
+        || positionStrategy == AFTER_MATCHING_KEY)
+    {
+      cursor.next();
+    }
+    currentChange = cursor.getRecord();
+
     if (currentChange != null)
     {
       lastNonNullCurrentCSN = currentChange.getCSN();
@@ -110,7 +121,7 @@
     synchronized (this)
     {
       closeCursor();
-      this.replicaDB = null;
+      replicaDB = null;
     }
   }
 
@@ -120,11 +131,12 @@
     {
       cursor.close();
       cursor = null;
+      currentChange = null;
     }
   }
 
   /**
-   * Called by the Gc when the object is garbage collected. Release the internal
+   * Called by the GC when the object is garbage collected. Release the internal
    * cursor in case the cursor was badly used and {@link #close()} was never
    * called.
    */
@@ -138,7 +150,9 @@
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + " currentChange=" + currentChange
+    return getClass().getSimpleName()
+        + " positionStrategy=" + positionStrategy
+        + " currentChange=" + currentChange
         + " replicaDB=" + replicaDB;
   }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index afc94f0..c04ffef 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -26,12 +26,10 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
-import java.io.Closeable;
 import java.io.UnsupportedEncodingException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.protocol.ProtocolVersion;
@@ -40,6 +38,8 @@
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
 import org.opends.server.types.DN;
 import org.opends.server.util.StaticUtils;
 
@@ -244,15 +244,18 @@
 
   private DatabaseEntry createReplicationKey(CSN csn)
   {
-    DatabaseEntry key = new DatabaseEntry();
-    try
+    final DatabaseEntry key = new DatabaseEntry();
+    if (csn != null)
     {
-      key.setData(csn.toString().getBytes("UTF-8"));
-    }
-    catch (UnsupportedEncodingException e)
-    {
-      // Should never happens, UTF-8 is always supported
-      // TODO : add better logging
+      try
+      {
+        key.setData(csn.toString().getBytes("UTF-8"));
+      }
+      catch (UnsupportedEncodingException e)
+      {
+        // Should never happens, UTF-8 is always supported
+        // TODO : add better logging
+      }
     }
     return key;
   }
@@ -286,13 +289,15 @@
    * @param startCSN
    *          The CSN from which the cursor must start.If null, start from the
    *          oldest CSN
+   * @param positionStrategy
+   *          indicates at which exact position the cursor must start
    * @return The ReplServerDBCursor.
    * @throws ChangelogException
    *           If a database problem happened
    */
-  ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
+  ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
   {
-    return new ReplServerDBCursor(startCSN);
+    return new ReplServerDBCursor(startCSN, positionStrategy);
   }
 
   /**
@@ -446,7 +451,7 @@
    * This Class implements a cursor that can be used to browse a
    * replicationServer database.
    */
-  class ReplServerDBCursor implements Closeable
+  class ReplServerDBCursor implements DBCursor<UpdateMsg>
   {
     /**
      * The transaction that will protect the actions done with the cursor.
@@ -455,12 +460,14 @@
      * <p>
      * Will be set non null for a write cursor
      */
-    private final Transaction txn;
     private final Cursor cursor;
     private final DatabaseEntry key;
     private final DatabaseEntry data;
+    /** \@Null for read cursors, \@NotNull for deleting cursors. */
+    private final Transaction txn;
+    private UpdateMsg currentRecord;
 
-    private boolean isClosed = false;
+    private boolean isClosed;
 
     /**
      * Creates a ReplServerDBCursor that can be used for browsing a
@@ -468,21 +475,15 @@
      *
      * @param startCSN
      *          The CSN from which the cursor must start.
+     * @param positionStrategy
+     *          indicates at which exact position the cursor must start
      * @throws ChangelogException
      *           When the startCSN does not exist.
      */
-    private ReplServerDBCursor(CSN startCSN) throws ChangelogException
+    private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
     {
-      if (startCSN != null)
-      {
-        key = createReplicationKey(startCSN);
-      }
-      else
-      {
-        key = new DatabaseEntry();
-      }
+      key = createReplicationKey(startCSN);
       data = new DatabaseEntry();
-
       txn = null;
 
       // Take the lock. From now on, whatever error that happen in the life
@@ -516,18 +517,25 @@
             return;
           }
 
-          // We can move close to the startCSN.
-          // Let's create a cursor from that point.
-          DatabaseEntry aKey = new DatabaseEntry();
-          DatabaseEntry aData = new DatabaseEntry();
-          if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS)
+          if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
           {
-            localCursor.close();
-            localCursor = db.openCursor(txn, null);
+            // We can move close to the startCSN.
+            // Let's create a cursor from that point.
+            key.setData(null);
+            if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
+            {
+              localCursor.close();
+              localCursor = db.openCursor(txn, null);
+            }
           }
         }
         cursor = localCursor;
         cursorHeld = cursor != null;
+
+        if (key.getData() != null)
+        {
+          computeCurrentRecord();
+        }
       }
       catch (DatabaseException e)
       {
@@ -605,6 +613,7 @@
           return;
         }
         isClosed = true;
+        currentRecord = null;
       }
 
       closeAndReleaseReadLock(cursor);
@@ -659,6 +668,7 @@
         return null;
       }
 
+      currentRecord = null;
       try
       {
         if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
@@ -673,63 +683,75 @@
       }
     }
 
-    /**
-     * Get the next UpdateMsg from this cursor.
-     *
-     * @return the next UpdateMsg.
-     */
-    UpdateMsg next()
+    /** {@inheritDoc} */
+    @Override
+    public boolean next() throws ChangelogException
     {
       if (isClosed)
       {
-        return null;
+        return false;
       }
 
-      UpdateMsg currentChange = null;
-      while (currentChange == null)
+      currentRecord = null;
+      while (currentRecord == null)
       {
         try
         {
           if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
           {
-            return null;
+            return false;
           }
         }
         catch (DatabaseException e)
         {
-          return null;
+          throw new ChangelogException(e);
         }
-
-        CSN csn = null;
-        try
-        {
-          csn = toCSN(key.getData());
-          if (isACounterRecord(csn))
-          {
-            continue;
-          }
-          currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
-              data.getData(), ProtocolVersion.getCurrentVersion());
-        }
-        catch (Exception e)
-        {
-          /*
-           * An error happening trying to convert the data from the
-           * replicationServer database to an Update Message. This can only
-           * happen if the database is corrupted. There is not much more that we
-           * can do at this point except trying to continue with the next
-           * record. In such case, it is therefore possible that we miss some
-           * changes.
-           * TODO : This should be handled by the repair functionality.
-           */
-          Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD
-              .get(replicationServer.getServerId(),
-                  (csn != null ? csn.toString() : ""),
-                  e.getMessage());
-          logError(message);
-        }
+        computeCurrentRecord();
       }
-      return currentChange;
+      return currentRecord != null;
+    }
+
+    private void computeCurrentRecord()
+    {
+      CSN csn = null;
+      try
+      {
+        csn = toCSN(key.getData());
+        if (isACounterRecord(csn))
+        {
+          return;
+        }
+        currentRecord = toRecord(data.getData());
+      }
+      catch (Exception e)
+      {
+        /*
+         * An error happening trying to convert the data from the
+         * replicationServer database to an Update Message. This can only
+         * happen if the database is corrupted. There is not much more that we
+         * can do at this point except trying to continue with the next
+         * record. In such case, it is therefore possible that we miss some
+         * changes.
+         * TODO : This should be handled by the repair functionality.
+         */
+        logError(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD.get(
+            replicationServer.getServerId(),
+            (csn != null ? csn.toString() : ""),
+            e.getMessage()));
+      }
+    }
+
+    private UpdateMsg toRecord(final byte[] data) throws Exception
+    {
+      final short currentVersion = ProtocolVersion.getCurrentVersion();
+      return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UpdateMsg getRecord()
+    {
+      return currentRecord;
     }
 
     /**
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index 9442e44..3033af5 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -28,7 +28,10 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 
+import org.assertj.core.api.SoftAssertions;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
@@ -41,15 +44,18 @@
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ReplServerFakeConfiguration;
 import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
 import org.opends.server.types.DN;
-import org.opends.server.util.StaticUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.opends.server.TestCaseUtils.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
 import static org.testng.Assert.*;
 
 /**
@@ -98,11 +104,12 @@
   void testTrim() throws Exception
   {
     ReplicationServer replicationServer = null;
+    JEReplicaDB replicaDB = null;
     try
     {
       TestCaseUtils.startServer();
       replicationServer = configureReplicationServer(100, 5000);
-      final JEReplicaDB replicaDB = newReplicaDB(replicationServer);
+      replicaDB = newReplicaDB(replicationServer);
 
       CSN[] csns = newCSNs(1, 0, 5);
 
@@ -114,7 +121,7 @@
       //--
       // Iterator tests with changes persisted
       assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
-      assertNotFound(replicaDB, csns[4]);
+      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
 
       assertEquals(replicaDB.getOldestCSN(), csns[0]);
       assertEquals(replicaDB.getNewestCSN(), csns[2]);
@@ -127,7 +134,7 @@
       // Test cursor from existing CSN
       assertFoundInOrder(replicaDB, csns[2], csns[3]);
       assertFoundInOrder(replicaDB, csns[3]);
-      assertNotFound(replicaDB, csns[4]);
+      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
 
       replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
 
@@ -150,6 +157,7 @@
     }
     finally
     {
+      shutdown(replicaDB);
       remove(replicationServer);
     }
   }
@@ -199,38 +207,34 @@
       return;
     }
 
-    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]);
-    try
-    {
-      assertNull(cursor.getRecord());
-      for (int i = 1; i < csns.length; i++)
-      {
-        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
-        assertTrue(cursor.next(), msg);
-        assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
-      }
-      assertFalse(cursor.next());
-      assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
-          + ", Expected null");
-    }
-    finally
-    {
-      StaticUtils.close(cursor);
-    }
+    assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
+    assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
   }
 
-  private void assertNotFound(JEReplicaDB replicaDB, CSN csn) throws Exception
+  private void assertFoundInOrder(JEReplicaDB replicaDB,
+      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
   {
-    DBCursor<UpdateMsg> cursor = null;
+    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
     try
     {
-      cursor = replicaDB.generateCursorFrom(csn);
-      assertFalse(cursor.next());
-      assertNull(cursor.getRecord());
+      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
+
+      for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
+      {
+        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
+        final SoftAssertions softly = new SoftAssertions();
+        softly.assertThat(cursor.next()).as(msg).isTrue();
+        softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
+        softly.assertAll();
+      }
+      final SoftAssertions softly = new SoftAssertions();
+      softly.assertThat(cursor.next()).isFalse();
+      softly.assertThat(cursor.getRecord()).isNull();
+      softly.assertAll();
     }
     finally
     {
-      StaticUtils.close(cursor);
+      close(cursor);
     }
   }
 
@@ -243,11 +247,12 @@
   void testClear() throws Exception
   {
     ReplicationServer replicationServer = null;
+    JEReplicaDB replicaDB = null;
     try
     {
       TestCaseUtils.startServer();
       replicationServer = configureReplicationServer(100, 5000);
-      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
+      replicaDB = newReplicaDB(replicationServer);
 
       CSN[] csns = newCSNs(1, 0, 3);
 
@@ -267,6 +272,7 @@
     }
     finally
     {
+      shutdown(replicaDB);
       remove(replicationServer);
     }
   }
@@ -275,7 +281,6 @@
   public void testGenerateCursorFrom() throws Exception
   {
     ReplicationServer replicationServer = null;
-    DBCursor<UpdateMsg> cursor = null;
     JEReplicaDB replicaDB = null;
     try
     {
@@ -283,38 +288,69 @@
       replicationServer = configureReplicationServer(100000, 10);
       replicaDB = newReplicaDB(replicationServer);
 
-      CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6);
-      for (int i = 0; i < 5; i++)
+      final CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5);
+      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
+      csns2.remove(csns[3]);
+
+      for (CSN csn : csns2)
       {
-        if (i != 3)
-        {
-          replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
-        }
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
       }
 
-      cursor = replicaDB.generateCursorFrom(csns[0]);
-      assertTrue(cursor.next());
-      assertEquals(cursor.getRecord().getCSN(), csns[1]);
-      StaticUtils.close(cursor);
+      for (CSN csn : csns2)
+      {
+        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
+      }
+      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
 
-      cursor = replicaDB.generateCursorFrom(csns[3]);
-      assertTrue(cursor.next());
-      assertEquals(cursor.getRecord().getCSN(), csns[4]);
-      StaticUtils.close(cursor);
-
-      cursor = replicaDB.generateCursorFrom(csns[4]);
-      assertFalse(cursor.next());
-      assertNull(cursor.getRecord());
+      for (int i = 0; i < csns2.size() - 1; i++)
+      {
+        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
+      }
+      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
     }
     finally
     {
-      StaticUtils.close(cursor);
-      if (replicaDB != null)
-        replicaDB.shutdown();
+      shutdown(replicaDB);
       remove(replicationServer);
     }
   }
 
+  private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN,
+      final PositionStrategy positionStrategy, final CSN expectedCSN)
+      throws ChangelogException
+  {
+    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+    try
+    {
+      final SoftAssertions softly = new SoftAssertions();
+      softly.assertThat(cursor.next()).isTrue();
+      softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN);
+      softly.assertAll();
+    }
+    finally
+    {
+      close(cursor);
+    }
+  }
+
+  private void assertNotFound(JEReplicaDB replicaDB, final CSN startCSN,
+      final PositionStrategy positionStrategy) throws ChangelogException
+  {
+    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+    try
+    {
+      final SoftAssertions softly = new SoftAssertions();
+      softly.assertThat(cursor.next()).isFalse();
+      softly.assertThat(cursor.getRecord()).isNull();
+      softly.assertAll();
+    }
+    finally
+    {
+      close(cursor);
+    }
+  }
+
   /**
    * Test the logic that manages counter records in the JEReplicaDB in order to
    * optimize the oldest and newest records in the replication changelog db.
@@ -412,13 +448,22 @@
     }
     finally
     {
-      if (replicaDB != null)
-        replicaDB.shutdown();
+      shutdown(replicaDB);
       if (dbEnv != null)
+      {
         dbEnv.shutdown();
+      }
       remove(replicationServer);
       TestCaseUtils.deleteDirectory(testRoot);
     }
   }
 
+  private void shutdown(JEReplicaDB replicaDB)
+  {
+    if (replicaDB != null)
+    {
+      replicaDB.shutdown();
+    }
+  }
+
 }

--
Gitblit v1.10.0