mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.26.2014 39845070920c859cd1d24cb23090bfa1bfad7b1a
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -26,8 +26,10 @@
package org.opends.server.replication.server.changelog.api;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.DN;
/**
@@ -89,6 +91,26 @@
   */
  void removeDomain(DN baseDN) throws ChangelogException;
  /**
   * Generates a {@link DBCursor} across all the domains starting after the
   * provided {@link MultiDomainServerState} for each domain.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
   *
   * @param startAfterState
   *          Starting point for each domain cursor. If any {@link ServerState}
   *          for a domain is null, then start from the oldest CSN for each
   *          replicaDBs
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, ServerState)
   */
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState)
      throws ChangelogException;
  // serverId methods
  /**
@@ -102,16 +124,17 @@
   *
   * @param baseDN
   *          the replication domain baseDN
   * @param startAfterServerState
   * @param startAfterState
   *          Starting point for each ReplicaDB cursor. If any CSN for a
   *          replicaDB is null, then start from the oldest CSN for this
   *          replicaDB
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, int, CSN)
   */
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
      ServerState startAfterServerState) throws ChangelogException;
  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState)
      throws ChangelogException;
  /**
   * Generates a {@link DBCursor} for one replicaDB for the specified
@@ -136,6 +159,14 @@
      throws ChangelogException;
  /**
   * Unregisters the provided cursor from this replication domain.
   *
   * @param cursor
   *          the cursor to unregister.
   */
  void unregisterCursor(DBCursor<?> cursor);
  /**
   * Publishes the provided change to the changelog DB for the specified
   * serverId and replication domain. After a change has been successfully
   * published, it becomes available to be returned by the External ChangeLog.
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -25,15 +25,8 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -42,18 +35,15 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.*;
@@ -82,7 +72,7 @@
  private ChangelogState changelogState;
  /*
   * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
   * The following MultiDomainServerState fields must be thread safe, because
   * 1) initialization can happen while the replication server starts receiving
   * updates 2) many updates can happen concurrently.
   */
@@ -128,39 +118,7 @@
   *
   * @NonNull
   */
  @SuppressWarnings("unchecked")
  private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
      new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
  /**
   * New cursors for this Map must be created from the {@link #run()} method,
   * i.e. from the same thread that will make use of them. If this rule is not
   * obeyed, then a JE exception will be thrown about
   * "Non-transactional Cursors may not be used in multiple threads;".
   */
  private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
      new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
  /**
   * Holds the newCursors that will have to be created in the next iteration
   * inside the {@link #run()} method.
   * <p>
   * This map can be updated by multiple threads.
   */
  private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
      new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
          new Comparator<Pair<DN, Integer>>()
          {
            @Override
            public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
            {
              final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
              if (compareBaseDN == 0)
              {
                return o1.getSecond().compareTo(o2.getSecond());
              }
              return compareBaseDN;
            }
          });
  private MultiDomainDBCursor nextChangeForInsertDBCursor;
  /**
   * Builds a ChangeNumberIndexer object.
@@ -215,11 +173,8 @@
      return;
    }
    final CSN csn = updateMsg.getCSN();
    // only keep the oldest CSN that will be the new cursor's starting point
    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, csn);
    lastAliveCSNs.update(baseDN, updateMsg.getCSN());
    tryNotify(oldestCSNBefore);
  }
@@ -364,40 +319,42 @@
    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
      if (!isECLEnabledDomain(baseDN))
      if (isECLEnabledDomain(baseDN))
      {
        continue;
      }
        for (Integer serverId : entry.getValue())
        {
          /*
           * initialize with the oldest possible CSN in order for medium
           * consistency to wait for all replicas to be alive before moving forward
           */
          lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
        }
      for (Integer serverId : entry.getValue())
      {
        /*
         * initialize with the oldest possible CSN in order for medium
         * consistency to wait for all replicas to be alive before moving
         * forward
         */
        lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
        // start after the actual CSN when initializing from the previous cookie
        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
        ensureCursorExists(baseDN, serverId, csn);
        final ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
        lastAliveCSNs.update(baseDN, latestKnownState);
      }
      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
      lastAliveCSNs.update(baseDN, latestKnownState);
    }
    resetNextChangeForInsertDBCursor();
    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
    nextChangeForInsertDBCursor.next();
    if (newestRecord != null)
    {
      // restore the "previousCookie" state before shutdown
      final UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
      UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
      if (record instanceof ReplicaOfflineMsg)
      {
        // ignore: replica offline messages are never stored in the CNIndexDB
        nextChangeForInsertDBCursor.next();
        record = nextChangeForInsertDBCursor.getRecord();
      }
      // sanity check: ensure that when initializing the cursors at the previous
      // cookie, the next change we find is the newest record in the CNIndexDB
      if (!record.getCSN().equals(newestRecord.getCSN()))
      {
        throw new ChangelogException(
            ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(newestRecord
                .getCSN().toStringUI(), record.getCSN().toStringUI()));
        throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(
            newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI()));
      }
      // Now we can update the mediumConsistencyRUV
      mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
@@ -428,68 +385,6 @@
    return new CSN(0, 0, serverId);
  }
  private void resetNextChangeForInsertDBCursor() throws ChangelogException
  {
    final Map<DBCursor<UpdateMsg>, DN> cursors =
        new HashMap<DBCursor<UpdateMsg>, DN>();
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry
        : this.allCursors.entrySet())
    {
      for (Entry<Integer, DBCursor<UpdateMsg>> entry2
          : entry.getValue().entrySet())
      {
        cursors.put(entry2.getValue(), entry.getKey());
      }
    }
    // CNIndexer manages the cursor itself,
    // so do not try to recycle exhausted cursors
    CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
    result.next();
    nextChangeForInsertDBCursor = result;
  }
  private boolean ensureCursorExists(DN baseDN, Integer serverId,
      CSN startAfterCSN) throws ChangelogException
  {
    Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN);
    if (map == null)
    {
      map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>();
      allCursors.put(baseDN, map);
    }
    DBCursor<UpdateMsg> cursor = map.get(serverId);
    if (cursor == null)
    {
      final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
      cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
      cursor.next();
      map.put(serverId, cursor);
      return false;
    }
    return true;
  }
  /**
   * Returns the immediately preceding CSN.
   *
   * @param csn
   *          the CSN to use
   * @return the immediately preceding CSN or null if the provided CSN is null.
   */
  CSN getPrecedingCSN(CSN csn)
  {
    if (csn == null)
    {
      return null;
    }
    if (csn.getSeqnum() > 0)
    {
      return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
    }
    return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
  }
  /** {@inheritDoc} */
  @Override
  public void initiateShutdown()
@@ -509,8 +404,7 @@
    {
      /*
       * initialize here to allow fast application start up and avoid errors due
       * cursors being created in a different thread to the one where they are
       * used.
       * cursors being created in a different thread to the one where they are used.
       */
      initialize();
@@ -520,26 +414,29 @@
        {
          if (!domainsToClear.isEmpty())
          {
            final DN cursorData = nextChangeForInsertDBCursor.getData();
            final boolean callNextOnCursor =
                cursorData == null || domainsToClear.contains(cursorData);
            while (!domainsToClear.isEmpty())
            {
              final DN baseDNToClear = domainsToClear.first();
              removeCursors(baseDNToClear);
              nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
              // Only release the waiting thread
              // once this domain's state has been cleared.
              domainsToClear.remove(baseDNToClear);
            }
            resetNextChangeForInsertDBCursor();
          }
          else
          {
            final boolean createdCursors = createNewCursors();
            final boolean recycledCursors = recycleExhaustedCursors();
            if (createdCursors || recycledCursors)
            if (callNextOnCursor)
            {
              resetNextChangeForInsertDBCursor();
              // The next change to consume comes from a domain to be removed.
              // Call DBCursor.next() to ensure this domain is removed
              nextChangeForInsertDBCursor.next();
            }
          }
          // Do not call DBCursor.next() here
          // because we might not have consumed the last record,
          // for example if we could not move the MCP forward
          final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
          if (msg == null)
          {
@@ -551,8 +448,13 @@
              }
              wait();
            }
            // loop to check whether new changes have been added to the
            // ReplicaDBs
            // check whether new changes have been added to the ReplicaDBs
            nextChangeForInsertDBCursor.next();
            continue;
          }
          else if (msg instanceof ReplicaOfflineMsg)
          {
            nextChangeForInsertDBCursor.next();
            continue;
          }
@@ -599,37 +501,43 @@
    }
    catch (RuntimeException e)
    {
      // Nothing can be done about it.
      // Rely on the DirectoryThread uncaught exceptions handler
      // for logging error + alert.
      // LocalizableMessage logged here gives corrective information to the administrator.
      logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
          getClass().getSimpleName(), stackTraceToSingleLineString(e));
      logUnexpectedException(e);
      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
      throw e;
    }
    catch (Exception e)
    {
      // Nothing can be done about it.
      // Rely on the DirectoryThread uncaught exceptions handler
      // for logging error + alert.
      // LocalizableMessage logged here gives corrective information to the administrator.
      logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
          getClass().getSimpleName(), stackTraceToSingleLineString(e));
      logUnexpectedException(e);
      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
      throw new RuntimeException(e);
    }
    finally
    {
      removeCursors(DN.NULL_DN);
      nextChangeForInsertDBCursor.close();
      nextChangeForInsertDBCursor = null;
    }
  }
  /**
   * Nothing can be done about it.
   * <p>
   * Rely on the DirectoryThread uncaught exceptions handler for logging error +
   * alert.
   * <p>
   * Message logged here gives corrective information to the administrator.
   */
  private void logUnexpectedException(Exception e)
  {
    logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
        getClass().getSimpleName(), stackTraceToSingleLineString(e));
  }
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN) throws ChangelogException
  {
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    boolean callNextOnCursor = true;
    final int mcServerId = mcCSN.getServerId();
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -642,133 +550,22 @@
      }
      else if (offlineCSN.isOlderThan(mcCSN))
      {
        Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
            pair = getCursor(mcBaseDN, mcCSN.getServerId());
        Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
        if (iter != null && !iter.hasNext())
        {
          /*
           * replica is not back online, Medium consistency point has gone past
           * its last offline time, and there are no more changes after the
           * offline CSN in the cursor: remove everything known about it:
           * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
           * this replica from the medium consistency RUV.
           */
          iter.remove();
          StaticUtils.close(pair.getFirst());
          resetNextChangeForInsertDBCursor();
          callNextOnCursor = false;
          lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
          mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
        }
        /*
         * replica is not back online, Medium consistency point has gone past
         * its last offline time, and there are no more changes after the
         * offline CSN in the cursor: remove everything known about it:
         * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
         * this replica from the medium consistency RUV.
         */
        // TODO JNR how to close cursor for offline replica?
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      }
    }
    if (callNextOnCursor)
    {
      // advance the cursor we just read from,
      // success/failure will be checked later
      nextChangeForInsertDBCursor.next();
    }
  }
  private void removeCursors(DN baseDN)
  {
    if (nextChangeForInsertDBCursor != null)
    {
      nextChangeForInsertDBCursor.close();
      nextChangeForInsertDBCursor = null;
    }
    if (DN.NULL_DN.equals(baseDN))
    {
      // close all cursors
      for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
      {
        StaticUtils.close(map.values());
      }
      allCursors.clear();
      newCursors.clear();
    }
    else
    {
      // close cursors for this DN
      final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
      if (map != null)
      {
        StaticUtils.close(map.values());
      }
      for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();)
      {
        if (it.next().getFirst().equals(baseDN))
        {
          it.remove();
        }
      }
    }
  }
  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
      getCursor(final DN baseDN, final int serverId) throws ChangelogException
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
        : allCursors.entrySet())
    {
      if (baseDN.equals(entry1.getKey()))
      {
        for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
            entry1.getValue().entrySet().iterator(); iter.hasNext();)
        {
          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
          if (serverId == entry2.getKey())
          {
            return Pair.of(entry2.getValue(), iter);
          }
        }
      }
    }
    return Pair.empty();
  }
  private boolean recycleExhaustedCursors() throws ChangelogException
  {
    boolean succesfullyRecycled = false;
    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
    {
      for (DBCursor<UpdateMsg> cursor : map.values())
      {
        // try to recycle it by calling next()
        if (cursor.getRecord() == null && cursor.next())
        {
          succesfullyRecycled = true;
        }
      }
    }
    return succesfullyRecycled;
  }
  private boolean createNewCursors() throws ChangelogException
  {
    if (!newCursors.isEmpty())
    {
      boolean newCursorAdded = false;
      for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
          newCursors.entrySet().iterator(); iter.hasNext();)
      {
        final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
        final DN baseDN = entry.getKey().getFirst();
        final CSN csn = entry.getValue();
        // start after preceding CSN so the first CSN read will exactly be the
        // current one
        final CSN startFromCSN = getPrecedingCSN(csn);
        if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN))
        {
          newCursorAdded = true;
        }
        iter.remove();
      }
      return newCursorAdded;
    }
    return false;
    // advance the cursor we just read from,
    // success/failure will be checked later
    nextChangeForInsertDBCursor.next();
  }
  /**
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -42,7 +42,7 @@
 * @param <Data>
 *          The type of data associated with each cursor
 */
final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
  private static final byte UNINITIALIZED = 0;
@@ -55,8 +55,6 @@
   */
  private byte state = UNINITIALIZED;
  /** Whether this composite should try to recycle exhausted cursors. */
  private final boolean recycleExhaustedCursors;
  /**
   * These cursors are considered exhausted because they had no new changes the
   * last time {@link DBCursor#next()} was called on them. Exhausted cursors
@@ -67,8 +65,13 @@
  /**
   * The cursors are sorted based on the current change of each cursor to
   * consider the next change across all available cursors.
   * <p>
   * New cursors for this Map must be created from the same thread that will
   * make use of them. When this rule is not obeyed, a JE exception will be
   * thrown about
   * "Non-transactional Cursors may not be used in multiple threads;".
   */
  private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
  private final TreeMap<DBCursor<UpdateMsg>, Data> cursors =
      new TreeMap<DBCursor<UpdateMsg>, Data>(
          new Comparator<DBCursor<UpdateMsg>>()
          {
@@ -81,25 +84,6 @@
            }
          });
  /**
   * Builds a CompositeDBCursor using the provided collection of cursors.
   *
   * @param cursors
   *          the cursors that will be iterated upon.
   * @param recycleExhaustedCursors
   *          whether a call to {@link #next()} tries to recycle exhausted
   *          cursors
   */
  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
      boolean recycleExhaustedCursors)
  {
    this.recycleExhaustedCursors = recycleExhaustedCursors;
    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
    {
      put(entry);
    }
  }
  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
@@ -108,51 +92,80 @@
    {
      return false;
    }
    final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
    // If previous state was ready, then we must advance the first cursor
    // (which UpdateMsg has been consumed).
    // To keep consistent the cursors' order in the SortedSet, it is necessary
    // to remove the first cursor, then add it again after moving it forward.
    final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance =
        state != UNINITIALIZED ? cursors.pollFirstEntry() : null;
    state = READY;
    if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
    recycleExhaustedCursors();
    if (cursorToAdvance != null)
    {
      // try to recycle empty cursors in case the underlying ReplicaDBs received
      // new changes.
      addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
    }
    removeNoLongerNeededCursors();
    incorporateNewCursors();
    return !cursors.isEmpty();
  }
  private void recycleExhaustedCursors() throws ChangelogException
  {
    if (!exhaustedCursors.isEmpty())
    {
      // try to recycle exhausted cursors in case the underlying replica DBs received new changes.
      final Map<DBCursor<UpdateMsg>, Data> copy =
          new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
      exhaustedCursors.clear();
      for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
      {
        entry.getKey().next();
        put(entry);
      }
      final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
      if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
      {
        // if the first cursor was previously an exhausted cursor,
        // then we have already called next() on it.
        // Avoid calling it again because we know new changes have been found.
        return true;
        addCursor(entry.getKey(), entry.getValue());
      }
    }
    // To keep consistent the cursors' order in the SortedSet, it is necessary
    // to remove and add again the cursor after moving it forward.
    if (advanceNonExhaustedCursors)
    {
      Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
      if (firstEntry != null)
      {
        final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
        cursor.next();
        put(firstEntry);
      }
    }
    // no cursors are left with changes.
    return !cursors.isEmpty();
  }
  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
  private void removeNoLongerNeededCursors()
  {
    final DBCursor<UpdateMsg> cursor = entry.getKey();
    final Data data = entry.getValue();
    if (cursor.getRecord() != null)
    for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
    {
      final Data dataToFind = iter.next();
      for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
          cursors.entrySet().iterator(); cursorIter.hasNext();)
      {
        final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
        if (dataToFind.equals(entry.getValue()))
        {
          entry.getKey().close();
          cursorIter.remove();
        }
      }
      iter.remove();
    }
  }
  /**
   * Returns an Iterator over the data associated to cursors that must be removed.
   *
   * @return an Iterator over the data associated to cursors that must be removed.
   */
  protected abstract Iterator<Data> removedCursorsIterator();
  /**
   * Adds a cursor to this composite cursor. It first calls
   * {@link DBCursor#next()} to verify whether it is exhausted or not.
   *
   * @param cursor
   *          the cursor to add to this composite
   * @param data
   *          the data associated to the provided cursor
   * @throws ChangelogException
   *           if a database problem occurred
   */
  protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException
  {
    if (cursor.next())
    {
      this.cursors.put(cursor, data);
    }
@@ -166,6 +179,8 @@
  @Override
  public UpdateMsg getRecord()
  {
    // Cannot call incorporateNewCursors() here because
    // somebody might have already called DBCursor.getRecord() and read the record
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
    if (entry != null)
    {
@@ -175,6 +190,16 @@
  }
  /**
   * Called when implementors should incorporate new cursors into the current
   * composite DBCursor. Implementors should call
   * {@link #addCursor(DBCursor, Object)} to do so.
   *
   * @throws ChangelogException
   *           if a database problem occurred
   */
  protected abstract void incorporateNewCursors() throws ChangelogException;
  /**
   * Returns the data associated to the cursor that returned the current record.
   *
   * @return the data associated to the cursor that returned the current record.
@@ -193,8 +218,11 @@
  @Override
  public void close()
  {
    state = CLOSED;
    StaticUtils.close(cursors.keySet());
    StaticUtils.close(exhaustedCursors.keySet());
    cursors.clear();
    exhaustedCursors.clear();
  }
  /** {@inheritDoc} */
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
New file
@@ -0,0 +1,127 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
/**
 * Cursor iterating over a replication domain's replica DBs.
 */
public class DomainDBCursor extends CompositeDBCursor<Void>
{
  private final DN baseDN;
  private final ReplicationDomainDB domainDB;
  private final ConcurrentSkipListMap<Integer, CSN> newReplicas =
      new ConcurrentSkipListMap<Integer, CSN>();
  /**
   * Replaces null CSNs in ConcurrentSkipListMap that does not support null values.
   */
  private static final CSN NULL_CSN = new CSN(0, 0, 0);
  /**
   * Builds a DomainDBCursor instance.
   *
   * @param baseDN
   *          the replication domain baseDN of this cursor
   * @param domainDB
   *          the DB for the provided replication domain
   */
  public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
  {
    this.baseDN = baseDN;
    this.domainDB = domainDB;
  }
  /**
   * Returns the replication domain baseDN of this cursor.
   *
   * @return the replication domain baseDN of this cursor.
   */
  public DN getBaseDN()
  {
    return baseDN;
  }
  /**
   * Adds a replicaDB for this cursor to iterate over. Added cursors will be
   * created and iterated over on the next call to {@link #next()}.
   *
   * @param serverId
   *          the serverId of the replica
   * @param startAfterCSN
   *          the CSN after which to start iterating
   */
  public void addReplicaDB(int serverId, CSN startAfterCSN)
  {
    // only keep the oldest CSN that will be the new cursor's starting point
    newReplicas.putIfAbsent(serverId, startAfterCSN != null ? startAfterCSN : NULL_CSN);
  }
  /** {@inheritDoc} */
  @Override
  protected void incorporateNewCursors() throws ChangelogException
  {
    for (Iterator<Entry<Integer, CSN>> iter = newReplicas.entrySet().iterator(); iter.hasNext();)
    {
      final Entry<Integer, CSN> pair = iter.next();
      final int serverId = pair.getKey();
      final CSN csn = pair.getValue();
      final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null;
      final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
      addCursor(cursor, null);
      iter.remove();
    }
  }
  /** {@inheritDoc} */
  @Override
  @SuppressWarnings("unchecked")
  protected Iterator<Void> removedCursorsIterator()
  {
    return Collections.EMPTY_LIST.iterator(); // nothing to remove
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    super.close();
    domainDB.unregisterCursor(this);
    newReplicas.clear();
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -40,6 +41,7 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
@@ -72,13 +74,20 @@
   * <li>then check it's not null</li>
   * <li>then close all inside</li>
   * </ol>
   * When creating a JEReplicaDB, synchronize on the domainMap to avoid
   * When creating a replicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
      domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  private ReplicationDbEnv dbEnv;
  private ReplicationServerCfg config;
  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  /**
   * \@GuardedBy("itself")
   */
  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
      new HashMap<DN, List<DomainDBCursor>>();
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
      new CopyOnWriteArrayList<MultiDomainDBCursor>();
  private ReplicationDbEnv replicationEnv;
  private final ReplicationServerCfg config;
  private final File dbDirectory;
  /**
@@ -103,9 +112,9 @@
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private AtomicBoolean shutdown = new AtomicBoolean();
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new DBCursor<UpdateMsg>()
  {
@@ -135,7 +144,7 @@
  };
  /**
   * Builds an instance of this class.
   * Creates a new changelog DB.
   *
   * @param replicationServer
   *          the local replication server.
@@ -144,15 +153,15 @@
   * @throws ConfigException
   *           if a problem occurs opening the supplied directory
   */
  public JEChangelogDB(ReplicationServer replicationServer,
      ReplicationServerCfg config) throws ConfigException
  public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
      throws ConfigException
  {
    this.config = config;
    this.replicationServer = replicationServer;
    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
  }
  private File makeDir(String dbDirName) throws ConfigException
  private File makeDir(final String dbDirName) throws ConfigException
  {
    // Check that this path exists or create it.
    final File dbDirectory = getFileForPath(dbDirName);
@@ -168,15 +177,13 @@
    {
      logger.traceException(e);
      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
      mb.append(e.getLocalizedMessage());
      mb.append(" ");
      mb.append(dbDirectory);
      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb), e);
      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
          e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
    }
  }
  private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
  private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN)
  {
    final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
@@ -186,29 +193,12 @@
    return Collections.emptyMap();
  }
  private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
  private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId)
  {
    return getDomainMap(baseDN).get(serverId);
  }
  /**
   * Provision resources for the specified serverId in the specified replication
   * domain.
   *
   * @param baseDN
   *          the replication domain where to add the serverId
   * @param serverId
   *          the server Id to add to the replication domain
   * @throws ChangelogException
   *           If a database error happened.
   */
  private void commission(DN baseDN, int serverId, ReplicationServer rs)
      throws ChangelogException
  {
    getOrCreateReplicaDB(baseDN, serverId, rs);
  }
  /**
   * Returns a {@link JEReplicaDB}, possibly creating it.
   *
   * @param baseDN
@@ -217,35 +207,42 @@
   *          the serverId for which to create a ReplicaDB
   * @param server
   *          the ReplicationServer
   * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
   *         to be created
   * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
   * @throws ChangelogException
   *           if a problem occurred with the database
   */
  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
      int serverId, ReplicationServer server) throws ChangelogException
  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
      final ReplicationServer server) throws ChangelogException
  {
    while (!shutdown.get())
    {
      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
          getExistingOrNewDomainMap(baseDN);
      final Pair<JEReplicaDB, Boolean> result =
          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
      final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
      final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
      if (result != null)
      {
        final Boolean dbWasCreated = result.getSecond();
        if (dbWasCreated)
        { // new replicaDB => update all cursors with it
          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
          if (cursors != null && !cursors.isEmpty())
          {
            for (DomainDBCursor cursor : cursors)
            {
              cursor.addReplicaDB(serverId, null);
            }
          }
        }
        return result;
      }
    }
    throw new ChangelogException(
        ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
  }
  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
      DN baseDN)
  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
  {
    // happy path: the domainMap already exists
    final ConcurrentMap<Integer, JEReplicaDB> currentValue =
        domainToReplicaDBs.get(baseDN);
    final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
    if (currentValue != null)
    {
      return currentValue;
@@ -254,30 +251,36 @@
    // unlucky, the domainMap does not exist: take the hit and create the
    // newValue, even though the same could be done concurrently by another
    // thread
    final ConcurrentMap<Integer, JEReplicaDB> newValue =
        new ConcurrentHashMap<Integer, JEReplicaDB>();
    final ConcurrentMap<Integer, JEReplicaDB> previousValue =
        domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>();
    final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    if (previousValue != null)
    {
      // there was already a value associated to the key, let's use it
      return previousValue;
    }
    if (MultimasterReplication.isECLEnabledDomain(baseDN))
    {
      // we just created a new domain => update all cursors
      for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
      {
        cursor.addDomain(baseDN, null);
      }
    }
    return newValue;
  }
  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
      DN baseDN, ReplicationServer server) throws ChangelogException
  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
  {
    // happy path: the JEReplicaDB already exists
    // happy path: the replicaDB already exists
    JEReplicaDB currentValue = domainMap.get(serverId);
    if (currentValue != null)
    {
      return Pair.of(currentValue, false);
    }
    // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
    // unlucky, the replicaDB does not exist: take the hit and synchronize
    // on the domainMap to create a new ReplicaDB
    synchronized (domainMap)
    {
@@ -293,11 +296,11 @@
        // The domainMap could have been concurrently removed because
        // 1) a shutdown was initiated or 2) an initialize was called.
        // Return will allow the code to:
        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
        // 1) shutdown properly or 2) lazily recreate the replicaDB
        return null;
      }
      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
      domainMap.put(serverId, newDB);
      return Pair.of(newDB, true);
    }
@@ -310,8 +313,8 @@
    try
    {
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = dbEnv.getChangelogState();
      replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = replicationEnv.getChangelogState();
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
@@ -338,12 +341,12 @@
    {
      for (int serverId : entry.getValue())
      {
        commission(entry.getKey(), serverId, replicationServer);
        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
      }
    }
  }
  private void shutdownCNIndexDB() throws ChangelogException
  private void shutdownChangeNumberIndexDB() throws ChangelogException
  {
    synchronized (cnIndexDBLock)
    {
@@ -381,7 +384,7 @@
    try
    {
      shutdownCNIndexDB();
      shutdownChangeNumberIndexDB();
    }
    catch (ChangelogException e)
    {
@@ -402,7 +405,7 @@
      }
    }
    if (dbEnv != null)
    if (replicationEnv != null)
    {
      // wait for shutdown of the threads holding cursors
      try
@@ -421,7 +424,7 @@
        // do nothing: we are already shutting down
      }
      dbEnv.shutdown();
      replicationEnv.shutdown();
    }
    if (firstException != null)
@@ -431,11 +434,10 @@
  }
  /**
   * Clears all content from the changelog database, but leaves its directory on
   * the filesystem.
   * Clears all records from the changelog (does not remove the changelog itself).
   *
   * @throws ChangelogException
   *           If a database problem happened
   *           If an error occurs when clearing the changelog.
   */
  public void clearDB() throws ChangelogException
  {
@@ -469,7 +471,7 @@
        try
        {
          shutdownCNIndexDB();
          shutdownChangeNumberIndexDB();
        }
        catch (ChangelogException e)
        {
@@ -584,7 +586,7 @@
    // 3- clear the changelogstate DB
    try
    {
      dbEnv.clearGenerationId(baseDN);
      replicationEnv.clearGenerationId(baseDN);
    }
    catch (ChangelogException e)
    {
@@ -635,7 +637,7 @@
  {
    if (computeChangeNumber)
    {
      startIndexer(dbEnv.getChangelogState());
      startIndexer(replicationEnv.getChangelogState());
    }
    else
    {
@@ -673,7 +675,7 @@
      {
        try
        {
          cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
          cnIndexDB = new JEChangeNumberIndexDB(replicationEnv);
        }
        catch (Exception e)
        {
@@ -694,40 +696,57 @@
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
      throws ChangelogException
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    for (int serverId : serverIds)
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      replicaDBCursor.next();
      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
    }
    // recycle exhausted cursors,
    // because client code will not manage the cursors itself
    return new CompositeDBCursor<Void>(cursors, true);
    return cursor;
  }
  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
      ServerState startAfterServerState)
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
      throws ChangelogException
  {
    final ServerState domainState = offlineReplicas.getServerState(baseDN);
    if (domainState != null)
    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      for (CSN offlineCSN : domainState)
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
      cursor.addReplicaDB(serverId, lastCSN);
    }
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
        if (serverId == offlineCSN.getServerId()
            && !startAfterServerState.cover(offlineCSN))
        {
          return offlineCSN;
        }
        cursors = new ArrayList<DomainDBCursor>();
        registeredDomainCursors.put(baseDN, cursors);
      }
      cursors.add(cursor);
      return cursor;
    }
  }
  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
  {
    final MultiDomainServerState offlineReplicas =
        replicationEnv.getChangelogState().getOfflineReplicas();
    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
    if (offlineCSN != null
        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
    {
      return offlineCSN;
    }
    return null;
  }
@@ -737,31 +756,57 @@
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
      throws ChangelogException
  {
    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      return replicaDB.generateCursorFrom(startAfterCSN);
      final DBCursor<UpdateMsg> cursor =
          replicaDB.generateCursorFrom(startAfterCSN);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
      // TODO JNR if (offlineCSN != null) ??
      // What about replicas that suddenly become offline?
      return new ReplicaOfflineCursor(cursor, offlineCSN);
    }
    return EMPTY_CURSOR;
    return EMPTY_CURSOR_REPLICA_DB;
  }
  /** {@inheritDoc} */
  @Override
  public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
      throws ChangelogException
  public void unregisterCursor(final DBCursor<?> cursor)
  {
    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
        updateMsg.getCSN().getServerId(), replicationServer);
    final JEReplicaDB replicaDB = pair.getFirst();
    final boolean wasCreated = pair.getSecond();
    if (cursor instanceof MultiDomainDBCursor)
    {
      registeredMultiDomainCursors.remove(cursor);
    }
    else if (cursor instanceof DomainDBCursor)
    {
      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
      synchronized (registeredMultiDomainCursors)
      {
        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
        if (cursors != null)
        {
          cursors.remove(cursor);
        }
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
  {
    final CSN csn = updateMsg.getCSN();
    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
        csn.getServerId(), replicationServer);
    final JEReplicaDB replicaDB = pair.getFirst();
    replicaDB.add(updateMsg);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return wasCreated;
    return pair.getSecond(); // replica DB was created
  }
  /** {@inheritDoc} */
@@ -779,7 +824,7 @@
  @Override
  public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
  {
    dbEnv.addOfflineReplica(baseDN, offlineCSN);
    replicationEnv.addOfflineReplica(baseDN, offlineCSN);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
New file
@@ -0,0 +1,123 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
/**
 * Cursor iterating over a all the replication domain known to the changelog DB.
 */
public class MultiDomainDBCursor extends CompositeDBCursor<DN>
{
  private final ReplicationDomainDB domainDB;
  private final ConcurrentSkipListMap<DN, ServerState> newDomains =
      new ConcurrentSkipListMap<DN, ServerState>();
  private final ConcurrentSkipListSet<DN> removeDomains =
      new ConcurrentSkipListSet<DN>();
  /**
   * Builds a MultiDomainDBCursor instance.
   *
   * @param domainDB
   *          the replication domain management DB
   */
  public MultiDomainDBCursor(ReplicationDomainDB domainDB)
  {
    this.domainDB = domainDB;
  }
  /**
   * Adds a replication domain for this cursor to iterate over. Added cursors
   * will be created and iterated over on the next call to {@link #next()}.
   *
   * @param baseDN
   *          the replication domain's baseDN
   * @param startAfterState
   *          the {@link ServerState} after which to start iterating
   */
  public void addDomain(DN baseDN, ServerState startAfterState)
  {
    newDomains.put(baseDN,
        startAfterState != null ? startAfterState : new ServerState());
  }
  /** {@inheritDoc} */
  @Override
  protected void incorporateNewCursors() throws ChangelogException
  {
    for (Iterator<Entry<DN, ServerState>> iter = newDomains.entrySet().iterator();
         iter.hasNext();)
    {
      final Entry<DN, ServerState> entry = iter.next();
      final DN baseDN = entry.getKey();
      final ServerState serverState = entry.getValue();
      final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState);
      addCursor(domainDBCursor, baseDN);
      iter.remove();
    }
  }
  /**
   * Removes a replication domain from this cursor and stops iterating over it.
   * Removed cursors will be effectively removed on the next call to
   * {@link #next()}.
   *
   * @param baseDN
   *          the replication domain's baseDN
   */
  public void removeDomain(DN baseDN)
  {
    removeDomains.add(baseDN);
  }
  /** {@inheritDoc} */
  @Override
  protected Iterator<DN> removedCursorsIterator()
  {
    return removeDomains.iterator();
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    super.close();
    domainDB.unregisterCursor(this);
    newDomains.clear();
    removeDomains.clear();
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -95,6 +95,16 @@
public class ExternalChangeLogTest extends ReplicationTestCase
{
  private static class Results
  {
    public final List<SearchResultEntryProtocolOp> searchResultEntries =
        new ArrayList<SearchResultEntryProtocolOp>();
    public long searchReferences;
    public long searchesDone;
  }
  private static final int SERVER_ID_1 = 1201;
  private static final int SERVER_ID_2 = 1202;
@@ -188,14 +198,15 @@
  @Test(enabled=true, dependsOnMethods = { "PrimaryTest"})
  public void TestWithAndWithoutControl() throws Exception
  {
    final String tn = "TestWithAndWithoutControl";
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Write changes and read ECL from start
    ECLCompatWriteReadAllOps(1);
    ECLCompatWriteReadAllOps(1, tn);
    ECLCompatNoControl(1);
    // Write additional changes and read ECL from a provided change number
    ECLCompatWriteReadAllOps(5);
    ECLCompatWriteReadAllOps(5, tn);
  }
  @Test(enabled=false, dependsOnMethods = { "PrimaryTest"})
@@ -293,12 +304,13 @@
  @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
  public void ECLReplicationServerFullTest15() throws Exception
  {
    final String tn = "ECLReplicationServerFullTest15";
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // Write 4 changes and read ECL from start
    ECLCompatWriteReadAllOps(1);
    ECLCompatWriteReadAllOps(1, tn);
    // Write 4 additional changes and read ECL from a provided change number
    CSN csn = ECLCompatWriteReadAllOps(5);
    CSN csn = ECLCompatWriteReadAllOps(5, tn);
    // Test request from a provided change number - read 6
    ECLCompatReadFrom(6, csn);
@@ -895,15 +907,12 @@
      final CSN[] csns = generateCSNs(3, SERVER_ID_1);
      publishDeleteMsgInOTest(server01, csns[0], testName, 1);
      Thread.sleep(1000);
      // Test that last cookie has been updated
      String cookieNotEmpty = readLastCookie();
      debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
      final String firstCookie = assertLastCookieDifferentThanLastValue("");
      String lastCookie = firstCookie;
      publishDeleteMsgInOTest(server01, csns[1], testName, 2);
      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
      publishDeleteMsgInOTest(server01, csns[2], testName, 3);
      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
      // ---
      // 2. Now set up a very short purge delay on the replication changelogs
@@ -930,7 +939,7 @@
      //    returns the appropriate error.
      debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
      debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
      searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
      searchOp = searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
      assertTrue(searchOp.getErrorMessage().toString().startsWith(
          ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(TEST_ROOT_DN_STRING).toString()),
          searchOp.getErrorMessage().toString());
@@ -962,26 +971,21 @@
      final CSN[] csns = generateCSNs(3, SERVER_ID_1);
      publishDeleteMsgInOTest(server01, csns[0], testName, 1);
      Thread.sleep(1000);
      // Test that last cookie has been updated
      String cookieNotEmpty = readLastCookie();
      debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
      final String firstCookie = assertLastCookieDifferentThanLastValue("");
      String lastCookie = firstCookie;
      publishDeleteMsgInOTest(server01, csns[1], testName, 2);
      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
      publishDeleteMsgInOTest(server01, csns[2], testName, 3);
      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
      // ---
      // 2. Now remove the domain by sending a reset message
      ResetGenerationIdMsg msg = new ResetGenerationIdMsg(23657);
      server01.publish(msg);
      server01.publish(new ResetGenerationIdMsg(23657));
      // ---
      // 3. Assert that a request with an empty cookie returns nothing
      // since replication changelog has been cleared
      String cookie= "";
      InternalSearchOperation searchOp = null;
      searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
      // ---
@@ -989,7 +993,7 @@
      // since replication changelog has been cleared
      cookie = readLastCookie();
      debugInfo(testName, "2. Search with last cookie=" + cookie + "\"");
      searchOp = searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
      searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
      // ---
      // 5. Assert that a request with an "old" cookie - one that refers to
@@ -997,7 +1001,8 @@
      //    returns the appropriate error.
      debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
      debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
      searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
      final InternalSearchOperation searchOp =
          searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
      assertThat(searchOp.getErrorMessage().toString()).contains("unknown replicated domain", TEST_ROOT_DN_STRING.toString());
    }
    finally
@@ -1007,6 +1012,23 @@
    debugInfo(testName, "Ending test successfully");
  }
  private String assertLastCookieDifferentThanLastValue(final String lastCookie) throws Exception
  {
    int cnt = 0;
    while (cnt < 100)
    {
      final String newCookie = readLastCookie();
      if (!newCookie.equals(lastCookie))
      {
        return newCookie;
      }
      cnt++;
      Thread.sleep(10);
    }
    Assertions.fail("Expected last cookie would have been updated, but it always stayed at value '" + lastCookie + "'");
    return null;// dead code
  }
  private void debugAndWriteEntries(LDIFWriter ldifWriter,
      List<SearchResultEntry> entries, String tn) throws Exception
  {
@@ -1074,10 +1096,11 @@
      // Publish ADD
      csnCounter++;
      String lentry = "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n" + "objectClass: domain\n"
          + "entryUUID: "+user1entryUUID+"\n";
      Entry entry = TestCaseUtils.entryFromLdifString(lentry);
      Entry entry = TestCaseUtils.entryFromLdifString(
          "dn: uid=" + tn + "2," + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n"
          + "objectClass: domain\n"
          + "entryUUID: " + user1entryUUID + "\n");
      AddMsg addMsg = new AddMsg(
          csns[csnCounter],
          DN.valueOf("uid="+tn+"2," + TEST_ROOT_DN_STRING),
@@ -1412,49 +1435,27 @@
      InvocationCounterPlugin.resetAllCounters();
      long searchEntries;
      long searchReferences = ldapStatistics.getSearchResultReferences();
      long searchesDone     = ldapStatistics.getSearchResultsDone();
      final Results results = new Results();
      results.searchReferences = ldapStatistics.getSearchResultReferences();
      results.searchesDone     = ldapStatistics.getSearchResultsDone();
      debugInfo(tn, "Search Persistent filter=(targetDN=*"+tn+"*,o=test)");
      LDAPMessage message = new LDAPMessage(2, searchRequest, controls);
      w.writeMessage(message);
      w.writeMessage(new LDAPMessage(2, searchRequest, controls));
      Thread.sleep(500);
      if (!changesOnly)
      {
        // Wait for change 1
        debugInfo(tn, "Waiting for init search expected to return change 1");
        searchEntries = 0;
        readMessages(tn, r, results, 1, "Init search Result=");
        for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
        {
          while (searchEntries < 1 && (message = r.readMessage()) != null)
          {
            debugInfo(tn, "Init search Result=" +
                message.getProtocolOpType() + message + " " + searchEntries);
            switch (message.getProtocolOpType())
            {
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
              SearchResultEntryProtocolOp searchResultEntry =
                  message.getSearchResultEntryProtocolOp();
              searchEntries++;
              // FIXME:ECL Double check 1 is really the valid value here.
              checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
                  (compatMode?"1":"0"));
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
              searchReferences++;
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
              assertSuccessful(message);
              searchesDone++;
              break;
            }
          }
          // FIXME:ECL Double check 1 is really the valid value here.
          final String cn = compatMode ? "1" : "0";
          checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
        }
        debugInfo(tn, "INIT search done with success. searchEntries="
            + searchEntries + " #searchesDone="+ searchesDone);
            + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
      }
      // Produces change 2
@@ -1470,30 +1471,8 @@
      " published , psearch will now wait for new entries");
      // wait for the 1 new entry
      searchEntries = 0;
      SearchResultEntryProtocolOp searchResultEntry = null;
      while (searchEntries < 1 && (message = r.readMessage()) != null)
      {
        debugInfo(tn, "psearch search  Result=" +
            message.getProtocolOpType() + message);
        switch (message.getProtocolOpType())
        {
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
          searchResultEntry = message.getSearchResultEntryProtocolOp();
          searchEntries++;
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
          searchReferences++;
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
          assertSuccessful(message);
//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
          searchesDone++;
          break;
        }
      }
      readMessages(tn, r, results, 1, "psearch search  Result=");
      SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(0);
      Thread.sleep(1000);
      // Check we received change 2
@@ -1523,11 +1502,12 @@
            createSearchRequest("(targetDN=*directpsearch*,o=test)", null);
        debugInfo(tn, "ACI test : sending search");
        message = new LDAPMessage(2, searchRequest, createCookieControl(""));
        w.writeMessage(message);
        w.writeMessage(new LDAPMessage(2, searchRequest, createCookieControl("")));
        searchesDone=0;
        searchEntries = 0;
        LDAPMessage message;
        int searchesDone = 0;
        int searchEntries = 0;
        int searchReferences = 0;
        while ((searchesDone==0) && (message = r.readMessage()) != null)
        {
          debugInfo(tn, "ACI test : message returned " +
@@ -1719,125 +1699,53 @@
      InvocationCounterPlugin.resetAllCounters();
      ldapStatistics.getSearchRequests();
      long searchEntries    = ldapStatistics.getSearchResultEntries();
      ldapStatistics.getSearchResultReferences();
      long searchesDone     = ldapStatistics.getSearchResultsDone();
      final Results results = new Results();
      results.searchesDone = ldapStatistics.getSearchResultsDone();
      LDAPMessage message;
      message = new LDAPMessage(2, searchRequest1, controls);
      w1.writeMessage(message);
      w1.writeMessage(new LDAPMessage(2, searchRequest1, controls));
      Thread.sleep(500);
      message = new LDAPMessage(2, searchRequest2, controls);
      w2.writeMessage(message);
      w2.writeMessage(new LDAPMessage(2, searchRequest2, controls));
      Thread.sleep(500);
      message = new LDAPMessage(2, searchRequest3, controls);
      w3.writeMessage(message);
      w3.writeMessage(new LDAPMessage(2, searchRequest3, controls));
      Thread.sleep(500);
      if (!changesOnly)
      {
        debugInfo(tn, "Search1  Persistent filter=" + searchRequest1.getFilter()
                  + " expected to return change " + csn1);
        searchEntries = 0;
        message = null;
        {
          while (searchEntries < 1 && (message = r1.readMessage()) != null)
          readMessages(tn, r1, results, 1, "Search1 Result=");
          final int searchEntries = results.searchResultEntries.size();
          if (searchEntries == 1)
          {
            debugInfo(tn, "Search1 Result=" +
                message.getProtocolOpType() + " " + message);
            switch (message.getProtocolOpType())
            {
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
              SearchResultEntryProtocolOp searchResultEntry =
                  message.getSearchResultEntryProtocolOp();
              searchEntries++;
              if (searchEntries==1)
              {
                checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
                checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
                    (compatMode?"10":"0"));
              }
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
              assertSuccessful(message);
              searchesDone++;
              break;
            }
            final SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(1);
            final String cn = compatMode ? "10" : "0";
            checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
            checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
          }
          debugInfo(tn, "Search1 done with success. searchEntries="
              + searchEntries + " #searchesDone=" + results.searchesDone);
        }
        debugInfo(tn, "Search1 done with success. searchEntries="
            + searchEntries + " #searchesDone="+ searchesDone);
        searchEntries = 0;
        message = null;
        {
          debugInfo(tn, "Search 2  Persistent filter=" + searchRequest2.getFilter()
              + " expected to return change " + csn2 + " & " + csn3);
          while (searchEntries < 2 && (message = r2.readMessage()) != null)
          readMessages(tn, r2, results, 2, "Search 2 Result=");
          for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
          {
            debugInfo(tn, "Search 2 Result=" +
                message.getProtocolOpType() + message);
            switch (message.getProtocolOpType())
            {
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
              SearchResultEntryProtocolOp searchResultEntry =
                  message.getSearchResultEntryProtocolOp();
              searchEntries++;
              checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
                  (compatMode?"10":"0"));
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
              assertSuccessful(message);
              searchesDone++;
              break;
            }
            final String cn = compatMode ? "10" : "0";
            checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
          }
          debugInfo(tn, "Search2 done with success. searchEntries="
              + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
        }
        debugInfo(tn, "Search2 done with success. searchEntries="
            + searchEntries + " #searchesDone="+ searchesDone);
        searchEntries = 0;
        message = null;
        {
          debugInfo(tn, "Search3  Persistent filter=" + searchRequest3.getFilter()
              + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
          while (searchEntries < 4 && (message = r3.readMessage()) != null)
          {
            debugInfo(tn, "Search3 Result=" +
                message.getProtocolOpType() + " " + message);
            switch (message.getProtocolOpType())
            {
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
              searchEntries++;
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
              break;
            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
              assertSuccessful(message);
              searchesDone++;
              break;
            }
          }
        }
        debugInfo(tn, "Search3  Persistent filter=" + searchRequest3.getFilter()
            + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
        readMessages(tn, r3, results, 4, "Search3 Result=");
        debugInfo(tn, "Search3 done with success. searchEntries="
            + searchEntries + " #searchesDone="+ searchesDone);
            + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
      }
      // Produces additional change
@@ -1871,82 +1779,19 @@
      debugInfo(tn, delMsg13.getCSN()  + " published additionally ");
      // wait 11
      searchEntries = 0;
      message = null;
      while (searchEntries < 1 && (message = r1.readMessage()) != null)
      {
        debugInfo(tn, "Search 11 Result=" +
            message.getProtocolOpType() + " " + message);
        switch (message.getProtocolOpType())
        {
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
          searchEntries++;
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
          assertSuccessful(message);
//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
          searchesDone++;
          break;
        }
      }
      readMessages(tn, r1, results, 1, "Search 11 Result=");
      Thread.sleep(1000);
      debugInfo(tn, "Search 1 successfully receives additional changes");
      // wait 12 & 13
      searchEntries = 0;
      message = null;
      while (searchEntries < 2 && (message = r2.readMessage()) != null)
      {
        debugInfo(tn, "psearch search 12 Result=" +
            message.getProtocolOpType() + " " + message);
        switch (message.getProtocolOpType())
        {
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
          searchEntries++;
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
          assertSuccessful(message);
//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
          searchesDone++;
          break;
        }
      }
      readMessages(tn, r2, results, 2, "psearch search 12 Result=");
      Thread.sleep(1000);
      debugInfo(tn, "Search 2 successfully receives additional changes");
      // wait 11 & 12 & 13
      searchEntries = 0;
      SearchResultEntryProtocolOp searchResultEntry = null;
      message = null;
      while (searchEntries < 3 && (message = r3.readMessage()) != null)
      {
        debugInfo(tn, "psearch search 13 Result=" +
            message.getProtocolOpType() + " " + message);
        switch (message.getProtocolOpType())
        {
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
          searchResultEntry = message.getSearchResultEntryProtocolOp();
          searchEntries++;
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
          break;
        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
          assertSuccessful(message);
//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
          searchesDone++;
          break;
        }
      }
      readMessages(tn, r3, results, 3, "psearch search 13 Result=");
      SearchResultEntryProtocolOp searchResultEntry =
          results.searchResultEntries.get(results.searchResultEntries.size() - 1);
      Thread.sleep(1000);
      // Check we received change 13
@@ -1961,6 +1806,35 @@
    debugInfo(tn, "Ends test successfully");
  }
  private void readMessages(String tn, org.opends.server.tools.LDAPReader r,
      final Results results, final int i, final String string) throws Exception
  {
    results.searchResultEntries.clear();
    LDAPMessage message;
    while (results.searchResultEntries.size() < i
        && (message = r.readMessage()) != null)
    {
      debugInfo(tn, string + message.getProtocolOpType() + " " + message);
      switch (message.getProtocolOpType())
      {
      case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
        results.searchResultEntries.add(message.getSearchResultEntryProtocolOp());
        break;
      case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
        results.searchReferences++;
        break;
      case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
        assertSuccessful(message);
        results.searchesDone++;
        break;
      }
    }
  }
  private void assertSuccessful(LDAPMessage message)
  {
    SearchResultDoneProtocolOp doneOp = message.getSearchResultDoneProtocolOp();
@@ -2007,10 +1881,9 @@
      new BindRequestProtocolOp(
          ByteString.valueOf(bindDN),
          3, ByteString.valueOf(password));
    LDAPMessage message = new LDAPMessage(1, bindRequest);
    w.writeMessage(message);
    w.writeMessage(new LDAPMessage(1, bindRequest));
    message = r.readMessage();
    final LDAPMessage message = r.readMessage();
    BindResponseProtocolOp bindResponse = message.getBindResponseProtocolOp();
//  assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
    assertEquals(bindResponse.getResultCode(), expected);
@@ -2204,9 +2077,9 @@
    debugInfo(tn, "Ending test successfully");
  }
  private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
  private CSN ECLCompatWriteReadAllOps(long firstChangeNumber, String testName) throws Exception
  {
    String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber;
    String tn = testName + "-ECLCompatWriteReadAllOps/" + firstChangeNumber;
    debugInfo(tn, "Starting test\n\n");
    LDAPReplicationDomain domain = null;
    try
@@ -2224,17 +2097,16 @@
      CSN[] csns = generateCSNs(4, SERVER_ID_1);
      // Publish DEL
      DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
      DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "-1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getCSN());
      // Publish ADD
      String lentry =
          "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
      Entry entry = TestCaseUtils.entryFromLdifString(
          "dn: uid=" + tn + "-2," + TEST_ROOT_DN_STRING + "\n"
          + "objectClass: top\n"
          + "objectClass: domain\n"
          + "entryUUID: "+user1entryUUID+"\n";
      Entry entry = TestCaseUtils.entryFromLdifString(lentry);
          + "entryUUID: " + user1entryUUID + "\n");
      AddMsg addMsg = new AddMsg(
          csns[1],
          entry.getName(),
@@ -2247,7 +2119,7 @@
      debugInfo(tn, " publishes " + addMsg.getCSN());
      // Publish MOD
      DN baseDN = DN.valueOf("uid="+tn+"3," + TEST_ROOT_DN_STRING);
      DN baseDN = DN.valueOf("uid="+tn+"-3," + TEST_ROOT_DN_STRING);
      List<Modification> mods = createMods("description", "new value");
      ModifyMsg modMsg = new ModifyMsg(csns[2], baseDN, mods, user1entryUUID);
      server01.publish(modMsg);
@@ -2255,7 +2127,7 @@
      // Publish modDN
      ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null,
          DN.valueOf("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN
          DN.valueOf("uid="+tn+"-4," + TEST_ROOT_DN_STRING), // entryDN
          RDN.decode("uid="+tn+"new4"), // new rdn
          true,  // deleteoldrdn
          TEST_ROOT_DN2); // new superior
@@ -2265,8 +2137,8 @@
      server01.publish(modDNMsg);
      debugInfo(tn, " publishes " + modDNMsg.getCSN());
      String filter = "(targetdn=*" + tn + "*,o=test)";
      InternalSearchOperation searchOp = searchOnChangelog(filter, 4, tn, SUCCESS);
      InternalSearchOperation searchOp =
          searchOnChangelog("(targetdn=*" + tn + "*,o=test)", 4, tn, SUCCESS);
      // test 4 entries returned
      final LDIFWriter ldifWriter = getLDIFWriter();
@@ -2276,7 +2148,7 @@
      stop(server01);
      // Test with filter on change number
      filter =
      String filter =
          "(&(targetdn=*" + tn + "*,o=test)"
            + "(&(changenumber>=" + firstChangeNumber + ")"
              + "(changenumber<=" + (firstChangeNumber + 3) + ")))";
@@ -2339,7 +2211,7 @@
      long firstChangeNumber, int i, String tn, CSN csn)
  {
    final long changeNumber = firstChangeNumber + i;
    final String targetDN = "uid=" + tn + (i + 1) + "," + TEST_ROOT_DN_STRING;
    final String targetDN = "uid=" + tn + "-" + (i + 1) + "," + TEST_ROOT_DN_STRING;
    assertDNEquals(resultEntry, changeNumber);
    checkValue(resultEntry, "changenumber", String.valueOf(changeNumber));
@@ -2352,9 +2224,11 @@
  private void assertDNEquals(SearchResultEntry resultEntry, long changeNumber)
  {
    String actualDN = resultEntry.getName().toNormalizedString();
    String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
    assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
    final String actualDN = resultEntry.getName().toNormalizedString();
    final String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
    assertThat(actualDN)
        .as("Unexpected DN for entry " + resultEntry)
        .isEqualToIgnoringCase(expectedDN);
  }
  private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception
@@ -2548,7 +2422,7 @@
    while (!cnIndexDB.isEmpty())
    {
      debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
      Thread.sleep(200);
      Thread.sleep(10);
    }
    debugInfo(tn, "Ending test with success");
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -128,7 +128,11 @@
  private ChangeNumberIndexDB cnIndexDB;
  @Mock
  private ReplicationDomainDB domainDB;
  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors;
  private List<DN> eclEnabledDomains;
  private MultiDomainDBCursor multiDomainCursor;
  private Map<Pair<DN, Integer>, SequentialDBCursor> replicaDBCursors;
  private Map<DN, DomainDBCursor> domainDBCursors;
  private ChangelogState initialState;
  private Map<DN, ServerState> domainNewestCSNs;
  private ChangeNumberIndexer cnIndexer;
@@ -153,13 +157,18 @@
  public void setup() throws Exception
  {
    MockitoAnnotations.initMocks(this);
    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
    multiDomainCursor = new MultiDomainDBCursor(domainDB);
    initialState = new ChangelogState();
    initialCookie = new MultiDomainServerState();
    cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
    replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
    domainDBCursors = new HashMap<DN, DomainDBCursor>();
    domainNewestCSNs = new HashMap<DN, ServerState>();
    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
    when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn(
        multiDomainCursor);
  }
  @AfterMethod
@@ -173,15 +182,17 @@
  @Test
  public void emptyDBNoDS() throws Exception
  {
    startCNIndexer(BASE_DN1);
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneDS() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -192,10 +203,11 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBOneDS() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    addReplica(BASE_DN1, serverId1);
    setCNIndexDBInitialRecords(msg1);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
@@ -206,9 +218,10 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSs() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    // simulate messages received out of order
@@ -224,9 +237,10 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsDifferentDomains() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN2, serverId2);
    startCNIndexer(BASE_DN1, BASE_DN2);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -259,8 +273,9 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -287,12 +302,13 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBTwoDSs() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    setCNIndexDBInitialRecords(msg1, msg2);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
@@ -312,9 +328,10 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
@@ -329,10 +346,11 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(ADMIN_DATA_DN, serverId1);
    addReplica(BASE_DN1, serverId2);
    addReplica(BASE_DN1, serverId3);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    // cn=admin data will does not participate in the external changelog
@@ -350,8 +368,9 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -371,8 +390,9 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -390,9 +410,10 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -407,9 +428,10 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneGoingOffline() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -440,10 +462,11 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -473,12 +496,13 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1));
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    // blocked until we receive info for serverId2
    assertExternalChangelogContent();
@@ -517,13 +541,14 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
    publishUpdateMsg(msg2, msg3);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    // MCP moves forward because serverId1 is not really offline
@@ -540,9 +565,10 @@
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneKilled() throws Exception
  {
    eclEnabledDomains = Arrays.asList(BASE_DN1);
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
    startCNIndexer(BASE_DN1);
    startCNIndexer();
    assertExternalChangelogContent();
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -562,10 +588,26 @@
  private void addReplica(DN baseDN, int serverId) throws Exception
  {
    final SequentialDBCursor cursor = new SequentialDBCursor();
    cursors.put(Pair.of(baseDN, serverId), cursor);
    when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
        .thenReturn(cursor);
    final SequentialDBCursor replicaDBCursor = new SequentialDBCursor();
    replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor);
    if (isECLEnabledDomain2(baseDN))
    {
      DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
      if (domainDBCursor == null)
      {
        domainDBCursor = new DomainDBCursor(baseDN, domainDB);
        domainDBCursors.put(baseDN, domainDBCursor);
        multiDomainCursor.addDomain(baseDN, null);
        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class)))
            .thenReturn(domainDBCursor);
      }
      domainDBCursor.addReplicaDB(serverId, null);
      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
          .thenReturn(replicaDBCursor);
    }
    when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
        getDomainNewestCSNs(baseDN));
    initialState.addServerIdToDomain(serverId, baseDN);
@@ -582,21 +624,26 @@
    return serverState;
  }
  private void startCNIndexer(DN... eclEnabledDomains)
  private void startCNIndexer()
  {
    final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
    {
      @Override
      protected boolean isECLEnabledDomain(DN baseDN)
      {
        return eclEnabledDomainList.contains(baseDN);
        return isECLEnabledDomain2(baseDN);
      }
    };
    cnIndexer.start();
    waitForWaitingState(cnIndexer);
  }
  private boolean isECLEnabledDomain2(DN baseDN)
  {
    return eclEnabledDomains.contains(baseDN);
  }
  private void stopCNIndexer() throws Exception
  {
    if (cnIndexer != null)
@@ -631,7 +678,8 @@
        final CSN csn = newestMsg.getCSN();
        when(cnIndexDB.getNewestRecord()).thenReturn(
            new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
        final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId()));
        final SequentialDBCursor cursor =
            replicaDBCursors.get(Pair.of(baseDN, csn.getServerId()));
        cursor.add(newestMsg);
      }
      initialCookie.update(msg.getBaseDN(), msg.getCSN());
@@ -643,7 +691,7 @@
    for (ReplicatedUpdateMsg msg : msgs)
    {
      final SequentialDBCursor cursor =
          cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
          replicaDBCursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
      if (msg.isEmptyCursor())
      {
        cursor.add(null);
@@ -746,11 +794,4 @@
    };
  }
  @Test(dataProvider = "precedingCSNDataProvider")
  public void getPrecedingCSN(CSN start, CSN expected)
  {
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState);
    CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
    assertThat(precedingCSN).isEqualTo(expected);
  }
}
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,8 +25,8 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.HashMap;
import java.util.Map;
import java.util.Collections;
import java.util.Iterator;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -45,6 +45,20 @@
public class CompositeDBCursorTest extends DirectoryServerTestCase
{
  private final class ConcreteCompositeDBCursor extends CompositeDBCursor<String>
  {
    @Override
    protected void incorporateNewCursors() throws ChangelogException
    {
    }
    @Override
    protected Iterator<String> removedCursorsIterator()
    {
      return Collections.EMPTY_LIST.iterator();
    }
  }
  private UpdateMsg msg1;
  private UpdateMsg msg2;
  private UpdateMsg msg3;
@@ -173,8 +187,6 @@
        of(msg4, baseDN1));
  }
  // TODO : this test fails because msg2 is returned twice
  @Test(enabled=false)
  public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
@@ -220,16 +232,12 @@
  private CompositeDBCursor<String> newCompositeDBCursor(
      Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
  {
    final Map<DBCursor<UpdateMsg>, String> cursorsMap =
        new HashMap<DBCursor<UpdateMsg>, String>();
    final CompositeDBCursor<String> cursor = new ConcreteCompositeDBCursor();
    for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
    {
      // The cursors in the composite are expected to be pointing
      // to first record available
      pair.getFirst().next();
      cursorsMap.put(pair.getFirst(), pair.getSecond());
      cursor.addCursor(pair.getFirst(), pair.getSecond());
    }
    return new CompositeDBCursor<String>(cursorsMap, true);
    return cursor;
  }
  private void assertInOrder(final CompositeDBCursor<String> compCursor,