From 05d24dcca61eed7921987a98bb94d94a4aa030cd Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 01 Feb 2008 13:21:19 +0000
Subject: [PATCH] Fix 2598 - fixes for global replication monitoring

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java                                |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java                             |   35 +
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java |   47 +
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java                                    |  294 ++++++++----
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java                                 |  199 ++++++-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                          |  400 ++++++++-------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java                                     |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java                                      |   21 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java         |   14 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java                         |   50 +-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java                                      |  329 +++++++++++++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java                                     |    2 
 opendj-sdk/opends/src/messages/messages/replication.properties                                                          |    3 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java    |    4 
 14 files changed, 1,031 insertions(+), 371 deletions(-)

diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 39ddbcf..bdd615b 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -252,7 +252,8 @@
 NOTICE_SSL_SERVER_CON_ATTEMPT_ERROR_105=SSL connection attempt from %s (%s) \
   failed: %s
 SEVERE_ERR_MISSING_REMOTE_MONITOR_DATA_106=Monitor data of remote servers \
- are missing due to an error in the retrieval process
+ are missing due to an error in the retrieval process. Potentially a server \
+ is too slow to provide its monitoring data over the protocol
 SEVERE_ERR_PROCESSING_REMOTE_MONITOR_DATA_107=Monitor data of remote servers \
  are missing due to a processing error : %s
 SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
index 1f8b711..04a11ef 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.common;
 
@@ -339,4 +339,23 @@
   {
     return list.isEmpty();
   }
+
+  /**
+   * Make a duplicate of this state.
+   * @return The duplicate of this state.
+   */
+  public ServerState duplicate()
+  {
+    ServerState newState = new ServerState();
+    synchronized (this)
+    {
+      for (Short key  : list.keySet())
+      {
+        ChangeNumber change = list.get(key);
+        Short id =  change.getServerId();
+        newState.list.put(id,change);
+      }
+    }
+    return newState;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index a8eb808..a21ce43 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -433,7 +433,7 @@
       ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
         maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
         halfRcvWindow * 2, heartbeatInterval, state,
-        protocolVersion, generationId, isSslEncryption);
+        protocolVersion, generationId, isSslEncryption, !keepConnection);
       session.publish(msg);
 
       /*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
index 2e50e35..89ef59a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
@@ -65,15 +65,20 @@
   }
 
   /**
-   * Data structure to manage the state of the replication server
-   * and the state informations for the LDAP servers connected.
+   * Data structure to manage the state of this replication server
+   * and the state informations for the servers connected to it.
    *
    */
   class SubTopoMonitorData
   {
-    ServerState replServerState;
+    // This replication server DbState
+    ServerState replServerDbState;
+    // The data related to the LDAP servers connected to this RS
     HashMap<Short, ServerData> ldapStates =
       new HashMap<Short, ServerData>();
+    // The data related to the RS servers connected to this RS
+    HashMap<Short, ServerData> rsStates =
+      new HashMap<Short, ServerData>();
   }
 
   SubTopoMonitorData data = new SubTopoMonitorData();;
@@ -93,9 +98,9 @@
    * Sets the state of the replication server.
    * @param state The state.
    */
-  public void setReplServerState(ServerState state)
+  public void setReplServerDbState(ServerState state)
   {
-    data.replServerState = state;
+    data.replServerDbState = state;
   }
 
   /**
@@ -103,20 +108,27 @@
    * @param serverId The serverID.
    * @param state The server state.
    * @param approxFirstMissingDate  The approximation of the date
-   * of the older missing change.
-   *
+   * of the older missing change. null when none.
+   * @param isLDAP Specifies whether the server is a LS or a RS
    */
-  public void setLDAPServerState(short serverId, ServerState state,
-      Long approxFirstMissingDate)
+  public void setServerState(short serverId, ServerState state,
+      Long approxFirstMissingDate, boolean isLDAP)
   {
     if (data.ldapStates == null)
     {
       data.ldapStates = new HashMap<Short, ServerData>();
     }
+    if (data.rsStates == null)
+    {
+      data.rsStates = new HashMap<Short, ServerData>();
+    }
     ServerData sd = new ServerData();
     sd.state = state;
     sd.approxFirstMissingDate = approxFirstMissingDate;
-    data.ldapStates.put(serverId, sd);
+    if (isLDAP)
+      data.ldapStates.put(serverId, sd);
+    else
+      data.rsStates.put(serverId, sd);
   }
 
   /**
@@ -130,16 +142,37 @@
   }
 
   /**
+   * Get the server state for the RS server with the provided serverId.
+   * @param serverId The provided serverId.
+   * @return The state.
+   */
+  public ServerState getRSServerState(short serverId)
+  {
+    return data.rsStates.get(serverId).state;
+  }
+
+
+  /**
    * Get the approximation of the date of the older missing change for the
    * LDAP Server with the provided server Id.
    * @param serverId The provided serverId.
    * @return The approximated state.
    */
-  public Long getApproxFirstMissingDate(short serverId)
+  public Long getLDAPApproxFirstMissingDate(short serverId)
   {
     return data.ldapStates.get(serverId).approxFirstMissingDate;
   }
 
+  /**
+   * Get the approximation of the date of the older missing change for the
+   * RS Server with the provided server Id.
+   * @param serverId The provided serverId.
+   * @return The approximated state.
+   */
+  public Long getRSApproxFirstMissingDate(short serverId)
+  {
+    return data.rsStates.get(serverId).approxFirstMissingDate;
+  }
 
   /**
    * Creates a new EntryMessage from its encoded form.
@@ -182,39 +215,51 @@
       try
       {
         ASN1Sequence s0 = ASN1Sequence.decodeAsSequence(encodedS);
+        // loop on the servers
         for (ASN1Element el0 : s0.elements())
         {
           ServerState newState = new ServerState();
           short serverId = 0;
           Long outime = (long)0;
+          boolean isLDAPServer = false;
           ASN1Sequence s1 = el0.decodeAsSequence();
+
+          // loop on the list of CN of the state
           for (ASN1Element el1 : s1.elements())
           {
             ASN1OctetString o = el1.decodeAsOctetString();
             String s = o.stringValue();
             ChangeNumber cn = new ChangeNumber(s);
-            if ((data.replServerState != null) && (serverId == 0))
+            if ((data.replServerDbState != null) && (serverId == 0))
             {
+              // we are on the first CN that is a fake CN to store the serverId
+              // and the older update time
               serverId = cn.getServerId();
               outime = cn.getTime();
+              isLDAPServer = (cn.getSeqnum()>0);
             }
             else
             {
+              // we are on a normal CN
               newState.update(cn);
             }
           }
 
-          // the first state is the replication state
-          if (data.replServerState == null)
+          if (data.replServerDbState == null)
           {
-            data.replServerState = newState;
+            // the first state is the replication state
+            data.replServerDbState = newState;
           }
           else
           {
+            // the next states are the server states
             ServerData sd = new ServerData();
             sd.state = newState;
             sd.approxFirstMissingDate = outime;
-            data.ldapStates.put(serverId, sd);
+            if (isLDAPServer)
+              data.ldapStates.put(serverId, sd);
+            else
+              data.rsStates.put(serverId, sd);
           }
         }
       } catch(Exception e)
@@ -245,14 +290,17 @@
       ASN1Sequence stateElementSequence = new ASN1Sequence();
       ArrayList<ASN1Element> stateElementList = new ArrayList<ASN1Element>();
 
-      // First loop computes the length
+      /**
+       * First loop computes the length
+       */
+
       /* Put the serverStates ... */
       stateElementSequence = new ASN1Sequence();
       stateElementList = new ArrayList<ASN1Element>();
 
       /* first put the Replication Server state */
       ArrayList<ASN1OctetString> cnOctetList =
-        data.replServerState.toASN1ArrayList();
+        data.replServerDbState.toASN1ArrayList();
       ArrayList<ASN1Element> cnElementList = new ArrayList<ASN1Element>();
       for (ASN1OctetString soci : cnOctetList)
       {
@@ -288,6 +336,35 @@
         cnSequence = new ASN1Sequence(cnElementList);
         stateElementList.add(cnSequence);
       }
+
+      // then the rs server data
+      servers = data.rsStates.keySet();
+      for (Short sid : servers)
+      {
+        // State
+        ServerState statei = data.rsStates.get(sid).state;
+        // First missing date
+        Long outime =  data.rsStates.get(sid).approxFirstMissingDate;
+
+        // retrieves the change numbers as an arrayList of ANSN1OctetString
+        cnOctetList = statei.toASN1ArrayList();
+        cnElementList = new ArrayList<ASN1Element>();
+
+        // a fake changenumber helps storing the LDAP server ID
+        // and the olderupdatetime
+        ChangeNumber cn = new ChangeNumber(outime,0,sid);
+        cnElementList.add(new ASN1OctetString(cn.toString()));
+
+        // the changenumbers
+        for (ASN1OctetString soci : cnOctetList)
+        {
+          cnElementList.add(soci);
+        }
+
+        cnSequence = new ASN1Sequence(cnElementList);
+        stateElementList.add(cnSequence);
+      }
+
       stateElementSequence.setElements(stateElementList);
       int seqLen = stateElementSequence.encode().length;
 
@@ -298,7 +375,9 @@
       // Allocate the array sized from the computed length
       byte[] resultByteArray = new byte[length];
 
-      // Second loop build the array
+      /**
+       * Second loop really builds the array
+       */
 
       /* put the type of the operation */
       resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
@@ -313,7 +392,7 @@
 
       /* first put the Replication Server state */
       cnOctetList =
-        data.replServerState.toASN1ArrayList();
+        data.replServerDbState.toASN1ArrayList();
       cnElementList = new ArrayList<ASN1Element>();
       for (ASN1OctetString soci : cnOctetList)
       {
@@ -322,7 +401,7 @@
       cnSequence = new ASN1Sequence(cnElementList);
       stateElementList.add(cnSequence);
 
-      // then the LDAP server state
+      // then the LDAP server datas
       servers = data.ldapStates.keySet();
       for (Short sid : servers)
       {
@@ -334,10 +413,10 @@
         cnElementList = new ArrayList<ASN1Element>();
 
         // a fake changenumber helps storing the LDAP server ID
-        ChangeNumber cn = new ChangeNumber(outime,0,sid);
+        ChangeNumber cn = new ChangeNumber(outime,1,sid);
         cnElementList.add(new ASN1OctetString(cn.toString()));
 
-        // the changenumbers
+        // the changenumbers that make the state
         for (ASN1OctetString soci : cnOctetList)
         {
           cnElementList.add(soci);
@@ -346,6 +425,33 @@
         cnSequence = new ASN1Sequence(cnElementList);
         stateElementList.add(cnSequence);
       }
+
+      // then the RS server datas
+      servers = data.rsStates.keySet();
+      for (Short sid : servers)
+      {
+        ServerState statei = data.rsStates.get(sid).state;
+        Long outime = data.rsStates.get(sid).approxFirstMissingDate;
+
+        // retrieves the change numbers as an arrayList of ANSN1OctetString
+        cnOctetList = statei.toASN1ArrayList();
+        cnElementList = new ArrayList<ASN1Element>();
+
+        // a fake changenumber helps storing the LDAP server ID
+        ChangeNumber cn = new ChangeNumber(outime,0,sid);
+        cnElementList.add(new ASN1OctetString(cn.toString()));
+
+        // the changenumbers that make the state
+        for (ASN1OctetString soci : cnOctetList)
+        {
+          cnElementList.add(soci);
+        }
+
+        cnSequence = new ASN1Sequence(cnElementList);
+        stateElementList.add(cnSequence);
+      }
+
+
       stateElementSequence.setElements(stateElementList);
       pos = addByteArray(stateElementSequence.encode(), resultByteArray, pos);
 
@@ -361,41 +467,62 @@
    * Get the state of the replication server that sent this message.
    * @return The state.
    */
-  public ServerState getReplServerState()
+  public ServerState getReplServerDbState()
   {
-    return data.replServerState;
+    return data.replServerDbState;
   }
 
   /**
    * Returns an iterator on the serverId of the connected LDAP servers.
    * @return The iterator.
    */
-  public Iterator<Short> iterator()
+  public Iterator<Short> ldapIterator()
   {
     return data.ldapStates.keySet().iterator();
   }
 
   /**
+   * Returns an iterator on the serverId of the connected RS servers.
+   * @return The iterator.
+   */
+  public Iterator<Short> rsIterator()
+  {
+    return data.rsStates.keySet().iterator();
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
   public String toString()
   {
-    String stateS = " RState:";
-    stateS += "/" + data.replServerState.toString();
-    stateS += " LDAPStates:";
-    Iterator<ServerData> it = data.ldapStates.values().iterator();
-    while (it.hasNext())
+    String stateS = "\nRState:[";
+    stateS += data.replServerDbState.toString();
+    stateS += "]";
+
+    stateS += "\nLDAPStates:[";
+    for (Short sid : data.ldapStates.keySet())
     {
-      ServerData sd = it.next();
-      stateS += "/ state=" + sd.state.toString()
-      + " afmd=" + sd.approxFirstMissingDate + "] ";
+      ServerData sd = data.ldapStates.get(sid);
+      stateS +=
+               "\n[LSstate("+ sid + ")=" +
+                sd.state.toString() + "]" +
+                " afmd=" + sd.approxFirstMissingDate + "]";
     }
 
+    stateS += "\nRSStates:[";
+    for (Short sid : data.rsStates.keySet())
+    {
+      ServerData sd = data.rsStates.get(sid);
+      stateS +=
+               "\n[RSState("+ sid + ")=" +
+               sd.state.toString() + "]" +
+               " afmd=" + sd.approxFirstMissingDate + "]";
+    }
     String me = this.getClass().getCanonicalName() +
-    " sender=" + this.senderID +
+    "[ sender=" + this.senderID +
     " destination=" + this.destination +
-    " states=" + stateS +
+    " data=[" + stateS + "]" +
     "]";
     return me;
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
index 878973e..9ac6b32 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -54,6 +54,7 @@
   private int maxReceiveDelay;
   private int maxSendDelay;
   private int windowSize;
+  private boolean handshakeOnly;
   private ServerState serverState = null;
 
   /**
@@ -87,6 +88,8 @@
    * @param generationId The generationId for this server.
    * @param sslEncryption Whether to continue using SSL to encrypt messages
    *                      after the start messages have been exchanged.
+   * @param handshakeOnly Whether this message is only to get an handshake
+   *                      with the server or not.
    */
   public ServerStartMessage(short serverId, DN baseDn, int maxReceiveDelay,
                             int maxReceiveQueue, int maxSendDelay,
@@ -95,7 +98,8 @@
                             ServerState serverState,
                             short protocolVersion,
                             long generationId,
-                            boolean sslEncryption)
+                            boolean sslEncryption,
+                            boolean handshakeOnly)
   {
     super(protocolVersion, generationId);
 
@@ -109,6 +113,7 @@
     this.heartbeatInterval = heartbeatInterval;
     this.sslEncryption = sslEncryption;
     this.serverState = serverState;
+    this.handshakeOnly = handshakeOnly;
 
     try
     {
@@ -209,10 +214,19 @@
       sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
       pos += length +1;
 
+
+      /*
+       * read the handshakeOnly flag
+       */
+      length = getNextLength(in, pos);
+      handshakeOnly = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
+      pos += length +1;
+
       /*
       * read the ServerState
       */
       serverState = new ServerState(in, pos, in.length-1);
+
     } catch (UnsupportedEncodingException e)
     {
       throw new DataFormatException("UTF-8 is not supported by this jvm.");
@@ -322,6 +336,8 @@
       byte[] byteSSLEncryption =
                      String.valueOf(sslEncryption).getBytes("UTF-8");
       byte[] byteServerState = serverState.getBytes();
+      byte[] byteHandshakeOnly =
+        String.valueOf(handshakeOnly).getBytes("UTF-8");
 
       int length = byteDn.length + 1 + byteServerId.length + 1 +
                    byteServerUrl.length + 1 +
@@ -332,6 +348,7 @@
                    byteWindowSize.length + 1 +
                    byteHeartbeatInterval.length + 1 +
                    byteSSLEncryption.length + 1 +
+                   byteHandshakeOnly.length + 1 +
                    byteServerState.length + 1;
 
       /* encode the header in a byte[] large enough to also contain the mods */
@@ -358,6 +375,8 @@
 
       pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
 
+      pos = addByteArray(byteHandshakeOnly, resultByteArray, pos);
+
       pos = addByteArray(byteServerState, resultByteArray, pos);
 
       return resultByteArray;
@@ -401,4 +420,16 @@
   {
     return sslEncryption;
   }
+
+  /**
+   * Get the SSL encryption value for the ldap server that created the
+   * message.
+   *
+   * @return The SSL encryption value for the ldap server that created the
+   *         message.
+   */
+  public boolean isHandshakeOnly()
+  {
+    return handshakeOnly;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
index 0c42763..077a79c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -200,16 +200,6 @@
   @Override
   public ArrayList<Attribute> getMonitorData()
   {
-    if (debugEnabled())
-      TRACER.debugInfo(
-          "In " +
-          this.replServerHandler.getDomain().getReplicationServer().
-          getMonitorInstanceName()+
-          " LWSH for remote server " + this.serverId +
-          " connected to:" + this.replServerHandler.getMonitorInstanceName() +
-      " getMonitor data");
-
-
     ArrayList<Attribute> attributes = new ArrayList<Attribute>();
 
     attributes.add(new Attribute("server-id",
@@ -220,12 +210,12 @@
         replServerHandler.getMonitorInstanceName()));
 
     // Retrieves the topology counters
+    MonitorData md;
     try
     {
-      rsDomain.retrievesRemoteMonitorData();
+      md = rsDomain.getMonitorData();
 
-      // Compute the latency for the current SH
-      ServerState remoteState = rsDomain.getServerState(serverId);
+      ServerState remoteState = md.getLDAPServerState(serverId);
       if (remoteState == null)
       {
         remoteState = new ServerState();
@@ -241,29 +231,39 @@
       {
         values.add(new AttributeValue(type,str));
       }
+      if (values.size() == 0)
+      {
+        values.add(new AttributeValue(type,"unknown"));
+      }
       Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
       attributes.add(attr);
 
-      // add the latency attribute to our monitor data
-      // Compute the latency for the current SH
-      int missingChanges = rsDomain.getMissingChanges(remoteState);
-      attributes.add(new Attribute("missing-changes",
-          String.valueOf(missingChanges)));
-
-      // Add the oldest missing update
-      Long olderUpdateTime = rsDomain.getApproxFirstMissingDate(serverId);
-      if (olderUpdateTime != null)
+      // Oldest missing update
+      Long approxFirstMissingDate=md.getApproxFirstMissingDate(serverId);
+      if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
       {
-        Date date = new Date(olderUpdateTime);
+        Date date = new Date(approxFirstMissingDate);
         attributes.add(new Attribute("approx-older-change-not-synchronized",
           date.toString()));
         attributes.add(
-          new Attribute("approx-older-change-not-synchronized-millis",
-          String.valueOf(olderUpdateTime)));
+            new Attribute("approx-older-change-not-synchronized-millis",
+            String.valueOf(approxFirstMissingDate)));
       }
+
+      // Missing changes
+      long missingChanges = md.getMissingChanges(serverId);
+      attributes.add(new Attribute("missing-changes",
+          String.valueOf(missingChanges)));
+
+      // Replication delay
+      long delay = md.getApproxDelay(serverId);
+      attributes.add(new Attribute("approximate-delay",
+          String.valueOf(delay)));
+
     }
     catch(Exception e)
     {
+      // TODO: improve the log
       // We failed retrieving the remote monitor data.
       attributes.add(new Attribute("error",
         stackTraceToSingleLineString(e)));
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java
new file mode 100644
index 0000000..0801257
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -0,0 +1,329 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.server;
+
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.util.TimeThread;
+
+/**
+ * This class defines the Monitor Data that are consolidated across the
+ * whole replication topology.
+ */
+public class MonitorData
+{
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
+  /**
+   *
+   * - For each server, the max (most recent) CN produced
+   *
+   * - For each server, its state i.e. the last processed from of each
+   *   other LDAP server.
+   *   The change latency (missing changes) will be
+   *   the difference between the max above and the state here
+   *
+   * - For each server, the date of the first missing change.
+   *   The time latency (delay) will be the difference between now and the
+   *   date of the first missing change.
+   */
+
+  /* The date of the last time they have been elaborated */
+  private long buildDate = 0;
+
+  // For each LDAP server, its server state
+  private ConcurrentHashMap<Short, ServerState> LDAPStates =
+    new ConcurrentHashMap<Short, ServerState>();
+
+  // For each LDAP server, the last(max) CN it published
+  private ConcurrentHashMap<Short, ChangeNumber> maxCNs =
+    new ConcurrentHashMap<Short, ChangeNumber>();
+
+  // For each LDAP server, an approximation of the date of the first missing
+  // change
+  private ConcurrentHashMap<Short, Long> fmd =
+    new ConcurrentHashMap<Short, Long>();
+
+  private ConcurrentHashMap<Short, Long> missingChanges =
+    new ConcurrentHashMap<Short, Long>();
+
+  // For each RS server, an approximation of the date of the first missing
+  // change
+  private ConcurrentHashMap<Short, Long> fmRSDate =
+    new ConcurrentHashMap<Short, Long>();
+
+
+  /**
+   * Get an approximation of the latency delay of the replication.
+   * @param serverId The server ID.
+   * @return The delay
+   */
+  public long getApproxDelay(short serverId)
+  {
+    Long afmd = fmd.get(serverId);
+    if ((afmd != null) && (afmd>0))
+      return ((this.getBuildDate() - afmd)/1000);
+    else
+      return 0;
+  }
+
+  /**
+   * Get an approximation of the date of the first missing update.
+   * @param serverId The server ID.
+   * @return The date.
+   */
+  public long getApproxFirstMissingDate(short serverId)
+  {
+    Long res;
+    if ((res = fmd.get(serverId)) != null)
+      return res;
+    return 0;
+  }
+
+  /**
+   * Get the number of missing changes.
+   * @param serverId The server ID.
+   * @return The number of missing changes.
+   */
+  public long getMissingChanges(short serverId)
+  {
+    Long res = missingChanges.get(serverId);
+    if (res==null)
+      return 0;
+    else
+      return res;
+  }
+
+  /**
+   * Build the monitor data that are computed from the collected ones.
+   */
+  public void completeComputing()
+  {
+    String mds = "";
+
+    // Computes the missing changes counters
+    // For each LSi ,
+    //   Regarding each other LSj
+    //    Sum the difference : max(LSj) - state(LSi)
+
+    Iterator<Short> lsiStateItr = this.LDAPStates.keySet().iterator();
+    while (lsiStateItr.hasNext())
+    {
+      Short lsiSid = lsiStateItr.next();
+      ServerState lsiState = this.LDAPStates.get(lsiSid);
+      Long lsiMissingChanges = (long)0;
+      if (lsiState != null)
+      {
+        Iterator<Short> lsjMaxItr = this.maxCNs.keySet().iterator();
+        while (lsjMaxItr.hasNext())
+        {
+          Short lsjSid = lsjMaxItr.next();
+          ChangeNumber lsjMaxCN = this.maxCNs.get(lsjSid);
+          ChangeNumber lsiLastCN = lsiState.getMaxChangeNumber(lsjSid);
+
+          int missingChangesLsiLsj =
+            ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN);
+
+          mds +=
+            "+ diff("+lsjMaxCN+"-"
+                     +lsiLastCN+")="+missingChangesLsiLsj;
+
+          lsiMissingChanges += missingChangesLsiLsj;
+        }
+      }
+      mds += "=" + lsiMissingChanges;
+      this.missingChanges.put(lsiSid,lsiMissingChanges);
+
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "Complete monitor data : Missing changes ("+ lsiSid +")=" + mds);
+    }
+    this.setBuildDate(TimeThread.getTime());
+  }
+
+  /**
+   * Returns a <code>String</code> object representing this
+   * object's value.
+   * @return  a string representation of the value of this object in
+   */
+  public String toString()
+  {
+    String mds = "Monitor data=\n";
+
+    mds+= "Build date=" + this.getBuildDate();
+    // RS data
+    Iterator<Short> rsite = fmRSDate.keySet().iterator();
+    while (rsite.hasNext())
+    {
+      Short sid = rsite.next();
+      mds += "\nRSData(" + sid + ")=\t "+ "afmd=" + fmRSDate.get(sid);
+    }
+
+    // maxCNs
+    Iterator<Short> itc = maxCNs.keySet().iterator();
+    while (itc.hasNext())
+    {
+      Short sid = itc.next();
+      ChangeNumber cn = maxCNs.get(sid);
+      mds += "\nmaxCNs(" + sid + ")= " + cn.toString();
+    }
+
+    // LDAP data
+    Iterator<Short> lsite = LDAPStates.keySet().iterator();
+    while (lsite.hasNext())
+    {
+      Short sid = lsite.next();
+      ServerState ss = LDAPStates.get(sid);
+      mds += "\nLSData(" + sid + ")=\t" + "state=[" + ss.toString()
+      + "] afmd=" + this.getApproxFirstMissingDate(sid);
+      if (getBuildDate()>0)
+      {
+        mds += " missingDelay=" + this.getApproxDelay(sid);
+      }
+      mds +=" missingCount=" + missingChanges.get(sid);
+    }
+    //
+    mds += "--";
+    return mds;
+  }
+
+  /**
+   * Sets the build date of the data.
+   * @param buildDate The date.
+   */
+  public void setBuildDate(long buildDate)
+  {
+    this.buildDate = buildDate;
+  }
+
+  /**
+   * Returns the build date of the data.
+   * @return The date.
+   */
+  public long getBuildDate()
+  {
+    return buildDate;
+  }
+
+  /**
+   * From a provided state, sets the max CN of the monitor data.
+   * @param state the provided state.
+   */
+  public void setMaxCNs(ServerState state)
+  {
+    Iterator<Short> it = state.iterator();
+    while (it.hasNext())
+    {
+      short sid = it.next();
+      ChangeNumber newCN = state.getMaxChangeNumber(sid);
+      setMaxCN(sid, newCN);
+    }
+  }
+
+  /**
+   * For the provided serverId, sets the provided CN as the max if
+   * it is newer than the current max.
+   * @param serverId the provided serverId
+   * @param newCN the provided new CN
+   */
+  public void setMaxCN(short serverId, ChangeNumber newCN)
+  {
+    if (newCN==null) return;
+    ChangeNumber currentMaxCN = maxCNs.get(serverId);
+    if (currentMaxCN == null)
+    {
+      maxCNs.put(serverId, newCN);
+    }
+    else
+    {
+      if (newCN.newer(currentMaxCN))
+        maxCNs.replace(serverId, newCN);
+    }
+  }
+
+  /**
+   * Get the highest know change number of the LDAP server with the provided
+   * serverId.
+   * @param serverId The server ID.
+   * @return The highest change number.
+   */
+  public ChangeNumber getMaxCN(short serverId)
+  {
+    return maxCNs.get(serverId);
+  }
+
+  /**
+   * Get the state of the LDAP server with the provided serverId.
+   * @param serverId The server ID.
+   * @return The server state.
+   */
+  public ServerState getLDAPServerState(short serverId)
+  {
+    return LDAPStates.get(serverId);
+  }
+
+  /**
+   * Set the state of the LDAP server with the provided serverId.
+   * @param serverId The server ID.
+   * @param state The server state.
+   */
+  public void setLDAPServerState(short serverId, ServerState state)
+  {
+    LDAPStates.put(serverId, state);
+  }
+
+  /**
+   * Set the state of the LDAP server with the provided serverId.
+   * @param serverId The server ID.
+   * @param newFmd The first missing date.
+   */
+  public void setFirstMissingDate(short serverId, Long newFmd)
+  {
+    if (newFmd==null) return;
+    Long currentfmd = fmd.get(serverId);
+    if (currentfmd==null)
+    {
+      fmd.put(serverId, newFmd);
+    }
+    else
+    {
+      if ((newFmd!=0) && (newFmd<currentfmd))
+        fmd.replace(serverId, newFmd);
+    }
+  }
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 89726d4..77e5d5c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -37,7 +37,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -129,8 +128,8 @@
 
   /* Monitor data management */
 
-  // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
-  private long remoteMonitorDataLifeTime = 500;
+  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
+  private long monitorDataLifeTime = 500;
 
   /* Search op on monitor data is processed by a worker thread.
    * Requests are sent to the other RS,and responses are received by the
@@ -139,21 +138,11 @@
    */
   Semaphore remoteMonitorResponsesSemaphore;
 
-  /* The date of the last time they have been elaborated */
-  private long validityDate = 0;
-
-  // For each LDAP server, its server state
-  private HashMap<Short, ServerState> LDAPStates =
-    new HashMap<Short, ServerState>();
-
-  // For each LDAP server, the last CN it published
-  private HashMap<Short, ChangeNumber> maxCNs =
-    new HashMap<Short, ChangeNumber>();
-
-  // For each LDAP server, an approximation of the date of the first missing
-  // change
-  private HashMap<Short, Long> approxFirstMissingDate =
-    new HashMap<Short, Long>();
+  /**
+   * The monitor data consolidated over the topology.
+   */
+  private  MonitorData monitorData = new MonitorData();
+  private  MonitorData wrkMonitorData;
 
   /**
    * Creates a new ReplicationServerDomain associated to the DN baseDn.
@@ -166,13 +155,7 @@
   {
     this.baseDn = baseDn;
     this.replicationServer = replicationServer;
-
-    if (debugEnabled())
-      TRACER.debugInfo(
-        "In " + this.replicationServer.getMonitorInstanceName() +
-        " Created Cache for " + baseDn + " " +
-        stackTraceToSingleLineString(new Exception()));
-}
+  }
 
   /**
    * Add an update that has been received to the list of
@@ -366,6 +349,10 @@
         {
           replicationServers.remove(handler.getServerId());
           handler.stopHandler();
+
+          // Update the remote replication servers with our list
+          // of connected LDAP servers
+          sendReplServerInfo();
         }
       }
       else
@@ -374,12 +361,12 @@
         {
           connectedServers.remove(handler.getServerId());
           handler.stopHandler();
+
+          // Update the remote replication servers with our list
+          // of connected LDAP servers
+          sendReplServerInfo();
         }
       }
-
-      // Update the remote replication servers with our list
-      // of connected LDAP servers
-      sendReplServerInfo();
   }
 
   /**
@@ -578,7 +565,8 @@
    *
    * @param serverId Identifier of the server for which the iterator is created.
    * @param changeNumber Starting point for the iterator.
-   * @return the created ReplicationIterator.
+   * @return the created ReplicationIterator. Null when no DB is available
+   * for the provided server Id.
    */
   public ReplicationIterator getChangelogIterator(short serverId,
                     ChangeNumber changeNumber)
@@ -591,7 +579,8 @@
     {
       return handler.generateIterator(changeNumber);
     }
-    catch (Exception e) {
+    catch (Exception e)
+    {
      return null;
     }
   }
@@ -759,6 +748,7 @@
    */
   public void process(RoutableMessage msg, ServerHandler senderHandler)
   {
+
     // Test the message for which a ReplicationServer is expected
     // to be the destination
     if (msg.getDestination() == this.replicationServer.getServerId())
@@ -779,20 +769,33 @@
               replServerMonitorRequestMsg.getDestination(),
               replServerMonitorRequestMsg.getsenderID());
 
-        // Populate the RS state in the msg from the DbState
-        monitorMsg.setReplServerState(this.getDbServerState());
-
         // Populate for each connected LDAP Server
         // from the states stored in the serverHandler.
         // - the server state
         // - the older missing change
         for (ServerHandler lsh : this.connectedServers.values())
         {
-          monitorMsg.setLDAPServerState(
+          monitorMsg.setServerState(
               lsh.getServerId(),
               lsh.getServerState(),
-              lsh.getApproxFirstMissingDate());
+              lsh.getApproxFirstMissingDate(),
+              true);
         }
+
+        // Same for the connected RS
+        for (ServerHandler rsh : this.replicationServers.values())
+        {
+          monitorMsg.setServerState(
+              rsh.getServerId(),
+              rsh.getServerState(),
+              rsh.getApproxFirstMissingDate(),
+              false);
+        }
+
+        // Populate the RS state in the msg from the DbState
+        monitorMsg.setReplServerDbState(this.getDbServerState());
+
+
         try
         {
           senderHandler.send(monitorMsg);
@@ -1305,118 +1308,135 @@
       }
     }
 
-    /*
+    /* =======================
      * Monitor Data generation
+     * =======================
      */
 
     /**
-     * Retrieves the remote monitor data.
-     *
+     * Retrieves the global monitor data.
+     * @return The monitor data.
      * @throws DirectoryException When an error occurs.
      */
-    protected void retrievesRemoteMonitorData()
+    synchronized protected MonitorData getMonitorData()
       throws DirectoryException
     {
-      if (validityDate > TimeThread.getTime())
+      if (monitorData.getBuildDate() + monitorDataLifeTime
+          > TimeThread.getTime())
       {
-        // The current data are still valid. No need to renew them.
-        return;
+        if (debugEnabled())
+          TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn + " getRemoteMonitorData in cache");
+       // The current data are still valid. No need to renew them.
+        // FIXME
+        return null;
       }
 
-      // Clean
-      this.LDAPStates.clear();
-      this.maxCNs.clear();
-
-      // Init the maxCNs of our direct LDAP servers from our own dbstate
-      for (ServerHandler rs : connectedServers.values())
+      wrkMonitorData = new MonitorData();
+      synchronized(wrkMonitorData)
       {
-        short serverID = rs.getServerId();
-        ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
-        if (cn == null)
-        {
-          // we have nothing in db for that server
-          cn = new ChangeNumber(0, 0 , serverID);
-        }
-        this.maxCNs.put(serverID, cn);
-      }
+        if (debugEnabled())
+          TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn + " Computing monitor data ");
 
-      ServerState replServerState = this.getDbServerState();
-      Iterator<Short> it = replServerState.iterator();
-      while (it.hasNext())
-      {
-        short sid = it.next();
-        ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
-        ChangeNumber maxCN = this.maxCNs.get(sid);
-        if ((maxCN != null) && (receivedCN.newer(maxCN)))
+        // Let's process our directly connected LSes
+        // - in the ServerHandler for a given LS1, the stored state contains :
+        //   - the max CN produced by LS1
+        //   - the last CN consumed by LS1 from LS2..n
+        // - in the RSdomain/dbHandler, the built-in state contains :
+        //   - the max CN produced by each server
+        // So for a given LS connected we can take the state and the max from
+        // the LS/state.
+
+        for (ServerHandler directlsh : connectedServers.values())
         {
-          // We found a newer one
-          this.maxCNs.remove(sid);
-          this.maxCNs.put(sid, receivedCN);
+          short serverID = directlsh.getServerId();
+
+          // the state comes from the state stored in the SH
+          ServerState directlshState = directlsh.getServerState().duplicate();
+
+          // the max CN sent by that LS also comes from the SH
+          ChangeNumber maxcn = directlshState.getMaxChangeNumber(serverID);
+          if (maxcn == null)
+          {
+            // This directly connected LS has never produced any change
+            maxcn = new ChangeNumber(0, 0 , serverID);
+          }
+          wrkMonitorData.setMaxCN(serverID, maxcn);
+          wrkMonitorData.setLDAPServerState(serverID, directlshState);
+          wrkMonitorData.setFirstMissingDate(serverID, directlsh.
+                                             getApproxFirstMissingDate());
         }
+
+        // Then initialize the max CN for the LS that produced something
+        // - from our own local db state
+        // - whatever they are directly or undirectly connected
+        ServerState dbServerState = getDbServerState();
+        Iterator<Short> it = dbServerState.iterator();
+        while (it.hasNext())
+        {
+          short sid = it.next();
+          ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
+          wrkMonitorData.setMaxCN(sid, storedCN);
+        }
+
+        // Now we have used all available local informations
+        // and we need the remote ones.
+        if (debugEnabled())
+          TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDn=" + baseDn + " Local monitor data: " +
+            wrkMonitorData.toString());
       }
 
       // Send Request to the other Replication Servers
       if (remoteMonitorResponsesSemaphore == null)
       {
-        remoteMonitorResponsesSemaphore = new Semaphore(
-            replicationServers.size() -1);
-
-        sendMonitorDataRequest();
-
+        remoteMonitorResponsesSemaphore = new Semaphore(0);
+        short requestCnt = sendMonitorDataRequest();
         // Wait reponses from them or timeout
-        waitMonitorDataResponses(replicationServers.size());
+        waitMonitorDataResponses(requestCnt);
       }
       else
       {
         // The processing of renewing the monitor cache is already running
         // We'll make it sleeping until the end
+        // TODO: unit test for this case.
         while (remoteMonitorResponsesSemaphore!=null)
         {
           waitMonitorDataResponses(1);
         }
       }
 
-      // Now we have the expected answers of an error occured
-      validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
+      wrkMonitorData.completeComputing();
 
-      if (debugEnabled())
+      // Store the new computed data as the reference
+      synchronized(monitorData)
       {
-        debugMonitorData();
-      }
-    }
-
-    private void debugMonitorData()
-    {
-      String mds = " Monitor data=";
-      Iterator<Short> ite = LDAPStates.keySet().iterator();
-      while (ite.hasNext())
-      {
-        Short sid = ite.next();
-        ServerState ss = LDAPStates.get(sid);
-        mds += " LDAPState(" + sid + ")=" + ss.toString();
-      }
-      Iterator<Short> itc = maxCNs.keySet().iterator();
-      while (itc.hasNext())
-      {
-        Short sid = itc.next();
-        ChangeNumber cn = maxCNs.get(sid);
-        mds += " maxCNs(" + sid + ")=" + cn.toString();
-      }
-
-      mds += "--";
-      TRACER.debugInfo(
+        // Now we have the expected answers or an error occured
+        monitorData = wrkMonitorData;
+        wrkMonitorData = null;
+        if (debugEnabled())
+          TRACER.debugInfo(
           "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDN=" + baseDn +
-          mds);
+          " baseDn=" + baseDn + " *** Computed MonitorData: " +
+          monitorData.toString());
+      }
+      return monitorData;
     }
 
+
     /**
      * Sends a MonitorRequest message to all connected RS.
+     * @return the number of requests sent.
      * @throws DirectoryException when a problem occurs.
      */
-    protected void sendMonitorDataRequest()
+    protected short sendMonitorDataRequest()
       throws DirectoryException
     {
+      short sent=0;
       try
       {
         for (ServerHandler rs : replicationServers.values())
@@ -1425,6 +1445,7 @@
             MonitorRequestMessage(this.replicationServer.getServerId(),
               rs.getServerId());
           rs.send(msg);
+          sent++;
         }
       }
       catch(Exception e)
@@ -1434,6 +1455,7 @@
         throw new DirectoryException(ResultCode.OTHER,
             message, e);
       }
+      return sent;
     }
 
     /**
@@ -1446,21 +1468,30 @@
     {
       try
       {
+        if (debugEnabled())
+          TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDn=" + baseDn +
+          " waiting for " + expectedResponses
+          + " expected monitor messages");
+
         boolean allPermitsAcquired =
           remoteMonitorResponsesSemaphore.tryAcquire(
               expectedResponses,
-              (long) 500, TimeUnit.MILLISECONDS);
+              (long) 5000, TimeUnit.MILLISECONDS);
 
         if (!allPermitsAcquired)
         {
           logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+          // FIXME let's go on in best effort even with limited data received.
         }
         else
         {
           if (debugEnabled())
             TRACER.debugInfo(
             "In " + this.replicationServer.getMonitorInstanceName() +
-            "Successfully received all " + replicationServers.size()
+            " baseDn=" + baseDn +
+            " Successfully received all " + expectedResponses
             + " expected monitor messages");
         }
       }
@@ -1482,48 +1513,94 @@
      */
     public void receivesMonitorDataResponse(MonitorMessage msg)
     {
+      if (debugEnabled())
+        TRACER.debugInfo(
+        "In " + this.replicationServer.getMonitorInstanceName() +
+        "Receiving " + msg + " from " + msg.getsenderID() +
+        remoteMonitorResponsesSemaphore);
+
       if (remoteMonitorResponsesSemaphore == null)
       {
-        // Ignoring the remote monitor data because an error occured previously
+        // FIXME
+        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            "Receiving " + msg + " from " + msg.getsenderID() +
+            " remoteMonitorResponsesSemaphore should not be null"));
+        // Ignoring the remote monitor data because an error occured
+        // previously
         return;
       }
 
       try
       {
-        // Here is the RS state : list <serverID, lastChangeNumber>
-        // For each LDAP Server, we keep the max CN accross the RSes
-        ServerState replServerState = msg.getReplServerState();
-        Iterator<Short> it = replServerState.iterator();
-        while (it.hasNext())
+        synchronized(wrkMonitorData)
         {
-          short sid = it.next();
-          ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
-          ChangeNumber maxCN = this.maxCNs.get(sid);
-          if (receivedCN.newer(maxCN))
-          {
-            // We found a newer one
-            this.maxCNs.remove(sid);
-            this.maxCNs.put(sid, receivedCN);
-          }
-        }
+          // Here is the RS state : list <serverID, lastChangeNumber>
+          // For each LDAP Server, we keep the max CN accross the RSes
+          ServerState replServerState = msg.getReplServerDbState();
+          wrkMonitorData.setMaxCNs(replServerState);
 
-        // Store the LDAP servers states
-        Iterator<Short> sidIterator = msg.iterator();
-        while (sidIterator.hasNext())
-        {
-          short sid = sidIterator.next();
-          ServerState ss = msg.getLDAPServerState(sid);
-          this.LDAPStates.put(sid, ss);
-          this.approxFirstMissingDate.put(sid,
-              msg.getApproxFirstMissingDate(sid));
+          // Store the remote LDAP servers states
+          Iterator<Short> lsidIterator = msg.ldapIterator();
+          while (lsidIterator.hasNext())
+          {
+            short sid = lsidIterator.next();
+            wrkMonitorData.setLDAPServerState(sid,
+                msg.getLDAPServerState(sid).duplicate());
+            wrkMonitorData.setFirstMissingDate(sid,
+                msg.getLDAPApproxFirstMissingDate(sid));
+          }
+
+          // Process the latency reported by the remote RSi on its connections
+          // to the other RSes
+          Iterator<Short> rsidIterator = msg.rsIterator();
+          while (rsidIterator.hasNext())
+          {
+            short rsid = rsidIterator.next();
+            if (rsid == replicationServer.getServerId())
+            {
+              // this is the latency of the remote RSi regarding the current RS
+              // let's update the fmd of my connected LS
+              for (ServerHandler connectedlsh : connectedServers.values())
+              {
+                short connectedlsid = connectedlsh.getServerId();
+                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
+                wrkMonitorData.setFirstMissingDate(connectedlsid, newfmd);
+              }
+            }
+            else
+            {
+              // this is the latency of the remote RSi regarding another RSj
+              // let's update the latency of the LSes connected to RSj
+              ServerHandler rsjHdr = replicationServers.get(rsid);
+              for(short remotelsid : rsjHdr.getConnectedServerIds())
+              {
+                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
+                wrkMonitorData.setFirstMissingDate(remotelsid, newfmd);
+              }
+            }
+          }
+          if (debugEnabled())
+          {
+            if (debugEnabled())
+              TRACER.debugInfo(
+              "In " + this.replicationServer.getMonitorInstanceName() +
+              " baseDn=" + baseDn +
+              " Processed msg from " + msg.getsenderID() +
+              " New monitor data: " + wrkMonitorData.toString());
+          }
         }
 
         // Decreases the number of expected responses and potentially
         // wakes up the waiting requestor thread.
         remoteMonitorResponsesSemaphore.release();
+
       }
       catch (Exception e)
       {
+        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage() +
+            stackTraceToSingleLineString(e)));
+
         // If an exception occurs while processing one of the expected message,
         // the processing is aborted and the waiting thread is awoke.
         remoteMonitorResponsesSemaphore.notifyAll();
@@ -1531,65 +1608,6 @@
     }
 
     /**
-     * Get the state of the LDAP server with the provided serverId.
-     * @param serverId The server ID.
-     * @return The server state.
-     */
-    public ServerState getServerState(short serverId)
-    {
-      return LDAPStates.get(serverId);
-    }
-
-    /**
-     * Get the highest know change number of the LDAP server with the provided
-     * serverId.
-     * @param serverId The server ID.
-     * @return The highest change number.
-     */
-    public ChangeNumber getMaxCN(short serverId)
-    {
-      return maxCNs.get(serverId);
-    }
-
-    /**
-     * Get an approximation of the date of the oldest missing changes.
-     * serverId.
-     * @param serverId The server ID.
-     * @return The approximation of the date of the oldest missing change.
-     */
-    public Long getApproxFirstMissingDate(short serverId)
-    {
-      return approxFirstMissingDate.get(serverId);
-    }
-
-    /**
-     * Get the number of missing change for the server with the provided state.
-     * @param state The provided server state.
-     * @return The number of missing changes.
-     */
-    public int getMissingChanges(ServerState state)
-    {
-      // Traverse the max Cn transmitted by each server
-      // For each server, get the highest CN know from the current server
-      // Sum the difference betwenn the max and the last
-      int missingChanges = 0;
-      Iterator<Short> itc = maxCNs.keySet().iterator();
-      while (itc.hasNext())
-      {
-        Short sid = itc.next();
-        ChangeNumber maxCN = maxCNs.get(sid);
-        ChangeNumber last = state.getMaxChangeNumber(sid);
-        if (last == null)
-        {
-          last = new ChangeNumber(0,0, sid);
-        }
-        int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
-        missingChanges += missingChangesFromSID;
-      }
-      return missingChanges;
-    }
-
-    /**
      * Set the purge delay on all the db Handlers for this Domain
      * of Replicaiton.
      *
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 3b85cb1..0e759c0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -45,6 +45,7 @@
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
@@ -124,12 +125,12 @@
 
 
   /**
-   * When this Handler is connected to a remote replication server
+   * When this Handler is related to a remote replication server
    * this collection will contain as many elements as there are
    * LDAP servers connected to the remote replication server.
    */
-  private List<LightweightServerHandler>
-     remoteLDAPservers = new ArrayList<LightweightServerHandler>();
+  private Map<Short, LightweightServerHandler> connectedServers =
+    new ConcurrentHashMap<Short, LightweightServerHandler>();
 
   /**
    * The time in milliseconds between heartbeats from the replication
@@ -200,6 +201,8 @@
     maxRcvWindow = windowSize;
     rcvWindow = windowSize;
     long localGenerationId = -1;
+    boolean handshakeOnly = false;
+
     try
     {
       if (baseDn != null)
@@ -244,6 +247,8 @@
         maxSendQueue = receivedMsg.getMaxSendQueue();
         heartbeatInterval = receivedMsg.getHeartbeatInterval();
 
+        handshakeOnly = receivedMsg.isHandshakeOnly();
+
         // The session initiator decides whether to use SSL.
         sslEncryption = receivedMsg.getSSLEncryption();
 
@@ -524,60 +529,70 @@
       replicationServerDomain = replicationServer.
               getReplicationServerDomain(this.baseDn,true);
 
-      boolean started;
-      if (serverIsLDAPserver)
+      if (!handshakeOnly)
       {
-        started = replicationServerDomain.startServer(this);
-      }
-      else
-      {
-        started = replicationServerDomain.startReplicationServer(this);
-      }
-
-      if (started)
-      {
-        // sendWindow MUST be created before starting the writer
-        sendWindow = new Semaphore(sendWindowSize);
-
-        writer = new ServerWriter(session, serverId,
-                this, replicationServerDomain);
-        reader = new ServerReader(session, serverId,
-                this, replicationServerDomain);
-
-        reader.start();
-        writer.start();
-
-        // Create a thread to send heartbeat messages.
-        if (heartbeatInterval > 0)
+        boolean started;
+        if (serverIsLDAPserver)
         {
-          heartbeatThread = new HeartbeatThread(
-              "replication Heartbeat to " + serverURL +
-              " for " + this.baseDn,
-              session, heartbeatInterval/3);
-          heartbeatThread.start();
+          started = replicationServerDomain.startServer(this);
+        }
+        else
+        {
+          started = replicationServerDomain.startReplicationServer(this);
         }
 
-
-        DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
-        DirectoryServer.registerMonitorProvider(this);
-      }
-      else
-      {
-        // the connection is not valid, close it.
-        try
+        if (started)
         {
-          if (debugEnabled())
+          // sendWindow MUST be created before starting the writer
+          sendWindow = new Semaphore(sendWindowSize);
+
+          writer = new ServerWriter(session, serverId,
+              this, replicationServerDomain);
+          reader = new ServerReader(session, serverId,
+              this, replicationServerDomain);
+
+          reader.start();
+          writer.start();
+
+          // Create a thread to send heartbeat messages.
+          if (heartbeatInterval > 0)
           {
-            TRACER.debugInfo("In " +
-              replicationServerDomain.getReplicationServer().
-              getMonitorInstanceName() + " RS failed to start locally " +
-              " the connection from serverID="+serverId);
+            heartbeatThread = new HeartbeatThread(
+                "replication Heartbeat to " + serverURL +
+                " for " + this.baseDn,
+                session, heartbeatInterval/3);
+            heartbeatThread.start();
           }
-          session.close();
-        } catch (IOException e1)
-        {
-          // ignore
+
+          DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+          DirectoryServer.registerMonitorProvider(this);
         }
+        else
+        {
+          // the connection is not valid, close it.
+          try
+          {
+            if (debugEnabled())
+            {
+              TRACER.debugInfo("In " +
+                  replicationServerDomain.getReplicationServer().
+                  getMonitorInstanceName() + " RS failed to start locally " +
+                  " the connection from serverID="+serverId);
+            }
+            session.close();
+          } catch (IOException e1)
+          {
+            // ignore
+          }
+        }
+      }
+      else
+      {
+        // For a hanshakeOnly connection, let's only create a reader
+        // in order to detect the connection closure.
+        reader = new ServerReader(session, serverId,
+            this, replicationServerDomain);
+        reader.start();
       }
     }
     catch (Exception e)
@@ -842,22 +857,22 @@
   /**
    * Get the age of the older change that has not yet been replicated
    * to the server handled by this ServerHandler.
-   *
    * @return The age if the older change has not yet been replicated
    *         to the server handled by this ServerHandler.
    */
   public Long getApproxFirstMissingDate()
   {
-    // Get the older CN received
-    // From it, get the next sequence number
-    // Get the CN for the next sequence number
-    // If not present in the local RS db,
-    // then approximate with the older update time
-    ChangeNumber olderUpdateCN = getOlderUpdateCN();
-    if (olderUpdateCN == null)
-      return null;
+    Long result = (long)0;
 
-    return olderUpdateCN.getTime();
+    // Get the older CN received
+    ChangeNumber olderUpdateCN = getOlderUpdateCN();
+    if (olderUpdateCN != null)
+    {
+      // If not present in the local RS db,
+      // then approximate with the older update time
+      result=olderUpdateCN.getTime();
+    }
+    return result;
   }
 
   /**
@@ -874,29 +889,82 @@
 
   /**
    * Get the older Change Number for that server.
+   * Returns null when the queue is empty.
    * @return The older change number.
    */
   public ChangeNumber getOlderUpdateCN()
   {
+    ChangeNumber result = null;
     synchronized (msgQueue)
     {
       if (isFollowing())
       {
         if (msgQueue.isEmpty())
-          return null;
-
-        UpdateMessage msg = msgQueue.first();
-        return msg.getChangeNumber();
+        {
+          result=null;
+        }
+        else
+        {
+          UpdateMessage msg = msgQueue.first();
+          result = msg.getChangeNumber();
+        }
       }
       else
       {
         if (lateQueue.isEmpty())
-          return null;
+        {
+          // isFollowing is false AND lateQueue is empty
+          // We may be at the very moment when the writer has emptyed the
+          // lateQueue when it sent the last update. The writer will fill again
+          // the lateQueue when it will send the next update but we are not yet
+          // there. So let's take the last change not sent directly from
+          // the db.
 
-        UpdateMessage msg = lateQueue.first();
-        return msg.getChangeNumber();
+          ReplicationIteratorComparator comparator =
+            new ReplicationIteratorComparator();
+          SortedSet<ReplicationIterator> iteratorSortedSet =
+            new TreeSet<ReplicationIterator>(comparator);
+          try
+          {
+            // Build a list of candidates iterator (i.e. db i.e. server)
+            for (short serverId : replicationServerDomain.getServers())
+            {
+              // get the last already sent CN from that server
+              ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
+              // get an iterator in this server db from that last change
+              ReplicationIterator iterator =
+                replicationServerDomain.getChangelogIterator(serverId, lastCsn);
+              // if that iterator has changes, then it is a candidate
+              // it is added in the sorted list at a position given by its
+              // current change (see ReplicationIteratorComparator).
+              if ((iterator != null) && (iterator.getChange() != null))
+              {
+                iteratorSortedSet.add(iterator);
+              }
+            }
+            UpdateMessage msg = iteratorSortedSet.first().getChange();
+            result = msg.getChangeNumber();
+          }
+          catch(Exception e)
+          {
+            result=null;
+          }
+          finally
+          {
+            for (ReplicationIterator iterator : iteratorSortedSet)
+            {
+              iterator.releaseCursor();
+            }
+          }
+        }
+        else
+        {
+          UpdateMessage msg = lateQueue.first();
+          result = msg.getChangeNumber();
+        }
       }
     }
+    return result;
   }
 
   /**
@@ -958,7 +1026,7 @@
        */
       while (msgQueue.size() > maxQueueSize)
       {
-        following = false;
+        setFollowing(false);
         msgQueue.removeFirst();
       }
     }
@@ -1083,6 +1151,13 @@
               }
             }
           }
+
+          // The loop below relies on the fact that it is sorted based
+          // on the currentChange of each iterator to consider the next
+          // change accross all servers.
+          // Hence it is necessary to remove and eventual add again an iterator
+          // when looping in order to keep consistent the order of the
+          // iterators (see ReplicationIteratorComparator.
           while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
           {
             ReplicationIterator iterator = iteratorSortedSet.first();
@@ -1107,7 +1182,7 @@
             {
               if (msgQueue.size() < maxQueueSize)
               {
-                following = true;
+                setFollowing(true);
               }
             }
           }
@@ -1119,7 +1194,7 @@
               if (msgQueue.contains(msg))
               {
                 /* we finally catched up with the regular queue */
-                following = true;
+                setFollowing(true);
                 lateQueue.clear();
                 UpdateMessage msg1;
                 do
@@ -1459,14 +1534,6 @@
       attributes.add(new Attribute("connected-to", this.replicationServerDomain.
           getReplicationServer().getMonitorInstanceName()));
 
-      // Add the oldest missing update
-      Long olderUpdateTime = this.getApproxFirstMissingDate();
-      if (olderUpdateTime != null)
-      {
-        Date date = new Date(olderUpdateTime);
-        attributes.add(new Attribute("approx-older-change-not-synchronized",
-          date.toString()));
-      }
     }
     else
     {
@@ -1477,27 +1544,42 @@
     attributes.add(new Attribute("base-dn",
                                  baseDn.toString()));
 
-    // Update stats
-
-    // Retrieves the topology counters
     if (serverIsLDAPserver)
     {
+      MonitorData md;
       try
       {
-        replicationServerDomain.retrievesRemoteMonitorData();
+        md = replicationServerDomain.getMonitorData();
+
+        // Oldest missing update
+        Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
+        if ((approxFirstMissingDate != null) && (approxFirstMissingDate>0))
+        {
+          Date date = new Date(approxFirstMissingDate);
+          attributes.add(new Attribute("approx-older-change-not-synchronized",
+              date.toString()));
+          attributes.add(
+              new Attribute("approx-older-change-not-synchronized-millis",
+                  String.valueOf(approxFirstMissingDate)));
+        }
+
+        // Missing changes
+        long missingChanges = md.getMissingChanges(serverId);
+        attributes.add(new Attribute("missing-changes",
+            String.valueOf(missingChanges)));
+
+        // Replication delay
+        long delay = md.getApproxDelay(serverId);
+        attributes.add(new Attribute("approximate-delay",
+            String.valueOf(delay)));
       }
       catch(Exception e)
       {
-        // FIXME: We failed retrieving the remote monitor data
+        // TODO: improve the log
+        // We failed retrieving the remote monitor data.
+        attributes.add(new Attribute("error",
+            stackTraceToSingleLineString(e)));
       }
-
-      // Compute the latency for the current SH
-      int missingChanges =
-        replicationServerDomain.getMissingChanges(serverState);
-
-      // add the latency attribute to our monitor data
-      attributes.add(new Attribute("missing-changes",
-          String.valueOf(missingChanges)));
     }
 
     // Deprecated
@@ -1532,8 +1614,6 @@
     attributes.add(new Attribute("waiting-changes",
         String.valueOf(getRcvMsgQueueSize())));
     // Age of oldest missing change
-    attributes.add(new Attribute("approximate-delay",
-                                 String.valueOf(getApproxDelay())));
 
     // Date of the oldest missing change
     long olderUpdateTime = getOlderUpdateTime();
@@ -1731,14 +1811,14 @@
      List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
      generationId = infoMsg.getGenerationId();
 
-     synchronized(remoteLDAPservers)
+     synchronized(connectedServers)
      {
        // Removes the existing structures
-       for (LightweightServerHandler lsh : remoteLDAPservers)
+       for (LightweightServerHandler lsh : connectedServers.values())
        {
          lsh.stopHandler();
        }
-       remoteLDAPservers.clear();
+       connectedServers.clear();
 
        // Creates the new structure according to the message received.
        for (String newConnectedServer : newRemoteLDAPservers)
@@ -1746,7 +1826,7 @@
          LightweightServerHandler lsh
          = new LightweightServerHandler(newConnectedServer, this);
          lsh.startHandler();
-         remoteLDAPservers.add(lsh);
+         connectedServers.put(lsh.getServerId(), lsh);
        }
      }
    }
@@ -1762,14 +1842,17 @@
     */
    public boolean isRemoteLDAPServer(short wantedServer)
    {
-     for (LightweightServerHandler server : remoteLDAPservers)
+     synchronized(connectedServers)
      {
-       if (wantedServer == server.getServerId())
+       for (LightweightServerHandler server : connectedServers.values())
        {
-         return true;
+         if (wantedServer == server.getServerId())
+         {
+           return true;
+         }
        }
+       return false;
      }
-     return false;
    }
 
    /**
@@ -1781,7 +1864,7 @@
     */
    public boolean hasRemoteLDAPServers()
    {
-     return !remoteLDAPservers.isEmpty();
+     return !connectedServers.isEmpty();
    }
 
   /**
@@ -1907,4 +1990,13 @@
   {
     return this.replicationServerDomain;
   }
+
+  /**
+   * Return a Set containing the servers known by this replicationServer.
+   * @return a set containing the servers known by this replicationServer.
+   */
+  public Set<Short> getConnectedServerIds()
+  {
+    return connectedServers.keySet();
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 323751e..943e6b7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -120,6 +120,7 @@
       {
         ReplicationMessage msg = session.receive();
 
+        /*
         if (debugEnabled())
         {
           TRACER.debugInfo(
@@ -128,6 +129,7 @@
               (handler.isReplicationServer()?" From RS ":" From LS")+
               " with serverId=" + serverId + " receives " + msg);
         }
+        */
         if (msg instanceof AckMessage)
         {
           AckMessage ack = (AckMessage) msg;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 698ce5e..2e47c7b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -120,6 +120,7 @@
           continue;
         }
 
+        /*
         if (debugEnabled())
         {
           TRACER.debugInfo(
@@ -131,6 +132,7 @@
             " server=" + handler.getServerId() +
             " generationId=" + handler.getGenerationId());
         }
+        */
         session.publish(update);
       }
     }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
index 2376e20..34a22dd 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
@@ -316,6 +316,10 @@
 
     CN1 = new ChangeNumber((long)0, 3, (short)0);
 
+    // 3-0 = 3
+    CN2 = new ChangeNumber((long)0, 0, (short)0);
+    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 3);
+    
     // 3-1 = 2
     CN2 = new ChangeNumber((long)0, 1, (short)0);
     assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 2);
@@ -327,5 +331,15 @@
     // 3-4 == MAXINT (modulo)
     CN2 = new ChangeNumber((long)0, 4, (short)0);
     assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE);
+
+    CN1 = new ChangeNumber((long)0, 0, (short)0);
+
+    // 0-0 = 0
+    CN2 = new ChangeNumber((long)0, 0, (short)0);
+    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 0);
+    
+    // 0-1 = MAXINT(modulo)
+    CN2 = new ChangeNumber((long)0, 1, (short)0);
+    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE);
   }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 592cc29..6d5c866 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -496,7 +496,7 @@
     state.update(new ChangeNumber((long)1, 1,(short)1));
     ServerStartMessage msg = new ServerStartMessage(serverId, baseDN,
         window, window, window, window, window, window, state, (short)1, 
-        (long)1, true);
+        (long)1, true, false);
     ServerStartMessage newMsg = new ServerStartMessage(msg.getBytes());
     assertEquals(msg.getServerId(), newMsg.getServerId());
     assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
@@ -511,6 +511,7 @@
         newMsg.getServerState().getMaxChangeNumber((short)1));
     assertEquals(msg.getVersion(), newMsg.getVersion());
     assertEquals(msg.getGenerationId(), newMsg.getGenerationId());
+    assertEquals(msg.isHandshakeOnly(), newMsg.isHandshakeOnly());
   }
 
   @DataProvider(name="changelogStart")
@@ -614,20 +615,28 @@
                                        (short) 123, sid2);
     s2.update(cn2);
 
+    // LS3 state
+    ServerState s3 = new ServerState();
+    short sid3 = 333;
+    ChangeNumber cn3 = new ChangeNumber(now,
+                                       (short) 123, sid3);
+    s3.update(cn3);
+
     MonitorMessage msg =
       new MonitorMessage(sender, dest);
-    msg.setReplServerState(rsState);
-    msg.setLDAPServerState(sid1, s1, now+1);
-    msg.setLDAPServerState(sid2, s2, now+2);
+    msg.setReplServerDbState(rsState);
+    msg.setServerState(sid1, s1, now+1, true);
+    msg.setServerState(sid2, s2, now+2, true);
+    msg.setServerState(sid3, s3, now+3, false);
     
     byte[] b = msg.getBytes();
     MonitorMessage newMsg = new MonitorMessage(b);
 
-    assertEquals(rsState, msg.getReplServerState());
-    assertEquals(newMsg.getReplServerState().toString(), 
-        msg.getReplServerState().toString());
+    assertEquals(rsState, msg.getReplServerDbState());
+    assertEquals(newMsg.getReplServerDbState().toString(), 
+        msg.getReplServerDbState().toString());
     
-    Iterator<Short> it = newMsg.iterator();
+    Iterator<Short> it = newMsg.ldapIterator();
     while (it.hasNext())
     {
       short sid = it.next();
@@ -635,16 +644,32 @@
       if (sid == sid1)
       {
         assertEquals(s.toString(), s1.toString(), "");
-        assertEquals((Long)(now+1), newMsg.getApproxFirstMissingDate(sid), "");
+        assertEquals((Long)(now+1), newMsg.getLDAPApproxFirstMissingDate(sid), "");
       }
       else if (sid == sid2)
       {
         assertEquals(s.toString(), s2.toString());        
-        assertEquals((Long)(now+2), newMsg.getApproxFirstMissingDate(sid), "");
+        assertEquals((Long)(now+2), newMsg.getLDAPApproxFirstMissingDate(sid), "");
       }
       else
       {
-        fail("Bad sid");
+        fail("Bad sid" + sid);
+      }
+    }
+
+    Iterator<Short> it2 = newMsg.rsIterator();
+    while (it2.hasNext())
+    {
+      short sid = it2.next();
+      ServerState s = newMsg.getRSServerState(sid);
+      if (sid == sid3)
+      {
+        assertEquals(s.toString(), s3.toString(), "");
+        assertEquals((Long)(now+3), newMsg.getRSApproxFirstMissingDate(sid), "");
+      }
+      else
+      {
+        fail("Bad sid " + sid);
       }
     }
 
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index 98b6a97..8186d82 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -874,7 +874,7 @@
       ServerStartMessage msg =
         new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"),
             0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(),
-            ProtocolVersion.currentVersion(), 0, sslEncryption);
+            ProtocolVersion.currentVersion(), 0, sslEncryption, false);
       session.publish(msg);
 
       // Read the Replication Server state from the ReplServerStartMessage that
@@ -907,7 +907,7 @@
           0, 0, 0, 0, WINDOW, (long) 5000, replServerState,
           ProtocolVersion.currentVersion(),
           ReplicationTestCase.getGenerationId(baseDn),
-          sslEncryption);
+          sslEncryption, false);
       session.publish(msg);
 
       // Read the ReplServerStartMessage that come back.

--
Gitblit v1.10.0