From c5135432faf9bbbcd496ea160d59755fba31012c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 21 Nov 2013 16:17:00 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB 

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java      |   88 ++++
 opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java                               |    4 
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |  376 ++++++++++++++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |  344 +++++++++++++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java   |  145 +++---
 opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java                                       |   65 ++
 opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java                            |   34 +
 opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java                               |   92 ++-
 opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java                            |   25 +
 9 files changed, 1,045 insertions(+), 128 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index 7e2cea2..537cc34 100644
--- a/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -69,8 +69,7 @@
    * @param mdss The provided string representation of the state.
    * @throws DirectoryException when the string has an invalid format
    */
-  public MultiDomainServerState(String mdss)
-          throws DirectoryException
+  public MultiDomainServerState(String mdss) throws DirectoryException
   {
     list = splitGenStateToServerStates(mdss);
   }
@@ -105,18 +104,13 @@
 
     synchronized(this)
     {
-      int serverId =  csn.getServerId();
       ServerState oldServerState = list.get(baseDN);
       if (oldServerState == null)
-        oldServerState = new ServerState();
-
-      if (csn.isNewerThan(oldServerState.getCSN(serverId)))
       {
-        oldServerState.update(csn);
+        oldServerState = new ServerState();
         list.put(baseDN, oldServerState);
-        return true;
       }
-      return false;
+      return oldServerState.update(csn);
     }
   }
 
@@ -132,7 +126,24 @@
    */
   public void update(DN baseDN, ServerState serverState)
   {
-    list.put(baseDN, serverState);
+    for (CSN csn : serverState)
+    {
+      update(baseDN, csn);
+    }
+  }
+
+  /**
+   * Update the current object with the provided multi domain server state.
+   *
+   * @param state
+   *          The provided multi domain server state.
+   */
+  public void update(MultiDomainServerState state)
+  {
+    for (Entry<DN, ServerState> entry : state.list.entrySet())
+    {
+      update(entry.getKey(), entry.getValue());
+    }
   }
 
   /**
@@ -142,7 +153,7 @@
   @Override
   public String toString()
   {
-    StringBuilder res = new StringBuilder();
+    final StringBuilder res = new StringBuilder();
     if (list != null && !list.isEmpty())
     {
       for (Entry<DN, ServerState> entry : list.entrySet())
@@ -163,7 +174,6 @@
     buffer.append(this);
   }
 
-
   /**
    * Tests if the state is empty.
    *
@@ -182,6 +192,19 @@
   }
 
   /**
+   * Returns the ServerState associated to the provided replication domain's
+   * baseDN.
+   *
+   * @param baseDN
+   *          the replication domain's baseDN
+   * @return the associated ServerState
+   */
+  public ServerState get(DN baseDN)
+  {
+    return list.get(baseDN);
+  }
+
+  /**
    * Test if this object equals the provided other object.
    * @param other The other object with which we want to test equality.
    * @return      Returns True if this equals other, else return false.
@@ -211,6 +234,22 @@
   }
 
   /**
+   * Test if this object covers the provided CSN for the provided baseDN.
+   *
+   * @param baseDN
+   *          The provided baseDN.
+   * @param csn
+   *          The provided CSN.
+   * @return true when this object covers the provided CSN for the provided
+   *         baseDN.
+   */
+  public boolean cover(DN baseDN, CSN csn)
+  {
+    final ServerState state = list.get(baseDN);
+    return state != null && state.cover(csn);
+  }
+
+  /**
    * Splits the provided generalizedServerState being a String with the
    * following syntax: "domain1:state1;domain2:state2;..." to a Map of (domain
    * DN, domain ServerState).
@@ -224,7 +263,7 @@
   public static Map<DN, ServerState> splitGenStateToServerStates(
       String multiDomainServerState) throws DirectoryException
   {
-    Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
+    final Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>();
     if (multiDomainServerState != null && multiDomainServerState.length() > 0)
     {
       try
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 39c637e..b92dc07 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -210,6 +210,31 @@
       ServerState startAfterServerState) throws ChangelogException;
 
   /**
+   * Generates a {@link DBCursor} for one replicaDB for the specified
+   * replication domain and serverId starting after the provided {@link CSN}.
+   * <p>
+   * The cursor is already advanced to the records after the CSN.
+   * <p>
+   * When the cursor is not used anymore, client code MUST call the
+   * {@link DBCursor#close()} method to free the resources and locks used by the
+   * cursor.
+   *
+   * @param baseDN
+   *          the replication domain baseDN of the replicaDB
+   * @param serverId
+   *          the serverId of the replicaDB
+   * @param startAfterCSN
+   *          Starting point for the ReplicaDB cursor. If the CSN is null, then
+   *          start from the oldest CSN for this replicaDB
+   * @return a non null {@link DBCursor}
+   * @throws ChangelogException
+   *           If a database problem happened
+   * @see #getCursorFrom(DN, CSN)
+   */
+  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN)
+      throws ChangelogException;
+
+  /**
    * Publishes the provided change to the changelog DB for the specified
    * serverId and replication domain. After a change has been successfully
    * published, it becomes available to be returned by the External ChangeLog.
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
new file mode 100644
index 0000000..c82bc4e
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -0,0 +1,376 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DirectoryException;
+
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
+/**
+ * Thread responsible for inserting replicated changes into the ChangeNumber
+ * Index DB (CNIndexDB for short). Only changes older than the medium
+ * consistency point are inserted in the CNIndexDB. As a consequence this class
+ * is also responsible for maintaining the medium consistency point.
+ */
+public class ChangeNumberIndexer extends DirectoryThread
+{
+  /** The tracer object for the debug logger. */
+  private static final DebugTracer TRACER = getTracer();
+
+  private final ChangelogDB changelogDB;
+  /** Only used for initialization, and then discarded. */
+  private ChangelogState changelogState;
+
+  /*
+   * previousCookie and mediumConsistencyPoint must be thread safe, because
+   * 1) initialization can happen while the replication server starts receiving
+   * updates 2) many updates can happen concurrently. This solution also avoids
+   * using a queue that could fill up before we have consumed all its content.
+   */
+  /**
+   * Stores the value of the cookie before the change currently processed is
+   * inserted in the DB. After insert, it is updated with the CSN of the change
+   * currently processed (thus becoming the "current" cookie just before the
+   * change is returned.
+   */
+  private final MultiDomainServerState previousCookie =
+      new MultiDomainServerState();
+
+  /**
+   * Holds the medium consistency point for the current replication server.
+   *
+   * @see <a href=
+   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
+   * >OpenDJ Domain Names for a description of what the medium consistency point
+   * is</a>
+   */
+  private final MultiDomainServerState mediumConsistencyPoint =
+      new MultiDomainServerState();
+
+  /**
+   * Composite cursor across all the replicaDBs for all the replication domains.
+   * It is volatile to ensure it supports concurrent update. Each time it is
+   * used more than once in a method, the method must take a local copy to
+   * ensure the cursor does not get updated in the middle of the method.
+   */
+  private volatile CompositeDBCursor<DN> crossDomainDBCursor;
+
+  /**
+   * New cursors for this Map must be created from the {@link #run()} method,
+   * i.e. from the same thread that will make use of them. If this rule is not
+   * obeyed, then a JE exception will be thrown about
+   * "Non-transactional Cursors may not be used in multiple threads;".
+   */
+  private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
+      new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
+  /** This map can be updated by multiple threads. */
+  private ConcurrentMap<Integer, DN> newCursors =
+      new ConcurrentSkipListMap<Integer, DN>();
+
+  /**
+   * Builds a ChangeNumberIndexer object.
+   *
+   * @param changelogDB
+   *          the changelogDB
+   * @param changelogState
+   *          the changelog state used for initialization
+   */
+  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
+  {
+    super("Change number indexer");
+    this.changelogDB = changelogDB;
+    this.changelogState = changelogState;
+  }
+
+  /**
+   * Ensures the medium consistency point is updated by heartbeats.
+   *
+   * @param baseDN
+   *          the baseDN of the domain for which the heartbeat is published
+   * @param heartbeatCSN
+   *          the CSN coming from the heartbeat
+   */
+  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
+  {
+    mediumConsistencyPoint.update(baseDN, heartbeatCSN);
+    final CompositeDBCursor<DN> localCursor = crossDomainDBCursor;
+    final DN changeBaseDN = localCursor.getData();
+    final CSN changeCSN = localCursor.getRecord().getCSN();
+    tryNotify(changeBaseDN, changeCSN);
+  }
+
+  /**
+   * Ensures the medium consistency point is updated by UpdateMsg.
+   *
+   * @param baseDN
+   *          the baseDN of the domain for which the heartbeat is published
+   * @param updateMsg
+   *          the updateMsg that will update the medium consistency point
+   * @throws ChangelogException
+   *           If a database problem happened
+   */
+  public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
+      throws ChangelogException
+  {
+    final CSN csn = updateMsg.getCSN();
+    mediumConsistencyPoint.update(baseDN, csn);
+    newCursors.put(csn.getServerId(), baseDN);
+    tryNotify(baseDN, csn);
+  }
+
+  /**
+   * Notifies the Change number indexer thread if it will be able to do some
+   * work.
+   */
+  private void tryNotify(final DN baseDN, final CSN csn)
+  {
+    if (mediumConsistencyPoint.cover(baseDN, csn))
+    {
+      synchronized (this)
+      {
+        notify();
+      }
+    }
+  }
+
+  private void initialize() throws ChangelogException, DirectoryException
+  {
+    final ChangeNumberIndexRecord newestRecord =
+        changelogDB.getChangeNumberIndexDB().getNewestRecord();
+    if (newestRecord != null)
+    {
+      previousCookie.update(
+          new MultiDomainServerState(newestRecord.getPreviousCookie()));
+    }
+
+    // initialize the cross domain DB cursor
+    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
+    for (Entry<DN, List<Integer>> entry
+        : changelogState.getDomainToServerIds().entrySet())
+    {
+      final DN baseDN = entry.getKey();
+      for (Integer serverId : entry.getValue())
+      {
+        final ServerState previousSS = previousCookie.get(baseDN);
+        final CSN csn = previousSS != null ? previousSS.getCSN(serverId) : null;
+        ensureCursorExists(baseDN, serverId, csn);
+      }
+
+      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
+      mediumConsistencyPoint.update(baseDN, latestKnownState);
+    }
+
+    crossDomainDBCursor = newCompositeDBCursor();
+    if (newestRecord != null)
+    {
+      // restore the "previousCookie" state before shutdown
+      final UpdateMsg record = crossDomainDBCursor.getRecord();
+      if (!record.getCSN().equals(newestRecord.getCSN()))
+      {
+        // TODO JNR remove
+        throw new RuntimeException("They do not equal! recordCSN="
+            + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN());
+      }
+      // TODO JNR is it possible to use the following line instead?
+      // previousCookie.update(newestRecord.getBaseDN(), record.getCSN());
+      // TODO JNR would this mean updating the if above?
+      previousCookie.update(crossDomainDBCursor.getData(), record.getCSN());
+      crossDomainDBCursor.next();
+    }
+
+    // this will not be used any more. Discard for garbage collection.
+    this.changelogState = null;
+  }
+
+  private CompositeDBCursor<DN> newCompositeDBCursor() throws ChangelogException
+  {
+    final Map<DBCursor<UpdateMsg>, DN> cursors =
+        new HashMap<DBCursor<UpdateMsg>, DN>();
+    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry
+        : this.allCursors.entrySet())
+    {
+      for (Entry<Integer, DBCursor<UpdateMsg>> entry2
+          : entry.getValue().entrySet())
+      {
+        cursors.put(entry2.getValue(), entry.getKey());
+      }
+    }
+    final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors);
+    result.next();
+    return result;
+  }
+
+  private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn)
+      throws ChangelogException
+  {
+    Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN);
+    if (map == null)
+    {
+      map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>();
+      allCursors.put(baseDN, map);
+    }
+    DBCursor<UpdateMsg> cursor = map.get(serverId);
+    if (cursor == null)
+    {
+      final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
+      cursor = domainDB.getCursorFrom(baseDN, serverId, csn);
+      map.put(serverId, cursor);
+      return false;
+    }
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void run()
+  {
+    try
+    {
+      /*
+       * initialize here to allow fast application start up and avoid errors due
+       * cursors being created in a different thread to the one where they are
+       * used.
+       */
+      initialize();
+    }
+    catch (DirectoryException e)
+    {
+      // TODO JNR error message i18n
+      if (debugEnabled())
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      return;
+    }
+    catch (ChangelogException e)
+    {
+      // TODO Auto-generated catch block
+      if (debugEnabled())
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      return;
+    }
+
+    while (!isShutdownInitiated())
+    {
+      try
+      {
+        createNewCursors();
+
+        final UpdateMsg msg = crossDomainDBCursor.getRecord();
+        if (msg == null)
+        {
+          synchronized (this)
+          {
+            wait();
+          }
+          // advance cursor, success/failure will be checked later
+          crossDomainDBCursor.next();
+          // loop to check whether new changes have been added to the ReplicaDBs
+          continue;
+        }
+
+        final CSN csn = msg.getCSN();
+        final DN baseDN = crossDomainDBCursor.getData();
+        // FIXME problem: what if the serverId is not part of the ServerState?
+        // right now, thread will be blocked
+        if (!mediumConsistencyPoint.cover(baseDN, csn))
+        {
+          // the oldest record to insert is newer than the medium consistency
+          // point. Let's wait for a change that can be published.
+          synchronized (this)
+          {
+            // double check to protect against a missed call to notify()
+            if (!mediumConsistencyPoint.cover(baseDN, csn))
+            {
+              wait();
+              // loop to check if changes older than the medium consistency
+              // point have been added to the ReplicaDBs
+              continue;
+            }
+          }
+        }
+
+        // OK, the oldest change is older than the medium consistency point
+        // let's publish it to the CNIndexDB
+        final ChangeNumberIndexRecord record =
+            new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn);
+        changelogDB.getChangeNumberIndexDB().addRecord(record);
+        // update, so it becomes the previous cookie for the next change
+        previousCookie.update(baseDN, csn);
+
+        // advance cursor, success/failure will be checked later
+        crossDomainDBCursor.next();
+      }
+      catch (ChangelogException e)
+      {
+        if (debugEnabled())
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        // TODO JNR error message i18n
+      }
+      catch (InterruptedException ignored)
+      {
+        // was shutdown called?
+      }
+    }
+  }
+
+  private void createNewCursors() throws ChangelogException
+  {
+    if (!newCursors.isEmpty())
+    {
+      boolean newCursorAdded = false;
+      for (Iterator<Entry<Integer, DN>> iter = newCursors.entrySet().iterator();
+          iter.hasNext();)
+      {
+        final Entry<Integer, DN> entry = iter.next();
+        if (!ensureCursorExists(entry.getValue(), entry.getKey(), null))
+        {
+          newCursorAdded = true;
+        }
+        iter.remove();
+      }
+      if (newCursorAdded)
+      {
+        crossDomainDBCursor = newCompositeDBCursor();
+      }
+    }
+  }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 7f8e3c3..2e0e7c7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -27,6 +27,7 @@
 package org.opends.server.replication.server.changelog.je;
 
 import java.util.*;
+import java.util.Map.Entry;
 
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.protocol.UpdateMsg;
@@ -38,28 +39,33 @@
  * {@link DBCursor} implementation that iterates across a Collection of
  * {@link DBCursor}s, advancing from the oldest to the newest change cross all
  * cursors.
+ *
+ * @param <Data>
+ *          The type of data associated with each cursor
  */
-final class CompositeDBCursor implements DBCursor<UpdateMsg>
+final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
 {
 
-  private UpdateMsg currentChange;
-  private final List<DBCursor<UpdateMsg>> exhaustedCursors =
-      new ArrayList<DBCursor<UpdateMsg>>();
+  private UpdateMsg currentRecord;
+  private Data currentData;
+  private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
+      new HashMap<DBCursor<UpdateMsg>, Data>();
   /**
    * The cursors are sorted based on the current change of each cursor to
    * consider the next change across all available cursors.
    */
-  private final NavigableSet<DBCursor<UpdateMsg>> cursors =
-      new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
-      {
-        @Override
-        public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
-        {
-          final CSN csn1 = o1.getRecord().getCSN();
-          final CSN csn2 = o2.getRecord().getCSN();
-          return CSN.compare(csn1, csn2);
-        }
-      });
+  private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
+      new TreeMap<DBCursor<UpdateMsg>, Data>(
+          new Comparator<DBCursor<UpdateMsg>>()
+          {
+            @Override
+            public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
+            {
+              final CSN csn1 = o1.getRecord().getCSN();
+              final CSN csn2 = o2.getRecord().getCSN();
+              return CSN.compare(csn1, csn2);
+            }
+          });
 
   /**
    * Builds a CompositeDBCursor using the provided collection of cursors.
@@ -67,11 +73,11 @@
    * @param cursors
    *          the cursors that will be iterated upon.
    */
-  public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors)
+  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
   {
-    for (DBCursor<UpdateMsg> cursor : cursors)
+    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
     {
-      add(cursor);
+      put(entry);
     }
   }
 
@@ -83,41 +89,46 @@
     {
       // try to recycle empty cursors in case the underlying ReplicaDBs received
       // new changes. Copy the List to avoid ConcurrentModificationExceptions.
-      final DBCursor<UpdateMsg>[] copy =
-          exhaustedCursors.toArray(new DBCursor[exhaustedCursors.size()]);
+      final Map<DBCursor<UpdateMsg>, Data> copy =
+          new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
       exhaustedCursors.clear();
-      for (DBCursor<UpdateMsg> cursor : copy)
+      for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
       {
-        cursor.next();
-        add(cursor);
+        entry.getKey().next();
+        put(entry);
       }
     }
 
     if (cursors.isEmpty())
     {
       // no cursors are left with changes.
-      currentChange = null;
+      currentRecord = null;
+      currentData = null;
       return false;
     }
 
     // To keep consistent the cursors' order in the SortedSet, it is necessary
     // to remove and eventually add again a cursor (after moving it forward).
-    final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
-    currentChange = cursor.getRecord();
+    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
+    final DBCursor<UpdateMsg> cursor = entry.getKey();
+    currentRecord = cursor.getRecord();
+    currentData = entry.getValue();
     cursor.next();
-    add(cursor);
+    put(entry);
     return true;
   }
 
-  private void add(DBCursor<UpdateMsg> cursor)
+  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
   {
+    final DBCursor<UpdateMsg> cursor = entry.getKey();
+    final Data data = entry.getValue();
     if (cursor.getRecord() != null)
     {
-      this.cursors.add(cursor);
+      this.cursors.put(cursor, data);
     }
     else
     {
-      this.exhaustedCursors.add(cursor);
+      this.exhaustedCursors.put(cursor, data);
     }
   }
 
@@ -125,23 +136,34 @@
   @Override
   public UpdateMsg getRecord()
   {
-    return currentChange;
+    return currentRecord;
+  }
+
+  /**
+   * Returns the data associated to the cursor that returned the current record.
+   *
+   * @return the data associated to the cursor that returned the current record.
+   */
+  public Data getData()
+  {
+    return currentData;
   }
 
   /** {@inheritDoc} */
   @Override
   public void close()
   {
-    StaticUtils.close(cursors);
-    StaticUtils.close(exhaustedCursors);
+    StaticUtils.close(cursors.keySet());
+    StaticUtils.close(exhaustedCursors.keySet());
   }
 
   /** {@inheritDoc} */
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + " currentChange=" + currentChange
-        + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors;
+    return getClass().getSimpleName() + " currentRecord=" + currentRecord
+        + " currentData=" + currentData + " openCursors=" + cursors
+        + " exhaustedCursors=" + exhaustedCursors;
   }
 
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index 0b1c599..f502a7c 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -154,7 +154,7 @@
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + " currentChange=" + currentChange + ""
-        + replicaDB;
+    return getClass().getSimpleName() + " currentChange=" + currentChange
+        + " replicaDB=" + replicaDB;
   }
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
new file mode 100644
index 0000000..3754ac4
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -0,0 +1,344 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.lang.Thread.State;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.mockito.ArgumentCaptor;
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+import org.opends.server.util.Pair;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+@SuppressWarnings("javadoc")
+public class ChangeNumberIndexerTest extends DirectoryServerTestCase
+{
+
+  private static final class ReplicatedUpdateMsg extends UpdateMsg
+  {
+
+    private final DN baseDN;
+    private final boolean emptyCursor;
+
+    public ReplicatedUpdateMsg(DN baseDN, CSN csn)
+    {
+      this(baseDN, csn, false);
+    }
+
+    public ReplicatedUpdateMsg(DN baseDN, CSN csn, boolean emptyCursor)
+    {
+      super(csn, null);
+      this.baseDN = baseDN;
+      this.emptyCursor = emptyCursor;
+    }
+
+    public DN getBaseDN()
+    {
+      return baseDN;
+    }
+
+    public boolean isEmptyCursor()
+    {
+      return emptyCursor;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString()
+    {
+      return "csn=" + getCSN() + ", baseDN=" + baseDN;
+    }
+  }
+
+  private static DN BASE_DN;
+  private static final int serverId1 = 101;
+  private static final int serverId2 = 102;
+
+  private ChangelogDB changelogDB;
+  private ChangeNumberIndexDB cnIndexDB;
+  private ReplicationDomainDB domainDB;
+  private Map<Pair<DN, Integer>, SequentialDBCursor> cursors =
+      new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
+  private ChangelogState initialState;
+  private ChangeNumberIndexer indexer;
+  private MultiDomainServerState previousCookie;
+
+  @BeforeClass
+  public static void classSetup() throws Exception
+  {
+    TestCaseUtils.startFakeServer();
+    BASE_DN = DN.decode("dc=example,dc=com");
+  }
+
+  @AfterClass
+  public static void classTearDown() throws Exception
+  {
+    TestCaseUtils.shutdownFakeServer();
+  }
+
+  @BeforeMethod
+  public void setup() throws Exception
+  {
+    changelogDB = mock(ChangelogDB.class);
+    cnIndexDB = mock(ChangeNumberIndexDB.class);
+    domainDB = mock(ReplicationDomainDB.class);
+    when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
+    when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
+
+    initialState = new ChangelogState();
+    previousCookie = new MultiDomainServerState();
+  }
+
+
+  private static final String EMPTY_DB_NO_DS = "emptyDBNoDS";
+
+  @Test
+  public void emptyDBNoDS() throws Exception
+  {
+    startIndexer();
+    verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
+  }
+
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBOneInitialDS() throws Exception
+  {
+    addReplica(BASE_DN, serverId1);
+    startIndexer();
+
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+    publishUpdateMsg(msg1);
+
+    assertAddedRecords(msg1);
+  }
+
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void nonEmptyDBOneInitialDS() throws Exception
+  {
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+    addReplica(BASE_DN, serverId1);
+    setDBInitialRecords(msg1);
+    startIndexer();
+
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId1, 2);
+    publishUpdateMsg(msg2);
+
+    assertAddedRecords(msg2);
+  }
+
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBTwoInitialDSs() throws Exception
+  {
+    addReplica(BASE_DN, serverId1);
+    addReplica(BASE_DN, serverId2);
+    startIndexer();
+
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+    publishUpdateMsg(msg2, msg1);
+
+    assertAddedRecords(msg1, msg2);
+  }
+
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void nonEmptyDBTwoInitialDSs() throws Exception
+  {
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+    addReplica(BASE_DN, serverId1);
+    addReplica(BASE_DN, serverId2);
+    setDBInitialRecords(msg1, msg2);
+    startIndexer();
+
+    final ReplicatedUpdateMsg msg3 = msg(BASE_DN, serverId2, 3);
+    final ReplicatedUpdateMsg msg4 = msg(BASE_DN, serverId1, 4);
+    publishUpdateMsg(msg3, msg4);
+
+    assertAddedRecords(msg3, msg4);
+  }
+
+  @Test(enabled = false, dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBTwoCursorsOneEmptyForSomeTime() throws Exception
+  {
+    // TODO JNR make this tests work
+    addReplica(BASE_DN, serverId1);
+    addReplica(BASE_DN, serverId2);
+    startIndexer();
+
+    final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN, serverId2, 1);
+    final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN, serverId2);
+    final ReplicatedUpdateMsg msg2Sid1 = msg(BASE_DN, serverId1, 2);
+    final ReplicatedUpdateMsg msg3Sid2 = msg(BASE_DN, serverId2, 3);
+    // simulate no messages received during some time for replica 2
+    publishUpdateMsg(msg1Sid2, emptySid2, emptySid2, emptySid2, msg3Sid2, msg2Sid1);
+
+    assertAddedRecords(msg1Sid2, msg2Sid1, msg3Sid2);
+  }
+
+  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+  public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
+  {
+    addReplica(BASE_DN, serverId1);
+    startIndexer();
+
+    final ReplicatedUpdateMsg msg1 = msg(BASE_DN, serverId1, 1);
+    publishUpdateMsg(msg1);
+
+    addReplica(BASE_DN, serverId2);
+    final ReplicatedUpdateMsg msg2 = msg(BASE_DN, serverId2, 2);
+    publishUpdateMsg(msg2);
+
+    assertAddedRecords(msg1, msg2);
+  }
+
+  private void addReplica(DN baseDN, int serverId) throws Exception
+  {
+    final SequentialDBCursor cursor = new SequentialDBCursor();
+    cursors.put(Pair.of(baseDN, serverId), cursor);
+    when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
+        .thenReturn(cursor);
+    when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(new ServerState());
+    initialState.addServerIdToDomain(serverId, baseDN);
+  }
+
+  private void startIndexer()
+  {
+    indexer = new ChangeNumberIndexer(changelogDB, initialState);
+    indexer.start();
+    waitForWaitingState(indexer);
+  }
+
+  private ReplicatedUpdateMsg msg(DN baseDN, int serverId, long time)
+  {
+    return new ReplicatedUpdateMsg(baseDN, new CSN(time, 0, serverId));
+  }
+
+  private ReplicatedUpdateMsg emptyCursor(DN baseDN, int serverId)
+  {
+    return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true);
+  }
+
+  private void setDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
+  {
+    // Initialize the previous cookie that will be used to compare the records
+    // added to the CNIndexDB at the end of this test
+    for (int i = 0; i < msgs.length; i++)
+    {
+      ReplicatedUpdateMsg msg = msgs[i];
+      if (i + 1 == msgs.length)
+      {
+        final ReplicatedUpdateMsg newestMsg = msg;
+        final DN baseDN = newestMsg.getBaseDN();
+        final CSN csn = newestMsg.getCSN();
+        when(cnIndexDB.getNewestRecord()).thenReturn(
+            new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn));
+        final SequentialDBCursor cursor =
+            cursors.get(Pair.of(baseDN, csn.getServerId()));
+        cursor.add(newestMsg);
+        cursor.next(); // simulate the cursor had been initialized with this change
+      }
+      previousCookie.update(msg.getBaseDN(), msg.getCSN());
+    }
+  }
+
+  private void publishUpdateMsg(ReplicatedUpdateMsg... msgs) throws Exception
+  {
+    for (ReplicatedUpdateMsg msg : msgs)
+    {
+      final SequentialDBCursor cursor =
+          cursors.get(Pair.of(msg.getBaseDN(), msg.getCSN().getServerId()));
+      if (msg.isEmptyCursor())
+      {
+        cursor.add(null);
+      }
+      else
+      {
+        cursor.add(msg);
+      }
+    }
+    for (ReplicatedUpdateMsg msg : msgs)
+    {
+      if (!msg.isEmptyCursor())
+      {
+        indexer.publishUpdateMsg(msg.getBaseDN(), msg);
+      }
+    }
+    waitForWaitingState(indexer);
+  }
+
+  private void waitForWaitingState(final Thread t)
+  {
+    State state = t.getState();
+    while (!state.equals(State.WAITING) && !state.equals(State.TERMINATED))
+    {
+      Thread.yield();
+      state = t.getState();
+    }
+    assertThat(state).isEqualTo(State.WAITING);
+  }
+
+  private void assertAddedRecords(ReplicatedUpdateMsg... msgs) throws Exception
+  {
+    final ArgumentCaptor<ChangeNumberIndexRecord> arg =
+        ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
+    verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
+    final List<ChangeNumberIndexRecord> allValues = arg.getAllValues();
+
+    // recheck it was not called more than expected
+    assertThat(allValues).hasSameSizeAs(msgs);
+    for (int i = 0; i < msgs.length; i++)
+    {
+      final ReplicatedUpdateMsg msg = msgs[i];
+      final ChangeNumberIndexRecord record = allValues.get(i);
+      // check content in order
+      String description = "expected: <" + msg + ">, but got: <" + record + ">";
+      assertThat(record.getBaseDN()).as(description).isEqualTo(msg.getBaseDN());
+      assertThat(record.getCSN()).as(description).isEqualTo(msg.getCSN());
+      assertThat(record.getPreviousCookie()).as(description).isEqualTo(previousCookie.toString());
+      previousCookie.update(msg.getBaseDN(), msg.getCSN());
+    }
+  }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index 46354c9..2f0a47e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -26,69 +26,31 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.util.Pair;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.opends.server.util.Pair.*;
 import static org.testng.Assert.*;
 
-@SuppressWarnings("javadoc")
+@SuppressWarnings({ "javadoc", "unchecked" })
 public class CompositeDBCursorTest extends DirectoryServerTestCase
 {
 
-  private static class MyDBCursor implements DBCursor<UpdateMsg>
-  {
-
-    private final List<UpdateMsg> msgs;
-    private UpdateMsg current;
-
-    public MyDBCursor(UpdateMsg... msgs)
-    {
-      this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs));
-      next();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public UpdateMsg getRecord()
-    {
-      return this.current;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean next()
-    {
-      if (!this.msgs.isEmpty())
-      {
-        this.current = this.msgs.remove(0);
-        return true;
-      }
-      this.current = null;
-      return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void close()
-    {
-      // nothing to do
-    }
-
-  }
-
   private UpdateMsg msg1;
   private UpdateMsg msg2;
   private UpdateMsg msg3;
   private UpdateMsg msg4;
+  private String baseDN1 = "dc=forgerock,dc=com";
+  private String baseDN2 = "dc=example,dc=com";
 
   @BeforeClass
   public void setupMsgs()
@@ -102,60 +64,89 @@
   @Test
   public void emptyCursor() throws Exception
   {
-    final CompositeDBCursor compCursor = newCompositeDBCursor(new MyDBCursor());
+    final CompositeDBCursor<String> compCursor =
+        newCompositeDBCursor(of(new SequentialDBCursor(), baseDN1));
     assertInOrder(compCursor);
   }
 
   @Test
   public void oneElementCursor() throws Exception
   {
-    final CompositeDBCursor compCursor =
-        newCompositeDBCursor(new MyDBCursor(msg1));
-    assertInOrder(compCursor, msg1);
+    final CompositeDBCursor<String> compCursor =
+        newCompositeDBCursor(of(new SequentialDBCursor(msg1), baseDN1));
+    assertInOrder(compCursor, of(msg1, baseDN1));
   }
 
   @Test
   public void twoElementsCursor() throws Exception
   {
-    final CompositeDBCursor compCursor =
-        newCompositeDBCursor(new MyDBCursor(msg1, msg2));
-    assertInOrder(compCursor, msg1, msg2);
+    final CompositeDBCursor<String> compCursor =
+        newCompositeDBCursor(of(new SequentialDBCursor(msg1, msg2), baseDN1));
+    assertInOrder(compCursor,
+        of(msg1, baseDN1),
+        of(msg2, baseDN1));
   }
 
   @Test
   public void twoEmptyCursors() throws Exception
   {
-    final CompositeDBCursor compCursor = newCompositeDBCursor(
-        new MyDBCursor(),
-        new MyDBCursor());
+    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+        of(new SequentialDBCursor(), baseDN1),
+        of(new SequentialDBCursor(), baseDN2));
     assertInOrder(compCursor);
   }
 
   @Test
   public void twoOneElementCursors() throws Exception
   {
-    final CompositeDBCursor compCursor = newCompositeDBCursor(
-        new MyDBCursor(msg2),
-        new MyDBCursor(msg1));
-    assertInOrder(compCursor, msg1, msg2);
+    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+        of(new SequentialDBCursor(msg2), baseDN1),
+        of(new SequentialDBCursor(msg1), baseDN2));
+    assertInOrder(compCursor,
+        of(msg1, baseDN2),
+        of(msg2, baseDN1));
   }
 
   @Test
   public void twoTwoElementCursors() throws Exception
   {
-    final CompositeDBCursor compCursor = newCompositeDBCursor(
-        new MyDBCursor(msg2, msg3),
-        new MyDBCursor(msg1, msg4));
-    assertInOrder(compCursor, msg1, msg2, msg3, msg4);
+    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+        of(new SequentialDBCursor(msg2, msg3), baseDN1),
+        of(new SequentialDBCursor(msg1, msg4), baseDN2));
+    assertInOrder(compCursor,
+        of(msg1, baseDN2),
+        of(msg2, baseDN1),
+        of(msg3, baseDN1),
+        of(msg4, baseDN2));
   }
 
   @Test
   public void recycleTwoElementCursors() throws Exception
   {
-    final CompositeDBCursor compCursor = newCompositeDBCursor(
-        new MyDBCursor(msg2, null, msg3),
-        new MyDBCursor(null, msg1, msg4));
-    assertInOrder(compCursor, msg1, msg2, msg3, msg4);
+    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+        of(new SequentialDBCursor(msg2, null, msg3), baseDN1),
+        of(new SequentialDBCursor(null, msg1, msg4), baseDN2));
+    assertInOrder(compCursor,
+        of(msg1, baseDN2),
+        of(msg2, baseDN1),
+        of(msg3, baseDN1),
+        of(msg4, baseDN2));
+  }
+
+  @Test
+  public void recycleTwoElementCursorsTODOJNR() throws Exception
+  {
+    SequentialDBCursor cursor1 = new SequentialDBCursor(msg2, null, msg3);
+    SequentialDBCursor cursor2 = new SequentialDBCursor(null, msg1, msg4);
+    cursor1.next();
+    cursor2.next();
+    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+        of(cursor1, baseDN1),
+        of(cursor2, baseDN2));
+    assertInOrder(compCursor,
+        of(msg1, baseDN2),
+        of(msg3, baseDN1),
+        of(msg4, baseDN2));
   }
 
   private UpdateMsg newUpdateMsg(int t)
@@ -163,18 +154,26 @@
     return new UpdateMsg(new CSN(t, t, t), new byte[t]);
   }
 
-  private CompositeDBCursor newCompositeDBCursor(DBCursor<UpdateMsg>... cursors)
+  private CompositeDBCursor<String> newCompositeDBCursor(
+      Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
   {
-    return new CompositeDBCursor(Arrays.asList(cursors));
+    final Map<DBCursor<UpdateMsg>, String> cursorsMap =
+        new HashMap<DBCursor<UpdateMsg>, String>();
+    for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
+    {
+      cursorsMap.put(pair.getFirst(), pair.getSecond());
+    }
+    return new CompositeDBCursor<String>(cursorsMap);
   }
 
-  private void assertInOrder(final CompositeDBCursor compCursor,
-      UpdateMsg... msgs) throws ChangelogException
+  private void assertInOrder(final CompositeDBCursor<String> compCursor,
+      Pair<UpdateMsg, String>... expecteds) throws ChangelogException
   {
-    for (UpdateMsg msg : msgs)
+    for (final Pair<UpdateMsg, String> expected : expecteds)
     {
       assertTrue(compCursor.next());
-      assertSame(compCursor.getRecord(), msg);
+      assertSame(compCursor.getRecord(), expected.getFirst());
+      assertSame(compCursor.getData(), expected.getSecond());
     }
     assertFalse(compCursor.next());
     compCursor.close();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
new file mode 100644
index 0000000..063fd27
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
@@ -0,0 +1,88 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+
+@SuppressWarnings("javadoc")
+class SequentialDBCursor implements DBCursor<UpdateMsg>
+{
+
+  private final List<UpdateMsg> msgs;
+  private UpdateMsg current;
+
+  public SequentialDBCursor(UpdateMsg... msgs)
+  {
+    this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs));
+    next();
+  }
+
+  public void add(UpdateMsg msg)
+  {
+    this.msgs.add(msg);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public UpdateMsg getRecord()
+  {
+    return current;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean next()
+  {
+    if (!msgs.isEmpty())
+    {
+      current = msgs.remove(0);
+      return current != null;
+    }
+    current = null;
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    // nothing to do
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return "currentRecord=" + current + " nextMessages=" + msgs;
+  }
+
+}
\ No newline at end of file
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
index 6f55686..b536725 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/types/AttributeTypeConstants.java
@@ -26,8 +26,9 @@
  */
 package org.opends.server.types;
 
-import java.util.Collections;
+import java.util.Arrays;
 
+import org.opends.server.api.AttributeSyntax;
 import org.opends.server.schema.OIDSyntax;
 
 /**
@@ -38,13 +39,36 @@
 public interface AttributeTypeConstants
 {
 
+  AttributeSyntax<?> OID_SYNTAX = new OIDSyntax();
+
   AttributeType OBJECT_CLASS = new AttributeType(
       "( 2.5.4.0 NAME 'objectClass' EQUALITY objectIdentifierMatch "
           + "SYNTAX 1.3.6.1.4.1.1466.115.121.1.38 X-ORIGIN 'RFC 2256' )",
-      "objectClass", Collections.singletonList("objectClass"), "2.5.4.0", null,
-      null, new OIDSyntax(), AttributeUsage.USER_APPLICATIONS, false, false,
-      false, false);
+      "objectClass", Arrays.asList("objectClass"), "2.5.4.0",
+      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
+      false, false, false, false);
 
-  AttributeType[] ALL = { OBJECT_CLASS, };
+  AttributeType ORGANIZATION_NAME = new AttributeType(
+      "( 2.5.4.10 NAME ( 'o' 'organizationName' ) SUP name X-ORIGIN 'RFC 4519' )",
+      "organizationName", Arrays.asList("o", "organizationName"), "2.5.4.10",
+      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
+      false, false, false, false);
+
+  AttributeType ORGANIZATIONAL_UNIT_NAME = new AttributeType(
+      "( 2.5.4.11 NAME ( 'ou' 'organizationalUnitName' ) SUP name X-ORIGIN 'RFC 4519' )",
+      "organizationalUnitName", Arrays.asList("ou", "organizationalUnitName"), "2.5.4.11",
+      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
+      false, false, false, false);
+
+  AttributeType DOMAIN_COMPONENT = new AttributeType(
+      "( 0.9.2342.19200300.100.1.25 NAME ( 'dc' 'domainComponent' ) "
+          + "EQUALITY caseIgnoreIA5Match SUBSTR caseIgnoreIA5SubstringsMatch"
+          + "SYNTAX 1.3.6.1.4.1.1466.115.121.1.26 SINGLE-VALUE X-ORIGIN 'RFC 4519' )",
+      "domainComponent", Arrays.asList("dc", "domainComponent"), "0.9.2342.19200300.100.1.25",
+      null, null, OID_SYNTAX, AttributeUsage.USER_APPLICATIONS,
+      false, false, false, false);
+
+  AttributeType[] ALL = { OBJECT_CLASS, ORGANIZATION_NAME,
+    ORGANIZATIONAL_UNIT_NAME, DOMAIN_COMPONENT, };
 
 }

--
Gitblit v1.10.0