mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.57.2014 12db845ee284503024cd2ebd62e6549d5cc42b77
OPENDJ-1206 (CR-4261) Create a new ReplicationBackend/ChangelogBackend to support cn=changelog

Implemented PositionStrategy.ON_MATCHING_KEY for JEChangelogDB.getCursorFrom(DN baseDN, int serverId).



JEChangelogDB.java, JEReplicaDB.java:
Implemented PositionStrategy.ON_MATCHING_KEY.

JEReplicaDBCursor.java:
Added new field positionStrategy.
Simplified code in next() + added gotoNext().

ReplicationDB.java:
In openReadCursor(), added PositionStrategy parameter.
Made createReplicationKey() handle null CSNs.
Inner class ReplServerDBCursor now implement DBCursor + Added getRecord() + Made next(), now return boolean instead of UpdateMsg + Extracted method computeCurrentRecord().
Added new field "UpdateMsg currentRecord".
Added PositionStrategy parameter to ReplServerDBCursor ctor + protected special code for PositionStrategy.AFTER_MATCHING_KEY.


JEReplicaDBTest.java:
Added test cases for new PositionStrategy ON_MATCHING_KEY.
Split assertFoundInOrder() in two: one each for AFTER_MATCHING_KEY and ON_MATCHING_KEY.
In testGenerateCursorFrom(), changed the code to cover all CSNs.
Extracted method assertNextCSN() and shutdown().
Used SoftAssertions.
5 files modified
384 ■■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 16 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 9 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java 46 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 162 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java 151 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -37,7 +37,6 @@
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.util.Reject;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.CSN;
@@ -578,7 +577,10 @@
          {
            firstException = e;
          }
          else logger.traceException(e);
          else
          {
            logger.traceException(e);
          }
        }
      }
    }
@@ -757,12 +759,10 @@
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
        + " is not supported for the JE implementation fo changelog");
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
@@ -853,12 +853,12 @@
    {
      indexer.replicaOffline(baseDN, offlineCSN);
    }
    updateCursorsWithOfflineCSN(baseDN, offlineCSN);
    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
  }
  private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
  private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN)
  {
    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
    if (cursors != null && !cursors.isEmpty())
    {
      for (ReplicaCursor cursor : cursors)
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
@@ -177,18 +178,20 @@
   * Generate a new {@link DBCursor} that allows to browse the db managed by
   * this ReplicaDB and starting at the position defined by a given CSN.
   *
   * @param startAfterCSN
   * @param startCSN
   *          The position where the cursor must start. If null, start from the
   *          oldest CSN
   * @param positionStrategy
   *          indicates at which exact position the cursor must start
   * @return a new {@link DBCursor} that allows to browse the db managed by this
   *         ReplicaDB and starting at the position defined by a given CSN.
   * @throws ChangelogException
   *           if a database problem happened
   */
  public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
  public DBCursor<UpdateMsg> generateCursorFrom(CSN startCSN, PositionStrategy positionStrategy)
      throws ChangelogException
  {
    return new JEReplicaDBCursor(db, startAfterCSN, this);
    return new JEReplicaDBCursor(db, startCSN, positionStrategy, this);
  }
  /**
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -32,18 +32,21 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
 * Berkeley DB JE implementation of {@link DBCursor}.
 *
 * \@NotThreadSafe
 */
public class JEReplicaDBCursor implements DBCursor<UpdateMsg>
class JEReplicaDBCursor implements DBCursor<UpdateMsg>
{
  private UpdateMsg currentChange;
  private ReplServerDBCursor cursor;
  private final ReplicationDB db;
  private final PositionStrategy positionStrategy;
  private JEReplicaDB replicaDB;
  private ReplicationDB db;
  private CSN lastNonNullCurrentCSN;
  private ReplServerDBCursor cursor;
  private UpdateMsg currentChange;
  /**
   * Creates a new {@link JEReplicaDBCursor}. All created cursor must be
@@ -51,20 +54,23 @@
   *
   * @param db
   *          The db where the cursor must be created.
   * @param startAfterCSN
   * @param startCSN
   *          The CSN after which the cursor must start.If null, start from the
   *          oldest CSN
   * @param positionStrategy
   *          indicates at which exact position the cursor must start
   * @param replicaDB
   *          The associated JEReplicaDB.
   * @throws ChangelogException
   *           if a database problem happened.
   *          if a database problem happened.
   */
  public JEReplicaDBCursor(ReplicationDB db, CSN startAfterCSN,
  public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, PositionStrategy positionStrategy,
      JEReplicaDB replicaDB) throws ChangelogException
  {
    this.db = db;
    this.positionStrategy = positionStrategy;
    this.replicaDB = replicaDB;
    this.lastNonNullCurrentCSN = startAfterCSN;
    this.lastNonNullCurrentCSN = startCSN;
  }
  /** {@inheritDoc} */
@@ -78,9 +84,6 @@
  @Override
  public boolean next() throws ChangelogException
  {
    final ReplServerDBCursor localCursor = cursor;
    currentChange = localCursor != null ? localCursor.next() : null;
    if (currentChange == null)
    {
      synchronized (this)
@@ -91,10 +94,18 @@
        // if following code is called while the cursor is closed.
        // It is better to let the deadlock happen to help quickly identifying
        // and fixing such issue with unit tests.
        cursor = db.openReadCursor(lastNonNullCurrentCSN);
        currentChange = cursor.next();
        cursor = db.openReadCursor(lastNonNullCurrentCSN, positionStrategy);
      }
    }
    // For ON_MATCHING_KEY, do not call next() if the cursor has just been initialized.
    if (positionStrategy == ON_MATCHING_KEY && currentChange != null
        || positionStrategy == AFTER_MATCHING_KEY)
    {
      cursor.next();
    }
    currentChange = cursor.getRecord();
    if (currentChange != null)
    {
      lastNonNullCurrentCSN = currentChange.getCSN();
@@ -110,7 +121,7 @@
    synchronized (this)
    {
      closeCursor();
      this.replicaDB = null;
      replicaDB = null;
    }
  }
@@ -120,11 +131,12 @@
    {
      cursor.close();
      cursor = null;
      currentChange = null;
    }
  }
  /**
   * Called by the Gc when the object is garbage collected. Release the internal
   * Called by the GC when the object is garbage collected. Release the internal
   * cursor in case the cursor was badly used and {@link #close()} was never
   * called.
   */
@@ -138,7 +150,9 @@
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " currentChange=" + currentChange
    return getClass().getSimpleName()
        + " positionStrategy=" + positionStrategy
        + " currentChange=" + currentChange
        + " replicaDB=" + replicaDB;
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.io.Closeable;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReadWriteLock;
@@ -39,6 +38,8 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -244,15 +245,18 @@
  private DatabaseEntry createReplicationKey(CSN csn)
  {
    DatabaseEntry key = new DatabaseEntry();
    try
    final DatabaseEntry key = new DatabaseEntry();
    if (csn != null)
    {
      key.setData(csn.toString().getBytes("UTF-8"));
    }
    catch (UnsupportedEncodingException e)
    {
      // Should never happens, UTF-8 is always supported
      // TODO : add better logging
      try
      {
        key.setData(csn.toString().getBytes("UTF-8"));
      }
      catch (UnsupportedEncodingException e)
      {
        // Should never happens, UTF-8 is always supported
        // TODO : add better logging
      }
    }
    return key;
  }
@@ -285,13 +289,15 @@
   * @param startCSN
   *          The CSN from which the cursor must start.If null, start from the
   *          oldest CSN
   * @param positionStrategy
   *          indicates at which exact position the cursor must start
   * @return The ReplServerDBCursor.
   * @throws ChangelogException
   *           If a database problem happened
   */
  ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
  ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
  {
    return new ReplServerDBCursor(startCSN);
    return new ReplServerDBCursor(startCSN, positionStrategy);
  }
  /**
@@ -445,7 +451,7 @@
   * This Class implements a cursor that can be used to browse a
   * replicationServer database.
   */
  class ReplServerDBCursor implements Closeable
  class ReplServerDBCursor implements DBCursor<UpdateMsg>
  {
    /**
     * The transaction that will protect the actions done with the cursor.
@@ -454,12 +460,14 @@
     * <p>
     * Will be set non null for a write cursor
     */
    private final Transaction txn;
    private final Cursor cursor;
    private final DatabaseEntry key;
    private final DatabaseEntry data;
    /** \@Null for read cursors, \@NotNull for deleting cursors. */
    private final Transaction txn;
    private UpdateMsg currentRecord;
    private boolean isClosed = false;
    private boolean isClosed;
    /**
     * Creates a ReplServerDBCursor that can be used for browsing a
@@ -467,21 +475,15 @@
     *
     * @param startCSN
     *          The CSN from which the cursor must start.
     * @param positionStrategy
     *          indicates at which exact position the cursor must start
     * @throws ChangelogException
     *           When the startCSN does not exist.
     */
    private ReplServerDBCursor(CSN startCSN) throws ChangelogException
    private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
    {
      if (startCSN != null)
      {
        key = createReplicationKey(startCSN);
      }
      else
      {
        key = new DatabaseEntry();
      }
      key = createReplicationKey(startCSN);
      data = new DatabaseEntry();
      txn = null;
      // Take the lock. From now on, whatever error that happen in the life
@@ -515,18 +517,25 @@
            return;
          }
          // We can move close to the startCSN.
          // Let's create a cursor from that point.
          DatabaseEntry aKey = new DatabaseEntry();
          DatabaseEntry aData = new DatabaseEntry();
          if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS)
          if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
          {
            localCursor.close();
            localCursor = db.openCursor(txn, null);
            // We can move close to the startCSN.
            // Let's create a cursor from that point.
            key.setData(null);
            if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
            {
              localCursor.close();
              localCursor = db.openCursor(txn, null);
            }
          }
        }
        cursor = localCursor;
        cursorHeld = cursor != null;
        if (key.getData() != null)
        {
          computeCurrentRecord();
        }
      }
      catch (DatabaseException e)
      {
@@ -604,6 +613,7 @@
          return;
        }
        isClosed = true;
        currentRecord = null;
      }
      closeAndReleaseReadLock(cursor);
@@ -658,6 +668,7 @@
        return null;
      }
      currentRecord = null;
      try
      {
        if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
@@ -672,60 +683,73 @@
      }
    }
    /**
     * Get the next UpdateMsg from this cursor.
     *
     * @return the next UpdateMsg.
     */
    UpdateMsg next()
    /** {@inheritDoc} */
    @Override
    public boolean next() throws ChangelogException
    {
      if (isClosed)
      {
        return null;
        return false;
      }
      UpdateMsg currentChange = null;
      while (currentChange == null)
      currentRecord = null;
      while (currentRecord == null)
      {
        try
        {
          if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
          {
            return null;
            return false;
          }
        }
        catch (DatabaseException e)
        {
          return null;
          throw new ChangelogException(e);
        }
        CSN csn = null;
        try
        {
          csn = toCSN(key.getData());
          if (isACounterRecord(csn))
          {
            continue;
          }
          currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
              data.getData(), ProtocolVersion.getCurrentVersion());
        }
        catch (Exception e)
        {
          /*
           * An error happening trying to convert the data from the
           * replicationServer database to an Update LocalizableMessage. This can only
           * happen if the database is corrupted. There is not much more that we
           * can do at this point except trying to continue with the next
           * record. In such case, it is therefore possible that we miss some
           * changes.
           * TODO : This should be handled by the repair functionality.
           */
          logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
                  csn, e.getMessage());
        }
        computeCurrentRecord();
      }
      return currentChange;
      return currentRecord != null;
    }
    private void computeCurrentRecord()
    {
      CSN csn = null;
      try
      {
        csn = toCSN(key.getData());
        if (isACounterRecord(csn))
        {
          return;
        }
        currentRecord = toRecord(data.getData());
      }
      catch (Exception e)
      {
        /*
         * An error happening trying to convert the data from the
         * replicationServer database to an Update Message. This can only
         * happen if the database is corrupted. There is not much more that we
         * can do at this point except trying to continue with the next
         * record. In such case, it is therefore possible that we miss some
         * changes.
         * TODO : This should be handled by the repair functionality.
         */
        logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
            csn, e.getMessage());
      }
    }
    private UpdateMsg toRecord(final byte[] data) throws Exception
    {
      final short currentVersion = ProtocolVersion.getCurrentVersion();
      return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion);
    }
    /** {@inheritDoc} */
    @Override
    public UpdateMsg getRecord()
    {
      return currentRecord;
    }
    /**
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -28,7 +28,10 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import org.assertj.core.api.SoftAssertions;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.forgerock.opendj.config.server.ConfigException;
@@ -40,13 +43,16 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
@@ -81,11 +87,12 @@
  void testTrim() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      final JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, 0, 5);
@@ -97,7 +104,7 @@
      //--
      // Iterator tests with changes persisted
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4]);
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
      assertEquals(replicaDB.getOldestCSN(), csns[0]);
      assertEquals(replicaDB.getNewestCSN(), csns[2]);
@@ -110,7 +117,7 @@
      // Test cursor from existing CSN
      assertFoundInOrder(replicaDB, csns[2], csns[3]);
      assertFoundInOrder(replicaDB, csns[3]);
      assertNotFound(replicaDB, csns[4]);
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
@@ -133,6 +140,7 @@
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
@@ -182,38 +190,34 @@
      return;
    }
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]);
    try
    {
      assertNull(cursor.getRecord());
      for (int i = 1; i < csns.length; i++)
      {
        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
        assertTrue(cursor.next(), msg);
        assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
      }
      assertFalse(cursor.next());
      assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
          + ", Expected null");
    }
    finally
    {
      StaticUtils.close(cursor);
    }
    assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
    assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
  }
  private void assertNotFound(JEReplicaDB replicaDB, CSN csn) throws Exception
  private void assertFoundInOrder(JEReplicaDB replicaDB,
      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = null;
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
    try
    {
      cursor = replicaDB.generateCursorFrom(csn);
      assertFalse(cursor.next());
      assertNull(cursor.getRecord());
      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
      for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
      {
        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
        final SoftAssertions softly = new SoftAssertions();
        softly.assertThat(cursor.next()).as(msg).isTrue();
        softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
        softly.assertAll();
      }
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isFalse();
      softly.assertThat(cursor.getRecord()).isNull();
      softly.assertAll();
    }
    finally
    {
      StaticUtils.close(cursor);
      close(cursor);
    }
  }
@@ -226,11 +230,12 @@
  void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      JEReplicaDB replicaDB = newReplicaDB(replicationServer);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, 0, 3);
@@ -250,6 +255,7 @@
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
@@ -258,7 +264,6 @@
  public void testGenerateCursorFrom() throws Exception
  {
    ReplicationServer replicationServer = null;
    DBCursor<UpdateMsg> cursor = null;
    JEReplicaDB replicaDB = null;
    try
    {
@@ -266,38 +271,69 @@
      replicationServer = configureReplicationServer(100000, 10);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6);
      for (int i = 0; i < 5; i++)
      final CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5);
      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
      csns2.remove(csns[3]);
      for (CSN csn : csns2)
      {
        if (i != 3)
        {
          replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        }
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
      }
      cursor = replicaDB.generateCursorFrom(csns[0]);
      assertTrue(cursor.next());
      assertEquals(cursor.getRecord().getCSN(), csns[1]);
      StaticUtils.close(cursor);
      for (CSN csn : csns2)
      {
        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
      }
      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
      cursor = replicaDB.generateCursorFrom(csns[3]);
      assertTrue(cursor.next());
      assertEquals(cursor.getRecord().getCSN(), csns[4]);
      StaticUtils.close(cursor);
      cursor = replicaDB.generateCursorFrom(csns[4]);
      assertFalse(cursor.next());
      assertNull(cursor.getRecord());
      for (int i = 0; i < csns2.size() - 1; i++)
      {
        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
      }
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
    }
    finally
    {
      StaticUtils.close(cursor);
      if (replicaDB != null)
        replicaDB.shutdown();
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN,
      final PositionStrategy positionStrategy, final CSN expectedCSN)
      throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
    try
    {
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isTrue();
      softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN);
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
  private void assertNotFound(JEReplicaDB replicaDB, final CSN startCSN,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
    try
    {
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isFalse();
      softly.assertThat(cursor.getRecord()).isNull();
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
  /**
   * Test the logic that manages counter records in the JEReplicaDB in order to
   * optimize the oldest and newest records in the replication changelog db.
@@ -395,13 +431,22 @@
    }
    finally
    {
      if (replicaDB != null)
        replicaDB.shutdown();
      shutdown(replicaDB);
      if (dbEnv != null)
      {
        dbEnv.shutdown();
      }
      remove(replicationServer);
      TestCaseUtils.deleteDirectory(testRoot);
    }
  }
  private void shutdown(JEReplicaDB replicaDB)
  {
    if (replicaDB != null)
    {
      replicaDB.shutdown();
    }
  }
}