From 6531523345efd08282b1454dc06e572ae9eff848 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Jul 2014 13:25:32 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                   |  244 ++++++----
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java   |   35 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java                            |   37 +
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |  359 +++------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java                                  |  132 +++++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java                             |  130 +++++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |  115 +++-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java                               |  154 ++++--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java                               |  161 +++++-
 9 files changed, 829 insertions(+), 538 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 4639e7e..56b0509 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -26,8 +26,10 @@
 package org.opends.server.replication.server.changelog.api;
 
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
 import org.opends.server.types.DN;
 
 /**
@@ -89,6 +91,26 @@
    */
   void removeDomain(DN baseDN) throws ChangelogException;
 
+  /**
+   * Generates a {@link DBCursor} across all the domains starting after the
+   * provided {@link MultiDomainServerState} for each domain.
+   * <p>
+   * When the cursor is not used anymore, client code MUST call the
+   * {@link DBCursor#close()} method to free the resources and locks used by the
+   * cursor.
+   *
+   * @param startAfterState
+   *          Starting point for each domain cursor. If any {@link ServerState}
+   *          for a domain is null, then start from the oldest CSN for each
+   *          replicaDBs
+   * @return a non null {@link DBCursor}
+   * @throws ChangelogException
+   *           If a database problem happened
+   * @see #getCursorFrom(DN, ServerState)
+   */
+  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState)
+      throws ChangelogException;
+
   // serverId methods
 
   /**
@@ -102,16 +124,17 @@
    *
    * @param baseDN
    *          the replication domain baseDN
-   * @param startAfterServerState
+   * @param startAfterState
    *          Starting point for each ReplicaDB cursor. If any CSN for a
    *          replicaDB is null, then start from the oldest CSN for this
    *          replicaDB
    * @return a non null {@link DBCursor}
    * @throws ChangelogException
    *           If a database problem happened
+   * @see #getCursorFrom(DN, int, CSN)
    */
-  DBCursor<UpdateMsg> getCursorFrom(DN baseDN,
-      ServerState startAfterServerState) throws ChangelogException;
+  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState)
+      throws ChangelogException;
 
   /**
    * Generates a {@link DBCursor} for one replicaDB for the specified
@@ -136,6 +159,14 @@
       throws ChangelogException;
 
   /**
+   * Unregisters the provided cursor from this replication domain.
+   *
+   * @param cursor
+   *          the cursor to unregister.
+   */
+  void unregisterCursor(DBCursor<?> cursor);
+
+  /**
    * Publishes the provided change to the changelog DB for the specified
    * serverId and replication domain. After a change has been successfully
    * published, it becomes available to be returned by the External ChangeLog.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 7592fd9..a194d6d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -29,6 +29,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -45,7 +46,8 @@
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.*;
 import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
-import org.opends.server.replication.server.changelog.je.CompositeDBCursor;
+import org.opends.server.replication.server.changelog.je.DomainDBCursor;
+import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
 import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
 import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
@@ -64,6 +66,7 @@
  */
 public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
 {
+  /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
 
   /**
@@ -77,11 +80,18 @@
    * <li>then check it's not null</li>
    * <li>then close all inside</li>
    * </ol>
-   * When creating a FileReplicaDB, synchronize on the domainMap to avoid
+   * When creating a replicaDB, synchronize on the domainMap to avoid
    * concurrent shutdown.
    */
-  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>>
-      domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
+      new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+  /**
+   * \@GuardedBy("itself")
+   */
+  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+      new HashMap<DN, List<DomainDBCursor>>();
+  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+      new CopyOnWriteArrayList<MultiDomainDBCursor>();
   private ReplicationEnvironment replicationEnv;
   private final ReplicationServerCfg config;
   private final File dbDirectory;
@@ -124,10 +134,10 @@
    *           if a problem occurs opening the supplied directory
    */
   public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
-     throws ConfigException
+      throws ConfigException
   {
-    this.replicationServer = replicationServer;
     this.config = config;
+    this.replicationServer = replicationServer;
     this.dbDirectory = makeDir(config.getReplicationDBDirectory());
   }
 
@@ -175,8 +185,7 @@
    *          the serverId for which to create a ReplicaDB
    * @param server
    *          the ReplicationServer
-   * @return a Pair with the FileReplicaDB and a boolean indicating whether it had
-   *         to be created
+   * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
    * @throws ChangelogException
    *           if a problem occurred with the database
    */
@@ -189,6 +198,19 @@
       final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
       if (result != null)
       {
+        final Boolean dbWasCreated = result.getSecond();
+        if (dbWasCreated)
+        { // new replicaDB => update all cursors with it
+          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+          if (cursors != null && !cursors.isEmpty())
+          {
+            for (DomainDBCursor cursor : cursors)
+            {
+              cursor.addReplicaDB(serverId, null);
+            }
+          }
+        }
+
         return result;
       }
     }
@@ -214,20 +236,26 @@
       // there was already a value associated to the key, let's use it
       return previousValue;
     }
+
+    // we just created a new domain => update all cursors
+    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+    {
+      cursor.addDomain(baseDN, null);
+    }
     return newValue;
   }
 
   private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
       final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
   {
-    // happy path: the FileReplicaDB already exists
+    // happy path: the replicaDB already exists
     FileReplicaDB currentValue = domainMap.get(serverId);
     if (currentValue != null)
     {
       return Pair.of(currentValue, false);
     }
 
-    // unlucky, the FileReplicaDB does not exist: take the hit and synchronize
+    // unlucky, the replicaDB does not exist: take the hit and synchronize
     // on the domainMap to create a new ReplicaDB
     synchronized (domainMap)
     {
@@ -242,7 +270,7 @@
         // The domainMap could have been concurrently removed because
         // 1) a shutdown was initiated or 2) an initialize was called.
         // Return will allow the code to:
-        // 1) shutdown properly or 2) lazily recreate the FileReplicaDB
+        // 1) shutdown properly or 2) lazily recreate the replicaDB
         return null;
       }
 
@@ -371,6 +399,7 @@
       {
         // do nothing: we are already shutting down
       }
+
       replicationEnv.shutdown();
     }
 
@@ -381,10 +410,10 @@
   }
 
   /**
-   * Clears all records from the changelog (does not remove the log itself).
+   * Clears all records from the changelog (does not remove the changelog itself).
    *
    * @throws ChangelogException
-   *           If an error occurs when clearing the log.
+   *           If an error occurs when clearing the changelog.
    */
   public void clearDB() throws ChangelogException
   {
@@ -629,40 +658,57 @@
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
-      throws ChangelogException
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
   {
-    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
-    final MultiDomainServerState offlineReplicas = replicationEnv.getChangelogState().getOfflineReplicas();
-    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
-    for (int serverId : serverIds)
+    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+    registeredMultiDomainCursors.add(cursor);
+    for (DN baseDN : domainToReplicaDBs.keySet())
     {
-      // get the last already sent CSN from that server to get a cursor
-      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
-      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
-      replicaDBCursor.next();
-      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
-      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
     }
-    // recycle exhausted cursors,
-    // because client code will not manage the cursors itself
-    return new CompositeDBCursor<Void>(cursors, true);
+    return cursor;
   }
 
-  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
-      ServerState startAfterServerState)
+  /** {@inheritDoc} */
+  @Override
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+      throws ChangelogException
   {
-    final ServerState domainState = offlineReplicas.getServerState(baseDN);
-    if (domainState != null)
+    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+    for (int serverId : getDomainMap(baseDN).keySet())
     {
-      for (CSN offlineCSN : domainState)
+      // get the last already sent CSN from that server to get a cursor
+      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+      cursor.addReplicaDB(serverId, lastCSN);
+    }
+    return cursor;
+  }
+
+  private DomainDBCursor newDomainDBCursor(final DN baseDN)
+  {
+    synchronized (registeredDomainCursors)
+    {
+      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+      if (cursors == null)
       {
-        if (serverId == offlineCSN.getServerId()
-            && !startAfterServerState.cover(offlineCSN))
-        {
-          return offlineCSN;
-        }
+        cursors = new ArrayList<DomainDBCursor>();
+        registeredDomainCursors.put(baseDN, cursors);
       }
+      cursors.add(cursor);
+      return cursor;
+    }
+  }
+
+  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+  {
+    final MultiDomainServerState offlineReplicas =
+        replicationEnv.getChangelogState().getOfflineReplicas();
+    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+    if (offlineCSN != null
+        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+    {
+      return offlineCSN;
     }
     return null;
   }
@@ -675,28 +721,55 @@
     final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
-      return replicaDB.generateCursorFrom(startAfterCSN);
+      final DBCursor<UpdateMsg> cursor =
+          replicaDB.generateCursorFrom(startAfterCSN);
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+      // TODO JNR if (offlineCSN != null) ??
+      // What about replicas that suddenly become offline?
+      return new ReplicaOfflineCursor(cursor, offlineCSN);
     }
     return EMPTY_CURSOR_REPLICA_DB;
   }
 
   /** {@inheritDoc} */
   @Override
+  public void unregisterCursor(final DBCursor<?> cursor)
+  {
+    if (cursor instanceof MultiDomainDBCursor)
+    {
+      registeredMultiDomainCursors.remove(cursor);
+    }
+    else if (cursor instanceof DomainDBCursor)
+    {
+      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+      synchronized (registeredMultiDomainCursors)
+      {
+        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+        if (cursors != null)
+        {
+          cursors.remove(cursor);
+        }
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
   public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
   {
+    final CSN csn = updateMsg.getCSN();
     final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
-        updateMsg.getCSN().getServerId(), replicationServer);
+        csn.getServerId(), replicationServer);
     final FileReplicaDB replicaDB = pair.getFirst();
-    final boolean wasCreated = pair.getSecond();
-
     replicaDB.add(updateMsg);
+
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
-      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
+      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
       indexer.publishUpdateMsg(baseDN, updateMsg);
     }
-    return wasCreated;
+    return pair.getSecond(); // replica DB was created
   }
 
   /** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index deacf2a..68b9688 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -29,15 +29,8 @@
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.util.StaticUtils.*;
 
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.opends.messages.Message;
@@ -47,18 +40,15 @@
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
-import org.opends.server.util.StaticUtils;
-
-import com.forgerock.opendj.util.Pair;
 
 /**
  * Thread responsible for inserting replicated changes into the ChangeNumber
@@ -84,7 +74,7 @@
   private ChangelogState changelogState;
 
   /*
-   * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
+   * The following MultiDomainServerState fields must be thread safe, because
    * 1) initialization can happen while the replication server starts receiving
    * updates 2) many updates can happen concurrently.
    */
@@ -130,39 +120,7 @@
    *
    * @NonNull
    */
-  @SuppressWarnings("unchecked")
-  private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
-      new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
-
-  /**
-   * New cursors for this Map must be created from the {@link #run()} method,
-   * i.e. from the same thread that will make use of them. If this rule is not
-   * obeyed, then a JE exception will be thrown about
-   * "Non-transactional Cursors may not be used in multiple threads;".
-   */
-  private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
-      new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
-  /**
-   * Holds the newCursors that will have to be created in the next iteration
-   * inside the {@link #run()} method.
-   * <p>
-   * This map can be updated by multiple threads.
-   */
-  private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
-      new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
-          new Comparator<Pair<DN, Integer>>()
-          {
-            @Override
-            public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
-            {
-              final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
-              if (compareBaseDN == 0)
-              {
-                return o1.getSecond().compareTo(o2.getSecond());
-              }
-              return compareBaseDN;
-            }
-          });
+  private MultiDomainDBCursor nextChangeForInsertDBCursor;
 
   /**
    * Builds a ChangeNumberIndexer object.
@@ -232,11 +190,8 @@
       return;
     }
 
-    final CSN csn = updateMsg.getCSN();
-    // only keep the oldest CSN that will be the new cursor's starting point
-    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
     final CSN oldestCSNBefore = getOldestLastAliveCSN();
-    lastAliveCSNs.update(baseDN, csn);
+    lastAliveCSNs.update(baseDN, updateMsg.getCSN());
     tryNotify(oldestCSNBefore);
   }
 
@@ -381,28 +336,25 @@
     for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
     {
       final DN baseDN = entry.getKey();
-      if (!isECLEnabledDomain(baseDN))
+      if (isECLEnabledDomain(baseDN))
       {
-        continue;
-      }
+        for (Integer serverId : entry.getValue())
+        {
+          /*
+           * initialize with the oldest possible CSN in order for medium
+           * consistency to wait for all replicas to be alive before moving
+           * forward
+           */
+          lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
+        }
 
-      for (Integer serverId : entry.getValue())
-      {
-        /*
-         * initialize with the oldest possible CSN in order for medium
-         * consistency to wait for all replicas to be alive before moving
-         * forward
-         */
-        lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
-        // start after the actual CSN when initializing from the previous cookie
-        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
-        ensureCursorExists(baseDN, serverId, csn);
+        ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
+        lastAliveCSNs.update(baseDN, latestKnownState);
       }
-
-      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
-      lastAliveCSNs.update(baseDN, latestKnownState);
     }
-    resetNextChangeForInsertDBCursor();
+
+    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
+    nextChangeForInsertDBCursor.next();
 
     if (newestRecord != null)
     {
@@ -445,68 +397,6 @@
     return new CSN(0, 0, serverId);
   }
 
-  private void resetNextChangeForInsertDBCursor() throws ChangelogException
-  {
-    final Map<DBCursor<UpdateMsg>, DN> cursors =
-        new HashMap<DBCursor<UpdateMsg>, DN>();
-    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry
-        : this.allCursors.entrySet())
-    {
-      for (Entry<Integer, DBCursor<UpdateMsg>> entry2
-          : entry.getValue().entrySet())
-      {
-        cursors.put(entry2.getValue(), entry.getKey());
-      }
-    }
-
-    // CNIndexer manages the cursor itself,
-    // so do not try to recycle exhausted cursors
-    CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
-    result.next();
-    nextChangeForInsertDBCursor = result;
-  }
-
-  private boolean ensureCursorExists(DN baseDN, Integer serverId,
-      CSN startAfterCSN) throws ChangelogException
-  {
-    Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN);
-    if (map == null)
-    {
-      map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>();
-      allCursors.put(baseDN, map);
-    }
-    DBCursor<UpdateMsg> cursor = map.get(serverId);
-    if (cursor == null)
-    {
-      final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
-      cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
-      cursor.next();
-      map.put(serverId, cursor);
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Returns the immediately preceding CSN.
-   *
-   * @param csn
-   *          the CSN to use
-   * @return the immediately preceding CSN or null if the provided CSN is null.
-   */
-  CSN getPrecedingCSN(CSN csn)
-  {
-    if (csn == null)
-    {
-      return null;
-    }
-    if (csn.getSeqnum() > 0)
-    {
-      return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
-    }
-    return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
-  }
-
   /** {@inheritDoc} */
   @Override
   public void initiateShutdown()
@@ -535,28 +425,18 @@
       {
         try
         {
-          if (!domainsToClear.isEmpty())
+          while (!domainsToClear.isEmpty())
           {
-            while (!domainsToClear.isEmpty())
-            {
-              final DN baseDNToClear = domainsToClear.first();
-              removeCursors(baseDNToClear);
-              // Only release the waiting thread
-              // once this domain's state has been cleared.
-              domainsToClear.remove(baseDNToClear);
-            }
-            resetNextChangeForInsertDBCursor();
-          }
-          else
-          {
-            final boolean createdCursors = createNewCursors();
-            final boolean recycledCursors = recycleExhaustedCursors();
-            if (createdCursors || recycledCursors)
-            {
-              resetNextChangeForInsertDBCursor();
-            }
+            final DN baseDNToClear = domainsToClear.first();
+            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
+            // Only release the waiting thread
+            // once this domain's state has been cleared.
+            domainsToClear.remove(baseDNToClear);
           }
 
+          // Do not call DBCursor.next() here
+          // because we might not have consumed the last record,
+          // for example if we could not move the MCP forward
           final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
           if (msg == null)
           {
@@ -568,8 +448,13 @@
               }
               wait();
             }
-            // loop to check whether new changes have been added to the
-            // ReplicaDBs
+            // check whether new changes have been added to the ReplicaDBs
+            nextChangeForInsertDBCursor.next();
+            continue;
+          }
+          else if (msg instanceof ReplicaOfflineMsg)
+          {
+            nextChangeForInsertDBCursor.next();
             continue;
           }
 
@@ -615,39 +500,44 @@
     }
     catch (RuntimeException e)
     {
-      // Nothing can be done about it.
-      // Rely on the DirectoryThread uncaught exceptions handler
-      // for logging error + alert.
-      // Message logged here gives corrective information to the administrator.
-      Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
-          getClass().getSimpleName(), stackTraceToSingleLineString(e));
-      TRACER.debugError(msg.toString());
+      logUnexpectedException(e);
+      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
       throw e;
     }
     catch (Exception e)
     {
-      // Nothing can be done about it.
-      // Rely on the DirectoryThread uncaught exceptions handler
-      // for logging error + alert.
-      // Message logged here gives corrective information to the administrator.
-      Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
-          getClass().getSimpleName(), stackTraceToSingleLineString(e));
-      TRACER.debugError(msg.toString());
+      logUnexpectedException(e);
+      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
       throw new RuntimeException(e);
     }
     finally
     {
-      removeCursors(DN.NULL_DN);
+      nextChangeForInsertDBCursor.close();
+      nextChangeForInsertDBCursor = null;
     }
   }
 
+  /**
+   * Nothing can be done about it.
+   * <p>
+   * Rely on the DirectoryThread uncaught exceptions handler for logging error +
+   * alert.
+   * <p>
+   * Message logged here gives corrective information to the administrator.
+   */
+  private void logUnexpectedException(Exception e)
+  {
+    Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
+        getClass().getSimpleName(), stackTraceToSingleLineString(e));
+    TRACER.debugError(msg.toString());
+  }
+
   private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
       final DN mcBaseDN) throws ChangelogException
   {
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(mcBaseDN, mcCSN);
 
-    boolean callNextOnCursor = true;
     final int mcServerId = mcCSN.getServerId();
     final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
     final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -660,133 +550,22 @@
       }
       else if (offlineCSN.isOlderThan(mcCSN))
       {
-        Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
-            pair = getCursor(mcBaseDN, mcCSN.getServerId());
-        Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
-        if (iter != null && !iter.hasNext())
-        {
-          /*
-           * replica is not back online, Medium consistency point has gone past
-           * its last offline time, and there are no more changes after the
-           * offline CSN in the cursor: remove everything known about it:
-           * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
-           * this replica from the medium consistency RUV.
-           */
-          iter.remove();
-          StaticUtils.close(pair.getFirst());
-          resetNextChangeForInsertDBCursor();
-          callNextOnCursor = false;
-          lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
-          mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
-        }
+        /*
+         * replica is not back online, Medium consistency point has gone past
+         * its last offline time, and there are no more changes after the
+         * offline CSN in the cursor: remove everything known about it:
+         * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
+         * this replica from the medium consistency RUV.
+         */
+        // TODO JNR how to close cursor for offline replica?
+        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
       }
     }
 
-    if (callNextOnCursor)
-    {
-      // advance the cursor we just read from,
-      // success/failure will be checked later
-      nextChangeForInsertDBCursor.next();
-    }
-  }
-
-  private void removeCursors(DN baseDN)
-  {
-    if (nextChangeForInsertDBCursor != null)
-    {
-      nextChangeForInsertDBCursor.close();
-      nextChangeForInsertDBCursor = null;
-    }
-    if (DN.NULL_DN.equals(baseDN))
-    {
-      // close all cursors
-      for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
-      {
-        StaticUtils.close(map.values());
-      }
-      allCursors.clear();
-      newCursors.clear();
-    }
-    else
-    {
-      // close cursors for this DN
-      final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
-      if (map != null)
-      {
-        StaticUtils.close(map.values());
-      }
-      for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();)
-      {
-        if (it.next().getFirst().equals(baseDN))
-        {
-          it.remove();
-        }
-      }
-    }
-  }
-
-  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
-      getCursor(final DN baseDN, final int serverId) throws ChangelogException
-  {
-    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
-        : allCursors.entrySet())
-    {
-      if (baseDN.equals(entry1.getKey()))
-      {
-        for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
-            entry1.getValue().entrySet().iterator(); iter.hasNext();)
-        {
-          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
-          if (serverId == entry2.getKey())
-          {
-            return Pair.of(entry2.getValue(), iter);
-          }
-        }
-      }
-    }
-    return Pair.empty();
-  }
-
-  private boolean recycleExhaustedCursors() throws ChangelogException
-  {
-    boolean succesfullyRecycled = false;
-    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
-    {
-      for (DBCursor<UpdateMsg> cursor : map.values())
-      {
-        // try to recycle it by calling next()
-        if (cursor.getRecord() == null && cursor.next())
-        {
-          succesfullyRecycled = true;
-        }
-      }
-    }
-    return succesfullyRecycled;
-  }
-
-  private boolean createNewCursors() throws ChangelogException
-  {
-    if (!newCursors.isEmpty())
-    {
-      boolean newCursorAdded = false;
-      for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
-          newCursors.entrySet().iterator(); iter.hasNext();)
-      {
-        final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
-        final DN baseDN = entry.getKey().getFirst();
-        final CSN csn = entry.getValue();
-        // start after preceding CSN so the first CSN read will exactly be the
-        // current one
-        final CSN startFromCSN = getPrecedingCSN(csn);
-        if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN))
-        {
-          newCursorAdded = true;
-        }
-        iter.remove();
-      }
-      return newCursorAdded;
-    }
-    return false;
+    // advance the cursor we just read from,
+    // success/failure will be checked later
+    nextChangeForInsertDBCursor.next();
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index ccd27c0..7f271c1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -42,7 +42,7 @@
  * @param <Data>
  *          The type of data associated with each cursor
  */
-public final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
+abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
 {
 
   private static final byte UNINITIALIZED = 0;
@@ -55,8 +55,6 @@
    */
   private byte state = UNINITIALIZED;
 
-  /** Whether this composite should try to recycle exhausted cursors. */
-  private final boolean recycleExhaustedCursors;
   /**
    * These cursors are considered exhausted because they had no new changes the
    * last time {@link DBCursor#next()} was called on them. Exhausted cursors
@@ -67,8 +65,13 @@
   /**
    * The cursors are sorted based on the current change of each cursor to
    * consider the next change across all available cursors.
+   * <p>
+   * New cursors for this Map must be created from the same thread that will
+   * make use of them. When this rule is not obeyed, a JE exception will be
+   * thrown about
+   * "Non-transactional Cursors may not be used in multiple threads;".
    */
-  private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
+  private final TreeMap<DBCursor<UpdateMsg>, Data> cursors =
       new TreeMap<DBCursor<UpdateMsg>, Data>(
           new Comparator<DBCursor<UpdateMsg>>()
           {
@@ -81,25 +84,6 @@
             }
           });
 
-  /**
-   * Builds a CompositeDBCursor using the provided collection of cursors.
-   *
-   * @param cursors
-   *          the cursors that will be iterated upon.
-   * @param recycleExhaustedCursors
-   *          whether a call to {@link #next()} tries to recycle exhausted
-   *          cursors
-   */
-  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
-      boolean recycleExhaustedCursors)
-  {
-    this.recycleExhaustedCursors = recycleExhaustedCursors;
-    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
-    {
-      put(entry);
-    }
-  }
-
   /** {@inheritDoc} */
   @Override
   public boolean next() throws ChangelogException
@@ -108,51 +92,75 @@
     {
       return false;
     }
-    final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
-    state = READY;
-    if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
+
+    if (state == UNINITIALIZED)
     {
-      // try to recycle empty cursors in case the underlying ReplicaDBs received
-      // new changes.
+      state = READY;
+    }
+    else
+    {
+      // Previous state was READY => we must advance the first cursor
+      // because the UpdateMsg it is pointing has already been consumed.
+      // To keep consistent the cursors' order in the SortedSet, it is necessary
+      // to remove the first cursor, then add it again after moving it forward.
+      final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance = cursors.pollFirstEntry();
+      if (cursorToAdvance != null)
+      {
+        addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
+      }
+    }
+
+    recycleExhaustedCursors();
+    removeNoLongerNeededCursors();
+    incorporateNewCursors();
+    return !cursors.isEmpty();
+  }
+
+  private void recycleExhaustedCursors() throws ChangelogException
+  {
+    if (!exhaustedCursors.isEmpty())
+    {
+      // try to recycle exhausted cursors in case the underlying replica DBs received new changes.
       final Map<DBCursor<UpdateMsg>, Data> copy =
           new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
       exhaustedCursors.clear();
       for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
       {
-        entry.getKey().next();
-        put(entry);
-      }
-      final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
-      if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
-      {
-        // if the first cursor was previously an exhausted cursor,
-        // then we have already called next() on it.
-        // Avoid calling it again because we know new changes have been found.
-        return true;
+        addCursor(entry.getKey(), entry.getValue());
       }
     }
-
-    // To keep consistent the cursors' order in the SortedSet, it is necessary
-    // to remove and add again the cursor after moving it forward.
-    if (advanceNonExhaustedCursors)
-    {
-      Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
-      if (firstEntry != null)
-      {
-        final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
-        cursor.next();
-        put(firstEntry);
-      }
-    }
-    // no cursors are left with changes.
-    return !cursors.isEmpty();
   }
 
-  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
+  private void removeNoLongerNeededCursors()
   {
-    final DBCursor<UpdateMsg> cursor = entry.getKey();
-    final Data data = entry.getValue();
-    if (cursor.getRecord() != null)
+    for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> iterator =
+        cursors.entrySet().iterator(); iterator.hasNext();)
+    {
+      final Entry<DBCursor<UpdateMsg>, Data> entry = iterator.next();
+      final Data data = entry.getValue();
+      if (isCursorNoLongerNeededFor(data))
+      {
+        entry.getKey().close();
+        iterator.remove();
+        cursorRemoved(data);
+      }
+    }
+  }
+
+  /**
+   * Adds a cursor to this composite cursor. It first calls
+   * {@link DBCursor#next()} to verify whether it is exhausted or not.
+   *
+   * @param cursor
+   *          the cursor to add to this composite
+   * @param data
+   *          the data associated to the provided cursor
+   * @throws ChangelogException
+   *           if a database problem occurred
+   */
+  protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException
+  {
+    if (cursor.next())
     {
       this.cursors.put(cursor, data);
     }
@@ -166,6 +174,8 @@
   @Override
   public UpdateMsg getRecord()
   {
+    // Cannot call incorporateNewCursors() here because
+    // somebody might have already called DBCursor.getRecord() and read the record
     final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
     if (entry != null)
     {
@@ -175,6 +185,33 @@
   }
 
   /**
+   * Called when implementors should incorporate new cursors into the current
+   * composite DBCursor. Implementors should call
+   * {@link #addCursor(DBCursor, Object)} to do so.
+   *
+   * @throws ChangelogException
+   *           if a database problem occurred
+   */
+  protected abstract void incorporateNewCursors() throws ChangelogException;
+
+  /**
+   * Returns whether the cursor associated to the provided data should be removed.
+   *
+   * @param data the data associated to the cursor to be tested
+   * @return true if the cursor associated to the provided data should be removed,
+   *         false otherwise
+   */
+  protected abstract boolean isCursorNoLongerNeededFor(Data data);
+
+  /**
+   * Notifies that the cursor associated to the provided data has been removed.
+   *
+   * @param data
+   *          the data associated to the removed cursor
+   */
+  protected abstract void cursorRemoved(Data data);
+
+  /**
    * Returns the data associated to the cursor that returned the current record.
    *
    * @return the data associated to the cursor that returned the current record.
@@ -193,8 +230,11 @@
   @Override
   public void close()
   {
+    state = CLOSED;
     StaticUtils.close(cursors.keySet());
     StaticUtils.close(exhaustedCursors.keySet());
+    cursors.clear();
+    exhaustedCursors.clear();
   }
 
   /** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
new file mode 100644
index 0000000..04620c5
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -0,0 +1,132 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+
+/**
+ * Cursor iterating over a replication domain's replica DBs.
+ */
+public class DomainDBCursor extends CompositeDBCursor<Void>
+{
+
+  private final DN baseDN;
+  private final ReplicationDomainDB domainDB;
+
+  private final ConcurrentSkipListMap<Integer, CSN> newReplicas =
+      new ConcurrentSkipListMap<Integer, CSN>();
+  /**
+   * Replaces null CSNs in ConcurrentSkipListMap that does not support null values.
+   */
+  private static final CSN NULL_CSN = new CSN(0, 0, 0);
+
+  /**
+   * Builds a DomainDBCursor instance.
+   *
+   * @param baseDN
+   *          the replication domain baseDN of this cursor
+   * @param domainDB
+   *          the DB for the provided replication domain
+   */
+  public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
+  {
+    this.baseDN = baseDN;
+    this.domainDB = domainDB;
+  }
+
+  /**
+   * Returns the replication domain baseDN of this cursor.
+   *
+   * @return the replication domain baseDN of this cursor.
+   */
+  public DN getBaseDN()
+  {
+    return baseDN;
+  }
+
+  /**
+   * Adds a replicaDB for this cursor to iterate over. Added cursors will be
+   * created and iterated over on the next call to {@link #next()}.
+   *
+   * @param serverId
+   *          the serverId of the replica
+   * @param startAfterCSN
+   *          the CSN after which to start iterating
+   */
+  public void addReplicaDB(int serverId, CSN startAfterCSN)
+  {
+    // only keep the oldest CSN that will be the new cursor's starting point
+    newReplicas.putIfAbsent(serverId, startAfterCSN != null ? startAfterCSN : NULL_CSN);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void incorporateNewCursors() throws ChangelogException
+  {
+    for (Iterator<Entry<Integer, CSN>> iter = newReplicas.entrySet().iterator(); iter.hasNext();)
+    {
+      final Entry<Integer, CSN> pair = iter.next();
+      final int serverId = pair.getKey();
+      final CSN csn = pair.getValue();
+      final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null;
+      final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+      addCursor(cursor, null);
+      iter.remove();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected boolean isCursorNoLongerNeededFor(Void data)
+  {
+    return false; // Not needed
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void cursorRemoved(Void data)
+  {
+    // Not used so far
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    super.close();
+    domainDB.unregisterCursor(this);
+    newReplicas.clear();
+  }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 723b682..4ebee02 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,10 +29,10 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.api.DirectoryThread;
@@ -76,13 +76,20 @@
    * <li>then check it's not null</li>
    * <li>then close all inside</li>
    * </ol>
-   * When creating a JEReplicaDB, synchronize on the domainMap to avoid
+   * When creating a replicaDB, synchronize on the domainMap to avoid
    * concurrent shutdown.
    */
-  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
-      domainToReplicaDBs = new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
-  private ReplicationDbEnv dbEnv;
-  private ReplicationServerCfg config;
+  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
+      new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
+  /**
+   * \@GuardedBy("itself")
+   */
+  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+      new HashMap<DN, List<DomainDBCursor>>();
+  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+      new CopyOnWriteArrayList<MultiDomainDBCursor>();
+  private ReplicationDbEnv replicationEnv;
+  private final ReplicationServerCfg config;
   private final File dbDirectory;
 
   /**
@@ -107,9 +114,9 @@
 
   /** The local replication server. */
   private final ReplicationServer replicationServer;
-  private AtomicBoolean shutdown = new AtomicBoolean();
+  private final AtomicBoolean shutdown = new AtomicBoolean();
 
-  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
+  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
       new DBCursor<UpdateMsg>()
   {
 
@@ -139,7 +146,7 @@
   };
 
   /**
-   * Builds an instance of this class.
+   * Creates a new changelog DB.
    *
    * @param replicationServer
    *          the local replication server.
@@ -148,15 +155,15 @@
    * @throws ConfigException
    *           if a problem occurs opening the supplied directory
    */
-  public JEChangelogDB(ReplicationServer replicationServer,
-      ReplicationServerCfg config) throws ConfigException
+  public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
+      throws ConfigException
   {
     this.config = config;
     this.replicationServer = replicationServer;
     this.dbDirectory = makeDir(config.getReplicationDBDirectory());
   }
 
-  private File makeDir(String dbDirName) throws ConfigException
+  private File makeDir(final String dbDirName) throws ConfigException
   {
     // Check that this path exists or create it.
     final File dbDirectory = getFileForPath(dbDirName);
@@ -173,12 +180,9 @@
       if (debugEnabled())
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
 
-      final MessageBuilder mb = new MessageBuilder();
-      mb.append(e.getLocalizedMessage());
-      mb.append(" ");
-      mb.append(String.valueOf(dbDirectory));
-      Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
-      throw new ConfigException(msg, e);
+      final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ")
+          .append(String.valueOf(dbDirectory));
+      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
     }
   }
 
@@ -223,35 +227,42 @@
    *          the serverId for which to create a ReplicaDB
    * @param server
    *          the ReplicationServer
-   * @return a Pair with the JEReplicaDB and a boolean indicating whether it had
-   *         to be created
+   * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
    * @throws ChangelogException
    *           if a problem occurred with the database
    */
-  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
-      int serverId, ReplicationServer server) throws ChangelogException
+  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
+      final ReplicationServer server) throws ChangelogException
   {
     while (!shutdown.get())
     {
-      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
-          getExistingOrNewDomainMap(baseDN);
-      final Pair<JEReplicaDB, Boolean> result =
-          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
+      final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
+      final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
       if (result != null)
       {
+        final Boolean dbWasCreated = result.getSecond();
+        if (dbWasCreated)
+        { // new replicaDB => update all cursors with it
+          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+          if (cursors != null && !cursors.isEmpty())
+          {
+            for (DomainDBCursor cursor : cursors)
+            {
+              cursor.addReplicaDB(serverId, null);
+            }
+          }
+        }
+
         return result;
       }
     }
-    throw new ChangelogException(
-        ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
+    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
   }
 
-  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
-      DN baseDN)
+  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
   {
     // happy path: the domainMap already exists
-    final ConcurrentMap<Integer, JEReplicaDB> currentValue =
-        domainToReplicaDBs.get(baseDN);
+    final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
     if (currentValue != null)
     {
       return currentValue;
@@ -260,30 +271,33 @@
     // unlucky, the domainMap does not exist: take the hit and create the
     // newValue, even though the same could be done concurrently by another
     // thread
-    final ConcurrentMap<Integer, JEReplicaDB> newValue =
-        new ConcurrentHashMap<Integer, JEReplicaDB>();
-    final ConcurrentMap<Integer, JEReplicaDB> previousValue =
-        domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+    final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>();
+    final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
     if (previousValue != null)
     {
       // there was already a value associated to the key, let's use it
       return previousValue;
     }
+
+    // we just created a new domain => update all cursors
+    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+    {
+      cursor.addDomain(baseDN, null);
+    }
     return newValue;
   }
 
-  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
-      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
-      DN baseDN, ReplicationServer server) throws ChangelogException
+  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
+      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
   {
-    // happy path: the JEReplicaDB already exists
+    // happy path: the replicaDB already exists
     JEReplicaDB currentValue = domainMap.get(serverId);
     if (currentValue != null)
     {
       return Pair.of(currentValue, false);
     }
 
-    // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
+    // unlucky, the replicaDB does not exist: take the hit and synchronize
     // on the domainMap to create a new ReplicaDB
     synchronized (domainMap)
     {
@@ -299,11 +313,11 @@
         // The domainMap could have been concurrently removed because
         // 1) a shutdown was initiated or 2) an initialize was called.
         // Return will allow the code to:
-        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
+        // 1) shutdown properly or 2) lazily recreate the replicaDB
         return null;
       }
 
-      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, dbEnv);
+      final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
       domainMap.put(serverId, newDB);
       return Pair.of(newDB, true);
     }
@@ -316,8 +330,8 @@
     try
     {
       final File dbDir = getFileForPath(config.getReplicationDBDirectory());
-      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
-      final ChangelogState changelogState = dbEnv.getChangelogState();
+      replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
+      final ChangelogState changelogState = replicationEnv.getChangelogState();
       initializeToChangelogState(changelogState);
       if (config.isComputeChangeNumber())
       {
@@ -351,7 +365,7 @@
     }
   }
 
-  private void shutdownCNIndexDB() throws ChangelogException
+  private void shutdownChangeNumberIndexDB() throws ChangelogException
   {
     synchronized (cnIndexDBLock)
     {
@@ -389,7 +403,7 @@
 
     try
     {
-      shutdownCNIndexDB();
+      shutdownChangeNumberIndexDB();
     }
     catch (ChangelogException e)
     {
@@ -410,7 +424,7 @@
       }
     }
 
-    if (dbEnv != null)
+    if (replicationEnv != null)
     {
       // wait for shutdown of the threads holding cursors
       try
@@ -429,7 +443,7 @@
         // do nothing: we are already shutting down
       }
 
-      dbEnv.shutdown();
+      replicationEnv.shutdown();
     }
 
     if (firstException != null)
@@ -439,11 +453,10 @@
   }
 
   /**
-   * Clears all content from the changelog database, but leaves its directory on
-   * the filesystem.
+   * Clears all records from the changelog (does not remove the changelog itself).
    *
    * @throws ChangelogException
-   *           If a database problem happened
+   *           If an error occurs when clearing the changelog.
    */
   public void clearDB() throws ChangelogException
   {
@@ -477,7 +490,7 @@
 
         try
         {
-          shutdownCNIndexDB();
+          shutdownChangeNumberIndexDB();
         }
         catch (ChangelogException e)
         {
@@ -593,7 +606,7 @@
     // 3- clear the changelogstate DB
     try
     {
-      dbEnv.clearGenerationId(baseDN);
+      replicationEnv.clearGenerationId(baseDN);
     }
     catch (ChangelogException e)
     {
@@ -644,7 +657,7 @@
   {
     if (computeChangeNumber)
     {
-      startIndexer(dbEnv.getChangelogState());
+      startIndexer(replicationEnv.getChangelogState());
     }
     else
     {
@@ -682,7 +695,7 @@
       {
         try
         {
-          cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
+          cnIndexDB = new JEChangeNumberIndexDB(this.replicationEnv);
         }
         catch (Exception e)
         {
@@ -704,75 +717,118 @@
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterServerState)
-      throws ChangelogException
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
   {
-    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
-    final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
-    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
-    for (int serverId : serverIds)
+    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+    registeredMultiDomainCursors.add(cursor);
+    for (DN baseDN : domainToReplicaDBs.keySet())
     {
-      // get the last already sent CSN from that server to get a cursor
-      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
-      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
-      replicaDBCursor.next();
-      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
-      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
+      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
     }
-    // recycle exhausted cursors,
-    // because client code will not manage the cursors itself
-    return new CompositeDBCursor<Void>(cursors, true);
+    return cursor;
   }
 
-  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
-      ServerState startAfterServerState)
+  /** {@inheritDoc} */
+  @Override
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
+      throws ChangelogException
   {
-    final ServerState domainState = offlineReplicas.getServerState(baseDN);
-    if (domainState != null)
+    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+    for (int serverId : getDomainMap(baseDN).keySet())
     {
-      for (CSN offlineCSN : domainState)
+      // get the last already sent CSN from that server to get a cursor
+      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+      cursor.addReplicaDB(serverId, lastCSN);
+    }
+    return cursor;
+  }
+
+  private DomainDBCursor newDomainDBCursor(final DN baseDN)
+  {
+    synchronized (registeredDomainCursors)
+    {
+      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+      if (cursors == null)
       {
-        if (serverId == offlineCSN.getServerId()
-            && !startAfterServerState.cover(offlineCSN))
-        {
-          return offlineCSN;
-        }
+        cursors = new ArrayList<DomainDBCursor>();
+        registeredDomainCursors.put(baseDN, cursors);
       }
+      cursors.add(cursor);
+      return cursor;
+    }
+  }
+
+  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+  {
+    final MultiDomainServerState offlineReplicas =
+        replicationEnv.getChangelogState().getOfflineReplicas();
+    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+    if (offlineCSN != null
+        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+    {
+      return offlineCSN;
     }
     return null;
   }
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startAfterCSN)
       throws ChangelogException
   {
     JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
-      return replicaDB.generateCursorFrom(startAfterCSN);
+      final DBCursor<UpdateMsg> cursor =
+          replicaDB.generateCursorFrom(startAfterCSN);
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+      // TODO JNR if (offlineCSN != null) ??
+      // What about replicas that suddenly become offline?
+      return new ReplicaOfflineCursor(cursor, offlineCSN);
     }
-    return EMPTY_CURSOR;
+    return EMPTY_CURSOR_REPLICA_DB;
   }
 
   /** {@inheritDoc} */
   @Override
-  public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
-      throws ChangelogException
+  public void unregisterCursor(final DBCursor<?> cursor)
   {
-    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
-        updateMsg.getCSN().getServerId(), replicationServer);
-    final JEReplicaDB replicaDB = pair.getFirst();
-    final boolean wasCreated = pair.getSecond();
+    if (cursor instanceof MultiDomainDBCursor)
+    {
+      registeredMultiDomainCursors.remove(cursor);
+    }
+    else if (cursor instanceof DomainDBCursor)
+    {
+      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+      synchronized (registeredMultiDomainCursors)
+      {
+        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+        if (cursors != null)
+        {
+          cursors.remove(cursor);
+        }
+      }
+    }
+  }
 
+  /** {@inheritDoc} */
+  @Override
+  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
+  {
+    final CSN csn = updateMsg.getCSN();
+    final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
+        csn.getServerId(), replicationServer);
+    final JEReplicaDB replicaDB = pair.getFirst();
     replicaDB.add(updateMsg);
+
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
-      notifyReplicaOnline(indexer, baseDN, updateMsg.getCSN().getServerId());
+      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
       indexer.publishUpdateMsg(baseDN, updateMsg);
     }
-    return wasCreated;
+    return pair.getSecond(); // replica DB was created
   }
 
   /** {@inheritDoc} */
@@ -792,7 +848,7 @@
   {
     if (indexer.isReplicaOffline(baseDN, serverId))
     {
-      dbEnv.notifyReplicaOnline(baseDN, serverId);
+      replicationEnv.notifyReplicaOnline(baseDN, serverId);
     }
   }
 
@@ -800,7 +856,7 @@
   @Override
   public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
   {
-    dbEnv.notifyReplicaOffline(baseDN, offlineCSN);
+    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
new file mode 100644
index 0000000..5e0fcb7
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -0,0 +1,130 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+
+/**
+ * Cursor iterating over a all the replication domain known to the changelog DB.
+ */
+public class MultiDomainDBCursor extends CompositeDBCursor<DN>
+{
+  private final ReplicationDomainDB domainDB;
+
+  private final ConcurrentSkipListMap<DN, ServerState> newDomains =
+      new ConcurrentSkipListMap<DN, ServerState>();
+  private final ConcurrentSkipListSet<DN> removeDomains =
+      new ConcurrentSkipListSet<DN>();
+
+  /**
+   * Builds a MultiDomainDBCursor instance.
+   *
+   * @param domainDB
+   *          the replication domain management DB
+   */
+  public MultiDomainDBCursor(ReplicationDomainDB domainDB)
+  {
+    this.domainDB = domainDB;
+  }
+
+  /**
+   * Adds a replication domain for this cursor to iterate over. Added cursors
+   * will be created and iterated over on the next call to {@link #next()}.
+   *
+   * @param baseDN
+   *          the replication domain's baseDN
+   * @param startAfterState
+   *          the {@link ServerState} after which to start iterating
+   */
+  public void addDomain(DN baseDN, ServerState startAfterState)
+  {
+    newDomains.put(baseDN,
+        startAfterState != null ? startAfterState : new ServerState());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void incorporateNewCursors() throws ChangelogException
+  {
+    for (Iterator<Entry<DN, ServerState>> iter = newDomains.entrySet().iterator();
+         iter.hasNext();)
+    {
+      final Entry<DN, ServerState> entry = iter.next();
+      final DN baseDN = entry.getKey();
+      final ServerState serverState = entry.getValue();
+      final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState);
+      addCursor(domainDBCursor, baseDN);
+      iter.remove();
+    }
+  }
+
+  /**
+   * Removes a replication domain from this cursor and stops iterating over it.
+   * Removed cursors will be effectively removed on the next call to
+   * {@link #next()}.
+   *
+   * @param baseDN
+   *          the replication domain's baseDN
+   */
+  public void removeDomain(DN baseDN)
+  {
+    removeDomains.add(baseDN);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected boolean isCursorNoLongerNeededFor(DN baseDN)
+  {
+    return removeDomains.contains(baseDN);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  protected void cursorRemoved(DN baseDN)
+  {
+    removeDomains.remove(baseDN);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    super.close();
+    domainDB.unregisterCursor(this);
+    newDomains.clear();
+    removeDomains.clear();
+  }
+
+}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 4f7c836..5258d28 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -127,7 +127,11 @@
   private ChangeNumberIndexDB cnIndexDB;
   @Mock
   private ReplicationDomainDB domainDB;
-  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors;
+
+  private List<DN> eclEnabledDomains;
+  private MultiDomainDBCursor multiDomainCursor;
+  private Map<Pair<DN, Integer>, SequentialDBCursor> replicaDBCursors;
+  private Map<DN, DomainDBCursor> domainDBCursors;
   private ChangelogState initialState;
   private Map<DN, ServerState> domainNewestCSNs;
   private ChangeNumberIndexer cnIndexer;
@@ -152,13 +156,18 @@
   public void setup() throws Exception
   {
     MockitoAnnotations.initMocks(this);
-    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
-    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
 
+    multiDomainCursor = new MultiDomainDBCursor(domainDB);
     initialState = new ChangelogState();
     initialCookie = new MultiDomainServerState();
-    cursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+    replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+    domainDBCursors = new HashMap<DN, DomainDBCursor>();
     domainNewestCSNs = new HashMap<DN, ServerState>();
+
+    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
+    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
+    when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn(
+        multiDomainCursor);
   }
 
   @AfterMethod
@@ -172,15 +181,17 @@
   @Test
   public void emptyDBNoDS() throws Exception
   {
-    startCNIndexer(BASE_DN1);
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
   }
 
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBOneDS() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -191,10 +202,11 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void nonEmptyDBOneDS() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     addReplica(BASE_DN1, serverId1);
     setCNIndexDBInitialRecords(msg1);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
@@ -205,9 +217,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSs() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     // simulate messages received out of order
@@ -223,9 +236,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsDifferentDomains() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN2, serverId2);
-    startCNIndexer(BASE_DN1, BASE_DN2);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -258,8 +272,9 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -286,12 +301,13 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void nonEmptyDBTwoDSs() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     setCNIndexDBInitialRecords(msg1, msg2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
@@ -311,9 +327,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
@@ -328,10 +345,11 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(ADMIN_DATA_DN, serverId1);
     addReplica(BASE_DN1, serverId2);
     addReplica(BASE_DN1, serverId3);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     // cn=admin data will does not participate in the external changelog
@@ -349,8 +367,9 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -370,8 +389,9 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -389,9 +409,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -406,9 +427,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneGoingOffline() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -439,10 +461,11 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -472,12 +495,13 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
     publishUpdateMsg(msg1);
     initialState.addOfflineReplica(BASE_DN1, new CSN(2, 1, serverId1));
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
 
     // blocked until we receive info for serverId2
     assertExternalChangelogContent();
@@ -516,13 +540,14 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
     initialState.addOfflineReplica(BASE_DN1, new CSN(1, 1, serverId1));
     final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
     final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
     publishUpdateMsg(msg2, msg3);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     // MCP moves forward because serverId1 is not really offline
@@ -539,9 +564,10 @@
   @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
   public void emptyDBTwoDSsOneKilled() throws Exception
   {
+    eclEnabledDomains = Arrays.asList(BASE_DN1);
     addReplica(BASE_DN1, serverId1);
     addReplica(BASE_DN1, serverId2);
-    startCNIndexer(BASE_DN1);
+    startCNIndexer();
     assertExternalChangelogContent();
 
     final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
@@ -561,10 +587,26 @@
 
   private void addReplica(DN baseDN, int serverId) throws Exception
   {
-    final SequentialDBCursor cursor = new SequentialDBCursor();
-    cursors.put(Pair.of(baseDN, serverId), cursor);
-    when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
-        .thenReturn(cursor);
+    final SequentialDBCursor replicaDBCursor = new SequentialDBCursor();
+    replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor);
+
+    if (isECLEnabledDomain2(baseDN))
+    {
+      DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
+      if (domainDBCursor == null)
+      {
+        domainDBCursor = new DomainDBCursor(baseDN, domainDB);
+        domainDBCursors.put(baseDN, domainDBCursor);
+
+        multiDomainCursor.addDomain(baseDN, null);
+        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class)))
+            .thenReturn(domainDBCursor);
+      }
+      domainDBCursor.addReplicaDB(serverId, null);
+      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
+          .thenReturn(replicaDBCursor);
+    }
+
     when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
         getDomainNewestCSNs(baseDN));
     initialState.addServerIdToDomain(serverId, baseDN);
@@ -581,21 +623,26 @@
     return serverState;
   }
 
-  private void startCNIndexer(DN... eclEnabledDomains)
+  private void startCNIndexer()
   {
-    final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
     cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
     {
       @Override
       protected boolean isECLEnabledDomain(DN baseDN)
       {
-        return eclEnabledDomainList.contains(baseDN);
+        return isECLEnabledDomain2(baseDN);
       }
+
     };
     cnIndexer.start();
     waitForWaitingState(cnIndexer);
   }
 
+  private boolean isECLEnabledDomain2(DN baseDN)
+  {
+    return eclEnabledDomains.contains(baseDN);
+  }
+
   private void stopCNIndexer() throws Exception
   {
     if (cnIndexer != null)
@@ -630,7 +677,8 @@
         final CSN csn = newestMsg.getCSN();
         when(cnIndexDB.getNewestRecord()).thenReturn(
             new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
-        final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId()));
+        final SequentialDBCursor cursor =
+            replicaDBCursors.get(Pair.of(baseDN, csn.getServerId()));
         cursor.add(newestMsg);
       }
       initialCookie.update(msg.getBaseDN(), msg.getCSN());
@@ -642,7 +690,7 @@
     for (ReplicatedUpdateMsg msg : msgs)
     {
       final SequentialDBCursor cursor =
-          cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
+          replicaDBCursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
       if (msg.isEmptyCursor())
       {
         cursor.add(null);
@@ -745,11 +793,4 @@
     };
   }
 
-  @Test(dataProvider = "precedingCSNDataProvider")
-  public void getPrecedingCSN(CSN start, CSN expected)
-  {
-    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState);
-    CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
-    assertThat(precedingCSN).isEqualTo(expected);
-  }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index d4c32b5..7c76a7a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,11 +25,7 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.opends.server.DirectoryServerTestCase;
-import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
@@ -46,6 +42,25 @@
 public class CompositeDBCursorTest extends DirectoryServerTestCase
 {
 
+  private final class ConcreteCompositeDBCursor extends CompositeDBCursor<String>
+  {
+    @Override
+    protected void incorporateNewCursors() throws ChangelogException
+    {
+    }
+
+    @Override
+    protected boolean isCursorNoLongerNeededFor(String data)
+    {
+      return false;
+    }
+
+    @Override
+    protected void cursorRemoved(String data)
+    {
+    }
+  }
+
   private UpdateMsg msg1;
   private UpdateMsg msg2;
   private UpdateMsg msg3;
@@ -174,8 +189,6 @@
         of(msg4, baseDN1));
   }
 
-  // TODO : this test fails because msg2 is returned twice
-  @Test(enabled=false)
   public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
   {
     final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
@@ -221,16 +234,12 @@
   private CompositeDBCursor<String> newCompositeDBCursor(
       Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
   {
-    final Map<DBCursor<UpdateMsg>, String> cursorsMap =
-        new HashMap<DBCursor<UpdateMsg>, String>();
+    final CompositeDBCursor<String> cursor = new ConcreteCompositeDBCursor();
     for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
     {
-      // The cursors in the composite are expected to be pointing
-      // to first record available
-      pair.getFirst().next();
-      cursorsMap.put(pair.getFirst(), pair.getSecond());
+      cursor.addCursor(pair.getFirst(), pair.getSecond());
     }
-    return new CompositeDBCursor<String>(cursorsMap, true);
+    return cursor;
   }
 
   private void assertInOrder(final CompositeDBCursor<String> compCursor,

--
Gitblit v1.10.0