mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
21.06.2014 ab3cac04319c920ba14be59ea874e6e35f730655
Checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend 
to support cn=changelog
CR-4053

Preparatory work for the changelog backend: add a new behavior to cursors, in order to be
able to start cursor at a given key instead of starting just after.
In order to avoid introducing another boolean in the methods, I created two enums to
define the behavior of a cursor.

Note that JE implementation does not yet implement the new behavior.

* DBCursor class : add two enums KeyMatchingStrategy and PositionStrategy
** The new behavior corresponds to PositionStrategy.ON_MATCHING_KEY, which allow
to position on the record with the given key (while AFTER_MATCHING_KEY position
just after the record with the given key).

* ReplicationDomainDB : add PositionStrategy argument for all methods that returns a cursor

* ReplicationServerDomain : getCursorFrom(DN, ServerState) method calls underlying method
with PositionStrategy.AFTER_MATCHING_KEY

* FileChangelogDB : add PositionStrategy argument for all methods that returns a cursor

* FileReplicaDB : add PositionStrategy argument to generateCursorFrom(CSN) method

* FileReplicaDBCursor : add PositionStrategy argument to the constructor, implement the
new behavior when position strategy is ON_MATCHING_KEY

* DomainDBCursor, MultiDomainDBCursor : add PositionStrategy argument to the constructor,
pass the strategy to underlying cursors

* Log, LogFile, BlockLogReader : implement the new behavior when position strategy
is ON_MATCHING_KEY

* ChangeNumberIndexer : use AFTER_MATCHING_KEY strategy when retrieving the
cursor (no behavior change)

* JEChangelogDB : add PositionStrategy argument for all methods that returns a cursor,
but getCursorFrom(DN, int, CSN, PositionStrategy) method does NOT implement the
new ON_MATCHING_KEY strategy.

* Update of tests classes to match method signature but no new tests added for the
new behavior (ON_MATCHING_KEY) - to be done later

18 files modified
403 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java 23 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 32 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java 64 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 32 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java 27 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java 63 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java 21 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java 13 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 33 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java 10 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java 16 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java 14 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java 13 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 14 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -56,6 +56,7 @@
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -1361,12 +1362,12 @@
   * @return a non null {@link DBCursor} going from oldest to newest CSN
   * @throws ChangelogException
   *           If a database problem happened
   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState)
   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, PositionStrategy)
   */
  public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState)
      throws ChangelogException
  {
    return domainDB.getCursorFrom(baseDN, startAfterServerState);
    return domainDB.getCursorFrom(baseDN, startAfterServerState, AFTER_MATCHING_KEY);
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -60,6 +60,29 @@
{
  /**
   * Represents a cursor key matching strategy, which allow to choose if only
   * the exact key must be found or if any key equals or higher should match.
   */
  public enum KeyMatchingStrategy {
    /** matches only if the exact key is found. */
    EQUAL_TO_KEY,
    /** matches if the key or a greater key is found. */
    GREATER_THAN_OR_EQUAL_TO_KEY
  }
  /**
   * Represents a cursor positioning strategy, which allow to choose if the start point
   * corresponds to the record at the provided key or the record just after the provided
   * key.
   */
  public enum PositionStrategy {
    /** start point is on the matching key. */
    ON_MATCHING_KEY,
    /** start point is after the matching key. */
    AFTER_MATCHING_KEY
  }
  /**
   * Getter for the current record.
   *
   * @return The current record.
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -29,6 +29,7 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.DN;
@@ -92,30 +93,33 @@
  void removeDomain(DN baseDN) throws ChangelogException;
  /**
   * Generates a {@link DBCursor} across all the domains starting after the
   * Generates a {@link DBCursor} across all the domains starting at or after the
   * provided {@link MultiDomainServerState} for each domain.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
   *
   * @param startAfterState
   * @param startState
   *          Starting point for each domain cursor. If any {@link ServerState}
   *          for a domain is null, then start from the oldest CSN for each
   *          replicaDBs
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, ServerState)
   * @see #getCursorFrom(DN, ServerState, PositionStrategy)
   */
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState)
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
      throws ChangelogException;
  // serverId methods
  /**
   * Generates a {@link DBCursor} across all the replicaDBs for the specified
   * replication domain starting after the provided {@link ServerState} for each
   * replication domain starting at or after the provided {@link ServerState} for each
   * replicaDBs.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
@@ -124,21 +128,24 @@
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param startAfterState
   * @param startState
   *          Starting point for each ReplicaDB cursor. If any CSN for a
   *          replicaDB is null, then start from the oldest CSN for this
   *          replicaDB
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, int, CSN)
   * @see #getCursorFrom(DN, int, CSN, PositionStrategy)
   */
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState)
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy)
      throws ChangelogException;
  /**
   * Generates a {@link DBCursor} for one replicaDB for the specified
   * replication domain and serverId starting after the provided {@link CSN}.
   * replication domain and serverId starting at or after the provided {@link CSN}.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
@@ -148,14 +155,17 @@
   *          the replication domain baseDN of the replicaDB
   * @param serverId
   *          the serverId of the replicaDB
   * @param startAfterCSN
   * @param startCSN
   *          Starting point for the ReplicaDB cursor. If the CSN is null, then
   *          start from the oldest CSN for this replicaDB
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   */
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN)
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, PositionStrategy positionStrategy)
      throws ChangelogException;
  /**
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -26,6 +26,8 @@
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import java.io.Closeable;
import java.io.EOFException;
@@ -33,7 +35,10 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import org.forgerock.util.Reject;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.util.StaticUtils;
@@ -135,31 +140,35 @@
  }
  /**
   * Position the reader to the record corresponding to the provided key or to
   * the nearest key (the lowest key higher than the provided key), and returns
   * the last record read.
   * Position the reader to the record corresponding to the provided key and
   * matching and positioning strategies. Returns the last record read.
   *
   * @param key
   *          Key to use as a start position. Key must not be {@code null}.
   * @param findNextRecord
   *          If {@code true}, start position is the lowest key that is higher
   *          than the provided key, otherwise start position is the provided
   *          key.
   * @param matchStrategy
   *          The key matching strategy.
   * @param positionStrategy
   *          The positioning strategy.
   * @return The pair (key_found, last_record_read). key_found is a boolean
   *         indicating if reader is successfully positioned to the key or the
   *         nearest key. last_record_read is the last record that was read.
   *         When key_found is equals to {@code false}, then last_record_read is
   *         always {@code null}. When key_found is equals to {@code true},
   *         last_record_read can be valued or be {@code null}
   *         indicating if reader is successfully positioned. last_record_read
   *         is the last record that was read. When key_found is equals to
   *         {@code false}, then last_record_read is always {@code null}. When
   *         key_found is equals to {@code true}, last_record_read can be valued
   *         or be {@code null}
   * @throws ChangelogException
   *           If an error occurs when seeking the key.
   */
  public Pair<Boolean, Record<K,V>> seekToRecord(final K key, final boolean findNextRecord) throws ChangelogException
  public Pair<Boolean, Record<K,V>> seekToRecord(
      final K key,
      final KeyMatchingStrategy matchStrategy,
      final PositionStrategy positionStrategy)
          throws ChangelogException
  {
    Reject.checkNotNull(key);
    final long markerPosition = searchClosestBlockStartToKey(key);
    if (markerPosition >= 0)
    {
      return positionToKeySequentially(markerPosition, key, findNextRecord);
      return positionToKeySequentially(markerPosition, key, matchStrategy, positionStrategy);
    }
    return Pair.of(false, null);
  }
@@ -440,39 +449,42 @@
  /**
   * Position to provided key, starting from provided block start position and
   * reading sequentially until key is found.
   * reading sequentially until key is found according to matching and
   * positioning strategies.
   *
   * @param blockStartPosition
   *          Position of read pointer in the file, expected to be the start of
   *          a block where a record offset is written.
   * @param key
   *          The key to find.
   * @param findNextRecord
   *          If {@code true}, position at the end of this method is the lowest
   *          key that is higher than the provided key, otherwise position is
   *          the provided key.
   * @param matchStrategy
   *          The key matching strategy.
   * @param positionStrategy
   *          The positioning strategy.
   * @return The pair ({@code true}, last record read) if reader is successfully
   *         positioned to the key or the nearest key (last record may be null
   *         if end of file is reached), ({@code false}, null) otherwise.
   *         positioned (last record may be null if end of file is reached), (
   *         {@code false}, null) otherwise.
   * @throws ChangelogException
   *            If an error occurs.
   *           If an error occurs.
   */
   Pair<Boolean, Record<K,V>> positionToKeySequentially(
       final long blockStartPosition,
       final K key,
       final boolean findNextRecord)
       final KeyMatchingStrategy matchStrategy,
       final PositionStrategy positionStrategy)
       throws ChangelogException {
    Record<K,V> record = readRecord(blockStartPosition);
    do {
      if (record != null)
      {
        final int keysComparison = record.getKey().compareTo(key);
        final boolean matches = findNextRecord ? keysComparison >= 0 : record.getKey().equals(key);
        final boolean matches = (matchStrategy == EQUAL_TO_KEY && keysComparison == 0)
            || (matchStrategy == GREATER_THAN_OR_EQUAL_TO_KEY && keysComparison >= 0);
        if (matches)
        {
          if (findNextRecord && keysComparison == 0)
          if (positionStrategy == AFTER_MATCHING_KEY && keysComparison == 0)
          {
            // skip key in order to position on lowest higher key
            // skip matching key
            record = readRecord();
          }
          return Pair.of(true, record);
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -46,6 +46,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.je.DomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
@@ -60,6 +61,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -122,7 +124,7 @@
  private final AtomicBoolean shutdown = new AtomicBoolean();
  static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null);
      new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY);
  /**
   * Creates a new changelog DB.
@@ -658,37 +660,38 @@
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
      cursor.addDomain(baseDN, startState.getServerState(baseDN));
    }
    return cursor;
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
      throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
    final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
      cursor.addReplicaDB(serverId, lastCSN);
    }
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN)
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
@@ -715,15 +718,14 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
      throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      PositionStrategy positionStrategy) throws ChangelogException
  {
    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor =
          replicaDB.generateCursorFrom(startAfterCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
@@ -203,21 +204,25 @@
  }
  /**
   * Returns a cursor that allows to retrieve the update messages from this DB,
   * starting at the position defined by the smallest CSN that is strictly
   * higher than the provided CSN.
   * Returns a cursor that allows to retrieve the update messages from this DB.
   * The starting position is defined by the provided CSN and cursor
   * positioning strategy.
   *
   * @param startAfterCSN
   * @param startCSN
   *          The position where the cursor must start. If null, start from the
   *          oldest CSN
   * @param positionStrategy
   *          Cursor position strategy, which allow to choose if cursor must
   *          start from the provided CSN or just after the provided CSN.
   * @return a new {@link DBCursor} to retreive update messages.
   * @throws ChangelogException
   *           if a database problem happened
   */
  DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException
  DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy)
      throws ChangelogException
  {
    RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN);
    return new FileReplicaDBCursor(cursor, startAfterCSN);
    RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startCSN, positionStrategy);
    return new FileReplicaDBCursor(cursor, startCSN, positionStrategy);
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -25,6 +25,9 @@
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -58,7 +61,6 @@
 */
class FileReplicaDBCursor implements DBCursor<UpdateMsg>
{
  /** The underlying cursor. */
  private final RepositionableCursor<CSN, UpdateMsg> cursor;
@@ -68,18 +70,27 @@
  /**  The CSN to re-start with in case the cursor is exhausted. */
  private CSN lastNonNullCurrentCSN;
  private PositionStrategy positionStrategy;
  /**
   * Creates the cursor from provided log cursor and start CSN.
   *
   * @param cursor
   *          The underlying log cursor to read log.
   * @param startAfterCSN
   * @param startCSN
   *          The CSN to use as a start point (excluded from cursor, the lowest
   *          CSN higher than this CSN is used as the real start point).
   * @param positionStrategy
   *          Cursor position strategy, which allow to choose if cursor must
   *          start from the provided CSN or just after the provided CSN.
   */
  FileReplicaDBCursor(RepositionableCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) {
  FileReplicaDBCursor(
      final RepositionableCursor<CSN, UpdateMsg> cursor,
      final CSN startCSN,
      final PositionStrategy positionStrategy) {
    this.cursor = cursor;
    this.lastNonNullCurrentCSN = startAfterCSN;
    this.lastNonNullCurrentCSN = startCSN;
    this.positionStrategy = positionStrategy;
  }
  /** {@inheritDoc} */
@@ -96,19 +107,23 @@
    if (cursor.next())
    {
      nextRecord = cursor.getRecord();
      if (nextRecord.getKey().compareTo(lastNonNullCurrentCSN) > 0)
      final int nextCSNCompare = nextRecord.getKey().compareTo(lastNonNullCurrentCSN);
      if (nextCSNCompare > 0 || (nextCSNCompare == 0 && positionStrategy == ON_MATCHING_KEY))
      {
        // start CSN is found, switch to position strategy that always find the next
        lastNonNullCurrentCSN = nextRecord.getKey();
        positionStrategy = AFTER_MATCHING_KEY;
        return true;
      }
    }
    // either cursor is exhausted or we still have not reached the start CSN
    return nextWhenCursorIsExhaustedOrNotCorrectlyPositionned();
  }
  /** Re-initialize the cursor after the last non null CSN. */
  private boolean nextWhenCursorIsExhaustedOrNotCorrectlyPositionned() throws ChangelogException
  {
    final boolean found = cursor.positionTo(lastNonNullCurrentCSN, true);
    final boolean found = cursor.positionTo(lastNonNullCurrentCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
    if (found && cursor.next())
    {
      nextRecord = cursor.getRecord();
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -26,6 +26,7 @@
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
@@ -52,6 +53,8 @@
import org.opends.server.loggers.ErrorLogger;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
import org.opends.server.util.StaticUtils;
@@ -439,7 +442,7 @@
        return new EmptyLogCursor<K, V>();
      }
      cursor = new LogCursor<K, V>(this);
      cursor.positionTo(null, false);
      cursor.positionTo(null, null, null);
      registerCursor(cursor);
      return cursor;
    }
@@ -467,28 +470,35 @@
   */
  public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException
  {
    return getCursor(key, false);
    return getCursor(key, KeyMatchingStrategy.EQUAL_TO_KEY, null);
  }
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the position defined by the smallest key that is higher than
   * the provided key.
   * Returns a cursor that allows to retrieve the records from this log.
   * The starting position is defined by the provided key and cursor
   * positioning strategy.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
   *          {@code null}, cursor will point at the first record of the log.
   * @param positionStrategy
   *          The cursor positioning strategy.
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public RepositionableCursor<K, V> getNearestCursor(final K key) throws ChangelogException
  public RepositionableCursor<K, V> getNearestCursor(final K key, PositionStrategy positionStrategy)
      throws ChangelogException
  {
    return getCursor(key, true);
    return getCursor(key, KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
  }
  /** Returns a cursor starting from a key, using the strategy corresponding to provided boolean. */
  private RepositionableCursor<K, V> getCursor(final K key, boolean findNearest) throws ChangelogException
  /**
   * Returns a cursor starting from a key, using the provided matching and
   * position strategies for the cursor.
   */
  private RepositionableCursor<K, V> getCursor(final K key, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    if (key == null)
    {
@@ -503,9 +513,9 @@
        return new EmptyLogCursor<K, V>();
      }
      cursor = new LogCursor<K, V>(this);
      final boolean isFound = cursor.positionTo(key, findNearest);
      // for nearest case, it is ok if the target is not found
      if (isFound || findNearest)
      final boolean isFound = cursor.positionTo(key, matchingStrategy, positionStrategy);
      // When not matching the exact key, it is ok if the target is not found
      if (isFound || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
      {
        registerCursor(cursor);
        return cursor;
@@ -936,22 +946,23 @@
  static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>>
  {
    /**
     * Position the cursor to the record corresponding to the provided key or to
     * the nearest key (the lowest key higher than the provided key).
     * Position the cursor to the record corresponding to the provided key and
     * provided matching and positioning strategies.
     *
     * @param key
     *          Key to use as a start position for the cursor. If key is
     *          {@code null}, use the key of the first record instead.
     * @param findNearest
     *          If {@code true}, start position is the lowest key that is higher
     *          than the provided key, otherwise start position is the provided
     *          key.
     * @return {@code true} if cursor is successfully positionned to the key or
     *         the nearest key, {@code false} otherwise.
     * @param matchStrategy
     *          The cursor key matching strategy.
     * @param positionStrategy
     *          The cursor positioning strategy.
     * @return {@code true} if cursor is successfully positioned, or
     *         {@code false} otherwise.
     * @throws ChangelogException
     *           If an error occurs when positioning cursor.
     */
    boolean positionTo(K key, boolean findNearest) throws ChangelogException;
    boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
        throws ChangelogException;
  }
  /**
@@ -1039,7 +1050,11 @@
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException
    public boolean positionTo(
        final K key,
        final KeyMatchingStrategy matchStrategy,
        final PositionStrategy positionStrategy)
            throws ChangelogException
    {
      if (actAsEmptyCursor)
      {
@@ -1053,7 +1068,7 @@
        {
          switchToLogFile(logFile);
        }
        return (key == null) ? true : currentCursor.positionTo(key, findNearest);
        return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
      }
      finally
      {
@@ -1128,7 +1143,7 @@
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
    public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
    {
      return false;
    }
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -28,6 +28,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import java.io.BufferedWriter;
import java.io.Closeable;
@@ -41,6 +42,8 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
@@ -335,7 +338,7 @@
   */
  LogFileCursor<K, V> getCursor(final K key) throws ChangelogException
  {
    return getCursor(key, false);
    return getCursor(key, KeyMatchingStrategy.EQUAL_TO_KEY, PositionStrategy.ON_MATCHING_KEY);
  }
  /**
@@ -352,12 +355,15 @@
   */
  LogFileCursor<K, V> getNearestCursor(final K key) throws ChangelogException
  {
    return getCursor(key, true);
    return getCursor(key, KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
  }
  /** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */
  private LogFileCursor<K, V> getCursor(final K key, boolean findNearest)
      throws ChangelogException
  private LogFileCursor<K, V> getCursor(
      final K key,
      final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy)
          throws ChangelogException
  {
    if (key == null)
    {
@@ -367,7 +373,7 @@
    try
    {
      cursor = new LogFileCursor<K, V>(this);
      cursor.positionTo(key, findNearest);
      cursor.positionTo(key, matchingStrategy, positionStrategy);
      // if target is not found, cursor is positioned at end of stream
      return cursor;
    }
@@ -628,8 +634,9 @@
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest);
    public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
        throws ChangelogException {
      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
      final boolean found = result.getFirst();
      initialRecord = found ? result.getSecond() : null;
      return found;
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -47,6 +47,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -353,7 +354,7 @@
      }
    }
    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY);
    nextChangeForInsertDBCursor.next();
    if (newestRecord != null)
opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -54,6 +54,8 @@
   */
  private static final CSN NULL_CSN = new CSN(0, 0, 0);
  private final PositionStrategy positionStrategy;
  /**
   * Builds a DomainDBCursor instance.
   *
@@ -61,11 +63,15 @@
   *          the replication domain baseDN of this cursor
   * @param domainDB
   *          the DB for the provided replication domain
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   */
  public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
  public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
  {
    this.baseDN = baseDN;
    this.domainDB = domainDB;
    this.positionStrategy = positionStrategy;
  }
  /**
@@ -102,8 +108,9 @@
      final Entry<Integer, CSN> pair = iter.next();
      final int serverId = pair.getKey();
      final CSN csn = pair.getValue();
      final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null;
      final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
      final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null;
      final DBCursor<UpdateMsg> cursor =
          domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy);
      addCursor(cursor, null);
      iter.remove();
    }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -33,6 +33,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.util.Reject;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
@@ -46,6 +47,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
@@ -56,6 +58,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -704,37 +707,38 @@
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
      cursor.addDomain(baseDN, startState.getServerState(baseDN));
    }
    return cursor;
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
      throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
    final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
      cursor.addReplicaDB(serverId, lastCSN);
    }
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN)
  private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
@@ -761,15 +765,18 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
      throws ChangelogException
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
      PositionStrategy positionStrategy) throws ChangelogException
  {
    Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
        + " is not supported for the JE implementation fo changelog");
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor =
          replicaDB.generateCursorFrom(startAfterCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
          replicaDB.generateCursorFrom(startCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);
opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -50,15 +50,21 @@
  private final ConcurrentSkipListSet<DN> removeDomains =
      new ConcurrentSkipListSet<DN>();
  private final PositionStrategy positionStrategy;
  /**
   * Builds a MultiDomainDBCursor instance.
   *
   * @param domainDB
   *          the replication domain management DB
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   */
  public MultiDomainDBCursor(ReplicationDomainDB domainDB)
  public MultiDomainDBCursor(ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
  {
    this.domainDB = domainDB;
    this.positionStrategy = positionStrategy;
  }
  /**
@@ -86,7 +92,7 @@
      final Entry<DN, ServerState> entry = iter.next();
      final DN baseDN = entry.getKey();
      final ServerState serverState = entry.getValue();
      final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState);
      final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, positionStrategy);
      addCursor(domainDBCursor, baseDN);
      iter.remove();
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
@@ -26,6 +26,8 @@
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
import java.io.File;
@@ -38,6 +40,8 @@
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
@@ -203,7 +207,11 @@
    try
    {
      reader = newReader(blockSize);
      Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, findNearest);
      KeyMatchingStrategy matchStrategy =
          findNearest ? KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY : KeyMatchingStrategy.EQUAL_TO_KEY;
      PositionStrategy posStrategy =
          findNearest ? PositionStrategy.AFTER_MATCHING_KEY : PositionStrategy.ON_MATCHING_KEY;
      Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, matchStrategy, posStrategy);
      assertThat(result.getFirst()).isEqualTo(expectedFound);
      assertThat(result.getSecond()).isEqualTo(expectedRecord);
@@ -331,7 +339,8 @@
      for (Integer key : keysToSeek)
      {
        final long ts = System.nanoTime();
        Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, false);
        Pair<Boolean, Record<Integer, Integer>> result =
            reader.seekToRecord(key, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
        final long te = System.nanoTime() - ts;
        if (te < minTime) minTime = te;
        if (te > maxTime) maxTime = te;
@@ -354,7 +363,8 @@
      for (Integer val : keysToSeek)
      {
        long ts = System.nanoTime();
        Pair<Boolean, Record<Integer, Integer>> result = reader.positionToKeySequentially(0, val, false);
        Pair<Boolean, Record<Integer, Integer>> result =
            reader.positionToKeySequentially(0, val, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
        assertThat(result.getSecond()).isEqualTo(Record.from(val, val));
        long te = System.nanoTime() - ts;
        if (te < minTime) minTime = te;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -52,6 +53,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
 * Test the FileReplicaDB class
@@ -193,17 +195,17 @@
      }
      waitChangesArePersisted(replicaDB, 4);
      cursor = replicaDB.generateCursorFrom(csns[0]);
      cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY);
      assertTrue(cursor.next());
      assertEquals(cursor.getRecord().getCSN(), csns[1]);
      StaticUtils.close(cursor);
      cursor = replicaDB.generateCursorFrom(csns[3]);
      cursor = replicaDB.generateCursorFrom(csns[3], AFTER_MATCHING_KEY);
      assertTrue(cursor.next());
      assertEquals(cursor.getRecord().getCSN(), csns[4]);
      StaticUtils.close(cursor);
      cursor = replicaDB.generateCursorFrom(csns[4]);
      cursor = replicaDB.generateCursorFrom(csns[4], AFTER_MATCHING_KEY);
      assertFalse(cursor.next());
      assertNull(cursor.getRecord());
    }
@@ -242,7 +244,7 @@
      CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6);
      cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey]);
      cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], PositionStrategy.AFTER_MATCHING_KEY);
      assertFalse(cursor.next());
      int[] indicesToAdd = new int[] { 0, 1, 2, 4 };
@@ -547,7 +549,7 @@
      return;
    }
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]);
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY);
    try
    {
      // Cursor points to a null record initially
@@ -574,7 +576,7 @@
    DBCursor<UpdateMsg> cursor = null;
    try
    {
      cursor = replicaDB.generateCursorFrom(csn);
      cursor = replicaDB.generateCursorFrom(csn, AFTER_MATCHING_KEY);
      assertFalse(cursor.next());
      assertNull(cursor.getRecord());
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -26,6 +26,7 @@
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
import java.io.File;
@@ -150,14 +151,14 @@
    DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null;
    try {
      // this key is the first key of the log file "key1_key2.log"
      cursor1 = log.getNearestCursor("key001");
      cursor1 = log.getNearestCursor("key001", AFTER_MATCHING_KEY);
      assertThatCursorCanBeFullyReadFromStart(cursor1, 2, 10);
      // this key is the last key of the log file "key3_key4.log"
      cursor2 = log.getNearestCursor("key004");
      cursor2 = log.getNearestCursor("key004", AFTER_MATCHING_KEY);
      assertThatCursorCanBeFullyReadFromStart(cursor2, 5, 10);
      cursor3 = log.getNearestCursor("key009");
      cursor3 = log.getNearestCursor("key009", AFTER_MATCHING_KEY);
      assertThatCursorCanBeFullyReadFromStart(cursor3, 10, 10);
    }
    finally {
@@ -171,7 +172,7 @@
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getNearestCursor("key010");
      cursor = log.getNearestCursor("key010", AFTER_MATCHING_KEY);
      // lowest higher key does not exist
      assertThatCursorIsExhausted(cursor);
@@ -188,7 +189,7 @@
    DBCursor<Record<String, String>> cursor = null;
    try {
      // key is between key005 and key006
      cursor = log.getNearestCursor("key005000");
      cursor = log.getNearestCursor("key005000", AFTER_MATCHING_KEY);
      assertThatCursorCanBeFullyReadFromStart(cursor, 6, 10);
    }
@@ -203,7 +204,7 @@
    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = log.getNearestCursor(null);
      cursor = log.getNearestCursor(null, null);
      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -44,6 +44,7 @@
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.testng.annotations.*;
@@ -53,6 +54,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
 * Test for ChangeNumberIndexer class. All dependencies to the changelog DB
@@ -157,7 +159,7 @@
  {
    MockitoAnnotations.initMocks(this);
    multiDomainCursor = new MultiDomainDBCursor(domainDB);
    multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY);
    initialState = new ChangelogState();
    initialCookie = new MultiDomainServerState();
    replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
@@ -166,8 +168,8 @@
    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
    when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn(
        multiDomainCursor);
    when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY)))
      .thenReturn(multiDomainCursor);
  }
  @AfterMethod
@@ -595,15 +597,15 @@
      DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
      if (domainDBCursor == null)
      {
        domainDBCursor = new DomainDBCursor(baseDN, domainDB);
        domainDBCursor = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY);
        domainDBCursors.put(baseDN, domainDBCursor);
        multiDomainCursor.addDomain(baseDN, null);
        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class)))
        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY)))
            .thenReturn(domainDBCursor);
      }
      domainDBCursor.addReplicaDB(serverId, null);
      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY)))
          .thenReturn(replicaDBCursor);
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -187,6 +187,7 @@
        of(msg4, baseDN1));
  }
  @Test
  public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(