mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
22.55.2014 f28eee1ba07554be73881e1df417494b3968ea85
OPENDJ-1444 CR-4537 Remove previous cookie from storage of ChangeNumberIndexDB

[Note: real merge of all changelog.file package content and ChangelogBackend to be done
in one shot in a future commit]

* Implement a new matching strategy for cursors : LESS_THAN_OR_EQUAL_TO_KEY
for both je and file-based implementations

* Replace the previous cookie by the usage of an ECLMultiDomainDBCursor generated
with the medium consistency CSN as start point and the
LESS_THAN_OR_EQUAL_TO_KEY strategy
in classes ChangeNumberIndexer and ChangelogBackend

* Remove storage of the previous cookie in the log for both je and file-based
implementations
21 files modified
1061 ■■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 5 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexRecord.java 33 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java 36 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 73 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 15 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 157 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java 31 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java 25 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java 4 ●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java 19 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java 48 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java 3 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 23 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 11 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java 23 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java 17 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 143 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 159 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursorTest.java 7 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java 44 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java 185 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -53,6 +53,7 @@
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
@@ -1348,12 +1349,12 @@
   * @return a non null {@link DBCursor} going from oldest to newest CSN
   * @throws ChangelogException
   *           If a database problem happened
   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, PositionStrategy)
   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy)
   */
  public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState)
      throws ChangelogException
  {
    return domainDB.getCursorFrom(baseDN, startAfterServerState, AFTER_MATCHING_KEY);
    return domainDB.getCursorFrom(baseDN, startAfterServerState, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
  }
  /**
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexRecord.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;
@@ -38,8 +38,6 @@
  /** This is the key used to store this record. */
  private final long changeNumber;
  /** This is used on startup to recover the medium consistency point. */
  private final String previousCookie;
  /** The baseDN where the change happened. */
  private final DN baseDN;
  /** The CSN of the change. */
@@ -50,36 +48,30 @@
   *
   * @param changeNumber
   *          the change number
   * @param previousCookie
   *          the previous cookie
   * @param baseDN
   *          the baseDN
   * @param csn
   *          the replication CSN field
   */
  public ChangeNumberIndexRecord(long changeNumber, String previousCookie,
      DN baseDN, CSN csn)
  public ChangeNumberIndexRecord(long changeNumber, DN baseDN, CSN csn)
  {
    this.changeNumber = changeNumber;
    this.previousCookie = previousCookie;
    this.baseDN = baseDN;
    this.csn = csn;
  }
  /**
   * Builds an instance of this class, with changeNumber equal to 0.
   *
   * @param previousCookie
   *          the previous cookie
   * @param baseDN
   *          the baseDN
   * @param csn
   *          the replication CSN field
   * @see #ChangeNumberIndexRecord(long, String, DN, CSN)
   *
   * @see #ChangeNumberIndexRecord(long, DN, CSN)
   */
  public ChangeNumberIndexRecord(String previousCookie, DN baseDN, CSN csn)
  public ChangeNumberIndexRecord(DN baseDN, CSN csn)
  {
    this(0, previousCookie, baseDN, csn);
    this(0, baseDN, csn);
  }
  /**
@@ -112,21 +104,10 @@
    return changeNumber;
  }
  /**
   * Get the previous cookie field.
   *
   * @return the previous cookie.
   */
  public String getPreviousCookie()
  {
    return previousCookie;
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return "changeNumber=" + changeNumber + " csn=" + csn + " baseDN=" + baseDN
        + " previousCookie=" + previousCookie;
    return "changeNumber=" + changeNumber + " csn=" + csn + " baseDN=" + baseDN;
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -52,6 +52,38 @@
 * }
 * </pre>
 *
 * A cursor can be initialised from a key, using a {@code KeyMatchingStrategy} and
 * a {@code PositionStrategy}, to determine the exact starting position.
 * <p>
 * Let's call Kp the highest key lower than K and Kn the lowest key higher
 * than K : Kp &lt; K &lt; Kn
 * <ul>
 *  <li>When using EQUAL_TO_KEY on key K :
 *   <ul>
 *    <li>with ON_MATCHING_KEY, cursor is positioned on key K (if K exists in log),
 *        otherwise it is empty</li>
 *    <li>with AFTER_MATCHING_KEY, cursor is positioned on key Kn (if K exists in log),
 *        otherwise it is empty</li>
 *   </ul>
 *  </li>
 *  <li>When using LESS_THAN_OR_EQUAL_TO_KEY on key K :
 *   <ul>
 *    <li>with ON_MATCHING_KEY, cursor is positioned on key K (if K exists in log)
 *        or else Kp (if Kp exists in log), otherwise it is empty</li>
 *    <li>with AFTER_MATCHING_KEY, cursor is positioned on key Kn (if Kp or K exist in log),
 *        otherwise it is empty</li>
 *   </ul>
 *  </li>
 *  <li>When using GREATER_THAN_OR_EQUAL_TO_KEY on key K :
 *   <ul>
 *    <li>with ON_MATCHING_KEY, cursor is positioned on key K (if K exists in log)
 *        or else Kn (if Kn exists in log), otherwise it is empty</li>
 *    <li>with AFTER_MATCHING_KEY, cursor is positioned on key Kn (if K or Kn exist in log),
 *        otherwise it is empty</li>
 *   </ul>
 *  </li>
 * </ul>
 *
 * @param <T>
 *          type of the record being returned
 * \@NotThreadSafe
@@ -61,9 +93,11 @@
  /**
   * Represents a cursor key matching strategy, which allow to choose if only
   * the exact key must be found or if any key equals or higher should match.
   * the exact key must be found or if any key equal or lower/higher should match.
   */
  public enum KeyMatchingStrategy {
    /** matches if the key or a lower key is found. */
    LESS_THAN_OR_EQUAL_TO_KEY,
    /** matches only if the exact key is found. */
    EQUAL_TO_KEY,
    /** matches if the key or a greater key is found. */
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -31,6 +31,7 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.DN;
@@ -38,6 +39,15 @@
/**
 * This interface allows to query or control the replication domain database(s)
 * (composed of one or more ReplicaDBs) and query/update each ReplicaDB.
 * <p>
 * In particular, the {@code getCursorFom()} methods allow to obtain a cursor at any level:
 * <ul>
 *  <li>Across all the domains, provided a {@link MultiDomainServerState}</li>
 *  <li>Across all replicaDBs of a domain, provided a {@link ServerState}</li>
 *  <li>On one replica DB for a domain and serverId, provided a CSN</li>
 * </ul>
 * The cursor starting point is specified by providing a key, a {@link KeyMatchingStrategy} and
 * a {@link PositionStrategy}.
 */
public interface ReplicationDomainDB
{
@@ -95,8 +105,9 @@
  void removeDomain(DN baseDN) throws ChangelogException;
  /**
   * Generates a {@link DBCursor} across all the domains starting at or after the
   * provided {@link MultiDomainServerState} for each domain.
   * Generates a {@link DBCursor} across all the domains starting before, at or
   * after the provided {@link MultiDomainServerState} for each domain,
   * depending on the provided matching and positioning strategies.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
@@ -106,21 +117,22 @@
   *          Starting point for each domain cursor. If any {@link ServerState}
   *          for a domain is null, then start from the oldest CSN for each
   *          replicaDBs
   * @param matchingStrategy
   *          Cursor key matching strategy
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   *          Cursor position strategy
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, ServerState, PositionStrategy)
   * @see #getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy)
   */
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
      throws ChangelogException;
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy) throws ChangelogException;
  /**
   * Generates a {@link DBCursor} across all the domains starting at or after
   * the provided {@link MultiDomainServerState} for each domain, excluding a
   * provided set of domain DNs.
   * Generates a {@link DBCursor} across all the domains starting before, at or
   * after the provided {@link MultiDomainServerState} for each domain,
   * excluding a provided set of domain DNs.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
@@ -130,25 +142,25 @@
   *          Starting point for each domain cursor. If any {@link ServerState}
   *          for a domain is null, then start from the oldest CSN for each
   *          replicaDBs
   * @param matchingStrategy
   *          Cursor key matching strategy
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which exact
   *          position the cursor must start
   *          Cursor position strategy
   * @param excludedDomainDns
   *          Every domain appearing in this set is excluded from the cursor
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, ServerState, PositionStrategy)
   * @see #getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy)
   */
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy,
      Set<DN> excludedDomainDns) throws ChangelogException;
  // serverId methods
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy, Set<DN> excludedDomainDns) throws ChangelogException;
  /**
   * Generates a {@link DBCursor} across all the replicaDBs for the specified
   * replication domain starting at or after the provided {@link ServerState} for each
   * replicaDBs.
   * replication domain starting before, at or after the provided
   * {@link ServerState} for each replicaDB, depending on the provided matching
   * and positioning strategies.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
@@ -160,20 +172,22 @@
   *          Starting point for each ReplicaDB cursor. If any CSN for a
   *          replicaDB is null, then start from the oldest CSN for this
   *          replicaDB
   * @param matchingStrategy
   *          Cursor key matching strategy
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   *          Cursor position strategy
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, int, CSN, PositionStrategy)
   * @see #getCursorFrom(DN, int, CSN, KeyMatchingStrategy, PositionStrategy)
   */
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy)
      throws ChangelogException;
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy) throws ChangelogException;
  /**
   * Generates a {@link DBCursor} for one replicaDB for the specified
   * replication domain and serverId starting at or after the provided {@link CSN}.
   * replication domain and serverId starting beofre, at or after the provided
   * {@link CSN}, depending on the provided matching and positioning strategies.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
@@ -186,15 +200,16 @@
   * @param startCSN
   *          Starting point for the ReplicaDB cursor. If the CSN is null, then
   *          start from the oldest CSN for this replicaDB
   * @param matchingStrategy
   *          Cursor key matching strategy
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   *          Cursor position strategy
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   */
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, PositionStrategy positionStrategy)
      throws ChangelogException;
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy) throws ChangelogException;
  /**
   * Unregisters the provided cursor from this replication domain.
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -37,6 +37,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
@@ -93,7 +94,8 @@
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
      PositionStrategy positionStrategy) throws ChangelogException
      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
      throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
  }
@@ -101,8 +103,8 @@
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
      PositionStrategy positionStrategy, Set<DN> excludedDomainDns)
      throws ChangelogException
      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy,
      Set<DN> excludedDomainDns) throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
  }
@@ -110,7 +112,8 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState,
      PositionStrategy positionStrategy) throws ChangelogException
      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
      throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
  }
@@ -118,8 +121,8 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
      CSN startCSN, PositionStrategy positionStrategy)
      throws ChangelogException
      CSN startCSN, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy) throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
  }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -45,14 +45,18 @@
import org.opends.server.types.DirectoryException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * Thread responsible for inserting replicated changes into the ChangeNumber
 * Index DB (CNIndexDB for short). Only changes older than the medium
 * consistency point are inserted in the CNIndexDB. As a consequence this class
 * is also responsible for maintaining the medium consistency point.
 * Index DB (CNIndexDB for short).
 * <p>
 * Only changes older than the medium consistency point are inserted in the
 * CNIndexDB. As a consequence this class is also responsible for maintaining
 * the medium consistency point (indirectly through an
 * {@code ECLMultiDomainDBCursor}).
 */
public class ChangeNumberIndexer extends DirectoryThread
{
@@ -75,27 +79,10 @@
  /*
   * The following MultiDomainServerState fields must be thread safe, because
   * 1) initialization can happen while the replication server starts receiving
   * updates 2) many updates can happen concurrently.
   * updates
   * 2) many updates can happen concurrently.
   */
  /**
   * Holds the cross domain medium consistency Replication Update Vector for the
   * current replication server, also known as the previous cookie.
   * <p>
   * Stores the value of the cookie before the change currently processed is
   * inserted in the DB. After insert, it is updated with the CSN of the change
   * currently processed (thus becoming the "current" cookie just before the
   * change is returned.
   * <p>
   * Note: This object is only updated by changes/updates.
   *
   * @see <a href=
   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
   * >OpenDJ Domain Names - medium consistency RUV</a>
   */
  private final MultiDomainServerState mediumConsistencyRUV =
      new MultiDomainServerState();
  /**
   * Holds the last time each replica was seen alive, whether via updates or
   * heartbeat notifications, or offline notifications. Data is held for each
   * serverId cross domain.
@@ -105,11 +92,10 @@
   * <p>
   * Note: This object is updated by both heartbeats and changes/updates.
   */
  private final MultiDomainServerState lastAliveCSNs =
      new MultiDomainServerState();
  private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();
  /** Note: This object is updated by replica offline messages. */
  private final MultiDomainServerState replicasOffline =
      new MultiDomainServerState();
  private final MultiDomainServerState replicasOffline = new MultiDomainServerState();
  /**
   * Cursor across all the replicaDBs for all the replication domains. It is
@@ -312,25 +298,50 @@
  }
  /**
   * Restores in memory data needed to build the CNIndexDB, including the medium
   * consistency point.
   * Restores in memory data needed to build the CNIndexDB. In particular,
   * initializes the changes cursor to the medium consistency point.
   */
  private void initialize() throws ChangelogException, DirectoryException
  {
    final ChangeNumberIndexRecord newestRecord =
        changelogDB.getChangeNumberIndexDB().getNewestRecord();
    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
    initializeLastAliveCSNs(domainDB);
    initializeNextChangeCursor(domainDB);
    initializeOfflineReplicas();
    // this will not be used any more. Discard for garbage collection.
    this.changelogState = null;
  }
  private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
  {
    final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN();
    MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint =
        domainDB.getCursorFrom(cookieWithNewestCSN, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
    nextChangeForInsertDBCursor.next();
  }
  /** Returns a cookie initialised with the newest CSN for each replica. */
  private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException
  {
    final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
    final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState();
    if (newestRecord != null)
    {
      // restore the mediumConsistencyRUV from DB
      mediumConsistencyRUV.update(
          new MultiDomainServerState(newestRecord.getPreviousCookie()));
      // Do not update with the newestRecord CSN
      // as it will be used for a sanity check later in the same method
      final CSN newestCsn = newestRecord.getCSN();
      for (DN baseDN : changelogState.getDomainToServerIds().keySet())
      {
        cookieWithNewestCSN.update(baseDN, newestCsn);
      }
    }
    return cookieWithNewestCSN;
  }
    // initialize the DB cursor and the last seen updates
    // to ensure the medium consistency CSN can move forward
    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
  private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
  {
    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
@@ -349,34 +360,10 @@
        lastAliveCSNs.update(baseDN, latestKnownState);
      }
    }
  }
    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate,
        domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY));
    nextChangeForInsertDBCursor.next();
    if (newestRecord != null)
    {
      // restore the "previousCookie" state before shutdown
      UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
      if (record instanceof ReplicaOfflineMsg)
      {
        // ignore: replica offline messages are never stored in the CNIndexDB
        nextChangeForInsertDBCursor.next();
        record = nextChangeForInsertDBCursor.getRecord();
      }
      // sanity check: ensure that when initializing the cursors at the previous
      // cookie, the next change we find is the newest record in the CNIndexDB
      if (!record.getCSN().equals(newestRecord.getCSN()))
      {
        throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(
            newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI()));
      }
      // Now we can update the mediumConsistencyRUV
      mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
      nextChangeForInsertDBCursor.next();
    }
  private void initializeOfflineReplicas()
  {
    final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
    for (DN baseDN : offlineReplicas)
    {
@@ -391,9 +378,6 @@
        }
      }
    }
    // this will not be used any more. Discard for garbage collection.
    this.changelogState = null;
  }
  private CSN oldestPossibleCSN(int serverId)
@@ -488,10 +472,10 @@
          // OK, the oldest change is older than the medium consistency point
          // let's publish it to the CNIndexDB.
          final String previousCookie = mediumConsistencyRUV.toString();
          final ChangeNumberIndexRecord record =
              new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
          changelogDB.getChangeNumberIndexDB().addRecord(record);
          final long changeNumber = changelogDB.getChangeNumberIndexDB()
              .addRecord(new ChangeNumberIndexRecord(baseDN, csn));
          MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie();
          notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
          moveForwardMediumConsistencyPoint(csn, baseDN);
        }
        catch (InterruptedException ignored)
@@ -521,6 +505,28 @@
  }
  /**
   * Notifies the {@link ChangelogBackend} that a new entry has been added.
   *
   * @param baseDN
   *          the baseDN of the newly added entry.
   * @param changeNumber
   *          the change number of the newly added entry. It will be greater
   *          than zero for entries added to the change number index and less
   *          than or equal to zero for entries added to any replica DB
   * @param cookie
   *          the cookie of the newly added entry. This is only meaningful for
   *          entries added to the change number index
   * @param msg
   *          the update message of the newly added entry
   * @throws ChangelogException
   *           If a problem occurs while notifying of the newly added entry.
   */
  protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
      MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException
  {
  }
  /**
   * Nothing can be done about it.
   * <p>
   * Rely on the DirectoryThread uncaught exceptions handler for logging error +
@@ -534,12 +540,8 @@
        getClass().getSimpleName(), stackTraceToSingleLineString(e));
  }
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN) throws ChangelogException
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException
  {
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    final int mcServerId = mcCSN.getServerId();
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -560,7 +562,6 @@
         * from the medium consistency RUV).
         */
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      }
    }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -34,6 +34,8 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
/**
 * {@link DBCursor} implementation that iterates across a Collection of
 * {@link DBCursor}s, advancing from the oldest to the newest change cross all
@@ -214,6 +216,35 @@
    return null;
  }
  /**
   * Returns a snapshot of this cursor.
   *
   * @return a list of (Data, UpdateMsg) pairs representing the state of the
   *         cursor. In each pair, the data or the update message may be
   *         {@code null}, but at least one of them is non-null.
   */
  public List<Pair<Data, UpdateMsg>> getSnapshot()
  {
    final List<Pair<Data, UpdateMsg>> snapshot = new ArrayList<Pair<Data, UpdateMsg>>();
    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
    {
      final UpdateMsg updateMsg = entry.getKey().getRecord();
      final Data data = entry.getValue();
      if (updateMsg != null || data != null)
      {
        snapshot.add(Pair.of(data, updateMsg));
      }
    }
    for (Data data : exhaustedCursors.values())
    {
      if (data != null)
      {
        snapshot.add(Pair.of(data, (UpdateMsg) null));
      }
    }
    return snapshot;
  }
  /** {@inheritDoc} */
  @Override
  public void close()
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -46,14 +46,14 @@
  private final DN baseDN;
  private final ReplicationDomainDB domainDB;
  private final ConcurrentSkipListMap<Integer, CSN> newReplicas =
      new ConcurrentSkipListMap<Integer, CSN>();
  private final ConcurrentSkipListMap<Integer, CSN> newReplicas = new ConcurrentSkipListMap<Integer, CSN>();
  /**
   * Replaces null CSNs in ConcurrentSkipListMap that does not support null values.
   */
  private static final CSN NULL_CSN = new CSN(0, 0, 0);
  private final PositionStrategy positionStrategy;
  private final KeyMatchingStrategy matchingStrategy;
  /**
   * Builds a DomainDBCursor instance.
@@ -62,14 +62,19 @@
   *          the replication domain baseDN of this cursor
   * @param domainDB
   *          the DB for the provided replication domain
   * @param matchingStrategy
   *          Cursor key matching strategy, which allow to indicates how key is
   *          matched
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   *          Cursor position strategy, which allow to indicates at which exact
   *          position the cursor must start
   */
  public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
  public DomainDBCursor(final DN baseDN, final ReplicationDomainDB domainDB, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy)
  {
    this.baseDN = baseDN;
    this.domainDB = domainDB;
    this.matchingStrategy = matchingStrategy;
    this.positionStrategy = positionStrategy;
  }
@@ -89,13 +94,13 @@
   *
   * @param serverId
   *          the serverId of the replica
   * @param startAfterCSN
   *          the CSN after which to start iterating
   * @param startCSN
   *          the CSN to use as a starting point
   */
  public void addReplicaDB(int serverId, CSN startAfterCSN)
  public void addReplicaDB(int serverId, CSN startCSN)
  {
    // only keep the oldest CSN that will be the new cursor's starting point
    newReplicas.putIfAbsent(serverId, startAfterCSN != null ? startAfterCSN : NULL_CSN);
    newReplicas.putIfAbsent(serverId, startCSN != null ? startCSN : NULL_CSN);
  }
  /** {@inheritDoc} */
@@ -109,7 +114,7 @@
      final CSN csn = pair.getValue();
      final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null;
      final DBCursor<UpdateMsg> cursor =
          domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy);
          domainDB.getCursorFrom(baseDN, serverId, startCSN, matchingStrategy, positionStrategy);
      addCursor(cursor, null);
      iter.remove();
    }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -88,9 +88,7 @@
    {
      final long changeNumber = record.getChangeNumber();
      DatabaseEntry key = new ReplicationDraftCNKey(changeNumber);
      DatabaseEntry data = new DraftCNData(changeNumber,
          record.getPreviousCookie(), record.getBaseDN().toNormalizedString(),
          record.getCSN());
      DatabaseEntry data = new DraftCNData(changeNumber, record.getBaseDN().toNormalizedString(), record.getCSN());
      // Use a transaction so that we can override durability.
      Transaction txn = null;
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java
@@ -45,6 +45,8 @@
{
  private static final String FIELD_SEPARATOR = "!";
  private static final String EMPTY_STRING_PREVIOUS_COOKIE = "";
  private static final long serialVersionUID = 1L;
  private long changeNumber;
@@ -55,19 +57,17 @@
   *
   * @param changeNumber
   *          the change number
   * @param previousCookie
   *          The previous cookie
   * @param baseDN
   *          The baseDN (domain DN)
   * @param csn
   *          The replication CSN
   */
  public DraftCNData(long changeNumber, String previousCookie, String baseDN,
      CSN csn)
  public DraftCNData(long changeNumber, String baseDN, CSN csn)
  {
    this.changeNumber = changeNumber;
    String record =
        previousCookie + FIELD_SEPARATOR + baseDN + FIELD_SEPARATOR + csn;
    // Although the previous cookie is not used any more, we need
    // to keep it in database for compatibility with previous versions
    String record = EMPTY_STRING_PREVIOUS_COOKIE + FIELD_SEPARATOR + baseDN + FIELD_SEPARATOR + csn;
    setData(getBytes(record));
  }
@@ -102,11 +102,14 @@
  {
    try
    {
      // Although the previous cookie is not used any more, we need
      // to keep it in database for compatibility with previous versions
      String stringData = new String(data, "UTF-8");
      String[] str = stringData.split(FIELD_SEPARATOR, 3);
      // str[0] contains previous cookie and is ignored
      final DN baseDN = DN.valueOf(str[1]);
      final CSN csn = new CSN(str[2]);
      return new ChangeNumberIndexRecord(changeNumber, str[0], baseDN, csn);
      return new ChangeNumberIndexRecord(changeNumber, baseDN, csn);
    }
    catch (UnsupportedEncodingException e)
    {
@@ -130,7 +133,9 @@
  public ChangeNumberIndexRecord getRecord() throws ChangelogException
  {
    if (record == null)
    {
      record = decodeData(changeNumber, getData());
    }
    return record;
  }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
@@ -24,11 +24,17 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.ArrayList;
import java.util.List;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
import com.forgerock.opendj.util.Pair;
/**
 * Multi domain DB cursor that only returns updates for the domains which have
 * been enabled for the external changelog.
@@ -110,4 +116,46 @@
  {
    return getClass().getSimpleName() + " cursor=[" + cursor + ']';
  }
  /**
   * Returns a snapshot of this cursor.
   *
   * @return a list of (DN, UpdateMsg) pairs, containing all base DNs enabled
   *         for the external changelog. The update message may be {@code null}.
   */
  List<Pair<DN, UpdateMsg>> getSnapshot()
  {
    final List<Pair<DN, UpdateMsg>> snapshot = cursor.getSnapshot();
    final List<Pair<DN, UpdateMsg>> eclSnapshot = new ArrayList<Pair<DN,UpdateMsg>>();
    for (Pair<DN, UpdateMsg> pair : snapshot)
    {
      DN baseDN = pair.getFirst();
      if (predicate.isECLEnabledDomain(baseDN))
      {
        eclSnapshot.add(pair);
      }
    }
    return eclSnapshot;
  }
  /**
   * Returns the cookie corresponding to the state of this cursor.
   *
   * @return a valid cookie taking into account only the base DNs enabled for
   *         the external changelog
   */
  public MultiDomainServerState toCookie()
  {
    List<Pair<DN, UpdateMsg>> snapshot = getSnapshot();
    MultiDomainServerState cookie = new MultiDomainServerState();
    for (Pair<DN, UpdateMsg> pair : snapshot)
    {
      // only put base DNs where a CSN is available in the cookie
      if (pair.getSecond() != null)
      {
        cookie.update(pair.getFirst(), pair.getSecond().getCSN());
      }
    }
    return cookie;
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -117,8 +117,7 @@
  {
    long changeNumber = nextChangeNumber();
    final ChangeNumberIndexRecord newRecord =
        new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(),
            record.getBaseDN(), record.getCSN());
        new ChangeNumberIndexRecord(changeNumber, record.getBaseDN(), record.getCSN());
    db.addRecord(newRecord);
    newestChangeNumber = changeNumber;
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -46,6 +46,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -699,18 +700,19 @@
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
  {
    final Set<DN> excludedDomainDns = Collections.emptySet();
    return getCursorFrom(startState, positionStrategy, excludedDomainDns);
    return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns);
  }
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy, final  Set<DN> excludedDomainDns) throws ChangelogException
      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
      final  Set<DN> excludedDomainDns) throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
@@ -724,9 +726,9 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
  {
    final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
    final DomainDBCursor cursor = newDomainDBCursor(baseDN, matchingStrategy, positionStrategy);
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      // get the last already sent CSN from that server to get a cursor
@@ -736,11 +738,12 @@
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
@@ -768,12 +771,12 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      final PositionStrategy positionStrategy) throws ChangelogException
      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
  {
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import org.opends.server.types.Attribute;
@@ -182,17 +183,19 @@
   * @param startCSN
   *          The position where the cursor must start. If null, start from the
   *          oldest CSN
   * @param matchingStrategy
   *          Cursor key matching strategy
   * @param positionStrategy
   *          indicates at which exact position the cursor must start
   *          Cursor position strategy
   * @return a new {@link DBCursor} that allows to browse the db managed by this
   *         ReplicaDB and starting at the position defined by a given CSN.
   * @throws ChangelogException
   *           if a database problem happened
   */
  DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy)
      throws ChangelogException
  DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    return new JEReplicaDBCursor(db, startCSN, positionStrategy, this);
    return new JEReplicaDBCursor(db, startCSN, matchingStrategy, positionStrategy, this);
  }
  /**
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -32,6 +32,7 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
@@ -42,8 +43,10 @@
class JEReplicaDBCursor implements DBCursor<UpdateMsg>
{
  private final ReplicationDB db;
  private final PositionStrategy positionStrategy;
  private PositionStrategy positionStrategy;
  private KeyMatchingStrategy matchingStrategy;
  private JEReplicaDB replicaDB;
  private final CSN startCSN;
  private CSN lastNonNullCurrentCSN;
  private ReplServerDBCursor cursor;
  private UpdateMsg currentChange;
@@ -57,19 +60,23 @@
   * @param startCSN
   *          The CSN after which the cursor must start.If null, start from the
   *          oldest CSN
   * @param matchingStrategy
   *          Cursor key matching strategy
   * @param positionStrategy
   *          indicates at which exact position the cursor must start
   *          Cursor position strategy
   * @param replicaDB
   *          The associated JEReplicaDB.
   * @throws ChangelogException
   *          if a database problem happened.
   */
  public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, PositionStrategy positionStrategy,
      JEReplicaDB replicaDB) throws ChangelogException
  public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy, JEReplicaDB replicaDB) throws ChangelogException
  {
    this.db = db;
    this.matchingStrategy = matchingStrategy;
    this.positionStrategy = positionStrategy;
    this.replicaDB = replicaDB;
    this.startCSN = startCSN;
    this.lastNonNullCurrentCSN = startCSN;
  }
@@ -94,7 +101,13 @@
        // if following code is called while the cursor is closed.
        // It is better to let the deadlock happen to help quickly identifying
        // and fixing such issue with unit tests.
        cursor = db.openReadCursor(lastNonNullCurrentCSN, positionStrategy);
        if (lastNonNullCurrentCSN != startCSN)
        {
          // re-initialize to further CSN, take care to use appropriate strategies
          matchingStrategy = GREATER_THAN_OR_EQUAL_TO_KEY;
          positionStrategy = AFTER_MATCHING_KEY;
        }
        cursor = db.openReadCursor(lastNonNullCurrentCSN, matchingStrategy, positionStrategy);
      }
    }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -47,6 +47,8 @@
  private final ConcurrentSkipListMap<DN, ServerState> newDomains =
      new ConcurrentSkipListMap<DN, ServerState>();
  private final KeyMatchingStrategy matchingStrategy;
  private final PositionStrategy positionStrategy;
  /**
@@ -54,13 +56,16 @@
   *
   * @param domainDB
   *          the replication domain management DB
   * @param matchingStrategy
   *          Cursor key matching strategy
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which
   *          exact position the cursor must start
   *          Cursor position strategy
   */
  public MultiDomainDBCursor(ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
  public MultiDomainDBCursor(final ReplicationDomainDB domainDB, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy)
  {
    this.domainDB = domainDB;
    this.matchingStrategy = matchingStrategy;
    this.positionStrategy = positionStrategy;
  }
@@ -75,8 +80,7 @@
   */
  public void addDomain(DN baseDN, ServerState startAfterState)
  {
    newDomains.put(baseDN,
        startAfterState != null ? startAfterState : new ServerState());
    newDomains.put(baseDN, startAfterState != null ? startAfterState : new ServerState());
  }
  /** {@inheritDoc} */
@@ -89,7 +93,8 @@
      final Entry<DN, ServerState> entry = iter.next();
      final DN baseDN = entry.getKey();
      final ServerState serverState = entry.getValue();
      final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, positionStrategy);
      final DBCursor<UpdateMsg> domainDBCursor =
          domainDB.getCursorFrom(baseDN, serverState, matchingStrategy, positionStrategy);
      addCursor(domainDBCursor, baseDN);
      iter.remove();
    }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -39,6 +39,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -49,6 +50,8 @@
import static com.sleepycat.je.OperationStatus.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -289,15 +292,18 @@
   * @param startCSN
   *          The CSN from which the cursor must start.If null, start from the
   *          oldest CSN
   * @param matchingStrategy
   *          Cursor key matching strategy
   * @param positionStrategy
   *          indicates at which exact position the cursor must start
   *          Cursor position strategy
   * @return The ReplServerDBCursor.
   * @throws ChangelogException
   *           If a database problem happened
   */
  ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
  ReplServerDBCursor openReadCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy) throws ChangelogException
  {
    return new ReplServerDBCursor(startCSN, positionStrategy);
    return new ReplServerDBCursor(startCSN, matchingStrategy, positionStrategy);
  }
  /**
@@ -447,6 +453,31 @@
    return serverId + " " + baseDN.toNormalizedString();
  }
  /** Hold a cursor and an indicator of wether the cursor should be considered as empty. */
  private static class CursorWithEmptyIndicator
  {
    private Cursor cursor;
    private boolean isEmpty;
    private CursorWithEmptyIndicator(Cursor localCursor, boolean isEmpty)
    {
      this.cursor = localCursor;
      this.isEmpty = isEmpty;
    }
    /** Creates cursor considered as empty. */
    static CursorWithEmptyIndicator createEmpty(Cursor cursor)
    {
      return new CursorWithEmptyIndicator(cursor, true);
    }
    /** Creates cursor considered as non-empty. */
    static CursorWithEmptyIndicator createNonEmpty(Cursor cursor)
    {
      return new CursorWithEmptyIndicator(cursor, false);
    }
  }
  /**
   * This Class implements a cursor that can be used to browse a
   * replicationServer database.
@@ -460,7 +491,7 @@
     * <p>
     * Will be set non null for a write cursor
     */
    private final Cursor cursor;
    private Cursor cursor;
    private final DatabaseEntry key;
    private final DatabaseEntry data;
    /** \@Null for read cursors, \@NotNull for deleting cursors. */
@@ -475,12 +506,16 @@
     *
     * @param startCSN
     *          The CSN from which the cursor must start.
     * @param matchingStrategy
     *          Cursor key matching strategy, which allow to indicates how key
     *          is matched
     * @param positionStrategy
     *          indicates at which exact position the cursor must start
     * @throws ChangelogException
     *           When the startCSN does not exist.
     */
    private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
    private ReplServerDBCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
        throws ChangelogException
    {
      key = createReplicationKey(startCSN);
      data = new DatabaseEntry();
@@ -491,8 +526,7 @@
      // unlock it when throwing an exception.
      dbCloseLock.readLock().lock();
      boolean cursorHeld = false;
      Cursor localCursor = null;
      CursorWithEmptyIndicator maybeEmptyCursor = null;
      try
      {
        // If the DB has been closed then create empty cursor.
@@ -503,35 +537,15 @@
          return;
        }
        localCursor = db.openCursor(txn, null);
        if (startCSN != null
            && localCursor.getSearchKey(key, data, LockMode.DEFAULT) != SUCCESS)
        maybeEmptyCursor = generateCursor(startCSN, matchingStrategy, positionStrategy);
        if (maybeEmptyCursor.isEmpty)
        {
          // We could not move the cursor to the expected startCSN
          if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS)
          {
            // We could not even move the cursor close to it
            // => return empty cursor
            isClosed = true;
            cursor = null;
            return;
          }
          if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
          {
            // We can move close to the startCSN.
            // Let's create a cursor from that point.
            key.setData(null);
            if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
            {
              localCursor.close();
              localCursor = db.openCursor(txn, null);
            }
          }
          isClosed = true;
          cursor = null;
          return;
        }
        cursor = localCursor;
        cursorHeld = cursor != null;
        cursor = maybeEmptyCursor.cursor;
        if (key.getData() != null)
        {
          computeCurrentRecord();
@@ -543,13 +557,72 @@
      }
      finally
      {
        if (!cursorHeld)
        if (maybeEmptyCursor != null && maybeEmptyCursor.isEmpty)
        {
          closeAndReleaseReadLock(localCursor);
          closeAndReleaseReadLock(maybeEmptyCursor.cursor);
        }
      }
    }
    /** Generate a possibly empty cursor with the provided start CSN and strategies. */
    private CursorWithEmptyIndicator generateCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy,
        PositionStrategy positionStrategy)
    {
      Cursor cursor = db.openCursor(txn, null);
      boolean isCsnFound = startCSN == null || cursor.getSearchKey(key, data, LockMode.DEFAULT) == SUCCESS;
      if (!isCsnFound)
      {
        if (matchingStrategy == EQUAL_TO_KEY)
        {
          return CursorWithEmptyIndicator.createEmpty(cursor);
        }
        boolean isGreaterCsnFound = cursor.getSearchKeyRange(key, data, DEFAULT) == SUCCESS;
        if (isGreaterCsnFound)
        {
          if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY && positionStrategy == AFTER_MATCHING_KEY)
          {
            // Move backward so that the first call to next() points to this greater csn
            key.setData(null);
            if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
            {
              // Edge case: we're at the beginning of the database
              cursor.close();
              cursor = db.openCursor(txn, null);
            }
          }
          else if (matchingStrategy == LESS_THAN_OR_EQUAL_TO_KEY)
          {
            // Move backward to point on the lower csn
            key.setData(null);
            if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
            {
              // Edge case: we're at the beginning of the log, there is no lower csn
              return CursorWithEmptyIndicator.createEmpty(cursor);
            }
          }
        }
        else
        {
          if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
          {
            // There is no greater csn
            return CursorWithEmptyIndicator.createEmpty(cursor);
          }
          // LESS_THAN_OR_EQUAL_TO_KEY case : the lower csn is the highest csn available
          key.setData(null);
          boolean isLastKeyFound = cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS;
          if (!isLastKeyFound)
          {
            // Edge case: empty database
            cursor.close();
            cursor = db.openCursor(txn, null);
          }
        }
      }
      return CursorWithEmptyIndicator.createNonEmpty(cursor);
    }
    private ReplServerDBCursor() throws ChangelogException
    {
      key = new DatabaseEntry();
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -45,6 +45,7 @@
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.testng.annotations.*;
@@ -54,6 +55,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
@@ -138,7 +140,6 @@
  private Map<DN, ServerState> domainNewestCSNs;
  private ECLEnabledDomainPredicate predicate;
  private ChangeNumberIndexer cnIndexer;
  private MultiDomainServerState initialCookie;
  @BeforeClass
  public static void classSetup() throws Exception
@@ -160,17 +161,16 @@
  {
    MockitoAnnotations.initMocks(this);
    multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY);
    multiDomainCursor = new MultiDomainDBCursor(domainDB, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
    initialState = new ChangelogState();
    initialCookie = new MultiDomainServerState();
    replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
    domainDBCursors = new HashMap<DN, DomainDBCursor>();
    domainNewestCSNs = new HashMap<DN, ServerState>();
    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
    when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY)))
      .thenReturn(multiDomainCursor);
    when(domainDB.getCursorFrom(any(MultiDomainServerState.class),
        eq(LESS_THAN_OR_EQUAL_TO_KEY), eq(AFTER_MATCHING_KEY))).thenReturn(multiDomainCursor);
  }
  @AfterMethod
@@ -179,18 +179,18 @@
    stopCNIndexer();
  }
  private static final String EMPTY_DB_NO_DS = "emptyDBNoDS";
  private static final String NO_DS = "noDS";
  @Test
  public void emptyDBNoDS() throws Exception
  public void noDS() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneDS() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void oneDS() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -202,23 +202,8 @@
    assertExternalChangelogContent(msg1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBOneDS() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    addReplica(BASE_DN1, serverId1);
    setCNIndexDBInitialRecords(msg1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
    publishUpdateMsg(msg2);
    assertExternalChangelogContent(msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSs() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void twoDSs() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -236,8 +221,8 @@
    assertExternalChangelogContent(msg1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsDifferentDomains() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void twoDSsDifferentDomains() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2);
    addReplica(BASE_DN1, serverId1);
@@ -272,8 +257,8 @@
   * CompositeDBCursor currentRecord == Upd2.<li>
   * </ol>
   */
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void twoDSsDoesNotLoseChanges() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -301,34 +286,8 @@
    assertExternalChangelogContent(msg1, msg2, msg3);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBTwoDSs() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    setCNIndexDBInitialRecords(msg1, msg2);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
    publishUpdateMsg(msg3, msg4);
    assertExternalChangelogContent(msg3);
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5);
    publishUpdateMsg(msg5);
    assertExternalChangelogContent(msg3);
    final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6);
    publishUpdateMsg(msg6);
    assertExternalChangelogContent(msg3, msg4, msg5);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void twoDSsOneSendsNoUpdatesForSomeTime() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -345,8 +304,8 @@
    assertExternalChangelogContent(msg1Sid2, msg2Sid1);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void threeDSsOneIsNotECLEnabledDomain() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(ADMIN_DATA_DN, serverId1);
@@ -367,8 +326,8 @@
    assertExternalChangelogContent(msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void oneInitialDSAnotherDSJoining() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -389,8 +348,8 @@
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void oneInitialDSAnotherDSJoining2() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -409,8 +368,8 @@
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void twoDSsOneSendingHeartbeats() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -427,8 +386,8 @@
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneGoingOffline() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void twoDSsOneGoingOffline() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -461,8 +420,8 @@
    assertExternalChangelogContent(msg1, msg2, msg4, msg5);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void twoDSsOneInitiallyOffline() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -495,8 +454,8 @@
   * <li>RS starts</li>
   * </ol>
   */
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void twoDSsOneInitiallyWithChangesThenOffline() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -540,8 +499,8 @@
   * <li>RS starts</li>
   * </ol>
   */
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void twoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -564,8 +523,8 @@
    assertExternalChangelogContent(msg2, msg3, msg4);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneKilled() throws Exception
  @Test(dependsOnMethods = { NO_DS })
  public void twoDSsOneKilled() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
@@ -598,16 +557,16 @@
      DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
      if (domainDBCursor == null)
      {
        domainDBCursor = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY);
        domainDBCursor = new DomainDBCursor(baseDN, domainDB, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
        domainDBCursors.put(baseDN, domainDBCursor);
        multiDomainCursor.addDomain(baseDN, null);
        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY)))
            .thenReturn(domainDBCursor);
        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(LESS_THAN_OR_EQUAL_TO_KEY),
            eq(AFTER_MATCHING_KEY))).thenReturn(domainDBCursor);
      }
      domainDBCursor.addReplicaDB(serverId, null);
      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY)))
          .thenReturn(replicaDBCursor);
      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(LESS_THAN_OR_EQUAL_TO_KEY),
          eq(AFTER_MATCHING_KEY))).thenReturn(replicaDBCursor);
    }
    when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
@@ -636,7 +595,16 @@
        return eclEnabledDomains.contains(baseDN);
      }
    };
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState, predicate);
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState, predicate)
    {
      /** {@inheritDoc} */
      @Override
      protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
          MultiDomainServerState previousCookie, UpdateMsg msg) throws ChangelogException
      {
        // avoid problems with ChangelogBackend initialization
      }
    };
    cnIndexer.start();
    waitForWaitingState(cnIndexer);
  }
@@ -661,28 +629,6 @@
    return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true);
  }
  private void setCNIndexDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
  {
    // Initialize the previous cookie that will be used to compare the records
    // added to the CNIndexDB at the end of this test
    for (int i = 0; i < msgs.length; i++)
    {
      ReplicatedUpdateMsg msg = msgs[i];
      if (i + 1 == msgs.length)
      {
        final ReplicatedUpdateMsg newestMsg = msg;
        final DN baseDN = newestMsg.getBaseDN();
        final CSN csn = newestMsg.getCSN();
        when(cnIndexDB.getNewestRecord()).thenReturn(
            new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
        final SequentialDBCursor cursor =
            replicaDBCursors.get(Pair.of(baseDN, csn.getServerId()));
        cursor.add(newestMsg);
      }
      initialCookie.update(msg.getBaseDN(), msg.getCSN());
    }
  }
  private void publishUpdateMsg(ReplicatedUpdateMsg... msgs) throws Exception
  {
    for (ReplicatedUpdateMsg msg : msgs)
@@ -758,9 +704,6 @@
    verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
    final List<ChangeNumberIndexRecord> allValues = arg.getAllValues();
    // clone initial state to avoid modifying it
    final MultiDomainServerState previousCookie =
        new MultiDomainServerState(initialCookie.toString());
    // check it was not called more than expected
    String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(expectedMsgs) + ">";
    assertThat(allValues).as(desc1).hasSize(expectedMsgs.length);
@@ -772,8 +715,6 @@
      String desc2 = "actual was:<" + record + ">, but expected was:<" + expectedMsg + ">";
      assertThat(record.getBaseDN()).as(desc2).isEqualTo(expectedMsg.getBaseDN());
      assertThat(record.getCSN()).as(desc2).isEqualTo(expectedMsg.getCSN());
      assertThat(record.getPreviousCookie()).as(desc2).isEqualTo(previousCookie.toString());
      previousCookie.update(expectedMsg.getBaseDN(), expectedMsg.getCSN());
    }
  }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursorTest.java
@@ -40,9 +40,10 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
@SuppressWarnings("javadoc")
public class ECLMultiDomainDBCursorTest extends DirectoryServerTestCase
@@ -67,7 +68,7 @@
  {
    TestCaseUtils.startFakeServer();
    MockitoAnnotations.initMocks(this);
    multiDomainCursor = new MultiDomainDBCursor(domainDB, ON_MATCHING_KEY);
    multiDomainCursor = new MultiDomainDBCursor(domainDB, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
    eclCursor = new ECLMultiDomainDBCursor(predicate, multiDomainCursor);
  }
@@ -164,7 +165,7 @@
  private void addDomainCursorToCursor(DN baseDN, SequentialDBCursor cursor) throws ChangelogException
  {
    final ServerState state = new ServerState();
    when(domainDB.getCursorFrom(baseDN, state, ON_MATCHING_KEY)).thenReturn(cursor);
    when(domainDB.getCursorFrom(baseDN, state, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY)).thenReturn(cursor);
    multiDomainCursor.addDomain(baseDN, state);
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -58,7 +58,6 @@
{
  private final MultiDomainServerState previousCookie =
      new MultiDomainServerState();
  private final List<String> cookies = new ArrayList<String>();
  private static final ReplicationDBImplementation previousDBImpl = replicationDbImplementation;
@@ -74,13 +73,6 @@
    setReplicationDBImplementation(previousDBImpl);
  }
  @BeforeMethod
  public void clearCookie()
  {
    previousCookie.clear();
    cookies.clear();
  }
  /**
   * This test makes basic operations of a JEChangeNumberIndexDB:
   * <ol>
@@ -124,11 +116,11 @@
      try
      {
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0));
        assertEqualTo(cursor.getRecord(), csns[0], baseDN1);
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1));
        assertEqualTo(cursor.getRecord(), csns[1], baseDN2);
        assertTrue(cursor.next());
        assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2));
        assertEqualTo(cursor.getRecord(), csns[2], baseDN3);
        assertFalse(cursor.next());
      }
      finally
@@ -154,19 +146,13 @@
  private long addRecord(JEChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
  {
    final String cookie = previousCookie.toString();
    cookies.add(cookie);
    final long changeNumber = cnIndexDB.addRecord(
        new ChangeNumberIndexRecord(cookie, baseDN, csn));
    previousCookie.update(baseDN, csn);
    return changeNumber;
    return cnIndexDB.addRecord(new ChangeNumberIndexRecord(baseDN, csn));
  }
  private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie)
  private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN)
  {
    assertEquals(record.getCSN(), csn);
    assertEquals(record.getBaseDN(), baseDN);
    assertEquals(record.getPreviousCookie(), cookie);
  }
  private JEChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
@@ -220,10 +206,6 @@
      assertEquals(cnIndexDB.count(), 3, "Db count");
      assertFalse(cnIndexDB.isEmpty());
      assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0));
      assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1));
      assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2));
      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
      assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
@@ -262,7 +244,6 @@
    final ChangeNumberIndexRecord newest = cnIndexDB.getNewestRecord();
    assertEquals(oldest.getChangeNumber(), newestChangeNumber);
    assertEquals(oldest.getChangeNumber(), newest.getChangeNumber());
    assertEquals(oldest.getPreviousCookie(), newest.getPreviousCookie());
    assertEquals(oldest.getBaseDN(), newest.getBaseDN());
    assertEquals(oldest.getCSN(), newest.getCSN());
  }
@@ -277,21 +258,6 @@
    return new ReplicationServer(cfg);
  }
  private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,
      long changeNumber) throws Exception
  {
    DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(changeNumber);
    try
    {
      cursor.next();
      return cursor.getRecord().getPreviousCookie();
    }
    finally
    {
      StaticUtils.close(cursor);
    }
  }
  private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor,
      long... cns) throws ChangelogException
  {
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.assertj.core.api.SoftAssertions;
import org.opends.server.TestCaseUtils;
@@ -46,13 +47,17 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
@@ -68,6 +73,8 @@
  private DN TEST_ROOT_DN;
  private static final ReplicationDBImplementation previousDBImpl = replicationDbImplementation;
  private ReplicationServer replicationServer;
  private JEReplicaDB replicaDB;
  @BeforeClass
  public void setDBImpl()
@@ -99,47 +106,131 @@
    TEST_ROOT_DN = DN.valueOf(TEST_ROOT_DN_STRING);
  }
  @Test
  public void testGenerateCursorFrom() throws Exception
  @DataProvider
  Object[][] cursorData()
  {
    ReplicationServer replicationServer = null;
    JEReplicaDB replicaDB = null;
    // create 7 csns
    final CSN[] sevenCsns = generateCSNs(1, System.currentTimeMillis(), 7);
    CSN beforeCsn = sevenCsns[0];
    CSN middleCsn = sevenCsns[3]; // will be between csns[1] and csns[2]
    CSN afterCsn = sevenCsns[6];
    // but use only 4 of them for update msg
    // beforeCsn, middleCsn and afterCsn are not used
    // in order to test cursor generation from a key not present in the log (before, in the middle, after)
    final List<CSN> usedCsns = new ArrayList<CSN>(Arrays.asList(sevenCsns));
    usedCsns.remove(beforeCsn);
    usedCsns.remove(middleCsn);
    usedCsns.remove(afterCsn);
    final CSN[] csns = usedCsns.toArray(new CSN[4]);
    return new Object[][] {
      // equal matching
      { csns, beforeCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { csns, csns[0], EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
      { csns, csns[1], EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
      { csns, middleCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { csns, csns[2], EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
      { csns, csns[3], EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
      { csns, afterCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { csns, beforeCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { csns, csns[0], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 },
      { csns, csns[1], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
      { csns, middleCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { csns, csns[2], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 },
      { csns, csns[3], EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { csns, afterCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      // less than or equal matching
      { csns, beforeCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { csns, csns[0], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
      { csns, csns[1], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
      { csns, middleCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
      { csns, csns[2], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
      { csns, csns[3], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
      { csns, afterCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
      { csns, beforeCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { csns, csns[0], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 },
      { csns, csns[1], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
      { csns, middleCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
      { csns, csns[2], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 },
      { csns, csns[3], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { csns, afterCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      // greater than or equal matching
      { csns, beforeCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
      { csns, csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
      { csns, csns[1], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
      { csns, middleCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
      { csns, csns[2], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
      { csns, csns[3], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
      { csns, afterCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
      { csns, beforeCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 0, 3 },
      { csns, csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 },
      { csns, csns[1], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
      { csns, middleCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
      { csns, csns[2], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 },
      { csns, csns[3], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { csns, afterCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
      { null, null, null, null, -1, -1 } // stop line
    };
  }
  /**
   * Test the cursor with all acceptable strategies combination.
   * Creation of a replication server is costly so it is created only once on first test and cleaned after the
   * last test using the stop line in data to do so.
   */
  @Test(dataProvider="cursorData")
  public void testGenerateCursor(CSN[] csns, CSN startCsn, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy, int startIndex, int endIndex) throws Exception
  {
    DBCursor<UpdateMsg> cursor = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      replicaDB = newReplicaDB(replicationServer);
      final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
      csns2.remove(csns[3]);
      for (CSN csn : csns2)
      if (replicationServer == null)
      {
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
        // initialize only once
        TestCaseUtils.startServer();
        replicationServer = configureReplicationServer(100000, 10);
        replicaDB = newReplicaDB(replicationServer);
        for (CSN csn : csns)
        {
          replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
        }
      }
      if (csns == null)
      {
        return; // stop line, time to clean replication artefacts
      }
      for (CSN csn : csns2)
      cursor = replicaDB.generateCursorFrom(startCsn, matchingStrategy, positionStrategy);
      if (startIndex != -1)
      {
        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
        assertThatCursorCanBeFullyReadFromStart(cursor, csns, startIndex, endIndex);
      }
      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
      for (int i = 0; i < csns2.size() - 1; i++)
      else
      {
        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
        assertThatCursorIsExhausted(cursor);
      }
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
      close(cursor);
      if (csns == null)
      {
        // stop line, stop and remove replication
        shutdown(replicaDB);
        remove(replicationServer);
      }
    }
  }
  @Test
  void testTrim() throws Exception
  public void testTrim() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEReplicaDB replicaDB = null;
@@ -288,28 +379,42 @@
    }
  }
  private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN,
      final PositionStrategy positionStrategy, final CSN expectedCSN)
      throws ChangelogException
  private void advanceCursorUpTo(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex, int endIndex)
      throws Exception
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
    try
    for (int i = startIndex; i <= endIndex; i++)
    {
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isTrue();
      softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN);
      softly.assertAll();
      assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
      assertThat(cursor.getRecord().getCSN()).isEqualTo(csns[i]);
    }
    finally
    {
      close(cursor);
    }
  }
  private void assertThatCursorIsExhausted(DBCursor<UpdateMsg> cursor) throws Exception
  {
    final SoftAssertions softly = new SoftAssertions();
    softly.assertThat(cursor.next()).isFalse();
    softly.assertThat(cursor.getRecord()).isNull();
    softly.assertAll();
  }
  private void assertThatCursorCanBeFullyRead(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex, int endIndex)
      throws Exception
  {
    advanceCursorUpTo(cursor, csns, startIndex, endIndex);
    assertThatCursorIsExhausted(cursor);
  }
  private void assertThatCursorCanBeFullyReadFromStart(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex,
      int endIndex) throws Exception
  {
    assertThat(cursor.getRecord()).isNull();
    assertThatCursorCanBeFullyRead(cursor, csns, startIndex, endIndex);
  }
  private void assertNotFound(JEReplicaDB replicaDB, final CSN startCSN,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
    try
    {
      final SoftAssertions softly = new SoftAssertions();
@@ -328,7 +433,7 @@
   * optimize the oldest and newest records in the replication changelog db.
   */
  @Test(groups = { "opendj-256" })
  void testGetOldestNewestCSNs() throws Exception
  public void testGetOldestNewestCSNs() throws Exception
  {
    // It's worth testing with 2 different setting for counterRecord
    // - a counter record is put every 10 Update msg in the db - just a unit
@@ -451,7 +556,7 @@
  private void assertFoundInOrder(JEReplicaDB replicaDB,
      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
    try
    {
      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");