mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
10.32.2013 b4a1565a2ab3cd0192a1b17c026f16e151fd04ca
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -40,7 +40,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.CNIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
@@ -59,7 +62,11 @@
{
  private static int UNDEFINED_PHASE = 0;
  /** TODO JNR. */
  /**
   * Constant used to indicate the handler is in the ECL initialization phase.
   *
   * @see #getSearchPhase()
   */
  public static int INIT_PHASE = 1;
  private static int PERSISTENT_PHASE = 2;
@@ -70,7 +77,7 @@
  private String operationId;
  /** Cursor on the {@link ChangeNumberIndexDB}. */
  private ChangeNumberIndexDBCursor cnIndexDBCursor;
  private DBCursor<CNIndexRecord> cnIndexDBCursor;
  private boolean draftCompat = false;
  /**
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -41,7 +41,7 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -296,14 +296,14 @@
           *           restart as usual
           *   load this change on the delayList
           */
          ReplicaDBCursor cursor = null;
          DBCursor<UpdateMsg> cursor = null;
          try
          {
            // fill the lateQueue
            cursor = replicationServerDomain.getCursorFrom(serverState);
            while (cursor.next() && isLateQueueBelowThreshold())
            {
              lateQueue.add(cursor.getChange());
              lateQueue.add(cursor.getRecord());
            }
          }
          catch (ChangelogException e)
@@ -454,12 +454,12 @@
  private CSN findOldestCSNFromReplicaDBs()
  {
    ReplicaDBCursor cursor = null;
    DBCursor<UpdateMsg> cursor = null;
    try
    {
      cursor = replicationServerDomain.getCursorFrom(serverState);
      cursor.next();
      return cursor.getChange().getCSN();
      return cursor.getRecord().getCSN();
    }
    catch (Exception e)
    {
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -51,7 +51,7 @@
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.*;
import org.opends.server.util.*;
@@ -630,14 +630,14 @@
      return;
    }
    ReplicaDBCursor cursor = rsDomain.getCursorFrom(previousCSN);
    DBCursor<UpdateMsg> cursor = rsDomain.getCursorFrom(previousCSN);
    try
    {
      int lookthroughCount = 0;
      // Walk through the changes
      cursor.next(); // first try to advance the cursor
      while (cursor.getChange() != null)
      while (cursor.getRecord() != null)
      {
        if (exportConfig != null && exportConfig.isCancelled())
        { // abort if cancelled
@@ -648,7 +648,7 @@
          break;
        }
        lookthroughCount++;
        writeChange(cursor.getChange(), ldifWriter, searchOperation,
        writeChange(cursor.getRecord(), ldifWriter, searchOperation,
            rsDomain.getBaseDN(), exportConfig != null);
        cursor.next();
      }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,7 +47,7 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.*;
@@ -1271,19 +1271,19 @@
  /**
   * Creates and returns a cursor across this replication domain.
   * <p>
   * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor
   * to the next available record.
   * Client code must call {@link DBCursor#next()} to advance the cursor to the
   * next available record.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
   * by the cursor.
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
   *
   * @param startAfterCSN
   *          Starting point for the cursor. If null, start from the oldest CSN
   * @return a non null {@link ReplicaDBCursor}
   * @return a non null {@link DBCursor}
   * @see ReplicationDomainDB#getCursorFrom(DN, CSN)
   */
  public ReplicaDBCursor getCursorFrom(CSN startAfterCSN)
  public DBCursor<UpdateMsg> getCursorFrom(CSN startAfterCSN)
  {
    return domainDB.getCursorFrom(baseDN, startAfterCSN);
  }
@@ -1291,20 +1291,20 @@
  /**
   * Creates and returns a cursor across this replication domain.
   * <p>
   * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor
   * to the next available record.
   * Client code must call {@link DBCursor#next()} to advance the cursor to the
   * next available record.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
   * by the cursor.
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
   *
   * @param startAfterServerState
   *          Starting point for the replicaDB cursors. If null, start from the
   *          oldest CSN
   * @return a non null {@link ReplicaDBCursor} going from oldest to newest CSN
   * @return a non null {@link DBCursor} going from oldest to newest CSN
   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState)
   */
  public ReplicaDBCursor getCursorFrom(ServerState startAfterServerState)
  public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState)
  {
    return domainDB.getCursorFrom(baseDN, startAfterServerState);
  }
opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -99,9 +99,8 @@
  long addRecord(CNIndexRecord record) throws ChangelogException;
  /**
   * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the
   * db managed by this object and starting at the position defined by a given
   * changeNumber.
   * Generate a new {@link DBCursor} that allows to browse the db managed by
   * this object and starting at the position defined by a given changeNumber.
   *
   * @param startChangeNumber
   *          The position where the iterator must start.
@@ -111,7 +110,7 @@
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber)
  DBCursor<CNIndexRecord> getCursorFrom(long startChangeNumber)
      throws ChangelogException;
}
opends/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDBCursor.java
@@ -29,19 +29,22 @@
import java.io.Closeable;
/**
 * Iterator into the changelog database. Once it is not used anymore, a
 * ChangelogDBIterator must be closed to release all the resources into the
 * Generic cursor interface into the changelog database. Once it is not used
 * anymore, a cursor must be closed to release all the resources into the
 * database.
 *
 * @param <T>
 *          type of the record being returned
 */
public interface ChangeNumberIndexDBCursor extends Closeable
public interface DBCursor<T> extends Closeable
{
  /**
   * Getter for the record.
   * Getter for the current record.
   *
   * @return The current {@link CNIndexRecord}.
   * @return The current record.
   */
  CNIndexRecord getRecord();
  T getRecord();
  /**
   * Skip to the next record of the database.
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java
File was deleted
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -162,36 +162,35 @@
  long getCount(DN baseDN, int serverId, CSN from, CSN to);
  /**
   * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the
   * specified replication domain, with all cursors starting after the provided
   * CSN.
   * Generates a {@link DBCursor} across all the replicaDBs for the specified
   * replication domain, with all cursors starting after the provided CSN.
   * <p>
   * The cursor is already advanced to the record after startAfterCSN.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
   * by the cursor.
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param startAfterCSN
   *          Starting point for each ReplicaDB cursor. If null, start from the
   *          oldest CSN for each ReplicaDB cursor.
   * @return a non null {@link ReplicaDBCursor}
   * @return a non null {@link DBCursor}
   * @see #getCursorFrom(DN, ServerState)
   */
  ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN);
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, CSN startAfterCSN);
  /**
   * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the
   * specified replication domain starting after the provided
   * {@link ServerState} for each replicaDBs.
   * Generates a {@link DBCursor} across all the replicaDBs for the specified
   * replication domain starting after the provided {@link ServerState} for each
   * replicaDBs.
   * <p>
   * The cursor is already advanced to the records after the serverState.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link ReplicaDBCursor#close()} method to free the resources and locks used
   * by the cursor.
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
   *
   * @param baseDN
   *          the replication domain baseDN
@@ -199,10 +198,11 @@
   *          Starting point for each ReplicaDB cursor. If any CSN for a
   *          replicaDB is null, then start from the oldest CSN for this
   *          replicaDB
   * @return a non null {@link ReplicaDBCursor}
   * @return a non null {@link DBCursor}
   * @see #getCursorFrom(DN, CSN)
   */
  ReplicaDBCursor getCursorFrom(DN baseDN, ServerState startAfterServerState);
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
      ServerState startAfterServerState);
  /**
   * for the specified serverId and replication domain.
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -59,8 +59,8 @@
 * This class is used for managing the replicationServer database for each
 * server in the topology. It is responsible for efficiently saving the updates
 * that is received from each master server into stable storage. This class is
 * also able to generate a {@link ChangeNumberIndexDBCursor} that can be used to
 * read all changes from a given change number.
 * also able to generate a {@link DBCursor} that can be used to read all changes
 * from a given change number.
 * <p>
 * This class publishes some monitoring information below <code>
 * cn=monitor</code>.
@@ -240,7 +240,7 @@
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber)
  public DBCursor<CNIndexRecord> getCursorFrom(long startChangeNumber)
      throws ChangelogException
  {
    return new JEChangeNumberIndexDBCursor(db, startChangeNumber);
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java
@@ -39,7 +39,7 @@
 * This class allows to iterate through the changes received from a given
 * LDAP Server Identifier.
 */
public class JEChangeNumberIndexDBCursor implements ChangeNumberIndexDBCursor
public class JEChangeNumberIndexDBCursor implements DBCursor<CNIndexRecord>
{
  private static final DebugTracer TRACER = getTracer();
  private DraftCNDBCursor draftCNDbCursor;
@@ -116,4 +116,5 @@
  {
    close();
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -54,21 +54,31 @@
{
  /**
   * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a
   * replication domain, advancing from the oldest to the newest change cross
   * {@link DBCursor} implementation that iterates across all the ReplicaDBs of
   * a replication domain, advancing from the oldest to the newest change cross
   * all replicaDBs.
   */
  private final class CrossReplicaDBCursor implements ReplicaDBCursor
  private final class CrossReplicaDBCursor implements DBCursor<UpdateMsg>
  {
    private final DN baseDN;
    private UpdateMsg currentChange;
    /**
     * The cursors are sorted based on the current change of each cursor to
     * consider the next change across all replicaDBs.
     */
    private final NavigableSet<ReplicaDBCursor> cursors =
        new TreeSet<ReplicaDBCursor>();
    private final DN baseDN;
    private final NavigableSet<DBCursor<UpdateMsg>> cursors =
        new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
        {
          @Override
          public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
          {
            final CSN csn1 = o1.getRecord().getCSN();
            final CSN csn2 = o2.getRecord().getCSN();
            return CSN.compare(csn1, csn2);
          }
        });
    public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState)
    {
@@ -81,7 +91,7 @@
      }
    }
    private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
    private DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
        CSN startAfterCSN)
    {
      JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
@@ -89,7 +99,8 @@
      {
        try
        {
          ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN);
          DBCursor<UpdateMsg> cursor =
              replicaDB.generateCursorFrom(startAfterCSN);
          cursor.next();
          return cursor;
        }
@@ -112,16 +123,16 @@
      // To keep consistent the cursors' order in the SortedSet, it is necessary
      // to remove and eventually add again a cursor (after moving it forward).
      final ReplicaDBCursor cursor = cursors.pollFirst();
      currentChange = cursor.getChange();
      final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
      currentChange = cursor.getRecord();
      cursor.next();
      addCursorIfNotEmpty(cursor);
      return true;
    }
    void addCursorIfNotEmpty(ReplicaDBCursor cursor)
    void addCursorIfNotEmpty(DBCursor<UpdateMsg> cursor)
    {
      if (cursor.getChange() != null)
      if (cursor.getRecord() != null)
      {
        cursors.add(cursor);
      }
@@ -132,7 +143,7 @@
    }
    @Override
    public UpdateMsg getChange()
    public UpdateMsg getRecord()
    {
      return currentChange;
    }
@@ -143,15 +154,6 @@
      StaticUtils.close(cursors);
    }
    @Override
    public int compareTo(ReplicaDBCursor o)
    {
      final CSN csn1 = getChange().getCSN();
      final CSN csn2 = o.getChange().getCSN();
      return CSN.compare(csn1, csn2);
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
@@ -184,27 +186,18 @@
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private static final ReplicaDBCursor EMPTY_CURSOR = new ReplicaDBCursor()
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
      new DBCursor<UpdateMsg>()
  {
    @Override
    public int compareTo(ReplicaDBCursor o)
    {
      if (o == null)
      {
        throw new NullPointerException(); // as per javadoc
      }
      return o == this ? 0 : -1; // equal to self, but less than all the rest
    }
    @Override
    public boolean next()
    {
      return false;
    }
    @Override
    public UpdateMsg getChange()
    public UpdateMsg getRecord()
    {
      return null;
    }
@@ -218,7 +211,7 @@
    @Override
    public String toString()
    {
      return "EmptyReplicaDBCursor";
      return "EmptyDBCursor<UpdateMsg>";
    }
  };
@@ -670,7 +663,7 @@
  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN)
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, CSN startAfterCSN)
  {
    // Builds a new serverState for all the serverIds in the replication domain
    // to ensure we get cursors starting after the provided CSN.
@@ -679,7 +672,7 @@
  /** {@inheritDoc} */
  @Override
  public ReplicaDBCursor getCursorFrom(DN baseDN,
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
      ServerState startAfterServerState)
  {
    return new CrossReplicaDBCursor(baseDN, startAfterServerState);
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -43,7 +43,7 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
@@ -58,11 +58,13 @@
/**
 * This class is used for managing the replicationServer database for each
 * server in the topology.
 * <p>
 * It is responsible for efficiently saving the updates that is received from
 * each master server into stable storage.
 * This class is also able to generate a {@link ReplicaDBCursor} that can be
 * used to read all changes from a given {@link CSN}.
 *
 * <p>
 * This class is also able to generate a {@link DBCursor} that can be used to
 * read all changes from a given {@link CSN}.
 * <p>
 * This class publish some monitoring information below cn=monitor.
 */
public class JEReplicaDB implements Runnable
@@ -259,19 +261,18 @@
  }
  /**
   * Generate a new {@link ReplicaDBCursor} that allows to browse the db managed
   * by this ReplicaDB and starting at the position defined by a given CSN.
   * Generate a new {@link DBCursor} that allows to browse the db managed by
   * this ReplicaDB and starting at the position defined by a given CSN.
   *
   * @param startAfterCSN
   *          The position where the cursor must start. If null, start from the
   *          oldest CSN
   * @return a new {@link ReplicaDBCursor} that allows to browse the db managed
   *         by this ReplicaDB and starting at the position defined by a given
   *         CSN.
   * @return a new {@link DBCursor} that allows to browse the db managed by this
   *         ReplicaDB and starting at the position defined by a given CSN.
   * @throws ChangelogException
   *           if a database problem happened.
   */
  public ReplicaDBCursor generateCursorFrom(CSN startAfterCSN)
  public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
      throws ChangelogException
  {
    if (startAfterCSN == null)
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -30,13 +30,13 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
/**
 * Berkeley DB JE implementation of {@link ReplicaDBCursor}.
 * Berkeley DB JE implementation of {@link DBCursor}.
 */
public class JEReplicaDBCursor implements ReplicaDBCursor
public class JEReplicaDBCursor implements DBCursor<UpdateMsg>
{
  private UpdateMsg currentChange;
  private ReplServerDBCursor cursor;
@@ -87,7 +87,7 @@
  /** {@inheritDoc} */
  @Override
  public UpdateMsg getChange()
  public UpdateMsg getRecord()
  {
    return currentChange;
  }
@@ -152,16 +152,6 @@
  /** {@inheritDoc} */
  @Override
  public int compareTo(ReplicaDBCursor o)
  {
    final CSN csn1 = getChange().getCSN();
    final CSN csn2 = o.getChange().getCSN();
    return CSN.compare(csn1, csn2);
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " currentChange=" + currentChange + ""
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -36,8 +36,8 @@
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.CNIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.je.DraftCNDB.DraftCNDBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -210,7 +210,7 @@
      assertEquals(getPreviousCookie(cnIndexDB, cn2), value2);
      assertEquals(getPreviousCookie(cnIndexDB, cn3), value3);
      ChangeNumberIndexDBCursor cursor = cnIndexDB.getCursorFrom(cn1);
      DBCursor<CNIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
      assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
      cursor = cnIndexDB.getCursorFrom(cn2);
@@ -244,7 +244,7 @@
  private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,
      long changeNumber) throws Exception
  {
    ChangeNumberIndexDBCursor cursor = cnIndexDB.getCursorFrom(changeNumber);
    DBCursor<CNIndexRecord> cursor = cnIndexDB.getCursorFrom(changeNumber);
    try
    {
      return cursor.getRecord().getPreviousCookie();
@@ -255,7 +255,7 @@
    }
  }
  private void assertCursorReadsInOrder(ChangeNumberIndexDBCursor cursor,
  private void assertCursorReadsInOrder(DBCursor<CNIndexRecord> cursor,
      long... sns) throws ChangelogException
  {
    try
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -38,10 +38,11 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeClass;
@@ -201,17 +202,17 @@
      return;
    }
    ReplicaDBCursor cursor = replicaDB.generateCursorFrom(csns[0]);
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]);
    try
    {
      assertNull(cursor.getChange());
      assertNull(cursor.getRecord());
      for (int i = 1; i < csns.length; i++)
      {
        assertTrue(cursor.next());
        assertEquals(cursor.getChange().getCSN(), csns[i]);
        assertEquals(cursor.getRecord().getCSN(), csns[i]);
      }
      assertFalse(cursor.next());
      assertNull(cursor.getChange(), "Actual change=" + cursor.getChange()
      assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
          + ", Expected null");
    }
    finally
@@ -222,7 +223,7 @@
  private void assertNotFound(JEReplicaDB replicaDB, CSN csn)
  {
    ReplicaDBCursor cursor = null;
    DBCursor<UpdateMsg> cursor = null;
    try
    {
      cursor = replicaDB.generateCursorFrom(csn);
@@ -282,7 +283,7 @@
  public void testGenerateCursorFrom() throws Exception
  {
    ReplicationServer replicationServer = null;
    ReplicaDBCursor cursor = null;
    DBCursor<UpdateMsg> cursor = null;
    try
    {
      TestCaseUtils.startServer();
@@ -301,17 +302,17 @@
      cursor = replicaDB.generateCursorFrom(csns[0]);
      assertTrue(cursor.next());
      assertEquals(cursor.getChange().getCSN(), csns[1]);
      assertEquals(cursor.getRecord().getCSN(), csns[1]);
      StaticUtils.close(cursor);
      cursor = replicaDB.generateCursorFrom(csns[3]);
      assertTrue(cursor.next());
      assertEquals(cursor.getChange().getCSN(), csns[4]);
      assertEquals(cursor.getRecord().getCSN(), csns[4]);
      StaticUtils.close(cursor);
      cursor = replicaDB.generateCursorFrom(csns[4]);
      assertFalse(cursor.next());
      assertNull(cursor.getChange());
      assertNull(cursor.getRecord());
    }
    finally
    {