From 905da506252c98b57cc0cfc82ee5a453c0e15e9b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 14 Aug 2013 09:30:53 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java                  |  155 ++-----
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java             |   13 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java                     |  143 +++---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java |   13 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java            |  536 ++++++++++++++-------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                   |   58 +-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java           |  151 +++----
 opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java           |   43 -
 8 files changed, 527 insertions(+), 585 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index a040f00..2dbda07 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -27,35 +27,21 @@
  */
 package org.opends.server.replication.server;
 
+import java.io.IOException;
+import java.util.*;
+import java.util.zip.DataFormatException;
+
+import org.opends.messages.Message;
+import org.opends.server.replication.common.*;
+import org.opends.server.replication.protocol.*;
+import org.opends.server.types.*;
+
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.replication.common.StatusMachine.*;
 import static org.opends.server.replication.protocol.ProtocolVersion.*;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.zip.DataFormatException;
-
-import org.opends.messages.Message;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachine;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.*;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeBuilder;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.ResultCode;
-
 /**
  * This class defines a server handler, which handles all interaction with a
  * peer server (RS or DS).
@@ -104,11 +90,11 @@
    * @param newGenId The new generation id to take into account
    * @throws IOException If IO error occurred.
    */
-  public void changeStatusForResetGenId(long newGenId)
-  throws IOException
+  public void changeStatusForResetGenId(long newGenId) throws IOException
   {
-    StatusMachineEvent event;
+    final int localRsServerId = replicationServer.getServerId();
 
+    StatusMachineEvent event;
     if (newGenId == -1)
     {
       // The generation id is being made invalid, let's put the DS
@@ -127,9 +113,8 @@
           if (debugEnabled())
           {
             TRACER.debugInfo(
-                "In RS " +
-                replicationServerDomain.getReplicationServer().getServerId() +
-                ". Closing connection to DS " + getServerId() +
+                "In RS " + localRsServerId +
+                ", closing connection to DS " + getServerId() +
                 " for baseDn " + getBaseDN() +
                 " to force reconnection as new local" +
                 " generationId and remote one match and DS is in bad gen id: " +
@@ -140,20 +125,19 @@
           // would rewait the RSD lock that we already must have entering this
           // method. This would lead to a reentrant lock which we do not want.
           // So simply close the session, this will make the hang up appear
-          // after the reader thread that took the RSD lock realeases it.
-          if (session != null)
+          // after the reader thread that took the RSD lock releases it.
+          if (session != null
+              // V4 protocol introduced a StopMsg to properly close the
+              // connection between servers
+             && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
           {
-            // V4 protocol introduces a StopMsg to properly close the
-            // connection between servers
-            if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+            try
             {
-              try
-              {
-                session.publish(new StopMsg());
-              } catch (IOException ioe)
-              {
-                // Anyway, going to close session, so nothing to do
-              }
+              session.publish(new StopMsg());
+            }
+            catch (IOException ioe)
+            {
+              // Anyway, going to close session, so nothing to do
             }
           }
 
@@ -165,12 +149,10 @@
         {
           if (debugEnabled())
           {
-            TRACER.debugInfo(
-                "In RS " +
-                replicationServerDomain.getReplicationServer().getServerId() +
-                ". DS " + getServerId() + " for baseDn " + getBaseDN() +
-                " has already generation id " + newGenId +
-            " so no ChangeStatusMsg sent to him.");
+            TRACER.debugInfo("In RS " + localRsServerId + ". DS "
+                + getServerId() + " for baseDn " + getBaseDN()
+                + " has already generation id " + newGenId
+                + " so no ChangeStatusMsg sent to him.");
           }
           return;
         }
@@ -182,14 +164,13 @@
       }
     }
 
-    if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) &&
-        (status == ServerStatus.FULL_UPDATE_STATUS))
+    if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
+        && status == ServerStatus.FULL_UPDATE_STATUS)
     {
       // Prevent useless error message (full update status cannot lead to bad
       // gen status)
       Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
-          Integer.toString(replicationServerDomain.
-              getReplicationServer().getServerId()),
+              Integer.toString(localRsServerId),
               getBaseDN(),
               Integer.toString(serverId),
               Long.toString(generationId),
@@ -214,11 +195,9 @@
 
     if (debugEnabled())
     {
-      TRACER.debugInfo(
-          "In RS " +
-          replicationServerDomain.getReplicationServer().getServerId() +
-          " Sending change status for reset gen id to " + getServerId() +
-          " for baseDn " + getBaseDN() + ":\n" + csMsg);
+      TRACER.debugInfo("In RS " + localRsServerId
+          + " Sending change status for reset gen id to " + getServerId()
+          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
     }
 
     session.publish(csMsg);
@@ -257,11 +236,9 @@
 
     if (debugEnabled())
     {
-      TRACER.debugInfo(
-          "In RS " +
-          replicationServerDomain.getReplicationServer().getServerId() +
-          " Sending change status from status analyzer to " + getServerId() +
-          " for baseDn " + getBaseDN() + ":\n" + csMsg);
+      TRACER.debugInfo("In RS " + replicationServer.getServerId()
+          + " Sending change status from status analyzer to " + getServerId()
+          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
     }
 
     session.publish(csMsg);
@@ -296,14 +273,13 @@
     // Add the specific DS ones
     attributes.add(Attributes.create("replica", serverURL));
     attributes.add(Attributes.create("connected-to",
-        this.replicationServerDomain.getReplicationServer()
-        .getMonitorInstanceName()));
+        this.replicationServer.getMonitorInstanceName()));
 
     MonitorData md = replicationServerDomain.getDomainMonitorData();
 
     // Oldest missing update
-    Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
-    if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
+    long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
+    if (approxFirstMissingDate > 0)
     {
       Date date = new Date(approxFirstMissingDate);
       attributes.add(Attributes.create(
@@ -314,14 +290,12 @@
     }
 
     // Missing changes
-    long missingChanges = md.getMissingChanges(serverId);
-    attributes.add(Attributes.create("missing-changes", String
-        .valueOf(missingChanges)));
+    attributes.add(Attributes.create("missing-changes",
+        String.valueOf(md.getMissingChanges(serverId))));
 
     // Replication delay
-    long delay = md.getApproxDelay(serverId);
-    attributes.add(Attributes.create("approximate-delay", String
-        .valueOf(delay)));
+    attributes.add(Attributes.create("approximate-delay",
+        String.valueOf(md.getApproxDelay(serverId))));
 
     /* get the Server State */
     AttributeBuilder builder = new AttributeBuilder("server-state");
@@ -541,18 +515,9 @@
       {
         Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get(
           Integer.toString(inServerStartMsg.getServerId()),
-          Integer.toString(replicationServerDomain.getReplicationServer().
-          getServerId()));
+          Integer.toString(replicationServer.getServerId()));
         throw new DirectoryException(ResultCode.OTHER, errMessage);
       }
-      catch (NotSupportedOldVersionPDUException e)
-      {
-        // We do not need to support DS V1 connection, we just accept RS V1
-        // connection:
-        // We just trash the message, log the event for debug purpose and close
-        // the connection
-        throw new DirectoryException(ResultCode.OTHER, null, null);
-      }
       catch (Exception e)
       {
         // We do not need to support DS V1 connection, we just accept RS V1
@@ -588,7 +553,7 @@
     }
     finally
     {
-      if ((replicationServerDomain != null) &&
+      if (replicationServerDomain != null &&
           replicationServerDomain.hasLock())
         replicationServerDomain.release();
     }
@@ -610,21 +575,20 @@
     {
       // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
       startMsg = new ReplServerStartMsg(getReplicationServerId(),
-              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+          getReplicationServerURL(), getBaseDN(), maxRcvWindow,
           replicationServerDomain.getDbServerState(),
           localGenerationId, sslEncryption, getLocalGroupId(),
-          replicationServerDomain.getReplicationServer()
-              .getDegradedStatusThreshold());
+          replicationServer.getDegradedStatusThreshold());
     }
     else
     {
       // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
       startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
-              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+          getReplicationServerURL(), getBaseDN(), maxRcvWindow,
           replicationServerDomain.getDbServerState(),
           localGenerationId, sslEncryption, getLocalGroupId(),
-          replicationServerDomain.getReplicationServer()
-              .getDegradedStatusThreshold(), replicationServer.getWeight(),
+          replicationServer.getDegradedStatusThreshold(),
+          replicationServer.getWeight(),
           replicationServerDomain.getConnectedLDAPservers().size());
     }
 
@@ -651,17 +615,10 @@
   {
     if (serverId != 0)
     {
-      StringBuilder builder = new StringBuilder("Replica DS(");
-      builder.append(serverId);
-      builder.append(") for domain \"");
-      builder.append(replicationServerDomain.getBaseDn());
-      builder.append("\"");
-      return builder.toString();
+      return "Replica DS(" + serverId + ") for domain \""
+          + replicationServerDomain.getBaseDn() + "\"";
     }
-    else
-    {
-      return "Unknown server";
-    }
+    return "Unknown server";
   }
 
   /**
@@ -740,7 +697,7 @@
     else
     {
       // We are an empty ReplicationServer
-      if ((generationId > 0) && (!getServerState().isEmpty()))
+      if (generationId > 0 && !getServerState().isEmpty())
       {
         // If the LDAP server has already sent changes
         // it is not expected to connect to an empty RS
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index eb5ed4e..eae646a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -365,20 +365,19 @@
     if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
     {
       // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
-      startMsg = new ReplServerStartMsg(getReplicationServerId(),
-              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
-          replicationServerDomain.getDbServerState(),
-          localGenerationId, sslEncryption, getLocalGroupId(),
-          replicationServerDomain.getReplicationServer()
-              .getDegradedStatusThreshold());
+       startMsg = new ReplServerStartMsg(getReplicationServerId(),
+           getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+           replicationServerDomain.getDbServerState(),
+           localGenerationId, sslEncryption, getLocalGroupId(),
+           replicationServer.getDegradedStatusThreshold());
     }
     else
     {
       // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg
-      startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
-              getReplicationServerURL(), getBaseDN(), maxRcvWindow,
-          new ServerState(), localGenerationId, sslEncryption,
-          getLocalGroupId(), 0, replicationServer.getWeight(), 0);
+       startMsg = new ReplServerStartDSMsg(getReplicationServerId(),
+           getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+           new ServerState(), localGenerationId, sslEncryption,
+           getLocalGroupId(), 0, replicationServer.getWeight(), 0);
     }
 
     send(startMsg);
@@ -556,15 +555,13 @@
     catch(DirectoryException de)
     {
       TRACER.debugCaught(DebugLogLevel.ERROR, de);
-      if (draftCNDbIter != null)
-        draftCNDbIter.releaseCursor();
+      releaseIterator();
       throw de;
     }
     catch(Exception e)
     {
       TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      if (draftCNDbIter != null)
-        draftCNDbIter.releaseCursor();
+      releaseIterator();
       throw new DirectoryException(
           ResultCode.OPERATIONS_ERROR,
           Message.raw(Category.SYNC,
@@ -917,11 +914,7 @@
   {
     if (debugEnabled())
       TRACER.debugInfo(this + " shutdown()" + draftCNDbIter);
-    if (this.draftCNDbIter != null)
-    {
-      draftCNDbIter.releaseCursor();
-      draftCNDbIter = null;
-    }
+    releaseIterator();
     for (DomainContext domainCtxt : domainCtxts) {
       if (!domainCtxt.unRegisterHandler()) {
         logError(Message.raw(Category.SYNC, Severity.NOTICE,
@@ -934,6 +927,15 @@
     domainCtxts = null;
   }
 
+  private void releaseIterator()
+  {
+    if (this.draftCNDbIter != null)
+    {
+      this.draftCNDbIter.releaseCursor();
+      this.draftCNDbIter = null;
+    }
+  }
+
   /**
    * Request to shutdown the associated writer.
    */
@@ -1112,7 +1114,7 @@
         {
           session.publish(
             new ErrorMsg(
-             replicationServerDomain.getReplicationServer().getServerId(),
+             replicationServer.getServerId(),
              serverId,
              Message.raw(Category.SYNC, Severity.INFORMATION,
                  "Exception raised: " + e.getMessage())));
@@ -1130,11 +1132,9 @@
     registerIntoDomain();
 
     if (debugEnabled())
-      TRACER.debugInfo(
-          this.getClass().getCanonicalName()+ " " + operationId +
-          " initialized: " +
-          " " + dumpState() + " " +
-          " " + clDomCtxtsToString(""));
+      TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId
+          + " initialized: " + " " + dumpState() + " " + " "
+          + clDomCtxtsToString(""));
   }
 
   private void initializeChangelogSearch(StartECLSessionMsg msg)
@@ -1522,12 +1522,8 @@
       searchPhase = UNDEFINED_PHASE;
     }
 
-    if (draftCNDbIter!=null)
-    {
-      // End of INIT_PHASE => always release the iterator
-      draftCNDbIter.releaseCursor();
-      draftCNDbIter = null;
-    }
+    // End of INIT_PHASE => always release the iterator
+    releaseIterator();
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
index 0157f44..9107b2d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -27,14 +27,7 @@
  */
 package org.opends.server.replication.server;
 
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 import org.opends.server.admin.std.server.MonitorProviderCfg;
 import org.opends.server.api.MonitorProvider;
@@ -50,6 +43,8 @@
 import org.opends.server.types.Attributes;
 import org.opends.server.types.InitializationException;
 
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
 /**
  * This class defines a server handler dedicated to the remote LDAP servers
  * connected to a remote Replication Server.
@@ -142,13 +137,15 @@
     this.protocolVersion = protocolVersion;
 
     if (debugEnabled())
-      TRACER.debugInfo(
-        "In " +
-  replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName()+
-        " LWSH for remote server " + this.serverId +
-        " connected to:" + this.replServerHandler.getMonitorInstanceName() +
-        " ()");
-}
+      TRACER.debugInfo("In " + getLocalRSMonitorInstanceName()
+          + " LWSH for remote server " + this.serverId + " connected to:"
+          + this.replServerHandler.getMonitorInstanceName() + " ()");
+  }
+
+  private String getLocalRSMonitorInstanceName()
+  {
+    return rsDomain.getReplicationServer().getMonitorInstanceName();
+  }
 
   /**
    * Creates a DSInfo structure representing this remote DS.
@@ -176,15 +173,11 @@
   public void startHandler()
   {
     if (debugEnabled())
-      TRACER.debugInfo(
-      "In " +
-replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
-      " LWSH for remote server " + this.serverId +
-      " connected to:" + this.replServerHandler.getMonitorInstanceName() +
-          " start");
+      TRACER.debugInfo("In " + getLocalRSMonitorInstanceName()
+          + " LWSH for remote server " + this.serverId + " connected to:"
+          + this.replServerHandler.getMonitorInstanceName() + " start");
     DirectoryServer.deregisterMonitorProvider(this);
     DirectoryServer.registerMonitorProvider(this);
-
   }
 
   /**
@@ -193,10 +186,8 @@
   public void stopHandler()
   {
     if (debugEnabled())
-      TRACER.debugInfo("In "
-          + replServerHandler.getDomain().getReplicationServer()
-              .getMonitorInstanceName() + " LWSH for remote server "
-          + this.serverId + " connected to:"
+      TRACER.debugInfo("In " + getLocalRSMonitorInstanceName()
+          + " LWSH for remote server " + this.serverId + " connected to:"
           + this.replServerHandler.getMonitorInstanceName() + " stop");
     DirectoryServer.deregisterMonitorProvider(this);
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index ed652c7..f55298c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -280,32 +280,13 @@
            *           unlock memory tree
            *           restart as usual
            *   load this change on the delayList
-           *
            */
-          SortedSet<ReplicationIterator> iteratorSortedSet =
-              new TreeSet<ReplicationIterator>(
-                  new ReplicationIteratorComparator());
+          SortedSet<ReplicationIterator> iteratorSortedSet = null;
           try
           {
-            /* fill the lateQueue */
-            for (int serverId : replicationServerDomain.getServers())
-            {
-              ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
-              ReplicationIterator iterator = replicationServerDomain
-                  .getChangelogIterator(serverId, lastCsn);
-              if (iterator != null)
-              {
-                if (iterator.getChange() != null)
-                {
-                  iteratorSortedSet.add(iterator);
-                }
-                else
-                {
-                  iterator.releaseCursor();
-                }
-              }
-            }
+            iteratorSortedSet = collectAllIteratorsWithChanges();
 
+            /* fill the lateQueue */
             // The loop below relies on the fact that it is sorted based
             // on the currentChange of each iterator to consider the next
             // change across all servers.
@@ -320,22 +301,12 @@
               ReplicationIterator iterator = iteratorSortedSet.first();
               iteratorSortedSet.remove(iterator);
               lateQueue.add(iterator.getChange());
-              if (iterator.next())
-              {
-                iteratorSortedSet.add(iterator);
-              }
-              else
-              {
-                iterator.releaseCursor();
-              }
+              addIteratorIfNotEmpty(iteratorSortedSet, iterator);
             }
           }
           finally
           {
-            for (ReplicationIterator iterator : iteratorSortedSet)
-            {
-              iterator.releaseCursor();
-            }
+            releaseAllIterators(iteratorSortedSet);
           }
 
           /*
@@ -343,7 +314,6 @@
            * messages in the replication log so the remote serevr is not
            * late anymore.
            */
-
           if (lateQueue.isEmpty())
           {
             synchronized (msgQueue)
@@ -430,6 +400,19 @@
     return null;
   }
 
+  private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators,
+      ReplicationIterator iter)
+  {
+    if (iter.next())
+    {
+      iterators.add(iter);
+    }
+    else
+    {
+      iter.releaseCursor();
+    }
+  }
+
   /**
    * Get the older Change Number for that server.
    * Returns null when the queue is empty.
@@ -450,7 +433,12 @@
       }
       else
       {
-        if (lateQueue.isEmpty())
+        if (!lateQueue.isEmpty())
+        {
+          UpdateMsg msg = lateQueue.first();
+          result = msg.getChangeNumber();
+        }
+        else
         {
           /*
           following is false AND lateQueue is empty
@@ -460,36 +448,10 @@
           there. So let's take the last change not sent directly from
           the db.
           */
-          SortedSet<ReplicationIterator> iteratorSortedSet =
-              new TreeSet<ReplicationIterator>(
-                  new ReplicationIteratorComparator());
+          SortedSet<ReplicationIterator> iteratorSortedSet = null;
           try
           {
-            // Build a list of candidates iterator (i.e. db i.e. server)
-            for (int serverId : replicationServerDomain.getServers())
-            {
-              // get the last already sent CN from that server
-              ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
-              // get an iterator in this server db from that last change
-              ReplicationIterator iterator =
-                replicationServerDomain.getChangelogIterator(serverId, lastCsn);
-              /*
-              if that iterator has changes, then it is a candidate
-              it is added in the sorted list at a position given by its
-              current change (see ReplicationIteratorComparator).
-              */
-              if (iterator != null)
-              {
-                if (iterator.getChange() != null)
-                {
-                  iteratorSortedSet.add(iterator);
-                }
-                else
-                {
-                  iterator.releaseCursor();
-                }
-              }
-            }
+            iteratorSortedSet = collectAllIteratorsWithChanges();
             UpdateMsg msg = iteratorSortedSet.first().getChange();
             result = msg.getChangeNumber();
           } catch (Exception e)
@@ -497,21 +459,58 @@
             result = null;
           } finally
           {
-            for (ReplicationIterator iterator : iteratorSortedSet)
-            {
-              iterator.releaseCursor();
-            }
+            releaseAllIterators(iteratorSortedSet);
           }
-        } else
-        {
-          UpdateMsg msg = lateQueue.first();
-          result = msg.getChangeNumber();
         }
       }
     }
     return result;
   }
 
+  private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges()
+  {
+    SortedSet<ReplicationIterator> results =
+        new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator());
+
+    // Build a list of candidates iterator (i.e. db i.e. server)
+    for (int serverId : replicationServerDomain.getServers())
+    {
+      // get the last already sent CN from that server
+      ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
+      // get an iterator in this server db from that last change
+      ReplicationIterator iter =
+        replicationServerDomain.getChangelogIterator(serverId, lastCsn);
+      /*
+      if that iterator has changes, then it is a candidate
+      it is added in the sorted list at a position given by its
+      current change (see ReplicationIteratorComparator).
+      */
+      if (iter != null)
+      {
+        if (iter.getChange() != null)
+        {
+          results.add(iter);
+        }
+        else
+        {
+          iter.releaseCursor();
+        }
+      }
+    }
+    return results;
+  }
+
+  private void releaseAllIterators(SortedSet<ReplicationIterator> iterators)
+  {
+    if (iterators != null)
+    {
+      for (ReplicationIterator iter : iterators)
+      {
+        iter.releaseCursor();
+      }
+    }
+  }
+
   /**
    * Get the count of updates sent to this server.
    * @return  The count of update sent to this server.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index e335175..e004623 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -118,7 +118,8 @@
    */
   private final Map<Integer, DbHandler> sourceDbHandlers =
     new ConcurrentHashMap<Integer, DbHandler>();
-  private ReplicationServer replicationServer;
+  /** The ReplicationServer that created the current instance. */
+  private ReplicationServer localReplicationServer;
 
   /** GenerationId management. */
   private volatile long generationId = -1;
@@ -217,16 +218,16 @@
    * Creates a new ReplicationServerDomain associated to the DN baseDn.
    *
    * @param baseDn The baseDn associated to the ReplicationServerDomain.
-   * @param replicationServer the ReplicationServer that created this
+   * @param localReplicationServer the ReplicationServer that created this
    *                          replicationServer cache.
    */
-  public ReplicationServerDomain(
-      String baseDn, ReplicationServer replicationServer)
+  public ReplicationServerDomain(String baseDn,
+      ReplicationServer localReplicationServer)
   {
     this.baseDn = baseDn;
-    this.replicationServer = replicationServer;
+    this.localReplicationServer = localReplicationServer;
     this.assuredTimeoutTimer = new Timer("Replication server RS("
-        + replicationServer.getServerId()
+        + localReplicationServer.getServerId()
         + ") assured timer for domain \"" + baseDn + "\"", true);
 
     DirectoryServer.registerMonitorProvider(this);
@@ -245,9 +246,9 @@
   public void put(UpdateMsg update, ServerHandler sourceHandler)
     throws IOException
   {
-
     ChangeNumber cn = update.getChangeNumber();
-    int id = cn.getServerId();
+    int serverId = cn.getServerId();
+
     sourceHandler.updateServerState(update);
     sourceHandler.incrementInCount();
 
@@ -297,7 +298,7 @@
         {
           // Unknown assured mode: should never happen
           Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
-            Integer.toString(replicationServer.getServerId()),
+            Integer.toString(localReplicationServer.getServerId()),
             assuredMode.toString(), baseDn, update.toString());
           logError(errorMsg);
           assuredMessage = false;
@@ -308,40 +309,11 @@
       }
     }
 
-    // look for the dbHandler that is responsible for the LDAP server which
-    // generated the change.
-    DbHandler dbHandler;
-    synchronized (sourceDbHandlers)
+    if (!publishMessage(update, serverId))
     {
-      dbHandler = sourceDbHandlers.get(id);
-      if (dbHandler == null)
-      {
-        try
-        {
-          dbHandler = replicationServer.newDbHandler(id, baseDn);
-          generationIdSavedStatus = true;
-        } catch (ChangelogException e)
-        {
-          /*
-           * Because of database problem we can't save any more changes
-           * from at least one LDAP server.
-           * This replicationServer therefore can't do it's job properly anymore
-           * and needs to close all its connections and shutdown itself.
-           */
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          replicationServer.shutdown();
-          return;
-        }
-        sourceDbHandlers.put(id, dbHandler);
-      }
+      return;
     }
 
-    // Publish the messages to the source handler
-    dbHandler.add(update);
-
     List<Integer> expectedServers = null;
     if (assuredMessage)
     {
@@ -363,7 +335,7 @@
         // times out)
         AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn);
         assuredTimeoutTimer.schedule(assuredTimeoutTask,
-          replicationServer.getAssuredTimeout());
+            localReplicationServer.getAssuredTimeout());
         // Purge timer every 100 treated messages
         assuredTimeoutTimerPurgeCounter++;
         if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
@@ -408,8 +380,9 @@
           if (debugEnabled())
           {
             TRACER.debugInfo("In Replication Server "
-                + replicationServer.getReplicationPort() + " " + baseDn + " "
-                + replicationServer.getServerId() + " for dn " + baseDn
+                + localReplicationServer.getReplicationPort() + " " + baseDn
+                + " "
+                + localReplicationServer.getServerId() + " for dn " + baseDn
                 + ", update " + update.getChangeNumber()
                 + " will not be sent to replication server "
                 + handler.getServerId() + " with generation id "
@@ -464,7 +437,7 @@
           }
           if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
           {
-            TRACER.debugInfo("In RS " + replicationServer.getServerId()
+            TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
                 + " for dn " + baseDn + ", update " + update.getChangeNumber()
                 + " will not be sent to directory server "
                 + handler.getServerId() + " as it is in full update");
@@ -484,6 +457,44 @@
     }
   }
 
+  private boolean publishMessage(UpdateMsg update, int serverId)
+  {
+    // look for the dbHandler that is responsible for the LDAP server which
+    // generated the change.
+    DbHandler dbHandler;
+    synchronized (sourceDbHandlers)
+    {
+      dbHandler = sourceDbHandlers.get(serverId);
+      if (dbHandler == null)
+      {
+        try
+        {
+          dbHandler = localReplicationServer.newDbHandler(serverId, baseDn);
+          generationIdSavedStatus = true;
+        } catch (ChangelogException e)
+        {
+          /*
+           * Because of database problem we can't save any more changes
+           * from at least one LDAP server.
+           * This replicationServer therefore can't do it's job properly anymore
+           * and needs to close all its connections and shutdown itself.
+           */
+          MessageBuilder mb = new MessageBuilder();
+          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+          mb.append(stackTraceToSingleLineString(e));
+          logError(mb.toMessage());
+          localReplicationServer.shutdown();
+          return false;
+        }
+        sourceDbHandlers.put(serverId, dbHandler);
+      }
+    }
+
+    // Publish the messages to the source handler
+    dbHandler.add(update);
+    return true;
+  }
+
   private NotAssuredUpdateMsg addUpdate(ServerHandler handler,
       UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate,
       boolean assuredMessage, List<Integer> expectedServers)
@@ -557,7 +568,7 @@
     UpdateMsg update, ServerHandler sourceHandler) throws IOException
   {
     ChangeNumber cn = update.getChangeNumber();
-    byte groupId = replicationServer.getGroupId();
+    byte groupId = localReplicationServer.getGroupId();
     byte sourceGroupId = sourceHandler.getGroupId();
     List<Integer> expectedServers = new ArrayList<Integer>();
     List<Integer> wrongStatusServers = new ArrayList<Integer>();
@@ -642,13 +653,13 @@
     ChangeNumber cn = update.getChangeNumber();
     boolean interestedInAcks = false;
     byte safeDataLevel = update.getSafeDataLevel();
-    byte groupId = replicationServer.getGroupId();
+    byte groupId = localReplicationServer.getGroupId();
     byte sourceGroupId = sourceHandler.getGroupId();
     if (safeDataLevel < (byte) 1)
     {
       // Should never happen
       Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get(
-        Integer.toString(replicationServer.getServerId()),
+        Integer.toString(localReplicationServer.getServerId()),
         Byte.toString(safeDataLevel), baseDn, update.toString());
       logError(errorMsg);
     } else if (sourceGroupId == groupId
@@ -799,7 +810,7 @@
              */
             MessageBuilder mb = new MessageBuilder();
             mb.append(ERR_RS_ERROR_SENDING_ACK.get(
-              Integer.toString(replicationServer.getServerId()),
+              Integer.toString(localReplicationServer.getServerId()),
               Integer.toString(origServer.getServerId()),
               cn.toString(), baseDn));
             mb.append(stackTraceToSingleLineString(e));
@@ -862,7 +873,7 @@
           ServerHandler origServer = expectedAcksInfo.getRequesterServer();
           if (debugEnabled())
           {
-            TRACER.debugInfo("In RS " + replicationServer.getServerId()
+            TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
                     + " for "+ baseDn
                     + ", sending timeout for assured update with change "
                     + " number " + cn + " to server id "
@@ -879,7 +890,7 @@
              */
             MessageBuilder mb = new MessageBuilder();
             mb.append(ERR_RS_ERROR_SENDING_ACK.get(
-                Integer.toString(replicationServer.getServerId()),
+                Integer.toString(localReplicationServer.getServerId()),
                 Integer.toString(origServer.getServerId()),
                 cn.toString(), baseDn));
             mb.append(stackTraceToSingleLineString(e));
@@ -987,7 +998,7 @@
     {
       // looks like two connected LDAP servers have the same serverId
       Message message = ERR_DUPLICATE_SERVER_ID.get(
-          replicationServer.getMonitorInstanceName(),
+          localReplicationServer.getMonitorInstanceName(),
           directoryServers.get(handler.getServerId()).toString(),
           handler.toString(), handler.getServerId());
       logError(message);
@@ -1007,7 +1018,8 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+      TRACER.debugInfo("In "
+          + this.localReplicationServer.getMonitorInstanceName()
           + " domain=" + this + " stopServer() on the server handler "
           + handler.getMonitorInstanceName());
     }
@@ -1045,7 +1057,8 @@
         {
           if (debugEnabled())
           {
-            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
+            TRACER.debugInfo("In "
+                + localReplicationServer.getMonitorInstanceName()
                 + " remote server " + handler.getMonitorInstanceName()
                 + " is the last RS/DS to be stopped:"
                 + " stopping monitoring publisher");
@@ -1078,7 +1091,7 @@
             if (debugEnabled())
             {
               TRACER.debugInfo("In "
-                  + replicationServer.getMonitorInstanceName()
+                  + localReplicationServer.getMonitorInstanceName()
                   + " remote server " + handler.getMonitorInstanceName()
                   + " is the last DS to be stopped: stopping status analyzer");
             }
@@ -1128,7 +1141,8 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+      TRACER.debugInfo("In "
+          + this.localReplicationServer.getMonitorInstanceName()
           + " domain=" + this + " stopServer() on the message handler "
           + handler.getMonitorInstanceName());
     }
@@ -1207,8 +1221,8 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In RS "
-          + this.replicationServer.getMonitorInstanceName() + " for " + baseDn
-          + " " + " mayResetGenerationId generationIdSavedStatus="
+          + this.localReplicationServer.getMonitorInstanceName()
+          + " for " + baseDn + " mayResetGenerationId generationIdSavedStatus="
           + generationIdSavedStatus);
     }
 
@@ -1225,7 +1239,7 @@
           if (debugEnabled())
           {
             TRACER.debugInfo("In RS "
-                + this.replicationServer.getMonitorInstanceName() + " for "
+                + this.localReplicationServer.getMonitorInstanceName() + " for "
                 + baseDn + " " + " mayResetGenerationId skip RS"
                 + rsh.getMonitorInstanceName() + " that has different genId");
           }
@@ -1236,7 +1250,7 @@
             if (debugEnabled())
             {
               TRACER.debugInfo("In RS "
-                  + this.replicationServer.getMonitorInstanceName()
+                  + this.localReplicationServer.getMonitorInstanceName()
                   + " for "+ baseDn + " mayResetGenerationId RS"
                   + rsh.getMonitorInstanceName()
                   + " has servers connected to it"
@@ -1252,7 +1266,7 @@
       if (debugEnabled())
       {
         TRACER.debugInfo("In RS "
-            + this.replicationServer.getMonitorInstanceName() + " for "
+            + this.localReplicationServer.getMonitorInstanceName() + " for "
             + baseDn + " "
             + " has servers connected to it - will not reset generationId");
       }
@@ -1292,7 +1306,7 @@
         // looks like two replication servers have the same serverId
         // log an error message and drop this connection.
         Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
-          replicationServer.getMonitorInstanceName(), oldHandler.
+          localReplicationServer.getMonitorInstanceName(), oldHandler.
           getServerAddressURL(), handler.getServerAddressURL(),
           handler.getServerId());
         throw new DirectoryException(ResultCode.OTHER, message);
@@ -1372,12 +1386,12 @@
    * and locks used by the ReplicationIterator.
    *
    * @param serverId Identifier of the server for which the iterator is created.
-   * @param changeNumber Starting point for the iterator.
+   * @param startAfterCN Starting point for the iterator.
    * @return the created ReplicationIterator. Null when no DB is available
    * for the provided server Id.
    */
   public ReplicationIterator getChangelogIterator(int serverId,
-      ChangeNumber changeNumber)
+      ChangeNumber startAfterCN)
   {
     DbHandler handler = sourceDbHandlers.get(serverId);
     if (handler == null)
@@ -1388,7 +1402,7 @@
     ReplicationIterator it;
     try
     {
-      it = handler.generateIterator(changeNumber);
+      it = handler.generateIterator(startAfterCN);
     }
     catch (Exception e)
     {
@@ -1535,14 +1549,15 @@
   }
 
   /**
-   * Processes a message coming from one server in the topology
-   * and potentially forwards it to one or all other servers.
+   * Processes a message coming from one server in the topology and potentially
+   * forwards it to one or all other servers.
    *
-   * @param msg The message received and to be processed.
-   * @param senderHandler The server handler of the server that emitted
-   * the message.
+   * @param msg
+   *          The message received and to be processed.
+   * @param msgEmitter
+   *          The server handler of the server that emitted the message.
    */
-  public void process(RoutableMsg msg, ServerHandler senderHandler)
+  public void process(RoutableMsg msg, ServerHandler msgEmitter)
   {
     // Test the message for which a ReplicationServer is expected
     // to be the destination
@@ -1551,158 +1566,176 @@
         !(msg instanceof InitializeRcvAckMsg) &&
         !(msg instanceof EntryMsg) &&
         !(msg instanceof DoneMsg) &&
-        (msg.getDestination() == this.replicationServer.getServerId()))
+        (msg.getDestination() == this.localReplicationServer.getServerId()))
     {
       if (msg instanceof ErrorMsg)
       {
         ErrorMsg errorMsg = (ErrorMsg) msg;
-        logError(ERR_ERROR_MSG_RECEIVED.get(
-          errorMsg.getDetails()));
+        logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
       } else if (msg instanceof MonitorRequestMsg)
       {
-        // If the request comes from a Directory Server we need to
-        // build the full list of all servers in the topology
-        // and send back a MonitorMsg with the full list of all the servers
-        // in the topology.
-        if (senderHandler.isDataServer())
-        {
-          // Monitoring information requested by a DS
-          MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
-              msg.getDestination(), msg.getSenderID(), monitorData);
-
-          if (monitorMsg != null)
-          {
-            try
-            {
-              senderHandler.send(monitorMsg);
-            }
-            catch (IOException e)
-            {
-              // the connection was closed.
-            }
-          }
-          return;
-        } else
-        {
-          // Monitoring information requested by a RS
-          MonitorMsg monitorMsg =
-            createLocalTopologyMonitorMsg(msg.getDestination(),
-            msg.getSenderID());
-
-          if (monitorMsg != null)
-          {
-            try
-            {
-              senderHandler.send(monitorMsg);
-            } catch (Exception e)
-            {
-              // We log the error. The requestor will detect a timeout or
-              // any other failure on the connection.
-              logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
-                  Integer.toString(msg.getDestination())));
-            }
-          }
-        }
+        replyWithMonitorMsg(msg, msgEmitter);
       } else if (msg instanceof MonitorMsg)
       {
         MonitorMsg monitorMsg = (MonitorMsg) msg;
-        receivesMonitorDataResponse(monitorMsg, senderHandler.getServerId());
+        receivesMonitorDataResponse(monitorMsg, msgEmitter.getServerId());
       } else
       {
-        logError(NOTE_ERR_ROUTING_TO_SERVER.get(
-          msg.getClass().getCanonicalName()));
-
-        MessageBuilder mb1 = new MessageBuilder();
-        mb1.append(
-            NOTE_ERR_ROUTING_TO_SERVER.get(msg.getClass().getCanonicalName()));
-        mb1.append("serverID:").append(msg.getDestination());
-        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb1.toMessage());
-        try
-        {
-          senderHandler.send(errMsg);
-        } catch (IOException ioe1)
-        {
-          // an error happened on the sender session trying to recover
-          // from an error on the receiver session.
-          // Not much more we can do at this point.
-        }
+        replyWithUnroutableMsgType(msgEmitter, msg);
       }
       return;
     }
 
-    List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
-
-    if (servers.isEmpty())
+    List<ServerHandler> servers = getDestinationServers(msg, msgEmitter);
+    if (!servers.isEmpty())
     {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
-          this.baseDn, Integer.toString(msg.getDestination())));
-      mb.append(" In Replication Server=").append(
-        this.replicationServer.getMonitorInstanceName());
-      mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
-      mb.append(" Details:routing table is empty");
-      ErrorMsg errMsg = new ErrorMsg(
-        this.replicationServer.getServerId(),
-        msg.getSenderID(),
-        mb.toMessage());
-      logError(mb.toMessage());
+      forwardMsgToAllServers(msg, servers, msgEmitter);
+    }
+    else
+    {
+      replyWithUnreachablePeerMsg(msgEmitter, msg);
+    }
+  }
+
+  private void replyWithMonitorMsg(RoutableMsg msg, ServerHandler msgEmitter)
+  {
+    /*
+     * If the request comes from a Directory Server we need to build the full
+     * list of all servers in the topology and send back a MonitorMsg with the
+     * full list of all the servers in the topology.
+     */
+    if (msgEmitter.isDataServer())
+    {
+      // Monitoring information requested by a DS
+      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+          msg.getDestination(), msg.getSenderID(), monitorData);
       try
       {
-        senderHandler.send(errMsg);
-      } catch (IOException ioe)
-      {
-        // TODO Handle error properly (sender timeout in addition)
-        /*
-         * An error happened trying to send an error msg to this server.
-         * Log an error and close the connection to this server.
-         */
-        MessageBuilder mb2 = new MessageBuilder();
-        mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
-        mb2.append(stackTraceToSingleLineString(ioe));
-        logError(mb2.toMessage());
-        stopServer(senderHandler, false);
+        msgEmitter.send(monitorMsg);
       }
-    } else
+      catch (IOException e)
+      {
+        // the connection was closed.
+      }
+    }
+    else
     {
-      for (ServerHandler targetHandler : servers)
+      // Monitoring information requested by a RS
+      MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(
+          msg.getDestination(), msg.getSenderID());
+
+      if (monitorMsg != null)
       {
         try
         {
-          targetHandler.send(msg);
-        } catch (IOException ioe)
+          msgEmitter.send(monitorMsg);
+        }
+        catch (IOException e)
         {
-          /*
-           * An error happened trying the send a routable message
-           * to its destination server.
-           * Send back an error to the originator of the message.
-           */
-          MessageBuilder mb1 = new MessageBuilder();
-          mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
-              this.baseDn, Integer.toString(msg.getDestination())));
-          mb1.append(" unroutable message =" + msg.getClass().getSimpleName());
-          mb1.append(" Details: " + ioe.getLocalizedMessage());
-          ErrorMsg errMsg = new ErrorMsg(
-            msg.getSenderID(), mb1.toMessage());
-          logError(mb1.toMessage());
-          try
-          {
-            senderHandler.send(errMsg);
-          } catch (IOException ioe1)
-          {
-            // an error happened on the sender session trying to recover
-            // from an error on the receiver session.
-            // We don't have much solution left beside closing the sessions.
-            stopServer(senderHandler, false);
-            stopServer(targetHandler, false);
-          }
-        // TODO Handle error properly (sender timeout in addition)
+          // We log the error. The requestor will detect a timeout or
+          // any other failure on the connection.
+          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(Integer.toString(msg
+              .getDestination())));
         }
       }
     }
-
   }
 
+  private void replyWithUnroutableMsgType(ServerHandler msgEmitter,
+      RoutableMsg msg)
+  {
+    String msgClassname = msg.getClass().getCanonicalName();
+    logError(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname));
 
+    MessageBuilder mb = new MessageBuilder();
+    mb.append(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname));
+    mb.append("serverID:").append(msg.getDestination());
+    ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb.toMessage());
+    try
+    {
+      msgEmitter.send(errMsg);
+    }
+    catch (IOException ignored)
+    {
+      // an error happened on the sender session trying to recover
+      // from an error on the receiver session.
+      // Not much more we can do at this point.
+    }
+  }
+
+  private void replyWithUnreachablePeerMsg(ServerHandler msgEmitter,
+      RoutableMsg msg)
+  {
+    MessageBuilder mb = new MessageBuilder();
+    mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
+        this.baseDn, Integer.toString(msg.getDestination())));
+    mb.append(" In Replication Server=").append(
+      this.localReplicationServer.getMonitorInstanceName());
+    mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
+    mb.append(" Details:routing table is empty");
+    final Message message = mb.toMessage();
+    logError(message);
+
+    ErrorMsg errMsg = new ErrorMsg(this.localReplicationServer.getServerId(),
+        msg.getSenderID(), message);
+    try
+    {
+      msgEmitter.send(errMsg);
+    }
+    catch (IOException ignored)
+    {
+      // TODO Handle error properly (sender timeout in addition)
+      /*
+       * An error happened trying to send an error msg to this server.
+       * Log an error and close the connection to this server.
+       */
+      MessageBuilder mb2 = new MessageBuilder();
+      mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
+      mb2.append(stackTraceToSingleLineString(ignored));
+      logError(mb2.toMessage());
+      stopServer(msgEmitter, false);
+    }
+  }
+
+  private void forwardMsgToAllServers(RoutableMsg msg,
+      List<ServerHandler> servers, ServerHandler msgEmitter)
+  {
+    for (ServerHandler targetHandler : servers)
+    {
+      try
+      {
+        targetHandler.send(msg);
+      } catch (IOException ioe)
+      {
+        /*
+         * An error happened trying to send a routable message to its
+         * destination server.
+         * Send back an error to the originator of the message.
+         */
+        MessageBuilder mb = new MessageBuilder();
+        mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
+            this.baseDn, Integer.toString(msg.getDestination())));
+        mb.append(" unroutable message =" + msg.getClass().getSimpleName());
+        mb.append(" Details: " + ioe.getLocalizedMessage());
+        final Message message = mb.toMessage();
+        logError(message);
+
+        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message);
+        try
+        {
+          msgEmitter.send(errMsg);
+        } catch (IOException ioe1)
+        {
+          // an error happened on the sender session trying to recover
+          // from an error on the receiver session.
+          // We don't have much solution left beside closing the sessions.
+          stopServer(msgEmitter, false);
+          stopServer(targetHandler, false);
+        }
+      // TODO Handle error properly (sender timeout in addition)
+      }
+    }
+  }
 
   /**
    * Creates a new monitor message including monitoring information for the
@@ -1720,13 +1753,11 @@
   public MonitorMsg createGlobalTopologyMonitorMsg(
       int sender, int destination, MonitorData monitorData)
   {
-    MonitorMsg returnMsg =
-      new MonitorMsg(sender, destination);
+    final MonitorMsg returnMsg = new MonitorMsg(sender, destination);
 
     returnMsg.setReplServerDbState(getDbServerState());
 
-    // Add the informations about the Replicas currently in
-    // the topology.
+    // Add the informations about the Replicas currently in the topology.
     Iterator<Integer> it = monitorData.ldapIterator();
     while (it.hasNext())
     {
@@ -1736,8 +1767,7 @@
           monitorData.getApproxFirstMissingDate(replicaId), true);
     }
 
-    // Add the information about the Replication Servers
-    // currently in the topology.
+    // Add the information about the RSs currently in the topology.
     it = monitorData.rsIterator();
     while (it.hasNext())
     {
@@ -1787,16 +1817,14 @@
       for (DataServerHandler lsh : this.directoryServers.values())
       {
         monitorMsg.setServerState(lsh.getServerId(),
-            lsh.getServerState(), lsh.getApproxFirstMissingDate(),
-            true);
+            lsh.getServerState(), lsh.getApproxFirstMissingDate(), true);
       }
 
       // Same for the connected RS
       for (ReplicationServerHandler rsh : this.replicationServers.values())
       {
         monitorMsg.setServerState(rsh.getServerId(),
-            rsh.getServerState(), rsh.getApproxFirstMissingDate(),
-            false);
+            rsh.getServerState(), rsh.getApproxFirstMissingDate(), false);
       }
 
       // Populate the RS state in the msg from the DbState
@@ -1821,15 +1849,12 @@
 
     stopAllServers(true);
 
-    stopDbHandlers();
+    shutdownDbHandlers();
   }
 
-  /**
-   * Stop the dbHandlers .
-   */
-  private void stopDbHandlers()
+  /** Shutdown all the dbHandlers. */
+  private void shutdownDbHandlers()
   {
-    // Shutdown the dbHandlers
     synchronized (sourceDbHandlers)
     {
       for (DbHandler dbHandler : sourceDbHandlers.values())
@@ -1964,9 +1989,7 @@
 
     // Create info for the local RS
     List<RSInfo> rsInfos = new ArrayList<RSInfo>();
-    RSInfo localRSInfo = toRSInfo(replicationServer, generationId);
-    rsInfos.add(localRSInfo);
-
+    rsInfos.add(toRSInfo(localReplicationServer, generationId));
     return new TopologyMsg(dsInfos, rsInfos);
   }
 
@@ -1982,10 +2005,8 @@
    */
   public TopologyMsg createTopologyMsgForDS(int destDsId)
   {
-    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
-    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
-
     // Go through every DSs (except recipient of msg)
+    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
     for (DataServerHandler serverHandler : directoryServers.values())
     {
       if (serverHandler.getServerId() == destDsId)
@@ -1995,15 +2016,15 @@
       dsInfos.add(serverHandler.toDSInfo());
     }
 
+
+    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
     // Add our own info (local RS)
-    RSInfo localRSInfo = toRSInfo(replicationServer, generationId);
-    rsInfos.add(localRSInfo);
+    rsInfos.add(toRSInfo(localReplicationServer, generationId));
 
     // Go through every peer RSs (and get their connected DSs), also add info
     // for RSs
     for (ReplicationServerHandler serverHandler : replicationServers.values())
     {
-      // Put RS info
       rsInfos.add(serverHandler.toRSInfo());
 
       serverHandler.addDSInfos(dsInfos);
@@ -2354,11 +2375,11 @@
           logError(mb.toMessage());
         }
       }
-      stopDbHandlers();
+      shutdownDbHandlers();
     }
     try
     {
-      replicationServer.clearGenerationId(baseDn);
+      localReplicationServer.clearGenerationId(baseDn);
     } catch (Exception e)
     {
       // TODO: i18n
@@ -2381,7 +2402,8 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+      TRACER.debugInfo("In "
+          + this.localReplicationServer.getMonitorInstanceName()
           + " baseDN=" + baseDn + " isDegraded serverId=" + serverId
           + " given local generation Id=" + this.generationId);
     }
@@ -2398,7 +2420,8 @@
 
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+      TRACER.debugInfo("In "
+          + this.localReplicationServer.getMonitorInstanceName()
           + " baseDN=" + baseDn + " Compute degradation of serverId="
           + serverId + " LS server generation Id=" + handler.getGenerationId());
     }
@@ -2411,7 +2434,7 @@
    */
   public ReplicationServer getReplicationServer()
   {
-    return replicationServer;
+    return localReplicationServer;
   }
 
   /**
@@ -2557,7 +2580,7 @@
               int serverId = rs.getServerId();
 
               MonitorRequestMsg msg = new MonitorRequestMsg(
-                  this.replicationServer.getServerId(), serverId);
+                  this.localReplicationServer.getServerId(), serverId);
               try
               {
                 rs.send(msg);
@@ -2684,7 +2707,7 @@
     // - from our own local db state
     // - whatever they are directly or indirectly connected
     ServerState dbServerState = getDbServerState();
-    pendingMonitorData.setRSState(replicationServer.getServerId(),
+    pendingMonitorData.setRSState(localReplicationServer.getServerId(),
         dbServerState);
     for (int serverId : dbServerState) {
       ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
@@ -2744,7 +2767,7 @@
         while (rsidIterator.hasNext())
         {
           int rsid = rsidIterator.next();
-          if (rsid == replicationServer.getServerId())
+          if (rsid == localReplicationServer.getServerId())
           {
             // this is the latency of the remote RSi regarding the current RS
             // let's update the fmd of my connected LS
@@ -2895,7 +2918,7 @@
     if (statusAnalyzer == null)
     {
       int degradedStatusThreshold =
-        replicationServer.getDegradedStatusThreshold();
+        localReplicationServer.getDegradedStatusThreshold();
       if (degradedStatusThreshold > 0) // 0 means no status analyzer
       {
         statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold);
@@ -2946,7 +2969,7 @@
     if (monitoringPublisher == null)
     {
       long period =
-        replicationServer.getMonitoringPublisherPeriod();
+        localReplicationServer.getMonitoringPublisherPeriod();
       if (period > 0) // 0 means no monitoring publisher
       {
         monitoringPublisher = new MonitoringPublisher(this, period);
@@ -3004,8 +3027,8 @@
   @Override
   public String getMonitorInstanceName()
   {
-    return "Replication server RS(" + replicationServer.getServerId() + ") "
-        + replicationServer.getServerURL() + ",cn="
+    return "Replication server RS(" + localReplicationServer.getServerId()
+        + ") " + localReplicationServer.getServerURL() + ",cn="
         + baseDn.replace(',', '_').replace('=', '_') + ",cn=Replication";
   }
 
@@ -3018,9 +3041,9 @@
     // publish the server id and the port number.
     List<Attribute> attributes = new ArrayList<Attribute>();
     attributes.add(Attributes.create("replication-server-id",
-        String.valueOf(replicationServer.getServerId())));
+        String.valueOf(localReplicationServer.getServerId())));
     attributes.add(Attributes.create("replication-server-port",
-        String.valueOf(replicationServer.getReplicationPort())));
+        String.valueOf(localReplicationServer.getReplicationPort())));
 
     // Add all the base DNs that are known by this replication server.
     attributes.add(Attributes.create("domain-name", baseDn));
@@ -3032,7 +3055,7 @@
     MonitorData md = getDomainMonitorData();
 
     // Missing changes
-    long missingChanges = md.getMissingChangesRS(replicationServer
+    long missingChanges = md.getMissingChangesRS(localReplicationServer
         .getServerId());
     attributes.add(Attributes.create("missing-changes",
         String.valueOf(missingChanges)));
@@ -3201,30 +3224,13 @@
       }
       */
 
-      boolean serverIdConnected = false;
-      if (directoryServers.containsKey(serverId))
-      {
-        serverIdConnected = true;
-      }
-      else
-      {
-        // not directly connected
-        for (ReplicationServerHandler rsh : replicationServers.values())
-        {
-          if (rsh.isRemoteLDAPServer(serverId))
-          {
-            serverIdConnected = true;
-            break;
-          }
-        }
-      }
-      if (!serverIdConnected)
+      if (!isServerConnected(serverId))
       {
         if (debugEnabled())
         {
           TRACER.debugInfo("In " + "Replication Server "
-              + replicationServer.getReplicationPort() + " " + baseDn + " "
-              + replicationServer.getServerId() + " Server " + serverId
+              + localReplicationServer.getReplicationPort() + " " + baseDn + " "
+              + localReplicationServer.getServerId() + " Server " + serverId
               + " is not considered for eligibility ... potentially down");
         }
         continue;
@@ -3246,13 +3252,31 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In Replication Server "
-          + replicationServer.getReplicationPort() + " " + baseDn + " "
-          + replicationServer.getServerId()
+          + localReplicationServer.getReplicationPort() + " " + baseDn + " "
+          + localReplicationServer.getServerId()
           + " getEligibleCN() returns result =" + eligibleCN);
     }
     return eligibleCN;
   }
 
+  private boolean isServerConnected(int serverId)
+  {
+    if (directoryServers.containsKey(serverId))
+    {
+      return true;
+    }
+
+    // not directly connected
+    for (ReplicationServerHandler rsHandler : replicationServers.values())
+    {
+      if (rsHandler.isRemoteLDAPServer(serverId))
+      {
+        return true;
+      }
+    }
+    return false;
+  }
+
 
   /**
    * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp)
@@ -3299,8 +3323,8 @@
             TRACER.debugCaught(DebugLogLevel.ERROR, e);
             logError(ERR_CHANGELOG_ERROR_SENDING_MSG
                 .get("Replication Server "
-                    + replicationServer.getReplicationPort() + " "
-                    + baseDn + " " + replicationServer.getServerId()));
+                    + localReplicationServer.getReplicationPort() + " "
+                    + baseDn + " " + localReplicationServer.getServerId()));
             stopServer(rsHandler, false);
           }
         }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index afe7fb6..6f96bda 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -27,11 +27,6 @@
  */
 package org.opends.server.replication.server;
 
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.protocol.ProtocolVersion.*;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +41,11 @@
 import org.opends.server.replication.protocol.*;
 import org.opends.server.types.*;
 
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
+
 /**
  * This class defines a server handler, which handles all interaction with a
  * peer replication server.
@@ -82,10 +82,8 @@
       generationId = inReplServerStartMsg.getGenerationId();
       serverId = inReplServerStartMsg.getServerId();
       serverURL = inReplServerStartMsg.getServerURL();
-      int separator = serverURL.lastIndexOf(':');
-      serverAddressURL =
-        session.getRemoteAddress() + ":" + serverURL.substring(separator +
-            1);
+      final String port = serverURL.substring(serverURL.lastIndexOf(':') + 1);
+      serverAddressURL = session.getRemoteAddress() + ":" + port;
       setBaseDNAndDomain(inReplServerStartMsg.getBaseDn(), false);
       setInitialServerState(inReplServerStartMsg.getServerState());
       setSendWindowSize(inReplServerStartMsg.getWindowSize());
@@ -119,8 +117,7 @@
         getReplicationServerId(), getReplicationServerURL(), getBaseDN(),
         maxRcvWindow, replicationServerDomain.getDbServerState(),
         localGenerationId, sslEncryption,
-        getLocalGroupId(), replicationServerDomain.getReplicationServer()
-            .getDegradedStatusThreshold());
+        getLocalGroupId(), replicationServer.getDegradedStatusThreshold());
     send(outReplServerStartMsg);
     return outReplServerStartMsg;
   }
@@ -296,7 +293,7 @@
     finally
     {
       // Release domain
-      if ((replicationServerDomain != null) &&
+      if (replicationServerDomain != null &&
           replicationServerDomain.hasLock())
         replicationServerDomain.release();
     }
@@ -374,11 +371,9 @@
         {
           if (debugEnabled())
           {
-            TRACER.debugInfo("In " +
-              replicationServerDomain.getReplicationServer().
-              getMonitorInstanceName() +
-              this + " RS V1 with serverID=" + serverId +
-              " is connected with the right generation ID");
+            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
+                + " " + this + " RS V1 with serverID=" + serverId
+                + " is connected with the right generation ID");
           }
         } else
         {
@@ -420,10 +415,9 @@
       {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
       }
-      Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(Integer
-          .toString(inReplServerStartMsg.getServerId()), Integer
-          .toString(replicationServerDomain.getReplicationServer()
-              .getServerId()));
+      Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(
+          Integer.toString(inReplServerStartMsg.getServerId()),
+          Integer.toString(replicationServer.getServerId()));
       abortStart(errMessage);
     }
     catch (DirectoryException e)
@@ -444,7 +438,7 @@
     }
     finally
     {
-      if ((replicationServerDomain != null) &&
+      if (replicationServerDomain != null &&
           replicationServerDomain.hasLock())
         replicationServerDomain.release();
     }
@@ -489,12 +483,10 @@
         // connection attempt.
         return null;
       }
-      else
-      {
-        Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg
-            .getClass().getCanonicalName(), "TopologyMsg");
-        throw new DirectoryException(ResultCode.OTHER, message);
-      }
+
+      Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(
+          msg.getClass().getCanonicalName(), "TopologyMsg");
+      throw new DirectoryException(ResultCode.OTHER, message);
     }
 
     // Remote RS sent his topo msg
@@ -518,10 +510,9 @@
     {
       if (debugEnabled())
       {
-        TRACER.debugInfo("In " +
-            replicationServerDomain.getReplicationServer().
-            getMonitorInstanceName() + " RS with serverID=" + serverId +
-            " is connected with the right generation ID, same as local ="
+        TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
+            + " RS with serverID=" + serverId
+            + " is connected with the right generation ID, same as local ="
             + generationId);
       }
     }
@@ -541,42 +532,40 @@
   {
     if (localGenerationId > 0)
     { // the local RS is initialized
-      if (generationId > 0)
-      { // the remote RS is initialized.
-        // If not, there's nothing to do anyway.
-        if (generationId != localGenerationId)
-        {
-          /* Either:
-           *
-           * 1) The 2 RS have different generationID
-           * replicationServerDomain.getGenerationIdSavedStatus() == true
-           *
-           * if the present RS has received changes regarding its
-           * gen ID and so won't change without a reset
-           * then  we are just degrading the peer.
-           *
-           * 2) This RS has never received any changes for the current
-           * generation ID.
-           *
-           * Example case:
-           * - we are in RS1
-           * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
-           * - RS1 has genId1 from LS1 /genId1 comes from data in suffix
-           * - we are in RS1 and we receive a START msg from RS2
-           * - Each RS keeps its genID / is degraded and when LS2
-           * will be populated from LS1 everything will become ok.
-           *
-           * Issue:
-           * FIXME : Would it be a good idea in some cases to just set the
-           * gen ID received from the peer RS specially if the peer has a
-           * non null state and we have a null state ?
-           * replicationServerDomain.setGenerationId(generationId, false);
-           */
-          Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(
-                  serverId, session.getReadableRemoteAddress(), generationId,
-                  getBaseDN(), getReplicationServerId(), localGenerationId);
-          logError(message);
-        }
+      if (generationId > 0
+          // the remote RS is initialized. If not, there's nothing to do anyway.
+          && generationId != localGenerationId)
+      {
+        /* Either:
+         *
+         * 1) The 2 RS have different generationID
+         * replicationServerDomain.getGenerationIdSavedStatus() == true
+         *
+         * if the present RS has received changes regarding its
+         * gen ID and so won't change without a reset
+         * then  we are just degrading the peer.
+         *
+         * 2) This RS has never received any changes for the current
+         * generation ID.
+         *
+         * Example case:
+         * - we are in RS1
+         * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
+         * - RS1 has genId1 from LS1 /genId1 comes from data in suffix
+         * - we are in RS1 and we receive a START msg from RS2
+         * - Each RS keeps its genID / is degraded and when LS2
+         * will be populated from LS1 everything will become ok.
+         *
+         * Issue:
+         * FIXME : Would it be a good idea in some cases to just set the
+         * gen ID received from the peer RS specially if the peer has a
+         * non null state and we have a null state ?
+         * replicationServerDomain.setGenerationId(generationId, false);
+         */
+        Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(
+            serverId, session.getReadableRemoteAddress(), generationId,
+            getBaseDN(), getReplicationServerId(), localGenerationId);
+        logError(message);
       }
     }
     else
@@ -655,9 +644,7 @@
     groupId = rsInfo.getGroupId();
     weight = rsInfo.getWeight();
 
-    /**
-     * Store info for DSs connected to the peer RS
-     */
+    // Store info for DSs connected to the peer RS
     List<DSInfo> dsInfos = topoMsg.getDsList();
 
     synchronized (remoteDirectoryServers)
@@ -688,18 +675,18 @@
    * When this handler is connected to a replication server, specifies if
    * a wanted server is connected to this replication server.
    *
-   * @param wantedServer The server we want to know if it is connected
+   * @param serverId The server we want to know if it is connected
    * to the replication server represented by this handler.
    * @return boolean True is the wanted server is connected to the server
    * represented by this handler.
    */
-  public boolean isRemoteLDAPServer(int wantedServer)
+  public boolean isRemoteLDAPServer(int serverId)
   {
     synchronized (remoteDirectoryServers)
     {
       for (LightweightServerHandler server : remoteDirectoryServers.values())
       {
-        if (wantedServer == server.getServerId())
+        if (serverId == server.getServerId())
         {
           return true;
         }
@@ -765,9 +752,8 @@
     MonitorData md = replicationServerDomain.getDomainMonitorData();
 
     // Missing changes
-    long missingChanges = md.getMissingChangesRS(serverId);
     attributes.add(Attributes.create("missing-changes",
-        String.valueOf(missingChanges)));
+        String.valueOf(md.getMissingChangesRS(serverId))));
 
     /* get the Server State */
     AttributeBuilder builder = new AttributeBuilder("server-state");
@@ -791,17 +777,10 @@
   {
     if (serverId != 0)
     {
-      StringBuilder builder = new StringBuilder("Replication server RS(");
-      builder.append(serverId);
-      builder.append(") for domain \"");
-      builder.append(replicationServerDomain.getBaseDn());
-      builder.append("\"");
-      return builder.toString();
+      return "Replication server RS(" + serverId + ") for domain \""
+          + replicationServerDomain.getBaseDn() + "\"";
     }
-    else
-    {
-      return "Unknown server";
-    }
+    return "Unknown server";
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
index 50362de..0708ad9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -44,8 +44,7 @@
 import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.ReplicationIterator;
-import org.opends.server.replication.server.changelog.je.ReplicationDB
-    .ReplServerDBCursor;
+import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.InitializationException;
@@ -265,22 +264,20 @@
    * managed by this dbHandler and starting at the position defined
    * by a given changeNumber.
    *
-   * @param changeNumber The position where the iterator must start.
-   *
+   * @param startAfterCN The position where the iterator must start.
    * @return a new ReplicationIterator that allows to browse the db
    *         managed by this dbHandler and starting at the position defined
    *         by a given changeNumber.
-   *
    * @throws ChangelogException if a database problem happened.
    */
-  public ReplicationIterator generateIterator(ChangeNumber changeNumber)
+  public ReplicationIterator generateIterator(ChangeNumber startAfterCN)
       throws ChangelogException
   {
-    if (changeNumber == null)
+    if (startAfterCN == null)
     {
       flush();
     }
-    return new JEReplicationIterator(db, changeNumber, this);
+    return new JEReplicationIterator(db, startAfterCN, this);
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java
index e09fef5..e8dae9f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java
@@ -32,8 +32,7 @@
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.ReplicationIterator;
-import org.opends.server.replication.server.changelog.je.ReplicationDB
-    .ReplServerDBCursor;
+import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
 
 /**
  * Berkeley DB JE implementation of IReplicationIterator.
@@ -52,20 +51,20 @@
    * releaseCursor() method.
    *
    * @param db The db where the iterator must be created.
-   * @param changeNumber The ChangeNumber after which the iterator must start.
+   * @param startAfterCN The ChangeNumber after which the iterator must start.
    * @param dbHandler The associated DbHandler.
    * @throws ChangelogException if a database problem happened.
    */
-  public JEReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
+  public JEReplicationIterator(ReplicationDB db, ChangeNumber startAfterCN,
       DbHandler dbHandler) throws ChangelogException
   {
     this.db = db;
     this.dbHandler = dbHandler;
-    this.lastNonNullCurrentCN = changeNumber;
+    this.lastNonNullCurrentCN = startAfterCN;
 
     try
     {
-      cursor = db.openReadCursor(changeNumber);
+      cursor = db.openReadCursor(startAfterCN);
     }
     catch(Exception e)
     {
@@ -79,7 +78,7 @@
       dbHandler.flush();
 
       // look again in the db
-      cursor = db.openReadCursor(changeNumber);
+      cursor = db.openReadCursor(startAfterCN);
       if (cursor == null)
       {
         throw new ChangelogException(Message.raw("no new change"));

--
Gitblit v1.10.0