From 3a9e211d36ee94ff99941943b3b51e0f768624f5 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 06 Nov 2009 09:11:40 +0000
Subject: [PATCH] In order to support a more clever algorithm for the DS to choose his RS,  we introduce:

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java |    6 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java  |    6 
 opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java                                        |    2 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java                    |   89 +--
 opends/src/server/org/opends/server/replication/common/DSInfo.java                                               |   18 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java |    8 
 opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java                                         |   18 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java               |    6 
 opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java                                  |    2 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java     |    8 
 opends/src/server/org/opends/server/replication/server/ServerHandler.java                                        |   17 
 opends/src/messages/messages/replication.properties                                                              |    2 
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                                   |   92 +++
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                              |  378 +++++++++++-----
 opends/resource/schema/02-config.ldif                                                                            |    8 
 opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java                             |   21 
 opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java                                        |   27 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java             |    2 
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |   57 --
 opends/src/server/org/opends/server/replication/server/DataServerHandler.java                                    |    8 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java                 |   55 ++
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                    |  236 ++++++++-
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java               |    6 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java                 |  203 ++++----
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java  |   18 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java        |    6 
 opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml                             |   23 +
 opends/src/server/org/opends/server/replication/common/RSInfo.java                                               |   30 +
 28 files changed, 935 insertions(+), 417 deletions(-)

diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 158ad3b..148b8d1 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -2458,6 +2458,11 @@
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
   SINGLE-VALUE
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.605
+  NAME 'ds-cfg-monitoring-period'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
   NAME 'ds-cfg-access-control-handler'
   SUP top
@@ -3137,7 +3142,8 @@
         ds-cfg-group-id $
         ds-cfg-assured-timeout $
         ds-cfg-degraded-status-threshold $
-        ds-cfg-weight)
+        ds-cfg-weight $
+        ds-cfg-monitoring-period)
   X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.65
   NAME 'ds-backup-directory'
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
index 6c91def..57e8b50 100644
--- a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
+++ b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -296,4 +296,27 @@
       </ldap:attribute>
     </adm:profile>
   </adm:property>
+  <adm:property name="monitoring-period" mandatory="false">
+    <adm:synopsis>
+      The period between sending of monitoring messages.
+    </adm:synopsis>
+    <adm:description>
+      Defines the amount of milliseconds the replication server will wait before
+      sending new monitoring messages to its peers (replication servers and
+      directory servers).
+    </adm:description>
+    <adm:default-behavior>
+      <adm:defined>
+        <adm:value>3000ms</adm:value>
+      </adm:defined>
+    </adm:default-behavior>
+    <adm:syntax>
+      <adm:duration base-unit="ms" lower-limit="1000" />
+    </adm:syntax>
+    <adm:profile name="ldap">
+      <ldap:attribute>
+        <ldap:name>ds-cfg-monitoring-period</ldap:name>
+      </ldap:attribute>
+    </adm:profile>
+  </adm:property>
 </adm:managed-object>
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index da7c3de..801eca7 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -170,7 +170,7 @@
  UTF-8. This is required to be able to encode the changes in the database. \
  This replication server will now shutdown
 SEVERE_ERR_REPLICATION_COULD_NOT_CONNECT_61=The Replication is configured for \
- suffix  %s but was not able to connect to any Replication Server
+ suffix %s but was not able to connect to any Replication Server
 NOTICE_NOW_FOUND_SAME_GENERATION_CHANGELOG_62=Replication is up and running \
  for domain %s with replication server id %s %s - local server id is %s - data \
  generation is %s
diff --git a/opends/src/server/org/opends/server/replication/common/DSInfo.java b/opends/src/server/org/opends/server/replication/common/DSInfo.java
index 1e58428..aeb1835 100644
--- a/opends/src/server/org/opends/server/replication/common/DSInfo.java
+++ b/opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -247,23 +247,23 @@
     StringBuffer sb = new StringBuffer();
     sb.append("DS id: ");
     sb.append(dsId);
-    sb.append(" RS id: ");
+    sb.append(" ; RS id: ");
     sb.append(rsId);
-    sb.append(" Generation id: ");
+    sb.append(" ; Generation id: ");
     sb.append(generationId);
-    sb.append(" Status: ");
+    sb.append(" ; Status: ");
     sb.append(status);
-    sb.append(" Assured replication: ");
+    sb.append(" ; Assured replication: ");
     sb.append(assuredFlag);
-    sb.append(" Assured mode: ");
+    sb.append(" ; Assured mode: ");
     sb.append(assuredMode);
-    sb.append(" Safe data level: ");
+    sb.append(" ; Safe data level: ");
     sb.append(safeDataLevel);
-    sb.append(" Group id: ");
+    sb.append(" ; Group id: ");
     sb.append(groupId);
-    sb.append(" Referral URLs: ");
+    sb.append(" ; Referral URLs: ");
     sb.append(refUrls);
-    sb.append(" ECL Include: ");
+    sb.append(" ; ECL Include: ");
     sb.append(eclIncludes);
     return sb.toString();
   }
diff --git a/opends/src/server/org/opends/server/replication/common/RSInfo.java b/opends/src/server/org/opends/server/replication/common/RSInfo.java
index 5b6d9e2..577ffd3 100644
--- a/opends/src/server/org/opends/server/replication/common/RSInfo.java
+++ b/opends/src/server/org/opends/server/replication/common/RSInfo.java
@@ -40,6 +40,11 @@
   private long generationId = -1;
   // Group id of the RS
   private byte groupId = (byte) -1;
+  // The weight of the RS
+  // It is important to keep the default value to 1 so that it is used as
+  // default value for a RS using protocol V3: this default value vill be used
+  // in algorithms that use weight
+  private int weight = 1;
 
   /**
    * Creates a new instance of RSInfo with every given info.
@@ -47,12 +52,14 @@
    * @param id The RS id
    * @param generationId The generation id the RS is using
    * @param groupId RS group id
+   * @param weight RS weight
    */
-  public RSInfo(int id, long generationId, byte groupId)
+  public RSInfo(int id, long generationId, byte groupId, int weight)
   {
     this.id = id;
     this.generationId = generationId;
     this.groupId = groupId;
+    this.weight = weight;
   }
 
   /**
@@ -83,6 +90,16 @@
   }
 
   /**
+   * Get the RS weight.
+   * @return The RS weight
+   */
+  public int getWeight()
+  {
+    return weight;
+  }
+
+
+  /**
    * Test if the passed object is equal to this one.
    * @param obj The object to test
    * @return True if both objects are equal
@@ -99,7 +116,8 @@
       RSInfo rsInfo = (RSInfo) obj;
       return ((id == rsInfo.getId()) &&
         (generationId == rsInfo.getGenerationId()) &&
-        (groupId == rsInfo.getGroupId()));
+        (groupId == rsInfo.getGroupId()) &&
+        (weight == rsInfo.getWeight()));
     } else
     {
       return false;
@@ -117,6 +135,7 @@
     hash = 37 * hash + this.id;
     hash = 37 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
     hash = 37 * hash + this.groupId;
+    hash = 37 * hash + this.weight;
     return hash;
   }
 
@@ -130,11 +149,12 @@
     StringBuffer sb = new StringBuffer();
     sb.append("Id: ");
     sb.append(id);
-    sb.append(" Generation id: ");
+    sb.append(" ; Generation id: ");
     sb.append(generationId);
-    sb.append(" Group id: ");
+    sb.append(" ; Group id: ");
     sb.append(groupId);
+    sb.append(" ; Weight: ");
+    sb.append(weight);
     return sb.toString();
   }
-
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index e3f6e48..138b2b1 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -93,6 +93,24 @@
   }
 
   /**
+   * Sets the sender ID.
+   * @param senderID The sender ID.
+   */
+  public void setSenderID(int senderID)
+  {
+    this.senderID = senderID;
+  }
+
+  /**
+   * Sets the destination.
+   * @param destination The destination.
+   */
+  public void setDestination(int destination)
+  {
+    this.destination = destination;
+  }
+
+  /**
    * Sets the state of the replication server.
    * @param state The state.
    */
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
index db05464..0924254 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -31,7 +31,7 @@
 
 /**
  * This message is part of the replication protocol.
- * RS1 sends a MonitorRequestMsg to RS2 to requests its monitoring
+ * RS1 sends a MonitorRequestMsg to RS2 to request its monitoring
  * informations.
  * When RS2 receives a MonitorRequestMsg from RS1, RS2 responds with a
  * MonitorMessage.
diff --git a/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java b/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
index 5b38558..8b3172a 100644
--- a/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
@@ -29,7 +29,7 @@
 /**
  * This is an abstract class of messages of the replication protocol
  * for message that needs to contain information about the server that
- * send them and the destination servers to whitch they should be sent.
+ * send them and the destination servers to which they should be sent.
  */
 public abstract class RoutableMsg extends ReplicationMsg
 {
diff --git a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index f357409..85a4d1d 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -167,6 +167,7 @@
 
         if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
         {
+          // Put ECL includes
           Set<String> attrs = dsInfo.getEclIncludes();
           oStream.write(attrs.size());
           for (String attr : attrs)
@@ -192,8 +193,15 @@
         oStream.write(String.valueOf(rsInfo.getGenerationId()).
           getBytes("UTF-8"));
         oStream.write(0);
-        // Put DS group id
+        // Put RS group id
         oStream.write(rsInfo.getGroupId());
+
+        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+        {
+          // Put RS weight
+          oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
+          oStream.write(0);
+        }
       }
 
       return oStream.toByteArray();
@@ -332,23 +340,30 @@
         int length = getNextLength(in, pos);
         String serverIdString = new String(in, pos, length, "UTF-8");
         int id = Integer.valueOf(serverIdString);
-        pos +=
-          length + 1;
+        pos += length + 1;
 
         /* Read the generation id */
         length = getNextLength(in, pos);
         long generationId =
           Long.valueOf(new String(in, pos, length,
           "UTF-8"));
-        pos +=
-          length + 1;
+        pos += length + 1;
 
         /* Read RS group id */
         byte groupId = in[pos++];
 
+        int weight = 1;
+        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+        {
+          /* Read RS weight */
+          length = getNextLength(in, pos);
+          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+          pos += length + 1;
+        }
+
         /* Now create RSInfo and store it in list */
 
-        RSInfo rsInfo = new RSInfo(id, generationId, groupId);
+        RSInfo rsInfo = new RSInfo(id, generationId, groupId, weight);
         rsList.add(rsInfo);
 
         nRsInfo--;
diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index cbc6bad..5053f42 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -309,8 +309,7 @@
 
     try
     {
-      MonitorData md;
-      md = replicationServerDomain.computeMonitorData();
+      MonitorData md = replicationServerDomain.computeMonitorData();
 
       // Oldest missing update
       Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
@@ -538,7 +537,7 @@
           return;
         }
 
-        // Send our own TopologyMsg to remote RS
+        // Send our own TopologyMsg to remote DS
         TopologyMsg outTopoMsg = sendTopoToRemoteDS();
 
         logStartSessionHandshake(inStartSessionMsg, outTopoMsg);
@@ -572,6 +571,9 @@
       // Create the status analyzer for the domain if not already started
       createStatusAnalyzer();
 
+      // Create the monitoring publisher for the domain if not already started
+      createMonitoringPublisher();
+
       registerIntoDomain();
 
       super.finalizeStart();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index b4403da..71f4386 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -49,8 +49,6 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
@@ -101,6 +99,7 @@
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 
 import com.sleepycat.je.DatabaseException;
+import java.util.Collections;
 
 /**
  * ReplicationServer Listener.
@@ -173,6 +172,10 @@
   // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled
   private int degradedStatusThreshold = 5000;
 
+  // Number of milliseconds to wait before sending new monitoring messages.
+  // If value is 0, monitoring publisher is disabled
+  private long monitoringPublisherPeriod = 3000;
+
   // The handler of the draft change numbers database, the database used to
   // store the relation between a draft change number ('seqnum') and the
   // associated cookie.
@@ -211,6 +214,13 @@
   private int weight = 1;
 
   /**
+   * Holds the list of all replication servers instantiated in this VM.
+   * This allows to perform clean up of the RS databases in unit tests.
+   */
+  private static List<ReplicationServer> allInstances =
+    new ArrayList<ReplicationServer>();
+
+  /**
    * Creates a new Replication server using the provided configuration entry.
    *
    * @param configuration The configuration of this replication server.
@@ -254,6 +264,7 @@
     groupId = (byte)configuration.getGroupId();
     assuredTimeout = configuration.getAssuredTimeout();
     degradedStatusThreshold = configuration.getDegradedStatusThreshold();
+    monitoringPublisherPeriod = configuration.getMonitoringPeriod();
 
     replSessionSecurity = new ReplSessionSecurity();
     initialize(replicationPort);
@@ -274,8 +285,20 @@
     DirectoryServer.registerImportTaskListener(this);
 
     localPorts.add(replicationPort);
+
+    // Keep track of this new instance
+    allInstances.add(this);
   }
 
+  /**
+   * Get the list of every replication servers instantiated in the current VM.
+   * @return The list of every replication servers instantiated in the current
+   * VM.
+   */
+  public static List<ReplicationServer> getAllInstances()
+  {
+    return allInstances;
+  }
 
   /**
    * The run method for the Listen thread.
@@ -850,7 +873,9 @@
       dbEnv.shutdown();
     }
 
-}
+    // Remove this instance from the global instance list
+    allInstances.remove(this);
+  }
 
 
   /**
@@ -1028,6 +1053,32 @@
       }
     }
 
+    // Update period value for monitoring publishers (stop them if requested
+    // value is 0)
+    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
+    {
+      long oldMonitoringPeriod = monitoringPublisherPeriod;
+      monitoringPublisherPeriod = configuration.getMonitoringPeriod();
+      for(ReplicationServerDomain rsd : baseDNs.values())
+      {
+        if (monitoringPublisherPeriod == 0L)
+        {
+          // Requested to stop monitoring publishers
+          rsd.stopMonitoringPublisher();
+        } else if (rsd.isRunningMonitoringPublisher())
+        {
+          // Update the threshold value for this running monitoring publisher
+          rsd.updateMonitoringPublisher(monitoringPublisherPeriod);
+        } else if (oldMonitoringPeriod == 0L)
+        {
+          // Requested to start monitoring publishers with provided period value
+          if ( (rsd.getConnectedDSs().size() > 0) ||
+            (rsd.getConnectedRSs().size() > 0) )
+            rsd.startMonitoringPublisher();
+        }
+      }
+    }
+
     // Changed the group id ?
     byte newGroupId = (byte)configuration.getGroupId();
     if (newGroupId != groupId)
@@ -1044,7 +1095,10 @@
     if (weight != configuration.getWeight())
     {
       weight = configuration.getWeight();
-      // TODO: send new TopologyMsg
+      // Broadcast the new weight the the whole topology. This will make some
+      // DSs reconnect (if needed) to other RSs according to the new weight of
+      // this RS.
+      broadcastConfigChange();
     }
 
     if ((configuration.getReplicationDBDirectory() != null) &&
@@ -1057,6 +1111,19 @@
   }
 
   /**
+   * Broadcast a configuration change that just happened to the whole topology
+   * by sending a TopologyMsg to every entity in the topology.
+   */
+  private void broadcastConfigChange()
+  {
+    for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
+    {
+      replicationServerDomain.buildAndSendTopoInfoToDSs(null);
+      replicationServerDomain.buildAndSendTopoInfoToRSs();
+    }
+  }
+
+  /**
    * {@inheritDoc}
    */
   public boolean isConfigurationChangeAcceptable(
@@ -1345,6 +1412,15 @@
   }
 
   /**
+   * Get the monitoring publisher period value.
+   * @return the monitoring publisher period value.
+   */
+  public long getMonitoringPublisherPeriod()
+  {
+    return monitoringPublisherPeriod;
+  }
+
+  /**
    * Compute the list of replication servers that are not any
    * more connected to this Replication Server and stop the
    * corresponding handlers.
@@ -1411,12 +1487,80 @@
   /* The date of the last time they have been elaborated */
   private long monitorDataLastBuildDate = 0;
 
-  /* Search op on monitor data is processed by a worker thread.
-   * Requests are sent to the other RS,and responses are received by the
-   * listener threads.
-   * The worker thread is awoke on this semaphore, or on timeout.
+  /**
+   * This uniquely identifies a server (handler) in the cross-domain topology.
+   * Represents an identifier of a handler (in the whole RS) we have to wait a
+   * monitoring message from before answering to a monitor request.
    */
-  Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
+  public static class GlobalServerId {
+
+    private int serverId = -1;
+    private String baseDn = null;
+
+    /**
+     * Constructor for a global server id.
+     * @param baseDn The dn of the RSD owning the handler.
+     * @param serverId The handler id in the matching RSD.
+     */
+    public GlobalServerId(String baseDn, int serverId) {
+      this.baseDn = baseDn;
+      this.serverId = serverId;
+    }
+
+    /**
+     * Get the server handler id.
+     * @return the serverId
+     */
+    public int getServerId()
+    {
+      return serverId;
+    }
+
+    /**
+     * Get the base dn.
+     * @return the baseDn
+     */
+    public String getBaseDn()
+    {
+      return baseDn;
+    }
+
+    /**
+     * Get the hascode.
+     * @return The hashcode.
+     */
+    @Override
+    public int hashCode()
+    {
+      int hash = 7;
+      hash = 43 * hash + this.serverId;
+      hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0);
+      return hash;
+    }
+
+    /**
+     * Tests if the passed global server handler id represents the same server
+     * handler as this one.
+     * @param obj The object to test.
+     * @return True if both identifiers are the same.
+     */
+    public boolean equals(Object obj) {
+      if ( (obj == null) || (obj instanceof GlobalServerId))
+        return false;
+
+      GlobalServerId globalServerId = (GlobalServerId)obj;
+      return ( globalServerId.baseDn.equals(baseDn) &&
+        (globalServerId.serverId == serverId) );
+    }
+  }
+
+  /**
+   * This gives the list of server handlers we are willing to wait monitoring
+   * message from. Each time a monitoring message is received by a server
+   * handler, the matching server handler id is retired from the list. When the
+   * list is empty, we received all expected monitoring messages.
+   */
+  private List<GlobalServerId> expectedMonitoringMsg = null;
 
   /**
    * Trigger the computation of the Global Monitoring Data.
@@ -1429,7 +1573,7 @@
    *
    * @throws DirectoryException If the computation cannot be achieved.
    */
-  public void computeMonitorData() throws DirectoryException
+  public synchronized void computeMonitorData() throws DirectoryException
   {
     if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
     {
@@ -1440,15 +1584,17 @@
       return;
     }
 
-    remoteMonitorResponsesSemaphore.drainPermits();
-    int count = 0;
+    // Initialize the list of server handlers we expect monitoring messages from
+    expectedMonitoringMsg =
+      Collections.synchronizedList(new ArrayList<GlobalServerId>());
+
     for (ReplicationServerDomain domain : baseDNs.values())
     {
-      count += domain.initializeMonitorData();
+      domain.initializeMonitorData(expectedMonitoringMsg);
     }
 
     // Wait for responses
-    waitMonitorDataResponses(count);
+    waitMonitorDataResponses();
 
     for (ReplicationServerDomain domain : baseDNs.values())
     {
@@ -1457,38 +1603,51 @@
   }
 
   /**
-   * Wait for the expected count of received MonitorMsg.
-   * @param expectedResponses The number of expected answers.
+   * Wait for the expected received MonitorMsg.
    * @throws DirectoryException When an error occurs.
    */
-  private void waitMonitorDataResponses(int expectedResponses)
+  private void waitMonitorDataResponses()
     throws DirectoryException
   {
     try
     {
       if (debugEnabled())
         TRACER.debugInfo(
-          "In " + getMonitorInstanceName() + " baseDn=" +
-          " waiting for " + expectedResponses + " expected monitor messages");
+          "In " + getMonitorInstanceName() +
+          " waiting for " + expectedMonitoringMsg.size() +
+          " expected monitor messages");
 
-      boolean allPermitsAcquired =
-        remoteMonitorResponsesSemaphore.tryAcquire(
-        expectedResponses,
-        (long) 5000, TimeUnit.MILLISECONDS);
-
-      if (!allPermitsAcquired)
+      // Wait up to 5 seconds for every expected monitoring message to come
+      // back.
+      boolean allReceived = false;
+      long startTime = TimeThread.getTime();
+      long curTime = startTime;
+      int maxTime = 5000;
+      while ( (curTime - startTime) < maxTime )
       {
-        monitorDataLastBuildDate = TimeThread.getTime();
+        // Have every expected monitoring messages arrived ?
+        if (expectedMonitoringMsg.size() == 0)
+        {
+          // Ok break the loop
+          allReceived = true;
+          break;
+        }
+        Thread.sleep(100);
+        curTime = TimeThread.getTime();
+      }
+
+      monitorDataLastBuildDate = TimeThread.getTime();
+
+      if (!allReceived)
+      {
         logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
-      // let's go on in best effort even with limited data received.
+        // let's go on in best effort even with limited data received.
       } else
       {
-        monitorDataLastBuildDate = TimeThread.getTime();
         if (debugEnabled())
           TRACER.debugInfo(
-            "In " + getMonitorInstanceName() + " baseDn=" +
-            " Successfully received all " + expectedResponses +
-            " expected monitor messages");
+            "In " + getMonitorInstanceName() +
+            " Successfully received all expected monitor messages");
       }
     } catch (Exception e)
     {
@@ -1499,11 +1658,18 @@
 
   /**
    * This should be called by each ReplicationServerDomain that receives
-   * a response to a monitor request message.
+   * a response to a monitor request message. This may also be called when a
+   * monitoring message is coming from a RS whose monitoring publisher thread
+   * sent it. As monitoring messages (sent because of monitoring request or
+   * because of monitoring publisher) have the same content, this is also ok
+   * to mark ok the server when the monitoring message coms from a monitoring
+   * publisher thread.
+   * @param globalServerId The server handler that is receiving the
+   * monitoring message.
    */
-  public void responseReceived()
+  public void responseReceived(GlobalServerId globalServerId)
   {
-    remoteMonitorResponsesSemaphore.release();
+    expectedMonitoringMsg.remove(globalServerId);
   }
 
 
@@ -1513,7 +1679,7 @@
    */
   public void responseReceivedAll()
   {
-    remoteMonitorResponsesSemaphore.notifyAll();
+    expectedMonitoringMsg.clear();
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4ef072c..ab1da75 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -85,6 +85,8 @@
 import org.opends.server.types.ResultCode;
 
 import com.sleepycat.je.DatabaseException;
+import org.opends.server.replication.server.
+  ReplicationServer.GlobalServerId;
 
 /**
  * This class define an in-memory cache that will be used to store
@@ -109,6 +111,10 @@
   // late or not
   private StatusAnalyzer statusAnalyzer = null;
 
+  // The monitoring publisher that periodically sends monitoring messages to the
+  // topology
+  private MonitoringPublisher monitoringPublisher = null;
+
   /*
    * The following map contains one balanced tree for each replica ID
    * to which we are currently publishing
@@ -1066,6 +1072,17 @@
           // Try doing job anyway...
         }
 
+        // Stop useless monitoring publisher if no more RS or DS in domain
+        if ( (directoryServers.size() + replicationServers.size() )== 1)
+        {
+          if (debugEnabled())
+            TRACER.debugInfo("In " +
+              replicationServer.getMonitorInstanceName() +
+              " remote server " + handler.getMonitorInstanceName() + " is " +
+              "the last RS/DS to be stopped: stopping monitoring publisher");
+          stopMonitoringPublisher();
+        }
+
         if (handler.isReplicationServer())
         {
           if (replicationServers.containsValue(handler))
@@ -1082,44 +1099,39 @@
               buildAndSendTopoInfoToDSs(null);
             }
           }
-        } else
+        } else if (directoryServers.containsValue(handler))
         {
-          if (directoryServers.containsValue(handler))
+          // If this is the last DS for the domain,
+          // shutdown the status analyzer
+          if (directoryServers.size() == 1)
           {
-            // If this is the last DS for the domain,
-            // shutdown the status analyzer
-            if (directoryServers.size() == 1)
-            {
-              if (debugEnabled())
-                TRACER.debugInfo("In " +
-                    replicationServer.getMonitorInstanceName() +
-                    " remote server " + handler.getMonitorInstanceName() +
+            if (debugEnabled())
+              TRACER.debugInfo("In " +
+                replicationServer.getMonitorInstanceName() +
+                " remote server " + handler.getMonitorInstanceName() +
                 " is the last DS to be stopped: stopping status analyzer");
-              stopStatusAnalyzer();
-            }
+            stopStatusAnalyzer();
+          }
 
-            unregisterServerHandler(handler);
-            handler.shutdown();
+          unregisterServerHandler(handler);
+          handler.shutdown();
 
-            // Check if generation id has to be reset
-            mayResetGenerationId();
+          // Check if generation id has to be reset
+          mayResetGenerationId();
+          if (!shutdown)
+          {
             // Update the remote replication servers with our list
             // of connected LDAP servers
-            if (!shutdown)
-            {
-              buildAndSendTopoInfoToRSs();
-              // Warn our DSs that a RS or DS has quit (does not use this
-              // handler as already removed from list)
-              buildAndSendTopoInfoToDSs(null);
-            }
+            buildAndSendTopoInfoToRSs();
+            // Warn our DSs that a RS or DS has quit (does not use this
+            // handler as already removed from list)
+            buildAndSendTopoInfoToDSs(null);
           }
-          else if (otherHandlers.contains(handler))
-          {
-            unRegisterHandler(handler);
-            handler.shutdown();
-          }
+        } else if (otherHandlers.contains(handler))
+        {
+          unRegisterHandler(handler);
+          handler.shutdown();
         }
-
       }
       catch(Exception e)
       {
@@ -1581,99 +1593,51 @@
         // in the topology.
         if (senderHandler.isDataServer())
         {
-          MonitorMsg returnMsg =
-            new MonitorMsg(msg.getDestination(), msg.getsenderID());
+          // Monitoring information requested by a DS
+          MonitorMsg monitorMsg =
+            createGlobalTopologyMonitorMsg(msg.getDestination(),
+            msg.getsenderID());
 
-          try
+           if (monitorMsg != null)
           {
-            returnMsg.setReplServerDbState(getDbServerState());
-            // Update the information we have about all servers
-            // in the topology.
-            MonitorData md = computeMonitorData();
-
-            // Add the informations about the Replicas currently in
-            // the topology.
-            Iterator<Integer> it = md.ldapIterator();
-            while (it.hasNext())
+            try
             {
-              int replicaId = it.next();
-              returnMsg.setServerState(
-                  replicaId, md.getLDAPServerState(replicaId),
-                  md.getApproxFirstMissingDate(replicaId), true);
-            }
-
-            // Add the informations about the Replication Servers
-            // currently in the topology.
-            it = md.rsIterator();
-            while (it.hasNext())
+              senderHandler.send(monitorMsg);
+            } catch (IOException e)
             {
-              int replicaId = it.next();
-              returnMsg.setServerState(
-                  replicaId, md.getRSStates(replicaId),
-                  md.getRSApproxFirstMissingDate(replicaId), false);
+              // the connection was closed.
             }
           }
-          catch (DirectoryException e)
-          {
-            // If we can't compute the Monitor Information, send
-            // back an empty message.
-          }
-          try
-          {
-            senderHandler.send(returnMsg);
-          } catch (IOException e)
-          {
-            // the connection was closed.
-          }
           return;
-        }
-
-        MonitorMsg monitorMsg =
-          new MonitorMsg(msg.getDestination(), msg.getsenderID());
-
-        // Populate for each connected LDAP Server
-        // from the states stored in the serverHandler.
-        // - the server state
-        // - the older missing change
-        for (DataServerHandler lsh : this.directoryServers.values())
+        } else
         {
-          monitorMsg.setServerState(
-            lsh.getServerId(),
-            lsh.getServerState(),
-            lsh.getApproxFirstMissingDate(),
-            true);
-        }
+          // Monitoring information requested by a RS
+          MonitorMsg monitorMsg =
+            createLocalTopologyMonitorMsg(msg.getDestination(),
+            msg.getsenderID());
 
-        // Same for the connected RS
-        for (ReplicationServerHandler rsh : this.replicationServers.values())
-        {
-          monitorMsg.setServerState(
-            rsh.getServerId(),
-            rsh.getServerState(),
-            rsh.getApproxFirstMissingDate(),
-            false);
-        }
-
-        // Populate the RS state in the msg from the DbState
-        monitorMsg.setReplServerDbState(this.getDbServerState());
-
-
-        try
-        {
-          senderHandler.send(monitorMsg);
-        } catch (Exception e)
-        {
-          // We log the error. The requestor will detect a timeout or
-          // any other failure on the connection.
-          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
-              Integer.toString((msg.getDestination()))));
+          if (monitorMsg != null)
+          {
+            try
+            {
+              senderHandler.send(monitorMsg);
+            } catch (Exception e)
+            {
+              // We log the error. The requestor will detect a timeout or
+              // any other failure on the connection.
+              logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
+                  Integer.toString((msg.getDestination()))));
+            }
+          }
         }
       } else if (msg instanceof MonitorMsg)
       {
         MonitorMsg monitorMsg =
           (MonitorMsg) msg;
 
-        receivesMonitorDataResponse(monitorMsg);
+        GlobalServerId globalServerId =
+          new GlobalServerId(baseDn, senderHandler.getServerId());
+        receivesMonitorDataResponse(monitorMsg, globalServerId);
       } else
       {
         logError(NOTE_ERR_ROUTING_TO_SERVER.get(
@@ -1775,6 +1739,116 @@
   }
 
   /**
+   * Creates a new monitor message including monitoring information for the
+   * whole topology.
+   * @param sender The sender of this message.
+   * @param destination The destination of this message.
+   * @return The newly created and filled MonitorMsg. Null if a problem occurred
+   * during message creation.
+   */
+  public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
+  {
+    MonitorMsg returnMsg =
+      new MonitorMsg(sender, destination);
+
+    try
+    {
+      returnMsg.setReplServerDbState(getDbServerState());
+      // Update the information we have about all servers
+      // in the topology.
+      MonitorData md = computeMonitorData();
+
+      // Add the informations about the Replicas currently in
+      // the topology.
+      Iterator<Integer> it = md.ldapIterator();
+      while (it.hasNext())
+      {
+        int replicaId = it.next();
+        returnMsg.setServerState(
+            replicaId, md.getLDAPServerState(replicaId),
+            md.getApproxFirstMissingDate(replicaId), true);
+      }
+
+      // Add the informations about the Replication Servers
+      // currently in the topology.
+      it = md.rsIterator();
+      while (it.hasNext())
+      {
+        int replicaId = it.next();
+        returnMsg.setServerState(
+            replicaId, md.getRSStates(replicaId),
+            md.getRSApproxFirstMissingDate(replicaId), false);
+      }
+    }
+    catch (DirectoryException e)
+    {
+      // If we can't compute the Monitor Information, send
+      // back an empty message.
+    }
+    return returnMsg;
+  }
+
+  /**
+   * Creates a new monitor message including monitoring information for the
+   * topology directly connected to this RS. This includes information for:
+   * - local RS
+   * - all direct DSs
+   * - all direct RSs
+   * @param sender The sender of this message.
+   * @param destination The destination of this message.
+   * @return The newly created and filled MonitorMsg. Null if a problem occurred
+   * during message creation.
+   */
+  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
+  {
+    MonitorMsg monitorMsg = null;
+
+    try {
+
+      // Lock domain as we need to go through connected servers list
+      lock();
+
+      monitorMsg = new MonitorMsg(sender, destination);
+
+
+      // Populate for each connected LDAP Server
+      // from the states stored in the serverHandler.
+      // - the server state
+      // - the older missing change
+      for (DataServerHandler lsh : this.directoryServers.values())
+      {
+        monitorMsg.setServerState(
+          lsh.getServerId(),
+          lsh.getServerState(),
+          lsh.getApproxFirstMissingDate(),
+          true);
+      }
+
+      // Same for the connected RS
+      for (ReplicationServerHandler rsh : this.replicationServers.values())
+      {
+        monitorMsg.setServerState(
+          rsh.getServerId(),
+          rsh.getServerState(),
+          rsh.getApproxFirstMissingDate(),
+          false);
+      }
+
+      // Populate the RS state in the msg from the DbState
+      monitorMsg.setReplServerDbState(this.getDbServerState());
+    } catch(InterruptedException e)
+    {
+      // At lock, too bad...
+    } finally
+    {
+      if (hasLock())
+        release();
+    }
+
+    return monitorMsg;
+  }
+
+  /**
    * Shutdown this ReplicationServerDomain.
    */
   public void shutdown()
@@ -1831,8 +1905,7 @@
 
   /**
    * Send a TopologyMsg to all the connected directory servers in order to
-   * let.
-   * them know the topology (every known DSs and RSs)
+   * let them know the topology (every known DSs and RSs).
    * @param notThisOne If not null, the topology message will not be sent to
    * this passed server.
    */
@@ -1931,10 +2004,11 @@
       dsInfos.add(serverHandler.toDSInfo());
     }
 
-    // Create info for us (local RS)
+    // Create info for the local RS
     List<RSInfo> rsInfos = new ArrayList<RSInfo>();
     RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
-      generationId, replicationServer.getGroupId());
+      generationId, replicationServer.getGroupId(),
+      replicationServer.getWeight());
     rsInfos.add(localRSInfo);
 
     return new TopologyMsg(dsInfos, rsInfos);
@@ -1965,7 +2039,8 @@
 
     // Add our own info (local RS)
     RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
-      generationId, replicationServer.getGroupId());
+      generationId, replicationServer.getGroupId(),
+      replicationServer.getWeight());
     rsInfos.add(localRSInfo);
 
     // Go through every peer RSs (and get their connected DSs), also add info
@@ -2471,13 +2546,15 @@
    * Start collecting global monitoring information for this
    * ReplicationServerDomain.
    *
-   * @return The number of response that should come back.
+   * @param expectedMonitoringMsg The list of server handler we have to wait a
+   * monitoring message from. Will be filled as necessary by this method.
    *
    * @throws DirectoryException In case the monitoring information could
    *                            not be collected.
    */
 
-  int initializeMonitorData() throws DirectoryException
+  void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg)
+    throws DirectoryException
   {
     synchronized (monitorDataLock)
     {
@@ -2539,7 +2616,7 @@
     }
 
     // Send the request for remote monitor data to the
-    return sendMonitorDataRequest();
+    sendMonitorDataRequest(expectedMonitoringMsg);
   }
 
   /**
@@ -2566,22 +2643,25 @@
 
   /**
    * Sends a MonitorRequest message to all connected RS.
-   * @return the number of requests sent.
+   * @param expectedMonitoringMsg The list of server handler we have to wait a
+   * monitoring message from. Will be filled as necessary by this method.
    * @throws DirectoryException when a problem occurs.
    */
-  protected int sendMonitorDataRequest()
+  protected void sendMonitorDataRequest(
+    List<GlobalServerId> expectedMonitoringMsg)
     throws DirectoryException
   {
-    int sent = 0;
     try
     {
       for (ServerHandler rs : replicationServers.values())
       {
+        int serverId = rs.getServerId();
         MonitorRequestMsg msg =
           new MonitorRequestMsg(this.replicationServer.getServerId(),
-          rs.getServerId());
+          serverId);
         rs.send(msg);
-        sent++;
+        // Store the fact that we expect a MonitoringMsg back from this server
+        expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId));
       }
     } catch (Exception e)
     {
@@ -2590,7 +2670,6 @@
       throw new DirectoryException(ResultCode.OTHER,
         message, e);
     }
-    return sent;
   }
 
   /**
@@ -2598,8 +2677,10 @@
    * and stores the data received.
    *
    * @param msg The message to be processed.
+   * @param globalServerHandlerId server handler that is receiving the message.
    */
-  public void receivesMonitorDataResponse(MonitorMsg msg)
+  private void receivesMonitorDataResponse(MonitorMsg msg,
+    GlobalServerId globalServerId)
   {
     try
     {
@@ -2677,7 +2758,7 @@
 
       // Decreases the number of expected responses and potentially
       // wakes up the waiting requestor thread.
-      replicationServer.responseReceived();
+      replicationServer.responseReceived(globalServerId);
 
     } catch (Exception e)
     {
@@ -2832,6 +2913,57 @@
   }
 
   /**
+   * Starts the monitoring publisher for the domain.
+   */
+  public void startMonitoringPublisher()
+  {
+    if (monitoringPublisher == null)
+    {
+      long period =
+        replicationServer.getMonitoringPublisherPeriod();
+      if (period > 0) // 0 means no monitoring publisher
+      {
+        monitoringPublisher = new MonitoringPublisher(this, period);
+        monitoringPublisher.start();
+      }
+    }
+  }
+
+  /**
+   * Stops the monitoring publisher for the domain.
+   */
+  public void stopMonitoringPublisher()
+  {
+    if (monitoringPublisher != null)
+    {
+      monitoringPublisher.shutdown();
+      monitoringPublisher.waitForShutdown();
+      monitoringPublisher = null;
+    }
+  }
+
+  /**
+   * Tests if the monitoring publisher for this domain is running.
+   * @return True if the monitoring publisher is running, false otherwise.
+   */
+  public boolean isRunningMonitoringPublisher()
+  {
+    return (monitoringPublisher != null);
+  }
+
+  /**
+   * Update the monitoring publisher with the new period value.
+   * @param period The new period value.
+   */
+  public void updateMonitoringPublisher(long period)
+  {
+    if (monitoringPublisher != null)
+    {
+      monitoringPublisher.setPeriod(period);
+    }
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 91015f3..42b255c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -240,6 +240,9 @@
 
         logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);
 
+        // Create the monitoring publisher for the domain if not already started
+        createMonitoringPublisher();
+
         // FIXME: i think this should be done for all protocol version !!
         // not only those > V1
         registerIntoDomain();
@@ -408,6 +411,10 @@
         // other servers.
       }
 
+
+      // Create the monitoring publisher for the domain if not already started
+      createMonitoringPublisher();
+
       registerIntoDomain();
 
       // Process TopologyMsg sent by remote RS: store matching new info
@@ -497,7 +504,18 @@
     // Remote RS sent his topo msg
     TopologyMsg inTopoMsg = (TopologyMsg) msg;
 
-    // CONNECTION WITH A RS
+    // Store remore RS weight if it has one
+    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+    {
+      // List should only contain RS info for sender
+      RSInfo rsInfo = inTopoMsg.getRsList().get(0);
+      weight = rsInfo.getWeight();
+    }
+    else
+    {
+      // Remote RS uses protocol version prior to 4 : use default value for
+      // weight: 1
+    }
 
     // if the remote RS and the local RS have the same genID
     // then it's ok and nothing else to do
@@ -646,6 +664,7 @@
     RSInfo rsInfo = rsInfos.get(0);
     generationId = rsInfo.getGenerationId();
     groupId = rsInfo.getGroupId();
+    weight = rsInfo.getWeight();
 
     /**
      * Store info for DSs connected to the peer RS
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 22adad3..a0ebd0c 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -244,6 +244,10 @@
    */
   private AtomicBoolean shuttingDown = new AtomicBoolean(false);
 
+  /**
+   * Weight of this remote server.
+   */
+  protected int weight = 1;
 
   /**
    * Creates a new server handler instance with the provided socket.
@@ -1215,12 +1219,23 @@
    */
   public RSInfo toRSInfo()
   {
-    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
+    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, weight);
 
     return rsInfo;
   }
 
   /**
+   * Starts the monitoring publisher for the domain if not already started.
+   */
+  protected void createMonitoringPublisher()
+  {
+    if (!replicationServerDomain.isRunningMonitoringPublisher())
+    {
+      replicationServerDomain.startMonitoringPublisher();
+    }
+  }
+
+  /**
    * Performs any processing periodic processing that may be desired to update
    * the information associated with this monitor.  Note that best-effort
    * attempts will be made to ensure that calls to this method come
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 983cffa..86e858b 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -55,11 +55,14 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.MutableBoolean;
 import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.ChangeStatusMsg;
 import org.opends.server.replication.protocol.HeartbeatMonitor;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplServerStartDSMsg;
@@ -116,6 +119,28 @@
   private ReplicationDomain domain = null;
 
   /**
+   * This object is used as a conditional event to be notified about
+   * the reception of monitor information from the Replication Server.
+   */
+  private final MutableBoolean monitorResponse = new MutableBoolean(false);
+
+  /**
+   * A Map containing the ServerStates of all the replicas in the topology
+   * as seen by the ReplicationServer the last time it was polled or the last
+   * time it published monitoring information.
+   */
+  private HashMap<Integer, ServerState> replicaStates =
+    new HashMap<Integer, ServerState>();
+
+  /**
+   * A Map containing the ServerStates of all the replication servers in the
+   * topology as seen by the ReplicationServer the last time it was polled or
+   * the last time it published monitoring information.
+   */
+  private HashMap<Integer, ServerState> rsStates =
+    new HashMap<Integer, ServerState>();
+
+  /**
    * The expected duration in milliseconds between heartbeats received
    * from the replication server.  Zero means heartbeats are off.
    */
@@ -1918,6 +1943,37 @@
           // Try to find a suitable RS
           this.reStart(failingSession);
         }
+        else if (msg instanceof MonitorMsg)
+        {
+          // This is the response to a MonitorRequest that was sent earlier or
+          // the regular message of the monitoring publisher of the RS.
+
+          // Extract and store replicas ServerStates
+          replicaStates = new HashMap<Integer, ServerState>();
+          MonitorMsg monitorMsg = (MonitorMsg) msg;
+          Iterator<Integer> it = monitorMsg.ldapIterator();
+          while (it.hasNext())
+          {
+            int srvId = it.next();
+            replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
+          }
+
+          // Notify the sender that the response was received.
+          synchronized (monitorResponse)
+          {
+            monitorResponse.set(true);
+            monitorResponse.notify();
+          }
+
+          // Extract and store replication servers ServerStates
+          rsStates = new HashMap<Integer, ServerState>();
+          it = monitorMsg.rsIterator();
+          while (it.hasNext())
+          {
+            int srvId = it.next();
+            rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
+          }
+        }
         else
         {
           return msg;
@@ -1949,6 +2005,40 @@
   }
 
   /**
+   * Gets the States of all the Replicas currently in the
+   * Topology.
+   * When this method is called, a Monitoring message will be sent
+   * to the Replication Server to which this domain is currently connected
+   * so that it computes a table containing information about
+   * all Directory Servers in the topology.
+   * This Computation involves communications will all the servers
+   * currently connected and
+   *
+   * @return The States of all Replicas in the topology (except us)
+   */
+  public Map<Integer, ServerState> getReplicaStates()
+  {
+    monitorResponse.set(false);
+
+    // publish Monitor Request Message to the Replication Server
+    publish(new MonitorRequestMsg(serverId, getRsServerId()));
+
+    // wait for Response up to 10 seconds.
+    try
+    {
+      synchronized (monitorResponse)
+      {
+        if (monitorResponse.get() == false)
+        {
+          monitorResponse.wait(10000);
+        }
+      }
+    } catch (InterruptedException e)
+    {}
+    return replicaStates;
+  }
+
+  /**
    * This method allows to do the necessary computing for the window
    * management after treatment by the worker threads.
    *
@@ -2440,7 +2530,7 @@
     {
       ctHeartbeatPublisherThread =
         new CTHeartbeatPublisherThread(
-            "Replication CN Heartbeat Thread started for " +
+            "Replication CN Heartbeat sender for " +
             baseDn + " with " + getReplicationServer(),
             session, changeTimeHeartbeatSendInterval, serverId);
       ctHeartbeatPublisherThread.start();
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 992fe7c..1f8a7e1 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -65,7 +65,6 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -79,7 +78,6 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.AssuredMode;
 import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.replication.common.MutableBoolean;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.common.StatusMachine;
@@ -92,8 +90,6 @@
 import org.opends.server.replication.protocol.HeartbeatMsg;
 import org.opends.server.replication.protocol.InitializeRequestMsg;
 import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.MonitorMsg;
-import org.opends.server.replication.protocol.MonitorRequestMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplicationMsg;
@@ -306,20 +302,6 @@
    */
   private final ChangeNumberGenerator generator;
 
-  /**
-   * This object is used as a conditional event to be notified about
-   * the reception of monitor information from the Replication Server.
-   */
-  private final MutableBoolean monitorResponse = new MutableBoolean(false);
-
-
-  /**
-   * A Map containing of the ServerStates of all the replicas in the topology
-   * as seen by the ReplicationServer the last time it was polled.
-   */
-  private HashMap<Integer, ServerState> replicaStates =
-    new HashMap<Integer, ServerState>();
-
   Set<String> cfgEclIncludes = new HashSet<String>();
   Set<String>    eClIncludes = new HashSet<String>();
 
@@ -586,24 +568,7 @@
    */
   public Map<Integer, ServerState> getReplicaStates()
   {
-    monitorResponse.set(false);
-
-    // publish Monitor Request Message to the Replication Server
-    broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
-
-    // wait for Response up to 10 seconds.
-    try
-    {
-      synchronized (monitorResponse)
-      {
-        if (monitorResponse.get() == false)
-        {
-          monitorResponse.wait(10000);
-        }
-      }
-    } catch (InterruptedException e)
-    {}
-    return replicaStates;
+    return broker.getReplicaStates();
   }
 
   /**
@@ -834,26 +799,6 @@
           update = (UpdateMsg) msg;
           generator.adjust(update.getChangeNumber());
         }
-        else if (msg instanceof MonitorMsg)
-        {
-          // This is the response to a MonitorRequest that was sent earlier
-          // build the replicaStates Map.
-          replicaStates = new HashMap<Integer, ServerState>();
-          MonitorMsg monitorMsg = (MonitorMsg) msg;
-          Iterator<Integer> it = monitorMsg.ldapIterator();
-          while (it.hasNext())
-          {
-            int serverId = it.next();
-            replicaStates.put(
-                serverId, monitorMsg.getLDAPServerState(serverId));
-          }
-          // Notify the sender that the response was received.
-          synchronized (monitorResponse)
-          {
-            monitorResponse.set(true);
-            monitorResponse.notify();
-          }
-        }
       }
       catch (SocketTimeoutException e)
       {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
index d43eb5d..bdcbebc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -27,7 +27,6 @@
 package org.opends.server.replication;
 
 import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
-import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
@@ -37,7 +36,6 @@
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-import static org.opends.server.loggers.ErrorLogger.logError;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
@@ -56,13 +54,9 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 
-import org.opends.messages.Category;
-import org.opends.messages.Message;
-import org.opends.messages.Severity;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.api.Backend;
 import org.opends.server.api.ConnectionHandler;
-import org.opends.server.api.SynchronizationProvider;
 import org.opends.server.backends.MemoryBackend;
 import org.opends.server.config.ConfigException;
 import org.opends.server.controls.ExternalChangelogRequestControl;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
index 964e464..7dd2c41 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -784,6 +784,8 @@
       catch(SocketTimeoutException e)
       {
         // This is the expected result
+        // Note that timeout should be lower than RS montoring publisher period
+        // so that timeout occurs
       }
 
       //===========================================================
@@ -889,49 +891,21 @@
 
       // Broker 2 and 3 should receive 1 change status message to order them
       // to enter the bad gen id status
-      try
+      ChangeStatusMsg csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker2,
+        ChangeStatusMsg.class.getName());
+      if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
       {
-        ReplicationMsg msg = broker2.receive();
-        if (!(msg instanceof ChangeStatusMsg))
-        {
-          fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
-            " to enter the bad gen id status"
-              + msg);
-        }
-        ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
-        if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
-        {
-          fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
-            " to enter the bad gen id status"
-              + msg);
-        }
+        fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
+          " to enter the bad gen id status"
+            + csMsg);
       }
-      catch(SocketTimeoutException se)
+      csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker3,
+        ChangeStatusMsg.class.getName());
+      if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
       {
-        fail("DS2 is expected to receive 1 ChangeStatusMsg to enter the " +
-          "bad gen id status.");
-      }
-      try
-      {
-        ReplicationMsg msg = broker3.receive();
-        if (!(msg instanceof ChangeStatusMsg))
-        {
-          fail("Broker 3 connection is expected to receive 1 ChangeStatusMsg" +
-            " to enter the bad gen id status"
-              + msg);
-        }
-        ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
-        if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
-        {
-          fail("Broker 3 connection is expected to receive 1 ChangeStatusMsg" +
-            " to enter the bad gen id status"
-              + msg);
-        }
-      }
-      catch(SocketTimeoutException se)
-      {
-        fail("DS3 is expected to receive 1 ChangeStatusMsg to enter the " +
-          "bad gen id status.");
+        fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
+          " to enter the bad gen id status"
+            + csMsg);
       }
 
       debugInfo("DS1 root entry must contain the new gen ID");
@@ -988,7 +962,8 @@
 
 
       debugInfo("DS2 is publishing a change and RS1 must ignore this change, DS3 must not receive it.");
-      broker2.publish(createAddMsg());
+      AddMsg emsg = (AddMsg)createAddMsg();
+      broker2.publish(emsg);
 
       // Updates count in RS1 must stay unchanged = to 1
       Thread.sleep(500);
@@ -1060,8 +1035,30 @@
           isDegradedDueToGenerationId(server3ID),
       "Expecting that DS3 is not in bad gen id from RS1");
 
+      debugInfo("Verify that DS2 receives the add message stored in RS1 DB");
+      try
+      {
+        ReplicationMsg msg = broker2.receive();
+        assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg);
+      }
+      catch(SocketTimeoutException e)
+      {
+        fail("The msg stored in RS1 DB is expected to be received by DS2)");
+      }
+
+      debugInfo("Verify that DS3 receives the add message stored in RS1 DB");
+      try
+      {
+        ReplicationMsg msg = broker3.receive();
+        assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg);
+      }
+      catch(SocketTimeoutException e)
+      {
+        fail("The msg stored in RS1 DB is expected to be received by DS3)");
+      }
+
       debugInfo("DS2 is publishing a change and RS1 must store this change, DS3 must receive it.");
-      AddMsg emsg = (AddMsg)createAddMsg();
+      emsg = (AddMsg)createAddMsg();
       broker2.publish(emsg);
 
       Thread.sleep(500);
@@ -1105,7 +1102,7 @@
    * The following test focus on:
    * - genId checking across multiple starting RS (replication servers)
    * - genId setting propagation from one RS to the others
-   * - genId reset   propagation from one RS to the others
+   * - genId reset propagation from one RS to the others
    */
   @Test(enabled=false)
   public void testMultiRS() throws Exception
@@ -1190,7 +1187,7 @@
       assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), genId);
       assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), genId);
 
-      debugInfo("Connecting broker2 to replServer1 with a bad genId");
+      debugInfo("Connecting broker3 to replServer1 with a bad genId");
       try
       {
         long badgenId = 1;
@@ -1215,7 +1212,7 @@
 
       debugInfo("Connecting DS to replServer1.");
       connectServer1ToChangelog(changelog1ID);
-      Thread.sleep(1000);
+      Thread.sleep(3000);
 
 
       debugInfo("Adding reset task to DS.");
@@ -1373,7 +1370,7 @@
 
   /**
    * Loop opening sessions to the Replication Server
-   * to check that it handle correctly deconnection and reconnection.
+   * to check that it handle correctly disconnection and reconnection.
    */
   @Test(enabled=false, groups="slow")
   public void testLoop() throws Exception
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 9e1aae1..161679c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -47,7 +47,6 @@
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
 import org.opends.messages.Severity;
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
@@ -65,9 +64,10 @@
 import org.opends.server.replication.plugin.LDAPReplicationDomain;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.plugin.PersistentServerState;
-import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.schema.DirectoryStringSyntax;
@@ -260,34 +260,6 @@
       broker.setSoTimeout(timeout);
     checkConnection(30, broker, port); // give some time to the broker to connect
                                        // to the replicationServer.
-    if (emptyOldChanges)
-    {
-      /*
-       * loop receiving update until there is nothing left
-       * to make sure that message from previous tests have been consumed.
-       */
-      try
-      {
-        while (true)
-        {
-          ReplicationMsg rMsg = broker.receive();
-          if (rMsg instanceof ErrorMsg)
-          {
-            ErrorMsg eMsg = (ErrorMsg)rMsg;
-            logError(new MessageBuilder(
-                "ReplicationTestCase/openReplicationSession ").append(
-                " received ErrorMessage when emptying old changes ").append(
-                eMsg.getDetails()).toMessage());
-          }
-        }
-      }
-      catch (Exception e)
-      {
-        logError(new MessageBuilder(
-            "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
-            .append(" when emptying old changes").toMessage());
-      }
-    }
     return broker;
   }
 
@@ -313,32 +285,6 @@
       broker.setSoTimeout(timeout);
     checkConnection(30, broker, port); // give some time to the broker to connect
                                        // to the replicationServer.
-    if (emptyOldChanges)
-    {
-      // loop receiving update until there is nothing left
-      // to make sure that message from previous tests have been consumed.
-      try
-      {
-        while (true)
-        {
-          ReplicationMsg rMsg = broker.receive();
-          if (rMsg instanceof ErrorMsg)
-          {
-            ErrorMsg eMsg = (ErrorMsg)rMsg;
-            logError(new MessageBuilder(
-                "ReplicationTestCase/openReplicationSession ").append(
-                " received ErrorMessage when emptying old changes ").append(
-                eMsg.getDetails()).toMessage());
-          }
-        }
-      }
-      catch (Exception e)
-      {
-        logError(new MessageBuilder(
-            "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
-            .append(" when emptying old changes").toMessage());
-      }
-    }
     return broker;
   }
   */
@@ -435,17 +381,6 @@
       boolean emptyOldChanges)
       throws Exception, SocketException
   {
-    return openReplicationSession(baseDn, serverId, window_size,
-        port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges,
-        getGenerationId(baseDn));
-  }
-
-  protected ReplicationBroker openReplicationSession(
-      final DN baseDn, int serverId, int window_size,
-        int port, int timeout, int maxSendQueue, int maxRcvQueue,
-        boolean emptyOldChanges, long generationId)
-            throws Exception, SocketException
-  {
     ServerState state = new ServerState();
 
     if (emptyOldChanges)
@@ -453,37 +388,13 @@
 
     ReplicationBroker broker = new ReplicationBroker(null,
         state, baseDn.toNormalizedString(), serverId, window_size,
-        generationId, 0, getReplSessionSecurity(), (byte)1, 500);
+        getGenerationId(baseDn), 0, getReplSessionSecurity(), (byte)1, 500);
     ArrayList<String> servers = new ArrayList<String>(1);
     servers.add("localhost:" + port);
     broker.start(servers);
     checkConnection(30, broker, port);
     if (timeout != 0)
       broker.setSoTimeout(timeout);
-    if (emptyOldChanges)
-    {
-      /*
-       * loop receiving update until there is nothing left
-       * to make sure that message from previous tests have been consumed.
-       */
-      try
-      {
-        while (true)
-        {
-          ReplicationMsg rMsg = broker.receive();
-          if (rMsg instanceof ErrorMsg)
-          {
-            ErrorMsg eMsg = (ErrorMsg)rMsg;
-            logError(new MessageBuilder(
-                "ReplicationTestCase/openReplicationSession ").append(
-                " received ErrorMessage when emptying old changes ").append(
-                eMsg.getDetails()).toMessage());
-          }
-        }
-      }
-      catch (Exception e)
-      { }
-    }
     return broker;
   }
 
@@ -575,11 +486,14 @@
     logError(Message.raw(Category.SYNC, Severity.NOTICE,
       " ##### Calling ReplicationTestCase.classCleanUp ##### "));
 
+    // Clean RS databases
+    cleanUpReplicationServersDB();
+
     cleanConfigEntries();
-    configEntryList = null;
+    configEntryList = new LinkedList<DN>();
 
     cleanRealEntries();
-    entryList = null;
+    entryList = new LinkedList<DN>();
 
     // Clear the test backend (TestCaseUtils.TEST_ROOT_DN_STRING)
     // (in case our test created some emtries in it)
@@ -631,6 +545,10 @@
     assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-server)",
       "Found unexpected replication server config left");
 
+    // Be sure that no replication server instance is left
+    List<ReplicationServer> allRSInstances = ReplicationServer.getAllInstances();
+    assertTrue(allRSInstances.size() == 0, "Some replication servers left: " + allRSInstances);
+
     // Check for config entries for replication domain
     assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-domain)",
       "Found unexpected replication domain config left");
@@ -648,6 +566,17 @@
   }
 
   /**
+   * Cleanup databases of the currently instantiated replication servers in the
+   * VM
+   */
+  protected void cleanUpReplicationServersDB() {
+
+    for (ReplicationServer rs : ReplicationServer.getAllInstances()) {
+      rs.clearDb();
+    }
+  }
+
+  /**
    * Performs a search on the config backend with the specified filter.
    * Fails if a config entry is found.
    * @param filter The filter to apply for the search
@@ -1266,4 +1195,90 @@
       // done
     }
   }
+
+  /**
+   * Wait for the arrival of a specific message type on the provided session
+   * before going in timeout and failing.
+   * @param session Session from which we should receive the message.
+   * @param msgType Class of the message we are waiting for.
+   * @return The expected message if it comes in time or fails (assertion).
+   */
+  protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) {
+
+    ReplicationMsg replMsg = null;
+
+    int timeOut = 5000; // 5 seconds max to wait for the desired message
+    long startTime = System.currentTimeMillis();
+    long curTime = startTime;
+    int nMsg = 0;
+    while ((curTime - startTime) <= timeOut)
+    {
+      try
+      {
+        replMsg = session.receive();
+      } catch (Exception ex)
+      {
+        fail("Exception waiting for " + msgType + " message : " +
+          ex.getClass().getName()  + " : " + ex.getMessage());
+      }
+      // Get message type
+      String rcvMsgType = replMsg.getClass().getName();
+      if (rcvMsgType.equals(msgType))
+      {
+        // Ok, got it, let's return the expected message
+        return replMsg;
+      }
+      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
+      nMsg++;
+      curTime = System.currentTimeMillis();
+    }
+    // Timeout
+    fail("Failed to receive an expected " + msgType +
+      " message after 5 seconds : also received " + nMsg +
+      " other messages during wait time.");
+    return null;
+  }
+
+  /**
+   * Wait for the arrival of a specific message type on the provided broker
+   * before going in timeout and failing.
+   * @param broker Broker from which we should receive the message.
+   * @param msgType Class of the message we are waiting for.
+   * @return The expected message if it comes in time or fails (assertion).
+   */
+  protected static ReplicationMsg waitForSpecificMsg(ReplicationBroker broker, String msgType) {
+
+    ReplicationMsg replMsg = null;
+
+    int timeOut = 5000; // 5 seconds max to wait for the desired message
+    long startTime = System.currentTimeMillis();
+    long curTime = startTime;
+    int nMsg = 0;
+    while ((curTime - startTime) <= timeOut)
+    {
+      try
+      {
+        replMsg = broker.receive();
+      } catch (Exception ex)
+      {
+        fail("Exception waiting for " + msgType + " message : " +
+          ex.getClass().getName()  + " : " + ex.getMessage());
+      }
+      // Get message type
+      String rcvMsgType = replMsg.getClass().getName();
+      if (rcvMsgType.equals(msgType))
+      {
+        // Ok, got it, let's return the expected message
+        return replMsg;
+      }
+      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
+      nMsg++;
+      curTime = System.currentTimeMillis();
+    }
+    // Timeout
+    fail("Failed to receive an expected " + msgType +
+      " message after 5 seconds : also received " + nMsg +
+      " other messages during wait time.");
+    return null;
+  }
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
index 4a4cf4f..51ce7ad 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -128,6 +128,8 @@
     logError(Message.raw(Category.SYNC, Severity.NOTICE,
         "Starting replication test : pushSchemaChange "));
 
+    cleanUpReplicationServersDB();
+
     final DN baseDn = DN.decode("cn=schema");
 
     ReplicationBroker broker =
@@ -216,6 +218,8 @@
     logError(Message.raw(Category.SYNC, Severity.NOTICE,
         "Starting replication test : replaySchemaChange "));
 
+    cleanUpReplicationServersDB();
+
     final DN baseDn = DN.decode("cn=schema");
 
     ReplicationBroker broker =
@@ -253,6 +257,8 @@
     logError(Message.raw(Category.SYNC, Severity.NOTICE,
         "Starting replication test : pushSchemaFilesChange "));
 
+    cleanUpReplicationServersDB();
+
     final DN baseDn = DN.decode("cn=schema");
 
     ReplicationBroker broker =
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
index 884fa51..e98164b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -67,6 +67,7 @@
 import org.opends.server.replication.protocol.ReplicationMsg;
 import org.opends.server.schema.DirectoryStringSyntax;
 import org.opends.server.types.*;
+import org.opends.server.util.StaticUtils;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -300,6 +301,9 @@
     logError(Message.raw(Category.SYNC, Severity.INFORMATION,
         "Starting synchronization test : toggleReceiveStatus"));
 
+    // Clean replication server database from previous run
+    cleanUpReplicationServersDB();
+
     final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
 
     /*
@@ -379,6 +383,9 @@
     logError(Message.raw(Category.SYNC, Severity.INFORMATION,
         "Starting replication test : lostHeartbeatFailover"));
 
+    // Clean replication server database from previous run
+    cleanUpReplicationServersDB();
+
     final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
 
     /*
@@ -483,6 +490,9 @@
          DirectoryServer.getAttributeType("entryuuid");
     String monitorAttr = "resolved-modify-conflicts";
 
+    // Clean replication server database from previous run
+    cleanUpReplicationServersDB();
+
     /*
      * Open a session to the replicationServer using the broker API.
      * This must use a different serverId to that of the directory server.
@@ -610,6 +620,9 @@
     String resolvedMonitorAttr = "resolved-naming-conflicts";
     String unresolvedMonitorAttr = "unresolved-naming-conflicts";
 
+    // Clean replication server database from previous run
+    cleanUpReplicationServersDB();
+
     /*
      * Open a session to the replicationServer using the ReplicationServer broker API.
      * This must use a serverId different from the LDAP server ID
@@ -1302,6 +1315,18 @@
     return new Object[][] { { false }, {true} };
   }
 
+  private void cleanupTest() {
+    try
+    {
+      classCleanUp();
+      setUp();
+    } catch (Exception e)
+    {
+      fail("Test cleanup failed: " + e.getClass().getName() + " : " +
+        e.getMessage() + " : " + StaticUtils.stackTraceToSingleLineString(e));
+    }
+  }
+
   /**
    * Tests done using directly the ReplicationBroker interface.
    */
@@ -1312,6 +1337,9 @@
         Category.SYNC, Severity.INFORMATION,
         "Starting replication test : updateOperations " + assured));
 
+    // Cleanup from previous run
+    cleanupTest();
+
     final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
 
     ReplicationBroker broker =
@@ -1341,15 +1369,15 @@
         // Check if the client has received the msg
         ReplicationMsg msg = broker.receive();
         assertTrue(msg instanceof AddMsg,
-        "The received replication message is not an ADD msg");
+        "The received replication message is not an ADD msg : " + msg);
         AddMsg addMsg =  (AddMsg) msg;
 
         Operation receivedOp = addMsg.createOperation(connection);
         assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
-        "The received replication message is not an ADD msg");
+        "The received replication message is not an ADD msg : " + addMsg);
 
         assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
-        "The received ADD replication message is not for the excepted DN");
+        "The received ADD replication message is not for the excepted DN : " + addMsg);
       }
 
       // Modify the entry
@@ -1364,12 +1392,12 @@
       // See if the client has received the msg
       ReplicationMsg msg = broker.receive();
       assertTrue(msg instanceof ModifyMsg,
-      "The received replication message is not a MODIFY msg");
+      "The received replication message is not a MODIFY msg : " + msg);
       ModifyMsg modMsg = (ModifyMsg) msg;
 
       modMsg.createOperation(connection);
       assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
-      "The received MODIFY replication message is not for the excepted DN");
+      "The received MODIFY replication message is not for the excepted DN : " + modMsg);
 
       // Modify the entry DN
       DN newDN = DN.decode("uid= new person,ou=People," + TEST_ROOT_DN_STRING) ;
@@ -1387,12 +1415,12 @@
       // See if the client has received the msg
       msg = broker.receive();
       assertTrue(msg instanceof ModifyDNMsg,
-      "The received replication message is not a MODIFY DN msg");
+      "The received replication message is not a MODIFY DN msg : " + msg);
       ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
       moddnMsg.createOperation(connection);
 
       assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
-      "The received MODIFY_DN message is not for the excepted DN");
+      "The received MODIFY_DN message is not for the excepted DN : " + moddnMsg);
 
       // Delete the entry
       DeleteOperationBasis delOp = new DeleteOperationBasis(connection,
@@ -1406,12 +1434,12 @@
       // See if the client has received the msg
       msg = broker.receive();
       assertTrue(msg instanceof DeleteMsg,
-      "The received replication message is not a MODIFY DN msg");
+      "The received replication message is not a MODIFY DN msg : " + msg);
       DeleteMsg delMsg = (DeleteMsg) msg;
       delMsg.createOperation(connection);
       assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
           .decode("uid= new person,ou=People," + TEST_ROOT_DN_STRING)) == 0,
-      "The received DELETE message is not for the excepted DN");
+      "The received DELETE message is not for the excepted DN : " + delMsg);
 
       /*
        * Now check that when we send message to the ReplicationServer
@@ -1512,6 +1540,9 @@
     logError(Message.raw(Category.SYNC, Severity.INFORMATION,
         "Starting replication test : deleteNoSuchObject"));
 
+    // Clean replication server database from previous run
+    cleanUpReplicationServersDB();
+
     DN dn = DN.decode("cn=No Such Object,ou=People," + TEST_ROOT_DN_STRING);
     DeleteOperationBasis op =
          new DeleteOperationBasis(connection,
@@ -1535,6 +1566,9 @@
 
     final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
 
+    // Clean replication server database from previous run
+    cleanUpReplicationServersDB();
+
     Thread.sleep(2000);
     ReplicationBroker broker =
       openReplicationSession(baseDn,  11, 100, replServerPort, 1000, true);
@@ -1675,6 +1709,9 @@
 
     final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
 
+    // Clean replication server database from previous run
+    cleanUpReplicationServersDB();
+
     /*
      * Open a session to the replicationServer using the broker API.
      * This must use a different serverId to that of the directory server.
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 7529f60..7ad45ce 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -501,7 +501,7 @@
 
         // Send topo view
         List<RSInfo> rsList = new ArrayList<RSInfo>();
-        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
+        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
         rsList.add(rsInfo);
         TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(),
           rsList);
@@ -719,7 +719,7 @@
     }
 
     /**
-     * Read the coming seaf read mode updates and send back acks with errors
+     * Read the coming safe read mode updates and send back acks with errors
      */
     private void executeSafeReadManyErrorsScenario()
     {
@@ -1058,7 +1058,7 @@
   }
 
   /**
-   * Tests parameters sent in session handshake an updates, when not using
+   * Tests parameters sent in session handshake and updates, when not using
    * assured replication
    */
   @Test
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index 2744026..da567bc 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -836,7 +836,7 @@
         fail("Unknown replication server id.");
     }
 
-    return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId);
+    return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId, 1);
   }
 
   /**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
index a293fbe..b937ea1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -1092,11 +1092,11 @@
     dsList4.add(dsInfo2);
     dsList4.add(dsInfo1);
 
-    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
+    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
 
-    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
+    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
 
-    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
+    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
 
     List<RSInfo> rsList1 = new ArrayList<RSInfo>();
     rsList1.add(rsInfo1);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 10f188b..dc67c82 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -1026,13 +1026,13 @@
     dsList4.add(dsInfo2);
     dsList4.add(dsInfo1);
 
-    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
+    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
 
-    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
+    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
 
-    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
+    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
 
-    RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98);
+    RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98, 1);
 
     List<RSInfo> rsList1 = new ArrayList<RSInfo>();
     rsList1.add(rsInfo1);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index cafdede..f95550a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -29,7 +29,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.math.BigInteger;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -595,6 +594,9 @@
       ReplServerFakeConfiguration conf =
         new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
         replServers, groupId, assuredTimeout, 5000);
+      // No monitoring publisher to not interfer with some SocketTimeoutException
+      // expected at some points in these tests
+      conf.setMonitoringPeriod(0L);
       ReplicationServer replicationServer = new ReplicationServer(conf);
       return replicationServer;
 
@@ -908,7 +910,7 @@
         ReplicationMsg replMsg = session.receive();
         if (replMsg instanceof ErrorMsg)
         {
-          // Support for connection done with bad gen id : we receive an error
+        // Support for connection done with bad gen id : we receive an error
           // message that we must throw away before reading our ack.
           replMsg = session.receive();
         }
@@ -967,7 +969,7 @@
         }
 
         // Send our topo mesg
-        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
+        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
         List<RSInfo> rsInfos = new ArrayList<RSInfo>();
         rsInfos.add(rsInfo);
         TopologyMsg topoMsg = new TopologyMsg(null, rsInfos);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
index d277405..50c6031 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -63,8 +63,11 @@
   // The weight of the server
   private int weight = 1;
 
+  // The monitoring publisher period
+  private long monitoringPeriod = 3000;
+
   /**
-   * Constructor without goup id, assured info and weight
+   * Constructor without group id, assured info and weight
    */
   public ReplServerFakeConfiguration(
       int port, String dirName, int purgeDelay, int serverId,
@@ -254,4 +257,17 @@
     return weight;
   }
 
+  public long getMonitoringPeriod()
+  {
+    return monitoringPeriod;
+  }
+
+  /**
+   * @param monitoringPeriod the monitoringPeriod to set
+   */
+  public void setMonitoringPeriod(long monitoringPeriod)
+  {
+    this.monitoringPeriod = monitoringPeriod;
+  }
+
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index a24799f..56801e1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -74,7 +74,6 @@
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplServerStartDSMsg;
-import org.opends.server.replication.protocol.ReplServerStartMsg;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
 import org.opends.server.replication.protocol.ServerStartMsg;
@@ -1003,7 +1002,7 @@
             ProtocolVersion.getCurrentVersion(), 0, sslEncryption, (byte)-1);
       session.publish(msg);
 
-      // Read the Replication Server state from the ReplServerStartMsg that
+      // Read the Replication Server state from the ReplServerStartDSMsg that
       // comes back.
       ReplServerStartDSMsg replStartDSMsg =
         (ReplServerStartDSMsg) session.receive();
@@ -1079,7 +1078,8 @@
       // check that this did not change the window by sending a probe again.
       session.publish(new WindowProbeMsg());
 
-      windowMsg = (WindowMsg) session.receive();
+      // We may receive some MonitoringMsg so use filter method
+      windowMsg = (WindowMsg)waitForSpecificMsg(session, WindowMsg.class.getName());
       assertEquals(serverwindow, windowMsg.getNumAck());
       debugInfo("Ending windowProbeTest");
     }

--
Gitblit v1.10.0