From 8adb35fa24b145ab593ceba3a0602dfa43e28cb7 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 03 Jan 2008 09:41:49 +0000
Subject: [PATCH] 

---
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java             |    7 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java |   74 +
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java                          |  122 +++
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java              |  599 ++++++++++++++++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java                                    |  235 ++++-
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java                                 |  397 ++++++++++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                          |  379 ++++++++++
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java                                     |   15 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java                             |   14 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java         |   27 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java                         |  270 +++++++
 opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java                                     |   36 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java                                     |    4 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java                          |    6 
 opendj-sdk/opends/src/messages/messages/replication.properties                                                          |    6 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java                                         |    4 
 16 files changed, 2,101 insertions(+), 94 deletions(-)

diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index e41ddb7..d47fa34 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -251,4 +251,10 @@
   export-ldif command must be run as a task
 NOTICE_SSL_SERVER_CON_ATTEMPT_ERROR_105=SSL connection attempt from %s (%s) \
   failed: %s
+SEVERE_ERR_MISSING_REMOTE_MONITOR_DATA_106=Monitor data of remote servers \
+ are missing due to an error in the retrieval process
+SEVERE_ERR_PROCESSING_REMOTE_MONITOR_DATA_107=Monitor data of remote servers \
+ are missing due to a processing error : %s
+SEVERE_ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST_108=Exception raised when \
+ sending request to get remote monitor data
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
index 998efb2..57128aa 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.common;
 
@@ -185,6 +185,40 @@
 
     }
   }
+
+  /**
+   * Computes the difference in number of changes between 2
+   * change numbers.
+   * @param op1 the first ChangeNumber
+   * @param op2 the second ChangeNumber
+   * @return the difference
+   */
+  public static int diffSeqNum(ChangeNumber op1, ChangeNumber op2)
+  {
+    int totalCount = 0;
+    int max = op1.getSeqnum();
+    if (op2 != null)
+    {
+      int current = op2.getSeqnum();
+      if (current == max)
+      {
+      }
+      else if (current < max)
+      {
+        totalCount += max - current;
+      }
+      else
+      {
+        totalCount += Integer.MAX_VALUE - (current - max) + 1;
+      }
+    }
+    else
+    {
+      totalCount += max;
+    }
+    return totalCount;
+  }
+
   /**
    * check if the current Object is strictly older than ChangeNumber
    * given in parameter.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
index 5234eda..adc4b74 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -258,7 +258,7 @@
   @Override
   public String toString()
   {
-    return ("ADD " + getDn() + " " + getChangeNumber());
+    return ("ADD DN=(" + getDn() + ") CN=(" + getChangeNumber() + ")");
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
new file mode 100644
index 0000000..51863da
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMessage.java
@@ -0,0 +1,397 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.protocols.asn1.ASN1OctetString;
+import org.opends.server.protocols.asn1.ASN1Sequence;
+import org.opends.server.protocols.asn1.ASN1Element;
+import org.opends.server.replication.common.ChangeNumber;
+
+/**
+ * This message is part of the replication protocol.
+ * This message is sent by a server to one or several other servers and
+ * contain one entry to be sent over the protocol in the context of
+ * an import/export over the protocol.
+ */
+public class MonitorMessage extends RoutableMessage implements
+    Serializable
+{
+
+  private static final long serialVersionUID = -1900670921496804942L;
+
+  /**
+   * FIXME.
+   *
+   */
+  class ServerData
+  {
+    ServerState state;
+    Long approxFirstMissingDate;
+  }
+
+  /**
+   * FIXME.
+   *
+   */
+  class SubTopoMonitorData
+  {
+    ServerState replServerState;
+    HashMap<Short, ServerData> ldapStates =
+      new HashMap<Short, ServerData>();
+  }
+
+  SubTopoMonitorData data = new SubTopoMonitorData();;
+
+  /**
+   * Creates a new EntryMessage.
+   *
+   * @param sender The sender of this message.
+   * @param destination The destination of this message.
+   */
+  public MonitorMessage(short sender, short destination)
+  {
+    super(sender, destination);
+  }
+
+  /**
+   * FIXME.
+   * @param state a.
+   */
+  public void setReplServerState(ServerState state)
+  {
+    data.replServerState = state;
+  }
+
+  /**
+   * FIXME.
+   * @param serverId a.
+   * @param state a.
+   * @param olderUpdateTime a.
+   *
+   */
+  public void setLDAPServerState(short serverId, ServerState state,
+      Long olderUpdateTime)
+  {
+    if (data.ldapStates == null)
+    {
+      data.ldapStates = new HashMap<Short, ServerData>();
+    }
+    ServerData sd = new ServerData();
+    sd.state = state;
+    sd.approxFirstMissingDate = olderUpdateTime;
+    data.ldapStates.put(serverId, sd);
+  }
+
+  /**
+   * FIXME.
+   * @param serverId a.
+   * @return a.
+   */
+  public ServerState getLDAPServerState(short serverId)
+  {
+    return data.ldapStates.get(serverId).state;
+  }
+
+  /**
+   * FIXME.
+   * @param serverId a.
+   * @return a.
+   */
+  public Long getApproxFirstMissingDate(short serverId)
+  {
+    return data.ldapStates.get(serverId).approxFirstMissingDate;
+  }
+
+
+  /**
+   * Creates a new EntryMessage from its encoded form.
+   *
+   * @param in The byte array containing the encoded form of the message.
+   * @throws DataFormatException If the byte array does not contain a valid
+   *                             encoded form of the ServerStartMessage.
+   */
+  public MonitorMessage(byte[] in) throws DataFormatException
+  {
+    try
+    {
+      /* first byte is the type */
+      if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
+        throw new DataFormatException("input is not a valid " +
+            this.getClass().getCanonicalName());
+      int pos = 1;
+
+      // sender
+      int length = getNextLength(in, pos);
+      String senderIDString = new String(in, pos, length, "UTF-8");
+      this.senderID = Short.valueOf(senderIDString);
+      pos += length +1;
+
+      // destination
+      length = getNextLength(in, pos);
+      String destinationString = new String(in, pos, length, "UTF-8");
+      this.destination = Short.valueOf(destinationString);
+      pos += length +1;
+
+       /* Read the states : all the remaining bytes but the terminating 0 */
+      byte[] encodedS = new byte[in.length-pos-1];
+      int i =0;
+      while (pos<in.length-1)
+      {
+        encodedS[i++] = in[pos++];
+      }
+
+
+      try
+      {
+        ASN1Sequence s0 = ASN1Sequence.decodeAsSequence(encodedS);
+        for (ASN1Element el0 : s0.elements())
+        {
+          ServerState newState = new ServerState();
+          short serverId = 0;
+          Long outime = (long)0;
+          ASN1Sequence s1 = el0.decodeAsSequence();
+          for (ASN1Element el1 : s1.elements())
+          {
+            ASN1OctetString o = el1.decodeAsOctetString();
+            String s = o.stringValue();
+            ChangeNumber cn = new ChangeNumber(s);
+            if ((data.replServerState != null) && (serverId == 0))
+            {
+              serverId = cn.getServerId();
+              outime = cn.getTime();
+            }
+            else
+            {
+              newState.update(cn);
+            }
+          }
+
+          // the first state is the replication state
+          if (data.replServerState == null)
+          {
+            data.replServerState = newState;
+          }
+          else
+          {
+            ServerData sd = new ServerData();
+            sd.state = newState;
+            sd.approxFirstMissingDate = outime;
+            data.ldapStates.put(serverId, sd);
+          }
+        }
+      } catch(Exception e)
+      {
+
+      }
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes()
+  {
+    try
+    {
+      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
+      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+
+      int length = 1 + senderBytes.length +
+                   1 + destinationBytes.length;
+
+      ASN1Sequence stateElementSequence = new ASN1Sequence();
+      ArrayList<ASN1Element> stateElementList = new ArrayList<ASN1Element>();
+
+      // First loop computes the length
+      /* Put the serverStates ... */
+      stateElementSequence = new ASN1Sequence();
+      stateElementList = new ArrayList<ASN1Element>();
+
+      /* first put the Replication Server state */
+      ArrayList<ASN1OctetString> cnOctetList =
+        data.replServerState.toASN1ArrayList();
+      ArrayList<ASN1Element> cnElementList = new ArrayList<ASN1Element>();
+      for (ASN1OctetString soci : cnOctetList)
+      {
+        cnElementList.add(soci);
+      }
+      ASN1Sequence cnSequence = new ASN1Sequence(cnElementList);
+      stateElementList.add(cnSequence);
+
+      // then the LDAP server data
+      Set<Short> servers = data.ldapStates.keySet();
+      for (Short sid : servers)
+      {
+        // State
+        ServerState statei = data.ldapStates.get(sid).state;
+        // First missing date
+        Long outime =  data.ldapStates.get(sid).approxFirstMissingDate;
+
+        // retrieves the change numbers as an arrayList of ANSN1OctetString
+        cnOctetList = statei.toASN1ArrayList();
+        cnElementList = new ArrayList<ASN1Element>();
+
+        // a fake changenumber helps storing the LDAP server ID
+        // and the olderupdatetime
+        ChangeNumber cn = new ChangeNumber(outime,0,sid);
+        cnElementList.add(new ASN1OctetString(cn.toString()));
+
+        // the changenumbers
+        for (ASN1OctetString soci : cnOctetList)
+        {
+          cnElementList.add(soci);
+        }
+
+        cnSequence = new ASN1Sequence(cnElementList);
+        stateElementList.add(cnSequence);
+      }
+      stateElementSequence.setElements(stateElementList);
+      int seqLen = stateElementSequence.encode().length;
+
+      //
+      length += seqLen;
+      length += 2;
+
+      // Allocate the array sized from the computed length
+      byte[] resultByteArray = new byte[length];
+
+      // Second loop build the array
+
+      /* put the type of the operation */
+      resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
+      int pos = 1;
+
+      pos = addByteArray(senderBytes, resultByteArray, pos);
+      pos = addByteArray(destinationBytes, resultByteArray, pos);
+
+      /* Put the serverStates ... */
+      stateElementSequence = new ASN1Sequence();
+      stateElementList = new ArrayList<ASN1Element>();
+
+      /* first put the Replication Server state */
+      cnOctetList =
+        data.replServerState.toASN1ArrayList();
+      cnElementList = new ArrayList<ASN1Element>();
+      for (ASN1OctetString soci : cnOctetList)
+      {
+        cnElementList.add(soci);
+      }
+      cnSequence = new ASN1Sequence(cnElementList);
+      stateElementList.add(cnSequence);
+
+      // then the LDAP server state
+      servers = data.ldapStates.keySet();
+      for (Short sid : servers)
+      {
+        ServerState statei = data.ldapStates.get(sid).state;
+        Long outime = data.ldapStates.get(sid).approxFirstMissingDate;
+
+        // retrieves the change numbers as an arrayList of ANSN1OctetString
+        cnOctetList = statei.toASN1ArrayList();
+        cnElementList = new ArrayList<ASN1Element>();
+
+        // a fake changenumber helps storing the LDAP server ID
+        ChangeNumber cn = new ChangeNumber(outime,0,sid);
+        cnElementList.add(new ASN1OctetString(cn.toString()));
+
+        // the changenumbers
+        for (ASN1OctetString soci : cnOctetList)
+        {
+          cnElementList.add(soci);
+        }
+
+        cnSequence = new ASN1Sequence(cnElementList);
+        stateElementList.add(cnSequence);
+      }
+      stateElementSequence.setElements(stateElementList);
+      pos = addByteArray(stateElementSequence.encode(), resultByteArray, pos);
+
+      return resultByteArray;
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      return null;
+    }
+  }
+
+  /**
+   * FIXME.
+   * @return FIXME.
+   */
+  public ServerState getReplServerState()
+  {
+    return data.replServerState;
+  }
+
+  /**
+   * FIXME.
+   * @return a.
+   */
+  public Iterator<Short> iterator()
+  {
+    return data.ldapStates.keySet().iterator();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String toString()
+  {
+    String stateS = " RState:";
+    stateS += "/" + data.replServerState.toString();
+    stateS += " LDAPStates:";
+    Iterator<ServerData> it = data.ldapStates.values().iterator();
+    while (it.hasNext())
+    {
+      ServerData sd = it.next();
+      stateS += "/ state=" + sd.state.toString()
+      + " afmd=" + sd.approxFirstMissingDate + "] ";
+    }
+
+    String me = this.getClass().getCanonicalName() +
+    " sender=" + this.senderID +
+    " destination=" + this.destination +
+    " states=" + stateS +
+    "]";
+    return me;
+  }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java
new file mode 100644
index 0000000..c762d41
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMessage.java
@@ -0,0 +1,122 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.util.zip.DataFormatException;
+
+/**
+ * This message is part of the replication protocol.
+ * FIXME: usage
+ */
+public class MonitorRequestMessage extends RoutableMessage implements
+    Serializable
+{
+
+  private static final long serialVersionUID = -2407640479423633234L;
+
+  /**
+   * Creates a message.
+   *
+   * @param sender The sender server of this message.
+   * @param destination The server or servers targetted by this message.
+   */
+  public MonitorRequestMessage(short sender, short destination)
+  {
+    super(sender, destination);
+  }
+
+  /**
+   * Creates a new message by decoding the provided byte array.
+   * @param in A byte array containing the encoded information for the message,
+   * @throws DataFormatException If the in does not contain a properly,
+   *                             encoded message.
+   */
+  public MonitorRequestMessage(byte[] in) throws DataFormatException
+  {
+    super();
+    try
+    {
+      // First byte is the type
+      if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
+        throw new DataFormatException("input is not a valid " +
+            this.getClass().getCanonicalName());
+      int pos = 1;
+
+      // sender
+      int length = getNextLength(in, pos);
+      String senderString = new String(in, pos, length, "UTF-8");
+      this.senderID = Short.valueOf(senderString);
+      pos += length +1;
+
+      // destination
+      length = getNextLength(in, pos);
+      String destinationString = new String(in, pos, length, "UTF-8");
+      this.destination = Short.valueOf(destinationString);
+      pos += length +1;
+
+    } catch (UnsupportedEncodingException e)
+    {
+      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes()
+  {
+    try
+    {
+      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
+      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
+
+      int length = 1 + senderBytes.length + 1
+                     + destinationBytes.length + 1;
+
+      byte[] resultByteArray = new byte[length];
+
+      /* put the type of the operation */
+      resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR_REQUEST;
+      int pos = 1;
+
+      /* put the sender */
+      pos = addByteArray(senderBytes, resultByteArray, pos);
+
+      /* put the destination */
+      pos = addByteArray(destinationBytes, resultByteArray, pos);
+
+      return resultByteArray;
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      return null;
+    }
+  }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
index 22215c6..6109d02 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplServerInfoMessage.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2007-2008 Sun Microsystems, Inc.
  */
 
 package org.opends.server.replication.protocol;
@@ -67,7 +67,7 @@
       /* first byte is the type */
       if (in.length < 1 || in[0] != MSG_TYPE_REPL_SERVER_INFO)
         throw new DataFormatException(
-        "Input is not a valid changelogInfo Message.");
+        "Input is not a valid " + this.getClass().getCanonicalName());
 
       int pos = 1;
 
@@ -97,7 +97,7 @@
 
 
   /**
-   * Creates a new changelogInfo message from a list of the currently
+   * Creates a new ReplServerInfo message from a list of the currently
    * connected servers.
    *
    * @param connectedServers The list of currently connected servers ID.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
index ba72bf7..9ec9f41 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -55,6 +55,8 @@
   static final byte MSG_TYPE_WINDOW_PROBE = 15;
   static final byte MSG_TYPE_REPL_SERVER_INFO = 16;
   static final byte MSG_TYPE_RESET_GENERATION_ID = 17;
+  static final byte MSG_TYPE_REPL_SERVER_MONITOR_REQUEST = 18;
+  static final byte MSG_TYPE_REPL_SERVER_MONITOR = 19;
 
   // Adding a new type of message here probably requires to
   // change accordingly generateMsg method below
@@ -79,6 +81,8 @@
    * MSG_TYPE_WINDOW_PROBE
    * MSG_TYPE_REPL_SERVER_INFO
    * MSG_TYPE_RESET_GENERATION_ID
+   * MSG_TYPE_REPL_SERVER_MONITOR_REQUEST
+   * MSG_TYPE_REPL_SERVER_MONITOR
    *
    * @return the byte[] representation of this message.
    * @throws UnsupportedEncodingException  When the encoding of the message
@@ -152,6 +156,12 @@
       case MSG_TYPE_REPL_SERVER_INFO:
         msg = new ReplServerInfoMessage(buffer);
       break;
+      case MSG_TYPE_REPL_SERVER_MONITOR_REQUEST:
+        msg = new MonitorRequestMessage(buffer);
+      break;
+      case MSG_TYPE_REPL_SERVER_MONITOR:
+        msg = new MonitorMessage(buffer);
+      break;
       default:
         throw new DataFormatException("received message with unknown type");
     }
@@ -192,7 +202,7 @@
     while (in[offset++] != 0)
     {
       if (offset >= in.length)
-        throw new DataFormatException("byte[] is not a valid modify msg");
+        throw new DataFormatException("byte[] is not a valid msg");
       length++;
     }
     return length;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
new file mode 100644
index 0000000..63d6616
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -0,0 +1,270 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.server;
+
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.LinkedHashSet;
+
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.config.ConfigException;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.DN;
+import org.opends.server.types.InitializationException;
+
+/**
+ * This class defines a server handler dedicated to the remote LDAP servers
+ * connected to a remote Replication Server.
+ * This class is necessary because we want to provide monitor entries for them
+ * and because the monitor API only allows one entry by MonitorProvider instance
+ * so that no other class can provide the monitor entry for these objects.
+ *
+ * One instance of this class is created for each instance of remote LDAP server
+ * connected to a remote Replication Server.
+ */
+public class LightweightServerHandler
+  extends MonitorProvider<MonitorProviderCfg>
+{
+  // The tracer object for the debug logger.
+  private static final DebugTracer TRACER = getTracer();
+
+  short serverId;
+  ServerHandler replServerHandler;
+  ReplicationServerDomain rsDomain;
+  DN baseDn;
+
+  /**
+   * Creates a new LighweightServerHandler with the provided serverid, connected
+   * to the remote Replication Server represented by replServerHandler.
+   *
+   * @param serverId The serverId of this remote LDAP server.
+   * @param replServerHandler The server handler of the Replication Server to
+   * which this LDAP server is remotely connected.
+   */
+  public LightweightServerHandler(String serverId,
+      ServerHandler replServerHandler)
+  {
+    super("Server Handler");
+    this.serverId = Short.valueOf(serverId);
+    this.replServerHandler = replServerHandler;
+    this.rsDomain = replServerHandler.getDomain();
+    this.baseDn = rsDomain.getBaseDn();
+
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "In " +
+  replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName()+
+        " LWSH for remote server " + this.serverId +
+        " connected to:" + this.replServerHandler.getMonitorInstanceName() +
+        " ()");
+}
+
+  /**
+   * Get the serverID associated with this LDAP server.
+   * @return The serverId.
+   */
+  public short getServerId()
+  {
+    return Short.valueOf(serverId);
+  }
+
+  /**
+   * Stop this server handler processing.
+   */
+  public void startHandler()
+  {
+    if (debugEnabled())
+      TRACER.debugInfo(
+      "In " +
+replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
+      " LWSH for remote server " + this.serverId +
+      " connected to:" + this.replServerHandler.getMonitorInstanceName() +
+          " start");
+    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+    DirectoryServer.registerMonitorProvider(this);
+
+  }
+
+  /**
+   * Stop this server handler processing.
+   */
+  public void stopHandler()
+  {
+    if (debugEnabled())
+      TRACER.debugInfo(
+      "In " +
+replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() +
+      " LWSH for remote server " + this.serverId +
+      " connected to:" + this.replServerHandler.getMonitorInstanceName() +
+          " stop");
+    DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void initializeMonitorProvider(MonitorProviderCfg configuration)
+                          throws ConfigException,InitializationException
+  {
+    // Nothing to do for now
+  }
+
+  /**
+   * Retrieves the name of this monitor provider.  It should be unique among all
+   * monitor providers, including all instances of the same monitor provider.
+   *
+   * @return  The name of this monitor provider.
+   */
+  @Override
+  public String getMonitorInstanceName()
+  {
+    String serverURL=""; // FIXME
+    String str = baseDn.toString() + " " + serverURL + " "
+       + String.valueOf(serverId);
+    return "Undirect LDAP Server " + str;
+  }
+
+  /**
+   * Retrieves the length of time in milliseconds that should elapse between
+   * calls to the <CODE>updateMonitorData()</CODE> method.  A negative or zero
+   * return value indicates that the <CODE>updateMonitorData()</CODE> method
+   * should not be periodically invoked.
+   *
+   * @return  The length of time in milliseconds that should elapse between
+   *          calls to the <CODE>updateMonitorData()</CODE> method.
+   */
+  @Override
+  public long getUpdateInterval()
+  {
+    /* we don't wont to do polling on this monitor */
+    return 0;
+  }
+
+  /**
+   * Performs any processing periodic processing that may be desired to update
+   * the information associated with this monitor.  Note that best-effort
+   * attempts will be made to ensure that calls to this method come
+   * <CODE>getUpdateInterval()</CODE> milliseconds apart, but no guarantees will
+   * be made.
+   */
+  @Override
+  public void updateMonitorData()
+  {
+    // As long as getUpdateInterval() returns 0, this will never get called
+
+  }
+
+  /**
+   * Retrieves a set of attributes containing monitor data that should be
+   * returned to the client if the corresponding monitor entry is requested.
+   *
+   * @return  A set of attributes containing monitor data that should be
+   *          returned to the client if the corresponding monitor entry is
+   *          requested.
+   */
+  @Override
+  public ArrayList<Attribute> getMonitorData()
+  {
+    if (debugEnabled())
+      TRACER.debugInfo(
+          "In " +
+          this.replServerHandler.getDomain().getReplicationServer().
+          getMonitorInstanceName()+
+          " LWSH for remote server " + this.serverId +
+          " connected to:" + this.replServerHandler.getMonitorInstanceName() +
+      " getMonitor data");
+
+
+    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
+
+    attributes.add(new Attribute("server-id",
+        String.valueOf(serverId)));
+    attributes.add(new Attribute("base-dn",
+        rsDomain.getBaseDn().toNormalizedString()));
+    attributes.add(new Attribute("connected-to",
+        replServerHandler.getMonitorInstanceName()));
+
+    // Retrieves the topology counters
+    try
+    {
+      rsDomain.retrievesRemoteMonitorData();
+
+      // Compute the latency for the current SH
+      ServerState remoteState = rsDomain.getServerState(serverId);
+      if (remoteState == null)
+      {
+        remoteState = new ServerState();
+      }
+
+      /* get the Server State */
+      final String ATTR_SERVER_STATE = "server-state";
+      AttributeType type =
+        DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE);
+      LinkedHashSet<AttributeValue> values =
+        new LinkedHashSet<AttributeValue>();
+      for (String str : remoteState.toStringSet())
+      {
+        values.add(new AttributeValue(type,str));
+      }
+      Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
+      attributes.add(attr);
+
+      // add the latency attribute to our monitor data
+      // Compute the latency for the current SH
+      int missingChanges = rsDomain.getMissingChanges(remoteState);
+      attributes.add(new Attribute("missing-changes",
+          String.valueOf(missingChanges)));
+
+      // Add the oldest missing update
+      Long olderUpdateTime = rsDomain.getApproxFirstMissingDate(serverId);
+      if (olderUpdateTime != null)
+      {
+        Date date = new Date(olderUpdateTime);
+        attributes.add(new Attribute("approx-older-change-not-synchronized",
+          date.toString()));
+      }
+    }
+    catch(Exception e)
+    {
+      // We failed retrieving the remote monitor data.
+      attributes.add(new Attribute("error",
+        stackTraceToSingleLineString(e)));
+    }
+    return attributes;
+  }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 7eadd3e..6382ca3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,25 +22,31 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 
 import static org.opends.server.loggers.debug.DebugLogger.*;
+
 import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.messages.ToolMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.Iterator;
 
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
@@ -49,9 +55,13 @@
 import org.opends.server.replication.protocol.RoutableMessage;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.protocol.ReplServerInfoMessage;
+import org.opends.server.replication.protocol.MonitorMessage;
+import org.opends.server.replication.protocol.MonitorRequestMessage;
 import org.opends.server.replication.protocol.ResetGenerationId;
 import org.opends.server.types.DN;
-
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.ResultCode;
+import org.opends.server.util.TimeThread;
 import com.sleepycat.je.DatabaseException;
 
 /**
@@ -118,6 +128,34 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
+  /* Monitor data management */
+
+  // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
+  private long remoteMonitorDataLifeTime = 500;
+
+  /* Search op on monitor data is processed by a worker thread.
+   * Requests are sent to the other RS,and responses are received by the
+   * listener threads.
+   * The worker thread is awoke on this semaphore, or on timeout.
+   */
+  Semaphore remoteMonitorResponsesSemaphore;
+
+  /* The date of the last time they have been elaborated */
+  private long validityDate = 0;
+
+  // For each LDAP server, its server state
+  private HashMap<Short, ServerState> LDAPStates =
+    new HashMap<Short, ServerState>();
+
+  // For each LDAP server, the last CN it published
+  private HashMap<Short, ChangeNumber> maxCNs =
+    new HashMap<Short, ChangeNumber>();
+
+  // For each LDAP server, an approximation of the date of the first missing
+  // change
+  private HashMap<Short, Long> approxFirstMissingDate =
+    new HashMap<Short, Long>();
+
   /**
    * Creates a new ReplicationServerDomain associated to the DN baseDn.
    *
@@ -352,7 +390,7 @@
         }
         else
         {
-          if (!rsh.getRemoteLDAPServers().isEmpty())
+          if (rsh.hasRemoteLDAPServers())
           {
             lDAPServersConnectedInTheTopology = true;
 
@@ -636,7 +674,7 @@
         // server connected
         for (ServerHandler rsh : replicationServers.values())
         {
-          if (!rsh.getRemoteLDAPServers().isEmpty())
+          if (rsh.hasRemoteLDAPServers())
           {
             servers.add(rsh);
           }
@@ -693,15 +731,58 @@
    */
   public void process(RoutableMessage msg, ServerHandler senderHandler)
   {
-    // A replication server is not expected to be the destination
-    // of a routable message except for an error message.
+    // Test the message for which a ReplicationServer is expected
+    // to be the destination
     if (msg.getDestination() == this.replicationServer.getServerId())
     {
       if (msg instanceof ErrorMessage)
       {
         ErrorMessage errorMsg = (ErrorMessage)msg;
         logError(ERR_ERROR_MSG_RECEIVED.get(
-                   errorMsg.getDetails()));
+            errorMsg.getDetails()));
+      }
+      else if (msg instanceof MonitorRequestMessage)
+      {
+        MonitorRequestMessage replServerMonitorRequestMsg =
+          (MonitorRequestMessage) msg;
+
+        MonitorMessage monitorMsg =
+          new MonitorMessage(
+              replServerMonitorRequestMsg.getDestination(),
+              replServerMonitorRequestMsg.getsenderID());
+
+        // Populate the RS state in the msg from the DbState
+        monitorMsg.setReplServerState(this.getDbServerState());
+
+        // Populate for each connected LDAP Server
+        // from the states stored in the serverHandler.
+        // - the server state
+        // - the older missing change
+        for (ServerHandler lsh : this.connectedServers.values())
+        {
+          monitorMsg.setLDAPServerState(
+              lsh.getServerId(),
+              lsh.getServerState(),
+              lsh.getApproxFirstMissingDate());
+        }
+        try
+        {
+          senderHandler.send(monitorMsg);
+        }
+        catch(Exception e)
+        {
+          // We log the error. The requestor will detect a timeout or
+          // any other failure on the connection.
+          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
+              Short.toString((msg.getDestination()))));
+        }
+      }
+      else if (msg instanceof MonitorMessage)
+      {
+        MonitorMessage monitorMsg =
+          (MonitorMessage) msg;
+
+        receivesMonitorDataResponse(monitorMsg);
       }
       else
       {
@@ -1156,4 +1237,288 @@
     {
       return replicationServer;
     }
+
+    /*
+     * Monitor Data generation
+     */
+
+    /**
+     * Retrieves the remote monitor data.
+     *
+     * @throws DirectoryException When an error occurs.
+     */
+    protected void retrievesRemoteMonitorData()
+      throws DirectoryException
+    {
+      if (validityDate > TimeThread.getTime())
+      {
+        // The current data are still valid. No need to renew them.
+        return;
+      }
+
+      // Clean
+      this.LDAPStates.clear();
+      this.maxCNs.clear();
+
+      // Init the maxCNs of our direct LDAP servers from our own dbstate
+      for (ServerHandler rs : connectedServers.values())
+      {
+        short serverID = rs.getServerId();
+        ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
+        if (cn == null)
+        {
+          // we have nothing in db for that server
+          cn = new ChangeNumber(0, 0 , serverID);
+        }
+        this.maxCNs.put(serverID, cn);
+      }
+
+      ServerState replServerState = this.getDbServerState();
+      Iterator<Short> it = replServerState.iterator();
+      while (it.hasNext())
+      {
+        short sid = it.next();
+        ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
+        ChangeNumber maxCN = this.maxCNs.get(sid);
+        if ((maxCN != null) && (receivedCN.newer(maxCN)))
+        {
+          // We found a newer one
+          this.maxCNs.remove(sid);
+          this.maxCNs.put(sid, receivedCN);
+        }
+      }
+
+      // Send Request to the other Replication Servers
+      if (remoteMonitorResponsesSemaphore == null)
+      {
+        remoteMonitorResponsesSemaphore = new Semaphore(
+            replicationServers.size() -1);
+
+        sendMonitorDataRequest();
+
+        // Wait reponses from them or timeout
+        waitMonitorDataResponses(replicationServers.size());
+      }
+      else
+      {
+        // The processing of renewing the monitor cache is already running
+        // We'll make it sleeping until the end
+        while (remoteMonitorResponsesSemaphore!=null)
+        {
+          waitMonitorDataResponses(1);
+        }
+      }
+
+      // Now we have the expected answers of an error occured
+      validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
+
+      if (debugEnabled())
+      {
+        debugMonitorData();
+      }
+    }
+
+    private void debugMonitorData()
+    {
+      String mds = " Monitor data=";
+      Iterator<Short> ite = LDAPStates.keySet().iterator();
+      while (ite.hasNext())
+      {
+        Short sid = ite.next();
+        ServerState ss = LDAPStates.get(sid);
+        mds += " LDAPState(" + sid + ")=" + ss.toString();
+      }
+      Iterator<Short> itc = maxCNs.keySet().iterator();
+      while (itc.hasNext())
+      {
+        Short sid = itc.next();
+        ChangeNumber cn = maxCNs.get(sid);
+        mds += " maxCNs(" + sid + ")=" + cn.toString();
+      }
+
+      mds += "--";
+      TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDN=" + baseDn +
+          mds);
+    }
+
+    /**
+     * Sends a MonitorRequest message to all connected RS.
+     * @throws DirectoryException when a problem occurs.
+     */
+    protected void sendMonitorDataRequest()
+      throws DirectoryException
+    {
+      try
+      {
+        for (ServerHandler rs : replicationServers.values())
+        {
+          MonitorRequestMessage msg = new
+            MonitorRequestMessage(this.replicationServer.getServerId(),
+              rs.getServerId());
+          rs.send(msg);
+        }
+      }
+      catch(Exception e)
+      {
+        Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
+        logError(message);
+        throw new DirectoryException(ResultCode.OTHER,
+            message, e);
+      }
+    }
+
+    /**
+     * Wait for the expected count of received MonitorMessage.
+     * @param expectedResponses The number of expected answers.
+     * @throws DirectoryException When an error occurs.
+     */
+    protected void waitMonitorDataResponses(int expectedResponses)
+      throws DirectoryException
+    {
+      try
+      {
+        boolean allPermitsAcquired =
+          remoteMonitorResponsesSemaphore.tryAcquire(
+              expectedResponses,
+              (long) 500, TimeUnit.MILLISECONDS);
+
+        if (!allPermitsAcquired)
+        {
+          logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
+        }
+        else
+        {
+          if (debugEnabled())
+            TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            "Successfully received all " + replicationServers.size()
+            + " expected monitor messages");
+        }
+      }
+      catch(Exception e)
+      {
+        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
+      }
+      finally
+      {
+        remoteMonitorResponsesSemaphore = null;
+      }
+    }
+
+    /**
+     * Processes a Monitor message receives from a remote Replication Server
+     * and stores the data received.
+     *
+     * @param msg The message to be processed.
+     */
+    public void receivesMonitorDataResponse(MonitorMessage msg)
+    {
+      if (remoteMonitorResponsesSemaphore == null)
+      {
+        // Ignoring the remote monitor data because an error occured previously
+        return;
+      }
+
+      try
+      {
+        // Here is the RS state : list <serverID, lastChangeNumber>
+        // For each LDAP Server, we keep the max CN accross the RSes
+        ServerState replServerState = msg.getReplServerState();
+        Iterator<Short> it = replServerState.iterator();
+        while (it.hasNext())
+        {
+          short sid = it.next();
+          ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
+          ChangeNumber maxCN = this.maxCNs.get(sid);
+          if (receivedCN.newer(maxCN))
+          {
+            // We found a newer one
+            this.maxCNs.remove(sid);
+            this.maxCNs.put(sid, receivedCN);
+          }
+        }
+
+        // Store the LDAP servers states
+        Iterator<Short> sidIterator = msg.iterator();
+        while (sidIterator.hasNext())
+        {
+          short sid = sidIterator.next();
+          ServerState ss = msg.getLDAPServerState(sid);
+          this.LDAPStates.put(sid, ss);
+          this.approxFirstMissingDate.put(sid,
+              msg.getApproxFirstMissingDate(sid));
+        }
+
+        // Decreases the number of expected responses and potentially
+        // wakes up the waiting requestor thread.
+        remoteMonitorResponsesSemaphore.release();
+      }
+      catch (Exception e)
+      {
+        // If an exception occurs while processing one of the expected message,
+        // the processing is aborted and the waiting thread is awoke.
+        remoteMonitorResponsesSemaphore.notifyAll();
+      }
+    }
+
+    /**
+     * Get the state of the LDAP server with the provided serverId.
+     * @param serverId The server ID.
+     * @return The server state.
+     */
+    public ServerState getServerState(short serverId)
+    {
+      return LDAPStates.get(serverId);
+    }
+
+    /**
+     * Get the highest know change number of the LDAP server with the provided
+     * serverId.
+     * @param serverId The server ID.
+     * @return The highest change number.
+     */
+    public ChangeNumber getMaxCN(short serverId)
+    {
+      return maxCNs.get(serverId);
+    }
+
+    /**
+     * Get an approximation of the date of the oldest missing changes.
+     * serverId.
+     * @param serverId The server ID.
+     * @return The approximation of the date of the oldest missing change.
+     */
+    public Long getApproxFirstMissingDate(short serverId)
+    {
+      return approxFirstMissingDate.get(serverId);
+    }
+
+    /**
+     * Get the number of missing change for the server with the provided state.
+     * @param state The provided server state.
+     * @return The number of missing changes.
+     */
+    public int getMissingChanges(ServerState state)
+    {
+      // Traverse the max Cn transmitted by each server
+      // For each server, get the highest CN know from the current server
+      // Sum the difference betwenn the max and the last
+      int missingChanges = 0;
+      Iterator<Short> itc = maxCNs.keySet().iterator();
+      while (itc.hasNext())
+      {
+        Short sid = itc.next();
+        ChangeNumber maxCN = maxCNs.get(sid);
+        ChangeNumber last = state.getMaxChangeNumber(sid);
+        if (last == null)
+        {
+          last = new ChangeNumber(0,0, sid);
+        }
+        int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
+        missingChanges += missingChangesFromSID;
+      }
+      return missingChanges;
+    }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 2c4e3ba..cd499e3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -22,33 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
- */
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License").  You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- *      Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 
@@ -62,9 +36,9 @@
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
+import java.util.Date;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -150,11 +124,12 @@
 
 
   /**
-   * When this Handler is connected to a changelog server this collection
-   * will contain the list of LDAP servers connected to the remote changelog
-   * server.
+   * When this Handler is connected to a remote replication server
+   * this collection will contain as many elements as there are
+   * LDAP servers connected to the remote replication server.
    */
-  private List<String> remoteLDAPservers = new ArrayList<String>();
+  private List<LightweightServerHandler>
+     remoteLDAPservers = new ArrayList<LightweightServerHandler>();
 
   /**
    * The time in milliseconds between heartbeats from the replication
@@ -830,27 +805,8 @@
        ServerState dbState = replicationServerDomain.getDbServerState();
        for (short id : dbState)
        {
-         int max = dbState.getMaxChangeNumber(id).getSeqnum();
-         ChangeNumber currentChange = serverState.getMaxChangeNumber(id);
-         if (currentChange != null)
-         {
-           int current = currentChange.getSeqnum();
-           if (current == max)
-           {
-           }
-           else if (current < max)
-           {
-             totalCount += max - current;
-           }
-           else
-           {
-             totalCount += Integer.MAX_VALUE - (current - max) + 1;
-           }
-         }
-         else
-         {
-           totalCount += max;
-         }
+         totalCount = ChangeNumber.diffSeqNum(dbState.getMaxChangeNumber(id),
+             serverState.getMaxChangeNumber(id));
        }
        return totalCount;
      }
@@ -858,7 +814,7 @@
   }
 
   /**
-   * Get an approximation of the delay by looking at the age of the odest
+   * Get an approximation of the delay by looking at the age of the oldest
    * message that has not been sent to this server.
    * This is an approximation because the age is calculated using the
    * clock of the servee where the replicationServer is currently running
@@ -886,25 +842,65 @@
    * @return The age if the older change has not yet been replicated
    *         to the server handled by this ServerHandler.
    */
+  public Long getApproxFirstMissingDate()
+  {
+    // Get the older CN received
+    // From it, get the next sequence number
+    // Get the CN for the next sequence number
+    // If not present in the local RS db,
+    // then approximate with the older update time
+    ChangeNumber olderUpdateCN = getOlderUpdateCN();
+    if (olderUpdateCN == null)
+      return null;
+
+    ReplicationIterator ri =
+      replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN);
+    if (ri != null)
+    {
+      if (ri.next())
+      {
+        ChangeNumber firstMissingChange = ri.getChange().getChangeNumber();
+        return firstMissingChange.getTime();
+      }
+    }
+    return olderUpdateCN.getTime();
+  }
+
+  /**
+   * Get the older update time for that server.
+   * @return The older update time.
+   */
   public long getOlderUpdateTime()
   {
+    ChangeNumber olderUpdateCN = getOlderUpdateCN();
+    if (olderUpdateCN == null)
+      return 0;
+    return  olderUpdateCN.getTime();
+  }
+
+  /**
+   * Get the older Change Number for that server.
+   * @return The older change number.
+   */
+  public ChangeNumber getOlderUpdateCN()
+  {
     synchronized (msgQueue)
     {
       if (isFollowing())
       {
         if (msgQueue.isEmpty())
-          return 0;
+          return null;
 
         UpdateMessage msg = msgQueue.first();
-        return msg.getChangeNumber().getTime();
+        return msg.getChangeNumber();
       }
       else
       {
         if (lateQueue.isEmpty())
-          return 0;
+          return null;
 
         UpdateMessage msg = lateQueue.first();
-        return msg.getChangeNumber().getTime();
+        return msg.getChangeNumber();
       }
     }
   }
@@ -1190,6 +1186,16 @@
   }
 
   /**
+   * Get the state of this server.
+   *
+   * @return ServerState the state for this server..
+   */
+  public ServerState getServerState()
+  {
+    return serverState;
+  }
+
+  /**
    * Stop this server handler processing.
    */
   public void stopHandler()
@@ -1397,7 +1403,7 @@
                  " " + serverURL + " " + String.valueOf(serverId);
 
     if (serverIsLDAPserver)
-      return "Remote LDAP Server " + str;
+      return "Direct LDAP Server " + str;
     else
       return "Remote Repl Server " + str;
   }
@@ -1445,28 +1451,68 @@
   {
     ArrayList<Attribute> attributes = new ArrayList<Attribute>();
     if (serverIsLDAPserver)
+    {
       attributes.add(new Attribute("LDAP-Server", serverURL));
+      attributes.add(new Attribute("connected-to", this.replicationServerDomain.
+          getReplicationServer().getMonitorInstanceName()));
+
+      // Add the oldest missing update
+      Long olderUpdateTime = this.getApproxFirstMissingDate();
+      if (olderUpdateTime != null)
+      {
+        Date date = new Date(olderUpdateTime);
+        attributes.add(new Attribute("approx-older-change-not-synchronized",
+          date.toString()));
+      }
+    }
     else
+    {
       attributes.add(new Attribute("ReplicationServer-Server", serverURL));
+    }
     attributes.add(new Attribute("server-id",
                                  String.valueOf(serverId)));
     attributes.add(new Attribute("base-dn",
                                  baseDn.toString()));
-    attributes.add(new Attribute("waiting-changes",
-                                 String.valueOf(getRcvMsgQueueSize())));
-    attributes.add(new Attribute("max-waiting-changes",
-                                 String.valueOf(maxQueueSize)));
-    attributes.add(new Attribute("update-waiting-acks",
-                                 String.valueOf(getWaitingAckSize())));
+
+    // Update stats
+
+    // Retrieves the topology counters
+    if (serverIsLDAPserver)
+    {
+      try
+      {
+        replicationServerDomain.retrievesRemoteMonitorData();
+      }
+      catch(Exception e)
+      {
+        // FIXME: We failed retrieving the remote monitor data
+      }
+
+      // Compute the latency for the current SH
+      int missingChanges =
+        replicationServerDomain.getMissingChanges(serverState);
+
+      // add the latency attribute to our monitor data
+      attributes.add(new Attribute("missing-changes",
+          String.valueOf(missingChanges)));
+    }
+
+    // Deprecated
+    // attributes.add(new Attribute("max-waiting-changes",
+    //                              String.valueOf(maxQueueSize)));
     attributes.add(new Attribute("update-sent",
                                  String.valueOf(getOutCount())));
     attributes.add(new Attribute("update-received",
                                  String.valueOf(getInCount())));
+
+    // Deprecated as long as assured is not exposed
+    attributes.add(new Attribute("update-waiting-acks",
+        String.valueOf(getWaitingAckSize())));
     attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount())));
     attributes.add(new Attribute("ack-received",
                                  String.valueOf(getInAckCount())));
-    attributes.add(new Attribute("approximate-delay",
-                                 String.valueOf(getApproxDelay())));
+
+    // Window stats
     attributes.add(new Attribute("max-send-window",
                                  String.valueOf(sendWindowSize)));
     attributes.add(new Attribute("current-send-window",
@@ -1475,6 +1521,18 @@
                                  String.valueOf(maxRcvWindow)));
     attributes.add(new Attribute("current-rcv-window",
                                  String.valueOf(rcvWindow)));
+
+    /*
+     * FIXME:PGB DEPRECATED
+     *
+    // Missing changes
+    attributes.add(new Attribute("waiting-changes",
+        String.valueOf(getRcvMsgQueueSize())));
+    // Age of oldest missing change
+    attributes.add(new Attribute("approximate-delay",
+                                 String.valueOf(getApproxDelay())));
+
+    // Date of the oldest missing change
     long olderUpdateTime = getOlderUpdateTime();
     if (olderUpdateTime != 0)
     {
@@ -1482,6 +1540,7 @@
       attributes.add(new Attribute("older-change-not-synchronized",
                                  String.valueOf(date.toString())));
     }
+    */
 
     /* get the Server State */
     final String ATTR_SERVER_STATE = "server-state";
@@ -1495,9 +1554,11 @@
     Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values);
     attributes.add(attr);
 
+    // Encryption
     attributes.add(new Attribute("ssl-encryption",
         String.valueOf(session.isEncrypted())));
 
+    // Data generation
     attributes.add(new Attribute("generation-id",
         String.valueOf(generationId)));
 
@@ -1663,8 +1724,28 @@
            getMonitorInstanceName() +
            " SH for remote server " + this.getMonitorInstanceName() +
            " sets replServerInfo " + "<" + infoMsg + ">");
-     remoteLDAPservers = infoMsg.getConnectedServers();
+
+     List<String> newRemoteLDAPservers = infoMsg.getConnectedServers();
      generationId = infoMsg.getGenerationId();
+
+     synchronized(remoteLDAPservers)
+     {
+       // Removes the existing structures
+       for (LightweightServerHandler lsh : remoteLDAPservers)
+       {
+         lsh.stopHandler();
+       }
+       remoteLDAPservers.clear();
+
+       // Creates the new structure according to the message received.
+       for (String newConnectedServer : newRemoteLDAPservers)
+       {
+         LightweightServerHandler lsh
+         = new LightweightServerHandler(newConnectedServer, this);
+         lsh.startHandler();
+         remoteLDAPservers.add(lsh);
+       }
+     }
    }
 
    /**
@@ -1678,9 +1759,9 @@
     */
    public boolean isRemoteLDAPServer(short wantedServer)
    {
-     for (String server : remoteLDAPservers)
+     for (LightweightServerHandler server : remoteLDAPservers)
      {
-       if (wantedServer == Short.valueOf(server))
+       if (wantedServer == server.getServerId())
        {
          return true;
        }
@@ -1695,9 +1776,9 @@
     * @return boolean True is the replication server has remote LDAP servers
     * connected to it.
     */
-   public List<String> getRemoteLDAPServers()
+   public boolean hasRemoteLDAPServers()
    {
-     return remoteLDAPservers;
+     return !remoteLDAPservers.isEmpty();
    }
 
   /**
@@ -1802,4 +1883,14 @@
   {
     this.generationId = generationId;
   }
+
+  /**
+   * Returns the Replication Server Domain to which belongs this server handler.
+   *
+   * @return The replication server domain.
+   */
+  public ReplicationServerDomain getDomain()
+  {
+    return this.replicationServerDomain;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index b1071ff..9193c99 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.Message;
@@ -49,6 +49,8 @@
 import org.opends.server.replication.protocol.WindowMessage;
 import org.opends.server.replication.protocol.WindowProbe;
 import org.opends.server.replication.protocol.ReplServerInfoMessage;
+import org.opends.server.replication.protocol.MonitorMessage;
+import org.opends.server.replication.protocol.MonitorRequestMessage;
 import org.opends.server.loggers.debug.DebugTracer;
 
 
@@ -246,6 +248,17 @@
             }
           }
         }
+        else if (msg instanceof MonitorRequestMessage)
+        {
+          MonitorRequestMessage replServerMonitorRequestMsg =
+            (MonitorRequestMessage) msg;
+          handler.process(replServerMonitorRequestMsg);
+        }
+        else if (msg instanceof MonitorMessage)
+        {
+          MonitorMessage replServerMonitorMsg = (MonitorMessage) msg;
+          handler.process(replServerMonitorMsg);
+        }
         else if (msg == null)
         {
           /*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index fca9f9f..698ce5e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.Message;
@@ -126,7 +126,7 @@
             "In " + replicationServerDomain.getReplicationServer().
               getMonitorInstanceName() +
             ", writer to " + this.handler.getMonitorInstanceName() +
-            " publishes msg=" + update.toString() +
+            " publishes msg=[" + update.toString() + "]"+
             " refgenId=" + referenceGenerationId +
             " server=" + handler.getServerId() +
             " generationId=" + handler.getGenerationId());
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index c0b615b..e91164d 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication;
 
@@ -917,6 +917,11 @@
           TRACER.debugInfo("Failed to add entry " + entry.getDN() +
               "Result code = : " + addOp.getResultCode());
         }
+        else
+        {
+          TRACER.debugInfo(entry.getDN() +
+              " added " + addOp.getResultCode());          
+        }
         // They will be removed at the end of the test
         entryList.addLast(entry.getDN());
       }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
index fdc1bf7..2376e20 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ChangeNumberTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.common;
 
@@ -303,4 +303,29 @@
     CN2 = cng.newChangeNumber();
     assertTrue(CN1.compareTo(CN2) != 0 );
   }
+
+  /**
+   * Test the difference in seq num between 2 change numbers.
+   */
+  @Test
+  public void changeNumberDiffSeqNum()
+         throws Exception
+  {
+    ChangeNumber CN1;
+    ChangeNumber CN2;
+
+    CN1 = new ChangeNumber((long)0, 3, (short)0);
+
+    // 3-1 = 2
+    CN2 = new ChangeNumber((long)0, 1, (short)0);
+    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 2);
+    
+    // 3-3 = 0
+    CN2 = new ChangeNumber((long)0, 3, (short)0);
+    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), 0);
+
+    // 3-4 == MAXINT (modulo)
+    CN2 = new ChangeNumber((long)0, 4, (short)0);
+    assertEquals(ChangeNumber.diffSeqNum(CN1, CN2), Integer.MAX_VALUE);
+  }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 5d09572..592cc29 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.protocol;
 
@@ -30,7 +30,9 @@
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
+import java.util.Iterator;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
@@ -560,7 +562,7 @@
    * an exception.
    */
   @Test()
-  public void WindowProbeTest() throws Exception
+  public void windowProbeTest() throws Exception
   {
     WindowProbe msg = new WindowProbe();
     new WindowProbe(msg.getBytes());
@@ -583,6 +585,74 @@
   }
 
   /**
+   * Test MonitorMessage
+   */
+  @Test()
+  public void monitorMessageTest() throws Exception
+  {
+    short sender = 2;
+    short dest = 3;
+
+    // RS State
+    ServerState rsState = new ServerState();
+    ChangeNumber rscn1 = new ChangeNumber(1, (short) 1, (short) 1);
+    ChangeNumber rscn2 = new ChangeNumber(1, (short) 1, (short) 2);
+    rsState.update(rscn1);
+    rsState.update(rscn2);
+
+    // LS1 state
+    ServerState s1 = new ServerState();
+    short sid1 = 111;
+    ChangeNumber cn1 = new ChangeNumber(1, (short) 1, sid1);
+    s1.update(cn1);
+
+    // LS2 state
+    ServerState s2 = new ServerState();
+    short sid2 = 222;
+    Long now = TimeThread.getTime();
+    ChangeNumber cn2 = new ChangeNumber(now,
+                                       (short) 123, sid2);
+    s2.update(cn2);
+
+    MonitorMessage msg =
+      new MonitorMessage(sender, dest);
+    msg.setReplServerState(rsState);
+    msg.setLDAPServerState(sid1, s1, now+1);
+    msg.setLDAPServerState(sid2, s2, now+2);
+    
+    byte[] b = msg.getBytes();
+    MonitorMessage newMsg = new MonitorMessage(b);
+
+    assertEquals(rsState, msg.getReplServerState());
+    assertEquals(newMsg.getReplServerState().toString(), 
+        msg.getReplServerState().toString());
+    
+    Iterator<Short> it = newMsg.iterator();
+    while (it.hasNext())
+    {
+      short sid = it.next();
+      ServerState s = newMsg.getLDAPServerState(sid);
+      if (sid == sid1)
+      {
+        assertEquals(s.toString(), s1.toString(), "");
+        assertEquals((Long)(now+1), newMsg.getApproxFirstMissingDate(sid), "");
+      }
+      else if (sid == sid2)
+      {
+        assertEquals(s.toString(), s2.toString());        
+        assertEquals((Long)(now+2), newMsg.getApproxFirstMissingDate(sid), "");
+      }
+      else
+      {
+        fail("Bad sid");
+      }
+    }
+
+    assertEquals(newMsg.getsenderID(), msg.getsenderID());
+    assertEquals(newMsg.getDestination(), msg.getDestination());
+  }
+
+  /**
    * Test that EntryMessage encoding and decoding works
    * by checking that : msg == new EntryMessageTest(msg.getBytes()).
    */
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
new file mode 100644
index 0000000..4c2f81d
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -0,0 +1,599 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.server;
+
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.net.ServerSocket;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.plugin.ReplicationBroker;
+import org.opends.server.replication.plugin.ReplicationDomain;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.ReplicationMessage;
+import org.opends.server.replication.protocol.SocketSession;
+import org.opends.server.tools.LDAPSearch;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the replicationServer code.
+ */
+
+public class MonitorTest extends ReplicationTestCase
+{
+  // The tracer object for the debug logger
+  private static final DebugTracer TRACER = getTracer();
+
+  private static final String baseDnStr = "dc=example,dc=com";
+  private static final String baseSnStr = "genidcom";
+
+  private static final int   WINDOW_SIZE = 10;
+  private static final int   CHANGELOG_QUEUE_SIZE = 100;
+  private static final short server1ID = 1;
+  private static final short server2ID = 2;
+  private static final short server3ID = 3;
+  private static final short server4ID = 4;
+  private static final short changelog1ID = 11;
+  private static final short changelog2ID = 12;
+  private static final short changelog3ID = 13;
+
+  private DN baseDn;
+  private ReplicationBroker broker2 = null;
+  private ReplicationBroker broker3 = null;
+  private ReplicationBroker broker4 = null;
+  private ReplicationServer replServer1 = null;
+  private ReplicationServer replServer2 = null;
+  private ReplicationServer replServer3 = null;
+  private boolean emptyOldChanges = true;
+  ReplicationDomain replDomain = null;
+  SocketSession ssSession = null;
+  boolean ssShutdownRequested = false;
+  protected String[] updatedEntries;
+
+  private static int[] replServerPort = new int[20];
+
+  private void debugInfo(String s)
+  {
+    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("** TEST **" + s);
+    }
+  }
+  protected void debugInfo(String message, Exception e)
+  {
+    debugInfo(message + stackTraceToSingleLineString(e));
+  }
+
+  /**
+   * Set up the environment for performing the tests in this Class.
+   *
+   * @throws Exception
+   *           If the environment could not be set up.
+   */
+  @BeforeClass
+  public void setUp() throws Exception
+  {
+    //log("Starting generationIdTest setup: debugEnabled:" + debugEnabled());
+
+    // This test suite depends on having the schema available.
+    TestCaseUtils.startServer();
+
+    baseDn = DN.decode(baseDnStr);
+
+    updatedEntries = newLDIFEntries();
+
+    // Create an internal connection in order to provide operations
+    // to DS to populate the db -
+    connection = InternalClientConnection.getRootConnection();
+
+    // Synchro provider
+    String synchroStringDN = "cn=Synchronization Providers,cn=config";
+
+    // Synchro multi-master
+    synchroPluginStringDN = "cn=Multimaster Synchronization, "
+      + synchroStringDN;
+
+    // Synchro suffix
+    synchroServerEntry = null;
+
+    // Add config entries to the current DS server based on :
+    // Add the replication plugin: synchroPluginEntry & synchroPluginStringDN
+    // Add synchroServerEntry
+    // Add replServerEntry
+    configureReplication();
+
+    // Change log
+    String changeLogStringDN = "cn=Changelog Server, " + synchroPluginStringDN;
+    String changeLogLdif = "dn: " + changeLogStringDN + "\n"
+    + "objectClass: top\n"
+    + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+    + "cn: Changelog Server\n" + "ds-cfg-changelog-port: 8990\n"
+    + "ds-cfg-changelog-server-id: 1\n"
+    + "ds-cfg-window-size: " + WINDOW_SIZE + "\n"
+    + "ds-cfg-changelog-max-queue-size: " + CHANGELOG_QUEUE_SIZE;
+    replServerEntry = TestCaseUtils.entryFromLdifString(changeLogLdif);
+    replServerEntry = null;
+
+  }
+
+  /*
+   * Creates entries necessary to the test.
+   */
+  private String[] newLDIFEntries()
+  {
+    String[] entries =
+    {
+        "dn: " + baseDn + "\n"
+        + "objectClass: top\n"
+        + "objectClass: domain\n"
+        + "entryUUID: 21111111-1111-1111-1111-111111111111\n"
+        + "\n",
+        "dn: ou=People," + baseDn + "\n"
+        + "objectClass: top\n"
+        + "objectClass: organizationalUnit\n"
+        + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
+        + "\n",
+        "dn: cn=Fiona Jensen,ou=people," + baseDn + "\n"
+        + "objectclass: top\n"
+        + "objectclass: person\n"
+        + "objectclass: organizationalPerson\n"
+        + "objectclass: inetOrgPerson\n"
+        + "cn: Fiona Jensen\n"
+        + "sn: Jensen\n"
+        + "uid: fiona\n"
+        + "telephonenumber: +1 408 555 1212\n"
+        + "entryUUID: 21111111-1111-1111-1111-111111111113\n"
+        + "\n",
+        "dn: cn=Robert Langman,ou=people," + baseDn + "\n"
+        + "objectclass: top\n"
+        + "objectclass: person\n"
+        + "objectclass: organizationalPerson\n"
+        + "objectclass: inetOrgPerson\n"
+        + "cn: Robert Langman\n"
+        + "sn: Langman\n"
+        + "uid: robert\n"
+        + "telephonenumber: +1 408 555 1213\n"
+        + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
+        + "\n"
+    };
+
+    return entries;
+  }
+
+  /**
+   * Creates a new replicationServer.
+   * @param changelogId The serverID of the replicationServer to create.
+   * @param all         Specifies whether to coonect the created replication
+   *                    server to the other replication servers in the test.
+   * @return The new created replication server.
+   */
+  private ReplicationServer createReplicationServer(short changelogId,
+      boolean all, String suffix)
+  {
+    SortedSet<String> servers = null;
+    servers = new TreeSet<String>();
+    try
+    {
+      if (changelogId==changelog1ID)
+      {
+        if (replServer1!=null)
+          return replServer1;
+      }
+      else if (changelogId==changelog2ID)
+      {
+        if (replServer2!=null)
+          return replServer2;
+      }
+      else if (changelogId==changelog3ID)
+      {
+        if (replServer3!=null)
+          return replServer3;
+      }
+      if (all)
+      {
+        servers.add("localhost:" + getChangelogPort(changelog1ID));
+        servers.add("localhost:" + getChangelogPort(changelog2ID));
+      }
+      int chPort = getChangelogPort(changelogId);
+      String chDir = "genid"+changelogId+suffix+"Db";
+      ReplServerFakeConfiguration conf =
+        new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100,
+            servers);
+      ReplicationServer replicationServer = new ReplicationServer(conf);
+      Thread.sleep(1000);
+
+      return replicationServer;
+
+    }
+    catch (Exception e)
+    {
+      fail("createChangelog" + stackTraceToSingleLineString(e));
+    }
+    return null;
+  }
+
+  /**
+   * Create a synchronized suffix in the current server providing the
+   * replication Server ID.
+   * @param changelogID
+   */
+  private void connectServer1ToChangelog(short changelogID)
+  {
+    // Connect DS to the replicationServer
+    try
+    {
+      // suffix synchronized
+      String synchroServerStringDN = synchroPluginStringDN;
+      String synchroServerLdif =
+        "dn: cn=" + baseSnStr + ", cn=domains," + synchroServerStringDN + "\n"
+        + "objectClass: top\n"
+        + "objectClass: ds-cfg-replication-domain\n"
+        + "cn: " + baseSnStr + "\n"
+        + "ds-cfg-base-dn: " + baseDnStr + "\n"
+        + "ds-cfg-replication-server: localhost:"
+        + getChangelogPort(changelogID)+"\n"
+        + "ds-cfg-server-id: " + server1ID + "\n"
+        + "ds-cfg-receive-status: true\n"
+        + "ds-cfg-window-size: " + WINDOW_SIZE;
+
+      if (synchroServerEntry == null)
+      {
+        synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+        DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+        assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+        "Unable to add the synchronized server");
+        configEntryList.add(synchroServerEntry.getDN());
+
+        replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
+
+      }
+      if (replDomain != null)
+      {
+        debugInfo("ReplicationDomain: Import/Export is running ? " + replDomain.ieRunning());
+      }
+    }
+    catch(Exception e)
+    {
+      debugInfo("connectToReplServer", e);
+      fail("connectToReplServer", e);
+    }
+  }
+
+  /*
+   * Disconnect DS from the replicationServer
+   */
+  private void disconnectFromReplServer(short changelogID)
+  {
+    try
+    {
+      // suffix synchronized
+      String synchroServerStringDN = "cn=" + baseSnStr + ", cn=domains," +
+      synchroPluginStringDN;
+      if (synchroServerEntry != null)
+      {
+        DN synchroServerDN = DN.decode(synchroServerStringDN);
+        DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN,null);
+        assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN())==null,
+        "Unable to delete the synchronized domain");
+        synchroServerEntry = null;
+
+        configEntryList.remove(configEntryList.indexOf(synchroServerDN));
+      }
+    }
+    catch(Exception e)
+    {
+      fail("disconnectFromReplServer", e);
+    }
+  }
+
+  private int getChangelogPort(short changelogID)
+  {
+    if (replServerPort[changelogID] == 0)
+    {
+      try
+      {
+        // Find  a free port for the replicationServer
+        ServerSocket socket = TestCaseUtils.bindFreePort();
+        replServerPort[changelogID] = socket.getLocalPort();
+        socket.close();
+      }
+      catch(Exception e)
+      {
+        fail("Cannot retrieve a free port for replication server."
+            + e.getMessage());
+      }
+    }
+    return replServerPort[changelogID];
+  }
+
+  private String createEntry(UUID uid)
+  {
+    String user2dn = "uid=user"+uid+",ou=People," + baseDnStr;
+    return new String(
+        "dn: "+ user2dn + "\n"
+        + "objectClass: top\n" + "objectClass: person\n"
+        + "objectClass: organizationalPerson\n"
+        + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+        + "homePhone: 951-245-7634\n"
+        + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+        + "mobile: 027-085-0537\n"
+        + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+        + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
+        + "cn: Aaccf Amar2\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+        + "street: 17984 Thirteenth Street\n"
+        + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 2\n"
+        + "sn: Amar2\n" + "givenName: Aaccf2\n" + "postalCode: 85762\n"
+        + "userPassword: password\n" + "initials: AA\n");
+  }
+
+  static protected ReplicationMessage createAddMsg(short serverId)
+  {
+    Entry personWithUUIDEntry = null;
+    String user1entryUUID;
+    String baseUUID = null;
+    String user1dn;
+
+    /*
+     * Create a Change number generator to generate new changenumbers
+     * when we need to send operation messages to the replicationServer.
+     */
+    ChangeNumberGenerator gen = new ChangeNumberGenerator(serverId, 0);
+
+    user1entryUUID = "33333333-3333-3333-3333-333333333333";
+    user1dn = "uid=user1,ou=People," + baseDnStr;
+    String entryWithUUIDldif = "dn: "+ user1dn + "\n"
+    + "objectClass: top\n" + "objectClass: person\n"
+    + "objectClass: organizationalPerson\n"
+    + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
+    + "homePhone: 951-245-7634\n"
+    + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
+    + "mobile: 027-085-0537\n"
+    + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
+    + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
+    + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
+    + "street: 17984 Thirteenth Street\n"
+    + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
+    + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
+    + "userPassword: password\n" + "initials: AA\n"
+    + "entryUUID: " + user1entryUUID + "\n";
+
+    try
+    {
+      personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif);
+    }
+    catch(Exception e)
+    {
+      fail(e.getMessage());
+    }
+
+    // Create and publish an update message to add an entry.
+    AddMsg addMsg = new AddMsg(gen.newChangeNumber(),
+        personWithUUIDEntry.getDN().toString(),
+        user1entryUUID,
+        baseUUID,
+        personWithUUIDEntry.getObjectClassAttribute(),
+        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
+
+    return addMsg;
+  }
+
+  @Test(enabled=false)
+  public void testMultiRS() throws Exception
+  {
+    String testCase = "testMultiRS";
+    debugInfo("Starting " + testCase);
+
+    ReplicationDomain.clearJEBackend(false,
+        "userRoot",
+        baseDn.toNormalizedString());
+
+    debugInfo ("Creating 2 RS");
+    replServer1 = createReplicationServer(changelog1ID, true, testCase);
+    replServer2 = createReplicationServer(changelog2ID, true, testCase);
+    replServer3 = createReplicationServer(changelog3ID, true, testCase);
+    Thread.sleep(500);
+
+    debugInfo("Connecting DS to replServer1");
+    connectServer1ToChangelog(changelog1ID);
+    Thread.sleep(1500);
+
+    try
+    {
+      debugInfo("Connecting broker2 to replServer1");
+      broker2 = openReplicationSession(baseDn,
+          server2ID, 100, getChangelogPort(changelog1ID),
+          1000, !emptyOldChanges);
+      Thread.sleep(1000);
+    }
+    catch(SocketException se)
+    {
+      fail("Broker connection is expected to be accepted.");
+    }
+
+    try
+    {
+      debugInfo("Connecting broker3 to replServer2");
+      broker3 = openReplicationSession(baseDn,
+          server3ID, 100, getChangelogPort(changelog2ID),
+          1000, !emptyOldChanges);
+      Thread.sleep(1000);
+    }
+    catch(SocketException se)
+    {
+      fail("Broker connection is expected to be accepted.");
+    }
+
+    try
+    {
+      debugInfo("Connecting broker4 to replServer2");
+      broker4 = openReplicationSession(baseDn,
+          server4ID, 100, getChangelogPort(changelog2ID),
+          1000, !emptyOldChanges);
+      Thread.sleep(1000);
+    }
+    catch(SocketException se)
+    {
+      fail("Broker connection is expected to be accepted.");
+    }
+
+    // Do a bunch of change
+    updatedEntries = newLDIFEntries();
+    this.addTestEntriesToDB(updatedEntries);
+    
+    for (int i = 0; i<200; i++)
+    {
+      String ent1[] = { createEntry(UUID.randomUUID()) };
+      this.addTestEntriesToDB(ent1);
+    }
+    
+    for (int i=0; i<10; i++)
+    {
+      broker3.publish(createAddMsg(server3ID));
+    }
+
+    searchMonitor();
+
+    debugInfo("Disconnect DS from replServer1 (required in order to DEL entries).");
+    disconnectFromReplServer(changelog1ID);
+
+    debugInfo("Cleaning entries");
+    postTest();
+
+    debugInfo("Successfully ending " + testCase);
+  }
+
+  /**
+   * Disconnect broker and remove entries from the local DB
+   * @throws Exception
+   */
+  protected void postTest()
+  {
+    debugInfo("Post test cleaning.");
+
+    // Clean brokers
+    if (broker2 != null)
+      broker2.stop();
+    broker2 = null;
+    if (broker3 != null)
+      broker3.stop();
+    broker3 = null;
+    if (broker4 != null)
+      broker4.stop();
+    broker4 = null;
+
+    if (replServer1 != null)
+      replServer1.remove();
+    if (replServer2 != null)
+      replServer2.remove();
+    if (replServer3 != null)
+      replServer3.remove();
+    replServer1 = null;
+    replServer2 = null;
+    replServer3 = null;
+
+    super.cleanRealEntries();
+
+    try
+    {
+      ReplicationDomain.clearJEBackend(false,
+        replDomain.getBackend().getBackendID(),
+        baseDn.toNormalizedString());
+    }
+    catch (Exception e) {}
+  }
+  
+  @Test(enabled=true)
+  public void MonitorTest() throws Exception
+  {
+    testMultiRS();
+  }
+  
+  private static final ByteArrayOutputStream oStream =
+    new ByteArrayOutputStream();
+  private static final ByteArrayOutputStream eStream =
+    new ByteArrayOutputStream();
+
+  private void searchMonitor()
+  {
+    // test search as directory manager returns content
+    String[] args3 =
+    {
+      "-h", "127.0.0.1",
+      "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
+      "-D", "cn=Directory Manager",
+      "-w", "password",
+      "-b", "cn=monitor",
+      "-s", "sub",
+      "(base-dn=*)"
+    };
+
+    oStream.reset();
+    eStream.reset();
+    int retVal =
+      LDAPSearch.mainSearch(args3, false, oStream, eStream);
+    String entries = oStream.toString();
+    debugInfo("Entries:" + entries);
+    try
+    {
+      assertEquals(retVal, 0, "Returned error: " + eStream);
+      assertTrue(!entries.equalsIgnoreCase(""), "Returned entries: " + entries);
+    }
+    catch(Exception e)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          stackTraceToSingleLineString(new Exception()));
+      fail(e.getMessage());
+    }
+  }
+}
\ No newline at end of file

--
Gitblit v1.10.0