From e91b3e8091cb7c814de5b5af9d1a47fbfa2d4ca0 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 18 Aug 2014 15:26:23 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java                             |  123 ++++
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |  115 ++-
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java                                  |  127 ++++
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java                |  418 +++++----------
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                   |  265 +++++----
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java   |   30 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java                            |   37 +
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |  365 +++----------
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java                               |  140 +++--
 9 files changed, 847 insertions(+), 773 deletions(-)

diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index ad2ceb1..5728e6a 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -26,8 +26,10 @@
 package org.opends.server.replication.server.changelog.api;
 
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
 import org.opends.server.types.DN;
 
 /**
@@ -89,6 +91,26 @@
    */
   void removeDomain(DN baseDN) throws ChangelogException;
 
+  /**
+   * Generates a {@link DBCursor} across all the domains starting after the
+   * provided {@link MultiDomainServerState} for each domain.
+   * <p>
+   * When the cursor is not used anymore, client code MUST call the
+   * {@link DBCursor#close()} method to free the resources and locks used by the
+   * cursor.
+   *
+   * @param startAfterState
+   *          Starting point for each domain cursor. If any {@link ServerState}
+   *          for a domain is null, then start from the oldest CSN for each
+   *          replicaDBs
+   * @return a non null {@link DBCursor}
+   * @throws ChangelogException
+   *           If a database problem happened
+   * @see #getCursorFrom(DN, ServerState)
+   */
+  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState)
+      throws ChangelogException;
+
   // serverId methods
 
   /**
@@ -102,16 +124,17 @@
    *
    * @param baseDN
    *          the replication domain baseDN
-   * @param startAfterServerState
+   * @param startAfterState
    *          Starting point for each ReplicaDB cursor. If any CSN for a
    *          replicaDB is null, then start from the oldest CSN for this
    *          replicaDB
    * @return a non null {@link DBCursor}
    * @throws ChangelogException
    *           If a database problem happened
+   * @see #getCursorFrom(DN, int, CSN)
    */
-  DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
-      ServerState startAfterServerState) throws ChangelogException;
+  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState)
+      throws ChangelogException;
 
   /**
    * Generates a {@link DBCursor} for one replicaDB for the specified
@@ -136,6 +159,14 @@
       throws ChangelogException;
 
   /**
+   * Unregisters the provided cursor from this replication domain.
+   *
+   * @param cursor
+   *          the cursor to unregister.
+   */
+  void unregisterCursor(DBCursor<?> cursor);
+
+  /**
    * Publishes the provided change to the changelog DB for the specified
    * serverId and replication domain. After a change has been successfully
    * published, it becomes available to be returned by the External ChangeLog.
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index ea4f86a..9e9b6df 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -25,15 +25,8 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -42,18 +35,15 @@
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
-import org.opends.server.util.StaticUtils;
-
-import com.forgerock.opendj.util.Pair;
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.util.StaticUtils.*;
@@ -82,7 +72,7 @@
   private ChangelogState changelogState;
 
   /*
-   * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
+   * The following MultiDomainServerState fields must be thread safe, because
    * 1) initialization can happen while the replication server starts receiving
    * updates 2) many updates can happen concurrently.
    */
@@ -128,39 +118,7 @@
    *
    * @NonNull
    */
-  @SuppressWarnings("unchecked")
-  private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
-      new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
-
-  /**
-   * New cursors for this Map must be created from the {@link #run()} method,
-   * i.e. from the same thread that will make use of them. If this rule is not
-   * obeyed, then a JE exception will be thrown about
-   * "Non-transactional Cursors may not be used in multiple threads;".
-   */
-  private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
-      new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
-  /**
-   * Holds the newCursors that will have to be created in the next iteration
-   * inside the {@link #run()} method.
-   * <p>
-   * This map can be updated by multiple threads.
-   */
-  private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
-      new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
-          new Comparator<Pair<DN, Integer>>()
-          {
-            @Override
-            public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
-            {
-              final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
-              if (compareBaseDN == 0)
-              {
-                return o1.getSecond().compareTo(o2.getSecond());
-              }
-              return compareBaseDN;
-            }
-          });
+  private MultiDomainDBCursor nextChangeForInsertDBCursor;
 
   /**
    * Builds a ChangeNumberIndexer object.
@@ -215,11 +173,8 @@
       return;
     }
 
-    final CSN csn = updateMsg.getCSN();
-    // only keep the oldest CSN that will be the new cursor's starting point
-    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
     final CSN oldestCSNBefore = getOldestLastAliveCSN();
-    lastAliveCSNs.update(baseDN, csn);
+    lastAliveCSNs.update(baseDN, updateMsg.getCSN());
     tryNotify(oldestCSNBefore);
   }
 
@@ -364,40 +319,42 @@
     for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
     {
       final DN baseDN = entry.getKey();
-      if (!isECLEnabledDomain(baseDN))
+      if (isECLEnabledDomain(baseDN))
       {
-        continue;
-      }
+        for (Integer serverId : entry.getValue())
+        {
+          /*
+           * initialize with the oldest possible CSN in order for medium
+           * consistency to wait for all replicas to be alive before moving forward
+           */
+          lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
+        }
 
-      for (Integer serverId : entry.getValue())
-      {
-        /*
-         * initialize with the oldest possible CSN in order for medium
-         * consistency to wait for all replicas to be alive before moving
-         * forward
-         */
-        lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
-        // start after the actual CSN when initializing from the previous cookie
-        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
-        ensureCursorExists(baseDN, serverId, csn);
+        final ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
+        lastAliveCSNs.update(baseDN, latestKnownState);
       }
-
-      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
-      lastAliveCSNs.update(baseDN, latestKnownState);
     }
-    resetNextChangeForInsertDBCursor();
+
+    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
+    nextChangeForInsertDBCursor.next();
 
     if (newestRecord != null)
     {
       // restore the "previousCookie" state before shutdown
-      final UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
+      UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
+      if (record instanceof ReplicaOfflineMsg)
+      {
+        // ignore: replica offline messages are never stored in the CNIndexDB
+        nextChangeForInsertDBCursor.next();
+        record = nextChangeForInsertDBCursor.getRecord();
+      }
+
       // sanity check: ensure that when initializing the cursors at the previous
       // cookie, the next change we find is the newest record in the CNIndexDB
       if (!record.getCSN().equals(newestRecord.getCSN()))
       {
-        throw new ChangelogException(
-            ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(newestRecord
-                .getCSN().toStringUI(), record.getCSN().toStringUI()));
+        throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(
+            newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI()));
       }
       // Now we can update the mediumConsistencyRUV
       mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
@@ -428,68 +385,6 @@
     return new CSN(0, 0, serverId);
   }
 
-  private void resetNextChangeForInsertDBCursor() throws ChangelogException
-  {
-    final Map<DBCursor<UpdateMsg>, DN> cursors =
-        new HashMap<DBCursor<UpdateMsg>, DN>();
-    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry
-        : this.allCursors.entrySet())
-    {
-      for (Entry<Integer, DBCursor<UpdateMsg>> entry2
-          : entry.getValue().entrySet())
-      {
-        cursors.put(entry2.getValue(), entry.getKey());
-      }
-    }
-
-    // CNIndexer manages the cursor itself,
-    // so do not try to recycle exhausted cursors
-    CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
-    result.next();
-    nextChangeForInsertDBCursor = result;
-  }
-
-  private boolean ensureCursorExists(DN baseDN, Integer serverId,
-      CSN startAfterCSN) throws ChangelogException
-  {
-    Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN);
-    if (map == null)
-    {
-      map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>();
-      allCursors.put(baseDN, map);
-    }
-    DBCursor<UpdateMsg> cursor = map.get(serverId);
-    if (cursor == null)
-    {
-      final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
-      cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
-      cursor.next();
-      map.put(serverId, cursor);
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Returns the immediately preceding CSN.
-   *
-   * @param csn
-   *          the CSN to use
-   * @return the immediately preceding CSN or null if the provided CSN is null.
-   */
-  CSN getPrecedingCSN(CSN csn)
-  {
-    if (csn == null)
-    {
-      return null;
-    }
-    if (csn.getSeqnum() > 0)
-    {
-      return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
-    }
-    return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
-  }
-
   /** {@inheritDoc} */
   @Override
   public void initiateShutdown()
@@ -509,8 +404,7 @@
     {
       /*
        * initialize here to allow fast application start up and avoid errors due
-       * cursors being created in a different thread to the one where they are
-       * used.
+       * cursors being created in a different thread to the one where they are used.
        */
       initialize();
 
@@ -520,26 +414,29 @@
         {
           if (!domainsToClear.isEmpty())
           {
+            final DN cursorData = nextChangeForInsertDBCursor.getData();
+            final boolean callNextOnCursor =
+                cursorData == null || domainsToClear.contains(cursorData);
             while (!domainsToClear.isEmpty())
             {
               final DN baseDNToClear = domainsToClear.first();
-              removeCursors(baseDNToClear);
+              nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
               // Only release the waiting thread
               // once this domain's state has been cleared.
               domainsToClear.remove(baseDNToClear);
             }
-            resetNextChangeForInsertDBCursor();
-          }
-          else
-          {
-            final boolean createdCursors = createNewCursors();
-            final boolean recycledCursors = recycleExhaustedCursors();
-            if (createdCursors || recycledCursors)
+
+            if (callNextOnCursor)
             {
-              resetNextChangeForInsertDBCursor();
+              // The next change to consume comes from a domain to be removed.
+              // Call DBCursor.next() to ensure this domain is removed
+              nextChangeForInsertDBCursor.next();
             }
           }
 
+          // Do not call DBCursor.next() here
+          // because we might not have consumed the last record,
+          // for example if we could not move the MCP forward
           final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
           if (msg == null)
           {
@@ -551,8 +448,13 @@
               }
               wait();
             }
-            // loop to check whether new changes have been added to the
-            // ReplicaDBs
+            // check whether new changes have been added to the ReplicaDBs
+            nextChangeForInsertDBCursor.next();
+            continue;
+          }
+          else if (msg instanceof ReplicaOfflineMsg)
+          {
+            nextChangeForInsertDBCursor.next();
             continue;
           }
 
@@ -599,37 +501,43 @@
     }
     catch (RuntimeException e)
     {
-      // Nothing can be done about it.
-      // Rely on the DirectoryThread uncaught exceptions handler
-      // for logging error + alert.
-      // LocalizableMessage logged here gives corrective information to the administrator.
-      logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
-          getClass().getSimpleName(), stackTraceToSingleLineString(e));
+      logUnexpectedException(e);
+      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
       throw e;
     }
     catch (Exception e)
     {
-      // Nothing can be done about it.
-      // Rely on the DirectoryThread uncaught exceptions handler
-      // for logging error + alert.
-      // LocalizableMessage logged here gives corrective information to the administrator.
-      logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
-          getClass().getSimpleName(), stackTraceToSingleLineString(e));
+      logUnexpectedException(e);
+      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
       throw new RuntimeException(e);
     }
     finally
     {
-      removeCursors(DN.NULL_DN);
+      nextChangeForInsertDBCursor.close();
+      nextChangeForInsertDBCursor = null;
     }
   }
 
+  /**
+   * Nothing can be done about it.
+   * <p>
+   * Rely on the DirectoryThread uncaught exceptions handler for logging error +
+   * alert.
+   * <p>
+   * Message logged here gives corrective information to the administrator.
+   */
+  private void logUnexpectedException(Exception e)
+  {
+    logger.trace(ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION,
+        getClass().getSimpleName(), stackTraceToSingleLineString(e));
+  }
+
   private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
       final DN mcBaseDN) throws ChangelogException
   {
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(mcBaseDN, mcCSN);
 
-    boolean callNextOnCursor = true;
     final int mcServerId = mcCSN.getServerId();
     final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
     final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -642,133 +550,22 @@
       }
       else if (offlineCSN.isOlderThan(mcCSN))
       {
-        Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
-            pair = getCursor(mcBaseDN, mcCSN.getServerId());
-        Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
-        if (iter != null && !iter.hasNext())
-        {
-          /*
-           * replica is not back online, Medium consistency point has gone past
-           * its last offline time, and there are no more changes after the
-           * offline CSN in the cursor: remove everything known about it:
-           * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
-           * this replica from the medium consistency RUV.
-           */
-          iter.remove();
-          StaticUtils.close(pair.getFirst());
-          resetNextChangeForInsertDBCursor();
-          callNextOnCursor = false;
-          lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
-          mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
-        }
+        /*
+         * replica is not back online, Medium consistency point has gone past
+         * its last offline time, and there are no more changes after the
+         * offline CSN in the cursor: remove everything known about it:
+         * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
+         * this replica from the medium consistency RUV.
+         */
+        // TODO JNR how to close cursor for offline replica?
+        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
       }
     }
 
-    if (callNextOnCursor)
-    {
-      // advance the cursor we just read from,
-      // success/failure will be checked later
-      nextChangeForInsertDBCursor.next();
-    }
-  }
-
-  private void removeCursors(DN baseDN)
-  {
-    if (nextChangeForInsertDBCursor != null)
-    {
-      nextChangeForInsertDBCursor.close();
-      nextChangeForInsertDBCursor = null;
-    }
-    if (DN.NULL_DN.equals(baseDN))
-    {
-      // close all cursors
-      for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
-      {
-        StaticUtils.close(map.values());
-      }
-      allCursors.clear();
-      newCursors.clear();
-    }
-    else
-    {
-      // close cursors for this DN
-      final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
-      if (map != null)
-      {
-        StaticUtils.close(map.values());
-      }
-      for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();)
-      {
-        if (it.next().getFirst().equals(baseDN))
-        {
-          it.remove();
-        }
-      }
-    }
-  }
-
-  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
-      getCursor(final DN baseDN, final int serverId) throws ChangelogException
-  {
-    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
-        : allCursors.entrySet())
-    {
-      if (baseDN.equals(entry1.getKey()))
-      {
-        for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
-            entry1.getValue().entrySet().iterator(); iter.hasNext();)
-        {
-          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
-          if (serverId == entry2.getKey())
-          {
-            return Pair.of(entry2.getValue(), iter);
-          }
-        }
-      }
-    }
-    return Pair.empty();
-  }
-
-  private boolean recycleExhaustedCursors() throws ChangelogException
-  {
-    boolean succesfullyRecycled = false;
-    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
-    {
-      for (DBCursor<UpdateMsg> cursor : map.values())
-      {
-        // try to recycle it by calling next()
-        if (cursor.getRecord() == null && cursor.next())
-        {
-          succesfullyRecycled = true;
-        }
-      }
-    }
-    return succesfullyRecycled;
-  }
-
-  private boolean createNewCursors() throws ChangelogException
-  {
-    if (!newCursors.isEmpty())
-    {
-      boolean newCursorAdded = false;
-      for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
-          newCursors.entrySet().iterator(); iter.hasNext();)
-      {
-        final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
-        final DN baseDN = entry.getKey().getFirst();
-        final CSN csn = entry.getValue();
-        // start after preceding CSN so the first CSN read will exactly be the
-        // current one
-        final CSN startFromCSN = getPrecedingCSN(csn);
-        if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN))
-        {
-          newCursorAdded = true;
-        }
-        iter.remove();
-      }
-      return newCursorAdded;
-    }
-    return false;
+    // advance the cursor we just read from,
+    // success/failure will be checked later
+    nextChangeForInsertDBCursor.next();
   }
 
   /**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 52ef4c6..e0305cd 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -42,7 +42,7 @@
  * @param <Data>
  *          The type of data associated with each cursor
  */
-final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
+abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
 {
 
   private static final byte UNINITIALIZED = 0;
@@ -55,8 +55,6 @@
    */
   private byte state = UNINITIALIZED;
 
-  /** Whether this composite should try to recycle exhausted cursors. */
-  private final boolean recycleExhaustedCursors;
   /**
    * These cursors are considered exhausted because they had no new changes the
    * last time {@link DBCursor#next()} was called on them. Exhausted cursors
@@ -67,8 +65,13 @@
   /**
    * The cursors are sorted based on the current change of each cursor to
    * consider the next change across all available cursors.
+   * <p>
+   * New cursors for this Map must be created from the same thread that will
+   * make use of them. When this rule is not obeyed, a JE exception will be
+   * thrown about
+   * "Non-transactional Cursors may not be used in multiple threads;".
    */
-  private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
+  private final TreeMap<DBCursor<UpdateMsg>, Data> cursors =
       new TreeMap<DBCursor<UpdateMsg>, Data>(
           new Comparator<DBCursor<UpdateMsg>>()
           {
@@ -81,25 +84,6 @@
             }
           });
 
-  /**
-   * Builds a CompositeDBCursor using the provided collection of cursors.
-   *
-   * @param cursors
-   *          the cursors that will be iterated upon.
-   * @param recycleExhaustedCursors
-   *          whether a call to {@link #next()} tries to recycle exhausted
-   *          cursors
-   */
-  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
-      boolean recycleExhaustedCursors)
-  {
-    this.recycleExhaustedCursors = recycleExhaustedCursors;
-    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
-    {
-      put(entry);
-    }
-  }
-
   /** {@inheritDoc} */
   @Override
   public boolean next() throws ChangelogException
@@ -108,51 +92,80 @@
     {
       return false;
     }
-    final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
+
+    // If previous state was ready, then we must advance the first cursor
+    // (which UpdateMsg has been consumed).
+    // To keep consistent the cursors' order in the SortedSet, it is necessary
+    // to remove the first cursor, then add it again after moving it forward.
+    final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance =
+        state != UNINITIALIZED ? cursors.pollFirstEntry() : null;
     state = READY;
-    if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
+    recycleExhaustedCursors();
+    if (cursorToAdvance != null)
     {
-      // try to recycle empty cursors in case the underlying ReplicaDBs received
-      // new changes.
+      addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
+    }
+
+    removeNoLongerNeededCursors();
+    incorporateNewCursors();
+    return !cursors.isEmpty();
+  }
+
+  private void recycleExhaustedCursors() throws ChangelogException
+  {
+    if (!exhaustedCursors.isEmpty())
+    {
+      // try to recycle exhausted cursors in case the underlying replica DBs received new changes.
       final Map<DBCursor<UpdateMsg>, Data> copy =
           new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
       exhaustedCursors.clear();
       for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
       {
-        entry.getKey().next();
-        put(entry);
-      }
-      final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
-      if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
-      {
-        // if the first cursor was previously an exhausted cursor,
-        // then we have already called next() on it.
-        // Avoid calling it again because we know new changes have been found.
-        return true;
+        addCursor(entry.getKey(), entry.getValue());
       }
     }
-
-    // To keep consistent the cursors' order in the SortedSet, it is necessary
-    // to remove and add again the cursor after moving it forward.
-    if (advanceNonExhaustedCursors)
-    {
-      Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
-      if (firstEntry != null)
-      {
-        final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
-        cursor.next();
-        put(firstEntry);
-      }
-    }
-    // no cursors are left with changes.
-    return !cursors.isEmpty();
   }
 
-  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
+  private void removeNoLongerNeededCursors()
   {
-    final DBCursor<UpdateMsg> cursor = entry.getKey();
-    final Data data = entry.getValue();
-    if (cursor.getRecord() != null)
+    for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
+    {
+      final Data dataToFind = iter.next();
+      for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
+          cursors.entrySet().iterator(); cursorIter.hasNext();)
+      {
+        final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
+        if (dataToFind.equals(entry.getValue()))
+        {
+          entry.getKey().close();
+          cursorIter.remove();
+        }
+      }
+      iter.remove();
+    }
+  }
+
+  /**
+   * Returns an Iterator over the data associated to cursors that must be removed.
+   *
+   * @return an Iterator over the data associated to cursors that must be removed.
+   */
+  protected abstract Iterator<Data> removedCursorsIterator();
+
+  /**
+   * Adds a cursor to this composite cursor. It first calls
+   * {@link DBCursor#next()} to verify whether it is exhausted or not.
+   *
+   * @param cursor
+   *          the cursor to add to this composite
+   * @param data
+   *          the data associated to the provided cursor
+   * @throws ChangelogException
+   *           if a database problem occurred
+   */
+  protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException
+  {
+    if (cursor.next())
     {
       this.cursors.put(cursor, data);
     }
@@ -166,6 +179,8 @@
   @Override
   public UpdateMsg getRecord()
   {
+    // Cannot call incorporateNewCursors() here because
+    // somebody might have already called DBCursor.getRecord() and read the record
     final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
     if (entry != null)
     {
@@ -175,6 +190,16 @@
   }
 
   /**
+   * Called when implementors should incorporate new cursors into the current
+   * composite DBCursor. Implementors should call
+   * {@link #addCursor(DBCursor, Object)} to do so.
+   *
+   * @throws ChangelogException
+   *           if a database problem occurred
+   */
+  protected abstract void incorporateNewCursors() throws ChangelogException;
+
+  /**
    * Returns the data associated to the cursor that returned the current record.
    *
    * @return the data associated to the cursor that returned the current record.
@@ -193,8 +218,11 @@
   @Override
   public void close()
   {
+    state = CLOSED;
     StaticUtils.close(cursors.keySet());
     StaticUtils.close(exhaustedCursors.keySet());
+    cursors.clear();
+    exhaustedCursors.clear();
   }
 
   /** {@inheritDoc} */
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
new file mode 100644
index 0000000..3cf8b5d
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -0,0 +1,127 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+
+/**
+ * Cursor iterating over a replication domain's replica DBs.
+ */
+public class DomainDBCursor extends CompositeDBCursor<Void>
+{
+
+  private final DN baseDN;
+  private final ReplicationDomainDB domainDB;
+
+  private final ConcurrentSkipListMap<Integer, CSN> newReplicas =
+      new ConcurrentSkipListMap<Integer, CSN>();
+  /**
+   * Replaces null CSNs in ConcurrentSkipListMap that does not support null values.
+   */
+  private static final CSN NULL_CSN = new CSN(0, 0, 0);
+
+  /**
+   * Builds a DomainDBCursor instance.
+   *
+   * @param baseDN
+   *          the replication domain baseDN of this cursor
+   * @param domainDB
+   *          the DB for the provided replication domain
+   */
+  public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
+  {
+    this.baseDN = baseDN;
+    this.domainDB = domainDB;
+  }
+
+  /**
+   * Returns the replication domain baseDN of this cursor.
+   *
+   * @return the replication domain baseDN of this cursor.
+   */
+  public DN getBaseDN()
+  {
+    return baseDN;
+  }
+
+  /**
+   * Adds a replicaDB for this cursor to iterate over. Added cursors will be
+   * created and iterated over on the next call to {@link #next()}.
+   *
+   * @param serverId
+   *          the serverId of the replica
+   * @param startAfterCSN
+   *          the CSN after which to start iterating
+   */
+  public void addReplicaDB(int serverId, CSN startAfterCSN)
+  {
+    // only keep the oldest CSN that will be the new cursor's starting point
+    newReplicas.putIfAbsent(serverId, startAfterCSN != null ? startAfterCSN : NULL_CSN);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void incorporateNewCursors() throws ChangelogException
+  {
+    for (Iterator<Entry<Integer, CSN>> iter = newReplicas.entrySet().iterator(); iter.hasNext();)
+    {
+      final Entry<Integer, CSN> pair = iter.next();
+      final int serverId = pair.getKey();
+      final CSN csn = pair.getValue();
+      final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null;
+      final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+      addCursor(cursor, null);
+      iter.remove();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  @SuppressWarnings("unchecked")
+  protected Iterator<Void> removedCursorsIterator()
+  {
+    return Collections.EMPTY_LIST.iterator(); // nothing to remove
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    super.close();
+    domainDB.unregisterCursor(this);
+    newReplicas.clear();
+  }
+
+}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 7e75bf1..dc23a7e 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -40,6 +41,7 @@
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
@@ -72,13 +74,20 @@
    * <li>then check it's not null</li>
    * <li>then close all inside</li>
    * </ol>
-   * When creating a JEReplicaDB, synchronize on the domainMap to avoid
+   * When creating a replicaDB, synchronize on the domainMap to avoid
    * concurrent shutdown.
    */
-  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
-      domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
-  private ReplicationDbEnv dbEnv;
-  private ReplicationServerCfg config;
+  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
+      new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
+  /**
+   * \@GuardedBy("itself")
+   */
+  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+      new HashMap<DN, List<DomainDBCursor>>();
+  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+      new CopyOnWriteArrayList<MultiDomainDBCursor>();
+  private ReplicationDbEnv replicationEnv;
+  private final ReplicationServerCfg config;
   private final File dbDirectory;
 
   /**
@@ -103,9 +112,9 @@
 
   /** The local replication server. */
   private final ReplicationServer replicationServer;
-  private AtomicBoolean shutdown = new AtomicBoolean();
+  private final AtomicBoolean shutdown = new AtomicBoolean();
 
-  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
+  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
       new DBCursor<UpdateMsg>()
   {
 
@@ -135,7 +144,7 @@
   };
 
   /**
-   * Builds an instance of this class.
+   * Creates a new changelog DB.
    *
    * @param replicationServer
    *          the local replication server.
@@ -144,15 +153,15 @@
    * @throws ConfigException
    *           if a problem occurs opening the supplied directory
    */
-  public JEChangelogDB(ReplicationServer replicationServer,
-      ReplicationServerCfg config) throws ConfigException
+  public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
+      throws ConfigException
   {
     this.config = config;
     this.replicationServer = replicationServer;
     this.dbDirectory = makeDir(config.getReplicationDBDirectory());
   }
 
-  private File makeDir(String dbDirName) throws ConfigException
+  private File makeDir(final String dbDirName) throws ConfigException
   {
     // Check that this path exists or create it.
     final File dbDirectory = getFileForPath(dbDirName);
@@ -168,15 +177,13 @@
     {
       logger.traceException(e);
 
-      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder();
-      mb.append(e.getLocalizedMessage());
-      mb.append(" ");
-      mb.append(dbDirectory);
-      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb), e);
+      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
+          e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
+      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
     }
   }
 
-  private Map<Integer, JEReplicaDB> getDomainMap(DN baseDN)
+  private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN)
   {
     final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
     if (domainMap != null)
@@ -186,29 +193,12 @@
     return Collections.emptyMap();
   }
 
-  private JEReplicaDB getReplicaDB(DN baseDN, int serverId)
+  private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId)
   {
     return getDomainMap(baseDN).get(serverId);
   }
 
   /**
-   * Provision resources for the specified serverId in the specified replication
-   * domain.
-   *
-   * @param baseDN
-   *          the replication domain where to add the serverId
-   * @param serverId
-   *          the server Id to add to the replication domain
-   * @throws ChangelogException
-   *           If a database error happened.
-   */
-  private void commission(DN baseDN, int serverId, ReplicationServer rs)
-      throws ChangelogException
-  {
-    getOrCreateReplicaDB(baseDN, serverId, rs);
-  }
-
-  /**
    * Returns a {@link JEReplicaDB}, possibly creating it.
    *
    * @param baseDN
@@ -217,35 +207,42 @@
    *          the serverId for which to create a ReplicaDB
    * @param server
    *          the ReplicationServer
-   * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
-   *         to be created
+   * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
    * @throws ChangelogException
    *           if a problem occurred with the database
    */
-  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
-      int serverId, ReplicationServer server) throws ChangelogException
+  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
+      final ReplicationServer server) throws ChangelogException
   {
     while (!shutdown.get())
     {
-      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
-          getExistingOrNewDomainMap(baseDN);
-      final Pair<JEReplicaDB, Boolean> result =
-          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
+      final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
+      final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
       if (result != null)
       {
+        final Boolean dbWasCreated = result.getSecond();
+        if (dbWasCreated)
+        { // new replicaDB => update all cursors with it
+          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+          if (cursors != null && !cursors.isEmpty())
+          {
+            for (DomainDBCursor cursor : cursors)
+            {
+              cursor.addReplicaDB(serverId, null);
+            }
+          }
+        }
+
         return result;
       }
     }
-    throw new ChangelogException(
-        ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
+    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
   }
 
-  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
-      DN baseDN)
+  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
   {
     // happy path: the domainMap already exists
-    final ConcurrentMap<Integer, JEReplicaDB> currentValue =
-        domainToReplicaDBs.get(baseDN);
+    final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
     if (currentValue != null)
     {
       return currentValue;
@@ -254,30 +251,36 @@
     // unlucky, the domainMap does not exist: take the hit and create the
     // newValue, even though the same could be done concurrently by another
     // thread
-    final ConcurrentMap<Integer, JEReplicaDB> newValue =
-        new ConcurrentHashMap<Integer, JEReplicaDB>();
-    final ConcurrentMap<Integer, JEReplicaDB> previousValue =
-        domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+    final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>();
+    final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
     if (previousValue != null)
     {
       // there was already a value associated to the key, let's use it
       return previousValue;
     }
+
+    if (MultimasterReplication.isECLEnabledDomain(baseDN))
+    {
+      // we just created a new domain => update all cursors
+      for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+      {
+        cursor.addDomain(baseDN, null);
+      }
+    }
     return newValue;
   }
 
-  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
-      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
-      DN baseDN, ReplicationServer server) throws ChangelogException
+  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
+      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
   {
-    // happy path: the JEReplicaDB already exists
+    // happy path: the replicaDB already exists
     JEReplicaDB currentValue = domainMap.get(serverId);
     if (currentValue != null)
     {
       return Pair.of(currentValue, false);
     }
 
-    // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
+    // unlucky, the replicaDB does not exist: take the hit and synchronize
     // on the domainMap to create a new ReplicaDB
     synchronized (domainMap)
     {
@@ -293,11 +296,11 @@
         // The domainMap could have been concurrently removed because
         // 1) a shutdown was initiated or 2) an initialize was called.
         // Return will allow the code to:
-        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
+        // 1) shutdown properly or 2) lazily recreate the replicaDB
         return null;
       }
 
-      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
+      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
       domainMap.put(serverId, newDB);
       return Pair.of(newDB, true);
     }
@@ -310,8 +313,8 @@
     try
     {
       final File dbDir = getFileForPath(config.getReplicationDBDirectory());
-      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
-      final ChangelogState changelogState = dbEnv.getChangelogState();
+      replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
+      final ChangelogState changelogState = replicationEnv.getChangelogState();
       initializeToChangelogState(changelogState);
       if (config.isComputeChangeNumber())
       {
@@ -338,12 +341,12 @@
     {
       for (int serverId : entry.getValue())
       {
-        commission(entry.getKey(), serverId, replicationServer);
+        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
       }
     }
   }
 
-  private void shutdownCNIndexDB() throws ChangelogException
+  private void shutdownChangeNumberIndexDB() throws ChangelogException
   {
     synchronized (cnIndexDBLock)
     {
@@ -381,7 +384,7 @@
 
     try
     {
-      shutdownCNIndexDB();
+      shutdownChangeNumberIndexDB();
     }
     catch (ChangelogException e)
     {
@@ -402,7 +405,7 @@
       }
     }
 
-    if (dbEnv != null)
+    if (replicationEnv != null)
     {
       // wait for shutdown of the threads holding cursors
       try
@@ -421,7 +424,7 @@
         // do nothing: we are already shutting down
       }
 
-      dbEnv.shutdown();
+      replicationEnv.shutdown();
     }
 
     if (firstException != null)
@@ -431,11 +434,10 @@
   }
 
   /**
-   * Clears all content from the changelog database, but leaves its directory on
-   * the filesystem.
+   * Clears all records from the changelog (does not remove the changelog itself).
    *
    * @throws ChangelogException
-   *           If a database problem happened
+   *           If an error occurs when clearing the changelog.
    */
   public void clearDB() throws ChangelogException
   {
@@ -469,7 +471,7 @@
 
         try
         {
-          shutdownCNIndexDB();
+          shutdownChangeNumberIndexDB();
         }
         catch (ChangelogException e)
         {
@@ -584,7 +586,7 @@
     // 3- clear the changelogstate DB
     try
     {
-      dbEnv.clearGenerationId(baseDN);
+      replicationEnv.clearGenerationId(baseDN);
     }
     catch (ChangelogException e)
     {
@@ -635,7 +637,7 @@
   {
     if (computeChangeNumber)
     {
-      startIndexer(dbEnv.getChangelogState());
+      startIndexer(replicationEnv.getChangelogState());
     }
     else
     {
@@ -673,7 +675,7 @@
       {
         try
         {
-          cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
+          cnIndexDB = new JEChangeNumberIndexDB(replicationEnv);
         }
         catch (Exception e)
         {
@@ -694,40 +696,57 @@
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
-      throws ChangelogException
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
   {
-    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
-    final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
-    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
-    for (int serverId : serverIds)
+    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+    registeredMultiDomainCursors.add(cursor);
+    for (DN baseDN : domainToReplicaDBs.keySet())
     {
-      // get the last already sent CSN from that server to get a cursor
-      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
-      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
-      replicaDBCursor.next();
-      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
-      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
     }
-    // recycle exhausted cursors,
-    // because client code will not manage the cursors itself
-    return new CompositeDBCursor<Void>(cursors, true);
+    return cursor;
   }
 
-  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
-      ServerState startAfterServerState)
+  /** {@inheritDoc} */
+  @Override
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+      throws ChangelogException
   {
-    final ServerState domainState = offlineReplicas.getServerState(baseDN);
-    if (domainState != null)
+    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+    for (int serverId : getDomainMap(baseDN).keySet())
     {
-      for (CSN offlineCSN : domainState)
+      // get the last already sent CSN from that server to get a cursor
+      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+      cursor.addReplicaDB(serverId, lastCSN);
+    }
+    return cursor;
+  }
+
+  private DomainDBCursor newDomainDBCursor(final DN baseDN)
+  {
+    synchronized (registeredDomainCursors)
+    {
+      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+      if (cursors == null)
       {
-        if (serverId == offlineCSN.getServerId()
-            && !startAfterServerState.cover(offlineCSN))
-        {
-          return offlineCSN;
-        }
+        cursors = new ArrayList<DomainDBCursor>();
+        registeredDomainCursors.put(baseDN, cursors);
       }
+      cursors.add(cursor);
+      return cursor;
+    }
+  }
+
+  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+  {
+    final MultiDomainServerState offlineReplicas =
+        replicationEnv.getChangelogState().getOfflineReplicas();
+    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+    if (offlineCSN != null
+        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+    {
+      return offlineCSN;
     }
     return null;
   }
@@ -737,31 +756,57 @@
   public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
       throws ChangelogException
   {
-    JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+    final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
-      return replicaDB.generateCursorFrom(startAfterCSN);
+      final DBCursor<UpdateMsg> cursor =
+          replicaDB.generateCursorFrom(startAfterCSN);
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+      // TODO JNR if (offlineCSN != null) ??
+      // What about replicas that suddenly become offline?
+      return new ReplicaOfflineCursor(cursor, offlineCSN);
     }
-    return EMPTY_CURSOR;
+    return EMPTY_CURSOR_REPLICA_DB;
   }
 
   /** {@inheritDoc} */
   @Override
-  public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
-      throws ChangelogException
+  public void unregisterCursor(final DBCursor<?> cursor)
   {
-    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
-        updateMsg.getCSN().getServerId(), replicationServer);
-    final JEReplicaDB replicaDB = pair.getFirst();
-    final boolean wasCreated = pair.getSecond();
+    if (cursor instanceof MultiDomainDBCursor)
+    {
+      registeredMultiDomainCursors.remove(cursor);
+    }
+    else if (cursor instanceof DomainDBCursor)
+    {
+      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+      synchronized (registeredMultiDomainCursors)
+      {
+        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+        if (cursors != null)
+        {
+          cursors.remove(cursor);
+        }
+      }
+    }
+  }
 
+  /** {@inheritDoc} */
+  @Override
+  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
+  {
+    final CSN csn = updateMsg.getCSN();
+    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
+        csn.getServerId(), replicationServer);
+    final JEReplicaDB replicaDB = pair.getFirst();
     replicaDB.add(updateMsg);
+
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
       indexer.publishUpdateMsg(baseDN, updateMsg);
     }
-    return wasCreated;
+    return pair.getSecond(); // replica DB was created
   }
 
   /** {@inheritDoc} */
@@ -779,7 +824,7 @@
   @Override
   public void replicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
   {
-    dbEnv.addOfflineReplica(baseDN, offlineCSN);
+    replicationEnv.addOfflineReplica(baseDN, offlineCSN);
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
new file mode 100644
index 0000000..9f78065
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -0,0 +1,123 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+
+/**
+ * Cursor iterating over a all the replication domain known to the changelog DB.
+ */
+public class MultiDomainDBCursor extends CompositeDBCursor<DN>
+{
+  private final ReplicationDomainDB domainDB;
+
+  private final ConcurrentSkipListMap<DN, ServerState> newDomains =
+      new ConcurrentSkipListMap<DN, ServerState>();
+  private final ConcurrentSkipListSet<DN> removeDomains =
+      new ConcurrentSkipListSet<DN>();
+
+  /**
+   * Builds a MultiDomainDBCursor instance.
+   *
+   * @param domainDB
+   *          the replication domain management DB
+   */
+  public MultiDomainDBCursor(ReplicationDomainDB domainDB)
+  {
+    this.domainDB = domainDB;
+  }
+
+  /**
+   * Adds a replication domain for this cursor to iterate over. Added cursors
+   * will be created and iterated over on the next call to {@link #next()}.
+   *
+   * @param baseDN
+   *          the replication domain's baseDN
+   * @param startAfterState
+   *          the {@link ServerState} after which to start iterating
+   */
+  public void addDomain(DN baseDN, ServerState startAfterState)
+  {
+    newDomains.put(baseDN,
+        startAfterState != null ? startAfterState : new ServerState());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void incorporateNewCursors() throws ChangelogException
+  {
+    for (Iterator<Entry<DN, ServerState>> iter = newDomains.entrySet().iterator();
+         iter.hasNext();)
+    {
+      final Entry<DN, ServerState> entry = iter.next();
+      final DN baseDN = entry.getKey();
+      final ServerState serverState = entry.getValue();
+      final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState);
+      addCursor(domainDBCursor, baseDN);
+      iter.remove();
+    }
+  }
+
+  /**
+   * Removes a replication domain from this cursor and stops iterating over it.
+   * Removed cursors will be effectively removed on the next call to
+   * {@link #next()}.
+   *
+   * @param baseDN
+   *          the replication domain's baseDN
+   */
+  public void removeDomain(DN baseDN)
+  {
+    removeDomains.add(baseDN);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected Iterator<DN> removedCursorsIterator()
+  {
+    return removeDomains.iterator();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    super.close();
+    domainDB.unregisterCursor(this);
+    newDomains.clear();
+    removeDomains.clear();
+  }
+
+}
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index d8d536c..ebf4df5 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -95,6 +95,16 @@
 public class ExternalChangeLogTest extends ReplicationTestCase
 {
 
+  private static class Results
+  {
+
+    public final List<SearchResultEntryProtocolOp> searchResultEntries =
+        new ArrayList<SearchResultEntryProtocolOp>();
+    public long searchReferences;
+    public long searchesDone;
+
+  }
+
   private static final int SERVER_ID_1 = 1201;
   private static final int SERVER_ID_2 = 1202;
 
@@ -188,14 +198,15 @@
   @Test(enabled=true, dependsOnMethods = { "PrimaryTest"})
   public void TestWithAndWithoutControl() throws Exception
   {
+    final String tn = "TestWithAndWithoutControl";
     replicationServer.getChangelogDB().setPurgeDelay(0);
     // Write changes and read ECL from start
-    ECLCompatWriteReadAllOps(1);
+    ECLCompatWriteReadAllOps(1, tn);
 
     ECLCompatNoControl(1);
 
     // Write additional changes and read ECL from a provided change number
-    ECLCompatWriteReadAllOps(5);
+    ECLCompatWriteReadAllOps(5, tn);
   }
 
   @Test(enabled=false, dependsOnMethods = { "PrimaryTest"})
@@ -293,12 +304,13 @@
   @Test(enabled=false, groups="slow", dependsOnMethods = { "PrimaryTest"})
   public void ECLReplicationServerFullTest15() throws Exception
   {
+    final String tn = "ECLReplicationServerFullTest15";
     replicationServer.getChangelogDB().setPurgeDelay(0);
     // Write 4 changes and read ECL from start
-    ECLCompatWriteReadAllOps(1);
+    ECLCompatWriteReadAllOps(1, tn);
 
     // Write 4 additional changes and read ECL from a provided change number
-    CSN csn = ECLCompatWriteReadAllOps(5);
+    CSN csn = ECLCompatWriteReadAllOps(5, tn);
 
     // Test request from a provided change number - read 6
     ECLCompatReadFrom(6, csn);
@@ -895,15 +907,12 @@
 
       final CSN[] csns = generateCSNs(3, SERVER_ID_1);
       publishDeleteMsgInOTest(server01, csns[0], testName, 1);
-
-      Thread.sleep(1000);
-
-      // Test that last cookie has been updated
-      String cookieNotEmpty = readLastCookie();
-      debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
-
+      final String firstCookie = assertLastCookieDifferentThanLastValue("");
+      String lastCookie = firstCookie;
       publishDeleteMsgInOTest(server01, csns[1], testName, 2);
+      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
       publishDeleteMsgInOTest(server01, csns[2], testName, 3);
+      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
 
       // ---
       // 2. Now set up a very short purge delay on the replication changelogs
@@ -930,7 +939,7 @@
       //    returns the appropriate error.
       debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
       debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
-      searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
+      searchOp = searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
       assertTrue(searchOp.getErrorMessage().toString().startsWith(
           ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(TEST_ROOT_DN_STRING).toString()),
           searchOp.getErrorMessage().toString());
@@ -962,26 +971,21 @@
 
       final CSN[] csns = generateCSNs(3, SERVER_ID_1);
       publishDeleteMsgInOTest(server01, csns[0], testName, 1);
-
-      Thread.sleep(1000);
-
-      // Test that last cookie has been updated
-      String cookieNotEmpty = readLastCookie();
-      debugInfo(testName, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
-
+      final String firstCookie = assertLastCookieDifferentThanLastValue("");
+      String lastCookie = firstCookie;
       publishDeleteMsgInOTest(server01, csns[1], testName, 2);
+      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
       publishDeleteMsgInOTest(server01, csns[2], testName, 3);
+      lastCookie = assertLastCookieDifferentThanLastValue(lastCookie);
 
       // ---
       // 2. Now remove the domain by sending a reset message
-      ResetGenerationIdMsg msg = new ResetGenerationIdMsg(23657);
-      server01.publish(msg);
+      server01.publish(new ResetGenerationIdMsg(23657));
 
       // ---
       // 3. Assert that a request with an empty cookie returns nothing
       // since replication changelog has been cleared
       String cookie= "";
-      InternalSearchOperation searchOp = null;
       searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
 
       // ---
@@ -989,7 +993,7 @@
       // since replication changelog has been cleared
       cookie = readLastCookie();
       debugInfo(testName, "2. Search with last cookie=" + cookie + "\"");
-      searchOp = searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
+      searchOnCookieChangelog("(targetDN=*)", cookie, 0, testName, SUCCESS);
 
       // ---
       // 5. Assert that a request with an "old" cookie - one that refers to
@@ -997,7 +1001,8 @@
       //    returns the appropriate error.
       debugInfo(testName, "d1 trimdate" + getDomainOldestState(TEST_ROOT_DN));
       debugInfo(testName, "d2 trimdate" + getDomainOldestState(TEST_ROOT_DN2));
-      searchOp = searchOnCookieChangelog("(targetDN=*)", cookieNotEmpty, 0, testName, UNWILLING_TO_PERFORM);
+      final InternalSearchOperation searchOp =
+          searchOnCookieChangelog("(targetDN=*)", firstCookie, 0, testName, UNWILLING_TO_PERFORM);
       assertThat(searchOp.getErrorMessage().toString()).contains("unknown replicated domain", TEST_ROOT_DN_STRING.toString());
     }
     finally
@@ -1007,6 +1012,23 @@
     debugInfo(testName, "Ending test successfully");
   }
 
+  private String assertLastCookieDifferentThanLastValue(final String lastCookie) throws Exception
+  {
+    int cnt = 0;
+    while (cnt < 100)
+    {
+      final String newCookie = readLastCookie();
+      if (!newCookie.equals(lastCookie))
+      {
+        return newCookie;
+      }
+      cnt++;
+      Thread.sleep(10);
+    }
+    Assertions.fail("Expected last cookie would have been updated, but it always stayed at value '" + lastCookie + "'");
+    return null;// dead code
+  }
+
   private void debugAndWriteEntries(LDIFWriter ldifWriter,
       List<SearchResultEntry> entries, String tn) throws Exception
   {
@@ -1074,10 +1096,11 @@
 
       // Publish ADD
       csnCounter++;
-      String lentry = "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
-          + "objectClass: top\n" + "objectClass: domain\n"
-          + "entryUUID: "+user1entryUUID+"\n";
-      Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+      Entry entry = TestCaseUtils.entryFromLdifString(
+          "dn: uid=" + tn + "2," + TEST_ROOT_DN_STRING + "\n"
+          + "objectClass: top\n"
+          + "objectClass: domain\n"
+          + "entryUUID: " + user1entryUUID + "\n");
       AddMsg addMsg = new AddMsg(
           csns[csnCounter],
           DN.valueOf("uid="+tn+"2," + TEST_ROOT_DN_STRING),
@@ -1412,49 +1435,27 @@
 
       InvocationCounterPlugin.resetAllCounters();
 
-      long searchEntries;
-      long searchReferences = ldapStatistics.getSearchResultReferences();
-      long searchesDone     = ldapStatistics.getSearchResultsDone();
+      final Results results = new Results();
+      results.searchReferences = ldapStatistics.getSearchResultReferences();
+      results.searchesDone     = ldapStatistics.getSearchResultsDone();
 
       debugInfo(tn, "Search Persistent filter=(targetDN=*"+tn+"*,o=test)");
-      LDAPMessage message = new LDAPMessage(2, searchRequest, controls);
-      w.writeMessage(message);
+      w.writeMessage(new LDAPMessage(2, searchRequest, controls));
       Thread.sleep(500);
 
       if (!changesOnly)
       {
         // Wait for change 1
         debugInfo(tn, "Waiting for init search expected to return change 1");
-        searchEntries = 0;
+        readMessages(tn, r, results, 1, "Init search Result=");
+        for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
         {
-          while (searchEntries < 1 && (message = r.readMessage()) != null)
-          {
-            debugInfo(tn, "Init search Result=" +
-                message.getProtocolOpType() + message + " " + searchEntries);
-            switch (message.getProtocolOpType())
-            {
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-              SearchResultEntryProtocolOp searchResultEntry =
-                  message.getSearchResultEntryProtocolOp();
-              searchEntries++;
-              // FIXME:ECL Double check 1 is really the valid value here.
-              checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
-                  (compatMode?"1":"0"));
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-              searchReferences++;
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-              assertSuccessful(message);
-              searchesDone++;
-              break;
-            }
-          }
+          // FIXME:ECL Double check 1 is really the valid value here.
+          final String cn = compatMode ? "1" : "0";
+          checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
         }
         debugInfo(tn, "INIT search done with success. searchEntries="
-            + searchEntries + " #searchesDone="+ searchesDone);
+            + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
       }
 
       // Produces change 2
@@ -1470,30 +1471,8 @@
       " published , psearch will now wait for new entries");
 
       // wait for the 1 new entry
-      searchEntries = 0;
-      SearchResultEntryProtocolOp searchResultEntry = null;
-      while (searchEntries < 1 && (message = r.readMessage()) != null)
-      {
-        debugInfo(tn, "psearch search  Result=" +
-            message.getProtocolOpType() + message);
-        switch (message.getProtocolOpType())
-        {
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-          searchResultEntry = message.getSearchResultEntryProtocolOp();
-          searchEntries++;
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-          searchReferences++;
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-          assertSuccessful(message);
-//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
-          searchesDone++;
-          break;
-        }
-      }
+      readMessages(tn, r, results, 1, "psearch search  Result=");
+      SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(0);
       Thread.sleep(1000);
 
       // Check we received change 2
@@ -1523,11 +1502,12 @@
             createSearchRequest("(targetDN=*directpsearch*,o=test)", null);
 
         debugInfo(tn, "ACI test : sending search");
-        message = new LDAPMessage(2, searchRequest, createCookieControl(""));
-        w.writeMessage(message);
+        w.writeMessage(new LDAPMessage(2, searchRequest, createCookieControl("")));
 
-        searchesDone=0;
-        searchEntries = 0;
+        LDAPMessage message;
+        int searchesDone = 0;
+        int searchEntries = 0;
+        int searchReferences = 0;
         while ((searchesDone==0) && (message = r.readMessage()) != null)
         {
           debugInfo(tn, "ACI test : message returned " +
@@ -1719,125 +1699,53 @@
 
       InvocationCounterPlugin.resetAllCounters();
 
-      ldapStatistics.getSearchRequests();
-      long searchEntries    = ldapStatistics.getSearchResultEntries();
-      ldapStatistics.getSearchResultReferences();
-      long searchesDone     = ldapStatistics.getSearchResultsDone();
+      final Results results = new Results();
+      results.searchesDone = ldapStatistics.getSearchResultsDone();
 
-      LDAPMessage message;
-      message = new LDAPMessage(2, searchRequest1, controls);
-      w1.writeMessage(message);
+      w1.writeMessage(new LDAPMessage(2, searchRequest1, controls));
       Thread.sleep(500);
-
-      message = new LDAPMessage(2, searchRequest2, controls);
-      w2.writeMessage(message);
+      w2.writeMessage(new LDAPMessage(2, searchRequest2, controls));
       Thread.sleep(500);
-
-      message = new LDAPMessage(2, searchRequest3, controls);
-      w3.writeMessage(message);
+      w3.writeMessage(new LDAPMessage(2, searchRequest3, controls));
       Thread.sleep(500);
 
       if (!changesOnly)
       {
         debugInfo(tn, "Search1  Persistent filter=" + searchRequest1.getFilter()
                   + " expected to return change " + csn1);
-        searchEntries = 0;
-        message = null;
-
         {
-          while (searchEntries < 1 && (message = r1.readMessage()) != null)
+          readMessages(tn, r1, results, 1, "Search1 Result=");
+          final int searchEntries = results.searchResultEntries.size();
+          if (searchEntries == 1)
           {
-            debugInfo(tn, "Search1 Result=" +
-                message.getProtocolOpType() + " " + message);
-            switch (message.getProtocolOpType())
-            {
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-              SearchResultEntryProtocolOp searchResultEntry =
-                  message.getSearchResultEntryProtocolOp();
-              searchEntries++;
-              if (searchEntries==1)
-              {
-                checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
-                checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
-                    (compatMode?"10":"0"));
-              }
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-              assertSuccessful(message);
-              searchesDone++;
-              break;
-            }
+            final SearchResultEntryProtocolOp searchResultEntry = results.searchResultEntries.get(1);
+            final String cn = compatMode ? "10" : "0";
+            checkValue(searchResultEntry.toSearchResultEntry(),"replicationcsn",csn1.toString());
+            checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
           }
+          debugInfo(tn, "Search1 done with success. searchEntries="
+              + searchEntries + " #searchesDone=" + results.searchesDone);
         }
-        debugInfo(tn, "Search1 done with success. searchEntries="
-            + searchEntries + " #searchesDone="+ searchesDone);
 
-        searchEntries = 0;
-        message = null;
         {
           debugInfo(tn, "Search 2  Persistent filter=" + searchRequest2.getFilter()
               + " expected to return change " + csn2 + " & " + csn3);
-          while (searchEntries < 2 && (message = r2.readMessage()) != null)
+          readMessages(tn, r2, results, 2, "Search 2 Result=");
+          for (SearchResultEntryProtocolOp searchResultEntry : results.searchResultEntries)
           {
-            debugInfo(tn, "Search 2 Result=" +
-                message.getProtocolOpType() + message);
-            switch (message.getProtocolOpType())
-            {
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-              SearchResultEntryProtocolOp searchResultEntry =
-                  message.getSearchResultEntryProtocolOp();
-              searchEntries++;
-              checkValue(searchResultEntry.toSearchResultEntry(),"changenumber",
-                  (compatMode?"10":"0"));
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-              assertSuccessful(message);
-              searchesDone++;
-              break;
-            }
+            final String cn = compatMode ? "10" : "0";
+            checkValue(searchResultEntry.toSearchResultEntry(), "changenumber", cn);
           }
+          debugInfo(tn, "Search2 done with success. searchEntries="
+              + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
         }
-        debugInfo(tn, "Search2 done with success. searchEntries="
-            + searchEntries + " #searchesDone="+ searchesDone);
 
 
-        searchEntries = 0;
-        message = null;
-        {
-          debugInfo(tn, "Search3  Persistent filter=" + searchRequest3.getFilter()
-              + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
-          while (searchEntries < 4 && (message = r3.readMessage()) != null)
-          {
-            debugInfo(tn, "Search3 Result=" +
-                message.getProtocolOpType() + " " + message);
-
-            switch (message.getProtocolOpType())
-            {
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-              searchEntries++;
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-              break;
-
-            case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-              assertSuccessful(message);
-              searchesDone++;
-              break;
-            }
-          }
-        }
+        debugInfo(tn, "Search3  Persistent filter=" + searchRequest3.getFilter()
+            + " expected to return change top + " + csn1 + " & " + csn2 + " & " + csn3);
+        readMessages(tn, r3, results, 4, "Search3 Result=");
         debugInfo(tn, "Search3 done with success. searchEntries="
-            + searchEntries + " #searchesDone="+ searchesDone);
-
+            + results.searchResultEntries.size() + " #searchesDone=" + results.searchesDone);
       }
 
       // Produces additional change
@@ -1871,82 +1779,19 @@
       debugInfo(tn, delMsg13.getCSN()  + " published additionally ");
 
       // wait 11
-      searchEntries = 0;
-      message = null;
-      while (searchEntries < 1 && (message = r1.readMessage()) != null)
-      {
-        debugInfo(tn, "Search 11 Result=" +
-            message.getProtocolOpType() + " " + message);
-        switch (message.getProtocolOpType())
-        {
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-          searchEntries++;
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-          assertSuccessful(message);
-//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
-          searchesDone++;
-          break;
-        }
-      }
+      readMessages(tn, r1, results, 1, "Search 11 Result=");
       Thread.sleep(1000);
       debugInfo(tn, "Search 1 successfully receives additional changes");
 
       // wait 12 & 13
-      searchEntries = 0;
-      message = null;
-      while (searchEntries < 2 && (message = r2.readMessage()) != null)
-      {
-        debugInfo(tn, "psearch search 12 Result=" +
-            message.getProtocolOpType() + " " + message);
-        switch (message.getProtocolOpType())
-        {
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-          searchEntries++;
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-          assertSuccessful(message);
-//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
-          searchesDone++;
-          break;
-        }
-      }
+      readMessages(tn, r2, results, 2, "psearch search 12 Result=");
       Thread.sleep(1000);
       debugInfo(tn, "Search 2 successfully receives additional changes");
 
       // wait 11 & 12 & 13
-      searchEntries = 0;
-      SearchResultEntryProtocolOp searchResultEntry = null;
-      message = null;
-      while (searchEntries < 3 && (message = r3.readMessage()) != null)
-      {
-        debugInfo(tn, "psearch search 13 Result=" +
-            message.getProtocolOpType() + " " + message);
-        switch (message.getProtocolOpType())
-        {
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
-          searchResultEntry = message.getSearchResultEntryProtocolOp();
-          searchEntries++;
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
-          break;
-
-        case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
-          assertSuccessful(message);
-//        assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
-          searchesDone++;
-          break;
-        }
-      }
+      readMessages(tn, r3, results, 3, "psearch search 13 Result=");
+      SearchResultEntryProtocolOp searchResultEntry =
+          results.searchResultEntries.get(results.searchResultEntries.size() - 1);
       Thread.sleep(1000);
 
       // Check we received change 13
@@ -1961,6 +1806,35 @@
     debugInfo(tn, "Ends test successfully");
   }
 
+  private void readMessages(String tn, org.opends.server.tools.LDAPReader r,
+      final Results results, final int i, final String string) throws Exception
+  {
+    results.searchResultEntries.clear();
+
+    LDAPMessage message;
+    while (results.searchResultEntries.size() < i
+        && (message = r.readMessage()) != null)
+    {
+      debugInfo(tn, string + message.getProtocolOpType() + " " + message);
+
+      switch (message.getProtocolOpType())
+      {
+      case LDAPConstants.OP_TYPE_SEARCH_RESULT_ENTRY:
+        results.searchResultEntries.add(message.getSearchResultEntryProtocolOp());
+        break;
+
+      case LDAPConstants.OP_TYPE_SEARCH_RESULT_REFERENCE:
+        results.searchReferences++;
+        break;
+
+      case LDAPConstants.OP_TYPE_SEARCH_RESULT_DONE:
+        assertSuccessful(message);
+        results.searchesDone++;
+        break;
+      }
+    }
+  }
+
   private void assertSuccessful(LDAPMessage message)
   {
     SearchResultDoneProtocolOp doneOp = message.getSearchResultDoneProtocolOp();
@@ -2007,10 +1881,9 @@
       new BindRequestProtocolOp(
           ByteString.valueOf(bindDN),
           3, ByteString.valueOf(password));
-    LDAPMessage message = new LDAPMessage(1, bindRequest);
-    w.writeMessage(message);
+    w.writeMessage(new LDAPMessage(1, bindRequest));
 
-    message = r.readMessage();
+    final LDAPMessage message = r.readMessage();
     BindResponseProtocolOp bindResponse = message.getBindResponseProtocolOp();
 //  assertEquals(InvocationCounterPlugin.waitForPostResponse(), 1);
     assertEquals(bindResponse.getResultCode(), expected);
@@ -2204,9 +2077,9 @@
     debugInfo(tn, "Ending test successfully");
   }
 
-  private CSN ECLCompatWriteReadAllOps(long firstChangeNumber) throws Exception
+  private CSN ECLCompatWriteReadAllOps(long firstChangeNumber, String testName) throws Exception
   {
-    String tn = "ECLCompatWriteReadAllOps/" + firstChangeNumber;
+    String tn = testName + "-ECLCompatWriteReadAllOps/" + firstChangeNumber;
     debugInfo(tn, "Starting test\n\n");
     LDAPReplicationDomain domain = null;
     try
@@ -2224,17 +2097,16 @@
       CSN[] csns = generateCSNs(4, SERVER_ID_1);
 
       // Publish DEL
-      DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
+      DeleteMsg delMsg = newDeleteMsg("uid=" + tn + "-1," + TEST_ROOT_DN_STRING, csns[0], user1entryUUID);
       server01.publish(delMsg);
       debugInfo(tn, " publishes " + delMsg.getCSN());
 
       // Publish ADD
-      String lentry =
-          "dn: uid="+tn+"2," + TEST_ROOT_DN_STRING + "\n"
+      Entry entry = TestCaseUtils.entryFromLdifString(
+          "dn: uid=" + tn + "-2," + TEST_ROOT_DN_STRING + "\n"
           + "objectClass: top\n"
           + "objectClass: domain\n"
-          + "entryUUID: "+user1entryUUID+"\n";
-      Entry entry = TestCaseUtils.entryFromLdifString(lentry);
+          + "entryUUID: " + user1entryUUID + "\n");
       AddMsg addMsg = new AddMsg(
           csns[1],
           entry.getName(),
@@ -2247,7 +2119,7 @@
       debugInfo(tn, " publishes " + addMsg.getCSN());
 
       // Publish MOD
-      DN baseDN = DN.valueOf("uid="+tn+"3," + TEST_ROOT_DN_STRING);
+      DN baseDN = DN.valueOf("uid="+tn+"-3," + TEST_ROOT_DN_STRING);
       List<Modification> mods = createMods("description", "new value");
       ModifyMsg modMsg = new ModifyMsg(csns[2], baseDN, mods, user1entryUUID);
       server01.publish(modMsg);
@@ -2255,7 +2127,7 @@
 
       // Publish modDN
       ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null,
-          DN.valueOf("uid="+tn+"4," + TEST_ROOT_DN_STRING), // entryDN
+          DN.valueOf("uid="+tn+"-4," + TEST_ROOT_DN_STRING), // entryDN
           RDN.decode("uid="+tn+"new4"), // new rdn
           true,  // deleteoldrdn
           TEST_ROOT_DN2); // new superior
@@ -2265,8 +2137,8 @@
       server01.publish(modDNMsg);
       debugInfo(tn, " publishes " + modDNMsg.getCSN());
 
-      String filter = "(targetdn=*" + tn + "*,o=test)";
-      InternalSearchOperation searchOp = searchOnChangelog(filter, 4, tn, SUCCESS);
+      InternalSearchOperation searchOp =
+          searchOnChangelog("(targetdn=*" + tn + "*,o=test)", 4, tn, SUCCESS);
 
       // test 4 entries returned
       final LDIFWriter ldifWriter = getLDIFWriter();
@@ -2276,7 +2148,7 @@
       stop(server01);
 
       // Test with filter on change number
-      filter =
+      String filter =
           "(&(targetdn=*" + tn + "*,o=test)"
             + "(&(changenumber>=" + firstChangeNumber + ")"
               + "(changenumber<=" + (firstChangeNumber + 3) + ")))";
@@ -2339,7 +2211,7 @@
       long firstChangeNumber, int i, String tn, CSN csn)
   {
     final long changeNumber = firstChangeNumber + i;
-    final String targetDN = "uid=" + tn + (i + 1) + "," + TEST_ROOT_DN_STRING;
+    final String targetDN = "uid=" + tn + "-" + (i + 1) + "," + TEST_ROOT_DN_STRING;
 
     assertDNEquals(resultEntry, changeNumber);
     checkValue(resultEntry, "changenumber", String.valueOf(changeNumber));
@@ -2352,9 +2224,11 @@
 
   private void assertDNEquals(SearchResultEntry resultEntry, long changeNumber)
   {
-    String actualDN = resultEntry.getName().toNormalizedString();
-    String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
-    assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
+    final String actualDN = resultEntry.getName().toNormalizedString();
+    final String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
+    assertThat(actualDN)
+        .as("Unexpected DN for entry " + resultEntry)
+        .isEqualToIgnoringCase(expectedDN);
   }
 
   private void ECLCompatReadFrom(long firstChangeNumber, Object csn) throws Exception
@@ -2548,7 +2422,7 @@
     while (!cnIndexDB.isEmpty())
     {
       debugInfo(tn, "cnIndexDB.count=" + cnIndexDB.count());
-      Thread.sleep(200);
+      Thread.sleep(10);
     }
 
     debugInfo(tn, "Ending test with success");
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index bf5c2da..23c9318 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -128,7 +128,11 @@
   private ChangeNumberIndexDB cnIndexDB;
   @Mock
   private ReplicationDomainDB domainDB;
-  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors;
+
+  private List<DN> eclEnabledDomains;
+  private MultiDomainDBCursor multiDomainCursor;
+  private Map<Pair<DN, Integer>, SequentialDBCursor> replicaDBCursors;
+  private Map<DN, DomainDBCursor> domainDBCursors;
   private ChangelogState initialState;
   private Map<DN, ServerState> domainNewestCSNs;
   private ChangeNumberIndexer cnIndexer;
@@ -153,13 +157,18 @@
   public void setup() throws Exception
   {
     MockitoAnnotations.initMocks(this);
-    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
-    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
 
+    multiDomainCursor = new MultiDomainDBCursor(domainDB);
     initialState = new ChangelogState();
     initialCookie = new MultiDomainServerState();
-    cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+    replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+    domainDBCursors = new HashMap<DN, DomainDBCursor>();
     domainNewestCSNs = new HashMap<DN, ServerState>();
+
+    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
+    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
+    when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn(
+        multiDomainCursor);
   }
 
   @AfterMethod
@@ -173,15 +182,17 @@
   @Test
   public void emptyDBNoDS() throws Exception
   {
-    startCNIndexer(BASE_DN1);
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBOneDS() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -192,10 +203,11 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void nonEmptyDBOneDS() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     addReplica(BASE_DN1, serverId1);
     setCNIndexDBInitialRecords(msg1);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
@@ -206,9 +218,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSs() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     // simulate messages received out of order
@@ -224,9 +237,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsDifferentDomains() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN2, serverId2);
-    startCNIndexer(BASE_DN1, BASE_DN2);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -259,8 +273,9 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -287,12 +302,13 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void nonEmptyDBTwoDSs() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     setCNIndexDBInitialRecords(msg1, msg2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
@@ -312,9 +328,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
@@ -329,10 +346,11 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(ADMIN_DATA_DN, serverId1);
     addReplica(BASE_DN1, serverId2);
     addReplica(BASE_DN1, serverId3);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     // cn=admin data will does not participate in the external changelog
@@ -350,8 +368,9 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -371,8 +390,9 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -390,9 +410,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -407,9 +428,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneGoingOffline() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -440,10 +462,11 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -473,12 +496,13 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     publishUpdateMsg(msg1);
     initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1));
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
 
     // blocked until we receive info for serverId2
     assertExternalChangelogContent();
@@ -517,13 +541,14 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
     final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
     publishUpdateMsg(msg2, msg3);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     // MCP moves forward because serverId1 is not really offline
@@ -540,9 +565,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneKilled() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -562,10 +588,26 @@
 
   private void addReplica(DN baseDN, int serverId) throws Exception
   {
-    final SequentialDBCursor cursor = new SequentialDBCursor();
-    cursors.put(Pair.of(baseDN, serverId), cursor);
-    when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
-        .thenReturn(cursor);
+    final SequentialDBCursor replicaDBCursor = new SequentialDBCursor();
+    replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor);
+
+    if (isECLEnabledDomain2(baseDN))
+    {
+      DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
+      if (domainDBCursor == null)
+      {
+        domainDBCursor = new DomainDBCursor(baseDN, domainDB);
+        domainDBCursors.put(baseDN, domainDBCursor);
+
+        multiDomainCursor.addDomain(baseDN, null);
+        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class)))
+            .thenReturn(domainDBCursor);
+      }
+      domainDBCursor.addReplicaDB(serverId, null);
+      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
+          .thenReturn(replicaDBCursor);
+    }
+
     when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
         getDomainNewestCSNs(baseDN));
     initialState.addServerIdToDomain(serverId, baseDN);
@@ -582,21 +624,26 @@
     return serverState;
   }
 
-  private void startCNIndexer(DN... eclEnabledDomains)
+  private void startCNIndexer()
   {
-    final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
     cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
     {
       @Override
       protected boolean isECLEnabledDomain(DN baseDN)
       {
-        return eclEnabledDomainList.contains(baseDN);
+        return isECLEnabledDomain2(baseDN);
       }
+
     };
     cnIndexer.start();
     waitForWaitingState(cnIndexer);
   }
 
+  private boolean isECLEnabledDomain2(DN baseDN)
+  {
+    return eclEnabledDomains.contains(baseDN);
+  }
+
   private void stopCNIndexer() throws Exception
   {
     if (cnIndexer != null)
@@ -631,7 +678,8 @@
         final CSN csn = newestMsg.getCSN();
         when(cnIndexDB.getNewestRecord()).thenReturn(
             new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
-        final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId()));
+        final SequentialDBCursor cursor =
+            replicaDBCursors.get(Pair.of(baseDN, csn.getServerId()));
         cursor.add(newestMsg);
       }
       initialCookie.update(msg.getBaseDN(), msg.getCSN());
@@ -643,7 +691,7 @@
     for (ReplicatedUpdateMsg msg : msgs)
     {
       final SequentialDBCursor cursor =
-          cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
+          replicaDBCursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
       if (msg.isEmptyCursor())
       {
         cursor.add(null);
@@ -746,11 +794,4 @@
     };
   }
 
-  @Test(dataProvider = "precedingCSNDataProvider")
-  public void getPrecedingCSN(CSN start, CSN expected)
-  {
-    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState);
-    CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
-    assertThat(precedingCSN).isEqualTo(expected);
-  }
 }
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index 5f54b10..da8bcd0 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,8 +25,8 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collections;
+import java.util.Iterator;
 
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.replication.protocol.UpdateMsg;
@@ -45,6 +45,20 @@
 public class CompositeDBCursorTest extends DirectoryServerTestCase
 {
 
+  private final class ConcreteCompositeDBCursor extends CompositeDBCursor<String>
+  {
+    @Override
+    protected void incorporateNewCursors() throws ChangelogException
+    {
+    }
+
+    @Override
+    protected Iterator<String> removedCursorsIterator()
+    {
+      return Collections.EMPTY_LIST.iterator();
+    }
+  }
+
   private UpdateMsg msg1;
   private UpdateMsg msg2;
   private UpdateMsg msg3;
@@ -173,8 +187,6 @@
         of(msg4, baseDN1));
   }
 
-  // TODO : this test fails because msg2 is returned twice
-  @Test(enabled=false)
   public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
   {
     final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
@@ -220,16 +232,12 @@
   private CompositeDBCursor<String> newCompositeDBCursor(
       Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
   {
-    final Map<DBCursor<UpdateMsg>, String> cursorsMap =
-        new HashMap<DBCursor<UpdateMsg>, String>();
+    final CompositeDBCursor<String> cursor = new ConcreteCompositeDBCursor();
     for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
     {
-      // The cursors in the composite are expected to be pointing
-      // to first record available
-      pair.getFirst().next();
-      cursorsMap.put(pair.getFirst(), pair.getSecond());
+      cursor.addCursor(pair.getFirst(), pair.getSecond());
     }
-    return new CompositeDBCursor<String>(cursorsMap, true);
+    return cursor;
   }
 
   private void assertInOrder(final CompositeDBCursor<String> compCursor,

--
Gitblit v1.10.0